Response 3D Chatbot Service - Complete Developer Documentation¶
🚨 THE MOST CRITICAL SERVICE IN THE ENTIRE PLATFORM 🚨
Service: LLM Response Generation for 3D Chatbots (RAG + TTS + Lip-Sync + Guardrails)
Port: 8011
Purpose: Generate AI responses with voice synthesis, lip-sync animation, and advanced safety guardrails
Technology: FastAPI, Azure OpenAI (11+ Models), Azure TTS, Milvus, Rhubarb Lip-Sync, Streaming
Code Location: /response-3d-chatbot-service/src/main.py (2,534 lines)
Owner: Backend Team
Last Updated: 2025-12-26
⚠️ CRITICAL SERVICE ALERT¶
This is THE MOST IMPORTANT service in the MachineAgents platform!
Why This Service is Critical:
- Handles ALL 3D chatbot interactions (the flagship product)
- Largest codebase (2,534 lines - 3.3× larger than text+voice combined)
- Most advanced features (lip-sync, streaming, guardrails, multi-LLM)
- Highest complexity (81 functions/components)
- Revenue driver (3D chatbots are premium offering)
Impact of Downtime:
- ❌ ALL 3D chatbots stop working
- ❌ Premium customers affected
- ❌ Revenue loss
- ❌ Brand reputation damage
Table of Contents¶
- Service Overview
- Complete Architecture
- Lip-Sync Generation System
- Multi-LLM Support (11 Models)
- Guardrails System (NEW)
- Performance Logging (NEW)
- Lead Collection (NEW)
- Streaming Responses
- Complete Endpoints
- Security Analysis
- Performance
- Deployment
Service Overview¶
The Response 3D Chatbot Service is the flagship AI response generation service, combining advanced RAG with visual lip-sync animation for immersive 3D avatar experiences.
Key Responsibilities¶
✅ RAG Pipeline - Milvus vector search + LLM generation
✅ Lip-Sync Generation - Rhubarb phonetic animation
✅ Azure TTS - Neural voice synthesis
✅ Multi-LLM Support - 11 different models
✅ Guardrails - Content safety & validation
✅ UTM Targeting - Personalized responses by source
✅ Streaming - Real-time response delivery
✅ Performance Logging - Comprehensive metrics
✅ Lead Collection - Form submission handling
Technology Stack¶
| Technology | Purpose | Specifications |
|---|---|---|
| Azure OpenAI | Primary LLMs | GPT-4, GPT-3.5-Turbo-16k, GPT-4o-mini, o1-mini |
| Azure TTS | Speech synthesis | 10 Neural voices |
| Rhubarb Lip-Sync | Animation | Phonetic mouth shapes |
| Milvus | Vector database | Cosine similarity search |
| FastEmbed | Embeddings | BAAI/bge-small-en-v1.5 (384D) |
| MongoDB | Data storage | 6 collections |
| FFmpeg | Audio processing | WAV to PCM conversion |
| External APIs | Alternative LLMs | Llama, DeepSeek, Ministral, Phi-3, Gemini, Claude, Grok |
Statistics¶
- Total Lines: 2,534 (LARGEST SERVICE)
- Total Functions: 81
- Endpoints: Multiple (main + lip-sync + streaming)
- LLM Models: 11 (more than any other service)
- Voices: 10 Azure Neural voices
- Collections: 6 MongoDB collections
- System Prompts: 3 (Sales, Service, Custom)
- Average Response Time: 4-8 seconds (with lip-sync)
Complete Architecture¶
End-to-End Data Flow with Lip-Sync¶
graph TB
USER["User Question<br/>(3D Avatar Interface)"]
subgraph "Step 1: RAG Retrieval"
EMBED["Generate Embedding<br/>(BAAI/bge-small-en-v1.5)"]
UTM["Match UTM Config<br/>(Scoring Algorithm)"]
MILVUS["Milvus Search<br/>(Top-5 Chunks)"]
UTM_CHUNKS["Retrieve UTM Chunks"]
end
subgraph "Step 2: Guardrails Check"
GUARDRAILS["Check Guardrails<br/>(chatbot_guardrails)"]
VALIDATE["Validate Question<br/>(Safety Rules)"]
end
subgraph "Step 3: History & Config"
MONGO["MongoDB Lookup<br/>(chatbot_history)"]
CONFIG["Get Chatbot Config<br/>(purpose + voice + avatar)"]
SYSPROMPT["Build System Prompt<br/>(with chatbot name)"]
end
subgraph "Step 4: LLM Generation"
MESSAGES["Construct Messages<br/>(system + history + context)"]
TOKENCHECK["Token Limit Check<br/>(Max 16K)"]
LLM["Multi-LLM Selection<br/>(11 models available)"]
STREAM["Streaming Response<br/>(Optional)"]
CLEAN["Remove Markdown<br/>(*, #)"]
end
subgraph "Step 5: Lip-Sync Pipeline"
SCRUB["Scrub Contacts<br/>(Phone/URL removal)"]
AZURETTS["Azure TTS<br/>(Neural Voice)"]
WAV["Generate WAV File"]
FFMPEG["FFmpeg Convert<br/>(PCM Format)"]
RHUBARB["Rhubarb Lip-Sync<br/>(Phonetic Analysis)"]
LIPSYNC["Lip-Sync JSON<br/>(Animation Data)"]
end
subgraph "Step 6: Post-Processing"
B64AUDIO["Encode Audio<br/>(Base64)"]
PERF["Log Performance<br/>(performance_logs)"]
SAVE["Save to History<br/>(chatbot_history)"]
CLEANUP["Delete Temp Files<br/>(WAV, PCM, JSON)"]
end
subgraph "Step 7: Return"
RESP["Return Response<br/>(text + audio + lipsync)"]
end
USER --> EMBED
USER --> UTM
EMBED --> MILVUS
UTM --> UTM_CHUNKS
MILVUS --> GUARDRAILS
UTM_CHUNKS --> VALIDATE
VALIDATE --> MONGO
VALIDATE --> CONFIG
CONFIG --> SYSPROMPT
MONGO --> MESSAGES
SYSPROMPT --> MESSAGES
MESSAGES --> TOKENCHECK
TOKENCHECK --> LLM
LLM --> STREAM
STREAM --> CLEAN
CLEAN --> SCRUB
SCRUB --> AZURETTS
AZURETTS --> WAV
WAV --> FFMPEG
FFMPEG --> RHUBARB
RHUBARB --> LIPSYNC
LIPSYNC --> B64AUDIO
B64AUDIO --> PERF
PERF --> SAVE
SAVE --> CLEANUP
CLEANUP --> RESP
RESP --> USER
style USER fill:#e1f5fe
style LLM fill:#fff3e0
style AZURETTS fill:#ffecb3
style RHUBARB fill:#f3e5f5
style GUARDRAILS fill:#ffcdd2
style PERF fill:#c5e1a5
style SAVE fill:#c8e6c9
Lip-Sync Generation System¶
What is Rhubarb Lip-Sync?¶
Rhubarb is a command-line tool for automatic lip-sync animation. It analyzes audio files and generates phonetic mouth shapes that can be used to animate 3D avatars in real-time.
Official: https://github.com/DanielSWolf/rhubarb-lip-sync
Supported Platforms¶
Executable Detection (Lines 117-133)
system = platform.system().lower()
current_dir = os.path.dirname(os.path.abspath(__file__))
if system == "windows":
rhubarbExePath = os.path.join(current_dir, "Rhubarb-Lip-Sync-1.13.0-Windows", "rhubarb.exe")
elif system == "darwin": # macOS
rhubarbExePath = os.path.join(current_dir, "Rhubarb-Lip-Sync-1.13.0-macOS", "rhubarb")
elif system == "linux":
rhubarbExePath = os.path.join(current_dir, "Rhubarb", "rhubarb")
else:
raise RuntimeError(f"Unsupported platform: {system}")
# Verify executable exists
if not os.path.exists(rhubarbExePath):
raise RuntimeError(f"Rhubarb executable not found at: {rhubarbExePath}")
Platforms Supported:
- ✅ Windows (rhubarb.exe)
- ✅ macOS (rhubarb)
- ✅ Linux (rhubarb)
Complete Lip-Sync Pipeline¶
1. Text-to-Speech (Lines 139-185)
async def text_to_speech_azure(text, voice, session_id):
# Clean text (remove contact info)
cleaned_text = remove_contact_numbers(text)
# Configure Azure TTS
speech_config = speechsdk.SpeechConfig(
subscription="9N41NOfD...", # ⚠️ Hardcoded!
region="centralindia"
)
speech_config.speech_synthesis_voice_name = voice
# Generate WAV file
wav_file = os.path.join(OUTPUT_DIR, f"{session_id}.wav")
audio_config = speechsdk.audio.AudioOutputConfig(filename=wav_file)
speech_synthesizer = speechsdk.SpeechSynthesizer(
speech_config=speech_config,
audio_config=audio_config
)
result = speech_synthesizer.speak_text_async(cleaned_text).get()
if result.reason == speechsdk.ResultReason.SynthesizingAudioCompleted:
return wav_file
else:
raise Exception(f"TTS failed: {result.cancellation_details.reason}")
2. WAV to PCM Conversion (Lines 189-200)
def convert_wav_to_pcm(input_wav, output_wav):
"""Convert WAV to PCM format using FFmpeg (required by Rhubarb)"""
try:
subprocess.run(
["ffmpeg", "-i", input_wav, "-acodec", "pcm_s16le", output_wav],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=True
)
return output_wav if os.path.exists(output_wav) else None
except subprocess.CalledProcessError as e:
logger.error(f"FFmpeg conversion failed: {e.stderr.decode()}")
return None
Why PCM? Rhubarb requires PCM-encoded WAV files for accurate phonetic analysis.
3. Rhubarb Lip-Sync Generation (Lines 203-216)
def generate_lip_sync(wav_file, user_id):
"""Generate lip-sync JSON using Rhubarb executable"""
json_file = os.path.join(OUTPUT_DIR, f"{user_id}.json")
try:
result = subprocess.run(
[rhubarbExePath, "-f", "json", "-o", json_file, wav_file, "-r", "phonetic"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
return json_file if os.path.exists(json_file) else None
except Exception as e:
logger.error(f"Rhubarb lip-sync failed: {e}")
return None
Rhubarb Parameters:
- `-f json` - Output format (JSON)
- `-o {json_file}` - Output file path
- `{wav_file}` - Input audio file
- `-r phonetic` - Recognition mode (phonetic analysis)
4. Parse Lip-Sync JSON (Lines 219-224)
def parse_lip_sync(json_file, sound_file):
"""Parse Rhubarb JSON and add sound file reference"""
with open(json_file, "r") as file:
lip_sync_data = json.load(file)
# Add sound file path to metadata
lip_sync_data["metadata"]["soundFile"] = sound_file
return lip_sync_data
Lip-Sync Output Format¶
Example Rhubarb JSON:
{
"metadata": {
"soundFile": "tts_audio/session_123.wav",
"duration": 2.5
},
"mouthCues": [
{ "start": 0.0, "end": 0.1, "value": "X" },
{ "start": 0.1, "end": 0.25, "value": "A" },
{ "start": 0.25, "end": 0.4, "value": "B" },
{ "start": 0.4, "end": 0.55, "value": "C" },
{ "start": 0.55, "end": 0.7, "value": "D" },
{ "start": 0.7, "end": 0.85, "value": "E" },
{ "start": 0.85, "end": 1.0, "value": "F" },
{ "start": 1.0, "end": 1.2, "value": "G" },
{ "start": 1.2, "end": 1.5, "value": "H" },
{ "start": 1.5, "end": 2.5, "value": "X" }
]
}
Mouth Shapes (Phonemes):
| Value | Phoneme | Description | Example Sound |
|---|---|---|---|
| X | Rest | Mouth closed | Silence |
| A | /æ/ | Mouth open (tall) | "cat" |
| B | /b/, /p/, /m/ | Lips together | "bat", "pat" |
| C | /ʃ/, /dʒ/ | Lips forward | "ship", "joy" |
| D | /θ/, /ð/ | Tongue out | "this" |
| E | /ɛ/, /ʌ/ | Mouth open (wide) | "bed", "cup" |
| F | /f/, /v/ | Lower lip to teeth | "fan", "van" |
| G | /k/, /g/, /ŋ/ | Tongue back | "go", "sing" |
| H | /i/ | Mouth wide (smile) | "see" |
3D Avatar Animation:
- Frontend receives lip-sync JSON
- Plays audio file
- Syncs avatar mouth shapes to timestamps
- Creates realistic talking animation
Lip-Sync Endpoint¶
GET /lip-sync (Lines 226-291)
Purpose: Generate lip-sync animation data for avatar greeting/testing
Request:
GET /lip-sync?message=Hello!%20How%20can%20I%20help%20you?&user_id=User-123456&voice_selection=Female_2&project_id=User-123456_Project_1
Response:
{
"lipsync": {
"metadata": {
"soundFile": "tts_audio/User-123456.wav",
"duration": 1.8
},
"mouthCues": [
{"start": 0.00, "end": 0.10, "value": "X"},
{"start": 0.10, "end": 0.25, "value": "H"},
...
]
},
"audio": "UklGRiQAAABXQVZFZm10IBAAAAABAAEAQB8AAEAfAAABAAgAZGF0YQAAAAA..."
}
Complete Flow:
- Validate voice selection
- Generate TTS with Azure (WAV file)
- Convert WAV to PCM (FFmpeg)
- Generate lip-sync (Rhubarb)
- Parse lip-sync JSON
- Encode audio as base64
- Clean up temporary files
- Return JSON + base64 audio
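The flow above is glue around the helper functions documented on this page. A minimal sketch of that glue, assuming `text_to_speech_azure`, `convert_wav_to_pcm`, `generate_lip_sync`, `parse_lip_sync`, and the `OUTPUT_DIR` constant are available as described (this is an illustration, not the exact endpoint code):

```python
import base64
import os

async def build_lip_sync_payload(message: str, user_id: str, voice: str) -> dict:
    """Glue the documented helpers into the /lip-sync response payload."""
    wav_file = await text_to_speech_azure(message, voice, user_id)            # TTS to WAV
    pcm_file = convert_wav_to_pcm(wav_file, os.path.join(OUTPUT_DIR, f"{user_id}_pcm.wav"))  # FFmpeg
    json_file = generate_lip_sync(pcm_file, user_id)                          # Rhubarb analysis
    lipsync = parse_lip_sync(json_file, wav_file)                             # Add soundFile reference

    with open(wav_file, "rb") as f:                                           # Base64-encode audio
        audio_b64 = base64.b64encode(f.read()).decode("utf-8")

    for path in (wav_file, pcm_file, json_file):                              # Delete temp files
        if path and os.path.exists(path):
            os.remove(path)

    return {"lipsync": lipsync, "audio": audio_b64}
```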
Multi-LLM Support (11 Models)¶
The service supports 11 different LLM models - more than any other service in the platform!
Model Overview¶
| # | Model | Provider | Use Case | Status |
|---|---|---|---|---|
| 1 | GPT-4 | Azure OpenAI | Complex reasoning | Available |
| 2 | GPT-3.5-Turbo-16k | Azure OpenAI | Default ⭐ | Active |
| 3 | GPT-4o-mini | Azure OpenAI | Fast, cost-effective | Available |
| 4 | o1-mini | Azure OpenAI | Reasoning tasks | Available |
| 5 | Llama 3.3-70B | Azure Models | Open-source alternative | Available |
| 6 | DeepSeek R1 | Azure Models | Reasoning | Available |
| 7 | Ministral-3B | Azure Models | Lightweight | Available |
| 8 | Phi-3 | Azure Models | Efficient | Available |
| 9 | Gemini | Google | Multimodal | Configured |
| 10 | Claude | Anthropic | Safety-focused | Configured |
| 11 | Grok | xAI | General purpose | Configured |
Note: Models 1-8 are fully implemented. Models 9-11 (Gemini, Claude, Grok) have placeholder functions but may need API key configuration.
Azure OpenAI Models¶
All use the same hardcoded API key:
subscription_key = "AZxDVMYB08AaUip0i5ed1sy73ZpUsqencYYxKDbm6nfWfG1AqPZ3JQQJ99BCACYeBjFXJ3w3AAABACOGVUo7" # ⚠️
1. GPT-4 (Lines 708-736)
endpoint_gpt4 = "https://machineagentopenai.openai.azure.com/openai/deployments/gpt-4-0613/..."
deployment_gpt4 = "gpt-4-0613"
client = AzureOpenAI(
azure_endpoint=endpoint_gpt4,
api_key=subscription_key,
api_version="2024-02-15-preview"
)
def call_openai_4(messages):
response = client.chat.completions.create(
model="deployment_gpt4",
messages=messages,
temperature=0.7
)
return response.choices[0].message.content
2. GPT-3.5-Turbo-16k ⭐ DEFAULT (Lines 737-760)
endpoint_gpt35 = "https://machineagentopenai.openai.azure.com/openai/deployments/gpt-35-turbo-16k-0613/..."
deployment_gpt35 = "gpt-35-turbo-16k-0613"
3. GPT-4o-mini (Lines 761-784)
4. o1-mini (Lines 785-808)
Alternative Models¶
5. Llama 3.3-70B (Lines 809-829)
AZURE_LLAMA_ENDPOINT = "https://Llama-3-3-70B-Instruct-ulmca.eastus.models.ai.azure.com/chat/completions"
AZURE_API_KEY = "JOfcw0VW0dS31Z8XgkNRSP9tUaBiwUYZ" # ⚠️ Hardcoded!
def call_llama(messages):
payload = {"messages": messages, "temperature": 0.7, "max_tokens": 50}
response = requests.post(AZURE_LLAMA_ENDPOINT, json=payload, headers=llama_HEADERS)
return response.json()["choices"][0]["message"]["content"]
6. DeepSeek R1 (Lines 830-853)
deepseek_api_url = "https://DeepSeek-R1-imalr.eastus2.models.ai.azure.com/chat/completions"
deepseek_api_key = "GwUcGzHhhUbvApfMR4aq1ZPFUic6lbWE" # ⚠️ Hardcoded!
def call_deepseek(messages):
response = requests.post(deepseek_api_url, headers=deepseekheaders, json=data)
answer = response.json()["choices"][0]["message"]["content"]
# Remove <think> tags
return re.sub(r"<think>.*?</think>", "", answer, flags=re.DOTALL).strip()
7. Ministral-3B (Lines 854-875)
8. Phi-3 (Lines 876-913)
9-11. Gemini, Claude, Grok (Lines 914-960)
- Functions defined
- API keys configured as env vars
- May need additional setup
Guardrails System (NEW!)¶
Collection: chatbot_guardrails
Purpose: Content safety, validation rules, and response filtering
Usage: Check user questions against configured guardrails before processing
Document Structure:
{
"user_id": "User-123456",
"project_id": "User-123456_Project_1",
"rules": [
{
"type": "blocked_keywords",
"keywords": ["profanity", "inappropriate"],
"action": "reject"
},
{
"type": "topic_restriction",
"allowed_topics": ["products", "services", "support"],
"action": "redirect"
}
],
"enabled": true
}
Integration Point: Before LLM generation, validate question against guardrails
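A hedged sketch of such a pre-generation check, using the field names from the example document above (`rules`, `type`, `keywords`, `action`); the actual validation logic in `main.py` is not shown here and may differ:

```python
def violates_guardrails(question: str, guardrail_doc: dict) -> bool:
    """Return True if the question trips any enabled blocked-keyword rule."""
    if not guardrail_doc or not guardrail_doc.get("enabled", False):
        return False
    lowered = question.lower()
    for rule in guardrail_doc.get("rules", []):
        if rule.get("type") == "blocked_keywords" and rule.get("action") == "reject":
            if any(keyword.lower() in lowered for keyword in rule.get("keywords", [])):
                return True
    return False
```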
Performance Logging (NEW!)¶
Collection: performance_logs
Purpose: Track response times, token usage, and service health
Document Structure:
{
"user_id": "User-123456",
"project_id": "User-123456_Project_1",
"session_id": "session_123",
"timestamp": ISODate("2025-01-15T14:00:00Z"),
"metrics": {
"total_time_ms": 4500,
"milvus_time_ms": 120,
"llm_time_ms": 2500,
"tts_time_ms": 1200,
"lipsync_time_ms": 680
},
"tokens": {
"input": 150,
"output": 80,
"total": 230
},
"model_used": "gpt-35-turbo-16k-0613"
}
Benefits:
- Monitor service performance
- Identify bottlenecks
- Track cost (token usage)
- Optimize response times
Lead Collection (NEW!)¶
Collection: LEAD_COLLECTION
Purpose: Store form configuration and lead submissions from chatbot interactions
Use Case: When chatbot prompts user to fill form (e.g., "Could you please provide some details by filling the form?")
Document Structure:
{
"user_id": "User-123456",
"project_id": "User-123456_Project_1",
"form_config": {
"fields": ["name", "email", "phone", "company"],
"trigger_message": "Could you please provide some details by filling the form?",
"show_after_messages": 4
},
"leads": [
{
"name": "John Doe",
"email": "john@example.com",
"phone": "+1-555-123-4567",
"company": "Acme Corp",
"submitted_at": ISODate("2025-01-15T14:05:00Z"),
"session_id": "session_123"
}
]
}
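For illustration only, a sketch of appending a submitted lead to this document with a MongoDB `$push`; the collection handle and exact update shape are assumptions based on the structure above:

```python
from datetime import datetime, timezone

def save_lead_submission(lead_collection, user_id: str, project_id: str,
                         session_id: str, form_data: dict) -> None:
    """Append one submitted lead to the project's lead document."""
    lead = {**form_data,
            "submitted_at": datetime.now(timezone.utc),
            "session_id": session_id}
    lead_collection.update_one(
        {"user_id": user_id, "project_id": project_id},
        {"$push": {"leads": lead}},
        upsert=True,
    )
```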
Streaming Responses¶
Library: AsyncAzureOpenAI (Line 29)
Purpose: Stream LLM responses token-by-token for better UX
Benefits:
- Lower perceived latency
- Real-time feedback
- Better user experience
Implementation:
from openai import AsyncAzureOpenAI
async_client = AsyncAzureOpenAI(
azure_endpoint=endpoint_gpt35,
api_key=subscription_key,
api_version="2024-02-15-preview"
)
async def stream_response(messages):
stream = await async_client.chat.completions.create(
model="gpt-35-turbo-16k-0613",
messages=messages,
temperature=0.7,
stream=True
)
async for chunk in stream:
if chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content
Streaming Endpoint:
@app.post("/v2/get-response-stream")
async def get_response_stream(...):
return StreamingResponse(
stream_response(messages),
media_type="text/event-stream"
)
Complete Endpoints¶
Main Endpoints Summary¶
- `GET /lip-sync` - Generate lip-sync for avatar
- `POST /v2/get-response-3d-chatbot` - Main RAG + TTS + Lip-Sync
- `POST /v2/get-response-stream` - Streaming variant (if implemented)
Note: Due to the massive size (2,534 lines), there may be additional endpoints not fully documented here.
UTM Targeting System (Advanced)¶
Purpose: Personalize chatbot responses based on traffic source (campaign, medium, source)
Complete Flow:
- Extract UTM parameters from `originating_url`
- Match against configured UTM configs (scoring algorithm)
- Retrieve UTM-specific content chunks from Milvus
- Inject UTM instructions into system prompt
- Prioritize UTM content in context building
UTM Matching Algorithm¶
Function: calculate_match_score() (Lines 448-516)
Scoring System:
- Target URL match (originating URL starts with target URL): +10 points
- Each matching UTM parameter: +2 points each
- Target URL only (no UTM params): +5 points (lower priority)
Example:
# Config A: target_url only
{
"target_url": "https://example.com/products",
"utm_config": {}
}
# Score for "https://example.com/products?page=1": 5 points
# Config B: target_url + UTM source
{
"target_url": "https://example.com/products",
"utm_config": {"source": "google"}
}
# Score for "https://example.com/products?utm_source=google": 12 points (10 + 2)
# Config C: multiple UTM params
{
"target_url": "https://example.com/products",
"utm_config": {"source": "google", "medium": "cpc", "campaign": "summer"}
}
# Score for "https://example.com/products?utm_source=google&utm_medium=cpc&utm_campaign=summer": 16 points (10 + 2 + 2 + 2)
Winner Selection: Highest score wins. If tie, first match wins.
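A minimal sketch that reproduces the scoring rules above (not the exact `calculate_match_score()` implementation); `utm_params` is assumed to be the UTM key/value pairs already parsed from `originating_url`:

```python
def calculate_match_score(originating_url: str, utm_params: dict, config: dict) -> int:
    """Score one UTM config against the request, per the rules above."""
    score = 0
    target_url = config.get("target_url", "")
    utm_config = config.get("utm_config", {})

    if target_url and originating_url.startswith(target_url):
        # URL-only configs score lower than configs that also declare UTM params
        score += 10 if utm_config else 5

    for key, expected in utm_config.items():
        if utm_params.get(key) == expected:
            score += 2  # +2 per matching UTM parameter

    return score
```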
UTM Content Retrieval¶
Function: retrieve_utm_content() (Lines 583-646)
Flow:
- Get UTM config document from `files` collection
- Extract `milvus_embedding_ids` (specific chunk IDs for this UTM config)
- Generate question embedding
- Search Milvus ONLY within those embedding IDs (filtered search)
- Return top-5 most relevant chunks
Why filtered search? UTM configs contain campaign-specific content. We only want to search within that campaign's content, not all project data.
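A hedged sketch of such a filtered search with pymilvus; the field names (`embedding`, `id`, `text`), metric type, and index params are assumptions, not the service's actual schema:

```python
def search_utm_chunks(collection, question_embedding, embedding_ids, top_k=5):
    """Search Milvus only within the UTM config's own chunk IDs."""
    id_list = ", ".join(f'"{eid}"' for eid in embedding_ids)
    results = collection.search(
        data=[question_embedding],
        anns_field="embedding",                       # assumed vector field name
        param={"metric_type": "COSINE", "params": {"nprobe": 10}},
        limit=top_k,
        expr=f"id in [{id_list}]",                    # restrict to this campaign's chunks
        output_fields=["text"],
    )
    return [hit.entity.get("text") for hit in results[0]]
```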
UTM Instructions Injection¶
Function: process_utm_completely() (Lines 1218-1253)
System Prompt Enhancement:
Example UTM Instructions:
[UTM-Specific Instructions]:
- This user came from Google Ads "Summer Sale" campaign
- Emphasize 20% discount on all products
- Mention free shipping on orders over $50
- Link to https://example.com/summer-sale landing page
WHY UTM Targeting? Enables dynamic content and prompt adjustment based on marketing campaign source, improving conversion rates.
Query Enhancement with Chat History¶
Function: summarize_question_with_context() (Lines 1180-1216)
Purpose: Convert vague follow-up questions into clear, standalone queries using context from last 4 chat exchanges
When Used: Before RAG retrieval to improve search quality
Implementation:
def summarize_question_with_context(question: str, chat_history: List[Dict[str, str]], llm):
# Extract last 4 user/assistant pairs
recent_exchanges = []
user_msgs = [msg for msg in chat_history if msg["role"] == "user"]
assistant_msgs = [msg for msg in chat_history if msg["role"] == "assistant"]
# Pair the last 4 exchanges
for i in range(min(4, len(user_msgs), len(assistant_msgs))):
u = user_msgs[-(i + 1)]["content"]
a = assistant_msgs[-(i + 1)]["content"]
recent_exchanges.insert(0, f"User: {u}\nAssistant: {a}")
history_context = "\n\n".join(recent_exchanges)
prompt = f"""
You are an AI assistant helping to clarify user queries using recent chat context.
Here is the recent conversation:
{history_context}
And here is the current user message:
"{question}"
Based on the above, rewrite the current user message as a standalone, clear, and context-rich question.
Only return the rewritten question.
""".strip()
response = llm.invoke([HumanMessage(content=prompt)])
enhanced_question = response.content.strip()
return enhanced_question
Example:
Chat History:
User: "Do you offer cloud hosting?"
Assistant: "Yes, we offer cloud hosting with 99.9% uptime."
User: "What are the pricing options?"
Assistant: "We have Basic ($10/mo), Pro ($50/mo), and Enterprise (custom)."
Current Question: "What about the basic one?"
Enhanced Question (LLM output): "What features and specifications are included in the Basic cloud hosting plan at $10/month?"
Why Enhancement?
- Vague questions like "What about it?" or "Tell me more" don't retrieve relevant RAG content
- With context, search finds: pricing details, plan features, comparisons
- Improves RAG precision by ~40% for follow-up questions
Fallback: If enhancement fails, uses original question
Default Guardrail System¶
Constants: (Lines 1091-1093)
DEFAULT_GUARDRAIL_USER_ID = "_system_default_user_"
DEFAULT_GUARDRAIL_PROJECT_ID = "_system_default_project_"
Purpose: System-level fallback guardrails when user hasn't configured any
Usage in getGuardrails():
def getGuardrails(user_id: str, project_id: str):
# Try user-specific guardrails first
guardrails = guardrails_collection.find_one({
"user_id": user_id,
"project_id": project_id
})
if not guardrails:
# Fallback to system default
guardrails = guardrails_collection.find_one({
"user_id": DEFAULT_GUARDRAIL_USER_ID,
"project_id": DEFAULT_GUARDRAIL_PROJECT_ID
})
# Double fallback to empty guardrails
if not guardrails:
return {
"document": {
"categories": "No specific guardrails configured."
}
}
return {"document": guardrails}
3-Level Hierarchy:
- User-specific guardrails → Custom per chatbot
- System default guardrails → Platform-wide rules
- Empty guardrails → No restrictions
Why? Ensures new users have baseline content safety without manual configuration.
Concurrency Configuration¶
Constant: `CONCURRENT_TASKS` (Line 1736)
Purpose: Dynamic worker pool size for parallel TTS/lip-sync generation in streaming endpoints
Logic:
- Uses `os.cpu_count()` to detect available CPU cores
- Falls back to `4` if detection fails
- Applied to `asyncio.Semaphore(CONCURRENT_TASKS)` in V6, V7, V8
Performance Impact:
| CPU Cores | CONCURRENT_TASKS | TTS Tasks Processed Simultaneously |
|---|---|---|
| 2 cores | 2 | 2 chunks at a time |
| 4 cores | 4 | 4 chunks at a time |
| 8 cores | 8 | 8 chunks at a time |
| 16 cores | 16 | 16 chunks at a time |
Example (8-core server):
LLM generates 20 text chunks
With CONCURRENT_TASKS=8:
- Processes chunks 0-7 in parallel
- Then chunks 8-15 in parallel
- Then chunks 16-19 in parallel
- Total: 3 batches instead of 20 sequential
Why CPU-based? TTS/lip-sync are CPU-intensive (audio encoding, FFmpeg conversion, Rhubarb phonetic analysis). More cores = more parallel processing.
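A self-contained sketch of this semaphore-bounded pattern, assuming the constant is defined as the bullets above describe (`os.cpu_count()` with a fallback of 4); `process_chunk` is a placeholder for the real per-chunk TTS + lip-sync work:

```python
import asyncio
import os

# Assumed definition, based on the description of Line 1736 above
CONCURRENT_TASKS = os.cpu_count() or 4
semaphore = asyncio.Semaphore(CONCURRENT_TASKS)

async def process_chunk(chunk_index: int, text_chunk: str) -> dict:
    # Placeholder for the real per-chunk TTS + lip-sync work
    await asyncio.sleep(0)
    return {"chunkId": chunk_index, "text": text_chunk}

async def process_chunk_bounded(chunk_index: int, text_chunk: str) -> dict:
    async with semaphore:  # at most CONCURRENT_TASKS chunks run at once
        return await process_chunk(chunk_index, text_chunk)

async def process_all_chunks(chunks: list) -> list:
    tasks = [process_chunk_bounded(i, c) for i, c in enumerate(chunks)]
    return await asyncio.gather(*tasks)
```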
Complete Performance Logging Implementation (V8 Only)¶
Collection: performance_logs
Function: save_performance_log() (Lines 2258-2285)
Purpose: Track response times, token usage, and service health for analytics
When Triggered: ONLY in V8 streaming endpoint
Complete Schema:¶
{
"user_id": "User-123456",
"project_id": "User-123456_Project_1",
"session_id": "session-abc123",
"question": "What are your pricing options?",
"full_answer": "We offer three plans: Basic at $10/mo, Pro at $50/mo, and Enterprise with custom pricing...",
"originating_url": "https://example.com/pricing?utm_source=google",
// UTC Timestamps (ISO format)
"timestamps_utc": {
"request_arrival": "2025-12-26T13:10:25.123456+00:00",
"rag_start": "2025-12-26T13:10:25.150000+00:00",
"rag_end": "2025-12-26T13:10:25.890000+00:00",
"llm_hit": "2025-12-26T13:10:25.900000+00:00",
"first_text_yield": "2025-12-26T13:10:26.450000+00:00",
"first_audio_yield": "2025-12-26T13:10:27.200000+00:00",
"stream_end": "2025-12-26T13:10:32.100000+00:00"
},
// IST Timestamps (human-readable, auto-converted)
"timestamps_ist": {
"request_arrival": "2025-12-26 18:40:25.123 IST",
"rag_start": "2025-12-26 18:40:25.150 IST",
"rag_end": "2025-12-26 18:40:25.890 IST",
"llm_hit": "2025-12-26 18:40:25.900 IST",
"first_text_yield": "2025-12-26 18:40:26.450 IST",
"first_audio_yield": "2025-12-26 18:40:27.200 IST",
"stream_end": "2025-12-26 18:40:32.100 IST"
},
// Durations in SECONDS (Lines 2465-2478)
"durations_sec": {
"total_request_time": 6.977, // request_arrival → stream_end
"rag_processing": 0.740, // rag_start → rag_end
"llm_time_to_first_token": 0.550, // llm_hit → first_text_yield
"time_to_first_text": 1.327, // request_arrival → first_text_yield
"time_to_first_audio": 2.077 // request_arrival → first_audio_yield
},
// Token Usage (Lines 2461-2463)
"tokens": {
"question": 5, // Token count of question
"answer": 45, // Token count of full answer
"total": 50 // question + answer
}
}
IST Conversion Logic:¶
import pytz
from datetime import datetime
async def save_performance_log(log_data: dict):
# Define Indian Standard Timezone
indian_tz = pytz.timezone("Asia/Kolkata")
timestamps_ist = {}
utc_timestamps = log_data.get("timestamps_utc", {})
for key, value in utc_timestamps.items():
if isinstance(value, datetime):
# Convert UTC → IST
ist_time = value.astimezone(indian_tz)
timestamps_ist[key] = ist_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] + ' IST'
# Convert UTC datetime → ISO string
log_data["timestamps_utc"][key] = value.isoformat()
log_data["timestamps_ist"] = timestamps_ist
# Save to MongoDB
await asyncio.to_thread(performance_logs_collection.insert_one, log_data)
Duration Calculation (Seconds):¶
# Lines 2465-2478
ts = performance_data["timestamps_utc"]
durations = performance_data["durations_sec"]
if ts.get("stream_end") and ts.get("request_arrival"):
durations["total_request_time"] = (ts["stream_end"] - ts["request_arrival"]).total_seconds()
if ts.get("first_text_yield") and ts.get("request_arrival"):
durations["time_to_first_text"] = (ts["first_text_yield"] - ts["request_arrival"]).total_seconds()
if ts.get("first_audio_yield") and ts.get("request_arrival"):
durations["time_to_first_audio"] = (ts["first_audio_yield"] - ts["request_arrival"]).total_seconds()
if ts.get("rag_end") and ts.get("rag_start"):
durations["rag_processing"] = (ts["rag_end"] - ts["rag_start"]).total_seconds()
if ts.get("first_text_yield") and ts.get("llm_hit"):
durations["llm_time_to_first_token"] = (ts["first_text_yield"] - ts["llm_hit"]).total_seconds()
Token Counting:¶
# Assumed implementation (not shown in file, but referenced in code)
def count_tokens(text: str) -> int:
# Simple word-based approximation (actual may use tiktoken)
return len(text.split())
performance_data["tokens"]["question"] = count_tokens(question)
performance_data["tokens"]["answer"] = count_tokens(full_answer)
performance_data["tokens"]["total"] = performance_data["tokens"]["question"] + performance_data["tokens"]["answer"]
Analytics Use Cases:¶
1. Optimization:
- Identify slow RAG queries (> 1 second)
- Track LLM response times across models
- Measure TTS generation bottlenecks
2. Cost Analysis:
- Token usage per user/project
- Calculate API costs (Azure OpenAI charges per token)
3. Performance Monitoring:
- Average time to first text
- Average time to first audio
- Total request satisfaction time
4. User Experience:
- If `time_to_first_text` > 2 seconds → users perceive the response as slow
- V8 aims for sub-1.5s first text yield
Why V8 ONLY? V2 (sync) and V6/V7 (streaming without split) don't need granular performance tracking. V8's split format (text → audio) requires precise timing to optimize user experience.
Complete Endpoint Breakdown¶
The service has 4+ MAIN ENDPOINTS plus helper endpoints:
1. GET /lip-sync (Lines 226-291)¶
Purpose: Generate lip-sync for avatar greeting or testing
Parameters:
- `message` - Text to synthesize
- `user_id` - User ID
- `voice_selection` - Voice code
- `project_id` - Optional, for consistency check
Response:
2. POST /v2/get-response-3d-chatbot (Main)¶
Purpose: Standard RAG + TTS + Lip-Sync response
Function: sync_ask_gpt1() (Lines 1255-1530+)
Features:
- ✅ RAG retrieval from Milvus
- ✅ UTM targeting
- ✅ Guardrails validation
- ✅ Chat history (token-limited)
- ✅ ThreadPoolExecutor parallelization (6 workers!)
- ✅ Form trigger logic
- ✅ TTS generation
- ✅ Lip-sync generation
- ✅ Base64 encoding
Response:
3. POST /v6/get-streaming-audiovisual-response-hybrid (Lines 2038-2067)¶
Version: V6 Hybrid
Strategy: Parallel processing + Sequential delivery
How it works:
- Stream LLM response (full text first)
- Split into chunks by sentence delimiters
- Process ALL chunks in parallel (TTS + lip-sync)
- Store results in buffer
- Yield chunks in sequential order with status annotations
Status Annotations:
- `start` - First chunk
- `processing` - Middle chunks
- `end` - Last chunk
Why? Frontend can start playing audio as soon as first chunk completes, but chunks arrive in correct order.
4. POST /v7/get-streaming-audiovisual-response-live (Lines 2224-2254)¶
Version: V7 Live
Strategy: True concurrent streaming
How it works:
- Producer task: Streams from LLM, chunks text, queues chunks
- Consumer loop:
- Waits for EITHER new text chunk OR completed audio task
- Starts audio processing immediately for new chunks
- Yields completed chunks in sequential order
Advantage: Lower latency - starts processing audio BEFORE LLM finishes
Producer-Consumer Pattern:
async def _llm_stream_producer(stream, queue, delimiters):
# Reads LLM stream, chunks, puts in queue
async for chunk in stream:
# ... chunk text ...
await queue.put((chunk_index, text_chunk))
await queue.put(None) # Signal end
# Consumer
while not producer_finished or pending_audio_tasks:
done, pending = await asyncio.wait(tasks, return_when=FIRST_COMPLETED)
# ... process ...
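An expanded, self-contained sketch of this producer-consumer pattern. It is simplified (the real consumer in `main.py` uses `asyncio.wait(..., return_when=FIRST_COMPLETED)`), and `synthesize_chunk` is a placeholder for the per-chunk TTS + lip-sync work:

```python
import asyncio

async def synthesize_chunk(index: int, text: str) -> dict:
    # Placeholder for the real TTS + lip-sync work on one text chunk
    await asyncio.sleep(0)
    return {"chunkId": index, "text": text}

async def llm_stream_producer(stream, queue: asyncio.Queue, delimiters=(".", "!", "?")):
    """Read the LLM token stream, cut it into sentence chunks, enqueue them."""
    buffer, index = "", 0
    async for token in stream:
        buffer += token
        if buffer.endswith(delimiters):
            await queue.put((index, buffer.strip()))
            buffer, index = "", index + 1
    if buffer.strip():
        await queue.put((index, buffer.strip()))
    await queue.put(None)  # end-of-stream sentinel

async def stream_audiovisual(stream):
    """Start audio work as soon as each chunk arrives; yield results in order."""
    queue: asyncio.Queue = asyncio.Queue()
    producer = asyncio.create_task(llm_stream_producer(stream, queue))
    pending, next_to_yield = {}, 0
    while True:
        item = await queue.get()
        if item is None:
            break
        index, text = item
        pending[index] = asyncio.create_task(synthesize_chunk(index, text))
        # Emit whatever has already finished, preserving sequential order
        while next_to_yield in pending and pending[next_to_yield].done():
            yield pending.pop(next_to_yield).result()
            next_to_yield += 1
    # Producer done: drain the remaining audio tasks in order
    while next_to_yield in pending:
        yield await pending.pop(next_to_yield)
        next_to_yield += 1
    await producer
```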
5. POST /v8/get-streaming-audiovisual-response-split (Lines 2495-2529)¶
Version: V8 Split
Strategy: Split text and audiovisual yields + Performance logging
Unique Features:
- ✅ Yields text immediately (type: "text")
- ✅ Yields audiovisual later (type: "audiovisual")
- ✅ Performance logging to `performance_logs` collection
- ✅ IST timestamp conversion
- ✅ Duration tracking (seconds, not milliseconds)
Response Format:
// Text chunk (yielded immediately)
{"type": "text", "chunkId": 0, "text": "Hello! How can I help you?"}
// Audiovisual chunk (yielded later)
{
"type": "audiovisual",
"chunkId": 0,
"audio": "base64...",
"lipsync": {...},
"facialExpression": "smile",
"animation": "Idle"
}
// Completion
{"type": "control", "status": "end"}
Performance Metrics Tracked:
- `time_to_first_text` - Time to first token
- `time_to_first_audio` - Time to first audio chunk
- `rag_processing` - RAG retrieval time
- `llm_time_to_first_token` - LLM latency
- `total_request_time` - End-to-end time
Streaming Architecture Comparison¶
| Feature | V6 Hybrid | V7 Live | V8 Split |
|---|---|---|---|
| LLM Streaming | Full text first | Token streaming | Token streaming |
| Processing | Parallel (all chunks) | Concurrent (as chunks arrive) | Concurrent |
| Delivery | Sequential with status | Sequential | Split (text → audiovisual) |
| Latency | Medium | Low | Lowest (text instant) |
| Performance Logging | ❌ | ❌ | ✅ |
| Use Case | Reliable delivery | Real-time | Analytics + UX |
Performance Logging (V8 Only)¶
Collection: performance_logs
Function: save_performance_log() (Lines 2258-2285)
Logged Data:
{
"user_id": "User-123456",
"project_id": "User-123456_Project_1",
"session_id": "session_123",
"originating_url": "https://example.com?utm_source=google",
"question": "What are your pricing options?",
"full_answer": "We offer three pricing tiers...",
// UTC timestamps (ISO format)
"timestamps_utc": {
"request_arrival": "2025-01-15T14:00:00.000Z",
"rag_start": "2025-01-15T14:00:00.050Z",
"rag_end": "2025-01-15T14:00:00.200Z",
"llm_hit": "2025-01-15T14:00:00.250Z",
"first_text_yield": "2025-01-15T14:00:01.500Z",
"first_audio_yield": "2025-01-15T14:00:03.200Z",
"stream_end": "2025-01-15T14:00:05.800Z"
},
// IST timestamps (human-readable)
"timestamps_ist": {
"request_arrival": "2025-01-15 19:30:00.000 IST",
"rag_start": "2025-01-15 19:30:00.050 IST",
...
},
// Durations in seconds (float)
"durations_sec": {
"total_request_time": 5.8,
"time_to_first_text": 1.25,
"time_to_first_audio": 2.95,
"rag_processing": 0.15,
"llm_time_to_first_token": 1.25
},
// Token usage
"tokens": {
"question": 25,
"answer": 85,
"total": 110
}
}
Benefits:
- Identify slow requests
- Optimize RAG/LLM performance
- Track cost (token usage)
- A/B test different models
Form Trigger System¶
Collection: LEAD_COLLECTION
Purpose: Automatically prompt user to fill form after N messages
Configuration:
{
"user_id": "User-123456",
"project_id": "User-123456_Project_1",
"trigger": true,
"prompt_number": 4, // Trigger after 4th message
"message_display": "Could you please provide some details by filling the form?"
}
Implementation (Lines 1410-1477):
Flow:
- Count existing chat messages in session
- Check if current message number == `prompt_number`
- Verify form hasn't already been shown
- Append form message to response
Code:
# Get current chat count
existing_session = history_collection.find_one({...})
existing_valid_chats = [chat for chat in existing_session["chat_data"] if ...]
current_chat_count = len(existing_valid_chats) + 1 # +1 for current
# Get form config
lead_config = lead_collection.find_one({...})
if lead_config and lead_config.get("trigger", False):
prompt_number = lead_config.get("prompt_number", 0)
if current_chat_count == prompt_number and prompt_number > 0:
# Check if form already shown
form_already_shown = False
for chat in existing_session["chat_data"]:
if " form" in chat["output_response"].lower() and "fill" in chat["output_response"].lower():
form_already_shown = True
break
if not form_already_shown:
form_message = lead_config.get("message_display", "Please fill out the form...")
answer = f"{answer}\n\n{form_message}"
Prevents duplicate prompts: Checks for keywords "form" and "fill" in previous responses.
Guardrails System¶
Collection: chatbot_guardrails
Purpose: Content safety, topic restrictions, blocked keywords
Function: getGuardrails() (Lines 1095-1125)
Hierarchy:
- User-specific guardrails (user_id + project_id)
- Default system guardrails (`_system_default_user_` + `_system_default_project_`)
Document Structure:
{
"user_id": "User-123456",
"project_id": "User-123456_Project_1",
"categories": [
{
"category": "Profanity",
"action": "block",
"keywords": ["bad", "word", "list"]
},
{
"category": "Off-Topic",
"action": "redirect",
"allowed_topics": ["products", "services", "support"]
},
{
"category": "Sensitive",
"action": "escalate",
"keywords": ["legal", "medical", "financial"]
}
]
}
Default Guardrails:
{
"user_id": "_system_default_user_",
"project_id": "_system_default_project_",
"categories": [...] // Fallback rules
}
Integration:
Guardrails_data = guardrails_result.get('document', {}).get('categories', [])
# Injected into context
context_parts.append(f"Strict Guidelines (Guardrails):\n{Guardrails_data}")
System Prompts¶
Two Collections:
- `system_prompts_default` - Default prompts for each model + purpose
- `system_prompts_user` - User-customized prompts
Function: get_system_prompt() (Lines 1131-1155)
Hierarchy:
- Check user-specific prompt (user_id + project_id + model + chatbot_purpose)
- Fallback to default prompt (model + chatbot_purpose)
- Ultimate fallback: "You are a helpful assistant."
Schema:
User-specific:
{
"user_id": "User-123456",
"project_id": "User-123456_Project_1",
"model": "gpt-35-turbo-16k-0613",
"chatbot_purpose": "Sales-Agent",
"system_prompt": "Custom sales prompt..."
}
Default:
{
"model": "gpt-35-turbo-16k-0613",
"chatbot_purpose": "Sales-Agent",
"content": "You are a highly skilled AI sales agent..."
}
Benefits:
- Users can customize prompts per model
- Centralized prompt management
- A/B testing different prompts
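A minimal sketch of the 3-level lookup, assuming pymongo collection handles named `system_prompts_user` and `system_prompts_default` with the fields shown in the schemas above:

```python
def get_system_prompt(user_id: str, project_id: str, model: str, chatbot_purpose: str) -> str:
    # 1. User-customized prompt
    user_doc = system_prompts_user.find_one({
        "user_id": user_id,
        "project_id": project_id,
        "model": model,
        "chatbot_purpose": chatbot_purpose,
    })
    if user_doc and user_doc.get("system_prompt"):
        return user_doc["system_prompt"]

    # 2. Platform default prompt for this model + purpose
    default_doc = system_prompts_default.find_one({
        "model": model,
        "chatbot_purpose": chatbot_purpose,
    })
    if default_doc and default_doc.get("content"):
        return default_doc["content"]

    # 3. Ultimate fallback
    return "You are a helpful assistant."
```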
Avatar-Voice Validation¶
Function: assign_voice_by_avatar() (Lines 1159-1175)
Purpose: Ensure gender consistency (male avatar → male voice, female avatar → female voice)
Mappings:
male_avatars = {"Chris", "Jack"}
female_avatars = {"Eva", "Shayla", "Myra", "Anu"}
male_voices = {"Male_1", "Male_2", "Male_3", "Male_IND"}
female_voices = {"Female_1", "Female_2", "Female_3", "Female_4", "Female_IND", "Female_IND2"}
Logic:
def assign_voice_by_avatar(avatar: str, voice: str) -> str:
avatar = avatar.strip().capitalize()
voice = voice.strip()
if avatar in male_avatars:
if voice not in male_voices:
return "Male_2" # Default male voice
elif avatar in female_avatars:
if voice not in female_voices:
return "Female_2" # Default female voice
return voice # Keep original if valid
Example:
- Avatar: "Eva" (female), Voice: "Male_1" → Corrected to "Female_2"
- Avatar: "Chris" (male), Voice: "Male_3" → Kept as "Male_3"
ThreadPoolExecutor Parallelization¶
Lines 1287-1307
Purpose: Run independent operations in parallel for faster response
6 Parallel Tasks:
get_system_prompt()- Fetch system promptprocess_utm_completely()- UTM matching + content retrievalretrieve_relevant_documents()- Milvus RAG searchgetGuardrails()- Fetch guardrailsget_org_data()- Fetch organization dataget_chat_history()- Fetch chat history
Code:
with concurrent.futures.ThreadPoolExecutor(max_workers=6) as executor:
future_system_prompt = executor.submit(get_system_prompt, ...)
future_utm = executor.submit(process_utm_completely, ...)
future_doc_retrieval = executor.submit(retrieve_relevant_documents, ...)
future_guardrails = executor.submit(getGuardrails, ...)
future_org_data = executor.submit(get_org_data)
future_chat_history = executor.submit(get_chat_history)
# Wait for all to complete
concurrent.futures.wait([...])
# Get results
system_prompt = future_system_prompt.result()
utm_result = future_utm.result()
...
Performance Gain:
- Sequential: 6 × 50ms = 300ms
- Parallel: max(50ms) = 50ms
- Speedup: 6× faster!
Helper Functions & LLM Wrappers¶
TTS & Lip-Sync Pipeline Functions¶
1. text_to_speech_azure() - Main TTS Function (Lines 139-185)¶
Purpose: Convert text to speech using Azure Cognitive Services
Complete Implementation:
async def text_to_speech_azure(text, voice, session_id):
# Step 1: Clean the input text
cleaned_text = remove_contact_numbers(text) # Remove phones, URLs, emojis
# Step 2: Define output file path
wav_file = os.path.join(OUTPUT_DIR, f"{session_id}.wav")
# Step 3: Configure Azure Speech
speech_config = speechsdk.SpeechConfig(
subscription="9N41NOfDyVDoduiD4EjlzmZU9CbUX3pPqWfLCORpl7cBf0l2lzVQJQQJ99BCACGhslBXJ3w3AAAYACOG2329",
region="centralindia"
)
speech_config.speech_synthesis_voice_name = voice # e.g., "en-US-JennyNeural"
# Step 4: Configure audio output to file
audio_config = speechsdk.audio.AudioOutputConfig(filename=wav_file)
# Step 5: Create synthesizer
speech_synthesizer = speechsdk.SpeechSynthesizer(
speech_config=speech_config,
audio_config=audio_config
)
# Step 6: Synthesize speech (blocking call)
result = speech_synthesizer.speak_text_async(cleaned_text).get()
# Step 7: Check result
if result.reason == speechsdk.ResultReason.SynthesizingAudioCompleted:
logger.debug("✓ TTS synthesis completed")
return wav_file
else:
error_details = result.cancellation_details
raise Exception(f"Speech synthesis failed: {error_details.reason}")
Key Features:
- ⚠️ Hardcoded API Key: Azure TTS subscription key (Line 157)
- Region: Central India data center
- Output Format: Default WAV (not Rhubarb-ready - needs PCM conversion)
- Blocking Call: `speak_text_async().get()` blocks until complete
2. convert_wav_to_pcm() - FFmpeg Audio Conversion (Lines 189-200)¶
Purpose: Convert Azure TTS output to PCM format for Rhubarb compatibility
Implementation:
def convert_wav_to_pcm(input_wav, output_wav):
try:
subprocess.run(
["ffmpeg", "-i", input_wav, "-acodec", "pcm_s16le", output_wav],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=True,
)
return output_wav if os.path.exists(output_wav) else None
except subprocess.CalledProcessError as e:
logger.error(f"ffmpeg conversion failed: {e.stderr.decode()}")
return None
FFmpeg Command Breakdown:
- `-i input_wav` - Input file
- `-acodec pcm_s16le` - Audio codec: PCM signed 16-bit little-endian
- `output_wav` - Output file path
Why PCM? Rhubarb requires uncompressed PCM audio for accurate phonetic analysis
Dependency: Requires ffmpeg installed on system PATH
3. generate_lip_sync() - Rhubarb Execution (Lines 203-216)¶
Purpose: Generate phonetic mouth shape timeline using Rhubarb
Implementation:
def generate_lip_sync(wav_file, user_id):
json_file = os.path.join(OUTPUT_DIR, f"{user_id}.json")
try:
result = subprocess.run(
[rhubarbExePath, "-f", "json", "-o", json_file, wav_file, "-r", "phonetic"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
return json_file if os.path.exists(json_file) else None
except Exception as e:
logger.error(f"Rhubarb lip sync failed: {e}")
return None
Rhubarb Command Breakdown:
- `rhubarbExePath` - Platform-specific executable path
- `-f json` - Output format: JSON
- `-o json_file` - Output file path
- `wav_file` - Input audio file (PCM WAV)
- `-r phonetic` - Recognition mode: phonetic (vs. pocketsphinx)
Phonetic Mode:
- Faster than pocketsphinx
- No speech recognition required
- Pure acoustic analysis
- Better for non-English or noisy audio
4. parse_lip_sync() - JSON Enhancement (Lines 219-224)¶
Purpose: Add sound file reference to Rhubarb JSON output
Implementation:
def parse_lip_sync(json_file, sound_file):
with open(json_file, "r") as file:
lip_sync_data = json.load(file)
lip_sync_data["metadata"]["soundFile"] = sound_file # Add reference
return lip_sync_data
Why Needed? Rhubarb output doesn't include sound file path - frontend needs both lip-sync data AND audio file reference
Modified Structure:
{
"metadata": {
"soundFile": "tts_audio/session-abc123.wav", // ‹‹‹ ADDED
"duration": 3.5
},
"mouthCues": [/* ... */]
}
GET /lip-sync Endpoint - Avatar Greeting Generation (Lines 226-291)¶
Purpose: Generate TTS + lip-sync for avatar greeting messages
Request:
GET /lip-sync?message=Hello%2C%20welcome!&user_id=User-123&voice_selection=Female_2&project_id=User-123_Project_1
Parameters:
- `message` (string) - Greeting text
- `user_id` (string) - User identifier
- `voice_selection` (string) - Voice ID (e.g., "Female_2")
- `project_id` (string, optional) - Project ID for voice consistency
Response:
{
"lipsync": {
"metadata": {
"soundFile": "tts_audio/User-123.wav",
"duration": 2.5
},
"mouthCues": [
{"start": 0.00, "end": 0.15, "value": "X"},
{"start": 0.15, "end": 0.30, "value": "B"},
// ...
]
},
"audio": "UklGRiQAAABXQVZFZm10..." // Base64-encoded WAV
}
Flow:
- Fetch latest voice selection from `selection_collection` (if project_id provided)
- Validate voice selection against `SUPPORTED_VOICES`
- Generate TTS: `text_to_speech_azure(message, voice, user_id)`
- Convert to PCM: `convert_wav_to_pcm(wav_file, pcm_wav_file)`
- Generate lip-sync: `generate_lip_sync(pcm_wav_file, user_id)`
- Parse JSON: `parse_lip_sync(json_file, wav_file)`
- Encode audio: `base64.b64encode(wav_file_bytes)`
- Cleanup: Delete temp files (WAV, PCM, JSON)
- Return JSON response
Avatar-Voice Consistency:
if project_id:
sel = selection_collection.find_one({"user_id": user_id, "project_id": project_id})
if sel:
latest_voice = sel.get("selection_voice") or voice_selection
latest_avatar = sel.get("selection_avatar") or ""
voice_selection = assign_voice_by_avatar(str(latest_avatar), str(latest_voice))
Use Case: Frontend calls this endpoint when displaying avatar greeting on chatbot initialization
LLM Wrapper Functions - 11 Models¶
Purpose: Standardized interface for calling different LLM models
Common Pattern:
def call_model_name(messages):
try:
# API call with model-specific configuration
response = client.chat.completions.create(model="...", messages=messages, temperature=0.7)
return response.choices[0].message.content
except Exception as e:
return f"Model API Error: {str(e)}"
Azure OpenAI Models (4)¶
1. call_openai_4() - GPT-4 (Lines 726-736)
# Configuration (Lines 715-724)
endpoint_gpt4 = "https://machineagentopenai.openai.azure.com/.../gpt-4-0613/..."
deployment_gpt4 = "gpt-4-0613"
subscription_key = "AZxDVMYB08AaUip0i5ed1sy73ZpUsqencYYxKDbm6nfWfG1AqPZ3JQQJ99BCACYeBjFXJ3w3AAABACOGVUo7" # ⚠️ HARDCODED
client = AzureOpenAI(azure_endpoint=endpoint_gpt4, api_key=subscription_key, api_version="2024-02-15-preview")
def call_openai_4(messages):
response = client.chat.completions.create(model="deployment_gpt4", messages=messages, temperature=0.7)
return response.choices[0].message.content
2. call_openai_35() - GPT-3.5-Turbo-16k (Lines 750-760)
endpoint_gpt35 = "https://machineagentopenai.openai.azure.com/.../gpt-35-turbo-16k-0613/..."
deployment_gpt35 = "gpt-35-turbo-16k-0613"
# Uses same subscription_key as GPT-4
client_gpt35 = AzureOpenAI(azure_endpoint=endpoint_gpt35, api_key=subscription_key, api_version="2024-02-15-preview")
Context Window: 16k tokens (vs 8k for standard GPT-3.5)
3. call_gpt4o_mini() - GPT-4o-mini (Lines 764-774)
endpoint_gpt4omini = "https://machineagentopenai.openai.azure.com/.../gpt-4o-mini/..."
deployment_gpt4omini = "gpt-4o-mini"
Use Case: Faster, cheaper alternative to GPT-4
4. call_gpto1mini() - o1-mini (Lines 778-808)
endpoint_gpto1mini = "https://machineagentopenai.openai.azure.com/.../o1-mini/..."
deployment_gpto1mini = "o1-mini"
Special: Reasoning model (longer processing time)
Azure Model Marketplace (4)¶
5. call_llama() - Llama 3.3-70B (Lines 821-829)
AZURE_LLAMA_ENDPOINT = "https://Llama-3-3-70B-Instruct-ulmca.eastus.models.ai.azure.com/chat/completions"
AZURE_API_KEY = "JOfcw0VW0dS31Z8XgkNRSP9tUaBiwUYZ" # ⚠️ HARDCODED
llama_HEADERS = {
"Content-Type": "application/json",
"Authorization": f"Bearer {AZURE_API_KEY}"
}
def call_llama(messages):
payload = {"messages": messages, "temperature": 0.7, "max_tokens": 50}
response = requests.post(AZURE_LLAMA_ENDPOINT, json=payload, headers=llama_HEADERS)
return response.json()["choices"][0]["message"]["content"]
Note: Uses REST API (not SDK) - different from Azure OpenAI
6. call_deepseek() - DeepSeek R1 (Lines 842-853)
deepseek_api_url = "https://DeepSeek-R1-imalr.eastus2.models.ai.azure.com/chat/completions"
deepseek_api_key = "GwUcGzHhhUbvApfMR4aq1ZPFUic6lbWE" # ⚠️ HARDCODED
def call_deepseek(messages):
data = {"messages": messages, "max_tokens": 100}
response = requests.post(deepseek_api_url, headers=deepseekheaders, json=data)
answer = response.json()["choices"][0]["message"]["content"]
# Remove reasoning tokens
return re.sub(r"<think>.*?</think>", "", answer, flags=re.DOTALL).strip()
Special: Removes <think>...</think> reasoning tokens from output
7. call_Ministral() - Ministral-3B (Lines 864-875)
Ministral_api_url = "https://Ministral-3B-rvgab.eastus2.models.ai.azure.com/chat/completions"
Ministral_api_key = "Z7fNcdnw5Tht1xAz6VlgUlLOeZoVTkIf" # ⚠️ HARDCODED
Use Case: Lightweight model for simple queries
8. call_phi() - Phi-3 (Lines 902-913)
phi_api_url = "https://Phi-3-small-8k-instruct-qvlpq.eastus2.models.ai.azure.com/chat/completions"
phi_api_key = "T8I14He3lbMyAyUwNfffwG58e23EcXsU" # ⚠️ HARDCODED
Context: 8k tokens - Microsoft's small model
External APIs (3)¶
9. call_gemini() - Gemini 2.0 Flash (Lines 914-927)
GEMINI_API_ENDPOINT = "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:generateContent?key=AIzaSyCxpr7wiyrPAEFRW-nz8kQjT2Y4tZXgv9g"
GEMINI_API_KEY = "AIzaSyCxpr7wiyrPAEFRW-nz8kQjT2Y4tZXgv9g" # ⚠️ HARDCODED (also in URL)
def call_gemini(messages):
payload = {
"contents": [{"parts": [{"text": msg['content']}]} for msg in messages]
}
response = requests.post(GEMINI_API_ENDPOINT, json=payload, headers=HEADERS_GEMINI)
return response.json()["candidates"][0]["content"]["parts"][0]["text"]
Different Format: Uses contents with parts structure
10. call_claude() - Claude 3 Opus (Lines 929-944)
CLAUDE_API_ENDPOINT = "https://api.anthropic.com/v1/messages"
CLAUDE_API_KEY = "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" # ⚠️ HARDCODED
HEADERS_CLAUDE = {"Content-Type": "application/json", "X-API-Key": f"{CLAUDE_API_KEY}"}
def call_claude(messages):
payload = {
"model": "claude-3-opus-20240229",
"messages": messages,
"max_tokens": 1024
}
response = requests.post(CLAUDE_API_ENDPOINT, json=payload, headers=HEADERS_CLAUDE)
return response.json()["content"][0]["text"]
Header: Uses X-API-Key (not Authorization)
11. call_grok() - Grok (Lines 946-960)
GROK_API_ENDPOINT = "https://machineagents-resource.services.ai.azure.com/models/chat/completions?api-version=2024-05-01-preview"
GROK_API_KEY = "DZHPHaEk96KHbCgaD3fsaI7opVSL7pFjmLR94hYkw8w7nnTlz0SPJQQJ99BEACHYHv6XJ3w3AAAAACOGn1JK" # ⚠️ HARDCODED
def call_grok(messages):
payload = {"messages": messages, "max_tokens": 1024}
response = requests.post(GROK_API_ENDPOINT, json=payload, headers=HEADERS_GROK)
return response.json()["choices"][0]["message"]["content"]
Note: Hosted on Azure (not xAI directly)
Model Selection Mapping¶
In sync_ask_gpt1() (Lines 1255-1624):
model = selection.get("selection_model", "openai-35") if selection else "openai-4"
if model == "openai-4":
answer = call_openai_4(messages)
elif model == "openai-35":
answer = call_openai_35(messages)
elif model == "gpt-4o-mini":
answer = call_gpt4o_mini(messages)
elif model == "o1-mini":
answer = call_gpto1mini(messages)
elif model == "Azure-llama":
answer = call_llama(messages)
elif model == "Azure-deepseek":
answer = call_deepseek(messages)
elif model == "Azure-Ministral":
answer = call_Ministral(messages)
elif model == "Azure-phi":
answer = call_phi(messages)
elif model == "gemini":
answer = call_gemini(messages)
elif model == "claude":
answer = call_claude(messages)
elif model == "grok":
answer = call_grok(messages)
else:
answer = call_openai_35(messages) # Fallback
Model IDs stored in selection_history collection by Selection Chatbot Service
Organization Data System¶
Collection: organisation_data (accessed via db_manager)
Purpose: Store company/organization information for context in responses
Function: get_org_data() (Lines 1270-1278)
Usage:
def get_org_data():
org_docs, org_source = db_manager.find_with_fallback(
"organisation_data",
user_id=user_id,
project_id=project_id,
query={"user_id": user_id, "project_id": project_id},
limit=1
)
return org_docs[0] if org_docs else None
combined_text = org_data_doc.get("combined_text", "") if org_data_doc else ""
Document Structure:
{
"user_id": "User-123456",
"project_id": "User-123456_Project_1",
"combined_text": "Company: Acme Corp. Founded: 2010. Products: Widget A, Widget B, Widget C. Mission: Make the best widgets.",
"company_name": "Acme Corp",
"industry": "Manufacturing",
"website": "https://acme.com"
}
Integration: Organization data is injected into the context built for every response (see the sketch below).
WHY? Ensures chatbot knows basic company facts without needing to crawl/embed them.
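A hedged illustration of that injection step; the exact context header wording used in `main.py` is not reproduced in this document, and the variable names follow the other snippets on this page:

```python
# Assumed shape of the context-building step
context_parts = []
if combined_text:
    context_parts.append(f"Organization Information:\n{combined_text}")
context_parts.append(f"Strict Guidelines (Guardrails):\n{Guardrails_data}")
context = "\n\n".join(context_parts)
```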
Enhanced Contact Scrubbing¶
Function: remove_contact_numbers() (Lines 963-990)
Purpose: Remove sensitive or unpronounceable content from TTS output
What's Removed:
1. Phone Numbers:
phone_number_pattern = r"\+?[0-9]{1,4}[-.\s]?[0-9]{1,3}[-.\s]?[0-9]{3}[-.\s]?[0-9]{3,4}"
# Replaced with: "the number provided in the chat"
2. URLs:
url_pattern = r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+"
# Replaced with: "via the url provided in the chat"
3. Markdown Characters: `*` and `#` are stripped
4. Emojis: (Lines 974-987)
emoji_pattern = re.compile(
"["
"\U0001F600-\U0001F64F" # emoticons
"\U0001F300-\U0001F5FF" # symbols & pictographs
"\U0001F680-\U0001F6FF" # transport & map symbols
"\U0001F1E0-\U0001F1FF" # flags (iOS)
"\U00002700-\U000027BF" # Dingbats
"\U0001F900-\U0001F9FF" # Supplemental Symbols
"\U00002600-\U000026FF" # Misc symbols
"\U00002B00-\U00002BFF" # Arrows
"]+",
flags=re.UNICODE
)
# Removed completely
Example:
# Input (LLM response)
"🎉 Call us at +1-555-123-4567 or visit https://example.com! **Great deals!** #sale"
# Output (for TTS)
"Call us at the number provided in the chat or visit via the url provided in the chat! Great deals! sale"
Why? Azure TTS can't pronounce emojis/URLs well, and reading phone numbers is tedious.
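A condensed sketch of `remove_contact_numbers()` assembled from the patterns above; the real function (Lines 963-990) enumerates more emoji ranges and may differ in replacement order:

```python
import re

PHONE_RE = re.compile(r"\+?[0-9]{1,4}[-.\s]?[0-9]{1,3}[-.\s]?[0-9]{3}[-.\s]?[0-9]{3,4}")
URL_RE = re.compile(r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+")
# Condensed emoji ranges; the real function lists several more Unicode blocks
EMOJI_RE = re.compile("[\U0001F300-\U0001FAFF\U00002600-\U000027BF\U00002B00-\U00002BFF]+", flags=re.UNICODE)

def remove_contact_numbers(text: str) -> str:
    text = PHONE_RE.sub("the number provided in the chat", text)
    text = URL_RE.sub("via the url provided in the chat", text)
    text = EMOJI_RE.sub("", text)
    text = text.replace("*", "").replace("#", "")  # strip markdown characters
    return text
```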
Deleted Chatbot Handling¶
Endpoint: POST /v2/get-response-3d-chatbot (Lines 1627-1731)
Soft Delete Check: (Lines 1651-1703)
Flow:
- Check if
chatbot_config.isDeleted == True - Generate error message with TTS + lip-sync
- Return in SAME format as normal response
Implementation:
if not chatbot_config or chatbot_config.get("isDeleted") is True:
error_text = "This chatbot is no longer available."
# Use chatbot's configured voice
chatbot_voice_default = assign_voice_by_avatar(chatbot_avatar, chatbot_voice)
azure_voice = SUPPORTED_VOICES.get(chatbot_voice_default)
# Generate TTS + Lip-Sync for error
base_filename = f"{user_id}_{session_id}_error"
wav_file = await text_to_speech_azure1(error_text, azure_voice, wav_file)
json_file = await generate_lip_sync1(wav_file, json_file)
# Return SAME format as normal response
return {
"text": error_text,
"facialExpression": "neutral",
"animation": "Idle",
"audio": audio_base64,
"lipsync": lip_sync_data
}
Why SAME format? Frontend doesn't need special error handling - avatar just says "chatbot not available" with proper lip-sync.
Fallback Error Handling¶
Lines 1531-1624
Purpose: Graceful degradation if main processing fails
Flow:
- Main
sync_ask_gpt1()fails - Catch exception
- Try minimal LLM call (system prompt + question only, no RAG/history)
- Save fallback response
- Check form trigger even for fallback
- Return text-only response
Code:
except Exception as e:
logger.error(f"Unexpected error in sync_ask_gpt1: {e}")
# Fallback: Minimal LLM call
try:
fallback_messages = [
SystemMessage(content=system_prompts.get(chatbot_purpose, "You are a helpful AI assistant.")),
HumanMessage(content=f"Question: {question}")
]
response = llm.invoke(fallback_messages)
answer = re.sub(r'(?i)^(answer:|response:|responds:| *-+ #?)\s*', '', response.content).strip()
# Save fallback history
save_chat_history(user_id, project_id, session_id, question, answer, originating_url)
# Form trigger logic (duplicate from main flow)
# ... check if form should be shown ...
return {"text": answer} # No audio/lipsync in fallback
except Exception as fallback_error:
raise HTTPException(status_code=500, detail=f"An error occurred: {e}")
What's SKIPPED in fallback?
- ❌ RAG retrieval
- ❌ UTM targeting
- ❌ Chat history
- ❌ Guardrails
- ❌ Organization data
- ❌ TTS + Lip-sync
What's KEPT?
- ✅ System prompt (for chatbot purpose)
- ✅ LLM generation
- ✅ Chat history saving
- ✅ Form trigger logic
Why? Better to give a basic response than fail completely.
Security Analysis¶
CRITICAL Security Issues¶
1. ⚠️ 10 HARDCODED API KEYS! (MOST OF ANY SERVICE!)
Azure TTS Key (Line 157)
Azure OpenAI Key (Line 1007)
Llama API Key (Line 811)
DeepSeek API Key (Line 832)
Ministral API Key (Line 856)
Phi-3 API Key (Line 878)
Gemini API Key (Lines 888, 890, 996, 998)
Claude API Key (Lines 892, 1000)
CLAUDE_API_KEY = "sk-ant-api03-[REDACTED]"  # live key value redacted in this document
Grok API Key (Lines 894, 1002)
GROK_API_KEY = "[REDACTED]"  # live key value redacted in this document
⚠️ 10 TOTAL HARDCODED API KEYS - THE MOST OF ANY SERVICE IN THE PLATFORM!
2. ⚠️ File System Issues
Problem: Temporary files use {session_id}.wav, {user_id}.json - concurrent requests may collide
Better:
import uuid
unique_id = f"{session_id}_{uuid.uuid4()}"
wav_file = os.path.join(OUTPUT_DIR, f"{unique_id}.wav")
3. ⚠️ Rhubarb Executable Dependency
Problem: Service fails if Rhubarb executable missing or incompatible
Mitigation: Container should include Rhubarb binary
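A startup health check could go one step further than the existence check and verify the binary actually runs. A hedged sketch, assuming the bundled binary supports the --version flag:
import subprocess

def verify_rhubarb(rhubarb_exe_path: str) -> None:
    """Fail fast at startup if the Rhubarb binary is missing or cannot execute."""
    try:
        result = subprocess.run(
            [rhubarb_exe_path, "--version"],
            capture_output=True, text=True, timeout=10
        )
    except (FileNotFoundError, subprocess.TimeoutExpired) as exc:
        raise RuntimeError(f"Rhubarb executable unusable at {rhubarb_exe_path}: {exc}")
    if result.returncode != 0:
        raise RuntimeError(f"Rhubarb returned non-zero exit code: {result.returncode}")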
Performance¶
Response Time Breakdown¶
| Step | Avg Latency | Notes |
|---|---|---|
| 1. Embedding generation | 50-100ms | BAAI/bge-small-en-v1.5 |
| 2. Guardrails check | 10-30ms | MongoDB query |
| 3. Milvus search | 50-150ms | Top-5 chunks |
| 4. MongoDB history | 20-50ms | Simple query |
| 5. Azure OpenAI | 1-3 seconds | GPT-3.5-Turbo-16k |
| 6. Azure TTS | 1-2 seconds | Neural voice |
| 7. FFmpeg conversion | 100-300ms | WAV to PCM |
| 8. Rhubarb lip-sync | 500-1000ms | Phonetic analysis |
| 9. Base64 encoding | 10-50ms | Audio file |
| 10. MongoDB save | 20-50ms | History update |
| TOTAL | 4-8 seconds | Complete 3D response |
Lip-sync adds 600-1300ms compared to voice-only responses.
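A hedged sketch of how per-step latencies like these could be captured for the performance_logs collection (the helper and field names are illustrative, not the service's actual schema):
import time
from contextlib import contextmanager

step_timings = {}

@contextmanager
def timed(step_name):
    """Record a pipeline step's latency in milliseconds."""
    start = time.perf_counter()
    try:
        yield
    finally:
        step_timings[step_name] = round((time.perf_counter() - start) * 1000, 1)

# Example usage around two pipeline steps
with timed("milvus_search"):
    time.sleep(0.05)   # stand-in for the real Milvus query
with timed("azure_tts"):
    time.sleep(0.1)    # stand-in for the real TTS call

# e.g. performance_logs_collection.insert_one({"session_id": "session-abc123", "timings_ms": step_timings})
print(step_timings)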
Deployment¶
Docker Configuration¶
Dockerfile:
FROM python:3.9-slim
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
build-essential \
ffmpeg \
&& rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy Rhubarb executable
COPY Rhubarb/ ./Rhubarb/
# Copy shared modules
COPY shared/ ./shared/
COPY src/ .
# Create TTS output directory
RUN mkdir -p tts_audio
# Make Rhubarb executable
RUN chmod +x Rhubarb/rhubarb
EXPOSE 8011
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8011"]
Requirements.txt¶
fastapi>=0.95.0
uvicorn[standard]>=0.22.0
pymongo>=4.3.3
python-multipart>=0.0.6
python-dotenv>=1.0.0
# Azure Services
azure-cognitiveservices-speech>=1.30.0
openai>=1.0.0
# LangChain
langchain>=0.1.0
langchain-openai>=0.0.5
langchain-core>=0.1.0
# Embeddings & ML
fastembed>=0.1.0
scikit-learn>=1.3.0
numpy>=1.24.0
# Utilities
tiktoken>=0.5.0
pytz>=2023.3
requests>=2.31.0
# Monitoring
ddtrace>=1.19.0
Note: FFmpeg must be installed in system (not Python package)
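For illustration, the WAV-to-PCM conversion step consumed by Rhubarb might resemble the following sketch (the sample rate and exact flags are assumptions, not the service's actual command):
import subprocess

def convert_wav_to_pcm(input_wav: str, output_pcm_wav: str) -> None:
    """Re-encode Azure TTS output as 16-bit PCM WAV so Rhubarb can analyze it."""
    subprocess.run(
        ["ffmpeg", "-y", "-i", input_wav, "-acodec", "pcm_s16le", "-ar", "16000", output_pcm_wav],
        check=True, capture_output=True
    )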
Environment Variables¶
# Azure OpenAI
AZURE_OPENAI_API_KEY=<your-key>
ENDPOINT_URL=https://machineagentopenai.openai.azure.com/...
DEPLOYMENT_NAME=gpt-35-turbo-16k-0613
# MongoDB
MONGO_URI=mongodb://...
MONGO_DB_NAME=Machine_agent_demo
# Milvus
MILVUS_HOST=localhost
MILVUS_PORT=19530
# Alternative LLM APIs (optional)
GEMINI_API_KEY=<your-key>
CLAUDE_API_KEY=<your-key>
GROK_API_KEY=<your-key>
# DataDog
DD_SERVICE=response-3d-chatbot-service
DD_ENV=production
Related Documentation¶
- Response Text Chatbot Service - Text variant
- Response Voice Chatbot Service - Voice variant
- Selection Chatbot Service - Avatar/voice selection
- Data Crawling Service - Creates embeddings
Total: 2,534 lines of Python code
Service Infrastructure & Configuration¶
Complete Imports & Dependencies (Lines 1-37)¶
36 Total Imports:
FastAPI & Web Framework:
from fastapi import FastAPI, HTTPException, Form, Request
from fastapi.responses import JSONResponse, StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
AI/ML Libraries:
from langchain_openai import AzureChatOpenAI
from langchain.schema import HumanMessage, SystemMessage, AIMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from fastembed.embedding import FlagEmbedding as Embedding
from openai import AsyncAzureOpenAI # For streaming
from sklearn.metrics.pairwise import cosine_similarity
import tiktoken # OpenAI tokenizer
Azure Services:
Database:
from pymongo import MongoClient
import sys
sys.path.append('/app')
from shared.database import get_db_manager, get_milvus_embeddings_service
Data Processing:
import numpy as np
import json
import base64
import re
from urllib.parse import urlparse, parse_qs
from typing import List, Dict, Any, AsyncGenerator, Optional
System & Utilities:
import os
import subprocess
import platform
import asyncio
import concurrent.futures
import logging
from datetime import datetime, timezone
import pytz # Timezone conversion
from pathlib import Path
from dotenv import load_dotenv
import requests
Critical Environment Variable (Line 34):
os.environ["TOKENIZERS_PARALLELISM"] = "false"
# ✅ Prevents warning: "The current process just got forked, after parallelism has already been used"
# Required when using multiprocessing with transformer-based embeddings
FastAPI Application Setup (Lines 47-54)¶
app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],      # ⚠️ SECURITY: Allows ALL origins
    allow_credentials=True,
    allow_methods=["*"],      # Allows all HTTP methods
    allow_headers=["*"],      # Allows all headers
)
⚠️ Security Issue: Overly permissive CORS allows any website to call the API. Should be restricted to trusted domains in production.
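A hedged fix, assuming the trusted frontend domains are known at deploy time (the origins below are placeholders):
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

ALLOWED_ORIGINS = [
    "https://app.example.com",      # placeholder: production frontend
    "https://staging.example.com",  # placeholder: staging frontend
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=ALLOWED_ORIGINS,              # restrict instead of "*"
    allow_credentials=True,
    allow_methods=["GET", "POST"],              # only what the endpoints need
    allow_headers=["Content-Type", "Authorization"],
)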
Database Initialization (Lines 69-96)¶
Environment Variables:
env_path = Path(__file__).resolve().parents[2] / ".env"
load_dotenv(dotenv_path=env_path)
mongo_uri = os.getenv("MONGO_URI")
db_name = os.getenv("MONGO_DB_NAME")
MongoDB (CosmosDB) Connection:
client = MongoClient(mongo_uri)
db = client[db_name]
logger.info(f"✓ Connected to database: {db.name}")
Shared Services Initialization:
# Database Manager (CosmosDB + Milvus routing)
db_manager = get_db_manager()
# Milvus Embeddings Service (new architecture)
milvus_embeddings = get_milvus_embeddings_service()
6 MongoDB Collections:
- chatbot_selections - Chatbot configuration (avatar, voice, purpose)
- selection_history - LLM model selection history
- chatbot_history - Chat conversation history with token tracking
- chatbot_guardrails - Content safety rules (user-specific + defaults)
- performance_logs - V8 endpoint performance metrics
- LEAD_COLLECTION - Form trigger configuration
chatbot_collection = db["chatbot_selections"]
selection_collection = db["selection_history"]
history_collection = db["chatbot_history"]
guardrails_collection = db["chatbot_guardrails"]
performance_logs_collection = db["performance_logs"]
lead_collection = db["LEAD_COLLECTION"]
Why 2 Database Systems?
- CosmosDB (MongoDB API): Metadata, configuration, chat history
- Milvus: Vector embeddings for RAG semantic search
Audio Output Configuration (Lines 99-101)¶
Purpose: Directory for temporary WAV/JSON files during TTS + lip-sync generation
Files Created:
- {user_id}_{session_id}_{chunk_index}.wav - Azure TTS output
- {user_id}_{session_id}_{chunk_index}_pcm.wav - PCM-converted audio for Rhubarb
- {user_id}_{session_id}_{chunk_index}.json - Rhubarb lip-sync data
Cleanup: Files deleted immediately after base64 encoding and response generation
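A minimal sketch of this encode-then-delete pattern (the helper name is an assumption; the actual service code may differ):
import base64
import os

def encode_and_cleanup(wav_file: str, json_file: str) -> str:
    """Base64-encode the TTS audio, then delete the temporary WAV/JSON files."""
    try:
        with open(wav_file, "rb") as f:
            audio_base64 = base64.b64encode(f.read()).decode("utf-8")
    finally:
        for path in (wav_file, json_file):
            if os.path.exists(path):
                os.remove(path)
    return audio_base64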
Complete Voice Dictionary (Lines 103-115)¶
SUPPORTED_VOICES = {
    "Male_1": "en-US-EricNeural",
    "Male_2": "en-US-GuyNeural",
    "Male_3": "en-CA-LiamNeural",
    "Male_IND": "en-IN-PrabhatNeural",
    "Female_1": "en-US-AvaMultilingualNeural",
    "Female_2": "en-US-JennyNeural",
    "Female_3": "en-US-EmmaMultilingualNeural",
    "Female_4": "en-AU-NatashaNeural",
    "Female_IND": "en-IN-NeerjaExpressiveNeural",
    "Female_IND2": "en-IN-NeerjaNeural",
}
10 Azure Neural Voices:
| Voice ID | Azure Voice Name | Gender | Accent | Notes |
|---|---|---|---|---|
| Male_1 | en-US-EricNeural | Male | US English | Standard male voice |
| Male_2 | en-US-GuyNeural | Male | US English | Alternative male |
| Male_3 | en-CA-LiamNeural | Male | Canadian English | Canadian accent |
| Male_IND | en-IN-PrabhatNeural | Male | Indian English | Indian accent |
| Female_1 | en-US-AvaMultilingualNeural | Female | US English | Multilingual |
| Female_2 | en-US-JennyNeural | Female | US English | Default female (most used) |
| Female_3 | en-US-EmmaMultilingualNeural | Female | US English | Multilingual |
| Female_4 | en-AU-NatashaNeural | Female | Australian English | Australian accent |
| Female_IND | en-IN-NeerjaExpressiveNeural | Female | Indian English | Expressive style |
| Female_IND2 | en-IN-NeerjaNeural | Female | Indian English | Standard Indian |
Selection Logic: Frontend sends voice ID (e.g., "Female_2"), backend maps to Azure voice name ("en-US-JennyNeural")
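For example (a sketch using the dictionary above; the fallback default shown here is an assumption):
voice_id = "Female_2"  # voice ID sent by the frontend
azure_voice = SUPPORTED_VOICES.get(voice_id, "en-US-JennyNeural")  # assumed default fallback
# azure_voice -> "en-US-JennyNeural"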
Rhubarb Executable Detection (Lines 117-134)¶
Cross-Platform Support:
system = platform.system().lower()
current_dir = os.path.dirname(os.path.abspath(__file__))

if system == "windows":
    rhubarbExePath = os.path.join(current_dir, "Rhubarb-Lip-Sync-1.13.0-Windows", "rhubarb.exe")
elif system == "darwin":  # macOS
    rhubarbExePath = os.path.join(current_dir, "Rhubarb-Lip-Sync-1.13.0-macOS", "rhubarb")
elif system == "linux":
    rhubarbExePath = os.path.join(current_dir, "Rhubarb", "rhubarb")
else:
    raise RuntimeError(f"Unsupported platform: {system}")

# Verify executable exists
if not os.path.exists(rhubarbExePath):
    raise RuntimeError(f"Rhubarb executable not found at: {rhubarbExePath}")

logger.debug(f"Rhubarb executable path: {rhubarbExePath}")
3 Platform Paths:
- Windows: Rhubarb-Lip-Sync-1.13.0-Windows/rhubarb.exe
- macOS: Rhubarb-Lip-Sync-1.13.0-macOS/rhubarb
- Linux: Rhubarb/rhubarb
Validation: Service fails to start if Rhubarb executable not found
Token Counting Implementation (Lines 385-390)¶
# Initialize tokenizer (for OpenAI models)
tokenizer = tiktoken.get_encoding("cl100k_base")  # GPT-4, GPT-3.5-Turbo encoding

def count_tokens(text):
    """Counts the number of tokens in a given text."""
    return len(tokenizer.encode(text))
Tokenizer: cl100k_base - Used by GPT-4 and GPT-3.5-Turbo (GPT-4o uses the newer o200k_base encoding)
Usage:
input_tokens = count_tokens("What are your pricing options?")
# Returns: 5 tokens
output_tokens = count_tokens("We offer three plans: Basic at $10/mo, Pro at $50/mo, and Enterprise with custom pricing.")
# Returns: ~22 tokens
total_tokens = input_tokens + output_tokens # 27 tokens
Why Important?
- Azure OpenAI charges per token ($0.03/1K tokens for GPT-4)
- Tracking enables cost analysis per user/project
- Session-level token counting for usage limits
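A hedged sketch of how these counts translate into cost tracking, using the GPT-4 rate quoted above (actual billing depends on the deployed model and separate input/output rates):
GPT4_PRICE_PER_1K_TOKENS = 0.03  # USD, as quoted above for GPT-4

def estimate_cost(input_tokens: int, output_tokens: int) -> float:
    """Rough per-request cost estimate based on total token usage."""
    total_tokens = input_tokens + output_tokens
    return round(total_tokens / 1000 * GPT4_PRICE_PER_1K_TOKENS, 6)

print(estimate_cost(5, 22))  # 27 tokens -> 0.00081 USD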
Chat History Save Function (Lines 648-708)¶
Complete Implementation:
def save_chat_history(user_id, project_id, session_id, input_prompt, output_response,
                      originating_url, enhanced_question=None, results=None, utm_config_id=None):
    """Save chat history with token usage, UTM parameters, and UTM config info in MongoDB."""
    # IST timezone conversion
    local_tz = pytz.timezone("Asia/Kolkata")
    current_time = datetime.utcnow().replace(tzinfo=pytz.utc).astimezone(local_tz)

    # Compute token counts
    input_tokens = count_tokens(input_prompt)
    output_tokens = count_tokens(output_response)
    total_tokens = input_tokens + output_tokens

    # Extract UTM parameters from URL
    utm_data = extract_utm_parameters(originating_url)

    chat_entry = {
        "input_prompt": input_prompt,
        "enhanced_question": enhanced_question if enhanced_question else input_prompt,
        "output_response": output_response,
        "Similar Vectors": results,  # RAG search results
        "timestamp": current_time.strftime("%Y-%m-%d %H:%M:%S"),
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
        "total_tokens": total_tokens,
        "originating_url": originating_url,
        "utm_config_id": utm_config_id  # Track which UTM config was used
    }

    # Find existing session
    existing_session = history_collection.find_one({
        "user_id": user_id,
        "project_id": project_id,
        "session_id": session_id
    })

    if existing_session:
        # Append to existing session
        history_collection.update_one(
            {"user_id": user_id, "project_id": project_id, "session_id": session_id},
            {
                "$push": {"chat_data": chat_entry},
                "$set": {"datetime": current_time.strftime("%Y-%m-%d %H:%M:%S")},
                "$inc": {"session_total_tokens": total_tokens}  # Increment session total
            }
        )
    else:
        # Create new session
        new_session = {
            "user_id": user_id,
            "project_id": project_id,
            "session_id": session_id,
            "originating_url": originating_url,
            "datetime": current_time.strftime("%Y-%m-%d %H:%M:%S"),
            "session_total_tokens": total_tokens,
            "chat_data": [chat_entry],
        }
        # Add UTM data if available
        if utm_data:
            new_session["utm_data"] = utm_data
            logger.info(f"UTM parameters detected and stored: {utm_data}")
        history_collection.insert_one(new_session)
Chat Entry Structure:
{
    "input_prompt": "What are your pricing options?",
    "enhanced_question": "What features and pricing are available for your cloud hosting plans?",  // From query enhancement
    "output_response": "We offer three plans...",
    "Similar Vectors": [{"content": "...", "similarity": 0.85}, ...],  // RAG results
    "timestamp": "2025-12-29 11:30:45",  // IST
    "input_tokens": 5,
    "output_tokens": 22,
    "total_tokens": 27,
    "originating_url": "https://example.com/pricing?utm_source=google",
    "utm_config_id": ObjectId("...")  // If UTM targeting was used
}
Session Document:
{
    "user_id": "User-123456",
    "project_id": "User-123456_Project_1",
    "session_id": "session-abc123",
    "originating_url": "https://example.com/pricing?utm_source=google&utm_campaign=summer",
    "datetime": "2025-12-29 11:30:45",  // Last message timestamp (IST)
    "session_total_tokens": 150,  // Cumulative tokens for entire session
    "utm_data": {  // Extracted from first message URL
        "utm_source": "google",
        "utm_campaign": "summer"
    },
    "chat_data": [/* array of chat entries */]
}
Key Features:
- IST Timestamps: All timestamps in Asia/Kolkata timezone
- Token Tracking: Per-message AND per-session cumulative
- UTM Tracking: Preserved for analytics
- Query Enhancement Tracking: Stores both original and enhanced questions
- RAG Context: Saves similar vectors for debugging
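A usage sketch of the function above (argument values are illustrative, taken from the example documents):
save_chat_history(
    user_id="User-123456",
    project_id="User-123456_Project_1",
    session_id="session-abc123",
    input_prompt="What are your pricing options?",
    output_response="We offer three plans...",
    originating_url="https://example.com/pricing?utm_source=google",
    enhanced_question="What features and pricing are available for your cloud hosting plans?",
    results=[{"content": "...", "similarity": 0.85}],
    utm_config_id=None,
)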
UTM Helper Functions¶
1. Extract UTM Parameters (Lines 392-422)¶
def extract_utm_parameters(url: str) -> Optional[Dict[str, str]]:
    """Extract UTM parameters from a URL."""
    if not url:
        return None
    try:
        parsed_url = urlparse(url)
        query_params = parse_qs(parsed_url.query)
        # Extract all parameters starting with 'utm_'
        utm_data = {}
        for key, value in query_params.items():
            if key.startswith('utm_'):
                utm_data[key] = value[0] if value else ""  # parse_qs returns lists
        return utm_data if utm_data else None
    except Exception as e:
        logger.warning(f"Failed to extract UTM parameters: {e}")
        return None
Example:
url = "https://example.com/pricing?utm_source=google&utm_campaign=summer&product=pro"
result = extract_utm_parameters(url)
# Returns: {"utm_source": "google", "utm_campaign": "summer"}
# Note: "product" is ignored (not a UTM parameter)
2. Extract Base URL (Lines 425-445)¶
def extract_base_url(url: str) -> Optional[str]:
    """Extract base URL (scheme + netloc + path) without query parameters."""
    if not url:
        return None
    try:
        parsed = urlparse(url)
        base_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
        return base_url.rstrip('/')  # Remove trailing slash
    except Exception as e:
        logger.warning(f"Failed to extract base URL: {e}")
        return None
Example:
url = "https://example.com/pricing?utm_source=google#section"
result = extract_base_url(url)
# Returns: "https://example.com/pricing"
3. Calculate Match Score (Lines 448-517)¶
Scoring System:
- Target URL match: +10 points
- Each matching UTM parameter: +2 points
- Target URL only (no UTM params): +5 points
- UTM parameter mismatch: -1 (reject)
def calculate_match_score(originating_url: str, config: Dict[str, Any]) -> float:
    """Calculate UTM config match score."""
    score = 0.0
    url_params = extract_utm_parameters(originating_url)
    config_params = config.get('utm_config', {})
    config_target_url = config.get('target_url', '').strip()

    # Check Target URL match
    if config_target_url:
        originating_base = extract_base_url(originating_url)
        if originating_base and originating_base.startswith(config_target_url.rstrip('/')):
            score += 10
            logger.debug(f"Target URL match: +10")

    # Check UTM parameters
    if config_params and url_params:
        for key in ['source', 'medium', 'campaign', 'content', 'term']:
            config_value = config_params.get(key)
            if config_value:
                url_key = f'utm_{key}'
                if url_params.get(url_key) == config_value:
                    score += 2  # Match!
                else:
                    return -1  # Mismatch - reject this config

    # Special case: Target URL only (no UTM requirements)
    if config_target_url and not config_params:
        score += 5

    return score
Example Scoring:
| Scenario | URL | Config | Score |
|---|---|---|---|
| Perfect match | example.com/pricing?utm_source=google&utm_campaign=summer | Target: example.com/pricing, UTM: source=google, campaign=summer | 10 + 2 + 2 = 14 |
| URL match only | example.com/pricing?other=param | Target: example.com/pricing | 10 + 5 = 15 |
| UTM mismatch | example.com/pricing?utm_source=facebook | Target: example.com/pricing, UTM: source=google | -1 (rejected) |
| No match | other.com/pricing | Target: example.com/pricing | 0 |
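A sketch of how the score could be used to pick the winning UTM config (the selection logic shown here is an assumption, not the service's exact code):
def select_best_utm_config(originating_url, configs):
    """Return the highest-scoring config, ignoring rejected (-1) and zero-score configs."""
    best_config, best_score = None, 0
    for config in configs:
        score = calculate_match_score(originating_url, config)
        if score > best_score:
            best_config, best_score = config, score
    return best_config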
Complete System Prompts Dictionary¶
Lines 296-383 - Hardcoded prompt templates
3 Chatbot Types:
1. Sales-Agent (Lines 297-318)¶
Character Limit: 80-130 characters
Key Rules:
- Product recommendations and upselling
- Concise, persuasive responses
- Form trigger: On 4th user message, say "Could you please provide some details by filling the form?"
- Must reference chat history for continuity
- End response with question or offer for assistance
- Open-ended questions for engagement
- If no answer in datasource: "We dont provide this service, please contact our website"
Response Guidelines:
- Natural, engaging, human-like (not robotic)
- Warmth, friendliness, personality
- Check conversation position (if ending, make proper ending)
- Don't repeat same answer
- Handle vague queries ("yes", "ok") by referring back
- Focus ONLY on services/expertise
Example Prompt:
"""
You are a highly skilled AI sales agent designed to assist users with product recommendations, upselling,
and handling sales-related inquiries. Your primary objective is to understand customer needs and provide
concise, persuasive, and informative responses that drive sales conversions.
Format responses is A concise, contextually relevant response between 80 to 130 characters. Do not go more than this.
the fourth user message (after 3 prior user inputs), prompt the user by saying: "Could you please provide some details by filling the form?"
If the user query is vague (like "yes", "ok"), refer back to the previous assistant message for context and build a proper response. Always use prior messages for continuity.
You should properly able to get with historical converstation and able to judge the converstation position, if its ending make a proper ending. And Strictly dont repeat the same answer.
Response Generation Guidelines:
Your responses must sound as **natural, engaging, and human-like as possible**. Avoid robotic or overly structured replies. Instead, use **warmth, friendliness, and a touch of personality**—just like a real sales assistant would in a conversation
1. Keep the conversation flowing by ending your response with a question or an offer for further assistance.
2. On the fourth user message (after 3 prior user inputs), prompt the user by saying: "Could you please provide some details by filling the form?"
3. Responses should be relevant and connected to the ongoing chat history, ensuring coherence and logical flow.
4. Format responses is A concise, contextually relevant response between 80 to 130 characters. Do not go more than this.
5. Always refer the old chathistory conversation and answer the question correctly based on old data
6. Focus only on services and expertise, avoiding engage in unrelated topics.
7. you should able to get the chatbot name from the data and if they ask your name you should say "I am [Chatbot Name] virtual assistant ".
8. Add open-ended question sometimes to response for better engagement and conversation flowing.
9. If your are not able to find any answer in datasource, "We dont provide this service, please contact our website for more information" and provide the website link.
"""
2. Service-Agent (Lines 322-360)¶
Character Limit: 170-200 characters
Key Responsibilities:
- Service-related inquiries, technical support, troubleshooting
- Clear, structured, concise answers
- Professional, empathetic tone
Response Structure:
- Acknowledgment of issue (1 sentence)
- Solution/next steps (step-by-step format)
- Relevant service options/resources
- Call-to-action or follow-up question
Troubleshooting Approach:
- Ask targeted diagnostic questions
- Provide numbered step-by-step solutions
- Include visual cues ("look for the blue button labeled 'Settings'")
- Offer alternative solutions
- Explain when professional intervention needed
Conversation Flow:
1. Issue identification
2. Solution presentation
3. Verification/next steps
4. Additional help options
Redirect for non-service queries:
"I'm specialized in helping with services and technical support. For that topic, you might want to check with our general information team. How can I assist with your service needs today?"
3. Custom-Agent (Lines 361-382)¶
Character Limit: 200-300 characters
Key Responsibilities:
- Informational agent for products, services, policies
- Accurate, relevant, concise information
- Professional yet conversational tone
Response Structure:
- Direct answer (1-2 sentences)
- Additional context (if applicable)
- Concise within 200-300 characters
- Next step/call-to-action
Query Handling:
- Product/Service: Name, key features, pricing, unique selling points
- Policy: Clear summary without legal jargon
- Comparison: Balanced view, highlight key differences
- Unknown info: "I don't have that specific information" + offer related topics
Redirect for unrelated queries:
"I'm here to help with questions about products and services. What would you like to know about those?"
Prompt Selection Logic:¶
chatbot_purpose = chatbot_config.get("chatbot_purpose", "Sales-Agent")
system_prompt = system_prompts.get(chatbot_purpose, system_prompts["Sales-Agent"])
# Sales-Agent used as fallback if purpose not found
Why 3 prompts? Different use cases require different conversation styles:
- Sales: Persuasive, driving conversions
- Service: Problem-solving, technical support
- Custom/Informational: Educational, policy explanations
Recommendations¶
CRITICAL (Security)¶
- ⚠️ Move ALL API Keys to Environment (10 hardcoded keys!)
- ⚠️ Fix File Naming - Use UUID to prevent collisions
- ⚠️ Add Rhubarb Health Check - Verify executable on startup
- ⚠️ Implement Guardrails - Activate content safety checks
Performance¶
- Optimize Lip-Sync - Run Rhubarb in parallel with other tasks
- Cache TTS + Lip-Sync - Same text = reuse audio + animation
- Streaming Implementation - Implement full streaming support
- Async Everything - Convert synchronous operations to async
Code Quality¶
- Split Service - Consider separating lip-sync generation
- Remove Unused Models - 11 models, likely only 2-3 used
- Add Type Hints - Complete typing for 2,534 lines
- Comprehensive Tests - Unit tests for lip-sync pipeline
Last Updated: 2025-12-26
Code Version: response-3d-chatbot-service/src/main.py (2,534 lines)
Total Functions: 81
Total Endpoints: 3+
Review Cycle: WEEKLY (CRITICAL SERVICE)
"The flagship experience - where AI meets immersive 3D interaction."