Automatyzacja API i integracje w Pythonie

Kacper Sieradziński
Kacper Sieradziński
6 stycznia 2025Edukacja2 min czytania

Automatyzacja integracji z zewnętrznymi API to dziś niezbędna umiejętność w automatyzacji — większość nowoczesnych systemów komunikuje się przez REST API, webhooki i inne protokoły sieciowe. Python z biblioteką requests oraz zaawansowanymi wzorcami obsługi błędów, retry i rate limiting oferuje wszystko, czego potrzebujesz do tworzenia niezawodnych integracji. Niezależnie od tego, czy pobierasz dane z systemu CRM, synchronizujesz informacje między platformami, czy automatycznie wysyłasz powiadomienia — właściwie zaprojektowana integracja API może zautomatyzować całe workflow'y biznesowe, eliminując potrzebę ręcznej pracy i znacząco przyspieszając procesy.

Obraz główny Automatyzacja API i integracje w Pythonie

Podstawowe zapytania API

Podstawowe zapytania do API wykonuje się za pomocą biblioteki requests, która jest de facto standardem w Pythonie do pracy z HTTP. Kluczowe jest jednak właściwe obsłużenie błędów — sieć jest nieprzewidywalna, API mogą być czasowo niedostępne, a odpowiedzi mogą zawierać błędy. Dobrze zaprojektowana funkcja powinna obsługiwać timeouts, błędy HTTP oraz wyjątki sieciowe, zwracając None lub odpowiedni komunikat błędu zamiast rzucać wyjątkami, które przerywają cały proces automatyzacji. Oto przykład solidnej implementacji:

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 import requests from typing import Optional def get_api_data(url: str, headers: Optional[dict] = None) -> Optional[dict]: """Wykonuje GET request z obsługą błędów.""" try: response = requests.get(url, headers=headers, timeout=10) response.raise_for_status() return response.json() except requests.exceptions.Timeout: print("Timeout: Zapytanie przekroczyło limit czasu") return None except requests.exceptions.HTTPError as e: print(f"Błąd HTTP: {e}") return None except requests.exceptions.RequestException as e: print(f"Błąd zapytania: {e}") return None data = get_api_data("https://api.example.com/data")

Retry z exponential backoff

W rzeczywistych systemach czasowe błędy sieciowe są częste — API może być chwilowo przeciążone, może dojść do timeoutu lub tymczasowego problemu z połączeniem. Zamiast natychmiast rezygnować, warto zaimplementować mechanizm automatycznego ponawiania z exponential backoff, który zwiększa czas oczekiwania między kolejnymi próbami. To pozwala systemowi na odzyskanie sprawności, jednocześnie nie przeciążając go zbyt częstymi zapytaniami. Oto przykład dekoratora, który automatyzuje ten proces:

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 import requests import time from functools import wraps def retry_with_backoff(max_retries: int = 3, backoff_factor: float = 2.0): """Decorator do automatycznego ponawiania z exponential backoff.""" def decorator(func): @wraps(func) def wrapper(*args, **kwargs): for attempt in range(max_retries): try: return func(*args, **kwargs) except requests.exceptions.RequestException as e: if attempt == max_retries - 1: raise wait_time = backoff_factor ** attempt print(f"Ponawianie za {wait_time}s (próba {attempt + 1}/{max_retries})") time.sleep(wait_time) return None return wrapper return decorator @retry_with_backoff(max_retries=3) def fetch_api_data(url: str): response = requests.get(url, timeout=10) response.raise_for_status() return response.json() data = fetch_api_data("https://api.example.com/data")

Obsługa limitów API (rate limiting)

