Mikroserwisy w Pythonie – architektura i integracja

Kacper Sieradziński
Kacper Sieradziński
22 maja 2025Edukacja6 min czytania

Architektura mikroserwisów to sposób budowania aplikacji jako zestawu małych, niezależnych serwisów, z których każdy odpowiada za określoną część logiki biznesowej. W przeciwieństwie do klasycznych monolitów, mikroserwisy są luźno powiązane, mają własne bazy danych, mogą być wdrażane niezależnie i rozwijane w różnych technologiach. Python, dzięki frameworkom takim jak FastAPI, Flask czy Django REST Framework, jest jednym z najczęściej wybieranych języków do implementacji mikroserwisów. W tym przewodniku poznasz, jak projektować i implementować mikroserwisy w Pythonie, jak organizować komunikację między serwisami oraz jak zarządzać ich wdrożeniem i monitoringiem.

Obraz główny Mikroserwisy w Pythonie – architektura i integracja

Czym są mikroserwisy?

W klasycznym podejściu monolitycznym wszystko — logika biznesowa, API, baza danych, UI — działa w jednym projekcie. Mikroserwisy dzielą ten system na mniejsze, samodzielne aplikacje, które komunikują się przez sieć.

Charakterystyka mikroserwisów

Każdy mikroserwis:

  • ma własny zakres odpowiedzialności (np. użytkownicy, płatności, powiadomienia),
  • może być wdrażany i aktualizowany niezależnie,
  • ma własną bazę danych i logikę biznesową,
  • komunikuje się przez API lub kolejki komunikatów.

Przykład: aplikacja e-commerce może składać się z mikroserwisów:

  • user-service — zarządzanie użytkownikami i autoryzacją
  • product-service — katalog produktów
  • order-service — procesowanie zamówień
  • payment-service — obsługa płatności
  • notification-service — wysyłanie e-maili i powiadomień

Mikroserwisy vs monolit

AspektMonolitMikroserwisy
WdrożenieCałość razemNiezależne serwisy
SkalowanieSkaluj całośćSkaluj pojedyncze serwisy
TechnologiaJedna technologiaRóżne technologie możliwe
ZłożonośćNiższa na początkuWyższa infrastruktura
NiezawodnośćAwaria = cała aplikacjaIzolacja błędów

Mikroserwisy w Pythonie – frameworki

Python ma kilka narzędzi do budowy mikroserwisów, każdy z innymi zaletami:

FrameworkZastosowanieZalety
FastAPIREST/gRPC API, wysokowydajne serwisySzybkość, typowanie, async/await, automatyczna dokumentacja
FlaskProste serwisy lub gatewayeMinimalizm i elastyczność, łatwa konfiguracja
Django + DRFDuże aplikacje APIORM, autoryzacja, gotowe komponenty, ecosystem

Przykład mikroserwisu z FastAPI

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 from fastapi import FastAPI, HTTPException from pydantic import BaseModel from typing import List app = FastAPI(title="User Service", version="1.0.0") class User(BaseModel): id: int name: str email: str class UserCreate(BaseModel): name: str email: str # Symulowana baza danych users_db = [] @app.get("/users/{user_id}", response_model=User) def get_user(user_id: int): user = next((u for u in users_db if u["id"] == user_id), None) if not user: raise HTTPException(status_code=404, detail="User not found") return user @app.post("/users", response_model=User) def create_user(user: UserCreate): new_user = { "id": len(users_db) + 1, "name": user.name, "email": user.email } users_db.append(new_user) return new_user @app.get("/users", response_model=List[User]) def list_users(): return users_db

Ten kod może działać jako osobny serwis np. user-service, obsługiwany przez własny kontener Dockera. FastAPI automatycznie generuje dokumentację OpenAPI pod /docs.

Przykład mikroserwisu z Flask

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 from flask import Flask, jsonify, request from flask_sqlalchemy import SQLAlchemy app = Flask(__name__) app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://localhost/users' db = SQLAlchemy(app) class User(db.Model): id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(100)) email = db.Column(db.String(100), unique=True) @app.route('/users/<int:user_id>', methods=['GET']) def get_user(user_id): user = User.query.get_or_404(user_id) return jsonify({'id': user.id, 'name': user.name, 'email': user.email})

Komunikacja między mikroserwisami

Mikroserwisy komunikują się na kilka sposobów, w zależności od charakteru projektu. Wybór metody komunikacji zależy od wymagań: czy komunikacja jest synchroniczna czy asynchroniczna, jak często komunikują się serwisy i jakie są wymagania dotyczące wydajności.

