Deployment
from typing import Union, Optional, List, Dict, Any
from flax import nnx
# Deployment API Reference
The `opifex.deployment` package provides enterprise-grade deployment capabilities for serving Opifex models in production.
## Overview
The deployment module offers:
- **Model Serving**: High-performance REST API for model inference
- **Inference Engine**: Optimized inference with batching and caching
- **Cloud Deployment**: AWS and GCP integration
- **Kubernetes Orchestration**: Auto-scaling and resource management
- **Monitoring**: Health checks, metrics, and logging
- **Model Registry**: Integration with model versioning
## Model Serving
### ModelServer
High-performance model serving infrastructure.
```python
from opifex.deployment import ModelServer, DeploymentConfig
class ModelServer:
"""
Production model serving with REST API.
Provides HTTP endpoints for model inference with automatic
batching, caching, and performance optimization.
Args:
model: Trained model or model ID from registry
config: Deployment configuration
enable_batching: Enable request batching
batch_size: Maximum batch size
timeout_ms: Request timeout in milliseconds
Example:
>>> config = DeploymentConfig(
... host='0.0.0.0',
... port=8000,
... workers=4,
... enable_gpu=True
... )
>>> server = ModelServer(model, config)
>>> server.start() # Starts serving on port 8000
"""
def __init__(
self,
model: Union[nnx.Module, str],
config: DeploymentConfig,
enable_batching: bool = True,
batch_size: int = 32,
timeout_ms: int = 5000
):
"""Initialize model server."""
def start(self):
"""
Start serving model.
Creates REST API with endpoints:
- POST /predict: Run inference
- GET /health: Health check
- GET /metrics: Prometheus metrics
- GET /model/info: Model metadata
Example:
>>> server.start()
>>> # Server now accepting requests at http://localhost:8000
"""
def stop(self):
"""Stop server gracefully."""
def reload_model(self, model_path: str):
"""
Hot-reload model without downtime.
Args:
model_path: Path to new model checkpoint
Example:
>>> # Deploy new model version
>>> server.reload_model('model_v2.ckpt')
"""
```

### DeploymentConfig

Configuration for model deployment.

```python
from opifex.deployment import DeploymentConfig
@dataclass
class DeploymentConfig:
"""
Configuration for model deployment.
Attributes:
host: Server host address
port: Server port
workers: Number of worker processes
enable_gpu: Use GPU for inference
max_batch_size: Maximum batch size
timeout_ms: Request timeout in milliseconds
enable_caching: Cache frequent requests
cache_size: Maximum cached items
log_level: Logging level
cors_origins: Allowed CORS origins
api_key_required: Require API key authentication
"""
host: str = "0.0.0.0"
port: int = 8000
workers: int = 4
enable_gpu: bool = True
max_batch_size: int = 32
timeout_ms: int = 5000
enable_caching: bool = True
cache_size: int = 1000
log_level: str = "INFO"
cors_origins: List[str] = field(default_factory=lambda: ["*"])
api_key_required: bool = False
```

## Inference Engine

### InferenceEngine

Optimized inference with batching and model optimization.

