Como Escalar Infraestrutura de Scraping

Padrões de arquitetura para raspagem da rede de escalonamento: sistemas baseados em filas, design de pipeline, escala horizontal com containers e gerenciamento de proxy em escala. Código em Python, Node.js e Go.

Como Escalar Infraestrutura de Scraping

Por que a infraestrutura de raspagem precisa de arquitetura dedicada

Um script single-threaded atingindo um site funciona bem para tarefas pequenas. Mas quando você precisa raspar milhões de páginas diariamente em dezenas de alvos, esse roteiro se torna um gargalo. Infra-estrutura de raspagem de escala requer passar de scripts lineares para arquiteturas distribuídas baseadas em filas que lidam com falhas graciosamente, gerenciam rotação de proxy e maximizam o rendimento.

Este guia abrange os padrões de arquitetura, sistemas de filas, estratégias de escala horizontal e técnicas de gerenciamento de proxy que power production-grade raspando em escala.

Este artigo baseia-se em conceitos do nosso Guia completo de Web Raspando Proxies. Para dimensionamento de grupos de proxy, ver Quantas proxies você precisa para raspar?

Padrões de arquitetura para raspagem escalável

Padrão 1: Raspa em fila

A base de raspagem escalável é um fila de mensagens que dissocia a descoberta de URL da recolha de dados. Os trabalhadores retiram tarefas da fila, obtêm páginas através de proxies e enviam resultados para o armazenamento.

# Architecture overview:
#
# URL Source → [Message Queue] → Worker 1 → [Results Store]
#                               → Worker 2 →
#                               → Worker N →
#                               ↓
#                        [Dead Letter Queue]
#                        (failed requests)

Benefícios deste padrão:

  • Escala horizontal: Adicionar ou remover trabalhadores sem alterar o sistema
  • Tolerância à falha: As tarefas falhadas retornam à fila para tentar novamente
  • Controlo da taxa: Ajustar a contagem de trabalhadores para controlar o rendimento global
  • Visibilidade: A profundidade da fila mostra o atraso; a taxa de conclusão mostra a saúde

Implementação em Python com Redes Fila

import redis
import requests
import json
import time
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor
PROXY = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
REDIS_URL = "redis://localhost:6379/0"
@dataclass
class ScrapeTask:
    url: str
    target: str
    priority: int = 0
    retries: int = 0
    max_retries: int = 3
class ScrapingQueue:
    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url)
        self.queue_key = "scrape:tasks"
        self.results_key = "scrape:results"
        self.dlq_key = "scrape:dead_letter"
    def enqueue(self, task: ScrapeTask):
        self.redis.lpush(self.queue_key, json.dumps(task.__dict__))
    def dequeue(self, timeout: int = 5) -> ScrapeTask | None:
        result = self.redis.brpop(self.queue_key, timeout=timeout)
        if result:
            data = json.loads(result[1])
            return ScrapeTask(**data)
        return None
    def store_result(self, url: str, data: dict):
        self.redis.hset(self.results_key, url, json.dumps(data))
    def send_to_dlq(self, task: ScrapeTask, error: str):
        task_data = task.__dict__
        task_data["error"] = error
        self.redis.lpush(self.dlq_key, json.dumps(task_data))
    @property
    def pending_count(self) -> int:
        return self.redis.llen(self.queue_key)
class ScrapingWorker:
    def __init__(self, queue: ScrapingQueue, worker_id: int):
        self.queue = queue
        self.worker_id = worker_id
        self.session = requests.Session()
        self.session.proxies = {"http": PROXY, "https": PROXY}
    def process_task(self, task: ScrapeTask) -> bool:
        try:
            resp = self.session.get(task.url, timeout=30)
            if resp.status_code == 200:
                self.queue.store_result(task.url, {
                    "status": 200,
                    "body": resp.text[:10000],  # Truncate for storage
                    "target": task.target,
                })
                return True
            elif resp.status_code in [429, 503]:
                # Retry with backoff
                if task.retries < task.max_retries:
                    task.retries += 1
                    time.sleep(2 ** task.retries)
                    self.queue.enqueue(task)
                else:
                    self.queue.send_to_dlq(task, f"HTTP {resp.status_code}")
            else:
                self.queue.send_to_dlq(task, f"HTTP {resp.status_code}")
        except Exception as e:
            if task.retries < task.max_retries:
                task.retries += 1
                self.queue.enqueue(task)
            else:
                self.queue.send_to_dlq(task, str(e))
        return False
    def run(self):
        print(f"Worker {self.worker_id} started")
        while True:
            task = self.queue.dequeue(timeout=5)
            if task:
                self.process_task(task)
