Sim

Python

The official Python SDK for Sim allows you to execute workflows programmatically from your Python applications using the official Python SDK.

The Python SDK supports Python 3.8+ with async execution support, automatic rate limiting with exponential backoff, and usage tracking.

Installation

Install the SDK using pip:

pip install simstudio-sdk

Quick Start

Here's a simple example to get you started:

from simstudio import SimStudioClient

# Initialize the client
client = SimStudioClient(
    api_key="your-api-key-here",
    base_url="https://sim.ai"  # optional, defaults to https://sim.ai
)

# Execute a workflow
try:
    result = client.execute_workflow("workflow-id")
    print("Workflow executed successfully:", result)
except Exception as error:
    print("Workflow execution failed:", error)

API Reference

SimStudioClient

Constructor

SimStudioClient(api_key: str, base_url: str = "https://sim.ai")

Parameters:

  • api_key (str): Your Sim API key
  • base_url (str, optional): Base URL for the Sim API

Methods

execute_workflow()

Execute a workflow with optional input data.

result = client.execute_workflow(
    "workflow-id",
    input_data={"message": "Hello, world!"},
    timeout=30.0  # 30 seconds
)

Parameters:

  • workflow_id (str): The ID of the workflow to execute
  • input_data (dict, optional): Input data to pass to the workflow
  • timeout (float, optional): Timeout in seconds (default: 30.0)
  • stream (bool, optional): Enable streaming responses (default: False)
  • selected_outputs (list[str], optional): Block outputs to stream in blockName.attribute format (e.g., ["agent1.content"])
  • async_execution (bool, optional): Execute asynchronously (default: False)

Returns: WorkflowExecutionResult | AsyncExecutionResult

When async_execution=True, returns immediately with a task ID for polling. Otherwise, waits for completion.

get_workflow_status()

Get the status of a workflow (deployment status, etc.).

status = client.get_workflow_status("workflow-id")
print("Is deployed:", status.is_deployed)

Parameters:

  • workflow_id (str): The ID of the workflow

Returns: WorkflowStatus

validate_workflow()

Validate that a workflow is ready for execution.

is_ready = client.validate_workflow("workflow-id")
if is_ready:
    # Workflow is deployed and ready
    pass

Parameters:

  • workflow_id (str): The ID of the workflow

Returns: bool

get_job_status()

Get the status of an async job execution.

status = client.get_job_status("task-id-from-async-execution")
print("Status:", status["status"])  # 'queued', 'processing', 'completed', 'failed'
if status["status"] == "completed":
    print("Output:", status["output"])

Parameters:

  • task_id (str): The task ID returned from async execution

Returns: Dict[str, Any]

Response fields:

  • success (bool): Whether the request was successful
  • taskId (str): The task ID
  • status (str): One of 'queued', 'processing', 'completed', 'failed', 'cancelled'
  • metadata (dict): Contains startedAt, completedAt, and duration
  • output (any, optional): The workflow output (when completed)
  • error (any, optional): Error details (when failed)
  • estimatedDuration (int, optional): Estimated duration in milliseconds (when processing/queued)
execute_with_retry()

Execute a workflow with automatic retry on rate limit errors using exponential backoff.

result = client.execute_with_retry(
    "workflow-id",
    input_data={"message": "Hello"},
    timeout=30.0,
    max_retries=3,           # Maximum number of retries
    initial_delay=1.0,       # Initial delay in seconds
    max_delay=30.0,          # Maximum delay in seconds
    backoff_multiplier=2.0   # Exponential backoff multiplier
)

Parameters:

  • workflow_id (str): The ID of the workflow to execute
  • input_data (dict, optional): Input data to pass to the workflow
  • timeout (float, optional): Timeout in seconds
  • stream (bool, optional): Enable streaming responses
  • selected_outputs (list, optional): Block outputs to stream
  • async_execution (bool, optional): Execute asynchronously
  • max_retries (int, optional): Maximum number of retries (default: 3)
  • initial_delay (float, optional): Initial delay in seconds (default: 1.0)
  • max_delay (float, optional): Maximum delay in seconds (default: 30.0)
  • backoff_multiplier (float, optional): Backoff multiplier (default: 2.0)

Returns: WorkflowExecutionResult | AsyncExecutionResult

