Sim
SDKs

Python

El SDK oficial de Python para Sim te permite ejecutar flujos de trabajo programáticamente desde tus aplicaciones Python utilizando el SDK oficial de Python.

El SDK de Python es compatible con Python 3.8+ con soporte para ejecución asíncrona, limitación automática de velocidad con retroceso exponencial y seguimiento de uso.

Instalación

Instala el SDK usando pip:

pip install simstudio-sdk

Inicio rápido

Aquí tienes un ejemplo sencillo para empezar:

from simstudio import SimStudioClient

# Initialize the client
client = SimStudioClient(
    api_key="your-api-key-here",
    base_url="https://sim.ai"  # optional, defaults to https://sim.ai
)

# Execute a workflow
try:
    result = client.execute_workflow("workflow-id")
    print("Workflow executed successfully:", result)
except Exception as error:
    print("Workflow execution failed:", error)

Referencia de la API

SimStudioClient

Constructor

SimStudioClient(api_key: str, base_url: str = "https://sim.ai")

Parámetros:

  • api_key (str): Tu clave API de Sim
  • base_url (str, opcional): URL base para la API de Sim

Métodos

execute_workflow()

Ejecuta un flujo de trabajo con datos de entrada opcionales.

result = client.execute_workflow(
    "workflow-id",
    input_data={"message": "Hello, world!"},
    timeout=30.0  # 30 seconds
)

Parámetros:

  • workflow_id (str): El ID del flujo de trabajo a ejecutar
  • input_data (dict, opcional): Datos de entrada para pasar al flujo de trabajo
  • timeout (float, opcional): Tiempo de espera en segundos (predeterminado: 30.0)
  • stream (bool, opcional): Habilitar respuestas en streaming (predeterminado: False)
  • selected_outputs (list[str], opcional): Salidas de bloque para transmitir en formato blockName.attribute (p. ej., ["agent1.content"])
  • async_execution (bool, opcional): Ejecutar de forma asíncrona (predeterminado: False)

Devuelve: WorkflowExecutionResult | AsyncExecutionResult

Cuando async_execution=True, devuelve inmediatamente un ID de tarea para sondeo. De lo contrario, espera a que se complete.

get_workflow_status()

Obtener el estado de un flujo de trabajo (estado de implementación, etc.).

status = client.get_workflow_status("workflow-id")
print("Is deployed:", status.is_deployed)

Parámetros:

  • workflow_id (str): El ID del flujo de trabajo

Devuelve: WorkflowStatus

validate_workflow()

Validar que un flujo de trabajo está listo para su ejecución.

is_ready = client.validate_workflow("workflow-id")
if is_ready:
    # Workflow is deployed and ready
    pass

Parámetros:

  • workflow_id (str): El ID del flujo de trabajo

Devuelve: bool

get_job_status()

Obtener el estado de una ejecución de trabajo asíncrono.

status = client.get_job_status("task-id-from-async-execution")
print("Status:", status["status"])  # 'queued', 'processing', 'completed', 'failed'
if status["status"] == "completed":
    print("Output:", status["output"])

Parámetros:

  • task_id (str): El ID de tarea devuelto de la ejecución asíncrona

Devuelve: Dict[str, Any]

Campos de respuesta:

  • success (bool): Si la solicitud fue exitosa
  • taskId (str): El ID de la tarea
  • status (str): Uno de 'queued', 'processing', 'completed', 'failed', 'cancelled'
  • metadata (dict): Contiene startedAt, completedAt, y duration
  • output (any, opcional): La salida del flujo de trabajo (cuando se completa)
  • error (any, opcional): Detalles del error (cuando falla)
  • estimatedDuration (int, opcional): Duración estimada en milisegundos (cuando está procesando/en cola)
execute_with_retry()

Ejecutar un flujo de trabajo con reintento automático en errores de límite de velocidad usando retroceso exponencial.

result = client.execute_with_retry(
    "workflow-id",
    input_data={"message": "Hello"},
    timeout=30.0,
    max_retries=3,           # Maximum number of retries
    initial_delay=1.0,       # Initial delay in seconds
    max_delay=30.0,          # Maximum delay in seconds
    backoff_multiplier=2.0   # Exponential backoff multiplier
)

Parámetros:

  • workflow_id (str): El ID del flujo de trabajo a ejecutar
  • input_data (dict, opcional): Datos de entrada para pasar al flujo de trabajo
  • timeout (float, opcional): Tiempo de espera en segundos
  • stream (bool, opcional): Habilitar respuestas en streaming
  • selected_outputs (list, opcional): Salidas de bloque para transmitir
  • async_execution (bool, opcional): Ejecutar de forma asíncrona
  • max_retries (int, opcional): Número máximo de reintentos (predeterminado: 3)
  • initial_delay (float, opcional): Retraso inicial en segundos (predeterminado: 1.0)
  • max_delay (float, opcional): Retraso máximo en segundos (predeterminado: 30.0)
  • backoff_multiplier (float, opcional): Multiplicador de retroceso (predeterminado: 2.0)

