Sim

Python

Simの公式Python SDKを使用すると、公式Python SDKを使用してPythonアプリケーションからプログラムでワークフローを実行できます。

Python SDKはPython 3.8以上をサポートし、非同期実行、指数バックオフによる自動レート制限、使用状況追跡機能を提供します。

インストール

pipを使用してSDKをインストールします:

pip install simstudio-sdk

クイックスタート

以下は、始めるための簡単な例です:

from simstudio import SimStudioClient

# Initialize the client
client = SimStudioClient(
    api_key="your-api-key-here",
    base_url="https://sim.ai"  # optional, defaults to https://sim.ai
)

# Execute a workflow
try:
    result = client.execute_workflow("workflow-id")
    print("Workflow executed successfully:", result)
except Exception as error:
    print("Workflow execution failed:", error)

APIリファレンス

SimStudioClient

コンストラクタ

SimStudioClient(api_key: str, base_url: str = "https://sim.ai")

パラメータ:

  • api_key (str): SimのAPIキー
  • base_url (str, オプション): Sim APIのベースURL

メソッド

execute_workflow()

オプションの入力データでワークフローを実行します。

result = client.execute_workflow(
    "workflow-id",
    input_data={"message": "Hello, world!"},
    timeout=30.0  # 30 seconds
)

