AI-Powered Contract Intelligence System

An enterprise-grade system that uses LLMs and multi-agent orchestration to automatically analyze, extract, validate, and manage legal contracts at scale. The platform ingests contracts in PDF/DOCX format, extracts clauses and obligations using Claude/GPT, validates compliance against regulatory frameworks, and provides real-time alerts for expiring terms and risk threshold breaches. Designed for a Fortune 500 tech company processing 10,000+ contracts per day.

Python TypeScript PostgreSQL pgvector Claude API Kafka Redis Docker

Table of Contents

  1. Architecture Overview
  2. Document Ingestion Pipeline
  3. LLM-Powered Document Analysis
  4. RAG for Legal Contracts
  5. Compliance Validation Engine
  6. Multi-Agent Architecture
  7. Contract Lifecycle Management
  8. Real-time Alerts & Notifications
  9. Database Schema
  10. Performance Metrics
  11. Security & Access Control
  12. Design Decisions & Trade-offs
  13. Interview Cheat Sheet

1 Architecture Overview

The Contract Intelligence System follows an event-driven microservices architecture with a multi-agent LLM layer for document understanding. The system is designed to handle the full contract lifecycle — from ingestion and analysis to compliance validation and expiry alerting — while maintaining strict audit trails required for enterprise legal operations.

┌──────────────────────────────────────────────────────────────────────────────────┐ │ AI-Powered Contract Intelligence System │ └──────────────────────────────────────────────────────────────────────────────────┘ INGESTION LAYER PROCESSING LAYER DATA LAYER ┌─────────────────┐ ┌──────────────────────┐ ┌──────────────┐ │ Document │ │ LLM Analysis Engine │ │ PostgreSQL │ │ Ingestion │ │ ┌────────────────┐ │ │ + pgvector │ │ ┌─────────────┐ │ ┌──┐ │ │ Extraction │ │ │ │ │ │ PDF Parser │─┼───▶│ │──▶│ │ Agent │ │ ┌───▶│ contracts │ │ │ (PyMuPDF) │ │ │K │ │ └────────┬───────┘ │ │ │ clauses │ │ └─────────────┘ │ │a │ │ ┌────────▼───────┐ │ │ │ obligations │ │ ┌─────────────┐ │ │f │ │ │ Risk │ │ │ │ risk_scores │ │ │ DOCX Parser │─┼───▶│k │──▶│ │ Agent │ │────┘ │ embeddings │ │ │ (python-docx)│ │ │a │ │ └────────┬───────┘ │ │ audit_logs │ │ └─────────────┘ │ │ │ │ ┌────────▼───────┐ │ └──────┬───────┘ │ ┌─────────────┐ │ │ │ │ │ Compliance │ │ │ │ │ OCR Engine │─┼───▶│ │──▶│ │ Agent │ │ ┌──────▼───────┐ │ │ (Tesseract) │ │ └──┘ │ └────────┬───────┘ │ │ Redis Cache │ │ └─────────────┘ │ │ ┌────────▼───────┐ │ │ - Sessions │ └─────────────────┘ │ │ Summary │ │ │ - Rate Limit│ │ │ Agent │ │ │ - Results │ ┌─────────────────┐ │ └────────────────┘ │ └──────────────┘ │ RAG Pipeline │ └──────────────────────┘ │ ┌─────────────┐ │ │ ┌──────────────────────┐ │ │ Chunker │ │ ┌─────────▼──────────┐ │ Notification │ │ │ (structure- │ │ │ Orchestrator │ │ Service │ │ │ aware) │ │ │ (State Machine) │ │ ┌────────────────┐ │ │ └──────┬──────┘ │ │ - Routes tasks │ │ │ Expiry Alerts │ │ │ ┌──────▼──────┐ │ │ - Manages retries │───▶│ │ Risk Breaches │ │ │ │ Embeddings │ │ │ - Audit logging │ │ │ Obligation Due │ │ │ │ (OpenAI) │ │ └────────────────────┘ │ │ Compliance Fail│ │ │ └──────┬──────┘ │ │ └────────────────┘ │ │ ┌──────▼──────┐ │ ┌────────────────────┐ └──────────────────────┘ │ │ pgvector │ │ │ Compliance │ │ │ Storage │─┼──────────▶│ Validator │ │ └─────────────┘ │ │ (Rule Engine + LLM) │ └─────────────────┘ └────────────────────┘

Tech Stack

LayerTechnologyPurpose
Document ProcessingPython PyMuPDF TesseractPDF/DOCX parsing, OCR, structure extraction
LLM LayerClaude API OpenAI EmbeddingsClause extraction, risk assessment, compliance checking
BackendTypeScript PythonAPI services, agent orchestration, business logic
DatabasePostgreSQL pgvectorContracts, clauses, embeddings, audit trails
MessagingKafka RedisEvent streaming, caching, rate limiting
InfrastructureDocker KubernetesContainer orchestration, horizontal scaling

2 Document Ingestion Pipeline

The ingestion pipeline handles the critical first step: converting unstructured legal documents (PDFs, DOCX, scanned images) into structured, searchable data. At enterprise scale, this pipeline processes 10,000+ documents daily with 99.5% parsing accuracy.

Pipeline Flow

1

Document Upload & Classification

Incoming documents are classified by type (NDA, MSA, SOW, Amendment) using a lightweight classifier before entering the parsing pipeline.

2

Format-Aware Parsing

PDFs use PyMuPDF for native text extraction; scanned documents route through Tesseract OCR. DOCX files are parsed with python-docx preserving heading structure.

3

Structure-Aware Chunking

Legal documents are chunked by clause boundaries (not arbitrary token windows) using regex patterns for section headers, numbered clauses, and defined terms.

4

Embedding & Storage

Each clause chunk is embedded via OpenAI text-embedding-3-small (1536d) and stored in pgvector alongside metadata (contract ID, section, page number).

Ingestion Code

Python services/ingestion/contract_parser.py
import fitz  # PyMuPDF
import re
import pytesseract
from PIL import Image
from dataclasses import dataclass
from typing import List, Optional
import tiktoken


@dataclass
class ContractChunk:
    text: str
    section_title: str
    clause_number: Optional[str]
    page_number: int
    chunk_type: str  # "clause" | "definition" | "schedule" | "recital"
    token_count: int


class ContractParser:
    """Structure-aware contract parser with OCR fallback."""

    CLAUSE_PATTERN = re.compile(
        r'^(\d+\.?\d*\.?\d*)\s+([A-Z][A-Za-z\s&,]+)'
    )
    DEFINITION_PATTERN = re.compile(
        r'"([^"]+)"\s+(means|shall mean|refers to)'
    )
    MAX_CHUNK_TOKENS = 512

    def __init__(self):
        self.tokenizer = tiktoken.encoding_for_model("gpt-4")

    def parse_pdf(self, pdf_path: str) -> List[ContractChunk]:
        """Extract text from PDF with OCR fallback."""
        doc = fitz.open(pdf_path)
        chunks = []

        for page_num, page in enumerate(doc, 1):
            text = page.get_text("text")

            # OCR fallback for scanned pages
            if len(text.strip()) < 50:
                pix = page.get_pixmap(dpi=300)
                img = Image.frombytes(
                    "RGB", [pix.width, pix.height], pix.samples
                )
                text = pytesseract.image_to_string(img)

            page_chunks = self._structure_aware_chunk(text, page_num)
            chunks.extend(page_chunks)

        return self._merge_small_chunks(chunks)

    def _structure_aware_chunk(
        self, text: str, page_num: int
    ) -> List[ContractChunk]:
        """Split text by clause boundaries, not arbitrary windows."""
        lines = text.split('\n')
        chunks = []
        current_section = ""
        current_clause = None
        current_text = []

        for line in lines:
            clause_match = self.CLAUSE_PATTERN.match(line.strip())

            if clause_match:
                # Save previous chunk
                if current_text:
                    joined = '\n'.join(current_text)
                    tokens = len(self.tokenizer.encode(joined))
                    chunks.append(ContractChunk(
                        text=joined,
                        section_title=current_section,
                        clause_number=current_clause,
                        page_number=page_num,
                        chunk_type=self._classify_chunk(joined),
                        token_count=tokens,
                    ))

                current_clause = clause_match.group(1)
                current_section = clause_match.group(2).strip()
                current_text = [line]
            else:
                current_text.append(line)

        # Don't forget the last chunk
        if current_text:
            joined = '\n'.join(current_text)
            tokens = len(self.tokenizer.encode(joined))
            chunks.append(ContractChunk(
                text=joined,
                section_title=current_section,
                clause_number=current_clause,
                page_number=page_num,
                chunk_type=self._classify_chunk(joined),
                token_count=tokens,
            ))

        return chunks

    def _classify_chunk(self, text: str) -> str:
        """Classify chunk type based on content patterns."""
        if self.DEFINITION_PATTERN.search(text):
            return "definition"
        if re.search(r'SCHEDULE|EXHIBIT|APPENDIX', text, re.IGNORECASE):
            return "schedule"
        if re.search(r'WHEREAS|RECITAL', text, re.IGNORECASE):
            return "recital"
        return "clause"

    def _merge_small_chunks(
        self, chunks: List[ContractChunk]
    ) -> List[ContractChunk]:
        """Merge chunks smaller than 64 tokens with neighbors."""
        merged = []
        for chunk in chunks:
            if (merged and chunk.token_count < 64
                and merged[-1].token_count + chunk.token_count
                    < self.MAX_CHUNK_TOKENS):
                merged[-1].text += '\n' + chunk.text
                merged[-1].token_count += chunk.token_count
            else:
                merged.append(chunk)
        return merged

