Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Production Deployment

Production deployment of probabilistic models requires reliability engineering, performance optimization, and operational excellence at scale. This guide establishes a mathematical framework for fault tolerance, service reliability, and system observability using Fugue's production-ready infrastructure patterns.

Reliability Theory Framework

Production systems exhibit stochastic reliability characterized by:

  • Availability: where MTBF = Mean Time Between Failures
  • Reliability Function: for exponential failure rates
  • Service Level Agreement:

Fugue's deployment patterns optimize these metrics through systematic fault isolation and graceful degradation.

Error Handling and Graceful Degradation

Graceful degradation implements fault tolerance through systematic error recovery and service continuity. The mathematical foundation relies on Markov reliability models and circuit breaker theory:

stateDiagram-v2
    [*] --> Healthy
    Healthy --> Degraded : Error Rate > τ₁
    Degraded --> Failed : Error Rate > τ₂  
    Failed --> Recovery : Time > T_recovery
    Recovery --> Healthy : Success Rate > σ
    Degraded --> Healthy : Error Rate < τ₀
    
    note right of Healthy
        Error Rate: λ < τ₀
        SLA: 99.9%
        Full Functionality
    end note
    
    note right of Degraded  
        Error Rate: τ₀ < λ < τ₁
        SLA: 95%
        Limited Functionality
    end note
    
    note right of Failed
        Error Rate: λ > τ₂
        Circuit Open
        Fallback Mode
    end note

Circuit Breaker Mathematics: The failure rate follows a Poisson process with rate . The circuit breaker transitions based on:

Error Budget Model: For SLA target , the error budget is:

/// Production-ready handler that gracefully handles failures
struct RobustProductionHandler<H: Handler> {
    inner: H,
    error_count: u32,
    max_errors: u32,
    _fallback_values: HashMap<String, ChoiceValue>,
    circuit_breaker_open: bool,
}

impl<H: Handler> RobustProductionHandler<H> {
    fn new(inner: H, max_errors: u32) -> Self {
        let mut fallback_values = HashMap::new();
        fallback_values.insert("default_f64".to_string(), ChoiceValue::F64(0.0));
        fallback_values.insert("default_bool".to_string(), ChoiceValue::Bool(false));
        fallback_values.insert("default_u64".to_string(), ChoiceValue::U64(0));
        fallback_values.insert("default_usize".to_string(), ChoiceValue::Usize(0));

        Self {
            inner,
            error_count: 0,
            max_errors,
            _fallback_values: fallback_values,
            circuit_breaker_open: false,
        }
    }

    fn handle_error(&mut self, operation: &str, addr: &Address) -> bool {
        self.error_count += 1;
        eprintln!("PRODUCTION ERROR: {} failed at address {}", operation, addr);

        if self.error_count >= self.max_errors {
            self.circuit_breaker_open = true;
            eprintln!("CIRCUIT BREAKER: Too many errors, switching to fallback mode");
        }

        self.circuit_breaker_open
    }

    fn get_fallback_f64(&self, addr: &Address) -> f64 {
        // In production, this might come from a cache, configuration, or ML model
        match addr.0.as_str() {
            s if s.contains("temperature") => 20.0,
            s if s.contains("price") => 100.0,
            s if s.contains("probability") => 0.5,
            _ => 0.0,
        }
    }
}