1. REST API

Najprostszy i najczęstszy sposób — komunikacja przez HTTP. Zalety:

  • łatwy w implementacji,
  • działa w każdej technologii,
  • dobry dla synchronicznych zapytań (np. pobranie danych użytkownika).

Przykład wywołania REST API między serwisami:

Python
1 2 3 4 5 6 7 8 import httpx async def get_user_from_service(user_id: int): async with httpx.AsyncClient() as client: response = await client.get(f"http://user-service:8000/users/{user_id}") if response.status_code == 200: return response.json() return None

2. gRPC

Nowoczesny protokół RPC oparty na HTTP/2 i protobuf. Bardziej wydajny i binarny niż REST. Idealny dla mikroserwisów wymagających dużej liczby połączeń o niskim opóźnieniu.

Instalacja:

Bash
1 pip install grpcio grpcio-tools

Przykład definicji .proto:

PROTOBUF
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 syntax = "proto3"; service UserService { rpc GetUser (UserRequest) returns (UserResponse); } message UserRequest { int32 user_id = 1; } message UserResponse { int32 id = 1; string name = 2; string email = 3; }

Implementacja w FastAPI:

Python
1 2 3 4 5 6 7 8 9 10 11 12 from fastapi import FastAPI import grpc app = FastAPI() @app.get("/users/{user_id}") async def get_user(user_id: int): # Wywołanie gRPC with grpc.insecure_channel('user-service:50051') as channel: stub = UserServiceStub(channel) response = stub.GetUser(UserRequest(user_id=user_id)) return {"id": response.id, "name": response.name}

3. Kolejki wiadomości (Message Queues)

Asynchroniczna komunikacja między serwisami przez systemy takie jak RabbitMQ, Apache Kafka czy Redis Streams. Idealne dla operacji, które nie wymagają natychmiastowej odpowiedzi.

Przykład: serwis zamówień wysyła wiadomość "order_created", którą odbiera serwis płatności.

Instalacja:

Bash
1 2 3 pip install pika# dla RabbitMQ # lub pip install kafka-python# dla Kafka

Przykład z RabbitMQ (producent):

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import pika import json connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) channel = connection.channel() channel.queue_declare(queue="orders") message = {"order_id": 123, "user_id": 456, "status": "created"} channel.basic_publish( exchange="", routing_key="orders", body=json.dumps(message) ) connection.close()

Konsument (payment-service):

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import pika import json def process_order(ch, method, properties, body): order = json.loads(body) print(f"Processing payment for order {order['order_id']}") # Logika płatności ch.basic_ack(delivery_tag=method.delivery_tag) connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) channel = connection.channel() channel.queue_declare(queue="orders") channel.basic_consume(queue="orders", on_message_callback=process_order) channel.start_consuming()

Przykład z Redis Streams:

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import redis import json redis_client = redis.Redis(host='localhost', port=6379, db=0) # Producent redis_client.xadd('orders', {'order_id': 123, 'status': 'created'}) # Konsument while True: messages = redis_client.xread({'orders': '$'}, block=1000) for stream, msgs in messages: for msg_id, data in msgs: order = json.loads(data[b'order_id']) # Przetwórz zamówienie

Bazy danych w mikroserwisach

Każdy mikroserwis powinien mieć własną bazę danych. Współdzielenie jednej bazy między serwisami to najczęstszy błąd przy wdrażaniu tej architektury — prowadzi do tight coupling i utraty niezależności serwisów.

Database per Service

Najczęstsze podejścia:

  • Serwis użytkowników – PostgreSQL (relacyjna baza, ACID)
  • Serwis płatności – PostgreSQL lub MySQL (transakcje, spójność)
  • Serwis analizy – MongoDB lub ClickHouse (duże wolumeny danych, analityka)
  • Serwis cache – Redis (szybki dostęp do danych)

Komunikacja między serwisami odbywa się na poziomie API, nie bazy. Jeśli jeden serwis potrzebuje danych z drugiego, wykonuje wywołanie API, a nie bezpośrednie zapytanie SQL.

Przykład izolacji danych

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 # user-service - własna baza PostgreSQL # order-service - własna baza PostgreSQL (nie może bezpośrednio odczytać danych z user-service) # order-service potrzebuje danych użytkownika import httpx async def get_order_with_user(order_id: int): # Pobierz zamówienie z własnej bazy order = await Order.query.get(order_id) # Pobierz dane użytkownika przez API async with httpx.AsyncClient() as client: user_response = await client.get(f"http://user-service/users/{order.user_id}") user = user_response.json() return {"order": order, "user": user}

