並行性制御でプロキシリクエストをスケーリング

プロキシベースのスクレイピングのためのマスターの通貨パターン: asyncio semaphores, Promise プール, ワーカープールを行きます, レートリミッター, バックプレッシャー. Python、Node.js、Go のプロダクションコード.

並行性制御でプロキシリクエストをスケーリング

なぜ、プロキシスクレイピングのためのコンポレーションコントロールマター

リクエストを順次送信する プロキシ 帯域幅および時間無駄。 プロキシゲートウェイ、ターゲットサーバー、独自のシステムを圧倒して、すべてを送信します。 並列制御は、プロキシプール、ターゲットサイトの許容範囲、利用可能なリソースの制限内で滞在しながら、スループットを最大化するバランスを打つ。

このガイドでは、Python(asyncio)、Node.js(Promise pool)、Go(semaphores)の3つの言語での生産レベルの通貨パターンをカバーしています。 すべての例は、 ProxyHatの回転住宅プロキシ あなたのプロジェクトにコピーする準備ができています。

並列制御の目標は簡単です:ブロックをトリガーしたり、メモリを排出したり、プロセスをクラッシュしたりすることなく、毎秒のリクエストを最大化します。 正しいパターンは、言語、ターゲットサイト、スケールによって異なります。

並列パターン比較

並列パターン比較
パターン用語集最高ののための最高のConcurrency
asyncio. セマフォアフィードバックI/O-bound スクレーピングプロセスごとの50-200
ワーカープール(asyncio)フィードバックバックプレッシャーでタスクキュー10-100 人の労働者
Promise.all + バッチ処理ノード.jsシンプルな平行フェッチプロセスごとの50-500
p-limit / pキューノード.js微粉砕された通貨キューごとの10-200
ゴルテイン + セマフォアおすすめ高スループットスクレイピング100-1000+の
ワーカープール(Go channel)おすすめ構造タスクの配分10-500 人の労働者

Python:非同期Semaphore

Pythonで最もシンプルで最も効果的な対立パターン。 セマフォアは、資源の排気を防止し、同時実行できるコルーチンの数を制限します。

import asyncio
import aiohttp
import uuid
import time
PROXY_GATEWAY = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
MAX_CONCURRENCY = 50
TIMEOUT = aiohttp.ClientTimeout(total=30)
async def fetch(session: aiohttp.ClientSession, url: str, semaphore: asyncio.Semaphore) -> dict:
    async with semaphore:
        session_id = uuid.uuid4().hex[:8]
        proxy = f"http://USERNAME-session-{session_id}:PASSWORD@gate.proxyhat.com:8080"
        start = time.time()
        try:
            async with session.get(url, proxy=proxy, timeout=TIMEOUT) as response:
                body = await response.text()
                return {
                    "url": url,
                    "status": response.status,
                    "length": len(body),
                    "latency": round(time.time() - start, 3),
                }
        except Exception as e:
            return {"url": url, "error": str(e), "latency": round(time.time() - start, 3)}
async def scrape_all(urls: list[str]) -> list[dict]:
    semaphore = asyncio.Semaphore(MAX_CONCURRENCY)
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks)
    return results
# Usage
urls = [f"https://example.com/product/{i}" for i in range(1000)]
results = asyncio.run(scrape_all(urls))
success = sum(1 for r in results if "error" not in r)
print(f"Completed: {success}/{len(results)} successful")
print(f"Avg latency: {sum(r['latency'] for r in results) / len(results):.3f}s")

Python: Backpressureのワーカープール

より多くの制御を必要とするとき — レートの制限、圧力、または優先スケジューリング - asyncio でワーカープールを使用してください。 キュー。