impl<H: Handler> Handler for RobustProductionHandler<H> {
    fn on_sample_f64(&mut self, addr: &Address, dist: &dyn Distribution<f64>) -> f64 {
        if self.circuit_breaker_open {
            return self.get_fallback_f64(addr);
        }

        match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            self.inner.on_sample_f64(addr, dist)
        })) {
            Ok(value) if value.is_finite() => value,
            Ok(invalid_value) => {
                eprintln!("Invalid f64 sample: {} at {}", invalid_value, addr);
                self.handle_error("sample_f64", addr);
                self.get_fallback_f64(addr)
            }
            Err(_) => {
                self.handle_error("sample_f64_panic", addr);
                self.get_fallback_f64(addr)
            }
        }
    }

    fn on_sample_bool(&mut self, addr: &Address, dist: &dyn Distribution<bool>) -> bool {
        if self.circuit_breaker_open {
            return false; // Safe fallback
        }

        match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            self.inner.on_sample_bool(addr, dist)
        })) {
            Ok(value) => value,
            Err(_) => {
                self.handle_error("sample_bool_panic", addr);
                false
            }
        }
    }

    fn on_sample_u64(&mut self, addr: &Address, dist: &dyn Distribution<u64>) -> u64 {
        if self.circuit_breaker_open {
            return 1; // Safe default
        }

        match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            self.inner.on_sample_u64(addr, dist)
        })) {
            Ok(value) => value,
            Err(_) => {
                self.handle_error("sample_u64_panic", addr);
                1
            }
        }
    }

    fn on_sample_usize(&mut self, addr: &Address, dist: &dyn Distribution<usize>) -> usize {
        if self.circuit_breaker_open {
            return 0; // Safe array index
        }

        match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            self.inner.on_sample_usize(addr, dist)
        })) {
            Ok(value) => value,
            Err(_) => {
                self.handle_error("sample_usize_panic", addr);
                0
            }
        }
    }

    fn on_observe_f64(&mut self, addr: &Address, dist: &dyn Distribution<f64>, value: f64) {
        if !self.circuit_breaker_open
            && std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                self.inner.on_observe_f64(addr, dist, value)
            }))
            .is_err()
        {
            self.handle_error("observe_f64_panic", addr);
        }
    }

    fn on_observe_bool(&mut self, addr: &Address, dist: &dyn Distribution<bool>, value: bool) {
        if !self.circuit_breaker_open
            && std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                self.inner.on_observe_bool(addr, dist, value)
            }))
            .is_err()
        {
            self.handle_error("observe_bool_panic", addr);
        }
    }

    fn on_observe_u64(&mut self, addr: &Address, dist: &dyn Distribution<u64>, value: u64) {
        if !self.circuit_breaker_open
            && std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                self.inner.on_observe_u64(addr, dist, value)
            }))
            .is_err()
        {
            self.handle_error("observe_u64_panic", addr);
        }
    }

    fn on_observe_usize(&mut self, addr: &Address, dist: &dyn Distribution<usize>, value: usize) {
        if !self.circuit_breaker_open
            && std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                self.inner.on_observe_usize(addr, dist, value)
            }))
            .is_err()
        {
            self.handle_error("observe_usize_panic", addr);
        }
    }

    fn on_factor(&mut self, log_weight: f64) {
        if !log_weight.is_finite() {
            eprintln!("Invalid factor log-weight: {}", log_weight);
            self.error_count += 1;
            return; // Skip invalid factors
        }

        if !self.circuit_breaker_open {
            self.inner.on_factor(log_weight);
        }
    }

    fn finish(self) -> Trace {
        println!("✅ Production handler statistics:");
        println!(
            "   - Errors encountered: {}/{}",
            self.error_count, self.max_errors
        );
        println!(
            "   - Circuit breaker status: {}",
            if self.circuit_breaker_open {
                "OPEN (fallback mode)"
            } else {
                "CLOSED (normal)"
            }
        );

        if self.circuit_breaker_open {
            // Return minimal valid trace in fallback mode
            Trace::default()
        } else {
            self.inner.finish()
        }
    }
}

Robust Error Handling Features:

  • Circuit Breaker Pattern: Prevents cascade failures by switching to fallback mode
  • Panic Recovery: Catches panics and provides safe default values
  • Input Validation: Ensures all inputs are finite and within expected ranges
  • Fallback Values: Domain-specific defaults for different parameter types
  • Error Counting: Tracks error rates to trigger circuit breaker activation

Configuration Management

Production models require flexible configuration for different environments:

#[derive(Debug, Clone)]
struct ModelConfig {
    // Model parameters
    temperature_prior_mean: f64,
    temperature_prior_std: f64,
    validity_probability: f64,
    sensor_noise_std: f64,

    // Runtime configuration
    max_inference_time_ms: u64,
    memory_pool_size: usize,
    enable_circuit_breaker: bool,
    error_threshold: u32,

    // Environment settings
    environment: String, // "development", "staging", "production"
    _log_level: String,
    enable_metrics: bool,
}

impl Default for ModelConfig {
    fn default() -> Self {
        Self {
            temperature_prior_mean: 20.0,
            temperature_prior_std: 5.0,
            validity_probability: 0.95,
            sensor_noise_std: 1.0,
            max_inference_time_ms: 1000,
            memory_pool_size: 100,
            enable_circuit_breaker: true,
            error_threshold: 10,
            environment: "production".to_string(),
            _log_level: "info".to_string(),
            enable_metrics: true,
        }
    }
}

struct ConfigurableModelRunner {
    config: ModelConfig,
    pool: TracePool,
    metrics: ProductionMetrics,
}

impl ConfigurableModelRunner {
    fn new(config: ModelConfig) -> Self {
        Self {
            pool: TracePool::new(config.memory_pool_size),
            metrics: ProductionMetrics::new(config.enable_metrics),
            config,
        }
    }

    fn create_model(&self) -> Model<(f64, bool)> {
        let config = self.config.clone();
        prob!(
            let temp <- sample(
                addr!("temperature"),
                Normal::new(config.temperature_prior_mean, config.temperature_prior_std).unwrap()
            );
            let valid <- sample(
                addr!("valid"),
                Bernoulli::new(config.validity_probability).unwrap()
            );
            // Simulate sensor reading with configured noise
            observe(
                addr!("sensor"),
                Normal::new(temp, config.sensor_noise_std).unwrap(),
                22.0
            );
            pure((temp, valid))
        )
    }

    fn run_inference(&mut self) -> Result<(f64, bool), String> {
        let start = Instant::now();

        // Configure handler based on environment
        let mut rng = thread_rng();
        let model = self.create_model(); // Create model before borrowing
        let result = if self.config.environment == "production" {
            // Use safe, fault-tolerant execution in production
            let base_handler = PooledPriorHandler::new(&mut rng, &mut self.pool);
            let robust_handler =
                RobustProductionHandler::new(base_handler, self.config.error_threshold);

            let (result, _trace) = runtime::handler::run(robust_handler, model);
            Ok(result)
        } else {
            // Use faster, less safe execution in development
            let handler = PriorHandler {
                rng: &mut rng,
                trace: Trace::default(),
            };
            let (result, _trace) = runtime::handler::run(handler, model);
            Ok(result)
        };

        let duration = start.elapsed();
        self.metrics.record_inference_time(duration);

        // Check timeout
        if duration.as_millis() > self.config.max_inference_time_ms as u128 {
            self.metrics.increment_timeout_count();
            return Err(format!(
                "Inference timeout: {}ms > {}ms",
                duration.as_millis(),
                self.config.max_inference_time_ms
            ));
        }

        result
    }
}

