Sim

Parallel Block YAML Schema

YAML configuration reference for Parallel blocks

Schema Definition

type: object
required:
  - type
  - name
  - inputs
  - connections
properties:
  type:
    type: string
    enum: [parallel]
    description: Block type identifier
  name:
    type: string
    description: Display name for this parallel block
  inputs:
    type: object
    required:
      - parallelType
    properties:
      parallelType:
        type: string
        enum: [count, collection]
        description: Type of parallel execution
      count:
        type: number
        description: Number of parallel instances (for 'count' type)
        minimum: 1
        maximum: 100
      collection:
        type: string
        description: Collection to distribute across instances (for 'collection' type)
      maxConcurrency:
        type: number
        description: Maximum concurrent executions
        default: 10
        minimum: 1
        maximum: 50
  connections:
    type: object
    required:
      - parallel
    properties:
      parallel:
        type: object
        required:
          - start
        properties:
          start:
            type: string
            description: Target block ID to execute inside each parallel instance
          end:
            type: string
            description: Target block ID after all parallel instances complete (optional)
      error:
        type: string
        description: Target block ID for error handling
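
As a rough illustration of the schema above, the smallest block that satisfies every `required` entry might look like the following. The block IDs `minimal-parallel` and `do-work` are hypothetical names chosen for this sketch. `count` is not listed as required, but it is included here because the description says it applies to the 'count' type. `maxConcurrency` is omitted, so the documented default of 10 would presumably apply.

```yaml
# Minimal sketch: only the fields the schema marks as required, plus `count`.
# Block IDs (minimal-parallel, do-work) are hypothetical.
minimal-parallel:
  type: parallel                # must be the literal 'parallel'
  name: "Minimal Parallel"
  inputs:
    parallelType: count         # 'count' or 'collection'
    count: 3                    # allowed range per the schema: 1-100
  connections:
    parallel:
      start: do-work            # required: block run inside each instance
```

The `end` and `error` targets are left out because the schema does not require them.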

Connection Configuration

Parallel blocks use a special connection format that includes a parallel section:

connections:
  parallel:
    start: <string>                     # Target block ID to execute inside each parallel instance
    end: <string>                       # Target block ID after all instances complete (optional)
  error: <string>                       # Target block ID for error handling (optional)

Child Block Configuration

Child blocks inside a parallel block must set their parentId to the ID of the parallel block:

parallel-1:
  type: parallel
  name: "Process Items"
  inputs:
    parallelType: collection
    collection: <start.items>
  connections:
    parallel:
      start: process-item
      end: aggregate-results

# Child block inside the parallel
process-item:
  type: agent
  name: "Process Item"
  parentId: parallel-1                  # References the parallel block
  inputs:
    systemPrompt: "Process this item"
    userPrompt: <parallel.currentItem>
    model: gpt-4o
    apiKey: '{{OPENAI_API_KEY}}'

Examples

Count-Based Parallel Processing

worker-parallel:
  type: parallel
  name: "Worker Parallel"
  inputs:
    parallelType: count
    count: 5
    maxConcurrency: 3
  connections:
    parallel:
      start: worker-task
      end: collect-worker-results

worker-task:
  type: api
  name: "Worker Task"
  parentId: worker-parallel
  inputs:
    url: "https://api.worker.com/process"
    method: POST
    headers:
      - key: "Authorization"
        value: "Bearer {{WORKER_API_KEY}}"
    body: |
      {
        "instanceId": <parallel.index>,
        "timestamp": "{{new Date().toISOString()}}"
      }
  connections:
    success: worker-complete

Collection-Based Parallel Processing

api-parallel:
  type: parallel
  name: "API Parallel"
  inputs:
    parallelType: collection
    collection: <start.apiEndpoints>
    maxConcurrency: 10
  connections:
    parallel:
      start: call-api
      end: merge-api-results

call-api:
  type: api
  name: "Call API"
  parentId: api-parallel
  inputs:
    url: <parallel.currentItem.endpoint>
    method: <parallel.currentItem.method>
    headers:
      - key: "Authorization"
        value: "Bearer {{API_TOKEN}}"
  connections:
    success: api-complete

Complex Parallel Processing Pipeline

data-processing-parallel:
  type: parallel
  name: "Data Processing Parallel"
  inputs:
    parallelType: collection
    collection: <data-loader.records>
    maxConcurrency: 8
  connections:
    parallel:
      start: validate-data
      end: final-aggregation
    error: parallel-error-handler

validate-data:
  type: function
  name: "Validate Data"
  parentId: data-processing-parallel
  inputs:
    code: |
      const record = <parallel.currentItem>;
      const index = <parallel.index>;
      
      // Validate record structure
      if (!record.id || !record.content) {
        throw new Error(`Invalid record at index ${index}`);
      }
      
      return {
        valid: true,
        recordId: record.id,
        validatedAt: new Date().toISOString()
      };
  connections:
    success: process-data
    error: validation-error

process-data:
  type: agent
  name: "Process Data"
  parentId: data-processing-parallel
  inputs:
    systemPrompt: "Process and analyze this data record"
    userPrompt: |
      Record ID: <validatedata.recordId>
      Content: <parallel.currentItem.content>
      Instance: <parallel.index>
    model: gpt-4o
    temperature: 0.3
    apiKey: '{{OPENAI_API_KEY}}'
  connections:
    success: store-result

store-result:
  type: function
  name: "Store Result"
  parentId: data-processing-parallel
  inputs:
    code: |
      const processed = <processdata.content>;
      const recordId = <validatedata.recordId>;
      
      return {
        recordId,
        processed,
        completedAt: new Date().toISOString(),
        instanceIndex: <parallel.index>
      };

Concurrent AI Analysis

multi-model-parallel:
  type: parallel
  name: "Multi-Model Analysis"
  inputs:
    parallelType: collection
    collection: |
      [
        {"model": "gpt-4o", "focus": "technical accuracy"},
        {"model": "claude-3-5-sonnet-20241022", "focus": "creative quality"},
        {"model": "gemini-2.0-flash-exp", "focus": "factual verification"}
      ]
    maxConcurrency: 3
  connections:
    parallel:
      start: analyze-content
      end: combine-analyses

analyze-content:
  type: agent
  name: "Analyze Content"
  parentId: multi-model-parallel
  inputs:
    systemPrompt: |
      You are analyzing content with a focus on <parallel.currentItem.focus>.
      Provide detailed analysis from this perspective.
    userPrompt: |
      Content to analyze: <start.content>
      Analysis focus: <parallel.currentItem.focus>
    model: <parallel.currentItem.model>
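    # Note: the collection mixes providers; an OpenAI key is unlikely to work
    # for the Claude and Gemini entries, so supply a matching key per provider.
    apiKey: '{{OPENAI_API_KEY}}'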
  connections:
    success: analysis-complete

Parallel Variables

Inside child blocks of a parallel block, the following special variables are available:

# Available in all child blocks of the parallel
<parallel.index>                # Instance number (0-based)
<parallel.currentItem>          # Item for this instance (collection type)
<parallel.items>                # Full collection (collection type)
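
The examples in this reference use `<parallel.index>` and `<parallel.currentItem>`; a child block that also reads `<parallel.items>` could be sketched as follows. The IDs `report-progress` and `batch-parallel` are hypothetical, and the sketch assumes the collection resolves to a JavaScript array inside function code, in the same way `<parallel.currentItem>` is used in the pipeline example.

```yaml
# Hypothetical child block; 'report-progress' and 'batch-parallel' are
# illustrative IDs, not part of the original reference.
report-progress:
  type: function
  name: "Report Progress"
  parentId: batch-parallel
  inputs:
    code: |
      const index = <parallel.index>;       // 0-based instance number
      const item = <parallel.currentItem>;  // element assigned to this instance
      const all = <parallel.items>;         // full collection (assumed to be an array)

      return {
        position: `${index + 1} of ${all.length}`,
        item
      };
```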

Output References

After a parallel block completes, you can reference its aggregated results:

# In blocks after the parallel
final-processor:
  inputs:
    all-results: <parallel-name.results>  # Array of all instance results
    total-count: <parallel-name.count>    # Number of instances completed
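
A fuller downstream block might consume both outputs inside a function, as in the sketch below. The block ID `summarize-results` is hypothetical, `parallel-name` is kept as the placeholder used above and should be replaced with the reference name of your own parallel block, and the code assumes `results` resolves to a JavaScript array.

```yaml
# Hypothetical downstream block, reached through the parallel block's
# `end` connection. Not part of the original reference.
summarize-results:
  type: function
  name: "Summarize Results"
  inputs:
    code: |
      // Replace `parallel-name` with your parallel block's reference name.
      const results = <parallel-name.results>;  // one entry per instance
      const count = <parallel-name.count>;      // number of completed instances

      return {
        count,
        firstResult: results.length > 0 ? results[0] : null
      };
```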

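Best Practices

  • Choose a maxConcurrency value that keeps downstream APIs from being overloaded
  • Make sure operations are independent and do not rely on each other
  • Include error handling so parallel execution stays robust
  • Test with small collections first
  • Monitor rate limits on external APIs
  • Use the collection type to distribute work and the count type for a fixed number of instances
  • Consider memory usage for large collections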
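
Several of these practices can be combined in one configuration. The following is only a sketch: the block IDs and the `<start.urls>` input are hypothetical, and the concurrency value is an arbitrary conservative choice rather than a recommendation from the reference.

```yaml
# Sketch only: block IDs and the <start.urls> input are hypothetical.
careful-parallel:
  type: parallel
  name: "Careful Parallel"
  inputs:
    parallelType: collection
    collection: <start.urls>      # start with a handful of items while testing
    maxConcurrency: 2             # below the default of 10, to respect rate limits
  connections:
    parallel:
      start: fetch-url
      end: collect-results
    error: handle-parallel-error  # route failures to a dedicated handler
```

Once the small test run behaves as expected, `maxConcurrency` can be raised gradually, up to the schema maximum of 50.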