Summary
The workflow engine's fan-out step accepts a max_concurrency field (added with the engine in #2158 / #2142) but the engine never honors it. FanOutStep.execute records the value in the step output, and WorkflowEngine._execute_steps then runs the per-item templates in a plain sequential for loop (src/specify_cli/workflows/engine.py, the if step_type == "fan-out": branch):
    for item_idx, item_val in enumerate(result.output["items"]):
        context.item = item_val
        ...
        self._execute_steps([item_step], context, state, registry, step_offset=-1)
So a workflow that sets max_concurrency: 4 still runs its items one at a time. A fan-out item typically dispatches a command step that shells out to an agent CLI. The calling thread blocks on that subprocess and releases the GIL while it waits, so there is real wall-clock parallelism available that the engine currently leaves on the table.
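To illustrate the parallelism claim, here is a minimal, self-contained sketch that is not taken from the engine. It assumes a child Python process that sleeps for 0.3 s as a stand-in for an agent CLI call; `run_once`, `timed_sequential`, and `timed_threaded` are hypothetical names used only for this comparison.

```python
import subprocess
import sys
import time
from concurrent.futures import ThreadPoolExecutor

# Assumption: a child process that sleeps stands in for a blocking agent CLI call.
SLEEP_CMD = [sys.executable, "-c", "import time; time.sleep(0.3)"]


def run_once(_=None) -> int:
    """Block on one child process and return its exit code."""
    return subprocess.run(SLEEP_CMD, check=True).returncode


def timed_sequential(n: int) -> float:
    """Run n child processes one after another; return elapsed seconds."""
    start = time.perf_counter()
    for _ in range(n):
        run_once()
    return time.perf_counter() - start


def timed_threaded(n: int, workers: int) -> float:
    """Run n child processes on a bounded thread pool; return elapsed seconds."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # Each worker thread waits on its own subprocess without holding the GIL.
        list(pool.map(run_once, range(n)))
    return time.perf_counter() - start
```

With four items and four workers, the threaded run should take roughly the time of one child process plus startup overhead, while the sequential run pays for all four in turn.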
Proposed direction
Honor max_concurrency by dispatching items on a bounded concurrent.futures.ThreadPoolExecutor when it is > 1, and taking the existing sequential path when it is <= 1 (the default), so existing workflows are unchanged. Keep it opt-in, default-sequential, and order-preserving:
- results assembled in item order regardless of completion order (fan-in reads them positionally, so the join is unaffected),
- the parentId:templateId:index per-item id grammar unchanged,
- halt-on-first-failure preserved, and
- state.json persistence made concurrency-safe (it currently serializes the live step_results dict via a plain open("w"), which a concurrent fan-out can both interleave on disk and mutate mid-serialization).
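The constraints in the list above can be sketched together as follows. This is a simplified illustration under stated assumptions, not the code in the PR: `run_fan_out`, `run_item`, and `StateStore` are hypothetical names, and the real engine's context, registry, and state objects are omitted.

```python
import json
import os
import tempfile
import threading
from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor, wait
from typing import Any, Callable


class StateStore:
    """Hypothetical stand-in for the engine's state.json persistence."""

    def __init__(self, path: str) -> None:
        self.path = path
        self.step_results: dict[str, Any] = {}
        self._lock = threading.Lock()

    def record(self, step_id: str, result: Any) -> None:
        # Mutate and serialize under one lock so no thread dumps a dict
        # that another thread is changing at the same time.
        with self._lock:
            self.step_results[step_id] = result
            payload = json.dumps(self.step_results, indent=2)
            # Write to a temp file, then swap it into place atomically so a
            # reader never sees a half-written state.json.
            directory = os.path.dirname(os.path.abspath(self.path))
            fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
            with os.fdopen(fd, "w") as handle:
                handle.write(payload)
            os.replace(tmp_path, self.path)


def run_fan_out(
    items: list,
    run_item: Callable[[int, Any], Any],
    max_concurrency: int = 1,
) -> list:
    """Call run_item(index, item) per item; return results in item order."""
    if max_concurrency <= 1:
        # Default path: plain sequential loop, first exception halts the run.
        return [run_item(i, item) for i, item in enumerate(items)]

    with ThreadPoolExecutor(max_workers=max_concurrency) as pool:
        futures = [pool.submit(run_item, i, item) for i, item in enumerate(items)]
        # Return as soon as any item raises, or when all items are done.
        _, not_done = wait(futures, return_when=FIRST_EXCEPTION)
        for pending in not_done:
            # Only items that have not started can be cancelled; items that
            # are already running are allowed to finish.
            pending.cancel()
        # Reading futures by index keeps results positional and re-raises
        # the lowest-index failure among the items that ran.
        return [future.result() for future in futures]
```

One design choice worth noting: because the pool starts items in submission order, any cancelled item has a higher index than every item that ran, so reading results by index reaches the failing item before it reaches a cancelled one.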
I have a small PR ready.
AI disclosure (per CONTRIBUTING.md)
This issue and the accompanying PR were authored with AI assistance (Claude Code), from a fan-out concurrency investigation; the change was verified by running the repo's test suite and ruff locally.
cc @mnriem — would appreciate your thoughts on the approach when you have a moment.