diff --git a/cmd/event/format_helpers_test.go b/cmd/event/format_helpers_test.go index 939b85d2..a9aaf694 100644 --- a/cmd/event/format_helpers_test.go +++ b/cmd/event/format_helpers_test.go @@ -143,6 +143,79 @@ func TestWriteStatusText_CoversAllStates(t *testing.T) { } } +func TestWriteStatusText_ShowsSubColumn(t *testing.T) { + var buf bytes.Buffer + writeStatusText(&buf, []appStatus{ + { + AppID: "cli_RUNNINGXXXXXXXXX", + State: stateRunning, + PID: 1234, + UptimeSec: 60, + Active: 2, + Consumers: []protocol.ConsumerInfo{ + {PID: 1001, EventKey: "mail.x", SubscriptionID: "mail.x:alice", Received: 5, Dropped: 0}, + {PID: 1002, EventKey: "mail.x", SubscriptionID: "mail.x:bob", Received: 3, Dropped: 0}, + }, + }, + }) + out := buf.String() + if !strings.Contains(out, "SUB") { + t.Errorf("missing SUB column header: %s", out) + } + if !strings.Contains(out, "alice") { + t.Errorf("missing alice suffix in SUB column: %s", out) + } + if !strings.Contains(out, "bob") { + t.Errorf("missing bob suffix in SUB column: %s", out) + } +} + +func TestWriteStatusText_LegacySubscriptionID_RendersDash(t *testing.T) { + var buf bytes.Buffer + writeStatusText(&buf, []appStatus{ + { + AppID: "cli_RUNNINGXXXXXXXXX", + State: stateRunning, + PID: 1234, + UptimeSec: 60, + Active: 1, + Consumers: []protocol.ConsumerInfo{ + {PID: 1001, EventKey: "im.x", SubscriptionID: "", Received: 5}, + }, + }, + }) + out := buf.String() + if !strings.Contains(out, "SUB") { + t.Errorf("missing SUB header: %s", out) + } + if !strings.Contains(out, "-") { + t.Errorf("missing dash placeholder for empty SubscriptionID: %s", out) + } +} + +func TestWriteStatusText_EventKeyEqualSubscriptionID_RendersDash(t *testing.T) { + var buf bytes.Buffer + writeStatusText(&buf, []appStatus{ + { + AppID: "cli_RUNNINGXXXXXXXXX", + State: stateRunning, + PID: 1234, + UptimeSec: 60, + Active: 1, + Consumers: []protocol.ConsumerInfo{ + {PID: 1001, EventKey: "im.x", SubscriptionID: "im.x", Received: 5}, + }, + }, + }) + out := buf.String() + if !strings.Contains(out, "SUB") { + t.Errorf("missing SUB header: %s", out) + } + if !strings.Contains(out, "-") { + t.Errorf("missing dash placeholder when SubscriptionID==EventKey: %s", out) + } +} + func TestWriteStatusJSON_OrphanHint(t *testing.T) { var buf bytes.Buffer if err := writeStatusJSON(&buf, []appStatus{ diff --git a/cmd/event/schema.go b/cmd/event/schema.go index 74b56edf..c078767f 100644 --- a/cmd/event/schema.go +++ b/cmd/event/schema.go @@ -134,12 +134,16 @@ func runSchema(f *cmdutil.Factory, key string, asJSON bool) error { if len(def.Params) > 0 { fmt.Fprintf(out, "\nParameters:\n") w := tabwriter.NewWriter(out, 0, 4, 2, ' ', 0) - fmt.Fprintf(w, " NAME\tTYPE\tREQUIRED\tDEFAULT\tDESCRIPTION\n") + fmt.Fprintf(w, " NAME\tTYPE\tREQUIRED\tSUB-KEY\tDEFAULT\tDESCRIPTION\n") for _, p := range def.Params { required := "no" if p.Required { required = "yes" } + subKey := "no" + if p.SubscriptionKey { + subKey = "yes" + } defaultVal := p.Default if defaultVal == "" { defaultVal = "-" @@ -148,7 +152,7 @@ func runSchema(f *cmdutil.Factory, key string, asJSON bool) error { if desc == "" { desc = "-" } - fmt.Fprintf(w, " %s\t%s\t%s\t%s\t%s\n", p.Name, p.Type, required, defaultVal, desc) + fmt.Fprintf(w, " %s\t%s\t%s\t%s\t%s\t%s\n", p.Name, p.Type, required, subKey, defaultVal, desc) } w.Flush() diff --git a/cmd/event/schema_test.go b/cmd/event/schema_test.go index 2fb67f0b..e02f6758 100644 --- a/cmd/event/schema_test.go +++ b/cmd/event/schema_test.go @@ -96,6 +96,79 @@ func TestRunSchema_JSONOutput(t *testing.T) { } } +func TestSchema_RendersSubscriptionKeyMarker(t *testing.T) { + const syntheticKey = "test.evt_sub" + t.Cleanup(func() { eventlib.UnregisterKeyForTest(syntheticKey) }) + + eventlib.RegisterKey(eventlib.KeyDefinition{ + Key: syntheticKey, + EventType: syntheticKey, + Params: []eventlib.ParamDef{ + {Name: "mailbox", SubscriptionKey: true, Description: "subscription id source"}, + {Name: "folders", Description: "filter only"}, + }, + Schema: eventlib.SchemaDef{Native: &eventlib.SchemaSpec{Type: reflect.TypeOf(struct{ X string }{})}}, + }) + + f, stdout, _, _ := cmdutil.TestFactory(t, &core.CliConfig{AppID: "test"}) + if err := runSchema(f, syntheticKey, false); err != nil { + t.Fatalf("runSchema: %v", err) + } + + out := stdout.String() + if !strings.Contains(out, "SUB-KEY") { + t.Errorf("missing SUB-KEY column header in:\n%s", out) + } + + // Find the mailbox row and verify "yes" is present + var mailboxRow string + for _, ln := range strings.Split(out, "\n") { + if strings.Contains(ln, "mailbox") && !strings.Contains(ln, "NAME") { + mailboxRow = ln + break + } + } + if !strings.Contains(mailboxRow, "yes") { + t.Errorf("mailbox row missing yes SUB-KEY marker: %q", mailboxRow) + } + + // Find the folders row and verify "no" is present + var foldersRow string + for _, ln := range strings.Split(out, "\n") { + if strings.Contains(ln, "folders") && !strings.Contains(ln, "NAME") { + foldersRow = ln + break + } + } + if !strings.Contains(foldersRow, "no") { + t.Errorf("folders row missing no SUB-KEY marker: %q", foldersRow) + } +} + +func TestSchema_JSON_IncludesSubscriptionKey(t *testing.T) { + const syntheticKey = "test.evt_json" + t.Cleanup(func() { eventlib.UnregisterKeyForTest(syntheticKey) }) + + eventlib.RegisterKey(eventlib.KeyDefinition{ + Key: syntheticKey, + EventType: syntheticKey, + Params: []eventlib.ParamDef{{Name: "mailbox", SubscriptionKey: true}}, + Schema: eventlib.SchemaDef{Native: &eventlib.SchemaSpec{Type: reflect.TypeOf(struct{ X string }{})}}, + }) + + f, stdout, _, _ := cmdutil.TestFactory(t, &core.CliConfig{AppID: "test"}) + if err := runSchema(f, syntheticKey, true); err != nil { + t.Fatalf("runSchema json: %v", err) + } + + if !strings.Contains(stdout.String(), `"subscription_key"`) { + t.Errorf("JSON output missing subscription_key field: %s", stdout.String()) + } + if !strings.Contains(stdout.String(), `true`) { + t.Errorf("JSON output missing subscription_key: true value: %s", stdout.String()) + } +} + func TestResolveSchemaJSON_CustomWithOverlay(t *testing.T) { const syntheticKey = "t.custom.overlay" t.Cleanup(func() { eventlib.UnregisterKeyForTest(syntheticKey) }) diff --git a/cmd/event/status.go b/cmd/event/status.go index 92c8be25..ece33958 100644 --- a/cmd/event/status.go +++ b/cmd/event/status.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "sort" + "strings" "sync" "time" @@ -242,12 +243,17 @@ func writeStatusText(out io.Writer, statuses []appStatus) { s.PID, (time.Duration(s.UptimeSec) * time.Second).String()) fmt.Fprintf(out, " Active consumers: %d\n", s.Active) if len(s.Consumers) > 0 { - headers := []string{"CONSUMER", "EVENT KEY", "RECEIVED", "DROPPED"} + headers := []string{"CONSUMER", "EVENT KEY", "SUB", "RECEIVED", "DROPPED"} rows := make([][]string, 0, len(s.Consumers)) for _, c := range s.Consumers { + subDisplay := "-" + if c.SubscriptionID != "" && c.SubscriptionID != c.EventKey { + subDisplay = strings.TrimPrefix(c.SubscriptionID, c.EventKey+":") + } rows = append(rows, []string{ fmt.Sprintf("pid=%d", c.PID), c.EventKey, + subDisplay, fmt.Sprintf("%d", c.Received), fmt.Sprintf("%d", c.Dropped), }) diff --git a/events/minutes/preconsume.go b/events/minutes/preconsume.go index 2429bc53..b396f637 100644 --- a/events/minutes/preconsume.go +++ b/events/minutes/preconsume.go @@ -13,8 +13,8 @@ import ( const cleanupTimeout = 5 * time.Second -func subscriptionPreConsume(eventType, subscribePath, unsubscribePath string) func(context.Context, event.APIClient, map[string]string) (func(), error) { - return func(ctx context.Context, rt event.APIClient, _ map[string]string) (func(), error) { +func subscriptionPreConsume(eventType, subscribePath, unsubscribePath string) func(context.Context, event.APIClient, map[string]string) (func() error, error) { + return func(ctx context.Context, rt event.APIClient, _ map[string]string) (func() error, error) { if rt == nil { return nil, errs.NewInternalError(errs.SubtypeUnknown, "runtime API client is required for pre-consume subscription") @@ -25,10 +25,13 @@ func subscriptionPreConsume(eventType, subscribePath, unsubscribePath string) fu return nil, err } - return func() { + return func() error { cleanupCtx, cancel := context.WithTimeout(context.Background(), cleanupTimeout) defer cancel() - _, _ = rt.CallAPI(cleanupCtx, "POST", unsubscribePath, body) + if _, err := rt.CallAPI(cleanupCtx, "POST", unsubscribePath, body); err != nil { + return err + } + return nil }, nil } } diff --git a/events/vc/preconsume.go b/events/vc/preconsume.go index 1fa78cbc..ce7e16f7 100644 --- a/events/vc/preconsume.go +++ b/events/vc/preconsume.go @@ -13,8 +13,8 @@ import ( const cleanupTimeout = 5 * time.Second -func subscriptionPreConsume(eventType, subscribePath, unsubscribePath string) func(context.Context, event.APIClient, map[string]string) (func(), error) { - return func(ctx context.Context, rt event.APIClient, _ map[string]string) (func(), error) { +func subscriptionPreConsume(eventType, subscribePath, unsubscribePath string) func(context.Context, event.APIClient, map[string]string) (func() error, error) { + return func(ctx context.Context, rt event.APIClient, _ map[string]string) (func() error, error) { if rt == nil { return nil, errs.NewInternalError(errs.SubtypeUnknown, "runtime API client is required for pre-consume subscription") @@ -25,10 +25,13 @@ func subscriptionPreConsume(eventType, subscribePath, unsubscribePath string) fu return nil, err } - return func() { + return func() error { cleanupCtx, cancel := context.WithTimeout(context.Background(), cleanupTimeout) defer cancel() - _, _ = rt.CallAPI(cleanupCtx, "POST", unsubscribePath, body) + if _, err := rt.CallAPI(cleanupCtx, "POST", unsubscribePath, body); err != nil { + return err + } + return nil }, nil } } diff --git a/events/whiteboard/preconsume.go b/events/whiteboard/preconsume.go index 6e60416c..02c27fe6 100644 --- a/events/whiteboard/preconsume.go +++ b/events/whiteboard/preconsume.go @@ -22,8 +22,8 @@ const cleanupTimeout = 5 * time.Second // // board.whiteboard.updated_v1 is subscribed per-whiteboard (by whiteboard_id), // so the path contains a :whiteboard_id placeholder that must be supplied via params. -func whiteboardSubscriptionPreConsume(eventType string) func(context.Context, event.APIClient, map[string]string) (func(), error) { - return func(ctx context.Context, rt event.APIClient, params map[string]string) (func(), error) { +func whiteboardSubscriptionPreConsume(eventType string) func(context.Context, event.APIClient, map[string]string) (func() error, error) { + return func(ctx context.Context, rt event.APIClient, params map[string]string) (func() error, error) { if rt == nil { return nil, errs.NewInternalError(errs.SubtypeUnknown, "runtime API client is required for pre-consume subscription") @@ -44,10 +44,13 @@ func whiteboardSubscriptionPreConsume(eventType string) func(context.Context, ev return nil, err } - return func() { + return func() error { cleanupCtx, cancel := context.WithTimeout(context.Background(), cleanupTimeout) defer cancel() - _, _ = rt.CallAPI(cleanupCtx, "POST", unsubscribePath, body) + if _, err := rt.CallAPI(cleanupCtx, "POST", unsubscribePath, body); err != nil { + return err + } + return nil }, nil } } diff --git a/internal/event/bus/bus.go b/internal/event/bus/bus.go index 469e9f12..d018ce86 100644 --- a/internal/event/bus/bus.go +++ b/internal/event/bus/bus.go @@ -262,19 +262,23 @@ func (b *Bus) handleConn(conn net.Conn) { // handleHello registers a consume connection with the hub; reader carries bytes already pulled off conn. func (b *Bus) handleHello(conn net.Conn, reader *bufio.Reader, hello *protocol.Hello) { - bc := NewConn(conn, reader, hello.EventKey, hello.EventTypes, hello.PID) + subID := hello.SubscriptionID + if subID == "" { + subID = hello.EventKey + } + bc := NewConn(conn, reader, hello.EventKey, hello.EventTypes, hello.PID, subID) bc.SetLogger(b.logger) // Register + isFirst under one lock; blocks on any in-progress cleanup lock for the same EventKey. firstForKey := b.hub.RegisterAndIsFirst(bc) - bc.SetCheckLastForKey(func(eventKey string) bool { - return b.hub.AcquireCleanupLock(eventKey) + bc.SetCheckLastForKey(func(scope string) bool { + return b.hub.AcquireCleanupLock(scope) }) bc.SetOnClose(func(c *Conn) { b.hub.UnregisterAndIsLast(c) // Release is idempotent and must fire on every disconnect path so waiters don't block forever. - b.hub.ReleaseCleanupLock(c.EventKey()) + b.hub.ReleaseCleanupLock(c.SubscriptionID()) b.mu.Lock() delete(b.conns, c) remaining := len(b.conns) diff --git a/internal/event/bus/bus_shutdown_test.go b/internal/event/bus/bus_shutdown_test.go index 989caefc..98ff0740 100644 --- a/internal/event/bus/bus_shutdown_test.go +++ b/internal/event/bus/bus_shutdown_test.go @@ -33,7 +33,7 @@ func TestRunShutdownWithMultipleConns(t *testing.T) { server, client := net.Pipe() pipes = append(pipes, server, client) - bc := NewConn(server, nil, "im.msg", []string{"im.message.receive_v1"}, 1000+i) + bc := NewConn(server, nil, "im.msg", []string{"im.message.receive_v1"}, 1000+i, "") bc.SetLogger(logger) hub.RegisterAndIsFirst(bc) diff --git a/internal/event/bus/conn.go b/internal/event/bus/conn.go index 833231ff..c827d800 100644 --- a/internal/event/bus/conn.go +++ b/internal/event/bus/conn.go @@ -29,9 +29,10 @@ type Conn struct { writeMu sync.Mutex // serialises all net.Conn writes (Encode+SetWriteDeadline is a 2-call sequence) eventKey string eventTypes []string + subID string pid int onClose func(*Conn) - checkLastForKey func(eventKey string) bool + checkLastForKey func(scope string) bool logger *log.Logger closed chan struct{} closeOnce sync.Once @@ -41,7 +42,7 @@ type Conn struct { } // NewConn creates a Conn; pass a reader with pre-buffered bytes (handoff from Bus.handleConn) or nil for a fresh one. -func NewConn(conn net.Conn, reader *bufio.Reader, eventKey string, eventTypes []string, pid int) *Conn { +func NewConn(conn net.Conn, reader *bufio.Reader, eventKey string, eventTypes []string, pid int, subID string) *Conn { if reader == nil { reader = bufio.NewReader(conn) } @@ -52,10 +53,20 @@ func NewConn(conn net.Conn, reader *bufio.Reader, eventKey string, eventTypes [] eventKey: eventKey, eventTypes: eventTypes, pid: pid, + subID: subID, closed: make(chan struct{}), } } +// SubscriptionID returns the subscription identity. Falls back to EventKey +// when the stored subID is empty (legacy clients / no-SubscriptionKey EventKeys). +func (c *Conn) SubscriptionID() string { + if c.subID == "" { + return c.eventKey + } + return c.subID +} + func (c *Conn) SetOnClose(fn func(*Conn)) { c.onClose = fn } // SetCheckLastForKey: returning true means "you are the last subscriber, run cleanup". @@ -132,13 +143,19 @@ func (c *Conn) ReaderLoop() { } func (c *Conn) handleControlMessage(msg interface{}) { - switch m := msg.(type) { + switch msg.(type) { case *protocol.Bye: c.shutdown() case *protocol.PreShutdownCheck: + // Use the connection's own authoritative subscription identity rather + // than recomputing from the incoming message: a stale or mismatched + // PreShutdownCheck must not ask about the wrong scope (which would + // suppress or mistrigger per-subscription cleanup). Conn.SubscriptionID() + // already falls back to EventKey when its stored subID is empty. + scope := c.SubscriptionID() lastForKey := true if c.checkLastForKey != nil { - lastForKey = c.checkLastForKey(m.EventKey) + lastForKey = c.checkLastForKey(scope) } ack := protocol.NewPreShutdownAck(lastForKey) if err := c.writeFrame(ack); err != nil && c.logger != nil { diff --git a/internal/event/bus/conn_test.go b/internal/event/bus/conn_test.go index aaa2d4f9..1beaf348 100644 --- a/internal/event/bus/conn_test.go +++ b/internal/event/bus/conn_test.go @@ -21,7 +21,7 @@ func TestConn_SenderWritesEvents(t *testing.T) { defer server.Close() defer client.Close() - bc := NewConn(server, nil, "im.msg", []string{"im.message.receive_v1"}, 12345) + bc := NewConn(server, nil, "im.msg", []string{"im.message.receive_v1"}, 12345, "") go bc.SenderLoop() bc.SendCh() <- &protocol.Event{ @@ -62,7 +62,7 @@ func TestConn_ConcurrentWritesSerialised(t *testing.T) { defer client.Close() det := &serializingDetector{Conn: server} - bc := NewConn(det, nil, "im.msg", []string{"im.msg"}, 12345) + bc := NewConn(det, nil, "im.msg", []string{"im.msg"}, 12345, "") go func() { _, _ = io.Copy(io.Discard, client) }() @@ -106,7 +106,7 @@ func TestConn_TrySend_NonEvicting(t *testing.T) { server, client := net.Pipe() defer server.Close() defer client.Close() - bc := NewConn(server, nil, "im.msg", []string{"im.msg"}, 12345) + bc := NewConn(server, nil, "im.msg", []string{"im.msg"}, 12345, "") for i := 0; i < sendChCap; i++ { if !bc.TrySend(i) { @@ -126,7 +126,7 @@ func TestConn_ReaderDetectsEOF(t *testing.T) { server, client := net.Pipe() defer server.Close() - bc := NewConn(server, nil, "im.msg", []string{"im.msg"}, 12345) + bc := NewConn(server, nil, "im.msg", []string{"im.msg"}, 12345, "") done := make(chan struct{}) go func() { @@ -142,3 +142,23 @@ func TestConn_ReaderDetectsEOF(t *testing.T) { t.Fatal("ReaderLoop did not exit on EOF") } } + +func TestConn_SubscriptionID(t *testing.T) { + c1, c2 := net.Pipe() + defer c1.Close() + defer c2.Close() + conn := NewConn(c1, nil, "mail.x", []string{"mail.x"}, 999, "mail.x:abc") + if got := conn.SubscriptionID(); got != "mail.x:abc" { + t.Errorf("SubscriptionID() = %q, want %q", got, "mail.x:abc") + } +} + +func TestConn_SubscriptionID_EmptyFallsBackToEventKey(t *testing.T) { + c1, c2 := net.Pipe() + defer c1.Close() + defer c2.Close() + conn := NewConn(c1, nil, "mail.x", []string{"mail.x"}, 999, "") + if got := conn.SubscriptionID(); got != "mail.x" { + t.Errorf("SubscriptionID() with empty input = %q, want fallback %q", got, "mail.x") + } +} diff --git a/internal/event/bus/handle_hello_test.go b/internal/event/bus/handle_hello_test.go index 7be844de..034fbd1b 100644 --- a/internal/event/bus/handle_hello_test.go +++ b/internal/event/bus/handle_hello_test.go @@ -63,3 +63,134 @@ func TestHandleHello_HelloAckWriteFailureUnregisters(t *testing.T) { t.Errorf("b.conns after failed HelloAck = %d entries, want 0", remaining) } } + +// TestHandleHello_LegacyClient_FallsBackToEventKey: a Hello with empty +// subscription_id registers under EventKey (today's behavior preserved). +func TestHandleHello_LegacyClient_FallsBackToEventKey(t *testing.T) { + logger := log.New(io.Discard, "", 0) + hub := NewHub() + b := &Bus{ + hub: hub, + logger: logger, + conns: make(map[*Conn]struct{}), + idleTimer: time.NewTimer(30 * time.Second), + shutdownCh: make(chan struct{}, 1), + } + + server, client := net.Pipe() + defer server.Close() + defer client.Close() + + // Legacy client: no subscription_id field (empty string). + hello := &protocol.Hello{ + PID: 9999, + EventKey: "im.message", + EventTypes: []string{"im.message.receive_v1"}, + SubscriptionID: "", // legacy: empty, should fallback to EventKey + } + + br := bufio.NewReader(server) + + done := make(chan struct{}) + go func() { + b.handleHello(server, br, hello) + close(done) + }() + + // Read the HelloAck from client side to let handleHello complete. + clientReader := bufio.NewReader(client) + ackLine, err := clientReader.ReadString('\n') + if err != nil { + t.Fatalf("failed to read HelloAck: %v", err) + } + + select { + case <-done: + case <-time.After(3 * time.Second): + t.Fatal("handleHello did not return within 3s") + } + + // Assertions: registered under EventKey (not a qualified subscription ID). + if got := hub.ConnCount(); got != 1 { + t.Errorf("hub.ConnCount = %d, want 1", got) + } + if got := hub.EventKeyCount("im.message"); got != 1 { + t.Errorf("hub.EventKeyCount(im.message) = %d, want 1", got) + } + if got := hub.SubCount("im.message"); got != 1 { + t.Errorf("hub.SubCount(im.message) = %d, want 1 (legacy fallback to EventKey)", got) + } + if got := hub.SubCount("im.message:something"); got != 0 { + t.Errorf("hub.SubCount(im.message:something) = %d, want 0 (should not exist)", got) + } + + if ackLine == "" { + t.Fatal("HelloAck was empty") + } +} + +// TestHandleHello_ModernClient_UsesSubscriptionID: a Hello with +// non-empty subscription_id registers under that ID, not EventKey. +func TestHandleHello_ModernClient_UsesSubscriptionID(t *testing.T) { + logger := log.New(io.Discard, "", 0) + hub := NewHub() + b := &Bus{ + hub: hub, + logger: logger, + conns: make(map[*Conn]struct{}), + idleTimer: time.NewTimer(30 * time.Second), + shutdownCh: make(chan struct{}, 1), + } + + server, client := net.Pipe() + defer server.Close() + defer client.Close() + + // Modern client: subscription_id explicitly set. + subscriptionID := "mail.message:alice@example.com" + hello := &protocol.Hello{ + PID: 8888, + EventKey: "mail.message", + EventTypes: []string{"mail.message.receive_v1"}, + SubscriptionID: subscriptionID, // modern: per-resource subscription + } + + br := bufio.NewReader(server) + + done := make(chan struct{}) + go func() { + b.handleHello(server, br, hello) + close(done) + }() + + // Read the HelloAck from client side to let handleHello complete. + clientReader := bufio.NewReader(client) + ackLine, err := clientReader.ReadString('\n') + if err != nil { + t.Fatalf("failed to read HelloAck: %v", err) + } + + select { + case <-done: + case <-time.After(3 * time.Second): + t.Fatal("handleHello did not return within 3s") + } + + // Assertions: registered under the subscription_id, not bare EventKey. + if got := hub.ConnCount(); got != 1 { + t.Errorf("hub.ConnCount = %d, want 1", got) + } + if got := hub.EventKeyCount("mail.message"); got != 1 { + t.Errorf("hub.EventKeyCount(mail.message) = %d, want 1", got) + } + if got := hub.SubCount(subscriptionID); got != 1 { + t.Errorf("hub.SubCount(%q) = %d, want 1 (modern: uses SubscriptionID)", subscriptionID, got) + } + if got := hub.SubCount("mail.message"); got != 0 { + t.Errorf("hub.SubCount(mail.message) = %d, want 0 (modern: NOT registered under bare EventKey)", got) + } + + if ackLine == "" { + t.Fatal("HelloAck was empty") + } +} diff --git a/internal/event/bus/hub.go b/internal/event/bus/hub.go index 76b7d22e..620d3df6 100644 --- a/internal/event/bus/hub.go +++ b/internal/event/bus/hub.go @@ -16,6 +16,9 @@ import ( // Subscriber is the interface a connection must satisfy for Hub registration. type Subscriber interface { EventKey() string + // SubscriptionID identifies the per-resource subscription for dedup purposes. + // When no resource qualifier is needed it equals EventKey. + SubscriptionID() string EventTypes() []string SendCh() chan interface{} PID() int @@ -34,8 +37,11 @@ type Subscriber interface { type Hub struct { mu sync.RWMutex subscribers map[Subscriber]struct{} - keyCounts map[string]int - // cleanupInProgress[key] holds a channel closed on release; presence means a cleanup lock is held. + // subCounts is keyed by SubscriptionID (not EventKey) so that different + // per-resource subscriptions sharing the same EventKey are deduped independently. + subCounts map[string]int + // cleanupInProgress[subscriptionID] holds a channel closed on release; + // presence means a cleanup lock is held for that subscription. cleanupInProgress map[string]chan struct{} logger atomic.Pointer[log.Logger] } @@ -43,7 +49,7 @@ type Hub struct { func NewHub() *Hub { return &Hub{ subscribers: make(map[Subscriber]struct{}), - keyCounts: make(map[string]int), + subCounts: make(map[string]int), cleanupInProgress: make(map[string]chan struct{}), } } @@ -51,7 +57,7 @@ func NewHub() *Hub { // SetLogger attaches a logger (nil tolerated). func (h *Hub) SetLogger(l *log.Logger) { h.logger.Store(l) } -// UnregisterAndIsLast removes s and reports whether it was last for its EventKey; stale unregisters are no-ops. +// UnregisterAndIsLast removes s and reports whether it was last for its SubscriptionID; stale unregisters are no-ops. func (h *Hub) UnregisterAndIsLast(s Subscriber) bool { h.mu.Lock() defer h.mu.Unlock() @@ -59,34 +65,35 @@ func (h *Hub) UnregisterAndIsLast(s Subscriber) bool { return false } delete(h.subscribers, s) - h.keyCounts[s.EventKey()]-- - isLast := h.keyCounts[s.EventKey()] == 0 + sid := s.SubscriptionID() + h.subCounts[sid]-- + isLast := h.subCounts[sid] == 0 if isLast { - delete(h.keyCounts, s.EventKey()) + delete(h.subCounts, sid) } return isLast } -// AcquireCleanupLock reserves cleanup rights iff exactly one subscriber exists for eventKey and no lock is held. +// AcquireCleanupLock reserves cleanup rights iff exactly one subscriber exists for subscriptionID and no lock is held. // Count==0 is rejected (would block future Register calls). On true return, caller MUST Release. -func (h *Hub) AcquireCleanupLock(eventKey string) bool { +func (h *Hub) AcquireCleanupLock(subscriptionID string) bool { h.mu.Lock() defer h.mu.Unlock() - if h.keyCounts[eventKey] != 1 { + if h.subCounts[subscriptionID] != 1 { return false } - if _, alreadyLocked := h.cleanupInProgress[eventKey]; alreadyLocked { + if _, alreadyLocked := h.cleanupInProgress[subscriptionID]; alreadyLocked { return false } - h.cleanupInProgress[eventKey] = make(chan struct{}) + h.cleanupInProgress[subscriptionID] = make(chan struct{}) return true } // ReleaseCleanupLock is idempotent; OnClose calls unconditionally. -func (h *Hub) ReleaseCleanupLock(eventKey string) { +func (h *Hub) ReleaseCleanupLock(subscriptionID string) { h.mu.Lock() - ch := h.cleanupInProgress[eventKey] - delete(h.cleanupInProgress, eventKey) + ch := h.cleanupInProgress[subscriptionID] + delete(h.cleanupInProgress, subscriptionID) h.mu.Unlock() if ch != nil { close(ch) @@ -94,23 +101,24 @@ func (h *Hub) ReleaseCleanupLock(eventKey string) { } // RegisterAndIsFirst adds s to the hub and reports whether it's the first -// subscriber for its EventKey. If a cleanup is in progress for -// s.EventKey() (another conn holds the cleanup lock), this waits until +// subscriber for its SubscriptionID. If a cleanup is in progress for +// s.SubscriptionID() (another conn holds the cleanup lock), this waits until // cleanup releases before registering — closing the PreShutdownCheck × // Hello TOCTOU race. The wait releases h.mu before blocking on the -// channel, so concurrent operations on other keys aren't stalled. +// channel, so concurrent operations on other subscriptions aren't stalled. func (h *Hub) RegisterAndIsFirst(s Subscriber) bool { + sid := s.SubscriptionID() for { h.mu.Lock() - ch, locked := h.cleanupInProgress[s.EventKey()] + ch, locked := h.cleanupInProgress[sid] if locked { h.mu.Unlock() <-ch // wait for release, then re-check (defensive against races) continue } - isFirst := h.keyCounts[s.EventKey()] == 0 + isFirst := h.subCounts[sid] == 0 h.subscribers[s] = struct{}{} - h.keyCounts[s.EventKey()]++ + h.subCounts[sid]++ h.mu.Unlock() return isFirst } @@ -176,11 +184,25 @@ func (h *Hub) ConnCount() int { return len(h.subscribers) } -// EventKeyCount returns the number of subscribers registered for eventKey. +// EventKeyCount returns total subscribers for the given EventKey, aggregating +// across all SubscriptionIDs. For per-subscription counts use SubCount. func (h *Hub) EventKeyCount(eventKey string) int { h.mu.RLock() defer h.mu.RUnlock() - return h.keyCounts[eventKey] + count := 0 + for s := range h.subscribers { + if s.EventKey() == eventKey { + count++ + } + } + return count +} + +// SubCount returns the count of subscribers for the given SubscriptionID. +func (h *Hub) SubCount(subscriptionID string) int { + h.mu.RLock() + defer h.mu.RUnlock() + return h.subCounts[subscriptionID] } // BroadcastSourceStatus fans out a source-level status change to every @@ -205,10 +227,11 @@ func (h *Hub) Consumers() []protocol.ConsumerInfo { result := make([]protocol.ConsumerInfo, 0, len(h.subscribers)) for s := range h.subscribers { result = append(result, protocol.ConsumerInfo{ - PID: s.PID(), - EventKey: s.EventKey(), - Received: s.Received(), - Dropped: s.DroppedCount(), + PID: s.PID(), + EventKey: s.EventKey(), + SubscriptionID: s.SubscriptionID(), + Received: s.Received(), + Dropped: s.DroppedCount(), }) } return result diff --git a/internal/event/bus/hub_observability_test.go b/internal/event/bus/hub_observability_test.go index 051e3501..0134fe2a 100644 --- a/internal/event/bus/hub_observability_test.go +++ b/internal/event/bus/hub_observability_test.go @@ -17,7 +17,7 @@ func TestHubDroppedCountIncrements(t *testing.T) { server, client := testNetPipe(t) defer server.Close() defer client.Close() - c := NewConn(server, nil, "k", []string{"t"}, 1) + c := NewConn(server, nil, "k", []string{"t"}, 1, "") c.sendCh = make(chan interface{}, 1) h.RegisterAndIsFirst(c) @@ -35,7 +35,7 @@ func TestPublishAssignsIncrementalSeq(t *testing.T) { server, client := testNetPipe(t) defer server.Close() defer client.Close() - c := NewConn(server, nil, "k", []string{"t"}, 1) + c := NewConn(server, nil, "k", []string{"t"}, 1, "") c.sendCh = make(chan interface{}, 10) h.RegisterAndIsFirst(c) @@ -60,7 +60,7 @@ func TestPublishPopulatesEventIDAndSourceTime(t *testing.T) { server, client := testNetPipe(t) defer server.Close() defer client.Close() - c := NewConn(server, nil, "k", []string{"t"}, 1) + c := NewConn(server, nil, "k", []string{"t"}, 1, "") c.sendCh = make(chan interface{}, 1) h.RegisterAndIsFirst(c) @@ -87,7 +87,7 @@ func TestPublishSourceTimeTakesPrecedence(t *testing.T) { server, client := testNetPipe(t) defer server.Close() defer client.Close() - c := NewConn(server, nil, "k", []string{"t"}, 1) + c := NewConn(server, nil, "k", []string{"t"}, 1, "") c.sendCh = make(chan interface{}, 1) h.RegisterAndIsFirst(c) @@ -111,7 +111,7 @@ func TestPublishSourceTimeFallback(t *testing.T) { server, client := testNetPipe(t) defer server.Close() defer client.Close() - c := NewConn(server, nil, "k", []string{"t"}, 1) + c := NewConn(server, nil, "k", []string{"t"}, 1, "") c.sendCh = make(chan interface{}, 1) h.RegisterAndIsFirst(c) diff --git a/internal/event/bus/hub_publish_race_test.go b/internal/event/bus/hub_publish_race_test.go index b91987f5..f0eb0866 100644 --- a/internal/event/bus/hub_publish_race_test.go +++ b/internal/event/bus/hub_publish_race_test.go @@ -111,6 +111,7 @@ type alwaysFailSubscriber struct { } func (s *alwaysFailSubscriber) EventKey() string { return s.eventKey } +func (s *alwaysFailSubscriber) SubscriptionID() string { return s.eventKey } func (s *alwaysFailSubscriber) EventTypes() []string { return s.eventTypes } func (s *alwaysFailSubscriber) SendCh() chan interface{} { return s.sendCh } func (s *alwaysFailSubscriber) PID() int { return 0 } @@ -153,6 +154,7 @@ func newRaceSubscriber(key string, types []string, capacity int) *raceSubscriber } func (s *raceSubscriber) EventKey() string { return s.eventKey } +func (s *raceSubscriber) SubscriptionID() string { return s.eventKey } func (s *raceSubscriber) EventTypes() []string { return s.eventTypes } func (s *raceSubscriber) SendCh() chan interface{} { return s.sendCh } func (s *raceSubscriber) PID() int { return s.pid } diff --git a/internal/event/bus/hub_test.go b/internal/event/bus/hub_test.go index e135d436..04fe5364 100644 --- a/internal/event/bus/hub_test.go +++ b/internal/event/bus/hub_test.go @@ -5,6 +5,7 @@ package bus import ( "encoding/json" + "net" "sync" "sync/atomic" "testing" @@ -235,7 +236,10 @@ func newTestConn(eventKey string, eventTypes []string) *testConn { } } -func (c *testConn) EventKey() string { return c.eventKey } +func (c *testConn) EventKey() string { return c.eventKey } + +// SubscriptionID falls back to EventKey for test mocks that don't set a separate subscription ID. +func (c *testConn) SubscriptionID() string { return c.eventKey } func (c *testConn) EventTypes() []string { return c.eventTypes } func (c *testConn) SendCh() chan interface{} { return c.sendCh } func (c *testConn) PID() int { return c.pid } @@ -275,3 +279,79 @@ func (c *testConn) TrySend(msg interface{}) bool { return false } } + +func TestHub_SubscriptionID_Isolation(t *testing.T) { + h := NewHub() + c1, _ := net.Pipe() + c2, _ := net.Pipe() + defer c1.Close() + defer c2.Close() + s1 := NewConn(c1, nil, "mail.x", []string{"mail.x"}, 1, "mail.x:alice") + s2 := NewConn(c2, nil, "mail.x", []string{"mail.x"}, 2, "mail.x:bob") + + if !h.RegisterAndIsFirst(s1) { + t.Error("s1 should be first for its subscription") + } + if !h.RegisterAndIsFirst(s2) { + t.Error("s2 should ALSO be first (different SubscriptionID)") + } + if !h.UnregisterAndIsLast(s1) { + t.Error("s1 should be last for mail.x:alice") + } + if !h.UnregisterAndIsLast(s2) { + t.Error("s2 should be last for mail.x:bob") + } +} + +func TestHub_SameSubscriptionID_NotFirst(t *testing.T) { + h := NewHub() + c1, _ := net.Pipe() + c2, _ := net.Pipe() + defer c1.Close() + defer c2.Close() + s1 := NewConn(c1, nil, "mail.x", []string{"mail.x"}, 1, "mail.x:alice") + s2 := NewConn(c2, nil, "mail.x", []string{"mail.x"}, 2, "mail.x:alice") + + if !h.RegisterAndIsFirst(s1) { + t.Error("s1 first") + } + if h.RegisterAndIsFirst(s2) { + t.Error("s2 same SubscriptionID should NOT be first") + } +} + +func TestHub_EventKeyCount_AggregatesAcrossSubscriptions(t *testing.T) { + h := NewHub() + c1, _ := net.Pipe() + c2, _ := net.Pipe() + defer c1.Close() + defer c2.Close() + s1 := NewConn(c1, nil, "mail.x", []string{"mail.x"}, 1, "mail.x:alice") + s2 := NewConn(c2, nil, "mail.x", []string{"mail.x"}, 2, "mail.x:bob") + h.RegisterAndIsFirst(s1) + h.RegisterAndIsFirst(s2) + if got := h.EventKeyCount("mail.x"); got != 2 { + t.Errorf("EventKeyCount(mail.x) = %d, want 2 (aggregated across subscriptions)", got) + } + if got := h.SubCount("mail.x:alice"); got != 1 { + t.Errorf("SubCount(mail.x:alice) = %d, want 1", got) + } + if got := h.SubCount("mail.x:bob"); got != 1 { + t.Errorf("SubCount(mail.x:bob) = %d, want 1", got) + } +} + +func TestHub_Consumers_PopulatesSubscriptionID(t *testing.T) { + h := NewHub() + c1, _ := net.Pipe() + defer c1.Close() + s1 := NewConn(c1, nil, "mail.x", []string{"mail.x"}, 1, "mail.x:alice") + h.RegisterAndIsFirst(s1) + consumers := h.Consumers() + if len(consumers) != 1 { + t.Fatalf("got %d consumers, want 1", len(consumers)) + } + if consumers[0].SubscriptionID != "mail.x:alice" { + t.Errorf("Consumers()[0].SubscriptionID = %q, want %q", consumers[0].SubscriptionID, "mail.x:alice") + } +} diff --git a/internal/event/consume/consume.go b/internal/event/consume/consume.go index de4be276..76a5b5c3 100644 --- a/internal/event/consume/consume.go +++ b/internal/event/consume/consume.go @@ -61,6 +61,22 @@ func Run(ctx context.Context, tr transport.IPC, appID, profileName, domain strin } } + // Normalize params (resolve aliases like "me" -> real email) before fingerprint + // compute, PreConsume, Match, Process. Must happen BEFORE doHello so the + // SubscriptionID we send to bus reflects canonical values. + if keyDef.NormalizeParams != nil { + if err := keyDef.NormalizeParams(ctx, opts.Runtime, opts.Params); err != nil { + if _, ok := errs.ProblemOf(err); ok { + return err + } + return errs.NewInternalError(errs.SubtypeUnknown, + "normalize params for %s: %s", opts.EventKey, err).WithCause(err) + } + } + + // Compute subscription identity from normalized params + SubscriptionKey flags. + subscriptionID := ComputeSubscriptionID(keyDef, opts.Params) + if opts.Timeout > 0 { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, opts.Timeout) @@ -81,13 +97,13 @@ func Run(ctx context.Context, tr transport.IPC, appID, profileName, domain strin } defer conn.Close() - ack, br, err := doHello(conn, opts.EventKey, []string{keyDef.EventType}) + ack, br, err := doHello(conn, opts.EventKey, []string{keyDef.EventType}, subscriptionID) if err != nil { return errs.NewInternalError(errs.SubtypeUnknown, "event bus handshake failed: %s", err).WithCause(err) } - var cleanup func() + var cleanup func() error if ack.FirstForKey && keyDef.PreConsume != nil { if !opts.Quiet { fmt.Fprintf(errOut, "[event] running pre-consume setup...\n") @@ -113,14 +129,22 @@ func Run(ctx context.Context, tr transport.IPC, appID, profileName, domain strin if cleanup != nil { switch { case r != nil: - fmt.Fprintf(errOut, "WARN: panic recovered; running cleanup unconditionally (may affect other consumers of %s)\n", opts.EventKey) - cleanup() + fmt.Fprintf(errOut, + "WARN: panic recovered; running cleanup unconditionally (may affect other consumers of %s)\n", + opts.EventKey) + if cleanupErr := cleanup(); cleanupErr != nil { + fmt.Fprintf(errOut, + "WARN: cleanup also failed during panic recovery: %v\n", cleanupErr) + } case lastForKey: if !opts.Quiet { fmt.Fprintf(errOut, "[event] running cleanup...\n") } - cleanup() - if !opts.Quiet { + if cleanupErr := cleanup(); cleanupErr != nil { + fmt.Fprintf(errOut, + "WARN: cleanup failed: %v (server-side subscribe is idempotent — residual record will be overwritten on next subscribe)\n", + cleanupErr) + } else if !opts.Quiet { fmt.Fprintf(errOut, "[event] cleanup done.\n") } } @@ -144,7 +168,7 @@ func Run(ctx context.Context, tr transport.IPC, appID, profileName, domain strin writeReadyMarker(errOut, opts) - return consumeLoop(ctx, conn, br, keyDef, opts, &lastForKey, &emitted) + return consumeLoop(ctx, conn, br, keyDef, opts, subscriptionID, &lastForKey, &emitted) } func truncateDuration(d time.Duration) time.Duration { diff --git a/internal/event/consume/consume_test.go b/internal/event/consume/consume_test.go new file mode 100644 index 00000000..a082e71f --- /dev/null +++ b/internal/event/consume/consume_test.go @@ -0,0 +1,101 @@ +// Copyright (c) 2026 Lark Technologies Pte. Ltd. +// SPDX-License-Identifier: MIT + +package consume + +import ( + "bufio" + "bytes" + "context" + "encoding/json" + "errors" + "net" + "strings" + "testing" + + "github.com/larksuite/cli/internal/event" + "github.com/larksuite/cli/internal/event/protocol" + "github.com/larksuite/cli/internal/event/transport" +) + +// fakeRT is a minimal event.APIClient mock. +type fakeRT struct { + err error +} + +func (f *fakeRT) CallAPI(_ context.Context, _, _ string, _ interface{}) (json.RawMessage, error) { + return nil, f.err +} + +func TestNormalizeParams_ErrorIsWrappedWithEventKey(t *testing.T) { + // Drives the real Run() path: NormalizeParams fails before EnsureBus, so no + // bus is contacted, yet the production error-wrapping is exercised — if Run() + // ever stops wrapping, this test fails. + const key = "test.evt_normalize_fail" + event.RegisterKey(event.KeyDefinition{ + Key: key, + EventType: key, + Schema: event.SchemaDef{Custom: &event.SchemaSpec{Raw: json.RawMessage(`{"type":"object"}`)}}, + NormalizeParams: func(_ context.Context, _ event.APIClient, _ map[string]string) error { + return errors.New("simulated normalize failure") + }, + }) + defer event.UnregisterKeyForTest(key) + + err := Run(context.Background(), transport.New(), "app", "", "", Options{ + EventKey: key, + Runtime: &fakeRT{}, + Quiet: true, + }) + if err == nil { + t.Fatal("expected Run to fail when NormalizeParams errors") + } + if !strings.Contains(err.Error(), "normalize params for "+key+":") { + t.Errorf("error not wrapped with EventKey prefix: %v", err) + } + if !strings.Contains(err.Error(), "simulated normalize failure") { + t.Errorf("underlying error not propagated: %v", err) + } +} + +func TestDoHello_PassesSubscriptionIDToWire(t *testing.T) { + a, b := net.Pipe() + defer a.Close() + defer b.Close() + + // Server-side: read Hello, decode, assert SubscriptionID, send ack + done := make(chan string, 1) + go func() { + br := bufio.NewReader(b) + line, err := protocol.ReadFrame(br) + if err != nil { + done <- "READ_ERR:" + err.Error() + return + } + msg, err := protocol.Decode(bytes.TrimRight(line, "\n")) + if err != nil { + done <- "DECODE_ERR:" + err.Error() + return + } + if hello, ok := msg.(*protocol.Hello); ok { + done <- hello.SubscriptionID + // send ack so client can return + ack := protocol.NewHelloAck("v1", true) + _ = protocol.EncodeWithDeadline(b, ack, protocol.WriteTimeout) + } else { + done <- "WRONG_TYPE" + } + }() + + ack, _, err := doHello(a, "mail.x", []string{"mail.x"}, "mail.x:alice") + if err != nil { + t.Fatalf("doHello error: %v", err) + } + if ack == nil { + t.Fatal("got nil ack") + } + got := <-done + if got != "mail.x:alice" { + t.Errorf("Hello.SubscriptionID on wire = %q, want %q", got, "mail.x:alice") + } +} diff --git a/internal/event/consume/fingerprint.go b/internal/event/consume/fingerprint.go new file mode 100644 index 00000000..a452f1aa --- /dev/null +++ b/internal/event/consume/fingerprint.go @@ -0,0 +1,41 @@ +// Copyright (c) 2026 Lark Technologies Pte. Ltd. +// SPDX-License-Identifier: MIT + +package consume + +import ( + "crypto/sha256" + "encoding/base64" + "encoding/json" + "sort" + + "github.com/larksuite/cli/internal/event" +) + +// ComputeSubscriptionID returns a stable identifier scoped to (EventKey, values +// of the ParamDefs marked SubscriptionKey); the framework uses it to dedup +// PreConsume/cleanup gates and key Hub counts per-subscription. No SubscriptionKey +// params -> returns def.Key verbatim (legacy one-dimensional behavior). +// +// Stability contract: same EventKey + same normalized param values -> same ID +// across CLI versions; changing the encoding requires a wire-format bump. +func ComputeSubscriptionID(def *event.KeyDefinition, params map[string]string) string { + type kv struct { + Name string `json:"name"` + Value string `json:"value"` + } + var subParams []kv + for _, p := range def.Params { + if !p.SubscriptionKey { + continue + } + subParams = append(subParams, kv{Name: p.Name, Value: params[p.Name]}) + } + if len(subParams) == 0 { + return def.Key + } + sort.Slice(subParams, func(i, j int) bool { return subParams[i].Name < subParams[j].Name }) + raw, _ := json.Marshal(subParams) // err impossible: kv has no unmarshalable fields + sum := sha256.Sum256(raw) + return def.Key + ":" + base64.RawURLEncoding.EncodeToString(sum[:12]) +} diff --git a/internal/event/consume/fingerprint_test.go b/internal/event/consume/fingerprint_test.go new file mode 100644 index 00000000..1440ff5d --- /dev/null +++ b/internal/event/consume/fingerprint_test.go @@ -0,0 +1,126 @@ +// Copyright (c) 2026 Lark Technologies Pte. Ltd. +// SPDX-License-Identifier: MIT + +package consume + +import ( + "strings" + "testing" + + "github.com/larksuite/cli/internal/event" +) + +func TestComputeSubscriptionID(t *testing.T) { + makeDef := func(subKeyNames ...string) *event.KeyDefinition { + def := &event.KeyDefinition{Key: "test.evt"} + marked := make(map[string]bool, len(subKeyNames)) + for _, n := range subKeyNames { + marked[n] = true + } + for _, n := range []string{"alpha", "beta", "gamma"} { + def.Params = append(def.Params, event.ParamDef{Name: n, SubscriptionKey: marked[n]}) + } + return def + } + + t.Run("no SubscriptionKey params returns EventKey verbatim", func(t *testing.T) { + def := makeDef() + got := ComputeSubscriptionID(def, map[string]string{"alpha": "x", "beta": "y"}) + if got != "test.evt" { + t.Errorf("got %q, want %q", got, "test.evt") + } + }) + + t.Run("single SubscriptionKey param: non-sub params do not leak into ID", func(t *testing.T) { + def := makeDef("alpha") + id1 := ComputeSubscriptionID(def, map[string]string{"alpha": "value1", "beta": "ignored"}) + id2 := ComputeSubscriptionID(def, map[string]string{"alpha": "value1", "beta": "different"}) + if id1 != id2 { + t.Errorf("non-SubscriptionKey param change leaked into ID: %q vs %q", id1, id2) + } + }) + + t.Run("different SubscriptionKey value produces different ID", func(t *testing.T) { + def := makeDef("alpha") + id1 := ComputeSubscriptionID(def, map[string]string{"alpha": "v1"}) + id2 := ComputeSubscriptionID(def, map[string]string{"alpha": "v2"}) + if id1 == id2 { + t.Errorf("different values produced same ID: %q", id1) + } + }) +} + +func TestComputeSubscriptionID_Stability(t *testing.T) { + // Param order in the ParamDef list must not affect the result (sorted by name internally). + def1 := &event.KeyDefinition{ + Key: "test.evt", + Params: []event.ParamDef{ + {Name: "b", SubscriptionKey: true}, + {Name: "a", SubscriptionKey: true}, + }, + } + def2 := &event.KeyDefinition{ + Key: "test.evt", + Params: []event.ParamDef{ + {Name: "a", SubscriptionKey: true}, + {Name: "b", SubscriptionKey: true}, + }, + } + id1 := ComputeSubscriptionID(def1, map[string]string{"a": "1", "b": "2"}) + id2 := ComputeSubscriptionID(def2, map[string]string{"a": "1", "b": "2"}) + if id1 != id2 { + t.Errorf("order-sensitive: id1=%q id2=%q", id1, id2) + } +} + +func TestComputeSubscriptionID_Format(t *testing.T) { + def := &event.KeyDefinition{ + Key: "mail.user_mailbox.event.message_received_v1", + Params: []event.ParamDef{{Name: "mailbox", SubscriptionKey: true}}, + } + id := ComputeSubscriptionID(def, map[string]string{"mailbox": "liuxinyang@example.com"}) + prefix := "mail.user_mailbox.event.message_received_v1:" + if !strings.HasPrefix(id, prefix) { + t.Fatalf("missing prefix: %q", id) + } + suffix := strings.TrimPrefix(id, prefix) + if len(suffix) != 16 { + t.Errorf("fingerprint length = %d, want 16", len(suffix)) + } + for _, c := range suffix { + isValid := (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') || (c >= '0' && c <= '9') || c == '-' || c == '_' + if !isValid { + t.Errorf("non-base64URL char in fingerprint: %q", suffix) + break + } + } +} + +func TestComputeSubscriptionID_UnicodeAndSpecialChars(t *testing.T) { + def := &event.KeyDefinition{ + Key: "test.evt", + Params: []event.ParamDef{{Name: "value", SubscriptionKey: true}}, + } + for _, val := range []string{"中文", "emoji🚀", "with spaces", "with:colons", "with\"quotes"} { + id := ComputeSubscriptionID(def, map[string]string{"value": val}) + if !strings.HasPrefix(id, "test.evt:") || len(id) != len("test.evt:")+16 { + t.Errorf("ID malformed for value=%q: %q (len=%d)", val, id, len(id)) + } + } +} + +func TestComputeSubscriptionID_EmptyValue(t *testing.T) { + def := &event.KeyDefinition{ + Key: "test.evt", + Params: []event.ParamDef{{Name: "x", SubscriptionKey: true}}, + } + id1 := ComputeSubscriptionID(def, map[string]string{"x": ""}) + id2 := ComputeSubscriptionID(def, map[string]string{}) // missing entirely + if id1 != id2 { + t.Errorf("empty value should be indistinguishable from missing: %q vs %q", id1, id2) + } + id3 := ComputeSubscriptionID(def, map[string]string{"x": "nonempty"}) + if id1 == id3 { + t.Errorf("empty and nonempty produced same ID: %q", id1) + } +} diff --git a/internal/event/consume/handshake.go b/internal/event/consume/handshake.go index 7aac408d..18aeac95 100644 --- a/internal/event/consume/handshake.go +++ b/internal/event/consume/handshake.go @@ -18,8 +18,8 @@ const helloAckTimeout = 5 * time.Second // symmetric with bus-side hello read de // doHello returns a bufio.Reader holding any bytes already pulled off conn so events // buffered with the ack in one TCP segment aren't dropped. -func doHello(conn net.Conn, eventKey string, eventTypes []string) (*protocol.HelloAck, *bufio.Reader, error) { - hello := protocol.NewHello(os.Getpid(), eventKey, eventTypes, "v1") +func doHello(conn net.Conn, eventKey string, eventTypes []string, subscriptionID string) (*protocol.HelloAck, *bufio.Reader, error) { + hello := protocol.NewHello(os.Getpid(), eventKey, eventTypes, "v1", subscriptionID) if err := protocol.EncodeWithDeadline(conn, hello, protocol.WriteTimeout); err != nil { return nil, nil, err } diff --git a/internal/event/consume/handshake_test.go b/internal/event/consume/handshake_test.go index b434a9a9..918c849e 100644 --- a/internal/event/consume/handshake_test.go +++ b/internal/event/consume/handshake_test.go @@ -27,7 +27,7 @@ func TestDoHello_ReadDeadline(t *testing.T) { start := time.Now() done := make(chan error, 1) go func() { - _, _, err := doHello(client, "im.msg", []string{"im.msg"}) + _, _, err := doHello(client, "im.msg", []string{"im.msg"}, "") done <- err }() diff --git a/internal/event/consume/loop.go b/internal/event/consume/loop.go index 6a3f844c..849caddd 100644 --- a/internal/event/consume/loop.go +++ b/internal/event/consume/loop.go @@ -22,7 +22,7 @@ import ( ) // consumeLoop reads events and dispatches to workers; cancels on terminal sink errors. -func consumeLoop(ctx context.Context, conn net.Conn, br *bufio.Reader, keyDef *event.KeyDefinition, opts Options, lastForKey *bool, emitted *atomic.Int64) error { +func consumeLoop(ctx context.Context, conn net.Conn, br *bufio.Reader, keyDef *event.KeyDefinition, opts Options, subscriptionID string, lastForKey *bool, emitted *atomic.Int64) error { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -185,7 +185,7 @@ func consumeLoop(ctx context.Context, conn net.Conn, br *bufio.Reader, keyDef *e close(stopReader) <-readerDone conn.SetReadDeadline(time.Time{}) - *lastForKey = checkLastForKey(conn, opts.EventKey) + *lastForKey = checkLastForKey(conn, opts.EventKey, subscriptionID) conn.Close() case <-allDone: // bus-side close; can't query, assume last @@ -199,13 +199,19 @@ func consumeLoop(ctx context.Context, conn net.Conn, br *bufio.Reader, keyDef *e // processAndOutput returns (wrote, err); err non-nil only for sink.Write failures. func processAndOutput(ctx context.Context, keyDef *event.KeyDefinition, evt *protocol.Event, opts Options, sink Sink, jqCode *gojq.Code) (bool, error) { + raw := &event.RawEvent{ + EventType: evt.EventType, + Payload: evt.Payload, + } + + // Synchronous Match filter runs before any work (Process / sink write). + if keyDef.Match != nil && !keyDef.Match(raw, opts.Params) { + return false, nil + } + var result json.RawMessage if keyDef.Process != nil { - raw := &event.RawEvent{ - EventType: evt.EventType, - Payload: evt.Payload, - } var err error result, err = keyDef.Process(ctx, opts.Runtime, raw, opts.Params) if err != nil { diff --git a/internal/event/consume/loop_test.go b/internal/event/consume/loop_test.go index 9b098da7..ace554fd 100644 --- a/internal/event/consume/loop_test.go +++ b/internal/event/consume/loop_test.go @@ -89,7 +89,7 @@ func TestConsumeLoop_DeliversEventsAndExitsOnMaxEvents(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - err := consumeLoop(ctx, client, bufio.NewReader(client), echoKeyDef("test.key"), opts, &lastForKey, &emitted) + err := consumeLoop(ctx, client, bufio.NewReader(client), echoKeyDef("test.key"), opts, "", &lastForKey, &emitted) if err != nil { t.Fatalf("consumeLoop: %v", err) } @@ -132,7 +132,7 @@ func TestConsumeLoop_SeqGapEmitsWarning(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - if err := consumeLoop(ctx, client, bufio.NewReader(client), echoKeyDef("test.key"), opts, &lastForKey, &emitted); err != nil { + if err := consumeLoop(ctx, client, bufio.NewReader(client), echoKeyDef("test.key"), opts, "", &lastForKey, &emitted); err != nil { t.Fatalf("consumeLoop: %v", err) } if got := emitted.Load(); got != 2 { @@ -169,7 +169,7 @@ func TestConsumeLoop_JQFilterAppliedPerEvent(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - if err := consumeLoop(ctx, client, bufio.NewReader(client), echoKeyDef("test.key"), opts, &lastForKey, &emitted); err != nil { + if err := consumeLoop(ctx, client, bufio.NewReader(client), echoKeyDef("test.key"), opts, "", &lastForKey, &emitted); err != nil { t.Fatalf("consumeLoop: %v", err) } if got := emitted.Load(); got != 1 { @@ -196,12 +196,96 @@ func TestConsumeLoop_CompileJQFailsEarly(t *testing.T) { var lastForKey bool var emitted atomic.Int64 - err := consumeLoop(context.Background(), client, bufio.NewReader(client), echoKeyDef("test.key"), opts, &lastForKey, &emitted) + err := consumeLoop(context.Background(), client, bufio.NewReader(client), echoKeyDef("test.key"), opts, "", &lastForKey, &emitted) if err == nil { t.Fatal("consumeLoop should fail immediately on bad jq expression") } } +// captureSink is a minimal Sink for unit-testing processAndOutput directly. +type captureSink struct { + written []json.RawMessage +} + +func (s *captureSink) Write(data json.RawMessage) error { + s.written = append(s.written, data) + return nil +} + +func TestProcessAndOutput_Match_DropsEvent(t *testing.T) { + calledProcess := false + keyDef := &event.KeyDefinition{ + Key: "test.evt", + Match: func(raw *event.RawEvent, params map[string]string) bool { + return false + }, + Process: func(ctx context.Context, rt event.APIClient, raw *event.RawEvent, params map[string]string) (json.RawMessage, error) { + calledProcess = true + return json.RawMessage(`{}`), nil + }, + } + sink := &captureSink{} + wrote, err := processAndOutput(context.Background(), keyDef, + &protocol.Event{Type: protocol.MsgTypeEvent, EventType: "test.evt", Payload: json.RawMessage(`{"x":1}`)}, + Options{}, sink, nil) + if err != nil { + t.Fatal(err) + } + if wrote { + t.Error("Match returned false but event was written") + } + if calledProcess { + t.Error("Process was called even though Match returned false") + } + if len(sink.written) != 0 { + t.Errorf("sink received %d events, want 0", len(sink.written)) + } +} + +func TestProcessAndOutput_Match_NilAcceptsAll(t *testing.T) { + keyDef := &event.KeyDefinition{Key: "test.evt"} // no Match, no Process + sink := &captureSink{} + wrote, err := processAndOutput(context.Background(), keyDef, + &protocol.Event{Type: protocol.MsgTypeEvent, EventType: "test.evt", Payload: json.RawMessage(`{"x":1}`)}, + Options{}, sink, nil) + if err != nil || !wrote { + t.Errorf("expected wrote=true err=nil; got wrote=%v err=%v", wrote, err) + } + if len(sink.written) != 1 { + t.Errorf("sink received %d events, want 1", len(sink.written)) + } +} + +func TestProcessAndOutput_Match_RunsBeforeProcess(t *testing.T) { + // Record the actual call sequence — a bare call-count check would still + // pass if Process ran before Match. + var order []string + keyDef := &event.KeyDefinition{ + Key: "test.evt", + Match: func(raw *event.RawEvent, params map[string]string) bool { + order = append(order, "match") + return true + }, + Process: func(ctx context.Context, rt event.APIClient, raw *event.RawEvent, params map[string]string) (json.RawMessage, error) { + order = append(order, "process") + return raw.Payload, nil + }, + } + sink := &captureSink{} + wrote, err := processAndOutput(context.Background(), keyDef, + &protocol.Event{Type: protocol.MsgTypeEvent, EventType: "test.evt", Payload: json.RawMessage(`{}`)}, + Options{}, sink, nil) + if err != nil { + t.Fatal(err) + } + if !wrote { + t.Error("expected wrote=true") + } + if len(order) != 2 || order[0] != "match" || order[1] != "process" { + t.Errorf("call order = %v, want [match process]", order) + } +} + func TestIsTerminalSinkError(t *testing.T) { for _, tc := range []struct { name string diff --git a/internal/event/consume/shutdown.go b/internal/event/consume/shutdown.go index e41209e2..3c07f986 100644 --- a/internal/event/consume/shutdown.go +++ b/internal/event/consume/shutdown.go @@ -16,8 +16,8 @@ const preShutdownAckTimeout = 2 * time.Second // checkLastForKey atomically reserves a cleanup lock; on any error defaults to true // (cleanup-on-error is safer than leaking server state). Discards non-ack frames in flight. -func checkLastForKey(conn net.Conn, eventKey string) bool { - msg := protocol.NewPreShutdownCheck(eventKey) +func checkLastForKey(conn net.Conn, eventKey string, subscriptionID string) bool { + msg := protocol.NewPreShutdownCheck(eventKey, subscriptionID) if err := protocol.EncodeWithDeadline(conn, msg, protocol.WriteTimeout); err != nil { return true } diff --git a/internal/event/consume/shutdown_test.go b/internal/event/consume/shutdown_test.go index c77754a4..76e28ad6 100644 --- a/internal/event/consume/shutdown_test.go +++ b/internal/event/consume/shutdown_test.go @@ -4,6 +4,8 @@ package consume import ( + "bufio" + "bytes" "encoding/json" "io" "net" @@ -38,7 +40,7 @@ func TestCheckLastForKey_IgnoresNonAckFrames(t *testing.T) { } }() - got := checkLastForKey(client, "im.msg") + got := checkLastForKey(client, "im.msg", "") if got != false { t.Errorf("checkLastForKey = %v, want false", got) } @@ -62,7 +64,7 @@ func TestCheckLastForKey_ReturnsAckValue(t *testing.T) { _ = protocol.Encode(server, ack) }() - got := checkLastForKey(client, "im.msg") + got := checkLastForKey(client, "im.msg", "") if got != true { t.Errorf("checkLastForKey = %v, want true", got) } @@ -83,7 +85,7 @@ func TestCheckLastForKey_DefaultsToTrueOnTimeout(t *testing.T) { }() start := time.Now() - got := checkLastForKey(client, "im.msg") + got := checkLastForKey(client, "im.msg", "") elapsed := time.Since(start) if got != true { @@ -93,3 +95,39 @@ func TestCheckLastForKey_DefaultsToTrueOnTimeout(t *testing.T) { t.Errorf("elapsed = %v, expected ~%v (timeout-bounded)", elapsed, preShutdownAckTimeout) } } + +func TestCheckLastForKey_SendsSubscriptionID(t *testing.T) { + a, b := net.Pipe() + defer a.Close() + defer b.Close() + + done := make(chan string, 1) + go func() { + br := bufio.NewReader(b) + line, err := protocol.ReadFrame(br) + if err != nil { + done <- "READ_ERR" + return + } + msg, err := protocol.Decode(bytes.TrimRight(line, "\n")) + if err != nil { + done <- "DECODE_ERR" + return + } + check, ok := msg.(*protocol.PreShutdownCheck) + if !ok { + done <- "WRONG_TYPE" + return + } + done <- check.SubscriptionID + // Reply with ack so client returns + ack := protocol.NewPreShutdownAck(true) + _ = protocol.EncodeWithDeadline(b, ack, protocol.WriteTimeout) + }() + + _ = checkLastForKey(a, "mail.x", "mail.x:alice") + got := <-done + if got != "mail.x:alice" { + t.Errorf("PreShutdownCheck.SubscriptionID on wire = %q, want %q", got, "mail.x:alice") + } +} diff --git a/internal/event/protocol/codec_test.go b/internal/event/protocol/codec_test.go index 91dbe01f..979c487a 100644 --- a/internal/event/protocol/codec_test.go +++ b/internal/event/protocol/codec_test.go @@ -77,3 +77,88 @@ func TestDecodeUnknownType(t *testing.T) { t.Error("expected error for unknown type") } } + +func TestEncodeDecodeHello_WithSubscriptionID(t *testing.T) { + msg := &Hello{ + Type: MsgTypeHello, + PID: 12345, + EventKey: "mail.user_mailbox.event.message_received_v1", + EventTypes: []string{"mail.user_mailbox.event.message_received_v1"}, + Version: "v1", + SubscriptionID: "mail.user_mailbox.event.message_received_v1:a7Bx9Kp2Lm3Qv4Rs", + } + buf := &bytes.Buffer{} + if err := Encode(buf, msg); err != nil { + t.Fatal(err) + } + line := buf.Bytes() + if !bytes.Contains(line, []byte(`"subscription_id":"mail.user_mailbox.event.message_received_v1:a7Bx9Kp2Lm3Qv4Rs"`)) { + t.Errorf("subscription_id not serialized: %s", string(line)) + } + decoded, err := Decode(bytes.TrimRight(line, "\n")) + if err != nil { + t.Fatal(err) + } + hello, ok := decoded.(*Hello) + if !ok { + t.Fatalf("expected *Hello, got %T", decoded) + } + if hello.SubscriptionID != msg.SubscriptionID { + t.Errorf("roundtrip subscription_id: got %q want %q", hello.SubscriptionID, msg.SubscriptionID) + } +} + +func TestEncodeDecodeHello_EmptySubscriptionIDOmitted(t *testing.T) { + msg := &Hello{ + Type: MsgTypeHello, + PID: 1, + EventKey: "k", + EventTypes: []string{"k"}, + Version: "v1", + } + buf := &bytes.Buffer{} + if err := Encode(buf, msg); err != nil { + t.Fatal(err) + } + if bytes.Contains(buf.Bytes(), []byte("subscription_id")) { + t.Errorf("empty subscription_id should be omitted: %s", buf.String()) + } + decoded, _ := Decode(bytes.TrimRight(buf.Bytes(), "\n")) + hello := decoded.(*Hello) + if hello.SubscriptionID != "" { + t.Errorf("got %q, want empty", hello.SubscriptionID) + } +} + +func TestEncodeDecodePreShutdownCheck_WithSubscriptionID(t *testing.T) { + msg := &PreShutdownCheck{ + Type: MsgTypePreShutdownCheck, + EventKey: "mail.x", + SubscriptionID: "mail.x:abc", + } + buf := &bytes.Buffer{} + if err := Encode(buf, msg); err != nil { + t.Fatal(err) + } + decoded, err := Decode(bytes.TrimRight(buf.Bytes(), "\n")) + if err != nil { + t.Fatal(err) + } + got := decoded.(*PreShutdownCheck) + if got.SubscriptionID != msg.SubscriptionID { + t.Errorf("roundtrip: got %q want %q", got.SubscriptionID, msg.SubscriptionID) + } +} + +func TestStatusResponse_ConsumerInfo_SubscriptionID(t *testing.T) { + msg := NewStatusResponse(7, 120, 1, []ConsumerInfo{ + {PID: 99, EventKey: "mail.x", SubscriptionID: "mail.x:abc", Received: 5, Dropped: 0}, + }) + buf := &bytes.Buffer{} + if err := Encode(buf, msg); err != nil { + t.Fatal(err) + } + if !bytes.Contains(buf.Bytes(), []byte(`"subscription_id":"mail.x:abc"`)) { + t.Errorf("ConsumerInfo.SubscriptionID missing from JSON: %s", buf.String()) + } +} diff --git a/internal/event/protocol/messages.go b/internal/event/protocol/messages.go index 0effa36c..9274c647 100644 --- a/internal/event/protocol/messages.go +++ b/internal/event/protocol/messages.go @@ -34,11 +34,12 @@ type SourceStatus struct { } type Hello struct { - Type string `json:"type"` - PID int `json:"pid"` - EventKey string `json:"event_key"` - EventTypes []string `json:"event_types"` - Version string `json:"version"` + Type string `json:"type"` + PID int `json:"pid"` + EventKey string `json:"event_key"` + EventTypes []string `json:"event_types"` + Version string `json:"version"` + SubscriptionID string `json:"subscription_id,omitempty"` // empty = fallback to EventKey on bus side } type HelloAck struct { @@ -61,10 +62,11 @@ type Bye struct { Type string `json:"type"` } -// PreShutdownCheck atomically reserves the cleanup lock for EventKey. +// PreShutdownCheck atomically reserves the cleanup lock for (EventKey, SubscriptionID). type PreShutdownCheck struct { - Type string `json:"type"` - EventKey string `json:"event_key"` + Type string `json:"type"` + EventKey string `json:"event_key"` + SubscriptionID string `json:"subscription_id,omitempty"` // empty = fallback to EventKey } type PreShutdownAck struct { @@ -77,10 +79,11 @@ type StatusQuery struct { } type ConsumerInfo struct { - PID int `json:"pid"` - EventKey string `json:"event_key"` - Received int64 `json:"received"` - Dropped int64 `json:"dropped"` + PID int `json:"pid"` + EventKey string `json:"event_key"` + SubscriptionID string `json:"subscription_id,omitempty"` + Received int64 `json:"received"` + Dropped int64 `json:"dropped"` } type StatusResponse struct { @@ -95,13 +98,14 @@ type Shutdown struct { Type string `json:"type"` } -func NewHello(pid int, eventKey string, eventTypes []string, version string) *Hello { +func NewHello(pid int, eventKey string, eventTypes []string, version string, subscriptionID string) *Hello { return &Hello{ - Type: MsgTypeHello, - PID: pid, - EventKey: eventKey, - EventTypes: eventTypes, - Version: version, + Type: MsgTypeHello, + PID: pid, + EventKey: eventKey, + EventTypes: eventTypes, + Version: version, + SubscriptionID: subscriptionID, } } @@ -124,8 +128,8 @@ func NewEvent(eventType, eventID, sourceTime string, seq uint64, payload json.Ra } } -func NewPreShutdownCheck(eventKey string) *PreShutdownCheck { - return &PreShutdownCheck{Type: MsgTypePreShutdownCheck, EventKey: eventKey} +func NewPreShutdownCheck(eventKey, subscriptionID string) *PreShutdownCheck { + return &PreShutdownCheck{Type: MsgTypePreShutdownCheck, EventKey: eventKey, SubscriptionID: subscriptionID} } func NewPreShutdownAck(lastForKey bool) *PreShutdownAck { diff --git a/internal/event/protocol/messages_test.go b/internal/event/protocol/messages_test.go index c6ca2bd9..2893789f 100644 --- a/internal/event/protocol/messages_test.go +++ b/internal/event/protocol/messages_test.go @@ -17,7 +17,7 @@ import ( // Every NewXxx helper must set the Type discriminator (Decode rejects messages without it). func TestConstructors_PinTypeField(t *testing.T) { - if got := NewHello(1, "k", []string{"t"}, "v1"); got.Type != MsgTypeHello { + if got := NewHello(1, "k", []string{"t"}, "v1", ""); got.Type != MsgTypeHello { t.Errorf("NewHello.Type = %q, want %q", got.Type, MsgTypeHello) } if got := NewHelloAck("v1", true); got.Type != MsgTypeHelloAck || !got.FirstForKey { @@ -26,7 +26,7 @@ func TestConstructors_PinTypeField(t *testing.T) { if got := NewEvent("im.msg", "e1", "", 7, json.RawMessage(`{}`)); got.Type != MsgTypeEvent || got.Seq != 7 { t.Errorf("NewEvent mismatch: %+v", got) } - if got := NewPreShutdownCheck("k"); got.Type != MsgTypePreShutdownCheck || got.EventKey != "k" { + if got := NewPreShutdownCheck("k", ""); got.Type != MsgTypePreShutdownCheck || got.EventKey != "k" { t.Errorf("NewPreShutdownCheck mismatch: %+v", got) } if got := NewPreShutdownAck(true); got.Type != MsgTypePreShutdownAck || !got.LastForKey { @@ -63,7 +63,7 @@ func TestEncode_DecodeRoundtripAllTypes(t *testing.T) { } } roundtrip(t, NewHelloAck("v1", true), &HelloAck{}) - roundtrip(t, NewPreShutdownCheck("im.msg"), &PreShutdownCheck{}) + roundtrip(t, NewPreShutdownCheck("im.msg", ""), &PreShutdownCheck{}) roundtrip(t, NewPreShutdownAck(false), &PreShutdownAck{}) roundtrip(t, NewStatusQuery(), &StatusQuery{}) roundtrip(t, NewStatusResponse(7, 120, 1, []ConsumerInfo{{PID: 99, EventKey: "k"}}), &StatusResponse{}) diff --git a/internal/event/types.go b/internal/event/types.go index 4dde301a..1121c11d 100644 --- a/internal/event/types.go +++ b/internal/event/types.go @@ -55,6 +55,23 @@ type ParamDef struct { Default string `json:"default,omitempty"` Description string `json:"description"` Values []ParamValue `json:"values,omitempty"` + + // SubscriptionKey marks this param as part of the subscription identity. + // Two consumers of the same EventKey but different values for any + // SubscriptionKey-marked param are treated as DISTINCT subscriptions: + // PreConsume runs once per (EventKey, SubscriptionID), cleanup runs once per + // (EventKey, SubscriptionID). + // + // CONTRACT: only mark a param SubscriptionKey if the EventKey's server-side + // subscribe/unsubscribe API is itself scoped to that resource. Lark keys the + // subscription record by (app, user, event_type) and overwrites it rather + // than reference-counting, so for a non-per-resource API the cleanup of one + // resource's last consumer unsubscribes the shared record and silently cuts + // off every other resource sharing that event_type. + // + // Default false = the param is a filter / formatting / metadata param + // and does not affect subscription identity. + SubscriptionKey bool `json:"subscription_key,omitempty"` } type ProcessFunc = func(ctx context.Context, rt APIClient, raw *RawEvent, params map[string]string) (json.RawMessage, error) @@ -83,10 +100,44 @@ type KeyDefinition struct { Schema SchemaDef `json:"schema"` + // NormalizeParams canonicalizes param values BEFORE fingerprint compute, + // PreConsume, Match, and Process. Mutates the params map in place. + // May call OAPI; runs once per consumer at startup. + // + // Use cases: resolve aliases ("me" -> real email, a name -> an ID), + // trim whitespace. On error, consume fails (no retry); caller gets the + // wrapped error. + // + // Default nil = no normalization, params pass through unchanged. + NormalizeParams func(ctx context.Context, rt APIClient, params map[string]string) error `json:"-"` + // Process required when Schema.Custom is Processed output; must be nil when Native is used. + // + // Convention: returning (nil, nil) signals "drop this event" — the + // consumer loop will skip writing it to sink and not advance the + // emitted counter. Useful for async filtering (e.g. fetch metadata, + // drop if folder doesn't match). For sync filters that don't need + // OAPI, use Match instead. Process func(ctx context.Context, rt APIClient, raw *RawEvent, params map[string]string) (json.RawMessage, error) `json:"-"` - PreConsume func(ctx context.Context, rt APIClient, params map[string]string) (cleanup func(), err error) `json:"-"` + // Match is a synchronous payload filter run on every received event + // BEFORE Process. Return false to drop the event without further work. + // + // Signature deliberately omits ctx/rt to physically enforce "no OAPI + // calls in Match". For filters that need a metadata fetch first, use + // Process and return nil to drop. + // + // Default nil = accept all events. + Match func(raw *RawEvent, params map[string]string) bool `json:"-"` + + // PreConsume runs once per (EventKey, SubscriptionID) when this consumer + // is first for that scope. Returns a cleanup function that the framework + // invokes when this consumer is the last for its scope. + // + // The cleanup's error return is honored: on nil the framework prints + // "[event] cleanup done."; on non-nil it prints a WARN with an + // idempotency note. + PreConsume func(ctx context.Context, rt APIClient, params map[string]string) (cleanup func() error, err error) `json:"-"` Scopes []string `json:"scopes,omitempty"`