Skip to content

Shared Module

Overview

The Shared Module is a critical shared library providing standardized infrastructure across all 20+ backend microservices. It contains common utilities for logging, database abstractions, vector embeddings, and Azure Blob Storage operations. This module ensures consistency, reduces code duplication, and provides a single source of truth for infrastructure patterns.

Path: machineagents-be/shared/
Purpose: Shared utilities and abstractions for all microservices
Components: 3 main modules (logging, database, storage)


Architecture

graph TB
    subgraph "All 20+ Microservices"
        Service1[Auth Service]
        Service2[User Service]
        Service3[Create Chatbot]
        ServiceN[... 17 more services]
    end

    subgraph "Shared Module"
        Logger[logger.py<br/>Standardized Logging]

        subgraph "database/"
            CosmosClient[cosmos_client.py<br/>CosmosDB Wrapper]
            DBManager[db_manager.py<br/>DEPRECATED Manager]
            MilvusService[milvus_embeddings_service.py<br/>Partition-Based Embeddings]
        end

        subgraph "storage/"
            BlobService[azure_blob_service.py<br/>Azure Blob Storage]
        end
    end

    Service1 --> Logger
    Service1 --> CosmosClient
    Service1 --> MilvusService
    Service1 --> BlobService

    Service2 --> Logger
    Service2 --> CosmosClient
    Service2 --> MilvusService
    Service2 --> BlobService

    Service3 --> Logger
    Service3 --> CosmosClient
    Service3 --> MilvusService
    Service3 --> BlobService

    ServiceN --> Logger
    ServiceN --> CosmosClient
    ServiceN --> MilvusService
    ServiceN --> BlobService

    CosmosClient -->|MongoDB Protocol| CosmosDB[(CosmosDB<br/>Metadata)]
    MilvusService -->|Vector API| Milvus[(Milvus<br/>Embeddings)]
    BlobService -->|REST API| Blob[Azure Blob<br/>Files]

    style Logger fill:#4CAF50,color:#fff
    style CosmosClient fill:#2196F3,color:#fff
    style MilvusService fill:#FF9800,color:#fff
    style BlobService fill:#9C27B0,color:#fff
    style DBManager fill:#757575,color:#fff

Directory Structure

shared/
├── __init__.py
├── .env                              # Shared environment configuration
├── logger.py                         # Standardized logging (108 lines)
├── database/
│   ├── __init__.py                   # Database module exports
│   ├── cosmos_client.py              # CosmosDB wrapper (68 lines)
│   ├── db_manager.py                 # DEPRECATED manager (130 lines)
│   └── milvus_embeddings_service.py  # Milvus partition service (492 lines)
└── storage/
    ├── __init__.py                   # Storage module exports
    └── azure_blob_service.py         # Azure Blob operations (232 lines)

Component 1: Standardized Logging (logger.py)

Purpose: Provides consistent logging configuration across all microservices.

Lines of Code: 108
Key Functions: 3

API Reference

1. truncate_for_log(text: str, max_length: int = 100) -> str

Purpose: Truncate text for logging to avoid excessive log sizes.

Parameters:

  • text (str): Text to truncate
  • max_length (int, default=100): Maximum length before truncation

Returns: Truncated text with indication of total length

Example:

from shared.logger import truncate_for_log

long_text = "A" * 500
result = truncate_for_log(long_text, max_length=100)
# Output: "AAAA... (500 chars total)"

Implementation (Lines 18-34):

def truncate_for_log(text: str, max_length: int = 100) -> str:
    if not text:
        return ""
    text_str = str(text)
    if len(text_str) <= max_length:
        return text_str
    return text_str[:max_length] + f"... ({len(text_str)} chars total)"

2. get_logger(...) -> logging.Logger

Purpose: Get a configured logger for a service with console and optional file output.

Parameters:

  • service_name (str): Name of the service (used as logger name)
  • log_file (str, optional): Optional log file name (defaults to None)
  • log_level (int, default=logging.INFO): Logging level
  • log_dir (str, default="."): Directory for log files

Returns: Configured logging.Logger instance

Example:

from shared.logger import get_logger

