Skip to content

Advanced Patterns

Complex usage patterns and best practices for production applications.

Retry Logic with Exponential Backoff

Python: Automatic Retries

import time
import random
from opencomplai import Client, RateLimitError, APIError

def create_with_retries(client, title, content, max_retries=3):
    """Create a document, retrying transient failures with exponential backoff.

    Rate limits are always retried (up to ``max_retries`` attempts); server
    (5xx) errors are retried too, while client (4xx) errors and the final
    failed attempt are re-raised to the caller.
    """
    for attempt in range(max_retries):
        last_attempt = attempt == max_retries - 1

        try:
            return client.documents.create(title=title, content=content)

        except RateLimitError:
            if last_attempt:
                raise  # out of retries — surface the rate limit

            # Exponential backoff (1s, 2s, ...) plus jitter so many clients
            # don't retry in lockstep.
            total_wait = (2 ** attempt) + random.uniform(0, 1)
            print(f"Rate limited. Waiting {total_wait:.1f}s before retry {attempt + 1}...")
            time.sleep(total_wait)

        except APIError as e:
            # Only 5xx responses are plausibly transient; anything else is
            # a caller-side problem and retrying won't help.
            if last_attempt or e.status_code < 500:
                raise
            wait_time = 2 ** attempt
            print(f"Server error. Retrying in {wait_time}s...")
            time.sleep(wait_time)

# Usage
# NOTE(review): a test key is shown inline; load it from the environment in
# real code (see the Configuration Management section below).
client = Client(api_key="sk_test_xyz...")
doc = create_with_retries(
    client,
    title="Important Document",
    content="..."
)

Caching Pattern

Python: Cache Processing Results

import json
from pathlib import Path
from datetime import datetime, timedelta
from opencomplai import Client

class DocumentCache:
    """Cache document processing results locally.

    Entries are JSON files under ``cache_dir`` named ``<doc_id>.json``.
    An entry expires ``ttl`` seconds after its last write, measured from
    the cache file's mtime.
    """

    def __init__(self, cache_dir=".cache/documents", ttl=3600):
        """
        Args:
            cache_dir: directory for cache files (created if missing).
            ttl: entry lifetime in seconds (default: 1 hour).
        """
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.ttl = ttl

    def get_cache_path(self, doc_id):
        """Return the Path of the cache file for *doc_id*."""
        return self.cache_dir / f"{doc_id}.json"

    def get(self, doc_id):
        """Return the cached document, or None if absent or expired."""
        cache_path = self.get_cache_path(doc_id)

        # stat() directly instead of exists()+stat(): avoids the race where
        # the file disappears between the two calls.
        try:
            mtime = cache_path.stat().st_mtime
        except FileNotFoundError:
            return None

        # Check if cache expired
        age = (datetime.now() - datetime.fromtimestamp(mtime)).total_seconds()
        if age > self.ttl:
            # missing_ok guards against a concurrent process deleting it first.
            cache_path.unlink(missing_ok=True)
            return None

        with open(cache_path) as f:
            return json.load(f)

    def set(self, doc_id, data):
        """Save *data* (must be JSON-serializable) to the cache."""
        cache_path = self.get_cache_path(doc_id)
        with open(cache_path, 'w') as f:
            json.dump(data, f, indent=2)

# Usage
cache = DocumentCache()
client = Client(api_key="sk_test_xyz...")

doc_id = "doc_abc123"

# Check cache first
cached = cache.get(doc_id)
if cached:
    print("Using cached data")
    doc = cached
else:
    print("Fetching from API")
    doc = client.documents.get(doc_id)
    # NOTE(review): __dict__ assumes the SDK document is a plain attribute
    # bag whose values are JSON-serializable — confirm nested objects
    # survive json.dump before relying on this in production.
    cache.set(doc_id, doc.__dict__)

Batch Processing with Progress

Python: Process Many Documents

from opencomplai import Client
import time

def process_documents_batch(file_list, batch_size=5, client=None):
    """Create and process documents in sequential batches.

    Args:
        file_list: paths of text files to upload.
        batch_size: number of files created per batch.
        client: optional pre-configured Client; one is created when omitted.

    Returns:
        List of documents whose processing completed; files that failed to
        create, failed to process, or timed out are logged and skipped.
    """
    if client is None:
        client = Client(api_key="sk_test_xyz...")
    results = []

    for i in range(0, len(file_list), batch_size):
        batch = file_list[i:i + batch_size]
        print(f"\nProcessing batch {i // batch_size + 1}...")

        created_docs = []

        # Create all documents in batch
        for filename in batch:
            try:
                with open(filename, 'r') as f:
                    content = f.read()

                doc = client.documents.create(
                    title=filename,
                    content=content
                )
                created_docs.append(doc)
                print(f"  ✅ Created: {doc.id}")

            except Exception as e:
                # Name the offending file so failures are traceable
                # (the original printed a literal "(unknown)").
                print(f"  ❌ Failed to create {filename}: {e}")

        # Poll each created document until it completes, fails, or times out.
        print("Waiting for processing...")
        for doc in created_docs:
            max_wait = 60  # seconds per document
            elapsed = 0

            while elapsed < max_wait:
                doc = client.documents.get(doc.id)

                if doc.processing_status == 'completed':
                    results.append(doc)
                    print(f"  ✅ Processed: {doc.id}")
                    break
                elif doc.processing_status == 'failed':
                    print(f"  ❌ Failed: {doc.id}")
                    break

                time.sleep(2)
                elapsed += 2
            else:
                # while/else: loop exhausted without break -> timed out.
                print(f"  ⏱️  Timeout: {doc.id}")

    return results

