Python

Das offizielle Python SDK für Sim ermöglicht es Ihnen, Workflows programmatisch aus Ihren Python-Anwendungen heraus mit dem offiziellen Python SDK auszuführen.

Das Python SDK unterstützt Python 3.8+ mit Unterstützung für asynchrone Ausführung, automatischer Ratenbegrenzung mit exponentiellem Backoff und Nutzungsverfolgung.

Installation

Installieren Sie das SDK mit pip:

pip install simstudio-sdk

Schnellstart

Hier ist ein einfaches Beispiel für den Einstieg:

from simstudio import SimStudioClient

# Initialize the client
client = SimStudioClient(
    api_key="your-api-key-here",
    base_url="https://sim.ai"  # optional, defaults to https://sim.ai
)

# Execute a workflow
try:
    result = client.execute_workflow("workflow-id")
    print("Workflow executed successfully:", result)
except Exception as error:
    print("Workflow execution failed:", error)

API-Referenz

SimStudioClient

Konstruktor

SimStudioClient(api_key: str, base_url: str = "https://sim.ai")

Parameter:

  • api_key (str): Ihr Sim API-Schlüssel
  • base_url (str, optional): Basis-URL für die Sim API

Methoden

execute_workflow()

Führt einen Workflow mit optionalen Eingabedaten aus.

result = client.execute_workflow(
    "workflow-id",
    input_data={"message": "Hello, world!"},
    timeout=30.0  # 30 seconds
)

Parameter:

  • workflow_id (str): Die ID des auszuführenden Workflows
  • input_data (dict, optional): Eingabedaten, die an den Workflow übergeben werden
  • timeout (float, optional): Timeout in Sekunden (Standard: 30.0)
  • stream (bool, optional): Streaming-Antworten aktivieren (Standard: False)
  • selected_outputs (list[str], optional): Block-Ausgaben zum Streamen im Format blockName.attribute (z. B. ["agent1.content"])
  • async_execution (bool, optional): Asynchron ausführen (Standard: False)

Rückgabewert: WorkflowExecutionResult | AsyncExecutionResult

Wenn async_execution=True, wird sofort mit einer Task-ID zum Polling zurückgegeben. Andernfalls wird auf die Fertigstellung gewartet.

get_workflow_status()

Ruft den Status eines Workflows ab (Deployment-Status usw.).

status = client.get_workflow_status("workflow-id")
print("Is deployed:", status.is_deployed)

Parameter:

  • workflow_id (str): Die ID des Workflows

Rückgabe: WorkflowStatus

validate_workflow()

Überprüft, ob ein Workflow zur Ausführung bereit ist.

is_ready = client.validate_workflow("workflow-id")
if is_ready:
    # Workflow is deployed and ready
    pass

Parameter:

  • workflow_id (str): Die ID des Workflows

Rückgabe: bool

get_job_status()

Ruft den Status einer asynchronen Job-Ausführung ab.

status = client.get_job_status("task-id-from-async-execution")
print("Status:", status["status"])  # 'queued', 'processing', 'completed', 'failed'
if status["status"] == "completed":
    print("Output:", status["output"])

Parameter:

  • task_id (str): Die Task-ID, die von der asynchronen Ausführung zurückgegeben wurde

Rückgabe: Dict[str, Any]

Antwortfelder:

  • success (bool): Ob die Anfrage erfolgreich war
  • taskId (str): Die Task-ID
  • status (str): Einer von 'queued', 'processing', 'completed', 'failed', 'cancelled'
  • metadata (dict): Enthält startedAt, completedAt und duration
  • output (any, optional): Die Workflow-Ausgabe (wenn abgeschlossen)
  • error (any, optional): Fehlerdetails (wenn fehlgeschlagen)
  • estimatedDuration (int, optional): Geschätzte Dauer in Millisekunden (wenn in Bearbeitung/in Warteschlange)
execute_with_retry()

Führt einen Workflow mit automatischer Wiederholung bei Rate-Limit-Fehlern unter Verwendung von exponentiellem Backoff aus.

