+1

Crawl Selenium Grid chạy Real Time Multithreading với Celery kết hợp Docker, Channels trong Django

Giới thiệu

Nếu quá trình crawl data trong trang web của bạn làm delay trang web, khiến trang web của bạn bị chậm và đứng đơ một lúc. Điều này xảy ra vì quá trang web phải đợi selenium crawl dữ liệu xong nó mới trả về cho mình dữ liệu. Bây giờ bạn muốn crawl dữ liệu chạy song song đồng với nó sẽ trả về trang web mỗi lần crawl được một sản phẩm, bài báo, người dùng. Hôm này mình sẽ chỉ các bạn làm điều này. image.png

Selenium Grid

Selenium Grid là một trong số các bộ testing tool của Selenium, nó cho phép chúng ta có thể chạy nhiều các kịch bản test trên nhiều máy, nhiều hệ điều hành và nhiều trình duyệt khác nhau trong cùng một lúc. image.png

Celery

Cần tây là một hàng đợi tác vụ không đồng bộ mã nguồn mở hoặc hàng đợi công việc dựa trên việc truyền thông điệp phân tán. Mặc dù nó hỗ trợ lập lịch trình, nhưng trọng tâm của nó là hoạt động trong thời gian thực. image.png

Channels

Với WebSockets (thông qua Django Channels) quản lý giao tiếp giữa máy khách và máy chủ. image.png

Redis

Làm người môi giới giữa celery và django, channels image.png

Cài đặt

Django, Package

pip install django
django-admin startproject app
django-admin startapp tutorial

Tạo file

app
├─ tutorial
│  ├─ __init__
│  ├─ admin.py
│  ├─ apps.py
│  ├─ models.py
│  ├─ getData.py
│  ├─ tasks.py
│  ├─ tests.py
│  ├─ urls.py
│  └─ views.py 
├─ app
│  ├─ __init__
│  ├─ asgi.py
│  ├─ celery.py
│  ├─ consumers.py
│  ├─ routing.py
│  ├─ setting.py
│  ├─ urls.py
│  └─ wsgi.py 
├─ static
├─ media
├─ templates
│  └─ getdata.html 
├─ docker-compose.yml
├─ Dockerfile
├─ manage.py
└─ requirements.txt

Tạo 1 file requirements.txt trong app

beautifulsoup4==4.9.3
bs4==0.0.1
celery==5.1.2
channels==3.0.4
daphne==3.0.2
Django==3.2.6
django-celery-results==2.2.0
ftfy==6.0.3
numpy==1.21.1
pandas==1.3.1
Pillow==8.3.1
psycopg2==2.9.1
psycopg2-binary==2.9.1
redis==3.5.3
selenium==3.141.0
soupsieve==2.2.1
sqlparse==0.4.1
urllib3==1.26.6
webdriver-manager==3.4.2
channels-redis==3.3.0

Và gõ lệnh sau

pip install -r requirements.txt

Docker

Chỉnh sửa Dockerfile

FROM python:3

ENV PYTHONUNBUFFERED=1

WORKDIR /code
COPY requirements.txt /code/

RUN apt-get update \
    && apt-get -y install libpq-dev gcc
RUN pip install -r requirements.txt
COPY . /code/

EXPOSE 8000
CMD ["python", "manage.py", "runserver", "0.0.0.0:8000"]

Chỉnh sửa docker-compose.yml. Chúng ta sử dụng thêm Redis làm người môi giới(broker message)

version: "3.9"
   
services:
  db:
    image: postgres
    container_name: postgres
    volumes:
      - ./data/db:/var/lib/postgresql/data
    environment:
      - POSTGRES_DB=postgres
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
    ports:
      - "5432:5432"
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 10s
      timeout: 5s
      retries: 5
    restart: always

    
  selenium-hub:
    image: selenium/hub
    container_name: selenium-hub
    ports:
      - "4444:4444"
    environment:
        GRID_MAX_SESSION: 20
        GRID_BROWSER_TIMEOUT: 300
        GRID_TIMEOUT: 300

  chrome:
    image: selenium/node-chrome
    container_name: chrome
    depends_on:
      - selenium-hub
    environment:
      HUB_PORT_4444_TCP_ADDR: selenium-hub
      HUB_PORT_4444_TCP_PORT: 4444
      NODE_MAX_SESSION: 1
      NODE_MAX_INSTANCES: 1

  firefox:
    image: selenium/node-firefox
    container_name: firefox
    depends_on:
      - selenium-hub
    environment:
      HUB_PORT_4444_TCP_ADDR: selenium-hub
      HUB_PORT_4444_TCP_PORT: 4444
      NODE_MAX_SESSION: 10
      NODE_MAX_INSTANCES: 10

  redis:
    image: redis:alpine
    container_name: redis
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 2s
      timeout: 3s
      retries: 10

  celery:
    build: .
    container_name: celery
    command: celery -A app worker -l info
    volumes:
      - .:/code
    depends_on:
      - web
      - redis
      - db

  web:
    build: .
    container_name: web
    volumes:
      - .:/code
    ports:
      - "8000:8000"
    environment: 
      - DEBUG=1
      - DJANGO_ALLOWED_HOST=localhost 127.0.0.1
      - CELERY_BROKER=redis://redis:6379/0
      - CELERY_BACKEND=redis://redis:6379/0
    depends_on:
      db:
        condition: service_healthy
      redis:
        condition: service_healthy  
    command: bash -c "python manage.py makemigrations && python manage.py migrate && python manage.py runserver 0.0.0.0:8000"
    restart: on-failure