logger = get_logger(service_name="auth-service", log_file="auth.log")
logger.info("Service started successfully")
logger.error("Failed to connect to database")

Implementation (Lines 37-92):

def get_logger(
    service_name: str,
    log_file: Optional[str] = None,
    log_level: int = logging.INFO,
    log_dir: str = "."
) -> logging.Logger:
    # Use service name as logger name
    logger = logging.getLogger(service_name)

    # Only configure if not already configured (avoid duplicate handlers)
    if logger.handlers:
        return logger

    logger.setLevel(log_level)

    # Create formatter
    formatter = logging.Formatter(
        fmt="%(asctime)s | %(levelname)-8s | %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S"
    )

    # Console handler (stdout)
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)

    # File handler (if log_file specified)
    if log_file:
        try:
            # Create log directory if it doesn't exist
            log_path = Path(log_dir)
            log_path.mkdir(parents=True, exist_ok=True)

            file_handler = logging.FileHandler(log_path / log_file, encoding='utf-8')
            file_handler.setFormatter(formatter)
            logger.addHandler(file_handler)
        except Exception as e:
            # If file logging fails, just log to console
            logger.warning(f"Could not create file handler for {log_file}: {e}")

    # Prevent propagation to root logger to avoid duplicate logs
    logger.propagate = False

    return logger

Log Format:

2024-03-15 14:30:22 | INFO     | ✓ Connected to MongoDB: Machine_agent_dev
2024-03-15 14:30:23 | WARNING  | Quota limit exceeded
2024-03-15 14:30:25 | ERROR    | ✗ Failed to fetch data: Connection timeout

Key Features:

  • Prevents duplicate handlers (Lines 59-60): Checks if logger already configured
  • Console + file logging (Lines 71-84): Dual output channels
  • Prevents propagation (Line 90): Avoids duplicate logs from root logger
  • Graceful degradation (Lines 85-87): Falls back to console-only if file logging fails

3. setup_basic_logging(...) -> logging.Logger

Purpose: Quick setup for basic logging (file + console). Convenience wrapper for get_logger.

Implementation (Lines 95-107):

def setup_basic_logging(service_name: str, log_file: str = "data.log"):
    return get_logger(service_name=service_name, log_file=log_file)

Component 2: Database Module (database/)

2.1 CosmosDB Client (cosmos_client.py)

Purpose: Unified wrapper for MongoDB/CosmosDB operations.

Lines of Code: 68
Classes: 2

Class: CosmosCollectionWrapper

Purpose: Wrapper class providing unified interface for CosmosDB collections.

Methods:

Method Purpose Signature
insert_one Insert a single document (document: Dict)
find_one Find a single document (query: Dict, projection: Dict = None)
find Find multiple documents (query: Dict, projection: Dict = None, limit: int = 100)
update_one Update a single document (query: Dict, update: Dict, upsert: bool = False)
delete_many Delete multiple documents (query: Dict)
count_documents Count documents matching query (query: Dict)

Example Usage:

from shared.database import CosmosClient

# Initialize client
cosmos = CosmosClient()

# Get collection wrapper
users = cosmos["users_multichatbot_v2"]

# Insert
users.insert_one({"email": "user@example.com", "name": "John Doe"})

# Find
user = users.find_one({"email": "user@example.com"})

# Update
users.update_one(
    {"email": "user@example.com"},
    {"$set": {"name": "Jane Doe"}},
    upsert=True
)

# Count
count = users.count_documents({"subscription_plan": "enterprise"})

Class: CosmosClient

Purpose: Wrapper for MongoDB/CosmosDB client with connection management.

Constructor (Lines 50-55):

def __init__(self, mongo_uri: str = None, db_name: str = None):
    self.mongo_uri = mongo_uri or os.getenv("MONGO_URI")
    self.db_name = db_name or os.getenv("MONGO_DB_NAME", "Machine_agent_dev")
    self.client = MongoClient(self.mongo_uri)
    self.db = self.client[self.db_name]
    logger.info(f"Connected to CosmosDB: {self.db_name}")

