Proxy-Anfragen mit Nebenläufigkeitskontrolle skalieren

Master Concurrency-Muster für Proxy-basierte Abstreifung: Asyncio semaphores, Promise Pools, Go Worker Pools, Ratenbegrenzer und Gegendruck. Produktionscode in Python, Node.js und Go.

Proxy-Anfragen mit Nebenläufigkeitskontrolle skalieren

Warum Concurrency Control Matter für Proxy Scrap

Senden von Anfragen sequentiell durch eine Proxy verschwendet Bandbreite und Zeit. Sie alle auf einmal überfordert das Proxy-Gateway, den Zielserver und Ihr eigenes System. Die Konkurrenzkontrolle trifft auf die Waage – maximiert den Durchsatz während des Aufenthaltes in den Grenzen Ihres Proxypools, der Zielorttoleranz und der verfügbaren Ressourcen.

Dieser Leitfaden umfasst produktionsfähige Koncurrency-Muster in drei Sprachen: Python (asyncio), Node.js (Promise-Pools) und Go (Griechenland mit Semaphoren). Jedes Beispiel verwendet ProxyHats rotierende Wohn-Proxie und ist bereit, in Ihre Projekte zu kopieren.

Das Ziel der Koncurrency Control ist einfach: maximieren Sie Anfragen pro Sekunde, ohne Blöcke auszulösen, Speicher auszuschöpfen oder Ihren Prozess abzustürzen. Das richtige Muster hängt von der Sprache, der Zielseite und der Skala ab.

Konkurrenzmuster verglichen

Konkurrenzmuster verglichen
MusterSpracheDas Beste fürMax Concurrency
asyncio. SemaphorePythonI/O-gebundenes Abkratzen50-200 pro Prozess
Worker Pool (asyncio)PythonAufgabenqueues mit Gegendruck10-100 Arbeitnehmer
Versprechen.all + BatchNode.jsEinfaches paralleles Abholen50-500 pro Prozess
p-limit / p-queueNode.jsFeinkörnige Konkurrenz10-200 pro Warteschlange
Goroutines + SemaphoreLos!Hochdurchsatz-Schrott100-1000+
Worker Pool (Go Kanäle)Los!Strukturierte Aufgabenverteilung10.500 Arbeitnehmer

Python: Asyncio Semaphore

Das einfachste und effektivste Konkurrenzmuster in Python. Eine Semaphore begrenzt, wie viele Coroutinen gleichzeitig ausführen können, wodurch Ressourcenausschöpfung verhindert wird.

import asyncio
import aiohttp
import uuid
import time
PROXY_GATEWAY = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
MAX_CONCURRENCY = 50
TIMEOUT = aiohttp.ClientTimeout(total=30)
async def fetch(session: aiohttp.ClientSession, url: str, semaphore: asyncio.Semaphore) -> dict:
    async with semaphore:
        session_id = uuid.uuid4().hex[:8]
        proxy = f"http://USERNAME-session-{session_id}:PASSWORD@gate.proxyhat.com:8080"
        start = time.time()
        try:
            async with session.get(url, proxy=proxy, timeout=TIMEOUT) as response:
                body = await response.text()
                return {
                    "url": url,
                    "status": response.status,
                    "length": len(body),
                    "latency": round(time.time() - start, 3),
                }
        except Exception as e:
            return {"url": url, "error": str(e), "latency": round(time.time() - start, 3)}
async def scrape_all(urls: list[str]) -> list[dict]:
    semaphore = asyncio.Semaphore(MAX_CONCURRENCY)
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks)
    return results
# Usage
urls = [f"https://example.com/product/{i}" for i in range(1000)]
results = asyncio.run(scrape_all(urls))
success = sum(1 for r in results if "error" not in r)
print(f"Completed: {success}/{len(results)} successful")
print(f"Avg latency: {sum(r['latency'] for r in results) / len(results):.3f}s")

Python: Worker Pool mit Gegendruck

Wenn Sie mehr Kontrolle benötigen — Rate Begrenzung, Gegendruck oder Prioritätsplanung — verwenden Sie einen Worker Pool mit einem Asyncio. Warte.

