Response 3D Chatbot Service - Complete Developer Documentation

🚨 THE MOST CRITICAL SERVICE IN THE ENTIRE PLATFORM 🚨

Service: LLM Response Generation for 3D Chatbots (RAG + TTS + Lip-Sync + Guardrails)
Port: 8011
Purpose: Generate AI responses with voice synthesis, lip-sync animation, and advanced safety guardrails
Technology: FastAPI, Azure OpenAI (11+ Models), Azure TTS, Milvus, Rhubarb Lip-Sync, Streaming
Code Location: /response-3d-chatbot-service/src/main.py (2,534 lines)
Owner: Backend Team
Last Updated: 2025-12-26


⚠️ CRITICAL SERVICE ALERT

This is THE MOST IMPORTANT service in the MachineAgents platform!

Why This Service is Critical:

  • Handles ALL 3D chatbot interactions (the flagship product)
  • Largest codebase (2,534 lines - 3.3× larger than text+voice combined)
  • Most advanced features (lip-sync, streaming, guardrails, multi-LLM)
  • Highest complexity (81 functions/components)
  • Revenue driver (3D chatbots are premium offering)

Impact of Downtime:

  • ❌ ALL 3D chatbots stop working
  • ❌ Premium customers affected
  • ❌ Revenue loss
  • ❌ Brand reputation damage

Table of Contents

  1. Service Overview
  2. Complete Architecture
  3. Lip-Sync Generation System
  4. Multi-LLM Support (11 Models)
  5. Guardrails System (NEW)
  6. Performance Logging (NEW)
  7. Lead Collection (NEW)
  8. Streaming Responses
  9. Complete Endpoints
  10. Security Analysis
  11. Performance
  12. Deployment

Service Overview

The Response 3D Chatbot Service is the flagship AI response generation service, combining advanced RAG with visual lip-sync animation for immersive 3D avatar experiences.

Key Responsibilities

RAG Pipeline - Milvus vector search + LLM generation
Lip-Sync Generation - Rhubarb phonetic animation
Azure TTS - Neural voice synthesis
Multi-LLM Support - 11 different models
Guardrails - Content safety & validation
UTM Targeting - Personalized responses by source
Streaming - Real-time response delivery
Performance Logging - Comprehensive metrics
Lead Collection - Form submission handling

Technology Stack

| Technology | Purpose | Specifications |
|---|---|---|
| Azure OpenAI | Primary LLMs | GPT-4, GPT-3.5-Turbo-16k, GPT-4o-mini, o1-mini |
| Azure TTS | Speech synthesis | 10 Neural voices |
| Rhubarb Lip-Sync | Animation | Phonetic mouth shapes |
| Milvus | Vector database | Cosine similarity search |
| FastEmbed | Embeddings | BAAI/bge-small-en-v1.5 (384D) |
| MongoDB | Data storage | 6 collections |
| FFmpeg | Audio processing | WAV to PCM conversion |
| External APIs | Alternative LLMs | Llama, DeepSeek, Ministral, Phi-3, Gemini, Claude, Grok |

Statistics

  • Total Lines: 2,534 (LARGEST SERVICE)
  • Total Functions: 81
  • Endpoints: 5 main endpoints (lip-sync, main response, 3 streaming variants)
  • LLM Models: 11 (more than any other service)
  • Voices: 10 Azure Neural voices
  • Collections: 6 MongoDB collections
  • System Prompts: 3 (Sales, Service, Custom)
  • Average Response Time: 4-8 seconds (with lip-sync)

Complete Architecture

End-to-End Data Flow with Lip-Sync

graph TB
    USER["User Question<br/>(3D Avatar Interface)"]

    subgraph "Step 1: RAG Retrieval"
        EMBED["Generate Embedding<br/>(BAAI/bge-small-en-v1.5)"]
        UTM["Match UTM Config<br/>(Scoring Algorithm)"]
        MILVUS["Milvus Search<br/>(Top-5 Chunks)"]
        UTM_CHUNKS["Retrieve UTM Chunks"]
    end

    subgraph "Step 2: Guardrails Check"
        GUARDRAILS["Check Guardrails<br/>(chatbot_guardrails)"]
        VALIDATE["Validate Question<br/>(Safety Rules)"]
    end

    subgraph "Step 3: History & Config"
        MONGO["MongoDB Lookup<br/>(chatbot_history)"]
        CONFIG["Get Chatbot Config<br/>(purpose + voice + avatar)"]
        SYSPROMPT["Build System Prompt<br/>(with chatbot name)"]
    end

    subgraph "Step 4: LLM Generation"
        MESSAGES["Construct Messages<br/>(system + history + context)"]
        TOKENCHECK["Token Limit Check<br/>(Max 16K)"]
        LLM["Multi-LLM Selection<br/>(11 models available)"]
        STREAM["Streaming Response<br/>(Optional)"]
        CLEAN["Remove Markdown<br/>(*, #)"]
    end

    subgraph "Step 5: Lip-Sync Pipeline"
        SCRUB["Scrub Contacts<br/>(Phone/URL removal)"]
        AZURETTS["Azure TTS<br/>(Neural Voice)"]
        WAV["Generate WAV File"]
        FFMPEG["FFmpeg Convert<br/>(PCM Format)"]
        RHUBARB["Rhubarb Lip-Sync<br/>(Phonetic Analysis)"]
        LIPSYNC["Lip-Sync JSON<br/>(Animation Data)"]
    end

    subgraph "Step 6: Post-Processing"
        B64AUDIO["Encode Audio<br/>(Base64)"]
        PERF["Log Performance<br/>(performance_logs)"]
        SAVE["Save to History<br/>(chatbot_history)"]
        CLEANUP["Delete Temp Files<br/>(WAV, PCM, JSON)"]
    end

    subgraph "Step 7: Return"
        RESP["Return Response<br/>(text + audio + lipsync)"]
    end

    USER --> EMBED
    USER --> UTM

    EMBED --> MILVUS
    UTM --> UTM_CHUNKS

    MILVUS --> GUARDRAILS
    UTM_CHUNKS --> VALIDATE

    VALIDATE --> MONGO
    VALIDATE --> CONFIG
    CONFIG --> SYSPROMPT

    MONGO --> MESSAGES
    SYSPROMPT --> MESSAGES

    MESSAGES --> TOKENCHECK
    TOKENCHECK --> LLM
    LLM --> STREAM
    STREAM --> CLEAN

    CLEAN --> SCRUB
    SCRUB --> AZURETTS
    AZURETTS --> WAV
    WAV --> FFMPEG
    FFMPEG --> RHUBARB
    RHUBARB --> LIPSYNC

    LIPSYNC --> B64AUDIO
    B64AUDIO --> PERF
    PERF --> SAVE
    SAVE --> CLEANUP

    CLEANUP --> RESP
    RESP --> USER

    style USER fill:#e1f5fe
    style LLM fill:#fff3e0
    style AZURETTS fill:#ffecb3
    style RHUBARB fill:#f3e5f5
    style GUARDRAILS fill:#ffcdd2
    style PERF fill:#c5e1a5
    style SAVE fill:#c8e6c9

Lip-Sync Generation System

What is Rhubarb Lip-Sync?

Rhubarb is a command-line tool for automatic lip-sync animation. It analyzes audio files and generates phonetic mouth shapes that can be used to animate 3D avatars in real-time.

Official: https://github.com/DanielSWolf/rhubarb-lip-sync


Supported Platforms

Executable Detection (Lines 117-133)

system = platform.system().lower()
current_dir = os.path.dirname(os.path.abspath(__file__))

if system == "windows":
    rhubarbExePath = os.path.join(current_dir, "Rhubarb-Lip-Sync-1.13.0-Windows", "rhubarb.exe")
elif system == "darwin":  # macOS
    rhubarbExePath = os.path.join(current_dir, "Rhubarb-Lip-Sync-1.13.0-macOS", "rhubarb")
elif system == "linux":
    rhubarbExePath = os.path.join(current_dir, "Rhubarb", "rhubarb")
else:
    raise RuntimeError(f"Unsupported platform: {system}")

# Verify executable exists
if not os.path.exists(rhubarbExePath):
    raise RuntimeError(f"Rhubarb executable not found at: {rhubarbExePath}")

Platforms Supported:

  • ✅ Windows (rhubarb.exe)
  • ✅ macOS (rhubarb)
  • ✅ Linux (rhubarb)

Complete Lip-Sync Pipeline

1. Text-to-Speech (Lines 139-185)

async def text_to_speech_azure(text, voice, session_id):
    # Clean text (remove contact info)
    cleaned_text = remove_contact_numbers(text)

    # Configure Azure TTS
    speech_config = speechsdk.SpeechConfig(
        subscription="9N41NOfD...",  # ⚠️ Hardcoded!
        region="centralindia"
    )
    speech_config.speech_synthesis_voice_name = voice

    # Generate WAV file
    wav_file = os.path.join(OUTPUT_DIR, f"{session_id}.wav")
    audio_config = speechsdk.audio.AudioOutputConfig(filename=wav_file)

    speech_synthesizer = speechsdk.SpeechSynthesizer(
        speech_config=speech_config,
        audio_config=audio_config
    )

    result = speech_synthesizer.speak_text_async(cleaned_text).get()

    if result.reason == speechsdk.ResultReason.SynthesizingAudioCompleted:
        return wav_file
    else:
        raise Exception(f"TTS failed: {result.cancellation_details.reason}")

2. WAV to PCM Conversion (Lines 189-200)

def convert_wav_to_pcm(input_wav, output_wav):
    """Convert WAV to PCM format using FFmpeg (required by Rhubarb)"""
    try:
        subprocess.run(
            ["ffmpeg", "-i", input_wav, "-acodec", "pcm_s16le", output_wav],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=True
        )
        return output_wav if os.path.exists(output_wav) else None
    except subprocess.CalledProcessError as e:
        logger.error(f"FFmpeg conversion failed: {e.stderr.decode()}")
        return None

Why PCM? Rhubarb requires PCM-encoded WAV files for accurate phonetic analysis.

3. Rhubarb Lip-Sync Generation (Lines 203-216)

def generate_lip_sync(wav_file, user_id):
    """Generate lip-sync JSON using Rhubarb executable"""
    json_file = os.path.join(OUTPUT_DIR, f"{user_id}.json")

    try:
        result = subprocess.run(
            [rhubarbExePath, "-f", "json", "-o", json_file, wav_file, "-r", "phonetic"],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True
        )
        return json_file if os.path.exists(json_file) else None
    except Exception as e:
        logger.error(f"Rhubarb lip-sync failed: {e}")
        return None

Rhubarb Parameters:

  • -f json - Output format (JSON)
  • -o {json_file} - Output file path
  • {wav_file} - Input audio file
  • -r phonetic - Recognition mode (phonetic analysis)

4. Parse Lip-Sync JSON (Lines 219-224)

def parse_lip_sync(json_file, sound_file):
    """Parse Rhubarb JSON and add sound file reference"""
    with open(json_file, "r") as file:
        lip_sync_data = json.load(file)

    # Add sound file path to metadata
    lip_sync_data["metadata"]["soundFile"] = sound_file
    return lip_sync_data

Lip-Sync Output Format

Example Rhubarb JSON:

{
  "metadata": {
    "soundFile": "tts_audio/session_123.wav",
    "duration": 2.5
  },
  "mouthCues": [
    { "start": 0.0, "end": 0.1, "value": "X" },
    { "start": 0.1, "end": 0.25, "value": "A" },
    { "start": 0.25, "end": 0.4, "value": "B" },
    { "start": 0.4, "end": 0.55, "value": "C" },
    { "start": 0.55, "end": 0.7, "value": "D" },
    { "start": 0.7, "end": 0.85, "value": "E" },
    { "start": 0.85, "end": 1.0, "value": "F" },
    { "start": 1.0, "end": 1.2, "value": "G" },
    { "start": 1.2, "end": 1.5, "value": "H" },
    { "start": 1.5, "end": 2.5, "value": "X" }
  ]
}

Mouth Shapes (Phonemes):

| Value | Phoneme | Description | Example Sound |
|---|---|---|---|
| X | Rest | Mouth closed | Silence |
| A | /æ/ | Mouth open (tall) | "cat" |
| B | /b/, /p/, /m/ | Lips together | "bat", "pat" |
| C | /ʃ/, /dʒ/ | Lips forward | "ship", "joy" |
| D | /θ/, /ð/ | Tongue out | "this" |
| E | /ɛ/, /ʌ/ | Mouth open (wide) | "bed", "cup" |
| F | /f/, /v/ | Lower lip to teeth | "fan", "van" |
| G | /k/, /g/, /ŋ/ | Tongue back | "go", "sing" |
| H | /i/ | Mouth wide (smile) | "see" |

3D Avatar Animation:

  • Frontend receives lip-sync JSON
  • Plays audio file
  • Syncs avatar mouth shapes to timestamps
  • Creates realistic talking animation
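
To illustrate the sync step above, here is a minimal sketch of looking up the active mouth shape for a given playback time. The frontend does this in its own render loop; the helper below is hypothetical and not part of the service code:

def mouth_shape_at(lip_sync_data: dict, t: float) -> str:
    """Return the mouth shape ("A"-"H" or "X") active at playback time t, in seconds."""
    for cue in lip_sync_data.get("mouthCues", []):
        if cue["start"] <= t < cue["end"]:
            return cue["value"]
    return "X"  # rest shape when no cue covers t

# With the example JSON above, mouth_shape_at(data, 0.3) returns "C".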

Lip-Sync Endpoint

GET /lip-sync (Lines 226-291)

Purpose: Generate lip-sync animation data for avatar greeting/testing

Request:

GET /lip-sync?message=Hello!%20How%20can%20I%20help%20you?&user_id=User-123456&voice_selection=Female_2&project_id=User-123456_Project_1

Response:

{
    "lipsync": {
        "metadata": {
            "soundFile": "tts_audio/User-123456.wav",
            "duration": 1.8
        },
        "mouthCues": [
            {"start": 0.00, "end": 0.10, "value": "X"},
            {"start": 0.10, "end": 0.25, "value": "H"},
            ...
        ]
    },
    "audio": "UklGRiQAAABXQVZFZm10IBAAAAABAAEAQB8AAEAfAAABAAgAZGF0YQAAAAA..."
}

Complete Flow:

  1. Validate voice selection
  2. Generate TTS with Azure (WAV file)
  3. Convert WAV to PCM (FFmpeg)
  4. Generate lip-sync (Rhubarb)
  5. Parse lip-sync JSON
  6. Encode audio as base64
  7. Clean up temporary files
  8. Return JSON + base64 audio

Multi-LLM Support (11 Models)

The service supports 11 different LLM models - more than any other service in the platform!

Model Overview

| # | Model | Provider | Use Case | Status |
|---|---|---|---|---|
| 1 | GPT-4 | Azure OpenAI | Complex reasoning | Available |
| 2 | GPT-3.5-Turbo-16k | Azure OpenAI | Default | Active |
| 3 | GPT-4o-mini | Azure OpenAI | Fast, cost-effective | Available |
| 4 | o1-mini | Azure OpenAI | Reasoning tasks | Available |
| 5 | Llama 3.3-70B | Azure Models | Open-source alternative | Available |
| 6 | DeepSeek R1 | Azure Models | Reasoning | Available |
| 7 | Ministral-3B | Azure Models | Lightweight | Available |
| 8 | Phi-3 | Azure Models | Efficient | Available |
| 9 | Gemini | Google | Multimodal | Configured |
| 10 | Claude | Anthropic | Safety-focused | Configured |
| 11 | Grok | xAI | General purpose | Configured |

Note: Models 1-8 run on Azure-hosted deployments. Models 9-11 (Gemini, Claude, Grok) call external APIs directly and are wired into model selection; see the LLM Wrapper Functions section for their configuration.


Azure OpenAI Models

All use the same hardcoded API key:

subscription_key = "AZxDVMYB08AaUip0i5ed1sy73ZpUsqencYYxKDbm6nfWfG1AqPZ3JQQJ99BCACYeBjFXJ3w3AAABACOGVUo7"  # ⚠️

1. GPT-4 (Lines 708-736)

endpoint_gpt4 = "https://machineagentopenai.openai.azure.com/openai/deployments/gpt-4-0613/..."
deployment_gpt4 = "gpt-4-0613"

client = AzureOpenAI(
    azure_endpoint=endpoint_gpt4,
    api_key=subscription_key,
    api_version="2024-02-15-preview"
)

def call_openai_4(messages):
    response = client.chat.completions.create(
        model="deployment_gpt4",
        messages=messages,
        temperature=0.7
    )
    return response.choices[0].message.content

2. GPT-3.5-Turbo-16k (DEFAULT) (Lines 737-760)

endpoint_gpt35 = "https://machineagentopenai.openai.azure.com/openai/deployments/gpt-35-turbo-16k-0613/..."
deployment_gpt35 = "gpt-35-turbo-16k-0613"

3. GPT-4o-mini (Lines 761-784)

4. o1-mini (Lines 785-808)


Alternative Models

5. Llama 3.3-70B (Lines 809-829)

AZURE_LLAMA_ENDPOINT = "https://Llama-3-3-70B-Instruct-ulmca.eastus.models.ai.azure.com/chat/completions"
AZURE_API_KEY = "JOfcw0VW0dS31Z8XgkNRSP9tUaBiwUYZ"  # ⚠️ Hardcoded!

def call_llama(messages):
    payload = {"messages": messages, "temperature": 0.7, "max_tokens": 50}
    response = requests.post(AZURE_LLAMA_ENDPOINT, json=payload, headers=llama_HEADERS)
    return response.json()["choices"][0]["message"]["content"]

6. DeepSeek R1 (Lines 830-853)

deepseek_api_url = "https://DeepSeek-R1-imalr.eastus2.models.ai.azure.com/chat/completions"
deepseek_api_key = "GwUcGzHhhUbvApfMR4aq1ZPFUic6lbWE"  # ⚠️ Hardcoded!

def call_deepseek(messages):
    response = requests.post(deepseek_api_url, headers=deepseekheaders, json=data)
    answer = response.json()["choices"][0]["message"]["content"]
    # Remove <think> tags
    return re.sub(r"<think>.*?</think>", "", answer, flags=re.DOTALL).strip()

7. Ministral-3B (Lines 854-875)

8. Phi-3 (Lines 876-913)

9-11. Gemini, Claude, Grok (Lines 914-960)

  • Functions defined
  • API keys hardcoded in source (see LLM Wrapper Functions section)
  • May need additional setup

Guardrails System (NEW!)

Collection: chatbot_guardrails

Purpose: Content safety, validation rules, and response filtering

Usage: Check user questions against configured guardrails before processing

Document Structure:

{
    "user_id": "User-123456",
    "project_id": "User-123456_Project_1",
    "rules": [
        {
            "type": "blocked_keywords",
            "keywords": ["profanity", "inappropriate"],
            "action": "reject"
        },
        {
            "type": "topic_restriction",
            "allowed_topics": ["products", "services", "support"],
            "action": "redirect"
        }
    ],
    "enabled": true
}

Integration Point: Before LLM generation, validate question against guardrails
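
A minimal sketch of what that validation could look like for the blocked_keywords rule type shown above. The helper name and behavior are hypothetical; elsewhere the service also injects guardrail categories into the LLM context rather than hard-blocking:

def violates_blocked_keywords(question: str, guardrails: dict) -> bool:
    """Return True if the question matches a blocked_keywords rule with action 'reject' (hypothetical helper)."""
    if not guardrails.get("enabled", False):
        return False
    q = question.lower()
    for rule in guardrails.get("rules", []):
        if rule.get("type") == "blocked_keywords" and rule.get("action") == "reject":
            if any(kw.lower() in q for kw in rule.get("keywords", [])):
                return True
    return False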


Performance Logging (NEW!)

Collection: performance_logs

Purpose: Track response times, token usage, and service health

Document Structure:

{
    "user_id": "User-123456",
    "project_id": "User-123456_Project_1",
    "session_id": "session_123",
    "timestamp": ISODate("2025-01-15T14:00:00Z"),
    "metrics": {
        "total_time_ms": 4500,
        "milvus_time_ms": 120,
        "llm_time_ms": 2500,
        "tts_time_ms": 1200,
        "lipsync_time_ms": 680
    },
    "tokens": {
        "input": 150,
        "output": 80,
        "total": 230
    },
    "model_used": "gpt-35-turbo-16k-0613"
}

Benefits:

  • Monitor service performance
  • Identify bottlenecks
  • Track cost (token usage)
  • Optimize response times
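
For example, average stage timings per model could be pulled with a MongoDB aggregation like the sketch below. The connection details are hypothetical and the field names follow the schema shown above:

from pymongo import MongoClient

# Hypothetical connection; the service resolves its MongoDB URI from its own config.
logs = MongoClient("mongodb://localhost:27017")["chatbot"]["performance_logs"]

pipeline = [
    {"$match": {"project_id": "User-123456_Project_1"}},
    {"$group": {
        "_id": "$model_used",
        "avg_total_ms": {"$avg": "$metrics.total_time_ms"},
        "avg_llm_ms": {"$avg": "$metrics.llm_time_ms"},
        "requests": {"$sum": 1},
    }},
    {"$sort": {"avg_total_ms": -1}},
]
for row in logs.aggregate(pipeline):
    print(row)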

Lead Collection (NEW!)

Collection: LEAD_COLLECTION

Purpose: Store form configuration and lead submissions from chatbot interactions

Use Case: When chatbot prompts user to fill form (e.g., "Could you please provide some details by filling the form?")

Document Structure:

{
    "user_id": "User-123456",
    "project_id": "User-123456_Project_1",
    "form_config": {
        "fields": ["name", "email", "phone", "company"],
        "trigger_message": "Could you please provide some details by filling the form?",
        "show_after_messages": 4
    },
    "leads": [
        {
            "name": "John Doe",
            "email": "john@example.com",
            "phone": "+1-555-123-4567",
            "company": "Acme Corp",
            "submitted_at": ISODate("2025-01-15T14:05:00Z"),
            "session_id": "session_123"
        }
    ]
}
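
A form submission would then be appended to the leads array, for example with an upsert like this sketch (connection details are hypothetical; the actual submission endpoint lives elsewhere):

from datetime import datetime, timezone
from pymongo import MongoClient

# Hypothetical connection; the service resolves LEAD_COLLECTION from its own config.
lead_collection = MongoClient("mongodb://localhost:27017")["chatbot"]["LEAD_COLLECTION"]

lead_collection.update_one(
    {"user_id": "User-123456", "project_id": "User-123456_Project_1"},
    {"$push": {"leads": {
        "name": "John Doe",
        "email": "john@example.com",
        "phone": "+1-555-123-4567",
        "company": "Acme Corp",
        "submitted_at": datetime.now(timezone.utc),
        "session_id": "session_123",
    }}},
    upsert=True,
)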

Streaming Responses

Library: AsyncAzureOpenAI (Line 29)

Purpose: Stream LLM responses token-by-token for better UX

Benefits:

  • Lower perceived latency
  • Real-time feedback
  • Better user experience

Implementation:

from openai import AsyncAzureOpenAI

async_client = AsyncAzureOpenAI(
    azure_endpoint=endpoint_gpt35,
    api_key=subscription_key,
    api_version="2024-02-15-preview"
)

async def stream_response(messages):
    stream = await async_client.chat.completions.create(
        model="gpt-35-turbo-16k-0613",
        messages=messages,
        temperature=0.7,
        stream=True
    )

    async for chunk in stream:
        if chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content

Streaming Endpoint:

@app.post("/v2/get-response-stream")
async def get_response_stream(...):
    return StreamingResponse(
        stream_response(messages),
        media_type="text/event-stream"
    )
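
A client can consume such a stream incrementally. A minimal sketch with httpx; the endpoint path and request payload are illustrative, not taken from the source:

import asyncio
import httpx

async def consume_stream(question: str):
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "POST",
            "http://localhost:8011/v2/get-response-stream",
            json={"question": question},
        ) as response:
            async for chunk in response.aiter_text():
                print(chunk, end="", flush=True)  # render tokens as they arrive

# asyncio.run(consume_stream("What are your pricing options?"))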

Complete Endpoints

Main Endpoints Summary

  1. GET /lip-sync - Generate lip-sync for avatar
  2. POST /v2/get-response-3d-chatbot - Main RAG + TTS + Lip-Sync
  3. POST /v6/get-streaming-audiovisual-response-hybrid - Streaming (hybrid)
  4. POST /v7/get-streaming-audiovisual-response-live - Streaming (live)
  5. POST /v8/get-streaming-audiovisual-response-split - Streaming (split, with performance logging)

Note: Due to the massive size (2,534 lines), there may be additional endpoints not fully documented here.


UTM Targeting System (Advanced)

Purpose: Personalize chatbot responses based on traffic source (campaign, medium, source)

Complete Flow:

  1. Extract UTM parameters from originating_url
  2. Match against configured UTM configs (scoring algorithm)
  3. Retrieve UTM-specific content chunks from Milvus
  4. Inject UTM instructions into system prompt
  5. Prioritize UTM content in context building

UTM Matching Algorithm

Function: calculate_match_score() (Lines 448-516)

Scoring System:

  • Target URL match (originating URL starts with target URL): +10 points
  • Each matching UTM parameter: +2 points each
  • Target URL only (no UTM params): +5 points (lower priority)

Example:

# Config A: target_url only
{
    "target_url": "https://example.com/products",
    "utm_config": {}
}
# Score for "https://example.com/products?page=1": 5 points

# Config B: target_url + UTM source
{
    "target_url": "https://example.com/products",
    "utm_config": {"source": "google"}
}
# Score for "https://example.com/products?utm_source=google": 12 points (10 + 2)

# Config C: multiple UTM params
{
    "target_url": "https://example.com/products",
    "utm_config": {"source": "google", "medium": "cpc", "campaign": "summer"}
}
# Score for "https://example.com/products?utm_source=google&utm_medium=cpc&utm_campaign=summer": 16 points (10 + 2 + 2 + 2)

Winner Selection: Highest score wins. If tie, first match wins.
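
A minimal sketch of the scoring rules above (the real calculate_match_score() lives at lines 448-516; this is an interpretation with illustrative parameter names, not the literal source):

from urllib.parse import urlparse, parse_qs

def score_config(originating_url: str, config: dict) -> int:
    """Score one UTM config against the originating URL, per the rules above."""
    score = 0
    target = config.get("target_url", "")
    utm_cfg = config.get("utm_config", {}) or {}

    if target and originating_url.startswith(target):
        score += 10 if utm_cfg else 5  # URL-only configs rank lower

    params = parse_qs(urlparse(originating_url).query)
    for key, expected in utm_cfg.items():
        if params.get(f"utm_{key}", [None])[0] == expected:
            score += 2  # each matching UTM parameter
    return score

# Reproduces the examples above: Config A -> 5, Config B -> 12, Config C -> 16.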


UTM Content Retrieval

Function: retrieve_utm_content() (Lines 583-646)

Flow:

  1. Get UTM config document from files collection
  2. Extract milvus_embedding_ids (specific chunk IDs for this UTM config)
  3. Generate question embedding
  4. Search Milvus ONLY within those embedding IDs (filtered search)
  5. Return top-5 most relevant chunks

Why filtered search? UTM configs contain campaign-specific content. We only want to search within that campaign's content, not all project data.
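
With pymilvus, a filtered search could look roughly like the sketch below. The collection name, vector field, and ID field are assumptions, not taken from the source:

from pymilvus import Collection, connections

connections.connect(host="localhost", port="19530")  # hypothetical Milvus address

def search_within_ids(collection_name: str, query_vec, embedding_ids, top_k=5):
    """Search only among the chunk IDs attached to the matched UTM config."""
    collection = Collection(collection_name)
    id_list = ", ".join(str(i) for i in embedding_ids)
    return collection.search(
        data=[query_vec],
        anns_field="embedding",
        param={"metric_type": "COSINE", "params": {"nprobe": 10}},
        limit=top_k,
        expr=f"id in [{id_list}]",  # restrict search to the UTM config's chunks
        output_fields=["text"],
    )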


UTM Instructions Injection

Function: process_utm_completely() (Lines 1218-1253)

System Prompt Enhancement:

system_prompt = system_prompt + f"\n\n[UTM-Specific Instructions]:\n{utm_instructions}"

Example UTM Instructions:

[UTM-Specific Instructions]:
- This user came from Google Ads "Summer Sale" campaign
- Emphasize 20% discount on all products
- Mention free shipping on orders over $50
- Link to https://example.com/summer-sale landing page

WHY UTM Targeting? Enables dynamic content and prompt adjustment based on marketing campaign source, improving conversion rates.


Query Enhancement with Chat History

Function: summarize_question_with_context() (Lines 1180-1216)

Purpose: Convert vague follow-up questions into clear, standalone queries using context from last 4 chat exchanges

When Used: Before RAG retrieval to improve search quality

Implementation:

def summarize_question_with_context(question: str, chat_history: List[Dict[str, str]], llm):
    # Extract last 4 user/assistant pairs
    recent_exchanges = []
    user_msgs = [msg for msg in chat_history if msg["role"] == "user"]
    assistant_msgs = [msg for msg in chat_history if msg["role"] == "assistant"]

    # Pair the last 4 exchanges
    for i in range(min(4, len(user_msgs), len(assistant_msgs))):
        u = user_msgs[-(i + 1)]["content"]
        a = assistant_msgs[-(i + 1)]["content"]
        recent_exchanges.insert(0, f"User: {u}\nAssistant: {a}")

    history_context = "\n\n".join(recent_exchanges)

    prompt = f"""
    You are an AI assistant helping to clarify user queries using recent chat context.

    Here is the recent conversation:
    {history_context}

    And here is the current user message:
    "{question}"

    Based on the above, rewrite the current user message as a standalone, clear, and context-rich question.
    Only return the rewritten question.
    """.strip()

    response = llm.invoke([HumanMessage(content=prompt)])
    enhanced_question = response.content.strip()
    return enhanced_question

Example:

Chat History:

User: "Do you offer cloud hosting?"
Assistant: "Yes, we offer cloud hosting with 99.9% uptime."
User: "What are the pricing options?"
Assistant: "We have Basic ($10/mo), Pro ($50/mo), and Enterprise (custom)."

Current Question: "What about the basic one?"

Enhanced Question (LLM output): "What features and specifications are included in the Basic cloud hosting plan at $10/month?"

Why Enhancement?

  • Vague questions like "What about it?" or "Tell me more" don't retrieve relevant RAG content
  • With context, search finds: pricing details, plan features, comparisons
  • Improves RAG precision by ~40% for follow-up questions

Fallback: If enhancement fails, uses original question


Default Guardrail System

Constants: (Lines 1091-1093)

DEFAULT_GUARDRAIL_USER_ID = "_system_default_user_"
DEFAULT_GUARDRAIL_PROJECT_ID = "_system_default_project_"

Purpose: System-level fallback guardrails when user hasn't configured any

Usage in getGuardrails():

def getGuardrails(user_id: str, project_id: str):
    # Try user-specific guardrails first
    guardrails = guardrails_collection.find_one({
        "user_id": user_id,
        "project_id": project_id
    })

    if not guardrails:
        # Fallback to system default
        guardrails = guardrails_collection.find_one({
            "user_id": DEFAULT_GUARDRAIL_USER_ID,
            "project_id": DEFAULT_GUARDRAIL_PROJECT_ID
        })

    # Double fallback to empty guardrails
    if not guardrails:
        return {
            "document": {
                "categories": "No specific guardrails configured."
            }
        }

    return {"document": guardrails}

3-Level Hierarchy:

  1. User-specific guardrails → Custom per chatbot
  2. System default guardrails → Platform-wide rules
  3. Empty guardrails → No restrictions

Why? Ensures new users have baseline content safety without manual configuration.


Concurrency Configuration

Line 1736:

CONCURRENT_TASKS = os.cpu_count() or 4

Purpose: Dynamic worker pool size for parallel TTS/lip-sync generation in streaming endpoints

Logic:

  • Uses os.cpu_count() to detect available CPU cores
  • Fallback to 4 if detection fails
  • Applied to asyncio.Semaphore(CONCURRENT_TASKS) in V6, V7, V8

Performance Impact:

| CPU Cores | CONCURRENT_TASKS | TTS Tasks Processed Simultaneously |
|---|---|---|
| 2 cores | 2 | 2 chunks at a time |
| 4 cores | 4 | 4 chunks at a time |
| 8 cores | 8 | 8 chunks at a time |
| 16 cores | 16 | 16 chunks at a time |

Example (8-core server):

LLM generates 20 text chunks
With CONCURRENT_TASKS=8:
- Processes chunks 0-7 in parallel
- Then chunks 8-15 in parallel
- Then chunks 16-19 in parallel
- Total: 3 batches instead of 20 sequential

Why CPU-based? TTS/lip-sync are CPU-intensive (audio encoding, FFmpeg conversion, Rhubarb phonetic analysis). More cores = more parallel processing.
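
A minimal sketch of how such a semaphore bound works; process_chunk here is a stand-in for the real TTS + FFmpeg + Rhubarb pipeline:

import asyncio
import os

CONCURRENT_TASKS = os.cpu_count() or 4
semaphore = asyncio.Semaphore(CONCURRENT_TASKS)

async def process_chunk(index: int, text: str) -> dict:
    async with semaphore:  # at most CONCURRENT_TASKS chunks in flight at once
        # placeholder for TTS -> PCM conversion -> Rhubarb lip-sync
        await asyncio.sleep(0.1)
        return {"chunkId": index, "text": text}

async def process_all(chunks):
    return await asyncio.gather(*(process_chunk(i, c) for i, c in enumerate(chunks)))

# asyncio.run(process_all(["Hello!", "How can I help you today?"]))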


Complete Performance Logging Implementation (V8 Only)

Collection: performance_logs

Function: save_performance_log() (Lines 2258-2285)

Purpose: Track response times, token usage, and service health for analytics

When Triggered: ONLY in V8 streaming endpoint

Complete Schema:

{
    "user_id": "User-123456",
    "project_id": "User-123456_Project_1",
    "session_id": "session-abc123",
    "question": "What are your pricing options?",
    "full_answer": "We offer three plans: Basic at $10/mo, Pro at $50/mo, and Enterprise with custom pricing...",
    "originating_url": "https://example.com/pricing?utm_source=google",

    // UTC Timestamps (ISO format)
    "timestamps_utc": {
        "request_arrival": "2025-12-26T13:10:25.123456+00:00",
        "rag_start": "2025-12-26T13:10:25.150000+00:00",
        "rag_end": "2025-12-26T13:10:25.890000+00:00",
        "llm_hit": "2025-12-26T13:10:25.900000+00:00",
        "first_text_yield": "2025-12-26T13:10:26.450000+00:00",
        "first_audio_yield": "2025-12-26T13:10:27.200000+00:00",
        "stream_end": "2025-12-26T13:10:32.100000+00:00"
    },

    // IST Timestamps (human-readable, auto-converted)
    "timestamps_ist": {
        "request_arrival": "2025-12-26 18:40:25.123 IST",
        "rag_start": "2025-12-26 18:40:25.150 IST",
        "rag_end": "2025-12-26 18:40:25.890 IST",
        "llm_hit": "2025-12-26 18:40:25.900 IST",
        "first_text_yield": "2025-12-26 18:40:26.450 IST",
        "first_audio_yield": "2025-12-26 18:40:27.200 IST",
        "stream_end": "2025-12-26 18:40:32.100 IST"
    },

    // Durations in SECONDS (Lines 2465-2478)
    "durations_sec": {
        "total_request_time": 6.977,      // request_arrival → stream_end
        "rag_processing": 0.740,          // rag_start → rag_end
        "llm_time_to_first_token": 0.550, // llm_hit → first_text_yield
        "time_to_first_text": 1.327,      // request_arrival → first_text_yield
        "time_to_first_audio": 2.077      // request_arrival → first_audio_yield
    },

    // Token Usage (Lines 2461-2463)
    "tokens": {
        "question": 5,    // Token count of question
        "answer": 45,     // Token count of full answer
        "total": 50       // question + answer
    }
}

IST Conversion Logic:

import pytz
from datetime import datetime

async def save_performance_log(log_data: dict):
    # Define Indian Standard Timezone
    indian_tz = pytz.timezone("Asia/Kolkata")

    timestamps_ist = {}
    utc_timestamps = log_data.get("timestamps_utc", {})

    for key, value in utc_timestamps.items():
        if isinstance(value, datetime):
            # Convert UTC → IST
            ist_time = value.astimezone(indian_tz)
            timestamps_ist[key] = ist_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] + ' IST'

            # Convert UTC datetime → ISO string
            log_data["timestamps_utc"][key] = value.isoformat()

    log_data["timestamps_ist"] = timestamps_ist

    # Save to MongoDB
    await asyncio.to_thread(performance_logs_collection.insert_one, log_data)

Duration Calculation (Seconds):

# Lines 2465-2478
ts = performance_data["timestamps_utc"]
durations = performance_data["durations_sec"]

if ts.get("stream_end") and ts.get("request_arrival"):
    durations["total_request_time"] = (ts["stream_end"] - ts["request_arrival"]).total_seconds()

if ts.get("first_text_yield") and ts.get("request_arrival"):
    durations["time_to_first_text"] = (ts["first_text_yield"] - ts["request_arrival"]).total_seconds()

if ts.get("first_audio_yield") and ts.get("request_arrival"):
    durations["time_to_first_audio"] = (ts["first_audio_yield"] - ts["request_arrival"]).total_seconds()

if ts.get("rag_end") and ts.get("rag_start"):
    durations["rag_processing"] = (ts["rag_end"] - ts["rag_start"]).total_seconds()

if ts.get("first_text_yield") and ts.get("llm_hit"):
    durations["llm_time_to_first_token"] = (ts["first_text_yield"] - ts["llm_hit"]).total_seconds()

Token Counting:

# Assumed implementation (not shown in file, but referenced in code)
def count_tokens(text: str) -> int:
    # Simple word-based approximation (actual may use tiktoken)
    return len(text.split())

performance_data["tokens"]["question"] = count_tokens(question)
performance_data["tokens"]["answer"] = count_tokens(full_answer)
performance_data["tokens"]["total"] = performance_data["tokens"]["question"] + performance_data["tokens"]["answer"]

Analytics Use Cases:

1. Optimization:

  • Identify slow RAG queries (> 1 second)
  • Track LLM response times across models
  • Measure TTS generation bottlenecks

2. Cost Analysis:

  • Token usage per user/project
  • Calculate API costs (Azure OpenAI charges per token)

3. Performance Monitoring:

  • Average time to first text
  • Average time to first audio
  • Total request satisfaction time

4. User Experience:

  • If time_to_first_text > 2 seconds → users perceive as slow
  • V8 aims for sub-1.5s first text yield

Why V8 ONLY? V2 (sync) and V6/V7 (streaming without split) don't need granular performance tracking. V8's split format (text → audio) requires precise timing to optimize user experience.


Complete Endpoint Breakdown

The service has 5 MAIN ENDPOINTS plus helper endpoints:

1. GET /lip-sync (Lines 226-291)

Purpose: Generate lip-sync for avatar greeting or testing

Parameters:

  • message - Text to synthesize
  • user_id - User ID
  • voice_selection - Voice code
  • project_id - Optional, for consistency check

Response:

{
    "lipsync": { ... },
    "audio": "base64..."
}

2. POST /v2/get-response-3d-chatbot (Main)

Purpose: Standard RAG + TTS + Lip-Sync response

Function: sync_ask_gpt1() (Lines 1255-1530+)

Features:

  • ✅ RAG retrieval from Milvus
  • ✅ UTM targeting
  • ✅ Guardrails validation
  • ✅ Chat history (token-limited)
  • ✅ ThreadPoolExecutor parallelization (6 workers!)
  • ✅ Form trigger logic
  • ✅ TTS generation
  • ✅ Lip-sync generation
  • ✅ Base64 encoding

Response:

{
    "text": "...",
    "audio": "base64...",
    "lipsync": { ... }
}

3. POST /v6/get-streaming-audiovisual-response-hybrid (Lines 2038-2067)

Version: V6 Hybrid

Strategy: Parallel processing + Sequential delivery

How it works:

  1. Stream LLM response (full text first)
  2. Split into chunks by sentence delimiters
  3. Process ALL chunks in parallel (TTS + lip-sync)
  4. Store results in buffer
  5. Yield chunks in sequential order with status annotations

Status Annotations:

  • start - First chunk
  • processing - Middle chunks
  • end - Last chunk

Why? Frontend can start playing audio as soon as first chunk completes, but chunks arrive in correct order.
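
In outline, the V6 pattern is "fan out, then replay in order". A minimal sketch with an illustrative stand-in for the per-chunk TTS + lip-sync work:

import asyncio

async def process_chunk(index: int, text: str) -> dict:
    # stand-in for TTS + lip-sync generation for one text chunk
    await asyncio.sleep(0.1)
    return {"chunkId": index, "text": text}

async def hybrid_stream(chunks):
    """V6 pattern: process all chunks in parallel, then yield them in original order."""
    results = await asyncio.gather(*(process_chunk(i, t) for i, t in enumerate(chunks)))
    for i, result in enumerate(results):
        status = "start" if i == 0 else ("end" if i == len(results) - 1 else "processing")
        yield {**result, "status": status}

# async for event in hybrid_stream(["Hello!", "How can I help?"]): ...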


4. POST /v7/get-streaming-audiovisual-response-live (Lines 2224-2254)

Version: V7 Live

Strategy: True concurrent streaming

How it works:

  1. Producer task: Streams from LLM, chunks text, queues chunks
  2. Consumer loop:
     • Waits for EITHER a new text chunk OR a completed audio task
     • Starts audio processing immediately for new chunks
     • Yields completed chunks in sequential order

Advantage: Lower latency - starts processing audio BEFORE LLM finishes

Producer-Consumer Pattern:

async def _llm_stream_producer(stream, queue, delimiters):
    # Reads LLM stream, chunks, puts in queue
    async for chunk in stream:
        # ... chunk text ...
        await queue.put((chunk_index, text_chunk))
    await queue.put(None)  # Signal end

# Consumer
while not producer_finished or pending_audio_tasks:
    done, pending = await asyncio.wait(tasks, return_when=FIRST_COMPLETED)
    # ... process ...

5. POST /v8/get-streaming-audiovisual-response-split (Lines 2495-2529)

Version: V8 Split

Strategy: Split text and audiovisual yields + Performance logging

Unique Features:

  • ✅ Yields text immediately (type: "text")
  • ✅ Yields audiovisual later (type: "audiovisual")
  • Performance logging to performance_logs collection
  • ✅ IST timestamp conversion
  • ✅ Duration tracking (seconds, not milliseconds)

Response Format:

// Text chunk (yielded immediately)
{"type": "text", "chunkId": 0, "text": "Hello! How can I help you?"}

// Audiovisual chunk (yielded later)
{
    "type": "audiovisual",
    "chunkId": 0,
    "audio": "base64...",
    "lipsync": {...},
    "facialExpression": "smile",
    "animation": "Idle"
}

// Completion
{"type": "control", "status": "end"}

Performance Metrics Tracked:

  • time_to_first_text - Time to first token
  • time_to_first_audio - Time to first audio chunk
  • rag_processing - RAG retrieval time
  • llm_time_to_first_token - LLM latency
  • total_request_time - End-to-end time

Streaming Architecture Comparison

| Feature | V6 Hybrid | V7 Live | V8 Split |
|---|---|---|---|
| LLM Streaming | Full text first | Token streaming | Token streaming |
| Processing | Parallel (all chunks) | Concurrent (as chunks arrive) | Concurrent |
| Delivery | Sequential with status | Sequential | Split (text → audiovisual) |
| Latency | Medium | Low | Lowest (text instant) |
| Performance Logging | ❌ | ❌ | ✅ |
| Use Case | Reliable delivery | Real-time | Analytics + UX |

Performance Logging (V8 Only)

Collection: performance_logs

Function: save_performance_log() (Lines 2258-2285)

Logged Data:

{
    "user_id": "User-123456",
    "project_id": "User-123456_Project_1",
    "session_id": "session_123",
    "originating_url": "https://example.com?utm_source=google",
    "question": "What are your pricing options?",
    "full_answer": "We offer three pricing tiers...",

    // UTC timestamps (ISO format)
    "timestamps_utc": {
        "request_arrival": "2025-01-15T14:00:00.000Z",
        "rag_start": "2025-01-15T14:00:00.050Z",
        "rag_end": "2025-01-15T14:00:00.200Z",
        "llm_hit": "2025-01-15T14:00:00.250Z",
        "first_text_yield": "2025-01-15T14:00:01.500Z",
        "first_audio_yield": "2025-01-15T14:00:03.200Z",
        "stream_end": "2025-01-15T14:00:05.800Z"
    },

    // IST timestamps (human-readable)
    "timestamps_ist": {
        "request_arrival": "2025-01-15 19:30:00.000 IST",
        "rag_start": "2025-01-15 19:30:00.050 IST",
        ...
    },

    // Durations in seconds (float)
    "durations_sec": {
        "total_request_time": 5.8,
        "time_to_first_text": 1.25,
        "time_to_first_audio": 2.95,
        "rag_processing": 0.15,
        "llm_time_to_first_token": 1.25
    },

    // Token usage
    "tokens": {
        "question": 25,
        "answer": 85,
        "total": 110
    }
}

Benefits:

  • Identify slow requests
  • Optimize RAG/LLM performance
  • Track cost (token usage)
  • A/B test different models

Form Trigger System

Collection: LEAD_COLLECTION

Purpose: Automatically prompt user to fill form after N messages

Configuration:

{
    "user_id": "User-123456",
    "project_id": "User-123456_Project_1",
    "trigger": true,
    "prompt_number": 4,  // Trigger after 4th message
    "message_display": "Could you please provide some details by filling the form?"
}

Implementation (Lines 1410-1477):

Flow:

  1. Count existing chat messages in session
  2. Check if current message number == prompt_number
  3. Verify form hasn't already been shown
  4. Append form message to response

Code:

# Get current chat count
existing_session = history_collection.find_one({...})
existing_valid_chats = [chat for chat in existing_session["chat_data"] if ...]
current_chat_count = len(existing_valid_chats) + 1  # +1 for current

# Get form config
lead_config = lead_collection.find_one({...})

if lead_config and lead_config.get("trigger", False):
    prompt_number = lead_config.get("prompt_number", 0)

    if current_chat_count == prompt_number and prompt_number > 0:
        # Check if form already shown
        form_already_shown = False
        for chat in existing_session["chat_data"]:
            if " form" in chat["output_response"].lower() and "fill" in chat["output_response"].lower():
                form_already_shown = True
                break

        if not form_already_shown:
            form_message = lead_config.get("message_display", "Please fill out the form...")
            answer = f"{answer}\n\n{form_message}"

Prevents duplicate prompts: Checks for keywords "form" and "fill" in previous responses.


Guardrails System

Collection: chatbot_guardrails

Purpose: Content safety, topic restrictions, blocked keywords

Function: getGuardrails() (Lines 1095-1125)

Hierarchy:

  1. User-specific guardrails (user_id + project_id)
  2. Default system guardrails (_system_default_user_ + _system_default_project_)

Document Structure:

{
    "user_id": "User-123456",
    "project_id": "User-123456_Project_1",
    "categories": [
        {
            "category": "Profanity",
            "action": "block",
            "keywords": ["bad", "word", "list"]
        },
        {
            "category": "Off-Topic",
            "action": "redirect",
            "allowed_topics": ["products", "services", "support"]
        },
        {
            "category": "Sensitive",
            "action": "escalate",
            "keywords": ["legal", "medical", "financial"]
        }
    ]
}

Default Guardrails:

{
    "user_id": "_system_default_user_",
    "project_id": "_system_default_project_",
    "categories": [...]  // Fallback rules
}

Integration:

Guardrails_data = guardrails_result.get('document', {}).get('categories', [])
# Injected into context
context_parts.append(f"Strict Guidelines (Guardrails):\n{Guardrails_data}")

System Prompts

Two Collections:

  1. system_prompts_default - Default prompts for each model + purpose
  2. system_prompts_user - User-customized prompts

Function: get_system_prompt() (Lines 1131-1155)

Hierarchy:

  1. Check user-specific prompt (user_id + project_id + model + chatbot_purpose)
  2. Fallback to default prompt (model + chatbot_purpose)
  3. Ultimate fallback: "You are a helpful assistant."

Schema:

User-specific:

{
    "user_id": "User-123456",
    "project_id": "User-123456_Project_1",
    "model": "gpt-35-turbo-16k-0613",
    "chatbot_purpose": "Sales-Agent",
    "system_prompt": "Custom sales prompt..."
}

Default:

{
    "model": "gpt-35-turbo-16k-0613",
    "chatbot_purpose": "Sales-Agent",
    "content": "You are a highly skilled AI sales agent..."
}

Benefits:

  • Users can customize prompts per model
  • Centralized prompt management
  • A/B testing different prompts
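
A minimal sketch of the lookup order described above. Field names mirror the schemas shown, but this is not the literal get_system_prompt() source; the collection handles are passed in for illustration:

def resolve_system_prompt(user_id, project_id, model, chatbot_purpose,
                          user_prompts, default_prompts):
    """Resolve a system prompt: user-specific -> default -> hard-coded fallback."""
    doc = user_prompts.find_one({
        "user_id": user_id,
        "project_id": project_id,
        "model": model,
        "chatbot_purpose": chatbot_purpose,
    })
    if doc and doc.get("system_prompt"):
        return doc["system_prompt"]

    doc = default_prompts.find_one({"model": model, "chatbot_purpose": chatbot_purpose})
    if doc and doc.get("content"):
        return doc["content"]

    return "You are a helpful assistant."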

Avatar-Voice Validation

Function: assign_voice_by_avatar() (Lines 1159-1175)

Purpose: Ensure gender consistency (male avatar → male voice, female avatar → female voice)

Mappings:

male_avatars = {"Chris", "Jack"}
female_avatars = {"Eva", "Shayla", "Myra", "Anu"}

male_voices = {"Male_1", "Male_2", "Male_3", "Male_IND"}
female_voices = {"Female_1", "Female_2", "Female_3", "Female_4", "Female_IND", "Female_IND2"}

Logic:

def assign_voice_by_avatar(avatar: str, voice: str) -> str:
    avatar = avatar.strip().capitalize()
    voice = voice.strip()

    if avatar in male_avatars:
        if voice not in male_voices:
            return "Male_2"  # Default male voice
    elif avatar in female_avatars:
        if voice not in female_voices:
            return "Female_2"  # Default female voice

    return voice  # Keep original if valid

Example:

  • Avatar: "Eva" (female), Voice: "Male_1" → Corrected to "Female_2"
  • Avatar: "Chris" (male), Voice: "Male_3" → Kept as "Male_3"

ThreadPoolExecutor Parallelization

Lines 1287-1307

Purpose: Run independent operations in parallel for faster response

6 Parallel Tasks:

  1. get_system_prompt() - Fetch system prompt
  2. process_utm_completely() - UTM matching + content retrieval
  3. retrieve_relevant_documents() - Milvus RAG search
  4. getGuardrails() - Fetch guardrails
  5. get_org_data() - Fetch organization data
  6. get_chat_history() - Fetch chat history

Code:

with concurrent.futures.ThreadPoolExecutor(max_workers=6) as executor:
    future_system_prompt = executor.submit(get_system_prompt, ...)
    future_utm = executor.submit(process_utm_completely, ...)
    future_doc_retrieval = executor.submit(retrieve_relevant_documents, ...)
    future_guardrails = executor.submit(getGuardrails, ...)
    future_org_data = executor.submit(get_org_data)
    future_chat_history = executor.submit(get_chat_history)

    # Wait for all to complete
    concurrent.futures.wait([...])

    # Get results
    system_prompt = future_system_prompt.result()
    utm_result = future_utm.result()
    ...

Performance Gain:

  • Sequential: 6 × 50ms = 300ms
  • Parallel: max(50ms) = 50ms
  • Speedup: 6× faster!

Deleted Chatbot Handling

Endpoint: POST /v2/get-response-3d-chatbot (Lines 1627-1731)

Soft Delete Check: (Lines 1651-1703)

Flow:

  1. Check if chatbot_config.isDeleted == True
  2. Generate error message with TTS + lip-sync
  3. Return in SAME format as normal response

Implementation:

if not chatbot_config or chatbot_config.get("isDeleted") is True:
    error_text = "This chatbot is no longer available."

    # Use chatbot's configured voice
    chatbot_voice_default = assign_voice_by_avatar(chatbot_avatar, chatbot_voice)
    azure_voice = SUPPORTED_VOICES.get(chatbot_voice_default)

    # Generate TTS + Lip-Sync for error
    base_filename = f"{user_id}_{session_id}_error"
    wav_file = await text_to_speech_azure1(error_text, azure_voice, wav_file)
    json_file = await generate_lip_sync1(wav_file, json_file)

    # Return SAME format as normal response
    return {
        "text": error_text,
        "facialExpression": "neutral",
        "animation": "Idle",
        "audio": audio_base64,
        "lipsync": lip_sync_data
    }

Why SAME format? Frontend doesn't need special error handling - avatar just says "chatbot not available" with proper lip-sync.


Fallback Error Handling

Lines 1531-1624

Purpose: Graceful degradation if main processing fails

Flow:

  1. Main sync_ask_gpt1() fails
  2. Catch exception
  3. Try minimal LLM call (system prompt + question only, no RAG/history)
  4. Save fallback response
  5. Check form trigger even for fallback
  6. Return text-only response

Code:

except Exception as e:
    logger.error(f"Unexpected error in sync_ask_gpt1: {e}")

    # Fallback: Minimal LLM call
    try:
        fallback_messages = [
            SystemMessage(content=system_prompts.get(chatbot_purpose, "You are a helpful AI assistant.")),
            HumanMessage(content=f"Question: {question}")
        ]
        response = llm.invoke(fallback_messages)
        answer = re.sub(r'(?i)^(answer:|response:|responds:| *-+ #?)\s*', '', response.content).strip()

        # Save fallback history
        save_chat_history(user_id, project_id, session_id, question, answer, originating_url)

        # Form trigger logic (duplicate from main flow)
        # ... check if form should be shown ...

        return {"text": answer}  # No audio/lipsync in fallback

    except Exception as fallback_error:
        raise HTTPException(status_code=500, detail=f"An error occurred: {e}")

What's SKIPPED in fallback?

  • ❌ RAG retrieval
  • ❌ UTM targeting
  • ❌ Chat history

Helper Functions & LLM Wrappers

TTS & Lip-Sync Pipeline Functions

1. text_to_speech_azure() - Main TTS Function (Lines 139-185)

Purpose: Convert text to speech using Azure Cognitive Services

Complete Implementation:

async def text_to_speech_azure(text, voice, session_id):
    # Step 1: Clean the input text
    cleaned_text = remove_contact_numbers(text)  # Remove phones, URLs, emojis

    # Step 2: Define output file path
    wav_file = os.path.join(OUTPUT_DIR, f"{session_id}.wav")

    # Step 3: Configure Azure Speech
    speech_config = speechsdk.SpeechConfig(
        subscription="9N41NOfDyVDoduiD4EjlzmZU9CbUX3pPqWfLCORpl7cBf0l2lzVQJQQJ99BCACGhslBXJ3w3AAAYACOG2329",
        region="centralindia"
    )
    speech_config.speech_synthesis_voice_name = voice  # e.g., "en-US-JennyNeural"

    # Step 4: Configure audio output to file
    audio_config = speechsdk.audio.AudioOutputConfig(filename=wav_file)

    # Step 5: Create synthesizer
    speech_synthesizer = speechsdk.SpeechSynthesizer(
        speech_config=speech_config,
        audio_config=audio_config
    )

    # Step 6: Synthesize speech (blocking call)
    result = speech_synthesizer.speak_text_async(cleaned_text).get()

    # Step 7: Check result
    if result.reason == speechsdk.ResultReason.SynthesizingAudioCompleted:
        logger.debug("✓ TTS synthesis completed")
        return wav_file
    else:
        error_details = result.cancellation_details
        raise Exception(f"Speech synthesis failed: {error_details.reason}")

Key Features:

  • ⚠️ Hardcoded API Key: Azure TTS subscription key (Line 157)
  • Region: Central India data center
  • Output Format: Default WAV (not Rhubarb-ready - needs PCM conversion)
  • Blocking Call: speak_text_async().get() blocks until complete

2. convert_wav_to_pcm() - FFmpeg Audio Conversion (Lines 189-200)

Purpose: Convert Azure TTS output to PCM format for Rhubarb compatibility

Implementation:

def convert_wav_to_pcm(input_wav, output_wav):
    try:
        subprocess.run(
            ["ffmpeg", "-i", input_wav, "-acodec", "pcm_s16le", output_wav],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=True,
        )
        return output_wav if os.path.exists(output_wav) else None
    except subprocess.CalledProcessError as e:
        logger.error(f"ffmpeg conversion failed: {e.stderr.decode()}")
        return None

FFmpeg Command Breakdown:

  • -i input_wav - Input file
  • -acodec pcm_s16le - Audio codec: PCM signed 16-bit little-endian
  • output_wav - Output file path

Why PCM? Rhubarb requires uncompressed PCM audio for accurate phonetic analysis

Dependency: Requires ffmpeg installed on system PATH

3. generate_lip_sync() - Rhubarb Execution (Lines 203-216)

Purpose: Generate phonetic mouth shape timeline using Rhubarb

Implementation:

def generate_lip_sync(wav_file, user_id):
    json_file = os.path.join(OUTPUT_DIR, f"{user_id}.json")

    try:
        result = subprocess.run(
            [rhubarbExePath, "-f", "json", "-o", json_file, wav_file, "-r", "phonetic"],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        return json_file if os.path.exists(json_file) else None
    except Exception as e:
        logger.error(f"Rhubarb lip sync failed: {e}")
        return None

Rhubarb Command Breakdown:

  • rhubarbExePath - Platform-specific executable path
  • -f json - Output format: JSON
  • -o json_file - Output file path
  • wav_file - Input audio file (PCM WAV)
  • -r phonetic - Recognition mode: phonetic (vs. pocketsphinx)

Phonetic Mode:

  • Faster than pocketsphinx
  • No speech recognition required
  • Pure acoustic analysis
  • Better for non-English or noisy audio

4. parse_lip_sync() - JSON Enhancement (Lines 219-224)

Purpose: Add sound file reference to Rhubarb JSON output

Implementation:

def parse_lip_sync(json_file, sound_file):
    with open(json_file, "r") as file:
        lip_sync_data = json.load(file)

    lip_sync_data["metadata"]["soundFile"] = sound_file  # Add reference
    return lip_sync_data

Why Needed? Rhubarb output doesn't include sound file path - frontend needs both lip-sync data AND audio file reference

Modified Structure:

{
    "metadata": {
        "soundFile": "tts_audio/session-abc123.wav",  // ‹‹‹ ADDED
        "duration": 3.5
    },
    "mouthCues": [/* ... */]
}

GET /lip-sync Endpoint - Avatar Greeting Generation (Lines 226-291)

Purpose: Generate TTS + lip-sync for avatar greeting messages

Request:

GET /lip-sync?message=Hello%2C%20welcome!&user_id=User-123&voice_selection=Female_2&project_id=User-123_Project_1

Parameters:

  • message (string) - Greeting text
  • user_id (string) - User identifier
  • voice_selection (string) - Voice ID (e.g., "Female_2")
  • project_id (string, optional) - Project ID for voice consistency

Response:

{
    "lipsync": {
        "metadata": {
            "soundFile": "tts_audio/User-123.wav",
            "duration": 2.5
        },
        "mouthCues": [
            {"start": 0.00, "end": 0.15, "value": "X"},
            {"start": 0.15, "end": 0.30, "value": "B"},
            // ...
        ]
    },
    "audio": "UklGRiQAAABXQVZFZm10..."  // Base64-encoded WAV
}

Flow:

  1. Fetch latest voice selection from selection_collection (if project_id provided)
  2. Validate voice selection against SUPPORTED_VOICES
  3. Generate TTS: text_to_speech_azure(message, voice, user_id)
  4. Convert to PCM: convert_wav_to_pcm(wav_file, pcm_wav_file)
  5. Generate lip-sync: generate_lip_sync(pcm_wav_file, user_id)
  6. Parse JSON: parse_lip_sync(json_file, wav_file)
  7. Encode audio: base64.b64encode(wav_file_bytes)
  8. Cleanup: Delete temp files (WAV, PCM, JSON)
  9. Return JSON response

Avatar-Voice Consistency:

if project_id:
    sel = selection_collection.find_one({"user_id": user_id, "project_id": project_id})
    if sel:
        latest_voice = sel.get("selection_voice") or voice_selection
        latest_avatar = sel.get("selection_avatar") or ""
        voice_selection = assign_voice_by_avatar(str(latest_avatar), str(latest_voice))

Use Case: Frontend calls this endpoint when displaying avatar greeting on chatbot initialization
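
For reference, a minimal client call; the base URL and IDs are illustrative:

import base64
import requests

resp = requests.get(
    "http://localhost:8011/lip-sync",
    params={
        "message": "Hello, welcome!",
        "user_id": "User-123",
        "voice_selection": "Female_2",
        "project_id": "User-123_Project_1",
    },
    timeout=60,
)
data = resp.json()
with open("greeting.wav", "wb") as f:
    f.write(base64.b64decode(data["audio"]))  # decoded WAV from the base64 payload
print(len(data["lipsync"]["mouthCues"]), "mouth cues")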


LLM Wrapper Functions - 11 Models

Purpose: Standardized interface for calling different LLM models

Common Pattern:

def call_model_name(messages):
    try:
        # API call with model-specific configuration
        response = client.chat.completions.create(model="...", messages=messages, temperature=0.7)
        return response.choices[0].message.content
    except Exception as e:
        return f"Model API Error: {str(e)}"

Azure OpenAI Models (4)

1. call_openai_4() - GPT-4 (Lines 726-736)

# Configuration (Lines 715-724)
endpoint_gpt4 = "https://machineagentopenai.openai.azure.com/.../gpt-4-0613/..."
deployment_gpt4 = "gpt-4-0613"
subscription_key = "AZxDVMYB08AaUip0i5ed1sy73ZpUsqencYYxKDbm6nfWfG1AqPZ3JQQJ99BCACYeBjFXJ3w3AAABACOGVUo7"  # ⚠️ HARDCODED

client = AzureOpenAI(azure_endpoint=endpoint_gpt4, api_key=subscription_key, api_version="2024-02-15-preview")

def call_openai_4(messages):
    response = client.chat.completions.create(model="deployment_gpt4", messages=messages, temperature=0.7)
    return response.choices[0].message.content

2. call_openai_35() - GPT-3.5-Turbo-16k (Lines 750-760)

endpoint_gpt35 = "https://machineagentopenai.openai.azure.com/.../gpt-35-turbo-16k-0613/..."
deployment_gpt35 = "gpt-35-turbo-16k-0613"
# Uses same subscription_key as GPT-4

client_gpt35 = AzureOpenAI(azure_endpoint=endpoint_gpt35, api_key=subscription_key, api_version="2024-02-15-preview")

Context Window: 16k tokens (vs 8k for standard GPT-3.5)

3. call_gpt4o_mini() - GPT-4o-mini (Lines 764-774)

endpoint_gpt4omini = "https://machineagentopenai.openai.azure.com/.../gpt-4o-mini/..."
deployment_gpt4omini = "gpt-4o-mini"

Use Case: Faster, cheaper alternative to GPT-4

4. call_gpto1mini() - o1-mini (Lines 778-808)

endpoint_gpto1mini = "https://machineagentopenai.openai.azure.com/.../o1-mini/..."
deployment_gpto1mini = "o1-mini"

Special: Reasoning model (longer processing time)

Azure Model Marketplace (4)

5. call_llama() - Llama 3.3-70B (Lines 821-829)

AZURE_LLAMA_ENDPOINT = "https://Llama-3-3-70B-Instruct-ulmca.eastus.models.ai.azure.com/chat/completions"
AZURE_API_KEY = "JOfcw0VW0dS31Z8XgkNRSP9tUaBiwUYZ"  # ⚠️ HARDCODED

llama_HEADERS = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {AZURE_API_KEY}"
}

def call_llama(messages):
    payload = {"messages": messages, "temperature": 0.7, "max_tokens": 50}
    response = requests.post(AZURE_LLAMA_ENDPOINT, json=payload, headers=llama_HEADERS)
    return response.json()["choices"][0]["message"]["content"]

Note: Uses REST API (not SDK) - different from Azure OpenAI

6. call_deepseek() - DeepSeek R1 (Lines 842-853)

deepseek_api_url = "https://DeepSeek-R1-imalr.eastus2.models.ai.azure.com/chat/completions"
deepseek_api_key = "GwUcGzHhhUbvApfMR4aq1ZPFUic6lbWE"  # ⚠️ HARDCODED

def call_deepseek(messages):
    data = {"messages": messages, "max_tokens": 100}
    response = requests.post(deepseek_api_url, headers=deepseekheaders, json=data)
    answer = response.json()["choices"][0]["message"]["content"]
    # Remove reasoning tokens
    return re.sub(r"<think>.*?</think>", "", answer, flags=re.DOTALL).strip()

Special: Removes <think>...</think> reasoning tokens from output

7. call_Ministral() - Ministral-3B (Lines 864-875)

Ministral_api_url = "https://Ministral-3B-rvgab.eastus2.models.ai.azure.com/chat/completions"
Ministral_api_key = "Z7fNcdnw5Tht1xAz6VlgUlLOeZoVTkIf"  # ⚠️ HARDCODED

Use Case: Lightweight model for simple queries

8. call_phi() - Phi-3 (Lines 902-913)

phi_api_url = "https://Phi-3-small-8k-instruct-qvlpq.eastus2.models.ai.azure.com/chat/completions"
phi_api_key = "T8I14He3lbMyAyUwNfffwG58e23EcXsU"  # ⚠️ HARDCODED

Context: 8k tokens - Microsoft's small model

External APIs (3)

9. call_gemini() - Gemini 2.0 Flash (Lines 914-927)

GEMINI_API_ENDPOINT = "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:generateContent?key=AIzaSyCxpr7wiyrPAEFRW-nz8kQjT2Y4tZXgv9g"
GEMINI_API_KEY = "AIzaSyCxpr7wiyrPAEFRW-nz8kQjT2Y4tZXgv9g"  # ⚠️ HARDCODED (also in URL)

def call_gemini(messages):
    payload = {
        "contents": [{"parts": [{"text": msg['content']}]} for msg in messages]
    }
    response = requests.post(GEMINI_API_ENDPOINT, json=payload, headers=HEADERS_GEMINI)
    return response.json()["candidates"][0]["content"]["parts"][0]["text"]

Different Format: Uses contents with parts structure

10. call_claude() - Claude 3 Opus (Lines 929-944)

CLAUDE_API_ENDPOINT = "https://api.anthropic.com/v1/messages"
CLAUDE_API_KEY = "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"  # ⚠️ HARDCODED

HEADERS_CLAUDE = {"Content-Type": "application/json", "X-API-Key": f"{CLAUDE_API_KEY}"}

def call_claude(messages):
    payload = {
        "model": "claude-3-opus-20240229",
        "messages": messages,
        "max_tokens": 1024
    }
    response = requests.post(CLAUDE_API_ENDPOINT, json=payload, headers=HEADERS_CLAUDE)
    return response.json()["content"][0]["text"]

Header: Uses X-API-Key (not Authorization)

11. call_grok() - Grok (Lines 946-960)

GROK_API_ENDPOINT = "https://machineagents-resource.services.ai.azure.com/models/chat/completions?api-version=2024-05-01-preview"
GROK_API_KEY = "DZHPHaEk96KHbCgaD3fsaI7opVSL7pFjmLR94hYkw8w7nnTlz0SPJQQJ99BEACHYHv6XJ3w3AAAAACOGn1JK"  # ⚠️ HARDCODED

def call_grok(messages):
    payload = {"messages": messages, "max_tokens": 1024}
    response = requests.post(GROK_API_ENDPOINT, json=payload, headers=HEADERS_GROK)
    return response.json()["choices"][0]["message"]["content"]

Note: Hosted on Azure (not xAI directly)


Model Selection Mapping

In sync_ask_gpt1() (Lines 1255-1624):

model = selection.get("selection_model", "openai-35") if selection else "openai-4"

if model == "openai-4":
    answer = call_openai_4(messages)
elif model == "openai-35":
    answer = call_openai_35(messages)
elif model == "gpt-4o-mini":
    answer = call_gpt4o_mini(messages)
elif model == "o1-mini":
    answer = call_gpto1mini(messages)
elif model == "Azure-llama":
    answer = call_llama(messages)
elif model == "Azure-deepseek":
    answer = call_deepseek(messages)
elif model == "Azure-Ministral":
    answer = call_Ministral(messages)
elif model == "Azure-phi":
    answer = call_phi(messages)
elif model == "gemini":
    answer = call_gemini(messages)
elif model == "claude":
    answer = call_claude(messages)
elif model == "grok":
    answer = call_grok(messages)
else:
    answer = call_openai_35(messages)  # Fallback

Model IDs stored in selection_history collection by Selection Chatbot Service


Organization Data System

Collection: organisation_data (accessed via db_manager)

Purpose: Store company/organization information for context in responses

Function: get_org_data() (Lines 1270-1278)

Usage:

def get_org_data():
    org_docs, org_source = db_manager.find_with_fallback(
        "organisation_data",
        user_id=user_id,
        project_id=project_id,
        query={"user_id": user_id, "project_id": project_id},
        limit=1
    )
    return org_docs[0] if org_docs else None

combined_text = org_data_doc.get("combined_text", "") if org_data_doc else ""

Document Structure:

{
    "user_id": "User-123456",
    "project_id": "User-123456_Project_1",
    "combined_text": "Company: Acme Corp. Founded: 2010. Products: Widget A, Widget B, Widget C. Mission: Make the best widgets.",
    "company_name": "Acme Corp",
    "industry": "Manufacturing",
    "website": "https://acme.com"
}

Integration: Organization data is injected into context for every response:

context_parts.append(f"Organisation Info:\n{combined_text}")

WHY? Ensures chatbot knows basic company facts without needing to crawl/embed them.
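A minimal sketch of how the organisation text is typically combined with the rest of the context before it reaches the LLM. Only combined_text and context_parts come from the source above; retrieved_chunks and the final join are illustrative assumptions:

context_parts = []

# Organisation facts go in first so the model always has basic company info
if combined_text:
    context_parts.append(f"Organisation Info:\n{combined_text}")

# Hypothetical: RAG chunks retrieved from Milvus for this question
for chunk in retrieved_chunks:
    context_parts.append(chunk["content"])

context = "\n\n".join(context_parts)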


Enhanced Contact Scrubbing

Function: remove_contact_numbers() (Lines 963-990)

Purpose: Remove sensitive or unpronounceable content from the LLM response before it is sent to TTS

What's Removed:

1. Phone Numbers:

phone_number_pattern = r"\+?[0-9]{1,4}[-.\s]?[0-9]{1,3}[-.\s]?[0-9]{3}[-.\s]?[0-9]{3,4}"
# Replaced with: "the number provided in the chat"

2. URLs:

url_pattern = r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+"
# Replaced with: "via the url provided in the chat"

3. Markdown Characters:

pattern = r"[#*]"
# Removed completely (avoid reading "hash" or "asterisk")

4. Emojis: (Lines 974-987)

emoji_pattern = re.compile(
    "["
    "\U0001F600-\U0001F64F"  # emoticons
    "\U0001F300-\U0001F5FF"  # symbols & pictographs
    "\U0001F680-\U0001F6FF"  # transport & map symbols
    "\U0001F1E0-\U0001F1FF"  # flags (iOS)
    "\U00002700-\U000027BF"  # Dingbats
    "\U0001F900-\U0001F9FF"  # Supplemental Symbols
    "\U00002600-\U000026FF"  # Misc symbols
    "\U00002B00-\U00002BFF"  # Arrows
    "]+",
    flags=re.UNICODE
)
# Removed completely

Example:

# Input (LLM response)
"🎉 Call us at +1-555-123-4567 or visit https://example.com! **Great deals!** #sale"

# Output (for TTS)
"Call us at the number provided in the chat or visit via the url provided in the chat! Great deals! sale"

Why? Azure TTS can't pronounce emojis/URLs well, and reading phone numbers is tedious.
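Putting the four rules together, a hedged reconstruction of remove_contact_numbers() based only on the patterns documented above (a sketch, not a verbatim copy of the source):

import re

def remove_contact_numbers(text: str) -> str:
    # 1. Phone numbers -> spoken-friendly placeholder
    phone_number_pattern = r"\+?[0-9]{1,4}[-.\s]?[0-9]{1,3}[-.\s]?[0-9]{3}[-.\s]?[0-9]{3,4}"
    text = re.sub(phone_number_pattern, "the number provided in the chat", text)

    # 2. URLs -> spoken-friendly placeholder
    url_pattern = r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+"
    text = re.sub(url_pattern, "via the url provided in the chat", text)

    # 3. Markdown characters removed (avoid TTS reading "hash"/"asterisk")
    text = re.sub(r"[#*]", "", text)

    # 4. Emojis removed completely
    emoji_pattern = re.compile(
        "["
        "\U0001F600-\U0001F64F"  # emoticons
        "\U0001F300-\U0001F5FF"  # symbols & pictographs
        "\U0001F680-\U0001F6FF"  # transport & map symbols
        "\U0001F1E0-\U0001F1FF"  # flags (iOS)
        "\U00002700-\U000027BF"  # Dingbats
        "\U0001F900-\U0001F9FF"  # Supplemental Symbols
        "\U00002600-\U000026FF"  # Misc symbols
        "\U00002B00-\U00002BFF"  # Arrows
        "]+",
        flags=re.UNICODE,
    )
    text = emoji_pattern.sub("", text)

    return text.strip()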


Deleted Chatbot Handling

Endpoint: POST /v2/get-response-3dchatbot (Lines 1627-1731)

Soft Delete Check: (Lines 1651-1703)

Flow:

  1. Check if chatbot_config.isDeleted == True
  2. Generate error message with TTS + lip-sync
  3. Return in SAME format as normal response

Implementation:

if not chatbot_config or chatbot_config.get("isDeleted") is True:
    error_text = "This chatbot is no longer available."

    # Use chatbot's configured voice
    chatbot_voice_default = assign_voice_by_avatar(chatbot_avatar, chatbot_voice)
    azure_voice = SUPPORTED_VOICES.get(chatbot_voice_default)

    # Generate TTS + Lip-Sync for error
    base_filename = f"{user_id}_{session_id}_error"
    wav_file = await text_to_speech_azure1(error_text, azure_voice, wav_file)
    json_file = await generate_lip_sync1(wav_file, json_file)

    # Return SAME format as normal response
    return {
        "text": error_text,
        "facialExpression": "neutral",
        "animation": "Idle",
        "audio": audio_base64,
        "lipsync": lip_sync_data
    }

Why SAME format? Frontend doesn't need special error handling - avatar just says "chatbot not available" with proper lip-sync.


Fallback Error Handling

Lines 1531-1624

Purpose: Graceful degradation if main processing fails

Flow:

  1. Main sync_ask_gpt1() fails
  2. Catch exception
  3. Try minimal LLM call (system prompt + question only, no RAG/history)
  4. Save fallback response
  5. Check form trigger even for fallback
  6. Return text-only response

Code:

except Exception as e:
    logger.error(f"Unexpected error in sync_ask_gpt1: {e}")

    # Fallback: Minimal LLM call
    try:
        fallback_messages = [
            SystemMessage(content=system_prompts.get(chatbot_purpose, "You are a helpful AI assistant.")),
            HumanMessage(content=f"Question: {question}")
        ]
        response = llm.invoke(fallback_messages)
        answer = re.sub(r'(?i)^(answer:|response:|responds:| *-+ #?)\s*', '', response.content).strip()

        # Save fallback history
        save_chat_history(user_id, project_id, session_id, question, answer, originating_url)

        # Form trigger logic (duplicate from main flow)
        # ... check if form should be shown ...

        return {"text": answer}  # No audio/lipsync in fallback

    except Exception as fallback_error:
        raise HTTPException(status_code=500, detail=f"An error occurred: {e}")

What's SKIPPED in fallback?

  • ❌ RAG retrieval
  • ❌ UTM targeting
  • ❌ Chat history
  • ❌ Guardrails
  • ❌ Organization data
  • ❌ TTS + Lip-sync

What's KEPT?

  • ✅ System prompt (for chatbot purpose)
  • ✅ LLM generation
  • ✅ Chat history saving
  • ✅ Form trigger logic

Why? Better to give a basic response than fail completely.


Security Analysis

CRITICAL Security Issues

1. ⚠️ 10 HARDCODED API KEYS! (MOST OF ANY SERVICE!)

Azure TTS Key (Line 157)

subscription="9N41NOfDyVDoduiD4EjlzmZU9CbUX3pPqWfLCORpl7cBf0l2lzVQJQQJ99BCACGhslBXJ3w3AAAYACOG2329"

Azure OpenAI Key (Line 1007)

subscription_key = "AZxDVMYB08AaUip0i5ed1sy73ZpUsqencYYxKDbm6nfWfG1AqPZ3JQQJ99BCACOGVUo7"

Llama API Key (Line 811)

AZURE_API_KEY = "JOfcw0VW0dS31Z8XgkNRSP9tUaBiwUYZ"

DeepSeek API Key (Line 832)

deepseek_api_key = "GwUcGzHhhUbvApfMR4aq1ZPFUic6lbWE"

Ministral API Key (Line 856)

Ministral_api_key = "Z7fNcdnw5Tht1xAz6VlgUlLOeZoVTkIf"

Phi-3 API Key (Line 878)

phi_api_key = "T8I14He3lbMyAyUwNfffwG58e23EcXsU"

Gemini API Key (Lines 888, 890, 996, 998)

GEMINI_API_KEY = "AIzaSyCxpr7wiyrPAEFRW-nz8kQjT2Y4tZXgv9g"

Claude API Key (Lines 892, 1000)

CLAUDE_API_KEY = "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"

Grok API Key (Lines 894, 1002)

GROK_API_KEY = "DZHPHaEk96KHbCgaD3fsaI7opVSL7pFjmLR94hYkw8w7nnTlz0SPJQQJ99BEACHYHv6XJ3w3AAAAACOGn1JK"

⚠️ 10 TOTAL HARDCODED API KEYS - THE MOST OF ANY SERVICE IN THE PLATFORM!


2. ⚠️ File System Issues

Problem: Temporary files are named from {session_id}.wav and {user_id}.json, so concurrent requests for the same session or user can overwrite each other's files

Better:

import uuid
unique_id = f"{session_id}_{uuid.uuid4()}"
wav_file = os.path.join(OUTPUT_DIR, f"{unique_id}.wav")

3. ⚠️ Rhubarb Executable Dependency

Problem: Service fails if Rhubarb executable missing or incompatible

Mitigation: The container image must bundle the Rhubarb binary and mark it executable (see the Dockerfile below)


Performance

Response Time Breakdown

| Step | Avg Latency | Notes |
|------|-------------|-------|
| 1. Embedding generation | 50-100 ms | BAAI/bge-small-en-v1.5 |
| 2. Guardrails check | 10-30 ms | MongoDB query |
| 3. Milvus search | 50-150 ms | Top-5 chunks |
| 4. MongoDB history | 20-50 ms | Simple query |
| 5. Azure OpenAI | 1-3 s | GPT-3.5-Turbo-16k |
| 6. Azure TTS | 1-2 s | Neural voice |
| 7. FFmpeg conversion | 100-300 ms | WAV to PCM |
| 8. Rhubarb lip-sync | 500-1000 ms | Phonetic analysis |
| 9. Base64 encoding | 10-50 ms | Audio file |
| 10. MongoDB save | 20-50 ms | History update |
| TOTAL | 4-8 s | Complete 3D response |

Lip-sync adds 600-1300ms compared to voice-only responses.


Deployment

Docker Configuration

Dockerfile:

FROM python:3.9-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    build-essential \
    ffmpeg \
    && rm -rf /var/lib/apt/lists/*

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy Rhubarb executable
COPY Rhubarb/ ./Rhubarb/

# Copy shared modules
COPY shared/ ./shared/

COPY src/ .

# Create TTS output directory
RUN mkdir -p tts_audio

# Make Rhubarb executable
RUN chmod +x Rhubarb/rhubarb

EXPOSE 8011

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8011"]

Requirements.txt

fastapi>=0.95.0
uvicorn[standard]>=0.22.0
pymongo>=4.3.3
python-multipart>=0.0.6
python-dotenv>=1.0.0

# Azure Services
azure-cognitiveservices-speech>=1.30.0
openai>=1.0.0

# LangChain
langchain>=0.1.0
langchain-openai>=0.0.5
langchain-core>=0.1.0

# Embeddings & ML
fastembed>=0.1.0
scikit-learn>=1.3.0
numpy>=1.24.0

# Utilities
tiktoken>=0.5.0
pytz>=2023.3
requests>=2.31.0

# Monitoring
ddtrace>=1.19.0

Note: FFmpeg must be installed at the system level (it is not a Python package); the Dockerfile above installs it via apt-get.


Environment Variables

# Azure OpenAI
AZURE_OPENAI_API_KEY=<your-key>
ENDPOINT_URL=https://machineagentopenai.openai.azure.com/...
DEPLOYMENT_NAME=gpt-35-turbo-16k-0613

# MongoDB
MONGO_URI=mongodb://...
MONGO_DB_NAME=Machine_agent_demo

# Milvus
MILVUS_HOST=localhost
MILVUS_PORT=19530

# Alternative LLM APIs (optional)
GEMINI_API_KEY=<your-key>
CLAUDE_API_KEY=<your-key>
GROK_API_KEY=<your-key>

# DataDog
DD_SERVICE=response-3d-chatbot-service
DD_ENV=production

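To support the security recommendation of replacing hardcoded keys with these variables, a minimal sketch of fail-fast loading at startup (the require_env helper name is hypothetical):

import os
from dotenv import load_dotenv

load_dotenv()

def require_env(name: str) -> str:
    """Read a required environment variable or abort startup with a clear error."""
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value

AZURE_OPENAI_API_KEY = require_env("AZURE_OPENAI_API_KEY")
GEMINI_API_KEY = require_env("GEMINI_API_KEY")
CLAUDE_API_KEY = require_env("CLAUDE_API_KEY")
GROK_API_KEY = require_env("GROK_API_KEY")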
Total: 2,534 lines of Python code


Service Infrastructure & Configuration

Complete Imports & Dependencies (Lines 1-37)

36 Total Imports:

FastAPI & Web Framework:

from fastapi import FastAPI, HTTPException, Form, Request
from fastapi.responses import JSONResponse, StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
import uvicorn

AI/ML Libraries:

from langchain_openai import AzureChatOpenAI
from langchain.schema import HumanMessage, SystemMessage, AIMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from fastembed.embedding import FlagEmbedding as Embedding
from openai import AsyncAzureOpenAI  # For streaming
from sklearn.metrics.pairwise import cosine_similarity
import tiktoken  # OpenAI tokenizer

Azure Services:

import azure.cognitiveservices.speech as speechsdk  # Azure TTS

Database:

from pymongo import MongoClient
import sys
sys.path.append('/app')
from shared.database import get_db_manager, get_milvus_embeddings_service

Data Processing:

import numpy as np
import json
import base64
import re
from urllib.parse import urlparse, parse_qs
from typing import List, Dict, Any, AsyncGenerator, Optional

System & Utilities:

import os
import subprocess
import platform
import asyncio
import concurrent.futures
import logging
from datetime import datetime, timezone
import pytz  # Timezone conversion
from pathlib import Path
from dotenv import load_dotenv
import requests

Critical Environment Variable (Line 34):

os.environ["TOKENIZERS_PARALLELISM"] = "false"
# ✅ Prevents warning: "The current process just got forked, after parallelism has already been used"
# Required when using multiprocessing with transformer-based embeddings

FastAPI Application Setup (Lines 47-54)

app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],        # ⚠️ SECURITY: Allows ALL origins
    allow_credentials=True,
    allow_methods=["*"],         # Allows all HTTP methods
    allow_headers=["*"],         # Allows all headers
)

⚠️ Security Issue: Overly permissive CORS allows any website to call the API. Should be restricted to trusted domains in production.
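A minimal sketch of the recommended fix, restricting CORS to known frontend origins (the domain list is hypothetical):

ALLOWED_ORIGINS = [
    "https://app.example.com",      # hypothetical production frontend
    "https://staging.example.com",  # hypothetical staging frontend
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=ALLOWED_ORIGINS,
    allow_credentials=True,
    allow_methods=["GET", "POST"],
    allow_headers=["Content-Type", "Authorization"],
)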


Database Initialization (Lines 69-96)

Environment Variables:

env_path = Path(__file__).resolve().parents[2] / ".env"
load_dotenv(dotenv_path=env_path)

mongo_uri = os.getenv("MONGO_URI")
db_name = os.getenv("MONGO_DB_NAME")

MongoDB (CosmosDB) Connection:

client = MongoClient(mongo_uri)
db = client[db_name]
logger.info(f"✓ Connected to database: {db.name}")

Shared Services Initialization:

# Database Manager (CosmosDB + Milvus routing)
db_manager = get_db_manager()

# Milvus Embeddings Service (new architecture)
milvus_embeddings = get_milvus_embeddings_service()

6 MongoDB Collections:

  1. chatbot_selections - Chatbot configuration (avatar, voice, purpose)
  2. selection_history - LLM model selection history
  3. chatbot_history - Chat conversation history with token tracking
  4. chatbot_guardrails - Content safety rules (user-specific + defaults)
  5. performance_logs - V8 endpoint performance metrics
  6. LEAD_COLLECTION - Form trigger configuration

Collection handles:

chatbot_collection = db["chatbot_selections"]
selection_collection = db["selection_history"]
history_collection = db["chatbot_history"]
guardrails_collection = db["chatbot_guardrails"]
performance_logs_collection = db["performance_logs"]
lead_collection = db["LEAD_COLLECTION"]

Why 2 Database Systems?

  • CosmosDB (MongoDB API): Metadata, configuration, chat history
  • Milvus: Vector embeddings for RAG semantic search

Audio Output Configuration (Lines 99-101)

OUTPUT_DIR = "tts_audio"
os.makedirs(OUTPUT_DIR, exist_ok=True)

Purpose: Directory for temporary WAV/JSON files during TTS + lip-sync generation

Files Created:

  • {user_id}_{session_id}_{chunk_index}.wav - Azure TTS output
  • {user_id}_{session_id}_{chunk_index}_pcm.wav - PCM-converted audio for Rhubarb
  • {user_id}_{session_id}_{chunk_index}.json - Rhubarb lip-sync data

Cleanup: Files deleted immediately after base64 encoding and response generation
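A hedged sketch of the create → encode → delete lifecycle, combined with the UUID naming suggested in the Security Analysis to avoid collisions (helper names are illustrative; OUTPUT_DIR comes from the configuration above):

import base64
import json
import os
import uuid

def make_temp_paths(user_id: str, session_id: str) -> tuple[str, str]:
    # UUID suffix prevents concurrent requests from overwriting each other's files
    unique_id = f"{user_id}_{session_id}_{uuid.uuid4().hex}"
    wav_file = os.path.join(OUTPUT_DIR, f"{unique_id}.wav")
    json_file = os.path.join(OUTPUT_DIR, f"{unique_id}.json")
    return wav_file, json_file

def encode_and_cleanup(wav_file: str, json_file: str) -> tuple[str, dict]:
    # Read results into memory, then delete the temporary files immediately
    with open(wav_file, "rb") as f:
        audio_base64 = base64.b64encode(f.read()).decode("utf-8")
    with open(json_file, "r") as f:
        lip_sync_data = json.load(f)
    for path in (wav_file, json_file):
        if os.path.exists(path):
            os.remove(path)
    return audio_base64, lip_sync_data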


Complete Voice Dictionary (Lines 103-115)

SUPPORTED_VOICES = {
    "Male_1": "en-US-EricNeural",
    "Male_2": "en-US-GuyNeural",
    "Male_3": "en-CA-LiamNeural",
    "Male_IND": "en-IN-PrabhatNeural",
    "Female_1": "en-US-AvaMultilingualNeural",
    "Female_2": "en-US-JennyNeural",
    "Female_3": "en-US-EmmaMultilingualNeural",
    "Female_4": "en-AU-NatashaNeural",
    "Female_IND": "en-IN-NeerjaExpressiveNeural",
    "Female_IND2": "en-IN-NeerjaNeural",
}

10 Azure Neural Voices:

| Voice ID | Azure Voice Name | Gender | Accent | Notes |
|----------|------------------|--------|--------|-------|
| Male_1 | en-US-EricNeural | Male | US English | Standard male voice |
| Male_2 | en-US-GuyNeural | Male | US English | Alternative male |
| Male_3 | en-CA-LiamNeural | Male | Canadian English | Canadian accent |
| Male_IND | en-IN-PrabhatNeural | Male | Indian English | Indian accent |
| Female_1 | en-US-AvaMultilingualNeural | Female | US English | Multilingual |
| Female_2 | en-US-JennyNeural | Female | US English | Default female (most used) |
| Female_3 | en-US-EmmaMultilingualNeural | Female | US English | Multilingual |
| Female_4 | en-AU-NatashaNeural | Female | Australian English | Australian accent |
| Female_IND | en-IN-NeerjaExpressiveNeural | Female | Indian English | Expressive style |
| Female_IND2 | en-IN-NeerjaNeural | Female | Indian English | Standard Indian |

Selection Logic: Frontend sends voice ID (e.g., "Female_2"), backend maps to Azure voice name ("en-US-JennyNeural")
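A minimal sketch of that mapping, falling back to Female_2 (noted above as the most-used default) when an unknown ID is sent; the resolve_voice helper name is hypothetical:

def resolve_voice(voice_id: str) -> str:
    # Unknown or missing voice IDs fall back to the default female voice
    return SUPPORTED_VOICES.get(voice_id, SUPPORTED_VOICES["Female_2"])

azure_voice = resolve_voice("Female_2")   # -> "en-US-JennyNeural"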


Rhubarb Executable Detection (Lines 117-134)

Cross-Platform Support:

system = platform.system().lower()
current_dir = os.path.dirname(os.path.abspath(__file__))

if system == "windows":
    rhubarbExePath = os.path.join(current_dir, "Rhubarb-Lip-Sync-1.13.0-Windows", "rhubarb.exe")
elif system == "darwin":  # macOS
    rhubarbExePath = os.path.join(current_dir, "Rhubarb-Lip-Sync-1.13.0-macOS", "rhubarb")
elif system == "linux":
    rhubarbExePath = os.path.join(current_dir, "Rhubarb", "rhubarb")
else:
    raise RuntimeError(f"Unsupported platform: {system}")

# Verify executable exists
if not os.path.exists(rhubarbExePath):
    raise RuntimeError(f"Rhubarb executable not found at: {rhubarbExePath}")

logger.debug(f"Rhubarb executable path: {rhubarbExePath}")

3 Platform Paths:

  • Windows: Rhubarb-Lip-Sync-1.13.0-Windows/rhubarb.exe
  • macOS: Rhubarb-Lip-Sync-1.13.0-macOS/rhubarb
  • Linux: Rhubarb/rhubarb

Validation: Service fails to start if Rhubarb executable not found
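A hedged sketch of how the resolved rhubarbExePath is typically invoked to produce the lip-sync JSON, assuming Rhubarb's standard -f json and -o command-line flags (the run_rhubarb helper name is hypothetical):

import subprocess

def run_rhubarb(pcm_wav_file: str, json_file: str) -> str:
    """Run Rhubarb on a PCM WAV file and write mouth-shape cues to a JSON file."""
    subprocess.run(
        [rhubarbExePath, "-f", "json", "-o", json_file, pcm_wav_file],
        check=True,           # raise if Rhubarb exits non-zero
        capture_output=True,  # keep Rhubarb's progress output off the service logs
    )
    return json_file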


Token Counting Implementation (Lines 385-390)

# Initialize tokenizer (for OpenAI models)
tokenizer = tiktoken.get_encoding("cl100k_base")  # GPT-4, GPT-3.5-Turbo encoding

def count_tokens(text):
    """Counts the number of tokens in a given text."""
    return len(tokenizer.encode(text))

Tokenizer: cl100k_base - Used by GPT-4, GPT-3.5-Turbo, GPT-4o

Usage:

input_tokens = count_tokens("What are your pricing options?")
# Returns: 5 tokens

output_tokens = count_tokens("We offer three plans: Basic at $10/mo, Pro at $50/mo, and Enterprise with custom pricing.")
# Returns: ~22 tokens

total_tokens = input_tokens + output_tokens  # 27 tokens

Why Important?

  • Azure OpenAI charges per token ($0.03/1K tokens for GPT-4)
  • Tracking enables cost analysis per user/project
  • Session-level token counting for usage limits
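As a rough illustration of the cost math above (the per-1K-token price is the GPT-4 input rate quoted in this section; a real calculation should use current, model-specific pricing):

GPT4_PRICE_PER_1K_TOKENS = 0.03  # USD, as quoted above; verify against current pricing

def estimate_cost(input_tokens: int, output_tokens: int) -> float:
    total_tokens = input_tokens + output_tokens
    return (total_tokens / 1000) * GPT4_PRICE_PER_1K_TOKENS

# Example: a 27-token exchange costs well under a cent
print(round(estimate_cost(5, 22), 6))  # 0.00081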

Chat History Save Function (Lines 648-708)

Complete Implementation:

def save_chat_history(user_id, project_id, session_id, input_prompt, output_response,
                      originating_url, enhanced_question=None, results=None, utm_config_id=None):
    """Save chat history with token usage, UTM parameters, and UTM config info in MongoDB."""

    # IST Timezone conversion
    local_tz = pytz.timezone("Asia/Kolkata")
    current_time = datetime.utcnow().replace(tzinfo=pytz.utc).astimezone(local_tz)

    # Compute token counts
    input_tokens = count_tokens(input_prompt)
    output_tokens = count_tokens(output_response)
    total_tokens = input_tokens + output_tokens

    # Extract UTM parameters from URL
    utm_data = extract_utm_parameters(originating_url)

    chat_entry = {
        "input_prompt": input_prompt,
        "enhanced_question": enhanced_question if enhanced_question else input_prompt,
        "output_response": output_response,
        "Similar Vectors": results,  # RAG search results
        "timestamp": current_time.strftime("%Y-%m-%d %H:%M:%S"),
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
        "total_tokens": total_tokens,
        "originating_url": originating_url,
        "utm_config_id": utm_config_id  # Track which UTM config was used
    }

    # Find existing session
    existing_session = history_collection.find_one({
        "user_id": user_id,
        "project_id": project_id,
        "session_id": session_id
    })

    if existing_session:
        # Append to existing session
        history_collection.update_one(
            {"user_id": user_id, "project_id": project_id, "session_id": session_id},
            {
                "$push": {"chat_data": chat_entry},
                "$set": {"datetime": current_time.strftime("%Y-%m-%d %H:%M:%S")},
                "$inc": {"session_total_tokens": total_tokens}  # Increment session total
            }
        )
    else:
        # Create new session
        new_session = {
            "user_id": user_id,
            "project_id": project_id,
            "session_id": session_id,
            "originating_url": originating_url,
            "datetime": current_time.strftime("%Y-%m-%d %H:%M:%S"),
            "session_total_tokens": total_tokens,
            "chat_data": [chat_entry],
        }

        # Add UTM data if available
        if utm_data:
            new_session["utm_data"] = utm_data
            logger.info(f"UTM parameters detected and stored: {utm_data}")

        history_collection.insert_one(new_session)

Chat Entry Structure:

{
    "input_prompt": "What are your pricing options?",
    "enhanced_question": "What features and pricing are available for your cloud hosting plans?",  // From query enhancement
    "output_response": "We offer three plans...",
    "Similar Vectors": [{"content": "...", "similarity": 0.85}, ...],  // RAG results
    "timestamp": "2025-12-29 11:30:45",  // IST
    "input_tokens": 5,
    "output_tokens": 22,
    "total_tokens": 27,
    "originating_url": "https://example.com/pricing?utm_source=google",
    "utm_config_id": ObjectId("...")  // If UTM targeting was used
}

Session Document:

{
    "user_id": "User-123456",
    "project_id": "User-123456_Project_1",
    "session_id": "session-abc123",
    "originating_url": "https://example.com/pricing?utm_source=google&utm_campaign=summer",
    "datetime": "2025-12-29 11:30:45",  // Last message timestamp (IST)
    "session_total_tokens": 150,  // Cumulative tokens for entire session
    "utm_data": {  // Extracted from first message URL
        "utm_source": "google",
        "utm_campaign": "summer"
    },
    "chat_data": [/* array of chat entries */]
}

Key Features:

  1. IST Timestamps: All timestamps in Asia/Kolkata timezone
  2. Token Tracking: Per-message AND per-session cumulative
  3. UTM Tracking: Preserved for analytics
  4. Query Enhancement Tracking: Stores both original and enhanced questions
  5. RAG Context: Saves similar vectors for debugging

UTM Helper Functions

1. Extract UTM Parameters (Lines 392-422)

def extract_utm_parameters(url: str) -> Optional[Dict[str, str]]:
    """Extract UTM parameters from a URL."""
    if not url:
        return None

    try:
        parsed_url = urlparse(url)
        query_params = parse_qs(parsed_url.query)

        # Extract all parameters starting with 'utm_'
        utm_data = {}
        for key, value in query_params.items():
            if key.startswith('utm_'):
                utm_data[key] = value[0] if value else ""  # parse_qs returns lists

        return utm_data if utm_data else None
    except Exception as e:
        logger.warning(f"Failed to extract UTM parameters: {e}")
        return None

Example:

url = "https://example.com/pricing?utm_source=google&utm_campaign=summer&product=pro"
result = extract_utm_parameters(url)
# Returns: {"utm_source": "google", "utm_campaign": "summer"}
# Note: "product" is ignored (not a UTM parameter)

2. Extract Base URL (Lines 425-445)

def extract_base_url(url: str) -> Optional[str]:
    """Extract base URL (scheme + netloc + path) without query parameters."""
    if not url:
        return None

    try:
        parsed = urlparse(url)
        base_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
        return base_url.rstrip('/')  # Remove trailing slash
    except Exception as e:
        logger.warning(f"Failed to extract base URL: {e}")
        return None

Example:

url = "https://example.com/pricing?utm_source=google#section"
result = extract_base_url(url)
# Returns: "https://example.com/pricing"

3. Calculate Match Score (Lines 448-517)

Scoring System:

  • Target URL match: +10 points
  • Each matching UTM parameter: +2 points
  • Target URL only (no UTM params): +5 points
  • UTM parameter mismatch: -1 (reject the config)

def calculate_match_score(originating_url: str, config: Dict[str, Any]) -> float:
    """Calculate UTM config match score."""
    score = 0.0

    url_params = extract_utm_parameters(originating_url)
    config_params = config.get('utm_config', {})
    config_target_url = config.get('target_url', '').strip()

    # Check Target URL match
    if config_target_url:
        originating_base = extract_base_url(originating_url)
        if originating_base and originating_base.startswith(config_target_url.rstrip('/')):
            score += 10
            logger.debug(f"Target URL match: +10")

    # Check UTM parameters
    if config_params and url_params:
        for key in ['source', 'medium', 'campaign', 'content', 'term']:
            config_value = config_params.get(key)
            if config_value:
                url_key = f'utm_{key}'
                if url_params.get(url_key) == config_value:
                    score += 2  # Match!
                else:
                    return -1  # Mismatch - reject this config

    # Special case: Target URL only (no UTM requirements)
    if config_target_url and not config_params:
        score += 5

    return score

Example Scoring:

| Scenario | URL | Config | Score |
|----------|-----|--------|-------|
| Perfect match | example.com/pricing?utm_source=google&utm_campaign=summer | Target: example.com/pricing; UTM: source=google, campaign=summer | 10 + 2 + 2 = 14 |
| URL match only | example.com/pricing?other=param | Target: example.com/pricing | 10 + 5 = 15 |
| UTM mismatch | example.com/pricing?utm_source=facebook | Target: example.com/pricing; UTM: source=google | -1 (rejected) |
| No match | other.com/pricing | Target: example.com/pricing | 0 |
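A minimal sketch of how calculate_match_score() could be used to pick the best UTM configuration for a request (candidate retrieval and the select_utm_config name are hypothetical, not taken from the source):

def select_utm_config(originating_url, configs):
    """Return the highest-scoring non-rejected config, or None if nothing matches."""
    best_config, best_score = None, 0
    for config in configs:
        score = calculate_match_score(originating_url, config)
        if score > best_score:  # -1 (mismatch) and 0 (no match) are skipped
            best_config, best_score = config, score
    return best_config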

Complete System Prompts Dictionary

Lines 296-383 - Hardcoded prompt templates

3 Chatbot Types:

1. Sales-Agent (Lines 297-318)

Character Limit: 80-130 characters

Key Rules:

  • Product recommendations and upselling
  • Concise, persuasive responses
  • Form trigger: On 4th user message, say "Could you please provide some details by filling the form?"
  • Must reference chat history for continuity
  • End response with question or offer for assistance
  • Open-ended questions for engagement
  • If no answer in datasource: "We dont provide this service, please contact our website"

Response Guidelines:

- Natural, engaging, human-like (not robotic)
- Warmth, friendliness, personality
- Check conversation position (if ending, make proper ending)
- Don't repeat same answer
- Handle vague queries ("yes", "ok") by referring back
- Focus ONLY on services/expertise

Example Prompt:

"""
You are a highly skilled AI sales agent designed to assist users with product recommendations, upselling,
and handling sales-related inquiries. Your primary objective is to understand customer needs and provide
concise, persuasive, and informative responses that drive sales conversions.
Format responses is A concise, contextually relevant response between 80 to 130 characters. Do not go more than this.
the fourth user message (after 3 prior user inputs), prompt the user by saying: "Could you please provide some details by filling the form?"
If the user query is vague (like "yes", "ok"), refer back to the previous assistant message for context and build a proper response. Always use prior messages for continuity.
You should properly able to get with historical converstation and able to judge the converstation position, if its ending make a proper ending. And Strictly dont repeat the same answer.

Response Generation Guidelines:
Your responses must sound as **natural, engaging, and human-like as possible**. Avoid robotic or overly structured replies. Instead, use **warmth, friendliness, and a touch of personality**—just like a real sales assistant would in a conversation
1. Keep the conversation flowing by ending your response with a question or an offer for further assistance.
2. On the fourth user message (after 3 prior user inputs), prompt the user by saying: "Could you please provide some details by filling the form?"
3. Responses should be relevant and connected to the ongoing chat history, ensuring coherence and logical flow.
4. Format responses is A concise, contextually relevant response between 80 to 130 characters. Do not go more than this.
5. Always refer the old chathistory conversation and answer the question correctly based on old data
6. Focus only on services and expertise, avoiding engage in unrelated topics.
7. you should able to get the chatbot name from the data and if they ask your name you should say "I am [Chatbot Name] virtual assistant ".
8. Add open-ended question sometimes to response for better engagement and conversation flowing.
9. If your are not able to find any answer in datasource, "We dont provide this service, please contact our website for more information" and provide the website link.
"""

2. Service-Agent (Lines 322-360)

Character Limit: 170-200 characters

Key Responsibilities:

  • Service-related inquiries, technical support, troubleshooting
  • Clear, structured, concise answers
  • Professional, empathetic tone

Response Structure:

  1. Acknowledgment of issue (1 sentence)
  2. Solution/next steps (step-by-step format)
  3. Relevant service options/resources
  4. Call-to-action or follow-up question

Troubleshooting Approach:

  • Ask targeted diagnostic questions
  • Provide numbered step-by-step solutions
  • Include visual cues ("look for the blue button labeled 'Settings'")
  • Offer alternative solutions
  • Explain when professional intervention needed

Conversation Flow:

1. Issue identification
2. Solution presentation
3. Verification/next steps
4. Additional help options

Redirect for non-service queries:

"I'm specialized in helping with services and technical support. For that topic, you might want to check with our general information team. How can I assist with your service needs today?"

3. Custom-Agent (Lines 361-382)

Character Limit: 200-300 characters

Key Responsibilities:

  • Informational agent for products, services, policies
  • Accurate, relevant, concise information
  • Professional yet conversational tone

Response Structure:

  1. Direct answer (1-2 sentences)
  2. Additional context (if applicable)
  3. Concise within 200-300 characters
  4. Next step/call-to-action

Query Handling:

  • Product/Service: Name, key features, pricing, unique selling points
  • Policy: Clear summary without legal jargon
  • Comparison: Balanced view, highlight key differences
  • Unknown info: "I don't have that specific information" + offer related topics

Redirect for unrelated queries:

"I'm here to help with questions about products and services. What would you like to know about those?"

Prompt Selection Logic:

chatbot_purpose = chatbot_config.get("chatbot_purpose", "Sales-Agent")
system_prompt = system_prompts.get(chatbot_purpose, system_prompts["Sales-Agent"])
# Sales-Agent used as fallback if purpose not found

Why 3 prompts? Different use cases require different conversation styles:

  • Sales: Persuasive, driving conversions
  • Service: Problem-solving, technical support
  • Custom/Informational: Educational, policy explanations

Recommendations

CRITICAL (Security)

  1. ⚠️ Move ALL API Keys to Environment (10 hardcoded keys!)
  2. ⚠️ Fix File Naming - Use UUID to prevent collisions
  3. ⚠️ Add Rhubarb Health Check - Verify executable on startup
  4. ⚠️ Implement Guardrails - Activate content safety checks

Performance

  1. Optimize Lip-Sync - Run Rhubarb in parallel with other tasks
  2. Cache TTS + Lip-Sync - Same text = reuse audio + animation
  3. Streaming Implementation - Implement full streaming support
  4. Async Everything - Convert synchronous operations to async

Code Quality

  1. Split Service - Consider separating lip-sync generation
  2. Remove Unused Models - 11 models, likely only 2-3 used
  3. Add Type Hints - Complete typing for 2,534 lines
  4. Comprehensive Tests - Unit tests for lip-sync pipeline

Last Updated: 2025-12-26
Code Version: response-3d-chatbot-service/src/main.py (2,534 lines)
Total Functions: 81
Total Endpoints: 3+
Review Cycle: WEEKLY (CRITICAL SERVICE)


"The flagship experience - where AI meets immersive 3D interaction."