mirror of
https://github.com/larksuite/cli.git
synced 2026-07-03 22:24:31 +08:00
928 lines
24 KiB
Go
928 lines
24 KiB
Go
// Copyright (c) 2026 Lark Technologies Pte. Ltd.
|
|
// SPDX-License-Identifier: MIT
|
|
|
|
package event
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"io"
|
|
"os"
|
|
"path/filepath"
|
|
"reflect"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
larkevent "github.com/larksuite/oapi-sdk-go/v3/event"
|
|
)
|
|
|
|
// chdirTemp changes cwd to a fresh temp dir for the test duration.
|
|
func chdirTemp(t *testing.T) {
|
|
t.Helper()
|
|
orig, err := os.Getwd()
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
dir := t.TempDir()
|
|
if err := os.Chdir(dir); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
t.Cleanup(func() { os.Chdir(orig) })
|
|
}
|
|
|
|
// helper to build a RawEvent from event-level JSON and header fields.
|
|
func makeRawEvent(eventType string, eventJSON string) *RawEvent {
|
|
return &RawEvent{
|
|
Schema: "2.0",
|
|
Header: larkevent.EventHeader{
|
|
EventType: eventType,
|
|
EventID: "ev_test",
|
|
},
|
|
Event: json.RawMessage(eventJSON),
|
|
}
|
|
}
|
|
|
|
// --- Registry ---
|
|
|
|
func TestRegistryLookup(t *testing.T) {
|
|
r := DefaultRegistry()
|
|
p := r.Lookup("im.message.receive_v1")
|
|
if p.EventType() != "im.message.receive_v1" {
|
|
t.Errorf("got %q", p.EventType())
|
|
}
|
|
p2 := r.Lookup("unknown.type")
|
|
if p2.EventType() != "" {
|
|
t.Errorf("fallback should have empty EventType, got %q", p2.EventType())
|
|
}
|
|
}
|
|
|
|
func TestRegistryDuplicateReturnsError(t *testing.T) {
|
|
r := NewProcessorRegistry(&GenericProcessor{})
|
|
if err := r.Register(&ImMessageProcessor{}); err != nil {
|
|
t.Fatalf("first register should succeed: %v", err)
|
|
}
|
|
if err := r.Register(&ImMessageProcessor{}); err == nil {
|
|
t.Error("expected error on duplicate registration")
|
|
}
|
|
}
|
|
|
|
// --- Filters ---
|
|
|
|
func TestEventTypeFilter(t *testing.T) {
|
|
f := NewEventTypeFilter("im.message.receive_v1, drive.file.edit_v1")
|
|
if !f.Allow("im.message.receive_v1") {
|
|
t.Error("should allow")
|
|
}
|
|
if f.Allow("unknown.type") {
|
|
t.Error("should reject")
|
|
}
|
|
}
|
|
|
|
func TestEventTypeFilter_Empty(t *testing.T) {
|
|
if f := NewEventTypeFilter(""); f != nil {
|
|
t.Error("empty should return nil")
|
|
}
|
|
}
|
|
|
|
func TestRegexFilter(t *testing.T) {
|
|
f, err := NewRegexFilter("im\\.message\\..*")
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if !f.Allow("im.message.receive_v1") {
|
|
t.Error("should match")
|
|
}
|
|
if f.Allow("drive.file.edit_v1") {
|
|
t.Error("should not match")
|
|
}
|
|
}
|
|
|
|
func TestRegexFilter_Invalid(t *testing.T) {
|
|
_, err := NewRegexFilter("[invalid")
|
|
if err == nil {
|
|
t.Error("should error")
|
|
}
|
|
}
|
|
|
|
func TestFilterChain(t *testing.T) {
|
|
etf := NewEventTypeFilter("im.message.receive_v1, drive.file.edit_v1")
|
|
rf, _ := NewRegexFilter("im\\..*")
|
|
chain := NewFilterChain(etf, rf)
|
|
|
|
if !chain.Allow("im.message.receive_v1") {
|
|
t.Error("both filters pass, should allow")
|
|
}
|
|
if chain.Allow("drive.file.edit_v1") {
|
|
t.Error("regex rejects drive, should block")
|
|
}
|
|
|
|
empty := NewFilterChain()
|
|
if !empty.Allow("anything") {
|
|
t.Error("empty chain should allow all")
|
|
}
|
|
|
|
var nilChain *FilterChain
|
|
if !nilChain.Allow("anything") {
|
|
t.Error("nil chain should allow all")
|
|
}
|
|
}
|
|
|
|
func TestEventTypeFilter_TypesSorted(t *testing.T) {
|
|
f := NewEventTypeFilter("z.type,a.type,m.type")
|
|
got := f.Types()
|
|
want := []string{"a.type", "m.type", "z.type"}
|
|
if !reflect.DeepEqual(got, want) {
|
|
t.Errorf("Types() = %v, want %v", got, want)
|
|
}
|
|
}
|
|
|
|
// --- Processors ---
|
|
|
|
func TestImMessageProcessor_Raw(t *testing.T) {
|
|
p := &ImMessageProcessor{}
|
|
eventJSON := `{"message":{"id":"1"}}`
|
|
raw := makeRawEvent("im.message.receive_v1", eventJSON)
|
|
result, ok := p.Transform(context.Background(), raw, TransformRaw).(*RawEvent)
|
|
if !ok {
|
|
t.Fatal("raw mode should return *RawEvent")
|
|
}
|
|
if result.Header.EventType != "im.message.receive_v1" {
|
|
t.Errorf("EventType = %v", result.Header.EventType)
|
|
}
|
|
if result.Schema != "2.0" {
|
|
t.Errorf("Schema = %v", result.Schema)
|
|
}
|
|
}
|
|
|
|
func TestGenericProcessor_Compact(t *testing.T) {
|
|
p := &GenericProcessor{}
|
|
eventJSON := `{"file_token":"xxx"}`
|
|
raw := makeRawEvent("drive.file.edit_v1", eventJSON)
|
|
result, ok := p.Transform(context.Background(), raw, TransformCompact).(map[string]interface{})
|
|
if !ok {
|
|
t.Fatal("compact should return map[string]interface{}")
|
|
}
|
|
if result["file_token"] != "xxx" {
|
|
t.Error("file_token should be preserved")
|
|
}
|
|
if result["type"] != "drive.file.edit_v1" {
|
|
t.Errorf("type = %v, want drive.file.edit_v1", result["type"])
|
|
}
|
|
if result["event_id"] != "ev_test" {
|
|
t.Errorf("event_id = %v, want ev_test", result["event_id"])
|
|
}
|
|
}
|
|
|
|
func TestGenericProcessor_Raw(t *testing.T) {
|
|
p := &GenericProcessor{}
|
|
eventJSON := `{"schema":"2.0"}`
|
|
raw := makeRawEvent("drive.file.edit_v1", eventJSON)
|
|
result, ok := p.Transform(context.Background(), raw, TransformRaw).(*RawEvent)
|
|
if !ok {
|
|
t.Fatal("raw mode should return *RawEvent")
|
|
}
|
|
if result.Header.EventType != "drive.file.edit_v1" {
|
|
t.Errorf("EventType = %v", result.Header.EventType)
|
|
}
|
|
}
|
|
|
|
// --- Pipeline ---
|
|
|
|
func TestPipeline_Raw(t *testing.T) {
|
|
filters := NewFilterChain()
|
|
var out, errOut bytes.Buffer
|
|
p := NewEventPipeline(DefaultRegistry(), filters,
|
|
PipelineConfig{Mode: TransformRaw}, &out, &errOut)
|
|
|
|
eventJSON := `{"file_token":"xxx"}`
|
|
raw := makeRawEvent("drive.file.edit_v1", eventJSON)
|
|
raw.Header.EventID = "ev_raw"
|
|
raw.Header.CreateTime = "1700000000"
|
|
raw.Header.AppID = "cli_test"
|
|
p.Process(context.Background(), raw)
|
|
|
|
// Raw output should be the complete original event (schema + header + event)
|
|
var outputMap map[string]interface{}
|
|
if err := json.Unmarshal(out.Bytes(), &outputMap); err != nil {
|
|
t.Fatalf("failed to parse output: %v", err)
|
|
}
|
|
if outputMap["schema"] != "2.0" {
|
|
t.Errorf("schema = %v, want 2.0", outputMap["schema"])
|
|
}
|
|
header, ok := outputMap["header"].(map[string]interface{})
|
|
if !ok {
|
|
t.Fatal("raw output should contain header object")
|
|
}
|
|
if header["event_type"] != "drive.file.edit_v1" {
|
|
t.Errorf("header.event_type = %v", header["event_type"])
|
|
}
|
|
if header["app_id"] != "cli_test" {
|
|
t.Errorf("header.app_id = %v, want cli_test", header["app_id"])
|
|
}
|
|
}
|
|
|
|
func TestPipeline_Filtered(t *testing.T) {
|
|
filters := NewFilterChain(NewEventTypeFilter("im.message.receive_v1"))
|
|
var out, errOut bytes.Buffer
|
|
p := NewEventPipeline(DefaultRegistry(), filters,
|
|
PipelineConfig{}, &out, &errOut)
|
|
|
|
raw := makeRawEvent("drive.file.edit_v1", `{}`)
|
|
p.Process(context.Background(), raw)
|
|
|
|
if p.EventCount() != 0 {
|
|
t.Errorf("filtered event should not be counted")
|
|
}
|
|
if out.Len() != 0 {
|
|
t.Error("filtered event should produce no output")
|
|
}
|
|
}
|
|
|
|
func TestDeduplicateKey(t *testing.T) {
|
|
raw := makeRawEvent("im.message.receive_v1", `{}`)
|
|
if k := (&ImMessageProcessor{}).DeduplicateKey(raw); k != "ev_test" {
|
|
t.Errorf("ImMessageProcessor got %q, want ev_test", k)
|
|
}
|
|
if k := (&GenericProcessor{}).DeduplicateKey(raw); k != "ev_test" {
|
|
t.Errorf("GenericProcessor got %q, want ev_test", k)
|
|
}
|
|
}
|
|
|
|
func TestPipeline_Dedup(t *testing.T) {
|
|
filters := NewFilterChain()
|
|
var out, errOut bytes.Buffer
|
|
p := NewEventPipeline(DefaultRegistry(), filters,
|
|
PipelineConfig{Mode: TransformRaw}, &out, &errOut)
|
|
|
|
raw := makeRawEvent("im.message.receive_v1", `{"message":{"id":"1"}}`)
|
|
|
|
// First event should pass
|
|
p.Process(context.Background(), raw)
|
|
if p.EventCount() != 1 {
|
|
t.Fatalf("EventCount = %d, want 1", p.EventCount())
|
|
}
|
|
firstLen := out.Len()
|
|
if firstLen == 0 {
|
|
t.Fatal("expected output from first event")
|
|
}
|
|
|
|
// Same event_id again should be deduped
|
|
p.Process(context.Background(), raw)
|
|
if p.EventCount() != 1 {
|
|
t.Errorf("EventCount = %d, want 1 (deduped)", p.EventCount())
|
|
}
|
|
if out.Len() != firstLen {
|
|
t.Error("duplicate event should produce no additional output")
|
|
}
|
|
|
|
// Different event_id should pass
|
|
raw2 := makeRawEvent("im.message.receive_v1", `{"message":{"id":"2"}}`)
|
|
raw2.Header.EventID = "ev_other"
|
|
p.Process(context.Background(), raw2)
|
|
if p.EventCount() != 2 {
|
|
t.Errorf("EventCount = %d, want 2", p.EventCount())
|
|
}
|
|
}
|
|
|
|
// --- Pipeline: OutputDir ---
|
|
|
|
func TestPipeline_OutputDir(t *testing.T) {
|
|
dir := t.TempDir()
|
|
filters := NewFilterChain()
|
|
var out, errOut bytes.Buffer
|
|
p := NewEventPipeline(DefaultRegistry(), filters,
|
|
PipelineConfig{Mode: TransformCompact, OutputDir: dir}, &out, &errOut)
|
|
if err := p.EnsureDirs(); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
eventJSON := `{
|
|
"message": {
|
|
"message_id": "msg_file", "chat_id": "oc_001",
|
|
"chat_type": "group", "message_type": "text",
|
|
"content": "{\"text\":\"file test\"}", "create_time": "1700000000"
|
|
},
|
|
"sender": {"sender_id": {"open_id": "ou_001"}}
|
|
}`
|
|
raw := makeRawEvent("im.message.receive_v1", eventJSON)
|
|
raw.Header.EventID = "ev_file"
|
|
raw.Header.CreateTime = "1700000000"
|
|
p.Process(context.Background(), raw)
|
|
|
|
// stdout should be empty (output goes to file)
|
|
if out.Len() != 0 {
|
|
t.Error("OutputDir mode should not write to stdout")
|
|
}
|
|
|
|
// Verify file was created
|
|
entries, err := os.ReadDir(dir)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if len(entries) != 1 {
|
|
t.Fatalf("expected 1 file, got %d", len(entries))
|
|
}
|
|
|
|
// Verify file content is valid JSON
|
|
data, err := os.ReadFile(filepath.Join(dir, entries[0].Name()))
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
var m map[string]interface{}
|
|
if err := json.Unmarshal(data, &m); err != nil {
|
|
t.Fatalf("file content is not valid JSON: %v", err)
|
|
}
|
|
if m["type"] != "im.message.receive_v1" {
|
|
t.Errorf("type = %v", m["type"])
|
|
}
|
|
}
|
|
|
|
// --- Pipeline: JsonFlag ---
|
|
|
|
func TestPipeline_JsonFlag(t *testing.T) {
|
|
filters := NewFilterChain()
|
|
var out, errOut bytes.Buffer
|
|
p := NewEventPipeline(DefaultRegistry(), filters,
|
|
PipelineConfig{Mode: TransformRaw, JsonFlag: true}, &out, &errOut)
|
|
|
|
raw := makeRawEvent("drive.file.edit_v1", `{"key":"val"}`)
|
|
p.Process(context.Background(), raw)
|
|
|
|
// --json output should be pretty-printed (contain newlines + indentation)
|
|
output := out.String()
|
|
if !strings.Contains(output, "\n") {
|
|
t.Error("--json output should be pretty-printed")
|
|
}
|
|
|
|
var m map[string]interface{}
|
|
if err := json.Unmarshal([]byte(output), &m); err != nil {
|
|
t.Fatalf("output is not valid JSON: %v", err)
|
|
}
|
|
}
|
|
|
|
// --- Pipeline: Quiet ---
|
|
|
|
func TestPipeline_Quiet(t *testing.T) {
|
|
filters := NewFilterChain()
|
|
var out, errOut bytes.Buffer
|
|
p := NewEventPipeline(DefaultRegistry(), filters,
|
|
PipelineConfig{Mode: TransformRaw, Quiet: true}, &out, &errOut)
|
|
|
|
raw := makeRawEvent("im.message.receive_v1", `{}`)
|
|
p.Process(context.Background(), raw)
|
|
|
|
if errOut.Len() != 0 {
|
|
t.Errorf("quiet mode should suppress stderr, got: %s", errOut.String())
|
|
}
|
|
}
|
|
|
|
// --- writeEventFile ---
|
|
|
|
func TestWriteEventFile(t *testing.T) {
|
|
dir := t.TempDir()
|
|
header := larkevent.EventHeader{
|
|
EventType: "im.message.receive_v1",
|
|
EventID: "ev_write",
|
|
CreateTime: "1700000000",
|
|
}
|
|
data := map[string]string{"hello": "world"}
|
|
|
|
path, err := writeEventFile(dir, data, header)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if !strings.Contains(path, "ev_write") {
|
|
t.Errorf("path should contain event ID, got: %s", path)
|
|
}
|
|
|
|
content, err := os.ReadFile(path)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if !strings.Contains(string(content), `"hello"`) {
|
|
t.Error("file should contain data")
|
|
}
|
|
}
|
|
|
|
func TestWriteEventFile_EmptyFields(t *testing.T) {
|
|
dir := t.TempDir()
|
|
header := larkevent.EventHeader{EventType: "test.type"}
|
|
_, err := writeEventFile(dir, "data", header)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
entries, _ := os.ReadDir(dir)
|
|
if len(entries) != 1 {
|
|
t.Fatal("expected 1 file")
|
|
}
|
|
name := entries[0].Name()
|
|
if !strings.Contains(name, "unknown") {
|
|
t.Errorf("empty EventID should fallback to 'unknown', got: %s", name)
|
|
}
|
|
}
|
|
|
|
// --- stderrLogger ---
|
|
|
|
func TestStderrLogger(t *testing.T) {
|
|
var buf bytes.Buffer
|
|
l := &stderrLogger{w: &buf, quiet: false}
|
|
|
|
l.Debug(context.Background(), "debug msg")
|
|
if buf.Len() != 0 {
|
|
t.Error("Debug should always be suppressed")
|
|
}
|
|
|
|
l.Info(context.Background(), "info msg")
|
|
if !strings.Contains(buf.String(), "info msg") {
|
|
t.Error("Info should print when not quiet")
|
|
}
|
|
buf.Reset()
|
|
|
|
l.Warn(context.Background(), "warn msg")
|
|
if !strings.Contains(buf.String(), "warn msg") {
|
|
t.Error("Warn should always print")
|
|
}
|
|
buf.Reset()
|
|
|
|
l.Error(context.Background(), "error msg")
|
|
if !strings.Contains(buf.String(), "error msg") {
|
|
t.Error("Error should always print")
|
|
}
|
|
}
|
|
|
|
func TestStderrLogger_Quiet(t *testing.T) {
|
|
var buf bytes.Buffer
|
|
l := &stderrLogger{w: &buf, quiet: true}
|
|
|
|
l.Info(context.Background(), "info msg")
|
|
if buf.Len() != 0 {
|
|
t.Error("Info should be suppressed when quiet")
|
|
}
|
|
|
|
l.Warn(context.Background(), "warn msg")
|
|
if !strings.Contains(buf.String(), "warn msg") {
|
|
t.Error("Warn should print even when quiet")
|
|
}
|
|
}
|
|
|
|
// --- RegexFilter.String ---
|
|
|
|
func TestRegexFilter_String(t *testing.T) {
|
|
f, _ := NewRegexFilter("im\\..*")
|
|
if f.String() != "im\\..*" {
|
|
t.Errorf("String() = %v", f.String())
|
|
}
|
|
}
|
|
|
|
// --- WindowStrategy ---
|
|
|
|
func TestWindowStrategy(t *testing.T) {
|
|
im := &ImMessageProcessor{}
|
|
if im.WindowStrategy() != (WindowConfig{}) {
|
|
t.Error("should return zero WindowConfig")
|
|
}
|
|
gen := &GenericProcessor{}
|
|
if gen.WindowStrategy() != (WindowConfig{}) {
|
|
t.Error("should return zero WindowConfig")
|
|
}
|
|
}
|
|
|
|
// --- Shortcuts ---
|
|
|
|
func TestShortcuts(t *testing.T) {
|
|
s := Shortcuts()
|
|
if len(s) == 0 {
|
|
t.Fatal("should return at least one shortcut")
|
|
}
|
|
if s[0].Command != "+subscribe" {
|
|
t.Errorf("first shortcut command = %q", s[0].Command)
|
|
}
|
|
}
|
|
|
|
// --- Compact unmarshal error fallback ---
|
|
|
|
func TestImMessageProcessor_CompactUnmarshalError(t *testing.T) {
|
|
p := &ImMessageProcessor{}
|
|
raw := makeRawEvent("im.message.receive_v1", `not valid json`)
|
|
result, ok := p.Transform(context.Background(), raw, TransformCompact).(*RawEvent)
|
|
if !ok {
|
|
t.Fatal("unmarshal error should fallback to *RawEvent")
|
|
}
|
|
if result.Header.EventType != "im.message.receive_v1" {
|
|
t.Errorf("EventType = %v", result.Header.EventType)
|
|
}
|
|
}
|
|
|
|
func TestImMessageProcessor_CompactInteractiveFallsBackToRaw(t *testing.T) {
|
|
p := &ImMessageProcessor{}
|
|
raw := makeRawEvent("im.message.receive_v1", `{
|
|
"message": {
|
|
"message_id": "om_interactive",
|
|
"message_type": "interactive",
|
|
"content": "{\"type\":\"template\"}"
|
|
}
|
|
}`)
|
|
|
|
origStderr := os.Stderr
|
|
r, w, err := os.Pipe()
|
|
if err != nil {
|
|
t.Fatalf("os.Pipe() error = %v", err)
|
|
}
|
|
os.Stderr = w
|
|
defer func() {
|
|
os.Stderr = origStderr
|
|
}()
|
|
|
|
result, ok := p.Transform(context.Background(), raw, TransformCompact).(*RawEvent)
|
|
if err := w.Close(); err != nil {
|
|
t.Fatalf("stderr close error = %v", err)
|
|
}
|
|
hint, readErr := io.ReadAll(r)
|
|
if readErr != nil {
|
|
t.Fatalf("ReadAll(stderr) error = %v", readErr)
|
|
}
|
|
if !ok {
|
|
t.Fatal("interactive compact conversion should fallback to *RawEvent")
|
|
}
|
|
if result != raw {
|
|
t.Fatal("interactive compact conversion should return the original raw event")
|
|
}
|
|
if !strings.Contains(string(hint), "interactive") || !strings.Contains(string(hint), "returning raw event data") {
|
|
t.Fatalf("stderr hint = %q, want interactive fallback message", string(hint))
|
|
}
|
|
}
|
|
|
|
func TestGenericProcessor_CompactUnmarshalError(t *testing.T) {
|
|
p := &GenericProcessor{}
|
|
raw := makeRawEvent("some.type", `not valid json`)
|
|
result, ok := p.Transform(context.Background(), raw, TransformCompact).(*RawEvent)
|
|
if !ok {
|
|
t.Fatal("unmarshal error should fallback to *RawEvent")
|
|
}
|
|
if result.Header.EventType != "some.type" {
|
|
t.Errorf("EventType = %v", result.Header.EventType)
|
|
}
|
|
}
|
|
|
|
// --- Router ---
|
|
|
|
func TestParseRoutes(t *testing.T) {
|
|
routes, err := ParseRoutes([]string{
|
|
`^im\.message=dir:./messages/`,
|
|
`^contact\.=dir:./contacts/`,
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("unexpected error: %v", err)
|
|
}
|
|
if routes == nil {
|
|
t.Fatal("expected non-nil router")
|
|
}
|
|
if len(routes.routes) != 2 {
|
|
t.Errorf("expected 2 routes, got %d", len(routes.routes))
|
|
}
|
|
}
|
|
|
|
func TestParseRoutes_Empty(t *testing.T) {
|
|
routes, err := ParseRoutes(nil)
|
|
if err != nil {
|
|
t.Fatalf("unexpected error: %v", err)
|
|
}
|
|
if routes != nil {
|
|
t.Error("expected nil router for empty input")
|
|
}
|
|
|
|
routes2, err2 := ParseRoutes([]string{})
|
|
if err2 != nil {
|
|
t.Fatalf("unexpected error: %v", err2)
|
|
}
|
|
if routes2 != nil {
|
|
t.Error("expected nil router for empty slice")
|
|
}
|
|
}
|
|
|
|
func TestParseRoutes_MissingEquals(t *testing.T) {
|
|
_, err := ParseRoutes([]string{"no-equals-sign"})
|
|
if err == nil {
|
|
t.Error("expected error for missing =")
|
|
}
|
|
}
|
|
|
|
func TestParseRoutes_InvalidRegex(t *testing.T) {
|
|
_, err := ParseRoutes([]string{"[invalid=dir:./foo/"})
|
|
if err == nil {
|
|
t.Error("expected error for invalid regex")
|
|
}
|
|
}
|
|
|
|
func TestParseRoutes_MissingPrefix(t *testing.T) {
|
|
_, err := ParseRoutes([]string{`^im\.message=./messages/`})
|
|
if err == nil {
|
|
t.Error("expected error for missing dir: prefix")
|
|
}
|
|
if !strings.Contains(err.Error(), "dir:") {
|
|
t.Errorf("error should mention dir: prefix, got: %v", err)
|
|
}
|
|
}
|
|
|
|
func TestParseRoutes_EmptyPath(t *testing.T) {
|
|
_, err := ParseRoutes([]string{`^im\.message=dir:`})
|
|
if err == nil {
|
|
t.Error("expected error for empty path")
|
|
}
|
|
}
|
|
|
|
func TestParseRoutes_RejectsAbsolutePath(t *testing.T) {
|
|
_, err := ParseRoutes([]string{`^test=dir:/tmp/evil`})
|
|
if err == nil {
|
|
t.Error("expected error for absolute path in route")
|
|
}
|
|
}
|
|
|
|
func TestParseRoutes_RejectsTraversal(t *testing.T) {
|
|
_, err := ParseRoutes([]string{`^test=dir:../../etc/evil`})
|
|
if err == nil {
|
|
t.Error("expected error for path traversal in route")
|
|
}
|
|
}
|
|
|
|
func TestParseRoutes_PathSafety(t *testing.T) {
|
|
routes, err := ParseRoutes([]string{`^test=dir:./foo/../bar/`})
|
|
if err != nil {
|
|
t.Fatalf("unexpected error: %v", err)
|
|
}
|
|
dir := routes.routes[0].dir
|
|
if !filepath.IsAbs(dir) {
|
|
t.Errorf("expected absolute path, got %s", dir)
|
|
}
|
|
if strings.Contains(dir, "..") {
|
|
t.Errorf("expected cleaned path without .., got %s", dir)
|
|
}
|
|
}
|
|
|
|
func TestEventRouter_Match(t *testing.T) {
|
|
chdirTemp(t)
|
|
|
|
router, err := ParseRoutes([]string{
|
|
`^im\.message=dir:./test_messages`,
|
|
`^contact\.=dir:./test_contacts`,
|
|
})
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
// Single match
|
|
dirs := router.Match("im.message.receive_v1")
|
|
if len(dirs) != 1 {
|
|
t.Errorf("expected 1 match, got %v", dirs)
|
|
}
|
|
|
|
dirs = router.Match("contact.user.created_v3")
|
|
if len(dirs) != 1 {
|
|
t.Errorf("expected 1 match, got %v", dirs)
|
|
}
|
|
|
|
// No match
|
|
dirs = router.Match("drive.file.edit_v1")
|
|
if len(dirs) != 0 {
|
|
t.Errorf("expected no match, got %v", dirs)
|
|
}
|
|
}
|
|
|
|
func TestEventRouter_Match_FanOut(t *testing.T) {
|
|
chdirTemp(t)
|
|
|
|
router, err := ParseRoutes([]string{
|
|
`^im\.=dir:./test_im`,
|
|
`message=dir:./test_msg`,
|
|
})
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
// "im.message.receive_v1" matches both patterns
|
|
dirs := router.Match("im.message.receive_v1")
|
|
if len(dirs) != 2 {
|
|
t.Errorf("expected 2 matches (fan-out), got %d: %v", len(dirs), dirs)
|
|
}
|
|
}
|
|
|
|
// --- Pipeline: Route ---
|
|
|
|
func TestPipeline_Route(t *testing.T) {
|
|
chdirTemp(t)
|
|
router, err := ParseRoutes([]string{
|
|
`^im\.message=dir:./route_out`,
|
|
})
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
dir := router.routes[0].dir
|
|
|
|
filters := NewFilterChain()
|
|
var out, errOut bytes.Buffer
|
|
p := NewEventPipeline(DefaultRegistry(), filters,
|
|
PipelineConfig{Mode: TransformCompact, Router: router}, &out, &errOut)
|
|
if err := p.EnsureDirs(); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
eventJSON := `{
|
|
"message": {
|
|
"message_id": "msg_route", "chat_id": "oc_001",
|
|
"chat_type": "group", "message_type": "text",
|
|
"content": "{\"text\":\"routed\"}", "create_time": "1700000000"
|
|
},
|
|
"sender": {"sender_id": {"open_id": "ou_001"}}
|
|
}`
|
|
raw := makeRawEvent("im.message.receive_v1", eventJSON)
|
|
raw.Header.EventID = "ev_route"
|
|
raw.Header.CreateTime = "1700000000"
|
|
p.Process(context.Background(), raw)
|
|
|
|
// stdout should be empty — output goes to route dir
|
|
if out.Len() != 0 {
|
|
t.Errorf("routed event should not appear on stdout, got: %s", out.String())
|
|
}
|
|
|
|
// Verify file was created in route dir
|
|
entries, err := os.ReadDir(dir)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if len(entries) != 1 {
|
|
t.Fatalf("expected 1 file in route dir, got %d", len(entries))
|
|
}
|
|
|
|
data, err := os.ReadFile(filepath.Join(dir, entries[0].Name()))
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
var m map[string]interface{}
|
|
if err := json.Unmarshal(data, &m); err != nil {
|
|
t.Fatalf("file content is not valid JSON: %v", err)
|
|
}
|
|
if m["type"] != "im.message.receive_v1" {
|
|
t.Errorf("type = %v", m["type"])
|
|
}
|
|
}
|
|
|
|
func TestPipeline_Route_NoMatch(t *testing.T) {
|
|
chdirTemp(t)
|
|
fallbackDir := t.TempDir()
|
|
|
|
router, err := ParseRoutes([]string{
|
|
`^im\.message=dir:./route_dir`,
|
|
})
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
routeDir := router.routes[0].dir
|
|
|
|
filters := NewFilterChain()
|
|
var out, errOut bytes.Buffer
|
|
p := NewEventPipeline(DefaultRegistry(), filters,
|
|
PipelineConfig{Mode: TransformCompact, Router: router, OutputDir: fallbackDir}, &out, &errOut)
|
|
if err := p.EnsureDirs(); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
// Send an event that does NOT match the route
|
|
raw := makeRawEvent("drive.file.edit_v1", `{"file_token":"xxx"}`)
|
|
raw.Header.EventID = "ev_nomatch"
|
|
raw.Header.CreateTime = "1700000000"
|
|
p.Process(context.Background(), raw)
|
|
|
|
// stdout should be empty
|
|
if out.Len() != 0 {
|
|
t.Errorf("should not appear on stdout, got: %s", out.String())
|
|
}
|
|
|
|
// Route dir should be empty
|
|
routeEntries, _ := os.ReadDir(routeDir)
|
|
if len(routeEntries) != 0 {
|
|
t.Errorf("route dir should be empty, got %d files", len(routeEntries))
|
|
}
|
|
|
|
// Fallback dir should have the file
|
|
fallbackEntries, _ := os.ReadDir(fallbackDir)
|
|
if len(fallbackEntries) != 1 {
|
|
t.Fatalf("fallback dir should have 1 file, got %d", len(fallbackEntries))
|
|
}
|
|
}
|
|
|
|
func TestPipeline_Route_NoMatch_Stdout(t *testing.T) {
|
|
chdirTemp(t)
|
|
|
|
router, err := ParseRoutes([]string{
|
|
`^im\.message=dir:./route_dir`,
|
|
})
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
routeDir := router.routes[0].dir
|
|
|
|
filters := NewFilterChain()
|
|
var out, errOut bytes.Buffer
|
|
// No OutputDir — unmatched events should go to stdout
|
|
p := NewEventPipeline(DefaultRegistry(), filters,
|
|
PipelineConfig{Mode: TransformRaw, Router: router}, &out, &errOut)
|
|
if err := p.EnsureDirs(); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
raw := makeRawEvent("drive.file.edit_v1", `{"file_token":"xxx"}`)
|
|
raw.Header.EventID = "ev_stdout"
|
|
raw.Header.CreateTime = "1700000000"
|
|
p.Process(context.Background(), raw)
|
|
|
|
// Route dir should be empty
|
|
routeEntries, _ := os.ReadDir(routeDir)
|
|
if len(routeEntries) != 0 {
|
|
t.Errorf("route dir should be empty, got %d files", len(routeEntries))
|
|
}
|
|
|
|
// stdout should have the event
|
|
if out.Len() == 0 {
|
|
t.Error("unmatched event should fall through to stdout")
|
|
}
|
|
var m map[string]interface{}
|
|
if err := json.Unmarshal(out.Bytes(), &m); err != nil {
|
|
t.Fatalf("stdout is not valid JSON: %v", err)
|
|
}
|
|
}
|
|
|
|
func TestPipeline_Route_FanOut(t *testing.T) {
|
|
chdirTemp(t)
|
|
|
|
router, err := ParseRoutes([]string{
|
|
`^im\.=dir:./fanout1`,
|
|
`message=dir:./fanout2`,
|
|
})
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
dir1 := router.routes[0].dir
|
|
dir2 := router.routes[1].dir
|
|
|
|
filters := NewFilterChain()
|
|
var out, errOut bytes.Buffer
|
|
p := NewEventPipeline(DefaultRegistry(), filters,
|
|
PipelineConfig{Mode: TransformCompact, Router: router}, &out, &errOut)
|
|
if err := p.EnsureDirs(); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
eventJSON := `{
|
|
"message": {
|
|
"message_id": "msg_fanout", "chat_id": "oc_001",
|
|
"chat_type": "group", "message_type": "text",
|
|
"content": "{\"text\":\"fanout\"}", "create_time": "1700000000"
|
|
},
|
|
"sender": {"sender_id": {"open_id": "ou_001"}}
|
|
}`
|
|
raw := makeRawEvent("im.message.receive_v1", eventJSON)
|
|
raw.Header.EventID = "ev_fanout"
|
|
raw.Header.CreateTime = "1700000000"
|
|
p.Process(context.Background(), raw)
|
|
|
|
// stdout should be empty
|
|
if out.Len() != 0 {
|
|
t.Errorf("fan-out event should not appear on stdout, got: %s", out.String())
|
|
}
|
|
|
|
// Both dirs should have a file
|
|
entries1, _ := os.ReadDir(dir1)
|
|
entries2, _ := os.ReadDir(dir2)
|
|
if len(entries1) != 1 {
|
|
t.Errorf("dir1 should have 1 file, got %d", len(entries1))
|
|
}
|
|
if len(entries2) != 1 {
|
|
t.Errorf("dir2 should have 1 file, got %d", len(entries2))
|
|
}
|
|
}
|
|
|
|
// --- cleanupSeen ---
|
|
|
|
func TestCleanupSeen(t *testing.T) {
|
|
filters := NewFilterChain()
|
|
var out, errOut bytes.Buffer
|
|
p := NewEventPipeline(DefaultRegistry(), filters,
|
|
PipelineConfig{Mode: TransformRaw}, &out, &errOut)
|
|
|
|
// Insert an expired entry directly
|
|
p.seen.Store("old_key", time.Now().Add(-10*time.Minute))
|
|
p.seen.Store("fresh_key", time.Now())
|
|
|
|
p.cleanupSeen(time.Now())
|
|
|
|
if _, ok := p.seen.Load("old_key"); ok {
|
|
t.Error("expired key should be cleaned up")
|
|
}
|
|
if _, ok := p.seen.Load("fresh_key"); !ok {
|
|
t.Error("fresh key should be kept")
|
|
}
|
|
}
|