بناء بنية تحتية لمراقبة الأسعار في الوقت الحقيقي

تصميم وبناء نظام لرصد الأسعار في الوقت الحقيقي مع تحديد الأولويات، ومجمعات العمال، وكشف التغيير، والتناوب في أماكن العمل. دليل تنفيذ (بايتون) و(نودج).

بناء بنية تحتية لمراقبة الأسعار في الوقت الحقيقي

Real-Time vs Batch Price Monitoring

وتعمل معظم نظم رصد الأسعار في شكل دفعات: التحقق من جميع المنتجات كل ساعة (أو كل بضع ساعات)، وتخزين النتائج، وإرسال إنذارات بشأن التغييرات. وهذا يعمل في كثير من حالات الاستخدام، ولكن في الأسواق السريعة الحركة - المبيعات الوميضية، والتسعير الدينامي، والمنافسة في السوق - يفتقد رصد الدفعات التغيرات الحرجة في الأسعار التي تحدث بين الشيكات.

ويقلل رصد الأسعار في الوقت الحقيقي من خط الكشف من ساعات إلى دقائق أو حتى ثواني. وبدلاً من التحقق من كل منتج في جدول زمني ثابت، يرصد نظام في الوقت الحقيقي باستمرار الأهداف ذات الأولوية العالية ويستجيب للتغييرات التي تحدث. ويشمل هذا الدليل الهيكل والهياكل الأساسية البديلة وتفاصيل التنفيذ اللازمة لبناء نظام رصد آني. لمفاهيم رصد الأسعار الأساسية، انظر دليلنا رصد أسعار المنافسين آليا.

Real-Time vs Batch Price Monitoring
Aspectمراقبة الصيدرصد الوقت الحقيقي
الترددكل ساعةكل 1-5 دقيقة للبنود ذات الأولوية
رمزحتى فترة واحدة كاملةأقل من 5 دقائق
استخدام الوكيلانفجارات مركزةثابت، موزع
الهياكل الأساسيةالوظائف البسيطةالحدث المدفوع بمجمعات العمال
التكلفةمنخفضأعلى (طلبات إضافية، المزيد من العملاء)
الأفضلالتقارير اليومية، تحليل الاتجاهاتالتعبئة، والكشف عن المبيعات، والعطاءات التنافسية

هيكل لرصد الزمن الحقيقي

ويوجد في نظام رصد الأسعار في الوقت الحقيقي خمسة عناصر أساسية تعمل معا كخط أنابيب مستمر.

1- الأولوية

وتُمنح المنتجات ذات الأولوية التي تحدد تواتر التحقق. A priority queue (Redis Sorted Sets work well) ensures high-value products are always check first.

import redis
import time
import json
r = redis.Redis(host="localhost", port=6379)
def add_product(product_id, url, priority_minutes):
    """Add a product to the monitoring queue."""
    next_check = time.time()  # Check immediately on first add
    r.zadd("price_queue", {json.dumps({
        "product_id": product_id,
        "url": url,
        "interval": priority_minutes * 60,
    }): next_check})
def get_next_batch(batch_size=10):
    """Get the next batch of products due for checking."""
    now = time.time()
    items = r.zrangebyscore("price_queue", 0, now, start=0, num=batch_size)
    products = []
    for item in items:
        data = json.loads(item)
        r.zadd("price_queue", {item: now + data["interval"]})
        products.append(data)
    return products
# Example: Add products with different priorities
add_product("SKU001", "https://www.amazon.com/dp/B0CHX3QBCH", priority_minutes=2)
add_product("SKU002", "https://www.amazon.com/dp/B0D5BKRY4R", priority_minutes=5)
add_product("SKU003", "https://www.amazon.com/dp/B0CRMZHDG7", priority_minutes=15)

2. Worker Pool

وتنسحب عمليات العمال المتعددة من التساؤلات ذات الأولوية، وتجلب الأسعار من خلال الشركات، وتدفع النتائج إلى خط البيانات. ويعمل العمال بصورة مستقلة، وكل واحد منهم له صلة مباشرة.

