Cykl: Flask vs FastAPI: Porównanie frameworków webowych w Pythonie · Część 19/26

Mikroserwisy w Pythonie – architektura i integracja

Kacper Sieradziński
Kacper Sieradziński22 maja 2025 · 6 min czytania
Streszczenie
  • Czym są mikroserwisy?
  • Charakterystyka mikroserwisów
  • Mikroserwisy vs monolit
  • Mikroserwisy w Pythonie – frameworki
Mikroserwisy w Pythonie – architektura i integracja

Architektura mikroserwisów to sposób budowania aplikacji jako zestawu małych, niezależnych serwisów, z których każdy odpowiada za określoną część logiki biznesowej. W przeciwieństwie do klasycznych monolitów, mikroserwisy są luźno powiązane, mają własne bazy danych, mogą być wdrażane niezależnie i rozwijane w różnych technologiach. Python, dzięki frameworkom takim jak FastAPI, Flask czy Django REST Framework, jest jednym z najczęściej wybieranych języków do implementacji mikroserwisów. W tym przewodniku poznasz, jak projektować i implementować mikroserwisy w Pythonie, jak organizować komunikację między serwisami oraz jak zarządzać ich wdrożeniem i monitoringiem.

Czym są mikroserwisy?

W klasycznym podejściu monolitycznym wszystko — logika biznesowa, API, baza danych, UI — działa w jednym projekcie. Mikroserwisy dzielą ten system na mniejsze, samodzielne aplikacje, które komunikują się przez sieć.

Charakterystyka mikroserwisów

Każdy mikroserwis:

  • ma własny zakres odpowiedzialności (np. użytkownicy, płatności, powiadomienia),
  • może być wdrażany i aktualizowany niezależnie,
  • ma własną bazę danych i logikę biznesową,
  • komunikuje się przez API lub kolejki komunikatów.

Przykład: aplikacja e-commerce może składać się z mikroserwisów:

  • user-service — zarządzanie użytkownikami i autoryzacją
  • product-service — katalog produktów
  • order-service — procesowanie zamówień
  • payment-service — obsługa płatności
  • notification-service — wysyłanie e-maili i powiadomień

Mikroserwisy vs monolit

AspektMonolitMikroserwisy
WdrożenieCałość razemNiezależne serwisy
SkalowanieSkaluj całośćSkaluj pojedyncze serwisy
TechnologiaJedna technologiaRóżne technologie możliwe
ZłożonośćNiższa na początkuWyższa infrastruktura
NiezawodnośćAwaria = cała aplikacjaIzolacja błędów

Mikroserwisy w Pythonie – frameworki

Python ma kilka narzędzi do budowy mikroserwisów, każdy z innymi zaletami:

FrameworkZastosowanieZalety
FastAPIREST/gRPC API, wysokowydajne serwisySzybkość, typowanie, async/await, automatyczna dokumentacja
FlaskProste serwisy lub gatewayeMinimalizm i elastyczność, łatwa konfiguracja
Django + DRFDuże aplikacje APIORM, autoryzacja, gotowe komponenty, ecosystem

Przykład mikroserwisu z FastAPI

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 from fastapi import FastAPI, HTTPException from pydantic import BaseModel from typing import List app = FastAPI(title="User Service", version="1.0.0") class User(BaseModel): id: int name: str email: str class UserCreate(BaseModel): name: str email: str # Symulowana baza danych users_db = [] @app.get("/users/{user_id}", response_model=User) def get_user(user_id: int): user = next((u for u in users_db if u["id"] == user_id), None) if not user: raise HTTPException(status_code=404, detail="User not found") return user @app.post("/users", response_model=User) def create_user(user: UserCreate): new_user = { "id": len(users_db) + 1, "name": user.name, "email": user.email } users_db.append(new_user) return new_user @app.get("/users", response_model=List[User]) def list_users(): return users_db

