Echtzeit-Preisüberwachungs-Infrastruktur aufbauen

Entwerfen und bauen Sie ein Echtzeit-Preis-Überwachungssystem mit Prioritäts-Warteschlangen, Worker Pools, Änderungserkennung und Wohn-Proxy-Rotation. Vollständige Python und Node.js Umsetzung Anleitung.

Echtzeit-Preisüberwachungs-Infrastruktur aufbauen

Echtzeit vs Batch Preisüberwachung

Die meisten Preisüberwachungssysteme funktionieren im Batch-Modus: Überprüfen Sie alle Produkte jede Stunde (oder alle paar Stunden), speichern Sie die Ergebnisse und senden Sie Alarme auf Änderungen. Dies funktioniert für viele Anwendungsfälle, aber in schnelllebigen Märkten — Flash-Verkäufe, dynamische Preise, Marktplatz Wettbewerb — Batch-Monitoring verpasst kritische Preisänderungen, die zwischen Kontrollen passieren.

Die Echtzeit-Preisüberwachung reduziert den Nachweisstand von Stunden auf Minuten oder sogar Sekunden. Anstatt jedes Produkt auf einem festgelegten Zeitplan zu überprüfen, überwacht ein Echtzeit-System kontinuierlich hohe Prioritätsziele und reagiert auf Änderungen, wie sie geschehen. Dieser Leitfaden umfasst die Architektur, die Proxy-Infrastruktur und die notwendigen Implementierungsdetails, um ein Echtzeit-Überwachungssystem aufzubauen. Für grundlegende Preisüberwachungskonzepte siehe unser Leitfaden zu die Konkurrenzpreise automatisch überwachen.

Echtzeit vs Batch Preisüberwachung
AspekteBatch MonitoringEchtzeitüberwachung
PrüffrequenzAlle 1-24 StundenAlle 1-5 Minuten für Prioritätsgegenstände
Detektive VerzögerungBis zu einem vollen IntervallUnter 5 Minuten
Verwendung von ProxyKonzentrierte BurstsSteady, verteilter Strom
InfrastrukturEinfache Cron JobsEventgetrieben mit Worker Pools
KostenTiefHöher (mehr Anfragen, mehr Proxies)
Das Beste fürTagesberichte, TrendanalyseWiederholen, Flash-Verkauf Erkennung, wettbewerbsfähiges Bidding

Architektur für Echtzeitüberwachung

Ein Echtzeit-Preisüberwachungssystem verfügt über fünf Kernkomponenten, die als kontinuierliche Pipeline zusammenarbeiten.

1. Prioritätsliste

Produkte werden Prioritätsstufen zugeordnet, die die Prüffrequenz bestimmen. Eine Prioritätswarteschlange (Redis Sorted Sets funktionieren gut) sorgt dafür, dass hochwertige Produkte immer zuerst überprüft werden.

import redis
import time
import json
r = redis.Redis(host="localhost", port=6379)
def add_product(product_id, url, priority_minutes):
    """Add a product to the monitoring queue."""
    next_check = time.time()  # Check immediately on first add
    r.zadd("price_queue", {json.dumps({
        "product_id": product_id,
        "url": url,
        "interval": priority_minutes * 60,
    }): next_check})
def get_next_batch(batch_size=10):
    """Get the next batch of products due for checking."""
    now = time.time()
    items = r.zrangebyscore("price_queue", 0, now, start=0, num=batch_size)
    products = []
    for item in items:
        data = json.loads(item)
        r.zadd("price_queue", {item: now + data["interval"]})
        products.append(data)
    return products
# Example: Add products with different priorities
add_product("SKU001", "https://www.amazon.com/dp/B0CHX3QBCH", priority_minutes=2)
add_product("SKU002", "https://www.amazon.com/dp/B0D5BKRY4R", priority_minutes=5)
add_product("SKU003", "https://www.amazon.com/dp/B0CRMZHDG7", priority_minutes=15)

2. Worker Pool

Mehrere Worker-Prozesse ziehen von der Prioritätswarte, holen Preise durch Proxies, und drücken Sie Ergebnisse auf die Datenpipeline. Arbeiter arbeiten unabhängig, jede mit einer eigenen Proxyverbindung.