Methods:

  1. get_collection(name: str) -> CosmosCollectionWrapper (Lines 57-59)
  2. Get a collection wrapper by name
  3. __getitem__(name: str) -> CosmosCollectionWrapper (Lines 61-63)
  4. Allow dict-like access: cosmos["collection_name"]
  5. list_collection_names() -> list (Lines 65-67)
  6. List all collections in the database

2.2 Database Manager (db_manager.py) [DEPRECATED]

[!WARNING] > DEPRECATED: This class is kept for backward compatibility with existing endpoints. New code should use CosmosDB directly via pymongo.MongoClient.

Purpose: Provides database abstraction layer (now just routes to CosmosDB).

Lines of Code: 130
Status: ⚠️ DEPRECATED

Original Purpose (from docstring):

In the old architecture:

  • Milvus stored both data AND embeddings
  • Database selection logic (Milvus vs CosmosDB)

In the new architecture:

  • CosmosDB stores all metadata and data
  • Milvus stores only vector embeddings (via MilvusEmbeddingsService)
  • Azure Blob Storage stores files

Class Methods:

  1. get_collection(...) (Lines 40-59)
  2. Returns CosmosDB collection (ignores all parameters for backward compatibility)
  3. get_collection_with_fallback(...) (Lines 61-87)
  4. Returns (collection, data, "cosmosdb") (fallback logic removed)
  5. find_with_fallback(...) (Lines 89-117)
  6. Returns (documents, "cosmosdb") (fallback logic removed)

Singleton Pattern (Lines 120-129):

_db_manager = None

def get_db_manager() -> DatabaseManager:
    """Get global database manager instance (singleton)"""
    global _db_manager
    if _db_manager is None:
        _db_manager = DatabaseManager()
    return _db_manager

⚠️ Migration Path:

Instead of:

from shared.database import get_db_manager
db_manager = get_db_manager()
collection = db_manager.get_collection("users_multichatbot_v2")

Use:

from shared.database import CosmosClient
cosmos = CosmosClient()
collection = cosmos["users_multichatbot_v2"]

2.3 Milvus Embeddings Service (milvus_embeddings_service.py)

Purpose: Partition-based vector embeddings storage in Milvus for multi-tenancy.

Lines of Code: 492
Architecture: Each chatbot (project_id) gets its own partition for optimal performance and isolation

[!IMPORTANT] > Partition-Based Architecture: This service implements a partition-per-project strategy where each project_id becomes a separate partition in Milvus. This provides:

  • Isolation: Data from different projects is physically separated
  • Performance: Searches are scoped to a single partition (faster)
  • Deletion: Entire project can be deleted by dropping partition (instant)

Schema Definition

Collection Fields (Lines 72-83):

fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="document_id", dtype=DataType.VARCHAR, max_length=100),
    FieldSchema(name="user_id", dtype=DataType.VARCHAR, max_length=100),
    FieldSchema(name="project_id", dtype=DataType.VARCHAR, max_length=100),
    FieldSchema(name="chunk_index", dtype=DataType.INT32),
    FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=2000),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=384),  # BAAI/bge-small-en-v1.5
    FieldSchema(name="data_type", dtype=DataType.VARCHAR, max_length=50),  # pdf, text, qa, org, url
    FieldSchema(name="source_url", dtype=DataType.VARCHAR, max_length=500),
    FieldSchema(name="created_at", dtype=DataType.VARCHAR, max_length=100),
]

Vector Index (Lines 89-94):

index_params = {
    "metric_type": "L2",        # Euclidean distance
    "index_type": "IVF_FLAT",   # Inverted file index
    "params": {"nlist": 128}    # 128 clusters
}

Key Methods

1. insert_embeddings(...) -> List[int] (Lines 146-236)

Purpose: Insert embeddings into Milvus partition (partition-based architecture).

Parameters:

  • collection_name (str): Name of the collection (e.g., 'embeddings')
  • embeddings_data (List[Dict]): List of embeddings with fields:
  • document_id (str): Document identifier
  • user_id (str): User identifier
  • project_id (str): Project identifier (used as partition name)
  • chunk_index (int): Chunk position in document
  • text (str): Text content (max 2000 chars)
  • embedding (List[float]): 384-dim vector
  • data_type (str): Type (pdf, text, qa, org, url)
  • source_url (str, optional): URL for website crawl data

