Eine zuverlässige Scraping-Architektur entwerfen

Entwerfen Sie ein End-to-End-Schrottsystem: Scheduler, URL-Warte, Crawler-Pool, Proxy-Schicht, Parser, Speicher und Überwachung. Produktionsbereiter Python-Code mit Architekturdiagrammen.

Eine zuverlässige Scraping-Architektur entwerfen

Jenseits von Scripts: Warum Architektur

Ein Ein-Datei-Schrotter funktioniert gut für hundert Seiten. Aber wenn Sie Millionen von Datenpunkten über Dutzende von Zielseiten auf einem wiederkehrenden Zeitplan sammeln müssen, benötigen Sie ein System – nicht ein Skript. Eine zuverlässige Abstreifarchitektur trennt sich von unabhängigen Komponenten, die unabhängig skaliert, überwacht und wiederhergestellt werden können.

Diese Anleitung geht durch die Gestaltung eines Produktionsabstreifsystems, von der URL-Scheduling über das Proxymanagement bis zur Datenspeicherung. Jede Komponente ist mit Code dargestellt und mit der ProxyHat Proxyinfrastruktur.

Ein gut geordneter Abstreifer behandelt die Datenerfassung als Engineering-Problem, nicht als Hacking-Problem. Jede Komponente hat eine einzige Verantwortung, klare Schnittstellen und beobachtbares Verhalten.

Systemarchitektur Überblick

Ein Produktionsabstreifsystem besteht aus sechs Kernkomponenten:

┌─────────────┐     ┌──────────────┐     ┌─────────────────┐
│  Scheduler   │────▶│  URL Queue   │────▶│  Crawler Pool   │
│  (cron/API)  │     │  (Redis)     │     │  (workers)      │
└─────────────┘     └──────────────┘     └────────┬────────┘
                                                   │
                                          ┌────────▼────────┐
                                          │  Proxy Layer     │
                                          │  (ProxyHat)      │
                                          └────────┬────────┘
                                                   │
                                          ┌────────▼────────┐
                                          │  Parser          │
                                          │  (extract data)  │
                                          └────────┬────────┘
                                                   │
                    ┌──────────────┐      ┌────────▼────────┐
                    │  Monitoring  │◀─────│  Storage         │
                    │  (metrics)   │      │  (DB / files)    │
                    └──────────────┘      └─────────────────┘
Systemarchitektur Überblick
KomponenteVerantwortungTechnologie
ZeitplanEntscheidet, was zu kratzen und wannCron, Celery Beat, Bull
Zurück zur ÜbersichtBuffers URLs mit Priorität und DeduplizierungRedis, RabbitMQ, SQS
Crawler PoolFetches Seiten gleichzeitigasyncio, goroutines, worker threads
ProxyschichtRoutenanfragen durch rotierende ProxiesProxyHat Gateway
ParserExtrahiert strukturierte Daten von HTML/JSONSchönSoup, cheerio, goquery
LagerungPersisten extrahierten DatenPostgreSQL, MongoDB, S3
ÜberwachungVerfolgen Sie Gesundheit und LeistungPrometheus, Protokollierung, Alarme

Komponente 1: Planer

Der Scheduler entscheidet, welche URLs zu kriechen und wann. Es verwaltet Crawl-Frequenz, Prioritätsstufen und stellt sicher, dass keine URL häufiger als notwendig abgeschabt wird.

