Surveillance des prix en temps réel et par lots
La plupart des systèmes de surveillance des prix fonctionnent en mode batch : vérifier tous les produits toutes les heures (ou toutes les quelques heures), stocker les résultats et envoyer des alertes sur les changements. Cela fonctionne pour de nombreux cas d'utilisation, mais dans les marchés en évolution rapide — ventes éclair, prix dynamiques, concurrence sur le marché — la surveillance par lots manque de changements critiques des prix qui se produisent entre les contrôles.
La surveillance des prix en temps réel réduit le délai de détection des heures à des minutes ou même des secondes. Au lieu de vérifier chaque produit selon un calendrier fixe, un système en temps réel surveille en permanence les objectifs prioritaires et réagit aux changements qui se produisent. Ce guide couvre l'architecture, l'infrastructure de remplacement et les détails de mise en oeuvre nécessaires pour construire un système de surveillance en temps réel. Pour les concepts fondamentaux de surveillance des prix, voir notre guide sur surveiller automatiquement les prix des concurrents.
| Aspect | Surveillance par lots | Surveillance en temps réel |
|---|---|---|
| Vérifier la fréquence | Toutes les 1-24 heures | Toutes les 1 à 5 minutes pour les points prioritaires |
| Délai de détection | Jusqu'à un intervalle complet | Moins de 5 minutes |
| Utilisation par procuration | Brèches concentrées | Suspension, flux distribué |
| Infrastructure | Cron emplois simples | Animé par des événements avec des groupes de travailleurs |
| Coût | Moins | Plus haut (plus de demandes, plus de procurations) |
| Meilleur pour | Rapports quotidiens, analyse des tendances | Repricing, détection de vente éclair, appel d'offres |
Architecture pour la surveillance en temps réel
Un système de surveillance des prix en temps réel comporte cinq composantes essentielles qui fonctionnent ensemble comme un pipeline continu.
1. Demande prioritaire
Les produits se voient attribuer des niveaux de priorité qui déterminent la fréquence de vérification. Une file d'attente prioritaire (Redis Tried Sets fonctionne bien) garantit que les produits de grande valeur sont toujours vérifiés en premier.
import redis
import time
import json
r = redis.Redis(host="localhost", port=6379)
def add_product(product_id, url, priority_minutes):
"""Add a product to the monitoring queue."""
next_check = time.time() # Check immediately on first add
r.zadd("price_queue", {json.dumps({
"product_id": product_id,
"url": url,
"interval": priority_minutes * 60,
}): next_check})
def get_next_batch(batch_size=10):
"""Get the next batch of products due for checking."""
now = time.time()
items = r.zrangebyscore("price_queue", 0, now, start=0, num=batch_size)
products = []
for item in items:
data = json.loads(item)
r.zadd("price_queue", {item: now + data["interval"]})
products.append(data)
return products
# Example: Add products with different priorities
add_product("SKU001", "https://www.amazon.com/dp/B0CHX3QBCH", priority_minutes=2)
add_product("SKU002", "https://www.amazon.com/dp/B0D5BKRY4R", priority_minutes=5)
add_product("SKU003", "https://www.amazon.com/dp/B0CRMZHDG7", priority_minutes=15)2. Groupe de travailleurs
Les processus de travail multiples tirent de la file d'attente prioritaire, récupèrent les prix par les procurations, et poussent les résultats vers le pipeline de données. Les travailleurs opèrent indépendamment, chacun avec sa propre connexion par procuration.
import requests
import random
from concurrent.futures import ThreadPoolExecutor
from bs4 import BeautifulSoup
PROXY_URL = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
USER_AGENTS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/124.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 Chrome/124.0.0.0 Safari/537.36",
]
def fetch_price(product):
"""Fetch the current price for a product."""
headers = {
"User-Agent": random.choice(USER_AGENTS),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.9",
}
proxies = {"http": PROXY_URL, "https": PROXY_URL}
try:
response = requests.get(
product["url"], headers=headers,
proxies=proxies, timeout=30
)
if response.status_code == 200:
soup = BeautifulSoup(response.text, "html.parser")
price_el = soup.select_one("span.a-price-whole")
if price_el:
price = float(price_el.get_text(strip=True).replace(",", ""))
return {
"product_id": product["product_id"],
"price": price,
"timestamp": time.time(),
"status": "success",
}
except Exception as e:
pass
return {
"product_id": product["product_id"],
"price": None,
"timestamp": time.time(),
"status": "failed",
}
def run_workers(num_workers=10):
"""Run the monitoring worker pool."""
with ThreadPoolExecutor(max_workers=num_workers) as executor:
while True:
batch = get_next_batch(batch_size=num_workers)
if not batch:
time.sleep(1)
continue
futures = [executor.submit(fetch_price, product) for product in batch]
for future in futures:
result = future.result()
process_result(result)
time.sleep(random.uniform(0.5, 2))3. Moteur de détection de changement
Au lieu de stocker chaque contrôle de prix, le moteur de détection de changement compare les prix actuels aux dernières valeurs connues et ne déclenche des événements que sur les changements réels.
class ChangeDetector:
def __init__(self, redis_client):
self.redis = redis_client
def check_change(self, product_id, new_price):
"""Compare new price against last known and detect changes."""
key = f"last_price:{product_id}"
last_data = self.redis.get(key)
if last_data:
last = json.loads(last_data)
old_price = last["price"]
if old_price and new_price and old_price != new_price:
change_pct = ((new_price - old_price) / old_price) * 100
event = {
"product_id": product_id,
"old_price": old_price,
"new_price": new_price,
"change_pct": round(change_pct, 2),
"timestamp": time.time(),
}
# Publish change event
self.redis.publish("price_changes", json.dumps(event))
return event
# Update last known price
self.redis.set(key, json.dumps({
"price": new_price,
"timestamp": time.time(),
}))
return None4. Volet événementiel
Les modifications de prix sont publiées sur un canal Redis Pub/Sub (ou sur le sujet Kafka pour les systèmes plus grands). Les consommateurs en aval — services d'alerte, moteurs de réparation, tableaux de bord — souscrivent à ces événements et réagissent de manière indépendante.
import redis
import json
def subscribe_to_changes():
"""Subscribe to price change events."""
r = redis.Redis(host="localhost", port=6379)
pubsub = r.pubsub()
pubsub.subscribe("price_changes")
for message in pubsub.listen():
if message["type"] == "message":
event = json.loads(message["data"])
handle_price_change(event)
def handle_price_change(event):
"""Process a price change event."""
change = event["change_pct"]
product = event["product_id"]
if change < -10:
send_urgent_alert(event) # Major price drop
elif change < -5:
send_alert(event) # Moderate drop
elif change > 10:
send_alert(event) # Significant increase
# Always log to time-series database
store_price_change(event)5. Tableau de bord et alertes
Les données en temps réel nécessitent une visualisation en temps réel. Utilisez les connexions WebSocket pour pousser les mises à jour de prix sur les tableaux de bord instantanément.
Mise en œuvre de Node.js
Une version Node.js du moteur de surveillance en temps réel utilisant Le nœud SDK de ProxyHat.
const axios = require("axios");
const { HttpsProxyAgent } = require("https-proxy-agent");
const Redis = require("ioredis");
const cheerio = require("cheerio");
const PROXY_URL = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080";
const redis = new Redis();
class RealTimePriceMonitor {
constructor(concurrency = 10) {
this.concurrency = concurrency;
this.running = false;
this.agent = new HttpsProxyAgent(PROXY_URL);
}
async fetchPrice(product) {
try {
const { data } = await axios.get(product.url, {
httpsAgent: new HttpsProxyAgent(PROXY_URL),
headers: {
"User-Agent":
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/124.0.0.0 Safari/537.36",
"Accept-Language": "en-US,en;q=0.9",
},
timeout: 30000,
});
const $ = cheerio.load(data);
const priceText = $("span.a-price-whole").first().text().trim();
const price = parseFloat(priceText.replace(/,/g, "")) || null;
return { productId: product.productId, price, timestamp: Date.now(), status: "success" };
} catch (err) {
return { productId: product.productId, price: null, timestamp: Date.now(), status: "failed" };
}
}
async checkChange(productId, newPrice) {
const key = `last_price:${productId}`;
const lastData = await redis.get(key);
if (lastData) {
const last = JSON.parse(lastData);
if (last.price && newPrice && last.price !== newPrice) {
const changePct = ((newPrice - last.price) / last.price) * 100;
const event = {
productId,
oldPrice: last.price,
newPrice,
changePct: Math.round(changePct * 100) / 100,
timestamp: Date.now(),
};
await redis.publish("price_changes", JSON.stringify(event));
return event;
}
}
await redis.set(key, JSON.stringify({ price: newPrice, timestamp: Date.now() }));
return null;
}
async processProduct(product) {
const result = await this.fetchPrice(product);
if (result.price) {
const change = await this.checkChange(result.productId, result.price);
if (change) {
console.log(
`Price change: ${change.productId} $${change.oldPrice} -> $${change.newPrice} (${change.changePct}%)`
);
}
}
// Random delay
await new Promise((r) => setTimeout(r, 500 + Math.random() * 1500));
}
async start() {
this.running = true;
console.log(`Starting monitor with ${this.concurrency} workers`);
while (this.running) {
const batch = await this.getNextBatch(this.concurrency);
if (batch.length === 0) {
await new Promise((r) => setTimeout(r, 1000));
continue;
}
await Promise.all(batch.map((p) => this.processProduct(p)));
}
}
async getNextBatch(size) {
const now = Date.now() / 1000;
const items = await redis.zrangebyscore("price_queue", 0, now, "LIMIT", 0, size);
const products = [];
for (const item of items) {
const data = JSON.parse(item);
await redis.zadd("price_queue", now + data.interval, item);
products.push(data);
}
return products;
}
}
const monitor = new RealTimePriceMonitor(10);
monitor.start();Gestion par procuration pour la surveillance continue
La surveillance en temps réel impose des exigences uniques à votre infrastructure de remplacement par rapport à la mise au rebut par lots.
Modèle de demande d'état stable
Contrairement au grattage par lots qui envoie des éclats de requêtes, la surveillance en temps réel crée un flux constant. C'est en fait mieux pour la santé par procuration — un flux régulier de 5-10 demandes par seconde semble plus naturel que 1000 demandes en une explosion de 2 minutes.
Configuration ProxyHat pour temps réel
# Per-request rotation (default, recommended for most checks)
http://USERNAME:PASSWORD@gate.proxyhat.com:8080
# Geo-targeted for marketplace-specific monitoring
http://USERNAME-country-US:PASSWORD@gate.proxyhat.com:8080
http://USERNAME-country-DE:PASSWORD@gate.proxyhat.com:8080
# SOCKS5 for lower-level protocol control
socks5://USERNAME:PASSWORD@gate.proxyhat.com:1080IP Surveillance de la santé
Suivez les taux de réussite par site cible et ajustez votre approche dynamiquement. Si les taux de réussite diminuent sur un marché donné, augmentez les retards ou passez à un autre bassin géo-cible. La grande piscine résidentielle de ProxyHat vous assure d'avoir toujours des IPs frais disponibles. Vérifiez notre lieux de remplacement pour une couverture complète.
À retenir : La surveillance en temps réel nécessite une stratégie de remplacement stable et durable. L'objectif est des requêtes constantes à faible volume pour de nombreuses IP, et non des rafales à volume élevé de quelques IP.
Stockage des données en temps réel
Les données de prix en temps réel ont besoin d'une solution de stockage optimisée pour les inserts à haute fréquence et les requêtes dans le temps.
CalendrierDB Schema
-- TimescaleDB hypertable for price data
CREATE TABLE price_ticks (
time TIMESTAMPTZ NOT NULL,
product_id TEXT NOT NULL,
price DECIMAL(10,2),
currency VARCHAR(3) DEFAULT 'USD',
source_url TEXT,
status VARCHAR(20)
);
SELECT create_hypertable('price_ticks', 'time');
-- Continuous aggregate for hourly summaries
CREATE MATERIALIZED VIEW price_hourly
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 hour', time) AS hour,
product_id,
AVG(price) AS avg_price,
MIN(price) AS min_price,
MAX(price) AS max_price,
COUNT(*) AS check_count
FROM price_ticks
WHERE status = 'success'
GROUP BY hour, product_id;
-- Retention policy: keep raw ticks for 30 days
SELECT add_retention_policy('price_ticks', INTERVAL '30 days');
-- Keep hourly aggregates for 1 year
SELECT add_retention_policy('price_hourly', INTERVAL '365 days');Élargissement des considérations
- Échelle horizontale des travailleurs: Ajoutez des travailleurs sur plusieurs machines, chacun tirant de la même file d'attente Redis. Aucune coordination n'est nécessaire — la file d'attente gère la distribution.
- Throttling axé sur les priorités: Lorsque le budget par procuration est limité, réduire automatiquement la fréquence de vérification des produits peu prioritaires tout en maintenant une couverture en temps réel pour les éléments critiques.
- Intervalles adaptatifs: Si le prix d'un produit est stable pendant 24 heures, augmentez l'intervalle de contrôle. Si elle a changé deux fois en une heure, la diminuer.
- Cohérence propre au site : Différents sites cibles ont des tolérances différentes. Lancer plus de travailleurs concomitants pour Shopify (plus permissif) et moins pour Amazon (détection plus agressive).
Pour en savoir plus sur les stratégies de remplacement qui appuient la surveillance à haute fréquence, consultez notre guide sur meilleurs proxies pour le grattage de toile et Les plans de prix de ProxyHat pour une utilisation en grand volume.
A emporter des clés
- La surveillance en temps réel réduit la détection des changements de prix d'heures à minutes, ce qui est essentiel pour le repricing et la réponse concurrentielle.
- Utilisez une file d'attente prioritaire pour concentrer les ressources sur les produits de grande valeur tout en couvrant la queue longue.
- Un pool de travailleurs avec des connexions proxy simultanées fournit le débit sans les modèles d'éclatement.
- Changer les moteurs de détection filtre le bruit — seulement processus et alerte sur les changements de prix réels.
- Stocker les données brutes dans une base de données série chronologique (TimescaleDB) avec des politiques de conservation pour la gestion des coûts.
- Les procurations résidentielles à rotation à l'état d'équilibre sont essentielles pour une surveillance continue. Commencez par ProxyHat pour un accès fiable.
Construire une infrastructure en temps réel? Lisez notre Guide sur la démolition du commerce électronique pour la stratégie complète et vérifier nos guides sur utilisant des proxies dans Python et Node.js pour les détails de mise en œuvre.