import asyncio
import aiohttp
import uuid
class WorkerPool:
    """Fixed-size worker pool with backpressure via bounded queue."""
    def __init__(self, num_workers: int = 20, queue_size: int = 100):
        self.num_workers = num_workers
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=queue_size)
        self.results: list = []
        self.stats = {"success": 0, "failed": 0, "total_latency": 0.0}
        self._stop = False
    async def worker(self, session: aiohttp.ClientSession, worker_id: int):
        while not self._stop:
            try:
                url = await asyncio.wait_for(self.queue.get(), timeout=5.0)
            except asyncio.TimeoutError:
                if self._stop:
                    break
                continue
            session_id = uuid.uuid4().hex[:8]
            proxy = f"http://USERNAME-session-{session_id}:PASSWORD@gate.proxyhat.com:8080"
            import time
            start = time.time()
            try:
                async with session.get(
                    url, proxy=proxy,
                    timeout=aiohttp.ClientTimeout(total=30)
                ) as response:
                    body = await response.text()
                    latency = time.time() - start
                    self.stats["success"] += 1
                    self.stats["total_latency"] += latency
                    self.results.append({
                        "url": url, "status": response.status,
                        "length": len(body), "worker": worker_id,
                    })
            except Exception as e:
                self.stats["failed"] += 1
                self.results.append({"url": url, "error": str(e), "worker": worker_id})
            finally:
                self.queue.task_done()
    async def run(self, urls: list[str]) -> list[dict]:
        async with aiohttp.ClientSession() as session:
            # Start workers
            workers = [
                asyncio.create_task(self.worker(session, i))
                for i in range(self.num_workers)
            ]
            # Feed URLs into the queue (backpressure: blocks when queue is full)
            for url in urls:
                await self.queue.put(url)
            # Wait for all tasks to complete
            await self.queue.join()
            self._stop = True
            # Cancel workers
            for w in workers:
                w.cancel()
        return self.results
# Usage
pool = WorkerPool(num_workers=30, queue_size=50)
urls = [f"https://example.com/item/{i}" for i in range(500)]
results = asyncio.run(pool.run(urls))
print(f"Success: {pool.stats['success']}, Failed: {pool.stats['failed']}")
avg_lat = pool.stats["total_latency"] / max(pool.stats["success"], 1)
print(f"Avg latency: {avg_lat:.3f}s")

Python: レートリミッタ

一部のターゲットは、厳格なレート制限を実施します。 このトークン・バック・レート・リミッターは、上の通貨パターンと統合します。

import asyncio
import time
class RateLimiter:
    """Token-bucket rate limiter for async operations."""
    def __init__(self, rate: float, burst: int = 1):
        """
        Args:
            rate: Requests per second
            burst: Maximum burst size
        """
        self.rate = rate
        self.burst = burst
        self.tokens = burst
        self.last_refill = time.monotonic()
        self._lock = asyncio.Lock()
    async def acquire(self):
        async with self._lock:
            now = time.monotonic()
            elapsed = now - self.last_refill
            self.tokens = min(self.burst, self.tokens + elapsed * self.rate)
            self.last_refill = now
            if self.tokens < 1:
                wait_time = (1 - self.tokens) / self.rate
                await asyncio.sleep(wait_time)
                self.tokens = 0
            else:
                self.tokens -= 1
# Combined with semaphore
async def fetch_rate_limited(session, url, semaphore, limiter):
    async with semaphore:
        await limiter.acquire()
        session_id = uuid.uuid4().hex[:8]
        proxy = f"http://USERNAME-session-{session_id}:PASSWORD@gate.proxyhat.com:8080"
        async with session.get(url, proxy=proxy, timeout=TIMEOUT) as resp:
            return await resp.text()
# 10 requests/second, max 30 concurrent
async def main():
    semaphore = asyncio.Semaphore(30)
    limiter = RateLimiter(rate=10.0, burst=5)
    urls = [f"https://example.com/page/{i}" for i in range(200)]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_rate_limited(session, u, semaphore, limiter) for u in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
    success = sum(1 for r in results if not isinstance(r, Exception))
    print(f"Done: {success}/{len(results)}")
asyncio.run(main())

Node.js: 約束のバッチ

Node.js の最も単純なパターンは、固定サイズのバッチで URL を処理します。

const { HttpsProxyAgent } = require('https-proxy-agent');
const crypto = require('crypto');
const BATCH_SIZE = 20;
function createAgent() {
  const sessionId = crypto.randomBytes(4).toString('hex');
  return new HttpsProxyAgent(
    `http://USERNAME-session-${sessionId}:PASSWORD@gate.proxyhat.com:8080`
  );
}
async function fetchUrl(url) {
  const agent = createAgent();
  const start = Date.now();
  try {
    const response = await fetch(url, {
      agent,
      signal: AbortSignal.timeout(30000),
    });
    const text = await response.text();
    return {
      url,
      status: response.status,
      length: text.length,
      latency: Date.now() - start,
    };
  } catch (err) {
    return { url, error: err.message, latency: Date.now() - start };
  }
}
async function scrapeInBatches(urls) {
  const results = [];
  for (let i = 0; i < urls.length; i += BATCH_SIZE) {
    const batch = urls.slice(i, i + BATCH_SIZE);
    const batchResults = await Promise.all(batch.map(fetchUrl));
    results.push(...batchResults);
    const success = batchResults.filter(r => !r.error).length;
    console.log(`Batch ${Math.floor(i / BATCH_SIZE) + 1}: ${success}/${batch.length} OK`);
  }
  return results;
}
// Usage
const urls = Array.from({ length: 200 }, (_, i) =>
  `https://example.com/product/${i + 1}`
);
scrapeInBatches(urls).then(results => {
  const success = results.filter(r => !r.error).length;
  console.log(`Total: ${success}/${results.length} successful`);
});

