توسيع طلبات البروكسي مع التحكم في التزامن

(ج) أنماط التطابق الرئيسية للخردة القائمة على التكسير: الرئوي، ومجمعات الوعود، ومجمعات العمال، والحد من المعدلات، والقمع. رمز الإنتاج في (بايتون) و(نودج) و(جو).

توسيع طلبات البروكسي مع التحكم في التزامن

لماذا مسائل مراقبة التكافل

إرسال الطلبات بالتسلسل من خلال proxy نفايات الضمادات والوقت إرسالهم جميعاً في وقت واحد فوق البوابة العميلة والخادم المستهدف ونظامك الخاص إن مراقبة العملة تضفي على التوازن - وهي تضاعف من الناتج إلى أقصى حد مع البقاء في حدود مجمعكم العميل، وتسامح الموقع المستهدف، والموارد المتاحة.

This guide covers production-grade concurrency patterns in three languages: Python (asyncio), Node.js (Promise pools), and Go (goroutines with semaphores). كل مثال العميلة (بروكسي ها) تقوم بالتناوب وجاهز للنسخ في مشاريعك

هدف التحكم في الاتّفاق بسيط: تعظيم الطلبات للثانية دون تحريك الكتل، الذاكرة المستنفذة، أو تحطّم عمليّتك. النمط الصحيح يعتمد على لغتك وموقعك المستهدف

أنماط العملة المقارنة

أنماط العملة المقارنة
Patternاللغةالأفضل(ماكس)
أسنسيو SemaphorePythonI/O-bound scraping50-200 لكل عملية
مجمّع عامل (اجتماع)Pythonاستفسارات المهام مع التخلف10100 عامل
وعد.Node.jsمتزامن بسيط50-500 لكل عملية
P-limit / p-queueNode.jsتطابق جيد10-200 على التوالي
Goroutines + Semaphoreإذهبخلاصات عالية100-1000+
مجمع العمال (قنوات الذهاب)إذهبالتوزيع الهيكلي للمهام10-500 عامل

Python: asyncio Semaphore

(أبسط وأفعل نمط للتوافق في (بايتون A semaphore limits how many coroutines can execute concurrently, preventing resource ple.

import asyncio
import aiohttp
import uuid
import time
PROXY_GATEWAY = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
MAX_CONCURRENCY = 50
TIMEOUT = aiohttp.ClientTimeout(total=30)
async def fetch(session: aiohttp.ClientSession, url: str, semaphore: asyncio.Semaphore) -> dict:
    async with semaphore:
        session_id = uuid.uuid4().hex[:8]
        proxy = f"http://USERNAME-session-{session_id}:PASSWORD@gate.proxyhat.com:8080"
        start = time.time()
        try:
            async with session.get(url, proxy=proxy, timeout=TIMEOUT) as response:
                body = await response.text()
                return {
                    "url": url,
                    "status": response.status,
                    "length": len(body),
                    "latency": round(time.time() - start, 3),
                }
        except Exception as e:
            return {"url": url, "error": str(e), "latency": round(time.time() - start, 3)}
async def scrape_all(urls: list[str]) -> list[dict]:
    semaphore = asyncio.Semaphore(MAX_CONCURRENCY)
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks)
    return results
# Usage
urls = [f"https://example.com/product/{i}" for i in range(1000)]
results = asyncio.run(scrape_all(urls))
success = sum(1 for r in results if "error" not in r)
print(f"Completed: {success}/{len(results)} successful")
print(f"Avg latency: {sum(r['latency'] for r in results) / len(results):.3f}s")

Python: Worker Pool with Backpressure

عندما تحتاج إلى المزيد من التحكم - معدل الحد من التخلف، أو الجدولة ذات الأولوية - استخدام بركة العمال مع الأسينثيو. (كويو)

import asyncio
import aiohttp
import uuid
class WorkerPool:
    """Fixed-size worker pool with backpressure via bounded queue."""
    def __init__(self, num_workers: int = 20, queue_size: int = 100):
        self.num_workers = num_workers
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=queue_size)
        self.results: list = []
        self.stats = {"success": 0, "failed": 0, "total_latency": 0.0}
        self._stop = False
    async def worker(self, session: aiohttp.ClientSession, worker_id: int):
        while not self._stop:
            try:
                url = await asyncio.wait_for(self.queue.get(), timeout=5.0)
            except asyncio.TimeoutError:
                if self._stop:
                    break
                continue
            session_id = uuid.uuid4().hex[:8]
            proxy = f"http://USERNAME-session-{session_id}:PASSWORD@gate.proxyhat.com:8080"
            import time
            start = time.time()
            try:
                async with session.get(
                    url, proxy=proxy,
                    timeout=aiohttp.ClientTimeout(total=30)
                ) as response:
                    body = await response.text()
                    latency = time.time() - start
                    self.stats["success"] += 1
                    self.stats["total_latency"] += latency
                    self.results.append({
                        "url": url, "status": response.status,
                        "length": len(body), "worker": worker_id,
                    })
            except Exception as e:
                self.stats["failed"] += 1
                self.results.append({"url": url, "error": str(e), "worker": worker_id})
            finally:
                self.queue.task_done()
    async def run(self, urls: list[str]) -> list[dict]:
        async with aiohttp.ClientSession() as session:
            # Start workers
            workers = [
                asyncio.create_task(self.worker(session, i))
                for i in range(self.num_workers)
            ]
            # Feed URLs into the queue (backpressure: blocks when queue is full)
            for url in urls:
                await self.queue.put(url)
            # Wait for all tasks to complete
            await self.queue.join()
            self._stop = True
            # Cancel workers
            for w in workers:
                w.cancel()
        return self.results
