Real-Time vs Batch Price Monitoring
وتعمل معظم نظم رصد الأسعار في شكل دفعات: التحقق من جميع المنتجات كل ساعة (أو كل بضع ساعات)، وتخزين النتائج، وإرسال إنذارات بشأن التغييرات. وهذا يعمل في كثير من حالات الاستخدام، ولكن في الأسواق السريعة الحركة - المبيعات الوميضية، والتسعير الدينامي، والمنافسة في السوق - يفتقد رصد الدفعات التغيرات الحرجة في الأسعار التي تحدث بين الشيكات.
ويقلل رصد الأسعار في الوقت الحقيقي من خط الكشف من ساعات إلى دقائق أو حتى ثواني. وبدلاً من التحقق من كل منتج في جدول زمني ثابت، يرصد نظام في الوقت الحقيقي باستمرار الأهداف ذات الأولوية العالية ويستجيب للتغييرات التي تحدث. ويشمل هذا الدليل الهيكل والهياكل الأساسية البديلة وتفاصيل التنفيذ اللازمة لبناء نظام رصد آني. لمفاهيم رصد الأسعار الأساسية، انظر دليلنا رصد أسعار المنافسين آليا.
| Aspect | مراقبة الصيد | رصد الوقت الحقيقي |
|---|---|---|
| التردد | كل ساعة | كل 1-5 دقيقة للبنود ذات الأولوية |
| رمز | حتى فترة واحدة كاملة | أقل من 5 دقائق |
| استخدام الوكيل | انفجارات مركزة | ثابت، موزع |
| الهياكل الأساسية | الوظائف البسيطة | الحدث المدفوع بمجمعات العمال |
| التكلفة | منخفض | أعلى (طلبات إضافية، المزيد من العملاء) |
| الأفضل | التقارير اليومية، تحليل الاتجاهات | التعبئة، والكشف عن المبيعات، والعطاءات التنافسية |
هيكل لرصد الزمن الحقيقي
ويوجد في نظام رصد الأسعار في الوقت الحقيقي خمسة عناصر أساسية تعمل معا كخط أنابيب مستمر.
1- الأولوية
وتُمنح المنتجات ذات الأولوية التي تحدد تواتر التحقق. A priority queue (Redis Sorted Sets work well) ensures high-value products are always check first.
import redis
import time
import json
r = redis.Redis(host="localhost", port=6379)
def add_product(product_id, url, priority_minutes):
"""Add a product to the monitoring queue."""
next_check = time.time() # Check immediately on first add
r.zadd("price_queue", {json.dumps({
"product_id": product_id,
"url": url,
"interval": priority_minutes * 60,
}): next_check})
def get_next_batch(batch_size=10):
"""Get the next batch of products due for checking."""
now = time.time()
items = r.zrangebyscore("price_queue", 0, now, start=0, num=batch_size)
products = []
for item in items:
data = json.loads(item)
r.zadd("price_queue", {item: now + data["interval"]})
products.append(data)
return products
# Example: Add products with different priorities
add_product("SKU001", "https://www.amazon.com/dp/B0CHX3QBCH", priority_minutes=2)
add_product("SKU002", "https://www.amazon.com/dp/B0D5BKRY4R", priority_minutes=5)
add_product("SKU003", "https://www.amazon.com/dp/B0CRMZHDG7", priority_minutes=15)2. Worker Pool
وتنسحب عمليات العمال المتعددة من التساؤلات ذات الأولوية، وتجلب الأسعار من خلال الشركات، وتدفع النتائج إلى خط البيانات. ويعمل العمال بصورة مستقلة، وكل واحد منهم له صلة مباشرة.
import requests
import random
from concurrent.futures import ThreadPoolExecutor
from bs4 import BeautifulSoup
PROXY_URL = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
USER_AGENTS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/124.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 Chrome/124.0.0.0 Safari/537.36",
]
def fetch_price(product):
"""Fetch the current price for a product."""
headers = {
"User-Agent": random.choice(USER_AGENTS),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.9",
}
proxies = {"http": PROXY_URL, "https": PROXY_URL}
try:
response = requests.get(
product["url"], headers=headers,
proxies=proxies, timeout=30
)
if response.status_code == 200:
soup = BeautifulSoup(response.text, "html.parser")
price_el = soup.select_one("span.a-price-whole")
if price_el:
price = float(price_el.get_text(strip=True).replace(",", ""))
return {
"product_id": product["product_id"],
"price": price,
"timestamp": time.time(),
"status": "success",
}
except Exception as e:
pass
return {
"product_id": product["product_id"],
"price": None,
"timestamp": time.time(),
"status": "failed",
}
def run_workers(num_workers=10):
"""Run the monitoring worker pool."""
with ThreadPoolExecutor(max_workers=num_workers) as executor:
while True:
batch = get_next_batch(batch_size=num_workers)
if not batch:
time.sleep(1)
continue
futures = [executor.submit(fetch_price, product) for product in batch]
for future in futures:
result = future.result()
process_result(result)
time.sleep(random.uniform(0.5, 2))3 - مهندس كشف البيانات
وبدلا من تخزين كل شيك للأسعار، يقارن محرك الكشف عن التغيير الأسعار الحالية بالقيم المعروفة الأخيرة، ولا يؤدي إلا إلى حدوث تغييرات فعلية.
class ChangeDetector:
def __init__(self, redis_client):
self.redis = redis_client
def check_change(self, product_id, new_price):
"""Compare new price against last known and detect changes."""
key = f"last_price:{product_id}"
last_data = self.redis.get(key)
if last_data:
last = json.loads(last_data)
old_price = last["price"]
if old_price and new_price and old_price != new_price:
change_pct = ((new_price - old_price) / old_price) * 100
event = {
"product_id": product_id,
"old_price": old_price,
"new_price": new_price,
"change_pct": round(change_pct, 2),
"timestamp": time.time(),
}
# Publish change event
self.redis.publish("price_changes", json.dumps(event))
return event
# Update last known price
self.redis.set(key, json.dumps({
"price": new_price,
"timestamp": time.time(),
}))
return None4 - الأحداث
وتُنشر تغييرات الأسعار في قناة ريديس بوب/الخط الفرعي (أو موضوع كافكا للنظم الأكبر حجما). ويشترك في هذه الأحداث المستهلكون في المناطق السفلية - خدمات الإنذار، ومحركات إعادة ترتيب المحركات، ولوحات المتابعة - ويتصرفون بصورة مستقلة.
import redis
import json
def subscribe_to_changes():
"""Subscribe to price change events."""
r = redis.Redis(host="localhost", port=6379)
pubsub = r.pubsub()
pubsub.subscribe("price_changes")
for message in pubsub.listen():
if message["type"] == "message":
event = json.loads(message["data"])
handle_price_change(event)
def handle_price_change(event):
"""Process a price change event."""
change = event["change_pct"]
product = event["product_id"]
if change < -10:
send_urgent_alert(event) # Major price drop
elif change < -5:
send_alert(event) # Moderate drop
elif change > 10:
send_alert(event) # Significant increase
# Always log to time-series database
store_price_change(event)5 - لوح وتنبيه
وتحتاج البيانات في الوقت الحقيقي إلى تصور في الوقت الحقيقي. استعملي وصلات (ويب سوكيت) لضغط تحديثات الأسعار على اللوحات حالاً
Node.js Implementation
A Node.js version of the real-time monitoring motor using العميل (س.د.ك).
const axios = require("axios");
const { HttpsProxyAgent } = require("https-proxy-agent");
const Redis = require("ioredis");
const cheerio = require("cheerio");
const PROXY_URL = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080";
const redis = new Redis();
class RealTimePriceMonitor {
constructor(concurrency = 10) {
this.concurrency = concurrency;
this.running = false;
this.agent = new HttpsProxyAgent(PROXY_URL);
}
async fetchPrice(product) {
try {
const { data } = await axios.get(product.url, {
httpsAgent: new HttpsProxyAgent(PROXY_URL),
headers: {
"User-Agent":
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/124.0.0.0 Safari/537.36",
"Accept-Language": "en-US,en;q=0.9",
},
timeout: 30000,
});
const $ = cheerio.load(data);
const priceText = $("span.a-price-whole").first().text().trim();
const price = parseFloat(priceText.replace(/,/g, "")) || null;
return { productId: product.productId, price, timestamp: Date.now(), status: "success" };
} catch (err) {
return { productId: product.productId, price: null, timestamp: Date.now(), status: "failed" };
}
}
async checkChange(productId, newPrice) {
const key = `last_price:${productId}`;
const lastData = await redis.get(key);
if (lastData) {
const last = JSON.parse(lastData);
if (last.price && newPrice && last.price !== newPrice) {
const changePct = ((newPrice - last.price) / last.price) * 100;
const event = {
productId,
oldPrice: last.price,
newPrice,
changePct: Math.round(changePct * 100) / 100,
timestamp: Date.now(),
};
await redis.publish("price_changes", JSON.stringify(event));
return event;
}
}
await redis.set(key, JSON.stringify({ price: newPrice, timestamp: Date.now() }));
return null;
}
async processProduct(product) {
const result = await this.fetchPrice(product);
if (result.price) {
const change = await this.checkChange(result.productId, result.price);
if (change) {
console.log(
`Price change: ${change.productId} $${change.oldPrice} -> $${change.newPrice} (${change.changePct}%)`
);
}
}
// Random delay
await new Promise((r) => setTimeout(r, 500 + Math.random() * 1500));
}
async start() {
this.running = true;
console.log(`Starting monitor with ${this.concurrency} workers`);
while (this.running) {
const batch = await this.getNextBatch(this.concurrency);
if (batch.length === 0) {
await new Promise((r) => setTimeout(r, 1000));
continue;
}
await Promise.all(batch.map((p) => this.processProduct(p)));
}
}
async getNextBatch(size) {
const now = Date.now() / 1000;
const items = await redis.zrangebyscore("price_queue", 0, now, "LIMIT", 0, size);
const products = [];
for (const item of items) {
const data = JSON.parse(item);
await redis.zadd("price_queue", now + data.interval, item);
products.push(data);
}
return products;
}
}
const monitor = new RealTimePriceMonitor(10);
monitor.start();Proxy Management for Continuous Monitoring
الرصد في الوقت الحقيقي يضع مطالب فريدة على البنية التحتية العميلة الخاصة بك مقارنة بخردة الصيد
طلب ثابت من الدولة
على عكس الخردة التي ترسل طلقات من الطلبات، الرصد في الوقت الحقيقي يخلق تدفقا ثابتا. وهذا في الواقع أفضل بالنسبة للصحة البديلة - فالتدفق المط َّرد من ٥ إلى ١٠ طلبات في الثانية يبدو أكثر طبيعية من ٠٠٠ ١ طلب في فجر دقيقتين.
ProxyHat Configuration for Real-Time
# Per-request rotation (default, recommended for most checks)
http://USERNAME:PASSWORD@gate.proxyhat.com:8080
# Geo-targeted for marketplace-specific monitoring
http://USERNAME-country-US:PASSWORD@gate.proxyhat.com:8080
http://USERNAME-country-DE:PASSWORD@gate.proxyhat.com:8080
# SOCKS5 for lower-level protocol control
socks5://USERNAME:PASSWORD@gate.proxyhat.com:1080IP Health Monitoring
تتبع معدلات النجاح لكل موقع مستهدف وتعديل نهجك ديناميكياً وإذا انخفضت معدلات النجاح في سوق معينة، تزيد من التأخيرات أو التحول إلى مجمع جغرافي مستهدف مختلف. مسبح (بروكس هات) السكني الكبير يُضمن أنّك دائماً لديك شركاء جدد تحقق من المواقع العميلة للتغطية الكاملة
الوجبات الرئيسية: يتطلب الرصد في الوقت الحقيقي استراتيجية دائمة ومستدامة. The goal is consistent low-volume requests across many IPs, not high-volume blasts from few IPs.
تخزين البيانات المتعلقة بالوقت الحقيقي
وتحتاج بيانات الأسعار في الوقت الحقيقي إلى حل للتخزين على النحو الأمثل من أجل إدخالات عالية التردد واستفسارات في المدى الزمني.
الجدول الزمني
-- TimescaleDB hypertable for price data
CREATE TABLE price_ticks (
time TIMESTAMPTZ NOT NULL,
product_id TEXT NOT NULL,
price DECIMAL(10,2),
currency VARCHAR(3) DEFAULT 'USD',
source_url TEXT,
status VARCHAR(20)
);
SELECT create_hypertable('price_ticks', 'time');
-- Continuous aggregate for hourly summaries
CREATE MATERIALIZED VIEW price_hourly
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 hour', time) AS hour,
product_id,
AVG(price) AS avg_price,
MIN(price) AS min_price,
MAX(price) AS max_price,
COUNT(*) AS check_count
FROM price_ticks
WHERE status = 'success'
GROUP BY hour, product_id;
-- Retention policy: keep raw ticks for 30 days
SELECT add_retention_policy('price_ticks', INTERVAL '30 days');
-- Keep hourly aggregates for 1 year
SELECT add_retention_policy('price_hourly', INTERVAL '365 days');اعتبارات التصعيد
- رفع مستوى العمال الأفقيين: أضف العمال عبر آلات متعددة، كلّ من يسحب من نفس قضيّة (ريديس). ولا حاجة إلى التنسيق - فالسؤال يتناول التوزيع.
- الخنق على أساس الأولوية: وعندما تكون الميزانية البديلة محدودة، تخفض تلقائيا تواتر التحقق بالنسبة للمنتجات ذات الأولوية المنخفضة مع الحفاظ على التغطية في الوقت الحقيقي للبنود الحرجة.
- فترات التكيف: إذا كان سعر المنتج مستقرا لمدة 24 ساعة، زيادة فترة الشيك. إذا تغيرت مرتين خلال ساعة، قلها
- الاتّفاق على الموقع: مختلف المواقع المستهدفة لديها تسامحات مختلفة تشغيل المزيد من العمال المتزامنين من أجل التمحيص (أكثر جسامة) وأقل من ذلك في حالة الأمازون (الكشف الأكثر عدوانية).
من أجل المزيد من الاستراتيجيات البديلة التي تدعم الرصد العالي التردد، نستكشف دليلنا بشأن أفضل محترفين لخردة الإنترنت و خطط تسعير (بروكسي ها) لاستعمالات عالية الحجم
المداخل الرئيسية
- ويقلل الرصد في الوقت الحقيقي من الكشف عن تغير الأسعار من ساعات إلى دقائق، وهو أمر حاسم بالنسبة للتكرار والاستجابة التنافسية.
- Use a priority queue to focus resources on high-value products while still covering the long tail.
- وهناك تجمع عمالي له صلات متزامنة مع العملاء يقدم منافذ دون أنماط انفجار.
- تغيير ضوضاء أجهزة الكشف - فقط عملية وتنبيه بشأن التغيرات الفعلية في الأسعار.
- تخزين البيانات الأولية في قاعدة بيانات للسلاسل الزمنية (TimescaleDB) مع سياسات استبقاء التكاليف.
- ومن الأمور الأساسية للرصد المستمر وجود شركات تجارية مقيمة ذات تناوب ثابت في الولايات. ابدأ ProxyHat من أجل الوصول الموثوق
بناء الهياكل الأساسية في الوقت الحقيقي؟ اقرأ لنا دليل خردة التجارة الإلكترونية من أجل الإستراتيجية الكاملة وتفقد دليلنا (بإستعمال المحترفين في (بيثون و Node.js لتفاصيل التنفيذ.