import redis
import json
import time
from datetime import datetime, timedelta
class CrawlScheduler:
    """Manages crawl schedules and feeds URLs to the queue."""
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)
        self.queue_key = "scraper:url_queue"
        self.schedule_key = "scraper:schedules"
        self.seen_key = "scraper:seen_urls"
    def add_schedule(self, name: str, urls: list[str], interval_hours: int, priority: int = 5):
        """Register a recurring crawl job."""
        self.redis.hset(self.schedule_key, name, json.dumps({
            "urls": urls,
            "interval_hours": interval_hours,
            "priority": priority,
            "last_run": None,
        }))
    def tick(self):
        """Check all schedules and enqueue URLs that are due."""
        schedules = self.redis.hgetall(self.schedule_key)
        enqueued = 0
        for name, data in schedules.items():
            schedule = json.loads(data)
            last_run = schedule.get("last_run")
            interval = timedelta(hours=schedule["interval_hours"])
            if last_run and datetime.fromisoformat(last_run) + interval > datetime.utcnow():
                continue
            for url in schedule["urls"]:
                self.enqueue(url, priority=schedule["priority"])
                enqueued += 1
            schedule["last_run"] = datetime.utcnow().isoformat()
            self.redis.hset(self.schedule_key, name, json.dumps(schedule))
        return enqueued
    def enqueue(self, url: str, priority: int = 5):
        """Add a URL to the crawl queue with deduplication."""
        # Skip if recently seen (within 1 hour)
        if self.redis.sismember(self.seen_key, url):
            return False
        self.redis.zadd(self.queue_key, {
            json.dumps({"url": url, "enqueued_at": time.time()}): priority
        })
        self.redis.sadd(self.seen_key, url)
        self.redis.expire(self.seen_key, 3600)  # 1-hour dedup window
        return True
    def dequeue(self, batch_size: int = 10) -> list[dict]:
        """Pull the highest-priority URLs from the queue."""
        items = self.redis.zpopmax(self.queue_key, batch_size)
        return [json.loads(item) for item, score in items]
    @property
    def queue_size(self) -> int:
        return self.redis.zcard(self.queue_key)
# Usage
scheduler = CrawlScheduler()
scheduler.add_schedule(
    name="product_prices",
    urls=[f"https://example.com/product/{i}" for i in range(1, 1001)],
    interval_hours=6,
    priority=8,
)
scheduler.add_schedule(
    name="competitor_pages",
    urls=["https://competitor.com/pricing", "https://competitor.com/features"],
    interval_hours=24,
    priority=5,
)
# Run every minute via cron
enqueued = scheduler.tick()
print(f"Enqueued {enqueued} URLs, queue size: {scheduler.queue_size}")

Komponente 2: URL Queue

Die Queue decouples scheduling von kriechen. Es bietet Priorität Ordnung, Gegendruck und Beharrlichkeit - so dass keine URLs verloren gehen, wenn ein Raupen abstürzt.

// Node.js queue with Bull
const Queue = require('bull');
const crawlQueue = new Queue('crawl', 'redis://localhost:6379', {
  defaultJobOptions: {
    attempts: 3,
    backoff: { type: 'exponential', delay: 2000 },
    removeOnComplete: 100,
    removeOnFail: 500,
  },
});
// Add URLs with priority (lower number = higher priority)
async function enqueueUrls(urls, priority = 5) {
  const jobs = urls.map(url =>
    crawlQueue.add(
      { url, enqueuedAt: Date.now() },
      { priority, jobId: url } // jobId for deduplication
    )
  );
  await Promise.all(jobs);
  console.log(`Enqueued ${urls.length} URLs`);
}
// Worker processes URLs
crawlQueue.process(10, async (job) => {
  const { url } = job.data;
  // Crawl logic here (see Crawler Pool below)
  return { url, status: 'completed' };
});
crawlQueue.on('failed', (job, err) => {
  console.error(`Job ${job.id} failed: ${err.message}`);
});

Komponenten 3: Crawler Pool

Der Crawler-Pool holt gleichzeitig Seiten durch die Proxy Middleware Layer. Es schafft Zulässigkeitsgrenzen, behandelt Retries und übergibt rohe Reaktionen auf den Parser.