```python
from opifex.deployment import InferenceEngine
class InferenceEngine:
"""
High-performance inference engine.
Features:
- Automatic request batching
- Model optimization (quantization, pruning)
- Multi-device support
- Cached predictions
- Performance monitoring
Args:
model: Model to serve
device: Target device ('cpu', 'cuda', 'tpu')
optimize: Apply model optimizations
precision: Inference precision ('fp32', 'fp16', 'bf16')
Example:
>>> engine = InferenceEngine(
... model=fno_model,
... device='cuda',
... optimize=True,
... precision='fp16'
... )
>>> # Run inference
>>> predictions = engine.predict(inputs)
"""
def predict(
self,
inputs: Array,
batch_size: Optional[int] = None
) -> Array:
"""
Run inference on inputs.
Args:
inputs: Input data
batch_size: Override default batch size
Returns:
Model predictions
Example:
>>> inputs = jnp.array([...]) # Shape: (1000, 64, 64)
>>> # Automatically batched
>>> predictions = engine.predict(inputs, batch_size=32)
"""
def predict_async(
self,
inputs: Array
) -> asyncio.Future:
"""
Async inference for concurrent requests.
Args:
inputs: Input data
Returns:
Future for prediction result
Example:
>>> async def process_batch(batch):
... result = await engine.predict_async(batch)
... return result
"""
def optimize_model(
self,
optimization_level: int = 1
):
"""
Apply model optimizations.
Args:
optimization_level: Optimization level (0-3):
- 0: No optimization
- 1: Basic (JIT compilation)
- 2: Standard (+ operator fusion)
- 3: Aggressive (+ quantization)
Example:
>>> engine.optimize_model(optimization_level=2)
"""
def benchmark(
self,
test_inputs: Array,
num_iterations: int = 100
) -> Dict[str, float]:
"""
Benchmark inference performance.
Args:
test_inputs: Sample inputs for benchmarking
num_iterations: Number of benchmark iterations
Returns:
Performance metrics:
- throughput: Samples/second
- latency_p50: Median latency (ms)
- latency_p95: 95th percentile latency (ms)
- latency_p99: 99th percentile latency (ms)
Example:
>>> metrics = engine.benchmark(test_data, num_iterations=1000)
>>> print(f"Throughput: {metrics['throughput']:.1f} samples/s")
>>> print(f"P95 latency: {metrics['latency_p95']:.2f} ms")
"""
```

## Cloud Deployment

### AWS Deployment

Deploy models to AWS infrastructure.

```python
from opifex.deployment.cloud import AWSDeploymentManager, AWSConfig
class AWSDeploymentManager:
"""
Manage model deployment on AWS.
Supports:
- EC2 instances
- SageMaker endpoints
- Lambda functions
- ECS containers
Args:
config: AWS configuration
region: AWS region
Example:
>>> aws_config = AWSConfig(
... access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
... secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
... instance_type='ml.g4dn.xlarge',
... endpoint_name='opifex-fno-prod'
... )
>>> manager = AWSDeploymentManager(aws_config, region='us-east-1')
"""
def deploy_sagemaker(
self,
model: nnx.Module,
deployment_config: DeploymentConfig
) -> str:
"""
Deploy model to SageMaker endpoint.
Args:
model: Model to deploy
deployment_config: Deployment configuration
Returns:
Endpoint URL
Example:
>>> endpoint_url = manager.deploy_sagemaker(
... model=fno_model,
... deployment_config=config
... )
>>> print(f"Model deployed at: {endpoint_url}")
"""
def deploy_lambda(
self,
model: nnx.Module,
memory_mb: int = 3008,
timeout_seconds: int = 300
) -> str:
"""
Deploy model as AWS Lambda function.
Args:
model: Model to deploy
memory_mb: Lambda memory allocation
timeout_seconds: Function timeout
Returns:
Lambda function ARN
Example:
>>> # Deploy lightweight model to Lambda
>>> function_arn = manager.deploy_lambda(
... model=small_model,
... memory_mb=1024,
... timeout_seconds=60
... )
"""
def create_auto_scaling(
self,
endpoint_name: str,
min_instances: int = 1,
max_instances: int = 10,
target_metric: str = 'InvocationsPerInstance',
target_value: float = 1000.0
):
"""
Configure auto-scaling for endpoint.
Args:
endpoint_name: SageMaker endpoint name
min_instances: Minimum instance count
max_instances: Maximum instance count
target_metric: Scaling metric
target_value: Target metric value
Example:
>>> manager.create_auto_scaling(
... endpoint_name='opifex-fno-prod',
... min_instances=2,
... max_instances=20,
... target_value=500.0
... )
"""
```

### GCP Deployment

Deploy models to Google Cloud Platform.

