Asynchroniczne kolejki w Pythonie – asyncio.Queue i multiprocessing.Queue

Kacper Sieradziński
Kacper Sieradziński
29 lipca 2025Edukacja3 min czytania

Kolejki to fundamentalny mechanizm komunikacji w programowaniu współbieżnym. W Pythonie mamy różne typy kolejek dla różnych scenariuszy: asyncio.Queue dla korutyn, multiprocessing.Queue dla procesów, i queue.Queue dla wątków. W tym artykule poznasz, jak używać asynchronicznych kolejek do koordynowania pracy między korutynami i procesami, implementować wzorce producer-consumer i budować skalowalne systemy przetwarzania danych.

Obraz główny Asynchroniczne kolejki w Pythonie – asyncio.Queue i multiprocessing.Queue

Czym są kolejki?

Kolejka (queue) to struktura danych typu FIFO (First In, First Out), która umożliwia bezpieczną komunikację między różnymi częściami programu działającymi współbieżnie. W Pythonie kolejki są thread-safe lub process-safe, co oznacza, że można ich bezpiecznie używać w środowisku wielowątkowym lub wieloprocesowym.

Jeśli chcesz poznać podstawy asynchroniczności, sprawdź Asynchroniczność w Pythonie z asyncio.

asyncio.Queue – kolejki dla korutyn

asyncio.Queue to kolejka zaprojektowana specjalnie dla korutyn. Pozwala na bezpieczne przekazywanie danych między korutynami bez blokowania pętli zdarzeń.

Podstawowe użycie asyncio.Queue

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 import asyncio async def producer(queue, items): """Producent dodaje elementy do kolejki""" for item in items: await queue.put(item) print(f"Producent dodał: {item}") await asyncio.sleep(0.1) # Symulacja pracy # Sygnał zakończenia await queue.put(None) async def consumer(queue): """Konsument pobiera elementy z kolejki""" while True: item = await queue.get() if item is None: # Sygnał zakończenia break print(f"Konsument przetworzył: {item}") await asyncio.sleep(0.2) # Symulacja pracy # Oznacz zadanie jako wykonane queue.task_done() async def main(): queue = asyncio.Queue(maxsize=5) # Maksymalna pojemność # Utworzenie zadań producer_task = asyncio.create_task( producer(queue, ['A', 'B', 'C', 'D', 'E']) ) consumer_task = asyncio.create_task(consumer(queue)) # Oczekiwanie na zakończenie await producer_task await consumer_task # Oczekiwanie na zakończenie wszystkich zadań w kolejce await queue.join() asyncio.run(main())

Metody asyncio.Queue

put() i get()

Python
1 2 3 4 5 6 7 8 9 queue = asyncio.Queue(maxsize=3) # Dodawanie elementów await queue.put("item1") await queue.put("item2") # Pobieranie elementów item = await queue.get() # "item1" queue.task_done() # Ważne: oznacz zadanie jako wykonane

put_nowait() i get_nowait()

Nieblokujące wersje (rzucają wyjątek, jeśli kolejka jest pełna/pusta):

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 queue = asyncio.Queue(maxsize=2) try: queue.put_nowait("item1") queue.put_nowait("item2") queue.put_nowait("item3") # QueueFull except asyncio.QueueFull: print("Kolejka pełna") try: item = queue.get_nowait() except asyncio.QueueEmpty: print("Kolejka pusta")

join() i task_done()

join() czeka, aż wszystkie zadania zostaną oznaczone jako wykonane przez task_done():

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 async def worker(queue): while True: item = await queue.get() try: # Przetwarzanie await process_item(item) finally: queue.task_done() # Oznacz jako wykonane async def main(): queue = asyncio.Queue() # Dodaj zadania for i in range(10): await queue.put(i) # Utwórz workerów workers = [asyncio.create_task(worker(queue)) for _ in range(3)] # Czekaj, aż wszystkie zadania zostaną wykonane await queue.join() # Zatrzymaj workerów for w in workers: w.cancel()

