Parallel Block YAML Schema
YAML configuration reference for parallel blocks
Schema Definition
type: object
required:
  - type
  - name
  - inputs
  - connections
properties:
  type:
    type: string
    enum: [parallel]
    description: Block type identifier
  name:
    type: string
    description: Display name for this parallel block
  inputs:
    type: object
    required:
      - parallelType
    properties:
      parallelType:
        type: string
        enum: [count, collection]
        description: Type of parallel execution
      count:
        type: number
        description: Number of parallel instances (for 'count' type)
        minimum: 1
        maximum: 100
      collection:
        type: string
        description: Collection to distribute across instances (for 'collection' type)
      maxConcurrency:
        type: number
        description: Maximum concurrent executions
        default: 10
        minimum: 1
        maximum: 50
  connections:
    type: object
    required:
      - parallel
    properties:
      parallel:
        type: object
        required:
          - start
        properties:
          start:
            type: string
            description: Target block ID to execute inside each parallel instance
          end:
            type: string
            description: Target block ID after all parallel instances complete (optional)
          error:
            type: string
            description: Target block ID for error handling
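For orientation, a minimal configuration that satisfies the required fields might look like the sketch below; the block IDs simple-parallel and do-work are hypothetical placeholders, not part of the schema.
simple-parallel:
  type: parallel
  name: "Simple Parallel"
  inputs:
    parallelType: count
    count: 3          # run three instances of the child block
  connections:
    parallel:
      start: do-work  # hypothetical child block executed in each instance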
Connection Configuration
Parallel blocks use a special connection format with a parallel section:
connections:
  parallel:
    start: <string>   # Target block ID to execute inside each parallel instance
    end: <string>     # Target block ID after all instances complete (optional)
    error: <string>   # Target block ID for error handling (optional)
Child Block Configuration
Blocks inside a parallel block must have their parentId set to the ID of the parallel block:
parallel-1:
  type: parallel
  name: "Process Items"
  inputs:
    parallelType: collection
    collection: <start.items>
  connections:
    parallel:
      start: process-item
      end: aggregate-results

# Child block inside the parallel
process-item:
  type: agent
  name: "Process Item"
  parentId: parallel-1  # References the parallel block
  inputs:
    systemPrompt: "Process this item"
    userPrompt: <parallel.currentItem>
    model: gpt-4o
    apiKey: '{{OPENAI_API_KEY}}'
Examples
Count-Based Parallel Processing
worker-parallel:
  type: parallel
  name: "Worker Parallel"
  inputs:
    parallelType: count
    count: 5
    maxConcurrency: 3
  connections:
    parallel:
      start: worker-task
      end: collect-worker-results

worker-task:
  type: api
  name: "Worker Task"
  parentId: worker-parallel
  inputs:
    url: "https://api.worker.com/process"
    method: POST
    headers:
      - key: "Authorization"
        value: "Bearer {{WORKER_API_KEY}}"
    body: |
      {
        "instanceId": <parallel.index>,
        "timestamp": "{{new Date().toISOString()}}"
      }
  connections:
    success: worker-complete
Collection-Based Parallel Processing
api-parallel:
  type: parallel
  name: "API Parallel"
  inputs:
    parallelType: collection
    collection: <start.apiEndpoints>
    maxConcurrency: 10
  connections:
    parallel:
      start: call-api
      end: merge-api-results

call-api:
  type: api
  name: "Call API"
  parentId: api-parallel
  inputs:
    url: <parallel.currentItem.endpoint>
    method: <parallel.currentItem.method>
    headers:
      - key: "Authorization"
        value: "Bearer {{API_TOKEN}}"
  connections:
    success: api-complete
Complex Parallel Processing Pipeline
data-processing-parallel:
  type: parallel
  name: "Data Processing Parallel"
  inputs:
    parallelType: collection
    collection: <data-loader.records>
    maxConcurrency: 8
  connections:
    parallel:
      start: validate-data
      end: final-aggregation
      error: parallel-error-handler

validate-data:
  type: function
  name: "Validate Data"
  parentId: data-processing-parallel
  inputs:
    code: |
      const record = <parallel.currentItem>;
      const index = <parallel.index>;

      // Validate record structure
      if (!record.id || !record.content) {
        throw new Error(`Invalid record at index ${index}`);
      }

      return {
        valid: true,
        recordId: record.id,
        validatedAt: new Date().toISOString()
      };
  connections:
    success: process-data
    error: validation-error

process-data:
  type: agent
  name: "Process Data"
  parentId: data-processing-parallel
  inputs:
    systemPrompt: "Process and analyze this data record"
    userPrompt: |
      Record ID: <validatedata.recordId>
      Content: <parallel.currentItem.content>
      Instance: <parallel.index>
    model: gpt-4o
    temperature: 0.3
    apiKey: '{{OPENAI_API_KEY}}'
  connections:
    success: store-result

store-result:
  type: function
  name: "Store Result"
  parentId: data-processing-parallel
  inputs:
    code: |
      const processed = <processdata.content>;
      const recordId = <validatedata.recordId>;

      return {
        recordId,
        processed,
        completedAt: new Date().toISOString(),
        instanceIndex: <parallel.index>
      };
Concurrent AI Analysis
multi-model-parallel:
  type: parallel
  name: "Multi-Model Analysis"
  inputs:
    parallelType: collection
    collection: |
      [
        {"model": "gpt-4o", "focus": "technical accuracy"},
        {"model": "claude-3-5-sonnet-20241022", "focus": "creative quality"},
        {"model": "gemini-2.0-flash-exp", "focus": "factual verification"}
      ]
    maxConcurrency: 3
  connections:
    parallel:
      start: analyze-content
      end: combine-analyses

analyze-content:
  type: agent
  name: "Analyze Content"
  parentId: multi-model-parallel
  inputs:
    systemPrompt: |
      You are analyzing content with a focus on <parallel.currentItem.focus>.
      Provide detailed analysis from this perspective.
    userPrompt: |
      Content to analyze: <start.content>
      Analysis focus: <parallel.currentItem.focus>
    model: <parallel.currentItem.model>
    apiKey: '{{OPENAI_API_KEY}}'
  connections:
    success: analysis-complete
Parallel Variables
Inside parallel child blocks, these special variables are available:
# Available in all child blocks of the parallel
<parallel.index> # Instance number (0-based)
<parallel.currentItem> # Item for this instance (collection type)
<parallel.items> # Full collection (collection type)
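As an illustrative sketch, a child function block could combine all three variables into a single status object; the block ID report-progress and the parent ID batch-parallel are hypothetical, and the references are assumed to be substituted with values before the code runs, as in the examples above.
report-progress:
  type: function
  name: "Report Progress"
  parentId: batch-parallel  # hypothetical parent parallel block
  inputs:
    code: |
      // Combine the per-instance variables into one status object
      return {
        position: <parallel.index> + 1,       // 1-based position for display
        item: <parallel.currentItem>,         // item assigned to this instance
        totalItems: <parallel.items>.length   // size of the full collection
      };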
Output References
After a parallel block completes, you can reference its aggregated results:
# In blocks after the parallel
final-processor:
  inputs:
    all-results: <parallel-name.results>  # Array of all instance results
    total-count: <parallel-name.count>    # Number of instances completed
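For example, the final-aggregation block from the pipeline above could consume these outputs as sketched here; the <data-processing-parallel.…> references are an assumption and should be adjusted to whatever reference name your workflow resolves for the parallel block.
final-aggregation:
  type: function
  name: "Final Aggregation"
  inputs:
    code: |
      // Gather every instance result produced by the parallel block
      // (reference names below are assumed, not prescribed by the schema)
      const results = <data-processing-parallel.results>;
      const count = <data-processing-parallel.count>;

      return {
        processedRecords: count,
        results
      };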
Best Practices
- Use an appropriate maxConcurrency to avoid overwhelming APIs
- Ensure operations are independent and do not depend on one another
- Include error handling for robust parallel execution
- Test with small collections first
- Monitor rate limits for external APIs
- Use the collection type to distribute work, the count type for a fixed number of instances
- Consider memory usage with large collections