40 changes: 5 additions & 35 deletions CLAUDE.md
@@ -7,45 +7,15 @@

# Code style

Use standard Rust code style. Use `cargo fmt` to reformat code automatically after every edit.
Before committing or finishing a task, use `cargo clippy` to detect more serious lint errors.

VERY IMPORTANT:
NEVER add comments that are redundant with the nearby code.
ALWAYS be sure comments document "Why", not "What", the code is doing.
ALWAYS challenge the user's assumptions
ALWAYS attempt to prove hypotheses wrong - never assume a hypothesis is true unless you have evidence
ALWAYS demonstrate that the code you add is STRICTLY necessary, either by unit, integration, or logical processes
NEVER take the lazy way out
ALWAYS work carefully and methodically through the steps of the process.
NEVER use quick fixes. Always carefully work through the problem unless specifically asked.
ALWAYS Ask clarifying questions before implementing
ALWAYS Break large tasks into single-session chunks

VERY IMPORTANT: you are to act as a detective, attempting to find ways to falsify the code or planning we've done by discovering gaps or inconsistencies. ONLY write code when it is absolutely required to pass tests, the build, or typecheck.

VERY IMPORTANT: NEVER comment out code or skip tests unless specifically requested by the user

## Principles
- **Data first**: Define types before implementation
- **Small Modules**: Try to keep files under 200 lines, unless required by implementation. NEVER allow files to exceed 1000 lines unless specifically instructed.
- Use standard Rust code style.
- Use `cargo fmt` to reformat code automatically after every edit.
- Don't write functions with many arguments: create a struct and use that as input instead.

# Workflow

- Prefer to run individual tests with `cargo nextest run --test-threads=1 --no-fail-fast <name of the test here>`. This is much faster.
- A local PostgreSQL server is required for some tests to pass. Ensure it is set up, and if necessary create a database called "pgdog", and create a user called "pgdog" with password "pgdog".
- Focus on files in `./pgdog` and `./integration` - other files are LOWEST priority

## Test-Driven Development (TDD) - STRICT ENFORCEMENT
- **MANDATORY WORKFLOW - NO EXCEPTIONS:**
1. Write exactly ONE test that fails
2. Write ONLY the minimal code to make that test pass
3. Refactor if needed (tests must still pass)
4. Return to step 1 for next test
- **CRITICAL RULES:**
- NO implementation code without a failing test first
- NO untested code is allowed to exist
- Every line of production code must be justified by a test
- A local PostgreSQL server is required for some tests to pass. Assume it's running, if not, stop and ask the user to start it.
- Coe

# About the project

2 changes: 2 additions & 0 deletions integration/common.sh
@@ -4,6 +4,8 @@
# correctly.
#
COMMON_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
export NODE_ID=pgdog-dev-0

function wait_for_pgdog() {
echo "Waiting for PgDog"
while ! pg_isready -h 127.0.0.1 -p 6432 -U pgdog -d pgdog > /dev/null; do
9 changes: 9 additions & 0 deletions integration/ruby/pg_spec.rb
@@ -111,4 +111,13 @@ def connect(dbname = 'pgdog', user = 'pgdog')
expect(res[0]['one']).to eq('2')
end
end

it 'unique_id' do
conn = connect "pgdog_sharded"
100.times do |i|
res = conn.exec "SELECT pgdog.unique_id() AS id, $1 AS counter", [i]
expect(res[0]["id"].to_i).to be > 0
expect(res[0]["counter"].to_i).to eq(i)
end
end
end
1 change: 1 addition & 0 deletions integration/rust/tests/integration/mod.rs
@@ -20,3 +20,4 @@ pub mod timestamp_sorting;
pub mod tls_enforced;
pub mod tls_reload;
pub mod transaction_state;
pub mod unique_id;
37 changes: 37 additions & 0 deletions integration/rust/tests/integration/unique_id.rs
@@ -0,0 +1,37 @@
use rust::setup::connections_sqlx;
use sqlx::{Executor, Row};

#[tokio::test]
async fn unique_id_returns_bigint() -> Result<(), Box<dyn std::error::Error>> {
let conns = connections_sqlx().await;
let sharded = conns.get(1).cloned().unwrap();

// Simple query
let row = sharded.fetch_one("SELECT pgdog.unique_id() AS id").await?;
let mut id: i64 = row.get("id");

assert!(
id > 0,
"unique_id should return a positive bigint, got {id}"
);

for _ in 0..100 {
// Prepared statement
let row = sqlx::query("SELECT pgdog.unique_id() AS id")
.fetch_one(&sharded)
.await?;
let prepared_id: i64 = row.get("id");
assert!(
prepared_id > 0,
"prepared unique_id should return a positive bigint, got {prepared_id}"
);

assert!(
prepared_id > id,
"prepared id should be greater than simple query id"
);
id = prepared_id;
}

Ok(())
}
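
The `pgdog.unique_id()` implementation itself is not part of this diff; the tests above only pin down its contract: a positive `bigint` that strictly increases across calls on a connection. A minimal sketch of a Snowflake-style generator that would satisfy that contract, assuming a small numeric node id (the `NODE_ID` export added to `integration/common.sh` hints at per-node identification), could look like this:

```rust
use std::sync::Mutex;
use std::time::{SystemTime, UNIX_EPOCH};

/// Hypothetical Snowflake-style layout: 41 bits of milliseconds since an
/// epoch, 10 bits of node id, 12 bits of per-millisecond sequence.
pub struct UniqueIdGenerator {
    node_id: u64,
    state: Mutex<(u64, u64)>, // (last timestamp in ms, sequence)
}

impl UniqueIdGenerator {
    pub fn new(node_id: u64) -> Self {
        Self {
            node_id: node_id & 0x3FF,
            state: Mutex::new((0, 0)),
        }
    }

    pub fn next(&self) -> i64 {
        let mut state = self.state.lock().unwrap();
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("clock before Unix epoch")
            .as_millis() as u64;
        let (last, seq) = *state;
        let (ts, seq) = if now <= last {
            // Same millisecond, or the clock went backwards: bump the sequence.
            // A production generator would wait for the next millisecond on overflow.
            (last, (seq + 1) & 0xFFF)
        } else {
            (now, 0)
        };
        *state = (ts, seq);
        // 41 + 10 + 12 = 63 bits, so the result is always a positive i64.
        ((ts << 22) | (self.node_id << 12) | seq) as i64
    }
}
```

The tests above would also pass for other schemes (for example a per-shard database sequence offset by shard id), so this is only one plausible shape, not the PR's actual implementation.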
20 changes: 8 additions & 12 deletions pgdog-config/src/core.rs
@@ -195,8 +195,7 @@ impl Config {
/// Organize all databases by name for quicker retrieval.
pub fn databases(&self) -> HashMap<String, Vec<Vec<EnumeratedDatabase>>> {
let mut databases = HashMap::new();
let mut number = 0;
for database in &self.databases {
for (number, database) in self.databases.iter().enumerate() {
let entry = databases
.entry(database.name.clone())
.or_insert_with(Vec::new);
@@ -210,7 +209,6 @@
number,
database: database.clone(),
});
number += 1;
}
databases
}
@@ -323,7 +321,7 @@ impl Config {
);
}
} else {
pooler_mode.insert(database.name.clone(), database.pooler_mode.clone());
pooler_mode.insert(database.name.clone(), database.pooler_mode);
}
}

@@ -352,17 +350,15 @@
_ => (),
}

if !self.general.two_phase_commit {
if self.rewrite.enabled {
if self.rewrite.shard_key == RewriteMode::Rewrite {
warn!("rewrite.shard_key=rewrite will apply non-atomic shard-key rewrites; enabling two_phase_commit is strongly recommended"
if !self.general.two_phase_commit && self.rewrite.enabled {
if self.rewrite.shard_key == RewriteMode::Rewrite {
warn!("rewrite.shard_key=rewrite will apply non-atomic shard-key rewrites; enabling two_phase_commit is strongly recommended"
);
}
}

if self.rewrite.split_inserts == RewriteMode::Rewrite {
warn!("rewrite.split_inserts=rewrite may commit partial multi-row INSERTs; enabling two_phase_commit is strongly recommended"
if self.rewrite.split_inserts == RewriteMode::Rewrite {
warn!("rewrite.split_inserts=rewrite may commit partial multi-row INSERTs; enabling two_phase_commit is strongly recommended"
);
}
}
}
}
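
The first hunk above swaps a manually maintained counter for `Iterator::enumerate`, which yields the index alongside each item and cannot drift out of sync with the loop body. A standalone illustration (names are illustrative, not from the codebase):

```rust
fn main() {
    let databases = ["alpha", "beta", "gamma"];

    // Before: the counter is updated by hand at the end of each iteration.
    let mut number = 0;
    for database in &databases {
        println!("{number}: {database}");
        number += 1;
    }

    // After: the iterator supplies the index.
    for (number, database) in databases.iter().enumerate() {
        println!("{number}: {database}");
    }
}
```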
2 changes: 1 addition & 1 deletion pgdog-config/src/sharding.rs
@@ -213,7 +213,7 @@ impl ShardedSchema {
}

pub fn name(&self) -> &str {
self.name.as_ref().map(|name| name.as_str()).unwrap_or("*")
self.name.as_deref().unwrap_or("*")
}

pub fn shard(&self) -> Option<usize> {
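
`Option::as_deref` is the idiomatic shorthand for the `map(|name| name.as_str())` chain it replaces: it turns an `Option<String>` into an `Option<&str>` in one step. A standalone illustration:

```rust
fn main() {
    let name: Option<String> = Some("users".to_string());
    assert_eq!(name.as_deref().unwrap_or("*"), "users");

    let unnamed: Option<String> = None;
    assert_eq!(unnamed.as_deref().unwrap_or("*"), "*");
}
```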
2 changes: 1 addition & 1 deletion pgdog-config/src/url.rs
@@ -136,7 +136,7 @@ impl ConfigAndUsers {

let mirroring = mirror_strs
.iter()
.map(|s| Mirroring::from_str(s).map_err(|e| Error::ParseError(e)))
.map(|s| Mirroring::from_str(s).map_err(Error::ParseError))
.collect::<Result<Vec<_>, _>>()?;

self.config.mirroring = mirroring;
1 change: 1 addition & 0 deletions pgdog/src/admin/mod.rs
@@ -30,6 +30,7 @@ pub mod show_pools;
pub mod show_prepared_statements;
pub mod show_query_cache;
pub mod show_replication;
pub mod show_rewrite;
pub mod show_server_memory;
pub mod show_servers;
pub mod show_stats;
6 changes: 5 additions & 1 deletion pgdog/src/admin/parser.rs
@@ -7,7 +7,7 @@ use super::{
show_client_memory::ShowClientMemory, show_clients::ShowClients, show_config::ShowConfig,
show_instance_id::ShowInstanceId, show_lists::ShowLists, show_mirrors::ShowMirrors,
show_peers::ShowPeers, show_pools::ShowPools, show_prepared_statements::ShowPreparedStatements,
show_query_cache::ShowQueryCache, show_replication::ShowReplication,
show_query_cache::ShowQueryCache, show_replication::ShowReplication, show_rewrite::ShowRewrite,
show_server_memory::ShowServerMemory, show_servers::ShowServers, show_stats::ShowStats,
show_transactions::ShowTransactions, show_version::ShowVersion, shutdown::Shutdown, Command,
Error,
@@ -30,6 +30,7 @@
ShowStats(ShowStats),
ShowTransactions(ShowTransactions),
ShowMirrors(ShowMirrors),
ShowRewrite(ShowRewrite),
ShowVersion(ShowVersion),
ShowInstanceId(ShowInstanceId),
SetupSchema(SetupSchema),
@@ -65,6 +66,7 @@ impl ParseResult {
ShowStats(show_stats) => show_stats.execute().await,
ShowTransactions(show_transactions) => show_transactions.execute().await,
ShowMirrors(show_mirrors) => show_mirrors.execute().await,
ShowRewrite(show_rewrite) => show_rewrite.execute().await,
ShowVersion(show_version) => show_version.execute().await,
ShowInstanceId(show_instance_id) => show_instance_id.execute().await,
SetupSchema(setup_schema) => setup_schema.execute().await,
@@ -100,6 +102,7 @@ impl ParseResult {
ShowStats(show_stats) => show_stats.name(),
ShowTransactions(show_transactions) => show_transactions.name(),
ShowMirrors(show_mirrors) => show_mirrors.name(),
ShowRewrite(show_rewrite) => show_rewrite.name(),
ShowVersion(show_version) => show_version.name(),
ShowInstanceId(show_instance_id) => show_instance_id.name(),
SetupSchema(setup_schema) => setup_schema.name(),
@@ -163,6 +166,7 @@ impl Parser {
"lists" => ParseResult::ShowLists(ShowLists::parse(&sql)?),
"prepared" => ParseResult::ShowPrepared(ShowPreparedStatements::parse(&sql)?),
"replication" => ParseResult::ShowReplication(ShowReplication::parse(&sql)?),
"rewrite" => ParseResult::ShowRewrite(ShowRewrite::parse(&sql)?),
command => {
debug!("unknown admin show command: '{}'", command);
return Err(Error::Syntax);
2 changes: 1 addition & 1 deletion pgdog/src/admin/show_mirrors.rs
@@ -35,7 +35,7 @@ impl Command for ShowMirrors {
let counts = {
let stats = cluster.stats();
let stats = stats.lock();
stats.counts
stats.mirrors
};

// Create a data row for this cluster
49 changes: 49 additions & 0 deletions pgdog/src/admin/show_rewrite.rs
@@ -0,0 +1,49 @@
//! SHOW REWRITE - per-cluster rewrite statistics

use crate::backend::databases::databases;

use super::prelude::*;

pub struct ShowRewrite;

#[async_trait]
impl Command for ShowRewrite {
fn name(&self) -> String {
"SHOW REWRITE".into()
}

fn parse(_: &str) -> Result<Self, Error> {
Ok(Self)
}

async fn execute(&self) -> Result<Vec<Message>, Error> {
let fields = vec![
Field::text("database"),
Field::text("user"),
Field::numeric("parse"),
Field::numeric("bind"),
Field::numeric("simple"),
];

let mut messages = vec![RowDescription::new(&fields).message()?];

for (user, cluster) in databases().all() {
let rewrite = {
let stats = cluster.stats();
let stats = stats.lock();
stats.rewrite
};

let mut dr = DataRow::new();
dr.add(user.database.as_str())
.add(user.user.as_str())
.add(rewrite.parse as i64)
.add(rewrite.bind as i64)
.add(rewrite.simple as i64);

messages.push(dr.message()?);
}

Ok(messages)
}
}
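
`ShowRewrite` reads three counters — `parse`, `bind`, and `simple` — from each cluster's shared stats and emits one row per database/user pair, so `SHOW REWRITE;` on the admin console returns one row per cluster. The counter struct itself is not shown in this diff; a minimal sketch consistent with the field accesses above (field semantics and integer type are assumptions inferred from the names) would be:

```rust
/// Hypothetical shape of the rewrite counters read by SHOW REWRITE.
/// Field names come from the accesses above; the type and meanings are assumed.
#[derive(Debug, Default, Clone, Copy)]
pub struct RewriteStats {
    /// Presumably rewrites applied on the Parse path (extended protocol).
    pub parse: u64,
    /// Presumably rewrites applied on the Bind path.
    pub bind: u64,
    /// Presumably rewrites applied to simple-protocol queries.
    pub simple: u64,
}
```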
4 changes: 2 additions & 2 deletions pgdog/src/admin/tests/mod.rs
@@ -1,6 +1,6 @@
use crate::admin::Command;
use crate::backend::databases::{databases, from_config, replace_databases, Databases};
use crate::backend::pool::mirror_stats::Counts;
use crate::backend::pool::cluster_stats::MirrorStats;
use crate::config::{self, ConfigAndUsers, Database, Role, User as ConfigUser};
use crate::net::messages::{DataRow, DataType, FromBytes, Protocol, RowDescription};

@@ -226,7 +226,7 @@ async fn show_mirrors_reports_counts() {
{
let cluster_stats = cluster.stats();
let mut stats = cluster_stats.lock();
stats.counts = Counts {
stats.mirrors = MirrorStats {
total_count: 5,
mirrored_count: 4,
dropped_count: 1,
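
The test update above reflects the broader rename in this PR: the per-cluster stats container moves from `mirror_stats::Counts` to a `cluster_stats` module, and `SHOW MIRRORS` / `SHOW REWRITE` now read `stats.mirrors` and `stats.rewrite` respectively. The container's definition is not included in the diff; a sketch consistent with those accesses (only the fields visible in this diff are listed, types assumed) might be:

```rust
/// Hypothetical aggregate stored behind the cluster's Arc<Mutex<...>>.
#[derive(Debug, Default, Clone, Copy)]
pub struct ClusterStats {
    pub mirrors: MirrorStats,
    pub rewrite: RewriteStats,
}

/// Mirror counters; only the fields set in the test above are listed here,
/// the real struct may define more.
#[derive(Debug, Default, Clone, Copy)]
pub struct MirrorStats {
    pub total_count: u64,
    pub mirrored_count: u64,
    pub dropped_count: u64,
}

/// Rewrite counters as sketched after show_rewrite.rs above.
#[derive(Debug, Default, Clone, Copy)]
pub struct RewriteStats {
    pub parse: u64,
    pub bind: u64,
    pub simple: u64,
}
```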
4 changes: 2 additions & 2 deletions pgdog/src/backend/databases.rs
@@ -359,9 +359,9 @@ impl Databases {
for cluster in self.all().values() {
cluster.launch();

if cluster.pooler_mode() == PoolerMode::Session && cluster.router_needed() {
if cluster.pooler_mode() == PoolerMode::Session && cluster.use_parser() {
warn!(
r#"user "{}" for database "{}" requires transaction mode to route queries"#,
r#"user "{}" for database "{}" requires transaction mode to parse and route queries"#,
cluster.user(),
cluster.name()
);
18 changes: 11 additions & 7 deletions pgdog/src/backend/pool/cluster.rs
@@ -26,7 +26,7 @@ use crate::{
net::{messages::BackendKeyData, Query},
};

use super::{Address, Config, Error, Guard, MirrorStats, Request, Shard, ShardConfig};
use super::{Address, ClusterStats, Config, Error, Guard, Request, Shard, ShardConfig};
use crate::config::LoadBalancingStrategy;

#[derive(Clone, Debug, Default)]
@@ -53,7 +53,7 @@ pub struct Cluster {
multi_tenant: Option<MultiTenant>,
rw_strategy: ReadWriteStrategy,
schema_admin: bool,
stats: Arc<Mutex<MirrorStats>>,
stats: Arc<Mutex<ClusterStats>>,
cross_shard_disabled: bool,
two_phase_commit: bool,
two_phase_commit_auto: bool,
@@ -245,7 +245,7 @@ impl Cluster {
multi_tenant: multi_tenant.clone(),
rw_strategy,
schema_admin,
stats: Arc::new(Mutex::new(MirrorStats::default())),
stats: Arc::new(Mutex::new(ClusterStats::default())),
cross_shard_disabled,
two_phase_commit: two_pc && shards.len() > 1,
two_phase_commit_auto: two_pc_auto && shards.len() > 1,
@@ -409,14 +409,18 @@ impl Cluster {
self.schema_admin = owner;
}

pub fn stats(&self) -> Arc<Mutex<MirrorStats>> {
pub fn stats(&self) -> Arc<Mutex<ClusterStats>> {
self.stats.clone()
}

/// We'll need the query router to figure out
/// where a query should go.
pub fn router_needed(&self) -> bool {
/// We need to parse the query using pg_query.
pub fn use_parser(&self) -> bool {
!(self.shards().len() == 1 && (self.read_only() || self.write_only()))
|| self.query_parser_enabled
|| self.multi_tenant.is_some()
|| self.pub_sub_enabled()
|| self.prepared_statements() == &PreparedStatements::Full
|| self.dry_run
}

/// Multi-tenant config.