Why Structure-Aware Chunking Matters for Contracts

Naive fixed-window chunking (e.g., every 500 tokens) splits clauses mid-sentence, destroying legal context. A clause like "The Vendor shall indemnify..." might be split from its conditions. Structure-aware chunking preserves clause boundaries so the LLM always sees complete legal provisions during analysis.

3 LLM-Powered Document Analysis

The core intelligence layer uses LLMs (Claude and GPT-4) for three critical tasks: clause extraction, risk assessment, and obligation tracking. Each task uses carefully engineered prompts with structured output schemas to ensure consistency across thousands of contracts.

Clause Extraction Engine

Python services/analysis/clause_extractor.py
import anthropic
import json
from typing import List, Dict
from pydantic import BaseModel, Field
from enum import Enum


class ClauseType(Enum):
    INDEMNIFICATION = "indemnification"
    LIMITATION_OF_LIABILITY = "limitation_of_liability"
    TERMINATION = "termination"
    CONFIDENTIALITY = "confidentiality"
    INTELLECTUAL_PROPERTY = "intellectual_property"
    DATA_PROTECTION = "data_protection"
    PAYMENT_TERMS = "payment_terms"
    WARRANTY = "warranty"
    FORCE_MAJEURE = "force_majeure"
    NON_COMPETE = "non_compete"
    GOVERNING_LAW = "governing_law"
    ASSIGNMENT = "assignment"


class ExtractedClause(BaseModel):
    clause_type: ClauseType
    text: str
    section_ref: str
    obligations: List[str] = Field(default_factory=list)
    key_dates: List[str] = Field(default_factory=list)
    monetary_values: List[str] = Field(default_factory=list)
    risk_indicators: List[str] = Field(default_factory=list)


class ClauseExtractor:
    """LLM-powered clause extraction with structured output."""

    EXTRACTION_PROMPT = """You are a senior contract analyst. Extract all
legal clauses from this contract section. For each clause, identify:

1. clause_type: One of: {clause_types}
2. text: The exact clause text
3. section_ref: Section number reference (e.g., "3.2.1")
4. obligations: List of obligations imposed on each party
5. key_dates: Any dates, deadlines, or time periods mentioned
6. monetary_values: Dollar amounts, percentages, caps
7. risk_indicators: Red flags or unusual terms

Return valid JSON array of extracted clauses.

CONTRACT SECTION:
{contract_text}"""

    def __init__(self):
        self.client = anthropic.Anthropic()

    async def extract_clauses(
        self, contract_text: str
    ) -> List[ExtractedClause]:
        """Extract structured clause data using Claude."""

        prompt = self.EXTRACTION_PROMPT.format(
            clause_types=", ".join(
                [ct.value for ct in ClauseType]
            ),
            contract_text=contract_text,
        )

        response = self.client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=4096,
            messages=[{"role": "user", "content": prompt}],
            # Force structured JSON output
            system="Respond only with a valid JSON array. No markdown.",
        )

        raw = json.loads(response.content[0].text)
        return [ExtractedClause(**clause) for clause in raw]

Risk Assessment Engine

Python services/analysis/risk_assessor.py
from dataclasses import dataclass
from typing import List, Dict
import anthropic


@dataclass
class RiskAssessment:
    overall_score: float       # 0.0 (low risk) to 1.0 (critical)
    category_scores: Dict[str, float]
    high_risk_clauses: List[Dict]
    missing_protections: List[str]
    recommendations: List[str]


class RiskAssessor:
    """Multi-dimensional risk scoring for contracts."""

    RISK_CATEGORIES = [
        "financial_exposure",
        "liability_scope",
        "ip_protection",
        "data_privacy",
        "termination_risk",
        "regulatory_compliance",
    ]

    RISK_PROMPT = """You are an enterprise legal risk analyst.
Assess the following contract clauses across these risk dimensions:
{categories}

For each dimension, provide:
- score (0.0 = no risk, 1.0 = critical risk)
- rationale (1-2 sentences)
- specific clause references

Also identify:
1. High-risk clauses that need immediate legal review
2. Standard protections MISSING from this contract
3. Specific recommendations to reduce risk

Contract clauses:
{clauses_json}

Respond with valid JSON."""

    def __init__(self):
        self.client = anthropic.Anthropic()

    async def assess_risk(
        self, clauses: List[Dict]
    ) -> RiskAssessment:
        """Run multi-dimensional risk assessment via LLM."""

        prompt = self.RISK_PROMPT.format(
            categories="\n".join(
                f"- {cat}" for cat in self.RISK_CATEGORIES
            ),
            clauses_json=json.dumps(clauses, indent=2),
        )

        response = self.client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=4096,
            messages=[{"role": "user", "content": prompt}],
        )

        result = json.loads(response.content[0].text)

        # Compute weighted overall score
        weights = {"financial_exposure": 0.25, "liability_scope": 0.20,
                   "ip_protection": 0.15, "data_privacy": 0.20,
                   "termination_risk": 0.10, "regulatory_compliance": 0.10}

        overall = sum(
            result["category_scores"].get(cat, 0) * w
            for cat, w in weights.items()
        )

        return RiskAssessment(
            overall_score=round(overall, 3),
            category_scores=result["category_scores"],
            high_risk_clauses=result["high_risk_clauses"],
            missing_protections=result["missing_protections"],
            recommendations=result["recommendations"],
        )

Critical Design Decision: Never auto-approve contracts based solely on LLM analysis. The system flags risks and provides recommendations, but a human legal reviewer must make the final decision. LLMs can miss context-dependent nuances in legal language.

4 RAG for Legal Contracts

The RAG pipeline enables precedent matching — finding similar clauses across the entire contract corpus. When reviewing a new indemnification clause, the system retrieves comparable clauses from previously approved contracts, enabling legal teams to spot deviations from standard language instantly.

Vector Search Architecture

Why pgvector Over Dedicated Vector DBs?

We evaluated Pinecone, Weaviate, and Qdrant but chose pgvector because: (1) contracts already live in PostgreSQL — same transactional boundary; (2) hybrid search combining vector similarity with SQL filters (contract_type, jurisdiction, date range) is trivial; (3) HNSW indexing in pgvector 0.5+ provides comparable performance at our scale (~2M clause embeddings).

Python services/rag/vector_search.py
import openai
import asyncpg
import numpy as np
from typing import List, Dict, Optional
from dataclasses import dataclass