API Gateway – centralny punkt dostępu

W systemach z wieloma mikroserwisami użytkownik nie powinien komunikować się z nimi bezpośrednio. Dlatego wprowadza się API Gateway, który:

  • kieruje ruch do właściwego mikroserwisu,
  • agreguje odpowiedzi z wielu serwisów,
  • zarządza autoryzacją i limitami,
  • obsługuje load balancing,
  • agreguje metryki i logi.

API Gateway w Pythonie

Przykład z FastAPI:

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 from fastapi import FastAPI, HTTPException import httpx app = FastAPI() SERVICES = { "users": "http://user-service:8000", "orders": "http://order-service:8000", "payments": "http://payment-service:8000", } @app.get("/api/users/{user_id}") async def get_user(user_id: int): async with httpx.AsyncClient() as client: response = await client.get(f"{SERVICES['users']}/users/{user_id}") if response.status_code == 200: return response.json() raise HTTPException(status_code=response.status_code) @app.get("/api/orders/{order_id}") async def get_order(order_id: int): async with httpx.AsyncClient() as client: # Agregacja danych z wielu serwisów order_response = await client.get(f"{SERVICES['orders']}/orders/{order_id}") if order_response.status_code != 200: raise HTTPException(status_code=404) order = order_response.json() user_response = await client.get(f"{SERVICES['users']}/users/{order['user_id']}") user = user_response.json() if user_response.status_code == 200 else None return {"order": order, "user": user}

W warstwie infrastruktury możesz użyć Nginx + Traefik lub specjalistycznych rozwiązań jak Kong, AWS API Gateway czy Google Cloud Endpoints.

Deployment i skalowanie

Każdy mikroserwis powinien być wdrażany osobno. Standardem są kontenery Dockera i orkiestracja w Kubernetesie. Docker zapewnia izolację i spójność środowisk, a Kubernetes automatyczne skalowanie i zarządzanie kontenerami.

Przykładowy Dockerfile

Dockerfile
1 2 3 4 5 6 7 8 9 10 FROM python:3.12-slim WORKDIR /app COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt COPY . . CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

Docker Compose (dla kilku mikroserwisów)

YAML
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 version: "3.9" services: user-service: build: ./user-service ports: - "8001:8000" environment: - DATABASE_URL=postgresql://user:pass@user-db:5432/users depends_on: - user-db - redis order-service: build: ./order-service ports: - "8002:8000" environment: - DATABASE_URL=postgresql://user:pass@order-db:5432/orders - USER_SERVICE_URL=http://user-service:8000 depends_on: - order-db - user-service payment-service: build: ./payment-service ports: - "8003:8000" depends_on: - payment-db gateway: build: ./gateway ports: - "80:8000" depends_on: - user-service - order-service - payment-service user-db: image: postgres:15 environment: POSTGRES_DB: users POSTGRES_USER: user POSTGRES_PASSWORD: pass volumes: - user_data:/var/lib/postgresql/data order-db: image: postgres:15 environment: POSTGRES_DB: orders POSTGRES_USER: user POSTGRES_PASSWORD: pass volumes: - order_data:/var/lib/postgresql/data redis: image: redis:7 rabbitmq: image: rabbitmq:3-management ports: - "5672:5672" - "15672:15672" volumes: user_data: order_data:

Każdy serwis działa w osobnym kontenerze, a komunikacja odbywa się po nazwach hostów w sieci Dockera (np. http://user-service:8000).

Kubernetes deployment

Dla produkcji warto użyć Kubernetes do automatycznego skalowania:

YAML
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 apiVersion: apps/v1 kind: Deployment metadata: name: user-service spec: replicas: 3 selector: matchLabels: app: user-service template: metadata: labels: app: user-service spec: containers: - name: user-service image: user-service:latest ports: - containerPort: 8000 env: - name: DATABASE_URL valueFrom: secretKeyRef: name: db-secret key: url --- apiVersion: v1 kind: Service metadata: name: user-service spec: selector: app: user-service ports: - port: 80 targetPort: 8000

Monitoring i logowanie

Przy wielu mikroserwisach kluczowe jest centralne logowanie i obserwowalność. Bez odpowiedniego monitoringu trudno zdiagnozować problemy w rozproszonym systemie.

Narzędzia monitoringu

Prometheus + Grafana – metryki wydajności:

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 # Eksport metryk z FastAPI from prometheus_client import Counter, Histogram, generate_latest from fastapi import FastAPI, Response request_count = Counter('http_requests_total', 'Total requests') request_duration = Histogram('http_request_duration_seconds', 'Request duration') app = FastAPI() @app.middleware("http") async def track_metrics(request, call_next): start_time = time.time() response = await call_next(request) duration = time.time() - start_time request_count.inc() request_duration.observe(duration) return response @app.get("/metrics") def metrics(): return Response(content=generate_latest(), media_type="text/plain")

Elastic Stack (ELK) – logi z wielu mikroserwisów w jednym miejscu:

  • Elasticsearch – przechowywanie i wyszukiwanie logów
  • Logstash – zbieranie i przetwarzanie logów
  • Kibana – wizualizacja logów

Sentry – błędy i wyjątki w czasie rzeczywistym (automatycznie raportuje błędy z każdego serwisu).

OpenTelemetry – śledzenie requestów między serwisami (tracing):

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 from opentelemetry import trace from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import ConsoleSpanExporter, BatchSpanProcessor trace.set_tracer_provider(TracerProvider()) tracer = trace.get_tracer(__name__) @app.get("/users/{user_id}") async def get_user(user_id: int): with tracer.start_as_current_span("get_user"): # Wywołanie innego serwisu z automatycznym tracingiem async with httpx.AsyncClient() as client: response = await client.get(f"http://order-service/orders?user_id={user_id}") return response.json()

Dzięki temu wiesz dokładnie, który mikroserwis zawiódł i dlaczego. Distributed tracing pozwala śledzić request przez wszystkie serwisy, co jest kluczowe w debugowaniu problemów.

Zalety i wady mikroserwisów

Architektura mikroserwisów ma swoje zalety i wyzwania. Ważne, aby rozumieć obie strony przed podjęciem decyzji o przejściu na tę architekturę.

Zalety

  • Niezależne wdrażanie i rozwój — każdy serwis może być rozwijany przez inny zespół i wdrażany niezależnie.
  • Łatwiejsze skalowanie — możesz skalować tylko te serwisy, które tego wymagają (np. user-service ma większy ruch, więc skaluj tylko go).
  • Elastyczność technologiczna — każdy serwis może używać innej technologii (np. Python dla API, Node.js dla real-time, Go dla wydajności).
  • Większa odporność na błędy — awaria jednego serwisu nie zatrzymuje całości (o ile odpowiednio zaprojektujesz fallbacki).
  • Zespoły mogą pracować niezależnie — mniejsze konflikty w kodzie i szybszy development.

Wyzwania

  • Większa złożoność infrastruktury — potrzeba więcej narzędzi: orchestracja, service discovery, monitoring, logowanie.
  • Trudniejsze debugowanie i testowanie — request może przechodzić przez wiele serwisów, trudniej zreprodukować błąd.
  • Konieczność automatyzacji deploymentu (CI/CD) — ręczne wdrażanie wielu serwisów jest niepraktyczne.
  • Potrzeba monitoringu rozproszonego — trzeba śledzić wiele serwisów jednocześnie.
  • Komunikacja sieciowa — opóźnienia sieci, konieczność obsługi błędów komunikacji.
  • Zarządzanie transakcjami rozproszonymi — trudniejsze niż w monolicie (trzeba użyć patternów jak Saga).

Podsumowanie

Architektura mikroserwisów to potężne, ale wymagające podejście. Python doskonale się w nim sprawdza dzięki swojej prostocie, bogatym bibliotekom i elastyczności frameworków. Mikroserwisy nie są rozwiązaniem dla każdego projektu — dla małych aplikacji monolit może być lepszym wyborem.

Jeśli zaczynasz z mikroserwisami:

  1. Zacznij od podziału monolitu na logiczne moduły — nie dziel na mikroserwisy zbyt wcześnie, najpierw zidentyfikuj granice domenowe.
  2. Zadbaj o komunikację (REST, gRPC, RabbitMQ) — wybierz odpowiednią metodę dla każdego przypadku użycia.
  3. Wprowadź automatyczne wdrożenia i monitoring — CI/CD i obserwowalność są kluczowe.
  4. Nie komplikuj — mikroserwisy mają rozwiązywać problemy, nie je tworzyć. Zacznij od prostych rozwiązań i ewoluuj w miarę potrzeb.

Pamiętaj: mikroserwisy to narzędzie, a nie cel sam w sobie. Jeśli monolit spełnia Twoje potrzeby, nie ma powodu go dzielić. Mikroserwisy mają sens, gdy masz problemy ze skalowaniem, potrzebujesz niezależnych zespołów lub różne części aplikacji mają różne wymagania techniczne.