Skip to content

API SecurityΒΆ

Section: 5-security-architecture
Document: API Security
Status: Comprehensive API Security Documentation
Audience: Backend developers, security teams, API consumers


🎯 Overview¢

MachineAvatars exposes RESTful APIs for chatbot operations, user management, and data processing. This document covers API authentication, authorization, rate limiting, input validation, and security best practices.

API Gateway: Ports 8000 (HTTP) and 9000 (HTTPS)
Services: 23 microservices behind gateway
Authentication: JWT bearer tokens
Rate Limiting: By plan tier


πŸ” API AuthenticationΒΆ

JWT Token-Based AuthenticationΒΆ

Standard Flow:

sequenceDiagram
    participant C as Client
    participant G as Gateway
    participant S as Backend Service

    C->>G: GET /v1/chatbots<br/>Authorization: Bearer {token}

    Note over G: Verify JWT token
    alt Token missing
        G-->>C: 401 Unauthorized
    end

    alt Token invalid/expired
        G-->>C: 401 Invalid token
    end

    alt Token valid
        G->>S: Forward request<br/>(with user context)
        S->>S: Process request
        S-->>G: Response
        G-->>C: 200 OK + data
    end

Token ValidationΒΆ

Required Headers:

Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...

Validation Code (Gateway/Services):

import jwt
from fastapi import HTTPException, Header

JWT_SECRET = os.getenv("JWT_SECRET")
JWT_ALGORITHM = "HS256"

def verify_jwt_token(authorization: str = Header(None)):
    """Verify JWT token from Authorization header."""
    if not authorization:
        raise HTTPException(
            status_code=401,
            detail="Authorization header missing"
        )

    # Extract token (remove "Bearer " prefix)
    if not authorization.startswith("Bearer "):
        raise HTTPException(
            status_code=401,
            detail="Invalid authorization format. Use 'Bearer {token}'"
        )

    token = authorization.replace("Bearer ", "")

    try:
        # Decode and verify signature
        payload = jwt.decode(
            token,
            JWT_SECRET,
            algorithms=[JWT_ALGORITHM]
        )

        # Extract user context
        user_id = payload.get("user_id")
        email = payload.get("email")

        if not user_id:
            raise HTTPException(
                status_code=401,
                detail="Invalid token payload"
            )

        return {"user_id": user_id, "email": email}

    except jwt.ExpiredSignatureError:
        raise HTTPException(
            status_code=401,
            detail="Token expired. Please login again."
        )
    except jwt.InvalidTokenError as e:
        raise HTTPException(
            status_code=401,
            detail=f"Invalid token: {str(e)}"
        )

Usage in Endpoints:

from fastapi import Depends

@app.get("/v1/chatbots")
async def get_chatbots(user_context: dict = Depends(verify_jwt_token)):
    user_id = user_context["user_id"]

    # Fetch chatbots for this user only
    chatbots = chatbots_collection.find({"user_id": user_id})

    return list(chatbots)

API Key Authentication (Business/Premium Plans)ΒΆ

For programmatic access (non-browser clients)

API Key Generation:

import secrets

def generate_api_key():
    """Generate secure 32-byte API key."""
    return f"ma_{secrets.token_urlsafe(32)}"
    # Example: "ma_xK7jP9nL4mQ2sR8vT3wY6zA5bD1hC0eN"

API Key Storage:

{
    "_id": ObjectId("..."),
    "user_id": "User-123456",
    "api_key_hash": "$2b$12$...",  // bcrypt hash of API key
    "api_key_prefix": "ma_xK7j",   // For user identification
    "name": "Production API Key",
    "created_at": "2025-01-15T10:00:00Z",
    "last_used": "2025-01-20T14:30:00Z",
    "expires_at": "2026-01-15T10:00:00Z",  // Optional
    "scopes": ["chatbot:read", "chatbot:write", "analytics:read"],
    "rate_limit": 10000,  // Requests per hour
    "enabled": true
}

API Key Usage:

curl https://api.machineavatars.com/v1/chatbots \
  -H "X-API-Key: ma_xK7jP9nL4mQ2sR8vT3wY6zA5bD1hC0eN"

Validation:

import bcrypt
from fastapi import HTTPException, Header