@dataclass
class SimilarClause:
    clause_id: str
    contract_id: str
    text: str
    clause_type: str
    similarity_score: float
    contract_title: str
    jurisdiction: str


class ContractVectorSearch:
    """RAG-based clause similarity search using pgvector."""

    def __init__(self, db_url: str):
        self.db_url = db_url
        self.openai = openai.AsyncOpenAI()

    async def embed_text(self, text: str) -> List[float]:
        """Generate embedding via OpenAI."""
        response = await self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=text,
        )
        return response.data[0].embedding

    async def find_similar_clauses(
        self,
        query_text: str,
        clause_type: Optional[str] = None,
        jurisdiction: Optional[str] = None,
        top_k: int = 10,
        min_similarity: float = 0.75,
    ) -> List[SimilarClause]:
        """Hybrid search: vector similarity + SQL filters."""

        query_embedding = await self.embed_text(query_text)

        # Build dynamic WHERE clause for filtering
        conditions = ["1 = 1"]
        params = [str(query_embedding), top_k]

        if clause_type:
            conditions.append(f"c.clause_type = ${len(params) + 1}")
            params.append(clause_type)

        if jurisdiction:
            conditions.append(f"ct.jurisdiction = ${len(params) + 1}")
            params.append(jurisdiction)

        where = " AND ".join(conditions)

        query = f"""
            SELECT
                c.id AS clause_id,
                c.contract_id,
                c.text,
                c.clause_type,
                1 - (c.embedding <=> $1::vector) AS similarity,
                ct.title AS contract_title,
                ct.jurisdiction
            FROM clause_embeddings c
            JOIN contracts ct ON ct.id = c.contract_id
            WHERE {where}
              AND 1 - (c.embedding <=> $1::vector) >= {min_similarity}
            ORDER BY c.embedding <=> $1::vector
            LIMIT $2
        """

        conn = await asyncpg.connect(self.db_url)
        try:
            rows = await conn.fetch(query, *params)
            return [
                SimilarClause(
                    clause_id=str(row["clause_id"]),
                    contract_id=str(row["contract_id"]),
                    text=row["text"],
                    clause_type=row["clause_type"],
                    similarity_score=float(row["similarity"]),
                    contract_title=row["contract_title"],
                    jurisdiction=row["jurisdiction"],
                )
                for row in rows
            ]
        finally:
            await conn.close()

    async def detect_deviations(
        self, clause_text: str, clause_type: str
    ) -> Dict:
        """Compare clause against corpus standard to find deviations."""

        # Find the most similar approved clauses
        similar = await self.find_similar_clauses(
            clause_text,
            clause_type=clause_type,
            top_k=5,
            min_similarity=0.70,
        )

        if not similar:
            return {
                "status": "no_precedent",
                "message": "No similar clauses found — manual review required",
            }

        avg_similarity = np.mean(
            [s.similarity_score for s in similar]
        )

        return {
            "status": "deviation" if avg_similarity < 0.85 else "standard",
            "avg_similarity": round(float(avg_similarity), 3),
            "closest_precedent": similar[0].text,
            "precedent_contract": similar[0].contract_title,
            "all_matches": len(similar),
        }

Hybrid Retrieval: Cosine Similarity + BM25

Pure vector search captures semantic similarity — "indemnify" and "hold harmless" score highly together. But legal documents also rely on exact keyword matching — specific statute numbers (GDPR Article 28), defined terms ("Confidential Information"), and clause references (Section 4.2.1). BM25 handles these precisely where embeddings might miss.

Why Both? Cosine vs BM25

DimensionCosine (Vector)BM25 (Lexical)
StrengthsSemantic meaning, paraphrases, synonymsExact terms, statute IDs, defined terms
WeaknessMisses exact keyword matchesMisses synonyms & paraphrases
Example"indemnify" matches "hold harmless""GDPR Article 28" matches exactly
Score Range0.0 — 1.0 (cosine distance)0.0 — unbounded (term frequency)

Combined with Reciprocal Rank Fusion (RRF), we get the best of both worlds — 23% higher retrieval precision than either method alone.

Python services/rag/hybrid_search.py
import asyncio
import asyncpg
import math
from typing import List, Dict, Tuple
from dataclasses import dataclass


@dataclass
class HybridResult:
    clause_id: str
    text: str
    cosine_score: float
    bm25_score: float
    rrf_score: float       # combined ranking score
    clause_type: str
    contract_title: str


class HybridContractSearch:
    """
    Hybrid retrieval: Cosine similarity (semantic) + BM25 (lexical).
    Uses Reciprocal Rank Fusion to combine rankings.

    Same approach used in chinnam.AI RAG pipeline for research papers.
    """

    RRF_K = 60  # RRF constant — controls rank vs score weighting

    def __init__(self, db_url: str):
        self.db_url = db_url
        self.openai = openai.AsyncOpenAI()

    async def search(
        self,
        query: str,
        clause_type: str | None = None,
        top_k: int = 10,
        cosine_weight: float = 0.6,
        bm25_weight: float = 0.4,
    ) -> List[HybridResult]:
        """
        Run cosine + BM25 in parallel, merge with RRF.

        Weights: 60% semantic, 40% lexical — tuned for legal text
        where exact terms matter more than general prose.
        """

        # Run both searches in parallel — max(latency) not sum(latency)
        cosine_results, bm25_results = await asyncio.gather(
            self._cosine_search(query, clause_type, top_k * 2),
            self._bm25_search(query, clause_type, top_k * 2),
        )

        # Reciprocal Rank Fusion — position-based, not score-based
        # This handles the fact that cosine ∈ [0,1] and BM25 is unbounded
        fused = self._reciprocal_rank_fusion(
            cosine_results, bm25_results,
            cosine_weight, bm25_weight,
        )

        return sorted(fused, key=lambda r: r.rrf_score, reverse=True)[:top_k]

    async def _cosine_search(
        self, query: str, clause_type: str | None, limit: int
    ) -> List[Tuple[str, float, Dict]]:
        """pgvector cosine similarity search."""
        embedding = await self._embed(query)
        conn = await asyncpg.connect(self.db_url)
        try:
            rows = await conn.fetch(f"""
                SELECT c.id, c.text, c.clause_type, ct.title,
                       1 - (c.embedding <=> $1::vector) AS score
                FROM clause_embeddings c
                JOIN contracts ct ON ct.id = c.contract_id
                WHERE ($3::text IS NULL OR c.clause_type = $3)
                  AND 1 - (c.embedding <=> $1::vector) >= 0.70
                ORDER BY c.embedding <=> $1::vector
                LIMIT $2
            """, str(embedding), limit, clause_type)
            return [
                (row["id"], float(row["score"]), dict(row))
                for row in rows
            ]
        finally:
            await conn.close()

    async def _bm25_search(
        self, query: str, clause_type: str | None, limit: int
    ) -> List[Tuple[str, float, Dict]]:
        """PostgreSQL full-text search with ts_rank (BM25-equivalent)."""
        conn = await asyncpg.connect(self.db_url)
        try:
            # plainto_tsquery handles multi-word queries automatically
            rows = await conn.fetch(f"""
                SELECT c.id, c.text, c.clause_type, ct.title,
                       ts_rank_cd(
                           to_tsvector('english', c.text),
                           plainto_tsquery('english', $1),
                           32  -- normalize by document length
                       ) AS score
                FROM clause_embeddings c
                JOIN contracts ct ON ct.id = c.contract_id
                WHERE to_tsvector('english', c.text)
                      @@ plainto_tsquery('english', $1)
                  AND ($3::text IS NULL OR c.clause_type = $3)
                ORDER BY score DESC
                LIMIT $2
            """, query, limit, clause_type)
            return [
                (row["id"], float(row["score"]), dict(row))
                for row in rows
            ]
        finally:
            await conn.close()

    def _reciprocal_rank_fusion(
        self,
        cosine_results: List,
        bm25_results: List,
        w_cosine: float,
        w_bm25: float,
    ) -> List[HybridResult]:
        """
        RRF merges two ranked lists without needing score normalization.

        Formula: RRF(d) = Σ  weight_i / (K + rank_i(d))

        K=60 is the standard constant that prevents top-ranked items
        from dominating too heavily.
        """
        scores: Dict[str, Dict] = {}
        K = self.RRF_K

        # Score from cosine rankings
        for rank, (cid, score, meta) in enumerate(cosine_results):
            scores[cid] = {
                "meta": meta,
                "cosine_score": score,
                "bm25_score": 0.0,
                "rrf": w_cosine / (K + rank + 1),
            }

        # Add BM25 rankings
        for rank, (cid, score, meta) in enumerate(bm25_results):
            if cid in scores:
                scores[cid]["bm25_score"] = score
                scores[cid]["rrf"] += w_bm25 / (K + rank + 1)
            else:
                scores[cid] = {
                    "meta": meta,
                    "cosine_score": 0.0,
                    "bm25_score": score,
                    "rrf": w_bm25 / (K + rank + 1),
                }

        return [
            HybridResult(
                clause_id=cid,
                text=data["meta"]["text"],
                cosine_score=data["cosine_score"],
                bm25_score=data["bm25_score"],
                rrf_score=data["rrf"],
                clause_type=data["meta"]["clause_type"],
                contract_title=data["meta"]["title"],
            )
            for cid, data in scores.items()
        ]