Devuelve: WorkflowExecutionResult | AsyncExecutionResult

La lógica de reintento utiliza retroceso exponencial (1s → 2s → 4s → 8s...) con fluctuación de ±25% para evitar el efecto de manada. Si la API proporciona un encabezado retry-after, se utilizará en su lugar.

get_rate_limit_info()

Obtiene la información actual del límite de tasa de la última respuesta de la API.

rate_limit_info = client.get_rate_limit_info()
if rate_limit_info:
    print("Limit:", rate_limit_info.limit)
    print("Remaining:", rate_limit_info.remaining)
    print("Reset:", datetime.fromtimestamp(rate_limit_info.reset))

Devuelve: RateLimitInfo | None

get_usage_limits()

Obtiene los límites de uso actuales y la información de cuota para tu cuenta.

limits = client.get_usage_limits()
print("Sync requests remaining:", limits.rate_limit["sync"]["remaining"])
print("Async requests remaining:", limits.rate_limit["async"]["remaining"])
print("Current period cost:", limits.usage["currentPeriodCost"])
print("Plan:", limits.usage["plan"])

Devuelve: UsageLimits

Estructura de respuesta:

{
    "success": bool,
    "rateLimit": {
        "sync": {
            "isLimited": bool,
            "limit": int,
            "remaining": int,
            "resetAt": str
        },
        "async": {
            "isLimited": bool,
            "limit": int,
            "remaining": int,
            "resetAt": str
        },
        "authType": str  # 'api' or 'manual'
    },
    "usage": {
        "currentPeriodCost": float,
        "limit": float,
        "plan": str  # e.g., 'free', 'pro'
    }
}
set_api_key()

Actualiza la clave API.

client.set_api_key("new-api-key")
set_base_url()

Actualiza la URL base.

client.set_base_url("https://my-custom-domain.com")
close()

Cierra la sesión HTTP subyacente.

client.close()

Clases de datos

WorkflowExecutionResult

@dataclass
class WorkflowExecutionResult:
    success: bool
    output: Optional[Any] = None
    error: Optional[str] = None
    logs: Optional[List[Any]] = None
    metadata: Optional[Dict[str, Any]] = None
    trace_spans: Optional[List[Any]] = None
    total_duration: Optional[float] = None

AsyncExecutionResult

@dataclass
class AsyncExecutionResult:
    success: bool
    task_id: str
    status: str  # 'queued'
    created_at: str
    links: Dict[str, str]  # e.g., {"status": "/api/jobs/{taskId}"}

WorkflowStatus

@dataclass
class WorkflowStatus:
    is_deployed: bool
    deployed_at: Optional[str] = None
    is_published: bool = False
    needs_redeployment: bool = False

RateLimitInfo

@dataclass
class RateLimitInfo:
    limit: int
    remaining: int
    reset: int
    retry_after: Optional[int] = None

UsageLimits

@dataclass
class UsageLimits:
    success: bool
    rate_limit: Dict[str, Any]
    usage: Dict[str, Any]

SimStudioError

class SimStudioError(Exception):
    def __init__(self, message: str, code: Optional[str] = None, status: Optional[int] = None):
        super().__init__(message)
        self.code = code
        self.status = status

Códigos de error comunes:

  • UNAUTHORIZED: Clave API inválida
  • TIMEOUT: Tiempo de espera agotado
  • RATE_LIMIT_EXCEEDED: Límite de tasa excedido
  • USAGE_LIMIT_EXCEEDED: Límite de uso excedido
  • EXECUTION_ERROR: Ejecución del flujo de trabajo fallida

Ejemplos

Ejecución básica de flujo de trabajo

Configura el SimStudioClient con tu clave API.

Comprueba si el flujo de trabajo está desplegado y listo para su ejecución.

Ejecuta el flujo de trabajo con tus datos de entrada.

Procesa el resultado de la ejecución y gestiona cualquier error.

import os
from simstudio import SimStudioClient

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def run_workflow():
    try:
        # Check if workflow is ready
        is_ready = client.validate_workflow("my-workflow-id")
        if not is_ready:
            raise Exception("Workflow is not deployed or ready")

        # Execute the workflow
        result = client.execute_workflow(
            "my-workflow-id",
            input_data={
                "message": "Process this data",
                "user_id": "12345"
            }
        )

        if result.success:
            print("Output:", result.output)
            print("Duration:", result.metadata.get("duration") if result.metadata else None)
        else:
            print("Workflow failed:", result.error)
            
    except Exception as error:
        print("Error:", error)

