تصميم بنية استخراج بيانات موثوقة

Design an end-to-end scraping system: scheduler, URL queue, crawler pool, proxy layer, parser, storage, and monitoring. رمز (بايتون) الجاهز للإنتاج مع مخططات هيكلية.

تصميم بنية استخراج بيانات موثوقة

ما بعد النصر: لماذا مسائل الهندسة المعمارية

كشطة واحدة تعمل بشكل جيد لمئة صفحة ولكن عندما تحتاج إلى جمع الملايين من نقاط البيانات عبر عشرات المواقع المستهدفة على جدول متكرر، تحتاج إلى نظام - وليس نص. A reliable scraping structure separates concerns into independent components that can be scaled, monitored, and recovered independently.

This guide walks through the design of a production scraping system, from URL scheduling through proxy management to data storage. كل عنصر موضح بالرمز ومتصل البنية التحتية البديلة.

الخردة الحسنة البحث تعالج جمع البيانات كمشكلة هندسية، وليس مشكلة اختراق. ولكل عنصر مسؤولية واحدة، ووصلات وصل واضحة، والسلوك الجدير بالملاحظة.

استعراض هيكل النظام

ويتألف نظام خردة الإنتاج من ستة عناصر أساسية:

┌─────────────┐     ┌──────────────┐     ┌─────────────────┐
│  Scheduler   │────▶│  URL Queue   │────▶│  Crawler Pool   │
│  (cron/API)  │     │  (Redis)     │     │  (workers)      │
└─────────────┘     └──────────────┘     └────────┬────────┘
                                                   │
                                          ┌────────▼────────┐
                                          │  Proxy Layer     │
                                          │  (ProxyHat)      │
                                          └────────┬────────┘
                                                   │
                                          ┌────────▼────────┐
                                          │  Parser          │
                                          │  (extract data)  │
                                          └────────┬────────┘
                                                   │
                    ┌──────────────┐      ┌────────▼────────┐
                    │  Monitoring  │◀─────│  Storage         │
                    │  (metrics)   │      │  (DB / files)    │
                    └──────────────┘      └─────────────────┘
استعراض هيكل النظام
العنصرالمسؤوليةالتكنولوجيا
الجدوليقرر ما يكشط ومتىCron, Celery Beat, Bull
URL QueueBuffers URLs with priority and deduplicationRedis, RabbitMQ, SQS
"كراودر بول"صفحات الرسوم في وقت واحد(إسنسيو)، (غوروتينز)، خيوط العمال
Proxy Layerالطلبات المتعلقة بالطرق عن طريق التناوببوابة ProxyHat
ParserExtracts structured data from HTML/JSONسوب جميل، ابتهاج، غوكيري
التخزينالمستخرجون من البياناتPostgreSQL, MongoDB, S3
الرصدالمسارات الصحية والأداءبروميثيوس، قطع الأشجار، الإنذارات

العنصر 1: الجدول

يحدد الجدول أيهما يزحف ومتى It manages crawl frequency, priority levels, and ensures no URL is scraped more often than necessary.

import redis
import json
import time
from datetime import datetime, timedelta
class CrawlScheduler:
    """Manages crawl schedules and feeds URLs to the queue."""
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)
        self.queue_key = "scraper:url_queue"
        self.schedule_key = "scraper:schedules"
        self.seen_key = "scraper:seen_urls"
    def add_schedule(self, name: str, urls: list[str], interval_hours: int, priority: int = 5):
        """Register a recurring crawl job."""
        self.redis.hset(self.schedule_key, name, json.dumps({
            "urls": urls,
            "interval_hours": interval_hours,
            "priority": priority,
            "last_run": None,
        }))
    def tick(self):
        """Check all schedules and enqueue URLs that are due."""
        schedules = self.redis.hgetall(self.schedule_key)
        enqueued = 0
        for name, data in schedules.items():
            schedule = json.loads(data)
            last_run = schedule.get("last_run")
            interval = timedelta(hours=schedule["interval_hours"])
            if last_run and datetime.fromisoformat(last_run) + interval > datetime.utcnow():
                continue
            for url in schedule["urls"]:
                self.enqueue(url, priority=schedule["priority"])
                enqueued += 1
            schedule["last_run"] = datetime.utcnow().isoformat()
            self.redis.hset(self.schedule_key, name, json.dumps(schedule))
        return enqueued
    def enqueue(self, url: str, priority: int = 5):
        """Add a URL to the crawl queue with deduplication."""
        # Skip if recently seen (within 1 hour)
        if self.redis.sismember(self.seen_key, url):
            return False
        self.redis.zadd(self.queue_key, {
            json.dumps({"url": url, "enqueued_at": time.time()}): priority
        })
        self.redis.sadd(self.seen_key, url)
        self.redis.expire(self.seen_key, 3600)  # 1-hour dedup window
        return True
    def dequeue(self, batch_size: int = 10) -> list[dict]:
        """Pull the highest-priority URLs from the queue."""
        items = self.redis.zpopmax(self.queue_key, batch_size)
        return [json.loads(item) for item, score in items]
    @property
    def queue_size(self) -> int:
        return self.redis.zcard(self.queue_key)