```python
from opifex.deployment.cloud import GCPDeploymentManager, GCPConfig
class GCPDeploymentManager:
"""
Manage model deployment on GCP.
Supports:
- Vertex AI endpoints
- Cloud Run
- Cloud Functions
- GKE clusters
Args:
config: GCP configuration
project_id: GCP project ID
Example:
>>> gcp_config = GCPConfig(
... credentials_path='credentials.json',
... machine_type='n1-standard-4-k80',
... endpoint_name='opifex-model'
... )
>>> manager = GCPDeploymentManager(gcp_config, project_id='my-project')
"""
def deploy_vertex_ai(
self,
model: nnx.Module,
deployment_config: DeploymentConfig
) -> str:
"""
Deploy to Vertex AI endpoint.
Args:
model: Model to deploy
deployment_config: Deployment configuration
Returns:
Endpoint URL
"""
def deploy_cloud_run(
self,
model: nnx.Module,
min_instances: int = 0,
max_instances: int = 10,
concurrency: int = 80
) -> str:
"""
Deploy to Cloud Run (serverless).
Args:
model: Model to deploy
min_instances: Minimum instances
max_instances: Maximum instances
concurrency: Requests per instance
Returns:
Service URL
"""
```

## Kubernetes Orchestration

### Kubernetes Manifest Generator

Generate Kubernetes manifests for model deployment.

```python
from opifex.deployment.kubernetes import ManifestGenerator
class ManifestGenerator:
"""
Generate Kubernetes deployment manifests.
Creates complete k8s configuration including:
- Deployment
- Service
- HorizontalPodAutoscaler
- Ingress
- ConfigMap
"""
def generate_deployment(
self,
model_name: str,
image: str,
replicas: int = 3,
resources: Optional[Dict] = None
) -> str:
"""
Generate deployment manifest.
Args:
model_name: Name for deployment
image: Container image
replicas: Number of replicas
resources: Resource requests/limits
Returns:
YAML manifest
Example:
>>> generator = ManifestGenerator()
>>> manifest = generator.generate_deployment(
... model_name='opifex-fno',
... image='gcr.io/my-project/opifex-fno:v1',
... replicas=5,
... resources={
... 'requests': {'memory': '2Gi', 'cpu': '1'},
... 'limits': {'memory': '4Gi', 'cpu': '2', 'nvidia.com/gpu': '1'}
... }
... )
>>> # Apply to cluster
>>> with open('deployment.yaml', 'w') as f:
... f.write(manifest)
"""
def generate_autoscaler(
self,
deployment_name: str,
min_replicas: int = 2,
max_replicas: int = 20,
target_cpu: int = 70
) -> str:
"""
Generate HorizontalPodAutoscaler manifest.
Args:
deployment_name: Target deployment
min_replicas: Minimum pods
max_replicas: Maximum pods
target_cpu: Target CPU utilization (%)
Returns:
YAML manifest
"""
```

### Kubernetes Orchestrator

Manage Kubernetes deployments.

```python
from opifex.deployment.kubernetes import KubernetesOrchestrator
class KubernetesOrchestrator:
"""
Orchestrate model deployment on Kubernetes.
Args:
kubeconfig_path: Path to kubeconfig
namespace: Kubernetes namespace
Example:
>>> orchestrator = KubernetesOrchestrator(
... kubeconfig_path='~/.kube/config',
... namespace='ml-models'
... )
"""
def deploy(
self,
model: nnx.Module,
deployment_name: str,
image: str,
replicas: int = 3
):
"""
Deploy model to Kubernetes.
Args:
model: Model to deploy
deployment_name: Deployment name
image: Container image
replicas: Number of replicas
Example:
>>> orchestrator.deploy(
... model=fno_model,
... deployment_name='fno-production',
... image='myregistry/fno:latest',
... replicas=5
... )
"""
def scale(
self,
deployment_name: str,
replicas: int
):
"""
Scale deployment to specified replicas.
Args:
deployment_name: Deployment to scale
replicas: Target replica count
Example:
>>> orchestrator.scale('fno-production', replicas=10)
"""
def rolling_update(
self,
deployment_name: str,
new_image: str
):
"""
Perform rolling update to new model version.
Args:
deployment_name: Deployment to update
new_image: New container image
Example:
>>> orchestrator.rolling_update(
... 'fno-production',
... 'myregistry/fno:v2'
... )
"""
```

## Monitoring

