Wie man Scraping-Infrastruktur skaliert

Architekturmuster zum Skalieren von Web-Schrott: queuebasierte Systeme, Pipeline-Design, horizontale Skalierung mit Containern und Proxy-Management im Maßstab. Code in Python, Node.js und Go.

Wie man Scraping-Infrastruktur skaliert

Warum Scraping Infrastruktur braucht Dedizierte Architektur

Ein eingängiges Skript, das eine Website trifft, funktioniert für kleine Aufgaben gut. Aber wenn Sie Millionen von Seiten täglich über Dutzende von Zielen abkratzen müssen, wird dieses Skript ein Engpass. Skalierung der Schrottinfrastruktur erfordert den Übergang von linearen Skripten zu verteilten, queuebasierten Architekturen, die Fehler anmutig handhaben, die Proxy-Drehung verwalten und den Durchsatz maximieren.

Dieser Leitfaden umfasst die Architekturmuster, Wartesysteme, horizontale Skalierungsstrategien und Proxy-Management-Techniken, die Produktion-Grad-Schrott im Maßstab zu treiben.

Dieser Artikel baut auf Konzepten aus unserem Kompletter Leitfaden für Web Scraping Proxies. Für Proxy-Pool-Sizing, siehe Wie viele Proxies benötigen Sie für Schrott?

Architektur Muster für skalierbares Scraping

Muster 1: Queue-Based Scraping

Die Grundlage der skalierbaren Schrott ist ein Nachricht löschen die URL-Erkennung aus dem Daten-Fetching entkoppelt. Arbeiter ziehen Aufgaben aus der Warteschlange, holen Seiten durch Proxies, und drücken Sie die Ergebnisse zur Speicherung.

# Architecture overview:
#
# URL Source → [Message Queue] → Worker 1 → [Results Store]
#                               → Worker 2 →
#                               → Worker N →
#                               ↓
#                        [Dead Letter Queue]
#                        (failed requests)

Vorteile dieses Musters:

  • Horizontale Skalierung: Arbeiter hinzufügen oder entfernen, ohne das System zu ändern
  • Fehlertoleranz: Gefehlte Aufgaben zurück zur Warteschlange für die Wiederaufnahme
  • Steuersatz: Anpassen der Worker-Anzahl an Gesamtdurchsatz
  • Sichtbarkeit: Queue Tiefe zeigt Backlog; Abschlussrate zeigt Gesundheit

Python Implementierung mit Redis Queue

import redis
import requests
import json
import time
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor
PROXY = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
REDIS_URL = "redis://localhost:6379/0"
@dataclass
class ScrapeTask:
    url: str
    target: str
    priority: int = 0
    retries: int = 0
    max_retries: int = 3
class ScrapingQueue:
    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url)
        self.queue_key = "scrape:tasks"
        self.results_key = "scrape:results"
        self.dlq_key = "scrape:dead_letter"
    def enqueue(self, task: ScrapeTask):
        self.redis.lpush(self.queue_key, json.dumps(task.__dict__))
    def dequeue(self, timeout: int = 5) -> ScrapeTask | None:
        result = self.redis.brpop(self.queue_key, timeout=timeout)
        if result:
            data = json.loads(result[1])
            return ScrapeTask(**data)
        return None
    def store_result(self, url: str, data: dict):
        self.redis.hset(self.results_key, url, json.dumps(data))
    def send_to_dlq(self, task: ScrapeTask, error: str):
        task_data = task.__dict__
        task_data["error"] = error
        self.redis.lpush(self.dlq_key, json.dumps(task_data))
    @property
    def pending_count(self) -> int:
        return self.redis.llen(self.queue_key)
class ScrapingWorker:
    def __init__(self, queue: ScrapingQueue, worker_id: int):
        self.queue = queue
        self.worker_id = worker_id
        self.session = requests.Session()
        self.session.proxies = {"http": PROXY, "https": PROXY}
    def process_task(self, task: ScrapeTask) -> bool:
        try:
            resp = self.session.get(task.url, timeout=30)
            if resp.status_code == 200:
                self.queue.store_result(task.url, {
                    "status": 200,
                    "body": resp.text[:10000],  # Truncate for storage
                    "target": task.target,
                })
                return True
            elif resp.status_code in [429, 503]:
                # Retry with backoff
                if task.retries < task.max_retries:
                    task.retries += 1
                    time.sleep(2 ** task.retries)
                    self.queue.enqueue(task)
                else:
                    self.queue.send_to_dlq(task, f"HTTP {resp.status_code}")
            else:
                self.queue.send_to_dlq(task, f"HTTP {resp.status_code}")
        except Exception as e:
            if task.retries < task.max_retries:
                task.retries += 1
                self.queue.enqueue(task)
            else:
                self.queue.send_to_dlq(task, str(e))
        return False
    def run(self):
        print(f"Worker {self.worker_id} started")
        while True:
            task = self.queue.dequeue(timeout=5)
            if task:
                self.process_task(task)
