Concevoir une architecture de scraping fiable

Concevoir un système de grattage de bout en bout : planificateur, file d'attente URL, pool de rampeurs, couche proxy, analyseur, stockage et surveillance. Code Python prêt à la production avec diagrammes d'architecture.

Concevoir une architecture de scraping fiable

Au-delà des Scripts: Pourquoi l'architecture compte

Un racleur à un seul fichier fonctionne bien pour une centaine de pages. Mais quand vous avez besoin de collecter des millions de points de données sur des dizaines de sites cibles sur un calendrier récurrent, vous avez besoin d'un système — pas d'un script. Une architecture de grattage fiable sépare les préoccupations en composants indépendants qui peuvent être réduits, surveillés et récupérés indépendamment.

Ce guide passe par la conception d'un système de grattage de production, depuis la planification des URL jusqu'à la gestion par procuration jusqu'au stockage des données. Chaque composant est illustré avec du code et connecté au Infrastructure proxyHat mandataire.

Un racleur bien architecturé traite la collecte de données comme un problème d'ingénierie, pas un problème de piratage. Chaque composant a une responsabilité unique, des interfaces claires et un comportement observable.

Aperçu de l'architecture du système

Un système de grattage de production se compose de six éléments principaux:

┌─────────────┐     ┌──────────────┐     ┌─────────────────┐
│  Scheduler   │────▶│  URL Queue   │────▶│  Crawler Pool   │
│  (cron/API)  │     │  (Redis)     │     │  (workers)      │
└─────────────┘     └──────────────┘     └────────┬────────┘
                                                   │
                                          ┌────────▼────────┐
                                          │  Proxy Layer     │
                                          │  (ProxyHat)      │
                                          └────────┬────────┘
                                                   │
                                          ┌────────▼────────┐
                                          │  Parser          │
                                          │  (extract data)  │
                                          └────────┬────────┘
                                                   │
                    ┌──────────────┐      ┌────────▼────────┐
                    │  Monitoring  │◀─────│  Storage         │
                    │  (metrics)   │      │  (DB / files)    │
                    └──────────────┘      └─────────────────┘
Aperçu de l'architecture du système
ComposanteResponsabilitéTechnologie
PlanificateurDécide quoi gratter et quandCron, Céleri Beat, Bull
Demande d'URLURLs tampons avec priorité et déduplicationRedis, RabbitMQ, SQS
Pool de ramasseursFetches pages simultanémentasyncio, goroutines, fils de travail
Calque mandataireDemandes d'itinéraires à travers des procurations tournantesPortail ProxyHat
AnalyseurExtrait des données structurées de HTML/JSONBelle Soupe, cheelio, goquery
StockageDonnées extraites par les PersistesPostgreSQL, MongoDB, S3
SurveillanceSuivi de la santé et de la performanceProméthée, enregistrement, alertes

Composante 1 : planificateur

Le planificateur décide quelles URLs ramper et quand. Il gère la fréquence de rampage, les niveaux de priorité, et assure qu'aucune URL n'est grattée plus souvent que nécessaire.

import redis
import json
import time
from datetime import datetime, timedelta
class CrawlScheduler:
    """Manages crawl schedules and feeds URLs to the queue."""
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)
        self.queue_key = "scraper:url_queue"
        self.schedule_key = "scraper:schedules"
        self.seen_key = "scraper:seen_urls"
    def add_schedule(self, name: str, urls: list[str], interval_hours: int, priority: int = 5):
        """Register a recurring crawl job."""
        self.redis.hset(self.schedule_key, name, json.dumps({
            "urls": urls,
            "interval_hours": interval_hours,
            "priority": priority,
            "last_run": None,
        }))
    def tick(self):
        """Check all schedules and enqueue URLs that are due."""
        schedules = self.redis.hgetall(self.schedule_key)
        enqueued = 0
        for name, data in schedules.items():
            schedule = json.loads(data)
            last_run = schedule.get("last_run")
            interval = timedelta(hours=schedule["interval_hours"])
            if last_run and datetime.fromisoformat(last_run) + interval > datetime.utcnow():
                continue
            for url in schedule["urls"]:
                self.enqueue(url, priority=schedule["priority"])
                enqueued += 1
            schedule["last_run"] = datetime.utcnow().isoformat()
            self.redis.hset(self.schedule_key, name, json.dumps(schedule))
        return enqueued
    def enqueue(self, url: str, priority: int = 5):
        """Add a URL to the crawl queue with deduplication."""
        # Skip if recently seen (within 1 hour)
        if self.redis.sismember(self.seen_key, url):
            return False
        self.redis.zadd(self.queue_key, {
            json.dumps({"url": url, "enqueued_at": time.time()}): priority
        })
        self.redis.sadd(self.seen_key, url)
        self.redis.expire(self.seen_key, 3600)  # 1-hour dedup window
        return True
    def dequeue(self, batch_size: int = 10) -> list[dict]:
        """Pull the highest-priority URLs from the queue."""
        items = self.redis.zpopmax(self.queue_key, batch_size)
        return [json.loads(item) for item, score in items]
    @property
    def queue_size(self) -> int:
        return self.redis.zcard(self.queue_key)