Code

Setting

Chỉnh sửa file setting.py


TEMPLATE_DIR = os.path.join(BASE_DIR, 'templates')
STATIC_DIR = os.path.join(BASE_DIR, 'static')
MEDIA_ROOT  = os.path.join(BASE_DIR, 'media')

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'django_celery_results',
    'tutorial',
]

TEMPLATES = [
    {
        'BACKEND': 'django.template.backends.django.DjangoTemplates',
        'DIRS': [TEMPLATE_DIR],
        'APP_DIRS': True,
        'OPTIONS': {
            'context_processors': [
                'django.template.context_processors.debug',
                'django.template.context_processors.request',
                'django.contrib.auth.context_processors.auth',
                'django.contrib.messages.context_processors.messages',
            ],
        },
    },
]

WSGI_APPLICATION = 'app.wsgi.application'
ASGI_APPLICATION = 'app.asgi.application'

CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {
            "hosts": [('redis', 6379)],
        },
    },
}


DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': 'postgres',
        'USER': 'postgres',
        'PASSWORD': 'postgres',
        'HOST': 'db',
        'PORT': 5432,
    }
}

CELERY_BROKER_URL = os.environ.get("CELERY_BROKER", "redis://redis:6379/0")
CELERY_RESULT_BACKEND = 'django-db'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

Channels

Chỉnh sửa file asgi.py trong app

