Skip to content

Access Control

Section: 5-security-architecture
Document: Access Control & RBAC
Status: Access Control Implementation
Audience: Backend developers, security teams, administrators


🎯 Overview

MachineAvatars implements Role-Based Access Control (RBAC) to manage user permissions across the platform. This document details the implementation of roles, permissions, audit logging, and access control enforcement.

Authorization Model: RBAC (Role-Based Access Control)
Roles: 5 predefined roles
Granularity: Feature-level + chatbot-level permissions
Audit: Access logs retained per plan


👥 RBAC Implementation

5 Roles Defined

from enum import Enum

class UserRole(Enum):
    """The five predefined roles a user account can hold.

    Each member's value is the display string used for the role
    (for example ``UserRole.ADMIN.value == "Admin"``).
    """

    # Full control
    OWNER = "Owner"
    # Manage chatbots/users, no billing
    ADMIN = "Admin"
    # Create/edit chatbots
    EDITOR = "Editor"
    # Read-only access
    VIEWER = "Viewer"
    # Analytics only
    ANALYST = "Analyst"

Complete Permissions Matrix

The complete permissions matrix is documented in Authentication & Authorization; this section adds the implementation details:

Database Schema:

// users_multichatbot_v2 collection
{
    "user_id": "User-123456",
    "email": "user@example.com",
    "role": "Admin",  // Default role for account
    "organization_id": "org_abc123",  // For multi-user accounts
    "created_at": "2025-01-15T10:00:00Z"
}

// user_permissions collection (granular chatbot permissions)
{
    "_id": ObjectId("..."),
    "user_id": "User-456",
    "chatbot_id": "chatbot_123",
    "permissions": ["read", "write"],
    "granted_by": "User-789",  // Owner who granted permission
    "granted_at": "2025-01-15T10:00:00Z"
}

🔒 Permission Enforcement

Backend Middleware

from fastapi import HTTPException, Depends
from functools import wraps