# Usage
scheduler = CrawlScheduler()
scheduler.add_schedule(
    name="product_prices",
    urls=[f"https://example.com/product/{i}" for i in range(1, 1001)],
    interval_hours=6,
    priority=8,
)
scheduler.add_schedule(
    name="competitor_pages",
    urls=["https://competitor.com/pricing", "https://competitor.com/features"],
    interval_hours=24,
    priority=5,
)
# Run every minute via cron
enqueued = scheduler.tick()
print(f"Enqueued {enqueued} URLs, queue size: {scheduler.queue_size}")

العنصر 2: URL Queue

الكوابيس تُخطط من الزحف وهو يوفر ترتيب الأولويات، والقمع، والثبات - حتى لا تضيع القوات الثورية إذا تحطم زحف.

// Node.js queue with Bull
const Queue = require('bull');
const crawlQueue = new Queue('crawl', 'redis://localhost:6379', {
  defaultJobOptions: {
    attempts: 3,
    backoff: { type: 'exponential', delay: 2000 },
    removeOnComplete: 100,
    removeOnFail: 500,
  },
});
// Add URLs with priority (lower number = higher priority)
async function enqueueUrls(urls, priority = 5) {
  const jobs = urls.map(url =>
    crawlQueue.add(
      { url, enqueuedAt: Date.now() },
      { priority, jobId: url } // jobId for deduplication
    )
  );
  await Promise.all(jobs);
  console.log(`Enqueued ${urls.length} URLs`);
}
// Worker processes URLs
crawlQueue.process(10, async (job) => {
  const { url } = job.data;
  // Crawl logic here (see Crawler Pool below)
  return { url, status: 'completed' };
});
crawlQueue.on('failed', (job, err) => {
  console.error(`Job ${job.id} failed: ${err.message}`);
});

العنصر 3: مجمع كراوفر

صفحات بركة الزحف في وقت واحد من خلال الطبقة المتوسطة-إنه يدير حدود الاتفاقيتعامل مع المقابر ويمرر ردوداً خامّة على الببغاء

import asyncio
import aiohttp
import uuid
import time
from typing import Optional
class CrawlerPool:
    """Concurrent crawler with proxy rotation and retry logic."""
    def __init__(
        self,
        concurrency: int = 30,
        max_retries: int = 3,
        timeout: int = 30,
    ):
        self.concurrency = concurrency
        self.max_retries = max_retries
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.semaphore = asyncio.Semaphore(concurrency)
        self.stats = {"success": 0, "failed": 0, "retries": 0}
    def _get_proxy(self, country: Optional[str] = None) -> str:
        session_id = uuid.uuid4().hex[:8]
        username = f"USERNAME-session-{session_id}"
        if country:
            username += f"-country-{country}"
        return f"http://{username}:PASSWORD@gate.proxyhat.com:8080"
    async def fetch(
        self,
        session: aiohttp.ClientSession,
        url: str,
        country: Optional[str] = None,
    ) -> dict:
        async with self.semaphore:
            for attempt in range(self.max_retries + 1):
                proxy = self._get_proxy(country)
                start = time.time()
                try:
                    async with session.get(
                        url, proxy=proxy, timeout=self.timeout,
                        headers={
                            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                                          "AppleWebKit/537.36 Chrome/131.0.0.0 Safari/537.36",
                            "Accept": "text/html,application/xhtml+xml,*/*;q=0.8",
                        },
                    ) as response:
                        body = await response.text()
                        latency = time.time() - start
                        if response.status in (403, 429, 503) and attempt < self.max_retries:
                            self.stats["retries"] += 1
                            await asyncio.sleep(2 ** attempt)
                            continue
                        if response.status < 400:
                            self.stats["success"] += 1
                        else:
                            self.stats["failed"] += 1
                        return {
                            "url": url,
                            "status": response.status,
                            "body": body,
                            "latency": latency,
                            "success": response.status < 400,
                        }
                except Exception as e:
                    if attempt < self.max_retries:
                        self.stats["retries"] += 1
                        await asyncio.sleep(2 ** attempt)
                        continue
                    self.stats["failed"] += 1
                    return {
                        "url": url,
                        "error": str(e),
                        "latency": time.time() - start,
                        "success": False,
                    }
    async def crawl(self, urls: list[str], country: Optional[str] = None) -> list[dict]:
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch(session, url, country) for url in urls]
            return await asyncio.gather(*tasks)