import asyncio
import aiohttp
import uuid
import time
from typing import Optional
class CrawlerPool:
    """Concurrent crawler with proxy rotation and retry logic."""
    def __init__(
        self,
        concurrency: int = 30,
        max_retries: int = 3,
        timeout: int = 30,
    ):
        self.concurrency = concurrency
        self.max_retries = max_retries
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.semaphore = asyncio.Semaphore(concurrency)
        self.stats = {"success": 0, "failed": 0, "retries": 0}
    def _get_proxy(self, country: Optional[str] = None) -> str:
        session_id = uuid.uuid4().hex[:8]
        username = f"USERNAME-session-{session_id}"
        if country:
            username += f"-country-{country}"
        return f"http://{username}:PASSWORD@gate.proxyhat.com:8080"
    async def fetch(
        self,
        session: aiohttp.ClientSession,
        url: str,
        country: Optional[str] = None,
    ) -> dict:
        async with self.semaphore:
            for attempt in range(self.max_retries + 1):
                proxy = self._get_proxy(country)
                start = time.time()
                try:
                    async with session.get(
                        url, proxy=proxy, timeout=self.timeout,
                        headers={
                            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                                          "AppleWebKit/537.36 Chrome/131.0.0.0 Safari/537.36",
                            "Accept": "text/html,application/xhtml+xml,*/*;q=0.8",
                        },
                    ) as response:
                        body = await response.text()
                        latency = time.time() - start
                        if response.status in (403, 429, 503) and attempt < self.max_retries:
                            self.stats["retries"] += 1
                            await asyncio.sleep(2 ** attempt)
                            continue
                        if response.status < 400:
                            self.stats["success"] += 1
                        else:
                            self.stats["failed"] += 1
                        return {
                            "url": url,
                            "status": response.status,
                            "body": body,
                            "latency": latency,
                            "success": response.status < 400,
                        }
                except Exception as e:
                    if attempt < self.max_retries:
                        self.stats["retries"] += 1
                        await asyncio.sleep(2 ** attempt)
                        continue
                    self.stats["failed"] += 1
                    return {
                        "url": url,
                        "error": str(e),
                        "latency": time.time() - start,
                        "success": False,
                    }
    async def crawl(self, urls: list[str], country: Optional[str] = None) -> list[dict]:
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch(session, url, country) for url in urls]
            return await asyncio.gather(*tasks)
# Usage
crawler = CrawlerPool(concurrency=30, max_retries=3)
urls = [f"https://example.com/product/{i}" for i in range(500)]
results = asyncio.run(crawler.crawl(urls, country="us"))
print(f"Success: {crawler.stats['success']}, Failed: {crawler.stats['failed']}")

Komponente 4: Parser

Der Parser verwandelt rohes HTML in strukturierte Daten. Halten Sie die Parsing-Logik getrennt vom Crawling - dies macht es testbar, wiederverwendbar und einfach zu aktualisieren, wenn Ziel-Websites ändern.

from dataclasses import dataclass
from typing import Optional
from bs4 import BeautifulSoup
import json
@dataclass
class ProductData:
    url: str
    title: Optional[str] = None
    price: Optional[float] = None
    currency: Optional[str] = None
    availability: Optional[str] = None
    rating: Optional[float] = None
    review_count: Optional[int] = None
    parsed_at: Optional[str] = None
