Auth Service - Complete Developer Documentation¶
Service: Authentication Service
Port: 8001
Purpose: JWT token generation for authenticated users
Technology: FastAPI (Python 3.9+)
Code Location:/auth-service/src/main.py(106 lines, 1 endpoint)
Owner: Backend Team
Last Updated: 2025-12-26
Table of Contents¶
- Service Overview
- Architecture & Design
- Complete Code Walkthrough
- Login Endpoint Deep-Dive
- JWT Token Implementation
- Database Schema
- Security Analysis
- Error Handling
- Deployment
- Relationship with User Service
Service Overview¶
The Auth Service is a minimal authentication service responsible for a single critical function: validating user credentials and issuing JWT tokens.
Key Responsibility¶
✅ ONE ENDPOINT: POST /v2/login - Validate credentials & issue JWT
⚠️ Note: Most authentication logic (signup, OTP, password reset) is handled by User Service (8002)
Why So Minimal?¶
Architectural Decision: The Auth Service was intentionally kept minimal to:
- Separation of Concerns: Login validation vs. user management
- Microservice Principle: Single responsibility (token issuance)
- Future Scalability: Easy to replace with OAuth2/OIDC provider
- Simplicity: Easier to audit for security
Statistics¶
- Total Lines: 106
- Endpoints: 1 (
/v2/login) - Dependencies: FastAPI, PyMongo, PyJWT
- Average Response Time: 50-100ms
Architecture & Design¶
Position in System¶
graph TB
subgraph "Client"
A[Frontend]
end
subgraph "Gateway Layer"
B[Gateway<br/>Port 8000]
end
subgraph "Authentication Services"
C[Auth Service<br/>Port 8001<br/>LOGIN ONLY]
D[User Service<br/>Port 8002<br/>Signup, OTP, etc.]
end
subgraph "Database"
E[(Cosmos DB<br/>users_multichatbot_v2)]
end
A -->|POST /v2/login| B
B -->|Verify reCAPTCHA| B
B -->|Forward| C
C -->|Query user| E
C -->|Return JWT| B
B -->|Return token| A
A -->|POST /v2/signup| B
B -->|Forward| D
D -->|Create user| E
style C fill:#FFE082
style D fill:#E3F2FD
Responsibilities Split¶
| Function | Auth Service (8001) | User Service (8002) |
|---|---|---|
| Login | ✅ Validate & issue JWT | ❌ |
| Signup | ❌ | ✅ Create user |
| OTP Verification | ❌ | ✅ Verify OTP |
| Password Reset | ❌ | ✅ Handle reset |
| JWT Creation | ✅ Core function | ✅ Also has JWT helper |
Complete Code Walkthrough¶
Full Source Code (main.py)¶
Lines 1-106 (Complete File):
from fastapi import FastAPI, HTTPException, Form
from fastapi.responses import JSONResponse
from pymongo import MongoClient
import os
import jwt
from datetime import datetime, timedelta
import time
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
from dotenv import load_dotenv
import random
import requests
# Import standardized logger (works in both Docker module and script mode)
try:
from .logger import logger
except ImportError:
import logger as logger_module
logger = logger_module.logger
app = FastAPI()
# CORS Configuration
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # ⚠️ PRODUCTION: Should restrict
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Load environment variables
import os
from pathlib import Path
from dotenv import load_dotenv
from pymongo import MongoClient
env_path = Path(__file__).resolve().parents[2] / ".env"
load_dotenv(dotenv_path=env_path)
# Get values from environment
mongo_uri = os.getenv("MONGO_URI")
db_name = os.getenv("MONGO_DB_NAME")
# Initialize MongoDB client and database
client = MongoClient(mongo_uri)
db = client[db_name]
logger.info(f"✓ Connected to MongoDB: {db.name}")
users_collection = db["users_multichatbot_v2"]
# JWT Secret and Algorithm
JWT_SECRET = os.getenv("JWT_SECRET", "your_jwt_secret")
JWT_ALGORITHM = "HS256"
# Generate JWT Token (Duplicate definition - see line 94)
def create_jwt_token(data: dict, expires_delta: timedelta = timedelta(minutes=15)):
to_encode = data.copy()
expire = datetime.utcnow() + expires_delta
to_encode.update({"exp": expire})
return jwt.encode(to_encode, JWT_SECRET, algorithm=JWT_ALGORITHM)
@app.post("/v2/login")
async def login(email: str = Form(...), password: str = Form(...)):
logger.info(f"Login attempt for email: {email}")
user = users_collection.find_one({"email": email})
if not user:
logger.warning(f"Login failed: User does not exist - {email}")
raise HTTPException(status_code=404, detail="User does not exist, please create a new account.")
if user["password"] != password: # ⚠️ PLAIN TEXT COMPARISON
logger.warning(f"Login failed: Invalid credentials for {email}")
raise HTTPException(status_code=401, detail="Invalid credentials")
if not user.get("verified"):
logger.warning(f"Login failed: Account not verified - {email}")
raise HTTPException(status_code=403, detail="Account not verified")
user_id = user.get("user_id")
token = create_jwt_token({"email": email, "user_id": user_id})
logger.info(f"✓ Login successful for user: {user_id}")
return JSONResponse(content={
"message": "Login successful",
"token": token,
"user_id": user_id
})
# Duplicate JWT function with different expiry (30 min vs 15 min)
def create_jwt_token(data: dict, expires_delta: timedelta = timedelta(minutes=30)):
to_encode = data.copy()
expire = datetime.utcnow() + expires_delta
to_encode.update({"exp": expire})
return jwt.encode(to_encode, JWT_SECRET, algorithm=JWT_ALGORITHM)
# OTP generator (not used in this service, but defined)
def generate_otp():
return ''.join(random.choices("0123456789", k=6))
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8001)
Code Analysis¶
Issues Identified:
-
Duplicate Function:
create_jwt_tokendefined twice (lines 60 & 94) -
First: 15-minute expiry
-
Second: 30-minute expiry (overwrites first)
-
Unused Function:
generate_otp()(line 100) - not called anywhere -
⚠️ SECURITY ISSUE: Plain text password comparison (line 76)
user["password"] != password- No bcrypt/hashing!
Login Endpoint Deep-Dive¶
Endpoint: POST /v2/login¶
Code Location: Lines 67-92
Purpose: Validate user credentials and return JWT token
Request:
Example Request (curl):
curl -X POST http://localhost:8001/v2/login \
-F "email=user@example.com" \
-F "password=mypassword123"
Login Flow (Step-by-Step)¶
sequenceDiagram
participant C as Client
participant A as Auth Service
participant DB as Cosmos DB
C->>A: POST /v2/login<br/>(email, password)
Note over A: Step 1: Query user
A->>DB: db.users_multichatbot_v2.findOne({email})
DB-->>A: User document or null
alt User not found
A-->>C: 404 "User does not exist"
end
Note over A: Step 2: Check password
alt Password mismatch
A-->>C: 401 "Invalid credentials"
end
Note over A: Step 3: Check verification
alt Not verified
A-->>C: 403 "Account not verified"
end
Note over A: Step 4: Generate JWT
A->>A: create_jwt_token({email, user_id})
Note over A: Step 5: Return success
A-->>C: 200 {message, token, user_id}
Step 1: Query User¶
Code (Line 70):
MongoDB Query:
Result:
{
"_id": ObjectId("6789abcd..."),
"user_id": "User_12345",
"email": "user@example.com",
"password": "mypassword123", // ⚠️ PLAIN TEXT!
"name": "John Doe",
"verified": true,
"subscription_plan": "Professional",
"subscription_date": "2025-01-15T00:00:00Z",
"created_at": "2025-01-15T10:00:00Z"
}
If user not found:
if not user:
raise HTTPException(
status_code=404,
detail="User does not exist, please create a new account."
)
Step 2: Validate Password¶
Code (Line 76):
if user["password"] != password:
raise HTTPException(
status_code=401,
detail="Invalid credentials"
)
⚠️ CRITICAL SECURITY ISSUE:
This compares plain text passwords directly!
Current Implementation:
Should Be (with bcrypt):
import bcrypt
if not bcrypt.checkpw(password.encode('utf-8'), user["password_hash"].encode('utf-8')):
raise HTTPException(status_code=401, detail="Invalid credentials")
Why This Is a Problem:
- Passwords stored in plain text in database
- Database breach = all passwords exposed
- No password hashing = violates security best practices
Recommendation: Implement bcrypt hashing immediately!
Step 3: Check Verification Status¶
Code (Line 80):
Purpose: Ensure user has verified their email via OTP
Verification Flow:
- User signs up (User Service)
- OTP sent to email
- User verifies OTP (User Service)
verifiedfield set totrue- Now user can login
Step 4: Generate JWT Token¶
Code (Lines 84-85):
Calls:
def create_jwt_token(data: dict, expires_delta: timedelta = timedelta(minutes=30)):
to_encode = data.copy()
expire = datetime.utcnow() + expires_delta
to_encode.update({"exp": expire})
return jwt.encode(to_encode, JWT_SECRET, algorithm=JWT_ALGORITHM)
JWT Payload:
{
"email": "user@example.com",
"user_id": "User_12345",
"exp": 1735214400 // Unix timestamp (30 min from now)
}
JWT Structure:
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJlbWFpbCI6InVzZXJAZXhhbXBsZS5jb20iLCJ1c2VyX2lkIjoiVXNlcl8xMjM0NSIsImV4cCI6MTczNTIxNDQwMH0.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
HEADER . PAYLOAD . SIGNATURE
Header:
Payload:
Signature:
Step 5: Return Success Response¶
Code (Lines 88-92):
Response:
{
"message": "Login successful",
"token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
"user_id": "User_12345"
}
Frontend Usage:
// Store token
localStorage.setItem("auth_token", response.token);
// Use in subsequent requests
fetch("https://api.example.com/chatbots", {
headers: {
Authorization: `Bearer ${token}`,
},
});
JWT Token Implementation¶
Token Generation Function¶
Code (Lines 94-98):
def create_jwt_token(data: dict, expires_delta: timedelta = timedelta(minutes=30)):
to_encode = data.copy()
expire = datetime.utcnow() + expires_delta
to_encode.update({"exp": expire})
return jwt.encode(to_encode, JWT_SECRET, algorithm=JWT_ALGORITHM)
Configuration¶
Environment Variables:
JWT_SECRET=your_jwt_secret_key_here # Should be 256-bit random string
JWT_ALGORITHM=HS256 # HMAC with SHA-256
⚠️ SECURITY RECOMMENDATION:
# Generate secure secret:
openssl rand -base64 32
# Output: "dGhpcyBpcyBhIHNlY3VyZSBzZWNyZXQga2V5IGZvciBqd3Q="
JWT_SECRET=dGhpcyBpcyBhIHNlY3VyZSBzZWNyZXQga2V5IGZvciBqd3Q=
Token Expiration¶
Current: 30 minutes (1800 seconds)
Expiration Calculation:
expire = datetime.utcnow() + timedelta(minutes=30)
# Current time: 2025-01-15 14:00:00
# Expiry time: 2025-01-15 14:30:00
# Unix timestamp: 1735214400
Decoded Token:
{
"email": "user@example.com",
"user_id": "User_12345",
"exp": 1735214400, // Expiry
"iat": 1735212600 // Issued At (not included, but recommended)
}
Recommendation - Add iat field:
def create_jwt_token(data: dict, expires_delta: timedelta = timedelta(minutes=30)):
to_encode = data.copy()
now = datetime.utcnow()
expire = now + expires_delta
to_encode.update({
"exp": expire,
"iat": now # Issued at timestamp
})
return jwt.encode(to_encode, JWT_SECRET, algorithm=JWT_ALGORITHM)
Token Validation (Not in Auth Service)¶
Where JWT is validated:
- Gateway Service: Could validate (currently doesn't seem to)
- Individual Services: Should validate before processing
Validation Code (Example):
import jwt
from fastapi import HTTPException, Header
def verify_token(authorization: str = Header(None)):
if not authorization:
raise HTTPException(status_code=401, detail="No authorization header")
try:
# Remove "Bearer " prefix
token = authorization.replace("Bearer ", "")
# Decode and verify
payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
return payload # {"email": "...", "user_id": "..."}
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token expired")
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail="Invalid token")
Database Schema¶
Collection: users_multichatbot_v2¶
Complete Schema:
{
_id: ObjectId("6789abcd..."),
// Identity
user_id: "User_12345", // ⭐ Primary identifier (not _id)
email: "user@example.com", // Unique, indexed
// Authentication
password: "mypassword123", // ⚠️ PLAIN TEXT! Should be password_hash
verified: true, // Email verified via OTP
// Profile
name: "John Doe",
// Subscription
subscription_plan: "Professional", // "Free", "Starter", "Professional", "Business", "Enterprise"
subscription_date: "2025-01-15T00:00:00Z",
subscription_expiry: "2025-02-15T00:00:00Z", // Optional
// Timestamps
created_at: "2025-01-15T10:00:00Z",
last_login: "2025-01-20T14:00:00Z", // Optional, updated on login
// Optional fields
phone: "+91-9876543210", // If provided
company: "Acme Corp",
role: "Admin"
}
Indexes¶
Email Index (Unique):
user_id Index:
Sample Documents¶
Verified User:
{
"_id": ObjectId("65a1b2c3d4e5f6789abc"),
"user_id": "User_12345",
"email": "john@example.com",
"password": "securepass123", // Should be hashed!
"name": "John Doe",
"verified": true,
"subscription_plan": "Professional",
"subscription_date": "2025-01-15T00:00:00Z",
"created_at": "2025-01-15T10:00:00Z"
}
Unverified User (Fresh Signup):
{
"_id": ObjectId("65a1b2c3d4e5f6789abd"),
"user_id": "User_67890",
"email": "jane@example.com",
"password": "password456",
"name": "Jane Smith",
"verified": false, // ❌ Cannot login yet
"subscription_plan": "Free",
"created_at": "2025-01-20T14:30:00Z"
}
Security Analysis¶
Current Security Issues¶
1. ⚠️ CRITICAL: Plain Text Passwords
Problem:
Impact:
- Passwords stored in database as plain text
- Database breach = all passwords exposed
- Violates OWASP, GDPR, PCI-DSS standards
Fix Required:
import bcrypt
# On signup (User Service)
password_hash = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt(rounds=12))
# On login (Auth Service)
if not bcrypt.checkpw(password.encode('utf-8'), user["password_hash"].encode('utf-8')):
raise HTTPException(status_code=401, detail="Invalid credentials")
2. ⚠️ JWT Secret Hardcoded
Problem:
If env var not set, falls back to "your_jwt_secret"
Fix:
JWT_SECRET = os.getenv("JWT_SECRET")
if not JWT_SECRET:
raise ValueError("JWT_SECRET environment variable must be set!")
3. ⚠️ CORS Allows All Origins
Problem:
Fix:
allow_origins=[
"https://machineavatars.com",
"https://app.machineavatars.com",
"http://localhost:3000" # Dev only
]
4. ⚠️ No Rate Limiting
Problem: Brute force attacks possible (unlimited login attempts)
Fix: Add rate limiting middleware
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
@app.post("/v2/login")
@limiter.limit("5/minute") # 5 attempts per minute
async def login(...):
...
5. ⚠️ No Account Lockout
Problem: After N failed attempts, account should lock
Fix: Track failed attempts in database
{
"email": "user@example.com",
"failed_login_attempts": 3,
"account_locked_until": "2025-01-15T15:00:00Z"
}
Security Best Practices Implemented¶
✅ JWT Tokens: Industry-standard authentication
✅ HTTPS: Required in production (enforced by Azure)
✅ Logging: All login attempts logged
✅ Error Messages: Don't reveal if email exists (should improve)
Error Handling¶
Error Responses¶
1. User Not Found (404):
Response:
⚠️ Security Issue: Reveals if email exists (enables email enumeration)
Better Response:
2. Invalid Password (401):
3. Account Not Verified (403):
Frontend Action: Redirect to OTP verification page
Logging¶
Login Attempt:
Success:
Failures:
logger.warning(f"Login failed: User does not exist - {email}")
logger.warning(f"Login failed: Invalid credentials for {email}")
logger.warning(f"Login failed: Account not verified - {email}")
Log Format (JSON):
{
"timestamp": "2025-01-15T14:00:00Z",
"level": "INFO",
"service": "auth-service",
"message": "✓ Login successful for user: User_12345",
"email": "user@example.com"
}
Deployment¶
Docker Configuration¶
Dockerfile:
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY src/ .
EXPOSE 8001
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8001"]
Docker Compose¶
auth-service:
build: ./auth-service
container_name: auth-service
ports:
- "8001:8001"
networks:
- app-network
labels:
- "logging=loki"
- "app=machine-agent-app"
environment:
- MONGO_URI=${MONGO_URI}
- MONGO_DB_NAME=Machine_agent_demo
- JWT_SECRET=${JWT_SECRET}
- JWT_ALGORITHM=HS256
# DataDog APM
- DATADOG_API_KEY=${DATADOG_API_KEY}
- DD_SITE=us3.datadoghq.com
- DD_AGENT_HOST=datadog-agent
- DD_TRACE_AGENT_PORT=8126
- DD_ENV=production
- DD_SERVICE=auth-service
- DD_VERSION=1.0.0
- DD_LOGS_INJECTION=true
- DD_TRACE_ENABLED=true
restart: always
Requirements.txt¶
# Web framework
fastapi>=0.95.0
uvicorn[standard]>=0.22.0
# Database
pymongo>=4.3.3
# Authentication & Security
PyJWT>=2.8.0
python-jose>=3.3.0
python-multipart>=0.0.6
# Environment variables
python-dotenv>=1.0.0
# Email (Azure) - Not used in Auth Service
azure-communication-email>=1.0.0
# HTTP client
requests>=2.31.0
# Monitoring & Tracing
ddtrace>=1.19.0
# Vector Database - Not used in Auth Service
pymilvus==2.3.4
Unused Dependencies:
azure-communication-email- Email sent by User Servicepymilvus- Not used by Auth Servicerequests- Imported but not used
Relationship with User Service¶
Division of Responsibilities¶
graph TB
subgraph "User Flow"
A[Client]
end
subgraph "Auth Service 8001"
B[POST /v2/login]
end
subgraph "User Service 8002"
C[POST /v2/signup]
D[POST /v2/verify-otp]
E[POST /v2/forgot-password]
F[POST /v2/reset-password]
end
A -->|1. Signup| C
A -->|2. Verify Email| D
A -->|3. Login| B
A -->|4. Reset Password| E
A -->|5. Confirm Reset| F
style B fill:#FFE082
style C fill:#E3F2FD
style D fill:#E3F2FD
style E fill:#E3F2FD
style F fill:#E3F2FD
Why This Split?¶
Auth Service (8001):
- Core Function: Token issuance
- Stateless: Only validates existing users
- Minimal: Easy to audit for security
- Future: Can be replaced with OAuth2 provider (Auth0, Keycloak)
User Service (8002):
- User Management: CRUD operations on users
- OTP System: Email verification
- Password Management: Reset, change
- Feature Assignment: Subscription features
Performance Metrics¶
| Operation | Latency | Bottleneck |
|---|---|---|
| Login (success) | 50-100ms | MongoDB query (~30ms) |
| Login (user not found) | 30-50ms | MongoDB query only |
| JWT Generation | < 1ms | CPU-bound (very fast) |
Optimization Opportunities¶
- Add MongoDB Index on
email(should already exist) - Cache User Documents (Redis) - for high-traffic scenarios
- Connection Pooling: MongoDB already pools connections
Testing¶
Manual Testing¶
Success Case:
curl -X POST http://localhost:8001/v2/login \
-F "email=user@example.com" \
-F "password=mypassword123"
Expected Response:
{
"message": "Login successful",
"token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
"user_id": "User_12345"
}
User Not Found:
curl -X POST http://localhost:8001/v2/login \
-F "email=nonexistent@example.com" \
-F "password=anything"
Expected Response:
Invalid Password:
curl -X POST http://localhost:8001/v2/login \
-F "email=user@example.com" \
-F "password=wrongpassword"
Expected Response:
Unverified Account:
curl -X POST http://localhost:8001/v2/login \
-F "email=unverified@example.com" \
-F "password=correctpassword"
Expected Response:
Related Documentation¶
- User Service Documentation - Signup, OTP, Password Reset
- Gateway Service Documentation - How Gateway routes to Auth Service
- System Architecture
Recommendations & Next Steps¶
Critical (Security)¶
- ⚠️ IMPLEMENT PASSWORD HASHING (bcrypt) - ASAP!
- ⚠️ Add Rate Limiting - Prevent brute force
- ⚠️ Restrict CORS Origins - Production security
- ⚠️ Enforce JWT_SECRET - No fallback to default
Improvements¶
- Add
iat(Issued At) to JWT - Better token tracking - Implement Token Refresh - Long-lived sessions
- Add 2FA Support - Optional extra layer
- Audit Logging - Track all login attempts with IP
Code Quality¶
- Remove Duplicate
create_jwt_token- Keep one version - Remove Unused
generate_otp()- Clean up - Add Unit Tests - Test login logic
- Add Type Hints - Better code documentation
Last Updated: 2025-12-26
Code Version: auth-service/src/main.py (106 lines)
Total Endpoints: 1 (/v2/login)
Review Cycle: Monthly (Critical Security Service)
"Minimal by design, critical by function."