Production Deployment
- Error Handling and Graceful Degradation
- Configuration Management
- Production Metrics and Observability
- Health Checks and System Validation
- Input Validation and Security
- Deployment Strategies and Patterns
- Performance Optimization Patterns
- Monitoring Integration
- Testing in Production
- Operational Excellence
- Security Best Practices
- Common Production Pitfalls
Production deployment of probabilistic models requires reliability engineering, performance optimization, and operational excellence at scale. This guide establishes a mathematical framework for fault tolerance, service reliability, and system observability using Fugue's production-ready infrastructure patterns.
Production systems exhibit stochastic reliability characterized by:
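- Availability: $A = \frac{\text{MTBF}}{\text{MTBF} + \text{MTTR}}$, where MTBF = Mean Time Between Failures and MTTR = Mean Time To Repair
- Reliability Function: $R(t) = e^{-\lambda t}$ for a constant failure rate $\lambda$ (exponentially distributed time to failure)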
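- Service Level Agreement: $P(\text{latency} \le T_{\text{SLA}}) \ge 1 - \epsilon$, meaning at least a fraction $1 - \epsilon$ of requests complete within the agreed bound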
Fugue's deployment patterns optimize these metrics through systematic fault isolation and graceful degradation.
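As a rough illustration of these quantities, the sketch below computes steady-state availability as MTBF / (MTBF + MTTR), the exponential reliability function R(t) = exp(-λt), and the downtime a given availability target allows. The function names, the use of MTTR (Mean Time To Repair), and the 30-day month are assumptions made for the example; none of this is part of Fugue's API.

```rust
/// Steady-state availability: MTBF / (MTBF + MTTR).
/// Both arguments use the same time unit (hours here, by assumption).
fn availability(mtbf_hours: f64, mttr_hours: f64) -> f64 {
    mtbf_hours / (mtbf_hours + mttr_hours)
}

/// Probability of running for `t_hours` without a failure, assuming a
/// constant failure rate `lambda_per_hour` (exponential time to failure).
fn reliability(lambda_per_hour: f64, t_hours: f64) -> f64 {
    (-lambda_per_hour * t_hours).exp()
}

/// Downtime (in minutes) that an availability target allows over a
/// 30-day month. The 30-day window is an illustrative assumption.
fn allowed_downtime_minutes(availability_target: f64) -> f64 {
    (1.0 - availability_target) * 30.0 * 24.0 * 60.0
}
```

For example, an MTBF of 999 hours with an MTTR of 1 hour corresponds to 99.9% availability, which leaves roughly 43 minutes of downtime per 30-day month.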
Error Handling and Graceful Degradation
Graceful degradation implements fault tolerance through systematic error recovery and service continuity. The mathematical foundation relies on Markov reliability models and circuit breaker theory:
stateDiagram-v2
    [*] --> Healthy
    Healthy --> Degraded : Error Rate > τ₁
    Degraded --> Failed : Error Rate > τ₂
    Failed --> Recovery : Time > T_recovery
    Recovery --> Healthy : Success Rate > σ
    Degraded --> Healthy : Error Rate < τ₀
    note right of Healthy
        Error Rate: λ < τ₀
        SLA: 99.9%
        Full Functionality
    end note
    note right of Degraded
        Error Rate: τ₀ < λ < τ₁
        SLA: 95%
        Limited Functionality
    end note
    note right of Failed
        Error Rate: λ > τ₂
        Circuit Open
        Fallback Mode
    end note
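Circuit Breaker Mathematics: Failures are modeled as a Poisson process with rate $\lambda$. The circuit breaker transitions on the thresholds shown in the state diagram: the service leaves Healthy when the observed error rate exceeds $\tau_1$, the circuit opens when the rate exceeds $\tau_2$, and recovery is attempted once $T_{\text{recovery}}$ has elapsed.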
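Error Budget Model: For SLA target $S$, the error budget is the fraction of requests allowed to fail, $B = 1 - S$. With $S = 99.9\%$, for example, $B = 0.1\%$, or 1,000 failed requests per million.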
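A minimal sketch of error budget accounting, assuming the common definition that the budget is the fraction 1 - SLA of requests allowed to fail. `ErrorBudget` and its methods are hypothetical names for illustration and are not part of Fugue.

```rust
/// Tracks consumption of an error budget for a given SLA target.
/// Hypothetical helper for illustration only.
struct ErrorBudget {
    sla_target: f64, // e.g. 0.999 for "three nines"
    total_requests: u64,
    failed_requests: u64,
}

impl ErrorBudget {
    fn new(sla_target: f64) -> Self {
        assert!(
            (0.0..=1.0).contains(&sla_target),
            "SLA target must be a fraction in [0, 1]"
        );
        Self { sla_target, total_requests: 0, failed_requests: 0 }
    }

    /// Records the outcome of one request.
    fn record(&mut self, success: bool) {
        self.total_requests += 1;
        if !success {
            self.failed_requests += 1;
        }
    }

    /// Failures permitted so far: (1 - SLA) * total requests.
    fn allowed_failures(&self) -> f64 {
        (1.0 - self.sla_target) * self.total_requests as f64
    }

    /// Budget left; negative once the SLA has been violated.
    fn remaining(&self) -> f64 {
        self.allowed_failures() - self.failed_requests as f64
    }

    fn exhausted(&self) -> bool {
        self.remaining() < 0.0
    }
}
```

A deployment pipeline could, for instance, refuse risky rollouts while `exhausted()` returns true.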
/// Production-ready handler that gracefully handles failures
struct RobustProductionHandler<H: Handler> {
inner: H,
error_count: u32,
max_errors: u32,
_fallback_values: HashMap<String, ChoiceValue>,
circuit_breaker_open: bool,
}
impl<H: Handler> RobustProductionHandler<H> {
fn new(inner: H, max_errors: u32) -> Self {
let mut fallback_values = HashMap::new();
fallback_values.insert("default_f64".to_string(), ChoiceValue::F64(0.0));
fallback_values.insert("default_bool".to_string(), ChoiceValue::Bool(false));
fallback_values.insert("default_u64".to_string(), ChoiceValue::U64(0));
fallback_values.insert("default_usize".to_string(), ChoiceValue::Usize(0));
Self {
inner,
error_count: 0,
max_errors,
_fallback_values: fallback_values,
circuit_breaker_open: false,
}
}
fn handle_error(&mut self, operation: &str, addr: &Address) -> bool {
self.error_count += 1;
eprintln!("PRODUCTION ERROR: {} failed at address {}", operation, addr);
if self.error_count >= self.max_errors {
self.circuit_breaker_open = true;
eprintln!("CIRCUIT BREAKER: Too many errors, switching to fallback mode");
}
self.circuit_breaker_open
}
fn get_fallback_f64(&self, addr: &Address) -> f64 {
// In production, this might come from a cache, configuration, or ML model
match addr.0.as_str() {
s if s.contains("temperature") => 20.0,
s if s.contains("price") => 100.0,
s if s.contains("probability") => 0.5,
_ => 0.0,
}
}
}
impl<H: Handler> Handler for RobustProductionHandler<H> {
fn on_sample_f64(&mut self, addr: &Address, dist: &dyn Distribution<f64>) -> f64 {
if self.circuit_breaker_open {
return self.get_fallback_f64(addr);
}
match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
self.inner.on_sample_f64(addr, dist)
})) {
Ok(value) if value.is_finite() => value,
Ok(invalid_value) => {
eprintln!("Invalid f64 sample: {} at {}", invalid_value, addr);
self.handle_error("sample_f64", addr);
self.get_fallback_f64(addr)
}
Err(_) => {
self.handle_error("sample_f64_panic", addr);
self.get_fallback_f64(addr)
}
}
}
fn on_sample_bool(&mut self, addr: &Address, dist: &dyn Distribution<bool>) -> bool {
if self.circuit_breaker_open {
return false; // Safe fallback
}
match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
self.inner.on_sample_bool(addr, dist)
})) {
Ok(value) => value,
Err(_) => {
self.handle_error("sample_bool_panic", addr);
false
}
}
}
fn on_sample_u64(&mut self, addr: &Address, dist: &dyn Distribution<u64>) -> u64 {
if self.circuit_breaker_open {
return 1; // Safe default
}
match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
self.inner.on_sample_u64(addr, dist)
})) {
Ok(value) => value,
Err(_) => {
self.handle_error("sample_u64_panic", addr);
1
}
}
}
fn on_sample_usize(&mut self, addr: &Address, dist: &dyn Distribution<usize>) -> usize {
if self.circuit_breaker_open {
return 0; // Safe array index
}
match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
self.inner.on_sample_usize(addr, dist)
})) {
Ok(value) => value,
Err(_) => {
self.handle_error("sample_usize_panic", addr);
0
}
}
}
fn on_observe_f64(&mut self, addr: &Address, dist: &dyn Distribution<f64>, value: f64) {
if !self.circuit_breaker_open
&& std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
self.inner.on_observe_f64(addr, dist, value)
}))
.is_err()
{
self.handle_error("observe_f64_panic", addr);
}
}
fn on_observe_bool(&mut self, addr: &Address, dist: &dyn Distribution<bool>, value: bool) {
if !self.circuit_breaker_open
&& std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
self.inner.on_observe_bool(addr, dist, value)
}))
.is_err()
{
self.handle_error("observe_bool_panic", addr);
}
}
fn on_observe_u64(&mut self, addr: &Address, dist: &dyn Distribution<u64>, value: u64) {
if !self.circuit_breaker_open
&& std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
self.inner.on_observe_u64(addr, dist, value)
}))
.is_err()
{
self.handle_error("observe_u64_panic", addr);
}
}
fn on_observe_usize(&mut self, addr: &Address, dist: &dyn Distribution<usize>, value: usize) {
if !self.circuit_breaker_open
&& std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
self.inner.on_observe_usize(addr, dist, value)
}))
.is_err()
{
self.handle_error("observe_usize_panic", addr);
}
}
fn on_factor(&mut self, log_weight: f64) {
if !log_weight.is_finite() {
eprintln!("Invalid factor log-weight: {}", log_weight);
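self.error_count += 1;
// Invalid factors count toward the circuit breaker threshold as well
if self.error_count >= self.max_errors {
self.circuit_breaker_open = true;
}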
return; // Skip invalid factors
}
if !self.circuit_breaker_open {
self.inner.on_factor(log_weight);
}
}
fn finish(self) -> Trace {
println!("✅ Production handler statistics:");
println!(
" - Errors encountered: {}/{}",
self.error_count, self.max_errors
);
println!(
" - Circuit breaker status: {}",
if self.circuit_breaker_open {
"OPEN (fallback mode)"
} else {
"CLOSED (normal)"
}
);
if self.circuit_breaker_open {
// Return minimal valid trace in fallback mode
Trace::default()
} else {
self.inner.finish()
}
}
}
Robust Error Handling Features:
- Circuit Breaker Pattern: Prevents cascade failures by switching to fallback mode
- Panic Recovery: Catches panics and provides safe default values
- Output Validation: Rejects non-finite f64 samples and non-finite factor log-weights before they reach the trace
- Fallback Values: Domain-specific defaults for different parameter types
- Error Counting: Tracks error rates to trigger circuit breaker activation
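The handler above opens its circuit breaker permanently once the error threshold is reached, whereas the state diagram also includes a recovery path. One way to model that path is a three-state breaker (closed, open, half-open), sketched below. This is a standalone illustration under stated assumptions: `CircuitBreaker`, `BreakerState`, and the choice to pass the current time explicitly (to keep the logic testable) are hypothetical and not part of Fugue.

```rust
use std::time::{Duration, Instant};

#[derive(Debug, Clone, Copy, PartialEq)]
enum BreakerState {
    Closed,                  // normal operation
    Open { since: Instant }, // rejecting calls, serving fallbacks
    HalfOpen,                // recovery timeout elapsed, probing
}

struct CircuitBreaker {
    state: BreakerState,
    consecutive_errors: u32,
    max_errors: u32,
    recovery_timeout: Duration,
}

impl CircuitBreaker {
    fn new(max_errors: u32, recovery_timeout: Duration) -> Self {
        Self {
            state: BreakerState::Closed,
            consecutive_errors: 0,
            max_errors,
            recovery_timeout,
        }
    }

    /// Returns true if the protected call may proceed at time `now`.
    fn allow_request(&mut self, now: Instant) -> bool {
        if let BreakerState::Open { since } = self.state {
            if now.saturating_duration_since(since) >= self.recovery_timeout {
                // Let probe traffic through to test whether the fault cleared.
                self.state = BreakerState::HalfOpen;
            }
        }
        !matches!(self.state, BreakerState::Open { .. })
    }

    /// A success closes the breaker and resets the error streak.
    fn record_success(&mut self) {
        self.consecutive_errors = 0;
        self.state = BreakerState::Closed;
    }

    /// A failure while probing, or too many in a row, (re)opens the breaker.
    fn record_failure(&mut self, now: Instant) {
        self.consecutive_errors += 1;
        if self.state == BreakerState::HalfOpen
            || self.consecutive_errors >= self.max_errors
        {
            self.state = BreakerState::Open { since: now };
        }
    }
}
```

A handler wrapper could call `allow_request` before delegating to the inner handler and return its fallback value when the call is refused.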
Configuration Management
Production models require flexible configuration for different environments:
#[derive(Debug, Clone)]
struct ModelConfig {
// Model parameters
temperature_prior_mean: f64,
temperature_prior_std: f64,
validity_probability: f64,
sensor_noise_std: f64,
// Runtime configuration
max_inference_time_ms: u64,
memory_pool_size: usize,
enable_circuit_breaker: bool,
error_threshold: u32,
// Environment settings
environment: String, // "development", "staging", "production"
_log_level: String,
enable_metrics: bool,
}
impl Default for ModelConfig {
fn default() -> Self {
Self {
temperature_prior_mean: 20.0,
temperature_prior_std: 5.0,
validity_probability: 0.95,
sensor_noise_std: 1.0,
max_inference_time_ms: 1000,
memory_pool_size: 100,
enable_circuit_breaker: true,
error_threshold: 10,
environment: "production".to_string(),
_log_level: "info".to_string(),
enable_metrics: true,
}
}
}
struct ConfigurableModelRunner {
config: ModelConfig,
pool: TracePool,
metrics: ProductionMetrics,
}
impl ConfigurableModelRunner {
fn new(config: ModelConfig) -> Self {
Self {
pool: TracePool::new(config.memory_pool_size),
metrics: ProductionMetrics::new(config.enable_metrics),
config,
}
}
fn create_model(&self) -> Model<(f64, bool)> {
let config = self.config.clone();
prob!(
let temp <- sample(
addr!("temperature"),
Normal::new(config.temperature_prior_mean, config.temperature_prior_std).unwrap()
);
let valid <- sample(
addr!("valid"),
Bernoulli::new(config.validity_probability).unwrap()
);
// Simulate sensor reading with configured noise
observe(
addr!("sensor"),
Normal::new(temp, config.sensor_noise_std).unwrap(),
22.0
);
pure((temp, valid))
)
}
fn run_inference(&mut self) -> Result<(f64, bool), String> {
let start = Instant::now();
// Configure handler based on environment
let mut rng = thread_rng();
let model = self.create_model(); // Create model before borrowing
let result = if self.config.environment == "production" {
// Use safe, fault-tolerant execution in production
let base_handler = PooledPriorHandler::new(&mut rng, &mut self.pool);
let robust_handler =
RobustProductionHandler::new(base_handler, self.config.error_threshold);
let (result, _trace) = runtime::handler::run(robust_handler, model);
Ok(result)
} else {
// Use faster, less safe execution in development
let handler = PriorHandler {
rng: &mut rng,
trace: Trace::default(),
};
let (result, _trace) = runtime::handler::run(handler, model);
Ok(result)
};
let duration = start.elapsed();
self.metrics.record_inference_time(duration);
// Check timeout
if duration.as_millis() > self.config.max_inference_time_ms as u128 {
self.metrics.increment_timeout_count();
return Err(format!(
"Inference timeout: {}ms > {}ms",
duration.as_millis(),
self.config.max_inference_time_ms
));
}
result
}
}
Configuration Best Practices:
- Environment-Specific Settings: Different behavior for development/staging/production
- Model Parameter Configuration: Tunable priors, noise levels, and thresholds
- Runtime Configuration: Memory pool sizes, timeout limits, error thresholds
- Deployment Configuration: Circuit breaker settings, logging levels, metrics enablement
- Type-Safe Defaults: Sensible fallbacks for all configuration parameters
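Environment-specific settings are often supplied through environment variables with typed defaults. The sketch below shows one way to do that using only the standard library. The variable names (`FUGUE_MAX_INFERENCE_TIME_MS` and so on), the `RuntimeSettings` struct, and the `env_or` helper are assumptions for illustration; Fugue does not define them.

```rust
use std::env;
use std::str::FromStr;

/// Reads `key` from the environment and parses it, falling back to
/// `default` when the variable is unset. A value that is set but cannot
/// be parsed is reported as an error rather than silently ignored.
fn env_or<T: FromStr>(key: &str, default: T) -> Result<T, String> {
    match env::var(key) {
        Ok(raw) => raw
            .trim()
            .parse::<T>()
            .map_err(|_| format!("invalid value for {key}: {raw:?}")),
        Err(env::VarError::NotPresent) => Ok(default),
        Err(env::VarError::NotUnicode(_)) => Err(format!("{key} is not valid Unicode")),
    }
}

/// Hypothetical subset of the runtime configuration.
#[derive(Debug, Clone, PartialEq)]
struct RuntimeSettings {
    max_inference_time_ms: u64,
    memory_pool_size: usize,
    error_threshold: u32,
    environment: String,
}

impl RuntimeSettings {
    fn from_env() -> Result<Self, String> {
        let settings = Self {
            max_inference_time_ms: env_or("FUGUE_MAX_INFERENCE_TIME_MS", 1000)?,
            memory_pool_size: env_or("FUGUE_MEMORY_POOL_SIZE", 100)?,
            error_threshold: env_or("FUGUE_ERROR_THRESHOLD", 10)?,
            environment: env_or("FUGUE_ENVIRONMENT", "production".to_string())?,
        };
        // Reject unknown environment names early instead of at first use.
        let known = ["development", "staging", "production"];
        if !known.contains(&settings.environment.as_str()) {
            return Err(format!("unknown environment: {}", settings.environment));
        }
        Ok(settings)
    }
}
```

Failing fast on a malformed value is a deliberate choice here: a typo in a timeout setting should stop the deployment rather than fall back to a default unnoticed.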
Production Metrics and Observability
Observability requires systematic metric collection with statistical analysis and anomaly detection. The metric taxonomy follows the USE method (Utilization, Saturation, Errors) and RED method (Rate, Errors, Duration):
graph TD
    subgraph "Observability Architecture"
        A[Model Execution] --> B[Metric Collection]
        B --> C{Metric Type}
        C -->|USE| D[Resource Metrics]
        C -->|RED| E[Service Metrics]
        C -->|Business| F[Domain Metrics]
        D --> G[Utilization: ρ = λ/μ]
        D --> H[Saturation: Queue Length]
        D --> I[Error Rate: λₑ]
        E --> J[Request Rate: λᵣ]
        E --> K[Error Rate: εᵣ]
        E --> L[Duration: T₉₉]
        F --> M[Inference Accuracy]
        F --> N[Model Drift]
        F --> O[Business KPIs]
        G --> P[(Time Series DB)]
        H --> P
        I --> P
        J --> P
        K --> P
        L --> P
        M --> Q[(Analytics DB)]
        N --> Q
        O --> Q
    end
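Statistical Process Control: Metrics follow control chart theory with statistical control limits around the process mean $\mu$ and standard deviation $\sigma$: $\text{UCL} = \mu + 3\sigma$ and $\text{LCL} = \mu - 3\sigma$.
Anomaly Detection: Using exponentially weighted moving averages with smoothing factor $\alpha \in (0, 1]$: $\text{EWMA}_t = \alpha x_t + (1 - \alpha)\,\text{EWMA}_{t-1}$.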
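The two ideas combine naturally: keep an exponentially weighted mean, EWMA_t = α·x_t + (1 - α)·EWMA_{t-1}, together with an exponentially weighted variance, and flag observations that fall outside mean ± k·σ. The sketch below is one such detector. The `EwmaDetector` type, the incremental variance update, and the choice of k are assumptions for illustration, not something Fugue provides.

```rust
/// Exponentially weighted mean and variance with a k-sigma control check.
/// Hypothetical helper for illustration only.
struct EwmaDetector {
    alpha: f64, // smoothing factor in (0, 1]
    mean: f64,
    variance: f64,
    initialized: bool,
}

impl EwmaDetector {
    fn new(alpha: f64) -> Self {
        assert!(alpha > 0.0 && alpha <= 1.0, "alpha must be in (0, 1]");
        Self { alpha, mean: 0.0, variance: 0.0, initialized: false }
    }

    /// Feeds one observation. Returns true if it lies outside
    /// mean ± k·σ, where mean and σ reflect the state *before* this
    /// observation was incorporated.
    fn observe(&mut self, x: f64, k: f64) -> bool {
        if !self.initialized {
            // The first observation only seeds the estimate.
            self.mean = x;
            self.initialized = true;
            return false;
        }
        let diff = x - self.mean;
        let std_dev = self.variance.sqrt();
        // With zero variance there is no scale to judge against yet.
        let anomalous = std_dev > 0.0 && diff.abs() > k * std_dev;

        // Incremental updates of the weighted mean and variance.
        self.mean += self.alpha * diff;
        self.variance = (1.0 - self.alpha) * (self.variance + self.alpha * diff * diff);
        anomalous
    }
}
```

With α = 0.2 and k = 3, a latency series hovering around 10 ms does not trip the check, while a sudden 20 ms sample does. In practice the anomalous sample is still folded into the estimate, so a sustained shift eventually becomes the new baseline.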
#[derive(Debug, Clone)]
struct ProductionMetrics {
enabled: bool,
inference_count: u64,
error_count: u64,
timeout_count: u64,
total_inference_time: Duration,
start_time: SystemTime,
}
impl ProductionMetrics {
fn new(enabled: bool) -> Self {
Self {
enabled,
inference_count: 0,
error_count: 0,
timeout_count: 0,
total_inference_time: Duration::ZERO,
start_time: SystemTime::now(),
}
}
fn record_inference_time(&mut self, duration: Duration) {
if self.enabled {
self.inference_count += 1;
self.total_inference_time += duration;
}
}
fn _increment_error_count(&mut self) {
if self.enabled {
self.error_count += 1;
}
}
fn increment_timeout_count(&mut self) {
if self.enabled {
self.timeout_count += 1;
}
}
fn get_stats(&self) -> HashMap<String, f64> {
let mut stats = HashMap::new();
if self.enabled {
let uptime = self.start_time.elapsed().unwrap_or(Duration::ZERO);
let avg_inference_time = if self.inference_count > 0 {
self.total_inference_time.as_millis() as f64 / self.inference_count as f64
} else {
0.0
};
stats.insert("inference_count".to_string(), self.inference_count as f64);
stats.insert("error_count".to_string(), self.error_count as f64);
stats.insert("timeout_count".to_string(), self.timeout_count as f64);
stats.insert(
"error_rate".to_string(),
if self.inference_count > 0 {
self.error_count as f64 / self.inference_count as f64
} else {
0.0
},
);
stats.insert("avg_inference_time_ms".to_string(), avg_inference_time);
stats.insert("uptime_seconds".to_string(), uptime.as_secs() as f64);
stats.insert(
"throughput_per_second".to_string(),
if uptime.as_secs() > 0 {
self.inference_count as f64 / uptime.as_secs() as f64
} else {
0.0
},
);
}
stats
}
fn export_prometheus_metrics(&self) -> String {
let mut metrics = String::new();
let stats = self.get_stats();
metrics.push_str("# HELP fugue_inference_total Total number of inference runs\n");
metrics.push_str("# TYPE fugue_inference_total counter\n");
metrics.push_str(&format!(
"fugue_inference_total {}\n",
stats.get("inference_count").unwrap_or(&0.0)
));
metrics.push_str("# HELP fugue_errors_total Total number of errors\n");
metrics.push_str("# TYPE fugue_errors_total counter\n");
metrics.push_str(&format!(
"fugue_errors_total {}\n",
stats.get("error_count").unwrap_or(&0.0)
));
metrics.push_str(
"# HELP fugue_inference_duration_ms Average inference duration in milliseconds\n",
);
metrics.push_str("# TYPE fugue_inference_duration_ms gauge\n");
metrics.push_str(&format!(
"fugue_inference_duration_ms {}\n",
stats.get("avg_inference_time_ms").unwrap_or(&0.0)
));
metrics.push_str("# HELP fugue_error_rate Error rate (errors/total inferences)\n");
metrics.push_str("# TYPE fugue_error_rate gauge\n");
metrics.push_str(&format!(
"fugue_error_rate {}\n",
stats.get("error_rate").unwrap_or(&0.0)
));
metrics
}
}
/// Production monitoring handler that integrates with metrics systems
struct MetricsHandler<H: Handler> {
inner: H,
metrics: Arc<std::sync::Mutex<ProductionMetrics>>,
_model_name: String,
}
impl<H: Handler> MetricsHandler<H> {
fn new(
inner: H,
metrics: Arc<std::sync::Mutex<ProductionMetrics>>,
model_name: String,
) -> Self {
Self {
inner,
metrics,
_model_name: model_name,
}
}
}
impl<H: Handler> Handler for MetricsHandler<H> {
fn on_sample_f64(&mut self, addr: &Address, dist: &dyn Distribution<f64>) -> f64 {
let start = Instant::now();
let result = self.inner.on_sample_f64(addr, dist);
if let Ok(mut metrics) = self.metrics.lock() {
metrics.record_inference_time(start.elapsed());
}
result
}
fn on_sample_bool(&mut self, addr: &Address, dist: &dyn Distribution<bool>) -> bool {
self.inner.on_sample_bool(addr, dist)
}
fn on_sample_u64(&mut self, addr: &Address, dist: &dyn Distribution<u64>) -> u64 {
self.inner.on_sample_u64(addr, dist)
}
fn on_sample_usize(&mut self, addr: &Address, dist: &dyn Distribution<usize>) -> usize {
self.inner.on_sample_usize(addr, dist)
}
fn on_observe_f64(&mut self, addr: &Address, dist: &dyn Distribution<f64>, value: f64) {
self.inner.on_observe_f64(addr, dist, value);
}
fn on_observe_bool(&mut self, addr: &Address, dist: &dyn Distribution<bool>, value: bool) {
self.inner.on_observe_bool(addr, dist, value);
}
fn on_observe_u64(&mut self, addr: &Address, dist: &dyn Distribution<u64>, value: u64) {
self.inner.on_observe_u64(addr, dist, value);
}
fn on_observe_usize(&mut self, addr: &Address, dist: &dyn Distribution<usize>, value: usize) {
self.inner.on_observe_usize(addr, dist, value);
}
fn on_factor(&mut self, log_weight: f64) {
self.inner.on_factor(log_weight);
}
fn finish(self) -> Trace {
self.inner.finish()
}
}
Metrics Collection:
- Performance Metrics: Inference time, throughput, operation counts
- Error Tracking: Error rates, timeout counts, failure categorization
- System Health: Uptime, resource utilization, memory pool efficiency
- Prometheus Integration: Standard metrics format for monitoring systems
- Real-Time Dashboards: Live performance and health indicators
Health Checks and System Validation
Health monitoring implements continuous system validation through multi-level health checks with statistical thresholds and predictive alerting:
graph TD
    subgraph "Health Check Hierarchy"
        A[System Health H⁽ˢ⁾] --> B[Model Health H⁽ᵐ⁾]
        B --> C[Inference Health H⁽ⁱ⁾]
        C --> D[Resource Health H⁽ʳ⁾]
        A --> E{H⁽ˢ⁾ > θₛ?}
        B --> F{H⁽ᵐ⁾ > θₘ?}
        C --> G{H⁽ⁱ⁾ > θᵢ?}
        D --> H{H⁽ʳ⁾ > θʳ?}
        E -->|No| I[System Alert]
        F -->|No| J[Model Alert]
        G -->|No| K[Inference Alert]
        H -->|No| L[Resource Alert]
        E -->|Yes| M[System OK]
        F -->|Yes| M
        G -->|Yes| M
        H -->|Yes| M
    end
Health Score Calculation: Weighted combination of subsystem health: $H = \sum_i w_i H_i$,
where $w_i$ are importance weights and $\sum_i w_i = 1$.
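Predictive Health Modeling: Using time series forecasting, the health score $k$ steps ahead is estimated from recent history: $\hat{H}_{t+k} = f(H_t, H_{t-1}, \ldots)$.
Early Warning System: When the forecast $\hat{H}_{t+k}$ stays below a warning threshold $\theta_{\text{warning}}$ for sustained periods, the system triggers preemptive scaling or graceful degradation before reaching critical thresholds.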
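A weighted health score of this kind can be computed in a few lines. The sketch below takes (weight, score) pairs with scores in [0, 1] and normalizes the weights so they sum to 1. The function name and the input representation are assumptions for illustration.

```rust
/// Weighted health score: the sum of weight * score over all components,
/// divided by the total weight so the weights effectively sum to 1.
/// Each tuple is (weight, score) with weight >= 0 and score in [0, 1].
/// Returns None for empty input, non-positive total weight, or
/// out-of-range values (including NaN scores).
fn health_score(components: &[(f64, f64)]) -> Option<f64> {
    let total_weight: f64 = components.iter().map(|(w, _)| w).sum();
    let invalid = components
        .iter()
        .any(|(w, s)| *w < 0.0 || !(0.0..=1.0).contains(s));
    if components.is_empty() || invalid || !(total_weight > 0.0) {
        return None;
    }
    let weighted_sum: f64 = components.iter().map(|(w, s)| w * s).sum();
    Some(weighted_sum / total_weight)
}
```

For instance, weights 0.5, 0.3, and 0.2 with scores 1.0, 0.8, and 0.5 give an overall score of 0.84, which can then be compared against alert thresholds.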
#[derive(Debug, Clone)]
struct HealthCheckResult {
status: HealthStatus,
message: String,
details: HashMap<String, String>,
timestamp: SystemTime,
}
#[derive(Debug, Clone, PartialEq)]
enum HealthStatus {
Healthy,
Degraded,
Unhealthy,
}
struct ProductionHealthChecker {
model_config: ModelConfig,
metrics: Arc<std::sync::Mutex<ProductionMetrics>>,
}
impl ProductionHealthChecker {
fn new(config: ModelConfig, metrics: Arc<std::sync::Mutex<ProductionMetrics>>) -> Self {
Self {
model_config: config,
metrics,
}
}
fn run_health_check(&self) -> HealthCheckResult {
let mut details = HashMap::new();
let mut overall_status = HealthStatus::Healthy;
let mut messages = Vec::new();
// Check 1: Model execution health
match self.check_model_execution() {
Ok(duration) => {
details.insert("model_execution".to_string(), "healthy".to_string());
details.insert(
"execution_time_ms".to_string(),
format!("{:.1}", duration.as_millis()),
);
}
Err(e) => {
overall_status = HealthStatus::Unhealthy;
messages.push(format!("Model execution failed: {}", e));
details.insert("model_execution".to_string(), "failed".to_string());
}
}
// Check 2: Memory usage
if let Some(pool_stats) = self.check_memory_health() {
let hit_ratio = pool_stats.hit_ratio();
details.insert("memory_hit_ratio".to_string(), format!("{:.2}%", hit_ratio));
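if hit_ratio < 50.0 {
// Never downgrade an Unhealthy status set by an earlier check
if overall_status == HealthStatus::Healthy {
overall_status = HealthStatus::Degraded;
}
messages.push("Low memory pool hit ratio".to_string());
}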
}
// Check 3: Error rates
if let Ok(metrics) = self.metrics.lock() {
let stats = metrics.get_stats();
let error_rate = stats.get("error_rate").unwrap_or(&0.0) * 100.0;
details.insert(
"error_rate_percent".to_string(),
format!("{:.2}%", error_rate),
);
// Test the stricter threshold first; otherwise the 20% branch is unreachable
if error_rate > 20.0 {
overall_status = HealthStatus::Unhealthy;
messages.push(format!("Critical error rate: {:.1}%", error_rate));
} else if error_rate > 5.0 {
if overall_status == HealthStatus::Healthy {
overall_status = HealthStatus::Degraded;
}
messages.push(format!("High error rate: {:.1}%", error_rate));
}
let avg_time = stats.get("avg_inference_time_ms").unwrap_or(&0.0);
details.insert(
"avg_inference_time_ms".to_string(),
format!("{:.1}", avg_time),
);
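if *avg_time > self.model_config.max_inference_time_ms as f64 * 0.8 {
// Only escalate from Healthy; keep a more severe status if already set
if overall_status == HealthStatus::Healthy {
overall_status = HealthStatus::Degraded;
}
messages.push("Inference time approaching timeout threshold".to_string());
}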
}
// Check 4: System resources
details.insert(
"memory_pool_size".to_string(),
self.model_config.memory_pool_size.to_string(),
);
details.insert(
"circuit_breaker".to_string(),
if self.model_config.enable_circuit_breaker {
"enabled".to_string()
} else {
"disabled".to_string()
},
);
let message = if messages.is_empty() {
"All systems healthy".to_string()
} else {
messages.join("; ")
};
HealthCheckResult {
status: overall_status,
message,
details,
timestamp: SystemTime::now(),
}
}
fn check_model_execution(&self) -> Result<Duration, String> {
let start = Instant::now();
let mut rng = thread_rng();
// Run a simplified version of the model for health checking
let health_model = || {
prob!(
let value <- sample(addr!("health_check"), Normal::new(0.0, 1.0).unwrap());
pure(value)
)
};
let handler = PriorHandler {
rng: &mut rng,
trace: Trace::default(),
};
match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
runtime::handler::run(handler, health_model())
})) {
Ok((result, trace)) => {
if result.is_finite() && trace.total_log_weight().is_finite() {
Ok(start.elapsed())
} else {
Err("Invalid model output".to_string())
}
}
Err(_) => Err("Model execution panicked".to_string()),
}
}
fn check_memory_health(&self) -> Option<fugue::runtime::memory::PoolStats> {
// In a real implementation, this would check the actual memory pool
// For demonstration, we'll create a temporary pool
let pool = TracePool::new(10);
Some(pool.stats().clone())
}
}
Health Check Components:
- Model Execution Health: Verifies core functionality with simplified tests
- Memory Health: Monitors pool efficiency and memory usage patterns
- Error Rate Analysis: Tracks and categorizes different failure modes
- Performance Monitoring: Identifies degradation before it impacts users
- Multi-Level Status: Healthy/Degraded/Unhealthy with detailed diagnostics
Input Validation and Security
Robust input validation prevents security vulnerabilities and system failures:
/// Secure input validator for production model parameters
struct InputValidator;
impl InputValidator {
fn validate_temperature(temp: f64) -> Result<f64, String> {
match temp {
t if !t.is_finite() => Err("Temperature must be finite".to_string()),
t if t < -50.0 => Err("Temperature too low (< -50°C)".to_string()),
t if t > 100.0 => Err("Temperature too high (> 100°C)".to_string()),
t => Ok(t),
}
}
fn validate_probability(p: f64) -> Result<f64, String> {
match p {
p if !p.is_finite() => Err("Probability must be finite".to_string()),
p if p < 0.0 => Err("Probability must be non-negative".to_string()),
p if p > 1.0 => Err("Probability must not exceed 1.0".to_string()),
p => Ok(p),
}
}
fn _validate_sensor_reading(reading: f64) -> Result<f64, String> {
match reading {
r if !r.is_finite() => Err("Sensor reading must be finite".to_string()),
r if r.abs() > 1000.0 => Err("Sensor reading out of reasonable range".to_string()),
r => Ok(r),
}
}
fn sanitize_address_component(component: &str) -> Result<String, String> {
// Prevent injection attacks in address components
if component
.chars()
.any(|c| !(c.is_alphanumeric() || c == '_' || c == '-'))
{
return Err("Address component contains invalid characters".to_string());
}
if component.len() > 50 {
Err("Address component too long".to_string())
} else if component.is_empty() {
Err("Address component cannot be empty".to_string())
} else {
Ok(component.to_string())
}
}
}
/// Production model with comprehensive input validation
fn create_validated_model(
temperature_reading: f64,
sensor_id: &str,
prior_prob: f64,
) -> Result<Model<(f64, bool)>, String> {
// Validate all inputs before model creation
let validated_temp = InputValidator::validate_temperature(temperature_reading)?;
let validated_prob = InputValidator::validate_probability(prior_prob)?;
let sanitized_sensor_id = InputValidator::sanitize_address_component(sensor_id)?;
// Additional business logic validation
if sanitized_sensor_id.starts_with("test_") && validated_prob > 0.5 {
return Err("Test sensors cannot have high prior probability".to_string());
}
Ok(prob!(
let true_temp <- sample(
addr!("temperature"),
Normal::new(validated_temp, 2.0).unwrap()
);
let is_working <- sample(
addr!("sensor_working", sanitized_sensor_id.clone()),
Bernoulli::new(validated_prob).unwrap()
);
// Safe observation with validated input
observe(
addr!("reading", sanitized_sensor_id),
Normal::new(true_temp, if is_working { 0.5 } else { 5.0 }).unwrap(),
validated_temp
);
pure((true_temp, is_working))
))
}
Security Measures:
- Range Validation: Ensure parameters are within physically meaningful bounds
- Type Safety: Validate all inputs before model construction
- Sanitization: Clean address components to prevent injection attacks
- Business Rule Enforcement: Domain-specific validation logic
- Error Messages: Informative feedback without revealing system internals
Deployment Strategies and Patterns
Deployment strategies implement risk management through controlled rollout and statistical validation. Each strategy provides different risk-latency tradeoffs:
graph TD
    subgraph "Deployment Strategy Matrix"
        A[New Model Version] --> B{Strategy Selection}
        B -->|Low Risk| C[Blue-Green]
        B -->|Medium Risk| D[Canary]
        B -->|High Risk| E[A/B Test]
        C --> F[Instant Switch<br/>Risk: High<br/>Rollback: Fast]
        D --> G[Gradual Rollout<br/>Risk: Medium<br/>Validation: Statistical]
        E --> H[Statistical Test<br/>Risk: Low<br/>Duration: Long]
        F --> I{Success?}
        G --> J{Performance > Baseline?}
        E --> K{Significance Test?}
        I -->|No| L[Instant Rollback]
        J -->|No| M[Gradual Rollback]
        K -->|No| N[Maintain Status Quo]
        I -->|Yes| O[Full Deployment]
        J -->|Yes| P[Continue Rollout]
        K -->|Yes| Q[Gradual Migration]
    end
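Canary Analysis: Statistical significance testing for canary deployments compares the canary against the baseline on a monitored metric, testing $H_0: \mu_{\text{canary}} = \mu_{\text{baseline}}$ against $H_1: \mu_{\text{canary}} \neq \mu_{\text{baseline}}$.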
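A/B Testing: Welch's t-test for unequal variances: $t = \dfrac{\bar{X}_1 - \bar{X}_2}{\sqrt{s_1^2 / n_1 + s_2^2 / n_2}}$, where $\bar{X}_j$, $s_j^2$, and $n_j$ are the sample mean, sample variance, and sample size of variant $j$.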
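To make the A/B comparison concrete, the sketch below computes Welch's t statistic, the difference in sample means divided by sqrt(s₁²/n₁ + s₂²/n₂), along with the Welch-Satterthwaite degrees of freedom. It stops short of computing a p-value, which would need a Student-t CDF from a statistics crate. The function names are illustrative assumptions.

```rust
/// Sample mean and unbiased sample variance; needs at least two values.
fn mean_and_variance(xs: &[f64]) -> Option<(f64, f64)> {
    if xs.len() < 2 {
        return None;
    }
    let n = xs.len() as f64;
    let mean = xs.iter().sum::<f64>() / n;
    let variance = xs.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / (n - 1.0);
    Some((mean, variance))
}

/// Welch's t-test for two samples with possibly unequal variances.
/// Returns (t statistic, Welch-Satterthwaite degrees of freedom), or
/// None if a sample is too small or both variances are zero.
fn welch_t_test(a: &[f64], b: &[f64]) -> Option<(f64, f64)> {
    let (mean_a, var_a) = mean_and_variance(a)?;
    let (mean_b, var_b) = mean_and_variance(b)?;
    let (n_a, n_b) = (a.len() as f64, b.len() as f64);

    // Squared standard errors of each sample mean.
    let se_a = var_a / n_a;
    let se_b = var_b / n_b;
    let se_sq = se_a + se_b;
    if se_sq == 0.0 {
        return None;
    }

    let t = (mean_a - mean_b) / se_sq.sqrt();
    let df = se_sq.powi(2) / (se_a.powi(2) / (n_a - 1.0) + se_b.powi(2) / (n_b - 1.0));
    Some((t, df))
}
```

The returned statistic is compared against a critical value of the t distribution with `df` degrees of freedom at the chosen significance level.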
/// Production deployment manager with different strategies
#[derive(Debug, Clone)]
enum DeploymentStrategy {
BlueGreen,
CanaryRelease { percentage: f64 },
RollingUpdate,
_ImmediateSwitch,
}
struct ModelDeploymentManager {
current_model_version: String,
candidate_model_version: String,
deployment_strategy: DeploymentStrategy,
_rollback_threshold_error_rate: f64,
}
impl ModelDeploymentManager {
fn new(strategy: DeploymentStrategy) -> Self {
Self {
current_model_version: "v1.0.0".to_string(),
candidate_model_version: "v1.1.0".to_string(),
deployment_strategy: strategy,
_rollback_threshold_error_rate: 0.05, // 5% error rate triggers rollback
}
}
fn should_use_candidate_model(&self, request_id: u64) -> bool {
match &self.deployment_strategy {
DeploymentStrategy::BlueGreen => {
// In blue-green, we typically switch all traffic at once
// For demo, we'll use request ID to simulate the switch
request_id % 100 < 10 // 10% to candidate for testing
}
DeploymentStrategy::CanaryRelease { percentage } => {
let hash = request_id % 100;
(hash as f64) < (*percentage * 100.0)
}
DeploymentStrategy::RollingUpdate => {
// Gradual rollout based on some criteria
request_id % 10 < 3 // 30% rollout
}
DeploymentStrategy::_ImmediateSwitch => true,
}
}
fn create_model(&self, use_candidate: bool) -> impl Fn() -> Model<f64> {
let version = if use_candidate {
self.candidate_model_version.clone()
} else {
self.current_model_version.clone()
};
move || {
if version.starts_with("v1.1") {
// Candidate model with improved parameters
prob!(
let value <- sample(addr!("improved_param"), Normal::new(0.0, 0.8).unwrap());
factor(0.1); // Slight preference for this model
pure(value)
)
} else {
// Current stable model
prob!(
let value <- sample(addr!("stable_param"), Normal::new(0.0, 1.0).unwrap());
pure(value)
)
}
}
}
fn process_request(&self, request_id: u64) -> Result<(f64, String), String> {
let use_candidate = self.should_use_candidate_model(request_id);
let version = if use_candidate {
&self.candidate_model_version
} else {
&self.current_model_version
};
let model = self.create_model(use_candidate);
// Execute with error handling
let mut rng = thread_rng();
let handler = PriorHandler {
rng: &mut rng,
trace: Trace::default(),
};
match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
runtime::handler::run(handler, model())
})) {
Ok((result, trace)) => {
if result.is_finite() && trace.total_log_weight().is_finite() {
Ok((result, version.clone()))
} else {
Err(format!("Invalid result from model {}", version))
}
}
Err(_) => Err(format!("Model {} panicked", version)),
}
}
}
Deployment Patterns:
- Blue-Green Deployment: Instant traffic switching between model versions
- Canary Releases: Gradual rollout to percentage of traffic for risk mitigation
- Rolling Updates: Progressive deployment across infrastructure
- A/B Testing: Compare model performance with statistical significance
- Rollback Capability: Quick reversion to previous version on issues
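The deployment manager above stores a rollback threshold but never evaluates it. A minimal sketch of how such a threshold might drive a rollback decision follows, with a minimum sample size so that a handful of early failures does not trigger a rollback. `CanaryMonitor`, `RolloutDecision`, and the minimum-request guard are assumptions for illustration.

```rust
#[derive(Debug, PartialEq)]
enum RolloutDecision {
    Continue,
    Rollback,
    InsufficientData,
}

/// Tracks candidate-model outcomes and compares the observed error rate
/// against a rollback threshold. Hypothetical helper for illustration.
struct CanaryMonitor {
    requests: u64,
    errors: u64,
    rollback_threshold_error_rate: f64,
    min_requests: u64,
}

impl CanaryMonitor {
    fn new(rollback_threshold_error_rate: f64, min_requests: u64) -> Self {
        Self { requests: 0, errors: 0, rollback_threshold_error_rate, min_requests }
    }

    /// Records the outcome of one request served by the candidate model.
    fn record(&mut self, success: bool) {
        self.requests += 1;
        if !success {
            self.errors += 1;
        }
    }

    fn decision(&self) -> RolloutDecision {
        if self.requests < self.min_requests {
            // Too few samples for the error rate to be meaningful.
            return RolloutDecision::InsufficientData;
        }
        let error_rate = self.errors as f64 / self.requests as f64;
        if error_rate > self.rollback_threshold_error_rate {
            RolloutDecision::Rollback
        } else {
            RolloutDecision::Continue
        }
    }
}
```

Feeding the `Result` of each candidate request into `record` and polling `decision` periodically would give the manager a simple automated rollback trigger.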
Performance Optimization Patterns
Memory Management
use fugue::runtime::memory::{PooledPriorHandler, TracePool};
use rand::thread_rng;
// Production memory management: reuse trace allocations across inference runs
let mut rng = thread_rng();
let mut pool = TracePool::new(1000);
let handler = PooledPriorHandler::new(&mut rng, &mut pool);
Batch Processing
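// Process multiple inference requests efficiently.
// `InferenceRequest` and `InferenceResult` are application-defined types.
struct BatchProcessor {
    pool: TracePool,
    batch_size: usize,
}
impl BatchProcessor {
    fn process_batch(&mut self, requests: Vec<InferenceRequest>) -> Vec<InferenceResult> {
        let mut results = Vec::with_capacity(requests.len());
        for mut req in requests {
            // The handler mutably borrows the pool, so the inference helper is an
            // associated function that does not borrow `self` a second time.
            let handler = PooledPriorHandler::new(&mut req.rng, &mut self.pool);
            results.push(Self::run_single_inference(handler, req.model));
        }
        results
    }
}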
Connection Pooling
// Database connection management for model parameters
struct ModelParameterStore {
connection_pool: Arc<ConnectionPool>,
parameter_cache: LruCache<String, ModelParameters>,
}
Monitoring Integration
Prometheus Metrics
// Export metrics in Prometheus text exposition format
fn export_metrics(metrics: &ProductionMetrics) -> String {
    // `ProductionMetrics` exposes derived values such as the error rate via `get_stats()`
    let stats = metrics.get_stats();
    format!(
        "# HELP fugue_inference_total Total inference operations\n\
         # TYPE fugue_inference_total counter\n\
         fugue_inference_total {}\n\
         # HELP fugue_error_rate Current error rate\n\
         # TYPE fugue_error_rate gauge\n\
         fugue_error_rate {}\n",
        metrics.inference_count,
        stats.get("error_rate").copied().unwrap_or(0.0)
    )
}
Structured Logging
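use serde_json::json;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
// Structured logging for production debugging.
// `InferenceResult` is an application-defined type.
fn log_inference_event(
    request_id: &str,
    model_version: &str,
    duration: Duration,
    result: &InferenceResult
) {
    // Seconds since the Unix epoch; 0 if the system clock is set before the epoch
    let timestamp = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or(0);
    let log_entry = json!({
        "event": "inference_completed",
        "request_id": request_id,
        "model_version": model_version,
        "duration_ms": duration.as_millis() as u64,
        "success": result.is_success(),
        "timestamp": timestamp,
    });
    println!("{}", log_entry);
}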
Alert Rules
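// Define alerting thresholds
#[derive(Debug, Clone, PartialEq)]
enum Alert {
    HighErrorRate(f64),
    HighLatency(f64), // average latency in milliseconds
    LowThroughput(f64),
}
struct AlertRules {
    max_error_rate: f64,
    max_latency_ms: u64,
    min_throughput_per_sec: f64,
}
impl AlertRules {
    fn check_alerts(&self, metrics: &ProductionMetrics) -> Vec<Alert> {
        // Derived values come from `ProductionMetrics::get_stats()`
        let stats = metrics.get_stats();
        let stat = |key: &str| stats.get(key).copied().unwrap_or(0.0);
        let mut alerts = Vec::new();
        if stat("error_rate") > self.max_error_rate {
            alerts.push(Alert::HighErrorRate(stat("error_rate")));
        }
        if stat("avg_inference_time_ms") > self.max_latency_ms as f64 {
            alerts.push(Alert::HighLatency(stat("avg_inference_time_ms")));
        }
        // Throughput is only meaningful once some uptime has accumulated
        if stat("uptime_seconds") > 0.0 && stat("throughput_per_second") < self.min_throughput_per_sec {
            alerts.push(Alert::LowThroughput(stat("throughput_per_second")));
        }
        alerts
    }
}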
Testing in Production
Shadow Mode Testing
// Run new model versions in shadow mode
struct ShadowTester {
primary_model: Box<dyn Fn() -> Model<f64>>,
shadow_model: Box<dyn Fn() -> Model<f64>>,
comparison_rate: f64,
}
impl ShadowTester {
fn run_with_shadow(&mut self, input: &Input) -> (PrimaryResult, Option<ShadowResult>) {
let primary = self.run_primary(input);
let shadow = if rand::random::<f64>() < self.comparison_rate {
Some(self.run_shadow(input))
} else {
None
};
(primary, shadow)
}
}
Production Validation
// Continuous validation in production
fn validate_model_assumptions(trace: &Trace) -> ValidationResult {
let mut issues = Vec::new();
// Check log-weight stability
if !trace.total_log_weight().is_finite() {
issues.push("Non-finite log-weight detected".to_string());
}
// Check parameter ranges
for (addr, choice) in &trace.choices {
if let ChoiceValue::F64(value) = choice.value {
if value.abs() > 1000.0 {
issues.push(format!("Extreme value at {}: {}", addr, value));
}
}
}
ValidationResult { issues }
}
Operational Excellence
Infrastructure as Code
# Kubernetes deployment example
apiVersion: apps/v1
kind: Deployment
metadata:
  name: fugue-inference-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: fugue-inference
  template:
    metadata:
      labels:
        app: fugue-inference
    spec:
      containers:
        - name: inference-service
          image: fugue-inference:v1.2.0
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "500m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 30
            periodSeconds: 10
Service Level Objectives (SLOs)
// Define and monitor SLOs
struct ServiceLevelObjectives {
availability_target: f64, // assumed to be a fraction, e.g. 0.999 for 99.9%
latency_p99_ms: u64, // milliseconds, e.g. 100
error_rate_threshold: f64, // assumed to be a fraction, e.g. 0.001 for 0.1%
}
impl ServiceLevelObjectives {
fn evaluate_slo_compliance(&self, metrics: &ProductionMetrics) -> SLOReport {
SLOReport {
availability: self.calculate_availability(metrics),
latency_compliance: metrics.p99_latency() <= Duration::from_millis(self.latency_p99_ms),
error_rate_compliance: metrics.error_rate() <= self.error_rate_threshold,
}
}
}
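`calculate_availability` is not defined in the snippet above. One common request-based definition is the fraction of successful requests in the evaluation window, with the error budget being the number of failures the window can absorb before the target is missed. The sketch below assumes that definition; `WindowCounts` and the function names are hypothetical.

```rust
/// Request counts over one SLO evaluation window (hypothetical type).
#[derive(Debug, Clone, Copy)]
pub struct WindowCounts {
    pub total: u64,
    pub failed: u64,
}

/// Request-based availability: the fraction of requests that succeeded.
/// An empty window is reported as fully available.
pub fn availability(counts: WindowCounts) -> f64 {
    if counts.total == 0 {
        return 1.0;
    }
    let failed = counts.failed.min(counts.total);
    (counts.total - failed) as f64 / counts.total as f64
}

/// Number of failed requests the window can absorb while still meeting
/// `target`, where `target` is a fraction such as 0.999.
pub fn error_budget(counts: WindowCounts, target: f64) -> f64 {
    (1.0 - target) * counts.total as f64
}

/// Fraction of the error budget still unspent. Negative values mean the
/// SLO is already violated for this window.
pub fn budget_remaining(counts: WindowCounts, target: f64) -> f64 {
    let budget = error_budget(counts, target);
    if budget <= 0.0 {
        // A zero budget (target of 1.0 or an empty window) tolerates no failures.
        return if counts.failed == 0 { 1.0 } else { f64::NEG_INFINITY };
    }
    1.0 - counts.failed as f64 / budget
}
```

For example, a window of 1,000,000 requests with a 99.9% target has a budget of about 1,000 failures, so 250 failures leave roughly three quarters of the budget unspent.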
Capacity Planning
// Capacity planning and auto-scaling
struct CapacityPlanner {
target_cpu_utilization: f64,
target_memory_utilization: f64,
scale_up_threshold: f64,
scale_down_threshold: f64,
}
impl CapacityPlanner {
fn recommend_scaling(&self, current_metrics: &SystemMetrics) -> ScalingRecommendation {
if current_metrics.cpu_utilization > self.scale_up_threshold {
ScalingRecommendation::ScaleUp(self.calculate_scale_factor(current_metrics))
} else if current_metrics.cpu_utilization < self.scale_down_threshold {
ScalingRecommendation::ScaleDown(0.5)
} else {
ScalingRecommendation::NoAction
}
}
}
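The planner above calls `calculate_scale_factor` without showing it, and never uses `target_cpu_utilization`. A minimal sketch of one plausible implementation follows, using the proportional rule that the Kubernetes Horizontal Pod Autoscaler also applies (desired replicas = ceil(current replicas × observed / target)). The function names and the replica bounds are assumptions for illustration, not Fugue APIs.

```rust
/// Ratio of observed to target CPU utilization. Values above 1.0 suggest
/// scaling up, values below 1.0 suggest scaling down.
pub fn scale_factor(cpu_utilization: f64, target_cpu_utilization: f64) -> f64 {
    assert!(
        target_cpu_utilization > 0.0,
        "target utilization must be positive"
    );
    cpu_utilization / target_cpu_utilization
}

/// Replica count suggested by the proportional rule, clamped to
/// `[min_replicas, max_replicas]`. Panics if `min_replicas > max_replicas`.
pub fn desired_replicas(
    current_replicas: u32,
    cpu_utilization: f64,
    target_cpu_utilization: f64,
    min_replicas: u32,
    max_replicas: u32,
) -> u32 {
    let factor = scale_factor(cpu_utilization, target_cpu_utilization);
    // Round up so that a fractional need for capacity adds a whole replica.
    let raw = (f64::from(current_replicas) * factor).ceil();
    // Float-to-int `as` casts saturate, so NaN or negative values become 0.
    (raw as u32).clamp(min_replicas, max_replicas)
}
```

Scaling in proportion to the distance from the target avoids the fixed `0.5` step used for scale-down above, which can overshoot when utilization is only slightly below the threshold.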
Security Best Practices
Input Sanitization
Always validate and sanitize inputs before processing:
- Range checks for numerical parameters
- Character filtering for string inputs
- Business rule validation for domain constraints
- Rate limiting to prevent abuse
- Authentication and authorization for API access
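The rate-limiting item in the list above is often implemented as a token bucket. The sketch below is a single-threaded, standard-library-only illustration. `TokenBucket` is a hypothetical type, and the caller passes the current time explicitly so the behavior can be tested deterministically.

```rust
use std::time::Instant;

/// Hypothetical token-bucket limiter: allows bursts of up to `capacity`
/// requests and a sustained rate of `refill_per_sec` requests per second.
pub struct TokenBucket {
    capacity: f64,
    tokens: f64,
    refill_per_sec: f64,
    last_refill: Instant,
}

impl TokenBucket {
    /// Create a bucket that starts full.
    pub fn new(capacity: u32, refill_per_sec: f64, now: Instant) -> Self {
        TokenBucket {
            capacity: f64::from(capacity),
            tokens: f64::from(capacity),
            refill_per_sec,
            last_refill: now,
        }
    }

    /// Try to consume one token at time `now`. Returns `true` if the
    /// request is admitted and `false` if it should be rejected.
    pub fn try_acquire(&mut self, now: Instant) -> bool {
        // Refill in proportion to elapsed time, never exceeding capacity.
        let elapsed = now
            .saturating_duration_since(self.last_refill)
            .as_secs_f64();
        self.tokens = (self.tokens + elapsed * self.refill_per_sec).min(self.capacity);
        // Only move the refill clock forward, never backward.
        if now > self.last_refill {
            self.last_refill = now;
        }
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }
}
```

In a multi-threaded service the bucket would need to sit behind a `Mutex` or be kept per worker, and a production limiter would typically hold one bucket per client or API key.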
Secret Management
// Secure configuration management
struct SecureConfig {
database_url: SecretString,
api_key: SecretString,
model_parameters: ModelConfig,
}
impl SecureConfig {
fn from_environment() -> Result<Self, ConfigError> {
Ok(SecureConfig {
database_url: env::var("DATABASE_URL")?.into(),
api_key: env::var("API_KEY")?.into(),
model_parameters: ModelConfig::from_file("model_config.toml")?,
})
}
}
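The value of a wrapper such as `SecretString` is that a secret cannot leak through logging or debug output by accident. The snippet above does not show where `SecretString` comes from, so the following is a standard-library-only sketch of the idea. `Redacted` and `DbSettings` are hypothetical types; unlike dedicated crates such as `secrecy`, this sketch does not zero the memory when the value is dropped.

```rust
use std::fmt;

/// Hypothetical wrapper that keeps a secret out of `Debug` output.
pub struct Redacted(String);

impl Redacted {
    pub fn new(value: impl Into<String>) -> Self {
        Redacted(value.into())
    }

    /// Explicit, greppable access to the underlying secret.
    pub fn expose(&self) -> &str {
        &self.0
    }
}

impl fmt::Debug for Redacted {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Never print the wrapped value.
        f.write_str("[REDACTED]")
    }
}

/// Deriving `Debug` is safe here because the secret field redacts itself.
#[derive(Debug)]
pub struct DbSettings {
    pub host: String,
    pub password: Redacted,
}
```

Because `Redacted` deliberately does not implement `Display`, formatting it with `{}` fails to compile, which turns accidental logging of a secret into a build error.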
Audit Logging
// Comprehensive audit trail
fn log_inference_request(
user_id: &str,
request: &InferenceRequest,
response: &InferenceResponse,
) {
let audit_log = AuditLogEntry {
timestamp: SystemTime::now(),
user_id: user_id.to_string(),
action: "inference_request".to_string(),
input_hash: hash_sensitive_data(&request.input),
output_hash: hash_sensitive_data(&response.output),
model_version: response.model_version.clone(),
success: response.success,
};
audit_logger::log(audit_log);
}
Common Production Pitfalls
Memory Leaks
// Avoid: Creating new pools repeatedly
// for _ in 0..1000 {
// let pool = TracePool::new(100); // Reallocated and dropped every iteration: no reuse, heavy allocator churn
// }
// Do: Reuse pools across requests
let mut pool = TracePool::new(100);
for request in requests {
let handler = PooledPriorHandler::new(&mut request.rng, &mut pool);
process_request(handler, request);
}
Blocking Operations
// Avoid: Synchronous database calls in request handlers
// let result = database.query_sync(query); // Blocks the async runtime's worker thread
// Do: Use async operations with proper timeouts
async fn process_request(request: Request) -> Result<Response, Error> {
let timeout = Duration::from_millis(100);
let result = tokio::time::timeout(timeout, database.query(query)).await??;
Ok(result)
}
Error Propagation
// Avoid: Panicking on errors
// let value = risky_operation().unwrap(); // May crash service
// Do: Graceful error handling with fallbacks
let value = match risky_operation() {
Ok(v) => v,
Err(e) => {
metrics.increment_error_count();
log::warn!("Operation failed: {}, using fallback", e);
fallback_value()
}
};
Successful production deployment combines mathematical rigor with engineering excellence:
- Reliability Engineering: Fault tolerance through statistical modeling and circuit breaker patterns
- Performance Optimization: Memory pooling, numerical stability, and batch processing
- Observability: Multi-dimensional metrics with statistical process control
- Deployment Strategies: Risk-managed rollouts with statistical validation
- Health Monitoring: Predictive alerting and graceful degradation
Together, these patterns support production systems that can run probabilistic workloads reliably at scale.
Production deployment is where a probabilistic model's theoretical foundations meet operational constraints. The tooling described in this guide is intended to help move models from research prototypes to services that are reliable, scalable, and maintainable.