Configuration Best Practices:

  • Environment-Specific Settings: Different behavior for development/staging/production
  • Model Parameter Configuration: Tunable priors, noise levels, and thresholds
  • Runtime Configuration: Memory pool sizes, timeout limits, error thresholds
  • Deployment Configuration: Circuit breaker settings, logging levels, metrics enablement
  • Type-Safe Defaults: Sensible fallbacks for all configuration parameters

Production Metrics and Observability

Observability requires systematic metric collection with statistical analysis and anomaly detection. The metric taxonomy follows the USE method (Utilization, Saturation, Errors) and RED method (Rate, Errors, Duration):

graph TD
    subgraph "Observability Architecture"
        A[Model Execution] --> B[Metric Collection]
        B --> C{Metric Type}
        C -->|USE| D[Resource Metrics]
        C -->|RED| E[Service Metrics]  
        C -->|Business| F[Domain Metrics]
        
        D --> G[Utilization: ρ = λ/μ]
        D --> H[Saturation: Queue Length]
        D --> I[Error Rate: λₑ]
        
        E --> J[Request Rate: λᵣ]
        E --> K[Error Rate: εᵣ] 
        E --> L[Duration: T₉₉]
        
        F --> M[Inference Accuracy]
        F --> N[Model Drift]
        F --> O[Business KPIs]
        
        G --> P[(Time Series DB)]
        H --> P
        I --> P
        J --> P
        K --> P
        L --> P
        M --> Q[(Analytics DB)]
        N --> Q
        O --> Q
    end

Statistical Process Control: Metrics follow control chart theory with statistical control limits:

Anomaly Detection: Using exponentially weighted moving averages:

#[derive(Debug, Clone)]
struct ProductionMetrics {
    enabled: bool,
    inference_count: u64,
    error_count: u64,
    timeout_count: u64,
    total_inference_time: Duration,
    start_time: SystemTime,
}

impl ProductionMetrics {
    fn new(enabled: bool) -> Self {
        Self {
            enabled,
            inference_count: 0,
            error_count: 0,
            timeout_count: 0,
            total_inference_time: Duration::ZERO,
            start_time: SystemTime::now(),
        }
    }

    fn record_inference_time(&mut self, duration: Duration) {
        if self.enabled {
            self.inference_count += 1;
            self.total_inference_time += duration;
        }
    }

    fn _increment_error_count(&mut self) {
        if self.enabled {
            self.error_count += 1;
        }
    }

    fn increment_timeout_count(&mut self) {
        if self.enabled {
            self.timeout_count += 1;
        }
    }

    fn get_stats(&self) -> HashMap<String, f64> {
        let mut stats = HashMap::new();
        if self.enabled {
            let uptime = self.start_time.elapsed().unwrap_or(Duration::ZERO);
            let avg_inference_time = if self.inference_count > 0 {
                self.total_inference_time.as_millis() as f64 / self.inference_count as f64
            } else {
                0.0
            };

            stats.insert("inference_count".to_string(), self.inference_count as f64);
            stats.insert("error_count".to_string(), self.error_count as f64);
            stats.insert("timeout_count".to_string(), self.timeout_count as f64);
            stats.insert(
                "error_rate".to_string(),
                if self.inference_count > 0 {
                    self.error_count as f64 / self.inference_count as f64
                } else {
                    0.0
                },
            );
            stats.insert("avg_inference_time_ms".to_string(), avg_inference_time);
            stats.insert("uptime_seconds".to_string(), uptime.as_secs() as f64);
            stats.insert(
                "throughput_per_second".to_string(),
                if uptime.as_secs() > 0 {
                    self.inference_count as f64 / uptime.as_secs() as f64
                } else {
                    0.0
                },
            );
        }
        stats
    }

    fn export_prometheus_metrics(&self) -> String {
        let mut metrics = String::new();
        let stats = self.get_stats();

        metrics.push_str("# HELP fugue_inference_total Total number of inference runs\n");
        metrics.push_str("# TYPE fugue_inference_total counter\n");
        metrics.push_str(&format!(
            "fugue_inference_total {}\n",
            stats.get("inference_count").unwrap_or(&0.0)
        ));

        metrics.push_str("# HELP fugue_errors_total Total number of errors\n");
        metrics.push_str("# TYPE fugue_errors_total counter\n");
        metrics.push_str(&format!(
            "fugue_errors_total {}\n",
            stats.get("error_count").unwrap_or(&0.0)
        ));

        metrics.push_str(
            "# HELP fugue_inference_duration_ms Average inference duration in milliseconds\n",
        );
        metrics.push_str("# TYPE fugue_inference_duration_ms gauge\n");
        metrics.push_str(&format!(
            "fugue_inference_duration_ms {}\n",
            stats.get("avg_inference_time_ms").unwrap_or(&0.0)
        ));

        metrics.push_str("# HELP fugue_error_rate Error rate (errors/total inferences)\n");
        metrics.push_str("# TYPE fugue_error_rate gauge\n");
        metrics.push_str(&format!(
            "fugue_error_rate {}\n",
            stats.get("error_rate").unwrap_or(&0.0)
        ));

        metrics
    }
}

