2022-01-08 / Bartłomiej Kurek
Crawler - asynchronicznie (#6 - Scheduler)

W tym artykule omawiamy kod i testy jednostkowe klasy Scheduler zaimplementowanej w naszym programie edukacyjnym: asyncio-crawler.

Scheduler

Scheduler w naszym crawlerze to minimalny mechanizm "harmonogramu" kolejkowania zadań.

Dla przypomnienia - jego zadania obejmują:

  • cykliczne (co określony interwał: (schedule_interval_sec)) pobieranie adresów URL z bazy danych
  • kolejkowanie tych adresów we współdzielonej przez workery kolejce url_queue
  • zatrzymanie programu (ustawienie stop_ev) po określonym czasie bezczynności (max_idle_sec)

Scheduler działa w pętli, warunkiem wyjścia z tej pętli są następujące sytuacje:

  • proces otrzymuje sygnał zakończenia programu
  • w bazie danych nie ma adresów spełniających kryteria kolejkowania

Klasa Scheduler implementuje jedynie dwie metody: __init__() oraz _run(). Domyślną implementację wywołania (operator wywołania) implementuje już klasa bazowa Worker, z której Scheduler dziedziczy.

Inicjalizacja

Scheduler podczas inicjalizacji otrzymuje:

  • obiekt asyncio.Event, którego stan informuje o otrzymaniu przez proces sygnału zakończenia
  • cache - obiekt naszej klasy Cache implementujący pewne operacje wykonywane na bazie danych
  • url_queue - kolejkę, w której Scheduler umieszcza kolejne zadania
  • schedule_interval_sec - interwał czasowy kontrolujący co ile sekund Scheduler sięga do bazy danych po kolejne adresy URL
  • max_idle_sec - interwał czasowy, po którym Scheduler sam ustawia zdarzenie (stop_ev) sygnalizując zakończenie programu (brak zadań).
class Scheduler(Worker):
    def __init__(self,
                 stop_ev: asyncio.Event,
                 cache: Cache,
                 url_queue: asyncio.Queue,
                 /,
                 schedule_interval_sec: float = 1,
                 max_idle_sec: float = 1):
        self.stop_ev = stop_ev
        self.cache = cache
        self.url_queue = url_queue
        self.schedule_interval_sec = schedule_interval_sec
        self.max_idle_sec = max_idle_sec

W samej metodzie nie mamy w zasadzie żadnych zachowań wymagających przetestowania. Można oczywiście sprawdzać argumenty i ich wartości, ale ograniczymy się tutaj do minimum.

Główna pętla: _run()

Tutaj będziemy mieć dwa główne przypadki. Test pierwszego z nich polegał będzie na sprawdzeniu czy Scheduler poprawne kolejkuje zadania. W drugim przypadku będziemy musieli sprawdzić czy Scheduler ustawia flagę zakończenia programu po ustalonym okresie bezczynności.

Kluczowe elementy omawianej pętli to:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
async def _run(self, *args, **kwargs):
    while not self.stop_ev.is_set():
        ...  # fetch records from db
        if records:  # branch A
            ...
        elif (time.monotonic() - last_success) > self.max_idle_sec:  # branch B
            ...
            break
        else:  # branch C
            ...
        await asyncio.sleep(self.schedule_interval_sec)
  • W gałęzi A wstawiamy pobrane adresy URL do kolejki zadań i zapamiętujemy aktualny czas jako czas ostatniej operacji zakończonej powodzeniem.
  • W gałęzi B sygnalizujemy zakończenie programu. Ta gałąź jest wykonywana kiedy w bazie nie ma nowych adresów URL i od ostatniego pobrania danych z bazy minął czas określony argumentem max_idle_sec. W tym przypadku ustawiamy zdarzenie stop_ev, którego stan warunkuje również wykonywanie się pozostałych workerów (downloaderów).
  • W gałęzi C wstawimy jedynie logowanie informacji, że nie ma żadnych nowych zadań do wykonania.

Na końcu pętli mamy asyncio.sleep(). Bez tego scheduler najprawdopodobniej zmonopolizowałby event_loop asyncio, a pozostałe workery nie miałyby szansy się wykonywać. asyncio.sleep() jest blokujące, zatem do zatrzymania schedulera w sposób siłowy użyjemy asyncio.Task.cancel().

Cały kod funkcji.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class Scheduler(Worker):
    ...
    async def _run(self, *args, **kwargs):
         cte = (
             """
             WITH select_stmt AS (
                 SELECT url_id FROM urls U
                     WHERE (
                         U.mtime IS NULL
                         OR
                         datetime(U.mtime, "+3600 seconds") <= CURRENT_TIMESTAMP
                     )
                 ORDER BY RANDOM()
                 LIMIT ?
             )
             UPDATE urls SET mtime = CURRENT_TIMESTAMP WHERE url_id IN (
                 SELECT url_id FROM select_stmt
             ) RETURNING *
             """
         )
         last_success = time.monotonic()
         while not self.stop_ev.is_set():
             cur = self.cache.db.cursor()
             cur.execute(cte, (self.url_queue.maxsize,))
             cur.connection.commit()
             records = cur.fetchall()
             if records:
                 last_success = time.monotonic()
                 self.logger.debug("Scheduling %d url(s)", len(records))
                 for record in records:
                     await self.url_queue.put(dict(record))
             elif (time.monotonic() - last_success) > self.max_idle_sec:
                 self.logger.info("Nothing to do. Stop.")
                 self.stop_ev.set()
                 break
             else:
                 self.logger.debug("No urls to schedule.")
             await asyncio.sleep(self.schedule_interval_sec)

Analiza

Do pobierania danych z bazy używamy kwerendy SQL w formie CTE (common table expression). Przeanalizujmy samą kwerendę.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
WITH select_stmt AS (
     SELECT url_id FROM urls U
         WHERE (
             U.mtime IS NULL
             OR
             datetime(U.mtime, "+3600 seconds") <= CURRENT_TIMESTAMP
         )
     ORDER BY RANDOM()
     LIMIT ?
 )
 UPDATE urls SET mtime = CURRENT_TIMESTAMP WHERE url_id IN (
     SELECT url_id FROM select_stmt
 ) RETURNING *

Zaczynamy od wybrania rekordów, które mają pole mtime ustawione na NULL (zasoby nieodwiedzone) lub odwiedzone były nie później niż godzinę temu (linia 6). Rekordy są w losowej kolejności (ORDER BY RANDOM()), a ich liczba jest limitowana przekazanym argumentem.
W tej samej kwerendzie od razu aktualizujemy rekordy, które CTE wybierze. Aktualizujemy pole mtime wartością obecnego czasu. Cały wynik zwracamy od razu (RETURNING *).

Kolejność.
Rekordy zwracamy tutaj w losowej kolejności. W przypadku sprytniejszego crawlera chcielibyśmy prawdopodobnie wybierać rekordy pogrupowane i limitowane "per domena", aby nie obciążać serwerów, które odwiedzamy. To wymagałoby przechowywania osobno samych domen i referencji do nich w tabeli urls. Obecna implementacja stanowi jednak poglądowe miniumum.

Transakcyjność zadań.
Rekordy są aktualizowane od razu (pole mtime), gdyż w tej implementacji nie ma wymagania transakcyjności zadań/gwarancji wykonania. Kiedy dane pobierzemy z bazy i zakolejkujemy zadania w programie, to - zanim wszystkie zadania zostaną wykonane - program może zostać zatrzymany, a zasoby nie zostaną odwiedzone, mimo iż w bazie znajdzie się data wykonania (mtime). W systemie transakcyjnym mielibyśmy dodatkowe pola informujące o dacie zakolejkowania oraz datę wykonania uzupełnianą kiedy zadanie faktycznie zostanie wykonane. Ze względu na prostotę implementacji i zakres użycia nie jest to zagadnieniem rozważanym na obecnym etapie. Program nie przewiduje również wznawiania prób wykonania zadań (retries).
Jeśli zakolejkujemy zadania i zatrzymamy program, to trudno. Sam program mógłby również przetwarzać kolejkę zadań do końca już po otrzymaniu sygnału zatrzymania. Zajmiemy się tym w przyszłości.
W obecnej implementacji zadania przerwane będą miały okazję wykonać się ponownie w późniejszym czasie ("+3600 seconds").

Testy

Tutaj docieramy do momentu, w którym do testów wprowadzimy własną klasę bazową. Workery w crawlerze wymagają obiektu Cache oraz połączenia do bazy danych. Skoro będziemy wykonywać operacje na bazie danych, to będzie potrzebne również stworzenie jej schematu.
Do testów wprowadzamy więc nową klasę dziedziczącą z unittest.IsolatedAsyncioTestCase. W metodzie setUp() od razu przygotujemy stop_ev, połączenie do bazy, stworzymy schemat w bazie danych oraz stworzymy obiekt Cache używany przez pozostałe komponenty.
Klasy WorkerTestCase użyjemy też w przyszłości przy testach Downloader.

WorkerTestCase

class WorkerTestCase(unittest.IsolatedAsyncioTestCase):
    def setUp(self):
        super().setUp()
        self.stop_ev = asyncio.Event()
        self.dbconn = sqlite3.connect(":memory:")
        self.dbconn.row_factory = sqlite3.Row
        self.cache = Cache(self.dbconn)
        self.cache.initdb()

    def tearDown(self):
        self.dbconn.close()

Baza umieszczona jest w pamięci, operacje są bardzo szybkie, zatem nie potrzebujemy również zachowywania stanu i struktury bazy danych pomiędzy przypadkami testowymi. Każdy przypadek testowy będzie działał na świeżym schemacie bazy danych bez żadnych rekordów.
W metodzie tearDown() zamykamy połączenie do bazy, w tym momencie baza "znika".

Zakres testów

Zaczynamy od szkieletu kodu testów.

Inicjalizacja:

class TestScheduler(WorkerTestCase):
    def test_init(self):
        ...

Operacje:

class TestSchedulerOperations(WorkerTestCase):
    def _insert_urls(self, *urls):
        ...

    async def _queue_consumer(self,
                              stop_ev,
                              url_queue,
                              result_queue,
                              /,
                              timeout=1,
                              total_expected=-1):
        ...

    async def test_stop_when_idle(self):
        ...

    async def test_reacts_to_stop(self):
        ...

    async def test_scheduling_simple(self):
        ...

    async def test_scheduling(self):
        ...

    async def test_scheduling_visited(self):
        ...

Omówienie szkieletu.

W trakcie testów będziemy wstawiać rekordy do bazy (_insert_urls()), a scheduler będzie je pobierał. Scheduler wstawia zadania do kolejki. Kolejka ma ograniczoną pojemność, więc coś musi te zadania z kolejki pobierać (_queue_consumer()). Operacje schedulera związane są z czasem, musimy więc mieć również mechanizmy kontrolowania czasu przeznaczonego na te operacje.

Testy:

  • test_stop_when_idle
    Testujemy czy scheduler sygnalizuje zakończenie programu, kiedy przez określony czas zabraknie nowych rekordów do pobrania.
  • test_reacts_to_stop
    Testujemy czy scheduler reaguje na sygnał stop (zdarzenie stop_ev).
  • test_scheduling_simple
    Najprostszy przypadek - testujemy czy zadania w ogóle są kolejkowane.
  • test_scheduling
    Test kolejkowania wielu zadań w zadanym czasie. Scheduler wybiera zadania losowo, sprawdzamy czy wszystkie zostaną przetworzone jednokrotnie.
  • test_scheduling_visited
    Sprawdzamy czy po upływie określonego czasu adresy zostaną zakolejkowane do ponownego pobrania (odświeżenia).

Testujemy

test_init

Minimalny test: sprawdzamy inicjalizację obiektu typu Scheduler. Widzimy od razu argumenty, które za każdym razem musimy przekazać. W obiekcie testu mamy już przygotowane: event stop_ev oraz obiekt Cache (self.cache).

class TestScheduler(WorkerTestCase):
    def test_init(self):
        scheduler = Scheduler(
            self.stop_ev,
            self.cache,
            asyncio.Queue(),
        )
        self.assertIsInstance(scheduler, Worker, "Is a Worker")

Metody pomocnicze

_insert_urls()

Zaczynamy od implementacji pomocniczej metody, której będziemy używać w celu dodania rekordów do bazy danych. Metoda otrzymuje adresy url i umieszcza je w bazie.

class TestSchedulerOperations(WorkerTestCase):
    def _insert_urls(self, *urls):
        cur = self.dbconn.cursor()
        cur.executemany(
            "INSERT INTO urls(url) VALUES(?)",
            [(url,) for url in urls]
        )
        cur.connection.commit()

Task: _queue_consumer()

Metoda pomocnicza, której będziemy używać w testach jako zadania pobierającego dane z kolejki. Metoda będzie otrzymywać argument total_expected, a następnie sprawdzać czy z kolejki url_queue (kolejka zadań) zostały już pobrane wszystkie elementy. Metoda niczego nie zwraca, opróżnia jedynie kolejkę url_queue, a elementy umieszcza w result_queue. Zawartość result_queue będziemy poźniej porównywać z oczekiwanymi danymi. Zadanie (task) konsumenta będzie kończyć się w momencie pobrania oczekiwanej liczby adresów URL, lub po zasygnalizowaniu zakończenia ustawieniem zdarzenia self.stop_ev.

Interfejs kolejki asyncio.Queue() posiada metody get() i get_nowait(). Ta pierwsza jest blokująca - oczekuje aż pojawią się dane. Ta druga nie czeka. Jeśli nie ma danych w kolejce, zgłasza wyjątek asyncio.QueueEmpty. Jeśli zastosowalibyśmy tutaj url_queue.get_nowait(), to brak danych w kolejce ("scheduler jeszcze niczego nie zakolejkował") spowodowałby zgłoszenie wyjątku. Wyjątek obsłużylibyśmy, a w bloku except najpewnie musielibyśmy dodać asyncio.sleep() lub inny mechanizm blokujący, aby uniknąć monopolizacji pętli asyncio (event_loop) przez tę metodę. Dlatego używamy tutaj blokującego url_queue.get(), ale opakowujemy go w funkcję asyncio.wait_for(), która umozliwia przekazanie argumentu timeout. Albo wcześniej będą dane w kolejce, albo po upływie czasu określonego argumentem timeout nastąpi zgłoszenie wyjątku TimeoutError. Uzyskujemy w ten sposób jednocześnie możliwość pobierania z kolejki wraz z możliwością przekazania maksymalnego okresu oczekiwania. asyncio.Queue() nie implementuje metody get(), która umożliwiałaby zrealizowanie tej operacji ("get with timeout") automatycznie.

class TestSchedulerOperations(WorkerTestCase):
    ...
    async def _queue_consumer(self,
                              stop_ev,
                              url_queue,
                              result_queue,
                              /,
                              timeout=1,
                              total_expected=-1):
        while not stop_ev.is_set():
            if result_queue.qsize() == total_expected:
                break
            try:
                job = await asyncio.wait_for(url_queue.get(), timeout=timeout)
                url_queue.task_done()
                result_queue.put_nowait(job)
            except (asyncio.QueueEmpty, asyncio.TimeoutError):
                pass

test_stop_when_idle()

Tutaj testujemy automatyczne zakończenie pętli schedulera kiedy w bazie przez okres max_idle_sec nie ma nowych adresów URL.

Przebieg testu:

  • tworzymy obiekt Scheduler, w max_idle_sec przekazujemy okres 1 sekundy
  • tworzymy nowy Task, w którym działa scheduler
  • oczekujemy nieco dlużej niż 1 sekundę na samoczynne wyjście schedulera
  • sprawdzamy czy scheduler ustawił stop_ev z powodu braku danych
  • jeśli nie doczekaliśmy się zakończenia schedulera, to zostanie zgłoszony TimeoutError. Jeśli tak się stanie, to oznacza to, iż scheduler nie sygnalizuje zakończenia w sytuacji braku danych po określonym czasie. Zgłaszamy wtedy niepowodzenie testu (self.fail()).
class TestSchedulerOperations(WorkerTestCase):
    ...
    async def test_stop_when_idle(self):
        url_queue = asyncio.Queue(maxsize=8)

        scheduler = Scheduler(
            self.stop_ev,
            self.cache,
            url_queue,
            max_idle_sec=1,
        )

        task = asyncio.create_task(scheduler())
        try:
            await asyncio.wait_for(asyncio.gather(task), timeout=3)
            self.assertTrue(self.stop_ev.is_set(), "Stop event is set.")
        except (asyncio.TimeoutError, asyncio.CancelledError):
            self.fail("Scheduler did not finish.")

test_reacts_to_stop()

W tej metodzie testujemy czy główna pętla schedulera obserwuje zdarzenie stop_ev.

Przebieg testu:

  • tworzymy obiekt Scheduler (jak poprzednio)
  • tworzymy Task (task jest uruchamiany automatycznie po stworzeniu)
  • ustawiamy flagę stop_ev i oczekujemy na zakończenie schedulera
  • jeśli scheduler się nie zakończy (ma na to maksymalnie 3 sekundy) i zgłoszony zostanie wyjątek TimeoutError, to raportujemy błąd (self.fail())

W bloku finally oczekujemy na zadanie bez względu na to, czy asyncio.gather() już to wykonało, czy też nie. Poniżej samej metody testowej zamieszczam przykład tegoż await na minimalnym kodzie.

class TestSchedulerOperations(WorkerTestCase):
    ...
    async def test_reacts_to_stop(self):
        url_queue = asyncio.Queue(maxsize=16)
        scheduler = Scheduler(
            self.stop_ev,
            self.cache,
            url_queue,
            max_idle_sec=1
        )
        task = asyncio.create_task(scheduler())
        self.stop_ev.set()
        try:
            await asyncio.wait_for(asyncio.gather(task), timeout=3)
        except (asyncio.TimeoutError, asyncio.CancelledError):
            self.fail("Scheduler did not finish.")
        finally:
            await task

await:

>>> import asyncio
>>> async def foo(): ...
... 
>>> async def main():
...     task = asyncio.create_task(foo())
...     print(task)
...     await task
...     print(task)
...     await task
...     print(task)
... 
>>> asyncio.run(main())
<Task pending name='Task-6' coro=<foo() running at <stdin>:1>>
<Task finished name='Task-6' coro=<foo() done, defined at <stdin>:1> result=None>
<Task finished name='Task-6' coro=<foo() done, defined at <stdin>:1> result=None>

Po stworzeniu zadanie jest od razu wykonywane, a obiekt zadania ma status "pending". Po pierwszym await zadanie ma już status "finished", a kolejny await niczego nie psuje.

test_scheduling_simple()

Podstawowy test operacji schedulera. Tworzymy obiekt, podajemy max_idle_sec=2. Uruchamiamy zadanie, dodajemy do bazy nowy URL i oczekujemy, że scheduler zakolejkuje go wcześniej niż za sekundę. Jeśli tego nie zrobi, zgłaszamy błąd. W bloku finally ustawiamy zdarzenie stop_ev i oczekujemy na koniec zadania schedulera.

class TestSchedulerOperations(WorkerTestCase):
    ...
    async def test_scheduling_simple(self):
        url_queue = asyncio.Queue(maxsize=8)

        scheduler = Scheduler(
            self.stop_ev,
            self.cache,
            url_queue,
            max_idle_sec=2,
        )
        task = asyncio.create_task(scheduler())

        url = "http://localhost/1"
        self._insert_urls(url)
        try:
            job = await asyncio.wait_for(url_queue.get(), timeout=1)
            self.assertEqual(job["url"], url, "Queued url")
        except (asyncio.TimeoutError, asyncio.CancelledError):
            self.fail("Scheduler does not enqueue URLs.")
        finally:
            self.stop_ev.set()
            await task

test_scheduling()

W tej metodzie testujemy "zwykłe" działanie schedulera. Dodajemy kilkaset adresów URL, uruchamiamy scheduler oraz konsumenta kolejki url_queue. Czekamy zadany czas (max 5 sekund) na zakolejkowanie wszystkich zadań. Po tym czasie zatrzymujemy oba zadania i sprawdzamy rezultaty. Oczekujemy, iż w kolejce result_queue znajdą się się wszystkie adresy URL i każdy z nich powinien być zakolejkowany tylko raz. Sprawdzamy zatem czy zakolejkowane przez scheduler i pobrane przez konsumenta adresy to dokładnie te adresy, które umieściliśmy w bazie danych. Jeśli którykolwiek adres URL jest powtórzony, lub któregokolwiek brakuje - zgłaszamy błąd. Maksymalny rozmiar kolejki ustawiamy na małą liczbę elementów, aby scheduler musiał wykonywać kolejkowanie partiami. W swojej pętli scheduler wykonuje blokującą operację url_queue.put(). Jeśli kolejka chwilowo jest pełna (konsument nie odebrał jeszcze zadań), scheduler poczeka aż będzie mógł zakolejkować element. Do przetworzenia ma ich 512, maksymalny rozmiar kolejki to 4 elementy, a maksymalny czas na całość wynosi 5 sekund.

class TestSchedulerOperations(WorkerTestCase):
    ...
    async def test_scheduling(self):
        url_queue = asyncio.Queue(maxsize=4)

        scheduler = Scheduler(
            self.stop_ev,
            self.cache,
            url_queue,
            schedule_interval_sec=0.01,
            max_idle_sec=2,
        )

        total_jobs = 512
        result_queue = asyncio.Queue()

        urls = ["http://localhost/%d" % i for i in range(total_jobs)]
        self._insert_urls(*urls)

        scheduler_task = asyncio.create_task(scheduler())
        consumer_task = asyncio.create_task(
            self._queue_consumer(
                self.stop_ev,
                url_queue,
                result_queue,
                timeout=0.1,
                total_expected=total_jobs
            )
        )

        wait_interval = 0.005
        max_wait_loops = 5 / wait_interval  # 5 seconds / interval

        try:
            while max_wait_loops and result_queue.qsize() < total_jobs:
                await asyncio.sleep(wait_interval)
                max_wait_loops -= 1

            self.stop_ev.set()
            await asyncio.wait_for(
                asyncio.gather(scheduler_task, consumer_task),
                timeout=1
            )
            self.assertTrue(scheduler_task.done())
            self.assertTrue(consumer_task.done())
            self.assertEqual(result_queue.qsize(), total_jobs, "All schduled")

            # make sure these are correct urls
            jobs = []
            while result_queue.qsize():
                job = result_queue.get_nowait()
                result_queue.task_done()
                jobs.append(job["url"])
            self.assertFalse(set(jobs) - set(urls), "All urls are correct")
        except (asyncio.TimeoutError, asyncio.CancelledError) as e:
            self.fail("Scheduling is broken: %s" % e)

test_scheduling_visited()

W ostatnim teście sprawdzamy czy scheduler zakolejkuje odwiedzone adresy URL kiedy będą już wystarczająco "stare". Dodajemy URL, uruchamiamy scheduler, odbieramy rezultat. Następnie przestawiamy czas modyfikacji "pobranego" już zasobu i oczekujemy, że scheduler zakolejkuje go ponownie. Kod testu wygląda na długi, ale jest to po prostu "kopiuj-wklej". Przypadek jest na tyle prosty, że nie ma sensu generalizować tak małego fragmentu.

class TestSchedulerOperations(WorkerTestCase):
    ...
    async def test_scheduling_visited(self):
        url_queue = asyncio.Queue(maxsize=8)
        result_queue = asyncio.Queue()

        url = "http://localhost/1"
        self._insert_urls(url)

        scheduler = Scheduler(
            self.stop_ev,
            self.cache,
            url_queue,
            schedule_interval_sec=0.1,
            max_idle_sec=2,
        )

        scheduler_task = asyncio.create_task(scheduler())

        consumer_task = asyncio.create_task(
            self._queue_consumer(
                self.stop_ev,
                url_queue,
                result_queue,
                timeout=0.1,
                total_expected=1
            )
        )

        try:
            await asyncio.wait_for(consumer_task, timeout=1)
            self.assertEqual(result_queue.qsize(), 1, "Got job")
            result_queue.get_nowait()
            result_queue.task_done()

            # update processed url's mtime, so it's old
            cur = self.dbconn.cursor()
            cur.execute("UPDATE urls SET mtime = 0")
            cur.connection.commit()

            # start a consumer again
            consumer_task = asyncio.create_task(
                self._queue_consumer(
                    self.stop_ev,
                    url_queue,
                    result_queue,
                    timeout=0.1,
                    total_expected=1
                )
            )
            await asyncio.wait_for(consumer_task, timeout=1)
            self.assertEqual(result_queue.qsize(), 1, "Got job")
            result_queue.get_nowait()
            result_queue.task_done()
            self.stop_ev.set()
        except asyncio.TimeoutError as e:
            self.fail("Scheduling is broken (aged tasks): %r" % e)
        finally:
            await scheduler_task
            await consumer_task

Uruchamiamy testy

$ python3 -m unittest discover -v . -k TestScheduler
test_init (tests.test_scheduler.TestScheduler) ... ok
test_reacts_to_stop (tests.test_scheduler.TestSchedulerOperations) ... ok
test_scheduling (tests.test_scheduler.TestSchedulerOperations) ... ok
test_scheduling_simple (tests.test_scheduler.TestSchedulerOperations) ... ok
test_scheduling_visited (tests.test_scheduler.TestSchedulerOperations) ... ok
test_stop_when_idle (tests.test_scheduler.TestSchedulerOperations) ... ok

----------------------------------------------------------------------
Ran 6 tests in 3.734s

Podsumowanie

Testowanie tego kodu nie stwarza problemów.
Jak zazwyczaj - w testowaniu trudniej opracować scenariusze i mechanizmy niż napisać sam kod.
W następnej części omówimy implementację i testy ostatniego komponentu w obecnej wersji programu: klasy Downloader.