Sim
SDKs

Python

Le SDK Python officiel pour Sim vous permet d'exécuter des workflows de manière programmatique à partir de vos applications Python en utilisant le SDK Python officiel.

Le SDK Python prend en charge Python 3.8+ avec support d'exécution asynchrone, limitation automatique du débit avec backoff exponentiel, et suivi d'utilisation.

Installation

Installez le SDK en utilisant pip :

pip install simstudio-sdk

Démarrage rapide

Voici un exemple simple pour commencer :

from simstudio import SimStudioClient

# Initialize the client
client = SimStudioClient(
    api_key="your-api-key-here",
    base_url="https://sim.ai"  # optional, defaults to https://sim.ai
)

# Execute a workflow
try:
    result = client.execute_workflow("workflow-id")
    print("Workflow executed successfully:", result)
except Exception as error:
    print("Workflow execution failed:", error)

Référence de l'API

SimStudioClient

Constructeur

SimStudioClient(api_key: str, base_url: str = "https://sim.ai")

Paramètres :

  • api_key (str) : Votre clé API Sim
  • base_url (str, facultatif) : URL de base pour l'API Sim

Méthodes

execute_workflow()

Exécuter un workflow avec des données d'entrée facultatives.

result = client.execute_workflow(
    "workflow-id",
    input_data={"message": "Hello, world!"},
    timeout=30.0  # 30 seconds
)

Paramètres :

  • workflow_id (str) : L'identifiant du workflow à exécuter
  • input_data (dict, facultatif) : Données d'entrée à transmettre au workflow
  • timeout (float, facultatif) : Délai d'expiration en secondes (par défaut : 30.0)
  • stream (bool, facultatif) : Activer les réponses en streaming (par défaut : False)
  • selected_outputs (list[str], facultatif) : Sorties de blocs à diffuser au format blockName.attribute (par exemple, ["agent1.content"])
  • async_execution (bool, facultatif) : Exécuter de manière asynchrone (par défaut : False)

Retourne : WorkflowExecutionResult | AsyncExecutionResult

Lorsque async_execution=True, retourne immédiatement un identifiant de tâche pour l'interrogation. Sinon, attend la fin de l'exécution.

get_workflow_status()

Obtenir le statut d'un workflow (statut de déploiement, etc.).

status = client.get_workflow_status("workflow-id")
print("Is deployed:", status.is_deployed)

Paramètres :

  • workflow_id (str) : L'identifiant du workflow

Retourne : WorkflowStatus

validate_workflow()

Valider qu'un workflow est prêt pour l'exécution.

is_ready = client.validate_workflow("workflow-id")
if is_ready:
    # Workflow is deployed and ready
    pass

Paramètres :

  • workflow_id (str) : L'identifiant du workflow

Retourne : bool

get_job_status()

Obtenir le statut d'une exécution de tâche asynchrone.

status = client.get_job_status("task-id-from-async-execution")
print("Status:", status["status"])  # 'queued', 'processing', 'completed', 'failed'
if status["status"] == "completed":
    print("Output:", status["output"])

Paramètres :

  • task_id (str) : L'identifiant de tâche retourné par l'exécution asynchrone

Retourne : Dict[str, Any]