# Launch multiple workers
queue = ScrapingQueue(REDIS_URL)
# Enqueue tasks
for i in range(10000):
    queue.enqueue(ScrapeTask(
        url=f"https://example.com/product/{i}",
        target="example.com"
    ))
# Start 10 workers
with ThreadPoolExecutor(max_workers=10) as executor:
    workers = [ScrapingWorker(queue, i) for i in range(10)]
    for worker in workers:
        executor.submit(worker.run)

Node.js Umsetzung mit Bull Queue

const Queue = require('bull');
const HttpsProxyAgent = require('https-proxy-agent');
const fetch = require('node-fetch');
const agent = new HttpsProxyAgent('http://USERNAME:PASSWORD@gate.proxyhat.com:8080');
// Create queue with Redis backend
const scrapeQueue = new Queue('scraping', 'redis://localhost:6379');
// Define the worker processor
scrapeQueue.process(10, async (job) => {  // 10 concurrent workers
  const { url, target } = job.data;
  try {
    const res = await fetch(url, { agent, timeout: 30000 });
    if (res.ok) {
      const body = await res.text();
      return { url, status: res.status, body: body.slice(0, 10000) };
    }
    if (res.status === 429 || res.status === 503) {
      throw new Error(`Rate limited: HTTP ${res.status}`);
    }
    return { url, status: res.status, body: null };
  } catch (err) {
    throw err; // Bull will retry based on job options
  }
});
// Enqueue tasks with retry options
async function enqueueTasks(urls, target) {
  for (const url of urls) {
    await scrapeQueue.add(
      { url, target },
      {
        attempts: 3,
        backoff: { type: 'exponential', delay: 2000 },
        removeOnComplete: 1000,
        removeOnFail: false,
      }
    );
  }
}
// Monitor progress
scrapeQueue.on('completed', (job, result) => {
  console.log(`Done: ${result.url} (${result.status})`);
});
scrapeQueue.on('failed', (job, err) => {
  console.error(`Failed: ${job.data.url} - ${err.message}`);
});

Muster 2: Pipeline Architektur

Für komplexe Abstreifvorgänge verwenden Sie Rohrleitungen wobei jede Stufe ein anderes Anliegen erfüllt:

# Pipeline stages:
#
# Stage 1: URL Discovery    → finds pages to scrape
# Stage 2: Content Fetching → downloads pages via proxies
# Stage 3: Data Extraction  → parses HTML, extracts data
# Stage 4: Data Validation  → checks quality, deduplicates
# Stage 5: Storage          → saves to database/warehouse

Umsetzung

package main
import (
    "encoding/json"
    "fmt"
    "io"
    "net/http"
    "net/url"
    "sync"
    "time"
)
type ScrapeResult struct {
    URL    string `json:"url"`
    Status int    `json:"status"`
    Body   string `json:"body"`
}
func fetcher(urls <-chan string, results chan<- ScrapeResult, wg *sync.WaitGroup) {
    defer wg.Done()
    proxyURL, _ := url.Parse("http://USERNAME:PASSWORD@gate.proxyhat.com:8080")
    client := &http.Client{
        Transport: &http.Transport{Proxy: http.ProxyURL(proxyURL)},
        Timeout:   30 * time.Second,
    }
    for u := range urls {
        resp, err := client.Get(u)
        if err != nil {
            results <- ScrapeResult{URL: u, Status: 0}
            continue
        }
        body, _ := io.ReadAll(resp.Body)
        resp.Body.Close()
        results <- ScrapeResult{URL: u, Status: resp.StatusCode, Body: string(body)}
    }
}
func processor(results <-chan ScrapeResult, done chan<- bool) {
    for result := range results {
        if result.Status == 200 {
            // Extract and store data
            data, _ := json.Marshal(result)
            fmt.Printf("Processed: %s (%d bytes)\n", result.URL, len(data))
        }
    }
    done <- true
}
func main() {
    urls := make(chan string, 1000)
    results := make(chan ScrapeResult, 1000)
    done := make(chan bool)
    // Start 20 fetcher workers
    var wg sync.WaitGroup
    for i := 0; i < 20; i++ {
        wg.Add(1)
        go fetcher(urls, results, &wg)
    }
    // Start processor
    go processor(results, done)
    // Feed URLs
    go func() {
        for i := 0; i < 10000; i++ {
            urls <- fmt.Sprintf("https://example.com/product/%d", i)
        }
        close(urls)
    }()
    // Wait for all fetchers, then close results
    wg.Wait()
    close(results)
    <-done
}

