Czym są kolejki?
Kolejka (queue) to struktura danych typu FIFO (First In, First Out), która umożliwia bezpieczną komunikację między różnymi częściami programu działającymi współbieżnie. W Pythonie kolejki są thread-safe lub process-safe, co oznacza, że można ich bezpiecznie używać w środowisku wielowątkowym lub wieloprocesowym.
Jeśli chcesz poznać podstawy asynchroniczności, sprawdź Asynchroniczność w Pythonie z asyncio.
asyncio.Queue – kolejki dla korutyn
asyncio.Queue to kolejka zaprojektowana specjalnie dla korutyn. Pozwala na bezpieczne przekazywanie danych między korutynami bez blokowania pętli zdarzeń.
Podstawowe użycie asyncio.Queue
Python1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43import asyncio async def producer(queue, items): """Producent dodaje elementy do kolejki""" for item in items: await queue.put(item) print(f"Producent dodał: {item}") await asyncio.sleep(0.1) # Symulacja pracy # Sygnał zakończenia await queue.put(None) async def consumer(queue): """Konsument pobiera elementy z kolejki""" while True: item = await queue.get() if item is None: # Sygnał zakończenia break print(f"Konsument przetworzył: {item}") await asyncio.sleep(0.2) # Symulacja pracy # Oznacz zadanie jako wykonane queue.task_done() async def main(): queue = asyncio.Queue(maxsize=5) # Maksymalna pojemność # Utworzenie zadań producer_task = asyncio.create_task( producer(queue, ['A', 'B', 'C', 'D', 'E']) ) consumer_task = asyncio.create_task(consumer(queue)) # Oczekiwanie na zakończenie await producer_task await consumer_task # Oczekiwanie na zakończenie wszystkich zadań w kolejce await queue.join() asyncio.run(main())
Metody asyncio.Queue
put() i get()
Python1 2 3 4 5 6 7 8 9queue = asyncio.Queue(maxsize=3) # Dodawanie elementów await queue.put("item1") await queue.put("item2") # Pobieranie elementów item = await queue.get() # "item1" queue.task_done() # Ważne: oznacz zadanie jako wykonane
put_nowait() i get_nowait()
Nieblokujące wersje (rzucają wyjątek, jeśli kolejka jest pełna/pusta):
Python1 2 3 4 5 6 7 8 9 10 11 12 13queue = asyncio.Queue(maxsize=2) try: queue.put_nowait("item1") queue.put_nowait("item2") queue.put_nowait("item3") # QueueFull except asyncio.QueueFull: print("Kolejka pełna") try: item = queue.get_nowait() except asyncio.QueueEmpty: print("Kolejka pusta")
join() i task_done()
join() czeka, aż wszystkie zadania zostaną oznaczone jako wykonane przez task_done():
Python1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25async def worker(queue): while True: item = await queue.get() try: # Przetwarzanie await process_item(item) finally: queue.task_done() # Oznacz jako wykonane async def main(): queue = asyncio.Queue() # Dodaj zadania for i in range(10): await queue.put(i) # Utwórz workerów workers = [asyncio.create_task(worker(queue)) for _ in range(3)] # Czekaj, aż wszystkie zadania zostaną wykonane await queue.join() # Zatrzymaj workerów for w in workers: w.cancel()
Wielu producentów i konsumentów
Python1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43import asyncio async def producer(queue, name, items): for item in items: await queue.put(item) print(f"Producent {name} dodał: {item}") await asyncio.sleep(0.1) await queue.put(None) async def consumer(queue, name): while True: item = await queue.get() if item is None: await queue.put(None) # Przekaż sygnał następnemu konsumentowi break print(f"Konsument {name} przetworzył: {item}") await asyncio.sleep(0.2) queue.task_done() async def main(): queue = asyncio.Queue(maxsize=10) # Wielu producentów producers = [ asyncio.create_task(producer(queue, f"P{i}", range(5))) for i in range(3) ] # Wielu konsumentów consumers = [ asyncio.create_task(consumer(queue, f"C{i}")) for i in range(2) ] await asyncio.gather(*producers) await queue.join() for c in consumers: c.cancel() asyncio.run(main())
PriorityQueue – kolejka priorytetowa
Python1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19import asyncio async def main(): queue = asyncio.PriorityQueue() # Dodaj elementy z priorytetami (niższa liczba = wyższy priorytet) await queue.put((3, "niski priorytet")) await queue.put((1, "wysoki priorytet")) await queue.put((2, "średni priorytet")) while not queue.empty(): priority, item = await queue.get() print(f"Priorytet {priority}: {item}") asyncio.run(main()) # Output: # Priorytet 1: wysoki priorytet # Priorytet 2: średni priorytet # Priorytet 3: niski priorytet
LifoQueue – kolejka LIFO (Last In, First Out)
Python1 2 3 4 5 6 7 8 9 10 11 12 13 14 15import asyncio async def main(): queue = asyncio.LifoQueue() await queue.put("pierwszy") await queue.put("drugi") await queue.put("trzeci") while not queue.empty(): item = await queue.get() print(item) # Output: trzeci, drugi, pierwszy asyncio.run(main())
multiprocessing.Queue – kolejki między procesami
multiprocessing.Queue pozwala na komunikację między różnymi procesami. Jest process-safe i może być używana przez procesy działające równolegle.
Podstawowe użycie multiprocessing.Queue
Python1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34from multiprocessing import Process, Queue import time def producer(queue, items): """Producent w osobnym procesie""" for item in items: queue.put(item) print(f"Producent dodał: {item}") time.sleep(0.1) queue.put(None) # Sygnał zakończenia def consumer(queue): """Konsument w osobnym procesie""" while True: item = queue.get() if item is None: break print(f"Konsument przetworzył: {item}") time.sleep(0.2) if __name__ == "__main__": queue = Queue() # Utworzenie procesów p = Process(target=producer, args=(queue, ['A', 'B', 'C'])) c = Process(target=consumer, args=(queue,)) # Uruchomienie p.start() c.start() # Oczekiwanie na zakończenie p.join() c.join()
Manager().Queue() – współdzielona kolejka
Użycie Manager().Queue() pozwala na współdzielenie kolejki między wieloma procesami:
Python1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36from multiprocessing import Process, Manager def worker(queue, worker_id): while True: item = queue.get() if item is None: break print(f"Worker {worker_id} przetworzył: {item}") queue.task_done() if __name__ == "__main__": with Manager() as manager: queue = manager.Queue() # Dodaj zadania for i in range(10): queue.put(i) # Utwórz workerów workers = [ Process(target=worker, args=(queue, i)) for i in range(3) ] for w in workers: w.start() # Oczekiwanie na zakończenie zadań queue.join() # Zatrzymaj workerów for _ in workers: queue.put(None) for w in workers: w.join()
Różnice między multiprocessing.Queue a Manager().Queue()
- Queue() – szybsza, ale działa tylko między procesami, które ją stworzyły
- Manager().Queue() – wolniejsza, ale może być współdzielona przez dowolne procesy
Wzorzec producer-consumer
Wzorzec producer-consumer jest powszechny w programowaniu współbieżnym:
Python1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55import asyncio import aiohttp async def fetch_urls(queue, urls): """Producent: pobiera URL-e i dodaje do kolejki""" async with aiohttp.ClientSession() as session: for url in urls: async with session.get(url) as response: data = await response.text() await queue.put((url, data)) await queue.put(None) async def process_data(queue, worker_id): """Konsument: przetwarza dane z kolejki""" processed = 0 while True: item = await queue.get() if item is None: await queue.put(None) # Przekaż sygnał dalej break url, data = item # Przetwarzanie danych result = len(data) print(f"Worker {worker_id}: {url} -> {result} bajtów") processed += 1 queue.task_done() print(f"Worker {worker_id} przetworzył {processed} elementów") async def main(): urls = [ "https://example.com", "https://python.org", "https://github.com", ] * 10 queue = asyncio.Queue(maxsize=20) # Producent producer = asyncio.create_task(fetch_urls(queue, urls)) # Konsumenci workers = [ asyncio.create_task(process_data(queue, i)) for i in range(3) ] await producer await queue.join() for w in workers: w.cancel() asyncio.run(main())
Kolejki z timeout
Czasami chcemy poczekać na element z timeoutem:
Python1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20import asyncio async def get_with_timeout(queue, timeout=5.0): """Pobierz element z kolejki z timeoutem""" try: item = await asyncio.wait_for(queue.get(), timeout=timeout) return item except asyncio.TimeoutError: print("Timeout - kolejka pusta") return None async def main(): queue = asyncio.Queue() # Spróbuj pobrać z timeoutem item = await get_with_timeout(queue, timeout=2.0) if item is None: print("Nie udało się pobrać elementu") asyncio.run(main())
Obsługa błędów w kolejkach
Python1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40import asyncio async def robust_worker(queue): """Worker z obsługą błędów""" while True: try: item = await queue.get() if item is None: break try: # Przetwarzanie może rzucić wyjątek await process_item(item) except Exception as e: print(f"Błąd przetwarzania {item}: {e}") # Możesz dodać item do kolejki błędów finally: queue.task_done() except asyncio.CancelledError: print("Worker anulowany") raise async def process_item(item): if item == "error": raise ValueError("Symulowany błąd") print(f"Przetworzono: {item}") async def main(): queue = asyncio.Queue() # Dodaj zadania for item in ["ok1", "error", "ok2"]: await queue.put(item) await queue.put(None) worker_task = asyncio.create_task(robust_worker(queue)) await queue.join() worker_task.cancel() asyncio.run(main())
Najlepsze praktyki
1. Używaj odpowiedniego typu kolejki
- asyncio.Queue – dla korutyn (asynchroniczne)
- multiprocessing.Queue – dla procesów (równoległe)
- queue.Queue – dla wątków (wielowątkowe)
2. Ustaw maxsize rozsądnie
Zbyt mała kolejka może blokować producentów, zbyt duża może zużywać dużo pamięci. Jeśli masz problemy z pamięcią, sprawdź Zaawansowane zarządzanie pamięcią i garbage collector.
3. Zawsze używaj task_done()
Pamiętaj o queue.task_done() po przetworzeniu elementu, inaczej join() będzie czekać w nieskończoność.
4. Sygnalizuj zakończenie
Używaj sentinel value (np. None) do sygnalizowania zakończenia pracy.
5. Obsługuj błędy
Kolejki mogą być źródłem błędów. Obsługuj wyjątki odpowiednio.
6. Monitoruj rozmiar kolejki
W dużych systemach monitoruj rozmiar kolejki, aby wykrywać bottlenecky. Profilowanie może pomóc zidentyfikować problemy — więcej w Profilowanie wydajności w Pythonie.
Podsumowanie
Asynchroniczne kolejki to potężne narzędzie do koordynowania pracy w programach współbieżnych. Kluczowe typy:
- ✅ asyncio.Queue – dla korutyn i programów asynchronicznych
- ✅ multiprocessing.Queue – dla komunikacji między procesami
- ✅ PriorityQueue – dla zadań z priorytetami
- ✅ LifoQueue – dla zadań w kolejności LIFO
Pamiętaj o wzorcu producer-consumer, obsłudze błędów i odpowiednim sygnalizowaniu zakończenia pracy.
Co dalej?
Rozwijaj umiejętności współbieżności:
- Asynchroniczność w Pythonie z asyncio – podstawy asynchroniczności
- Wielowątkowość a wieloprocesowość w Pythonie – różnice między threading a multiprocessing
- Integracja generatorów z asynchronicznym programowaniem – zaawansowane wzorce
- Profilowanie wydajności w Pythonie – mierzenie efektywności kolejek
Pamiętaj: kolejki to tylko jeden z mechanizmów komunikacji. Wybierz odpowiednie narzędzie do swojego przypadku użycia!



