スクレイピングインフラのスケーリング方法

ウェブスクレイピングのスケーリングのためのアーキテクチャパターン:キューベースシステム、パイプライン設計、コンテナと水平スケーリング、およびスケールでのプロキシ管理。 Python、Node.js、Goのコード.

スクレイピングインフラのスケーリング方法

なぜスクレイピングインフラストラクチャは、専用のアーキテクチャを必要とするのか

1つのウェブサイトをヒットした単一スレッドスクリプトは、小さなタスクのためにうまく機能します。 しかし、何千ものターゲットを横断して毎日数千ページをスクレイピングする必要がある場合は、スクリプトはボトルネックになります。 インフラのスクレイピング リニアスクリプトから配布、キューベースのアーキテクチャーに移動して、障害を優雅に処理し、プロキシの回転を管理し、スループットを最大化する必要があります。

このガイドでは、アーキテクチャのパターン、キューシステム、水平スケーリング戦略、およびスケールで生産グレードのスクレイピングを出力するプロキシ管理技術について説明します。

この記事は、私たちのコンセプトに基づいて構築します ウェブスクレイピングプロキシの完全なガイド. プロキシプールサイジングのため、参照して下さい あなたがスクレイピングのために必要とする多くのプロキシは?

スケーラブルスクレイピングのための建築パターン

パターン1:キューベーススクレイピング

スケーラブルスクレイピングの基礎は、 メッセージキュー データのフェッチから URL の検出をデカップリングします。 ワーカーは、キューからタスクをプルし、プロキシを通してページをフェッチし、結果をストレージにプッシュします。

# Architecture overview:
#
# URL Source → [Message Queue] → Worker 1 → [Results Store]
#                               → Worker 2 →
#                               → Worker N →
#                               ↓
#                        [Dead Letter Queue]
#                        (failed requests)

このパターンの利点:

  • 横のスケーリング: システムを変更せずに労働者を追加または削除
  • 欠陥の許容: 失敗したタスクは、再試行のキューに戻る
  • 率制御: 作業者のカウントを調整して、全体的なスループットをコントロール
  • 可視性: キュー深さは、バックログを示しています。 完了率は健康を示しています

Redis QueueによるPythonの実装

import redis
import requests
import json
import time
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor
PROXY = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
REDIS_URL = "redis://localhost:6379/0"
@dataclass
class ScrapeTask:
    url: str
    target: str
    priority: int = 0
    retries: int = 0
    max_retries: int = 3
class ScrapingQueue:
    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url)
        self.queue_key = "scrape:tasks"
        self.results_key = "scrape:results"
        self.dlq_key = "scrape:dead_letter"
    def enqueue(self, task: ScrapeTask):
        self.redis.lpush(self.queue_key, json.dumps(task.__dict__))
    def dequeue(self, timeout: int = 5) -> ScrapeTask | None:
        result = self.redis.brpop(self.queue_key, timeout=timeout)
        if result:
            data = json.loads(result[1])
            return ScrapeTask(**data)
        return None
    def store_result(self, url: str, data: dict):
        self.redis.hset(self.results_key, url, json.dumps(data))
    def send_to_dlq(self, task: ScrapeTask, error: str):
        task_data = task.__dict__
        task_data["error"] = error
        self.redis.lpush(self.dlq_key, json.dumps(task_data))
    @property
    def pending_count(self) -> int:
        return self.redis.llen(self.queue_key)