run_workflow()

Manejo de errores

Maneja diferentes tipos de errores que pueden ocurrir durante la ejecución del flujo de trabajo:

from simstudio import SimStudioClient, SimStudioError
import os

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def execute_with_error_handling():
    try:
        result = client.execute_workflow("workflow-id")
        return result
    except SimStudioError as error:
        if error.code == "UNAUTHORIZED":
            print("Invalid API key")
        elif error.code == "TIMEOUT":
            print("Workflow execution timed out")
        elif error.code == "USAGE_LIMIT_EXCEEDED":
            print("Usage limit exceeded")
        elif error.code == "INVALID_JSON":
            print("Invalid JSON in request body")
        else:
            print(f"Workflow error: {error}")
        raise
    except Exception as error:
        print(f"Unexpected error: {error}")
        raise

Uso del gestor de contexto

Usa el cliente como un gestor de contexto para manejar automáticamente la limpieza de recursos:

from simstudio import SimStudioClient
import os

# Using context manager to automatically close the session
with SimStudioClient(api_key=os.getenv("SIM_API_KEY")) as client:
    result = client.execute_workflow("workflow-id")
    print("Result:", result)
# Session is automatically closed here

Ejecución de flujos de trabajo por lotes

Ejecuta múltiples flujos de trabajo de manera eficiente:

from simstudio import SimStudioClient
import os

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))   

def execute_workflows_batch(workflow_data_pairs):
    """Execute multiple workflows with different input data."""
    results = []
    
    for workflow_id, input_data in workflow_data_pairs:
        try:
            # Validate workflow before execution
            if not client.validate_workflow(workflow_id):
                print(f"Skipping {workflow_id}: not deployed")
                continue
                
            result = client.execute_workflow(workflow_id, input_data)
            results.append({
                "workflow_id": workflow_id,
                "success": result.success,
                "output": result.output,
                "error": result.error
            })
            
        except Exception as error:
            results.append({
                "workflow_id": workflow_id,
                "success": False,
                "error": str(error)
            })
    
    return results

# Example usage
workflows = [
    ("workflow-1", {"type": "analysis", "data": "sample1"}),
    ("workflow-2", {"type": "processing", "data": "sample2"}),
]

results = execute_workflows_batch(workflows)
for result in results:
    print(f"Workflow {result['workflow_id']}: {'Success' if result['success'] else 'Failed'}")

Ejecución asíncrona de flujos de trabajo

Ejecuta flujos de trabajo de forma asíncrona para tareas de larga duración:

import os
import time
from simstudio import SimStudioClient

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def execute_async():
    try:
        # Start async execution
        result = client.execute_workflow(
            "workflow-id",
            input_data={"data": "large dataset"},
            async_execution=True  # Execute asynchronously
        )

        # Check if result is an async execution
        if hasattr(result, 'task_id'):
            print(f"Task ID: {result.task_id}")
            print(f"Status endpoint: {result.links['status']}")

            # Poll for completion
            status = client.get_job_status(result.task_id)

            while status["status"] in ["queued", "processing"]:
                print(f"Current status: {status['status']}")
                time.sleep(2)  # Wait 2 seconds
                status = client.get_job_status(result.task_id)

            if status["status"] == "completed":
                print("Workflow completed!")
                print(f"Output: {status['output']}")
                print(f"Duration: {status['metadata']['duration']}")
            else:
                print(f"Workflow failed: {status['error']}")

    except Exception as error:
        print(f"Error: {error}")

execute_async()

Límite de tasa y reintentos

Maneja los límites de tasa automáticamente con retroceso exponencial:

import os
from simstudio import SimStudioClient, SimStudioError

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def execute_with_retry_handling():
    try:
        # Automatically retries on rate limit
        result = client.execute_with_retry(
            "workflow-id",
            input_data={"message": "Process this"},
            max_retries=5,
            initial_delay=1.0,
            max_delay=60.0,
            backoff_multiplier=2.0
        )

        print(f"Success: {result}")
    except SimStudioError as error:
        if error.code == "RATE_LIMIT_EXCEEDED":
            print("Rate limit exceeded after all retries")

            # Check rate limit info
            rate_limit_info = client.get_rate_limit_info()
            if rate_limit_info:
                from datetime import datetime
                reset_time = datetime.fromtimestamp(rate_limit_info.reset)
                print(f"Rate limit resets at: {reset_time}")

execute_with_retry_handling()

Monitoreo de uso

Monitorea el uso de tu cuenta y sus límites:

import os
from simstudio import SimStudioClient

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def check_usage():
    try:
        limits = client.get_usage_limits()

        print("=== Rate Limits ===")
        print("Sync requests:")
        print(f"  Limit: {limits.rate_limit['sync']['limit']}")
        print(f"  Remaining: {limits.rate_limit['sync']['remaining']}")
        print(f"  Resets at: {limits.rate_limit['sync']['resetAt']}")
        print(f"  Is limited: {limits.rate_limit['sync']['isLimited']}")

        print("\nAsync requests:")
        print(f"  Limit: {limits.rate_limit['async']['limit']}")
        print(f"  Remaining: {limits.rate_limit['async']['remaining']}")
        print(f"  Resets at: {limits.rate_limit['async']['resetAt']}")
        print(f"  Is limited: {limits.rate_limit['async']['isLimited']}")

        print("\n=== Usage ===")
        print(f"Current period cost: ${limits.usage['currentPeriodCost']:.2f}")
        print(f"Limit: ${limits.usage['limit']:.2f}")
        print(f"Plan: {limits.usage['plan']}")

        percent_used = (limits.usage['currentPeriodCost'] / limits.usage['limit']) * 100
        print(f"Usage: {percent_used:.1f}%")

        if percent_used > 80:
            print("⚠️  Warning: You are approaching your usage limit!")

    except Exception as error:
        print(f"Error checking usage: {error}")

check_usage()

Ejecución de flujo de trabajo en streaming

Ejecuta flujos de trabajo con respuestas en tiempo real:

from simstudio import SimStudioClient
import os

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))       

def execute_with_streaming():
    """Execute workflow with streaming enabled."""
    try:
        # Enable streaming for specific block outputs
        result = client.execute_workflow(
            "workflow-id",
            input_data={"message": "Count to five"},
            stream=True,
            selected_outputs=["agent1.content"]  # Use blockName.attribute format
        )

        print("Workflow result:", result)
    except Exception as error:
        print("Error:", error)

execute_with_streaming()

La respuesta en streaming sigue el formato de Server-Sent Events (SSE):

data: {"blockId":"7b7735b9-19e5-4bd6-818b-46aae2596e9f","chunk":"One"}

data: {"blockId":"7b7735b9-19e5-4bd6-818b-46aae2596e9f","chunk":", two"}

data: {"event":"done","success":true,"output":{},"metadata":{"duration":610}}

data: [DONE]

Ejemplo de streaming con Flask:

from flask import Flask, Response, stream_with_context
import requests
import json
import os

app = Flask(__name__)

@app.route('/stream-workflow')
def stream_workflow():
    """Stream workflow execution to the client."""

    def generate():
        response = requests.post(
            'https://sim.ai/api/workflows/WORKFLOW_ID/execute',
            headers={
                'Content-Type': 'application/json',
                'X-API-Key': os.getenv('SIM_API_KEY')
            },
            json={
                'message': 'Generate a story',
                'stream': True,
                'selectedOutputs': ['agent1.content']
            },
            stream=True
        )

        for line in response.iter_lines():
            if line:
                decoded_line = line.decode('utf-8')
                if decoded_line.startswith('data: '):
                    data = decoded_line[6:]  # Remove 'data: ' prefix

                    if data == '[DONE]':
                        break

                    try:
                        parsed = json.loads(data)
                        if 'chunk' in parsed:
                            yield f"data: {json.dumps(parsed)}\n\n"
                        elif parsed.get('event') == 'done':
                            yield f"data: {json.dumps(parsed)}\n\n"
                            print("Execution complete:", parsed.get('metadata'))
                    except json.JSONDecodeError:
                        pass

    return Response(
        stream_with_context(generate()),
        mimetype='text/event-stream'
    )

if __name__ == '__main__':
    app.run(debug=True)

Configuración del entorno

Configura el cliente usando variables de entorno:

import os
from simstudio import SimStudioClient

# Development configuration
client = SimStudioClient(
    api_key=os.getenv("SIM_API_KEY")
    base_url=os.getenv("SIM_BASE_URL", "https://sim.ai")
)
import os
from simstudio import SimStudioClient

# Production configuration with error handling
api_key = os.getenv("SIM_API_KEY")
if not api_key:
    raise ValueError("SIM_API_KEY environment variable is required")

client = SimStudioClient(
    api_key=api_key,
    base_url=os.getenv("SIM_BASE_URL", "https://sim.ai")
)

Obtener tu clave API

Navega a Sim e inicia sesión en tu cuenta.

Navega al flujo de trabajo que quieres ejecutar programáticamente.

Haz clic en "Deploy" para desplegar tu flujo de trabajo si aún no ha sido desplegado.

Durante el proceso de despliegue, selecciona o crea una clave API.

Copia la clave API para usarla en tu aplicación Python.

Requisitos

  • Python 3.8+
  • requests >= 2.25.0

Licencia

Apache-2.0