# Usage
pool = WorkerPool(num_workers=30, queue_size=50)
urls = [f"https://example.com/item/{i}" for i in range(500)]
results = asyncio.run(pool.run(urls))
print(f"Success: {pool.stats['success']}, Failed: {pool.stats['failed']}")
avg_lat = pool.stats["total_latency"] / max(pool.stats["success"], 1)
print(f"Avg latency: {avg_lat:.3f}s")

بيت ليميتر

وتفرض بعض الأهداف حدودا صارمة للمعدلات. ويندمج هذا الحد الأقصى لمعدلات البطن المكسور مع أنماط التطابق أعلاه.

import asyncio
import time
class RateLimiter:
    """Token-bucket rate limiter for async operations."""
    def __init__(self, rate: float, burst: int = 1):
        """
        Args:
            rate: Requests per second
            burst: Maximum burst size
        """
        self.rate = rate
        self.burst = burst
        self.tokens = burst
        self.last_refill = time.monotonic()
        self._lock = asyncio.Lock()
    async def acquire(self):
        async with self._lock:
            now = time.monotonic()
            elapsed = now - self.last_refill
            self.tokens = min(self.burst, self.tokens + elapsed * self.rate)
            self.last_refill = now
            if self.tokens < 1:
                wait_time = (1 - self.tokens) / self.rate
                await asyncio.sleep(wait_time)
                self.tokens = 0
            else:
                self.tokens -= 1
# Combined with semaphore
async def fetch_rate_limited(session, url, semaphore, limiter):
    async with semaphore:
        await limiter.acquire()
        session_id = uuid.uuid4().hex[:8]
        proxy = f"http://USERNAME-session-{session_id}:PASSWORD@gate.proxyhat.com:8080"
        async with session.get(url, proxy=proxy, timeout=TIMEOUT) as resp:
            return await resp.text()
# 10 requests/second, max 30 concurrent
async def main():
    semaphore = asyncio.Semaphore(30)
    limiter = RateLimiter(rate=10.0, burst=5)
    urls = [f"https://example.com/page/{i}" for i in range(200)]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_rate_limited(session, u, semaphore, limiter) for u in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
    success = sum(1 for r in results if not isinstance(r, Exception))
    print(f"Done: {success}/{len(results)}")
asyncio.run(main())

عِدْ باتشينغ

The simplest Node.js concurrency pattern processes URLs in fixed-size batches.

const { HttpsProxyAgent } = require('https-proxy-agent');
const crypto = require('crypto');
const BATCH_SIZE = 20;
function createAgent() {
  const sessionId = crypto.randomBytes(4).toString('hex');
  return new HttpsProxyAgent(
    `http://USERNAME-session-${sessionId}:PASSWORD@gate.proxyhat.com:8080`
  );
}
async function fetchUrl(url) {
  const agent = createAgent();
  const start = Date.now();
  try {
    const response = await fetch(url, {
      agent,
      signal: AbortSignal.timeout(30000),
    });
    const text = await response.text();
    return {
      url,
      status: response.status,
      length: text.length,
      latency: Date.now() - start,
    };
  } catch (err) {
    return { url, error: err.message, latency: Date.now() - start };
  }
}
async function scrapeInBatches(urls) {
  const results = [];
  for (let i = 0; i < urls.length; i += BATCH_SIZE) {
    const batch = urls.slice(i, i + BATCH_SIZE);
    const batchResults = await Promise.all(batch.map(fetchUrl));
    results.push(...batchResults);
    const success = batchResults.filter(r => !r.error).length;
    console.log(`Batch ${Math.floor(i / BATCH_SIZE) + 1}: ${success}/${batch.length} OK`);
  }
  return results;
}
// Usage
const urls = Array.from({ length: 200 }, (_, i) =>
  `https://example.com/product/${i + 1}`
);
scrapeInBatches(urls).then(results => {
  const success = results.filter(r => !r.error).length;
  console.log(`Total: ${success}/${results.length} successful`);
});

