Scraping Altyapısını Nasıl Ölçeklendirirsiniz

Web dökmesi için mimari desenler: kuyruk tabanlı sistemler, boru hatları tasarımı, konteynerlerle yatay ölçeklendirme ve proxy yönetimi. Python, Node.js ve Go.

Scraping Altyapısını Nasıl Ölçeklendirirsiniz

Neden avlanma Altyapısı Özel Mimarlıka İhtiyacı Var

Bir web sitesine vuran tek hazır bir senaryo küçük görevler için iyi çalışır. Ancak milyonlarca sayfayı her gün onlarca hedef boyunca dökmeniz gerektiğinde, bu senaryo bir şişenck haline gelir. Scaling scraping altyapısı Lineer senaryolardan dağıtılması, hataları kusursuz bir şekilde idare eden kuyruk tabanlı mimarilerden hareket etmek, proxy rotasyonunu yönetmek ve en üst düzeye çıkarmak gerektirir.

Bu kılavuz, mimari kalıpları, kuyruk sistemleri, yatay ölçeklendirme stratejileri ve proxy yönetimi tekniklerini ölçeklendirmeyi kapsar.

Bu makale, konseptlerimizi bizimkinden inşa ediyor Web'e komple rehber Proxies. havuz proxy büyüklüğü için, bakınız Kaç Proxies Kaçmak İçin İhtiyacınız Var?

Scalable için Mimarlık Desenleri

Şekil 1: Queue-Based Lavabo

Ölçeklenebilir kazının temeli bir şeydir Mesaj kuyruğu Bu de URL keşiflerini veri toplamasından ayırır. İşçiler kuyruktan görevleri alır, sayfaları proxy aracılığıyla getirir ve depolamak için sonuçları zorlar.

# Architecture overview:
#
# URL Source → [Message Queue] → Worker 1 → [Results Store]
#                               → Worker 2 →
#                               → Worker N →
#                               ↓
#                        [Dead Letter Queue]
#                        (failed requests)

Bu desenin Faydaları:

  • Yatay ölçeklendirme: Sistem değiştirmeden veya ortadan kaldırmadan işçiler sistemi
  • Hata toleransı: Başarısız görevler yeniden deneme için kuyruka geri döndü
  • Hız kontrolü: Uyum işçisi genel olarak kontrol etmeyi kabul eder
  • Viability: Queue derinlik arkalog gösteriyor; tamamlanma oranı sağlık gösteriyor

Python Redis Queue ile Uygulama

import redis
import requests
import json
import time
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor
PROXY = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
REDIS_URL = "redis://localhost:6379/0"
@dataclass
class ScrapeTask:
    url: str
    target: str
    priority: int = 0
    retries: int = 0
    max_retries: int = 3
class ScrapingQueue:
    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url)
        self.queue_key = "scrape:tasks"
        self.results_key = "scrape:results"
        self.dlq_key = "scrape:dead_letter"
    def enqueue(self, task: ScrapeTask):
        self.redis.lpush(self.queue_key, json.dumps(task.__dict__))
    def dequeue(self, timeout: int = 5) -> ScrapeTask | None:
        result = self.redis.brpop(self.queue_key, timeout=timeout)
        if result:
            data = json.loads(result[1])
            return ScrapeTask(**data)
        return None
    def store_result(self, url: str, data: dict):
        self.redis.hset(self.results_key, url, json.dumps(data))
    def send_to_dlq(self, task: ScrapeTask, error: str):
        task_data = task.__dict__
        task_data["error"] = error
        self.redis.lpush(self.dlq_key, json.dumps(task_data))
    @property
    def pending_count(self) -> int:
        return self.redis.llen(self.queue_key)