Wielu producentów i konsumentów

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 import asyncio async def producer(queue, name, items): for item in items: await queue.put(item) print(f"Producent {name} dodał: {item}") await asyncio.sleep(0.1) await queue.put(None) async def consumer(queue, name): while True: item = await queue.get() if item is None: await queue.put(None) # Przekaż sygnał następnemu konsumentowi break print(f"Konsument {name} przetworzył: {item}") await asyncio.sleep(0.2) queue.task_done() async def main(): queue = asyncio.Queue(maxsize=10) # Wielu producentów producers = [ asyncio.create_task(producer(queue, f"P{i}", range(5))) for i in range(3) ] # Wielu konsumentów consumers = [ asyncio.create_task(consumer(queue, f"C{i}")) for i in range(2) ] await asyncio.gather(*producers) await queue.join() for c in consumers: c.cancel() asyncio.run(main())

PriorityQueue – kolejka priorytetowa

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import asyncio async def main(): queue = asyncio.PriorityQueue() # Dodaj elementy z priorytetami (niższa liczba = wyższy priorytet) await queue.put((3, "niski priorytet")) await queue.put((1, "wysoki priorytet")) await queue.put((2, "średni priorytet")) while not queue.empty(): priority, item = await queue.get() print(f"Priorytet {priority}: {item}") asyncio.run(main()) # Output: # Priorytet 1: wysoki priorytet # Priorytet 2: średni priorytet # Priorytet 3: niski priorytet

LifoQueue – kolejka LIFO (Last In, First Out)

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import asyncio async def main(): queue = asyncio.LifoQueue() await queue.put("pierwszy") await queue.put("drugi") await queue.put("trzeci") while not queue.empty(): item = await queue.get() print(item) # Output: trzeci, drugi, pierwszy asyncio.run(main())

multiprocessing.Queue – kolejki między procesami

multiprocessing.Queue pozwala na komunikację między różnymi procesami. Jest process-safe i może być używana przez procesy działające równolegle.

Podstawowe użycie multiprocessing.Queue

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 from multiprocessing import Process, Queue import time def producer(queue, items): """Producent w osobnym procesie""" for item in items: queue.put(item) print(f"Producent dodał: {item}") time.sleep(0.1) queue.put(None) # Sygnał zakończenia def consumer(queue): """Konsument w osobnym procesie""" while True: item = queue.get() if item is None: break print(f"Konsument przetworzył: {item}") time.sleep(0.2) if __name__ == "__main__": queue = Queue() # Utworzenie procesów p = Process(target=producer, args=(queue, ['A', 'B', 'C'])) c = Process(target=consumer, args=(queue,)) # Uruchomienie p.start() c.start() # Oczekiwanie na zakończenie p.join() c.join()

Manager().Queue() – współdzielona kolejka

Użycie Manager().Queue() pozwala na współdzielenie kolejki między wieloma procesami:

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 from multiprocessing import Process, Manager def worker(queue, worker_id): while True: item = queue.get() if item is None: break print(f"Worker {worker_id} przetworzył: {item}") queue.task_done() if __name__ == "__main__": with Manager() as manager: queue = manager.Queue() # Dodaj zadania for i in range(10): queue.put(i) # Utwórz workerów workers = [ Process(target=worker, args=(queue, i)) for i in range(3) ] for w in workers: w.start() # Oczekiwanie na zakończenie zadań queue.join() # Zatrzymaj workerów for _ in workers: queue.put(None) for w in workers: w.join()

Różnice między multiprocessing.Queue a Manager().Queue()

  • Queue() – szybsza, ale działa tylko między procesami, które ją stworzyły
  • Manager().Queue() – wolniejsza, ale może być współdzielona przez dowolne procesy

Wzorzec producer-consumer