# Usage
crawler = CrawlerPool(concurrency=30, max_retries=3)
urls = [f"https://example.com/product/{i}" for i in range(500)]
results = asyncio.run(crawler.crawl(urls, country="us"))
print(f"Success: {crawler.stats['success']}, Failed: {crawler.stats['failed']}")

العنصر 4: بارسر

وتحوّل الببغاء إلى بيانات منظمة. إبقاء المنطق منفصلاً عن الزحف - وهذا يجعله قابلاً للاختبار وإعادة الاستخدام ومن السهل التحديث عندما تتغير المواقع المستهدفة.

from dataclasses import dataclass
from typing import Optional
from bs4 import BeautifulSoup
import json
@dataclass
class ProductData:
    url: str
    title: Optional[str] = None
    price: Optional[float] = None
    currency: Optional[str] = None
    availability: Optional[str] = None
    rating: Optional[float] = None
    review_count: Optional[int] = None
    parsed_at: Optional[str] = None
class ProductParser:
    """Extracts structured product data from HTML."""
    def parse(self, url: str, html: str) -> ProductData:
        soup = BeautifulSoup(html, "html.parser")
        data = ProductData(url=url)
        try:
            data.title = self._extract_text(soup, "h1.product-title")
            data.price = self._extract_price(soup)
            data.currency = self._extract_currency(soup)
            data.availability = self._extract_text(soup, ".availability-status")
            data.rating = self._extract_rating(soup)
            data.review_count = self._extract_review_count(soup)
            data.parsed_at = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
        except Exception as e:
            logger.error(f"Parse error for {url}: {e}")
        return data
    def _extract_text(self, soup, selector: str) -> Optional[str]:
        el = soup.select_one(selector)
        return el.get_text(strip=True) if el else None
    def _extract_price(self, soup) -> Optional[float]:
        el = soup.select_one("[data-price], .price, .product-price")
        if not el:
            return None
        price_text = el.get("data-price") or el.get_text(strip=True)
        # Remove currency symbols and parse
        cleaned = "".join(c for c in price_text if c.isdigit() or c == ".")
        return float(cleaned) if cleaned else None
    def _extract_currency(self, soup) -> Optional[str]:
        el = soup.select_one("[data-currency], .currency")
        return el.get("data-currency") or el.get_text(strip=True) if el else None
    def _extract_rating(self, soup) -> Optional[float]:
        el = soup.select_one("[data-rating], .rating-value")
        if el:
            val = el.get("data-rating") or el.get_text(strip=True)
            try:
                return float(val)
            except ValueError:
                return None
        return None
    def _extract_review_count(self, soup) -> Optional[int]:
        el = soup.select_one(".review-count, [data-reviews]")
        if el:
            text = el.get("data-reviews") or el.get_text(strip=True)
            digits = "".join(c for c in text if c.isdigit())
            return int(digits) if digits else None
        return None
# Usage
parser = ProductParser()
for result in results:
    if result["success"]:
        product = parser.parse(result["url"], result["body"])
        print(f"{product.title}: ${product.price}")

العنصر 5: التخزين

Persist parsed data with versioning and deduplication. إستخدموا المكثفات للتعامل مع اعادة الدمج بنعمة

