Middleware¶
The backend uses a stack of ASGI middleware to handle cross-cutting concerns like rate limiting, request size validation, caching, and metrics collection. Middleware runs in order from outermost to innermost, with response processing in reverse order.
Middleware Stack¶
The middleware is applied in this order (outermost first):
- CORSMiddleware - Handles Cross-Origin Resource Sharing headers
- CacheControlMiddleware - Adds cache headers to responses
- RequestSizeLimitMiddleware - Rejects oversized requests
- CSRFMiddleware - Validates CSRF tokens on state-changing requests
- RateLimitMiddleware - Enforces per-user/per-endpoint limits
- MetricsMiddleware - Collects HTTP request metrics
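The ordering matters because each middleware wraps the next: the outermost sees the request first and the response last. The following toy wrappers (illustrative only, not the real middleware classes) show why wrapping innermost-first leaves the last-wrapped layer outermost, and why response processing runs in reverse order:

```python
def make_middleware(name: str):
    def wrap(app):
        def wrapped(request: str, trace: list[str]) -> str:
            trace.append(f"{name}:request")   # request path, outer to inner
            response = app(request, trace)
            trace.append(f"{name}:response")  # response path, inner to outer
            return response
        return wrapped
    return wrap

def endpoint(request: str, trace: list[str]) -> str:
    trace.append("endpoint")
    return "200 OK"

app = endpoint
# Wrap innermost-first so the last wrap (CORS) ends up outermost.
for name in ("Metrics", "RateLimit", "CSRF", "RequestSizeLimit", "CacheControl", "CORS"):
    app = make_middleware(name)(app)

trace: list[str] = []
app("GET /", trace)
# trace begins with "CORS:request" and ends with "CORS:response".
```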
Request Size Limit¶
Rejects requests exceeding a configurable size limit (default 10MB). This protects against denial-of-service attacks from large payloads.
```python
class RequestSizeLimitMiddleware:
    """Middleware to limit request size, default 10MB.

    Checks Content-Length header when present for an early reject, and wraps
    the ASGI ``receive`` callable to count bytes as they stream — this
    catches chunked-transfer requests that omit Content-Length.
    """

    def __init__(self, app: ASGIApp, max_size_mb: int = 10) -> None:
        self.app = app
        self.max_size_bytes = max_size_mb * 1024 * 1024
```
Requests exceeding the limit receive a 413 (Request Entity Too Large) response. The middleware checks the Content-Length header before reading the body, avoiding wasted processing on oversized requests.
Rate Limit¶
The RateLimitMiddleware intercepts all HTTP requests and checks them against configured rate limits.
Excluded paths bypass rate limiting:
```python
EXCLUDED_PATHS = frozenset(
    {
        "/health",
        "/metrics",
        "/docs",
        "/openapi.json",
        "/favicon.ico",
        "/api/v1/auth/login",  # Auth endpoints handle their own limits
        "/api/v1/auth/register",
        "/api/v1/auth/logout",
    }
)
```
When a request is allowed, rate limit headers are added to the response. When a request is rejected, a 429 response is returned with a Retry-After header indicating when the client may retry.
Cache Control¶
Adds appropriate Cache-Control headers to GET responses based on endpoint patterns:
```python
self.cache_policies: dict[str, str] = {
    "/api/v1/k8s-limits": "public, max-age=300",  # 5 minutes
    "/api/v1/example-scripts": "public, max-age=600",  # 10 minutes
    "/api/v1/notifications": "private, no-cache",  # Always revalidate
    "/api/v1/notifications/unread-count": "private, no-cache",  # Always revalidate
}
```
| Endpoint | Policy | TTL |
|---|---|---|
| `/api/v1/k8s-limits` | `public` | 5 minutes |
| `/api/v1/example-scripts` | `public` | 10 minutes |
| `/api/v1/auth/me` | `private, no-cache` | - |
| `/api/v1/notifications` | `private, no-cache` | - |
Public endpoints also get a Vary: Accept-Encoding header for proper proxy caching. Cache headers are only added to successful (200) responses.
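The header-setting logic can be sketched as a pure function. The function name and exact-match lookup are assumptions; the real middleware may match path prefixes instead:

```python
def cache_headers_for(path: str, status: int, policies: dict[str, str]) -> dict[str, str]:
    """Headers to merge into a GET response; empty when no policy applies."""
    # Only successful responses get cache headers.
    if status != 200:
        return {}
    policy = policies.get(path)
    if policy is None:
        return {}
    headers = {"Cache-Control": policy}
    # Public responses vary by encoding so proxies cache compressed variants correctly.
    if policy.startswith("public"):
        headers["Vary"] = "Accept-Encoding"
    return headers
```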
Metrics¶
The MetricsMiddleware collects HTTP request telemetry using OpenTelemetry:
```python
self.request_counter = self.meter.create_counter(
    name="http_requests_total", description="Total number of HTTP requests", unit="requests"
)
self.request_duration = self.meter.create_histogram(
    name="http_request_duration_seconds", description="HTTP request duration in seconds", unit="seconds"
)
self.request_size = self.meter.create_histogram(
    name="http_request_size_bytes", description="HTTP request size in bytes", unit="bytes"
)
self.response_size = self.meter.create_histogram(
    name="http_response_size_bytes", description="HTTP response size in bytes", unit="bytes"
)
self.active_requests = self.meter.create_up_down_counter(
    name="http_requests_active", description="Number of active HTTP requests", unit="requests"
)
```
The middleware tracks:
- Request count by method, path template, and status code
- Request duration histogram
- Request/response size histograms
- Active requests gauge
Path templates use pattern replacement to reduce metric cardinality:
```python
@staticmethod
def _get_path_template(path: str) -> str:
    """Convert path to template for lower cardinality."""
    # UUID pattern
    path = re.sub(r"/[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", "/{id}", path)
    # MongoDB ObjectIds (must run before the bare-numeric rule, which would
    # otherwise consume the leading digits of an ObjectId)
    path = re.sub(r"/[0-9a-f]{24}", "/{id}", path)
    # Numeric IDs
    path = re.sub(r"/\d+", "/{id}", path)
    return path
```
UUIDs, numeric IDs, and MongoDB ObjectIds are replaced with {id} to prevent metric explosion.
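A standalone copy of the method makes the substitution order concrete (ObjectIds first, so the numeric rule cannot eat an ObjectId's leading digits):

```python
import re

def get_path_template(path: str) -> str:
    # Standalone version of the method above, for demonstration.
    path = re.sub(r"/[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", "/{id}", path)
    path = re.sub(r"/[0-9a-f]{24}", "/{id}", path)
    path = re.sub(r"/\d+", "/{id}", path)
    return path

print(get_path_template("/api/v1/users/42/notifications/507f1f77bcf86cd799439011"))
# → /api/v1/users/{id}/notifications/{id}
```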
CSRF Protection¶
CSRFMiddleware implements the double-submit cookie pattern. At login the server issues two cookies: an httpOnly access_token cookie (the JWT) and a readable csrf_token cookie. The CSRF token is HMAC-signed against the access_token so it cannot be forged independently.
On every mutating request (POST, PUT, DELETE, PATCH) that targets an /api/ path, the middleware reads the csrf_token cookie and the X-CSRF-Token request header, then validates that they match (constant-time comparison) and that the token's HMAC signature is valid for the current access_token. Requests that fail any check receive a 403 response.
Safe methods (GET, HEAD, OPTIONS), auth endpoints (/api/v1/auth/login, /api/v1/auth/register), non-API paths, and unauthenticated requests (no access_token cookie) are exempt.
Frontend behaviour: The API interceptor in api-interceptors.ts auto-injects authStore.csrfToken into the X-CSRF-Token header for every non-GET request. The store obtains the token from the login response body and refreshes it by reading the csrf_token cookie on auth verification (auth.svelte.ts).
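The token scheme can be sketched as follows. Only the double-submit-plus-HMAC structure comes from the description above; the function names, nonce format, and secret handling are assumptions:

```python
import hashlib
import hmac
import secrets

def issue_csrf_token(access_token: str, secret: bytes) -> str:
    """Bind a random nonce to the session JWT via HMAC (hypothetical sketch)."""
    nonce = secrets.token_hex(16)
    sig = hmac.new(secret, f"{nonce}.{access_token}".encode(), hashlib.sha256).hexdigest()
    return f"{nonce}.{sig}"

def validate_csrf(cookie_token: str, header_token: str, access_token: str, secret: bytes) -> bool:
    # Double submit: cookie and header must match (constant-time comparison).
    if not hmac.compare_digest(cookie_token, header_token):
        return False
    try:
        nonce, sig = cookie_token.rsplit(".", 1)
    except ValueError:
        return False
    # The signature must be valid for the *current* access_token, so a token
    # lifted from another session cannot be replayed.
    expected = hmac.new(secret, f"{nonce}.{access_token}".encode(), hashlib.sha256).hexdigest()
    return hmac.compare_digest(sig, expected)
```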
System Metrics¶
In addition to HTTP metrics, the middleware module provides system-level observables:
```python
def create_system_metrics() -> None:
    """Create system metrics collectors."""
    meter = metrics.get_meter(__name__)

    # Process for system metrics
    current_process = psutil.Process(os.getpid())

    # Memory usage
    def get_memory_usage(_: CallbackOptions) -> list[Observation]:
        """Get current memory usage."""
        memory = psutil.virtual_memory()
        return [
            Observation(memory.used, {"type": "used"}),
            Observation(memory.available, {"type": "available"}),
            Observation(memory.percent, {"type": "percent"}),
        ]

    meter.create_observable_gauge(
        name="system_memory_bytes", callbacks=[get_memory_usage], description="System memory usage", unit="bytes"
    )

    # CPU usage
    def get_cpu_usage(_: CallbackOptions) -> list[Observation]:
        """Get current CPU usage."""
        cpu_percent = psutil.cpu_percent(interval=None)
        return [Observation(cpu_percent)]

    meter.create_observable_gauge(
        name="system_cpu_percent", callbacks=[get_cpu_usage], description="System CPU usage percentage", unit="percent"
    )

    # Process metrics
    def get_process_metrics(_: CallbackOptions) -> list[Observation]:
        """Get current process metrics."""
        mem = current_process.memory_info()
        return [
            Observation(mem.rss, {"type": "rss"}),
            Observation(mem.vms, {"type": "vms"}),
            Observation(current_process.cpu_percent(), {"type": "cpu"}),
            Observation(current_process.num_threads(), {"type": "threads"}),
        ]

    meter.create_observable_gauge(
        name="process_metrics", callbacks=[get_process_metrics], description="Process-level metrics", unit="mixed"
    )
```
These expose:
- `system_memory_bytes` - System memory (used, available, percent)
- `system_cpu_percent` - System CPU utilization
- `process_metrics` - Process RSS, VMS, CPU, thread count
Key Files¶
- Per-user / per-endpoint rate limiting
- Cache-Control headers for GET responses
- Reject oversized payloads before reading the body
- HTTP request telemetry and system-level observables
- CSRF token validation for state-changing requests