Comment mettre à l'échelle l'infrastructure de scraping

Modèles d'architecture pour le grattage du réseau : systèmes basés sur la file d'attente, conception de pipeline, échelle horizontale avec conteneurs et gestion par procuration à l'échelle. Code en Python, Node.js et Go.

Comment mettre à l'échelle l'infrastructure de scraping

Pourquoi l'infrastructure de scraping nécessite une architecture dédiée

Un script à un seul trait frappant un site Web fonctionne bien pour les petites tâches. Mais quand vous avez besoin de gratter des millions de pages quotidiennement sur des dizaines de cibles, ce script devient un goulot d'étranglement. Élargissement des infrastructures de démolition nécessite de passer de scripts linéaires à des architectures distribuées, basées sur la file d'attente qui traitent les échecs gracieusement, gèrent la rotation de proxy et maximisent le débit.

Ce guide couvre les modèles d'architecture, les systèmes de file d'attente, les stratégies d'échelle horizontale et les techniques de gestion par procuration qui alimentent à l'échelle le grattage de qualité de production.

Cet article s'appuie sur les concepts de notre Guide complet des produits de scraping Web. Pour le dimensionnement du pool de proxy, voir Combien de proxies avez-vous besoin pour le scraping?

Patterns d'architecture pour le scraping évolutive

Modèle 1: Scraping en file d'attente

La base de la grattage évolutive est une file d'attente des messages qui découple la découverte d'URL de la récupération de données. Les travailleurs tirent les tâches de la file d'attente, récupèrent les pages à travers les proxies, et poussent les résultats au stockage.

# Architecture overview:
#
# URL Source → [Message Queue] → Worker 1 → [Results Store]
#                               → Worker 2 →
#                               → Worker N →
#                               ↓
#                        [Dead Letter Queue]
#                        (failed requests)

Avantages de ce modèle :

  • Échelle horizontale: Ajouter ou supprimer des travailleurs sans modifier le système
  • Tolérance aux défauts: Les tâches échouées reviennent à la file d'attente pour réessayer
  • Contrôle des taux: Ajuster le nombre de travailleurs pour contrôler le débit global
  • Visibilité: La profondeur de la file montre l'arriéré; le taux d'achèvement montre la santé

Mise en œuvre de Python avec Redis Queue

import redis
import requests
import json
import time
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor
PROXY = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
REDIS_URL = "redis://localhost:6379/0"
@dataclass
class ScrapeTask:
    url: str
    target: str
    priority: int = 0
    retries: int = 0
    max_retries: int = 3
class ScrapingQueue:
    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url)
        self.queue_key = "scrape:tasks"
        self.results_key = "scrape:results"
        self.dlq_key = "scrape:dead_letter"
    def enqueue(self, task: ScrapeTask):
        self.redis.lpush(self.queue_key, json.dumps(task.__dict__))
    def dequeue(self, timeout: int = 5) -> ScrapeTask | None:
        result = self.redis.brpop(self.queue_key, timeout=timeout)
        if result:
            data = json.loads(result[1])
            return ScrapeTask(**data)
        return None
    def store_result(self, url: str, data: dict):
        self.redis.hset(self.results_key, url, json.dumps(data))
    def send_to_dlq(self, task: ScrapeTask, error: str):
        task_data = task.__dict__
        task_data["error"] = error
        self.redis.lpush(self.dlq_key, json.dumps(task_data))
    @property
    def pending_count(self) -> int:
        return self.redis.llen(self.queue_key)
class ScrapingWorker:
    def __init__(self, queue: ScrapingQueue, worker_id: int):
        self.queue = queue
        self.worker_id = worker_id
        self.session = requests.Session()
        self.session.proxies = {"http": PROXY, "https": PROXY}
    def process_task(self, task: ScrapeTask) -> bool:
        try:
            resp = self.session.get(task.url, timeout=30)
            if resp.status_code == 200:
                self.queue.store_result(task.url, {
                    "status": 200,
                    "body": resp.text[:10000],  # Truncate for storage
                    "target": task.target,
                })
                return True
            elif resp.status_code in [429, 503]:
                # Retry with backoff
                if task.retries < task.max_retries:
                    task.retries += 1
                    time.sleep(2 ** task.retries)
                    self.queue.enqueue(task)
                else:
                    self.queue.send_to_dlq(task, f"HTTP {resp.status_code}")
            else:
                self.queue.send_to_dlq(task, f"HTTP {resp.status_code}")
        except Exception as e:
            if task.retries < task.max_retries:
                task.retries += 1
                self.queue.enqueue(task)
            else:
                self.queue.send_to_dlq(task, str(e))
        return False
    def run(self):
        print(f"Worker {self.worker_id} started")
        while True:
            task = self.queue.dequeue(timeout=5)
            if task:
                self.process_task(task)