import requests
import random
from concurrent.futures import ThreadPoolExecutor
from bs4 import BeautifulSoup
PROXY_URL = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/124.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 Chrome/124.0.0.0 Safari/537.36",
]
def fetch_price(product):
    """Fetch the current price for a product."""
    headers = {
        "User-Agent": random.choice(USER_AGENTS),
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.9",
    }
    proxies = {"http": PROXY_URL, "https": PROXY_URL}
    try:
        response = requests.get(
            product["url"], headers=headers,
            proxies=proxies, timeout=30
        )
        if response.status_code == 200:
            soup = BeautifulSoup(response.text, "html.parser")
            price_el = soup.select_one("span.a-price-whole")
            if price_el:
                price = float(price_el.get_text(strip=True).replace(",", ""))
                return {
                    "product_id": product["product_id"],
                    "price": price,
                    "timestamp": time.time(),
                    "status": "success",
                }
    except Exception as e:
        pass
    return {
        "product_id": product["product_id"],
        "price": None,
        "timestamp": time.time(),
        "status": "failed",
    }
def run_workers(num_workers=10):
    """Run the monitoring worker pool."""
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        while True:
            batch = get_next_batch(batch_size=num_workers)
            if not batch:
                time.sleep(1)
                continue
            futures = [executor.submit(fetch_price, product) for product in batch]
            for future in futures:
                result = future.result()
                process_result(result)
            time.sleep(random.uniform(0.5, 2))

3 - مهندس كشف البيانات

وبدلا من تخزين كل شيك للأسعار، يقارن محرك الكشف عن التغيير الأسعار الحالية بالقيم المعروفة الأخيرة، ولا يؤدي إلا إلى حدوث تغييرات فعلية.

class ChangeDetector:
    def __init__(self, redis_client):
        self.redis = redis_client
    def check_change(self, product_id, new_price):
        """Compare new price against last known and detect changes."""
        key = f"last_price:{product_id}"
        last_data = self.redis.get(key)
        if last_data:
            last = json.loads(last_data)
            old_price = last["price"]
            if old_price and new_price and old_price != new_price:
                change_pct = ((new_price - old_price) / old_price) * 100
                event = {
                    "product_id": product_id,
                    "old_price": old_price,
                    "new_price": new_price,
                    "change_pct": round(change_pct, 2),
                    "timestamp": time.time(),
                }
                # Publish change event
                self.redis.publish("price_changes", json.dumps(event))
                return event
        # Update last known price
        self.redis.set(key, json.dumps({
            "price": new_price,
            "timestamp": time.time(),
        }))
        return None

4 - الأحداث

وتُنشر تغييرات الأسعار في قناة ريديس بوب/الخط الفرعي (أو موضوع كافكا للنظم الأكبر حجما). ويشترك في هذه الأحداث المستهلكون في المناطق السفلية - خدمات الإنذار، ومحركات إعادة ترتيب المحركات، ولوحات المتابعة - ويتصرفون بصورة مستقلة.

import redis
import json
def subscribe_to_changes():
    """Subscribe to price change events."""
    r = redis.Redis(host="localhost", port=6379)
    pubsub = r.pubsub()
    pubsub.subscribe("price_changes")
    for message in pubsub.listen():
        if message["type"] == "message":
            event = json.loads(message["data"])
            handle_price_change(event)
def handle_price_change(event):
    """Process a price change event."""
    change = event["change_pct"]
    product = event["product_id"]
    if change < -10:
        send_urgent_alert(event)  # Major price drop
    elif change < -5:
        send_alert(event)         # Moderate drop
    elif change > 10:
        send_alert(event)         # Significant increase
    # Always log to time-series database
    store_price_change(event)

5 - لوح وتنبيه

وتحتاج البيانات في الوقت الحقيقي إلى تصور في الوقت الحقيقي. استعملي وصلات (ويب سوكيت) لضغط تحديثات الأسعار على اللوحات حالاً

Node.js Implementation

A Node.js version of the real-time monitoring motor using العميل (س.د.ك).

