Skip to content

System Design

Detailed descriptions of OpenComplai's system components and their interactions.

Component Interaction Flow

User Request
┌─────────────────────┐
│   API Gateway       │ ← Validates request, auth, rate limit
│   (Node.js)         │
└──────────┬──────────┘
┌─────────────────────┐
│   Database Lookup   │ ← Read from PostgreSQL cache
│   (Redis/PG)        │
└──────────┬──────────┘
      Is Fast?
       /     \
     Yes    No → Queue to workers
      │         │
      │         ▼
      │    ┌──────────────┐
      │    │ Message Queue│ → Redis Streams
      │    └──────┬───────┘
      │           │
      │           ▼
      │    ┌──────────────┐
      │    │   Workers    │ → Process documents
      │    └──────┬───────┘
      │           │
      ▼           ▼
   Store Results (PostgreSQL)
    Return to Client

API Gateway (Node.js + Express)

Responsibilities: - Handle all HTTP/HTTPS requests - Route requests to appropriate handlers - Authenticate using JWT tokens - Enforce rate limiting - Validate request payloads - Return formatted JSON responses

Key Features: - Fast request routing - Middleware chain for auth, logging, error handling - Support for CORS, compression, security headers - Request/response logging - Health check endpoints

Scaling: - Stateless design allows horizontal scaling - Multiple instances behind load balancer - Session state stored in Redis

Message Queue (Redis Streams)

Responsibilities: - Queue long-running document processing jobs - Guarantee delivery with acknowledgment - Broadcast real-time events to clients - Provide priority queue for urgent tasks - Persist jobs for recovery

Job Types: - document.process - Extract text and metadata - document.analyze - Run AI analysis - document.export - Generate outputs (PDF, images) - batch.process - Handle multiple documents

Queue Topology:

document.process → [Job1, Job2, Job3]
document.analyze → [Job1, Job2]
document.export  → [Job1]
              Workers pick up
              and process

Worker Services (Python FastAPI)

Responsibilities: - Listen to Redis message queue - Process documents asynchronously - Run AI/ML models - Extract information - Write results to database - Handle errors and retries

Processing Pipeline:

1. Pull job from queue
2. Load document from DB
3. Extract text content
4. Parse structure (headings, tables, etc.)
5. Run ML models if needed
6. Extract entities and relationships
7. Format results
8. Store back in DB
9. Publish event to clients
10. Acknowledge job (remove from queue)

Worker Configuration: - Multiple worker instances for parallelism - Configurable concurrency per worker - Automatic retry with exponential backoff - Dead-letter queue for failed jobs

Data Storage (PostgreSQL)

Core Tables:

users

- id (UUID)
- email (TEXT, unique)
- password_hash (TEXT)
- created_at (TIMESTAMP)
- updated_at (TIMESTAMP)
- is_active (BOOLEAN)

documents

- id (UUID)
- user_id (UUID, foreign key)
- title (TEXT)
- content (TEXT, full-text indexed)
- metadata (JSONB)
- processing_status (ENUM)
- created_at (TIMESTAMP)
- updated_at (TIMESTAMP)

processing_results

- id (UUID)
- document_id (UUID, foreign key)
- extracted_entities (JSONB)
- extracted_metadata (JSONB)
- analysis_results (JSONB)
- processing_time_ms (INTEGER)
- created_at (TIMESTAMP)

audit_logs

- id (UUID)
- user_id (UUID)
- action (TEXT)
- resource_type (TEXT)
- resource_id (UUID)
- changes (JSONB)
- created_at (TIMESTAMP)

Indexing Strategy: - Primary keys on all tables - Foreign key indexes for joins - Full-text search index on documents.content - Partial indexes on frequently filtered columns - Composite indexes for common queries

Data Flow Example: Document Upload

1. User uploads document via API
   POST /api/v1/documents

2. API Gateway:
   - Validates JWT token
   - Checks rate limit (100 req/min)
   - Validates request schema
   - Returns 202 Accepted

3. API creates database record:
   - INSERT INTO documents (...)
   - Sets processing_status = 'queued'
   - Returns document ID

4. API enqueues job:
   - XADD redis:stream:process {...}
   - Includes document ID and metadata

5. Workers listen to queue:
   - XREAD from redis:stream:process
   - Pick up job (consumer group)
   - Fetch document from DB

6. Worker processes:
   - Extract text from PDF/image
   - Parse structure
   - Run ML models
   - Insert into processing_results

7. Worker publishes event:
   - XADD redis:stream:events {...}
   - Event type: document.processed

8. Client listens via WebSocket:
   - Receives real-time event
   - Updates UI instantly
   - Shows extraction results

9. API returns result on demand:
   - GET /api/v1/documents/{id}
   - Returns latest processing_results

Security Implementation

Authentication (JWT)

Login Request
API verifies credentials
Issues JWT token (5 hour expiry)
Client includes in Authorization header
API validates signature on each request

Authorization (RBAC)

User → Role (admin, user, viewer)
Role → Permissions (create, read, update, delete)

Encryption

In Transit: HTTPS/TLS 1.3
At Rest: AES-256-GCM for sensitive fields
Database: Passwords hashed with bcrypt

Monitoring & Observability

Metrics Collected: - Request latency (p50, p95, p99) - Error rates by endpoint - Queue depth and processing time - Database query performance - Worker availability and throughput

Logging Strategy: - Structured JSON logs - Log level: DEBUG, INFO, WARN, ERROR - Correlation IDs for request tracking - Centralized log aggregation

Health Checks: - /health - API is running - /ready - Ready to accept traffic - Worker heartbeat to Redis - Database connection pooling

Disaster Recovery

Backup Strategy: - Daily PostgreSQL backups to S3 - Weekly full database dumps - 30-day retention policy - Point-in-time recovery available

Failover: - Multiple API instances (no single point of failure) - Redis persistence enabled - Database replication ready - CDN for static assets

Incident Response: - On-call rotation for critical issues - Automated alerting for anomalies - Runbooks for common issues - Post-incident reviews

Performance Targets

Metric Target
API response time (p95) < 200ms
Document processing < 5 seconds
Database query < 100ms
API availability 99.9%
Queue processing latency < 1 second