Czym jest API Gateway?
API Gateway to pojedynczy punkt wejścia do systemu mikroserwisów. Pełni rolę reverse proxy, który:
- kieruje żądania do właściwych mikroserwisów,
- agreguje odpowiedzi z wielu serwisów w jedną odpowiedź,
- zarządza autoryzacją i uwierzytelnianiem,
- implementuje rate limiting i throttling,
- przekształca protokoły komunikacji (np. REST na gRPC),
- obsługuje load balancing między instancjami serwisu,
- agreguje logi i metryki z wszystkich serwisów.
Dzięki API Gateway klient nie musi wiedzieć o istnieniu wielu mikroserwisów — komunikuje się tylko z gateway'em, który zajmuje się resztą.
Dlaczego API Gateway jest potrzebny?
Bez API Gateway:
- klient musi znać adresy wszystkich mikroserwisów,
- każdy mikroserwis musi implementować autoryzację osobno,
- trudno zarządzać versioning i breaking changes,
- brak centralnego miejsca do monitorowania i logowania.
Z API Gateway:
- klient komunikuje się tylko z jednym endpointem,
- autoryzacja w jednym miejscu,
- łatwiejsze zarządzanie wersjami API,
- centralne logowanie i monitoring.
Implementacja API Gateway w Pythonie
W Pythonie możesz zbudować API Gateway używając FastAPI, Flask czy Django. FastAPI jest szczególnie dobrym wyborem ze względu na natywną obsługę asynchroniczności i łatwą integrację z innymi serwisami.
Podstawowy API Gateway z FastAPI
Python1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47from fastapi import FastAPI, HTTPException, Header, Request import httpx import asyncio app = FastAPI(title="API Gateway") # Konfiguracja serwisów SERVICES = { "users": "http://user-service:8000", "orders": "http://order-service:8000", "payments": "http://payment-service:8000", "products": "http://product-service:8000", } @app.get("/api/users/{user_id}") async def get_user(user_id: int): async with httpx.AsyncClient() as client: response = await client.get(f"{SERVICES['users']}/users/{user_id}") if response.status_code == 200: return response.json() raise HTTPException(status_code=response.status_code, detail=response.text) @app.get("/api/orders/{order_id}") async def get_order(order_id: int): async with httpx.AsyncClient() as client: # Pobierz zamówienie order_response = await client.get(f"{SERVICES['orders']}/orders/{order_id}") if order_response.status_code != 200: raise HTTPException(status_code=404, detail="Order not found") order = order_response.json() # Agregacja danych z wielu serwisów import asyncio user_task = client.get(f"{SERVICES['users']}/users/{order['user_id']}") product_tasks = [ client.get(f"{SERVICES['products']}/products/{pid}") for pid in order.get('product_ids', []) ] user, *products = await asyncio.gather(user_task, *product_tasks) return { "order": order, "user": user.json() if user.status_code == 200 else None, "products": [p.json() for p in products if p.status_code == 200] }
Routing z regex i wildcards
Możesz użyć bardziej zaawansowanego routingu z FastAPI:
Python1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18from fastapi import FastAPI, Request from starlette.routing import Route, Mount app = FastAPI() def proxy_request(request: Request, service_name: str): """Proxy request do odpowiedniego mikroserwisu""" service_url = SERVICES.get(service_name) if not service_url: raise HTTPException(status_code=404, detail=f"Service {service_name} not found") # Przekieruj request do mikroserwisu return httpx.AsyncClient().request( method=request.method, url=f"{service_url}{request.url.path}", headers=dict(request.headers), params=dict(request.query_params) )
Autoryzacja i uwierzytelnianie w Gateway
Jedną z najważniejszych funkcji API Gateway jest centralna autoryzacja. Dzięki temu logika autoryzacji nie musi być powielana w każdym mikroserwisie.
Middleware autoryzacji
Python1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34from fastapi import FastAPI, HTTPException, Depends, Header from functools import wraps import jwt app = FastAPI() SECRET_KEY = "your-secret-key" async def verify_token(authorization: str = Header(None)): """Weryfikacja tokena JWT""" if not authorization: raise HTTPException(status_code=401, detail="Missing authorization header") try: token = authorization.replace("Bearer ", "") payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"]) return payload except jwt.ExpiredSignatureError: raise HTTPException(status_code=401, detail="Token expired") except jwt.InvalidTokenError: raise HTTPException(status_code=401, detail="Invalid token") @app.get("/api/users/{user_id}") async def get_user(user_id: int, token_data: dict = Depends(verify_token)): # Token jest już zweryfikowany, przekaż user_id do mikroserwisu user_id_from_token = token_data.get("user_id") async with httpx.AsyncClient() as client: headers = {"X-User-ID": str(user_id_from_token)} response = await client.get( f"{SERVICES['users']}/users/{user_id}", headers=headers ) return response.json()
W ten sposób logika autoryzacji nie musi być powielana w każdym mikroserwisie. Gateway weryfikuje token i przekazuje informacje o użytkowniku do mikroserwisów w nagłówkach.
Rate limiting
Możesz również dodać rate limiting na poziomie gateway:
Python1 2 3 4 5 6 7 8 9 10from slowapi import Limiter, _rate_limit_exceeded_handler from slowapi.util import get_remote_address limiter = Limiter(key_func=get_remote_address) app.state.limiter = limiter @app.get("/api/users/{user_id}") @limiter.limit("10/minute") async def get_user(request: Request, user_id: int): ...
Komunikacja między mikroserwisami
API Gateway może działać synchronicznie (REST/gRPC) lub asynchronicznie (kolejki). Wybór metody komunikacji zależy od wymagań: czy potrzebujesz natychmiastowej odpowiedzi, czy operacja może być asynchroniczna.
REST API
Prosty, czytelny i łatwy w debugowaniu. Działa w oparciu o HTTP i JSON. Dobry do systemów o niskiej złożoności.
Zalety:
- Łatwy w implementacji i debugowaniu
- Działa w każdej technologii
- Czytelny format (JSON)
- Dobrze wspierany przez narzędzia
Wady:
- Mniej wydajny niż gRPC
- Większy overhead protokołu
gRPC
Binarny protokół oparty o HTTP/2. Wysoka wydajność, automatyczne generowanie kodu klienta/serwera. Idealny dla komunikacji między serwisami w środowisku o dużej liczbie wywołań.
Przykład użycia gRPC w Gateway:
Python1 2 3 4 5 6 7 8 9 10import grpc from grpc import aio @app.get("/api/users/{user_id}") async def get_user(user_id: int): async with grpc.aio.insecure_channel('user-service:50051') as channel: stub = UserServiceStub(channel) request = UserRequest(user_id=user_id) response = await stub.GetUser(request) return {"id": response.id, "name": response.name}
Message Queues
Kolejki wiadomości (np. RabbitMQ, Kafka) pozwalają na asynchroniczną komunikację. Idealne dla systemów event-driven: jeden mikroserwis wysyła zdarzenie, inne reagują. Umożliwia niezależność czasową między serwisami.
Przykład integracji z RabbitMQ:
Python1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24import pika import json async def publish_event(event_type: str, data: dict): connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq')) channel = connection.channel() channel.queue_declare(queue=event_type) channel.basic_publish( exchange='', routing_key=event_type, body=json.dumps(data) ) connection.close() @app.post("/api/orders") async def create_order(order: OrderCreate): # Zapisz zamówienie order_response = await save_order(order) # Wyślij zdarzenie asynchronicznie await publish_event("order_created", {"order_id": order_response["id"]}) return order_response
Service Discovery
W systemie z wieloma mikroserwisami dynamicznie uruchamianymi w kontenerach (np. Docker, Kubernetes) trudno zarządzać adresami każdego serwisu. Service Discovery rozwiązuje ten problem — pozwala mikroserwisom automatycznie się odnajdywać.
Popularne rozwiązania
Consul – rejestr usług i zdrowia serwisów. Pozwala na dynamiczne rejestrowanie i wykrywanie serwisów:
Python1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16import consul c = consul.Consul() # Rejestracja serwisu c.agent.service.register( 'user-service', service_id='user-service-1', address='user-service', port=8000, check=consul.Check.tcp('user-service', 8000, "10s") ) # Odnalezienie serwisu services = c.health.service('user-service', passing=True)[1] service_address = f"{services[0]['Service']['Address']}:{services[0]['Service']['Port']}"
Kubernetes Service Registry – automatyczny DNS dla mikroserwisów. W Kubernetes każdy serwis ma własny DNS:
Python1 2# W Kubernetes możesz używać nazw serwisów jako hostnames user_service_url = "http://user-service.default.svc.cluster.local:8000"
etcd / Zookeeper – rozproszone przechowywanie metadanych usług.
Dzięki Service Discovery gateway nie musi mieć statycznej listy adresów — może pytać rejestr o aktualne lokalizacje mikroserwisów.
Dynamiczny routing z Service Discovery
Python1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21from fastapi import FastAPI import consul app = FastAPI() c = consul.Consul() def get_service_url(service_name: str) -> str: """Pobierz URL serwisu z Consul""" services = c.health.service(service_name, passing=True)[1] if not services: raise HTTPException(status_code=503, detail=f"Service {service_name} unavailable") service = services[0]['Service'] return f"http://{service['Address']}:{service['Port']}" @app.get("/api/{service_name}/{path:path}") async def proxy_request(service_name: str, path: str): service_url = get_service_url(service_name) async with httpx.AsyncClient() as client: response = await client.get(f"{service_url}/{path}") return response.json()
Deployment w architekturze mikroserwisów
Każdy mikroserwis oraz gateway działa w osobnym kontenerze. Najczęściej stosuje się Docker Compose dla developmentu lub Kubernetes dla produkcji.
Przykład docker-compose.yml
YAML1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71version: "3.9" services: gateway: build: ./gateway ports: - "80:8000" environment: - CONSUL_HOST=consul - REDIS_HOST=redis depends_on: - consul - redis user-service: build: ./user-service ports: - "8001:8000" environment: - DATABASE_URL=postgresql://user:pass@user-db:5432/users depends_on: - user-db order-service: build: ./order-service ports: - "8002:8000" environment: - DATABASE_URL=postgresql://user:pass@order-db:5432/orders - USER_SERVICE_URL=http://user-service:8000 depends_on: - order-db - user-service payment-service: build: ./payment-service ports: - "8003:8000" depends_on: - payment-db consul: image: consul:latest ports: - "8500:8500" command: consul agent -dev -client=0.0.0.0 redis: image: redis:7 user-db: image: postgres:15 environment: POSTGRES_DB: users POSTGRES_USER: user POSTGRES_PASSWORD: pass volumes: - user_data:/var/lib/postgresql/data order-db: image: postgres:15 environment: POSTGRES_DB: orders POSTGRES_USER: user POSTGRES_PASSWORD: pass volumes: - order_data:/var/lib/postgresql/data volumes: user_data: order_data:
Gateway nasłuchuje na porcie 80 i przekierowuje ruch do odpowiednich mikroserwisów. Komunikacja między kontenerami odbywa się po nazwach hostów w sieci Dockera.
Kubernetes deployment
Dla produkcji warto użyć Kubernetes z Service objects:
YAML1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31apiVersion: apps/v1 kind: Deployment metadata: name: api-gateway spec: replicas: 2 selector: matchLabels: app: api-gateway template: metadata: labels: app: api-gateway spec: containers: - name: gateway image: api-gateway:latest ports: - containerPort: 8000 --- apiVersion: v1 kind: Service metadata: name: api-gateway spec: selector: app: api-gateway ports: - port: 80 targetPort: 8000 type: LoadBalancer
Monitoring i observability
W architekturze mikroserwisów trudno diagnozować błędy, dlatego potrzebne są narzędzia do zbierania logów i metryk. Gateway powinien być pierwszym miejscem, które zbiera dane o błędach, czasie odpowiedzi i obciążeniu mikroserwisów.
Polecane narzędzia
Prometheus + Grafana – metryki wydajności:
Python1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26from prometheus_client import Counter, Histogram, generate_latest from fastapi import FastAPI, Response, Request import time request_count = Counter('gateway_requests_total', 'Total requests', ['service', 'method', 'status']) request_duration = Histogram('gateway_request_duration_seconds', 'Request duration', ['service']) @app.middleware("http") async def track_metrics(request: Request, call_next): start_time = time.time() response = await call_next(request) duration = time.time() - start_time service_name = request.url.path.split('/')[2]# /api/{service}/... request_count.labels( service=service_name, method=request.method, status=response.status_code ).inc() request_duration.labels(service=service_name).observe(duration) return response @app.get("/metrics") def metrics(): return Response(content=generate_latest(), media_type="text/plain")
ELK Stack (Elasticsearch + Logstash + Kibana) – scentralizowane logi:
Python1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24import logging import json from pythonjsonlogger import jsonlogger # Konfiguracja logowania do ELK log_handler = logging.StreamHandler() formatter = jsonlogger.JsonFormatter() log_handler.setFormatter(formatter) logger = logging.getLogger() logger.addHandler(log_handler) @app.middleware("http") async def log_requests(request: Request, call_next): logger.info("request", extra={ "method": request.method, "path": request.url.path, "ip": request.client.host }) response = await call_next(request) logger.info("response", extra={ "status_code": response.status_code, "path": request.url.path }) return response
Jaeger / OpenTelemetry – śledzenie przepływu requestów między serwisami (distributed tracing):
Python1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19from opentelemetry import trace from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor from opentelemetry.exporter.jaeger import JaegerExporter from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor trace.set_tracer_provider(TracerProvider()) tracer = trace.get_tracer(__name__) jaeger_exporter = JaegerExporter( agent_host_name="jaeger", agent_port=6831, ) trace.get_tracer_provider().add_span_processor( BatchSpanProcessor(jaeger_exporter) ) FastAPIInstrumentor.instrument_app(app)
Distributed tracing pozwala śledzić request przez wszystkie serwisy, co jest kluczowe w debugowaniu problemów w rozproszonym systemie.
Zalety i wyzwania
Architektura z API Gateway ma swoje zalety i wyzwania. Ważne, aby rozumieć obie strony przed implementacją.
Zalety
- Jedno miejsce dla logiki autoryzacji i routingu — łatwiejsze utrzymanie i aktualizacja.
- Lepsze bezpieczeństwo i kontrola ruchu — centralne miejsce do implementacji zabezpieczeń.
- Możliwość wprowadzania cache i limitów żądań — optymalizacja wydajności na poziomie gateway.
- Skalowalność — gateway można replikować niezależnie od mikroserwisów.
- Uproszczenie interfejsu klienta — klient nie musi znać struktury mikroserwisów.
- Versioning API — łatwiejsze zarządzanie wersjami (np.
/v1/,/v2/).
Wyzwania
- Dodatkowa warstwa infrastruktury — może być single point of failure (rozwiązanie: replikacja gateway).
- Konieczność wdrożenia monitoringu i health-checków — gateway musi monitorować stan mikroserwisów.
- Potrzeba automatycznego wykrywania usług (service discovery) — bez tego trudno zarządzać dynamicznymi serwisami.
- Potencjalne wąskie gardło — jeśli gateway nie jest skalowany, może stać się bottleneckiem.
- Zwiększona złożoność — więcej komponentów do zarządzania i monitorowania.
Rozwiązanie problemu single point of failure
Aby uniknąć single point of failure, replikuj gateway:
YAML1 2 3 4 5 6 7# Kubernetes - wiele replik gateway z load balancerem apiVersion: apps/v1 kind: Deployment metadata: name: api-gateway spec: replicas: 3# Trzy instancje gateway
Podsumowanie
API Gateway to kluczowy komponent każdej architektury mikroserwisowej. Zarządza ruchem, bezpieczeństwem, autoryzacją i integracją serwisów, jednocześnie ukrywając złożoność systemu przed użytkownikiem końcowym.
Python — dzięki bibliotekom takim jak FastAPI, httpx, aiohttp — pozwala stworzyć nowoczesny gateway równie wydajny jak rozwiązania w Node.js czy Go. Asynchroniczność FastAPI sprawia, że gateway może obsługiwać tysiące równoczesnych połączeń.
Jeśli planujesz architekturę mikroserwisową:
- Zdefiniuj odpowiedzialność każdego serwisu — jasno określ granice domenowe.
- Wprowadź API Gateway jako punkt wejścia — zacznij od prostego routingu, stopniowo dodawaj funkcjonalności.
- Zadbaj o monitoring, service discovery i bezpieczeństwo — to kluczowe elementy, bez których system będzie trudny w utrzymaniu.
- Replikuj gateway — unikaj single point of failure.
- Mierz wydajność — monitoruj czasy odpowiedzi, błędy i obciążenie każdego serwisu.
Pamiętaj, że API Gateway to narzędzie — użyj go tam, gdzie ma sens. Dla małych systemów może być niepotrzebną komplikacją, ale dla większych, rozproszonych systemów jest praktycznie niezbędny.