import asyncio
import aiohttp
import uuid
class WorkerPool:
    """Fixed-size worker pool with backpressure via bounded queue."""
    def __init__(self, num_workers: int = 20, queue_size: int = 100):
        self.num_workers = num_workers
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=queue_size)
        self.results: list = []
        self.stats = {"success": 0, "failed": 0, "total_latency": 0.0}
        self._stop = False
    async def worker(self, session: aiohttp.ClientSession, worker_id: int):
        while not self._stop:
            try:
                url = await asyncio.wait_for(self.queue.get(), timeout=5.0)
            except asyncio.TimeoutError:
                if self._stop:
                    break
                continue
            session_id = uuid.uuid4().hex[:8]
            proxy = f"http://USERNAME-session-{session_id}:PASSWORD@gate.proxyhat.com:8080"
            import time
            start = time.time()
            try:
                async with session.get(
                    url, proxy=proxy,
                    timeout=aiohttp.ClientTimeout(total=30)
                ) as response:
                    body = await response.text()
                    latency = time.time() - start
                    self.stats["success"] += 1
                    self.stats["total_latency"] += latency
                    self.results.append({
                        "url": url, "status": response.status,
                        "length": len(body), "worker": worker_id,
                    })
            except Exception as e:
                self.stats["failed"] += 1
                self.results.append({"url": url, "error": str(e), "worker": worker_id})
            finally:
                self.queue.task_done()
    async def run(self, urls: list[str]) -> list[dict]:
        async with aiohttp.ClientSession() as session:
            # Start workers
            workers = [
                asyncio.create_task(self.worker(session, i))
                for i in range(self.num_workers)
            ]
            # Feed URLs into the queue (backpressure: blocks when queue is full)
            for url in urls:
                await self.queue.put(url)
            # Wait for all tasks to complete
            await self.queue.join()
            self._stop = True
            # Cancel workers
            for w in workers:
                w.cancel()
        return self.results
# Usage
pool = WorkerPool(num_workers=30, queue_size=50)
urls = [f"https://example.com/item/{i}" for i in range(500)]
results = asyncio.run(pool.run(urls))
print(f"Success: {pool.stats['success']}, Failed: {pool.stats['failed']}")
avg_lat = pool.stats["total_latency"] / max(pool.stats["success"], 1)
print(f"Avg latency: {avg_lat:.3f}s")

Python: Preisbegrenzer

Einige Ziele setzen strenge Ratenlimits durch. Dieser Token-Bucket-Ratenbegrenzer integriert sich mit den oben genannten Konkurrenzmustern.

import asyncio
import time
class RateLimiter:
    """Token-bucket rate limiter for async operations."""
    def __init__(self, rate: float, burst: int = 1):
        """
        Args:
            rate: Requests per second
            burst: Maximum burst size
        """
        self.rate = rate
        self.burst = burst
        self.tokens = burst
        self.last_refill = time.monotonic()
        self._lock = asyncio.Lock()
    async def acquire(self):
        async with self._lock:
            now = time.monotonic()
            elapsed = now - self.last_refill
            self.tokens = min(self.burst, self.tokens + elapsed * self.rate)
            self.last_refill = now
            if self.tokens < 1:
                wait_time = (1 - self.tokens) / self.rate
                await asyncio.sleep(wait_time)
                self.tokens = 0
            else:
                self.tokens -= 1
# Combined with semaphore
async def fetch_rate_limited(session, url, semaphore, limiter):
    async with semaphore:
        await limiter.acquire()
        session_id = uuid.uuid4().hex[:8]
        proxy = f"http://USERNAME-session-{session_id}:PASSWORD@gate.proxyhat.com:8080"
        async with session.get(url, proxy=proxy, timeout=TIMEOUT) as resp:
            return await resp.text()
# 10 requests/second, max 30 concurrent
async def main():
    semaphore = asyncio.Semaphore(30)
    limiter = RateLimiter(rate=10.0, burst=5)
    urls = [f"https://example.com/page/{i}" for i in range(200)]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_rate_limited(session, u, semaphore, limiter) for u in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
    success = sum(1 for r in results if not isinstance(r, Exception))
    print(f"Done: {success}/{len(results)}")
asyncio.run(main())

Node.js: Versprochenes Batching

Das einfachste Node.js-Koncurrency-Muster verarbeitet URLs in festformatigen Chargen.

const { HttpsProxyAgent } = require('https-proxy-agent');
const crypto = require('crypto');
const BATCH_SIZE = 20;
function createAgent() {
  const sessionId = crypto.randomBytes(4).toString('hex');
  return new HttpsProxyAgent(
    `http://USERNAME-session-${sessionId}:PASSWORD@gate.proxyhat.com:8080`
  );
}
async function fetchUrl(url) {
  const agent = createAgent();
  const start = Date.now();
  try {
    const response = await fetch(url, {
      agent,
      signal: AbortSignal.timeout(30000),
    });
    const text = await response.text();
    return {
      url,
      status: response.status,
      length: text.length,
      latency: Date.now() - start,
    };
  } catch (err) {
    return { url, error: err.message, latency: Date.now() - start };
  }
}
async function scrapeInBatches(urls) {
  const results = [];
  for (let i = 0; i < urls.length; i += BATCH_SIZE) {
    const batch = urls.slice(i, i + BATCH_SIZE);
    const batchResults = await Promise.all(batch.map(fetchUrl));
    results.push(...batchResults);
    const success = batchResults.filter(r => !r.error).length;
    console.log(`Batch ${Math.floor(i / BATCH_SIZE) + 1}: ${success}/${batch.length} OK`);
  }
  return results;
}
// Usage
const urls = Array.from({ length: 200 }, (_, i) =>
  `https://example.com/product/${i + 1}`
);
scrapeInBatches(urls).then(results => {
  const success = results.filter(r => !r.error).length;
  console.log(`Total: ${success}/${results.length} successful`);
});

