Data Crawling Service - Complete Developer Documentation¶
Service: Website Crawling, Content Extraction & Embedding Generation
Port: 8005
Purpose: Crawl websites, extract content, generate embeddings, store in Milvus + Azure Blob
Technology: FastAPI, Playwright, BeautifulSoup, FastEmbed (BAAI/bge-small-en-v1.5), Milvus
Code Location: /data-crawling-service/src/main.py (1150 lines, LARGEST SERVICE)
Owner: Backend Team
Last Updated: 2025-12-26
Table of Contents¶
- Service Overview
- Architecture Evolution
- Complete Endpoints
- Website Crawling System
- Content Extraction & Processing
- Text Chunking Strategy
- Embedding Generation
- NEW Architecture: Blob + Milvus + Cosmos
- Proxy System
- Security Analysis
- Performance & Optimization
- Deployment
Service Overview¶
The Data Crawling Service is the LARGEST and most complex service (1150 lines). It crawls websites, extracts content, generates embeddings using FastEmbed, and stores data across three layers: Azure Blob Storage (JSON), Milvus (vectors), and CosmosDB (metadata).
Key Responsibilities¶
✅ Website Crawling - Playwright (JS-heavy) + Requests (simple pages)
✅ URL Discovery - Extract up to 50 internal links
✅ Content Extraction - BeautifulSoup → text extraction
✅ Text Preprocessing - Remove noise, emails, cookie notices
✅ Text Chunking - 1000 chars with 200 overlap
✅ Embedding Generation - BAAI/bge-small-en-v1.5 (384 dimensions)
✅ Milvus Storage - Partition-based vector storage
✅ Azure Blob Storage - Full crawl JSON backup
✅ CosmosDB Metadata - Milvus IDs + references
✅ Background Tasks - Async processing with status tracking
Statistics¶
- Total Lines: 1150 (BIGGEST service!)
- Endpoints: 10+
- Embedding Model: BAAI/bge-small-en-v1.5 (384 dimensions, 512 max length)
- Crawling: 100 concurrent threads
- Chunk Size: 1000 characters (200 overlap)
Architecture Evolution¶
OLD Architecture (Deprecated but Still in Code)¶
❌ All-in-one - Everything in CosmosDB
❌ Large documents - Full content + embeddings in single document
❌ No vector search - Simple text search only
NEW Architecture (Current)¶
✅ Three-Layer Storage:
graph TB
subgraph "Data Crawling Service"
A[Website URL]
end
subgraph "Layer 1: Azure Blob Storage"
B[JSON File<br/>Full crawl data]
end
subgraph "Layer 2: Milvus"
C[Chunk Embeddings<br/>Per-chunk vectors]
end
subgraph "Layer 3: CosmosDB"
D[Metadata<br/>Milvus IDs + blob URL]
end
A -->|1. Crawl| B
A -->|2. Chunk + Embed| C
A -->|3. Store refs| D
style B fill:#E1F5FE
style C fill:#FFE082
style D fill:#C8E6C9
Benefits:
- Azure Blob - Cheap storage for full JSON ($0.01/GB)
- Milvus - Fast vector search with partitions
- CosmosDB - Small metadata only (cost-effective)
Complete Endpoints¶
1. POST /fetch-urls¶
Purpose: Crawl domain and discover up to 50 internal URLs
Code Location: Lines 646-691
Request:
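A likely request shape (hedged: the form field name is not shown in this excerpt; `domain` is assumed by analogy with /extract-contents and the response payload):

POST /fetch-urls
Content-Type: multipart/form-data

domain=https://example.com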
Response:
{
"status": "success",
"domain": "https://example.com",
"proxy_ip": "123.456.789.012",
"total_urls": 42,
"urls": [
"https://example.com/about",
"https://example.com/products",
"https://example.com/contact",
...
],
"message": "Successfully extracted 42 URLs from https://example.com"
}
2. POST /extract-contents¶
Purpose: Extract content from URLs and create embeddings (runs as background task)
Code Location: Lines 696-762
Request:
POST /extract-contents
Content-Type: multipart/form-data
user_id=User-123456
project_id=User-123456_Project_1
domain=https://example.com
urls=https://example.com/page1,https://example.com/page2,...
Response (Immediate):
{
"task_id": "a1b2c3d4-e5f6-4a5b-8c9d-0e1f2a3b4c5d",
"status": "processing",
"cleaned_urls_count": 15
}
❗ IMPORTANT: This endpoint returns immediately with a task_id. The actual crawling/embedding runs in the background!
3. GET /task-status¶
Purpose: Check background task status
Code Location: Lines 787-792
Request:
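The task_id returned by /extract-contents is passed back, presumably as a query parameter (not confirmed in this excerpt):

GET /task-status?task_id=a1b2c3d4-e5f6-4a5b-8c9d-0e1f2a3b4c5d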
Responses:
Processing:
Completed:
Failed:
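The three payloads are not reproduced here, but the typical client flow is: submit the URLs, then poll until the task leaves the processing state. A minimal sketch, assuming a local deployment on port 8005 and a status field matching the three states above:

import time
import requests

BASE = "http://localhost:8005"  # assumption: local deployment on the documented port

# 1. Kick off the background task (returns immediately with a task_id)
resp = requests.post(
    f"{BASE}/extract-contents",
    data={
        "user_id": "User-123456",
        "project_id": "User-123456_Project_1",
        "domain": "https://example.com",
        "urls": "https://example.com/page1,https://example.com/page2",
    },
)
task_id = resp.json()["task_id"]

# 2. Poll /task-status until the background crawl/embedding finishes
while True:
    status = requests.get(f"{BASE}/task-status", params={"task_id": task_id}).json()
    if status.get("status") != "processing":
        break
    time.sleep(5)

print(status)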
4. GET /v2/get-embeddings¶
Purpose: Retrieve stored embeddings for a project
Code Location: Lines 802-854
Request:
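Parameter names below are inferred from the response body and the other /v2 endpoints (not confirmed in this excerpt):

GET /v2/get-embeddings?user_id=User-123456&project_id=User-123456_Project_1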
Response:
{
"user_id": "User-123456",
"project_id": "User-123456_Project_1",
"embeddings": [...],
"urls": ["https://example.com/page1", "https://example.com/page2"],
"timestamp": "2025-01-15T14:00:00Z",
"metadata": {
"content_length": 45231,
"url_count": 15
}
}
5. GET /v2/get-sitemap-urls¶
Purpose: Get list of crawled URLs
Code Location: Lines 856-879
6. GET /v2/fetch-secondary-data-url¶
Purpose: Fetch full crawl data from Azure Blob Storage
Code Location: Lines 881-1012
This is a KEY endpoint - retrieves complete URL-content pairs from blob storage!
Request:
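As with /v2/get-embeddings, the query parameters are inferred from the response body (not confirmed in this excerpt):

GET /v2/fetch-secondary-data-url?user_id=User-123456&project_id=User-123456_Project_1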
Response:
{
"user_id": "User-123456",
"project_id": "User-123456_Project_1",
"url_contents": [
{
"url": "https://example.com/about",
"content": "About Us\nWe are a company that..."
},
{
"url": "https://example.com/products",
"content": "Our Products\nProduct 1: ..."
}
]
}
7. POST /v2/update-secondary-data-url¶
Purpose: Update crawled content and regenerate embeddings
Code Location: Lines 1014-1144
Request:
POST /v2/update-secondary-data-url?user_id=User-123456&project_id=User-123456_Project_1
Content-Type: application/json
[
{
"url": "https://example.com/about",
"content": "Updated About Us content..."
},
{
"url": "https://example.com/products",
"content": "Updated Products content..."
}
]
Response:
Website Crawling System¶
Multi-Layer Crawling Strategy¶
The service uses 3 crawling methods with fallbacks:
1. Requests (Fast, simple pages)
↓ (if blocked/CAPTCHA)
2. Playwright (JS rendering)
↓ (if Windows or Playwright fails)
3. Requests Fallback
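In code form, the decision logic looks roughly like the following sketch (the wrapper name fetch_html is illustrative; the heuristics come from the "When Used" list under Method 2, and get_headers, proxy_list and fetch_html_with_playwright1 are the helpers documented below):

async def fetch_html(url: str) -> str:
    """Simplified view of the three-tier fetch strategy."""
    # Tier 1: plain requests through a random proxy
    response = requests.get(
        url,
        headers=get_headers(),
        proxies=random.choice(proxy_list),
        timeout=30,
        verify=False,
    )
    html = response.text

    # Tier 2: fall back to Playwright when the page looks blocked
    if "captcha" in html.lower() or len(html) < 500:
        # fetch_html_with_playwright1 itself falls back to plain
        # requests on Windows (Tier 3)
        html = await fetch_html_with_playwright1(url)
    return html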
Method 1: Simple Requests¶
Function: fetch_content(url) (Lines 156-172)
Code:
def fetch_content(url: str) -> Tuple[str, str]:
    """Helper function to fetch content for a single URL and extract only words."""
    try:
        selected_proxy = random.choice(proxy_list)
        response = requests.get(
            url,
            headers=get_headers(),  # Random User-Agent
            proxies=selected_proxy,
            timeout=30,
            verify=False  # ⚠️ Skip SSL verification
        )
        soup = BeautifulSoup(response.text, "html.parser")
        text = soup.get_text(separator=" ", strip=True)
        # Extract only words (remove numbers, punctuation)
        text = " ".join(re.findall(r'\b[a-zA-Z]+\b', text))
        cleaned_text = preprocess_text(text)
        return url, cleaned_text[:10000]  # Truncate to 10K chars
    except Exception as e:
        logger.error(f"✗ Error fetching content for {url}: {e}")
        return url, ""
Features:
- Random proxy selection
- Random User-Agent
- Extract only alphabetic words
- Truncate to 10,000 characters
- Error handling → empty string
Method 2: Playwright (JS-Heavy Sites)¶
Function: fetch_html_with_playwright1(url) (Lines 537-590)
When Used:
- CAPTCHA detected in response
- Response < 500 characters (likely blocked)
- Initial request fails
Platform Detection:
if platform.system().lower() == 'windows':
    print(f"Windows detected, using requests directly for {url}")
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(None, fetch_html_with_requests_fallback, url)
⚠️ Windows Skip: Playwright skipped on Windows to avoid NotImplementedError
Playwright Configuration:
async with async_playwright() as p:
    browser = await p.chromium.launch(
        headless=True,
        proxy=proxy_config,
        args=[
            '--no-sandbox',
            '--disable-setuid-sandbox',
            '--disable-dev-shm-usage',
            '--disable-accelerated-2d-canvas',
            '--no-first-run',
            '--no-zygote',
            '--disable-gpu'
        ]
    )
    page = await browser.new_page()
    await page.goto(url, timeout=60000, wait_until='domcontentloaded')
    html = await page.content()
    await browser.close()
    return html
URL Discovery¶
Function: fetch_url(url) (Lines 592-644)
Extracts up to 50 internal URLs:
Code:
async def fetch_url(url):
    limit = 50  # Hardcoded limit
    # Fetch HTML (initial requests.get not shown; Playwright fallback if needed)
    html = response.text or await fetch_html_with_playwright1(url)
    base_domain = urlparse(url).netloc
    links = set()
    normalized_links = set()
    # Extract href attributes
    hrefs = re.findall(r'href=["\'](.*?)["\']', html, re.IGNORECASE)
    # File extensions to skip (note: the generic regex check below does the filtering)
    skip_exts = (
        ".css", ".js", ".jpg", ".jpeg", ".png", ".gif", ".svg", ".ico",
        ".pdf", ".zip", ".rar", ".tar", ".mp4", ".mp3", ".avi", ".mov",
        ".doc", ".docx", ".xls", ".xlsx", ".ppt", ".pptx", ".exe", ".webp",
        ".xml", ".php", ".wp-json"
    )
    for href in hrefs:
        if len(links) >= limit:
            break
        full_url = urljoin(url, href)
        parsed_url = urlparse(full_url)
        path = parsed_url.path.lower()
        # Skip URLs ending with file extensions
        if re.search(r'\.[a-z0-9]+$', path):
            continue
        # Only same domain
        if parsed_url.netloc == base_domain:
            clean_url = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
            if clean_url not in normalized_links:
                links.add(full_url)
                normalized_links.add(clean_url)
    return list(links)
Features:
- Limit: 50 URLs
- Same-domain only (no external links)
- Skip file downloads
- Remove duplicates (normalized)
Content Extraction & Processing¶
Concurrent Extraction¶
Function: extract_content(urls) (Lines 185-201)
Code:
def extract_content(urls: List[str]) -> Dict[str, str]:
    """Extracts text content from the provided URLs using proxies and random User-Agent."""
    logger.info(f"Extracting content from {len(urls)} URLs")
    contents = {}
    # 100 concurrent threads!
    with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
        futures = [executor.submit(fetch_content, url) for url in urls]
        for future in concurrent.futures.as_completed(futures):
            url, content = future.result()
            # Skip empty or error content
            if not content.strip():
                continue
            if "this page could not be found" in content.lower():
                continue
            contents[url] = content
    logger.info(f"✓ Extracted content from {len(contents)}/{len(urls)} URLs")
    return contents
Performance:
- 100 concurrent threads - can fetch 100 URLs simultaneously
- Fast: Typically processes 50 URLs in 10-20 seconds
- Error resilient: Skips failed URLs
Text Preprocessing¶
Function: preprocess_text(text) (Lines 133-154)
Removes:
- "email protected" text
- "javascript:void(0)" links
- Cookie/privacy policy mentions
- Extra whitespace
Code:
def preprocess_text(text: str) -> str:
    """
    Comprehensive text preprocessing to clean extracted content.
    Removes unwanted symbols, tags, emails, repeated words, and other noise.
    """
    if not text or not text.strip():
        return ""
    # Clean up common web artifacts
    text = re.sub(r'email protected', ' ', text, flags=re.IGNORECASE)
    text = re.sub(r'\bjavascript:void\(0\)\b', '', text, flags=re.IGNORECASE)
    text = re.sub(r'\bcookies?\b', '', text, flags=re.IGNORECASE)
    text = re.sub(r'\bprivacy policy\b', '', text, flags=re.IGNORECASE)
    text = re.sub(r'\bterms of service\b', '', text, flags=re.IGNORECASE)
    # Final cleanup
    text = re.sub(r'\s+', ' ', text)  # Collapse whitespace
    text = text.strip()
    return text
Text Chunking Strategy¶
Function: chunk_text(text, chunk_size=1000, overlap=200) (Lines 203-230)
Purpose: Split large text into manageable chunks for embedding generation
Algorithm:
def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 200) -> List[Dict]:
    """Split text into chunks with metadata."""
    if not text.strip():
        return []
    chunks = []
    start = 0
    text_length = len(text)
    chunk_index = 0
    while start < text_length:
        end = min(start + chunk_size, text_length)
        chunk_content = text[start:end]
        chunks.append({
            "chunk_index": chunk_index,
            "content": chunk_content,
            "start_pos": start,
            "end_pos": end,
            "length": len(chunk_content)
        })
        chunk_index += 1
        start = end - overlap  # Move back by overlap amount
        if end == text_length:
            break
    return chunks
Example Chunking¶
Input Text: (3000 characters)
Output: 4 chunks
| Chunk | Start | End | Length | Content |
|---|---|---|---|---|
| 0 | 0 | 1000 | 1000 | Lorem ipsum... (first 1000 chars) |
| 1 | 800 | 1800 | 1000 | ...last 200 from chunk 0 + next 800... |
| 2 | 1600 | 2600 | 1000 | ...last 200 from chunk 1 + next 800... |
| 3 | 2400 | 3000 | 600 | ...last 200 from chunk 2 + final 400... |
Overlap Benefit: Ensures context isn't lost at chunk boundaries
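A quick sanity check of the table above, calling chunk_text as defined:

text = "x" * 3000
chunks = chunk_text(text)  # chunk_size=1000, overlap=200
print([(c["chunk_index"], c["start_pos"], c["end_pos"]) for c in chunks])
# [(0, 0, 1000), (1, 800, 1800), (2, 1600, 2600), (3, 2400, 3000)]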
Embedding Generation¶
Model: BAAI/bge-small-en-v1.5¶
Initialization (Line 53):
from fastembed.embedding import FlagEmbedding as Embedding
embedder = Embedding(
    model_name="BAAI/bge-small-en-v1.5",
    max_length=512
)
Model Specifications:
- Dimensions: 384 (smaller than OpenAI's 1536)
- Max Input: 512 tokens (~400 words)
- Speed: ~1000 chunks/second on CPU
- Size: ~90MB model file
- License: MIT (free for commercial use)
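A minimal usage sketch with the embedder initialized above (the embed call is the same one used in the processing code below):

sample_chunks = ["About Us. We are a company that builds crawlers."]
vectors = list(embedder.embed(sample_chunks))  # generator -> list of numpy arrays

print(len(vectors))     # 1 (one vector per input chunk)
print(len(vectors[0]))  # 384 dimensions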
Generation Process¶
Code (Lines 413-427):
if clean_url_chunks:
    # Generate embeddings for each chunk
    chunk_texts = [chunk["content"] for chunk in clean_url_chunks]
    chunk_embeddings = list(embedder.embed(chunk_texts))
    # Prepare data for Milvus
    for idx, (chunk, embedding) in enumerate(zip(clean_url_chunks, chunk_embeddings)):
        embeddings_data.append({
            'document_id': document_id,  # UUID for this URL
            'user_id': user_id,
            'project_id': project_id,
            'chunk_index': idx,
            'text': chunk["content"][:2000],  # Truncate to 2K
            'embedding': [float(x) for x in embedding],  # Convert to float list
            'data_type': 'url',
            'source_url': url  # ✅ Track source
        })
Example Embedding:
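One entry in embeddings_data then looks roughly like this (vector values are illustrative; the real list holds 384 floats):

{
  "document_id": "a1b2c3d4-e5f6-1234-5678-9abcdef12345",
  "user_id": "User-123456",
  "project_id": "User-123456_Project_1",
  "chunk_index": 0,
  "text": "About Us We are a company that...",
  "embedding": [0.0123, -0.0456, 0.0789, ...],
  "data_type": "url",
  "source_url": "https://example.com/about"
}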
NEW Architecture: Blob + Milvus + Cosmos¶
Complete Data Flow¶
Code: sync_process_urls_content() (Lines 262-520)
This is the CORE function - 258 lines of complex logic!
Step 1: Save JSON to Azure Blob¶
Code (Lines 343-372):
# Step 1: Save crawled data as JSON to Azure Blob Storage
crawl_data = {
    "domain": domain,
    "urls": urls,
    "contents": contents,  # Full URL→content mapping
    "timestamp": datetime.utcnow().isoformat(),
    "proxy_info": proxy_ip
}
json_blob_path = f"{user_id}/{project_id}/crawls/{domain}_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.json"
# Async upload
json_url = await azure_blob.async_upload_blob(
    blob_name=json_blob_path,
    data=json.dumps(crawl_data, indent=2).encode('utf-8'),
    content_type="application/json"
)
logger.info(f"✓ Uploaded crawl JSON to Blob Storage: {json_blob_path}")
Blob Path Example: the f-string above produces paths of the form {user_id}/{project_id}/crawls/{domain}_{YYYYMMDD_HHMMSS}.json
Blob Content:
{
"domain": "https://example.com",
"urls": ["https://example.com/about", "https://example.com/products"],
"contents": {
"https://example.com/about": "About Us\nWe are...",
"https://example.com/products": "Our Products\n..."
},
"timestamp": "2025-01-15T14:05:30Z",
"proxy_info": "123.456.789.012"
}
Step 2: Generate & Store Embeddings in Milvus¶
Code (Lines 404-434):
# Generate UUID for this URL document
document_id = str(uuid.uuid4())  # e.g., "a1b2c3d4-e5f6-..."
embeddings_data = []
milvus_ids = []

# Generate embeddings for each chunk
chunk_texts = [chunk["content"] for chunk in clean_url_chunks]
chunk_embeddings = list(embedder.embed(chunk_texts))

# Prepare for Milvus
for idx, (chunk, embedding) in enumerate(zip(clean_url_chunks, chunk_embeddings)):
    embeddings_data.append({
        'document_id': document_id,
        'user_id': user_id,
        'project_id': project_id,
        'chunk_index': idx,
        'text': chunk["content"][:2000],
        'embedding': [float(x) for x in embedding],
        'data_type': 'url',
        'source_url': url
    })

# Insert into Milvus and get IDs
milvus_ids = milvus_embeddings.insert_embeddings(
    collection_name="embeddings",
    embeddings_data=embeddings_data
)
logger.info(f"✓ Inserted {len(milvus_ids)} chunk embeddings into Milvus for {url}")
Milvus Storage:
- Collection: `embeddings`
- Partition: based on `project_id`
- Returns: list of Milvus IDs for each chunk
Step 3: Store Metadata in CosmosDB¶
Code (Lines 437-464):
url_doc = {
    "_id": document_id,  # Same UUID as in Milvus
    "user_id": user_id,
    "project_id": project_id,
    "filename": f"{domain}_{len(inserted_ids) + 1}",
    "file_type": "url",
    "base_url": domain,
    "urls": [url],  # Only this specific URL
    "extracted_text_preview": processed_content[:500],  # Preview only!
    "chunks": clean_url_chunks,  # Metadata (no embeddings)
    "milvus_embedding_ids": milvus_ids,  # ✅ CRITICAL: Reference to Milvus
    "chunk_settings": {
        "chunk_size": 1000,
        "overlap": 200,
        "source_url": url,
        "chunks_in_url": len(clean_url_chunks),
        "preprocessed": True
    },
    "total_chunks": len(clean_url_chunks),
    "embedding_count": len(milvus_ids),
    "timestamp": datetime.utcnow(),
    "proxy_info": proxy_ip,
    "json_blob_url": json_url  # ✅ Reference to blob storage
}

# Insert
inserted_id = files_collection.insert_one(url_doc).inserted_id
CosmosDB Document:
{
  "_id": "a1b2c3d4-e5f6-1234-5678-9abcdef12345",
  "user_id": "User-123456",
  "project_id": "User-123456_Project_1",
  "filename": "example.com_1",
  "file_type": "url",
  "base_url": "https://example.com",
  "urls": ["https://example.com/about"],

  // Only 500 char preview (not full content!)
  "extracted_text_preview": "About Us\nWe are a company...",

  // Chunk metadata (lengths, positions)
  "chunks": [
    {"chunk_index": 0, "content": "About Us...", "start_pos": 0, "end_pos": 1000},
    {"chunk_index": 1, "content": "...founded in...", "start_pos": 800, "end_pos": 1800}
  ],

  // ✅ References to Milvus vectors
  "milvus_embedding_ids": [123456, 123457],

  "chunk_settings": {
    "chunk_size": 1000,
    "overlap": 200,
    "source_url": "https://example.com/about",
    "chunks_in_url": 2,
    "preprocessed": true
  },
  "total_chunks": 2,
  "embedding_count": 2,
  "timestamp": ISODate("2025-01-15T14:05:35Z"),
  "proxy_info": "123.456.789.012",

  // ✅ Reference to full data in blob
  "json_blob_url": "https://storage.blob.core.windows.net/container/User-123456/..."
}
Data Separation Benefits¶
| Data Type | Storage | Reason | Cost |
|---|---|---|---|
| Full JSON | Azure Blob | Cheap cold storage | $0.01/GB |
| Vectors (384D) | Milvus | Fast similarity search | Compute cost |
| Metadata | CosmosDB | Small + queryable | $0.25/GB |
Example Costs for 1000 URLs:
- Blob: ~10MB JSON = $0.0001
- Milvus: 2000 chunks × 384D = compute only
- Cosmos: ~100KB metadata = $0.000025
Total: Nearly free! 🎉
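To show how the three layers come back together at read time, here is a hedged retrieval sketch. The object names (files_collection, embedder, milvus_embeddings, azure_blob) mirror the ones used above, but search_embeddings and async_download_blob are assumed helpers; only their insert/upload counterparts appear in this service's code:

async def retrieve_context(user_id: str, project_id: str, question: str) -> dict:
    # 1. CosmosDB: small metadata document with references to the other layers
    doc = files_collection.find_one({"user_id": user_id, "project_id": project_id})

    # 2. Milvus: vector search over the chunks referenced by milvus_embedding_ids
    query_vec = [float(x) for x in list(embedder.embed([question]))[0]]
    hits = milvus_embeddings.search_embeddings(  # assumed helper
        collection_name="embeddings",
        query_embedding=query_vec,
        partition=project_id,
    )

    # 3. Azure Blob: full crawl JSON when the complete page text is needed
    raw_json = await azure_blob.async_download_blob(doc["json_blob_url"])  # assumed helper

    return {"metadata": doc, "chunks": hits, "full_crawl": raw_json}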
Proxy System¶
⚠️ HARDCODED PROXY CREDENTIALS¶
Location: Lines 122-131
proxy_list = [
    {
        'http': 'http://a85f75db6eab927ddc39:cb717a25699afc5b@gw.dataimpulse.com:823',
        'https': 'http://a85f75db6eab927ddc39:cb717a25699afc5b@gw.dataimpulse.com:823'
    },
    {
        'http': 'http://a85f75db6eab927ddc39:cb717a25699afc5b@gw.dataimpulse.com:824',
        'https': 'http://a85f75db6eab927ddc39:cb717a25699afc5b@gw.dataimpulse.com:824'
    },
]
⚠️ CRITICAL SECURITY ISSUE:
- Username: `a85f75db6eab927ddc39`
- Password: `cb717a25699afc5b`
- Hardcoded in source code!
Random Proxy Selection¶
Code:
selected_proxy = random.choice(proxy_list)
response = requests.get(url, proxies=selected_proxy, ...)
Usage Locations:
- `fetch_content()` - Content extraction
- `fetch_url()` - URL discovery
- `fetch_urls_endpoint()` - Get proxy IP
Proxy IP Detection¶
Code (Lines 290-297):
try:
    selected_proxy = random.choice(proxy_list)
    proxy_response = requests.get(
        "http://httpbin.org/ip",
        proxies=selected_proxy,
        timeout=100,
        verify=False
    )
    proxy_ip = proxy_response.json().get("origin", "Unknown")
    logger.info(f"✓ Using proxy IP: {proxy_ip}")
except Exception as e:
    proxy_ip = f"Error fetching proxy IP: {e}"
Stores proxy IP in documents for debugging
Security Analysis¶
Critical Issues¶
1. ⚠️ HARDCODED PROXY CREDENTIALS (Lines 122-131)
Impact: Anyone with code access can use paid proxies
Fix:
PROXY_LIST_JSON = os.getenv("PROXY_LIST_JSON")
if PROXY_LIST_JSON:
    proxy_list = json.loads(PROXY_LIST_JSON)
else:
    proxy_list = []  # No proxies
2. ⚠️ SSL VERIFICATION DISABLED
Location: Multiple places (verify=False)
Impact: Man-in-the-middle attacks possible
Fix: set verify=True (the requests default), or point requests at a trusted CA bundle via the REQUESTS_CA_BUNDLE environment variable.
3. ⚠️ NO INPUT VALIDATION
Problem: URLs not validated before crawling
Example Attack: submitting an internal address such as http://localhost:8005/ or a cloud metadata endpoint makes the crawler act as an SSRF proxy against internal services.
Fix:
def validate_url(url):
    if not url.startswith(('http://', 'https://')):
        raise HTTPException(400, "Invalid URL scheme")
    if 'localhost' in url or '127.0.0.1' in url:
        raise HTTPException(400, "localhost not allowed")
    return url
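A slightly stronger variant (a sketch, not present in the current code) also resolves the hostname and rejects private, loopback and link-local ranges, which covers internal IPs that the substring check above misses:

import ipaddress
import socket
from urllib.parse import urlparse

from fastapi import HTTPException

def validate_url_strict(url: str) -> str:
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https"):
        raise HTTPException(400, "Invalid URL scheme")
    try:
        # Resolve the hostname and reject private / loopback / link-local targets
        for info in socket.getaddrinfo(parsed.hostname, None):
            ip = ipaddress.ip_address(info[4][0].split("%")[0])  # strip IPv6 scope id
            if ip.is_private or ip.is_loopback or ip.is_link_local:
                raise HTTPException(400, "Internal addresses are not allowed")
    except socket.gaierror:
        raise HTTPException(400, "Hostname could not be resolved")
    return url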
4. ⚠️ UNBOUNDED CRAWLING
Problem: No rate limiting on crawling
Impact: Could DDoS target websites with 100 concurrent requests
Fix:
from slowapi import Limiter
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter  # slowapi also needs a RateLimitExceeded handler registered

@app.post("/fetch-urls")
@limiter.limit("10/minute")  # Max 10 domains per minute per client IP
async def fetch_urls_endpoint(request: Request, ...):
    ...
Performance & Optimization¶
Concurrent Processing¶
100 Thread Pool:
with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
    futures = [executor.submit(fetch_content, url) for url in urls]
Performance:
- 50 URLs in 10-20 seconds
- 100 URLs in 20-40 seconds
- Limited by network, not CPU
Embedding Generation Speed¶
FastEmbed Performance:
- CPU: ~1000 chunks/second
- GPU: ~10,000 chunks/second
Example:
- 50 URLs × 3 chunks avg = 150 chunks
- CPU: 0.15 seconds
- Negligible compared to crawling (20 seconds)
Memory Optimization¶
Old Architecture:
// BAD: Everything in one document
{
"urls": [...50 URLs...],
"extracted_text": {
"url1": "...10,000 chars...",
...
},
"embeddings": [
[384 floats],
[384 floats],
... // 150 chunks × 384 floats = 57,600 numbers!
]
}
// Total: ~500KB per project
New Architecture:
// GOOD: Metadata only
{
"urls": ["url1"],
"extracted_text_preview": "...500 chars...",
"chunks": [{metadata only}],
"milvus_embedding_ids": [123456, 123457], // Just IDs!
"json_blob_url": "https://..."
}
// Total: ~5KB per URL
100× smaller CosmosDB documents!
Deployment¶
Docker Configuration¶
Dockerfile:
FROM python:3.9-slim
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
wget \
gnupg \
&& rm -rf /var/lib/apt/lists/*
# Install Playwright browsers
RUN pip install playwright && playwright install chromium
# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy shared modules
COPY shared/ ./shared/
# Copy source code
COPY src/ .
EXPOSE 8005
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8005"]
Requirements.txt¶
fastapi>=0.95.0
uvicorn[standard]>=0.22.0
pymongo>=4.3.3
python-multipart>=0.0.6
python-dotenv>=1.0.0
# Web scraping
requests>=2.31.0
beautifulsoup4>=4.12.0
playwright>=1.40.0
fake-useragent>=1.4.0
validators>=0.22.0
# Embedding
fastembed>=0.1.0
# Concurrent processing
concurrent-futures>=3.1.1
# Monitoring
ddtrace>=1.19.0
Environment Variables¶
# Database
MONGO_URI=mongodb://...
MONGO_DB_NAME=Machine_agent_demo
# Milvus
MILVUS_HOST=localhost
MILVUS_PORT=19530
# Azure Blob
AZURE_STORAGE_CONNECTION_STRING=...
# Proxies (should be added!)
PROXY_LIST_JSON=[{"http":"...","https":"..."}]
# DataDog
DD_SERVICE=data-crawling-service
DD_ENV=production
Related Documentation¶
- Embedding Strategy - BAAI/bge-small-en-v1.5 details
- Response 3D Chatbot Service - Uses these embeddings
- System Architecture
Recommendations¶
Critical (Security)¶
- ⚠️ Move Proxy Credentials to Environment
- ⚠️ Enable SSL Verification (`verify=True`)
- ⚠️ Add Rate Limiting (prevent DDoS)
Performance¶
- Cache Embeddings - Don't regenerate embeddings for unchanged content (see the sketch after this list)
- Batch Milvus Inserts - Insert 100 chunks at once
- Async Blob Upload - Don't block on blob storage
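One way to implement the embedding cache suggested above, as a sketch: fingerprint the preprocessed page text and skip embedding when the same content was already processed. The content_hash field does not exist in the current CosmosDB schema; it is an assumption of this sketch.

import hashlib

def content_fingerprint(text: str) -> str:
    """Stable fingerprint of the preprocessed page text."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()

def embed_with_cache(url: str, processed_content: str, chunks: list) -> list:
    """Reuse existing Milvus IDs when this URL's content has not changed."""
    digest = content_fingerprint(processed_content)
    existing = files_collection.find_one({"urls": url, "content_hash": digest})  # assumed field
    if existing:
        # Content unchanged: reuse the vectors already referenced in CosmosDB
        return existing["milvus_embedding_ids"]
    # Otherwise embed and insert as in sync_process_urls_content,
    # storing content_hash alongside the other metadata fields
    vectors = list(embedder.embed([c["content"] for c in chunks]))
    ...
    return vectors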
Code Quality¶
- Remove Old Architecture Code - lines 174-183 are commented-out legacy code
- Split into Modules - 1150 lines is too big
- Add Unit Tests - Test chunking, preprocessing
- Document Shared Modules (`get_db_manager`, `get_milvus_embeddings_service`)
Last Updated: 2025-12-26
Code Version: data-crawling-service/src/main.py (1150 lines)
Status: COMPLETE - Part 1 of Service Documentation
Review Cycle: Monthly (Critical Data Pipeline)
"Where websites become knowledge."