FastAPI Cookbook — Complete Summary¶
Giunio De Luca, Packt Publishing, 2024
Quick Reference Cheatsheet¶
| Category | Snippet | Notes |
|---|---|---|
| Install | pip install fastapi[all] |
Includes uvicorn, pydantic, etc. |
| Run (dev) | fastapi dev app/main.py |
Auto-reload enabled |
| Run (prod) | fastapi run app/main.py --port 80 |
No reload |
| App instance | app = FastAPI() |
Entry point |
| Router | router = APIRouter(); app.include_router(router) |
Modular route grouping |
| Path param | @app.get("/items/{item_id}") |
Auto-typed from annotation |
| Query param | def read(skip: int = 0, limit: int = 10) |
Params not in path = query |
| Request body | def create(item: Item) where Item(BaseModel) |
Pydantic validation |
| Response model | @app.get("/", response_model=ItemOut) |
Filters output fields |
| Field validation | Field(..., gt=0, max_length=100) |
Pydantic constraints |
| Custom validator | @field_validator("age") |
Class method on BaseModel |
| Dependency | Depends(get_db) |
DI for DB sessions, auth, etc. |
| Async endpoint | async def route(): |
Use for I/O-bound work |
| Sync endpoint | def route(): |
Runs in threadpool |
| Background task | background_tasks.add_task(fn, *args) |
Lightweight async work |
| File upload | file: UploadFile = File(...) |
shutil.copyfileobj to save |
| File download | return FileResponse(path=..., filename=...) |
Sets Content-Disposition |
| SQLAlchemy engine | create_engine("sqlite:///./db.sqlite3") |
Sync engine |
| Async engine | create_async_engine("sqlite+aiosqlite:///./db") |
Requires aiosqlite |
| DB session dep | def get_db(): db=Session(); try: yield db; finally: db.close() |
Generator dependency |
| MongoDB | MongoClient() (sync) / AsyncIOMotorClient() (async) |
pymongo / motor |
| Redis cache | aioredis.from_url("redis://localhost"); await r.set(k, v, ex=3600) |
TTL in seconds |
| JWT create | jose.jwt.encode({"sub": user, "exp": exp}, SECRET, algorithm="HS256") |
python-jose |
| JWT decode | jose.jwt.decode(token, SECRET, algorithms=["HS256"]) |
Raises JWTError |
| OAuth2 scheme | OAuth2PasswordBearer(tokenUrl="token") |
Returns token string |
| Password hash | CryptContext(schemes=["bcrypt"]).hash(pw) |
passlib |
| RBAC | Annotated[User, Depends(RoleChecker(["admin"]))] |
Callable class dep |
| TestClient | from fastapi.testclient import TestClient; c = TestClient(app) |
Sync tests |
| Async test | async with AsyncClient(transport=ASGITransport(app=app)) as c: |
httpx + pytest-asyncio |
| Override dep | app.dependency_overrides[get_db] = override_get_db |
Test isolation |
| Alembic init | alembic init alembic |
Migration scaffolding |
| Alembic migrate | alembic revision --autogenerate -m "msg"; alembic upgrade head |
Auto-detect changes |
| WebSocket | @app.websocket("/ws"); await ws.accept(); await ws.receive_text() |
Full-duplex |
| GraphQL | GraphQLRouter(strawberry.Schema(Query)); app.include_router(gql, prefix="/graphql") |
strawberry-graphql |
| gRPC gateway | grpc.aio.insecure_channel("localhost:50051") |
FastAPI as REST proxy |
| Rate limit | @limiter.limit("5/minute") |
slowapi |
| Profiling | Profiler(interval=0.001, async_mode="enabled") |
pyinstrument |
| CORS | app.add_middleware(CORSMiddleware, allow_origins=[...]) |
Cross-origin |
| Docker | FROM python:3.10; CMD ["fastapi", "run", "app/main.py", "--port", "80"] |
Production image |
| Gunicorn | gunicorn app.main:app --workers 4 --worker-class uvicorn.workers.UvicornWorker |
Multi-worker; (2*cores)+1 |
| Hatch build | hatch new "Project"; hatch shell; hatch build -t sdist ../dist |
Packaging |
Chapter 1: First Steps with FastAPI¶
Setting up your development environment¶
- Install with
pip install fastapi[all]— bundles uvicorn, pydantic, starlette, and other essentials - Always use a virtual environment:
python -m venv venv && source venv/bin/activate - Run the dev server:
uvicorn main:app --reload(orfastapi dev main.pyin newer versions)
Creating a new FastAPI project¶
- Recommended structure for larger projects:
project/
├── src/ # Application code (models, routes, services)
├── tests/ # Test suite, mirrors src structure
├── docs/ # Documentation
├── requirements.txt
└── main.py
- Separate concerns into subdirectories under
src/: models, routes, services - Use
APIRouterin each route module, then aggregate withapp.include_router()inmain.py
# src/routes/items.py
from fastapi import APIRouter
router = APIRouter()
@router.get("/items/{item_id}")
async def read_item(item_id: int):
return {"item_id": item_id}
# main.py
from fastapi import FastAPI
from src.routes import items
app = FastAPI()
app.include_router(items.router)
Understanding FastAPI basics¶
- Built on Starlette (ASGI) and Pydantic (data validation)
- Endpoints can be
async def(non-blocking, ideal for I/O) ordef(run in a threadpool automatically) - Automatic interactive docs at
/docs(Swagger UI) and/redoc(ReDoc)
Defining your first API endpoint¶
- Decorators:
@app.get(),@app.post(),@app.put(),@app.delete(),@app.patch() - Status codes via
status_codeparameter orfastapi.statusconstants
from fastapi import FastAPI, status
app = FastAPI()
@app.get("/", status_code=status.HTTP_200_OK)
async def root():
return {"message": "Hello World"}
Working with path and query parameters¶
- Path parameters: defined in the route string, auto-cast from type annotations
- Query parameters: any function parameter not in the path string; use
Optionalor defaults for optional ones
@app.get("/items/{item_id}")
async def read_item(item_id: int, q: str | None = None):
# item_id is path param (required, int)
# q is query param (optional, string)
return {"item_id": item_id, "q": q}
Defining and using request and response models¶
- Pydantic
BaseModelfor both request bodies and response shaping Field(...)for constraints:min_length,max_length,gt,lt,ge,le,patternresponse_modelon the decorator filters the output to only declared fields
from pydantic import BaseModel, Field
class Book(BaseModel):
title: str = Field(..., min_length=1, max_length=100)
author: str = Field(..., min_length=1, max_length=50)
year: int = Field(..., gt=1900, lt=2100)
class BookOut(BaseModel):
title: str
author: str
@app.post("/books", response_model=BookOut)
async def create_book(book: Book):
# book is fully validated; response only includes title & author
return book
Handling errors and exceptions¶
HTTPException(status_code=..., detail=...)for standard error responses- Custom exception handlers via
@app.exception_handler(ExceptionClass) - Override
RequestValidationErrorfor custom validation error formatting
from fastapi import HTTPException, Request, status
from fastapi.responses import JSONResponse, PlainTextResponse
from fastapi.exceptions import RequestValidationError
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
return JSONResponse(
status_code=exc.status_code,
content={"message": "Oops! Something went wrong"}
)
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
return PlainTextResponse(
f"Validation error:\n{json.dumps(exc.errors(), indent=2)}",
status_code=status.HTTP_400_BAD_REQUEST
)
Chapter 2: Working with Data¶
Setting up SQL databases¶
- SQLAlchemy ORM with the modern
DeclarativeBase+Mapped/mapped_columnstyle (SQLAlchemy 2.0+) - Engine creation, session factory, and a
get_dbgenerator dependency for request-scoped sessions
from sqlalchemy import create_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, sessionmaker
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "user"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str]
email: Mapped[str]
DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Dependency
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
Understanding CRUD operations with SQLAlchemy¶
- Create: instantiate model,
db.add(obj),db.commit(),db.refresh(obj) - Read:
db.query(Model).filter(Model.id == id).first()or.all() - Update: fetch → modify attributes →
db.commit() - Delete: fetch →
db.delete(obj)→db.commit()
from fastapi import Depends
from sqlalchemy.orm import Session
@app.post("/users")
def create_user(user: UserCreate, db: Session = Depends(get_db)):
db_user = User(name=user.name, email=user.email)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
Integrating MongoDB for NoSQL data storage¶
- pymongo for synchronous access;
motorfor async (covered in Ch7) MongoClient()→ database → collection patternObjectIdfrombsonfor document IDs; validate withObjectId.is_valid()
from pymongo import MongoClient
from bson import ObjectId
client = MongoClient()
database = client.mydatabase
user_collection = database["users"]
@app.post("/users")
def create_user(user: UserCreate):
result = user_collection.insert_one(user.model_dump())
return {"id": str(result.inserted_id)}
@app.get("/users/{user_id}")
def get_user(user_id: str):
if not ObjectId.is_valid(user_id):
raise HTTPException(status_code=400, detail="Invalid ID")
user = user_collection.find_one({"_id": ObjectId(user_id)})
if not user:
raise HTTPException(status_code=404, detail="User not found")
user["_id"] = str(user["_id"])
return user
Working with data validation and serialization¶
EmailStrfrom pydantic for email validation (requiresemail-validatorpackage)field_validatorfor custom validation logic on individual fieldsmodel_dump()to serialize to dict;model_copy(update={...})for partial updates
from pydantic import BaseModel, EmailStr, field_validator
class User(BaseModel):
name: str
email: EmailStr
age: int
@field_validator("age")
def validate_age(cls, value):
if value < 18 or value > 100:
raise ValueError("Age must be between 18 and 100")
return value
Working with file uploads and downloads¶
- Upload:
UploadFile+File(...), save withshutil.copyfileobj - Download:
FileResponsewith explicitpathandfilename
import shutil
from pathlib import Path
from fastapi import File, UploadFile
from fastapi.responses import FileResponse
@app.post("/uploadfile")
async def upload_file(file: UploadFile = File(...)):
with open(f"uploads/{file.filename}", "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
return {"filename": file.filename}
@app.get("/downloadfile/{filename}", response_class=FileResponse)
async def download_file(filename: str):
filepath = Path(f"uploads/{filename}")
if not filepath.exists():
raise HTTPException(status_code=404, detail=f"File {filename} not found")
return FileResponse(path=str(filepath), filename=filename)
Handling asynchronous data operations¶
async defendpoints free the event loop during I/O waits — critical for concurrent performance- Sync
defendpoints block a threadpool thread for the duration of the call - Demonstrated with
asyncio.gatherbenchmarking:
import asyncio
from httpx import AsyncClient
async def make_requests(n: int, path: str):
async with AsyncClient(base_url="http://localhost:8000") as client:
tasks = (client.get(path, timeout=float("inf")) for _ in range(n))
await asyncio.gather(*tasks)
# Result: 100 sync requests ~6.4s vs 100 async requests ~2.4s
- Key takeaway: use
async deffor I/O-bound endpoints (DB queries, HTTP calls, file I/O); use syncdeffor CPU-bound work (it runs in a threadpool and won't block the event loop)
Securing sensitive data and best practices¶
- Store secrets in environment variables, never hardcode
- Hash passwords with bcrypt via
passlib - Use HTTPS in production
- Apply RBAC to restrict endpoint access by role
- Database-level security: parameterized queries (SQLAlchemy handles this), least-privilege DB users
Chapter 3: Building RESTful APIs with FastAPI¶
Creating CRUD operations¶
- Example: CSV-backed Task Manager demonstrating full CRUD without a database
csv.DictReader/csv.DictWriterfor persistence- Pydantic models with inheritance:
Task(BaseModel)for input,TaskWithID(Task)addingid: intfor output
class Task(BaseModel):
title: str
description: str
status: str
class TaskWithID(Task):
id: int
# Read all tasks
def read_tasks_from_csv() -> list[TaskWithID]:
with open("tasks.csv", "r") as f:
reader = csv.DictReader(f)
return [TaskWithID(**row) for row in reader]
Creating RESTful Endpoints¶
- Follow REST conventions:
GET /tasks(list),GET /tasks/{id}(detail),POST /tasks(create),PUT /tasks/{id}(full update),DELETE /tasks/{id}(remove) - Use appropriate HTTP status codes: 200 (OK), 201 (Created), 204 (No Content), 404 (Not Found)
Testing your RESTful API¶
TestClientfromfastapi.testclientfor quick synchronous tests- Tests can verify status codes, response bodies, and side effects
from fastapi.testclient import TestClient
client = TestClient(app)
def test_create_task():
response = client.post("/tasks", json={"title": "Test", "description": "Desc", "status": "pending"})
assert response.status_code == 200
assert response.json()["title"] == "Test"
Handling complex queries and filtering¶
- Use
Optionalquery parameters with defaults ofNone - Implement keyword-based search by checking if the search term appears in relevant fields
- Chain filters: each
Optionalparam narrows results only when provided
from typing import Optional
@app.get("/tasks")
def get_tasks(
status: Optional[str] = None,
keyword: Optional[str] = None
):
tasks = read_tasks_from_csv()
if status:
tasks = [t for t in tasks if t.status == status]
if keyword:
tasks = [t for t in tasks if keyword.lower() in t.title.lower()
or keyword.lower() in t.description.lower()]
return tasks
Versioning your API¶
Five strategies presented:
- URL path versioning (most common):
/v1/tasks,/v2/tasks— separate routers per version - Query parameter versioning:
?version=1— single endpoint dispatches by param - Header versioning:
X-API-Version: 2— clean URLs, version in request header - Consumer-based versioning: different behavior per API key / consumer identity
- Semantic versioning: version the API spec itself (major.minor.patch)
# URL path versioning
v1_router = APIRouter(prefix="/v1")
v2_router = APIRouter(prefix="/v2")
@v1_router.get("/tasks")
def get_tasks_v1(): ...
@v2_router.get("/tasks")
def get_tasks_v2(): ... # enhanced response schema
app.include_router(v1_router)
app.include_router(v2_router)
Securing your API with OAuth2¶
OAuth2PasswordBearer(tokenUrl="token")creates a dependency that extracts the bearer tokenOAuth2PasswordRequestFormfor the login endpoint (username + password form data)
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(status_code=401, detail="Incorrect credentials")
token = create_access_token(data={"sub": user.username})
return {"access_token": token, "token_type": "bearer"}
@app.get("/protected")
async def protected_route(token: str = Depends(oauth2_scheme)):
# token is the raw bearer token string
user = verify_token(token)
return {"user": user.username}
Documenting your API with Swagger and Redoc¶
- Auto-generated at
/docs(Swagger) and/redoc(ReDoc) - Customize with
FastAPI(title=..., description=..., version=...)constructor params - Override the OpenAPI schema with
app.openapi()method for advanced customization - Add
tags,summary,descriptionto individual endpoints for better organization
Chapter 4: Authentication and Authorization¶
Setting up user registration¶
- Hash passwords with
passlib+bcrypt:CryptContext(schemes=["bcrypt"], deprecated="auto") - Use the
lifespancontext manager (replaces deprecated@app.on_event) for startup/shutdown logic - Handle duplicate registration with
IntegrityErrorcatch
from passlib.context import CryptContext
from contextlib import asynccontextmanager
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup: create tables, seed data, etc.
Base.metadata.create_all(bind=engine)
yield
# Shutdown: cleanup
app = FastAPI(lifespan=lifespan)
@app.post("/register")
def register(user: UserCreate, db: Session = Depends(get_db)):
hashed = pwd_context.hash(user.password)
db_user = User(username=user.username, hashed_password=hashed)
try:
db.add(db_user)
db.commit()
except IntegrityError:
db.rollback()
raise HTTPException(status_code=400, detail="Username already exists")
return {"message": "User registered"}
Working with OAuth2 and JWT for authentication¶
- python-jose for JWT encoding/decoding
- Token payload:
{"sub": username, "exp": expiration_datetime} - Standard flow: login → verify password → issue JWT → client sends
Authorization: Bearer <token>
from jose import jwt, JWTError
from datetime import datetime, timedelta
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
def create_access_token(data: dict):
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def verify_token(token: str):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username = payload.get("sub")
if username is None:
raise HTTPException(status_code=401, detail="Invalid token")
return username
except JWTError:
raise HTTPException(status_code=401, detail="Invalid token")
Setting up RBAC¶
- Define roles with
enum.Enum:class Role(str, Enum): admin = "admin"; user = "user" - Create a callable dependency class
RoleCheckerthat validates the current user's role - Chain dependencies with
Annotatedfor clean, reusable type aliases
from enum import Enum
from typing import Annotated
class Role(str, Enum):
admin = "admin"
user = "user"
class RoleChecker:
def __init__(self, allowed_roles: list[Role]):
self.allowed_roles = allowed_roles
def __call__(self, user: User = Depends(get_current_user)):
if user.role not in self.allowed_roles:
raise HTTPException(status_code=403, detail="Forbidden")
return user
AdminUser = Annotated[User, Depends(RoleChecker([Role.admin]))]
@app.get("/admin")
def admin_only(user: AdminUser):
return {"message": f"Welcome admin {user.username}"}
Using third-party authentication¶
- Example: GitHub OAuth2 flow using
httpx - Flow: redirect user to GitHub → user authorizes → GitHub redirects back with
code→ exchange code for access token → fetch user profile
import httpx
GITHUB_CLIENT_ID = os.getenv("GITHUB_CLIENT_ID")
GITHUB_CLIENT_SECRET = os.getenv("GITHUB_CLIENT_SECRET")
@app.get("/login/github")
async def github_login():
return RedirectResponse(
f"https://github.com/login/oauth/authorize?client_id={GITHUB_CLIENT_ID}"
)
@app.get("/github-callback")
async def github_callback(code: str):
async with httpx.AsyncClient() as client:
token_resp = await client.post(
"https://github.com/login/oauth/access_token",
params={"client_id": GITHUB_CLIENT_ID, "client_secret": GITHUB_CLIENT_SECRET, "code": code},
headers={"Accept": "application/json"}
)
access_token = token_resp.json()["access_token"]
user_resp = await client.get(
"https://api.github.com/user",
headers={"Authorization": f"Bearer {access_token}"}
)
return user_resp.json()
Implementing MFA¶
- TOTP (Time-based One-Time Password) with
pyotp - Generate a secret per user, produce a provisioning URI for authenticator apps
- Verify the 6-digit code on login
import pyotp
# During setup:
secret = pyotp.random_base32()
totp = pyotp.TOTP(secret)
provisioning_uri = totp.provisioning_uri(name=user.email, issuer_name="MyApp")
# Store secret with user record; share provisioning_uri (or QR code) with user
# During login verification:
totp = pyotp.TOTP(user.totp_secret)
if not totp.verify(submitted_code):
raise HTTPException(status_code=401, detail="Invalid MFA code")
Handling API key authentication¶
- Simple scheme: check an
X-API-Keyheader against stored keys - Implement as a
Depends()function orSecurity()dependency
Handling session cookies and logout functionality¶
- Set cookies with
response.set_cookie(key="session_id", value=token, httponly=True) - Clear on logout with
response.delete_cookie("session_id") httponly=Trueprevents JavaScript access (XSS protection)
Chapter 5: Testing and Debugging FastAPI Applications¶
⚡ Extra detail section — per user request
Setting up testing environments¶
- Install:
pip install pytest httpx pytest-asyncio conftest.py: shared fixtures (app instance, test client, test DB session)- Test DB isolation: use in-memory SQLite with
StaticPoolto avoid polluting production data
# conftest.py
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
SQLALCHEMY_DATABASE_URL = "sqlite://" # in-memory
engine = create_engine(
SQLALCHEMY_DATABASE_URL,
connect_args={"check_same_thread": False},
poolclass=StaticPool,
)
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
@pytest.fixture
def db_session():
Base.metadata.create_all(bind=engine)
db = TestingSessionLocal()
try:
yield db
finally:
db.close()
Base.metadata.drop_all(bind=engine)
@pytest.fixture
def client(db_session):
def override_get_db():
yield db_session
app.dependency_overrides[get_db] = override_get_db
with TestClient(app) as c:
yield c
app.dependency_overrides.clear()
Writing and running unit tests¶
- Tests as plain functions prefixed with
test_ - Use
assertstatements directly (pytest rewrites them for detailed output) - Group related tests in classes:
class TestUserEndpoints:
def test_create_user(client):
response = client.post("/users", json={"name": "Alice", "email": "a@b.com"})
assert response.status_code == 200
data = response.json()
assert data["name"] == "Alice"
assert "id" in data
Testing API Endpoints¶
- Sync testing:
TestClient(app)— wraps requests, no need for async - Async testing:
httpx.AsyncClientwithASGITransport— required for testingasync defendpoints that use async dependencies
import pytest
from httpx import AsyncClient, ASGITransport
@pytest.mark.asyncio
async def test_async_endpoint():
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test"
) as client:
response = await client.get("/async-endpoint")
assert response.status_code == 200
- Dependency overrides (
app.dependency_overrides[original] = replacement) are the primary mechanism for test isolation — mock DB sessions, auth, external services
Running tests techniques¶
pytest— run all testspytest -v— verbose outputpytest --cov=app— coverage report (requirespytest-cov)pytest -m integration— run only tests marked with@pytest.mark.integration- Markers for categorization:
# pytest.ini or pyproject.toml
[tool.pytest.ini_options]
markers = [
"integration: marks tests as integration tests",
"slow: marks tests as slow",
]
@pytest.mark.integration
def test_db_connection(client):
...
Handling logging messages¶
- Standard library
loggingmodule with configurable handlers StreamHandlerfor console output,TimedRotatingFileHandlerfor rotating log filesColourizedFormatter(from uvicorn) for colored terminal output- Logging middleware to log every request/response:
import logging
from logging.handlers import TimedRotatingFileHandler
logger = logging.getLogger("myapp")
logger.setLevel(logging.DEBUG)
# Console handler with color
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(ColourizedFormatter("%(levelprefix)s %(message)s"))
logger.addHandler(stream_handler)
# File handler with rotation
file_handler = TimedRotatingFileHandler("app.log", when="midnight", backupCount=7)
file_handler.setFormatter(logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s"))
logger.addHandler(file_handler)
- Logging middleware pattern: wrap
call_next(request)to log request method, URL, status code, and timing
Debugging techniques¶
- PDB:
import pdb; pdb.set_trace()— interactive debugger in terminal - debugpy: remote debugging, attach from VS Code or PyCharm
pip install debugpy- Add
debugpy.listen(("0.0.0.0", 5678))at startup - Configure IDE to attach to port 5678
- PyCharm: native FastAPI run configuration with breakpoints
Performance testing for high traffic applications¶
- Locust for load testing: define user behavior as a Python class
HttpUserbase class,@taskdecorator for weighted actions- Run headless for CI/CD integration
# locustfile.py
from locust import HttpUser, task, between
class APIUser(HttpUser):
wait_time = between(1, 3)
@task(3)
def get_items(self):
self.client.get("/items")
@task(1)
def create_item(self):
self.client.post("/items", json={"name": "test", "price": 9.99})
# Run headless
locust --headless --users 10 --spawn-rate 1 -H http://localhost:8000
Chapter 6: Integrating FastAPI with SQL Databases¶
Setting up SQLAlchemy¶
- SQLAlchemy 2.0+ async with
create_async_engineandAsyncSession - Requires an async driver:
aiosqlitefor SQLite,asyncpgfor PostgreSQL
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, DeclarativeBase
DATABASE_URL = "sqlite+aiosqlite:///./db.sqlite3"
engine = create_async_engine(DATABASE_URL, echo=True)
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
class Base(DeclarativeBase):
pass
# Dependency
async def get_db():
async with async_session() as session:
yield session
Implementing CRUD operations¶
- Async CRUD uses
awaitwith session methods andasync with session.begin()for transactions
from sqlalchemy import select
@app.post("/items")
async def create_item(item: ItemCreate, db: AsyncSession = Depends(get_db)):
db_item = Item(**item.model_dump())
db.add(db_item)
await db.commit()
await db.refresh(db_item)
return db_item
@app.get("/items/{item_id}")
async def get_item(item_id: int, db: AsyncSession = Depends(get_db)):
result = await db.execute(select(Item).where(Item.id == item_id))
item = result.scalar_one_or_none()
if not item:
raise HTTPException(status_code=404)
return item
Working with migrations¶
- Alembic for database schema migrations
- Setup:
alembic init alembic→ configurealembic.iniandenv.pywith your DB URL and Base metadata - Workflow: modify models →
alembic revision --autogenerate -m "description"→alembic upgrade head - Rollback:
alembic downgrade -1
Handling relationships in SQL databases¶
- One-to-one:
relationship()withuselist=False - Many-to-one / One-to-many:
ForeignKey+relationship()withback_populates - Many-to-many: association table +
relationship()withsecondary
# Many-to-many example
from sqlalchemy import Table, Column, ForeignKey, Integer
# Association table
student_course = Table(
"student_course", Base.metadata,
Column("student_id", Integer, ForeignKey("student.id")),
Column("course_id", Integer, ForeignKey("course.id")),
)
class Student(Base):
__tablename__ = "student"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str]
courses: Mapped[list["Course"]] = relationship(secondary=student_course, back_populates="students")
class Course(Base):
__tablename__ = "course"
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str]
students: Mapped[list["Student"]] = relationship(secondary=student_course, back_populates="courses")
Optimizing SQL queries for performance¶
- N+1 problem: accessing lazy-loaded relationships in a loop triggers one query per row
- Fix with eager loading:
joinedload()orselectinload() load_only()to fetch only specific columns
from sqlalchemy.orm import joinedload, load_only
# Eager load relationships
result = await db.execute(
select(Student).options(joinedload(Student.courses))
)
# Load only specific columns
result = await db.execute(
select(Item).options(load_only(Item.name, Item.price))
)
Securing sensitive data in SQL databases¶
- Fernet symmetric encryption for encrypting fields at rest
from cryptography.fernet import Fernet
key = Fernet.generate_key()
f = Fernet(key)
# Encrypt before storing
encrypted = f.encrypt(sensitive_data.encode())
# Decrypt when reading
decrypted = f.decrypt(encrypted).decode()
Handling transactions and concurrency¶
- Atomic updates with conditions to prevent race conditions: use
and_()in WHERE clauses - SQL isolation levels: control how concurrent transactions see each other's changes
asyncio.gatherto test concurrent behavior
from sqlalchemy import and_, update
# Atomic conditional update
stmt = (
update(Item)
.where(and_(Item.id == item_id, Item.version == expected_version))
.values(quantity=new_quantity, version=expected_version + 1)
)
result = await db.execute(stmt)
if result.rowcount == 0:
raise HTTPException(status_code=409, detail="Concurrent modification detected")
await db.commit()
Chapter 7: Integrating FastAPI with NoSQL Databases¶
Setting up MongoDB with FastAPI¶
- motor (
AsyncIOMotorClient) for async MongoDB access - Register
ObjectIdserializer for Pydantic compatibility
from motor.motor_asyncio import AsyncIOMotorClient
from pydantic import ENCODERS_BY_TYPE
from bson import ObjectId
ENCODERS_BY_TYPE[ObjectId] = str
client = AsyncIOMotorClient("mongodb://localhost:27017")
database = client.mydb
collection = database.get_collection("items")
CRUD operations in MongoDB¶
insert_one(),find_one(),find(),update_one(),delete_one()— allawait-able with motor- Convert
ObjectIdto string for API responses
Handling relationships in NoSQL databases¶
- Embedding (nested documents): denormalized, fast reads, good when data is always accessed together
- Referencing (storing IDs): normalized, avoids duplication, requires additional queries
- Choose based on access patterns and data size
Working with indexes in MongoDB¶
create_index()for single-field, compound, and text indexes- Text indexes enable
$text/$searchqueries - Use
explain()to analyze query performance
# Single field index
await collection.create_index("email", unique=True)
# Text index for search
await collection.create_index([("title", "text"), ("description", "text")])
# Query using text search
results = await collection.find({"$text": {"$search": "python fastapi"}}).to_list(100)
Exposing sensitive data from NoSQL databases¶
- Data masking techniques using MongoDB aggregation:
$redactto conditionally prune document fields$unsetto remove specific fields from output$map/$mergeObjectsto transform nested arrays- MongoDB Views: stored aggregation pipelines that present masked/filtered data
Integrating FastAPI with Elasticsearch¶
AsyncElasticsearchclient for async search operations- DSL queries:
match,multi_match,boolwithmust/should/filter - Aggregations for analytics (e.g.,
terms,avg,date_histogram)
from elasticsearch import AsyncElasticsearch
es = AsyncElasticsearch("http://localhost:9200")
@app.get("/search")
async def search(q: str):
result = await es.search(
index="products",
body={"query": {"multi_match": {"query": q, "fields": ["name", "description"]}}}
)
return result["hits"]["hits"]
Using Redis for caching in FastAPI¶
- aioredis for async Redis access
- Cache frequently accessed data with TTL to reduce DB load
import aioredis
import json
redis = aioredis.from_url("redis://localhost")
@app.get("/items/{item_id}")
async def get_item(item_id: int):
# Check cache first
cached = await redis.get(f"item:{item_id}")
if cached:
return json.loads(cached)
# Cache miss — fetch from DB
item = await fetch_from_db(item_id)
await redis.set(f"item:{item_id}", json.dumps(item), ex=3600) # TTL: 1 hour
return item
Chapter 8: Advanced Features and Best Practices¶
⚡ Extra detail section — per user request
Implementing dependency injection¶
- FastAPI DI via
Depends()— functions, generators, or callable classes - Nested dependencies: dependencies can themselves have dependencies, forming a chain
Annotatedtype aliases for reusable dependency declarations (cleaner than repeatingDepends())Query(...),Path(...)descriptors add validation + OpenAPI metadata to parameters
from typing import Annotated
from fastapi import Depends, Query
# Sub-dependency
def get_settings():
return Settings()
def get_db(settings: Annotated[Settings, Depends(get_settings)]):
engine = create_engine(settings.database_url)
session = Session(engine)
try:
yield session
finally:
session.close()
DB = Annotated[Session, Depends(get_db)]
@app.get("/items")
def list_items(db: DB, skip: int = Query(0, ge=0), limit: int = Query(10, le=100)):
return db.query(Item).offset(skip).limit(limit).all()
- Testing with dependency overrides:
app.dependency_overrides[original_dep] = mock_dep— swap any dependency for tests without changing production code
Creating custom middleware¶
BaseHTTPMiddleware: simple pattern — overridedispatch(self, request, call_next), return response- Use for cross-cutting concerns: logging, timing, header injection, client info
from starlette.middleware.base import BaseHTTPMiddleware
from fastapi import Request
class ClientInfoMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
client_host = request.client.host
user_agent = request.headers.get("user-agent", "unknown")
logger.info(f"Request from {client_host} using {user_agent}")
response = await call_next(request)
response.headers["X-Client-Host"] = client_host
return response
app.add_middleware(ClientInfoMiddleware)
Internationalization and localization¶
babellibrary for locale negotiation- Parse
Accept-Languageheader →negotiate_localeto find best match from supported locales - Return localized responses based on resolved locale
from babel import Locale, negotiate_locale
SUPPORTED_LOCALES = ["en_US", "fr_FR", "de_DE"]
def resolve_accept_language(
accept_language: str = Header("en-US"),
) -> str:
requested = [accept_language.replace("-", "_")]
locale = negotiate_locale(requested, SUPPORTED_LOCALES)
return locale or "en_US"
@app.get("/greeting")
def greeting(locale: str = Depends(resolve_accept_language)):
greetings = {"en_US": "Hello!", "fr_FR": "Bonjour!", "de_DE": "Hallo!"}
return {"message": greetings.get(locale, "Hello!")}
Optimizing application performance¶
- pyinstrument profiler for identifying bottlenecks in async FastAPI code
- Wrap as middleware to profile every request and output HTML reports
from pyinstrument import Profiler
from starlette.middleware.base import BaseHTTPMiddleware
class ProfileEndpointsMiddleWare(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
profiler = Profiler(interval=0.001, async_mode="enabled")
profiler.start()
response = await call_next(request)
profiler.stop()
profiler.write_html("profiler_output.html")
return response
- Key metrics to watch: response time per endpoint, DB query count/duration, memory usage
Implementing rate limiting¶
- slowapi — built on top of
limits, integrates directly with FastAPI Limiter(key_func=get_remote_address)— rate limit per client IP- Per-endpoint:
@limiter.limit("2/minute") - Global default:
default_limits=["5/minute"]+SlowAPIMiddleware
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.middleware import SlowAPIMiddleware
from slowapi.errors import RateLimitExceeded
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
# Per-endpoint
@app.get("/limited")
@limiter.limit("2/minute")
async def limited_endpoint(request: Request):
return {"message": "This endpoint is rate limited"}
# Global (alternative)
limiter = Limiter(key_func=get_remote_address, default_limits=["5/minute"])
app.add_middleware(SlowAPIMiddleware)
Implementing background tasks¶
BackgroundTasks: lightweight, built-in — tasks run after the response is sent- Inject via parameter:
background_tasks: BackgroundTasks - Add tasks:
background_tasks.add_task(function, *args, **kwargs) - When to use: logging, sending emails, lightweight data processing
- When NOT to use: heavy/long-running work → use Celery with a message broker instead
from fastapi import BackgroundTasks
def write_log(message: str):
with open("log.txt", "a") as f:
f.write(f"{message}\n")
def send_notification(email: str, message: str):
# Simulate email sending
...
@app.post("/orders")
async def create_order(order: Order, background_tasks: BackgroundTasks):
# Process order synchronously
result = process_order(order)
# Queue background work
background_tasks.add_task(write_log, f"Order {result.id} created")
background_tasks.add_task(send_notification, order.email, "Order confirmed!")
return result
- Important concurrency note:
BackgroundTasksrun in the same event loop. CPU-heavy tasks will block async endpoints. For heavy work, use Celery with Redis/RabbitMQ as a broker.
Chapter 9: Working with WebSocket¶
Setting up WebSockets in FastAPI¶
@app.websocket("/ws")decorator for WebSocket endpoints- Must
await websocket.accept()before sending/receiving
from fastapi import WebSocket
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
await websocket.send_text("Connected!")
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Echo: {data}")
Sending and receiving messages over WebSockets¶
send_text()/receive_text()for string datasend_json()/receive_json()for structured datasend_bytes()/receive_bytes()for binary data
Handling WebSocket connections and disconnections¶
ConnectionManagerpattern: track active connections, handle connect/disconnect/broadcast
class ConnectionManager:
def __init__(self):
self.active_connections: list[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)
manager = ConnectionManager()
Handling WebSocket errors and exceptions¶
- Catch
WebSocketDisconnectto handle client disconnections gracefully
from fastapi import WebSocketDisconnect
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
await manager.broadcast(f"{client_id}: {data}")
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(f"{client_id} left the chat")
Implementing chat functionality with WebSockets¶
- Combine
ConnectionManager+ Jinja2 HTML template with JavaScriptWebSocketAPI - Server broadcasts messages to all connected clients
- Client-side:
new WebSocket("ws://localhost:8000/ws/username")+onmessagehandler
Optimizing WebSocket performance¶
- Set connection limits to prevent resource exhaustion
- Implement heartbeat/ping-pong to detect stale connections
- Use
asyncio.wait_for()with timeouts on receive operations
Securing WebSocket connections with OAuth2¶
- WebSockets don't support HTTP headers in the browser API — pass token as a query parameter
- Validate token during the
connectphase before accepting
@app.websocket("/ws")
async def secured_ws(websocket: WebSocket, token: str = Query(...)):
try:
user = verify_token(token)
except Exception:
await websocket.close(code=1008) # Policy Violation
return
await websocket.accept()
...
Chapter 10: Integrating FastAPI with other Python Libraries¶
Integrating FastAPI with gRPC¶
- Define service in
.protofile (proto3 syntax) - Generate Python stubs:
python -m grpc_tools.protoc -I . --python_out=. --grpc_python_out=. service.proto - gRPC server:
grpc.aio.server()with servicer implementation - FastAPI as a REST-to-gRPC gateway:
import grpc
from generated_pb2_grpc import GrpcServerStub
from generated_pb2 import Message
grpc_channel = grpc.aio.insecure_channel("localhost:50051")
@app.get("/grpc")
async def call_grpc(message: str):
async with grpc_channel as channel:
stub = GrpcServerStub(channel)
response = await stub.GetServerResponse(Message(message=message))
return {"server_message": response.message, "received": response.received}
Connecting FastAPI with GraphQL¶
- strawberry-graphql with FastAPI integration
- Define types with
@strawberry.type, queries with@strawberry.field - Mount with
GraphQLRouter
import strawberry
from strawberry.fastapi import GraphQLRouter
@strawberry.type
class User:
name: str
age: int
country: str
@strawberry.type
class Query:
@strawberry.field
def users(self, country: str | None = None) -> list[User]:
if country:
return [u for u in users_db if u.country == country]
return users_db
schema = strawberry.Schema(Query)
graphql_app = GraphQLRouter(schema)
app.include_router(graphql_app, prefix="/graphql")
- Access GraphiQL IDE at
/graphqlfor interactive queries
Using ML models with Joblib¶
- Load serialized models with
joblib.load()during applifespan hf_hub_download()fromhuggingface_hubto pull model files from HuggingFacepydantic.create_model()to dynamically generate request models from feature lists
import joblib
from huggingface_hub import hf_hub_download
from pydantic import create_model
from contextlib import asynccontextmanager
ml_model = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
model_path = hf_hub_download(repo_id=REPO_ID, filename=FILENAME)
ml_model["model"] = joblib.load(model_path)
yield
ml_model.clear()
# Dynamic Pydantic model from feature list
Symptoms = create_model(
"Symptoms",
**{symptom: (bool, False) for symptom in symptoms_list[:10]}
)
@app.post("/predict")
async def predict(symptoms: Symptoms):
features = [int(v) for v in symptoms.model_dump().values()]
prediction = ml_model["model"].predict([features])
return {"prediction": prediction[0]}
Integrating FastAPI with Cohere¶
- Cohere
AsyncClientfor async LLM API calls client.chat(model="command-r-plus", chat_history=[...], message=...)for conversational AIChatMessage(role=..., message=...)for maintaining chat history
from cohere import AsyncClient, ChatMessage
co = AsyncClient(api_key=os.getenv("COHERE_API_KEY"))
@app.post("/chat")
async def chat(message: str):
response = await co.chat(
model="command-r-plus",
chat_history=[ChatMessage(role="USER", message="Hi"), ChatMessage(role="CHATBOT", message="Hello!")],
message=message,
)
return {"response": response.text}
Integrating FastAPI with LangChain¶
- RAG (Retrieval-Augmented Generation) pipeline:
- Load documents with
DirectoryLoader - Split with
CharacterTextSplitter - Embed + store in vector DB (
Chroma+CohereEmbeddings) - Query: similarity search → inject context into prompt → LLM generates answer
- Chain composition with pipe operator:
template | model | StrOutputParser()
from langchain.text_splitter import CharacterTextSplitter
from langchain_community.document_loaders import DirectoryLoader
from langchain_community.vectorstores import Chroma
from langchain_cohere import CohereEmbeddings, ChatCohere
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# Setup at startup
loader = DirectoryLoader("./documents")
splitter = CharacterTextSplitter(chunk_size=500, chunk_overlap=50)
docs = splitter.split_documents(loader.load())
db = Chroma.from_documents(docs, CohereEmbeddings())
model = ChatCohere()
template = ChatPromptTemplate.from_template(
"Answer based on context:\n{context}\n\nQuestion: {question}"
)
chain = template | model | StrOutputParser()
def get_context(question: str, db):
docs = db.similarity_search(question, k=3)
return "\n".join(doc.page_content for doc in docs)
@app.post("/ask")
async def ask(question: str):
context = get_context(question, db)
response = await chain.ainvoke({"question": question, "context": context})
return {"answer": response}
Chapter 11: Middleware and Webhooks¶
Creating custom ASGI middleware¶
- Lower-level than
BaseHTTPMiddleware— implements the raw ASGI interface - Class with
__init__(self, app)andasync __call__(self, scope, receive, send) scope["type"]is"http","websocket", or"lifespan"
from starlette.types import ASGIApp, Scope, Receive, Send
class CustomASGIMiddleware:
def __init__(self, app: ASGIApp, some_param: str = "default"):
self.app = app
self.some_param = some_param
async def __call__(self, scope: Scope, receive: Receive, send: Send):
if scope["type"] == "http":
# Pre-processing
logger.info(f"Request scope: {scope}")
await self.app(scope, receive, send)
# Note: post-processing requires wrapping `send`
app.add_middleware(CustomASGIMiddleware, some_param="custom_value")
- Also supports a function decorator pattern for simpler cases
Developing middleware for request modification¶
- Example:
HashBodyContentMiddleware— reads request body, computes SHA1 hash, adds to headers - Must reconstruct the
receivecallable since the body stream is consumed once
import hashlib
class HashBodyContentMiddleware:
def __init__(self, app: ASGIApp):
self.app = app
async def __call__(self, scope: Scope, receive: Receive, send: Send):
if scope["type"] == "http":
body = await receive()
body_bytes = body.get("body", b"")
body_hash = hashlib.sha1(body_bytes).hexdigest()
# Inject hash into headers
headers = dict(scope.get("headers", []))
scope["headers"] = list(scope.get("headers", [])) + [
(b"x-body-hash", body_hash.encode())
]
# Re-wrap receive so downstream can read the body
async def new_receive():
return body
await self.app(scope, new_receive, send)
else:
await self.app(scope, receive, send)
Developing middleware for response modification¶
- Wrap the
sendcallable to intercept and modify response headers/body MutableHeadersfrom Starlette for clean header manipulation
from starlette.datastructures import MutableHeaders
class ExtraHeadersResponseMiddleware:
def __init__(self, app: ASGIApp):
self.app = app
async def __call__(self, scope: Scope, receive: Receive, send: Send):
async def modified_send(message):
if message["type"] == "http.response.start":
headers = MutableHeaders(scope=message)
headers.append("X-Custom-Header", "my-value")
await send(message)
await self.app(scope, receive, modified_send)
Handling CORS with middleware¶
CORSMiddlewarefrom Starlette — essential for frontend-backend separation
from starlette.middleware.cors import CORSMiddleware
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:3000"], # or ["*"] for all
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
allow_origins: list of permitted origins (use specific origins in production, not["*"])allow_methods: HTTP methods to allowallow_headers: headers the client can send
Restricting incoming requests from hosts¶
TrustedHostMiddleware: reject requests not matching allowedHostheaders
from starlette.middleware.trustedhost import TrustedHostMiddleware
app.add_middleware(
TrustedHostMiddleware,
allowed_hosts=["example.com", "*.example.com"]
)
Implementing webhooks¶
- Pattern: register webhook URLs at startup, fire events via middleware on every request
Event(BaseModel)with host, path, timestamp, and bodyhttpx.AsyncClientto POST events;asyncio.create_task()for non-blocking dispatch- Document webhooks with
@app.webhooks.post("event-name")for OpenAPI schema
from pydantic import BaseModel
from datetime import datetime
from asyncio import create_task
import httpx
class Event(BaseModel):
host: str
path: str
time: str
body: str
webhook_urls: list[str] = []
async def send_event_to_url(url: str, event: Event):
async with httpx.AsyncClient() as client:
await client.post(url, json=event.model_dump())
class WebhookSenderMiddleware:
def __init__(self, app: ASGIApp):
self.app = app
async def __call__(self, scope: Scope, receive: Receive, send: Send):
if scope["type"] == "http":
body_msg = await receive()
request = Request(scope=scope)
event = Event(
host=request.client.host,
path=str(request.url.path),
time=datetime.now().isoformat(),
body=body_msg.get("body", b"").decode()
)
for url in webhook_urls:
create_task(send_event_to_url(url, event))
async def new_receive():
return body_msg
await self.app(scope, new_receive, send)
else:
await self.app(scope, receive, send)
# Register webhook URLs via lifespan or endpoint
@app.post("/webhooks/register")
async def register_webhook(url: str):
webhook_urls.append(url)
return {"registered": url}
# Document in OpenAPI
@app.webhooks.post("new-request")
async def new_request_webhook(event: Event):
"""Fired on every incoming HTTP request."""
...
Chapter 12: Deploying and Managing FastAPI Applications¶
Running the server with the FastAPI CLI¶
fastapi dev app/main.py: development mode with auto-reloadfastapi run app/main.py --port 80: production mode, no reload- Equivalent to
uvicorn app.main:appwith appropriate flags
Enabling HTTPS on FastAPI applications¶
mkcertfor generating locally-trusted SSL certificates (development)HTTPSRedirectMiddlewareto force HTTPS
from starlette.middleware.httpsredirect import HTTPSRedirectMiddleware
app.add_middleware(HTTPSRedirectMiddleware)
# Run with SSL:
# uvicorn app.main:app --ssl-keyfile=key.pem --ssl-certfile=cert.pem
Running FastAPI applications in Docker containers¶
FROM python:3.10
WORKDIR /code
COPY ./requirements.txt /code/requirements.txt
RUN pip install --no-cache-dir -r /code/requirements.txt
COPY ./app /code/app
CMD ["fastapi", "run", "app/main.py", "--port", "80"]
# Build
docker build -f Dockerfile.dev -t myapp .
# Run
docker run -p 8000:80 myapp
Running the server across multiple workers¶
- Gunicorn as process manager with Uvicorn workers for production
- Heuristic for worker count:
(2 × CPU cores) + 1
gunicorn app.main:app \
--workers 4 \
--worker-class uvicorn.workers.UvicornWorker \
--bind 0.0.0.0:8000
- Each worker is an independent process with its own event loop — true parallelism for multi-core utilization
- Combine with a reverse proxy (Nginx) for static files, SSL termination, and load balancing in production
Deploying your FastAPI application on the cloud¶
- Example: Railway deployment
- Create a
Procfile:web: fastapi run app/main.py --port $PORT - Connect GitHub repo → Railway auto-builds and deploys on push
- Environment variables configured via Railway dashboard
Shipping FastAPI applications with Hatch¶
- Hatch: modern Python project manager and build tool
hatch new "Project Name"— scaffold withpyproject.toml, src layout, testshatch shell— activate isolated virtual environmenthatch build -t sdist ../dist— build source distribution (.tar.gz)- Configure metadata, dependencies, and entry points in
pyproject.toml
# Create new project
hatch new "FCA Server"
# Enter virtual environment
cd fca-server
hatch shell
# Install dependencies
pip install fastapi uvicorn
# Build distribution
hatch build -t sdist ../dist
Notable Libraries Reference¶
| Library | Purpose | Install |
|---|---|---|
fastapi[all] |
Framework + all extras | pip install fastapi[all] |
uvicorn |
ASGI server | Included in fastapi[all] |
pydantic |
Data validation | Included in fastapi[all] |
sqlalchemy |
SQL ORM (sync + async) | pip install sqlalchemy |
aiosqlite |
Async SQLite driver | pip install aiosqlite |
asyncpg |
Async PostgreSQL driver | pip install asyncpg |
alembic |
DB migrations | pip install alembic |
pymongo |
MongoDB sync driver | pip install pymongo |
motor |
MongoDB async driver | pip install motor |
aioredis |
Async Redis client | pip install aioredis |
elasticsearch[async] |
Async Elasticsearch | pip install elasticsearch[async] |
passlib[bcrypt] |
Password hashing | pip install passlib[bcrypt] |
python-jose[cryptography] |
JWT tokens | pip install python-jose[cryptography] |
pyotp |
TOTP/MFA codes | pip install pyotp |
httpx |
Async HTTP client | pip install httpx |
pytest |
Testing framework | pip install pytest |
pytest-asyncio |
Async test support | pip install pytest-asyncio |
pytest-cov |
Coverage reporting | pip install pytest-cov |
locust |
Load testing | pip install locust |
slowapi |
Rate limiting | pip install slowapi |
pyinstrument |
Profiling | pip install pyinstrument |
babel |
i18n/l10n | pip install babel |
strawberry-graphql[fastapi] |
GraphQL | pip install strawberry-graphql[fastapi] |
grpcio + grpcio-tools |
gRPC | pip install grpcio grpcio-tools |
joblib |
Model serialization | pip install joblib |
huggingface_hub |
HF model download | pip install huggingface_hub |
cohere |
Cohere LLM API | pip install cohere |
langchain |
LLM orchestration | pip install langchain langchain-cohere langchain-community |
cryptography |
Fernet encryption | pip install cryptography |
gunicorn |
Process manager | pip install gunicorn |
hatch |
Build/packaging | pip install hatch |