Access Control¶
Section: 5-security-architecture
Document: Access Control & RBAC
Status: Access Control Implementation
Audience: Backend developers, security teams, administrators
🎯 Overview¶
MachineAvatars implements Role-Based Access Control (RBAC) to manage user permissions across the platform. This document details the implementation of roles, permissions, audit logging, and access control enforcement.
Authorization Model: RBAC (Role-Based Access Control)
Roles: 5 predefined roles
Granularity: Feature-level + chatbot-level permissions
Audit: Access logs retained per plan
👥 RBAC Implementation¶
5 Roles Defined¶
from enum import Enum
class UserRole(Enum):
    """The five predefined account roles.

    Each member's value is the human-readable role name.
    """

    # Full control
    OWNER = "Owner"

    # Manage chatbots/users, no billing
    ADMIN = "Admin"

    # Create/edit chatbots
    EDITOR = "Editor"

    # Read-only access
    VIEWER = "Viewer"

    # Analytics only
    ANALYST = "Analyst"
Complete Permissions Matrix¶
The complete permissions matrix is documented in Authentication & Authorization; this section adds the implementation details behind it:
Database Schema:
// users_multichatbot_v2 collection
{
"user_id": "User-123456",
"email": "user@example.com",
"role": "Admin", // Account-level role; enforcement falls back to "Viewer" when unset
"organization_id": "org_abc123", // For multi-user accounts
"created_at": "2025-01-15T10:00:00Z"
}
// user_permissions collection (granular chatbot permissions)
{
"_id": ObjectId("..."),
"user_id": "User-456",
"chatbot_id": "chatbot_123",
"permissions": ["read", "write"],
"granted_by": "User-789", // Owner who granted permission
"granted_at": "2025-01-15T10:00:00Z"
}
🔒 Permission Enforcement¶
Backend Middleware¶
from fastapi import HTTPException, Depends
from functools import wraps
def require_permission(required_permission: str):
    """Build a decorator that enforces a permission on an async endpoint.

    Args:
        required_permission: Permission string the caller's role must hold,
            e.g. ``"chatbot:create"``.

    Returns:
        A decorator. The wrapped endpoint looks up the caller's role and
        raises ``HTTPException`` with status 403 unless that role holds
        ``required_permission``; otherwise it awaits the original endpoint.

    Raises:
        HTTPException: 403 when the user record is missing, the stored role
            is null, or the role lacks the permission.
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, user_context: dict = Depends(verify_jwt_token), **kwargs):
            user_id = user_context["user_id"]

            # Get user role. find_one returns None when no record matches
            # (e.g. the account was removed after the token was issued), so
            # guard before calling .get on it.
            user = users_collection.find_one({"user_id": user_id})
            role = user.get("role", "Viewer") if user is not None else None

            # Deny by default: no user record or a null role means no access.
            if role is None or not has_permission(role, required_permission):
                raise HTTPException(
                    status_code=403,
                    detail=f"Insufficient permissions. Required: {required_permission}"
                )

            return await func(*args, user_context=user_context, **kwargs)
        return wrapper
    return decorator
# Usage
@app.post("/v1/chatbots")
@require_permission("chatbot:create")
async def create_chatbot(
    user_context: dict = Depends(verify_jwt_token),
):
    # Example stub: the decorator above admits only roles holding
    # "chatbot:create" (Owner, Admin, Editor).
    return None
@app.delete("/v1/chatbots/{chatbot_id}")
@require_permission("chatbot:delete")
async def delete_chatbot(
    chatbot_id: str,
    user_context: dict = Depends(verify_jwt_token),
):
    # Example stub: the decorator above admits only roles holding
    # "chatbot:delete" (Owner, Admin).
    return None
Permission Check Logic¶
# Maps each role name to the ordered list of permission strings it holds.
# Permission strings follow the "<area>:<action>" shape.
ROLE_PERMISSIONS = {
    # Everything, including billing.
    "Owner": [
        "chatbot:create",
        "chatbot:read",
        "chatbot:update",
        "chatbot:delete",
        "user:invite",
        "user:remove",
        "user:update_role",
        "billing:view",
        "billing:update",
        "analytics:view",
        "analytics:export",
        "prompts:update",
        "data:upload",
    ],
    # Same as Owner minus the billing permissions.
    "Admin": [
        "chatbot:create",
        "chatbot:read",
        "chatbot:update",
        "chatbot:delete",
        "user:invite",
        "user:remove",
        "user:update_role",
        "analytics:view",
        "analytics:export",
        "prompts:update",
        "data:upload",
    ],
    # Can build and edit chatbots but not delete them or manage users.
    "Editor": [
        "chatbot:create",
        "chatbot:read",
        "chatbot:update",
        "analytics:view",
        "prompts:update",
        "data:upload",
    ],
    # Read-only.
    "Viewer": [
        "chatbot:read",
        "analytics:view",
    ],
    # Analytics only.
    "Analyst": [
        "analytics:view",
        "analytics:export",
    ],
}
def has_permission(role: str, permission: str) -> bool:
    """Return True when ``role`` is granted ``permission``.

    A role that has no entry in ``ROLE_PERMISSIONS`` holds no permissions.
    """
    granted = ROLE_PERMISSIONS.get(role, [])
    return permission in granted
📝 Audit Logging¶
Audit Log Schema¶
// audit_logs collection
{
"_id": ObjectId("..."),
"timestamp": "2025-01-15T10:30:00.000Z",
"user_id": "User-123456",
"action": "chatbot:delete",
"resource_type": "chatbot",
"resource_id": "chatbot_abc123",
"ip_address": "203.0.113.50",
"user_agent": "Mozilla/5.0...",
"result": "success", // or "failed" if the operation raised an error, "denied" if the permission check failed
"metadata": {
"chatbot_name": "Customer Support Bot",
"previous_owner": "User-789"
}
}
Logging Implementation¶
import logging
from datetime import datetime
async def log_access(
    user_id: str,
    action: str,
    resource_type: str,
    resource_id: str,
    result: str,
    metadata: dict = None,
    request = None
):
    """Record one access attempt in the audit log and the application log.

    Args:
        user_id: Identifier of the acting user.
        action: Permission-style action name, e.g. ``"chatbot:delete"``.
        resource_type: Kind of resource acted on, e.g. ``"chatbot"``.
        resource_id: Identifier of the resource acted on.
        result: Outcome label stored with the entry (callers in this file
            pass ``"success"`` or ``"failed"``).
        metadata: Optional extra details; stored as ``{}`` when omitted.
        request: Optional incoming request. When given, the client host and
            ``user-agent`` header are recorded; otherwise both are ``None``.
    """
    # Local import: the surrounding file only imports `datetime` itself.
    from datetime import timezone

    # request.client can itself be None, so resolve it separately instead of
    # dereferencing `.host` unconditionally.
    client = request.client if request is not None else None
    user_agent = request.headers.get("user-agent") if request is not None else None

    log_entry = {
        # Timezone-aware UTC instant; datetime.utcnow() is deprecated and
        # returns a naive value.
        "timestamp": datetime.now(timezone.utc),
        "user_id": user_id,
        "action": action,
        "resource_type": resource_type,
        "resource_id": resource_id,
        "ip_address": client.host if client is not None else None,
        "user_agent": user_agent,
        "result": result,
        "metadata": metadata or {}
    }

    # Insert into audit logs collection
    audit_logs_collection.insert_one(log_entry)

    # Also log to application logs (lazy %-args; rendered text is unchanged)
    logging.info(
        "Audit: %s %s %s:%s - %s",
        user_id, action, resource_type, resource_id, result,
    )
# Usage in endpoints
@app.delete("/v1/chatbots/{chatbot_id}")
@require_permission("chatbot:delete")
async def delete_chatbot(
    chatbot_id: str,
    user_context: dict = Depends(verify_jwt_token),
    request: Request = None
):
    """Delete a chatbot and write the outcome to the audit log.

    Args:
        chatbot_id: ``_id`` of the chatbot to delete (from the URL path).
        user_context: Verified token payload; ``user_id`` is read from it.
        request: Incoming request, forwarded to the audit logger.

    Returns:
        ``{"message": "Chatbot deleted"}`` when a document was removed.

    Raises:
        HTTPException: 404 when no chatbot with that id was deleted.
        Exception: Any error raised by the delete itself is audited as
            ``"failed"`` and re-raised unchanged.
    """
    user_id = user_context["user_id"]

    # NOTE(review): lookup and delete filter on _id only; nothing in this
    # handler checks that the chatbot belongs to the caller's organization.
    # Confirm that scoping is enforced elsewhere.

    # Get chatbot for metadata (None when no such chatbot exists)
    chatbot = chatbots_collection.find_one({"_id": chatbot_id})

    # Keep the try body to the delete alone, so an audit-write failure is
    # never misreported as a failed deletion.
    try:
        result = chatbots_collection.delete_one({"_id": chatbot_id})
    except Exception as e:
        # Log failed attempt
        await log_access(
            user_id=user_id,
            action="chatbot:delete",
            resource_type="chatbot",
            resource_id=chatbot_id,
            result="failed",
            metadata={"error": str(e)},
            request=request
        )
        raise

    # Nothing was removed: report 404 instead of claiming success.
    if result.deleted_count == 0:
        await log_access(
            user_id=user_id,
            action="chatbot:delete",
            resource_type="chatbot",
            resource_id=chatbot_id,
            result="failed",
            metadata={"error": "Chatbot not found"},
            request=request
        )
        raise HTTPException(status_code=404, detail="Chatbot not found")

    # Log successful deletion
    await log_access(
        user_id=user_id,
        action="chatbot:delete",
        resource_type="chatbot",
        resource_id=chatbot_id,
        result="success",
        metadata={"chatbot_name": chatbot.get("name") if chatbot else None},
        request=request
    )
    return {"message": "Chatbot deleted"}
Audit Log Retention¶
| Plan | Retention Period | Searchable |
|---|---|---|
| Free | 7 days | ❌ |
| Pro | 30 days | ✅ |
| Business | 90 days | ✅ |
| Premium | Unlimited | ✅ |
🏢 Multi-User & Department Management¶
Organization Structure¶
// organizations collection
{
"_id": ObjectId("..."),
"organization_id": "org_abc123",
"name": "Acme Corporation",
"owner_user_id": "User-123456",
"plan": "Premium",
"departments": [
{
"department_id": "dept_sales",
"name": "Sales",
"admin_user_ids": ["User-456"],
"member_user_ids": ["User-789", "User-321"]
},
{
"department_id": "dept_support",
"name": "Customer Support",
"admin_user_ids": ["User-654"],
"member_user_ids": ["User-987"]
}
],
"created_at": "2025-01-01T00:00:00Z"
}
Department Isolation (Enterprise)¶
def get_accessible_chatbots(user_id: str):
    """Return the chatbots a user may access under department isolation.

    An Owner gets every chatbot in their organization. Any other user gets
    only the chatbots of the first department that lists them as an admin or
    member. A user with no record, no organization, or no department gets an
    empty result.

    Args:
        user_id: Identifier of the user whose access is being resolved.

    Returns:
        The result of ``chatbots_collection.find(...)`` for the applicable
        filter.
    """
    # Filter that matches no documents. Used for the "no access" paths so the
    # function still returns whatever find() returns on every path.
    no_match = {"_id": {"$in": []}}

    user = users_collection.find_one({"user_id": user_id})
    organization_id = user.get("organization_id") if user else None
    if organization_id is None:
        return chatbots_collection.find(no_match)

    # Owner can access all chatbots
    if user.get("role") == "Owner":
        return chatbots_collection.find({"organization_id": organization_id})

    # Get user's department (first one that lists them)
    org = organizations_collection.find_one({"organization_id": organization_id})
    departments = org.get("departments", []) if org else []
    user_department = next(
        (
            dept["department_id"]
            for dept in departments
            if user_id in dept.get("admin_user_ids", ())
            or user_id in dept.get("member_user_ids", ())
        ),
        None,
    )

    # A filter of department_id == None would also match chatbots that have
    # no department_id at all, so a user without a department is denied
    # explicitly instead of running that query.
    if user_department is None:
        return chatbots_collection.find(no_match)

    # Return only chatbots in user's department
    return chatbots_collection.find({
        "organization_id": organization_id,
        "department_id": user_department
    })
🔐 Principle of Least Privilege¶
Implementation:
- Default to Viewer: New users start with minimal permissions
- Explicit Grants: Permissions must be explicitly granted
- Time-Limited: Temporary elevated access with auto-expiry (planned)
- Just-in-Time: Request permission when needed (planned)
Example:
# Temporary elevated permissions (planned feature)
async def grant_temporary_permission(
    user_id: str,
    permission: str,
    duration_hours: int = 24
):
    """Store a time-limited permission grant for a user.

    Only the grant record is written here; enforcing or cleaning up expired
    grants is not part of this function.

    Args:
        user_id: Identifier of the user receiving the permission.
        permission: Permission string being granted.
        duration_hours: Lifetime of the grant in hours (default 24).
    """
    # Local import: the surrounding file imports only `datetime`, so
    # `timedelta` would otherwise be undefined here.
    from datetime import timedelta, timezone

    # Read the clock once so expires_at - granted_at is exactly the duration.
    granted_at = datetime.now(timezone.utc)
    expiry = granted_at + timedelta(hours=duration_hours)

    temp_permissions_collection.insert_one({
        "user_id": user_id,
        "permission": permission,
        "granted_at": granted_at,
        "expires_at": expiry
    })
🔗 Related Documentation¶
Security:
- Authentication & Authorization - RBAC roles and SSO
- API Security - API-level access control
- Incident Response - Account takeover playbook
Features:
- Admin & Management - User management features
- Enterprise Features - Department partitioning
"Access granted only to those who need it, when they need it." 🔐✅