Node.js:P-limit for Fine-Grained Control(ファイングリルコントロール)

手動のバッチ無しの精密なconcurrencyの限界のために、使用して下さい p-limit ライブラリ。

// npm install p-limit
const pLimit = require('p-limit');
const { HttpsProxyAgent } = require('https-proxy-agent');
const crypto = require('crypto');
const limit = pLimit(30); // Max 30 concurrent requests
function createAgent() {
  const sessionId = crypto.randomBytes(4).toString('hex');
  return new HttpsProxyAgent(
    `http://USERNAME-session-${sessionId}:PASSWORD@gate.proxyhat.com:8080`
  );
}
async function fetchWithLimit(url) {
  return limit(async () => {
    const agent = createAgent();
    const response = await fetch(url, {
      agent,
      signal: AbortSignal.timeout(30000),
    });
    return {
      url,
      status: response.status,
      body: await response.text(),
    };
  });
}
// All 500 URLs start immediately, but only 30 run concurrently
const urls = Array.from({ length: 500 }, (_, i) =>
  `https://example.com/item/${i + 1}`
);
Promise.all(urls.map(fetchWithLimit)).then(results => {
  const success = results.filter(r => r.status === 200).length;
  console.log(`Success: ${success}/${results.length}`);
});

Node.js: バックプレッシャーでワーカーキュー

// npm install p-queue
const PQueue = require('p-queue').default;
const { HttpsProxyAgent } = require('https-proxy-agent');
const crypto = require('crypto');
const queue = new PQueue({
  concurrency: 25,
  intervalCap: 10,   // Max 10 requests...
  interval: 1000,    // ...per second (rate limiting)
});
queue.on('active', () => {
  console.log(`Active: ${queue.pending} pending, ${queue.size} queued`);
});
function createAgent() {
  const sessionId = crypto.randomBytes(4).toString('hex');
  return new HttpsProxyAgent(
    `http://USERNAME-session-${sessionId}:PASSWORD@gate.proxyhat.com:8080`
  );
}
async function processUrl(url) {
  const agent = createAgent();
  const response = await fetch(url, { agent, signal: AbortSignal.timeout(30000) });
  return { url, status: response.status, body: await response.text() };
}
// Add URLs to the queue
const urls = Array.from({ length: 1000 }, (_, i) =>
  `https://example.com/page/${i + 1}`
);
const results = await Promise.all(
  urls.map(url => queue.add(() => processUrl(url)))
);
console.log(`Completed: ${results.filter(r => r.status === 200).length}/${results.length}`);

行く: セマフォアとゴルテイン

ゴルテインは軽量ですが、圧倒的なプロキシ接続を避けるために、まだ通貨を制限する必要があります。 チャネルベースの浮腫は慣習的なアプローチです。