/// Production monitoring handler that integrates with metrics systems
struct MetricsHandler<H: Handler> {
    inner: H,
    metrics: Arc<std::sync::Mutex<ProductionMetrics>>,
    _model_name: String,
}

impl<H: Handler> MetricsHandler<H> {
    fn new(
        inner: H,
        metrics: Arc<std::sync::Mutex<ProductionMetrics>>,
        model_name: String,
    ) -> Self {
        Self {
            inner,
            metrics,
            _model_name: model_name,
        }
    }
}

impl<H: Handler> Handler for MetricsHandler<H> {
    fn on_sample_f64(&mut self, addr: &Address, dist: &dyn Distribution<f64>) -> f64 {
        let start = Instant::now();
        let result = self.inner.on_sample_f64(addr, dist);

        if let Ok(mut metrics) = self.metrics.lock() {
            metrics.record_inference_time(start.elapsed());
        }

        result
    }

    fn on_sample_bool(&mut self, addr: &Address, dist: &dyn Distribution<bool>) -> bool {
        self.inner.on_sample_bool(addr, dist)
    }

    fn on_sample_u64(&mut self, addr: &Address, dist: &dyn Distribution<u64>) -> u64 {
        self.inner.on_sample_u64(addr, dist)
    }

    fn on_sample_usize(&mut self, addr: &Address, dist: &dyn Distribution<usize>) -> usize {
        self.inner.on_sample_usize(addr, dist)
    }

    fn on_observe_f64(&mut self, addr: &Address, dist: &dyn Distribution<f64>, value: f64) {
        self.inner.on_observe_f64(addr, dist, value);
    }

    fn on_observe_bool(&mut self, addr: &Address, dist: &dyn Distribution<bool>, value: bool) {
        self.inner.on_observe_bool(addr, dist, value);
    }

    fn on_observe_u64(&mut self, addr: &Address, dist: &dyn Distribution<u64>, value: u64) {
        self.inner.on_observe_u64(addr, dist, value);
    }

    fn on_observe_usize(&mut self, addr: &Address, dist: &dyn Distribution<usize>, value: usize) {
        self.inner.on_observe_usize(addr, dist, value);
    }

    fn on_factor(&mut self, log_weight: f64) {
        self.inner.on_factor(log_weight);
    }

    fn finish(self) -> Trace {
        self.inner.finish()
    }
}

Metrics Collection:

  • Performance Metrics: Inference time, throughput, operation counts
  • Error Tracking: Error rates, timeout counts, failure categorization
  • System Health: Uptime, resource utilization, memory pool efficiency
  • Prometheus Integration: Standard metrics format for monitoring systems
  • Real-Time Dashboards: Live performance and health indicators

Health Checks and System Validation

Health monitoring implements continuous system validation through multi-level health checks with statistical thresholds and predictive alerting:

graph TD
    subgraph "Health Check Hierarchy"
        A[System Health H⁽ˢ⁾] --> B[Model Health H⁽ᵐ⁾]
        B --> C[Inference Health H⁽ⁱ⁾]
        C --> D[Resource Health H⁽ʳ⁾]

        A --> E{H⁽ˢ⁾ > θₛ?}
        B --> F{H⁽ᵐ⁾ > θₘ?}
        C --> G{H⁽ⁱ⁾ > θᵢ?}
        D --> H{H⁽ʳ⁾ > θʳ?}

        E -->|No| I[System Alert]
        F -->|No| J[Model Alert]
        G -->|No| K[Inference Alert]
        H -->|No| L[Resource Alert]

        E -->|Yes| M[System OK]
        F -->|Yes| M
        G -->|Yes| M
        H -->|Yes| M
    end

Health Score Calculation: Weighted combination of subsystem health:

where are importance weights and .

Predictive Health Modeling: Using time series forecasting:

Health Degradation Alert

Early Warning System: When for sustained periods, the system triggers preemptive scaling or graceful degradation before reaching critical thresholds.

#[derive(Debug, Clone)]
struct HealthCheckResult {
    status: HealthStatus,
    message: String,
    details: HashMap<String, String>,
    timestamp: SystemTime,
}

#[derive(Debug, Clone, PartialEq)]
enum HealthStatus {
    Healthy,
    Degraded,
    Unhealthy,
}

struct ProductionHealthChecker {
    model_config: ModelConfig,
    metrics: Arc<std::sync::Mutex<ProductionMetrics>>,
}

impl ProductionHealthChecker {
    fn new(config: ModelConfig, metrics: Arc<std::sync::Mutex<ProductionMetrics>>) -> Self {
        Self {
            model_config: config,
            metrics,
        }
    }

