feat(workflows): honor max_concurrency in fan-out via a bounded thread pool #3224
doquanghuy wants to merge 3 commits into
Pull request overview
Implements actual parallel execution for workflow fan-out by honoring max_concurrency, while making RunState persistence safe under concurrent execution (locking + atomic JSON writes). This fits into the workflow engine’s execution model by enabling opt-in bounded parallelism for I/O-bound fan-out items without changing default sequential behavior.
Changes:
- Add `WorkflowEngine._run_fan_out()` to execute fan-out items sequentially or via a bounded `ThreadPoolExecutor`, depending on `max_concurrency`.
- Make `RunState.save()` concurrency-safe via a per-run lock and atomic temp-file writes; route step result recording through a locked helper.
- Add workflow tests covering fan-out concurrency behavior, ordering, coercion, and error/exception handling.
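As a rough illustration of the first change, here is a minimal sketch of bounded fan-out that keeps results in item order. It is not the PR's code: `run_fan_out` and `work` are hypothetical names, and the sketch submits every item up front and ignores halting and run state, which the real `_run_fan_out` has to handle.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable


def run_fan_out(
    items: list[Any],
    work: Callable[[Any], Any],
    max_concurrency: int = 1,
) -> list[Any]:
    """Hypothetical helper: run work(item) per item, results in item order."""
    if max_concurrency <= 1:
        # Sequential path: behaves exactly like a plain loop.
        return [work(item) for item in items]

    slots: list[Any] = [None] * len(items)  # one preallocated slot per item
    with ThreadPoolExecutor(max_workers=max_concurrency) as pool:
        futures = [pool.submit(work, item) for item in items]
        # Collect by index, so completion order never affects output order.
        for idx, fut in enumerate(futures):
            slots[idx] = fut.result()
    return slots
```

Because each result is written to the slot for its own index, a consumer that reads results positionally sees the same list on both paths.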
Show a summary per file
| File | Description |
|---|---|
| `src/specify_cli/workflows/engine.py` | Adds bounded concurrent fan-out execution and hardens run-state persistence for concurrency. |
| `tests/test_workflows.py` | Adds a new test suite validating fan-out concurrency semantics and edge cases. |
Review details
- Files reviewed: 2/2 changed files
- Comments generated: 5
- Review effort level: Low
    context.steps[step_id]["output"] = result.output
    state.step_results[step_id]["output"] = result.output

    futures: dict[Any, int] = {}
    for idx in range(n):
        if state.status in halting:
            break
        futures[pool.submit(run_isolated, idx)] = idx
    # Collect in submission (item) order: each .result() blocks on that
    # item, so slots fill 0,1,2,… and the collected set is a prefix.
    for fut in list(futures):
        idx = futures[fut]
        try:
            slots[idx] = fut.result()
        except Exception as exc:
            # A genuine exception escaping a step (not a normal step
            # FAILED, which sets state.status) must not be masked: cancel
            # outstanding work and re-raise so the engine marks the run
            # failed instead of reporting a vacuous completion.
            first_exc = exc
            for other in futures:
                other.cancel()
            break
        ran = idx + 1
        if state.status in halting:
            for other in futures:
                other.cancel()
            break

    in item order (never completion order), and the first item to reach a halting
    run status (PAUSED/FAILED/ABORTED) stops further dispatch; on halt the
    contiguous prefix of items that ran is returned. A non-int / 0 / negative /
    ``None`` ``max_concurrency`` coerces to 1 (sequential).
    """

    t0 = time.time()
    self._run(tmp_path, list(range(4)), 4, on_item)
    assert time.time() - t0 < 0.6  # serialized would be >= 1.2s

    fan_out_results = self._run_fan_out(
        items, template, step_id, context, state, registry,
        result.output.get("max_concurrency", 1),
    )
Thanks @copilot, all five points addressed at root cause (latest: d4479ed):
Coverage added: concurrent halt-includes-halting-item, continue_on_error-does-not-truncate, and unknown-template-type-matches-sequential. Full workflows suite green (328 passed). @mnriem, could you give this a review when you get a chance? 🙏
…ut, faithful halt

Address the reviewer feedback on the bounded fan-out concurrency:
- Sliding submission window: keep at most `workers` items in flight and stop launching new items once the run is halting, instead of submitting all items up front (which let the pool keep starting queued work after a halt).
- Faithful halt prefix: attribute a halt to the specific item whose own recorded result halted the run (replaying the sequential break condition, honoring continue_on_error/aborted), not the shared run status a later concurrent item may have flipped. The returned prefix now includes the actual halting item, matching the sequential path. An item that fails before recording a result (e.g. an unknown step type) is attributed too, since every item runs the same template.
- Lock the parent fan-out output mutation: route the post-fan-out step_results[...]['output'] update through a new RunState.set_step_output() under the run lock, so it cannot race a concurrent save().
- Docstring: describe int() coercion accurately (numeric strings / floats are honored; only non-coercible or <= 1 runs sequentially).

Tests: add concurrent halt-includes-halting-item, continue_on_error-does-not-truncate, and unknown-template-type-matches-sequential coverage; make the timing test use a monotonic clock with a looser threshold to avoid CI flakiness.
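The sliding submission window from this commit message might look roughly like the sketch below. It is an illustration under assumptions, not the engine's implementation: `run_windowed`, `work`, and `should_halt` are hypothetical names, and halting is reduced to a simple callback rather than the per-item attribution the commit describes.

```python
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from typing import Any, Callable


def run_windowed(
    items: list[Any],
    work: Callable[[Any], Any],
    workers: int,
    should_halt: Callable[[], bool],
) -> list[Any]:
    """Hypothetical helper: keep at most `workers` items in flight."""
    results: dict[int, Any] = {}
    in_flight: dict[Future, int] = {}
    next_submit = 0
    with ThreadPoolExecutor(max_workers=workers) as pool:
        while True:
            # Top up the window, but launch nothing new once halting.
            while (
                next_submit < len(items)
                and len(in_flight) < workers
                and not should_halt()
            ):
                in_flight[pool.submit(work, items[next_submit])] = next_submit
                next_submit += 1
            if not in_flight:
                break
            done, _ = wait(list(in_flight), return_when=FIRST_COMPLETED)
            for fut in done:
                results[in_flight.pop(fut)] = fut.result()
    # Items are submitted in index order, so those that ran form a prefix.
    return [results[i] for i in range(next_submit)]
```

Since nothing is queued beyond the window, a halt leaves at most `workers` items to drain instead of the whole remaining list.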
38c7798 to d4479ed
    # Guards step_results mutation and save() so a concurrent fan-out cannot
    # mutate the dict while save() is serializing it (which would raise
    # "dictionary changed size during iteration").
    self._lock = threading.Lock()
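To make the purpose of that lock concrete, here is a small sketch of the pattern: mutation and serialization share one lock, so a dump never iterates a dict that another thread is resizing. `MiniRunState` and `dumps` are hypothetical stand-ins; only the name `record_step_result` comes from the PR description, and its body here is an assumption.

```python
import json
import threading
from typing import Any


class MiniRunState:
    """Hypothetical stand-in for RunState; only the locking pattern is shown."""

    def __init__(self) -> None:
        self.step_results: dict[str, Any] = {}
        self._lock = threading.Lock()

    def record_step_result(self, step_id: str, result: Any) -> None:
        # Writers take the same lock as the serializer.
        with self._lock:
            self.step_results[step_id] = result

    def dumps(self) -> str:
        # Serialize under the lock so no writer can resize the dict mid-dump.
        with self._lock:
            return json.dumps(self.step_results, sort_keys=True)
```

In a sequential run the lock is never contended, so the extra cost is a single acquire and release per call.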
    self._execute_steps(
        [item_step], item_ctx, state, registry, step_offset=-1,
    )
    return context.steps.get(item_step["id"], {}).get("output", {})

    try:
        workers = max(1, int(max_concurrency))
    except (TypeError, ValueError):
        workers = 1
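The coercion in the snippet above can be exercised in isolation. The wrapper name `coerce_workers` is hypothetical; the body mirrors the quoted `try`/`except`.

```python
def coerce_workers(max_concurrency: object) -> int:
    """Hypothetical wrapper around the int() coercion quoted above."""
    try:
        return max(1, int(max_concurrency))  # type: ignore[call-overload]
    except (TypeError, ValueError):
        # None raises TypeError; a non-numeric string raises ValueError.
        return 1


for value in ("4", 4.0, 0, -3, None, "many"):
    print(repr(value), "->", coerce_workers(value))
```

Numeric strings and floats are honored, while zero, negatives, `None`, and non-numeric strings fall back to one worker. One edge this sketch shares with the quoted snippet: `int(float("inf"))` raises `OverflowError`, which neither catches.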
    def test_concurrency_is_real(self, tmp_path):
        import time

        n = 4
        delay = 0.2

        def on_item(item):
            time.sleep(delay)
            return None

        # Monotonic clock (immune to wall-clock adjustments). All n items run
        # concurrently, so elapsed is ~one delay; a generous bound well under the
        # serialized baseline keeps a clear gap while tolerating slow/loaded CI.
        serialized = n * delay
        t0 = time.monotonic()
        self._run(tmp_path, list(range(n)), n, on_item)
        elapsed = time.monotonic() - t0
        assert elapsed < serialized * 0.6  # serialized would be >= n*delay

    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures: dict[int, Future] = {}
        next_submit = 0
        for idx in range(n):
- append_log: serialize the log_entries append + log.jsonl write under a dedicated RunState._log_lock so concurrent fan-out workers can't interleave or corrupt log lines (kept separate from the state lock; never nested).
- _run_fan_out.run_item: read the item output back through the item_ctx it executed against rather than the outer context closure; this is clearer, and robust if StepContext ever stops sharing the steps dict by reference.
- StepBase: document the thread-safety contract: STEP_REGISTRY holds one shared instance per type, so concurrent fan-out invokes execute() on the same object; implementations must be stateless/thread-safe (the built-ins already are).
- test_concurrency_is_real: prove parallelism deterministically with a threading.Barrier (sequential execution can't clear it) instead of a wall-clock timing assertion.
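The barrier-based check in the last bullet can be sketched as follows. This is an assumed shape rather than the actual test: `ran_in_parallel` is a hypothetical helper, and the timeout exists only so the sequential case fails fast instead of hanging.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def ran_in_parallel(n: int, workers: int, timeout: float = 2.0) -> bool:
    """Hypothetical check: True only if n tasks sit at the barrier at once."""
    barrier = threading.Barrier(n)

    def task(_: int) -> bool:
        try:
            barrier.wait(timeout=timeout)  # clears only when all n arrive
            return True
        except threading.BrokenBarrierError:
            # Raised on timeout, and for any later arrival at a broken barrier.
            return False

    with ThreadPoolExecutor(max_workers=workers) as pool:
        return all(pool.map(task, range(n)))
```

A barrier of `n` parties cannot be cleared by fewer than `n` simultaneous waiters, so the result does not depend on how fast the machine is, unlike a wall-clock threshold.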
Thanks @copilot, second-pass comments addressed at root cause in ce352a3:
Full workflows suite green (328 passed). @mnriem, I would appreciate your review when you have a chance 🙏
Description
Closes #3222.
The `fan-out` step has carried a `max_concurrency` field since the workflow engine landed (#2158), but the engine ignored it: `_execute_steps` ran fan-out items in a sequential `for` loop and `max_concurrency` was only recorded in the step output. This honors it.

A new `WorkflowEngine._run_fan_out` runs items on a bounded `ThreadPoolExecutor` when `max_concurrency > 1`, and takes the existing sequential path when `<= 1` (the default), so existing workflows are byte-for-byte unchanged. Results are always assembled in item order (a preallocated slot per item, collected in submission order), never completion order, so fan-in, which reads them positionally, is unaffected. The `parentId:templateId:index` id grammar and halt-on-first-failure are preserved; `max_concurrency` is coerced with `int()`: a value that cannot be coerced (`None`, a non-numeric string) or that coerces to `<= 1` runs sequentially, while a numeric string like `"4"` or a float like `4.0` is honored.

Fan-out items are I/O-bound: each typically dispatches a `command` step that spawns a blocking agent-CLI subprocess, which releases the GIL, so a thread pool yields real wall-clock parallelism.

Two concurrency care points:
- Each item runs against its own `dataclasses.replace(context, item=…)` copy, so `context.item` is never clobbered across threads; the shared `steps` dict is written only on the disjoint `parent:template:index` key.
- `RunState.save()` previously serialized the live `step_results` dict via a plain `open("w")`, so a concurrent fan-out could both interleave on-disk writes and mutate the dict mid-`json.dump` (`dictionary changed size during iteration`). `save()` is now held under a per-run lock and written atomically (temp file + `os.replace`), and per-item result recording goes through a small `record_step_result` helper under that lock. Sequential runs see only an uncontended lock.

A genuine exception escaping an item (as opposed to a normal step `FAILED`, which sets the run status) cancels outstanding work and re-raises, so the run is marked failed rather than reporting a vacuous completion.

Testing
- `.venv/bin/python -m pytest tests/test_workflows.py`: 325 passed, including 15 new `TestFanOutConcurrency` cases: K≤1 sequential parity, item-order under forced reverse completion (event chain, no sleeps), real parallelism, `max_concurrency` coercion (0 / negative / None / non-int / string), per-thread item isolation, halt-on-failure prefix, and first-exception cancel + re-raise.
- (… `test_timestamp_branches` / git extension) that fail identically on `main`.
- `uvx ruff check src/ tests/test_workflows.py`: clean
- `uv run specify --help`

AI Disclosure
Code, tests, and this description were authored with AI assistance (Claude Code), from a fan-out concurrency investigation; everything was verified by running the repo's test suite and ruff locally.
@mnriem, I would appreciate your review when you have a moment. Happy to swap the `save()` atomicity for a narrower lock if you'd prefer a smaller change.
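For reviewers weighing the atomicity option, the temp-file plus `os.replace` write mentioned in the description can be sketched like this. It is a minimal illustration, assuming a JSON-serializable state; `atomic_write_json` is a hypothetical name and not the PR's `save()`.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Any


def atomic_write_json(path: Path, data: Any) -> None:
    """Hypothetical helper: write JSON so readers never see a partial file."""
    # Create the temp file next to the target so os.replace stays on one
    # filesystem.
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            json.dump(data, fh)
        os.replace(tmp_name, path)  # swap the finished file into place
    except BaseException:
        # Remove the temp file if anything failed before the swap.
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise
```

A reader that opens the target path sees either the previous complete file or the new complete file. The write itself does not serialize concurrent callers, which is why the description pairs it with a lock.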