import psycopg2
from psycopg2.extras import execute_values
from datetime import datetime
class DataStore:
    """Persists scraped data with upsert and versioning."""
    def __init__(self, dsn: str):
        self.conn = psycopg2.connect(dsn)
        self._ensure_tables()
    def _ensure_tables(self):
        with self.conn.cursor() as cur:
            cur.execute("""
                CREATE TABLE IF NOT EXISTS products (
                    url TEXT PRIMARY KEY,
                    title TEXT,
                    price NUMERIC(10,2),
                    currency VARCHAR(3),
                    availability TEXT,
                    rating NUMERIC(3,2),
                    review_count INTEGER,
                    first_seen_at TIMESTAMPTZ DEFAULT NOW(),
                    last_updated_at TIMESTAMPTZ DEFAULT NOW()
                );
                CREATE TABLE IF NOT EXISTS price_history (
                    id SERIAL PRIMARY KEY,
                    url TEXT REFERENCES products(url),
                    price NUMERIC(10,2),
                    recorded_at TIMESTAMPTZ DEFAULT NOW()
                );
            """)
            self.conn.commit()
    def upsert_products(self, products: list[ProductData]):
        """Insert or update products, recording price changes."""
        with self.conn.cursor() as cur:
            for product in products:
                if product.price is None:
                    continue
                # Check if price changed
                cur.execute(
                    "SELECT price FROM products WHERE url = %s",
                    (product.url,)
                )
                row = cur.fetchone()
                # Upsert product
                cur.execute("""
                    INSERT INTO products (url, title, price, currency, availability,
                                          rating, review_count, last_updated_at)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, NOW())
                    ON CONFLICT (url) DO UPDATE SET
                        title = EXCLUDED.title,
                        price = EXCLUDED.price,
                        currency = EXCLUDED.currency,
                        availability = EXCLUDED.availability,
                        rating = EXCLUDED.rating,
                        review_count = EXCLUDED.review_count,
                        last_updated_at = NOW()
                """, (
                    product.url, product.title, product.price,
                    product.currency, product.availability,
                    product.rating, product.review_count,
                ))
                # Record price history if changed
                if row is None or float(row[0]) != product.price:
                    cur.execute(
                        "INSERT INTO price_history (url, price) VALUES (%s, %s)",
                        (product.url, product.price)
                    )
            self.conn.commit()
    @property
    def product_count(self) -> int:
        with self.conn.cursor() as cur:
            cur.execute("SELECT COUNT(*) FROM products")
            return cur.fetchone()[0]
# Usage
store = DataStore("postgresql://user:pass@localhost:5432/scraper")
store.upsert_products(parsed_products)
print(f"Total products: {store.product_count}")

العنصر 6: الرصد

ربط جميع المكونات مع الرصد المركزي. لأنماط الرصد المفصّلة المتعلّقة بالوكالة، انظر Monitoring Proxy Performance دليل

import time
import json
import logging
from datetime import datetime
class PipelineMonitor:
    """Monitors the entire scraping pipeline."""
    def __init__(self):
        self.logger = logging.getLogger("pipeline")
        self.stage_times = {}
        self.stage_counts = {}
    def start_stage(self, stage: str):
        self.stage_times[stage] = time.time()
    def end_stage(self, stage: str, item_count: int = 0):
        elapsed = time.time() - self.stage_times.get(stage, time.time())
        self.stage_counts[stage] = item_count
        self.logger.info(json.dumps({
            "timestamp": datetime.utcnow().isoformat(),
            "stage": stage,
            "duration_seconds": round(elapsed, 2),
            "items_processed": item_count,
            "items_per_second": round(item_count / elapsed, 1) if elapsed > 0 else 0,
        }))
    def report(self) -> dict:
        return {
            "stages": {
                stage: {
                    "items": self.stage_counts.get(stage, 0),
                }
                for stage in self.stage_counts
            },
            "timestamp": datetime.utcnow().isoformat(),
        }
# Usage within the full pipeline
monitor = PipelineMonitor()
# Stage 1: Schedule
monitor.start_stage("schedule")
urls = scheduler.dequeue(batch_size=500)
monitor.end_stage("schedule", len(urls))
# Stage 2: Crawl
monitor.start_stage("crawl")
results = asyncio.run(crawler.crawl([u["url"] for u in urls]))
monitor.end_stage("crawl", len(results))
# Stage 3: Parse
monitor.start_stage("parse")
products = [parser.parse(r["url"], r["body"]) for r in results if r["success"]]
monitor.end_stage("parse", len(products))
# Stage 4: Store
monitor.start_stage("store")
store.upsert_products(products)
monitor.end_stage("store", len(products))
print(json.dumps(monitor.report(), indent=2))

وضعه معاً

هنا خط الأنابيب الكامل الذي يربط كل المكونات الستة في نظام الخردة الوحيد القابل للهرب

