Warum Scraping Infrastruktur braucht Dedizierte Architektur
Ein eingängiges Skript, das eine Website trifft, funktioniert für kleine Aufgaben gut. Aber wenn Sie Millionen von Seiten täglich über Dutzende von Zielen abkratzen müssen, wird dieses Skript ein Engpass. Skalierung der Schrottinfrastruktur erfordert den Übergang von linearen Skripten zu verteilten, queuebasierten Architekturen, die Fehler anmutig handhaben, die Proxy-Drehung verwalten und den Durchsatz maximieren.
Dieser Leitfaden umfasst die Architekturmuster, Wartesysteme, horizontale Skalierungsstrategien und Proxy-Management-Techniken, die Produktion-Grad-Schrott im Maßstab zu treiben.
Dieser Artikel baut auf Konzepten aus unserem Kompletter Leitfaden für Web Scraping Proxies. Für Proxy-Pool-Sizing, siehe Wie viele Proxies benötigen Sie für Schrott?
Architektur Muster für skalierbares Scraping
Muster 1: Queue-Based Scraping
Die Grundlage der skalierbaren Schrott ist ein Nachricht löschen die URL-Erkennung aus dem Daten-Fetching entkoppelt. Arbeiter ziehen Aufgaben aus der Warteschlange, holen Seiten durch Proxies, und drücken Sie die Ergebnisse zur Speicherung.
# Architecture overview:
#
# URL Source → [Message Queue] → Worker 1 → [Results Store]
# → Worker 2 →
# → Worker N →
# ↓
# [Dead Letter Queue]
# (failed requests)Vorteile dieses Musters:
- Horizontale Skalierung: Arbeiter hinzufügen oder entfernen, ohne das System zu ändern
- Fehlertoleranz: Gefehlte Aufgaben zurück zur Warteschlange für die Wiederaufnahme
- Steuersatz: Anpassen der Worker-Anzahl an Gesamtdurchsatz
- Sichtbarkeit: Queue Tiefe zeigt Backlog; Abschlussrate zeigt Gesundheit
Python Implementierung mit Redis Queue
import redis
import requests
import json
import time
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor
PROXY = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
REDIS_URL = "redis://localhost:6379/0"
@dataclass
class ScrapeTask:
url: str
target: str
priority: int = 0
retries: int = 0
max_retries: int = 3
class ScrapingQueue:
def __init__(self, redis_url: str):
self.redis = redis.from_url(redis_url)
self.queue_key = "scrape:tasks"
self.results_key = "scrape:results"
self.dlq_key = "scrape:dead_letter"
def enqueue(self, task: ScrapeTask):
self.redis.lpush(self.queue_key, json.dumps(task.__dict__))
def dequeue(self, timeout: int = 5) -> ScrapeTask | None:
result = self.redis.brpop(self.queue_key, timeout=timeout)
if result:
data = json.loads(result[1])
return ScrapeTask(**data)
return None
def store_result(self, url: str, data: dict):
self.redis.hset(self.results_key, url, json.dumps(data))
def send_to_dlq(self, task: ScrapeTask, error: str):
task_data = task.__dict__
task_data["error"] = error
self.redis.lpush(self.dlq_key, json.dumps(task_data))
@property
def pending_count(self) -> int:
return self.redis.llen(self.queue_key)
class ScrapingWorker:
def __init__(self, queue: ScrapingQueue, worker_id: int):
self.queue = queue
self.worker_id = worker_id
self.session = requests.Session()
self.session.proxies = {"http": PROXY, "https": PROXY}
def process_task(self, task: ScrapeTask) -> bool:
try:
resp = self.session.get(task.url, timeout=30)
if resp.status_code == 200:
self.queue.store_result(task.url, {
"status": 200,
"body": resp.text[:10000], # Truncate for storage
"target": task.target,
})
return True
elif resp.status_code in [429, 503]:
# Retry with backoff
if task.retries < task.max_retries:
task.retries += 1
time.sleep(2 ** task.retries)
self.queue.enqueue(task)
else:
self.queue.send_to_dlq(task, f"HTTP {resp.status_code}")
else:
self.queue.send_to_dlq(task, f"HTTP {resp.status_code}")
except Exception as e:
if task.retries < task.max_retries:
task.retries += 1
self.queue.enqueue(task)
else:
self.queue.send_to_dlq(task, str(e))
return False
def run(self):
print(f"Worker {self.worker_id} started")
while True:
task = self.queue.dequeue(timeout=5)
if task:
self.process_task(task)
# Launch multiple workers
queue = ScrapingQueue(REDIS_URL)
# Enqueue tasks
for i in range(10000):
queue.enqueue(ScrapeTask(
url=f"https://example.com/product/{i}",
target="example.com"
))
# Start 10 workers
with ThreadPoolExecutor(max_workers=10) as executor:
workers = [ScrapingWorker(queue, i) for i in range(10)]
for worker in workers:
executor.submit(worker.run)Node.js Umsetzung mit Bull Queue
const Queue = require('bull');
const HttpsProxyAgent = require('https-proxy-agent');
const fetch = require('node-fetch');
const agent = new HttpsProxyAgent('http://USERNAME:PASSWORD@gate.proxyhat.com:8080');
// Create queue with Redis backend
const scrapeQueue = new Queue('scraping', 'redis://localhost:6379');
// Define the worker processor
scrapeQueue.process(10, async (job) => { // 10 concurrent workers
const { url, target } = job.data;
try {
const res = await fetch(url, { agent, timeout: 30000 });
if (res.ok) {
const body = await res.text();
return { url, status: res.status, body: body.slice(0, 10000) };
}
if (res.status === 429 || res.status === 503) {
throw new Error(`Rate limited: HTTP ${res.status}`);
}
return { url, status: res.status, body: null };
} catch (err) {
throw err; // Bull will retry based on job options
}
});
// Enqueue tasks with retry options
async function enqueueTasks(urls, target) {
for (const url of urls) {
await scrapeQueue.add(
{ url, target },
{
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
removeOnComplete: 1000,
removeOnFail: false,
}
);
}
}
// Monitor progress
scrapeQueue.on('completed', (job, result) => {
console.log(`Done: ${result.url} (${result.status})`);
});
scrapeQueue.on('failed', (job, err) => {
console.error(`Failed: ${job.data.url} - ${err.message}`);
});Muster 2: Pipeline Architektur
Für komplexe Abstreifvorgänge verwenden Sie Rohrleitungen wobei jede Stufe ein anderes Anliegen erfüllt:
# Pipeline stages:
#
# Stage 1: URL Discovery → finds pages to scrape
# Stage 2: Content Fetching → downloads pages via proxies
# Stage 3: Data Extraction → parses HTML, extracts data
# Stage 4: Data Validation → checks quality, deduplicates
# Stage 5: Storage → saves to database/warehouseUmsetzung
package main
import (
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"sync"
"time"
)
type ScrapeResult struct {
URL string `json:"url"`
Status int `json:"status"`
Body string `json:"body"`
}
func fetcher(urls <-chan string, results chan<- ScrapeResult, wg *sync.WaitGroup) {
defer wg.Done()
proxyURL, _ := url.Parse("http://USERNAME:PASSWORD@gate.proxyhat.com:8080")
client := &http.Client{
Transport: &http.Transport{Proxy: http.ProxyURL(proxyURL)},
Timeout: 30 * time.Second,
}
for u := range urls {
resp, err := client.Get(u)
if err != nil {
results <- ScrapeResult{URL: u, Status: 0}
continue
}
body, _ := io.ReadAll(resp.Body)
resp.Body.Close()
results <- ScrapeResult{URL: u, Status: resp.StatusCode, Body: string(body)}
}
}
func processor(results <-chan ScrapeResult, done chan<- bool) {
for result := range results {
if result.Status == 200 {
// Extract and store data
data, _ := json.Marshal(result)
fmt.Printf("Processed: %s (%d bytes)\n", result.URL, len(data))
}
}
done <- true
}
func main() {
urls := make(chan string, 1000)
results := make(chan ScrapeResult, 1000)
done := make(chan bool)
// Start 20 fetcher workers
var wg sync.WaitGroup
for i := 0; i < 20; i++ {
wg.Add(1)
go fetcher(urls, results, &wg)
}
// Start processor
go processor(results, done)
// Feed URLs
go func() {
for i := 0; i < 10000; i++ {
urls <- fmt.Sprintf("https://example.com/product/%d", i)
}
close(urls)
}()
// Wait for all fetchers, then close results
wg.Wait()
close(results)
<-done
}Horizontale Skalierungsstrategien
Strategie 1: Multi-Machine Bereitstellung
Verteilen Sie Arbeiter über mehrere Maschinen. Die Warteschlange fungiert als Koordinierungspunkt:
| Komponente | Bereitstellung | Skalierung |
|---|---|---|
| Queue (Redis/RabbitMQ) | Dedizierter Server oder verwalteter Service | Vertikal (mehr RAM) |
| Arbeitnehmer | Mehrere Maschinen oder Behälter | Horizontal (Add-Instanzen) |
| Ergebnisspeicherung | Datenbank oder Objektspeicher | Vertikal + härten |
| Überwachung | Zentrales Dashboard | Einzelfall |
Strategie 2: Containerbasierte Skalierung
Verwenden Sie Docker und Kubernetes für elastisches Skalieren. Jeder Arbeiter läuft in einem Behälter, der repliziert werden kann:
# docker-compose.yml for scraping workers
version: '3.8'
services:
redis:
image: redis:7-alpine
ports:
- "6379:6379"
scraper-worker:
build: .
environment:
- PROXY_URL=http://USERNAME:PASSWORD@gate.proxyhat.com:8080
- REDIS_URL=redis://redis:6379/0
- WORKER_CONCURRENCY=10
deploy:
replicas: 5 # 5 containers × 10 concurrency = 50 parallel requests
resources:
limits:
memory: 512M
cpus: '0.5'
depends_on:
- redisProxy-Management im Maßstab
Im Maßstab wird die Proxy-Management zu einer kritischen Systemkomponente. Schlüsselüberlegungen:
Anschluss Pooling
Verwenden Sie Verbindungen zum Proxy-Gateway, anstatt neue zu erstellen. Dies reduziert die Latenz und die Verbindung über Kopf:
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_optimized_session() -> requests.Session:
"""Create a session with connection pooling and retry logic."""
session = requests.Session()
# Connection pool: keep 20 connections, max 50
adapter = HTTPAdapter(
pool_connections=20,
pool_maxsize=50,
max_retries=Retry(
total=3,
backoff_factor=1,
status_forcelist=[500, 502, 503],
)
)
session.mount("http://", adapter)
session.mount("https://", adapter)
session.proxies = {
"http": "http://USERNAME:PASSWORD@gate.proxyhat.com:8080",
"https": "http://USERNAME:PASSWORD@gate.proxyhat.com:8080",
}
return session
# Reuse across many requests
session = create_optimized_session()
for url in urls:
resp = session.get(url, timeout=30)Gesundheitsüberwachung
Überwachen Sie Ihre Proxy-Performance in Echtzeit, um Probleme frühzeitig zu erkennen:
import time
from collections import defaultdict
from dataclasses import dataclass, field
@dataclass
class ProxyMetrics:
"""Track proxy health metrics for monitoring."""
requests_total: int = 0
requests_success: int = 0
requests_blocked: int = 0
requests_timeout: int = 0
latency_samples: list = field(default_factory=list)
status_codes: dict = field(default_factory=lambda: defaultdict(int))
def record_request(self, status_code: int, latency_ms: float):
self.requests_total += 1
self.status_codes[status_code] += 1
self.latency_samples.append(latency_ms)
if status_code == 200:
self.requests_success += 1
elif status_code in [403, 429]:
self.requests_blocked += 1
# Keep only last 1000 samples
if len(self.latency_samples) > 1000:
self.latency_samples = self.latency_samples[-1000:]
@property
def success_rate(self) -> float:
return self.requests_success / self.requests_total if self.requests_total else 0
@property
def avg_latency(self) -> float:
return sum(self.latency_samples) / len(self.latency_samples) if self.latency_samples else 0
def report(self) -> str:
return (
f"Total: {self.requests_total}, "
f"Success: {self.success_rate:.1%}, "
f"Blocked: {self.requests_blocked}, "
f"Avg Latency: {self.avg_latency:.0f}ms"
)Datenspeicherung im Maßstab
| Speichertyp | Das Beste für | Skala |
|---|---|---|
| PostgreSQL | Strukturierte Produkt-/Vorgabedaten | Millionen Zeilen |
| MongoDB | Halbstrukturierte/variable Schemas | Billionen von Dokumenten |
| S3/Objektspeicher | Raw HTML Archive | Petersilien |
| Elasticsearch | Volltextsuche über abgekratzte Daten | Billionen von Dokumenten |
| Klicken Sie hier | Analytics über große Datensätze | Trillionen der Zeilen |
Scaling Checkliste
- Entkoppeln URL-Erkennung aus dem Fetching. Verwenden Sie eine Nachricht Warteschlange zwischen den Stufen.
- Ergänzen Sie die richtige Retry-Logik. Exponential Backoff mit toten Briefqueues für persistente Fehler.
- Überwachen Sie alles. Suchtiefe, Erfolgsquoten, Latenz, Fehlerraten pro Zieldomäne.
- Verwenden Sie die Verbindung Pooling. Verwenden Sie Proxy-Verbindungen, anstatt neue zu erstellen.
- Plan für Scheitern. Workers Crash, Proxies werden blockiert, Ziele ändern ihre Struktur. Bauen Sie Resilienz in jede Schicht.
- Test im Maßstab vor dem Start. Ein System, das bei 100 RPM arbeitet, kann aufgrund von Speicher, Verbindungsgrenzen oder Warteschlangen bei 10.000 RPM scheitern.
Für Proxy-Drehstrategien, die Ihre Skalierungsarchitektur ergänzen, lesen Proxy-Drehstrategien für großschalige Schrapierung. Um die Geschwindigkeitsbegrenzungen zu bewältigen, siehe Schrott-Beschränkungen Erklärt.
Verwenden Sie die Python SDK, Node SDK, oder SDK für die Produktion Proxy-Integration, und erkunden ProxyHat plant für hochvolumiges Abkratzen.
Häufig gestellte Fragen
Welches Wartesystem ist am besten zum Abkratzen im Maßstab?
Redis mit Bull (Node.js) oder RQ (Python) arbeitet bis zu Millionen Aufgaben pro Tag. Für größere Skalen bietet Apache Kafka oder RabbitMQ eine bessere Haltbarkeit und Durchsatz. Wählen Sie aus Ihrer bestehenden Infrastruktur und Team-Know-how.
Wie viele gleichzeitige Arbeiter sollte ich laufen?
Beginnen Sie mit 10-20 Arbeitern und Skala basierend auf Ihrer Proxy-Kapazität und Zielorttoleranz. Überwachen Sie die Erfolgsquoten — wenn sie unter 90% fallen, verringern Sie die Konkurrenz, bevor Sie mehr Arbeitnehmer hinzufügen. Jeder Arbeiter durch ProxyHat wird automatisch IP-Drehung.
Sollte ich Async oder Threading für Arbeiter verwenden?
Für I/O-gebundene Abstreifung (die meisten Fälle) bietet Async (Python asyncio, Node.js) eine bessere Ressourceneffizienz als Gewinde. Verwenden Sie Gewinde- oder Multiverarbeitung nur, wenn Sie CPU-schwere Parsing neben dem Fetching benötigen. Gehen Sie Goroutines ausgezeichnet bei beiden Mustern.
Wie kann ich die Struktur der Zielseite ändern?
Implementierung der Datenvalidierung in Ihrer Pipeline. Wenn parsierte Daten die Validierung (missing-Felder, falsche Typen) versagen, warnen Sie Ihr Team und löschen Sie betroffene URLs für die Wiederaufbereitung mit aktualisierten Parsern. Schalten Sie Ihre Parser, damit Sie bei Bedarf zurückrollen können.






