From 141119efea470d0baf82b954c708fe19da4a78ec Mon Sep 17 00:00:00 2001 From: Huy Do Date: Thu, 4 Jun 2026 23:11:39 +0700 Subject: [PATCH] feat(workflows): add JSON output for workflow run resume and status (#2814) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(workflows): add --json output to workflow run, resume, and status Adds an opt-in `--json` flag to `workflow run`, `workflow resume`, and `workflow status` that emits a single machine-readable object (run_id, workflow_id, status, current step; status also reports per-step states and a runs list) for automation and external orchestrators. JSON is written via a small `_emit_workflow_json` helper using plain stdout, so Rich markup, highlighting, and line-wrapping can never alter the emitted object. Default human-readable output and exit codes are unchanged when `--json` is omitted. Reference docs updated. Closes #2811. Co-Authored-By: Claude Opus 4.8 (1M context) * fix(workflows): keep --json stdout clean while steps write output Suppressing the banner and the step-start callback was not enough to guarantee a single parseable JSON object on stdout: individual steps still write there while the engine runs. The gate step prints its prompt, and the prompt step runs a CLI subprocess that inherits the process's stdout file descriptor — either can corrupt the JSON stream for interactive runs or integration-backed workflows. Wrap engine.execute()/engine.resume() in a file-descriptor-level redirect (dup2) when --json is set, so both Python-level writes and inherited-fd subprocess output go to stderr while stdout carries only the emitted JSON. Step progress stays visible on stderr. status does not run the engine, so it is unaffected. Tests cover both pollution channels (a Python print and a real subprocess) via fd-level capture, and the inactive no-op path. Docs note the stdout/stderr split. Co-Authored-By: Claude Opus 4.8 (1M context) * docs(workflows): fix stray escape sequence in --json redirect comments The redirect helper's docstring and its test comment wrote ``print``\s, which renders as "print\s" rather than "prints". Replace with plain "prints". Co-Authored-By: Claude Opus 4.8 (1M context) --------- Co-authored-by: Claude Opus 4.8 (1M context) --- docs/reference/workflows.md | 24 ++++++ src/specify_cli/__init__.py | 123 +++++++++++++++++++++++++++-- tests/test_workflows.py | 152 ++++++++++++++++++++++++++++++++++++ 3 files changed, 293 insertions(+), 6 deletions(-) diff --git a/docs/reference/workflows.md b/docs/reference/workflows.md index 7afcf0d55..9e0c6f2e7 100644 --- a/docs/reference/workflows.md +++ b/docs/reference/workflows.md @@ -11,6 +11,7 @@ specify workflow run | Option | Description | | ------------------- | -------------------------------------------------------- | | `-i` / `--input` | Pass input values as `key=value` (repeatable) | +| `--json` | Emit the run outcome as a single JSON object | Runs a workflow from a catalog ID, URL, or local file path. Inputs declared by the workflow can be provided via `--input` or will be prompted interactively. @@ -20,6 +21,24 @@ Example: specify workflow run speckit -i spec="Build a kanban board with drag-and-drop task management" -i scope=full ``` +With `--json`, a single machine-readable object is printed instead of formatted text (the default output is unchanged when the flag is omitted): + +```bash +specify workflow run my-pipeline.yml --json +``` + +```json +{ + "run_id": "662bf791", + "workflow_id": "build-and-review", + "status": "paused", + "current_step_id": "review", + "current_step_index": 0 +} +``` + +`workflow_id` is the `workflow.id` declared inside the YAML, not the file name. The object is printed exactly as shown — pretty-printed with two-space indentation, on plain stdout with no Rich markup — so it always parses. While the workflow runs under `--json`, any progress a step would print (for example a gate prompt, or output from a prompt step's CLI subprocess) is redirected to stderr, so stdout carries only the JSON object. Read the object from stdout; leave stderr attached to the terminal or capture it separately. + > **Note:** Most workflow commands require a project already initialized with `specify init`. The exception is `specify workflow run `, which can run outside a project; in that case, run state is stored under the current directory's `.specify/workflows/runs//`. ## Resume a Workflow @@ -31,6 +50,7 @@ specify workflow resume | Option | Description | | ------------------- | -------------------------------------------------------- | | `-i` / `--input` | Updated input values as `key=value` (repeatable) | +| `--json` | Emit the resume outcome as a single JSON object | Resumes a paused or failed workflow run from the exact step where it stopped. Useful after responding to a gate step or fixing an issue that caused a failure. @@ -46,6 +66,10 @@ specify workflow resume --input cmd="exit 0" specify workflow status [] ``` +| Option | Description | +| ------------------- | -------------------------------------------------------- | +| `--json` | Emit run status (or the runs list) as a JSON object | + Shows the status of a specific run, or lists all runs if no ID is given. Run states: `created`, `running`, `completed`, `paused`, `failed`, `aborted`. ## List Installed Workflows diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index 460b24f56..18885a296 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -26,6 +26,7 @@ Or install globally: specify init --here """ +import contextlib import os import sys import zipfile @@ -2693,12 +2694,68 @@ def _parse_input_values(input_values: list[str] | None) -> dict[str, Any]: return inputs +def _workflow_run_payload(state: Any) -> dict[str, Any]: + """Machine-readable summary of a run/resume outcome.""" + return { + "run_id": state.run_id, + "workflow_id": state.workflow_id, + "status": state.status.value, + "current_step_id": state.current_step_id, + "current_step_index": state.current_step_index, + } + + +def _emit_workflow_json(payload: dict[str, Any]) -> None: + """Write a workflow payload as machine-readable JSON to stdout. + + Uses the builtin ``print`` rather than ``console.print`` so Rich + markup interpretation, syntax highlighting, and line-wrapping can + never alter the emitted JSON. + """ + print(json.dumps(payload, indent=2)) + + +@contextlib.contextmanager +def _stdout_to_stderr_when(active: bool): + """Redirect everything written to stdout onto stderr while *active*. + + Suppressing the banner and the step-start callback is not enough to + keep a ``--json`` stream clean: individual steps may still write to + stdout while the engine runs — the gate step prints its prompt, + and the prompt step runs a subprocess that inherits the process's + stdout file descriptor. Either would corrupt the single JSON object. + + Redirecting at the file-descriptor level (``dup2``) captures both + Python-level writes and inherited-fd subprocess output, so step + progress lands on stderr (still visible to a human) while stdout + carries only the emitted JSON. A no-op when *active* is false. + """ + if not active: + yield + return + sys.stdout.flush() + saved_stdout_fd = os.dup(1) + try: + os.dup2(2, 1) # fd 1 (stdout) now points at fd 2 (stderr) + with contextlib.redirect_stdout(sys.stderr): + yield + finally: + sys.stdout.flush() + os.dup2(saved_stdout_fd, 1) # restore the real stdout + os.close(saved_stdout_fd) + + @workflow_app.command("run") def workflow_run( source: str = typer.Argument(..., help="Workflow ID or YAML file path"), input_values: list[str] | None = typer.Option( None, "--input", "-i", help="Input values as key=value pairs" ), + json_output: bool = typer.Option( + False, + "--json", + help="Emit the run outcome as a single JSON object instead of formatted text.", + ), ): """Run a workflow from an installed ID or local YAML path.""" from .workflows.engine import WorkflowEngine @@ -2721,7 +2778,8 @@ def workflow_run( project_root = _require_specify_project() engine = WorkflowEngine(project_root) - engine.on_step_start = lambda sid, label: console.print(f" \u25b8 [{sid}] {label} \u2026") + if not json_output: + engine.on_step_start = lambda sid, label: console.print(f" \u25b8 [{sid}] {label} \u2026") try: definition = engine.load_workflow(source_path if is_file_source else source) @@ -2743,11 +2801,13 @@ def workflow_run( # Parse inputs inputs = _parse_input_values(input_values) - console.print(f"\n[bold cyan]Running workflow:[/bold cyan] {definition.name} ({definition.id})") - console.print(f"[dim]Version: {definition.version}[/dim]\n") + if not json_output: + console.print(f"\n[bold cyan]Running workflow:[/bold cyan] {definition.name} ({definition.id})") + console.print(f"[dim]Version: {definition.version}[/dim]\n") try: - state = engine.execute(definition, inputs) + with _stdout_to_stderr_when(json_output): + state = engine.execute(definition, inputs) except ValueError as exc: console.print(f"[red]Error:[/red] {exc}") raise typer.Exit(1) @@ -2755,6 +2815,10 @@ def workflow_run( console.print(f"[red]Workflow failed:[/red] {exc}") raise typer.Exit(1) + if json_output: + _emit_workflow_json(_workflow_run_payload(state)) + return + status_colors = { "completed": "green", "paused": "yellow", @@ -2775,18 +2839,25 @@ def workflow_resume( input_values: list[str] | None = typer.Option( None, "--input", "-i", help="Updated input values as key=value pairs" ), + json_output: bool = typer.Option( + False, + "--json", + help="Emit the resume outcome as a single JSON object instead of formatted text.", + ), ): """Resume a paused or failed workflow run.""" from .workflows.engine import WorkflowEngine project_root = _require_specify_project() engine = WorkflowEngine(project_root) - engine.on_step_start = lambda sid, label: console.print(f" \u25b8 [{sid}] {label} \u2026") + if not json_output: + engine.on_step_start = lambda sid, label: console.print(f" \u25b8 [{sid}] {label} \u2026") inputs = _parse_input_values(input_values) try: - state = engine.resume(run_id, inputs or None) + with _stdout_to_stderr_when(json_output): + state = engine.resume(run_id, inputs or None) except FileNotFoundError: console.print(f"[red]Error:[/red] Run not found: {run_id}") raise typer.Exit(1) @@ -2797,6 +2868,10 @@ def workflow_resume( console.print(f"[red]Resume failed:[/red] {exc}") raise typer.Exit(1) + if json_output: + _emit_workflow_json(_workflow_run_payload(state)) + return + status_colors = { "completed": "green", "paused": "yellow", @@ -2810,6 +2885,11 @@ def workflow_resume( @workflow_app.command("status") def workflow_status( run_id: str | None = typer.Argument(None, help="Run ID to inspect (shows all if omitted)"), + json_output: bool = typer.Option( + False, + "--json", + help="Emit run status as a single JSON object instead of formatted text.", + ), ): """Show workflow run status.""" from .workflows.engine import WorkflowEngine @@ -2825,6 +2905,21 @@ def workflow_status( console.print(f"[red]Error:[/red] Run not found: {run_id}") raise typer.Exit(1) + if json_output: + # Build on the shared run/resume payload so the common fields + # (including current_step_index) stay identical across commands. + payload = { + **_workflow_run_payload(state), + "created_at": state.created_at, + "updated_at": state.updated_at, + "steps": { + sid: sd.get("status", "unknown") + for sid, sd in state.step_results.items() + }, + } + _emit_workflow_json(payload) + return + status_colors = { "completed": "green", "paused": "yellow", @@ -2852,6 +2947,22 @@ def workflow_status( console.print(f" [{sc}]●[/{sc}] {step_id}: {s}") else: runs = engine.list_runs() + + if json_output: + payload = { + "runs": [ + { + "run_id": r["run_id"], + "workflow_id": r.get("workflow_id"), + "status": r.get("status", "unknown"), + "updated_at": r.get("updated_at"), + } + for r in runs + ] + } + _emit_workflow_json(payload) + return + if not runs: console.print("[yellow]No workflow runs found.[/yellow]") return diff --git a/tests/test_workflows.py b/tests/test_workflows.py index e85c2d729..fa203c854 100644 --- a/tests/test_workflows.py +++ b/tests/test_workflows.py @@ -3138,6 +3138,158 @@ steps: assert "do-specify" not in state.step_results +class TestWorkflowJsonOutput: + """Test the --json machine-readable output for run/resume/status.""" + + _WF = """ +schema_version: "1.0" +workflow: + id: "json-wf" + name: "JSON WF" + version: "1.0.0" +steps: + - id: ask + type: gate + message: "Review" + options: [approve, reject] + - id: after + type: shell + run: "echo done" +""" + + _WF_DONE = """ +schema_version: "1.0" +workflow: + id: "json-done" + name: "JSON Done" + version: "1.0.0" +steps: + - id: only + type: shell + run: "echo done" +""" + + def _write_wf(self, project_dir, text, name): + path = project_dir / f"{name}.yml" + path.write_text(text, encoding="utf-8") + return path + + def _invoke(self, project_dir, args): + from typer.testing import CliRunner + from unittest.mock import patch + from specify_cli import app + + runner = CliRunner() + with patch.object(Path, "cwd", return_value=project_dir): + return runner.invoke(app, args, catch_exceptions=False) + + def test_run_json_completed(self, project_dir): + wf = self._write_wf(project_dir, self._WF_DONE, "done") + result = self._invoke(project_dir, ["workflow", "run", str(wf), "--json"]) + assert result.exit_code == 0 + payload = json.loads(result.stdout) + assert payload["workflow_id"] == "json-done" + assert payload["status"] == "completed" + assert "run_id" in payload + + def test_run_json_paused(self, project_dir): + wf = self._write_wf(project_dir, self._WF, "gated") + result = self._invoke(project_dir, ["workflow", "run", str(wf), "--json"]) + assert result.exit_code == 0 + payload = json.loads(result.stdout) + assert payload["status"] == "paused" + assert payload["current_step_id"] == "ask" + assert payload["current_step_index"] == 0 + + def test_run_json_output_has_no_markup_or_ansi(self, project_dir): + wf = self._write_wf(project_dir, self._WF_DONE, "clean") + out = self._invoke( + project_dir, ["workflow", "run", str(wf), "--json"] + ).stdout + # Machine output must be exactly the JSON object: no Rich markup + # tags and no ANSI escape sequences leaking in. + assert "\x1b[" not in out + assert "[/" not in out + assert out.strip() == json.dumps(json.loads(out), indent=2) + + def test_run_default_output_is_human_not_json(self, project_dir): + wf = self._write_wf(project_dir, self._WF_DONE, "done2") + result = self._invoke(project_dir, ["workflow", "run", str(wf)]) + assert result.exit_code == 0 + assert "Running workflow" in result.stdout + with pytest.raises(json.JSONDecodeError): + json.loads(result.stdout) + + def test_status_json_single_and_list(self, project_dir): + wf = self._write_wf(project_dir, self._WF, "gated2") + run = json.loads( + self._invoke(project_dir, ["workflow", "run", str(wf), "--json"]).stdout + ) + rid = run["run_id"] + + single = json.loads( + self._invoke(project_dir, ["workflow", "status", rid, "--json"]).stdout + ) + assert single["run_id"] == rid + assert single["status"] == "paused" + assert single["steps"]["ask"] == "paused" + # status --json carries the same step-position fields as run/resume + # so automation never has to branch on which command produced it. + assert single["current_step_id"] == run["current_step_id"] + assert single["current_step_index"] == run["current_step_index"] + + listing = json.loads( + self._invoke(project_dir, ["workflow", "status", "--json"]).stdout + ) + assert any(r["run_id"] == rid for r in listing["runs"]) + + def test_resume_json(self, project_dir): + wf = self._write_wf(project_dir, self._WF, "gated3") + rid = json.loads( + self._invoke(project_dir, ["workflow", "run", str(wf), "--json"]).stdout + )["run_id"] + # Non-interactive resume re-runs the gate, which pauses again. + resumed = json.loads( + self._invoke(project_dir, ["workflow", "resume", rid, "--json"]).stdout + ) + assert resumed["run_id"] == rid + assert resumed["status"] == "paused" + + def test_json_redirect_keeps_stdout_clean(self, capfd): + # While a workflow runs under --json, steps can still write to stdout: + # the gate step prints its prompt and the prompt step runs a + # subprocess that inherits the stdout fd. Both must be redirected to + # stderr so the JSON object on stdout stays parseable. capfd captures + # at the file-descriptor level, so it sees the subprocess output too. + import subprocess + import sys as _sys + from specify_cli import _stdout_to_stderr_when + + print("STDOUT_BEFORE") + with _stdout_to_stderr_when(True): + print("PY_LEAK") # Python-level write (gate-style) + subprocess.run( # inherited-fd write (prompt-style) + [_sys.executable, "-c", "print('SUBPROC_LEAK')"], + check=True, + ) + print("STDOUT_AFTER") + + out, err = capfd.readouterr() + # stdout keeps only what was written outside the guarded block. + assert "STDOUT_BEFORE" in out and "STDOUT_AFTER" in out + assert "PY_LEAK" not in out and "SUBPROC_LEAK" not in out + # The step output is preserved on stderr, not discarded. + assert "PY_LEAK" in err and "SUBPROC_LEAK" in err + + def test_json_redirect_inactive_is_noop(self, capfd): + from specify_cli import _stdout_to_stderr_when + + with _stdout_to_stderr_when(False): + print("VISIBLE_ON_STDOUT") + out, _ = capfd.readouterr() + assert "VISIBLE_ON_STDOUT" in out + + class TestResumeWithInputs: """Test that `workflow resume` can accept updated workflow inputs."""