Files
larksuite-cli/shortcuts/event/processor_test.go
梁硕 83dfb068ad feat: open-source lark-cli — the official CLI for Lark/Feishu
Change-Id: I113d9cdb5403cec347efe4595415e34a18b7decf
2026-03-28 10:36:25 +08:00

928 lines
24 KiB
Go

// Copyright (c) 2026 Lark Technologies Pte. Ltd.
// SPDX-License-Identifier: MIT
package event
import (
"bytes"
"context"
"encoding/json"
"io"
"os"
"path/filepath"
"reflect"
"strings"
"testing"
"time"
larkevent "github.com/larksuite/oapi-sdk-go/v3/event"
)
// chdirTemp changes cwd to a fresh temp dir for the test duration.
func chdirTemp(t *testing.T) {
t.Helper()
orig, err := os.Getwd()
if err != nil {
t.Fatal(err)
}
dir := t.TempDir()
if err := os.Chdir(dir); err != nil {
t.Fatal(err)
}
t.Cleanup(func() { os.Chdir(orig) })
}
// helper to build a RawEvent from event-level JSON and header fields.
func makeRawEvent(eventType string, eventJSON string) *RawEvent {
return &RawEvent{
Schema: "2.0",
Header: larkevent.EventHeader{
EventType: eventType,
EventID: "ev_test",
},
Event: json.RawMessage(eventJSON),
}
}
// --- Registry ---
func TestRegistryLookup(t *testing.T) {
r := DefaultRegistry()
p := r.Lookup("im.message.receive_v1")
if p.EventType() != "im.message.receive_v1" {
t.Errorf("got %q", p.EventType())
}
p2 := r.Lookup("unknown.type")
if p2.EventType() != "" {
t.Errorf("fallback should have empty EventType, got %q", p2.EventType())
}
}
func TestRegistryDuplicateReturnsError(t *testing.T) {
r := NewProcessorRegistry(&GenericProcessor{})
if err := r.Register(&ImMessageProcessor{}); err != nil {
t.Fatalf("first register should succeed: %v", err)
}
if err := r.Register(&ImMessageProcessor{}); err == nil {
t.Error("expected error on duplicate registration")
}
}
// --- Filters ---
func TestEventTypeFilter(t *testing.T) {
f := NewEventTypeFilter("im.message.receive_v1, drive.file.edit_v1")
if !f.Allow("im.message.receive_v1") {
t.Error("should allow")
}
if f.Allow("unknown.type") {
t.Error("should reject")
}
}
func TestEventTypeFilter_Empty(t *testing.T) {
if f := NewEventTypeFilter(""); f != nil {
t.Error("empty should return nil")
}
}
func TestRegexFilter(t *testing.T) {
f, err := NewRegexFilter("im\\.message\\..*")
if err != nil {
t.Fatal(err)
}
if !f.Allow("im.message.receive_v1") {
t.Error("should match")
}
if f.Allow("drive.file.edit_v1") {
t.Error("should not match")
}
}
func TestRegexFilter_Invalid(t *testing.T) {
_, err := NewRegexFilter("[invalid")
if err == nil {
t.Error("should error")
}
}
func TestFilterChain(t *testing.T) {
etf := NewEventTypeFilter("im.message.receive_v1, drive.file.edit_v1")
rf, _ := NewRegexFilter("im\\..*")
chain := NewFilterChain(etf, rf)
if !chain.Allow("im.message.receive_v1") {
t.Error("both filters pass, should allow")
}
if chain.Allow("drive.file.edit_v1") {
t.Error("regex rejects drive, should block")
}
empty := NewFilterChain()
if !empty.Allow("anything") {
t.Error("empty chain should allow all")
}
var nilChain *FilterChain
if !nilChain.Allow("anything") {
t.Error("nil chain should allow all")
}
}
func TestEventTypeFilter_TypesSorted(t *testing.T) {
f := NewEventTypeFilter("z.type,a.type,m.type")
got := f.Types()
want := []string{"a.type", "m.type", "z.type"}
if !reflect.DeepEqual(got, want) {
t.Errorf("Types() = %v, want %v", got, want)
}
}
// --- Processors ---
func TestImMessageProcessor_Raw(t *testing.T) {
p := &ImMessageProcessor{}
eventJSON := `{"message":{"id":"1"}}`
raw := makeRawEvent("im.message.receive_v1", eventJSON)
result, ok := p.Transform(context.Background(), raw, TransformRaw).(*RawEvent)
if !ok {
t.Fatal("raw mode should return *RawEvent")
}
if result.Header.EventType != "im.message.receive_v1" {
t.Errorf("EventType = %v", result.Header.EventType)
}
if result.Schema != "2.0" {
t.Errorf("Schema = %v", result.Schema)
}
}
func TestGenericProcessor_Compact(t *testing.T) {
p := &GenericProcessor{}
eventJSON := `{"file_token":"xxx"}`
raw := makeRawEvent("drive.file.edit_v1", eventJSON)
result, ok := p.Transform(context.Background(), raw, TransformCompact).(map[string]interface{})
if !ok {
t.Fatal("compact should return map[string]interface{}")
}
if result["file_token"] != "xxx" {
t.Error("file_token should be preserved")
}
if result["type"] != "drive.file.edit_v1" {
t.Errorf("type = %v, want drive.file.edit_v1", result["type"])
}
if result["event_id"] != "ev_test" {
t.Errorf("event_id = %v, want ev_test", result["event_id"])
}
}
func TestGenericProcessor_Raw(t *testing.T) {
p := &GenericProcessor{}
eventJSON := `{"schema":"2.0"}`
raw := makeRawEvent("drive.file.edit_v1", eventJSON)
result, ok := p.Transform(context.Background(), raw, TransformRaw).(*RawEvent)
if !ok {
t.Fatal("raw mode should return *RawEvent")
}
if result.Header.EventType != "drive.file.edit_v1" {
t.Errorf("EventType = %v", result.Header.EventType)
}
}
// --- Pipeline ---
func TestPipeline_Raw(t *testing.T) {
filters := NewFilterChain()
var out, errOut bytes.Buffer
p := NewEventPipeline(DefaultRegistry(), filters,
PipelineConfig{Mode: TransformRaw}, &out, &errOut)
eventJSON := `{"file_token":"xxx"}`
raw := makeRawEvent("drive.file.edit_v1", eventJSON)
raw.Header.EventID = "ev_raw"
raw.Header.CreateTime = "1700000000"
raw.Header.AppID = "cli_test"
p.Process(context.Background(), raw)
// Raw output should be the complete original event (schema + header + event)
var outputMap map[string]interface{}
if err := json.Unmarshal(out.Bytes(), &outputMap); err != nil {
t.Fatalf("failed to parse output: %v", err)
}
if outputMap["schema"] != "2.0" {
t.Errorf("schema = %v, want 2.0", outputMap["schema"])
}
header, ok := outputMap["header"].(map[string]interface{})
if !ok {
t.Fatal("raw output should contain header object")
}
if header["event_type"] != "drive.file.edit_v1" {
t.Errorf("header.event_type = %v", header["event_type"])
}
if header["app_id"] != "cli_test" {
t.Errorf("header.app_id = %v, want cli_test", header["app_id"])
}
}
func TestPipeline_Filtered(t *testing.T) {
filters := NewFilterChain(NewEventTypeFilter("im.message.receive_v1"))
var out, errOut bytes.Buffer
p := NewEventPipeline(DefaultRegistry(), filters,
PipelineConfig{}, &out, &errOut)
raw := makeRawEvent("drive.file.edit_v1", `{}`)
p.Process(context.Background(), raw)
if p.EventCount() != 0 {
t.Errorf("filtered event should not be counted")
}
if out.Len() != 0 {
t.Error("filtered event should produce no output")
}
}
func TestDeduplicateKey(t *testing.T) {
raw := makeRawEvent("im.message.receive_v1", `{}`)
if k := (&ImMessageProcessor{}).DeduplicateKey(raw); k != "ev_test" {
t.Errorf("ImMessageProcessor got %q, want ev_test", k)
}
if k := (&GenericProcessor{}).DeduplicateKey(raw); k != "ev_test" {
t.Errorf("GenericProcessor got %q, want ev_test", k)
}
}
func TestPipeline_Dedup(t *testing.T) {
filters := NewFilterChain()
var out, errOut bytes.Buffer
p := NewEventPipeline(DefaultRegistry(), filters,
PipelineConfig{Mode: TransformRaw}, &out, &errOut)
raw := makeRawEvent("im.message.receive_v1", `{"message":{"id":"1"}}`)
// First event should pass
p.Process(context.Background(), raw)
if p.EventCount() != 1 {
t.Fatalf("EventCount = %d, want 1", p.EventCount())
}
firstLen := out.Len()
if firstLen == 0 {
t.Fatal("expected output from first event")
}
// Same event_id again should be deduped
p.Process(context.Background(), raw)
if p.EventCount() != 1 {
t.Errorf("EventCount = %d, want 1 (deduped)", p.EventCount())
}
if out.Len() != firstLen {
t.Error("duplicate event should produce no additional output")
}
// Different event_id should pass
raw2 := makeRawEvent("im.message.receive_v1", `{"message":{"id":"2"}}`)
raw2.Header.EventID = "ev_other"
p.Process(context.Background(), raw2)
if p.EventCount() != 2 {
t.Errorf("EventCount = %d, want 2", p.EventCount())
}
}
// --- Pipeline: OutputDir ---
func TestPipeline_OutputDir(t *testing.T) {
dir := t.TempDir()
filters := NewFilterChain()
var out, errOut bytes.Buffer
p := NewEventPipeline(DefaultRegistry(), filters,
PipelineConfig{Mode: TransformCompact, OutputDir: dir}, &out, &errOut)
if err := p.EnsureDirs(); err != nil {
t.Fatal(err)
}
eventJSON := `{
"message": {
"message_id": "msg_file", "chat_id": "oc_001",
"chat_type": "group", "message_type": "text",
"content": "{\"text\":\"file test\"}", "create_time": "1700000000"
},
"sender": {"sender_id": {"open_id": "ou_001"}}
}`
raw := makeRawEvent("im.message.receive_v1", eventJSON)
raw.Header.EventID = "ev_file"
raw.Header.CreateTime = "1700000000"
p.Process(context.Background(), raw)
// stdout should be empty (output goes to file)
if out.Len() != 0 {
t.Error("OutputDir mode should not write to stdout")
}
// Verify file was created
entries, err := os.ReadDir(dir)
if err != nil {
t.Fatal(err)
}
if len(entries) != 1 {
t.Fatalf("expected 1 file, got %d", len(entries))
}
// Verify file content is valid JSON
data, err := os.ReadFile(filepath.Join(dir, entries[0].Name()))
if err != nil {
t.Fatal(err)
}
var m map[string]interface{}
if err := json.Unmarshal(data, &m); err != nil {
t.Fatalf("file content is not valid JSON: %v", err)
}
if m["type"] != "im.message.receive_v1" {
t.Errorf("type = %v", m["type"])
}
}
// --- Pipeline: JsonFlag ---
func TestPipeline_JsonFlag(t *testing.T) {
filters := NewFilterChain()
var out, errOut bytes.Buffer
p := NewEventPipeline(DefaultRegistry(), filters,
PipelineConfig{Mode: TransformRaw, JsonFlag: true}, &out, &errOut)
raw := makeRawEvent("drive.file.edit_v1", `{"key":"val"}`)
p.Process(context.Background(), raw)
// --json output should be pretty-printed (contain newlines + indentation)
output := out.String()
if !strings.Contains(output, "\n") {
t.Error("--json output should be pretty-printed")
}
var m map[string]interface{}
if err := json.Unmarshal([]byte(output), &m); err != nil {
t.Fatalf("output is not valid JSON: %v", err)
}
}
// --- Pipeline: Quiet ---
func TestPipeline_Quiet(t *testing.T) {
filters := NewFilterChain()
var out, errOut bytes.Buffer
p := NewEventPipeline(DefaultRegistry(), filters,
PipelineConfig{Mode: TransformRaw, Quiet: true}, &out, &errOut)
raw := makeRawEvent("im.message.receive_v1", `{}`)
p.Process(context.Background(), raw)
if errOut.Len() != 0 {
t.Errorf("quiet mode should suppress stderr, got: %s", errOut.String())
}
}
// --- writeEventFile ---
func TestWriteEventFile(t *testing.T) {
dir := t.TempDir()
header := larkevent.EventHeader{
EventType: "im.message.receive_v1",
EventID: "ev_write",
CreateTime: "1700000000",
}
data := map[string]string{"hello": "world"}
path, err := writeEventFile(dir, data, header)
if err != nil {
t.Fatal(err)
}
if !strings.Contains(path, "ev_write") {
t.Errorf("path should contain event ID, got: %s", path)
}
content, err := os.ReadFile(path)
if err != nil {
t.Fatal(err)
}
if !strings.Contains(string(content), `"hello"`) {
t.Error("file should contain data")
}
}
func TestWriteEventFile_EmptyFields(t *testing.T) {
dir := t.TempDir()
header := larkevent.EventHeader{EventType: "test.type"}
_, err := writeEventFile(dir, "data", header)
if err != nil {
t.Fatal(err)
}
entries, _ := os.ReadDir(dir)
if len(entries) != 1 {
t.Fatal("expected 1 file")
}
name := entries[0].Name()
if !strings.Contains(name, "unknown") {
t.Errorf("empty EventID should fallback to 'unknown', got: %s", name)
}
}
// --- stderrLogger ---
func TestStderrLogger(t *testing.T) {
var buf bytes.Buffer
l := &stderrLogger{w: &buf, quiet: false}
l.Debug(context.Background(), "debug msg")
if buf.Len() != 0 {
t.Error("Debug should always be suppressed")
}
l.Info(context.Background(), "info msg")
if !strings.Contains(buf.String(), "info msg") {
t.Error("Info should print when not quiet")
}
buf.Reset()
l.Warn(context.Background(), "warn msg")
if !strings.Contains(buf.String(), "warn msg") {
t.Error("Warn should always print")
}
buf.Reset()
l.Error(context.Background(), "error msg")
if !strings.Contains(buf.String(), "error msg") {
t.Error("Error should always print")
}
}
func TestStderrLogger_Quiet(t *testing.T) {
var buf bytes.Buffer
l := &stderrLogger{w: &buf, quiet: true}
l.Info(context.Background(), "info msg")
if buf.Len() != 0 {
t.Error("Info should be suppressed when quiet")
}
l.Warn(context.Background(), "warn msg")
if !strings.Contains(buf.String(), "warn msg") {
t.Error("Warn should print even when quiet")
}
}
// --- RegexFilter.String ---
func TestRegexFilter_String(t *testing.T) {
f, _ := NewRegexFilter("im\\..*")
if f.String() != "im\\..*" {
t.Errorf("String() = %v", f.String())
}
}
// --- WindowStrategy ---
func TestWindowStrategy(t *testing.T) {
im := &ImMessageProcessor{}
if im.WindowStrategy() != (WindowConfig{}) {
t.Error("should return zero WindowConfig")
}
gen := &GenericProcessor{}
if gen.WindowStrategy() != (WindowConfig{}) {
t.Error("should return zero WindowConfig")
}
}
// --- Shortcuts ---
func TestShortcuts(t *testing.T) {
s := Shortcuts()
if len(s) == 0 {
t.Fatal("should return at least one shortcut")
}
if s[0].Command != "+subscribe" {
t.Errorf("first shortcut command = %q", s[0].Command)
}
}
// --- Compact unmarshal error fallback ---
func TestImMessageProcessor_CompactUnmarshalError(t *testing.T) {
p := &ImMessageProcessor{}
raw := makeRawEvent("im.message.receive_v1", `not valid json`)
result, ok := p.Transform(context.Background(), raw, TransformCompact).(*RawEvent)
if !ok {
t.Fatal("unmarshal error should fallback to *RawEvent")
}
if result.Header.EventType != "im.message.receive_v1" {
t.Errorf("EventType = %v", result.Header.EventType)
}
}
func TestImMessageProcessor_CompactInteractiveFallsBackToRaw(t *testing.T) {
p := &ImMessageProcessor{}
raw := makeRawEvent("im.message.receive_v1", `{
"message": {
"message_id": "om_interactive",
"message_type": "interactive",
"content": "{\"type\":\"template\"}"
}
}`)
origStderr := os.Stderr
r, w, err := os.Pipe()
if err != nil {
t.Fatalf("os.Pipe() error = %v", err)
}
os.Stderr = w
defer func() {
os.Stderr = origStderr
}()
result, ok := p.Transform(context.Background(), raw, TransformCompact).(*RawEvent)
if err := w.Close(); err != nil {
t.Fatalf("stderr close error = %v", err)
}
hint, readErr := io.ReadAll(r)
if readErr != nil {
t.Fatalf("ReadAll(stderr) error = %v", readErr)
}
if !ok {
t.Fatal("interactive compact conversion should fallback to *RawEvent")
}
if result != raw {
t.Fatal("interactive compact conversion should return the original raw event")
}
if !strings.Contains(string(hint), "interactive") || !strings.Contains(string(hint), "returning raw event data") {
t.Fatalf("stderr hint = %q, want interactive fallback message", string(hint))
}
}
func TestGenericProcessor_CompactUnmarshalError(t *testing.T) {
p := &GenericProcessor{}
raw := makeRawEvent("some.type", `not valid json`)
result, ok := p.Transform(context.Background(), raw, TransformCompact).(*RawEvent)
if !ok {
t.Fatal("unmarshal error should fallback to *RawEvent")
}
if result.Header.EventType != "some.type" {
t.Errorf("EventType = %v", result.Header.EventType)
}
}
// --- Router ---
func TestParseRoutes(t *testing.T) {
routes, err := ParseRoutes([]string{
`^im\.message=dir:./messages/`,
`^contact\.=dir:./contacts/`,
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if routes == nil {
t.Fatal("expected non-nil router")
}
if len(routes.routes) != 2 {
t.Errorf("expected 2 routes, got %d", len(routes.routes))
}
}
func TestParseRoutes_Empty(t *testing.T) {
routes, err := ParseRoutes(nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if routes != nil {
t.Error("expected nil router for empty input")
}
routes2, err2 := ParseRoutes([]string{})
if err2 != nil {
t.Fatalf("unexpected error: %v", err2)
}
if routes2 != nil {
t.Error("expected nil router for empty slice")
}
}
func TestParseRoutes_MissingEquals(t *testing.T) {
_, err := ParseRoutes([]string{"no-equals-sign"})
if err == nil {
t.Error("expected error for missing =")
}
}
func TestParseRoutes_InvalidRegex(t *testing.T) {
_, err := ParseRoutes([]string{"[invalid=dir:./foo/"})
if err == nil {
t.Error("expected error for invalid regex")
}
}
func TestParseRoutes_MissingPrefix(t *testing.T) {
_, err := ParseRoutes([]string{`^im\.message=./messages/`})
if err == nil {
t.Error("expected error for missing dir: prefix")
}
if !strings.Contains(err.Error(), "dir:") {
t.Errorf("error should mention dir: prefix, got: %v", err)
}
}
func TestParseRoutes_EmptyPath(t *testing.T) {
_, err := ParseRoutes([]string{`^im\.message=dir:`})
if err == nil {
t.Error("expected error for empty path")
}
}
func TestParseRoutes_RejectsAbsolutePath(t *testing.T) {
_, err := ParseRoutes([]string{`^test=dir:/tmp/evil`})
if err == nil {
t.Error("expected error for absolute path in route")
}
}
func TestParseRoutes_RejectsTraversal(t *testing.T) {
_, err := ParseRoutes([]string{`^test=dir:../../etc/evil`})
if err == nil {
t.Error("expected error for path traversal in route")
}
}
func TestParseRoutes_PathSafety(t *testing.T) {
routes, err := ParseRoutes([]string{`^test=dir:./foo/../bar/`})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
dir := routes.routes[0].dir
if !filepath.IsAbs(dir) {
t.Errorf("expected absolute path, got %s", dir)
}
if strings.Contains(dir, "..") {
t.Errorf("expected cleaned path without .., got %s", dir)
}
}
func TestEventRouter_Match(t *testing.T) {
chdirTemp(t)
router, err := ParseRoutes([]string{
`^im\.message=dir:./test_messages`,
`^contact\.=dir:./test_contacts`,
})
if err != nil {
t.Fatal(err)
}
// Single match
dirs := router.Match("im.message.receive_v1")
if len(dirs) != 1 {
t.Errorf("expected 1 match, got %v", dirs)
}
dirs = router.Match("contact.user.created_v3")
if len(dirs) != 1 {
t.Errorf("expected 1 match, got %v", dirs)
}
// No match
dirs = router.Match("drive.file.edit_v1")
if len(dirs) != 0 {
t.Errorf("expected no match, got %v", dirs)
}
}
func TestEventRouter_Match_FanOut(t *testing.T) {
chdirTemp(t)
router, err := ParseRoutes([]string{
`^im\.=dir:./test_im`,
`message=dir:./test_msg`,
})
if err != nil {
t.Fatal(err)
}
// "im.message.receive_v1" matches both patterns
dirs := router.Match("im.message.receive_v1")
if len(dirs) != 2 {
t.Errorf("expected 2 matches (fan-out), got %d: %v", len(dirs), dirs)
}
}
// --- Pipeline: Route ---
func TestPipeline_Route(t *testing.T) {
chdirTemp(t)
router, err := ParseRoutes([]string{
`^im\.message=dir:./route_out`,
})
if err != nil {
t.Fatal(err)
}
dir := router.routes[0].dir
filters := NewFilterChain()
var out, errOut bytes.Buffer
p := NewEventPipeline(DefaultRegistry(), filters,
PipelineConfig{Mode: TransformCompact, Router: router}, &out, &errOut)
if err := p.EnsureDirs(); err != nil {
t.Fatal(err)
}
eventJSON := `{
"message": {
"message_id": "msg_route", "chat_id": "oc_001",
"chat_type": "group", "message_type": "text",
"content": "{\"text\":\"routed\"}", "create_time": "1700000000"
},
"sender": {"sender_id": {"open_id": "ou_001"}}
}`
raw := makeRawEvent("im.message.receive_v1", eventJSON)
raw.Header.EventID = "ev_route"
raw.Header.CreateTime = "1700000000"
p.Process(context.Background(), raw)
// stdout should be empty — output goes to route dir
if out.Len() != 0 {
t.Errorf("routed event should not appear on stdout, got: %s", out.String())
}
// Verify file was created in route dir
entries, err := os.ReadDir(dir)
if err != nil {
t.Fatal(err)
}
if len(entries) != 1 {
t.Fatalf("expected 1 file in route dir, got %d", len(entries))
}
data, err := os.ReadFile(filepath.Join(dir, entries[0].Name()))
if err != nil {
t.Fatal(err)
}
var m map[string]interface{}
if err := json.Unmarshal(data, &m); err != nil {
t.Fatalf("file content is not valid JSON: %v", err)
}
if m["type"] != "im.message.receive_v1" {
t.Errorf("type = %v", m["type"])
}
}
func TestPipeline_Route_NoMatch(t *testing.T) {
chdirTemp(t)
fallbackDir := t.TempDir()
router, err := ParseRoutes([]string{
`^im\.message=dir:./route_dir`,
})
if err != nil {
t.Fatal(err)
}
routeDir := router.routes[0].dir
filters := NewFilterChain()
var out, errOut bytes.Buffer
p := NewEventPipeline(DefaultRegistry(), filters,
PipelineConfig{Mode: TransformCompact, Router: router, OutputDir: fallbackDir}, &out, &errOut)
if err := p.EnsureDirs(); err != nil {
t.Fatal(err)
}
// Send an event that does NOT match the route
raw := makeRawEvent("drive.file.edit_v1", `{"file_token":"xxx"}`)
raw.Header.EventID = "ev_nomatch"
raw.Header.CreateTime = "1700000000"
p.Process(context.Background(), raw)
// stdout should be empty
if out.Len() != 0 {
t.Errorf("should not appear on stdout, got: %s", out.String())
}
// Route dir should be empty
routeEntries, _ := os.ReadDir(routeDir)
if len(routeEntries) != 0 {
t.Errorf("route dir should be empty, got %d files", len(routeEntries))
}
// Fallback dir should have the file
fallbackEntries, _ := os.ReadDir(fallbackDir)
if len(fallbackEntries) != 1 {
t.Fatalf("fallback dir should have 1 file, got %d", len(fallbackEntries))
}
}
func TestPipeline_Route_NoMatch_Stdout(t *testing.T) {
chdirTemp(t)
router, err := ParseRoutes([]string{
`^im\.message=dir:./route_dir`,
})
if err != nil {
t.Fatal(err)
}
routeDir := router.routes[0].dir
filters := NewFilterChain()
var out, errOut bytes.Buffer
// No OutputDir — unmatched events should go to stdout
p := NewEventPipeline(DefaultRegistry(), filters,
PipelineConfig{Mode: TransformRaw, Router: router}, &out, &errOut)
if err := p.EnsureDirs(); err != nil {
t.Fatal(err)
}
raw := makeRawEvent("drive.file.edit_v1", `{"file_token":"xxx"}`)
raw.Header.EventID = "ev_stdout"
raw.Header.CreateTime = "1700000000"
p.Process(context.Background(), raw)
// Route dir should be empty
routeEntries, _ := os.ReadDir(routeDir)
if len(routeEntries) != 0 {
t.Errorf("route dir should be empty, got %d files", len(routeEntries))
}
// stdout should have the event
if out.Len() == 0 {
t.Error("unmatched event should fall through to stdout")
}
var m map[string]interface{}
if err := json.Unmarshal(out.Bytes(), &m); err != nil {
t.Fatalf("stdout is not valid JSON: %v", err)
}
}
func TestPipeline_Route_FanOut(t *testing.T) {
chdirTemp(t)
router, err := ParseRoutes([]string{
`^im\.=dir:./fanout1`,
`message=dir:./fanout2`,
})
if err != nil {
t.Fatal(err)
}
dir1 := router.routes[0].dir
dir2 := router.routes[1].dir
filters := NewFilterChain()
var out, errOut bytes.Buffer
p := NewEventPipeline(DefaultRegistry(), filters,
PipelineConfig{Mode: TransformCompact, Router: router}, &out, &errOut)
if err := p.EnsureDirs(); err != nil {
t.Fatal(err)
}
eventJSON := `{
"message": {
"message_id": "msg_fanout", "chat_id": "oc_001",
"chat_type": "group", "message_type": "text",
"content": "{\"text\":\"fanout\"}", "create_time": "1700000000"
},
"sender": {"sender_id": {"open_id": "ou_001"}}
}`
raw := makeRawEvent("im.message.receive_v1", eventJSON)
raw.Header.EventID = "ev_fanout"
raw.Header.CreateTime = "1700000000"
p.Process(context.Background(), raw)
// stdout should be empty
if out.Len() != 0 {
t.Errorf("fan-out event should not appear on stdout, got: %s", out.String())
}
// Both dirs should have a file
entries1, _ := os.ReadDir(dir1)
entries2, _ := os.ReadDir(dir2)
if len(entries1) != 1 {
t.Errorf("dir1 should have 1 file, got %d", len(entries1))
}
if len(entries2) != 1 {
t.Errorf("dir2 should have 1 file, got %d", len(entries2))
}
}
// --- cleanupSeen ---
func TestCleanupSeen(t *testing.T) {
filters := NewFilterChain()
var out, errOut bytes.Buffer
p := NewEventPipeline(DefaultRegistry(), filters,
PipelineConfig{Mode: TransformRaw}, &out, &errOut)
// Insert an expired entry directly
p.seen.Store("old_key", time.Now().Add(-10*time.Minute))
p.seen.Store("fresh_key", time.Now())
p.cleanupSeen(time.Now())
if _, ok := p.seen.Load("old_key"); ok {
t.Error("expired key should be cleaned up")
}
if _, ok := p.seen.Load("fresh_key"); !ok {
t.Error("fresh key should be kept")
}
}