Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,4 @@ jobs:
uses: actions/checkout@v4

- name: Spec
run: crystal spec
run: crystal spec -Dpreview_mt -Dexecution_context
11 changes: 10 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
# SystemD

SystemD integration for Crystal applications, can notify systemd, get socket listeners and store/restore file descriptors. libsystemd is only required for storing FDs.
SystemD integration for Crystal applications, can notify systemd, get socket listeners, store/restore file descriptors, and monitor memory pressure. libsystemd is only required for storing FDs.

Man pages:

https://man7.org/linux/man-pages/man3/sd_pid_notify.3.html
https://man7.org/linux/man-pages/man3/sd_listen_fds.3.html
https://systemd.io/MEMORY_PRESSURE/

## Installation

Expand Down Expand Up @@ -41,6 +42,14 @@ end
# Enable systemd watchdog support with `WatchdogSec=5` under `[Service]`
SystemD.watchdog

# Monitor memory pressure notifications from systemd
# Enable with `MemoryPressureWatch=auto` and `MemoryPressureThresholdSec=1s` under `[Service]`
SystemD::MemoryPressure.monitor do
# Called when memory pressure is detected
# Take action like clearing caches, reducing memory usage, etc.
clear_caches
end

# Store FDs with the SystemD, they will be sent back
# to the application when it restarts. Requires libsystemd
clients = Array(TCPSocket).new
Expand Down
155 changes: 155 additions & 0 deletions spec/memory_pressure_spec.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
require "./spec_helper"
require "../src/memory_pressure"

describe SystemD::MemoryPressure do
it "does nothing when MEMORY_PRESSURE_WATCH is not set" do
ENV.delete("MEMORY_PRESSURE_WATCH")
ENV.delete("MEMORY_PRESSURE_WRITE")

called = false
SystemD::MemoryPressure.monitor { called = true }

sleep 0.1.seconds
called.should be_false
end

it "does nothing when MEMORY_PRESSURE_WATCH is /dev/null" do
ENV["MEMORY_PRESSURE_WATCH"] = "/dev/null"
ENV.delete("MEMORY_PRESSURE_WRITE")

called = false
SystemD::MemoryPressure.monitor { called = true }

sleep 0.1.seconds
called.should be_false
end

it "monitors memory pressure on a FIFO" do
fifo_path = File.tempname
begin
# Create a FIFO
ret = LibC.mkfifo(fifo_path, 0o600)
raise IO::Error.from_errno("mkfifo failed") if ret != 0

ENV["MEMORY_PRESSURE_WATCH"] = fifo_path
ENV.delete("MEMORY_PRESSURE_WRITE")

wg = WaitGroup.new(1)
SystemD::MemoryPressure.monitor { wg.done }

# Write to the FIFO to trigger memory pressure
File.open(fifo_path, "w") do |f|
f.sync = true
f.print "pressure"
end

# Wait for the callback to be called
wg.wait
ensure
File.delete(fifo_path) if File.exists?(fifo_path)
ENV.delete("MEMORY_PRESSURE_WATCH")
end
end

it "monitors memory pressure on a Unix socket" do
socket_path = File.tempname
begin
# Create a Unix socket server
server = UNIXServer.new(socket_path)

ENV["MEMORY_PRESSURE_WATCH"] = socket_path
ENV.delete("MEMORY_PRESSURE_WRITE")

ch = Channel(Nil).new
SystemD::MemoryPressure.monitor { ch.send(nil) }

# Accept the connection and send data
client = server.accept
client.print "pressure"
client.flush

# Wait for the callback to be called
ch.receive

client.close
server.close
ensure
File.delete(socket_path) if File.exists?(socket_path)
ENV.delete("MEMORY_PRESSURE_WATCH")
end
end

it "writes threshold data when MEMORY_PRESSURE_WRITE is set" do
socket_path = File.tempname
begin
server = UNIXServer.new(socket_path)

# Set up both environment variables
write_data = "some threshold"
ENV["MEMORY_PRESSURE_WATCH"] = socket_path
ENV["MEMORY_PRESSURE_WRITE"] = Base64.strict_encode(write_data)

ch = Channel(Nil).new
SystemD::MemoryPressure.monitor { ch.send(nil) }

# Accept the connection and read the threshold data
client = server.accept
buffer = uninitialized UInt8[4096]
count = client.read(buffer.to_slice)
received = String.new(buffer.to_unsafe, count)
received.should eq write_data

# Now send pressure notification
client.print "pressure"
client.flush

# Wait for the callback to be called
ch.receive

client.close
server.close
ensure
File.delete(socket_path) if File.exists?(socket_path)
ENV.delete("MEMORY_PRESSURE_WATCH")
ENV.delete("MEMORY_PRESSURE_WRITE")
end
end

it "handles socket reconnection" do
socket_path = File.tempname
begin
server = UNIXServer.new(socket_path)

ENV["MEMORY_PRESSURE_WATCH"] = socket_path
ENV.delete("MEMORY_PRESSURE_WRITE")

call_count = 0
SystemD::MemoryPressure.monitor { call_count += 1 }

# Accept first connection and trigger pressure
client1 = server.accept
client1.print "pressure1"

# Close the connection to force reconnection
client1.close

# Accept second connection and trigger pressure again
client2 = server.accept
client2.print "pressure2"

# Wait a bit for callbacks
timeout = Time.monotonic + 2.seconds
until call_count >= 2 || Time.monotonic > timeout
Fiber.yield
end

call_count.should be >= 2

client2.close
server.close
ensure
File.delete(socket_path) if File.exists?(socket_path)
ENV.delete("MEMORY_PRESSURE_WATCH")
end
end
end
1 change: 1 addition & 0 deletions spec/spec_helper.cr
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
require "spec"
require "socket"
require "wait_group"
require "../src/systemd"

ENV["LISTEN_PID"] = Process.pid.to_s
Loading
Loading