mirror of
https://github.com/larksuite/cli.git
synced 2026-07-04 06:29:52 +08:00
* feat(event): add event subscription & consume system with orphan bus detection
Introduces end-to-end Feishu event consumption via a new `lark-cli event`
command family. Users can subscribe to and consume real-time events
(IM messages, chat/member lifecycle, reactions, ...) in a forked bus
daemon architecture with orphan detection, reflected + overrideable JSON
schemas, and AI-friendly `--json` / `--jq` output.
Commands
--------
- `event list [--json]` list subscribable EventKeys
- `event schema <key>` Parameters + Output Schema + auth info
- `event consume <key>` foreground blocking consume; SIGINT/SIGTERM
/stdin-EOF shutdown; `--max-events` /
`--timeout` bounded; `--jq` projection;
`--output-dir` spool; `--param` KV inputs
- `event status [--fail-on-orphan] [--json]` bus daemon health
- `event stop [--all] [--force] [--json]` stop bus daemon(s)
- `event _bus` (hidden) forked daemon entrypoint
Architecture
------------
- Bus daemon (internal/event/bus): per-AppID forked process that holds
the Feishu long-poll connection and fans events out to 1..N local
consumers over an IPC socket. Drop-oldest backpressure, TOCTOU-safe
cleanup via AcquireCleanupLock, idle-timeout self-shutdown, graceful
SIGTERM.
- Consume client (internal/event/consume): fork+dial the daemon,
handshake, remote preflight (HTTP /open-apis/event/v1/connection),
JQ projection, sequence-gap detection, health probe. Bounded
execution (`--max-events` / `--timeout`) for AI/script usage.
- Wire protocol (internal/event/protocol): newline-delimited JSON
frames with 1 MB size cap and 5 s write deadlines. Hello / HelloAck /
PreShutdownCheck / Shutdown / StatusQuery control messages.
- Orphan detection (internal/event/busdiscover): OS process-table scan
(ps on Unix, PowerShell on Windows) with two-gate cmdline filter
(lark-cli + event _bus) that naturally rejects pid-reused unrelated
processes.
- Transport (internal/event/transport): Unix socket on darwin/linux,
Windows named pipe on windows.
- Schema system (internal/event, internal/event/schemas): SchemaDef with
mutually-exclusive Native (framework wraps V2 envelope) or Custom
(zero-touch) specs. Reflection reads `desc` / `enum` / `kind` struct
tags, with array elements diving into `items`. FieldOverrides overlay
engine addresses paths via JSON Pointer (including `/*` array
wildcard) and runs post-reflect, post-envelope. Lint guards orphan
override paths.
- IM events (events/im): 11 keys — receive / read / recalled, chat and
member lifecycle, reactions — all with per-field open_id / union_id /
user_id / chat_id / message_id / timestamp_ms format annotations.
Robustness
----------
- Bus idle-timer race fix: re-check live conn count under lock before
honoring the tick; Stop+drain before Reset per timer contract.
- Protocol frame cap: replace `br.ReadBytes('\n')` with `ReadFrame` that
rejects frames > MaxFrameBytes (1 MB). Closes a DoS path where any
local peer could grow the reader's buffer unbounded.
- Control-message writes gated by WriteTimeout (5 s) so a wedged peer
kernel buffer can't stall writers indefinitely.
- Consume signal goroutine: `signal.Stop` + `ctx.Done` select, no leak
across repeated invocations in the same process.
- JQ pre-flight compile so bad expressions fail before the bus fork and
any server-side PreConsume side effects.
- `f.NewAPIClient`'s `*core.ConfigError` now passes through unwrapped
so the actionable "run lark-cli config init" hint reaches the user.
Subprocess / AI contract
------------------------
- `event consume` emits `[event] ready event_key=<key>` on stderr once
the bus handshake completes and events will flow. Parent processes
block-read stderr until this line before reading stdout — no `sleep`
fallback needed.
- All list-like commands have `--json` for structured consumption.
- Skill docs in `skills/lark-event/` (SKILL.md + references/) brief AI
agents on the command surface, JQ against Output Schema, bounded
execution, and subprocess lifecycle.
Testing
-------
Unit tests across bus/hub, consume loop, protocol codec, dedup,
registry, transport (Unix + Windows), schema reflection, field
overrides, pointer resolver. Integration tests cover fork startup,
shutdown, orphan detection, probe, stdin EOF, preflight, bounded
execution, and Windows busdiscover PowerShell compatibility.
Change-Id: Ib69d6d8409b33b99790081e273d4b5b01b7dbf80
* fix(event): address CodeRabbit findings + lift patch coverage above 60%
CodeRabbit comments (PR #654)
-----------------------------
1. bus/dedup: IsDuplicate dropped legitimate (post-TTL) events after
cleanupExpired fired. The run-every-1000-inserts cleanup removed
TTL-expired IDs from the `seen` map but left them in the ring;
IsDuplicate's ring-scan fallback then rediscovered them and falsely
reported "duplicate", and bus.Publish silently dropped the event.
Removed the ring-scan branch — `seen` is the sole authority, the ring
only bounds map size via overflow eviction. New regression test
TestDedupFilter_TTLExpiryAfterCleanupRunRespected exercises the 10-
insert + cleanup path and guards the fix.
2. consume/remote_preflight: the decoder only read `data.online_instance_
cnt`. A non-zero business code with no data payload decoded to 0 and
callers treated it as "verified zero", forking a local bus that would
duplicate events. Added Code / Msg fields and promoted code != 0 into
an error so the caller distinguishes verified-zero from check-failed.
3. cmd/event/stop: swapped os.ReadDir / os.Stat to vfs.ReadDir / vfs.Stat
in discoverAppIDs per project guideline (enables test mocking). New
TestDiscoverAppIDs_* lifts discoverAppIDs from 0% to 100%.
4. cmd/event/appmeta_err: narrowed authURLPattern from
feishu.cn|feishu.net|larksuite.com|larkoffice.com to the two hosts
consoleScopeGrantURL actually produces. Kept the allowlist pinned to
ResolveEndpoints' output with a comment flagging the synchrony.
5. cmd/event/list: moved "No EventKeys registered." and "Use 'event
schema <key>' for details." hints to stderr so `event list | jq`
style pipelines don't ingest them as data.
6. cmd/event/schema: runSchema is a RunE entry point; swapped the bare
fmt.Errorf on resolveSchemaJSON failure to output.Errorf so AI
agents parse a structured error envelope.
Coverage bumps (patch ~50% -> ~60%)
-----------------------------------
internal/event/consume/loop_test.go: loop.go was 0% at patch time.
New tests cover consumeLoop end-to-end via net.Pipe (events -> sink,
max-events -> ctx.Done -> PreShutdownCheck/Ack), seq-gap warning,
jq filtering + early compile failure, isTerminalSinkError classifier.
Takes consumeLoop from 0% to ~74%.
internal/event/protocol/messages_test.go: all NewXxx constructors,
Encode/Decode roundtrip per message type, EncodeWithDeadline deadline
enforcement, ReadFrame MaxFrameBytes rejection + EOF propagation.
Takes protocol from 28% to ~86%.
Also bundles small UX polish:
- cmd/event/consume: --output-dir flag doc flags path-traversal behavior;
jq-validation failures now re-wrap with an event-specific hint
pointing at `event schema` for payload shape.
- internal/event/consume.validateParams: error now names the EventKey
and lists valid param names inline so AI callers recover without a
second `event schema` round-trip.
- skills/lark-event: description expanded to mention
listener/subscribe/consume synonyms + the IM scope set explicitly;
lark-event-im reference polished; obsolete lark-event-subscribe
reference removed.
Verified with go test -race -timeout 120s across ./cmd/event/...,
./events/..., ./internal/event/...; gofmt clean; go vet clean.
Change-Id: I3837b8645ea1d7529c9a8fd4c2bbfa965ae1b519
* test(event): cover format helpers + cobra factories
Adds cmd/event/format_helpers_test.go covering the pure output helpers
and factory wire-ups that RunE-level tests would need a live bus to
exercise:
- writeStopJSON: shape assertions + nil → [] (scripts expecting
.results | length must not see null).
- writeStopText: stdout vs stderr routing — stopped / no-bus lines to
stdout, refused / errored lines to stderr.
- busState.String: all three discriminator values.
- humanizeDuration: each bucket boundary (seconds / minutes / hours / days).
- writeStatusText: covers stateNotRunning / stateRunning (with consumer
table) / stateOrphan (with kill hint).
- writeStatusJSON: orphan entry carries suggested_action + issue;
running entry must NOT carry those fields (hint-leak guard for
scripts that key on issue != "").
- exitForOrphan: flag-off never errors; flag-on errors iff any orphan
is present, with ExitValidation code.
- NewCmdConsume / NewCmdStatus / NewCmdStop / NewCmdList / NewCmdBus:
flag registration + RunE presence, so review catches flag-name drift.
NewCmdBus check also pins Hidden=true.
Lifts cmd/event coverage 51.7% → 61.1%; aggregate event-package
coverage crosses the 60% codecov patch threshold (62% locally).
Change-Id: I9ecf3d905a8f9607b9441ee8a61e746496e2be63
* fix(event): address lint + deadcode CI failures
4 golangci-lint findings + 1 deadcode finding flagged on PR #654.
lint
----
1. cmd/event/stop.go:86 (ineffassign): `targets := []string{}` is
overwritten by both branches of the `if o.all` below, so the empty-
slice initializer is dead. Switched to `var targets []string`.
2. cmd/event/consume.go nilerr: the user-identity scope preflight
swallows a non-nil ResolveToken error and returns nil. This is
intentional — a missing/expired user token must not block consume;
the bus handshake will surface the real auth error with actionable
hints. Added `//nolint:nilerr` with a 4-line comment pinning the
reasoning.
3. events/im/message_receive.go:62 nilerr: malformed JSON payload
returns the original bytes + nil so consumers still see the event
(the WARN breadcrumb lives in the outer loop). Added
`//nolint:nilerr` with a one-line comment.
4. internal/event/schemas/fromtype_test.go:26 unused: `unexportedStr`
is a reflection-test fixture — its presence (not value) exercises
the FromType skip-unexported path verified at the "unexported
field should not be in schema" assertion. Added `//nolint:unused`
and a 4-line comment pointing at the guarded assertion.
deadcode
--------
5. internal/event/testutil/testutil.go: NewTCPFake has no callers in
the repo. Removed the constructor plus the `inner == nil` TCP-mode
branches from Listen / Dial / Cleanup. FakeTransport now only
supports the wrapped-overlay mode (NewWrappedFake), which is the
one every existing test uses. Doc comment simplified accordingly.
Verified locally: go test -race -timeout 120s across ./cmd/event/...,
./events/..., ./internal/event/... all green; gofmt clean; go vet
clean.
Change-Id: Ie8a2270827a0bde6b8159ab70aaf5c1e9ca7d5b9
* fix(event): drop stale enum + simplify protocol test type helper
- events/im/message_receive.go: dropped the `enum` tag on
ImMessageReceiveOutput.MessageType. convertlib registers many more
message types than the old 11-item list (video / location /
calendar / todo / vote / hongbao / merge_forward / folder / ...),
so a partial enum would tell AI consumers that valid values like
"video" are invalid and produce false-negative JQ filters.
- internal/event/protocol/messages_test.go: collapsed the
typeOf → reflectTypeName → stringType chain in
TestEncode_DecodeRoundtripAllTypes to a single fmt.Sprintf("%T", v).
The hand-maintained type switch silently returned "<unknown>" for
any new message type, which would have let future Decode bugs slip
past the roundtrip assertion. Also removed a dead `cases` table at
the top of TestConstructors_PinTypeField left over from an earlier
refactor.
Change-Id: I831e96f8417e80637596030d652a559de0d33122
* docs(event): polish skill docs + rename root_path_hint to jq_root_path
- skills/lark-event/SKILL.md, lark-event-im.md: translated to English,
reorganized around a top-level "Core commands" table, scenario
recipes tightened.
- cmd/event/schema.go: renamed the writeSchemaJSON hint field
RootPathHint / "root_path_hint" -> JQRootPath / "jq_root_path" to
make its purpose (a jq path prefix) obvious at the call site; no
external consumer depends on the old name yet.
Change-Id: I00c14061ca33caedc0975bfeadc4b26d3dcd314d
* chore(event): strip excessive comments
Change-Id: I8f44f36f5dbdba3ef95dfc67069dc796232f91ec
* fix(event): dedup self-eviction race + protocol oversized-frame test
dedup: in IsDuplicate, the ring-slot eviction step deleted seen[id] even
when ring[pos] equalled the freshly-recorded id (post-TTL reinsertion
landing on its own historical slot). Net result: ring still held id but
seen did not, so the next IsDuplicate(id) returned false and the
duplicate was delivered. Skip the delete when old == eventID. New
TestDedupFilter_SelfEvictionPreservesFreshEntry pins the invariant by
pre-loading the ring slot and asserting the second call still reports
duplicate.
protocol: TestReadFrame_RejectsOversized used strings.Contains feeding
t.Logf, so any non-nil error passed — including a future regression
that returned io.ErrUnexpectedEOF while silently keeping the buffer
unbounded. Promoted MaxFrameBytes overflow to a sentinel
ErrFrameTooLarge and the test now asserts via errors.Is.
Change-Id: I50281dad392152b0ca083fd30c38eb0695e63bd3
* docs(event): clarify .content shape per message_type + add sender filter recipe
Change-Id: I619fd15c1a362e42e6602fd3e3316bbc75eddc5e
* fix(event): replace cmdline-regex bus discovery with PID file + close concurrent fork race
Bus discovery previously walked the OS process table and parsed `--profile cli_*` from
cmdline; the regex rejected any non-cli_ profile name (D-03a). Replace with per-AppID
bus.pid + bus.alive.lock under events/<AppID>/, probed via try-lock. AppID round-trips
through the directory name, so the profile-vs-AppID confusion is gone by construction.
Also fix B-07 (two consumers each fork an independent bus, halving event delivery):
- forkBus holds bus.fork.lock until child is dial-able, not just until cmd.Start
- bus daemon takes alive.lock before binding the socket; cleanup-TOCTOU race can no
longer leave two listeners on different inodes
status.go renders an orphan with PID=0 distinctly (live bus but pid file unreadable)
so we never print "Action: kill 0".
Change-Id: I3bf0a6cf1d91fb274ac5a6df83d66896aafb291f
* style(event): gofmt bus.go
Trailing blank line introduced when appending acquireAliveLock helper.
Change-Id: I4ae1b4a4363dc6c89dcbd6a170f4563117490ba3
* fix(event): swap os.Remove/Rename for vfs.* and silence forbidigo on internal diagnostics
golangci-lint forbidigo blocks os.* in internal/. Switch the pid-file write to vfs.Remove/vfs.Rename and add a nolint marker on the two stderr diagnostics in busdiscover, matching the existing pattern in consume/*.
Change-Id: Ia6768be62aefeb8ca40f991d3130a78ef2ec0ea5
* fix(event): cross-platform --all + clean SIGPIPE shutdown for consume
- stop --all: replace bus.sock-file probe with busdiscover lock-based
scan; previously skipped Windows entirely (named-pipe transport, no
socket on disk) and misidentified Unix stale sockets as live. Same
win for `event status` (shares discoverAppIDs).
- consume: ignore SIGPIPE so a closed stdout pipe (e.g. `... | head -n 1`)
surfaces as EPIPE error and reaches the existing isTerminalSinkError
cleanup path (log "output pipe closed", lastForKey query, hub
unregister), instead of being killed by Go's default fd 1/2 SIGPIPE
handler with exit 141 and zero deferred cleanup.
Build-tagged: real on unix, no-op on windows (no SIGPIPE there).
Change-Id: I453b19f05c489fd9d5c1a9ba3bdc35e127c15b83
* docs(event): translate IM EventKey descriptions and field tags to English
Aligns with the rest of the codebase (titles, struct names, README) which
are already in English. Surfaces in `event list` / `event schema` and is
also consumed by AI agents.
- events/im/message_receive.go: 11 desc tags on ImMessageReceiveOutput
- events/im/native.go: 10 description fields on Native EventKeys
- events/im/register.go: im.message.receive_v1 Description
Change-Id: I6f46950b4793f137e0129c1f06019a3419195443
* docs(event): drop misleading AuthTypes[0] auto-default claim
The KeyDefinition comment and SKILL.md flag table both stated that
`--as auto` resolves to `AuthTypes[0]`. It does not — ResolveAs goes
through global rules (config default_as / credential hint / `bot`
fallback) without consulting the EventKey. AuthTypes is only used by
CheckIdentity as a post-resolve whitelist.
Reword the field comment to plain whitelist semantics and have SKILL.md
defer `--as` documentation to lark-shared.
Change-Id: Ia5d3d3790aed05813a0fa72d6b43518224e2055b
* revert(comments): restore original comments on 3rd-party files
e61482a stripped comments across 105 files. Restore the four files
authored by others (cmd/build.go, shortcuts/common/{types,runner}.go,
shortcuts/event/subscribe.go) to their pre-strip state so unrelated
documentation isn't churned in this PR.
Change-Id: Ie2527b06bfaf5b3861b0b9dff1e19bbfe7dde456
353 lines
8.4 KiB
Go
353 lines
8.4 KiB
Go
// Copyright (c) 2026 Lark Technologies Pte. Ltd.
|
|
// SPDX-License-Identifier: MIT
|
|
|
|
//go:build !windows
|
|
|
|
package event_test
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"log"
|
|
"net"
|
|
"os"
|
|
"path/filepath"
|
|
"reflect"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/larksuite/cli/internal/event"
|
|
"github.com/larksuite/cli/internal/event/bus"
|
|
"github.com/larksuite/cli/internal/event/protocol"
|
|
"github.com/larksuite/cli/internal/event/source"
|
|
"github.com/larksuite/cli/internal/event/testutil"
|
|
"github.com/larksuite/cli/internal/event/transport"
|
|
)
|
|
|
|
type integTestOut struct{ A string }
|
|
|
|
func integNativeSchema() event.SchemaDef {
|
|
return event.SchemaDef{Native: &event.SchemaSpec{Type: reflect.TypeOf(integTestOut{})}}
|
|
}
|
|
|
|
func waitForBusReady(t *testing.T, tr transport.IPC, addr string) {
|
|
t.Helper()
|
|
deadline := time.Now().Add(2 * time.Second)
|
|
for time.Now().Before(deadline) {
|
|
if conn, err := tr.Dial(addr); err == nil {
|
|
conn.Close()
|
|
return
|
|
}
|
|
time.Sleep(10 * time.Millisecond)
|
|
}
|
|
t.Fatalf("bus at %s did not come up within 2s", addr)
|
|
}
|
|
|
|
func runBus(t *testing.T, b *bus.Bus, ctx context.Context) {
|
|
t.Helper()
|
|
errCh := make(chan error, 1)
|
|
go func() { errCh <- b.Run(ctx) }()
|
|
t.Cleanup(func() {
|
|
select {
|
|
case err := <-errCh:
|
|
if err != nil && !errors.Is(err, context.Canceled) {
|
|
t.Errorf("bus.Run returned unexpected error: %v", err)
|
|
}
|
|
case <-time.After(2 * time.Second):
|
|
t.Log("bus did not exit within 2s of test cleanup (non-fatal)")
|
|
}
|
|
})
|
|
}
|
|
|
|
type mockIntegSource struct {
|
|
mu sync.Mutex
|
|
emitFn func(*event.RawEvent)
|
|
}
|
|
|
|
func (s *mockIntegSource) Name() string { return "mock-integration" }
|
|
|
|
func (s *mockIntegSource) Start(ctx context.Context, _ []string, emit func(*event.RawEvent), _ source.StatusNotifier) error {
|
|
s.mu.Lock()
|
|
s.emitFn = emit
|
|
s.mu.Unlock()
|
|
<-ctx.Done()
|
|
return nil
|
|
}
|
|
|
|
func (s *mockIntegSource) emit(e *event.RawEvent) {
|
|
s.mu.Lock()
|
|
fn := s.emitFn
|
|
s.mu.Unlock()
|
|
if fn != nil {
|
|
fn(e)
|
|
}
|
|
}
|
|
|
|
func TestIntegration_BusToConsume(t *testing.T) {
|
|
event.ResetRegistryForTest()
|
|
source.ResetForTest()
|
|
|
|
event.RegisterKey(event.KeyDefinition{
|
|
Key: "test.event.v1",
|
|
EventType: "test.event.v1",
|
|
Schema: integNativeSchema(),
|
|
})
|
|
|
|
mockSrc := &mockIntegSource{}
|
|
source.Register(mockSrc)
|
|
|
|
dir := t.TempDir()
|
|
addr := filepath.Join(dir, "t.sock")
|
|
|
|
tr := transport.New()
|
|
logger := log.New(os.Stderr, "[test-bus] ", log.LstdFlags)
|
|
|
|
testTr := testutil.NewWrappedFake(tr, addr)
|
|
b := bus.NewBus("test-app", "test-secret", "", testTr, logger)
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
runBus(t, b, ctx)
|
|
waitForBusReady(t, testTr, addr)
|
|
|
|
conn, err := testTr.Dial(addr)
|
|
if err != nil {
|
|
t.Fatalf("dial failed: %v", err)
|
|
}
|
|
defer conn.Close()
|
|
|
|
hello := &protocol.Hello{
|
|
Type: protocol.MsgTypeHello,
|
|
PID: os.Getpid(),
|
|
EventKey: "test.event.v1",
|
|
EventTypes: []string{"test.event.v1"},
|
|
Version: "v1",
|
|
}
|
|
if err := protocol.Encode(conn, hello); err != nil {
|
|
t.Fatalf("encode hello: %v", err)
|
|
}
|
|
|
|
scanner := bufio.NewScanner(conn)
|
|
conn.SetReadDeadline(time.Now().Add(3 * time.Second))
|
|
if !scanner.Scan() {
|
|
t.Fatal("no hello_ack received")
|
|
}
|
|
msg, err := protocol.Decode(scanner.Bytes())
|
|
if err != nil {
|
|
t.Fatalf("decode hello_ack: %v", err)
|
|
}
|
|
ack, ok := msg.(*protocol.HelloAck)
|
|
if !ok {
|
|
t.Fatalf("expected HelloAck, got %T", msg)
|
|
}
|
|
if !ack.FirstForKey {
|
|
t.Error("expected first_for_key to be true")
|
|
}
|
|
|
|
mockSrc.emit(&event.RawEvent{
|
|
EventID: "evt-integration-1",
|
|
EventType: "test.event.v1",
|
|
Payload: json.RawMessage(`{"test": true}`),
|
|
Timestamp: time.Now(),
|
|
})
|
|
|
|
conn.SetReadDeadline(time.Now().Add(3 * time.Second))
|
|
if !scanner.Scan() {
|
|
t.Fatal("no event received")
|
|
}
|
|
evtMsg, err := protocol.Decode(scanner.Bytes())
|
|
if err != nil {
|
|
t.Fatalf("decode event: %v", err)
|
|
}
|
|
evt, ok := evtMsg.(*protocol.Event)
|
|
if !ok {
|
|
t.Fatalf("expected Event, got %T", evtMsg)
|
|
}
|
|
if evt.EventType != "test.event.v1" {
|
|
t.Errorf("expected event_type %q, got %q", "test.event.v1", evt.EventType)
|
|
}
|
|
var payloadMap map[string]interface{}
|
|
if err := json.Unmarshal(evt.Payload, &payloadMap); err != nil {
|
|
t.Fatalf("unmarshal payload: %v", err)
|
|
}
|
|
if v, ok := payloadMap["test"]; !ok || v != true {
|
|
t.Errorf("unexpected payload: %s", string(evt.Payload))
|
|
}
|
|
|
|
conn.Close()
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
cancel()
|
|
}
|
|
|
|
func TestIntegration_MultipleConsumers(t *testing.T) {
|
|
event.ResetRegistryForTest()
|
|
source.ResetForTest()
|
|
|
|
event.RegisterKey(event.KeyDefinition{
|
|
Key: "multi.event.v1",
|
|
EventType: "multi.event.v1",
|
|
Schema: integNativeSchema(),
|
|
})
|
|
|
|
mockSrc := &mockIntegSource{}
|
|
source.Register(mockSrc)
|
|
|
|
dir := t.TempDir()
|
|
addr := filepath.Join(dir, "m.sock")
|
|
tr := transport.New()
|
|
logger := log.New(os.Stderr, "[test-multi] ", log.LstdFlags)
|
|
|
|
testTr := testutil.NewWrappedFake(tr, addr)
|
|
b := bus.NewBus("test-multi", "test-secret", "", testTr, logger)
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
runBus(t, b, ctx)
|
|
waitForBusReady(t, testTr, addr)
|
|
|
|
connectConsumer := func(name string) (net.Conn, *bufio.Scanner) {
|
|
conn, err := testTr.Dial(addr)
|
|
if err != nil {
|
|
t.Fatalf("dial %s: %v", name, err)
|
|
}
|
|
hello := &protocol.Hello{
|
|
Type: protocol.MsgTypeHello,
|
|
PID: os.Getpid(),
|
|
EventKey: "multi.event.v1",
|
|
EventTypes: []string{"multi.event.v1"},
|
|
Version: "v1",
|
|
}
|
|
protocol.Encode(conn, hello)
|
|
sc := bufio.NewScanner(conn)
|
|
conn.SetReadDeadline(time.Now().Add(3 * time.Second))
|
|
if !sc.Scan() {
|
|
t.Fatalf("%s: no hello_ack", name)
|
|
}
|
|
msg, _ := protocol.Decode(sc.Bytes())
|
|
if _, ok := msg.(*protocol.HelloAck); !ok {
|
|
t.Fatalf("%s: expected HelloAck, got %T", name, msg)
|
|
}
|
|
return conn, sc
|
|
}
|
|
|
|
conn1, sc1 := connectConsumer("consumer-1")
|
|
defer conn1.Close()
|
|
conn2, sc2 := connectConsumer("consumer-2")
|
|
defer conn2.Close()
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
mockSrc.emit(&event.RawEvent{
|
|
EventID: "evt-multi-1",
|
|
EventType: "multi.event.v1",
|
|
Payload: json.RawMessage(`{"fan":"out"}`),
|
|
Timestamp: time.Now(),
|
|
})
|
|
|
|
for _, tc := range []struct {
|
|
name string
|
|
conn net.Conn
|
|
sc *bufio.Scanner
|
|
}{
|
|
{"consumer-1", conn1, sc1},
|
|
{"consumer-2", conn2, sc2},
|
|
} {
|
|
tc.conn.SetReadDeadline(time.Now().Add(3 * time.Second))
|
|
if !tc.sc.Scan() {
|
|
t.Fatalf("%s: no event received", tc.name)
|
|
}
|
|
evtMsg, err := protocol.Decode(tc.sc.Bytes())
|
|
if err != nil {
|
|
t.Fatalf("%s: decode event: %v", tc.name, err)
|
|
}
|
|
evt, ok := evtMsg.(*protocol.Event)
|
|
if !ok {
|
|
t.Fatalf("%s: expected Event, got %T", tc.name, evtMsg)
|
|
}
|
|
if evt.EventType != "multi.event.v1" {
|
|
t.Errorf("%s: expected event_type %q, got %q", tc.name, "multi.event.v1", evt.EventType)
|
|
}
|
|
}
|
|
|
|
cancel()
|
|
}
|
|
|
|
func TestIntegration_DedupFilter(t *testing.T) {
|
|
event.ResetRegistryForTest()
|
|
source.ResetForTest()
|
|
|
|
event.RegisterKey(event.KeyDefinition{
|
|
Key: "dedup.event.v1",
|
|
EventType: "dedup.event.v1",
|
|
Schema: integNativeSchema(),
|
|
})
|
|
|
|
mockSrc := &mockIntegSource{}
|
|
source.Register(mockSrc)
|
|
|
|
dir := t.TempDir()
|
|
addr := filepath.Join(dir, "d.sock")
|
|
tr := transport.New()
|
|
logger := log.New(os.Stderr, "[test-dedup] ", log.LstdFlags)
|
|
|
|
testTr := testutil.NewWrappedFake(tr, addr)
|
|
b := bus.NewBus("test-dedup", "test-secret", "", testTr, logger)
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
runBus(t, b, ctx)
|
|
waitForBusReady(t, testTr, addr)
|
|
|
|
conn, err := testTr.Dial(addr)
|
|
if err != nil {
|
|
t.Fatalf("dial: %v", err)
|
|
}
|
|
defer conn.Close()
|
|
|
|
hello := &protocol.Hello{
|
|
Type: protocol.MsgTypeHello,
|
|
PID: os.Getpid(),
|
|
EventKey: "dedup.event.v1",
|
|
EventTypes: []string{"dedup.event.v1"},
|
|
Version: "v1",
|
|
}
|
|
protocol.Encode(conn, hello)
|
|
sc := bufio.NewScanner(conn)
|
|
conn.SetReadDeadline(time.Now().Add(3 * time.Second))
|
|
if !sc.Scan() {
|
|
t.Fatal("no hello_ack")
|
|
}
|
|
|
|
for i := 0; i < 2; i++ {
|
|
mockSrc.emit(&event.RawEvent{
|
|
EventID: "evt-dedup-same",
|
|
EventType: "dedup.event.v1",
|
|
Payload: json.RawMessage(`{"dup": true}`),
|
|
Timestamp: time.Now(),
|
|
})
|
|
}
|
|
|
|
conn.SetReadDeadline(time.Now().Add(2 * time.Second))
|
|
if !sc.Scan() {
|
|
t.Fatal("expected at least one event")
|
|
}
|
|
evtMsg, _ := protocol.Decode(sc.Bytes())
|
|
if _, ok := evtMsg.(*protocol.Event); !ok {
|
|
t.Fatalf("expected Event, got %T", evtMsg)
|
|
}
|
|
|
|
conn.SetReadDeadline(time.Now().Add(500 * time.Millisecond))
|
|
if sc.Scan() {
|
|
t.Error("received duplicate event; dedup filter should have blocked it")
|
|
}
|
|
|
|
cancel()
|
|
}
|