Das offizielle Python SDK für Sim ermöglicht es Ihnen, Workflows programmatisch aus Ihren Python-Anwendungen mithilfe des offiziellen Python SDKs auszuführen.
Das Python SDK unterstützt Python 3.8+ mit asynchroner Ausführungsunterstützung, automatischer Ratenbegrenzung mit exponentiellem Backoff und Nutzungsverfolgung.
Installation
Installieren Sie das SDK mit pip:
pip install simstudio-sdkSchnellstart
Hier ist ein einfaches Beispiel für den Einstieg:
from simstudio import SimStudioClient
# Initialize the client
client = SimStudioClient(
api_key="your-api-key-here",
base_url="https://sim.ai" # optional, defaults to https://sim.ai
)
# Execute a workflow
try:
result = client.execute_workflow("workflow-id")
print("Workflow executed successfully:", result)
except Exception as error:
print("Workflow execution failed:", error)API-Referenz
SimStudioClient
Konstruktor
SimStudioClient(api_key: str, base_url: str = "https://sim.ai")Parameter:
api_key(str): Ihr Sim API-Schlüsselbase_url(str, optional): Basis-URL für die Sim API
Methoden
execute_workflow()
Führt einen Workflow mit optionalen Eingabedaten aus.
result = client.execute_workflow(
"workflow-id",
input_data={"message": "Hello, world!"},
timeout=30.0 # 30 seconds
)Parameter:
workflow_id(str): Die ID des auszuführenden Workflowsinput_data(dict, optional): Eingabedaten, die an den Workflow übergeben werdentimeout(float, optional): Timeout in Sekunden (Standard: 30.0)stream(bool, optional): Streaming-Antworten aktivieren (Standard: False)selected_outputs(list[str], optional): Block-Ausgaben, die imblockName.attributeFormat gestreamt werden sollen (z.B.["agent1.content"])async_execution(bool, optional): Asynchron ausführen (Standard: False)
Rückgabe: WorkflowExecutionResult | AsyncExecutionResult
Wenn async_execution=True, wird sofort mit einer Task-ID zum Abfragen zurückgegeben. Andernfalls wird auf den Abschluss gewartet.
get_workflow_status()
Den Status eines Workflows abrufen (Bereitstellungsstatus usw.).
status = client.get_workflow_status("workflow-id")
print("Is deployed:", status.is_deployed)Parameter:
workflow_id(str): Die ID des Workflows
Rückgabe: WorkflowStatus
validate_workflow()
Überprüfen, ob ein Workflow für die Ausführung bereit ist.
is_ready = client.validate_workflow("workflow-id")
if is_ready:
# Workflow is deployed and ready
passParameter:
workflow_id(str): Die ID des Workflows
Rückgabe: bool
get_job_status()
Den Status einer asynchronen Job-Ausführung abrufen.
status = client.get_job_status("task-id-from-async-execution")
print("Status:", status["status"]) # 'queued', 'processing', 'completed', 'failed'
if status["status"] == "completed":
print("Output:", status["output"])Parameter:
task_id(str): Die Task-ID, die von der asynchronen Ausführung zurückgegeben wurde
Rückgabe: Dict[str, Any]
Antwortfelder:
success(bool): Ob die Anfrage erfolgreich wartaskId(str): Die Task-IDstatus(str): Einer der Werte'queued','processing','completed','failed','cancelled'metadata(dict): EnthältstartedAt,completedAtunddurationoutput(any, optional): Die Workflow-Ausgabe (wenn abgeschlossen)error(any, optional): Fehlerdetails (wenn fehlgeschlagen)estimatedDuration(int, optional): Geschätzte Dauer in Millisekunden (wenn in Bearbeitung/in Warteschlange)
execute_with_retry()
Einen Workflow mit automatischer Wiederholung bei Ratenbegrenzungsfehlern unter Verwendung von exponentiellem Backoff ausführen.
result = client.execute_with_retry(
"workflow-id",
input_data={"message": "Hello"},
timeout=30.0,
max_retries=3, # Maximum number of retries
initial_delay=1.0, # Initial delay in seconds
max_delay=30.0, # Maximum delay in seconds
backoff_multiplier=2.0 # Exponential backoff multiplier
)Parameter:
workflow_id(str): Die ID des auszuführenden Workflowsinput_data(dict, optional): Eingabedaten, die an den Workflow übergeben werdentimeout(float, optional): Timeout in Sekundenstream(bool, optional): Streaming-Antworten aktivierenselected_outputs(list, optional): Block-Ausgaben zum Streamenasync_execution(bool, optional): Asynchron ausführenmax_retries(int, optional): Maximale Anzahl von Wiederholungen (Standard: 3)initial_delay(float, optional): Anfängliche Verzögerung in Sekunden (Standard: 1.0)max_delay(float, optional): Maximale Verzögerung in Sekunden (Standard: 30.0)backoff_multiplier(float, optional): Backoff-Multiplikator (Standard: 2.0)
Rückgabewert: WorkflowExecutionResult | AsyncExecutionResult
Die Wiederholungslogik verwendet exponentielles Backoff (1s → 2s → 4s → 8s...) mit ±25% Jitter, um den Thundering-Herd-Effekt zu vermeiden. Wenn die API einen retry-after Header bereitstellt, wird dieser stattdessen verwendet.
get_rate_limit_info()
Ruft die aktuellen Rate-Limit-Informationen aus der letzten API-Antwort ab.
rate_limit_info = client.get_rate_limit_info()
if rate_limit_info:
print("Limit:", rate_limit_info.limit)
print("Remaining:", rate_limit_info.remaining)
print("Reset:", datetime.fromtimestamp(rate_limit_info.reset))Rückgabewert: RateLimitInfo | None
get_usage_limits()
Ruft aktuelle Nutzungslimits und Kontingentinformationen für dein Konto ab.
limits = client.get_usage_limits()
print("Sync requests remaining:", limits.rate_limit["sync"]["remaining"])
print("Async requests remaining:", limits.rate_limit["async"]["remaining"])
print("Current period cost:", limits.usage["currentPeriodCost"])
print("Plan:", limits.usage["plan"])Rückgabewert: UsageLimits
Antwortstruktur:
{
"success": bool,
"rateLimit": {
"sync": {
"isLimited": bool,
"limit": int,
"remaining": int,
"resetAt": str
},
"async": {
"isLimited": bool,
"limit": int,
"remaining": int,
"resetAt": str
},
"authType": str # 'api' or 'manual'
},
"usage": {
"currentPeriodCost": float,
"limit": float,
"plan": str # e.g., 'free', 'pro'
}
}set_api_key()
Aktualisiert den API-Schlüssel.
client.set_api_key("new-api-key")set_base_url()
Aktualisiert die Basis-URL.
client.set_base_url("https://my-custom-domain.com")close()
Schließt die zugrunde liegende HTTP-Sitzung.
client.close()Datenklassen
WorkflowExecutionResult
@dataclass
class WorkflowExecutionResult:
success: bool
output: Optional[Any] = None
error: Optional[str] = None
logs: Optional[List[Any]] = None
metadata: Optional[Dict[str, Any]] = None
trace_spans: Optional[List[Any]] = None
total_duration: Optional[float] = NoneAsyncExecutionResult
@dataclass
class AsyncExecutionResult:
success: bool
task_id: str
status: str # 'queued'
created_at: str
links: Dict[str, str] # e.g., {"status": "/api/jobs/{taskId}"}WorkflowStatus
@dataclass
class WorkflowStatus:
is_deployed: bool
deployed_at: Optional[str] = None
needs_redeployment: bool = FalseRateLimitInfo
@dataclass
class RateLimitInfo:
limit: int
remaining: int
reset: int
retry_after: Optional[int] = NoneUsageLimits
@dataclass
class UsageLimits:
success: bool
rate_limit: Dict[str, Any]
usage: Dict[str, Any]SimStudioError
class SimStudioError(Exception):
def __init__(self, message: str, code: Optional[str] = None, status: Optional[int] = None):
super().__init__(message)
self.code = code
self.status = statusHäufige Fehlercodes:
UNAUTHORIZED: Ungültiger API-SchlüsselTIMEOUT: Zeitüberschreitung bei der AnfrageRATE_LIMIT_EXCEEDED: Ratengrenze überschrittenUSAGE_LIMIT_EXCEEDED: Nutzungsgrenze überschrittenEXECUTION_ERROR: Workflow-Ausführung fehlgeschlagen
Beispiele
Grundlegende Workflow-Ausführung
Richten Sie den SimStudioClient mit Ihrem API-Schlüssel ein.
Prüfen Sie, ob der Workflow bereitgestellt und für die Ausführung bereit ist.
Führen Sie den Workflow mit Ihren Eingabedaten aus.
Verarbeiten Sie das Ausführungsergebnis und behandeln Sie eventuelle Fehler.
import os
from simstudio import SimStudioClient
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def run_workflow():
try:
# Check if workflow is ready
is_ready = client.validate_workflow("my-workflow-id")
if not is_ready:
raise Exception("Workflow is not deployed or ready")
# Execute the workflow
result = client.execute_workflow(
"my-workflow-id",
input_data={
"message": "Process this data",
"user_id": "12345"
}
)
if result.success:
print("Output:", result.output)
print("Duration:", result.metadata.get("duration") if result.metadata else None)
else:
print("Workflow failed:", result.error)
except Exception as error:
print("Error:", error)
run_workflow()Fehlerbehandlung
Behandeln Sie verschiedene Fehlertypen, die während der Workflow-Ausführung auftreten können:
from simstudio import SimStudioClient, SimStudioError
import os
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def execute_with_error_handling():
try:
result = client.execute_workflow("workflow-id")
return result
except SimStudioError as error:
if error.code == "UNAUTHORIZED":
print("Invalid API key")
elif error.code == "TIMEOUT":
print("Workflow execution timed out")
elif error.code == "USAGE_LIMIT_EXCEEDED":
print("Usage limit exceeded")
elif error.code == "INVALID_JSON":
print("Invalid JSON in request body")
else:
print(f"Workflow error: {error}")
raise
except Exception as error:
print(f"Unexpected error: {error}")
raiseVerwendung des Kontextmanagers
Verwenden Sie den Client als Kontextmanager, um die Ressourcenbereinigung automatisch zu handhaben:
---CODE-PLACEHOLDER-ef99d3dd509e04865d5b6b0e0e03d3f8---
Batch-Workflow-Ausführung
Führen Sie mehrere Workflows effizient aus:
from simstudio import SimStudioClient
import os
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def execute_workflows_batch(workflow_data_pairs):
"""Execute multiple workflows with different input data."""
results = []
for workflow_id, input_data in workflow_data_pairs:
try:
# Validate workflow before execution
if not client.validate_workflow(workflow_id):
print(f"Skipping {workflow_id}: not deployed")
continue
result = client.execute_workflow(workflow_id, input_data)
results.append({
"workflow_id": workflow_id,
"success": result.success,
"output": result.output,
"error": result.error
})
except Exception as error:
results.append({
"workflow_id": workflow_id,
"success": False,
"error": str(error)
})
return results
# Example usage
workflows = [
("workflow-1", {"type": "analysis", "data": "sample1"}),
("workflow-2", {"type": "processing", "data": "sample2"}),
]
results = execute_workflows_batch(workflows)
for result in results:
print(f"Workflow {result['workflow_id']}: {'Success' if result['success'] else 'Failed'}")Asynchrone Workflow-Ausführung
Führen Sie Workflows asynchron für lang laufende Aufgaben aus:
import os
import time
from simstudio import SimStudioClient
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def execute_async():
try:
# Start async execution
result = client.execute_workflow(
"workflow-id",
input_data={"data": "large dataset"},
async_execution=True # Execute asynchronously
)
# Check if result is an async execution
if hasattr(result, 'task_id'):
print(f"Task ID: {result.task_id}")
print(f"Status endpoint: {result.links['status']}")
# Poll for completion
status = client.get_job_status(result.task_id)
while status["status"] in ["queued", "processing"]:
print(f"Current status: {status['status']}")
time.sleep(2) # Wait 2 seconds
status = client.get_job_status(result.task_id)
if status["status"] == "completed":
print("Workflow completed!")
print(f"Output: {status['output']}")
print(f"Duration: {status['metadata']['duration']}")
else:
print(f"Workflow failed: {status['error']}")
except Exception as error:
print(f"Error: {error}")
execute_async()Rate-Limiting und Wiederholungsversuche
Behandle Rate-Limits automatisch mit exponentiellem Backoff:
import os
from simstudio import SimStudioClient, SimStudioError
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def execute_with_retry_handling():
try:
# Automatically retries on rate limit
result = client.execute_with_retry(
"workflow-id",
input_data={"message": "Process this"},
max_retries=5,
initial_delay=1.0,
max_delay=60.0,
backoff_multiplier=2.0
)
print(f"Success: {result}")
except SimStudioError as error:
if error.code == "RATE_LIMIT_EXCEEDED":
print("Rate limit exceeded after all retries")
# Check rate limit info
rate_limit_info = client.get_rate_limit_info()
if rate_limit_info:
from datetime import datetime
reset_time = datetime.fromtimestamp(rate_limit_info.reset)
print(f"Rate limit resets at: {reset_time}")
execute_with_retry_handling()Nutzungsüberwachung
Überwache deine Kontonutzung und -limits:
import os
from simstudio import SimStudioClient
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def check_usage():
try:
limits = client.get_usage_limits()
print("=== Rate Limits ===")
print("Sync requests:")
print(f" Limit: {limits.rate_limit['sync']['limit']}")
print(f" Remaining: {limits.rate_limit['sync']['remaining']}")
print(f" Resets at: {limits.rate_limit['sync']['resetAt']}")
print(f" Is limited: {limits.rate_limit['sync']['isLimited']}")
print("\nAsync requests:")
print(f" Limit: {limits.rate_limit['async']['limit']}")
print(f" Remaining: {limits.rate_limit['async']['remaining']}")
print(f" Resets at: {limits.rate_limit['async']['resetAt']}")
print(f" Is limited: {limits.rate_limit['async']['isLimited']}")
print("\n=== Usage ===")
print(f"Current period cost: ${limits.usage['currentPeriodCost']:.2f}")
print(f"Limit: ${limits.usage['limit']:.2f}")
print(f"Plan: {limits.usage['plan']}")
percent_used = (limits.usage['currentPeriodCost'] / limits.usage['limit']) * 100
print(f"Usage: {percent_used:.1f}%")
if percent_used > 80:
print("⚠️ Warning: You are approaching your usage limit!")
except Exception as error:
print(f"Error checking usage: {error}")
check_usage()Streaming-Workflow-Ausführung
Führe Workflows mit Echtzeit-Streaming-Antworten aus:
from simstudio import SimStudioClient
import os
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def execute_with_streaming():
"""Execute workflow with streaming enabled."""
try:
# Enable streaming for specific block outputs
result = client.execute_workflow(
"workflow-id",
input_data={"message": "Count to five"},
stream=True,
selected_outputs=["agent1.content"] # Use blockName.attribute format
)
print("Workflow result:", result)
except Exception as error:
print("Error:", error)
execute_with_streaming()Die Streaming-Antwort folgt dem Server-Sent Events (SSE) Format:
data: {"blockId":"7b7735b9-19e5-4bd6-818b-46aae2596e9f","chunk":"One"}
data: {"blockId":"7b7735b9-19e5-4bd6-818b-46aae2596e9f","chunk":", two"}
data: {"event":"done","success":true,"output":{},"metadata":{"duration":610}}
data: [DONE]Flask-Streaming-Beispiel:
from flask import Flask, Response, stream_with_context
import requests
import json
import os
app = Flask(__name__)
@app.route('/stream-workflow')
def stream_workflow():
"""Stream workflow execution to the client."""
def generate():
response = requests.post(
'https://sim.ai/api/workflows/WORKFLOW_ID/execute',
headers={
'Content-Type': 'application/json',
'X-API-Key': os.getenv('SIM_API_KEY')
},
json={
'message': 'Generate a story',
'stream': True,
'selectedOutputs': ['agent1.content']
},
stream=True
)
for line in response.iter_lines():
if line:
decoded_line = line.decode('utf-8')
if decoded_line.startswith('data: '):
data = decoded_line[6:] # Remove 'data: ' prefix
if data == '[DONE]':
break
try:
parsed = json.loads(data)
if 'chunk' in parsed:
yield f"data: {json.dumps(parsed)}\n\n"
elif parsed.get('event') == 'done':
yield f"data: {json.dumps(parsed)}\n\n"
print("Execution complete:", parsed.get('metadata'))
except json.JSONDecodeError:
pass
return Response(
stream_with_context(generate()),
mimetype='text/event-stream'
)
if __name__ == '__main__':
app.run(debug=True)Umgebungskonfiguration
Konfiguriere den Client mit Umgebungsvariablen:
import os
from simstudio import SimStudioClient
# Development configuration
client = SimStudioClient(
api_key=os.getenv("SIM_API_KEY")
base_url=os.getenv("SIM_BASE_URL", "https://sim.ai")
)import os
from simstudio import SimStudioClient
# Production configuration with error handling
api_key = os.getenv("SIM_API_KEY")
if not api_key:
raise ValueError("SIM_API_KEY environment variable is required")
client = SimStudioClient(
api_key=api_key,
base_url=os.getenv("SIM_BASE_URL", "https://sim.ai")
)API-Schlüssel erhalten
Navigiere zu Sim und melde dich bei deinem Konto an.
Navigiere zu dem Workflow, den du programmatisch ausführen möchtest.
Klicke auf "Deploy", um deinen Workflow zu deployen, falls dies noch nicht geschehen ist.
Wähle während des Deployment-Prozesses einen API-Schlüssel aus oder erstelle einen neuen.
Kopiere den API-Schlüssel zur Verwendung in deiner Python-Anwendung.
Anforderungen
- Python 3.8+
- requests >= 2.25.0
Lizenz
Apache-2.0