Edge-Cloud Orchestration¶
This page shows how to implement intelligent orchestration strategies with Oblix that switch between edge and cloud models based on system resources, connectivity, and other factors.
What is Edge-Cloud Orchestration?¶
Edge-cloud orchestration refers to Oblix's ability to seamlessly switch between:
- Edge models (via Ollama) running locally on your own hardware
- Cloud models (via OpenAI, Claude) running on remote servers
This approach combines the advantages of both:
- Edge models: Privacy, offline capability, no usage fees
- Cloud models: Higher capabilities, larger context windows, specialized features
Basic Orchestration Setup¶
This example shows how to set up a basic orchestration system:
```python
import asyncio
import os

from dotenv import load_dotenv
from oblix import OblixClient, ModelType
from oblix.agents import ResourceMonitor, ConnectivityAgent

async def main():
    # Load environment variables from .env file
    load_dotenv()

    # Initialize client
    oblix_api_key = os.getenv('OBLIX_API_KEY')
    client = OblixClient(oblix_api_key=oblix_api_key)

    # Hook an edge model
    await client.hook_model(
        model_type=ModelType.OLLAMA,
        model_name="llama2",
        endpoint="http://localhost:11434"  # Default Ollama endpoint
    )

    # Hook a cloud model
    openai_api_key = os.getenv('OPENAI_API_KEY')
    await client.hook_model(
        model_type=ModelType.OPENAI,
        model_name="gpt-3.5-turbo",
        api_key=openai_api_key
    )

    # Add resource monitoring agent
    resource_monitor = ResourceMonitor(name="resource_monitor")
    client.hook_agent(resource_monitor)

    # Add connectivity monitoring agent
    connectivity_monitor = ConnectivityAgent(name="connectivity_monitor")
    client.hook_agent(connectivity_monitor)

    # Execute a prompt (Oblix will automatically orchestrate between edge and cloud)
    response = await client.execute("Explain quantum computing in simple terms")
    print(f"Response from {response['model_id']}:")
    print(response["response"])

    # Print orchestration decision information
    print("\nOrchestration decisions:")
    for agent_name, check_result in response["agent_checks"].items():
        print(f"- {agent_name}: {check_result.get('state')}, target: {check_result.get('target')}")

    # Clean up resources
    await client.shutdown()

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nProgram interrupted by user. Exiting...")
    except Exception as e:
        print(f"\nError: {e}")
```
Multi-Tier Orchestration Strategy¶
For more sophisticated applications, you can implement a multi-tier orchestration strategy:
```python
import asyncio
import os

from dotenv import load_dotenv
from oblix import OblixClient, ModelType
from oblix.agents import ResourceMonitor, ConnectivityAgent

async def main():
    # Load environment variables from .env file
    load_dotenv()

    # Initialize client
    oblix_api_key = os.getenv('OBLIX_API_KEY')
    client = OblixClient(oblix_api_key=oblix_api_key)

    # Tier 1: Small edge model (fast, low resources)
    await client.hook_model(
        model_type=ModelType.OLLAMA,
        model_name="phi",  # or other small model
        endpoint="http://localhost:11434"
    )

    # Tier 2: Medium edge model (better quality, more resources)
    await client.hook_model(
        model_type=ModelType.OLLAMA,
        model_name="llama2",
        endpoint="http://localhost:11434"
    )

    # Tier 3: Standard cloud model (good quality, cost-effective)
    openai_api_key = os.getenv('OPENAI_API_KEY')
    await client.hook_model(
        model_type=ModelType.OPENAI,
        model_name="gpt-3.5-turbo",
        api_key=openai_api_key
    )

    # Tier 4: Advanced cloud model (highest quality, more expensive)
    await client.hook_model(
        model_type=ModelType.OPENAI,
        model_name="gpt-4",
        api_key=openai_api_key
    )

    # Add monitoring agents for orchestration
    resource_monitor = ResourceMonitor(name="resource_monitor")
    connectivity_monitor = ConnectivityAgent(name="connectivity_monitor")
    client.hook_agent(resource_monitor)
    client.hook_agent(connectivity_monitor)

    # Execute a simple prompt (orchestration may select a lower tier)
    simple_response = await client.execute("What is the capital of France?")
    print(f"Simple query used: {simple_response['model_id']}")

    # Execute a complex prompt (orchestration may select a higher tier)
    complex_response = await client.execute(
        "Analyze the implications of quantum computing on modern cryptography, "
        "including potential vulnerabilities in RSA and elliptic curve systems."
    )
    print(f"Complex query used: {complex_response['model_id']}")

    # Clean up resources
    await client.shutdown()

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nProgram interrupted by user. Exiting...")
    except Exception as e:
        print(f"\nError: {e}")
```
Offline-First Orchestration¶
This example prioritizes offline capability with edge models, falling back to the cloud only when necessary:
```python
import asyncio

from oblix import OblixClient, ModelType
from oblix.agents import ResourceMonitor, ConnectivityAgent

async def offline_first_app():
    # Initialize client
    client = OblixClient(oblix_api_key="your_oblix_api_key")

    # Primary model: Edge
    await client.hook_model(
        model_type=ModelType.OLLAMA,
        model_name="llama2"
    )

    # Fallback model: Cloud (only used when necessary)
    await client.hook_model(
        model_type=ModelType.OPENAI,
        model_name="gpt-3.5-turbo",
        api_key="your_openai_api_key"
    )

    # Add connectivity monitoring
    connectivity_agent = ConnectivityAgent(
        # Stricter thresholds to prefer edge models
        latency_threshold=100.0,    # Default is 200.0
        packet_loss_threshold=5.0,  # Default is 10.0
        bandwidth_threshold=10.0    # Default is 5.0
    )
    client.hook_agent(connectivity_agent)

    # Add resource monitoring
    resource_monitor = ResourceMonitor(
        custom_thresholds={
            # More lenient resource thresholds to prefer edge models
            "cpu_threshold": 90.0,     # Default is 80.0
            "memory_threshold": 90.0,  # Default is 85.0
            "gpu_threshold": 90.0,     # Default is 85.0
            "critical_gpu": 95.0       # Default is 95.0
        }
    )
    client.hook_agent(resource_monitor)

    # Function to process user input
    async def process_user_query(query):
        try:
            response = await client.execute(query)
            print(f"\nUsing model: {response['model_id']}")
            print(f"Response: {response['response']}")

            # Check if we're offline
            connectivity = response["agent_checks"].get("connectivity_monitor", {})
            if connectivity.get("state") == "disconnected":
                print("\n[OFFLINE MODE ACTIVE]")
        except Exception as e:
            print(f"Error: {e}")
            # Even if everything fails, provide minimal functionality
            print("Unable to process request. Working in emergency mode.")

    # Simple interactive loop
    print("Offline-First AI Assistant with Edge-Cloud Orchestration (type 'exit' to quit)")
    while True:
        user_input = input("\nYou: ")
        if user_input.lower() == 'exit':
            break
        await process_user_query(user_input)

if __name__ == "__main__":
    asyncio.run(offline_first_app())
```
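If you want to surface offline status in your UI before dispatching a prompt, one option is to call the connectivity agent's `check()` method directly. The sketch below is hypothetical: it assumes the built-in `ConnectivityAgent` exposes the same `check()` contract (a dict with a `state` key) as the custom agent in the cost-optimized example below, which may not hold in your Oblix version:

```python
# Hypothetical pre-flight check. Assumes ConnectivityAgent.check()
# returns a dict with a "state" key, mirroring the custom agent's
# check() contract in the cost-optimized example below. Verify
# against your Oblix version before relying on this.
async def is_offline(connectivity_agent) -> bool:
    status = await connectivity_agent.check()
    return status.get("state") == "disconnected"
```

You could call this at the top of `process_user_query` to show the offline banner before the prompt is executed rather than after.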
Performance-Optimized Orchestration¶
This example optimizes for performance with intelligent edge-cloud orchestration:
```python
import asyncio

from oblix import OblixClient, ModelType
from oblix.agents import ResourceMonitor, ConnectivityAgent

async def performance_optimized_app():
    # Initialize client
    client = OblixClient(oblix_api_key="your_oblix_api_key")

    # Fast edge model for quick responses
    await client.hook_model(
        model_type=ModelType.OLLAMA,
        model_name="phi"  # Small, fast model
    )

    # Powerful cloud model for complex tasks
    await client.hook_model(
        model_type=ModelType.OPENAI,
        model_name="gpt-3.5-turbo",
        api_key="your_openai_api_key"
    )

    # Add resource monitoring for edge execution decisions
    resource_monitor = ResourceMonitor()
    client.hook_agent(resource_monitor)

    # Add connectivity monitoring with performance-oriented thresholds
    connectivity_agent = ConnectivityAgent(
        latency_threshold=100.0,  # Lower threshold for better performance
        check_interval=15         # More frequent checks
    )
    client.hook_agent(connectivity_agent)

    # Example task classification function
    def classify_task_complexity(prompt):
        # Simple heuristic: longer prompts or certain keywords indicate complexity
        complexity_keywords = [
            "analyze", "compare", "explain", "discuss", "evaluate",
            "synthesize", "recommend", "critique"
        ]

        # Check prompt length and keywords
        is_complex = len(prompt.split()) > 20 or any(kw in prompt.lower() for kw in complexity_keywords)
        return "complex" if is_complex else "simple"

    # Process tasks based on complexity
    async def process_task(prompt):
        task_type = classify_task_complexity(prompt)

        if task_type == "simple":
            # For simple tasks, explicitly use the faster edge model
            print("Simple task detected, using edge model for speed")
            response = await client.execute(prompt, model_id="ollama:phi")
        else:
            # For complex tasks, let Oblix orchestrate based on current conditions
            print("Complex task detected, using intelligent orchestration")
            response = await client.execute(prompt)

        print(f"Response from {response['model_id']}:")
        print(response["response"])

        # Performance metrics
        metrics = response.get("metrics", {})
        if metrics:
            latency = metrics.get("total_latency")
            if latency:
                print(f"Response time: {latency:.2f} seconds")

    # Example tasks
    tasks = [
        "What is 15 + 27?",
        "What's the capital of Japan?",
        "Explain the theory of relativity and its implications for modern physics, including experimental evidence.",
        "Recommend a strategy for implementing a microservice architecture in a legacy monolithic application."
    ]

    # Process all tasks
    for task in tasks:
        print(f"\nTask: {task}")
        await process_task(task)

if __name__ == "__main__":
    asyncio.run(performance_optimized_app())
```
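If the `metrics` dictionary does not include `total_latency` in your environment, a simple fallback is to measure wall-clock time around `execute()` yourself. A minimal sketch:

```python
import time

# Fallback timing when response metrics are unavailable: measure
# wall-clock time around the execute() call itself.
async def timed_execute(client, prompt):
    start = time.perf_counter()
    response = await client.execute(prompt)
    elapsed = time.perf_counter() - start
    print(f"{response['model_id']} answered in {elapsed:.2f} seconds")
    return response
```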
Cost-Optimized Orchestration¶
This example optimizes for cost by pairing edge-cloud orchestration with a custom budget-tracking agent:
```python
import asyncio
import time
from typing import Dict, Any

from oblix import OblixClient, ModelType
from oblix.agents import ResourceMonitor, ConnectivityAgent
from oblix.agents.base import BaseAgent

# Custom agent for cost optimization
class CostOptimizationAgent(BaseAgent):
    def __init__(self, name: str = "cost_optimizer", daily_budget: float = 5.0):
        super().__init__(name)
        self.daily_budget = daily_budget
        self.daily_spend = 0.0
        self.reset_time = time.time() + 86400  # 24 hours from now

        # Estimated cost per 1K tokens
        self.cost_per_model = {
            "openai:gpt-3.5-turbo": 0.0015,
            "openai:gpt-4": 0.03,
            "claude:claude-3-opus-20240229": 0.015,
            "claude:claude-3-sonnet-20240229": 0.003,
            "ollama:llama2": 0.0,
            "ollama:phi": 0.0
        }

    async def initialize(self) -> bool:
        self.is_active = True
        return True

    def _estimate_cost(self, model_id: str, input_tokens: int, output_tokens: int) -> float:
        """Estimate cost for a model execution"""
        if model_id not in self.cost_per_model:
            return 0.0

        cost_per_1k = self.cost_per_model[model_id]
        total_tokens = input_tokens + output_tokens
        return (total_tokens / 1000) * cost_per_1k

    def _update_spend(self, model_id: str, input_tokens: int, output_tokens: int):
        """Update running spend total"""
        # Reset daily budget if needed
        if time.time() > self.reset_time:
            self.daily_spend = 0.0
            self.reset_time = time.time() + 86400

        # Add estimated cost
        self.daily_spend += self._estimate_cost(model_id, input_tokens, output_tokens)

    async def check(self, **kwargs) -> Dict[str, Any]:
        # Get prompt info if available
        prompt = kwargs.get("prompt", "")

        # Check budget status
        budget_percentage = (self.daily_spend / self.daily_budget) * 100

        if budget_percentage > 90:
            # Critical budget: prioritize edge models
            return {
                "proceed": True,
                "state": "budget_critical",
                "target": "local",
                "reason": f"Budget nearly depleted: ${self.daily_spend:.2f}/${self.daily_budget:.2f}",
                "metrics": {
                    "budget_percentage": budget_percentage,
                    "daily_spend": self.daily_spend,
                    "daily_budget": self.daily_budget
                }
            }
        elif budget_percentage > 70:
            # High budget use: prefer edge but allow cloud for complex tasks
            return {
                "proceed": True,
                "state": "budget_constrained",
                "target": "hybrid",
                "reason": f"Budget usage high: ${self.daily_spend:.2f}/${self.daily_budget:.2f}",
                "metrics": {
                    "budget_percentage": budget_percentage,
                    "daily_spend": self.daily_spend,
                    "daily_budget": self.daily_budget
                }
            }
        else:
            # Budget available: make recommendations based on estimated complexity
            # Simple heuristic: longer prompts may need more powerful models
            is_complex = len(prompt.split()) > 25

            return {
                "proceed": True,
                "state": "budget_available",
                "target": "cloud" if is_complex else "local",
                "reason": f"Budget available: ${self.daily_spend:.2f}/${self.daily_budget:.2f}",
                "metrics": {
                    "budget_percentage": budget_percentage,
                    "daily_spend": self.daily_spend,
                    "daily_budget": self.daily_budget,
                    "estimated_complexity": "complex" if is_complex else "simple"
                }
            }

    async def shutdown(self) -> None:
        self.is_active = False

async def cost_optimized_app():
    # Initialize client
    client = OblixClient(oblix_api_key="your_oblix_api_key")

    # Hook models for orchestration
    # Tier 1: Free edge model (no cost)
    await client.hook_model(
        model_type=ModelType.OLLAMA,
        model_name="llama2"
    )

    # Tier 2: Economy cloud model (low cost)
    await client.hook_model(
        model_type=ModelType.OPENAI,
        model_name="gpt-3.5-turbo",
        api_key="your_openai_api_key"
    )

    # Tier 3: Premium cloud model (higher cost, only used when necessary)
    await client.hook_model(
        model_type=ModelType.OPENAI,
        model_name="gpt-4",
        api_key="your_openai_api_key"
    )

    # Add standard monitoring agents
    client.hook_agent(ResourceMonitor())
    client.hook_agent(ConnectivityAgent())

    # Add cost optimization agent
    cost_agent = CostOptimizationAgent(daily_budget=5.0)  # $5 daily budget
    client.hook_agent(cost_agent)

    # Test with various queries
    test_queries = [
        "What time is it?",  # Very simple
        "Explain how photosynthesis works",  # Moderate
        "Write a detailed analysis of sustainable energy solutions for developing nations",  # Complex
    ]

    for query in test_queries:
        print(f"\n--- Query: {query}")

        # Execute the query with orchestration
        response = await client.execute(query)

        # Print model used and cost info
        model_id = response["model_id"]
        print(f"Model used by orchestration: {model_id}")

        # Update cost tracker with token usage
        metrics = response.get("metrics", {})
        input_tokens = metrics.get("input_tokens", len(query.split()) * 1.3)  # Rough estimate if not available
        output_tokens = metrics.get("output_tokens", len(response["response"].split()) * 1.3)
        cost_agent._update_spend(model_id, input_tokens, output_tokens)

        # Print budget status
        budget_percentage = (cost_agent.daily_spend / cost_agent.daily_budget) * 100
        print(f"Budget status: ${cost_agent.daily_spend:.4f} spent (of ${cost_agent.daily_budget:.2f} total)")
        print(f"Budget usage: {budget_percentage:.1f}%")

        # Abbreviated response
        response_text = response["response"]
        if len(response_text) > 100:
            print(f"Response: {response_text[:100]}...")
        else:
            print(f"Response: {response_text}")

    print("\n--- Final Budget Status ---")
    print(f"Total spend: ${cost_agent.daily_spend:.4f}")
    print(f"Remaining budget: ${cost_agent.daily_budget - cost_agent.daily_spend:.4f}")

if __name__ == "__main__":
    asyncio.run(cost_optimized_app())
```
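As a sanity check on the `_estimate_cost` formula above: at $0.0015 per 1K tokens for gpt-3.5-turbo, a request with 500 input and 700 output tokens (hypothetical counts, for illustration only) costs (1200 / 1000) * 0.0015 = $0.0018, so the $5 daily budget covers roughly 2,800 such requests:

```python
# Worked example of the _estimate_cost formula above, using
# hypothetical token counts (500 input + 700 output) and the
# gpt-3.5-turbo rate of $0.0015 per 1K tokens.
cost = ((500 + 700) / 1000) * 0.0015
print(f"Cost per request: ${cost:.4f}")           # $0.0018
print(f"Requests per $5 budget: {5.0 / cost:.0f}")  # ~2778
```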
Conclusion¶
Edge-cloud orchestration with Oblix provides several key benefits:
- Resilience: Continue operation even without internet connectivity
- Cost optimization: Use free edge models when possible, cloud models when necessary
- Performance optimization: Intelligently switch to the most appropriate model based on the task
- Privacy: Keep sensitive prompts on edge models
- Adaptability: Seamlessly switch between models based on changing conditions
By implementing these patterns, you can build AI applications that are more robust, cost-effective, and performant than those using only edge or only cloud models.