# Usage
# The listed files must exist on disk; unreadable files are logged and skipped.
files = ["report1.txt", "report2.txt", "report3.txt"]
results = process_documents_batch(files, batch_size=3)
print(f"\nProcessed {len(results)} documents successfully")

Webhook Listener

Python: Real-Time Event Handling

from flask import Flask, request
from opencomplai import Client
import json

app = Flask(__name__)
# NOTE(review): this client is constructed but never used by the handlers
# below — confirm it is needed or drop it.
client = Client(api_key="sk_test_xyz...")

# Store webhook events
# In-memory only: events are lost on restart and not shared across workers.
processed_events = []

@app.route('/webhooks/opencomplai', methods=['POST'])
def handle_webhook():
    """Handle document processing webhook.

    NOTE(review): no webhook signature verification is performed, so anyone
    who knows the URL can post events. Verify the provider's signature
    header before trusting the payload in production.
    """
    # get_json(silent=True) returns None instead of raising on a missing or
    # malformed JSON body, so a bad request cannot crash the endpoint.
    event = request.get_json(silent=True) or {}
    event_type = event.get('type')

    print(f"📨 Received webhook: {event_type}")

    # Dispatch by event type; unknown types are acknowledged but ignored.
    if event_type == 'document.processed':
        handle_document_processed(event)
    elif event_type == 'document.failed':
        handle_document_failed(event)
    elif event_type == 'document.created':
        handle_document_created(event)

    # Return 200 to acknowledge receipt
    return {'status': 'ok'}, 200

def handle_document_processed(event):
    """Log a successful processing event and record it in processed_events."""
    data = event.get('data', {})
    doc_id = data.get('document_id')
    results = data.get('results', {})

    print(f"✅ Document {doc_id} processed")
    print(f"   Extracted entities: {results.get('entities', [])}")

    record = {
        'type': 'processed',
        'doc_id': doc_id,
        'results': results,
    }
    processed_events.append(record)

def handle_document_failed(event):
    """Log a processing failure and record it in processed_events."""
    data = event.get('data', {})
    doc_id = data.get('document_id')
    error = data.get('error', 'Unknown error')

    print(f"❌ Document {doc_id} failed: {error}")

    record = {
        'type': 'failed',
        'doc_id': doc_id,
        'error': error,
    }
    processed_events.append(record)

def handle_document_created(event):
    """Log a document-creation event (no state is recorded)."""
    data = event.get('data', {})
    print(f"📝 Document {data.get('document_id')} created")

if __name__ == '__main__':
    # Flask's built-in development server; use a production WSGI server
    # (gunicorn, uwsgi, ...) for real deployments.
    app.run(port=5000)

# Test webhook
# curl -X POST http://localhost:5000/webhooks/opencomplai \
#   -H "Content-Type: application/json" \
#   -d '{
#     "type": "document.processed",
#     "data": {
#       "document_id": "doc_xyz",
#       "results": {"entities": ["Apple", "Microsoft"]}
#     }
#   }'

Streaming Large Results

JavaScript: Stream Processing Results

import { OpenComplai } from '@opencomplai/sdk';
import { createWriteStream } from 'fs';
import { pipeline } from 'stream/promises';

/**
 * Fetch each document and append it to export.jsonl, one JSON object per
 * line (JSONL). Errors abort the export loop but are logged rather than
 * thrown, and the output stream is always closed.
 *
 * @param {string[]} userDocuments - document IDs to export
 */
async function exportDocumentsAsJSON(userDocuments) {
  const client = new OpenComplai({
    apiKey: process.env.OPENCOMPLAI_API_KEY
  });

  const outputStream = createWriteStream('export.jsonl');

  try {
    for (const docId of userDocuments) {
      const doc = await client.documents.get(docId);

      // Write one JSON object per line (JSONL format)
      outputStream.write(JSON.stringify(doc) + '\n');

      console.log(`✅ Exported: ${doc.title}`);
    }

    console.log('Export complete!');

  } catch (error) {
    console.error('Export failed:', error);
  } finally {
    // Close the file even when an error interrupts the loop — the original
    // only ended the stream on success, leaking the descriptor on failure.
    outputStream.end();
  }
}

// Usage
const docs = ['doc_1', 'doc_2', 'doc_3'];
exportDocumentsAsJSON(docs);