Respektowanie limitów API:

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 import requests import time from datetime import datetime class RateLimitedAPI: def __init__(self, requests_per_minute: int = 60): self.requests_per_minute = requests_per_minute self.requests_times = [] def wait_if_needed(self): """Czeka jeśli osiągnięto limit.""" now = time.time() # Usuń stare wpisy (starsze niż 1 minuta) self.requests_times = [t for t in self.requests_times if now - t < 60] if len(self.requests_times) >= self.requests_per_minute: sleep_time = 60 - (now - self.requests_times[0]) + 1 print(f"Osiągnięto limit. Czekam {sleep_time:.2f}s...") time.sleep(sleep_time) self.requests_times.append(time.time()) def get(self, url: str, **kwargs): """Wykonuje GET z rate limiting.""" self.wait_if_needed() return requests.get(url, **kwargs) api = RateLimitedAPI(requests_per_minute=30) response = api.get("https://api.example.com/data")

Bezpieczne zarządzanie kluczami API

Prawidłowe przechowywanie kluczy:

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import os from dotenv import load_dotenv # Wczytanie z pliku .env load_dotenv() def get_api_key(key_name: str) -> str: """Bezpieczne pobieranie klucza API z zmiennych środowiskowych.""" api_key = os.getenv(key_name) if not api_key: raise ValueError(f"Klucz API '{key_name}' nie został znaleziony w zmiennych środowiskowych") return api_key # Użycie api_key = get_api_key("API_KEY") headers = {"Authorization": f"Bearer {api_key}"} # Albo alternatywnie z os api_key = os.environ.get("API_KEY", "")

Webhooki

Odbieranie webhooków i wysyłanie danych:

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 from flask import Flask, request, jsonify app = Flask(__name__) @app.route('/webhook', methods=['POST']) def handle_webhook(): """Odbiera webhook i przetwarza dane.""" data = request.get_json() # Przetwarzanie danych z webhooka event_type = data.get("type") payload = data.get("data", {}) # Logika przetwarzania print(f"Odebrano webhook: {event_type}") print(f"Dane: {payload}") return jsonify({"status": "success"}), 200 # Uruchomienie serwera webhook if __name__ == '__main__': app.run(port=5000)

Wysyłanie danych przez API

POST requests z walidacją:

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 import requests import json def post_api_data(url: str, data: dict, headers: Optional[dict] = None) -> bool: """Wysyła dane przez API z walidacją.""" if headers is None: headers = {"Content-Type": "application/json"} try: response = requests.post( url, json=data, headers=headers, timeout=10 ) response.raise_for_status() print(f"Dane wysłane pomyślnie: {response.status_code}") return True except requests.exceptions.HTTPError as e: print(f"Błąd wysyłania: {e}") print(f"Odpowiedź: {response.text}") return False # Przykład użycia data = { "name": "Jan Kowalski", "email": "jan@example.com", "action": "subscribe" } post_api_data("https://api.example.com/users", data)

Batch processing z API

Przetwarzanie wielu żądań:

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import requests from concurrent.futures import ThreadPoolExecutor, as_completed def process_batch_requests(urls: list[str], max_workers: int = 5): """Przetwarza wiele zapytań API równolegle.""" results = {} with ThreadPoolExecutor(max_workers=max_workers) as executor: future_to_url = {executor.submit(requests.get, url): url for url in urls} for future in as_completed(future_to_url): url = future_to_url[future] try: response = future.result() results[url] = response.json() if response.status_code == 200 else None except Exception as e: results[url] = f"Błąd: {e}" return results urls = [ "https://api.example.com/data/1", "https://api.example.com/data/2", "https://api.example.com/data/3" ] results = process_batch_requests(urls, max_workers=3)

Podsumowanie

Automatyzacja API obejmuje:

  • Podstawowe zapytania z obsługą błędów
  • Retry z exponential backoff dla odporności
  • Rate limiting dla respektowania limitów
  • Bezpieczne klucze API z zmiennych środowiskowych
  • Webhooki do odbierania powiadomień
  • Batch processing dla wydajności

Te wzorce są podstawą niezawodnych integracji API w Pythonie.


➡️ Następny artykuł

Po opanowaniu integracji API, naucz się walidować i czyścić dane tekstowe:

Walidacja i czyszczenie tekstu w Pythonie — normalizacja tekstu, wykrywanie anomalii, walidacja formatów i kontrola jakości danych w automatyzacji.