Node.js: p-limit for fine-Grained Control

لفرض حدود دقيقة للاتفاق بدون دفع يدوي، استخدام p-limit المكتبة

// npm install p-limit
const pLimit = require('p-limit');
const { HttpsProxyAgent } = require('https-proxy-agent');
const crypto = require('crypto');
const limit = pLimit(30); // Max 30 concurrent requests
function createAgent() {
  const sessionId = crypto.randomBytes(4).toString('hex');
  return new HttpsProxyAgent(
    `http://USERNAME-session-${sessionId}:PASSWORD@gate.proxyhat.com:8080`
  );
}
async function fetchWithLimit(url) {
  return limit(async () => {
    const agent = createAgent();
    const response = await fetch(url, {
      agent,
      signal: AbortSignal.timeout(30000),
    });
    return {
      url,
      status: response.status,
      body: await response.text(),
    };
  });
}
// All 500 URLs start immediately, but only 30 run concurrently
const urls = Array.from({ length: 500 }, (_, i) =>
  `https://example.com/item/${i + 1}`
);
Promise.all(urls.map(fetchWithLimit)).then(results => {
  const success = results.filter(r => r.status === 200).length;
  console.log(`Success: ${success}/${results.length}`);
});

Node.js: Worker Queue with Backpressure

// npm install p-queue
const PQueue = require('p-queue').default;
const { HttpsProxyAgent } = require('https-proxy-agent');
const crypto = require('crypto');
const queue = new PQueue({
  concurrency: 25,
  intervalCap: 10,   // Max 10 requests...
  interval: 1000,    // ...per second (rate limiting)
});
queue.on('active', () => {
  console.log(`Active: ${queue.pending} pending, ${queue.size} queued`);
});
function createAgent() {
  const sessionId = crypto.randomBytes(4).toString('hex');
  return new HttpsProxyAgent(
    `http://USERNAME-session-${sessionId}:PASSWORD@gate.proxyhat.com:8080`
  );
}
async function processUrl(url) {
  const agent = createAgent();
  const response = await fetch(url, { agent, signal: AbortSignal.timeout(30000) });
  return { url, status: response.status, body: await response.text() };
}
// Add URLs to the queue
const urls = Array.from({ length: 1000 }, (_, i) =>
  `https://example.com/page/${i + 1}`
);
const results = await Promise.all(
  urls.map(url => queue.add(() => processUrl(url)))
);
console.log(`Completed: ${results.filter(r => r.status === 200).length}/${results.length}`);

Go: Goroutines with Semaphore

"الغوروتينات" خفيفة الوزن، لكنّك ما زلت بحاجة إلى الحدّ من الاتّفاق لتجنب الاتّصالات المحترفة الساحقة. والسيلمافوري القائم على القنوات هو النهج الإيديولوجي.