class ProductParser:
    """Extracts structured product data from HTML."""
    def parse(self, url: str, html: str) -> ProductData:
        soup = BeautifulSoup(html, "html.parser")
        data = ProductData(url=url)
        try:
            data.title = self._extract_text(soup, "h1.product-title")
            data.price = self._extract_price(soup)
            data.currency = self._extract_currency(soup)
            data.availability = self._extract_text(soup, ".availability-status")
            data.rating = self._extract_rating(soup)
            data.review_count = self._extract_review_count(soup)
            data.parsed_at = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
        except Exception as e:
            logger.error(f"Parse error for {url}: {e}")
        return data
    def _extract_text(self, soup, selector: str) -> Optional[str]:
        el = soup.select_one(selector)
        return el.get_text(strip=True) if el else None
    def _extract_price(self, soup) -> Optional[float]:
        el = soup.select_one("[data-price], .price, .product-price")
        if not el:
            return None
        price_text = el.get("data-price") or el.get_text(strip=True)
        # Remove currency symbols and parse
        cleaned = "".join(c for c in price_text if c.isdigit() or c == ".")
        return float(cleaned) if cleaned else None
    def _extract_currency(self, soup) -> Optional[str]:
        el = soup.select_one("[data-currency], .currency")
        return el.get("data-currency") or el.get_text(strip=True) if el else None
    def _extract_rating(self, soup) -> Optional[float]:
        el = soup.select_one("[data-rating], .rating-value")
        if el:
            val = el.get("data-rating") or el.get_text(strip=True)
            try:
                return float(val)
            except ValueError:
                return None
        return None
    def _extract_review_count(self, soup) -> Optional[int]:
        el = soup.select_one(".review-count, [data-reviews]")
        if el:
            text = el.get("data-reviews") or el.get_text(strip=True)
            digits = "".join(c for c in text if c.isdigit())
            return int(digits) if digits else None
        return None
# Usage
parser = ProductParser()
for result in results:
    if result["success"]:
        product = parser.parse(result["url"], result["body"])
        print(f"{product.title}: ${product.price}")

Komponente 5: Lagerung

Persist parsierte Daten mit Versionierung und Deduplizierung. Benutzen Sie Upserts, um neu zu krabbeln.

import psycopg2
from psycopg2.extras import execute_values
from datetime import datetime
class DataStore:
    """Persists scraped data with upsert and versioning."""
    def __init__(self, dsn: str):
        self.conn = psycopg2.connect(dsn)
        self._ensure_tables()
    def _ensure_tables(self):
        with self.conn.cursor() as cur:
            cur.execute("""
                CREATE TABLE IF NOT EXISTS products (
                    url TEXT PRIMARY KEY,
                    title TEXT,
                    price NUMERIC(10,2),
                    currency VARCHAR(3),
                    availability TEXT,
                    rating NUMERIC(3,2),
                    review_count INTEGER,
                    first_seen_at TIMESTAMPTZ DEFAULT NOW(),
                    last_updated_at TIMESTAMPTZ DEFAULT NOW()
                );
                CREATE TABLE IF NOT EXISTS price_history (
                    id SERIAL PRIMARY KEY,
                    url TEXT REFERENCES products(url),
                    price NUMERIC(10,2),
                    recorded_at TIMESTAMPTZ DEFAULT NOW()
                );
            """)
            self.conn.commit()
    def upsert_products(self, products: list[ProductData]):
        """Insert or update products, recording price changes."""
        with self.conn.cursor() as cur:
            for product in products:
                if product.price is None:
                    continue
                # Check if price changed
                cur.execute(
                    "SELECT price FROM products WHERE url = %s",
                    (product.url,)
                )
                row = cur.fetchone()
                # Upsert product
                cur.execute("""
                    INSERT INTO products (url, title, price, currency, availability,
                                          rating, review_count, last_updated_at)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, NOW())
                    ON CONFLICT (url) DO UPDATE SET
                        title = EXCLUDED.title,
                        price = EXCLUDED.price,
                        currency = EXCLUDED.currency,
                        availability = EXCLUDED.availability,
                        rating = EXCLUDED.rating,
                        review_count = EXCLUDED.review_count,
                        last_updated_at = NOW()
                """, (
                    product.url, product.title, product.price,
                    product.currency, product.availability,
                    product.rating, product.review_count,
                ))
                # Record price history if changed
                if row is None or float(row[0]) != product.price:
                    cur.execute(
                        "INSERT INTO price_history (url, price) VALUES (%s, %s)",
                        (product.url, product.price)
                    )
            self.conn.commit()
    @property
    def product_count(self) -> int:
        with self.conn.cursor() as cur:
            cur.execute("SELECT COUNT(*) FROM products")
            return cur.fetchone()[0]