# Usage
scheduler = CrawlScheduler()
scheduler.add_schedule(
    name="product_prices",
    urls=[f"https://example.com/product/{i}" for i in range(1, 1001)],
    interval_hours=6,
    priority=8,
)
scheduler.add_schedule(
    name="competitor_pages",
    urls=["https://competitor.com/pricing", "https://competitor.com/features"],
    interval_hours=24,
    priority=5,
)
# Run every minute via cron
enqueued = scheduler.tick()
print(f"Enqueued {enqueued} URLs, queue size: {scheduler.queue_size}")

Composante 2: file d'attente URL

La file d'attente découple le planning de ramper. Il fournit l'ordre de priorité, la contre-pression et la persistance — de sorte qu'aucune URL ne sont perdues si un rampeur s'écrase.

// Node.js queue with Bull
const Queue = require('bull');
const crawlQueue = new Queue('crawl', 'redis://localhost:6379', {
  defaultJobOptions: {
    attempts: 3,
    backoff: { type: 'exponential', delay: 2000 },
    removeOnComplete: 100,
    removeOnFail: 500,
  },
});
// Add URLs with priority (lower number = higher priority)
async function enqueueUrls(urls, priority = 5) {
  const jobs = urls.map(url =>
    crawlQueue.add(
      { url, enqueuedAt: Date.now() },
      { priority, jobId: url } // jobId for deduplication
    )
  );
  await Promise.all(jobs);
  console.log(`Enqueued ${urls.length} URLs`);
}
// Worker processes URLs
crawlQueue.process(10, async (job) => {
  const { url } = job.data;
  // Crawl logic here (see Crawler Pool below)
  return { url, status: 'completed' };
});
crawlQueue.on('failed', (job, err) => {
  console.error(`Job ${job.id} failed: ${err.message}`);
});

Composante 3 : Pool de ramassage

La piscine rampeuse récupère des pages simultanément à travers le couche intermédiaire proxy. Il gère Limites de convergence, gère les relevés et transmet les réponses brutes à l'analyseur.

import asyncio
import aiohttp
import uuid
import time
from typing import Optional
class CrawlerPool:
    """Concurrent crawler with proxy rotation and retry logic."""
    def __init__(
        self,
        concurrency: int = 30,
        max_retries: int = 3,
        timeout: int = 30,
    ):
        self.concurrency = concurrency
        self.max_retries = max_retries
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.semaphore = asyncio.Semaphore(concurrency)
        self.stats = {"success": 0, "failed": 0, "retries": 0}
    def _get_proxy(self, country: Optional[str] = None) -> str:
        session_id = uuid.uuid4().hex[:8]
        username = f"USERNAME-session-{session_id}"
        if country:
            username += f"-country-{country}"
        return f"http://{username}:PASSWORD@gate.proxyhat.com:8080"
    async def fetch(
        self,
        session: aiohttp.ClientSession,
        url: str,
        country: Optional[str] = None,
    ) -> dict:
        async with self.semaphore:
            for attempt in range(self.max_retries + 1):
                proxy = self._get_proxy(country)
                start = time.time()
                try:
                    async with session.get(
                        url, proxy=proxy, timeout=self.timeout,
                        headers={
                            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                                          "AppleWebKit/537.36 Chrome/131.0.0.0 Safari/537.36",
                            "Accept": "text/html,application/xhtml+xml,*/*;q=0.8",
                        },
                    ) as response:
                        body = await response.text()
                        latency = time.time() - start
                        if response.status in (403, 429, 503) and attempt < self.max_retries:
                            self.stats["retries"] += 1
                            await asyncio.sleep(2 ** attempt)
                            continue
                        if response.status < 400:
                            self.stats["success"] += 1
                        else:
                            self.stats["failed"] += 1
                        return {
                            "url": url,
                            "status": response.status,
                            "body": body,
                            "latency": latency,
                            "success": response.status < 400,
                        }
                except Exception as e:
                    if attempt < self.max_retries:
                        self.stats["retries"] += 1
                        await asyncio.sleep(2 ** attempt)
                        continue
                    self.stats["failed"] += 1
                    return {
                        "url": url,
                        "error": str(e),
                        "latency": time.time() - start,
                        "success": False,
                    }
    async def crawl(self, urls: list[str], country: Optional[str] = None) -> list[dict]:
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch(session, url, country) for url in urls]
            return await asyncio.gather(*tasks)