Returns: List of Milvus IDs for the inserted embeddings

Validation (Lines 182-184):

# Verify all embeddings have the same project_id
if not all(e.get('project_id') == project_id for e in embeddings_data):
    raise ValueError("All embeddings in a batch must have the same project_id")

Example:

from shared.database import get_milvus_embeddings_service

milvus = get_milvus_embeddings_service()

embeddings_data = [
    {
        "document_id": "doc_123",
        "user_id": "User_456",
        "project_id": "Project_789",
        "chunk_index": 0,
        "text": "This is the first chunk of the document...",
        "embedding": [0.1, 0.2, ..., 0.384],  # 384-dim vector
        "data_type": "pdf",
        "source_url": ""
    },
    # ... more chunks
]

milvus_ids = milvus.insert_embeddings("embeddings", embeddings_data)
# Returns: [449834597376389120, 449834597376389121, ...]

Process Flow:

1. Get collection
2. Extract project_id from first embedding
3. Validate all embeddings have same project_id
4. Sanitize partition name (replace "-" with "_")
5. Get or create partition
6. Prepare data arrays
7. Insert into partition
8. Flush to persist
9. Reload partition for immediate querying
10. Return Milvus IDs

2. search_embeddings(...) -> List[Dict] (Lines 238-316)

Purpose: Search for similar embeddings within a specific partition.

Parameters:

  • collection_name (str): Collection name
  • query_vector (List[float]): Query embedding vector (384-dim)
  • user_id (str): User ID (for logging/validation)
  • project_id (str): Project ID (used as partition name)
  • top_k (int, default=5): Number of results to return
  • milvus_ids (List[int], optional): Specific Milvus IDs to search within

Returns: List of search results with {milvus_id, document_id, chunk_index, text, data_type, distance, score}

Example:

query_vector = [0.1, 0.2, ..., 0.384]  # 384-dim vector from encoder

results = milvus.search_embeddings(
    collection_name="embeddings",
    query_vector=query_vector,
    user_id="User_456",
    project_id="Project_789",
    top_k=5
)

# Results:
# [
#     {
#         'milvus_id': 449834597376389120,
#         'document_id': 'doc_123',
#         'chunk_index': 5,
#         'text': 'Relevant text content...',
#         'data_type': 'pdf',
#         'distance': 0.234,
#         'score': 0.810  # 1 / (1 + distance)
#     },
#     ...
# ]

Search Parameters (Line 286):

search_params = {"metric_type": "L2", "params": {"nprobe": 10}}

Search Scoping (Line 294):

partition_names=[partition_name],  # Search only in this partition!

Score Calculation (Line 308):

'score': 1 / (1 + hit.distance)  # Convert L2 distance to similarity score

3. delete_embeddings_by_document(...) -> bool (Lines 318-364)

Purpose: Delete all embeddings for a specific document within a partition.

Parameters:

  • collection_name (str): Collection name
  • document_id (str): Document ID to delete
  • project_id (str, optional): Project ID (partition name). If not provided, searches all partitions (slower)

Example:

# Delete specific document from specific partition (fast)
success = milvus.delete_embeddings_by_document(
    collection_name="embeddings",
    document_id="doc_123",
    project_id="Project_789"
)

# Delete from all partitions (slow, if project_id unknown)
success = milvus.delete_embeddings_by_document(
    collection_name="embeddings",
    document_id="doc_123"
)

4. delete_embeddings_by_user_project(...) -> bool (Lines 366-423)

Purpose: Delete all embeddings for a user/project by dropping the entire partition (very fast!).

Parameters:

  • collection_name (str): Collection name
  • user_id (str): User ID (for logging/validation)
  • project_id (str): Project ID (partition name)
  • data_type (str, optional): If provided, uses scalar deletion instead of partition drop

Example:

# Drop entire partition (instant deletion of all project data)
success = milvus.delete_embeddings_by_user_project(
    collection_name="embeddings",
    user_id="User_456",
    project_id="Project_789"
)

# Delete only specific data type (e.g., only PDFs)
success = milvus.delete_embeddings_by_user_project(
    collection_name="embeddings",
    user_id="User_456",
    project_id="Project_789",
    data_type="pdf"
)

