+5

Crawl hàng triệu page mỗi ngày với python

Crawling có lẽ không còn xa lạ với tất cả mọi người, rất dễ để bắt đầu lấy được data. Tuy nhiên khi gặp phải những thách thức như: crawl 1 vài triệu page mỗi ngày, tài nguyên thì ít mà muốn vít thật nhanh, đang crawl ngon lành thì recaptcha xuất hiện,... thì không phải ai cũng biết cách giải quyết. Mình cũng mạnh dạn viết một bài để chia sẻ một số cách tối ưu đúc rút từ kinh nghiệm của mình trong việc áp dụng python để crawl tối ưu hơn, cũng hi vọng bài viết thành nơi mọi người thảo luận để mình cũng học hỏi thêm.

Cố gắng tìm và sử dụng các API/request thay vì selenium và puppeteer

Hãy cố gắng tìm các endpoint (http request) mà các website sử dụng thay vì điều khiển trình duyệt bằng selenium (browser) để collect. Tuy để build request (tránh bị chặn) có lâu nhưng sự hiệu quả và tối ưu thì giá trị hơn nhiều lần. Bạn có thể tiết kiệm tới 100 lần chi phí cho server/worker collect data nếu sử dụng request thay vì collect bằng selenium (browser). Để tiện trong việc build lại request/api bằng python thì recommend mọi người capture request bằng Fiddler hoặc Charles, sau đó copy curl các request vào Postman để lấy mã python. Mục đích là lấy đầy đủ các headers của request (tránh bị chặn). Một số kinh nghiệm của mình để tránh bị chặn mọi người có thể tìm hiểu: Thay đổi headers useragent, thay đổi proxy, thay đổi user agent là Goolge Bot (một số site không set ratelimit cho google bot), bắt request từ mobile app (nhiều site chỉ rate limit web app nhưng không rate limit website),... Trong đó thực tế mình gặp rất nhiều trường hợp set rate limit cho mobile app ở ngưỡng cao hơn, thậm chí là không chặn nên mọi người có thể tìm cách bypass ssl pinning để capture traffic trên mobile app thử nha.

Sử dụng các thư viện bất đồng bộ trên python

Mình thấy đa phần mọi người dùng python thì ít khi dùng tới các thư viện bất đồng bộ. Mặc dù sử dụng các thư viện này (aiohttp, httpx, aiosonic,...) nhanh và giúp tiết kiệm CPU rất nhiều so với thư viện requests.

» python tests/performance.py
doing tests...
{
 "aiosonic": "1000 requests in 182.03 ms",
 "aiosonic cyclic": "1000 requests in 370.55 ms",
 "aiohttp": "1000 requests in 367.66 ms",
 "requests": "1000 requests in 4613.77 ms",
 "httpx": "1000 requests in 812.41 ms"
}
aiosonic is 101.97% faster than aiohttp
aiosonic is 2434.55% faster than requests
aiosonic is 103.56% faster than aiosonic cyclic
aiosonic is 346.29% faster than httpx

Performance của aiosonic và các thư viện http request trên python.

Hai cách implement các crawler mà mình hay sử dụng Cách 1: Áp dụng cho số lượng task cố định

import asyncio

import aiohttp


async def run_limit_worker(tasks, limit: int = 100):
    semaphore = asyncio.Semaphore(limit)

    async def sem_task(task):
        async with semaphore:
            return await task

    await asyncio.gather(*(sem_task(task) for task in tasks))


async def crawl_product(product):
    async with aiohttp.ClientSession() as client:
        async with client.get(f"https://example.com/api/product/{product['id']}") as resp:
            data = await resp.json()
            # todo: process your output here


async def run_all_workers():
    tasks = []

    products = [{"id": 1}, {"id": 2}, {"id": 3}]  # ect
    for product in products:
        tasks.append(crawl_product(product))

    await run_limit_worker(tasks, limit=100)


if __name__ == '__main__':
    asyncio.run(run_all_workers())

Cách 2: Cố định số worker, lấy task từ hệ thống.

import asyncio

import aiohttp


class CrawlWorker:
    def __init__(self, worker_id):
        self.worker_id = worker_id
        self.client = aiohttp.ClientSession()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.client:
            await self.client.close()

    async def get_input(self):
        async with self.client.get(f"https://yourserver.com/get_input?worker_id={self.worker_id}") as resp:
            return resp.json()

    async def crawl(self, product):
        async with self.client.get(f"https://example.com/api/product/{product['id']}") as resp:
            data = await resp.json()
            return data

    async def process_output(self, output):
        async with self.client.post(f"https://yourserver.com/process_output?worker_id={self.worker_id}", json={
            "product": output
        }) as resp:
            return resp.json()

    async def run(self):
        while True:
            try:
                product = await self.get_input()
                output = await self.crawl(product)
                await self.process_output(output)
            except Exception as e:
                print(f"Worker {self.worker_id} error: {e}")


