Python
El SDK oficial de Python para Sim te permite ejecutar flujos de trabajo programáticamente desde tus aplicaciones Python utilizando el SDK oficial de Python.
El SDK de Python es compatible con Python 3.8+ con soporte para ejecución asíncrona, limitación automática de velocidad con retroceso exponencial y seguimiento de uso.
Instalación
Instala el SDK usando pip:
pip install simstudio-sdkInicio rápido
Aquí tienes un ejemplo sencillo para empezar:
from simstudio import SimStudioClient
# Initialize the client
client = SimStudioClient(
api_key="your-api-key-here",
base_url="https://sim.ai" # optional, defaults to https://sim.ai
)
# Execute a workflow
try:
result = client.execute_workflow("workflow-id")
print("Workflow executed successfully:", result)
except Exception as error:
print("Workflow execution failed:", error)Referencia de la API
SimStudioClient
Constructor
SimStudioClient(api_key: str, base_url: str = "https://sim.ai")Parámetros:
api_key(str): Tu clave API de Simbase_url(str, opcional): URL base para la API de Sim
Métodos
execute_workflow()
Ejecuta un flujo de trabajo con datos de entrada opcionales.
result = client.execute_workflow(
"workflow-id",
input_data={"message": "Hello, world!"},
timeout=30.0 # 30 seconds
)Parámetros:
workflow_id(str): El ID del flujo de trabajo a ejecutarinput_data(dict, opcional): Datos de entrada para pasar al flujo de trabajotimeout(float, opcional): Tiempo de espera en segundos (predeterminado: 30.0)stream(bool, opcional): Habilitar respuestas en streaming (predeterminado: False)selected_outputs(list[str], opcional): Salidas de bloque para transmitir en formatoblockName.attribute(p. ej.,["agent1.content"])async_execution(bool, opcional): Ejecutar de forma asíncrona (predeterminado: False)
Devuelve: WorkflowExecutionResult | AsyncExecutionResult
Cuando async_execution=True, devuelve inmediatamente un ID de tarea para sondeo. De lo contrario, espera a que se complete.
get_workflow_status()
Obtener el estado de un flujo de trabajo (estado de implementación, etc.).
status = client.get_workflow_status("workflow-id")
print("Is deployed:", status.is_deployed)Parámetros:
workflow_id(str): El ID del flujo de trabajo
Devuelve: WorkflowStatus
validate_workflow()
Validar que un flujo de trabajo está listo para su ejecución.
is_ready = client.validate_workflow("workflow-id")
if is_ready:
# Workflow is deployed and ready
passParámetros:
workflow_id(str): El ID del flujo de trabajo
Devuelve: bool
get_job_status()
Obtener el estado de una ejecución de trabajo asíncrono.
status = client.get_job_status("task-id-from-async-execution")
print("Status:", status["status"]) # 'queued', 'processing', 'completed', 'failed'
if status["status"] == "completed":
print("Output:", status["output"])Parámetros:
task_id(str): El ID de tarea devuelto de la ejecución asíncrona
Devuelve: Dict[str, Any]
Campos de respuesta:
success(bool): Si la solicitud fue exitosataskId(str): El ID de la tareastatus(str): Uno de'queued','processing','completed','failed','cancelled'metadata(dict): ContienestartedAt,completedAt, ydurationoutput(any, opcional): La salida del flujo de trabajo (cuando se completa)error(any, opcional): Detalles del error (cuando falla)estimatedDuration(int, opcional): Duración estimada en milisegundos (cuando está procesando/en cola)
execute_with_retry()
Ejecutar un flujo de trabajo con reintento automático en errores de límite de velocidad usando retroceso exponencial.
result = client.execute_with_retry(
"workflow-id",
input_data={"message": "Hello"},
timeout=30.0,
max_retries=3, # Maximum number of retries
initial_delay=1.0, # Initial delay in seconds
max_delay=30.0, # Maximum delay in seconds
backoff_multiplier=2.0 # Exponential backoff multiplier
)Parámetros:
workflow_id(str): El ID del flujo de trabajo a ejecutarinput_data(dict, opcional): Datos de entrada para pasar al flujo de trabajotimeout(float, opcional): Tiempo de espera en segundosstream(bool, opcional): Habilitar respuestas en streamingselected_outputs(list, opcional): Salidas de bloque para transmitirasync_execution(bool, opcional): Ejecutar de forma asíncronamax_retries(int, opcional): Número máximo de reintentos (predeterminado: 3)initial_delay(float, opcional): Retraso inicial en segundos (predeterminado: 1.0)max_delay(float, opcional): Retraso máximo en segundos (predeterminado: 30.0)backoff_multiplier(float, opcional): Multiplicador de retroceso (predeterminado: 2.0)
Devuelve: WorkflowExecutionResult | AsyncExecutionResult
La lógica de reintento utiliza retroceso exponencial (1s → 2s → 4s → 8s...) con fluctuación de ±25% para evitar el efecto de manada. Si la API proporciona un encabezado retry-after, se utilizará en su lugar.
get_rate_limit_info()
Obtiene la información actual del límite de tasa de la última respuesta de la API.
rate_limit_info = client.get_rate_limit_info()
if rate_limit_info:
print("Limit:", rate_limit_info.limit)
print("Remaining:", rate_limit_info.remaining)
print("Reset:", datetime.fromtimestamp(rate_limit_info.reset))Devuelve: RateLimitInfo | None
get_usage_limits()
Obtiene los límites de uso actuales y la información de cuota para tu cuenta.
limits = client.get_usage_limits()
print("Sync requests remaining:", limits.rate_limit["sync"]["remaining"])
print("Async requests remaining:", limits.rate_limit["async"]["remaining"])
print("Current period cost:", limits.usage["currentPeriodCost"])
print("Plan:", limits.usage["plan"])Devuelve: UsageLimits
Estructura de respuesta:
{
"success": bool,
"rateLimit": {
"sync": {
"isLimited": bool,
"limit": int,
"remaining": int,
"resetAt": str
},
"async": {
"isLimited": bool,
"limit": int,
"remaining": int,
"resetAt": str
},
"authType": str # 'api' or 'manual'
},
"usage": {
"currentPeriodCost": float,
"limit": float,
"plan": str # e.g., 'free', 'pro'
}
}set_api_key()
Actualiza la clave API.
client.set_api_key("new-api-key")set_base_url()
Actualiza la URL base.
client.set_base_url("https://my-custom-domain.com")close()
Cierra la sesión HTTP subyacente.
client.close()Clases de datos
WorkflowExecutionResult
@dataclass
class WorkflowExecutionResult:
success: bool
output: Optional[Any] = None
error: Optional[str] = None
logs: Optional[List[Any]] = None
metadata: Optional[Dict[str, Any]] = None
trace_spans: Optional[List[Any]] = None
total_duration: Optional[float] = NoneAsyncExecutionResult
@dataclass
class AsyncExecutionResult:
success: bool
task_id: str
status: str # 'queued'
created_at: str
links: Dict[str, str] # e.g., {"status": "/api/jobs/{taskId}"}WorkflowStatus
@dataclass
class WorkflowStatus:
is_deployed: bool
deployed_at: Optional[str] = None
is_published: bool = False
needs_redeployment: bool = FalseRateLimitInfo
@dataclass
class RateLimitInfo:
limit: int
remaining: int
reset: int
retry_after: Optional[int] = NoneUsageLimits
@dataclass
class UsageLimits:
success: bool
rate_limit: Dict[str, Any]
usage: Dict[str, Any]SimStudioError
class SimStudioError(Exception):
def __init__(self, message: str, code: Optional[str] = None, status: Optional[int] = None):
super().__init__(message)
self.code = code
self.status = statusCódigos de error comunes:
UNAUTHORIZED: Clave API inválidaTIMEOUT: Tiempo de espera agotadoRATE_LIMIT_EXCEEDED: Límite de tasa excedidoUSAGE_LIMIT_EXCEEDED: Límite de uso excedidoEXECUTION_ERROR: Ejecución del flujo de trabajo fallida
Ejemplos
Ejecución básica de flujo de trabajo
Configura el SimStudioClient con tu clave API.
Comprueba si el flujo de trabajo está desplegado y listo para su ejecución.
Ejecuta el flujo de trabajo con tus datos de entrada.
Procesa el resultado de la ejecución y gestiona cualquier error.
import os
from simstudio import SimStudioClient
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def run_workflow():
try:
# Check if workflow is ready
is_ready = client.validate_workflow("my-workflow-id")
if not is_ready:
raise Exception("Workflow is not deployed or ready")
# Execute the workflow
result = client.execute_workflow(
"my-workflow-id",
input_data={
"message": "Process this data",
"user_id": "12345"
}
)
if result.success:
print("Output:", result.output)
print("Duration:", result.metadata.get("duration") if result.metadata else None)
else:
print("Workflow failed:", result.error)
except Exception as error:
print("Error:", error)
run_workflow()Manejo de errores
Maneja diferentes tipos de errores que pueden ocurrir durante la ejecución del flujo de trabajo:
from simstudio import SimStudioClient, SimStudioError
import os
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def execute_with_error_handling():
try:
result = client.execute_workflow("workflow-id")
return result
except SimStudioError as error:
if error.code == "UNAUTHORIZED":
print("Invalid API key")
elif error.code == "TIMEOUT":
print("Workflow execution timed out")
elif error.code == "USAGE_LIMIT_EXCEEDED":
print("Usage limit exceeded")
elif error.code == "INVALID_JSON":
print("Invalid JSON in request body")
else:
print(f"Workflow error: {error}")
raise
except Exception as error:
print(f"Unexpected error: {error}")
raiseUso del gestor de contexto
Usa el cliente como un gestor de contexto para manejar automáticamente la limpieza de recursos:
from simstudio import SimStudioClient
import os
# Using context manager to automatically close the session
with SimStudioClient(api_key=os.getenv("SIM_API_KEY")) as client:
result = client.execute_workflow("workflow-id")
print("Result:", result)
# Session is automatically closed hereEjecución de flujos de trabajo por lotes
Ejecuta múltiples flujos de trabajo de manera eficiente:
from simstudio import SimStudioClient
import os
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def execute_workflows_batch(workflow_data_pairs):
"""Execute multiple workflows with different input data."""
results = []
for workflow_id, input_data in workflow_data_pairs:
try:
# Validate workflow before execution
if not client.validate_workflow(workflow_id):
print(f"Skipping {workflow_id}: not deployed")
continue
result = client.execute_workflow(workflow_id, input_data)
results.append({
"workflow_id": workflow_id,
"success": result.success,
"output": result.output,
"error": result.error
})
except Exception as error:
results.append({
"workflow_id": workflow_id,
"success": False,
"error": str(error)
})
return results
# Example usage
workflows = [
("workflow-1", {"type": "analysis", "data": "sample1"}),
("workflow-2", {"type": "processing", "data": "sample2"}),
]
results = execute_workflows_batch(workflows)
for result in results:
print(f"Workflow {result['workflow_id']}: {'Success' if result['success'] else 'Failed'}")Ejecución asíncrona de flujos de trabajo
Ejecuta flujos de trabajo de forma asíncrona para tareas de larga duración:
import os
import time
from simstudio import SimStudioClient
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def execute_async():
try:
# Start async execution
result = client.execute_workflow(
"workflow-id",
input_data={"data": "large dataset"},
async_execution=True # Execute asynchronously
)
# Check if result is an async execution
if hasattr(result, 'task_id'):
print(f"Task ID: {result.task_id}")
print(f"Status endpoint: {result.links['status']}")
# Poll for completion
status = client.get_job_status(result.task_id)
while status["status"] in ["queued", "processing"]:
print(f"Current status: {status['status']}")
time.sleep(2) # Wait 2 seconds
status = client.get_job_status(result.task_id)
if status["status"] == "completed":
print("Workflow completed!")
print(f"Output: {status['output']}")
print(f"Duration: {status['metadata']['duration']}")
else:
print(f"Workflow failed: {status['error']}")
except Exception as error:
print(f"Error: {error}")
execute_async()Límite de tasa y reintentos
Maneja los límites de tasa automáticamente con retroceso exponencial:
import os
from simstudio import SimStudioClient, SimStudioError
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def execute_with_retry_handling():
try:
# Automatically retries on rate limit
result = client.execute_with_retry(
"workflow-id",
input_data={"message": "Process this"},
max_retries=5,
initial_delay=1.0,
max_delay=60.0,
backoff_multiplier=2.0
)
print(f"Success: {result}")
except SimStudioError as error:
if error.code == "RATE_LIMIT_EXCEEDED":
print("Rate limit exceeded after all retries")
# Check rate limit info
rate_limit_info = client.get_rate_limit_info()
if rate_limit_info:
from datetime import datetime
reset_time = datetime.fromtimestamp(rate_limit_info.reset)
print(f"Rate limit resets at: {reset_time}")
execute_with_retry_handling()Monitoreo de uso
Monitorea el uso de tu cuenta y sus límites:
import os
from simstudio import SimStudioClient
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def check_usage():
try:
limits = client.get_usage_limits()
print("=== Rate Limits ===")
print("Sync requests:")
print(f" Limit: {limits.rate_limit['sync']['limit']}")
print(f" Remaining: {limits.rate_limit['sync']['remaining']}")
print(f" Resets at: {limits.rate_limit['sync']['resetAt']}")
print(f" Is limited: {limits.rate_limit['sync']['isLimited']}")
print("\nAsync requests:")
print(f" Limit: {limits.rate_limit['async']['limit']}")
print(f" Remaining: {limits.rate_limit['async']['remaining']}")
print(f" Resets at: {limits.rate_limit['async']['resetAt']}")
print(f" Is limited: {limits.rate_limit['async']['isLimited']}")
print("\n=== Usage ===")
print(f"Current period cost: ${limits.usage['currentPeriodCost']:.2f}")
print(f"Limit: ${limits.usage['limit']:.2f}")
print(f"Plan: {limits.usage['plan']}")
percent_used = (limits.usage['currentPeriodCost'] / limits.usage['limit']) * 100
print(f"Usage: {percent_used:.1f}%")
if percent_used > 80:
print("⚠️ Warning: You are approaching your usage limit!")
except Exception as error:
print(f"Error checking usage: {error}")
check_usage()Ejecución de flujo de trabajo en streaming
Ejecuta flujos de trabajo con respuestas en tiempo real:
from simstudio import SimStudioClient
import os
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def execute_with_streaming():
"""Execute workflow with streaming enabled."""
try:
# Enable streaming for specific block outputs
result = client.execute_workflow(
"workflow-id",
input_data={"message": "Count to five"},
stream=True,
selected_outputs=["agent1.content"] # Use blockName.attribute format
)
print("Workflow result:", result)
except Exception as error:
print("Error:", error)
execute_with_streaming()La respuesta en streaming sigue el formato de Server-Sent Events (SSE):
data: {"blockId":"7b7735b9-19e5-4bd6-818b-46aae2596e9f","chunk":"One"}
data: {"blockId":"7b7735b9-19e5-4bd6-818b-46aae2596e9f","chunk":", two"}
data: {"event":"done","success":true,"output":{},"metadata":{"duration":610}}
data: [DONE]Ejemplo de streaming con Flask:
from flask import Flask, Response, stream_with_context
import requests
import json
import os
app = Flask(__name__)
@app.route('/stream-workflow')
def stream_workflow():
"""Stream workflow execution to the client."""
def generate():
response = requests.post(
'https://sim.ai/api/workflows/WORKFLOW_ID/execute',
headers={
'Content-Type': 'application/json',
'X-API-Key': os.getenv('SIM_API_KEY')
},
json={
'message': 'Generate a story',
'stream': True,
'selectedOutputs': ['agent1.content']
},
stream=True
)
for line in response.iter_lines():
if line:
decoded_line = line.decode('utf-8')
if decoded_line.startswith('data: '):
data = decoded_line[6:] # Remove 'data: ' prefix
if data == '[DONE]':
break
try:
parsed = json.loads(data)
if 'chunk' in parsed:
yield f"data: {json.dumps(parsed)}\n\n"
elif parsed.get('event') == 'done':
yield f"data: {json.dumps(parsed)}\n\n"
print("Execution complete:", parsed.get('metadata'))
except json.JSONDecodeError:
pass
return Response(
stream_with_context(generate()),
mimetype='text/event-stream'
)
if __name__ == '__main__':
app.run(debug=True)Configuración del entorno
Configura el cliente usando variables de entorno:
import os
from simstudio import SimStudioClient
# Development configuration
client = SimStudioClient(
api_key=os.getenv("SIM_API_KEY")
base_url=os.getenv("SIM_BASE_URL", "https://sim.ai")
)import os
from simstudio import SimStudioClient
# Production configuration with error handling
api_key = os.getenv("SIM_API_KEY")
if not api_key:
raise ValueError("SIM_API_KEY environment variable is required")
client = SimStudioClient(
api_key=api_key,
base_url=os.getenv("SIM_BASE_URL", "https://sim.ai")
)Obtener tu clave API
Navega a Sim e inicia sesión en tu cuenta.
Navega al flujo de trabajo que quieres ejecutar programáticamente.
Haz clic en "Deploy" para desplegar tu flujo de trabajo si aún no ha sido desplegado.
Durante el proceso de despliegue, selecciona o crea una clave API.
Copia la clave API para usarla en tu aplicación Python.
Requisitos
- Python 3.8+
- requests >= 2.25.0
Licencia
Apache-2.0