For more information on this file, see
https://docs.djangoproject.com/en/3.2/howto/deployment/asgi/
"""

import os

from django.core.asgi import get_asgi_application


from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter

from app import routing

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'app.settings')

application = ProtocolTypeRouter({
    "http": get_asgi_application(),
    "websocket": AuthMiddlewareStack(
        URLRouter(
            routing.ws_urlpatterns
        )
    )
})

Chỉnh sửa file consumers.py trong app

from time import sleep
from channels.generic.websocket import AsyncWebsocketConsumer
from crawl.getData import data_scrap
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
import threading
import json
from crawl.models import Product
import asyncio
from asgiref.sync import sync_to_async


class ProductsConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        print('connect')
        await self.accept()
        await self.channel_layer.group_add("product", self.channel_name)
        print(f"Kết nối vào {self.channel_name}")

    async def receive(self, text_data): 
        pass
    
    async def disconnect(self, close_code):
        await self.channel_layer.group_discard("product", self.channel_name)
        print(f"Thoát khỏi {self.channel_name}")

    async def send_data_products(self, event):
        await self.send(text_data=json.dumps({
            'product': event['product']
        }))

    async def send_error_products(self, event):
        await self.send(text_data=json.dumps({
            'error': event['error']
        }))
        


class NotiConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        print('connect')
        await self.accept() 
        await self.channel_layer.group_add("noti", self.channel_name)
        print(f"Kết nối vào {self.channel_name}")

    async def receive(self, text_data): 
        pass
    
    async def disconnect(self, close_code):
        await self.channel_layer.group_discard("noti", self.channel_name)
        print(f"Thoát khỏi {self.channel_name}")

    async def send_message(self, event):
        await self.send(text_data=json.dumps({
            'message': event['message']
        }))

    async def send_error_message(self, event):
        await self.send(text_data=json.dumps({
            'error': event['error']
        }))

Chỉnh sửa routing.py trong app

from django.urls import path

from . import consumers

ws_urlpatterns = [
    path('ws/getdata/', consumers.ProductsConsumer.as_asgi()),
]

Celery

Chỉnh sửa file init trong thư mục app

#app/__init__.py
from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']

chỉnh sửa file celery.py

from __future__ import absolute_import, unicode_literals

import os
from celery import Celery
from celery.schedules import crontab

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'app.settings')

app = Celery('app')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

Getdata

chỉnh sửa file getData.py trong tutorial

import time
from selenium import webdriver
from .models import *
# Import packages
from selenium import webdriver  
from bs4 import SoupStrainer
from bs4 import BeautifulSoup
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
from .models import Product
from  .tasks import *
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync


def data_scrap():
    channel_layer = get_channel_layer()
    driver = webdriver.Remote("http://selenium-hub:4444/wd/hub", DesiredCapabilities.FIREFOX)
    driver.get("https://github.com/giakinh0823?tab=repositories")
    time.sleep(2)
    htmlSource = driver.page_source
    only_class = SoupStrainer("div", {"id": "user-repositories-list"})
    list_product = BeautifulSoup(htmlSource, "html.parser", parse_only=only_class)
    for item in list_product.findAll("h3", {"class": "wb-break-all"}):
        name = str(item.find("a", attrs={"itemprop": "name codeRepository"}).text)
        async_to_sync(channel_layer.group_send)(
            'product',
            {
                'type': 'send_data_products',
                'product': name,
            }
        )
        product = Product.objects.create(name =name)
        product.save()
        time.sleep(2)
    driver.quit()

Tasks

Tạo và chỉnh sửa file tasks.py trong thư mục tutorial

from __future__ import absolute_import, unicode_literals

from celery import shared_task
from .models import *

@shared_task(name="get_data_child")
def getDataProductChild():
    from .getData import data_scrap
    data_scrap()

@shared_task(name="get_data")
def getDataProduct():
    getDataProductChild.delay()
    return True

Tutorial

Chỉnh sửa file urls.py trong app

from django.contrib import admin
from django.urls import path,include

urlpatterns = [
    path('admin/', admin.site.urls),
    path('', include("tutorial.urls")),
]

Tạo và thêm file urls.py trong tutorial

from django.urls import path,include
from . import views

app_name = 'tutorial'

urlpatterns = [
    path('',  views.crawl, name="total"),
    path('getdata/',  views.getdata, name="get_data"),
    path('delete/',  views.delete, name="get_data"),
]

Chỉnh sửa file views.py trong tutorial

from django.http.response import HttpResponse
from django.shortcuts import render
from .tasks import getDataProduct

def crawl(request):
    products = Product.objects.all()
    return render(request, 'getdata.html', {"products": products})
    
def getdata(request):
    getDataProduct.delay()
    return JsonResponse({})
   
def delete(request):
    Product.objects.all().delete()
    return JsonResponse({})

Templates

Chỉnh sử file getdata.html

<!DOCTYPE html>
<html lang="en">

<head>
    <meta charset="UTF-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Document</title>
    <script src="https://ajax.googleapis.com/ajax/libs/jquery/3.5.1/jquery.min.js"></script>
</head>

<body>
            <h1>Crawl data</h1>
            <button id="delete" onClick="deleteData()">Delete Data</button>
            <button id="celery" onClick="getdata()">get data celery</button>
            <ul id="list_product">
                {% for product in products %}
                <li>{{product.name}}</li>
                {% endfor %}
            </ul>
    <script>
       
        const deleteData = () => {
            $.ajax({
                type: "GET",
                url: 'delete/',
                data: '',
                dataType: 'json',
                success: function (data) {
                    console.log(data)
                    document.getElementById("list_product").innerHTML = ""
                }
            });
        }

        const getdata = () => {
            $.ajax({
                type: "GET",
                url: 'getdata/',
                data: '',
                dataType: 'json',
                success: function (data) {
                    console.log(data)
                }
            });
        }


        const getdata = `ws://127.0.0.1:8000/ws/getdata/`
        const socketGetdata = new WebSocket(getdata)

        socketGetdata.onopen = function (e) {
            console.log("open", e);
        }

        socketGetdata.onmessage = function (e) {
            console.log("message", e)
            const data = JSON.parse(e.data);
            document.getElementById("list_product").innerHTML += ` <li>${data.product}</li>`
            console.log(data)
        }
        socketGetdata.onerror = function (e) {
            console.log("error", e)
        }
        socketGetdata.onclose = function (e) {
            console.log("close", e)
        }
    </script>
</body>
</html>

Kết quả

Giao diện

image.png

Celery

image.png

Selenium Grid

image.png

Channels send message

image.png

Build

docker-compose build
docker-compose up 

Bài viết đến đây là kết thúc. Chúc các bạn thành công


All Rights Reserved

Viblo
Let's register a Viblo Account to get more interesting posts.