mirror of
https://github.com/larksuite/cli.git
synced 2026-07-03 14:02:43 +08:00
Framework support for resource-scoped event subscriptions, so one EventKey can fan out into independent per-resource subscription scopes: - KeyDefinition gains SubscriptionKey / NormalizeParams / Match hooks - ComputeSubscriptionID derives a dedup identity from (EventKey, sub-key params); plumbed through bus Hub, consume loop, and the Hello / PreShutdownCheck / ConsumerInfo protocol messages - add a synchronous Match filter stage before Process - change PreConsume cleanup to func() error and surface cleanup (unsubscribe) failures as WARN with an idempotency note - adapt minutes/vc/whiteboard PreConsume to the new cleanup signature - render SubscriptionID / SubscriptionKey in event status & schema output No domain wires these hooks yet; covered by unit tests using bus/protocol doubles. (Mail, the original exerciser, is intentionally not included.) Change-Id: Ifc743f1aa0bc4dff0c8a1e35da24883694fe7699
336 lines
9.3 KiB
Go
336 lines
9.3 KiB
Go
// Copyright (c) 2026 Lark Technologies Pte. Ltd.
|
|
// SPDX-License-Identifier: MIT
|
|
|
|
package event
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/spf13/cobra"
|
|
|
|
"github.com/larksuite/cli/internal/cmdutil"
|
|
"github.com/larksuite/cli/internal/event/busctl"
|
|
"github.com/larksuite/cli/internal/event/busdiscover"
|
|
"github.com/larksuite/cli/internal/event/protocol"
|
|
"github.com/larksuite/cli/internal/event/transport"
|
|
"github.com/larksuite/cli/internal/output"
|
|
)
|
|
|
|
func NewCmdStatus(f *cmdutil.Factory) *cobra.Command {
|
|
var (
|
|
asJSON bool
|
|
current bool
|
|
failOnOrphan bool
|
|
)
|
|
cmd := &cobra.Command{
|
|
Use: "status",
|
|
Short: "Show event bus daemon status for all discovered apps",
|
|
Long: "Connect to each bus daemon under the config-dir/events/ tree and show PID, uptime, and active consumers. Use --current for only the current profile's app. Use --json for machine-readable output. Use --fail-on-orphan to exit 2 when any orphan bus is detected (for health checks).",
|
|
RunE: func(cmd *cobra.Command, args []string) error {
|
|
return runStatus(f, current, asJSON, failOnOrphan)
|
|
},
|
|
}
|
|
cmd.Flags().BoolVar(&asJSON, "json", false, "Emit status as JSON (for AI / scripts)")
|
|
cmd.Flags().BoolVar(¤t, "current", false, "Only show status for the current profile's app")
|
|
cmd.Flags().BoolVar(&failOnOrphan, "fail-on-orphan", false, "Exit 2 when any orphan bus is detected (default: always exit 0)")
|
|
cmdutil.SetRisk(cmd, "read")
|
|
return cmd
|
|
}
|
|
|
|
// busState is the derived health of one app's event bus daemon.
type busState int

const (
	// stateNotRunning: the status probe failed and no bus process was seen.
	stateNotRunning busState = iota
	// stateRunning: the daemon answered the status probe.
	stateRunning
	// stateOrphan: the status probe failed but a bus process was seen.
	stateOrphan
)