result = client.execute_with_retry(
    "workflow-id",
    input_data={"message": "Hello"},
    timeout=30.0,
    max_retries=3,           # Maximum number of retries
    initial_delay=1.0,       # Initial delay in seconds
    max_delay=30.0,          # Maximum delay in seconds
    backoff_multiplier=2.0   # Exponential backoff multiplier
)

Parameter:

  • workflow_id (str): Die ID des auszuführenden Workflows
  • input_data (dict, optional): Eingabedaten, die an den Workflow übergeben werden
  • timeout (float, optional): Timeout in Sekunden
  • stream (bool, optional): Streaming-Antworten aktivieren
  • selected_outputs (list, optional): Block-Ausgaben zum Streamen
  • async_execution (bool, optional): Asynchron ausführen
  • max_retries (int, optional): Maximale Anzahl von Wiederholungen (Standard: 3)
  • initial_delay (float, optional): Anfangsverzögerung in Sekunden (Standard: 1.0)
  • max_delay (float, optional): Maximale Verzögerung in Sekunden (Standard: 30.0)
  • backoff_multiplier (float, optional): Backoff-Multiplikator (Standard: 2.0)

Rückgabe: WorkflowExecutionResult | AsyncExecutionResult

Die Wiederholungslogik verwendet exponentielles Backoff (1s → 2s → 4s → 8s...) mit ±25% Jitter, um Thundering Herd zu verhindern. Wenn die API einen retry-after-Header bereitstellt, wird dieser stattdessen verwendet.

get_rate_limit_info()

Ruft die aktuellen Rate-Limit-Informationen aus der letzten API-Antwort ab.

rate_limit_info = client.get_rate_limit_info()
if rate_limit_info:
    print("Limit:", rate_limit_info.limit)
    print("Remaining:", rate_limit_info.remaining)
    print("Reset:", datetime.fromtimestamp(rate_limit_info.reset))

Rückgabewert: RateLimitInfo | None

get_usage_limits()

Ruft aktuelle Nutzungslimits und Kontingentinformationen für Ihr Konto ab.

limits = client.get_usage_limits()
print("Sync requests remaining:", limits.rate_limit["sync"]["remaining"])
print("Async requests remaining:", limits.rate_limit["async"]["remaining"])
print("Current period cost:", limits.usage["currentPeriodCost"])
print("Plan:", limits.usage["plan"])

Rückgabewert: UsageLimits

Antwortstruktur:

{
    "success": bool,
    "rateLimit": {
        "sync": {
            "isLimited": bool,
            "limit": int,
            "remaining": int,
            "resetAt": str
        },
        "async": {
            "isLimited": bool,
            "limit": int,
            "remaining": int,
            "resetAt": str
        },
        "authType": str  # 'api' or 'manual'
    },
    "usage": {
        "currentPeriodCost": float,
        "limit": float,
        "plan": str  # e.g., 'free', 'pro'
    }
}
set_api_key()

Aktualisiert den API-Schlüssel.

client.set_api_key("new-api-key")
set_base_url()

Aktualisiert die Basis-URL.

client.set_base_url("https://my-custom-domain.com")
close()

Schließt die zugrunde liegende HTTP-Sitzung.

client.close()

Datenklassen

WorkflowExecutionResult

@dataclass
class WorkflowExecutionResult:
    success: bool
    output: Optional[Any] = None
    error: Optional[str] = None
    logs: Optional[List[Any]] = None
    metadata: Optional[Dict[str, Any]] = None
    trace_spans: Optional[List[Any]] = None
    total_duration: Optional[float] = None

AsyncExecutionResult

@dataclass
class AsyncExecutionResult:
    success: bool
    task_id: str
    status: str  # 'queued'
    created_at: str
    links: Dict[str, str]  # e.g., {"status": "/api/jobs/{taskId}"}

WorkflowStatus