class ScrapingWorker:
    def __init__(self, queue: ScrapingQueue, worker_id: int):
        self.queue = queue
        self.worker_id = worker_id
        self.session = requests.Session()
        self.session.proxies = {"http": PROXY, "https": PROXY}
    def process_task(self, task: ScrapeTask) -> bool:
        try:
            resp = self.session.get(task.url, timeout=30)
            if resp.status_code == 200:
                self.queue.store_result(task.url, {
                    "status": 200,
                    "body": resp.text[:10000],  # Truncate for storage
                    "target": task.target,
                })
                return True
            elif resp.status_code in [429, 503]:
                # Retry with backoff
                if task.retries < task.max_retries:
                    task.retries += 1
                    time.sleep(2 ** task.retries)
                    self.queue.enqueue(task)
                else:
                    self.queue.send_to_dlq(task, f"HTTP {resp.status_code}")
            else:
                self.queue.send_to_dlq(task, f"HTTP {resp.status_code}")
        except Exception as e:
            if task.retries < task.max_retries:
                task.retries += 1
                self.queue.enqueue(task)
            else:
                self.queue.send_to_dlq(task, str(e))
        return False
    def run(self):
        print(f"Worker {self.worker_id} started")
        while True:
            task = self.queue.dequeue(timeout=5)
            if task:
                self.process_task(task)
# Launch multiple workers
queue = ScrapingQueue(REDIS_URL)
# Enqueue tasks
for i in range(10000):
    queue.enqueue(ScrapeTask(
        url=f"https://example.com/product/{i}",
        target="example.com"
    ))
# Start 10 workers
with ThreadPoolExecutor(max_workers=10) as executor:
    workers = [ScrapingWorker(queue, i) for i in range(10)]
    for worker in workers:
        executor.submit(worker.run)

Bull QueueによるNode.js実装

const Queue = require('bull');
const HttpsProxyAgent = require('https-proxy-agent');
const fetch = require('node-fetch');
const agent = new HttpsProxyAgent('http://USERNAME:PASSWORD@gate.proxyhat.com:8080');
// Create queue with Redis backend
const scrapeQueue = new Queue('scraping', 'redis://localhost:6379');
// Define the worker processor
scrapeQueue.process(10, async (job) => {  // 10 concurrent workers
  const { url, target } = job.data;
  try {
    const res = await fetch(url, { agent, timeout: 30000 });
    if (res.ok) {
      const body = await res.text();
      return { url, status: res.status, body: body.slice(0, 10000) };
    }
    if (res.status === 429 || res.status === 503) {
      throw new Error(`Rate limited: HTTP ${res.status}`);
    }
    return { url, status: res.status, body: null };
  } catch (err) {
    throw err; // Bull will retry based on job options
  }
});
// Enqueue tasks with retry options
async function enqueueTasks(urls, target) {
  for (const url of urls) {
    await scrapeQueue.add(
      { url, target },
      {
        attempts: 3,
        backoff: { type: 'exponential', delay: 2000 },
        removeOnComplete: 1000,
        removeOnFail: false,
      }
    );
  }
}
// Monitor progress
scrapeQueue.on('completed', (job, result) => {
  console.log(`Done: ${result.url} (${result.status})`);
});
scrapeQueue.on('failed', (job, err) => {
  console.error(`Failed: ${job.data.url} - ${err.message}`);
});

パターン2:パイプラインアーキテクチャ

複雑なスクレイピングワークフローでは、 パイプライン 各ステージが異なる懸念を処理する場所:

# Pipeline stages:
#
# Stage 1: URL Discovery    → finds pages to scrape
# Stage 2: Content Fetching → downloads pages via proxies
# Stage 3: Data Extraction  → parses HTML, extracts data
# Stage 4: Data Validation  → checks quality, deduplicates
# Stage 5: Storage          → saves to database/warehouse

導入事例