package main
import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"sync"
	"time"
)
const maxConcurrency = 50
type Result struct {
	URL     string
	Status  int
	Length  int
	Latency time.Duration
	Error   error
}
func newProxyClient() *http.Client {
	b := make([]byte, 4)
	rand.Read(b)
	sessionID := hex.EncodeToString(b)
	proxyStr := fmt.Sprintf("http://USERNAME-session-%s:PASSWORD@gate.proxyhat.com:8080", sessionID)
	proxyURL, _ := url.Parse(proxyStr)
	return &http.Client{
		Transport: &http.Transport{Proxy: http.ProxyURL(proxyURL)},
		Timeout:   30 * time.Second,
	}
}
func fetchURL(target string, sem chan struct{}, wg *sync.WaitGroup, results chan<- Result) {
	defer wg.Done()
	sem <- struct{}{}        // Acquire semaphore
	defer func() { <-sem }() // Release semaphore
	client := newProxyClient()
	start := time.Now()
	resp, err := client.Get(target)
	if err != nil {
		results <- Result{URL: target, Error: err, Latency: time.Since(start)}
		return
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	results <- Result{
		URL:     target,
		Status:  resp.StatusCode,
		Length:  len(body),
		Latency: time.Since(start),
	}
}
func main() {
	urls := make([]string, 500)
	for i := range urls {
		urls[i] = fmt.Sprintf("https://example.com/item/%d", i+1)
	}
	sem := make(chan struct{}, maxConcurrency)
	results := make(chan Result, len(urls))
	var wg sync.WaitGroup
	start := time.Now()
	for _, u := range urls {
		wg.Add(1)
		go fetchURL(u, sem, &wg, results)
	}
	// Close results channel when all goroutines finish
	go func() {
		wg.Wait()
		close(results)
	}()
	var success, failed int
	var totalLatency time.Duration
	for r := range results {
		if r.Error != nil {
			failed++
		} else {
			success++
			totalLatency += r.Latency
		}
	}
	elapsed := time.Since(start)
	fmt.Printf("Completed in %s\n", elapsed)
	fmt.Printf("Success: %d, Failed: %d\n", success, failed)
	fmt.Printf("Avg latency: %s\n", totalLatency/time.Duration(max(success, 1)))
	fmt.Printf("Throughput: %.1f req/s\n", float64(success+failed)/elapsed.Seconds())
}

Go: チャネルが付いている労働者のプール

より構造化された処理のために、チャネルから消費する労働者の固定プールを使用して下さい。

package main
import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"sync"
	"time"
)
type Job struct {
	URL string
}
type JobResult struct {
	URL     string
	Status  int
	Body    string
	Latency time.Duration
	Err     error
}
func worker(id int, jobs <-chan Job, results chan<- JobResult, wg *sync.WaitGroup) {
	defer wg.Done()
	for job := range jobs {
		b := make([]byte, 4)
		rand.Read(b)
		sessionID := hex.EncodeToString(b)
		proxyStr := fmt.Sprintf("http://USERNAME-session-%s:PASSWORD@gate.proxyhat.com:8080", sessionID)
		proxyURL, _ := url.Parse(proxyStr)
		client := &http.Client{
			Transport: &http.Transport{Proxy: http.ProxyURL(proxyURL)},
			Timeout:   30 * time.Second,
		}
		start := time.Now()
		resp, err := client.Get(job.URL)
		latency := time.Since(start)
		if err != nil {
			results <- JobResult{URL: job.URL, Err: err, Latency: latency}
			continue
		}
		body, _ := io.ReadAll(resp.Body)
		resp.Body.Close()
		results <- JobResult{
			URL:     job.URL,
			Status:  resp.StatusCode,
			Body:    string(body),
			Latency: latency,
		}
	}
}
func main() {
	numWorkers := 30
	urls := make([]string, 300)
	for i := range urls {
		urls[i] = fmt.Sprintf("https://example.com/page/%d", i+1)
	}
	jobs := make(chan Job, len(urls))
	results := make(chan JobResult, len(urls))
	var wg sync.WaitGroup
	// Start workers
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go worker(i, jobs, results, &wg)
	}
	// Send jobs
	for _, u := range urls {
		jobs <- Job{URL: u}
	}
	close(jobs)
	// Collect results
	go func() {
		wg.Wait()
		close(results)
	}()
	var success, failed int
	for r := range results {
		if r.Err != nil {
			failed++
		} else {
			success++
		}
	}
	fmt.Printf("Success: %d, Failed: %d\n", success, failed)
}

正しい通貨レベルを選択する

最適な通貨は、いくつかの要因に依存します。 実用的な出発点ガイドは次のとおりです。

正しい通貨レベルを選択する
ターゲット タイプ推奨通貨レイソン
軽量API(JSON)50~200円速い応答、低い記憶ごとの要求
標準的なウェブページ20-50年応答サイズ、制限速度
ヘビーJSレンダリングページ2018年5月15日ブラウザのコンテキストは重要なメモリを使用します
攻撃的なアンチボットサイト5月10日リクエスト間の現実的なタイミングが必要
大型ファイルのダウンロード5-20の帯域幅-bound、CPU-boundではなく
成功率を監視しながら10の同時リクエストから徐々に増加。 90%未満の成功率が低下した場合は、対立性を低下させ、要求間の遅延を追加します。 これらのメトリクスを追跡するには、 プロキシのパフォーマンスの監視 ガイド。

組み込みの通貨で再使用可能なプロキシ抽象化については、参照してください プロキシミドルウェアレイヤーの構築. エンドツーエンドのスクレイピングアーキテクチャのために、読む 信頼性の高いスクレイピングアーキテクチャの設計. 探検する Python SDK, ノードSDKSDKについて 生産準備のプロキシ統合、またはチェックのため ProxyHat 価格 はじめに。

よくある質問

始める準備はできましたか?

AIフィルタリングで148か国以上、5,000万以上のレジデンシャルIPにアクセス。

料金を見るレジデンシャルプロキシ
← ブログに戻る