リアルタイム価格監視インフラの構築

プライオリティキュー、ワーカープール、変更検出、および住宅プロキシ回転を備えたリアルタイム価格監視システムの設計と構築。 Python と Node.js の実装ガイドを完了します.

リアルタイム価格監視インフラの構築

リアルタイム対バッチ価格監視

ほとんどの価格監視システムは、バッチモードで動作します。毎時(または数時間ごとに)すべての製品をチェックし、結果を保存し、変更にアラートを送信してください。 これは、多くのユースケースで動作します, しかし、高速移動市場で - フラッシュ販売, ダイナミックプライシング, マーケットプレース競争 — バッチ監視は、チェックの間に起こる重要な価格変化を見逃します.

リアルタイム価格監視では、検出遅延を時間から分、秒単位まで短縮できます。 一定のスケジュールですべての製品をチェックする代わりに、リアルタイムシステムが継続的に監視し、変化に反応します。 このガイドでは、リアルタイム監視システムの構築に必要なアーキテクチャ、プロキシインフラストラクチャ、および実装の詳細について説明します。 基礎的な価格の監視の概念については、ガイドを参照してください 競争相手の価格を自動的に監視して下さい. .

リアルタイム対バッチ価格監視
アスペクトバッチ監視リアルタイム監視
頻度をチェックして下さい1~24時間優先品1~5分ごとに
検出の遅れ1つの間隔まで5分以内
プロキシの使用濃縮バーストSteady、分散ストリーム
インフラシンプルなcronジョブワーカープールでイベント主導
コストログインより高い(より多くの要求、より多くのプロキシ)
最高の毎日のレポート、トレンド分析、抜け目がない販売の検出、競争の入札

リアルタイムモニタリングアーキテクチャ

リアルタイム価格モニタリングシステムには、連続パイプラインとして連携する5つのコアコンポーネントがあります。

1.優先キュー

製品は、チェック周波数を決定する優先層を割り当てられます。 優先キュー(Redisソートされたセットはうまく機能します)は、高値製品が常に最初にチェックされるようにします。

import redis
import time
import json
r = redis.Redis(host="localhost", port=6379)
def add_product(product_id, url, priority_minutes):
    """Add a product to the monitoring queue."""
    next_check = time.time()  # Check immediately on first add
    r.zadd("price_queue", {json.dumps({
        "product_id": product_id,
        "url": url,
        "interval": priority_minutes * 60,
    }): next_check})
def get_next_batch(batch_size=10):
    """Get the next batch of products due for checking."""
    now = time.time()
    items = r.zrangebyscore("price_queue", 0, now, start=0, num=batch_size)
    products = []
    for item in items:
        data = json.loads(item)
        r.zadd("price_queue", {item: now + data["interval"]})
        products.append(data)
    return products
# Example: Add products with different priorities
add_product("SKU001", "https://www.amazon.com/dp/B0CHX3QBCH", priority_minutes=2)
add_product("SKU002", "https://www.amazon.com/dp/B0D5BKRY4R", priority_minutes=5)
add_product("SKU003", "https://www.amazon.com/dp/B0CRMZHDG7", priority_minutes=15)

2. 労働者プール

複数のワーカープロセスは、優先キュー、プロキシによる価格の取得、データパイプラインへの結果をプッシュします。 労働者は、それぞれ独自のプロキシ接続で独立して作動します。

import requests
import random
from concurrent.futures import ThreadPoolExecutor
from bs4 import BeautifulSoup
PROXY_URL = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/124.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 Chrome/124.0.0.0 Safari/537.36",
]
def fetch_price(product):
    """Fetch the current price for a product."""
    headers = {
        "User-Agent": random.choice(USER_AGENTS),
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.9",
    }
    proxies = {"http": PROXY_URL, "https": PROXY_URL}
    try:
        response = requests.get(
            product["url"], headers=headers,
            proxies=proxies, timeout=30
        )
        if response.status_code == 200:
            soup = BeautifulSoup(response.text, "html.parser")
            price_el = soup.select_one("span.a-price-whole")
            if price_el:
                price = float(price_el.get_text(strip=True).replace(",", ""))
                return {
                    "product_id": product["product_id"],
                    "price": price,
                    "timestamp": time.time(),
                    "status": "success",
                }
    except Exception as e:
        pass
    return {
        "product_id": product["product_id"],
        "price": None,
        "timestamp": time.time(),
        "status": "failed",
    }
def run_workers(num_workers=10):
    """Run the monitoring worker pool."""
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        while True:
            batch = get_next_batch(batch_size=num_workers)
            if not batch:
                time.sleep(1)
                continue
            futures = [executor.submit(fetch_price, product) for product in batch]
            for future in futures:
                result = future.result()
                process_result(result)
            time.sleep(random.uniform(0.5, 2))

3. 検出エンジンの変更

すべての価格チェックを格納する代わりに、変更検出エンジンは、既定値に対して現在の価格を比較し、実際の変更時にイベントをトリガーします。