Node.js: p-Limit für Fein-Grained Control

Für präzise Konkurrenzgrenzen ohne manuelle Chargen, verwenden Sie die p-limit Bibliothek.

// npm install p-limit
const pLimit = require('p-limit');
const { HttpsProxyAgent } = require('https-proxy-agent');
const crypto = require('crypto');
const limit = pLimit(30); // Max 30 concurrent requests
function createAgent() {
  const sessionId = crypto.randomBytes(4).toString('hex');
  return new HttpsProxyAgent(
    `http://USERNAME-session-${sessionId}:PASSWORD@gate.proxyhat.com:8080`
  );
}
async function fetchWithLimit(url) {
  return limit(async () => {
    const agent = createAgent();
    const response = await fetch(url, {
      agent,
      signal: AbortSignal.timeout(30000),
    });
    return {
      url,
      status: response.status,
      body: await response.text(),
    };
  });
}
// All 500 URLs start immediately, but only 30 run concurrently
const urls = Array.from({ length: 500 }, (_, i) =>
  `https://example.com/item/${i + 1}`
);
Promise.all(urls.map(fetchWithLimit)).then(results => {
  const success = results.filter(r => r.status === 200).length;
  console.log(`Success: ${success}/${results.length}`);
});

Node.js: Worker Queue mit Gegendruck

// npm install p-queue
const PQueue = require('p-queue').default;
const { HttpsProxyAgent } = require('https-proxy-agent');
const crypto = require('crypto');
const queue = new PQueue({
  concurrency: 25,
  intervalCap: 10,   // Max 10 requests...
  interval: 1000,    // ...per second (rate limiting)
});
queue.on('active', () => {
  console.log(`Active: ${queue.pending} pending, ${queue.size} queued`);
});
function createAgent() {
  const sessionId = crypto.randomBytes(4).toString('hex');
  return new HttpsProxyAgent(
    `http://USERNAME-session-${sessionId}:PASSWORD@gate.proxyhat.com:8080`
  );
}
async function processUrl(url) {
  const agent = createAgent();
  const response = await fetch(url, { agent, signal: AbortSignal.timeout(30000) });
  return { url, status: response.status, body: await response.text() };
}
// Add URLs to the queue
const urls = Array.from({ length: 1000 }, (_, i) =>
  `https://example.com/page/${i + 1}`
);
const results = await Promise.all(
  urls.map(url => queue.add(() => processUrl(url)))
);
console.log(`Completed: ${results.filter(r => r.status === 200).length}/${results.length}`);

Gehen Sie: Goroutines mit Semaphore

Go's Goroutines sind leicht, aber Sie müssen immer noch Konkurrenz begrenzen, um überwältigende Proxy-Verbindungen zu vermeiden. Eine kanalbasierte Semaphore ist der idiomatische Ansatz.