# Usage
crawler = CrawlerPool(concurrency=30, max_retries=3)
urls = [f"https://example.com/product/{i}" for i in range(500)]
results = asyncio.run(crawler.crawl(urls, country="us"))
print(f"Success: {crawler.stats['success']}, Failed: {crawler.stats['failed']}")

Composante 4: Parser

L'analyseur transforme le HTML brut en données structurées. Gardez la logique d'analyse séparée de la rampe, ce qui la rend testable, réutilisable et facile à mettre à jour lorsque les sites cibles changent.

from dataclasses import dataclass
from typing import Optional
from bs4 import BeautifulSoup
import json
@dataclass
class ProductData:
    url: str
    title: Optional[str] = None
    price: Optional[float] = None
    currency: Optional[str] = None
    availability: Optional[str] = None
    rating: Optional[float] = None
    review_count: Optional[int] = None
    parsed_at: Optional[str] = None
class ProductParser:
    """Extracts structured product data from HTML."""
    def parse(self, url: str, html: str) -> ProductData:
        soup = BeautifulSoup(html, "html.parser")
        data = ProductData(url=url)
        try:
            data.title = self._extract_text(soup, "h1.product-title")
            data.price = self._extract_price(soup)
            data.currency = self._extract_currency(soup)
            data.availability = self._extract_text(soup, ".availability-status")
            data.rating = self._extract_rating(soup)
            data.review_count = self._extract_review_count(soup)
            data.parsed_at = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
        except Exception as e:
            logger.error(f"Parse error for {url}: {e}")
        return data
    def _extract_text(self, soup, selector: str) -> Optional[str]:
        el = soup.select_one(selector)
        return el.get_text(strip=True) if el else None
    def _extract_price(self, soup) -> Optional[float]:
        el = soup.select_one("[data-price], .price, .product-price")
        if not el:
            return None
        price_text = el.get("data-price") or el.get_text(strip=True)
        # Remove currency symbols and parse
        cleaned = "".join(c for c in price_text if c.isdigit() or c == ".")
        return float(cleaned) if cleaned else None
    def _extract_currency(self, soup) -> Optional[str]:
        el = soup.select_one("[data-currency], .currency")
        return el.get("data-currency") or el.get_text(strip=True) if el else None
    def _extract_rating(self, soup) -> Optional[float]:
        el = soup.select_one("[data-rating], .rating-value")
        if el:
            val = el.get("data-rating") or el.get_text(strip=True)
            try:
                return float(val)
            except ValueError:
                return None
        return None
    def _extract_review_count(self, soup) -> Optional[int]:
        el = soup.select_one(".review-count, [data-reviews]")
        if el:
            text = el.get("data-reviews") or el.get_text(strip=True)
            digits = "".join(c for c in text if c.isdigit())
            return int(digits) if digits else None
        return None
# Usage
parser = ProductParser()
for result in results:
    if result["success"]:
        product = parser.parse(result["url"], result["body"])
        print(f"{product.title}: ${product.price}")

Composante 5: Stockage

Persistez les données analysées avec la version et la déduplication. Utilisez des upserts pour gérer les re-crawls gracieusement.

