2021-12-19 / Bartłomiej Kurek
Crawler - asynchronicznie (#1 - setup / video demo)

Tworząc treść tego bloga zdarza mi się zamieścić nieprawidłowy link. Jest to zatem dobra okazja, aby poruszyć temat crawlingu podczas implementacji narzędzia usprawniającego moją własną pracę. Moim zadaniem jest odwiedzenie wszystkich wewnętrznych i zewnętrznych linków na tym blogu i zaraportowanie zasobów niepoprawnych, nieaktualnych, zwracających błędy. Oczywiście najprostszą implementację można stworzyć przy pomocy biblioteki requests w Pythonie. Stworzę jednak kod asynchroniczny, wykorzystujący m.in. biblioteki asyncio oraz aiohttp, oparty o kolejkę zadań, co pozwoli omówić kilka zagadnień dalece wykraczających poza zakres odpowiedzi na pytanie "jak pobrać stronę www w Pythonie?", zagłębić się nieco bardziej w protokół HTTP, przyjrzeć się kolejkowaniu zadań, czy też skorzystać z odrobiny możliwości języka SQL. Kod użyty w tej serii artykułów można znaleźć w repozytorium git.

W serii tej przejdziemy poszczególne etapy implementacji i testowania programu. Testy obejmować będą:

  • opcje programu (argparse)
  • interfejsów klas
  • testy jednostkowe komponentów (Cache, Scheduler, Downloader)
  • implementację prostej aplikacji webowej generującej testowe strony z linkami dla crawlera

Demo

Video: MP4, 3.2M, 1920x1080. Duration: 00:01:00 Link

Setup środowiska

Biblioteki:

$ cat requirements.txt
aiohttp
bs4
coloredogs

Venv:

$ python3 -m venv .venv
$ . .venv/bin/activate
$ pip install -r requirements.txt

Hierarchia katalogów

Program umieścimy w głównym katalogu. Cały kod programu znajdzie się w jednym pliku crawler.py, a kod testów jednostkowych znajdzie się w pakiecie tests.

.
├── crawler.py
├── requirements-dev.txt
├── requirements.txt
└── tests
    ├── __init__.py

Specyfikacja programu

Zanim przystąpimy do implementacji, warto zebrać wymagania i nakreślić szkic architekturalny. Pozwoli nam to podzielić kod na luźne, osobne komponenty, których implementacja i testowanie będą łatwiejsze. Im mniej zależności pomiędzy komponentami (loose coupling) tym łatwiej będzie nam tworzyć testy jednostkowe. Na każdym etapie implementacji i testów będę szczegółowo tłumaczył co i dlaczego robimy. Wpierw rzućmy jednak okiem na ogólny zarys programu i opowiedzmy sobie "z grubsza" co ma realizować, jak będziemy go uruchamiać oraz jak będziemy nim sterować (np. zatrzymywać go).

Założenia

  • możliwość przekazania opcji podczas uruchomienia
  • możliwość konfiguracji poziomu logowania (INFO/DEBUG)
  • cykliczne kolejkowanie stron z bazy danych (Scheduler)
  • równoczesne pobieranie wielu stron
  • pobieranie nagłówków stron i zapis informacji do bazy danych
  • pobieranie całych dokumentów jeśli spełniają kryteria (zawierają linki, nie przekraczają określonego rozmiaru)
  • pobieranie częściami (strumieniowe - unikamy pobierania "nieskończonego" strumienia danych /np. streaming danych kamery live przez http/)
  • możliwość zakończenia programu w dowolnym momencie sygnałem z klawiatury (Ctrl+C /SIGINT/) lub komendą kill (SIGTERM)
  • jeden proces

Opis działania

Program uruchamiamy przekazując mu opcje. Opcje są parsowane (argparse) i przekazywane dalej poszczególnym komponentom. Startowe adresy strony są:
a) przekazywane przy uruchomieniu (opcjonalnie)
b) cyklicznie pobierane z bazy danych (Scheduler).

Pobrane z bazy danych adresy stron trafiać będą do kolejki (asyncio.Queue), skąd pobierać będzie je wiele obiektów typu Downloader. Downloader pobierał będzie najpierw jedynie nagłówki (metoda HEAD protokołu HTTP), a następnie sprawdzi czy:

  • typ zasobu należy pobrać w całości w celu ekstrakcji linków (dokumenty text/html oraz application/xml)
  • rozmiar dokumentu określony w nagłówkach nie przekracza ustalonego limitu (opcja programu: size_limit_kb).