    fn run_health_check(&self) -> HealthCheckResult {
        let mut details = HashMap::new();
        let mut overall_status = HealthStatus::Healthy;
        let mut messages = Vec::new();

        // Check 1: Model execution health
        match self.check_model_execution() {
            Ok(duration) => {
                details.insert("model_execution".to_string(), "healthy".to_string());
                details.insert(
                    "execution_time_ms".to_string(),
                    format!("{:.1}", duration.as_millis()),
                );
            }
            Err(e) => {
                overall_status = HealthStatus::Unhealthy;
                messages.push(format!("Model execution failed: {}", e));
                details.insert("model_execution".to_string(), "failed".to_string());
            }
        }

        // Check 2: Memory usage
        if let Some(pool_stats) = self.check_memory_health() {
            let hit_ratio = pool_stats.hit_ratio();
            details.insert("memory_hit_ratio".to_string(), format!("{:.2}%", hit_ratio));

            if hit_ratio < 50.0 {
                overall_status = HealthStatus::Degraded;
                messages.push("Low memory pool hit ratio".to_string());
            }
        }

        // Check 3: Error rates
        if let Ok(metrics) = self.metrics.lock() {
            let stats = metrics.get_stats();
            let error_rate = stats.get("error_rate").unwrap_or(&0.0) * 100.0;
            details.insert(
                "error_rate_percent".to_string(),
                format!("{:.2}%", error_rate),
            );

            if error_rate > 5.0 {
                overall_status = HealthStatus::Degraded;
                messages.push(format!("High error rate: {:.1}%", error_rate));
            } else if error_rate > 20.0 {
                overall_status = HealthStatus::Unhealthy;
                messages.push(format!("Critical error rate: {:.1}%", error_rate));
            }

            let avg_time = stats.get("avg_inference_time_ms").unwrap_or(&0.0);
            details.insert(
                "avg_inference_time_ms".to_string(),
                format!("{:.1}", avg_time),
            );

            if *avg_time > self.model_config.max_inference_time_ms as f64 * 0.8 {
                overall_status = HealthStatus::Degraded;
                messages.push("Inference time approaching timeout threshold".to_string());
            }
        }

        // Check 4: System resources
        details.insert(
            "memory_pool_size".to_string(),
            self.model_config.memory_pool_size.to_string(),
        );
        details.insert(
            "circuit_breaker".to_string(),
            if self.model_config.enable_circuit_breaker {
                "enabled".to_string()
            } else {
                "disabled".to_string()
            },
        );

        let message = if messages.is_empty() {
            "All systems healthy".to_string()
        } else {
            messages.join("; ")
        };

        HealthCheckResult {
            status: overall_status,
            message,
            details,
            timestamp: SystemTime::now(),
        }
    }

    fn check_model_execution(&self) -> Result<Duration, String> {
        let start = Instant::now();
        let mut rng = thread_rng();

        // Run a simplified version of the model for health checking
        let health_model = || {
            prob!(
                let value <- sample(addr!("health_check"), Normal::new(0.0, 1.0).unwrap());
                pure(value)
            )
        };

        let handler = PriorHandler {
            rng: &mut rng,
            trace: Trace::default(),
        };

        match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            runtime::handler::run(handler, health_model())
        })) {
            Ok((result, trace)) => {
                if result.is_finite() && trace.total_log_weight().is_finite() {
                    Ok(start.elapsed())
                } else {
                    Err("Invalid model output".to_string())
                }
            }
            Err(_) => Err("Model execution panicked".to_string()),
        }
    }

    fn check_memory_health(&self) -> Option<fugue::runtime::memory::PoolStats> {
        // In a real implementation, this would check the actual memory pool
        // For demonstration, we'll create a temporary pool
        let pool = TracePool::new(10);
        Some(pool.stats().clone())
    }
}

Health Check Components:

  • Model Execution Health: Verifies core functionality with simplified tests
  • Memory Health: Monitors pool efficiency and memory usage patterns
  • Error Rate Analysis: Tracks and categorizes different failure modes
  • Performance Monitoring: Identifies degradation before it impacts users
  • Multi-Level Status: Healthy/Degraded/Unhealthy with detailed diagnostics

Input Validation and Security

Robust input validation prevents security vulnerabilities and system failures:

/// Secure input validator for production model parameters
struct InputValidator;

impl InputValidator {
    fn validate_temperature(temp: f64) -> Result<f64, String> {
        match temp {
            t if !t.is_finite() => Err("Temperature must be finite".to_string()),
            t if t < -50.0 => Err("Temperature too low (< -50°C)".to_string()),
            t if t > 100.0 => Err("Temperature too high (> 100°C)".to_string()),
            t => Ok(t),
        }
    }

    fn validate_probability(p: f64) -> Result<f64, String> {
        match p {
            p if !p.is_finite() => Err("Probability must be finite".to_string()),
            p if p < 0.0 => Err("Probability must be non-negative".to_string()),
            p if p > 1.0 => Err("Probability must not exceed 1.0".to_string()),
            p => Ok(p),
        }
    }

