Senaryoların Ötesinde: Mimarlık Nedenleri
Tek bir filtreli yüz sayfa için iyi çalışır. Ancak tekrarlanan bir programdaki düzinelerce hedef siteye milyonlarca veri noktası toplamanız gerektiğinde, bir sisteme ihtiyacınız var - bir senaryo değil. Güvenilir bir kazı mimarisi, ölçeklenebilen, izlenebilir ve bağımsız bir şekilde kurtarılabilir bağımsız bileşenlerle ilgilidir.
Bu kılavuz, bir üretim kazı sisteminin tasarımıyla, URL’den veri depolamaya yönetim yoluyla zamanlama yoluyla yürür. Her bileşen kodla gösterilir ve bağlantı ile bağlantılıdır ProxyHat altyapısı proxy.
İyi anarşist, bir mühendislik problemi olarak veri koleksiyonuna davranır, bir hack sorunu değil. Her bileşeninin tek bir sorumluluğu, açık arabirimleri ve gözlemlenebilir davranışları vardır.
Sistem Mimari Genel Bakış
Bir üretim kazı sistemi altı temel bileşenden oluşur:
┌─────────────┐ ┌──────────────┐ ┌─────────────────┐
│ Scheduler │────▶│ URL Queue │────▶│ Crawler Pool │
│ (cron/API) │ │ (Redis) │ │ (workers) │
└─────────────┘ └──────────────┘ └────────┬────────┘
│
┌────────▼────────┐
│ Proxy Layer │
│ (ProxyHat) │
└────────┬────────┘
│
┌────────▼────────┐
│ Parser │
│ (extract data) │
└────────┬────────┘
│
┌──────────────┐ ┌────────▼────────┐
│ Monitoring │◀─────│ Storage │
│ (metrics) │ │ (DB / files) │
└──────────────┘ └─────────────────┘
| Bitirme | Sorumluluk Sorumluluk Sorumluluk | Teknoloji Teknolojisi |
|---|---|---|
| Scheduler | Ne kazıtacağına karar verir ve ne zaman | Cron, Celery Beat, Bull |
| URL Queue | Buffers URLs öncelik ve deduplication ile | Redis, TavşanMQ, SQS |
| Crawler Pool | Fetches sayfaları eş zamanlı olarak | Asyncio, goroutines, işçi iplikleri |
| Proxy Katmanı | Rotalar, dönen proxy yoluyla talepler | ProxyHat Gateway |
| Parser | HTML/JSON'dan yapılandırılmış veriler | Güzel Soup, neşeli, goquery |
| Depolama | Persistler verileri satın aldı | PostgreSQL, MongoDB, S3 |
| İzleme İzleme İzleme İzleme İzleme | Sağlık ve performans | Prometheus, log, uyarılar |
Not 1: Scheduler
Programcı hangi URL'lerin taranacağına ve ne zaman olduğuna karar verir. Ekran frekansı, öncelik seviyelerini yönetir ve gerekli olandan daha sık URL'nin çıkarılmasını sağlar.
import redis
import json
import time
from datetime import datetime, timedelta
class CrawlScheduler:
"""Manages crawl schedules and feeds URLs to the queue."""
def __init__(self, redis_url: str = "redis://localhost:6379"):
self.redis = redis.from_url(redis_url)
self.queue_key = "scraper:url_queue"
self.schedule_key = "scraper:schedules"
self.seen_key = "scraper:seen_urls"
def add_schedule(self, name: str, urls: list[str], interval_hours: int, priority: int = 5):
"""Register a recurring crawl job."""
self.redis.hset(self.schedule_key, name, json.dumps({
"urls": urls,
"interval_hours": interval_hours,
"priority": priority,
"last_run": None,
}))
def tick(self):
"""Check all schedules and enqueue URLs that are due."""
schedules = self.redis.hgetall(self.schedule_key)
enqueued = 0
for name, data in schedules.items():
schedule = json.loads(data)
last_run = schedule.get("last_run")
interval = timedelta(hours=schedule["interval_hours"])
if last_run and datetime.fromisoformat(last_run) + interval > datetime.utcnow():
continue
for url in schedule["urls"]:
self.enqueue(url, priority=schedule["priority"])
enqueued += 1
schedule["last_run"] = datetime.utcnow().isoformat()
self.redis.hset(self.schedule_key, name, json.dumps(schedule))
return enqueued
def enqueue(self, url: str, priority: int = 5):
"""Add a URL to the crawl queue with deduplication."""
# Skip if recently seen (within 1 hour)
if self.redis.sismember(self.seen_key, url):
return False
self.redis.zadd(self.queue_key, {
json.dumps({"url": url, "enqueued_at": time.time()}): priority
})
self.redis.sadd(self.seen_key, url)
self.redis.expire(self.seen_key, 3600) # 1-hour dedup window
return True
def dequeue(self, batch_size: int = 10) -> list[dict]:
"""Pull the highest-priority URLs from the queue."""
items = self.redis.zpopmax(self.queue_key, batch_size)
return [json.loads(item) for item, score in items]
@property
def queue_size(self) -> int:
return self.redis.zcard(self.queue_key)
# Usage
scheduler = CrawlScheduler()
scheduler.add_schedule(
name="product_prices",
urls=[f"https://example.com/product/{i}" for i in range(1, 1001)],
interval_hours=6,
priority=8,
)
scheduler.add_schedule(
name="competitor_pages",
urls=["https://competitor.com/pricing", "https://competitor.com/features"],
interval_hours=24,
priority=5,
)
# Run every minute via cron
enqueued = scheduler.tick()
print(f"Enqueued {enqueued} URLs, queue size: {scheduler.queue_size}")
2. URL Queue
kuyruklar taramaktan zamanlama yapar. Öncelikli sipariş, geri baskı ve kalıcılık sağlar - bu yüzden bir tarama kazaları olup kaybolmaz.
// Node.js queue with Bull
const Queue = require('bull');
const crawlQueue = new Queue('crawl', 'redis://localhost:6379', {
defaultJobOptions: {
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
removeOnComplete: 100,
removeOnFail: 500,
},
});
// Add URLs with priority (lower number = higher priority)
async function enqueueUrls(urls, priority = 5) {
const jobs = urls.map(url =>
crawlQueue.add(
{ url, enqueuedAt: Date.now() },
{ priority, jobId: url } // jobId for deduplication
)
);
await Promise.all(jobs);
console.log(`Enqueued ${urls.length} URLs`);
}
// Worker processes URLs
crawlQueue.process(10, async (job) => {
const { url } = job.data;
// Crawl logic here (see Crawler Pool below)
return { url, status: 'completed' };
});
crawlQueue.on('failed', (job, err) => {
console.error(`Job ${job.id} failed: ${err.message}`);
});
3. Crawler Pool
Scaner havuz sayfaları hemen aracılığıyla getiriyor Ortaware katmanı. Yeterlik limitleriAncak yeniden işliyor ve the'ye ham yanıtları geçiyor.
import asyncio
import aiohttp
import uuid
import time
from typing import Optional
class CrawlerPool:
"""Concurrent crawler with proxy rotation and retry logic."""
def __init__(
self,
concurrency: int = 30,
max_retries: int = 3,
timeout: int = 30,
):
self.concurrency = concurrency
self.max_retries = max_retries
self.timeout = aiohttp.ClientTimeout(total=timeout)
self.semaphore = asyncio.Semaphore(concurrency)
self.stats = {"success": 0, "failed": 0, "retries": 0}
def _get_proxy(self, country: Optional[str] = None) -> str:
session_id = uuid.uuid4().hex[:8]
username = f"USERNAME-session-{session_id}"
if country:
username += f"-country-{country}"
return f"http://{username}:PASSWORD@gate.proxyhat.com:8080"
async def fetch(
self,
session: aiohttp.ClientSession,
url: str,
country: Optional[str] = None,
) -> dict:
async with self.semaphore:
for attempt in range(self.max_retries + 1):
proxy = self._get_proxy(country)
start = time.time()
try:
async with session.get(
url, proxy=proxy, timeout=self.timeout,
headers={
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 Chrome/131.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,*/*;q=0.8",
},
) as response:
body = await response.text()
latency = time.time() - start
if response.status in (403, 429, 503) and attempt < self.max_retries:
self.stats["retries"] += 1
await asyncio.sleep(2 ** attempt)
continue
if response.status < 400:
self.stats["success"] += 1
else:
self.stats["failed"] += 1
return {
"url": url,
"status": response.status,
"body": body,
"latency": latency,
"success": response.status < 400,
}
except Exception as e:
if attempt < self.max_retries:
self.stats["retries"] += 1
await asyncio.sleep(2 ** attempt)
continue
self.stats["failed"] += 1
return {
"url": url,
"error": str(e),
"latency": time.time() - start,
"success": False,
}
async def crawl(self, urls: list[str], country: Optional[str] = None) -> list[dict]:
async with aiohttp.ClientSession() as session:
tasks = [self.fetch(session, url, country) for url in urls]
return await asyncio.gather(*tasks)
# Usage
crawler = CrawlerPool(concurrency=30, max_retries=3)
urls = [f"https://example.com/product/{i}" for i in range(500)]
results = asyncio.run(crawler.crawl(urls, country="us"))
print(f"Success: {crawler.stats['success']}, Failed: {crawler.stats['failed']}")
Bitir 4: Parser
The ham HTML'yi yapısal verilere dönüştürür. Ekranlamadan ayrı mantık tutun - bu, hedef siteleri değiştiğinde test edilebilir, yeniden uygulanabilir ve güncellemeyi kolaylaştırır.
from dataclasses import dataclass
from typing import Optional
from bs4 import BeautifulSoup
import json
@dataclass
class ProductData:
url: str
title: Optional[str] = None
price: Optional[float] = None
currency: Optional[str] = None
availability: Optional[str] = None
rating: Optional[float] = None
review_count: Optional[int] = None
parsed_at: Optional[str] = None
class ProductParser:
"""Extracts structured product data from HTML."""
def parse(self, url: str, html: str) -> ProductData:
soup = BeautifulSoup(html, "html.parser")
data = ProductData(url=url)
try:
data.title = self._extract_text(soup, "h1.product-title")
data.price = self._extract_price(soup)
data.currency = self._extract_currency(soup)
data.availability = self._extract_text(soup, ".availability-status")
data.rating = self._extract_rating(soup)
data.review_count = self._extract_review_count(soup)
data.parsed_at = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
except Exception as e:
logger.error(f"Parse error for {url}: {e}")
return data
def _extract_text(self, soup, selector: str) -> Optional[str]:
el = soup.select_one(selector)
return el.get_text(strip=True) if el else None
def _extract_price(self, soup) -> Optional[float]:
el = soup.select_one("[data-price], .price, .product-price")
if not el:
return None
price_text = el.get("data-price") or el.get_text(strip=True)
# Remove currency symbols and parse
cleaned = "".join(c for c in price_text if c.isdigit() or c == ".")
return float(cleaned) if cleaned else None
def _extract_currency(self, soup) -> Optional[str]:
el = soup.select_one("[data-currency], .currency")
return el.get("data-currency") or el.get_text(strip=True) if el else None
def _extract_rating(self, soup) -> Optional[float]:
el = soup.select_one("[data-rating], .rating-value")
if el:
val = el.get("data-rating") or el.get_text(strip=True)
try:
return float(val)
except ValueError:
return None
return None
def _extract_review_count(self, soup) -> Optional[int]:
el = soup.select_one(".review-count, [data-reviews]")
if el:
text = el.get("data-reviews") or el.get_text(strip=True)
digits = "".join(c for c in text if c.isdigit())
return int(digits) if digits else None
return None
# Usage
parser = ProductParser()
for result in results:
if result["success"]:
product = parser.parse(result["url"], result["body"])
print(f"{product.title}: ${product.price}")
5: Storage 5: Storage
Persist verileri sürümleme ve deduplication ile reddetti. Re-crawls nezaketle başa çıkmak için tezler kullanın.
import psycopg2
from psycopg2.extras import execute_values
from datetime import datetime
class DataStore:
"""Persists scraped data with upsert and versioning."""
def __init__(self, dsn: str):
self.conn = psycopg2.connect(dsn)
self._ensure_tables()
def _ensure_tables(self):
with self.conn.cursor() as cur:
cur.execute("""
CREATE TABLE IF NOT EXISTS products (
url TEXT PRIMARY KEY,
title TEXT,
price NUMERIC(10,2),
currency VARCHAR(3),
availability TEXT,
rating NUMERIC(3,2),
review_count INTEGER,
first_seen_at TIMESTAMPTZ DEFAULT NOW(),
last_updated_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS price_history (
id SERIAL PRIMARY KEY,
url TEXT REFERENCES products(url),
price NUMERIC(10,2),
recorded_at TIMESTAMPTZ DEFAULT NOW()
);
""")
self.conn.commit()
def upsert_products(self, products: list[ProductData]):
"""Insert or update products, recording price changes."""
with self.conn.cursor() as cur:
for product in products:
if product.price is None:
continue
# Check if price changed
cur.execute(
"SELECT price FROM products WHERE url = %s",
(product.url,)
)
row = cur.fetchone()
# Upsert product
cur.execute("""
INSERT INTO products (url, title, price, currency, availability,
rating, review_count, last_updated_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, NOW())
ON CONFLICT (url) DO UPDATE SET
title = EXCLUDED.title,
price = EXCLUDED.price,
currency = EXCLUDED.currency,
availability = EXCLUDED.availability,
rating = EXCLUDED.rating,
review_count = EXCLUDED.review_count,
last_updated_at = NOW()
""", (
product.url, product.title, product.price,
product.currency, product.availability,
product.rating, product.review_count,
))
# Record price history if changed
if row is None or float(row[0]) != product.price:
cur.execute(
"INSERT INTO price_history (url, price) VALUES (%s, %s)",
(product.url, product.price)
)
self.conn.commit()
@property
def product_count(self) -> int:
with self.conn.cursor() as cur:
cur.execute("SELECT COUNT(*) FROM products")
return cur.fetchone()[0]
# Usage
store = DataStore("postgresql://user:pass@localhost:5432/scraper")
store.upsert_products(parsed_products)
print(f"Total products: {store.product_count}")
6.
Merkezileştirilmiş izleme ile birlikte tüm bileşenleri yapıştırın. Detaylı izleme desenleri için, bakınız our detailed-specific monitoring pattern, see our our our our Proxy Performance kılavuz.
import time
import json
import logging
from datetime import datetime
class PipelineMonitor:
"""Monitors the entire scraping pipeline."""
def __init__(self):
self.logger = logging.getLogger("pipeline")
self.stage_times = {}
self.stage_counts = {}
def start_stage(self, stage: str):
self.stage_times[stage] = time.time()
def end_stage(self, stage: str, item_count: int = 0):
elapsed = time.time() - self.stage_times.get(stage, time.time())
self.stage_counts[stage] = item_count
self.logger.info(json.dumps({
"timestamp": datetime.utcnow().isoformat(),
"stage": stage,
"duration_seconds": round(elapsed, 2),
"items_processed": item_count,
"items_per_second": round(item_count / elapsed, 1) if elapsed > 0 else 0,
}))
def report(self) -> dict:
return {
"stages": {
stage: {
"items": self.stage_counts.get(stage, 0),
}
for stage in self.stage_counts
},
"timestamp": datetime.utcnow().isoformat(),
}
# Usage within the full pipeline
monitor = PipelineMonitor()
# Stage 1: Schedule
monitor.start_stage("schedule")
urls = scheduler.dequeue(batch_size=500)
monitor.end_stage("schedule", len(urls))
# Stage 2: Crawl
monitor.start_stage("crawl")
results = asyncio.run(crawler.crawl([u["url"] for u in urls]))
monitor.end_stage("crawl", len(results))
# Stage 3: Parse
monitor.start_stage("parse")
products = [parser.parse(r["url"], r["body"]) for r in results if r["success"]]
monitor.end_stage("parse", len(products))
# Stage 4: Store
monitor.start_stage("store")
store.upsert_products(products)
monitor.end_stage("store", len(products))
print(json.dumps(monitor.report(), indent=2))
Her şeyi bir araya getirmek
İşte tüm altı bileşeni tek, runnable scraping sistemine bağlayan tam boru hattı.
import asyncio
import logging
logging.basicConfig(level=logging.INFO)
async def run_pipeline():
# Initialize components
scheduler = CrawlScheduler()
crawler = CrawlerPool(concurrency=30, max_retries=3)
parser = ProductParser()
store = DataStore("postgresql://user:pass@localhost:5432/scraper")
monitor = PipelineMonitor()
# Schedule crawls
scheduler.add_schedule(
name="daily_products",
urls=[f"https://example.com/product/{i}" for i in range(1, 501)],
interval_hours=24,
priority=8,
)
# Main loop
while True:
# 1. Check schedules and enqueue URLs
monitor.start_stage("schedule")
scheduler.tick()
batch = scheduler.dequeue(batch_size=100)
monitor.end_stage("schedule", len(batch))
if not batch:
await asyncio.sleep(60)
continue
# 2. Crawl
monitor.start_stage("crawl")
urls = [item["url"] for item in batch]
results = await crawler.crawl(urls, country="us")
successful = [r for r in results if r.get("success")]
monitor.end_stage("crawl", len(successful))
# 3. Parse
monitor.start_stage("parse")
products = []
for result in successful:
try:
product = parser.parse(result["url"], result["body"])
products.append(product)
except Exception as e:
logging.error(f"Parse error: {e}")
monitor.end_stage("parse", len(products))
# 4. Store
monitor.start_stage("store")
store.upsert_products(products)
monitor.end_stage("store", len(products))
# 5. Report
logging.info(f"Pipeline: {monitor.report()}")
logging.info(f"Crawler stats: {crawler.stats}")
# Wait before next batch
await asyncio.sleep(5)
asyncio.run(run_pipeline())
Prodüksiyon Deployment Patterns
Yatay Scaling
Frekans taramaları programlayıcılardan ve .s'ten bağımsız olarak. Aynı Redis kuyruğundan tüketilen birden çok tarama örneği çalıştırın.
| Bitirme | Scaling Strategy | Tipik Scale |
|---|---|---|
| Scheduler | Tek örnek (başı seçim) | 1 örnek |
| URL Queue | Redis kümesi veya kuyruk yönetilen | 1 küme |
| Crawler Pool | Yatay pod ölçeklendirme | 2-20 örnek |
| Proxy Katmanı | ProxyHat tarafından Yönetilen | N/A (external) |
| Parser | Ekranlayıcı veya ayrı ayrı | 1:1 with hatchers |
| Depolama | Veritabanı replikasyon | 1 birincil + çoğaltmalar |
| İzleme İzleme İzleme İzleme İzleme | Centralized aggregation | 1 örnek |
Hata kurtarma
- Crawler kazasında: URL'ler Redis kuyruğunda kalır. Yeni taramacı onları otomatik olarak alır.
- Proxy başarısızlıkları: The The The The The The The The Ortaware katmanı Yeni IP ile yeniden birleşmeler. Sustained başarısızlıklar uyarıları tetikler.
- Parser başarısızlıkları: Raw HTML, manuel denetim ve . güncellemeler için ölü bir kuyrukta depolanır.
- Veritabanı başarısızlıkları: Storage/disk'ta bulunan veri tamponları, DB'nin kurtarılmasına kadar yaz-ahead logları ile iptal etti.
Data Quality Checks
class DataQualityChecker:
"""Validates parsed data before storage."""
def check(self, product: ProductData) -> list[str]:
issues = []
if not product.title:
issues.append("missing_title")
if product.price is not None and product.price <= 0:
issues.append("invalid_price")
if product.price is not None and product.price > 100000:
issues.append("suspicious_price")
if product.rating is not None and (product.rating < 0 or product.rating > 5):
issues.append("invalid_rating")
return issues
def filter_valid(self, products: list[ProductData]) -> list[ProductData]:
valid = []
for product in products:
issues = self.check(product)
if issues:
logger.warning(f"Data quality issues for {product.url}: {issues}")
else:
valid.append(product)
return valid
# Integrate into pipeline
checker = DataQualityChecker()
valid_products = checker.filter_valid(products)
store.upsert_products(valid_products)
En iyi kazı mimarisi, bir sebebiniz olabilir. Her bileşen üç soruya cevap vermelidir: Ne yapar? Nasıl başarısız olur? Nasıl geri dönüyor?
Katman proxy bileşeni için, bakınız Bir Proxy Ortaware Katmanı.In optimizasyon taramalı throughput, read Proxy Requests with Concurrency ControlTarayıcı tabanlı tarama için, bakınız Playwright ile Proxies. Anti-deteksiyon stratejileri için, keşfedin bloke olmadan nasıl kazınır.
Get start with ile başlayın Python SDK, Node SDKYa da Go SDK Proxy entegrasyonu için. Discover Discover Discover Discover ProxyHat pricing ve Belge belgeleri Yıkıcı mimarinizi güçlendirmek için.