const axios = require("axios");
const { HttpsProxyAgent } = require("https-proxy-agent");
const Redis = require("ioredis");
const cheerio = require("cheerio");
const PROXY_URL = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080";
const redis = new Redis();
class RealTimePriceMonitor {
  constructor(concurrency = 10) {
    this.concurrency = concurrency;
    this.running = false;
    this.agent = new HttpsProxyAgent(PROXY_URL);
  }
  async fetchPrice(product) {
    try {
      const { data } = await axios.get(product.url, {
        httpsAgent: new HttpsProxyAgent(PROXY_URL),
        headers: {
          "User-Agent":
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/124.0.0.0 Safari/537.36",
          "Accept-Language": "en-US,en;q=0.9",
        },
        timeout: 30000,
      });
      const $ = cheerio.load(data);
      const priceText = $("span.a-price-whole").first().text().trim();
      const price = parseFloat(priceText.replace(/,/g, "")) || null;
      return { productId: product.productId, price, timestamp: Date.now(), status: "success" };
    } catch (err) {
      return { productId: product.productId, price: null, timestamp: Date.now(), status: "failed" };
    }
  }
  async checkChange(productId, newPrice) {
    const key = `last_price:${productId}`;
    const lastData = await redis.get(key);
    if (lastData) {
      const last = JSON.parse(lastData);
      if (last.price && newPrice && last.price !== newPrice) {
        const changePct = ((newPrice - last.price) / last.price) * 100;
        const event = {
          productId,
          oldPrice: last.price,
          newPrice,
          changePct: Math.round(changePct * 100) / 100,
          timestamp: Date.now(),
        };
        await redis.publish("price_changes", JSON.stringify(event));
        return event;
      }
    }
    await redis.set(key, JSON.stringify({ price: newPrice, timestamp: Date.now() }));
    return null;
  }
  async processProduct(product) {
    const result = await this.fetchPrice(product);
    if (result.price) {
      const change = await this.checkChange(result.productId, result.price);
      if (change) {
        console.log(
          `Price change: ${change.productId} $${change.oldPrice} -> $${change.newPrice} (${change.changePct}%)`
        );
      }
    }
    // Random delay
    await new Promise((r) => setTimeout(r, 500 + Math.random() * 1500));
  }
  async start() {
    this.running = true;
    console.log(`Starting monitor with ${this.concurrency} workers`);
    while (this.running) {
      const batch = await this.getNextBatch(this.concurrency);
      if (batch.length === 0) {
        await new Promise((r) => setTimeout(r, 1000));
        continue;
      }
      await Promise.all(batch.map((p) => this.processProduct(p)));
    }
  }
  async getNextBatch(size) {
    const now = Date.now() / 1000;
    const items = await redis.zrangebyscore("price_queue", 0, now, "LIMIT", 0, size);
    const products = [];
    for (const item of items) {
      const data = JSON.parse(item);
      await redis.zadd("price_queue", now + data.interval, item);
      products.push(data);
    }
    return products;
  }
}
const monitor = new RealTimePriceMonitor(10);
monitor.start();

Proxy Management for Continuous Monitoring

الرصد في الوقت الحقيقي يضع مطالب فريدة على البنية التحتية العميلة الخاصة بك مقارنة بخردة الصيد

طلب ثابت من الدولة

على عكس الخردة التي ترسل طلقات من الطلبات، الرصد في الوقت الحقيقي يخلق تدفقا ثابتا. وهذا في الواقع أفضل بالنسبة للصحة البديلة - فالتدفق المط َّرد من ٥ إلى ١٠ طلبات في الثانية يبدو أكثر طبيعية من ٠٠٠ ١ طلب في فجر دقيقتين.

ProxyHat Configuration for Real-Time

# Per-request rotation (default, recommended for most checks)
http://USERNAME:PASSWORD@gate.proxyhat.com:8080
# Geo-targeted for marketplace-specific monitoring
http://USERNAME-country-US:PASSWORD@gate.proxyhat.com:8080
http://USERNAME-country-DE:PASSWORD@gate.proxyhat.com:8080
# SOCKS5 for lower-level protocol control
socks5://USERNAME:PASSWORD@gate.proxyhat.com:1080

IP Health Monitoring

تتبع معدلات النجاح لكل موقع مستهدف وتعديل نهجك ديناميكياً وإذا انخفضت معدلات النجاح في سوق معينة، تزيد من التأخيرات أو التحول إلى مجمع جغرافي مستهدف مختلف. مسبح (بروكس هات) السكني الكبير يُضمن أنّك دائماً لديك شركاء جدد تحقق من المواقع العميلة للتغطية الكاملة

