Monitoreo de precios en tiempo real vs
La mayoría de los sistemas de monitoreo de precios funcionan en modo de lote: comprueba todos los productos cada hora (o cada pocas horas), almacena los resultados y envía alertas sobre cambios. Esto funciona para muchos casos de uso, pero en mercados de rápido movimiento — ventas flash, precios dinámicos, competencia de mercado — monitoreo de lotes falla cambios de precios críticos que ocurren entre cheques.
El monitoreo de precios en tiempo real reduce el retraso de detección de horas a minutos o incluso segundos. En lugar de revisar cada producto en un horario fijo, un sistema en tiempo real monitorea continuamente objetivos de alta prioridad y reacciona a los cambios que suceden. Esta guía cubre la arquitectura, la infraestructura proxy y los detalles de implementación necesarios para construir un sistema de monitoreo en tiempo real. Para los conceptos de monitoreo de precios fundamentales, vea nuestra guía control de los precios de los competidores automáticamente.
| Aspecto | Batch Monitoring | Vigilancia en tiempo real |
|---|---|---|
| Frecuencia de comprobación | Cada 1-24 horas | Cada 1-5 minutos para temas prioritarios |
| Lag de detección | Hasta un intervalo completo | Menos de 5 minutos |
| Uso proxy | ráfagas concentradas | Corriente distribuida |
| Infraestructura | Trabajos simples de cron | Evento con piscinas de trabajadores |
| Costo | Bajo | Más alto (más solicitudes, más proxies) |
| Mejor | Informes diarios, análisis de tendencias | Repricing, detección de venta flash, licitación competitiva |
Arquitectura para el monitoreo en tiempo real
Un sistema de monitoreo de precios en tiempo real tiene cinco componentes básicos que trabajan juntos como un gasoducto continuo.
1. Cargos prioritarios
Los productos se asignan niveles prioritarios que determinan la frecuencia de comprobación. Una cola prioritaria (Redis Sorted Sets funciona bien) asegura que los productos de alto valor siempre se verifican primero.
import redis
import time
import json
r = redis.Redis(host="localhost", port=6379)
def add_product(product_id, url, priority_minutes):
"""Add a product to the monitoring queue."""
next_check = time.time() # Check immediately on first add
r.zadd("price_queue", {json.dumps({
"product_id": product_id,
"url": url,
"interval": priority_minutes * 60,
}): next_check})
def get_next_batch(batch_size=10):
"""Get the next batch of products due for checking."""
now = time.time()
items = r.zrangebyscore("price_queue", 0, now, start=0, num=batch_size)
products = []
for item in items:
data = json.loads(item)
r.zadd("price_queue", {item: now + data["interval"]})
products.append(data)
return products
# Example: Add products with different priorities
add_product("SKU001", "https://www.amazon.com/dp/B0CHX3QBCH", priority_minutes=2)
add_product("SKU002", "https://www.amazon.com/dp/B0D5BKRY4R", priority_minutes=5)
add_product("SKU003", "https://www.amazon.com/dp/B0CRMZHDG7", priority_minutes=15)2. Piscina de trabajo
Múltiples procesos de trabajo sacan de la cola de prioridad, recogen precios a través de proxies, y empujan los resultados a la tubería de datos. Los trabajadores operan independientemente, cada uno con su propia conexión proxy.
import requests
import random
from concurrent.futures import ThreadPoolExecutor
from bs4 import BeautifulSoup
PROXY_URL = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
USER_AGENTS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/124.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 Chrome/124.0.0.0 Safari/537.36",
]
def fetch_price(product):
"""Fetch the current price for a product."""
headers = {
"User-Agent": random.choice(USER_AGENTS),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.9",
}
proxies = {"http": PROXY_URL, "https": PROXY_URL}
try:
response = requests.get(
product["url"], headers=headers,
proxies=proxies, timeout=30
)
if response.status_code == 200:
soup = BeautifulSoup(response.text, "html.parser")
price_el = soup.select_one("span.a-price-whole")
if price_el:
price = float(price_el.get_text(strip=True).replace(",", ""))
return {
"product_id": product["product_id"],
"price": price,
"timestamp": time.time(),
"status": "success",
}
except Exception as e:
pass
return {
"product_id": product["product_id"],
"price": None,
"timestamp": time.time(),
"status": "failed",
}
def run_workers(num_workers=10):
"""Run the monitoring worker pool."""
with ThreadPoolExecutor(max_workers=num_workers) as executor:
while True:
batch = get_next_batch(batch_size=num_workers)
if not batch:
time.sleep(1)
continue
futures = [executor.submit(fetch_price, product) for product in batch]
for future in futures:
result = future.result()
process_result(result)
time.sleep(random.uniform(0.5, 2))3. Motor de detección de cambios
En lugar de almacenar cada cheque de precio, el motor de detección de cambios compara los precios actuales con los últimos valores conocidos y sólo desencadena eventos en cambios reales.
class ChangeDetector:
def __init__(self, redis_client):
self.redis = redis_client
def check_change(self, product_id, new_price):
"""Compare new price against last known and detect changes."""
key = f"last_price:{product_id}"
last_data = self.redis.get(key)
if last_data:
last = json.loads(last_data)
old_price = last["price"]
if old_price and new_price and old_price != new_price:
change_pct = ((new_price - old_price) / old_price) * 100
event = {
"product_id": product_id,
"old_price": old_price,
"new_price": new_price,
"change_pct": round(change_pct, 2),
"timestamp": time.time(),
}
# Publish change event
self.redis.publish("price_changes", json.dumps(event))
return event
# Update last known price
self.redis.set(key, json.dumps({
"price": new_price,
"timestamp": time.time(),
}))
return None4. Corriente de eventos
Los cambios de precio se publican a un canal Redis Pub/Sub (o tema Kafka para sistemas más grandes). Los consumidores de Downstream – alertar los servicios, revalorizar motores, tableros de mando – se suscriben a estos eventos y reaccionan independientemente.
import redis
import json
def subscribe_to_changes():
"""Subscribe to price change events."""
r = redis.Redis(host="localhost", port=6379)
pubsub = r.pubsub()
pubsub.subscribe("price_changes")
for message in pubsub.listen():
if message["type"] == "message":
event = json.loads(message["data"])
handle_price_change(event)
def handle_price_change(event):
"""Process a price change event."""
change = event["change_pct"]
product = event["product_id"]
if change < -10:
send_urgent_alert(event) # Major price drop
elif change < -5:
send_alert(event) # Moderate drop
elif change > 10:
send_alert(event) # Significant increase
# Always log to time-series database
store_price_change(event)5. Dashboard and Alerts
Los datos en tiempo real necesitan visualización en tiempo real. Utilice las conexiones WebSocket para presionar actualizaciones de precios a los paneles al instante.
Node.js Implementation
Versión Node.js del motor de monitoreo en tiempo real utilizando Nodo de ProxyHat SDK.
const axios = require("axios");
const { HttpsProxyAgent } = require("https-proxy-agent");
const Redis = require("ioredis");
const cheerio = require("cheerio");
const PROXY_URL = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080";
const redis = new Redis();
class RealTimePriceMonitor {
constructor(concurrency = 10) {
this.concurrency = concurrency;
this.running = false;
this.agent = new HttpsProxyAgent(PROXY_URL);
}
async fetchPrice(product) {
try {
const { data } = await axios.get(product.url, {
httpsAgent: new HttpsProxyAgent(PROXY_URL),
headers: {
"User-Agent":
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/124.0.0.0 Safari/537.36",
"Accept-Language": "en-US,en;q=0.9",
},
timeout: 30000,
});
const $ = cheerio.load(data);
const priceText = $("span.a-price-whole").first().text().trim();
const price = parseFloat(priceText.replace(/,/g, "")) || null;
return { productId: product.productId, price, timestamp: Date.now(), status: "success" };
} catch (err) {
return { productId: product.productId, price: null, timestamp: Date.now(), status: "failed" };
}
}
async checkChange(productId, newPrice) {
const key = `last_price:${productId}`;
const lastData = await redis.get(key);
if (lastData) {
const last = JSON.parse(lastData);
if (last.price && newPrice && last.price !== newPrice) {
const changePct = ((newPrice - last.price) / last.price) * 100;
const event = {
productId,
oldPrice: last.price,
newPrice,
changePct: Math.round(changePct * 100) / 100,
timestamp: Date.now(),
};
await redis.publish("price_changes", JSON.stringify(event));
return event;
}
}
await redis.set(key, JSON.stringify({ price: newPrice, timestamp: Date.now() }));
return null;
}
async processProduct(product) {
const result = await this.fetchPrice(product);
if (result.price) {
const change = await this.checkChange(result.productId, result.price);
if (change) {
console.log(
`Price change: ${change.productId} $${change.oldPrice} -> $${change.newPrice} (${change.changePct}%)`
);
}
}
// Random delay
await new Promise((r) => setTimeout(r, 500 + Math.random() * 1500));
}
async start() {
this.running = true;
console.log(`Starting monitor with ${this.concurrency} workers`);
while (this.running) {
const batch = await this.getNextBatch(this.concurrency);
if (batch.length === 0) {
await new Promise((r) => setTimeout(r, 1000));
continue;
}
await Promise.all(batch.map((p) => this.processProduct(p)));
}
}
async getNextBatch(size) {
const now = Date.now() / 1000;
const items = await redis.zrangebyscore("price_queue", 0, now, "LIMIT", 0, size);
const products = [];
for (const item of items) {
const data = JSON.parse(item);
await redis.zadd("price_queue", now + data.interval, item);
products.push(data);
}
return products;
}
}
const monitor = new RealTimePriceMonitor(10);
monitor.start();Proxy Management for Continuous Monitoring
El monitoreo en tiempo real coloca demandas únicas en su infraestructura proxy en comparación con el raspado de lotes.
Patrón de Solicitud de Estado
A diferencia del raspado de lotes que envía ráfagas de solicitudes, el monitoreo en tiempo real crea un flujo constante. Esto es mejor para la salud indirecta: un flujo constante de 5-10 solicitudes por segundo parece más natural que 1.000 solicitudes en un estallido de 2 minutos.
Configuración ProxyHat para tiempo real
# Per-request rotation (default, recommended for most checks)
http://USERNAME:PASSWORD@gate.proxyhat.com:8080
# Geo-targeted for marketplace-specific monitoring
http://USERNAME-country-US:PASSWORD@gate.proxyhat.com:8080
http://USERNAME-country-DE:PASSWORD@gate.proxyhat.com:8080
# SOCKS5 for lower-level protocol control
socks5://USERNAME:PASSWORD@gate.proxyhat.com:1080IP Health Monitoring
Seguimiento de las tasas de éxito por sitio de destino y ajuste dinámicamente su enfoque. Si las tasas de éxito disminuyen en un mercado específico, aumentan las demoras o cambian a una piscina geo-objetiva diferente. La gran piscina residencial de ProxyHat le asegura siempre tener IPs frescas disponibles. Comprueba nuestro ubicaciones proxy para cobertura completa.
Toma de llaves: La vigilancia en tiempo real requiere una estrategia proxy estable y sostenible. The goal is consistent low-volume requests across many IPs, not high-volume explosions from few IPs.
Almacenamiento de datos para datos en tiempo real
Los datos de precios en tiempo real necesitan una solución de almacenamiento optimizada para inserciones de alta frecuencia y consultas de tiempo.
TimescaleDB Schema
-- TimescaleDB hypertable for price data
CREATE TABLE price_ticks (
time TIMESTAMPTZ NOT NULL,
product_id TEXT NOT NULL,
price DECIMAL(10,2),
currency VARCHAR(3) DEFAULT 'USD',
source_url TEXT,
status VARCHAR(20)
);
SELECT create_hypertable('price_ticks', 'time');
-- Continuous aggregate for hourly summaries
CREATE MATERIALIZED VIEW price_hourly
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 hour', time) AS hour,
product_id,
AVG(price) AS avg_price,
MIN(price) AS min_price,
MAX(price) AS max_price,
COUNT(*) AS check_count
FROM price_ticks
WHERE status = 'success'
GROUP BY hour, product_id;
-- Retention policy: keep raw ticks for 30 days
SELECT add_retention_policy('price_ticks', INTERVAL '30 days');
-- Keep hourly aggregates for 1 year
SELECT add_retention_policy('price_hourly', INTERVAL '365 days');Consideraciones de escala
- Escalada de trabajadores horizontales: Añadir trabajadores a través de múltiples máquinas, cada tirado de la misma cola Redis. No se necesita coordinación — la cola maneja la distribución.
- Luchando con base prioritaria: Cuando el presupuesto proxy es limitado, reduce automáticamente la frecuencia de comprobación para productos de baja prioridad manteniendo la cobertura en tiempo real para artículos críticos.
- Intervalos adaptativos: Si el precio de un producto ha sido estable durante 24 horas, aumenta el intervalo de comprobación. Si cambió dos veces en una hora, disminuya.
- Concurrencia específica del sitio: Los diferentes sitios de destino tienen diferentes tolerancias. Ejecutar más trabajadores concurrentes para Shopify (más permisivo) y menos para Amazon (detección más agresiva).
Para más información sobre estrategias proxy que apoyan el monitoreo de alta frecuencia, explore nuestra guía sobre mejores proxies para raspado web y Planes de precios de ProxyHat para el uso de alto volumen.
Key Takeaways
- El monitoreo en tiempo real reduce la detección del cambio de precios de horas a minutos, crítica para la revalorización y respuesta competitiva.
- Utilice una cola de prioridad para centrar los recursos en productos de alto valor mientras todavía cubre la larga cola.
- Una piscina de trabajadores con conexiones proxy concurrentes proporciona rendimiento sin patrones de explosión.
- Cambiar los motores de detección filtran el ruido — sólo procesar y alertar sobre cambios de precio reales.
- Almacene datos brutos en una base de datos de series temporales (TimescaleDB) con políticas de retención para la gestión de costos.
- Los proxies residenciales con rotación estable son esenciales para el monitoreo continuo. Empieza con ProxyHat para un acceso confiable.
¿Construyendo infraestructura en tiempo real? Lea nuestro e-commerce scraping guide para la estrategia completa y comprobar nuestras guías en usando proxies en Python y Node.js para detalles de la aplicación.






