diff --git a/db/migrations/check_in_constraints_migration.sql b/db/migrations/check_in_constraints_migration.sql new file mode 100644 index 0000000..e43fafe --- /dev/null +++ b/db/migrations/check_in_constraints_migration.sql @@ -0,0 +1,86 @@ +-- Migration: Add CHECK constraints to check-in tables +-- Purpose: Enforce data integrity constraints identified in code review +-- Related: TASK-002 Critical Issue #3 + +-- Add CHECK constraints to agent_check_in_configs table +ALTER TABLE agent_check_in_configs +ADD CONSTRAINT chk_timeout_seconds_positive +CHECK (timeout_seconds >= 0); + +ALTER TABLE agent_check_in_configs +ADD CONSTRAINT chk_retry_attempts_non_negative +CHECK (retry_attempts >= 0); + +ALTER TABLE agent_check_in_configs +ADD CONSTRAINT chk_retry_delay_seconds_positive +CHECK (retry_delay_seconds >= 0); + +-- Add CHECK constraints to check_in_events table +ALTER TABLE check_in_events +ADD CONSTRAINT chk_retry_count_non_negative +CHECK (retry_count >= 0); + +ALTER TABLE check_in_events +ADD CONSTRAINT chk_latency_non_negative +CHECK (response_latency_ms IS NULL OR response_latency_ms >= 0); + +ALTER TABLE check_in_events +ADD CONSTRAINT chk_triggered_after_scheduled +CHECK (triggered_at >= scheduled_at OR triggered_at = '0001-01-01 00:00:00'); + +-- Add response consistency constraint (ensure response_received matches response_at state) +ALTER TABLE check_in_events +ADD CONSTRAINT chk_response_consistency +CHECK ( + (response_received = FALSE AND response_at IS NULL) OR + (response_received = TRUE AND response_at IS NOT NULL) +); + +-- Note: Cron schedule regex validation is enforced at API layer using robfig/cron parser +-- PostgreSQL CHECK constraints with regex are limited and can cause performance issues +-- See handlers_checkin.go lines 104-109 for cron validation via cron.NewParser() + +-- Add foreign key constraints for referential integrity and CASCADE cleanup +ALTER TABLE agent_check_in_configs +ADD CONSTRAINT fk_agent_checkin_config_agent +FOREIGN KEY (space_name, agent_name) +REFERENCES agents(space_name, agent_name) +ON DELETE CASCADE; + +ALTER TABLE check_in_events +ADD CONSTRAINT fk_checkin_event_config +FOREIGN KEY (space_name, agent_name) +REFERENCES agent_check_in_configs(space_name, agent_name) +ON DELETE CASCADE; + +-- Verify indexes exist (these should be auto-created by GORM, but we verify) +-- Partial indexes: +-- 1. idx_checkin_enabled on agent_check_in_configs WHERE check_in_enabled = true +-- 2. idx_pending_checkins on check_in_events WHERE response_received = false AND message_sent = true + +-- Composite indexes: +-- 3. idx_checkin_space_agent on agent_check_in_configs (space_name, agent_name) - unique +-- 4. idx_event_space_agent on check_in_events (space_name, agent_name) +-- 5. idx_event_agent_time on check_in_events (agent_name, triggered_at DESC) + +-- Regular indexes: +-- 6. agent_check_in_configs.space_name +-- 7. check_in_events.space_name +-- 8. check_in_events.agent_name +-- 9. check_in_events.message_id +-- 10. check_in_scheduler_locks.locked_at +-- 11. 
check_in_scheduler_locks.expires_at
+
+-- Query to verify all indexes are created:
+-- SELECT tablename, indexname, indexdef FROM pg_indexes
+-- WHERE tablename IN ('agent_check_in_configs', 'check_in_events', 'check_in_scheduler_locks')
+-- ORDER BY tablename, indexname;
+
+-- Query to verify CHECK constraints:
+-- SELECT conname, contype, pg_get_constraintdef(oid)
+-- FROM pg_constraint
+-- WHERE conrelid IN (
+--   'agent_check_in_configs'::regclass,
+--   'check_in_events'::regclass
+-- ) AND contype = 'c'
+-- ORDER BY conrelid::regclass::text, conname;
diff --git a/db/migrations/check_in_constraints_rollback.sql b/db/migrations/check_in_constraints_rollback.sql
new file mode 100644
index 0000000..4a4d2fc
--- /dev/null
+++ b/db/migrations/check_in_constraints_rollback.sql
@@ -0,0 +1,41 @@
+-- Rollback Migration: Remove CHECK constraints and FK constraints from check-in tables
+-- Purpose: Rollback script for check_in_constraints_migration.sql
+-- Related: TASK-002 Critical Issue #3
+
+-- Remove FK constraints first (order matters for dependencies)
+ALTER TABLE check_in_events
+DROP CONSTRAINT IF EXISTS fk_checkin_event_config;
+
+ALTER TABLE agent_check_in_configs
+DROP CONSTRAINT IF EXISTS fk_agent_checkin_config_agent;
+
+-- Remove CHECK constraints from check_in_events table
+ALTER TABLE check_in_events
+DROP CONSTRAINT IF EXISTS chk_response_consistency;
+
+ALTER TABLE check_in_events
+DROP CONSTRAINT IF EXISTS chk_triggered_after_scheduled;
+
+ALTER TABLE check_in_events
+DROP CONSTRAINT IF EXISTS chk_latency_non_negative;
+
+ALTER TABLE check_in_events
+DROP CONSTRAINT IF EXISTS chk_retry_count_non_negative;
+
+-- Remove CHECK constraints from agent_check_in_configs table
+ALTER TABLE agent_check_in_configs
+DROP CONSTRAINT IF EXISTS chk_retry_delay_seconds_positive;
+
+ALTER TABLE agent_check_in_configs
+DROP CONSTRAINT IF EXISTS chk_retry_attempts_non_negative;
+
+ALTER TABLE agent_check_in_configs
+DROP CONSTRAINT IF EXISTS chk_timeout_seconds_positive;
+
+-- Note: This rollback does NOT remove indexes or tables, only the CHECK and FK constraints
+-- To fully roll back the check-in feature, use the full rollback migration below
+
+-- FULL ROLLBACK (USE WITH CAUTION - DESTRUCTIVE):
+-- DROP TABLE IF EXISTS check_in_events CASCADE;
+-- DROP TABLE IF EXISTS agent_check_in_configs CASCADE;
+-- DROP TABLE IF EXISTS check_in_scheduler_locks CASCADE;
diff --git a/db/migrations/verify_check_in_indexes.sql b/db/migrations/verify_check_in_indexes.sql
new file mode 100644
index 0000000..c6e0bc7
--- /dev/null
+++ b/db/migrations/verify_check_in_indexes.sql
@@ -0,0 +1,102 @@
+-- Verification Script: Check-in System Database Indexes
+-- Purpose: Verify all required indexes exist for the check-in feature
+-- Related: TASK-002 Critical Issue #3
+
+-- List all indexes on check-in tables (PostgreSQL)
+SELECT
+    tablename,
+    indexname,
+    indexdef
+FROM pg_indexes
+WHERE tablename IN ('agent_check_in_configs', 'check_in_events', 'check_in_scheduler_locks')
+ORDER BY tablename, indexname;
+
+-- Expected indexes (14 total):
+
+-- agent_check_in_configs (4 indexes):
+-- 1. agent_check_in_configs_pkey (PRIMARY KEY on id)
+-- 2. idx_checkin_space_agent (UNIQUE on space_name, agent_name)
+-- 3. idx_checkin_enabled (PARTIAL INDEX WHERE check_in_enabled = true)
+-- 4. idx_agent_check_in_configs_space_name (on space_name)
+
+-- check_in_events (7 indexes):
+-- 5. check_in_events_pkey (PRIMARY KEY on id)
+-- 6. idx_event_space_agent (on space_name, agent_name)
+-- 7.
idx_event_agent_time (on agent_name, triggered_at DESC) +-- 8. idx_pending_checkins (PARTIAL INDEX WHERE response_received = false AND message_sent = true) +-- 9. idx_check_in_events_space_name (on space_name) +-- 10. idx_check_in_events_agent_name (on agent_name) +-- 11. idx_check_in_events_message_id (on message_id) + +-- check_in_scheduler_locks (3 indexes): +-- 12. check_in_scheduler_locks_pkey (PRIMARY KEY on id) +-- 13. idx_check_in_scheduler_locks_locked_at (on locked_at) +-- 14. idx_check_in_scheduler_locks_expires_at (on expires_at) + +-- Total: 14 indexes (including 3 primary keys) +-- Partial indexes: 2 (idx_checkin_enabled, idx_pending_checkins) +-- Unique indexes: 1 (idx_checkin_space_agent) + +-- Verify partial indexes specifically: +SELECT + schemaname, + tablename, + indexname, + indexdef +FROM pg_indexes +WHERE tablename IN ('agent_check_in_configs', 'check_in_events') + AND indexdef LIKE '%WHERE%' +ORDER BY tablename, indexname; + +-- Expected output: +-- agent_check_in_configs | idx_checkin_enabled | ... WHERE check_in_enabled = true +-- check_in_events | idx_pending_checkins | ... WHERE response_received = false AND message_sent = true + +-- Verify CHECK constraints: +SELECT + conrelid::regclass AS table_name, + conname AS constraint_name, + pg_get_constraintdef(oid) AS constraint_definition +FROM pg_constraint +WHERE conrelid IN ( + 'agent_check_in_configs'::regclass, + 'check_in_events'::regclass +) +AND contype = 'c' +ORDER BY conrelid::regclass::text, conname; + +-- Expected CHECK constraints (7 total): +-- agent_check_in_configs | chk_timeout_seconds_positive | CHECK (timeout_seconds >= 0) +-- agent_check_in_configs | chk_retry_attempts_non_negative | CHECK (retry_attempts >= 0) +-- agent_check_in_configs | chk_retry_delay_seconds_positive | CHECK (retry_delay_seconds >= 0) +-- check_in_events | chk_retry_count_non_negative | CHECK (retry_count >= 0) +-- check_in_events | chk_latency_non_negative | CHECK (response_latency_ms IS NULL OR response_latency_ms >= 0) +-- check_in_events | chk_triggered_after_scheduled | CHECK (triggered_at >= scheduled_at OR triggered_at = '0001-01-01 00:00:00') +-- check_in_events | chk_response_consistency | CHECK ((response_received = FALSE AND response_at IS NULL) OR (response_received = TRUE AND response_at IS NOT NULL)) + +-- Note: Cron schedule regex validation is enforced at API layer (handlers_checkin.go:104-109) +-- using robfig/cron parser. Database CHECK constraints with regex cause performance issues. 
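+
+-- Optional smoke test for the CHECK constraints (a sketch; the column list is
+-- assumed from the GORM models in models.go and is not part of the official
+-- migration). The INSERT below should fail with check_violation (SQLSTATE
+-- 23514) because timeout_seconds = -1 violates chk_timeout_seconds_positive:
+-- BEGIN;
+-- INSERT INTO agent_check_in_configs
+--   (space_name, agent_name, check_in_enabled, cron_schedule, idle_only,
+--    timeout_seconds, retry_attempts, retry_delay_seconds, created_at, updated_at)
+-- VALUES ('demo', 'agent-a', true, '*/15 * * * *', true, -1, 3, 60, now(), now());
+-- ROLLBACK;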
+ +-- Verify foreign key constraints: +SELECT + conrelid::regclass AS table_name, + conname AS constraint_name, + pg_get_constraintdef(oid) AS constraint_definition +FROM pg_constraint +WHERE conrelid IN ( + 'agent_check_in_configs'::regclass, + 'check_in_events'::regclass +) +AND contype = 'f' +ORDER BY conrelid::regclass::text, conname; + +-- Expected FK constraints (2 total): +-- agent_check_in_configs | fk_agent_checkin_config_agent | FOREIGN KEY (space_name, agent_name) REFERENCES agents(space_name, agent_name) ON DELETE CASCADE +-- check_in_events | fk_checkin_event_config | FOREIGN KEY (space_name, agent_name) REFERENCES agent_check_in_configs(space_name, agent_name) ON DELETE CASCADE + +-- For SQLite (used in development): +-- SQLite stores index info differently, use: +-- SELECT name, sql FROM sqlite_master WHERE type='index' AND tbl_name IN ('agent_check_in_configs', 'check_in_events', 'check_in_scheduler_locks'); +-- For FK constraints in SQLite: +-- PRAGMA foreign_key_list('agent_check_in_configs'); +-- PRAGMA foreign_key_list('check_in_events'); diff --git a/go.mod b/go.mod index af67317..551fa22 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,8 @@ require ( ) require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/glebarez/go-sqlite v1.21.2 // indirect github.com/google/jsonschema-go v0.4.2 // indirect @@ -21,16 +23,24 @@ require ( github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect github.com/mattn/go-isatty v0.0.17 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/prometheus/client_golang v1.23.2 // indirect + github.com/prometheus/client_model v0.6.2 // indirect + github.com/prometheus/common v0.66.1 // indirect + github.com/prometheus/procfs v0.16.1 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect + github.com/robfig/cron/v3 v3.0.1 // indirect github.com/segmentio/asm v1.1.3 // indirect github.com/segmentio/encoding v0.5.3 // indirect - github.com/stretchr/testify v1.9.0 // indirect + github.com/stretchr/testify v1.11.1 // indirect github.com/yosida95/uritemplate/v3 v3.0.2 // indirect + go.yaml.in/yaml/v2 v2.4.2 // indirect golang.org/x/crypto v0.31.0 // indirect golang.org/x/oauth2 v0.34.0 // indirect - golang.org/x/sync v0.10.0 // indirect + golang.org/x/sync v0.16.0 // indirect golang.org/x/sys v0.40.0 // indirect - golang.org/x/text v0.21.0 // indirect + golang.org/x/text v0.28.0 // indirect + google.golang.org/protobuf v1.36.8 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect modernc.org/libc v1.22.5 // indirect modernc.org/mathutil v1.5.0 // indirect diff --git a/go.sum b/go.sum index a319b39..dec326a 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,7 @@ +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -33,11 +37,23 @@ github.com/mattn/go-isatty v0.0.17 
h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPn github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/modelcontextprotocol/go-sdk v1.4.0 h1:u0kr8lbJc1oBcawK7Df+/ajNMpIDFE41OEPxdeTLOn8= github.com/modelcontextprotocol/go-sdk v1.4.0/go.mod h1:Nxc2n+n/GdCebUaqCOhTetptS17SXXNu9IfNTaLDi1E= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o= +github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg= +github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= +github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= +github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9ZoGs= +github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA= +github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg= +github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/segmentio/asm v1.1.3 h1:WM03sfUOENvvKexOLp+pCqgb/WDjsi7EK8gIsICtzhc= github.com/segmentio/asm v1.1.3/go.mod h1:Ld3L4ZXGNcSLRg4JBsZ3//1+f/TjYl0Mzen/DQy1EJg= github.com/segmentio/encoding v0.5.3 h1:OjMgICtcSFuNvQCdwqMCv9Tg7lEOXGwm1J5RPQccx6w= @@ -47,21 +63,30 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/yosida95/uritemplate/v3 v3.0.2 h1:Ed3Oyj9yrmi9087+NczuL5BwkIc4wvTb5zIM+UJPGz4= github.com/yosida95/uritemplate/v3 v3.0.2/go.mod h1:ILOh0sOhIJR3+L/8afwt/kE++YT040gmv5BQTMR2HP4= +go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI= +go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU= golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/oauth2 v0.34.0 h1:hqK/t4AKgbqWkdkcAeI8XLmbK+4m4G5YeQRrmiotGlw= golang.org/x/oauth2 v0.34.0/go.mod h1:lzm5WQJQwKZ3nwavOZ3IS5Aulzxi68dUSgRHujetwEA= golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= 
+golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw= +golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ= golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= +golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng= +golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU= golang.org/x/tools v0.41.0 h1:a9b8iMweWG+S0OBnlU36rzLp20z1Rp10w+IY2czHTQc= golang.org/x/tools v0.41.0/go.mod h1:XSY6eDqxVNiYgezAVqqCeihT4j1U2CCsqvH3WhQpnlg= +google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc= +google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/internal/coordinator/checkin/metrics.go b/internal/coordinator/checkin/metrics.go new file mode 100644 index 0000000..8b2cc72 --- /dev/null +++ b/internal/coordinator/checkin/metrics.go @@ -0,0 +1,106 @@ +package checkin + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +// Metrics holds Prometheus metrics for the check-in system. +type Metrics struct { + // Scheduler metrics + CheckInsTriggered prometheus.Counter + CheckInsFailed prometheus.Counter + CheckInsSucceeded prometheus.Counter + CheckInsTimedOut prometheus.Counter + ResponseLatency prometheus.Histogram + + // Retry metrics + CheckInRetries prometheus.Counter + MaxRetriesExceeded prometheus.Counter + + // Leader election metrics + LeaderElections prometheus.Counter + LeadershipDuration prometheus.Gauge + LockRenewals prometheus.Counter + LockFailures prometheus.Counter + + // Configuration metrics + ActiveConfigs prometheus.Gauge + ConfigChanges prometheus.Counter + + // Agent metrics + IdleOnlySkipped prometheus.Counter + DuplicateSkipped prometheus.Counter + MessageDeliveryFailures prometheus.Counter +} + +// NewMetrics creates and registers Prometheus metrics for check-ins. 
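+//
+// Note: promauto registers every collector on prometheus.DefaultRegisterer,
+// so NewMetrics must be called at most once per process; a second call would
+// panic with a duplicate-registration error.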
+func NewMetrics() *Metrics { + return &Metrics{ + CheckInsTriggered: promauto.NewCounter(prometheus.CounterOpts{ + Name: "checkin_triggers_total", + Help: "Total number of check-ins triggered", + }), + CheckInsFailed: promauto.NewCounter(prometheus.CounterOpts{ + Name: "checkin_failures_total", + Help: "Total number of check-ins that failed (max retries exceeded)", + }), + CheckInsSucceeded: promauto.NewCounter(prometheus.CounterOpts{ + Name: "checkin_successes_total", + Help: "Total number of successful check-ins (agent responded)", + }), + CheckInsTimedOut: promauto.NewCounter(prometheus.CounterOpts{ + Name: "checkin_timeouts_total", + Help: "Total number of check-ins that timed out", + }), + ResponseLatency: promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "checkin_response_latency_seconds", + Help: "Check-in response latency in seconds", + Buckets: []float64{1, 5, 10, 30, 60, 120, 300, 600}, // 1s to 10min + }), + CheckInRetries: promauto.NewCounter(prometheus.CounterOpts{ + Name: "checkin_retries_total", + Help: "Total number of check-in retry attempts", + }), + MaxRetriesExceeded: promauto.NewCounter(prometheus.CounterOpts{ + Name: "checkin_max_retries_exceeded_total", + Help: "Total number of check-ins that exceeded max retry attempts", + }), + LeaderElections: promauto.NewCounter(prometheus.CounterOpts{ + Name: "checkin_leader_elections_total", + Help: "Total number of leader elections (instance became leader)", + }), + LeadershipDuration: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "checkin_leadership_seconds", + Help: "Duration in seconds since this instance became leader (0 if not leader)", + }), + LockRenewals: promauto.NewCounter(prometheus.CounterOpts{ + Name: "checkin_lock_renewals_total", + Help: "Total number of successful leader lock renewals", + }), + LockFailures: promauto.NewCounter(prometheus.CounterOpts{ + Name: "checkin_lock_failures_total", + Help: "Total number of leader lock renewal failures", + }), + ActiveConfigs: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "checkin_active_configs", + Help: "Number of currently active check-in configurations", + }), + ConfigChanges: promauto.NewCounter(prometheus.CounterOpts{ + Name: "checkin_config_changes_total", + Help: "Total number of check-in configuration changes (create/update/delete)", + }), + IdleOnlySkipped: promauto.NewCounter(prometheus.CounterOpts{ + Name: "checkin_idle_only_skipped_total", + Help: "Total number of check-ins skipped because agent was not idle", + }), + DuplicateSkipped: promauto.NewCounter(prometheus.CounterOpts{ + Name: "checkin_duplicate_skipped_total", + Help: "Total number of check-ins skipped due to pending check-in exists", + }), + MessageDeliveryFailures: promauto.NewCounter(prometheus.CounterOpts{ + Name: "checkin_message_delivery_failures_total", + Help: "Total number of check-in message delivery failures", + }), + } +} diff --git a/internal/coordinator/checkin/response_tracker.go b/internal/coordinator/checkin/response_tracker.go new file mode 100644 index 0000000..2323681 --- /dev/null +++ b/internal/coordinator/checkin/response_tracker.go @@ -0,0 +1,194 @@ +package checkin + +import ( + "fmt" + "log" + "time" + + "github.com/ambient/platform/components/boss/internal/coordinator/db" +) + +// ResponseTracker monitors agent status updates and correlates them with pending check-ins. +type ResponseTracker struct { + repo *db.Repository + metrics *Metrics +} + +// NewResponseTracker creates a new response tracker instance. 
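+//
+// Typical wiring (a sketch; the periodic caller shown here is assumed and is
+// not part of this diff):
+//
+//	tracker := checkin.NewResponseTracker(repo, scheduler.GetMetrics())
+//	go func() {
+//		for range time.Tick(30 * time.Second) {
+//			if err := tracker.CheckPendingEvents(); err != nil {
+//				log.Printf("[response tracker] %v", err)
+//			}
+//		}
+//	}()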
+func NewResponseTracker(repo *db.Repository, metrics *Metrics) *ResponseTracker { + return &ResponseTracker{ + repo: repo, + metrics: metrics, + } +} + +// CheckPendingEvents checks for pending check-in events and validates responses. +// Should be called periodically (e.g., every 30 seconds). +func (rt *ResponseTracker) CheckPendingEvents() error { + // Get all pending check-ins from the last 10 minutes + events, err := rt.repo.GetPendingCheckInEvents(10) + if err != nil { + return fmt.Errorf("get pending events: %w", err) + } + + now := time.Now().UTC() + + for _, event := range events { + // Get current agent status + agent, err := rt.repo.GetAgent(event.SpaceName, event.AgentName) + if err != nil { + log.Printf("[response tracker] error getting agent %s/%s: %v", + event.SpaceName, event.AgentName, err) + continue + } + if agent == nil { + log.Printf("[response tracker] agent %s/%s not found for event %s", + event.SpaceName, event.AgentName, event.ID) + continue + } + + // Check if agent has updated their status since the check-in was triggered + // Agent's UpdatedAt timestamp should be after the check-in was triggered + if agent.UpdatedAt.After(event.TriggeredAt) { + // Agent has posted a status update - consider this a response + event.ResponseReceived = true + event.ResponseAt.Time = agent.UpdatedAt + event.ResponseAt.Valid = true + + // Calculate response latency + latencyMs := agent.UpdatedAt.Sub(event.TriggeredAt).Milliseconds() + event.ResponseLatencyMs.Int64 = latencyMs + event.ResponseLatencyMs.Valid = true + + event.StatusAfterCheckIn = agent.Status + + if err := rt.repo.UpdateCheckInEvent(event); err != nil { + log.Printf("[response tracker] error updating event %s: %v", event.ID, err) + continue + } + + log.Printf("[response tracker] check-in response received for %s/%s (latency: %dms)", + event.SpaceName, event.AgentName, latencyMs) + + // Track metrics + rt.metrics.CheckInsSucceeded.Inc() + rt.metrics.ResponseLatency.Observe(float64(latencyMs) / 1000.0) // convert to seconds + + // Update the config's last_check_in_at timestamp + if err := rt.updateLastCheckIn(event.SpaceName, event.AgentName); err != nil { + log.Printf("[response tracker] error updating last check-in time: %v", err) + } + } else { + // Check if timeout has been exceeded + cfg, err := rt.repo.GetCheckInConfig(event.SpaceName, event.AgentName) + if err != nil { + log.Printf("[response tracker] error getting config for %s/%s: %v", + event.SpaceName, event.AgentName, err) + continue + } + if cfg == nil { + continue + } + + timeoutDuration := time.Duration(cfg.TimeoutSeconds) * time.Second + if now.Sub(event.TriggeredAt) > timeoutDuration { + // Timeout exceeded - check if we should retry + if event.RetryCount < cfg.RetryAttempts { + // Calculate retry delay with exponential backoff: 1x, 2x, 4x + retryDelay := time.Duration(cfg.RetryDelaySeconds) * time.Second + backoffMultiplier := 1 << event.RetryCount // 2^retryCount + retryDelay = retryDelay * time.Duration(backoffMultiplier) + + scheduledFor := now.Add(retryDelay) + + log.Printf("[response tracker] check-in timeout for %s/%s, scheduling retry %d/%d (delay: %s, scheduled for: %s)", + event.SpaceName, event.AgentName, event.RetryCount+1, cfg.RetryAttempts, retryDelay, scheduledFor.Format(time.RFC3339)) + + // Mark original event as timed out + event.ErrorMessage = fmt.Sprintf("timeout after %s, retry scheduled", timeoutDuration) + event.ResponseReceived = false + + if err := rt.repo.UpdateCheckInEvent(event); err != nil { + log.Printf("[response tracker] 
error updating event %s: %v", event.ID, err) + } + + // Create a new check-in event for the retry + retryEvent := &db.CheckInEvent{ + ID: fmt.Sprintf("%s-retry-%d", event.ID, event.RetryCount+1), + SpaceName: event.SpaceName, + AgentName: event.AgentName, + ScheduledAt: scheduledFor, + TriggeredAt: time.Time{}, // Will be set when actually triggered + AgentStatus: event.AgentStatus, + MessageSent: false, + ResponseReceived: false, + RetryCount: event.RetryCount + 1, + } + + if err := rt.repo.CreateCheckInEvent(retryEvent); err != nil { + log.Printf("[response tracker] error creating retry event: %v", err) + rt.metrics.MessageDeliveryFailures.Inc() + } else { + rt.metrics.CheckInRetries.Inc() + log.Printf("[response tracker] retry event created: %s (scheduled for %s)", + retryEvent.ID, scheduledFor.Format(time.RFC3339)) + } + } else { + // Max retries exceeded - mark as failed + event.ResponseReceived = false + event.ErrorMessage = fmt.Sprintf("max retries (%d) exceeded, no response after %s", + cfg.RetryAttempts, timeoutDuration) + + // Track metrics + rt.metrics.MaxRetriesExceeded.Inc() + rt.metrics.CheckInsFailed.Inc() + + if err := rt.repo.UpdateCheckInEvent(event); err != nil { + log.Printf("[response tracker] error updating event %s: %v", event.ID, err) + } + + log.Printf("[response tracker] check-in failed for %s/%s: max retries exceeded", + event.SpaceName, event.AgentName) + } + } + } + } + + return nil +} + +// updateLastCheckIn updates the last_check_in_at timestamp for a config. +func (rt *ResponseTracker) updateLastCheckIn(spaceName, agentName string) error { + cfg, err := rt.repo.GetCheckInConfig(spaceName, agentName) + if err != nil { + return err + } + if cfg == nil { + return fmt.Errorf("config not found") + } + + cfg.LastCheckInAt.Time = time.Now().UTC() + cfg.LastCheckInAt.Valid = true + + return rt.repo.UpsertCheckInConfig(cfg) +} + +// ValidateResponse checks if an agent's status update counts as a valid check-in response. +func (rt *ResponseTracker) ValidateResponse(spaceName, agentName string, updateTime time.Time) (bool, error) { + // Get pending check-ins for this agent + events, err := rt.repo.GetPendingCheckInEvents(10) + if err != nil { + return false, fmt.Errorf("get pending events: %w", err) + } + + for _, event := range events { + if event.SpaceName == spaceName && event.AgentName == agentName { + // Check if this update is after the check-in was triggered + if updateTime.After(event.TriggeredAt) { + return true, nil + } + } + } + + return false, nil +} diff --git a/internal/coordinator/checkin/scheduler.go b/internal/coordinator/checkin/scheduler.go new file mode 100644 index 0000000..165161b --- /dev/null +++ b/internal/coordinator/checkin/scheduler.go @@ -0,0 +1,453 @@ +// Package checkin implements the agent check-in scheduler and coordinator. +package checkin + +import ( + "context" + "fmt" + "log" + "sync" + "time" + + "github.com/ambient/platform/components/boss/internal/coordinator/db" + "github.com/google/uuid" + "github.com/robfig/cron/v3" +) + +const ( + // Leader election lock duration (30 seconds as per spec) + lockDuration = 30 * time.Second + // Lock renewal interval (15 seconds - renew at half the lock duration) + renewInterval = 15 * time.Second + // Failover detection window (40 seconds max as per spec) + pollInterval = 10 * time.Second +) + +// Scheduler manages check-in schedules and coordinates execution across instances. 
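+//
+// Exactly one instance (the leader) runs the cron jobs at any time. The
+// leader holds a 30-second database lock that it renews every 15 seconds;
+// followers poll every 10 seconds, so a crashed leader is replaced within
+// roughly 40 seconds (the remaining lock TTL plus one poll interval).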
+type Scheduler struct { + repo *db.Repository + cron *cron.Cron + instanceID string + isLeader bool + mu sync.RWMutex + ctx context.Context + cancel context.CancelFunc + + // Message sender function injected for testing + sendMessage func(spaceName, agentName, message string) error + + // Metrics for observability + metrics *Metrics + leaderSince time.Time +} + +// New creates a new check-in scheduler instance. +func New(repo *db.Repository) *Scheduler { + ctx, cancel := context.WithCancel(context.Background()) + return &Scheduler{ + repo: repo, + cron: cron.New(cron.WithSeconds()), + instanceID: uuid.New().String(), + ctx: ctx, + cancel: cancel, + metrics: NewMetrics(), + } +} + +// SetMessageSender injects the message delivery function. +func (s *Scheduler) SetMessageSender(fn func(spaceName, agentName, message string) error) { + s.sendMessage = fn +} + +// Start begins the scheduler's leader election and job execution loop. +func (s *Scheduler) Start() error { + log.Printf("[check-in scheduler] starting instance %s", s.instanceID) + + // Start leader election loop + go s.leaderElectionLoop() + + return nil +} + +// Stop gracefully shuts down the scheduler. +func (s *Scheduler) Stop() error { + log.Printf("[check-in scheduler] stopping instance %s", s.instanceID) + s.cancel() + + // Release leader lock if held + if s.isLeader { + if err := s.repo.ReleaseSchedulerLock(s.instanceID); err != nil { + log.Printf("[check-in scheduler] error releasing lock: %v", err) + } + } + + // Stop cron scheduler + ctx := s.cron.Stop() + <-ctx.Done() + + return nil +} + +// leaderElectionLoop continuously tries to acquire/maintain leader status. +func (s *Scheduler) leaderElectionLoop() { + ticker := time.NewTicker(pollInterval) + defer ticker.Stop() + + for { + select { + case <-s.ctx.Done(): + return + case <-ticker.C: + s.tryBecomeLeader() + } + } +} + +// tryBecomeLeader attempts to acquire the leader lock. +func (s *Scheduler) tryBecomeLeader() { + acquired, err := s.repo.AcquireSchedulerLock(s.instanceID, lockDuration) + if err != nil { + log.Printf("[check-in scheduler] error acquiring lock: %v", err) + return + } + + s.mu.Lock() + wasLeader := s.isLeader + s.isLeader = acquired + s.mu.Unlock() + + if acquired && !wasLeader { + log.Printf("[check-in scheduler] instance %s became leader", s.instanceID) + s.leaderSince = time.Now() + s.metrics.LeaderElections.Inc() + go s.leaderLoop() + } +} + +// leaderLoop runs while this instance is the leader. 
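+//
+// It returns when the context is cancelled or a lock renewal fails; on a
+// failed renewal the instance demotes itself and stops the cron scheduler so
+// that only the new leader fires check-ins.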
+func (s *Scheduler) leaderLoop() { + // Load all enabled check-in configs and schedule them + if err := s.reloadSchedules(); err != nil { + log.Printf("[check-in scheduler] error loading schedules: %v", err) + } + + // Start cron scheduler + s.cron.Start() + + // Start retry queue processor + retryTicker := time.NewTicker(30 * time.Second) + defer retryTicker.Stop() + + // Lock renewal loop + renewTicker := time.NewTicker(renewInterval) + defer renewTicker.Stop() + + for { + select { + case <-s.ctx.Done(): + return + case <-retryTicker.C: + // Process retry queue every 30 seconds + if err := s.processRetryQueue(); err != nil { + log.Printf("[check-in scheduler] error processing retry queue: %v", err) + } + case <-renewTicker.C: + if err := s.repo.RenewSchedulerLock(s.instanceID, lockDuration); err != nil { + log.Printf("[check-in scheduler] lost leader lock: %v", err) + s.metrics.LockFailures.Inc() + s.metrics.LeadershipDuration.Set(0) + s.mu.Lock() + s.isLeader = false + s.mu.Unlock() + s.cron.Stop() + return + } + s.metrics.LockRenewals.Inc() + s.metrics.LeadershipDuration.Set(time.Since(s.leaderSince).Seconds()) + } + } +} + +// reloadSchedules loads all enabled check-in configs and schedules cron jobs. +func (s *Scheduler) reloadSchedules() error { + configs, err := s.repo.ListCheckInConfigs(true) + if err != nil { + return fmt.Errorf("list enabled configs: %w", err) + } + + // Clear existing entries + for _, entry := range s.cron.Entries() { + s.cron.Remove(entry.ID) + } + + // Add new entries + for _, cfg := range configs { + if err := s.addSchedule(cfg); err != nil { + log.Printf("[check-in scheduler] error adding schedule for %s/%s: %v", + cfg.SpaceName, cfg.AgentName, err) + continue + } + } + + log.Printf("[check-in scheduler] loaded %d check-in schedules", len(configs)) + s.metrics.ActiveConfigs.Set(float64(len(configs))) + return nil +} + +// addSchedule adds a cron job for a check-in configuration. +func (s *Scheduler) addSchedule(cfg *db.AgentCheckInConfig) error { + parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow) + schedule, err := parser.Parse(cfg.CronSchedule) + if err != nil { + return fmt.Errorf("parse cron schedule %q: %w", cfg.CronSchedule, err) + } + + // Create job that triggers check-in + job := func() { + if err := s.triggerCheckIn(cfg); err != nil { + log.Printf("[check-in scheduler] error triggering check-in for %s/%s: %v", + cfg.SpaceName, cfg.AgentName, err) + } + } + + s.cron.Schedule(schedule, cron.FuncJob(job)) + return nil +} + +// triggerCheckIn executes a check-in for an agent. 
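+//
+// When idle_only is set it applies the 3-check validation (status is "idle",
+// idle for at least 5 minutes, no pending check-in within the 10-minute
+// window), sleeps for a 100ms debounce, then re-validates all three checks
+// before creating the event and sending the message.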
+func (s *Scheduler) triggerCheckIn(cfg *db.AgentCheckInConfig) error { + // Get current agent status + agent, err := s.repo.GetAgent(cfg.SpaceName, cfg.AgentName) + if err != nil { + return fmt.Errorf("get agent: %w", err) + } + if agent == nil { + return fmt.Errorf("agent not found") + } + + // 3-check idle validation (if idle_only is enabled) + if cfg.IdleOnly { + // Check 1: Agent status must be "idle" + if agent.Status != "idle" { + log.Printf("[check-in scheduler] skipping %s/%s (status=%s, idle_only=true)", + cfg.SpaceName, cfg.AgentName, agent.Status) + s.metrics.IdleOnlySkipped.Inc() + return nil + } + + // Check 2: Agent must be idle for at least 5 minutes (300 seconds) + idleDuration := time.Since(agent.UpdatedAt) + if idleDuration < 300*time.Second { + log.Printf("[check-in scheduler] skipping %s/%s (idle for %v, required 5m)", + cfg.SpaceName, cfg.AgentName, idleDuration) + s.metrics.IdleOnlySkipped.Inc() + return nil + } + + // Check 3: No pending check-ins within 10-minute window + pending, err := s.repo.GetPendingCheckInEvents(10) + if err != nil { + log.Printf("[check-in scheduler] error checking pending events: %v", err) + // Continue anyway - don't block on this check + } else { + for _, evt := range pending { + if evt.AgentName == cfg.AgentName && evt.SpaceName == cfg.SpaceName { + log.Printf("[check-in scheduler] skipping %s/%s (pending check-in exists)", + cfg.SpaceName, cfg.AgentName) + s.metrics.DuplicateSkipped.Inc() + return nil + } + } + } + + // 100ms debounce before message send + time.Sleep(100 * time.Millisecond) + + // Re-validate all 3 checks after debounce + agent, err = s.repo.GetAgent(cfg.SpaceName, cfg.AgentName) + if err != nil { + return fmt.Errorf("re-validate agent: %w", err) + } + if agent == nil { + return fmt.Errorf("agent not found on re-validation") + } + + // Re-check 1: Status still idle + if agent.Status != "idle" { + log.Printf("[check-in scheduler] skipping %s/%s (status changed to %s after debounce)", + cfg.SpaceName, cfg.AgentName, agent.Status) + s.metrics.IdleOnlySkipped.Inc() + return nil + } + + // Re-check 2: Still idle for at least 5 minutes + idleDuration = time.Since(agent.UpdatedAt) + if idleDuration < 300*time.Second { + log.Printf("[check-in scheduler] skipping %s/%s (idle duration %v after debounce)", + cfg.SpaceName, cfg.AgentName, idleDuration) + s.metrics.IdleOnlySkipped.Inc() + return nil + } + + // Re-check 3: No new pending check-ins appeared + pending, err = s.repo.GetPendingCheckInEvents(10) + if err != nil { + log.Printf("[check-in scheduler] error re-checking pending events: %v", err) + } else { + for _, evt := range pending { + if evt.AgentName == cfg.AgentName && evt.SpaceName == cfg.SpaceName { + log.Printf("[check-in scheduler] skipping %s/%s (pending check-in appeared after debounce)", + cfg.SpaceName, cfg.AgentName) + s.metrics.DuplicateSkipped.Inc() + return nil + } + } + } + } else { + // When idle_only is false, still check for duplicate pending check-ins + pending, err := s.repo.GetPendingCheckInEvents(10) + if err != nil { + log.Printf("[check-in scheduler] error checking pending events: %v", err) + // Continue anyway - don't block on this check + } else { + for _, evt := range pending { + if evt.AgentName == cfg.AgentName && evt.SpaceName == cfg.SpaceName { + log.Printf("[check-in scheduler] skipping %s/%s (pending check-in exists)", + cfg.SpaceName, cfg.AgentName) + s.metrics.DuplicateSkipped.Inc() + return nil + } + } + } + } + + // Create check-in event + now := time.Now().UTC() + event := 
&db.CheckInEvent{ + ID: uuid.New().String(), + SpaceName: cfg.SpaceName, + AgentName: cfg.AgentName, + ScheduledAt: now, + TriggeredAt: now, + AgentStatus: agent.Status, + MessageSent: false, + ResponseReceived: false, + RetryCount: 0, + } + + if err := s.repo.CreateCheckInEvent(event); err != nil { + return fmt.Errorf("create check-in event: %w", err) + } + + // Send check-in message + if s.sendMessage != nil { + message := fmt.Sprintf("🔔 Scheduled check-in. Please confirm you're operational by posting a status update.") + if err := s.sendMessage(cfg.SpaceName, cfg.AgentName, message); err != nil { + log.Printf("[check-in scheduler] failed to send message to %s/%s: %v", + cfg.SpaceName, cfg.AgentName, err) + event.ErrorMessage = err.Error() + event.MessageSent = false + s.metrics.MessageDeliveryFailures.Inc() + } else { + event.MessageSent = true + event.MessageID = uuid.New().String() // TODO: get actual message ID from send_message + } + } else { + event.ErrorMessage = "message sender not configured" + s.metrics.MessageDeliveryFailures.Inc() + } + + // Update event with message status + if err := s.repo.UpdateCheckInEvent(event); err != nil { + log.Printf("[check-in scheduler] error updating event: %v", err) + } + + log.Printf("[check-in scheduler] triggered check-in for %s/%s (event %s)", + cfg.SpaceName, cfg.AgentName, event.ID) + s.metrics.CheckInsTriggered.Inc() + + return nil +} + +// IsLeader returns true if this instance is currently the leader. +func (s *Scheduler) IsLeader() bool { + s.mu.RLock() + defer s.mu.RUnlock() + return s.isLeader +} + +// ReloadSchedules triggers a reload of all check-in schedules. +// Called when a configuration is added/updated/deleted. +func (s *Scheduler) ReloadSchedules() error { + if !s.IsLeader() { + return nil // Only leader reloads schedules + } + s.metrics.ConfigChanges.Inc() + return s.reloadSchedules() +} + +// GetMetrics returns the metrics instance for sharing with response tracker. +func (s *Scheduler) GetMetrics() *Metrics { + return s.metrics +} + +// processRetryQueue checks for scheduled check-in events that are ready to trigger +// and sends their messages. This handles retry events created by the response tracker. +func (s *Scheduler) processRetryQueue() error { + events, err := s.repo.GetScheduledCheckInEvents() + if err != nil { + return fmt.Errorf("get scheduled events: %w", err) + } + + if len(events) == 0 { + return nil + } + + log.Printf("[check-in scheduler] processing %d scheduled retry events", len(events)) + + for _, event := range events { + // Get the agent's current status + agent, err := s.repo.GetAgent(event.SpaceName, event.AgentName) + if err != nil { + log.Printf("[check-in scheduler] error getting agent %s/%s for retry: %v", + event.SpaceName, event.AgentName, err) + continue + } + if agent == nil { + log.Printf("[check-in scheduler] agent %s/%s not found for retry event %s", + event.SpaceName, event.AgentName, event.ID) + continue + } + + // Update event with current agent status and triggered time + event.AgentStatus = agent.Status + event.TriggeredAt = time.Now().UTC() + + // Send check-in message + if s.sendMessage != nil { + message := fmt.Sprintf("🔔 Scheduled check-in (retry %d). 
Please confirm you're operational by posting a status update.", event.RetryCount) + if err := s.sendMessage(event.SpaceName, event.AgentName, message); err != nil { + log.Printf("[check-in scheduler] failed to send retry message to %s/%s: %v", + event.SpaceName, event.AgentName, err) + event.ErrorMessage = err.Error() + event.MessageSent = false + s.metrics.MessageDeliveryFailures.Inc() + } else { + event.MessageSent = true + event.MessageID = fmt.Sprintf("retry-%s", event.ID) + } + } else { + event.ErrorMessage = "message sender not configured" + s.metrics.MessageDeliveryFailures.Inc() + } + + // Update event with message status + if err := s.repo.UpdateCheckInEvent(event); err != nil { + log.Printf("[check-in scheduler] error updating retry event %s: %v", event.ID, err) + } + + log.Printf("[check-in scheduler] triggered retry check-in for %s/%s (event %s, retry %d)", + event.SpaceName, event.AgentName, event.ID, event.RetryCount) + } + + return nil +} diff --git a/internal/coordinator/db/db.go b/internal/coordinator/db/db.go index 436a602..421fcbc 100644 --- a/internal/coordinator/db/db.go +++ b/internal/coordinator/db/db.go @@ -96,6 +96,9 @@ func migrate(db *gorm.DB) error { &InterruptRecord{}, &PersonaRow{}, &PersonaVersionRow{}, + &AgentCheckInConfig{}, + &CheckInEvent{}, + &CheckInSchedulerLock{}, ); err != nil { return err } @@ -120,6 +123,69 @@ func migrate(db *gorm.DB) error { ) WHERE status_changed_at IS NULL OR status_changed_at = '0001-01-01 00:00:00+00:00' OR status_changed_at = ''`) + // Apply CHECK constraints for check-in tables (idempotent - safe to run on every startup). + if err := migrateCheckInConstraints(db); err != nil { + return fmt.Errorf("migrate check-in constraints: %w", err) + } + + return nil +} + +// migrateCheckInConstraints adds CHECK constraints to check-in tables. +// These constraints enforce data integrity and are not supported by GORM tags. +// The migration is idempotent and safe to run multiple times. +func migrateCheckInConstraints(db *gorm.DB) error { + // Get database type to determine SQL dialect + dbType := os.Getenv("DB_TYPE") + if dbType == "" { + dbType = "sqlite" + } + + // For PostgreSQL, we can check if constraints exist before adding them + if dbType == "postgres" { + // Check and add constraints for agent_check_in_configs and check_in_events + constraints := []struct { + table string + name string + definition string + }{ + {"agent_check_in_configs", "chk_timeout_seconds_positive", "CHECK (timeout_seconds >= 0)"}, + {"agent_check_in_configs", "chk_retry_attempts_non_negative", "CHECK (retry_attempts >= 0)"}, + {"agent_check_in_configs", "chk_retry_delay_seconds_positive", "CHECK (retry_delay_seconds >= 0)"}, + {"check_in_events", "chk_retry_count_non_negative", "CHECK (retry_count >= 0)"}, + {"check_in_events", "chk_latency_non_negative", "CHECK (response_latency_ms IS NULL OR response_latency_ms >= 0)"}, + {"check_in_events", "chk_response_consistency", "CHECK ((response_received = FALSE AND response_at IS NULL) OR (response_received = TRUE AND response_at IS NOT NULL))"}, + } + + for _, c := range constraints { + // Check if constraint exists + var exists bool + err := db.Raw(` + SELECT EXISTS ( + SELECT 1 FROM pg_constraint + WHERE conname = ? 
AND conrelid = ?::regclass + ) + `, c.name, c.table).Scan(&exists).Error + if err != nil { + return fmt.Errorf("check constraint %s existence: %w", c.name, err) + } + + // Add constraint if it doesn't exist + if !exists { + sql := fmt.Sprintf("ALTER TABLE %s ADD CONSTRAINT %s %s", c.table, c.name, c.definition) + if err := db.Exec(sql).Error; err != nil { + return fmt.Errorf("add constraint %s: %w", c.name, err) + } + } + } + } + + // For SQLite, constraints must be added during table creation or via table recreation. + // Since GORM AutoMigrate already created the tables, we would need to recreate them. + // SQLite doesn't support ALTER TABLE ADD CONSTRAINT for CHECK constraints. + // For now, we skip SQLite CHECK constraints as they're primarily for production (PostgreSQL). + // Application-level validation in handlers provides the same protection for SQLite. + return nil } diff --git a/internal/coordinator/db/models.go b/internal/coordinator/db/models.go index 5a37f58..77df999 100644 --- a/internal/coordinator/db/models.go +++ b/internal/coordinator/db/models.go @@ -258,3 +258,54 @@ type InterruptRecord struct { } func (InterruptRecord) TableName() string { return "interrupts" } + +// AgentCheckInConfig stores per-agent check-in configuration. +type AgentCheckInConfig struct { + ID uint `gorm:"primarykey;autoIncrement"` + SpaceName string `gorm:"not null;uniqueIndex:idx_checkin_space_agent,priority:1;index"` + AgentName string `gorm:"not null;uniqueIndex:idx_checkin_space_agent,priority:2"` + CheckInEnabled bool `gorm:"not null;default:false;index:idx_checkin_enabled,where:check_in_enabled = true"` + CronSchedule string `gorm:"not null"` + IdleOnly bool `gorm:"not null;default:true"` + TimeoutSeconds int `gorm:"not null;default:300"` + RetryAttempts int `gorm:"not null;default:3"` + RetryDelaySeconds int `gorm:"not null;default:60"` + NotificationChannels string `gorm:"type:text"` // JSON array + LastCheckInAt sql.NullTime + EnabledBy string + CreatedAt time.Time + UpdatedAt time.Time +} + +func (AgentCheckInConfig) TableName() string { return "agent_check_in_configs" } + +// CheckInEvent records a check-in event and its outcome. +type CheckInEvent struct { + ID string `gorm:"primarykey;not null"` // UUID + SpaceName string `gorm:"not null;index:idx_event_space_agent,priority:1;index"` + AgentName string `gorm:"not null;index:idx_event_space_agent,priority:2;index:idx_event_agent_time,priority:1"` + ScheduledAt time.Time `gorm:"not null"` + TriggeredAt time.Time `gorm:"not null;index:idx_event_agent_time,priority:2"` + AgentStatus string `gorm:"not null"` + MessageSent bool `gorm:"not null;default:false"` + MessageID string `gorm:"index"` + ResponseReceived bool `gorm:"not null;default:false;index:idx_pending_checkins,where:response_received = false AND message_sent = true"` + ResponseAt sql.NullTime + ResponseLatencyMs sql.NullInt64 + StatusAfterCheckIn string + ErrorMessage string `gorm:"type:text"` + RetryCount int `gorm:"not null;default:0"` +} + +func (CheckInEvent) TableName() string { return "check_in_events" } + +// CheckInSchedulerLock implements PostgreSQL-based leader election for the scheduler. 
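+//
+// The table is used as a single-row lock (id = 1): an instance owns the lock
+// while expires_at is in the future, and a rival instance can take over only
+// after the row has expired (see Repository.AcquireSchedulerLock).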
+type CheckInSchedulerLock struct { + ID uint `gorm:"primarykey;autoIncrement"` + LockedBy string `gorm:"not null"` // instance identifier + LockedAt time.Time `gorm:"not null;index"` + ExpiresAt time.Time `gorm:"not null;index"` + RenewedAt time.Time `gorm:"not null"` +} + +func (CheckInSchedulerLock) TableName() string { return "check_in_scheduler_locks" } diff --git a/internal/coordinator/db/repository.go b/internal/coordinator/db/repository.go index 327c325..612fc87 100644 --- a/internal/coordinator/db/repository.go +++ b/internal/coordinator/db/repository.go @@ -549,3 +549,152 @@ func (r *Repository) PersonaExists(id string) (bool, error) { err := r.db.Model(&PersonaRow{}).Where("id = ?", id).Count(&count).Error return count > 0, err } + +// ---- Check-In operations ---- + +// UpsertCheckInConfig creates or updates a check-in configuration for an agent. +func (r *Repository) UpsertCheckInConfig(cfg *AgentCheckInConfig) error { + return r.db.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "space_name"}, {Name: "agent_name"}}, + DoUpdates: clause.AssignmentColumns([]string{ + "check_in_enabled", "cron_schedule", "idle_only", + "timeout_seconds", "retry_attempts", "retry_delay_seconds", + "notification_channels", "last_check_in_at", "enabled_by", "updated_at", + }), + }).Create(cfg).Error +} + +// GetCheckInConfig returns the check-in config for an agent, or (nil, nil) if not found. +func (r *Repository) GetCheckInConfig(spaceName, agentName string) (*AgentCheckInConfig, error) { + var cfg AgentCheckInConfig + err := r.db.Where("space_name = ? AND agent_name = ?", spaceName, agentName).First(&cfg).Error + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil, nil + } + return &cfg, err +} + +// ListCheckInConfigs returns all check-in configs, optionally filtered by enabled status. +func (r *Repository) ListCheckInConfigs(enabledOnly bool) ([]*AgentCheckInConfig, error) { + var configs []*AgentCheckInConfig + query := r.db + if enabledOnly { + query = query.Where("check_in_enabled = ?", true) + } + return configs, query.Order("space_name ASC, agent_name ASC").Find(&configs).Error +} + +// DeleteCheckInConfig removes a check-in configuration. +func (r *Repository) DeleteCheckInConfig(spaceName, agentName string) error { + return r.db.Where("space_name = ? AND agent_name = ?", spaceName, agentName).Delete(&AgentCheckInConfig{}).Error +} + +// CreateCheckInEvent records a new check-in event. +func (r *Repository) CreateCheckInEvent(event *CheckInEvent) error { + return r.db.Create(event).Error +} + +// GetCheckInEvent returns a check-in event by ID, or (nil, nil) if not found. +func (r *Repository) GetCheckInEvent(eventID string) (*CheckInEvent, error) { + var event CheckInEvent + err := r.db.Where("id = ?", eventID).First(&event).Error + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil, nil + } + return &event, err +} + +// ListCheckInEvents returns check-in events for an agent, newest first. +func (r *Repository) ListCheckInEvents(spaceName, agentName string, limit int) ([]*CheckInEvent, error) { + var events []*CheckInEvent + query := r.db.Where("space_name = ? AND agent_name = ?", spaceName, agentName). + Order("triggered_at DESC") + if limit > 0 { + query = query.Limit(limit) + } + return events, query.Find(&events).Error +} + +// UpdateCheckInEvent updates fields on an existing check-in event. 
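+//
+// Uses GORM's Save, which writes every column of the struct, so callers
+// should mutate an event loaded from the database rather than construct a
+// partially populated value.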
+func (r *Repository) UpdateCheckInEvent(event *CheckInEvent) error { + return r.db.Save(event).Error +} + +// GetPendingCheckInEvents returns check-in events awaiting response (message sent but not received). +func (r *Repository) GetPendingCheckInEvents(lookbackMinutes int) ([]*CheckInEvent, error) { + var events []*CheckInEvent + cutoff := time.Now().Add(-time.Duration(lookbackMinutes) * time.Minute) + return events, r.db.Where("message_sent = ? AND response_received = ? AND triggered_at > ?", + true, false, cutoff).Find(&events).Error +} + +// GetScheduledCheckInEvents returns check-in events that are scheduled to trigger now or in the past, +// but have not yet sent their message. Used for retry queue processing. +func (r *Repository) GetScheduledCheckInEvents() ([]*CheckInEvent, error) { + var events []*CheckInEvent + now := time.Now() + return events, r.db.Where("scheduled_at <= ? AND message_sent = ? AND response_received = ?", + now, false, false).Order("scheduled_at ASC").Find(&events).Error +} + +// AcquireSchedulerLock attempts to acquire the scheduler lock. Returns true if acquired. +// Uses INSERT...ON CONFLICT to atomically acquire expired locks. The single-row table +// pattern (id=1) ensures only one lock can exist; the WHERE clause on DO UPDATE prevents +// stealing an active lock from another instance. +func (r *Repository) AcquireSchedulerLock(instanceID string, duration time.Duration) (bool, error) { + now := time.Now() + expiresAt := now.Add(duration) + + // Atomic lock acquisition: insert or update if lock is expired OR same instance. + // PostgreSQL: uses ON CONFLICT DO UPDATE with WHERE clause. + // SQLite: ON CONFLICT REPLACE doesn't support WHERE, so we delete expired locks first. + + // For SQLite: clean up expired lock, then insert + r.db.Exec(`DELETE FROM check_in_scheduler_locks WHERE expires_at < ?`, now) + + // Try to insert new lock. WHERE clause allows acquisition when: + // 1. Lock is expired (expires_at < NOW), OR + // 2. Same instance is re-acquiring (locked_by matches) + result := r.db.Exec(` + INSERT INTO check_in_scheduler_locks (id, locked_by, locked_at, expires_at, renewed_at) + VALUES (1, ?, ?, ?, ?) + ON CONFLICT (id) DO UPDATE + SET locked_by = EXCLUDED.locked_by, + locked_at = EXCLUDED.locked_at, + expires_at = EXCLUDED.expires_at, + renewed_at = EXCLUDED.renewed_at + WHERE check_in_scheduler_locks.expires_at < ? + OR check_in_scheduler_locks.locked_by = EXCLUDED.locked_by + `, instanceID, now, expiresAt, now, now) + + if result.Error != nil { + return false, result.Error + } + + return result.RowsAffected > 0, nil +} + +// RenewSchedulerLock extends the expiration time for the scheduler lock held by this instance. +func (r *Repository) RenewSchedulerLock(instanceID string, duration time.Duration) error { + now := time.Now() + expiresAt := now.Add(duration) + + result := r.db.Exec(` + UPDATE check_in_scheduler_locks + SET expires_at = ?, renewed_at = ? + WHERE locked_by = ? AND expires_at > ? + `, expiresAt, now, instanceID, now) + + if result.Error != nil { + return result.Error + } + if result.RowsAffected == 0 { + return fmt.Errorf("lock not held by instance %s", instanceID) + } + return nil +} + +// ReleaseSchedulerLock releases the scheduler lock held by this instance. 
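+//
+// The delete is scoped to locked_by, so releasing is a no-op if another
+// instance has already taken the lock over.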
+func (r *Repository) ReleaseSchedulerLock(instanceID string) error { + return r.db.Where("locked_by = ?", instanceID).Delete(&CheckInSchedulerLock{}).Error +} diff --git a/internal/coordinator/handlers_checkin.go b/internal/coordinator/handlers_checkin.go new file mode 100644 index 0000000..06a7c48 --- /dev/null +++ b/internal/coordinator/handlers_checkin.go @@ -0,0 +1,473 @@ +package coordinator + +import ( + "encoding/json" + "fmt" + "net/http" + "strconv" + "time" + + "github.com/ambient/platform/components/boss/internal/coordinator/db" + "github.com/google/uuid" + "github.com/robfig/cron/v3" +) + +// CheckInConfigRequest is the JSON request body for creating/updating check-in config. +type CheckInConfigRequest struct { + CheckInEnabled bool `json:"check_in_enabled"` + CronSchedule string `json:"cron_schedule"` + IdleOnly bool `json:"idle_only"` + TimeoutSeconds int `json:"timeout_seconds,omitempty"` + RetryAttempts int `json:"retry_attempts,omitempty"` + RetryDelaySeconds int `json:"retry_delay_seconds,omitempty"` + NotificationChannels []string `json:"notification_channels,omitempty"` +} + +// CheckInConfigResponse is the JSON response for check-in configuration. +type CheckInConfigResponse struct { + AgentName string `json:"agent_name"` + SpaceName string `json:"space_name"` + CheckInEnabled bool `json:"check_in_enabled"` + CronSchedule string `json:"cron_schedule"` + IdleOnly bool `json:"idle_only"` + TimeoutSeconds int `json:"timeout_seconds"` + RetryAttempts int `json:"retry_attempts"` + RetryDelaySeconds int `json:"retry_delay_seconds"` + NotificationChannels []string `json:"notification_channels"` + LastCheckInAt *time.Time `json:"last_check_in_at,omitempty"` + EnabledBy string `json:"enabled_by,omitempty"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} + +// CheckInEventResponse is the JSON response for check-in events. +type CheckInEventResponse struct { + ID string `json:"id"` + AgentName string `json:"agent_name"` + SpaceName string `json:"space_name"` + ScheduledAt time.Time `json:"scheduled_at"` + TriggeredAt time.Time `json:"triggered_at"` + AgentStatus string `json:"agent_status"` + MessageSent bool `json:"message_sent"` + MessageID string `json:"message_id,omitempty"` + ResponseReceived bool `json:"response_received"` + ResponseAt *time.Time `json:"response_at,omitempty"` + ResponseLatencyMs *int64 `json:"response_latency_ms,omitempty"` + StatusAfterCheckIn string `json:"status_after_check_in,omitempty"` + ErrorMessage string `json:"error_message,omitempty"` + RetryCount int `json:"retry_count"` +} + +// validateScheduleFrequency ensures cron schedule doesn't trigger more frequently than every 5 minutes. +// Spec requirement: minimum 5-minute interval to prevent excessive check-ins. +func validateScheduleFrequency(schedule cron.Schedule, cronStr string) error { + // Calculate time between first two scheduled runs + now := time.Now() + next1 := schedule.Next(now) + next2 := schedule.Next(next1) + + interval := next2.Sub(next1) + minInterval := 5 * time.Minute + + if interval < minInterval { + return fmt.Errorf("schedule %q triggers too frequently (every %v); minimum interval is 5 minutes", cronStr, interval) + } + + return nil +} + +// validateAgentName ensures agent name follows safe naming conventions. +// Prevents injection attacks and ensures URL/database safety. 
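+//
+// For example, "builder-1" and "etl.worker_2" are accepted, while names
+// containing spaces, slashes, or other punctuation are rejected.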
+func validateAgentName(name string) error { + if name == "" { + return fmt.Errorf("agent name cannot be empty") + } + + if len(name) > 100 { + return fmt.Errorf("agent name too long (max 100 characters)") + } + + // Allow alphanumeric, hyphens, underscores, dots (standard identifier format) + for i, r := range name { + valid := (r >= 'a' && r <= 'z') || + (r >= 'A' && r <= 'Z') || + (r >= '0' && r <= '9') || + r == '-' || r == '_' || r == '.' + + if !valid { + return fmt.Errorf("agent name contains invalid character %q at position %d; only alphanumeric, hyphens, underscores, and dots allowed", r, i) + } + } + + return nil +} + +// handleAgentCheckInConfig handles GET/POST/PATCH/DELETE /spaces/{space}/agent/{agent}/check-in/config +func (s *Server) handleAgentCheckInConfig(w http.ResponseWriter, r *http.Request, spaceName, agentName string) { + switch r.Method { + case http.MethodGet: + s.getAgentCheckInConfig(w, r, spaceName, agentName) + case http.MethodPost: + s.createAgentCheckInConfig(w, r, spaceName, agentName) + case http.MethodPatch: + s.updateAgentCheckInConfig(w, r, spaceName, agentName) + case http.MethodDelete: + s.deleteAgentCheckInConfig(w, r, spaceName, agentName) + default: + writeJSONError(w, "method not allowed", http.StatusMethodNotAllowed) + } +} + +// getAgentCheckInConfig retrieves the check-in configuration for an agent. +func (s *Server) getAgentCheckInConfig(w http.ResponseWriter, r *http.Request, spaceName, agentName string) { + cfg, err := s.repo.GetCheckInConfig(spaceName, agentName) + if err != nil { + writeJSONError(w, fmt.Sprintf("database error: %v", err), http.StatusInternalServerError) + return + } + if cfg == nil { + writeJSONError(w, "check-in configuration not found", http.StatusNotFound) + return + } + + resp := configToResponse(cfg) + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(resp) +} + +// createAgentCheckInConfig creates a new check-in configuration for an agent. 
+func (s *Server) createAgentCheckInConfig(w http.ResponseWriter, r *http.Request, spaceName, agentName string) {
+	var req CheckInConfigRequest
+	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
+		writeJSONError(w, fmt.Sprintf("invalid JSON: %v", err), http.StatusBadRequest)
+		return
+	}
+
+	// Validate agent name format
+	if err := validateAgentName(agentName); err != nil {
+		writeJSONError(w, err.Error(), http.StatusBadRequest)
+		return
+	}
+
+	// Validate required fields
+	if req.CronSchedule == "" {
+		writeJSONError(w, "cron_schedule is required", http.StatusBadRequest)
+		return
+	}
+
+	// Validate cron schedule syntax (standard 5-field format; no seconds field, no descriptors)
+	parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
+	schedule, err := parser.Parse(req.CronSchedule)
+	if err != nil {
+		writeJSONError(w, fmt.Sprintf("invalid cron schedule: %v", err), http.StatusBadRequest)
+		return
+	}
+
+	// Validate schedule frequency (minimum 5 minutes per spec)
+	if err := validateScheduleFrequency(schedule, req.CronSchedule); err != nil {
+		writeJSONError(w, err.Error(), http.StatusBadRequest)
+		return
+	}
+
+	// Validate timeout bounds (5-300 seconds per spec)
+	if req.TimeoutSeconds != 0 && (req.TimeoutSeconds < 5 || req.TimeoutSeconds > 300) {
+		writeJSONError(w, "timeout_seconds must be between 5 and 300 seconds", http.StatusBadRequest)
+		return
+	}
+
+	// Validate retry values
+	if req.RetryAttempts < 0 || req.RetryDelaySeconds < 0 {
+		writeJSONError(w, "retry_attempts and retry_delay_seconds must be non-negative", http.StatusBadRequest)
+		return
+	}
+
+	// Set defaults. Caveat: with value-typed request fields, zero is indistinguishable
+	// from "omitted", so a client cannot explicitly request retry_attempts = 0 here
+	// (it becomes 3); see the pointer-field sketch after updateAgentCheckInConfig below.
+	if req.TimeoutSeconds == 0 {
+		req.TimeoutSeconds = 300
+	}
+	if req.RetryAttempts == 0 {
+		req.RetryAttempts = 3
+	}
+	if req.RetryDelaySeconds == 0 {
+		req.RetryDelaySeconds = 60
+	}
+
+	// Get caller name for audit
+	callerName := r.Header.Get("X-Agent-Name")
+	if callerName == "" {
+		callerName = "unknown"
+	}
+
+	// Marshal notification channels to JSON (error ignored: marshaling a []string cannot fail)
+	notifJSON, _ := json.Marshal(req.NotificationChannels)
+
+	cfg := &db.AgentCheckInConfig{
+		SpaceName:            spaceName,
+		AgentName:            agentName,
+		CheckInEnabled:       req.CheckInEnabled,
+		CronSchedule:         req.CronSchedule,
+		IdleOnly:             req.IdleOnly,
+		TimeoutSeconds:       req.TimeoutSeconds,
+		RetryAttempts:        req.RetryAttempts,
+		RetryDelaySeconds:    req.RetryDelaySeconds,
+		NotificationChannels: string(notifJSON),
+		EnabledBy:            callerName,
+		CreatedAt:            time.Now().UTC(),
+		UpdatedAt:            time.Now().UTC(),
+	}
+
+	if err := s.repo.UpsertCheckInConfig(cfg); err != nil {
+		writeJSONError(w, fmt.Sprintf("database error: %v", err), http.StatusInternalServerError)
+		return
+	}
+
+	// Reload scheduler to pick up the new config
+	if s.checkInScheduler != nil {
+		s.checkInScheduler.ReloadSchedules()
+	}
+
+	resp := configToResponse(cfg)
+	w.Header().Set("Content-Type", "application/json")
+	w.WriteHeader(http.StatusCreated)
+	json.NewEncoder(w).Encode(resp)
+}
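Since the frequency floor is enforced by sampling the next two runs, a small table-driven test makes the accepted/rejected boundary explicit. A sketch, assuming it sits next to validateScheduleFrequency in package coordinator; the test file and cases are illustrative, not shipped with this diff:

```go
package coordinator

import (
	"testing"

	"github.com/robfig/cron/v3"
)

// Sketch: exercises the 5-minute floor with the same 5-field parser the
// handlers use.
func TestValidateScheduleFrequency(t *testing.T) {
	parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
	cases := []struct {
		expr    string
		wantErr bool
	}{
		{"*/2 * * * *", true},  // every 2 minutes: under the floor
		{"*/5 * * * *", false}, // every 5 minutes: exactly at the floor
		{"0 * * * *", false},   // hourly
		{"0 9 * * 1-5", false}, // 09:00 on weekdays
	}
	for _, tc := range cases {
		schedule, err := parser.Parse(tc.expr)
		if err != nil {
			t.Fatalf("parse %q: %v", tc.expr, err)
		}
		gotErr := validateScheduleFrequency(schedule, tc.expr) != nil
		if gotErr != tc.wantErr {
			t.Errorf("%q: got error=%v, want error=%v", tc.expr, gotErr, tc.wantErr)
		}
	}
}
```

+
+// updateAgentCheckInConfig updates an existing check-in configuration.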
+func (s *Server) updateAgentCheckInConfig(w http.ResponseWriter, r *http.Request, spaceName, agentName string) {
+	// Validate agent name format
+	if err := validateAgentName(agentName); err != nil {
+		writeJSONError(w, err.Error(), http.StatusBadRequest)
+		return
+	}
+
+	// Fetch existing config
+	cfg, err := s.repo.GetCheckInConfig(spaceName, agentName)
+	if err != nil {
+		writeJSONError(w, fmt.Sprintf("database error: %v", err), http.StatusInternalServerError)
+		return
+	}
+	if cfg == nil {
+		writeJSONError(w, "check-in configuration not found", http.StatusNotFound)
+		return
+	}
+
+	var req CheckInConfigRequest
+	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
+		writeJSONError(w, fmt.Sprintf("invalid JSON: %v", err), http.StatusBadRequest)
+		return
+	}
+
+	// Validate cron schedule if provided
+	if req.CronSchedule != "" {
+		parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
+		schedule, err := parser.Parse(req.CronSchedule)
+		if err != nil {
+			writeJSONError(w, fmt.Sprintf("invalid cron schedule: %v", err), http.StatusBadRequest)
+			return
+		}
+		// Validate schedule frequency (minimum 5 minutes)
+		if err := validateScheduleFrequency(schedule, req.CronSchedule); err != nil {
+			writeJSONError(w, err.Error(), http.StatusBadRequest)
+			return
+		}
+		cfg.CronSchedule = req.CronSchedule
+	}
+
+	// Validate timeout bounds if provided (5-300 seconds)
+	if req.TimeoutSeconds > 0 && (req.TimeoutSeconds < 5 || req.TimeoutSeconds > 300) {
+		writeJSONError(w, "timeout_seconds must be between 5 and 300 seconds", http.StatusBadRequest)
+		return
+	}
+
+	// Update numeric fields only when a positive value is provided; zero (or negative)
+	// means "leave unchanged", because a value-typed request field cannot distinguish
+	// "omitted" from "explicitly zero".
+	if req.TimeoutSeconds > 0 {
+		cfg.TimeoutSeconds = req.TimeoutSeconds
+	}
+	if req.RetryAttempts > 0 {
+		cfg.RetryAttempts = req.RetryAttempts
+	}
+	if req.RetryDelaySeconds > 0 {
+		cfg.RetryDelaySeconds = req.RetryDelaySeconds
+	}
+	if req.NotificationChannels != nil {
+		notifJSON, _ := json.Marshal(req.NotificationChannels)
+		cfg.NotificationChannels = string(notifJSON)
+	}
+	// Booleans are always overwritten: an omitted boolean decodes as false, so this
+	// PATCH cannot tell "omitted" apart from "false". See the pointer-field sketch below.
+	cfg.CheckInEnabled = req.CheckInEnabled
+	cfg.IdleOnly = req.IdleOnly
+	cfg.UpdatedAt = time.Now().UTC()
+
+	if err := s.repo.UpsertCheckInConfig(cfg); err != nil {
+		writeJSONError(w, fmt.Sprintf("database error: %v", err), http.StatusInternalServerError)
+		return
+	}
+
+	// Reload scheduler to pick up the updated config
+	if s.checkInScheduler != nil {
+		s.checkInScheduler.ReloadSchedules()
+	}
+
+	resp := configToResponse(cfg)
+	w.Header().Set("Content-Type", "application/json")
+	json.NewEncoder(w).Encode(resp)
+}
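Because the request struct uses value types, both handlers above inherit the omitted-vs-zero ambiguity. A minimal sketch of the usual remedy, pointer fields in a dedicated PATCH type, offered as a possible follow-up rather than the shape shipped in this diff:

```go
// Hypothetical PATCH body with pointer fields: nil means "leave unchanged",
// a non-nil pointer applies the value, including zero/false.
type CheckInConfigPatch struct {
	CheckInEnabled *bool   `json:"check_in_enabled,omitempty"`
	CronSchedule   *string `json:"cron_schedule,omitempty"`
	IdleOnly       *bool   `json:"idle_only,omitempty"`
	TimeoutSeconds *int    `json:"timeout_seconds,omitempty"`
	RetryAttempts  *int    `json:"retry_attempts,omitempty"`
}

func applyPatch(cfg *db.AgentCheckInConfig, p CheckInConfigPatch) {
	if p.CheckInEnabled != nil {
		cfg.CheckInEnabled = *p.CheckInEnabled
	}
	if p.IdleOnly != nil {
		cfg.IdleOnly = *p.IdleOnly
	}
	if p.TimeoutSeconds != nil {
		cfg.TimeoutSeconds = *p.TimeoutSeconds // bounds check would still apply
	}
	if p.RetryAttempts != nil {
		cfg.RetryAttempts = *p.RetryAttempts // zero now genuinely means "no retries"
	}
	// CronSchedule would still go through parse + frequency validation first.
}
```

+
+// deleteAgentCheckInConfig disables check-ins for an agent.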
+func (s *Server) deleteAgentCheckInConfig(w http.ResponseWriter, r *http.Request, spaceName, agentName string) {
+	if err := s.repo.DeleteCheckInConfig(spaceName, agentName); err != nil {
+		writeJSONError(w, fmt.Sprintf("database error: %v", err), http.StatusInternalServerError)
+		return
+	}
+
+	// Reload scheduler to remove the deleted config
+	if s.checkInScheduler != nil {
+		s.checkInScheduler.ReloadSchedules()
+	}
+
+	w.WriteHeader(http.StatusNoContent)
+}
+
+// handleCheckInConfigsList handles GET /spaces/{space}/check-ins/configs
+func (s *Server) handleCheckInConfigsList(w http.ResponseWriter, r *http.Request, spaceName string) {
+	enabledOnly := r.URL.Query().Get("enabled") == "true"
+
+	configs, err := s.repo.ListCheckInConfigs(enabledOnly)
+	if err != nil {
+		writeJSONError(w, fmt.Sprintf("database error: %v", err), http.StatusInternalServerError)
+		return
+	}
+
+	// Filter by space in memory. This fetches every config first; see the
+	// repository-level sketch below for pushing the filter into SQL.
+	var filtered []*db.AgentCheckInConfig
+	for _, cfg := range configs {
+		if cfg.SpaceName == spaceName {
+			filtered = append(filtered, cfg)
+		}
+	}
+
+	responses := make([]CheckInConfigResponse, 0, len(filtered))
+	for _, cfg := range filtered {
+		responses = append(responses, configToResponse(cfg))
+	}
+
+	w.Header().Set("Content-Type", "application/json")
+	json.NewEncoder(w).Encode(responses)
+}
+
+// handleAgentCheckInHistory handles GET /spaces/{space}/agent/{agent}/check-in/history
+func (s *Server) handleAgentCheckInHistory(w http.ResponseWriter, r *http.Request, spaceName, agentName string) {
+	limitStr := r.URL.Query().Get("limit")
+	limit := 50 // default
+	if limitStr != "" {
+		if parsed, err := strconv.Atoi(limitStr); err == nil && parsed > 0 {
+			limit = parsed
+		}
+	}
+	if limit > 500 {
+		limit = 500 // cap so a client cannot request an unbounded result set
+	}
+
+	events, err := s.repo.ListCheckInEvents(spaceName, agentName, limit)
+	if err != nil {
+		writeJSONError(w, fmt.Sprintf("database error: %v", err), http.StatusInternalServerError)
+		return
+	}
+
+	responses := make([]CheckInEventResponse, 0, len(events))
+	for _, evt := range events {
+		responses = append(responses, eventToResponse(evt))
+	}
+
+	w.Header().Set("Content-Type", "application/json")
+	json.NewEncoder(w).Encode(responses)
+}
+
+// configToResponse converts a DB model to an API response.
+func configToResponse(cfg *db.AgentCheckInConfig) CheckInConfigResponse {
+	var channels []string
+	if cfg.NotificationChannels != "" {
+		// Best-effort decode: malformed stored JSON yields an empty channel list.
+		json.Unmarshal([]byte(cfg.NotificationChannels), &channels)
+	}
+
+	var lastCheckIn *time.Time
+	if cfg.LastCheckInAt.Valid {
+		lastCheckIn = &cfg.LastCheckInAt.Time
+	}
+
+	return CheckInConfigResponse{
+		AgentName:            cfg.AgentName,
+		SpaceName:            cfg.SpaceName,
+		CheckInEnabled:       cfg.CheckInEnabled,
+		CronSchedule:         cfg.CronSchedule,
+		IdleOnly:             cfg.IdleOnly,
+		TimeoutSeconds:       cfg.TimeoutSeconds,
+		RetryAttempts:        cfg.RetryAttempts,
+		RetryDelaySeconds:    cfg.RetryDelaySeconds,
+		NotificationChannels: channels,
+		LastCheckInAt:        lastCheckIn,
+		EnabledBy:            cfg.EnabledBy,
+		CreatedAt:            cfg.CreatedAt,
+		UpdatedAt:            cfg.UpdatedAt,
+	}
+}
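handleCheckInConfigsList loads every config and filters in the handler. A hypothetical repository helper that pushes the filter into SQL instead; `ListCheckInConfigsForSpace` is not part of this diff, and it relies on the space_name index created by the migration staying in place:

```go
// Hypothetical GORM helper in the db package: filter by space (and
// optionally by enabled state) in the query rather than in the handler.
func (r *Repository) ListCheckInConfigsForSpace(spaceName string, enabledOnly bool) ([]*AgentCheckInConfig, error) {
	var configs []*AgentCheckInConfig
	q := r.db.Where("space_name = ?", spaceName)
	if enabledOnly {
		q = q.Where("check_in_enabled = ?", true)
	}
	return configs, q.Find(&configs).Error
}
```

+
+// eventToResponse converts a DB event model to an API response.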
+func eventToResponse(evt *db.CheckInEvent) CheckInEventResponse { + var responseAt *time.Time + if evt.ResponseAt.Valid { + responseAt = &evt.ResponseAt.Time + } + + var latency *int64 + if evt.ResponseLatencyMs.Valid { + latency = &evt.ResponseLatencyMs.Int64 + } + + return CheckInEventResponse{ + ID: evt.ID, + AgentName: evt.AgentName, + SpaceName: evt.SpaceName, + ScheduledAt: evt.ScheduledAt, + TriggeredAt: evt.TriggeredAt, + AgentStatus: evt.AgentStatus, + MessageSent: evt.MessageSent, + MessageID: evt.MessageID, + ResponseReceived: evt.ResponseReceived, + ResponseAt: responseAt, + ResponseLatencyMs: latency, + StatusAfterCheckIn: evt.StatusAfterCheckIn, + ErrorMessage: evt.ErrorMessage, + RetryCount: evt.RetryCount, + } +} + +// CreateCheckInEventForAgent creates a check-in event and sends a message to the agent. +// This is called by the scheduler. +func (s *Server) CreateCheckInEventForAgent(spaceName, agentName string) error { + // Get agent status + agent, err := s.repo.GetAgent(spaceName, agentName) + if err != nil { + return fmt.Errorf("get agent: %w", err) + } + if agent == nil { + return fmt.Errorf("agent not found") + } + + // Create event + event := &db.CheckInEvent{ + ID: uuid.New().String(), + SpaceName: spaceName, + AgentName: agentName, + ScheduledAt: time.Now().UTC(), + TriggeredAt: time.Now().UTC(), + AgentStatus: agent.Status, + MessageSent: false, + ResponseReceived: false, + } + + if err := s.repo.CreateCheckInEvent(event); err != nil { + return fmt.Errorf("create event: %w", err) + } + + // Send check-in message via the existing message system + // TODO: Integrate with odis-mcp send_message when scheduler is implemented + + return nil +} diff --git a/internal/coordinator/handlers_space.go b/internal/coordinator/handlers_space.go index d644e0e..fb14bc3 100644 --- a/internal/coordinator/handlers_space.go +++ b/internal/coordinator/handlers_space.go @@ -182,6 +182,21 @@ func (s *Server) handleSpaceRoute(w http.ResponseWriter, r *http.Request) { s.handleAgentDuplicate(w, r, spaceName, agentName) case "documents": s.handleAgentDocumentsList(w, r, spaceName, agentName) + case "check-in": + if len(parts) >= 5 { + // /spaces/{space}/agent/{agent}/check-in/{config|history} + subAction := strings.TrimRight(parts[4], "/") + switch subAction { + case "config": + s.handleAgentCheckInConfig(w, r, spaceName, agentName) + case "history": + s.handleAgentCheckInHistory(w, r, spaceName, agentName) + default: + http.NotFound(w, r) + } + } else { + http.NotFound(w, r) + } default: // Handle document path: /spaces/{space}/agent/{agent}/{slug} s.handleAgentDocument(w, r, spaceName, agentName, action) @@ -220,6 +235,18 @@ func (s *Server) handleSpaceRoute(w http.ResponseWriter, r *http.Request) { s.handleSpaceHierarchy(w, r, spaceName) case "history": s.handleSpaceHistory(w, r, spaceName) + case "check-ins": + if len(parts) >= 3 { + subAction := strings.TrimRight(parts[2], "/") + switch subAction { + case "configs": + s.handleCheckInConfigsList(w, r, spaceName) + default: + http.NotFound(w, r) + } + } else { + http.NotFound(w, r) + } case "ignition": agentName := "" if len(parts) == 3 { diff --git a/internal/coordinator/server.go b/internal/coordinator/server.go index 53c9a8b..3059bc1 100644 --- a/internal/coordinator/server.go +++ b/internal/coordinator/server.go @@ -13,8 +13,10 @@ import ( "time" bossdb "github.com/ambient/platform/components/boss/internal/coordinator/db" + "github.com/ambient/platform/components/boss/internal/coordinator/checkin" sqliteadapter 
"github.com/ambient/platform/components/boss/internal/adapters/storage/sqlite" "github.com/ambient/platform/components/boss/internal/domain/ports" + "github.com/prometheus/client_golang/prometheus/promhttp" ) const ( @@ -101,6 +103,10 @@ type Server struct { // round-trip on every authenticated agent POST. Keyed by "space/agent" // (lowercase). Invalidated when a new token hash is written (spawn/restart). agentTokenCache sync.Map + // checkInScheduler manages automated agent check-ins + checkInScheduler *checkin.Scheduler + // checkInResponseTracker monitors and validates check-in responses + checkInResponseTracker *checkin.ResponseTracker } func NewServer(port, dataDir string) *Server { @@ -314,6 +320,8 @@ func (s *Server) Start() error { s.handlePersonaDetail(w, r, rest) }) mux.HandleFunc("/settings", s.handleSettings) + // Prometheus metrics endpoint + mux.Handle("/metrics", promhttp.Handler()) mcpHandler := s.buildMCPHandler() mux.Handle("/mcp", mcpHandler) mux.Handle("/mcp/", mcpHandler) @@ -356,6 +364,22 @@ func (s *Server) Start() error { s.startCompactionLoop(30 * time.Minute) } + // Initialize and start the check-in scheduler + if s.repo != nil { + s.checkInScheduler = checkin.New(s.repo) + // Wire message sender to use the existing message infrastructure + s.checkInScheduler.SetMessageSender(func(spaceName, agentName, message string) error { + return s.sendMessageToAgent(spaceName, agentName, "check-in-scheduler", message, "info") + }) + if err := s.checkInScheduler.Start(); err != nil { + return fmt.Errorf("start check-in scheduler: %w", err) + } + + // Initialize response tracker with shared metrics and start monitoring loop + s.checkInResponseTracker = checkin.NewResponseTracker(s.repo, s.checkInScheduler.GetMetrics()) + go s.checkInResponseLoop(30 * time.Second) + } + return nil } @@ -398,6 +422,14 @@ func (s *Server) Stop() error { return fmt.Errorf("not running") } + // Stop check-in scheduler first + if s.checkInScheduler != nil { + if err := s.checkInScheduler.Stop(); err != nil { + s.emit(DomainEvent{Level: LevelWarn, EventType: EventServerError, + Msg: fmt.Sprintf("error stopping check-in scheduler: %v", err)}) + } + } + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() close(s.stopLiveness) @@ -406,3 +438,84 @@ func (s *Server) Stop() error { s.emit(DomainEvent{Level: LevelInfo, EventType: EventServerStopped, Msg: "coordinator stopped"}) return err } + +// sendMessageToAgent delivers a message to an agent's inbox. +// This is used internally by the check-in scheduler and other system components. 
+func (s *Server) sendMessageToAgent(spaceName, targetName, senderName, messageText, priority string) error {
+	if strings.TrimSpace(messageText) == "" {
+		return fmt.Errorf("message content is required")
+	}
+
+	msgReq := AgentMessage{
+		ID:        fmt.Sprintf("%d", time.Now().UnixNano()),
+		Message:   strings.TrimSpace(messageText),
+		Sender:    senderName,
+		Timestamp: time.Now().UTC(),
+		Priority:  MessagePriority(priority),
+	}
+
+	ks, ok := s.getSpace(spaceName)
+	if !ok {
+		return fmt.Errorf("space %q not found", spaceName)
+	}
+
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	canonical := resolveAgentName(ks, targetName)
+	ag := ks.agentStatus(canonical)
+	if ag == nil {
+		ag = &AgentUpdate{
+			Status:    StatusIdle,
+			Summary:   fmt.Sprintf("%s: pending message delivery", canonical),
+			Messages:  []AgentMessage{},
+			UpdatedAt: time.Now().UTC(),
+		}
+		ks.setAgentStatus(canonical, ag)
+	}
+	if ag.Messages == nil {
+		ag.Messages = []AgentMessage{}
+	}
+	ag.Messages = append(ag.Messages, msgReq)
+
+	notif := AgentNotification{
+		ID:        fmt.Sprintf("%s-%d", canonical, time.Now().UnixNano()),
+		Type:      NotifTypeMessage,
+		Title:     fmt.Sprintf("New message from %s", senderName),
+		Body:      truncateLine(msgReq.Message, 120),
+		From:      senderName,
+		Timestamp: time.Now().UTC(),
+	}
+	ag.Notifications = append(ag.Notifications, notif)
+
+	ks.UpdatedAt = time.Now().UTC()
+	if err := s.saveSpace(ks); err != nil {
+		return fmt.Errorf("save space: %w", err)
+	}
+
+	// Emit events and broadcast SSE. The log line stays sender-agnostic because this
+	// helper is shared by the check-in scheduler and other system components.
+	s.emit(DomainEvent{Level: LevelInfo, EventType: EventMsgDelivered, Space: spaceName, Agent: canonical,
+		Msg:    fmt.Sprintf("system message from %s delivered to %s", senderName, canonical),
+		Fields: map[string]string{"sender": senderName, "priority": priority}})
+	s.journal.Append(spaceName, EventMessageSent, canonical, &msgReq)
+
+	return nil
+}
+
+// checkInResponseLoop periodically checks for pending check-ins and validates responses.
+func (s *Server) checkInResponseLoop(interval time.Duration) {
+	ticker := time.NewTicker(interval)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-s.stopLiveness:
+			return
+		case <-ticker.C:
+			if s.checkInResponseTracker != nil {
+				if err := s.checkInResponseTracker.CheckPendingEvents(); err != nil {
+					s.logEvent(fmt.Sprintf("warning: check pending check-ins: %v", err))
+				}
+			}
+		}
+	}
+}
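The checkin package itself (Scheduler, ResponseTracker, and their shared metrics) is not part of this diff. Reconstructed from the call sites above, the server wiring assumes roughly the following surface; treat this as orientation only, not the shipped API:

```go
// Reconstructed from call sites in server.go and handlers_checkin.go; the
// concrete types in internal/coordinator/checkin may differ.
type MessageSender func(spaceName, agentName, message string) error

type schedulerSurface interface {
	SetMessageSender(send MessageSender) // wired to Server.sendMessageToAgent
	Start() error                        // begins evaluating cron schedules
	Stop() error                         // called first during Server.Stop
	ReloadSchedules()                    // called by the config handlers after writes
	GetMetrics() any                     // shared with NewResponseTracker; concrete type unknown here
}

type responseTrackerSurface interface {
	CheckPendingEvents() error // polled every 30s by checkInResponseLoop
}
```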