package main
import (
    "encoding/json"
    "fmt"
    "io"
    "net/http"
    "net/url"
    "sync"
    "time"
)
type ScrapeResult struct {
    URL    string `json:"url"`
    Status int    `json:"status"`
    Body   string `json:"body"`
}
func fetcher(urls <-chan string, results chan<- ScrapeResult, wg *sync.WaitGroup) {
    defer wg.Done()
    proxyURL, _ := url.Parse("http://USERNAME:PASSWORD@gate.proxyhat.com:8080")
    client := &http.Client{
        Transport: &http.Transport{Proxy: http.ProxyURL(proxyURL)},
        Timeout:   30 * time.Second,
    }
    for u := range urls {
        resp, err := client.Get(u)
        if err != nil {
            results <- ScrapeResult{URL: u, Status: 0}
            continue
        }
        body, _ := io.ReadAll(resp.Body)
        resp.Body.Close()
        results <- ScrapeResult{URL: u, Status: resp.StatusCode, Body: string(body)}
    }
}
func processor(results <-chan ScrapeResult, done chan<- bool) {
    for result := range results {
        if result.Status == 200 {
            // Extract and store data
            data, _ := json.Marshal(result)
            fmt.Printf("Processed: %s (%d bytes)\n", result.URL, len(data))
        }
    }
    done <- true
}
func main() {
    urls := make(chan string, 1000)
    results := make(chan ScrapeResult, 1000)
    done := make(chan bool)
    // Start 20 fetcher workers
    var wg sync.WaitGroup
    for i := 0; i < 20; i++ {
        wg.Add(1)
        go fetcher(urls, results, &wg)
    }
    // Start processor
    go processor(results, done)
    // Feed URLs
    go func() {
        for i := 0; i < 10000; i++ {
            urls <- fmt.Sprintf("https://example.com/product/%d", i)
        }
        close(urls)
    }()
    // Wait for all fetchers, then close results
    wg.Wait()
    close(results)
    <-done
}

水平スケーリング戦略

戦略1:マルチマシン展開

複数の機械を渡る蒸溜器。 キューは、コーディネートポイントとして機能します。

戦略1:マルチマシン展開
コンポーネント導入事例スケーリング
キュー(Redis/RabbitMQ)専用サーバーまたは管理サービス縦(より多くのRAM)
ワーカー複数の機械か容器水平(インスタンスを追加)
結果のストレージデータベースまたはオブジェクトストア縦 + シャーディング
モニタリング集中ダッシュボード単一インスタンス

戦略2:コンテナベースのスケーリング

Docker と Kubernetes を使用して、弾性スケーリングを行います。 各ワーカーは、レプリカできるコンテナで実行します。

# docker-compose.yml for scraping workers
version: '3.8'
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
  scraper-worker:
    build: .
    environment:
      - PROXY_URL=http://USERNAME:PASSWORD@gate.proxyhat.com:8080
      - REDIS_URL=redis://redis:6379/0
      - WORKER_CONCURRENCY=10
    deploy:
      replicas: 5        # 5 containers × 10 concurrency = 50 parallel requests
      resources:
        limits:
          memory: 512M
          cpus: '0.5'
    depends_on:
      - redis

スケールでのプロキシ管理

スケールでは、プロキシ管理が重要なシステムコンポーネントになります。 主な検討:

接続プール

リクエストごとに新しいものを作成する代わりにプロキシゲートウェイへの接続を再利用します。 これはレイテンシと接続オーバーヘッドを削減します。

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_optimized_session() -> requests.Session:
    """Create a session with connection pooling and retry logic."""
    session = requests.Session()
    # Connection pool: keep 20 connections, max 50
    adapter = HTTPAdapter(
        pool_connections=20,
        pool_maxsize=50,
        max_retries=Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[500, 502, 503],
        )
    )
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    session.proxies = {
        "http": "http://USERNAME:PASSWORD@gate.proxyhat.com:8080",
        "https": "http://USERNAME:PASSWORD@gate.proxyhat.com:8080",
    }
    return session
# Reuse across many requests
session = create_optimized_session()
for url in urls:
    resp = session.get(url, timeout=30)

健康モニタリング

プロキシのパフォーマンスをリアルタイムで監視し、問題を早期に検出します。

