Por qué cambiar infraestructura necesita arquitectura dedicada
Un script de un solo hilo que golpea un sitio web funciona bien para tareas pequeñas. Pero cuando necesitas raspar millones de páginas diariamente a través de docenas de objetivos, ese script se convierte en un cuello de botella. Infraestructura de raspado escala requiere pasar de scripts lineales a arquitecturas distribuidas, basadas en cola que manejan las fallas con gracia, gestionar la rotación proxy, y maximizar el rendimiento.
Esta guía cubre los patrones de arquitectura, los sistemas de cola, las estrategias de escalado horizontal y las técnicas de gestión proxy que el raspado de grado de producción de potencia a escala.
Este artículo se basa en conceptos de nuestro Guía completa de Proxies de Rastreo WebPara el tamaño de la piscina proxy, ver ¿Cuántos Proxies necesitas para rascar?
Patrones de Arquitectura para Scalable Scraping
Patrón 1: Raspado basado en la cola
La base del raspado escalable es un mensaje queue que decodifica el descubrimiento URL de la búsqueda de datos. Los trabajadores sacan tareas de la cola, recogen páginas a través de proxies, y empujan los resultados al almacenamiento.
# Architecture overview:
#
# URL Source → [Message Queue] → Worker 1 → [Results Store]
# → Worker 2 →
# → Worker N →
# ↓
# [Dead Letter Queue]
# (failed requests)Beneficios de este patrón:
- Escalada horizontal: Agregar o eliminar trabajadores sin cambiar el sistema
- Tolerancia por defecto: Las tareas fallidas regresan a la cola para volver a entrar
- Control de tarifas: El recuento obrero ajustado para controlar el rendimiento general
- Visibilidad: La profundidad de las colas muestra retraso; la tasa de terminación muestra salud
Python Implementation with Redis Queue
import redis
import requests
import json
import time
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor
PROXY = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
REDIS_URL = "redis://localhost:6379/0"
@dataclass
class ScrapeTask:
url: str
target: str
priority: int = 0
retries: int = 0
max_retries: int = 3
class ScrapingQueue:
def __init__(self, redis_url: str):
self.redis = redis.from_url(redis_url)
self.queue_key = "scrape:tasks"
self.results_key = "scrape:results"
self.dlq_key = "scrape:dead_letter"
def enqueue(self, task: ScrapeTask):
self.redis.lpush(self.queue_key, json.dumps(task.__dict__))
def dequeue(self, timeout: int = 5) -> ScrapeTask | None:
result = self.redis.brpop(self.queue_key, timeout=timeout)
if result:
data = json.loads(result[1])
return ScrapeTask(**data)
return None
def store_result(self, url: str, data: dict):
self.redis.hset(self.results_key, url, json.dumps(data))
def send_to_dlq(self, task: ScrapeTask, error: str):
task_data = task.__dict__
task_data["error"] = error
self.redis.lpush(self.dlq_key, json.dumps(task_data))
@property
def pending_count(self) -> int:
return self.redis.llen(self.queue_key)
class ScrapingWorker:
def __init__(self, queue: ScrapingQueue, worker_id: int):
self.queue = queue
self.worker_id = worker_id
self.session = requests.Session()
self.session.proxies = {"http": PROXY, "https": PROXY}
def process_task(self, task: ScrapeTask) -> bool:
try:
resp = self.session.get(task.url, timeout=30)
if resp.status_code == 200:
self.queue.store_result(task.url, {
"status": 200,
"body": resp.text[:10000], # Truncate for storage
"target": task.target,
})
return True
elif resp.status_code in [429, 503]:
# Retry with backoff
if task.retries < task.max_retries:
task.retries += 1
time.sleep(2 ** task.retries)
self.queue.enqueue(task)
else:
self.queue.send_to_dlq(task, f"HTTP {resp.status_code}")
else:
self.queue.send_to_dlq(task, f"HTTP {resp.status_code}")
except Exception as e:
if task.retries < task.max_retries:
task.retries += 1
self.queue.enqueue(task)
else:
self.queue.send_to_dlq(task, str(e))
return False
def run(self):
print(f"Worker {self.worker_id} started")
while True:
task = self.queue.dequeue(timeout=5)
if task:
self.process_task(task)
# Launch multiple workers
queue = ScrapingQueue(REDIS_URL)
# Enqueue tasks
for i in range(10000):
queue.enqueue(ScrapeTask(
url=f"https://example.com/product/{i}",
target="example.com"
))
# Start 10 workers
with ThreadPoolExecutor(max_workers=10) as executor:
workers = [ScrapingWorker(queue, i) for i in range(10)]
for worker in workers:
executor.submit(worker.run)Node.js Aplicación con Bull Queue
const Queue = require('bull');
const HttpsProxyAgent = require('https-proxy-agent');
const fetch = require('node-fetch');
const agent = new HttpsProxyAgent('http://USERNAME:PASSWORD@gate.proxyhat.com:8080');
// Create queue with Redis backend
const scrapeQueue = new Queue('scraping', 'redis://localhost:6379');
// Define the worker processor
scrapeQueue.process(10, async (job) => { // 10 concurrent workers
const { url, target } = job.data;
try {
const res = await fetch(url, { agent, timeout: 30000 });
if (res.ok) {
const body = await res.text();
return { url, status: res.status, body: body.slice(0, 10000) };
}
if (res.status === 429 || res.status === 503) {
throw new Error(`Rate limited: HTTP ${res.status}`);
}
return { url, status: res.status, body: null };
} catch (err) {
throw err; // Bull will retry based on job options
}
});
// Enqueue tasks with retry options
async function enqueueTasks(urls, target) {
for (const url of urls) {
await scrapeQueue.add(
{ url, target },
{
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
removeOnComplete: 1000,
removeOnFail: false,
}
);
}
}
// Monitor progress
scrapeQueue.on('completed', (job, result) => {
console.log(`Done: ${result.url} (${result.status})`);
});
scrapeQueue.on('failed', (job, err) => {
console.error(`Failed: ${job.data.url} - ${err.message}`);
});Patrón 2: Arquitectura de Pipeline
Para los flujos de trabajo complejos, utilice un tubería donde cada etapa maneja una preocupación diferente:
# Pipeline stages:
#
# Stage 1: URL Discovery → finds pages to scrape
# Stage 2: Content Fetching → downloads pages via proxies
# Stage 3: Data Extraction → parses HTML, extracts data
# Stage 4: Data Validation → checks quality, deduplicates
# Stage 5: Storage → saves to database/warehouseGo Implementation
package main
import (
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"sync"
"time"
)
type ScrapeResult struct {
URL string `json:"url"`
Status int `json:"status"`
Body string `json:"body"`
}
func fetcher(urls <-chan string, results chan<- ScrapeResult, wg *sync.WaitGroup) {
defer wg.Done()
proxyURL, _ := url.Parse("http://USERNAME:PASSWORD@gate.proxyhat.com:8080")
client := &http.Client{
Transport: &http.Transport{Proxy: http.ProxyURL(proxyURL)},
Timeout: 30 * time.Second,
}
for u := range urls {
resp, err := client.Get(u)
if err != nil {
results <- ScrapeResult{URL: u, Status: 0}
continue
}
body, _ := io.ReadAll(resp.Body)
resp.Body.Close()
results <- ScrapeResult{URL: u, Status: resp.StatusCode, Body: string(body)}
}
}
func processor(results <-chan ScrapeResult, done chan<- bool) {
for result := range results {
if result.Status == 200 {
// Extract and store data
data, _ := json.Marshal(result)
fmt.Printf("Processed: %s (%d bytes)\n", result.URL, len(data))
}
}
done <- true
}
func main() {
urls := make(chan string, 1000)
results := make(chan ScrapeResult, 1000)
done := make(chan bool)
// Start 20 fetcher workers
var wg sync.WaitGroup
for i := 0; i < 20; i++ {
wg.Add(1)
go fetcher(urls, results, &wg)
}
// Start processor
go processor(results, done)
// Feed URLs
go func() {
for i := 0; i < 10000; i++ {
urls <- fmt.Sprintf("https://example.com/product/%d", i)
}
close(urls)
}()
// Wait for all fetchers, then close results
wg.Wait()
close(results)
<-done
}Estrategias de escalado horizontal
Estrategia 1: Despliegue multimaquina
Distribuir trabajadores a través de múltiples máquinas. La cola actúa como punto de coordinación:
| Componente | Despliegue | Escalada |
|---|---|---|
| Queue (Redis/RabbitMQ) | Servidor dedicado o servicio gestionado | Vertical (más RAM) |
| Trabajadores | Múltiples máquinas o contenedores | Horizontal (add instances) |
| Almacenamiento de resultados | Base de datos o almacén de objetos | Vertical + sharding |
| Supervisión | Dashboard centralizado | Caso único |
Estrategia 2: Escalada basada en los contenedores
Use Docker y Kubernetes para el escalado elástico. Cada trabajador corre en un contenedor que puede ser replicado:
# docker-compose.yml for scraping workers
version: '3.8'
services:
redis:
image: redis:7-alpine
ports:
- "6379:6379"
scraper-worker:
build: .
environment:
- PROXY_URL=http://USERNAME:PASSWORD@gate.proxyhat.com:8080
- REDIS_URL=redis://redis:6379/0
- WORKER_CONCURRENCY=10
deploy:
replicas: 5 # 5 containers × 10 concurrency = 50 parallel requests
resources:
limits:
memory: 512M
cpus: '0.5'
depends_on:
- redisProxy Management at Scale
A escala, la gestión proxy se convierte en un componente crítico del sistema. Consideraciones clave:
Conexión Piscina
Reutiliza las conexiones a la puerta de entrada proxy en lugar de crear nuevas por petición. Esto reduce la latencia y la conexión generales:
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_optimized_session() -> requests.Session:
"""Create a session with connection pooling and retry logic."""
session = requests.Session()
# Connection pool: keep 20 connections, max 50
adapter = HTTPAdapter(
pool_connections=20,
pool_maxsize=50,
max_retries=Retry(
total=3,
backoff_factor=1,
status_forcelist=[500, 502, 503],
)
)
session.mount("http://", adapter)
session.mount("https://", adapter)
session.proxies = {
"http": "http://USERNAME:PASSWORD@gate.proxyhat.com:8080",
"https": "http://USERNAME:PASSWORD@gate.proxyhat.com:8080",
}
return session
# Reuse across many requests
session = create_optimized_session()
for url in urls:
resp = session.get(url, timeout=30)Vigilancia de la salud
Supervise su rendimiento proxy en tiempo real para detectar problemas temprano:
import time
from collections import defaultdict
from dataclasses import dataclass, field
@dataclass
class ProxyMetrics:
"""Track proxy health metrics for monitoring."""
requests_total: int = 0
requests_success: int = 0
requests_blocked: int = 0
requests_timeout: int = 0
latency_samples: list = field(default_factory=list)
status_codes: dict = field(default_factory=lambda: defaultdict(int))
def record_request(self, status_code: int, latency_ms: float):
self.requests_total += 1
self.status_codes[status_code] += 1
self.latency_samples.append(latency_ms)
if status_code == 200:
self.requests_success += 1
elif status_code in [403, 429]:
self.requests_blocked += 1
# Keep only last 1000 samples
if len(self.latency_samples) > 1000:
self.latency_samples = self.latency_samples[-1000:]
@property
def success_rate(self) -> float:
return self.requests_success / self.requests_total if self.requests_total else 0
@property
def avg_latency(self) -> float:
return sum(self.latency_samples) / len(self.latency_samples) if self.latency_samples else 0
def report(self) -> str:
return (
f"Total: {self.requests_total}, "
f"Success: {self.success_rate:.1%}, "
f"Blocked: {self.requests_blocked}, "
f"Avg Latency: {self.avg_latency:.0f}ms"
)Almacenamiento de datos en Escala
| Tipo de almacenamiento | Mejor | Escala |
|---|---|---|
| PostgreSQL | Datos estructurados del producto/pricing | Millones de filas |
| MongoDB | Esquemas semiestructurados/variables | Billones de documentos |
| S3/Object storage | Archivos HTML brutos | Petabytes |
| Elasticsearch | Búsqueda de texto completo sobre datos chatarra | Billones de documentos |
| ClickHouse | Análisis sobre conjuntos de datos grandes | Trillions of rows |
Lista de verificación de escalas
- Descúbrete el descubrimiento de URL de buscar. Use una cola de mensaje entre etapas.
- Implementar una lógica de retry adecuada. Respaldo exponencial con colas de cartas muertas por fallos persistentes.
- Monitorear todo. Profundidad de cola, tasas de éxito, latencia, tasas de error por dominio objetivo.
- Usar conexión a la piscina. Reutiliza las conexiones proxy en lugar de crear nuevas por solicitud.
- Plan de fracaso. Los trabajadores se estrellan, los proxies se bloquean, los objetivos cambian su estructura. Construir la resiliencia en cada capa.
- Prueba a escala antes del lanzamiento. Un sistema que funciona a 100 RPM puede fallar en 10.000 RPM debido a la memoria, los límites de conexión o embotellamientos de cola.
Para las estrategias de rotación proxy que complementan su arquitectura de escalado, lea Estrategias de Rotación Proxy para Raspado de Escala GrandePara manejar los límites de tarifas a medida que escala, vea Límites de la tasa de rachado Explicados.
Usar el Python SDK, Nodo SDKo Go SDK para la integración proxy de producción, y explorar Planes ProxyHat para el raspado de alto volumen.
Preguntas frecuentes
¿Qué sistema de cola es mejor para raspar a escala?
Redis con Bull (Node.js) o RQ (Python) funciona bien hasta millones de tareas por día. Para mayor escala, Apache Kafka o RabbitMQ proporciona una mayor durabilidad y rendimiento. Elija basado en su infraestructura existente y experiencia en equipo.
¿Cuántos trabajadores concurrentes debo correr?
Comience con 10-20 trabajadores y escala basada en su capacidad proxy y tolerancia al sitio objetivo. Supervisar las tasas de éxito - si bajan por debajo del 90%, reducir la concurrencia antes de añadir más trabajadores. Cada trabajador a través de ProxyHat recibe la rotación IP automática.
¿Debería usar asinc o rosca para los trabajadores?
Para I/O-bound scraping (la mayoría de los casos), async (Python asyncio, Node.js) proporciona una mejor eficiencia de recursos que la rosca. Usar rosca o multiprocesamiento sólo cuando necesite que la CPU-heavy pare junto con la captura. Goroutines sobresalen en ambos patrones.
¿Cómo puedo manejar cambios en la estructura del sitio objetivo?
Implementar validación de datos en su oleoducto. Cuando los datos analizados fallan la validación (campos perdidos, tipos incorrectos), alerte a su equipo y cola URL afectados para volver a procesar con paresers actualizados. Versión tus parsers para que puedas regresar si es necesario.






