لماذا تهتز البنية التحتية
السيناريو الوحيد الذي يضرب على موقع واحد يعمل بشكل جيد للمهام الصغيرة ولكن عندما تحتاج إلى كشط الملايين من الصفحات يوميا عبر العشرات من الأهداف، أن النص يصبح الاختناق. الهياكل الأساسية لقطع الخردة ويتطلب الانتقال من النصوص الخطية إلى البنيانات الموزعة القائمة على التساؤلات التي تعالج الإخفاقات بصورة رشيدة، وتدير التناوب المحترف، وتعظيم النواتج.
ويغطي هذا الدليل أنماط البنية، ونظم التساؤل، واستراتيجيات التقسيم الأفقي، وتقنيات الإدارة البديلة التي تخفض إنتاج الطاقة على نطاق واسع.
تستند هذه المادة إلى مفاهيم منا دليل كامل للدعاوى الإلكترونيةمن أجل تذوق حمام السباحة كم عدد المحترفين الذين تحتاجون للتشويش؟
أنماط الهندسة المعمارية
Pattern 1: Queue-Based Scraping
أساس الخردة المتصاعدة هو الرسالة: هذا الديكور يكتشف من جلب البيانات العمال يسحبون المهام من الطابور ويجلبون الصفحات من خلال العملاء ويدفعون النتائج إلى التخزين
# Architecture overview:
#
# URL Source → [Message Queue] → Worker 1 → [Results Store]
# → Worker 2 →
# → Worker N →
# ↓
# [Dead Letter Queue]
# (failed requests)فوائد هذا النمط:
- التوسع الأفقي: إضافة أو إزالة العمال دون تغيير النظام
- التسامح الافتراضي: وتعود المهام الفاشلة إلى طابور العودة
- مراقبة المعدل: عدل عدد العمال لمراقبة الناتج العام
- الرؤية: يُظهر عمق النواة تراكماً؛ ويُظهر معدل الإنجاز الصحة
Python Implementation with Redis Queue
import redis
import requests
import json
import time
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor
PROXY = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
REDIS_URL = "redis://localhost:6379/0"
@dataclass
class ScrapeTask:
url: str
target: str
priority: int = 0
retries: int = 0
max_retries: int = 3
class ScrapingQueue:
def __init__(self, redis_url: str):
self.redis = redis.from_url(redis_url)
self.queue_key = "scrape:tasks"
self.results_key = "scrape:results"
self.dlq_key = "scrape:dead_letter"
def enqueue(self, task: ScrapeTask):
self.redis.lpush(self.queue_key, json.dumps(task.__dict__))
def dequeue(self, timeout: int = 5) -> ScrapeTask | None:
result = self.redis.brpop(self.queue_key, timeout=timeout)
if result:
data = json.loads(result[1])
return ScrapeTask(**data)
return None
def store_result(self, url: str, data: dict):
self.redis.hset(self.results_key, url, json.dumps(data))
def send_to_dlq(self, task: ScrapeTask, error: str):
task_data = task.__dict__
task_data["error"] = error
self.redis.lpush(self.dlq_key, json.dumps(task_data))
@property
def pending_count(self) -> int:
return self.redis.llen(self.queue_key)
class ScrapingWorker:
def __init__(self, queue: ScrapingQueue, worker_id: int):
self.queue = queue
self.worker_id = worker_id
self.session = requests.Session()
self.session.proxies = {"http": PROXY, "https": PROXY}
def process_task(self, task: ScrapeTask) -> bool:
try:
resp = self.session.get(task.url, timeout=30)
if resp.status_code == 200:
self.queue.store_result(task.url, {
"status": 200,
"body": resp.text[:10000], # Truncate for storage
"target": task.target,
})
return True
elif resp.status_code in [429, 503]:
# Retry with backoff
if task.retries < task.max_retries:
task.retries += 1
time.sleep(2 ** task.retries)
self.queue.enqueue(task)
else:
self.queue.send_to_dlq(task, f"HTTP {resp.status_code}")
else:
self.queue.send_to_dlq(task, f"HTTP {resp.status_code}")
except Exception as e:
if task.retries < task.max_retries:
task.retries += 1
self.queue.enqueue(task)
else:
self.queue.send_to_dlq(task, str(e))
return False
def run(self):
print(f"Worker {self.worker_id} started")
while True:
task = self.queue.dequeue(timeout=5)
if task:
self.process_task(task)
# Launch multiple workers
queue = ScrapingQueue(REDIS_URL)
# Enqueue tasks
for i in range(10000):
queue.enqueue(ScrapeTask(
url=f"https://example.com/product/{i}",
target="example.com"
))
# Start 10 workers
with ThreadPoolExecutor(max_workers=10) as executor:
workers = [ScrapingWorker(queue, i) for i in range(10)]
for worker in workers:
executor.submit(worker.run)Node.js Implementation with Bull Queue
const Queue = require('bull');
const HttpsProxyAgent = require('https-proxy-agent');
const fetch = require('node-fetch');
const agent = new HttpsProxyAgent('http://USERNAME:PASSWORD@gate.proxyhat.com:8080');
// Create queue with Redis backend
const scrapeQueue = new Queue('scraping', 'redis://localhost:6379');
// Define the worker processor
scrapeQueue.process(10, async (job) => { // 10 concurrent workers
const { url, target } = job.data;
try {
const res = await fetch(url, { agent, timeout: 30000 });
if (res.ok) {
const body = await res.text();
return { url, status: res.status, body: body.slice(0, 10000) };
}
if (res.status === 429 || res.status === 503) {
throw new Error(`Rate limited: HTTP ${res.status}`);
}
return { url, status: res.status, body: null };
} catch (err) {
throw err; // Bull will retry based on job options
}
});
// Enqueue tasks with retry options
async function enqueueTasks(urls, target) {
for (const url of urls) {
await scrapeQueue.add(
{ url, target },
{
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
removeOnComplete: 1000,
removeOnFail: false,
}
);
}
}
// Monitor progress
scrapeQueue.on('completed', (job, result) => {
console.log(`Done: ${result.url} (${result.status})`);
});
scrapeQueue.on('failed', (job, err) => {
console.error(`Failed: ${job.data.url} - ${err.message}`);
});Pattern 2: Pipeline Architecture
لتدفقات العمل المعقدة خط الأنابيب حيث تعالج كل مرحلة شاغلا مختلفا:
# Pipeline stages:
#
# Stage 1: URL Discovery → finds pages to scrape
# Stage 2: Content Fetching → downloads pages via proxies
# Stage 3: Data Extraction → parses HTML, extracts data
# Stage 4: Data Validation → checks quality, deduplicates
# Stage 5: Storage → saves to database/warehouseالتنفيذ
package main
import (
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"sync"
"time"
)
type ScrapeResult struct {
URL string `json:"url"`
Status int `json:"status"`
Body string `json:"body"`
}
func fetcher(urls <-chan string, results chan<- ScrapeResult, wg *sync.WaitGroup) {
defer wg.Done()
proxyURL, _ := url.Parse("http://USERNAME:PASSWORD@gate.proxyhat.com:8080")
client := &http.Client{
Transport: &http.Transport{Proxy: http.ProxyURL(proxyURL)},
Timeout: 30 * time.Second,
}
for u := range urls {
resp, err := client.Get(u)
if err != nil {
results <- ScrapeResult{URL: u, Status: 0}
continue
}
body, _ := io.ReadAll(resp.Body)
resp.Body.Close()
results <- ScrapeResult{URL: u, Status: resp.StatusCode, Body: string(body)}
}
}
func processor(results <-chan ScrapeResult, done chan<- bool) {
for result := range results {
if result.Status == 200 {
// Extract and store data
data, _ := json.Marshal(result)
fmt.Printf("Processed: %s (%d bytes)\n", result.URL, len(data))
}
}
done <- true
}
func main() {
urls := make(chan string, 1000)
results := make(chan ScrapeResult, 1000)
done := make(chan bool)
// Start 20 fetcher workers
var wg sync.WaitGroup
for i := 0; i < 20; i++ {
wg.Add(1)
go fetcher(urls, results, &wg)
}
// Start processor
go processor(results, done)
// Feed URLs
go func() {
for i := 0; i < 10000; i++ {
urls <- fmt.Sprintf("https://example.com/product/%d", i)
}
close(urls)
}()
// Wait for all fetchers, then close results
wg.Wait()
close(results)
<-done
}استراتيجيات التصعيد الأفقي
الاستراتيجية 1: النشر المتعدد الوسائط
يوزع العمال عبر آلات متعددة ويشكل هذا السؤال نقطة التنسيق:
| العنصر | النشر | Scaling |
|---|---|---|
| Queue (Redis/RabbitMQ) | خادوم مخصص أو خدمة مُدارة | كتابي (مزيد من RAM) |
| العمال | آلات أو حاويات متعددة | Horizontal (add instances) |
| تخزين النتائج | قاعدة البيانات أو مخزن الجسم | ترجمة: |
| الرصد | لوح مظلات مركزية | حالة واحدة |
الاستراتيجية 2: تصعيد الحاويات
استخدموا (دوكر) و(كوبرنيت) للارتقاء بمستوى كبير ويدير كل عامل في حاوية يمكن تكرارها:
# docker-compose.yml for scraping workers
version: '3.8'
services:
redis:
image: redis:7-alpine
ports:
- "6379:6379"
scraper-worker:
build: .
environment:
- PROXY_URL=http://USERNAME:PASSWORD@gate.proxyhat.com:8080
- REDIS_URL=redis://redis:6379/0
- WORKER_CONCURRENCY=10
deploy:
replicas: 5 # 5 containers × 10 concurrency = 50 parallel requests
resources:
limits:
memory: 512M
cpus: '0.5'
depends_on:
- redisProxy Management at Scale
وعلى نطاق واسع، تصبح الإدارة البديلة عنصرا حاسما في النظام. الاعتبارات الرئيسية:
Connection Pooling
إعادة استخدام الاتصالات إلى البوابة العميلة بدلاً من إنشاء خطوط جديدة لكل طلب. وهذا يقلل من الرطوبة والارتباطات العامة:
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_optimized_session() -> requests.Session:
"""Create a session with connection pooling and retry logic."""
session = requests.Session()
# Connection pool: keep 20 connections, max 50
adapter = HTTPAdapter(
pool_connections=20,
pool_maxsize=50,
max_retries=Retry(
total=3,
backoff_factor=1,
status_forcelist=[500, 502, 503],
)
)
session.mount("http://", adapter)
session.mount("https://", adapter)
session.proxies = {
"http": "http://USERNAME:PASSWORD@gate.proxyhat.com:8080",
"https": "http://USERNAME:PASSWORD@gate.proxyhat.com:8080",
}
return session
# Reuse across many requests
session = create_optimized_session()
for url in urls:
resp = session.get(url, timeout=30)الرصد الصحي
رصد أداءك المحترف في الوقت الحقيقي لاكتشاف القضايا في وقت مبكر:
import time
from collections import defaultdict
from dataclasses import dataclass, field
@dataclass
class ProxyMetrics:
"""Track proxy health metrics for monitoring."""
requests_total: int = 0
requests_success: int = 0
requests_blocked: int = 0
requests_timeout: int = 0
latency_samples: list = field(default_factory=list)
status_codes: dict = field(default_factory=lambda: defaultdict(int))
def record_request(self, status_code: int, latency_ms: float):
self.requests_total += 1
self.status_codes[status_code] += 1
self.latency_samples.append(latency_ms)
if status_code == 200:
self.requests_success += 1
elif status_code in [403, 429]:
self.requests_blocked += 1
# Keep only last 1000 samples
if len(self.latency_samples) > 1000:
self.latency_samples = self.latency_samples[-1000:]
@property
def success_rate(self) -> float:
return self.requests_success / self.requests_total if self.requests_total else 0
@property
def avg_latency(self) -> float:
return sum(self.latency_samples) / len(self.latency_samples) if self.latency_samples else 0
def report(self) -> str:
return (
f"Total: {self.requests_total}, "
f"Success: {self.success_rate:.1%}, "
f"Blocked: {self.requests_blocked}, "
f"Avg Latency: {self.avg_latency:.0f}ms"
)تخزين البيانات في سكال
| نوع التخزين | الأفضل | Scale |
|---|---|---|
| PostgreSQL | بيانات المنتجات/التجهيزات الهيكلية | ملايين الصفوف |
| MongoDB | الكيماويات شبه الهيكلية/الثابتة | بلايين الوثائق |
| S3/Object storage | Rw HTML archives | Petabytes |
| الفوضى | البحث الكامل عن البيانات المشطوبة | بلايين الوثائق |
| ClickHouse | تحليل البيانات الضخمة | تريليونات الصفوف |
القائمة المرجعية للتوسع
- الاكتشافات الدنيوية من الجلب استخدمي سائل بين المراحل
- تنفيذ منطق إعادة النظر السليم. التراجع العرضي عن الإخفاقات المستمرة
- راقب كل شيء العمق الكمي، معدلات النجاح، الرطوبة، معدلات الخطأ لكل مجال مستهدف.
- إستعمال التجمّع Reuse proxy connections instead of creating new ones per request.
- خطة للفشل العمال ينهارون ويغلقون العملاء ويغيرون هيكلهم بناء القدرة على الصمود في كل طبقة
- اختبار على المقياس قبل الإطلاق A system that works at 100 RPM may fail at 10,000 RPM due to memory, connection limits, or queue bottlenecks.
لاستراتيجيات التناوب المحترفة التي تكمّل هيكلك Proxy Rotation Strategies for Large-Scale Scraping-لتعامل مع الحد الأقصى للمعدلات الحد الأقصى.
استخدام Python SDK.. Node SDKأو Go SDK من أجل تحقيق التكامل المحترف في الإنتاج، واستكشاف خطط الوكيل من أجل الخردة الكبيرة
الأسئلة المتكررة
ما هو أفضل نظام للخردة على نطاق؟
ويعمل نظام " ريدز " (Node.js) أو " RQ (Python) على نحو جيد يصل إلى ملايين المهام في اليوم. وبالنسبة لحجم أكبر، فإن أباتشي كافكا أو رابيت ماك يوفران قدرا أفضل من القابلية للدوام ومنافذ. الاختيار على أساس البنية التحتية وخبرة الفريق
كم من العمال المتزامنين يجب أن أركض؟
إبدأ مع 10-20 عامل ومقياس بناءً على قدراتك المحترفة و تسامح الموقع رصد معدلات النجاح - إذا انخفضت إلى أقل من 90 في المائة، فإن ذلك يقلل من التناسق قبل إضافة المزيد من العمال. كل عامل من خلال (بروكسيهات) يحصل على تناوب أوتوماتيكي
هل أستخدم الأسينك أو خيط العمال؟
أما بالنسبة للخردة المتجهة إلى الشبكة الدولية (المعظمة)، فإن الأسينك (Python asyncio, Node.js) توفر كفاءة أفضل من كفاءة استخدام الموارد. استخدام الخياطة أو التجهيز المتعدد فقط عندما تحتاج إلى حزمة ثقيلة من وحدة منع الحمل إلى جانب الجلب. إذهبي إلى (غوروتينز) يفرّق في كلا النمطين
كيف أتعامل مع تغيرات هيكل الموقع؟
تنفيذ المصادقة على البيانات في خطك When parsed data fails validation (missing fields, wrong types), alert your team and queue affected URLs for re- processing with updated parsers. اكسروا حزمكم حتى تتمكنوا من العودة إذا لزم الأمر