import requests
import random
from concurrent.futures import ThreadPoolExecutor
from bs4 import BeautifulSoup
PROXY_URL = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/124.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 Chrome/124.0.0.0 Safari/537.36",
]
def fetch_price(product):
    """Fetch the current price for a product."""
    headers = {
        "User-Agent": random.choice(USER_AGENTS),
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.9",
    }
    proxies = {"http": PROXY_URL, "https": PROXY_URL}
    try:
        response = requests.get(
            product["url"], headers=headers,
            proxies=proxies, timeout=30
        )
        if response.status_code == 200:
            soup = BeautifulSoup(response.text, "html.parser")
            price_el = soup.select_one("span.a-price-whole")
            if price_el:
                price = float(price_el.get_text(strip=True).replace(",", ""))
                return {
                    "product_id": product["product_id"],
                    "price": price,
                    "timestamp": time.time(),
                    "status": "success",
                }
    except Exception as e:
        pass
    return {
        "product_id": product["product_id"],
        "price": None,
        "timestamp": time.time(),
        "status": "failed",
    }
def run_workers(num_workers=10):
    """Run the monitoring worker pool."""
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        while True:
            batch = get_next_batch(batch_size=num_workers)
            if not batch:
                time.sleep(1)
                continue
            futures = [executor.submit(fetch_price, product) for product in batch]
            for future in futures:
                result = future.result()
                process_result(result)
            time.sleep(random.uniform(0.5, 2))

3. Detection Engine ändern

Anstatt jede Preisüberprüfung zu speichern, vergleicht die Änderungserkennungsmaschine aktuelle Preise gegen die letzten bekannten Werte und löst nur Ereignisse auf tatsächlichen Veränderungen aus.

class ChangeDetector:
    def __init__(self, redis_client):
        self.redis = redis_client
    def check_change(self, product_id, new_price):
        """Compare new price against last known and detect changes."""
        key = f"last_price:{product_id}"
        last_data = self.redis.get(key)
        if last_data:
            last = json.loads(last_data)
            old_price = last["price"]
            if old_price and new_price and old_price != new_price:
                change_pct = ((new_price - old_price) / old_price) * 100
                event = {
                    "product_id": product_id,
                    "old_price": old_price,
                    "new_price": new_price,
                    "change_pct": round(change_pct, 2),
                    "timestamp": time.time(),
                }
                # Publish change event
                self.redis.publish("price_changes", json.dumps(event))
                return event
        # Update last known price
        self.redis.set(key, json.dumps({
            "price": new_price,
            "timestamp": time.time(),
        }))
        return None

4. Event Stream

Preisänderungen werden in einem Redis Pub/Sub Kanal (oder Kafka Thema für größere Systeme) veröffentlicht. Stromabnehmer — Warndienste, Repricing Motoren, Dashboards — abonnieren diese Ereignisse und reagieren unabhängig voneinander.

import redis
import json
def subscribe_to_changes():
    """Subscribe to price change events."""
    r = redis.Redis(host="localhost", port=6379)
    pubsub = r.pubsub()
    pubsub.subscribe("price_changes")
    for message in pubsub.listen():
        if message["type"] == "message":
            event = json.loads(message["data"])
            handle_price_change(event)
def handle_price_change(event):
    """Process a price change event."""
    change = event["change_pct"]
    product = event["product_id"]
    if change < -10:
        send_urgent_alert(event)  # Major price drop
    elif change < -5:
        send_alert(event)         # Moderate drop
    elif change > 10:
        send_alert(event)         # Significant increase
    # Always log to time-series database
    store_price_change(event)

5. Dashboard und Alarme

Echtzeitdaten benötigen Echtzeit-Visualisierung. Verwenden Sie WebSocket-Verbindungen, um Preis-Updates auf Dashboards sofort zu verschieben.

Node.js Implementierung

Eine Node.js-Version der Echtzeit-Überwachungsmaschine mit ProxyHat's Node SDK.