package main
import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"sync"
	"time"
)
const maxConcurrency = 50
type Result struct {
	URL     string
	Status  int
	Length  int
	Latency time.Duration
	Error   error
}
func newProxyClient() *http.Client {
	b := make([]byte, 4)
	rand.Read(b)
	sessionID := hex.EncodeToString(b)
	proxyStr := fmt.Sprintf("http://USERNAME-session-%s:PASSWORD@gate.proxyhat.com:8080", sessionID)
	proxyURL, _ := url.Parse(proxyStr)
	return &http.Client{
		Transport: &http.Transport{Proxy: http.ProxyURL(proxyURL)},
		Timeout:   30 * time.Second,
	}
}
func fetchURL(target string, sem chan struct{}, wg *sync.WaitGroup, results chan<- Result) {
	defer wg.Done()
	sem <- struct{}{}        // Acquire semaphore
	defer func() { <-sem }() // Release semaphore
	client := newProxyClient()
	start := time.Now()
	resp, err := client.Get(target)
	if err != nil {
		results <- Result{URL: target, Error: err, Latency: time.Since(start)}
		return
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	results <- Result{
		URL:     target,
		Status:  resp.StatusCode,
		Length:  len(body),
		Latency: time.Since(start),
	}
}
func main() {
	urls := make([]string, 500)
	for i := range urls {
		urls[i] = fmt.Sprintf("https://example.com/item/%d", i+1)
	}
	sem := make(chan struct{}, maxConcurrency)
	results := make(chan Result, len(urls))
	var wg sync.WaitGroup
	start := time.Now()
	for _, u := range urls {
		wg.Add(1)
		go fetchURL(u, sem, &wg, results)
	}
	// Close results channel when all goroutines finish
	go func() {
		wg.Wait()
		close(results)
	}()
	var success, failed int
	var totalLatency time.Duration
	for r := range results {
		if r.Error != nil {
			failed++
		} else {
			success++
			totalLatency += r.Latency
		}
	}
	elapsed := time.Since(start)
	fmt.Printf("Completed in %s\n", elapsed)
	fmt.Printf("Success: %d, Failed: %d\n", success, failed)
	fmt.Printf("Avg latency: %s\n", totalLatency/time.Duration(max(success, 1)))
	fmt.Printf("Throughput: %.1f req/s\n", float64(success+failed)/elapsed.Seconds())
}

Gehen Sie: Worker Pool mit Kanälen

Für eine strukturiertere Verarbeitung verwenden Sie einen festen Arbeitspool, der von einem Kanal verbraucht wird.

package main
import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"sync"
	"time"
)
type Job struct {
	URL string
}
type JobResult struct {
	URL     string
	Status  int
	Body    string
	Latency time.Duration
	Err     error
}
func worker(id int, jobs <-chan Job, results chan<- JobResult, wg *sync.WaitGroup) {
	defer wg.Done()
	for job := range jobs {
		b := make([]byte, 4)
		rand.Read(b)
		sessionID := hex.EncodeToString(b)
		proxyStr := fmt.Sprintf("http://USERNAME-session-%s:PASSWORD@gate.proxyhat.com:8080", sessionID)
		proxyURL, _ := url.Parse(proxyStr)
		client := &http.Client{
			Transport: &http.Transport{Proxy: http.ProxyURL(proxyURL)},
			Timeout:   30 * time.Second,
		}
		start := time.Now()
		resp, err := client.Get(job.URL)
		latency := time.Since(start)
		if err != nil {
			results <- JobResult{URL: job.URL, Err: err, Latency: latency}
			continue
		}
		body, _ := io.ReadAll(resp.Body)
		resp.Body.Close()
		results <- JobResult{
			URL:     job.URL,
			Status:  resp.StatusCode,
			Body:    string(body),
			Latency: latency,
		}
	}
}
func main() {
	numWorkers := 30
	urls := make([]string, 300)
	for i := range urls {
		urls[i] = fmt.Sprintf("https://example.com/page/%d", i+1)
	}
	jobs := make(chan Job, len(urls))
	results := make(chan JobResult, len(urls))
	var wg sync.WaitGroup
	// Start workers
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go worker(i, jobs, results, &wg)
	}
	// Send jobs
	for _, u := range urls {
		jobs <- Job{URL: u}
	}
	close(jobs)
	// Collect results
	go func() {
		wg.Wait()
		close(results)
	}()
	var success, failed int
	for r := range results {
		if r.Err != nil {
			failed++
		} else {
			success++
		}
	}
	fmt.Printf("Success: %d, Failed: %d\n", success, failed)
}

Wahl der richtigen Konkurrenzstufe

Die optimale Konkurrenz hängt von mehreren Faktoren ab. Hier ist eine praktische Ausgangspunktführung:

Wahl der richtigen Konkurrenzstufe
ZielartEmpfohlene KonkurrenzBegründung
Leichte APIs (JSON)50-200Schnelle Antworten, niedriger Speicher pro Anfrage
Standard-Webseiten20-50Moderate Antwortgrößen, einige Geschwindigkeitsbegrenzung
Schwere JS-verfügbare Seiten5-15Browser-Kontexte verwenden signifikanten Speicher
Aggressive Anti-Bot-Seiten5-10Brauchen Sie realistisches Timing zwischen Anfragen
Große Datei-Downloads5-20Bandbreite-gebunden, nicht CPU-gebunden
Beginnen Sie mit 10 gleichzeitigen Anfragen und erhöhen Sie allmählich die Erfolgsquoten. Wenn Ihre Erfolgsquote unter 90% sinkt, reduzieren Sie Konkurrenz oder addieren Sie Verzögerungen zwischen Anfragen. Mehr zum Tracking dieser Metriken, siehe unsere Überwachung der Proxy-Leistung Führung.

Für eine wiederverwendbare Proxy-Abstraktion mit eingebauter Konkurrenz siehe Aufbau einer Proxy Middleware Layer. Für End-to-End-Schrott-Architektur lesen Design einer zuverlässigen Scraping Architektur. Entdecken Sie die Python SDK, Node SDK, und SDK zur produktionsbereiten Proxy-Integration oder Überprüfung Preise für ProxyHat zu beginnen.

Häufig gestellte Fragen

Bereit loszulegen?

Zugang zu über 50 Mio. Residential-IPs in über 148 Ländern mit KI-gesteuerter Filterung.

Preise ansehenResidential Proxies
← Zurück zum Blog