import asyncio
import logging
logging.basicConfig(level=logging.INFO)
async def run_pipeline():
    # Initialize components
    scheduler = CrawlScheduler()
    crawler = CrawlerPool(concurrency=30, max_retries=3)
    parser = ProductParser()
    store = DataStore("postgresql://user:pass@localhost:5432/scraper")
    monitor = PipelineMonitor()
    # Schedule crawls
    scheduler.add_schedule(
        name="daily_products",
        urls=[f"https://example.com/product/{i}" for i in range(1, 501)],
        interval_hours=24,
        priority=8,
    )
    # Main loop
    while True:
        # 1. Check schedules and enqueue URLs
        monitor.start_stage("schedule")
        scheduler.tick()
        batch = scheduler.dequeue(batch_size=100)
        monitor.end_stage("schedule", len(batch))
        if not batch:
            await asyncio.sleep(60)
            continue
        # 2. Crawl
        monitor.start_stage("crawl")
        urls = [item["url"] for item in batch]
        results = await crawler.crawl(urls, country="us")
        successful = [r for r in results if r.get("success")]
        monitor.end_stage("crawl", len(successful))
        # 3. Parse
        monitor.start_stage("parse")
        products = []
        for result in successful:
            try:
                product = parser.parse(result["url"], result["body"])
                products.append(product)
            except Exception as e:
                logging.error(f"Parse error: {e}")
        monitor.end_stage("parse", len(products))
        # 4. Store
        monitor.start_stage("store")
        store.upsert_products(products)
        monitor.end_stage("store", len(products))
        # 5. Report
        logging.info(f"Pipeline: {monitor.report()}")
        logging.info(f"Crawler stats: {crawler.stats}")
        # Wait before next batch
        await asyncio.sleep(5)
asyncio.run(run_pipeline())

أنماط نشر الإنتاج

الارتفاع الأفقي

زحامات سكاكين مستقلة عن الجداول و الطرود اجري العديد من حالات الزحف التي تستهلك من نفس قضية ريديس

الارتفاع الأفقي
العنصراستراتيجية التوسعطراز Scale
الجدولحالة واحدة (انتخابات قيادية)حالة واحدة
URL Queueمجموعة " ريديس " أو المدارمجموعة واحدة
"كراودر بول"الحوض الأفقي2-20 حالات
Proxy LayerManaged by ProxyHatغير رسمي (خارجي)
Parserمشترك في المكان مع الزحف أو المنفصلة1:1 مع الزحف
التخزينتكرار قاعدة البيانات1 أولية + مكررة
الرصدالتجميع المركزيحالة واحدة

استرداد الأخطاء

  • تصادم (كراولير) URLs remain in Redis queue. الزحف الجديد يلتقطها تلقائياً
  • الفشل الظاهري: The الطبقة المتوسطة مقابض مع شركاء جدد الإخفاقات المُستمرة تنبهات الزناد
  • حالات الفشل: ويخزن راو HTML في طابور مميتة للتفتيش اليدوي واستكمالات الطوابق.
  • إخفاقات قاعدة البيانات: حواجز البيانات المخففة في الذاكرة/الأدوية مع قطع رأس الكتابة إلى أن يتعافى الـ دي بي

التحقق من جودة البيانات

class DataQualityChecker:
    """Validates parsed data before storage."""
    def check(self, product: ProductData) -> list[str]:
        issues = []
        if not product.title:
            issues.append("missing_title")
        if product.price is not None and product.price <= 0:
            issues.append("invalid_price")
        if product.price is not None and product.price > 100000:
            issues.append("suspicious_price")
        if product.rating is not None and (product.rating < 0 or product.rating > 5):
            issues.append("invalid_rating")
        return issues
    def filter_valid(self, products: list[ProductData]) -> list[ProductData]:
        valid = []
        for product in products:
            issues = self.check(product)
            if issues:
                logger.warning(f"Data quality issues for {product.url}: {issues}")
            else:
                valid.append(product)
        return valid
# Integrate into pipeline
checker = DataQualityChecker()
valid_products = checker.filter_valid(products)
store.upsert_products(valid_products)
أفضل هيكل للخردة هو واحد يمكنك تبريره. وينبغي أن يجيب كل عنصر على ثلاثة أسئلة: ماذا تفعل؟ كيف تفشل؟ كيف تتعافى؟

لعنصر الطبقة المحترفة، انظر بناء شركة بروسي ميدلوارمن أجل الحصول على أكبر قدر ممكن من الإزدحام Scaling Proxy requests with Concurency Controlمن أجل الزحف القائم على التصفيق مسلسل "بروكس" مع "بلايرايت"- بحث استراتيجيات مكافحة الكشف كيف تخرّب دون أن تُغلق.

بدأت Python SDK.. Node SDKأو Go SDK للإدماج المحترف Explore تسعير ProxyHat و الوثائق لتقوية هيكل الخردة الخاص بك

الأسئلة المتكررة

¿Listo para empezar?

Accede a más de 50M de IPs residenciales en más de 148 países con filtrado impulsado por IA.

Ver preciosProxies residenciales
← Volver al Blog