API SecurityΒΆ
Section: 5-security-architecture
Document: API Security
Status: Comprehensive API Security Documentation
Audience: Backend developers, security teams, API consumers
π― OverviewΒΆ
MachineAvatars exposes RESTful APIs for chatbot operations, user management, and data processing. This document covers API authentication, authorization, rate limiting, input validation, and security best practices.
API Gateway: Ports 8000 (HTTP) and 9000 (HTTPS)
Services: 23 microservices behind gateway
Authentication: JWT bearer tokens
Rate Limiting: By plan tier
π API AuthenticationΒΆ
JWT Token-Based AuthenticationΒΆ
Standard Flow:
sequenceDiagram
participant C as Client
participant G as Gateway
participant S as Backend Service
C->>G: GET /v1/chatbots<br/>Authorization: Bearer {token}
Note over G: Verify JWT token
alt Token missing
G-->>C: 401 Unauthorized
end
alt Token invalid/expired
G-->>C: 401 Invalid token
end
alt Token valid
G->>S: Forward request<br/>(with user context)
S->>S: Process request
S-->>G: Response
G-->>C: 200 OK + data
end
Token ValidationΒΆ
Required Headers:
Validation Code (Gateway/Services):
import jwt
from fastapi import HTTPException, Header
JWT_SECRET = os.getenv("JWT_SECRET")
JWT_ALGORITHM = "HS256"
def verify_jwt_token(authorization: str = Header(None)):
"""Verify JWT token from Authorization header."""
if not authorization:
raise HTTPException(
status_code=401,
detail="Authorization header missing"
)
# Extract token (remove "Bearer " prefix)
if not authorization.startswith("Bearer "):
raise HTTPException(
status_code=401,
detail="Invalid authorization format. Use 'Bearer {token}'"
)
token = authorization.replace("Bearer ", "")
try:
# Decode and verify signature
payload = jwt.decode(
token,
JWT_SECRET,
algorithms=[JWT_ALGORITHM]
)
# Extract user context
user_id = payload.get("user_id")
email = payload.get("email")
if not user_id:
raise HTTPException(
status_code=401,
detail="Invalid token payload"
)
return {"user_id": user_id, "email": email}
except jwt.ExpiredSignatureError:
raise HTTPException(
status_code=401,
detail="Token expired. Please login again."
)
except jwt.InvalidTokenError as e:
raise HTTPException(
status_code=401,
detail=f"Invalid token: {str(e)}"
)
Usage in Endpoints:
from fastapi import Depends
@app.get("/v1/chatbots")
async def get_chatbots(user_context: dict = Depends(verify_jwt_token)):
user_id = user_context["user_id"]
# Fetch chatbots for this user only
chatbots = chatbots_collection.find({"user_id": user_id})
return list(chatbots)
API Key Authentication (Business/Premium Plans)ΒΆ
For programmatic access (non-browser clients)
API Key Generation:
import secrets
def generate_api_key():
"""Generate secure 32-byte API key."""
return f"ma_{secrets.token_urlsafe(32)}"
# Example: "ma_xK7jP9nL4mQ2sR8vT3wY6zA5bD1hC0eN"
API Key Storage:
{
"_id": ObjectId("..."),
"user_id": "User-123456",
"api_key_hash": "$2b$12$...", // bcrypt hash of API key
"api_key_prefix": "ma_xK7j", // For user identification
"name": "Production API Key",
"created_at": "2025-01-15T10:00:00Z",
"last_used": "2025-01-20T14:30:00Z",
"expires_at": "2026-01-15T10:00:00Z", // Optional
"scopes": ["chatbot:read", "chatbot:write", "analytics:read"],
"rate_limit": 10000, // Requests per hour
"enabled": true
}
API Key Usage:
curl https://api.machineavatars.com/v1/chatbots \
-H "X-API-Key: ma_xK7jP9nL4mQ2sR8vT3wY6zA5bD1hC0eN"
Validation:
import bcrypt
from fastapi import HTTPException, Header
def verify_api_key(x_api_key: str = Header(None)):
"""Verify API key from X-API-Key header."""
if not x_api_key:
raise HTTPException(status_code=401, detail="API key required")
# Get key prefix (first 8 chars)
prefix = x_api_key[:8]
# Find API key by prefix
api_key_doc = api_keys_collection.find_one({"api_key_prefix": prefix})
if not api_key_doc:
raise HTTPException(status_code=401, detail="Invalid API key")
# Verify key using bcrypt
if not bcrypt.checkpw(
x_api_key.encode('utf-8'),
api_key_doc["api_key_hash"].encode('utf-8')
):
raise HTTPException(status_code=401, detail="Invalid API key")
# Check if enabled
if not api_key_doc.get("enabled", False):
raise HTTPException(status_code=403, detail="API key disabled")
# Check expiration
if api_key_doc.get("expires_at"):
if datetime.now() > api_key_doc["expires_at"]:
raise HTTPException(status_code=403, detail="API key expired")
# Update last_used timestamp
api_keys_collection.update_one(
{"_id": api_key_doc["_id"]},
{"$set": {"last_used": datetime.now()}}
)
return {
"user_id": api_key_doc["user_id"],
"scopes": api_key_doc.get("scopes", []),
"rate_limit": api_key_doc.get("rate_limit", 1000)
}
π¦ Rate LimitingΒΆ
Purpose: Prevent API abuse and ensure fair usage
Rate Limits by PlanΒΆ
| Plan | Requests/Hour | Burst Limit | Concurrent Requests |
|---|---|---|---|
| Free | 100 | 10/min | 1 |
| Pro | 1,000 | 50/min | 5 |
| Business | 10,000 | 200/min | 20 |
| Premium | 100,000 | 500/min | 100 |
Implementation (SlowAPI)ΒΆ
Installation:
Configuration:
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
app = FastAPI()
# Initialize limiter
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
# Custom key function (by user_id)
def get_user_id_from_token(request):
"""Extract user_id from JWT token for rate limiting."""
auth_header = request.headers.get("Authorization")
if not auth_header:
return get_remote_address(request) # Fallback to IP
try:
token = auth_header.replace("Bearer ", "")
payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
return payload.get("user_id", get_remote_address(request))
except:
return get_remote_address(request)
limiter_by_user = Limiter(key_func=get_user_id_from_token)
Usage on Endpoints:
@app.post("/v2/get-response-3d-chatbot")
@limiter.limit("10/minute") # Global limit
async def get_3d_response(...):
pass
@app.get("/v1/chatbots")
@limiter_by_user.limit("100/hour") # User-specific limit
async def get_chatbots(...):
pass
Dynamic Rate Limiting (by Plan):
def get_rate_limit_for_user(user_id):
"""Get rate limit based on user's subscription plan."""
user = users_collection.find_one({"user_id": user_id})
plan = user.get("subscription_plan", "Free")
limits = {
"Free": "100/hour",
"Pro": "1000/hour",
"Business": "10000/hour",
"Premium": "100000/hour"
}
return limits.get(plan, "100/hour")
@app.get("/v1/chatbots")
async def get_chatbots(user_context: dict = Depends(verify_jwt_token)):
user_id = user_context["user_id"]
rate_limit = get_rate_limit_for_user(user_id)
# Apply limit (requires custom implementation)
await check_rate_limit(user_id, rate_limit)
# Process request
...
Rate Limit HeadersΒΆ
Response Headers:
429 Too Many Requests:
{
"detail": "Rate limit exceeded. Try again in 1800 seconds.",
"limit": 1000,
"remaining": 0,
"reset": 1735214400
}
β Input ValidationΒΆ
Purpose: Prevent injection attacks and ensure data integrity
Request Validation (Pydantic)ΒΆ
Model-Based Validation:
from pydantic import BaseModel, EmailStr, Field, validator
class SignupRequest(BaseModel):
email: EmailStr # Validates email format
password: str = Field(..., min_length=8, max_length=128)
name: str = Field(..., min_length=1, max_length=100)
@validator('password')
def validate_password_strength(cls, v):
"""Ensure password contains uppercase, lowercase, and digit."""
if not any(c.isupper() for c in v):
raise ValueError("Password must contain uppercase letter")
if not any(c.islower() for c in v):
raise ValueError("Password must contain lowercase letter")
if not any(c.isdigit() for c in v):
raise ValueError("Password must contain digit")
return v
@validator('name')
def validate_name(cls, v):
"""Sanitize name (alphanumeric + spaces only)."""
if not all(c.isalnum() or c.isspace() for c in v):
raise ValueError("Name can only contain letters, numbers, and spaces")
return v.strip()
@app.post("/v2/signup")
async def signup(request: SignupRequest):
# Pydantic automatically validates
email = request.email
password = request.password
name = request.name
...
SQL Injection PreventionΒΆ
MongoDB (NoSQL Injection):
β Vulnerable:
# User input directly in query
email = request_data["email"]
user = db.users.find_one({"email": email})
β Safe (but still be careful):
# Parameterized query
email = request_data["email"]
user = db.users.find_one({"email": {"$eq": email}})
# Or use Pydantic validation
Potential NoSQL Injection:
// Attacker sends:
{"email": {"$ne": null}}
// Query becomes:
db.users.find_one({"email": {"$ne": null}})
// Returns first user!
Prevention:
from pydantic import BaseModel, EmailStr
class LoginRequest(BaseModel):
email: EmailStr # Must be valid email string
password: str
# Now email can never be an object
XSS (Cross-Site Scripting) PreventionΒΆ
Input Sanitization:
import html
def sanitize_html(text: str) -> str:
"""Escape HTML special characters."""
return html.escape(text)
# Usage
user_input = "<script>alert('XSS')</script>"
safe_input = sanitize_html(user_input)
# Result: "<script>alert('XSS')</script>"
Output Encoding:
from fastapi.responses import JSONResponse
@app.get("/api/chatbot-name/{chatbot_id}")
async def get_chatbot_name(chatbot_id: str):
chatbot = chatbots_collection.find_one({"_id": chatbot_id})
# FastAPI automatically escapes JSON
return JSONResponse({
"name": chatbot["name"] # Safely encoded
})
Path Traversal PreventionΒΆ
β Vulnerable:
@app.get("/files/{filename}")
async def get_file(filename: str):
# Attacker could send: ../../etc/passwd
with open(f"/uploads/{filename}", "r") as f:
return f.read()
β Safe:
import os
from pathlib import Path
@app.get("/files/{filename}")
async def get_file(filename: str):
# Validate filename
if ".." in filename or "/" in filename:
raise HTTPException(status_code=400, detail="Invalid filename")
# Use absolute path
base_dir = Path("/uploads").resolve()
file_path = (base_dir / filename).resolve()
# Ensure file is within base_dir
if not file_path.is_relative_to(base_dir):
raise HTTPException(status_code=400, detail="Invalid file path")
if not file_path.exists():
raise HTTPException(status_code=404, detail="File not found")
return FileResponse(file_path)
π CORS ConfigurationΒΆ
Current Implementation:
β Insecure (allows all origins):
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # β οΈ SECURITY ISSUE!
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
β Secure (whitelist specific origins):
# Environment-based configuration
ALLOWED_ORIGINS = os.getenv("CORS_ALLOWED_ORIGINS", "").split(",")
# Development
if os.getenv("ENVIRONMENT") == "development":
ALLOWED_ORIGINS.append("http://localhost:3000")
ALLOWED_ORIGINS.append("http://localhost:3001")
app.add_middleware(
CORSMiddleware,
allow_origins=[
"https://machineavatars.com",
"https://app.machineavatars.com",
"https://www.machineavatars.com",
*ALLOWED_ORIGINS
],
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
allow_headers=["Authorization", "Content-Type", "X-API-Key"],
max_age=3600 # Cache preflight for 1 hour
)
Preflight Request Handling:
OPTIONS /v1/chatbots HTTP/1.1
Origin: https://app.machineavatars.com
Access-Control-Request-Method: POST
Access-Control-Request-Headers: Authorization, Content-Type
HTTP/1.1 200 OK
Access-Control-Allow-Origin: https://app.machineavatars.com
Access-Control-Allow-Methods: GET, POST, PUT, DELETE
Access-Control-Allow-Headers: Authorization, Content-Type
Access-Control-Allow-Credentials: true
Access-Control-Max-Age: 3600
π API VersioningΒΆ
Purpose: Maintain backward compatibility
Current Versions:
v1- Original APIv2- Updated API (login, signup improvements)
URL-Based Versioning:
Implementation:
# v1 endpoints
@app.post("/v1/login")
async def login_v1(...):
# Original login logic (no reCAPTCHA)
pass
# v2 endpoints
@app.post("/v2/login")
async def login_v2(...):
# Enhanced login logic (with reCAPTCHA)
pass
Deprecation Strategy:
from fastapi import HTTPException
import warnings
@app.post("/v1/login")
async def login_v1(...):
warnings.warn(
"v1/login is deprecated. Use /v2/login instead.",
DeprecationWarning
)
# Add deprecation header
response = JSONResponse({...})
response.headers["X-API-Deprecated"] = "true"
response.headers["X-API-Sunset"] = "2025-06-01" # Removal date
response.headers["X-API-Replacement"] = "/v2/login"
return response
π‘οΈ Security HeadersΒΆ
Required Security Headers:
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware
# HTTPS redirect (production only)
if os.getenv("ENVIRONMENT") == "production":
app.add_middleware(HTTPSRedirectMiddleware)
# Trusted hosts
app.add_middleware(
TrustedHostMiddleware,
allowed_hosts=[
"api.machineavatars.com",
"*.machineavatars.com",
"localhost" # Development only
]
)
# Security headers middleware
@app.middleware("http")
async def add_security_headers(request, call_next):
response = await call_next(request)
# Prevent clickjacking
response.headers["X-Frame-Options"] = "DENY"
# Prevent MIME sniffing
response.headers["X-Content-Type-Options"] = "nosniff"
# XSS protection
response.headers["X-XSS-Protection"] = "1; mode=block"
# Content Security Policy
response.headers["Content-Security-Policy"] = "default-src 'self'"
# Strict Transport Security (HTTPS only)
if request.url.scheme == "https":
response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
# Referrer policy
response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
# Permissions policy
response.headers["Permissions-Policy"] = "geolocation=(), microphone=(), camera=()"
return response
π API Logging & MonitoringΒΆ
Request Logging:
import time
import uuid
@app.middleware("http")
async def log_requests(request, call_next):
request_id = str(uuid.uuid4())
start_time = time.time()
# Log request
logger.info(f"Request started", extra={
"request_id": request_id,
"method": request.method,
"url": str(request.url),
"client_ip": request.client.host,
"user_agent": request.headers.get("user-agent")
})
# Process request
response = await call_next(request)
# Calculate duration
duration = time.time() - start_time
# Log response
logger.info(f"Request completed", extra={
"request_id": request_id,
"status_code": response.status_code,
"duration_ms": duration * 1000
})
# Add request ID to response headers
response.headers["X-Request-ID"] = request_id
return response
Sensitive Data Redaction:
def redact_sensitive_data(data: dict) -> dict:
"""Remove sensitive fields from logs."""
sensitive_fields = ["password", "api_key", "token", "secret"]
redacted = data.copy()
for field in sensitive_fields:
if field in redacted:
redacted[field] = "***REDACTED***"
return redacted
# Usage
logger.info("User created", extra=redact_sensitive_data(user_data))
π Error Handling Best PracticesΒΆ
Standardized Error Responses:
from fastapi import HTTPException
from fastapi.responses import JSONResponse
class APIError(Exception):
def __init__(self, status_code: int, message: str, error_code: str = None):
self.status_code = status_code
self.message = message
self.error_code = error_code or f"ERR_{status_code}"
@app.exception_handler(APIError)
async def api_error_handler(request, exc: APIError):
return JSONResponse(
status_code=exc.status_code,
content={
"error": {
"code": exc.error_code,
"message": exc.message,
"request_id": request.headers.get("X-Request-ID")
}
}
)
# Usage
raise APIError(
status_code=403,
message="Insufficient permissions to access this resource",
error_code="INSUFFICIENT_PERMISSIONS"
)
Error Response Format:
{
"error": {
"code": "INSUFFICIENT_PERMISSIONS",
"message": "Insufficient permissions to access this resource",
"request_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
}
}
π Related DocumentationΒΆ
Security:
- Authentication & Authorization - JWT, API keys
- Secret Management - API key storage
- Network Security - Firewall, DDoS protection
Backend:
- Gateway Service - API gateway
- All Backend Services - 23 services
Features:
- Platform Capabilities - API features by plan
"Secure APIs build trust with every request." πβ