Por que a infraestrutura de raspagem precisa de arquitetura dedicada
Um script single-threaded atingindo um site funciona bem para tarefas pequenas. Mas quando você precisa raspar milhões de páginas diariamente em dezenas de alvos, esse roteiro se torna um gargalo. Infra-estrutura de raspagem de escala requer passar de scripts lineares para arquiteturas distribuídas baseadas em filas que lidam com falhas graciosamente, gerenciam rotação de proxy e maximizam o rendimento.
Este guia abrange os padrões de arquitetura, sistemas de filas, estratégias de escala horizontal e técnicas de gerenciamento de proxy que power production-grade raspando em escala.
Este artigo baseia-se em conceitos do nosso Guia completo de Web Raspando Proxies. Para dimensionamento de grupos de proxy, ver Quantas proxies você precisa para raspar?
Padrões de arquitetura para raspagem escalável
Padrão 1: Raspa em fila
A base de raspagem escalável é um fila de mensagens que dissocia a descoberta de URL da recolha de dados. Os trabalhadores retiram tarefas da fila, obtêm páginas através de proxies e enviam resultados para o armazenamento.
# Architecture overview:
#
# URL Source → [Message Queue] → Worker 1 → [Results Store]
# → Worker 2 →
# → Worker N →
# ↓
# [Dead Letter Queue]
# (failed requests)Benefícios deste padrão:
- Escala horizontal: Adicionar ou remover trabalhadores sem alterar o sistema
- Tolerância à falha: As tarefas falhadas retornam à fila para tentar novamente
- Controlo da taxa: Ajustar a contagem de trabalhadores para controlar o rendimento global
- Visibilidade: A profundidade da fila mostra o atraso; a taxa de conclusão mostra a saúde
Implementação em Python com Redes Fila
import redis
import requests
import json
import time
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor
PROXY = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
REDIS_URL = "redis://localhost:6379/0"
@dataclass
class ScrapeTask:
url: str
target: str
priority: int = 0
retries: int = 0
max_retries: int = 3
class ScrapingQueue:
def __init__(self, redis_url: str):
self.redis = redis.from_url(redis_url)
self.queue_key = "scrape:tasks"
self.results_key = "scrape:results"
self.dlq_key = "scrape:dead_letter"
def enqueue(self, task: ScrapeTask):
self.redis.lpush(self.queue_key, json.dumps(task.__dict__))
def dequeue(self, timeout: int = 5) -> ScrapeTask | None:
result = self.redis.brpop(self.queue_key, timeout=timeout)
if result:
data = json.loads(result[1])
return ScrapeTask(**data)
return None
def store_result(self, url: str, data: dict):
self.redis.hset(self.results_key, url, json.dumps(data))
def send_to_dlq(self, task: ScrapeTask, error: str):
task_data = task.__dict__
task_data["error"] = error
self.redis.lpush(self.dlq_key, json.dumps(task_data))
@property
def pending_count(self) -> int:
return self.redis.llen(self.queue_key)
class ScrapingWorker:
def __init__(self, queue: ScrapingQueue, worker_id: int):
self.queue = queue
self.worker_id = worker_id
self.session = requests.Session()
self.session.proxies = {"http": PROXY, "https": PROXY}
def process_task(self, task: ScrapeTask) -> bool:
try:
resp = self.session.get(task.url, timeout=30)
if resp.status_code == 200:
self.queue.store_result(task.url, {
"status": 200,
"body": resp.text[:10000], # Truncate for storage
"target": task.target,
})
return True
elif resp.status_code in [429, 503]:
# Retry with backoff
if task.retries < task.max_retries:
task.retries += 1
time.sleep(2 ** task.retries)
self.queue.enqueue(task)
else:
self.queue.send_to_dlq(task, f"HTTP {resp.status_code}")
else:
self.queue.send_to_dlq(task, f"HTTP {resp.status_code}")
except Exception as e:
if task.retries < task.max_retries:
task.retries += 1
self.queue.enqueue(task)
else:
self.queue.send_to_dlq(task, str(e))
return False
def run(self):
print(f"Worker {self.worker_id} started")
while True:
task = self.queue.dequeue(timeout=5)
if task:
self.process_task(task)
# Launch multiple workers
queue = ScrapingQueue(REDIS_URL)
# Enqueue tasks
for i in range(10000):
queue.enqueue(ScrapeTask(
url=f"https://example.com/product/{i}",
target="example.com"
))
# Start 10 workers
with ThreadPoolExecutor(max_workers=10) as executor:
workers = [ScrapingWorker(queue, i) for i in range(10)]
for worker in workers:
executor.submit(worker.run)Implementação Node.js com Bull Fila
const Queue = require('bull');
const HttpsProxyAgent = require('https-proxy-agent');
const fetch = require('node-fetch');
const agent = new HttpsProxyAgent('http://USERNAME:PASSWORD@gate.proxyhat.com:8080');
// Create queue with Redis backend
const scrapeQueue = new Queue('scraping', 'redis://localhost:6379');
// Define the worker processor
scrapeQueue.process(10, async (job) => { // 10 concurrent workers
const { url, target } = job.data;
try {
const res = await fetch(url, { agent, timeout: 30000 });
if (res.ok) {
const body = await res.text();
return { url, status: res.status, body: body.slice(0, 10000) };
}
if (res.status === 429 || res.status === 503) {
throw new Error(`Rate limited: HTTP ${res.status}`);
}
return { url, status: res.status, body: null };
} catch (err) {
throw err; // Bull will retry based on job options
}
});
// Enqueue tasks with retry options
async function enqueueTasks(urls, target) {
for (const url of urls) {
await scrapeQueue.add(
{ url, target },
{
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
removeOnComplete: 1000,
removeOnFail: false,
}
);
}
}
// Monitor progress
scrapeQueue.on('completed', (job, result) => {
console.log(`Done: ${result.url} (${result.status})`);
});
scrapeQueue.on('failed', (job, err) => {
console.error(`Failed: ${job.data.url} - ${err.message}`);
});Padrão 2: Arquitetura Pipeline
Para fluxos de trabalho de raspagem complexos, use pipeline Quando cada fase tratar uma preocupação diferente:
# Pipeline stages:
#
# Stage 1: URL Discovery → finds pages to scrape
# Stage 2: Content Fetching → downloads pages via proxies
# Stage 3: Data Extraction → parses HTML, extracts data
# Stage 4: Data Validation → checks quality, deduplicates
# Stage 5: Storage → saves to database/warehouseIr Execução
package main
import (
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"sync"
"time"
)
type ScrapeResult struct {
URL string `json:"url"`
Status int `json:"status"`
Body string `json:"body"`
}
func fetcher(urls <-chan string, results chan<- ScrapeResult, wg *sync.WaitGroup) {
defer wg.Done()
proxyURL, _ := url.Parse("http://USERNAME:PASSWORD@gate.proxyhat.com:8080")
client := &http.Client{
Transport: &http.Transport{Proxy: http.ProxyURL(proxyURL)},
Timeout: 30 * time.Second,
}
for u := range urls {
resp, err := client.Get(u)
if err != nil {
results <- ScrapeResult{URL: u, Status: 0}
continue
}
body, _ := io.ReadAll(resp.Body)
resp.Body.Close()
results <- ScrapeResult{URL: u, Status: resp.StatusCode, Body: string(body)}
}
}
func processor(results <-chan ScrapeResult, done chan<- bool) {
for result := range results {
if result.Status == 200 {
// Extract and store data
data, _ := json.Marshal(result)
fmt.Printf("Processed: %s (%d bytes)\n", result.URL, len(data))
}
}
done <- true
}
func main() {
urls := make(chan string, 1000)
results := make(chan ScrapeResult, 1000)
done := make(chan bool)
// Start 20 fetcher workers
var wg sync.WaitGroup
for i := 0; i < 20; i++ {
wg.Add(1)
go fetcher(urls, results, &wg)
}
// Start processor
go processor(results, done)
// Feed URLs
go func() {
for i := 0; i < 10000; i++ {
urls <- fmt.Sprintf("https://example.com/product/%d", i)
}
close(urls)
}()
// Wait for all fetchers, then close results
wg.Wait()
close(results)
<-done
}Estratégias de Escala Horizontal
Estratégia 1: Implantação de máquinas múltiplas
Distribuir trabalhadores em várias máquinas. A fila funciona como o ponto de coordenação:
| Componente | Implantação | Escala |
|---|---|---|
| Fila (Redis/RabbitMQ) | Servidor dedicado ou serviço gerenciado | Vertical (mais RAM) |
| Trabalhadores | Múltiplas máquinas ou recipientes | Horizontal (informações adicionais) |
| Armazenamento dos resultados | Depósito de dados ou armazenamento de objetos | Vertical + raspagem |
| Acompanhamento | Painel centralizado | Uma única instância |
Estratégia 2: Escala baseada em contentores
Use Docker e Kubernetes para escalar elástico. Cada trabalhador corre em um recipiente que pode ser replicado:
# docker-compose.yml for scraping workers
version: '3.8'
services:
redis:
image: redis:7-alpine
ports:
- "6379:6379"
scraper-worker:
build: .
environment:
- PROXY_URL=http://USERNAME:PASSWORD@gate.proxyhat.com:8080
- REDIS_URL=redis://redis:6379/0
- WORKER_CONCURRENCY=10
deploy:
replicas: 5 # 5 containers × 10 concurrency = 50 parallel requests
resources:
limits:
memory: 512M
cpus: '0.5'
depends_on:
- redisGerenciamento de Proxy na Escala
Em escala, o gerenciamento de proxy se torna um componente crítico do sistema. Considerações-chave:
Pooling de Ligação
Reutilizar conexões para o gateway proxy em vez de criar novas por solicitação. Isso reduz a latência e a sobrecarga de conexão:
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_optimized_session() -> requests.Session:
"""Create a session with connection pooling and retry logic."""
session = requests.Session()
# Connection pool: keep 20 connections, max 50
adapter = HTTPAdapter(
pool_connections=20,
pool_maxsize=50,
max_retries=Retry(
total=3,
backoff_factor=1,
status_forcelist=[500, 502, 503],
)
)
session.mount("http://", adapter)
session.mount("https://", adapter)
session.proxies = {
"http": "http://USERNAME:PASSWORD@gate.proxyhat.com:8080",
"https": "http://USERNAME:PASSWORD@gate.proxyhat.com:8080",
}
return session
# Reuse across many requests
session = create_optimized_session()
for url in urls:
resp = session.get(url, timeout=30)Monitorização da Saúde
Monitore seu desempenho de proxy em tempo real para detectar problemas precocemente:
import time
from collections import defaultdict
from dataclasses import dataclass, field
@dataclass
class ProxyMetrics:
"""Track proxy health metrics for monitoring."""
requests_total: int = 0
requests_success: int = 0
requests_blocked: int = 0
requests_timeout: int = 0
latency_samples: list = field(default_factory=list)
status_codes: dict = field(default_factory=lambda: defaultdict(int))
def record_request(self, status_code: int, latency_ms: float):
self.requests_total += 1
self.status_codes[status_code] += 1
self.latency_samples.append(latency_ms)
if status_code == 200:
self.requests_success += 1
elif status_code in [403, 429]:
self.requests_blocked += 1
# Keep only last 1000 samples
if len(self.latency_samples) > 1000:
self.latency_samples = self.latency_samples[-1000:]
@property
def success_rate(self) -> float:
return self.requests_success / self.requests_total if self.requests_total else 0
@property
def avg_latency(self) -> float:
return sum(self.latency_samples) / len(self.latency_samples) if self.latency_samples else 0
def report(self) -> str:
return (
f"Total: {self.requests_total}, "
f"Success: {self.success_rate:.1%}, "
f"Blocked: {self.requests_blocked}, "
f"Avg Latency: {self.avg_latency:.0f}ms"
)Armazenamento de dados na escala
| Tipo de Armazenamento | Melhor para | Escala |
|---|---|---|
| PostgreSQL | Dados sobre o produto/preço estruturados | Milhões de linhas |
| MongoDB | Esquemas semiestruturados/variáveis | Bilhões de documentos |
| Armazenamento S3/Object | Arquivos HTML em bruto | Petabytes |
| Pesquisa Elastic | Pesquisa de texto completo sobre dados raspados | Bilhões de documentos |
| ClickHouse | Análise sobre grandes conjuntos de dados | Trilhões de linhas |
Lista de Verificação de Escala
- Desbloquear a descoberta do URL ao obter. Usar uma fila de mensagens entre as etapas.
- Aplicar lógica de repetição adequada. Retrocesso exponencial com filas de letras mortas para falhas persistentes.
- Monitorizar tudo. Profundidade da fila, taxas de sucesso, latência, taxas de erro por domínio alvo.
- Use o agrupamento de conexões. Reutilizar conexões proxy em vez de criar novas por solicitação.
- Plano para o fracasso. Os trabalhadores caem, os proxies ficam bloqueados, os alvos mudam a sua estrutura. Construir resiliência em cada camada.
- Teste em escala antes do lançamento. Um sistema que funciona em 100 RPM pode falhar em 10.000 RPM devido à memória, limites de conexão ou gargalos de fila.
Para estratégias de rotação proxy que complementam sua arquitetura de escala, leia Estratégias de rotação proxy para raspagem de grande escala. Para lidar com limites de taxa como você escala, veja Limites de taxa de raspagem explicados.
Utilizar o Python SDK, Nó SDK, ou Ir SDK para integração de proxy de produção, e explorar Planos do ProxyHat para raspagem de alto volume.
Perguntas Frequentes
Que sistema de fila é melhor para raspar em escala?
Redis com Bull (Node.js) ou RQ (Python) funciona bem até milhões de tarefas por dia. Para maior escala, o Apache Kafka ou RabbitMQ proporciona melhor durabilidade e rendimento. Escolha com base em sua infraestrutura existente e experiência em equipe.
Quantos trabalhadores simultâneos devo correr?
Comece com 10-20 trabalhadores e escala com base em sua capacidade proxy e tolerância ao site alvo. Monitorar as taxas de sucesso — se cairem abaixo de 90%, reduzir a concorrência antes de adicionar mais trabalhadores. Cada trabalhador através do ProxyHat obtém rotação IP automática.
Devo usar async ou threading para os trabalhadores?
Para raspagem de E/S (a maioria dos casos), o async (Python asynio, Node.js) proporciona melhor eficiência de recursos do que threading. Use threading ou multiprocessamento apenas quando você precisa de processamento de CPU-pesado ao lado de busca. As gorotinas são excelentes em ambos os padrões.
Como lidar com mudanças na estrutura do site alvo?
Implemente a validação de dados em seu pipeline. Quando os dados analisados falharem a validação (campos em falta, tipos errados), alerte sua equipe e URLs afetadas para reprocessamento com analisadores atualizados. Versão seus analisadores para que você possa voltar se necessário.