class ScrapingWorker:
    def __init__(self, queue: ScrapingQueue, worker_id: int):
        self.queue = queue
        self.worker_id = worker_id
        self.session = requests.Session()
        self.session.proxies = {"http": PROXY, "https": PROXY}
    def process_task(self, task: ScrapeTask) -> bool:
        try:
            resp = self.session.get(task.url, timeout=30)
            if resp.status_code == 200:
                self.queue.store_result(task.url, {
                    "status": 200,
                    "body": resp.text[:10000],  # Truncate for storage
                    "target": task.target,
                })
                return True
            elif resp.status_code in [429, 503]:
                # Retry with backoff
                if task.retries < task.max_retries:
                    task.retries += 1
                    time.sleep(2 ** task.retries)
                    self.queue.enqueue(task)
                else:
                    self.queue.send_to_dlq(task, f"HTTP {resp.status_code}")
            else:
                self.queue.send_to_dlq(task, f"HTTP {resp.status_code}")
        except Exception as e:
            if task.retries < task.max_retries:
                task.retries += 1
                self.queue.enqueue(task)
            else:
                self.queue.send_to_dlq(task, str(e))
        return False
    def run(self):
        print(f"Worker {self.worker_id} started")
        while True:
            task = self.queue.dequeue(timeout=5)
            if task:
                self.process_task(task)
# Launch multiple workers
queue = ScrapingQueue(REDIS_URL)
# Enqueue tasks
for i in range(10000):
    queue.enqueue(ScrapeTask(
        url=f"https://example.com/product/{i}",
        target="example.com"
    ))
# Start 10 workers
with ThreadPoolExecutor(max_workers=10) as executor:
    workers = [ScrapingWorker(queue, i) for i in range(10)]
    for worker in workers:
        executor.submit(worker.run)

Node.js Implementation with Bull Queue

const Queue = require('bull');
const HttpsProxyAgent = require('https-proxy-agent');
const fetch = require('node-fetch');
const agent = new HttpsProxyAgent('http://USERNAME:PASSWORD@gate.proxyhat.com:8080');
// Create queue with Redis backend
const scrapeQueue = new Queue('scraping', 'redis://localhost:6379');
// Define the worker processor
scrapeQueue.process(10, async (job) => {  // 10 concurrent workers
  const { url, target } = job.data;
  try {
    const res = await fetch(url, { agent, timeout: 30000 });
    if (res.ok) {
      const body = await res.text();
      return { url, status: res.status, body: body.slice(0, 10000) };
    }
    if (res.status === 429 || res.status === 503) {
      throw new Error(`Rate limited: HTTP ${res.status}`);
    }
    return { url, status: res.status, body: null };
  } catch (err) {
    throw err; // Bull will retry based on job options
  }
});
// Enqueue tasks with retry options
async function enqueueTasks(urls, target) {
  for (const url of urls) {
    await scrapeQueue.add(
      { url, target },
      {
        attempts: 3,
        backoff: { type: 'exponential', delay: 2000 },
        removeOnComplete: 1000,
        removeOnFail: false,
      }
    );
  }
}
// Monitor progress
scrapeQueue.on('completed', (job, result) => {
  console.log(`Done: ${result.url} (${result.status})`);
});
scrapeQueue.on('failed', (job, err) => {
  console.error(`Failed: ${job.data.url} - ${err.message}`);
});

Şekil 2: Boru Mimarisi

Karmaşık kazı akışları için, bir akış kullanın Boru hattı boru hattı boru hattı boru hattı Her aşamanın farklı bir endişesi nerede idare eder:

# Pipeline stages:
#
# Stage 1: URL Discovery    → finds pages to scrape
# Stage 2: Content Fetching → downloads pages via proxies
# Stage 3: Data Extraction  → parses HTML, extracts data
# Stage 4: Data Validation  → checks quality, deduplicates
# Stage 5: Storage          → saves to database/warehouse

Go Uygulama

