API gateway i mikroserwisy w Pythonie

Kacper Sieradziński
Kacper Sieradziński
20 kwietnia 2025Edukacja5 min czytania

API Gateway to kluczowy komponent architektury mikroserwisów — centralny punkt wejścia, który zarządza ruchem, bezpieczeństwem i integracją wielu niezależnych serwisów. W systemach z wieloma mikroserwisami użytkownik nie powinien komunikować się z każdym serwisem bezpośrednio. API Gateway ukrywa złożoność systemu, kieruje żądania do właściwych mikroserwisów i agreguje odpowiedzi, zapewniając jednocześnie jednolite API dla klientów. W tym przewodniku poznasz, jak zbudować API Gateway w Pythonie, jak zarządzać autoryzacją, routingiem i komunikacją między mikroserwisami.

Obraz główny API gateway i mikroserwisy w Pythonie

Czym jest API Gateway?

API Gateway to pojedynczy punkt wejścia do systemu mikroserwisów. Pełni rolę reverse proxy, który:

  • kieruje żądania do właściwych mikroserwisów,
  • agreguje odpowiedzi z wielu serwisów w jedną odpowiedź,
  • zarządza autoryzacją i uwierzytelnianiem,
  • implementuje rate limiting i throttling,
  • przekształca protokoły komunikacji (np. REST na gRPC),
  • obsługuje load balancing między instancjami serwisu,
  • agreguje logi i metryki z wszystkich serwisów.

Dzięki API Gateway klient nie musi wiedzieć o istnieniu wielu mikroserwisów — komunikuje się tylko z gateway'em, który zajmuje się resztą.

Dlaczego API Gateway jest potrzebny?

Bez API Gateway:

  • klient musi znać adresy wszystkich mikroserwisów,
  • każdy mikroserwis musi implementować autoryzację osobno,
  • trudno zarządzać versioning i breaking changes,
  • brak centralnego miejsca do monitorowania i logowania.

Z API Gateway:

  • klient komunikuje się tylko z jednym endpointem,
  • autoryzacja w jednym miejscu,
  • łatwiejsze zarządzanie wersjami API,
  • centralne logowanie i monitoring.

Implementacja API Gateway w Pythonie

W Pythonie możesz zbudować API Gateway używając FastAPI, Flask czy Django. FastAPI jest szczególnie dobrym wyborem ze względu na natywną obsługę asynchroniczności i łatwą integrację z innymi serwisami.

Podstawowy API Gateway z FastAPI

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 from fastapi import FastAPI, HTTPException, Header, Request import httpx import asyncio app = FastAPI(title="API Gateway") # Konfiguracja serwisów SERVICES = { "users": "http://user-service:8000", "orders": "http://order-service:8000", "payments": "http://payment-service:8000", "products": "http://product-service:8000", } @app.get("/api/users/{user_id}") async def get_user(user_id: int): async with httpx.AsyncClient() as client: response = await client.get(f"{SERVICES['users']}/users/{user_id}") if response.status_code == 200: return response.json() raise HTTPException(status_code=response.status_code, detail=response.text) @app.get("/api/orders/{order_id}") async def get_order(order_id: int): async with httpx.AsyncClient() as client: # Pobierz zamówienie order_response = await client.get(f"{SERVICES['orders']}/orders/{order_id}") if order_response.status_code != 200: raise HTTPException(status_code=404, detail="Order not found") order = order_response.json() # Agregacja danych z wielu serwisów import asyncio user_task = client.get(f"{SERVICES['users']}/users/{order['user_id']}") product_tasks = [ client.get(f"{SERVICES['products']}/products/{pid}") for pid in order.get('product_ids', []) ] user, *products = await asyncio.gather(user_task, *product_tasks) return { "order": order, "user": user.json() if user.status_code == 200 else None, "products": [p.json() for p in products if p.status_code == 200] }

Routing z regex i wildcards

Możesz użyć bardziej zaawansowanego routingu z FastAPI:

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 from fastapi import FastAPI, Request from starlette.routing import Route, Mount app = FastAPI() def proxy_request(request: Request, service_name: str): """Proxy request do odpowiedniego mikroserwisu""" service_url = SERVICES.get(service_name) if not service_url: raise HTTPException(status_code=404, detail=f"Service {service_name} not found") # Przekieruj request do mikroserwisu return httpx.AsyncClient().request( method=request.method, url=f"{service_url}{request.url.path}", headers=dict(request.headers), params=dict(request.query_params) )

Autoryzacja i uwierzytelnianie w Gateway