# Launch multiple workers
queue = ScrapingQueue(REDIS_URL)
# Enqueue tasks
for i in range(10000):
    queue.enqueue(ScrapeTask(
        url=f"https://example.com/product/{i}",
        target="example.com"
    ))
# Start 10 workers
with ThreadPoolExecutor(max_workers=10) as executor:
    workers = [ScrapingWorker(queue, i) for i in range(10)]
    for worker in workers:
        executor.submit(worker.run)

Implementação Node.js com Bull Fila

const Queue = require('bull');
const HttpsProxyAgent = require('https-proxy-agent');
const fetch = require('node-fetch');
const agent = new HttpsProxyAgent('http://USERNAME:PASSWORD@gate.proxyhat.com:8080');
// Create queue with Redis backend
const scrapeQueue = new Queue('scraping', 'redis://localhost:6379');
// Define the worker processor
scrapeQueue.process(10, async (job) => {  // 10 concurrent workers
  const { url, target } = job.data;
  try {
    const res = await fetch(url, { agent, timeout: 30000 });
    if (res.ok) {
      const body = await res.text();
      return { url, status: res.status, body: body.slice(0, 10000) };
    }
    if (res.status === 429 || res.status === 503) {
      throw new Error(`Rate limited: HTTP ${res.status}`);
    }
    return { url, status: res.status, body: null };
  } catch (err) {
    throw err; // Bull will retry based on job options
  }
});
// Enqueue tasks with retry options
async function enqueueTasks(urls, target) {
  for (const url of urls) {
    await scrapeQueue.add(
      { url, target },
      {
        attempts: 3,
        backoff: { type: 'exponential', delay: 2000 },
        removeOnComplete: 1000,
        removeOnFail: false,
      }
    );
  }
}
// Monitor progress
scrapeQueue.on('completed', (job, result) => {
  console.log(`Done: ${result.url} (${result.status})`);
});
scrapeQueue.on('failed', (job, err) => {
  console.error(`Failed: ${job.data.url} - ${err.message}`);
});

Padrão 2: Arquitetura Pipeline

Para fluxos de trabalho de raspagem complexos, use pipeline Quando cada fase tratar uma preocupação diferente:

# Pipeline stages:
#
# Stage 1: URL Discovery    → finds pages to scrape
# Stage 2: Content Fetching → downloads pages via proxies
# Stage 3: Data Extraction  → parses HTML, extracts data
# Stage 4: Data Validation  → checks quality, deduplicates
# Stage 5: Storage          → saves to database/warehouse

Ir Execução

