mirror of
https://github.com/chenhg5/cc-connect.git
synced 2026-07-03 12:28:10 +08:00
393 lines
11 KiB
Go
393 lines
11 KiB
Go
package core
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log/slog"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
const relayTimeout = 120 * time.Second
|
|
|
|
const (
|
|
RelayVisibilityFull = "full"
|
|
RelayVisibilitySummary = "summary"
|
|
RelayVisibilityNone = "none"
|
|
)
|
|
|
|
// RelayBinding represents a bot-to-bot relay binding in a group chat.
|
|
type RelayBinding struct {
|
|
Platform string `json:"platform"`
|
|
ChatID string `json:"chat_id"`
|
|
Bots map[string]string `json:"bots"` // project name → bot display name
|
|
}
|
|
|
|
// RelayManager coordinates bot-to-bot message relay across engines.
|
|
type RelayManager struct {
|
|
mu sync.RWMutex
|
|
engines map[string]*Engine // project name → engine (runtime only)
|
|
bindings map[string]*RelayBinding // chatID → binding
|
|
storePath string // empty = no persistence
|
|
timeout time.Duration
|
|
visibility string
|
|
}
|
|
|
|
func NewRelayManager(dataDir string) *RelayManager {
|
|
rm := &RelayManager{
|
|
engines: make(map[string]*Engine),
|
|
bindings: make(map[string]*RelayBinding),
|
|
timeout: relayTimeout,
|
|
visibility: RelayVisibilityFull,
|
|
}
|
|
if dataDir != "" {
|
|
rm.storePath = filepath.Join(dataDir, "relay_bindings.json")
|
|
rm.load()
|
|
}
|
|
return rm
|
|
}
|
|
|
|
func (rm *RelayManager) RegisterEngine(name string, e *Engine) {
|
|
rm.mu.Lock()
|
|
defer rm.mu.Unlock()
|
|
rm.engines[name] = e
|
|
}
|
|
|
|
// SetTimeout overrides the relay response timeout. Set to 0 to disable it.
|
|
func (rm *RelayManager) SetTimeout(d time.Duration) {
|
|
rm.mu.Lock()
|
|
defer rm.mu.Unlock()
|
|
if d < 0 {
|
|
d = 0
|
|
}
|
|
rm.timeout = d
|
|
}
|
|
|
|
// SetVisibility controls whether relay request/response visibility messages are
|
|
// echoed into the source group chat. The relay transport still returns the
|
|
// target response to the caller regardless of this setting.
|
|
func (rm *RelayManager) SetVisibility(mode string) {
|
|
rm.mu.Lock()
|
|
defer rm.mu.Unlock()
|
|
rm.visibility = normalizeRelayVisibility(mode)
|
|
}
|
|
|
|
// Bind establishes a relay binding between bots in a group chat.
|
|
// If a binding already exists, it will be replaced.
|
|
func (rm *RelayManager) Bind(platform, chatID string, bots map[string]string) {
|
|
rm.mu.Lock()
|
|
defer rm.mu.Unlock()
|
|
rm.bindings[chatID] = &RelayBinding{
|
|
Platform: platform,
|
|
ChatID: chatID,
|
|
Bots: bots,
|
|
}
|
|
slog.Info("relay: binding created", "chat_id", chatID, "bots", bots)
|
|
rm.saveLocked()
|
|
}
|
|
|
|
// AddToBind adds a project to an existing binding, or creates a new one.
|
|
func (rm *RelayManager) AddToBind(platform, chatID, projectName string) {
|
|
rm.mu.Lock()
|
|
defer rm.mu.Unlock()
|
|
|
|
binding := rm.bindings[chatID]
|
|
if binding == nil {
|
|
binding = &RelayBinding{
|
|
Platform: platform,
|
|
ChatID: chatID,
|
|
Bots: make(map[string]string),
|
|
}
|
|
rm.bindings[chatID] = binding
|
|
}
|
|
|
|
binding.Bots[projectName] = projectName
|
|
slog.Info("relay: project added to binding", "chat_id", chatID, "project", projectName, "bots", binding.Bots)
|
|
rm.saveLocked()
|
|
}
|
|
|
|
// RemoveFromBind removes a project from an existing binding.
|
|
// Returns true if the project was removed, false if not found.
|
|
func (rm *RelayManager) RemoveFromBind(chatID, projectName string) bool {
|
|
rm.mu.Lock()
|
|
defer rm.mu.Unlock()
|
|
|
|
binding := rm.bindings[chatID]
|
|
if binding == nil {
|
|
return false
|
|
}
|
|
|
|
if _, exists := binding.Bots[projectName]; exists {
|
|
delete(binding.Bots, projectName)
|
|
slog.Info("relay: project removed from binding", "chat_id", chatID, "project", projectName, "remaining", binding.Bots)
|
|
|
|
if len(binding.Bots) == 0 {
|
|
delete(rm.bindings, chatID)
|
|
slog.Info("relay: binding removed (no bots left)", "chat_id", chatID)
|
|
}
|
|
rm.saveLocked()
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
// GetBinding returns the binding for a chat, or nil if none.
|
|
func (rm *RelayManager) GetBinding(chatID string) *RelayBinding {
|
|
rm.mu.RLock()
|
|
defer rm.mu.RUnlock()
|
|
return rm.bindings[chatID]
|
|
}
|
|
|
|
// Unbind removes the relay binding for a chat.
|
|
func (rm *RelayManager) Unbind(chatID string) {
|
|
rm.mu.Lock()
|
|
defer rm.mu.Unlock()
|
|
delete(rm.bindings, chatID)
|
|
slog.Info("relay: binding removed", "chat_id", chatID)
|
|
rm.saveLocked()
|
|
}
|
|
|
|
// HasEngine checks if a project engine is registered.
|
|
func (rm *RelayManager) HasEngine(name string) bool {
|
|
rm.mu.RLock()
|
|
defer rm.mu.RUnlock()
|
|
_, ok := rm.engines[name]
|
|
return ok
|
|
}
|
|
|
|
// ListEngineNames returns all registered engine names.
|
|
func (rm *RelayManager) ListEngineNames() []string {
|
|
rm.mu.RLock()
|
|
defer rm.mu.RUnlock()
|
|
names := make([]string, 0, len(rm.engines))
|
|
for n := range rm.engines {
|
|
names = append(names, n)
|
|
}
|
|
return names
|
|
}
|
|
|
|
// ListBoundBots returns the other bots bound in the same chat as the given project.
|
|
func (rm *RelayManager) ListBoundBots(chatID, selfProject string) map[string]string {
|
|
rm.mu.RLock()
|
|
defer rm.mu.RUnlock()
|
|
|
|
b := rm.bindings[chatID]
|
|
if b == nil {
|
|
return nil
|
|
}
|
|
others := make(map[string]string)
|
|
for proj, name := range b.Bots {
|
|
if proj != selfProject {
|
|
others[proj] = name
|
|
}
|
|
}
|
|
return others
|
|
}
|
|
|
|
// RelayRequest is the payload for a relay send.
|
|
type RelayRequest struct {
|
|
From string `json:"from"` // source project name
|
|
To string `json:"to"` // target project name
|
|
SessionKey string `json:"session_key"` // source session key (contains platform + chatID)
|
|
Message string `json:"message"`
|
|
}
|
|
|
|
// RelayResponse is the result of a relay send.
|
|
type RelayResponse struct {
|
|
Response string `json:"response"`
|
|
}
|
|
|
|
// Send delivers a message from one bot to another and returns the response.
|
|
func (rm *RelayManager) Send(ctx context.Context, req RelayRequest) (*RelayResponse, error) {
|
|
platform, chatID, err := parseSessionKeyParts(req.SessionKey)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("relay: invalid session key: %w", err)
|
|
}
|
|
|
|
rm.mu.RLock()
|
|
binding := rm.bindings[chatID]
|
|
targetEngine := rm.engines[req.To]
|
|
sourceEngine := rm.engines[req.From]
|
|
visibility := rm.visibility
|
|
rm.mu.RUnlock()
|
|
|
|
if binding == nil {
|
|
return nil, fmt.Errorf("relay: no binding for this chat. Use /bind <project> first")
|
|
}
|
|
if _, ok := binding.Bots[req.To]; !ok {
|
|
var bound []string
|
|
for proj := range binding.Bots {
|
|
if proj != req.From {
|
|
bound = append(bound, proj)
|
|
}
|
|
}
|
|
return nil, fmt.Errorf("relay: project %q is not bound in this chat. Available targets: %s (use the exact name)", req.To, strings.Join(bound, ", "))
|
|
}
|
|
if targetEngine == nil {
|
|
return nil, fmt.Errorf("relay: target engine %q not found (is the project running?)", req.To)
|
|
}
|
|
|
|
fromName := req.From
|
|
if binding.Bots[req.From] != "" {
|
|
fromName = binding.Bots[req.From]
|
|
}
|
|
toName := req.To
|
|
if binding.Bots[req.To] != "" {
|
|
toName = binding.Bots[req.To]
|
|
}
|
|
|
|
// Post the forwarded message to the group chat for visibility.
|
|
groupSessionKey := platform + ":" + chatID + ":relay"
|
|
if sourceEngine != nil && visibility != RelayVisibilityNone {
|
|
label := relayVisibilityRequestLabel(visibility, fromName, toName, req.Message)
|
|
rm.sendToGroup(ctx, sourceEngine, platform, groupSessionKey, label)
|
|
}
|
|
|
|
// Execute relay: inject message into target engine and collect response
|
|
relayCtx, cancel := rm.relayContext(ctx)
|
|
defer cancel()
|
|
|
|
response, err := targetEngine.HandleRelay(relayCtx, req.From, req.SessionKey, req.Message)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("relay: %w", err)
|
|
}
|
|
|
|
// Post the response to the group chat for visibility.
|
|
if targetEngine != nil && visibility != RelayVisibilityNone {
|
|
label := relayVisibilityResponseLabel(visibility, toName, response)
|
|
rm.sendToGroup(ctx, targetEngine, platform, groupSessionKey, label)
|
|
}
|
|
|
|
return &RelayResponse{Response: response}, nil
|
|
}
|
|
|
|
// sendToGroup sends a message to the group chat for visibility.
|
|
func (rm *RelayManager) sendToGroup(ctx context.Context, e *Engine, platform, sessionKey, content string) {
|
|
for _, p := range e.platforms {
|
|
if p.Name() != platform {
|
|
continue
|
|
}
|
|
rc, ok := p.(ReplyContextReconstructor)
|
|
if !ok {
|
|
continue
|
|
}
|
|
rctx, err := rc.ReconstructReplyCtx(sessionKey)
|
|
if err != nil {
|
|
slog.Debug("relay: failed to reconstruct reply ctx", "error", err)
|
|
continue
|
|
}
|
|
if err := p.Send(ctx, rctx, content); err != nil {
|
|
slog.Debug("relay: failed to send group message", "error", err)
|
|
}
|
|
return
|
|
}
|
|
}
|
|
|
|
func truncateRelay(s string, maxLen int) string {
|
|
runes := []rune(s)
|
|
if len(runes) <= maxLen {
|
|
return s
|
|
}
|
|
return string(runes[:maxLen]) + "…"
|
|
}
|
|
|
|
func normalizeRelayVisibility(mode string) string {
|
|
switch strings.ToLower(strings.TrimSpace(mode)) {
|
|
case RelayVisibilityNone:
|
|
return RelayVisibilityNone
|
|
case RelayVisibilitySummary:
|
|
return RelayVisibilitySummary
|
|
case "", RelayVisibilityFull:
|
|
return RelayVisibilityFull
|
|
default:
|
|
slog.Warn("relay: unknown visibility mode, falling back to full", "mode", mode,
|
|
"valid_values", []string{RelayVisibilityNone, RelayVisibilitySummary, RelayVisibilityFull})
|
|
return RelayVisibilityFull
|
|
}
|
|
}
|
|
|
|
func relayVisibilityRequestLabel(mode, fromName, toName, message string) string {
|
|
if normalizeRelayVisibility(mode) == RelayVisibilitySummary {
|
|
return fmt.Sprintf("[%s → %s] relay request sent", fromName, toName)
|
|
}
|
|
return fmt.Sprintf("[%s → %s] %s", fromName, toName, message)
|
|
}
|
|
|
|
func relayVisibilityResponseLabel(mode, toName, response string) string {
|
|
if normalizeRelayVisibility(mode) == RelayVisibilitySummary {
|
|
return fmt.Sprintf("[%s] relay response ready (%d chars)", toName, len([]rune(response)))
|
|
}
|
|
return fmt.Sprintf("[%s] %s", toName, truncateRelay(response, 2000))
|
|
}
|
|
|
|
func (rm *RelayManager) relayContext(ctx context.Context) (context.Context, context.CancelFunc) {
|
|
rm.mu.RLock()
|
|
timeout := rm.timeout
|
|
rm.mu.RUnlock()
|
|
if timeout <= 0 {
|
|
return ctx, func() {}
|
|
}
|
|
return context.WithTimeout(ctx, timeout)
|
|
}
|
|
|
|
func parseSessionKeyParts(sessionKey string) (platform, chatID string, err error) {
|
|
// Format: "platform:chatID:userID"
|
|
// Relay session format: "relay:sourceProject:chatID"
|
|
parts := strings.SplitN(sessionKey, ":", 3)
|
|
if len(parts) < 2 {
|
|
return "", "", fmt.Errorf("invalid session key format: %q", sessionKey)
|
|
}
|
|
if parts[0] == "relay" && len(parts) == 3 {
|
|
// For relay sessions, chatID is the third part: "relay:sourceProject:chatID"
|
|
return parts[0], parts[2], nil
|
|
}
|
|
return parts[0], parts[1], nil
|
|
}
|
|
|
|
// ── Persistence ─────────────────────────────────────────────
|
|
|
|
// saveLocked persists bindings to disk. Caller must hold rm.mu (read or write).
|
|
func (rm *RelayManager) saveLocked() {
|
|
if rm.storePath == "" {
|
|
return
|
|
}
|
|
data, err := json.MarshalIndent(rm.bindings, "", " ")
|
|
if err != nil {
|
|
slog.Error("relay: failed to marshal bindings", "error", err)
|
|
return
|
|
}
|
|
if err := os.MkdirAll(filepath.Dir(rm.storePath), 0o755); err != nil {
|
|
slog.Error("relay: failed to create dir", "error", err)
|
|
return
|
|
}
|
|
if err := AtomicWriteFile(rm.storePath, data, 0o644); err != nil {
|
|
slog.Error("relay: failed to write bindings", "path", rm.storePath, "error", err)
|
|
}
|
|
}
|
|
|
|
func (rm *RelayManager) load() {
|
|
if rm.storePath == "" {
|
|
return
|
|
}
|
|
data, err := os.ReadFile(rm.storePath)
|
|
if err != nil {
|
|
if !os.IsNotExist(err) {
|
|
slog.Error("relay: failed to read bindings", "path", rm.storePath, "error", err)
|
|
}
|
|
return
|
|
}
|
|
var bindings map[string]*RelayBinding
|
|
if err := json.Unmarshal(data, &bindings); err != nil {
|
|
slog.Error("relay: failed to unmarshal bindings", "path", rm.storePath, "error", err)
|
|
return
|
|
}
|
|
if bindings != nil {
|
|
rm.bindings = bindings
|
|
slog.Info("relay: loaded bindings", "count", len(bindings))
|
|
}
|
|
}
|