Jedną z najważniejszych funkcji API Gateway jest centralna autoryzacja. Dzięki temu logika autoryzacji nie musi być powielana w każdym mikroserwisie.

Middleware autoryzacji

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 from fastapi import FastAPI, HTTPException, Depends, Header from functools import wraps import jwt app = FastAPI() SECRET_KEY = "your-secret-key" async def verify_token(authorization: str = Header(None)): """Weryfikacja tokena JWT""" if not authorization: raise HTTPException(status_code=401, detail="Missing authorization header") try: token = authorization.replace("Bearer ", "") payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"]) return payload except jwt.ExpiredSignatureError: raise HTTPException(status_code=401, detail="Token expired") except jwt.InvalidTokenError: raise HTTPException(status_code=401, detail="Invalid token") @app.get("/api/users/{user_id}") async def get_user(user_id: int, token_data: dict = Depends(verify_token)): # Token jest już zweryfikowany, przekaż user_id do mikroserwisu user_id_from_token = token_data.get("user_id") async with httpx.AsyncClient() as client: headers = {"X-User-ID": str(user_id_from_token)} response = await client.get( f"{SERVICES['users']}/users/{user_id}", headers=headers ) return response.json()

W ten sposób logika autoryzacji nie musi być powielana w każdym mikroserwisie. Gateway weryfikuje token i przekazuje informacje o użytkowniku do mikroserwisów w nagłówkach.

Rate limiting

Możesz również dodać rate limiting na poziomie gateway:

Python
1 2 3 4 5 6 7 8 9 10 from slowapi import Limiter, _rate_limit_exceeded_handler from slowapi.util import get_remote_address limiter = Limiter(key_func=get_remote_address) app.state.limiter = limiter @app.get("/api/users/{user_id}") @limiter.limit("10/minute") async def get_user(request: Request, user_id: int): ...

Komunikacja między mikroserwisami

API Gateway może działać synchronicznie (REST/gRPC) lub asynchronicznie (kolejki). Wybór metody komunikacji zależy od wymagań: czy potrzebujesz natychmiastowej odpowiedzi, czy operacja może być asynchroniczna.

REST API

Prosty, czytelny i łatwy w debugowaniu. Działa w oparciu o HTTP i JSON. Dobry do systemów o niskiej złożoności.

Zalety:

  • Łatwy w implementacji i debugowaniu
  • Działa w każdej technologii
  • Czytelny format (JSON)
  • Dobrze wspierany przez narzędzia

Wady:

  • Mniej wydajny niż gRPC
  • Większy overhead protokołu

gRPC

Binarny protokół oparty o HTTP/2. Wysoka wydajność, automatyczne generowanie kodu klienta/serwera. Idealny dla komunikacji między serwisami w środowisku o dużej liczbie wywołań.

Przykład użycia gRPC w Gateway:

Python
1 2 3 4 5 6 7 8 9 10 import grpc from grpc import aio @app.get("/api/users/{user_id}") async def get_user(user_id: int): async with grpc.aio.insecure_channel('user-service:50051') as channel: stub = UserServiceStub(channel) request = UserRequest(user_id=user_id) response = await stub.GetUser(request) return {"id": response.id, "name": response.name}

Message Queues

Kolejki wiadomości (np. RabbitMQ, Kafka) pozwalają na asynchroniczną komunikację. Idealne dla systemów event-driven: jeden mikroserwis wysyła zdarzenie, inne reagują. Umożliwia niezależność czasową między serwisami.

Przykład integracji z RabbitMQ:

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import pika import json async def publish_event(event_type: str, data: dict): connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq')) channel = connection.channel() channel.queue_declare(queue=event_type) channel.basic_publish( exchange='', routing_key=event_type, body=json.dumps(data) ) connection.close() @app.post("/api/orders") async def create_order(order: OrderCreate): # Zapisz zamówienie order_response = await save_order(order) # Wyślij zdarzenie asynchronicznie await publish_event("order_created", {"order_id": order_response["id"]}) return order_response

Service Discovery

W systemie z wieloma mikroserwisami dynamicznie uruchamianymi w kontenerach (np. Docker, Kubernetes) trudno zarządzać adresami każdego serwisu. Service Discovery rozwiązuje ten problem — pozwala mikroserwisom automatycznie się odnajdywać.

Popularne rozwiązania

Consul – rejestr usług i zdrowia serwisów. Pozwala na dynamiczne rejestrowanie i wykrywanie serwisów:

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import consul c = consul.Consul() # Rejestracja serwisu c.agent.service.register( 'user-service', service_id='user-service-1', address='user-service', port=8000, check=consul.Check.tcp('user-service', 8000, "10s") ) # Odnalezienie serwisu services = c.health.service('user-service', passing=True)[1] service_address = f"{services[0]['Service']['Address']}:{services[0]['Service']['Port']}"

