diff --git a/cmd/event/consume.go b/cmd/event/consume.go index 9fd4d234..0f8134b6 100644 --- a/cmd/event/consume.go +++ b/cmd/event/consume.go @@ -64,8 +64,8 @@ Use 'event schema ' for parameter details.`, cmd.Flags().StringVar(&o.jqExpr, "jq", "", "JQ expression to filter output") cmd.Flags().BoolVar(&o.quiet, "quiet", false, "Suppress informational messages on stderr") cmd.Flags().StringVar(&o.outputDir, "output-dir", "", "Write each event as a file in this directory (relative paths only; absolute paths and ~ are rejected to prevent path traversal)") - cmd.Flags().IntVar(&o.maxEvents, "max-events", 0, "Exit after N successful emits (0 = unlimited). Multi-worker EventKeys may emit up to workers-1 past N before all workers stop.") - cmd.Flags().DurationVar(&o.timeout, "timeout", 0, "Exit after DURATION (e.g. 30s, 2m). 0 = no timeout. Timeout is a normal exit (code 0; stderr 'reason: timeout').") + cmd.Flags().IntVar(&o.maxEvents, "max-events", 0, "Exit after N successful emits (0 = unlimited). Multi-worker EventKeys may emit up to workers-1 past N before all workers stop. Bounded runs ignore stdin EOF.") + cmd.Flags().DurationVar(&o.timeout, "timeout", 0, "Exit after DURATION (e.g. 30s, 2m). 0 = no timeout. Timeout is a normal exit (code 0; stderr 'reason: timeout'). Bounded runs ignore stdin EOF.") cmd.Flags().String("as", "auto", "identity type: user | bot | auto (must match EventKey's declared AuthTypes)") _ = cmd.RegisterFlagCompletionFunc("as", func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) { return []string{"user", "bot", "auto"}, cobra.ShellCompDirectiveNoFileComp @@ -184,8 +184,9 @@ func runConsume(cmd *cobra.Command, f *cmdutil.Factory, eventKey string, o consu errOut = io.Discard } - // Non-TTY only: stdin EOF is shutdown for subprocess callers; in TTY Ctrl-D must not exit. - if !f.IOStreams.IsTerminal { + // Non-TTY unbounded consumers use stdin EOF as shutdown for subprocess callers. 
+ // Bounded runs already have --max-events/--timeout as their lifecycle control. + if shouldWatchStdinEOF(f.IOStreams.IsTerminal, o.maxEvents, o.timeout) { watchStdinEOF(os.Stdin, cancel, errOut) } @@ -370,3 +371,8 @@ func watchStdinEOF(r io.Reader, cancel context.CancelFunc, errOut io.Writer) { cancel() }() } + +// shouldWatchStdinEOF reports whether the stdin-EOF shutdown watcher should be started: only for non-TTY runs with neither a positive maxEvents nor a positive timeout (<= 0 mirrors downstream's >0-is-bounded semantics, so negative bounds stay unbounded). +func shouldWatchStdinEOF(isTerminal bool, maxEvents int, timeout time.Duration) bool { + return !isTerminal && maxEvents <= 0 && timeout <= 0 +} diff --git a/cmd/event/consume_stdin_test.go b/cmd/event/consume_stdin_test.go index c4391842..67e50fb6 100644 --- a/cmd/event/consume_stdin_test.go +++ b/cmd/event/consume_stdin_test.go @@ -61,3 +61,70 @@ func TestWatchStdinEOF_DiagnosticMessage(t *testing.T) { t.Fatal("watchStdinEOF did not cancel within 1s of EOF") } } + +func TestShouldWatchStdinEOF(t *testing.T) { + tests := []struct { + name string + isTerminal bool + maxEvents int + timeout time.Duration + want bool + }{ + { + name: "terminal", + isTerminal: true, + want: false, + }, + { + name: "non terminal unbounded", + want: true, + }, + { + name: "non terminal negative max events is unbounded", + maxEvents: -1, + want: true, + }, + { + name: "non terminal negative timeout is unbounded", + timeout: -1 * time.Second, + want: true, + }, + { + name: "non terminal max events bounded", + maxEvents: 1, + want: false, + }, + { + name: "non terminal timeout bounded", + timeout: 10 * time.Minute, + want: false, + }, + { + name: "non terminal both bounds positive", + maxEvents: 1, + timeout: 10 * time.Minute, + want: false, + }, + { + name: "non terminal bounded max events with negative timeout", + maxEvents: 1, + timeout: -1 * time.Second, + want: false, + }, + { + name: "non terminal bounded timeout with negative max events", + maxEvents: -1, + timeout: 10 * time.Minute, + want: false, 
+ }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := shouldWatchStdinEOF(tt.isTerminal, tt.maxEvents, tt.timeout) + if got != tt.want { + t.Fatalf("shouldWatchStdinEOF() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/internal/event/consume/consume.go b/internal/event/consume/consume.go index 6c862705..c88bff86 100644 --- a/internal/event/consume/consume.go +++ b/internal/event/consume/consume.go @@ -130,7 +130,7 @@ func Run(ctx context.Context, tr transport.IPC, appID, profileName, domain strin if !opts.Quiet { fmt.Fprintln(errOut, listeningText(opts)) if !opts.IsTTY { - fmt.Fprintln(errOut, stopHintText()) + fmt.Fprintln(errOut, stopHintText(opts)) } } @@ -213,7 +213,11 @@ func exitReason(ctx context.Context, emitted int64, opts Options) string { return "signal" } -func stopHintText() string { +func stopHintText(opts Options) string { + if opts.MaxEvents > 0 || opts.Timeout > 0 { + return "[event] to stop gracefully: send SIGTERM (kill ). " + + "Avoid kill -9 — it skips cleanup and may leak server-side subscriptions." + } return "[event] to stop gracefully: send SIGTERM (kill ) or close stdin. " + "Avoid kill -9 — it skips cleanup and may leak server-side subscriptions." } diff --git a/internal/event/consume/listening_text_test.go b/internal/event/consume/listening_text_test.go index ed45f366..b3e3ec06 100644 --- a/internal/event/consume/listening_text_test.go +++ b/internal/event/consume/listening_text_test.go @@ -50,12 +50,32 @@ func TestListeningText_NonTTY_MaxEventsAndTimeout(t *testing.T) { } // AI-facing contract: must name "kill -9" + "cleanup" so agents parsing stderr are steered away from SIGKILL. 
-func TestStopHintText_Content(t *testing.T) { - got := stopHintText() - mustContain := []string{"SIGTERM", "kill -9", "cleanup"} +func TestStopHintText_Unbounded(t *testing.T) { + got := stopHintText(Options{}) + mustContain := []string{"SIGTERM", "kill -9", "cleanup", "close stdin"} for _, s := range mustContain { if !bytes.Contains([]byte(got), []byte(s)) { - t.Errorf("stopHintText missing %q; got %q", s, got) + t.Errorf("stopHintText(unbounded) missing %q; got %q", s, got) + } + } +} + +// AI-facing contract: the bounded hint (MaxEvents or Timeout > 0) must still name "kill -9" + "cleanup" so agents parsing stderr are steered away from SIGKILL, and must not mention "close stdin". +func TestStopHintText_Bounded(t *testing.T) { + cases := []Options{ + {MaxEvents: 1}, + {Timeout: 30 * time.Second}, + } + for _, opts := range cases { + got := stopHintText(opts) + mustContain := []string{"SIGTERM", "kill -9", "cleanup"} + for _, s := range mustContain { + if !bytes.Contains([]byte(got), []byte(s)) { + t.Errorf("stopHintText(bounded) missing %q; got %q", s, got) + } + } + if bytes.Contains([]byte(got), []byte("close stdin")) { + t.Errorf("stopHintText(bounded) must not contain \"close stdin\"; got %q", got) } } } diff --git a/skills/lark-event/SKILL.md b/skills/lark-event/SKILL.md index 1d64e595..75aab35a 100644 --- a/skills/lark-event/SKILL.md +++ b/skills/lark-event/SKILL.md @@ -69,7 +69,7 @@ wait ### stdin EOF = graceful exit -`event consume` treats stdin close as a shutdown signal (wired for AI subprocess callers). `< /dev/null` / `nohup` / systemd's default `StandardInput=null` will cause an immediate graceful exit (stderr `reason: signal`). To keep running: +`event consume` treats stdin close as a shutdown signal (wired for AI subprocess callers). 
**Bounded runs are exempt: when `--max-events` or `--timeout` is set (> 0), stdin EOF is ignored and the run ends only when that bound is reached or on Ctrl+C / SIGTERM.** For unbounded runs, `< /dev/null` / `nohup` / systemd's default `StandardInput=null` will cause an immediate graceful exit (stderr `reason: signal`). To keep a run alive in those environments: - Feed stdin a source that never EOFs: `< <(tail -f /dev/null)` - Or run bounded: `--max-events N` / `--timeout D` @@ -82,7 +82,7 @@ On exit, the last stderr line is `[event] exited — received N event(s) in Xs ( |---|---|---| | 0 | `reason: limit` | `--max-events` reached | | 0 | `reason: timeout` | `--timeout` reached | -| 0 | `reason: signal` | Ctrl+C / SIGTERM / stdin EOF | +| 0 | `reason: signal` | Ctrl+C / SIGTERM / stdin EOF (stdin EOF applies to non-TTY unbounded runs only) | | non-0 | `Error: ...` (no `exited` line) | Startup / runtime failure (permissions, network, params, config) | Orchestrators should treat `reason: limit/timeout/signal` (all exit 0) as "business completion" and non-zero as "failure".