Echtzeit vs Batch Preisüberwachung
Die meisten Preisüberwachungssysteme funktionieren im Batch-Modus: Überprüfen Sie alle Produkte jede Stunde (oder alle paar Stunden), speichern Sie die Ergebnisse und senden Sie Alarme auf Änderungen. Dies funktioniert für viele Anwendungsfälle, aber in schnelllebigen Märkten — Flash-Verkäufe, dynamische Preise, Marktplatz Wettbewerb — Batch-Monitoring verpasst kritische Preisänderungen, die zwischen Kontrollen passieren.
Die Echtzeit-Preisüberwachung reduziert den Nachweisstand von Stunden auf Minuten oder sogar Sekunden. Anstatt jedes Produkt auf einem festgelegten Zeitplan zu überprüfen, überwacht ein Echtzeit-System kontinuierlich hohe Prioritätsziele und reagiert auf Änderungen, wie sie geschehen. Dieser Leitfaden umfasst die Architektur, die Proxy-Infrastruktur und die notwendigen Implementierungsdetails, um ein Echtzeit-Überwachungssystem aufzubauen. Für grundlegende Preisüberwachungskonzepte siehe unser Leitfaden zu die Konkurrenzpreise automatisch überwachen.
| Aspekte | Batch Monitoring | Echtzeitüberwachung |
|---|---|---|
| Prüffrequenz | Alle 1-24 Stunden | Alle 1-5 Minuten für Prioritätsgegenstände |
| Detektive Verzögerung | Bis zu einem vollen Intervall | Unter 5 Minuten |
| Verwendung von Proxy | Konzentrierte Bursts | Steady, verteilter Strom |
| Infrastruktur | Einfache Cron Jobs | Eventgetrieben mit Worker Pools |
| Kosten | Tief | Höher (mehr Anfragen, mehr Proxies) |
| Das Beste für | Tagesberichte, Trendanalyse | Wiederholen, Flash-Verkauf Erkennung, wettbewerbsfähiges Bidding |
Architektur für Echtzeitüberwachung
Ein Echtzeit-Preisüberwachungssystem verfügt über fünf Kernkomponenten, die als kontinuierliche Pipeline zusammenarbeiten.
1. Prioritätsliste
Produkte werden Prioritätsstufen zugeordnet, die die Prüffrequenz bestimmen. Eine Prioritätswarteschlange (Redis Sorted Sets funktionieren gut) sorgt dafür, dass hochwertige Produkte immer zuerst überprüft werden.
import redis
import time
import json
r = redis.Redis(host="localhost", port=6379)
def add_product(product_id, url, priority_minutes):
"""Add a product to the monitoring queue."""
next_check = time.time() # Check immediately on first add
r.zadd("price_queue", {json.dumps({
"product_id": product_id,
"url": url,
"interval": priority_minutes * 60,
}): next_check})
def get_next_batch(batch_size=10):
"""Get the next batch of products due for checking."""
now = time.time()
items = r.zrangebyscore("price_queue", 0, now, start=0, num=batch_size)
products = []
for item in items:
data = json.loads(item)
r.zadd("price_queue", {item: now + data["interval"]})
products.append(data)
return products
# Example: Add products with different priorities
add_product("SKU001", "https://www.amazon.com/dp/B0CHX3QBCH", priority_minutes=2)
add_product("SKU002", "https://www.amazon.com/dp/B0D5BKRY4R", priority_minutes=5)
add_product("SKU003", "https://www.amazon.com/dp/B0CRMZHDG7", priority_minutes=15)2. Worker Pool
Mehrere Worker-Prozesse ziehen von der Prioritätswarte, holen Preise durch Proxies, und drücken Sie Ergebnisse auf die Datenpipeline. Arbeiter arbeiten unabhängig, jede mit einer eigenen Proxyverbindung.
import requests
import random
from concurrent.futures import ThreadPoolExecutor
from bs4 import BeautifulSoup
PROXY_URL = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
USER_AGENTS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/124.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 Chrome/124.0.0.0 Safari/537.36",
]
def fetch_price(product):
"""Fetch the current price for a product."""
headers = {
"User-Agent": random.choice(USER_AGENTS),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.9",
}
proxies = {"http": PROXY_URL, "https": PROXY_URL}
try:
response = requests.get(
product["url"], headers=headers,
proxies=proxies, timeout=30
)
if response.status_code == 200:
soup = BeautifulSoup(response.text, "html.parser")
price_el = soup.select_one("span.a-price-whole")
if price_el:
price = float(price_el.get_text(strip=True).replace(",", ""))
return {
"product_id": product["product_id"],
"price": price,
"timestamp": time.time(),
"status": "success",
}
except Exception as e:
pass
return {
"product_id": product["product_id"],
"price": None,
"timestamp": time.time(),
"status": "failed",
}
def run_workers(num_workers=10):
"""Run the monitoring worker pool."""
with ThreadPoolExecutor(max_workers=num_workers) as executor:
while True:
batch = get_next_batch(batch_size=num_workers)
if not batch:
time.sleep(1)
continue
futures = [executor.submit(fetch_price, product) for product in batch]
for future in futures:
result = future.result()
process_result(result)
time.sleep(random.uniform(0.5, 2))3. Detection Engine ändern
Anstatt jede Preisüberprüfung zu speichern, vergleicht die Änderungserkennungsmaschine aktuelle Preise gegen die letzten bekannten Werte und löst nur Ereignisse auf tatsächlichen Veränderungen aus.
class ChangeDetector:
def __init__(self, redis_client):
self.redis = redis_client
def check_change(self, product_id, new_price):
"""Compare new price against last known and detect changes."""
key = f"last_price:{product_id}"
last_data = self.redis.get(key)
if last_data:
last = json.loads(last_data)
old_price = last["price"]
if old_price and new_price and old_price != new_price:
change_pct = ((new_price - old_price) / old_price) * 100
event = {
"product_id": product_id,
"old_price": old_price,
"new_price": new_price,
"change_pct": round(change_pct, 2),
"timestamp": time.time(),
}
# Publish change event
self.redis.publish("price_changes", json.dumps(event))
return event
# Update last known price
self.redis.set(key, json.dumps({
"price": new_price,
"timestamp": time.time(),
}))
return None4. Event Stream
Preisänderungen werden in einem Redis Pub/Sub Kanal (oder Kafka Thema für größere Systeme) veröffentlicht. Stromabnehmer — Warndienste, Repricing Motoren, Dashboards — abonnieren diese Ereignisse und reagieren unabhängig voneinander.
import redis
import json
def subscribe_to_changes():
"""Subscribe to price change events."""
r = redis.Redis(host="localhost", port=6379)
pubsub = r.pubsub()
pubsub.subscribe("price_changes")
for message in pubsub.listen():
if message["type"] == "message":
event = json.loads(message["data"])
handle_price_change(event)
def handle_price_change(event):
"""Process a price change event."""
change = event["change_pct"]
product = event["product_id"]
if change < -10:
send_urgent_alert(event) # Major price drop
elif change < -5:
send_alert(event) # Moderate drop
elif change > 10:
send_alert(event) # Significant increase
# Always log to time-series database
store_price_change(event)5. Dashboard und Alarme
Echtzeitdaten benötigen Echtzeit-Visualisierung. Verwenden Sie WebSocket-Verbindungen, um Preis-Updates auf Dashboards sofort zu verschieben.
Node.js Implementierung
Eine Node.js-Version der Echtzeit-Überwachungsmaschine mit ProxyHat's Node SDK.
const axios = require("axios");
const { HttpsProxyAgent } = require("https-proxy-agent");
const Redis = require("ioredis");
const cheerio = require("cheerio");
const PROXY_URL = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080";
const redis = new Redis();
class RealTimePriceMonitor {
constructor(concurrency = 10) {
this.concurrency = concurrency;
this.running = false;
this.agent = new HttpsProxyAgent(PROXY_URL);
}
async fetchPrice(product) {
try {
const { data } = await axios.get(product.url, {
httpsAgent: new HttpsProxyAgent(PROXY_URL),
headers: {
"User-Agent":
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/124.0.0.0 Safari/537.36",
"Accept-Language": "en-US,en;q=0.9",
},
timeout: 30000,
});
const $ = cheerio.load(data);
const priceText = $("span.a-price-whole").first().text().trim();
const price = parseFloat(priceText.replace(/,/g, "")) || null;
return { productId: product.productId, price, timestamp: Date.now(), status: "success" };
} catch (err) {
return { productId: product.productId, price: null, timestamp: Date.now(), status: "failed" };
}
}
async checkChange(productId, newPrice) {
const key = `last_price:${productId}`;
const lastData = await redis.get(key);
if (lastData) {
const last = JSON.parse(lastData);
if (last.price && newPrice && last.price !== newPrice) {
const changePct = ((newPrice - last.price) / last.price) * 100;
const event = {
productId,
oldPrice: last.price,
newPrice,
changePct: Math.round(changePct * 100) / 100,
timestamp: Date.now(),
};
await redis.publish("price_changes", JSON.stringify(event));
return event;
}
}
await redis.set(key, JSON.stringify({ price: newPrice, timestamp: Date.now() }));
return null;
}
async processProduct(product) {
const result = await this.fetchPrice(product);
if (result.price) {
const change = await this.checkChange(result.productId, result.price);
if (change) {
console.log(
`Price change: ${change.productId} $${change.oldPrice} -> $${change.newPrice} (${change.changePct}%)`
);
}
}
// Random delay
await new Promise((r) => setTimeout(r, 500 + Math.random() * 1500));
}
async start() {
this.running = true;
console.log(`Starting monitor with ${this.concurrency} workers`);
while (this.running) {
const batch = await this.getNextBatch(this.concurrency);
if (batch.length === 0) {
await new Promise((r) => setTimeout(r, 1000));
continue;
}
await Promise.all(batch.map((p) => this.processProduct(p)));
}
}
async getNextBatch(size) {
const now = Date.now() / 1000;
const items = await redis.zrangebyscore("price_queue", 0, now, "LIMIT", 0, size);
const products = [];
for (const item of items) {
const data = JSON.parse(item);
await redis.zadd("price_queue", now + data.interval, item);
products.push(data);
}
return products;
}
}
const monitor = new RealTimePriceMonitor(10);
monitor.start();Proxy Management für kontinuierliche Überwachung
Echtzeit-Überwachung stellt einzigartige Anforderungen an Ihre Proxy-Infrastruktur im Vergleich zu Batch-Schrott.
Steady-State Anfrage Muster
Im Gegensatz zum Batch-Schrott, der Bursts von Anfragen sendet, erzeugt Echtzeit-Überwachung einen konstanten Stream. Dies ist tatsächlich besser für die Proxy-Gesundheit — ein stetiger Fluss von 5-10 Anfragen pro Sekunde sieht natürlicher als 1.000 Anfragen in einem 2-minütigen Burst.
ProxyHat Konfiguration für Echtzeit
# Per-request rotation (default, recommended for most checks)
http://USERNAME:PASSWORD@gate.proxyhat.com:8080
# Geo-targeted for marketplace-specific monitoring
http://USERNAME-country-US:PASSWORD@gate.proxyhat.com:8080
http://USERNAME-country-DE:PASSWORD@gate.proxyhat.com:8080
# SOCKS5 for lower-level protocol control
socks5://USERNAME:PASSWORD@gate.proxyhat.com:1080IP Gesundheitsüberwachung
Verfolgen Sie Erfolgsquoten pro Zielseite und passen Sie Ihren Ansatz dynamisch an. Wenn die Erfolgsquoten auf einem bestimmten Marktplatz sinken, Verzögerungen erhöhen oder auf einen anderen geo-targeted Pool wechseln. Der große Wohnpool von ProxyHat sorgt dafür, dass Sie immer frische IPs zur Verfügung haben. Überprüfen Sie unsere Proxy-Standorte für volle Deckung.
Schlüsselübernahme: Echtzeitüberwachung erfordert eine stetige, nachhaltige Proxystrategie. Das Ziel ist konsequente Low-Volume-Anfragen in vielen IPs, nicht hochvolumige Bursts aus wenigen IPs.
Datenspeicher für Echtzeitdaten
Echtzeit-Preisdaten benötigen eine für Hochfrequenzeinsätze und Zeitbereichsabfragen optimierte Speicherlösung.
TimescaleDB Schema
-- TimescaleDB hypertable for price data
CREATE TABLE price_ticks (
time TIMESTAMPTZ NOT NULL,
product_id TEXT NOT NULL,
price DECIMAL(10,2),
currency VARCHAR(3) DEFAULT 'USD',
source_url TEXT,
status VARCHAR(20)
);
SELECT create_hypertable('price_ticks', 'time');
-- Continuous aggregate for hourly summaries
CREATE MATERIALIZED VIEW price_hourly
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 hour', time) AS hour,
product_id,
AVG(price) AS avg_price,
MIN(price) AS min_price,
MAX(price) AS max_price,
COUNT(*) AS check_count
FROM price_ticks
WHERE status = 'success'
GROUP BY hour, product_id;
-- Retention policy: keep raw ticks for 30 days
SELECT add_retention_policy('price_ticks', INTERVAL '30 days');
-- Keep hourly aggregates for 1 year
SELECT add_retention_policy('price_hourly', INTERVAL '365 days');Skalierung von Erwägungen
- Horizontale Arbeiterskalierung: Fügen Sie Arbeiter über mehrere Maschinen, jede ziehen von der gleichen Redis Warteschlange. Keine Koordinierung erforderlich — die Warteschlangenverteilung.
- Vorrangige Drosselung: Wenn das Proxybudget begrenzt ist, verringern Sie automatisch die Kontrollfrequenz für Produkte mit niedriger Priorität, während die Echtzeitabdeckung für kritische Elemente beibehalten wird.
- Anpassungsintervalle: Wenn der Preis eines Produkts für 24 Stunden stabil war, erhöhen Sie das Kontrollintervall. Wenn es sich zweimal in einer Stunde ändert, verringern Sie es.
- Site-spezifische Koncurrenz: Verschiedene Zielorte haben unterschiedliche Toleranzen. Führen Sie mehr gleichzeitige Arbeiter für Shopify (mehr überzeugend) und weniger für Amazon ( aggressivere Erkennung).
Erfahren Sie mehr über Proxystrategien, die die Hochfrequenzüberwachung unterstützen. Best-Proxis für Web-Schrott und Preise von ProxyHat für den hochvolumigen Einsatz.
Schlüsselanhänger
- Echtzeit-Überwachung reduziert die Preisänderungserkennung von Stunden zu Minuten, kritisch für die Nachbesserung und wettbewerbsfähige Reaktion.
- Verwenden Sie eine Prioritätswarte, um Ressourcen auf hochwertige Produkte zu fokussieren, während Sie noch den langen Schwanz abdecken.
- Ein Worker Pool mit gleichzeitigen Proxyverbindungen bietet Durchsatz ohne Burstmuster.
- Änderung der Erkennungsmotoren Filtergeräusche — nur Prozess und Alarm auf tatsächliche Preisänderungen.
- Speichern Sie Rohdaten in einer Zeitreihen-Datenbank (TimescaleDB) mit Retentionsrichtlinien für das Kostenmanagement.
- Für die kontinuierliche Überwachung sind gebietsbezogene Proxie mit stationärer Rotation unerlässlich. Beginnen Sie mit ProxyHat für einen zuverlässigen Zugang.
Gebäude Echtzeit-Infrastruktur? Lesen Sie unsere E-Commerce-Schrottführer für die volle Strategie und überprüfen Sie unsere Anleitungen auf mit Proxies in Python und Node.js für Implementierungsdetails.