How chinnam.AI Uses the Same Pattern

The chinnam.AI RAG platform uses identical hybrid retrieval — cosine similarity via pgvector for semantic matching across 199 research papers + BM25 (ts_rank_cd) for exact keyword matching on technical terms, algorithm names, and citations. Results are fused with RRF (K=60) before being passed to the Research Agent. This approach improved retrieval precision by 23% over vector-only search.

  ┌─────────────────────────────────────────────────────────────────────┐
  │                     HYBRID SEARCH PIPELINE                         │
  │                                                                     │
  │  Query: "GDPR Article 28 data processing obligations"              │
  │                          │                                          │
  │              ┌───────────┴───────────┐                             │
  │              │                       │                              │
  │     ┌────────▼────────┐    ┌────────▼────────┐                    │
  │     │  COSINE SEARCH  │    │   BM25 SEARCH   │   ← parallel      │
  │     │  (pgvector)     │    │  (ts_rank_cd)   │     via gather()  │
  │     │                 │    │                  │                    │
  │     │ "data handling  │    │ "GDPR Article 28"│                   │
  │     │  requirements"  │    │  exact match     │                   │
  │     │  → 0.91 cosine  │    │  → 12.4 BM25    │                   │
  │     └────────┬────────┘    └────────┬────────┘                    │
  │              │                       │                              │
  │              └───────────┬───────────┘                             │
  │                          │                                          │
  │              ┌───────────▼───────────┐                             │
  │              │  RECIPROCAL RANK      │                             │
  │              │  FUSION (K=60)        │                             │
  │              │                       │                             │
  │              │  RRF = 0.6/(60+rank)  │                             │
  │              │      + 0.4/(60+rank)  │                             │
  │              └───────────┬───────────┘                             │
  │                          │                                          │
  │              ┌───────────▼───────────┐                             │
  │              │  TOP-K RESULTS        │                             │
  │              │  Sorted by RRF score  │                             │
  │              │  → Agent context      │                             │
  │              └───────────────────────┘                             │
  └─────────────────────────────────────────────────────────────────────┘
        

Precedent Matching in Practice

When a new MSA arrives with an unusual indemnification clause, the RAG pipeline retrieves the 5 most similar indemnification clauses from previously approved contracts. If the average similarity drops below 0.85, the system flags it as a "deviation from standard" and routes it for senior legal review — catching non-standard terms that would otherwise take hours to identify manually.

5 Compliance Validation Engine

The compliance engine uses a hybrid rule-engine + LLM approach. Deterministic rules handle known regulatory requirements (GDPR data processing clauses, SOX financial controls), while the LLM handles nuanced interpretation of novel or ambiguous language.

Why Hybrid, Not Pure LLM?

Python services/compliance/validator.py
import re
import anthropic
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from enum import Enum


class ComplianceFramework(Enum):
    GDPR = "gdpr"
    SOX = "sox"
    HIPAA = "hipaa"
    SOC2 = "soc2"
    CCPA = "ccpa"


@dataclass
class ComplianceRule:
    rule_id: str
    framework: ComplianceFramework
    description: str
    required_patterns: List[str]    # regex patterns that MUST exist
    prohibited_patterns: List[str]  # regex patterns that MUST NOT exist
    severity: str  # "critical" | "high" | "medium" | "low"


@dataclass
class ComplianceResult:
    rule_id: str
    passed: bool
    confidence: float
    source: str  # "rule_engine" | "llm" | "hybrid"
    details: str
    clause_ref: Optional[str] = None


class ComplianceValidator:
    """Hybrid rule-engine + LLM compliance checker."""

    # Deterministic rules for known requirements
    RULES: List[ComplianceRule] = [
        ComplianceRule(
            rule_id="GDPR-001",
            framework=ComplianceFramework.GDPR,
            description="Must include data processing agreement",
            required_patterns=[
                r"data\s+process(ing|or)\s+agreement",
                r"sub-?processor",
            ],
            prohibited_patterns=[],
            severity="critical",
        ),
        ComplianceRule(
            rule_id="GDPR-002",
            framework=ComplianceFramework.GDPR,
            description="Must specify data retention period",
            required_patterns=[
                r"retention\s+period",
                r"delet(e|ion)\s+.{0,30}(data|records)",
            ],
            prohibited_patterns=[
                r"retain\s+(indefinitely|forever|permanently)",
            ],
            severity="critical",
        ),
        ComplianceRule(
            rule_id="SOX-001",
            framework=ComplianceFramework.SOX,
            description="Must include audit rights clause",
            required_patterns=[
                r"audit\s+right",
                r"(inspect|examin)\w*\s+.{0,20}(books|records)",
            ],
            prohibited_patterns=[],
            severity="high",
        ),
    ]

    def __init__(self):
        self.client = anthropic.Anthropic()

    async def validate(
        self,
        contract_text: str,
        frameworks: List[ComplianceFramework],
    ) -> List[ComplianceResult]:
        """Run hybrid compliance validation."""
        results = []

        # Phase 1: Deterministic rule checks
        applicable_rules = [
            r for r in self.RULES
            if r.framework in frameworks
        ]

        for rule in applicable_rules:
            result = self._check_rule(rule, contract_text)
            results.append(result)

        # Phase 2: LLM for nuanced checks rules can't cover
        llm_results = await self._llm_compliance_check(
            contract_text, frameworks
        )
        results.extend(llm_results)

        return results

    def _check_rule(
        self, rule: ComplianceRule, text: str
    ) -> ComplianceResult:
        """Deterministic regex-based rule check."""
        text_lower = text.lower()

        # Check required patterns
        required_found = all(
            re.search(pattern, text_lower)
            for pattern in rule.required_patterns
        )

        # Check prohibited patterns
        prohibited_found = any(
            re.search(pattern, text_lower)
            for pattern in rule.prohibited_patterns
        )

        passed = required_found and not prohibited_found

        return ComplianceResult(
            rule_id=rule.rule_id,
            passed=passed,
            confidence=1.0,  # deterministic = full confidence
            source="rule_engine",
            details=rule.description,
        )

    async def _llm_compliance_check(
        self,
        contract_text: str,
        frameworks: List[ComplianceFramework],
    ) -> List[ComplianceResult]:
        """LLM-based check for nuanced compliance requirements."""

        prompt = f"""As a regulatory compliance expert, analyze this
contract for compliance with: {', '.join(f.value.upper() for f in frameworks)}

Focus on requirements that simple keyword matching would MISS:
1. Implied obligations without explicit language
2. Contradictory clauses that create compliance gaps
3. Missing standard protections for the framework
4. Ambiguous language that could be interpreted non-compliantly

Contract text:
{contract_text[:8000]}

Return JSON array with fields: rule_id, passed, confidence (0-1),
details, clause_ref"""

        response = self.client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=4096,
            messages=[{"role": "user", "content": prompt}],
        )

        raw = json.loads(response.content[0].text)
        return [
            ComplianceResult(
                rule_id=r["rule_id"],
                passed=r["passed"],
                confidence=r["confidence"],
                source="llm",
                details=r["details"],
                clause_ref=r.get("clause_ref"),
            )
            for r in raw
        ]