### Health Monitoring

Monitor deployment health and performance.

```python
from opifex.deployment.monitoring.health import HealthChecker
class HealthChecker:
"""
Monitor deployment health and performance.
Tracks:
- Request latency
- Error rates
- Model performance metrics
- Resource utilization
"""
def check_health(self) -> Dict[str, Any]:
"""
Perform health check.
Returns:
Health status dictionary
Example:
>>> monitor = HealthChecker(server)
>>> status = monitor.check_health()
>>> if status['healthy']:
... print("System healthy")
... else:
... print(f"Issues: {status['issues']}")
"""
def get_metrics(self) -> Dict[str, float]:
"""
Get current performance metrics.
Returns:
Metrics dictionary:
- requests_per_second
- average_latency_ms
- error_rate
- p95_latency_ms
- p99_latency_ms
- cpu_usage_percent
- memory_usage_mb
- gpu_utilization_percent (if GPU)
"""
```

## Complete Deployment Examples

### Local Development Deployment

```python
from opifex.deployment import ModelServer, DeploymentConfig
# Load trained model (use your preferred checkpoint loading method)
# model = ...
# Configure server
config = DeploymentConfig(
host='localhost',
port=8000,
workers=2,
enable_gpu=False, # CPU for local dev
log_level='DEBUG'
)
# Start server
server = ModelServer(model, config)
server.start()
# Make prediction (from client)
import requests
response = requests.post(
'http://localhost:8000/predict',
json={'input': input_data.tolist()}
)
prediction = response.json()['prediction']
```

### Production AWS Deployment

```python
from opifex.deployment.cloud import AWSDeploymentManager, AWSConfig
from opifex.deployment import DeploymentConfig
# Configure AWS
aws_config = AWSConfig(
instance_type='ml.g4dn.4xlarge', # GPU instance
endpoint_name='opifex-fno-production',
initial_instance_count=3
)
# Configure deployment
deploy_config = DeploymentConfig(
enable_gpu=True,
max_batch_size=64,
enable_caching=True,
cache_size=10000
)
# Deploy
manager = AWSDeploymentManager(aws_config, region='us-east-1')
endpoint_url = manager.deploy_sagemaker(model, deploy_config)
# Setup auto-scaling
manager.create_auto_scaling(
endpoint_name='opifex-fno-production',
min_instances=3,
max_instances=20,
target_value=500.0 # Target 500 requests/instance
)
print(f"Model deployed at: {endpoint_url}")
```

### Kubernetes Production Deployment

```python
from opifex.deployment.kubernetes import KubernetesOrchestrator, ManifestGenerator
# Generate manifests
generator = ManifestGenerator()
deployment = generator.generate_deployment(
model_name='opifex-fno',
image='gcr.io/my-project/opifex-fno:v1.0',
replicas=5,
resources={
'requests': {'memory': '4Gi', 'cpu': '2'},
'limits': {'memory': '8Gi', 'cpu': '4', 'nvidia.com/gpu': '1'}
}
)
autoscaler = generator.generate_autoscaler(
deployment_name='opifex-fno',
min_replicas=3,
max_replicas=20,
target_cpu=70
)
# Save manifests
with open('k8s/deployment.yaml', 'w') as f:
f.write(deployment)
with open('k8s/autoscaler.yaml', 'w') as f:
f.write(autoscaler)
# Deploy
orchestrator = KubernetesOrchestrator(namespace='ml-production')
orchestrator.deploy(
model=fno_model,
deployment_name='opifex-fno',
image='gcr.io/my-project/opifex-fno:v1.0',
replicas=5
)
# Monitor
from opifex.deployment.monitoring.health import HealthChecker
monitor = HealthChecker(orchestrator)
metrics = monitor.get_metrics()
print(f"Current RPS: {metrics['requests_per_second']}")
print(f"P95 latency: {metrics['p95_latency_ms']} ms")
```

## See Also

- Platform API: Model registry and versioning
- MLOps API: Experiment tracking
- Cloud Deployment Guide: Detailed AWS setup
- Kubernetes Deployment: Kubernetes deployment guide