Parallel Requests

JavaScript: Concurrent API Calls

import { OpenComplai } from '@opencomplai/sdk';

/**
 * Fetch documents with at most `concurrency` requests in flight at once.
 * Individual failures are logged and skipped; successful documents are
 * returned (order reflects completion, not input order).
 *
 * @param {string[]} documentIds
 * @param {number} concurrency - max simultaneous requests
 * @returns {Promise<object[]>} successfully fetched documents
 */
async function processDocumentsParallel(documentIds, concurrency = 5) {
  const client = new OpenComplai({
    apiKey: process.env.OPENCOMPLAI_API_KEY
  });

  const results = [];
  const inProgress = new Set();

  for (let i = 0; i < documentIds.length; i++) {
    // Wait if we're at max concurrency
    while (inProgress.size >= concurrency) {
      await Promise.race(inProgress);
    }

    // Start a new request
    const docId = documentIds[i];
    const promise = client.documents.get(docId)
      .then(doc => {
        results.push(doc);
        console.log(`✅ Fetched: ${doc.title}`);
        return doc;
      })
      .catch(error => {
        // Log and swallow: re-throwing here (as the original did) made the
        // tracked promise reject, so Promise.race/Promise.all rejected and a
        // single failed document aborted the whole batch.
        console.error(`❌ Failed: ${docId}`);
      })
      .finally(() => {
        inProgress.delete(promise);
      });

    inProgress.add(promise);
  }

  // Wait for all remaining requests
  await Promise.all(inProgress);

  return results;
}

// Usage
const ids = Array.from({ length: 20 }, (_, i) => `doc_${i}`);
// Pass the concurrency limit positionally: the original `concurrency=5`
// assigned to an undeclared global, which is a ReferenceError in
// strict/module code (JS has no Python-style keyword arguments).
const results = await processDocumentsParallel(ids, 5);
console.log(`Processed ${results.length} documents`);

Configuration Management

Python: Environment-Based Config

import os
from enum import Enum
from dotenv import load_dotenv

# Load .env file
# Reads KEY=value pairs from a local .env into os.environ (no-op if the
# file is absent; existing environment variables win by default).
load_dotenv()

class Environment(str, Enum):
    # str mixin lets members compare and print as plain strings.
    LOCAL = "local"
    STAGING = "staging"
    PRODUCTION = "production"

class Config:
    """OpenComplai configuration, resolved once from environment variables
    at class-definition time."""

    ENV = Environment(os.environ.get("ENV", "local"))
    API_KEY = os.environ.get("OPENCOMPLAI_API_KEY")
    API_URL = os.environ.get("OPENCOMPLAI_API_URL", "https://api.opencomplai.com")

    # Feature flags: any casing of "true" enables, everything else disables.
    ENABLE_CACHE = os.environ.get("ENABLE_CACHE", "true").lower() == "true"
    ENABLE_WEBHOOKS = os.environ.get("ENABLE_WEBHOOKS", "false").lower() == "true"

    # Numeric limits (both parsed as integers)
    MAX_RETRIES = int(os.environ.get("MAX_RETRIES", "3"))
    REQUEST_TIMEOUT = int(os.environ.get("REQUEST_TIMEOUT", "30"))

    @classmethod
    def validate(cls):
        """Raise ValueError when a required setting is missing."""
        if cls.API_KEY:
            return
        raise ValueError("OPENCOMPLAI_API_KEY environment variable required")

# Usage
Config.validate()  # raises ValueError if OPENCOMPLAI_API_KEY is unset
print(f"Environment: {Config.ENV}")
print(f"Cache enabled: {Config.ENABLE_CACHE}")

Error Recovery Strategies

Python: Graceful Degradation

from opencomplai import Client, APIError

def get_document_with_fallback(doc_id, fallback_content="", client=None):
    """Get a document, returning a local stand-in if the API is unavailable.

    Args:
        doc_id: ID of the document to fetch.
        fallback_content: content for the stand-in document on 5xx errors.
        client: optional pre-configured Client; one is created when omitted
            (generalized from the original, which built a client per call).

    Returns:
        The SDK document on success, or a plain dict stand-in when the API
        returned a server (5xx) error — callers must handle both shapes.

    Raises:
        APIError: for client-side (4xx) errors, which a fallback can't fix.
    """
    if client is None:
        client = Client(api_key="sk_test_xyz...")

    try:
        return client.documents.get(doc_id)

    except APIError as e:
        if e.status_code >= 500:
            # Server error - degrade gracefully instead of failing the caller
            print("⚠️  API unavailable, using fallback")
            return {
                'id': doc_id,
                'title': 'Fallback Document',
                'content': fallback_content,
                'error': 'API was unavailable'
            }
        # Client error - re-raise
        raise

# Usage
# May return either an SDK document or a plain fallback dict — downstream
# code must handle both shapes.
doc = get_document_with_fallback(
    "doc_xyz",
    fallback_content="Default content when API is down"
)