Performance:

  • Partition drop: Instant (milliseconds) - entire partition deleted
  • Scalar deletion: Slow (seconds) - iterates through vectors matching filter

5. get_collection_stats(...) -> Dict (Lines 425-478)

Purpose: Get statistics for a collection or specific partition.

Example:

# Get stats for specific partition
stats = milvus.get_collection_stats("embeddings", project_id="Project_789")
# {
#     'collection_name': 'embeddings',
#     'partition_name': 'Project_789',
#     'exists': True,
#     'total_embeddings': 1523
# }

# Get stats for entire collection
stats = milvus.get_collection_stats("embeddings")
# {
#     'collection_name': 'embeddings',
#     'total_embeddings': 45678,
#     'status': 'loaded',
#     'num_partitions': 147,
#     'partitions': [
#         {'name': 'Project_001', 'num_entities': 234},
#         {'name': 'Project_002', 'num_entities': 567},
#         ...
#     ]
# }

Partition Name Sanitization

Method: _sanitize_partition_name(...) -> str (Lines 102-119)

Purpose: Milvus partition names can only contain numbers, letters, and underscores. This method replaces hyphens with underscores.

Example:

# Input:  "User-978560_Project_4"
# Output: "User_978560_Project_4"

Code:

def _sanitize_partition_name(self, partition_name: str) -> str:
    # Replace hyphens with underscores
    sanitized = partition_name.replace('-', '_')

    if sanitized != partition_name:
        logger.debug(f"Sanitized partition name: '{partition_name}' -> '{sanitized}'")

    return sanitized

Singleton Pattern

Function: get_milvus_embeddings_service(...) -> MilvusEmbeddingsService (Lines 485-490)

_milvus_embeddings_service = None

def get_milvus_embeddings_service(host: str = None, port: str = None) -> MilvusEmbeddingsService:
    """Get or create Milvus Embeddings Service singleton"""
    global _milvus_embeddings_service
    if _milvus_embeddings_service is None:
        _milvus_embeddings_service = MilvusEmbeddingsService(host, port)
    return _milvus_embeddings_service

Usage:

from shared.database import get_milvus_embeddings_service

# First call creates the singleton
milvus = get_milvus_embeddings_service()

# Subsequent calls return the same instance
milvus2 = get_milvus_embeddings_service()  # Same object as milvus

Component 3: Storage Module (storage/)

Azure Blob Service (azure_blob_service.py)

Purpose: Shared service for Azure Blob Storage operations across all microservices.

Lines of Code: 232
Methods: 10 (5 sync + 5 async variants)

Constructor

def __init__(self, account_name: str = None, account_key: str = None, container_name: str = None):
    self.account_name = account_name or os.getenv("AZURE_STORAGE_ACCOUNT_NAME")
    self.account_key = account_key or os.getenv("AZURE_STORAGE_ACCOUNT_KEY")
    self.container_name = container_name or os.getenv("AZURE_STORAGE_CONTAINER_NAME", "machineagents-data")

Key Methods

1. connect() -> bool (Lines 27-44)

Purpose: Connect to Azure Blob Storage and create container if it doesn't exist.

Returns: True if successful, False otherwise

Example:

from shared.storage import AzureBlobService

blob_service = AzureBlobService()
if blob_service.connect():
    print("✓ Connected to Azure Blob Storage")

2. upload_blob(...) -> str (Lines 50-88)

Purpose: Upload a blob to Azure with optional content type.

Parameters:

  • blob_name (str): Name/path of the blob (e.g., "users/user123/document.pdf")
  • data (bytes): Binary data to upload
  • overwrite (bool, default=True): Whether to overwrite existing blob
  • content_type (str, optional): MIME type (e.g., "application/pdf", "text/plain")

Returns: URL of the uploaded blob

Example:

# Upload PDF
with open("document.pdf", "rb") as f:
    pdf_data = f.read()

url = blob_service.upload_blob(
    blob_name="users/user123/document.pdf",
    data=pdf_data,
    content_type="application/pdf"
)
# Returns: "https://machineagentstorage.blob.core.windows.net/machineagents-data/users/user123/document.pdf"