async def run_worker(worker_id):
    async with CrawlWorker(worker_id) as worker:
        await worker.run()


async def run_all_workers():
    tasks = []
    max_workers = 100
    for i in range(max_workers):
        tasks.append(asyncio.create_task(run_worker(i)))
    await asyncio.gather(*tasks)


if __name__ == '__main__':
    asyncio.run(run_all_workers())

Giải captcha tự động như recaptcha v2, recaptcha v3, funcaptcha,...

Có một số site bắt buộc đăng nhập để xem bài viết, quá trình đăng nhập cần phải giải recaptcha v2, recaptcha v3 hoặc funcaptcha. Bạn cần tạo sẵn tài khoản và đăng nhập để lấy token thường xuyên nên cần tự động hoá quy trình này. Cách tốt nhất cũng là tìm cách build http request để tự động hoá quá trình này, tuy nhiên bạn cũng cần giải captcha tự động để lấy token khi gửi các request đi. Bản chất khi giải xong captcha thì các dịch vụ như recaptcha, funcaptcha sẽ tạo cho bạn 1 token tương ứng với website (thường là sitekey và siteurl). Khi đăng nhập hoặc đăng ký hay thực hiện một hành động nào đó ngoài gửi tài khoản/mật khẩu,... lên hệ thống bạn cần phải gửi cả token lên hệ thống. Mỗi token chỉ được dùng một lần. Vì vậy bạn cần tạo token tự động để tự động hoá quá trình đăng ký/đăng nhập,... Dưới đây mình hướng dẫn các bạn sử dụng Rockcaptcha để sinh captcha token tự động cho recaptcha v2, recaptcha v3 và funcaptcha.

import asyncio
from typing import Optional

import httpx

BASE_URL = "https://api.rockcaptcha.com"


async def get_fun_token(api_key, sitekey) -> Optional[dict]:
    async with httpx.AsyncClient() as client:
        create_resp = await client.get(
            f"{BASE_URL}/FunCaptchaTokenTask", params={
                "apikey": api_key,
                "sitekey": sitekey,
                "siteurl": "https://client-api.arkoselabs.com"
            }
        )
        if create_resp.json()["Code"] == 0:
            task_id = create_resp.json()["TaskId"]
            while True:
                try:
                    solve_resp = await client.get(f"{BASE_URL}/getresult", params={
                        "apikey": api_key,
                        "taskId": task_id
                    })
                    if solve_resp.status_code == 200:
                        if solve_resp.json()['Status'] == "ERROR":
                            raise RuntimeError(solve_resp.text)
                        if solve_resp.json()['Status'] == "SUCCESS":
                            return {
                                "task_id": task_id,
                                "token": solve_resp.json()["Data"]["Token"]
                            }
                except Exception:
                    raise RuntimeError("Get captcha error")
                await asyncio.sleep(0.2)
    raise RuntimeError("Get captcha error")


async def get_recaptcha_token(api_key, sitekey, siteurl) -> Optional[dict]:
    async with httpx.AsyncClient() as client:
        create_resp = await client.get(
            f"{BASE_URL}/FunCaptchaTokenTask", params={
                "apikey": api_key,
                "sitekey": sitekey,
                "siteurl": siteurl
            }
        )
        if create_resp.json()["Code"] == 0:
            task_id = create_resp.json()["TaskId"]
            while True:
                try:
                    solve_resp = await client.get(f"{BASE_URL}/getresult", params={
                        "apikey": api_key,
                        "taskId": task_id
                    })
                    if solve_resp.status_code == 200:
                        if solve_resp.json()['Status'] == "ERROR":
                            raise RuntimeError(solve_resp.text)
                        if solve_resp.json()['Status'] == "SUCCESS":
                            return solve_resp.json()["Data"]["Token"]
                except Exception:
                    raise RuntimeError("Get captcha error")
                await asyncio.sleep(0.2)
    raise RuntimeError("Get captcha error")


async def login(username, password):
    token = await get_fun_token("<YOUR_API_KEY>", "<SITEKEY>")
    async with httpx.AsyncClient() as client:
        await client.post("https://example.com/login", json={
            "username": username,
            "password": password,
            "captcha_token": token
        })


asyncio.run(login("<username>", "<password>"))

All rights reserved

Viblo
Hãy đăng ký một tài khoản Viblo để nhận được nhiều bài viết thú vị hơn.
Đăng kí