Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import clockwork
import clockwork/schedule
import gleam/erlang/process
import gleam/io
import gleam/otp/static_supervisor as supervisor
import gleam/time/timestamp

pub fn main() {

Expand All @@ -28,17 +30,29 @@ pub fn main() {
|> clockwork.with_month(clockwork.ranging(from: 5, to: 10))
|> clockwork.with_weekday(clockwork.ranging_every(2, from: 2, to: 6))

let now = ...
let now = timestamp.system_time()

// Here we calculate the next occurrence
// of the cron schedule after the given timestamp.
clockwork.next_occurrence(given: cron, from: now)

// Here we schedule a function to be executed given the cron schedule.
schedule.configure_logger()
schedule.start("my_schedule", cron, fn() { io.println("Hello, world!") })
process.sleep_forever()
// The scheduler is run under the supervision of a static supervisor.
let schedule_receiver = process.new_subject()

let schedule_child_spec =
schedule.new("my_schedule", cron, fn() { io.println("Hello, world!") })
|> schedule.with_logging
|> schedule.supervised(schedule_receiver)

let assert Ok(_supervisor) =
supervisor.new()
|> supervisor.add(schedule_child_spec)
|> supervisor.start

let assert Ok(_schedule) = process.receive(schedule_receiver, 1000)

process.sleep_forever()
}
```

Expand Down
8 changes: 4 additions & 4 deletions gleam.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ repository = { type = "github", user = "renatillas", repo = "clockwork" }
# https://gleam.run/writing-gleam/gleam-toml/.

[dependencies]
gleam_stdlib = ">= 0.44.0 and < 2.0.0"
gleam_time = ">= 1.0.0-rc2 and < 2.0.0"
gleam_otp = ">= 0.16.1 and < 1.0.0"
gleam_erlang = ">= 0.34.0 and < 1.0.0"
gleam_stdlib = ">= 0.60.0 and < 2.0.0"
gleam_time = ">= 1.0.0 and < 2.0.0"
gleam_otp = ">= 1.0.0 and < 2.0.0"
gleam_erlang = ">= 1.0.0 and < 2.0.0"
logging = ">= 1.3.0 and < 2.0.0"
glotel = ">= 0.1.0 and < 1.0.0"

Expand Down
40 changes: 20 additions & 20 deletions manifest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,22 @@
packages = [
{ name = "acceptor_pool", version = "1.0.0", build_tools = ["rebar3"], requirements = [], otp_app = "acceptor_pool", source = "hex", outer_checksum = "0CBCD83FDC8B9AD2EEE2067EF8B91A14858A5883CB7CD800E6FCD5803E158788" },
{ name = "argv", version = "1.0.2", build_tools = ["gleam"], requirements = [], otp_app = "argv", source = "hex", outer_checksum = "BA1FF0929525DEBA1CE67256E5ADF77A7CDDFE729E3E3F57A5BDCAA031DED09D" },
{ name = "birdie", version = "1.2.6", build_tools = ["gleam"], requirements = ["argv", "edit_distance", "filepath", "glance", "gleam_community_ansi", "gleam_erlang", "gleam_stdlib", "justin", "rank", "simplifile", "term_size", "trie_again"], otp_app = "birdie", source = "hex", outer_checksum = "1363F4C7E7433A4A8350CC682BCDDBA5BBC6F66C94EFC63BC43025F796C4F6D0" },
{ name = "birdie", version = "1.3.1", build_tools = ["gleam"], requirements = ["argv", "edit_distance", "filepath", "glance", "gleam_community_ansi", "gleam_stdlib", "justin", "rank", "simplifile", "term_size", "trie_again"], otp_app = "birdie", source = "hex", outer_checksum = "F811C9EDAF920EF48597A26E788907AAF80D9239A5E8C8CCFBD0DD1BB10184D7" },
{ name = "ctx", version = "0.6.0", build_tools = ["rebar3"], requirements = [], otp_app = "ctx", source = "hex", outer_checksum = "A14ED2D1B67723DBEBBE423B28D7615EB0BDCBA6FF28F2D1F1B0A7E1D4AA5FC2" },
{ name = "edit_distance", version = "2.0.1", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "edit_distance", source = "hex", outer_checksum = "A1E485C69A70210223E46E63985FA1008B8B2DDA9848B7897469171B29020C05" },
{ name = "filepath", version = "1.1.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "filepath", source = "hex", outer_checksum = "67A6D15FB39EEB69DD31F8C145BB5A421790581BD6AA14B33D64D5A55DBD6587" },
{ name = "filepath", version = "1.1.2", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "filepath", source = "hex", outer_checksum = "B06A9AF0BF10E51401D64B98E4B627F1D2E48C154967DA7AF4D0914780A6D40A" },
{ name = "glam", version = "2.0.2", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "glam", source = "hex", outer_checksum = "4932A2D139AB0389E149396407F89654928D7B815E212BB02F13C66F53B1BBA1" },
{ name = "glance", version = "2.0.1", build_tools = ["gleam"], requirements = ["gleam_stdlib", "glexer"], otp_app = "glance", source = "hex", outer_checksum = "106111453AE9BA959184302B7DADF2E8CF322B27A7CB68EE78F3EE43FEACCE2C" },
{ name = "gleam_community_ansi", version = "1.4.2", build_tools = ["gleam"], requirements = ["gleam_community_colour", "gleam_regexp", "gleam_stdlib"], otp_app = "gleam_community_ansi", source = "hex", outer_checksum = "479DEDC748D08B310C9FEB9C4CBEC46B95C874F7F4F2844304D6D20CA78A8BB5" },
{ name = "gleam_community_colour", version = "1.4.1", build_tools = ["gleam"], requirements = ["gleam_json", "gleam_stdlib"], otp_app = "gleam_community_colour", source = "hex", outer_checksum = "386CB9B01B33371538672EEA8A6375A0A0ADEF41F17C86DDCB81C92AD00DA610" },
{ name = "gleam_erlang", version = "0.34.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "0C38F2A128BAA0CEF17C3000BD2097EB80634E239CE31A86400C4416A5D0FDCC" },
{ name = "gleam_json", version = "2.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_json", source = "hex", outer_checksum = "C55C5C2B318533A8072D221C5E06E5A75711C129E420DD1CE463342106012E5D" },
{ name = "gleam_otp", version = "0.16.1", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "gleam_otp", source = "hex", outer_checksum = "50DA1539FC8E8FA09924EB36A67A2BBB0AD6B27BCDED5A7EF627057CF69D035E" },
{ name = "gleam_regexp", version = "1.1.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_regexp", source = "hex", outer_checksum = "7F5E0C0BBEB3C58E57C9CB05FA9002F970C85AD4A63BA1E55CBCB35C15809179" },
{ name = "gleam_stdlib", version = "0.55.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "32D8F4AE03771516950047813A9E359249BD9FBA5C33463FDB7B953D6F8E896B" },
{ name = "gleam_time", version = "1.0.0-rc2", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_time", source = "hex", outer_checksum = "359B03FE6C839BDF22217D4462F716ABCC7F066E7F1917952E8BD26B38A7DDEA" },
{ name = "gleeunit", version = "1.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "0E6C83834BA65EDCAAF4FE4FB94AC697D9262D83E6F58A750D63C9F6C8A9D9FF" },
{ name = "glexer", version = "2.1.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "glexer", source = "hex", outer_checksum = "F74FB4F78C3C1E158DF15A7226F33A662672F58EEF1DFE6593B7FCDA38B0A0EB" },
{ name = "glance", version = "5.0.0", build_tools = ["gleam"], requirements = ["gleam_stdlib", "glexer"], otp_app = "glance", source = "hex", outer_checksum = "FAA3DAC74AF71D47C67D88EB32CE629075169F878D148BB1FF225439BE30070A" },
{ name = "gleam_community_ansi", version = "1.4.3", build_tools = ["gleam"], requirements = ["gleam_community_colour", "gleam_regexp", "gleam_stdlib"], otp_app = "gleam_community_ansi", source = "hex", outer_checksum = "8A62AE9CC6EA65BEA630D95016D6C07E4F9973565FA3D0DE68DC4200D8E0DD27" },
{ name = "gleam_community_colour", version = "2.0.1", build_tools = ["gleam"], requirements = ["gleam_json", "gleam_stdlib"], otp_app = "gleam_community_colour", source = "hex", outer_checksum = "F0ACE69E3A47E913B03D3D0BB23A5563A91A4A7D20956916286068F4A9F817FE" },
{ name = "gleam_erlang", version = "1.0.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "7E6A5234F927C4B24F8054AB1E4572206C41F9E6D5C6C02273CB7531E7E5CED0" },
{ name = "gleam_json", version = "3.0.1", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_json", source = "hex", outer_checksum = "5BA154440B22D9800955B1AB854282FA37B97F30F409D76B0824D0A60C934188" },
{ name = "gleam_otp", version = "1.0.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "gleam_otp", source = "hex", outer_checksum = "7020E652D18F9ABAC9C877270B14160519FA0856EE80126231C505D719AD68DA" },
{ name = "gleam_regexp", version = "1.1.1", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_regexp", source = "hex", outer_checksum = "9C215C6CA84A5B35BB934A9B61A9A306EC743153BE2B0425A0D032E477B062A9" },
{ name = "gleam_stdlib", version = "0.60.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "621D600BB134BC239CB2537630899817B1A42E60A1D46C5E9F3FAE39F88C800B" },
{ name = "gleam_time", version = "1.2.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_time", source = "hex", outer_checksum = "D71F1AFF7FEB534FF55E5DC58E534E9201BA75A444619788A2E4DEA4EBD87D16" },
{ name = "gleeunit", version = "1.5.1", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "D33B7736CF0766ED3065F64A1EBB351E72B2E8DE39BAFC8ADA0E35E92A6A934F" },
{ name = "glexer", version = "2.2.1", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "glexer", source = "hex", outer_checksum = "5C235CBDF4DA5203AD5EAB1D6D8B456ED8162C5424FE2309CFFB7EF438B7C269" },
{ name = "glotel", version = "0.1.0", build_tools = ["gleam"], requirements = ["gleam_stdlib", "opentelemetry", "opentelemetry_exporter"], otp_app = "glotel", source = "hex", outer_checksum = "0F88DEA8B3A03D801328EF744619869394744CB21FD771D2DC9046E10890728F" },
{ name = "gproc", version = "0.9.1", build_tools = ["rebar3"], requirements = [], otp_app = "gproc", source = "hex", outer_checksum = "905088E32E72127ED9466F0BAC0D8E65704CA5E73EE5A62CB073C3117916D507" },
{ name = "grpcbox", version = "0.17.1", build_tools = ["rebar3"], requirements = ["acceptor_pool", "ctx", "gproc", "ts_chatterbox"], otp_app = "grpcbox", source = "hex", outer_checksum = "4A3B5D7111DAABC569DC9CBD9B202A3237D81C80BF97212FBC676832CB0CEB17" },
Expand All @@ -29,22 +29,22 @@ packages = [
{ name = "opentelemetry", version = "1.5.0", build_tools = ["rebar3"], requirements = ["opentelemetry_api"], otp_app = "opentelemetry", source = "hex", outer_checksum = "CDF4F51D17B592FC592B9A75F86A6F808C23044BA7CF7B9534DEBBCC5C23B0EE" },
{ name = "opentelemetry_api", version = "1.4.0", build_tools = ["rebar3", "mix"], requirements = [], otp_app = "opentelemetry_api", source = "hex", outer_checksum = "3DFBBFAA2C2ED3121C5C483162836C4F9027DEF469C41578AF5EF32589FCFC58" },
{ name = "opentelemetry_exporter", version = "1.8.0", build_tools = ["rebar3"], requirements = ["grpcbox", "opentelemetry", "opentelemetry_api", "tls_certificate_check"], otp_app = "opentelemetry_exporter", source = "hex", outer_checksum = "A1F9F271F8D3B02B81462A6BFEF7075FD8457FDB06ADFF5D2537DF5E2264D9AF" },
{ name = "pprint", version = "1.0.4", build_tools = ["gleam"], requirements = ["glam", "gleam_stdlib"], otp_app = "pprint", source = "hex", outer_checksum = "C310A98BDC0995644847C3C8702DE19656D6BCD638B2A8A358B97824379ECAA1" },
{ name = "pprint", version = "1.0.5", build_tools = ["gleam"], requirements = ["glam", "gleam_stdlib"], otp_app = "pprint", source = "hex", outer_checksum = "B5328E55FB9EED3941F1648536FDBFA0D97B0C21D21A6625D726CCB22A36782E" },
{ name = "rank", version = "1.0.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "rank", source = "hex", outer_checksum = "5660E361F0E49CBB714CC57CC4C89C63415D8986F05B2DA0C719D5642FAD91C9" },
{ name = "simplifile", version = "2.2.0", build_tools = ["gleam"], requirements = ["filepath", "gleam_stdlib"], otp_app = "simplifile", source = "hex", outer_checksum = "0DFABEF7DC7A9E2FF4BB27B108034E60C81BEBFCB7AB816B9E7E18ED4503ACD8" },
{ name = "simplifile", version = "2.2.1", build_tools = ["gleam"], requirements = ["filepath", "gleam_stdlib"], otp_app = "simplifile", source = "hex", outer_checksum = "C88E0EE2D509F6D86EB55161D631657675AA7684DAB83822F7E59EB93D9A60E3" },
{ name = "ssl_verify_fun", version = "1.1.7", build_tools = ["mix", "rebar3", "make"], requirements = [], otp_app = "ssl_verify_fun", source = "hex", outer_checksum = "FE4C190E8F37401D30167C8C405EDA19469F34577987C76DDE613E838BBC67F8" },
{ name = "term_size", version = "1.0.1", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "term_size", source = "hex", outer_checksum = "D00BD2BC8FB3EBB7E6AE076F3F1FF2AC9D5ED1805F004D0896C784D06C6645F1" },
{ name = "tls_certificate_check", version = "1.27.0", build_tools = ["rebar3"], requirements = ["ssl_verify_fun"], otp_app = "tls_certificate_check", source = "hex", outer_checksum = "51A5AD3DBD72D4694848965F3B5076E8B55D70EB8D5057FCDDD536029AB8A23C" },
{ name = "tls_certificate_check", version = "1.28.0", build_tools = ["rebar3"], requirements = ["ssl_verify_fun"], otp_app = "tls_certificate_check", source = "hex", outer_checksum = "3AB058C3F9457FFFCA916729587415F0DDC822048A0E5B5E2694918556D92DF1" },
{ name = "trie_again", version = "1.1.2", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "trie_again", source = "hex", outer_checksum = "5B19176F52B1BD98831B57FDC97BD1F88C8A403D6D8C63471407E78598E27184" },
{ name = "ts_chatterbox", version = "0.15.1", build_tools = ["rebar3"], requirements = ["hpack_erl"], otp_app = "chatterbox", source = "hex", outer_checksum = "4F75B91451338BC0DA5F52F3480FA6EF6E3A2AEECFC33686D6B3D0A0948F31AA" },
]

[requirements]
birdie = { version = ">= 1.2.6 and < 2.0.0" }
gleam_erlang = { version = ">= 0.34.0 and < 1.0.0" }
gleam_otp = { version = ">= 0.16.1 and < 1.0.0" }
gleam_stdlib = { version = ">= 0.44.0 and < 2.0.0" }
gleam_time = { version = ">= 1.0.0-rc2 and < 2.0.0" }
gleam_erlang = { version = ">= 1.0.0 and < 2.0.0" }
gleam_otp = { version = ">= 1.0.0 and < 2.0.0" }
gleam_stdlib = { version = ">= 0.60.0 and < 2.0.0" }
gleam_time = { version = ">= 1.0.0 and < 2.0.0" }
gleeunit = { version = ">= 1.0.0 and < 2.0.0" }
glotel = { version = ">= 0.1.0 and < 1.0.0" }
logging = { version = ">= 1.3.0 and < 2.0.0" }
Expand Down
143 changes: 81 additions & 62 deletions src/clockwork/schedule.gleam
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import clockwork
import gleam/erlang/process
import gleam/float
import gleam/function
import gleam/int
import gleam/io
import gleam/otp/actor
import gleam/otp/supervision
import gleam/result
import gleam/string
import gleam/time/calendar
Expand Down Expand Up @@ -85,44 +84,73 @@ pub fn with_time_offset(
)
}

pub fn start(scheduler: Scheduler) -> Result(Schedule, actor.StartError) {
fn start_actor(scheduler: Scheduler) {
case scheduler.with_logging {
True -> logging.configure()
False -> Nil
}
actor.start_spec(actor.Spec(
init: fn() {
init(
scheduler.cron,
scheduler.job,
scheduler.id,
scheduler.with_telemetry,
scheduler.offset,

actor.new_with_initialiser(100, fn(self) {
let state =
State(
id: scheduler.id,
self:,
cron: scheduler.cron,
job: scheduler.job,
with_telemetry: scheduler.with_telemetry,
offset: scheduler.offset,
)
},
loop: loop,
init_timeout: 100,
))
|> result.map(Schedule)
}

pub fn stop(schedule: Schedule) {
process.send(schedule.subject, Stop)
let selector =
process.new_selector()
|> process.select(self)

actor.initialised(state)
|> actor.selecting(selector)
|> actor.returning(self)
|> Ok
})
|> actor.on_message(loop)
|> actor.start
}

fn init(cron, job, name, telemetry, offset) {
let subject = process.new_subject()
let state = State(name, subject, cron, job, telemetry, offset)
/// Start an unsupervised scheduler. Prefer to use [`supervised`](#supervised) to start
/// the scheduler as part of your supervision tree.
pub fn start(scheduler: Scheduler) -> Result(Schedule, actor.StartError) {
start_actor(scheduler)
|> result.map(fn(started) { Schedule(started.data) })
}

let selector =
process.new_selector() |> process.selecting(subject, function.identity)
/// Start a scheduler as part of your supervision tree. You should provide a subject to receive
/// the schedule value once your supervisor has started.
///
/// ```gleam
/// let schedule_receiver = process.new_subject()
///
/// let schedule_child_spec = schedule.supervised(scheduler, schedule_receiver)
///
/// // Start your supervision tree...
///
/// let assert Ok(schedule) = process.receive(schedule_receiver, 1000)
///
/// schedule.stop(schedule)
/// ```
pub fn supervised(
scheduler: Scheduler,
schedule_receiver: process.Subject(Schedule),
) {
supervision.worker(fn() {
use started <- result.try(start_actor(scheduler))
process.send(schedule_receiver, Schedule(started.data))
Ok(started)
})
}

enqueue_job(cron, state)
logging.log(logging.Info, "[CLOCKWORK] Started cron job: " <> state.id)
actor.Ready(state, selector)
pub fn stop(schedule: Schedule) {
process.send(schedule.subject, Stop)
}

fn loop(message: Message, state: State) {
fn loop(state: State, message: Message) {
case message {
Run -> {
logging.log(
Expand All @@ -135,47 +163,38 @@ fn loop(message: Message, state: State) {
|> timestamp.to_unix_seconds
|> float.to_string(),
)
process.start(
fn() {
case state.with_telemetry {
True -> {
let human_readable_time =
process.spawn(fn() {
case state.with_telemetry {
True -> {
let human_readable_time =
timestamp.system_time()
|> timestamp.to_calendar(state.offset)

use _ <- span.new_of_kind(span_kind.Consumer, "job-" <> state.id, [
#(
"timestamp",
timestamp.system_time()
|> timestamp.to_calendar(state.offset)

use _ <- span.new_of_kind(span_kind.Consumer, "job-" <> state.id, [
#(
"timestamp",
timestamp.system_time()
|> timestamp.to_unix_seconds
|> float.to_string(),
),
#("year", { human_readable_time.0 }.year |> int.to_string()),
#("month", { human_readable_time.0 }.month |> string.inspect()),
#("day", { human_readable_time.0 }.day |> int.to_string()),
#("hour", { human_readable_time.1 }.hours |> int.to_string()),
#(
"minute",
{ human_readable_time.1 }.minutes |> int.to_string(),
),
#(
"second",
{ human_readable_time.1 }.seconds |> int.to_string(),
),
])
state.job()
}
False -> state.job()
|> timestamp.to_unix_seconds
|> float.to_string(),
),
#("year", { human_readable_time.0 }.year |> int.to_string()),
#("month", { human_readable_time.0 }.month |> string.inspect()),
#("day", { human_readable_time.0 }.day |> int.to_string()),
#("hour", { human_readable_time.1 }.hours |> int.to_string()),
#("minute", { human_readable_time.1 }.minutes |> int.to_string()),
#("second", { human_readable_time.1 }.seconds |> int.to_string()),
])
state.job()
}
},
True,
)
False -> state.job()
}
})
enqueue_job(state.cron, state)
actor.continue(state)
}
Stop -> {
logging.log(logging.Info, "[CLOCKWORK] Stopping job: " <> state.id)
actor.Stop(process.Normal)
actor.stop()
}
}
}
Expand Down