const axios = require("axios");
const { HttpsProxyAgent } = require("https-proxy-agent");
const Redis = require("ioredis");
const cheerio = require("cheerio");
const PROXY_URL = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080";
const redis = new Redis();
class RealTimePriceMonitor {
  constructor(concurrency = 10) {
    this.concurrency = concurrency;
    this.running = false;
    this.agent = new HttpsProxyAgent(PROXY_URL);
  }
  async fetchPrice(product) {
    try {
      const { data } = await axios.get(product.url, {
        httpsAgent: new HttpsProxyAgent(PROXY_URL),
        headers: {
          "User-Agent":
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/124.0.0.0 Safari/537.36",
          "Accept-Language": "en-US,en;q=0.9",
        },
        timeout: 30000,
      });
      const $ = cheerio.load(data);
      const priceText = $("span.a-price-whole").first().text().trim();
      const price = parseFloat(priceText.replace(/,/g, "")) || null;
      return { productId: product.productId, price, timestamp: Date.now(), status: "success" };
    } catch (err) {
      return { productId: product.productId, price: null, timestamp: Date.now(), status: "failed" };
    }
  }
  async checkChange(productId, newPrice) {
    const key = `last_price:${productId}`;
    const lastData = await redis.get(key);
    if (lastData) {
      const last = JSON.parse(lastData);
      if (last.price && newPrice && last.price !== newPrice) {
        const changePct = ((newPrice - last.price) / last.price) * 100;
        const event = {
          productId,
          oldPrice: last.price,
          newPrice,
          changePct: Math.round(changePct * 100) / 100,
          timestamp: Date.now(),
        };
        await redis.publish("price_changes", JSON.stringify(event));
        return event;
      }
    }
    await redis.set(key, JSON.stringify({ price: newPrice, timestamp: Date.now() }));
    return null;
  }
  async processProduct(product) {
    const result = await this.fetchPrice(product);
    if (result.price) {
      const change = await this.checkChange(result.productId, result.price);
      if (change) {
        console.log(
          `Price change: ${change.productId} $${change.oldPrice} -> $${change.newPrice} (${change.changePct}%)`
        );
      }
    }
    // Random delay
    await new Promise((r) => setTimeout(r, 500 + Math.random() * 1500));
  }
  async start() {
    this.running = true;
    console.log(`Starting monitor with ${this.concurrency} workers`);
    while (this.running) {
      const batch = await this.getNextBatch(this.concurrency);
      if (batch.length === 0) {
        await new Promise((r) => setTimeout(r, 1000));
        continue;
      }
      await Promise.all(batch.map((p) => this.processProduct(p)));
    }
  }
  async getNextBatch(size) {
    const now = Date.now() / 1000;
    const items = await redis.zrangebyscore("price_queue", 0, now, "LIMIT", 0, size);
    const products = [];
    for (const item of items) {
      const data = JSON.parse(item);
      await redis.zadd("price_queue", now + data.interval, item);
      products.push(data);
    }
    return products;
  }
}
const monitor = new RealTimePriceMonitor(10);
monitor.start();

Proxy Management für kontinuierliche Überwachung

Echtzeit-Überwachung stellt einzigartige Anforderungen an Ihre Proxy-Infrastruktur im Vergleich zu Batch-Schrott.

Steady-State Anfrage Muster

Im Gegensatz zum Batch-Schrott, der Bursts von Anfragen sendet, erzeugt Echtzeit-Überwachung einen konstanten Stream. Dies ist tatsächlich besser für die Proxy-Gesundheit — ein stetiger Fluss von 5-10 Anfragen pro Sekunde sieht natürlicher als 1.000 Anfragen in einem 2-minütigen Burst.

ProxyHat Konfiguration für Echtzeit

# Per-request rotation (default, recommended for most checks)
http://USERNAME:PASSWORD@gate.proxyhat.com:8080
# Geo-targeted for marketplace-specific monitoring
http://USERNAME-country-US:PASSWORD@gate.proxyhat.com:8080
http://USERNAME-country-DE:PASSWORD@gate.proxyhat.com:8080
# SOCKS5 for lower-level protocol control
socks5://USERNAME:PASSWORD@gate.proxyhat.com:1080

IP Gesundheitsüberwachung

Verfolgen Sie Erfolgsquoten pro Zielseite und passen Sie Ihren Ansatz dynamisch an. Wenn die Erfolgsquoten auf einem bestimmten Marktplatz sinken, Verzögerungen erhöhen oder auf einen anderen geo-targeted Pool wechseln. Der große Wohnpool von ProxyHat sorgt dafür, dass Sie immer frische IPs zur Verfügung haben. Überprüfen Sie unsere Proxy-Standorte für volle Deckung.

