Simの公式Python SDKを使用すると、公式Python SDKを使用してPythonアプリケーションからプログラムでワークフローを実行できます。
Python SDKはPython 3.8以上をサポートし、非同期実行、指数バックオフによる自動レート制限、使用状況追跡機能を提供します。
インストール
pipを使用してSDKをインストールします:
pip install simstudio-sdkクイックスタート
以下は、始めるための簡単な例です:
from simstudio import SimStudioClient
# Initialize the client
client = SimStudioClient(
api_key="your-api-key-here",
base_url="https://sim.ai" # optional, defaults to https://sim.ai
)
# Execute a workflow
try:
result = client.execute_workflow("workflow-id")
print("Workflow executed successfully:", result)
except Exception as error:
print("Workflow execution failed:", error)APIリファレンス
SimStudioClient
コンストラクタ
SimStudioClient(api_key: str, base_url: str = "https://sim.ai")パラメータ:
api_key(str): SimのAPIキーbase_url(str, オプション): Sim APIのベースURL
メソッド
execute_workflow()
オプションの入力データでワークフローを実行します。
result = client.execute_workflow(
"workflow-id",
input_data={"message": "Hello, world!"},
timeout=30.0 # 30 seconds
)パラメータ:
workflow_id(str): 実行するワークフローのIDinput_data(dict, オプション): ワークフローに渡す入力データtimeout(float, オプション): タイムアウト(秒)(デフォルト: 30.0)stream(bool, オプション): ストリーミングレスポンスを有効にする(デフォルト: False)selected_outputs(list[str], オプション):blockName.attribute形式でストリーミングするブロック出力(例:["agent1.content"])async_execution(bool, オプション): 非同期実行(デフォルト: False)
戻り値: WorkflowExecutionResult | AsyncExecutionResult
async_execution=Trueの場合、ポーリング用のタスクIDをすぐに返します。それ以外の場合は、完了を待ちます。
get_workflow_status()
ワークフローのステータス(デプロイメントステータスなど)を取得します。
status = client.get_workflow_status("workflow-id")
print("Is deployed:", status.is_deployed)パラメータ:
workflow_id(str): ワークフローのID
戻り値: WorkflowStatus
validate_workflow()
ワークフローが実行準備ができているかを検証します。
is_ready = client.validate_workflow("workflow-id")
if is_ready:
# Workflow is deployed and ready
passパラメータ:
workflow_id(str): ワークフローのID
戻り値: bool
get_job_status()
非同期ジョブ実行のステータスを取得します。
status = client.get_job_status("task-id-from-async-execution")
print("Status:", status["status"]) # 'queued', 'processing', 'completed', 'failed'
if status["status"] == "completed":
print("Output:", status["output"])パラメータ:
task_id(str): 非同期実行から返されたタスクID
戻り値: Dict[str, Any]
レスポンスフィールド:
success(bool): リクエストが成功したかどうかtaskId(str): タスクIDstatus(str): 次のいずれか:'queued','processing','completed','failed','cancelled'metadata(dict):startedAt,completedAt,durationを含むoutput(any, オプション): ワークフロー出力(完了時)error(any, オプション): エラー詳細(失敗時)estimatedDuration(int, オプション): 推定所要時間(ミリ秒)(処理中/キュー時)
execute_with_retry()
指数バックオフを使用してレート制限エラーで自動的に再試行するワークフロー実行。
result = client.execute_with_retry(
"workflow-id",
input_data={"message": "Hello"},
timeout=30.0,
max_retries=3, # Maximum number of retries
initial_delay=1.0, # Initial delay in seconds
max_delay=30.0, # Maximum delay in seconds
backoff_multiplier=2.0 # Exponential backoff multiplier
)パラメータ:
workflow_id(str): 実行するワークフローのIDinput_data(dict, オプション): ワークフローに渡す入力データtimeout(float, オプション): タイムアウト(秒)stream(bool, オプション): ストリーミングレスポンスを有効にするselected_outputs(list, オプション): ストリーミングするブロック出力async_execution(bool, オプション): 非同期実行max_retries(int, オプション): 最大再試行回数(デフォルト: 3)initial_delay(float, オプション): 初期遅延(秒)(デフォルト: 1.0)max_delay(float, オプション): 最大遅延(秒)(デフォルト: 30.0)backoff_multiplier(float, オプション): バックオフ乗数(デフォルト: 2.0)
戻り値: WorkflowExecutionResult | AsyncExecutionResult
リトライロジックは、サンダリングハード問題を防ぐために±25%のジッターを伴う指数バックオフ(1秒→2秒→4秒→8秒...)を使用します。APIが retry-after ヘッダーを提供する場合、代わりにそれが使用されます。
get_rate_limit_info()
最後のAPIレスポンスから現在のレート制限情報を取得します。
rate_limit_info = client.get_rate_limit_info()
if rate_limit_info:
print("Limit:", rate_limit_info.limit)
print("Remaining:", rate_limit_info.remaining)
print("Reset:", datetime.fromtimestamp(rate_limit_info.reset))戻り値: RateLimitInfo | None
get_usage_limits()
アカウントの現在の使用制限とクォータ情報を取得します。
limits = client.get_usage_limits()
print("Sync requests remaining:", limits.rate_limit["sync"]["remaining"])
print("Async requests remaining:", limits.rate_limit["async"]["remaining"])
print("Current period cost:", limits.usage["currentPeriodCost"])
print("Plan:", limits.usage["plan"])戻り値: UsageLimits
レスポンス構造:
{
"success": bool,
"rateLimit": {
"sync": {
"isLimited": bool,
"limit": int,
"remaining": int,
"resetAt": str
},
"async": {
"isLimited": bool,
"limit": int,
"remaining": int,
"resetAt": str
},
"authType": str # 'api' or 'manual'
},
"usage": {
"currentPeriodCost": float,
"limit": float,
"plan": str # e.g., 'free', 'pro'
}
}set_api_key()
APIキーを更新します。
client.set_api_key("new-api-key")set_base_url()
ベースURLを更新します。
client.set_base_url("https://my-custom-domain.com")close()
基盤となるHTTPセッションを閉じます。
client.close()データクラス
WorkflowExecutionResult
@dataclass
class WorkflowExecutionResult:
success: bool
output: Optional[Any] = None
error: Optional[str] = None
logs: Optional[List[Any]] = None
metadata: Optional[Dict[str, Any]] = None
trace_spans: Optional[List[Any]] = None
total_duration: Optional[float] = NoneAsyncExecutionResult
@dataclass
class AsyncExecutionResult:
success: bool
task_id: str
status: str # 'queued'
created_at: str
links: Dict[str, str] # e.g., {"status": "/api/jobs/{taskId}"}WorkflowStatus
@dataclass
class WorkflowStatus:
is_deployed: bool
deployed_at: Optional[str] = None
needs_redeployment: bool = FalseRateLimitInfo
@dataclass
class RateLimitInfo:
limit: int
remaining: int
reset: int
retry_after: Optional[int] = NoneUsageLimits
@dataclass
class UsageLimits:
success: bool
rate_limit: Dict[str, Any]
usage: Dict[str, Any]SimStudioError
class SimStudioError(Exception):
def __init__(self, message: str, code: Optional[str] = None, status: Optional[int] = None):
super().__init__(message)
self.code = code
self.status = status一般的なエラーコード:
UNAUTHORIZED: 無効なAPIキーTIMEOUT: リクエストがタイムアウトしましたRATE_LIMIT_EXCEEDED: レート制限を超えましたUSAGE_LIMIT_EXCEEDED: 使用制限を超えましたEXECUTION_ERROR: ワークフローの実行に失敗しました
例
基本的なワークフロー実行
APIキーを使用してSimStudioClientをセットアップします。
ワークフローがデプロイされ、実行準備ができているか確認します。
入力データでワークフローを実行します。
実行結果を処理し、エラーがあれば対処します。
import os
from simstudio import SimStudioClient
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def run_workflow():
try:
# Check if workflow is ready
is_ready = client.validate_workflow("my-workflow-id")
if not is_ready:
raise Exception("Workflow is not deployed or ready")
# Execute the workflow
result = client.execute_workflow(
"my-workflow-id",
input_data={
"message": "Process this data",
"user_id": "12345"
}
)
if result.success:
print("Output:", result.output)
print("Duration:", result.metadata.get("duration") if result.metadata else None)
else:
print("Workflow failed:", result.error)
except Exception as error:
print("Error:", error)
run_workflow()エラー処理
ワークフロー実行中に発生する可能性のある様々なタイプのエラーを処理します:
from simstudio import SimStudioClient, SimStudioError
import os
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def execute_with_error_handling():
try:
result = client.execute_workflow("workflow-id")
return result
except SimStudioError as error:
if error.code == "UNAUTHORIZED":
print("Invalid API key")
elif error.code == "TIMEOUT":
print("Workflow execution timed out")
elif error.code == "USAGE_LIMIT_EXCEEDED":
print("Usage limit exceeded")
elif error.code == "INVALID_JSON":
print("Invalid JSON in request body")
else:
print(f"Workflow error: {error}")
raise
except Exception as error:
print(f"Unexpected error: {error}")
raiseコンテキストマネージャーの使用
リソースのクリーンアップを自動的に処理するためにクライアントをコンテキストマネージャーとして使用します:
from simstudio import SimStudioClient
import os
# Using context manager to automatically close the session
with SimStudioClient(api_key=os.getenv("SIM_API_KEY")) as client:
result = client.execute_workflow("workflow-id")
print("Result:", result)
# Session is automatically closed hereバッチワークフロー実行
複数のワークフローを効率的に実行します:
from simstudio import SimStudioClient
import os
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def execute_workflows_batch(workflow_data_pairs):
"""Execute multiple workflows with different input data."""
results = []
for workflow_id, input_data in workflow_data_pairs:
try:
# Validate workflow before execution
if not client.validate_workflow(workflow_id):
print(f"Skipping {workflow_id}: not deployed")
continue
result = client.execute_workflow(workflow_id, input_data)
results.append({
"workflow_id": workflow_id,
"success": result.success,
"output": result.output,
"error": result.error
})
except Exception as error:
results.append({
"workflow_id": workflow_id,
"success": False,
"error": str(error)
})
return results
# Example usage
workflows = [
("workflow-1", {"type": "analysis", "data": "sample1"}),
("workflow-2", {"type": "processing", "data": "sample2"}),
]
results = execute_workflows_batch(workflows)
for result in results:
print(f"Workflow {result['workflow_id']}: {'Success' if result['success'] else 'Failed'}")非同期ワークフロー実行
長時間実行されるタスクのためにワークフローを非同期で実行します:
import os
import time
from simstudio import SimStudioClient
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def execute_async():
try:
# Start async execution
result = client.execute_workflow(
"workflow-id",
input_data={"data": "large dataset"},
async_execution=True # Execute asynchronously
)
# Check if result is an async execution
if hasattr(result, 'task_id'):
print(f"Task ID: {result.task_id}")
print(f"Status endpoint: {result.links['status']}")
# Poll for completion
status = client.get_job_status(result.task_id)
while status["status"] in ["queued", "processing"]:
print(f"Current status: {status['status']}")
time.sleep(2) # Wait 2 seconds
status = client.get_job_status(result.task_id)
if status["status"] == "completed":
print("Workflow completed!")
print(f"Output: {status['output']}")
print(f"Duration: {status['metadata']['duration']}")
else:
print(f"Workflow failed: {status['error']}")
except Exception as error:
print(f"Error: {error}")
execute_async()レート制限とリトライ
指数バックオフを使用して自動的にレート制限を処理します:
import os
from simstudio import SimStudioClient, SimStudioError
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def execute_with_retry_handling():
try:
# Automatically retries on rate limit
result = client.execute_with_retry(
"workflow-id",
input_data={"message": "Process this"},
max_retries=5,
initial_delay=1.0,
max_delay=60.0,
backoff_multiplier=2.0
)
print(f"Success: {result}")
except SimStudioError as error:
if error.code == "RATE_LIMIT_EXCEEDED":
print("Rate limit exceeded after all retries")
# Check rate limit info
rate_limit_info = client.get_rate_limit_info()
if rate_limit_info:
from datetime import datetime
reset_time = datetime.fromtimestamp(rate_limit_info.reset)
print(f"Rate limit resets at: {reset_time}")
execute_with_retry_handling()使用状況モニタリング
アカウントの使用状況と制限をモニタリングします:
import os
from simstudio import SimStudioClient
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def check_usage():
try:
limits = client.get_usage_limits()
print("=== Rate Limits ===")
print("Sync requests:")
print(f" Limit: {limits.rate_limit['sync']['limit']}")
print(f" Remaining: {limits.rate_limit['sync']['remaining']}")
print(f" Resets at: {limits.rate_limit['sync']['resetAt']}")
print(f" Is limited: {limits.rate_limit['sync']['isLimited']}")
print("\nAsync requests:")
print(f" Limit: {limits.rate_limit['async']['limit']}")
print(f" Remaining: {limits.rate_limit['async']['remaining']}")
print(f" Resets at: {limits.rate_limit['async']['resetAt']}")
print(f" Is limited: {limits.rate_limit['async']['isLimited']}")
print("\n=== Usage ===")
print(f"Current period cost: ${limits.usage['currentPeriodCost']:.2f}")
print(f"Limit: ${limits.usage['limit']:.2f}")
print(f"Plan: {limits.usage['plan']}")
percent_used = (limits.usage['currentPeriodCost'] / limits.usage['limit']) * 100
print(f"Usage: {percent_used:.1f}%")
if percent_used > 80:
print("⚠️ Warning: You are approaching your usage limit!")
except Exception as error:
print(f"Error checking usage: {error}")
check_usage()ワークフローの実行ストリーミング
リアルタイムのストリーミングレスポンスでワークフローを実行します:
from simstudio import SimStudioClient
import os
client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))
def execute_with_streaming():
"""Execute workflow with streaming enabled."""
try:
# Enable streaming for specific block outputs
result = client.execute_workflow(
"workflow-id",
input_data={"message": "Count to five"},
stream=True,
selected_outputs=["agent1.content"] # Use blockName.attribute format
)
print("Workflow result:", result)
except Exception as error:
print("Error:", error)
execute_with_streaming()ストリーミングレスポンスはServer-Sent Events(SSE)形式に従います:
data: {"blockId":"7b7735b9-19e5-4bd6-818b-46aae2596e9f","chunk":"One"}
data: {"blockId":"7b7735b9-19e5-4bd6-818b-46aae2596e9f","chunk":", two"}
data: {"event":"done","success":true,"output":{},"metadata":{"duration":610}}
data: [DONE]Flaskストリーミングの例:
from flask import Flask, Response, stream_with_context
import requests
import json
import os
app = Flask(__name__)
@app.route('/stream-workflow')
def stream_workflow():
"""Stream workflow execution to the client."""
def generate():
response = requests.post(
'https://sim.ai/api/workflows/WORKFLOW_ID/execute',
headers={
'Content-Type': 'application/json',
'X-API-Key': os.getenv('SIM_API_KEY')
},
json={
'message': 'Generate a story',
'stream': True,
'selectedOutputs': ['agent1.content']
},
stream=True
)
for line in response.iter_lines():
if line:
decoded_line = line.decode('utf-8')
if decoded_line.startswith('data: '):
data = decoded_line[6:] # Remove 'data: ' prefix
if data == '[DONE]':
break
try:
parsed = json.loads(data)
if 'chunk' in parsed:
yield f"data: {json.dumps(parsed)}\n\n"
elif parsed.get('event') == 'done':
yield f"data: {json.dumps(parsed)}\n\n"
print("Execution complete:", parsed.get('metadata'))
except json.JSONDecodeError:
pass
return Response(
stream_with_context(generate()),
mimetype='text/event-stream'
)
if __name__ == '__main__':
app.run(debug=True)環境設定
環境変数を使用してクライアントを設定します:
import os
from simstudio import SimStudioClient
# Development configuration
client = SimStudioClient(
api_key=os.getenv("SIM_API_KEY")
base_url=os.getenv("SIM_BASE_URL", "https://sim.ai")
)import os
from simstudio import SimStudioClient
# Production configuration with error handling
api_key = os.getenv("SIM_API_KEY")
if not api_key:
raise ValueError("SIM_API_KEY environment variable is required")
client = SimStudioClient(
api_key=api_key,
base_url=os.getenv("SIM_BASE_URL", "https://sim.ai")
)APIキーの取得方法
Simに移動してアカウントにログインします。
プログラムで実行したいワークフローに移動します。
まだデプロイされていない場合は、「デプロイ」をクリックしてワークフローをデプロイします。
デプロイプロセス中に、APIキーを選択または作成します。
Pythonアプリケーションで使用するAPIキーをコピーします。
要件
- Python 3.8+
- requests >= 2.25.0
ライセンス
Apache-2.0