Champs de réponse :

  • success (bool) : Si la requête a réussi
  • taskId (str) : L'identifiant de la tâche
  • status (str) : L'un des états suivants : 'queued', 'processing', 'completed', 'failed', 'cancelled'
  • metadata (dict) : Contient startedAt, completedAt, et duration
  • output (any, facultatif) : La sortie du workflow (une fois terminé)
  • error (any, facultatif) : Détails de l'erreur (en cas d'échec)
  • estimatedDuration (int, facultatif) : Durée estimée en millisecondes (lors du traitement/mise en file d'attente)
execute_with_retry()

Exécuter un workflow avec réessai automatique en cas d'erreurs de limitation de débit, en utilisant un backoff exponentiel.

result = client.execute_with_retry(
    "workflow-id",
    input_data={"message": "Hello"},
    timeout=30.0,
    max_retries=3,           # Maximum number of retries
    initial_delay=1.0,       # Initial delay in seconds
    max_delay=30.0,          # Maximum delay in seconds
    backoff_multiplier=2.0   # Exponential backoff multiplier
)

Paramètres :

  • workflow_id (str) : L'identifiant du workflow à exécuter
  • input_data (dict, facultatif) : Données d'entrée à transmettre au workflow
  • timeout (float, facultatif) : Délai d'expiration en secondes
  • stream (bool, facultatif) : Activer les réponses en streaming
  • selected_outputs (list, facultatif) : Sorties de blocs à diffuser
  • async_execution (bool, facultatif) : Exécuter de manière asynchrone
  • max_retries (int, facultatif) : Nombre maximum de tentatives (par défaut : 3)
  • initial_delay (float, facultatif) : Délai initial en secondes (par défaut : 1.0)
  • max_delay (float, facultatif) : Délai maximum en secondes (par défaut : 30.0)
  • backoff_multiplier (float, facultatif) : Multiplicateur de backoff (par défaut : 2.0)

Retourne : WorkflowExecutionResult | AsyncExecutionResult

La logique de nouvelle tentative utilise un backoff exponentiel (1s → 2s → 4s → 8s...) avec une variation aléatoire de ±25% pour éviter l'effet de horde. Si l'API fournit un en-tête retry-after, celui-ci sera utilisé à la place.

get_rate_limit_info()

Obtenir les informations actuelles sur les limites de débit à partir de la dernière réponse de l'API.

rate_limit_info = client.get_rate_limit_info()
if rate_limit_info:
    print("Limit:", rate_limit_info.limit)
    print("Remaining:", rate_limit_info.remaining)
    print("Reset:", datetime.fromtimestamp(rate_limit_info.reset))

Retourne : RateLimitInfo | None

get_usage_limits()

Obtenir les limites d'utilisation actuelles et les informations de quota pour votre compte.

limits = client.get_usage_limits()
print("Sync requests remaining:", limits.rate_limit["sync"]["remaining"])
print("Async requests remaining:", limits.rate_limit["async"]["remaining"])
print("Current period cost:", limits.usage["currentPeriodCost"])
print("Plan:", limits.usage["plan"])

Retourne : UsageLimits

Structure de la réponse :

{
    "success": bool,
    "rateLimit": {
        "sync": {
            "isLimited": bool,
            "limit": int,
            "remaining": int,
            "resetAt": str
        },
        "async": {
            "isLimited": bool,
            "limit": int,
            "remaining": int,
            "resetAt": str
        },
        "authType": str  # 'api' or 'manual'
    },
    "usage": {
        "currentPeriodCost": float,
        "limit": float,
        "plan": str  # e.g., 'free', 'pro'
    }
}
set_api_key()

Mettre à jour la clé API.

client.set_api_key("new-api-key")
set_base_url()

Mettre à jour l'URL de base.

client.set_base_url("https://my-custom-domain.com")
close()

Fermer la session HTTP sous-jacente.

client.close()

Classes de données

WorkflowExecutionResult

@dataclass
class WorkflowExecutionResult:
    success: bool
    output: Optional[Any] = None
    error: Optional[str] = None
    logs: Optional[List[Any]] = None
    metadata: Optional[Dict[str, Any]] = None
    trace_spans: Optional[List[Any]] = None
    total_duration: Optional[float] = None

AsyncExecutionResult

@dataclass
class AsyncExecutionResult:
    success: bool
    task_id: str
    status: str  # 'queued'
    created_at: str
    links: Dict[str, str]  # e.g., {"status": "/api/jobs/{taskId}"}

WorkflowStatus

@dataclass
class WorkflowStatus:
    is_deployed: bool
    deployed_at: Optional[str] = None
    is_published: bool = False
    needs_redeployment: bool = False

RateLimitInfo

@dataclass
class RateLimitInfo:
    limit: int
    remaining: int
    reset: int
    retry_after: Optional[int] = None

UsageLimits

@dataclass
class UsageLimits:
    success: bool
    rate_limit: Dict[str, Any]
    usage: Dict[str, Any]

SimStudioError

class SimStudioError(Exception):
    def __init__(self, message: str, code: Optional[str] = None, status: Optional[int] = None):
        super().__init__(message)
        self.code = code
        self.status = status

Codes d'erreur courants :

  • UNAUTHORIZED : Clé API invalide
  • TIMEOUT : Délai d'attente de la requête dépassé
  • RATE_LIMIT_EXCEEDED : Limite de débit dépassée
  • USAGE_LIMIT_EXCEEDED : Limite d'utilisation dépassée
  • EXECUTION_ERROR : Échec de l'exécution du workflow

Exemples

Exécution basique d'un workflow

Configurez le SimStudioClient avec votre clé API.

Vérifiez si le workflow est déployé et prêt pour l'exécution.

Lancez le workflow avec vos données d'entrée.

Traitez le résultat de l'exécution et gérez les éventuelles erreurs.

import os
from simstudio import SimStudioClient

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def run_workflow():
    try:
        # Check if workflow is ready
        is_ready = client.validate_workflow("my-workflow-id")
        if not is_ready:
            raise Exception("Workflow is not deployed or ready")

        # Execute the workflow
        result = client.execute_workflow(
            "my-workflow-id",
            input_data={
                "message": "Process this data",
                "user_id": "12345"
            }
        )

        if result.success:
            print("Output:", result.output)
            print("Duration:", result.metadata.get("duration") if result.metadata else None)
        else:
            print("Workflow failed:", result.error)
            
    except Exception as error:
        print("Error:", error)

run_workflow()

Gestion des erreurs

Gérez différents types d'erreurs qui peuvent survenir pendant l'exécution du workflow :

from simstudio import SimStudioClient, SimStudioError
import os

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))   

