Enterprise Deal Lifecycle & Campaign Management Platform

A large-scale enterprise platform managing the full lifecycle of advertising deals and campaigns at a Fortune 500 tech company. The system handles deal creation, campaign validation, multi-level approval workflows, real-time engagement scoring, and CDC-based data synchronization — processing 100K+ records daily with 99.95% uptime across distributed microservices.

Tech stack: Python · Kafka · Redis · PostgreSQL · asyncio · Docker · Kubernetes · Grafana

Table of Contents

  1. Architecture Overview
  2. Campaign Validation Engine
  3. Kafka CDC Pipeline
  4. Approval Workflow Engine
  5. Engagement Scoring System
  6. Caching Strategy
  7. Reliability & Observability
  8. Database Schema
  9. Performance Metrics
  10. Design Decisions & Trade-offs
  11. Scaling Strategy
  12. Interview Cheat Sheet

1 Architecture Overview

The platform is built as a distributed microservices architecture serving an enterprise ad tech platform at scale. Each service owns its domain boundary, communicates via Kafka events, and maintains its own data store. The system processes the complete deal lifecycle: from initial deal creation through campaign validation, multi-level approvals, and real-time engagement tracking.

+==============================================================================+
|                     ENTERPRISE DEAL LIFECYCLE PLATFORM                       |
+==============================================================================+

                  API Gateway (Rate Limiting, Auth, Routing)
+--------------------------------------------------------------------------+
| /api/v1/deals    /api/v1/campaigns    /api/v1/approvals    /api/v1/score  |
+--------+----------------+------------------+------------------+----------+
         |                |                  |                  |
+--------v-------+ +------v--------+ +------v-------+ +--------v-------+
| Deal Service   | | Campaign      | | Approval     | | Engagement     |
|                | | Service       | | Engine       | | Scoring        |
| - CRUD ops     | |               | |              | | Service        |
| - Lifecycle    | | - Validation  | | - State      | |                |
| - Versioning   | | - asyncio     | |   Machine    | | - ML feats     |
| - Audit log    | | - Parallel    | | - Multi-lvl  | | - Real-time    |
|                | |   checks      | | - SLA track  | | - Signals      |
+-------+--------+ +------+--------+ +------+-------+ +--------+-------+
        |                 |                 |                  |
+-------v-----------------v-----------------v------------------v----------+
|                             Kafka Event Bus                             |
| deal.created | campaign.validated | approval.completed | score.updated  |
| deal.updated | campaign.rejected  | approval.escalated | score.alert    |
+-------+-----------------+------------------+------------------+--------+
        |                 |                  |                  |
+-------v-------+ +-------v-------+ +-------v-------+ +--------v-------+
| CDC Pipeline  | | DLQ Handler   | | Notification  | | Analytics      |
| (Debezium)    | | (Retry +      | | Service       | | Pipeline       |
| 100K rec/day  | |  Circuit      | | (Email/Slack  | | (Metrics +     |
|               | |  Breaker)     | |  /Webhook)    | |  Dashboards)   |
+-------+-------+ +---------------+ +---------------+ +----------------+
        |
+-------v---------------------------------+
| PostgreSQL              Redis Cache     |
| - deals                 - validation    |
| - campaigns               results       |
| - approvals             - engagement    |
| - audit_log               scores        |
| - engagement_metrics    - session cache |
+-----------------------------------------+

Core Services

1. Deal Service
   Manages the full deal lifecycle — creation, amendments, versioning, and audit logging. Emits CDC events for every state transition.

2. Campaign Validation Engine
   Parallel async validation of campaigns against business rules, budget constraints, and targeting criteria. Achieves a 94% latency reduction via asyncio + Redis caching.

3. Kafka CDC Pipeline
   Change Data Capture processing 100K records/day with Debezium connectors, dead letter queues, circuit breakers, and exponential backoff retry logic.

4. Approval Workflow Engine
   Multi-level approval state machine with SLA tracking, auto-escalation, and parallel approval paths. Reduced cycle time by 31.6%.

5. Engagement Scoring Service
   Real-time scoring engine combining ML features, historical signals, and behavioral data to rank and prioritize deals by engagement likelihood.

2 Campaign Validation Engine

The validation engine is the performance-critical path of the platform. Every campaign must pass 12+ validation rules before submission. The original synchronous implementation took ~8 seconds per campaign. By redesigning with asyncio parallelism and Redis caching, we achieved a 94% latency reduction (8s → 450ms).

Key Design Principles

Run all validators concurrently with asyncio.gather, fail fast on the critical rules (budget, compliance), and cache deterministic validation results in Redis with a short TTL.

Python services/campaign/validation_engine.py
import asyncio
import hashlib
import json
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Optional, Dict
import redis.asyncio as aioredis


class ValidationSeverity(Enum):
    CRITICAL = "critical"     # Blocks submission
    WARNING = "warning"       # Requires acknowledgment
    ADVISORY = "advisory"     # Informational only


@dataclass
class ValidationResult:
    rule_name: str
    passed: bool
    severity: ValidationSeverity
    message: str = ""
    latency_ms: float = 0.0
    cached: bool = False


@dataclass
class CampaignValidationResponse:
    campaign_id: str
    is_valid: bool
    results: List[ValidationResult] = field(default_factory=list)
    total_latency_ms: float = 0.0
    cache_hit_rate: float = 0.0


class CampaignValidationEngine:
    """
    High-performance campaign validation using asyncio parallelism.
    Reduces validation latency from 8s to 450ms (94% reduction).
    """

    def __init__(self, redis_client: aioredis.Redis):
        self.redis = redis_client
        self.cache_ttl = 300  # 5 min TTL for validation cache
        self.validators = [
            self._validate_budget_constraints,
            self._validate_targeting_criteria,
            self._validate_creative_specs,
            self._validate_compliance_rules,
            self._validate_scheduling_conflicts,
            self._validate_audience_overlap,
            self._validate_frequency_caps,
            self._validate_geo_restrictions,
            self._validate_bid_strategy,
            self._validate_pacing_rules,
            self._validate_attribution_windows,
            self._validate_brand_safety,
        ]

    async def validate_campaign(self, campaign: Dict) -> CampaignValidationResponse:
        """Run all validators in parallel using asyncio.gather."""
        start = time.monotonic()

        # Phase 1: Critical validators (fail-fast)
        critical_tasks = [
            self._run_with_cache(v, campaign)
            for v in self.validators
            if v.__name__ in (
                "_validate_budget_constraints",
                "_validate_compliance_rules",
            )
        ]
        critical_results = await asyncio.gather(*critical_tasks)

        # Short-circuit if critical validation fails
        if any(
            not r.passed and r.severity == ValidationSeverity.CRITICAL
            for r in critical_results
        ):
            return CampaignValidationResponse(
                campaign_id=campaign["id"],
                is_valid=False,
                results=list(critical_results),
                total_latency_ms=(time.monotonic() - start) * 1000,
            )

        # Phase 2: All remaining validators in parallel
        remaining_tasks = [
            self._run_with_cache(v, campaign)
            for v in self.validators
            if v.__name__ not in (
                "_validate_budget_constraints",
                "_validate_compliance_rules",
            )
        ]
        remaining_results = await asyncio.gather(*remaining_tasks)

        all_results = list(critical_results) + list(remaining_results)
        cached_count = sum(1 for r in all_results if r.cached)
        elapsed = (time.monotonic() - start) * 1000

        return CampaignValidationResponse(
            campaign_id=campaign["id"],
            is_valid=all(
                r.passed for r in all_results
                if r.severity == ValidationSeverity.CRITICAL
            ),
            results=all_results,
            total_latency_ms=elapsed,
            cache_hit_rate=cached_count / len(all_results) if all_results else 0,
        )

    async def _run_with_cache(self, validator, campaign: Dict) -> ValidationResult:
        """Execute validator with Redis cache lookup."""
        cache_key = self._cache_key(validator.__name__, campaign)

        # Try cache first
        cached = await self.redis.get(cache_key)
        if cached:
            data = json.loads(cached)
            # Severity round-trips as its string value; restore the enum
            # so comparisons against ValidationSeverity still work.
            data["severity"] = ValidationSeverity(data["severity"])
            result = ValidationResult(**data)
            result.cached = True
            return result

        # Execute validator
        start = time.monotonic()
        result = await validator(campaign)
        result.latency_ms = (time.monotonic() - start) * 1000

        # Cache the result
        await self.redis.setex(
            cache_key,
            self.cache_ttl,
            json.dumps({
                "rule_name": result.rule_name,
                "passed": result.passed,
                "severity": result.severity.value,
                "message": result.message,
                "latency_ms": result.latency_ms,
            })
        )
        return result

    def _cache_key(self, rule: str, campaign: Dict) -> str:
        """Generate deterministic cache key from rule + campaign hash."""
        payload = json.dumps(
            {k: campaign[k] for k in sorted(campaign.keys())},
            sort_keys=True, default=str,
        )
        digest = hashlib.sha256(payload.encode()).hexdigest()[:16]
        return f"validation:{rule}:{digest}"

    # --- Individual Validators ---

    async def _validate_budget_constraints(self, campaign: Dict) -> ValidationResult:
        """Validate budget against deal limits and remaining allocation."""
        total_budget = campaign.get("budget", 0)
        deal_limit = campaign.get("deal_budget_limit", float("inf"))
        allocated = campaign.get("allocated_spend", 0)

        remaining = deal_limit - allocated
        passed = total_budget <= remaining

        return ValidationResult(
            rule_name="budget_constraints",
            passed=passed,
            severity=ValidationSeverity.CRITICAL,
            message=f"Budget ${total_budget:,.2f} vs remaining ${remaining:,.2f}"
                    if not passed else "Budget within limits",
        )

    async def _validate_targeting_criteria(self, campaign: Dict) -> ValidationResult:
        """Validate audience targeting parameters."""
        targeting = campaign.get("targeting", {})
        has_audience = bool(targeting.get("audience_segments"))
        has_geo = bool(targeting.get("geo_targets"))
        return ValidationResult(
            rule_name="targeting_criteria",
            passed=has_audience and has_geo,
            severity=ValidationSeverity.CRITICAL,
            message="Targeting criteria validated" if (has_audience and has_geo)
                    else "Missing required targeting: audience or geo",
        )

    # ... additional validators follow the same pattern

Performance Impact

Sequential validation: 12 validators x ~650ms avg = ~8 seconds. With asyncio.gather parallelism + Redis caching: ~450ms (94% reduction). Cache hit rate stabilizes at 85% during peak hours due to repeated validation of similar campaign configurations.
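
A minimal usage sketch follows; the campaign fields are the ones the two validators shown above read, and the Redis URL is an assumption.

Python (illustrative sketch)
import asyncio
import redis.asyncio as aioredis

async def main():
    # Assumes the full 12-validator CampaignValidationEngine from above.
    engine = CampaignValidationEngine(aioredis.from_url("redis://localhost:6379"))
    campaign = {
        "id": "cmp-001",
        "budget": 50_000,
        "deal_budget_limit": 200_000,
        "allocated_spend": 120_000,
        "targeting": {"audience_segments": ["auto_intenders"], "geo_targets": ["US"]},
    }
    resp = await engine.validate_campaign(campaign)
    print(resp.is_valid, f"{resp.total_latency_ms:.0f}ms",
          f"cache hits: {resp.cache_hit_rate:.0%}")

asyncio.run(main())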

3 Kafka CDC Pipeline

The platform uses Change Data Capture (CDC) via Debezium to propagate deal and campaign state changes across services. The pipeline processes 100K+ records per day with effectively exactly-once processing (at-least-once delivery deduplicated by idempotent consumers; see Q16 in the cheat sheet), dead letter queues for poison messages, and circuit breakers to prevent cascading failures.

CDC Architecture

Python services/pipeline/kafka_cdc_consumer.py
import asyncio
import json
import logging
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable, Dict, Optional
from confluent_kafka import Consumer, Producer, KafkaError

logger = logging.getLogger(__name__)


class CircuitState(Enum):
    CLOSED = "closed"       # Normal operation
    OPEN = "open"           # Rejecting requests
    HALF_OPEN = "half_open" # Testing recovery


@dataclass
class CircuitBreaker:
    """Circuit breaker to prevent cascading failures in CDC pipeline."""
    failure_threshold: int = 5
    recovery_timeout: float = 30.0
    half_open_max_calls: int = 3
    state: CircuitState = CircuitState.CLOSED
    failure_count: int = 0
    last_failure_time: float = 0.0
    half_open_calls: int = 0

    def can_execute(self) -> bool:
        if self.state == CircuitState.CLOSED:
            return True
        if self.state == CircuitState.OPEN:
            if time.monotonic() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                self.half_open_calls = 0
                return True
            return False
        # HALF_OPEN: allow limited calls
        return self.half_open_calls < self.half_open_max_calls

    def record_success(self):
        if self.state == CircuitState.HALF_OPEN:
            self.half_open_calls += 1
            if self.half_open_calls >= self.half_open_max_calls:
                self.state = CircuitState.CLOSED
                self.failure_count = 0

    def record_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.monotonic()
        if self.state == CircuitState.HALF_OPEN:
            # A failed recovery probe reopens the circuit immediately.
            self.state = CircuitState.OPEN
            return
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            logger.warning(f"Circuit OPEN after {self.failure_count} failures")


class CDCPipelineConsumer:
    """
    Kafka CDC consumer with DLQ, circuit breakers, and exponential backoff.
    Processes 100K+ records/day from Debezium CDC connectors.
    """

    def __init__(self, config: Dict):
        self.consumer = Consumer({
            "bootstrap.servers": config["kafka_brokers"],
            "group.id": config["consumer_group"],
            "auto.offset.reset": "earliest",
            "enable.auto.commit": False,  # Manual commit for exactly-once
            "max.poll.interval.ms": 300000,
        })
        self.dlq_producer = Producer({
            "bootstrap.servers": config["kafka_brokers"],
        })
        self.circuit_breaker = CircuitBreaker()
        self.max_retries = 3
        self.base_backoff = 1.0  # seconds
        self.handlers: Dict[str, Callable] = {}
        self.metrics = {"processed": 0, "failed": 0, "dlq": 0}

    def register_handler(self, event_type: str, handler: Callable):
        """Register a handler for a specific CDC event type."""
        self.handlers[event_type] = handler

    async def consume_loop(self, topics: list):
        """Main consumption loop with error handling."""
        self.consumer.subscribe(topics)
        logger.info(f"Subscribed to topics: {topics}")

        while True:
            # confluent_kafka's poll() blocks; run it in a thread so the
            # event loop stays responsive to other tasks.
            msg = await asyncio.get_running_loop().run_in_executor(
                None, self.consumer.poll, 1.0
            )
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                logger.error(f"Kafka error: {msg.error()}")
                continue

            await self._process_message(msg)

    async def _process_message(self, msg):
        """Process a single CDC message with retry and DLQ logic."""
        if not self.circuit_breaker.can_execute():
            logger.warning("Circuit breaker OPEN — skipping message")
            await self._send_to_dlq(msg, "circuit_breaker_open")
            self.consumer.commit(msg)  # don't redeliver the skipped message
            self.metrics["dlq"] += 1
            return

        for attempt in range(self.max_retries):
            try:
                payload = json.loads(msg.value().decode("utf-8"))
                event_type = payload.get("op", "unknown")  # c=create, u=update, d=delete
                table = payload.get("source", {}).get("table", "unknown")
                handler_key = f"{table}.{event_type}"

                handler = self.handlers.get(handler_key)
                if handler:
                    await handler(payload)

                # Commit offset on success
                self.consumer.commit(msg)
                self.circuit_breaker.record_success()
                self.metrics["processed"] += 1
                return

            except Exception as e:
                self.circuit_breaker.record_failure()
                backoff = self.base_backoff * (2 ** attempt)
                logger.warning(
                    f"Retry {attempt + 1}/{self.max_retries} "
                    f"in {backoff:.1f}s: {e}"
                )
                await asyncio.sleep(backoff)

        # All retries exhausted — send to DLQ and commit the offset so the
        # poison message is not redelivered on restart or rebalance.
        await self._send_to_dlq(msg, "max_retries_exhausted")
        self.consumer.commit(msg)
        self.metrics["dlq"] += 1
        self.metrics["failed"] += 1

    async def _send_to_dlq(self, msg, reason: str):
        """Route failed message to Dead Letter Queue with metadata."""
        dlq_payload = {
            "original_topic": msg.topic(),
            "original_partition": msg.partition(),
            "original_offset": msg.offset(),
            "failure_reason": reason,
            "timestamp": time.time(),
            "payload": msg.value().decode("utf-8"),
        }
        self.dlq_producer.produce(
            topic=f"{msg.topic()}.dlq",
            value=json.dumps(dlq_payload).encode("utf-8"),
        )
        self.dlq_producer.flush()
        logger.error(f"Message sent to DLQ: {reason}")

Why Debezium CDC over Application-Level Events?

Consistency: CDC captures changes at the database level, guaranteeing no events are lost even if the application crashes mid-transaction. Application-level events require dual-write coordination (write to DB + publish event), which introduces inconsistency windows.
Decoupling: Services don't need to know about downstream consumers — the DB is the source of truth.
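
That dual-write gap is what the transactional outbox pattern closes for critical flows (see Q6 in the cheat sheet). A minimal sketch, assuming an asyncpg-style connection and an illustrative outbox table that Debezium tails:

Python (illustrative sketch)
import json
import uuid

async def create_deal_with_outbox(conn, deal: dict) -> str:
    """Write the business row and the event row in one ACID transaction."""
    async with conn.transaction():
        deal_id = str(uuid.uuid4())
        await conn.execute(
            "INSERT INTO deals (id, deal_name, total_value) VALUES ($1, $2, $3)",
            deal_id, deal["deal_name"], deal["total_value"],
        )
        # Table/column names are illustrative; Debezium captures this
        # insert from the WAL, so no separate publish step can be lost.
        await conn.execute(
            "INSERT INTO outbox (id, aggregate_id, event_type, payload) "
            "VALUES ($1, $2, $3, $4)",
            str(uuid.uuid4()), deal_id, "deal.created", json.dumps(deal),
        )
    return deal_id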

4 Approval Workflow Engine

Deals above configurable thresholds require multi-level approvals from sales managers, finance, and legal. The engine implements a finite state machine with SLA tracking, auto-escalation, and parallel approval paths. This design reduced approval cycle times by 31.6%.

State Machine Design

DEAL APPROVAL STATE MACHINE
===========================

+-------+   submit    +---------+
| DRAFT +------------>| PENDING |
+---+---+             +----+----+
    ^                      | auto-route
    |                      v
    |               +------+-------+
    |               |   LEVEL_1    | (Sales Manager)
    |               |   REVIEW     |
    |               +------+-------+
    |           approve |      | reject
    |                   v      |
    |               +------+-------+
    |               |   LEVEL_2    | (Finance)
    |               |   REVIEW     |
    |               +------+-------+
    |           approve |      | reject
    |                   v      |
    |               +------+-------+
    |               |   LEVEL_3    | (Legal)
    |               |   REVIEW     |
    |               +------+-------+
    |           approve |      | reject
    |                   v      v
    |            +----------+  +----------+
    |            | APPROVED |  | REJECTED |
    |            +----------+  +----+-----+
    |                               |
    +-------------------------------+
           revise & resubmit

ESCALATION: If SLA breached at any level -> auto-escalate to next approver
PARALLEL:   Finance + Legal can review simultaneously for deals > $500K
Python services/approval/workflow_engine.py
import asyncio
import logging
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Dict, List, Optional
from uuid import uuid4

logger = logging.getLogger(__name__)


class ApprovalState(Enum):
    DRAFT = "draft"
    PENDING = "pending"
    LEVEL_1_REVIEW = "level_1_review"
    LEVEL_2_REVIEW = "level_2_review"
    LEVEL_3_REVIEW = "level_3_review"
    APPROVED = "approved"
    REJECTED = "rejected"
    ESCALATED = "escalated"


class ApprovalAction(Enum):
    SUBMIT = "submit"
    APPROVE = "approve"
    REJECT = "reject"
    ESCALATE = "escalate"
    REVISE = "revise"


@dataclass
class ApprovalLevel:
    level: int
    approver_role: str
    sla_hours: int
    required: bool = True
    parallel_with: Optional[int] = None  # Level to run in parallel with


@dataclass
class ApprovalRecord:
    id: str = field(default_factory=lambda: str(uuid4()))
    deal_id: str = ""
    level: int = 0
    approver_id: str = ""
    action: Optional[ApprovalAction] = None
    comments: str = ""
    created_at: datetime = field(default_factory=datetime.utcnow)
    decided_at: Optional[datetime] = None


class ApprovalWorkflowEngine:
    """
    Multi-level approval state machine with SLA tracking.
    Reduces approval cycle time by 31.6% through:
    - Parallel approval paths for high-value deals
    - Auto-escalation on SLA breach
    - Smart routing based on deal attributes
    """

    TRANSITIONS = {
        (ApprovalState.DRAFT, ApprovalAction.SUBMIT): ApprovalState.PENDING,
        (ApprovalState.PENDING, ApprovalAction.APPROVE): ApprovalState.LEVEL_1_REVIEW,
        (ApprovalState.LEVEL_1_REVIEW, ApprovalAction.APPROVE): ApprovalState.LEVEL_2_REVIEW,
        (ApprovalState.LEVEL_2_REVIEW, ApprovalAction.APPROVE): ApprovalState.LEVEL_3_REVIEW,
        (ApprovalState.LEVEL_3_REVIEW, ApprovalAction.APPROVE): ApprovalState.APPROVED,
        (ApprovalState.LEVEL_1_REVIEW, ApprovalAction.REJECT): ApprovalState.REJECTED,
        (ApprovalState.LEVEL_2_REVIEW, ApprovalAction.REJECT): ApprovalState.REJECTED,
        (ApprovalState.LEVEL_3_REVIEW, ApprovalAction.REJECT): ApprovalState.REJECTED,
        (ApprovalState.REJECTED, ApprovalAction.REVISE): ApprovalState.DRAFT,
    }

    def __init__(self, db, notification_service, kafka_producer):
        self.db = db
        self.notifier = notification_service
        self.kafka = kafka_producer
        self.approval_levels = [
            ApprovalLevel(level=1, approver_role="sales_manager", sla_hours=24),
            ApprovalLevel(level=2, approver_role="finance", sla_hours=48),
            ApprovalLevel(level=3, approver_role="legal", sla_hours=72, parallel_with=2),
        ]

    async def transition(
        self, deal_id: str, action: ApprovalAction,
        actor_id: str, comments: str = ""
    ) -> ApprovalState:
        """Execute a state transition with validation and side effects."""
        deal = await self.db.get_deal(deal_id)
        current_state = ApprovalState(deal["approval_state"])

        # Validate transition
        next_state = self.TRANSITIONS.get((current_state, action))
        if next_state is None:
            raise ValueError(
                f"Invalid transition: {current_state.value} + {action.value}"
            )

        # Check authorization
        await self._authorize_action(deal, actor_id, action, current_state)

        # Determine if parallel approval applies (only on an approve action,
        # so a rejection at level 1 never fans out to Finance/Legal)
        deal_value = deal.get("total_value", 0)
        if (
            deal_value > 500_000
            and current_state == ApprovalState.LEVEL_1_REVIEW
            and action == ApprovalAction.APPROVE
        ):
            # Trigger parallel Finance + Legal review
            await self._trigger_parallel_approval(deal_id, [2, 3])

        # Persist state change
        await self.db.update_deal_state(deal_id, next_state.value)

        # Record approval decision
        record = ApprovalRecord(
            deal_id=deal_id,
            level=self._get_current_level(current_state),
            approver_id=actor_id,
            action=action,
            comments=comments,
            decided_at=datetime.utcnow(),
        )
        await self.db.insert_approval_record(record)

        # Emit event for downstream services
        await self.kafka.produce(
            topic="deal.approval.events",
            value={
                "deal_id": deal_id,
                "action": action.value,
                "from_state": current_state.value,
                "to_state": next_state.value,
                "actor_id": actor_id,
            },
        )

        # Notify next approver
        await self._notify_next_approver(deal_id, next_state)

        logger.info(
            f"Deal {deal_id}: {current_state.value} -> {next_state.value} "
            f"by {actor_id}"
        )
        return next_state

    async def check_sla_breaches(self):
        """Periodic job to detect and escalate SLA breaches."""
        pending_approvals = await self.db.get_pending_approvals()

        for approval in pending_approvals:
            level_config = self.approval_levels[approval["level"] - 1]
            deadline = approval["created_at"] + timedelta(
                hours=level_config.sla_hours
            )

            if datetime.utcnow() > deadline:
                logger.warning(
                    f"SLA breach: deal {approval['deal_id']} at level "
                    f"{approval['level']} — escalating"
                )
                await self._escalate(
                    approval["deal_id"], approval["level"]
                )

    async def _escalate(self, deal_id: str, current_level: int):
        """Escalate to backup approver or skip-level manager."""
        backup = await self.db.get_backup_approver(current_level)
        await self.notifier.send_escalation(
            deal_id=deal_id,
            approver_id=backup["id"],
            reason="SLA breach — auto-escalated",
        )

    async def _trigger_parallel_approval(self, deal_id: str, levels: List[int]):
        """Launch parallel approval tracks for high-value deals."""
        tasks = []
        for level in levels:
            config = self.approval_levels[level - 1]
            approver = await self.db.get_approver_for_role(config.approver_role)
            tasks.append(
                self.notifier.request_approval(deal_id, approver["id"], level)
            )
        await asyncio.gather(*tasks)
        logger.info(f"Parallel approval triggered for deal {deal_id}: levels {levels}")

31.6% Cycle Time Reduction — How?

Before: Sequential L1 → L2 → L3 approval, average 9.5 business days. After: Parallel Finance + Legal for deals above $500K, auto-escalation on SLA breach (24h/48h/72h limits), smart routing that skips unnecessary levels for standard deals. Average cycle dropped to 6.5 business days.
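
The SLA checker runs as a periodic job. A minimal scheduling sketch; the 15-minute interval and the logger setup are assumptions, not documented values:

Python (illustrative sketch)
import asyncio
import logging

logger = logging.getLogger(__name__)

async def run_sla_checker(engine: "ApprovalWorkflowEngine", interval_s: int = 900):
    """Invoke the engine's SLA breach scan every interval_s seconds."""
    while True:
        try:
            await engine.check_sla_breaches()
        except Exception:
            logger.exception("SLA check failed; retrying next interval")
        await asyncio.sleep(interval_s)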

5 Engagement Scoring System

The engagement scoring service provides real-time deal prioritization based on behavioral signals, historical patterns, and ML-derived features. Scores are computed incrementally as new events arrive and cached in Redis for sub-millisecond reads by the deal management UI.

Scoring Signals

Five weighted signal families feed the score: recency, frequency, monetary, behavioral, and account health (weights in the engine below).

Python services/scoring/engagement_scorer.py
import json
import math
import time
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import redis.asyncio as aioredis


@dataclass
class EngagementFeatures:
    """ML feature vector for engagement scoring."""
    deal_id: str = ""
    # Recency features
    days_since_last_interaction: float = 0.0
    days_since_last_campaign_update: float = 0.0
    hours_since_last_approval_action: float = 0.0
    # Frequency features
    interactions_7d: int = 0
    interactions_30d: int = 0
    campaign_edits_30d: int = 0
    # Monetary features
    deal_value: float = 0.0
    budget_utilization: float = 0.0  # 0.0 to 1.0
    historical_spend: float = 0.0
    # Behavioral features
    targeting_change_count: int = 0
    approval_velocity: float = 0.0  # avg hours per approval level
    # Account health
    past_completion_rate: float = 0.0
    support_tickets_90d: int = 0


@dataclass
class EngagementScore:
    deal_id: str
    score: float           # 0.0 to 100.0
    tier: str              # "hot", "warm", "cold", "at_risk"
    components: Dict[str, float] = field(default_factory=dict)
    computed_at: datetime = field(default_factory=datetime.utcnow)


class EngagementScoringEngine:
    """
    Real-time engagement scoring with weighted feature combination.
    Scores are computed incrementally and cached in Redis.
    """

    # Feature weights (tuned via historical deal outcome analysis)
    WEIGHTS = {
        "recency": 0.30,
        "frequency": 0.20,
        "monetary": 0.25,
        "behavioral": 0.15,
        "account_health": 0.10,
    }

    TIER_THRESHOLDS = {
        "hot": 75.0,
        "warm": 50.0,
        "cold": 25.0,
        "at_risk": 0.0,
    }

    def __init__(self, redis_client: aioredis.Redis, db):
        self.redis = redis_client
        self.db = db
        self.cache_ttl = 600  # 10 min cache for scores

    async def compute_score(self, deal_id: str) -> EngagementScore:
        """Compute engagement score from feature vector."""
        features = await self._extract_features(deal_id)

        # Compute component scores (each 0-100)
        components = {
            "recency": self._score_recency(features),
            "frequency": self._score_frequency(features),
            "monetary": self._score_monetary(features),
            "behavioral": self._score_behavioral(features),
            "account_health": self._score_account_health(features),
        }

        # Weighted combination
        final_score = sum(
            components[k] * self.WEIGHTS[k] for k in components
        )

        # Determine tier
        tier = "at_risk"
        for t, threshold in self.TIER_THRESHOLDS.items():
            if final_score >= threshold:
                tier = t
                break

        result = EngagementScore(
            deal_id=deal_id,
            score=round(final_score, 2),
            tier=tier,
            components=components,
        )

        # Cache in Redis as JSON so readers can actually deserialize it
        await self.redis.setex(
            f"engagement:{deal_id}",
            self.cache_ttl,
            json.dumps({
                "deal_id": result.deal_id,
                "score": result.score,
                "tier": result.tier,
                "components": result.components,
                "computed_at": result.computed_at.isoformat(),
            }),
        )
        return result

    def _score_recency(self, f: EngagementFeatures) -> float:
        """Exponential decay: recent activity scores higher."""
        decay_rate = 0.1
        recency_raw = math.exp(-decay_rate * f.days_since_last_interaction)
        campaign_recency = math.exp(-decay_rate * f.days_since_last_campaign_update)
        approval_recency = math.exp(-0.05 * f.hours_since_last_approval_action)
        return (recency_raw * 0.4 + campaign_recency * 0.35 + approval_recency * 0.25) * 100

    def _score_frequency(self, f: EngagementFeatures) -> float:
        """Log-scaled frequency to avoid domination by power users."""
        freq_7d = min(math.log1p(f.interactions_7d) / math.log1p(20), 1.0)
        freq_30d = min(math.log1p(f.interactions_30d) / math.log1p(60), 1.0)
        edits = min(math.log1p(f.campaign_edits_30d) / math.log1p(15), 1.0)
        return (freq_7d * 0.5 + freq_30d * 0.3 + edits * 0.2) * 100

    def _score_monetary(self, f: EngagementFeatures) -> float:
        """Normalize deal value with sigmoid for graceful saturation."""
        value_norm = 1 / (1 + math.exp(-0.00001 * (f.deal_value - 100_000)))
        utilization = f.budget_utilization
        spend_norm = min(math.log1p(f.historical_spend) / math.log1p(1_000_000), 1.0)
        return (value_norm * 0.4 + utilization * 0.35 + spend_norm * 0.25) * 100

    def _score_behavioral(self, f: EngagementFeatures) -> float:
        """Behavioral signals indicate active deal management."""
        targeting_score = min(f.targeting_change_count / 10, 1.0)
        velocity_score = max(0, 1 - f.approval_velocity / 168)  # 168h = 1 week
        return (targeting_score * 0.5 + velocity_score * 0.5) * 100

    def _score_account_health(self, f: EngagementFeatures) -> float:
        """Account reliability based on historical performance."""
        completion = f.past_completion_rate * 100
        tickets_penalty = min(f.support_tickets_90d * 5, 50)
        return max(0, completion - tickets_penalty)

    async def _extract_features(self, deal_id: str) -> EngagementFeatures:
        """Extract feature vector from database and event store."""
        deal = await self.db.get_deal_with_history(deal_id)
        events = await self.db.get_deal_events(deal_id, days=90)
        account = await self.db.get_account_health(deal["account_id"])

        now = datetime.utcnow()
        return EngagementFeatures(
            deal_id=deal_id,
            days_since_last_interaction=(now - deal["last_interaction"]).days,
            days_since_last_campaign_update=(now - deal["last_campaign_update"]).days,
            hours_since_last_approval_action=(now - deal["last_approval_action"]).total_seconds() / 3600,
            interactions_7d=len([e for e in events if (now - e["ts"]).days <= 7]),
            interactions_30d=len([e for e in events if (now - e["ts"]).days <= 30]),
            campaign_edits_30d=deal.get("campaign_edits_30d", 0),
            deal_value=deal["total_value"],
            budget_utilization=deal.get("budget_utilization", 0.0),
            historical_spend=account.get("total_historical_spend", 0),
            targeting_change_count=deal.get("targeting_changes", 0),
            approval_velocity=deal.get("avg_approval_hours", 48.0),
            past_completion_rate=account.get("deal_completion_rate", 0.8),
            support_tickets_90d=account.get("support_tickets_90d", 0),
        )

6 Caching Strategy

The platform achieves an 85% cache hit rate through a multi-layer caching strategy using Redis. The cache sits between the API layer and the database, dramatically reducing read latency and PostgreSQL load during peak deal management hours.

Cache Layers

Four layers with distinct TTLs and invalidation strategies: validation (300s), entity (900s), engagement (600s), and config (3600s).

Python services/cache/cache_manager.py
import json
import logging
import time
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Dict, Optional
import redis.asyncio as aioredis

logger = logging.getLogger(__name__)


class CacheLayer(Enum):
    VALIDATION = "validation"     # TTL: 300s
    ENTITY = "entity"             # TTL: 900s
    ENGAGEMENT = "engagement"     # TTL: 600s
    CONFIG = "config"             # TTL: 3600s


class CacheInvalidationStrategy(Enum):
    WRITE_THROUGH = "write_through"   # Update cache on write
    WRITE_BEHIND = "write_behind"     # Async cache update
    INVALIDATE = "invalidate"         # Delete on write, lazy reload


@dataclass
class CacheConfig:
    layer: CacheLayer
    ttl: int
    strategy: CacheInvalidationStrategy
    prefix: str


class MultiLayerCacheManager:
    """
    Multi-layer Redis cache with 85% hit rate.
    Supports write-through, write-behind, and invalidation strategies.
    """

    LAYER_CONFIGS = {
        CacheLayer.VALIDATION: CacheConfig(
            layer=CacheLayer.VALIDATION, ttl=300,
            strategy=CacheInvalidationStrategy.INVALIDATE,
            prefix="val",
        ),
        CacheLayer.ENTITY: CacheConfig(
            layer=CacheLayer.ENTITY, ttl=900,
            strategy=CacheInvalidationStrategy.WRITE_THROUGH,
            prefix="ent",
        ),
        CacheLayer.ENGAGEMENT: CacheConfig(
            layer=CacheLayer.ENGAGEMENT, ttl=600,
            strategy=CacheInvalidationStrategy.WRITE_BEHIND,
            prefix="eng",
        ),
        CacheLayer.CONFIG: CacheConfig(
            layer=CacheLayer.CONFIG, ttl=3600,
            strategy=CacheInvalidationStrategy.INVALIDATE,
            prefix="cfg",
        ),
    }

    def __init__(self, redis_client: aioredis.Redis):
        self.redis = redis_client
        self.stats = {"hits": 0, "misses": 0, "invalidations": 0}

    async def get(
        self, layer: CacheLayer, key: str
    ) -> Optional[Any]:
        """Retrieve from cache with stats tracking."""
        config = self.LAYER_CONFIGS[layer]
        cache_key = f"{config.prefix}:{key}"

        cached = await self.redis.get(cache_key)
        if cached:
            self.stats["hits"] += 1
            return json.loads(cached)

        self.stats["misses"] += 1
        return None

    async def set(
        self, layer: CacheLayer, key: str, value: Any
    ):
        """Store in cache with layer-specific TTL."""
        config = self.LAYER_CONFIGS[layer]
        cache_key = f"{config.prefix}:{key}"
        await self.redis.setex(
            cache_key, config.ttl, json.dumps(value, default=str)
        )

    async def invalidate(
        self, layer: CacheLayer, key: str
    ):
        """Remove specific key from cache."""
        config = self.LAYER_CONFIGS[layer]
        cache_key = f"{config.prefix}:{key}"
        await self.redis.delete(cache_key)
        self.stats["invalidations"] += 1

    async def invalidate_pattern(
        self, layer: CacheLayer, pattern: str
    ):
        """Invalidate all keys matching a pattern (e.g., deal update)."""
        config = self.LAYER_CONFIGS[layer]
        full_pattern = f"{config.prefix}:{pattern}*"
        cursor = 0
        while True:
            cursor, keys = await self.redis.scan(
                cursor=cursor, match=full_pattern, count=100
            )
            if keys:
                await self.redis.delete(*keys)
                self.stats["invalidations"] += len(keys)
            if cursor == 0:
                break

    async def get_or_compute(
        self, layer: CacheLayer, key: str,
        compute_fn: Callable, *args
    ) -> Any:
        """Cache-aside pattern: get from cache or compute and store."""
        cached = await self.get(layer, key)
        if cached is not None:
            return cached

        value = await compute_fn(*args)
        await self.set(layer, key, value)
        return value

    async def write_through(
        self, layer: CacheLayer, key: str,
        value: Any, db_write_fn: Callable, *args
    ):
        """Write-through: update DB and cache atomically."""
        await db_write_fn(*args)
        await self.set(layer, key, value)

    def hit_rate(self) -> float:
        """Current cache hit rate percentage."""
        total = self.stats["hits"] + self.stats["misses"]
        return (self.stats["hits"] / total * 100) if total > 0 else 0.0

Cache Invalidation Pitfall: When a deal is updated, we must invalidate not just the entity cache but also the validation cache (campaign rules may reference deal limits) and the engagement cache (scores depend on deal state). We use a cascade invalidation pattern: deal update triggers invalidation across all three layers via a Kafka event handler.
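
A sketch of that cascade handler, wired through the CDC consumer's table.op handler keys from section 3; the cache and consumer instances and the key patterns are assumptions:

Python (illustrative sketch)
# Assumes `cache` is a MultiLayerCacheManager and `consumer` is the
# CDCPipelineConsumer from section 3; key patterns are illustrative.
async def on_deal_updated(payload: dict):
    deal_id = payload["after"]["id"]  # Debezium 'after' row image
    await cache.invalidate(CacheLayer.ENTITY, f"deal:{deal_id}")
    await cache.invalidate_pattern(CacheLayer.VALIDATION, f"deal:{deal_id}")
    await cache.invalidate_pattern(CacheLayer.ENGAGEMENT, f"deal:{deal_id}")

consumer.register_handler("deals.u", on_deal_updated)  # Debezium op 'u' = update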

7 Reliability & Observability

The platform maintains 99.95% uptime through a defense-in-depth approach: circuit breakers at service boundaries, comprehensive health checks, structured logging, and real-time Grafana dashboards. Every service exposes Prometheus metrics for alerting and capacity planning.

Observability Stack

Python services/common/reliability.py
import asyncio
import functools
import logging
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable, Dict, List, Optional
from prometheus_client import Counter, Histogram, Gauge

logger = logging.getLogger(__name__)

# --- Prometheus Metrics ---
REQUEST_LATENCY = Histogram(
    "deal_platform_request_latency_seconds",
    "Request latency in seconds",
    ["service", "endpoint", "method"],
    buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0],
)
REQUEST_COUNT = Counter(
    "deal_platform_requests_total",
    "Total requests",
    ["service", "endpoint", "status"],
)
CIRCUIT_STATE = Gauge(
    "deal_platform_circuit_state",
    "Circuit breaker state (0=closed, 1=open, 2=half_open)",
    ["service", "dependency"],
)
CACHE_HIT_RATE = Gauge(
    "deal_platform_cache_hit_rate",
    "Cache hit rate percentage",
    ["layer"],
)
CDC_LAG = Gauge(
    "deal_platform_cdc_consumer_lag",
    "CDC consumer lag in messages",
    ["topic", "partition"],
)
DLQ_SIZE = Gauge(
    "deal_platform_dlq_size",
    "Dead letter queue message count",
    ["topic"],
)


class HealthStatus(Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"


@dataclass
class HealthCheck:
    name: str
    status: HealthStatus
    latency_ms: float = 0.0
    message: str = ""


class ServiceHealthChecker:
    """Comprehensive health check for all service dependencies."""

    def __init__(self, db, redis_client, kafka_consumer):
        self.db = db
        self.redis = redis_client
        self.kafka = kafka_consumer

    async def check_all(self) -> Dict:
        """Run all health checks in parallel."""
        checks = await asyncio.gather(
            self._check_database(),
            self._check_redis(),
            self._check_kafka(),
            return_exceptions=True,
        )

        results = []
        for check in checks:
            if isinstance(check, Exception):
                results.append(HealthCheck(
                    name="unknown",
                    status=HealthStatus.UNHEALTHY,
                    message=str(check),
                ))
            else:
                results.append(check)

        overall = HealthStatus.HEALTHY
        if any(c.status == HealthStatus.UNHEALTHY for c in results):
            overall = HealthStatus.UNHEALTHY
        elif any(c.status == HealthStatus.DEGRADED for c in results):
            overall = HealthStatus.DEGRADED

        return {
            "status": overall.value,
            "checks": [
                {"name": c.name, "status": c.status.value,
                 "latency_ms": c.latency_ms, "message": c.message}
                for c in results
            ],
        }

    async def _check_database(self) -> HealthCheck:
        start = time.monotonic()
        try:
            await self.db.execute("SELECT 1")
            latency = (time.monotonic() - start) * 1000
            status = HealthStatus.HEALTHY if latency < 100 else HealthStatus.DEGRADED
            return HealthCheck(name="postgresql", status=status, latency_ms=latency)
        except Exception as e:
            return HealthCheck(
                name="postgresql", status=HealthStatus.UNHEALTHY,
                latency_ms=(time.monotonic() - start) * 1000, message=str(e),
            )

    async def _check_redis(self) -> HealthCheck:
        start = time.monotonic()
        try:
            await self.redis.ping()
            latency = (time.monotonic() - start) * 1000
            status = HealthStatus.HEALTHY if latency < 10 else HealthStatus.DEGRADED
            return HealthCheck(name="redis", status=status, latency_ms=latency)
        except Exception as e:
            return HealthCheck(
                name="redis", status=HealthStatus.UNHEALTHY,
                latency_ms=(time.monotonic() - start) * 1000, message=str(e),
            )

    async def _check_kafka(self) -> HealthCheck:
        start = time.monotonic()
        try:
            metadata = self.kafka.list_topics(timeout=5)
            latency = (time.monotonic() - start) * 1000
            topic_count = len(metadata.topics)
            status = HealthStatus.HEALTHY if topic_count > 0 else HealthStatus.DEGRADED
            return HealthCheck(
                name="kafka", status=status, latency_ms=latency,
                message=f"{topic_count} topics available",
            )
        except Exception as e:
            return HealthCheck(
                name="kafka", status=HealthStatus.UNHEALTHY,
                latency_ms=(time.monotonic() - start) * 1000, message=str(e),
            )


class RetryWithBackoff:
    """Decorator for retrying async functions with exponential backoff."""

    def __init__(self, max_retries: int = 3, base_delay: float = 1.0,
                 max_delay: float = 60.0, exceptions: tuple = (Exception,)):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exceptions = exceptions

    def __call__(self, func: Callable):
        @functools.wraps(func)  # preserve the wrapped function's metadata
        async def wrapper(*args, **kwargs):
            for attempt in range(self.max_retries + 1):
                try:
                    return await func(*args, **kwargs)
                except self.exceptions as e:
                    if attempt == self.max_retries:
                        logger.error(f"{func.__name__} failed after {self.max_retries} retries: {e}")
                        raise
                    delay = min(self.base_delay * (2 ** attempt), self.max_delay)
                    logger.warning(
                        f"{func.__name__} attempt {attempt + 1} failed, "
                        f"retrying in {delay:.1f}s: {e}"
                    )
                    await asyncio.sleep(delay)
        return wrapper

Grafana Dashboard Layout

Row 1 — Golden Signals: Request rate, error rate (4xx/5xx split), p50/p95/p99 latency, saturation (CPU/memory).
Row 2 — Pipeline Health: CDC consumer lag, DLQ depth, messages processed/min, circuit breaker state.
Row 3 — Business Metrics: Deals in each approval state, avg approval cycle time, validation pass rate, cache hit rate.
Row 4 — Infrastructure: PostgreSQL connections, Redis memory usage, Kafka partition balance, pod restart count.
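
A sketch of how an endpoint feeds those dashboards using the metrics defined in reliability.py; the endpoint name and the db.get_deal repository call are illustrative:

Python (illustrative sketch)
import time

async def get_deal_endpoint(deal_id: str):
    """Instrumented read path; `db.get_deal` is an assumed repository call."""
    start = time.monotonic()
    status = "200"
    try:
        return await db.get_deal(deal_id)
    except Exception:
        status = "500"
        raise
    finally:
        # Feeds the latency histogram behind the p50/p95/p99 panels.
        REQUEST_LATENCY.labels(
            service="deal", endpoint="/api/v1/deals", method="GET"
        ).observe(time.monotonic() - start)
        # Feeds the request-rate and error-rate panels.
        REQUEST_COUNT.labels(
            service="deal", endpoint="/api/v1/deals", status=status
        ).inc()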

8 Database Schema

The PostgreSQL schema is designed around domain-driven bounded contexts. Each service owns its tables, and cross-service data flows through Kafka events rather than shared database access. The schema supports full audit logging for compliance requirements.

SQL schema/deal_platform.sql
-- ============================================
-- DEALS: Core deal entity with lifecycle state
-- ============================================
CREATE TABLE deals (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    account_id      UUID NOT NULL REFERENCES accounts(id),
    deal_name       VARCHAR(255) NOT NULL,
    deal_type       VARCHAR(50) NOT NULL,  -- 'standard', 'programmatic', 'preferred'
    total_value     DECIMAL(15,2) NOT NULL,
    currency        CHAR(3) DEFAULT 'USD',
    status          VARCHAR(30) NOT NULL DEFAULT 'draft',
    approval_state  VARCHAR(30) NOT NULL DEFAULT 'draft',
    owner_id        UUID NOT NULL,
    start_date      DATE NOT NULL,
    end_date        DATE NOT NULL,
    version         INTEGER NOT NULL DEFAULT 1,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
    CONSTRAINT chk_dates CHECK (end_date > start_date),
    CONSTRAINT chk_value CHECK (total_value > 0)
);

CREATE INDEX idx_deals_account ON deals(account_id);
CREATE INDEX idx_deals_status ON deals(status);
CREATE INDEX idx_deals_approval ON deals(approval_state);
CREATE INDEX idx_deals_owner ON deals(owner_id);
CREATE INDEX idx_deals_dates ON deals(start_date, end_date);

-- ============================================
-- CAMPAIGNS: Ad campaigns within a deal
-- ============================================
CREATE TABLE campaigns (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    deal_id         UUID NOT NULL REFERENCES deals(id) ON DELETE CASCADE,
    campaign_name   VARCHAR(255) NOT NULL,
    campaign_type   VARCHAR(50) NOT NULL,  -- 'display', 'video', 'native', 'sponsored'
    budget          DECIMAL(15,2) NOT NULL,
    allocated_spend DECIMAL(15,2) DEFAULT 0,
    status          VARCHAR(30) NOT NULL DEFAULT 'draft',
    targeting       JSONB NOT NULL DEFAULT '{}',  -- audience, geo, device targeting
    scheduling      JSONB NOT NULL DEFAULT '{}',  -- flight dates, pacing
    creative_specs  JSONB NOT NULL DEFAULT '{}',  -- ad formats, sizes
    validation_status VARCHAR(30) DEFAULT 'pending',
    last_validated_at TIMESTAMPTZ,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at      TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX idx_campaigns_deal ON campaigns(deal_id);
CREATE INDEX idx_campaigns_status ON campaigns(status);
CREATE INDEX idx_campaigns_targeting ON campaigns USING GIN(targeting);

-- ============================================
-- APPROVALS: Multi-level approval tracking
-- ============================================
CREATE TABLE approvals (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    deal_id         UUID NOT NULL REFERENCES deals(id),
    level           INTEGER NOT NULL,
    approver_id     UUID NOT NULL,
    approver_role   VARCHAR(50) NOT NULL,
    action          VARCHAR(20),  -- 'approve', 'reject', 'escalate'
    comments        TEXT,
    sla_deadline    TIMESTAMPTZ NOT NULL,
    decided_at      TIMESTAMPTZ,
    is_sla_breached BOOLEAN DEFAULT FALSE,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX idx_approvals_deal ON approvals(deal_id);
CREATE INDEX idx_approvals_pending ON approvals(deal_id) WHERE action IS NULL;
CREATE INDEX idx_approvals_sla ON approvals(sla_deadline) WHERE action IS NULL;

-- ============================================
-- ENGAGEMENT METRICS: Real-time scoring data
-- ============================================
CREATE TABLE engagement_metrics (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    deal_id         UUID NOT NULL REFERENCES deals(id),
    score           DECIMAL(5,2) NOT NULL,
    tier            VARCHAR(20) NOT NULL,  -- 'hot', 'warm', 'cold', 'at_risk'
    components      JSONB NOT NULL DEFAULT '{}',
    feature_vector  JSONB NOT NULL DEFAULT '{}',
    computed_at     TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX idx_engagement_deal ON engagement_metrics(deal_id);
CREATE INDEX idx_engagement_tier ON engagement_metrics(tier);
CREATE INDEX idx_engagement_score ON engagement_metrics(score DESC);

-- ============================================
-- AUDIT LOG: Immutable event log for compliance
-- ============================================
CREATE TABLE audit_log (
    id              BIGSERIAL PRIMARY KEY,
    entity_type     VARCHAR(50) NOT NULL,  -- 'deal', 'campaign', 'approval'
    entity_id       UUID NOT NULL,
    action          VARCHAR(50) NOT NULL,  -- 'created', 'updated', 'approved'
    actor_id        UUID NOT NULL,
    changes         JSONB,                -- diff of old vs new values
    metadata        JSONB DEFAULT '{}',   -- IP, user agent, correlation ID
    created_at      TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX idx_audit_entity ON audit_log(entity_type, entity_id);
CREATE INDEX idx_audit_actor ON audit_log(actor_id);
CREATE INDEX idx_audit_time ON audit_log(created_at DESC);

Schema Design Decisions

JSONB for targeting/scheduling: Campaign targeting rules vary widely by campaign type. JSONB allows flexible schemas with GIN indexes for efficient querying.
Partial indexes on approvals: The WHERE action IS NULL index dramatically speeds up the SLA breach checker, which only queries pending approvals.
Immutable audit log: No UPDATE or DELETE operations allowed on audit_log. Append-only design ensures compliance auditability.
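
Two illustrative queries against this schema (sketches; exact column usage is an assumption). The first predicate matches the partial index idx_approvals_sla, so the SLA checker scans only undecided approvals; the second uses JSONB containment, which the GIN index on campaigns.targeting accelerates.

Python (illustrative sketch)
# Query strings as a service might issue them; parameters omitted for brevity.
PENDING_SLA_BREACHES = """
    SELECT deal_id, level, approver_id
    FROM approvals
    WHERE action IS NULL
      AND sla_deadline < now()
"""

# JSONB containment (@>) is served by the GIN index on targeting.
US_GEO_CAMPAIGNS = """
    SELECT id, campaign_name
    FROM campaigns
    WHERE targeting @> '{"geo_targets": ["US"]}'
"""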

9 Performance Metrics

Key performance indicators measured in production across a 90-day rolling window. These metrics are tracked on Grafana dashboards and trigger PagerDuty alerts when thresholds are breached.

94%      Latency Reduction (8s → 450ms)
85%      Cache Hit Rate (Redis Multi-Layer)
60%      Async Processing Reduction
99.95%   Platform Uptime (SLA Target: 99.9%)
100K+    CDC Records/Day (Kafka Pipeline)
31.6%    Cycle Time Reduction (Approval Workflow)

Detailed Latency Breakdown

Operation                    | P50   | P95   | P99   | Notes
Campaign Validation (cached) | 12ms  | 45ms  | 120ms | 85% of requests
Campaign Validation (cold)   | 380ms | 620ms | 1.2s  | Parallel async validators
Deal Read (cache hit)        | 2ms   | 8ms   | 15ms  | Redis entity cache
Deal Read (DB)               | 18ms  | 45ms  | 85ms  | PostgreSQL with indexes
Engagement Score             | 5ms   | 22ms  | 50ms  | Pre-computed, cached
Approval Transition          | 35ms  | 85ms  | 150ms | DB write + Kafka publish
CDC Event Processing         | 15ms  | 60ms  | 200ms | Per-message processing

Before vs. After Comparison

Validation Latency: 8,000ms → 450ms (94% reduction via asyncio parallelism + Redis caching).
Approval Cycle: 9.5 days → 6.5 days (31.6% reduction via parallel approvals + auto-escalation).
Data Sync Lag: 15 min (batch jobs) → <2 sec (Kafka CDC real-time streaming).
Cache Hit Rate: 0% (no caching) → 85% (multi-layer Redis with intelligent invalidation).

10 Design Decisions & Trade-offs

Decision            | Chosen                          | Alternative                        | Rationale
Event streaming     | Kafka CDC (Debezium)            | Application-level events           | Eliminates dual-write problem; DB is single source of truth; no event loss on app crash
Async validation    | Python asyncio                  | Thread pool / multiprocessing      | I/O-bound validators (API calls, DB lookups) benefit from coroutine switching; lower memory than threads
Cache strategy      | Multi-layer Redis with TTLs     | Single cache layer                 | Different data types have different staleness tolerance; per-layer TTLs optimize freshness vs. hit rate
Approval engine     | State machine with Kafka events | Workflow engine (Temporal/Camunda) | Simpler operational footprint; approval logic fits FSM pattern well; no need for long-running workflow orchestration
Database            | PostgreSQL + JSONB              | MongoDB / DynamoDB                 | ACID transactions for financial data; JSONB gives document flexibility; mature ecosystem for ad tech reporting
Error handling      | DLQ + circuit breakers          | Retry-only / manual intervention   | DLQ prevents poison messages from blocking pipeline; circuit breakers prevent cascading failure downstream
Scoring computation | Incremental + cache             | Batch nightly computation          | Real-time scores enable immediate deal prioritization; 10-min cache TTL balances freshness vs. compute cost
Schema design       | Service-owned tables            | Shared database                    | Enables independent schema evolution; services communicate via events; avoids tight coupling
Deployment          | Kubernetes + Docker             | Serverless (Lambda)                | Long-running Kafka consumers need persistent processes; K8s provides pod auto-scaling and self-healing

Key Trade-off: Choosing CDC over application events means a dependency on database WAL configuration and Debezium connector health. If the connector goes down, events stop flowing. Mitigation: dedicated health monitoring for Debezium with automated connector restart and Slack alerting on consumer lag spikes.
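
A sketch of that watchdog against the standard Kafka Connect REST API (GET /connectors/{name}/status, POST /connectors/{name}/restart); the Connect URL, connector name, and use of aiohttp are assumptions:

Python (illustrative sketch)
import asyncio
import aiohttp

CONNECT_URL = "http://kafka-connect:8083"   # assumed address
CONNECTOR = "deals-postgres-connector"      # assumed connector name

async def watch_connector(interval_s: int = 60):
    """Poll connector/task state and auto-restart on FAILED."""
    async with aiohttp.ClientSession() as session:
        while True:
            async with session.get(
                f"{CONNECT_URL}/connectors/{CONNECTOR}/status"
            ) as resp:
                status = await resp.json()
            states = [status["connector"]["state"]] + [
                t["state"] for t in status.get("tasks", [])
            ]
            if any(s == "FAILED" for s in states):
                # Automated restart; a Slack alert hook would go here.
                await session.post(
                    f"{CONNECT_URL}/connectors/{CONNECTOR}/restart"
                )
            await asyncio.sleep(interval_s)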

11 Scaling Strategy

The platform is designed to scale from current load (100K records/day) to 10x growth without architectural changes. Each layer has independent scaling knobs.

Horizontal Scaling

API pods and Kafka consumer pods scale independently via Kubernetes HPA; Kafka partition count sets the ceiling on consumer parallelism (see Q9 below).

Database Scaling

Read replicas absorb reporting load first; beyond that, the plan is sharding by account_id.

Sharding Strategy (Future)

SHARDING STRATEGY (when a single DB becomes the bottleneck)
===========================================================
Shard key: account_id (consistent hashing)

+-----------+  +-----------+  +-----------+  +-----------+
| Shard 0   |  | Shard 1   |  | Shard 2   |  | Shard 3   |
| Accts     |  | Accts     |  | Accts     |  | Accts     |
| 0-24%     |  | 25-49%    |  | 50-74%    |  | 75-100%   |
|           |  |           |  |           |  |           |
| deals     |  | deals     |  | deals     |  | deals     |
| campaigns |  | campaigns |  | campaigns |  | campaigns |
| approvals |  | approvals |  | approvals |  | approvals |
+-----+-----+  +-----+-----+  +-----+-----+  +-----+-----+
      |              |              |              |
      +--------------+------+-------+--------------+
                            |
                     +------v------+
                     | Routing     |
                     | Layer       |
                     | (account_id |
                     |  -> shard)  |
                     +-------------+

Cross-shard queries (e.g., "all deals this quarter"):
  -> Scatter-gather to all shards, merge results
  -> Cached in Redis with 60s TTL for dashboard queries

Global tables (not sharded):
  - accounts, users, config — replicated to all shards
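
The routing layer in the diagram can be a small consistent-hash ring. A sketch; shard names and the virtual-node count are assumptions:

Python (illustrative sketch)
import bisect
import hashlib

class ShardRouter:
    """Consistent hashing of account_id onto shards, so adding a shard
    remaps only a slice of accounts instead of rehashing everything."""

    def __init__(self, shards: list, vnodes: int = 64):
        self.ring = []  # sorted (hash, shard) points; vnodes smooth the spread
        for shard in shards:
            for v in range(vnodes):
                self.ring.append((self._hash(f"{shard}:{v}"), shard))
        self.ring.sort()
        self.keys = [h for h, _ in self.ring]

    @staticmethod
    def _hash(value: str) -> int:
        return int(hashlib.sha256(value.encode()).hexdigest()[:16], 16)

    def shard_for(self, account_id: str) -> str:
        idx = bisect.bisect(self.keys, self._hash(account_id)) % len(self.ring)
        return self.ring[idx][1]

router = ShardRouter(["shard0", "shard1", "shard2", "shard3"])
print(router.shard_for("acct-42"))  # deterministic shard assignment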

Redis Scaling

Redis Cluster with replicas provides HA today; the 10x plan expands the cluster to 12 nodes (see below).

Capacity Planning

Current: 100K CDC events/day, 50K API requests/day, 2TB PostgreSQL, 7.5GB Redis.
Projected 10x: 1M CDC events/day, 500K API requests/day. Requires: 4 Kafka consumer pods (from 2), 8 API pods (from 4), read replicas, and Redis cluster expansion to 12 nodes. Estimated cost increase: ~3x (not 10x) due to caching efficiency.

12 Interview Cheat Sheet

Prepared answers for system design interview questions about this platform. Each answer is structured as: Context → Action → Result.

Q1: How did you achieve 94% latency reduction in campaign validation?

Context: 12 sequential validators each making DB/API calls, total ~8s. Action: Redesigned with asyncio.gather for parallel execution + Redis caching for stable validation results. Added fail-fast semantics — critical validators (budget, compliance) short-circuit before running remaining checks. Result: P50 dropped to 450ms. Cache hit rate of 85% means most validations serve from Redis without running validators at all.

Q2: Why Kafka CDC instead of application-level events?

Context: Needed reliable event propagation for deal state changes across 5+ downstream services. Action: Chose Debezium CDC capturing from PostgreSQL WAL. Result: Eliminated dual-write problem (write DB + publish event). Zero event loss guarantee since changes are captured at the DB level. Trade-off: dependency on Debezium connector health, mitigated by health monitoring and auto-restart.

Q3: How does the approval workflow handle SLA breaches?

Context: Multi-level approvals (Sales Manager → Finance → Legal) with different SLA windows (24h/48h/72h). Action: Periodic SLA checker job queries pending approvals with partial index. On breach: auto-escalate to backup approver + notify skip-level manager. For high-value deals ($500K+): Finance and Legal review in parallel instead of sequential. Result: 31.6% reduction in approval cycle time (9.5 → 6.5 business days).

Q4: How do you handle poison messages in the Kafka pipeline?

Context: Malformed or unprocessable CDC events could block the consumer. Action: Three-tier defense: (1) Retry with exponential backoff (1s, 2s, 4s), (2) After 3 retries, route to Dead Letter Queue with full metadata, (3) Circuit breaker opens after 5 consecutive failures, halting consumption until recovery. DLQ messages are processed manually or by a separate remediation service. Result: Zero pipeline blockages in 90 days. DLQ rate < 0.01% of total messages.

Q5: How does the engagement scoring system work?

Context: Sales teams needed real-time deal prioritization based on engagement likelihood. Action: Built a weighted scoring engine with 5 feature categories (recency, frequency, monetary, behavioral, account health). Scores computed incrementally on deal events and cached in Redis (10-min TTL). Tiers: hot (75+), warm (50-74), cold (25-49), at_risk (<25). Result: Sales teams prioritize hot deals first, improving deal conversion rates and reducing time-to-close.

Q6: How do you ensure data consistency across microservices?

Context: Deal updates must propagate to Campaign, Approval, and Engagement services reliably. Action: Each service owns its data. CDC provides eventually consistent propagation. For critical flows (deal creation + initial approval), we use the transactional outbox pattern — write to DB and outbox table in the same transaction. Debezium captures from the outbox. Result: Strong consistency within service boundaries, eventual consistency across services with guaranteed delivery.

Q7: How does cache invalidation work across services?

Context: Deal update must invalidate entity cache, validation cache (campaign rules reference deal), and engagement cache. Action: Cascade invalidation via Kafka events. When the CDC pipeline processes a deal update, it publishes a cache.invalidate event. Each service's cache handler listens and invalidates relevant keys using pattern matching. Result: 85% cache hit rate with sub-second staleness window. No stale data incidents in production.

Q8: What monitoring and alerting do you have?

Context: Platform serves enterprise ad deals — downtime directly impacts revenue. Action: Prometheus metrics exported from every service. Grafana dashboards for golden signals (rate, errors, latency, saturation) + business metrics (deal states, approval SLAs, cache rates). PagerDuty integration with severity routing: P1 (error rate >5% or uptime drop) pages on-call, P2 (CDC lag >10K) goes to Slack, P3 (cache hit rate drop) goes to email. Result: Mean time to detection < 2 minutes. 99.95% uptime over 12 months.

Q9: How would you handle a 10x traffic increase?

Context: Current: 100K CDC events/day, 50K API requests/day. Action: Phase 1: Scale Kubernetes pods via HPA (2x API pods, 2x consumers). Phase 2: Add PostgreSQL read replicas for reporting queries. Phase 3: Increase Kafka partitions from 12 to 64 per topic. Phase 4 (if needed): Shard PostgreSQL by account_id using consistent hashing. Result: Estimated 10x capacity with ~3x cost increase due to caching absorbing repeated reads.

Q10: Why state machine for approvals instead of a workflow engine like Temporal?

Context: Evaluated Temporal, Camunda, and custom state machine. Action: Chose FSM because: (1) Approval logic maps cleanly to states and transitions — no need for long-running workflow orchestration. (2) Lower operational overhead — no separate workflow cluster to manage. (3) State stored in PostgreSQL with the deal entity, avoiding consistency issues between workflow state and deal state. Trade-off: If approval logic grows complex (conditional branching, timer-based auto-actions), we may migrate to Temporal. Current complexity does not warrant it.

Q11: How do you test the CDC pipeline?

Context: CDC pipelines are hard to unit test because they depend on Debezium + Kafka + PostgreSQL. Action: Three testing layers: (1) Unit tests: mock Kafka consumer, test handler logic in isolation. (2) Integration tests: Testcontainers with real PostgreSQL + Kafka + Debezium, verify end-to-end CDC flow. (3) Canary testing: deploy to staging, write test deals, verify CDC events appear within SLA. Result: 95% code coverage on handlers. Integration tests catch schema drift before it hits production.

Q12: What happens if Redis goes down?

Context: Redis serves validation cache, entity cache, and engagement scores. Action: Cache is a performance optimization, not a correctness requirement. All services fall back to database reads on cache miss. Circuit breaker on Redis client: if Redis is unreachable, stop attempting cache reads/writes for 30s (avoid timeout pile-up). Redis Cluster with 3 replicas for HA, so a single node failure triggers automatic failover. Result: during a Redis node failure, entity-read latency increases from ~12ms to ~45ms, but with zero errors. The service degrades gracefully.
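
A sketch of that degradation path, reusing the CircuitBreaker from section 3; the redis_client and db handles and the key prefix are assumptions:

Python (illustrative sketch)
import json

redis_breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=30.0)

async def get_deal_cached(deal_id: str) -> dict:
    if redis_breaker.can_execute():
        try:
            cached = await redis_client.get(f"ent:deal:{deal_id}")
            redis_breaker.record_success()
            if cached:
                return json.loads(cached)
        except Exception:
            redis_breaker.record_failure()  # opens after repeated failures
    # Cache is an optimization: PostgreSQL remains the source of truth.
    return await db.get_deal(deal_id)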

Q13: How do you handle schema evolution in the CDC pipeline?

Context: Database schema changes (adding columns, changing types) affect CDC event format. Action: Avro schema registry with backward compatibility enforcement. New columns are added as optional fields with defaults. Consumers use a schema-aware deserializer that handles both old and new formats. Breaking changes (rare) go through a 3-phase migration: add new field → backfill → deprecate old field. Result: Zero downtime schema migrations. 15+ schema changes deployed without pipeline disruption.

Q14: How do you prevent race conditions in the approval workflow?

Context: Two approvers at the same level could submit decisions simultaneously. Action: Optimistic locking with version column on the deals table. Approval transition reads current version, performs state change, and updates with WHERE version = expected_version. If version mismatch, the transaction fails and the second approver gets a "decision already made" response. Additionally, the state machine rejects invalid transitions (e.g., approving an already-approved deal). Result: Zero race condition incidents. Clean conflict resolution with user-friendly error messages.
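
A sketch of the optimistic-locking write from this answer, asyncpg-style; table and column names follow the schema in section 8:

Python (illustrative sketch)
async def transition_with_version(conn, deal_id: str, next_state: str,
                                  expected_version: int) -> None:
    """UPDATE succeeds only if no concurrent decision bumped the version."""
    status = await conn.execute(
        "UPDATE deals SET approval_state = $1, version = version + 1, "
        "updated_at = now() WHERE id = $2 AND version = $3",
        next_state, deal_id, expected_version,
    )
    if status != "UPDATE 1":  # version moved: a concurrent decision won
        raise ValueError("Decision already made by another approver")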

Q15: What was the hardest production incident you debugged on this platform?

Context: CDC consumer lag spiked to 50K messages during a batch deal import (normally <100). Action: Root cause: a single large account created 5K deals in 10 minutes. All deals had the same account_id, routed to the same Kafka partition, creating a hot partition. The single consumer on that partition couldn't keep up. Fix: Added a composite partition key (account_id + deal_id hash) to distribute load across partitions while maintaining per-deal ordering. Also added backpressure metrics — alert when any single partition lag exceeds 5K. Result: Hot partition events eliminated. Consumer lag stays <500 even during bulk operations.

Q16: How do you handle idempotency in event-driven processing?

Context: Kafka at-least-once delivery means duplicate events are possible. Action: Every CDC event includes a unique event_id (combination of LSN + timestamp). Consumers maintain an idempotency store in Redis keyed by event_id. Before processing, atomically claim the event_id; if it already exists, skip. The store uses TTL-based cleanup (events older than 7 days are purged). Result: Effectively exactly-once processing despite at-least-once delivery. Deduplication overhead < 1ms per message (Redis SET NX).
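
A sketch of that dedup check: SET NX atomically claims the event_id, and the EX argument mirrors the 7-day TTL cleanup described above.

Python (illustrative sketch)
SEVEN_DAYS = 7 * 24 * 3600

async def is_first_delivery(redis_client, event_id: str) -> bool:
    """Claim event_id; returns False for duplicates, which callers skip."""
    claimed = await redis_client.set(
        f"idem:{event_id}", 1, nx=True, ex=SEVEN_DAYS
    )
    return bool(claimed)  # None means the key existed: a duplicate delivery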