def verify_api_key(x_api_key: str = Header(None)):
    """Verify API key from X-API-Key header."""
    if not x_api_key:
        raise HTTPException(status_code=401, detail="API key required")

    # Get key prefix (first 8 chars)
    prefix = x_api_key[:8]

    # Find API key by prefix
    api_key_doc = api_keys_collection.find_one({"api_key_prefix": prefix})

    if not api_key_doc:
        raise HTTPException(status_code=401, detail="Invalid API key")

    # Verify key using bcrypt
    if not bcrypt.checkpw(
        x_api_key.encode('utf-8'),
        api_key_doc["api_key_hash"].encode('utf-8')
    ):
        raise HTTPException(status_code=401, detail="Invalid API key")

    # Check if enabled
    if not api_key_doc.get("enabled", False):
        raise HTTPException(status_code=403, detail="API key disabled")

    # Check expiration
    if api_key_doc.get("expires_at"):
        if datetime.now() > api_key_doc["expires_at"]:
            raise HTTPException(status_code=403, detail="API key expired")

    # Update last_used timestamp
    api_keys_collection.update_one(
        {"_id": api_key_doc["_id"]},
        {"$set": {"last_used": datetime.now()}}
    )

    return {
        "user_id": api_key_doc["user_id"],
        "scopes": api_key_doc.get("scopes", []),
        "rate_limit": api_key_doc.get("rate_limit", 1000)
    }

🚦 Rate Limiting¢

Purpose: Prevent API abuse and ensure fair usage

Rate Limits by PlanΒΆ

Plan Requests/Hour Burst Limit Concurrent Requests
Free 100 10/min 1
Pro 1,000 50/min 5
Business 10,000 200/min 20
Premium 100,000 500/min 100

Implementation (SlowAPI)ΒΆ

Installation:

pip install slowapi

Configuration:

from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

app = FastAPI()

# Initialize limiter
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# Custom key function (by user_id)
def get_user_id_from_token(request):
    """Extract user_id from JWT token for rate limiting."""
    auth_header = request.headers.get("Authorization")
    if not auth_header:
        return get_remote_address(request)  # Fallback to IP

    try:
        token = auth_header.replace("Bearer ", "")
        payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
        return payload.get("user_id", get_remote_address(request))
    except:
        return get_remote_address(request)

limiter_by_user = Limiter(key_func=get_user_id_from_token)

Usage on Endpoints:

@app.post("/v2/get-response-3d-chatbot")
@limiter.limit("10/minute")  # Global limit
async def get_3d_response(...):
    pass

@app.get("/v1/chatbots")
@limiter_by_user.limit("100/hour")  # User-specific limit
async def get_chatbots(...):
    pass

Dynamic Rate Limiting (by Plan):

def get_rate_limit_for_user(user_id):
    """Get rate limit based on user's subscription plan."""
    user = users_collection.find_one({"user_id": user_id})
    plan = user.get("subscription_plan", "Free")

    limits = {
        "Free": "100/hour",
        "Pro": "1000/hour",
        "Business": "10000/hour",
        "Premium": "100000/hour"
    }

    return limits.get(plan, "100/hour")

@app.get("/v1/chatbots")
async def get_chatbots(user_context: dict = Depends(verify_jwt_token)):
    user_id = user_context["user_id"]
    rate_limit = get_rate_limit_for_user(user_id)

    # Apply limit (requires custom implementation)
    await check_rate_limit(user_id, rate_limit)

    # Process request
    ...

Rate Limit HeadersΒΆ

Response Headers:

X-RateLimit-Limit: 1000
X-RateLimit-Remaining: 847
X-RateLimit-Reset: 1735214400

429 Too Many Requests:

{
  "detail": "Rate limit exceeded. Try again in 1800 seconds.",
  "limit": 1000,
  "remaining": 0,
  "reset": 1735214400
}

βœ… Input ValidationΒΆ

Purpose: Prevent injection attacks and ensure data integrity

Request Validation (Pydantic)ΒΆ

Model-Based Validation:

from pydantic import BaseModel, EmailStr, Field, validator

class SignupRequest(BaseModel):
    email: EmailStr  # Validates email format
    password: str = Field(..., min_length=8, max_length=128)
    name: str = Field(..., min_length=1, max_length=100)

    @validator('password')
    def validate_password_strength(cls, v):
        """Ensure password contains uppercase, lowercase, and digit."""
        if not any(c.isupper() for c in v):
            raise ValueError("Password must contain uppercase letter")
        if not any(c.islower() for c in v):
            raise ValueError("Password must contain lowercase letter")
        if not any(c.isdigit() for c in v):
            raise ValueError("Password must contain digit")
        return v

    @validator('name')
    def validate_name(cls, v):
        """Sanitize name (alphanumeric + spaces only)."""
        if not all(c.isalnum() or c.isspace() for c in v):
            raise ValueError("Name can only contain letters, numbers, and spaces")
        return v.strip()