# Usage
store = DataStore("postgresql://user:pass@localhost:5432/scraper")
store.upsert_products(parsed_products)
print(f"Total products: {store.product_count}")

Komponente 6: Überwachung

Binden Sie alle Komponenten zusammen mit zentraler Überwachung. Für detaillierte proxyspezifische Überwachungsmuster siehe unsere Überwachung der Proxy-Leistung Führung.

import time
import json
import logging
from datetime import datetime
class PipelineMonitor:
    """Monitors the entire scraping pipeline."""
    def __init__(self):
        self.logger = logging.getLogger("pipeline")
        self.stage_times = {}
        self.stage_counts = {}
    def start_stage(self, stage: str):
        self.stage_times[stage] = time.time()
    def end_stage(self, stage: str, item_count: int = 0):
        elapsed = time.time() - self.stage_times.get(stage, time.time())
        self.stage_counts[stage] = item_count
        self.logger.info(json.dumps({
            "timestamp": datetime.utcnow().isoformat(),
            "stage": stage,
            "duration_seconds": round(elapsed, 2),
            "items_processed": item_count,
            "items_per_second": round(item_count / elapsed, 1) if elapsed > 0 else 0,
        }))
    def report(self) -> dict:
        return {
            "stages": {
                stage: {
                    "items": self.stage_counts.get(stage, 0),
                }
                for stage in self.stage_counts
            },
            "timestamp": datetime.utcnow().isoformat(),
        }
# Usage within the full pipeline
monitor = PipelineMonitor()
# Stage 1: Schedule
monitor.start_stage("schedule")
urls = scheduler.dequeue(batch_size=500)
monitor.end_stage("schedule", len(urls))
# Stage 2: Crawl
monitor.start_stage("crawl")
results = asyncio.run(crawler.crawl([u["url"] for u in urls]))
monitor.end_stage("crawl", len(results))
# Stage 3: Parse
monitor.start_stage("parse")
products = [parser.parse(r["url"], r["body"]) for r in results if r["success"]]
monitor.end_stage("parse", len(products))
# Stage 4: Store
monitor.start_stage("store")
store.upsert_products(products)
monitor.end_stage("store", len(products))
print(json.dumps(monitor.report(), indent=2))

Alles zusammen setzen

Hier ist die komplette Pipeline, die alle sechs Komponenten zu einem einzigen, lauffähigen Abstreifsystem verbindet.

import asyncio
import logging
logging.basicConfig(level=logging.INFO)
async def run_pipeline():
    # Initialize components
    scheduler = CrawlScheduler()
    crawler = CrawlerPool(concurrency=30, max_retries=3)
    parser = ProductParser()
    store = DataStore("postgresql://user:pass@localhost:5432/scraper")
    monitor = PipelineMonitor()
    # Schedule crawls
    scheduler.add_schedule(
        name="daily_products",
        urls=[f"https://example.com/product/{i}" for i in range(1, 501)],
        interval_hours=24,
        priority=8,
    )
    # Main loop
    while True:
        # 1. Check schedules and enqueue URLs
        monitor.start_stage("schedule")
        scheduler.tick()
        batch = scheduler.dequeue(batch_size=100)
        monitor.end_stage("schedule", len(batch))
        if not batch:
            await asyncio.sleep(60)
            continue
        # 2. Crawl
        monitor.start_stage("crawl")
        urls = [item["url"] for item in batch]
        results = await crawler.crawl(urls, country="us")
        successful = [r for r in results if r.get("success")]
        monitor.end_stage("crawl", len(successful))
        # 3. Parse
        monitor.start_stage("parse")
        products = []
        for result in successful:
            try:
                product = parser.parse(result["url"], result["body"])
                products.append(product)
            except Exception as e:
                logging.error(f"Parse error: {e}")
        monitor.end_stage("parse", len(products))
        # 4. Store
        monitor.start_stage("store")
        store.upsert_products(products)
        monitor.end_stage("store", len(products))
        # 5. Report
        logging.info(f"Pipeline: {monitor.report()}")
        logging.info(f"Crawler stats: {crawler.stats}")
        # Wait before next batch
        await asyncio.sleep(5)
