Pourquoi l'infrastructure de scraping nécessite une architecture dédiée
Un script à un seul trait frappant un site Web fonctionne bien pour les petites tâches. Mais quand vous avez besoin de gratter des millions de pages quotidiennement sur des dizaines de cibles, ce script devient un goulot d'étranglement. Élargissement des infrastructures de démolition nécessite de passer de scripts linéaires à des architectures distribuées, basées sur la file d'attente qui traitent les échecs gracieusement, gèrent la rotation de proxy et maximisent le débit.
Ce guide couvre les modèles d'architecture, les systèmes de file d'attente, les stratégies d'échelle horizontale et les techniques de gestion par procuration qui alimentent à l'échelle le grattage de qualité de production.
Cet article s'appuie sur les concepts de notre Guide complet des produits de scraping Web. Pour le dimensionnement du pool de proxy, voir Combien de proxies avez-vous besoin pour le scraping?
Patterns d'architecture pour le scraping évolutive
Modèle 1: Scraping en file d'attente
La base de la grattage évolutive est une file d'attente des messages qui découple la découverte d'URL de la récupération de données. Les travailleurs tirent les tâches de la file d'attente, récupèrent les pages à travers les proxies, et poussent les résultats au stockage.
# Architecture overview:
#
# URL Source → [Message Queue] → Worker 1 → [Results Store]
# → Worker 2 →
# → Worker N →
# ↓
# [Dead Letter Queue]
# (failed requests)Avantages de ce modèle :
- Échelle horizontale: Ajouter ou supprimer des travailleurs sans modifier le système
- Tolérance aux défauts: Les tâches échouées reviennent à la file d'attente pour réessayer
- Contrôle des taux: Ajuster le nombre de travailleurs pour contrôler le débit global
- Visibilité: La profondeur de la file montre l'arriéré; le taux d'achèvement montre la santé
Mise en œuvre de Python avec Redis Queue
import redis
import requests
import json
import time
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor
PROXY = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
REDIS_URL = "redis://localhost:6379/0"
@dataclass
class ScrapeTask:
url: str
target: str
priority: int = 0
retries: int = 0
max_retries: int = 3
class ScrapingQueue:
def __init__(self, redis_url: str):
self.redis = redis.from_url(redis_url)
self.queue_key = "scrape:tasks"
self.results_key = "scrape:results"
self.dlq_key = "scrape:dead_letter"
def enqueue(self, task: ScrapeTask):
self.redis.lpush(self.queue_key, json.dumps(task.__dict__))
def dequeue(self, timeout: int = 5) -> ScrapeTask | None:
result = self.redis.brpop(self.queue_key, timeout=timeout)
if result:
data = json.loads(result[1])
return ScrapeTask(**data)
return None
def store_result(self, url: str, data: dict):
self.redis.hset(self.results_key, url, json.dumps(data))
def send_to_dlq(self, task: ScrapeTask, error: str):
task_data = task.__dict__
task_data["error"] = error
self.redis.lpush(self.dlq_key, json.dumps(task_data))
@property
def pending_count(self) -> int:
return self.redis.llen(self.queue_key)
class ScrapingWorker:
def __init__(self, queue: ScrapingQueue, worker_id: int):
self.queue = queue
self.worker_id = worker_id
self.session = requests.Session()
self.session.proxies = {"http": PROXY, "https": PROXY}
def process_task(self, task: ScrapeTask) -> bool:
try:
resp = self.session.get(task.url, timeout=30)
if resp.status_code == 200:
self.queue.store_result(task.url, {
"status": 200,
"body": resp.text[:10000], # Truncate for storage
"target": task.target,
})
return True
elif resp.status_code in [429, 503]:
# Retry with backoff
if task.retries < task.max_retries:
task.retries += 1
time.sleep(2 ** task.retries)
self.queue.enqueue(task)
else:
self.queue.send_to_dlq(task, f"HTTP {resp.status_code}")
else:
self.queue.send_to_dlq(task, f"HTTP {resp.status_code}")
except Exception as e:
if task.retries < task.max_retries:
task.retries += 1
self.queue.enqueue(task)
else:
self.queue.send_to_dlq(task, str(e))
return False
def run(self):
print(f"Worker {self.worker_id} started")
while True:
task = self.queue.dequeue(timeout=5)
if task:
self.process_task(task)
# Launch multiple workers
queue = ScrapingQueue(REDIS_URL)
# Enqueue tasks
for i in range(10000):
queue.enqueue(ScrapeTask(
url=f"https://example.com/product/{i}",
target="example.com"
))
# Start 10 workers
with ThreadPoolExecutor(max_workers=10) as executor:
workers = [ScrapingWorker(queue, i) for i in range(10)]
for worker in workers:
executor.submit(worker.run)Mise en œuvre de Node.js avec Bull Queue
const Queue = require('bull');
const HttpsProxyAgent = require('https-proxy-agent');
const fetch = require('node-fetch');
const agent = new HttpsProxyAgent('http://USERNAME:PASSWORD@gate.proxyhat.com:8080');
// Create queue with Redis backend
const scrapeQueue = new Queue('scraping', 'redis://localhost:6379');
// Define the worker processor
scrapeQueue.process(10, async (job) => { // 10 concurrent workers
const { url, target } = job.data;
try {
const res = await fetch(url, { agent, timeout: 30000 });
if (res.ok) {
const body = await res.text();
return { url, status: res.status, body: body.slice(0, 10000) };
}
if (res.status === 429 || res.status === 503) {
throw new Error(`Rate limited: HTTP ${res.status}`);
}
return { url, status: res.status, body: null };
} catch (err) {
throw err; // Bull will retry based on job options
}
});
// Enqueue tasks with retry options
async function enqueueTasks(urls, target) {
for (const url of urls) {
await scrapeQueue.add(
{ url, target },
{
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
removeOnComplete: 1000,
removeOnFail: false,
}
);
}
}
// Monitor progress
scrapeQueue.on('completed', (job, result) => {
console.log(`Done: ${result.url} (${result.status})`);
});
scrapeQueue.on('failed', (job, err) => {
console.error(`Failed: ${job.data.url} - ${err.message}`);
});Modèle 2 : Architecture des pipelines
Pour des flux de travail complexes, utilisez une pipeline où chaque étape traite une préoccupation différente:
# Pipeline stages:
#
# Stage 1: URL Discovery → finds pages to scrape
# Stage 2: Content Fetching → downloads pages via proxies
# Stage 3: Data Extraction → parses HTML, extracts data
# Stage 4: Data Validation → checks quality, deduplicates
# Stage 5: Storage → saves to database/warehouseAller à la mise en œuvre
package main
import (
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"sync"
"time"
)
type ScrapeResult struct {
URL string `json:"url"`
Status int `json:"status"`
Body string `json:"body"`
}
func fetcher(urls <-chan string, results chan<- ScrapeResult, wg *sync.WaitGroup) {
defer wg.Done()
proxyURL, _ := url.Parse("http://USERNAME:PASSWORD@gate.proxyhat.com:8080")
client := &http.Client{
Transport: &http.Transport{Proxy: http.ProxyURL(proxyURL)},
Timeout: 30 * time.Second,
}
for u := range urls {
resp, err := client.Get(u)
if err != nil {
results <- ScrapeResult{URL: u, Status: 0}
continue
}
body, _ := io.ReadAll(resp.Body)
resp.Body.Close()
results <- ScrapeResult{URL: u, Status: resp.StatusCode, Body: string(body)}
}
}
func processor(results <-chan ScrapeResult, done chan<- bool) {
for result := range results {
if result.Status == 200 {
// Extract and store data
data, _ := json.Marshal(result)
fmt.Printf("Processed: %s (%d bytes)\n", result.URL, len(data))
}
}
done <- true
}
func main() {
urls := make(chan string, 1000)
results := make(chan ScrapeResult, 1000)
done := make(chan bool)
// Start 20 fetcher workers
var wg sync.WaitGroup
for i := 0; i < 20; i++ {
wg.Add(1)
go fetcher(urls, results, &wg)
}
// Start processor
go processor(results, done)
// Feed URLs
go func() {
for i := 0; i < 10000; i++ {
urls <- fmt.Sprintf("https://example.com/product/%d", i)
}
close(urls)
}()
// Wait for all fetchers, then close results
wg.Wait()
close(results)
<-done
}Stratégies horizontales de calibrage
Stratégie 1 : Déploiement multimachines
Distribuer les travailleurs sur plusieurs machines. La file d'attente sert de point de coordination:
| Composante | Déploiement | Élargissement |
|---|---|---|
| Requête (Redis/RabbitMQ) | Serveur dédié ou service géré | Vertical (plus de RAM) |
| Travailleurs | Machines ou récipients multiples | Horizontale (ajouter les instances) |
| Stockage des résultats | Base de données ou magasin d'objets | Vertical + ardeur |
| Surveillance | Tableau de bord centralisé | Une seule instance |
Stratégie 2 : Élargissement axé sur les conteneurs
Utilisez Docker et Kubernetes pour l'échelle élastique. Chaque travailleur court dans un conteneur qui peut être reproduit:
# docker-compose.yml for scraping workers
version: '3.8'
services:
redis:
image: redis:7-alpine
ports:
- "6379:6379"
scraper-worker:
build: .
environment:
- PROXY_URL=http://USERNAME:PASSWORD@gate.proxyhat.com:8080
- REDIS_URL=redis://redis:6379/0
- WORKER_CONCURRENCY=10
deploy:
replicas: 5 # 5 containers × 10 concurrency = 50 parallel requests
resources:
limits:
memory: 512M
cpus: '0.5'
depends_on:
- redisGestion du mandataire à l'échelle
À l'échelle, la gestion par procuration devient une composante essentielle du système. Principales considérations:
Mise en commun des connexions
Réutiliser les connexions à la passerelle proxy au lieu de créer de nouvelles connexions par requête. Cela réduit les frais de latence et de connexion:
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_optimized_session() -> requests.Session:
"""Create a session with connection pooling and retry logic."""
session = requests.Session()
# Connection pool: keep 20 connections, max 50
adapter = HTTPAdapter(
pool_connections=20,
pool_maxsize=50,
max_retries=Retry(
total=3,
backoff_factor=1,
status_forcelist=[500, 502, 503],
)
)
session.mount("http://", adapter)
session.mount("https://", adapter)
session.proxies = {
"http": "http://USERNAME:PASSWORD@gate.proxyhat.com:8080",
"https": "http://USERNAME:PASSWORD@gate.proxyhat.com:8080",
}
return session
# Reuse across many requests
session = create_optimized_session()
for url in urls:
resp = session.get(url, timeout=30)Surveillance de la santé
Surveillez vos performances proxy en temps réel pour détecter les problèmes rapidement :
import time
from collections import defaultdict
from dataclasses import dataclass, field
@dataclass
class ProxyMetrics:
"""Track proxy health metrics for monitoring."""
requests_total: int = 0
requests_success: int = 0
requests_blocked: int = 0
requests_timeout: int = 0
latency_samples: list = field(default_factory=list)
status_codes: dict = field(default_factory=lambda: defaultdict(int))
def record_request(self, status_code: int, latency_ms: float):
self.requests_total += 1
self.status_codes[status_code] += 1
self.latency_samples.append(latency_ms)
if status_code == 200:
self.requests_success += 1
elif status_code in [403, 429]:
self.requests_blocked += 1
# Keep only last 1000 samples
if len(self.latency_samples) > 1000:
self.latency_samples = self.latency_samples[-1000:]
@property
def success_rate(self) -> float:
return self.requests_success / self.requests_total if self.requests_total else 0
@property
def avg_latency(self) -> float:
return sum(self.latency_samples) / len(self.latency_samples) if self.latency_samples else 0
def report(self) -> str:
return (
f"Total: {self.requests_total}, "
f"Success: {self.success_rate:.1%}, "
f"Blocked: {self.requests_blocked}, "
f"Avg Latency: {self.avg_latency:.0f}ms"
)Stockage des données à l'échelle
| Type de stockage | Meilleur pour | Échelle |
|---|---|---|
| PostgreSQL | Données sur les produits structurés/prix | Millions de lignes |
| MangoDB | Schémas semi-structurés/variables | Des milliards de documents |
| Stockage S3/Object | Archives HTML brutes | Pétaoctets |
| Recherche élastique | Recherche en texte intégral sur les données supprimées | Des milliards de documents |
| ClickHouse | Analyse sur les grands ensembles de données | Trillions de lignes |
Liste de contrôle pour l'élargissement
- Découpler la découverte d'URL de la récupération. Utilisez une file d'attente entre les étapes.
- Mettre en œuvre une logique de réessayer appropriée. Défaut exponentiel avec des files d'attente de lettres mortes pour des échecs persistants.
- Surveille tout. Queue profondeur, taux de réussite, latence, taux d'erreur par domaine cible.
- Utilisez la piscine de connexion. Réutiliser les connexions proxy au lieu de créer de nouvelles par requête.
- Prévoyez un échec. Les travailleurs s'écrasent, les proxies sont bloqués, les cibles changent leur structure. Construisez la résilience dans chaque couche.
- Tester à l'échelle avant le lancement. Un système qui fonctionne à 100 RPM peut échouer à 10 000 RPM en raison de la mémoire, des limites de connexion ou des goulets d'étranglement de la file d'attente.
Pour les stratégies de rotation par procuration qui complètent votre architecture d'échelle, lire Stratégies de rotation par procuration pour le scrapage à grande échelle. Pour gérer les limites de taux en fonction de l'échelle, voir Limites de vitesse expliquées.
Utiliser Python SDK, Numéro SDKou Allez au SDK pour l'intégration de proxy de production, et explorer ProxyHat plans pour le grattage en grand volume.
Foire aux questions
Quel système de file d'attente est le meilleur pour le grattage à l'échelle?
Redis avec Bull (Node.js) ou RQ (Python) fonctionne bien jusqu'à des millions de tâches par jour. Pour une plus grande échelle, Apache Kafka ou RabbitMQ offre une meilleure durabilité et un meilleur débit. Choisissez en fonction de votre infrastructure existante et de votre expertise d'équipe.
Combien de travailleurs doivent-ils courir ?
Commencez par 10-20 travailleurs et échelle en fonction de votre capacité de proxy et de la tolérance au site cible. Surveiller les taux de réussite — s'ils baissent au-dessous de 90 %, réduire la convergence avant d'ajouter plus de travailleurs. Chaque travailleur par ProxyHat obtient une rotation IP automatique.
Devrais-je utiliser l'async ou le filetage pour les travailleurs?
Pour le grattage lié aux E/S (la plupart des cas), async (Python asyncio, Node.js) offre une meilleure efficacité des ressources que le filetage. Utilisez le filetage ou le multitraitement seulement lorsque vous avez besoin de l'analyse CPU lourde à côté de la récupération. Go Goroutines excelle dans les deux modèles.
Comment gérer les changements de structure du site cible?
Implémentez la validation des données dans votre pipeline. Lorsque les données parsed échouent la validation (champs manquants, mauvais types), alertez votre équipe et les URLs affectées par la file d'attente pour le re-traitement avec des analyseurs mis à jour. Version de vos analyseurs afin que vous puissiez revenir en arrière si nécessaire.






