Advanced Patterns¶
Complex usage patterns and best practices for production applications.
Retry Logic with Exponential Backoff¶
Python: Automatic Retries¶
import time
import random

from opencomplai import Client, RateLimitError, APIError

def create_with_retries(client, title, content, max_retries=3):
    """Create document with exponential backoff retry logic."""
    for attempt in range(max_retries):
        try:
            doc = client.documents.create(
                title=title,
                content=content
            )
            return doc
        except RateLimitError:
            # Rate limited - wait before retrying
            if attempt == max_retries - 1:
                raise  # Last attempt failed
            wait_time = 2 ** attempt  # 1s, 2s, ... doubling each retry
            jitter = random.uniform(0, 1)
            total_wait = wait_time + jitter
            print(f"Rate limited. Waiting {total_wait:.1f}s before retry {attempt + 1}...")
            time.sleep(total_wait)
        except APIError as e:
            # Server error - might be transient
            if e.status_code >= 500 and attempt < max_retries - 1:
                wait_time = 2 ** attempt
                print(f"Server error. Retrying in {wait_time}s...")
                time.sleep(wait_time)
            else:
                raise

# Usage
client = Client(api_key="sk_test_xyz...")
doc = create_with_retries(
    client,
    title="Important Document",
    content="..."
)
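The same backoff logic can be factored into a decorator so any call site gets retries without repeating the loop. This is a minimal sketch, not part of the SDK; with_retries is an illustrative name.

import functools
import random
import time

from opencomplai import RateLimitError, APIError

def with_retries(max_retries=3):
    """Decorator sketch: retry the wrapped call with exponential backoff."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except (RateLimitError, APIError):
                    # Retries every APIError for brevity; in practice,
                    # re-raise immediately when status_code < 500.
                    if attempt == max_retries - 1:
                        raise
                    time.sleep(2 ** attempt + random.uniform(0, 1))
        return wrapper
    return decorator

@with_retries(max_retries=3)
def fetch_document(client, doc_id):
    return client.documents.get(doc_id)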
Caching Pattern¶
Python: Cache Processing Results¶
import json
from datetime import datetime
from pathlib import Path

from opencomplai import Client

class DocumentCache:
    """Cache document processing results locally."""

    def __init__(self, cache_dir=".cache/documents"):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.ttl = 3600  # 1 hour

    def get_cache_path(self, doc_id):
        return self.cache_dir / f"{doc_id}.json"

    def get(self, doc_id):
        """Get cached document or None if expired."""
        cache_path = self.get_cache_path(doc_id)
        if not cache_path.exists():
            return None
        # Check if cache expired
        mtime = cache_path.stat().st_mtime
        age = (datetime.now() - datetime.fromtimestamp(mtime)).total_seconds()
        if age > self.ttl:
            cache_path.unlink()  # Delete expired cache
            return None
        with open(cache_path) as f:
            return json.load(f)

    def set(self, doc_id, data):
        """Save document to cache."""
        cache_path = self.get_cache_path(doc_id)
        with open(cache_path, 'w') as f:
            # default=str handles values that aren't JSON-serializable
            json.dump(data, f, indent=2, default=str)

# Usage
cache = DocumentCache()
client = Client(api_key="sk_test_xyz...")

doc_id = "doc_abc123"

# Check cache first
cached = cache.get(doc_id)
if cached:
    print("Using cached data")
    doc = cached
else:
    print("Fetching from API")
    doc = client.documents.get(doc_id)
    cache.set(doc_id, doc.__dict__)
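The check-then-fetch pattern above can be wrapped in a small helper so callers never touch the cache directly. A minimal sketch; get_or_fetch is an illustrative name, not an SDK method.

def get_or_fetch(cache, client, doc_id):
    """Return a cached document dict, fetching and caching on a miss."""
    cached = cache.get(doc_id)
    if cached is not None:
        return cached
    doc = client.documents.get(doc_id)
    cache.set(doc_id, doc.__dict__)
    return doc.__dict__

doc = get_or_fetch(cache, client, "doc_abc123")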
Batch Processing with Progress¶
Python: Process Many Documents¶
import time

from opencomplai import Client

def process_documents_batch(file_list, batch_size=5):
    """Process multiple documents in batches, polling until each completes."""
    client = Client(api_key="sk_test_xyz...")
    results = []

    for i in range(0, len(file_list), batch_size):
        batch = file_list[i:i + batch_size]
        print(f"\nProcessing batch {i // batch_size + 1}...")

        created_docs = []

        # Create all documents in the batch
        for filename in batch:
            try:
                with open(filename, 'r') as f:
                    content = f.read()
                doc = client.documents.create(
                    title=filename,
                    content=content
                )
                created_docs.append(doc)
                print(f"  ✅ Created: {doc.id}")
            except Exception as e:
                print(f"  ❌ Failed to create {filename}: {e}")

        # Poll until each document in the batch finishes processing
        print("Waiting for processing...")
        for doc in created_docs:
            max_wait = 60
            elapsed = 0
            while elapsed < max_wait:
                doc = client.documents.get(doc.id)
                if doc.processing_status == 'completed':
                    results.append(doc)
                    print(f"  ✅ Processed: {doc.id}")
                    break
                elif doc.processing_status == 'failed':
                    print(f"  ❌ Failed: {doc.id}")
                    break
                time.sleep(2)
                elapsed += 2
            else:
                # The while-else branch runs only if the loop never hit a break
                print(f"  ⏱️ Timeout: {doc.id}")
    return results

# Usage
files = ["report1.txt", "report2.txt", "report3.txt"]
results = process_documents_batch(files, batch_size=3)
print(f"\nProcessed {len(results)} documents successfully")
Webhook Listener¶
Python: Real-Time Event Handling¶
from flask import Flask, request

from opencomplai import Client

app = Flask(__name__)
client = Client(api_key="sk_test_xyz...")

# Store webhook events
processed_events = []

@app.route('/webhooks/opencomplai', methods=['POST'])
def handle_webhook():
    """Handle document processing webhook."""
    event = request.json
    event_type = event.get('type')
    print(f"📨 Received webhook: {event_type}")

    if event_type == 'document.processed':
        handle_document_processed(event)
    elif event_type == 'document.failed':
        handle_document_failed(event)
    elif event_type == 'document.created':
        handle_document_created(event)

    # Return 200 to acknowledge receipt
    return {'status': 'ok'}, 200

def handle_document_processed(event):
    """Handle successful document processing."""
    doc_id = event.get('data', {}).get('document_id')
    results = event.get('data', {}).get('results', {})
    print(f"✅ Document {doc_id} processed")
    print(f"   Extracted entities: {results.get('entities', [])}")
    processed_events.append({
        'type': 'processed',
        'doc_id': doc_id,
        'results': results
    })

def handle_document_failed(event):
    """Handle document processing failure."""
    doc_id = event.get('data', {}).get('document_id')
    error = event.get('data', {}).get('error', 'Unknown error')
    print(f"❌ Document {doc_id} failed: {error}")
    processed_events.append({
        'type': 'failed',
        'doc_id': doc_id,
        'error': error
    })

def handle_document_created(event):
    """Handle document creation."""
    doc_id = event.get('data', {}).get('document_id')
    print(f"📝 Document {doc_id} created")

if __name__ == '__main__':
    app.run(port=5000)

# Test webhook
# curl -X POST http://localhost:5000/webhooks/opencomplai \
#   -H "Content-Type: application/json" \
#   -d '{
#     "type": "document.processed",
#     "data": {
#       "document_id": "doc_xyz",
#       "results": {"entities": ["Apple", "Microsoft"]}
#     }
#   }'
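Production endpoints should also verify that requests really come from the provider before acting on them. OpenComplai's signing scheme isn't documented here, so the sketch below assumes an HMAC-SHA256 signature in a hypothetical X-OpenComplai-Signature header; adapt it to whatever the API actually sends. Call verify_signature(request) at the top of handle_webhook before trusting the payload.

import hashlib
import hmac
import os

from flask import abort

WEBHOOK_SECRET = os.getenv("OPENCOMPLAI_WEBHOOK_SECRET", "")

def verify_signature(req):
    """Reject requests whose HMAC doesn't match the shared secret (assumed scheme)."""
    sent = req.headers.get("X-OpenComplai-Signature", "")  # hypothetical header name
    expected = hmac.new(
        WEBHOOK_SECRET.encode(), req.get_data(), hashlib.sha256
    ).hexdigest()
    if not hmac.compare_digest(sent, expected):
        abort(401)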
Streaming Large Results¶
JavaScript: Stream Processing Results¶
import { OpenComplai } from '@opencomplai/sdk';
import { createWriteStream } from 'fs';

async function exportDocumentsAsJSON(userDocuments) {
  const client = new OpenComplai({
    apiKey: process.env.OPENCOMPLAI_API_KEY
  });

  const outputStream = createWriteStream('export.jsonl');

  try {
    for (const docId of userDocuments) {
      const doc = await client.documents.get(docId);
      // Write one JSON object per line (JSONL format)
      outputStream.write(JSON.stringify(doc) + '\n');
      console.log(`✅ Exported: ${doc.title}`);
    }
    outputStream.end();
    console.log('Export complete!');
  } catch (error) {
    console.error('Export failed:', error);
  }
}

// Usage
const docs = ['doc_1', 'doc_2', 'doc_3'];
exportDocumentsAsJSON(docs);
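For Python SDK users, the equivalent JSONL export is below; a sketch built only from calls shown elsewhere on this page.

import json

from opencomplai import Client

def export_documents_as_jsonl(doc_ids, path="export.jsonl"):
    """Write one JSON object per line for each fetched document."""
    client = Client(api_key="sk_test_xyz...")
    with open(path, "w") as out:
        for doc_id in doc_ids:
            doc = client.documents.get(doc_id)
            # default=str handles values that aren't JSON-serializable
            out.write(json.dumps(doc.__dict__, default=str) + "\n")
            print(f"✅ Exported: {doc.title}")

export_documents_as_jsonl(["doc_1", "doc_2", "doc_3"])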
Parallel Requests¶
JavaScript: Concurrent API Calls¶
import { OpenComplai } from '@opencomplai/sdk';

async function processDocumentsParallel(documentIds, concurrency = 5) {
  const client = new OpenComplai({
    apiKey: process.env.OPENCOMPLAI_API_KEY
  });

  const results = [];
  const inProgress = new Set();

  for (const docId of documentIds) {
    // Wait if we're at max concurrency
    while (inProgress.size >= concurrency) {
      await Promise.race(inProgress);
    }

    // Start a new request
    const promise = client.documents.get(docId)
      .then(doc => {
        results.push(doc);
        console.log(`✅ Fetched: ${doc.title}`);
      })
      .catch(error => {
        // Log and swallow so one failure doesn't abort the whole run
        console.error(`❌ Failed: ${docId}`, error);
      })
      .finally(() => {
        inProgress.delete(promise);
      });
    inProgress.add(promise);
  }

  // Wait for all remaining requests
  await Promise.all(inProgress);
  return results;
}

// Usage
const ids = Array.from({ length: 20 }, (_, i) => `doc_${i}`);
const results = await processDocumentsParallel(ids, 5);
console.log(`Processed ${results.length} documents`);
Configuration Management¶
Python: Environment-Based Config¶
import os
from enum import Enum

from dotenv import load_dotenv

# Load .env file
load_dotenv()

class Environment(str, Enum):
    LOCAL = "local"
    STAGING = "staging"
    PRODUCTION = "production"

class Config:
    """OpenComplai configuration."""

    ENV = Environment(os.getenv("ENV", "local"))
    API_KEY = os.getenv("OPENCOMPLAI_API_KEY")
    API_URL = os.getenv(
        "OPENCOMPLAI_API_URL",
        "https://api.opencomplai.com"
    )

    # Feature flags
    ENABLE_CACHE = os.getenv("ENABLE_CACHE", "true").lower() == "true"
    ENABLE_WEBHOOKS = os.getenv("ENABLE_WEBHOOKS", "false").lower() == "true"

    # Limits
    MAX_RETRIES = int(os.getenv("MAX_RETRIES", "3"))
    REQUEST_TIMEOUT = int(os.getenv("REQUEST_TIMEOUT", "30"))

    @classmethod
    def validate(cls):
        """Validate required config."""
        if not cls.API_KEY:
            raise ValueError("OPENCOMPLAI_API_KEY environment variable required")

# Usage
Config.validate()
print(f"Environment: {Config.ENV}")
print(f"Cache enabled: {Config.ENABLE_CACHE}")
Error Recovery Strategies¶
Python: Graceful Degradation¶
from opencomplai import Client, APIError

def get_document_with_fallback(doc_id, fallback_content=""):
    """Get document, use fallback if API unavailable."""
    try:
        client = Client(api_key="sk_test_xyz...")
        doc = client.documents.get(doc_id)
        return doc
    except APIError as e:
        if e.status_code >= 500:
            # Server error - use fallback
            print("⚠️ API unavailable, using fallback")
            return {
                'id': doc_id,
                'title': 'Fallback Document',
                'content': fallback_content,
                'error': 'API was unavailable'
            }
        else:
            # Client error - re-raise
            raise

# Usage
doc = get_document_with_fallback(
    "doc_xyz",
    fallback_content="Default content when API is down"
)
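A static fallback loses real content; another strategy is to serve the most recent copy from the DocumentCache defined earlier on this page. A sketch that reuses that class, assuming slightly stale data is acceptable during an outage.

from opencomplai import Client, APIError

def get_document_cached_fallback(client, cache, doc_id):
    """Fetch fresh data, falling back to the local cache on server errors."""
    try:
        doc = client.documents.get(doc_id)
        cache.set(doc_id, doc.__dict__)  # keep the cache warm
        return doc.__dict__
    except APIError as e:
        if e.status_code >= 500:
            stale = cache.get(doc_id)  # None if missing or past its TTL
            if stale is not None:
                print("⚠️ API unavailable, serving cached copy")
                return stale
        raise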
Related Documentation¶
- Basic Usage - Common patterns
- Error Handling Guide - Error codes
- Performance Guide - Optimization tips
- API Reference - Complete API documentation