# Launch multiple workers
queue = ScrapingQueue(REDIS_URL)
# Enqueue tasks
for i in range(10000):
    queue.enqueue(ScrapeTask(
        url=f"https://example.com/product/{i}",
        target="example.com"
    ))
# Start 10 workers
with ThreadPoolExecutor(max_workers=10) as executor:
    workers = [ScrapingWorker(queue, i) for i in range(10)]
    for worker in workers:
        executor.submit(worker.run)

Mise en œuvre de Node.js avec Bull Queue

const Queue = require('bull');
const HttpsProxyAgent = require('https-proxy-agent');
const fetch = require('node-fetch');
const agent = new HttpsProxyAgent('http://USERNAME:PASSWORD@gate.proxyhat.com:8080');
// Create queue with Redis backend
const scrapeQueue = new Queue('scraping', 'redis://localhost:6379');
// Define the worker processor
scrapeQueue.process(10, async (job) => {  // 10 concurrent workers
  const { url, target } = job.data;
  try {
    const res = await fetch(url, { agent, timeout: 30000 });
    if (res.ok) {
      const body = await res.text();
      return { url, status: res.status, body: body.slice(0, 10000) };
    }
    if (res.status === 429 || res.status === 503) {
      throw new Error(`Rate limited: HTTP ${res.status}`);
    }
    return { url, status: res.status, body: null };
  } catch (err) {
    throw err; // Bull will retry based on job options
  }
});
// Enqueue tasks with retry options
async function enqueueTasks(urls, target) {
  for (const url of urls) {
    await scrapeQueue.add(
      { url, target },
      {
        attempts: 3,
        backoff: { type: 'exponential', delay: 2000 },
        removeOnComplete: 1000,
        removeOnFail: false,
      }
    );
  }
}
// Monitor progress
scrapeQueue.on('completed', (job, result) => {
  console.log(`Done: ${result.url} (${result.status})`);
});
scrapeQueue.on('failed', (job, err) => {
  console.error(`Failed: ${job.data.url} - ${err.message}`);
});

Modèle 2 : Architecture des pipelines

Pour des flux de travail complexes, utilisez une pipeline où chaque étape traite une préoccupation différente:

# Pipeline stages:
#
# Stage 1: URL Discovery    → finds pages to scrape
# Stage 2: Content Fetching → downloads pages via proxies
# Stage 3: Data Extraction  → parses HTML, extracts data
# Stage 4: Data Validation  → checks quality, deduplicates
# Stage 5: Storage          → saves to database/warehouse

Aller à la mise en œuvre