    fn _validate_sensor_reading(reading: f64) -> Result<f64, String> {
        match reading {
            r if !r.is_finite() => Err("Sensor reading must be finite".to_string()),
            r if r.abs() > 1000.0 => Err("Sensor reading out of reasonable range".to_string()),
            r => Ok(r),
        }
    }

    fn sanitize_address_component(component: &str) -> Result<String, String> {
        // Prevent injection attacks in address components
        if component
            .chars()
            .any(|c| !(c.is_alphanumeric() || c == '_' || c == '-'))
        {
            return Err("Address component contains invalid characters".to_string());
        }

        if component.len() > 50 {
            Err("Address component too long".to_string())
        } else if component.is_empty() {
            Err("Address component cannot be empty".to_string())
        } else {
            Ok(component.to_string())
        }
    }
}

/// Production model with comprehensive input validation
fn create_validated_model(
    temperature_reading: f64,
    sensor_id: &str,
    prior_prob: f64,
) -> Result<Model<(f64, bool)>, String> {
    // Validate all inputs before model creation
    let validated_temp = InputValidator::validate_temperature(temperature_reading)?;
    let validated_prob = InputValidator::validate_probability(prior_prob)?;
    let sanitized_sensor_id = InputValidator::sanitize_address_component(sensor_id)?;

    // Additional business logic validation
    if sanitized_sensor_id.starts_with("test_") && validated_prob > 0.5 {
        return Err("Test sensors cannot have high prior probability".to_string());
    }

    Ok(prob!(
        let true_temp <- sample(
            addr!("temperature"),
            Normal::new(validated_temp, 2.0).unwrap()
        );
        let is_working <- sample(
            addr!("sensor_working", sanitized_sensor_id.clone()),
            Bernoulli::new(validated_prob).unwrap()
        );

        // Safe observation with validated input
        observe(
            addr!("reading", sanitized_sensor_id),
            Normal::new(true_temp, if is_working { 0.5 } else { 5.0 }).unwrap(),
            validated_temp
        );

        pure((true_temp, is_working))
    ))
}

Security Measures:

  • Range Validation: Ensure parameters are within physically meaningful bounds
  • Type Safety: Validate all inputs before model construction
  • Sanitization: Clean address components to prevent injection attacks
  • Business Rule Enforcement: Domain-specific validation logic
  • Error Messages: Informative feedback without revealing system internals

Deployment Strategies and Patterns

Deployment strategies implement risk management through controlled rollout and statistical validation. Each strategy provides different risk-latency tradeoffs:

graph TD
    subgraph "Deployment Strategy Matrix"  
        A[New Model Version] --> B{Strategy Selection}
        B -->|Low Risk| C[Blue-Green]
        B -->|Medium Risk| D[Canary]
        B -->|High Risk| E[A/B Test]

        C --> F[Instant Switch<br/>Risk: High<br/>Rollback: Fast]
        D --> G[Gradual Rollout<br/>Risk: Medium<br/>Validation: Statistical]
        E --> H[Statistical Test<br/>Risk: Low<br/>Duration: Long]

        F --> I{Success?}
        G --> J{Performance > Baseline?}
        E --> K{Significance Test?}

        I -->|No| L[Instant Rollback]
        J -->|No| M[Gradual Rollback]
        K -->|No| N[Maintain Status Quo]

        I -->|Yes| O[Full Deployment]
        J -->|Yes| P[Continue Rollout]
        K -->|Yes| Q[Gradual Migration]
    end

Canary Analysis: Statistical significance testing for canary deployments:

A/B Testing: Welch's t-test for unequal variances:

/// Production deployment manager with different strategies
#[derive(Debug, Clone)]
enum DeploymentStrategy {
    BlueGreen,
    CanaryRelease { percentage: f64 },
    RollingUpdate,
    _ImmediateSwitch,
}

struct ModelDeploymentManager {
    current_model_version: String,
    candidate_model_version: String,
    deployment_strategy: DeploymentStrategy,
    _rollback_threshold_error_rate: f64,
}

impl ModelDeploymentManager {
    fn new(strategy: DeploymentStrategy) -> Self {
        Self {
            current_model_version: "v1.0.0".to_string(),
            candidate_model_version: "v1.1.0".to_string(),
            deployment_strategy: strategy,
            _rollback_threshold_error_rate: 0.05, // 5% error rate triggers rollback
        }
    }

    fn should_use_candidate_model(&self, request_id: u64) -> bool {
        match &self.deployment_strategy {
            DeploymentStrategy::BlueGreen => {
                // In blue-green, we typically switch all traffic at once
                // For demo, we'll use request ID to simulate the switch
                request_id % 100 < 10 // 10% to candidate for testing
            }
            DeploymentStrategy::CanaryRelease { percentage } => {
                let hash = request_id % 100;
                (hash as f64) < (*percentage * 100.0)
            }
            DeploymentStrategy::RollingUpdate => {
                // Gradual rollout based on some criteria
                request_id % 10 < 3 // 30% rollout
            }
            DeploymentStrategy::_ImmediateSwitch => true,
        }
    }

