Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions internal/store/cron_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ import (
)

var (
ErrCronJobNotFound = errors.New("cron job not found")
ErrCronJobNoFutureRun = errors.New("cron job has no future run")
ErrCronJobNotFound = errors.New("cron job not found")
ErrCronJobNoFutureRun = errors.New("cron job has no future run")
ErrCronJobDuplicateName = errors.New("an active cron job with this name already exists")
)

// CronJob represents a scheduled job.
Expand Down
20 changes: 20 additions & 0 deletions internal/store/pg/cron_crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package pg
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"time"

"github.com/google/uuid"
"github.com/jackc/pgx/v5/pgconn"

"github.com/nextlevelbuilder/goclaw/internal/store"
)
Expand Down Expand Up @@ -74,6 +76,13 @@ func (s *PGCronStore) AddJob(ctx context.Context, name string, schedule store.Cr
intervalMS, payloadJSON, deleteAfterRun, deliver, channel, to, false, nextRun, now, now,
)
if err != nil {
// Unique constraint violation (race condition) — return existing job instead of error.
var pgErr *pgconn.PgError
if errors.As(err, &pgErr) && pgErr.Code == "23505" && pgErr.ConstraintName == "idx_cron_jobs_unique_active_name" {
if existing := s.findJobByName(ctx, name, agentID, userID); existing != nil {
return existing, nil
}
}
return nil, fmt.Errorf("create cron job: %w", err)
}

Expand Down Expand Up @@ -218,3 +227,14 @@ func (s *PGCronStore) EnableJob(ctx context.Context, jobID string, enabled bool)
s.InvalidateCache()
return nil
}

// findJobByName returns the first enabled job matching name for the given agent+user scope.
func (s *PGCronStore) findJobByName(ctx context.Context, name, agentID, userID string) *store.CronJob {
jobs := s.ListJobs(ctx, false, agentID, userID)
for _, j := range jobs {
if j.Name == name {
return &j
}
}
return nil
}
5 changes: 5 additions & 0 deletions internal/store/pg/cron_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/google/uuid"
"github.com/jackc/pgx/v5/pgconn"

"github.com/nextlevelbuilder/goclaw/internal/store"
)
Expand Down Expand Up @@ -212,6 +213,10 @@ func execCronJobUpdateTx(ctx context.Context, tx *sql.Tx, id uuid.UUID, updates

res, err := tx.ExecContext(ctx, q, args...)
if err != nil {
var pgErr *pgconn.PgError
if errors.As(err, &pgErr) && pgErr.Code == "23505" && pgErr.ConstraintName == "idx_cron_jobs_unique_active_name" {
return store.ErrCronJobDuplicateName
}
return err
}
if n, _ := res.RowsAffected(); n == 0 {
Expand Down
20 changes: 20 additions & 0 deletions internal/store/sqlitestore/cron_crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ func (s *SQLiteCronStore) AddJob(ctx context.Context, name string, schedule stor
intervalMS, payloadJSON, deleteAfterRun, deliver, channel, to, false, nextRun, now, now,
)
if err != nil {
// Handle unique constraint violation (race condition with concurrent tool calls).
if strings.Contains(err.Error(), "UNIQUE constraint failed") || strings.Contains(err.Error(), "idx_cron_jobs_unique_active_name") {
if existing := s.findJobByName(ctx, name, agentID, userID); existing != nil {
return existing, nil
}
}
return nil, fmt.Errorf("create cron job: %w", err)
}

Expand Down Expand Up @@ -426,6 +432,9 @@ func execCronJobUpdateTx(ctx context.Context, tx *sql.Tx, id uuid.UUID, updates

res, err := tx.ExecContext(ctx, q, args...)
if err != nil {
if strings.Contains(err.Error(), "UNIQUE constraint failed") {
return store.ErrCronJobDuplicateName
}
return err
}
if n, _ := res.RowsAffected(); n == 0 {
Expand All @@ -434,3 +443,14 @@ func execCronJobUpdateTx(ctx context.Context, tx *sql.Tx, id uuid.UUID, updates
return nil
}

// findJobByName returns the first enabled job matching name for the given agent+user scope.
func (s *SQLiteCronStore) findJobByName(ctx context.Context, name, agentID, userID string) *store.CronJob {
jobs := s.ListJobs(ctx, false, agentID, userID)
for _, j := range jobs {
if j.Name == name {
return &j
}
}
return nil
}

13 changes: 13 additions & 0 deletions internal/tools/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,19 @@ func (t *CronTool) handleAdd(ctx context.Context, args map[string]any, agentID,
return ErrorResult("job.message is required")
}

// Dedup: return existing enabled job with same name for this agent+user.
existingJobs := t.cronStore.ListJobs(ctx, false, agentID, userID)
for _, ej := range existingJobs {
if ej.Name == name {
data, _ := json.MarshalIndent(map[string]any{
"existing": true,
"job": ej,
"note": fmt.Sprintf("cron job %q already exists (id: %s). Use 'update' action to modify it.", name, ej.ID),
}, "", " ")
return NewResult(string(data))
}
}

// Parse schedule
schedule := store.CronSchedule{
Kind: stringFromMap(scheduleObj, "kind"),
Expand Down
2 changes: 1 addition & 1 deletion internal/upgrade/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ package upgrade

// RequiredSchemaVersion is the schema migration version this binary requires.
// Bump this whenever adding a new SQL migration file.
const RequiredSchemaVersion uint = 36
const RequiredSchemaVersion uint = 37
1 change: 1 addition & 0 deletions migrations/000037_cron_unique_active_name.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP INDEX IF EXISTS idx_cron_jobs_unique_active_name;
19 changes: 19 additions & 0 deletions migrations/000037_cron_unique_active_name.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- Rename duplicate active jobs before adding constraint (safe: no data loss, reversible).
-- Oldest job (smallest created_at) keeps its name; newer duplicates get a '-dup-{id[:8]}' suffix.
WITH dupes AS (
SELECT id, ROW_NUMBER() OVER (
PARTITION BY tenant_id, COALESCE(agent_id, '00000000-0000-0000-0000-000000000000'),
COALESCE(user_id, ''), name
ORDER BY created_at ASC
) AS rn
FROM cron_jobs
WHERE enabled = true
)
UPDATE cron_jobs
SET name = name || '-dup-' || substr(id::text, 1, 8)
WHERE id IN (SELECT id FROM dupes WHERE rn > 1);

-- Partial unique index: one active job per name per tenant+agent+user.
CREATE UNIQUE INDEX idx_cron_jobs_unique_active_name
ON cron_jobs (tenant_id, COALESCE(agent_id, '00000000-0000-0000-0000-000000000000'), COALESCE(user_id, ''), name)
WHERE enabled = true;
Loading