A large-scale enterprise platform managing the full lifecycle of advertising deals and campaigns at a Fortune 500 tech company. The system handles deal creation, campaign validation, multi-level approval workflows, real-time engagement scoring, and CDC-based data synchronization — processing 100K+ records daily with 99.95% uptime across distributed microservices.
The system is built as a distributed microservices architecture operating at enterprise ad tech scale. Each service owns its domain boundary, communicates via Kafka events, and maintains its own data store. It processes the complete deal lifecycle: from initial deal creation through campaign validation, multi-level approvals, and real-time engagement tracking.
Manages the full deal lifecycle — creation, amendments, versioning, and audit logging. Emits CDC events for every state transition.
Parallel async validation of campaigns against business rules, budget constraints, and targeting criteria. Achieves 94% latency reduction via asyncio + Redis caching.
Change Data Capture processing 100K records/day with Debezium connectors, dead letter queues, circuit breakers, and exponential backoff retry logic.
Multi-level approval state machine with SLA tracking, auto-escalation, and parallel approval paths. Reduced cycle time by 31.6%.
Real-time scoring engine combining ML features, historical signals, and behavioral data to rank and prioritize deals by engagement likelihood.
The validation engine is the performance-critical path of the platform. Every campaign must pass 12+ validation rules before submission. The original synchronous implementation took ~8 seconds per campaign. By redesigning with asyncio parallelism and Redis caching, we achieved a 94% latency reduction (8s → 450ms).
import asyncio
import hashlib
import json
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Optional, Dict
import redis.asyncio as aioredis
class ValidationSeverity(Enum):
CRITICAL = "critical" # Blocks submission
WARNING = "warning" # Requires acknowledgment
ADVISORY = "advisory" # Informational only
@dataclass
class ValidationResult:
rule_name: str
passed: bool
severity: ValidationSeverity
message: str = ""
latency_ms: float = 0.0
cached: bool = False
@dataclass
class CampaignValidationResponse:
campaign_id: str
is_valid: bool
results: List[ValidationResult] = field(default_factory=list)
total_latency_ms: float = 0.0
cache_hit_rate: float = 0.0
class CampaignValidationEngine:
"""
High-performance campaign validation using asyncio parallelism.
Reduces validation latency from 8s to 450ms (94% reduction).
"""
def __init__(self, redis_client: aioredis.Redis):
self.redis = redis_client
self.cache_ttl = 300 # 5 min TTL for validation cache
self.validators = [
self._validate_budget_constraints,
self._validate_targeting_criteria,
self._validate_creative_specs,
self._validate_compliance_rules,
self._validate_scheduling_conflicts,
self._validate_audience_overlap,
self._validate_frequency_caps,
self._validate_geo_restrictions,
self._validate_bid_strategy,
self._validate_pacing_rules,
self._validate_attribution_windows,
self._validate_brand_safety,
]
async def validate_campaign(self, campaign: Dict) -> CampaignValidationResponse:
"""Run all validators in parallel using asyncio.gather."""
start = time.monotonic()
# Phase 1: Critical validators (fail-fast)
critical_tasks = [
self._run_with_cache(v, campaign)
for v in self.validators
if v.__name__ in (
"_validate_budget_constraints",
"_validate_compliance_rules",
)
]
critical_results = await asyncio.gather(*critical_tasks)
# Short-circuit if critical validation fails
if any(
not r.passed and r.severity == ValidationSeverity.CRITICAL
for r in critical_results
):
return CampaignValidationResponse(
campaign_id=campaign["id"],
is_valid=False,
results=list(critical_results),
total_latency_ms=(time.monotonic() - start) * 1000,
)
# Phase 2: All remaining validators in parallel
remaining_tasks = [
self._run_with_cache(v, campaign)
for v in self.validators
if v.__name__ not in (
"_validate_budget_constraints",
"_validate_compliance_rules",
)
]
remaining_results = await asyncio.gather(*remaining_tasks)
all_results = list(critical_results) + list(remaining_results)
cached_count = sum(1 for r in all_results if r.cached)
elapsed = (time.monotonic() - start) * 1000
return CampaignValidationResponse(
campaign_id=campaign["id"],
is_valid=all(
r.passed for r in all_results
if r.severity == ValidationSeverity.CRITICAL
),
results=all_results,
total_latency_ms=elapsed,
cache_hit_rate=cached_count / len(all_results) if all_results else 0,
)
async def _run_with_cache(self, validator, campaign: Dict) -> ValidationResult:
"""Execute validator with Redis cache lookup."""
cache_key = self._cache_key(validator.__name__, campaign)
# Try cache first
        cached = await self.redis.get(cache_key)
        if cached:
            data = json.loads(cached)
            # Rehydrate the enum: severity is serialized as its string value
            data["severity"] = ValidationSeverity(data["severity"])
            result = ValidationResult(**data)
            result.cached = True
            return result
# Execute validator
start = time.monotonic()
result = await validator(campaign)
result.latency_ms = (time.monotonic() - start) * 1000
# Cache the result
await self.redis.setex(
cache_key,
self.cache_ttl,
json.dumps({
"rule_name": result.rule_name,
"passed": result.passed,
"severity": result.severity.value,
"message": result.message,
"latency_ms": result.latency_ms,
})
)
return result
def _cache_key(self, rule: str, campaign: Dict) -> str:
"""Generate deterministic cache key from rule + campaign hash."""
payload = json.dumps(
{k: campaign[k] for k in sorted(campaign.keys())},
sort_keys=True, default=str,
)
digest = hashlib.sha256(payload.encode()).hexdigest()[:16]
return f"validation:{rule}:{digest}"
# --- Individual Validators ---
async def _validate_budget_constraints(self, campaign: Dict) -> ValidationResult:
"""Validate budget against deal limits and remaining allocation."""
total_budget = campaign.get("budget", 0)
deal_limit = campaign.get("deal_budget_limit", float("inf"))
allocated = campaign.get("allocated_spend", 0)
remaining = deal_limit - allocated
passed = total_budget <= remaining
return ValidationResult(
rule_name="budget_constraints",
passed=passed,
severity=ValidationSeverity.CRITICAL,
message=f"Budget ${total_budget:,.2f} vs remaining ${remaining:,.2f}"
if not passed else "Budget within limits",
)
async def _validate_targeting_criteria(self, campaign: Dict) -> ValidationResult:
"""Validate audience targeting parameters."""
targeting = campaign.get("targeting", {})
has_audience = bool(targeting.get("audience_segments"))
has_geo = bool(targeting.get("geo_targets"))
return ValidationResult(
rule_name="targeting_criteria",
passed=has_audience and has_geo,
severity=ValidationSeverity.CRITICAL,
message="Targeting criteria validated" if (has_audience and has_geo)
else "Missing required targeting: audience or geo",
)
# ... additional validators follow the same pattern
Sequential validation: 12 validators × ~650 ms average ≈ 8 seconds. With asyncio.gather parallelism + Redis caching: ~450 ms (94% reduction). Cache hit rate stabilizes at 85% during peak hours due to repeated validation of similar campaign configurations.
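The latency gain from asyncio.gather can be sketched with stand-in validators (the 50 ms delay below is illustrative, not a measured figure): sequential awaits add latencies up, while gather's wall time tracks the slowest task.

```python
import asyncio
import time

async def fake_validator(i: int, delay: float = 0.05) -> str:
    # Stand-in for one I/O-bound validation rule (DB/Redis lookups).
    await asyncio.sleep(delay)
    return f"rule_{i}"

async def compare(n: int = 12) -> tuple:
    start = time.monotonic()
    for i in range(n):                      # sequential: latencies add up
        await fake_validator(i)
    sequential = time.monotonic() - start

    start = time.monotonic()
    await asyncio.gather(*(fake_validator(i) for i in range(n)))
    parallel = time.monotonic() - start     # wall time ~ slowest validator
    return sequential, parallel

seq_s, par_s = asyncio.run(compare())
print(f"sequential={seq_s:.2f}s parallel={par_s:.2f}s")
```

With real validators the speedup is smaller than the ideal 12x, since rule costs vary and cache hits absorb much of the remaining work.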
The platform uses Change Data Capture (CDC) via Debezium to propagate deal and campaign state changes across services. The pipeline processes 100K+ records per day with at-least-once delivery (manual offset commits paired with idempotent handlers), dead letter queues for poison messages, and circuit breakers to prevent cascading failures.
import asyncio
import json
import logging
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable, Dict, Optional
from confluent_kafka import Consumer, Producer, KafkaError
logger = logging.getLogger(__name__)
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Rejecting requests
HALF_OPEN = "half_open" # Testing recovery
@dataclass
class CircuitBreaker:
"""Circuit breaker to prevent cascading failures in CDC pipeline."""
failure_threshold: int = 5
recovery_timeout: float = 30.0
half_open_max_calls: int = 3
state: CircuitState = CircuitState.CLOSED
failure_count: int = 0
last_failure_time: float = 0.0
half_open_calls: int = 0
def can_execute(self) -> bool:
if self.state == CircuitState.CLOSED:
return True
if self.state == CircuitState.OPEN:
if time.monotonic() - self.last_failure_time > self.recovery_timeout:
self.state = CircuitState.HALF_OPEN
self.half_open_calls = 0
return True
return False
# HALF_OPEN: allow limited calls
return self.half_open_calls < self.half_open_max_calls
    def record_success(self):
        if self.state == CircuitState.HALF_OPEN:
            self.half_open_calls += 1
            if self.half_open_calls >= self.half_open_max_calls:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
        elif self.state == CircuitState.CLOSED:
            # Reset so sporadic, non-consecutive failures don't accumulate
            self.failure_count = 0
def record_failure(self):
self.failure_count += 1
self.last_failure_time = time.monotonic()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
logger.warning(f"Circuit OPEN after {self.failure_count} failures")
class CDCPipelineConsumer:
"""
Kafka CDC consumer with DLQ, circuit breakers, and exponential backoff.
Processes 100K+ records/day from Debezium CDC connectors.
"""
def __init__(self, config: Dict):
self.consumer = Consumer({
"bootstrap.servers": config["kafka_brokers"],
"group.id": config["consumer_group"],
"auto.offset.reset": "earliest",
            "enable.auto.commit": False,  # Manual commit after processing (at-least-once)
"max.poll.interval.ms": 300000,
})
self.dlq_producer = Producer({
"bootstrap.servers": config["kafka_brokers"],
})
self.circuit_breaker = CircuitBreaker()
self.max_retries = 3
self.base_backoff = 1.0 # seconds
self.handlers: Dict[str, Callable] = {}
self.metrics = {"processed": 0, "failed": 0, "dlq": 0}
def register_handler(self, event_type: str, handler: Callable):
"""Register a handler for a specific CDC event type."""
self.handlers[event_type] = handler
async def consume_loop(self, topics: list):
"""Main consumption loop with error handling."""
self.consumer.subscribe(topics)
logger.info(f"Subscribed to topics: {topics}")
while True:
msg = self.consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
logger.error(f"Kafka error: {msg.error()}")
continue
await self._process_message(msg)
async def _process_message(self, msg):
"""Process a single CDC message with retry and DLQ logic."""
        if not self.circuit_breaker.can_execute():
            logger.warning("Circuit breaker OPEN — skipping message")
            await self._send_to_dlq(msg, "circuit_breaker_open")
            self.consumer.commit(msg)  # commit so the message is not redelivered
            return
for attempt in range(self.max_retries):
try:
payload = json.loads(msg.value().decode("utf-8"))
                event_type = payload.get("op", "unknown")  # Debezium op: c=create, u=update, d=delete (r=snapshot read)
table = payload.get("source", {}).get("table", "unknown")
handler_key = f"{table}.{event_type}"
handler = self.handlers.get(handler_key)
if handler:
await handler(payload)
# Commit offset on success
self.consumer.commit(msg)
self.circuit_breaker.record_success()
self.metrics["processed"] += 1
return
            except Exception as e:
                self.circuit_breaker.record_failure()
                if attempt < self.max_retries - 1:
                    backoff = self.base_backoff * (2 ** attempt)
                    logger.warning(
                        f"Retry {attempt + 1}/{self.max_retries} "
                        f"in {backoff:.1f}s: {e}"
                    )
                    await asyncio.sleep(backoff)
        # All retries exhausted — send to DLQ and commit so the offset advances
        await self._send_to_dlq(msg, "max_retries_exhausted")
        self.consumer.commit(msg)
        self.metrics["dlq"] += 1
        self.metrics["failed"] += 1
async def _send_to_dlq(self, msg, reason: str):
"""Route failed message to Dead Letter Queue with metadata."""
dlq_payload = {
"original_topic": msg.topic(),
"original_partition": msg.partition(),
"original_offset": msg.offset(),
"failure_reason": reason,
"timestamp": time.time(),
"payload": msg.value().decode("utf-8"),
}
self.dlq_producer.produce(
topic=f"{msg.topic()}.dlq",
value=json.dumps(dlq_payload).encode("utf-8"),
)
self.dlq_producer.flush()
logger.error(f"Message sent to DLQ: {reason}")
Consistency: CDC captures changes at the database level, so every committed change is propagated even if the application crashes right after commit. Application-level events require dual-write coordination (write to DB + publish event), which introduces inconsistency windows. Decoupling: Services don't need to know about downstream consumers — the DB is the source of truth.
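The dual-write inconsistency window can be demonstrated with in-memory stand-ins (FakeDB, FakeBus, and the crash flag are hypothetical fixtures, not platform code): a crash between the DB write and the event publish leaves the two stores diverged, whereas a CDC-style replay of committed rows recovers every change.

```python
from dataclasses import dataclass, field
from typing import Dict, List

@dataclass
class FakeDB:
    rows: Dict[str, str] = field(default_factory=dict)

@dataclass
class FakeBus:
    events: List[dict] = field(default_factory=list)

class CrashAfterDBWrite(Exception):
    pass

def dual_write(db: FakeDB, bus: FakeBus, deal_id: str, state: str, crash: bool = False):
    """Application-level dual write: DB update, then event publish."""
    db.rows[deal_id] = state
    if crash:
        raise CrashAfterDBWrite  # DB committed, event never published
    bus.events.append({"deal_id": deal_id, "state": state})

def cdc_sync(db: FakeDB, bus: FakeBus):
    """CDC stand-in: replays committed rows (the real source is the WAL)."""
    bus.events = [{"deal_id": d, "state": s} for d, s in db.rows.items()]

db, bus = FakeDB(), FakeBus()
try:
    dual_write(db, bus, "deal-1", "approved", crash=True)
except CrashAfterDBWrite:
    pass
assert db.rows and not bus.events   # inconsistency window: DB and bus diverge
cdc_sync(db, bus)                   # CDC recovers the committed change
```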
Deals above configurable thresholds require multi-level approvals from sales managers, finance, and legal. The engine implements a finite state machine with SLA tracking, auto-escalation, and parallel approval paths. This design reduced approval cycle times by 31.6%.
import asyncio
import logging
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Dict, List, Optional
from uuid import uuid4
logger = logging.getLogger(__name__)
class ApprovalState(Enum):
DRAFT = "draft"
PENDING = "pending"
LEVEL_1_REVIEW = "level_1_review"
LEVEL_2_REVIEW = "level_2_review"
LEVEL_3_REVIEW = "level_3_review"
APPROVED = "approved"
REJECTED = "rejected"
ESCALATED = "escalated"
class ApprovalAction(Enum):
SUBMIT = "submit"
APPROVE = "approve"
REJECT = "reject"
ESCALATE = "escalate"
REVISE = "revise"
@dataclass
class ApprovalLevel:
level: int
approver_role: str
sla_hours: int
required: bool = True
parallel_with: Optional[int] = None # Level to run in parallel with
@dataclass
class ApprovalRecord:
id: str = field(default_factory=lambda: str(uuid4()))
deal_id: str = ""
level: int = 0
approver_id: str = ""
action: Optional[ApprovalAction] = None
comments: str = ""
created_at: datetime = field(default_factory=datetime.utcnow)
decided_at: Optional[datetime] = None
class ApprovalWorkflowEngine:
"""
Multi-level approval state machine with SLA tracking.
Reduces approval cycle time by 31.6% through:
- Parallel approval paths for high-value deals
- Auto-escalation on SLA breach
- Smart routing based on deal attributes
"""
TRANSITIONS = {
(ApprovalState.DRAFT, ApprovalAction.SUBMIT): ApprovalState.PENDING,
(ApprovalState.PENDING, ApprovalAction.APPROVE): ApprovalState.LEVEL_1_REVIEW,
(ApprovalState.LEVEL_1_REVIEW, ApprovalAction.APPROVE): ApprovalState.LEVEL_2_REVIEW,
(ApprovalState.LEVEL_2_REVIEW, ApprovalAction.APPROVE): ApprovalState.LEVEL_3_REVIEW,
(ApprovalState.LEVEL_3_REVIEW, ApprovalAction.APPROVE): ApprovalState.APPROVED,
(ApprovalState.LEVEL_1_REVIEW, ApprovalAction.REJECT): ApprovalState.REJECTED,
(ApprovalState.LEVEL_2_REVIEW, ApprovalAction.REJECT): ApprovalState.REJECTED,
(ApprovalState.LEVEL_3_REVIEW, ApprovalAction.REJECT): ApprovalState.REJECTED,
(ApprovalState.REJECTED, ApprovalAction.REVISE): ApprovalState.DRAFT,
}
def __init__(self, db, notification_service, kafka_producer):
self.db = db
self.notifier = notification_service
self.kafka = kafka_producer
self.approval_levels = [
ApprovalLevel(level=1, approver_role="sales_manager", sla_hours=24),
ApprovalLevel(level=2, approver_role="finance", sla_hours=48),
ApprovalLevel(level=3, approver_role="legal", sla_hours=72, parallel_with=2),
]
async def transition(
self, deal_id: str, action: ApprovalAction,
actor_id: str, comments: str = ""
) -> ApprovalState:
"""Execute a state transition with validation and side effects."""
deal = await self.db.get_deal(deal_id)
current_state = ApprovalState(deal["approval_state"])
# Validate transition
next_state = self.TRANSITIONS.get((current_state, action))
if next_state is None:
raise ValueError(
f"Invalid transition: {current_state.value} + {action.value}"
)
# Check authorization
await self._authorize_action(deal, actor_id, action, current_state)
# Determine if parallel approval applies
deal_value = deal.get("total_value", 0)
if deal_value > 500_000 and current_state == ApprovalState.LEVEL_1_REVIEW:
# Trigger parallel Finance + Legal review
await self._trigger_parallel_approval(deal_id, [2, 3])
# Persist state change
await self.db.update_deal_state(deal_id, next_state.value)
# Record approval decision
record = ApprovalRecord(
deal_id=deal_id,
level=self._get_current_level(current_state),
approver_id=actor_id,
action=action,
comments=comments,
decided_at=datetime.utcnow(),
)
await self.db.insert_approval_record(record)
# Emit event for downstream services
await self.kafka.produce(
topic="deal.approval.events",
value={
"deal_id": deal_id,
"action": action.value,
"from_state": current_state.value,
"to_state": next_state.value,
"actor_id": actor_id,
},
)
# Notify next approver
await self._notify_next_approver(deal_id, next_state)
logger.info(
f"Deal {deal_id}: {current_state.value} -> {next_state.value} "
f"by {actor_id}"
)
return next_state
async def check_sla_breaches(self):
"""Periodic job to detect and escalate SLA breaches."""
pending_approvals = await self.db.get_pending_approvals()
for approval in pending_approvals:
level_config = self.approval_levels[approval["level"] - 1]
deadline = approval["created_at"] + timedelta(
hours=level_config.sla_hours
)
if datetime.utcnow() > deadline:
logger.warning(
f"SLA breach: deal {approval['deal_id']} at level "
f"{approval['level']} — escalating"
)
await self._escalate(
approval["deal_id"], approval["level"]
)
async def _escalate(self, deal_id: str, current_level: int):
"""Escalate to backup approver or skip-level manager."""
backup = await self.db.get_backup_approver(current_level)
await self.notifier.send_escalation(
deal_id=deal_id,
approver_id=backup["id"],
reason="SLA breach — auto-escalated",
)
async def _trigger_parallel_approval(self, deal_id: str, levels: List[int]):
"""Launch parallel approval tracks for high-value deals."""
tasks = []
for level in levels:
config = self.approval_levels[level - 1]
approver = await self.db.get_approver_for_role(config.approver_role)
tasks.append(
self.notifier.request_approval(deal_id, approver["id"], level)
)
await asyncio.gather(*tasks)
logger.info(f"Parallel approval triggered for deal {deal_id}: levels {levels}")
Before: Sequential L1 → L2 → L3 approval, average 9.5 business days. After: Parallel Finance + Legal for deals above $500K, auto-escalation on SLA breach (24h/48h/72h limits), smart routing that skips unnecessary levels for standard deals. Average cycle dropped to 6.5 business days.
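As a rough sanity check on the SLA ceilings above (24h/48h/72h): parallelizing Finance and Legal shortens the worst-case critical path by about a third, in line with the measured 31.6% (which reflects actual decision times, not SLA maxima).

```python
# Worst-case critical path under the three SLA limits defined above.
sequential_h = 24 + 48 + 72          # L1 -> L2 -> L3 strictly in series
parallel_h = 24 + max(48, 72)        # L1, then Finance + Legal concurrently
reduction = 1 - parallel_h / sequential_h
print(f"{sequential_h}h -> {parallel_h}h ({reduction:.1%} shorter)")
```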
The engagement scoring service provides real-time deal prioritization based on behavioral signals, historical patterns, and ML-derived features. Scores are computed incrementally as new events arrive and cached in Redis for sub-millisecond reads by the deal management UI.
import json
import math
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import redis.asyncio as aioredis
@dataclass
class EngagementFeatures:
"""ML feature vector for engagement scoring."""
deal_id: str = ""
# Recency features
days_since_last_interaction: float = 0.0
days_since_last_campaign_update: float = 0.0
hours_since_last_approval_action: float = 0.0
# Frequency features
interactions_7d: int = 0
interactions_30d: int = 0
campaign_edits_30d: int = 0
# Monetary features
deal_value: float = 0.0
budget_utilization: float = 0.0 # 0.0 to 1.0
historical_spend: float = 0.0
# Behavioral features
targeting_change_count: int = 0
approval_velocity: float = 0.0 # avg hours per approval level
# Account health
past_completion_rate: float = 0.0
support_tickets_90d: int = 0
@dataclass
class EngagementScore:
deal_id: str
score: float # 0.0 to 100.0
tier: str # "hot", "warm", "cold", "at_risk"
components: Dict[str, float] = field(default_factory=dict)
computed_at: datetime = field(default_factory=datetime.utcnow)
class EngagementScoringEngine:
"""
Real-time engagement scoring with weighted feature combination.
Scores are computed incrementally and cached in Redis.
"""
# Feature weights (tuned via historical deal outcome analysis)
WEIGHTS = {
"recency": 0.30,
"frequency": 0.20,
"monetary": 0.25,
"behavioral": 0.15,
"account_health": 0.10,
}
TIER_THRESHOLDS = {
"hot": 75.0,
"warm": 50.0,
"cold": 25.0,
"at_risk": 0.0,
}
def __init__(self, redis_client: aioredis.Redis, db):
self.redis = redis_client
self.db = db
self.cache_ttl = 600 # 10 min cache for scores
async def compute_score(self, deal_id: str) -> EngagementScore:
"""Compute engagement score from feature vector."""
features = await self._extract_features(deal_id)
# Compute component scores (each 0-100)
components = {
"recency": self._score_recency(features),
"frequency": self._score_frequency(features),
"monetary": self._score_monetary(features),
"behavioral": self._score_behavioral(features),
"account_health": self._score_account_health(features),
}
# Weighted combination
final_score = sum(
components[k] * self.WEIGHTS[k] for k in components
)
        # Determine tier: thresholds are checked highest-first
        # (dict insertion order is preserved in Python 3.7+)
        tier = "at_risk"
        for t, threshold in self.TIER_THRESHOLDS.items():
            if final_score >= threshold:
                tier = t
                break
result = EngagementScore(
deal_id=deal_id,
score=round(final_score, 2),
tier=tier,
components=components,
)
        # Cache in Redis as JSON (str() of __dict__ cannot be parsed back)
        await self.redis.setex(
            f"engagement:{deal_id}",
            self.cache_ttl,
            json.dumps({
                "deal_id": result.deal_id,
                "score": result.score,
                "tier": result.tier,
                "components": result.components,
                "computed_at": result.computed_at.isoformat(),
            }),
        )
return result
def _score_recency(self, f: EngagementFeatures) -> float:
"""Exponential decay: recent activity scores higher."""
decay_rate = 0.1
recency_raw = math.exp(-decay_rate * f.days_since_last_interaction)
campaign_recency = math.exp(-decay_rate * f.days_since_last_campaign_update)
approval_recency = math.exp(-0.05 * f.hours_since_last_approval_action)
return (recency_raw * 0.4 + campaign_recency * 0.35 + approval_recency * 0.25) * 100
def _score_frequency(self, f: EngagementFeatures) -> float:
"""Log-scaled frequency to avoid domination by power users."""
freq_7d = min(math.log1p(f.interactions_7d) / math.log1p(20), 1.0)
freq_30d = min(math.log1p(f.interactions_30d) / math.log1p(60), 1.0)
edits = min(math.log1p(f.campaign_edits_30d) / math.log1p(15), 1.0)
return (freq_7d * 0.5 + freq_30d * 0.3 + edits * 0.2) * 100
def _score_monetary(self, f: EngagementFeatures) -> float:
"""Normalize deal value with sigmoid for graceful saturation."""
value_norm = 1 / (1 + math.exp(-0.00001 * (f.deal_value - 100_000)))
utilization = f.budget_utilization
spend_norm = min(math.log1p(f.historical_spend) / math.log1p(1_000_000), 1.0)
return (value_norm * 0.4 + utilization * 0.35 + spend_norm * 0.25) * 100
def _score_behavioral(self, f: EngagementFeatures) -> float:
"""Behavioral signals indicate active deal management."""
targeting_score = min(f.targeting_change_count / 10, 1.0)
velocity_score = max(0, 1 - f.approval_velocity / 168) # 168h = 1 week
return (targeting_score * 0.5 + velocity_score * 0.5) * 100
def _score_account_health(self, f: EngagementFeatures) -> float:
"""Account reliability based on historical performance."""
completion = f.past_completion_rate * 100
tickets_penalty = min(f.support_tickets_90d * 5, 50)
return max(0, completion - tickets_penalty)
async def _extract_features(self, deal_id: str) -> EngagementFeatures:
"""Extract feature vector from database and event store."""
deal = await self.db.get_deal_with_history(deal_id)
events = await self.db.get_deal_events(deal_id, days=90)
account = await self.db.get_account_health(deal["account_id"])
now = datetime.utcnow()
return EngagementFeatures(
deal_id=deal_id,
days_since_last_interaction=(now - deal["last_interaction"]).days,
days_since_last_campaign_update=(now - deal["last_campaign_update"]).days,
hours_since_last_approval_action=(now - deal["last_approval_action"]).total_seconds() / 3600,
interactions_7d=len([e for e in events if (now - e["ts"]).days <= 7]),
interactions_30d=len([e for e in events if (now - e["ts"]).days <= 30]),
campaign_edits_30d=deal.get("campaign_edits_30d", 0),
deal_value=deal["total_value"],
budget_utilization=deal.get("budget_utilization", 0.0),
historical_spend=account.get("total_historical_spend", 0),
targeting_change_count=deal.get("targeting_changes", 0),
approval_velocity=deal.get("avg_approval_hours", 48.0),
past_completion_rate=account.get("deal_completion_rate", 0.8),
support_tickets_90d=account.get("support_tickets_90d", 0),
)
The platform achieves an 85% cache hit rate through a multi-layer caching strategy using Redis. The cache sits between the API layer and the database, dramatically reducing read latency and PostgreSQL load during peak deal management hours.
import json
import logging
import time
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Dict, Optional
import redis.asyncio as aioredis
logger = logging.getLogger(__name__)
class CacheLayer(Enum):
VALIDATION = "validation" # TTL: 300s
ENTITY = "entity" # TTL: 900s
ENGAGEMENT = "engagement" # TTL: 600s
CONFIG = "config" # TTL: 3600s
class CacheInvalidationStrategy(Enum):
WRITE_THROUGH = "write_through" # Update cache on write
WRITE_BEHIND = "write_behind" # Async cache update
INVALIDATE = "invalidate" # Delete on write, lazy reload
@dataclass
class CacheConfig:
layer: CacheLayer
ttl: int
strategy: CacheInvalidationStrategy
prefix: str
class MultiLayerCacheManager:
"""
Multi-layer Redis cache with 85% hit rate.
Supports write-through, write-behind, and invalidation strategies.
"""
LAYER_CONFIGS = {
CacheLayer.VALIDATION: CacheConfig(
layer=CacheLayer.VALIDATION, ttl=300,
strategy=CacheInvalidationStrategy.INVALIDATE,
prefix="val",
),
CacheLayer.ENTITY: CacheConfig(
layer=CacheLayer.ENTITY, ttl=900,
strategy=CacheInvalidationStrategy.WRITE_THROUGH,
prefix="ent",
),
CacheLayer.ENGAGEMENT: CacheConfig(
layer=CacheLayer.ENGAGEMENT, ttl=600,
strategy=CacheInvalidationStrategy.WRITE_BEHIND,
prefix="eng",
),
CacheLayer.CONFIG: CacheConfig(
layer=CacheLayer.CONFIG, ttl=3600,
strategy=CacheInvalidationStrategy.INVALIDATE,
prefix="cfg",
),
}
def __init__(self, redis_client: aioredis.Redis):
self.redis = redis_client
self.stats = {"hits": 0, "misses": 0, "invalidations": 0}
async def get(
self, layer: CacheLayer, key: str
) -> Optional[Any]:
"""Retrieve from cache with stats tracking."""
config = self.LAYER_CONFIGS[layer]
cache_key = f"{config.prefix}:{key}"
cached = await self.redis.get(cache_key)
if cached:
self.stats["hits"] += 1
return json.loads(cached)
self.stats["misses"] += 1
return None
async def set(
self, layer: CacheLayer, key: str, value: Any
):
"""Store in cache with layer-specific TTL."""
config = self.LAYER_CONFIGS[layer]
cache_key = f"{config.prefix}:{key}"
await self.redis.setex(
cache_key, config.ttl, json.dumps(value, default=str)
)
async def invalidate(
self, layer: CacheLayer, key: str
):
"""Remove specific key from cache."""
config = self.LAYER_CONFIGS[layer]
cache_key = f"{config.prefix}:{key}"
await self.redis.delete(cache_key)
self.stats["invalidations"] += 1
async def invalidate_pattern(
self, layer: CacheLayer, pattern: str
):
"""Invalidate all keys matching a pattern (e.g., deal update)."""
config = self.LAYER_CONFIGS[layer]
full_pattern = f"{config.prefix}:{pattern}*"
cursor = 0
while True:
cursor, keys = await self.redis.scan(
cursor=cursor, match=full_pattern, count=100
)
if keys:
await self.redis.delete(*keys)
self.stats["invalidations"] += len(keys)
if cursor == 0:
break
async def get_or_compute(
self, layer: CacheLayer, key: str,
compute_fn: Callable, *args
) -> Any:
"""Cache-aside pattern: get from cache or compute and store."""
cached = await self.get(layer, key)
if cached is not None:
return cached
value = await compute_fn(*args)
await self.set(layer, key, value)
return value
async def write_through(
self, layer: CacheLayer, key: str,
value: Any, db_write_fn: Callable, *args
):
        """Write-through: update the DB, then refresh the cache (not atomic; the DB stays the source of truth)."""
await db_write_fn(*args)
await self.set(layer, key, value)
def hit_rate(self) -> float:
"""Current cache hit rate percentage."""
total = self.stats["hits"] + self.stats["misses"]
return (self.stats["hits"] / total * 100) if total > 0 else 0.0
Cache Invalidation Pitfall: When a deal is updated, we must invalidate not just the entity cache but also the validation cache (campaign rules may reference deal limits) and the engagement cache (scores depend on deal state). We use a cascade invalidation pattern: deal update triggers invalidation across all three layers via a Kafka event handler.
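A sketch of that cascade handler, using a dict-backed stand-in for the cache manager. The deal-scoped key layout (`ent:deal:<id>`, `val:deal:<id>:*`, `eng:<id>`) is an assumption for illustration; the real handler would be registered against the deal-update Kafka topic.

```python
import asyncio

class FakeCacheManager:
    """Dict-backed stand-in for MultiLayerCacheManager (illustration only)."""
    def __init__(self):
        self.store = {}

    async def invalidate(self, prefix: str, key: str):
        self.store.pop(f"{prefix}:{key}", None)

    async def invalidate_pattern(self, prefix: str, pattern: str):
        full = f"{prefix}:{pattern}"
        for k in [k for k in self.store if k.startswith(full)]:
            del self.store[k]

async def on_deal_updated(event: dict, cache: FakeCacheManager):
    # Cascade: entity, validation, and engagement entries all depend on deal state.
    deal_id = event["deal_id"]
    await cache.invalidate("ent", f"deal:{deal_id}")
    await cache.invalidate_pattern("val", f"deal:{deal_id}")
    await cache.invalidate("eng", deal_id)

cache = FakeCacheManager()
cache.store = {
    "ent:deal:42": "{}",
    "val:deal:42:budget_constraints": "{}",
    "val:deal:42:targeting_criteria": "{}",
    "eng:42": "{}",
    "ent:deal:99": "{}",   # a different deal — must survive the cascade
}
asyncio.run(on_deal_updated({"deal_id": "42"}, cache))
```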
The platform maintains 99.95% uptime through a defense-in-depth approach: circuit breakers at service boundaries, comprehensive health checks, structured logging, and real-time Grafana dashboards. Every service exposes Prometheus metrics for alerting and capacity planning.
import asyncio
import logging
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable, Dict, List, Optional
from prometheus_client import Counter, Histogram, Gauge
logger = logging.getLogger(__name__)
# --- Prometheus Metrics ---
REQUEST_LATENCY = Histogram(
"deal_platform_request_latency_seconds",
"Request latency in seconds",
["service", "endpoint", "method"],
buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0],
)
REQUEST_COUNT = Counter(
"deal_platform_requests_total",
"Total requests",
["service", "endpoint", "status"],
)
CIRCUIT_STATE = Gauge(
"deal_platform_circuit_state",
"Circuit breaker state (0=closed, 1=open, 2=half_open)",
["service", "dependency"],
)
CACHE_HIT_RATE = Gauge(
"deal_platform_cache_hit_rate",
"Cache hit rate percentage",
["layer"],
)
CDC_LAG = Gauge(
"deal_platform_cdc_consumer_lag",
"CDC consumer lag in messages",
["topic", "partition"],
)
DLQ_SIZE = Gauge(
"deal_platform_dlq_size",
"Dead letter queue message count",
["topic"],
)
class HealthStatus(Enum):
HEALTHY = "healthy"
DEGRADED = "degraded"
UNHEALTHY = "unhealthy"
@dataclass
class HealthCheck:
name: str
status: HealthStatus
latency_ms: float = 0.0
message: str = ""
class ServiceHealthChecker:
"""Comprehensive health check for all service dependencies."""
def __init__(self, db, redis_client, kafka_consumer):
self.db = db
self.redis = redis_client
self.kafka = kafka_consumer
async def check_all(self) -> Dict:
"""Run all health checks in parallel."""
checks = await asyncio.gather(
self._check_database(),
self._check_redis(),
self._check_kafka(),
return_exceptions=True,
)
        names = ["postgresql", "redis", "kafka"]  # order matches the gather() above
        results = []
        for name, check in zip(names, checks):
            if isinstance(check, Exception):
                results.append(HealthCheck(
                    name=name,
                    status=HealthStatus.UNHEALTHY,
                    message=str(check),
                ))
            else:
                results.append(check)
overall = HealthStatus.HEALTHY
if any(c.status == HealthStatus.UNHEALTHY for c in results):
overall = HealthStatus.UNHEALTHY
elif any(c.status == HealthStatus.DEGRADED for c in results):
overall = HealthStatus.DEGRADED
return {
"status": overall.value,
"checks": [
{"name": c.name, "status": c.status.value,
"latency_ms": c.latency_ms, "message": c.message}
for c in results
],
}
async def _check_database(self) -> HealthCheck:
start = time.monotonic()
try:
await self.db.execute("SELECT 1")
latency = (time.monotonic() - start) * 1000
status = HealthStatus.HEALTHY if latency < 100 else HealthStatus.DEGRADED
return HealthCheck(name="postgresql", status=status, latency_ms=latency)
except Exception as e:
return HealthCheck(
name="postgresql", status=HealthStatus.UNHEALTHY,
latency_ms=(time.monotonic() - start) * 1000, message=str(e),
)
async def _check_redis(self) -> HealthCheck:
start = time.monotonic()
try:
await self.redis.ping()
latency = (time.monotonic() - start) * 1000
status = HealthStatus.HEALTHY if latency < 10 else HealthStatus.DEGRADED
return HealthCheck(name="redis", status=status, latency_ms=latency)
except Exception as e:
return HealthCheck(
name="redis", status=HealthStatus.UNHEALTHY,
latency_ms=(time.monotonic() - start) * 1000, message=str(e),
)
async def _check_kafka(self) -> HealthCheck:
start = time.monotonic()
try:
metadata = self.kafka.list_topics(timeout=5)
latency = (time.monotonic() - start) * 1000
topic_count = len(metadata.topics)
status = HealthStatus.HEALTHY if topic_count > 0 else HealthStatus.DEGRADED
return HealthCheck(
name="kafka", status=status, latency_ms=latency,
message=f"{topic_count} topics available",
)
except Exception as e:
return HealthCheck(
name="kafka", status=HealthStatus.UNHEALTHY,
latency_ms=(time.monotonic() - start) * 1000, message=str(e),
)
import asyncio
import functools
import logging
from typing import Callable

logger = logging.getLogger(__name__)


class RetryWithBackoff:
    """Decorator for retrying async functions with exponential backoff."""

    def __init__(self, max_retries: int = 3, base_delay: float = 1.0,
                 max_delay: float = 60.0, exceptions: tuple = (Exception,)):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exceptions = exceptions

    def __call__(self, func: Callable):
        @functools.wraps(func)  # preserve the wrapped function's metadata
        async def wrapper(*args, **kwargs):
            for attempt in range(self.max_retries + 1):
                try:
                    return await func(*args, **kwargs)
                except self.exceptions as e:
                    if attempt == self.max_retries:
                        logger.error(
                            f"{func.__name__} failed after "
                            f"{self.max_retries} retries: {e}"
                        )
                        raise
                    delay = min(self.base_delay * (2 ** attempt), self.max_delay)
                    logger.warning(
                        f"{func.__name__} attempt {attempt + 1} failed, "
                        f"retrying in {delay:.1f}s: {e}"
                    )
                    await asyncio.sleep(delay)
        return wrapper
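The capped exponential delay formula used by the decorator can be sanity-checked in isolation. This helper is not part of the platform code; it just evaluates the schedule with the decorator's default `base_delay=1.0` and `max_delay=60.0`:

```python
# Capped exponential backoff schedule, mirroring the decorator's formula:
# min(base_delay * 2**attempt, max_delay).
def backoff_schedule(retries: int, base_delay: float = 1.0,
                     max_delay: float = 60.0) -> list[float]:
    return [min(base_delay * (2 ** attempt), max_delay)
            for attempt in range(retries)]

print(backoff_schedule(7))  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0]
```

Note how the seventh delay would be 64s but is clamped at `max_delay`, which keeps worst-case retry latency bounded.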
Row 1 — Golden Signals: Request rate, error rate (4xx/5xx split), p50/p95/p99 latency, saturation (CPU/memory).
Row 2 — Pipeline Health: CDC consumer lag, DLQ depth, messages processed/min, circuit breaker state.
Row 3 — Business Metrics: Deals in each approval state, avg approval cycle time, validation pass rate, cache hit rate.
Row 4 — Infrastructure: PostgreSQL connections, Redis memory usage, Kafka partition balance, pod restart count.
The PostgreSQL schema is designed around domain-driven bounded contexts. Each service owns its tables, and cross-service data flows through Kafka events rather than shared database access. The schema supports full audit logging for compliance requirements.
-- ============================================
-- DEALS: Core deal entity with lifecycle state
-- ============================================
CREATE TABLE deals (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
account_id UUID NOT NULL REFERENCES accounts(id),
deal_name VARCHAR(255) NOT NULL,
deal_type VARCHAR(50) NOT NULL, -- 'standard', 'programmatic', 'preferred'
total_value DECIMAL(15,2) NOT NULL,
currency CHAR(3) DEFAULT 'USD',
status VARCHAR(30) NOT NULL DEFAULT 'draft',
approval_state VARCHAR(30) NOT NULL DEFAULT 'draft',
owner_id UUID NOT NULL,
start_date DATE NOT NULL,
end_date DATE NOT NULL,
version INTEGER NOT NULL DEFAULT 1,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
CONSTRAINT chk_dates CHECK (end_date > start_date),
CONSTRAINT chk_value CHECK (total_value > 0)
);
CREATE INDEX idx_deals_account ON deals(account_id);
CREATE INDEX idx_deals_status ON deals(status);
CREATE INDEX idx_deals_approval ON deals(approval_state);
CREATE INDEX idx_deals_owner ON deals(owner_id);
CREATE INDEX idx_deals_dates ON deals(start_date, end_date);
-- ============================================
-- CAMPAIGNS: Ad campaigns within a deal
-- ============================================
CREATE TABLE campaigns (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
deal_id UUID NOT NULL REFERENCES deals(id) ON DELETE CASCADE,
campaign_name VARCHAR(255) NOT NULL,
campaign_type VARCHAR(50) NOT NULL, -- 'display', 'video', 'native', 'sponsored'
budget DECIMAL(15,2) NOT NULL,
allocated_spend DECIMAL(15,2) DEFAULT 0,
status VARCHAR(30) NOT NULL DEFAULT 'draft',
targeting JSONB NOT NULL DEFAULT '{}', -- audience, geo, device targeting
scheduling JSONB NOT NULL DEFAULT '{}', -- flight dates, pacing
creative_specs JSONB NOT NULL DEFAULT '{}', -- ad formats, sizes
validation_status VARCHAR(30) DEFAULT 'pending',
last_validated_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX idx_campaigns_deal ON campaigns(deal_id);
CREATE INDEX idx_campaigns_status ON campaigns(status);
CREATE INDEX idx_campaigns_targeting ON campaigns USING GIN(targeting);
-- ============================================
-- APPROVALS: Multi-level approval tracking
-- ============================================
CREATE TABLE approvals (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
deal_id UUID NOT NULL REFERENCES deals(id),
level INTEGER NOT NULL,
approver_id UUID NOT NULL,
approver_role VARCHAR(50) NOT NULL,
action VARCHAR(20), -- 'approve', 'reject', 'escalate'
comments TEXT,
sla_deadline TIMESTAMPTZ NOT NULL,
decided_at TIMESTAMPTZ,
is_sla_breached BOOLEAN DEFAULT FALSE,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX idx_approvals_deal ON approvals(deal_id);
CREATE INDEX idx_approvals_pending ON approvals(deal_id) WHERE action IS NULL;
CREATE INDEX idx_approvals_sla ON approvals(sla_deadline) WHERE action IS NULL;
-- ============================================
-- ENGAGEMENT METRICS: Real-time scoring data
-- ============================================
CREATE TABLE engagement_metrics (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
deal_id UUID NOT NULL REFERENCES deals(id),
score DECIMAL(5,2) NOT NULL,
tier VARCHAR(20) NOT NULL, -- 'hot', 'warm', 'cold', 'at_risk'
components JSONB NOT NULL DEFAULT '{}',
feature_vector JSONB NOT NULL DEFAULT '{}',
computed_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX idx_engagement_deal ON engagement_metrics(deal_id);
CREATE INDEX idx_engagement_tier ON engagement_metrics(tier);
CREATE INDEX idx_engagement_score ON engagement_metrics(score DESC);
-- ============================================
-- AUDIT LOG: Immutable event log for compliance
-- ============================================
CREATE TABLE audit_log (
id BIGSERIAL PRIMARY KEY,
entity_type VARCHAR(50) NOT NULL, -- 'deal', 'campaign', 'approval'
entity_id UUID NOT NULL,
action VARCHAR(50) NOT NULL, -- 'created', 'updated', 'approved'
actor_id UUID NOT NULL,
changes JSONB, -- diff of old vs new values
metadata JSONB DEFAULT '{}', -- IP, user agent, correlation ID
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX idx_audit_entity ON audit_log(entity_type, entity_id);
CREATE INDEX idx_audit_actor ON audit_log(actor_id);
CREATE INDEX idx_audit_time ON audit_log(created_at DESC);
JSONB for targeting/scheduling: Campaign targeting rules vary widely by campaign type. JSONB allows flexible schemas with GIN indexes for efficient querying.
Partial indexes on approvals: The WHERE action IS NULL index dramatically speeds up the SLA breach checker, which only queries pending approvals.
Immutable audit log: No UPDATE or DELETE operations allowed on audit_log. Append-only design ensures compliance auditability.
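The SLA breach checker mentioned above can be sketched against the partial index. This uses SQLite in place of PostgreSQL purely for illustration (SQLite also supports partial indexes), and the rows are hypothetical:

```python
import sqlite3

# Simplified approvals table with the same partial index shape as
# idx_approvals_sla: only pending rows (action IS NULL) are indexed.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE approvals (
        id INTEGER PRIMARY KEY,
        deal_id TEXT NOT NULL,
        action TEXT,                 -- NULL while the approval is pending
        sla_deadline TEXT NOT NULL   -- ISO-8601 timestamp
    );
    CREATE INDEX idx_approvals_sla ON approvals(sla_deadline)
        WHERE action IS NULL;
""")
conn.executemany(
    "INSERT INTO approvals (deal_id, action, sla_deadline) VALUES (?, ?, ?)",
    [
        ("deal-1", None,      "2024-01-01T00:00:00Z"),  # pending, breached
        ("deal-2", "approve", "2024-01-01T00:00:00Z"),  # decided, ignored
        ("deal-3", None,      "2099-01-01T00:00:00Z"),  # pending, within SLA
    ],
)

# Only pending approvals past their deadline come back; decided rows
# never even appear in the partial index.
breached = conn.execute(
    "SELECT deal_id FROM approvals "
    "WHERE action IS NULL AND sla_deadline < ?",
    ("2024-06-01T00:00:00Z",),
).fetchall()
print(breached)  # [('deal-1',)]
```

Because the index predicate matches the query predicate exactly, the planner can serve this query from a small index that shrinks as approvals are decided.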
Key performance indicators measured in production across a 90-day rolling window. These metrics are tracked on Grafana dashboards and trigger PagerDuty alerts when thresholds are breached.
| Operation | P50 | P95 | P99 | Notes |
|---|---|---|---|---|
| Campaign Validation (cached) | 12ms | 45ms | 120ms | 85% of requests |
| Campaign Validation (cold) | 380ms | 620ms | 1.2s | Parallel async validators |
| Deal Read (cache hit) | 2ms | 8ms | 15ms | Redis entity cache |
| Deal Read (DB) | 18ms | 45ms | 85ms | PostgreSQL with indexes |
| Engagement Score | 5ms | 22ms | 50ms | Pre-computed, cached |
| Approval Transition | 35ms | 85ms | 150ms | DB write + Kafka publish |
| CDC Event Processing | 15ms | 60ms | 200ms | Per-message processing |
Validation Latency: 8,000ms → 450ms (94% reduction via asyncio parallelism + Redis caching).
Approval Cycle: 9.5 days → 6.5 days (31.6% reduction via parallel approvals + auto-escalation).
Data Sync Lag: 15 min (batch jobs) → <2 sec (Kafka CDC real-time streaming).
Cache Hit Rate: 0% (no caching) → 85% (multi-layer Redis with intelligent invalidation).
| Decision | Chosen | Alternative | Rationale |
|---|---|---|---|
| Event streaming | Kafka CDC (Debezium) | Application-level events | Eliminates dual-write problem; DB is single source of truth; no event loss on app crash |
| Async validation | Python asyncio | Thread pool / multiprocessing | I/O-bound validators (API calls, DB lookups) benefit from coroutine switching; lower memory than threads |
| Cache strategy | Multi-layer Redis with TTLs | Single cache layer | Different data types have different staleness tolerance; per-layer TTLs optimize freshness vs. hit rate |
| Approval engine | State machine with Kafka events | Workflow engine (Temporal/Camunda) | Simpler operational footprint; approval logic fits FSM pattern well; no need for long-running workflow orchestration |
| Database | PostgreSQL + JSONB | MongoDB / DynamoDB | ACID transactions for financial data; JSONB gives document flexibility; mature ecosystem for ad tech reporting |
| Error handling | DLQ + circuit breakers | Retry-only / manual intervention | DLQ prevents poison messages from blocking pipeline; circuit breakers prevent cascading failure to downstream services |
| Scoring computation | Incremental + cache | Batch nightly computation | Real-time scores enable immediate deal prioritization; 10-min cache TTL balances freshness vs. compute cost |
| Schema design | Service-owned tables | Shared database | Enables independent schema evolution; services communicate via events; avoids tight coupling |
| Deployment | Kubernetes + Docker | Serverless (Lambda) | Long-running Kafka consumers need persistent processes; K8s provides pod auto-scaling and self-healing |
Key Trade-off: Choosing CDC over application events means a dependency on database WAL configuration and Debezium connector health. If the connector goes down, events stop flowing. Mitigation: dedicated health monitoring for Debezium with automated connector restart and Slack alerting on consumer lag spikes.
The platform is designed to scale from current load (100K records/day) to 10x growth without architectural changes. Each layer has independent scaling knobs.
audit_log is range-partitioned by month on created_at; old partitions are archived to cold storage.
Redis eviction policy is allkeys-lru per node, but TTLs ensure most keys expire naturally before eviction.
Current: 100K CDC events/day, 50K API requests/day, 2TB PostgreSQL, 7.5GB Redis.
Projected 10x: 1M CDC events/day, 500K API requests/day. Requires: 4 Kafka consumer pods (from 2), 8 API pods (from 4), read replicas, and Redis cluster expansion to 12 nodes. Estimated cost increase: ~3x (not 10x) due to caching efficiency.
Prepared answers for system design interview questions about this platform. Each answer is structured as: Context → Action → Result.
Context: 12 sequential validators each making DB/API calls, total ~8s. Action: Redesigned with asyncio.gather for parallel execution + Redis caching for stable validation results. Added fail-fast semantics — critical validators (budget, compliance) short-circuit before running remaining checks. Result: P50 dropped to 450ms. Cache hit rate of 85% means most validations serve from Redis without running validators at all.
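The asyncio.gather redesign can be illustrated with a toy version: three I/O-bound validators that would take ~0.3s sequentially finish in ~0.1s in parallel. The validator names, delays, and campaign shape here are invented for the sketch:

```python
import asyncio

# Each validator stands in for a DB lookup or API call; asyncio.sleep
# models the I/O wait during which the event loop runs other validators.
async def check_budget(campaign):
    await asyncio.sleep(0.1)
    return ("budget", campaign["budget"] > 0)

async def check_targeting(campaign):
    await asyncio.sleep(0.1)
    return ("targeting", bool(campaign["targeting"]))

async def check_dates(campaign):
    await asyncio.sleep(0.1)
    return ("dates", campaign["start"] < campaign["end"])

async def validate(campaign):
    # gather() runs all validators concurrently and preserves order,
    # so the total wall time is ~max(delays), not sum(delays).
    return dict(await asyncio.gather(
        check_budget(campaign),
        check_targeting(campaign),
        check_dates(campaign),
    ))

campaign = {"budget": 1000, "targeting": {"geo": "US"}, "start": 1, "end": 2}
results = asyncio.run(validate(campaign))
print(results)  # {'budget': True, 'targeting': True, 'dates': True}
```

Fail-fast semantics, as described above, would run the critical validators first and skip the rest on failure rather than gathering everything unconditionally.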
Context: Needed reliable event propagation for deal state changes across 5+ downstream services. Action: Chose Debezium CDC capturing from PostgreSQL WAL. Result: Eliminated dual-write problem (write DB + publish event). Zero event loss guarantee since changes are captured at the DB level. Trade-off: dependency on Debezium connector health, mitigated by health monitoring and auto-restart.
Context: Multi-level approvals (Sales Manager → Finance → Legal) with different SLA windows (24h/48h/72h). Action: Periodic SLA checker job queries pending approvals with partial index. On breach: auto-escalate to backup approver + notify skip-level manager. For high-value deals ($500K+): Finance and Legal review in parallel instead of sequential. Result: 31.6% reduction in approval cycle time (9.5 → 6.5 business days).
Context: Malformed or unprocessable CDC events could block the consumer. Action: Three-tier defense: (1) Retry with exponential backoff (1s, 2s, 4s), (2) After 3 retries, route to Dead Letter Queue with full metadata, (3) Circuit breaker opens after 5 consecutive failures, halting consumption until recovery. DLQ messages are processed manually or by a separate remediation service. Result: Zero pipeline blockages in 90 days. DLQ rate < 0.01% of total messages.
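The retry-then-DLQ flow can be sketched as below. The handler, message shape, and in-memory DLQ list are hypothetical stand-ins for the real Kafka consumer and DLQ topic:

```python
import time

DLQ = []  # stands in for the dead-letter Kafka topic

def process_with_dlq(message, handler, max_retries=3, base_delay=0.01):
    for attempt in range(max_retries):
        try:
            return handler(message)
        except Exception as exc:
            if attempt == max_retries - 1:
                # After the final retry, route to the DLQ with metadata
                # instead of blocking the pipeline on a poison message.
                DLQ.append({"message": message, "error": str(exc),
                            "attempts": max_retries})
                return None
            # Exponential backoff between attempts (1s, 2s, 4s in production;
            # shortened here so the sketch runs instantly).
            time.sleep(base_delay * (2 ** attempt))

def handler(msg):
    if msg.get("malformed"):
        raise ValueError("cannot parse payload")
    return msg["id"]

assert process_with_dlq({"id": 1}, handler) == 1        # healthy message
process_with_dlq({"id": 2, "malformed": True}, handler)  # poison message
print(len(DLQ))  # 1
```

The circuit-breaker tier described above would sit around this loop, tripping after repeated consecutive failures rather than per message.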
Context: Sales teams needed real-time deal prioritization based on engagement likelihood. Action: Built a weighted scoring engine with 5 feature categories (recency, frequency, monetary, behavioral, account health). Scores computed incrementally on deal events and cached in Redis (10-min TTL). Tiers: hot (75+), warm (50-74), cold (25-49), at_risk (<25). Result: Sales teams prioritize hot deals first, improving deal conversion rates and reducing time-to-close.
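The tier boundaries stated above map directly to a threshold function. This is a direct encoding of the prose, not the production scorer:

```python
# Tier thresholds: hot (75+), warm (50-74), cold (25-49), at_risk (<25).
def score_tier(score: float) -> str:
    if score >= 75:
        return "hot"
    if score >= 50:
        return "warm"
    if score >= 25:
        return "cold"
    return "at_risk"

print([score_tier(s) for s in (90, 60, 30, 10)])
# ['hot', 'warm', 'cold', 'at_risk']
```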
Context: Deal updates must propagate to Campaign, Approval, and Engagement services reliably. Action: Each service owns its data. CDC provides eventually consistent propagation. For critical flows (deal creation + initial approval), we use the transactional outbox pattern — write to DB and outbox table in the same transaction. Debezium captures from the outbox. Result: Strong consistency within service boundaries, eventual consistency across services with guaranteed delivery.
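The transactional outbox pattern can be sketched with a minimal schema: the deal row and its outbox event commit in one transaction, so the CDC connector (reading the outbox table) can never observe a deal without its event. SQLite and the simplified tables here are illustrative stand-ins:

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE deals  (id TEXT PRIMARY KEY, status TEXT NOT NULL);
    CREATE TABLE outbox (id INTEGER PRIMARY KEY, topic TEXT, payload TEXT);
""")

def create_deal(deal_id: str):
    # 'with conn' wraps both INSERTs in a single transaction:
    # either both rows commit or neither does.
    with conn:
        conn.execute("INSERT INTO deals VALUES (?, 'draft')", (deal_id,))
        conn.execute(
            "INSERT INTO outbox (topic, payload) VALUES (?, ?)",
            ("deal.events",
             json.dumps({"deal_id": deal_id, "action": "created"})),
        )

create_deal("deal-42")
deals = conn.execute("SELECT COUNT(*) FROM deals").fetchone()[0]
events = conn.execute("SELECT COUNT(*) FROM outbox").fetchone()[0]
print(deals, events)  # 1 1
```

In production Debezium tails the outbox table and publishes each row to Kafka, which is what closes the dual-write gap.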
Context: Deal update must invalidate entity cache, validation cache (campaign rules reference deal), and engagement cache. Action: Cascade invalidation via Kafka events. When the CDC pipeline processes a deal update, it publishes a cache.invalidate event. Each service's cache handler listens and invalidates relevant keys using pattern matching. Result: 85% cache hit rate with sub-second staleness window. No stale data incidents in production.
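The pattern-matching step a cache handler performs on a cache.invalidate event can be sketched in memory. The key naming scheme is invented; in production this would be a Redis SCAN + DEL over real keys, not a dict:

```python
from fnmatch import fnmatch

# In-memory stand-in for the Redis keyspace across cache layers.
cache = {
    "deal:42:entity": "...",
    "deal:42:validation": "...",
    "deal:42:engagement": "...",
    "deal:99:entity": "...",
}

def invalidate(pattern: str) -> int:
    """Delete every key matching the glob pattern; return the count."""
    doomed = [k for k in cache if fnmatch(k, pattern)]
    for k in doomed:
        del cache[k]
    return len(doomed)

# A deal-update event carries the affected deal id; one pattern clears
# the entity, validation, and engagement entries for that deal.
removed = invalidate("deal:42:*")
print(removed, sorted(cache))  # 3 ['deal:99:entity']
```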
Context: Platform serves enterprise ad deals — downtime directly impacts revenue. Action: Prometheus metrics exported from every service. Grafana dashboards for golden signals (rate, errors, latency, saturation) + business metrics (deal states, approval SLAs, cache rates). PagerDuty integration with severity routing: P1 (error rate >5% or uptime drop) pages on-call, P2 (CDC lag >10K) goes to Slack, P3 (cache hit rate drop) goes to email. Result: Mean time to detection < 2 minutes. 99.95% uptime over 12 months.
Context: Current: 100K CDC events/day, 50K API requests/day. Action: Phase 1: Scale Kubernetes pods via HPA (2x API pods, 2x consumers). Phase 2: Add PostgreSQL read replicas for reporting queries. Phase 3: Increase Kafka partitions from 12 to 64 per topic. Phase 4 (if needed): Shard PostgreSQL by account_id using consistent hashing. Result: Estimated 10x capacity with ~3x cost increase due to caching absorbing repeated reads.
Context: Evaluated Temporal, Camunda, and custom state machine. Action: Chose FSM because: (1) Approval logic maps cleanly to states and transitions — no need for long-running workflow orchestration. (2) Lower operational overhead — no separate workflow cluster to manage. (3) State stored in PostgreSQL with the deal entity, avoiding consistency issues between workflow state and deal state. Trade-off: If approval logic grows complex (conditional branching, timer-based auto-actions), we may migrate to Temporal. Current complexity does not warrant it.
Context: CDC pipelines are hard to unit test because they depend on Debezium + Kafka + PostgreSQL. Action: Three testing layers: (1) Unit tests: mock Kafka consumer, test handler logic in isolation. (2) Integration tests: Testcontainers with real PostgreSQL + Kafka + Debezium, verify end-to-end CDC flow. (3) Canary testing: deploy to staging, write test deals, verify CDC events appear within SLA. Result: 95% code coverage on handlers. Integration tests catch schema drift before it hits production.
Context: Redis serves the validation cache, entity cache, and engagement scores. Action: Cache is a performance optimization, not a correctness requirement; all services fall back to database reads on cache miss. Circuit breaker on the Redis client: if Redis is unreachable, stop attempting cache reads/writes for 30s to avoid timeout pile-up. Redis Cluster runs with 3 replicas for HA, so a single-node failure triggers automatic failover. Result: during a Redis node failure, entity-read latency rose from ~12ms to ~45ms, but with zero errors. The service degrades gracefully.
Context: Database schema changes (adding columns, changing types) affect CDC event format. Action: Avro schema registry with backward compatibility enforcement. New columns are added as optional fields with defaults. Consumers use a schema-aware deserializer that handles both old and new formats. Breaking changes (rare) go through a 3-phase migration: add new field → backfill → deprecate old field. Result: Zero downtime schema migrations. 15+ schema changes deployed without pipeline disruption.
Context: Two approvers at the same level could submit decisions simultaneously. Action: Optimistic locking with version column on the deals table. Approval transition reads current version, performs state change, and updates with WHERE version = expected_version. If version mismatch, the transaction fails and the second approver gets a "decision already made" response. Additionally, the state machine rejects invalid transitions (e.g., approving an already-approved deal). Result: Zero race condition incidents. Clean conflict resolution with user-friendly error messages.
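The optimistic-locking conflict can be sketched with a version-guarded UPDATE. SQLite stands in for PostgreSQL; the point is that a stale `WHERE version = ?` matches zero rows, which the application surfaces as a conflict:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE deals (id TEXT PRIMARY KEY, "
             "approval_state TEXT, version INTEGER)")
conn.execute("INSERT INTO deals VALUES ('deal-1', 'pending', 1)")
conn.commit()

def approve(deal_id: str, expected_version: int) -> bool:
    # The version check and the state change are one atomic statement;
    # a concurrent decision bumps the version and starves the loser.
    cur = conn.execute(
        "UPDATE deals SET approval_state = 'approved', "
        "version = version + 1 WHERE id = ? AND version = ?",
        (deal_id, expected_version),
    )
    conn.commit()
    return cur.rowcount == 1  # False -> "decision already made"

first = approve("deal-1", expected_version=1)
second = approve("deal-1", expected_version=1)  # stale version, rejected
print(first, second)  # True False
```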
Context: CDC consumer lag spiked to 50K messages during a batch deal import (normally <100). Action: Root cause: a single large account created 5K deals in 10 minutes. All deals had the same account_id, routed to the same Kafka partition, creating a hot partition. The single consumer on that partition couldn't keep up. Fix: Added a composite partition key (account_id + deal_id hash) to distribute load across partitions while maintaining per-deal ordering. Also added backpressure metrics — alert when any single partition lag exceeds 5K. Result: Hot partition events eliminated. Consumer lag stays <500 even during bulk operations.
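The hot-partition fix boils down to what goes into the partition key. A sketch (SHA-256 here as a stable stand-in for Kafka's murmur2 partitioner; partition count and ids are invented) shows that keying on account_id alone pins a bulk import to one partition, while a composite key spreads it out:

```python
import hashlib

NUM_PARTITIONS = 12

def partition_for(key: str) -> int:
    digest = hashlib.sha256(key.encode()).digest()
    return int.from_bytes(digest[:4], "big") % NUM_PARTITIONS

deals = [f"deal-{i}" for i in range(5000)]

# Old scheme: same account_id -> every message on one partition.
hot = {partition_for("account-123") for _ in deals}
# New scheme: account_id + deal_id -> load spread across partitions,
# while messages for any single deal still stay on one partition.
spread = {partition_for(f"account-123:{d}") for d in deals}
print(len(hot))        # 1
print(len(spread) > 1)  # True
```

The trade-off is that ordering is now guaranteed per deal rather than per account, which the prose above notes is sufficient for this pipeline.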
Context: Kafka at-least-once delivery means duplicate events are possible. Action: Every CDC event includes a unique event_id (combination of LSN + timestamp). Consumers maintain an idempotency table keyed by event_id. Before processing, check if event_id exists; if so, skip. The idempotency table uses TTL-based cleanup (events older than 7 days are purged). Result: Exactly-once processing semantics despite at-least-once delivery. Deduplication overhead < 1ms per message (Redis SET NX).
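The deduplication check can be sketched as below; `claim_event` mimics Redis SET NX (the first consumer to claim an event_id wins, redeliveries are skipped), with an in-memory set standing in for the TTL-backed idempotency table:

```python
seen: set[str] = set()

def claim_event(event_id: str) -> bool:
    """Return True exactly once per event_id (SET NX semantics)."""
    if event_id in seen:
        return False  # duplicate delivery: skip processing
    seen.add(event_id)
    return True

processed = []
# Third delivery repeats the first event_id (at-least-once redelivery).
for event_id in ["lsn-100:1700000000", "lsn-101:1700000001",
                 "lsn-100:1700000000"]:
    if claim_event(event_id):
        processed.append(event_id)

print(len(processed))  # 2
```

In the real pipeline the set membership test and insert happen atomically on the Redis side (`SET key value NX EX <ttl>`), so two consumers racing on the same event cannot both win.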