diff --git a/internal/store/cron_store.go b/internal/store/cron_store.go index 6f6583310..49bce8843 100644 --- a/internal/store/cron_store.go +++ b/internal/store/cron_store.go @@ -12,8 +12,9 @@ import ( ) var ( - ErrCronJobNotFound = errors.New("cron job not found") - ErrCronJobNoFutureRun = errors.New("cron job has no future run") + ErrCronJobNotFound = errors.New("cron job not found") + ErrCronJobNoFutureRun = errors.New("cron job has no future run") + ErrCronJobDuplicateName = errors.New("an active cron job with this name already exists") ) // CronJob represents a scheduled job. diff --git a/internal/store/pg/cron_crud.go b/internal/store/pg/cron_crud.go index aad26ee3d..5618caf74 100644 --- a/internal/store/pg/cron_crud.go +++ b/internal/store/pg/cron_crud.go @@ -3,11 +3,13 @@ package pg import ( "context" "encoding/json" + "errors" "fmt" "log/slog" "time" "github.com/google/uuid" + "github.com/jackc/pgx/v5/pgconn" "github.com/nextlevelbuilder/goclaw/internal/store" ) @@ -74,6 +76,13 @@ func (s *PGCronStore) AddJob(ctx context.Context, name string, schedule store.Cr intervalMS, payloadJSON, deleteAfterRun, deliver, channel, to, false, nextRun, now, now, ) if err != nil { + // Unique constraint violation (race condition) — return existing job instead of error. + var pgErr *pgconn.PgError + if errors.As(err, &pgErr) && pgErr.Code == "23505" && pgErr.ConstraintName == "idx_cron_jobs_unique_active_name" { + if existing := s.findJobByName(ctx, name, agentID, userID); existing != nil { + return existing, nil + } + } return nil, fmt.Errorf("create cron job: %w", err) } @@ -218,3 +227,14 @@ func (s *PGCronStore) EnableJob(ctx context.Context, jobID string, enabled bool) s.InvalidateCache() return nil } + +// findJobByName returns the first enabled job matching name for the given agent+user scope. +func (s *PGCronStore) findJobByName(ctx context.Context, name, agentID, userID string) *store.CronJob { + jobs := s.ListJobs(ctx, false, agentID, userID) + for _, j := range jobs { + if j.Name == name { + return &j + } + } + return nil +} diff --git a/internal/store/pg/cron_update.go b/internal/store/pg/cron_update.go index a7c859704..bca1a37d8 100644 --- a/internal/store/pg/cron_update.go +++ b/internal/store/pg/cron_update.go @@ -10,6 +10,7 @@ import ( "time" "github.com/google/uuid" + "github.com/jackc/pgx/v5/pgconn" "github.com/nextlevelbuilder/goclaw/internal/store" ) @@ -212,6 +213,10 @@ func execCronJobUpdateTx(ctx context.Context, tx *sql.Tx, id uuid.UUID, updates res, err := tx.ExecContext(ctx, q, args...) if err != nil { + var pgErr *pgconn.PgError + if errors.As(err, &pgErr) && pgErr.Code == "23505" && pgErr.ConstraintName == "idx_cron_jobs_unique_active_name" { + return store.ErrCronJobDuplicateName + } return err } if n, _ := res.RowsAffected(); n == 0 { diff --git a/internal/store/sqlitestore/cron_crud.go b/internal/store/sqlitestore/cron_crud.go index cefcdacaa..875382a0e 100644 --- a/internal/store/sqlitestore/cron_crud.go +++ b/internal/store/sqlitestore/cron_crud.go @@ -77,6 +77,12 @@ func (s *SQLiteCronStore) AddJob(ctx context.Context, name string, schedule stor intervalMS, payloadJSON, deleteAfterRun, deliver, channel, to, false, nextRun, now, now, ) if err != nil { + // Handle unique constraint violation (race condition with concurrent tool calls). + if strings.Contains(err.Error(), "UNIQUE constraint failed") || strings.Contains(err.Error(), "idx_cron_jobs_unique_active_name") { + if existing := s.findJobByName(ctx, name, agentID, userID); existing != nil { + return existing, nil + } + } return nil, fmt.Errorf("create cron job: %w", err) } @@ -426,6 +432,9 @@ func execCronJobUpdateTx(ctx context.Context, tx *sql.Tx, id uuid.UUID, updates res, err := tx.ExecContext(ctx, q, args...) if err != nil { + if strings.Contains(err.Error(), "UNIQUE constraint failed") { + return store.ErrCronJobDuplicateName + } return err } if n, _ := res.RowsAffected(); n == 0 { @@ -434,3 +443,14 @@ func execCronJobUpdateTx(ctx context.Context, tx *sql.Tx, id uuid.UUID, updates return nil } +// findJobByName returns the first enabled job matching name for the given agent+user scope. +func (s *SQLiteCronStore) findJobByName(ctx context.Context, name, agentID, userID string) *store.CronJob { + jobs := s.ListJobs(ctx, false, agentID, userID) + for _, j := range jobs { + if j.Name == name { + return &j + } + } + return nil +} + diff --git a/internal/tools/cron.go b/internal/tools/cron.go index 7eaab3ad5..06389fe48 100644 --- a/internal/tools/cron.go +++ b/internal/tools/cron.go @@ -211,6 +211,19 @@ func (t *CronTool) handleAdd(ctx context.Context, args map[string]any, agentID, return ErrorResult("job.message is required") } + // Dedup: return existing enabled job with same name for this agent+user. + existingJobs := t.cronStore.ListJobs(ctx, false, agentID, userID) + for _, ej := range existingJobs { + if ej.Name == name { + data, _ := json.MarshalIndent(map[string]any{ + "existing": true, + "job": ej, + "note": fmt.Sprintf("cron job %q already exists (id: %s). Use 'update' action to modify it.", name, ej.ID), + }, "", " ") + return NewResult(string(data)) + } + } + // Parse schedule schedule := store.CronSchedule{ Kind: stringFromMap(scheduleObj, "kind"), diff --git a/internal/upgrade/version.go b/internal/upgrade/version.go index 53f28a101..906468fda 100644 --- a/internal/upgrade/version.go +++ b/internal/upgrade/version.go @@ -2,4 +2,4 @@ package upgrade // RequiredSchemaVersion is the schema migration version this binary requires. // Bump this whenever adding a new SQL migration file. -const RequiredSchemaVersion uint = 36 +const RequiredSchemaVersion uint = 37 diff --git a/migrations/000037_cron_unique_active_name.down.sql b/migrations/000037_cron_unique_active_name.down.sql new file mode 100644 index 000000000..ccdc9852f --- /dev/null +++ b/migrations/000037_cron_unique_active_name.down.sql @@ -0,0 +1 @@ +DROP INDEX IF EXISTS idx_cron_jobs_unique_active_name; diff --git a/migrations/000037_cron_unique_active_name.up.sql b/migrations/000037_cron_unique_active_name.up.sql new file mode 100644 index 000000000..8f70f1522 --- /dev/null +++ b/migrations/000037_cron_unique_active_name.up.sql @@ -0,0 +1,19 @@ +-- Rename duplicate active jobs before adding constraint (safe: no data loss, reversible). +-- Oldest job (smallest created_at) keeps its name; newer duplicates get a '-dup-{id[:8]}' suffix. +WITH dupes AS ( + SELECT id, ROW_NUMBER() OVER ( + PARTITION BY tenant_id, COALESCE(agent_id, '00000000-0000-0000-0000-000000000000'), + COALESCE(user_id, ''), name + ORDER BY created_at ASC + ) AS rn + FROM cron_jobs + WHERE enabled = true +) +UPDATE cron_jobs +SET name = name || '-dup-' || substr(id::text, 1, 8) +WHERE id IN (SELECT id FROM dupes WHERE rn > 1); + +-- Partial unique index: one active job per name per tenant+agent+user. +CREATE UNIQUE INDEX idx_cron_jobs_unique_active_name + ON cron_jobs (tenant_id, COALESCE(agent_id, '00000000-0000-0000-0000-000000000000'), COALESCE(user_id, ''), name) + WHERE enabled = true;