diff --git a/README.md b/README.md index 456c29b..da5f5f2 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,8 @@ import clockwork import clockwork/schedule import gleam/erlang/process import gleam/io +import gleam/otp/static_supervisor as supervisor +import gleam/time/timestamp pub fn main() { @@ -28,17 +30,29 @@ pub fn main() { |> clockwork.with_month(clockwork.ranging(from: 5, to: 10)) |> clockwork.with_weekday(clockwork.ranging_every(2, from: 2, to: 6)) - let now = ... + let now = timestamp.system_time() // Here we calculate the next occurrence // of the cron schedule after the given timestamp. clockwork.next_occurrence(given: cron, from: now) // Here we schedule a function to be executed given the cron schedule. - schedule.configure_logger() - schedule.start("my_schedule", cron, fn() { io.println("Hello, world!") }) - process.sleep_forever() + // The scheduler is run under the supervision of a static supervisor. + let schedule_receiver = process.new_subject() + + let schedule_child_spec = + schedule.new("my_schedule", cron, fn() { io.println("Hello, world!") }) + |> schedule.with_logging + |> schedule.supervised(schedule_receiver) + + let assert Ok(_supervisor) = + supervisor.new() + |> supervisor.add(schedule_child_spec) + |> supervisor.start + let assert Ok(_schedule) = process.receive(schedule_receiver, 1000) + + process.sleep_forever() } ``` diff --git a/gleam.toml b/gleam.toml index 48d4ba9..17f3dcc 100644 --- a/gleam.toml +++ b/gleam.toml @@ -13,10 +13,10 @@ repository = { type = "github", user = "renatillas", repo = "clockwork" } # https://gleam.run/writing-gleam/gleam-toml/. [dependencies] -gleam_stdlib = ">= 0.44.0 and < 2.0.0" -gleam_time = ">= 1.0.0-rc2 and < 2.0.0" -gleam_otp = ">= 0.16.1 and < 1.0.0" -gleam_erlang = ">= 0.34.0 and < 1.0.0" +gleam_stdlib = ">= 0.60.0 and < 2.0.0" +gleam_time = ">= 1.0.0 and < 2.0.0" +gleam_otp = ">= 1.0.0 and < 2.0.0" +gleam_erlang = ">= 1.0.0 and < 2.0.0" logging = ">= 1.3.0 and < 2.0.0" glotel = ">= 0.1.0 and < 1.0.0" diff --git a/manifest.toml b/manifest.toml index 98c7140..8e8833e 100644 --- a/manifest.toml +++ b/manifest.toml @@ -4,22 +4,22 @@ packages = [ { name = "acceptor_pool", version = "1.0.0", build_tools = ["rebar3"], requirements = [], otp_app = "acceptor_pool", source = "hex", outer_checksum = "0CBCD83FDC8B9AD2EEE2067EF8B91A14858A5883CB7CD800E6FCD5803E158788" }, { name = "argv", version = "1.0.2", build_tools = ["gleam"], requirements = [], otp_app = "argv", source = "hex", outer_checksum = "BA1FF0929525DEBA1CE67256E5ADF77A7CDDFE729E3E3F57A5BDCAA031DED09D" }, - { name = "birdie", version = "1.2.6", build_tools = ["gleam"], requirements = ["argv", "edit_distance", "filepath", "glance", "gleam_community_ansi", "gleam_erlang", "gleam_stdlib", "justin", "rank", "simplifile", "term_size", "trie_again"], otp_app = "birdie", source = "hex", outer_checksum = "1363F4C7E7433A4A8350CC682BCDDBA5BBC6F66C94EFC63BC43025F796C4F6D0" }, + { name = "birdie", version = "1.3.1", build_tools = ["gleam"], requirements = ["argv", "edit_distance", "filepath", "glance", "gleam_community_ansi", "gleam_stdlib", "justin", "rank", "simplifile", "term_size", "trie_again"], otp_app = "birdie", source = "hex", outer_checksum = "F811C9EDAF920EF48597A26E788907AAF80D9239A5E8C8CCFBD0DD1BB10184D7" }, { name = "ctx", version = "0.6.0", build_tools = ["rebar3"], requirements = [], otp_app = "ctx", source = "hex", outer_checksum = "A14ED2D1B67723DBEBBE423B28D7615EB0BDCBA6FF28F2D1F1B0A7E1D4AA5FC2" }, { name = "edit_distance", version = "2.0.1", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "edit_distance", source = "hex", outer_checksum = "A1E485C69A70210223E46E63985FA1008B8B2DDA9848B7897469171B29020C05" }, - { name = "filepath", version = "1.1.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "filepath", source = "hex", outer_checksum = "67A6D15FB39EEB69DD31F8C145BB5A421790581BD6AA14B33D64D5A55DBD6587" }, + { name = "filepath", version = "1.1.2", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "filepath", source = "hex", outer_checksum = "B06A9AF0BF10E51401D64B98E4B627F1D2E48C154967DA7AF4D0914780A6D40A" }, { name = "glam", version = "2.0.2", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "glam", source = "hex", outer_checksum = "4932A2D139AB0389E149396407F89654928D7B815E212BB02F13C66F53B1BBA1" }, - { name = "glance", version = "2.0.1", build_tools = ["gleam"], requirements = ["gleam_stdlib", "glexer"], otp_app = "glance", source = "hex", outer_checksum = "106111453AE9BA959184302B7DADF2E8CF322B27A7CB68EE78F3EE43FEACCE2C" }, - { name = "gleam_community_ansi", version = "1.4.2", build_tools = ["gleam"], requirements = ["gleam_community_colour", "gleam_regexp", "gleam_stdlib"], otp_app = "gleam_community_ansi", source = "hex", outer_checksum = "479DEDC748D08B310C9FEB9C4CBEC46B95C874F7F4F2844304D6D20CA78A8BB5" }, - { name = "gleam_community_colour", version = "1.4.1", build_tools = ["gleam"], requirements = ["gleam_json", "gleam_stdlib"], otp_app = "gleam_community_colour", source = "hex", outer_checksum = "386CB9B01B33371538672EEA8A6375A0A0ADEF41F17C86DDCB81C92AD00DA610" }, - { name = "gleam_erlang", version = "0.34.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "0C38F2A128BAA0CEF17C3000BD2097EB80634E239CE31A86400C4416A5D0FDCC" }, - { name = "gleam_json", version = "2.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_json", source = "hex", outer_checksum = "C55C5C2B318533A8072D221C5E06E5A75711C129E420DD1CE463342106012E5D" }, - { name = "gleam_otp", version = "0.16.1", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "gleam_otp", source = "hex", outer_checksum = "50DA1539FC8E8FA09924EB36A67A2BBB0AD6B27BCDED5A7EF627057CF69D035E" }, - { name = "gleam_regexp", version = "1.1.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_regexp", source = "hex", outer_checksum = "7F5E0C0BBEB3C58E57C9CB05FA9002F970C85AD4A63BA1E55CBCB35C15809179" }, - { name = "gleam_stdlib", version = "0.55.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "32D8F4AE03771516950047813A9E359249BD9FBA5C33463FDB7B953D6F8E896B" }, - { name = "gleam_time", version = "1.0.0-rc2", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_time", source = "hex", outer_checksum = "359B03FE6C839BDF22217D4462F716ABCC7F066E7F1917952E8BD26B38A7DDEA" }, - { name = "gleeunit", version = "1.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "0E6C83834BA65EDCAAF4FE4FB94AC697D9262D83E6F58A750D63C9F6C8A9D9FF" }, - { name = "glexer", version = "2.1.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "glexer", source = "hex", outer_checksum = "F74FB4F78C3C1E158DF15A7226F33A662672F58EEF1DFE6593B7FCDA38B0A0EB" }, + { name = "glance", version = "5.0.0", build_tools = ["gleam"], requirements = ["gleam_stdlib", "glexer"], otp_app = "glance", source = "hex", outer_checksum = "FAA3DAC74AF71D47C67D88EB32CE629075169F878D148BB1FF225439BE30070A" }, + { name = "gleam_community_ansi", version = "1.4.3", build_tools = ["gleam"], requirements = ["gleam_community_colour", "gleam_regexp", "gleam_stdlib"], otp_app = "gleam_community_ansi", source = "hex", outer_checksum = "8A62AE9CC6EA65BEA630D95016D6C07E4F9973565FA3D0DE68DC4200D8E0DD27" }, + { name = "gleam_community_colour", version = "2.0.1", build_tools = ["gleam"], requirements = ["gleam_json", "gleam_stdlib"], otp_app = "gleam_community_colour", source = "hex", outer_checksum = "F0ACE69E3A47E913B03D3D0BB23A5563A91A4A7D20956916286068F4A9F817FE" }, + { name = "gleam_erlang", version = "1.0.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "7E6A5234F927C4B24F8054AB1E4572206C41F9E6D5C6C02273CB7531E7E5CED0" }, + { name = "gleam_json", version = "3.0.1", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_json", source = "hex", outer_checksum = "5BA154440B22D9800955B1AB854282FA37B97F30F409D76B0824D0A60C934188" }, + { name = "gleam_otp", version = "1.0.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "gleam_otp", source = "hex", outer_checksum = "7020E652D18F9ABAC9C877270B14160519FA0856EE80126231C505D719AD68DA" }, + { name = "gleam_regexp", version = "1.1.1", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_regexp", source = "hex", outer_checksum = "9C215C6CA84A5B35BB934A9B61A9A306EC743153BE2B0425A0D032E477B062A9" }, + { name = "gleam_stdlib", version = "0.60.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "621D600BB134BC239CB2537630899817B1A42E60A1D46C5E9F3FAE39F88C800B" }, + { name = "gleam_time", version = "1.2.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_time", source = "hex", outer_checksum = "D71F1AFF7FEB534FF55E5DC58E534E9201BA75A444619788A2E4DEA4EBD87D16" }, + { name = "gleeunit", version = "1.5.1", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "D33B7736CF0766ED3065F64A1EBB351E72B2E8DE39BAFC8ADA0E35E92A6A934F" }, + { name = "glexer", version = "2.2.1", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "glexer", source = "hex", outer_checksum = "5C235CBDF4DA5203AD5EAB1D6D8B456ED8162C5424FE2309CFFB7EF438B7C269" }, { name = "glotel", version = "0.1.0", build_tools = ["gleam"], requirements = ["gleam_stdlib", "opentelemetry", "opentelemetry_exporter"], otp_app = "glotel", source = "hex", outer_checksum = "0F88DEA8B3A03D801328EF744619869394744CB21FD771D2DC9046E10890728F" }, { name = "gproc", version = "0.9.1", build_tools = ["rebar3"], requirements = [], otp_app = "gproc", source = "hex", outer_checksum = "905088E32E72127ED9466F0BAC0D8E65704CA5E73EE5A62CB073C3117916D507" }, { name = "grpcbox", version = "0.17.1", build_tools = ["rebar3"], requirements = ["acceptor_pool", "ctx", "gproc", "ts_chatterbox"], otp_app = "grpcbox", source = "hex", outer_checksum = "4A3B5D7111DAABC569DC9CBD9B202A3237D81C80BF97212FBC676832CB0CEB17" }, @@ -29,22 +29,22 @@ packages = [ { name = "opentelemetry", version = "1.5.0", build_tools = ["rebar3"], requirements = ["opentelemetry_api"], otp_app = "opentelemetry", source = "hex", outer_checksum = "CDF4F51D17B592FC592B9A75F86A6F808C23044BA7CF7B9534DEBBCC5C23B0EE" }, { name = "opentelemetry_api", version = "1.4.0", build_tools = ["rebar3", "mix"], requirements = [], otp_app = "opentelemetry_api", source = "hex", outer_checksum = "3DFBBFAA2C2ED3121C5C483162836C4F9027DEF469C41578AF5EF32589FCFC58" }, { name = "opentelemetry_exporter", version = "1.8.0", build_tools = ["rebar3"], requirements = ["grpcbox", "opentelemetry", "opentelemetry_api", "tls_certificate_check"], otp_app = "opentelemetry_exporter", source = "hex", outer_checksum = "A1F9F271F8D3B02B81462A6BFEF7075FD8457FDB06ADFF5D2537DF5E2264D9AF" }, - { name = "pprint", version = "1.0.4", build_tools = ["gleam"], requirements = ["glam", "gleam_stdlib"], otp_app = "pprint", source = "hex", outer_checksum = "C310A98BDC0995644847C3C8702DE19656D6BCD638B2A8A358B97824379ECAA1" }, + { name = "pprint", version = "1.0.5", build_tools = ["gleam"], requirements = ["glam", "gleam_stdlib"], otp_app = "pprint", source = "hex", outer_checksum = "B5328E55FB9EED3941F1648536FDBFA0D97B0C21D21A6625D726CCB22A36782E" }, { name = "rank", version = "1.0.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "rank", source = "hex", outer_checksum = "5660E361F0E49CBB714CC57CC4C89C63415D8986F05B2DA0C719D5642FAD91C9" }, - { name = "simplifile", version = "2.2.0", build_tools = ["gleam"], requirements = ["filepath", "gleam_stdlib"], otp_app = "simplifile", source = "hex", outer_checksum = "0DFABEF7DC7A9E2FF4BB27B108034E60C81BEBFCB7AB816B9E7E18ED4503ACD8" }, + { name = "simplifile", version = "2.2.1", build_tools = ["gleam"], requirements = ["filepath", "gleam_stdlib"], otp_app = "simplifile", source = "hex", outer_checksum = "C88E0EE2D509F6D86EB55161D631657675AA7684DAB83822F7E59EB93D9A60E3" }, { name = "ssl_verify_fun", version = "1.1.7", build_tools = ["mix", "rebar3", "make"], requirements = [], otp_app = "ssl_verify_fun", source = "hex", outer_checksum = "FE4C190E8F37401D30167C8C405EDA19469F34577987C76DDE613E838BBC67F8" }, { name = "term_size", version = "1.0.1", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "term_size", source = "hex", outer_checksum = "D00BD2BC8FB3EBB7E6AE076F3F1FF2AC9D5ED1805F004D0896C784D06C6645F1" }, - { name = "tls_certificate_check", version = "1.27.0", build_tools = ["rebar3"], requirements = ["ssl_verify_fun"], otp_app = "tls_certificate_check", source = "hex", outer_checksum = "51A5AD3DBD72D4694848965F3B5076E8B55D70EB8D5057FCDDD536029AB8A23C" }, + { name = "tls_certificate_check", version = "1.28.0", build_tools = ["rebar3"], requirements = ["ssl_verify_fun"], otp_app = "tls_certificate_check", source = "hex", outer_checksum = "3AB058C3F9457FFFCA916729587415F0DDC822048A0E5B5E2694918556D92DF1" }, { name = "trie_again", version = "1.1.2", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "trie_again", source = "hex", outer_checksum = "5B19176F52B1BD98831B57FDC97BD1F88C8A403D6D8C63471407E78598E27184" }, { name = "ts_chatterbox", version = "0.15.1", build_tools = ["rebar3"], requirements = ["hpack_erl"], otp_app = "chatterbox", source = "hex", outer_checksum = "4F75B91451338BC0DA5F52F3480FA6EF6E3A2AEECFC33686D6B3D0A0948F31AA" }, ] [requirements] birdie = { version = ">= 1.2.6 and < 2.0.0" } -gleam_erlang = { version = ">= 0.34.0 and < 1.0.0" } -gleam_otp = { version = ">= 0.16.1 and < 1.0.0" } -gleam_stdlib = { version = ">= 0.44.0 and < 2.0.0" } -gleam_time = { version = ">= 1.0.0-rc2 and < 2.0.0" } +gleam_erlang = { version = ">= 1.0.0 and < 2.0.0" } +gleam_otp = { version = ">= 1.0.0 and < 2.0.0" } +gleam_stdlib = { version = ">= 0.60.0 and < 2.0.0" } +gleam_time = { version = ">= 1.0.0 and < 2.0.0" } gleeunit = { version = ">= 1.0.0 and < 2.0.0" } glotel = { version = ">= 0.1.0 and < 1.0.0" } logging = { version = ">= 1.3.0 and < 2.0.0" } diff --git a/src/clockwork/schedule.gleam b/src/clockwork/schedule.gleam index 547c8d8..bdec012 100644 --- a/src/clockwork/schedule.gleam +++ b/src/clockwork/schedule.gleam @@ -1,10 +1,9 @@ import clockwork import gleam/erlang/process import gleam/float -import gleam/function import gleam/int -import gleam/io import gleam/otp/actor +import gleam/otp/supervision import gleam/result import gleam/string import gleam/time/calendar @@ -85,44 +84,73 @@ pub fn with_time_offset( ) } -pub fn start(scheduler: Scheduler) -> Result(Schedule, actor.StartError) { +fn start_actor(scheduler: Scheduler) { case scheduler.with_logging { True -> logging.configure() False -> Nil } - actor.start_spec(actor.Spec( - init: fn() { - init( - scheduler.cron, - scheduler.job, - scheduler.id, - scheduler.with_telemetry, - scheduler.offset, + + actor.new_with_initialiser(100, fn(self) { + let state = + State( + id: scheduler.id, + self:, + cron: scheduler.cron, + job: scheduler.job, + with_telemetry: scheduler.with_telemetry, + offset: scheduler.offset, ) - }, - loop: loop, - init_timeout: 100, - )) - |> result.map(Schedule) -} -pub fn stop(schedule: Schedule) { - process.send(schedule.subject, Stop) + let selector = + process.new_selector() + |> process.select(self) + + actor.initialised(state) + |> actor.selecting(selector) + |> actor.returning(self) + |> Ok + }) + |> actor.on_message(loop) + |> actor.start } -fn init(cron, job, name, telemetry, offset) { - let subject = process.new_subject() - let state = State(name, subject, cron, job, telemetry, offset) +/// Start an unsupervised scheduler. Prefer to use [`supervised`](#supervised) to start +/// the scheduler as part of your supervision tree. +pub fn start(scheduler: Scheduler) -> Result(Schedule, actor.StartError) { + start_actor(scheduler) + |> result.map(fn(started) { Schedule(started.data) }) +} - let selector = - process.new_selector() |> process.selecting(subject, function.identity) +/// Start a scheduler as part of your supervision tree. You should provide a subject to receive +/// the schedule value once your supervisor has started. +/// +/// ```gleam +/// let schedule_receiver = process.new_subject() +/// +/// let schedule_child_spec = schedule.supervised(scheduler, schedule_receiver) +/// +/// // Start your supervision tree... +/// +/// let assert Ok(schedule) = process.receive(schedule_receiver, 1000) +/// +/// schedule.stop(schedule) +/// ``` +pub fn supervised( + scheduler: Scheduler, + schedule_receiver: process.Subject(Schedule), +) { + supervision.worker(fn() { + use started <- result.try(start_actor(scheduler)) + process.send(schedule_receiver, Schedule(started.data)) + Ok(started) + }) +} - enqueue_job(cron, state) - logging.log(logging.Info, "[CLOCKWORK] Started cron job: " <> state.id) - actor.Ready(state, selector) +pub fn stop(schedule: Schedule) { + process.send(schedule.subject, Stop) } -fn loop(message: Message, state: State) { +fn loop(state: State, message: Message) { case message { Run -> { logging.log( @@ -135,47 +163,38 @@ fn loop(message: Message, state: State) { |> timestamp.to_unix_seconds |> float.to_string(), ) - process.start( - fn() { - case state.with_telemetry { - True -> { - let human_readable_time = + process.spawn(fn() { + case state.with_telemetry { + True -> { + let human_readable_time = + timestamp.system_time() + |> timestamp.to_calendar(state.offset) + + use _ <- span.new_of_kind(span_kind.Consumer, "job-" <> state.id, [ + #( + "timestamp", timestamp.system_time() - |> timestamp.to_calendar(state.offset) - - use _ <- span.new_of_kind(span_kind.Consumer, "job-" <> state.id, [ - #( - "timestamp", - timestamp.system_time() - |> timestamp.to_unix_seconds - |> float.to_string(), - ), - #("year", { human_readable_time.0 }.year |> int.to_string()), - #("month", { human_readable_time.0 }.month |> string.inspect()), - #("day", { human_readable_time.0 }.day |> int.to_string()), - #("hour", { human_readable_time.1 }.hours |> int.to_string()), - #( - "minute", - { human_readable_time.1 }.minutes |> int.to_string(), - ), - #( - "second", - { human_readable_time.1 }.seconds |> int.to_string(), - ), - ]) - state.job() - } - False -> state.job() + |> timestamp.to_unix_seconds + |> float.to_string(), + ), + #("year", { human_readable_time.0 }.year |> int.to_string()), + #("month", { human_readable_time.0 }.month |> string.inspect()), + #("day", { human_readable_time.0 }.day |> int.to_string()), + #("hour", { human_readable_time.1 }.hours |> int.to_string()), + #("minute", { human_readable_time.1 }.minutes |> int.to_string()), + #("second", { human_readable_time.1 }.seconds |> int.to_string()), + ]) + state.job() } - }, - True, - ) + False -> state.job() + } + }) enqueue_job(state.cron, state) actor.continue(state) } Stop -> { logging.log(logging.Info, "[CLOCKWORK] Stopping job: " <> state.id) - actor.Stop(process.Normal) + actor.stop() } } }