Cómo escalar la infraestructura de scraping

Pautas de arquitectura para escalar el raspado web: sistemas basados en colas, diseño de tuberías, escalado horizontal con contenedores, y gestión proxy a escala. Código en Python, Node.js y Go.

Cómo escalar la infraestructura de scraping

Por qué cambiar infraestructura necesita arquitectura dedicada

Un script de un solo hilo que golpea un sitio web funciona bien para tareas pequeñas. Pero cuando necesitas raspar millones de páginas diariamente a través de docenas de objetivos, ese script se convierte en un cuello de botella. Infraestructura de raspado escala requiere pasar de scripts lineales a arquitecturas distribuidas, basadas en cola que manejan las fallas con gracia, gestionar la rotación proxy, y maximizar el rendimiento.

Esta guía cubre los patrones de arquitectura, los sistemas de cola, las estrategias de escalado horizontal y las técnicas de gestión proxy que el raspado de grado de producción de potencia a escala.

Este artículo se basa en conceptos de nuestro Guía completa de Proxies de Rastreo WebPara el tamaño de la piscina proxy, ver ¿Cuántos Proxies necesitas para rascar?

Patrones de Arquitectura para Scalable Scraping

Patrón 1: Raspado basado en la cola

La base del raspado escalable es un mensaje queue que decodifica el descubrimiento URL de la búsqueda de datos. Los trabajadores sacan tareas de la cola, recogen páginas a través de proxies, y empujan los resultados al almacenamiento.

# Architecture overview:
#
# URL Source → [Message Queue] → Worker 1 → [Results Store]
#                               → Worker 2 →
#                               → Worker N →
#                               ↓
#                        [Dead Letter Queue]
#                        (failed requests)

Beneficios de este patrón:

  • Escalada horizontal: Agregar o eliminar trabajadores sin cambiar el sistema
  • Tolerancia por defecto: Las tareas fallidas regresan a la cola para volver a entrar
  • Control de tarifas: El recuento obrero ajustado para controlar el rendimiento general
  • Visibilidad: La profundidad de las colas muestra retraso; la tasa de terminación muestra salud

Python Implementation with Redis Queue

import redis
import requests
import json
import time
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor
PROXY = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
REDIS_URL = "redis://localhost:6379/0"
@dataclass
class ScrapeTask:
    url: str
    target: str
    priority: int = 0
    retries: int = 0
    max_retries: int = 3
class ScrapingQueue:
    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url)
        self.queue_key = "scrape:tasks"
        self.results_key = "scrape:results"
        self.dlq_key = "scrape:dead_letter"
    def enqueue(self, task: ScrapeTask):
        self.redis.lpush(self.queue_key, json.dumps(task.__dict__))
    def dequeue(self, timeout: int = 5) -> ScrapeTask | None:
        result = self.redis.brpop(self.queue_key, timeout=timeout)
        if result:
            data = json.loads(result[1])
            return ScrapeTask(**data)
        return None
    def store_result(self, url: str, data: dict):
        self.redis.hset(self.results_key, url, json.dumps(data))
    def send_to_dlq(self, task: ScrapeTask, error: str):
        task_data = task.__dict__
        task_data["error"] = error
        self.redis.lpush(self.dlq_key, json.dumps(task_data))
    @property
    def pending_count(self) -> int:
        return self.redis.llen(self.queue_key)
class ScrapingWorker:
    def __init__(self, queue: ScrapingQueue, worker_id: int):
        self.queue = queue
        self.worker_id = worker_id
        self.session = requests.Session()
        self.session.proxies = {"http": PROXY, "https": PROXY}
    def process_task(self, task: ScrapeTask) -> bool:
        try:
            resp = self.session.get(task.url, timeout=30)
            if resp.status_code == 200:
                self.queue.store_result(task.url, {
                    "status": 200,
                    "body": resp.text[:10000],  # Truncate for storage
                    "target": task.target,
                })
                return True
            elif resp.status_code in [429, 503]:
                # Retry with backoff
                if task.retries < task.max_retries:
                    task.retries += 1
                    time.sleep(2 ** task.retries)
                    self.queue.enqueue(task)
                else:
                    self.queue.send_to_dlq(task, f"HTTP {resp.status_code}")
            else:
                self.queue.send_to_dlq(task, f"HTTP {resp.status_code}")
        except Exception as e:
            if task.retries < task.max_retries:
                task.retries += 1
                self.queue.enqueue(task)
            else:
                self.queue.send_to_dlq(task, str(e))
        return False
    def run(self):
        print(f"Worker {self.worker_id} started")
        while True:
            task = self.queue.dequeue(timeout=5)
            if task:
                self.process_task(task)
