Data Crawling Service - Complete Developer Documentation

Service: Website Crawling, Content Extraction & Embedding Generation
Port: 8005
Purpose: Crawl websites, extract content, generate embeddings, store in Milvus + Azure Blob
Technology: FastAPI, Playwright, BeautifulSoup, FastEmbed (BAAI/bge-small-en-v1.5), Milvus
Code Location: /data-crawling-service/src/main.py (1150 lines, LARGEST SERVICE)
Owner: Backend Team
Last Updated: 2025-12-26


Table of Contents

  1. Service Overview
  2. Architecture Evolution
  3. Complete Endpoints
  4. Website Crawling System
  5. Content Extraction & Processing
  6. Text Chunking Strategy
  7. Embedding Generation
  8. NEW Architecture: Blob + Milvus + Cosmos
  9. Proxy System
  10. Security Analysis
  11. Performance & Optimization
  12. Deployment

Service Overview

The Data Crawling Service is the LARGEST and most complex service (1150 lines). It crawls websites, extracts content, generates embeddings using FastEmbed, and stores data across three layers: Azure Blob Storage (JSON), Milvus (vectors), and CosmosDB (metadata).

Key Responsibilities

Website Crawling - Playwright (JS-heavy) + Requests (simple pages)
URL Discovery - Extract up to 50 internal links
Content Extraction - BeautifulSoup → text extraction
Text Preprocessing - Remove noise, emails, cookie notices
Text Chunking - 1000 chars with 200 overlap
Embedding Generation - BAAI/bge-small-en-v1.5 (384 dimensions)
Milvus Storage - Partition-based vector storage
Azure Blob Storage - Full crawl JSON backup
CosmosDB Metadata - Milvus IDs + references
Background Tasks - Async processing with status tracking

Statistics

  • Total Lines: 1150 (BIGGEST service!)
  • Endpoints: 10+
  • Embedding Model: BAAI/bge-small-en-v1.5 (384 dimensions, 512 max length)
  • Crawling: 100 concurrent threads
  • Chunk Size: 1000 characters (200 overlap)

Architecture Evolution

OLD Architecture (Deprecated but Still in Code)

All-in-one - Everything in CosmosDB
Large documents - Full content + embeddings in single document
No vector search - Simple text search only


NEW Architecture (Current)

Three-Layer Storage:

graph TB
    subgraph "Data Crawling Service"
        A[Website URL]
    end

    subgraph "Layer 1: Azure Blob Storage"
        B[JSON File<br/>Full crawl data]
    end

    subgraph "Layer 2: Milvus"
        C[Chunk Embeddings<br/>Per-chunk vectors]
    end

    subgraph "Layer 3: CosmosDB"
        D[Metadata<br/>Milvus IDs + blob URL]
    end

    A -->|1. Crawl| B
    A -->|2. Chunk + Embed| C
    A -->|3. Store refs| D

    style B fill:#E1F5FE
    style C fill:#FFE082
    style D fill:#C8E6C9

Benefits:

  1. Azure Blob - Cheap storage for full JSON ($0.01/GB)
  2. Milvus - Fast vector search with partitions
  3. CosmosDB - Small metadata only (cost-effective)

Complete Endpoints

1. POST /fetch-urls

Purpose: Crawl domain and discover up to 50 internal URLs

Code Location: Lines 646-691

Request:

POST /fetch-urls
Content-Type: multipart/form-data

domain=https://example.com

Response:

{
    "status": "success",
    "domain": "https://example.com",
    "proxy_ip": "123.456.789.012",
    "total_urls": 42,
    "urls": [
        "https://example.com/about",
        "https://example.com/products",
        "https://example.com/contact",
        ...
    ],
    "message": "Successfully extracted 42 URLs from https://example.com"
}
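
Example Client Call (illustrative; the base URL is an assumption for a local deployment):

# Hypothetical client call for POST /fetch-urls; base URL assumed.
import requests

BASE_URL = "http://localhost:8005"  # assumed local deployment on the service port

resp = requests.post(
    f"{BASE_URL}/fetch-urls",
    files={"domain": (None, "https://example.com")},  # sends multipart/form-data
    timeout=120,
)
resp.raise_for_status()
payload = resp.json()
print(f"Discovered {payload['total_urls']} URLs via proxy {payload['proxy_ip']}")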

2. POST /extract-contents

Purpose: Extract content from URLs and create embeddings (runs as background task)

Code Location: Lines 696-762

Request:

POST /extract-contents
Content-Type: multipart/form-data

user_id=User-123456
project_id=User-123456_Project_1
domain=https://example.com
urls=https://example.com/page1,https://example.com/page2,...

Response (Immediate):

{
  "task_id": "a1b2c3d4-e5f6-4a5b-8c9d-0e1f2a3b4c5d",
  "status": "processing",
  "cleaned_urls_count": 15
}

❗ IMPORTANT: This endpoint returns immediately with a task_id. The actual crawling/embedding runs in the background!


3. GET /task-status

Purpose: Check background task status

Code Location: Lines 787-792

Request:

GET /task-status?task_id=a1b2c3d4-e5f6-4a5b-8c9d-0e1f2a3b4c5d

Responses:

Processing:

{
  "task_id": "a1b2c3d4-e5f6-4a5b-8c9d-0e1f2a3b4c5d",
  "status": "processing"
}

Completed:

{
  "task_id": "a1b2c3d4-e5f6-4a5b-8c9d-0e1f2a3b4c5d",
  "status": "completed"
}

Failed:

{
  "task_id": "a1b2c3d4-e5f6-4a5b-8c9d-0e1f2a3b4c5d",
  "status": "failed"
}
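
Because /extract-contents returns only a task_id, clients typically submit the crawl and then poll /task-status until it finishes. A minimal sketch, assuming a local deployment:

# Hypothetical client sketch: submit a crawl, then poll until the task finishes.
import time
import requests

BASE_URL = "http://localhost:8005"  # assumed deployment address

form = {
    "user_id": (None, "User-123456"),
    "project_id": (None, "User-123456_Project_1"),
    "domain": (None, "https://example.com"),
    "urls": (None, "https://example.com/page1,https://example.com/page2"),
}
task = requests.post(f"{BASE_URL}/extract-contents", files=form, timeout=60).json()
task_id = task["task_id"]

while True:
    status = requests.get(
        f"{BASE_URL}/task-status", params={"task_id": task_id}, timeout=30
    ).json()
    if status["status"] in ("completed", "failed"):
        print(f"Task {task_id} finished with status: {status['status']}")
        break
    time.sleep(5)  # back off between polls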

4. GET /v2/get-embeddings

Purpose: Retrieve stored embeddings for a project

Code Location: Lines 802-854

Request:

GET /v2/get-embeddings?user_id=User-123456&project_id=User-123456_Project_1

Response:

{
    "user_id": "User-123456",
    "project_id": "User-123456_Project_1",
    "embeddings": [...],
    "urls": ["https://example.com/page1", "https://example.com/page2"],
    "timestamp": "2025-01-15T14:00:00Z",
    "metadata": {
        "content_length": 45231,
        "url_count": 15
    }
}

5. GET /v2/get-sitemap-urls

Purpose: Get list of crawled URLs

Code Location: Lines 856-879


6. GET /v2/fetch-secondary-data-url

Purpose: Fetch full crawl data from Azure Blob Storage

Code Location: Lines 881-1012

This is a KEY endpoint - retrieves complete URL-content pairs from blob storage!

Request:

GET /v2/fetch-secondary-data-url?user_id=User-123456&project_id=User-123456_Project_1

Response:

{
  "user_id": "User-123456",
  "project_id": "User-123456_Project_1",
  "url_contents": [
    {
      "url": "https://example.com/about",
      "content": "About Us\nWe are a company that..."
    },
    {
      "url": "https://example.com/products",
      "content": "Our Products\nProduct 1: ..."
    }
  ]
}

7. POST /v2/update-secondary-data-url

Purpose: Update crawled content and regenerate embeddings

Code Location: Lines 1014-1144

Request:

POST /v2/update-secondary-data-url?user_id=User-123456&project_id=User-123456_Project_1
Content-Type: application/json

[
    {
        "url": "https://example.com/about",
        "content": "Updated About Us content..."
    },
    {
        "url": "https://example.com/products",
        "content": "Updated Products content..."
    }
]

Response:

{
  "message": "Secondary data updated and embeddings saved successfully"
}
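
A typical edit round trip fetches the stored URL-content pairs, modifies them, and posts the full list back for re-embedding. A minimal sketch, again assuming a local deployment:

# Hypothetical round trip: fetch stored contents, edit one entry, push the update.
import requests

BASE_URL = "http://localhost:8005"  # assumed deployment address
params = {"user_id": "User-123456", "project_id": "User-123456_Project_1"}

data = requests.get(f"{BASE_URL}/v2/fetch-secondary-data-url", params=params, timeout=60).json()
url_contents = data["url_contents"]

url_contents[0]["content"] = "Updated About Us content..."
resp = requests.post(
    f"{BASE_URL}/v2/update-secondary-data-url",
    params=params,
    json=url_contents,
    timeout=300,  # re-embedding can take a while
)
print(resp.json()["message"])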

Website Crawling System

Multi-Layer Crawling Strategy

The service uses 3 crawling methods with fallbacks:

1. Requests (Fast, simple pages)
   ↓ (if blocked/CAPTCHA)
2. Playwright (JS rendering)
   ↓ (if Windows or Playwright fails)
3. Requests Fallback
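
The dispatch logic is spread across the crawling helpers; a simplified sketch of the decision is shown below (the wrapper function name and the exact "blocked" heuristics are illustrative, based on the conditions listed later in this section):

# Simplified, illustrative sketch of the fallback decision.
import asyncio

async def fetch_html_with_fallback(url: str) -> str:
    loop = asyncio.get_event_loop()
    try:
        html = await loop.run_in_executor(None, fetch_html_with_requests_fallback, url)
    except Exception:
        html = ""

    looks_blocked = (
        not html
        or len(html) < 500              # suspiciously short response
        or "captcha" in html.lower()    # CAPTCHA challenge detected
    )
    if looks_blocked:
        # Playwright renders JavaScript; it is skipped on Windows internally.
        html = await fetch_html_with_playwright1(url)
    return html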

Method 1: Simple Requests

Function: fetch_content(url) (Lines 156-172)

Code:

def fetch_content(url: str) -> Tuple[str, str]:
    """Fetch content for a single URL and return a (url, cleaned_text) tuple."""
    try:
        selected_proxy = random.choice(proxy_list)
        response = requests.get(
            url,
            headers=get_headers(),  # Random User-Agent
            proxies=selected_proxy,
            timeout=30,
            verify=False  # ⚠️ Skip SSL verification
        )
        soup = BeautifulSoup(response.text, "html.parser")
        text = soup.get_text(separator=" ", strip=True)

        # Extract only words (remove numbers, punctuation)
        text = " ".join(re.findall(r'\b[a-zA-Z]+\b', text))
        cleaned_text = preprocess_text(text)

        return url, cleaned_text[:10000]  # Truncate to 10K chars
    except Exception as e:
        logger.error(f"✗ Error fetching content for {url}: {e}")
        return url, ""

Features:

  • Random proxy selection
  • Random User-Agent
  • Extract only alphabetic words
  • Truncate to 10,000 characters
  • Error handling → empty string

Method 2: Playwright (JS-Heavy Sites)

Function: fetch_html_with_playwright1(url) (Lines 537-590)

When Used:

  • CAPTCHA detected in response
  • Response < 500 characters (likely blocked)
  • Initial request fails

Platform Detection:

if platform.system().lower() == 'windows':
    print(f"Windows detected, using requests directly for {url}")
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(None, fetch_html_with_requests_fallback, url)

⚠️ Windows Skip: Playwright skipped on Windows to avoid NotImplementedError

Playwright Configuration:

async with async_playwright() as p:
    browser = await p.chromium.launch(
        headless=True,
        proxy=proxy_config,
        args=[
            '--no-sandbox',
            '--disable-setuid-sandbox',
            '--disable-dev-shm-usage',
            '--disable-accelerated-2d-canvas',
            '--no-first-run',
            '--no-zygote',
            '--disable-gpu'
        ]
    )
    page = await browser.new_page()
    await page.goto(url, timeout=60000, wait_until='domcontentloaded')
    html = await page.content()
    await browser.close()
    return html

URL Discovery

Function: fetch_url(url) (Lines 592-644)

Extracts up to 50 internal URLs:

Code:

async def fetch_url(url):
    limit = 50  # Hardcoded limit

    # Fetch HTML (with Playwright fallback if needed)
    html = response.text or await fetch_html_with_playwright1(url)

    base_domain = urlparse(url).netloc
    links = set()
    normalized_links = set()

    # Extract href attributes
    hrefs = re.findall(r'href=["\'](.*?)["\']', html, re.IGNORECASE)

    # Skip file extensions
    skip_exts = (
        ".css", ".js", ".jpg", ".jpeg", ".png", ".gif", ".svg", ".ico",
        ".pdf", ".zip", ".rar", ".tar", ".mp4", ".mp3", ".avi", ".mov",
        ".doc", ".docx", ".xls", ".xlsx", ".ppt", ".pptx", ".exe", ".webp",
        ".xml", ".php", ".wp-json"
    )

    for href in hrefs:
        if len(links) >= limit:
            break

        full_url = urljoin(url, href)
        parsed_url = urlparse(full_url)
        path = parsed_url.path.lower()

        # Skip URLs whose path ends in a file extension (note: skip_exts above is not referenced here)
        if re.search(r'\.[a-z0-9]+$', path):
            continue

        # Only same domain
        if parsed_url.netloc == base_domain:
            clean_url = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
            if clean_url not in normalized_links:
                links.add(full_url)
                normalized_links.add(clean_url)

    return list(links)

Features:

  • Limit: 50 URLs
  • Same-domain only (no external links)
  • Skip file downloads
  • Remove duplicates (normalized)

Content Extraction & Processing

Concurrent Extraction

Function: extract_content(urls) (Lines 185-201)

Code:

def extract_content(urls: List[str]) -> Dict[str, str]:
    """Extracts text content from the provided URLs using proxies and random User-Agent."""
    logger.info(f"Extracting content from {len(urls)} URLs")
    contents = {}

    # 100 concurrent threads!
    with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
        futures = [executor.submit(fetch_content, url) for url in urls]

        for future in concurrent.futures.as_completed(futures):
            url, content = future.result()

            # Skip empty or error content
            if not content.strip():
                continue
            if "this page could not be found" in content.lower():
                continue

            contents[url] = content

    logger.info(f"✓ Extracted content from {len(contents)}/{len(urls)} URLs")
    return contents

Performance:

  • 100 concurrent threads - can fetch 100 URLs simultaneously
  • Fast: Typically processes 50 URLs in 10-20 seconds
  • Error resilient: Skips failed URLs

Text Preprocessing

Function: preprocess_text(text) (Lines 133-154)

Removes:

  • "email protected" text
  • "javascript:void(0)" links
  • Cookie/privacy policy mentions
  • Extra whitespace

Code:

def preprocess_text(text: str) -> str:
    """
    Comprehensive text preprocessing to clean extracted content.
    Removes unwanted symbols, tags, emails, repeated words, and other noise.
    """
    if not text or not text.strip():
        return ""

    # Clean up common web artifacts
    text = re.sub(r'email protected', ' ', text, flags=re.IGNORECASE)
    text = re.sub(r'\bjavascript:void\(0\)\b', '', text, flags=re.IGNORECASE)
    text = re.sub(r'\bcookies?\b', '', text, flags=re.IGNORECASE)
    text = re.sub(r'\bprivacy policy\b', '', text, flags=re.IGNORECASE)
    text = re.sub(r'\bterms of service\b', '', text, flags=re.IGNORECASE)

    # Final cleanup
    text = re.sub(r'\s+', ' ', text)  # Collapse whitespace
    text = text.strip()

    return text
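
A quick demonstration of the cleanup rules above:

# Example: preprocess_text strips boilerplate phrases and collapses whitespace.
raw = "We use cookies to improve your experience. Read our Privacy Policy. Contact: email protected"
print(preprocess_text(raw))
# Output: "We use to improve your experience. Read our . Contact:"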

Text Chunking Strategy

Function: chunk_text(text, chunk_size=1000, overlap=200) (Lines 203-230)

Purpose: Split large text into manageable chunks for embedding generation

Algorithm:

def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 200) -> List[Dict]:
    """Split text into chunks with metadata."""
    if not text.strip():
        return []

    chunks = []
    start = 0
    text_length = len(text)
    chunk_index = 0

    while start < text_length:
        end = min(start + chunk_size, text_length)
        chunk_content = text[start:end]

        chunks.append({
            "chunk_index": chunk_index,
            "content": chunk_content,
            "start_pos": start,
            "end_pos": end,
            "length": len(chunk_content)
        })

        chunk_index += 1
        start = end - overlap  # Move back by overlap amount

        if end == text_length:
            break

    return chunks

Example Chunking

Input Text: (3000 characters)

Lorem ipsum dolor sit amet... (3000 chars)

Output: 4 chunks

Chunk | Start | End  | Length | Content
0     | 0     | 1000 | 1000   | Lorem ipsum... (first 1000 chars)
1     | 800   | 1800 | 1000   | ...last 200 from chunk 0 + next 800...
2     | 1600  | 2600 | 1000   | ...last 200 from chunk 1 + next 800...
3     | 2400  | 3000 | 600    | ...last 200 from chunk 2 + final 400...

Overlap Benefit: Ensures context isn't lost at chunk boundaries
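
The table above can be reproduced directly with chunk_text:

# Reproduce the example: a 3000-character input yields 4 overlapping chunks.
sample = "x" * 3000
for chunk in chunk_text(sample, chunk_size=1000, overlap=200):
    print(chunk["chunk_index"], chunk["start_pos"], chunk["end_pos"], chunk["length"])
# 0 0 1000 1000
# 1 800 1800 1000
# 2 1600 2600 1000
# 3 2400 3000 600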


Embedding Generation

Model: BAAI/bge-small-en-v1.5

Initialization (Line 53):

from fastembed.embedding import FlagEmbedding as Embedding

embedder = Embedding(
    model_name="BAAI/bge-small-en-v1.5",
    max_length=512
)

Model Specifications:

  • Dimensions: 384 (smaller than OpenAI's 1536)
  • Max Input: 512 tokens (~400 words)
  • Speed: ~1000 chunks/second on CPU
  • Size: ~90MB model file
  • License: MIT (free for commercial use)
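
A short sanity check of the embedder (same initialization as above):

# Embed two short texts and confirm the 384-dimensional output.
vectors = list(embedder.embed(["About Us. We are a company.", "Our Products."]))
print(len(vectors), len(vectors[0]))  # 2 384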

Generation Process

Code (Lines 413-427):

if clean_url_chunks:
    # Generate embeddings for each chunk
    chunk_texts = [chunk["content"] for chunk in clean_url_chunks]
    chunk_embeddings = list(embedder.embed(chunk_texts))

    # Prepare data for Milvus
    for idx, (chunk, embedding) in enumerate(zip(clean_url_chunks, chunk_embeddings)):
        embeddings_data.append({
            'document_id': document_id,  # UUID for this URL
            'user_id': user_id,
            'project_id': project_id,
            'chunk_index': idx,
            'text': chunk["content"][:2000],  # Truncate to 2K
            'embedding': [float(x) for x in embedding],  # Convert to float list
            'data_type': 'url',
            'source_url': url  # ✅ Track source
        })

Example Embedding:

[0.0234, -0.0156, 0.0891, ..., 0.0423]  # 384 floats

NEW Architecture: Blob + Milvus + Cosmos

Complete Data Flow

Code: sync_process_urls_content() (Lines 262-520)

This is the CORE function - 258 lines of complex logic!


Step 1: Save JSON to Azure Blob

Code (Lines 343-372):

# Step 1: Save crawled data as JSON to Azure Blob Storage
crawl_data = {
    "domain": domain,
    "urls": urls,
    "contents": contents,  # Full URL→content mapping
    "timestamp": datetime.utcnow().isoformat(),
    "proxy_info": proxy_ip
}

json_blob_path = f"{user_id}/{project_id}/crawls/{domain}_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.json"

# Async upload
json_url = await azure_blob.async_upload_blob(
    blob_name=json_blob_path,
    data=json.dumps(crawl_data, indent=2).encode('utf-8'),
    content_type="application/json"
)
logger.info(f"✓ Uploaded crawl JSON to Blob Storage: {json_blob_path}")

Blob Path Example:

User-123456/User-123456_Project_1/crawls/example.com_20250115_140530.json

Blob Content:

{
  "domain": "https://example.com",
  "urls": ["https://example.com/about", "https://example.com/products"],
  "contents": {
    "https://example.com/about": "About Us\nWe are...",
    "https://example.com/products": "Our Products\n..."
  },
  "timestamp": "2025-01-15T14:05:30Z",
  "proxy_info": "123.456.789.012"
}
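
The azure_blob helper is a shared module and is not shown in this file; a minimal sketch of what async_upload_blob might look like with the azure-storage-blob aio client (the container name and the helper's exact shape are assumptions):

# Hypothetical sketch of the async blob upload (container name assumed).
import os
from azure.storage.blob import ContentSettings
from azure.storage.blob.aio import BlobServiceClient

async def async_upload_blob(blob_name: str, data: bytes, content_type: str) -> str:
    service = BlobServiceClient.from_connection_string(
        os.environ["AZURE_STORAGE_CONNECTION_STRING"]
    )
    async with service:
        blob = service.get_blob_client(container="crawl-data", blob=blob_name)  # container assumed
        await blob.upload_blob(
            data,
            overwrite=True,
            content_settings=ContentSettings(content_type=content_type),
        )
        return blob.url  # later stored as json_blob_url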

Step 2: Generate & Store Embeddings in Milvus

Code (Lines 404-434):

# Generate UUID for this URL document
document_id = str(uuid.uuid4())  # e.g., "a1b2c3d4-e5f6-..."

embeddings_data = []
milvus_ids = []

# Generate embeddings for each chunk
chunk_texts = [chunk["content"] for chunk in clean_url_chunks]
chunk_embeddings = list(embedder.embed(chunk_texts))

# Prepare for Milvus
for idx, (chunk, embedding) in enumerate(zip(clean_url_chunks, chunk_embeddings)):
    embeddings_data.append({
        'document_id': document_id,
        'user_id': user_id,
        'project_id': project_id,
        'chunk_index': idx,
        'text': chunk["content"][:2000],
        'embedding': [float(x) for x in embedding],
        'data_type': 'url',
        'source_url': url
    })

# Insert into Milvus and get IDs
milvus_ids = milvus_embeddings.insert_embeddings(
    collection_name="embeddings",
    embeddings_data=embeddings_data
)
logger.info(f"✓ Inserted {len(milvus_ids)} chunk embeddings into Milvus for {url}")

Milvus Storage:

  • Collection: embeddings
  • Partition: Based on project_id
  • Returns: List of Milvus IDs for each chunk

Step 3: Store Metadata in CosmosDB

Code (Lines 437-464):

url_doc = {
    "_id": document_id,  # Same UUID as in Milvus
    "user_id": user_id,
    "project_id": project_id,
    "filename": f"{domain}_{len(inserted_ids) + 1}",
    "file_type": "url",
    "base_url": domain,
    "urls": [url],  # Only this specific URL
    "extracted_text_preview": processed_content[:500],  # Preview only!
    "chunks": clean_url_chunks,  # Metadata (no embeddings)
    "milvus_embedding_ids": milvus_ids,  # ✅ CRITICAL: Reference to Milvus
    "chunk_settings": {
        "chunk_size": 1000,
        "overlap": 200,
        "source_url": url,
        "chunks_in_url": len(clean_url_chunks),
        "preprocessed": True
    },
    "total_chunks": len(clean_url_chunks),
    "embedding_count": len(milvus_ids),
    "timestamp": datetime.utcnow(),
    "proxy_info": proxy_ip,
    "json_blob_url": json_url  # ✅ Reference to blob storage
}

# Insert
inserted_id = files_collection.insert_one(url_doc).inserted_id

CosmosDB Document:

{
    "_id": "a1b2c3d4-e5f6-1234-5678-9abcdef12345",
    "user_id": "User-123456",
    "project_id": "User-123456_Project_1",
    "filename": "example.com_1",
    "file_type": "url",
    "base_url": "https://example.com",
    "urls": ["https://example.com/about"],

    // Only 500 char preview (not full content!)
    "extracted_text_preview": "About Us\nWe are a company...",

    // Chunk metadata (lengths, positions)
    "chunks": [
        {"chunk_index": 0, "content": "About Us...", "start_pos": 0, "end_pos": 1000},
        {"chunk_index": 1, "content": "...founded in...", "start_pos": 800, "end_pos": 1800}
    ],

    // ✅ References to Milvus vectors
    "milvus_embedding_ids": [123456, 123457],

    "chunk_settings": {
        "chunk_size": 1000,
        "overlap": 200,
        "source_url": "https://example.com/about",
        "chunks_in_url": 2,
        "preprocessed": true
    },

    "total_chunks": 2,
    "embedding_count": 2,
    "timestamp": ISODate("2025-01-15T14:05:35Z"),
    "proxy_info": "123.456.789.012",

    // ✅ Reference to full data in blob
    "json_blob_url": "https://storage.blob.core.windows.net/container/User-123456/..."
}

Data Separation Benefits

Data Type      | Storage    | Reason                 | Cost
Full JSON      | Azure Blob | Cheap cold storage     | $0.01/GB
Vectors (384D) | Milvus     | Fast similarity search | Compute cost
Metadata       | CosmosDB   | Small + queryable      | $0.25/GB

Example Costs for 1000 URLs:

  • Blob: ~10MB JSON = $0.0001
  • Milvus: 2000 chunks × 384D = compute only
  • Cosmos: ~100KB metadata = $0.000025

Total: Nearly free! 🎉
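
Downstream retrieval ties the three layers together: embed the query, search Milvus, then join back to the CosmosDB metadata (which also carries json_blob_url for the full crawl). A hedged sketch with pymilvus and pymongo; the metric type and index parameters are assumptions, and filtering uses an expr clause since the exact partition naming is not documented here:

# Hypothetical retrieval flow across the three layers (index/metric params assumed).
from pymilvus import Collection, connections

connections.connect(host="localhost", port="19530")
collection = Collection("embeddings")
collection.load()

query_vec = [float(x) for x in next(iter(embedder.embed(["What products do you sell?"])))]

hits = collection.search(
    data=[query_vec],
    anns_field="embedding",
    param={"metric_type": "COSINE", "params": {"nprobe": 16}},
    limit=5,
    expr='user_id == "User-123456" and project_id == "User-123456_Project_1"',
    output_fields=["document_id", "text", "source_url"],
)[0]

for hit in hits:
    # Join back to the CosmosDB metadata document written in Step 3.
    doc = files_collection.find_one({"_id": hit.entity.get("document_id")})
    print(hit.distance, hit.entity.get("source_url"), doc["json_blob_url"] if doc else None)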


Proxy System

⚠️ HARDCODED PROXY CREDENTIALS

Location: Lines 122-131

proxy_list = [
    {
        'http': 'http://a85f75db6eab927ddc39:cb717a25699afc5b@gw.dataimpulse.com:823',
        'https': 'http://a85f75db6eab927ddc39:cb717a25699afc5b@gw.dataimpulse.com:823'
    },
    {
        'http': 'http://a85f75db6eab927ddc39:cb717a25699afc5b@gw.dataimpulse.com:824',
        'https': 'http://a85f75db6eab927ddc39:cb717a25699afc5b@gw.dataimpulse.com:824'
    },
]

⚠️ CRITICAL SECURITY ISSUE:

  • Username: a85f75db6eab927ddc39
  • Password: cb717a25699afc5b
  • Hardcoded in source code!

Random Proxy Selection

Code:

selected_proxy = random.choice(proxy_list)
response = requests.get(url, proxies=selected_proxy, ...)

Usage Locations:

  • fetch_content() - Content extraction
  • fetch_url() - URL discovery
  • fetch_urls_endpoint() - Get proxy IP

Proxy IP Detection

Code (Lines 290-297):

try:
    selected_proxy = random.choice(proxy_list)
    proxy_response = requests.get(
        "http://httpbin.org/ip",
        proxies=selected_proxy,
        timeout=100,
        verify=False
    )
    proxy_ip = proxy_response.json().get("origin", "Unknown")
    logger.info(f"✓ Using proxy IP: {proxy_ip}")
except Exception as e:
    proxy_ip = f"Error fetching proxy IP: {e}"

Stores proxy IP in documents for debugging


Security Analysis

Critical Issues

1. ⚠️ HARDCODED PROXY CREDENTIALS (Lines 122-131)

Impact: Anyone with code access can use paid proxies

Fix:

PROXY_LIST_JSON = os.getenv("PROXY_LIST_JSON")
if PROXY_LIST_JSON:
    proxy_list = json.loads(PROXY_LIST_JSON)
else:
    proxy_list = []  # No proxies

2. ⚠️ SSL VERIFICATION DISABLED

Location: Multiple places (verify=False)

Impact: Man-in-the-middle attacks possible

Fix:

response = requests.get(url, ..., verify=True)  # Enable SSL verification

3. ⚠️ NO INPUT VALIDATION

Problem: URLs not validated before crawling

Example Attack:

urls = "file:///etc/passwd,javascript:alert(1)"

Fix:

def validate_url(url):
    if not url.startswith(('http://', 'https://')):
        raise HTTPException(400, "Invalid URL scheme")
    if 'localhost' in url or '127.0.0.1' in url:
        raise HTTPException(400, "localhost not allowed")
    return url

4. ⚠️ UNBOUNDED CRAWLING

Problem: No rate limiting on crawling

Impact: Could DDoS target websites with 100 concurrent requests

Fix:

from slowapi import Limiter
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)

@app.post("/fetch-urls")
@limiter.limit("10/minute")  # Max 10 domains per minute
async def fetch_urls_endpoint(...):  # slowapi also needs a `request: Request` parameter
    ...

Performance & Optimization

Concurrent Processing

100 Thread Pool:

with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
    futures = [executor.submit(fetch_content, url) for url in urls]

Performance:

  • 50 URLs in 10-20 seconds
  • 100 URLs in 20-40 seconds
  • Limited by network, not CPU

Embedding Generation Speed

FastEmbed Performance:

  • CPU: ~1000 chunks/second
  • GPU: ~10,000 chunks/second

Example:

  • 50 URLs × 3 chunks avg = 150 chunks
  • CPU: 0.15 seconds
  • Negligible compared to crawling (20 seconds)

Memory Optimization

Old Architecture:

// BAD: Everything in one document
{
    "urls": [...50 URLs...],
    "extracted_text": {
        "url1": "...10,000 chars...",
        ...
    },
    "embeddings": [
        [384 floats],
        [384 floats],
        ...  // 150 chunks × 384 floats = 57,600 numbers!
    ]
}
// Total: ~500KB per project

New Architecture:

// GOOD: Metadata only
{
    "urls": ["url1"],
    "extracted_text_preview": "...500 chars...",
    "chunks": [{metadata only}],
    "milvus_embedding_ids": [123456, 123457],  // Just IDs!
    "json_blob_url": "https://..."
}
// Total: ~5KB per URL

100× smaller CosmosDB documents!


Deployment

Docker Configuration

Dockerfile:

FROM python:3.9-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    wget \
    gnupg \
    && rm -rf /var/lib/apt/lists/*

# Install Playwright and its browser (with required system libraries)
RUN pip install playwright && playwright install --with-deps chromium

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy shared modules
COPY shared/ ./shared/

# Copy source code
COPY src/ .

EXPOSE 8005

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8005"]

Requirements.txt

fastapi>=0.95.0
uvicorn[standard]>=0.22.0
pymongo>=4.3.3
python-multipart>=0.0.6
python-dotenv>=1.0.0

# Web scraping
requests>=2.31.0
beautifulsoup4>=4.12.0
playwright>=1.40.0
fake-useragent>=1.4.0
validators>=0.22.0

# Embedding
fastembed>=0.1.0

# Concurrent processing (concurrent.futures ships with Python 3;
# this backport pin is redundant on the Python 3.9 base image)
concurrent-futures>=3.1.1

# Monitoring
ddtrace>=1.19.0

Environment Variables

# Database
MONGO_URI=mongodb://...
MONGO_DB_NAME=Machine_agent_demo

# Milvus
MILVUS_HOST=localhost
MILVUS_PORT=19530

# Azure Blob
AZURE_STORAGE_CONNECTION_STRING=...

# Proxies (should be added!)
PROXY_LIST_JSON=[{"http":"...","https":"..."}]

# DataDog
DD_SERVICE=data-crawling-service
DD_ENV=production


Recommendations

Critical (Security)

  1. ⚠️ Move Proxy Credentials to Environment
  2. ⚠️ Enable SSL Verification (verify=True)
  3. ⚠️ Add URL Validation (no localhost, file://, etc.)
  4. ⚠️ Add Rate Limiting (prevent DDoS)

Performance

  1. Cache Embeddings - Don't regenerate embeddings for unchanged content (see the sketch after this list)
  2. Batch Milvus Inserts - Insert 100 chunks at once
  3. Async Blob Upload - Don't block on blob storage
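
For the caching recommendation, a minimal sketch keyed on a hash of the chunk text (the in-memory dict is an assumption; a persistent cache would follow the same pattern):

# Minimal embedding cache keyed by a content hash (in-memory dict assumed).
import hashlib

_embedding_cache = {}  # sha256(text) -> 384-float vector

def embed_with_cache(texts):
    keys = [hashlib.sha256(t.encode("utf-8")).hexdigest() for t in texts]
    missing = [t for t, k in zip(texts, keys) if k not in _embedding_cache]
    if missing:
        for text, vector in zip(missing, embedder.embed(missing)):
            _embedding_cache[hashlib.sha256(text.encode("utf-8")).hexdigest()] = [float(x) for x in vector]
    return [_embedding_cache[k] for k in keys]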

Code Quality

  1. Remove Old Architecture Code - delete the commented-out legacy code at lines 174-183
  2. Split into Modules - 1150 lines is too big
  3. Add Unit Tests - Test chunking, preprocessing
  4. Document Shared Modules (get_db_manager, get_milvus_embeddings_service)

Last Updated: 2025-12-26
Code Version: data-crawling-service/src/main.py (1150 lines)
Status: COMPLETE - Part 1 of Service Documentation
Review Cycle: Monthly (Critical Data Pipeline)


"Where websites become knowledge."