def execute_with_error_handling():
    try:
        result = client.execute_workflow("workflow-id")
        return result
    except SimStudioError as error:
        if error.code == "UNAUTHORIZED":
            print("Invalid API key")
        elif error.code == "TIMEOUT":
            print("Workflow execution timed out")
        elif error.code == "USAGE_LIMIT_EXCEEDED":
            print("Usage limit exceeded")
        elif error.code == "INVALID_JSON":
            print("Invalid JSON in request body")
        else:
            print(f"Workflow error: {error}")
        raise
    except Exception as error:
        print(f"Unexpected error: {error}")
        raise

Utilisation du gestionnaire de contexte

Utilisez le client comme gestionnaire de contexte pour gérer automatiquement le nettoyage des ressources :

from simstudio import SimStudioClient
import os

# Using context manager to automatically close the session
with SimStudioClient(api_key=os.getenv("SIM_API_KEY")) as client:
    result = client.execute_workflow("workflow-id")
    print("Result:", result)
# Session is automatically closed here

Exécution de workflows par lots

Exécutez plusieurs workflows efficacement :

from simstudio import SimStudioClient
import os

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def execute_workflows_batch(workflow_data_pairs):
    """Execute multiple workflows with different input data."""
    results = []
    
    for workflow_id, input_data in workflow_data_pairs:
        try:
            # Validate workflow before execution
            if not client.validate_workflow(workflow_id):
                print(f"Skipping {workflow_id}: not deployed")
                continue
                
            result = client.execute_workflow(workflow_id, input_data)
            results.append({
                "workflow_id": workflow_id,
                "success": result.success,
                "output": result.output,
                "error": result.error
            })
            
        except Exception as error:
            results.append({
                "workflow_id": workflow_id,
                "success": False,
                "error": str(error)
            })
    
    return results

# Example usage
workflows = [
    ("workflow-1", {"type": "analysis", "data": "sample1"}),
    ("workflow-2", {"type": "processing", "data": "sample2"}),
]

results = execute_workflows_batch(workflows)
for result in results:
    print(f"Workflow {result['workflow_id']}: {'Success' if result['success'] else 'Failed'}")

Exécution asynchrone de workflow

Exécutez des workflows de manière asynchrone pour les tâches de longue durée :

import os
import time
from simstudio import SimStudioClient

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def execute_async():
    try:
        # Start async execution
        result = client.execute_workflow(
            "workflow-id",
            input_data={"data": "large dataset"},
            async_execution=True  # Execute asynchronously
        )

        # Check if result is an async execution
        if hasattr(result, 'task_id'):
            print(f"Task ID: {result.task_id}")
            print(f"Status endpoint: {result.links['status']}")

            # Poll for completion
            status = client.get_job_status(result.task_id)

            while status["status"] in ["queued", "processing"]:
                print(f"Current status: {status['status']}")
                time.sleep(2)  # Wait 2 seconds
                status = client.get_job_status(result.task_id)

            if status["status"] == "completed":
                print("Workflow completed!")
                print(f"Output: {status['output']}")
                print(f"Duration: {status['metadata']['duration']}")
            else:
                print(f"Workflow failed: {status['error']}")

    except Exception as error:
        print(f"Error: {error}")

execute_async()

Limitation de débit et nouvelle tentative

Gérez les limites de débit automatiquement avec un retrait exponentiel :

import os
from simstudio import SimStudioClient, SimStudioError

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def execute_with_retry_handling():
    try:
        # Automatically retries on rate limit
        result = client.execute_with_retry(
            "workflow-id",
            input_data={"message": "Process this"},
            max_retries=5,
            initial_delay=1.0,
            max_delay=60.0,
            backoff_multiplier=2.0
        )

        print(f"Success: {result}")
    except SimStudioError as error:
        if error.code == "RATE_LIMIT_EXCEEDED":
            print("Rate limit exceeded after all retries")

            # Check rate limit info
            rate_limit_info = client.get_rate_limit_info()
            if rate_limit_info:
                from datetime import datetime
                reset_time = datetime.fromtimestamp(rate_limit_info.reset)
                print(f"Rate limit resets at: {reset_time}")

execute_with_retry_handling()

Surveillance de l'utilisation

Surveillez l'utilisation et les limites de votre compte :