package main
import (
    "encoding/json"
    "fmt"
    "io"
    "net/http"
    "net/url"
    "sync"
    "time"
)
type ScrapeResult struct {
    URL    string `json:"url"`
    Status int    `json:"status"`
    Body   string `json:"body"`
}
func fetcher(urls <-chan string, results chan<- ScrapeResult, wg *sync.WaitGroup) {
    defer wg.Done()
    proxyURL, _ := url.Parse("http://USERNAME:PASSWORD@gate.proxyhat.com:8080")
    client := &http.Client{
        Transport: &http.Transport{Proxy: http.ProxyURL(proxyURL)},
        Timeout:   30 * time.Second,
    }
    for u := range urls {
        resp, err := client.Get(u)
        if err != nil {
            results <- ScrapeResult{URL: u, Status: 0}
            continue
        }
        body, _ := io.ReadAll(resp.Body)
        resp.Body.Close()
        results <- ScrapeResult{URL: u, Status: resp.StatusCode, Body: string(body)}
    }
}
func processor(results <-chan ScrapeResult, done chan<- bool) {
    for result := range results {
        if result.Status == 200 {
            // Extract and store data
            data, _ := json.Marshal(result)
            fmt.Printf("Processed: %s (%d bytes)\n", result.URL, len(data))
        }
    }
    done <- true
}
func main() {
    urls := make(chan string, 1000)
    results := make(chan ScrapeResult, 1000)
    done := make(chan bool)
    // Start 20 fetcher workers
    var wg sync.WaitGroup
    for i := 0; i < 20; i++ {
        wg.Add(1)
        go fetcher(urls, results, &wg)
    }
    // Start processor
    go processor(results, done)
    // Feed URLs
    go func() {
        for i := 0; i < 10000; i++ {
            urls <- fmt.Sprintf("https://example.com/product/%d", i)
        }
        close(urls)
    }()
    // Wait for all fetchers, then close results
    wg.Wait()
    close(results)
    <-done
}

Estratégias de Escala Horizontal

Estratégia 1: Implantação de máquinas múltiplas

Distribuir trabalhadores em várias máquinas. A fila funciona como o ponto de coordenação:

Estratégia 1: Implantação de máquinas múltiplas
ComponenteImplantaçãoEscala
Fila (Redis/RabbitMQ)Servidor dedicado ou serviço gerenciadoVertical (mais RAM)
TrabalhadoresMúltiplas máquinas ou recipientesHorizontal (informações adicionais)
Armazenamento dos resultadosDepósito de dados ou armazenamento de objetosVertical + raspagem
AcompanhamentoPainel centralizadoUma única instância

Estratégia 2: Escala baseada em contentores

Use Docker e Kubernetes para escalar elástico. Cada trabalhador corre em um recipiente que pode ser replicado:

# docker-compose.yml for scraping workers
version: '3.8'
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
  scraper-worker:
    build: .
    environment:
      - PROXY_URL=http://USERNAME:PASSWORD@gate.proxyhat.com:8080
      - REDIS_URL=redis://redis:6379/0
      - WORKER_CONCURRENCY=10
    deploy:
      replicas: 5        # 5 containers × 10 concurrency = 50 parallel requests
      resources:
        limits:
          memory: 512M
          cpus: '0.5'
    depends_on:
      - redis

Gerenciamento de Proxy na Escala

Em escala, o gerenciamento de proxy se torna um componente crítico do sistema. Considerações-chave:

Pooling de Ligação

Reutilizar conexões para o gateway proxy em vez de criar novas por solicitação. Isso reduz a latência e a sobrecarga de conexão:

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_optimized_session() -> requests.Session:
    """Create a session with connection pooling and retry logic."""
    session = requests.Session()
    # Connection pool: keep 20 connections, max 50
    adapter = HTTPAdapter(
        pool_connections=20,
        pool_maxsize=50,
        max_retries=Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[500, 502, 503],
        )
    )
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    session.proxies = {
        "http": "http://USERNAME:PASSWORD@gate.proxyhat.com:8080",
        "https": "http://USERNAME:PASSWORD@gate.proxyhat.com:8080",
    }
    return session
# Reuse across many requests
session = create_optimized_session()
for url in urls:
    resp = session.get(url, timeout=30)

Monitorização da Saúde

Monitore seu desempenho de proxy em tempo real para detectar problemas precocemente:

