なぜスクレイピングインフラストラクチャは、専用のアーキテクチャを必要とするのか
1つのウェブサイトをヒットした単一スレッドスクリプトは、小さなタスクのためにうまく機能します。 しかし、何千ものターゲットを横断して毎日数千ページをスクレイピングする必要がある場合は、スクリプトはボトルネックになります。 インフラのスクレイピング リニアスクリプトから配布、キューベースのアーキテクチャーに移動して、障害を優雅に処理し、プロキシの回転を管理し、スループットを最大化する必要があります。
このガイドでは、アーキテクチャのパターン、キューシステム、水平スケーリング戦略、およびスケールで生産グレードのスクレイピングを出力するプロキシ管理技術について説明します。
この記事は、私たちのコンセプトに基づいて構築します ウェブスクレイピングプロキシの完全なガイド. プロキシプールサイジングのため、参照して下さい あなたがスクレイピングのために必要とする多くのプロキシは?
スケーラブルスクレイピングのための建築パターン
パターン1:キューベーススクレイピング
スケーラブルスクレイピングの基礎は、 メッセージキュー データのフェッチから URL の検出をデカップリングします。 ワーカーは、キューからタスクをプルし、プロキシを通してページをフェッチし、結果をストレージにプッシュします。
# Architecture overview:
#
# URL Source → [Message Queue] → Worker 1 → [Results Store]
# → Worker 2 →
# → Worker N →
# ↓
# [Dead Letter Queue]
# (failed requests)このパターンの利点:
- 横のスケーリング: システムを変更せずに労働者を追加または削除
- 欠陥の許容: 失敗したタスクは、再試行のキューに戻る
- 率制御: 作業者のカウントを調整して、全体的なスループットをコントロール
- 可視性: キュー深さは、バックログを示しています。 完了率は健康を示しています
Redis QueueによるPythonの実装
import redis
import requests
import json
import time
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor
PROXY = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
REDIS_URL = "redis://localhost:6379/0"
@dataclass
class ScrapeTask:
url: str
target: str
priority: int = 0
retries: int = 0
max_retries: int = 3
class ScrapingQueue:
def __init__(self, redis_url: str):
self.redis = redis.from_url(redis_url)
self.queue_key = "scrape:tasks"
self.results_key = "scrape:results"
self.dlq_key = "scrape:dead_letter"
def enqueue(self, task: ScrapeTask):
self.redis.lpush(self.queue_key, json.dumps(task.__dict__))
def dequeue(self, timeout: int = 5) -> ScrapeTask | None:
result = self.redis.brpop(self.queue_key, timeout=timeout)
if result:
data = json.loads(result[1])
return ScrapeTask(**data)
return None
def store_result(self, url: str, data: dict):
self.redis.hset(self.results_key, url, json.dumps(data))
def send_to_dlq(self, task: ScrapeTask, error: str):
task_data = task.__dict__
task_data["error"] = error
self.redis.lpush(self.dlq_key, json.dumps(task_data))
@property
def pending_count(self) -> int:
return self.redis.llen(self.queue_key)
class ScrapingWorker:
def __init__(self, queue: ScrapingQueue, worker_id: int):
self.queue = queue
self.worker_id = worker_id
self.session = requests.Session()
self.session.proxies = {"http": PROXY, "https": PROXY}
def process_task(self, task: ScrapeTask) -> bool:
try:
resp = self.session.get(task.url, timeout=30)
if resp.status_code == 200:
self.queue.store_result(task.url, {
"status": 200,
"body": resp.text[:10000], # Truncate for storage
"target": task.target,
})
return True
elif resp.status_code in [429, 503]:
# Retry with backoff
if task.retries < task.max_retries:
task.retries += 1
time.sleep(2 ** task.retries)
self.queue.enqueue(task)
else:
self.queue.send_to_dlq(task, f"HTTP {resp.status_code}")
else:
self.queue.send_to_dlq(task, f"HTTP {resp.status_code}")
except Exception as e:
if task.retries < task.max_retries:
task.retries += 1
self.queue.enqueue(task)
else:
self.queue.send_to_dlq(task, str(e))
return False
def run(self):
print(f"Worker {self.worker_id} started")
while True:
task = self.queue.dequeue(timeout=5)
if task:
self.process_task(task)
# Launch multiple workers
queue = ScrapingQueue(REDIS_URL)
# Enqueue tasks
for i in range(10000):
queue.enqueue(ScrapeTask(
url=f"https://example.com/product/{i}",
target="example.com"
))
# Start 10 workers
with ThreadPoolExecutor(max_workers=10) as executor:
workers = [ScrapingWorker(queue, i) for i in range(10)]
for worker in workers:
executor.submit(worker.run)Bull QueueによるNode.js実装
const Queue = require('bull');
const HttpsProxyAgent = require('https-proxy-agent');
const fetch = require('node-fetch');
const agent = new HttpsProxyAgent('http://USERNAME:PASSWORD@gate.proxyhat.com:8080');
// Create queue with Redis backend
const scrapeQueue = new Queue('scraping', 'redis://localhost:6379');
// Define the worker processor
scrapeQueue.process(10, async (job) => { // 10 concurrent workers
const { url, target } = job.data;
try {
const res = await fetch(url, { agent, timeout: 30000 });
if (res.ok) {
const body = await res.text();
return { url, status: res.status, body: body.slice(0, 10000) };
}
if (res.status === 429 || res.status === 503) {
throw new Error(`Rate limited: HTTP ${res.status}`);
}
return { url, status: res.status, body: null };
} catch (err) {
throw err; // Bull will retry based on job options
}
});
// Enqueue tasks with retry options
async function enqueueTasks(urls, target) {
for (const url of urls) {
await scrapeQueue.add(
{ url, target },
{
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
removeOnComplete: 1000,
removeOnFail: false,
}
);
}
}
// Monitor progress
scrapeQueue.on('completed', (job, result) => {
console.log(`Done: ${result.url} (${result.status})`);
});
scrapeQueue.on('failed', (job, err) => {
console.error(`Failed: ${job.data.url} - ${err.message}`);
});パターン2:パイプラインアーキテクチャ
複雑なスクレイピングワークフローでは、 パイプライン 各ステージが異なる懸念を処理する場所:
# Pipeline stages:
#
# Stage 1: URL Discovery → finds pages to scrape
# Stage 2: Content Fetching → downloads pages via proxies
# Stage 3: Data Extraction → parses HTML, extracts data
# Stage 4: Data Validation → checks quality, deduplicates
# Stage 5: Storage → saves to database/warehouse導入事例
package main
import (
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"sync"
"time"
)
type ScrapeResult struct {
URL string `json:"url"`
Status int `json:"status"`
Body string `json:"body"`
}
func fetcher(urls <-chan string, results chan<- ScrapeResult, wg *sync.WaitGroup) {
defer wg.Done()
proxyURL, _ := url.Parse("http://USERNAME:PASSWORD@gate.proxyhat.com:8080")
client := &http.Client{
Transport: &http.Transport{Proxy: http.ProxyURL(proxyURL)},
Timeout: 30 * time.Second,
}
for u := range urls {
resp, err := client.Get(u)
if err != nil {
results <- ScrapeResult{URL: u, Status: 0}
continue
}
body, _ := io.ReadAll(resp.Body)
resp.Body.Close()
results <- ScrapeResult{URL: u, Status: resp.StatusCode, Body: string(body)}
}
}
func processor(results <-chan ScrapeResult, done chan<- bool) {
for result := range results {
if result.Status == 200 {
// Extract and store data
data, _ := json.Marshal(result)
fmt.Printf("Processed: %s (%d bytes)\n", result.URL, len(data))
}
}
done <- true
}
func main() {
urls := make(chan string, 1000)
results := make(chan ScrapeResult, 1000)
done := make(chan bool)
// Start 20 fetcher workers
var wg sync.WaitGroup
for i := 0; i < 20; i++ {
wg.Add(1)
go fetcher(urls, results, &wg)
}
// Start processor
go processor(results, done)
// Feed URLs
go func() {
for i := 0; i < 10000; i++ {
urls <- fmt.Sprintf("https://example.com/product/%d", i)
}
close(urls)
}()
// Wait for all fetchers, then close results
wg.Wait()
close(results)
<-done
}水平スケーリング戦略
戦略1:マルチマシン展開
複数の機械を渡る蒸溜器。 キューは、コーディネートポイントとして機能します。
| コンポーネント | 導入事例 | スケーリング |
|---|---|---|
| キュー(Redis/RabbitMQ) | 専用サーバーまたは管理サービス | 縦(より多くのRAM) |
| ワーカー | 複数の機械か容器 | 水平(インスタンスを追加) |
| 結果のストレージ | データベースまたはオブジェクトストア | 縦 + シャーディング |
| モニタリング | 集中ダッシュボード | 単一インスタンス |
戦略2:コンテナベースのスケーリング
Docker と Kubernetes を使用して、弾性スケーリングを行います。 各ワーカーは、レプリカできるコンテナで実行します。
# docker-compose.yml for scraping workers
version: '3.8'
services:
redis:
image: redis:7-alpine
ports:
- "6379:6379"
scraper-worker:
build: .
environment:
- PROXY_URL=http://USERNAME:PASSWORD@gate.proxyhat.com:8080
- REDIS_URL=redis://redis:6379/0
- WORKER_CONCURRENCY=10
deploy:
replicas: 5 # 5 containers × 10 concurrency = 50 parallel requests
resources:
limits:
memory: 512M
cpus: '0.5'
depends_on:
- redisスケールでのプロキシ管理
スケールでは、プロキシ管理が重要なシステムコンポーネントになります。 主な検討:
接続プール
リクエストごとに新しいものを作成する代わりにプロキシゲートウェイへの接続を再利用します。 これはレイテンシと接続オーバーヘッドを削減します。
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_optimized_session() -> requests.Session:
"""Create a session with connection pooling and retry logic."""
session = requests.Session()
# Connection pool: keep 20 connections, max 50
adapter = HTTPAdapter(
pool_connections=20,
pool_maxsize=50,
max_retries=Retry(
total=3,
backoff_factor=1,
status_forcelist=[500, 502, 503],
)
)
session.mount("http://", adapter)
session.mount("https://", adapter)
session.proxies = {
"http": "http://USERNAME:PASSWORD@gate.proxyhat.com:8080",
"https": "http://USERNAME:PASSWORD@gate.proxyhat.com:8080",
}
return session
# Reuse across many requests
session = create_optimized_session()
for url in urls:
resp = session.get(url, timeout=30)健康モニタリング
プロキシのパフォーマンスをリアルタイムで監視し、問題を早期に検出します。
import time
from collections import defaultdict
from dataclasses import dataclass, field
@dataclass
class ProxyMetrics:
"""Track proxy health metrics for monitoring."""
requests_total: int = 0
requests_success: int = 0
requests_blocked: int = 0
requests_timeout: int = 0
latency_samples: list = field(default_factory=list)
status_codes: dict = field(default_factory=lambda: defaultdict(int))
def record_request(self, status_code: int, latency_ms: float):
self.requests_total += 1
self.status_codes[status_code] += 1
self.latency_samples.append(latency_ms)
if status_code == 200:
self.requests_success += 1
elif status_code in [403, 429]:
self.requests_blocked += 1
# Keep only last 1000 samples
if len(self.latency_samples) > 1000:
self.latency_samples = self.latency_samples[-1000:]
@property
def success_rate(self) -> float:
return self.requests_success / self.requests_total if self.requests_total else 0
@property
def avg_latency(self) -> float:
return sum(self.latency_samples) / len(self.latency_samples) if self.latency_samples else 0
def report(self) -> str:
return (
f"Total: {self.requests_total}, "
f"Success: {self.success_rate:.1%}, "
f"Blocked: {self.requests_blocked}, "
f"Avg Latency: {self.avg_latency:.0f}ms"
)スケールのデータストレージ
| 貯蔵のタイプ | 最高ののための | スケール |
|---|---|---|
| PostgreSQLについて | 構造物製品/優先データ | 数百万行 |
| モンゴDB | 半構造/可変的なスキーマ | 書類の請求 |
| S3/オブジェクトストレージ | 生のHTMLアーカイブ | ペタバイト |
| 研究開発 | スクラップされたデータに対する全文検索 | 書類の請求 |
| クリックハウス | 大量のデータセットに対する分析 | 行のトリリオン |
スケールチェックリスト
- fetching からの URL の発見を宣言します。 ステージ間のメッセージキューを使用します。
- 適切な再試行ロジックを実行します。 永続的な失敗のための死んだ手紙の列が付いている指数関数的なバックオフ。
- すべてを監視します。 キュー深さ、成功率、遅延、ターゲットドメインごとのエラー率。
- 接続プールを使用します。 リクエストごとに新しいものを作成する代わりにプロキシ接続を再利用します。
- 失敗の計画。 ワーカーのクラッシュ、プロキシがブロックされ、ターゲットは構造を変えます。 あらゆる層にレジリエンスを築きます。
- 起動前にスケールでテストします。 メモリ、接続制限、またはキューボトルネックにより、100 RPMで動作するシステムが10,000 RPMで失敗することがあります。
スケーリングアーキテクチャを補完するプロキシの回転戦略については、 大規模なスクレイピングのためのプロキシ回転戦略. 速度制限をスケールで処理するには、 スクレイピング率は説明を制限します. .
利用する Python SDK, ノードSDKまたは SDKについて 生産のプロキシ統合のために、および探検して下さい ProxyHatプラン 大量のスクレーピングに。
よくある質問
スケールでスクレイピングするのに最適なキューシステムは何ですか?
ブル (Node.js) または RQ (Python) の Redis は、1 日あたりの数百万のタスクまで動作します。 より大きいスケールのために、Apache Kafka か RabbitMQ はよりよい耐久性およびスループットを提供します。 既存のインフラとチームの専門知識に基づいて選択してください。
実行するコンカレントワーカーは何人ですか?
プロキシ容量とターゲットサイトの公差に基づいて10-20人の労働者とスケールで始まります。 成功率を監視 — 90% 以下にドロップすると、より多くの労働者を追加する前に、通貨を削減します。 ProxyHat による各ワーカーは自動 IP の回転を取得します。
ワーカーに非同期またはスレッドを使用する必要がありますか?
I/O-boundスクレイピング(ほとんどの場合)の場合、非同期(Python asyncio、Node.js)は、スレッドよりも優れたリソース効率を提供します。 フェッチと一緒にCPUヘビーパッシングを必要とする場合にのみ、スレッドまたはマルチプロセッシングを使用します。 両方のパターンでgoroutines をエクセルに行きます。
ターゲットサイト構造の変更を処理する方法は?
パイプライン内のデータ検証を実行します。 パースされたデータがバリデーション(フィールドの省略、間違ったタイプ)に失敗すると、チームとキューの影響を受けた URL をアラートして、更新されたパーサで再処理します。 パーサをバージョンアップして、必要に応じてロールバックできます。






