Proxy Yararlayıcısı için Neden Yetki Kontrolü Maddeleri
Sending requests sequentially through a proxy proxy proxy Atıklar bant genişliği ve zamanı. Onları bir kez aşırı uçaraktlar, proxy ağ geçidi, hedef sunucu ve kendi sisteminiz. Uygunluk kontrolü dengeyi vuruyor - proxy havuzunuzun sınırları içinde kalıyorken, hedef site toleransı ve mevcut kaynaklar.
Bu kılavuz, üç dilde üretim yeterlilik kalıpları kapsar: Python (asyncio), Node.js (Promise havuzları), ve Go (goroutines with semaphores). Her örnek kullanır ProxyHat'ın dönen konut proxyleri Ve projelerinize kopyalamaya hazır.
Koncurrency kontrolünün amacı basittir: Blokları, egzoz hafızasını veya sürecinizi çarpmadan ikinci olarak en iyi istekler. Doğru model dilinize, hedef sitesine ve ölçeklerinize bağlıdır.
Eşdeğer Desenler Karşılaştırmalı
| Desen | Dil Dili | En iyisi için | Max Concurrency |
|---|---|---|---|
| asyncio. Semafor | Python Python | I/O-bound scraping | 50-200 per process |
| Worker Pool (asyncio) | Python Python | Görev kuyrukları arka baskı ile | 10-100 işçi |
| Söz. bütün + toplulaştırma | Hayır | Basit paralellik | 50-500 per process |
| p-limit / p-queue | Hayır | Güzellik | 10-200 kuyruk başına |
| Goroutines + Semaphore | Go Go Go Go | High-throughput scraping | 100-1000+ |
| Worker Pool (Go kanalları) | Go Go Go Go | Yapılı görev dağılımı | 10-500 işçi |
Python: asyncio Semaphore
Python'daki en basit ve en etkili koncurrency pattern. Bir semaphore, birçok Koroutines'in bir anda nasıl uygulanabileceğini, kaynağın tükenmesini engelleyebilir.
import asyncio
import aiohttp
import uuid
import time
PROXY_GATEWAY = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
MAX_CONCURRENCY = 50
TIMEOUT = aiohttp.ClientTimeout(total=30)
async def fetch(session: aiohttp.ClientSession, url: str, semaphore: asyncio.Semaphore) -> dict:
async with semaphore:
session_id = uuid.uuid4().hex[:8]
proxy = f"http://USERNAME-session-{session_id}:PASSWORD@gate.proxyhat.com:8080"
start = time.time()
try:
async with session.get(url, proxy=proxy, timeout=TIMEOUT) as response:
body = await response.text()
return {
"url": url,
"status": response.status,
"length": len(body),
"latency": round(time.time() - start, 3),
}
except Exception as e:
return {"url": url, "error": str(e), "latency": round(time.time() - start, 3)}
async def scrape_all(urls: list[str]) -> list[dict]:
semaphore = asyncio.Semaphore(MAX_CONCURRENCY)
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url, semaphore) for url in urls]
results = await asyncio.gather(*tasks)
return results
# Usage
urls = [f"https://example.com/product/{i}" for i in range(1000)]
results = asyncio.run(scrape_all(urls))
success = sum(1 for r in results if "error" not in r)
print(f"Completed: {success}/{len(results)} successful")
print(f"Avg latency: {sum(r['latency'] for r in results) / len(results):.3f}s")
Python: Backpress ile Worker Pool
Daha fazla kontrole ihtiyacınız olduğunda - limitli, sırt basıncı veya öncelik zamanlaması - bir çalışan havuzu bir asyncio ile kullanın. Queue.
import asyncio
import aiohttp
import uuid
class WorkerPool:
"""Fixed-size worker pool with backpressure via bounded queue."""
def __init__(self, num_workers: int = 20, queue_size: int = 100):
self.num_workers = num_workers
self.queue: asyncio.Queue = asyncio.Queue(maxsize=queue_size)
self.results: list = []
self.stats = {"success": 0, "failed": 0, "total_latency": 0.0}
self._stop = False
async def worker(self, session: aiohttp.ClientSession, worker_id: int):
while not self._stop:
try:
url = await asyncio.wait_for(self.queue.get(), timeout=5.0)
except asyncio.TimeoutError:
if self._stop:
break
continue
session_id = uuid.uuid4().hex[:8]
proxy = f"http://USERNAME-session-{session_id}:PASSWORD@gate.proxyhat.com:8080"
import time
start = time.time()
try:
async with session.get(
url, proxy=proxy,
timeout=aiohttp.ClientTimeout(total=30)
) as response:
body = await response.text()
latency = time.time() - start
self.stats["success"] += 1
self.stats["total_latency"] += latency
self.results.append({
"url": url, "status": response.status,
"length": len(body), "worker": worker_id,
})
except Exception as e:
self.stats["failed"] += 1
self.results.append({"url": url, "error": str(e), "worker": worker_id})
finally:
self.queue.task_done()
async def run(self, urls: list[str]) -> list[dict]:
async with aiohttp.ClientSession() as session:
# Start workers
workers = [
asyncio.create_task(self.worker(session, i))
for i in range(self.num_workers)
]
# Feed URLs into the queue (backpressure: blocks when queue is full)
for url in urls:
await self.queue.put(url)
# Wait for all tasks to complete
await self.queue.join()
self._stop = True
# Cancel workers
for w in workers:
w.cancel()
return self.results
# Usage
pool = WorkerPool(num_workers=30, queue_size=50)
urls = [f"https://example.com/item/{i}" for i in range(500)]
results = asyncio.run(pool.run(urls))
print(f"Success: {pool.stats['success']}, Failed: {pool.stats['failed']}")
avg_lat = pool.stats["total_latency"] / max(pool.stats["success"], 1)
print(f"Avg latency: {avg_lat:.3f}s")
Python: Rate Limiter
Bazı hedefler katı oran sınırları uygular. Bu token-bucket oranı limiter yukarıdaki koncurrency kalıpları ile bütünleşiyor.
import asyncio
import time
class RateLimiter:
"""Token-bucket rate limiter for async operations."""
def __init__(self, rate: float, burst: int = 1):
"""
Args:
rate: Requests per second
burst: Maximum burst size
"""
self.rate = rate
self.burst = burst
self.tokens = burst
self.last_refill = time.monotonic()
self._lock = asyncio.Lock()
async def acquire(self):
async with self._lock:
now = time.monotonic()
elapsed = now - self.last_refill
self.tokens = min(self.burst, self.tokens + elapsed * self.rate)
self.last_refill = now
if self.tokens < 1:
wait_time = (1 - self.tokens) / self.rate
await asyncio.sleep(wait_time)
self.tokens = 0
else:
self.tokens -= 1
# Combined with semaphore
async def fetch_rate_limited(session, url, semaphore, limiter):
async with semaphore:
await limiter.acquire()
session_id = uuid.uuid4().hex[:8]
proxy = f"http://USERNAME-session-{session_id}:PASSWORD@gate.proxyhat.com:8080"
async with session.get(url, proxy=proxy, timeout=TIMEOUT) as resp:
return await resp.text()
# 10 requests/second, max 30 concurrent
async def main():
semaphore = asyncio.Semaphore(30)
limiter = RateLimiter(rate=10.0, burst=5)
urls = [f"https://example.com/page/{i}" for i in range(200)]
async with aiohttp.ClientSession() as session:
tasks = [fetch_rate_limited(session, u, semaphore, limiter) for u in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
success = sum(1 for r in results if not isinstance(r, Exception))
print(f"Done: {success}/{len(results)}")
asyncio.run(main())
Node.js: Promise Batching
En basit Node.js koncurrency pattern süreçleri URL'ler sabit boyutlu kümelerde.
const { HttpsProxyAgent } = require('https-proxy-agent');
const crypto = require('crypto');
const BATCH_SIZE = 20;
function createAgent() {
const sessionId = crypto.randomBytes(4).toString('hex');
return new HttpsProxyAgent(
`http://USERNAME-session-${sessionId}:PASSWORD@gate.proxyhat.com:8080`
);
}
async function fetchUrl(url) {
const agent = createAgent();
const start = Date.now();
try {
const response = await fetch(url, {
agent,
signal: AbortSignal.timeout(30000),
});
const text = await response.text();
return {
url,
status: response.status,
length: text.length,
latency: Date.now() - start,
};
} catch (err) {
return { url, error: err.message, latency: Date.now() - start };
}
}
async function scrapeInBatches(urls) {
const results = [];
for (let i = 0; i < urls.length; i += BATCH_SIZE) {
const batch = urls.slice(i, i + BATCH_SIZE);
const batchResults = await Promise.all(batch.map(fetchUrl));
results.push(...batchResults);
const success = batchResults.filter(r => !r.error).length;
console.log(`Batch ${Math.floor(i / BATCH_SIZE) + 1}: ${success}/${batch.length} OK`);
}
return results;
}
// Usage
const urls = Array.from({ length: 200 }, (_, i) =>
`https://example.com/product/${i + 1}`
);
scrapeInBatches(urls).then(results => {
const success = results.filter(r => !r.error).length;
console.log(`Total: ${success}/${results.length} successful`);
});
Node.js: p-limit for Fine-Grateed Control
Basit koncurrency limitleri manuel toplu olmadan, kullanın p-limit Kütüphane.
// npm install p-limit
const pLimit = require('p-limit');
const { HttpsProxyAgent } = require('https-proxy-agent');
const crypto = require('crypto');
const limit = pLimit(30); // Max 30 concurrent requests
function createAgent() {
const sessionId = crypto.randomBytes(4).toString('hex');
return new HttpsProxyAgent(
`http://USERNAME-session-${sessionId}:PASSWORD@gate.proxyhat.com:8080`
);
}
async function fetchWithLimit(url) {
return limit(async () => {
const agent = createAgent();
const response = await fetch(url, {
agent,
signal: AbortSignal.timeout(30000),
});
return {
url,
status: response.status,
body: await response.text(),
};
});
}
// All 500 URLs start immediately, but only 30 run concurrently
const urls = Array.from({ length: 500 }, (_, i) =>
`https://example.com/item/${i + 1}`
);
Promise.all(urls.map(fetchWithLimit)).then(results => {
const success = results.filter(r => r.status === 200).length;
console.log(`Success: ${success}/${results.length}`);
});
Node.js: Worker Queue with Backbas
// npm install p-queue
const PQueue = require('p-queue').default;
const { HttpsProxyAgent } = require('https-proxy-agent');
const crypto = require('crypto');
const queue = new PQueue({
concurrency: 25,
intervalCap: 10, // Max 10 requests...
interval: 1000, // ...per second (rate limiting)
});
queue.on('active', () => {
console.log(`Active: ${queue.pending} pending, ${queue.size} queued`);
});
function createAgent() {
const sessionId = crypto.randomBytes(4).toString('hex');
return new HttpsProxyAgent(
`http://USERNAME-session-${sessionId}:PASSWORD@gate.proxyhat.com:8080`
);
}
async function processUrl(url) {
const agent = createAgent();
const response = await fetch(url, { agent, signal: AbortSignal.timeout(30000) });
return { url, status: response.status, body: await response.text() };
}
// Add URLs to the queue
const urls = Array.from({ length: 1000 }, (_, i) =>
`https://example.com/page/${i + 1}`
);
const results = await Promise.all(
urls.map(url => queue.add(() => processUrl(url)))
);
console.log(`Completed: ${results.filter(r => r.status === 200).length}/${results.length}`);
Go: Semaphore ile Goroutines
Go's goroutines hafif, ancak hala ezici proxy bağlantılarından kaçınmak için tutarlılığı sınırlamanız gerekiyor. Bir kanal temelli semaphore deyimomatik yaklaşımdır.
package main
import (
"crypto/rand"
"encoding/hex"
"fmt"
"io"
"net/http"
"net/url"
"sync"
"time"
)
const maxConcurrency = 50
type Result struct {
URL string
Status int
Length int
Latency time.Duration
Error error
}
func newProxyClient() *http.Client {
b := make([]byte, 4)
rand.Read(b)
sessionID := hex.EncodeToString(b)
proxyStr := fmt.Sprintf("http://USERNAME-session-%s:PASSWORD@gate.proxyhat.com:8080", sessionID)
proxyURL, _ := url.Parse(proxyStr)
return &http.Client{
Transport: &http.Transport{Proxy: http.ProxyURL(proxyURL)},
Timeout: 30 * time.Second,
}
}
func fetchURL(target string, sem chan struct{}, wg *sync.WaitGroup, results chan<- Result) {
defer wg.Done()
sem <- struct{}{} // Acquire semaphore
defer func() { <-sem }() // Release semaphore
client := newProxyClient()
start := time.Now()
resp, err := client.Get(target)
if err != nil {
results <- Result{URL: target, Error: err, Latency: time.Since(start)}
return
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
results <- Result{
URL: target,
Status: resp.StatusCode,
Length: len(body),
Latency: time.Since(start),
}
}
func main() {
urls := make([]string, 500)
for i := range urls {
urls[i] = fmt.Sprintf("https://example.com/item/%d", i+1)
}
sem := make(chan struct{}, maxConcurrency)
results := make(chan Result, len(urls))
var wg sync.WaitGroup
start := time.Now()
for _, u := range urls {
wg.Add(1)
go fetchURL(u, sem, &wg, results)
}
// Close results channel when all goroutines finish
go func() {
wg.Wait()
close(results)
}()
var success, failed int
var totalLatency time.Duration
for r := range results {
if r.Error != nil {
failed++
} else {
success++
totalLatency += r.Latency
}
}
elapsed := time.Since(start)
fmt.Printf("Completed in %s\n", elapsed)
fmt.Printf("Success: %d, Failed: %d\n", success, failed)
fmt.Printf("Avg latency: %s\n", totalLatency/time.Duration(max(success, 1)))
fmt.Printf("Throughput: %.1f req/s\n", float64(success+failed)/elapsed.Seconds())
}
Go: Channels ile Worker Pool
Daha yapılandırılmış bir işlem için, bir kanaldan gelen sabit bir işçi havuzu kullanın.
package main
import (
"crypto/rand"
"encoding/hex"
"fmt"
"io"
"net/http"
"net/url"
"sync"
"time"
)
type Job struct {
URL string
}
type JobResult struct {
URL string
Status int
Body string
Latency time.Duration
Err error
}
func worker(id int, jobs <-chan Job, results chan<- JobResult, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
b := make([]byte, 4)
rand.Read(b)
sessionID := hex.EncodeToString(b)
proxyStr := fmt.Sprintf("http://USERNAME-session-%s:PASSWORD@gate.proxyhat.com:8080", sessionID)
proxyURL, _ := url.Parse(proxyStr)
client := &http.Client{
Transport: &http.Transport{Proxy: http.ProxyURL(proxyURL)},
Timeout: 30 * time.Second,
}
start := time.Now()
resp, err := client.Get(job.URL)
latency := time.Since(start)
if err != nil {
results <- JobResult{URL: job.URL, Err: err, Latency: latency}
continue
}
body, _ := io.ReadAll(resp.Body)
resp.Body.Close()
results <- JobResult{
URL: job.URL,
Status: resp.StatusCode,
Body: string(body),
Latency: latency,
}
}
}
func main() {
numWorkers := 30
urls := make([]string, 300)
for i := range urls {
urls[i] = fmt.Sprintf("https://example.com/page/%d", i+1)
}
jobs := make(chan Job, len(urls))
results := make(chan JobResult, len(urls))
var wg sync.WaitGroup
// Start workers
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go worker(i, jobs, results, &wg)
}
// Send jobs
for _, u := range urls {
jobs <- Job{URL: u}
}
close(jobs)
// Collect results
go func() {
wg.Wait()
close(results)
}()
var success, failed int
for r := range results {
if r.Err != nil {
failed++
} else {
success++
}
}
fmt.Printf("Success: %d, Failed: %d\n", success, failed)
}
Doğru Koncurrency Seviyeyi Seçin
En uygun koncurrency birkaç faktöre bağlıdır. İşte pratik bir başlangıç noktası rehberi:
| Hedef Türü | Önerilen Concurrency | Sebep Sebep Sebep Sebep |
|---|---|---|
| Hafif APIs (JSON) | 50-200 | Hızlı cevaplar, istek başına düşük hafıza |
| Standart web sayfaları | 20-50 | Moderate yanıt boyutları, bazı oran sınırlayıcı |
| Ağır JS-rendered sayfalar | 5-15 | Tarayıcı bağlamları önemli hafıza kullanır |
| Aggressive anti-bot siteleri | 5-10 | İstekler arasında gerçekçi zamanlamaya ihtiyaç var |
| Büyük dosya indirmeleri | 5-20 | Band Wide-bound, CPU-bound değil |
10 eş zamanlı istekle başlayın ve başarı oranlarını takip ederken yavaş yavaş artış. Başarı oranınız %90'ın altına düşerse, koncurrency veya istekler arasında gecikmeler ekleyin. Bu metrikleri takip etmek için daha fazlası için, bizi gör Proxy Performance kılavuz.
Yeniden inşa edilebilir bir proxy soyutlama için, bakınız Bir Proxy Ortaware Katmanı. Son dönem kazı mimarisi için, okuyun Güvenilir bir avlama mimarisi tasarımı. Python SDK, Node SDKVe Go SDK Üretim hazır proxy entegrasyonu veya kontrol için ProxyHat pricing Başlamak için.