    fn create_model(&self, use_candidate: bool) -> impl Fn() -> Model<f64> {
        let version = if use_candidate {
            self.candidate_model_version.clone()
        } else {
            self.current_model_version.clone()
        };

        move || {
            if version.starts_with("v1.1") {
                // Candidate model with improved parameters
                prob!(
                    let value <- sample(addr!("improved_param"), Normal::new(0.0, 0.8).unwrap());
                    factor(0.1); // Slight preference for this model
                    pure(value)
                )
            } else {
                // Current stable model
                prob!(
                    let value <- sample(addr!("stable_param"), Normal::new(0.0, 1.0).unwrap());
                    pure(value)
                )
            }
        }
    }

    fn process_request(&self, request_id: u64) -> Result<(f64, String), String> {
        let use_candidate = self.should_use_candidate_model(request_id);
        let version = if use_candidate {
            &self.candidate_model_version
        } else {
            &self.current_model_version
        };

        let model = self.create_model(use_candidate);

        // Execute with error handling
        let mut rng = thread_rng();
        let handler = PriorHandler {
            rng: &mut rng,
            trace: Trace::default(),
        };

        match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            runtime::handler::run(handler, model())
        })) {
            Ok((result, trace)) => {
                if result.is_finite() && trace.total_log_weight().is_finite() {
                    Ok((result, version.clone()))
                } else {
                    Err(format!("Invalid result from model {}", version))
                }
            }
            Err(_) => Err(format!("Model {} panicked", version)),
        }
    }
}

Deployment Patterns:

  • Blue-Green Deployment: Instant traffic switching between model versions
  • Canary Releases: Gradual rollout to percentage of traffic for risk mitigation
  • Rolling Updates: Progressive deployment across infrastructure
  • A/B Testing: Compare model performance with statistical significance
  • Rollback Capability: Quick reversion to previous version on issues

Performance Optimization Patterns

Memory Management

use fugue::runtime::memory::{TracePool, PooledPriorHandler};

// Production memory management
let mut pool = TracePool::new(1000);
let handler = PooledPriorHandler::new(&mut rng, &mut pool);

Batch Processing

// Process multiple inference requests efficiently
struct BatchProcessor {
    pool: TracePool,
    batch_size: usize,
}

impl BatchProcessor {
    fn process_batch(&mut self, requests: Vec<InferenceRequest>) -> Vec<InferenceResult> {
        requests.into_iter().map(|req| {
            let handler = PooledPriorHandler::new(&mut req.rng, &mut self.pool);
            self.run_single_inference(handler, req.model)
        }).collect()
    }
}

Connection Pooling

// Database connection management for model parameters
struct ModelParameterStore {
    connection_pool: Arc<ConnectionPool>,
    parameter_cache: LruCache<String, ModelParameters>,
}

Monitoring Integration

Prometheus Metrics

// Export metrics in Prometheus format
fn export_metrics(metrics: &ProductionMetrics) -> String {
    format!(
        "# HELP fugue_inference_total Total inference operations\n\
         TYPE fugue_inference_total counter\n\
         fugue_inference_total {}\n\
         HELP fugue_error_rate Current error rate\n\
         TYPE fugue_error_rate gauge\n\
         fugue_error_rate {}\n",
        metrics.inference_count,
        metrics.error_rate()
    )
}

Structured Logging

use serde_json::json;

// Structured logging for production debugging
fn log_inference_event(
    request_id: &str,
    model_version: &str,
    duration: Duration,
    result: &InferenceResult
) {
    let log_entry = json!({
        "event": "inference_completed",
        "request_id": request_id,
        "model_version": model_version,
        "duration_ms": duration.as_millis(),
        "success": result.is_success(),
        "timestamp": SystemTime::now(),
    });
    println!("{}", log_entry);
}

Alert Rules

// Define alerting thresholds
struct AlertRules {
    max_error_rate: f64,
    max_latency_ms: u64,
    min_throughput_per_sec: f64,
}

impl AlertRules {
    fn check_alerts(&self, metrics: &ProductionMetrics) -> Vec<Alert> {
        let mut alerts = Vec::new();

        if metrics.error_rate() > self.max_error_rate {
            alerts.push(Alert::HighErrorRate(metrics.error_rate()));
        }

        if metrics.avg_latency().as_millis() > self.max_latency_ms as u128 {
            alerts.push(Alert::HighLatency(metrics.avg_latency()));
        }

        alerts
    }
}

Testing in Production

Shadow Mode Testing

// Run new model versions in shadow mode
struct ShadowTester {
    primary_model: Box<dyn Fn() -> Model<f64>>,
    shadow_model: Box<dyn Fn() -> Model<f64>>,
    comparison_rate: f64,
}

impl ShadowTester {
    fn run_with_shadow(&mut self, input: &Input) -> (PrimaryResult, Option<ShadowResult>) {
        let primary = self.run_primary(input);

        let shadow = if rand::random::<f64>() < self.comparison_rate {
            Some(self.run_shadow(input))
        } else {
            None
        };

        (primary, shadow)
    }
}

Production Validation

// Continuous validation in production
fn validate_model_assumptions(trace: &Trace) -> ValidationResult {
    let mut issues = Vec::new();

    // Check log-weight stability
    if !trace.total_log_weight().is_finite() {
        issues.push("Non-finite log-weight detected".to_string());
    }

    // Check parameter ranges
    for (addr, choice) in &trace.choices {
        if let ChoiceValue::F64(value) = choice.value {
            if value.abs() > 1000.0 {
                issues.push(format!("Extreme value at {}: {}", addr, value));
            }
        }
    }

    ValidationResult { issues }
}

