feat(timer): add one-shot delayed task system (/timer) (#1012)

* feat(timer): add one-shot delayed task system (/timer)

Introduces a one-shot timer feature parallel to the existing cron
(recurring) system. Users can schedule delayed tasks via chat command
(/timer add 2h check PR status), CLI (cc-connect timer add --delay 2h),
or agent system prompt.

Core changes:
- core/timer.go: TimerJob, TimerStore, TimerScheduler, ParseDelayOrTime
- core/timer_test.go: 13 unit tests covering store, scheduler, parsing
- cmd/cc-connect/timer.go: CLI subcommands (add/list/del/info)
- core/engine.go: ExecuteTimerJob, cmdTimer, renderTimerCard, shell exec
- core/api.go: /timer/add, /timer/list, /timer/info, /timer/del endpoints
- core/i18n.go: 22 MsgTimer* keys with 5-language translations
- core/hooks.go: HookEventTimerTriggered event
- core/interfaces.go: agent system prompt section for timers
- core/management.go: SetTimerScheduler wiring
- cmd/cc-connect/main.go: timer store/scheduler lifecycle

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* feat(timer): add /timer to help card and improve usage docs

- Add /timer to /help card tools section (all 5 languages)
- Add /timer to text-based /help fallback (all 5 languages)
- Improve MsgTimerAddUsage with both relative and absolute time examples

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* fix(timer): use local timezone for absolute time parsing

When a user specifies an absolute time without timezone (e.g. "9:00"),
it should be interpreted as local time, not UTC. Use time.ParseInLocation
with time.Local for layouts that don't include a timezone component.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* docs(timer): clarify local timezone for absolute time

Absolute times without timezone (e.g. "2026-05-16T09:00") use the
system's local timezone, not UTC. This is now documented in:
- Agent system prompt (core/interfaces.go)
- /timer usage message (MsgTimerUsage, all 5 languages)
- /timer add usage message (MsgTimerAddUsage, all 5 languages)
- CLI --help text (cmd/cc-connect/timer.go)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* fix(timer): use time.Until instead of Sub(time.Now()) for lint

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* fix(timer): sync cron fixes for session_key validation and slash prompt expansion

Reference PR #973: reject empty session_key in validateTimerJob so
management API doesn't persist unrunnable timer jobs.

Reference PR #928: resolve /skill slash prompts through skill registry
in ExecuteTimerJob before constructing the agent message.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* fix: lint - errcheck resp.Body.Close, staticcheck WriteString(fmt.Sprintf)

* fix: two more WriteString(fmt.Sprintf) → fmt.Fprintf

---------

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Han
2026-06-11 00:10:12 +08:00
committed by GitHub
parent da6b92f9f7
commit e4c9e8e148
10 changed files with 2563 additions and 0 deletions

View File

@@ -146,6 +146,9 @@ func main() {
case "cron":
runCron(os.Args[2:])
return
case "timer", "at":
runTimer(os.Args[2:])
return
case "relay":
runRelay(os.Args[2:])
return
@@ -897,6 +900,26 @@ func main() {
}
}
// Start timer scheduler
timerStore, err := core.NewTimerStore(cfg.DataDir)
if err != nil {
slog.Warn("timer store unavailable", "error", err)
}
var timerSched *core.TimerScheduler
if timerStore != nil {
timerSched = core.NewTimerScheduler(timerStore)
if cfg.Cron.Silent != nil && *cfg.Cron.Silent {
timerSched.SetDefaultSilent(true)
}
if cfg.Cron.SessionMode != "" {
timerSched.SetDefaultSessionMode(cfg.Cron.SessionMode)
}
for i, e := range engines {
timerSched.RegisterEngine(cfg.Projects[i].Name, e)
e.SetTimerScheduler(timerSched)
}
}
// Start heartbeat scheduler
heartbeatSched := core.NewHeartbeatScheduler(cfg.DataDir)
for i, proj := range cfg.Projects {
@@ -926,6 +949,12 @@ func main() {
}
}
if timerSched != nil {
if err := timerSched.Start(); err != nil {
slog.Error("timer scheduler start failed", "error", err)
}
}
heartbeatSched.Start()
// Start bridge server if enabled
@@ -990,6 +1019,9 @@ func main() {
if cronSched != nil {
mgmtSrv.SetCronScheduler(cronSched)
}
if timerSched != nil {
mgmtSrv.SetTimerScheduler(timerSched)
}
mgmtSrv.SetHeartbeatScheduler(heartbeatSched)
if bridgeSrv != nil {
mgmtSrv.SetBridgeServer(bridgeSrv)
@@ -1173,6 +1205,9 @@ func main() {
if cronSched != nil {
apiSrv.SetCronScheduler(cronSched)
}
if timerSched != nil {
apiSrv.SetTimerScheduler(timerSched)
}
apiSrv.Start()
}
@@ -1208,6 +1243,9 @@ func main() {
webhookSrv.Stop()
}
heartbeatSched.Stop()
if timerSched != nil {
timerSched.Stop()
}
if cronSched != nil {
cronSched.Stop()
}

426
cmd/cc-connect/timer.go Normal file
View File

@@ -0,0 +1,426 @@
package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"os"
"strconv"
"strings"
)
func runTimer(args []string) {
if len(args) == 0 {
printTimerUsage()
return
}
switch args[0] {
case "add":
runTimerAdd(args[1:])
case "list":
runTimerList(args[1:])
case "info":
runTimerInfo(args[1:])
case "del", "delete", "rm", "remove":
runTimerDel(args[1:])
case "--help", "-h", "help":
printTimerUsage()
default:
fmt.Fprintf(os.Stderr, "Unknown timer subcommand: %s\n", args[0])
printTimerUsage()
os.Exit(1)
}
}
func runTimerAdd(args []string) {
var project, sessionKey, delay, atTime, prompt, execCmd, desc, dataDir, sessionMode string
var timeoutMins *int
var mute bool
var positional []string
for i := 0; i < len(args); i++ {
switch args[i] {
case "--project", "-p":
if i+1 < len(args) {
i++
project = args[i]
}
case "--session-key", "--session", "-s":
if i+1 < len(args) {
i++
sessionKey = args[i]
}
case "--delay", "-d":
if i+1 < len(args) {
i++
delay = args[i]
}
case "--at", "-a":
if i+1 < len(args) {
i++
atTime = args[i]
}
case "--prompt":
if i+1 < len(args) {
i++
prompt = args[i]
}
case "--exec":
if i+1 < len(args) {
i++
execCmd = args[i]
}
case "--desc", "--description":
if i+1 < len(args) {
i++
desc = args[i]
}
case "--data-dir":
if i+1 < len(args) {
i++
dataDir = args[i]
}
case "--session-mode":
if i+1 < len(args) {
i++
sessionMode = args[i]
}
case "--timeout-mins":
if i+1 < len(args) {
i++
v, err := strconv.Atoi(args[i])
if err != nil {
fmt.Fprintf(os.Stderr, "Error: invalid --timeout-mins: %v\n", err)
os.Exit(1)
}
timeoutMins = &v
}
case "--mute":
mute = true
case "--help", "-h":
printTimerAddUsage()
return
default:
positional = append(positional, args[i])
}
}
// Fallback to env vars
if project == "" {
project = os.Getenv("CC_PROJECT")
}
if sessionKey == "" {
sessionKey = os.Getenv("CC_SESSION_KEY")
}
// Positional: <delay_or_time> <prompt...>
if delay == "" && atTime == "" && len(positional) >= 2 {
delay = positional[0]
if prompt == "" && execCmd == "" {
prompt = strings.Join(positional[1:], " ")
}
} else if prompt == "" && execCmd == "" && len(positional) > 0 {
prompt = strings.Join(positional, " ")
}
fireTime := delay
if atTime != "" {
fireTime = atTime
}
if fireTime == "" || (prompt == "" && execCmd == "") {
fmt.Fprintln(os.Stderr, "Error: delay/at time and either --prompt or --exec are required")
printTimerAddUsage()
os.Exit(1)
}
if prompt != "" && execCmd != "" {
fmt.Fprintln(os.Stderr, "Error: --prompt and --exec are mutually exclusive")
os.Exit(1)
}
sockPath := resolveSocketPath(dataDir)
if _, err := os.Stat(sockPath); os.IsNotExist(err) {
fmt.Fprintf(os.Stderr, "Error: cc-connect is not running (socket not found: %s)\n", sockPath)
os.Exit(1)
}
body := map[string]any{
"project": project,
"session_key": sessionKey,
"delay": fireTime,
"prompt": prompt,
"exec": execCmd,
"description": desc,
"mute": mute,
}
if sessionMode != "" {
body["session_mode"] = sessionMode
}
if timeoutMins != nil {
body["timeout_mins"] = *timeoutMins
}
payload, _ := json.Marshal(body)
resp, err := apiPost(sockPath, "/timer/add", payload)
if err != nil {
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
os.Exit(1)
}
defer func() { _ = resp.Body.Close() }()
respBody, _ := io.ReadAll(resp.Body)
if resp.StatusCode != http.StatusOK {
fmt.Fprintf(os.Stderr, "Error: %s\n", strings.TrimSpace(string(respBody)))
os.Exit(1)
}
var result map[string]any
if err := json.Unmarshal(respBody, &result); err != nil {
fmt.Fprintf(os.Stderr, "Error: invalid response: %v\n", err)
os.Exit(1)
}
fmt.Printf("Timer created: %s\n", result["id"])
fmt.Printf("Fires at: %s\n", result["scheduled_at"])
if execCmd != "" {
fmt.Printf("Command: %s\n", result["exec"])
} else {
fmt.Printf("Prompt: %s\n", result["prompt"])
}
}
func runTimerList(args []string) {
var project, dataDir string
for i := 0; i < len(args); i++ {
switch args[i] {
case "--project", "-p":
if i+1 < len(args) {
i++
project = args[i]
}
case "--data-dir":
if i+1 < len(args) {
i++
dataDir = args[i]
}
}
}
if project == "" {
project = os.Getenv("CC_PROJECT")
}
sockPath := resolveSocketPath(dataDir)
if _, err := os.Stat(sockPath); os.IsNotExist(err) {
fmt.Fprintf(os.Stderr, "Error: cc-connect is not running (socket not found: %s)\n", sockPath)
os.Exit(1)
}
url := "/timer/list"
if project != "" {
url += "?project=" + project
}
client := &http.Client{
Transport: &http.Transport{
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial("unix", sockPath)
},
},
}
resp, err := client.Get("http://unix" + url)
if err != nil {
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
os.Exit(1)
}
defer func() { _ = resp.Body.Close() }()
body, _ := io.ReadAll(resp.Body)
if resp.StatusCode != http.StatusOK {
fmt.Fprintf(os.Stderr, "Error: %s\n", strings.TrimSpace(string(body)))
os.Exit(1)
}
var jobs []map[string]any
if err := json.Unmarshal(body, &jobs); err != nil {
fmt.Fprintf(os.Stderr, "Error: invalid response: %v\n", err)
os.Exit(1)
}
if len(jobs) == 0 {
fmt.Println("No pending timers.")
return
}
fmt.Printf("Pending timers (%d):\n\n", len(jobs))
for _, j := range jobs {
id, _ := j["id"].(string)
scheduledAt, _ := j["scheduled_at"].(string)
prompt, _ := j["prompt"].(string)
execCmd, _ := j["exec"].(string)
desc, _ := j["description"].(string)
display := desc
if display == "" {
if execCmd != "" {
display = execCmd
} else {
display = prompt
}
if len(display) > 60 {
display = display[:60] + "..."
}
}
muteStr := ""
if m, ok := j["mute"].(bool); ok && m {
muteStr = " [mute]"
}
fmt.Printf(" ⏰ %s %s %s%s\n", id, scheduledAt, display, muteStr)
}
}
func runTimerDel(args []string) {
var dataDir string
var id string
for i := 0; i < len(args); i++ {
switch args[i] {
case "--data-dir":
if i+1 < len(args) {
i++
dataDir = args[i]
}
default:
id = args[i]
}
}
if id == "" {
fmt.Fprintln(os.Stderr, "Error: timer ID is required")
os.Exit(1)
}
sockPath := resolveSocketPath(dataDir)
if _, err := os.Stat(sockPath); os.IsNotExist(err) {
fmt.Fprintf(os.Stderr, "Error: cc-connect is not running (socket not found: %s)\n", sockPath)
os.Exit(1)
}
payload, _ := json.Marshal(map[string]string{"id": id})
resp, err := apiPost(sockPath, "/timer/del", payload)
if err != nil {
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
os.Exit(1)
}
defer func() { _ = resp.Body.Close() }()
body, _ := io.ReadAll(resp.Body)
if resp.StatusCode != http.StatusOK {
fmt.Fprintf(os.Stderr, "Error: %s\n", strings.TrimSpace(string(body)))
os.Exit(1)
}
fmt.Printf("Timer %s cancelled.\n", id)
}
func runTimerInfo(args []string) {
var dataDir, id string
for i := 0; i < len(args); i++ {
switch args[i] {
case "--data-dir":
if i+1 < len(args) {
i++
dataDir = args[i]
}
default:
id = args[i]
}
}
if id == "" {
fmt.Fprintln(os.Stderr, "Error: timer ID is required")
fmt.Fprintln(os.Stderr, "Usage: cc-connect timer info <id>")
os.Exit(1)
}
sockPath := resolveSocketPath(dataDir)
if _, err := os.Stat(sockPath); os.IsNotExist(err) {
fmt.Fprintf(os.Stderr, "Error: cc-connect is not running (socket not found: %s)\n", sockPath)
os.Exit(1)
}
client := &http.Client{
Transport: &http.Transport{
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial("unix", sockPath)
},
},
}
resp, err := client.Get("http://unix/timer/info?id=" + id)
if err != nil {
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
os.Exit(1)
}
defer func() { _ = resp.Body.Close() }()
body, _ := io.ReadAll(resp.Body)
if resp.StatusCode == http.StatusNotFound {
fmt.Fprintf(os.Stderr, "Error: timer '%s' not found\n", id)
os.Exit(1)
}
if resp.StatusCode != http.StatusOK {
fmt.Fprintf(os.Stderr, "Error: %s\n", strings.TrimSpace(string(body)))
os.Exit(1)
}
var prettyJSON bytes.Buffer
if err := json.Indent(&prettyJSON, body, "", " "); err != nil {
fmt.Fprintf(os.Stderr, "Error: invalid JSON response: %v\n", err)
os.Exit(1)
}
fmt.Println(prettyJSON.String())
}
func printTimerUsage() {
fmt.Println(`Usage: cc-connect timer <command> [options]
Commands:
add Create a one-shot timer
list List pending timers
info <id> Show detailed info of a timer
del <id> Cancel a timer
Run 'cc-connect timer <command> --help' for details.`)
}
func printTimerAddUsage() {
fmt.Println(`Usage: cc-connect timer add [options] <delay> <prompt>
Create a one-shot timer (fires once after the specified delay).
Options:
-p, --project <name> Target project (auto-detected from CC_PROJECT env)
-s, --session-key <key> Target session (auto-detected from CC_SESSION_KEY env)
-d, --delay <duration> Delay from now (e.g. 30m, 2h, 1h30m)
-a, --at <time> Absolute ISO time (e.g. 2026-05-16T09:00, local timezone)
--prompt <text> Task prompt (runs through agent)
--exec <command> Shell command (runs directly, mutually exclusive with --prompt)
--desc <text> Short description
--session-mode <mode> reuse (default) or new-per-run
--timeout-mins <n> Max minutes to wait per run (0 = no limit; default 30)
--mute Suppress all messages (start + result)
--data-dir <path> Data directory (default: ~/.cc-connect)
-h, --help Show this help
Examples:
cc-connect timer add --delay 2h --prompt "Check PR status"
cc-connect timer add --delay 30m --exec "df -h" --desc "Disk check"
cc-connect timer add --at "2026-05-16T09:00" --prompt "Morning standup reminder"
cc-connect timer add 2h Check PR status`)
}