@dataclass
class WorkflowStatus:
    is_deployed: bool
    deployed_at: Optional[str] = None
    needs_redeployment: bool = False

RateLimitInfo

@dataclass
class RateLimitInfo:
    limit: int
    remaining: int
    reset: int
    retry_after: Optional[int] = None

UsageLimits

@dataclass
class UsageLimits:
    success: bool
    rate_limit: Dict[str, Any]
    usage: Dict[str, Any]

SimStudioError

class SimStudioError(Exception):
    def __init__(self, message: str, code: Optional[str] = None, status: Optional[int] = None):
        super().__init__(message)
        self.code = code
        self.status = status

Häufige Fehlercodes:

  • UNAUTHORIZED: Ungültiger API-Schlüssel
  • TIMEOUT: Zeitüberschreitung der Anfrage
  • RATE_LIMIT_EXCEEDED: Ratenlimit überschritten
  • USAGE_LIMIT_EXCEEDED: Nutzungslimit überschritten
  • EXECUTION_ERROR: Workflow-Ausführung fehlgeschlagen

Beispiele

Grundlegende Workflow-Ausführung

Richten Sie den SimStudioClient mit Ihrem API-Schlüssel ein.

Prüfen Sie, ob der Workflow bereitgestellt und zur Ausführung bereit ist.

Führen Sie den Workflow mit Ihren Eingabedaten aus.

Verarbeiten Sie das Ausführungsergebnis und behandeln Sie eventuelle Fehler.

import os
from simstudio import SimStudioClient

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def run_workflow():
    try:
        # Check if workflow is ready
        is_ready = client.validate_workflow("my-workflow-id")
        if not is_ready:
            raise Exception("Workflow is not deployed or ready")

        # Execute the workflow
        result = client.execute_workflow(
            "my-workflow-id",
            input_data={
                "message": "Process this data",
                "user_id": "12345"
            }
        )

        if result.success:
            print("Output:", result.output)
            print("Duration:", result.metadata.get("duration") if result.metadata else None)
        else:
            print("Workflow failed:", result.error)
            
    except Exception as error:
        print("Error:", error)

run_workflow()

Fehlerbehandlung

Behandeln Sie verschiedene Fehlertypen, die während der Workflow-Ausführung auftreten können:

from simstudio import SimStudioClient, SimStudioError
import os

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def execute_with_error_handling():
    try:
        result = client.execute_workflow("workflow-id")
        return result
    except SimStudioError as error:
        if error.code == "UNAUTHORIZED":
            print("Invalid API key")
        elif error.code == "TIMEOUT":
            print("Workflow execution timed out")
        elif error.code == "USAGE_LIMIT_EXCEEDED":
            print("Usage limit exceeded")
        elif error.code == "INVALID_JSON":
            print("Invalid JSON in request body")
        else:
            print(f"Workflow error: {error}")
        raise
    except Exception as error:
        print(f"Unexpected error: {error}")
        raise

Verwendung des Context-Managers

Verwenden Sie den Client als Context-Manager, um die Ressourcenbereinigung automatisch zu handhaben:

from simstudio import SimStudioClient
import os

# Using context manager to automatically close the session
with SimStudioClient(api_key=os.getenv("SIM_API_KEY")) as client:
    result = client.execute_workflow("workflow-id")
    print("Result:", result)
# Session is automatically closed here

Batch-Workflow-Ausführung

Führen Sie mehrere Workflows effizient aus:

from simstudio import SimStudioClient
import os

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))   

def execute_workflows_batch(workflow_data_pairs):
    """Execute multiple workflows with different input data."""
    results = []
    
    for workflow_id, input_data in workflow_data_pairs:
        try:
            # Validate workflow before execution
            if not client.validate_workflow(workflow_id):
                print(f"Skipping {workflow_id}: not deployed")
                continue
                
            result = client.execute_workflow(workflow_id, input_data)
            results.append({
                "workflow_id": workflow_id,
                "success": result.success,
                "output": result.output,
                "error": result.error
            })
            
        except Exception as error:
            results.append({
                "workflow_id": workflow_id,
                "success": False,
                "error": str(error)
            })
    
    return results