Operational Excellence

Infrastructure as Code

# Kubernetes deployment example
apiVersion: apps/v1
kind: Deployment
metadata:
  name: fugue-inference-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: fugue-inference
  template:
    metadata:
      labels:
        app: fugue-inference
    spec:
      containers:
      - name: inference-service
        image: fugue-inference:v1.2.0
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 30
          periodSeconds: 10

Service Level Objectives (SLOs)

// Define and monitor SLOs
struct ServiceLevelObjectives {
    availability_target: f64,    // 99.9%
    latency_p99_ms: u64,        // 100ms
    error_rate_threshold: f64,   // 0.1%
}

impl ServiceLevelObjectives {
    fn evaluate_slo_compliance(&self, metrics: &ProductionMetrics) -> SLOReport {
        SLOReport {
            availability: self.calculate_availability(metrics),
            latency_compliance: metrics.p99_latency() <= Duration::from_millis(self.latency_p99_ms),
            error_rate_compliance: metrics.error_rate() <= self.error_rate_threshold,
        }
    }
}

Capacity Planning

// Capacity planning and auto-scaling
struct CapacityPlanner {
    target_cpu_utilization: f64,
    target_memory_utilization: f64,
    scale_up_threshold: f64,
    scale_down_threshold: f64,
}

impl CapacityPlanner {
    fn recommend_scaling(&self, current_metrics: &SystemMetrics) -> ScalingRecommendation {
        if current_metrics.cpu_utilization > self.scale_up_threshold {
            ScalingRecommendation::ScaleUp(self.calculate_scale_factor(current_metrics))
        } else if current_metrics.cpu_utilization < self.scale_down_threshold {
            ScalingRecommendation::ScaleDown(0.5)
        } else {
            ScalingRecommendation::NoAction
        }
    }
}

Security Best Practices

Input Sanitization

Always validate and sanitize inputs before processing:

  • Range checks for numerical parameters
  • Character filtering for string inputs
  • Business rule validation for domain constraints
  • Rate limiting to prevent abuse
  • Authentication and authorization for API access

Secret Management

// Secure configuration management
struct SecureConfig {
    database_url: SecretString,
    api_key: SecretString,
    model_parameters: ModelConfig,
}

impl SecureConfig {
    fn from_environment() -> Result<Self, ConfigError> {
        Ok(SecureConfig {
            database_url: env::var("DATABASE_URL")?.into(),
            api_key: env::var("API_KEY")?.into(),
            model_parameters: ModelConfig::from_file("model_config.toml")?,
        })
    }
}

Audit Logging

// Comprehensive audit trail
fn log_inference_request(
    user_id: &str,
    request: &InferenceRequest,
    response: &InferenceResponse,
) {
    let audit_log = AuditLogEntry {
        timestamp: SystemTime::now(),
        user_id: user_id.to_string(),
        action: "inference_request".to_string(),
        input_hash: hash_sensitive_data(&request.input),
        output_hash: hash_sensitive_data(&response.output),
        model_version: response.model_version.clone(),
        success: response.success,
    };

    audit_logger::log(audit_log);
}

Common Production Pitfalls

Memory Leaks

// Avoid: Creating new pools repeatedly
// for _ in 0..1000 {
//     let pool = TracePool::new(100); // Memory leak!
// }

// Do: Reuse pools across requests
let mut pool = TracePool::new(100);
for request in requests {
    let handler = PooledPriorHandler::new(&mut request.rng, &mut pool);
    process_request(handler, request);
}

Blocking Operations

// Avoid: Synchronous database calls in request handlers
// let result = database.query_sync(query); // Blocks event loop

// Do: Use async operations with proper timeouts
async fn process_request(request: Request) -> Result<Response, Error> {
    let timeout = Duration::from_millis(100);
    let result = tokio::time::timeout(timeout, database.query(query)).await??;
    Ok(result)
}

Error Propagation

// Avoid: Panicking on errors
// let value = risky_operation().unwrap(); // May crash service

// Do: Graceful error handling with fallbacks
let value = match risky_operation() {
    Ok(v) => v,
    Err(e) => {
        metrics.increment_error_count();
        log::warn!("Operation failed: {}, using fallback", e);
        fallback_value()
    }
};

Production Excellence Framework

Successful production deployment combines mathematical rigor with engineering excellence:

  1. Reliability Engineering: Fault tolerance through statistical modeling and circuit breaker patterns
  2. Performance Optimization: Memory pooling, numerical stability, and batch processing
  3. Observability: Multi-dimensional metrics with statistical process control
  4. Deployment Strategies: Risk-managed rollouts with statistical validation
  5. Health Monitoring: Predictive alerting and graceful degradation

These patterns enable robust production systems capable of handling real-world probabilistic computing at scale.

Production deployment represents the culmination of probabilistic programming excellence, where theoretical foundations meet operational reality. Fugue's comprehensive tooling transforms academic probabilistic models into production-grade systems that deliver reliable, scalable, and maintainable probabilistic computing solutions.