import psycopg2
from psycopg2.extras import execute_values
from datetime import datetime
class DataStore:
    """Persists scraped data with upsert and versioning."""
    def __init__(self, dsn: str):
        self.conn = psycopg2.connect(dsn)
        self._ensure_tables()
    def _ensure_tables(self):
        with self.conn.cursor() as cur:
            cur.execute("""
                CREATE TABLE IF NOT EXISTS products (
                    url TEXT PRIMARY KEY,
                    title TEXT,
                    price NUMERIC(10,2),
                    currency VARCHAR(3),
                    availability TEXT,
                    rating NUMERIC(3,2),
                    review_count INTEGER,
                    first_seen_at TIMESTAMPTZ DEFAULT NOW(),
                    last_updated_at TIMESTAMPTZ DEFAULT NOW()
                );
                CREATE TABLE IF NOT EXISTS price_history (
                    id SERIAL PRIMARY KEY,
                    url TEXT REFERENCES products(url),
                    price NUMERIC(10,2),
                    recorded_at TIMESTAMPTZ DEFAULT NOW()
                );
            """)
            self.conn.commit()
    def upsert_products(self, products: list[ProductData]):
        """Insert or update products, recording price changes."""
        with self.conn.cursor() as cur:
            for product in products:
                if product.price is None:
                    continue
                # Check if price changed
                cur.execute(
                    "SELECT price FROM products WHERE url = %s",
                    (product.url,)
                )
                row = cur.fetchone()
                # Upsert product
                cur.execute("""
                    INSERT INTO products (url, title, price, currency, availability,
                                          rating, review_count, last_updated_at)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, NOW())
                    ON CONFLICT (url) DO UPDATE SET
                        title = EXCLUDED.title,
                        price = EXCLUDED.price,
                        currency = EXCLUDED.currency,
                        availability = EXCLUDED.availability,
                        rating = EXCLUDED.rating,
                        review_count = EXCLUDED.review_count,
                        last_updated_at = NOW()
                """, (
                    product.url, product.title, product.price,
                    product.currency, product.availability,
                    product.rating, product.review_count,
                ))
                # Record price history if changed
                if row is None or float(row[0]) != product.price:
                    cur.execute(
                        "INSERT INTO price_history (url, price) VALUES (%s, %s)",
                        (product.url, product.price)
                    )
            self.conn.commit()
    @property
    def product_count(self) -> int:
        with self.conn.cursor() as cur:
            cur.execute("SELECT COUNT(*) FROM products")
            return cur.fetchone()[0]
# Usage
store = DataStore("postgresql://user:pass@localhost:5432/scraper")
store.upsert_products(parsed_products)
print(f"Total products: {store.product_count}")

Composante 6 : Surveillance

Attachez tous les composants avec une surveillance centralisée. Pour des modèles détaillés de surveillance par procuration, consultez notre Surveillance de la performance proxy guide.

import time
import json
import logging
from datetime import datetime
class PipelineMonitor:
    """Monitors the entire scraping pipeline."""
    def __init__(self):
        self.logger = logging.getLogger("pipeline")
        self.stage_times = {}
        self.stage_counts = {}
    def start_stage(self, stage: str):
        self.stage_times[stage] = time.time()
    def end_stage(self, stage: str, item_count: int = 0):
        elapsed = time.time() - self.stage_times.get(stage, time.time())
        self.stage_counts[stage] = item_count
        self.logger.info(json.dumps({
            "timestamp": datetime.utcnow().isoformat(),
            "stage": stage,
            "duration_seconds": round(elapsed, 2),
            "items_processed": item_count,
            "items_per_second": round(item_count / elapsed, 1) if elapsed > 0 else 0,
        }))
    def report(self) -> dict:
        return {
            "stages": {
                stage: {
                    "items": self.stage_counts.get(stage, 0),
                }
                for stage in self.stage_counts
            },
            "timestamp": datetime.utcnow().isoformat(),
        }
# Usage within the full pipeline
monitor = PipelineMonitor()
# Stage 1: Schedule
monitor.start_stage("schedule")
urls = scheduler.dequeue(batch_size=500)
monitor.end_stage("schedule", len(urls))
# Stage 2: Crawl
monitor.start_stage("crawl")
results = asyncio.run(crawler.crawl([u["url"] for u in urls]))
monitor.end_stage("crawl", len(results))
# Stage 3: Parse
monitor.start_stage("parse")
products = [parser.parse(r["url"], r["body"]) for r in results if r["success"]]
monitor.end_stage("parse", len(products))
# Stage 4: Store
monitor.start_stage("store")
store.upsert_products(products)
monitor.end_stage("store", len(products))
print(json.dumps(monitor.report(), indent=2))

Tout mettre ensemble

Voici le pipeline complet qui relie les six composants en un seul système de raclage.

