Neden avlanma Altyapısı Özel Mimarlıka İhtiyacı Var
Bir web sitesine vuran tek hazır bir senaryo küçük görevler için iyi çalışır. Ancak milyonlarca sayfayı her gün onlarca hedef boyunca dökmeniz gerektiğinde, bu senaryo bir şişenck haline gelir. Scaling scraping altyapısı Lineer senaryolardan dağıtılması, hataları kusursuz bir şekilde idare eden kuyruk tabanlı mimarilerden hareket etmek, proxy rotasyonunu yönetmek ve en üst düzeye çıkarmak gerektirir.
Bu kılavuz, mimari kalıpları, kuyruk sistemleri, yatay ölçeklendirme stratejileri ve proxy yönetimi tekniklerini ölçeklendirmeyi kapsar.
Bu makale, konseptlerimizi bizimkinden inşa ediyor Web'e komple rehber Proxies. havuz proxy büyüklüğü için, bakınız Kaç Proxies Kaçmak İçin İhtiyacınız Var?
Scalable için Mimarlık Desenleri
Şekil 1: Queue-Based Lavabo
Ölçeklenebilir kazının temeli bir şeydir Mesaj kuyruğu Bu de URL keşiflerini veri toplamasından ayırır. İşçiler kuyruktan görevleri alır, sayfaları proxy aracılığıyla getirir ve depolamak için sonuçları zorlar.
# Architecture overview:
#
# URL Source → [Message Queue] → Worker 1 → [Results Store]
# → Worker 2 →
# → Worker N →
# ↓
# [Dead Letter Queue]
# (failed requests)Bu desenin Faydaları:
- Yatay ölçeklendirme: Sistem değiştirmeden veya ortadan kaldırmadan işçiler sistemi
- Hata toleransı: Başarısız görevler yeniden deneme için kuyruka geri döndü
- Hız kontrolü: Uyum işçisi genel olarak kontrol etmeyi kabul eder
- Viability: Queue derinlik arkalog gösteriyor; tamamlanma oranı sağlık gösteriyor
Python Redis Queue ile Uygulama
import redis
import requests
import json
import time
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor
PROXY = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
REDIS_URL = "redis://localhost:6379/0"
@dataclass
class ScrapeTask:
url: str
target: str
priority: int = 0
retries: int = 0
max_retries: int = 3
class ScrapingQueue:
def __init__(self, redis_url: str):
self.redis = redis.from_url(redis_url)
self.queue_key = "scrape:tasks"
self.results_key = "scrape:results"
self.dlq_key = "scrape:dead_letter"
def enqueue(self, task: ScrapeTask):
self.redis.lpush(self.queue_key, json.dumps(task.__dict__))
def dequeue(self, timeout: int = 5) -> ScrapeTask | None:
result = self.redis.brpop(self.queue_key, timeout=timeout)
if result:
data = json.loads(result[1])
return ScrapeTask(**data)
return None
def store_result(self, url: str, data: dict):
self.redis.hset(self.results_key, url, json.dumps(data))
def send_to_dlq(self, task: ScrapeTask, error: str):
task_data = task.__dict__
task_data["error"] = error
self.redis.lpush(self.dlq_key, json.dumps(task_data))
@property
def pending_count(self) -> int:
return self.redis.llen(self.queue_key)
class ScrapingWorker:
def __init__(self, queue: ScrapingQueue, worker_id: int):
self.queue = queue
self.worker_id = worker_id
self.session = requests.Session()
self.session.proxies = {"http": PROXY, "https": PROXY}
def process_task(self, task: ScrapeTask) -> bool:
try:
resp = self.session.get(task.url, timeout=30)
if resp.status_code == 200:
self.queue.store_result(task.url, {
"status": 200,
"body": resp.text[:10000], # Truncate for storage
"target": task.target,
})
return True
elif resp.status_code in [429, 503]:
# Retry with backoff
if task.retries < task.max_retries:
task.retries += 1
time.sleep(2 ** task.retries)
self.queue.enqueue(task)
else:
self.queue.send_to_dlq(task, f"HTTP {resp.status_code}")
else:
self.queue.send_to_dlq(task, f"HTTP {resp.status_code}")
except Exception as e:
if task.retries < task.max_retries:
task.retries += 1
self.queue.enqueue(task)
else:
self.queue.send_to_dlq(task, str(e))
return False
def run(self):
print(f"Worker {self.worker_id} started")
while True:
task = self.queue.dequeue(timeout=5)
if task:
self.process_task(task)
# Launch multiple workers
queue = ScrapingQueue(REDIS_URL)
# Enqueue tasks
for i in range(10000):
queue.enqueue(ScrapeTask(
url=f"https://example.com/product/{i}",
target="example.com"
))
# Start 10 workers
with ThreadPoolExecutor(max_workers=10) as executor:
workers = [ScrapingWorker(queue, i) for i in range(10)]
for worker in workers:
executor.submit(worker.run)Node.js Implementation with Bull Queue
const Queue = require('bull');
const HttpsProxyAgent = require('https-proxy-agent');
const fetch = require('node-fetch');
const agent = new HttpsProxyAgent('http://USERNAME:PASSWORD@gate.proxyhat.com:8080');
// Create queue with Redis backend
const scrapeQueue = new Queue('scraping', 'redis://localhost:6379');
// Define the worker processor
scrapeQueue.process(10, async (job) => { // 10 concurrent workers
const { url, target } = job.data;
try {
const res = await fetch(url, { agent, timeout: 30000 });
if (res.ok) {
const body = await res.text();
return { url, status: res.status, body: body.slice(0, 10000) };
}
if (res.status === 429 || res.status === 503) {
throw new Error(`Rate limited: HTTP ${res.status}`);
}
return { url, status: res.status, body: null };
} catch (err) {
throw err; // Bull will retry based on job options
}
});
// Enqueue tasks with retry options
async function enqueueTasks(urls, target) {
for (const url of urls) {
await scrapeQueue.add(
{ url, target },
{
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
removeOnComplete: 1000,
removeOnFail: false,
}
);
}
}
// Monitor progress
scrapeQueue.on('completed', (job, result) => {
console.log(`Done: ${result.url} (${result.status})`);
});
scrapeQueue.on('failed', (job, err) => {
console.error(`Failed: ${job.data.url} - ${err.message}`);
});Şekil 2: Boru Mimarisi
Karmaşık kazı akışları için, bir akış kullanın Boru hattı boru hattı boru hattı boru hattı Her aşamanın farklı bir endişesi nerede idare eder:
# Pipeline stages:
#
# Stage 1: URL Discovery → finds pages to scrape
# Stage 2: Content Fetching → downloads pages via proxies
# Stage 3: Data Extraction → parses HTML, extracts data
# Stage 4: Data Validation → checks quality, deduplicates
# Stage 5: Storage → saves to database/warehouseGo Uygulama
package main
import (
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"sync"
"time"
)
type ScrapeResult struct {
URL string `json:"url"`
Status int `json:"status"`
Body string `json:"body"`
}
func fetcher(urls <-chan string, results chan<- ScrapeResult, wg *sync.WaitGroup) {
defer wg.Done()
proxyURL, _ := url.Parse("http://USERNAME:PASSWORD@gate.proxyhat.com:8080")
client := &http.Client{
Transport: &http.Transport{Proxy: http.ProxyURL(proxyURL)},
Timeout: 30 * time.Second,
}
for u := range urls {
resp, err := client.Get(u)
if err != nil {
results <- ScrapeResult{URL: u, Status: 0}
continue
}
body, _ := io.ReadAll(resp.Body)
resp.Body.Close()
results <- ScrapeResult{URL: u, Status: resp.StatusCode, Body: string(body)}
}
}
func processor(results <-chan ScrapeResult, done chan<- bool) {
for result := range results {
if result.Status == 200 {
// Extract and store data
data, _ := json.Marshal(result)
fmt.Printf("Processed: %s (%d bytes)\n", result.URL, len(data))
}
}
done <- true
}
func main() {
urls := make(chan string, 1000)
results := make(chan ScrapeResult, 1000)
done := make(chan bool)
// Start 20 fetcher workers
var wg sync.WaitGroup
for i := 0; i < 20; i++ {
wg.Add(1)
go fetcher(urls, results, &wg)
}
// Start processor
go processor(results, done)
// Feed URLs
go func() {
for i := 0; i < 10000; i++ {
urls <- fmt.Sprintf("https://example.com/product/%d", i)
}
close(urls)
}()
// Wait for all fetchers, then close results
wg.Wait()
close(results)
<-done
}Yatay Scaling Strategies
Strateji 1: Multi-Makine Deployment
Birden çok makinedeki işçiler. kuyruk koordinasyon noktası olarak hareket eder:
| Bitirme | Deployment | Scaling |
|---|---|---|
| Queue (Redis/LordtMQ) | Özel sunucu veya yönetilen hizmet | Dikey (daha fazla RAM) |
| İşçiler | Birden çok makine veya konteyner | Yatay (add cases) |
| Sonuçlar depolama | Veritabanı veya nesne depolama | Dikey + |
| İzleme İzleme İzleme İzleme İzleme | Centralized dashboard | Tek örnek |
Strateji 2: Konteyner Tabanlı Scaling
elastik ölçeklendirme için Docker ve Kubernet kullanın. Her işçi tekrarlanabilir bir konteynerde çalışır:
# docker-compose.yml for scraping workers
version: '3.8'
services:
redis:
image: redis:7-alpine
ports:
- "6379:6379"
scraper-worker:
build: .
environment:
- PROXY_URL=http://USERNAME:PASSWORD@gate.proxyhat.com:8080
- REDIS_URL=redis://redis:6379/0
- WORKER_CONCURRENCY=10
deploy:
replicas: 5 # 5 containers × 10 concurrency = 50 parallel requests
resources:
limits:
memory: 512M
cpus: '0.5'
depends_on:
- redisProxy Management at Scale
Ölçekte, proxy yönetimi kritik bir sistem bileşeni haline gelir. Anahtar düşünceler:
Bağlantı Havuz
İstek başına yeni olanlar yaratmak yerine proxy ağ geçidine bağlantıları kullanın. Bu latency ve bağlantı yükünü azaltır:
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_optimized_session() -> requests.Session:
"""Create a session with connection pooling and retry logic."""
session = requests.Session()
# Connection pool: keep 20 connections, max 50
adapter = HTTPAdapter(
pool_connections=20,
pool_maxsize=50,
max_retries=Retry(
total=3,
backoff_factor=1,
status_forcelist=[500, 502, 503],
)
)
session.mount("http://", adapter)
session.mount("https://", adapter)
session.proxies = {
"http": "http://USERNAME:PASSWORD@gate.proxyhat.com:8080",
"https": "http://USERNAME:PASSWORD@gate.proxyhat.com:8080",
}
return session
# Reuse across many requests
session = create_optimized_session()
for url in urls:
resp = session.get(url, timeout=30)Sağlık Takip Et
Sorunları erken tespit etmek için performans proxy'nizi gerçek zamanlı izleyin:
import time
from collections import defaultdict
from dataclasses import dataclass, field
@dataclass
class ProxyMetrics:
"""Track proxy health metrics for monitoring."""
requests_total: int = 0
requests_success: int = 0
requests_blocked: int = 0
requests_timeout: int = 0
latency_samples: list = field(default_factory=list)
status_codes: dict = field(default_factory=lambda: defaultdict(int))
def record_request(self, status_code: int, latency_ms: float):
self.requests_total += 1
self.status_codes[status_code] += 1
self.latency_samples.append(latency_ms)
if status_code == 200:
self.requests_success += 1
elif status_code in [403, 429]:
self.requests_blocked += 1
# Keep only last 1000 samples
if len(self.latency_samples) > 1000:
self.latency_samples = self.latency_samples[-1000:]
@property
def success_rate(self) -> float:
return self.requests_success / self.requests_total if self.requests_total else 0
@property
def avg_latency(self) -> float:
return sum(self.latency_samples) / len(self.latency_samples) if self.latency_samples else 0
def report(self) -> str:
return (
f"Total: {self.requests_total}, "
f"Success: {self.success_rate:.1%}, "
f"Blocked: {self.requests_blocked}, "
f"Avg Latency: {self.avg_latency:.0f}ms"
)Data Storage at Scale
| Depolama Tipi | En iyisi için | Scale Scale |
|---|---|---|
| PostgreSQL | Yapılı ürün/pricing data | Milyonlarca satır |
| MongoDB | Yarı yapılandırılmış / değişkenli şemalar | Milyarlarca belge |
| S3/Object depolama | Raw HTML arşivleri | Petabays |
| Elasticsearch | Tamamlanan veriler üzerinde tam metin arama | Milyarlarca belge |
| ClickHouse | Büyük veri kümeleri üzerinde analitik | Çizgilerin Trillions of rows |
Scaling Checklist
- Çift URL keşif getirmekten. aşamalar arasında bir mesaj kuyruk kullanın.
- Doğru yeniden deneme mantığı uygulayın. Kalıcı başarısızlıklar için ölü mektup kuyrukları ile Exponential backoff.
- Her şeyi izleyin. Queue derinlik, başarı oranları, geçncy, hedef alan başına hata oranları.
- Bağlantı havuzu kullanın. İstek başına yeni olanları yaratmak yerine yeniden kullanım bağlantıları.
- Başarısızlık için plan. İşçiler kazasında, proxy bloke edilir, hedefler yapısını değiştirir. Her katmana dayanıklılık inşa edin.
- Başlamadan önce ölçek test edin. 100 RPM'de çalışan bir sistem hafıza, bağlantı sınırları veya kuyruk şişeleri nedeniyle 10.000 RPM'de başarısız olabilir.
Ölçekleme mimarisinizi tamamlayan proxy stratejileri için, okuyun Proxy Rotation Strategies for Large-Scale Haping. Boyut olarak oran limitlerini işlemek için, bakınız Takma Sınırları Açıkladı.
Kullanın Python SDK, Node SDKYa da Go SDK Üretim entegrasyonu proxy için ve keşfedin ProxyHat planları Yüksek hacimli kazı için.
Sık Sorulan Sorular
Ölçekte kazı yapmak için hangi kuyruk sistemi en iyisidir?
Redis with Bull (Node.js) veya RQ (Python) günde milyonlarca görev için iyi çalışıyor. Daha büyük ölçek için, Apache Kafka veya TavşanMQ daha iyi dayanıklılık ve yönlendirme sağlar. Mevcut altyapınıza ve takım uzmanlığınıza dayanarak seçin.
Kaç eş zamanlı işçi nasıl koşmalıyım?
Proxy kapasitenize ve hedef site toleransına dayanan 10-20 işçi ve ölçek ile başlayın. Başarı oranlarını izleyin -% 90'ın altına düşerse, daha fazla işçi eklemeden önce karşılıklılığı azaltır. ProxyHat aracılığıyla her işçi otomatik IP rotasyonu alır.
İşçiler için birsen veya iplik kullanmalı mıyım?
I/O-bound scraping (en çok vaka), async (Python asyncio, Node.js) iplikten daha iyi kaynak verimliliği sağlar. Sadece CPU-heavy parsing'e ihtiyacınız olduğunda iplik veya çokiş kullanın. Go goroutines her iki modelde de öne çıkıyor.
Hedef site yapısını nasıl değiştiririm?
Boru hattınızda doğrulama verileri geçerlidir. Veriler geçerli değildir (bölgeleri, yanlış türleri), ekibinizi ve kuyruklarınızı güncel .s ile yeniden işleme için etkilenen URL'lerinizi uyarır. .lerinizi yazın, böylece gerekirse geri dönebilirsiniz.