package main
import (
    "encoding/json"
    "fmt"
    "io"
    "net/http"
    "net/url"
    "sync"
    "time"
)
type ScrapeResult struct {
    URL    string `json:"url"`
    Status int    `json:"status"`
    Body   string `json:"body"`
}
func fetcher(urls <-chan string, results chan<- ScrapeResult, wg *sync.WaitGroup) {
    defer wg.Done()
    proxyURL, _ := url.Parse("http://USERNAME:PASSWORD@gate.proxyhat.com:8080")
    client := &http.Client{
        Transport: &http.Transport{Proxy: http.ProxyURL(proxyURL)},
        Timeout:   30 * time.Second,
    }
    for u := range urls {
        resp, err := client.Get(u)
        if err != nil {
            results <- ScrapeResult{URL: u, Status: 0}
            continue
        }
        body, _ := io.ReadAll(resp.Body)
        resp.Body.Close()
        results <- ScrapeResult{URL: u, Status: resp.StatusCode, Body: string(body)}
    }
}
func processor(results <-chan ScrapeResult, done chan<- bool) {
    for result := range results {
        if result.Status == 200 {
            // Extract and store data
            data, _ := json.Marshal(result)
            fmt.Printf("Processed: %s (%d bytes)\n", result.URL, len(data))
        }
    }
    done <- true
}
func main() {
    urls := make(chan string, 1000)
    results := make(chan ScrapeResult, 1000)
    done := make(chan bool)
    // Start 20 fetcher workers
    var wg sync.WaitGroup
    for i := 0; i < 20; i++ {
        wg.Add(1)
        go fetcher(urls, results, &wg)
    }
    // Start processor
    go processor(results, done)
    // Feed URLs
    go func() {
        for i := 0; i < 10000; i++ {
            urls <- fmt.Sprintf("https://example.com/product/%d", i)
        }
        close(urls)
    }()
    // Wait for all fetchers, then close results
    wg.Wait()
    close(results)
    <-done
}

Stratégies horizontales de calibrage

Stratégie 1 : Déploiement multimachines

Distribuer les travailleurs sur plusieurs machines. La file d'attente sert de point de coordination:

Stratégie 1 : Déploiement multimachines
ComposanteDéploiementÉlargissement
Requête (Redis/RabbitMQ)Serveur dédié ou service géréVertical (plus de RAM)
TravailleursMachines ou récipients multiplesHorizontale (ajouter les instances)
Stockage des résultatsBase de données ou magasin d'objetsVertical + ardeur
SurveillanceTableau de bord centraliséUne seule instance

Stratégie 2 : Élargissement axé sur les conteneurs

Utilisez Docker et Kubernetes pour l'échelle élastique. Chaque travailleur court dans un conteneur qui peut être reproduit:

# docker-compose.yml for scraping workers
version: '3.8'
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
  scraper-worker:
    build: .
    environment:
      - PROXY_URL=http://USERNAME:PASSWORD@gate.proxyhat.com:8080
      - REDIS_URL=redis://redis:6379/0
      - WORKER_CONCURRENCY=10
    deploy:
      replicas: 5        # 5 containers × 10 concurrency = 50 parallel requests
      resources:
        limits:
          memory: 512M
          cpus: '0.5'
    depends_on:
      - redis

Gestion du mandataire à l'échelle

À l'échelle, la gestion par procuration devient une composante essentielle du système. Principales considérations:

Mise en commun des connexions

Réutiliser les connexions à la passerelle proxy au lieu de créer de nouvelles connexions par requête. Cela réduit les frais de latence et de connexion:

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_optimized_session() -> requests.Session:
    """Create a session with connection pooling and retry logic."""
    session = requests.Session()
    # Connection pool: keep 20 connections, max 50
    adapter = HTTPAdapter(
        pool_connections=20,
        pool_maxsize=50,
        max_retries=Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[500, 502, 503],
        )
    )
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    session.proxies = {
        "http": "http://USERNAME:PASSWORD@gate.proxyhat.com:8080",
        "https": "http://USERNAME:PASSWORD@gate.proxyhat.com:8080",
    }
    return session
# Reuse across many requests
session = create_optimized_session()
for url in urls:
    resp = session.get(url, timeout=30)

Surveillance de la santé

Surveillez vos performances proxy en temps réel pour détecter les problèmes rapidement :

import time
from collections import defaultdict
from dataclasses import dataclass, field
@dataclass
class ProxyMetrics:
    """Track proxy health metrics for monitoring."""
    requests_total: int = 0
    requests_success: int = 0
    requests_blocked: int = 0
    requests_timeout: int = 0
    latency_samples: list = field(default_factory=list)
    status_codes: dict = field(default_factory=lambda: defaultdict(int))
    def record_request(self, status_code: int, latency_ms: float):
        self.requests_total += 1
        self.status_codes[status_code] += 1
        self.latency_samples.append(latency_ms)
        if status_code == 200:
            self.requests_success += 1
        elif status_code in [403, 429]:
            self.requests_blocked += 1
        # Keep only last 1000 samples
        if len(self.latency_samples) > 1000:
            self.latency_samples = self.latency_samples[-1000:]
    @property
    def success_rate(self) -> float:
        return self.requests_success / self.requests_total if self.requests_total else 0
    @property
    def avg_latency(self) -> float:
        return sum(self.latency_samples) / len(self.latency_samples) if self.latency_samples else 0
    def report(self) -> str:
        return (
            f"Total: {self.requests_total}, "
            f"Success: {self.success_rate:.1%}, "
            f"Blocked: {self.requests_blocked}, "
            f"Avg Latency: {self.avg_latency:.0f}ms"
        )