Ten kod może działać jako osobny serwis np. user-service, obsługiwany przez własny kontener Dockera. FastAPI automatycznie generuje dokumentację OpenAPI pod /docs.

Przykład mikroserwisu z Flask

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 from flask import Flask, jsonify, request from flask_sqlalchemy import SQLAlchemy app = Flask(__name__) app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://localhost/users' db = SQLAlchemy(app) class User(db.Model): id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(100)) email = db.Column(db.String(100), unique=True) @app.route('/users/<int:user_id>', methods=['GET']) def get_user(user_id): user = User.query.get_or_404(user_id) return jsonify({'id': user.id, 'name': user.name, 'email': user.email})

Komunikacja między mikroserwisami

Mikroserwisy komunikują się na kilka sposobów, w zależności od charakteru projektu. Wybór metody komunikacji zależy od wymagań: czy komunikacja jest synchroniczna czy asynchroniczna, jak często komunikują się serwisy i jakie są wymagania dotyczące wydajności.

1. REST API

Najprostszy i najczęstszy sposób — komunikacja przez HTTP. Zalety:

  • łatwy w implementacji,
  • działa w każdej technologii,
  • dobry dla synchronicznych zapytań (np. pobranie danych użytkownika).

Przykład wywołania REST API między serwisami:

Python
1 2 3 4 5 6 7 8 import httpx async def get_user_from_service(user_id: int): async with httpx.AsyncClient() as client: response = await client.get(f"http://user-service:8000/users/{user_id}") if response.status_code == 200: return response.json() return None

2. gRPC

Nowoczesny protokół RPC oparty na HTTP/2 i protobuf. Bardziej wydajny i binarny niż REST. Idealny dla mikroserwisów wymagających dużej liczby połączeń o niskim opóźnieniu.

Instalacja:

Bash
1 pip install grpcio grpcio-tools

Przykład definicji .proto:

PROTOBUF
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 syntax = "proto3"; service UserService { rpc GetUser (UserRequest) returns (UserResponse); } message UserRequest { int32 user_id = 1; } message UserResponse { int32 id = 1; string name = 2; string email = 3; }

Implementacja w FastAPI:

Python
1 2 3 4 5 6 7 8 9 10 11 12 from fastapi import FastAPI import grpc app = FastAPI() @app.get("/users/{user_id}") async def get_user(user_id: int): # Wywołanie gRPC with grpc.insecure_channel('user-service:50051') as channel: stub = UserServiceStub(channel) response = stub.GetUser(UserRequest(user_id=user_id)) return {"id": response.id, "name": response.name}

3. Kolejki wiadomości (Message Queues)

Asynchroniczna komunikacja między serwisami przez systemy takie jak RabbitMQ, Apache Kafka czy Redis Streams. Idealne dla operacji, które nie wymagają natychmiastowej odpowiedzi.

Przykład: serwis zamówień wysyła wiadomość "order_created", którą odbiera serwis płatności.

Instalacja:

Bash
1 2 3 pip install pika# dla RabbitMQ # lub pip install kafka-python# dla Kafka

Przykład z RabbitMQ (producent):

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import pika import json connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) channel = connection.channel() channel.queue_declare(queue="orders") message = {"order_id": 123, "user_id": 456, "status": "created"} channel.basic_publish( exchange="", routing_key="orders", body=json.dumps(message) ) connection.close()

Konsument (payment-service):

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import pika import json def process_order(ch, method, properties, body): order = json.loads(body) print(f"Processing payment for order {order['order_id']}") # Logika płatności ch.basic_ack(delivery_tag=method.delivery_tag) connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) channel = connection.channel() channel.queue_declare(queue="orders") channel.basic_consume(queue="orders", on_message_callback=process_order) channel.start_consuming()

