mirror of
https://github.com/chenhg5/cc-connect.git
synced 2026-07-03 12:28:10 +08:00
* feat(timer): add one-shot delayed task system (/timer) Introduces a one-shot timer feature parallel to the existing cron (recurring) system. Users can schedule delayed tasks via chat command (/timer add 2h check PR status), CLI (cc-connect timer add --delay 2h), or agent system prompt. Core changes: - core/timer.go: TimerJob, TimerStore, TimerScheduler, ParseDelayOrTime - core/timer_test.go: 13 unit tests covering store, scheduler, parsing - cmd/cc-connect/timer.go: CLI subcommands (add/list/del/info) - core/engine.go: ExecuteTimerJob, cmdTimer, renderTimerCard, shell exec - core/api.go: /timer/add, /timer/list, /timer/info, /timer/del endpoints - core/i18n.go: 22 MsgTimer* keys with 5-language translations - core/hooks.go: HookEventTimerTriggered event - core/interfaces.go: agent system prompt section for timers - core/management.go: SetTimerScheduler wiring - cmd/cc-connect/main.go: timer store/scheduler lifecycle Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * feat(timer): add /timer to help card and improve usage docs - Add /timer to /help card tools section (all 5 languages) - Add /timer to text-based /help fallback (all 5 languages) - Improve MsgTimerAddUsage with both relative and absolute time examples Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * fix(timer): use local timezone for absolute time parsing When a user specifies an absolute time without timezone (e.g. "9:00"), it should be interpreted as local time, not UTC. Use time.ParseInLocation with time.Local for layouts that don't include a timezone component. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * docs(timer): clarify local timezone for absolute time Absolute times without timezone (e.g. "2026-05-16T09:00") use the system's local timezone, not UTC. This is now documented in: - Agent system prompt (core/interfaces.go) - /timer usage message (MsgTimerUsage, all 5 languages) - /timer add usage message (MsgTimerAddUsage, all 5 languages) - CLI --help text (cmd/cc-connect/timer.go) Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * fix(timer): use time.Until instead of Sub(time.Now()) for lint Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * fix(timer): sync cron fixes for session_key validation and slash prompt expansion Reference PR #973: reject empty session_key in validateTimerJob so management API doesn't persist unrunnable timer jobs. Reference PR #928: resolve /skill slash prompts through skill registry in ExecuteTimerJob before constructing the agent message. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * fix: lint - errcheck resp.Body.Close, staticcheck WriteString(fmt.Sprintf) * fix: two more WriteString(fmt.Sprintf) → fmt.Fprintf --------- Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
520 lines
13 KiB
Go
520 lines
13 KiB
Go
package core
|
|
|
|
import (
|
|
"crypto/rand"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log/slog"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// TimerJob represents a persisted one-shot timer.
|
|
type TimerJob struct {
|
|
ID string `json:"id"`
|
|
Project string `json:"project"`
|
|
SessionKey string `json:"session_key"`
|
|
ScheduledAt time.Time `json:"scheduled_at"` // absolute fire time
|
|
Prompt string `json:"prompt"`
|
|
Exec string `json:"exec,omitempty"` // shell command; mutually exclusive with Prompt
|
|
WorkDir string `json:"work_dir,omitempty"` // working directory for exec; empty = agent work_dir
|
|
Description string `json:"description"`
|
|
Silent *bool `json:"silent,omitempty"` // suppress start notification; nil = use global default
|
|
Mute bool `json:"mute,omitempty"` // suppress ALL messages (start + result)
|
|
SessionMode string `json:"session_mode,omitempty"` // "" or "reuse" = share active session; "new_per_run" = fresh session each run
|
|
Mode string `json:"mode,omitempty"` // permission mode override; "" = use project default
|
|
TimeoutMins *int `json:"timeout_mins,omitempty"` // nil = default 30m; 0 = no limit; >0 = minutes
|
|
CreatedAt time.Time `json:"created_at"`
|
|
Fired bool `json:"fired"`
|
|
FiredAt time.Time `json:"fired_at,omitempty"`
|
|
LastError string `json:"last_error,omitempty"`
|
|
}
|
|
|
|
// IsShellJob returns true if the job runs a shell command directly.
|
|
func (j *TimerJob) IsShellJob() bool {
|
|
return j.Exec != ""
|
|
}
|
|
|
|
const defaultTimerJobTimeout = 30 * time.Minute
|
|
|
|
// ExecutionTimeout returns how long the scheduler waits for the job goroutine to finish.
|
|
func (j *TimerJob) ExecutionTimeout() time.Duration {
|
|
if j.TimeoutMins == nil {
|
|
return defaultTimerJobTimeout
|
|
}
|
|
if *j.TimeoutMins <= 0 {
|
|
return 0
|
|
}
|
|
return time.Duration(*j.TimeoutMins) * time.Minute
|
|
}
|
|
|
|
// UsesNewSessionPerRun reports whether the timer should use a new engine session.
|
|
func (j *TimerJob) UsesNewSessionPerRun() bool {
|
|
return NormalizeCronSessionMode(j.SessionMode) == "new_per_run"
|
|
}
|
|
|
|
func validateTimerJob(j *TimerJob) error {
|
|
if strings.TrimSpace(j.SessionKey) == "" {
|
|
return fmt.Errorf("session_key is required")
|
|
}
|
|
if j.ScheduledAt.IsZero() {
|
|
return fmt.Errorf("scheduled_at is required")
|
|
}
|
|
if j.Prompt == "" && j.Exec == "" {
|
|
return fmt.Errorf("either prompt or exec is required")
|
|
}
|
|
if j.Prompt != "" && j.Exec != "" {
|
|
return fmt.Errorf("prompt and exec are mutually exclusive")
|
|
}
|
|
mode := NormalizeCronSessionMode(j.SessionMode)
|
|
if mode != "" && mode != "new_per_run" {
|
|
return fmt.Errorf("invalid session_mode %q (want reuse, new_per_run, or new-per-run)", j.SessionMode)
|
|
}
|
|
if j.Mode != "" {
|
|
switch j.Mode {
|
|
case "default", "bypassPermissions", "acceptEdits", "plan", "auto", "dontAsk":
|
|
default:
|
|
return fmt.Errorf("invalid mode %q", j.Mode)
|
|
}
|
|
}
|
|
if j.TimeoutMins != nil && *j.TimeoutMins < 0 {
|
|
return fmt.Errorf("timeout_mins must be >= 0")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// TimerStore persists timer jobs to a JSON file.
|
|
type TimerStore struct {
|
|
path string
|
|
mu sync.Mutex
|
|
jobs []*TimerJob
|
|
}
|
|
|
|
func NewTimerStore(dataDir string) (*TimerStore, error) {
|
|
dir := filepath.Join(dataDir, "timers")
|
|
if err := os.MkdirAll(dir, 0o755); err != nil {
|
|
return nil, err
|
|
}
|
|
path := filepath.Join(dir, "jobs.json")
|
|
s := &TimerStore{path: path}
|
|
s.load()
|
|
return s, nil
|
|
}
|
|
|
|
func (s *TimerStore) load() {
|
|
data, err := os.ReadFile(s.path)
|
|
if err != nil {
|
|
return
|
|
}
|
|
if err := json.Unmarshal(data, &s.jobs); err != nil {
|
|
slog.Error("timer: failed to load jobs", "path", s.path, "error", err)
|
|
}
|
|
}
|
|
|
|
func (s *TimerStore) save() error {
|
|
data, err := json.MarshalIndent(s.jobs, "", " ")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return AtomicWriteFile(s.path, data, 0o644)
|
|
}
|
|
|
|
func (s *TimerStore) Add(job *TimerJob) error {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.jobs = append(s.jobs, job)
|
|
return s.save()
|
|
}
|
|
|
|
func (s *TimerStore) Remove(id string) bool {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
for i, j := range s.jobs {
|
|
if j.ID == id {
|
|
s.jobs = append(s.jobs[:i], s.jobs[i+1:]...)
|
|
if err := s.save(); err != nil {
|
|
slog.Warn("timer: failed to save after remove", "error", err)
|
|
}
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (s *TimerStore) SetMute(id string, mute bool) bool {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
for _, j := range s.jobs {
|
|
if j.ID == id {
|
|
j.Mute = mute
|
|
if err := s.save(); err != nil {
|
|
slog.Warn("timer: save after mute toggle", "error", err)
|
|
}
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (s *TimerStore) MarkFired(id string, err error) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
for _, j := range s.jobs {
|
|
if j.ID == id {
|
|
j.Fired = true
|
|
j.FiredAt = time.Now()
|
|
if err != nil {
|
|
j.LastError = err.Error()
|
|
} else {
|
|
j.LastError = ""
|
|
}
|
|
if saveErr := s.save(); saveErr != nil {
|
|
slog.Warn("timer: failed to save after mark fired", "error", saveErr)
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *TimerStore) List() []*TimerJob {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
out := make([]*TimerJob, len(s.jobs))
|
|
copy(out, s.jobs)
|
|
return out
|
|
}
|
|
|
|
func (s *TimerStore) ListByProject(project string) []*TimerJob {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
var out []*TimerJob
|
|
for _, j := range s.jobs {
|
|
if j.Project == project {
|
|
out = append(out, j)
|
|
}
|
|
}
|
|
return out
|
|
}
|
|
|
|
func (s *TimerStore) ListBySessionKey(sessionKey string) []*TimerJob {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
var out []*TimerJob
|
|
for _, j := range s.jobs {
|
|
if j.SessionKey == sessionKey {
|
|
out = append(out, j)
|
|
}
|
|
}
|
|
return out
|
|
}
|
|
|
|
func (s *TimerStore) Get(id string) *TimerJob {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
for _, j := range s.jobs {
|
|
if j.ID == id {
|
|
return j
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ListPending returns all non-fired timer jobs.
|
|
func (s *TimerStore) ListPending() []*TimerJob {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
var out []*TimerJob
|
|
for _, j := range s.jobs {
|
|
if !j.Fired {
|
|
out = append(out, j)
|
|
}
|
|
}
|
|
return out
|
|
}
|
|
|
|
// TimerScheduler runs one-shot timer jobs using time.AfterFunc.
|
|
type TimerScheduler struct {
|
|
store *TimerStore
|
|
engines map[string]*Engine
|
|
mu sync.RWMutex
|
|
timers map[string]*time.Timer // job ID → active timer
|
|
defaultSilent bool
|
|
defaultSessionMode string
|
|
}
|
|
|
|
// missedJobGracePeriod is how long after a missed fire time we still execute.
|
|
// Older missed jobs are logged and skipped.
|
|
const missedJobGracePeriod = 5 * time.Minute
|
|
|
|
func NewTimerScheduler(store *TimerStore) *TimerScheduler {
|
|
return &TimerScheduler{
|
|
store: store,
|
|
engines: make(map[string]*Engine),
|
|
timers: make(map[string]*time.Timer),
|
|
}
|
|
}
|
|
|
|
func (ts *TimerScheduler) RegisterEngine(name string, e *Engine) {
|
|
ts.mu.Lock()
|
|
defer ts.mu.Unlock()
|
|
ts.engines[name] = e
|
|
}
|
|
|
|
// SetDefaultSilent sets the default silent mode for timer jobs.
|
|
// Must be called before Start (not safe for concurrent use).
|
|
func (ts *TimerScheduler) SetDefaultSilent(silent bool) {
|
|
ts.defaultSilent = silent
|
|
}
|
|
|
|
// SetDefaultSessionMode sets the default session mode for timer jobs.
|
|
// Must be called before Start (not safe for concurrent use).
|
|
func (ts *TimerScheduler) SetDefaultSessionMode(mode string) {
|
|
ts.defaultSessionMode = NormalizeCronSessionMode(mode)
|
|
}
|
|
|
|
// IsSilent returns whether the timer job should suppress the start notification.
|
|
func (ts *TimerScheduler) IsSilent(job *TimerJob) bool {
|
|
if job.Silent != nil {
|
|
return *job.Silent
|
|
}
|
|
return ts.defaultSilent
|
|
}
|
|
|
|
// UsesNewSession returns whether the job should create a fresh session.
|
|
func (ts *TimerScheduler) UsesNewSession(job *TimerJob) bool {
|
|
if job.SessionMode != "" {
|
|
return job.UsesNewSessionPerRun()
|
|
}
|
|
return ts.defaultSessionMode == "new_per_run"
|
|
}
|
|
|
|
func (ts *TimerScheduler) Start() error {
|
|
jobs := ts.store.List()
|
|
now := time.Now()
|
|
var scheduled, missed, skipped int
|
|
for _, job := range jobs {
|
|
if job.Fired {
|
|
continue
|
|
}
|
|
delay := job.ScheduledAt.Sub(now)
|
|
if delay <= 0 {
|
|
// Past due
|
|
if -delay <= missedJobGracePeriod {
|
|
// Just missed — fire immediately
|
|
slog.Info("timer: firing missed job immediately", "id", job.ID, "overdue", -delay)
|
|
ts.scheduleAt(job, 0)
|
|
missed++
|
|
} else {
|
|
slog.Warn("timer: skipping stale job", "id", job.ID, "scheduled_at", job.ScheduledAt, "overdue", -delay)
|
|
ts.store.MarkFired(job.ID, fmt.Errorf("missed by %v (stale)", -delay))
|
|
skipped++
|
|
}
|
|
} else {
|
|
ts.scheduleAt(job, delay)
|
|
scheduled++
|
|
}
|
|
}
|
|
slog.Info("timer: scheduler started", "scheduled", scheduled, "missed_fired", missed, "skipped_stale", skipped, "total", len(jobs))
|
|
return nil
|
|
}
|
|
|
|
func (ts *TimerScheduler) Stop() {
|
|
ts.mu.Lock()
|
|
defer ts.mu.Unlock()
|
|
for id, t := range ts.timers {
|
|
t.Stop()
|
|
delete(ts.timers, id)
|
|
}
|
|
}
|
|
|
|
func (ts *TimerScheduler) AddJob(job *TimerJob) error {
|
|
if err := validateTimerJob(job); err != nil {
|
|
return err
|
|
}
|
|
job.SessionMode = NormalizeCronSessionMode(job.SessionMode)
|
|
if err := ts.store.Add(job); err != nil {
|
|
return err
|
|
}
|
|
delay := time.Until(job.ScheduledAt)
|
|
if delay <= 0 {
|
|
// Already due — fire immediately
|
|
slog.Info("timer: new job already due, firing immediately", "id", job.ID)
|
|
ts.scheduleAt(job, 0)
|
|
} else {
|
|
ts.scheduleAt(job, delay)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (ts *TimerScheduler) RemoveJob(id string) bool {
|
|
ts.mu.Lock()
|
|
if t, ok := ts.timers[id]; ok {
|
|
t.Stop()
|
|
delete(ts.timers, id)
|
|
}
|
|
// Remove from store while still holding ts.mu to prevent executeJob
|
|
// from running a job that was just cancelled (the AfterFunc callback
|
|
// also needs ts.mu, so it will see the store entry is gone).
|
|
removed := ts.store.Remove(id)
|
|
ts.mu.Unlock()
|
|
return removed
|
|
}
|
|
|
|
func (ts *TimerScheduler) SetMute(id string, mute bool) bool {
|
|
return ts.store.SetMute(id, mute)
|
|
}
|
|
|
|
func (ts *TimerScheduler) Store() *TimerStore {
|
|
return ts.store
|
|
}
|
|
|
|
func (ts *TimerScheduler) scheduleAt(job *TimerJob, delay time.Duration) {
|
|
jobID := job.ID
|
|
ts.mu.Lock()
|
|
// Stop any existing timer for this job
|
|
if old, ok := ts.timers[jobID]; ok {
|
|
old.Stop()
|
|
}
|
|
// Register timer in map before it can fire, so RemoveJob can always cancel it.
|
|
t := time.AfterFunc(delay, func() {
|
|
ts.mu.Lock()
|
|
delete(ts.timers, jobID)
|
|
ts.mu.Unlock()
|
|
ts.executeJob(jobID)
|
|
})
|
|
ts.timers[jobID] = t
|
|
ts.mu.Unlock()
|
|
}
|
|
|
|
func (ts *TimerScheduler) executeJob(jobID string) {
|
|
job := ts.store.Get(jobID)
|
|
if job == nil || job.Fired {
|
|
return
|
|
}
|
|
|
|
ts.mu.RLock()
|
|
engine, ok := ts.engines[job.Project]
|
|
ts.mu.RUnlock()
|
|
|
|
if !ok {
|
|
slog.Error("timer: project not found", "job", jobID, "project", job.Project)
|
|
ts.store.MarkFired(jobID, fmt.Errorf("project %q not found", job.Project))
|
|
return
|
|
}
|
|
|
|
slog.Info("timer: executing job", "id", jobID, "project", job.Project, "prompt", truncateStr(job.Prompt, 60))
|
|
|
|
done := make(chan error, 1)
|
|
go func() {
|
|
done <- engine.ExecuteTimerJob(job)
|
|
}()
|
|
|
|
var err error
|
|
timeout := job.ExecutionTimeout()
|
|
if timeout > 0 {
|
|
select {
|
|
case err = <-done:
|
|
case <-time.After(timeout):
|
|
err = fmt.Errorf("job timed out after %v", timeout)
|
|
}
|
|
} else {
|
|
err = <-done
|
|
}
|
|
|
|
ts.store.MarkFired(jobID, err)
|
|
|
|
if err != nil {
|
|
slog.Error("timer: job failed", "id", jobID, "error", err)
|
|
} else {
|
|
slog.Info("timer: job completed", "id", jobID)
|
|
}
|
|
}
|
|
|
|
func GenerateTimerID() string {
|
|
b := make([]byte, 4)
|
|
if _, err := rand.Read(b); err != nil {
|
|
panic(fmt.Errorf("generate timer id: %w", err))
|
|
}
|
|
return hex.EncodeToString(b)
|
|
}
|
|
|
|
// ParseDelayOrTime parses a relative duration ("2h", "30m", "1h30m") or
|
|
// an absolute ISO time ("2026-05-15T14:00", "2026-05-15T14:00:00+08:00")
|
|
// and returns the absolute fire time.
|
|
func ParseDelayOrTime(s string) (time.Time, error) {
|
|
s = strings.TrimSpace(s)
|
|
if s == "" {
|
|
return time.Time{}, fmt.Errorf("empty delay or time")
|
|
}
|
|
|
|
// Try as a Go duration first (e.g., "2h", "30m", "1h30m", "2h30m15s")
|
|
if d, err := time.ParseDuration(s); err == nil {
|
|
if d <= 0 {
|
|
return time.Time{}, fmt.Errorf("delay must be positive")
|
|
}
|
|
return time.Now().Add(d), nil
|
|
}
|
|
|
|
// Try ISO time formats
|
|
// RFC3339 includes timezone (e.g. "2026-05-15T14:00:00+08:00"),
|
|
// so it's parsed directly. The other layouts have no timezone
|
|
// and are interpreted in the system's local timezone.
|
|
layouts := []struct {
|
|
layout string
|
|
local bool
|
|
}{
|
|
{time.RFC3339, false},
|
|
{"2006-01-02T15:04:05", true},
|
|
{"2006-01-02T15:04", true},
|
|
{"2006-01-02 15:04:05", true},
|
|
{"2006-01-02 15:04", true},
|
|
}
|
|
for _, l := range layouts {
|
|
if l.local {
|
|
if t, err := time.ParseInLocation(l.layout, s, time.Local); err == nil {
|
|
return t, nil
|
|
}
|
|
} else {
|
|
if t, err := time.Parse(l.layout, s); err == nil {
|
|
return t, nil
|
|
}
|
|
}
|
|
}
|
|
|
|
return time.Time{}, fmt.Errorf("invalid delay or time %q (use duration like 2h30m, or ISO time like 2026-05-15T14:00)", s)
|
|
}
|
|
|
|
// FormatTimerRemaining returns a human-readable string for time remaining
|
|
// until the scheduled fire time.
|
|
func FormatTimerRemaining(scheduledAt time.Time) string {
|
|
d := time.Until(scheduledAt)
|
|
if d <= 0 {
|
|
return "overdue"
|
|
}
|
|
if d < time.Minute {
|
|
secs := int(d.Seconds() + 0.5) // round up for countdown display
|
|
if secs >= 60 {
|
|
return "1m"
|
|
}
|
|
return fmt.Sprintf("%ds", secs)
|
|
}
|
|
if d < time.Hour {
|
|
mins := int(d.Minutes() + 0.5) // round up for countdown display
|
|
if mins >= 60 {
|
|
return "1h"
|
|
}
|
|
return fmt.Sprintf("%dm", mins)
|
|
}
|
|
totalMins := int(d.Minutes() + 0.5) // round up
|
|
hours := totalMins / 60
|
|
mins := totalMins % 60
|
|
if mins == 0 {
|
|
return fmt.Sprintf("%dh", hours)
|
|
}
|
|
return fmt.Sprintf("%dh%dm", hours, mins)
|
|
}
|