System Design¶
Detailed descriptions of OpenComplai's system components and their interactions.
Component Interaction Flow¶
User Request
     │
     ▼
┌─────────────────────┐
│    API Gateway      │ ← Validates request, auth, rate limit
│     (Node.js)       │
└──────────┬──────────┘
           │
           ▼
┌─────────────────────┐
│  Database Lookup    │ ← Check Redis cache, fall back to PostgreSQL
│     (Redis/PG)      │
└──────────┬──────────┘
           │
           ▼
      Fast path?
       /      \
     Yes       No → Queue to workers
      │         │
      │         ▼
      │   ┌──────────────┐
      │   │ Message Queue│ → Redis Streams
      │   └──────┬───────┘
      │          │
      │          ▼
      │   ┌──────────────┐
      │   │   Workers    │ → Process documents
      │   └──────┬───────┘
      │          │
      ▼          ▼
 Store Results (PostgreSQL)
           │
           ▼
    Return to Client
API Gateway (Node.js + Express)¶
Responsibilities:

- Handle all HTTP/HTTPS requests
- Route requests to appropriate handlers
- Authenticate using JWT tokens
- Enforce rate limiting
- Validate request payloads
- Return formatted JSON responses

Key Features:

- Fast request routing
- Middleware chain for auth, logging, and error handling
- Support for CORS, compression, and security headers
- Request/response logging
- Health check endpoints

Scaling:

- Stateless design allows horizontal scaling
- Multiple instances behind a load balancer
- Session state stored in Redis
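The gateway itself is Node.js, but the rate-limiting logic it enforces is easy to sketch in isolation. Below is a minimal token-bucket limiter in Python, purely illustrative; the 100 requests/minute figure matches the limit quoted later in the data-flow example, and a real deployment would keep one bucket per client in Redis so all gateway instances share state:

```python
import time

class TokenBucket:
    """Minimal token-bucket rate limiter: `rate` tokens/second, up to `capacity`."""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)
        self.last = time.monotonic()

    def allow(self) -> bool:
        now = time.monotonic()
        # Refill tokens for the elapsed interval, capped at capacity.
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

# One bucket per client: 100 requests per minute, bursts up to 100.
bucket = TokenBucket(rate=100 / 60, capacity=100)
```

The burst capacity equals the per-minute limit here, which lets a quiet client spend its full allowance at once; lowering `capacity` would smooth traffic at the cost of rejecting legitimate bursts.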
Message Queue (Redis Streams)¶
Responsibilities:

- Queue long-running document processing jobs
- Guarantee delivery with acknowledgment
- Broadcast real-time events to clients
- Provide a priority queue for urgent tasks
- Persist jobs for recovery

Job Types:

- document.process – Extract text and metadata
- document.analyze – Run AI analysis
- document.export – Generate outputs (PDF, images)
- batch.process – Handle multiple documents
Queue Topology:
document.process  → [Job1, Job2, Job3]
document.analyze  → [Job1, Job2]
document.export   → [Job1]
        ↓
  Workers pick up
   and process
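Enqueuing onto one of these streams is a single XADD. The sketch below uses the redis-py client; the payload fields (`job_id`, `priority`, and so on) are illustrative assumptions, not a fixed schema from the codebase:

```python
import json
import uuid

def make_job(document_id: str, priority: str = "normal") -> dict:
    """Build the flat field map stored in a stream entry.
    Redis Streams store string fields, so nested data is JSON-encoded."""
    return {
        "job_id": str(uuid.uuid4()),
        "document_id": document_id,
        "priority": priority,
        "payload": json.dumps({"source": "api"}),
    }

def enqueue(redis_client, stream: str, job: dict) -> str:
    """Append the job to the stream; Redis returns the assigned entry ID."""
    return redis_client.xadd(stream, job)

# Usage (assumes a running Redis server and the redis-py package):
# import redis
# r = redis.Redis()
# enqueue(r, "document.process", make_job("doc-123"))
```

Because XADD persists the entry before returning, the job survives a worker crash; delivery guarantees then come from consumer groups and explicit acknowledgment on the worker side.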
Worker Services (Python FastAPI)¶
Responsibilities:

- Listen to the Redis message queue
- Process documents asynchronously
- Run AI/ML models
- Extract information
- Write results to the database
- Handle errors and retries
Processing Pipeline:
1. Pull job from queue
2. Load document from DB
3. Extract text content
4. Parse structure (headings, tables, etc.)
5. Run ML models if needed
6. Extract entities and relationships
7. Format results
8. Store back in DB
9. Publish event to clients
10. Acknowledge job (remove from queue)
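The middle of the pipeline (steps 3–7) can be sketched as a pure function. The extraction logic below is deliberately toy-level — headings are detected by a leading `#` and "entities" are just capitalized words standing in for a real NER model — but it shows the shape of the record that ends up in processing_results:

```python
import re
import time

def run_pipeline(document: dict) -> dict:
    """Toy version of pipeline steps 3-7: extract text, parse structure,
    extract entities, and format a processing_results-shaped record."""
    start = time.monotonic()
    text = document.get("content", "")

    # Step 4: parse structure - treat lines starting with '#' as headings.
    headings = [line.lstrip("# ").strip()
                for line in text.splitlines() if line.startswith("#")]

    # Step 6: extract entities - capitalized words in the body text,
    # as a stand-in for a real NER model.
    body = "\n".join(line for line in text.splitlines()
                     if not line.startswith("#"))
    entities = sorted(set(re.findall(r"\b[A-Z][a-z]+\b", body)))

    # Step 7: format results.
    return {
        "document_id": document["id"],
        "extracted_entities": entities,
        "extracted_metadata": {"headings": headings},
        "processing_time_ms": int((time.monotonic() - start) * 1000),
    }
```

Keeping this core step pure (job in, result record out) is what makes the surrounding queue machinery — pull, ack, retry — straightforward to test separately.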
Worker Configuration:

- Multiple worker instances for parallelism
- Configurable concurrency per worker
- Automatic retry with exponential backoff
- Dead-letter queue for failed jobs
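The retry behavior can be sketched as a small wrapper; the base delay and cap below are illustrative defaults, not values from the codebase:

```python
import time

def retry_with_backoff(fn, max_attempts: int = 5, base_delay: float = 0.5,
                       max_delay: float = 30.0, sleep=time.sleep):
    """Call fn(), retrying on exception with exponential backoff.
    Re-raises the last exception once max_attempts is exhausted, at which
    point a real worker would move the job to the dead-letter queue."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise
            # Delays: 0.5s, 1s, 2s, 4s, ... capped at max_delay.
            sleep(min(max_delay, base_delay * (2 ** attempt)))
```

The injectable `sleep` parameter is there only so the backoff schedule can be unit-tested without actually waiting; production code would add jitter to avoid retry stampedes.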
Data Storage (PostgreSQL)¶
Core Tables:
users¶
- id (UUID)
- email (TEXT, unique)
- password_hash (TEXT)
- created_at (TIMESTAMP)
- updated_at (TIMESTAMP)
- is_active (BOOLEAN)
documents¶
- id (UUID)
- user_id (UUID, foreign key)
- title (TEXT)
- content (TEXT, full-text indexed)
- metadata (JSONB)
- processing_status (ENUM)
- created_at (TIMESTAMP)
- updated_at (TIMESTAMP)
processing_results¶
- id (UUID)
- document_id (UUID, foreign key)
- extracted_entities (JSONB)
- extracted_metadata (JSONB)
- analysis_results (JSONB)
- processing_time_ms (INTEGER)
- created_at (TIMESTAMP)
audit_logs¶
- id (UUID)
- user_id (UUID)
- action (TEXT)
- resource_type (TEXT)
- resource_id (UUID)
- changes (JSONB)
- created_at (TIMESTAMP)
Indexing Strategy:

- Primary keys on all tables
- Foreign key indexes for joins
- Full-text search index on documents.content
- Partial indexes on frequently filtered columns
- Composite indexes for common queries
Data Flow Example: Document Upload¶
1. User uploads document via API
POST /api/v1/documents
2. API Gateway:
- Validates JWT token
- Checks rate limit (100 req/min)
- Validates request schema
- Returns 202 Accepted
3. API creates database record:
- INSERT INTO documents (...)
- Sets processing_status = 'queued'
- Returns document ID
4. API enqueues job:
- XADD redis:stream:process {...}
- Includes document ID and metadata
5. Workers listen to queue:
- XREAD from redis:stream:process
- Pick up job (consumer group)
- Fetch document from DB
6. Worker processes:
- Extract text from PDF/image
- Parse structure
- Run ML models
- Insert into processing_results
7. Worker publishes event:
- XADD redis:stream:events {...}
- Event type: document.processed
8. Client listens via WebSocket:
- Receives real-time event
- Updates UI instantly
- Shows extraction results
9. API returns result on demand:
- GET /api/v1/documents/{id}
- Returns latest processing_results
Security Implementation¶
Authentication (JWT)¶
Login Request
↓
API verifies credentials
↓
Issues JWT token (5-hour expiry)
↓
Client includes in Authorization header
↓
API validates signature on each request
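The token format above is a standard HS256 JWT, and the signing and validation steps can be sketched with only the Python standard library. This is an illustration of the mechanism, not production code — a real deployment would use a maintained library such as PyJWT, and the secret here is a placeholder:

```python
import base64
import hashlib
import hmac
import json
import time

def _b64(data: bytes) -> str:
    """URL-safe base64 without padding, as JWT requires."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

def issue_token(payload: dict, secret: str, expires_in: int = 5 * 3600) -> str:
    """Build an HS256 JWT with an exp claim (5-hour expiry, as above)."""
    header = _b64(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    body = _b64(json.dumps({**payload, "exp": int(time.time()) + expires_in}).encode())
    signing_input = f"{header}.{body}".encode()
    sig = _b64(hmac.new(secret.encode(), signing_input, hashlib.sha256).digest())
    return f"{header}.{body}.{sig}"

def verify_token(token: str, secret: str) -> dict:
    """Check signature and expiry; return the claims or raise ValueError."""
    header, body, sig = token.split(".")
    signing_input = f"{header}.{body}".encode()
    expected = _b64(hmac.new(secret.encode(), signing_input, hashlib.sha256).digest())
    # Constant-time comparison to avoid timing side channels.
    if not hmac.compare_digest(sig, expected):
        raise ValueError("bad signature")
    claims = json.loads(base64.urlsafe_b64decode(body + "=" * (-len(body) % 4)))
    if claims["exp"] < time.time():
        raise ValueError("token expired")
    return claims
```

Because validation needs only the shared secret, every stateless gateway instance can verify tokens locally without a database round trip, which is what makes the horizontal-scaling story above work.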
Authorization (RBAC)¶
Encryption¶
In Transit: HTTPS/TLS 1.3
At Rest: AES-256-GCM for sensitive fields
Database: Passwords hashed with bcrypt
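Password storage follows the usual salted-hash pattern. The system uses bcrypt as noted above; the sketch below substitutes stdlib PBKDF2 purely to illustrate the hash-then-verify flow without external dependencies:

```python
import hashlib
import hmac
import os

def hash_password(password: str, iterations: int = 200_000) -> str:
    """Return 'salt$iterations$hash', suitable for users.password_hash."""
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, iterations)
    return f"{salt.hex()}${iterations}${digest.hex()}"

def verify_password(password: str, stored: str) -> bool:
    """Recompute the hash with the stored salt and compare."""
    salt_hex, iterations, digest_hex = stored.split("$")
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(),
                                 bytes.fromhex(salt_hex), int(iterations))
    # Constant-time comparison to avoid timing side channels.
    return hmac.compare_digest(digest.hex(), digest_hex)
```

Storing the salt and cost factor alongside the hash means the work factor can be raised later without invalidating existing records — the same property bcrypt's `$2b$...` format provides natively.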
Monitoring & Observability¶
Metrics Collected:

- Request latency (p50, p95, p99)
- Error rates by endpoint
- Queue depth and processing time
- Database query performance
- Worker availability and throughput
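The latency percentiles can be computed from raw samples; a minimal sketch with the standard-library statistics module (a production pipeline would typically use a streaming estimator or histogram buckets rather than keeping raw samples):

```python
import statistics

def latency_percentiles(samples_ms: list) -> dict:
    """Compute p50/p95/p99 from raw latency samples in milliseconds."""
    # quantiles(n=100) returns the 99 cut points p1..p99 (exclusive method).
    cuts = statistics.quantiles(samples_ms, n=100)
    return {"p50": cuts[49], "p95": cuts[94], "p99": cuts[98]}
```

Tracking p95/p99 rather than averages matters here because queue-backed processing produces long-tailed latencies that a mean would hide.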
Logging Strategy:

- Structured JSON logs
- Log levels: DEBUG, INFO, WARN, ERROR
- Correlation IDs for request tracking
- Centralized log aggregation
Health Checks:

- /health – API is running
- /ready – Ready to accept traffic
- Worker heartbeat to Redis
- Database connection pooling
Disaster Recovery¶
Backup Strategy:

- Daily PostgreSQL backups to S3
- Weekly full database dumps
- 30-day retention policy
- Point-in-time recovery available

Failover:

- Multiple API instances (no single point of failure)
- Redis persistence enabled
- Database replication ready
- CDN for static assets

Incident Response:

- On-call rotation for critical issues
- Automated alerting for anomalies
- Runbooks for common issues
- Post-incident reviews
Performance Targets¶
| Metric | Target |
|---|---|
| API response time (p95) | < 200ms |
| Document processing | < 5 seconds |
| Database query | < 100ms |
| API availability | 99.9% |
| Queue processing latency | < 1 second |
Related Documentation¶
- Design Decisions - Why these choices were made
- Data Model - Detailed schema documentation
- API Reference - API endpoints
- Deployment - Deployment procedures