Przykład z Redis Streams:

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import redis import json redis_client = redis.Redis(host='localhost', port=6379, db=0) # Producent redis_client.xadd('orders', {'order_id': 123, 'status': 'created'}) # Konsument while True: messages = redis_client.xread({'orders': '$'}, block=1000) for stream, msgs in messages: for msg_id, data in msgs: order = json.loads(data[b'order_id']) # Przetwórz zamówienie

Bazy danych w mikroserwisach

Newsletter · co środę

Python co tydzień — newsletter dla programistów

Otrzymuj codzienne ćwiczenia, ciekawostki z ekosystemu Pythona i wskazówki do rozmów rekrutacyjnych.

2 312 czytelników · ⭐ 4,8

Każdy mikroserwis powinien mieć własną bazę danych. Współdzielenie jednej bazy między serwisami to najczęstszy błąd przy wdrażaniu tej architektury — prowadzi do tight coupling i utraty niezależności serwisów.

Database per Service

Najczęstsze podejścia:

  • Serwis użytkowników – PostgreSQL (relacyjna baza, ACID)
  • Serwis płatności – PostgreSQL lub MySQL (transakcje, spójność)
  • Serwis analizy – MongoDB lub ClickHouse (duże wolumeny danych, analityka)
  • Serwis cache – Redis (szybki dostęp do danych)

Komunikacja między serwisami odbywa się na poziomie API, nie bazy. Jeśli jeden serwis potrzebuje danych z drugiego, wykonuje wywołanie API, a nie bezpośrednie zapytanie SQL.

Przykład izolacji danych

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 # user-service - własna baza PostgreSQL # order-service - własna baza PostgreSQL (nie może bezpośrednio odczytać danych z user-service) # order-service potrzebuje danych użytkownika import httpx async def get_order_with_user(order_id: int): # Pobierz zamówienie z własnej bazy order = await Order.query.get(order_id) # Pobierz dane użytkownika przez API async with httpx.AsyncClient() as client: user_response = await client.get(f"http://user-service/users/{order.user_id}") user = user_response.json() return {"order": order, "user": user}

API Gateway – centralny punkt dostępu

W systemach z wieloma mikroserwisami użytkownik nie powinien komunikować się z nimi bezpośrednio. Dlatego wprowadza się API Gateway, który:

  • kieruje ruch do właściwego mikroserwisu,
  • agreguje odpowiedzi z wielu serwisów,
  • zarządza autoryzacją i limitami,
  • obsługuje load balancing,
  • agreguje metryki i logi.

API Gateway w Pythonie

Przykład z FastAPI:

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 from fastapi import FastAPI, HTTPException import httpx app = FastAPI() SERVICES = { "users": "http://user-service:8000", "orders": "http://order-service:8000", "payments": "http://payment-service:8000", } @app.get("/api/users/{user_id}") async def get_user(user_id: int): async with httpx.AsyncClient() as client: response = await client.get(f"{SERVICES['users']}/users/{user_id}") if response.status_code == 200: return response.json() raise HTTPException(status_code=response.status_code) @app.get("/api/orders/{order_id}") async def get_order(order_id: int): async with httpx.AsyncClient() as client: # Agregacja danych z wielu serwisów order_response = await client.get(f"{SERVICES['orders']}/orders/{order_id}") if order_response.status_code != 200: raise HTTPException(status_code=404) order = order_response.json() user_response = await client.get(f"{SERVICES['users']}/users/{order['user_id']}") user = user_response.json() if user_response.status_code == 200 else None return {"order": order, "user": user}

W warstwie infrastruktury możesz użyć Nginx + Traefik lub specjalistycznych rozwiązań jak Kong, AWS API Gateway czy Google Cloud Endpoints.

Deployment i skalowanie

Każdy mikroserwis powinien być wdrażany osobno. Standardem są kontenery Dockera i orkiestracja w Kubernetesie. Docker zapewnia izolację i spójność środowisk, a Kubernetes automatyczne skalowanie i zarządzanie kontenerami.

Przykładowy Dockerfile