Async Variant: async_upload_blob(...)


3. async_upload_blob_stream(...) -> str (Lines 94-132)

Purpose: Upload a blob to Azure using streaming (for large files, avoids loading entire file into memory).

Parameters:

  • blob_name (str): Name/path of the blob
  • file_stream (file-like): File-like object to stream from
  • overwrite (bool, default=True): Whether to overwrite
  • content_type (str, optional): MIME type
  • file_size (int, optional): File size for progress tracking

Returns: URL of the uploaded blob

Example:

# Stream large file
with open("large_video.mp4", "rb") as video_stream:
    url = await blob_service.async_upload_blob_stream(
        blob_name="videos/video123.mp4",
        file_stream=video_stream,
        content_type="video/mp4",
        file_size=os.path.getsize("large_video.mp4")
    )

Advantage: Does NOT load entire file into memory, suitable for multi-GB files.


4. download_blob(...) -> Optional[bytes] (Lines 134-148)

Purpose: Download a blob from Azure.

Parameters:

  • blob_name (str): Name/path of the blob

Returns: Binary data of the blob, or None if error

Example:

data = blob_service.download_blob("users/user123/document.pdf")
if data:
    with open("downloaded.pdf", "wb") as f:
        f.write(data)

Async Variant: async_download_blob(...)


5. list_blobs(...) -> List[Dict] (Lines 154-172)

Purpose: List all blobs in the container with optional prefix filter.

Parameters:

  • prefix (str, optional): Filter blobs by prefix (e.g., "users/user123/")

Returns: List of blob metadata dictionaries

Example:

# List all blobs for a user
blobs = blob_service.list_blobs(prefix="users/user123/")
# [
#     {
#         'name': 'users/user123/document.pdf',
#         'size': 245632,
#         'created': '2024-03-15T14:30:22Z',
#         'content_type': 'application/pdf'
#     },
#     ...
# ]

Async Variant: async_list_blobs(...)


6. delete_blob(...) -> bool (Lines 178-191)

Purpose: Delete a blob from Azure.

Example:

success = blob_service.delete_blob("users/user123/document.pdf")

Async Variant: async_delete_blob(...)


7. get_blob_url(...) -> str (Lines 197-201)

Purpose: Generate Azure blob URL from blob name.

Example:

url = blob_service.get_blob_url("users/user123/document.pdf")
# Returns: "https://machineagentstorage.blob.core.windows.net/machineagents-data/users/user123/document.pdf"

8. blob_exists(...) -> bool (Lines 203-213)

Purpose: Check if a blob exists.

Example:

if blob_service.blob_exists("users/user123/document.pdf"):
    print("Blob exists")

Async Variant: async_blob_exists(...)


Singleton Pattern

Function: get_azure_blob_service(...) -> AzureBlobService (Lines 224-230)

_azure_blob_service = None

def get_azure_blob_service(...) -> AzureBlobService:
    """Get or create Azure Blob Service singleton"""
    global _azure_blob_service
    if _azure_blob_service is None:
        _azure_blob_service = AzureBlobService(account_name, account_key, container_name)
        _azure_blob_service.connect()
    return _azure_blob_service

Usage:

from shared.storage import get_azure_blob_service

# First call creates singleton and connects
blob = get_azure_blob_service()

# Subsequent calls return same instance
blob2 = get_azure_blob_service()  # Same object

Usage Examples Across Services

Example 1: Data Crawling Service

from shared.logger import get_logger
from shared.database import CosmosClient, get_milvus_embeddings_service
from shared.storage import get_azure_blob_service

# Initialize logger
logger = get_logger(service_name="data-crawling-service", log_file="crawling.log")

# Connect to databases
cosmos = CosmosClient()
milvus = get_milvus_embeddings_service()
blob = get_azure_blob_service()

# Save document metadata to CosmosDB
files_collection = cosmos["files"]
doc_id = files_collection.insert_one({
    "user_id": "User_123",
    "project_id": "Project_456",
    "filename": "document.pdf",
    "url": blob_url
})

# Save embeddings to Milvus (partition-based)
embeddings_data = [...]  # Generated embeddings
milvus_ids = milvus.insert_embeddings("embeddings", embeddings_data)