// String returns the display / JSON name of the state. Values other than
// stateRunning and stateOrphan (including unknown ones) report "not_running".
func (s busState) String() string {
	if s == stateRunning {
		return "running"
	}
	if s == stateOrphan {
		return "orphan"
	}
	return "not_running"
}
|
|
|
|
// appStatus bundles one AppID's derived status; State picks which fields are meaningful.
//
// As filled in by deriveStatuses: for stateRunning, PID, UptimeSec, Active and
// Consumers come from the daemon's status reply; for stateOrphan only PID and
// UptimeSec are set (from the process scan); for stateNotRunning everything
// but AppID is left at its zero value.
type appStatus struct {
	AppID     string
	State     busState
	PID       int
	UptimeSec int // uptime in whole seconds
	Active    int // daemon-reported active connection count (StatusResponse.ActiveConns)
	Consumers []protocol.ConsumerInfo
}
|
|
|
|
// busQuerier asks the bus daemon for one AppID for a live status snapshot.
// It is the probing seam deriveStatuses depends on; a non-nil error is
// interpreted there as "the daemon did not answer".
type busQuerier interface {
	QueryBusStatus(appID string) (*protocol.StatusResponse, error)
}
|
|
|
|
// singleAppScanner wraps a Scanner and filters to one AppID for --current queries.
|
|
type singleAppScanner struct {
|
|
appID string
|
|
inner busdiscover.Scanner
|
|
}
|
|
|
|
func (s singleAppScanner) ScanBusProcesses() ([]busdiscover.Process, error) {
|
|
if s.inner == nil {
|
|
return nil, nil
|
|
}
|
|
all, err := s.inner.ScanBusProcesses()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
out := all[:0]
|
|
for _, p := range all {
|
|
if p.AppID == s.appID {
|
|
out = append(out, p)
|
|
}
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
type transportQuerier struct {
|
|
tr transport.IPC
|
|
}
|
|
|
|
func (q *transportQuerier) QueryBusStatus(appID string) (*protocol.StatusResponse, error) {
|
|
return busctl.QueryStatus(q.tr, appID)
|
|
}
|
|
|
|
func runStatus(f *cmdutil.Factory, current, asJSON, failOnOrphan bool) error {
|
|
cfg, err := f.Config()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
seeds := map[string]struct{}{}
|
|
if current {
|
|
seeds[cfg.AppID] = struct{}{}
|
|
} else {
|
|
for _, id := range discoverAppIDs() {
|
|
seeds[id] = struct{}{}
|
|
}
|
|
// Always include the current profile so a first-time user sees it as not_running.
|
|
seeds[cfg.AppID] = struct{}{}
|
|
}
|
|
seedList := make([]string, 0, len(seeds))
|
|
for id := range seeds {
|
|
seedList = append(seedList, id)
|
|
}
|
|
|
|
tr := transport.New()
|
|
// --current: scope the scanner to this AppID so unrelated orphans don't surface.
|
|
var scanner busdiscover.Scanner
|
|
if current {
|
|
scanner = singleAppScanner{appID: cfg.AppID, inner: busdiscover.Default()}
|
|
} else {
|
|
scanner = busdiscover.Default()
|
|
}
|
|
statuses := deriveStatuses(
|
|
seedList,
|
|
scanner,
|
|
&transportQuerier{tr: tr},
|
|
time.Now(),
|
|
)
|
|
|
|
if asJSON {
|
|
if err := writeStatusJSON(f.IOStreams.Out, statuses); err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
writeStatusText(f.IOStreams.Out, statuses)
|
|
}
|
|
return exitForOrphan(statuses, failOnOrphan)
|
|
}
|
|
|
|
// deriveStatuses classifies each AppID as running/orphan/not_running from socket + process-scan inputs; scanner errors are non-fatal.
|
|
func deriveStatuses(seedAppIDs []string, sc busdiscover.Scanner, q busQuerier, now time.Time) []appStatus {
|
|
procByAppID := map[string]busdiscover.Process{}
|
|
if sc != nil {
|
|
if procs, err := sc.ScanBusProcesses(); err == nil {
|
|
for _, p := range procs {
|
|
procByAppID[p.AppID] = p
|
|
}
|
|
}
|
|
}
|
|
|
|
ids := map[string]struct{}{}
|
|
for _, id := range seedAppIDs {
|
|
ids[id] = struct{}{}
|
|
}
|
|
for id := range procByAppID {
|
|
ids[id] = struct{}{}
|
|
}
|
|
sorted := make([]string, 0, len(ids))
|
|
for id := range ids {
|
|
sorted = append(sorted, id)
|
|
}
|
|
sort.Strings(sorted)
|
|
|
|
// Query in parallel so one wedged peer can't compound the per-op deadline across many apps.
|
|
type probe struct {
|
|
resp *protocol.StatusResponse
|
|
err error
|
|
}
|
|
probes := make([]probe, len(sorted))
|
|
var wg sync.WaitGroup
|
|
for i, appID := range sorted {
|
|
wg.Add(1)
|
|
go func(i int, appID string) {
|
|
defer wg.Done()
|
|
probes[i].resp, probes[i].err = q.QueryBusStatus(appID)
|
|
}(i, appID)
|
|
}
|
|
wg.Wait()
|
|
|
|
result := make([]appStatus, 0, len(sorted))
|
|
for i, appID := range sorted {
|
|
s := appStatus{AppID: appID, State: stateNotRunning}
|
|
if probes[i].err == nil {
|
|
resp := probes[i].resp
|
|
s.State = stateRunning
|
|
s.PID = resp.PID
|
|
s.UptimeSec = resp.UptimeSec
|
|
s.Active = resp.ActiveConns
|
|
s.Consumers = resp.Consumers
|
|
} else if p, ok := procByAppID[appID]; ok {
|
|
s.State = stateOrphan
|
|
s.PID = p.PID
|
|
s.UptimeSec = int(now.Sub(p.StartTime).Seconds())
|
|
}
|
|
result = append(result, s)
|
|
}
|
|
return result
|
|
}
|
|
|
|
// humanizeDuration formats d as a coarse "N unit ago" string.
|
|
func humanizeDuration(d time.Duration) string {
|
|
s := int(d.Seconds())
|
|
if s < 60 {
|
|
return fmt.Sprintf("%ds ago", s)
|
|
}
|
|
m := s / 60
|
|
if m < 60 {
|
|
return fmt.Sprintf("%dm ago", m)
|
|
}
|
|
h := m / 60
|
|
if h < 24 {
|
|
return fmt.Sprintf("%dh ago", h)
|
|
}
|
|
return fmt.Sprintf("%dd ago", h/24)
|
|
}
|
|
|
|
func writeStatusText(out io.Writer, statuses []appStatus) {
|
|
for i, s := range statuses {
|
|
if i > 0 {
|
|
fmt.Fprintln(out)
|
|
}
|
|
fmt.Fprintf(out, "── %s ──\n", s.AppID)
|
|
switch s.State {
|
|
case stateNotRunning:
|
|
fmt.Fprintln(out, " Bus: not running")
|
|
case stateRunning:
|
|
fmt.Fprintf(out, " Bus: running (PID %d, uptime %s)\n",
|
|
s.PID, (time.Duration(s.UptimeSec) * time.Second).String())
|
|
fmt.Fprintf(out, " Active consumers: %d\n", s.Active)
|
|
if len(s.Consumers) > 0 {
|
|
headers := []string{"CONSUMER", "EVENT KEY", "SUB", "RECEIVED", "DROPPED"}
|
|
rows := make([][]string, 0, len(s.Consumers))
|
|
for _, c := range s.Consumers {
|
|
subDisplay := "-"
|
|
if c.SubscriptionID != "" && c.SubscriptionID != c.EventKey {
|
|
subDisplay = strings.TrimPrefix(c.SubscriptionID, c.EventKey+":")
|
|
}
|
|
rows = append(rows, []string{
|
|
fmt.Sprintf("pid=%d", c.PID),
|
|
c.EventKey,
|
|
subDisplay,
|
|
fmt.Sprintf("%d", c.Received),
|
|
fmt.Sprintf("%d", c.Dropped),
|
|
})
|
|
}
|
|
widths := tableWidths(headers, rows)
|
|
const colGap = " "
|
|
fmt.Fprintln(out)
|
|
fmt.Fprint(out, " ")
|
|
printTableRow(out, widths, headers, colGap)
|
|
for _, row := range rows {
|
|
fmt.Fprint(out, " ")
|
|
printTableRow(out, widths, row, colGap)
|
|
}
|
|
}
|
|
case stateOrphan:
|
|
if s.PID == 0 {
|
|
fmt.Fprintln(out, " Bus: orphan (PID unknown — bus.pid file unreadable)")
|
|
fmt.Fprintln(out, " Issue: live bus detected but pid file is missing or corrupt")
|
|
fmt.Fprintln(out, " Action: inspect ~/.lark-cli/events/<app>/bus.pid and kill manually")
|
|
break
|
|
}
|
|
fmt.Fprintf(out, " Bus: orphan (PID %d, started %s)\n",
|
|
s.PID, humanizeDuration(time.Duration(s.UptimeSec)*time.Second))
|
|
fmt.Fprintln(out, " Issue: socket file missing — consumers cannot connect")
|
|
fmt.Fprintf(out, " Action: kill %d\n", s.PID)
|
|
}
|
|
}
|
|
}
|
|
|
|
func writeStatusJSON(w io.Writer, statuses []appStatus) error {
|
|
type jsonStatus struct {
|
|
AppID string `json:"app_id"`
|
|
Status string `json:"status"`
|
|
Running bool `json:"running"` // backward compat
|
|
PID int `json:"pid,omitempty"`
|
|
UptimeSec int `json:"uptime_sec,omitempty"`
|
|
Active int `json:"active_consumers,omitempty"`
|
|
Consumers []protocol.ConsumerInfo `json:"consumers,omitempty"`
|
|
Issue string `json:"issue,omitempty"`
|
|
SuggestedAction string `json:"suggested_action,omitempty"`
|
|
}
|
|
payload := make([]jsonStatus, 0, len(statuses))
|
|
for _, s := range statuses {
|
|
js := jsonStatus{
|
|
AppID: s.AppID,
|
|
Status: s.State.String(),
|
|
Running: s.State == stateRunning,
|
|
PID: s.PID,
|
|
UptimeSec: s.UptimeSec,
|
|
Active: s.Active,
|
|
Consumers: s.Consumers,
|
|
}
|
|
if s.State == stateOrphan {
|
|
if s.PID == 0 {
|
|
js.Issue = "live bus detected but pid file is missing or corrupt"
|
|
js.SuggestedAction = "inspect events dir and kill manually"
|
|
} else {
|
|
js.Issue = "socket file missing"
|
|
js.SuggestedAction = fmt.Sprintf("kill %d", s.PID)
|
|
}
|
|
}
|
|
payload = append(payload, js)
|
|
}
|
|
output.PrintJson(w, map[string]interface{}{"apps": payload})
|
|
return nil
|
|
}
|
|
|
|
// exitForOrphan returns ExitValidation iff failOnOrphan and any status is orphan; default exit 0 preserves observe-only semantics.
|
|
func exitForOrphan(statuses []appStatus, failOnOrphan bool) error {
|
|
if !failOnOrphan {
|
|
return nil
|
|
}
|
|
for _, s := range statuses {
|
|
if s.State == stateOrphan {
|
|
return output.ErrBare(output.ExitValidation)
|
|
}
|
|
}
|
|
return nil
|
|
}
|