الوجبات الرئيسية: يتطلب الرصد في الوقت الحقيقي استراتيجية دائمة ومستدامة. The goal is consistent low-volume requests across many IPs, not high-volume blasts from few IPs.

تخزين البيانات المتعلقة بالوقت الحقيقي

وتحتاج بيانات الأسعار في الوقت الحقيقي إلى حل للتخزين على النحو الأمثل من أجل إدخالات عالية التردد واستفسارات في المدى الزمني.

الجدول الزمني

-- TimescaleDB hypertable for price data
CREATE TABLE price_ticks (
    time        TIMESTAMPTZ NOT NULL,
    product_id  TEXT NOT NULL,
    price       DECIMAL(10,2),
    currency    VARCHAR(3) DEFAULT 'USD',
    source_url  TEXT,
    status      VARCHAR(20)
);
SELECT create_hypertable('price_ticks', 'time');
-- Continuous aggregate for hourly summaries
CREATE MATERIALIZED VIEW price_hourly
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 hour', time) AS hour,
    product_id,
    AVG(price) AS avg_price,
    MIN(price) AS min_price,
    MAX(price) AS max_price,
    COUNT(*) AS check_count
FROM price_ticks
WHERE status = 'success'
GROUP BY hour, product_id;
-- Retention policy: keep raw ticks for 30 days
SELECT add_retention_policy('price_ticks', INTERVAL '30 days');
-- Keep hourly aggregates for 1 year
SELECT add_retention_policy('price_hourly', INTERVAL '365 days');

اعتبارات التصعيد

  • رفع مستوى العمال الأفقيين: أضف العمال عبر آلات متعددة، كلّ من يسحب من نفس قضيّة (ريديس). ولا حاجة إلى التنسيق - فالسؤال يتناول التوزيع.
  • الخنق على أساس الأولوية: وعندما تكون الميزانية البديلة محدودة، تخفض تلقائيا تواتر التحقق بالنسبة للمنتجات ذات الأولوية المنخفضة مع الحفاظ على التغطية في الوقت الحقيقي للبنود الحرجة.
  • فترات التكيف: إذا كان سعر المنتج مستقرا لمدة 24 ساعة، زيادة فترة الشيك. إذا تغيرت مرتين خلال ساعة، قلها
  • الاتّفاق على الموقع: مختلف المواقع المستهدفة لديها تسامحات مختلفة تشغيل المزيد من العمال المتزامنين من أجل التمحيص (أكثر جسامة) وأقل من ذلك في حالة الأمازون (الكشف الأكثر عدوانية).

من أجل المزيد من الاستراتيجيات البديلة التي تدعم الرصد العالي التردد، نستكشف دليلنا بشأن أفضل محترفين لخردة الإنترنت و خطط تسعير (بروكسي ها) لاستعمالات عالية الحجم

المداخل الرئيسية

  • ويقلل الرصد في الوقت الحقيقي من الكشف عن تغير الأسعار من ساعات إلى دقائق، وهو أمر حاسم بالنسبة للتكرار والاستجابة التنافسية.
  • Use a priority queue to focus resources on high-value products while still covering the long tail.
  • وهناك تجمع عمالي له صلات متزامنة مع العملاء يقدم منافذ دون أنماط انفجار.
  • تغيير ضوضاء أجهزة الكشف - فقط عملية وتنبيه بشأن التغيرات الفعلية في الأسعار.
  • تخزين البيانات الأولية في قاعدة بيانات للسلاسل الزمنية (TimescaleDB) مع سياسات استبقاء التكاليف.
  • ومن الأمور الأساسية للرصد المستمر وجود شركات تجارية مقيمة ذات تناوب ثابت في الولايات. ابدأ ProxyHat من أجل الوصول الموثوق

بناء الهياكل الأساسية في الوقت الحقيقي؟ اقرأ لنا دليل خردة التجارة الإلكترونية من أجل الإستراتيجية الكاملة وتفقد دليلنا (بإستعمال المحترفين في (بيثون و Node.js لتفاصيل التنفيذ.

¿Listo para empezar?

Accede a más de 50M de IPs residenciales en más de 148 países con filtrado impulsado por IA.

Ver preciosProxies residenciales
← Volver al Blog