Stockage des données à l'échelle

Stockage des données à l'échelle
Type de stockageMeilleur pourÉchelle
PostgreSQLDonnées sur les produits structurés/prixMillions de lignes
MangoDBSchémas semi-structurés/variablesDes milliards de documents
Stockage S3/ObjectArchives HTML brutesPétaoctets
Recherche élastiqueRecherche en texte intégral sur les données suppriméesDes milliards de documents
ClickHouseAnalyse sur les grands ensembles de donnéesTrillions de lignes

Liste de contrôle pour l'élargissement

  • Découpler la découverte d'URL de la récupération. Utilisez une file d'attente entre les étapes.
  • Mettre en œuvre une logique de réessayer appropriée. Défaut exponentiel avec des files d'attente de lettres mortes pour des échecs persistants.
  • Surveille tout. Queue profondeur, taux de réussite, latence, taux d'erreur par domaine cible.
  • Utilisez la piscine de connexion. Réutiliser les connexions proxy au lieu de créer de nouvelles par requête.
  • Prévoyez un échec. Les travailleurs s'écrasent, les proxies sont bloqués, les cibles changent leur structure. Construisez la résilience dans chaque couche.
  • Tester à l'échelle avant le lancement. Un système qui fonctionne à 100 RPM peut échouer à 10 000 RPM en raison de la mémoire, des limites de connexion ou des goulets d'étranglement de la file d'attente.

Pour les stratégies de rotation par procuration qui complètent votre architecture d'échelle, lire Stratégies de rotation par procuration pour le scrapage à grande échelle. Pour gérer les limites de taux en fonction de l'échelle, voir Limites de vitesse expliquées.

Utiliser Python SDK, Numéro SDKou Allez au SDK pour l'intégration de proxy de production, et explorer ProxyHat plans pour le grattage en grand volume.

Foire aux questions

Quel système de file d'attente est le meilleur pour le grattage à l'échelle?

Redis avec Bull (Node.js) ou RQ (Python) fonctionne bien jusqu'à des millions de tâches par jour. Pour une plus grande échelle, Apache Kafka ou RabbitMQ offre une meilleure durabilité et un meilleur débit. Choisissez en fonction de votre infrastructure existante et de votre expertise d'équipe.

Combien de travailleurs doivent-ils courir ?

Commencez par 10-20 travailleurs et échelle en fonction de votre capacité de proxy et de la tolérance au site cible. Surveiller les taux de réussite — s'ils baissent au-dessous de 90 %, réduire la convergence avant d'ajouter plus de travailleurs. Chaque travailleur par ProxyHat obtient une rotation IP automatique.

Devrais-je utiliser l'async ou le filetage pour les travailleurs?

Pour le grattage lié aux E/S (la plupart des cas), async (Python asyncio, Node.js) offre une meilleure efficacité des ressources que le filetage. Utilisez le filetage ou le multitraitement seulement lorsque vous avez besoin de l'analyse CPU lourde à côté de la récupération. Go Goroutines excelle dans les deux modèles.

Comment gérer les changements de structure du site cible?

Implémentez la validation des données dans votre pipeline. Lorsque les données parsed échouent la validation (champs manquants, mauvais types), alertez votre équipe et les URLs affectées par la file d'attente pour le re-traitement avec des analyseurs mis à jour. Version de vos analyseurs afin que vous puissiez revenir en arrière si nécessaire.

Prêt à commencer ?

Accédez à plus de 50M d'IPs résidentielles dans plus de 148 pays avec filtrage IA.

Voir les tarifsProxies résidentiels
← Retour au Blog