import os
from simstudio import SimStudioClient

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def check_usage():
    try:
        limits = client.get_usage_limits()

        print("=== Rate Limits ===")
        print("Sync requests:")
        print(f"  Limit: {limits.rate_limit['sync']['limit']}")
        print(f"  Remaining: {limits.rate_limit['sync']['remaining']}")
        print(f"  Resets at: {limits.rate_limit['sync']['resetAt']}")
        print(f"  Is limited: {limits.rate_limit['sync']['isLimited']}")

        print("\nAsync requests:")
        print(f"  Limit: {limits.rate_limit['async']['limit']}")
        print(f"  Remaining: {limits.rate_limit['async']['remaining']}")
        print(f"  Resets at: {limits.rate_limit['async']['resetAt']}")
        print(f"  Is limited: {limits.rate_limit['async']['isLimited']}")

        print("\n=== Usage ===")
        print(f"Current period cost: ${limits.usage['currentPeriodCost']:.2f}")
        print(f"Limit: ${limits.usage['limit']:.2f}")
        print(f"Plan: {limits.usage['plan']}")

        percent_used = (limits.usage['currentPeriodCost'] / limits.usage['limit']) * 100
        print(f"Usage: {percent_used:.1f}%")

        if percent_used > 80:
            print("⚠️  Warning: You are approaching your usage limit!")

    except Exception as error:
        print(f"Error checking usage: {error}")

check_usage()

Exécution de workflow en streaming

Exécutez des workflows avec des réponses en streaming en temps réel :

from simstudio import SimStudioClient
import os

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))   

def execute_with_streaming():
    """Execute workflow with streaming enabled."""
    try:
        # Enable streaming for specific block outputs
        result = client.execute_workflow(
            "workflow-id",
            input_data={"message": "Count to five"},
            stream=True,
            selected_outputs=["agent1.content"]  # Use blockName.attribute format
        )

        print("Workflow result:", result)
    except Exception as error:
        print("Error:", error)

execute_with_streaming()

La réponse en streaming suit le format Server-Sent Events (SSE) :

data: {"blockId":"7b7735b9-19e5-4bd6-818b-46aae2596e9f","chunk":"One"}

data: {"blockId":"7b7735b9-19e5-4bd6-818b-46aae2596e9f","chunk":", two"}

data: {"event":"done","success":true,"output":{},"metadata":{"duration":610}}

data: [DONE]

Exemple de streaming avec Flask :

from flask import Flask, Response, stream_with_context
import requests
import json
import os

app = Flask(__name__)

@app.route('/stream-workflow')
def stream_workflow():
    """Stream workflow execution to the client."""

    def generate():
        response = requests.post(
            'https://sim.ai/api/workflows/WORKFLOW_ID/execute',
            headers={
                'Content-Type': 'application/json',
                'X-API-Key': os.getenv('SIM_API_KEY')
            },
            json={
                'message': 'Generate a story',
                'stream': True,
                'selectedOutputs': ['agent1.content']
            },
            stream=True
        )

        for line in response.iter_lines():
            if line:
                decoded_line = line.decode('utf-8')
                if decoded_line.startswith('data: '):
                    data = decoded_line[6:]  # Remove 'data: ' prefix

                    if data == '[DONE]':
                        break

                    try:
                        parsed = json.loads(data)
                        if 'chunk' in parsed:
                            yield f"data: {json.dumps(parsed)}\n\n"
                        elif parsed.get('event') == 'done':
                            yield f"data: {json.dumps(parsed)}\n\n"
                            print("Execution complete:", parsed.get('metadata'))
                    except json.JSONDecodeError:
                        pass

    return Response(
        stream_with_context(generate()),
        mimetype='text/event-stream'
    )

if __name__ == '__main__':
    app.run(debug=True)

Configuration de l'environnement

Configurez le client en utilisant des variables d'environnement :

import os
from simstudio import SimStudioClient

# Development configuration
client = SimStudioClient(
    api_key=os.getenv("SIM_API_KEY")
    base_url=os.getenv("SIM_BASE_URL", "https://sim.ai")
)
import os
from simstudio import SimStudioClient

# Production configuration with error handling
api_key = os.getenv("SIM_API_KEY")
if not api_key:
    raise ValueError("SIM_API_KEY environment variable is required")

client = SimStudioClient(
    api_key=api_key,
    base_url=os.getenv("SIM_BASE_URL", "https://sim.ai")
)

Obtention de votre clé API

Accédez à Sim et connectez-vous à votre compte.

Accédez au workflow que vous souhaitez exécuter par programmation.

Cliquez sur "Déployer" pour déployer votre workflow s'il n'a pas encore été déployé.

Pendant le processus de déploiement, sélectionnez ou créez une clé API.

Copiez la clé API pour l'utiliser dans votre application Python.

Prérequis

  • Python 3.8+
  • requests >= 2.25.0

Licence

Apache-2.0