Horizontale Skalierungsstrategien

Strategie 1: Multi-Machine Bereitstellung

Verteilen Sie Arbeiter über mehrere Maschinen. Die Warteschlange fungiert als Koordinierungspunkt:

Strategie 1: Multi-Machine Bereitstellung
KomponenteBereitstellungSkalierung
Queue (Redis/RabbitMQ)Dedizierter Server oder verwalteter ServiceVertikal (mehr RAM)
ArbeitnehmerMehrere Maschinen oder BehälterHorizontal (Add-Instanzen)
ErgebnisspeicherungDatenbank oder ObjektspeicherVertikal + härten
ÜberwachungZentrales DashboardEinzelfall

Strategie 2: Containerbasierte Skalierung

Verwenden Sie Docker und Kubernetes für elastisches Skalieren. Jeder Arbeiter läuft in einem Behälter, der repliziert werden kann:

# docker-compose.yml for scraping workers
version: '3.8'
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
  scraper-worker:
    build: .
    environment:
      - PROXY_URL=http://USERNAME:PASSWORD@gate.proxyhat.com:8080
      - REDIS_URL=redis://redis:6379/0
      - WORKER_CONCURRENCY=10
    deploy:
      replicas: 5        # 5 containers × 10 concurrency = 50 parallel requests
      resources:
        limits:
          memory: 512M
          cpus: '0.5'
    depends_on:
      - redis

Proxy-Management im Maßstab

Im Maßstab wird die Proxy-Management zu einer kritischen Systemkomponente. Schlüsselüberlegungen:

Anschluss Pooling

Verwenden Sie Verbindungen zum Proxy-Gateway, anstatt neue zu erstellen. Dies reduziert die Latenz und die Verbindung über Kopf:

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_optimized_session() -> requests.Session:
    """Create a session with connection pooling and retry logic."""
    session = requests.Session()
    # Connection pool: keep 20 connections, max 50
    adapter = HTTPAdapter(
        pool_connections=20,
        pool_maxsize=50,
        max_retries=Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[500, 502, 503],
        )
    )
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    session.proxies = {
        "http": "http://USERNAME:PASSWORD@gate.proxyhat.com:8080",
        "https": "http://USERNAME:PASSWORD@gate.proxyhat.com:8080",
    }
    return session
# Reuse across many requests
session = create_optimized_session()
for url in urls:
    resp = session.get(url, timeout=30)

Gesundheitsüberwachung

Überwachen Sie Ihre Proxy-Performance in Echtzeit, um Probleme frühzeitig zu erkennen:

import time
from collections import defaultdict
from dataclasses import dataclass, field
@dataclass
class ProxyMetrics:
    """Track proxy health metrics for monitoring."""
    requests_total: int = 0
    requests_success: int = 0
    requests_blocked: int = 0
    requests_timeout: int = 0
    latency_samples: list = field(default_factory=list)
    status_codes: dict = field(default_factory=lambda: defaultdict(int))
    def record_request(self, status_code: int, latency_ms: float):
        self.requests_total += 1
        self.status_codes[status_code] += 1
        self.latency_samples.append(latency_ms)
        if status_code == 200:
            self.requests_success += 1
        elif status_code in [403, 429]:
            self.requests_blocked += 1
        # Keep only last 1000 samples
        if len(self.latency_samples) > 1000:
            self.latency_samples = self.latency_samples[-1000:]
    @property
    def success_rate(self) -> float:
        return self.requests_success / self.requests_total if self.requests_total else 0
    @property
    def avg_latency(self) -> float:
        return sum(self.latency_samples) / len(self.latency_samples) if self.latency_samples else 0
    def report(self) -> str:
        return (
            f"Total: {self.requests_total}, "
            f"Success: {self.success_rate:.1%}, "
            f"Blocked: {self.requests_blocked}, "
            f"Avg Latency: {self.avg_latency:.0f}ms"
        )

