Warum Concurrency Control Matter für Proxy Scrap
Senden von Anfragen sequentiell durch eine Proxy verschwendet Bandbreite und Zeit. Sie alle auf einmal überfordert das Proxy-Gateway, den Zielserver und Ihr eigenes System. Die Konkurrenzkontrolle trifft auf die Waage – maximiert den Durchsatz während des Aufenthaltes in den Grenzen Ihres Proxypools, der Zielorttoleranz und der verfügbaren Ressourcen.
Dieser Leitfaden umfasst produktionsfähige Koncurrency-Muster in drei Sprachen: Python (asyncio), Node.js (Promise-Pools) und Go (Griechenland mit Semaphoren). Jedes Beispiel verwendet ProxyHats rotierende Wohn-Proxie und ist bereit, in Ihre Projekte zu kopieren.
Das Ziel der Koncurrency Control ist einfach: maximieren Sie Anfragen pro Sekunde, ohne Blöcke auszulösen, Speicher auszuschöpfen oder Ihren Prozess abzustürzen. Das richtige Muster hängt von der Sprache, der Zielseite und der Skala ab.
Konkurrenzmuster verglichen
| Muster | Sprache | Das Beste für | Max Concurrency |
|---|---|---|---|
| asyncio. Semaphore | Python | I/O-gebundenes Abkratzen | 50-200 pro Prozess |
| Worker Pool (asyncio) | Python | Aufgabenqueues mit Gegendruck | 10-100 Arbeitnehmer |
| Versprechen.all + Batch | Node.js | Einfaches paralleles Abholen | 50-500 pro Prozess |
| p-limit / p-queue | Node.js | Feinkörnige Konkurrenz | 10-200 pro Warteschlange |
| Goroutines + Semaphore | Los! | Hochdurchsatz-Schrott | 100-1000+ |
| Worker Pool (Go Kanäle) | Los! | Strukturierte Aufgabenverteilung | 10.500 Arbeitnehmer |
Python: Asyncio Semaphore
Das einfachste und effektivste Konkurrenzmuster in Python. Eine Semaphore begrenzt, wie viele Coroutinen gleichzeitig ausführen können, wodurch Ressourcenausschöpfung verhindert wird.
import asyncio
import aiohttp
import uuid
import time
PROXY_GATEWAY = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
MAX_CONCURRENCY = 50
TIMEOUT = aiohttp.ClientTimeout(total=30)
async def fetch(session: aiohttp.ClientSession, url: str, semaphore: asyncio.Semaphore) -> dict:
async with semaphore:
session_id = uuid.uuid4().hex[:8]
proxy = f"http://USERNAME-session-{session_id}:PASSWORD@gate.proxyhat.com:8080"
start = time.time()
try:
async with session.get(url, proxy=proxy, timeout=TIMEOUT) as response:
body = await response.text()
return {
"url": url,
"status": response.status,
"length": len(body),
"latency": round(time.time() - start, 3),
}
except Exception as e:
return {"url": url, "error": str(e), "latency": round(time.time() - start, 3)}
async def scrape_all(urls: list[str]) -> list[dict]:
semaphore = asyncio.Semaphore(MAX_CONCURRENCY)
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url, semaphore) for url in urls]
results = await asyncio.gather(*tasks)
return results
# Usage
urls = [f"https://example.com/product/{i}" for i in range(1000)]
results = asyncio.run(scrape_all(urls))
success = sum(1 for r in results if "error" not in r)
print(f"Completed: {success}/{len(results)} successful")
print(f"Avg latency: {sum(r['latency'] for r in results) / len(results):.3f}s")
Python: Worker Pool mit Gegendruck
Wenn Sie mehr Kontrolle benötigen — Rate Begrenzung, Gegendruck oder Prioritätsplanung — verwenden Sie einen Worker Pool mit einem Asyncio. Warte.
import asyncio
import aiohttp
import uuid
class WorkerPool:
"""Fixed-size worker pool with backpressure via bounded queue."""
def __init__(self, num_workers: int = 20, queue_size: int = 100):
self.num_workers = num_workers
self.queue: asyncio.Queue = asyncio.Queue(maxsize=queue_size)
self.results: list = []
self.stats = {"success": 0, "failed": 0, "total_latency": 0.0}
self._stop = False
async def worker(self, session: aiohttp.ClientSession, worker_id: int):
while not self._stop:
try:
url = await asyncio.wait_for(self.queue.get(), timeout=5.0)
except asyncio.TimeoutError:
if self._stop:
break
continue
session_id = uuid.uuid4().hex[:8]
proxy = f"http://USERNAME-session-{session_id}:PASSWORD@gate.proxyhat.com:8080"
import time
start = time.time()
try:
async with session.get(
url, proxy=proxy,
timeout=aiohttp.ClientTimeout(total=30)
) as response:
body = await response.text()
latency = time.time() - start
self.stats["success"] += 1
self.stats["total_latency"] += latency
self.results.append({
"url": url, "status": response.status,
"length": len(body), "worker": worker_id,
})
except Exception as e:
self.stats["failed"] += 1
self.results.append({"url": url, "error": str(e), "worker": worker_id})
finally:
self.queue.task_done()
async def run(self, urls: list[str]) -> list[dict]:
async with aiohttp.ClientSession() as session:
# Start workers
workers = [
asyncio.create_task(self.worker(session, i))
for i in range(self.num_workers)
]
# Feed URLs into the queue (backpressure: blocks when queue is full)
for url in urls:
await self.queue.put(url)
# Wait for all tasks to complete
await self.queue.join()
self._stop = True
# Cancel workers
for w in workers:
w.cancel()
return self.results
# Usage
pool = WorkerPool(num_workers=30, queue_size=50)
urls = [f"https://example.com/item/{i}" for i in range(500)]
results = asyncio.run(pool.run(urls))
print(f"Success: {pool.stats['success']}, Failed: {pool.stats['failed']}")
avg_lat = pool.stats["total_latency"] / max(pool.stats["success"], 1)
print(f"Avg latency: {avg_lat:.3f}s")
Python: Preisbegrenzer
Einige Ziele setzen strenge Ratenlimits durch. Dieser Token-Bucket-Ratenbegrenzer integriert sich mit den oben genannten Konkurrenzmustern.
import asyncio
import time
class RateLimiter:
"""Token-bucket rate limiter for async operations."""
def __init__(self, rate: float, burst: int = 1):
"""
Args:
rate: Requests per second
burst: Maximum burst size
"""
self.rate = rate
self.burst = burst
self.tokens = burst
self.last_refill = time.monotonic()
self._lock = asyncio.Lock()
async def acquire(self):
async with self._lock:
now = time.monotonic()
elapsed = now - self.last_refill
self.tokens = min(self.burst, self.tokens + elapsed * self.rate)
self.last_refill = now
if self.tokens < 1:
wait_time = (1 - self.tokens) / self.rate
await asyncio.sleep(wait_time)
self.tokens = 0
else:
self.tokens -= 1
# Combined with semaphore
async def fetch_rate_limited(session, url, semaphore, limiter):
async with semaphore:
await limiter.acquire()
session_id = uuid.uuid4().hex[:8]
proxy = f"http://USERNAME-session-{session_id}:PASSWORD@gate.proxyhat.com:8080"
async with session.get(url, proxy=proxy, timeout=TIMEOUT) as resp:
return await resp.text()
# 10 requests/second, max 30 concurrent
async def main():
semaphore = asyncio.Semaphore(30)
limiter = RateLimiter(rate=10.0, burst=5)
urls = [f"https://example.com/page/{i}" for i in range(200)]
async with aiohttp.ClientSession() as session:
tasks = [fetch_rate_limited(session, u, semaphore, limiter) for u in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
success = sum(1 for r in results if not isinstance(r, Exception))
print(f"Done: {success}/{len(results)}")
asyncio.run(main())
Node.js: Versprochenes Batching
Das einfachste Node.js-Koncurrency-Muster verarbeitet URLs in festformatigen Chargen.
const { HttpsProxyAgent } = require('https-proxy-agent');
const crypto = require('crypto');
const BATCH_SIZE = 20;
function createAgent() {
const sessionId = crypto.randomBytes(4).toString('hex');
return new HttpsProxyAgent(
`http://USERNAME-session-${sessionId}:PASSWORD@gate.proxyhat.com:8080`
);
}
async function fetchUrl(url) {
const agent = createAgent();
const start = Date.now();
try {
const response = await fetch(url, {
agent,
signal: AbortSignal.timeout(30000),
});
const text = await response.text();
return {
url,
status: response.status,
length: text.length,
latency: Date.now() - start,
};
} catch (err) {
return { url, error: err.message, latency: Date.now() - start };
}
}
async function scrapeInBatches(urls) {
const results = [];
for (let i = 0; i < urls.length; i += BATCH_SIZE) {
const batch = urls.slice(i, i + BATCH_SIZE);
const batchResults = await Promise.all(batch.map(fetchUrl));
results.push(...batchResults);
const success = batchResults.filter(r => !r.error).length;
console.log(`Batch ${Math.floor(i / BATCH_SIZE) + 1}: ${success}/${batch.length} OK`);
}
return results;
}
// Usage
const urls = Array.from({ length: 200 }, (_, i) =>
`https://example.com/product/${i + 1}`
);
scrapeInBatches(urls).then(results => {
const success = results.filter(r => !r.error).length;
console.log(`Total: ${success}/${results.length} successful`);
});
Node.js: p-Limit für Fein-Grained Control
Für präzise Konkurrenzgrenzen ohne manuelle Chargen, verwenden Sie die p-limit Bibliothek.
// npm install p-limit
const pLimit = require('p-limit');
const { HttpsProxyAgent } = require('https-proxy-agent');
const crypto = require('crypto');
const limit = pLimit(30); // Max 30 concurrent requests
function createAgent() {
const sessionId = crypto.randomBytes(4).toString('hex');
return new HttpsProxyAgent(
`http://USERNAME-session-${sessionId}:PASSWORD@gate.proxyhat.com:8080`
);
}
async function fetchWithLimit(url) {
return limit(async () => {
const agent = createAgent();
const response = await fetch(url, {
agent,
signal: AbortSignal.timeout(30000),
});
return {
url,
status: response.status,
body: await response.text(),
};
});
}
// All 500 URLs start immediately, but only 30 run concurrently
const urls = Array.from({ length: 500 }, (_, i) =>
`https://example.com/item/${i + 1}`
);
Promise.all(urls.map(fetchWithLimit)).then(results => {
const success = results.filter(r => r.status === 200).length;
console.log(`Success: ${success}/${results.length}`);
});
Node.js: Worker Queue mit Gegendruck
// npm install p-queue
const PQueue = require('p-queue').default;
const { HttpsProxyAgent } = require('https-proxy-agent');
const crypto = require('crypto');
const queue = new PQueue({
concurrency: 25,
intervalCap: 10, // Max 10 requests...
interval: 1000, // ...per second (rate limiting)
});
queue.on('active', () => {
console.log(`Active: ${queue.pending} pending, ${queue.size} queued`);
});
function createAgent() {
const sessionId = crypto.randomBytes(4).toString('hex');
return new HttpsProxyAgent(
`http://USERNAME-session-${sessionId}:PASSWORD@gate.proxyhat.com:8080`
);
}
async function processUrl(url) {
const agent = createAgent();
const response = await fetch(url, { agent, signal: AbortSignal.timeout(30000) });
return { url, status: response.status, body: await response.text() };
}
// Add URLs to the queue
const urls = Array.from({ length: 1000 }, (_, i) =>
`https://example.com/page/${i + 1}`
);
const results = await Promise.all(
urls.map(url => queue.add(() => processUrl(url)))
);
console.log(`Completed: ${results.filter(r => r.status === 200).length}/${results.length}`);
Gehen Sie: Goroutines mit Semaphore
Go's Goroutines sind leicht, aber Sie müssen immer noch Konkurrenz begrenzen, um überwältigende Proxy-Verbindungen zu vermeiden. Eine kanalbasierte Semaphore ist der idiomatische Ansatz.
package main
import (
"crypto/rand"
"encoding/hex"
"fmt"
"io"
"net/http"
"net/url"
"sync"
"time"
)
const maxConcurrency = 50
type Result struct {
URL string
Status int
Length int
Latency time.Duration
Error error
}
func newProxyClient() *http.Client {
b := make([]byte, 4)
rand.Read(b)
sessionID := hex.EncodeToString(b)
proxyStr := fmt.Sprintf("http://USERNAME-session-%s:PASSWORD@gate.proxyhat.com:8080", sessionID)
proxyURL, _ := url.Parse(proxyStr)
return &http.Client{
Transport: &http.Transport{Proxy: http.ProxyURL(proxyURL)},
Timeout: 30 * time.Second,
}
}
func fetchURL(target string, sem chan struct{}, wg *sync.WaitGroup, results chan<- Result) {
defer wg.Done()
sem <- struct{}{} // Acquire semaphore
defer func() { <-sem }() // Release semaphore
client := newProxyClient()
start := time.Now()
resp, err := client.Get(target)
if err != nil {
results <- Result{URL: target, Error: err, Latency: time.Since(start)}
return
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
results <- Result{
URL: target,
Status: resp.StatusCode,
Length: len(body),
Latency: time.Since(start),
}
}
func main() {
urls := make([]string, 500)
for i := range urls {
urls[i] = fmt.Sprintf("https://example.com/item/%d", i+1)
}
sem := make(chan struct{}, maxConcurrency)
results := make(chan Result, len(urls))
var wg sync.WaitGroup
start := time.Now()
for _, u := range urls {
wg.Add(1)
go fetchURL(u, sem, &wg, results)
}
// Close results channel when all goroutines finish
go func() {
wg.Wait()
close(results)
}()
var success, failed int
var totalLatency time.Duration
for r := range results {
if r.Error != nil {
failed++
} else {
success++
totalLatency += r.Latency
}
}
elapsed := time.Since(start)
fmt.Printf("Completed in %s\n", elapsed)
fmt.Printf("Success: %d, Failed: %d\n", success, failed)
fmt.Printf("Avg latency: %s\n", totalLatency/time.Duration(max(success, 1)))
fmt.Printf("Throughput: %.1f req/s\n", float64(success+failed)/elapsed.Seconds())
}
Gehen Sie: Worker Pool mit Kanälen
Für eine strukturiertere Verarbeitung verwenden Sie einen festen Arbeitspool, der von einem Kanal verbraucht wird.
package main
import (
"crypto/rand"
"encoding/hex"
"fmt"
"io"
"net/http"
"net/url"
"sync"
"time"
)
type Job struct {
URL string
}
type JobResult struct {
URL string
Status int
Body string
Latency time.Duration
Err error
}
func worker(id int, jobs <-chan Job, results chan<- JobResult, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
b := make([]byte, 4)
rand.Read(b)
sessionID := hex.EncodeToString(b)
proxyStr := fmt.Sprintf("http://USERNAME-session-%s:PASSWORD@gate.proxyhat.com:8080", sessionID)
proxyURL, _ := url.Parse(proxyStr)
client := &http.Client{
Transport: &http.Transport{Proxy: http.ProxyURL(proxyURL)},
Timeout: 30 * time.Second,
}
start := time.Now()
resp, err := client.Get(job.URL)
latency := time.Since(start)
if err != nil {
results <- JobResult{URL: job.URL, Err: err, Latency: latency}
continue
}
body, _ := io.ReadAll(resp.Body)
resp.Body.Close()
results <- JobResult{
URL: job.URL,
Status: resp.StatusCode,
Body: string(body),
Latency: latency,
}
}
}
func main() {
numWorkers := 30
urls := make([]string, 300)
for i := range urls {
urls[i] = fmt.Sprintf("https://example.com/page/%d", i+1)
}
jobs := make(chan Job, len(urls))
results := make(chan JobResult, len(urls))
var wg sync.WaitGroup
// Start workers
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go worker(i, jobs, results, &wg)
}
// Send jobs
for _, u := range urls {
jobs <- Job{URL: u}
}
close(jobs)
// Collect results
go func() {
wg.Wait()
close(results)
}()
var success, failed int
for r := range results {
if r.Err != nil {
failed++
} else {
success++
}
}
fmt.Printf("Success: %d, Failed: %d\n", success, failed)
}
Wahl der richtigen Konkurrenzstufe
Die optimale Konkurrenz hängt von mehreren Faktoren ab. Hier ist eine praktische Ausgangspunktführung:
| Zielart | Empfohlene Konkurrenz | Begründung |
|---|---|---|
| Leichte APIs (JSON) | 50-200 | Schnelle Antworten, niedriger Speicher pro Anfrage |
| Standard-Webseiten | 20-50 | Moderate Antwortgrößen, einige Geschwindigkeitsbegrenzung |
| Schwere JS-verfügbare Seiten | 5-15 | Browser-Kontexte verwenden signifikanten Speicher |
| Aggressive Anti-Bot-Seiten | 5-10 | Brauchen Sie realistisches Timing zwischen Anfragen |
| Große Datei-Downloads | 5-20 | Bandbreite-gebunden, nicht CPU-gebunden |
Beginnen Sie mit 10 gleichzeitigen Anfragen und erhöhen Sie allmählich die Erfolgsquoten. Wenn Ihre Erfolgsquote unter 90% sinkt, reduzieren Sie Konkurrenz oder addieren Sie Verzögerungen zwischen Anfragen. Mehr zum Tracking dieser Metriken, siehe unsere Überwachung der Proxy-Leistung Führung.
Für eine wiederverwendbare Proxy-Abstraktion mit eingebauter Konkurrenz siehe Aufbau einer Proxy Middleware Layer. Für End-to-End-Schrott-Architektur lesen Design einer zuverlässigen Scraping Architektur. Entdecken Sie die Python SDK, Node SDK, und SDK zur produktionsbereiten Proxy-Integration oder Überprüfung Preise für ProxyHat zu beginnen.