The retry logic uses exponential backoff (1s → 2s → 4s → 8s...) with ±25% jitter to prevent thundering herd. If the API provides a retry-after header, it will be used instead.

get_rate_limit_info()

Get the current rate limit information from the last API response.

rate_limit_info = client.get_rate_limit_info()
if rate_limit_info:
    print("Limit:", rate_limit_info.limit)
    print("Remaining:", rate_limit_info.remaining)
    print("Reset:", datetime.fromtimestamp(rate_limit_info.reset))

Returns: RateLimitInfo | None

get_usage_limits()

Get current usage limits and quota information for your account.

limits = client.get_usage_limits()
print("Sync requests remaining:", limits.rate_limit["sync"]["remaining"])
print("Async requests remaining:", limits.rate_limit["async"]["remaining"])
print("Current period cost:", limits.usage["currentPeriodCost"])
print("Plan:", limits.usage["plan"])

Returns: UsageLimits

Response structure:

{
    "success": bool,
    "rateLimit": {
        "sync": {
            "isLimited": bool,
            "limit": int,
            "remaining": int,
            "resetAt": str
        },
        "async": {
            "isLimited": bool,
            "limit": int,
            "remaining": int,
            "resetAt": str
        },
        "authType": str  # 'api' or 'manual'
    },
    "usage": {
        "currentPeriodCost": float,
        "limit": float,
        "plan": str  # e.g., 'free', 'pro'
    }
}
set_api_key()

Update the API key.

client.set_api_key("new-api-key")
set_base_url()

Update the base URL.

client.set_base_url("https://my-custom-domain.com")
close()

Close the underlying HTTP session.

client.close()

Data Classes

WorkflowExecutionResult

@dataclass
class WorkflowExecutionResult:
    success: bool
    output: Optional[Any] = None
    error: Optional[str] = None
    logs: Optional[List[Any]] = None
    metadata: Optional[Dict[str, Any]] = None
    trace_spans: Optional[List[Any]] = None
    total_duration: Optional[float] = None

AsyncExecutionResult

@dataclass
class AsyncExecutionResult:
    success: bool
    task_id: str
    status: str  # 'queued'
    created_at: str
    links: Dict[str, str]  # e.g., {"status": "/api/jobs/{taskId}"}

WorkflowStatus

@dataclass
class WorkflowStatus:
    is_deployed: bool
    deployed_at: Optional[str] = None
    is_published: bool = False
    needs_redeployment: bool = False

RateLimitInfo

@dataclass
class RateLimitInfo:
    limit: int
    remaining: int
    reset: int
    retry_after: Optional[int] = None

UsageLimits

@dataclass
class UsageLimits:
    success: bool
    rate_limit: Dict[str, Any]
    usage: Dict[str, Any]

SimStudioError

class SimStudioError(Exception):
    def __init__(self, message: str, code: Optional[str] = None, status: Optional[int] = None):
        super().__init__(message)
        self.code = code
        self.status = status

Common error codes:

  • UNAUTHORIZED: Invalid API key
  • TIMEOUT: Request timed out
  • RATE_LIMIT_EXCEEDED: Rate limit exceeded
  • USAGE_LIMIT_EXCEEDED: Usage limit exceeded
  • EXECUTION_ERROR: Workflow execution failed

Examples

Basic Workflow Execution

Set up the SimStudioClient with your API key.

Check if the workflow is deployed and ready for execution.

Run the workflow with your input data.

Process the execution result and handle any errors.

import os
from simstudio import SimStudioClient

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def run_workflow():
    try:
        # Check if workflow is ready
        is_ready = client.validate_workflow("my-workflow-id")
        if not is_ready:
            raise Exception("Workflow is not deployed or ready")

        # Execute the workflow
        result = client.execute_workflow(
            "my-workflow-id",
            input_data={
                "message": "Process this data",
                "user_id": "12345"
            }
        )

        if result.success:
            print("Output:", result.output)
            print("Duration:", result.metadata.get("duration") if result.metadata else None)
        else:
            print("Workflow failed:", result.error)
            
    except Exception as error:
        print("Error:", error)

run_workflow()

Error Handling

Handle different types of errors that may occur during workflow execution:

from simstudio import SimStudioClient, SimStudioError
import os

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def execute_with_error_handling():
    try:
        result = client.execute_workflow("workflow-id")
        return result
    except SimStudioError as error:
        if error.code == "UNAUTHORIZED":
            print("Invalid API key")
        elif error.code == "TIMEOUT":
            print("Workflow execution timed out")
        elif error.code == "USAGE_LIMIT_EXCEEDED":
            print("Usage limit exceeded")
        elif error.code == "INVALID_JSON":
            print("Invalid JSON in request body")
        else:
            print(f"Workflow error: {error}")
        raise
    except Exception as error:
        print(f"Unexpected error: {error}")
        raise

Context Manager Usage

Use the client as a context manager to automatically handle resource cleanup:

from simstudio import SimStudioClient
import os

# Using context manager to automatically close the session
with SimStudioClient(api_key=os.getenv("SIM_API_KEY")) as client:
    result = client.execute_workflow("workflow-id")
    print("Result:", result)
# Session is automatically closed here

Batch Workflow Execution

Execute multiple workflows efficiently:

from simstudio import SimStudioClient
import os

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))   

def execute_workflows_batch(workflow_data_pairs):
    """Execute multiple workflows with different input data."""
    results = []
    
    for workflow_id, input_data in workflow_data_pairs:
        try:
            # Validate workflow before execution
            if not client.validate_workflow(workflow_id):
                print(f"Skipping {workflow_id}: not deployed")
                continue
                
            result = client.execute_workflow(workflow_id, input_data)
            results.append({
                "workflow_id": workflow_id,
                "success": result.success,
                "output": result.output,
                "error": result.error
            })
            
        except Exception as error:
            results.append({
                "workflow_id": workflow_id,
                "success": False,
                "error": str(error)
            })
    
    return results

# Example usage
workflows = [
    ("workflow-1", {"type": "analysis", "data": "sample1"}),
    ("workflow-2", {"type": "processing", "data": "sample2"}),
]

results = execute_workflows_batch(workflows)
for result in results:
    print(f"Workflow {result['workflow_id']}: {'Success' if result['success'] else 'Failed'}")

Async Workflow Execution

Execute workflows asynchronously for long-running tasks:

import os
import time
from simstudio import SimStudioClient

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def execute_async():
    try:
        # Start async execution
        result = client.execute_workflow(
            "workflow-id",
            input_data={"data": "large dataset"},
            async_execution=True  # Execute asynchronously
        )

        # Check if result is an async execution
        if hasattr(result, 'task_id'):
            print(f"Task ID: {result.task_id}")
            print(f"Status endpoint: {result.links['status']}")

            # Poll for completion
            status = client.get_job_status(result.task_id)

            while status["status"] in ["queued", "processing"]:
                print(f"Current status: {status['status']}")
                time.sleep(2)  # Wait 2 seconds
                status = client.get_job_status(result.task_id)

            if status["status"] == "completed":
                print("Workflow completed!")
                print(f"Output: {status['output']}")
                print(f"Duration: {status['metadata']['duration']}")
            else:
                print(f"Workflow failed: {status['error']}")

    except Exception as error:
        print(f"Error: {error}")

execute_async()

Rate Limiting and Retry

Handle rate limits automatically with exponential backoff:

import os
from simstudio import SimStudioClient, SimStudioError

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def execute_with_retry_handling():
    try:
        # Automatically retries on rate limit
        result = client.execute_with_retry(
            "workflow-id",
            input_data={"message": "Process this"},
            max_retries=5,
            initial_delay=1.0,
            max_delay=60.0,
            backoff_multiplier=2.0
        )

        print(f"Success: {result}")
    except SimStudioError as error:
        if error.code == "RATE_LIMIT_EXCEEDED":
            print("Rate limit exceeded after all retries")

            # Check rate limit info
            rate_limit_info = client.get_rate_limit_info()
            if rate_limit_info:
                from datetime import datetime
                reset_time = datetime.fromtimestamp(rate_limit_info.reset)
                print(f"Rate limit resets at: {reset_time}")

execute_with_retry_handling()

Usage Monitoring

Monitor your account usage and limits:

import os
from simstudio import SimStudioClient

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def check_usage():
    try:
        limits = client.get_usage_limits()

        print("=== Rate Limits ===")
        print("Sync requests:")
        print(f"  Limit: {limits.rate_limit['sync']['limit']}")
        print(f"  Remaining: {limits.rate_limit['sync']['remaining']}")
        print(f"  Resets at: {limits.rate_limit['sync']['resetAt']}")
        print(f"  Is limited: {limits.rate_limit['sync']['isLimited']}")

        print("\nAsync requests:")
        print(f"  Limit: {limits.rate_limit['async']['limit']}")
        print(f"  Remaining: {limits.rate_limit['async']['remaining']}")
        print(f"  Resets at: {limits.rate_limit['async']['resetAt']}")
        print(f"  Is limited: {limits.rate_limit['async']['isLimited']}")

        print("\n=== Usage ===")
        print(f"Current period cost: ${limits.usage['currentPeriodCost']:.2f}")
        print(f"Limit: ${limits.usage['limit']:.2f}")
        print(f"Plan: {limits.usage['plan']}")

        percent_used = (limits.usage['currentPeriodCost'] / limits.usage['limit']) * 100
        print(f"Usage: {percent_used:.1f}%")

        if percent_used > 80:
            print("⚠️  Warning: You are approaching your usage limit!")

    except Exception as error:
        print(f"Error checking usage: {error}")

check_usage()

Streaming Workflow Execution

Execute workflows with real-time streaming responses:

from simstudio import SimStudioClient
import os

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def execute_with_streaming():
    """Execute workflow with streaming enabled."""
    try:
        # Enable streaming for specific block outputs
        result = client.execute_workflow(
            "workflow-id",
            input_data={"message": "Count to five"},
            stream=True,
            selected_outputs=["agent1.content"]  # Use blockName.attribute format
        )

        print("Workflow result:", result)
    except Exception as error:
        print("Error:", error)

execute_with_streaming()

The streaming response follows the Server-Sent Events (SSE) format:

data: {"blockId":"7b7735b9-19e5-4bd6-818b-46aae2596e9f","chunk":"One"}

data: {"blockId":"7b7735b9-19e5-4bd6-818b-46aae2596e9f","chunk":", two"}

data: {"event":"done","success":true,"output":{},"metadata":{"duration":610}}

data: [DONE]

Flask Streaming Example:

from flask import Flask, Response, stream_with_context
import requests
import json
import os

app = Flask(__name__)

@app.route('/stream-workflow')
def stream_workflow():
    """Stream workflow execution to the client."""

    def generate():
        response = requests.post(
            'https://sim.ai/api/workflows/WORKFLOW_ID/execute',
            headers={
                'Content-Type': 'application/json',
                'X-API-Key': os.getenv('SIM_API_KEY')
            },
            json={
                'message': 'Generate a story',
                'stream': True,
                'selectedOutputs': ['agent1.content']
            },
            stream=True
        )

        for line in response.iter_lines():
            if line:
                decoded_line = line.decode('utf-8')
                if decoded_line.startswith('data: '):
                    data = decoded_line[6:]  # Remove 'data: ' prefix

                    if data == '[DONE]':
                        break

                    try:
                        parsed = json.loads(data)
                        if 'chunk' in parsed:
                            yield f"data: {json.dumps(parsed)}\n\n"
                        elif parsed.get('event') == 'done':
                            yield f"data: {json.dumps(parsed)}\n\n"
                            print("Execution complete:", parsed.get('metadata'))
                    except json.JSONDecodeError:
                        pass

    return Response(
        stream_with_context(generate()),
        mimetype='text/event-stream'
    )

if __name__ == '__main__':
    app.run(debug=True)

Environment Configuration

Configure the client using environment variables:

import os
from simstudio import SimStudioClient

# Development configuration
client = SimStudioClient(
    api_key=os.getenv("SIM_API_KEY")
    base_url=os.getenv("SIM_BASE_URL", "https://sim.ai")
)
import os
from simstudio import SimStudioClient

# Production configuration with error handling
api_key = os.getenv("SIM_API_KEY")
if not api_key:
    raise ValueError("SIM_API_KEY environment variable is required")

client = SimStudioClient(
    api_key=api_key,
    base_url=os.getenv("SIM_BASE_URL", "https://sim.ai")
)

Getting Your API Key

Navigate to Sim and log in to your account.

Navigate to the workflow you want to execute programmatically.

Click on "Deploy" to deploy your workflow if it hasn't been deployed yet.

During the deployment process, select or create an API key.

Copy the API key to use in your Python application.

Requirements

  • Python 3.8+
  • requests >= 2.25.0

License

Apache-2.0