def require_permission(required_permission: str):
    """Build a decorator that rejects callers whose role lacks a permission.

    The decorated endpoint must be an ``async`` function accepting a
    ``user_context`` keyword argument. The wrapper resolves that argument
    through ``Depends(verify_jwt_token)``, checks the caller's role, and
    forwards ``user_context`` unchanged to the endpoint.

    Args:
        required_permission: Permission string the caller's role must hold,
            e.g. ``"chatbot:create"``.

    Returns:
        A decorator for async endpoint functions.

    Raises:
        HTTPException: Raised by the wrapper at request time with status 403
            when the user record cannot be found or when the user's role
            does not include ``required_permission``.
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, user_context: dict = Depends(verify_jwt_token), **kwargs):
            user_id = user_context["user_id"]

            # Get user role
            user = users_collection.find_one({"user_id": user_id})

            # A valid token can outlive its user record; without this guard
            # the lookup result would be None and the role read below would
            # fail with AttributeError (a 500) instead of a controlled denial.
            if user is None:
                raise HTTPException(
                    status_code=403,
                    detail="Insufficient permissions. User account not found"
                )

            # Records without a role field are treated as "Viewer".
            role = user.get("role", "Viewer")

            # Check if role has permission
            if not has_permission(role, required_permission):
                raise HTTPException(
                    status_code=403,
                    detail=f"Insufficient permissions. Required: {required_permission}"
                )

            return await func(*args, user_context=user_context, **kwargs)
        return wrapper
    return decorator

# Usage
@app.post("/v1/chatbots")
@require_permission("chatbot:create")
async def create_chatbot(user_context: dict = Depends(verify_jwt_token)):
    """Usage example: POST endpoint guarded by the "chatbot:create" permission.

    The body is a placeholder. The permission check is applied by the
    ``require_permission`` decorator before this function is called.
    """
    # Only Owner, Admin, Editor can create chatbots
    pass

@app.delete("/v1/chatbots/{chatbot_id}")
@require_permission("chatbot:delete")
async def delete_chatbot(chatbot_id: str, user_context: dict = Depends(verify_jwt_token)):
    """Usage example: DELETE endpoint guarded by the "chatbot:delete" permission.

    The body is a placeholder. ``chatbot_id`` comes from the URL path and the
    permission check is applied by the ``require_permission`` decorator
    before this function is called.
    """
    # Only Owner, Admin can delete chatbots
    pass

Permission Check Logic

# Permission groups shared by several roles. They are tuples so that each
# role below unpacks them into its own independent list.
_CHATBOT_AUTHORING = ("chatbot:create", "chatbot:read", "chatbot:update")
_USER_MANAGEMENT = ("user:invite", "user:remove", "user:update_role")
_BILLING = ("billing:view", "billing:update")
_ANALYTICS = ("analytics:view", "analytics:export")
_CONTENT = ("prompts:update", "data:upload")

# Maps each role name to the list of permission strings it holds.
# Admin is Owner without the billing permissions; Editor cannot delete
# chatbots, manage users, or export analytics.
ROLE_PERMISSIONS = {
    "Owner": [
        *_CHATBOT_AUTHORING, "chatbot:delete",
        *_USER_MANAGEMENT,
        *_BILLING,
        *_ANALYTICS,
        *_CONTENT,
    ],
    "Admin": [
        *_CHATBOT_AUTHORING, "chatbot:delete",
        *_USER_MANAGEMENT,
        *_ANALYTICS,
        *_CONTENT,
    ],
    "Editor": [
        *_CHATBOT_AUTHORING,
        "analytics:view",
        *_CONTENT,
    ],
    "Viewer": ["chatbot:read", "analytics:view"],
    "Analyst": [*_ANALYTICS],
}

def has_permission(role: str, permission: str) -> bool:
    """Return True when ``permission`` is listed for ``role``.

    Roles that are not keys of ``ROLE_PERMISSIONS`` hold no permissions,
    so the result is False for them.
    """
    granted = ROLE_PERMISSIONS.get(role, [])
    return permission in granted

📝 Audit Logging

Audit Log Schema

// audit_logs collection
{
    "_id": ObjectId("..."),
    "timestamp": "2025-01-15T10:30:00.000Z",
    "user_id": "User-123456",
    "action": "chatbot:delete",
    "resource_type": "chatbot",
    "resource_id": "chatbot_abc123",
    "ip_address": "203.0.113.50",
    "user_agent": "Mozilla/5.0...",
    "result": "success",  // or "failed" if the action raised an error, "denied" if the permission check failed
    "metadata": {
        "chatbot_name": "Customer Support Bot",
        "previous_owner": "User-789"
    }
}

Logging Implementation

import logging
from datetime import datetime

async def log_access(
    user_id: str,
    action: str,
    resource_type: str,
    resource_id: str,
    result: str,
    metadata: dict = None,
    request = None
):
    """Record one access attempt in the audit collection and the app log.

    Args:
        user_id: Identifier of the user who attempted the action.
        action: Permission-style action name, e.g. ``"chatbot:delete"``.
        resource_type: Kind of resource acted on, e.g. ``"chatbot"``.
        resource_id: Identifier of the resource acted on.
        result: Outcome label stored with the entry, e.g. ``"success"``.
        metadata: Optional extra details; stored as ``{}`` when omitted.
        request: Optional incoming request, used only to capture the client
            address and the ``user-agent`` header. Both are stored as
            ``None`` when no request is supplied.
    """
    # Local import keeps this block self-contained: the surrounding module
    # only imports ``datetime`` from the datetime package.
    from datetime import timezone

    ip_address = None
    user_agent = None
    if request is not None:
        # The request may carry no client address (e.g. some test clients or
        # socket transports); dereferencing .host unguarded would raise.
        client = request.client
        ip_address = client.host if client is not None else None
        user_agent = request.headers.get("user-agent")

    log_entry = {
        # Timezone-aware UTC instant; datetime.utcnow() returns a naive value
        # and is deprecated since Python 3.12.
        "timestamp": datetime.now(timezone.utc),
        "user_id": user_id,
        "action": action,
        "resource_type": resource_type,
        "resource_id": resource_id,
        "ip_address": ip_address,
        "user_agent": user_agent,
        "result": result,
        "metadata": metadata or {}
    }

    # Insert into audit logs collection
    audit_logs_collection.insert_one(log_entry)

    # Also log to application logs (lazy %-formatting; same rendered message)
    logging.info(
        "Audit: %s %s %s:%s - %s",
        user_id, action, resource_type, resource_id, result,
    )

# Usage in endpoints
@app.delete("/v1/chatbots/{chatbot_id}")
@require_permission("chatbot:delete")
async def delete_chatbot(
    chatbot_id: str,
    user_context: dict = Depends(verify_jwt_token),
    request: Request = None
):
    """Delete a chatbot and write the outcome to the audit log.

    Args:
        chatbot_id: Identifier from the URL path, matched against ``_id``.
        user_context: Verified token payload; ``user_id`` is read from it.
        request: Incoming request, forwarded to the audit logger.

    Returns:
        ``{"message": "Chatbot deleted"}`` on success.

    Raises:
        HTTPException: 404 when no chatbot with ``chatbot_id`` exists.
        Exception: Any error raised by the delete call is audited as
            ``"failed"`` and re-raised unchanged.
    """
    user_id = user_context["user_id"]

    async def _audit(outcome: str, details: dict) -> None:
        # Single place that fills in the fields common to every audit entry.
        await log_access(
            user_id=user_id,
            action="chatbot:delete",
            resource_type="chatbot",
            resource_id=chatbot_id,
            result=outcome,
            metadata=details,
            request=request
        )

    # NOTE(review): the lookup is by _id only; nothing in this handler checks
    # that the chatbot belongs to the caller's organization - confirm that
    # tenant scoping is enforced elsewhere.
    # Get chatbot for metadata
    chatbot = chatbots_collection.find_one({"_id": chatbot_id})

    # A missing chatbot previously surfaced as an AttributeError (HTTP 500)
    # when its name was read for the audit metadata; report it as 404.
    if chatbot is None:
        await _audit("failed", {"error": "Chatbot not found"})
        raise HTTPException(status_code=404, detail="Chatbot not found")

    # Only the delete itself is guarded, so a failure while writing the
    # success audit entry is not mis-recorded as a failed deletion.
    try:
        chatbots_collection.delete_one({"_id": chatbot_id})
    except Exception as e:
        # Log failed attempt
        await _audit("failed", {"error": str(e)})
        raise

    # Log successful deletion
    await _audit("success", {"chatbot_name": chatbot.get("name")})

    return {"message": "Chatbot deleted"}

Audit Log Retention

Plan Retention Period Searchable
Free 7 days
Pro 30 days
Business 90 days
Premium Unlimited

🏢 Multi-User & Department Management

Organization Structure

// organizations collection
{
    "_id": ObjectId("..."),
    "organization_id": "org_abc123",
    "name": "Acme Corporation",
    "owner_user_id": "User-123456",
    "plan": "Premium",
    "departments": [
        {
            "department_id": "dept_sales",
            "name": "Sales",
            "admin_user_ids": ["User-456"],
            "member_user_ids": ["User-789", "User-321"]
        },
        {
            "department_id": "dept_support",
            "name": "Customer Support",
            "admin_user_ids": ["User-654"],
            "member_user_ids": ["User-987"]
        }
    ],
    "created_at": "2025-01-01T00:00:00Z"
}

Department Isolation (Enterprise)

def get_accessible_chatbots(user_id: str):
    """Get the chatbots a user may access, scoped by organization and department.

    Owners get every chatbot in their organization. Any other user gets only
    the chatbots of the first department that lists them as admin or member.
    The function fails closed: an unknown user, a user without an
    organization, an unknown organization, or a user who belongs to no
    department gets an empty result.

    Args:
        user_id: Identifier of the user whose access is being resolved.

    Returns:
        The result of ``chatbots_collection.find(...)`` for the applicable
        filter; for the fail-closed cases the filter matches no documents.
    """
    # Filter that matches nothing, used so every path returns the same kind
    # of object (a find() result) even when access is denied.
    no_match = {"_id": {"$in": []}}

    user = users_collection.find_one({"user_id": user_id})
    organization_id = user.get("organization_id") if user else None

    # Querying with organization_id=None would match chatbots whose field is
    # null or absent, so deny instead of querying.
    if organization_id is None:
        return chatbots_collection.find(no_match)

    # Owner can access all chatbots
    if user.get("role") == "Owner":
        return chatbots_collection.find({"organization_id": organization_id})

    # Get user's department (first one that lists the user)
    org = organizations_collection.find_one({"organization_id": organization_id})
    departments = org.get("departments", []) if org else []
    user_department = next(
        (
            dept.get("department_id")
            for dept in departments
            if user_id in dept.get("admin_user_ids", [])
            or user_id in dept.get("member_user_ids", [])
        ),
        None,
    )

    # A department_id=None filter also matches chatbots with no department
    # assigned, which would leak them to users outside every department.
    if user_department is None:
        return chatbots_collection.find(no_match)

    # Return only chatbots in user's department
    return chatbots_collection.find({
        "organization_id": organization_id,
        "department_id": user_department
    })

🔐 Principle of Least Privilege

Implementation:

  1. Default to Viewer: New users start with minimal permissions
  2. Explicit Grants: Permissions must be explicitly granted
  3. Time-Limited: Temporary elevated access with auto-expiry (planned)
  4. Just-in-Time: Request permission when needed (planned)

Example:

# Temporary elevated permissions (planned feature)
async def grant_temporary_permission(
    user_id: str,
    permission: str,
    duration_hours: int = 24
):
    """Grant temporary permission that auto-expires.

    Inserts one record into the temporary-permissions collection with the
    grant time and the expiry time.

    Args:
        user_id: Identifier of the user receiving the permission.
        permission: Permission string being granted.
        duration_hours: Lifetime of the grant in hours; must be positive.

    Raises:
        ValueError: If ``duration_hours`` is zero or negative, since such a
            grant would already be expired when written.
    """
    # Local import: the surrounding module imports only ``datetime`` from the
    # datetime package, so ``timedelta`` was previously an undefined name.
    from datetime import timedelta, timezone

    if duration_hours <= 0:
        raise ValueError("duration_hours must be positive")

    # Read the clock once so expires_at is exactly granted_at + duration.
    granted_at = datetime.now(timezone.utc)
    expiry = granted_at + timedelta(hours=duration_hours)

    temp_permissions_collection.insert_one({
        "user_id": user_id,
        "permission": permission,
        "granted_at": granted_at,
        "expires_at": expiry
    })

Security:

Features:


"Access granted only to those who need it, when they need it." 🔐✅