Kubernetes Service Registry – automatyczny DNS dla mikroserwisów. W Kubernetes każdy serwis ma własny DNS:

Python
1 2 # W Kubernetes możesz używać nazw serwisów jako hostnames user_service_url = "http://user-service.default.svc.cluster.local:8000"

etcd / Zookeeper – rozproszone przechowywanie metadanych usług.

Dzięki Service Discovery gateway nie musi mieć statycznej listy adresów — może pytać rejestr o aktualne lokalizacje mikroserwisów.

Dynamiczny routing z Service Discovery

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 from fastapi import FastAPI import consul app = FastAPI() c = consul.Consul() def get_service_url(service_name: str) -> str: """Pobierz URL serwisu z Consul""" services = c.health.service(service_name, passing=True)[1] if not services: raise HTTPException(status_code=503, detail=f"Service {service_name} unavailable") service = services[0]['Service'] return f"http://{service['Address']}:{service['Port']}" @app.get("/api/{service_name}/{path:path}") async def proxy_request(service_name: str, path: str): service_url = get_service_url(service_name) async with httpx.AsyncClient() as client: response = await client.get(f"{service_url}/{path}") return response.json()

Deployment w architekturze mikroserwisów

Każdy mikroserwis oraz gateway działa w osobnym kontenerze. Najczęściej stosuje się Docker Compose dla developmentu lub Kubernetes dla produkcji.

Przykład docker-compose.yml

YAML
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 version: "3.9" services: gateway: build: ./gateway ports: - "80:8000" environment: - CONSUL_HOST=consul - REDIS_HOST=redis depends_on: - consul - redis user-service: build: ./user-service ports: - "8001:8000" environment: - DATABASE_URL=postgresql://user:pass@user-db:5432/users depends_on: - user-db order-service: build: ./order-service ports: - "8002:8000" environment: - DATABASE_URL=postgresql://user:pass@order-db:5432/orders - USER_SERVICE_URL=http://user-service:8000 depends_on: - order-db - user-service payment-service: build: ./payment-service ports: - "8003:8000" depends_on: - payment-db consul: image: consul:latest ports: - "8500:8500" command: consul agent -dev -client=0.0.0.0 redis: image: redis:7 user-db: image: postgres:15 environment: POSTGRES_DB: users POSTGRES_USER: user POSTGRES_PASSWORD: pass volumes: - user_data:/var/lib/postgresql/data order-db: image: postgres:15 environment: POSTGRES_DB: orders POSTGRES_USER: user POSTGRES_PASSWORD: pass volumes: - order_data:/var/lib/postgresql/data volumes: user_data: order_data:

Gateway nasłuchuje na porcie 80 i przekierowuje ruch do odpowiednich mikroserwisów. Komunikacja między kontenerami odbywa się po nazwach hostów w sieci Dockera.

Kubernetes deployment

Dla produkcji warto użyć Kubernetes z Service objects:

YAML
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 apiVersion: apps/v1 kind: Deployment metadata: name: api-gateway spec: replicas: 2 selector: matchLabels: app: api-gateway template: metadata: labels: app: api-gateway spec: containers: - name: gateway image: api-gateway:latest ports: - containerPort: 8000 --- apiVersion: v1 kind: Service metadata: name: api-gateway spec: selector: app: api-gateway ports: - port: 80 targetPort: 8000 type: LoadBalancer

Monitoring i observability

W architekturze mikroserwisów trudno diagnozować błędy, dlatego potrzebne są narzędzia do zbierania logów i metryk. Gateway powinien być pierwszym miejscem, które zbiera dane o błędach, czasie odpowiedzi i obciążeniu mikroserwisów.

Polecane narzędzia

Prometheus + Grafana – metryki wydajności:

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 from prometheus_client import Counter, Histogram, generate_latest from fastapi import FastAPI, Response, Request import time request_count = Counter('gateway_requests_total', 'Total requests', ['service', 'method', 'status']) request_duration = Histogram('gateway_request_duration_seconds', 'Request duration', ['service']) @app.middleware("http") async def track_metrics(request: Request, call_next): start_time = time.time() response = await call_next(request) duration = time.time() - start_time service_name = request.url.path.split('/')[2]# /api/{service}/... request_count.labels( service=service_name, method=request.method, status=response.status_code ).inc() request_duration.labels(service=service_name).observe(duration) return response @app.get("/metrics") def metrics(): return Response(content=generate_latest(), media_type="text/plain")