package main
import (
    "encoding/json"
    "fmt"
    "io"
    "net/http"
    "net/url"
    "sync"
    "time"
)
type ScrapeResult struct {
    URL    string `json:"url"`
    Status int    `json:"status"`
    Body   string `json:"body"`
}
func fetcher(urls <-chan string, results chan<- ScrapeResult, wg *sync.WaitGroup) {
    defer wg.Done()
    proxyURL, _ := url.Parse("http://USERNAME:PASSWORD@gate.proxyhat.com:8080")
    client := &http.Client{
        Transport: &http.Transport{Proxy: http.ProxyURL(proxyURL)},
        Timeout:   30 * time.Second,
    }
    for u := range urls {
        resp, err := client.Get(u)
        if err != nil {
            results <- ScrapeResult{URL: u, Status: 0}
            continue
        }
        body, _ := io.ReadAll(resp.Body)
        resp.Body.Close()
        results <- ScrapeResult{URL: u, Status: resp.StatusCode, Body: string(body)}
    }
}
func processor(results <-chan ScrapeResult, done chan<- bool) {
    for result := range results {
        if result.Status == 200 {
            // Extract and store data
            data, _ := json.Marshal(result)
            fmt.Printf("Processed: %s (%d bytes)\n", result.URL, len(data))
        }
    }
    done <- true
}
func main() {
    urls := make(chan string, 1000)
    results := make(chan ScrapeResult, 1000)
    done := make(chan bool)
    // Start 20 fetcher workers
    var wg sync.WaitGroup
    for i := 0; i < 20; i++ {
        wg.Add(1)
        go fetcher(urls, results, &wg)
    }
    // Start processor
    go processor(results, done)
    // Feed URLs
    go func() {
        for i := 0; i < 10000; i++ {
            urls <- fmt.Sprintf("https://example.com/product/%d", i)
        }
        close(urls)
    }()
    // Wait for all fetchers, then close results
    wg.Wait()
    close(results)
    <-done
}

Yatay Scaling Strategies

Strateji 1: Multi-Makine Deployment

Birden çok makinedeki işçiler. kuyruk koordinasyon noktası olarak hareket eder:

Strateji 1: Multi-Makine Deployment
BitirmeDeploymentScaling
Queue (Redis/LordtMQ)Özel sunucu veya yönetilen hizmetDikey (daha fazla RAM)
İşçilerBirden çok makine veya konteynerYatay (add cases)
Sonuçlar depolamaVeritabanı veya nesne depolamaDikey +
İzleme İzleme İzleme İzleme İzlemeCentralized dashboardTek örnek

Strateji 2: Konteyner Tabanlı Scaling

elastik ölçeklendirme için Docker ve Kubernet kullanın. Her işçi tekrarlanabilir bir konteynerde çalışır:

# docker-compose.yml for scraping workers
version: '3.8'
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
  scraper-worker:
    build: .
    environment:
      - PROXY_URL=http://USERNAME:PASSWORD@gate.proxyhat.com:8080
      - REDIS_URL=redis://redis:6379/0
      - WORKER_CONCURRENCY=10
    deploy:
      replicas: 5        # 5 containers × 10 concurrency = 50 parallel requests
      resources:
        limits:
          memory: 512M
          cpus: '0.5'
    depends_on:
      - redis

Proxy Management at Scale

Ölçekte, proxy yönetimi kritik bir sistem bileşeni haline gelir. Anahtar düşünceler:

Bağlantı Havuz

İstek başına yeni olanlar yaratmak yerine proxy ağ geçidine bağlantıları kullanın. Bu latency ve bağlantı yükünü azaltır:

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_optimized_session() -> requests.Session:
    """Create a session with connection pooling and retry logic."""
    session = requests.Session()
    # Connection pool: keep 20 connections, max 50
    adapter = HTTPAdapter(
        pool_connections=20,
        pool_maxsize=50,
        max_retries=Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[500, 502, 503],
        )
    )
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    session.proxies = {
        "http": "http://USERNAME:PASSWORD@gate.proxyhat.com:8080",
        "https": "http://USERNAME:PASSWORD@gate.proxyhat.com:8080",
    }
    return session
# Reuse across many requests
session = create_optimized_session()
for url in urls:
    resp = session.get(url, timeout=30)

Sağlık Takip Et

Sorunları erken tespit etmek için performans proxy'nizi gerçek zamanlı izleyin:

import time
from collections import defaultdict
from dataclasses import dataclass, field
@dataclass
class ProxyMetrics:
    """Track proxy health metrics for monitoring."""
    requests_total: int = 0
    requests_success: int = 0
    requests_blocked: int = 0
    requests_timeout: int = 0
    latency_samples: list = field(default_factory=list)
    status_codes: dict = field(default_factory=lambda: defaultdict(int))
    def record_request(self, status_code: int, latency_ms: float):
        self.requests_total += 1
        self.status_codes[status_code] += 1
        self.latency_samples.append(latency_ms)
        if status_code == 200:
            self.requests_success += 1
        elif status_code in [403, 429]:
            self.requests_blocked += 1
        # Keep only last 1000 samples
        if len(self.latency_samples) > 1000:
            self.latency_samples = self.latency_samples[-1000:]
    @property
    def success_rate(self) -> float:
        return self.requests_success / self.requests_total if self.requests_total else 0
    @property
    def avg_latency(self) -> float:
        return sum(self.latency_samples) / len(self.latency_samples) if self.latency_samples else 0
    def report(self) -> str:
        return (
            f"Total: {self.requests_total}, "
            f"Success: {self.success_rate:.1%}, "
            f"Blocked: {self.requests_blocked}, "
            f"Avg Latency: {self.avg_latency:.0f}ms"
        )