import time
from collections import defaultdict
from dataclasses import dataclass, field
@dataclass
class ProxyMetrics:
    """Track proxy health metrics for monitoring."""
    requests_total: int = 0
    requests_success: int = 0
    requests_blocked: int = 0
    requests_timeout: int = 0
    latency_samples: list = field(default_factory=list)
    status_codes: dict = field(default_factory=lambda: defaultdict(int))
    def record_request(self, status_code: int, latency_ms: float):
        self.requests_total += 1
        self.status_codes[status_code] += 1
        self.latency_samples.append(latency_ms)
        if status_code == 200:
            self.requests_success += 1
        elif status_code in [403, 429]:
            self.requests_blocked += 1
        # Keep only last 1000 samples
        if len(self.latency_samples) > 1000:
            self.latency_samples = self.latency_samples[-1000:]
    @property
    def success_rate(self) -> float:
        return self.requests_success / self.requests_total if self.requests_total else 0
    @property
    def avg_latency(self) -> float:
        return sum(self.latency_samples) / len(self.latency_samples) if self.latency_samples else 0
    def report(self) -> str:
        return (
            f"Total: {self.requests_total}, "
            f"Success: {self.success_rate:.1%}, "
            f"Blocked: {self.requests_blocked}, "
            f"Avg Latency: {self.avg_latency:.0f}ms"
        )

Armazenamento de dados na escala

Armazenamento de dados na escala
Tipo de ArmazenamentoMelhor paraEscala
PostgreSQLDados sobre o produto/preço estruturadosMilhões de linhas
MongoDBEsquemas semiestruturados/variáveisBilhões de documentos
Armazenamento S3/ObjectArquivos HTML em brutoPetabytes
Pesquisa ElasticPesquisa de texto completo sobre dados raspadosBilhões de documentos
ClickHouseAnálise sobre grandes conjuntos de dadosTrilhões de linhas

Lista de Verificação de Escala

  • Desbloquear a descoberta do URL ao obter. Usar uma fila de mensagens entre as etapas.
  • Aplicar lógica de repetição adequada. Retrocesso exponencial com filas de letras mortas para falhas persistentes.
  • Monitorizar tudo. Profundidade da fila, taxas de sucesso, latência, taxas de erro por domínio alvo.
  • Use o agrupamento de conexões. Reutilizar conexões proxy em vez de criar novas por solicitação.
  • Plano para o fracasso. Os trabalhadores caem, os proxies ficam bloqueados, os alvos mudam a sua estrutura. Construir resiliência em cada camada.
  • Teste em escala antes do lançamento. Um sistema que funciona em 100 RPM pode falhar em 10.000 RPM devido à memória, limites de conexão ou gargalos de fila.

Para estratégias de rotação proxy que complementam sua arquitetura de escala, leia Estratégias de rotação proxy para raspagem de grande escala. Para lidar com limites de taxa como você escala, veja Limites de taxa de raspagem explicados.

Utilizar o Python SDK, Nó SDK, ou Ir SDK para integração de proxy de produção, e explorar Planos do ProxyHat para raspagem de alto volume.

Perguntas Frequentes

Que sistema de fila é melhor para raspar em escala?

Redis com Bull (Node.js) ou RQ (Python) funciona bem até milhões de tarefas por dia. Para maior escala, o Apache Kafka ou RabbitMQ proporciona melhor durabilidade e rendimento. Escolha com base em sua infraestrutura existente e experiência em equipe.

Quantos trabalhadores simultâneos devo correr?

Comece com 10-20 trabalhadores e escala com base em sua capacidade proxy e tolerância ao site alvo. Monitorar as taxas de sucesso — se cairem abaixo de 90%, reduzir a concorrência antes de adicionar mais trabalhadores. Cada trabalhador através do ProxyHat obtém rotação IP automática.

Devo usar async ou threading para os trabalhadores?

Para raspagem de E/S (a maioria dos casos), o async (Python asynio, Node.js) proporciona melhor eficiência de recursos do que threading. Use threading ou multiprocessamento apenas quando você precisa de processamento de CPU-pesado ao lado de busca. As gorotinas são excelentes em ambos os padrões.

Como lidar com mudanças na estrutura do site alvo?

Implemente a validação de dados em seu pipeline. Quando os dados analisados falharem a validação (campos em falta, tipos errados), alerte sua equipe e URLs afetadas para reprocessamento com analisadores atualizados. Versão seus analisadores para que você possa voltar se necessário.

Pronto para começar?

Acesse mais de 50M de IPs residenciais em mais de 148 países com filtragem por IA.

Ver preçosProxies residenciais
← Voltar ao Blog