Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions .github/workflows/build-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,22 @@ jobs:
run: cargo fmt --all -- --check
- name: Run cargo clippy
run: make lint
test:
name: Unit Tests

test-windows:
name: Windows Unit Tests
runs-on: windows-latest
steps:
- name: Install postgres
uses: tj-actions/install-postgresql@v3
with:
postgresql-version: 17
- name: Checkout
uses: actions/checkout@v4
- name: Run tests
run: make test-ci-windows

test-unix:
name: Unix Unit Tests
runs-on: ubuntu-latest
steps:
- name: Install postgres
Expand All @@ -27,7 +41,7 @@ jobs:
- name: Checkout
uses: actions/checkout@v4
- name: Run tests
run: make test-ci
run: make test-ci-unix

doc:
name: Documentation Check
Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,15 @@ test:
cargo test --tests --examples --all-features

# Run all tests with and without all features
test-ci:
test-ci-unix:
cargo test --tests --examples --no-default-features
cargo test --tests --examples --all-features
make -C examples/python-sqlalchemy test-ci

test-ci-windows:
cargo test --tests --examples --no-default-features
cargo test --tests --examples --all-features

# Run clippy
lint:
cargo clippy --no-deps --all-targets --all-features -- -W clippy::pedantic \
Expand Down
92 changes: 60 additions & 32 deletions src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@ use crate::{PgTempDB, PgTempDBBuilder};

use std::net::SocketAddr;

use tokio::io::AsyncWriteExt;
use tokio::io::{self, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::signal::unix::{signal, SignalKind};

#[cfg(feature = "cli")]
/// Contains the clap args struct
Expand Down Expand Up @@ -144,7 +143,7 @@ impl PgTempDaemon {

/// Start the daemon, listening for either TCP connections on the configured port. The server
/// shuts down when sent a SIGINT (e.g. via ctrl-C).
pub async fn start(mut self) {
pub async fn start(self) {
let uri = self.conn_uri();
if self.single_mode {
println!("starting pgtemp server in single mode at {}", uri);
Expand All @@ -155,46 +154,75 @@ impl PgTempDaemon {
let listener = TcpListener::bind(("127.0.0.1", self.port))
.await
.expect("failed to bind to daemon port");
self.listen(listener).await
}

/// Main daemon listening loop for unix
#[cfg(unix)]
async fn listen(mut self, listener: TcpListener) {
use tokio::signal::unix::{signal, SignalKind};

let mut sig = signal(SignalKind::interrupt()).expect("failed to hook to interrupt signal");
loop {
tokio::select! {
res = listener.accept() => {
if let Ok((client_conn, client_addr)) = res {
client_conn.set_nodelay(true).expect("failed to set nodelay on client connection");
let db: Option<PgTempDB>;
let db_port: u16;
if self.single_mode {
db = None;
db_port = self.dbs[0].db_port();
}
else {
let take_db = self.dbs.pop().unwrap();
db_port = take_db.db_port();
db = Some(take_db);
}
let db_conn = TcpStream::connect(("127.0.0.1", db_port))
.await
.expect("failed to connect to postgres server");
db_conn
.set_nodelay(true)
.expect("failed to set nodelay on db connection");
tokio::spawn(async move { proxy_connection(db, db_conn, client_conn, client_addr).await });
// preallocate a new db after one is used
if self.dbs.is_empty() && !self.single_mode {
self.allocate_db().await;
}
}
else {
println!("idk when this errs");
}
res = listener.accept() => self.on_listener_accept(res).await,
_sig_event = sig.recv() => {
println!("got interrupt, exiting");
break;
}
}
}
}

/// Main daemon listening loop for windows
#[cfg(windows)]
async fn listen(mut self, listener: TcpListener) {
let mut sig =
tokio::signal::windows::ctrl_c().expect("failed to hook windows interrupt signal");
loop {
tokio::select! {
res = listener.accept() => self.on_listener_accept(res).await,
_sig_event = sig.recv() => {
println!("got interrupt, exiting");
break;
}
}
}
}

/// Called when a connection is accepted from a TcpListener.
async fn on_listener_accept(&mut self, result: io::Result<(TcpStream, SocketAddr)>) {
if let Ok((client_conn, client_addr)) = result {
client_conn
.set_nodelay(true)
.expect("failed to set nodelay on client connection");
let db: Option<PgTempDB>;
let db_port: u16;
if self.single_mode {
db = None;
db_port = self.dbs[0].db_port();
} else {
let take_db = self.dbs.pop().unwrap();
db_port = take_db.db_port();
db = Some(take_db);
}
let db_conn = TcpStream::connect(("127.0.0.1", db_port))
.await
.expect("failed to connect to postgres server");
db_conn
.set_nodelay(true)
.expect("failed to set nodelay on db connection");
tokio::spawn(
async move { proxy_connection(db, db_conn, client_conn, client_addr).await },
);
// preallocate a new db after one is used
if self.dbs.is_empty() && !self.single_mode {
self.allocate_db().await;
}
} else {
println!("idk when this errs");
}
}
}

/// When we're in single mode, we pass None to the db here so it doesn't get deallocated when the
Expand Down
20 changes: 18 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ impl PgTempDB {
.postgres_process
.take()
.expect("shutdown with no postgres process");
let temp_dir = self.temp_dir.take().unwrap();

// fast (not graceful) shutdown via SIGINT
// TODO: graceful shutdown via SIGTERM
Expand All @@ -173,11 +172,28 @@ impl PgTempDB {
// the postgres server says "we're still connected to a client, can't shut down yet" and we
// have a deadlock.
#[allow(clippy::cast_possible_wrap)]
let _ret = unsafe { libc::kill(postgres_process.id() as i32, libc::SIGINT) };
#[cfg(unix)]
{
unsafe {
libc::kill(postgres_process.id() as i32, libc::SIGINT);
}
}
#[cfg(windows)]
{
std::process::Command::new("pg_ctl")
.arg("stop")
.arg("-D")
.arg(self.data_dir())
.output()
.expect("Failed to stop server with pg_ctl. Is it installed and on your path?");
}

let _output = postgres_process
.wait_with_output()
.expect("postgres server failed to exit cleanly");

let temp_dir = self.temp_dir.take().unwrap();

if self.persist {
// this prevents the dir from being deleted on drop
let _path = temp_dir.into_path();
Expand Down
Loading