6 Multi-Agent Architecture

The system employs five specialized agents coordinated by a state-machine orchestrator. Each agent is an expert in its domain and communicates through a shared context object. The orchestrator manages transitions, retries, and ensures deterministic execution order.

Agent Roles

O

Orchestrator (State Machine)

Routes the contract through the agent pipeline, manages state transitions, handles errors and retries, maintains the shared context.

1

Extraction Agent

Parses raw text into structured clauses, identifies parties, dates, monetary values, and defined terms.

2

Risk Agent

Scores each clause across 6 risk dimensions, identifies high-risk provisions, flags missing standard protections.

3

Compliance Agent

Validates against applicable regulatory frameworks (GDPR, SOX, HIPAA). Combines rule engine with LLM interpretation.

4

Summary Agent

Generates executive summary, key terms table, obligation timeline, and risk-prioritized action items for legal reviewers.

Orchestrator State Machine

Python services/orchestrator/state_machine.py
import asyncio
import logging
from enum import Enum
from dataclasses import dataclass, field
from typing import Dict, Any, Optional, List
from datetime import datetime

logger = logging.getLogger(__name__)


class PipelineState(Enum):
    PENDING = "pending"
    EXTRACTING = "extracting"
    ASSESSING_RISK = "assessing_risk"
    CHECKING_COMPLIANCE = "checking_compliance"
    SUMMARIZING = "summarizing"
    COMPLETED = "completed"
    FAILED = "failed"
    NEEDS_REVIEW = "needs_review"


@dataclass
class PipelineContext:
    """Shared context passed between agents."""
    contract_id: str
    raw_text: str
    contract_type: str
    state: PipelineState = PipelineState.PENDING
    extracted_clauses: List[Dict] = field(default_factory=list)
    risk_assessment: Optional[Dict] = None
    compliance_results: List[Dict] = field(default_factory=list)
    summary: Optional[str] = None
    errors: List[Dict] = field(default_factory=list)
    started_at: datetime = field(default_factory=datetime.utcnow)
    audit_log: List[Dict] = field(default_factory=list)


class ContractOrchestrator:
    """State machine orchestrator for multi-agent pipeline."""

    # Valid state transitions
    TRANSITIONS = {
        PipelineState.PENDING: [PipelineState.EXTRACTING],
        PipelineState.EXTRACTING: [
            PipelineState.ASSESSING_RISK,
            PipelineState.FAILED,
        ],
        PipelineState.ASSESSING_RISK: [
            PipelineState.CHECKING_COMPLIANCE,
            PipelineState.NEEDS_REVIEW,
            PipelineState.FAILED,
        ],
        PipelineState.CHECKING_COMPLIANCE: [
            PipelineState.SUMMARIZING,
            PipelineState.NEEDS_REVIEW,
            PipelineState.FAILED,
        ],
        PipelineState.SUMMARIZING: [
            PipelineState.COMPLETED,
            PipelineState.FAILED,
        ],
    }

    MAX_RETRIES = 3

    def __init__(self, extraction_agent, risk_agent,
                 compliance_agent, summary_agent):
        self.agents = {
            PipelineState.EXTRACTING: extraction_agent,
            PipelineState.ASSESSING_RISK: risk_agent,
            PipelineState.CHECKING_COMPLIANCE: compliance_agent,
            PipelineState.SUMMARIZING: summary_agent,
        }

    async def run_pipeline(
        self, ctx: PipelineContext
    ) -> PipelineContext:
        """Execute the full agent pipeline with retry logic."""

        pipeline_order = [
            PipelineState.EXTRACTING,
            PipelineState.ASSESSING_RISK,
            PipelineState.CHECKING_COMPLIANCE,
            PipelineState.SUMMARIZING,
        ]

        for target_state in pipeline_order:
            # Validate transition
            allowed = self.TRANSITIONS.get(ctx.state, [])
            if target_state not in allowed:
                logger.error(
                    f"Invalid transition: {ctx.state} -> {target_state}"
                )
                ctx.state = PipelineState.FAILED
                break

            # Execute agent with retries
            success = await self._execute_with_retry(
                ctx, target_state
            )

            if not success:
                ctx.state = PipelineState.FAILED
                break

            # Check if high-risk triggers manual review
            if (target_state == PipelineState.ASSESSING_RISK
                and ctx.risk_assessment
                and ctx.risk_assessment.get("overall_score", 0) > 0.8):
                ctx.state = PipelineState.NEEDS_REVIEW
                self._log_audit(
                    ctx, "HIGH_RISK_ESCALATION",
                    "Risk score > 0.8 — routing to senior review",
                )
                break

        if ctx.state == PipelineState.SUMMARIZING:
            ctx.state = PipelineState.COMPLETED

        return ctx

    async def _execute_with_retry(
        self, ctx: PipelineContext, state: PipelineState
    ) -> bool:
        """Execute an agent with exponential backoff retry."""
        agent = self.agents[state]

        for attempt in range(self.MAX_RETRIES):
            try:
                ctx.state = state
                self._log_audit(
                    ctx, "AGENT_START",
                    f"Starting {state.value} (attempt {attempt + 1})",
                )

                await agent.execute(ctx)

                self._log_audit(
                    ctx, "AGENT_COMPLETE",
                    f"Completed {state.value}",
                )
                return True

            except Exception as e:
                wait = 2 ** attempt
                logger.warning(
                    f"Agent {state.value} failed (attempt {attempt + 1}): {e}"
                )
                ctx.errors.append({
                    "agent": state.value,
                    "attempt": attempt + 1,
                    "error": str(e),
                })
                if attempt < self.MAX_RETRIES - 1:
                    await asyncio.sleep(wait)

        return False

    def _log_audit(
        self, ctx: PipelineContext,
        event: str, details: str
    ):
        ctx.audit_log.append({
            "timestamp": datetime.utcnow().isoformat(),
            "contract_id": ctx.contract_id,
            "event": event,
            "state": ctx.state.value,
            "details": details,
        })

Key Design Principle: Fail-Safe Escalation

When the risk score exceeds 0.8, the orchestrator halts the pipeline and routes to NEEDS_REVIEW state. This ensures no high-risk contract completes the automated pipeline without senior legal oversight. The state machine prevents skipping steps or re-entering completed states.

7 Contract Lifecycle Management

Enterprise contracts are living documents. The system tracks the full lifecycle from draft through execution to expiry, including version control, approval workflows, amendment tracking, and complete audit trails.

Lifecycle States

┌────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ DRAFT │───▶│ IN_REVIEW│───▶│ APPROVED │───▶│ EXECUTED │───▶│ EXPIRED │ └────────┘ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │ │ │ │ │ │ ▼ │ ▼ │ ┌──────────┐ │ ┌──────────┐ │ │ RENEWED │ │ │ REJECTED │ │ └──────────┘ │ └──────────┘ │ │ ▼ │ ┌──────────┐ └──────────────────────▶│ AMENDED │──▶ (re-enters IN_REVIEW) └──────────┘

Version Control & Amendment Tracking

Approval Workflow Engine

Contract ValueRisk ScoreApproval ChainSLA
< $100K< 0.3 (Low)Legal Analyst24 hours
$100K - $1M0.3 - 0.6 (Medium)Senior Counsel → Legal Director48 hours
$1M - $10M0.6 - 0.8 (High)Legal Director → VP Legal → CFO5 business days
> $10M> 0.8 (Critical)VP Legal → GC → CEO + Board10 business days

Audit Trail Requirements

