larksuite-cli/shortcuts/event/subscribe.go
liuxinyanglxy 4d4508dfd7 feat(event): add event subscription & consume system (#654)
* feat(event): add event subscription & consume system with orphan bus detection

Introduces end-to-end Feishu event consumption via a new `lark-cli event`
command family. Users can subscribe to and consume real-time events
(IM messages, chat/member lifecycle, reactions, ...) in a forked bus
daemon architecture with orphan detection, reflected + overrideable JSON
schemas, and AI-friendly `--json` / `--jq` output.

Commands
--------
- `event list [--json]`      list subscribable EventKeys
- `event schema <key>`       Parameters + Output Schema + auth info
- `event consume <key>`      foreground blocking consume; SIGINT/SIGTERM
                             /stdin-EOF shutdown; `--max-events` /
                             `--timeout` bounded; `--jq` projection;
                             `--output-dir` spool; `--param` KV inputs
- `event status [--fail-on-orphan] [--json]`   bus daemon health
- `event stop [--all] [--force] [--json]`      stop bus daemon(s)
- `event _bus` (hidden)      forked daemon entrypoint

Architecture
------------
- Bus daemon (internal/event/bus): per-AppID forked process that holds
  the Feishu long-poll connection and fans events out to 1..N local
  consumers over an IPC socket. Drop-oldest backpressure, TOCTOU-safe
  cleanup via AcquireCleanupLock, idle-timeout self-shutdown, graceful
  SIGTERM.
- Consume client (internal/event/consume): fork+dial the daemon,
  handshake, remote preflight (HTTP /open-apis/event/v1/connection),
  JQ projection, sequence-gap detection, health probe. Bounded
  execution (`--max-events` / `--timeout`) for AI/script usage.
- Wire protocol (internal/event/protocol): newline-delimited JSON
  frames with 1 MB size cap and 5 s write deadlines. Hello / HelloAck /
  PreShutdownCheck / Shutdown / StatusQuery control messages.
- Orphan detection (internal/event/busdiscover): OS process-table scan
  (ps on Unix, PowerShell on Windows) with two-gate cmdline filter
  (lark-cli + event _bus) that naturally rejects pid-reused unrelated
  processes.
- Transport (internal/event/transport): Unix socket on darwin/linux,
  Windows named pipe on windows.
- Schema system (internal/event, internal/event/schemas): SchemaDef with
  mutually-exclusive Native (framework wraps V2 envelope) or Custom
  (zero-touch) specs. Reflection reads `desc` / `enum` / `kind` struct
  tags, with array elements diving into `items`. FieldOverrides overlay
  engine addresses paths via JSON Pointer (including `/*` array
  wildcard) and runs post-reflect, post-envelope. Lint guards orphan
  override paths.
- IM events (events/im): 11 keys — receive / read / recalled, chat and
  member lifecycle, reactions — all with per-field open_id / union_id /
  user_id / chat_id / message_id / timestamp_ms format annotations.
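
A minimal sketch of the drop-oldest backpressure named in the bus daemon bullet, assuming each consumer has one bounded channel with capacity of at least 1 and a single producer. `sendDropOldest` and the string event type are hypothetical; the real hub may be structured differently.

```go
package main

import "fmt"

// sendDropOldest is a hypothetical helper: it enqueues ev on a bounded
// per-consumer queue and, when the queue is full, discards the oldest
// queued event to make room. It returns how many events were discarded.
// Assumes cap(queue) >= 1 and a single producer per queue.
func sendDropOldest(queue chan string, ev string) (dropped int) {
	for {
		select {
		case queue <- ev:
			return dropped
		default:
		}
		// Queue is full: drop the oldest entry, then retry the send.
		select {
		case <-queue:
			dropped++
		default:
			// The consumer drained the queue in the meantime; just retry.
		}
	}
}

func main() {
	queue := make(chan string, 2)
	for _, ev := range []string{"e1", "e2", "e3"} {
		if n := sendDropOldest(queue, ev); n > 0 {
			fmt.Printf("dropped %d event(s) while enqueuing %s\n", n, ev)
		}
	}
	close(queue)
	for ev := range queue {
		fmt.Println("delivered:", ev)
	}
}
```

With a capacity of 2, enqueuing `e3` discards `e1`, so a slow consumer sees `e2` and `e3`. A consumer that tracks sequence numbers can then report the gap.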

Robustness
----------
- Bus idle-timer race fix: re-check live conn count under lock before
  honoring the tick; Stop+drain before Reset per timer contract.
- Protocol frame cap: replace `br.ReadBytes('\n')` with `ReadFrame` that
  rejects frames > MaxFrameBytes (1 MB). Closes a DoS path where any
  local peer could grow the reader's buffer unbounded.
- Control-message writes gated by WriteTimeout (5 s) so a wedged peer
  kernel buffer can't stall writers indefinitely.
- Consume signal goroutine: `signal.Stop` + `ctx.Done` select, no leak
  across repeated invocations in the same process.
- JQ pre-flight compile so bad expressions fail before the bus fork and
  any server-side PreConsume side effects.
- `f.NewAPIClient`'s `*core.ConfigError` now passes through unwrapped
  so the actionable "run lark-cli config init" hint reaches the user.

Subprocess / AI contract
------------------------
- `event consume` emits `[event] ready event_key=<key>` on stderr once
  the bus handshake completes and events will flow. Parent processes
  block-read stderr until this line before reading stdout — no `sleep`
  fallback needed.
- All list-like commands have `--json` for structured consumption.
- Skill docs in `skills/lark-event/` (SKILL.md + references/) brief AI
  agents on the command surface, JQ against Output Schema, bounded
  execution, and subprocess lifecycle.

Testing
-------
Unit tests across bus/hub, consume loop, protocol codec, dedup,
registry, transport (Unix + Windows), schema reflection, field
overrides, pointer resolver. Integration tests cover fork startup,
shutdown, orphan detection, probe, stdin EOF, preflight, bounded
execution, and Windows busdiscover PowerShell compatibility.

Change-Id: Ib69d6d8409b33b99790081e273d4b5b01b7dbf80

* fix(event): address CodeRabbit findings + lift patch coverage above 60%

CodeRabbit comments (PR #654)
-----------------------------
1. bus/dedup: IsDuplicate dropped legitimate (post-TTL) events after
   cleanupExpired fired. The run-every-1000-inserts cleanup removed
   TTL-expired IDs from the `seen` map but left them in the ring;
   IsDuplicate's ring-scan fallback then rediscovered them and falsely
   reported "duplicate", and bus.Publish silently dropped the event.
   Removed the ring-scan branch — `seen` is the sole authority, the ring
   only bounds map size via overflow eviction. New regression test
   TestDedupFilter_TTLExpiryAfterCleanupRunRespected exercises the 10-
   insert + cleanup path and guards the fix.

2. consume/remote_preflight: the decoder only read
   `data.online_instance_cnt`. A non-zero business code with no data
   payload decoded to 0 and callers treated it as "verified zero",
   forking a local bus that would duplicate events. Added Code / Msg
   fields and promoted code != 0 into an error so the caller
   distinguishes verified-zero from check-failed.

3. cmd/event/stop: swapped os.ReadDir / os.Stat to vfs.ReadDir / vfs.Stat
   in discoverAppIDs per project guideline (enables test mocking). New
   TestDiscoverAppIDs_* lifts discoverAppIDs from 0% to 100%.

4. cmd/event/appmeta_err: narrowed authURLPattern from
   feishu.cn|feishu.net|larksuite.com|larkoffice.com to the two hosts
   consoleScopeGrantURL actually produces. Kept the allowlist pinned to
   ResolveEndpoints' output with a comment flagging the synchrony.

5. cmd/event/list: moved "No EventKeys registered." and "Use 'event
   schema <key>' for details." hints to stderr so `event list | jq`
   style pipelines don't ingest them as data.

6. cmd/event/schema: runSchema is a RunE entry point; swapped the bare
   fmt.Errorf on resolveSchemaJSON failure to output.Errorf so AI
   agents parse a structured error envelope.
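
Item 2 can be illustrated with a small decoder. The JSON names `code` and `msg` are assumed from the usual OpenAPI response envelope, and `preflightResponse` / `onlineInstances` are hypothetical names; only `data.online_instance_cnt` is taken from the text above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// preflightResponse models the assumed response envelope.
type preflightResponse struct {
	Code int    `json:"code"`
	Msg  string `json:"msg"`
	Data struct {
		OnlineInstanceCnt int `json:"online_instance_cnt"`
	} `json:"data"`
}

// onlineInstances returns the verified instance count. A non-zero business
// code becomes an error, so "check failed" is never reported as a count of 0.
func onlineInstances(body []byte) (int, error) {
	var resp preflightResponse
	if err := json.Unmarshal(body, &resp); err != nil {
		return 0, fmt.Errorf("decode preflight response: %w", err)
	}
	if resp.Code != 0 {
		return 0, fmt.Errorf("preflight check failed: code=%d msg=%q", resp.Code, resp.Msg)
	}
	return resp.Data.OnlineInstanceCnt, nil
}

func main() {
	bodies := []string{
		`{"code":0,"msg":"ok","data":{"online_instance_cnt":0}}`,
		`{"code":1,"msg":"example failure"}`,
	}
	for _, body := range bodies {
		n, err := onlineInstances([]byte(body))
		fmt.Println(n, err)
	}
}
```

Only the first body yields a verified zero. The second returns an error, which lets the caller avoid forking a bus on an unverified result.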

Coverage bumps (patch ~50% -> ~60%)
-----------------------------------
internal/event/consume/loop_test.go: loop.go was 0% at patch time.
New tests cover consumeLoop end-to-end via net.Pipe (events -> sink,
max-events -> ctx.Done -> PreShutdownCheck/Ack), seq-gap warning,
jq filtering + early compile failure, isTerminalSinkError classifier.
Takes consumeLoop from 0% to ~74%.

internal/event/protocol/messages_test.go: all NewXxx constructors,
Encode/Decode roundtrip per message type, EncodeWithDeadline deadline
enforcement, ReadFrame MaxFrameBytes rejection + EOF propagation.
Takes protocol from 28% to ~86%.

Also bundles small UX polish:
- cmd/event/consume: --output-dir flag help now calls out path-traversal behavior;
  jq-validation failures now re-wrap with an event-specific hint
  pointing at `event schema` for payload shape.
- internal/event/consume.validateParams: error now names the EventKey
  and lists valid param names inline so AI callers recover without a
  second `event schema` round-trip.
- skills/lark-event: description expanded to mention
  listener/subscribe/consume synonyms + the IM scope set explicitly;
  lark-event-im reference polished; obsolete lark-event-subscribe
  reference removed.

Verified with go test -race -timeout 120s across ./cmd/event/...,
./events/..., ./internal/event/...; gofmt clean; go vet clean.

Change-Id: I3837b8645ea1d7529c9a8fd4c2bbfa965ae1b519

* test(event): cover format helpers + cobra factories

Adds cmd/event/format_helpers_test.go covering the pure output helpers
and factory wire-ups that RunE-level tests would need a live bus to
exercise:

- writeStopJSON: shape assertions + nil → [] (scripts expecting
  .results | length must not see null).
- writeStopText: stdout vs stderr routing — stopped / no-bus lines to
  stdout, refused / errored lines to stderr.
- busState.String: all three discriminator values.
- humanizeDuration: each bucket boundary (seconds / minutes / hours / days).
- writeStatusText: covers stateNotRunning / stateRunning (with consumer
  table) / stateOrphan (with kill hint).
- writeStatusJSON: orphan entry carries suggested_action + issue;
  running entry must NOT carry those fields (hint-leak guard for
  scripts that key on issue != "").
- exitForOrphan: flag-off never errors; flag-on errors iff any orphan
  is present, with ExitValidation code.
- NewCmdConsume / NewCmdStatus / NewCmdStop / NewCmdList / NewCmdBus:
  flag registration + RunE presence, so review catches flag-name drift.
  NewCmdBus check also pins Hidden=true.

Lifts cmd/event coverage 51.7% → 61.1%; aggregate event-package
coverage crosses the 60% codecov patch threshold (62% locally).

Change-Id: I9ecf3d905a8f9607b9441ee8a61e746496e2be63

* fix(event): address lint + deadcode CI failures

4 golangci-lint findings + 1 deadcode finding flagged on PR #654.

lint
----
1. cmd/event/stop.go:86 (ineffassign): `targets := []string{}` is
   overwritten by both branches of the `if o.all` below, so the empty-
   slice initializer is dead. Switched to `var targets []string`.
2. cmd/event/consume.go nilerr: the user-identity scope preflight
   swallows a non-nil ResolveToken error and returns nil. This is
   intentional — a missing/expired user token must not block consume;
   the bus handshake will surface the real auth error with actionable
   hints. Added `//nolint:nilerr` with a 4-line comment pinning the
   reasoning.
3. events/im/message_receive.go:62 nilerr: malformed JSON payload
   returns the original bytes + nil so consumers still see the event
   (the WARN breadcrumb lives in the outer loop). Added
   `//nolint:nilerr` with a one-line comment.
4. internal/event/schemas/fromtype_test.go:26 unused: `unexportedStr`
   is a reflection-test fixture — its presence (not value) exercises
   the FromType skip-unexported path verified at the "unexported
   field should not be in schema" assertion. Added `//nolint:unused`
   and a 4-line comment pointing at the guarded assertion.

deadcode
--------
5. internal/event/testutil/testutil.go: NewTCPFake has no callers in
   the repo. Removed the constructor plus the `inner == nil` TCP-mode
   branches from Listen / Dial / Cleanup. FakeTransport now only
   supports the wrapped-overlay mode (NewWrappedFake), which is the
   one every existing test uses. Doc comment simplified accordingly.

Verified locally: go test -race -timeout 120s across ./cmd/event/...,
./events/..., ./internal/event/... all green; gofmt clean; go vet
clean.

Change-Id: Ie8a2270827a0bde6b8159ab70aaf5c1e9ca7d5b9

* fix(event): drop stale enum + simplify protocol test type helper

- events/im/message_receive.go: dropped the `enum` tag on
  ImMessageReceiveOutput.MessageType. convertlib registers many more
  message types than the old 11-item list (video / location /
  calendar / todo / vote / hongbao / merge_forward / folder / ...),
  so a partial enum would tell AI consumers that valid values like
  "video" are invalid and produce false-negative JQ filters.

- internal/event/protocol/messages_test.go: collapsed the
  typeOf → reflectTypeName → stringType chain in
  TestEncode_DecodeRoundtripAllTypes to a single fmt.Sprintf("%T", v).
  The hand-maintained type switch silently returned "<unknown>" for
  any new message type, which would have let future Decode bugs slip
  past the roundtrip assertion. Also removed a dead `cases` table at
  the top of TestConstructors_PinTypeField left over from an earlier
  refactor.

Change-Id: I831e96f8417e80637596030d652a559de0d33122

* docs(event): polish skill docs + rename root_path_hint to jq_root_path

- skills/lark-event/SKILL.md, lark-event-im.md: translated to English,
  reorganized around a top-level "Core commands" table, scenario
  recipes tightened.
- cmd/event/schema.go: renamed the writeSchemaJSON hint field
  RootPathHint / "root_path_hint" -> JQRootPath / "jq_root_path" to
  make its purpose (a jq path prefix) obvious at the call site; no
  external consumer depends on the old name yet.

Change-Id: I00c14061ca33caedc0975bfeadc4b26d3dcd314d

* chore(event): strip excessive comments

Change-Id: I8f44f36f5dbdba3ef95dfc67069dc796232f91ec

* fix(event): dedup self-eviction race + protocol oversized-frame test

dedup: in IsDuplicate, the ring-slot eviction step deleted seen[id] even
when ring[pos] equalled the freshly-recorded id (post-TTL reinsertion
landing on its own historical slot). Net result: ring still held id but
seen did not, so the next IsDuplicate(id) returned false and the
duplicate was delivered. Skip the delete when old == eventID. New
TestDedupFilter_SelfEvictionPreservesFreshEntry pins the invariant by
pre-loading the ring slot and asserting the second call still reports
duplicate.

protocol: TestReadFrame_RejectsOversized used strings.Contains feeding
t.Logf, so any non-nil error passed — including a future regression
that returned io.ErrUnexpectedEOF while silently keeping the buffer
unbounded. Promoted MaxFrameBytes overflow to a sentinel
ErrFrameTooLarge and the test now asserts via errors.Is.

Change-Id: I50281dad392152b0ca083fd30c38eb0695e63bd3

* docs(event): clarify .content shape per message_type + add sender filter recipe

Change-Id: I619fd15c1a362e42e6602fd3e3316bbc75eddc5e

* fix(event): replace cmdline-regex bus discovery with PID file + close concurrent fork race

Bus discovery previously walked the OS process table and parsed `--profile cli_*` from
cmdline; the regex rejected any non-cli_ profile name (D-03a). Replace with per-AppID
bus.pid + bus.alive.lock under events/<AppID>/, probed via try-lock. AppID round-trips
through the directory name, so the profile-vs-AppID confusion is gone by construction.

Also fix B-07 (two consumers each fork an independent bus, halving event delivery):
- forkBus holds bus.fork.lock until child is dial-able, not just until cmd.Start
- bus daemon takes alive.lock before binding the socket; cleanup-TOCTOU race can no
  longer leave two listeners on different inodes

status.go renders an orphan with PID=0 distinctly (live bus but pid file unreadable)
so we never print "Action: kill 0".

Change-Id: I3bf0a6cf1d91fb274ac5a6df83d66896aafb291f

* style(event): gofmt bus.go

Trailing blank line introduced when appending acquireAliveLock helper.

Change-Id: I4ae1b4a4363dc6c89dcbd6a170f4563117490ba3

* fix(event): swap os.Remove/Rename for vfs.* and silence forbidigo on internal diagnostics

golangci-lint forbidigo blocks os.* in internal/. Switch the pid-file write to vfs.Remove/vfs.Rename and add a nolint marker on the two stderr diagnostics in busdiscover, matching the existing pattern in consume/*.

Change-Id: Ia6768be62aefeb8ca40f991d3130a78ef2ec0ea5

* fix(event): cross-platform --all + clean SIGPIPE shutdown for consume

- stop --all: replace bus.sock-file probe with busdiscover lock-based
  scan; previously skipped Windows entirely (named-pipe transport, no
  socket on disk) and misidentified Unix stale sockets as live. Same
  win for `event status` (shares discoverAppIDs).

- consume: ignore SIGPIPE so a closed stdout pipe (e.g. `... | head -n 1`)
  surfaces as EPIPE error and reaches the existing isTerminalSinkError
  cleanup path (log "output pipe closed", lastForKey query, hub
  unregister), instead of being killed by Go's default fd 1/2 SIGPIPE
  handler with exit 141 and zero deferred cleanup.
  Build-tagged: real on unix, no-op on windows (no SIGPIPE there).

Change-Id: I453b19f05c489fd9d5c1a9ba3bdc35e127c15b83

* docs(event): translate IM EventKey descriptions and field tags to English

Aligns with the rest of the codebase (titles, struct names, README) which
are already in English. Surfaces in `event list` / `event schema` and is
also consumed by AI agents.

- events/im/message_receive.go: 11 desc tags on ImMessageReceiveOutput
- events/im/native.go: 10 description fields on Native EventKeys
- events/im/register.go: im.message.receive_v1 Description

Change-Id: I6f46950b4793f137e0129c1f06019a3419195443

* docs(event): drop misleading AuthTypes[0] auto-default claim

The KeyDefinition comment and SKILL.md flag table both stated that
`--as auto` resolves to `AuthTypes[0]`. It does not — ResolveAs goes
through global rules (config default_as / credential hint / `bot`
fallback) without consulting the EventKey. AuthTypes is only used by
CheckIdentity as a post-resolve whitelist.

Reword the field comment to plain whitelist semantics and have SKILL.md
defer `--as` documentation to lark-shared.

Change-Id: Ia5d3d3790aed05813a0fa72d6b43518224e2055b

* revert(comments): restore original comments on 3rd-party files

e61482a stripped comments across 105 files. Restore the four files
authored by others (cmd/build.go, shortcuts/common/{types,runner}.go,
shortcuts/event/subscribe.go) to their pre-strip state so unrelated
documentation isn't churned in this PR.

Change-Id: Ie2527b06bfaf5b3861b0b9dff1e19bbfe7dde456
2026-04-28 11:19:02 +08:00

// Copyright (c) 2026 Lark Technologies Pte. Ltd.
// SPDX-License-Identifier: MIT

package event

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"os"
	"os/signal"
	"strings"
	"syscall"

	"github.com/larksuite/cli/internal/core"
	"github.com/larksuite/cli/internal/lockfile"
	"github.com/larksuite/cli/internal/output"
	"github.com/larksuite/cli/internal/validate"
	"github.com/larksuite/cli/shortcuts/common"
	lark "github.com/larksuite/oapi-sdk-go/v3"
	larkcore "github.com/larksuite/oapi-sdk-go/v3/core"
	larkevent "github.com/larksuite/oapi-sdk-go/v3/event"
	"github.com/larksuite/oapi-sdk-go/v3/event/dispatcher"
	larkws "github.com/larksuite/oapi-sdk-go/v3/ws"
)

// stderrLogger redirects SDK log output to an io.Writer (stderr),
// preventing SDK logs from polluting the stdout data stream.
// Debug logs are always suppressed to avoid noisy event-loop output.
// When quiet is true, Info logs are also suppressed; Warn and Error always print.
type stderrLogger struct {
	w     io.Writer
	quiet bool
}

func (l *stderrLogger) Debug(_ context.Context, _ ...interface{}) {}

func (l *stderrLogger) Info(_ context.Context, args ...interface{}) {
	if !l.quiet {
		fmt.Fprintln(l.w, append([]interface{}{"[SDK Info]"}, args...)...)
	}
}

func (l *stderrLogger) Warn(_ context.Context, args ...interface{}) {
	fmt.Fprintln(l.w, append([]interface{}{"[SDK Warn]"}, args...)...)
}

func (l *stderrLogger) Error(_ context.Context, args ...interface{}) {
	fmt.Fprintln(l.w, append([]interface{}{"[SDK Error]"}, args...)...)
}

var _ larkcore.Logger = (*stderrLogger)(nil)

// commonEventTypes are well-known event types registered in catch-all mode.
var commonEventTypes = []string{
	"im.message.receive_v1",
	"im.message.message_read_v1",
	"im.message.reaction.created_v1",
	"im.message.reaction.deleted_v1",
	"im.chat.member.bot.added_v1",
	"im.chat.member.bot.deleted_v1",
	"im.chat.member.user.added_v1",
	"im.chat.member.user.withdrawn_v1",
	"im.chat.member.user.deleted_v1",
	"im.chat.updated_v1",
	"im.chat.disbanded_v1",
	"contact.user.created_v3",
	"contact.user.updated_v3",
	"contact.user.deleted_v3",
	"contact.department.created_v3",
	"contact.department.updated_v3",
	"contact.department.deleted_v3",
	"calendar.calendar.acl.created_v4",
	"calendar.calendar.event.changed_v4",
	"approval.approval.updated",
	"application.application.visibility.added_v6",
	"task.task.update_tenant_v1",
	"task.task.update_user_access_v2",
	"task.task.comment_updated_v1",
	"drive.notice.comment_add_v1",
}

var EventSubscribe = common.Shortcut{
	Service:     "event",
	Command:     "+subscribe",
	Description: "Subscribe to Lark events via WebSocket (NDJSON output)",
	Risk:        "read",
	Scopes:      []string{}, // no direct OAPI; scopes depend on subscribed event types
	AuthTypes:   []string{"bot"},
	// Hidden: superseded by `event consume`. Kept executable so existing
	// scripts keep working, but removed from --help/tab-completion so new
	// users land on the replacement. Delete once downstream callers have
	// migrated.
	Hidden: true,
	Flags: []common.Flag{
		// Output destination — where events go
		{Name: "output-dir", Desc: "write each event as a JSON file in this directory (default: stdout)"},
		{Name: "route", Type: "string_array", Desc: "regex-based event routing (e.g. --route '^im\\.message=dir:./im/' --route '^contact\\.=dir:./contacts/'); unmatched events fall through to --output-dir or stdout"},
		// Output format — how events are serialized
		{Name: "compact", Type: "bool", Desc: "flat key-value output: extract text, strip noise fields"},
		{Name: "json", Type: "bool", Desc: "pretty-print JSON instead of NDJSON"},
		// Filtering — which events reach the pipeline
		{Name: "event-types", Desc: "comma-separated event types to subscribe; only use when you do not need other events (omit for catch-all)"},
		{Name: "filter", Desc: "regex to further filter events by event_type"},
		// Behavior
		{Name: "quiet", Type: "bool", Desc: "suppress stderr status messages"},
		{Name: "force", Type: "bool", Desc: "bypass single-instance lock (UNSAFE: server randomly splits events across connections, each instance only receives a subset)"},
	},
	DryRun: func(ctx context.Context, runtime *common.RuntimeContext) *common.DryRunAPI {
		eventTypesDisplay := "(catch-all)"
		if s := runtime.Str("event-types"); s != "" {
			eventTypesDisplay = s
		}
		filterDisplay := "(none)"
		if s := runtime.Str("filter"); s != "" {
			filterDisplay = s
		}
		outputDirDisplay := "(stdout)"
		if s := runtime.Str("output-dir"); s != "" {
			outputDirDisplay = s
		}
		routeDisplay := "(none)"
		if routes := runtime.StrArray("route"); len(routes) > 0 {
			routeDisplay = strings.Join(routes, "; ")
		}
		return common.NewDryRunAPI().
			Desc("Subscribe to Lark events via WebSocket (long-running)").
			Set("command", "event +subscribe").
			Set("app_id", runtime.Config.AppID).
			Set("event_types", eventTypesDisplay).
			Set("filter", filterDisplay).
			Set("output_dir", outputDirDisplay).
			Set("route", routeDisplay)
	},
	Execute: func(ctx context.Context, runtime *common.RuntimeContext) error {
		eventTypesStr := runtime.Str("event-types")
		filterStr := runtime.Str("filter")
		jsonFlag := runtime.Bool("json")
		compactFlag := runtime.Bool("compact")
		outputDir := runtime.Str("output-dir")
		quietFlag := runtime.Bool("quiet")
		routeSpecs := runtime.StrArray("route")
		forceFlag := runtime.Bool("force")

		// Validate output directory path before any work
		if outputDir != "" {
			safePath, err := validate.SafeOutputPath(outputDir)
			if err != nil {
				return output.ErrValidation("unsafe output path: %s", err)
			}
			outputDir = safePath
		}

		errOut := runtime.IO().ErrOut
		out := runtime.IO().Out

		info := func(msg string) {
			if !quietFlag {
				fmt.Fprintln(errOut, msg)
			}
		}

		// --- Single-instance lock ---
		if !forceFlag {
			lock, err := lockfile.ForSubscribe(runtime.Config.AppID)
			if err != nil {
				return fmt.Errorf("failed to create lock: %w", err)
			}
			if err := lock.TryLock(); err != nil {
				return output.ErrValidation(
					"another event +subscribe instance is already running for app %s\n"+
						" Only one subscriber per app is allowed to prevent competing consumers.\n"+
						" Use --force to bypass this check.",
					runtime.Config.AppID,
				)
			}
			defer lock.Unlock()
		}

		// --- Build filter chain ---
		eventTypeFilter := NewEventTypeFilter(eventTypesStr)
		regexFilter, err := NewRegexFilter(filterStr)
		if err != nil {
			return output.ErrValidation("invalid --filter regex: %s", filterStr)
		}
		var filterList []EventFilter
		if eventTypeFilter != nil {
			filterList = append(filterList, eventTypeFilter)
		}
		if regexFilter != nil {
			filterList = append(filterList, regexFilter)
		}
		filters := NewFilterChain(filterList...)

		// --- Parse route ---
		router, err := ParseRoutes(routeSpecs)
		if err != nil {
			return output.ErrValidation("invalid --route: %v", err)
		}

		// --- Build pipeline ---
		mode := TransformRaw
		if compactFlag {
			mode = TransformCompact
		}
		pipeline := NewEventPipeline(DefaultRegistry(), filters, PipelineConfig{
			Mode:      mode,
			JsonFlag:  jsonFlag,
			OutputDir: outputDir,
			Quiet:     quietFlag,
			Router:    router,
		}, out, errOut)
		if err := pipeline.EnsureDirs(); err != nil {
			return err
		}

		// --- Build SDK event dispatcher ---
		rawHandler := func(ctx context.Context, event *larkevent.EventReq) error {
			if event.Body == nil {
				return nil
			}
			var raw RawEvent
			if err := json.Unmarshal(event.Body, &raw); err != nil {
				output.PrintError(errOut, fmt.Sprintf("failed to parse event: %v", err))
				return nil
			}
			pipeline.Process(ctx, &raw)
			return nil
		}

		sdkLogger := &stderrLogger{w: errOut, quiet: quietFlag}
		eventDispatcher := dispatcher.NewEventDispatcher("", "")
		eventDispatcher.InitConfig(larkevent.WithLogger(sdkLogger))
		if eventTypeFilter != nil {
			for _, et := range eventTypeFilter.Types() {
				eventDispatcher.OnCustomizedEvent(et, rawHandler)
			}
		} else {
			for _, et := range commonEventTypes {
				eventDispatcher.OnCustomizedEvent(et, rawHandler)
			}
		}

		// --- WebSocket ---
		domain := lark.FeishuBaseUrl
		if runtime.Config.Brand == core.BrandLark {
			domain = lark.LarkBaseUrl
		}

		info(fmt.Sprintf("%sConnecting to Lark event WebSocket...%s", output.Cyan, output.Reset))
		if eventTypeFilter != nil {
			info(fmt.Sprintf("Listening for: %s%s%s", output.Green, strings.Join(eventTypeFilter.Types(), ", "), output.Reset))
		} else {
			info(fmt.Sprintf("Listening for %s%d common event types%s (catch-all mode)", output.Green, len(commonEventTypes), output.Reset))
			info(fmt.Sprintf("%sTip:%s use --event-types to listen for specific event types", output.Dim, output.Reset))
		}
		if regexFilter != nil {
			info(fmt.Sprintf("Filter: %s%s%s", output.Yellow, regexFilter.String(), output.Reset))
		}
		if router != nil {
			for _, spec := range routeSpecs {
				info(fmt.Sprintf(" Route: %s%s%s", output.Green, spec, output.Reset))
			}
		}

		cli := larkws.NewClient(runtime.Config.AppID, runtime.Config.AppSecret,
			larkws.WithEventHandler(eventDispatcher),
			larkws.WithDomain(domain),
			larkws.WithLogger(sdkLogger),
		)

		// --- Graceful shutdown ---
		sigCh := make(chan os.Signal, 1)
		signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
		defer signal.Stop(sigCh)

		startErrCh := make(chan error, 1)
		go func() {
			startErrCh <- cli.Start(ctx)
		}()

		info(fmt.Sprintf("%s%sConnected.%s Waiting for events... (Ctrl+C to stop)", output.Bold, output.Green, output.Reset))

		select {
		case sig, ok := <-sigCh:
			if ok && sig != nil {
				info(fmt.Sprintf("\n%sReceived %s, shutting down...%s (received %s%d%s events)", output.Yellow, sig, output.Reset, output.Bold, pipeline.EventCount(), output.Reset))
			}
			return nil
		case err, ok := <-startErrCh:
			if !ok {
				return nil
			}
			if err != nil {
				return output.ErrNetwork("WebSocket connection failed: %v", err)
			}
			return nil
		}
	},
}