@app.post("/v2/signup")
async def signup(request: SignupRequest):
    # Pydantic automatically validates
    email = request.email
    password = request.password
    name = request.name
    ...

SQL Injection PreventionΒΆ

MongoDB (NoSQL Injection):

❌ Vulnerable:

# User input directly in query
email = request_data["email"]
user = db.users.find_one({"email": email})

βœ… Safe (but still be careful):

# Parameterized query
email = request_data["email"]
user = db.users.find_one({"email": {"$eq": email}})

# Or use Pydantic validation

Potential NoSQL Injection:

// Attacker sends:
{"email": {"$ne": null}}

// Query becomes:
db.users.find_one({"email": {"$ne": null}})
// Returns first user!

Prevention:

from pydantic import BaseModel, EmailStr

class LoginRequest(BaseModel):
    email: EmailStr  # Must be valid email string
    password: str

# Now email can never be an object

XSS (Cross-Site Scripting) PreventionΒΆ

Input Sanitization:

import html

def sanitize_html(text: str) -> str:
    """Escape HTML special characters."""
    return html.escape(text)

# Usage
user_input = "<script>alert('XSS')</script>"
safe_input = sanitize_html(user_input)
# Result: "&lt;script&gt;alert('XSS')&lt;/script&gt;"

Output Encoding:

from fastapi.responses import JSONResponse

@app.get("/api/chatbot-name/{chatbot_id}")
async def get_chatbot_name(chatbot_id: str):
    chatbot = chatbots_collection.find_one({"_id": chatbot_id})

    # FastAPI automatically escapes JSON
    return JSONResponse({
        "name": chatbot["name"]  # Safely encoded
    })

Path Traversal PreventionΒΆ

❌ Vulnerable:

@app.get("/files/{filename}")
async def get_file(filename: str):
    # Attacker could send: ../../etc/passwd
    with open(f"/uploads/{filename}", "r") as f:
        return f.read()

βœ… Safe:

import os
from pathlib import Path

@app.get("/files/{filename}")
async def get_file(filename: str):
    # Validate filename
    if ".." in filename or "/" in filename:
        raise HTTPException(status_code=400, detail="Invalid filename")

    # Use absolute path
    base_dir = Path("/uploads").resolve()
    file_path = (base_dir / filename).resolve()

    # Ensure file is within base_dir
    if not file_path.is_relative_to(base_dir):
        raise HTTPException(status_code=400, detail="Invalid file path")

    if not file_path.exists():
        raise HTTPException(status_code=404, detail="File not found")

    return FileResponse(file_path)

πŸ”’ CORS ConfigurationΒΆ

Current Implementation:

❌ Insecure (allows all origins):

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # ⚠️ SECURITY ISSUE!
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

βœ… Secure (whitelist specific origins):

# Environment-based configuration
ALLOWED_ORIGINS = os.getenv("CORS_ALLOWED_ORIGINS", "").split(",")

# Development
if os.getenv("ENVIRONMENT") == "development":
    ALLOWED_ORIGINS.append("http://localhost:3000")
    ALLOWED_ORIGINS.append("http://localhost:3001")

app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "https://machineavatars.com",
        "https://app.machineavatars.com",
        "https://www.machineavatars.com",
        *ALLOWED_ORIGINS
    ],
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
    allow_headers=["Authorization", "Content-Type", "X-API-Key"],
    max_age=3600  # Cache preflight for 1 hour
)

Preflight Request Handling:

OPTIONS /v1/chatbots HTTP/1.1
Origin: https://app.machineavatars.com
Access-Control-Request-Method: POST
Access-Control-Request-Headers: Authorization, Content-Type

HTTP/1.1 200 OK
Access-Control-Allow-Origin: https://app.machineavatars.com
Access-Control-Allow-Methods: GET, POST, PUT, DELETE
Access-Control-Allow-Headers: Authorization, Content-Type
Access-Control-Allow-Credentials: true
Access-Control-Max-Age: 3600

πŸ“ API VersioningΒΆ

Purpose: Maintain backward compatibility

Current Versions:

  • v1 - Original API
  • v2 - Updated API (login, signup improvements)

URL-Based Versioning:

POST /v1/login       # Old endpoint
POST /v2/login       # New endpoint (with reCAPTCHA)

Implementation:

# v1 endpoints
@app.post("/v1/login")
async def login_v1(...):
    # Original login logic (no reCAPTCHA)
    pass

# v2 endpoints
@app.post("/v2/login")
async def login_v2(...):
    # Enhanced login logic (with reCAPTCHA)
    pass

Deprecation Strategy:

from fastapi import HTTPException
import warnings

@app.post("/v1/login")
async def login_v1(...):
    warnings.warn(
        "v1/login is deprecated. Use /v2/login instead.",
        DeprecationWarning
    )

    # Add deprecation header
    response = JSONResponse({...})
    response.headers["X-API-Deprecated"] = "true"
    response.headers["X-API-Sunset"] = "2025-06-01"  # Removal date
    response.headers["X-API-Replacement"] = "/v2/login"

    return response

πŸ›‘οΈ Security HeadersΒΆ

Required Security Headers:

from fastapi.middleware.trustedhost import TrustedHostMiddleware
from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware

# HTTPS redirect (production only)
if os.getenv("ENVIRONMENT") == "production":
    app.add_middleware(HTTPSRedirectMiddleware)

# Trusted hosts
app.add_middleware(
    TrustedHostMiddleware,
    allowed_hosts=[
        "api.machineavatars.com",
        "*.machineavatars.com",
        "localhost"  # Development only
    ]
)

# Security headers middleware
@app.middleware("http")
async def add_security_headers(request, call_next):
    response = await call_next(request)

    # Prevent clickjacking
    response.headers["X-Frame-Options"] = "DENY"

    # Prevent MIME sniffing
    response.headers["X-Content-Type-Options"] = "nosniff"

    # XSS protection
    response.headers["X-XSS-Protection"] = "1; mode=block"

    # Content Security Policy
    response.headers["Content-Security-Policy"] = "default-src 'self'"

    # Strict Transport Security (HTTPS only)
    if request.url.scheme == "https":
        response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"

    # Referrer policy
    response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"

    # Permissions policy
    response.headers["Permissions-Policy"] = "geolocation=(), microphone=(), camera=()"

    return response

πŸ” API Logging & MonitoringΒΆ

Request Logging:

import time
import uuid

@app.middleware("http")
async def log_requests(request, call_next):
    request_id = str(uuid.uuid4())
    start_time = time.time()

    # Log request
    logger.info(f"Request started", extra={
        "request_id": request_id,
        "method": request.method,
        "url": str(request.url),
        "client_ip": request.client.host,
        "user_agent": request.headers.get("user-agent")
    })

    # Process request
    response = await call_next(request)

    # Calculate duration
    duration = time.time() - start_time

    # Log response
    logger.info(f"Request completed", extra={
        "request_id": request_id,
        "status_code": response.status_code,
        "duration_ms": duration * 1000
    })

    # Add request ID to response headers
    response.headers["X-Request-ID"] = request_id

    return response

Sensitive Data Redaction:

def redact_sensitive_data(data: dict) -> dict:
    """Remove sensitive fields from logs."""
    sensitive_fields = ["password", "api_key", "token", "secret"]

    redacted = data.copy()
    for field in sensitive_fields:
        if field in redacted:
            redacted[field] = "***REDACTED***"

    return redacted

# Usage
logger.info("User created", extra=redact_sensitive_data(user_data))

πŸ“Š Error Handling Best PracticesΒΆ

Standardized Error Responses:

from fastapi import HTTPException
from fastapi.responses import JSONResponse

class APIError(Exception):
    def __init__(self, status_code: int, message: str, error_code: str = None):
        self.status_code = status_code
        self.message = message
        self.error_code = error_code or f"ERR_{status_code}"

@app.exception_handler(APIError)
async def api_error_handler(request, exc: APIError):
    return JSONResponse(
        status_code=exc.status_code,
        content={
            "error": {
                "code": exc.error_code,
                "message": exc.message,
                "request_id": request.headers.get("X-Request-ID")
            }
        }
    )

# Usage
raise APIError(
    status_code=403,
    message="Insufficient permissions to access this resource",
    error_code="INSUFFICIENT_PERMISSIONS"
)

Error Response Format:

{
  "error": {
    "code": "INSUFFICIENT_PERMISSIONS",
    "message": "Insufficient permissions to access this resource",
    "request_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
  }
}

Security:

Backend:

Features:


"Secure APIs build trust with every request." πŸ”βœ