Files
chenhg5-cc-connect/core/timer.go
Han e4c9e8e148 feat(timer): add one-shot delayed task system (/timer) (#1012)
* feat(timer): add one-shot delayed task system (/timer)

Introduces a one-shot timer feature parallel to the existing cron
(recurring) system. Users can schedule delayed tasks via chat command
(/timer add 2h check PR status), CLI (cc-connect timer add --delay 2h),
or agent system prompt.

Core changes:
- core/timer.go: TimerJob, TimerStore, TimerScheduler, ParseDelayOrTime
- core/timer_test.go: 13 unit tests covering store, scheduler, parsing
- cmd/cc-connect/timer.go: CLI subcommands (add/list/del/info)
- core/engine.go: ExecuteTimerJob, cmdTimer, renderTimerCard, shell exec
- core/api.go: /timer/add, /timer/list, /timer/info, /timer/del endpoints
- core/i18n.go: 22 MsgTimer* keys with 5-language translations
- core/hooks.go: HookEventTimerTriggered event
- core/interfaces.go: agent system prompt section for timers
- core/management.go: SetTimerScheduler wiring
- cmd/cc-connect/main.go: timer store/scheduler lifecycle

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* feat(timer): add /timer to help card and improve usage docs

- Add /timer to /help card tools section (all 5 languages)
- Add /timer to text-based /help fallback (all 5 languages)
- Improve MsgTimerAddUsage with both relative and absolute time examples

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* fix(timer): use local timezone for absolute time parsing

When a user specifies an absolute time without timezone (e.g. "9:00"),
it should be interpreted as local time, not UTC. Use time.ParseInLocation
with time.Local for layouts that don't include a timezone component.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* docs(timer): clarify local timezone for absolute time

Absolute times without timezone (e.g. "2026-05-16T09:00") use the
system's local timezone, not UTC. This is now documented in:
- Agent system prompt (core/interfaces.go)
- /timer usage message (MsgTimerUsage, all 5 languages)
- /timer add usage message (MsgTimerAddUsage, all 5 languages)
- CLI --help text (cmd/cc-connect/timer.go)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* fix(timer): use time.Until instead of Sub(time.Now()) for lint

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* fix(timer): sync cron fixes for session_key validation and slash prompt expansion

Reference PR #973: reject empty session_key in validateTimerJob so
management API doesn't persist unrunnable timer jobs.

Reference PR #928: resolve /skill slash prompts through skill registry
in ExecuteTimerJob before constructing the agent message.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* fix: lint - errcheck resp.Body.Close, staticcheck WriteString(fmt.Sprintf)

* fix: two more WriteString(fmt.Sprintf) → fmt.Fprintf

---------

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-11 00:10:12 +08:00

520 lines
13 KiB
Go

package core
import (
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"log/slog"
"os"
"path/filepath"
"strings"
"sync"
"time"
)
// TimerJob represents a persisted one-shot timer.
//
// A job carries either a Prompt or an Exec shell command; validateTimerJob
// rejects jobs that set both or neither. TimerStore.MarkFired records the
// outcome in Fired, FiredAt and LastError.
type TimerJob struct {
// ID identifies the job; the scheduler's timer map and the store lookups are keyed by it.
ID string `json:"id"`
// Project is the name used to look up the engine registered with the scheduler.
Project string `json:"project"`
// SessionKey is required; validateTimerJob rejects blank values.
SessionKey string `json:"session_key"`
ScheduledAt time.Time `json:"scheduled_at"` // absolute fire time
// Prompt is the prompt text; mutually exclusive with Exec.
Prompt string `json:"prompt"`
Exec string `json:"exec,omitempty"` // shell command; mutually exclusive with Prompt
WorkDir string `json:"work_dir,omitempty"` // working directory for exec; empty = agent work_dir
// Description is free-form text; nothing in this file interprets it.
Description string `json:"description"`
Silent *bool `json:"silent,omitempty"` // suppress start notification; nil = use global default
Mute bool `json:"mute,omitempty"` // suppress ALL messages (start + result)
SessionMode string `json:"session_mode,omitempty"` // "" or "reuse" = share active session; "new_per_run" = fresh session each run
Mode string `json:"mode,omitempty"` // permission mode override; "" = use project default
TimeoutMins *int `json:"timeout_mins,omitempty"` // nil = default 30m; 0 = no limit; >0 = minutes
// CreatedAt is not assigned anywhere in this file; presumably the caller sets it — verify.
CreatedAt time.Time `json:"created_at"`
// Fired is set by TimerStore.MarkFired; the scheduler skips fired jobs.
Fired bool `json:"fired"`
// FiredAt is the time MarkFired was called.
// NOTE(review): encoding/json's omitempty never omits a struct value, so a
// zero FiredAt is still written to the file — confirm that is acceptable.
FiredAt time.Time `json:"fired_at,omitempty"`
// LastError holds the error text passed to MarkFired, or "" when it was nil.
LastError string `json:"last_error,omitempty"`
}
// IsShellJob returns true if the job runs a shell command directly,
// i.e. its Exec field is non-empty.
func (j *TimerJob) IsShellJob() bool {
return j.Exec != ""
}
// defaultTimerJobTimeout is the wait applied to jobs that leave TimeoutMins unset.
const defaultTimerJobTimeout = 30 * time.Minute

// ExecutionTimeout reports how long the scheduler should wait for the job's
// goroutine before giving up.
//
//   - TimeoutMins == nil: defaultTimerJobTimeout
//   - *TimeoutMins <= 0:  0, which the scheduler treats as "wait indefinitely"
//   - otherwise:          that many minutes
func (j *TimerJob) ExecutionTimeout() time.Duration {
	switch mins := j.TimeoutMins; {
	case mins == nil:
		return defaultTimerJobTimeout
	case *mins <= 0:
		return 0
	default:
		return time.Duration(*mins) * time.Minute
	}
}
// UsesNewSessionPerRun reports whether the job's own SessionMode, once passed
// through NormalizeCronSessionMode, asks for a fresh engine session.
func (j *TimerJob) UsesNewSessionPerRun() bool {
	const newPerRun = "new_per_run"
	normalized := NormalizeCronSessionMode(j.SessionMode)
	return normalized == newPerRun
}
// validateTimerJob reports the first problem that makes j unusable, or nil
// when the job is acceptable.
//
// Checks, in order:
//   - SessionKey is not blank
//   - ScheduledAt is not the zero time
//   - exactly one of Prompt and Exec is set
//   - SessionMode normalizes to "" or "new_per_run"
//   - Mode is empty or one of the known permission modes
//   - TimeoutMins, when set, is not negative
func validateTimerJob(j *TimerJob) error {
	if key := strings.TrimSpace(j.SessionKey); key == "" {
		return fmt.Errorf("session_key is required")
	}
	if j.ScheduledAt.IsZero() {
		return fmt.Errorf("scheduled_at is required")
	}

	hasPrompt, hasExec := j.Prompt != "", j.Exec != ""
	switch {
	case !hasPrompt && !hasExec:
		return fmt.Errorf("either prompt or exec is required")
	case hasPrompt && hasExec:
		return fmt.Errorf("prompt and exec are mutually exclusive")
	}

	switch NormalizeCronSessionMode(j.SessionMode) {
	case "", "new_per_run":
		// accepted
	default:
		return fmt.Errorf("invalid session_mode %q (want reuse, new_per_run, or new-per-run)", j.SessionMode)
	}

	switch j.Mode {
	case "", "default", "bypassPermissions", "acceptEdits", "plan", "auto", "dontAsk":
		// accepted
	default:
		return fmt.Errorf("invalid mode %q", j.Mode)
	}

	if mins := j.TimeoutMins; mins != nil && *mins < 0 {
		return fmt.Errorf("timeout_mins must be >= 0")
	}
	return nil
}
// TimerStore persists timer jobs to a JSON file.
//
// The exported methods take mu before touching jobs. The accessors hand out
// the stored *TimerJob pointers themselves, so the pointed-to values are
// shared between the store and its callers.
type TimerStore struct {
path string // JSON file the jobs are read from and written to
mu sync.Mutex // guards jobs; held across every save triggered by a mutation
jobs []*TimerJob // in-memory list, marshalled as a whole on each save
}
// NewTimerStore creates <dataDir>/timers if needed and returns a store backed
// by <dataDir>/timers/jobs.json, pre-populated with whatever load can read
// from that file.
//
// Only a failure to create the directory is returned as an error; load
// reports no error to this function.
func NewTimerStore(dataDir string) (*TimerStore, error) {
	timersDir := filepath.Join(dataDir, "timers")
	if mkErr := os.MkdirAll(timersDir, 0o755); mkErr != nil {
		return nil, mkErr
	}

	store := &TimerStore{path: filepath.Join(timersDir, "jobs.json")}
	store.load()
	return store, nil
}
// load reads the jobs file into s.jobs.
//
// A missing file is the normal first-run state and is ignored silently. Any
// other read error is logged, because it would otherwise look exactly like
// "no timers". Decode errors are logged as well.
//
// JSON null entries decode to nil pointers; they are dropped here because the
// rest of the store dereferences every element of s.jobs.
//
// load does not take s.mu; NewTimerStore calls it before the store is shared.
func (s *TimerStore) load() {
	data, err := os.ReadFile(s.path)
	if err != nil {
		if !os.IsNotExist(err) {
			slog.Error("timer: failed to read jobs", "path", s.path, "error", err)
		}
		return
	}
	if err := json.Unmarshal(data, &s.jobs); err != nil {
		slog.Error("timer: failed to load jobs", "path", s.path, "error", err)
	}

	// Filter in place; a nil s.jobs stays nil.
	kept := s.jobs[:0]
	for _, j := range s.jobs {
		if j != nil {
			kept = append(kept, j)
		}
	}
	if dropped := len(s.jobs) - len(kept); dropped > 0 {
		slog.Warn("timer: ignoring null job entries", "path", s.path, "count", dropped)
	}
	s.jobs = kept
}
// save serializes the full job list as indented JSON and hands it to
// AtomicWriteFile for s.path with mode 0o644.
//
// save takes no lock itself; every caller in this file invokes it while
// holding s.mu.
func (s *TimerStore) save() error {
	encoded, marshalErr := json.MarshalIndent(s.jobs, "", " ")
	if marshalErr == nil {
		return AtomicWriteFile(s.path, encoded, 0o644)
	}
	return marshalErr
}
// Add appends job to the store and persists the list.
//
// It returns an error when job is nil or when saving fails. On a save failure
// the append is rolled back, so the in-memory list never contains a job the
// caller was told could not be added.
func (s *TimerStore) Add(job *TimerJob) error {
	if job == nil {
		// A nil entry would panic later in every loop that reads j.ID.
		return fmt.Errorf("timer job is nil")
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	s.jobs = append(s.jobs, job)
	if err := s.save(); err != nil {
		last := len(s.jobs) - 1
		s.jobs[last] = nil // release the reference held by the backing array
		s.jobs = s.jobs[:last]
		return err
	}
	return nil
}
// Remove deletes the first job whose ID equals id and persists the change.
// It reports whether such a job existed. A save failure is logged, not
// returned; the job is still gone from memory in that case.
func (s *TimerStore) Remove(id string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()

	idx := -1
	for i := range s.jobs {
		if s.jobs[i].ID == id {
			idx = i
			break
		}
	}
	if idx < 0 {
		return false
	}

	s.jobs = append(s.jobs[:idx], s.jobs[idx+1:]...)
	if saveErr := s.save(); saveErr != nil {
		slog.Warn("timer: failed to save after remove", "error", saveErr)
	}
	return true
}
// SetMute updates the Mute flag of the first job whose ID equals id and
// persists the list. It reports whether the job was found; a save failure is
// only logged.
func (s *TimerStore) SetMute(id string, mute bool) bool {
	s.mu.Lock()
	defer s.mu.Unlock()

	for _, job := range s.jobs {
		if job.ID != id {
			continue
		}
		job.Mute = mute
		if saveErr := s.save(); saveErr != nil {
			slog.Warn("timer: save after mute toggle", "error", saveErr)
		}
		return true
	}
	return false
}
// MarkFired flags the first job whose ID equals id as fired, stamps FiredAt
// with the current time, and stores err's text in LastError ("" when err is
// nil). The list is then persisted; a save failure is only logged.
// An unknown id is a no-op.
func (s *TimerStore) MarkFired(id string, err error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	for _, job := range s.jobs {
		if job.ID != id {
			continue
		}
		job.Fired = true
		job.FiredAt = time.Now()
		job.LastError = ""
		if err != nil {
			job.LastError = err.Error()
		}
		if saveErr := s.save(); saveErr != nil {
			slog.Warn("timer: failed to save after mark fired", "error", saveErr)
		}
		return
	}
}
// List returns a snapshot of all jobs. The slice is a fresh copy and is
// never nil, but its elements are the store's own *TimerJob pointers.
func (s *TimerStore) List() []*TimerJob {
	s.mu.Lock()
	defer s.mu.Unlock()

	snapshot := make([]*TimerJob, 0, len(s.jobs))
	return append(snapshot, s.jobs...)
}
// ListByProject returns the jobs whose Project equals project, in store
// order. The result is nil when nothing matches.
func (s *TimerStore) ListByProject(project string) []*TimerJob {
	s.mu.Lock()
	defer s.mu.Unlock()

	var matches []*TimerJob
	for i := range s.jobs {
		if job := s.jobs[i]; job.Project == project {
			matches = append(matches, job)
		}
	}
	return matches
}
// ListBySessionKey returns the jobs whose SessionKey equals sessionKey, in
// store order. The result is nil when nothing matches.
func (s *TimerStore) ListBySessionKey(sessionKey string) []*TimerJob {
	s.mu.Lock()
	defer s.mu.Unlock()

	var matches []*TimerJob
	for i := range s.jobs {
		if job := s.jobs[i]; job.SessionKey == sessionKey {
			matches = append(matches, job)
		}
	}
	return matches
}
// Get returns the first job whose ID equals id, or nil when there is none.
// The returned pointer is the store's own entry, not a copy.
func (s *TimerStore) Get(id string) *TimerJob {
	s.mu.Lock()
	defer s.mu.Unlock()

	for i := range s.jobs {
		if s.jobs[i].ID == id {
			return s.jobs[i]
		}
	}
	return nil
}
// ListPending returns every job that has not been marked fired, in store
// order. The result is nil when there are none.
func (s *TimerStore) ListPending() []*TimerJob {
	s.mu.Lock()
	defer s.mu.Unlock()

	var pending []*TimerJob
	for _, job := range s.jobs {
		if job.Fired {
			continue
		}
		pending = append(pending, job)
	}
	return pending
}
// TimerScheduler runs one-shot timer jobs using time.AfterFunc.
//
// Each pending job gets one *time.Timer; when it fires, the job is looked up
// in the store again and handed to the engine registered for its Project.
type TimerScheduler struct {
store *TimerStore // persistence for the jobs being scheduled
engines map[string]*Engine // registered engines, keyed by the name given to RegisterEngine
mu sync.RWMutex // guards engines and timers
timers map[string]*time.Timer // job ID → active timer
defaultSilent bool // fallback for jobs whose Silent is nil; written without mu by SetDefaultSilent
defaultSessionMode string // normalized fallback for jobs with empty SessionMode; written without mu by SetDefaultSessionMode
}
// missedJobGracePeriod is how long after a missed fire time Start still
// executes the job (immediately). Jobs overdue by more than this are logged
// and marked fired with a "stale" error instead of being run.
const missedJobGracePeriod = 5 * time.Minute
// NewTimerScheduler returns a scheduler bound to store, with empty engine and
// timer maps. No timers are armed until Start or AddJob is called.
func NewTimerScheduler(store *TimerStore) *TimerScheduler {
	ts := new(TimerScheduler)
	ts.store = store
	ts.engines = map[string]*Engine{}
	ts.timers = map[string]*time.Timer{}
	return ts
}
// RegisterEngine makes e the engine used for jobs whose Project equals name,
// replacing any engine previously registered under that name.
func (ts *TimerScheduler) RegisterEngine(name string, e *Engine) {
	ts.mu.Lock()
	ts.engines[name] = e
	ts.mu.Unlock()
}
// SetDefaultSilent sets the default silent mode for timer jobs; IsSilent
// falls back to it for jobs whose Silent field is nil.
// Must be called before Start (not safe for concurrent use): the field is
// written without taking ts.mu.
func (ts *TimerScheduler) SetDefaultSilent(silent bool) {
ts.defaultSilent = silent
}
// SetDefaultSessionMode sets the default session mode for timer jobs. The
// value is stored after NormalizeCronSessionMode, and UsesNewSession falls
// back to it for jobs with an empty SessionMode.
// Must be called before Start (not safe for concurrent use): the field is
// written without taking ts.mu.
func (ts *TimerScheduler) SetDefaultSessionMode(mode string) {
ts.defaultSessionMode = NormalizeCronSessionMode(mode)
}
// IsSilent reports whether the start notification for job should be
// suppressed: the job's own Silent setting when present, otherwise the
// scheduler-wide default.
func (ts *TimerScheduler) IsSilent(job *TimerJob) bool {
	if job.Silent == nil {
		return ts.defaultSilent
	}
	return *job.Silent
}
// UsesNewSession reports whether job should run in a fresh session. A job
// with a non-empty SessionMode decides for itself; otherwise the
// scheduler-wide default session mode applies.
func (ts *TimerScheduler) UsesNewSession(job *TimerJob) bool {
	if job.SessionMode == "" {
		return ts.defaultSessionMode == "new_per_run"
	}
	return job.UsesNewSessionPerRun()
}
// Start arms a timer for every stored job that has not fired yet.
//
// Relative to the moment Start is called, each unfired job is handled as:
//   - due in the future: scheduled for the remaining delay
//   - overdue by at most missedJobGracePeriod: scheduled to fire immediately
//   - overdue by more: marked fired with a "stale" error and not run
//
// A summary is logged at the end. Start always returns nil.
func (ts *TimerScheduler) Start() error {
	all := ts.store.List()
	startedAt := time.Now()

	var scheduled, missed, skipped int
	for _, job := range all {
		if job.Fired {
			continue
		}

		delay := job.ScheduledAt.Sub(startedAt)
		overdue := -delay
		switch {
		case delay > 0:
			ts.scheduleAt(job, delay)
			scheduled++
		case overdue <= missedJobGracePeriod:
			// Just missed — fire immediately.
			slog.Info("timer: firing missed job immediately", "id", job.ID, "overdue", overdue)
			ts.scheduleAt(job, 0)
			missed++
		default:
			slog.Warn("timer: skipping stale job", "id", job.ID, "scheduled_at", job.ScheduledAt, "overdue", overdue)
			ts.store.MarkFired(job.ID, fmt.Errorf("missed by %v (stale)", overdue))
			skipped++
		}
	}

	slog.Info("timer: scheduler started", "scheduled", scheduled, "missed_fired", missed, "skipped_stale", skipped, "total", len(all))
	return nil
}
// Stop cancels every armed timer and empties the timer map. The store is not
// touched, so unfired jobs stay persisted.
func (ts *TimerScheduler) Stop() {
	ts.mu.Lock()
	defer ts.mu.Unlock()

	for id := range ts.timers {
		ts.timers[id].Stop()
		delete(ts.timers, id)
	}
}
// AddJob validates job, persists it and arms its timer.
//
// The job is modified in place: SessionMode is normalized, and an empty ID is
// replaced with a generated one. The timer map and the store lookups are
// keyed by ID, so two ID-less jobs would otherwise cancel each other's timer.
//
// A job whose ScheduledAt is already in the past is fired immediately.
// The returned error is a validation failure, a nil job, or a store failure;
// in all of those cases nothing is scheduled.
func (ts *TimerScheduler) AddJob(job *TimerJob) error {
	if job == nil {
		return fmt.Errorf("timer job is nil")
	}
	if err := validateTimerJob(job); err != nil {
		return err
	}
	if job.ID == "" {
		job.ID = GenerateTimerID()
	}
	job.SessionMode = NormalizeCronSessionMode(job.SessionMode)

	if err := ts.store.Add(job); err != nil {
		return err
	}

	delay := time.Until(job.ScheduledAt)
	if delay <= 0 {
		// Already due — fire immediately.
		slog.Info("timer: new job already due, firing immediately", "id", job.ID)
		delay = 0
	}
	ts.scheduleAt(job, delay)
	return nil
}
// RemoveJob cancels the job's timer, if one is armed, and deletes the job
// from the store. It reports whether the store contained the job.
//
// The store removal happens while ts.mu is still held. A timer callback that
// has already fired must acquire ts.mu before it runs the job, so by the time
// it proceeds the store entry is gone and the job is not executed.
func (ts *TimerScheduler) RemoveJob(id string) bool {
	ts.mu.Lock()
	defer ts.mu.Unlock()

	if pending, found := ts.timers[id]; found {
		pending.Stop()
		delete(ts.timers, id)
	}
	return ts.store.Remove(id)
}
// SetMute sets the Mute flag of the stored job with the given id by
// delegating to the store. It reports whether the job was found.
func (ts *TimerScheduler) SetMute(id string, mute bool) bool {
return ts.store.SetMute(id, mute)
}
// Store returns the TimerStore this scheduler was created with.
func (ts *TimerScheduler) Store() *TimerStore {
return ts.store
}
// scheduleAt arms a timer that runs the job after delay, replacing any timer
// already registered for the same job ID.
//
// The timer is created and stored in ts.timers under ts.mu, and its callback
// takes ts.mu first, so the callback cannot observe the map before the timer
// is registered, even with a zero delay.
//
// Timer.Stop cannot cancel a callback that has already started. The callback
// therefore runs the job only if ts.timers still maps the job ID to this very
// timer. If Stop, RemoveJob or a later scheduleAt took the entry away while
// the callback was waiting for the lock, it returns without touching the map,
// so the job is not run after shutdown and a replacement timer's entry is not
// deleted by the stale one.
func (ts *TimerScheduler) scheduleAt(job *TimerJob, delay time.Duration) {
	jobID := job.ID

	ts.mu.Lock()
	defer ts.mu.Unlock()

	// Stop any existing timer for this job.
	if old, ok := ts.timers[jobID]; ok {
		old.Stop()
	}

	// t is declared first so the callback can compare against it; the
	// callback only reads t after acquiring ts.mu, i.e. after the assignment.
	var t *time.Timer
	t = time.AfterFunc(delay, func() {
		ts.mu.Lock()
		current := ts.timers[jobID] == t
		if current {
			delete(ts.timers, jobID)
		}
		ts.mu.Unlock()

		if !current {
			return
		}
		ts.executeJob(jobID)
	})
	ts.timers[jobID] = t
}
// executeJob runs the stored job with the given ID on the engine registered
// for its project and records the outcome with MarkFired.
//
// It returns without doing anything when the job is no longer in the store or
// is already marked fired. A missing (or nil) engine is recorded as the job's
// error.
//
// The engine call runs in its own goroutine. When the job's ExecutionTimeout
// is positive, executeJob waits at most that long and then records a timeout
// error; the goroutine is not interrupted, and the buffered channel lets it
// finish its send without a receiver. A timeout of zero waits indefinitely.
func (ts *TimerScheduler) executeJob(jobID string) {
	job := ts.store.Get(jobID)
	if job == nil || job.Fired {
		return
	}

	ts.mu.RLock()
	engine, ok := ts.engines[job.Project]
	ts.mu.RUnlock()
	if !ok || engine == nil {
		slog.Error("timer: project not found", "job", jobID, "project", job.Project)
		ts.store.MarkFired(jobID, fmt.Errorf("project %q not found", job.Project))
		return
	}

	slog.Info("timer: executing job", "id", jobID, "project", job.Project, "prompt", truncateStr(job.Prompt, 60))

	done := make(chan error, 1)
	go func() {
		done <- engine.ExecuteTimerJob(job)
	}()

	var err error
	if timeout := job.ExecutionTimeout(); timeout > 0 {
		// An explicit timer is stopped as soon as the job finishes, instead
		// of lingering for the rest of the timeout as time.After's would.
		deadline := time.NewTimer(timeout)
		defer deadline.Stop()
		select {
		case err = <-done:
		case <-deadline.C:
			err = fmt.Errorf("job timed out after %v", timeout)
		}
	} else {
		err = <-done
	}

	ts.store.MarkFired(jobID, err)
	if err != nil {
		slog.Error("timer: job failed", "id", jobID, "error", err)
		return
	}
	slog.Info("timer: job completed", "id", jobID)
}
// GenerateTimerID returns a random job ID: 4 bytes from crypto/rand encoded
// as 8 lowercase hexadecimal characters. It panics if the random source
// reports an error.
func GenerateTimerID() string {
	var raw [4]byte
	if _, err := rand.Read(raw[:]); err != nil {
		panic(fmt.Errorf("generate timer id: %w", err))
	}
	return hex.EncodeToString(raw[:])
}
// ParseDelayOrTime turns user input into an absolute fire time.
//
// Surrounding whitespace is ignored. The input is tried, in order, as:
//
//  1. a Go duration ("2h", "30m", "1h30m", "2h30m15s"), which must be
//     positive and is added to the current time;
//  2. an RFC 3339 timestamp, whose own zone offset is honoured
//     ("2026-05-15T14:00:00+08:00");
//  3. a zone-less timestamp in one of the forms "2006-01-02T15:04:05",
//     "2006-01-02T15:04", "2006-01-02 15:04:05" or "2006-01-02 15:04",
//     interpreted in the system's local time zone.
//
// Absolute times are returned as parsed, whether or not they lie in the
// past. Empty input, a non-positive duration and unrecognized input each
// yield the zero time and an error.
func ParseDelayOrTime(s string) (time.Time, error) {
	input := strings.TrimSpace(s)
	if input == "" {
		return time.Time{}, fmt.Errorf("empty delay or time")
	}

	// Relative form first.
	if delay, durErr := time.ParseDuration(input); durErr == nil {
		if delay <= 0 {
			return time.Time{}, fmt.Errorf("delay must be positive")
		}
		return time.Now().Add(delay), nil
	}

	// Absolute form carrying its own zone.
	if at, err := time.Parse(time.RFC3339, input); err == nil {
		return at, nil
	}

	// Absolute forms without a zone: local time.
	for _, layout := range [...]string{
		"2006-01-02T15:04:05",
		"2006-01-02T15:04",
		"2006-01-02 15:04:05",
		"2006-01-02 15:04",
	} {
		if at, err := time.ParseInLocation(layout, input, time.Local); err == nil {
			return at, nil
		}
	}

	return time.Time{}, fmt.Errorf("invalid delay or time %q (use duration like 2h30m, or ISO time like 2026-05-15T14:00)", input)
}
// FormatTimerRemaining renders the time left until scheduledAt as a short
// countdown string.
//
//   - not in the future:  "overdue"
//   - under a minute:     whole seconds, e.g. "42s"
//   - under an hour:      whole minutes, e.g. "17m"
//   - otherwise:          hours and minutes, e.g. "3h" or "3h20m"
//
// Values are rounded to the nearest unit (adding 0.5, then truncating). When
// rounding reaches 60 in the first two ranges, the next unit is shown instead
// ("1m", "1h").
func FormatTimerRemaining(scheduledAt time.Time) string {
	remaining := time.Until(scheduledAt)

	switch {
	case remaining <= 0:
		return "overdue"
	case remaining < time.Minute:
		if secs := int(remaining.Seconds() + 0.5); secs < 60 {
			return fmt.Sprintf("%ds", secs)
		}
		return "1m"
	case remaining < time.Hour:
		if mins := int(remaining.Minutes() + 0.5); mins < 60 {
			return fmt.Sprintf("%dm", mins)
		}
		return "1h"
	}

	total := int(remaining.Minutes() + 0.5)
	hours, mins := total/60, total%60
	if mins == 0 {
		return fmt.Sprintf("%dh", hours)
	}
	return fmt.Sprintf("%dh%dm", hours, mins)
}