Python
Le SDK Python officiel pour Sim vous permet d'exécuter des workflows de manière programmatique à partir de vos applications Python en utilisant le SDK Python officiel.
Le SDK Python prend en charge Python 3.8+ avec support d'exécution asynchrone, limitation automatique du débit avec backoff exponentiel, et suivi d'utilisation.
Installation
Installez le SDK en utilisant pip :
pip install simstudio-sdkDémarrage rapide
Voici un exemple simple pour commencer :
from simstudio import SimStudioClient
# Initialize the client
client = SimStudioClient(
api_key="your-api-key-here",
base_url="https://sim.ai" # optional, defaults to https://sim.ai
)
# Execute a workflow
try:
result = client.execute_workflow("workflow-id")
print("Workflow executed successfully:", result)
except Exception as error:
print("Workflow execution failed:", error)Référence de l'API
SimStudioClient
Constructeur
SimStudioClient(api_key: str, base_url: str = "https://sim.ai")Paramètres :
api_key(str) : Votre clé API Simbase_url(str, facultatif) : URL de base pour l'API Sim
Méthodes
execute_workflow()
Exécuter un workflow avec des données d'entrée facultatives.
result = client.execute_workflow(
"workflow-id",
input_data={"message": "Hello, world!"},
timeout=30.0 # 30 seconds
)Paramètres :
workflow_id(str) : L'identifiant du workflow à exécuterinput_data(dict, facultatif) : Données d'entrée à transmettre au workflowtimeout(float, facultatif) : Délai d'expiration en secondes (par défaut : 30.0)stream(bool, facultatif) : Activer les réponses en streaming (par défaut : False)selected_outputs(list[str], facultatif) : Sorties de blocs à diffuser au formatblockName.attribute(par exemple,["agent1.content"])async_execution(bool, facultatif) : Exécuter de manière asynchrone (par défaut : False)
Retourne : WorkflowExecutionResult | AsyncExecutionResult
Lorsque async_execution=True, retourne immédiatement un identifiant de tâche pour l'interrogation. Sinon, attend la fin de l'exécution.
get_workflow_status()
Obtenir le statut d'un workflow (statut de déploiement, etc.).
status = client.get_workflow_status("workflow-id")
print("Is deployed:", status.is_deployed)Paramètres :
workflow_id(str) : L'identifiant du workflow
Retourne : WorkflowStatus
validate_workflow()
Valider qu'un workflow est prêt pour l'exécution.
is_ready = client.validate_workflow("workflow-id")
if is_ready:
# Workflow is deployed and ready
passParamètres :
workflow_id(str) : L'identifiant du workflow
Retourne : bool
get_job_status()
Obtenir le statut d'une exécution de tâche asynchrone.
status = client.get_job_status("task-id-from-async-execution")
print("Status:", status["status"]) # 'queued', 'processing', 'completed', 'failed'
if status["status"] == "completed":
print("Output:", status["output"])Paramètres :
task_id(str) : L'identifiant de tâche retourné par l'exécution asynchrone
Retourne : Dict[str, Any]
Champs de réponse :
success(bool) : Si la requête a réussitaskId(str) : L'identifiant de la tâchestatus(str) : L'un des états suivants :'queued','processing','completed','failed','cancelled'metadata(dict) : ContientstartedAt,completedAt, etdurationoutput(any, facultatif) : La sortie du workflow (une fois terminé)error(any, facultatif) : Détails de l'erreur (en cas d'échec)estimatedDuration(int, facultatif) : Durée estimée en millisecondes (lors du traitement/mise en file d'attente)
execute_with_retry()
Exécuter un workflow avec réessai automatique en cas d'erreurs de limitation de débit, en utilisant un backoff exponentiel.
result = client.execute_with_retry(
"workflow-id",
input_data={"message": "Hello"},
timeout=30.0,
max_retries=3, # Maximum number of retries
initial_delay=1.0, # Initial delay in seconds
max_delay=30.0, # Maximum delay in seconds
backoff_multiplier=2.0 # Exponential backoff multiplier
)Paramètres :
workflow_id(str) : L'identifiant du workflow à exécuterinput_data(dict, facultatif) : Données d'entrée à transmettre au workflowtimeout(float, facultatif) : Délai d'expiration en secondesstream(bool, facultatif) : Activer les réponses en streamingselected_outputs(list, facultatif) : Sorties de blocs à diffuserasync_execution(bool, facultatif) : Exécuter de manière asynchronemax_retries(int, facultatif) : Nombre maximum de tentatives (par défaut : 3)initial_delay(float, facultatif) : Délai initial en secondes (par défaut : 1.0)max_delay(float, facultatif) : Délai maximum en secondes (par défaut : 30.0)backoff_multiplier(float, facultatif) : Multiplicateur de backoff (par défaut : 2.0)
Retourne : WorkflowExecutionResult | AsyncExecutionResult
La logique de nouvelle tentative utilise un backoff exponentiel (1s → 2s → 4s → 8s...) avec une variation aléatoire de ±25% pour éviter l'effet de horde. Si l'API fournit un en-tête retry-after, celui-ci sera utilisé à la place.
get_rate_limit_info()
Obtenir les informations actuelles sur les limites de débit à partir de la dernière réponse de l'API.
rate_limit_info = client.get_rate_limit_info()
if rate_limit_info:
print("Limit:", rate_limit_info.limit)
print("Remaining:", rate_limit_info.remaining)
print("Reset:", datetime.fromtimestamp(rate_limit_info.reset))Retourne : RateLimitInfo | None
get_usage_limits()
Obtenir les limites d'utilisation actuelles et les informations de quota pour votre compte.
limits = client.get_usage_limits()
print("Sync requests remaining:", limits.rate_limit["sync"]["remaining"])
print("Async requests remaining:", limits.rate_limit["async"]["remaining"])
print("Current period cost:", limits.usage["currentPeriodCost"])
print("Plan:", limits.usage["plan"])Retourne : UsageLimits
Structure de la réponse :
{
"success": bool,
"rateLimit": {
"sync": {
"isLimited": bool,
"limit": int,
"remaining": int,
"resetAt": str
},
"async": {
"isLimited": bool,
"limit": int,
"remaining": int,
"resetAt": str
},
"authType": str # 'api' or 'manual'
},
"usage": {
"currentPeriodCost": float,
"limit": float,
"plan": str # e.g., 'free', 'pro'
}
}set_api_key()
Mettre à jour la clé API.
client.set_api_key("new-api-key")set_base_url()
Mettre à jour l'URL de base.
client.set_base_url("https://my-custom-domain.com")close()
Fermer la session HTTP sous-jacente.
client.close()Classes de données
WorkflowExecutionResult
@dataclass
class WorkflowExecutionResult:
success: bool
output: Optional[Any] = None
error: Optional[str] = None
logs: Optional[List[Any]] = None
metadata: Optional[Dict[str, Any]] = None
trace_spans: Optional[List[Any]] = None
total_duration: Optional[float] = NoneAsyncExecutionResult
@dataclass
class AsyncExecutionResult:
success: bool
task_id: str
status: str # 'queued'
created_at: str
links: Dict[str, str] # e.g., {"status": "/api/jobs/{taskId}"}WorkflowStatus
@dataclass
class WorkflowStatus:
is_deployed: bool
deployed_at: Optional[str] = None
is_published: bool = False
needs_redeployment: bool = FalseRateLimitInfo
@dataclass
class RateLimitInfo:
limit: int
remaining: int
reset: int
retry_after: Optional[int] = NoneUsageLimits
@dataclass
class UsageLimits:
success: bool
rate_limit: Dict[str, Any]
usage: Dict[str, Any]SimStudioError
class SimStudioError(Exception):
def __init__(self, message: str, code: Optional[str] = None, status: Optional[int] = None):
super().__init__(message)
self.code = code
self.status = statusCodes d'erreur courants :
UNAUTHORIZED: Clé API invalideTIMEOUT: Délai d'attente de la requête dépasséRATE_LIMIT_EXCEEDED: Limite de débit dépasséeUSAGE_LIMIT_EXCEEDED: Limite d'utilisation dépasséeEXECUTION_ERROR: Échec de l'exécution du workflow
Exemples
Exécution basique d'un workflow
Configurez le SimStudioClient avec votre clé API.
Vérifiez si le workflow est déployé et prêt pour l'exécution.
Lancez le workflow avec vos données d'entrée.
Traitez le résultat de l'exécution et gérez les éventuelles erreurs.
import os
from simstudio import SimStudioClient
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def run_workflow():
try:
# Check if workflow is ready
is_ready = client.validate_workflow("my-workflow-id")
if not is_ready:
raise Exception("Workflow is not deployed or ready")
# Execute the workflow
result = client.execute_workflow(
"my-workflow-id",
input_data={
"message": "Process this data",
"user_id": "12345"
}
)
if result.success:
print("Output:", result.output)
print("Duration:", result.metadata.get("duration") if result.metadata else None)
else:
print("Workflow failed:", result.error)
except Exception as error:
print("Error:", error)
run_workflow()Gestion des erreurs
Gérez différents types d'erreurs qui peuvent survenir pendant l'exécution du workflow :
from simstudio import SimStudioClient, SimStudioError
import os
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def execute_with_error_handling():
try:
result = client.execute_workflow("workflow-id")
return result
except SimStudioError as error:
if error.code == "UNAUTHORIZED":
print("Invalid API key")
elif error.code == "TIMEOUT":
print("Workflow execution timed out")
elif error.code == "USAGE_LIMIT_EXCEEDED":
print("Usage limit exceeded")
elif error.code == "INVALID_JSON":
print("Invalid JSON in request body")
else:
print(f"Workflow error: {error}")
raise
except Exception as error:
print(f"Unexpected error: {error}")
raiseUtilisation du gestionnaire de contexte
Utilisez le client comme gestionnaire de contexte pour gérer automatiquement le nettoyage des ressources :
from simstudio import SimStudioClient
import os
# Using context manager to automatically close the session
with SimStudioClient(api_key=os.getenv("SIM_API_KEY")) as client:
result = client.execute_workflow("workflow-id")
print("Result:", result)
# Session is automatically closed hereExécution de workflows par lots
Exécutez plusieurs workflows efficacement :
from simstudio import SimStudioClient
import os
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def execute_workflows_batch(workflow_data_pairs):
"""Execute multiple workflows with different input data."""
results = []
for workflow_id, input_data in workflow_data_pairs:
try:
# Validate workflow before execution
if not client.validate_workflow(workflow_id):
print(f"Skipping {workflow_id}: not deployed")
continue
result = client.execute_workflow(workflow_id, input_data)
results.append({
"workflow_id": workflow_id,
"success": result.success,
"output": result.output,
"error": result.error
})
except Exception as error:
results.append({
"workflow_id": workflow_id,
"success": False,
"error": str(error)
})
return results
# Example usage
workflows = [
("workflow-1", {"type": "analysis", "data": "sample1"}),
("workflow-2", {"type": "processing", "data": "sample2"}),
]
results = execute_workflows_batch(workflows)
for result in results:
print(f"Workflow {result['workflow_id']}: {'Success' if result['success'] else 'Failed'}")Exécution asynchrone de workflow
Exécutez des workflows de manière asynchrone pour les tâches de longue durée :
import os
import time
from simstudio import SimStudioClient
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def execute_async():
try:
# Start async execution
result = client.execute_workflow(
"workflow-id",
input_data={"data": "large dataset"},
async_execution=True # Execute asynchronously
)
# Check if result is an async execution
if hasattr(result, 'task_id'):
print(f"Task ID: {result.task_id}")
print(f"Status endpoint: {result.links['status']}")
# Poll for completion
status = client.get_job_status(result.task_id)
while status["status"] in ["queued", "processing"]:
print(f"Current status: {status['status']}")
time.sleep(2) # Wait 2 seconds
status = client.get_job_status(result.task_id)
if status["status"] == "completed":
print("Workflow completed!")
print(f"Output: {status['output']}")
print(f"Duration: {status['metadata']['duration']}")
else:
print(f"Workflow failed: {status['error']}")
except Exception as error:
print(f"Error: {error}")
execute_async()Limitation de débit et nouvelle tentative
Gérez les limites de débit automatiquement avec un retrait exponentiel :
import os
from simstudio import SimStudioClient, SimStudioError
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def execute_with_retry_handling():
try:
# Automatically retries on rate limit
result = client.execute_with_retry(
"workflow-id",
input_data={"message": "Process this"},
max_retries=5,
initial_delay=1.0,
max_delay=60.0,
backoff_multiplier=2.0
)
print(f"Success: {result}")
except SimStudioError as error:
if error.code == "RATE_LIMIT_EXCEEDED":
print("Rate limit exceeded after all retries")
# Check rate limit info
rate_limit_info = client.get_rate_limit_info()
if rate_limit_info:
from datetime import datetime
reset_time = datetime.fromtimestamp(rate_limit_info.reset)
print(f"Rate limit resets at: {reset_time}")
execute_with_retry_handling()Surveillance de l'utilisation
Surveillez l'utilisation et les limites de votre compte :
import os
from simstudio import SimStudioClient
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def check_usage():
try:
limits = client.get_usage_limits()
print("=== Rate Limits ===")
print("Sync requests:")
print(f" Limit: {limits.rate_limit['sync']['limit']}")
print(f" Remaining: {limits.rate_limit['sync']['remaining']}")
print(f" Resets at: {limits.rate_limit['sync']['resetAt']}")
print(f" Is limited: {limits.rate_limit['sync']['isLimited']}")
print("\nAsync requests:")
print(f" Limit: {limits.rate_limit['async']['limit']}")
print(f" Remaining: {limits.rate_limit['async']['remaining']}")
print(f" Resets at: {limits.rate_limit['async']['resetAt']}")
print(f" Is limited: {limits.rate_limit['async']['isLimited']}")
print("\n=== Usage ===")
print(f"Current period cost: ${limits.usage['currentPeriodCost']:.2f}")
print(f"Limit: ${limits.usage['limit']:.2f}")
print(f"Plan: {limits.usage['plan']}")
percent_used = (limits.usage['currentPeriodCost'] / limits.usage['limit']) * 100
print(f"Usage: {percent_used:.1f}%")
if percent_used > 80:
print("⚠️ Warning: You are approaching your usage limit!")
except Exception as error:
print(f"Error checking usage: {error}")
check_usage()Exécution de workflow en streaming
Exécutez des workflows avec des réponses en streaming en temps réel :
from simstudio import SimStudioClient
import os
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def execute_with_streaming():
"""Execute workflow with streaming enabled."""
try:
# Enable streaming for specific block outputs
result = client.execute_workflow(
"workflow-id",
input_data={"message": "Count to five"},
stream=True,
selected_outputs=["agent1.content"] # Use blockName.attribute format
)
print("Workflow result:", result)
except Exception as error:
print("Error:", error)
execute_with_streaming()La réponse en streaming suit le format Server-Sent Events (SSE) :
data: {"blockId":"7b7735b9-19e5-4bd6-818b-46aae2596e9f","chunk":"One"}
data: {"blockId":"7b7735b9-19e5-4bd6-818b-46aae2596e9f","chunk":", two"}
data: {"event":"done","success":true,"output":{},"metadata":{"duration":610}}
data: [DONE]Exemple de streaming avec Flask :
from flask import Flask, Response, stream_with_context
import requests
import json
import os
app = Flask(__name__)
@app.route('/stream-workflow')
def stream_workflow():
"""Stream workflow execution to the client."""
def generate():
response = requests.post(
'https://sim.ai/api/workflows/WORKFLOW_ID/execute',
headers={
'Content-Type': 'application/json',
'X-API-Key': os.getenv('SIM_API_KEY')
},
json={
'message': 'Generate a story',
'stream': True,
'selectedOutputs': ['agent1.content']
},
stream=True
)
for line in response.iter_lines():
if line:
decoded_line = line.decode('utf-8')
if decoded_line.startswith('data: '):
data = decoded_line[6:] # Remove 'data: ' prefix
if data == '[DONE]':
break
try:
parsed = json.loads(data)
if 'chunk' in parsed:
yield f"data: {json.dumps(parsed)}\n\n"
elif parsed.get('event') == 'done':
yield f"data: {json.dumps(parsed)}\n\n"
print("Execution complete:", parsed.get('metadata'))
except json.JSONDecodeError:
pass
return Response(
stream_with_context(generate()),
mimetype='text/event-stream'
)
if __name__ == '__main__':
app.run(debug=True)Configuration de l'environnement
Configurez le client en utilisant des variables d'environnement :
import os
from simstudio import SimStudioClient
# Development configuration
client = SimStudioClient(
api_key=os.getenv("SIM_API_KEY")
base_url=os.getenv("SIM_BASE_URL", "https://sim.ai")
)import os
from simstudio import SimStudioClient
# Production configuration with error handling
api_key = os.getenv("SIM_API_KEY")
if not api_key:
raise ValueError("SIM_API_KEY environment variable is required")
client = SimStudioClient(
api_key=api_key,
base_url=os.getenv("SIM_BASE_URL", "https://sim.ai")
)Obtention de votre clé API
Accédez à Sim et connectez-vous à votre compte.
Accédez au workflow que vous souhaitez exécuter par programmation.
Cliquez sur "Déployer" pour déployer votre workflow s'il n'a pas encore été déployé.
Pendant le processus de déploiement, sélectionnez ou créez une clé API.
Copiez la clé API pour l'utiliser dans votre application Python.
Prérequis
- Python 3.8+
- requests >= 2.25.0
Licence
Apache-2.0