ELK Stack (Elasticsearch + Logstash + Kibana) – scentralizowane logi:

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import logging import json from pythonjsonlogger import jsonlogger # Konfiguracja logowania do ELK log_handler = logging.StreamHandler() formatter = jsonlogger.JsonFormatter() log_handler.setFormatter(formatter) logger = logging.getLogger() logger.addHandler(log_handler) @app.middleware("http") async def log_requests(request: Request, call_next): logger.info("request", extra={ "method": request.method, "path": request.url.path, "ip": request.client.host }) response = await call_next(request) logger.info("response", extra={ "status_code": response.status_code, "path": request.url.path }) return response

Jaeger / OpenTelemetry – śledzenie przepływu requestów między serwisami (distributed tracing):

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 from opentelemetry import trace from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor from opentelemetry.exporter.jaeger import JaegerExporter from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor trace.set_tracer_provider(TracerProvider()) tracer = trace.get_tracer(__name__) jaeger_exporter = JaegerExporter( agent_host_name="jaeger", agent_port=6831, ) trace.get_tracer_provider().add_span_processor( BatchSpanProcessor(jaeger_exporter) ) FastAPIInstrumentor.instrument_app(app)

Distributed tracing pozwala śledzić request przez wszystkie serwisy, co jest kluczowe w debugowaniu problemów w rozproszonym systemie.

Zalety i wyzwania

Architektura z API Gateway ma swoje zalety i wyzwania. Ważne, aby rozumieć obie strony przed implementacją.

Zalety

  • Jedno miejsce dla logiki autoryzacji i routingu — łatwiejsze utrzymanie i aktualizacja.
  • Lepsze bezpieczeństwo i kontrola ruchu — centralne miejsce do implementacji zabezpieczeń.
  • Możliwość wprowadzania cache i limitów żądań — optymalizacja wydajności na poziomie gateway.
  • Skalowalność — gateway można replikować niezależnie od mikroserwisów.
  • Uproszczenie interfejsu klienta — klient nie musi znać struktury mikroserwisów.
  • Versioning API — łatwiejsze zarządzanie wersjami (np. /v1/, /v2/).

Wyzwania

  • Dodatkowa warstwa infrastruktury — może być single point of failure (rozwiązanie: replikacja gateway).
  • Konieczność wdrożenia monitoringu i health-checków — gateway musi monitorować stan mikroserwisów.
  • Potrzeba automatycznego wykrywania usług (service discovery) — bez tego trudno zarządzać dynamicznymi serwisami.
  • Potencjalne wąskie gardło — jeśli gateway nie jest skalowany, może stać się bottleneckiem.
  • Zwiększona złożoność — więcej komponentów do zarządzania i monitorowania.

Rozwiązanie problemu single point of failure

Aby uniknąć single point of failure, replikuj gateway:

YAML
1 2 3 4 5 6 7 # Kubernetes - wiele replik gateway z load balancerem apiVersion: apps/v1 kind: Deployment metadata: name: api-gateway spec: replicas: 3# Trzy instancje gateway

Podsumowanie

API Gateway to kluczowy komponent każdej architektury mikroserwisowej. Zarządza ruchem, bezpieczeństwem, autoryzacją i integracją serwisów, jednocześnie ukrywając złożoność systemu przed użytkownikiem końcowym.

Python — dzięki bibliotekom takim jak FastAPI, httpx, aiohttp — pozwala stworzyć nowoczesny gateway równie wydajny jak rozwiązania w Node.js czy Go. Asynchroniczność FastAPI sprawia, że gateway może obsługiwać tysiące równoczesnych połączeń.

Jeśli planujesz architekturę mikroserwisową:

  1. Zdefiniuj odpowiedzialność każdego serwisu — jasno określ granice domenowe.
  2. Wprowadź API Gateway jako punkt wejścia — zacznij od prostego routingu, stopniowo dodawaj funkcjonalności.
  3. Zadbaj o monitoring, service discovery i bezpieczeństwo — to kluczowe elementy, bez których system będzie trudny w utrzymaniu.
  4. Replikuj gateway — unikaj single point of failure.
  5. Mierz wydajność — monitoruj czasy odpowiedzi, błędy i obciążenie każdego serwisu.

Pamiętaj, że API Gateway to narzędzie — użyj go tam, gdzie ma sens. Dla małych systemów może być niepotrzebną komplikacją, ale dla większych, rozproszonych systemów jest praktycznie niezbędny.