# Example usage
workflows = [
    ("workflow-1", {"type": "analysis", "data": "sample1"}),
    ("workflow-2", {"type": "processing", "data": "sample2"}),
]

results = execute_workflows_batch(workflows)
for result in results:
    print(f"Workflow {result['workflow_id']}: {'Success' if result['success'] else 'Failed'}")

Asynchrone Workflow-Ausführung

Führen Sie Workflows asynchron für langwierige Aufgaben aus:

import os
import time
from simstudio import SimStudioClient

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def execute_async():
    try:
        # Start async execution
        result = client.execute_workflow(
            "workflow-id",
            input_data={"data": "large dataset"},
            async_execution=True  # Execute asynchronously
        )

        # Check if result is an async execution
        if hasattr(result, 'task_id'):
            print(f"Task ID: {result.task_id}")
            print(f"Status endpoint: {result.links['status']}")

            # Poll for completion
            status = client.get_job_status(result.task_id)

            while status["status"] in ["queued", "processing"]:
                print(f"Current status: {status['status']}")
                time.sleep(2)  # Wait 2 seconds
                status = client.get_job_status(result.task_id)

            if status["status"] == "completed":
                print("Workflow completed!")
                print(f"Output: {status['output']}")
                print(f"Duration: {status['metadata']['duration']}")
            else:
                print(f"Workflow failed: {status['error']}")

    except Exception as error:
        print(f"Error: {error}")

execute_async()

Ratenlimitierung und Wiederholung

Behandeln Sie Ratenbegrenzungen automatisch mit exponentiellem Backoff:

import os
from simstudio import SimStudioClient, SimStudioError

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def execute_with_retry_handling():
    try:
        # Automatically retries on rate limit
        result = client.execute_with_retry(
            "workflow-id",
            input_data={"message": "Process this"},
            max_retries=5,
            initial_delay=1.0,
            max_delay=60.0,
            backoff_multiplier=2.0
        )

        print(f"Success: {result}")
    except SimStudioError as error:
        if error.code == "RATE_LIMIT_EXCEEDED":
            print("Rate limit exceeded after all retries")

            # Check rate limit info
            rate_limit_info = client.get_rate_limit_info()
            if rate_limit_info:
                from datetime import datetime
                reset_time = datetime.fromtimestamp(rate_limit_info.reset)
                print(f"Rate limit resets at: {reset_time}")

execute_with_retry_handling()

Nutzungsüberwachung

Überwachen Sie die Nutzung und Limits Ihres Kontos:

import os
from simstudio import SimStudioClient

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def check_usage():
    try:
        limits = client.get_usage_limits()

        print("=== Rate Limits ===")
        print("Sync requests:")
        print(f"  Limit: {limits.rate_limit['sync']['limit']}")
        print(f"  Remaining: {limits.rate_limit['sync']['remaining']}")
        print(f"  Resets at: {limits.rate_limit['sync']['resetAt']}")
        print(f"  Is limited: {limits.rate_limit['sync']['isLimited']}")

        print("\nAsync requests:")
        print(f"  Limit: {limits.rate_limit['async']['limit']}")
        print(f"  Remaining: {limits.rate_limit['async']['remaining']}")
        print(f"  Resets at: {limits.rate_limit['async']['resetAt']}")
        print(f"  Is limited: {limits.rate_limit['async']['isLimited']}")

        print("\n=== Usage ===")
        print(f"Current period cost: ${limits.usage['currentPeriodCost']:.2f}")
        print(f"Limit: ${limits.usage['limit']:.2f}")
        print(f"Plan: {limits.usage['plan']}")

        percent_used = (limits.usage['currentPeriodCost'] / limits.usage['limit']) * 100
        print(f"Usage: {percent_used:.1f}%")

        if percent_used > 80:
            print("⚠️  Warning: You are approaching your usage limit!")

    except Exception as error:
        print(f"Error checking usage: {error}")