Dockerfile
1 2 3 4 5 6 7 8 9 10 FROM python:3.12-slim WORKDIR /app COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt COPY . . CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

Docker Compose (dla kilku mikroserwisów)

YAML
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 version: "3.9" services: user-service: build: ./user-service ports: - "8001:8000" environment: - DATABASE_URL=postgresql://user:pass@user-db:5432/users depends_on: - user-db - redis order-service: build: ./order-service ports: - "8002:8000" environment: - DATABASE_URL=postgresql://user:pass@order-db:5432/orders - USER_SERVICE_URL=http://user-service:8000 depends_on: - order-db - user-service payment-service: build: ./payment-service ports: - "8003:8000" depends_on: - payment-db gateway: build: ./gateway ports: - "80:8000" depends_on: - user-service - order-service - payment-service user-db: image: postgres:15 environment: POSTGRES_DB: users POSTGRES_USER: user POSTGRES_PASSWORD: pass volumes: - user_data:/var/lib/postgresql/data order-db: image: postgres:15 environment: POSTGRES_DB: orders POSTGRES_USER: user POSTGRES_PASSWORD: pass volumes: - order_data:/var/lib/postgresql/data redis: image: redis:7 rabbitmq: image: rabbitmq:3-management ports: - "5672:5672" - "15672:15672" volumes: user_data: order_data:

Każdy serwis działa w osobnym kontenerze, a komunikacja odbywa się po nazwach hostów w sieci Dockera (np. http://user-service:8000).

Kubernetes deployment

Dla produkcji warto użyć Kubernetes do automatycznego skalowania:

YAML
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 apiVersion: apps/v1 kind: Deployment metadata: name: user-service spec: replicas: 3 selector: matchLabels: app: user-service template: metadata: labels: app: user-service spec: containers: - name: user-service image: user-service:latest ports: - containerPort: 8000 env: - name: DATABASE_URL valueFrom: secretKeyRef: name: db-secret key: url apiVersion: v1 kind: Service metadata: name: user-service spec: selector: app: user-service ports: - port: 80 targetPort: 8000

Monitoring i logowanie

Przy wielu mikroserwisach kluczowe jest centralne logowanie i obserwowalność. Bez odpowiedniego monitoringu trudno zdiagnozować problemy w rozproszonym systemie.

Narzędzia monitoringu

Prometheus + Grafana – metryki wydajności:

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 # Eksport metryk z FastAPI from prometheus_client import Counter, Histogram, generate_latest from fastapi import FastAPI, Response request_count = Counter('http_requests_total', 'Total requests') request_duration = Histogram('http_request_duration_seconds', 'Request duration') app = FastAPI() @app.middleware("http") async def track_metrics(request, call_next): start_time = time.time() response = await call_next(request) duration = time.time() - start_time request_count.inc() request_duration.observe(duration) return response @app.get("/metrics") def metrics(): return Response(content=generate_latest(), media_type="text/plain")

Elastic Stack (ELK) – logi z wielu mikroserwisów w jednym miejscu:

  • Elasticsearch – przechowywanie i wyszukiwanie logów
  • Logstash – zbieranie i przetwarzanie logów
  • Kibana – wizualizacja logów

Sentry – błędy i wyjątki w czasie rzeczywistym (automatycznie raportuje błędy z każdego serwisu).

OpenTelemetry – śledzenie requestów między serwisami (tracing):

Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 from opentelemetry import trace from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import ConsoleSpanExporter, BatchSpanProcessor trace.set_tracer_provider(TracerProvider()) tracer = trace.get_tracer(__name__) @app.get("/users/{user_id}") async def get_user(user_id: int): with tracer.start_as_current_span("get_user"): # Wywołanie innego serwisu z automatycznym tracingiem async with httpx.AsyncClient() as client: response = await client.get(f"http://order-service/orders?user_id={user_id}") return response.json()

Dzięki temu wiesz dokładnie, który mikroserwis zawiódł i dlaczego. Distributed tracing pozwala śledzić request przez wszystkie serwisy, co jest kluczowe w debugowaniu problemów.

Zalety i wady mikroserwisów

Kurs · 24 lekcje8h 14m
Kurs

Kurs Python dla początkujących — PyStart

Zacznij programować w Pythonie! Idealne dla osób bez doświadczenia. Praktyczne zadania, projekty i wsparcie społeczności.

  • 24 lekcje wideo + 80 ćwiczeń
  • Realne bazy z e-commerce
  • Społeczność i code-review
499 zł799 zł−38%
Rozpocznij naukę

Architektura mikroserwisów ma swoje zalety i wyzwania. Ważne, aby rozumieć obie strony przed podjęciem decyzji o przejściu na tę architekturę.

Zalety

  • Niezależne wdrażanie i rozwój — każdy serwis może być rozwijany przez inny zespół i wdrażany niezależnie.
  • Łatwiejsze skalowanie — możesz skalować tylko te serwisy, które tego wymagają (np. user-service ma większy ruch, więc skaluj tylko go).
  • Elastyczność technologiczna — każdy serwis może używać innej technologii (np. Python dla API, Node.js dla real-time, Go dla wydajności).
  • Większa odporność na błędy — awaria jednego serwisu nie zatrzymuje całości (o ile odpowiednio zaprojektujesz fallbacki).
  • Zespoły mogą pracować niezależnie — mniejsze konflikty w kodzie i szybszy development.

Wyzwania

  • Większa złożoność infrastruktury — potrzeba więcej narzędzi: orchestracja, service discovery, monitoring, logowanie.
  • Trudniejsze debugowanie i testowanie — request może przechodzić przez wiele serwisów, trudniej zreprodukować błąd.
  • Konieczność automatyzacji deploymentu (CI/CD) — ręczne wdrażanie wielu serwisów jest niepraktyczne.
  • Potrzeba monitoringu rozproszonego — trzeba śledzić wiele serwisów jednocześnie.
  • Komunikacja sieciowa — opóźnienia sieci, konieczność obsługi błędów komunikacji.
  • Zarządzanie transakcjami rozproszonymi — trudniejsze niż w monolicie (trzeba użyć patternów jak Saga).

Podsumowanie

Architektura mikroserwisów to potężne, ale wymagające podejście. Python doskonale się w nim sprawdza dzięki swojej prostocie, bogatym bibliotekom i elastyczności frameworków. Mikroserwisy nie są rozwiązaniem dla każdego projektu — dla małych aplikacji monolit może być lepszym wyborem.

Jeśli zaczynasz z mikroserwisami:

  1. Zacznij od podziału monolitu na logiczne moduły — nie dziel na mikroserwisy zbyt wcześnie, najpierw zidentyfikuj granice domenowe.
  2. Zadbaj o komunikację (REST, gRPC, RabbitMQ) — wybierz odpowiednią metodę dla każdego przypadku użycia.
  3. Wprowadź automatyczne wdrożenia i monitoring — CI/CD i obserwowalność są kluczowe.
  4. Nie komplikuj — mikroserwisy mają rozwiązywać problemy, nie je tworzyć. Zacznij od prostych rozwiązań i ewoluuj w miarę potrzeb.

Pamiętaj: mikroserwisy to narzędzie, a nie cel sam w sobie. Jeśli monolit spełnia Twoje potrzeby, nie ma powodu go dzielić. Mikroserwisy mają sens, gdy masz problemy ze skalowaniem, potrzebujesz niezależnych zespołów lub różne części aplikacji mają różne wymagania techniczne.

Część 20 z 26

Monitorowanie i logowanie błędów w aplikacjach webowych

druga lekcja cyklu „Flask vs FastAPI: Porównanie frameworków webowych w Pythonie"

Czytaj kolejny →