Cykl: Automatyzacja procesów z n8n — od danych do AI w jednym narzędziu · Część 5/20

Harmonogramy zadań w Pythonie: cron, APScheduler, asyncio

Kacper Sieradziński
Kacper Sieradziński14 stycznia 2025 · 1 min czytania
Streszczenie
  • APScheduler - zaawansowane harmonogramy
  • Prosty harmonogram z schedule
  • Asynchroniczne harmonogramy z asyncio
  • Kombinowanie zadań równoległych
Harmonogramy zadań w Pythonie: cron, APScheduler, asyncio

Harmonogramy zadań to serce każdej poważnej automatyzacji — pozwalają na automatyczne uruchamianie skryptów o określonych porach, bez konieczności pamiętania o ręcznym ich wywoływaniu. W Pythonie masz do dyspozycji szeroki wachlarz narzędzi: od prostych bibliotek jak schedule, przez zaawansowane APScheduler z pełną obsługą cron-like wyrażeń, po asynchroniczne rozwiązania oparte na asyncio, które pozwalają na równoległe wykonywanie wielu zadań jednocześnie. Właściwe zastosowanie harmonogramów przekształca jednorazowe skrypty w niezawodne systemy automatyzacji, które pracują w tle i wspierają codzienne operacje biznesowe.

APScheduler - zaawansowane harmonogramy

Biblioteka APScheduler oferuje elastyczne harmonogramy:

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.triggers.cron import CronTrigger from datetime import datetime def scheduled_task(): """Zadanie do wykonania według harmonogramu.""" print(f"[{datetime.now()}] Wykonuję zadanie harmonogramowe") # Tworzenie harmonogramu scheduler = BlockingScheduler() # Codziennie o 8:00 scheduler.add_job( scheduled_task, trigger=CronTrigger(hour=8, minute=0), id='daily_task', name='Zadanie codzienne' ) # Co godzinę scheduler.add_job( scheduled_task, trigger=CronTrigger(minute=0), id='hourly_task' ) # W poniedziałki o 9:00 scheduler.add_job( scheduled_task, trigger=CronTrigger(day_of_week='mon', hour=9), id='weekly_task' ) # Uruchomienie harmonogramu APScheduler scheduler.start()

Prosty harmonogram z schedule

Biblioteka schedule dla prostszych przypadków:

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import schedule import time from datetime import datetime def job(): print(f"[{datetime.now()}] Wykonuję zadanie") # Harmonogramy schedule.every(10).seconds.do(job) schedule.every().minute.do(job) schedule.every().hour.do(job) schedule.every().day.at("08:00").do(job) schedule.every().monday.do(job) schedule.every().wednesday.at("13:15").do(job) # Uruchomienie harmonogramu schedule while True: schedule.run_pending() time.sleep(1)

Asynchroniczne harmonogramy z asyncio

Kurs · 24 lekcje8h 14m
Kurs

Kurs Python dla początkujących — PyStart

Zacznij programować w Pythonie! Idealne dla osób bez doświadczenia. Praktyczne zadania, projekty i wsparcie społeczności.

  • 24 lekcje wideo + 80 ćwiczeń
  • Realne bazy z e-commerce
  • Społeczność i code-review
499 zł799 zł−38%
Rozpocznij naukę

Harmonogramy asynchroniczne dla lepszej wydajności:

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import asyncio from datetime import datetime async def async_task(): """Asynchroniczne zadanie.""" print(f"[{datetime.now()}] Wykonuję asynchroniczne zadanie") await asyncio.sleep(1) print(f"[{datetime.now()}] Zadanie zakończone") async def periodic_task(interval: int): """Zadanie uruchamiane cyklicznie.""" while True: await async_task() await asyncio.sleep(interval) # Uruchomienie asynchronicznego harmonogramu asyncio.run(periodic_task(interval=60)) # Co 60 sekund

Kombinowanie zadań równoległych

Uruchamianie wielu zadań równolegle:

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import asyncio from datetime import datetime async def task1(): print(f"[{datetime.now()}] Zadanie 1") await asyncio.sleep(2) async def task2(): print(f"[{datetime.now()}] Zadanie 2") await asyncio.sleep(3) async def run_parallel_tasks(): """Uruchamia zadania równolegle.""" await asyncio.gather(task1(), task2()) print(f"[{datetime.now()}] Wszystkie zadania zakończone") asyncio.run(run_parallel_tasks())

Harmonogram z APScheduler - zaawansowany

Kompletny przykład z obsługą błędów:

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.triggers.cron import CronTrigger from datetime import datetime import logging logging.basicConfig(level=logging.INFO) def job_with_error_handling(): """Zadanie z obsługą błędów.""" try: print(f"[{datetime.now()}] Wykonuję zadanie") # Logika zadania result = 1 + 1 print(f"Wynik: {result}") except Exception as e: print(f"Błąd w zadaniu: {e}") scheduler = BlockingScheduler() # Harmonogram z obsługą błędów scheduler.add_job( job_with_error_handling, trigger=CronTrigger(minute='*/5'), # Co 5 minut id='periodic_job', max_instances=1 # Tylko jedna instancja naraz ) scheduler.start()

Podsumowanie

Newsletter · co środę

Python co tydzień — newsletter dla programistów

Otrzymuj codzienne ćwiczenia, ciekawostki z ekosystemu Pythona i wskazówki do rozmów rekrutacyjnych.

2 312 czytelników · ⭐ 4,8

Harmonogramy zadań w Pythonie obejmują:

  • APScheduler dla zaawansowanych harmonogramów
  • schedule dla prostych przypadków
  • asyncio dla asynchronicznych zadań
  • Równoległość dla wydajności
  • Obsługę błędów dla niezawodności

Te narzędzia pozwalają na kompletną automatyzację zadań czasowych w Pythonie.

➡️ Następny artykuł

Po opanowaniu harmonogramów, naucz się zarządzać zadaniami i pipeline'ami:

Lista zadań i pipeline'y w Pythonie: makefile/invoke/fabric — standardyzacja komend, tworzenie powtarzalnych procesów i automatyzacja workflow'ów budowania i wdrażania.

Część 6 z 20

Konwersja plików do PDF i z PDF w Pythonie

druga lekcja cyklu „Automatyzacja procesów z n8n — od danych do AI w jednym narzędziu"

Czytaj kolejny →