Datenspeicherung im Maßstab

Datenspeicherung im Maßstab
SpeichertypDas Beste fürSkala
PostgreSQLStrukturierte Produkt-/VorgabedatenMillionen Zeilen
MongoDBHalbstrukturierte/variable SchemasBillionen von Dokumenten
S3/ObjektspeicherRaw HTML ArchivePetersilien
ElasticsearchVolltextsuche über abgekratzte DatenBillionen von Dokumenten
Klicken Sie hierAnalytics über große DatensätzeTrillionen der Zeilen

Scaling Checkliste

  • Entkoppeln URL-Erkennung aus dem Fetching. Verwenden Sie eine Nachricht Warteschlange zwischen den Stufen.
  • Ergänzen Sie die richtige Retry-Logik. Exponential Backoff mit toten Briefqueues für persistente Fehler.
  • Überwachen Sie alles. Suchtiefe, Erfolgsquoten, Latenz, Fehlerraten pro Zieldomäne.
  • Verwenden Sie die Verbindung Pooling. Verwenden Sie Proxy-Verbindungen, anstatt neue zu erstellen.
  • Plan für Scheitern. Workers Crash, Proxies werden blockiert, Ziele ändern ihre Struktur. Bauen Sie Resilienz in jede Schicht.
  • Test im Maßstab vor dem Start. Ein System, das bei 100 RPM arbeitet, kann aufgrund von Speicher, Verbindungsgrenzen oder Warteschlangen bei 10.000 RPM scheitern.

Für Proxy-Drehstrategien, die Ihre Skalierungsarchitektur ergänzen, lesen Proxy-Drehstrategien für großschalige Schrapierung. Um die Geschwindigkeitsbegrenzungen zu bewältigen, siehe Schrott-Beschränkungen Erklärt.

Verwenden Sie die Python SDK, Node SDK, oder SDK für die Produktion Proxy-Integration, und erkunden ProxyHat plant für hochvolumiges Abkratzen.

Häufig gestellte Fragen

Welches Wartesystem ist am besten zum Abkratzen im Maßstab?

Redis mit Bull (Node.js) oder RQ (Python) arbeitet bis zu Millionen Aufgaben pro Tag. Für größere Skalen bietet Apache Kafka oder RabbitMQ eine bessere Haltbarkeit und Durchsatz. Wählen Sie aus Ihrer bestehenden Infrastruktur und Team-Know-how.

Wie viele gleichzeitige Arbeiter sollte ich laufen?

Beginnen Sie mit 10-20 Arbeitern und Skala basierend auf Ihrer Proxy-Kapazität und Zielorttoleranz. Überwachen Sie die Erfolgsquoten — wenn sie unter 90% fallen, verringern Sie die Konkurrenz, bevor Sie mehr Arbeitnehmer hinzufügen. Jeder Arbeiter durch ProxyHat wird automatisch IP-Drehung.

Sollte ich Async oder Threading für Arbeiter verwenden?

Für I/O-gebundene Abstreifung (die meisten Fälle) bietet Async (Python asyncio, Node.js) eine bessere Ressourceneffizienz als Gewinde. Verwenden Sie Gewinde- oder Multiverarbeitung nur, wenn Sie CPU-schwere Parsing neben dem Fetching benötigen. Gehen Sie Goroutines ausgezeichnet bei beiden Mustern.

Wie kann ich die Struktur der Zielseite ändern?

Implementierung der Datenvalidierung in Ihrer Pipeline. Wenn parsierte Daten die Validierung (missing-Felder, falsche Typen) versagen, warnen Sie Ihr Team und löschen Sie betroffene URLs für die Wiederaufbereitung mit aktualisierten Parsern. Schalten Sie Ihre Parser, damit Sie bei Bedarf zurückrollen können.

Bereit loszulegen?

Zugang zu über 50 Mio. Residential-IPs in über 148 Ländern mit KI-gesteuerter Filterung.

Preise ansehenResidential Proxies
← Zurück zum Blog