mirror of
https://github.com/larksuite/cli.git
synced 2026-07-03 14:02:43 +08:00
* feat(event): add event subscription & consume system with orphan bus detection
Introduces end-to-end Feishu event consumption via a new `lark-cli event`
command family. Users can subscribe to and consume real-time events
(IM messages, chat/member lifecycle, reactions, ...) in a forked bus
daemon architecture with orphan detection, reflected + overrideable JSON
schemas, and AI-friendly `--json` / `--jq` output.
Commands
--------
- `event list [--json]` list subscribable EventKeys
- `event schema <key>` Parameters + Output Schema + auth info
- `event consume <key>` foreground blocking consume; SIGINT/SIGTERM/stdin-EOF
shutdown; bounded by `--max-events` / `--timeout`; `--jq` projection;
`--output-dir` spool; `--param` KV inputs
- `event status [--fail-on-orphan] [--json]` bus daemon health
- `event stop [--all] [--force] [--json]` stop bus daemon(s)
- `event _bus` (hidden) forked daemon entrypoint
Architecture
------------
- Bus daemon (internal/event/bus): per-AppID forked process that holds
the Feishu long-poll connection and fans events out to 1..N local
consumers over an IPC socket. Drop-oldest backpressure, TOCTOU-safe
cleanup via AcquireCleanupLock, idle-timeout self-shutdown, graceful
SIGTERM.
- Consume client (internal/event/consume): fork+dial the daemon,
handshake, remote preflight (HTTP /open-apis/event/v1/connection),
JQ projection, sequence-gap detection, health probe. Bounded
execution (`--max-events` / `--timeout`) for AI/script usage.
- Wire protocol (internal/event/protocol): newline-delimited JSON
frames with 1 MB size cap and 5 s write deadlines. Hello / HelloAck /
PreShutdownCheck / Shutdown / StatusQuery control messages.
- Orphan detection (internal/event/busdiscover): OS process-table scan
(ps on Unix, PowerShell on Windows) with two-gate cmdline filter
(lark-cli + event _bus) that naturally rejects pid-reused unrelated
processes.
- Transport (internal/event/transport): Unix socket on darwin/linux,
Windows named pipe on windows.
- Schema system (internal/event, internal/event/schemas): SchemaDef with
mutually-exclusive Native (framework wraps V2 envelope) or Custom
(zero-touch) specs. Reflection reads `desc` / `enum` / `kind` struct
tags, with array elements diving into `items`. FieldOverrides overlay
engine addresses paths via JSON Pointer (including `/*` array
wildcard) and runs post-reflect, post-envelope. Lint guards orphan
override paths.
- IM events (events/im): 11 keys — receive / read / recalled, chat and
member lifecycle, reactions — all with per-field open_id / union_id /
user_id / chat_id / message_id / timestamp_ms format annotations.
Robustness
----------
- Bus idle-timer race fix: re-check live conn count under lock before
honoring the tick; Stop+drain before Reset per timer contract.
- Protocol frame cap: replace `br.ReadBytes('\n')` with `ReadFrame` that
rejects frames > MaxFrameBytes (1 MB). Closes a DoS path where any
local peer could grow the reader's buffer unbounded.
- Control-message writes gated by WriteTimeout (5 s) so a wedged peer
kernel buffer can't stall writers indefinitely.
- Consume signal goroutine: `signal.Stop` + `ctx.Done` select, no leak
across repeated invocations in the same process.
- JQ pre-flight compile so bad expressions fail before the bus fork and
any server-side PreConsume side effects.
- `f.NewAPIClient`'s `*core.ConfigError` now passes through unwrapped
so the actionable "run lark-cli config init" hint reaches the user.
Subprocess / AI contract
------------------------
- `event consume` emits `[event] ready event_key=<key>` on stderr once
the bus handshake completes and events will flow. Parent processes
block-read stderr until this line before reading stdout — no `sleep`
fallback needed.
- All list-like commands have `--json` for structured consumption.
- Skill docs in `skills/lark-event/` (SKILL.md + references/) brief AI
agents on the command surface, JQ against Output Schema, bounded
execution, and subprocess lifecycle.
Testing
-------
Unit tests across bus/hub, consume loop, protocol codec, dedup,
registry, transport (Unix + Windows), schema reflection, field
overrides, pointer resolver. Integration tests cover fork startup,
shutdown, orphan detection, probe, stdin EOF, preflight, bounded
execution, and Windows busdiscover PowerShell compatibility.
Change-Id: Ib69d6d8409b33b99790081e273d4b5b01b7dbf80
* fix(event): address CodeRabbit findings + lift patch coverage above 60%
CodeRabbit comments (PR #654)
-----------------------------
1. bus/dedup: IsDuplicate dropped legitimate (post-TTL) events after
cleanupExpired fired. The run-every-1000-inserts cleanup removed
TTL-expired IDs from the `seen` map but left them in the ring;
IsDuplicate's ring-scan fallback then rediscovered them and falsely
reported "duplicate", and bus.Publish silently dropped the event.
Removed the ring-scan branch — `seen` is the sole authority, the ring
only bounds map size via overflow eviction. New regression test
TestDedupFilter_TTLExpiryAfterCleanupRunRespected exercises the
10-insert + cleanup path and guards the fix.
2. consume/remote_preflight: the decoder only read
`data.online_instance_cnt`. A non-zero business code with no data payload decoded to 0 and
callers treated it as "verified zero", forking a local bus that would
duplicate events. Added Code / Msg fields and promoted code != 0 into
an error so the caller distinguishes verified-zero from check-failed.
3. cmd/event/stop: swapped os.ReadDir / os.Stat to vfs.ReadDir / vfs.Stat
in discoverAppIDs per project guideline (enables test mocking). New
TestDiscoverAppIDs_* lifts discoverAppIDs from 0% to 100%.
4. cmd/event/appmeta_err: narrowed authURLPattern from
feishu.cn|feishu.net|larksuite.com|larkoffice.com to the two hosts
consoleScopeGrantURL actually produces. Kept the allowlist pinned to
ResolveEndpoints' output with a comment flagging the synchrony.
5. cmd/event/list: moved "No EventKeys registered." and "Use 'event
schema <key>' for details." hints to stderr so `event list | jq`
style pipelines don't ingest them as data.
6. cmd/event/schema: runSchema is a RunE entry point; swapped the bare
fmt.Errorf on resolveSchemaJSON failure to output.Errorf so AI
agents parse a structured error envelope.
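Item 2 above (verified-zero vs. check-failed) can be sketched as below. The struct and function names are hypothetical; only the `code`, `msg`, and `data.online_instance_cnt` fields come from the description.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// preflightResp is a hypothetical mirror of the response envelope.
type preflightResp struct {
	Code int    `json:"code"`
	Msg  string `json:"msg"`
	Data struct {
		OnlineInstanceCnt int `json:"online_instance_cnt"`
	} `json:"data"`
}

// onlineInstances returns the instance count only when the business code is 0.
// A non-zero code becomes an error, so a missing data payload is never
// mistaken for a verified count of zero.
func onlineInstances(body []byte) (int, error) {
	var r preflightResp
	if err := json.Unmarshal(body, &r); err != nil {
		return 0, fmt.Errorf("decode preflight response: %w", err)
	}
	if r.Code != 0 {
		return 0, fmt.Errorf("preflight check failed: code=%d msg=%q", r.Code, r.Msg)
	}
	return r.Data.OnlineInstanceCnt, nil
}

func main() {
	fmt.Println(onlineInstances([]byte(`{"code":0,"data":{"online_instance_cnt":0}}`)))
	fmt.Println(onlineInstances([]byte(`{"code":1,"msg":"denied"}`)))
}
```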
Coverage bumps (patch ~50% -> ~60%)
-----------------------------------
internal/event/consume/loop_test.go: loop.go was 0% at patch time.
New tests cover consumeLoop end-to-end via net.Pipe (events -> sink,
max-events -> ctx.Done -> PreShutdownCheck/Ack), seq-gap warning,
jq filtering + early compile failure, isTerminalSinkError classifier.
Takes consumeLoop from 0% to ~74%.
internal/event/protocol/messages_test.go: all NewXxx constructors,
Encode/Decode roundtrip per message type, EncodeWithDeadline deadline
enforcement, ReadFrame MaxFrameBytes rejection + EOF propagation.
Takes protocol from 28% to ~86%.
Also bundles small UX polish:
- cmd/event/consume: --output-dir flag doc now calls out path-traversal behavior;
jq-validation failures now re-wrap with an event-specific hint
pointing at `event schema` for payload shape.
- internal/event/consume.validateParams: error now names the EventKey
and lists valid param names inline so AI callers recover without a
second `event schema` round-trip.
- skills/lark-event: description expanded to mention
listener/subscribe/consume synonyms + the IM scope set explicitly;
lark-event-im reference polished; obsolete lark-event-subscribe
reference removed.
Verified with go test -race -timeout 120s across ./cmd/event/...,
./events/..., ./internal/event/...; gofmt clean; go vet clean.
Change-Id: I3837b8645ea1d7529c9a8fd4c2bbfa965ae1b519
* test(event): cover format helpers + cobra factories
Adds cmd/event/format_helpers_test.go covering the pure output helpers
and factory wire-ups that RunE-level tests would need a live bus to
exercise:
- writeStopJSON: shape assertions + nil → [] (scripts expecting
.results | length must not see null).
- writeStopText: stdout vs stderr routing — stopped / no-bus lines to
stdout, refused / errored lines to stderr.
- busState.String: all three discriminator values.
- humanizeDuration: each bucket boundary (seconds / minutes / hours / days).
- writeStatusText: covers stateNotRunning / stateRunning (with consumer
table) / stateOrphan (with kill hint).
- writeStatusJSON: orphan entry carries suggested_action + issue;
running entry must NOT carry those fields (hint-leak guard for
scripts that key on issue != "").
- exitForOrphan: flag-off never errors; flag-on errors iff any orphan
is present, with ExitValidation code.
- NewCmdConsume / NewCmdStatus / NewCmdStop / NewCmdList / NewCmdBus:
flag registration + RunE presence, so review catches flag-name drift.
NewCmdBus check also pins Hidden=true.
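The nil → [] guarantee for writeStopJSON matters because `encoding/json` marshals a nil slice as `null`. A minimal sketch of the normalization, with hypothetical type and field names (only the `results` key is taken from the note above):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// stopResult is a hypothetical per-AppID entry; the real fields may differ.
type stopResult struct {
	AppID  string `json:"app_id"`
	Status string `json:"status"`
}

// marshalStopResults normalizes a nil slice to an empty one so the output
// always carries a JSON array under "results", never null.
func marshalStopResults(results []stopResult) ([]byte, error) {
	if results == nil {
		results = []stopResult{}
	}
	return json.Marshal(struct {
		Results []stopResult `json:"results"`
	}{Results: results})
}

func main() {
	b, err := marshalStopResults(nil)
	fmt.Println(string(b), err)
}
```

With this normalization `jq '.results | length'` yields 0 for an empty run instead of operating on `null`.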
Lifts cmd/event coverage 51.7% → 61.1%; aggregate event-package
coverage crosses the 60% codecov patch threshold (62% locally).
Change-Id: I9ecf3d905a8f9607b9441ee8a61e746496e2be63
* fix(event): address lint + deadcode CI failures
4 golangci-lint findings + 1 deadcode finding flagged on PR #654.
lint
----
1. cmd/event/stop.go:86 (ineffassign): `targets := []string{}` is
overwritten by both branches of the `if o.all` below, so the empty-
slice initializer is dead. Switched to `var targets []string`.
2. cmd/event/consume.go nilerr: the user-identity scope preflight
swallows a non-nil ResolveToken error and returns nil. This is
intentional — a missing/expired user token must not block consume;
the bus handshake will surface the real auth error with actionable
hints. Added `//nolint:nilerr` with a 4-line comment pinning the
reasoning.
3. events/im/message_receive.go:62 nilerr: malformed JSON payload
returns the original bytes + nil so consumers still see the event
(the WARN breadcrumb lives in the outer loop). Added
`//nolint:nilerr` with a one-line comment.
4. internal/event/schemas/fromtype_test.go:26 unused: `unexportedStr`
is a reflection-test fixture — its presence (not value) exercises
the FromType skip-unexported path verified at the "unexported
field should not be in schema" assertion. Added `//nolint:unused`
and a 4-line comment pointing at the guarded assertion.
deadcode
--------
5. internal/event/testutil/testutil.go: NewTCPFake has no callers in
the repo. Removed the constructor plus the `inner == nil` TCP-mode
branches from Listen / Dial / Cleanup. FakeTransport now only
supports the wrapped-overlay mode (NewWrappedFake), which is the
one every existing test uses. Doc comment simplified accordingly.
Verified locally: go test -race -timeout 120s across ./cmd/event/...,
./events/..., ./internal/event/... all green; gofmt clean; go vet
clean.
Change-Id: Ie8a2270827a0bde6b8159ab70aaf5c1e9ca7d5b9
* fix(event): drop stale enum + simplify protocol test type helper
- events/im/message_receive.go: dropped the `enum` tag on
ImMessageReceiveOutput.MessageType. convertlib registers many more
message types than the old 11-item list (video / location /
calendar / todo / vote / hongbao / merge_forward / folder / ...),
so a partial enum would tell AI consumers that valid values like
"video" are invalid and produce false-negative JQ filters.
- internal/event/protocol/messages_test.go: collapsed the
typeOf → reflectTypeName → stringType chain in
TestEncode_DecodeRoundtripAllTypes to a single fmt.Sprintf("%T", v).
The hand-maintained type switch silently returned "<unknown>" for
any new message type, which would have let future Decode bugs slip
past the roundtrip assertion. Also removed a dead `cases` table at
the top of TestConstructors_PinTypeField left over from an earlier
refactor.
Change-Id: I831e96f8417e80637596030d652a559de0d33122
* docs(event): polish skill docs + rename root_path_hint to jq_root_path
- skills/lark-event/SKILL.md, lark-event-im.md: translated to English,
reorganized around a top-level "Core commands" table, scenario
recipes tightened.
- cmd/event/schema.go: renamed the writeSchemaJSON hint field
RootPathHint / "root_path_hint" -> JQRootPath / "jq_root_path" to
make its purpose (a jq path prefix) obvious at the call site; no
external consumer depends on the old name yet.
Change-Id: I00c14061ca33caedc0975bfeadc4b26d3dcd314d
* chore(event): strip excessive comments
Change-Id: I8f44f36f5dbdba3ef95dfc67069dc796232f91ec
* fix(event): dedup self-eviction race + protocol oversized-frame test
dedup: in IsDuplicate, the ring-slot eviction step deleted seen[id] even
when ring[pos] equalled the freshly-recorded id (post-TTL reinsertion
landing on its own historical slot). Net result: ring still held id but
seen did not, so the next IsDuplicate(id) returned false and the
duplicate was delivered. Skip the delete when old == eventID. New
TestDedupFilter_SelfEvictionPreservesFreshEntry pins the invariant by
pre-loading the ring slot and asserting the second call still reports
duplicate.
protocol: TestReadFrame_RejectsOversized used strings.Contains feeding
t.Logf, so any non-nil error passed — including a future regression
that returned io.ErrUnexpectedEOF while silently keeping the buffer
unbounded. Promoted MaxFrameBytes overflow to a sentinel
ErrFrameTooLarge and the test now asserts via errors.Is.
Change-Id: I50281dad392152b0ca083fd30c38eb0695e63bd3
* docs(event): clarify .content shape per message_type + add sender filter recipe
Change-Id: I619fd15c1a362e42e6602fd3e3316bbc75eddc5e
* fix(event): replace cmdline-regex bus discovery with PID file + close concurrent fork race
Bus discovery previously walked the OS process table and parsed `--profile cli_*` from
cmdline; the regex rejected any non-cli_ profile name (D-03a). Replace with per-AppID
bus.pid + bus.alive.lock under events/<AppID>/, probed via try-lock. AppID round-trips
through the directory name, so the profile-vs-AppID confusion is gone by construction.
Also fix B-07 (two consumers each fork an independent bus, halving event delivery):
- forkBus holds bus.fork.lock until child is dial-able, not just until cmd.Start
- bus daemon takes alive.lock before binding the socket; cleanup-TOCTOU race can no
longer leave two listeners on different inodes
status.go renders an orphan with PID=0 distinctly (live bus but pid file unreadable)
so we never print "Action: kill 0".
Change-Id: I3bf0a6cf1d91fb274ac5a6df83d66896aafb291f
* style(event): gofmt bus.go
Trailing blank line introduced when appending acquireAliveLock helper.
Change-Id: I4ae1b4a4363dc6c89dcbd6a170f4563117490ba3
* fix(event): swap os.Remove/Rename for vfs.* and silence forbidigo on internal diagnostics
golangci-lint forbidigo blocks os.* in internal/. Switch the pid-file write to vfs.Remove/vfs.Rename and add a nolint marker on the two stderr diagnostics in busdiscover, matching the existing pattern in consume/*.
Change-Id: Ia6768be62aefeb8ca40f991d3130a78ef2ec0ea5
* fix(event): cross-platform --all + clean SIGPIPE shutdown for consume
- stop --all: replace bus.sock-file probe with busdiscover lock-based
scan; previously skipped Windows entirely (named-pipe transport, no
socket on disk) and misidentified Unix stale sockets as live. Same
win for `event status` (shares discoverAppIDs).
- consume: ignore SIGPIPE so a closed stdout pipe (e.g. `... | head -n 1`)
surfaces as EPIPE error and reaches the existing isTerminalSinkError
cleanup path (log "output pipe closed", lastForKey query, hub
unregister), instead of being killed by Go's default fd 1/2 SIGPIPE
handler with exit 141 and zero deferred cleanup.
Build-tagged: real on unix, no-op on windows (no SIGPIPE there).
Change-Id: I453b19f05c489fd9d5c1a9ba3bdc35e127c15b83
* docs(event): translate IM EventKey descriptions and field tags to English
Aligns with the rest of the codebase (titles, struct names, README) which
are already in English. Surfaces in `event list` / `event schema` and is
also consumed by AI agents.
- events/im/message_receive.go: 11 desc tags on ImMessageReceiveOutput
- events/im/native.go: 10 description fields on Native EventKeys
- events/im/register.go: im.message.receive_v1 Description
Change-Id: I6f46950b4793f137e0129c1f06019a3419195443
* docs(event): drop misleading AuthTypes[0] auto-default claim
The KeyDefinition comment and SKILL.md flag table both stated that
`--as auto` resolves to `AuthTypes[0]`. It does not — ResolveAs goes
through global rules (config default_as / credential hint / `bot`
fallback) without consulting the EventKey. AuthTypes is only used by
CheckIdentity as a post-resolve whitelist.
Reword the field comment to plain whitelist semantics and have SKILL.md
defer `--as` documentation to lark-shared.
Change-Id: Ia5d3d3790aed05813a0fa72d6b43518224e2055b
* revert(comments): restore original comments on 3rd-party files
e61482a stripped comments across 105 files. Restore the four files
authored by others (cmd/build.go, shortcuts/common/{types,runner}.go,
shortcuts/event/subscribe.go) to their pre-strip state so unrelated
documentation isn't churned in this PR.
Change-Id: Ie2527b06bfaf5b3861b0b9dff1e19bbfe7dde456
139 lines
4.2 KiB
Go
// Copyright (c) 2026 Lark Technologies Pte. Ltd.
// SPDX-License-Identifier: MIT

package appmeta

import (
	"context"
	"errors"
	"strings"
	"testing"

	"github.com/larksuite/cli/internal/event/testutil"
)

const respFourVersions = `{
  "code": 0,
  "data": {
    "has_more": false,
    "items": [
      {"version_id": "oav_draft", "version": "1.0.3", "status": 4, "publish_time": null,
       "event_infos": [{"event_type": "im.message.receive_v1"}, {"event_type": "mail.user_mailbox.event.message_received_v1"}],
       "scopes": [{"scope": "draft:only", "token_types": ["tenant"]}]
      },
      {"version_id": "oav_latest", "version": "1.0.2", "status": 1, "publish_time": "1776684746",
       "event_infos": [
         {"event_type": "im.message.receive_v1"},
         {"event_type": "im.message.message_read_v1"}
       ],
       "scopes": [
         {"scope": "im:message", "token_types": ["tenant", "user"]},
         {"scope": "im:message.group_at_msg", "token_types": ["tenant"]},
         {"scope": "contact:user:readonly", "token_types": ["user"]}
       ]
      }
    ]
  }
}`

func TestFetchCurrentPublished_SelectsLatestPublished(t *testing.T) {
	c := &testutil.StubAPIClient{Body: respFourVersions}

	v, err := FetchCurrentPublished(context.Background(), c, "cli_test")
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if v == nil {
		t.Fatal("expected a version, got nil")
	}
	if v.VersionID != "oav_latest" {
		t.Errorf("VersionID = %q, want oav_latest", v.VersionID)
	}
	if v.Version != "1.0.2" {
		t.Errorf("Version = %q, want 1.0.2", v.Version)
	}

	wantEvents := map[string]bool{"im.message.receive_v1": true, "im.message.message_read_v1": true}
	if len(v.EventTypes) != len(wantEvents) {
		t.Fatalf("EventTypes = %v, want %v", v.EventTypes, wantEvents)
	}
	for _, e := range v.EventTypes {
		if !wantEvents[e] {
			t.Errorf("unexpected event type %q in %v", e, v.EventTypes)
		}
	}

	wantTenant := map[string]bool{"im:message": true, "im:message.group_at_msg": true}
	if len(v.TenantScopes) != len(wantTenant) {
		t.Fatalf("TenantScopes = %v, want %v", v.TenantScopes, wantTenant)
	}
	for _, s := range v.TenantScopes {
		if !wantTenant[s] {
			t.Errorf("unexpected tenant scope %q in %v", s, v.TenantScopes)
		}
	}
}

func TestFetchCurrentPublished_PathContainsQuery(t *testing.T) {
	c := &testutil.StubAPIClient{Body: respFourVersions}
	_, _ = FetchCurrentPublished(context.Background(), c, "cli_x")
	for _, want := range []string{
		"/open-apis/application/v6/applications/cli_x/app_versions",
		"lang=zh_cn",
		"page_size=2",
	} {
		if !strings.Contains(c.GotPath, want) {
			t.Errorf("path %q missing %q", c.GotPath, want)
		}
	}
}

func TestFetchCurrentPublished_NoPublishedYet(t *testing.T) {
	c := &testutil.StubAPIClient{Body: `{"code":0,"data":{"items":[
		{"version_id":"oav_draft","status":4,"publish_time":null,"event_infos":[],"scopes":[]}
	]}}`}
	v, err := FetchCurrentPublished(context.Background(), c, "cli_x")
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if v != nil {
		t.Errorf("want nil (app never published), got %+v", v)
	}
}

func TestFetchCurrentPublished_EmptyItems(t *testing.T) {
	c := &testutil.StubAPIClient{Body: `{"code":0,"data":{"items":[]}}`}
	v, err := FetchCurrentPublished(context.Background(), c, "cli_x")
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if v != nil {
		t.Errorf("want nil for empty items, got %+v", v)
	}
}

func TestFetchCurrentPublished_APIErrorPropagated(t *testing.T) {
	want := errors.New("insufficient permission level")
	c := &testutil.StubAPIClient{Err: want}
	v, err := FetchCurrentPublished(context.Background(), c, "cli_x")
	if !errors.Is(err, want) {
		t.Errorf("err = %v, want wrapping %v", err, want)
	}
	if v != nil {
		t.Errorf("want nil version on error, got %+v", v)
	}
}

func TestFetchCurrentPublished_PublishTimeEmptyStringTreatedAsUnpublished(t *testing.T) {
	c := &testutil.StubAPIClient{Body: `{"code":0,"data":{"items":[
		{"version_id":"oav_x","status":1,"publish_time":"","event_infos":[],"scopes":[]}
	]}}`}
	v, err := FetchCurrentPublished(context.Background(), c, "cli_x")
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if v != nil {
		t.Errorf("want nil (empty publish_time), got %+v", v)
	}
}