asyncio.run(run_pipeline())

Produktionsentwicklung Muster

Horizontale Skalierung

Waage Crawler unabhängig von Planern und Parsern. Führen Sie mehrere Crawler-Instanzen aus, die von der gleichen Redis-Warteschlange verbrauchen.

Horizontale Skalierung
KomponenteSkalierungsstrategieTypische Waage
ZeitplanEinzelinstanz (Führungswahl)Beispiel
Zurück zur ÜbersichtRedis Cluster oder verwaltete Warteschlange1 Cluster
Crawler PoolHorizontale Podskalierung2-20 Fälle
ProxyschichtVerwaltet von ProxyHatN/A (extern)
Parsermit Crawler oder getrennt1:1 mit Raupen
LagerungDatenbankreplikation1 Primär + Nachbildungen
ÜberwachungZentralisierte AggregationBeispiel

Fehlersuche

  • Crawler Crash: URLs bleiben in Redis Warteschlange. Der neue Raupen holt sie automatisch ab.
  • Proxy-Versagen: Die Mittelschicht Retries mit frischen IPs. Vorgehaltene Fehler lösen Alarme aus.
  • Parser-Versagen: Raw HTML wird in einer Dead-Buchstaben-Quue für manuelle Inspektion und Parser-Updates gespeichert.
  • Datenbankversagen: Parsed Datenpuffer in Speicher/Disk mit Schreib-Ahead-Logging, bis die DB wiederherstellt.

Datenqualitätsprüfungen

class DataQualityChecker:
    """Validates parsed data before storage."""
    def check(self, product: ProductData) -> list[str]:
        issues = []
        if not product.title:
            issues.append("missing_title")
        if product.price is not None and product.price <= 0:
            issues.append("invalid_price")
        if product.price is not None and product.price > 100000:
            issues.append("suspicious_price")
        if product.rating is not None and (product.rating < 0 or product.rating > 5):
            issues.append("invalid_rating")
        return issues
    def filter_valid(self, products: list[ProductData]) -> list[ProductData]:
        valid = []
        for product in products:
            issues = self.check(product)
            if issues:
                logger.warning(f"Data quality issues for {product.url}: {issues}")
            else:
                valid.append(product)
        return valid
# Integrate into pipeline
checker = DataQualityChecker()
valid_products = checker.filter_valid(products)
store.upsert_products(valid_products)
Die beste Scraping-Architektur ist eine, über die Sie Grund haben können. Jede Komponente sollte drei Fragen beantworten: Was macht das? Wie scheitert es? Wie erholt sich das?

Für die Proxyschichtkomponente siehe Aufbau einer Proxy Middleware Layer. Zur Optimierung des Raupendurchsatzes lesen Scaling Proxy Anfragen mit Koncurrency Control. Für Browser-basiertes Crawling, siehe Rotierende Proxies mit Playwright. Für Anti-Detektionsstrategien, erkunden wie man schrott, ohne blockiert zu werden.

Beginnen Sie mit Python SDK, Node SDK, oder SDK für die Proxy-Integration. Entdecken Sie Preise für ProxyHat und Dokumentation Ihre schrottende Architektur zu stärken.

Häufig gestellte Fragen

Bereit loszulegen?

Zugang zu über 50 Mio. Residential-IPs in über 148 Ländern mit KI-gesteuerter Filterung.

Preise ansehenResidential Proxies
← Zurück zum Blog