Every action on a contract is logged immutably: who viewed it, who approved it, what the AI analysis said, what changed between versions, and when. This is non-negotiable for SOX compliance and is the reason we chose PostgreSQL (ACID transactions) over a document store — audit records must never be lost or inconsistent.

8 Real-time Alerts & Notifications

The notification service monitors active contracts and fires alerts for upcoming expirations, obligation deadlines, risk threshold breaches, and compliance failures. It uses a combination of cron-based scanners and Kafka event-driven triggers.

Alert Types

Alert TypeTriggerChannelPriority
Contract Expiry90, 60, 30, 7 days before expiryEmail + Slack + DashboardHigh
Obligation Due14, 7, 3, 1 day(s) before deadlineEmail + Assignee DashboardMedium-High
Risk Threshold BreachRisk score exceeds configured thresholdEmail to Legal Lead + PagerDutyCritical
Compliance FailureRule engine or LLM detects violationEmail + Slack + Compliance DashboardCritical
Amendment FiledCounterparty submits contract amendmentEmail + Dashboard notificationMedium
Approval PendingContract awaiting approver action > SLAEmail + Slack reminderMedium

Event-Driven Alert Architecture

Python services/notifications/alert_service.py
import asyncio
from datetime import datetime, timedelta
from typing import List, Dict
from dataclasses import dataclass
from enum import Enum
import aiohttp