package main
import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"sync"
	"time"
)
const maxConcurrency = 50
type Result struct {
	URL     string
	Status  int
	Length  int
	Latency time.Duration
	Error   error
}
func newProxyClient() *http.Client {
	b := make([]byte, 4)
	rand.Read(b)
	sessionID := hex.EncodeToString(b)
	proxyStr := fmt.Sprintf("http://USERNAME-session-%s:PASSWORD@gate.proxyhat.com:8080", sessionID)
	proxyURL, _ := url.Parse(proxyStr)
	return &http.Client{
		Transport: &http.Transport{Proxy: http.ProxyURL(proxyURL)},
		Timeout:   30 * time.Second,
	}
}
func fetchURL(target string, sem chan struct{}, wg *sync.WaitGroup, results chan<- Result) {
	defer wg.Done()
	sem <- struct{}{}        // Acquire semaphore
	defer func() { <-sem }() // Release semaphore
	client := newProxyClient()
	start := time.Now()
	resp, err := client.Get(target)
	if err != nil {
		results <- Result{URL: target, Error: err, Latency: time.Since(start)}
		return
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	results <- Result{
		URL:     target,
		Status:  resp.StatusCode,
		Length:  len(body),
		Latency: time.Since(start),
	}
}
func main() {
	urls := make([]string, 500)
	for i := range urls {
		urls[i] = fmt.Sprintf("https://example.com/item/%d", i+1)
	}
	sem := make(chan struct{}, maxConcurrency)
	results := make(chan Result, len(urls))
	var wg sync.WaitGroup
	start := time.Now()
	for _, u := range urls {
		wg.Add(1)
		go fetchURL(u, sem, &wg, results)
	}
	// Close results channel when all goroutines finish
	go func() {
		wg.Wait()
		close(results)
	}()
	var success, failed int
	var totalLatency time.Duration
	for r := range results {
		if r.Error != nil {
			failed++
		} else {
			success++
			totalLatency += r.Latency
		}
	}
	elapsed := time.Since(start)
	fmt.Printf("Completed in %s\n", elapsed)
	fmt.Printf("Success: %d, Failed: %d\n", success, failed)
	fmt.Printf("Avg latency: %s\n", totalLatency/time.Duration(max(success, 1)))
	fmt.Printf("Throughput: %.1f req/s\n", float64(success+failed)/elapsed.Seconds())
}

Go: Worker Pool with Channels

وللتجهيز الأكثر تنظيما، تستخدم مجموعة ثابتة من العمال الذين يستهلكون من قناة.

package main
import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"sync"
	"time"
)
type Job struct {
	URL string
}
type JobResult struct {
	URL     string
	Status  int
	Body    string
	Latency time.Duration
	Err     error
}
func worker(id int, jobs <-chan Job, results chan<- JobResult, wg *sync.WaitGroup) {
	defer wg.Done()
	for job := range jobs {
		b := make([]byte, 4)
		rand.Read(b)
		sessionID := hex.EncodeToString(b)
		proxyStr := fmt.Sprintf("http://USERNAME-session-%s:PASSWORD@gate.proxyhat.com:8080", sessionID)
		proxyURL, _ := url.Parse(proxyStr)
		client := &http.Client{
			Transport: &http.Transport{Proxy: http.ProxyURL(proxyURL)},
			Timeout:   30 * time.Second,
		}
		start := time.Now()
		resp, err := client.Get(job.URL)
		latency := time.Since(start)
		if err != nil {
			results <- JobResult{URL: job.URL, Err: err, Latency: latency}
			continue
		}
		body, _ := io.ReadAll(resp.Body)
		resp.Body.Close()
		results <- JobResult{
			URL:     job.URL,
			Status:  resp.StatusCode,
			Body:    string(body),
			Latency: latency,
		}
	}
}
func main() {
	numWorkers := 30
	urls := make([]string, 300)
	for i := range urls {
		urls[i] = fmt.Sprintf("https://example.com/page/%d", i+1)
	}
	jobs := make(chan Job, len(urls))
	results := make(chan JobResult, len(urls))
	var wg sync.WaitGroup
	// Start workers
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go worker(i, jobs, results, &wg)
	}
	// Send jobs
	for _, u := range urls {
		jobs <- Job{URL: u}
	}
	close(jobs)
	// Collect results
	go func() {
		wg.Wait()
		close(results)
	}()
	var success, failed int
	for r := range results {
		if r.Err != nil {
			failed++
		} else {
			success++
		}
	}
	fmt.Printf("Success: %d, Failed: %d\n", success, failed)
}

اختيار مستوى الضمانة الصحيحة

ويتوقف التطابق الأمثل على عدة عوامل. وهنا دليل عملي لنقاط البدء:

اختيار مستوى الضمانة الصحيحة
النوع المستهدفالعملة الموصى بهاالسبب
معاملات الوزن الخفيف50-200الاستجابات السريعة، انخفاض الذاكرة لكل طلب
الصفحات الشبكية الموحدة20-50أحجام الاستجابة الحديثة، بعض المعدل الذي يحد من
Heavy JS-render pages5-15سياقات الحشد تستخدم ذكريات كبيرة
المواقع المعتدية المضادة للدبابات5-10الحاجة إلى توقيت واقعي بين الطلبات
تحميلات كبيرة من الملفات5-20Bandwidth-bound, not CPU
البدء بعشرة طلبات متزامنة وزيادة تدريجية مع رصد معدلات النجاح. إذا انخفض معدل نجاحك إلى أقل من 90 في المائة، خفض التناسق أو إضافة التأخيرات بين الطلبات. من أجل المزيد عن تعقب هذه القياسات Monitoring Proxy Performance دليل

من أجل ردة فعل متبادلة قابلة لإعادة الاستخدام بناء شركة بروسي ميدلوار- يستعاض عن عبارة " هيكل الخردة في نهاية المطاف " بعبارة " تصميم هيكل ثابت موثوق به- استكشاف Python SDK.. Node SDKو Go SDK من أجل الدمج الجاهز للإنتاج، أو التحقق تسعير ProxyHat للبدء

الأسئلة المتكررة

¿Listo para empezar?

Accede a más de 50M de IPs residenciales en más de 148 países con filtrado impulsado por IA.

Ver preciosProxies residenciales
← Volver al Blog