Sim

Parallel Block YAML Schema

YAML configuration reference for parallel blocks

Schema Definition

type: object
required:
  - type
  - name
  - inputs
  - connections
properties:
  type:
    type: string
    enum: [parallel]
    description: Block type identifier
  name:
    type: string
    description: Display name for this parallel block
  inputs:
    type: object
    required:
      - parallelType
    properties:
      parallelType:
        type: string
        enum: [count, collection]
        description: Type of parallel execution
      count:
        type: number
        description: Number of parallel instances (for 'count' type)
        minimum: 1
        maximum: 100
      collection:
        type: string
        description: Collection to distribute across instances (for 'collection' type)
      maxConcurrency:
        type: number
        description: Maximum concurrent executions
        default: 10
        minimum: 1
        maximum: 50
  connections:
    type: object
    required:
      - parallel
    properties:
      parallel:
        type: object
        required:
          - start
        properties:
          start:
            type: string
            description: Target block ID to execute inside each parallel instance
          end:
            type: string
            description: Target block ID after all parallel instances complete (optional)
      error:
        type: string
        description: Target block ID for error handling (optional)
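
As a quick illustration of the schema, the sketch below sets only the fields marked as required. The block IDs (batch-parallel, do-work) are hypothetical. The schema does not list count as required, but a count-type block presumably needs it, so it is included here as an assumption.

```yaml
# Hypothetical minimal parallel block: only required fields, plus count.
batch-parallel:
  type: parallel              # must be the literal value "parallel"
  name: "Batch Parallel"
  inputs:
    parallelType: count       # either "count" or "collection"
    count: 4                  # schema allows 1 to 100
  connections:
    parallel:
      start: do-work          # hypothetical child block ID; "end" is optional
```

Because maxConcurrency is omitted, the schema's default of 10 would apply.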

Connection Configuration

Parallel blocks use a special connection format with a parallel section:

connections:
  parallel:
    start: <string>                     # Target block ID to execute inside each parallel instance
    end: <string>                       # Target block ID after all instances complete (optional)
  error: <string>                       # Target block ID for error handling (optional)

Child Block Configuration

Blocks inside a parallel block must have their parentId set to the ID of the parallel block:

parallel-1:
  type: parallel
  name: "Process Items"
  inputs:
    parallelType: collection
    collection: <start.items>
  connections:
    parallel:
      start: process-item
      end: aggregate-results

# Child block inside the parallel
process-item:
  type: agent
  name: "Process Item"
  parentId: parallel-1                  # References the parallel block
  inputs:
    systemPrompt: "Process this item"
    userPrompt: <parallel.currentItem>
    model: gpt-4o
    apiKey: '{{OPENAI_API_KEY}}'

Examples

Count-Based Parallel Processing

worker-parallel:
  type: parallel
  name: "Worker Parallel"
  inputs:
    parallelType: count
    count: 5
    maxConcurrency: 3
  connections:
    parallel:
      start: worker-task
      end: collect-worker-results

worker-task:
  type: api
  name: "Worker Task"
  parentId: worker-parallel
  inputs:
    url: "https://api.worker.com/process"
    method: POST
    headers:
      - key: "Authorization"
        value: "Bearer {{WORKER_API_KEY}}"
    body: |
      {
        "instanceId": <parallel.index>,
        "timestamp": "{{new Date().toISOString()}}"
      }
  connections:
    success: worker-complete

Collection-Based Parallel Processing

api-parallel:
  type: parallel
  name: "API Parallel"
  inputs:
    parallelType: collection
    collection: <start.apiEndpoints>
    maxConcurrency: 10
  connections:
    parallel:
      start: call-api
      end: merge-api-results

call-api:
  type: api
  name: "Call API"
  parentId: api-parallel
  inputs:
    url: <parallel.currentItem.endpoint>
    method: <parallel.currentItem.method>
    headers:
      - key: "Authorization"
        value: "Bearer {{API_TOKEN}}"
  connections:
    success: api-complete

Complex Parallel Processing Pipeline

data-processing-parallel:
  type: parallel
  name: "Data Processing Parallel"
  inputs:
    parallelType: collection
    collection: <data-loader.records>
    maxConcurrency: 8
  connections:
    parallel:
      start: validate-data
      end: final-aggregation
    error: parallel-error-handler

validate-data:
  type: function
  name: "Validate Data"
  parentId: data-processing-parallel
  inputs:
    code: |
      const record = <parallel.currentItem>;
      const index = <parallel.index>;
      
      // Validate record structure
      if (!record.id || !record.content) {
        throw new Error(`Invalid record at index ${index}`);
      }
      
      return {
        valid: true,
        recordId: record.id,
        validatedAt: new Date().toISOString()
      };
  connections:
    success: process-data
    error: validation-error

process-data:
  type: agent
  name: "Process Data"
  parentId: data-processing-parallel
  inputs:
    systemPrompt: "Process and analyze this data record"
    userPrompt: |
      Record ID: <validatedata.recordId>
      Content: <parallel.currentItem.content>
      Instance: <parallel.index>
    model: gpt-4o
    temperature: 0.3
    apiKey: '{{OPENAI_API_KEY}}'
  connections:
    success: store-result

store-result:
  type: function
  name: "Store Result"
  parentId: data-processing-parallel
  inputs:
    code: |
      const processed = <processdata.content>;
      const recordId = <validatedata.recordId>;
      
      return {
        recordId,
        processed,
        completedAt: new Date().toISOString(),
        instanceIndex: <parallel.index>
      };

Concurrent AI Analysis

multi-model-parallel:
  type: parallel
  name: "Multi-Model Analysis"
  inputs:
    parallelType: collection
    collection: |
      [
        {"model": "gpt-4o", "focus": "technical accuracy"},
        {"model": "claude-3-5-sonnet-20241022", "focus": "creative quality"},
        {"model": "gemini-2.0-flash-exp", "focus": "factual verification"}
      ]
    maxConcurrency: 3
  connections:
    parallel:
      start: analyze-content
      end: combine-analyses

analyze-content:
  type: agent
  name: "Analyze Content"
  parentId: multi-model-parallel
  inputs:
    systemPrompt: |
      You are analyzing content with a focus on <parallel.currentItem.focus>.
      Provide detailed analysis from this perspective.
    userPrompt: |
      Content to analyze: <start.content>
      Analysis focus: <parallel.currentItem.focus>
    model: <parallel.currentItem.model>
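    # NOTE: an OpenAI key only suits the gpt-4o item; the other models likely need their own provider keys
    apiKey: '{{OPENAI_API_KEY}}'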
  connections:
    success: analysis-complete

Parallel Variables

Inside the child blocks of a parallel block, these special variables are available:

# Available in all child blocks of the parallel
<parallel.index>                # Instance number (0-based)
<parallel.currentItem>          # Item for this instance (collection type)
<parallel.items>                # Full collection (collection type)
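
To show the three variables working together, here is a minimal sketch of a function child block. The block IDs (label-item, item-parallel) are hypothetical, and it assumes that <parallel.items> resolves to an array inside function code in the same way <parallel.currentItem> resolves to a single item.

```yaml
# Hypothetical child block; assumes a parent parallel block with ID
# "item-parallel" configured with parallelType: collection.
label-item:
  type: function
  name: "Label Item"
  parentId: item-parallel
  inputs:
    code: |
      const items = <parallel.items>;        // full collection
      const item = <parallel.currentItem>;   // item for this instance
      const index = <parallel.index>;        // 0-based instance number

      return {
        item,
        position: `${index + 1} of ${items.length}`
      };
```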

Output References

After a parallel block completes, you can reference its aggregated results:

# In blocks after the parallel
final-processor:
  inputs:
    all-results: <parallel-name.results>  # Array of all instance results
    total-count: <parallel-name.count>    # Number of instances completed
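
A block wired to connections.parallel.end could consume these references roughly as sketched below. The block ID aggregate-results is hypothetical, and <parallel-name> remains the placeholder used above; substitute the reference for your own parallel block. Omitting parentId is an assumption based on the block running after the parallel completes rather than inside it.

```yaml
# Hypothetical block reached through connections.parallel.end.
aggregate-results:
  type: function
  name: "Aggregate Results"
  inputs:
    code: |
      const results = <parallel-name.results>;  // array of all instance results
      const total = <parallel-name.count>;      // number of instances completed

      return {
        total,
        collected: results.length
      };
```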

Best Practices

  • Set maxConcurrency low enough to avoid overwhelming downstream APIs (the schema allows 1 to 50, with a default of 10)
  • Make sure the work in each instance is independent of the other instances
  • Include error handling so a failed instance does not leave the parallel run unhandled
  • Test with small collections first
  • Monitor rate limits for external APIs
  • Use the collection type to distribute work across items, and the count type for a fixed number of instances
  • Consider memory usage with large collections
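
Several of these practices can be combined in a single configuration. The following is a sketch only: every block ID and the <start.sampleItems> field are hypothetical, and the values are chosen for illustration rather than recommended as defaults.

```yaml
# Hypothetical block IDs and input field; illustrates the practices above.
careful-parallel:
  type: parallel
  name: "Careful Parallel"
  inputs:
    parallelType: collection
    collection: <start.sampleItems>   # start with a small test collection
    maxConcurrency: 2                 # conservative, to respect API rate limits
  connections:
    parallel:
      start: handle-item
      end: summarize
    error: careful-parallel-error     # route failures to a handler block
```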