Data Storage at Scale

Data Storage at Scale
Depolama TipiEn iyisi içinScale Scale
PostgreSQLYapılı ürün/pricing dataMilyonlarca satır
MongoDBYarı yapılandırılmış / değişkenli şemalarMilyarlarca belge
S3/Object depolamaRaw HTML arşivleriPetabays
ElasticsearchTamamlanan veriler üzerinde tam metin aramaMilyarlarca belge
ClickHouseBüyük veri kümeleri üzerinde analitikÇizgilerin Trillions of rows

Scaling Checklist

  • Çift URL keşif getirmekten. aşamalar arasında bir mesaj kuyruk kullanın.
  • Doğru yeniden deneme mantığı uygulayın. Kalıcı başarısızlıklar için ölü mektup kuyrukları ile Exponential backoff.
  • Her şeyi izleyin. Queue derinlik, başarı oranları, geçncy, hedef alan başına hata oranları.
  • Bağlantı havuzu kullanın. İstek başına yeni olanları yaratmak yerine yeniden kullanım bağlantıları.
  • Başarısızlık için plan. İşçiler kazasında, proxy bloke edilir, hedefler yapısını değiştirir. Her katmana dayanıklılık inşa edin.
  • Başlamadan önce ölçek test edin. 100 RPM'de çalışan bir sistem hafıza, bağlantı sınırları veya kuyruk şişeleri nedeniyle 10.000 RPM'de başarısız olabilir.

Ölçekleme mimarisinizi tamamlayan proxy stratejileri için, okuyun Proxy Rotation Strategies for Large-Scale Haping. Boyut olarak oran limitlerini işlemek için, bakınız Takma Sınırları Açıkladı.

Kullanın Python SDK, Node SDKYa da Go SDK Üretim entegrasyonu proxy için ve keşfedin ProxyHat planları Yüksek hacimli kazı için.

Sık Sorulan Sorular

Ölçekte kazı yapmak için hangi kuyruk sistemi en iyisidir?

Redis with Bull (Node.js) veya RQ (Python) günde milyonlarca görev için iyi çalışıyor. Daha büyük ölçek için, Apache Kafka veya TavşanMQ daha iyi dayanıklılık ve yönlendirme sağlar. Mevcut altyapınıza ve takım uzmanlığınıza dayanarak seçin.

Kaç eş zamanlı işçi nasıl koşmalıyım?

Proxy kapasitenize ve hedef site toleransına dayanan 10-20 işçi ve ölçek ile başlayın. Başarı oranlarını izleyin -% 90'ın altına düşerse, daha fazla işçi eklemeden önce karşılıklılığı azaltır. ProxyHat aracılığıyla her işçi otomatik IP rotasyonu alır.

İşçiler için birsen veya iplik kullanmalı mıyım?

I/O-bound scraping (en çok vaka), async (Python asyncio, Node.js) iplikten daha iyi kaynak verimliliği sağlar. Sadece CPU-heavy parsing'e ihtiyacınız olduğunda iplik veya çokiş kullanın. Go goroutines her iki modelde de öne çıkıyor.

Hedef site yapısını nasıl değiştiririm?

Boru hattınızda doğrulama verileri geçerlidir. Veriler geçerli değildir (bölgeleri, yanlış türleri), ekibinizi ve kuyruklarınızı güncel .s ile yeniden işleme için etkilenen URL'lerinizi uyarır. .lerinizi yazın, böylece gerekirse geri dönebilirsiniz.

Başlamaya hazır mısınız?

148+ ülkede 50M+ konut IP'sine AI destekli filtreleme ile erişin.

Fiyatlandırmayı GörüntüleKonut Proxy'leri
← Bloga Dön