class ChangeDetector:
    def __init__(self, redis_client):
        self.redis = redis_client
    def check_change(self, product_id, new_price):
        """Compare new price against last known and detect changes."""
        key = f"last_price:{product_id}"
        last_data = self.redis.get(key)
        if last_data:
            last = json.loads(last_data)
            old_price = last["price"]
            if old_price and new_price and old_price != new_price:
                change_pct = ((new_price - old_price) / old_price) * 100
                event = {
                    "product_id": product_id,
                    "old_price": old_price,
                    "new_price": new_price,
                    "change_pct": round(change_pct, 2),
                    "timestamp": time.time(),
                }
                # Publish change event
                self.redis.publish("price_changes", json.dumps(event))
                return event
        # Update last known price
        self.redis.set(key, json.dumps({
            "price": new_price,
            "timestamp": time.time(),
        }))
        return None

4. イベントストリーム

価格は、Redis Pub/Sub チャネル(またはより大きいシステムのためのカフカトピック)に公開されます。 ダウンストリームの消費者 — アラートサービス、エンジンの補充、ダッシュボード — これらのイベントを購読し、独立して反応します。

import redis
import json
def subscribe_to_changes():
    """Subscribe to price change events."""
    r = redis.Redis(host="localhost", port=6379)
    pubsub = r.pubsub()
    pubsub.subscribe("price_changes")
    for message in pubsub.listen():
        if message["type"] == "message":
            event = json.loads(message["data"])
            handle_price_change(event)
def handle_price_change(event):
    """Process a price change event."""
    change = event["change_pct"]
    product = event["product_id"]
    if change < -10:
        send_urgent_alert(event)  # Major price drop
    elif change < -5:
        send_alert(event)         # Moderate drop
    elif change > 10:
        send_alert(event)         # Significant increase
    # Always log to time-series database
    store_price_change(event)

5.ダッシュボードとアラート

リアルタイムデータをリアルタイムで可視化します。 WebSocket接続を使用して、価格の更新をダッシュボードに即座にプッシュします。

Node.js 実装

リアルタイム監視エンジンのNode.jsバージョン ProxyHat ノード SDK. .

const axios = require("axios");
const { HttpsProxyAgent } = require("https-proxy-agent");
const Redis = require("ioredis");
const cheerio = require("cheerio");
const PROXY_URL = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080";
const redis = new Redis();
class RealTimePriceMonitor {
  constructor(concurrency = 10) {
    this.concurrency = concurrency;
    this.running = false;
    this.agent = new HttpsProxyAgent(PROXY_URL);
  }
  async fetchPrice(product) {
    try {
      const { data } = await axios.get(product.url, {
        httpsAgent: new HttpsProxyAgent(PROXY_URL),
        headers: {
          "User-Agent":
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/124.0.0.0 Safari/537.36",
          "Accept-Language": "en-US,en;q=0.9",
        },
        timeout: 30000,
      });
      const $ = cheerio.load(data);
      const priceText = $("span.a-price-whole").first().text().trim();
      const price = parseFloat(priceText.replace(/,/g, "")) || null;
      return { productId: product.productId, price, timestamp: Date.now(), status: "success" };
    } catch (err) {
      return { productId: product.productId, price: null, timestamp: Date.now(), status: "failed" };
    }
  }
  async checkChange(productId, newPrice) {
    const key = `last_price:${productId}`;
    const lastData = await redis.get(key);
    if (lastData) {
      const last = JSON.parse(lastData);
      if (last.price && newPrice && last.price !== newPrice) {
        const changePct = ((newPrice - last.price) / last.price) * 100;
        const event = {
          productId,
          oldPrice: last.price,
          newPrice,
          changePct: Math.round(changePct * 100) / 100,
          timestamp: Date.now(),
        };
        await redis.publish("price_changes", JSON.stringify(event));
        return event;
      }
    }
    await redis.set(key, JSON.stringify({ price: newPrice, timestamp: Date.now() }));
    return null;
  }
  async processProduct(product) {
    const result = await this.fetchPrice(product);
    if (result.price) {
      const change = await this.checkChange(result.productId, result.price);
      if (change) {
        console.log(
          `Price change: ${change.productId} $${change.oldPrice} -> $${change.newPrice} (${change.changePct}%)`
        );
      }
    }
    // Random delay
    await new Promise((r) => setTimeout(r, 500 + Math.random() * 1500));
  }
  async start() {
    this.running = true;
    console.log(`Starting monitor with ${this.concurrency} workers`);
    while (this.running) {
      const batch = await this.getNextBatch(this.concurrency);
      if (batch.length === 0) {
        await new Promise((r) => setTimeout(r, 1000));
        continue;
      }
      await Promise.all(batch.map((p) => this.processProduct(p)));
    }
  }
  async getNextBatch(size) {
    const now = Date.now() / 1000;
    const items = await redis.zrangebyscore("price_queue", 0, now, "LIMIT", 0, size);
    const products = [];
    for (const item of items) {
      const data = JSON.parse(item);
      await redis.zadd("price_queue", now + data.interval, item);
      products.push(data);
    }
    return products;
  }
}
const monitor = new RealTimePriceMonitor(10);
monitor.start();