import time
from collections import defaultdict
from dataclasses import dataclass, field
@dataclass
class ProxyMetrics:
    """Track proxy health metrics for monitoring."""
    requests_total: int = 0
    requests_success: int = 0
    requests_blocked: int = 0
    requests_timeout: int = 0
    latency_samples: list = field(default_factory=list)
    status_codes: dict = field(default_factory=lambda: defaultdict(int))
    def record_request(self, status_code: int, latency_ms: float):
        self.requests_total += 1
        self.status_codes[status_code] += 1
        self.latency_samples.append(latency_ms)
        if status_code == 200:
            self.requests_success += 1
        elif status_code in [403, 429]:
            self.requests_blocked += 1
        # Keep only last 1000 samples
        if len(self.latency_samples) > 1000:
            self.latency_samples = self.latency_samples[-1000:]
    @property
    def success_rate(self) -> float:
        return self.requests_success / self.requests_total if self.requests_total else 0
    @property
    def avg_latency(self) -> float:
        return sum(self.latency_samples) / len(self.latency_samples) if self.latency_samples else 0
    def report(self) -> str:
        return (
            f"Total: {self.requests_total}, "
            f"Success: {self.success_rate:.1%}, "
            f"Blocked: {self.requests_blocked}, "
            f"Avg Latency: {self.avg_latency:.0f}ms"
        )

スケールのデータストレージ

スケールのデータストレージ
貯蔵のタイプ最高ののためのスケール
PostgreSQLについて構造物製品/優先データ数百万行
モンゴDB半構造/可変的なスキーマ書類の請求
S3/オブジェクトストレージ生のHTMLアーカイブペタバイト
研究開発スクラップされたデータに対する全文検索書類の請求
クリックハウス大量のデータセットに対する分析行のトリリオン

スケールチェックリスト

  • fetching からの URL の発見を宣言します。 ステージ間のメッセージキューを使用します。
  • 適切な再試行ロジックを実行します。 永続的な失敗のための死んだ手紙の列が付いている指数関数的なバックオフ。
  • すべてを監視します。 キュー深さ、成功率、遅延、ターゲットドメインごとのエラー率。
  • 接続プールを使用します。 リクエストごとに新しいものを作成する代わりにプロキシ接続を再利用します。
  • 失敗の計画。 ワーカーのクラッシュ、プロキシがブロックされ、ターゲットは構造を変えます。 あらゆる層にレジリエンスを築きます。
  • 起動前にスケールでテストします。 メモリ、接続制限、またはキューボトルネックにより、100 RPMで動作するシステムが10,000 RPMで失敗することがあります。

スケーリングアーキテクチャを補完するプロキシの回転戦略については、 大規模なスクレイピングのためのプロキシ回転戦略. 速度制限をスケールで処理するには、 スクレイピング率は説明を制限します. .

利用する Python SDK, ノードSDKまたは SDKについて 生産のプロキシ統合のために、および探検して下さい ProxyHatプラン 大量のスクレーピングに。

よくある質問

スケールでスクレイピングするのに最適なキューシステムは何ですか?

ブル (Node.js) または RQ (Python) の Redis は、1 日あたりの数百万のタスクまで動作します。 より大きいスケールのために、Apache Kafka か RabbitMQ はよりよい耐久性およびスループットを提供します。 既存のインフラとチームの専門知識に基づいて選択してください。

実行するコンカレントワーカーは何人ですか?

プロキシ容量とターゲットサイトの公差に基づいて10-20人の労働者とスケールで始まります。 成功率を監視 — 90% 以下にドロップすると、より多くの労働者を追加する前に、通貨を削減します。 ProxyHat による各ワーカーは自動 IP の回転を取得します。

ワーカーに非同期またはスレッドを使用する必要がありますか?

I/O-boundスクレイピング(ほとんどの場合)の場合、非同期(Python asyncio、Node.js)は、スレッドよりも優れたリソース効率を提供します。 フェッチと一緒にCPUヘビーパッシングを必要とする場合にのみ、スレッドまたはマルチプロセッシングを使用します。 両方のパターンでgoroutines をエクセルに行きます。

ターゲットサイト構造の変更を処理する方法は?

パイプライン内のデータ検証を実行します。 パースされたデータがバリデーション(フィールドの省略、間違ったタイプ)に失敗すると、チームとキューの影響を受けた URL をアラートして、更新されたパーサで再処理します。 パーサをバージョンアップして、必要に応じてロールバックできます。

始める準備はできましたか?

AIフィルタリングで148か国以上、5,000万以上のレジデンシャルIPにアクセス。

料金を見るレジデンシャルプロキシ
← ブログに戻る