# Launch multiple workers
queue = ScrapingQueue(REDIS_URL)
# Enqueue tasks
for i in range(10000):
    queue.enqueue(ScrapeTask(
        url=f"https://example.com/product/{i}",
        target="example.com"
    ))
# Start 10 workers
with ThreadPoolExecutor(max_workers=10) as executor:
    workers = [ScrapingWorker(queue, i) for i in range(10)]
    for worker in workers:
        executor.submit(worker.run)

Node.js Aplicación con Bull Queue

const Queue = require('bull');
const HttpsProxyAgent = require('https-proxy-agent');
const fetch = require('node-fetch');
const agent = new HttpsProxyAgent('http://USERNAME:PASSWORD@gate.proxyhat.com:8080');
// Create queue with Redis backend
const scrapeQueue = new Queue('scraping', 'redis://localhost:6379');
// Define the worker processor
scrapeQueue.process(10, async (job) => {  // 10 concurrent workers
  const { url, target } = job.data;
  try {
    const res = await fetch(url, { agent, timeout: 30000 });
    if (res.ok) {
      const body = await res.text();
      return { url, status: res.status, body: body.slice(0, 10000) };
    }
    if (res.status === 429 || res.status === 503) {
      throw new Error(`Rate limited: HTTP ${res.status}`);
    }
    return { url, status: res.status, body: null };
  } catch (err) {
    throw err; // Bull will retry based on job options
  }
});
// Enqueue tasks with retry options
async function enqueueTasks(urls, target) {
  for (const url of urls) {
    await scrapeQueue.add(
      { url, target },
      {
        attempts: 3,
        backoff: { type: 'exponential', delay: 2000 },
        removeOnComplete: 1000,
        removeOnFail: false,
      }
    );
  }
}
// Monitor progress
scrapeQueue.on('completed', (job, result) => {
  console.log(`Done: ${result.url} (${result.status})`);
});
scrapeQueue.on('failed', (job, err) => {
  console.error(`Failed: ${job.data.url} - ${err.message}`);
});

Patrón 2: Arquitectura de Pipeline

Para los flujos de trabajo complejos, utilice un tubería donde cada etapa maneja una preocupación diferente:

# Pipeline stages:
#
# Stage 1: URL Discovery    → finds pages to scrape
# Stage 2: Content Fetching → downloads pages via proxies
# Stage 3: Data Extraction  → parses HTML, extracts data
# Stage 4: Data Validation  → checks quality, deduplicates
# Stage 5: Storage          → saves to database/warehouse

Go Implementation