継続的な監視のためのプロキシ管理

リアルタイム監視では、プロキシインフラストラクチャの要求がバッチスクレイピングに比べて一元化します。

Steady-Stateリクエストパターン

リクエストのバーストを送信するバッチスクレイピングとは異なり、リアルタイム監視は定数ストリームを作成します。 これは、実際にプロキシーヘルスのために優れています — 1秒あたりの5-10リクエストの安定した流れは、2分のバーストで1,000リクエスト以上です。

Real-Time の ProxyHat 設定

# Per-request rotation (default, recommended for most checks)
http://USERNAME:PASSWORD@gate.proxyhat.com:8080
# Geo-targeted for marketplace-specific monitoring
http://USERNAME-country-US:PASSWORD@gate.proxyhat.com:8080
http://USERNAME-country-DE:PASSWORD@gate.proxyhat.com:8080
# SOCKS5 for lower-level protocol control
socks5://USERNAME:PASSWORD@gate.proxyhat.com:1080

IP の健康の監視

ターゲットサイトの成功率を追跡し、アプローチを動的に調整します。 特定のマーケットプレイスで成功率が低下する場合、遅延の増加や異なるジオターゲティングプールへの切り替え。 ProxyHat の大型住宅プールは、常に新しい IP を利用できるようにします。 お問い合わせ プロキシの場所 完全な適用範囲のため。

キーのテイクアウト:リアルタイムの監視は、安定した持続可能なプロキシ戦略を必要とします。 目標は、いくつかのIPから大量のバーストではなく、多くのIP間で一貫した低音量要求です。

リアルタイムのデータストレージ

リアルタイム価格データは、高周波インサートと時間範囲のクエリに最適化されたストレージソリューションを必要とします。

TimescaleDBスキーマ

-- TimescaleDB hypertable for price data
CREATE TABLE price_ticks (
    time        TIMESTAMPTZ NOT NULL,
    product_id  TEXT NOT NULL,
    price       DECIMAL(10,2),
    currency    VARCHAR(3) DEFAULT 'USD',
    source_url  TEXT,
    status      VARCHAR(20)
);
SELECT create_hypertable('price_ticks', 'time');
-- Continuous aggregate for hourly summaries
CREATE MATERIALIZED VIEW price_hourly
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 hour', time) AS hour,
    product_id,
    AVG(price) AS avg_price,
    MIN(price) AS min_price,
    MAX(price) AS max_price,
    COUNT(*) AS check_count
FROM price_ticks
WHERE status = 'success'
GROUP BY hour, product_id;
-- Retention policy: keep raw ticks for 30 days
SELECT add_retention_policy('price_ticks', INTERVAL '30 days');
-- Keep hourly aggregates for 1 year
SELECT add_retention_policy('price_hourly', INTERVAL '365 days');

スケールの検討

  • 横の労働者のスケーリング: 複数のマシンにワーカーを追加し、同じRedisキューから各プルします。 調整は必要ありません。キューは配布を処理します。
  • 優先順位ベースの回転: プロキシの予算が限られる場合、自動的に重要な項目のための実時間適用範囲を維持している間低優先性プロダクトのための点検頻度を減らして下さい。
  • 適応間隔: 製品の価格は24時間安定している場合は、チェック間隔を増加させます。 1時間に2回変更した場合、それを減らす。
  • サイト固有の通貨: 異なるターゲットサイトには異なる許容値があります。 Shopify(more permissive)とAmazon(more積極的な検出)の少ないコンカレントワーズを実行します。

高周波監視をサポートするプロキシ戦略の詳細については、ガイドをご覧ください ウェブスクレイピングに最適なプロキシ そして、 ProxyHatの料金プラン 大量利用のため。

キーテイクアウト

  • リアルタイム監視では、価格変化の検出を時間から分まで短縮し、リプライや競争力のある応答に不可欠です。
  • 優先キューを使用して、長尾をカバーしながら、高値の製品にリソースを集中させます。
  • 同時プロキシ接続を備えたワーカープールは、バーストパターンなしでスループットを提供します。
  • 検出エンジンのフィルタノイズの変更 — 実際の価格変更の処理とアラートのみ。
  • コスト管理のための保持ポリシーを使用して、時間系列データベース(TimescaleDB)に生データを保存します。
  • 安定した状態の回転が付いている住宅のプロキシは連続的な監視のために必要です。 まずは プロキシハート 信頼できるアクセスのため。

リアルタイムインフラの構築 お問い合わせ eコマーススクレイピングガイド 完全な戦略のためおよびガイドを点検して下さい Pythonでプロキシを使う そして、 ノード.js 実装の詳細

始める準備はできましたか?

AIフィルタリングで148か国以上、5,000万以上のレジデンシャルIPにアクセス。

料金を見るレジデンシャルプロキシ
← ブログに戻る