From cb7ce5a6f755ac137801fd3607800c5a75b7c2c9 Mon Sep 17 00:00:00 2001 From: Eden Tyler-Moss Date: Tue, 11 Nov 2025 01:00:07 +0000 Subject: [PATCH 1/8] Add method to query column metadata --- src/client.rs | 52 +++++++++++++++++++++++++++++---------------------- src/lib.rs | 3 ++- src/tds.rs | 2 +- 3 files changed, 33 insertions(+), 24 deletions(-) diff --git a/src/client.rs b/src/client.rs index 688721d1..a9e7fed2 100644 --- a/src/client.rs +++ b/src/client.rs @@ -28,6 +28,7 @@ use enumflags2::BitFlags; use futures_util::io::{AsyncRead, AsyncWrite}; use futures_util::stream::TryStreamExt; use std::{borrow::Cow, fmt::Debug}; +use crate::tds::codec::MetaDataColumn; /// `Client` is the main entry point to the SQL Server, providing query /// execution capabilities. @@ -300,7 +301,32 @@ impl Client { &'a mut self, table: &'a str, ) -> crate::Result> { - // Start the bulk request + let columns: Vec<_> = self.column_metadata(table).await? + .into_iter() + .filter(|column| column.base.flags.contains(ColumnFlag::Updateable)) + .collect(); + + self.connection.flush_stream().await?; + let col_data = columns.iter().map(|c| format!("{}", c)).join(", "); + let query = format!("INSERT BULK {} ({})", table, col_data); + + let req = BatchRequest::new(query, self.connection.context().transaction_descriptor()); + let id = self.connection.context_mut().next_packet_id(); + + self.connection.send(PacketHeader::batch(id), req).await?; + + let ts = TokenStream::new(&mut self.connection); + ts.flush_done().await?; + + BulkLoadRequest::new(&mut self.connection, columns) + } + + /// Retrieve the column metadata for a table, including column names, types, + /// sizes, and flags (e.g. nullability). + pub async fn column_metadata<'a, 'b>( + &'a mut self, + table: &'a str, + ) -> crate::Result>> { self.connection.flush_stream().await?; // retrieve column metadata from server @@ -321,30 +347,12 @@ impl Client { Ok(columns) }) - .await?; - - // now start bulk upload - let columns: Vec<_> = columns + .await? .ok_or_else(|| { crate::Error::Protocol("expecting column metadata from query but not found".into()) - })? - .into_iter() - .filter(|column| column.base.flags.contains(ColumnFlag::Updateable)) - .collect(); - - self.connection.flush_stream().await?; - let col_data = columns.iter().map(|c| format!("{}", c)).join(", "); - let query = format!("INSERT BULK {} ({})", table, col_data); + })?; - let req = BatchRequest::new(query, self.connection.context().transaction_descriptor()); - let id = self.connection.context_mut().next_packet_id(); - - self.connection.send(PacketHeader::batch(id), req).await?; - - let ts = TokenStream::new(&mut self.connection); - ts.flush_done().await?; - - BulkLoadRequest::new(&mut self.connection, columns) + Ok(columns) } /// Closes this database connection explicitly. diff --git a/src/lib.rs b/src/lib.rs index 882f5ad3..b522d777 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -277,8 +277,9 @@ pub use result::*; pub use row::{Column, ColumnType, Row}; pub use sql_browser::SqlBrowser; pub use tds::{ - codec::{BulkLoadRequest, ColumnData, ColumnFlag, IntoRow, TokenRow, TypeLength}, + codec::{BulkLoadRequest, ColumnData, MetaDataColumn, BaseMetaDataColumn, TypeInfo, FixedLenType, VarLenContext, VarLenType, ColumnFlag, IntoRow, TokenRow, TypeLength}, numeric, + collation::{Collation}, stream::QueryStream, time, xml, EncryptionLevel, }; diff --git a/src/tds.rs b/src/tds.rs index f4b6f925..1274ca3c 100644 --- a/src/tds.rs +++ b/src/tds.rs @@ -1,5 +1,5 @@ pub mod codec; -mod collation; +pub(crate) mod collation; mod context; pub mod numeric; pub mod stream; From 8220ed4e0558c506c4c59fc6e4201b16012c459a Mon Sep 17 00:00:00 2001 From: Eden Tyler-Moss Date: Wed, 26 Nov 2025 21:50:00 +0000 Subject: [PATCH 2/8] Implement Display for TypeInfo --- src/tds/codec/token/token_col_metadata.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/tds/codec/token/token_col_metadata.rs b/src/tds/codec/token/token_col_metadata.rs index 53ffdf1c..7bef5a00 100644 --- a/src/tds/codec/token/token_col_metadata.rs +++ b/src/tds/codec/token/token_col_metadata.rs @@ -25,9 +25,14 @@ pub struct MetaDataColumn<'a> { impl<'a> Display for MetaDataColumn<'a> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{} ", self.col_name)?; + write!(f, "{} {}", self.col_name, self.base.ty)?; - match &self.base.ty { + Ok(()) + } +} +impl Display for TypeInfo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match &self { TypeInfo::FixedLen(fixed) => match fixed { FixedLenType::Int1 => write!(f, "tinyint")?, FixedLenType::Bit => write!(f, "bit")?, From 0fe21017e078400dd9a7ea805f10417f718cb5f1 Mon Sep 17 00:00:00 2001 From: Thomas Johnson Date: Fri, 27 Sep 2024 01:57:43 +0100 Subject: [PATCH 3/8] Allow Bulk Insert for a specified list of columns (#311) Adds `bulk_insert_columns(self, table, columns)` and turns `bulk_insert(self, table)` into a compatibility shim that calls `self.bulk_insert_columns(table, &["*"])`, maintaining the existing behaviour. --- src/client.rs | 84 +++++++++++++++++++++++++++++++++++-------- tests/bulk.rs | 98 +++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 168 insertions(+), 14 deletions(-) diff --git a/src/client.rs b/src/client.rs index a9e7fed2..259453cb 100644 --- a/src/client.rs +++ b/src/client.rs @@ -15,20 +15,15 @@ pub use config::*; pub(crate) use connection::*; use crate::tds::stream::ReceivedToken; -use crate::{ - result::ExecuteResult, - tds::{ - codec::{self, IteratorJoin}, - stream::{QueryStream, TokenStream}, - }, - BulkLoadRequest, ColumnFlag, SqlReadBytes, ToSql, -}; +use crate::{result::ExecuteResult, tds::{ + codec::{self, IteratorJoin}, + stream::{QueryStream, TokenStream}, +}, BulkLoadRequest, ColumnFlag, MetaDataColumn, SqlReadBytes, ToSql}; use codec::{BatchRequest, ColumnData, PacketHeader, RpcParam, RpcProcId, TokenRpcRequest}; use enumflags2::BitFlags; use futures_util::io::{AsyncRead, AsyncWrite}; use futures_util::stream::TryStreamExt; use std::{borrow::Cow, fmt::Debug}; -use crate::tds::codec::MetaDataColumn; /// `Client` is the main entry point to the SQL Server, providing query /// execution capabilities. @@ -252,10 +247,13 @@ impl Client { Ok(result) } - /// Execute a `BULK INSERT` statement, efficiantly storing a large number of + /// Execute a `BULK INSERT` statement, efficiently storing a large number of /// rows to a specified table. Note: make sure the input row follows the same /// schema as the table, otherwise calling `send()` will return an error. /// + /// This is equivalent to calling `bulk_insert("table_name", &["*"])` to merge + /// all of a tables columns. + /// /// # Example /// /// ``` @@ -301,13 +299,69 @@ impl Client { &'a mut self, table: &'a str, ) -> crate::Result> { - let columns: Vec<_> = self.column_metadata(table).await? + self.bulk_insert_columns(table, &["*"]).await + } + + /// Execute a `BULK INSERT` statement, efficiently storing a large number of + /// rows to a specified table. Note: make sure the input row follows the same + /// schema as the column list, otherwise calling `send()` will return an error. + /// + /// # Example + /// + /// ``` + /// # use tiberius::{Config, IntoRow}; + /// # use tokio_util::compat::TokioAsyncWriteCompatExt; + /// # use std::env; + /// # #[tokio::main] + /// # async fn main() -> Result<(), Box> { + /// # let c_str = env::var("TIBERIUS_TEST_CONNECTION_STRING").unwrap_or( + /// # "server=tcp:localhost,1433;integratedSecurity=true;TrustServerCertificate=true".to_owned(), + /// # ); + /// # let config = Config::from_ado_string(&c_str)?; + /// # let tcp = tokio::net::TcpStream::connect(config.get_addr()).await?; + /// # tcp.set_nodelay(true)?; + /// # let mut client = tiberius::Client::connect(config, tcp.compat_write()).await?; + /// let create_table = r#" + /// CREATE TABLE ##bulk_test ( + /// id INT IDENTITY PRIMARY KEY, + /// foo INT NOT NULL, + /// bar FLOAT NOT NULL + /// ) + /// "#; + /// + /// client.simple_query(create_table).await?; + /// + /// // Start the bulk insert with the client. + /// let mut req = client.bulk_insert_columns("##bulk_test", &["foo", "bar"]).await?; + /// + /// for (i, j) in [(0i32, 0f64), (1i32, 1f64), (2i32, 2f64)] { + /// let row = (i, j).into_row(); + /// + /// // The request will handle flushing to the wire in an optimal way, + /// // balancing between memory usage and IO performance. + /// req.send(row).await?; + /// } + /// + /// // The request must be finalized. + /// let res = req.finalize().await?; + /// assert_eq!(3, res.total()); + /// # Ok(()) + /// # } + /// ``` + pub async fn bulk_insert_columns<'a>( + &'a mut self, + table: &'a str, + columns: &'a [&'a str], + ) -> crate::Result> { + let columns: Vec> = self.column_metadata(table, columns).await? .into_iter() .filter(|column| column.base.flags.contains(ColumnFlag::Updateable)) .collect(); + // Start the bulk request self.connection.flush_stream().await?; - let col_data = columns.iter().map(|c| format!("{}", c)).join(", "); + + let col_data = columns.iter().map(|c| format!("[{}]", c)).join(", "); let query = format!("INSERT BULK {} ({})", table, col_data); let req = BatchRequest::new(query, self.connection.context().transaction_descriptor()); @@ -326,11 +380,13 @@ impl Client { pub async fn column_metadata<'a, 'b>( &'a mut self, table: &'a str, + columns: &'a [&'a str], ) -> crate::Result>> { self.connection.flush_stream().await?; // retrieve column metadata from server - let query = format!("SELECT TOP 0 * FROM {}", table); + let columns = columns.join(", "); + let query = format!("SELECT TOP 0 {columns} FROM {table}"); let req = BatchRequest::new(query, self.connection.context().transaction_descriptor()); @@ -379,7 +435,7 @@ impl Client { &'a mut self, proc_id: RpcProcId, mut rpc_params: Vec>, - params: impl Iterator>, + params: impl Iterator>, ) -> crate::Result<()> where 'a: 'b, diff --git a/tests/bulk.rs b/tests/bulk.rs index 33b90637..110f77a8 100644 --- a/tests/bulk.rs +++ b/tests/bulk.rs @@ -218,3 +218,101 @@ test_bulk_type!(datetime2_7( 100, vec![DateTime::from_timestamp(1658524194, 123456789); 100].into_iter() )); + +macro_rules! test_bulk_columns { + ($name:ident($total_generated:literal $(, $sql_type:literal)+ $(, ($cols:expr, $generator:expr ))+ $(,)?)) => { + paste::item! { + #[test_on_runtimes] + async fn [< bulk_load_optional_ $name >](mut conn: tiberius::Client) -> Result<()> + where + S: AsyncRead + AsyncWrite + Unpin + Send, + { + use tiberius::IntoRow; + + let table = format!("##{}", random_table().await); + let column_defs = &[$($sql_type,)+]; + + conn.execute( + &format!( + "CREATE TABLE {} (id INT IDENTITY PRIMARY KEY, {})", + table, + column_defs.join(", "), + ), + &[], + ) + .await?; + + let mut count = 0; + + $( + let mut req = conn.bulk_insert_columns(&table, $cols).await?; + for i in $generator { + let row = i.into_row(); + req.send(row).await?; + } + + let res = req.finalize().await?; + count += res.total(); + )+ + assert_eq!($total_generated, count); + + Ok(()) + } + + #[test_on_runtimes] + async fn [< bulk_load_required_ $name >](mut conn: tiberius::Client) -> Result<()> + where + S: AsyncRead + AsyncWrite + Unpin + Send, + { + use tiberius::IntoRow; + let table = format!("##{}", random_table().await); + let column_defs = &[$(format!("{} NOT NULL", $sql_type),)+]; + + conn.execute( + &format!( + "CREATE TABLE {} (id INT IDENTITY PRIMARY KEY, {})", + table, + column_defs.join(", "), + ), + &[], + ) + .await?; + + let mut count = 0; + + $( + let mut req = conn.bulk_insert_columns(&table, $cols).await?; + for i in $generator { + let row = i.into_row(); + req.send(row).await?; + } + + let res = req.finalize().await?; + count += res.total(); + )+ + assert_eq!($total_generated, count); + + Ok(()) + } + + } + }; +} + +test_bulk_columns!(ab_ba_default_columns( + 200, + "a INT", + "b FLOAT", + "c INT DEFAULT 0", + (&["a", "b"], vec![(1i32, 1f64); 100]), + (&["b", "a"], vec![(2f64, 2i32); 100]), +)); + +test_bulk_columns!(ab_ba_override_default_columns( + 200, + "a INT", + "b FLOAT", + "c INT DEFAULT 0", + (&["a", "b", "c"], vec![(1i32, 1f64, 10i32); 100]), + (&["b", "c", "a"], vec![(2f64, 20i32, 2i32); 100]), +)); From 366e47201e60617d53d54f921a7e77de6433aa16 Mon Sep 17 00:00:00 2001 From: Eden Tyler-Moss Date: Wed, 26 Nov 2025 23:08:21 +0000 Subject: [PATCH 4/8] Allow Bulk Insert for a specified list of columns (#311) Remove unnecessary lifetimes. --- src/client.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/client.rs b/src/client.rs index 259453cb..52f840db 100644 --- a/src/client.rs +++ b/src/client.rs @@ -297,7 +297,7 @@ impl Client { /// ``` pub async fn bulk_insert<'a>( &'a mut self, - table: &'a str, + table: &str, ) -> crate::Result> { self.bulk_insert_columns(table, &["*"]).await } @@ -350,8 +350,8 @@ impl Client { /// ``` pub async fn bulk_insert_columns<'a>( &'a mut self, - table: &'a str, - columns: &'a [&'a str], + table: &str, + columns: &[&str], ) -> crate::Result> { let columns: Vec> = self.column_metadata(table, columns).await? .into_iter() @@ -379,8 +379,8 @@ impl Client { /// sizes, and flags (e.g. nullability). pub async fn column_metadata<'a, 'b>( &'a mut self, - table: &'a str, - columns: &'a [&'a str], + table: &str, + columns: &[&str], ) -> crate::Result>> { self.connection.flush_stream().await?; From 6568e34df9e03e4138ff54174f61bc12de9c4018 Mon Sep 17 00:00:00 2001 From: Eden Tyler-Moss Date: Wed, 26 Nov 2025 23:24:23 +0000 Subject: [PATCH 5/8] Add method to query column metadata Fix merge - surround only column name with []. --- src/client.rs | 2 +- src/tds/codec/token/token_col_metadata.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/client.rs b/src/client.rs index 52f840db..71c5dff1 100644 --- a/src/client.rs +++ b/src/client.rs @@ -361,7 +361,7 @@ impl Client { // Start the bulk request self.connection.flush_stream().await?; - let col_data = columns.iter().map(|c| format!("[{}]", c)).join(", "); + let col_data = columns.iter().map(MetaDataColumn::to_string).join(", "); let query = format!("INSERT BULK {} ({})", table, col_data); let req = BatchRequest::new(query, self.connection.context().transaction_descriptor()); diff --git a/src/tds/codec/token/token_col_metadata.rs b/src/tds/codec/token/token_col_metadata.rs index 7bef5a00..2b67e7c6 100644 --- a/src/tds/codec/token/token_col_metadata.rs +++ b/src/tds/codec/token/token_col_metadata.rs @@ -25,7 +25,7 @@ pub struct MetaDataColumn<'a> { impl<'a> Display for MetaDataColumn<'a> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{} {}", self.col_name, self.base.ty)?; + write!(f, "[{}] {}", self.col_name, self.base.ty)?; Ok(()) } From 5e6a53a897d2878e6e84f6d9592c7c8b786378ba Mon Sep 17 00:00:00 2001 From: Eden Tyler-Moss Date: Fri, 2 Jan 2026 19:22:17 +0100 Subject: [PATCH 6/8] Implement IntoSql for rust_decimal --- src/tds/numeric.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/tds/numeric.rs b/src/tds/numeric.rs index 4f856beb..5a75d376 100644 --- a/src/tds/numeric.rs +++ b/src/tds/numeric.rs @@ -263,6 +263,23 @@ mod decimal { Numeric::new_with_scale(value, self_.scale() as u8) }); ); + + #[cfg(feature = "tds73")] + into_sql!(self_, + Decimal: (ColumnData::Numeric, { + let unpacked = self_.unpack(); + + let mut value = (((unpacked.hi as u128) << 64) + + ((unpacked.mid as u128) << 32) + + unpacked.lo as u128) as i128; + + if self_.is_sign_negative() { + value = -value; + } + + Numeric::new_with_scale(value, self_.scale() as u8) + }); + ); } #[cfg(feature = "bigdecimal")] From b800ed0d46b23b9c319e0f11ffc21d79b0519778 Mon Sep 17 00:00:00 2001 From: Eden Tyler-Moss Date: Wed, 7 Jan 2026 10:07:49 +0100 Subject: [PATCH 7/8] Expose Data for TokenRow --- src/row.rs | 5 +++++ src/tds/codec/token/token_row.rs | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/row.rs b/src/row.rs index 5441be70..05d7a89b 100644 --- a/src/row.rs +++ b/src/row.rs @@ -317,6 +317,11 @@ impl Row { self.result_index } + /// The raw column data + pub fn data(&self) -> &TokenRow<'static> { + &self.data + } + /// Returns the number of columns in the row. /// /// # Example diff --git a/src/tds/codec/token/token_row.rs b/src/tds/codec/token/token_row.rs index b1ff16b6..6445469a 100644 --- a/src/tds/codec/token/token_row.rs +++ b/src/tds/codec/token/token_row.rs @@ -8,7 +8,7 @@ use futures_util::io::AsyncReadExt; pub use into_row::IntoRow; /// A row of data. -#[derive(Debug, Default, Clone)] +#[derive(Debug, Default, Clone, PartialEq)] pub struct TokenRow<'a> { data: Vec>, } From 3d653e7ddd41510bd3fca5f0cbcdb9998790f676 Mon Sep 17 00:00:00 2001 From: Eden Tyler-Moss Date: Wed, 7 Jan 2026 10:09:11 +0100 Subject: [PATCH 8/8] Fix column flags --- src/tds/codec/token/token_col_metadata.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/tds/codec/token/token_col_metadata.rs b/src/tds/codec/token/token_col_metadata.rs index 2b67e7c6..2c9f5fb9 100644 --- a/src/tds/codec/token/token_col_metadata.rs +++ b/src/tds/codec/token/token_col_metadata.rs @@ -270,9 +270,9 @@ pub enum ColumnFlag { /// If column is writeable. Updateable = 1 << 3, /// Column modification status unknown. - UpdateableUnknown = 1 << 4, + UpdateableUnknown = 1 << 2, /// Column is an identity. - Identity = 1 << 5, + Identity = 1 << 4, /// Coulumn is computed. Computed = 1 << 7, /// Column is a fixed-length common language runtime user-defined type (CLR