パラメータ:

  • workflow_id (str): 実行するワークフローのID
  • input_data (dict, オプション): ワークフローに渡す入力データ
  • timeout (float, オプション): タイムアウト(秒)(デフォルト: 30.0)
  • stream (bool, オプション): ストリーミングレスポンスを有効にする(デフォルト: False)
  • selected_outputs (list[str], オプション): blockName.attribute形式でストリーミングするブロック出力(例: ["agent1.content"]
  • async_execution (bool, オプション): 非同期実行(デフォルト: False)

戻り値: WorkflowExecutionResult | AsyncExecutionResult

async_execution=Trueの場合、ポーリング用のタスクIDをすぐに返します。それ以外の場合は、完了を待ちます。

get_workflow_status()

ワークフローのステータス(デプロイメントステータスなど)を取得します。

status = client.get_workflow_status("workflow-id")
print("Is deployed:", status.is_deployed)

パラメータ:

  • workflow_id (str): ワークフローのID

戻り値: WorkflowStatus

validate_workflow()

ワークフローが実行準備ができているかを検証します。

is_ready = client.validate_workflow("workflow-id")
if is_ready:
    # Workflow is deployed and ready
    pass

パラメータ:

  • workflow_id (str): ワークフローのID

戻り値: bool

get_job_status()

非同期ジョブ実行のステータスを取得します。

status = client.get_job_status("task-id-from-async-execution")
print("Status:", status["status"])  # 'queued', 'processing', 'completed', 'failed'
if status["status"] == "completed":
    print("Output:", status["output"])

パラメータ:

  • task_id (str): 非同期実行から返されたタスクID

戻り値: Dict[str, Any]

レスポンスフィールド:

  • success (bool): リクエストが成功したかどうか
  • taskId (str): タスクID
  • status (str): 次のいずれか: 'queued', 'processing', 'completed', 'failed', 'cancelled'
  • metadata (dict): startedAt, completedAt, durationを含む
  • output (any, オプション): ワークフロー出力(完了時)
  • error (any, オプション): エラー詳細(失敗時)
  • estimatedDuration (int, オプション): 推定所要時間(ミリ秒)(処理中/キュー時)
execute_with_retry()

指数バックオフを使用してレート制限エラーで自動的に再試行するワークフロー実行。

result = client.execute_with_retry(
    "workflow-id",
    input_data={"message": "Hello"},
    timeout=30.0,
    max_retries=3,           # Maximum number of retries
    initial_delay=1.0,       # Initial delay in seconds
    max_delay=30.0,          # Maximum delay in seconds
    backoff_multiplier=2.0   # Exponential backoff multiplier
)

パラメータ:

  • workflow_id (str): 実行するワークフローのID
  • input_data (dict, オプション): ワークフローに渡す入力データ
  • timeout (float, オプション): タイムアウト(秒)
  • stream (bool, オプション): ストリーミングレスポンスを有効にする
  • selected_outputs (list, オプション): ストリーミングするブロック出力
  • async_execution (bool, オプション): 非同期実行
  • max_retries (int, オプション): 最大再試行回数(デフォルト: 3)
  • initial_delay (float, オプション): 初期遅延(秒)(デフォルト: 1.0)
  • max_delay (float, オプション): 最大遅延(秒)(デフォルト: 30.0)
  • backoff_multiplier (float, オプション): バックオフ乗数(デフォルト: 2.0)

戻り値: WorkflowExecutionResult | AsyncExecutionResult

リトライロジックは、サンダリングハード問題を防ぐために±25%のジッターを伴う指数バックオフ(1秒→2秒→4秒→8秒...)を使用します。APIが retry-after ヘッダーを提供する場合、代わりにそれが使用されます。

get_rate_limit_info()

最後のAPIレスポンスから現在のレート制限情報を取得します。

rate_limit_info = client.get_rate_limit_info()
if rate_limit_info:
    print("Limit:", rate_limit_info.limit)
    print("Remaining:", rate_limit_info.remaining)
    print("Reset:", datetime.fromtimestamp(rate_limit_info.reset))

戻り値: RateLimitInfo | None

get_usage_limits()

アカウントの現在の使用制限とクォータ情報を取得します。

limits = client.get_usage_limits()
print("Sync requests remaining:", limits.rate_limit["sync"]["remaining"])
print("Async requests remaining:", limits.rate_limit["async"]["remaining"])
print("Current period cost:", limits.usage["currentPeriodCost"])
print("Plan:", limits.usage["plan"])

戻り値: UsageLimits

レスポンス構造:

{
    "success": bool,
    "rateLimit": {
        "sync": {
            "isLimited": bool,
            "limit": int,
            "remaining": int,
            "resetAt": str
        },
        "async": {
            "isLimited": bool,
            "limit": int,
            "remaining": int,
            "resetAt": str
        },
        "authType": str  # 'api' or 'manual'
    },
    "usage": {
        "currentPeriodCost": float,
        "limit": float,
        "plan": str  # e.g., 'free', 'pro'
    }
}
set_api_key()

APIキーを更新します。

client.set_api_key("new-api-key")
set_base_url()

ベースURLを更新します。

client.set_base_url("https://my-custom-domain.com")
close()

基盤となるHTTPセッションを閉じます。

client.close()

データクラス

WorkflowExecutionResult

@dataclass
class WorkflowExecutionResult:
    success: bool
    output: Optional[Any] = None
    error: Optional[str] = None
    logs: Optional[List[Any]] = None
    metadata: Optional[Dict[str, Any]] = None
    trace_spans: Optional[List[Any]] = None
    total_duration: Optional[float] = None

AsyncExecutionResult

@dataclass
class AsyncExecutionResult:
    success: bool
    task_id: str
    status: str  # 'queued'
    created_at: str
    links: Dict[str, str]  # e.g., {"status": "/api/jobs/{taskId}"}

WorkflowStatus

@dataclass
class WorkflowStatus:
    is_deployed: bool
    deployed_at: Optional[str] = None
    needs_redeployment: bool = False

RateLimitInfo

@dataclass
class RateLimitInfo:
    limit: int
    remaining: int
    reset: int
    retry_after: Optional[int] = None

UsageLimits

@dataclass
class UsageLimits:
    success: bool
    rate_limit: Dict[str, Any]
    usage: Dict[str, Any]

SimStudioError

class SimStudioError(Exception):
    def __init__(self, message: str, code: Optional[str] = None, status: Optional[int] = None):
        super().__init__(message)
        self.code = code
        self.status = status

一般的なエラーコード:

  • UNAUTHORIZED: 無効なAPIキー
  • TIMEOUT: リクエストがタイムアウトしました
  • RATE_LIMIT_EXCEEDED: レート制限を超えました
  • USAGE_LIMIT_EXCEEDED: 使用制限を超えました
  • EXECUTION_ERROR: ワークフローの実行に失敗しました

基本的なワークフロー実行

APIキーを使用してSimStudioClientをセットアップします。

ワークフローがデプロイされ、実行準備ができているか確認します。

入力データでワークフローを実行します。

実行結果を処理し、エラーがあれば対処します。

import os
from simstudio import SimStudioClient

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def run_workflow():
    try:
        # Check if workflow is ready
        is_ready = client.validate_workflow("my-workflow-id")
        if not is_ready:
            raise Exception("Workflow is not deployed or ready")

        # Execute the workflow
        result = client.execute_workflow(
            "my-workflow-id",
            input_data={
                "message": "Process this data",
                "user_id": "12345"
            }
        )

        if result.success:
            print("Output:", result.output)
            print("Duration:", result.metadata.get("duration") if result.metadata else None)
        else:
            print("Workflow failed:", result.error)
            
    except Exception as error:
        print("Error:", error)

run_workflow()

エラー処理

ワークフロー実行中に発生する可能性のある様々なタイプのエラーを処理します:

from simstudio import SimStudioClient, SimStudioError
import os

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def execute_with_error_handling():
    try:
        result = client.execute_workflow("workflow-id")
        return result
    except SimStudioError as error:
        if error.code == "UNAUTHORIZED":
            print("Invalid API key")
        elif error.code == "TIMEOUT":
            print("Workflow execution timed out")
        elif error.code == "USAGE_LIMIT_EXCEEDED":
            print("Usage limit exceeded")
        elif error.code == "INVALID_JSON":
            print("Invalid JSON in request body")
        else:
            print(f"Workflow error: {error}")
        raise
    except Exception as error:
        print(f"Unexpected error: {error}")
        raise

コンテキストマネージャーの使用

リソースのクリーンアップを自動的に処理するためにクライアントをコンテキストマネージャーとして使用します:

from simstudio import SimStudioClient
import os

# Using context manager to automatically close the session
with SimStudioClient(api_key=os.getenv("SIM_API_KEY")) as client:
    result = client.execute_workflow("workflow-id")
    print("Result:", result)
# Session is automatically closed here

バッチワークフロー実行

複数のワークフローを効率的に実行します:

from simstudio import SimStudioClient
import os

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def execute_workflows_batch(workflow_data_pairs):
    """Execute multiple workflows with different input data."""
    results = []
    
    for workflow_id, input_data in workflow_data_pairs:
        try:
            # Validate workflow before execution
            if not client.validate_workflow(workflow_id):
                print(f"Skipping {workflow_id}: not deployed")
                continue
                
            result = client.execute_workflow(workflow_id, input_data)
            results.append({
                "workflow_id": workflow_id,
                "success": result.success,
                "output": result.output,
                "error": result.error
            })
            
        except Exception as error:
            results.append({
                "workflow_id": workflow_id,
                "success": False,
                "error": str(error)
            })
    
    return results

# Example usage
workflows = [
    ("workflow-1", {"type": "analysis", "data": "sample1"}),
    ("workflow-2", {"type": "processing", "data": "sample2"}),
]

results = execute_workflows_batch(workflows)
for result in results:
    print(f"Workflow {result['workflow_id']}: {'Success' if result['success'] else 'Failed'}")

非同期ワークフロー実行

長時間実行されるタスクのためにワークフローを非同期で実行します:

import os
import time
from simstudio import SimStudioClient

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def execute_async():
    try:
        # Start async execution
        result = client.execute_workflow(
            "workflow-id",
            input_data={"data": "large dataset"},
            async_execution=True  # Execute asynchronously
        )

        # Check if result is an async execution
        if hasattr(result, 'task_id'):
            print(f"Task ID: {result.task_id}")
            print(f"Status endpoint: {result.links['status']}")

            # Poll for completion
            status = client.get_job_status(result.task_id)

            while status["status"] in ["queued", "processing"]:
                print(f"Current status: {status['status']}")
                time.sleep(2)  # Wait 2 seconds
                status = client.get_job_status(result.task_id)

            if status["status"] == "completed":
                print("Workflow completed!")
                print(f"Output: {status['output']}")
                print(f"Duration: {status['metadata']['duration']}")
            else:
                print(f"Workflow failed: {status['error']}")

    except Exception as error:
        print(f"Error: {error}")

execute_async()

レート制限とリトライ

指数バックオフを使用して自動的にレート制限を処理します:

import os
from simstudio import SimStudioClient, SimStudioError

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def execute_with_retry_handling():
    try:
        # Automatically retries on rate limit
        result = client.execute_with_retry(
            "workflow-id",
            input_data={"message": "Process this"},
            max_retries=5,
            initial_delay=1.0,
            max_delay=60.0,
            backoff_multiplier=2.0
        )

        print(f"Success: {result}")
    except SimStudioError as error:
        if error.code == "RATE_LIMIT_EXCEEDED":
            print("Rate limit exceeded after all retries")

            # Check rate limit info
            rate_limit_info = client.get_rate_limit_info()
            if rate_limit_info:
                from datetime import datetime
                reset_time = datetime.fromtimestamp(rate_limit_info.reset)
                print(f"Rate limit resets at: {reset_time}")

execute_with_retry_handling()

使用状況モニタリング

アカウントの使用状況と制限をモニタリングします:

import os
from simstudio import SimStudioClient

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def check_usage():
    try:
        limits = client.get_usage_limits()

        print("=== Rate Limits ===")
        print("Sync requests:")
        print(f"  Limit: {limits.rate_limit['sync']['limit']}")
        print(f"  Remaining: {limits.rate_limit['sync']['remaining']}")
        print(f"  Resets at: {limits.rate_limit['sync']['resetAt']}")
        print(f"  Is limited: {limits.rate_limit['sync']['isLimited']}")

        print("\nAsync requests:")
        print(f"  Limit: {limits.rate_limit['async']['limit']}")
        print(f"  Remaining: {limits.rate_limit['async']['remaining']}")
        print(f"  Resets at: {limits.rate_limit['async']['resetAt']}")
        print(f"  Is limited: {limits.rate_limit['async']['isLimited']}")

        print("\n=== Usage ===")
        print(f"Current period cost: ${limits.usage['currentPeriodCost']:.2f}")
        print(f"Limit: ${limits.usage['limit']:.2f}")
        print(f"Plan: {limits.usage['plan']}")

        percent_used = (limits.usage['currentPeriodCost'] / limits.usage['limit']) * 100
        print(f"Usage: {percent_used:.1f}%")

        if percent_used > 80:
            print("⚠️  Warning: You are approaching your usage limit!")

    except Exception as error:
        print(f"Error checking usage: {error}")

check_usage()

ワークフローの実行ストリーミング

リアルタイムのストリーミングレスポンスでワークフローを実行します:

from simstudio import SimStudioClient
import os

client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

def execute_with_streaming():
    """Execute workflow with streaming enabled."""
    try:
        # Enable streaming for specific block outputs
        result = client.execute_workflow(
            "workflow-id",
            input_data={"message": "Count to five"},
            stream=True,
            selected_outputs=["agent1.content"]  # Use blockName.attribute format
        )

        print("Workflow result:", result)
    except Exception as error:
        print("Error:", error)

execute_with_streaming()

ストリーミングレスポンスはServer-Sent Events(SSE)形式に従います:

data: {"blockId":"7b7735b9-19e5-4bd6-818b-46aae2596e9f","chunk":"One"}

data: {"blockId":"7b7735b9-19e5-4bd6-818b-46aae2596e9f","chunk":", two"}

data: {"event":"done","success":true,"output":{},"metadata":{"duration":610}}

data: [DONE]

Flaskストリーミングの例:

from flask import Flask, Response, stream_with_context
import requests
import json
import os

app = Flask(__name__)

@app.route('/stream-workflow')
def stream_workflow():
    """Stream workflow execution to the client."""

    def generate():
        response = requests.post(
            'https://sim.ai/api/workflows/WORKFLOW_ID/execute',
            headers={
                'Content-Type': 'application/json',
                'X-API-Key': os.getenv('SIM_API_KEY')   
            },
            json={
                'message': 'Generate a story',
                'stream': True,
                'selectedOutputs': ['agent1.content']
            },
            stream=True
        )

        for line in response.iter_lines():
            if line:
                decoded_line = line.decode('utf-8')
                if decoded_line.startswith('data: '):
                    data = decoded_line[6:]  # Remove 'data: ' prefix

                    if data == '[DONE]':
                        break

                    try:
                        parsed = json.loads(data)
                        if 'chunk' in parsed:
                            yield f"data: {json.dumps(parsed)}\n\n"
                        elif parsed.get('event') == 'done':
                            yield f"data: {json.dumps(parsed)}\n\n"
                            print("Execution complete:", parsed.get('metadata'))
                    except json.JSONDecodeError:
                        pass

    return Response(
        stream_with_context(generate()),
        mimetype='text/event-stream'
    )

if __name__ == '__main__':
    app.run(debug=True)

環境設定

環境変数を使用してクライアントを設定します:

import os
from simstudio import SimStudioClient

# Development configuration
client = SimStudioClient(
    api_key=os.getenv("SIM_API_KEY")
    base_url=os.getenv("SIM_BASE_URL", "https://sim.ai")
)
import os
from simstudio import SimStudioClient

# Production configuration with error handling
api_key = os.getenv("SIM_API_KEY")
if not api_key:
    raise ValueError("SIM_API_KEY environment variable is required")

client = SimStudioClient(
    api_key=api_key,
    base_url=os.getenv("SIM_BASE_URL", "https://sim.ai")
)

APIキーの取得方法

Simに移動してアカウントにログインします。

プログラムで実行したいワークフローに移動します。

まだデプロイされていない場合は、「デプロイ」をクリックしてワークフローをデプロイします。

デプロイプロセス中に、APIキーを選択または作成します。

Pythonアプリケーションで使用するAPIキーをコピーします。

要件

  • Python 3.8+
  • requests >= 2.25.0

ライセンス

Apache-2.0