Shared Module¶
Overview¶
The Shared Module is the core library that provides standardized infrastructure for all 20+ backend microservices. It contains common utilities for logging, database abstractions, vector embeddings, and Azure Blob Storage operations. The module ensures consistency, reduces code duplication, and serves as a single source of truth for infrastructure patterns.
Path: machineagents-be/shared/
Purpose: Shared utilities and abstractions for all microservices
Components: 3 main modules (logging, database, storage)
Architecture¶
graph TB
subgraph "All 20+ Microservices"
Service1[Auth Service]
Service2[User Service]
Service3[Create Chatbot]
ServiceN[... 17 more services]
end
subgraph "Shared Module"
Logger[logger.py<br/>Standardized Logging]
subgraph "database/"
CosmosClient[cosmos_client.py<br/>CosmosDB Wrapper]
DBManager[db_manager.py<br/>DEPRECATED Manager]
MilvusService[milvus_embeddings_service.py<br/>Partition-Based Embeddings]
end
subgraph "storage/"
BlobService[azure_blob_service.py<br/>Azure Blob Storage]
end
end
Service1 --> Logger
Service1 --> CosmosClient
Service1 --> MilvusService
Service1 --> BlobService
Service2 --> Logger
Service2 --> CosmosClient
Service2 --> MilvusService
Service2 --> BlobService
Service3 --> Logger
Service3 --> CosmosClient
Service3 --> MilvusService
Service3 --> BlobService
ServiceN --> Logger
ServiceN --> CosmosClient
ServiceN --> MilvusService
ServiceN --> BlobService
CosmosClient -->|MongoDB Protocol| CosmosDB[(CosmosDB<br/>Metadata)]
MilvusService -->|Vector API| Milvus[(Milvus<br/>Embeddings)]
BlobService -->|REST API| Blob[Azure Blob<br/>Files]
style Logger fill:#4CAF50,color:#fff
style CosmosClient fill:#2196F3,color:#fff
style MilvusService fill:#FF9800,color:#fff
style BlobService fill:#9C27B0,color:#fff
style DBManager fill:#757575,color:#fff
Directory Structure¶
shared/
├── __init__.py
├── .env # Shared environment configuration
├── logger.py # Standardized logging (108 lines)
├── database/
│ ├── __init__.py # Database module exports
│ ├── cosmos_client.py # CosmosDB wrapper (68 lines)
│ ├── db_manager.py # DEPRECATED manager (130 lines)
│ └── milvus_embeddings_service.py # Milvus partition service (492 lines)
└── storage/
├── __init__.py # Storage module exports
└── azure_blob_service.py # Azure Blob operations (232 lines)
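A typical service pulls everything it needs from these three modules. The imports below are taken from the usage examples later in this document:

```python
from shared.logger import get_logger, truncate_for_log
from shared.database import CosmosClient, get_milvus_embeddings_service
from shared.storage import get_azure_blob_service
```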
Component 1: Standardized Logging (logger.py)¶
Purpose: Provides consistent logging configuration across all microservices.
Lines of Code: 108
Key Functions: 3
API Reference¶
1. truncate_for_log(text: str, max_length: int = 100) -> str¶
Purpose: Truncate text for logging to avoid excessive log sizes.
Parameters:
- text (str): Text to truncate
- max_length (int, default=100): Maximum length before truncation
Returns: Truncated text with indication of total length
Example:
from shared.logger import truncate_for_log
long_text = "A" * 500
result = truncate_for_log(long_text, max_length=100)
# result == "A" * 100 + "... (500 chars total)"
Implementation (Lines 18-34):
def truncate_for_log(text: str, max_length: int = 100) -> str:
if not text:
return ""
text_str = str(text)
if len(text_str) <= max_length:
return text_str
return text_str[:max_length] + f"... ({len(text_str)} chars total)"
2. get_logger(...) -> logging.Logger¶
Purpose: Get a configured logger for a service with console and optional file output.
Parameters:
- service_name (str): Name of the service (used as the logger name)
- log_file (str, optional): Optional log file name (defaults to None)
- log_level (int, default=logging.INFO): Logging level
- log_dir (str, default="."): Directory for log files
Returns: Configured logging.Logger instance
Example:
from shared.logger import get_logger
logger = get_logger(service_name="auth-service", log_file="auth.log")
logger.info("Service started successfully")
logger.error("Failed to connect to database")
Implementation (Lines 37-92):
def get_logger(
service_name: str,
log_file: Optional[str] = None,
log_level: int = logging.INFO,
log_dir: str = "."
) -> logging.Logger:
# Use service name as logger name
logger = logging.getLogger(service_name)
# Only configure if not already configured (avoid duplicate handlers)
if logger.handlers:
return logger
logger.setLevel(log_level)
# Create formatter
formatter = logging.Formatter(
fmt="%(asctime)s | %(levelname)-8s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
# Console handler (stdout)
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
# File handler (if log_file specified)
if log_file:
try:
# Create log directory if it doesn't exist
log_path = Path(log_dir)
log_path.mkdir(parents=True, exist_ok=True)
file_handler = logging.FileHandler(log_path / log_file, encoding='utf-8')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
except Exception as e:
# If file logging fails, just log to console
logger.warning(f"Could not create file handler for {log_file}: {e}")
# Prevent propagation to root logger to avoid duplicate logs
logger.propagate = False
return logger
Log Format:
2024-03-15 14:30:22 | INFO | ✓ Connected to MongoDB: Machine_agent_dev
2024-03-15 14:30:23 | WARNING | Quota limit exceeded
2024-03-15 14:30:25 | ERROR | ✗ Failed to fetch data: Connection timeout
Key Features:
- Prevents duplicate handlers (Lines 59-60): Checks if logger already configured
- Console + file logging (Lines 71-84): Dual output channels
- Prevents propagation (Line 90): Avoids duplicate logs from root logger
- Graceful degradation (Lines 85-87): Falls back to console-only if file logging fails
3. setup_basic_logging(...) -> logging.Logger¶
Purpose: Quick setup for basic logging (file + console). Convenience wrapper for get_logger.
Implementation (Lines 95-107):
def setup_basic_logging(service_name: str, log_file: str = "data.log"):
return get_logger(service_name=service_name, log_file=log_file)
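A minimal usage sketch (the service name is illustrative):

```python
from shared.logger import setup_basic_logging

# Equivalent to get_logger(service_name="auth-service", log_file="data.log")
logger = setup_basic_logging("auth-service")
logger.info("Service started")
```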
Component 2: Database Module (database/)¶
2.1 CosmosDB Client (cosmos_client.py)¶
Purpose: Unified wrapper for MongoDB/CosmosDB operations.
Lines of Code: 68
Classes: 2
Class: CosmosCollectionWrapper¶
Purpose: Wrapper class providing unified interface for CosmosDB collections.
Methods:
| Method | Purpose | Signature |
|---|---|---|
| insert_one | Insert a single document | (document: Dict) |
| find_one | Find a single document | (query: Dict, projection: Dict = None) |
| find | Find multiple documents | (query: Dict, projection: Dict = None, limit: int = 100) |
| update_one | Update a single document | (query: Dict, update: Dict, upsert: bool = False) |
| delete_many | Delete multiple documents | (query: Dict) |
| count_documents | Count documents matching query | (query: Dict) |
Example Usage:
from shared.database import CosmosClient
# Initialize client
cosmos = CosmosClient()
# Get collection wrapper
users = cosmos["users_multichatbot_v2"]
# Insert
users.insert_one({"email": "user@example.com", "name": "John Doe"})
# Find
user = users.find_one({"email": "user@example.com"})
# Update
users.update_one(
{"email": "user@example.com"},
{"$set": {"name": "Jane Doe"}},
upsert=True
)
# Count
count = users.count_documents({"subscription_plan": "enterprise"})
Class: CosmosClient¶
Purpose: Wrapper for MongoDB/CosmosDB client with connection management.
Constructor (Lines 50-55):
def __init__(self, mongo_uri: str = None, db_name: str = None):
self.mongo_uri = mongo_uri or os.getenv("MONGO_URI")
self.db_name = db_name or os.getenv("MONGO_DB_NAME", "Machine_agent_dev")
self.client = MongoClient(self.mongo_uri)
self.db = self.client[self.db_name]
logger.info(f"Connected to CosmosDB: {self.db_name}")
Methods:
- get_collection(name: str) -> CosmosCollectionWrapper (Lines 57-59) - Get a collection wrapper by name
- __getitem__(name: str) -> CosmosCollectionWrapper (Lines 61-63) - Allow dict-like access: cosmos["collection_name"]
- list_collection_names() -> list (Lines 65-67) - List all collections in the database
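The explicit getter and dict-like access are interchangeable; a minimal sketch reusing the collection name from the example above:

```python
from shared.database import CosmosClient

cosmos = CosmosClient()
users = cosmos.get_collection("users_multichatbot_v2")  # explicit accessor
same = cosmos["users_multichatbot_v2"]                  # dict-like access
print(cosmos.list_collection_names())                   # all collections in the database
```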
2.2 Database Manager (db_manager.py) [DEPRECATED]¶
> [!WARNING]
> DEPRECATED: This class is kept for backward compatibility with existing endpoints. New code should use CosmosDB directly via pymongo.MongoClient.
Purpose: Provides database abstraction layer (now just routes to CosmosDB).
Lines of Code: 130
Status: ⚠️ DEPRECATED
Original Purpose (from docstring):
In the old architecture:
- Milvus stored both data AND embeddings
- Database selection logic (Milvus vs CosmosDB)
In the new architecture:
- CosmosDB stores all metadata and data
- Milvus stores only vector embeddings (via MilvusEmbeddingsService)
- Azure Blob Storage stores files
Class Methods:
- get_collection(...) (Lines 40-59) - Returns CosmosDB collection (ignores all parameters for backward compatibility)
- get_collection_with_fallback(...) (Lines 61-87) - Returns (collection, data, "cosmosdb") (fallback logic removed)
- find_with_fallback(...) (Lines 89-117) - Returns (documents, "cosmosdb") (fallback logic removed)
Singleton Pattern (Lines 120-129):
_db_manager = None
def get_db_manager() -> DatabaseManager:
"""Get global database manager instance (singleton)"""
global _db_manager
if _db_manager is None:
_db_manager = DatabaseManager()
return _db_manager
⚠️ Migration Path:
Instead of:
from shared.database import get_db_manager
db_manager = get_db_manager()
collection = db_manager.get_collection("users_multichatbot_v2")
Use:
from shared.database import CosmosClient
cosmos = CosmosClient()
collection = cosmos["users_multichatbot_v2"]
2.3 Milvus Embeddings Service (milvus_embeddings_service.py)¶
Purpose: Partition-based vector embeddings storage in Milvus for multi-tenancy.
Lines of Code: 492
Architecture: Each chatbot (project_id) gets its own partition for optimal performance and isolation
> [!IMPORTANT]
> Partition-Based Architecture: This service implements a partition-per-project strategy where each project_id becomes a separate partition in Milvus. This provides:
> - Isolation: Data from different projects is physically separated
> - Performance: Searches are scoped to a single partition (faster)
> - Deletion: An entire project can be deleted by dropping its partition (instant)
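A hedged pymilvus sketch of the get-or-create step behind this strategy (collection and partition names are illustrative; the service wraps these calls internally):

```python
from pymilvus import Collection, connections

# Assumption: host/port come from the shared .env (MILVUS_HOST/MILVUS_PORT)
connections.connect(host="localhost", port="19530")
collection = Collection("embeddings")
partition_name = "Project_789"  # sanitized project_id (hyphens -> underscores)

# Create the per-project partition on first use
if not collection.has_partition(partition_name):
    collection.create_partition(partition_name)
```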
Schema Definition¶
Collection Fields (Lines 72-83):
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
FieldSchema(name="document_id", dtype=DataType.VARCHAR, max_length=100),
FieldSchema(name="user_id", dtype=DataType.VARCHAR, max_length=100),
FieldSchema(name="project_id", dtype=DataType.VARCHAR, max_length=100),
FieldSchema(name="chunk_index", dtype=DataType.INT32),
FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=2000),
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=384), # BAAI/bge-small-en-v1.5
FieldSchema(name="data_type", dtype=DataType.VARCHAR, max_length=50), # pdf, text, qa, org, url
FieldSchema(name="source_url", dtype=DataType.VARCHAR, max_length=500),
FieldSchema(name="created_at", dtype=DataType.VARCHAR, max_length=100),
]
Vector Index (Lines 89-94):
index_params = {
"metric_type": "L2", # Euclidean distance
"index_type": "IVF_FLAT", # Inverted file index
"params": {"nlist": 128} # 128 clusters
}
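For context, a sketch of how such an index is typically attached to the vector field in pymilvus (the exact call site inside the service is an assumption):

```python
# Build the IVF_FLAT index on the embedding field, then load for search
collection.create_index(field_name="embedding", index_params=index_params)
collection.load()
```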
Key Methods¶
1. insert_embeddings(...) -> List[int] (Lines 146-236)
Purpose: Insert embeddings into Milvus partition (partition-based architecture).
Parameters:
- collection_name (str): Name of the collection (e.g., 'embeddings')
- embeddings_data (List[Dict]): List of embeddings with fields:
  - document_id (str): Document identifier
  - user_id (str): User identifier
  - project_id (str): Project identifier (used as partition name)
  - chunk_index (int): Chunk position in document
  - text (str): Text content (max 2000 chars)
  - embedding (List[float]): 384-dim vector
  - data_type (str): Type (pdf, text, qa, org, url)
  - source_url (str, optional): URL for website crawl data
Returns: List of Milvus IDs for the inserted embeddings
Validation (Lines 182-184):
# Verify all embeddings have the same project_id
if not all(e.get('project_id') == project_id for e in embeddings_data):
raise ValueError("All embeddings in a batch must have the same project_id")
Example:
from shared.database import get_milvus_embeddings_service
milvus = get_milvus_embeddings_service()
embeddings_data = [
{
"document_id": "doc_123",
"user_id": "User_456",
"project_id": "Project_789",
"chunk_index": 0,
"text": "This is the first chunk of the document...",
"embedding": [0.1, 0.2, ..., 0.384], # 384-dim vector
"data_type": "pdf",
"source_url": ""
},
# ... more chunks
]
milvus_ids = milvus.insert_embeddings("embeddings", embeddings_data)
# Returns: [449834597376389120, 449834597376389121, ...]
Process Flow:
1. Get collection
2. Extract project_id from first embedding
3. Validate all embeddings have same project_id
4. Sanitize partition name (replace "-" with "_")
5. Get or create partition
6. Prepare data arrays
7. Insert into partition
8. Flush to persist
9. Reload partition for immediate querying
10. Return Milvus IDs
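A condensed pymilvus sketch of steps 5-10 (column order follows the schema above; the actual implementation may differ in details):

```python
from pymilvus import Partition

# 5. Get or create the partition (name already sanitized)
if not collection.has_partition(partition_name):
    collection.create_partition(partition_name)

# 6-7. Prepare column-ordered arrays (id is auto-generated) and insert
data = [
    [e["document_id"] for e in embeddings_data],
    [e["user_id"] for e in embeddings_data],
    [e["project_id"] for e in embeddings_data],
    [e["chunk_index"] for e in embeddings_data],
    [e["text"] for e in embeddings_data],
    [e["embedding"] for e in embeddings_data],
    [e["data_type"] for e in embeddings_data],
    [e.get("source_url", "") for e in embeddings_data],
    [e.get("created_at", "") for e in embeddings_data],
]
result = collection.insert(data, partition_name=partition_name)

# 8. Flush to persist; 9. reload the partition for immediate querying
collection.flush()
Partition(collection, partition_name).load()

# 10. Milvus IDs handed back to the caller
milvus_ids = list(result.primary_keys)
```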
2. search_embeddings(...) -> List[Dict] (Lines 238-316)
Purpose: Search for similar embeddings within a specific partition.
Parameters:
- collection_name (str): Collection name
- query_vector (List[float]): Query embedding vector (384-dim)
- user_id (str): User ID (for logging/validation)
- project_id (str): Project ID (used as partition name)
- top_k (int, default=5): Number of results to return
- milvus_ids (List[int], optional): Specific Milvus IDs to search within
Returns: List of search results with {milvus_id, document_id, chunk_index, text, data_type, distance, score}
Example:
query_vector = [0.1, 0.2, ..., 0.384] # 384-dim vector from encoder
results = milvus.search_embeddings(
collection_name="embeddings",
query_vector=query_vector,
user_id="User_456",
project_id="Project_789",
top_k=5
)
# Results:
# [
# {
# 'milvus_id': 449834597376389120,
# 'document_id': 'doc_123',
# 'chunk_index': 5,
# 'text': 'Relevant text content...',
# 'data_type': 'pdf',
# 'distance': 0.234,
# 'score': 0.810 # 1 / (1 + distance)
# },
# ...
# ]
Search Parameters (Line 286): the search uses the collection's L2 metric configured above.
Search Scoping (Line 294): the query is restricted to the partition named after the sanitized project_id, so other projects' vectors are never scanned.
Score Calculation (Line 308): score = 1 / (1 + distance), converting L2 distance into a similarity in (0, 1].
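A hedged pymilvus sketch of the scoped search and the distance-to-score conversion (the nprobe value and output fields are assumptions):

```python
hits = collection.search(
    data=[query_vector],
    anns_field="embedding",
    param={"metric_type": "L2", "params": {"nprobe": 10}},
    limit=top_k,
    partition_names=[partition_name],  # scope to one project's partition
    output_fields=["document_id", "chunk_index", "text", "data_type"],
)
results = [
    {
        "milvus_id": hit.id,
        "document_id": hit.entity.get("document_id"),
        "chunk_index": hit.entity.get("chunk_index"),
        "text": hit.entity.get("text"),
        "data_type": hit.entity.get("data_type"),
        "distance": hit.distance,
        "score": 1 / (1 + hit.distance),  # L2 distance -> similarity in (0, 1]
    }
    for hit in hits[0]
]
```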
3. delete_embeddings_by_document(...) -> bool (Lines 318-364)
Purpose: Delete all embeddings for a specific document within a partition.
Parameters:
- collection_name (str): Collection name
- document_id (str): Document ID to delete
- project_id (str, optional): Project ID (partition name). If not provided, searches all partitions (slower)
Example:
# Delete specific document from specific partition (fast)
success = milvus.delete_embeddings_by_document(
collection_name="embeddings",
document_id="doc_123",
project_id="Project_789"
)
# Delete from all partitions (slow, if project_id unknown)
success = milvus.delete_embeddings_by_document(
collection_name="embeddings",
document_id="doc_123"
)
4. delete_embeddings_by_user_project(...) -> bool (Lines 366-423)
Purpose: Delete all embeddings for a user/project by dropping the entire partition (very fast!).
Parameters:
- collection_name (str): Collection name
- user_id (str): User ID (for logging/validation)
- project_id (str): Project ID (partition name)
- data_type (str, optional): If provided, uses scalar deletion instead of partition drop
Example:
# Drop entire partition (instant deletion of all project data)
success = milvus.delete_embeddings_by_user_project(
collection_name="embeddings",
user_id="User_456",
project_id="Project_789"
)
# Delete only specific data type (e.g., only PDFs)
success = milvus.delete_embeddings_by_user_project(
collection_name="embeddings",
user_id="User_456",
project_id="Project_789",
data_type="pdf"
)
Performance:
- Partition drop: Instant (milliseconds) - entire partition deleted
- Scalar deletion: Slow (seconds) - iterates through vectors matching filter
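The two paths, sketched with pymilvus (the filter expression is an assumption based on the schema; the service handles load/release state internally):

```python
from pymilvus import Partition

# Fast path: drop the whole per-project partition
# (a loaded partition may need to be released first)
Partition(collection, partition_name).release()
collection.drop_partition(partition_name)

# Slow path (data_type given): scalar deletion inside the partition
collection.delete('data_type == "pdf"', partition_name=partition_name)
```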
5. get_collection_stats(...) -> Dict (Lines 425-478)
Purpose: Get statistics for a collection or specific partition.
Example:
# Get stats for specific partition
stats = milvus.get_collection_stats("embeddings", project_id="Project_789")
# {
# 'collection_name': 'embeddings',
# 'partition_name': 'Project_789',
# 'exists': True,
# 'total_embeddings': 1523
# }
# Get stats for entire collection
stats = milvus.get_collection_stats("embeddings")
# {
# 'collection_name': 'embeddings',
# 'total_embeddings': 45678,
# 'status': 'loaded',
# 'num_partitions': 147,
# 'partitions': [
# {'name': 'Project_001', 'num_entities': 234},
# {'name': 'Project_002', 'num_entities': 567},
# ...
# ]
# }
Partition Name Sanitization¶
Method: _sanitize_partition_name(...) -> str (Lines 102-119)
Purpose: Milvus partition names can only contain numbers, letters, and underscores. This method replaces hyphens with underscores.
Example: "Project-789" -> "Project_789"
Code:
def _sanitize_partition_name(self, partition_name: str) -> str:
# Replace hyphens with underscores
sanitized = partition_name.replace('-', '_')
if sanitized != partition_name:
logger.debug(f"Sanitized partition name: '{partition_name}' -> '{sanitized}'")
return sanitized
Singleton Pattern¶
Function: get_milvus_embeddings_service(...) -> MilvusEmbeddingsService (Lines 485-490)
_milvus_embeddings_service = None
def get_milvus_embeddings_service(host: str = None, port: str = None) -> MilvusEmbeddingsService:
"""Get or create Milvus Embeddings Service singleton"""
global _milvus_embeddings_service
if _milvus_embeddings_service is None:
_milvus_embeddings_service = MilvusEmbeddingsService(host, port)
return _milvus_embeddings_service
Usage:
from shared.database import get_milvus_embeddings_service
# First call creates the singleton
milvus = get_milvus_embeddings_service()
# Subsequent calls return the same instance
milvus2 = get_milvus_embeddings_service() # Same object as milvus
Component 3: Storage Module (storage/)¶
Azure Blob Service (azure_blob_service.py)¶
Purpose: Shared service for Azure Blob Storage operations across all microservices.
Lines of Code: 232
Methods: 10 (5 sync + 5 async variants)
Constructor¶
def __init__(self, account_name: str = None, account_key: str = None, container_name: str = None):
self.account_name = account_name or os.getenv("AZURE_STORAGE_ACCOUNT_NAME")
self.account_key = account_key or os.getenv("AZURE_STORAGE_ACCOUNT_KEY")
self.container_name = container_name or os.getenv("AZURE_STORAGE_CONTAINER_NAME", "machineagents-data")
Key Methods¶
1. connect() -> bool (Lines 27-44)
Purpose: Connect to Azure Blob Storage and create container if it doesn't exist.
Returns: True if successful, False otherwise
Example:
from shared.storage import AzureBlobService
blob_service = AzureBlobService()
if blob_service.connect():
print("✓ Connected to Azure Blob Storage")
2. upload_blob(...) -> str (Lines 50-88)
Purpose: Upload a blob to Azure with optional content type.
Parameters:
- blob_name (str): Name/path of the blob (e.g., "users/user123/document.pdf")
- data (bytes): Binary data to upload
- overwrite (bool, default=True): Whether to overwrite an existing blob
- content_type (str, optional): MIME type (e.g., "application/pdf", "text/plain")
Returns: URL of the uploaded blob
Example:
# Upload PDF
with open("document.pdf", "rb") as f:
pdf_data = f.read()
url = blob_service.upload_blob(
blob_name="users/user123/document.pdf",
data=pdf_data,
content_type="application/pdf"
)
# Returns: "https://machineagentstorage.blob.core.windows.net/machineagents-data/users/user123/document.pdf"
Async Variant: async_upload_blob(...)
3. async_upload_blob_stream(...) -> str (Lines 94-132)
Purpose: Upload a blob to Azure using streaming (for large files, avoids loading entire file into memory).
Parameters:
- blob_name (str): Name/path of the blob
- file_stream (file-like): File-like object to stream from
- overwrite (bool, default=True): Whether to overwrite
- content_type (str, optional): MIME type
- file_size (int, optional): File size for progress tracking
Returns: URL of the uploaded blob
Example:
# Stream large file
with open("large_video.mp4", "rb") as video_stream:
url = await blob_service.async_upload_blob_stream(
blob_name="videos/video123.mp4",
file_stream=video_stream,
content_type="video/mp4",
file_size=os.path.getsize("large_video.mp4")
)
Advantage: Does NOT load entire file into memory, suitable for multi-GB files.
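A hedged sketch of what such a streamed upload looks like with azure-storage-blob's async client (names and parameters are illustrative, not the service's actual code):

```python
from azure.storage.blob.aio import BlobServiceClient

async def stream_upload(account: str, key: str, container: str,
                        blob_name: str, path: str) -> None:
    url = f"https://{account}.blob.core.windows.net"
    async with BlobServiceClient(account_url=url, credential=key) as svc:
        blob = svc.get_blob_client(container=container, blob=blob_name)
        with open(path, "rb") as stream:
            # upload_blob consumes the file-like object in chunks,
            # so the whole file is never held in memory
            await blob.upload_blob(stream, overwrite=True)
```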
4. download_blob(...) -> Optional[bytes] (Lines 134-148)
Purpose: Download a blob from Azure.
Parameters:
- blob_name (str): Name/path of the blob
Returns: Binary data of the blob, or None if error
Example:
data = blob_service.download_blob("users/user123/document.pdf")
if data:
with open("downloaded.pdf", "wb") as f:
f.write(data)
Async Variant: async_download_blob(...)
5. list_blobs(...) -> List[Dict] (Lines 154-172)
Purpose: List all blobs in the container with optional prefix filter.
Parameters:
- prefix (str, optional): Filter blobs by prefix (e.g., "users/user123/")
Returns: List of blob metadata dictionaries
Example:
# List all blobs for a user
blobs = blob_service.list_blobs(prefix="users/user123/")
# [
# {
# 'name': 'users/user123/document.pdf',
# 'size': 245632,
# 'created': '2024-03-15T14:30:22Z',
# 'content_type': 'application/pdf'
# },
# ...
# ]
Async Variant: async_list_blobs(...)
6. delete_blob(...) -> bool (Lines 178-191)
Purpose: Delete a blob from Azure.
Example:
success = blob_service.delete_blob("users/user123/document.pdf")
Async Variant: async_delete_blob(...)
7. get_blob_url(...) -> str (Lines 197-201)
Purpose: Generate Azure blob URL from blob name.
Example:
url = blob_service.get_blob_url("users/user123/document.pdf")
# Returns: "https://machineagentstorage.blob.core.windows.net/machineagents-data/users/user123/document.pdf"
8. blob_exists(...) -> bool (Lines 203-213)
Purpose: Check if a blob exists.
Example:
if blob_service.blob_exists("users/user123/document.pdf"):
    print("Blob exists")
Async Variant: async_blob_exists(...)
Singleton Pattern¶
Function: get_azure_blob_service(...) -> AzureBlobService (Lines 224-230)
_azure_blob_service = None
def get_azure_blob_service(account_name: str = None, account_key: str = None, container_name: str = None) -> AzureBlobService:
"""Get or create Azure Blob Service singleton"""
global _azure_blob_service
if _azure_blob_service is None:
_azure_blob_service = AzureBlobService(account_name, account_key, container_name)
_azure_blob_service.connect()
return _azure_blob_service
Usage:
from shared.storage import get_azure_blob_service
# First call creates singleton and connects
blob = get_azure_blob_service()
# Subsequent calls return same instance
blob2 = get_azure_blob_service() # Same object
Usage Examples Across Services¶
Example 1: Data Crawling Service¶
from shared.logger import get_logger
from shared.database import CosmosClient, get_milvus_embeddings_service
from shared.storage import get_azure_blob_service
# Initialize logger
logger = get_logger(service_name="data-crawling-service", log_file="crawling.log")
# Connect to databases
cosmos = CosmosClient()
milvus = get_milvus_embeddings_service()
blob = get_azure_blob_service()
# Save document metadata to CosmosDB
files_collection = cosmos["files"]
doc_id = files_collection.insert_one({
"user_id": "User_123",
"project_id": "Project_456",
"filename": "document.pdf",
"url": blob_url
})
# Save embeddings to Milvus (partition-based)
embeddings_data = [...] # Generated embeddings
milvus_ids = milvus.insert_embeddings("embeddings", embeddings_data)
logger.info(f"✓ Saved document with {len(milvus_ids)} embeddings")
Example 2: Selection Chatbot Service¶
from shared.logger import get_logger
from shared.database import CosmosClient
from shared.storage import get_azure_blob_service
logger = get_logger(service_name="selection-chatbot-service", log_file="selection.log")
cosmos = CosmosClient()
blob = get_azure_blob_service()
# Save avatar selection
selection_collection = cosmos["chatbot_selections"]
selection_collection.update_one(
{"user_id": user_id, "project_id": project_id},
{"$set": {"selection_avatar": "Avatar_Emma"}},
upsert=True
)
# Upload greeting audio to Azure Blob
audio_blob_name = f"greetings/{project_id}/greeting.wav"
audio_url = blob.upload_blob(audio_blob_name, audio_data, content_type="audio/wav")
logger.info(f"✓ Greeting uploaded to {audio_url}")
Example 3: Response 3D Chatbot Service¶
from shared.logger import get_logger, truncate_for_log
from shared.database import CosmosClient, get_milvus_embeddings_service
from datetime import datetime
logger = get_logger(service_name="response-3d-chatbot-service", log_file="response.log")
cosmos = CosmosClient()
milvus = get_milvus_embeddings_service()
# Log truncated user question
logger.info(f"User question: {truncate_for_log(question, max_length=100)}")
# Search Milvus for relevant context
query_embedding = encode(question) # Generate embedding
search_results = milvus.search_embeddings(
collection_name="embeddings",
query_vector=query_embedding,
user_id=user_id,
project_id=project_id,
top_k=5
)
logger.info(f"✓ Found {len(search_results)} relevant chunks")
# Save chat history to CosmosDB
history_collection = cosmos["chatbot_history"]
history_collection.insert_one({
"user_id": user_id,
"project_id": project_id,
"session_id": session_id,
"question": question,
"answer": answer,
"timestamp": datetime.utcnow()
})
Environment Variables¶
Shared .env Configuration:
The shared module has its own .env file for common environment variables:
# Milvus Configuration
MILVUS_HOST=localhost
MILVUS_PORT=19530
# Azure Blob Storage
AZURE_STORAGE_ACCOUNT_NAME=machineagentsstoragedev
AZURE_STORAGE_ACCOUNT_KEY=<secret_key>
AZURE_STORAGE_CONTAINER_NAME=machineagents-data
Loading Environment Variables:
Milvus Service (Lines 24-25):
shared_env_path = Path(__file__).resolve().parent.parent / ".env"
load_dotenv(dotenv_path=shared_env_path)
Azure Blob Service (Lines 10-11):
# NOTE: Environment variables should be set by docker-compose or system environment
# Do NOT load from .env file here as it will override docker-compose environment variables
Key Design Patterns¶
1. Singleton Pattern¶
Expensive external connections are exposed through singleton factories so each process holds a single instance; the logger is cached per service name by the logging module, and CosmosClient is instantiated directly:
# Logger (cached per service name via logging.getLogger)
logger = get_logger(service_name="auth-service")
# CosmosDB (manual instantiation)
cosmos = CosmosClient()
# Milvus (singleton)
milvus = get_milvus_embeddings_service()
# Azure Blob (singleton)
blob = get_azure_blob_service()
2. Sync + Async Dual API¶
Azure Blob Service provides both sync and async variants:
# Sync (blocking)
url = blob.upload_blob(blob_name, data)
# Async (non-blocking)
url = await blob.async_upload_blob(blob_name, data)
3. Graceful Degradation¶
Logger (Lines 85-87):
If file logging fails, falls back to console-only:
try:
file_handler = logging.FileHandler(log_path / log_file, encoding='utf-8')
logger.addHandler(file_handler)
except Exception as e:
logger.warning(f"Could not create file handler for {log_file}: {e}")
# Continue with console logging only
4. Partition-Based Multi-Tenancy¶
Milvus Embeddings Service implements partition-per-project for:
- Isolation: Each project's data is physically separated
- Performance: Searches limited to specific partition
- Deletion: Drop entire partition for instant cleanup
Code Quality Observations¶
✅ Good Practices¶
- Standardized Logging - Consistent format across all services
- Singleton Pattern - Prevents duplicate connections
- Type Hints - Clear parameter and return types
- Docstrings - Comprehensive documentation for all functions
- Error Handling - Try/except blocks with logging
- Partition Isolation - Excellent multi-tenancy design
⚠️ Areas for Improvement¶
1. No Configuration Validation
Issue: Environment variables are not validated on startup.
Example (CosmosClient.__init__):
self.mongo_uri = mongo_uri or os.getenv("MONGO_URI")  # silently None if MONGO_URI is unset
Recommendation:
self.mongo_uri = mongo_uri or os.getenv("MONGO_URI")
if not self.mongo_uri:
raise ValueError("MONGO_URI environment variable is required")
2. Silent Exception Handling
Azure Blob - Lines 37-38:
try:
self.container_client.get_container_properties()
except:
self.container_client = self.service_client.create_container(self.container_name)
Issue: Bare except: catches all exceptions, even KeyboardInterrupt.
Recommendation:
except Exception as e:
logger.warning(f"Container does not exist, creating: {e}")
self.container_client = self.service_client.create_container(self.container_name)
3. Magic Numbers
Milvus - Line 195: (hard-coded numeric literal; snippet not reproduced here)
Recommendation: Define such values as named constants, as sketched below:
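Which literal Line 195 refers to is not shown in this document, so these constants (taken from the schema and index parameters above) are illustrative:

```python
# Named module-level constants instead of inline magic numbers
EMBEDDING_DIM = 384  # BAAI/bge-small-en-v1.5 output dimension
IVF_NLIST = 128      # number of IVF clusters for the vector index
```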
4. Milvus Sanitization Limited
Current (Line 114):
sanitized = partition_name.replace('-', '_')
Issue: Only handles hyphens. Other invalid characters (spaces, special characters) are not handled.
Recommendation: Replace every character outside [A-Za-z0-9_], as sketched below:
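A minimal regex-based sketch (assumes Milvus permits only letters, digits, and underscores, as the method's docstring above states):

```python
import re

def _sanitize_partition_name(self, partition_name: str) -> str:
    # Replace every disallowed character, not just hyphens
    return re.sub(r'[^0-9A-Za-z_]', '_', partition_name)
```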
Integration with Services¶
The following services use the shared module:
| Service | Logger | CosmosDB | Milvus | Azure Blob |
|---|---|---|---|---|
| Auth Service | ✅ | ✅ | ❌ | ❌ |
| User Service | ✅ | ✅ | ❌ | ❌ |
| Create Chatbot Service | ✅ | ✅ | ❌ | ❌ |
| Selection Chatbot Service | ✅ | ✅ | ❌ | ✅ (greetings) |
| Data Crawling Service | ✅ | ✅ | ✅ | ✅ (files) |
| Response 3D Chatbot Service | ✅ | ✅ | ✅ | ❌ |
| Response Text Chatbot Service | ✅ | ✅ | ✅ | ❌ |
| Response Voice Chatbot Service | ✅ | ✅ | ✅ | ❌ |
| Chat History Service | ✅ | ✅ | ❌ | ❌ |
| Chatbot Maintenance Service | ✅ | ✅ | ✅ | ✅ (Blob sync) |
| All Other Services | ✅ | ✅ | Varies | Varies |
Total Usage:
- Logger: 20+ services (100%)
- CosmosDB: 20+ services (100%)
- Milvus: ~10 services (RAG-enabled services)
- Azure Blob: ~5 services (file upload services)
Summary¶
Module Statistics¶
- Total Files: 7 Python files
- Total Lines: ~1,050 lines
- Components: 3 main modules (logging, database, storage)
- Classes: 5 (CosmosCollectionWrapper, CosmosClient, DatabaseManager, MilvusEmbeddingsService, AzureBlobService)
- Functions: 15+ utility and singleton functions
Key Features¶
✅ Standardized Logging - Unified format across all services
✅ CosmosDB Abstraction - Wrapper for MongoDB/CosmosDB operations
✅ Partition-Based Milvus - Multi-tenant vector storage with isolation
✅ Azure Blob Integration - File storage with streaming support
✅ Singleton Pattern - Single instances of expensive resources
✅ Sync + Async APIs - Flexible blocking/non-blocking operations
Critical Capabilities¶
🔑 Multi-Tenancy: Partition-per-project in Milvus ensures data isolation
🔑 Performance: Partition-scoped searches are significantly faster
🔑 Instant Deletion: Drop partition for instant project cleanup
🔑 Streaming Uploads: Support for large files without memory issues
🔑 Consistent Logging: Standardized format aids debugging
Code Quality¶
⚠️ Areas for Improvement:
- Add environment variable validation
- Replace bare except: with specific exception handling
- Define magic numbers as constants
- Enhance partition name sanitization
- Add comprehensive unit tests
Recommendations¶
For New Services:
- Always use get_logger() for logging
- Use CosmosClient directly (avoid the deprecated DatabaseManager)
- Use get_milvus_embeddings_service() for vector operations
- Use get_azure_blob_service() for file storage
- Follow the partition-based architecture for multi-tenancy
For Existing Services:
- Migrate away from DatabaseManager to CosmosClient
- Ensure all Milvus operations use the partition-based approach
- Add validation for environment variables
- Replace bare exception handlers
Documentation Complete: Shared Module ✅