package main
import (
    "encoding/json"
    "fmt"
    "io"
    "net/http"
    "net/url"
    "sync"
    "time"
)
type ScrapeResult struct {
    URL    string `json:"url"`
    Status int    `json:"status"`
    Body   string `json:"body"`
}
func fetcher(urls <-chan string, results chan<- ScrapeResult, wg *sync.WaitGroup) {
    defer wg.Done()
    proxyURL, _ := url.Parse("http://USERNAME:PASSWORD@gate.proxyhat.com:8080")
    client := &http.Client{
        Transport: &http.Transport{Proxy: http.ProxyURL(proxyURL)},
        Timeout:   30 * time.Second,
    }
    for u := range urls {
        resp, err := client.Get(u)
        if err != nil {
            results <- ScrapeResult{URL: u, Status: 0}
            continue
        }
        body, _ := io.ReadAll(resp.Body)
        resp.Body.Close()
        results <- ScrapeResult{URL: u, Status: resp.StatusCode, Body: string(body)}
    }
}
func processor(results <-chan ScrapeResult, done chan<- bool) {
    for result := range results {
        if result.Status == 200 {
            // Extract and store data
            data, _ := json.Marshal(result)
            fmt.Printf("Processed: %s (%d bytes)\n", result.URL, len(data))
        }
    }
    done <- true
}
func main() {
    urls := make(chan string, 1000)
    results := make(chan ScrapeResult, 1000)
    done := make(chan bool)
    // Start 20 fetcher workers
    var wg sync.WaitGroup
    for i := 0; i < 20; i++ {
        wg.Add(1)
        go fetcher(urls, results, &wg)
    }
    // Start processor
    go processor(results, done)
    // Feed URLs
    go func() {
        for i := 0; i < 10000; i++ {
            urls <- fmt.Sprintf("https://example.com/product/%d", i)
        }
        close(urls)
    }()
    // Wait for all fetchers, then close results
    wg.Wait()
    close(results)
    <-done
}

Estrategias de escalado horizontal

Estrategia 1: Despliegue multimaquina

Distribuir trabajadores a través de múltiples máquinas. La cola actúa como punto de coordinación:

Estrategia 1: Despliegue multimaquina
ComponenteDespliegueEscalada
Queue (Redis/RabbitMQ)Servidor dedicado o servicio gestionadoVertical (más RAM)
TrabajadoresMúltiples máquinas o contenedoresHorizontal (add instances)
Almacenamiento de resultadosBase de datos o almacén de objetosVertical + sharding
SupervisiónDashboard centralizadoCaso único

Estrategia 2: Escalada basada en los contenedores

Use Docker y Kubernetes para el escalado elástico. Cada trabajador corre en un contenedor que puede ser replicado:

# docker-compose.yml for scraping workers
version: '3.8'
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
  scraper-worker:
    build: .
    environment:
      - PROXY_URL=http://USERNAME:PASSWORD@gate.proxyhat.com:8080
      - REDIS_URL=redis://redis:6379/0
      - WORKER_CONCURRENCY=10
    deploy:
      replicas: 5        # 5 containers × 10 concurrency = 50 parallel requests
      resources:
        limits:
          memory: 512M
          cpus: '0.5'
    depends_on:
      - redis

Proxy Management at Scale

A escala, la gestión proxy se convierte en un componente crítico del sistema. Consideraciones clave:

Conexión Piscina

Reutiliza las conexiones a la puerta de entrada proxy en lugar de crear nuevas por petición. Esto reduce la latencia y la conexión generales:

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_optimized_session() -> requests.Session:
    """Create a session with connection pooling and retry logic."""
    session = requests.Session()
    # Connection pool: keep 20 connections, max 50
    adapter = HTTPAdapter(
        pool_connections=20,
        pool_maxsize=50,
        max_retries=Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[500, 502, 503],
        )
    )
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    session.proxies = {
        "http": "http://USERNAME:PASSWORD@gate.proxyhat.com:8080",
        "https": "http://USERNAME:PASSWORD@gate.proxyhat.com:8080",
    }
    return session
# Reuse across many requests
session = create_optimized_session()
for url in urls:
    resp = session.get(url, timeout=30)

Vigilancia de la salud

Supervise su rendimiento proxy en tiempo real para detectar problemas temprano:

import time
from collections import defaultdict
from dataclasses import dataclass, field
@dataclass
class ProxyMetrics:
    """Track proxy health metrics for monitoring."""
    requests_total: int = 0
    requests_success: int = 0
    requests_blocked: int = 0
    requests_timeout: int = 0
    latency_samples: list = field(default_factory=list)
    status_codes: dict = field(default_factory=lambda: defaultdict(int))
    def record_request(self, status_code: int, latency_ms: float):
        self.requests_total += 1
        self.status_codes[status_code] += 1
        self.latency_samples.append(latency_ms)
        if status_code == 200:
            self.requests_success += 1
        elif status_code in [403, 429]:
            self.requests_blocked += 1
        # Keep only last 1000 samples
        if len(self.latency_samples) > 1000:
            self.latency_samples = self.latency_samples[-1000:]
    @property
    def success_rate(self) -> float:
        return self.requests_success / self.requests_total if self.requests_total else 0
    @property
    def avg_latency(self) -> float:
        return sum(self.latency_samples) / len(self.latency_samples) if self.latency_samples else 0
    def report(self) -> str:
        return (
            f"Total: {self.requests_total}, "
            f"Success: {self.success_rate:.1%}, "
            f"Blocked: {self.requests_blocked}, "
            f"Avg Latency: {self.avg_latency:.0f}ms"
        )

Almacenamiento de datos en Escala

Almacenamiento de datos en Escala
Tipo de almacenamientoMejorEscala
PostgreSQLDatos estructurados del producto/pricingMillones de filas
MongoDBEsquemas semiestructurados/variablesBillones de documentos
S3/Object storageArchivos HTML brutosPetabytes
ElasticsearchBúsqueda de texto completo sobre datos chatarraBillones de documentos
ClickHouseAnálisis sobre conjuntos de datos grandesTrillions of rows

Lista de verificación de escalas

  • Descúbrete el descubrimiento de URL de buscar. Use una cola de mensaje entre etapas.
  • Implementar una lógica de retry adecuada. Respaldo exponencial con colas de cartas muertas por fallos persistentes.
  • Monitorear todo. Profundidad de cola, tasas de éxito, latencia, tasas de error por dominio objetivo.
  • Usar conexión a la piscina. Reutiliza las conexiones proxy en lugar de crear nuevas por solicitud.
  • Plan de fracaso. Los trabajadores se estrellan, los proxies se bloquean, los objetivos cambian su estructura. Construir la resiliencia en cada capa.
  • Prueba a escala antes del lanzamiento. Un sistema que funciona a 100 RPM puede fallar en 10.000 RPM debido a la memoria, los límites de conexión o embotellamientos de cola.

Para las estrategias de rotación proxy que complementan su arquitectura de escalado, lea Estrategias de Rotación Proxy para Raspado de Escala GrandePara manejar los límites de tarifas a medida que escala, vea Límites de la tasa de rachado Explicados.

Usar el Python SDK, Nodo SDKo Go SDK para la integración proxy de producción, y explorar Planes ProxyHat para el raspado de alto volumen.

Preguntas frecuentes

¿Qué sistema de cola es mejor para raspar a escala?

Redis con Bull (Node.js) o RQ (Python) funciona bien hasta millones de tareas por día. Para mayor escala, Apache Kafka o RabbitMQ proporciona una mayor durabilidad y rendimiento. Elija basado en su infraestructura existente y experiencia en equipo.

¿Cuántos trabajadores concurrentes debo correr?

Comience con 10-20 trabajadores y escala basada en su capacidad proxy y tolerancia al sitio objetivo. Supervisar las tasas de éxito - si bajan por debajo del 90%, reducir la concurrencia antes de añadir más trabajadores. Cada trabajador a través de ProxyHat recibe la rotación IP automática.

¿Debería usar asinc o rosca para los trabajadores?

Para I/O-bound scraping (la mayoría de los casos), async (Python asyncio, Node.js) proporciona una mejor eficiencia de recursos que la rosca. Usar rosca o multiprocesamiento sólo cuando necesite que la CPU-heavy pare junto con la captura. Goroutines sobresalen en ambos patrones.

¿Cómo puedo manejar cambios en la estructura del sitio objetivo?

Implementar validación de datos en su oleoducto. Cuando los datos analizados fallan la validación (campos perdidos, tipos incorrectos), alerte a su equipo y cola URL afectados para volver a procesar con paresers actualizados. Versión tus parsers para que puedas regresar si es necesario.

¿Listo para empezar?

Accede a más de 50M de IPs residenciales en más de 148 países con filtrado impulsado por IA.

Ver preciosProxies residenciales
← Volver al Blog