2021-12-31 / Bartłomiej Kurek
Crawler - asynchronicznie (#5 - Cache)

Cache w naszym crawlerze to krótka klasa realizująca podstawowe działania na kolekcji adresów URL w bazie danych. W tym artykule omówimy szczegółowo implementację metod oraz testy jednostkowe.

Implementacja

Implementację omawiamy krok po kroku, co umożliwi szczegółową analizę kilku zagadnień w sposób przejrzysty i izolowany. Przyjrzenie się szczegółom od razu zapewni nam zrozumienie zakresu testów oraz wymogów środowiska testowego. Lepiej z góry wiedzieć do czego zmierzamy, niż najpierw robić, a później myśleć, refaktoryzować, poprawiać.

Sam interfejs klasy jest krótki i prosty. Oprócz samego __init__() klasa Cache implementuje trzy metody, które wykonują operacje na bazie danych.

import typing as T
import sqlite3


class Cache(Base):
    def __init__(self, dbconn: sqlite3.Connection, *args, **kwargs):
        ...

    def initdb(self):
        ...

    def store_links(self, links: list[str]):
        ...

    def store_response(self,
                       url: str,
                       status_code: T.Optional[int] = None,
                       headers: T.Optional[dict] = None):
        ...

Inicjalizacja obiektu: __init__()

W programie będziemy wykorzystywać pojedyncze połączenie do bazy danych, a jego obiekt będziemy przekazywać do tych komponentów, które go wymagają. Jest to najprostszy sposób na zapewnienie braku komplikacji w kodzie (unikamy tzw. spaghetti code). Zasada jest tutaj prosta: masz jakiś zasób, przekazujesz go dalej tam, gdzie jest potrzebny. Koniec, kropka. To zdecydowanie ułatwi nam też testowanie kodu przez eliminację zależności (inter-module dependencies).

Loose coupling:
Loose coupling mamy tym samym od razu zapewniony. Cache nie musi niczego "wiedzieć" o argumentach programu, inicjalizować połączeń i obsługiwać błędów z tym związanych. To samo dotyczyć będzie pozostałych komponentów. Obiekt bazy danych tworzymy w funkcji main() w programie, tam obsługujemy błędy z tym związane. Jeśli nie ma bazy danych/nie uda się połączyć, to w ogóle nie będziemy tworzyć obiektów, a program zakończymy informując użytkownia od razu o problemie z bazą danych.
Jeśli połączymy się z bazą danych, to pozostałym komponentom przekazujemy gotowy obiekt na zasadzie: "masz, używaj".
Gdybyśmy rozbili ten program na kilka modułów, to sam plik z modułem Cache nie będzie też wtedy musiał nawet importować innych modułów niezwiązanych z jego funkcjonalnością.

Obiekt cache otrzymuje zatem gotowe połączenie do bazy danych i przechowuje referencję do niego przez cały cykl swojego życia (dopóki istnieje /object lifetime/).

class Cache(Base):
    def __init__(self, dbconn: sqlite3.Connection, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.db = dbconn
        self.logger.debug(dbconn)

Inicjalizacja schematu bazy danych: initdb()

Schemat bazy danych musimy zainicjalizować. W tym programie nie korzystamy z żadnego ORM (object relational mapping), wykorzystujemy po prostu język SQL.
Schemat bazy danych zawiera jedną tabelę, a ona zawiera podstawowe pola. Program jest mały i nie przewidujemy w nim obecnie żadnych migracji schematu/struktury bazy danych. Możemy zatem zdać się na standard języka SQL, który umożliwia tworzenie tabel z klauzulą IF NOT EXISTS. Jeśli tabela już istnieje, to nic się nie stanie. Operacje, które w bezpieczny sposób możemy wykonywać wielokrotnie bez zmiany stanu, nazywamy "idempotentnymi" /idempotent/. "Idem" to "tożsamość". Stan jest "tożsamy".

Metoda zatem tworzy kursor do bazy danych, wykonuje polecenie/kwerendę (statement) SQL i zatwierdza zmiany (commit) na obiekcie połączenia. Python posiada specyfikację DBAPI, a moduły do obsługi baz danych w Python mają w zasadzie bardzo podobny i spójny interfejs. Ma być "cursor", "execute", "commit", "rollback", a kursor ma implementować "fetchone", "fetchmany", "execute". Istnieje kilka implementacji nieco odrębnych, ale to rozważymy przy okazji innych programów.

class Cache(Base):
    ...
    def initdb(self):
        schema = (
            """
            CREATE TABLE IF NOT EXISTS urls(
                url_id INTEGER PRIMARY KEY AUTOINCREMENT,
                url TEXT NOT NULL UNIQUE,
                ctime TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
                mtime TIMESTAMP DEFAULT NULL,
                content_type TEXT DEFAULT NULL,
                status_code INTEGER DEFAULT NULL,
                size INTEGER DEFAULT NULL
            );
            """
        )

        self.logger.debug("Creating schema")
        cur = self.db.cursor()
        cur.execute(schema)
        cur.connection.commit()

Kursor powstaje z obiektu połączenia i zawiera jego uchwyt w sobie (cur.connection). Implementacja zgodna z DBAPI stosuje tę samą zasadę, o której wspominaliśmy wcześniej: masz zasób, przekazujesz go dalej tam, gdzie jest potrzebny.

Zapis linków jest trywialny. Metoda otrzymuje listę obiektów typu str (napisów), a następnie wykonuje zapis wszystkich jednocześnie. Kluczową sprawą jest fakt, że pole url (kolumnę w tabeli urls) zdefiniowaliśmy jako unikalne, a zatem w przypadku konfliktu ("rekord z tym URL już istnieje w bazie") musimy taką sytuacją obsłużyć. Chcemy wyrazić: jeśli wystąpi konflikt na polu url, to nic nie rób. Dlatego w zapytaniu SQL widzimy "ON CONFLICT (...) DO NOTHING". Całość przeanalizujemy jeszcze poniżej.

Wykorzystujemy tutaj metodę executemany() dostępną w obiekcie kursora, która umożliwia zapis wielu obiektów jednym wywołaniem (nie tworzymy własnej pętli, a niektóre bazy umożliwiają tutaj optymalizację /prepared statements/). Po zapisie zatwierdzamy całość wywołując commit() na obiekcie połączenia (cur.connection).

class Cache(Base):
    ...
    def store_links(self, links: list[str]):
        stmt = "INSERT INTO urls(url) VALUES(?) ON CONFLICT(url) DO NOTHING"
        cur = self.db.cursor()
        cur.executemany(stmt, [(link,) for link in links])
        cur.connection.commit()

Zapis odpowiedzi: store_response()

Odpowiedzi serwerów www zawierają kod HTTP (liczbowy: 1xx, 2xx, 3xx, 4xx, 5xx), oraz nagłówki tej odpowiedzi. Nagłówki zawierają informacje o serwerze, połączeniu, dokumencie. W tym crawlerze zapisujemy jedynie kod HTTP (status), rozmiar dokumentu (Content-Length) oraz rodzaj dokumentu (Content-Type). Obiekty odpowiedzi (response) zawierają słownik headers, a słownik ten umożliwia nam pobieranie danych bez uwzględniania wielkości znaków w kluczach (case-insensitivity). Możemy zatem poprosić słownik o wartość pod kluczem content-type, Content-Type, czy ConTENt-TYPe. Nie ma to znaczenia, odniesiemy się w tym przypadku do tego samego nagłówka.

Conflict resolution.
W bazie zapewne będzie już istniał rekord dla tego url (scheduler jakoś musiał zakolejkować zadanie), a samą stronę mogliśmy odwiedzać już wielokrotnie wcześniej. Przy ponowionej próbie dodania istniejącego już rekordu (INSERT) wystąpi więc błąd (unique violation). Konflikt ten rozwiążemy przez aktualizację danych (zasób się zmienił, teraz pod tym url jest dokument tego typu, o tym rozmiarze, a serwer zwraca taki kod HTTP). Różne dialekty SQL pozwalają wykonać taką operacją (UPSERT) przy użyciu różnej składni: INSERT OR IGNORE, REPLACE INTO, itp. My używamy standardowej składni: INSERT INTO ... ON CONFLICT (field) DO .... Tutaj wykorzystujemy SQLite, a filozofia SQLite ma slogan (WWPD - "What would Postgres do?"). W przypadku konfliktu silnik bazy danych umieszcza odrzucone pola w krotce EXCLUDED, a zatem w przypadku konfliktu ON CONFLICT dokonujemy aktualizacji pól rekordu (DO UPDATE) i ustawiamy zadane pola (SET field = ...) na konkretne wartości (tutaj te EXCLUDED).
Pole mtime (data ostatniej modyfikacji) ustawiamy na CURRENT_TIMESTAMP. Scheduler będzie używał tego pola do określenia czy zasób jest wystarczająco stary aby kolejkować ponowne odwiedziny. Linki nowe, których nie odwiedzaliśmy to będą te, które w polu mtime mają NULL.

class Cache(Base):
    ...
    def store_response(self,
                       url: str,
                       status_code: T.Optional[int] = None,
                       headers: T.Optional[dict] = None):
        headers = (headers or {})
        stmt = (
            """
            INSERT INTO urls(url, content_type, status_code, size, mtime)
            VALUES(?, ?, ?, ?, CURRENT_TIMESTAMP)
            ON CONFLICT(url) DO UPDATE SET
                content_type=EXCLUDED.content_type,
                status_code=EXCLUDED.status_code,
                size=EXCLUDED.size,
                mtime=CURRENT_TIMESTAMP
            """
        )

        record = (
            url,
            headers.get("content-type"),
            status_code,
            (int(headers.get("content-length", 0)) or None),
        )
        cur = self.db.cursor()
        cur.execute(stmt, record)
        cur.connection.commit()

Testy

Szkielet testu

Przyjrzyjmy się samemu szkieletowi klas testów, pozwoli nam to bez przerażenia szybko zrozumieć zakres prac. Do przetestowania mamy tworzenie schematu, zapis linków, zapis już istniejąch linków oraz zapis/aktualizację danych odpowiedzi. Cache od niczego nie zależy, wymaga jedynie uchwytu do bazy danych, nie mamy zatem tutaj również żadnych konkretnych danych testowych (fixtures).

class TestCacheBasics(unittest.TestCase):
    def test_init(self):
        ...

class TestCacheOperations(unittest.TestCase):
    def setUp(self):
        ...

    def tearDown(self):
        ...

    def test_initdb(self):
        ...

    def test_store_links(self):
        ...

    def test_store_links_duplicates(self):
        """Checks if store_links() ignores already existing links"""
        ...

    def test_store_response(self):
        """Checks storing a response. Repeats it twice to check UPDATE."""
        ...

Test: Cache.init()

Przy inicjalizacji obiektu typu Cache sprawdzamy jedynie czy obiekt znajduje się w naszej hierarchii klas (dziedziczy z naszego Base), czy zapisuje w sobie uchwyt/obiekt bazy danych oraz czy logowanie działa poprawnie. Niczego specyficznego tutaj nie ma. SQLite pozwala tworzyć bazy danych w pamięci (jako ścieżkę przekazujemy :memory:), a zatem nie mamy tutaj nawet żadnego problemu związanego z filesystemem (katalogami/plikami).

W teście tworzymy obiekt połączenia i przekazujemy go do Cache przy tworzeniu obiektu.

class TestCacheBasics(unittest.TestCase):
    def test_init(self):
        dbconn = sqlite3.connect(":memory:")
        with self.assertLogs(level=logging.DEBUG) as logs:
            obj = Cache(dbconn)
        self.assertIsInstance(obj, Base, "Inherits Base")
        self.assertIsInstance(obj.db, type(dbconn), "Stores db connection")

        self.assertEqual(len(logs), 2)
        names = set([r.name for r in logs.records])
        self.assertEqual(names, set(["Cache"]), "Logger name")

        # check logs
        self.assertEqual(logs.records[0].message, "Initialized")
        expected = "sqlite3.Connection object at 0x"
        self.assertIn(expected, logs.records[1].message, "Logs db connection")

Testy operacji: setUp() / teardDown()

unittest opiera się na wzorcu metod szablonowych, wykorzystujemy tutaj ten fakt i w setUp() tworzymy nowy obiekt połączenia do bazy danych, a w tearDown() połączenie zamykamy.
Na obiekcie połączenia możemy ustawić row_factory jako sqlite3.Row, co sprawi, iż rekordy zwracane z bazy danych będziemy mogli traktować jak słowniki. Do pól rekordów (kolumn) będziemy odnosić się przez ich nazwy zamiast indeksów pozycyjnych (record["field_name"] zamiast record[0]).

class TestCacheOperations(unittest.TestCase):
    def setUp(self):
        super().setUp()
        self.dbconn = sqlite3.connect(":memory:")
        self.dbconn.row_factory = sqlite3.Row

    def tearDown(self):
        self.dbconn.close()
        super().tearDown()

Test: initdb()

Tutaj nic szczególnego się nie dzieje. Kiedy metoda testu jest uruchomiona, to musiał się odbyć wcześniej setUp(). W self mamy zatem uchwyt do bazy danych (self.dbconn). Tworzymy obiekt Cache, inicjalizujemy schemat (initdb()), a na koniec sprawdzamy czy zapytanie do bazy danych zostanie zrealizowane. Baza danych zawiera tylko jedną tabelę (urls). Jeśli tabela została utworzona, to SELECT się powiedzie i zwróci pustą listę rekordów, a w przeciwym razie testy zgłoszą błąd.

class TestCacheOperations(unittest.TestCase):
    def test_initdb(self):
        obj = Cache(self.dbconn)
        obj.initdb()

        # check db schema
        cur = self.dbconn.cursor()
        cur.execute("SELECT * FROM urls")
        self.assertEqual(cur.fetchall(), [], "Empty urls table")

Pierwsza część testu jest taka jak powyżej - tworzymy świeży schemat w bazie danych. Następnie generujemy kilka linków (tutaj 8) i wykonujemy metodę Cache.store_links() przekazując całą listę. Cache.store_links() powinno stworzyć rekordy w bazie danych. Następnie sami pobieramy dane z bazy, zliczamy rekordy, sprawdzamy czy są dokładnie takie, jakie zapisaliśmy. Na koniec sprawdzamy czy każdy nowy rekord zawiera domyślne wartości.

  • pole ctime (creation time) ma zawierać datę i czas
  • pozostałe pola mają być jeszcze nieuzupełnione /nigdy nie odwiedzaliśmy tych zasobów/.
class TestCacheOperations(unittest.TestCase):
        ...
    def test_store_links(self):
        obj = Cache(self.dbconn)
        obj.initdb()
        links = ["http://localhost/%d" % i for i in range(8)]
        obj.store_links(links)

        # check db records
        cur = self.dbconn.cursor()
        cur.execute("SELECT * FROM urls")
        records = cur.fetchall()
        self.assertEqual(len(records), len(links), "All links were stored")

        urls = [r["url"] for r in records]
        self.assertFalse(set(links).difference(urls), "Links")

        null_fields = ["mtime", "content_type", "status_code", "size"]
        for r in records:
            self.assertIsNotNone(r["ctime"], "ctime has a timestamp")
            # check expected null_fields are NULL
            for field in null_fields:
                self.assertIsNone(r[field], "Expected NULL: %s" % field)

Test: duplikaty rekordów

Tutaj praktycznie wszystko już umiemy. Generujemy linki (8), zapisujemy je wielokrotnie wywołując Cache.store_links(), sprawdzamy czy ich liczba jest zgodna z liczbą tych wygenerowanych (8).

class TestCacheOperations(unittest.TestCase):
        ...
    def test_store_links_duplicates(self):
        """Checks if store_links() ignores already existing links"""
        obj = Cache(self.dbconn)
        obj.initdb()
        links = ["http://localhost/%d" % i for i in range(8)]

        # Add first time
        obj.store_links(links)

        # Add second time
        obj.store_links(links)

        cur = self.dbconn.cursor()
        cur.execute("SELECT * FROM urls")
        records = cur.fetchall()
        self.assertEqual(len(records), len(links), "Ignores duplicates")

Test: store_response()

W tym teście przygotowujemy sobie słownik z danymi odpowiedzi. Tutaj również widzimy, że brak zależności zdecydowanie ułatwia nam testowanie. Cache zapisuje dane, ale nie musi znać struktury i typu oryginalnej odpowiedzi (w tym crawlerze to aiohttp.Response). Potrzebne są jedynie dane odpowiedzi.

Tworzymy zatem przykładowy słownik i zapisujemy odpowiedź używając Cache.store_response().

        response = dict(
            url="http://localhost",
            status_code=200,
            headers={
                "content-type": "text/html",
                "content-length": 123,
            }
        )

        obj.store_response(**response)

W dalszej części wybieramy dane z bazy i sprawdzamy co zawierają. Zapisujemy tutaj tylko jedną odpowiedź, zatem sprawdzamy zawartość wybranego rekordu.

Duplikaty:
Po sprawdzeniu danych tworzymy słownik z nowymi danymi nagłówków imitując nową odpowiedź (nowa wizyta crawlera). Ponownie zapisujemy odpowiedź i sprawdzamy czy rekord został zaktualizowany (nowe dane nagłówków, zaktualizowany czas modyfikacji /pole mtime/).

class TestCacheOperations(unittest.TestCase):
        ...
    def test_store_response(self):
        """Checks storing a response. Repeats it twice to check UPDATE."""
        obj = Cache(self.dbconn)
        obj.initdb()

        # NOTE: lowercase keys.
        # Regular headers in a response are a 'case-insensitive mapping'.
        response = dict(
            url="http://localhost",
            status_code=200,
            headers={
                "content-type": "text/html",
                "content-length": 123,
            }
        )

        # FIRST TIME
        obj.store_response(**response)

        cur = self.dbconn.cursor()
        cur.execute("SELECT * FROM urls")
        records = cur.fetchall()
        self.assertEqual(len(records), 1, "Stored")

        # check record values
        r = records[0]
        self.assertEqual(
            r["content_type"],
            response["headers"]["content-type"],
            "Stores content type"
        )

        self.assertEqual(
            r["size"],
            response["headers"]["content-length"],
            "Stores size (content-length)"
        )

        self.assertTrue(r["mtime"], "mtime updated")

        last_mtime = r["mtime"]

        # SECOND TIME /update/
        time.sleep(1)  # mtime has to be "later"

        response["headers"].update({
            "content-type": "application/xml",
            "content-length": 234,
        })
        obj.store_response(**response)

        cur = self.dbconn.cursor()
        cur.execute("SELECT * FROM urls")
        records = cur.fetchall()
        self.assertEqual(len(records), 1, "Updated")

        # check record values
        r = records[0]
        self.assertEqual(
            r["content_type"],
            response["headers"]["content-type"],
            "Stores content type"
        )

        self.assertEqual(
            r["size"],
            response["headers"]["content-length"],
            "Stores size (content-length)"
        )

        self.assertNotEqual(r["mtime"], last_mtime, "mtime updated")

Uruchamiamy testy

$ python3 -m unittest discover -v . -k TestCache
test_init (tests.test_cache.TestCacheBasics) ... ok
test_initdb (tests.test_cache.TestCacheOperations) ... ok
test_store_links (tests.test_cache.TestCacheOperations) ... ok
test_store_links_duplicates (tests.test_cache.TestCacheOperations)
Checks if store_links() ignores already existing links ... ok
test_store_response (tests.test_cache.TestCacheOperations)
Checks storing a response. Repeats it twice to check UPDATE. ... ok

----------------------------------------------------------------------
Ran 5 tests in 1.014s

OK

Wszystko działa. Cache mamy za sobą.