mirror of
https://github.com/larksuite/cli.git
synced 2026-07-05 07:31:22 +08:00
* feat(event): add event subscription & consume system with orphan bus detection
Introduces end-to-end Feishu event consumption via a new `lark-cli event`
command family. Users can subscribe to and consume real-time events
(IM messages, chat/member lifecycle, reactions, ...) in a forked bus
daemon architecture with orphan detection, reflected + overrideable JSON
schemas, and AI-friendly `--json` / `--jq` output.
Commands
--------
- `event list [--json]`                        list subscribable EventKeys
- `event schema <key>`                         Parameters + Output Schema + auth info
- `event consume <key>`                        foreground blocking consume;
                                               SIGINT/SIGTERM/stdin-EOF shutdown;
                                               `--max-events` / `--timeout` bounded;
                                               `--jq` projection; `--output-dir` spool;
                                               `--param` KV inputs
- `event status [--fail-on-orphan] [--json]`   bus daemon health
- `event stop [--all] [--force] [--json]`      stop bus daemon(s)
- `event _bus` (hidden)                        forked daemon entrypoint
Architecture
------------
- Bus daemon (internal/event/bus): per-AppID forked process that holds
the Feishu WebSocket long connection and fans events out to 1..N local
consumers over an IPC socket. Drop-oldest backpressure, TOCTOU-safe
cleanup via AcquireCleanupLock, idle-timeout self-shutdown, graceful
SIGTERM.
- Consume client (internal/event/consume): fork+dial the daemon,
handshake, remote preflight (HTTP /open-apis/event/v1/connection),
JQ projection, sequence-gap detection, health probe. Bounded
execution (`--max-events` / `--timeout`) for AI/script usage.
- Wire protocol (internal/event/protocol): newline-delimited JSON
frames with 1 MB size cap and 5 s write deadlines. Hello / HelloAck /
PreShutdownCheck / Shutdown / StatusQuery control messages.
- Orphan detection (internal/event/busdiscover): OS process-table scan
(ps on Unix, PowerShell on Windows) with two-gate cmdline filter
(lark-cli + event _bus) that naturally rejects pid-reused unrelated
processes.
- Transport (internal/event/transport): Unix socket on darwin/linux,
Windows named pipe on windows.
- Schema system (internal/event, internal/event/schemas): SchemaDef with
mutually-exclusive Native (framework wraps V2 envelope) or Custom
(zero-touch) specs. Reflection reads `desc` / `enum` / `kind` struct
tags, with array elements diving into `items`. FieldOverrides overlay
engine addresses paths via JSON Pointer (including `/*` array
wildcard) and runs post-reflect, post-envelope. Lint guards orphan
override paths.
- IM events (events/im): 11 keys — receive / read / recalled, chat and
member lifecycle, reactions — all with per-field open_id / union_id /
user_id / chat_id / message_id / timestamp_ms format annotations.
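The drop-oldest backpressure mentioned for the bus daemon can be illustrated with a buffered channel. This is a minimal sketch under stated assumptions, not the code in internal/event/bus: the `sendDropOldest` helper and the queue size are hypothetical, and it assumes one producer per channel and a buffer capacity of at least 1.

```go
package main

import "fmt"

// sendDropOldest enqueues v without blocking. When the buffer is full it
// discards the oldest queued item to make room and reports how many items
// were discarded. Hypothetical helper; requires cap(ch) >= 1 and a single
// producer per channel.
func sendDropOldest[T any](ch chan T, v T) (dropped int) {
	for {
		select {
		case ch <- v:
			return dropped
		default:
			// Buffer full: evict the oldest entry, then retry the send.
			select {
			case <-ch:
				dropped++
			default:
				// A consumer drained the channel in between; just retry.
			}
		}
	}
}

func main() {
	ch := make(chan int, 2)
	for i := 1; i <= 4; i++ {
		if n := sendDropOldest(ch, i); n > 0 {
			fmt.Printf("queued %d, dropped %d older event(s)\n", i, n)
		}
	}
	fmt.Println(<-ch, <-ch) // the two newest events survive
}
```

A slow consumer therefore loses the oldest events first, and the producer never blocks on it.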
Robustness
----------
- Bus idle-timer race fix: re-check live conn count under lock before
honoring the tick; Stop+drain before Reset per timer contract.
- Protocol frame cap: replace `br.ReadBytes('\n')` with `ReadFrame` that
rejects frames > MaxFrameBytes (1 MB). Closes a DoS path where any
local peer could grow the reader's buffer unbounded.
- Control-message writes gated by WriteTimeout (5 s) so a wedged peer
kernel buffer can't stall writers indefinitely.
- Consume signal goroutine: `signal.Stop` + `ctx.Done` select, no leak
across repeated invocations in the same process.
- JQ pre-flight compile so bad expressions fail before the bus fork and
any server-side PreConsume side effects.
- `f.NewAPIClient`'s `*core.ConfigError` now passes through unwrapped
so the actionable "run lark-cli config init" hint reaches the user.
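The write-deadline guard described above can be sketched with the standard `net.Conn` deadline API. This is an illustration only: `writeWithDeadline` and `stalledWriteTimesOut` are hypothetical names, the frame content is a placeholder, and `net.Pipe` stands in for the real IPC socket.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"os"
	"time"
)

// writeWithDeadline writes one frame and gives up after timeout, so a peer
// that never drains its end cannot block the writer indefinitely.
func writeWithDeadline(c net.Conn, frame []byte, timeout time.Duration) error {
	if err := c.SetWriteDeadline(time.Now().Add(timeout)); err != nil {
		return err
	}
	defer c.SetWriteDeadline(time.Time{}) // clear the deadline for later writes
	_, err := c.Write(frame)
	return err
}

// stalledWriteTimesOut simulates a wedged peer with net.Pipe: nothing reads
// from the other end, so the write can only finish by hitting the deadline.
func stalledWriteTimesOut() bool {
	a, b := net.Pipe()
	defer a.Close()
	defer b.Close()
	err := writeWithDeadline(a, []byte("placeholder frame\n"), 50*time.Millisecond)
	return errors.Is(err, os.ErrDeadlineExceeded)
}

func main() {
	fmt.Println("write timed out:", stalledWriteTimesOut())
}
```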
Subprocess / AI contract
------------------------
- `event consume` emits `[event] ready event_key=<key>` on stderr once
the bus handshake completes and events will flow. Parent processes
block-read stderr until this line before reading stdout — no `sleep`
fallback needed.
- All list-like commands have `--json` for structured consumption.
- Skill docs in `skills/lark-event/` (SKILL.md + references/) brief AI
agents on the command surface, JQ against Output Schema, bounded
execution, and subprocess lifecycle.
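A parent process can honor the ready-line contract above by scanning the child's stderr before it touches stdout. The sketch below assumes the marker is emitted on a line of its own exactly as quoted; `waitReady` and `readyInText` are hypothetical helpers, and a real caller would pass the reader obtained from `exec.Cmd.StderrPipe`.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// waitReady consumes stderr line by line until the ready marker for key
// appears. It returns false if the stream ends first.
func waitReady(stderr io.Reader, key string) bool {
	want := "[event] ready event_key=" + key
	sc := bufio.NewScanner(stderr)
	for sc.Scan() {
		if strings.TrimSpace(sc.Text()) == want {
			return true
		}
	}
	return false
}

// readyInText runs waitReady over an in-memory log, for demonstration.
func readyInText(log, key string) bool {
	return waitReady(strings.NewReader(log), key)
}

func main() {
	log := "starting bus\n[event] ready event_key=im.message.receive_v1\n"
	fmt.Println(readyInText(log, "im.message.receive_v1"))
	fmt.Println(readyInText("starting bus\n", "im.message.receive_v1"))
}
```

After the marker is seen, the parent should keep draining stderr in the background so the child never blocks on a full pipe.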
Testing
-------
Unit tests across bus/hub, consume loop, protocol codec, dedup,
registry, transport (Unix + Windows), schema reflection, field
overrides, pointer resolver. Integration tests cover fork startup,
shutdown, orphan detection, probe, stdin EOF, preflight, bounded
execution, and Windows busdiscover PowerShell compatibility.
Change-Id: Ib69d6d8409b33b99790081e273d4b5b01b7dbf80
* fix(event): address CodeRabbit findings + lift patch coverage above 60%
CodeRabbit comments (PR #654)
-----------------------------
1. bus/dedup: IsDuplicate dropped legitimate (post-TTL) events after
cleanupExpired fired. The run-every-1000-inserts cleanup removed
TTL-expired IDs from the `seen` map but left them in the ring;
IsDuplicate's ring-scan fallback then rediscovered them and falsely
reported "duplicate", and bus.Publish silently dropped the event.
Removed the ring-scan branch — `seen` is the sole authority, the ring
only bounds map size via overflow eviction. New regression test
TestDedupFilter_TTLExpiryAfterCleanupRunRespected exercises the 10-
insert + cleanup path and guards the fix.
2. consume/remote_preflight: the decoder only read
`data.online_instance_cnt`. A non-zero business code with no data payload decoded to 0 and
callers treated it as "verified zero", forking a local bus that would
duplicate events. Added Code / Msg fields and promoted code != 0 into
an error so the caller distinguishes verified-zero from check-failed.
3. cmd/event/stop: swapped os.ReadDir / os.Stat to vfs.ReadDir / vfs.Stat
in discoverAppIDs per project guideline (enables test mocking). New
TestDiscoverAppIDs_* lifts discoverAppIDs from 0% to 100%.
4. cmd/event/appmeta_err: narrowed authURLPattern from
feishu.cn|feishu.net|larksuite.com|larkoffice.com to the two hosts
consoleScopeGrantURL actually produces. Kept the allowlist pinned to
ResolveEndpoints' output with a comment flagging the synchrony.
5. cmd/event/list: moved "No EventKeys registered." and "Use 'event
schema <key>' for details." hints to stderr so `event list | jq`
style pipelines don't ingest them as data.
6. cmd/event/schema: runSchema is a RunE entry point; swapped the bare
fmt.Errorf on resolveSchemaJSON failure to output.Errorf so AI
agents parse a structured error envelope.
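Finding 2 is easiest to see in code. The sketch below assumes a response shaped as `{code, msg, data: {online_instance_cnt}}`, based only on the fields named above; the struct layout and the `onlineInstances` helper are hypothetical, not the actual remote_preflight decoder.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// connectionResp mirrors only the fields named in the finding; the real
// response may carry more. The layout is an assumption.
type connectionResp struct {
	Code int    `json:"code"`
	Msg  string `json:"msg"`
	Data struct {
		OnlineInstanceCnt int `json:"online_instance_cnt"`
	} `json:"data"`
}

// onlineInstances returns the verified instance count, or an error when the
// check itself failed. A non-zero business code is never reported as "0".
func onlineInstances(body []byte) (int, error) {
	var r connectionResp
	if err := json.Unmarshal(body, &r); err != nil {
		return 0, fmt.Errorf("decode preflight response: %w", err)
	}
	if r.Code != 0 {
		return 0, fmt.Errorf("preflight check failed: code=%d msg=%q", r.Code, r.Msg)
	}
	return r.Data.OnlineInstanceCnt, nil
}

func main() {
	n, err := onlineInstances([]byte(`{"code":0,"msg":"ok","data":{"online_instance_cnt":0}}`))
	fmt.Println(n, err) // verified zero

	_, err = onlineInstances([]byte(`{"code":1,"msg":"denied"}`))
	fmt.Println(err) // check failed, not "zero instances"
}
```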
Coverage bumps (patch ~50% -> ~60%)
-----------------------------------
internal/event/consume/loop_test.go: loop.go was 0% at patch time.
New tests cover consumeLoop end-to-end via net.Pipe (events -> sink,
max-events -> ctx.Done -> PreShutdownCheck/Ack), seq-gap warning,
jq filtering + early compile failure, isTerminalSinkError classifier.
Takes consumeLoop from 0% to ~74%.
internal/event/protocol/messages_test.go: all NewXxx constructors,
Encode/Decode roundtrip per message type, EncodeWithDeadline deadline
enforcement, ReadFrame MaxFrameBytes rejection + EOF propagation.
Takes protocol from 28% to ~86%.
Also bundles small UX polish:
- cmd/event/consume: --output-dir flag doc flags path-traversal behavior;
jq-validation failures now re-wrap with an event-specific hint
pointing at `event schema` for payload shape.
- internal/event/consume.validateParams: error now names the EventKey
and lists valid param names inline so AI callers recover without a
second `event schema` round-trip.
- skills/lark-event: description expanded to mention
listener/subscribe/consume synonyms + the IM scope set explicitly;
lark-event-im reference polished; obsolete lark-event-subscribe
reference removed.
Verified with go test -race -timeout 120s across ./cmd/event/...,
./events/..., ./internal/event/...; gofmt clean; go vet clean.
Change-Id: I3837b8645ea1d7529c9a8fd4c2bbfa965ae1b519
* test(event): cover format helpers + cobra factories
Adds cmd/event/format_helpers_test.go covering the pure output helpers
and factory wire-ups that RunE-level tests would need a live bus to
exercise:
- writeStopJSON: shape assertions + nil → [] (scripts expecting
.results | length must not see null).
- writeStopText: stdout vs stderr routing — stopped / no-bus lines to
stdout, refused / errored lines to stderr.
- busState.String: all three discriminator values.
- humanizeDuration: each bucket boundary (seconds / minutes / hours / days).
- writeStatusText: covers stateNotRunning / stateRunning (with consumer
table) / stateOrphan (with kill hint).
- writeStatusJSON: orphan entry carries suggested_action + issue;
running entry must NOT carry those fields (hint-leak guard for
scripts that key on issue != "").
- exitForOrphan: flag-off never errors; flag-on errors iff any orphan
is present, with ExitValidation code.
- NewCmdConsume / NewCmdStatus / NewCmdStop / NewCmdList / NewCmdBus:
flag registration + RunE presence, so review catches flag-name drift.
NewCmdBus check also pins Hidden=true.
Lifts cmd/event coverage 51.7% → 61.1%; aggregate event-package
coverage crosses the 60% codecov patch threshold (62% locally).
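The nil-to-`[]` guarantee tested for writeStopJSON matters because `encoding/json` encodes a nil slice as `null`. The sketch below shows the normalization in isolation; the `stopResult` fields and the `marshalResults` helper are hypothetical, and only the `results` key comes from the text above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// stopResult is a hypothetical per-bus entry; the field names are
// illustrative only.
type stopResult struct {
	AppID  string `json:"app_id"`
	Status string `json:"status"`
}

// marshalResults normalizes a nil slice to an empty one so scripts that run
// `.results | length` see [] instead of null.
func marshalResults(results []stopResult) (string, error) {
	if results == nil {
		results = []stopResult{}
	}
	b, err := json.Marshal(struct {
		Results []stopResult `json:"results"`
	}{results})
	return string(b), err
}

func main() {
	s, _ := marshalResults(nil)
	fmt.Println(s) // {"results":[]}
}
```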
Change-Id: I9ecf3d905a8f9607b9441ee8a61e746496e2be63
* fix(event): address lint + deadcode CI failures
4 golangci-lint findings + 1 deadcode finding flagged on PR #654.
lint
----
1. cmd/event/stop.go:86 (ineffassign): `targets := []string{}` is
overwritten by both branches of the `if o.all` below, so the empty-
slice initializer is dead. Switched to `var targets []string`.
2. cmd/event/consume.go nilerr: the user-identity scope preflight
swallows a non-nil ResolveToken error and returns nil. This is
intentional — a missing/expired user token must not block consume;
the bus handshake will surface the real auth error with actionable
hints. Added `//nolint:nilerr` with a 4-line comment pinning the
reasoning.
3. events/im/message_receive.go:62 nilerr: malformed JSON payload
returns the original bytes + nil so consumers still see the event
(the WARN breadcrumb lives in the outer loop). Added
`//nolint:nilerr` with a one-line comment.
4. internal/event/schemas/fromtype_test.go:26 unused: `unexportedStr`
is a reflection-test fixture — its presence (not value) exercises
the FromType skip-unexported path verified at the "unexported
field should not be in schema" assertion. Added `//nolint:unused`
and a 4-line comment pointing at the guarded assertion.
deadcode
--------
5. internal/event/testutil/testutil.go: NewTCPFake has no callers in
the repo. Removed the constructor plus the `inner == nil` TCP-mode
branches from Listen / Dial / Cleanup. FakeTransport now only
supports the wrapped-overlay mode (NewWrappedFake), which is the
one every existing test uses. Doc comment simplified accordingly.
Verified locally: go test -race -timeout 120s across ./cmd/event/...,
./events/..., ./internal/event/... all green; gofmt clean; go vet
clean.
Change-Id: Ie8a2270827a0bde6b8159ab70aaf5c1e9ca7d5b9
* fix(event): drop stale enum + simplify protocol test type helper
- events/im/message_receive.go: dropped the `enum` tag on
ImMessageReceiveOutput.MessageType. convertlib registers many more
message types than the old 11-item list (video / location /
calendar / todo / vote / hongbao / merge_forward / folder / ...),
so a partial enum would tell AI consumers that valid values like
"video" are invalid and produce false-negative JQ filters.
- internal/event/protocol/messages_test.go: collapsed the
typeOf → reflectTypeName → stringType chain in
TestEncode_DecodeRoundtripAllTypes to a single fmt.Sprintf("%T", v).
The hand-maintained type switch silently returned "<unknown>" for
any new message type, which would have let future Decode bugs slip
past the roundtrip assertion. Also removed a dead `cases` table at
the top of TestConstructors_PinTypeField left over from an earlier
refactor.
Change-Id: I831e96f8417e80637596030d652a559de0d33122
* docs(event): polish skill docs + rename root_path_hint to jq_root_path
- skills/lark-event/SKILL.md, lark-event-im.md: translated to English,
reorganized around a top-level "Core commands" table, scenario
recipes tightened.
- cmd/event/schema.go: renamed the writeSchemaJSON hint field
RootPathHint / "root_path_hint" -> JQRootPath / "jq_root_path" to
make its purpose (a jq path prefix) obvious at the call site; no
external consumer depends on the old name yet.
Change-Id: I00c14061ca33caedc0975bfeadc4b26d3dcd314d
* chore(event): strip excessive comments
Change-Id: I8f44f36f5dbdba3ef95dfc67069dc796232f91ec
* fix(event): dedup self-eviction race + protocol oversized-frame test
dedup: in IsDuplicate, the ring-slot eviction step deleted seen[id] even
when ring[pos] equalled the freshly-recorded id (post-TTL reinsertion
landing on its own historical slot). Net result: ring still held id but
seen did not, so the next IsDuplicate(id) returned false and the
duplicate was delivered. Skip the delete when old == eventID. New
TestDedupFilter_SelfEvictionPreservesFreshEntry pins the invariant by
pre-loading the ring slot and asserting the second call still reports
duplicate.
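The self-eviction guard can be sketched as follows. This is a simplified model written for illustration, not the bus/dedup implementation: the type, field names, and explicit `now` parameter are assumptions, and periodic TTL cleanup is omitted.

```go
package main

import (
	"fmt"
	"time"
)

// dedupFilter is a simplified model: seen is the sole authority on
// duplicates, and the ring only bounds how many ids the map can hold.
type dedupFilter struct {
	seen map[string]time.Time
	ring []string
	pos  int
	ttl  time.Duration
}

func newDedupFilter(size int, ttl time.Duration) *dedupFilter {
	return &dedupFilter{seen: make(map[string]time.Time), ring: make([]string, size), ttl: ttl}
}

// isDuplicate reports whether id was recorded within the TTL; otherwise it
// records id and evicts whatever previously occupied the current ring slot.
func (d *dedupFilter) isDuplicate(id string, now time.Time) bool {
	if t, ok := d.seen[id]; ok && now.Sub(t) < d.ttl {
		return true
	}
	d.seen[id] = now
	// Skip the delete when the slot already holds id (a post-TTL
	// reinsertion landing on its own old slot); deleting here would
	// erase the entry recorded one line above.
	if old := d.ring[d.pos]; old != "" && old != id {
		delete(d.seen, old)
	}
	d.ring[d.pos] = id
	d.pos = (d.pos + 1) % len(d.ring)
	return false
}

// selfEvictionDemo replays the scenario with a one-slot ring.
func selfEvictionDemo() (first, afterTTL, again bool) {
	d := newDedupFilter(1, time.Minute)
	t0 := time.Unix(0, 0)
	first = d.isDuplicate("evt-1", t0)                       // new id
	afterTTL = d.isDuplicate("evt-1", t0.Add(2*time.Minute)) // TTL expired, recorded again
	again = d.isDuplicate("evt-1", t0.Add(2*time.Minute+time.Second))
	return first, afterTTL, again
}

func main() {
	fmt.Println(selfEvictionDemo()) // false false true
}
```

Without the `old != id` check, the third call would return false and the duplicate would be delivered.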
protocol: TestReadFrame_RejectsOversized used strings.Contains feeding
t.Logf, so any non-nil error passed — including a future regression
that returned io.ErrUnexpectedEOF while silently keeping the buffer
unbounded. Promoted MaxFrameBytes overflow to a sentinel
ErrFrameTooLarge and the test now asserts via errors.Is.
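A capped frame reader with a sentinel error might look like the sketch below. It is one possible shape under stated assumptions, not the actual ReadFrame: the lowercase names are hypothetical, and the chunked `ReadSlice` loop is just one way to keep buffering bounded.

```go
package main

import (
	"bufio"
	"errors"
	"fmt"
	"strings"
)

// errFrameTooLarge plays the role of the sentinel described above.
var errFrameTooLarge = errors.New("frame too large")

// readFrame reads one newline-terminated frame but never accumulates more
// than limit bytes, regardless of how long the peer's line is.
func readFrame(br *bufio.Reader, limit int) ([]byte, error) {
	var frame []byte
	for {
		chunk, err := br.ReadSlice('\n')
		if len(frame)+len(chunk) > limit {
			return nil, fmt.Errorf("read frame (limit %d bytes): %w", limit, errFrameTooLarge)
		}
		frame = append(frame, chunk...) // copy: chunk is only valid until the next read
		if err == nil {
			return frame, nil
		}
		if !errors.Is(err, bufio.ErrBufferFull) {
			return nil, err // io.EOF or a transport error
		}
		// ErrBufferFull: the line continues past the reader's buffer; keep going.
	}
}

// oversized reads one frame from input through a deliberately small buffer
// and reports whether it was rejected with the sentinel.
func oversized(input string, limit int) bool {
	br := bufio.NewReaderSize(strings.NewReader(input), 16)
	_, err := readFrame(br, limit)
	return errors.Is(err, errFrameTooLarge)
}

func main() {
	fmt.Println(oversized("{\"ok\":true}\n", 64))              // false
	fmt.Println(oversized(strings.Repeat("x", 100)+"\n", 64)) // true
}
```

Because the test matches with `errors.Is`, an unrelated error such as `io.ErrUnexpectedEOF` no longer passes as a size rejection.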
Change-Id: I50281dad392152b0ca083fd30c38eb0695e63bd3
* docs(event): clarify .content shape per message_type + add sender filter recipe
Change-Id: I619fd15c1a362e42e6602fd3e3316bbc75eddc5e
* fix(event): replace cmdline-regex bus discovery with PID file + close concurrent fork race
Bus discovery previously walked the OS process table and parsed `--profile cli_*` from
cmdline; the regex rejected any non-cli_ profile name (D-03a). Replace with per-AppID
bus.pid + bus.alive.lock under events/<AppID>/, probed via try-lock. AppID round-trips
through the directory name, so the profile-vs-AppID confusion is gone by construction.
Also fix B-07 (two consumers each fork an independent bus, halving event delivery):
- forkBus holds bus.fork.lock until child is dial-able, not just until cmd.Start
- bus daemon takes alive.lock before binding the socket; cleanup-TOCTOU race can no
longer leave two listeners on different inodes
status.go renders an orphan with PID=0 distinctly (live bus but pid file unreadable)
so we never print "Action: kill 0".
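The try-lock liveness probe can be sketched on Unix with `flock(2)`. This is an assumption-laden illustration: the commit does not say which locking primitive is used, `tryLock` and `probeDemo` are hypothetical, and the code compiles only on Unix-like systems.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"syscall"
)

// tryLock attempts a non-blocking exclusive flock on path. held=true means
// some other open file already holds the lock, i.e. its owner is alive.
// On success the caller owns the lock until it closes f.
func tryLock(path string) (f *os.File, held bool, err error) {
	f, err = os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0o600)
	if err != nil {
		return nil, false, err
	}
	err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
	if err == nil {
		return f, false, nil
	}
	f.Close()
	if errors.Is(err, syscall.EWOULDBLOCK) {
		return nil, true, nil
	}
	return nil, false, err
}

// probeDemo plays both roles in one process: a "daemon" that holds the
// lock, and a probe that checks it before and after the daemon exits.
func probeDemo() (heldWhileRunning, heldAfterExit bool, err error) {
	dir, err := os.MkdirTemp("", "bus-demo")
	if err != nil {
		return false, false, err
	}
	defer os.RemoveAll(dir)
	lockPath := filepath.Join(dir, "bus.alive.lock")

	daemon, _, err := tryLock(lockPath)
	if err != nil || daemon == nil {
		return false, false, errors.New("could not take the initial lock")
	}
	if _, heldWhileRunning, err = tryLock(lockPath); err != nil {
		return false, false, err
	}
	daemon.Close() // the kernel releases the lock when its holder goes away

	probe, heldAfterExit, err := tryLock(lockPath)
	if probe != nil {
		probe.Close()
	}
	return heldWhileRunning, heldAfterExit, err
}

func main() {
	fmt.Println(probeDemo())
}
```

The kernel drops the lock when the holder exits, so a crashed daemon cannot leave a stale "alive" marker the way a leftover socket file can.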
Change-Id: I3bf0a6cf1d91fb274ac5a6df83d66896aafb291f
* style(event): gofmt bus.go
Trailing blank line introduced when appending acquireAliveLock helper.
Change-Id: I4ae1b4a4363dc6c89dcbd6a170f4563117490ba3
* fix(event): swap os.Remove/Rename for vfs.* and silence forbidigo on internal diagnostics
golangci-lint forbidigo blocks os.* in internal/. Switch the pid-file write to vfs.Remove/vfs.Rename and add a nolint marker on the two stderr diagnostics in busdiscover, matching the existing pattern in consume/*.
Change-Id: Ia6768be62aefeb8ca40f991d3130a78ef2ec0ea5
* fix(event): cross-platform --all + clean SIGPIPE shutdown for consume
- stop --all: replace bus.sock-file probe with busdiscover lock-based
scan; previously skipped Windows entirely (named-pipe transport, no
socket on disk) and misidentified Unix stale sockets as live. Same
win for `event status` (shares discoverAppIDs).
- consume: ignore SIGPIPE so a closed stdout pipe (e.g. `... | head -n 1`)
surfaces as EPIPE error and reaches the existing isTerminalSinkError
cleanup path (log "output pipe closed", lastForKey query, hub
unregister), instead of being killed by Go's default fd 1/2 SIGPIPE
handler with exit 141 and zero deferred cleanup.
Build-tagged: real on unix, no-op on windows (no SIGPIPE there).
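The effect of ignoring SIGPIPE can be sketched as below (Unix only). `isPipeClosed` and `writeToClosedPipe` are hypothetical helpers; the demo uses an `os.Pipe` whose read end is closed to stand in for a closed stdout pipe such as `... | head -n 1`.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

// isPipeClosed reports whether err means the reader side of a pipe is gone.
func isPipeClosed(err error) bool {
	return errors.Is(err, syscall.EPIPE)
}

// writeToClosedPipe writes into a pipe whose read end is already closed and
// returns the resulting write error.
func writeToClosedPipe() error {
	r, w, err := os.Pipe()
	if err != nil {
		return err
	}
	defer w.Close()
	r.Close()
	_, err = w.Write([]byte("event\n"))
	return err
}

func main() {
	// With SIGPIPE ignored, a broken stdout or stderr also surfaces as an
	// EPIPE write error instead of terminating the process, so deferred
	// cleanup still runs.
	signal.Ignore(syscall.SIGPIPE)

	err := writeToClosedPipe()
	fmt.Println("pipe closed:", isPipeClosed(err))
}
```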
Change-Id: I453b19f05c489fd9d5c1a9ba3bdc35e127c15b83
* docs(event): translate IM EventKey descriptions and field tags to English
Aligns with the rest of the codebase (titles, struct names, README) which
are already in English. Surfaces in `event list` / `event schema` and is
also consumed by AI agents.
- events/im/message_receive.go: 11 desc tags on ImMessageReceiveOutput
- events/im/native.go: 10 description fields on Native EventKeys
- events/im/register.go: im.message.receive_v1 Description
Change-Id: I6f46950b4793f137e0129c1f06019a3419195443
* docs(event): drop misleading AuthTypes[0] auto-default claim
The KeyDefinition comment and SKILL.md flag table both stated that
`--as auto` resolves to `AuthTypes[0]`. It does not — ResolveAs goes
through global rules (config default_as / credential hint / `bot`
fallback) without consulting the EventKey. AuthTypes is only used by
CheckIdentity as a post-resolve whitelist.
Reword the field comment to plain whitelist semantics and have SKILL.md
defer `--as` documentation to lark-shared.
Change-Id: Ia5d3d3790aed05813a0fa72d6b43518224e2055b
* revert(comments): restore original comments on 3rd-party files
e61482a stripped comments across 105 files. Restore the four files
authored by others (cmd/build.go, shortcuts/common/{types,runner}.go,
shortcuts/event/subscribe.go) to their pre-strip state so unrelated
documentation isn't churned in this PR.
Change-Id: Ie2527b06bfaf5b3861b0b9dff1e19bbfe7dde456
169 lines
4.7 KiB
Go
// Copyright (c) 2026 Lark Technologies Pte. Ltd.
// SPDX-License-Identifier: MIT

package source

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"regexp"
	"strings"
	"time"

	larkcore "github.com/larksuite/oapi-sdk-go/v3/core"
	larkevent "github.com/larksuite/oapi-sdk-go/v3/event"
	"github.com/larksuite/oapi-sdk-go/v3/event/dispatcher"
	larkws "github.com/larksuite/oapi-sdk-go/v3/ws"

	"github.com/larksuite/cli/internal/event"
	"github.com/larksuite/cli/internal/event/protocol"
)

const maxEventBodyBytes = 1 << 20 // bound per-subscriber sendCh memory under runaway payloads

type FeishuSource struct {
	AppID     string
	AppSecret string
	Domain    string
	Logger    *log.Logger
}

func (s *FeishuSource) Name() string { return "feishu-websocket" }

func (s *FeishuSource) Start(ctx context.Context, eventTypes []string, emit func(*event.RawEvent), notify StatusNotifier) error {
	d := dispatcher.NewEventDispatcher("", "")

	rawHandler := s.buildRawHandler(emit)

	for _, et := range eventTypes {
		d.OnCustomizedEvent(et, rawHandler)
	}

	opts := []larkws.ClientOption{larkws.WithEventHandler(d)}
	if s.Domain != "" {
		opts = append(opts, larkws.WithDomain(s.Domain))
	}
	if s.Logger != nil || notify != nil {
		opts = append(opts, larkws.WithLogLevel(larkcore.LogLevelInfo))
		opts = append(opts, larkws.WithLogger(&sdkLogger{l: s.Logger, notify: notify}))
	}

	if notify != nil {
		notify(protocol.SourceStateConnecting, "")
	}
	cli := larkws.NewClient(s.AppID, s.AppSecret, opts...)

	errCh := make(chan error, 1)
	go func() { errCh <- cli.Start(ctx) }()

	select {
	case <-ctx.Done():
		return ctx.Err()
	case err := <-errCh:
		return err
	}
}

// buildRawHandler is extracted from Start so unit tests can exercise it without a WS client.
func (s *FeishuSource) buildRawHandler(emit func(*event.RawEvent)) func(context.Context, *larkevent.EventReq) error {
	return func(_ context.Context, e *larkevent.EventReq) error {
		if e.Body == nil {
			return nil
		}
		if len(e.Body) > maxEventBodyBytes {
			if s.Logger != nil {
				s.Logger.Printf("[feishu] drop oversized event: %d bytes > cap %d", len(e.Body), maxEventBodyBytes)
			}
			return nil
		}
		var envelope struct {
			Header struct {
				EventID    string `json:"event_id"`
				EventType  string `json:"event_type"`
				CreateTime string `json:"create_time"`
			} `json:"header"`
		}
		if err := json.Unmarshal(e.Body, &envelope); err != nil {
			if s.Logger != nil {
				preview := string(e.Body)
				if len(preview) > 200 {
					preview = preview[:200] + "...(truncated)"
				}
				s.Logger.Printf("[feishu] drop malformed event: unmarshal error: %v body=%s", err, preview)
			}
			return nil
		}
		if envelope.Header.EventID == "" || envelope.Header.EventType == "" {
			if s.Logger != nil {
				s.Logger.Printf("[feishu] drop event missing header fields: event_id=%q event_type=%q",
					envelope.Header.EventID, envelope.Header.EventType)
			}
			return nil
		}
		emit(&event.RawEvent{
			EventID:    envelope.Header.EventID,
			EventType:  envelope.Header.EventType,
			SourceTime: envelope.Header.CreateTime,
			Payload:    json.RawMessage(e.Body),
			Timestamp:  time.Now(),
		})
		return nil
	}
}

// sdkLogger forwards every SDK line to bus.log; lifecycle lines also fire notify.
type sdkLogger struct {
	l      *log.Logger
	notify StatusNotifier
}

func (a *sdkLogger) Debug(_ context.Context, _ ...interface{}) {}
func (a *sdkLogger) Info(_ context.Context, args ...interface{}) {
	msg := fmt.Sprint(args...)
	if a.l != nil {
		a.l.Output(2, "[SDK] "+msg)
	}
	a.tryNotify(msg, "")
}
func (a *sdkLogger) Warn(_ context.Context, args ...interface{}) {
	msg := fmt.Sprint(args...)
	if a.l != nil {
		a.l.Output(2, "[SDK WARN] "+msg)
	}
	a.tryNotify(msg, "")
}
func (a *sdkLogger) Error(_ context.Context, args ...interface{}) {
	msg := fmt.Sprint(args...)
	if a.l != nil {
		a.l.Output(2, "[SDK ERROR] "+msg)
	}
	// Errors usually manifest as disconnects; pass msg as detail.
	a.tryNotify(msg, msg)
}

var reconnectAttemptRe = regexp.MustCompile(`reconnect:?\s*(\d+)`)

// tryNotify uses HasPrefix (not Contains): "connected to" matches inside "disconnected to" otherwise.
func (a *sdkLogger) tryNotify(msg, errDetail string) {
	if a.notify == nil {
		return
	}
	lower := strings.ToLower(msg)
	switch {
	case strings.HasPrefix(lower, sdkLogReconnecting):
		detail := ""
		if m := reconnectAttemptRe.FindStringSubmatch(lower); len(m) == 2 {
			detail = "attempt " + m[1]
		}
		a.notify(protocol.SourceStateReconnecting, detail)
	case strings.HasPrefix(lower, sdkLogDisconnected):
		a.notify(protocol.SourceStateDisconnected, errDetail)
	case strings.HasPrefix(lower, sdkLogConnected):
		a.notify(protocol.SourceStateConnected, "")
	}
}

var _ larkcore.Logger = (*sdkLogger)(nil)