Dokumenty, które nie pasują do zadanych kryteriów będziemy pomijać.
Dane z nagłówków (headers) odpowiedzi dla metody HEAD (typ dokumentu, status odpowiedzi, rozmiar dokumentu) zapisywać będziemy od razu w bazie danych (Cache).

Dokumenty odpowiadające kryteriom (typ i rozmiar) będą nastepnie pobierane w całości (metoda GET protokołu HTTP), parsowane (BeautifulSoup) a z nich wybierzemy wszystkie osadzone w nich linki. Linki te dopiszemy do bazy danych jeśli będą one adresami wspieranych protokołów (http, https, ftp, mailto). W bazie danych umieścimy pole daty ostatniej modyfikacji rekordu (mtime), po czym będzie można rozpoznać:

  • czy dany zasób był już odwiedzany
  • czy należy go zakolejkować i odwiedzić, czy też nie (zupełnie nowy lub odwiedzany dawno temu)

Kryteriami będą tutaj:

  • brak daty modyfikacji (NULL)
  • data modyfikacji starsza niż 1h

Obiektów typu Downloader będzie wiele, dzięki czemu wiele stron pobieranych będzie równocześnie, a program pozwoli określić przy starcie liczbę obiektów typu Downloader.

Zakończenie programu

W programie zaimplementujemy obsługę sygnałów. Jeśli jednak Scheduler przez określony czas (max_idle_sec) nie odczyta z bazy danych adresów gotowych do zakolejkowania, uruchomi samoczynne zakończenie programu.

Logowanie

Program będzie logował komunikaty (logging), a poziom logowania określimy na starcie programu. Domyślnym poziomem będzie INFO, a w przypadku podania stosownej opcji przy uruchomieniu - poziom DEBUG.

Architektura

Program w znacznej części napiszemy obiektowo.
Wszystkie klasy w programie będą dziedziczyć klasę bazową Base, w której zaimplementujemy wspólne części interfejsu oraz logger.

Komponenty:

              [abc.ABC]
                  |
                  |
                [Base]
               /      \
              /        \
       [Cache]          [Worker]
                       /        \
                      /          \
           [Scheduler]            [Downloader]

Ustawienia domyślne:

DEFAULT_USER_AGENT = "Crawler"
DEFAULT_DB = ":memory:"
DEFAULT_SIZE_LIMIT_KB = 128
DEFAULT_CONCURRENCY = 32
DEFAULT_QUEUE_SIZE = DEFAULT_CONCURRENCY
DEFAULT_MAX_IDLE_SEC = 3
DEFAULT_SCHEDULING_INTERVAL_SEC = 1

Kod: szkic

Moduły, których będziemy używać.

import abc
import aiohttp
import argparse
import asyncio
import bs4
import coloredlogs
import logging
import signal
import sqlite3
import time
from urllib.parse import (
    urlparse,
    urljoin,
)

Klasa Base

Klasa bazowa Base: dziedziczy z abc.ABC choć sama nie wymusza konkretnego interfejsu. Tworząc obiekt loggera przekazujemy mu nazwę taką jaką zwróci metoda __str__ danego obiektu. Wykorzystujemy tutaj polimorfizm, a domyślna implementacja metody __str__ zwraca nazwę typu obiektu (nazwę jego klasy).

class Base(abc.ABC):
    def __init__(self, *args, **kwargs):
        self.logger = logging.getLogger(str(self))
        self.logger.debug("Initialized")

    def __str__(self):
        return self.__class__.__name__

Klasa bazowa Worker

Worker to klasa bazowa dla workerów: Scheduler, Downloader. Tutaj wymuszamy konkretny interfejs - klasy pochodne muszą implementować metodę _run(). Worker implementuje operator wywołania __call__, w który wywoływana jest metoda _run(). Ten wzorzec to Template method.

