feat: add Cisco Webex platform adapter (#1402)

* docs: add Webex platform adapter design spec

* docs: add Webex platform implementation plan

* feat(webex): add API types and REST client

* feat(webex): add Platform struct, New, and allowlist parsing

* feat(webex): add message gating and mention stripping

* feat(webex): build core.Message with attachment handling

* feat(webex): add Reply, Send, chunking, and file senders

* feat(webex): add Start/Stop and reconnecting WebSocket loop

* feat(webex): wire platform into build, Makefile, and config example

* test(webex): add interface conformance checks; docs: add no_webex tag

* fix(webex): address review — selfID race, allow_from wildcard, 429/401, token redaction, rune-safe chunking, goroutine + device cleanup

* chore(webex): remove internal design/plan docs from PR branch

* fix(webex): parse Mercury conversation.activity frames and fetch decrypted messages

* fix(webex): accept 'share' verb for file/image uploads, ignore re-notifications

* fix(webex): check Close() return values to satisfy errcheck lint

---------

Co-authored-by: Bryant Barzola <bryant.barzola@gmail.com>
This commit is contained in:
Bryant
2026-06-21 21:24:29 -04:00
committed by GitHub
parent 66a6353c55
commit cb0e2fa172
10 changed files with 1216 additions and 2 deletions

View File

@@ -205,7 +205,7 @@ go build -tags 'no_discord no_dingtalk no_qq no_qqbot no_line' ./cmd/cc-connect
Available tags: `no_acp`, `no_claudecode`, `no_codex`, `no_cursor`, `no_gemini`,
`no_iflow`, `no_opencode`, `no_qoder`, `no_feishu`, `no_telegram`,
`no_discord`, `no_slack`, `no_dingtalk`, `no_wecom`, `no_weixin`, `no_qq`, `no_qqbot`,
`no_line`, `no_weibo`, `no_matrix`.
`no_line`, `no_weibo`, `no_matrix`, `no_webex`.
## Pre-Commit Checklist

View File

@@ -34,7 +34,7 @@ PLATFORMS := \
# ---------------------------------------------------------------------------
ALL_AGENTS := acp antigravity claudecode codex copilot cursor devin gemini iflow kimi opencode pi qoder tmux
ALL_PLATFORMS := feishu telegram discord slack dingtalk wecom weixin qq qqbot line weibo max matrix
ALL_PLATFORMS := feishu telegram discord slack dingtalk wecom weixin qq qqbot line weibo max matrix webex
ALL_EXTRAS := web
COMMA := ,

View File

@@ -0,0 +1,5 @@
//go:build !no_webex
package main
import _ "github.com/chenhg5/cc-connect/platform/webex"

View File

@@ -1130,6 +1130,19 @@ app_secret = "your-feishu-app-secret"
# group_reply_all = false # respond to all room messages without @mention / 群聊无需 @ 也响应
# proxy = "" # optional HTTP/SOCKS5 proxy / 可选代理
# Webex (Cisco) (uncomment to enable / 取消注释以启用)
# 1. Create a bot at https://developer.webex.com/my-apps/new/bot
# 在 https://developer.webex.com/my-apps/new/bot 创建机器人
# 2. Copy the bot access token below / 复制 access token 到下方
# Connection: WebSocket — no public URL required / 无需公网地址
# [[projects.platforms]]
# type = "webex"
#
# [projects.platforms.options]
# token = "YOUR_WEBEX_BOT_ACCESS_TOKEN" # from developer.webex.com (Bot)
# allow_from = "you@cisco.com" # comma-separated email allowlist / 逗号分隔的邮箱白名单
# MAX messenger (uncomment to enable / 取消注释以启用)
# 1. Create a bot via @MasterBot in MAX, get the access token
# 在 MAX 中通过 @MasterBot 创建机器人,获取 access token

249
platform/webex/client.go Normal file
View File

@@ -0,0 +1,249 @@
package webex
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"mime"
"mime/multipart"
"net/http"
"strconv"
"strings"
"time"
)
const webexBaseURL = "https://webexapis.com/v1"
// maxRetryAfter caps how long we honor a 429 Retry-After header before retrying.
const maxRetryAfter = 60 * time.Second
// errUnauthorized signals a 401 from the Webex API so callers can stop retrying.
var errUnauthorized = errors.New("webex: unauthorized (401) — check bot token")
// webexClient abstracts the Webex REST API so tests can stub it.
type webexClient interface {
GetMe(ctx context.Context) (*person, error)
CreateDevice(ctx context.Context) (*device, error)
DeleteDevice(ctx context.Context, deviceURL string) error
GetMessage(ctx context.Context, id string) (*message, error)
DownloadFile(ctx context.Context, url string) (*downloadedFile, error)
PostMessage(ctx context.Context, roomID, parentID, markdown string) error
PostFile(ctx context.Context, roomID string, f *downloadedFile) error
}
// httpClient is the real webexClient backed by net/http.
type httpClient struct {
token string
hc *http.Client
baseURL string // Webex REST base; overridable in tests.
}
func newHTTPClient(token string) *httpClient {
return &httpClient{token: token, hc: &http.Client{Timeout: 60 * time.Second}, baseURL: webexBaseURL}
}
// base returns the configured REST base URL, falling back to the default.
func (c *httpClient) base() string {
if c.baseURL != "" {
return c.baseURL
}
return webexBaseURL
}
func (c *httpClient) do(ctx context.Context, method, url string, body []byte, contentType string) (*http.Response, error) {
var rdr io.Reader
if body != nil {
rdr = bytes.NewReader(body)
}
req, err := http.NewRequestWithContext(ctx, method, url, rdr)
if err != nil {
return nil, err
}
req.Header.Set("Authorization", "Bearer "+c.token)
if contentType != "" {
req.Header.Set("Content-Type", contentType)
}
return c.hc.Do(req)
}
// doWithRetry wraps do to surface 401 as errUnauthorized and to retry once on
// 429 after honoring the Retry-After header (capped). The returned response (on
// success) has an unread body the caller must close.
func (c *httpClient) doWithRetry(ctx context.Context, method, url string, body []byte, contentType, what string) (*http.Response, error) {
resp, err := c.do(ctx, method, url, body, contentType)
if err != nil {
return nil, err
}
if resp.StatusCode == http.StatusUnauthorized {
_ = resp.Body.Close()
return nil, fmt.Errorf("%s: %w", what, errUnauthorized)
}
if resp.StatusCode == http.StatusTooManyRequests {
wait := retryAfter(resp.Header.Get("Retry-After"))
_ = resp.Body.Close()
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-time.After(wait):
}
resp, err = c.do(ctx, method, url, body, contentType)
if err != nil {
return nil, err
}
if resp.StatusCode == http.StatusUnauthorized {
_ = resp.Body.Close()
return nil, fmt.Errorf("%s: %w", what, errUnauthorized)
}
}
return resp, nil
}
// retryAfter parses a Retry-After header value (in seconds), capping the wait.
func retryAfter(v string) time.Duration {
secs, err := strconv.Atoi(strings.TrimSpace(v))
if err != nil || secs < 0 {
return 0
}
d := time.Duration(secs) * time.Second
if d > maxRetryAfter {
d = maxRetryAfter
}
return d
}
func (c *httpClient) GetMe(ctx context.Context) (*person, error) {
resp, err := c.doWithRetry(ctx, http.MethodGet, c.base()+"/people/me", nil, "", "webex: getMe")
if err != nil {
return nil, err
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("webex: getMe status %d", resp.StatusCode)
}
var p person
if err := json.NewDecoder(resp.Body).Decode(&p); err != nil {
return nil, err
}
return &p, nil
}
func (c *httpClient) CreateDevice(ctx context.Context) (*device, error) {
// The WDM device endpoint requires name + model + localizedModel; omitting
// model returns HTTP 400 "Missing Model". Verified against the live API.
payload := []byte(`{"deviceName":"cc-connect","name":"cc-connect","model":"cc-connect","localizedModel":"cc-connect","systemName":"cc-connect","systemVersion":"1.0","deviceType":"DESKTOP"}`)
resp, err := c.doWithRetry(ctx, http.MethodPost, "https://wdm-a.wbx2.com/wdm/api/v1/devices", payload, "application/json", "webex: createDevice")
if err != nil {
return nil, err
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
return nil, fmt.Errorf("webex: createDevice status %d", resp.StatusCode)
}
var d device
if err := json.NewDecoder(resp.Body).Decode(&d); err != nil {
return nil, err
}
return &d, nil
}
func (c *httpClient) DeleteDevice(ctx context.Context, deviceURL string) error {
if deviceURL == "" {
return nil
}
resp, err := c.doWithRetry(ctx, http.MethodDelete, deviceURL, nil, "", "webex: deleteDevice")
if err != nil {
return err
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusAccepted {
return fmt.Errorf("webex: deleteDevice status %d", resp.StatusCode)
}
return nil
}
func (c *httpClient) GetMessage(ctx context.Context, id string) (*message, error) {
resp, err := c.doWithRetry(ctx, http.MethodGet, c.base()+"/messages/"+id, nil, "", "webex: getMessage")
if err != nil {
return nil, err
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("webex: getMessage status %d", resp.StatusCode)
}
var m message
if err := json.NewDecoder(resp.Body).Decode(&m); err != nil {
return nil, err
}
return &m, nil
}
func (c *httpClient) DownloadFile(ctx context.Context, url string) (*downloadedFile, error) {
resp, err := c.doWithRetry(ctx, http.MethodGet, url, nil, "", "webex: downloadFile")
if err != nil {
return nil, err
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("webex: downloadFile status %d", resp.StatusCode)
}
data, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
f := &downloadedFile{Data: data, MimeType: resp.Header.Get("Content-Type")}
if cd := resp.Header.Get("Content-Disposition"); cd != "" {
if _, params, err := mime.ParseMediaType(cd); err == nil {
f.FileName = params["filename"]
}
}
return f, nil
}
func (c *httpClient) PostMessage(ctx context.Context, roomID, parentID, markdown string) error {
body := map[string]string{"roomId": roomID, "markdown": markdown}
if parentID != "" {
body["parentId"] = parentID
}
buf, _ := json.Marshal(body)
resp, err := c.doWithRetry(ctx, http.MethodPost, c.base()+"/messages", buf, "application/json", "webex: postMessage")
if err != nil {
return err
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
return fmt.Errorf("webex: postMessage status %d", resp.StatusCode)
}
return nil
}
func (c *httpClient) PostFile(ctx context.Context, roomID string, f *downloadedFile) error {
var buf bytes.Buffer
w := multipart.NewWriter(&buf)
_ = w.WriteField("roomId", roomID)
name := f.FileName
if name == "" {
name = "attachment"
}
part, err := w.CreateFormFile("files", name)
if err != nil {
return err
}
if _, err := part.Write(f.Data); err != nil {
return err
}
if err := w.Close(); err != nil {
return err
}
resp, err := c.doWithRetry(ctx, http.MethodPost, c.base()+"/messages", buf.Bytes(), w.FormDataContentType(), "webex: postFile")
if err != nil {
return err
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
return fmt.Errorf("webex: postFile status %d", resp.StatusCode)
}
return nil
}

View File

@@ -0,0 +1,50 @@
package webex
import (
"context"
"errors"
"net/http"
"net/http/httptest"
"sync/atomic"
"testing"
)
func TestGetMessageUnauthorized(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusUnauthorized)
}))
defer srv.Close()
c := &httpClient{token: "t", hc: srv.Client(), baseURL: srv.URL}
_, err := c.GetMessage(context.Background(), "abc")
if !errors.Is(err, errUnauthorized) {
t.Fatalf("expected errUnauthorized, got %v", err)
}
}
func TestGetMessageRetriesOn429(t *testing.T) {
var calls int32
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
n := atomic.AddInt32(&calls, 1)
if n == 1 {
w.Header().Set("Retry-After", "0")
w.WriteHeader(http.StatusTooManyRequests)
return
}
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"id":"abc","roomId":"r","roomType":"direct"}`))
}))
defer srv.Close()
c := &httpClient{token: "t", hc: srv.Client(), baseURL: srv.URL}
m, err := c.GetMessage(context.Background(), "abc")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if m.ID != "abc" {
t.Fatalf("message ID = %q", m.ID)
}
if got := atomic.LoadInt32(&calls); got != 2 {
t.Fatalf("expected 2 calls (1 retry), got %d", got)
}
}

51
platform/webex/types.go Normal file
View File

@@ -0,0 +1,51 @@
package webex
// person is the subset of GET /v1/people/me we use.
type person struct {
ID string `json:"id"`
Emails []string `json:"emails"`
DisplayName string `json:"displayName"`
}
// device is the subset of POST /v1/devices response we use.
type device struct {
URL string `json:"url"` // for DELETE on shutdown
WebSocketURL string `json:"webSocketUrl"` // wss:// endpoint
}
// message is the subset of GET /v1/messages/{id} we use.
type message struct {
ID string `json:"id"`
RoomID string `json:"roomId"`
RoomType string `json:"roomType"` // "direct" | "group"
Text string `json:"text"`
Markdown string `json:"markdown"`
PersonID string `json:"personId"`
PersonEmail string `json:"personEmail"`
MentionedPeople []string `json:"mentionedPeople"`
Files []string `json:"files"`
}
// wsEvent is the Mercury "conversation.activity" envelope delivered over the
// Webex Device WebSocket. The message body in the frame is end-to-end
// encrypted, so we use the activity ID to fetch the decrypted message via REST.
type wsEvent struct {
Data struct {
EventType string `json:"eventType"`
Activity struct {
ID string `json:"id"`
Verb string `json:"verb"`
Actor struct {
ID string `json:"id"`
EmailAddress string `json:"emailAddress"`
} `json:"actor"`
} `json:"activity"`
} `json:"data"`
}
// downloadedFile is a fetched attachment with metadata.
type downloadedFile struct {
Data []byte
MimeType string
FileName string
}

402
platform/webex/webex.go Normal file
View File

@@ -0,0 +1,402 @@
package webex
import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"log/slog"
"regexp"
"strings"
"sync"
"time"
"github.com/chenhg5/cc-connect/core"
"github.com/gorilla/websocket"
)
func init() {
core.RegisterPlatform("webex", New)
}
// replyContext carries what Reply/Send need to target a Webex room.
type replyContext struct {
roomID string
messageID string
personID string
}
// Platform is the Webex adapter implementing core.Platform.
type Platform struct {
token string
allowFrom []string // lowercased email allowlist; empty = allow all
client webexClient
mu sync.RWMutex
handler core.MessageHandler
lifecycleHandler core.PlatformLifecycleHandler
cancel context.CancelFunc
stopping bool
selfID string // bot's own personId
selfEmail string // bot's own email (Mercury actor uses email)
deviceURL string // for cleanup on Stop()
}
// New constructs a Webex platform from config options.
func New(opts map[string]any) (core.Platform, error) {
token, _ := opts["token"].(string)
if strings.TrimSpace(token) == "" {
return nil, fmt.Errorf("webex: token is required")
}
rawAllow, _ := opts["allow_from"].(string)
core.CheckAllowFrom("webex", rawAllow)
return &Platform{
token: token,
allowFrom: parseAllowFrom(rawAllow),
client: newHTTPClient(token),
}, nil
}
func (p *Platform) Name() string { return "webex" }
// self returns the bot's own personId under lock.
func (p *Platform) self() string {
p.mu.RLock()
defer p.mu.RUnlock()
return p.selfID
}
// selfEmailAddr returns the bot's own email under lock.
func (p *Platform) selfEmailAddr() string {
p.mu.RLock()
defer p.mu.RUnlock()
return p.selfEmail
}
// parseAllowFrom splits and lowercases a comma-separated email list.
func parseAllowFrom(raw string) []string {
raw = strings.TrimSpace(raw)
if raw == "" {
return nil
}
var out []string
for _, e := range strings.Split(raw, ",") {
if e = strings.TrimSpace(e); e != "" {
out = append(out, strings.ToLower(e))
}
}
return out
}
// isAllowed reports whether an email may use the bot.
// Empty allowlist permits everyone (a startup warning was already logged).
func (p *Platform) isAllowed(email string) bool {
if len(p.allowFrom) == 0 {
return true
}
email = strings.ToLower(strings.TrimSpace(email))
for _, a := range p.allowFrom {
if a == "*" || a == email {
return true
}
}
return false
}
var sparkMentionRe = regexp.MustCompile(`(?s)<spark-mention[^>]*>.*?</spark-mention>`)
// stripMention removes Webex <spark-mention> tags and trims the result.
func stripMention(text string) string {
return strings.TrimSpace(sparkMentionRe.ReplaceAllString(text, ""))
}
// isMentioned reports whether the bot's selfID appears in mentionedPeople.
func (p *Platform) isMentioned(m *message) bool {
self := p.self()
for _, id := range m.MentionedPeople {
if id == self {
return true
}
}
return false
}
// shouldProcess applies the gate: allowlist + group-mention requirement.
func (p *Platform) shouldProcess(m *message) bool {
if !p.isAllowed(m.PersonEmail) {
return false
}
if m.RoomType == "group" && !p.isMentioned(m) {
return false
}
return true
}
// buildMessage converts a fetched Webex message into a core.Message,
// downloading any attachments and stripping group @mentions.
func (p *Platform) buildMessage(ctx context.Context, m *message) *core.Message {
content := m.Text
if m.RoomType == "group" {
content = stripMention(content)
}
cm := &core.Message{
SessionKey: fmt.Sprintf("webex:%s:%s", m.RoomID, m.PersonID),
Platform: "webex",
MessageID: m.ID,
ChannelID: m.RoomID,
ChannelKey: m.RoomID,
UserID: m.PersonEmail,
// Webex message API exposes no display name; use email. A /people lookup could enrich this later.
UserName: m.PersonEmail,
Content: content,
ReplyCtx: replyContext{roomID: m.RoomID, messageID: m.ID, personID: m.PersonID},
}
for _, url := range m.Files {
f, err := p.client.DownloadFile(ctx, url)
if err != nil {
slog.Error("webex: download file failed", "error", err)
continue
}
if strings.HasPrefix(f.MimeType, "image/") {
cm.Images = append(cm.Images, core.ImageAttachment{
MimeType: f.MimeType, Data: f.Data, FileName: f.FileName,
})
} else {
cm.Files = append(cm.Files, core.FileAttachment{
MimeType: f.MimeType, Data: f.Data, FileName: f.FileName,
})
}
}
return cm
}
func (p *Platform) messageHandler() core.MessageHandler {
p.mu.RLock()
defer p.mu.RUnlock()
return p.handler
}
func (p *Platform) isStopping() bool {
p.mu.RLock()
defer p.mu.RUnlock()
return p.stopping
}
const (
initialBackoff = time.Second
maxBackoff = 30 * time.Second
stableConnWindow = 10 * time.Second
)
// Start fetches the bot identity, registers a device, and launches the
// reconnecting WebSocket read loop in the background.
func (p *Platform) Start(handler core.MessageHandler) error {
p.mu.Lock()
if p.stopping {
p.mu.Unlock()
return fmt.Errorf("webex: platform stopped")
}
p.handler = handler
ctx, cancel := context.WithCancel(context.Background())
p.cancel = cancel
p.mu.Unlock()
me, err := p.client.GetMe(ctx)
if err != nil {
return fmt.Errorf("webex: getMe: %w", err)
}
p.mu.Lock()
p.selfID = me.ID
if len(me.Emails) > 0 {
p.selfEmail = me.Emails[0]
}
p.mu.Unlock()
slog.Info("webex: authenticated", "bot", me.DisplayName)
go p.connectLoop(ctx)
return nil
}
// Stop cancels the read loop and deletes the registered device.
func (p *Platform) Stop() error {
p.mu.Lock()
p.stopping = true
cancel := p.cancel
deviceURL := p.deviceURL
p.mu.Unlock()
if cancel != nil {
cancel()
}
if deviceURL != "" {
ctx, c := context.WithTimeout(context.Background(), 10*time.Second)
defer c()
if err := p.client.DeleteDevice(ctx, deviceURL); err != nil {
slog.Warn("webex: delete device failed", "error", err)
}
}
return nil
}
// SetLifecycleHandler implements core.AsyncRecoverablePlatform.
func (p *Platform) SetLifecycleHandler(h core.PlatformLifecycleHandler) {
p.mu.Lock()
defer p.mu.Unlock()
p.lifecycleHandler = h
}
// connectLoop registers a device, opens the WebSocket, and reconnects with
// exponential backoff until the context is cancelled.
func (p *Platform) connectLoop(ctx context.Context) {
backoff := initialBackoff
for {
if ctx.Err() != nil || p.isStopping() {
return
}
started := time.Now()
err := p.runConnection(ctx)
if ctx.Err() != nil || p.isStopping() {
return
}
if err != nil {
if errors.Is(err, errUnauthorized) {
slog.Error("webex: authentication failed, not retrying", "error", core.RedactToken(err.Error(), p.token))
if h := p.lifecycle(); h != nil {
h.OnPlatformUnavailable(p, err)
}
return
}
slog.Warn("webex: connection ended", "error", core.RedactToken(err.Error(), p.token), "backoff", backoff)
if h := p.lifecycle(); h != nil {
h.OnPlatformUnavailable(p, err)
}
}
if time.Since(started) >= stableConnWindow {
backoff = initialBackoff
} else if backoff < maxBackoff {
backoff *= 2
if backoff > maxBackoff {
backoff = maxBackoff
}
}
select {
case <-ctx.Done():
return
case <-time.After(backoff):
}
}
}
func (p *Platform) lifecycle() core.PlatformLifecycleHandler {
p.mu.RLock()
defer p.mu.RUnlock()
return p.lifecycleHandler
}
// runConnection registers a device, dials the WebSocket, and reads until the
// connection drops or the context is cancelled.
func (p *Platform) runConnection(ctx context.Context) error {
dev, err := p.client.CreateDevice(ctx)
if err != nil {
return fmt.Errorf("create device: %w", err)
}
p.mu.Lock()
prevDevice := p.deviceURL
p.deviceURL = dev.URL
p.mu.Unlock()
if prevDevice != "" && prevDevice != dev.URL {
if err := p.client.DeleteDevice(ctx, prevDevice); err != nil {
slog.Debug("webex: delete stale device failed", "error", err)
}
}
header := map[string][]string{"Authorization": {"Bearer " + p.token}}
conn, _, err := websocket.DefaultDialer.DialContext(ctx, dev.WebSocketURL, header)
if err != nil {
return fmt.Errorf("dial websocket: %s", core.RedactToken(err.Error(), p.token))
}
defer func() { _ = conn.Close() }()
slog.Info("webex: websocket connected")
if h := p.lifecycle(); h != nil {
h.OnPlatformReady(p)
}
connClosed := make(chan struct{})
defer close(connClosed)
go func() {
select {
case <-ctx.Done():
_ = conn.Close()
case <-connClosed:
}
}()
for {
_, data, err := conn.ReadMessage()
if err != nil {
if ctx.Err() != nil {
return nil
}
return fmt.Errorf("read websocket: %w", err)
}
p.handleFrame(ctx, data)
}
}
// handleFrame parses one Mercury WebSocket frame and dispatches qualifying
// messages. The frame body is encrypted, so we fetch the decrypted message
// via REST using the activity ID.
func (p *Platform) handleFrame(ctx context.Context, data []byte) {
var ev wsEvent
if err := json.Unmarshal(data, &ev); err != nil {
slog.Debug("webex: non-JSON frame", "error", err)
return
}
// "post" = text message, "share" = file/image upload. Other verbs (e.g.
// "update" for malware-scan completion, "delete") are re-notifications we
// must ignore to avoid double-processing.
if ev.Data.EventType != "conversation.activity" {
return
}
if ev.Data.Activity.Verb != "post" && ev.Data.Activity.Verb != "share" {
return
}
// Skip our own messages (actor email matches the bot).
if self := p.selfEmailAddr(); self != "" && strings.EqualFold(ev.Data.Activity.Actor.EmailAddress, self) {
return
}
msgID := activityIDToMessageID(ev.Data.Activity.ID)
if msgID == "" {
return
}
m, err := p.client.GetMessage(ctx, msgID)
if err != nil {
slog.Error("webex: fetch message failed", "error", err)
return
}
if !p.shouldProcess(m) {
slog.Debug("webex: message gated out", "room_type", m.RoomType, "from", m.PersonEmail)
return
}
handler := p.messageHandler()
if handler == nil {
return
}
handler(p, p.buildMessage(ctx, m))
}
// activityIDToMessageID converts a Mercury activity UUID into the public Webex
// message ID (base64 of the ciscospark://us/MESSAGE/{uuid} URI) accepted by
// GET /v1/messages/{id}.
func activityIDToMessageID(activityID string) string {
if activityID == "" {
return ""
}
return base64.StdEncoding.EncodeToString([]byte("ciscospark://us/MESSAGE/" + activityID))
}

View File

@@ -0,0 +1,138 @@
package webex
import (
"context"
"fmt"
"strings"
"unicode/utf8"
"github.com/chenhg5/cc-connect/core"
)
// webexMaxBytes is Webex's per-message body cap.
const webexMaxBytes = 7439
// asReplyContext recovers a replyContext from the engine's any-typed value.
func asReplyContext(replyCtx any) (replyContext, error) {
rc, ok := replyCtx.(replyContext)
if !ok {
return replyContext{}, fmt.Errorf("webex: invalid reply context %T", replyCtx)
}
return rc, nil
}
// Reply posts a threaded response to the originating message.
func (p *Platform) Reply(ctx context.Context, replyCtx any, content string) error {
rc, err := asReplyContext(replyCtx)
if err != nil {
return err
}
return p.post(ctx, rc.roomID, rc.messageID, content)
}
// Send posts a non-threaded (proactive) message to the room.
func (p *Platform) Send(ctx context.Context, replyCtx any, content string) error {
rc, err := asReplyContext(replyCtx)
if err != nil {
return err
}
return p.post(ctx, rc.roomID, "", content)
}
// post chunks content and posts each chunk; only the first chunk threads.
func (p *Platform) post(ctx context.Context, roomID, parentID, content string) error {
chunks := chunkMarkdown(content, webexMaxBytes)
for i, chunk := range chunks {
pid := ""
if i == 0 {
pid = parentID
}
if err := p.client.PostMessage(ctx, roomID, pid, chunk); err != nil {
return fmt.Errorf("webex: post chunk %d/%d: %w", i+1, len(chunks), err)
}
}
return nil
}
// chunkMarkdown splits text to fit within limit bytes, preferring paragraph
// (\n\n), then line (\n), then a hard cut.
func chunkMarkdown(text string, limit int) []string {
if len(text) <= limit {
return []string{text}
}
var out []string
rest := text
for len(rest) > limit {
cut := strings.LastIndex(rest[:limit], "\n\n")
if cut <= 0 {
cut = strings.LastIndex(rest[:limit], "\n")
}
if cut <= 0 {
cut = limit
// back up to a rune boundary so we never split a multibyte char
for cut > 0 && !utf8.RuneStart(rest[cut]) {
cut--
}
if cut == 0 { // pathological: single rune larger than limit; force full limit
cut = limit
}
}
out = append(out, rest[:cut])
rest = strings.TrimLeft(rest[cut:], "\n")
}
if rest != "" {
out = append(out, rest)
}
return out
}
// SendImage implements core.ImageSender.
func (p *Platform) SendImage(ctx context.Context, replyCtx any, img core.ImageAttachment) error {
rc, err := asReplyContext(replyCtx)
if err != nil {
return err
}
return p.client.PostFile(ctx, rc.roomID, &downloadedFile{
Data: img.Data, MimeType: img.MimeType, FileName: img.FileName,
})
}
// SendFile implements core.FileSender.
func (p *Platform) SendFile(ctx context.Context, replyCtx any, file core.FileAttachment) error {
rc, err := asReplyContext(replyCtx)
if err != nil {
return err
}
return p.client.PostFile(ctx, rc.roomID, &downloadedFile{
Data: file.Data, MimeType: file.MimeType, FileName: file.FileName,
})
}
// ReconstructReplyCtx implements core.ReplyContextReconstructor for cron jobs.
// Session key format is "webex:{roomID}:{personID}".
func (p *Platform) ReconstructReplyCtx(sessionKey string) (any, error) {
parts := strings.SplitN(sessionKey, ":", 3)
if len(parts) < 2 || parts[0] != "webex" {
return nil, fmt.Errorf("webex: cannot reconstruct reply ctx from %q", sessionKey)
}
rc := replyContext{roomID: parts[1]}
if len(parts) == 3 {
rc.personID = parts[2]
}
return rc, nil
}
// FormattingInstructions implements core.FormattingInstructionProvider.
func (p *Platform) FormattingInstructions() string {
return "Webex supports standard Markdown (bold, italic, lists, code blocks, links). Use it freely."
}
// Compile-time interface conformance checks.
var (
_ core.Platform = (*Platform)(nil)
_ core.ImageSender = (*Platform)(nil)
_ core.FileSender = (*Platform)(nil)
_ core.ReplyContextReconstructor = (*Platform)(nil)
_ core.FormattingInstructionProvider = (*Platform)(nil)
_ core.AsyncRecoverablePlatform = (*Platform)(nil)
)

View File

@@ -0,0 +1,306 @@
package webex
import (
"context"
"strings"
"testing"
"unicode/utf8"
"github.com/chenhg5/cc-connect/core"
)
func TestNewRequiresToken(t *testing.T) {
if _, err := New(map[string]any{}); err == nil {
t.Fatal("expected error when token is missing")
}
}
func TestNewParsesAllowFrom(t *testing.T) {
p, err := New(map[string]any{
"token": "abc",
"allow_from": "A@x.com, b@x.com",
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
wp := p.(*Platform)
if len(wp.allowFrom) != 2 {
t.Fatalf("expected 2 allowed emails, got %d", len(wp.allowFrom))
}
}
// stubClient implements webexClient for tests.
type stubClient struct {
me *person
dev *device
msg *message
file *downloadedFile
posted []postedMsg
postedFiles []string
deletedURL string
}
type postedMsg struct {
roomID, parentID, markdown string
}
func (s *stubClient) GetMe(context.Context) (*person, error) { return s.me, nil }
func (s *stubClient) CreateDevice(context.Context) (*device, error) { return s.dev, nil }
func (s *stubClient) DeleteDevice(_ context.Context, url string) error {
s.deletedURL = url
return nil
}
func (s *stubClient) GetMessage(context.Context, string) (*message, error) { return s.msg, nil }
func (s *stubClient) DownloadFile(context.Context, string) (*downloadedFile, error) {
return s.file, nil
}
func (s *stubClient) PostMessage(_ context.Context, roomID, parentID, markdown string) error {
s.posted = append(s.posted, postedMsg{roomID, parentID, markdown})
return nil
}
func (s *stubClient) PostFile(_ context.Context, roomID string, f *downloadedFile) error {
s.postedFiles = append(s.postedFiles, roomID)
return nil
}
func TestStripMention(t *testing.T) {
in := `<spark-mention data-object-type="person" data-object-id="123">bot</spark-mention> hello there`
got := stripMention(in)
if got != "hello there" {
t.Fatalf("got %q, want %q", got, "hello there")
}
}
func TestStripMentionNoTag(t *testing.T) {
if got := stripMention("plain text"); got != "plain text" {
t.Fatalf("got %q", got)
}
}
func TestShouldProcessGroupRequiresMention(t *testing.T) {
p := &Platform{selfID: "bot-id"}
// group message that does NOT mention the bot
m := &message{RoomType: "group", PersonEmail: "u@x.com", MentionedPeople: []string{"someone-else"}}
if p.shouldProcess(m) {
t.Fatal("group message without bot mention should be skipped")
}
// group message that DOES mention the bot
m.MentionedPeople = []string{"bot-id"}
if !p.shouldProcess(m) {
t.Fatal("group message mentioning bot should be processed")
}
}
func TestShouldProcessDirectAlwaysOK(t *testing.T) {
p := &Platform{selfID: "bot-id"}
m := &message{RoomType: "direct", PersonEmail: "u@x.com"}
if !p.shouldProcess(m) {
t.Fatal("direct message should be processed")
}
}
func TestShouldProcessDeniedEmail(t *testing.T) {
p := &Platform{selfID: "bot-id", allowFrom: []string{"allowed@x.com"}}
m := &message{RoomType: "direct", PersonEmail: "stranger@x.com"}
if p.shouldProcess(m) {
t.Fatal("message from non-allowlisted email should be skipped")
}
}
func TestBuildMessageText(t *testing.T) {
p := &Platform{selfID: "bot-id", client: &stubClient{}}
m := &message{
ID: "msg1", RoomID: "room1", RoomType: "direct",
Text: "hello", PersonID: "p1", PersonEmail: "u@x.com",
}
cm := p.buildMessage(context.Background(), m)
if cm.Content != "hello" {
t.Fatalf("content = %q", cm.Content)
}
if cm.Platform != "webex" {
t.Fatalf("platform = %q", cm.Platform)
}
if cm.SessionKey != "webex:room1:p1" {
t.Fatalf("sessionKey = %q", cm.SessionKey)
}
rc, ok := cm.ReplyCtx.(replyContext)
if !ok || rc.roomID != "room1" || rc.messageID != "msg1" {
t.Fatalf("replyCtx = %+v", cm.ReplyCtx)
}
}
func TestBuildMessageGroupStripsMention(t *testing.T) {
p := &Platform{selfID: "bot-id", client: &stubClient{}}
m := &message{
ID: "m", RoomID: "r", RoomType: "group",
Text: `<spark-mention data-object-id="bot-id">bot</spark-mention> do the thing`,
PersonID: "p1", PersonEmail: "u@x.com",
MentionedPeople: []string{"bot-id"},
}
cm := p.buildMessage(context.Background(), m)
if cm.Content != "do the thing" {
t.Fatalf("content = %q", cm.Content)
}
}
func TestBuildMessageImageAttachment(t *testing.T) {
stub := &stubClient{file: &downloadedFile{Data: []byte{1, 2, 3}, MimeType: "image/png", FileName: "a.png"}}
p := &Platform{selfID: "bot-id", client: stub}
m := &message{
ID: "m", RoomID: "r", RoomType: "direct",
Text: "look", PersonID: "p1", PersonEmail: "u@x.com",
Files: []string{"https://webex/contents/1"},
}
cm := p.buildMessage(context.Background(), m)
if len(cm.Images) != 1 || cm.Images[0].MimeType != "image/png" {
t.Fatalf("images = %+v", cm.Images)
}
if len(cm.Files) != 0 {
t.Fatalf("expected no non-image files, got %d", len(cm.Files))
}
}
func TestBuildMessageNonImageFile(t *testing.T) {
stub := &stubClient{file: &downloadedFile{Data: []byte{1}, MimeType: "application/pdf", FileName: "r.pdf"}}
p := &Platform{selfID: "bot-id", client: stub}
m := &message{
ID: "m", RoomID: "r", RoomType: "direct",
PersonID: "p1", PersonEmail: "u@x.com",
Files: []string{"https://webex/contents/1"},
}
cm := p.buildMessage(context.Background(), m)
if len(cm.Files) != 1 || cm.Files[0].FileName != "r.pdf" {
t.Fatalf("files = %+v", cm.Files)
}
if len(cm.Images) != 0 {
t.Fatalf("expected no images, got %d", len(cm.Images))
}
}
func TestChunkUnderLimit(t *testing.T) {
chunks := chunkMarkdown("short", 100)
if len(chunks) != 1 || chunks[0] != "short" {
t.Fatalf("chunks = %v", chunks)
}
}
func TestChunkSplitsOnParagraph(t *testing.T) {
text := "aaaa\n\nbbbb\n\ncccc"
chunks := chunkMarkdown(text, 6) // forces splits
if len(chunks) < 2 {
t.Fatalf("expected multiple chunks, got %d: %v", len(chunks), chunks)
}
joined := strings.ReplaceAll(strings.Join(chunks, ""), "\n", "")
if !strings.Contains(joined, "aaaa") || !strings.Contains(joined, "cccc") {
t.Fatalf("content lost in chunking: %v", chunks)
}
}
func TestIsAllowedWildcard(t *testing.T) {
p := &Platform{allowFrom: parseAllowFrom("*")}
if !p.isAllowed("anyone@x.com") {
t.Fatal("wildcard should allow any email")
}
}
func TestIsAllowedEmptyAllowsAll(t *testing.T) {
p := &Platform{}
if !p.isAllowed("anyone@x.com") {
t.Fatal("empty allowlist should allow all")
}
}
func TestChunkMultibyteNoSplit(t *testing.T) {
// 10 CJK chars (3 bytes each = 30 bytes), limit 8 bytes, no newlines
text := strings.Repeat("世", 10)
chunks := chunkMarkdown(text, 8)
for _, c := range chunks {
if !utf8.ValidString(c) {
t.Fatalf("chunk is not valid UTF-8: %q", c)
}
}
if strings.Join(chunks, "") != text {
t.Fatalf("content lost: %q", strings.Join(chunks, ""))
}
}
func TestReplyPostsWithParent(t *testing.T) {
stub := &stubClient{}
p := &Platform{client: stub}
rc := replyContext{roomID: "r1", messageID: "m1"}
if err := p.Reply(context.Background(), rc, "hi"); err != nil {
t.Fatalf("Reply err: %v", err)
}
if len(stub.posted) != 1 || stub.posted[0].roomID != "r1" || stub.posted[0].parentID != "m1" {
t.Fatalf("posted = %+v", stub.posted)
}
}
func TestSendPostsWithoutParent(t *testing.T) {
stub := &stubClient{}
p := &Platform{client: stub}
rc := replyContext{roomID: "r1", messageID: "m1"}
if err := p.Send(context.Background(), rc, "yo"); err != nil {
t.Fatalf("Send err: %v", err)
}
if len(stub.posted) != 1 || stub.posted[0].parentID != "" {
t.Fatalf("posted = %+v", stub.posted)
}
}
func TestActivityIDToMessageID(t *testing.T) {
got := activityIDToMessageID("766bc5a0-6b31-11f1-9a28-4325af6c06a3")
want := "Y2lzY29zcGFyazovL3VzL01FU1NBR0UvNzY2YmM1YTAtNmIzMS0xMWYxLTlhMjgtNDMyNWFmNmMwNmEz"
if got != want {
t.Fatalf("got %q, want %q", got, want)
}
if activityIDToMessageID("") != "" {
t.Fatal("empty activity id should yield empty message id")
}
}
func TestHandleFrameIgnoresNonActivity(t *testing.T) {
// A conversation.highlight frame must not trigger a GetMessage call.
stub := &stubClient{}
p := &Platform{client: stub, selfEmail: "bot@webex.bot"}
p.handler = func(_ core.Platform, _ *core.Message) { t.Fatal("should not dispatch") }
p.handleFrame(context.Background(), []byte(`{"data":{"eventType":"conversation.highlight"}}`))
// no panic, no dispatch = pass
}
// activityFrame builds a minimal conversation.activity frame JSON for the given verb.
func activityFrame(verb string) []byte {
return []byte(`{"data":{"eventType":"conversation.activity","activity":{` +
`"id":"766bc5a0-6b31-11f1-9a28-4325af6c06a3","verb":"` + verb + `",` +
`"actor":{"emailAddress":"user@x.com"}}}}`)
}
func TestHandleFramePostDispatches(t *testing.T) {
stub := &stubClient{msg: &message{ID: "m", RoomID: "r", RoomType: "direct", Text: "hi", PersonID: "p", PersonEmail: "user@x.com"}}
p := &Platform{client: stub, selfEmail: "bot@webex.bot"}
dispatched := false
p.handler = func(_ core.Platform, _ *core.Message) { dispatched = true }
p.handleFrame(context.Background(), activityFrame("post"))
if !dispatched {
t.Fatal("post verb should dispatch")
}
}
func TestHandleFrameShareDispatches(t *testing.T) {
stub := &stubClient{msg: &message{ID: "m", RoomID: "r", RoomType: "direct", Text: "", PersonID: "p", PersonEmail: "user@x.com"}}
p := &Platform{client: stub, selfEmail: "bot@webex.bot"}
dispatched := false
p.handler = func(_ core.Platform, _ *core.Message) { dispatched = true }
p.handleFrame(context.Background(), activityFrame("share"))
if !dispatched {
t.Fatal("share verb (file upload) should dispatch")
}
}
func TestHandleFrameUpdateIgnored(t *testing.T) {
// "update" is a re-notification (e.g. malware scan complete) and must not dispatch.
stub := &stubClient{msg: &message{ID: "m", RoomID: "r", RoomType: "direct", PersonEmail: "user@x.com"}}
p := &Platform{client: stub, selfEmail: "bot@webex.bot"}
p.handler = func(_ core.Platform, _ *core.Message) { t.Fatal("update verb should not dispatch") }
p.handleFrame(context.Background(), activityFrame("update"))
}