Schlüsselübernahme: Echtzeitüberwachung erfordert eine stetige, nachhaltige Proxystrategie. Das Ziel ist konsequente Low-Volume-Anfragen in vielen IPs, nicht hochvolumige Bursts aus wenigen IPs.

Datenspeicher für Echtzeitdaten

Echtzeit-Preisdaten benötigen eine für Hochfrequenzeinsätze und Zeitbereichsabfragen optimierte Speicherlösung.

TimescaleDB Schema

-- TimescaleDB hypertable for price data
CREATE TABLE price_ticks (
    time        TIMESTAMPTZ NOT NULL,
    product_id  TEXT NOT NULL,
    price       DECIMAL(10,2),
    currency    VARCHAR(3) DEFAULT 'USD',
    source_url  TEXT,
    status      VARCHAR(20)
);
SELECT create_hypertable('price_ticks', 'time');
-- Continuous aggregate for hourly summaries
CREATE MATERIALIZED VIEW price_hourly
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 hour', time) AS hour,
    product_id,
    AVG(price) AS avg_price,
    MIN(price) AS min_price,
    MAX(price) AS max_price,
    COUNT(*) AS check_count
FROM price_ticks
WHERE status = 'success'
GROUP BY hour, product_id;
-- Retention policy: keep raw ticks for 30 days
SELECT add_retention_policy('price_ticks', INTERVAL '30 days');
-- Keep hourly aggregates for 1 year
SELECT add_retention_policy('price_hourly', INTERVAL '365 days');

Skalierung von Erwägungen

  • Horizontale Arbeiterskalierung: Fügen Sie Arbeiter über mehrere Maschinen, jede ziehen von der gleichen Redis Warteschlange. Keine Koordinierung erforderlich — die Warteschlangenverteilung.
  • Vorrangige Drosselung: Wenn das Proxybudget begrenzt ist, verringern Sie automatisch die Kontrollfrequenz für Produkte mit niedriger Priorität, während die Echtzeitabdeckung für kritische Elemente beibehalten wird.
  • Anpassungsintervalle: Wenn der Preis eines Produkts für 24 Stunden stabil war, erhöhen Sie das Kontrollintervall. Wenn es sich zweimal in einer Stunde ändert, verringern Sie es.
  • Site-spezifische Koncurrenz: Verschiedene Zielorte haben unterschiedliche Toleranzen. Führen Sie mehr gleichzeitige Arbeiter für Shopify (mehr überzeugend) und weniger für Amazon ( aggressivere Erkennung).

Erfahren Sie mehr über Proxystrategien, die die Hochfrequenzüberwachung unterstützen. Best-Proxis für Web-Schrott und Preise von ProxyHat für den hochvolumigen Einsatz.

Schlüsselanhänger

  • Echtzeit-Überwachung reduziert die Preisänderungserkennung von Stunden zu Minuten, kritisch für die Nachbesserung und wettbewerbsfähige Reaktion.
  • Verwenden Sie eine Prioritätswarte, um Ressourcen auf hochwertige Produkte zu fokussieren, während Sie noch den langen Schwanz abdecken.
  • Ein Worker Pool mit gleichzeitigen Proxyverbindungen bietet Durchsatz ohne Burstmuster.
  • Änderung der Erkennungsmotoren Filtergeräusche — nur Prozess und Alarm auf tatsächliche Preisänderungen.
  • Speichern Sie Rohdaten in einer Zeitreihen-Datenbank (TimescaleDB) mit Retentionsrichtlinien für das Kostenmanagement.
  • Für die kontinuierliche Überwachung sind gebietsbezogene Proxie mit stationärer Rotation unerlässlich. Beginnen Sie mit ProxyHat für einen zuverlässigen Zugang.

Gebäude Echtzeit-Infrastruktur? Lesen Sie unsere E-Commerce-Schrottführer für die volle Strategie und überprüfen Sie unsere Anleitungen auf mit Proxies in Python und Node.js für Implementierungsdetails.

Bereit loszulegen?

Zugang zu über 50 Mio. Residential-IPs in über 148 Ländern mit KI-gesteuerter Filterung.

Preise ansehenResidential Proxies
← Zurück zum Blog