logger.info(f"✓ Saved document with {len(milvus_ids)} embeddings")

Example 2: Selection Chatbot Service

from shared.logger import get_logger
from shared.database import CosmosClient
from shared.storage import get_azure_blob_service

logger = get_logger(service_name="selection-chatbot-service", log_file="selection.log")
cosmos = CosmosClient()
blob = get_azure_blob_service()

# Save avatar selection
selection_collection = cosmos["chatbot_selections"]
selection_collection.update_one(
    {"user_id": user_id, "project_id": project_id},
    {"$set": {"selection_avatar": "Avatar_Emma"}},
    upsert=True
)

# Upload greeting audio to Azure Blob
audio_blob_name = f"greetings/{project_id}/greeting.wav"
audio_url = blob.upload_blob(audio_blob_name, audio_data, content_type="audio/wav")

logger.info(f"✓ Greeting uploaded to {audio_url}")

Example 3: Response 3D Chatbot Service

from shared.logger import get_logger, truncate_for_log
from shared.database import CosmosClient, get_milvus_embeddings_service

logger = get_logger(service_name="response-3d-chatbot-service", log_file="response.log")
cosmos = CosmosClient()
milvus = get_milvus_embeddings_service()

# Log truncated user question
logger.info(f"User question: {truncate_for_log(question, max_length=100)}")

# Search Milvus for relevant context
query_embedding = encode(question)  # Generate embedding
search_results = milvus.search_embeddings(
    collection_name="embeddings",
    query_vector=query_embedding,
    user_id=user_id,
    project_id=project_id,
    top_k=5
)

logger.info(f"✓ Found {len(search_results)} relevant chunks")

# Save chat history to CosmosDB
history_collection = cosmos["chatbot_history"]
history_collection.insert_one({
    "user_id": user_id,
    "project_id": project_id,
    "session_id": session_id,
    "question": question,
    "answer": answer,
    "timestamp": datetime.utcnow()
})

Environment Variables

Shared .env Configuration:

The shared module has its own .env file for common environment variables:

# Milvus Configuration
MILVUS_HOST=localhost
MILVUS_PORT=19530

# Azure Blob Storage
AZURE_STORAGE_ACCOUNT_NAME=machineagentsstoragedev
AZURE_STORAGE_ACCOUNT_KEY=<secret_key>
AZURE_STORAGE_CONTAINER_NAME=machineagents-data

Loading Environment Variables:

Milvus Service (Lines 24-25):

shared_env_path = Path(__file__).resolve().parent.parent / ".env"
load_dotenv(dotenv_path=shared_env_path)

Azure Blob Service (Lines 10-11):

# NOTE: Environment variables should be set by docker-compose or system environment
# Do NOT load from .env file here as it will override docker-compose environment variables

Key Design Patterns

1. Singleton Pattern

All services use singleton pattern to ensure single instance across application:

# Logger (no singleton - creates new logger per service name)
logger = get_logger(service_name="auth-service")

# CosmosDB (manual instantiation)
cosmos = CosmosClient()

# Milvus (singleton)
milvus = get_milvus_embeddings_service()

# Azure Blob (singleton)
blob = get_azure_blob_service()

2. Sync + Async Dual API

Azure Blob Service provides both sync and async variants:

# Sync (blocking)
url = blob.upload_blob(blob_name, data)

# Async (non-blocking)
url = await blob.async_upload_blob(blob_name, data)

3. Graceful Degradation

Logger (Lines 85-87):

If file logging fails, falls back to console-only:

try:
    file_handler = logging.FileHandler(log_path / log_file, encoding='utf-8')
    logger.addHandler(file_handler)
except Exception as e:
    logger.warning(f"Could not create file handler for {log_file}: {e}")
    # Continue with console logging only

4. Partition-Based Multi-Tenancy

Milvus Embeddings Service implements partition-per-project for:

  • Isolation: Each project's data is physically separated
  • Performance: Searches limited to specific partition
  • Deletion: Drop entire partition for instant cleanup

Code Quality Observations