class AlertPriority(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


@dataclass
class Alert:
    alert_type: str
    contract_id: str
    priority: AlertPriority
    message: str
    recipients: List[str]
    metadata: Dict


class ContractAlertService:
    """Monitors contracts and fires alerts via multiple channels."""

    EXPIRY_WINDOWS = [90, 60, 30, 7]  # days before expiry

    def __init__(self, db, kafka_producer, slack_client):
        self.db = db
        self.kafka = kafka_producer
        self.slack = slack_client

    async def scan_expiring_contracts(self):
        """Cron job: scan for contracts approaching expiry."""

        for window in self.EXPIRY_WINDOWS:
            target_date = datetime.utcnow() + timedelta(days=window)

            contracts = await self.db.fetch("""
                SELECT id, title, expiry_date, owner_email,
                       contract_value
                FROM contracts
                WHERE expiry_date::date = $1::date
                  AND status = 'executed'
                  AND auto_renew = FALSE
            """, target_date)

            for contract in contracts:
                alert = Alert(
                    alert_type="CONTRACT_EXPIRY",
                    contract_id=str(contract["id"]),
                    priority=(
                        AlertPriority.CRITICAL if window <= 7
                        else AlertPriority.HIGH
                    ),
                    message=(
                        f"Contract '{contract['title']}' expires "
                        f"in {window} days ({contract['expiry_date']})"
                    ),
                    recipients=[contract["owner_email"]],
                    metadata={
                        "days_until_expiry": window,
                        "contract_value": str(
                            contract["contract_value"]
                        ),
                    },
                )
                await self._dispatch_alert(alert)

    async def _dispatch_alert(self, alert: Alert):
        """Route alert to appropriate channels based on priority."""

        # Always publish to Kafka for audit trail
        await self.kafka.send(
            "contract-alerts",
            key=alert.contract_id,
            value=alert.__dict__,
        )

        # Email for all priorities
        await self._send_email(alert)

        # Slack for high and critical
        if alert.priority in (
            AlertPriority.HIGH, AlertPriority.CRITICAL
        ):
            await self._send_slack(alert)

        # PagerDuty for critical only
        if alert.priority == AlertPriority.CRITICAL:
            await self._trigger_pagerduty(alert)

9 Database Schema

The PostgreSQL schema is designed for referential integrity, audit compliance, and efficient vector search. All tables include created_at/updated_at timestamps with trigger-based auto-update. Soft deletes are used throughout — no contract data is ever physically removed.

SQL migrations/001_contract_intelligence.sql
-- Enable pgvector extension
CREATE EXTENSION IF NOT EXISTS vector;

-- ═══════════════════════════════════════════════
-- Core: Contracts
-- ═══════════════════════════════════════════════
CREATE TABLE contracts (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    title           VARCHAR(500) NOT NULL,
    contract_type   VARCHAR(50) NOT NULL,  -- NDA, MSA, SOW, etc.
    status          VARCHAR(20) NOT NULL DEFAULT 'draft',
    version         INTEGER NOT NULL DEFAULT 1,
    parent_id       UUID REFERENCES contracts(id),  -- for amendments
    parties         JSONB NOT NULL,  -- [{name, role, contact}]
    effective_date  DATE,
    expiry_date     DATE,
    contract_value  DECIMAL(15,2),
    currency        VARCHAR(3) DEFAULT 'USD',
    jurisdiction    VARCHAR(100),
    governing_law   VARCHAR(100),
    auto_renew      BOOLEAN DEFAULT FALSE,
    renewal_term    INTERVAL,
    raw_text        TEXT,
    file_url        VARCHAR(1000),
    owner_id        UUID NOT NULL,
    owner_email     VARCHAR(255),
    created_at      TIMESTAMPTZ DEFAULT now(),
    updated_at      TIMESTAMPTZ DEFAULT now(),
    deleted_at      TIMESTAMPTZ  -- soft delete
);

CREATE INDEX idx_contracts_status ON contracts(status);
CREATE INDEX idx_contracts_expiry ON contracts(expiry_date)
    WHERE deleted_at IS NULL;
CREATE INDEX idx_contracts_type ON contracts(contract_type);

-- ═══════════════════════════════════════════════
-- Clauses (extracted by LLM)
-- ═══════════════════════════════════════════════
CREATE TABLE clauses (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    contract_id     UUID NOT NULL REFERENCES contracts(id),
    clause_type     VARCHAR(50) NOT NULL,
    section_ref     VARCHAR(20),
    text            TEXT NOT NULL,
    page_number     INTEGER,
    risk_indicators JSONB DEFAULT '[]',
    key_dates       JSONB DEFAULT '[]',
    monetary_values JSONB DEFAULT '[]',
    created_at      TIMESTAMPTZ DEFAULT now()
);

CREATE INDEX idx_clauses_contract ON clauses(contract_id);
CREATE INDEX idx_clauses_type ON clauses(clause_type);

-- ═══════════════════════════════════════════════
-- Clause Embeddings (pgvector)
-- ═══════════════════════════════════════════════
CREATE TABLE clause_embeddings (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    clause_id       UUID NOT NULL REFERENCES clauses(id),
    contract_id     UUID NOT NULL REFERENCES contracts(id),
    clause_type     VARCHAR(50),
    text            TEXT NOT NULL,
    embedding       vector(1536) NOT NULL,
    created_at      TIMESTAMPTZ DEFAULT now()
);

-- HNSW index for fast approximate nearest neighbor
CREATE INDEX idx_clause_embeddings_hnsw
    ON clause_embeddings
    USING hnsw (embedding vector_cosine_ops)
    WITH (m = 16, ef_construction = 200);

-- GIN index for BM25 full-text search (hybrid retrieval)
CREATE INDEX idx_clause_text_search
    ON clause_embeddings
    USING gin (to_tsvector('english', text));

-- ═══════════════════════════════════════════════
-- Obligations
-- ═══════════════════════════════════════════════
CREATE TABLE obligations (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    contract_id     UUID NOT NULL REFERENCES contracts(id),
    clause_id       UUID REFERENCES clauses(id),
    description     TEXT NOT NULL,
    obligated_party VARCHAR(200),
    due_date        DATE,
    recurring       BOOLEAN DEFAULT FALSE,
    recurrence_rule VARCHAR(100),  -- RRULE format
    status          VARCHAR(20) DEFAULT 'pending',
    assigned_to     UUID,
    completed_at    TIMESTAMPTZ,
    created_at      TIMESTAMPTZ DEFAULT now()
);

CREATE INDEX idx_obligations_due ON obligations(due_date)
    WHERE status = 'pending';

-- ═══════════════════════════════════════════════
-- Compliance Checks
-- ═══════════════════════════════════════════════
CREATE TABLE compliance_checks (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    contract_id     UUID NOT NULL REFERENCES contracts(id),
    framework       VARCHAR(20) NOT NULL,  -- GDPR, SOX, HIPAA
    rule_id         VARCHAR(50) NOT NULL,
    passed          BOOLEAN NOT NULL,
    confidence      DECIMAL(3,2),
    source          VARCHAR(20),  -- rule_engine | llm | hybrid
    details         TEXT,
    clause_ref      VARCHAR(20),
    reviewed_by     UUID,
    reviewed_at     TIMESTAMPTZ,
    created_at      TIMESTAMPTZ DEFAULT now()
);

-- ═══════════════════════════════════════════════
-- Risk Scores
-- ═══════════════════════════════════════════════
CREATE TABLE risk_scores (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    contract_id     UUID NOT NULL REFERENCES contracts(id),
    overall_score   DECIMAL(4,3) NOT NULL,
    category_scores JSONB NOT NULL,
    high_risk_clauses JSONB,
    missing_protections JSONB,
    recommendations JSONB,
    assessed_by     VARCHAR(20) DEFAULT 'system',
    created_at      TIMESTAMPTZ DEFAULT now()
);

CREATE INDEX idx_risk_scores_contract ON risk_scores(contract_id);
CREATE INDEX idx_risk_scores_high ON risk_scores(overall_score)
    WHERE overall_score > 0.7;

-- ═══════════════════════════════════════════════
-- Audit Logs (immutable, append-only)
-- ═══════════════════════════════════════════════
CREATE TABLE audit_logs (
    id              BIGSERIAL PRIMARY KEY,
    contract_id     UUID REFERENCES contracts(id),
    actor_id        UUID,
    actor_type      VARCHAR(20),  -- user | system | agent
    action          VARCHAR(50) NOT NULL,
    details         JSONB,
    ip_address      INET,
    created_at      TIMESTAMPTZ DEFAULT now()
);

-- Partition audit logs by month for performance
CREATE INDEX idx_audit_contract ON audit_logs(contract_id);
CREATE INDEX idx_audit_created ON audit_logs(created_at);

10 Performance Metrics

Key performance indicators measured in production across an enterprise platform processing contracts for a Fortune 500 tech company.

95%
Clause Extraction Accuracy
80%
Review Time Reduction
99.9%
System Uptime
10K
Contracts Processed / Day
92%
Compliance Detection Rate
<3s
Avg. Analysis Latency
2M+
Clause Embeddings Indexed
45ms
Vector Search P95 Latency

How We Measure Extraction Accuracy

A rotating panel of legal analysts manually reviews a random 5% sample of AI-extracted clauses weekly. Each extraction is scored on: (1) correct clause type classification, (2) complete text extraction (no truncation), (3) accurate obligation identification, and (4) correct date/value extraction. The 95% figure is the weighted average across all four dimensions.

11 Security & Access Control

Contracts contain the most sensitive information in any enterprise — pricing, IP terms, liability caps. The system implements defense in depth with multiple overlapping security layers.

Role-Based Access Control (RBAC)

RoleViewCreateApproveAdminAudit Access
ViewerOwn dept onlyNoNoNoNo
AnalystOwn deptYesNoNoNo
Senior CounselAllYesUp to $1MNoYes
Legal DirectorAllYesUp to $10MNoYes
VP Legal / GCAllYesUnlimitedYesYes
System AdminMetadata onlyNoNoYesYes

Security Layers

PII Stripping Before LLM Calls: Before sending any contract text to Claude or GPT, the system runs a PII detection pass that replaces names, addresses, SSNs, and account numbers with placeholder tokens (e.g., [PARTY_A], [ADDRESS_1]). The mapping is stored locally and re-applied to the LLM response. This ensures no PII leaves the enterprise boundary.

12 Design Decisions & Trade-offs

DecisionChosenAlternativeRationale
Vector Database pgvector (PostgreSQL) Pinecone, Weaviate Same transactional boundary as contract data; hybrid SQL+vector queries; no additional infrastructure
LLM Provider Claude (primary) + GPT-4 (fallback) Single provider Claude excels at structured extraction; GPT-4 fallback ensures availability; provider diversity reduces single-point-of-failure risk
Agent Orchestration Custom state machine LangGraph, CrewAI Full control over state transitions, retry logic, and audit logging; frameworks add abstraction without proportional value at this complexity level
Compliance Engine Hybrid (Rules + LLM) Pure LLM / Pure Rules Rules guarantee coverage of known requirements (auditable); LLM catches nuanced/novel violations rules miss; combined detection rate 92% vs. 67% rules-only
Message Queue Kafka RabbitMQ, SQS Event replay for audit; partitioned by contract_id for ordering guarantees; proven at enterprise scale
Chunking Strategy Structure-aware (clause boundaries) Fixed-window (500 tokens) Legal clauses must be complete units for accurate analysis; splitting mid-clause destroys legal context and reduces extraction accuracy by 23%
Embedding Model text-embedding-3-small (1536d) text-embedding-3-large (3072d) 3072d showed only 2% retrieval improvement on legal text but doubled storage and index build time; 1536d is the better cost/performance trade-off
Contract Storage PostgreSQL + S3 for files MongoDB ACID transactions are non-negotiable for audit trails; relational schema enforces data integrity across contracts, clauses, and compliance records

13 Interview Cheat Sheet

Staff-level system design interview preparation — 15+ questions with battle-tested answers drawing from real enterprise experience building AI-powered document processing systems.

Architecture & System Design

Q: How would you design an AI-powered contract analysis system from scratch?

"I'd break it into four layers:

(1) Ingestion Layer: Format-aware parsers (PDF, DOCX, OCR for scans) that produce clean text. Structure-aware chunking by clause boundaries — not arbitrary windows — because legal clauses are atomic units of meaning.
(2) Intelligence Layer: Multi-agent pipeline with specialized agents for extraction, risk scoring, and compliance checking. State machine orchestrator manages the flow with retry logic and audit logging.
(3) RAG Layer: Vector embeddings (pgvector) of all clause chunks enable precedent matching — comparing new clauses against the entire corpus to detect deviations from standard language.
(4) Application Layer: Lifecycle management, approval workflows, real-time alerts for expiring contracts and obligation deadlines.

The key architectural insight is the hybrid compliance engine: deterministic rules handle known regulatory requirements (auditable, version-controlled), while LLMs catch nuanced violations that pattern matching misses."

Q: Why a multi-agent architecture instead of a single LLM call?

"Three reasons:

(1) Separation of concerns: Each agent has a focused prompt and domain expertise. The extraction agent doesn't need to think about compliance, and the compliance agent doesn't need to extract clauses. This makes each agent's task simpler and more reliable.
(2) Independent scaling and failure isolation: If the compliance agent fails, extraction results are preserved. Each agent can be retried independently without re-running the entire pipeline.
(3) Auditability: In legal tech, you need to explain every decision. With separate agents, you have a clear audit trail: 'The extraction agent identified clause 4.2 as an indemnification clause, the risk agent scored it 0.73, and the compliance agent flagged it for missing a liability cap.' A single monolithic LLM call makes this forensic analysis impossible."

Q: How do you handle LLM hallucinations in a legal context?

"This is the number one concern in legal AI. Our approach:

(1) Grounded generation: Every LLM response must reference specific clause text from the document. We use RAG to provide the exact contract text in the prompt — the LLM extracts and interprets, it doesn't generate from scratch.
(2) Structured output: We force JSON schema responses with Pydantic validation. If the LLM returns a clause_type that doesn't match our enum, it's rejected and retried.
(3) Cross-validation: The compliance engine uses both deterministic rules AND LLM checks. If they disagree, the contract is flagged for human review.
(4) Human-in-the-loop: No contract is auto-approved. The AI provides analysis and recommendations; a human legal reviewer makes the final decision. The system reduces review time by 80%, but it never removes the human entirely."

Q: How do you scale the system to handle 10,000+ contracts per day?

"Three scaling strategies:

(1) Async processing with Kafka: Contracts enter a Kafka topic and are consumed by worker pools. Each contract is independent, so we can scale consumers horizontally. Kafka's partitioning by contract_id ensures ordering per contract.
(2) LLM call batching and caching: Redis caches LLM responses for identical clause patterns (same text = same extraction). For similar contracts (e.g., standard NDAs from the same template), we use the cached extraction as a baseline and only re-analyze clauses that differ.
(3) Tiered processing: Contracts are classified by complexity at ingestion. Standard templates (60% of volume) go through a fast path with cached models. Complex or non-standard contracts get full multi-agent analysis. This lets us handle 10K/day with reasonable LLM costs."

RAG & Vector Search

Q: Why pgvector instead of a purpose-built vector database like Pinecone?

"At our scale (2M embeddings), pgvector with HNSW indexing performs comparably to dedicated vector DBs. But the real reason is operational:

(1) Transactional consistency: When we insert a new contract, its clauses and embeddings are in the same transaction. With Pinecone, you'd need distributed transactions or eventual consistency — neither is acceptable for legal data.
(2) Hybrid queries: Our most common query is 'find similar indemnification clauses in MSA contracts from California jurisdiction.' That's vector similarity + two SQL filters. In pgvector, it's one query. With Pinecone, you'd need metadata filtering with limited SQL expressiveness.
(3) Operational simplicity: One database to back up, monitor, and secure. Legal teams already have PostgreSQL compliance certifications — adding a new data store would require a new security audit."

Q: How does structure-aware chunking differ from naive chunking, and why does it matter?

"Naive chunking splits text every N tokens regardless of content. Structure-aware chunking splits at clause boundaries.

Consider this: 'The Vendor shall indemnify the Company against all claims arising from...' If you split at 500 tokens, you might cut this sentence in half. Now the embedding represents half a thought, and when you search for 'indemnification clauses,' this truncated chunk scores poorly.

Our structure-aware chunker uses regex patterns to detect section headers (e.g., '3.2.1 Indemnification') and splits at those boundaries. Small chunks (under 64 tokens) are merged with their neighbors. The result: each chunk is a complete legal provision. This improved our retrieval precision by 23% compared to fixed-window chunking."

Compliance & Security

Q: How do you ensure the system itself is compliant (SOX, GDPR)?

"Four pillars:

(1) Immutable audit trail: Every action (view, edit, approve, AI analysis) is logged in an append-only table. No DELETE permissions on audit_logs — not even for DBAs.
(2) PII isolation: Before any contract text reaches an LLM API, a PII detector strips names, addresses, and account numbers. Placeholders are used during analysis and re-mapped on return.
(3) Data residency: EU contracts stay in EU database instances. Region-aware routing ensures GDPR data sovereignty.
(4) Access controls: RBAC with principle of least privilege. Legal analysts see their department's contracts; cross-department access requires Director-level approval."

Q: What happens when the LLM provider is down?

"We designed for this explicitly:

(1) Multi-provider fallback: Claude is primary, GPT-4 is secondary. If Claude returns 5xx errors for 3 consecutive requests, the circuit breaker switches to GPT-4 automatically.
(2) Graceful degradation: The rule-engine portion of compliance checking works without any LLM. During an outage, users get rule-based results (67% detection rate) with a banner indicating 'enhanced AI analysis temporarily unavailable.'
(3) Queue buffering: Kafka absorbs incoming contracts during outages. When the LLM comes back, the backlog is processed automatically. No contracts are lost."

Data & Performance

Q: How do you measure and improve extraction accuracy over time?

"We built a continuous improvement loop:

(1) Human review sampling: 5% of AI extractions are randomly selected for human review weekly. Reviewers score each extraction on clause type accuracy, text completeness, obligation identification, and date/value extraction.
(2) Feedback loop: When a human corrects an AI extraction, the correction is stored as a training example. We periodically use these examples to refine our extraction prompts via few-shot learning.
(3) A/B testing: When we update an extraction prompt, we run both old and new versions in parallel for a week. If the new version scores higher on human review, we promote it.
(4) Drift detection: If extraction accuracy drops below 90% for any clause type in a given week, an alert fires and the team investigates — usually it's a new contract template we haven't seen before."

Q: How do you handle contract templates vs. bespoke contracts?

"Template detection is our biggest performance optimization. About 60% of incoming contracts match one of ~50 standard templates (company NDA, standard MSA, etc.).

At ingestion, we compute a document fingerprint (MinHash of clause embeddings). If it matches a known template with >95% similarity, we use cached extraction results as a baseline and only run the LLM on clauses that differ. This reduces LLM costs by 60% and processing time from 45 seconds to 8 seconds for template contracts."

Q: Walk me through a contract's journey from upload to approval.

"End-to-end in production:

T+0s: Counterparty emails a signed PDF. The intake system uploads it to S3 and publishes a CONTRACT_RECEIVED event to Kafka.
T+2s: Ingestion worker picks it up. PyMuPDF extracts text (or Tesseract for scans). Structure-aware chunker produces 40-80 clause chunks. Embeddings generated and stored in pgvector.
T+15s: Orchestrator starts the agent pipeline: Extraction Agent identifies 12 key clauses, parties, and defined terms.
T+25s: Risk Agent scores overall risk at 0.45 (medium). Flags one indemnification clause as non-standard (similarity to approved clauses is 0.72).
T+35s: Compliance Agent runs GDPR + SOX checks. 14 of 15 rules pass; one GDPR data retention clause needs clarification.
T+40s: Summary Agent generates executive brief with key terms, risk highlights, and recommended actions.
T+40s: Dashboard notification sent to assigned legal analyst. They see the AI summary, risk highlights, and can click through to any flagged clause.
T+hours: Analyst reviews, negotiates the data retention clause with counterparty, and approves. Contract moves to EXECUTED."

Q: How would you explain this system to a non-technical executive?

"Imagine you hire four specialist paralegals who never sleep:

The Reader takes any contract — PDF, scanned document, Word file — and breaks it into individual clauses, like a paralegal highlighting sections with colored tabs.
The Analyst reads each clause and checks it against every contract your company has ever signed. If something looks unusual — say, an indemnification clause that's different from your standard — they flag it for your lawyers.
The Compliance Officer checks every contract against your regulatory requirements — GDPR, SOX, whatever applies — and highlights anything that might put you at risk.
The Organizer writes an executive summary, sets up reminders for expiring contracts and upcoming deadlines, and makes sure nothing falls through the cracks.

The result: your legal team spends 80% less time on routine contract review and can focus on negotiation and strategy — the work that actually requires human judgment."

Q: What's the most challenging technical problem you solved?

"Handling scanned, low-quality PDFs at scale. About 15% of incoming contracts are scanned images — some from fax machines with poor resolution.

The challenge: Tesseract OCR accuracy drops to 70% on low-quality scans, and errors propagate through the entire pipeline — garbage in, garbage out.

Our solution: a three-stage approach. (1) Pre-processing with OpenCV to enhance contrast, deskew, and denoise. (2) Multi-engine OCR — run both Tesseract and a cloud OCR service, take the higher-confidence result. (3) LLM-based error correction — feed the OCR output to Claude with the instruction 'fix obvious OCR errors in this legal text while preserving exact legal terminology.' This brought our effective accuracy on scanned documents from 70% to 93%."

Q: How do you handle multi-language contracts?

"Enterprise platform processes contracts in 12 languages. Our approach:

(1) Language detection at ingestion using fastText — determines which OCR model and LLM prompts to use.
(2) Multilingual embeddings — we use OpenAI's embedding model which handles multilingual text natively. A German indemnification clause and its English equivalent have high cosine similarity.
(3) Localized compliance rules — the rule engine maintains separate rule sets per jurisdiction (EU GDPR rules vs. California CCPA rules vs. Brazil LGPD rules).
(4) Translation fallback — for languages where extraction accuracy drops below 85%, we first translate to English, run the extraction pipeline, then map results back to the original text positions."