View File

@@ -24,6 +24,7 @@ type APIServer struct {
mux *http.ServeMux
engines map[string]*Engine // project name → engine
cron *CronScheduler
timer *TimerScheduler
relay *RelayManager
mu sync.RWMutex
}
@@ -73,6 +74,10 @@ func NewAPIServer(dataDir string) (*APIServer, error) {
s.mux.HandleFunc("/cron/info", s.handleCronInfo)
s.mux.HandleFunc("/cron/edit", s.handleCronEdit)
s.mux.HandleFunc("/cron/del", s.handleCronDel)
s.mux.HandleFunc("/timer/add", s.handleTimerAdd)
s.mux.HandleFunc("/timer/list", s.handleTimerList)
s.mux.HandleFunc("/timer/info", s.handleTimerInfo)
s.mux.HandleFunc("/timer/del", s.handleTimerDel)
s.mux.HandleFunc("/cron/exec", s.handleCronExec)
s.mux.HandleFunc("/cron/run", s.handleCronExec)
s.mux.HandleFunc("/relay/send", s.handleRelaySend)
@@ -107,6 +112,10 @@ func (s *APIServer) SetCronScheduler(cs *CronScheduler) {
s.cron = cs
}
func (s *APIServer) SetTimerScheduler(ts *TimerScheduler) {
s.timer = ts
}
func (s *APIServer) Start() {
s.server = &http.Server{Handler: s.mux}
go func() {
@@ -479,6 +488,195 @@ func (s *APIServer) handleCronEdit(w http.ResponseWriter, r *http.Request) {
apiJSON(w, http.StatusOK, job)
}
// ── Timer API ─────────────────────────────────────────────────
// TimerAddRequest is the JSON body for POST /timer/add.
type TimerAddRequest struct {
Project string `json:"project"`
SessionKey string `json:"session_key"`
Delay string `json:"delay"` // relative ("2h") or absolute ISO time
Prompt string `json:"prompt"`
Exec string `json:"exec"`
WorkDir string `json:"work_dir"`
Description string `json:"description"`
Silent *bool `json:"silent,omitempty"`
Mute bool `json:"mute,omitempty"`
SessionMode string `json:"session_mode,omitempty"`
Mode string `json:"mode,omitempty"`
TimeoutMins *int `json:"timeout_mins,omitempty"`
}
func (s *APIServer) handleTimerAdd(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "POST only", http.StatusMethodNotAllowed)
return
}
if s.timer == nil {
http.Error(w, "timer scheduler not available", http.StatusServiceUnavailable)
return
}
var req TimerAddRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "invalid JSON: "+err.Error(), http.StatusBadRequest)
return
}
if req.Delay == "" {
http.Error(w, "delay is required (e.g. 2h, 30m, or ISO time)", http.StatusBadRequest)
return
}
if req.Prompt == "" && req.Exec == "" {
http.Error(w, "either prompt or exec is required", http.StatusBadRequest)
return
}
if req.Prompt != "" && req.Exec != "" {
http.Error(w, "prompt and exec are mutually exclusive", http.StatusBadRequest)
return
}
fireAt, err := ParseDelayOrTime(req.Delay)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
project := req.Project
if project == "" {
s.mu.RLock()
if len(s.engines) == 1 {
for name := range s.engines {
project = name
}
}
s.mu.RUnlock()
}
if project == "" {
http.Error(w, "project is required (multiple projects configured)", http.StatusBadRequest)
return
}
sessionKey := req.SessionKey
if sessionKey == "" {
s.mu.RLock()
engine := s.engines[project]
s.mu.RUnlock()
if engine != nil {
keys := engine.ActiveSessionKeys()
if len(keys) == 1 {
sessionKey = keys[0]
}
}
}
if sessionKey == "" {
http.Error(w, "session_key is required", http.StatusBadRequest)
return
}
job := &TimerJob{
ID: GenerateTimerID(),
Project: project,
SessionKey: sessionKey,
ScheduledAt: fireAt,
Prompt: req.Prompt,
Exec: req.Exec,
WorkDir: req.WorkDir,
Description: req.Description,
Silent: req.Silent,
Mute: req.Mute,
SessionMode: req.SessionMode,
Mode: req.Mode,
TimeoutMins: req.TimeoutMins,
CreatedAt: time.Now(),
}
if err := s.timer.AddJob(job); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
apiJSON(w, http.StatusOK, job)
}
func (s *APIServer) handleTimerList(w http.ResponseWriter, r *http.Request) {
if s.timer == nil {
http.Error(w, "timer scheduler not available", http.StatusServiceUnavailable)
return
}
project := r.URL.Query().Get("project")
var jobs []*TimerJob
if project != "" {
jobs = s.timer.Store().ListByProject(project)
} else {
jobs = s.timer.Store().List()
}
// Filter to pending only
var pending []*TimerJob
for _, j := range jobs {
if !j.Fired {
pending = append(pending, j)
}
}
apiJSON(w, http.StatusOK, pending)
}
func (s *APIServer) handleTimerInfo(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "GET only", http.StatusMethodNotAllowed)
return
}
if s.timer == nil {
http.Error(w, "timer scheduler not available", http.StatusServiceUnavailable)
return
}
id := r.URL.Query().Get("id")
if id == "" {
http.Error(w, "id is required", http.StatusBadRequest)
return
}
job := s.timer.Store().Get(id)
if job == nil {
http.Error(w, "timer not found", http.StatusNotFound)
return
}
apiJSON(w, http.StatusOK, job)
}
func (s *APIServer) handleTimerDel(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "POST only", http.StatusMethodNotAllowed)
return
}
if s.timer == nil {
http.Error(w, "timer scheduler not available", http.StatusServiceUnavailable)
return
}
var req struct {
ID string `json:"id"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "invalid JSON: "+err.Error(), http.StatusBadRequest)
return
}
if req.ID == "" {
http.Error(w, "id is required", http.StatusBadRequest)
return
}
if !s.timer.RemoveJob(req.ID) {
http.Error(w, "timer not found", http.StatusNotFound)
return
}
apiJSON(w, http.StatusOK, map[string]string{"status": "ok", "id": req.ID})
}
// ── Relay API ──────────────────────────────────────────────────
func (s *APIServer) handleRelaySend(w http.ResponseWriter, r *http.Request) {

View File

@@ -196,6 +196,7 @@ type Engine struct {
hooks *HookManager
cronScheduler *CronScheduler
timerScheduler *TimerScheduler
heartbeatScheduler *HeartbeatScheduler
commands *CommandRegistry
@@ -839,6 +840,10 @@ func (e *Engine) SetCronScheduler(cs *CronScheduler) {
e.cronScheduler = cs
}
func (e *Engine) SetTimerScheduler(ts *TimerScheduler) {
e.timerScheduler = ts
}
func (e *Engine) SetHeartbeatScheduler(hs *HeartbeatScheduler) {
e.heartbeatScheduler = hs
}
@@ -1368,6 +1373,365 @@ func (e *Engine) ExecuteCronJob(job *CronJob) error {
return nil
}
// ExecuteTimerJob fires a one-shot timer job: resolves the platform, sends a
// notification (unless muted), and either runs a shell command or injects a
// synthetic message into the agent session.
func (e *Engine) ExecuteTimerJob(job *TimerJob) error {
e.hooks.Emit(HookEvent{
Event: HookEventTimerTriggered,
SessionKey: job.SessionKey,
Content: job.Prompt,
Extra: map[string]any{"job_id": job.ID, "job_description": job.Description},
})
sessionKey := job.SessionKey
platformName := ""
if idx := strings.Index(sessionKey, ":"); idx > 0 {
platformName = sessionKey[:idx]
}
var targetPlatform Platform
for _, p := range e.platforms {
if p.Name() == platformName {
targetPlatform = p
break
}
}
// Multi-workspace fallback: strip workspace prefix from session key.
if targetPlatform == nil {
for _, p := range e.platforms {
needle := ":" + p.Name() + ":"
if idx := strings.Index(sessionKey, needle); idx >= 0 {
targetPlatform = p
platformName = p.Name()
sessionKey = sessionKey[idx+1:]
break
}
}
}
if targetPlatform == nil {
return fmt.Errorf("platform %q not found for session %q", platformName, sessionKey)
}
rc, ok := targetPlatform.(ReplyContextReconstructor)
if !ok {
return fmt.Errorf("platform %q does not support proactive messaging (timer)", platformName)
}
runSessionKey := sessionKey
var replyCtx any
var err error
if !job.Mute {
if resolver, ok := targetPlatform.(CronReplyTargetResolver); ok {
resolvedSessionKey, resolvedReplyCtx, err := resolver.ResolveCronReplyTarget(sessionKey, timerRunTitle(job))
if err != nil {
if !errors.Is(err, ErrNotSupported) {
return fmt.Errorf("resolve timer reply target: %w", err)
}
} else {
if resolvedSessionKey != "" {
runSessionKey = resolvedSessionKey
}
if resolvedReplyCtx != nil {
replyCtx = resolvedReplyCtx
}
}
}
}
if replyCtx == nil {
replyCtx, err = rc.ReconstructReplyCtx(runSessionKey)
if err != nil {
return fmt.Errorf("reconstruct reply context: %w", err)
}
}
effectivePlatform := targetPlatform
if job.Mute {
effectivePlatform = &mutePlatform{targetPlatform}
}
// Notify user unless muted or silent
if !job.Mute {
silent := false
if e.timerScheduler != nil {
silent = e.timerScheduler.IsSilent(job)
}
if !silent {
desc := job.Description
if desc == "" {
if job.IsShellJob() {
desc = truncateStr(job.Exec, 40)
} else {
desc = truncateStr(job.Prompt, 40)
}
}
e.send(targetPlatform, replyCtx, fmt.Sprintf("⏰ %s", desc))
}
}
if job.IsShellJob() {
return e.executeTimerShell(effectivePlatform, replyCtx, job)
}
content := job.Prompt
if strings.HasPrefix(content, "/") {
parts := strings.Fields(content)
if len(parts) > 0 {
cmd := strings.ToLower(strings.TrimPrefix(parts[0], "/"))
if skill := e.skills.Resolve(cmd); skill != nil {
content = BuildSkillInvocationPrompt(skill, parts[1:])
}
}
}
msg := &Message{
SessionKey: sessionKey,
Platform: platformName,
UserID: "timer",
UserName: "timer",
Content: content,
ReplyCtx: replyCtx,
ModeOverride: job.Mode,
}
agent := e.agent
sessions := e.sessions
workspaceDir := ""
if e.multiWorkspace {
channelID := extractChannelID(sessionKey)
if channelID != "" {
workspace, _, err := e.resolveWorkspace(targetPlatform, channelID)
if err == nil && workspace != "" {
wsAgent, wsSessions, _, effectiveDir, err := e.workspaceContext(workspace, sessionKey)
if err == nil {
agent = wsAgent
sessions = wsSessions
workspaceDir = effectiveDir
}
}
}
}
if job.WorkDir != "" {
wsAgent, wsSessions, err := e.getOrCreateWorkspaceAgent(job.WorkDir)
if err == nil {
agent = wsAgent
sessions = wsSessions
workspaceDir = job.WorkDir
} else {
slog.Warn("timer: workspace agent creation failed, using global",
"work_dir", job.WorkDir, "session_key", sessionKey, "error", err)
}
}
useNewSession := false
if e.timerScheduler != nil {
useNewSession = e.timerScheduler.UsesNewSession(job)
} else {
useNewSession = job.UsesNewSessionPerRun()
}
if useNewSession {
msg.SessionKey = runSessionKey
session := sessions.NewSideSession(runSessionKey, "timer-"+job.ID)
if !session.TryLock() {
return fmt.Errorf("session %q is busy", runSessionKey)
}
iKey := fmt.Sprintf("%s#timer:%s", runSessionKey, session.ID)
if workspaceDir != "" {
iKey = workspaceDir + ":" + iKey
}
e.processInteractiveMessageWith(effectivePlatform, msg, session, agent, sessions, iKey, workspaceDir, runSessionKey)
e.cleanupInteractiveState(iKey)
return nil
}
session := sessions.GetOrCreateActive(sessionKey)
if !session.TryLock() {
return fmt.Errorf("session %q is busy", sessionKey)
}
iKey := sessionKey
if workspaceDir != "" {
iKey = workspaceDir + ":" + sessionKey
}
e.processInteractiveMessageWith(effectivePlatform, msg, session, agent, sessions, iKey, workspaceDir, sessionKey)
return nil
}
func timerRunTitle(job *TimerJob) string {
if job == nil {
return "timer"
}
if job.Description != "" {
return job.Description
}
if job.IsShellJob() {
return truncateStr(job.Exec, 40)
}
return truncateStr(job.Prompt, 40)
}
// executeTimerShell runs a shell command for a timer job and sends the output.
func (e *Engine) executeTimerShell(p Platform, replyCtx any, job *TimerJob) error {
workDir := job.WorkDir
if workDir == "" {
if wd, ok := e.agent.(interface{ GetWorkDir() string }); ok {
workDir = wd.GetWorkDir()
}
}
if workDir == "" {
workDir, _ = os.Getwd()
}
timeout := job.ExecutionTimeout()
if timeout <= 0 {
timeout = 60 * time.Second
}
cmdLabel := truncateStr(job.Exec, 60)
ctx, cancel := context.WithTimeout(e.ctx, timeout)
defer cancel()
var shellCmd *exec.Cmd
if runtime.GOOS == "windows" {
shellCmd = exec.CommandContext(ctx, "powershell.exe", "-NoProfile", "-ExecutionPolicy", "Bypass", "-Command", job.Exec)
} else {
shellCmd = exec.CommandContext(ctx, "sh", "-c", job.Exec)
}
shellCmd.Dir = workDir
stdout, err := shellCmd.StdoutPipe()
if err != nil {
return fmt.Errorf("shell: stdout pipe: %w", err)
}
stderr, err := shellCmd.StderrPipe()
if err != nil {
return fmt.Errorf("shell: stderr pipe: %w", err)
}
if err := shellCmd.Start(); err != nil {
e.send(p, replyCtx, fmt.Sprintf("⏰ ❌ `%s`\nerror: failed to start: %v", cmdLabel, err))
return fmt.Errorf("shell: start: %w", err)
}
var mu sync.Mutex
var buf bytes.Buffer
doneCh := make(chan struct{})
readPipe := func(r io.Reader) {
scanner := bufio.NewScanner(r)
scanner.Buffer(make([]byte, 0, 64*1024), 64*1024)
for scanner.Scan() {
mu.Lock()
if buf.Len() > 0 {
buf.WriteByte('\n')
}
buf.WriteString(scanner.Text())
mu.Unlock()
}
}
var pipeWg sync.WaitGroup
pipeWg.Add(2)
go func() { defer pipeWg.Done(); readPipe(stdout) }()
go func() { defer pipeWg.Done(); readPipe(stderr) }()
go func() {
pipeWg.Wait()
_ = shellCmd.Wait()
close(doneCh)
}()
select {
case <-doneCh:
return e.finishCronShell(p, replyCtx, shellCmd, &mu, &buf, cmdLabel)
case <-ctx.Done():
killAndWait(shellCmd, doneCh)
mu.Lock()
output := buf.String()
mu.Unlock()
msg := fmt.Sprintf("⏰ ⚠️ timeout: `%s`", cmdLabel)
if output != "" {
msg = fmt.Sprintf("⏰ ⚠️ timeout: `%s`\n\n%s", cmdLabel, truncateStr(output, 3000))
}
e.send(p, replyCtx, msg)
return fmt.Errorf("shell command timed out")
case <-time.After(quickFinishTimeout):
}
// Long-running command — try in-place updates
var previewHandle any
var useUpdate bool
if _, ok := p.(MessageUpdater); ok {
if starter, ok := p.(PreviewStarter); ok {
mu.Lock()
output := buf.String()
mu.Unlock()
progressMsg := fmt.Sprintf("⏰ ⏳ `%s`", cmdLabel)
if output != "" {
progressMsg = fmt.Sprintf("⏰ ⏳ `%s`\n\n%s", cmdLabel, truncateStr(output, 3000))
}
handle, err := starter.SendPreviewStart(e.ctx, replyCtx, progressMsg)
if err == nil && handle != nil {
previewHandle = handle
useUpdate = true
}
}
}
if !useUpdate {
e.send(p, replyCtx, fmt.Sprintf("⏰ ⏳ `%s`", cmdLabel))
}
updateDone := make(chan struct{})
if useUpdate {
go func() {
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
mu.Lock()
output := buf.String()
mu.Unlock()
msg := fmt.Sprintf("⏰ ⏳ `%s`", cmdLabel)
if output != "" {
msg = fmt.Sprintf("⏰ ⏳ `%s`\n\n%s", cmdLabel, truncateStr(output, 3000))
}
_ = updaterFor(p).UpdateMessage(e.ctx, previewHandle, msg)
case <-updateDone:
return
case <-ctx.Done():
return
}
}
}()
}
select {
case <-doneCh:
close(updateDone)
return e.finishCronShell(p, replyCtx, shellCmd, &mu, &buf, cmdLabel, useUpdate, previewHandle)
case <-ctx.Done():
close(updateDone)
killAndWait(shellCmd, doneCh)
mu.Lock()
output := buf.String()
mu.Unlock()
msg := fmt.Sprintf("⏰ ⚠️ timeout: `%s`", cmdLabel)
if output != "" {
msg = fmt.Sprintf("⏰ ⚠️ timeout: `%s`\n\n%s", cmdLabel, truncateStr(output, 3000))
}
if useUpdate {
_ = updaterFor(p).UpdateMessage(e.ctx, previewHandle, msg)
} else {
e.send(p, replyCtx, msg)
}
return fmt.Errorf("shell command timed out")
}
}
func cronRunTitle(job *CronJob) string {
if job == nil {
return "cron"
@@ -5188,6 +5552,7 @@ var builtinCommands = []struct {
{[]string{"provider"}, "provider"},
{[]string{"memory"}, "memory"},
{[]string{"cron"}, "cron"},
{[]string{"timer", "at", "remind"}, "timer"},
{[]string{"heartbeat", "hb"}, "heartbeat"},
{[]string{"compress", "compact"}, "compress"},
{[]string{"stop"}, "stop"},
@@ -5403,6 +5768,8 @@ func (e *Engine) handleCommand(p Platform, msg *Message, raw string) bool {
e.cmdMemory(p, msg, args)
case "cron":
e.cmdCron(p, msg, args)
case "timer":
e.cmdTimer(p, msg, args)
case "heartbeat":
e.cmdHeartbeat(p, msg, args)
case "compress":
@@ -8163,6 +8530,7 @@ func helpCardGroups() []helpCardGroup {
{command: "/shell", action: "cmd:/shell"},
{command: "/show", action: "cmd:/show"},
{command: "/cron", action: "nav:/cron"},
{command: "/timer", action: "nav:/timer"},
{command: "/heartbeat", action: "nav:/heartbeat"},
{command: "/commands", action: "nav:/commands"},
{command: "/alias", action: "nav:/alias"},
@@ -10373,6 +10741,8 @@ func (e *Engine) handleCardNav(action string, sessionKey string) *Card {
return e.renderProviderAddCard(sessionKey)
case "/cron":
return e.renderCronCard(sessionKey, extractUserID(sessionKey))
case "/timer":
return e.renderTimerCard(sessionKey, extractUserID(sessionKey))
case "/heartbeat":
return e.renderHeartbeatCard()
case "/commands":
@@ -10707,6 +11077,24 @@ func (e *Engine) executeCardAction(cmd, args, sessionKey string) {
case "unmute":
e.cronScheduler.Store().SetMute(id, false)
}
case "/timer":
if e.timerScheduler == nil || args == "" {
return
}
subArgs := strings.Fields(args)
if len(subArgs) < 2 {
return
}
sub, id := subArgs[0], subArgs[1]
switch sub {
case "delete":
e.timerScheduler.RemoveJob(id)
case "mute":
e.timerScheduler.SetMute(id, true)
case "unmute":
e.timerScheduler.SetMute(id, false)
}
}
}
@@ -11875,6 +12263,66 @@ func (e *Engine) renderCronCard(sessionKey string, userID string) *Card {
return cb.Build()
}
func (e *Engine) renderTimerCard(sessionKey string, userID string) *Card {
if e.timerScheduler == nil {
return e.simpleCard(e.i18n.T(MsgCardTitleTimer), "blue", e.i18n.T(MsgTimerNotAvailable))
}
jobs := e.timerScheduler.Store().ListPending()
// Filter to current session
var filtered []*TimerJob
for _, j := range jobs {
if j.SessionKey == sessionKey {
filtered = append(filtered, j)
}
}
if len(filtered) == 0 {
return e.simpleCard(e.i18n.T(MsgCardTitleTimer), "blue", e.i18n.T(MsgTimerEmpty))
}
cb := NewCard().Title(e.i18n.T(MsgCardTitleTimer), "blue")
cb.Markdown(fmt.Sprintf(e.i18n.T(MsgTimerListTitle), len(filtered)))
for _, j := range filtered {
desc := j.Description
if desc == "" {
if j.IsShellJob() {
desc = "🖥 " + truncateStr(j.Exec, 60)
} else {
desc = truncateStr(j.Prompt, 60)
}
}
if j.Mute {
desc += " [mute]"
}
remaining := FormatTimerRemaining(j.ScheduledAt)
var sb strings.Builder
fmt.Fprintf(&sb, "⏰ %s\n", desc)
sb.WriteString(e.i18n.Tf(MsgTimerIDLabel, j.ID))
sb.WriteString(e.i18n.Tf(MsgTimerScheduledLabel, j.ScheduledAt.Format("2006-01-02 15:04"), remaining))
if j.LastError != "" {
sb.WriteString(e.i18n.Tf(MsgTimerFailedSuffix, truncateStr(j.LastError, 40)))
}
cb.Markdown(sb.String())
var btns []CardButton
if j.Mute {
btns = append(btns, DefaultBtn(e.i18n.T(MsgTimerBtnUnmute), fmt.Sprintf("act:/timer unmute %s", j.ID)))
} else {
btns = append(btns, DefaultBtn(e.i18n.T(MsgTimerBtnMute), fmt.Sprintf("act:/timer mute %s", j.ID)))
}
btns = append(btns, DangerBtn(e.i18n.T(MsgTimerBtnDelete), fmt.Sprintf("act:/timer delete %s", j.ID)))
cb.ButtonsEqual(btns...)
}
cb.Divider()
cb.Note(e.i18n.T(MsgTimerCardHint))
cb.Buttons(e.cardBackButton())
return cb.Build()
}
func (e *Engine) renderCommandsCard() *Card {
cmds := e.commands.ListAll()
if len(cmds) == 0 {
@@ -12413,6 +12861,189 @@ func (e *Engine) cmdCronSetup(p Platform, msg *Message) {
}
}
// ──────────────────────────────────────────────────────────────
// /timer command
// ──────────────────────────────────────────────────────────────
func (e *Engine) cmdTimer(p Platform, msg *Message, args []string) {
if e.timerScheduler == nil {
e.reply(p, msg.ReplyCtx, e.i18n.T(MsgTimerNotAvailable))
return
}
if len(args) == 0 {
if !supportsCards(p) {
e.cmdTimerList(p, msg)
return
}
e.replyWithCard(p, msg.ReplyCtx, e.renderTimerCard(msg.SessionKey, msg.UserID))
return
}
sub := matchSubCommand(strings.ToLower(args[0]), []string{
"add", "addexec", "list", "del", "delete", "rm", "remove", "mute", "unmute",
})
switch sub {
case "add":
e.cmdTimerAdd(p, msg, args[1:])
case "addexec":
e.cmdTimerAddExec(p, msg, args[1:])
case "list":
e.cmdTimerList(p, msg)
case "del", "delete", "rm", "remove":
e.cmdTimerDel(p, msg, args[1:])
case "mute":
e.cmdTimerMute(p, msg, args[1:], true)
case "unmute":
e.cmdTimerMute(p, msg, args[1:], false)
default:
e.reply(p, msg.ReplyCtx, e.i18n.T(MsgTimerUsage))
}
}
func (e *Engine) cmdTimerAdd(p Platform, msg *Message, args []string) {
// /timer add <delay_or_time> <prompt...>
if len(args) < 2 {
e.reply(p, msg.ReplyCtx, e.i18n.T(MsgTimerAddUsage))
return
}
fireAt, err := ParseDelayOrTime(args[0])
if err != nil {
e.reply(p, msg.ReplyCtx, e.i18n.Tf(MsgError, err))
return
}
prompt := strings.Join(args[1:], " ")
job := &TimerJob{
ID: GenerateTimerID(),
Project: e.name,
SessionKey: msg.SessionKey,
ScheduledAt: fireAt,
Prompt: prompt,
CreatedAt: time.Now(),
}
if err := e.timerScheduler.AddJob(job); err != nil {
e.reply(p, msg.ReplyCtx, e.i18n.Tf(MsgError, err))
return
}
remaining := FormatTimerRemaining(fireAt)
e.reply(p, msg.ReplyCtx, fmt.Sprintf(e.i18n.T(MsgTimerAdded), job.ID, remaining, truncateStr(prompt, 60)))
}
func (e *Engine) cmdTimerAddExec(p Platform, msg *Message, args []string) {
if !e.isAdmin(msg.UserID) {
e.reply(p, msg.ReplyCtx, fmt.Sprintf(e.i18n.T(MsgAdminRequired), "/timer addexec"))
return
}
// /timer addexec <delay_or_time> <shell command...>
if len(args) < 2 {
e.reply(p, msg.ReplyCtx, e.i18n.T(MsgTimerAddExecUsage))
return
}
fireAt, err := ParseDelayOrTime(args[0])
if err != nil {
e.reply(p, msg.ReplyCtx, e.i18n.Tf(MsgError, err))
return
}
shellCmd := strings.Join(args[1:], " ")
job := &TimerJob{
ID: GenerateTimerID(),
Project: e.name,
SessionKey: msg.SessionKey,
ScheduledAt: fireAt,
Exec: shellCmd,
CreatedAt: time.Now(),
}
if err := e.timerScheduler.AddJob(job); err != nil {
e.reply(p, msg.ReplyCtx, e.i18n.Tf(MsgError, err))
return
}
remaining := FormatTimerRemaining(fireAt)
e.reply(p, msg.ReplyCtx, fmt.Sprintf(e.i18n.T(MsgTimerAddedExec), job.ID, remaining, truncateStr(shellCmd, 60)))
}
func (e *Engine) cmdTimerList(p Platform, msg *Message) {
jobs := e.timerScheduler.Store().ListPending()
// Filter to current session
var filtered []*TimerJob
for _, j := range jobs {
if j.SessionKey == msg.SessionKey {
filtered = append(filtered, j)
}
}
if len(filtered) == 0 {
e.reply(p, msg.ReplyCtx, e.i18n.T(MsgTimerEmpty))
return
}
var sb strings.Builder
fmt.Fprintf(&sb, e.i18n.T(MsgTimerListTitle), len(filtered))
sb.WriteString("\n")
for i, j := range filtered {
if i > 0 {
sb.WriteString("\n")
}
desc := j.Description
if desc == "" {
if j.IsShellJob() {
desc = "🖥 " + truncateStr(j.Exec, 60)
} else {
desc = truncateStr(j.Prompt, 60)
}
}
if j.Mute {
desc += " [mute]"
}
remaining := FormatTimerRemaining(j.ScheduledAt)
fmt.Fprintf(&sb, "⏰ %s (%s)\n", desc, remaining)
fmt.Fprintf(&sb, "ID: %s\n", j.ID)
}
fmt.Fprintf(&sb, "\n%s", e.i18n.T(MsgTimerListFooter))
e.reply(p, msg.ReplyCtx, sb.String())
}
func (e *Engine) cmdTimerDel(p Platform, msg *Message, args []string) {
if len(args) == 0 {
e.reply(p, msg.ReplyCtx, e.i18n.T(MsgTimerDelUsage))
return
}
id := args[0]
if !e.timerScheduler.RemoveJob(id) {
e.reply(p, msg.ReplyCtx, e.i18n.T(MsgTimerNotFound))
return
}
e.reply(p, msg.ReplyCtx, fmt.Sprintf(e.i18n.T(MsgTimerDeleted), id))
}
func (e *Engine) cmdTimerMute(p Platform, msg *Message, args []string, mute bool) {
if len(args) == 0 {
e.reply(p, msg.ReplyCtx, e.i18n.T(MsgTimerMuteUsage))
return
}
id := args[0]
if !e.timerScheduler.SetMute(id, mute) {
e.reply(p, msg.ReplyCtx, e.i18n.T(MsgTimerNotFound))
return
}
if mute {
e.reply(p, msg.ReplyCtx, fmt.Sprintf(e.i18n.T(MsgTimerMuted), id))
} else {
e.reply(p, msg.ReplyCtx, fmt.Sprintf(e.i18n.T(MsgTimerUnmuted), id))
}
}
// ──────────────────────────────────────────────────────────────
// Heartbeat management commands
// ──────────────────────────────────────────────────────────────

View File

@@ -22,6 +22,7 @@ const (
HookEventSessionStarted HookEventType = "session.started"
HookEventSessionEnded HookEventType = "session.ended"
HookEventCronTriggered HookEventType = "cron.triggered"
HookEventTimerTriggered HookEventType = "timer.triggered"
HookEventPermissionRequested HookEventType = "permission.requested"
HookEventError HookEventType = "error"
)

View File

@@ -364,6 +364,7 @@ const (
MsgCardTitleHistoryLast MsgKey = "card_title_history_last"
MsgCardTitleProvider MsgKey = "card_title_provider"
MsgCardTitleCron MsgKey = "card_title_cron"
MsgCardTitleTimer MsgKey = "card_title_timer"
MsgCardTitleHeartbeat MsgKey = "card_title_heartbeat"
MsgCardTitleCommands MsgKey = "card_title_commands"
MsgCardTitleAlias MsgKey = "card_title_alias"
@@ -376,6 +377,29 @@ const (
MsgListEmptySummary MsgKey = "list_empty_summary"
MsgCronIDLabel MsgKey = "cron_id_label"
MsgCronFailedSuffix MsgKey = "cron_failed_suffix"
MsgTimerNotAvailable MsgKey = "timer_not_available"
MsgTimerUsage MsgKey = "timer_usage"
MsgTimerAddUsage MsgKey = "timer_add_usage"
MsgTimerAdded MsgKey = "timer_added"
MsgTimerAddedExec MsgKey = "timer_added_exec"
MsgTimerAddExecUsage MsgKey = "timer_addexec_usage"
MsgTimerEmpty MsgKey = "timer_empty"
MsgTimerListTitle MsgKey = "timer_list_title"
MsgTimerListFooter MsgKey = "timer_list_footer"
MsgTimerDelUsage MsgKey = "timer_del_usage"
MsgTimerMuteUsage MsgKey = "timer_mute_usage"
MsgTimerDeleted MsgKey = "timer_deleted"
MsgTimerNotFound MsgKey = "timer_not_found"
MsgTimerMuted MsgKey = "timer_muted"
MsgTimerUnmuted MsgKey = "timer_unmuted"
MsgTimerCardHint MsgKey = "timer_card_hint"
MsgTimerBtnMute MsgKey = "timer_btn_mute"
MsgTimerBtnUnmute MsgKey = "timer_btn_unmute"
MsgTimerBtnDelete MsgKey = "timer_btn_delete"
MsgTimerIDLabel MsgKey = "timer_id_label"
MsgTimerScheduledLabel MsgKey = "timer_scheduled_label"
MsgTimerFailedSuffix MsgKey = "timer_failed_suffix"
MsgCommandsTagAgent MsgKey = "commands_tag_agent"
MsgCommandsTagShell MsgKey = "commands_tag_shell"
MsgUpgradeTimeoutSuffix MsgKey = "upgrade_timeout_suffix"
@@ -975,6 +999,7 @@ var messages = map[MsgKey]map[Language]string{
"/dir [path|reset]\n Show, switch, or reset agent working directory\n\n" +
"/stop\n Stop current execution\n\n" +
"/cron [add|list|exec|del|enable|disable]\n Manage scheduled tasks\n\n" +
"/timer [add|list|del|mute|unmute]\n Manage one-shot timers\n\n" +
"/heartbeat [status|pause|resume|run|interval]\n Manage heartbeat\n\n" +
"/commands [add|del]\n Manage custom slash commands\n\n" +
"/alias [add|del]\n Manage command aliases (e.g. 帮助 → /help)\n\n" +
@@ -1018,6 +1043,7 @@ var messages = map[MsgKey]map[Language]string{
"/dir [路径|reset]\n 查看、切换或重置 Agent 工作目录\n\n" +
"/stop\n 停止当前执行\n\n" +
"/cron [add|list|exec|del|enable|disable]\n 管理定时任务\n\n" +
"/timer [add|list|del|mute|unmute]\n 管理一次性定时器\n\n" +
"/heartbeat [status|pause|resume|run|interval]\n 管理心跳\n\n" +
"/commands [add|del]\n 管理自定义命令\n\n" +
"/alias [add|del]\n 管理命令别名(如 帮助 → /help\n\n" +
@@ -1060,6 +1086,7 @@ var messages = map[MsgKey]map[Language]string{
"/dir [路徑|reset]\n 查看、切換或重置 Agent 工作目錄\n\n" +
"/stop\n 停止當前執行\n\n" +
"/cron [add|list|exec|del|enable|disable]\n 管理定時任務\n\n" +
"/timer [add|list|del|mute|unmute]\n 管理一次性定時器\n\n" +
"/heartbeat [status|pause|resume|run|interval]\n 管理心跳\n\n" +
"/commands [add|del]\n 管理自訂命令\n\n" +
"/alias [add|del]\n 管理命令別名(如 幫助 → /help\n\n" +
@@ -1101,6 +1128,7 @@ var messages = map[MsgKey]map[Language]string{
"/dir [パス|reset]\n エージェントの作業ディレクトリを表示/切り替え/リセット\n\n" +
"/stop\n 現在の実行を停止\n\n" +
"/cron [add|list|exec|del|enable|disable]\n スケジュールタスク管理\n\n" +
"/timer [add|list|del|mute|unmute]\n ワンショットタイマー管理\n\n" +
"/heartbeat [status|pause|resume|run|interval]\n ハートビート管理\n\n" +
"/commands [add|del]\n カスタムコマンド管理\n\n" +
"/alias [add|del]\n コマンドエイリアス管理(例: ヘルプ → /help\n\n" +
@@ -1142,6 +1170,7 @@ var messages = map[MsgKey]map[Language]string{
"/dir [ruta|reset]\n Ver, cambiar o restablecer el directorio de trabajo del agente\n\n" +
"/stop\n Detener ejecución actual\n\n" +
"/cron [add|list|exec|del|enable|disable]\n Gestionar tareas programadas\n\n" +
"/timer [add|list|del|mute|unmute]\n Gestionar temporizadores de uso único\n\n" +
"/heartbeat [status|pause|resume|run|interval]\n Gestionar heartbeat\n\n" +
"/commands [add|del]\n Gestionar comandos personalizados\n\n" +
"/alias [add|del]\n Gestionar alias de comandos (ej. ayuda → /help)\n\n" +
@@ -1260,6 +1289,7 @@ var messages = map[MsgKey]map[Language]string{
"/show <ref> — View file / directory / snippet by reference\n" +
"/dir [path|reset] — Show, switch, or reset work directory\n" +
"/cron [add|list|exec|del|...] — Scheduled tasks\n" +
"/timer [add|list|del|...] — One-shot timers\n" +
"/commands [add|del] — Custom commands\n" +
"/alias [add|del] — Command aliases\n" +
"/skills — List agent skills\n" +
@@ -1270,6 +1300,7 @@ var messages = map[MsgKey]map[Language]string{
"/show <引用> — 按引用查看文件、目录或代码片段\n" +
"/dir [路径|reset] — 查看、切换或重置工作目录\n" +
"/cron [add|list|exec|del|...] — 定时任务\n" +
"/timer [add|list|del|...] — 一次性定时器\n" +
"/commands [add|del] — 自定义命令\n" +
"/alias [add|del] — 命令别名\n" +
"/skills — 列出 Agent Skills\n" +
@@ -1280,6 +1311,7 @@ var messages = map[MsgKey]map[Language]string{
"/show <引用> — 按引用查看檔案、目錄或程式碼片段\n" +
"/dir [路徑|reset] — 查看、切換或重置工作目錄\n" +
"/cron [add|list|exec|del|...] — 定時任務\n" +
"/timer [add|list|del|...] — 一次性定時器\n" +
"/commands [add|del] — 自訂命令\n" +
"/alias [add|del] — 命令別名\n" +
"/skills — 列出 Agent Skills\n" +
@@ -1290,6 +1322,7 @@ var messages = map[MsgKey]map[Language]string{
"/show <参照> — ファイル/ディレクトリ/スニペットを参照で表示\n" +
"/dir [パス|reset] — 作業ディレクトリの表示/切り替え/リセット\n" +
"/cron [add|list|exec|del|...] — スケジュールタスク\n" +
"/timer [add|list|del|...] — ワンショットタイマー\n" +
"/commands [add|del] — カスタムコマンド\n" +
"/alias [add|del] — コマンドエイリアス\n" +
"/skills — エージェントスキル一覧\n" +
@@ -1300,6 +1333,7 @@ var messages = map[MsgKey]map[Language]string{
"/show <ref> — Ver archivo/directorio/fragmento por referencia\n" +
"/dir [ruta|reset] — Ver, cambiar o restablecer directorio de trabajo\n" +
"/cron [add|list|exec|del|...] — Tareas programadas\n" +
"/timer [add|list|del|...] — Temporizadores de uso único\n" +
"/commands [add|del] — Comandos personalizados\n" +
"/alias [add|del] — Alias de comandos\n" +
"/skills — Listar skills del agente\n" +
@@ -1952,6 +1986,165 @@ var messages = map[MsgKey]map[Language]string{
LangJapanese: "前回",
LangSpanish: "Últ",
},
// ── Timer (one-shot) ──────────────────────────────────────
MsgCardTitleTimer: {
LangEnglish: "One-Shot Timer",
LangChinese: "一次性定时器",
LangTraditionalChinese: "一次性定時器",
LangJapanese: "ワンショットタイマー",
LangSpanish: "Temporizador único",
},
MsgTimerNotAvailable: {
LangEnglish: "Timer scheduler is not available.",
LangChinese: "定时器调度器未启用。",
LangTraditionalChinese: "定時器調度器未啟用。",
LangJapanese: "タイマースケジューラは利用できません。",
LangSpanish: "El programador de temporizador no está disponible.",
},
MsgTimerUsage: {
LangEnglish: "Usage:\n/timer add <delay|time> <prompt>\n/timer addexec <delay|time> <command>\n/timer list\n/timer del <id>\n/timer mute <id> · /timer unmute <id>\n\nDelay: 30m, 2h, 1h30m. Or absolute time: 2026-05-16T09:00\nTime without timezone uses system local time.",
LangChinese: "用法:\n/timer add <延迟|时间> <任务描述>\n/timer addexec <延迟|时间> <命令>\n/timer list\n/timer del <id>\n/timer mute <id> · /timer unmute <id>\n\n延迟30m、2h、1h30m。或绝对时间2026-05-16T09:00\n不带时区的时间按系统本地时区解析。",
LangTraditionalChinese: "用法:\n/timer add <延遲|時間> <任務描述>\n/timer addexec <延遲|時間> <命令>\n/timer list\n/timer del <id>\n/timer mute <id> · /timer unmute <id>\n\n延遲30m、2h、1h30m。或絕對時間2026-05-16T09:00\n不帶時區的時間按系統本地時區解析。",
LangJapanese: "使い方:\n/timer add <遅延|時刻> <タスク内容>\n/timer addexec <遅延|時刻> <コマンド>\n/timer list\n/timer del <id>\n/timer mute <id> · /timer unmute <id>\n\n遅延: 30m, 2h, 1h30m。または絶対時刻: 2026-05-16T09:00\nタイムゾーンなしの時刻はシステムのローカルタイムゾーンで解釈されます。",
LangSpanish: "Uso:\n/timer add <retraso|hora> <tarea>\n/timer addexec <retraso|hora> <comando>\n/timer list\n/timer del <id>\n/timer mute <id> · /timer unmute <id>\n\nRetraso: 30m, 2h, 1h30m. O hora absoluta: 2026-05-16T09:00\nHora sin zona horaria usa la hora local del sistema.",
},
MsgTimerAddUsage: {
LangEnglish: "Usage: /timer add <delay|time> <prompt>\nExamples:\n /timer add 2h Check PR status\n /timer add 2026-05-16T09:00 Morning standup reminder\nDelay: 30m, 2h, 1h30m. Time: ISO format (2026-05-16T09:00)\nTime without timezone uses system local time.",
LangChinese: "用法:/timer add <延迟|时间> <任务描述>\n示例\n /timer add 2h 检查PR状态\n /timer add 2026-05-16T09:00 早会提醒\n延迟30m、2h、1h30m。时间ISO格式2026-05-16T09:00\n不带时区的时间按系统本地时区解析。",
LangTraditionalChinese: "用法:/timer add <延遲|時間> <任務描述>\n範例\n /timer add 2h 檢查PR狀態\n /timer add 2026-05-16T09:00 早會提醒\n延遲30m、2h、1h30m。時間ISO格式2026-05-16T09:00\n不帶時區的時間按系統本地時區解析。",
LangJapanese: "使い方: /timer add <遅延|時刻> <タスク内容>\n例:\n /timer add 2h PRの状態を確認\n /timer add 2026-05-16T09:00 朝会リマインダー\n遅延: 30m, 2h, 1h30m。時刻: ISO形式2026-05-16T09:00\nタイムゾーンなしの時刻はシステムのローカルタイムゾーンで解釈されます。",
LangSpanish: "Uso: /timer add <retraso|hora> <tarea>\nEjemplos:\n /timer add 2h Verificar estado del PR\n /timer add 2026-05-16T09:00 Recordatorio de reunión\nRetraso: 30m, 2h, 1h30m. Hora: formato ISO (2026-05-16T09:00)\nHora sin zona horaria usa la hora local del sistema.",
},
MsgTimerAdded: {
LangEnglish: "✅ Timer created\nID: `%s`\nFires in: %s\nPrompt: %s",
LangChinese: "✅ 定时器已创建\nID: `%s`\n将在 %s 后触发\n内容: %s",
LangTraditionalChinese: "✅ 定時器已建立\nID: `%s`\n將在 %s 後觸發\n內容: %s",
LangJapanese: "✅ タイマーを作成しました\nID: `%s`\n%s 後に実行\n内容: %s",
LangSpanish: "✅ Temporizador creado\nID: `%s`\nSe ejecuta en: %s\nContenido: %s",
},
MsgTimerAddedExec: {
LangEnglish: "✅ Shell timer created\nID: `%s`\nFires in: %s\nCommand: `%s`",
LangChinese: "✅ Shell 定时器已创建\nID: `%s`\n将在 %s 后触发\n命令: `%s`",
LangTraditionalChinese: "✅ Shell 定時器已建立\nID: `%s`\n將在 %s 後觸發\n命令: `%s`",
LangJapanese: "✅ Shell タイマーを作成しました\nID: `%s`\n%s 後に実行\nコマンド: `%s`",
LangSpanish: "✅ Temporizador shell creado\nID: `%s`\nSe ejecuta en: %s\nComando: `%s`",
},
MsgTimerAddExecUsage: {
LangEnglish: "Usage: /timer addexec <delay> <shell command>\nExample: /timer addexec 30m df -h",
LangChinese: "用法:/timer addexec <延迟> <shell 命令>\n示例/timer addexec 30m df -h",
LangTraditionalChinese: "用法:/timer addexec <延遲> <shell 命令>\n範例/timer addexec 30m df -h",
LangJapanese: "使い方: /timer addexec <遅延> <シェルコマンド>\n例: /timer addexec 30m df -h",
LangSpanish: "Uso: /timer addexec <retraso> <comando shell>\nEjemplo: /timer addexec 30m df -h",
},
MsgTimerEmpty: {
LangEnglish: "No pending timers.",
LangChinese: "暂无待执行的定时器。",
LangTraditionalChinese: "暫無待執行的定時器。",
LangJapanese: "保留中のタイマーはありません。",
LangSpanish: "No hay temporizadores pendientes.",
},
MsgTimerListTitle: {
LangEnglish: "⏰ Pending Timers (%d)",
LangChinese: "⏰ 待执行定时器 (%d)",
LangTraditionalChinese: "⏰ 待執行定時器 (%d)",
LangJapanese: "⏰ 保留中のタイマー (%d)",
LangSpanish: "⏰ Temporizadores pendientes (%d)",
},
MsgTimerListFooter: {
LangEnglish: "`/timer del <id>` remove · `/timer mute/unmute <id>` mute",
LangChinese: "`/timer del <id>` 删除 · `/timer mute/unmute <id>` 静音",
LangTraditionalChinese: "`/timer del <id>` 刪除 · `/timer mute/unmute <id>` 靜音",
LangJapanese: "`/timer del <id>` 削除 · `/timer mute/unmute <id>` ミュート",
LangSpanish: "`/timer del <id>` eliminar · `/timer mute/unmute <id>` silenciar",
},
MsgTimerDelUsage: {
LangEnglish: "Usage: /timer del <id>",
LangChinese: "用法:/timer del <id>",
LangTraditionalChinese: "用法:/timer del <id>",
LangJapanese: "使い方: /timer del <id>",
LangSpanish: "Uso: /timer del <id>",
},
MsgTimerMuteUsage: {
LangEnglish: "Usage: /timer mute <id> · /timer unmute <id>",
LangChinese: "用法:/timer mute <id> · /timer unmute <id>",
LangTraditionalChinese: "用法:/timer mute <id> · /timer unmute <id>",
LangJapanese: "使い方: /timer mute <id> · /timer unmute <id>",
LangSpanish: "Uso: /timer mute <id> · /timer unmute <id>",
},
MsgTimerDeleted: {
LangEnglish: "✅ Timer `%s` cancelled.",
LangChinese: "✅ 定时器 `%s` 已取消。",
LangTraditionalChinese: "✅ 定時器 `%s` 已取消。",
LangJapanese: "✅ タイマー `%s` をキャンセルしました。",
LangSpanish: "✅ Temporizador `%s` cancelado.",
},
MsgTimerNotFound: {
LangEnglish: "❌ Timer `%s` not found.",
LangChinese: "❌ 定时器 `%s` 未找到。",
LangTraditionalChinese: "❌ 定時器 `%s` 未找到。",
LangJapanese: "❌ タイマー `%s` が見つかりません。",
LangSpanish: "❌ Temporizador `%s` no encontrado.",
},
MsgTimerMuted: {
LangEnglish: "🔇 Timer `%s` muted.",
LangChinese: "🔇 定时器 `%s` 已静音。",
LangTraditionalChinese: "🔇 定時器 `%s` 已靜音。",
LangJapanese: "🔇 タイマー `%s` をミュートしました。",
LangSpanish: "🔇 Temporizador `%s` silenciado.",
},
MsgTimerUnmuted: {
LangEnglish: "🔔 Timer `%s` unmuted.",
LangChinese: "🔔 定时器 `%s` 已取消静音。",
LangTraditionalChinese: "🔔 定時器 `%s` 已取消靜音。",
LangJapanese: "🔔 タイマー `%s` のミュートを解除しました。",
LangSpanish: "🔔 Temporizador `%s` reactivado.",
},
MsgTimerCardHint: {
LangEnglish: "💡 `/timer add <delay> <prompt>` · `/timer del <id>` · `/timer mute/unmute <id>`",
LangChinese: "💡 `/timer add <延迟> <内容>` 添加 · `/timer del <id>` 删除 · `/timer mute/unmute <id>` 静音",
LangTraditionalChinese: "💡 `/timer add <延遲> <內容>` 新增 · `/timer del <id>` 刪除 · `/timer mute/unmute <id>` 靜音",
LangJapanese: "💡 `/timer add <遅延> <内容>` 追加 · `/timer del <id>` 削除 · `/timer mute/unmute <id>` ミュート",
LangSpanish: "💡 `/timer add <retraso> <tarea>` · `/timer del <id>` · `/timer mute/unmute <id>`",
},
MsgTimerBtnMute: {
LangEnglish: "Mute",
LangChinese: "静音",
LangTraditionalChinese: "靜音",
LangJapanese: "ミュート",
LangSpanish: "Silenciar",
},
MsgTimerBtnUnmute: {
LangEnglish: "Unmute",
LangChinese: "取消静音",
LangTraditionalChinese: "取消靜音",
LangJapanese: "ミュート解除",
LangSpanish: "Reactivar",
},
MsgTimerBtnDelete: {
LangEnglish: "Cancel Timer",
LangChinese: "取消定时器",
LangTraditionalChinese: "取消定時器",
LangJapanese: "タイマーをキャンセル",
LangSpanish: "Cancelar temporizador",
},
MsgTimerIDLabel: {
LangEnglish: "ID: %s\n", LangChinese: "ID%s\n", LangTraditionalChinese: "ID%s\n",
LangJapanese: "ID: %s\n", LangSpanish: "ID: %s\n",
},
MsgTimerScheduledLabel: {
LangEnglish: "Scheduled: %s (%s remaining)\n",
LangChinese: "计划: %s剩余 %s\n",
LangTraditionalChinese: "計劃: %s剩餘 %s\n",
LangJapanese: "予定: %s残り %s\n",
LangSpanish: "Programado: %s (%s restante)\n",
},
MsgTimerFailedSuffix: {
LangEnglish: " (failed: %s)", LangChinese: "(失败:%s", LangTraditionalChinese: "(失敗:%s",
LangJapanese: "(失敗: %s", LangSpanish: " (falló: %s)",
},
MsgStatusTitle: {
LangEnglish: "cc-connect Status\n\n" +
"Project: %s\n" +

View File

@@ -129,6 +129,33 @@ Examples:
cc-connect cron edit abc123 enabled false
cc-connect cron edit abc123 prompt "Updated daily summary task"
### One-shot timers (timer)
When the user asks you to do something after a delay (e.g. "两小时后帮我检查PR"),
use the Bash tool to run:
cc-connect timer add --delay <duration> --prompt "<task description>"
Duration examples: 30m, 2h, 1h30m. Or use absolute time: --at "2026-05-16T09:00"
Absolute times without timezone (e.g. "2026-05-16T09:00") are interpreted as the
system's local timezone. When the user says "明天早上9点", use local time.
Environment variables CC_PROJECT and CC_SESSION_KEY are already set.
Optional flags:
--exec <command> run a shell command directly instead of --prompt
--desc <text> short description
--session-mode <mode> reuse (default) or new-per-run (fresh session each run)
--timeout-mins <n> max wait per run in minutes (default 30, 0 = unlimited)
--mute suppress all messages (start notification + result)
Examples:
cc-connect timer add --delay 2h --prompt "Check PR status" --desc "PR check"
cc-connect timer add --delay 30m --exec "df -h" --desc "Disk check"
cc-connect timer add --at "2026-05-16T09:00" --prompt "Morning standup reminder"
You can also list or cancel timers:
cc-connect timer list
cc-connect timer del <timer-id>
### Bot-to-bot relay
When you need to communicate with another bot (e.g. ask another AI agent a question), use:

View File

@@ -45,6 +45,7 @@ type ManagementServer struct {
engines map[string]*Engine // project name → engine
cronScheduler *CronScheduler
timerScheduler *TimerScheduler
heartbeatScheduler *HeartbeatScheduler
bridgeServer *BridgeServer
@@ -89,6 +90,7 @@ func (m *ManagementServer) RegisterEngine(name string, e *Engine) {
}
func (m *ManagementServer) SetCronScheduler(cs *CronScheduler) { m.cronScheduler = cs }
func (m *ManagementServer) SetTimerScheduler(ts *TimerScheduler) { m.timerScheduler = ts }
func (m *ManagementServer) SetHeartbeatScheduler(hs *HeartbeatScheduler) { m.heartbeatScheduler = hs }
func (m *ManagementServer) SetBridgeServer(bs *BridgeServer) { m.bridgeServer = bs }
func (m *ManagementServer) SetSetupFeishuSave(fn func(FeishuSetupSaveRequest) error) {

519
core/timer.go Normal file
View File

@@ -0,0 +1,519 @@
package core
import (
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"log/slog"
"os"
"path/filepath"
"strings"
"sync"
"time"
)
// TimerJob represents a persisted one-shot timer.
type TimerJob struct {
ID string `json:"id"`
Project string `json:"project"`
SessionKey string `json:"session_key"`
ScheduledAt time.Time `json:"scheduled_at"` // absolute fire time
Prompt string `json:"prompt"`
Exec string `json:"exec,omitempty"` // shell command; mutually exclusive with Prompt
WorkDir string `json:"work_dir,omitempty"` // working directory for exec; empty = agent work_dir
Description string `json:"description"`
Silent *bool `json:"silent,omitempty"` // suppress start notification; nil = use global default
Mute bool `json:"mute,omitempty"` // suppress ALL messages (start + result)
SessionMode string `json:"session_mode,omitempty"` // "" or "reuse" = share active session; "new_per_run" = fresh session each run
Mode string `json:"mode,omitempty"` // permission mode override; "" = use project default
TimeoutMins *int `json:"timeout_mins,omitempty"` // nil = default 30m; 0 = no limit; >0 = minutes
CreatedAt time.Time `json:"created_at"`
Fired bool `json:"fired"`
FiredAt time.Time `json:"fired_at,omitempty"`
LastError string `json:"last_error,omitempty"`
}
// IsShellJob returns true if the job runs a shell command directly.
func (j *TimerJob) IsShellJob() bool {
return j.Exec != ""
}
const defaultTimerJobTimeout = 30 * time.Minute
// ExecutionTimeout returns how long the scheduler waits for the job goroutine to finish.
func (j *TimerJob) ExecutionTimeout() time.Duration {
if j.TimeoutMins == nil {
return defaultTimerJobTimeout
}
if *j.TimeoutMins <= 0 {
return 0
}
return time.Duration(*j.TimeoutMins) * time.Minute
}
// UsesNewSessionPerRun reports whether the timer should use a new engine session.
func (j *TimerJob) UsesNewSessionPerRun() bool {
return NormalizeCronSessionMode(j.SessionMode) == "new_per_run"
}
func validateTimerJob(j *TimerJob) error {
if strings.TrimSpace(j.SessionKey) == "" {
return fmt.Errorf("session_key is required")
}
if j.ScheduledAt.IsZero() {
return fmt.Errorf("scheduled_at is required")
}
if j.Prompt == "" && j.Exec == "" {
return fmt.Errorf("either prompt or exec is required")
}
if j.Prompt != "" && j.Exec != "" {
return fmt.Errorf("prompt and exec are mutually exclusive")
}
mode := NormalizeCronSessionMode(j.SessionMode)
if mode != "" && mode != "new_per_run" {
return fmt.Errorf("invalid session_mode %q (want reuse, new_per_run, or new-per-run)", j.SessionMode)
}
if j.Mode != "" {
switch j.Mode {
case "default", "bypassPermissions", "acceptEdits", "plan", "auto", "dontAsk":
default:
return fmt.Errorf("invalid mode %q", j.Mode)
}
}
if j.TimeoutMins != nil && *j.TimeoutMins < 0 {
return fmt.Errorf("timeout_mins must be >= 0")
}
return nil
}
// TimerStore persists timer jobs to a JSON file.
type TimerStore struct {
path string
mu sync.Mutex
jobs []*TimerJob
}
func NewTimerStore(dataDir string) (*TimerStore, error) {
dir := filepath.Join(dataDir, "timers")
if err := os.MkdirAll(dir, 0o755); err != nil {
return nil, err
}
path := filepath.Join(dir, "jobs.json")
s := &TimerStore{path: path}
s.load()
return s, nil
}
func (s *TimerStore) load() {
data, err := os.ReadFile(s.path)
if err != nil {
return
}
if err := json.Unmarshal(data, &s.jobs); err != nil {
slog.Error("timer: failed to load jobs", "path", s.path, "error", err)
}
}
func (s *TimerStore) save() error {
data, err := json.MarshalIndent(s.jobs, "", " ")
if err != nil {
return err
}
return AtomicWriteFile(s.path, data, 0o644)
}
func (s *TimerStore) Add(job *TimerJob) error {
s.mu.Lock()
defer s.mu.Unlock()
s.jobs = append(s.jobs, job)
return s.save()
}
func (s *TimerStore) Remove(id string) bool {
s.mu.Lock()
defer s.mu.Unlock()
for i, j := range s.jobs {
if j.ID == id {
s.jobs = append(s.jobs[:i], s.jobs[i+1:]...)
if err := s.save(); err != nil {
slog.Warn("timer: failed to save after remove", "error", err)
}
return true
}
}
return false
}
func (s *TimerStore) SetMute(id string, mute bool) bool {
s.mu.Lock()
defer s.mu.Unlock()
for _, j := range s.jobs {
if j.ID == id {
j.Mute = mute
if err := s.save(); err != nil {
slog.Warn("timer: save after mute toggle", "error", err)
}
return true
}
}
return false
}
func (s *TimerStore) MarkFired(id string, err error) {
s.mu.Lock()
defer s.mu.Unlock()
for _, j := range s.jobs {
if j.ID == id {
j.Fired = true
j.FiredAt = time.Now()
if err != nil {
j.LastError = err.Error()
} else {
j.LastError = ""
}
if saveErr := s.save(); saveErr != nil {
slog.Warn("timer: failed to save after mark fired", "error", saveErr)
}
return
}
}
}
func (s *TimerStore) List() []*TimerJob {
s.mu.Lock()
defer s.mu.Unlock()
out := make([]*TimerJob, len(s.jobs))
copy(out, s.jobs)
return out
}
func (s *TimerStore) ListByProject(project string) []*TimerJob {
s.mu.Lock()
defer s.mu.Unlock()
var out []*TimerJob
for _, j := range s.jobs {
if j.Project == project {
out = append(out, j)
}
}
return out
}
func (s *TimerStore) ListBySessionKey(sessionKey string) []*TimerJob {
s.mu.Lock()
defer s.mu.Unlock()
var out []*TimerJob
for _, j := range s.jobs {
if j.SessionKey == sessionKey {
out = append(out, j)
}
}
return out
}
func (s *TimerStore) Get(id string) *TimerJob {
s.mu.Lock()
defer s.mu.Unlock()
for _, j := range s.jobs {
if j.ID == id {
return j
}
}
return nil
}
// ListPending returns all non-fired timer jobs.
func (s *TimerStore) ListPending() []*TimerJob {
s.mu.Lock()
defer s.mu.Unlock()
var out []*TimerJob
for _, j := range s.jobs {
if !j.Fired {
out = append(out, j)
}
}
return out
}
// TimerScheduler runs one-shot timer jobs using time.AfterFunc.
type TimerScheduler struct {
store *TimerStore
engines map[string]*Engine
mu sync.RWMutex
timers map[string]*time.Timer // job ID → active timer
defaultSilent bool
defaultSessionMode string
}
// missedJobGracePeriod is how long after a missed fire time we still execute.
// Older missed jobs are logged and skipped.
const missedJobGracePeriod = 5 * time.Minute
func NewTimerScheduler(store *TimerStore) *TimerScheduler {
return &TimerScheduler{
store: store,
engines: make(map[string]*Engine),
timers: make(map[string]*time.Timer),
}
}
func (ts *TimerScheduler) RegisterEngine(name string, e *Engine) {
ts.mu.Lock()
defer ts.mu.Unlock()
ts.engines[name] = e
}
// SetDefaultSilent sets the default silent mode for timer jobs.
// Must be called before Start (not safe for concurrent use).
func (ts *TimerScheduler) SetDefaultSilent(silent bool) {
ts.defaultSilent = silent
}
// SetDefaultSessionMode sets the default session mode for timer jobs.
// Must be called before Start (not safe for concurrent use).
func (ts *TimerScheduler) SetDefaultSessionMode(mode string) {
ts.defaultSessionMode = NormalizeCronSessionMode(mode)
}
// IsSilent returns whether the timer job should suppress the start notification.
func (ts *TimerScheduler) IsSilent(job *TimerJob) bool {
if job.Silent != nil {
return *job.Silent
}
return ts.defaultSilent
}
// UsesNewSession returns whether the job should create a fresh session.
func (ts *TimerScheduler) UsesNewSession(job *TimerJob) bool {
if job.SessionMode != "" {
return job.UsesNewSessionPerRun()
}
return ts.defaultSessionMode == "new_per_run"
}
func (ts *TimerScheduler) Start() error {
jobs := ts.store.List()
now := time.Now()
var scheduled, missed, skipped int
for _, job := range jobs {
if job.Fired {
continue
}
delay := job.ScheduledAt.Sub(now)
if delay <= 0 {
// Past due
if -delay <= missedJobGracePeriod {
// Just missed — fire immediately
slog.Info("timer: firing missed job immediately", "id", job.ID, "overdue", -delay)
ts.scheduleAt(job, 0)
missed++
} else {
slog.Warn("timer: skipping stale job", "id", job.ID, "scheduled_at", job.ScheduledAt, "overdue", -delay)
ts.store.MarkFired(job.ID, fmt.Errorf("missed by %v (stale)", -delay))
skipped++
}
} else {
ts.scheduleAt(job, delay)
scheduled++
}
}
slog.Info("timer: scheduler started", "scheduled", scheduled, "missed_fired", missed, "skipped_stale", skipped, "total", len(jobs))
return nil
}
func (ts *TimerScheduler) Stop() {
ts.mu.Lock()
defer ts.mu.Unlock()
for id, t := range ts.timers {
t.Stop()
delete(ts.timers, id)
}
}
func (ts *TimerScheduler) AddJob(job *TimerJob) error {
if err := validateTimerJob(job); err != nil {
return err
}
job.SessionMode = NormalizeCronSessionMode(job.SessionMode)
if err := ts.store.Add(job); err != nil {
return err
}
delay := time.Until(job.ScheduledAt)
if delay <= 0 {
// Already due — fire immediately
slog.Info("timer: new job already due, firing immediately", "id", job.ID)
ts.scheduleAt(job, 0)
} else {
ts.scheduleAt(job, delay)
}
return nil
}
func (ts *TimerScheduler) RemoveJob(id string) bool {
ts.mu.Lock()
if t, ok := ts.timers[id]; ok {
t.Stop()
delete(ts.timers, id)
}
// Remove from store while still holding ts.mu to prevent executeJob
// from running a job that was just cancelled (the AfterFunc callback
// also needs ts.mu, so it will see the store entry is gone).
removed := ts.store.Remove(id)
ts.mu.Unlock()
return removed
}
func (ts *TimerScheduler) SetMute(id string, mute bool) bool {
return ts.store.SetMute(id, mute)
}
func (ts *TimerScheduler) Store() *TimerStore {
return ts.store
}
func (ts *TimerScheduler) scheduleAt(job *TimerJob, delay time.Duration) {
jobID := job.ID
ts.mu.Lock()
// Stop any existing timer for this job
if old, ok := ts.timers[jobID]; ok {
old.Stop()
}
// Register timer in map before it can fire, so RemoveJob can always cancel it.
t := time.AfterFunc(delay, func() {
ts.mu.Lock()
delete(ts.timers, jobID)
ts.mu.Unlock()
ts.executeJob(jobID)
})
ts.timers[jobID] = t
ts.mu.Unlock()
}
func (ts *TimerScheduler) executeJob(jobID string) {
job := ts.store.Get(jobID)
if job == nil || job.Fired {
return
}
ts.mu.RLock()
engine, ok := ts.engines[job.Project]
ts.mu.RUnlock()
if !ok {
slog.Error("timer: project not found", "job", jobID, "project", job.Project)
ts.store.MarkFired(jobID, fmt.Errorf("project %q not found", job.Project))
return
}
slog.Info("timer: executing job", "id", jobID, "project", job.Project, "prompt", truncateStr(job.Prompt, 60))
done := make(chan error, 1)
go func() {
done <- engine.ExecuteTimerJob(job)
}()
var err error
timeout := job.ExecutionTimeout()
if timeout > 0 {
select {
case err = <-done:
case <-time.After(timeout):
err = fmt.Errorf("job timed out after %v", timeout)
}
} else {
err = <-done
}
ts.store.MarkFired(jobID, err)
if err != nil {
slog.Error("timer: job failed", "id", jobID, "error", err)
} else {
slog.Info("timer: job completed", "id", jobID)
}
}
func GenerateTimerID() string {
b := make([]byte, 4)
if _, err := rand.Read(b); err != nil {
panic(fmt.Errorf("generate timer id: %w", err))
}
return hex.EncodeToString(b)
}
// ParseDelayOrTime parses a relative duration ("2h", "30m", "1h30m") or
// an absolute ISO time ("2026-05-15T14:00", "2026-05-15T14:00:00+08:00")
// and returns the absolute fire time.
func ParseDelayOrTime(s string) (time.Time, error) {
s = strings.TrimSpace(s)
if s == "" {
return time.Time{}, fmt.Errorf("empty delay or time")
}
// Try as a Go duration first (e.g., "2h", "30m", "1h30m", "2h30m15s")
if d, err := time.ParseDuration(s); err == nil {
if d <= 0 {
return time.Time{}, fmt.Errorf("delay must be positive")
}
return time.Now().Add(d), nil
}
// Try ISO time formats
// RFC3339 includes timezone (e.g. "2026-05-15T14:00:00+08:00"),
// so it's parsed directly. The other layouts have no timezone
// and are interpreted in the system's local timezone.
layouts := []struct {
layout string
local bool
}{
{time.RFC3339, false},
{"2006-01-02T15:04:05", true},
{"2006-01-02T15:04", true},
{"2006-01-02 15:04:05", true},
{"2006-01-02 15:04", true},
}
for _, l := range layouts {
if l.local {
if t, err := time.ParseInLocation(l.layout, s, time.Local); err == nil {
return t, nil
}
} else {
if t, err := time.Parse(l.layout, s); err == nil {
return t, nil
}
}
}
return time.Time{}, fmt.Errorf("invalid delay or time %q (use duration like 2h30m, or ISO time like 2026-05-15T14:00)", s)
}
// FormatTimerRemaining returns a human-readable string for time remaining
// until the scheduled fire time.
func FormatTimerRemaining(scheduledAt time.Time) string {
d := time.Until(scheduledAt)
if d <= 0 {
return "overdue"
}
if d < time.Minute {
secs := int(d.Seconds() + 0.5) // round up for countdown display
if secs >= 60 {
return "1m"
}
return fmt.Sprintf("%ds", secs)
}
if d < time.Hour {
mins := int(d.Minutes() + 0.5) // round up for countdown display
if mins >= 60 {
return "1h"
}
return fmt.Sprintf("%dm", mins)
}
totalMins := int(d.Minutes() + 0.5) // round up
hours := totalMins / 60
mins := totalMins % 60
if mins == 0 {
return fmt.Sprintf("%dh", hours)
}
return fmt.Sprintf("%dh%dm", hours, mins)
}

528
core/timer_test.go Normal file
View File

@@ -0,0 +1,528 @@
package core
import (
"os"
"path/filepath"
"testing"
"time"
)
func TestParseDelayOrTime_Relative(t *testing.T) {
tests := []struct {
input string
wantErr bool
}{
{"2h", false},
{"30m", false},
{"1h30m", false},
{"2h30m15s", false},
{"500ms", false},
{"0s", true}, // zero = not positive
{"-1h", true}, // negative
{"", true}, // empty
{"garbage", true}, // invalid
{"2", true}, // bare number
}
for _, tt := range tests {
t.Run(tt.input, func(t *testing.T) {
got, err := ParseDelayOrTime(tt.input)
if tt.wantErr {
if err == nil {
t.Errorf("ParseDelayOrTime(%q) = %v, want error", tt.input, got)
}
return
}
if err != nil {
t.Fatalf("ParseDelayOrTime(%q): %v", tt.input, err)
}
// Should be in the future
if got.Before(time.Now().Add(-time.Second)) {
t.Errorf("ParseDelayOrTime(%q) = %v, want future time", tt.input, got)
}
})
}
}
func TestParseDelayOrTime_Absolute(t *testing.T) {
tests := []struct {
input string
wantErr bool
}{
{"2026-05-15T14:00:00+08:00", false},
{"2026-05-15T14:00:00Z", false},
{"2026-05-15T14:00:00", false},
{"2026-05-15T14:00", false},
{"2026-05-15 14:00:00", false},
{"2026-05-15 14:00", false},
{"not-a-date", true},
}
for _, tt := range tests {
t.Run(tt.input, func(t *testing.T) {
got, err := ParseDelayOrTime(tt.input)
if tt.wantErr {
if err == nil {
t.Errorf("ParseDelayOrTime(%q) = %v, want error", tt.input, got)
}
return
}
if err != nil {
t.Fatalf("ParseDelayOrTime(%q): %v", tt.input, err)
}
if got.IsZero() {
t.Errorf("ParseDelayOrTime(%q) returned zero time", tt.input)
}
})
}
}
func TestParseDelayOrTime_LocalTimezone(t *testing.T) {
// When no timezone is specified, should use system local timezone
got, err := ParseDelayOrTime("2026-05-15T14:00")
if err != nil {
t.Fatalf("ParseDelayOrTime: %v", err)
}
if got.Location() != time.Local {
t.Errorf("expected local timezone %v, got %v", time.Local, got.Location())
}
if got.Hour() != 14 || got.Minute() != 0 {
t.Errorf("expected 14:00 local, got %02d:%02d", got.Hour(), got.Minute())
}
// With explicit timezone, should use that timezone
got2, err := ParseDelayOrTime("2026-05-15T14:00:00+08:00")
if err != nil {
t.Fatalf("ParseDelayOrTime: %v", err)
}
_, offset := got2.Zone()
if offset != 8*3600 {
t.Errorf("expected +08:00 offset (%d), got %d", 8*3600, offset)
}
}
func TestTimerStore_CRUD(t *testing.T) {
dir := t.TempDir()
store, err := NewTimerStore(dir)
if err != nil {
t.Fatalf("NewTimerStore: %v", err)
}
job := &TimerJob{
ID: "abcd1234",
Project: "test",
SessionKey: "feishu:chat1:user1",
ScheduledAt: time.Now().Add(time.Hour),
Prompt: "test prompt",
Description: "test",
CreatedAt: time.Now(),
}
// Add
if err := store.Add(job); err != nil {
t.Fatalf("Add: %v", err)
}
// Get
got := store.Get("abcd1234")
if got == nil {
t.Fatal("Get returned nil")
}
if got.Prompt != "test prompt" {
t.Errorf("Get.Prompt = %q, want %q", got.Prompt, "test prompt")
}
// List
all := store.List()
if len(all) != 1 {
t.Errorf("List returned %d jobs, want 1", len(all))
}
// ListPending
pending := store.ListPending()
if len(pending) != 1 {
t.Errorf("ListPending returned %d jobs, want 1", len(pending))
}
// ListByProject
byProject := store.ListByProject("test")
if len(byProject) != 1 {
t.Errorf("ListByProject returned %d jobs, want 1", len(byProject))
}
// ListBySessionKey
bySession := store.ListBySessionKey("feishu:chat1:user1")
if len(bySession) != 1 {
t.Errorf("ListBySessionKey returned %d jobs, want 1", len(bySession))
}
// SetMute
if !store.SetMute("abcd1234", true) {
t.Error("SetMute returned false")
}
got = store.Get("abcd1234")
if !got.Mute {
t.Error("Mute not set")
}
// MarkFired
store.MarkFired("abcd1234", nil)
got = store.Get("abcd1234")
if !got.Fired {
t.Error("Fired not set")
}
if got.FiredAt.IsZero() {
t.Error("FiredAt is zero")
}
// ListPending should be empty now
pending = store.ListPending()
if len(pending) != 0 {
t.Errorf("ListPending after fire returned %d jobs, want 0", len(pending))
}
// Remove
if !store.Remove("abcd1234") {
t.Error("Remove returned false")
}
if store.Get("abcd1234") != nil {
t.Error("Get after Remove returned non-nil")
}
}
func TestTimerStore_Persistence(t *testing.T) {
dir := t.TempDir()
store1, err := NewTimerStore(dir)
if err != nil {
t.Fatalf("NewTimerStore: %v", err)
}
job := &TimerJob{
ID: "persist1",
Project: "test",
SessionKey: "key1",
ScheduledAt: time.Now().Add(time.Hour),
Prompt: "hello",
CreatedAt: time.Now(),
}
if err := store1.Add(job); err != nil {
t.Fatalf("Add: %v", err)
}
// Reload from disk
store2, err := NewTimerStore(dir)
if err != nil {
t.Fatalf("NewTimerStore reload: %v", err)
}
got := store2.Get("persist1")
if got == nil {
t.Fatal("Get after reload returned nil")
}
if got.Prompt != "hello" {
t.Errorf("Prompt after reload = %q, want %q", got.Prompt, "hello")
}
}
func TestTimerScheduler_FiresOnTime(t *testing.T) {
dir := t.TempDir()
store, err := NewTimerStore(dir)
if err != nil {
t.Fatalf("NewTimerStore: %v", err)
}
sched := NewTimerScheduler(store)
// Register a stub engine
engine := &Engine{name: "test"}
sched.RegisterEngine("test", engine)
// We can't easily test ExecuteTimerJob without a full engine setup,
// so we test the scheduling mechanics by verifying the timer is created.
job := &TimerJob{
ID: "fire1",
Project: "test",
SessionKey: "key1",
ScheduledAt: time.Now().Add(100 * time.Millisecond),
Prompt: "test",
CreatedAt: time.Now(),
}
if err := sched.AddJob(job); err != nil {
t.Fatalf("AddJob: %v", err)
}
// Verify timer is registered
sched.mu.RLock()
_, has := sched.timers["fire1"]
sched.mu.RUnlock()
if !has {
t.Error("timer not registered in sched.timers")
}
// Wait for fire (the job will fail because engine is bare, but that's OK)
time.Sleep(300 * time.Millisecond)
// Timer entry should be cleaned up
sched.mu.RLock()
_, has = sched.timers["fire1"]
sched.mu.RUnlock()
if has {
t.Error("timer entry not cleaned up after fire")
}
// Job should be marked fired
got := store.Get("fire1")
if got == nil {
t.Fatal("job not found after fire")
}
if !got.Fired {
t.Error("job not marked as fired")
}
}
func TestTimerScheduler_Cancellation(t *testing.T) {
dir := t.TempDir()
store, err := NewTimerStore(dir)
if err != nil {
t.Fatalf("NewTimerStore: %v", err)
}
sched := NewTimerScheduler(store)
sched.RegisterEngine("test", &Engine{name: "test"})
job := &TimerJob{
ID: "cancel1",
Project: "test",
SessionKey: "key1",
ScheduledAt: time.Now().Add(time.Hour),
Prompt: "test",
CreatedAt: time.Now(),
}
if err := sched.AddJob(job); err != nil {
t.Fatalf("AddJob: %v", err)
}
// Remove before fire
if !sched.RemoveJob("cancel1") {
t.Error("RemoveJob returned false")
}
// Timer should be stopped and removed
sched.mu.RLock()
_, has := sched.timers["cancel1"]
sched.mu.RUnlock()
if has {
t.Error("timer entry not removed after cancellation")
}
// Job should be gone from store
if store.Get("cancel1") != nil {
t.Error("job still in store after removal")
}
}
func TestTimerScheduler_Recovery(t *testing.T) {
dir := t.TempDir()
store, err := NewTimerStore(dir)
if err != nil {
t.Fatalf("NewTimerStore: %v", err)
}
// Add a job that's due very soon
job := &TimerJob{
ID: "recover1",
Project: "test",
SessionKey: "key1",
ScheduledAt: time.Now().Add(50 * time.Millisecond),
Prompt: "recover test",
CreatedAt: time.Now(),
}
if err := store.Add(job); err != nil {
t.Fatalf("Add: %v", err)
}
// Simulate restart: create new scheduler from same store
sched := NewTimerScheduler(store)
sched.RegisterEngine("test", &Engine{name: "test"})
if err := sched.Start(); err != nil {
t.Fatalf("Start: %v", err)
}
defer sched.Stop()
// The job should fire (within grace period)
time.Sleep(500 * time.Millisecond)
got := store.Get("recover1")
if got == nil {
t.Fatal("job not found after recovery")
}
if !got.Fired {
t.Error("recovered job not fired")
}
}
func TestTimerScheduler_StaleJobSkipped(t *testing.T) {
dir := t.TempDir()
store, err := NewTimerStore(dir)
if err != nil {
t.Fatalf("NewTimerStore: %v", err)
}
// Add a job that's way past due (>5 min)
job := &TimerJob{
ID: "stale1",
Project: "test",
SessionKey: "key1",
ScheduledAt: time.Now().Add(-10 * time.Minute),
Prompt: "stale test",
CreatedAt: time.Now().Add(-15 * time.Minute),
}
if err := store.Add(job); err != nil {
t.Fatalf("Add: %v", err)
}
sched := NewTimerScheduler(store)
sched.RegisterEngine("test", &Engine{name: "test"})
if err := sched.Start(); err != nil {
t.Fatalf("Start: %v", err)
}
defer sched.Stop()
// Give scheduler a moment to process
time.Sleep(100 * time.Millisecond)
got := store.Get("stale1")
if got == nil {
t.Fatal("job not found")
}
if !got.Fired {
t.Error("stale job not marked as fired")
}
if got.LastError == "" {
t.Error("stale job should have a LastError")
}
}
func TestValidateTimerJob(t *testing.T) {
now := time.Now()
tests := []struct {
name string
job *TimerJob
wantErr bool
}{
{
name: "valid prompt job",
job: &TimerJob{SessionKey: "feishu:chat1:user1", ScheduledAt: now.Add(time.Hour), Prompt: "test"},
wantErr: false,
},
{
name: "valid exec job",
job: &TimerJob{SessionKey: "feishu:chat1:user1", ScheduledAt: now.Add(time.Hour), Exec: "echo hello"},
wantErr: false,
},
{
name: "empty session_key",
job: &TimerJob{ScheduledAt: now.Add(time.Hour), Prompt: "test"},
wantErr: true,
},
{
name: "whitespace session_key",
job: &TimerJob{SessionKey: " ", ScheduledAt: now.Add(time.Hour), Prompt: "test"},
wantErr: true,
},
{
name: "missing scheduled_at",
job: &TimerJob{SessionKey: "feishu:chat1:user1", Prompt: "test"},
wantErr: true,
},
{
name: "missing prompt and exec",
job: &TimerJob{SessionKey: "feishu:chat1:user1", ScheduledAt: now.Add(time.Hour)},
wantErr: true,
},
{
name: "both prompt and exec",
job: &TimerJob{SessionKey: "feishu:chat1:user1", ScheduledAt: now.Add(time.Hour), Prompt: "test", Exec: "echo"},
wantErr: true,
},
{
name: "invalid session_mode",
job: &TimerJob{SessionKey: "feishu:chat1:user1", ScheduledAt: now.Add(time.Hour), Prompt: "test", SessionMode: "bad"},
wantErr: true,
},
{
name: "valid session_mode",
job: &TimerJob{SessionKey: "feishu:chat1:user1", ScheduledAt: now.Add(time.Hour), Prompt: "test", SessionMode: "new-per-run"},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := validateTimerJob(tt.job)
if tt.wantErr && err == nil {
t.Error("expected error, got nil")
}
if !tt.wantErr && err != nil {
t.Errorf("unexpected error: %v", err)
}
})
}
}
func TestGenerateTimerID(t *testing.T) {
ids := make(map[string]bool)
for i := 0; i < 100; i++ {
id := GenerateTimerID()
if len(id) != 8 {
t.Errorf("GenerateTimerID() = %q (len %d), want len 8", id, len(id))
}
if ids[id] {
t.Errorf("duplicate ID: %s", id)
}
ids[id] = true
}
}
func TestFormatTimerRemaining(t *testing.T) {
tests := []struct {
name string
d time.Duration
want string
}{
{"seconds", 30 * time.Second, "30s"},
{"minutes", 5 * time.Minute, "5m"},
{"hours", 2 * time.Hour, "2h"},
{"hours and minutes", 90 * time.Minute, "1h30m"},
{"overdue", -time.Minute, "overdue"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := FormatTimerRemaining(time.Now().Add(tt.d))
if got != tt.want {
t.Errorf("FormatTimerRemaining() = %q, want %q", got, tt.want)
}
})
}
}
func TestTimerStore_FilePath(t *testing.T) {
dir := t.TempDir()
store, err := NewTimerStore(dir)
if err != nil {
t.Fatalf("NewTimerStore: %v", err)
}
expected := filepath.Join(dir, "timers", "jobs.json")
if store.path != expected {
t.Errorf("path = %q, want %q", store.path, expected)
}
if _, err := os.Stat(filepath.Dir(store.path)); err != nil {
t.Errorf("timers directory not created: %v", err)
}
}