✅ Good Practices

  1. Standardized Logging - Consistent format across all services
  2. Singleton Pattern - Prevents duplicate connections
  3. Type Hints - Clear parameter and return types
  4. Docstrings - Comprehensive documentation for all functions
  5. Error Handling - Try/except blocks with logging
  6. Partition Isolation - Excellent multi-tenancy design

⚠️ Areas for Improvement

1. No Configuration Validation

Issue: Environment variables are not validated on startup.

Example:

self.mongo_uri = mongo_uri or os.getenv("MONGO_URI")  # Could be None!

Recommendation:

self.mongo_uri = mongo_uri or os.getenv("MONGO_URI")
if not self.mongo_uri:
    raise ValueError("MONGO_URI environment variable is required")

2. Silent Exception Handling

Azure Blob - Lines 37-38:

try:
    self.container_client.get_container_properties()
except:
    self.container_client = self.service_client.create_container(self.container_name)

Issue: Bare except: catches all exceptions, even KeyboardInterrupt.

Recommendation:

except Exception as e:
    logger.warning(f"Container does not exist, creating: {e}")
    self.container_client = self.service_client.create_container(self.container_name)

3. Magic Numbers

Milvus - Line 195:

texts = [e['text'][:2000] for e in embeddings_data]  # Truncate to max length

Recommendation: Define as constant:

MAX_TEXT_LENGTH = 2000
texts = [e['text'][:MAX_TEXT_LENGTH] for e in embeddings_data]

4. Milvus Sanitization Limited

Current (Line 114):

sanitized = partition_name.replace('-', '_')

Issue: Only handles hyphens. Other invalid characters (spaces, special chars) not handled.

Recommendation:

import re
sanitized = re.sub(r'[^a-zA-Z0-9_]', '_', partition_name)

Integration with Services

The following services use the shared module:

Service Logger CosmosDB Milvus Azure Blob
Auth Service
User Service
Create Chatbot Service
Selection Chatbot Service ✅ (greetings)
Data Crawling Service ✅ (files)
Response 3D Chatbot Service
Response Text Chatbot Service
Response Voice Chatbot Service
Chat History Service
Chatbot Maintenance Service ✅ (Blob sync)
All Other Services Varies Varies

Total Usage:

  • Logger: 20+ services (100%)
  • CosmosDB: 20+ services (100%)
  • Milvus: ~10 services (RAG-enabled services)
  • Azure Blob: ~5 services (file upload services)

Summary

Module Statistics

  • Total Files: 7 Python files
  • Total Lines: ~1,050 lines
  • Components: 3 main modules (logging, database, storage)
  • Classes: 5 (CosmosCollectionWrapper, CosmosClient, DatabaseManager, MilvusEmbeddingsService, AzureBlobService)
  • Functions: 15+ utility and singleton functions

Key Features

Standardized Logging - Unified format across all services
CosmosDB Abstraction - Wrapper for MongoDB/CosmosDB operations
Partition-Based Milvus - Multi-tenant vector storage with isolation
Azure Blob Integration - File storage with streaming support
Singleton Pattern - Single instances of expensive resources
Sync + Async APIs - Flexible blocking/non-blocking operations

Critical Capabilities

🔑 Multi-Tenancy: Partition-per-project in Milvus ensures data isolation
🔑 Performance: Partition-scoped searches are significantly faster
🔑 Instant Deletion: Drop partition for instant project cleanup
🔑 Streaming Uploads: Support for large files without memory issues
🔑 Consistent Logging: Standardized format aids debugging

Code Quality

⚠️ Areas for Improvement:

  1. Add environment variable validation
  2. Replace bare except: with specific exception handling
  3. Define magic numbers as constants
  4. Enhance partition name sanitization
  5. Add comprehensive unit tests

Recommendations

For New Services:

  1. Always use get_logger() for logging
  2. Use CosmosClient directly (avoid deprecated DatabaseManager)
  3. Use get_milvus_embeddings_service() for vector operations
  4. Use get_azure_blob_service() for file storage
  5. Follow partition-based architecture for multi-tenancy

For Existing Services:

  1. Migrate away from DatabaseManager to CosmosClient
  2. Ensure all Milvus operations use partition-based approach
  3. Add validation for environment variables
  4. Replace bare exception handlers

Documentation Complete: Shared Module