W tym artykule omawiamy kod i testy jednostkowe klasy Scheduler zaimplementowanej w naszym programie edukacyjnym: asyncio-crawler.
Scheduler
Scheduler w naszym crawlerze to minimalny mechanizm "harmonogramu" kolejkowania zadań.
Dla przypomnienia - jego zadania obejmują:
- cykliczne (co określony interwał: (schedule_interval_sec)) pobieranie adresów URL z bazy danych
- kolejkowanie tych adresów we współdzielonej przez workery kolejce url_queue
- zatrzymanie programu (ustawienie stop_ev) po określonym czasie bezczynności (max_idle_sec)
Scheduler działa w pętli, warunkiem wyjścia z tej pętli są następujące sytuacje:
- proces otrzymuje sygnał zakończenia programu
- w bazie danych nie ma adresów spełniających kryteria kolejkowania
Klasa Scheduler implementuje jedynie dwie metody: __init__()
oraz _run()
. Domyślną implementację wywołania (operator wywołania) implementuje już klasa bazowa Worker
, z której Scheduler dziedziczy.
Inicjalizacja
Scheduler podczas inicjalizacji otrzymuje:
- obiekt
asyncio.Event
, którego stan informuje o otrzymaniu przez proces sygnału zakończenia cache
- obiekt naszej klasyCache
implementujący pewne operacje wykonywane na bazie danychurl_queue
- kolejkę, w której Scheduler umieszcza kolejne zadaniaschedule_interval_sec
- interwał czasowy kontrolujący co ile sekund Scheduler sięga do bazy danych po kolejne adresy URLmax_idle_sec
- interwał czasowy, po którym Scheduler sam ustawia zdarzenie (stop_ev
) sygnalizując zakończenie programu (brak zadań).
class Scheduler(Worker):
def __init__(self,
stop_ev: asyncio.Event,
cache: Cache,
url_queue: asyncio.Queue,
/,
schedule_interval_sec: float = 1,
max_idle_sec: float = 1):
self.stop_ev = stop_ev
self.cache = cache
self.url_queue = url_queue
self.schedule_interval_sec = schedule_interval_sec
self.max_idle_sec = max_idle_sec
W samej metodzie nie mamy w zasadzie żadnych zachowań wymagających przetestowania. Można oczywiście sprawdzać argumenty i ich wartości, ale ograniczymy się tutaj do minimum.
Główna pętla: _run()
Tutaj będziemy mieć dwa główne przypadki. Test pierwszego z nich polegał będzie na sprawdzeniu czy Scheduler poprawne kolejkuje zadania. W drugim przypadku będziemy musieli sprawdzić czy Scheduler ustawia flagę zakończenia programu po ustalonym okresie bezczynności.
Kluczowe elementy omawianej pętli to:
1 2 3 4 5 6 7 8 9 10 11 |
|
- W gałęzi A wstawiamy pobrane adresy URL do kolejki zadań i zapamiętujemy aktualny czas jako czas ostatniej operacji zakończonej powodzeniem.
- W gałęzi B sygnalizujemy zakończenie programu. Ta gałąź jest wykonywana kiedy w bazie nie ma nowych adresów URL i od ostatniego pobrania danych z bazy minął czas określony argumentem
max_idle_sec
. W tym przypadku ustawiamy zdarzeniestop_ev
, którego stan warunkuje również wykonywanie się pozostałych workerów (downloaderów). - W gałęzi C wstawimy jedynie logowanie informacji, że nie ma żadnych nowych zadań do wykonania.
Na końcu pętli mamy asyncio.sleep()
. Bez tego scheduler najprawdopodobniej zmonopolizowałby event_loop
asyncio, a pozostałe workery nie miałyby szansy się wykonywać. asyncio.sleep()
jest blokujące, zatem do zatrzymania schedulera w sposób siłowy użyjemy asyncio.Task.cancel()
.
Cały kod funkcji.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
|
Analiza
Do pobierania danych z bazy używamy kwerendy SQL w formie CTE (common table expression). Przeanalizujmy samą kwerendę.
1 2 3 4 5 6 7 8 9 10 11 12 13 |
|
Zaczynamy od wybrania rekordów, które mają pole mtime
ustawione na NULL (zasoby nieodwiedzone) lub odwiedzone były nie później niż godzinę temu (linia 6). Rekordy są w losowej kolejności (ORDER BY RANDOM()
), a ich liczba jest limitowana przekazanym argumentem.
W tej samej kwerendzie od razu aktualizujemy rekordy, które CTE wybierze. Aktualizujemy pole mtime
wartością obecnego czasu. Cały wynik zwracamy od razu (RETURNING *
).
Kolejność.
Rekordy zwracamy tutaj w losowej kolejności. W przypadku sprytniejszego crawlera chcielibyśmy prawdopodobnie wybierać rekordy pogrupowane i limitowane "per domena", aby nie obciążać serwerów, które odwiedzamy. To wymagałoby przechowywania osobno samych domen i referencji do nich w tabeli urls
. Obecna implementacja stanowi jednak poglądowe miniumum.
Transakcyjność zadań.
Rekordy są aktualizowane od razu (pole mtime
), gdyż w tej implementacji nie ma wymagania transakcyjności zadań/gwarancji wykonania. Kiedy dane pobierzemy z bazy i zakolejkujemy zadania w programie, to - zanim wszystkie zadania zostaną wykonane - program może zostać zatrzymany, a zasoby nie zostaną odwiedzone, mimo iż w bazie znajdzie się data wykonania (mtime
). W systemie transakcyjnym mielibyśmy dodatkowe pola informujące o dacie zakolejkowania oraz datę wykonania uzupełnianą kiedy zadanie faktycznie zostanie wykonane. Ze względu na prostotę implementacji i zakres użycia nie jest to zagadnieniem rozważanym na obecnym etapie. Program nie przewiduje również wznawiania prób wykonania zadań (retries).
Jeśli zakolejkujemy zadania i zatrzymamy program, to trudno. Sam program mógłby również przetwarzać kolejkę zadań do końca już po otrzymaniu sygnału zatrzymania. Zajmiemy się tym w przyszłości.
W obecnej implementacji zadania przerwane będą miały okazję wykonać się ponownie w późniejszym czasie ("+3600 seconds").
Testy
Tutaj docieramy do momentu, w którym do testów wprowadzimy własną klasę bazową. Workery w crawlerze wymagają obiektu Cache
oraz połączenia do bazy danych. Skoro będziemy wykonywać operacje na bazie danych, to będzie potrzebne również stworzenie jej schematu.
Do testów wprowadzamy więc nową klasę dziedziczącą z unittest.IsolatedAsyncioTestCase
. W metodzie setUp()
od razu przygotujemy stop_ev
, połączenie do bazy, stworzymy schemat w bazie danych oraz stworzymy obiekt Cache
używany przez pozostałe komponenty.
Klasy WorkerTestCase
użyjemy też w przyszłości przy testach Downloader
.
WorkerTestCase
class WorkerTestCase(unittest.IsolatedAsyncioTestCase):
def setUp(self):
super().setUp()
self.stop_ev = asyncio.Event()
self.dbconn = sqlite3.connect(":memory:")
self.dbconn.row_factory = sqlite3.Row
self.cache = Cache(self.dbconn)
self.cache.initdb()
def tearDown(self):
self.dbconn.close()
Baza umieszczona jest w pamięci, operacje są bardzo szybkie, zatem nie potrzebujemy również zachowywania stanu i struktury bazy danych pomiędzy przypadkami testowymi. Każdy przypadek testowy będzie działał na świeżym schemacie bazy danych bez żadnych rekordów.
W metodzie tearDown()
zamykamy połączenie do bazy, w tym momencie baza "znika".
Zakres testów
Zaczynamy od szkieletu kodu testów.
Inicjalizacja:
class TestScheduler(WorkerTestCase):
def test_init(self):
...
Operacje:
class TestSchedulerOperations(WorkerTestCase):
def _insert_urls(self, *urls):
...
async def _queue_consumer(self,
stop_ev,
url_queue,
result_queue,
/,
timeout=1,
total_expected=-1):
...
async def test_stop_when_idle(self):
...
async def test_reacts_to_stop(self):
...
async def test_scheduling_simple(self):
...
async def test_scheduling(self):
...
async def test_scheduling_visited(self):
...
Omówienie szkieletu.
W trakcie testów będziemy wstawiać rekordy do bazy (_insert_urls()
), a scheduler będzie je pobierał. Scheduler wstawia zadania do kolejki. Kolejka ma ograniczoną pojemność, więc coś musi te zadania z kolejki pobierać (_queue_consumer()
). Operacje schedulera związane są z czasem, musimy więc mieć również mechanizmy kontrolowania czasu przeznaczonego na te operacje.
Testy:
test_stop_when_idle
Testujemy czy scheduler sygnalizuje zakończenie programu, kiedy przez określony czas zabraknie nowych rekordów do pobrania.test_reacts_to_stop
Testujemy czy scheduler reaguje na sygnał stop (zdarzeniestop_ev
).test_scheduling_simple
Najprostszy przypadek - testujemy czy zadania w ogóle są kolejkowane.test_scheduling
Test kolejkowania wielu zadań w zadanym czasie. Scheduler wybiera zadania losowo, sprawdzamy czy wszystkie zostaną przetworzone jednokrotnie.test_scheduling_visited
Sprawdzamy czy po upływie określonego czasu adresy zostaną zakolejkowane do ponownego pobrania (odświeżenia).
Testujemy
test_init
Minimalny test: sprawdzamy inicjalizację obiektu typu Scheduler. Widzimy od razu argumenty, które za każdym razem musimy przekazać. W obiekcie testu mamy już przygotowane: event stop_ev
oraz obiekt Cache (self.cache
).
class TestScheduler(WorkerTestCase):
def test_init(self):
scheduler = Scheduler(
self.stop_ev,
self.cache,
asyncio.Queue(),
)
self.assertIsInstance(scheduler, Worker, "Is a Worker")
Metody pomocnicze
_insert_urls()
Zaczynamy od implementacji pomocniczej metody, której będziemy używać w celu dodania rekordów do bazy danych. Metoda otrzymuje adresy url i umieszcza je w bazie.
class TestSchedulerOperations(WorkerTestCase):
def _insert_urls(self, *urls):
cur = self.dbconn.cursor()
cur.executemany(
"INSERT INTO urls(url) VALUES(?)",
[(url,) for url in urls]
)
cur.connection.commit()
Task: _queue_consumer()
Metoda pomocnicza, której będziemy używać w testach jako zadania pobierającego dane z kolejki. Metoda będzie otrzymywać argument total_expected
, a następnie sprawdzać czy z kolejki url_queue
(kolejka zadań) zostały już pobrane wszystkie elementy. Metoda niczego nie zwraca, opróżnia jedynie kolejkę url_queue
, a elementy umieszcza w result_queue
. Zawartość result_queue
będziemy poźniej porównywać z oczekiwanymi danymi. Zadanie (task) konsumenta będzie kończyć się w momencie pobrania oczekiwanej liczby adresów URL, lub po zasygnalizowaniu zakończenia ustawieniem zdarzenia self.stop_ev
.
Interfejs kolejki asyncio.Queue()
posiada metody get()
i get_nowait()
. Ta pierwsza jest blokująca - oczekuje aż pojawią się dane. Ta druga nie czeka. Jeśli nie ma danych w kolejce, zgłasza wyjątek asyncio.QueueEmpty
. Jeśli zastosowalibyśmy tutaj url_queue.get_nowait()
, to brak danych w kolejce ("scheduler jeszcze niczego nie zakolejkował") spowodowałby zgłoszenie wyjątku. Wyjątek obsłużylibyśmy, a w bloku except
najpewnie musielibyśmy dodać asyncio.sleep()
lub inny mechanizm blokujący, aby uniknąć monopolizacji pętli asyncio (event_loop
) przez tę metodę. Dlatego używamy tutaj blokującego url_queue.get()
, ale opakowujemy go w funkcję asyncio.wait_for()
, która umozliwia przekazanie argumentu timeout
. Albo wcześniej będą dane w kolejce, albo po upływie czasu określonego argumentem timeout
nastąpi zgłoszenie wyjątku TimeoutError
. Uzyskujemy w ten sposób jednocześnie możliwość pobierania z kolejki wraz z możliwością przekazania maksymalnego okresu oczekiwania. asyncio.Queue()
nie implementuje metody get()
, która umożliwiałaby zrealizowanie tej operacji ("get with timeout") automatycznie.
class TestSchedulerOperations(WorkerTestCase):
...
async def _queue_consumer(self,
stop_ev,
url_queue,
result_queue,
/,
timeout=1,
total_expected=-1):
while not stop_ev.is_set():
if result_queue.qsize() == total_expected:
break
try:
job = await asyncio.wait_for(url_queue.get(), timeout=timeout)
url_queue.task_done()
result_queue.put_nowait(job)
except (asyncio.QueueEmpty, asyncio.TimeoutError):
pass
test_stop_when_idle()
Tutaj testujemy automatyczne zakończenie pętli schedulera kiedy w bazie przez okres max_idle_sec
nie ma nowych adresów URL.
Przebieg testu:
- tworzymy obiekt Scheduler, w
max_idle_sec
przekazujemy okres 1 sekundy - tworzymy nowy
Task
, w którym działa scheduler - oczekujemy nieco dlużej niż 1 sekundę na samoczynne wyjście schedulera
- sprawdzamy czy scheduler ustawił
stop_ev
z powodu braku danych - jeśli nie doczekaliśmy się zakończenia schedulera, to zostanie zgłoszony
TimeoutError
. Jeśli tak się stanie, to oznacza to, iż scheduler nie sygnalizuje zakończenia w sytuacji braku danych po określonym czasie. Zgłaszamy wtedy niepowodzenie testu (self.fail()
).
class TestSchedulerOperations(WorkerTestCase):
...
async def test_stop_when_idle(self):
url_queue = asyncio.Queue(maxsize=8)
scheduler = Scheduler(
self.stop_ev,
self.cache,
url_queue,
max_idle_sec=1,
)
task = asyncio.create_task(scheduler())
try:
await asyncio.wait_for(asyncio.gather(task), timeout=3)
self.assertTrue(self.stop_ev.is_set(), "Stop event is set.")
except (asyncio.TimeoutError, asyncio.CancelledError):
self.fail("Scheduler did not finish.")
test_reacts_to_stop()
W tej metodzie testujemy czy główna pętla schedulera obserwuje zdarzenie stop_ev
.
Przebieg testu:
- tworzymy obiekt Scheduler (jak poprzednio)
- tworzymy
Task
(task jest uruchamiany automatycznie po stworzeniu) - ustawiamy flagę
stop_ev
i oczekujemy na zakończenie schedulera - jeśli scheduler się nie zakończy (ma na to maksymalnie 3 sekundy) i zgłoszony zostanie wyjątek
TimeoutError
, to raportujemy błąd (self.fail()
)
W bloku finally
oczekujemy na zadanie bez względu na to, czy asyncio.gather()
już to wykonało, czy też nie. Poniżej samej metody testowej zamieszczam przykład tegoż await
na minimalnym kodzie.
class TestSchedulerOperations(WorkerTestCase):
...
async def test_reacts_to_stop(self):
url_queue = asyncio.Queue(maxsize=16)
scheduler = Scheduler(
self.stop_ev,
self.cache,
url_queue,
max_idle_sec=1
)
task = asyncio.create_task(scheduler())
self.stop_ev.set()
try:
await asyncio.wait_for(asyncio.gather(task), timeout=3)
except (asyncio.TimeoutError, asyncio.CancelledError):
self.fail("Scheduler did not finish.")
finally:
await task
await:
>>> import asyncio
>>> async def foo(): ...
...
>>> async def main():
... task = asyncio.create_task(foo())
... print(task)
... await task
... print(task)
... await task
... print(task)
...
>>> asyncio.run(main())
<Task pending name='Task-6' coro=<foo() running at <stdin>:1>>
<Task finished name='Task-6' coro=<foo() done, defined at <stdin>:1> result=None>
<Task finished name='Task-6' coro=<foo() done, defined at <stdin>:1> result=None>
Po stworzeniu zadanie jest od razu wykonywane, a obiekt zadania ma status "pending". Po pierwszym await
zadanie ma już status "finished", a kolejny await
niczego nie psuje.
test_scheduling_simple()
Podstawowy test operacji schedulera. Tworzymy obiekt, podajemy max_idle_sec=2
. Uruchamiamy zadanie, dodajemy do bazy nowy URL i oczekujemy, że scheduler zakolejkuje go wcześniej niż za sekundę. Jeśli tego nie zrobi, zgłaszamy błąd. W bloku finally
ustawiamy zdarzenie stop_ev
i oczekujemy na koniec zadania schedulera.
class TestSchedulerOperations(WorkerTestCase):
...
async def test_scheduling_simple(self):
url_queue = asyncio.Queue(maxsize=8)
scheduler = Scheduler(
self.stop_ev,
self.cache,
url_queue,
max_idle_sec=2,
)
task = asyncio.create_task(scheduler())
url = "http://localhost/1"
self._insert_urls(url)
try:
job = await asyncio.wait_for(url_queue.get(), timeout=1)
self.assertEqual(job["url"], url, "Queued url")
except (asyncio.TimeoutError, asyncio.CancelledError):
self.fail("Scheduler does not enqueue URLs.")
finally:
self.stop_ev.set()
await task
test_scheduling()
W tej metodzie testujemy "zwykłe" działanie schedulera. Dodajemy kilkaset adresów URL, uruchamiamy scheduler oraz konsumenta kolejki url_queue
. Czekamy zadany czas (max 5 sekund) na zakolejkowanie wszystkich zadań. Po tym czasie zatrzymujemy oba zadania i sprawdzamy rezultaty. Oczekujemy, iż w kolejce result_queue
znajdą się się wszystkie adresy URL i każdy z nich powinien być zakolejkowany tylko raz. Sprawdzamy zatem czy zakolejkowane przez scheduler i pobrane przez konsumenta adresy to dokładnie te adresy, które umieściliśmy w bazie danych. Jeśli którykolwiek adres URL jest powtórzony, lub któregokolwiek brakuje - zgłaszamy błąd. Maksymalny rozmiar kolejki ustawiamy na małą liczbę elementów, aby scheduler musiał wykonywać kolejkowanie partiami. W swojej pętli scheduler wykonuje blokującą operację url_queue.put()
. Jeśli kolejka chwilowo jest pełna (konsument nie odebrał jeszcze zadań), scheduler poczeka aż będzie mógł zakolejkować element. Do przetworzenia ma ich 512, maksymalny rozmiar kolejki to 4 elementy, a maksymalny czas na całość wynosi 5 sekund.
class TestSchedulerOperations(WorkerTestCase):
...
async def test_scheduling(self):
url_queue = asyncio.Queue(maxsize=4)
scheduler = Scheduler(
self.stop_ev,
self.cache,
url_queue,
schedule_interval_sec=0.01,
max_idle_sec=2,
)
total_jobs = 512
result_queue = asyncio.Queue()
urls = ["http://localhost/%d" % i for i in range(total_jobs)]
self._insert_urls(*urls)
scheduler_task = asyncio.create_task(scheduler())
consumer_task = asyncio.create_task(
self._queue_consumer(
self.stop_ev,
url_queue,
result_queue,
timeout=0.1,
total_expected=total_jobs
)
)
wait_interval = 0.005
max_wait_loops = 5 / wait_interval # 5 seconds / interval
try:
while max_wait_loops and result_queue.qsize() < total_jobs:
await asyncio.sleep(wait_interval)
max_wait_loops -= 1
self.stop_ev.set()
await asyncio.wait_for(
asyncio.gather(scheduler_task, consumer_task),
timeout=1
)
self.assertTrue(scheduler_task.done())
self.assertTrue(consumer_task.done())
self.assertEqual(result_queue.qsize(), total_jobs, "All schduled")
# make sure these are correct urls
jobs = []
while result_queue.qsize():
job = result_queue.get_nowait()
result_queue.task_done()
jobs.append(job["url"])
self.assertFalse(set(jobs) - set(urls), "All urls are correct")
except (asyncio.TimeoutError, asyncio.CancelledError) as e:
self.fail("Scheduling is broken: %s" % e)
test_scheduling_visited()
W ostatnim teście sprawdzamy czy scheduler zakolejkuje odwiedzone adresy URL kiedy będą już wystarczająco "stare". Dodajemy URL, uruchamiamy scheduler, odbieramy rezultat. Następnie przestawiamy czas modyfikacji "pobranego" już zasobu i oczekujemy, że scheduler zakolejkuje go ponownie. Kod testu wygląda na długi, ale jest to po prostu "kopiuj-wklej". Przypadek jest na tyle prosty, że nie ma sensu generalizować tak małego fragmentu.
class TestSchedulerOperations(WorkerTestCase):
...
async def test_scheduling_visited(self):
url_queue = asyncio.Queue(maxsize=8)
result_queue = asyncio.Queue()
url = "http://localhost/1"
self._insert_urls(url)
scheduler = Scheduler(
self.stop_ev,
self.cache,
url_queue,
schedule_interval_sec=0.1,
max_idle_sec=2,
)
scheduler_task = asyncio.create_task(scheduler())
consumer_task = asyncio.create_task(
self._queue_consumer(
self.stop_ev,
url_queue,
result_queue,
timeout=0.1,
total_expected=1
)
)
try:
await asyncio.wait_for(consumer_task, timeout=1)
self.assertEqual(result_queue.qsize(), 1, "Got job")
result_queue.get_nowait()
result_queue.task_done()
# update processed url's mtime, so it's old
cur = self.dbconn.cursor()
cur.execute("UPDATE urls SET mtime = 0")
cur.connection.commit()
# start a consumer again
consumer_task = asyncio.create_task(
self._queue_consumer(
self.stop_ev,
url_queue,
result_queue,
timeout=0.1,
total_expected=1
)
)
await asyncio.wait_for(consumer_task, timeout=1)
self.assertEqual(result_queue.qsize(), 1, "Got job")
result_queue.get_nowait()
result_queue.task_done()
self.stop_ev.set()
except asyncio.TimeoutError as e:
self.fail("Scheduling is broken (aged tasks): %r" % e)
finally:
await scheduler_task
await consumer_task
Uruchamiamy testy
$ python3 -m unittest discover -v . -k TestScheduler
test_init (tests.test_scheduler.TestScheduler) ... ok
test_reacts_to_stop (tests.test_scheduler.TestSchedulerOperations) ... ok
test_scheduling (tests.test_scheduler.TestSchedulerOperations) ... ok
test_scheduling_simple (tests.test_scheduler.TestSchedulerOperations) ... ok
test_scheduling_visited (tests.test_scheduler.TestSchedulerOperations) ... ok
test_stop_when_idle (tests.test_scheduler.TestSchedulerOperations) ... ok
----------------------------------------------------------------------
Ran 6 tests in 3.734s
Podsumowanie
Testowanie tego kodu nie stwarza problemów.
Jak zazwyczaj - w testowaniu trudniej opracować scenariusze i mechanizmy niż napisać sam kod.
W następnej części omówimy implementację i testy ostatniego komponentu w obecnej wersji programu: klasy Downloader.