check_usage()

Streaming-Workflow-Ausführung

Führen Sie Workflows mit Echtzeit-Streaming-Antworten aus:

from simstudio import SimStudioClient
import os

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def execute_with_streaming():
    """Execute workflow with streaming enabled."""
    try:
        # Enable streaming for specific block outputs
        result = client.execute_workflow(
            "workflow-id",
            input_data={"message": "Count to five"},
            stream=True,
            selected_outputs=["agent1.content"]  # Use blockName.attribute format
        )

        print("Workflow result:", result)
    except Exception as error:
        print("Error:", error)

execute_with_streaming()

Die Streaming-Antwort folgt dem Server-Sent-Events- (SSE-) Format:

data: {"blockId":"7b7735b9-19e5-4bd6-818b-46aae2596e9f","chunk":"One"}

data: {"blockId":"7b7735b9-19e5-4bd6-818b-46aae2596e9f","chunk":", two"}

data: {"event":"done","success":true,"output":{},"metadata":{"duration":610}}

data: [DONE]

Flask-Streaming-Beispiel:

from flask import Flask, Response, stream_with_context
import requests
import json
import os

app = Flask(__name__)

@app.route('/stream-workflow')
def stream_workflow():
    """Stream workflow execution to the client."""

    def generate():
        response = requests.post(
            'https://sim.ai/api/workflows/WORKFLOW_ID/execute',
            headers={
                'Content-Type': 'application/json',
                'X-API-Key': os.getenv('SIM_API_KEY')
            },
            json={
                'message': 'Generate a story',
                'stream': True,
                'selectedOutputs': ['agent1.content']
            },
            stream=True
        )

        for line in response.iter_lines():
            if line:
                decoded_line = line.decode('utf-8')
                if decoded_line.startswith('data: '):
                    data = decoded_line[6:]  # Remove 'data: ' prefix

                    if data == '[DONE]':
                        break

                    try:
                        parsed = json.loads(data)
                        if 'chunk' in parsed:
                            yield f"data: {json.dumps(parsed)}\n\n"
                        elif parsed.get('event') == 'done':
                            yield f"data: {json.dumps(parsed)}\n\n"
                            print("Execution complete:", parsed.get('metadata'))
                    except json.JSONDecodeError:
                        pass

    return Response(
        stream_with_context(generate()),
        mimetype='text/event-stream'
    )

if __name__ == '__main__':
    app.run(debug=True)

Umgebungs­konfiguration

Konfigurieren Sie den Client mit Umgebungsvariablen:

import os
from simstudio import SimStudioClient

# Development configuration
client = SimStudioClient(
    api_key=os.getenv("SIM_API_KEY")
    base_url=os.getenv("SIM_BASE_URL", "https://sim.ai")
)
import os
from simstudio import SimStudioClient

# Production configuration with error handling
api_key = os.getenv("SIM_API_KEY")
if not api_key:
    raise ValueError("SIM_API_KEY environment variable is required")

client = SimStudioClient(
    api_key=api_key,
    base_url=os.getenv("SIM_BASE_URL", "https://sim.ai")
)

Ihren API-Schlüssel erhalten

Navigieren Sie zu Sim und melden Sie sich in Ihrem Konto an.

Navigieren Sie zu dem Workflow, den Sie programmatisch ausführen möchten.

Klicken Sie auf "Bereitstellen", um Ihren Workflow bereitzustellen, falls dies noch nicht geschehen ist.

Wählen oder erstellen Sie während des Bereitstellungsprozesses einen API-Schlüssel.

Kopieren Sie den API-Schlüssel, um ihn in Ihrer Python-Anwendung zu verwenden.

Voraussetzungen

  • Python 3.8+
  • requests >= 2.25.0

Lizenz

Apache-2.0

On this page

Start building today
Trusted by over 70,000 builders.
Build Agentic workflows visually on a drag-and-drop canvas or with natural language.
Get started