W __call__ dodaję jedynie dwa komunikaty - przed i po. W ten sposób klasa Worker definiuje jak obiekt workera będzie wywołany, natomiast szczegółową implementację pozostawia klasom pochodnym. Jest to wygodny wzorzec, podobny nieco do dekoratora, ale zrealizowany obiektowo. W tej implementacji nic szczególnego się nie dzieje, ale wzorzec ten pozwala uchwycić część wspólną w jednym miejscu. Tutaj dajemy workerom szansę na ewentualne asynchroniczne zainicjalizowanie dodatkowych zasobów (np. własne cache /choćby aioredis, itp./) oraz po zakończeniu _run() gwarantujemy wywołanie asynchronicznego zamknięcia ewentualnych zasobów (shutdown()). Oczywiście, tylko jeśli initialize() się powiedzie - jeśli czegoś nie uda się zainicjalizować - "niech się wysypie tak, żeby wszyscy widzieli".
Moglibyśmy tutaj - w razie potrzeby - zamieścić również ujednoliconą obsługę wyjątków właściwych dla hierarchii workerów. Dostarczamy domyślne implementacje initialize() oraz shutdown() i zostawiamy temat. Identyczny pattern znamy z frameworków testowych (setUp, tearDown, setUpClass, tearDownClass, asyncSetUp, asyncTearDown) takich jak unittest, jUnit, itp. Temat testów jednak jeszcze chwilowo odkładamy.

Bazowa, abstrakcyjna klasa Worker w całości:

class Worker(Base):
    async def initialize(self, *args, **kwargs): ...
    async def shutdown(self, *args, **kwargs): ...

    async def __call__(self, *args, **kwargs):
        self.logger.debug("Running")
        await self.initialize()
        try:
            await self._run(*args, **kwargs)
        finally:
            await self.shutdown()
        self.logger.debug("Done")

    @abc.abstractmethod
    async def _run(self, *args, **kwargs): ...

Klasa Cache

Zarys klasy Cache:

class Cache(Base):
    def __init__(self, dbconn: sqlite3.Connection, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.db = dbconn
        self.logger.debug(dbconn)

    def initdb(self):
        ...

    def store_links(self, links):
        ...

    def store_response(self, url, status_code=None, headers=None):
        ...

W tej klasie umieścimy większość operacji związanych z bazą danych:

  • initdb() - tworzenie schematu
  • store_links() - zapis adresów wyekstrahowanych przez Downloader
  • store_response() - zapis danych odpowiedzi (adres zasobu, status, nagłówki)

Metoda store_links() będzie idempotentna (jeśli link jest już w bazie, to nie funkcja nie zmienia stanu"), a store_response() będzie musiała zawsze aktualizować dane (zasób być może się zmienił, albo nie jest dostępny /inny status/).

Klasa Scheduler (worker)

class Scheduler(Worker):
    def __init__(self,
                 stop_ev: asyncio.Event,
                 cache: Cache,
                 url_queue: asyncio.Queue,
                 /,
                 schedule_interval_sec: float = 1,
                 max_idle_sec: float = 1):
        super().__init__()
        ...

    async def _run(self, *args, **kwargs):
        while not self.stop_ev.is_set():
            ...

Dla przypomnienia - zadania schedulera obejmują:

  • cykliczne (co określony interwał: (schedule_interval_sec)) pobieranie adresów z bazy danych
  • kolejkowanie adresów w url_queue
  • zatrzymanie programu (ustawienie stop_ev) po określonym czasie bezczynności (max_idle_sec)

Scheduler jako worker będzie się "kręcił" aż wydamy sygnał zakończenia programu, lub w bazie danych nie będzie adresów spełniających kryteria kolejkowania w celu odwiedzin.

Klasa Downloader (worker)

class Downloader(Worker):
    SEQ_ID = 0
    SUPPORTED_PROTOCOLS = ["http", "https", "ftp", "mailto"]
    SUPPORTED_CONTENT_TYPES = ["text/html", "application/xml"]

    def __init__(self,
                 stop_ev: asyncio.Event,
                 cache: Cache,
                 url_queue: asyncio.Queue,
                 http_client: aiohttp.ClientSession,
                 /,
                 whitelist=None,
                 size_limit_kb=None):
        super().__init__()
        ...

    def __str__(self):
        ...

    async def _run(self, *args, **kwargs):
        while not self.stop_ev.is_set():
            ...

    async def HEAD(self, job):
        ...

    async def GET(self, job):
        ...

    def extract_urls(self, url, headers, text):
        ...

    async def stream_content(self, response):
        ...

Tutaj będzie się dziać najwięcej. Downloader "będzie się kręcił" dopóki nie zostanie ustawiony stop_ev. W głównej pętli w _run() będzie pobierał zadania z kolejki url_queue, wykonywał żądania HTTP przy pomocy http_client, a zapis wyników będzie zlecał do cache. Dla każdego adresu pobranego z kolejki będzie pobierał nagłówki (HEAD()), następnie sprawdzi kryteria: domena na białej liście (whitelist) oraz rozmiar określony w odpowiedzi HEAD mieszczący się w limicie size_limit_kb*. Jeśli zasób spełni kryteria - nastąpi żądanie metodą GET(), dane zostaną pobrane strumieniowo (stream_content()) i w wyniku otrzymamy jeden z SUPPORTED_CONTENT_TYPES** (HTML lub XML). Jeśli uda się wyekstahować linki (extract_urls()) używając BeautifulSoup, to linki zapiszemy do bazy wywołując cache.store_links().

Klasa wyjątku DownloaderError

Jeśli pobieranie danych w Downloader przekroczy w pewnym momencie narzucony limit - zgłosimy własny wyjątek klasy DownloaderError.

class DownloaderError(Exception):
    ...

Tutaj w kwestii kodu nie ma nic więcej do dodania.

Funkcja main()

async def main(po: argparse.Namespace):
    # create a logger
    # create stop_ev
    # create db connection
    # create Cache and initialize the db
    # create url_queue
    # enqueue initial urls (program options)
    # setup signal handlers (SIGINT, SIGTERM)
    # initialize http client
    # initialize Scheduler
    # initialize Downloader objects
    # create asyncio tasks
    # wait for downloaders' completion
    # cancel scheduler task
    # close http client
    # close db connection
    ...

Funkcja main() przyjmie argumenty programu, utworzy wszystkie połączenia i klienty, zainstaluje obsługę sygnałów, stworzy zadania (asyncio.Task) workerów (jeden Scheduler, wiele Downloaderów), poczeka na zakończenie, zatrzyma scheduler, a na końcu pozamyka wszystkie klienty/połączenia.

Funkcja setup_logging

def setup_logging(color=False) -> None:
    ...

W tej funkcji skonfigurujemy logowanie.

Funkcja create_parser

def create_parser() -> argparse.ArgumentParser:
    ...

Tutaj stworzymy cały parser z opcjami programu.

Główna sekcja (__main__)

if __name__ == "__main__":
    parser = create_parser()
    args = parser.parse_args()
    setup_logging(args.color)
    asyncio.run(main(args))

Kiedy uruchomimy skrypt bezpośrednio, nastąpi:

  • storzenie parsera
  • parsowanie opcji programu
  • konfiguracja logowania
  • uruchomienie funkcji main()

Cały program będzie korzystał z wysokopoziomowego API asyncio.

Lista plików testów jednostkowych

tests/
├── __init__.py
├── test_base.py
├── test_cache.py
├── test_downloader.py
├── testing.py
├── test_parser.py
├── test_scheduler.py
└── test_worker.py

Testy podzielimy według komponentów. Ze względów praktycznych testy napiszemy przy użyciu unittest. W testach wykorzystamy zalety OOP, które umożliwią wygodną organizację i zapewnią przejrzystość kodu. Oczywiście dodatkową przewagą unittest jest fakt, że znajduje się w bibliotece standardowej Python. Wykorzystamy jednak również pytest do uruchomienia tych testów, tak jak w demostracji video.

Podsumowanie

W tej części omówiliśmy założenia i szkic projektu. Powyższa architektura ma na celu nie tylko stworzenie crawlera, ale również umożliwienie odkrycia wyzwań i dobrych praktyk, ciekawostek oraz zetknięcia się z różnymi zagadnieniami w procesach implementacji i testowania.

Zastosowania

Dopowiedzieć można, że stron nie musimy jedynie pobierać/monitorować. Możemy również dane wysyłać. Przykładem mogłyby być automaty licytujące na aukcjach internetowych lub systemy tradingowe (choć to już najpewniej nie protokół HTTP). Niemniej, podobne architektury oparte o kolejkowanie znajdziemy z pewnością w wielu systemach transakcyjnych (choćby masowe wysyłanie email/sms). Oczywiście to już inne technologie (amqp, rabbitmq, redis, kafka, elasticsearch, itd.), rozproszone infrastruktury, większe wdrożenia.

Kolejne kroki

W następnej części zajmiemy się pierwszą częścią implementacyjną, a testy do niej będziemy tworzyć w miarę równolegle do kodu, gdyż założony na początku loose coupling powinien nam zapewnić odpowiedni poziom separacji komponentów.