Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions src/specify_cli/workflows/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,40 @@ def _validate_steps(
f"boolean, got {type(coe).__name__}."
)

# Fan-in: every wait_for id must reference a step declared at or before
# this point. An id not yet seen is either a typo (unknown step) or a
# forward reference (the target runs after this fan-in, so its results
# cannot exist yet) — both are wiring errors that previously surfaced as
# a silent empty result + COMPLETED. A step that is declared but only
# conditionally executed (e.g. inside an if/switch branch) is still
# "seen" here, so a legitimately-empty result at runtime stays valid.
if step_type == "fan-in":
wait_for = step_config.get("wait_for")
if isinstance(wait_for, list):
for wid in wait_for:
if not isinstance(wid, str):
# A non-string entry (e.g. YAML `wait_for: [123]`) can
# never match a real step id, so the join is silently
# empty at runtime — surface it as a wiring error.
errors.append(
f"Fan-in step {step_id!r}: 'wait_for' entries must "
f"be step-id strings, got {type(wid).__name__} "
f"({wid!r})."
)
elif wid == step_id:
# The fan-in's own id is already in seen_ids by now, so
# a self-reference would pass the membership check below
# while still producing an empty join at runtime.
errors.append(
f"Fan-in step {step_id!r}: 'wait_for' references "
f"itself; a fan-in cannot wait for its own results."
)
elif wid not in seen_ids:
errors.append(
f"Fan-in step {step_id!r}: 'wait_for' references "
f"unknown or not-yet-declared step id {wid!r}."
)

# Recursively validate nested steps
for nested_key in ("then", "else", "steps"):
nested = step_config.get(nested_key)
Expand Down
122 changes: 122 additions & 0 deletions tests/test_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -1971,6 +1971,128 @@ def test_validate_wait_for_not_list(self):
assert any("non-empty list" in e for e in errors)


class TestFanInWaitForValidation:
"""fan-in wait_for must reference a declared step (no silent empty join)."""

@staticmethod
def _errors(yaml_text):
from specify_cli.workflows.engine import (
WorkflowDefinition,
validate_workflow,
)

return validate_workflow(WorkflowDefinition.from_string(yaml_text))

def test_unknown_wait_for_id_is_rejected(self):
errors = self._errors("""
workflow:
id: wf
name: wf
version: "1.0.0"
steps:
- id: collect
type: fan-in
wait_for: [ghost]
""")
assert any(
"unknown or not-yet-declared step id 'ghost'" in e for e in errors
)

def test_wait_for_declared_earlier_step_passes(self):
errors = self._errors("""
workflow:
id: wf
name: wf
version: "1.0.0"
steps:
- id: produce
type: command
command: speckit.implement
- id: collect
type: fan-in
wait_for: [produce]
""")
assert not any("wait_for" in e for e in errors)

def test_wait_for_conditionally_declared_step_passes(self):
# A step declared inside an if-branch may be skipped at runtime, but it is
# still "declared", so referencing it must validate — a legitimately-empty
# runtime join stays valid.
errors = self._errors("""
workflow:
id: wf
name: wf
version: "1.0.0"
steps:
- id: maybe
type: if
condition: "{{ inputs.flag }}"
then:
- id: branch_task
type: command
command: speckit.implement
- id: collect
type: fan-in
wait_for: [branch_task]
""")
assert not any("wait_for" in e for e in errors)

def test_forward_reference_is_rejected(self):
# wait_for points at a step declared AFTER the fan-in; its results cannot
# exist when the fan-in runs, so it is flagged.
errors = self._errors("""
workflow:
id: wf
name: wf
version: "1.0.0"
steps:
- id: collect
type: fan-in
wait_for: [later]
- id: later
type: command
command: speckit.implement
""")
assert any(
"unknown or not-yet-declared step id 'later'" in e for e in errors
)

def test_self_reference_is_rejected(self):
# A fan-in's own id is in scope by the time it is validated, so a
# self-reference slips past the membership check while still producing
# an empty join at runtime.
errors = self._errors("""
workflow:
id: wf
name: wf
version: "1.0.0"
steps:
- id: collect
type: fan-in
wait_for: [collect]
""")
assert any(
"references itself" in e and "collect" in e for e in errors
)

def test_non_string_wait_for_entry_is_rejected(self):
# A non-string entry (e.g. YAML `wait_for: [123]`) can never match a
# real step id, so it must be flagged rather than silently ignored.
errors = self._errors("""
workflow:
id: wf
name: wf
version: "1.0.0"
steps:
- id: collect
type: fan-in
wait_for: [123]
""")
assert any(
"must be step-id strings" in e and "int" in e for e in errors
)


# ===== Workflow Definition Tests =====

class TestWorkflowDefinition:
Expand Down