Wzorzec producer-consumer jest powszechny w programowaniu współbieżnym:

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 import asyncio import aiohttp async def fetch_urls(queue, urls): """Producent: pobiera URL-e i dodaje do kolejki""" async with aiohttp.ClientSession() as session: for url in urls: async with session.get(url) as response: data = await response.text() await queue.put((url, data)) await queue.put(None) async def process_data(queue, worker_id): """Konsument: przetwarza dane z kolejki""" processed = 0 while True: item = await queue.get() if item is None: await queue.put(None) # Przekaż sygnał dalej break url, data = item # Przetwarzanie danych result = len(data) print(f"Worker {worker_id}: {url} -> {result} bajtów") processed += 1 queue.task_done() print(f"Worker {worker_id} przetworzył {processed} elementów") async def main(): urls = [ "https://example.com", "https://python.org", "https://github.com", ] * 10 queue = asyncio.Queue(maxsize=20) # Producent producer = asyncio.create_task(fetch_urls(queue, urls)) # Konsumenci workers = [ asyncio.create_task(process_data(queue, i)) for i in range(3) ] await producer await queue.join() for w in workers: w.cancel() asyncio.run(main())

Kolejki z timeout

Czasami chcemy poczekać na element z timeoutem:

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 import asyncio async def get_with_timeout(queue, timeout=5.0): """Pobierz element z kolejki z timeoutem""" try: item = await asyncio.wait_for(queue.get(), timeout=timeout) return item except asyncio.TimeoutError: print("Timeout - kolejka pusta") return None async def main(): queue = asyncio.Queue() # Spróbuj pobrać z timeoutem item = await get_with_timeout(queue, timeout=2.0) if item is None: print("Nie udało się pobrać elementu") asyncio.run(main())

Obsługa błędów w kolejkach

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 import asyncio async def robust_worker(queue): """Worker z obsługą błędów""" while True: try: item = await queue.get() if item is None: break try: # Przetwarzanie może rzucić wyjątek await process_item(item) except Exception as e: print(f"Błąd przetwarzania {item}: {e}") # Możesz dodać item do kolejki błędów finally: queue.task_done() except asyncio.CancelledError: print("Worker anulowany") raise async def process_item(item): if item == "error": raise ValueError("Symulowany błąd") print(f"Przetworzono: {item}") async def main(): queue = asyncio.Queue() # Dodaj zadania for item in ["ok1", "error", "ok2"]: await queue.put(item) await queue.put(None) worker_task = asyncio.create_task(robust_worker(queue)) await queue.join() worker_task.cancel() asyncio.run(main())

Najlepsze praktyki

1. Używaj odpowiedniego typu kolejki

  • asyncio.Queue – dla korutyn (asynchroniczne)
  • multiprocessing.Queue – dla procesów (równoległe)
  • queue.Queue – dla wątków (wielowątkowe)

2. Ustaw maxsize rozsądnie

Zbyt mała kolejka może blokować producentów, zbyt duża może zużywać dużo pamięci. Jeśli masz problemy z pamięcią, sprawdź Zaawansowane zarządzanie pamięcią i garbage collector.

3. Zawsze używaj task_done()

Pamiętaj o queue.task_done() po przetworzeniu elementu, inaczej join() będzie czekać w nieskończoność.

4. Sygnalizuj zakończenie

Używaj sentinel value (np. None) do sygnalizowania zakończenia pracy.

5. Obsługuj błędy

Kolejki mogą być źródłem błędów. Obsługuj wyjątki odpowiednio.

6. Monitoruj rozmiar kolejki

W dużych systemach monitoruj rozmiar kolejki, aby wykrywać bottlenecky. Profilowanie może pomóc zidentyfikować problemy — więcej w Profilowanie wydajności w Pythonie.

Podsumowanie

Asynchroniczne kolejki to potężne narzędzie do koordynowania pracy w programach współbieżnych. Kluczowe typy:

  • asyncio.Queue – dla korutyn i programów asynchronicznych
  • multiprocessing.Queue – dla komunikacji między procesami
  • PriorityQueue – dla zadań z priorytetami
  • LifoQueue – dla zadań w kolejności LIFO

Pamiętaj o wzorcu producer-consumer, obsłudze błędów i odpowiednim sygnalizowaniu zakończenia pracy.

Co dalej?

Rozwijaj umiejętności współbieżności:

Pamiętaj: kolejki to tylko jeden z mechanizmów komunikacji. Wybierz odpowiednie narzędzie do swojego przypadku użycia!