import asyncio
import logging
logging.basicConfig(level=logging.INFO)
async def run_pipeline():
    # Initialize components
    scheduler = CrawlScheduler()
    crawler = CrawlerPool(concurrency=30, max_retries=3)
    parser = ProductParser()
    store = DataStore("postgresql://user:pass@localhost:5432/scraper")
    monitor = PipelineMonitor()
    # Schedule crawls
    scheduler.add_schedule(
        name="daily_products",
        urls=[f"https://example.com/product/{i}" for i in range(1, 501)],
        interval_hours=24,
        priority=8,
    )
    # Main loop
    while True:
        # 1. Check schedules and enqueue URLs
        monitor.start_stage("schedule")
        scheduler.tick()
        batch = scheduler.dequeue(batch_size=100)
        monitor.end_stage("schedule", len(batch))
        if not batch:
            await asyncio.sleep(60)
            continue
        # 2. Crawl
        monitor.start_stage("crawl")
        urls = [item["url"] for item in batch]
        results = await crawler.crawl(urls, country="us")
        successful = [r for r in results if r.get("success")]
        monitor.end_stage("crawl", len(successful))
        # 3. Parse
        monitor.start_stage("parse")
        products = []
        for result in successful:
            try:
                product = parser.parse(result["url"], result["body"])
                products.append(product)
            except Exception as e:
                logging.error(f"Parse error: {e}")
        monitor.end_stage("parse", len(products))
        # 4. Store
        monitor.start_stage("store")
        store.upsert_products(products)
        monitor.end_stage("store", len(products))
        # 5. Report
        logging.info(f"Pipeline: {monitor.report()}")
        logging.info(f"Crawler stats: {crawler.stats}")
        # Wait before next batch
        await asyncio.sleep(5)
asyncio.run(run_pipeline())

Modèles de déploiement de la production

Étalonnage horizontal

Rampants à l'échelle indépendamment des planificateurs et des analyseurs. Exécutez plusieurs instances de rampeurs qui consomment depuis la même file d'attente de Redis.

Étalonnage horizontal
ComposanteStratégie de renforcementÉchelle typique
PlanificateurUne seule instance (élection de chef de file)1 instance
Demande d'URLRedis cluster ou file d'attente gérée1 groupe
Pool de ramasseursÉchelle horizontale2-20 instances
Calque mandataireGéré par ProxyHatS.O. (externe)
AnalyseurCo-loqué avec rampeur ou séparé1:1 avec rampeurs
StockageRéplication de la base de données1 primaire + répliques
SurveillanceAgrégation centralisée1 instance

Erreur de récupération

  • Crash du rameur : Les URL restent dans la file d'attente Redis. Le nouveau rampeur les ramasse automatiquement.
  • Défauts de proxy: Les couche intermédiaire les relevés avec des IP frais. Les échecs soutenus déclenchent des alertes.
  • Défauts d'analyse : Le HTML brut est stocké dans une file d'attente en lettres mortes pour une inspection manuelle et des mises à jour d'analyse.
  • Défauts de base de données : tampons de données parsés dans la mémoire / disque avec write-ahead log jusqu'à ce que la DB récupère.

Vérifications de la qualité des données

class DataQualityChecker:
    """Validates parsed data before storage."""
    def check(self, product: ProductData) -> list[str]:
        issues = []
        if not product.title:
            issues.append("missing_title")
        if product.price is not None and product.price <= 0:
            issues.append("invalid_price")
        if product.price is not None and product.price > 100000:
            issues.append("suspicious_price")
        if product.rating is not None and (product.rating < 0 or product.rating > 5):
            issues.append("invalid_rating")
        return issues
    def filter_valid(self, products: list[ProductData]) -> list[ProductData]:
        valid = []
        for product in products:
            issues = self.check(product)
            if issues:
                logger.warning(f"Data quality issues for {product.url}: {issues}")
            else:
                valid.append(product)
        return valid
# Integrate into pipeline
checker = DataQualityChecker()
valid_products = checker.filter_valid(products)
store.upsert_products(valid_products)
La meilleure architecture de grattage est celle que vous pouvez raisonner. Chaque composante devrait répondre à trois questions : Qu'est-ce que ça fait ? Comment ça échoue ? Comment ça se rétablit ?

Pour le composant de couche proxy, voir Bâtir un calque mandataire. Pour optimiser le débit du rampeur, lire Élargissement des demandes de procuration avec contrôle de l'actualité. Pour les rampes basées sur le navigateur, voir Proxies tournantes avec Playwright. Pour les stratégies anti-détection, explorer Comment gratter sans être bloqué.

Commencez par Python SDK, Numéro SDKou Allez au SDK pour l'intégration par procuration. Explorer Prix ProxyHat et la documentation pour alimenter votre architecture de grattage.

Foire aux questions

Prêt à commencer ?

Accédez à plus de 50M d'IPs résidentielles dans plus de 148 pays avec filtrage IA.

Voir les tarifsProxies résidentiels
← Retour au Blog