address review: namespaced execution with per-step copy-back

Revert to namespaced step IDs for execution (preserving unique
log entries and state keys per iteration) but copy each step's
result back to the unprefixed key immediately after it completes.

This preserves backward compatibility (same namespaced key format,
same log IDs) while fixing both the condition evaluation bug and
inter-step references within multi-step loop bodies.
This commit is contained in:
Manfred Riem
2026-05-21 10:11:19 -05:00
parent 0df9182578
commit 9d0a2226bb
2 changed files with 27 additions and 28 deletions

View File

@@ -672,30 +672,30 @@ class WorkflowEngine:
for _loop_iter in range(max_iters - 1):
if not evaluate_condition(condition, context):
break
# Snapshot current results under namespaced
# keys for per-iteration history before they
# are overwritten by the next iteration.
# Namespace nested step IDs per iteration
# so logs and state keys are unique.
# Execute one step at a time and alias each
# result back to the unprefixed key so that
# later steps in the same body and the loop
# condition see the latest values.
for ns in result.next_steps:
orig = ns.get("id")
if orig and orig in context.steps:
ns_key = f"{step_id}:{orig}:{_loop_iter}"
context.steps[ns_key] = context.steps[orig]
state.step_results[ns_key] = context.steps[orig]
# Execute body with original step IDs so
# results land at the unprefixed keys. Both
# inter-step references within the body and
# the loop condition naturally see the latest
# values without a copy-back.
self._execute_steps(
result.next_steps, context, state, registry,
step_offset=-1,
)
if state.status in (
RunStatus.PAUSED,
RunStatus.FAILED,
RunStatus.ABORTED,
):
return
ns_copy = dict(ns)
orig = ns_copy.get("id")
if orig:
ns_copy["id"] = f"{step_id}:{orig}:{_loop_iter + 1}"
self._execute_steps(
[ns_copy], context, state, registry,
step_offset=-1,
)
if orig and ns_copy["id"] in context.steps:
context.steps[orig] = context.steps[ns_copy["id"]]
state.step_results[orig] = context.steps[ns_copy["id"]]
if state.status in (
RunStatus.PAUSED,
RunStatus.FAILED,
RunStatus.ABORTED,
):
return
# Fan-out: execute nested step template per item with unique IDs
if step_type == "fan-out":

View File

@@ -1943,9 +1943,8 @@ steps:
assert state.status == RunStatus.COMPLETED
# The unprefixed key should reflect the latest iteration's result.
assert state.step_results["attempt"]["output"]["stdout"] == "done"
# Iteration-0 history preserved under namespaced key.
assert "retry-loop:attempt:0" in state.step_results
assert state.step_results["retry-loop:attempt:0"]["output"]["stdout"] != "done"
# Namespaced iteration-1 result should also exist.
assert "retry-loop:attempt:1" in state.step_results
# Counter should be 2 (iteration 0 + iteration 1), not 5.
assert counter_file.read_text(encoding="utf-8").strip() == "2"
@@ -2041,9 +2040,9 @@ steps:
assert counter_file.read_text(encoding="utf-8").strip() == "3"
# Unprefixed key holds the last iteration's result.
assert state.step_results["tick"]["output"]["stdout"] == "pending"
# Namespaced history keys for previous iterations exist.
assert "retry-loop:tick:0" in state.step_results
# Namespaced keys for loop iterations exist.
assert "retry-loop:tick:1" in state.step_results
assert "retry-loop:tick:2" in state.step_results
def test_do_while_loop_runs_to_max_when_condition_stays_true(self, project_dir):
"""Do-while loop must still run to max_iterations when the condition