diff --git a/Cargo.lock b/Cargo.lock index aced4a3f..e35a2986 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -81,9 +81,9 @@ checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" [[package]] name = "arrow" -version = "57.1.0" +version = "57.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb372a7cbcac02a35d3fb7b3fc1f969ec078e871f9bb899bf00a2e1809bec8a3" +checksum = "e4754a624e5ae42081f464514be454b39711daae0458906dacde5f4c632f33a8" dependencies = [ "arrow-arith", "arrow-array", @@ -102,9 +102,9 @@ dependencies = [ [[package]] name = "arrow-arith" -version = "57.1.0" +version = "57.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f377dcd19e440174596d83deb49cd724886d91060c07fec4f67014ef9d54049" +checksum = "f7b3141e0ec5145a22d8694ea8b6d6f69305971c4fa1c1a13ef0195aef2d678b" dependencies = [ "arrow-array", "arrow-buffer", @@ -116,9 +116,9 @@ dependencies = [ [[package]] name = "arrow-array" -version = "57.1.0" +version = "57.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a23eaff85a44e9fa914660fb0d0bb00b79c4a3d888b5334adb3ea4330c84f002" +checksum = "4c8955af33b25f3b175ee10af580577280b4bd01f7e823d94c7cdef7cf8c9aef" dependencies = [ "ahash", "arrow-buffer", @@ -135,9 +135,9 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "57.1.0" +version = "57.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2819d893750cb3380ab31ebdc8c68874dd4429f90fd09180f3c93538bd21626" +checksum = "c697ddca96183182f35b3a18e50b9110b11e916d7b7799cbfd4d34662f2c56c2" dependencies = [ "bytes", "half", @@ -147,9 +147,9 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "57.1.0" +version = "57.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3d131abb183f80c450d4591dc784f8d7750c50c6e2bc3fcaad148afc8361271" +checksum = "646bbb821e86fd57189c10b4fcdaa941deaf4181924917b0daa92735baa6ada5" dependencies = [ "arrow-array", "arrow-buffer", @@ -158,7 +158,7 @@ dependencies = [ "arrow-schema", "arrow-select", "atoi", - "base64 0.22.1", + "base64", "chrono", "comfy-table", "half", @@ -169,9 +169,9 @@ dependencies = [ [[package]] name = "arrow-csv" -version = "57.1.0" +version = "57.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2275877a0e5e7e7c76954669366c2aa1a829e340ab1f612e647507860906fb6b" +checksum = "8da746f4180004e3ce7b83c977daf6394d768332349d3d913998b10a120b790a" dependencies = [ "arrow-array", "arrow-cast", @@ -184,9 +184,9 @@ dependencies = [ [[package]] name = "arrow-data" -version = "57.1.0" +version = "57.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05738f3d42cb922b9096f7786f606fcb8669260c2640df8490533bb2fa38c9d3" +checksum = "1fdd994a9d28e6365aa78e15da3f3950c0fdcea6b963a12fa1c391afb637b304" dependencies = [ "arrow-buffer", "arrow-schema", @@ -197,16 +197,16 @@ dependencies = [ [[package]] name = "arrow-flight" -version = "57.1.0" +version = "57.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b5f57c3d39d1b1b7c1376a772ea86a131e7da310aed54ebea9363124bb885e3" +checksum = "58c5b083668e6230eae3eab2fc4b5fb989974c845d0aa538dde61a4327c78675" dependencies = [ "arrow-array", "arrow-buffer", "arrow-cast", "arrow-ipc", "arrow-schema", - "base64 0.22.1", + "base64", "bytes", "futures", "prost", @@ -217,9 +217,9 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "57.1.0" +version = "57.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d09446e8076c4b3f235603d9ea7c5494e73d441b01cd61fb33d7254c11964b3" +checksum = "abf7df950701ab528bf7c0cf7eeadc0445d03ef5d6ffc151eaae6b38a58feff1" dependencies = [ "arrow-array", "arrow-buffer", @@ -233,9 +233,9 @@ dependencies = [ [[package]] name = "arrow-json" -version = "57.1.0" +version = "57.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "371ffd66fa77f71d7628c63f209c9ca5341081051aa32f9c8020feb0def787c0" +checksum = "0ff8357658bedc49792b13e2e862b80df908171275f8e6e075c460da5ee4bf86" dependencies = [ "arrow-array", "arrow-buffer", @@ -257,9 +257,9 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "57.1.0" +version = "57.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbc94fc7adec5d1ba9e8cd1b1e8d6f72423b33fe978bf1f46d970fafab787521" +checksum = "f7d8f1870e03d4cbed632959498bcc84083b5a24bded52905ae1695bd29da45b" dependencies = [ "arrow-array", "arrow-buffer", @@ -270,9 +270,9 @@ dependencies = [ [[package]] name = "arrow-row" -version = "57.1.0" +version = "57.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "169676f317157dc079cc5def6354d16db63d8861d61046d2f3883268ced6f99f" +checksum = "18228633bad92bff92a95746bbeb16e5fc318e8382b75619dec26db79e4de4c0" dependencies = [ "arrow-array", "arrow-buffer", @@ -283,9 +283,9 @@ dependencies = [ [[package]] name = "arrow-schema" -version = "57.1.0" +version = "57.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d27609cd7dd45f006abae27995c2729ef6f4b9361cde1ddd019dc31a5aa017e0" +checksum = "8c872d36b7bf2a6a6a2b40de9156265f0242910791db366a2c17476ba8330d68" dependencies = [ "bitflags", "serde", @@ -295,9 +295,9 @@ dependencies = [ [[package]] name = "arrow-select" -version = "57.1.0" +version = "57.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae980d021879ea119dd6e2a13912d81e64abed372d53163e804dfe84639d8010" +checksum = "68bf3e3efbd1278f770d67e5dc410257300b161b93baedb3aae836144edcaf4b" dependencies = [ "ahash", "arrow-array", @@ -309,9 +309,9 @@ dependencies = [ [[package]] name = "arrow-string" -version = "57.1.0" +version = "57.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf35e8ef49dcf0c5f6d175edee6b8af7b45611805333129c541a8b89a0fc0534" +checksum = "85e968097061b3c0e9fe3079cf2e703e487890700546b5b0647f60fca1b5a8d8" dependencies = [ "arrow-array", "arrow-buffer", @@ -613,23 +613,17 @@ dependencies = [ "aws-smithy-async", "aws-smithy-runtime-api", "aws-smithy-types", - "h2 0.3.26", - "h2 0.4.12", - "http 0.2.12", + "h2", "http 1.2.0", - "http-body 0.4.6", - "hyper 0.14.32", - "hyper 1.6.0", - "hyper-rustls 0.24.2", - "hyper-rustls 0.27.5", + "hyper", + "hyper-rustls", "hyper-util", "pin-project-lite", - "rustls 0.21.12", - "rustls 0.23.34", - "rustls-native-certs 0.8.1", + "rustls", + "rustls-native-certs", "rustls-pki-types", "tokio", - "tokio-rustls 0.26.2", + "tokio-rustls", "tower", "tracing", ] @@ -805,12 +799,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "base64" -version = "0.21.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" - [[package]] name = "base64" version = "0.22.1" @@ -953,9 +941,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.11.0" +version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3" +checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" [[package]] name = "bytes-utils" @@ -1028,11 +1016,22 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" +[[package]] +name = "chacha20" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f8d983286843e49675a4b7a2d174efe136dc93a18d69130dd18198a6c167601" +dependencies = [ + "cfg-if", + "cpufeatures 0.3.0", + "rand_core 0.10.0", +] + [[package]] name = "chrono" -version = "0.4.42" +version = "0.4.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "145052bdd345b87320e369255277e3fb5152762ad123a901ef5c262dd38fe8d2" +checksum = "c673075a2e0e5f4a1dde27ce9dee1ea4558c7ffe648f576438a20ca1d2acc4b0" dependencies = [ "iana-time-zone", "js-sys", @@ -1140,16 +1139,6 @@ dependencies = [ "unicode-segmentation", ] -[[package]] -name = "core-foundation" -version = "0.9.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" -dependencies = [ - "core-foundation-sys", - "libc", -] - [[package]] name = "core-foundation" version = "0.10.0" @@ -1175,6 +1164,15 @@ dependencies = [ "libc", ] +[[package]] +name = "cpufeatures" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b2a41393f66f16b0823bb79094d54ac5fbd34ab292ddafb9a0456ac9f87d201" +dependencies = [ + "libc", +] + [[package]] name = "crc" version = "3.4.0" @@ -1702,7 +1700,7 @@ checksum = "794a9db7f7b96b3346fc007ff25e994f09b8f0511b4cf7dff651fadfe3ebb28f" dependencies = [ "arrow", "arrow-buffer", - "base64 0.22.1", + "base64", "blake2", "blake3", "chrono", @@ -1875,7 +1873,7 @@ dependencies = [ "itertools 0.14.0", "parking_lot", "paste", - "petgraph 0.8.3", + "petgraph", ] [[package]] @@ -2077,9 +2075,9 @@ dependencies = [ [[package]] name = "delta_kernel_derive" -version = "0.19.0" +version = "0.19.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9e6474dabfc8e0b849ee2d68f8f13025230d1945b28c69695e9a21b9219ac8e" +checksum = "86815a2c475835751ffa9b8d9ac8ed86cf86294304c42bedd1103d54f25ecbfe" dependencies = [ "proc-macro2", "quote", @@ -2100,9 +2098,9 @@ dependencies = [ [[package]] name = "deltalake-aws" -version = "0.13.0" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "63b470ec0212b5a704424db8a7f44ae90d8c2b4fc96246860dea2b90f80fe1ee" +checksum = "b60353287c8dc49bc21caa77c62e6eca4141bdcaf967365553dc62b518c7d2f1" dependencies = [ "async-trait", "aws-config", @@ -2127,9 +2125,9 @@ dependencies = [ [[package]] name = "deltalake-core" -version = "0.30.0" +version = "0.30.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fd42250f1dc45510e9745f5f747201ed9de72c13911ca5c11dd2cc27fe207e3" +checksum = "5b098d0ce09726f10a08b102c885a501ee18f06ea4aca864570508a9d5b620d1" dependencies = [ "arrow", "arrow-arith", @@ -2148,6 +2146,7 @@ dependencies = [ "chrono", "dashmap", "datafusion", + "datafusion-datasource", "datafusion-proto", "delta_kernel", "deltalake-derive", @@ -2311,7 +2310,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ce92ff622d6dadf7349484f42c93271a0d49b7cc4d466a936405bacbe10aa78" dependencies = [ "cfg-if", - "rustix 1.1.3", + "rustix 1.1.4", "windows-sys 0.59.0", ] @@ -2506,29 +2505,24 @@ dependencies = [ ] [[package]] -name = "glob" -version = "0.3.2" +name = "getrandom" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8d1add55171497b4705a648c6b583acafb01d58050a51727785f0b2c8e0a2b2" +checksum = "0de51e6874e94e7bf76d726fc5d13ba782deca734ff60d5bb2fb2607c7406555" +dependencies = [ + "cfg-if", + "libc", + "r-efi", + "rand_core 0.10.0", + "wasip2", + "wasip3", +] [[package]] -name = "h2" -version = "0.3.26" +name = "glob" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81fe527a889e1532da5c525686d96d4c2e74cdd345badf8dfef9f6b39dd5f5e8" -dependencies = [ - "bytes", - "fnv", - "futures-core", - "futures-sink", - "futures-util", - "http 0.2.12", - "indexmap", - "slab", - "tokio", - "tokio-util", - "tracing", -] +checksum = "a8d1add55171497b4705a648c6b583acafb01d58050a51727785f0b2c8e0a2b2" [[package]] name = "h2" @@ -2558,7 +2552,7 @@ dependencies = [ "cfg-if", "crunchy", "num-traits", - "zerocopy 0.8.31", + "zerocopy 0.8.42", ] [[package]] @@ -2696,30 +2690,6 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" -[[package]] -name = "hyper" -version = "0.14.32" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41dfc780fdec9373c01bae43289ea34c972e40ee3c9f6b3c8801a35f35586ce7" -dependencies = [ - "bytes", - "futures-channel", - "futures-core", - "futures-util", - "h2 0.3.26", - "http 0.2.12", - "http-body 0.4.6", - "httparse", - "httpdate", - "itoa", - "pin-project-lite", - "socket2 0.5.8", - "tokio", - "tower-service", - "tracing", - "want", -] - [[package]] name = "hyper" version = "1.6.0" @@ -2729,7 +2699,7 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "h2 0.4.12", + "h2", "http 1.2.0", "http-body 1.0.1", "httparse", @@ -2741,22 +2711,6 @@ dependencies = [ "want", ] -[[package]] -name = "hyper-rustls" -version = "0.24.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" -dependencies = [ - "futures-util", - "http 0.2.12", - "hyper 0.14.32", - "log", - "rustls 0.21.12", - "rustls-native-certs 0.6.3", - "tokio", - "tokio-rustls 0.24.1", -] - [[package]] name = "hyper-rustls" version = "0.27.5" @@ -2765,13 +2719,13 @@ checksum = "2d191583f3da1305256f22463b9bb0471acad48a4e534a5218b9963e9c1f59b2" dependencies = [ "futures-util", "http 1.2.0", - "hyper 1.6.0", + "hyper", "hyper-util", - "rustls 0.23.34", - "rustls-native-certs 0.8.1", + "rustls", + "rustls-native-certs", "rustls-pki-types", "tokio", - "tokio-rustls 0.26.2", + "tokio-rustls", "tower-service", ] @@ -2781,7 +2735,7 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" dependencies = [ - "hyper 1.6.0", + "hyper", "hyper-util", "pin-project-lite", "tokio", @@ -2794,14 +2748,14 @@ version = "0.1.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c6995591a8f1380fcb4ba966a252a4b29188d51d2b89e3a252f5305be65aea8" dependencies = [ - "base64 0.22.1", + "base64", "bytes", "futures-channel", "futures-core", "futures-util", "http 1.2.0", "http-body 1.0.1", - "hyper 1.6.0", + "hyper", "ipnet", "libc", "percent-encoding", @@ -2953,6 +2907,12 @@ dependencies = [ "syn", ] +[[package]] +name = "id-arena" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d3067d79b975e8844ca9eb072e16b31c3c1c36928edf9c6789548c524d0d954" + [[package]] name = "ident_case" version = "1.0.1" @@ -2988,6 +2948,8 @@ checksum = "6717a8d2a5a929a1a2eb43a12812498ed141a0bcfb7e8f7844fbdbe4303bba9f" dependencies = [ "equivalent", "hashbrown 0.16.0", + "serde", + "serde_core", ] [[package]] @@ -3061,6 +3023,12 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" +[[package]] +name = "leb128fmt" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" + [[package]] name = "lexical-core" version = "1.0.5" @@ -3133,9 +3101,9 @@ checksum = "2c4a545a15244c7d945065b5d392b2d2d7f21526fba56ce51467b06ed445e8f7" [[package]] name = "libc" -version = "0.2.177" +version = "0.2.183" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2874a2af47a2325c2001a6e6fad9b16a53b802102b528163885171cf92b15976" +checksum = "b5b646652bf6661599e1da8901b3b9522896f01e736bad5f723fe7a3a27f899d" [[package]] name = "libloading" @@ -3180,9 +3148,9 @@ checksum = "d26c52dbd32dccf2d10cac7725f8eae5296885fb5703b261f7d0a0739ec807ab" [[package]] name = "linux-raw-sys" -version = "0.11.0" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df1d3c3b53da64cf5760482273a98e575c651a67eec7f77df96b5b642de8f039" +checksum = "32a66949e030da00e8c7d4434b251670a91556f4144941d37452769c25d58a53" [[package]] name = "litemap" @@ -3390,10 +3358,12 @@ dependencies = [ "modelardb_test", "modelardb_types", "object_store", + "serde_json", "sqlparser", "tempfile", "tokio", "tonic", + "tracing", "url", "uuid", ] @@ -3558,7 +3528,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4c1be0c6c22ec0817cdc77d3842f721a17fd30ab6965001415b5402a74e6b740" dependencies = [ "async-trait", - "base64 0.22.1", + "base64", "bytes", "chrono", "form_urlencoded", @@ -3567,7 +3537,7 @@ dependencies = [ "http-body-util", "httparse", "humantime", - "hyper 1.6.0", + "hyper", "itertools 0.14.0", "md-5", "parking_lot", @@ -3576,7 +3546,7 @@ dependencies = [ "rand 0.9.2", "reqwest", "ring", - "rustls-pemfile 2.2.0", + "rustls-pemfile", "serde", "serde_json", "serde_urlencoded", @@ -3647,9 +3617,9 @@ dependencies = [ [[package]] name = "parquet" -version = "57.1.0" +version = "57.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be3e4f6d320dd92bfa7d612e265d7d08bba0a240bab86af3425e1d255a511d89" +checksum = "6ee96b29972a257b855ff2341b37e61af5f12d6af1158b6dcdb5b31ea07bb3cb" dependencies = [ "ahash", "arrow-array", @@ -3659,7 +3629,7 @@ dependencies = [ "arrow-ipc", "arrow-schema", "arrow-select", - "base64 0.22.1", + "base64", "brotli", "bytes", "chrono", @@ -3709,16 +3679,6 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3637c05577168127568a64e9dc5a6887da720efef07b3d9472d45f63ab191166" -[[package]] -name = "petgraph" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3672b37090dbd86368a4145bc067582552b29c27377cad4e0a306c97f9bd7772" -dependencies = [ - "fixedbitset", - "indexmap", -] - [[package]] name = "petgraph" version = "0.8.3" @@ -3819,7 +3779,7 @@ version = "0.2.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9" dependencies = [ - "zerocopy 0.8.31", + "zerocopy 0.8.42", ] [[package]] @@ -3834,9 +3794,9 @@ dependencies = [ [[package]] name = "proc-macro-crate" -version = "3.4.0" +version = "3.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "219cb19e96be00ab2e37d6e299658a0cfa83e52429179969b0f0121b4ac46983" +checksum = "e67ba7e9b2b56446f1d419b1d807906278ffa1a658a8a5d8a39dcb1f5a78614f" dependencies = [ "toml_edit", ] @@ -3893,9 +3853,9 @@ dependencies = [ [[package]] name = "prost" -version = "0.14.1" +version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7231bd9b3d3d33c86b58adbac74b5ec0ad9f496b19d22801d773636feaa95f3d" +checksum = "d2ea70524a2f82d518bce41317d0fae74151505651af45faf1ffbd6fd33f0568" dependencies = [ "bytes", "prost-derive", @@ -3903,16 +3863,15 @@ dependencies = [ [[package]] name = "prost-build" -version = "0.14.1" +version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac6c3320f9abac597dcbc668774ef006702672474aad53c6d596b62e487b40b1" +checksum = "343d3bd7056eda839b03204e68deff7d1b13aba7af2b2fd16890697274262ee7" dependencies = [ "heck", "itertools 0.14.0", "log", "multimap", - "once_cell", - "petgraph 0.7.1", + "petgraph", "prettyplease", "prost", "prost-types", @@ -3923,9 +3882,9 @@ dependencies = [ [[package]] name = "prost-derive" -version = "0.14.1" +version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9120690fafc389a67ba3803df527d0ec9cbbc9cc45e4cc20b332996dfb672425" +checksum = "27c6023962132f4b30eb4c172c91ce92d933da334c59c23cddee82358ddafb0b" dependencies = [ "anyhow", "itertools 0.14.0", @@ -3936,9 +3895,9 @@ dependencies = [ [[package]] name = "prost-types" -version = "0.14.1" +version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9b4db3d6da204ed77bb26ba83b6122a73aeb2e87e25fbf7ad2e84c4ccbf8f72" +checksum = "8991c4cbdb8bc5b11f0b074ffe286c30e523de90fee5ba8132f1399f23cb3dd7" dependencies = [ "prost", ] @@ -3979,7 +3938,7 @@ dependencies = [ "quinn-proto", "quinn-udp", "rustc-hash", - "rustls 0.23.34", + "rustls", "socket2 0.5.8", "thiserror", "tokio", @@ -3997,7 +3956,7 @@ dependencies = [ "rand 0.8.5", "ring", "rustc-hash", - "rustls 0.23.34", + "rustls", "rustls-pki-types", "slab", "thiserror", @@ -4022,13 +3981,19 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.42" +version = "1.0.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a338cc41d27e6cc6dce6cefc13a0729dfbb81c262b1f519331575dd80ef3067f" +checksum = "41f2619966050689382d2b44f664f4bc593e129785a36d6ee376ddf37259b924" dependencies = [ "proc-macro2", ] +[[package]] +name = "r-efi" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf" + [[package]] name = "radix_trie" version = "0.2.1" @@ -4060,6 +4025,17 @@ dependencies = [ "rand_core 0.9.3", ] +[[package]] +name = "rand" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc266eb313df6c5c09c1c7b1fbe2510961e5bcd3add930c1e31f7ed9da0feff8" +dependencies = [ + "chacha20", + "getrandom 0.4.2", + "rand_core 0.10.0", +] + [[package]] name = "rand_chacha" version = "0.3.1" @@ -4098,6 +4074,12 @@ dependencies = [ "getrandom 0.3.1", ] +[[package]] +name = "rand_core" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c8d0fd677905edcbeedbf2edb6494d676f0e98d54d5cf9bda0b061cb8fb8aba" + [[package]] name = "rand_xorshift" version = "0.4.0" @@ -4149,9 +4131,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.12.2" +version = "1.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "843bc0191f75f3e22651ae5f1e72939ab2f72a4bc30fa80a066bd66edefc24d4" +checksum = "e10754a14b9137dd7b1e3e5b0493cc9171fdd105e0ab477f51b72e7f3ac0e276" dependencies = [ "aho-corasick", "memchr", @@ -4161,9 +4143,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.13" +version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5276caf25ac86c8d810222b3dbb938e512c55c6831a10f3e6ed1c93b84041f1c" +checksum = "6e1dd4122fc1595e8162618945476892eefca7b88c52820e74af6262213cae8f" dependencies = [ "aho-corasick", "memchr", @@ -4194,31 +4176,31 @@ version = "0.12.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d0946410b9f7b082a427e4ef5c8ff541a88b357bc6c637c40db3a68ac70a36f" dependencies = [ - "base64 0.22.1", + "base64", "bytes", "futures-core", "futures-util", - "h2 0.4.12", + "h2", "http 1.2.0", "http-body 1.0.1", "http-body-util", - "hyper 1.6.0", - "hyper-rustls 0.27.5", + "hyper", + "hyper-rustls", "hyper-util", "js-sys", "log", "percent-encoding", "pin-project-lite", "quinn", - "rustls 0.23.34", - "rustls-native-certs 0.8.1", + "rustls", + "rustls-native-certs", "rustls-pki-types", "serde", "serde_json", "serde_urlencoded", "sync_wrapper", "tokio", - "tokio-rustls 0.26.2", + "tokio-rustls", "tokio-util", "tower", "tower-http", @@ -4313,29 +4295,17 @@ dependencies = [ [[package]] name = "rustix" -version = "1.1.3" +version = "1.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "146c9e247ccc180c1f61615433868c99f3de3ae256a30a43b49f67c2d9171f34" +checksum = "b6fe4565b9518b83ef4f91bb47ce29620ca828bd32cb7e408f0062e9930ba190" dependencies = [ "bitflags", "errno", "libc", - "linux-raw-sys 0.11.0", + "linux-raw-sys 0.12.1", "windows-sys 0.61.2", ] -[[package]] -name = "rustls" -version = "0.21.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" -dependencies = [ - "log", - "ring", - "rustls-webpki 0.101.7", - "sct", -] - [[package]] name = "rustls" version = "0.23.34" @@ -4346,23 +4316,11 @@ dependencies = [ "once_cell", "ring", "rustls-pki-types", - "rustls-webpki 0.103.8", + "rustls-webpki", "subtle", "zeroize", ] -[[package]] -name = "rustls-native-certs" -version = "0.6.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" -dependencies = [ - "openssl-probe", - "rustls-pemfile 1.0.4", - "schannel", - "security-framework 2.11.1", -] - [[package]] name = "rustls-native-certs" version = "0.8.1" @@ -4372,16 +4330,7 @@ dependencies = [ "openssl-probe", "rustls-pki-types", "schannel", - "security-framework 3.2.0", -] - -[[package]] -name = "rustls-pemfile" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" -dependencies = [ - "base64 0.21.7", + "security-framework", ] [[package]] @@ -4403,16 +4352,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "rustls-webpki" -version = "0.101.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "rustls-webpki" version = "0.103.8" @@ -4495,29 +4434,6 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" -[[package]] -name = "sct" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" -dependencies = [ - "ring", - "untrusted", -] - -[[package]] -name = "security-framework" -version = "2.11.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" -dependencies = [ - "bitflags", - "core-foundation 0.9.4", - "core-foundation-sys", - "libc", - "security-framework-sys", -] - [[package]] name = "security-framework" version = "3.2.0" @@ -4525,7 +4441,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "271720403f46ca04f7ba6f55d438f8bd878d6b8ca0a1046e8228c4145bcbb316" dependencies = [ "bitflags", - "core-foundation 0.10.0", + "core-foundation", "core-foundation-sys", "libc", "security-framework-sys", @@ -4585,14 +4501,15 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.140" +version = "1.0.149" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "20068b6e96dc6c9bd23e01df8827e6c7e1f2fddd43c21810382803c136b99373" +checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86" dependencies = [ "itoa", "memchr", - "ryu", "serde", + "serde_core", + "zmij", ] [[package]] @@ -4623,7 +4540,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283" dependencies = [ "cfg-if", - "cpufeatures", + "cpufeatures 0.2.17", "digest", ] @@ -4876,14 +4793,14 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.24.0" +version = "3.27.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "655da9c7eb6305c55742045d5a8d2037996d61d8de95806335c7c86ce0f82e9c" +checksum = "32497e9a4c7b38532efcdebeef879707aa9f794296a4f0244f6f69e9bc8574bd" dependencies = [ "fastrand", "getrandom 0.3.1", "once_cell", - "rustix 1.1.3", + "rustix 1.1.4", "windows-sys 0.61.2", ] @@ -5020,23 +4937,13 @@ dependencies = [ "syn", ] -[[package]] -name = "tokio-rustls" -version = "0.24.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" -dependencies = [ - "rustls 0.21.12", - "tokio", -] - [[package]] name = "tokio-rustls" version = "0.26.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e727b36a1a0e8b74c376ac2211e40c2c8af09fb4013c60d910495810f008e9b" dependencies = [ - "rustls 0.23.34", + "rustls", "tokio", ] @@ -5066,14 +4973,14 @@ dependencies = [ [[package]] name = "toml" -version = "0.9.10+spec-1.1.0" +version = "0.9.12+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0825052159284a1a8b4d6c0c86cbc801f2da5afd2b225fa548c72f2e74002f48" +checksum = "cf92845e79fc2e2def6a5d828f0801e29a2f8acc037becc5ab08595c7d5e9863" dependencies = [ "indexmap", "serde_core", "serde_spanned", - "toml_datetime", + "toml_datetime 0.7.5+spec-1.1.0", "toml_parser", "toml_writer", "winnow", @@ -5088,23 +4995,32 @@ dependencies = [ "serde_core", ] +[[package]] +name = "toml_datetime" +version = "1.0.0+spec-1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32c2555c699578a4f59f0cc68e5116c8d7cabbd45e1409b989d4be085b53f13e" +dependencies = [ + "serde_core", +] + [[package]] name = "toml_edit" -version = "0.23.10+spec-1.0.0" +version = "0.25.4+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84c8b9f757e028cee9fa244aea147aab2a9ec09d5325a9b01e0a49730c2b5269" +checksum = "7193cbd0ce53dc966037f54351dbbcf0d5a642c7f0038c382ef9e677ce8c13f2" dependencies = [ "indexmap", - "toml_datetime", + "toml_datetime 1.0.0+spec-1.1.0", "toml_parser", "winnow", ] [[package]] name = "toml_parser" -version = "1.0.6+spec-1.1.0" +version = "1.0.9+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3198b4b0a8e11f09dd03e133c0280504d0801269e9afa46362ffde1cbeebf44" +checksum = "702d4415e08923e7e1ef96cd5727c0dfed80b4d2fa25db9647fe5eb6f7c5a4c4" dependencies = [ "winnow", ] @@ -5117,19 +5033,19 @@ checksum = "ab16f14aed21ee8bfd8ec22513f7287cd4a91aa92e44edfe2c17ddd004e92607" [[package]] name = "tonic" -version = "0.14.2" +version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb7613188ce9f7df5bfe185db26c5814347d110db17920415cf2fbcad85e7203" +checksum = "fec7c61a0695dc1887c1b53952990f3ad2e3a31453e1f49f10e75424943a93ec" dependencies = [ "async-trait", "axum", - "base64 0.22.1", + "base64", "bytes", - "h2 0.4.12", + "h2", "http 1.2.0", "http-body 1.0.1", "http-body-util", - "hyper 1.6.0", + "hyper", "hyper-timeout", "hyper-util", "percent-encoding", @@ -5146,9 +5062,9 @@ dependencies = [ [[package]] name = "tonic-prost" -version = "0.14.2" +version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66bd50ad6ce1252d87ef024b3d64fe4c3cf54a86fb9ef4c631fdd0ded7aeaa67" +checksum = "a55376a0bbaa4975a3f10d009ad763d8f4108f067c7c2e74f3001fb49778d309" dependencies = [ "bytes", "prost", @@ -5324,6 +5240,12 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1fc81956842c57dac11422a97c3b8195a1ff727f06e85c84ed2e8aa277c9a0fd" +[[package]] +name = "unicode-xid" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" + [[package]] name = "untrusted" version = "0.9.0" @@ -5368,13 +5290,13 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.19.0" +version = "1.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2e054861b4bd027cd373e18e8d8d8e6548085000e41290d95ce0c373a654b4a" +checksum = "a68d3c8f01c0cfa54a75291d83601161799e4a89a39e0929f4b0354d88757a37" dependencies = [ - "getrandom 0.3.1", + "getrandom 0.4.2", "js-sys", - "rand 0.9.2", + "rand 0.10.0", "serde_core", "wasm-bindgen", ] @@ -5470,6 +5392,24 @@ dependencies = [ "wit-bindgen-rt", ] +[[package]] +name = "wasip2" +version = "1.0.2+wasi-0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9517f9239f02c069db75e65f174b3da828fe5f5b945c4dd26bd25d89c03ebcf5" +dependencies = [ + "wit-bindgen", +] + +[[package]] +name = "wasip3" +version = "0.4.0+wasi-0.3.0-rc-2026-01-06" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5428f8bf88ea5ddc08faddef2ac4a67e390b88186c703ce6dbd955e1c145aca5" +dependencies = [ + "wit-bindgen", +] + [[package]] name = "wasm-bindgen" version = "0.2.100" @@ -5541,6 +5481,28 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "wasm-encoder" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "990065f2fe63003fe337b932cfb5e3b80e0b4d0f5ff650e6985b1048f62c8319" +dependencies = [ + "leb128fmt", + "wasmparser", +] + +[[package]] +name = "wasm-metadata" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb0e353e6a2fbdc176932bbaab493762eb1255a7900fe0fea1a2f96c296cc909" +dependencies = [ + "anyhow", + "indexmap", + "wasm-encoder", + "wasmparser", +] + [[package]] name = "wasm-streams" version = "0.4.2" @@ -5554,6 +5516,18 @@ dependencies = [ "web-sys", ] +[[package]] +name = "wasmparser" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" +dependencies = [ + "bitflags", + "hashbrown 0.15.2", + "indexmap", + "semver", +] + [[package]] name = "web-sys" version = "0.3.77" @@ -5905,6 +5879,26 @@ dependencies = [ "memchr", ] +[[package]] +name = "wit-bindgen" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7249219f66ced02969388cf2bb044a09756a083d0fab1e566056b04d9fbcaa5" +dependencies = [ + "wit-bindgen-rust-macro", +] + +[[package]] +name = "wit-bindgen-core" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea61de684c3ea68cb082b7a88508a8b27fcc8b797d738bfc99a82facf1d752dc" +dependencies = [ + "anyhow", + "heck", + "wit-parser", +] + [[package]] name = "wit-bindgen-rt" version = "0.33.0" @@ -5914,6 +5908,74 @@ dependencies = [ "bitflags", ] +[[package]] +name = "wit-bindgen-rust" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7c566e0f4b284dd6561c786d9cb0142da491f46a9fbed79ea69cdad5db17f21" +dependencies = [ + "anyhow", + "heck", + "indexmap", + "prettyplease", + "syn", + "wasm-metadata", + "wit-bindgen-core", + "wit-component", +] + +[[package]] +name = "wit-bindgen-rust-macro" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c0f9bfd77e6a48eccf51359e3ae77140a7f50b1e2ebfe62422d8afdaffab17a" +dependencies = [ + "anyhow", + "prettyplease", + "proc-macro2", + "quote", + "syn", + "wit-bindgen-core", + "wit-bindgen-rust", +] + +[[package]] +name = "wit-component" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2" +dependencies = [ + "anyhow", + "bitflags", + "indexmap", + "log", + "serde", + "serde_derive", + "serde_json", + "wasm-encoder", + "wasm-metadata", + "wasmparser", + "wit-parser", +] + +[[package]] +name = "wit-parser" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ecc8ac4bc1dc3381b7f59c34f00b67e18f910c2c0f50015669dde7def656a736" +dependencies = [ + "anyhow", + "id-arena", + "indexmap", + "log", + "semver", + "serde", + "serde_derive", + "serde_json", + "unicode-xid", + "wasmparser", +] + [[package]] name = "write16" version = "1.0.0" @@ -5982,11 +6044,11 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.8.31" +version = "0.8.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd74ec98b9250adb3ca554bdde269adf631549f51d8a8f8f0a10b50f1cb298c3" +checksum = "f2578b716f8a7a858b7f02d5bd870c14bf4ddbbcf3a4c05414ba6503640505e3" dependencies = [ - "zerocopy-derive 0.8.31", + "zerocopy-derive 0.8.42", ] [[package]] @@ -6002,9 +6064,9 @@ dependencies = [ [[package]] name = "zerocopy-derive" -version = "0.8.31" +version = "0.8.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8a8d209fdf45cf5138cbb5a506f6b52522a25afccc534d1475dad8e31105c6a" +checksum = "7e6cc098ea4d3bd6246687de65af3f920c430e236bee1e3bf2e441463f08a02f" dependencies = [ "proc-macro2", "quote", @@ -6066,6 +6128,12 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2f06ae92f42f5e5c42443fd094f245eb656abf56dd7cce9b8b263236565e00f2" +[[package]] +name = "zmij" +version = "1.0.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8848ee67ecc8aedbaf3e4122217aff892639231befc6a1b58d29fff4c2cabaa" + [[package]] name = "zstd" version = "0.13.3" diff --git a/Cargo.toml b/Cargo.toml index 8da56f28..aaa68c81 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,6 +39,7 @@ prost-build = "0.14.1" rand = "0.9.2" rustyline = "17.0.2" serde = { version = "1.0.228", features = ["derive"] } +serde_json = "1.0.149" snmalloc-rs = "0.3.8" sqlparser = "0.59.0" sysinfo = "0.37.2" diff --git a/crates/modelardb_server/src/configuration.rs b/crates/modelardb_server/src/configuration.rs index dbdcc5db..90d9c796 100644 --- a/crates/modelardb_server/src/configuration.rs +++ b/crates/modelardb_server/src/configuration.rs @@ -382,6 +382,7 @@ mod tests { use std::sync::Arc; use modelardb_storage::data_folder::DataFolder; + use modelardb_storage::write_ahead_log::WriteAheadLog; use modelardb_types::types::{Node, ServerMode}; use tempfile::TempDir; use tokio::sync::RwLock; @@ -689,6 +690,10 @@ mod tests { .await .unwrap(); + let write_ahead_log = Arc::new(RwLock::new( + WriteAheadLog::try_new(&local_data_folder).await.unwrap(), + )); + let configuration_manager = Arc::new(RwLock::new( ConfigurationManager::try_new( local_data_folder, @@ -699,7 +704,7 @@ mod tests { )); let storage_engine = Arc::new(RwLock::new( - StorageEngine::try_new(data_folders, &configuration_manager) + StorageEngine::try_new(data_folders, write_ahead_log, &configuration_manager) .await .unwrap(), )); diff --git a/crates/modelardb_server/src/context.rs b/crates/modelardb_server/src/context.rs index c88b7be6..e1f5ba5d 100644 --- a/crates/modelardb_server/src/context.rs +++ b/crates/modelardb_server/src/context.rs @@ -20,9 +20,10 @@ use std::sync::Arc; use datafusion::arrow::datatypes::Schema; use datafusion::catalog::{SchemaProvider, TableProvider}; +use modelardb_storage::write_ahead_log::WriteAheadLog; use modelardb_types::types::TimeSeriesTableMetadata; use tokio::sync::RwLock; -use tracing::info; +use tracing::{info, warn}; use crate::configuration::ConfigurationManager; use crate::error::{ModelarDbServerError, Result}; @@ -38,6 +39,8 @@ pub struct Context { pub configuration_manager: Arc>, /// Manages all uncompressed and compressed data in the system. pub storage_engine: Arc>, + /// Write-ahead log for persisting data and operations. + write_ahead_log: Arc>, } impl Context { @@ -50,14 +53,24 @@ impl Context { .await?, )); + let write_ahead_log = Arc::new(RwLock::new( + WriteAheadLog::try_new(&data_folders.local_data_folder).await?, + )); + let storage_engine = Arc::new(RwLock::new( - StorageEngine::try_new(data_folders.clone(), &configuration_manager).await?, + StorageEngine::try_new( + data_folders.clone(), + write_ahead_log.clone(), + &configuration_manager, + ) + .await?, )); Ok(Context { data_folders, configuration_manager, storage_engine, + write_ahead_log, }) } @@ -103,6 +116,12 @@ impl Context { self.register_and_save_time_series_table(time_series_table_metadata) .await?; + // Create a file in the write-ahead log to log uncompressed data for the table. + let mut write_ahead_log = self.write_ahead_log.write().await; + write_ahead_log + .create_table_log(time_series_table_metadata) + .await?; + Ok(()) } @@ -237,6 +256,40 @@ impl Context { Ok(()) } + /// For each time series table in the local data folder, use the write-ahead log to replay any + /// data that was written to the storage engine but not compressed and saved to disk. Note that + /// this method should only be called before the storage engine starts ingesting data to avoid + /// replaying data that is currently in memory. + pub(super) async fn replay_write_ahead_log(&self) -> Result<()> { + let local_data_folder = &self.data_folders.local_data_folder; + + let write_ahead_log = self.write_ahead_log.write().await; + let mut storage_engine = self.storage_engine.write().await; + + for metadata in local_data_folder.time_series_table_metadata().await? { + let unpersisted_batches = + write_ahead_log.unpersisted_batches_in_table_log(&metadata.name)?; + + if !unpersisted_batches.is_empty() { + warn!( + table = %metadata.name, + batch_count = unpersisted_batches.len(), + "Replaying unpersisted batches for time series table." + ); + } + + for (batch_id, batch) in unpersisted_batches { + storage_engine.insert_data_points_with_batch_id( + metadata.clone(), + batch, + batch_id, + )?; + } + } + + Ok(()) + } + /// Drop the table with `table_name` if it exists. The table is deregistered from the Apache /// Arrow Datafusion session context and deleted from the storage engine and Delta Lake. If the /// table does not exist or if it could not be dropped, [`ModelarDbServerError`] is returned. @@ -254,11 +307,16 @@ impl Context { self.drop_table_from_storage_engine(table_name).await?; + let local_data_folder = &self.data_folders.local_data_folder; + + // If the table is a time series table, delete the table log file from the write-ahead log. + if local_data_folder.is_time_series_table(table_name).await? { + let mut write_ahead_log = self.write_ahead_log.write().await; + write_ahead_log.remove_table_log(table_name)?; + } + // Drop the table from the Delta Lake. - self.data_folders - .local_data_folder - .drop_table(table_name) - .await?; + local_data_folder.drop_table(table_name).await?; Ok(()) } diff --git a/crates/modelardb_server/src/main.rs b/crates/modelardb_server/src/main.rs index be10e170..ce5cfbd0 100644 --- a/crates/modelardb_server/src/main.rs +++ b/crates/modelardb_server/src/main.rs @@ -86,13 +86,8 @@ async fn main() -> Result<()> { // Setup CTRL+C handler. setup_ctrl_c_handler(&context); - // Initialize storage engine with spilled buffers. - context - .storage_engine - .read() - .await - .initialize(&context) - .await?; + // Replay any data that was written to the storage engine but not compressed and saved to disk. + context.replay_write_ahead_log().await?; // Start the Apache Arrow Flight interface. remote::start_apache_arrow_flight_server(context, *PORT).await?; diff --git a/crates/modelardb_server/src/storage/compressed_data_buffer.rs b/crates/modelardb_server/src/storage/compressed_data_buffer.rs index 187609a5..3086e757 100644 --- a/crates/modelardb_server/src/storage/compressed_data_buffer.rs +++ b/crates/modelardb_server/src/storage/compressed_data_buffer.rs @@ -15,6 +15,7 @@ //! Buffer for compressed segments from the same time series table. +use std::collections::HashSet; use std::sync::Arc; use datafusion::arrow::record_batch::RecordBatch; @@ -30,16 +31,21 @@ pub(super) struct CompressedSegmentBatch { pub(super) time_series_table_metadata: Arc, /// Compressed segments representing the data points to insert. pub(super) compressed_segments: Vec, + /// The ids of the uncompressed batches that correspond to the compressed segments. The ids are + /// assigned by the WAL and are used to delete uncompressed data when compressed data is saved. + pub(super) batch_ids: HashSet, } impl CompressedSegmentBatch { pub(super) fn new( time_series_table_metadata: Arc, compressed_segments: Vec, + batch_ids: HashSet, ) -> Self { Self { time_series_table_metadata, compressed_segments, + batch_ids, } } @@ -59,6 +65,9 @@ pub(super) struct CompressedDataBuffer { compressed_segments: Vec, /// Continuously updated total sum of the size of the compressed segments. pub(super) size_in_bytes: u64, + /// The ids of the uncompressed batches that correspond to the compressed segments. The ids are + /// assigned by the WAL and are used to delete uncompressed data when compressed data is saved. + batch_ids: HashSet, } impl CompressedDataBuffer { @@ -67,16 +76,19 @@ impl CompressedDataBuffer { time_series_table_metadata, compressed_segments: vec![], size_in_bytes: 0, + batch_ids: HashSet::new(), } } - /// Append `compressed_segments` to the [`CompressedDataBuffer`] and return the size of - /// `compressed_segments` in bytes if their schema matches the time series table, otherwise - /// [`ModelarDbServerError`] is returned. - pub(super) fn append_compressed_segments( + /// Append the compressed segments in `compressed_segment_batch` to the [`CompressedDataBuffer`] + /// and return the size of the compressed segments in bytes if their schema matches the time + /// series table, otherwise [`ModelarDbServerError`] is returned. + pub(super) fn append_compressed_segment_batch( &mut self, - mut compressed_segments: Vec, + compressed_segment_batch: CompressedSegmentBatch, ) -> Result { + let mut compressed_segments = compressed_segment_batch.compressed_segments; + if compressed_segments.iter().any(|compressed_segments| { compressed_segments.schema() != self.time_series_table_metadata.compressed_schema }) { @@ -94,6 +106,8 @@ impl CompressedDataBuffer { self.size_in_bytes += compressed_segments_size; } + self.batch_ids.extend(compressed_segment_batch.batch_ids); + Ok(compressed_segments_size) } @@ -102,6 +116,11 @@ impl CompressedDataBuffer { self.compressed_segments } + /// Return the ids given to the uncompressed batches by the WAL. + pub(super) fn batch_ids(&self) -> HashSet { + self.batch_ids.clone() + } + /// Return the size in bytes of `compressed_segments`. fn size_of_compressed_segments(compressed_segments: &RecordBatch) -> u64 { let mut total_size: u64 = 0; @@ -128,15 +147,12 @@ mod tests { use modelardb_test::table; #[test] - fn test_can_append_valid_compressed_segments() { + fn test_can_append_valid_compressed_segment_batch() { let mut compressed_data_buffer = CompressedDataBuffer::new(table::time_series_table_metadata_arc()); compressed_data_buffer - .append_compressed_segments(vec![ - table::compressed_segments_record_batch(), - table::compressed_segments_record_batch(), - ]) + .append_compressed_segment_batch(compressed_segment_batch()) .unwrap(); assert_eq!(compressed_data_buffer.compressed_segments.len(), 2); @@ -150,10 +166,7 @@ mod tests { CompressedDataBuffer::new(table::time_series_table_metadata_arc()); compressed_data_buffer - .append_compressed_segments(vec![ - table::compressed_segments_record_batch(), - table::compressed_segments_record_batch(), - ]) + .append_compressed_segment_batch(compressed_segment_batch()) .unwrap(); assert!(compressed_data_buffer.size_in_bytes > 0); @@ -164,12 +177,8 @@ mod tests { let mut compressed_data_buffer = CompressedDataBuffer::new(table::time_series_table_metadata_arc()); - let compressed_segments = vec![ - table::compressed_segments_record_batch(), - table::compressed_segments_record_batch(), - ]; compressed_data_buffer - .append_compressed_segments(compressed_segments) + .append_compressed_segment_batch(compressed_segment_batch()) .unwrap(); let record_batches = compressed_data_buffer.record_batches(); @@ -179,6 +188,17 @@ mod tests { assert_eq!(record_batch.num_rows(), 6); } + fn compressed_segment_batch() -> CompressedSegmentBatch { + CompressedSegmentBatch::new( + table::time_series_table_metadata_arc(), + vec![ + table::compressed_segments_record_batch(), + table::compressed_segments_record_batch(), + ], + HashSet::from([0, 1, 2]), + ) + } + #[test] fn test_get_size_of_compressed_data_buffer() { let compressed_data_buffer = table::compressed_segments_record_batch(); diff --git a/crates/modelardb_server/src/storage/compressed_data_manager.rs b/crates/modelardb_server/src/storage/compressed_data_manager.rs index 52a42869..a0ccd9a7 100644 --- a/crates/modelardb_server/src/storage/compressed_data_manager.rs +++ b/crates/modelardb_server/src/storage/compressed_data_manager.rs @@ -22,6 +22,7 @@ use crossbeam_queue::SegQueue; use dashmap::DashMap; use datafusion::arrow::record_batch::RecordBatch; use modelardb_storage::data_folder::DataFolder; +use modelardb_storage::write_ahead_log::WriteAheadLog; use tokio::runtime::Handle; use tokio::sync::RwLock; use tracing::{debug, error, info}; @@ -50,6 +51,8 @@ pub(super) struct CompressedDataManager { channels: Arc, /// Track how much memory is left for storing uncompressed and compressed data. memory_pool: Arc, + /// Write-ahead log for persisting data and operations. + write_ahead_log: Arc>, } impl CompressedDataManager { @@ -58,6 +61,7 @@ impl CompressedDataManager { local_data_folder: DataFolder, channels: Arc, memory_pool: Arc, + write_ahead_log: Arc>, ) -> Self { Self { data_transfer, @@ -66,6 +70,7 @@ impl CompressedDataManager { compressed_queue: SegQueue::new(), channels, memory_pool, + write_ahead_log, } } @@ -147,8 +152,7 @@ impl CompressedDataManager { { debug!("Found existing compressed data buffer for table '{time_series_table_name}'.",); - compressed_data_buffer - .append_compressed_segments(compressed_segment_batch.compressed_segments) + compressed_data_buffer.append_compressed_segment_batch(compressed_segment_batch) } else { // A String is created as two copies are required for compressed_data_buffer and // compressed_queue anyway and compressed_segments cannot be moved out of @@ -158,10 +162,11 @@ impl CompressedDataManager { "Creating compressed data buffer for table '{time_series_table_name}' as none exist.", ); - let mut compressed_data_buffer = - CompressedDataBuffer::new(compressed_segment_batch.time_series_table_metadata); - let segment_size = compressed_data_buffer - .append_compressed_segments(compressed_segment_batch.compressed_segments); + let mut compressed_data_buffer = CompressedDataBuffer::new( + compressed_segment_batch.time_series_table_metadata.clone(), + ); + let segment_size = + compressed_data_buffer.append_compressed_segment_batch(compressed_segment_batch); self.compressed_data_buffers .insert(time_series_table_name.clone(), compressed_data_buffer); @@ -244,11 +249,23 @@ impl CompressedDataManager { // actual size is not computed as DeltaTable seems to have no support for listing the files // added in a version without iterating through all of the Add actions from file_actions(). let compressed_data_buffer_size_in_bytes = compressed_data_buffer.size_in_bytes; + let batch_ids = compressed_data_buffer.batch_ids(); let compressed_segments = compressed_data_buffer.record_batches(); + + // If a crash occurs between writing to the Delta Lake and updating the WAL, no data is + // lost or duplicated. The batch_ids are stored in the Delta Lake commit metadata, so on + // restart the WAL recovers which batches were persisted from the commit history and + // excludes them during replay. The WAL update is only an optimization to eagerly delete + // fully persisted WAL segment files. self.local_data_folder - .write_record_batches(table_name, compressed_segments) + .write_record_batches_with_batch_ids(table_name, compressed_segments, batch_ids.clone()) .await?; + // Inform the write-ahead log that data has been written to disk. We use a read lock since + // the specific WAL file is locked internally before being updated. + let write_ahead_log = self.write_ahead_log.read().await; + write_ahead_log.mark_batches_as_persisted_in_table_log(table_name, batch_ids)?; + // Inform the data transfer component about the new data if a remote data folder was // provided. If the total size of the data related to table_name has reached the transfer // threshold, all of the data is transferred to the remote object store. @@ -290,6 +307,8 @@ impl CompressedDataManager { mod tests { use super::*; + use std::collections::HashSet; + use datafusion::arrow::array::{Array, Int8Array}; use datafusion::arrow::datatypes::{DataType, Field, Schema}; use modelardb_test::table::{self, NORMAL_TABLE_NAME, TIME_SERIES_TABLE_NAME}; @@ -552,6 +571,10 @@ mod tests { .await .unwrap(); + let write_ahead_log = Arc::new(RwLock::new( + WriteAheadLog::try_new(&local_data_folder).await.unwrap(), + )); + ( temp_dir, CompressedDataManager::new( @@ -559,6 +582,7 @@ mod tests { local_data_folder, channels, memory_pool, + write_ahead_log, ), ) } @@ -583,6 +607,7 @@ mod tests { offset, ), ], + HashSet::from([0, 1]), ) } } diff --git a/crates/modelardb_server/src/storage/mod.rs b/crates/modelardb_server/src/storage/mod.rs index f7488f10..a1aed2e8 100644 --- a/crates/modelardb_server/src/storage/mod.rs +++ b/crates/modelardb_server/src/storage/mod.rs @@ -34,13 +34,13 @@ use std::sync::{Arc, LazyLock}; use std::thread::{self, JoinHandle}; use datafusion::arrow::record_batch::RecordBatch; +use modelardb_storage::write_ahead_log::WriteAheadLog; use modelardb_types::types::TimeSeriesTableMetadata; use tokio::runtime::Handle; use tokio::sync::RwLock; use tracing::error; use crate::configuration::ConfigurationManager; -use crate::context::Context; use crate::data_folders::DataFolders; use crate::error::{ModelarDbServerError, Result}; use crate::storage::compressed_data_manager::CompressedDataManager; @@ -82,6 +82,8 @@ pub struct StorageEngine { join_handles: Vec>, /// Unbounded channels used by the threads to communicate. channels: Arc, + /// Write-ahead log for persisting data and operations. + write_ahead_log: Arc>, } impl StorageEngine { @@ -91,6 +93,7 @@ impl StorageEngine { /// created. pub(super) async fn try_new( data_folders: DataFolders, + write_ahead_log: Arc>, configuration_manager: &Arc>, ) -> Result { // Create shared memory pool. @@ -109,11 +112,14 @@ impl StorageEngine { let channels = Arc::new(Channels::new()); // Create the uncompressed data manager. - let uncompressed_data_manager = Arc::new(UncompressedDataManager::new( - data_folders.local_data_folder.clone(), - memory_pool.clone(), - channels.clone(), - )); + let uncompressed_data_manager = Arc::new( + UncompressedDataManager::try_new( + data_folders.local_data_folder.clone(), + memory_pool.clone(), + channels.clone(), + ) + .await?, + ); { let runtime_handle = runtime_handle.clone(); @@ -172,6 +178,7 @@ impl StorageEngine { data_folders.local_data_folder, channels.clone(), memory_pool.clone(), + write_ahead_log.clone(), )); { @@ -198,6 +205,7 @@ impl StorageEngine { memory_pool, join_handles, channels, + write_ahead_log, }; // Start the task that transfers data periodically if a remote data folder is given and @@ -233,13 +241,6 @@ impl StorageEngine { Ok(()) } - /// Add references to the - /// [`UncompressedDataBuffers`](uncompressed_data_buffer::UncompressedDataBuffer) currently on - /// disk to [`UncompressedDataManager`] which immediately will start compressing them. - pub(super) async fn initialize(&self, context: &Context) -> Result<()> { - self.uncompressed_data_manager.initialize(context).await - } - /// Pass `record_batch` to [`CompressedDataManager`]. Return [`Ok`] if `record_batch` was /// successfully written to an Apache Parquet file, otherwise return [`ModelarDbServerError`]. pub(super) async fn insert_record_batch( @@ -259,7 +260,30 @@ impl StorageEngine { time_series_table_metadata: Arc, multivariate_data_points: RecordBatch, ) -> Result<()> { - // TODO: write to a WAL and use it to ensure termination never duplicates or loses data. + // Write to the write-ahead log to ensure termination never loses data. We use a read lock + // since the specific log file is locked internally before writing. + let batch_id = { + let write_ahead_log = self.write_ahead_log.read().await; + write_ahead_log + .append_to_table_log(&time_series_table_metadata.name, &multivariate_data_points)? + }; + + self.insert_data_points_with_batch_id( + time_series_table_metadata, + multivariate_data_points, + batch_id, + ) + } + + /// Pass `data_points` to [`UncompressedDataManager`] with a batch id given to the data by the + /// WAL. Return [`Ok`] if all of the data points were successfully inserted, otherwise return + /// [`ModelarDbServerError`]. + pub(super) fn insert_data_points_with_batch_id( + &mut self, + time_series_table_metadata: Arc, + multivariate_data_points: RecordBatch, + batch_id: u64, + ) -> Result<()> { self.memory_pool .wait_for_ingested_memory(multivariate_data_points.get_array_memory_size() as u64); @@ -268,6 +292,7 @@ impl StorageEngine { .send(Message::Data(IngestedDataBuffer::new( time_series_table_metadata, multivariate_data_points, + batch_id, ))) .map_err(|error| error.into()) } diff --git a/crates/modelardb_server/src/storage/uncompressed_data_buffer.rs b/crates/modelardb_server/src/storage/uncompressed_data_buffer.rs index 92e7a547..ad266f48 100644 --- a/crates/modelardb_server/src/storage/uncompressed_data_buffer.rs +++ b/crates/modelardb_server/src/storage/uncompressed_data_buffer.rs @@ -18,6 +18,7 @@ //! supports inserting and storing data in-memory, while [`UncompressedOnDiskDataBuffer`] provides //! support for storing uncompressed data points in Apache Parquet files on disk. +use std::collections::HashSet; use std::fmt::{Debug, Formatter, Result as FmtResult}; use std::sync::Arc; use std::{iter, mem}; @@ -45,16 +46,20 @@ pub(super) struct IngestedDataBuffer { pub(super) time_series_table_metadata: Arc, /// Uncompressed data points to insert. pub(super) data_points: RecordBatch, + /// The id given to the data by the WAL. + pub(super) batch_id: u64, } impl IngestedDataBuffer { pub(super) fn new( time_series_table_metadata: Arc, data_points: RecordBatch, + batch_id: u64, ) -> Self { Self { time_series_table_metadata, data_points, + batch_id, } } } @@ -83,6 +88,8 @@ pub(super) struct UncompressedInMemoryDataBuffer { values: Vec, /// The tag values for the time series the buffer stores data points for. tag_values: Vec, + /// The ids given to the batches by the WAL. + batch_ids: HashSet, } impl UncompressedInMemoryDataBuffer { @@ -91,6 +98,7 @@ impl UncompressedInMemoryDataBuffer { tag_values: Vec, time_series_table_metadata: Arc, current_batch_index: u64, + batch_ids: HashSet, ) -> Self { let timestamps = TimestampBuilder::with_capacity(*UNCOMPRESSED_DATA_BUFFER_CAPACITY); let values = (0..time_series_table_metadata.field_column_indices.len()) @@ -104,6 +112,7 @@ impl UncompressedInMemoryDataBuffer { timestamps, values, tag_values, + batch_ids, } } @@ -148,6 +157,11 @@ impl UncompressedInMemoryDataBuffer { debug!("Inserted data point into {:?}.", self) } + /// Add `batch_id` to the batch ids given to the data by the WAL. + pub(super) fn insert_batch_id(&mut self, batch_id: u64) { + self.batch_ids.insert(batch_id); + } + /// Return how many data points the [`UncompressedInMemoryDataBuffer`] can contain. pub(super) fn capacity(&self) -> usize { // The capacity is always the same for both builders. @@ -201,6 +215,11 @@ impl UncompressedInMemoryDataBuffer { compute_memory_size(self.values.len()) } + /// Return the ids given to the batches by the WAL. + pub(super) fn batch_ids(&self) -> &HashSet { + &self.batch_ids + } + /// Spill the in-memory [`UncompressedInMemoryDataBuffer`] to an Apache Parquet file and return /// an [`UncompressedOnDiskDataBuffer`] when finished. pub(super) async fn spill_to_apache_parquet( @@ -215,6 +234,7 @@ impl UncompressedInMemoryDataBuffer { self.updated_by_batch_index, local_data_folder, data_points, + self.batch_ids.clone(), ) .await } @@ -254,6 +274,8 @@ pub(super) struct UncompressedOnDiskDataBuffer { /// Path to the Apache Parquet file containing the uncompressed data in the /// [`UncompressedOnDiskDataBuffer`]. file_path: Path, + /// The ids given to the batches by the WAL. + batch_ids: HashSet, } impl UncompressedOnDiskDataBuffer { @@ -267,6 +289,7 @@ impl UncompressedOnDiskDataBuffer { updated_by_batch_index: u64, local_data_folder: Arc, data_points: RecordBatch, + batch_ids: HashSet, ) -> Result { // Create a path that uses the first timestamp as the filename. let timestamp_index = time_series_table_metadata.timestamp_column_index; @@ -291,28 +314,7 @@ impl UncompressedOnDiskDataBuffer { updated_by_batch_index, local_data_folder, file_path, - }) - } - - /// Return an [`UncompressedOnDiskDataBuffer`] with the data points for `tag_hash` in - /// `file_path` if a file at `file_path` exists, otherwise - /// [`ModelarDbServerError`](crate::error::ModelarDbServerError) is returned. - pub(super) fn try_new( - tag_hash: u64, - time_series_table_metadata: Arc, - updated_by_batch_index: u64, - local_data_folder: Arc, - file_name: &str, - ) -> Result { - let file_path = - spilled_buffer_file_path(&time_series_table_metadata.name, tag_hash, file_name); - - Ok(Self { - tag_hash, - time_series_table_metadata, - updated_by_batch_index, - local_data_folder, - file_path, + batch_ids, }) } @@ -337,6 +339,11 @@ impl UncompressedOnDiskDataBuffer { &self.time_series_table_metadata } + /// Return the ids given to the batches by the WAL. + pub(super) fn batch_ids(&self) -> &HashSet { + &self.batch_ids + } + /// Return [`true`] if all the data points in the [`UncompressedOnDiskDataBuffer`] are from /// [`RecordBatches`](`RecordBatch`) that are [`RECORD_BATCH_OFFSET_REQUIRED_FOR_UNUSED`] older /// than the [`RecordBatch`] with index `current_batch_index` ingested by the current process. @@ -368,6 +375,7 @@ impl UncompressedOnDiskDataBuffer { tag_values, self.time_series_table_metadata.clone(), current_batch_index, + self.batch_ids.clone(), ); for index in 0..data_points.num_rows() { @@ -416,6 +424,7 @@ mod tests { use tokio::runtime::Runtime; const CURRENT_BATCH_INDEX: u64 = 1; + const BATCH_ID: u64 = 0; const TAG_VALUE: &str = "tag"; const TAG_HASH: u64 = 15537859409877038916; @@ -427,6 +436,7 @@ mod tests { vec![TAG_VALUE.to_owned()], table::time_series_table_metadata_arc(), CURRENT_BATCH_INDEX, + HashSet::new(), ); assert_eq!( @@ -456,6 +466,7 @@ mod tests { vec![TAG_VALUE.to_owned()], table::time_series_table_metadata_arc(), CURRENT_BATCH_INDEX, + HashSet::new(), ); assert_eq!(uncompressed_buffer.len(), 0); @@ -468,6 +479,7 @@ mod tests { vec![TAG_VALUE.to_owned()], table::time_series_table_metadata_arc(), CURRENT_BATCH_INDEX, + HashSet::new(), ); insert_data_points(1, &mut uncompressed_buffer); @@ -481,6 +493,7 @@ mod tests { vec![TAG_VALUE.to_owned()], table::time_series_table_metadata_arc(), CURRENT_BATCH_INDEX - 1, + HashSet::new(), ); assert!(!uncompressed_buffer.is_unused(CURRENT_BATCH_INDEX - 1)); @@ -500,6 +513,7 @@ mod tests { vec![TAG_VALUE.to_owned()], table::time_series_table_metadata_arc(), CURRENT_BATCH_INDEX, + HashSet::new(), ); insert_data_points(uncompressed_buffer.capacity(), &mut uncompressed_buffer); @@ -513,6 +527,7 @@ mod tests { vec![TAG_VALUE.to_owned()], table::time_series_table_metadata_arc(), CURRENT_BATCH_INDEX, + HashSet::new(), ); assert!(!uncompressed_buffer.is_full()); @@ -527,6 +542,7 @@ mod tests { vec![TAG_VALUE.to_owned()], table::time_series_table_metadata_arc(), CURRENT_BATCH_INDEX, + HashSet::new(), ); insert_data_points(uncompressed_buffer.capacity() + 1, &mut uncompressed_buffer); @@ -540,6 +556,7 @@ mod tests { vec![TAG_VALUE.to_owned()], time_series_table_metadata.clone(), CURRENT_BATCH_INDEX, + HashSet::new(), ); insert_data_points(uncompressed_buffer.capacity(), &mut uncompressed_buffer); @@ -562,6 +579,7 @@ mod tests { vec![TAG_VALUE.to_owned()], time_series_table_metadata.clone(), CURRENT_BATCH_INDEX, + HashSet::new(), ); // u64 is generated and then cast to i64 to ensure only positive values are generated. @@ -584,6 +602,7 @@ mod tests { vec![TAG_VALUE.to_owned()], table::time_series_table_metadata_arc(), CURRENT_BATCH_INDEX, + HashSet::new(), ); insert_data_points(1, &mut uncompressed_buffer); assert!(!uncompressed_buffer.is_full()); @@ -609,6 +628,7 @@ mod tests { vec![TAG_VALUE.to_owned()], table::time_series_table_metadata_arc(), CURRENT_BATCH_INDEX, + HashSet::new(), ); insert_data_points(uncompressed_buffer.capacity(), &mut uncompressed_buffer); assert!(uncompressed_buffer.is_full()); @@ -660,6 +680,7 @@ mod tests { vec![TAG_VALUE.to_owned()], time_series_table_metadata.clone(), CURRENT_BATCH_INDEX, + HashSet::new(), ); // u64 is generated and then cast to i64 to ensure only positive values are generated. @@ -713,6 +734,7 @@ mod tests { vec![TAG_VALUE.to_owned()], table::time_series_table_metadata_arc(), CURRENT_BATCH_INDEX, + HashSet::new(), ); insert_data_points( @@ -749,6 +771,7 @@ mod tests { vec![TAG_VALUE.to_owned()], table::time_series_table_metadata_arc(), CURRENT_BATCH_INDEX, + HashSet::new(), ); insert_data_points( @@ -774,5 +797,7 @@ mod tests { &mut values.iter().copied(), ); } + + uncompressed_buffer.insert_batch_id(BATCH_ID); } } diff --git a/crates/modelardb_server/src/storage/uncompressed_data_manager.rs b/crates/modelardb_server/src/storage/uncompressed_data_manager.rs index c00800c7..34f078dc 100644 --- a/crates/modelardb_server/src/storage/uncompressed_data_manager.rs +++ b/crates/modelardb_server/src/storage/uncompressed_data_manager.rs @@ -16,6 +16,7 @@ //! Support for managing all uncompressed data that is ingested into the //! [`StorageEngine`](crate::storage::StorageEngine). +use std::collections::HashSet; use std::hash::{DefaultHasher, Hasher}; use std::io::{Error as IOError, ErrorKind as IOErrorKind}; use std::mem; @@ -23,14 +24,13 @@ use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; use dashmap::DashMap; -use futures::StreamExt; +use futures::{StreamExt, TryStreamExt}; use modelardb_storage::data_folder::DataFolder; use modelardb_types::types::{TimeSeriesTableMetadata, Timestamp, Value}; -use object_store::path::{Path, PathPart}; +use object_store::path::Path; use tokio::runtime::Handle; use tracing::{debug, error, warn}; -use crate::context::Context; use crate::error::Result; use crate::storage::UNCOMPRESSED_DATA_FOLDER; use crate::storage::compressed_data_buffer::CompressedSegmentBatch; @@ -68,68 +68,34 @@ pub(super) struct UncompressedDataManager { } impl UncompressedDataManager { - pub(super) fn new( + /// Create a new [`UncompressedDataManager`] and delete all existing spilled buffers if + /// there are any. If the existing buffers could not be deleted, return + /// [`ModelarDbServerError`](crate::error::ModelarDbServerError). + pub(super) async fn try_new( local_data_folder: DataFolder, memory_pool: Arc, channels: Arc, - ) -> Self { - Self { + ) -> Result { + // Delete the previously spilled on disk data buffers if they exist. + let object_store = local_data_folder.object_store(); + let spilled_buffers = object_store + .list(Some(&Path::from(UNCOMPRESSED_DATA_FOLDER))) + .map_ok(|object_meta| object_meta.location) + .boxed(); + + object_store + .delete_stream(spilled_buffers) + .try_collect::>() + .await?; + + Ok(Self { local_data_folder, current_batch_index: AtomicU64::new(0), uncompressed_in_memory_data_buffers: DashMap::new(), uncompressed_on_disk_data_buffers: DashMap::new(), channels, memory_pool, - } - } - - /// Add references to the [`UncompressedDataBuffers`](UncompressedDataBuffer) currently on disk - /// to [`UncompressedDataManager`] which immediately will start compressing them. - pub(super) async fn initialize(&self, context: &Context) -> Result<()> { - let local_data_folder = self.local_data_folder.object_store(); - let mut spilled_buffers = - local_data_folder.list(Some(&Path::from(UNCOMPRESSED_DATA_FOLDER))); - - while let Some(maybe_spilled_buffer) = spilled_buffers.next().await { - let spilled_buffer = maybe_spilled_buffer?; - let path_parts: Vec = spilled_buffer.location.parts().collect(); - - let table_name = path_parts - .get(1) - .expect("The spilled buffers should be partitioned by their table name.") - .as_ref(); - - let tag_hash = path_parts - .get(2) - .expect("The spilled buffers should be partitioned by their tag hash.") - .as_ref() - .parse::() - .unwrap(); - - let file_name = path_parts - .get(3) - .expect("The spilled buffers should have an auto-generated file name.") - .as_ref(); - - let time_series_table_metadata = context - .time_series_table_metadata_from_default_database_schema(table_name) - .await? - .expect("The time series table for the spilled buffer should exist."); - - let buffer = UncompressedOnDiskDataBuffer::try_new( - tag_hash, - time_series_table_metadata, - self.current_batch_index.load(Ordering::Relaxed), - local_data_folder.clone(), - file_name, - )?; - - self.channels - .uncompressed_data_sender - .send(Message::Data(UncompressedDataBuffer::OnDisk(buffer)))?; - } - - Ok(()) + }) } /// Read and process messages received from the [`StorageEngine`](super::StorageEngine) to @@ -201,6 +167,7 @@ impl UncompressedDataManager { &mut values, time_series_table_metadata.clone(), current_batch_index, + ingested_data_buffer.batch_id, ) .await?; } @@ -235,6 +202,7 @@ impl UncompressedDataManager { values: &mut dyn Iterator, time_series_table_metadata: Arc, current_batch_index: u64, + batch_id: u64, ) -> Result { let tag_hash = calculate_tag_hash(&time_series_table_metadata.name, &tag_values); @@ -259,6 +227,9 @@ impl UncompressedDataManager { timestamp, values, ); + + uncompressed_in_memory_data_buffer.insert_batch_id(batch_id); + buffer_is_full = uncompressed_in_memory_data_buffer.is_full(); true } else { @@ -292,6 +263,8 @@ impl UncompressedDataManager { values, ); + uncompressed_in_memory_data_buffer.insert_batch_id(batch_id); + buffer_is_full = uncompressed_in_memory_data_buffer.is_full(); // The read-only reference must be dropped before the map can be modified. @@ -307,6 +280,7 @@ impl UncompressedDataManager { tag_values, time_series_table_metadata, current_batch_index, + HashSet::from([batch_id]), ); debug!( @@ -558,7 +532,7 @@ impl UncompressedDataManager { &self, uncompressed_data_buffer: UncompressedDataBuffer, ) -> Result<()> { - let (memory_use, maybe_data_points, time_series_table_metadata) = + let (memory_use, maybe_data_points, time_series_table_metadata, batch_ids) = match uncompressed_data_buffer { UncompressedDataBuffer::InMemory(mut uncompressed_in_memory_data_buffer) => ( uncompressed_in_memory_data_buffer.memory_size(), @@ -566,6 +540,7 @@ impl UncompressedDataManager { uncompressed_in_memory_data_buffer .time_series_table_metadata() .clone(), + uncompressed_in_memory_data_buffer.batch_ids().clone(), ), UncompressedDataBuffer::OnDisk(uncompressed_on_disk_data_buffer) => ( 0, @@ -573,6 +548,7 @@ impl UncompressedDataManager { uncompressed_on_disk_data_buffer .time_series_table_metadata() .clone(), + uncompressed_on_disk_data_buffer.batch_ids().clone(), ), }; @@ -610,6 +586,7 @@ impl UncompressedDataManager { .send(Message::Data(CompressedSegmentBatch::new( time_series_table_metadata.clone(), compressed_segments, + batch_ids, )))?; // Add the size of the uncompressed buffer back to the remaining reserved bytes. @@ -657,6 +634,7 @@ fn calculate_tag_hash(table_name: &str, tag_values: &[String]) -> u64 { mod tests { use super::*; + use std::collections::HashSet; use std::sync::Arc; use datafusion::arrow::array::StringBuilder; @@ -670,86 +648,22 @@ mod tests { use object_store::local::LocalFileSystem; use tempfile::TempDir; use tokio::runtime::Runtime; - use tokio::time::{Duration, sleep}; use crate::storage::UNCOMPRESSED_DATA_BUFFER_CAPACITY; - use crate::{ClusterMode, DataFolders}; const TAG_VALUE: &str = "tag"; const TAG_HASH: u64 = 14957893031159457585; + const BATCH_ID: u64 = 0; // Tests for UncompressedDataManager. - #[tokio::test] - async fn test_can_compress_existing_on_disk_data_buffers_when_initializing() { - let temp_dir = tempfile::tempdir().unwrap(); - let temp_dir_url = temp_dir.path().to_str().unwrap(); - let local_data_folder = DataFolder::open_local_url(temp_dir_url).await.unwrap(); - - // Create a context with a storage engine. - let context = Arc::new( - Context::try_new( - DataFolders::new(local_data_folder.clone(), None, local_data_folder), - ClusterMode::SingleNode, - ) - .await - .unwrap(), - ); - - // Create a time series table in the context. - let time_series_table_metadata = Arc::new(table::time_series_table_metadata()); - context - .create_time_series_table(&time_series_table_metadata) - .await - .unwrap(); - - // Ingest a single data point and sleep to allow the ingestion thread to finish. - let mut storage_engine = context.storage_engine.write().await; - let data = table::uncompressed_time_series_table_record_batch(1); - - storage_engine - .insert_data_points(time_series_table_metadata, data) - .await - .unwrap(); - - sleep(Duration::from_millis(500)).await; - - storage_engine - .uncompressed_data_manager - .spill_in_memory_data_buffer() - .await - .unwrap(); - - // Compress the spilled buffer and sleep to allow the compression thread to finish. - assert!(storage_engine.initialize(&context).await.is_ok()); - sleep(Duration::from_millis(500)).await; - - // The spilled buffer should be deleted and the content should be compressed. - let spilled_buffers = storage_engine - .uncompressed_data_manager - .local_data_folder - .object_store() - .list(Some(&Path::from(UNCOMPRESSED_DATA_FOLDER))) - .collect::>() - .await; - - assert_eq!(spilled_buffers.len(), 0); - - assert_eq!( - storage_engine - .compressed_data_manager - .compressed_data_buffers - .len(), - 1 - ); - } - #[tokio::test] async fn test_can_insert_record_batch() { let temp_dir = tempfile::tempdir().unwrap(); let (data_manager, time_series_table_metadata) = create_managers(&temp_dir).await; let data = table::uncompressed_time_series_table_record_batch(1); - let ingested_data_buffer = IngestedDataBuffer::new(time_series_table_metadata, data); + let ingested_data_buffer = + IngestedDataBuffer::new(time_series_table_metadata, data, BATCH_ID); data_manager .insert_data_points(ingested_data_buffer) @@ -766,7 +680,8 @@ mod tests { let (data_manager, time_series_table_metadata) = create_managers(&temp_dir).await; let data = table::uncompressed_time_series_table_record_batch(2); - let ingested_data_buffer = IngestedDataBuffer::new(time_series_table_metadata, data); + let ingested_data_buffer = + IngestedDataBuffer::new(time_series_table_metadata, data, BATCH_ID); data_manager .insert_data_points(ingested_data_buffer) @@ -790,7 +705,8 @@ mod tests { .memory_pool .remaining_ingested_memory_in_bytes(); - let ingested_data_buffer = IngestedDataBuffer::new(time_series_table_metadata, data); + let ingested_data_buffer = + IngestedDataBuffer::new(time_series_table_metadata, data, BATCH_ID); data_manager .insert_data_points(ingested_data_buffer) @@ -919,7 +835,7 @@ mod tests { .unwrap(); let ingested_data_buffer = - IngestedDataBuffer::new(time_series_table_metadata.clone(), data); + IngestedDataBuffer::new(time_series_table_metadata.clone(), data, BATCH_ID); data_manager .insert_data_points(ingested_data_buffer) .await @@ -939,7 +855,7 @@ mod tests { // Insert using insert_data_points() to finish unused buffers. let empty_record_batch = RecordBatch::new_empty(time_series_table_metadata.schema.clone()); let ingested_data_buffer = - IngestedDataBuffer::new(time_series_table_metadata, empty_record_batch); + IngestedDataBuffer::new(time_series_table_metadata, empty_record_batch, BATCH_ID); data_manager .insert_data_points(ingested_data_buffer) @@ -1151,6 +1067,7 @@ mod tests { 0, object_store, uncompressed_data, + HashSet::new(), )) .unwrap(); @@ -1267,6 +1184,7 @@ mod tests { &mut values.iter().copied(), time_series_table_metadata.clone(), current_batch_index, + BATCH_ID, ) .await .unwrap(); @@ -1298,7 +1216,9 @@ mod tests { let channels = Arc::new(Channels::new()); let uncompressed_data_manager = - UncompressedDataManager::new(local_data_folder, memory_pool, channels); + UncompressedDataManager::try_new(local_data_folder, memory_pool, channels) + .await + .unwrap(); ( uncompressed_data_manager, diff --git a/crates/modelardb_storage/Cargo.toml b/crates/modelardb_storage/Cargo.toml index 9e027ac4..6f6a20e2 100644 --- a/crates/modelardb_storage/Cargo.toml +++ b/crates/modelardb_storage/Cargo.toml @@ -33,8 +33,10 @@ futures.workspace = true modelardb_compression = { path = "../modelardb_compression" } modelardb_types = { path = "../modelardb_types" } object_store = { workspace = true, features = ["aws", "azure"] } +serde_json.workspace = true sqlparser.workspace = true tonic.workspace = true +tracing.workspace = true url.workspace = true uuid.workspace = true diff --git a/crates/modelardb_storage/src/data_folder/delta_table_writer.rs b/crates/modelardb_storage/src/data_folder/delta_table_writer.rs index 8501c4f7..78395ba6 100644 --- a/crates/modelardb_storage/src/data_folder/delta_table_writer.rs +++ b/crates/modelardb_storage/src/data_folder/delta_table_writer.rs @@ -17,6 +17,7 @@ //! [`RecordBatches`](RecordBatch) to a Delta table stored in an object store. Writing can be //! committed or rolled back to ensure that the Delta table is always in a consistent state. +use std::collections::HashSet; use std::sync::Arc; use arrow::array::RecordBatch; @@ -34,6 +35,7 @@ use deltalake::protocol::{DeltaOperation, SaveMode}; use modelardb_types::schemas::{COMPRESSED_SCHEMA, FIELD_COLUMN}; use object_store::ObjectStore; use object_store::path::Path; +use serde_json::json; use uuid::Uuid; use crate::apache_parquet_writer_properties; @@ -52,6 +54,9 @@ pub struct DeltaTableWriter { operation_id: Uuid, /// Writes record batches to the Delta table as Apache Parquet files. delta_writer: DeltaWriter, + /// Batch ids from the WAL to include in the commit metadata so the uncompressed batches that + /// correspond to the written data can be deleted from the WAL. + batch_ids: HashSet, } impl DeltaTableWriter { @@ -140,9 +145,17 @@ impl DeltaTableWriter { delta_operation, operation_id, delta_writer, + batch_ids: HashSet::new(), }) } + /// Add batch ids from the WAL that are included in the commit metadata so the uncompressed + /// batches that correspond to the written data can be deleted from the WAL. + pub fn with_batch_ids(mut self, batch_ids: HashSet) -> Self { + self.batch_ids = batch_ids; + self + } + /// Write `record_batch` to the Delta table. Returns a [`ModelarDbStorageError`] if the /// [`RecordBatches`](RecordBatch) does not match the schema of the Delta table or if the /// writing fails. @@ -196,7 +209,13 @@ impl DeltaTableWriter { // Prepare all inputs to the commit. let object_store = self.delta_table.object_store(); - let commit_properties = CommitProperties::default(); + + let mut commit_properties = CommitProperties::default(); + if !self.batch_ids.is_empty() { + commit_properties = commit_properties + .with_metadata(vec![("batchIds".to_owned(), json!(self.batch_ids))]); + } + let table_data = match self.delta_table.snapshot() { Ok(table_data) => table_data, Err(delta_table_error) => { diff --git a/crates/modelardb_storage/src/data_folder/mod.rs b/crates/modelardb_storage/src/data_folder/mod.rs index 9dc814e8..a08b5e7c 100644 --- a/crates/modelardb_storage/src/data_folder/mod.rs +++ b/crates/modelardb_storage/src/data_folder/mod.rs @@ -18,7 +18,7 @@ pub mod cluster; pub mod delta_table_writer; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::env; use std::path::Path as StdPath; use std::sync::Arc; @@ -331,6 +331,14 @@ impl DataFolder { &self.session_context } + /// Return the location of the Delta Lake. This is `memory:///modelardb` if the Delta Lake is + /// in memory, the local path if the Delta Lake is stored on disk, `az://container-name` + /// if the Delta Lake is stored in Azure Blob Storage, or `s3://bucket-name` if the Delta Lake + /// is stored in Amazon S3. + pub fn location(&self) -> &str { + &self.location + } + /// Return an [`ObjectStore`] to access the root of the Delta Lake. pub fn object_store(&self) -> Arc { self.object_store.clone() @@ -690,6 +698,28 @@ impl DataFolder { .await } + /// Write `record_batches` to the table with `table_name` in the Delta Lake. The correct + /// writer is selected automatically based on the table type. The `batch_ids` from the WAL are + /// included in the commit metadata, so the uncompressed batches that correspond to + /// `record_batches` can be deleted from the WAL. If the uncompressed batches were not written + /// to the WAL, use [`Self::write_record_batches`] instead. Returns an updated [`DeltaTable`] + /// if the file was written successfully, otherwise returns [`ModelarDbStorageError`]. + pub async fn write_record_batches_with_batch_ids( + &self, + table_name: &str, + record_batches: Vec, + batch_ids: HashSet, + ) -> Result { + let delta_table_writer = self + .table_writer(table_name) + .await? + .with_batch_ids(batch_ids); + + delta_table_writer + .write_all_and_commit(&record_batches) + .await + } + /// Write `columns` to a Delta Lake table with `table_name`. Returns an updated [`DeltaTable`] /// version if the file was written successfully, otherwise returns [`ModelarDbStorageError`]. async fn write_columns_to_metadata_table( diff --git a/crates/modelardb_storage/src/error.rs b/crates/modelardb_storage/src/error.rs index cd39ca8e..b3ef4fa9 100644 --- a/crates/modelardb_storage/src/error.rs +++ b/crates/modelardb_storage/src/error.rs @@ -47,6 +47,8 @@ pub enum ModelarDbStorageError { EnvironmentVar(VarError), /// Error returned when an invalid argument was passed. InvalidArgument(String), + /// Error returned when an invalid state is encountered. + InvalidState(String), /// Error returned from IO operations. Io(IoError), /// Error returned by ObjectStore. @@ -71,6 +73,7 @@ impl Display for ModelarDbStorageError { Self::DeltaLake(reason) => write!(f, "Delta Lake Error: {reason}"), Self::EnvironmentVar(reason) => write!(f, "Environment Variable Error: {reason}"), Self::InvalidArgument(reason) => write!(f, "Invalid Argument Error: {reason}"), + Self::InvalidState(reason) => write!(f, "Invalid State Error: {reason}"), Self::Io(reason) => write!(f, "Io Error: {reason}"), Self::ObjectStore(reason) => write!(f, "Object Store Error: {reason}"), Self::ObjectStorePath(reason) => write!(f, "Object Store Path Error: {reason}"), @@ -91,6 +94,7 @@ impl Error for ModelarDbStorageError { Self::DeltaLake(reason) => Some(reason), Self::EnvironmentVar(reason) => Some(reason), Self::InvalidArgument(_reason) => None, + Self::InvalidState(_reason) => None, Self::Io(reason) => Some(reason), Self::ObjectStore(reason) => Some(reason), Self::ObjectStorePath(reason) => Some(reason), diff --git a/crates/modelardb_storage/src/lib.rs b/crates/modelardb_storage/src/lib.rs index 2c5bebe9..94450506 100644 --- a/crates/modelardb_storage/src/lib.rs +++ b/crates/modelardb_storage/src/lib.rs @@ -21,6 +21,7 @@ pub mod error; mod optimizer; pub mod parser; mod query; +pub mod write_ahead_log; use std::result::Result as StdResult; use std::sync::Arc; @@ -60,6 +61,9 @@ const TABLE_FOLDER: &str = "tables"; /// The folder storing metadata in the data folders. const METADATA_FOLDER: &str = "metadata"; +/// The folder storing the write-ahead log in the data folders. +pub const WRITE_AHEAD_LOG_FOLDER: &str = "wal"; + /// Create a new [`SessionContext`] for interacting with Apache DataFusion. The [`SessionContext`] /// is constructed with the default configuration, default resource managers, and additional /// optimizer rules that rewrite simple aggregate queries to be executed directly on the segments diff --git a/crates/modelardb_storage/src/write_ahead_log.rs b/crates/modelardb_storage/src/write_ahead_log.rs new file mode 100644 index 00000000..78e71471 --- /dev/null +++ b/crates/modelardb_storage/src/write_ahead_log.rs @@ -0,0 +1,1413 @@ +/* Copyright 2026 The ModelarDB Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//! Implementation of types that provide a write-ahead log for ModelarDB that can be used to +//! efficiently persist data on disk to avoid data loss and enable crash recovery. Each table has +//! its own segmented log consisting of an active segment that is appended to and zero or more +//! closed segments that are read-only. The active segment is closed once a configured number of +//! batches have been written to it, and closed segments are deleted once all of their batches have +//! been persisted to the Delta Lake. + +use std::collections::{BTreeSet, HashMap, HashSet}; +use std::fs::{File, OpenOptions}; +use std::path::{Path, PathBuf}; +use std::sync::Mutex; + +use arrow::datatypes::Schema; +use arrow::error::ArrowError::IpcError; +use arrow::ipc::reader::StreamReader; +use arrow::ipc::writer::StreamWriter; +use arrow::record_batch::RecordBatch; +use deltalake::DeltaTable; +use modelardb_types::types::TimeSeriesTableMetadata; +use tracing::{debug, info, warn}; + +use crate::WRITE_AHEAD_LOG_FOLDER; +use crate::data_folder::DataFolder; +use crate::error::{ModelarDbStorageError, Result}; + +/// Number of batches to write to a single WAL segment file before closing it and starting a new one. +const SEGMENT_BATCH_COUNT_THRESHOLD: u64 = 100; + +/// Write-ahead log that logs data on a per-table level. +pub struct WriteAheadLog { + /// Path to the folder that contains the write-ahead log. + folder_path: PathBuf, + /// Logs for each table. The key is the table name, and the value is the table log for that table. + table_logs: HashMap, +} + +impl WriteAheadLog { + /// Create a new [`WriteAheadLog`] that stores the WAL in the root of `local_data_folder` in + /// the [`WRITE_AHEAD_LOG_FOLDER`] folder. `local_data_folder` must be in a local path since the + /// WAL uses the [`std::fs`] API to avoid the overhead of the `ObjectStore` API and to allow the + /// use of [`sync_data()`](File::sync_data). If the folder does not exist, it is created. If + /// `local_data_folder` is not in a local path or the WAL could not be created, return + /// [`ModelarDbStorageError`]. + pub async fn try_new(local_data_folder: &DataFolder) -> Result { + // Create the folder for the write-ahead log if it does not exist. + let location = local_data_folder.location(); + + if location.contains("://") { + return Err(ModelarDbStorageError::InvalidState(format!( + "Write-ahead log location '{location}' is not a local path." + ))); + } + + let log_folder_path = PathBuf::from(location).join(WRITE_AHEAD_LOG_FOLDER); + + std::fs::create_dir_all(&log_folder_path)?; + + let mut write_ahead_log = Self { + folder_path: log_folder_path.clone(), + table_logs: HashMap::new(), + }; + + // For each time series table, create a table log if it does not already exist. + for metadata in local_data_folder.time_series_table_metadata().await? { + let delta_table = local_data_folder.delta_table(&metadata.name).await?; + write_ahead_log.create_table_log(&metadata).await?; + + // Load the persisted batch ids from the commit history of the delta table. This is + // only necessary when initializing the WAL for an existing table. + let table_log = write_ahead_log.table_log(&metadata.name)?; + table_log + .load_persisted_batches_from_delta_table(delta_table) + .await?; + } + + info!( + path = %log_folder_path.display(), + table_count = write_ahead_log.table_logs.len(), + "WAL initialized." + ); + + Ok(write_ahead_log) + } + + /// Create a new segmented log for the table with the given metadata. If a table log already + /// exists in the map or the table log could not be created, return [`ModelarDbStorageError`]. + /// Note that if the table log already exists, but it is not present in the map, the existing + /// table log will be added to the map. + pub async fn create_table_log( + &mut self, + time_series_table_metadata: &TimeSeriesTableMetadata, + ) -> Result<()> { + let table_name = time_series_table_metadata.name.clone(); + + if !self.table_logs.contains_key(&table_name) { + let table_log_path = self.folder_path.join(&table_name); + let table_log = + SegmentedLog::try_new(table_log_path, &time_series_table_metadata.schema)?; + + debug!( + table = %table_name, + folder_path = %table_log.folder_path.display(), + "WAL table log created." + ); + + self.table_logs.insert(table_name, table_log); + + Ok(()) + } else { + Err(ModelarDbStorageError::InvalidState(format!( + "Table log for table '{table_name}' already exists.", + ))) + } + } + + /// Remove the table log for the table with the given name. If the table log does not exist or + /// could not be removed, return [`ModelarDbStorageError`]. + pub fn remove_table_log(&mut self, table_name: &str) -> Result<()> { + let log_path; + + if let Some(table_log) = self.table_logs.remove(table_name) { + log_path = table_log.folder_path; + // table_log is dropped here as it goes out of scope which automatically closes its + // internal file handle. + } else { + return Err(ModelarDbStorageError::InvalidState(format!( + "Table log for table '{table_name}' does not exist.", + ))); + } + + // Now that the file handle is closed, the files can be removed. + std::fs::remove_dir_all(&log_path)?; + + debug!( + table = %table_name, + folder_path = %log_path.display(), + "WAL table log removed." + ); + + Ok(()) + } + + /// Append data to the table log for the given table and sync the file to ensure that all data + /// is on disk. Only requires read access to the write-ahead log since the internal Mutex + /// handles write synchronization. Return the batch id given to the appended data. If a table + /// log does not exist or the data could not be appended, return [`ModelarDbStorageError`]. + pub fn append_to_table_log(&self, table_name: &str, data: &RecordBatch) -> Result { + let table_log = self.table_log(table_name)?; + table_log.append_and_sync(data) + } + + /// Mark the given batch ids as saved to disk in the corresponding table log. Fully persisted + /// segment files are deleted. If a table log does not exist or a segment file could not be + /// deleted, return [`ModelarDbStorageError`]. + pub fn mark_batches_as_persisted_in_table_log( + &self, + table_name: &str, + batch_ids: HashSet, + ) -> Result<()> { + let table_log = self.table_log(table_name)?; + table_log.mark_batches_as_persisted(batch_ids) + } + + /// Return pairs of (batch_id, batch) for all batches in the corresponding table log that have + /// not yet been persisted. If the table log does not exist or the batches could not be read + /// from the table log, return [`ModelarDbStorageError`]. + pub fn unpersisted_batches_in_table_log( + &self, + table_name: &str, + ) -> Result> { + let table_log = self.table_log(table_name)?; + table_log.unpersisted_batches() + } + + /// Get the table log for the table with the given name. If the table log does not exist, return + /// [`ModelarDbStorageError`]. + fn table_log(&self, table_name: &str) -> Result<&SegmentedLog> { + self.table_logs.get(table_name).ok_or_else(|| { + ModelarDbStorageError::InvalidState(format!( + "Table log for table '{table_name}' does not exist." + )) + }) + } +} + +/// A closed WAL segment file. The file contains all batches with ids in `[start_id, end_id]` +/// and will not be written to again. +struct ClosedSegment { + /// Path to the segment file on disk. + path: PathBuf, + /// Batch id of the first batch in this segment. + start_id: u64, + /// Batch id of the last batch in this segment (inclusive). + end_id: u64, +} + +impl ClosedSegment { + /// Return `true` if every batch id in this segment is present in `persisted`. + fn is_fully_persisted(&self, persisted: &BTreeSet) -> bool { + // Iterate in reverse since newer (higher) ids are least likely to be persisted, allowing + // all() to short-circuit earlier for partially persisted segments. + (self.start_id..=self.end_id) + .rev() + .all(|id| persisted.contains(&id)) + } +} + +/// The currently active WAL segment being written to. All fields are mutated together +/// when closing the active segment and are protected by the mutex in [`SegmentedLog`]. +struct ActiveSegment { + /// Path to the active segment file. + path: PathBuf, + /// Batch id of the first batch written to this segment. + start_id: u64, + /// Writer to write data in Apache Arrow IPC streaming format to this segment file. + writer: StreamWriter, + /// The batch id to give to the next batch of data. Monotonically increasing across segments. + next_batch_id: u64, +} + +impl ActiveSegment { + /// Create a new [`ActiveSegment`] in `folder_path` with the given `start_id` and `schema`. + /// If the file could not be created, return [`ModelarDbStorageError`]. + fn try_new(folder_path: PathBuf, schema: &Schema, start_id: u64) -> Result { + let path = folder_path.join(format!("{start_id}-.arrows")); + let file = OpenOptions::new() + .create(true) + .read(true) + .write(true) + .truncate(true) + .open(&path)?; + + let writer = StreamWriter::try_new(file, schema)?; + + debug!( + path = %path.display(), + "WAL file created." + ); + + Ok(Self { + path, + start_id, + writer, + next_batch_id: start_id, + }) + } +} + +/// Segmented log that appends data in Apache Arrow IPC streaming format to segment files in a +/// folder. At any point in time there is exactly one active segment being written to plus zero or +/// more closed segments that are read-only. The active segment is closed once +/// [`SEGMENT_BATCH_COUNT_THRESHOLD`] batches have been written to it. Appending enforces that +/// [`sync_data()`](File::sync_data) is called immediately after writing to ensure that all data is +/// on disk before returning. Note that an exclusive lock is held on the file while it is being +/// written to, to ensure that no other thread can write to it. +struct SegmentedLog { + /// Folder that contains all segment files for this log. + folder_path: PathBuf, + /// Arrow schema shared by every segment in this log. + schema: Schema, + /// The active segment currently being written to. + active_segment: Mutex, + /// Closed, read-only segment files ordered by `start_id`. + closed_segments: Mutex>, + /// Batch ids that have been confirmed as saved to disk. Used to determine when closed segments + /// can be deleted. + persisted_batch_ids: Mutex>, +} + +impl SegmentedLog { + /// Create a new [`SegmentedLog`] that appends data with `schema` to segment files in + /// `folder_path`. Existing closed segment files are appended to the closed-segment list. + /// A fresh active segment is always created on start-up. If the folder or file could not be + /// created, return [`ModelarDbStorageError`]. + fn try_new(folder_path: PathBuf, schema: &Schema) -> Result { + std::fs::create_dir_all(&folder_path)?; + + close_leftover_active_segment(&folder_path)?; + + // Collect all closed segment files already on disk and sort them by start_id. + let mut closed_segments = find_closed_segments(&folder_path)?; + closed_segments.sort_by_key(|s| s.start_id); + + // The next batch id is one past the end of the last closed segment, or 0 if there are none. + let next_id = closed_segments.last().map(|s| s.end_id + 1).unwrap_or(0); + + if !closed_segments.is_empty() { + debug!( + folder_path = %folder_path.display(), + closed_segment_count = closed_segments.len(), + next_batch_id = next_id, + "Found closed WAL segments." + ); + } + + // Always create a fresh active segment on startup to avoid writing into the middle of + // an existing Apache Arrow IPC stream. + let active_file = ActiveSegment::try_new(folder_path.clone(), schema, next_id)?; + + Ok(Self { + folder_path, + schema: schema.clone(), + active_segment: Mutex::new(active_file), + closed_segments: Mutex::new(closed_segments), + persisted_batch_ids: Mutex::new(BTreeSet::new()), + }) + } + + /// Append the given data to the active segment and sync the file to ensure that all data is on + /// disk. Return the batch id given to the appended data. Close the active segment and start a + /// new one if [`SEGMENT_BATCH_COUNT_THRESHOLD`] is reached. If the data could not be appended + /// or the file could not be synced, return [`ModelarDbStorageError`]. + fn append_and_sync(&self, data: &RecordBatch) -> Result { + // Acquire the mutex to ensure only one thread can write at a time. + let mut active = self + .active_segment + .lock() + .expect("Mutex should not be poisoned."); + + active.writer.write(data)?; + + // Flush the writer's internal buffers to the file. + active.writer.flush()?; + + // Get a reference to the underlying file handle and sync to disk. Note that file metadata + // such as modification timestamps and permissions are not updated since we only sync data. + // Only syncing data reduces disk operations and improves performance. + active.writer.get_ref().sync_data()?; + + // Increment the batch id for the next batch of data. + let current_batch_id = active.next_batch_id; + active.next_batch_id += 1; + + debug!( + path = %active.path.display(), + batch_id = current_batch_id, + row_count = data.num_rows(), + "Appended batch to WAL file." + ); + + // Close the active segment and start a new one if the threshold has been reached. The + // number of batches in the active segment is the difference between the next batch id + // (post-increment) and the active start id. + let active_batch_count = active.next_batch_id - active.start_id; + if active_batch_count >= SEGMENT_BATCH_COUNT_THRESHOLD { + self.close_active_segment(&mut active)?; + } + + Ok(current_batch_id) + } + + /// Close the current active segment by renaming it to its final `{start_id}-{end_id}.arrows` + /// name and open a fresh active segment. The end id is added to the file name to avoid having + /// to read the entire file to determine the end id later. The caller must hold the + /// `active_segment` lock. + fn close_active_segment(&self, active: &mut ActiveSegment) -> Result<()> { + let mut closed_segments = self + .closed_segments + .lock() + .expect("Mutex should not be poisoned."); + + let end_id = active.next_batch_id - 1; + + debug!( + path = %active.path.display(), + start_id = active.start_id, + end_id, + "Closing active WAL segment." + ); + + // Finish the current writer so the Apache Arrow IPC end-of-stream marker is written. + active.writer.finish()?; + + // Rename the active file to its permanent name that includes the end id. + let closed_path = self + .folder_path + .join(format!("{}-{end_id}.arrows", active.start_id)); + std::fs::rename(&active.path, &closed_path)?; + + closed_segments.push(ClosedSegment { + path: closed_path, + start_id: active.start_id, + end_id, + }); + + // Open a new active segment. + let next_id = end_id + 1; + *active = ActiveSegment::try_new(self.folder_path.clone(), &self.schema, next_id)?; + + Ok(()) + } + + /// Mark the given batch ids as saved to disk. Any closed segment whose entire batch-id range + /// is now persisted is deleted from disk and removed from the in-memory list. If a segment file + /// could not be deleted, return [`ModelarDbStorageError`]. + fn mark_batches_as_persisted(&self, batch_ids: HashSet) -> Result<()> { + debug!( + folder_path = %self.folder_path.display(), + batch_ids = ?batch_ids, + "Marking batches as persisted." + ); + + let mut persisted = self + .persisted_batch_ids + .lock() + .expect("Mutex should not be poisoned."); + + persisted.extend(batch_ids); + + let mut closed_segments = self + .closed_segments + .lock() + .expect("Mutex should not be poisoned."); + + // Identify and delete fully persisted segments. + let (to_delete, to_retain): (Vec<_>, Vec<_>) = closed_segments + .drain(..) + .partition(|segment| segment.is_fully_persisted(&persisted)); + + *closed_segments = to_retain; + + for segment in to_delete { + debug!( + path = %segment.path.display(), + "Deleting fully persisted WAL segment." + ); + + std::fs::remove_file(&segment.path)?; + + // Remove the persisted ids for this segment as they are no longer needed. + for id in segment.start_id..=segment.end_id { + persisted.remove(&id); + } + } + + Ok(()) + } + + /// Update the in-memory set of persisted batch ids from the commit history of `delta_table` + /// and delete any fully persisted closed segment files. If the commit history could not be + /// read or a segment file could not be deleted, return [`ModelarDbStorageError`]. + async fn load_persisted_batches_from_delta_table(&self, delta_table: DeltaTable) -> Result<()> { + let mut persisted_batch_ids = HashSet::new(); + + let history = delta_table.history(None).await?; + for commit in history.into_iter() { + if let Some(batch_ids) = commit.info.get("batchIds") { + let batch_ids: Vec = serde_json::from_value(batch_ids.clone()).expect( + "The batchIds field in the commit metadata should be a JSON array of u64 values.", + ); + + persisted_batch_ids.extend(batch_ids); + } + } + + debug!( + folder_path = %self.folder_path.display(), + batch_ids = ?persisted_batch_ids, + "Loaded persisted batch ids from Delta table commit history." + ); + + self.mark_batches_as_persisted(persisted_batch_ids) + } + + /// Return pairs of (batch_id, batch) for all batches in the log that have not yet been + /// persisted according to the current in-memory `persisted_batch_ids` set. If the batches + /// could not be read from the segment files, return [`ModelarDbStorageError`]. + fn unpersisted_batches(&self) -> Result> { + let persisted = self + .persisted_batch_ids + .lock() + .expect("Mutex should not be poisoned."); + + Ok(self + .all_batches()? + .into_iter() + .filter(|(batch_id, _)| !persisted.contains(batch_id)) + .collect()) + } + + /// Read all data from all segment files (closed and active) in order and return them as pairs + /// of (batch_id, batch). If any file could not be read, return [`ModelarDbStorageError`]. + fn all_batches(&self) -> Result> { + // Acquire the mutex to ensure data is not being written while reading. + let active = self + .active_segment + .lock() + .expect("Mutex should not be poisoned."); + + let closed_segments = self + .closed_segments + .lock() + .expect("Mutex should not be poisoned."); + + let mut all_batches = Vec::new(); + for segment in closed_segments.iter() { + let batches = read_batches_from_path(&segment.path)?; + all_batches.extend((segment.start_id..=segment.end_id).zip(batches)); + } + + // Append the active segment's batches to the end of the list. + let active_batches = read_batches_from_path(&active.path)?; + if !active_batches.is_empty() { + all_batches.extend((active.start_id..=active.next_batch_id - 1).zip(active_batches)); + } + + debug!( + folder_path = %self.folder_path.display(), + closed_segment_count = closed_segments.len(), + batch_count = all_batches.len(), + "Read all batches from WAL files." + ); + + Ok(all_batches) + } +} + +/// If a leftover active segment (`{start_id}-.arrows`) exists in `folder_path`, rename it to +/// its final `{start_id}-{end_id}.arrows` name so it is picked up as a closed segment. If the +/// file contains no batches, it is removed instead. If the file could not be renamed or +/// removed, return [`ModelarDbStorageError`]. +fn close_leftover_active_segment(folder_path: &Path) -> Result<()> { + let Some(active_path) = std::fs::read_dir(folder_path)? + .filter_map(|maybe_entry| maybe_entry.ok()) + .map(|entry| entry.path()) + .find(|path| { + path.file_stem() + .and_then(|stem| stem.to_str()) + .is_some_and(|stem| stem.ends_with('-')) + }) + else { + return Ok(()); + }; + + let stem = active_path + .file_stem() + .and_then(|stem| stem.to_str()) + .expect("Active WAL segment stem should be '{start_id}-'."); + + let start_id: u64 = stem[..stem.len() - 1] + .parse() + .expect("Active WAL segment stem should start with a valid u64."); + + let batches = read_batches_from_path(&active_path)?; + + if batches.is_empty() { + std::fs::remove_file(&active_path)?; + debug!(path = %active_path.display(), "Removed empty leftover active WAL segment."); + } else { + let end_id = start_id + batches.len() as u64 - 1; + let closed_path = folder_path.join(format!("{start_id}-{end_id}.arrows")); + + warn!( + path = %active_path.display(), + closed_path = %closed_path.display(), + batch_count = batches.len(), + "Closed leftover active WAL segment from unclean shutdown." + ); + + std::fs::rename(&active_path, closed_path)?; + } + + Ok(()) +} + +/// Collect all closed segment files in `folder_path`. Closed segments have names of the form +/// `{start_id}-{end_id}.arrows` where both `start_id` and `end_id` are valid `u64` values. +fn find_closed_segments(folder_path: &Path) -> Result> { + let mut segments = Vec::new(); + + for entry in std::fs::read_dir(folder_path)? { + let path = entry?.path(); + let stem = path + .file_stem() + .and_then(|stem| stem.to_str()) + .expect("WAL file should have a valid UTF-8 stem."); + + if let Some((start_id, end_id)) = stem + .split_once('-') + .and_then(|(s, e)| Some((s.parse::().ok()?, e.parse::().ok()?))) + { + segments.push(ClosedSegment { + path, + start_id, + end_id, + }); + } else { + return Err(ModelarDbStorageError::InvalidState(format!( + "Unexpected file found in WAL folder: {}.", + path.display() + ))); + } + } + + Ok(segments) +} + +/// Read all [`RecordBatches`](RecordBatch) from the file at `path`. Tolerates a missing +/// end-of-stream marker, which is normal for the active segment since [`StreamWriter::finish()`] +/// has not been called yet. If the file could not be read, return [`ModelarDbStorageError`]. +fn read_batches_from_path(path: &Path) -> Result> { + let file = File::open(path)?; + let reader = StreamReader::try_new(file, None)?; + + let mut batches = Vec::new(); + for maybe_batch in reader { + match maybe_batch { + Ok(batch) => batches.push(batch), + Err(IpcError(msg)) => { + if msg.contains("UnexpectedEof") || msg.contains("unexpected end of file") { + break; + } + return Err(IpcError(msg).into()); + } + Err(e) => return Err(e.into()), + } + } + + Ok(batches) +} + +#[cfg(test)] +mod tests { + use super::*; + + use modelardb_test::table; + use modelardb_test::table::TIME_SERIES_TABLE_NAME; + use tempfile::TempDir; + + // Tests for WriteAheadLog. + #[tokio::test] + async fn test_try_new_without_tables_creates_empty_wal() { + let (_temp_dir, wal) = new_empty_write_ahead_log().await; + + assert!(wal.table_logs.is_empty()); + } + + #[tokio::test] + async fn test_try_new_with_existing_table_creates_table_log() { + let (_temp_dir, data_folder) = create_data_folder_with_time_series_table().await; + let wal = WriteAheadLog::try_new(&data_folder).await.unwrap(); + + assert_eq!(wal.table_logs.len(), 1); + assert!(wal.table_logs.contains_key(TIME_SERIES_TABLE_NAME)); + } + + #[tokio::test] + async fn test_try_new_loads_persisted_batch_ids() { + let (_temp_dir, data_folder) = create_data_folder_with_time_series_table().await; + + // Simulate that data was written to the table in a previous process to ensure that the + // WAL can load already persisted batch ids from the commit history. + write_compressed_segments_with_batch_ids(&data_folder, HashSet::from([0, 1, 2])).await; + + let wal = WriteAheadLog::try_new(&data_folder).await.unwrap(); + + let persisted = wal.table_logs[TIME_SERIES_TABLE_NAME] + .persisted_batch_ids + .lock() + .unwrap(); + + assert_eq!(*persisted, BTreeSet::from([0, 1, 2])); + } + + #[tokio::test] + async fn test_try_new_fails_for_non_local_data_folder() { + let data_folder = DataFolder::open_memory().await.unwrap(); + let result = WriteAheadLog::try_new(&data_folder).await; + + assert_eq!( + result.err().unwrap().to_string(), + "Invalid State Error: Write-ahead log location 'memory:///modelardb' is not a local path." + ); + } + + #[tokio::test] + async fn test_create_table_log_adds_log_for_table() { + let (_temp_dir, mut wal) = new_empty_write_ahead_log().await; + let metadata = table::time_series_table_metadata(); + + assert!(wal.table_logs.is_empty()); + + wal.create_table_log(&metadata).await.unwrap(); + + assert!(wal.table_logs.contains_key(TIME_SERIES_TABLE_NAME)); + } + + #[tokio::test] + async fn test_create_table_log_fails_if_table_log_already_exists() { + let (_temp_dir, mut wal) = new_empty_write_ahead_log().await; + let metadata = table::time_series_table_metadata(); + + wal.create_table_log(&metadata).await.unwrap(); + let result = wal.create_table_log(&metadata).await; + + assert_eq!( + result.err().unwrap().to_string(), + format!( + "Invalid State Error: Table log for table '{TIME_SERIES_TABLE_NAME}' already exists.", + ) + ); + } + + #[tokio::test] + async fn test_remove_table_log_removes_log_and_directory() { + let (_temp_dir, data_folder) = create_data_folder_with_time_series_table().await; + let mut wal = WriteAheadLog::try_new(&data_folder).await.unwrap(); + + let log_path = wal.table_logs[TIME_SERIES_TABLE_NAME].folder_path.clone(); + assert!(log_path.exists()); + assert!(wal.table_logs.contains_key(TIME_SERIES_TABLE_NAME)); + + wal.remove_table_log(TIME_SERIES_TABLE_NAME).unwrap(); + + assert!(!wal.table_logs.contains_key(TIME_SERIES_TABLE_NAME)); + assert!(!log_path.exists()); + } + + #[tokio::test] + async fn test_remove_and_recreate_table_log_resets_batch_ids() { + let (_temp_dir, mut wal) = new_empty_write_ahead_log().await; + + let metadata = table::time_series_table_metadata(); + let batch = table::uncompressed_time_series_table_record_batch(5); + + wal.create_table_log(&metadata).await.unwrap(); + + wal.append_to_table_log(TIME_SERIES_TABLE_NAME, &batch) + .unwrap(); + wal.append_to_table_log(TIME_SERIES_TABLE_NAME, &batch) + .unwrap(); + + wal.remove_table_log(TIME_SERIES_TABLE_NAME).unwrap(); + wal.create_table_log(&metadata).await.unwrap(); + + assert_eq!( + wal.append_to_table_log(TIME_SERIES_TABLE_NAME, &batch) + .unwrap(), + 0 + ); + } + + #[tokio::test] + async fn test_remove_table_log_fails_if_table_log_does_not_exist() { + let (_temp_dir, mut wal) = new_empty_write_ahead_log().await; + + let result = wal.remove_table_log(TIME_SERIES_TABLE_NAME); + + assert_eq!( + result.err().unwrap().to_string(), + format!( + "Invalid State Error: Table log for table '{TIME_SERIES_TABLE_NAME}' does not exist.", + ) + ); + } + + #[tokio::test] + async fn test_append_to_table_log_returns_incrementing_batch_ids() { + let (_temp_dir, data_folder) = create_data_folder_with_time_series_table().await; + let wal = WriteAheadLog::try_new(&data_folder).await.unwrap(); + + let batch = table::uncompressed_time_series_table_record_batch(5); + + assert_eq!( + wal.append_to_table_log(TIME_SERIES_TABLE_NAME, &batch) + .unwrap(), + 0 + ); + assert_eq!( + wal.append_to_table_log(TIME_SERIES_TABLE_NAME, &batch) + .unwrap(), + 1 + ); + assert_eq!( + wal.append_to_table_log(TIME_SERIES_TABLE_NAME, &batch) + .unwrap(), + 2 + ); + } + + #[tokio::test] + async fn test_append_to_table_log_fails_if_table_log_does_not_exist() { + let (_temp_dir, wal) = new_empty_write_ahead_log().await; + + let batch = table::uncompressed_time_series_table_record_batch(5); + let result = wal.append_to_table_log(TIME_SERIES_TABLE_NAME, &batch); + + assert_eq!( + result.err().unwrap().to_string(), + format!( + "Invalid State Error: Table log for table '{TIME_SERIES_TABLE_NAME}' does not exist.", + ) + ); + } + + #[tokio::test] + async fn test_mark_batches_as_persisted_in_table_log_removes_from_unpersisted() { + let (_temp_dir, data_folder) = create_data_folder_with_time_series_table().await; + let wal = WriteAheadLog::try_new(&data_folder).await.unwrap(); + + let batch = table::uncompressed_time_series_table_record_batch(5); + wal.append_to_table_log(TIME_SERIES_TABLE_NAME, &batch) + .unwrap(); + wal.append_to_table_log(TIME_SERIES_TABLE_NAME, &batch) + .unwrap(); + + let unpersisted = wal + .unpersisted_batches_in_table_log(TIME_SERIES_TABLE_NAME) + .unwrap(); + assert_eq!(unpersisted.len(), 2); + + wal.mark_batches_as_persisted_in_table_log(TIME_SERIES_TABLE_NAME, HashSet::from([0, 1])) + .unwrap(); + + let unpersisted = wal + .unpersisted_batches_in_table_log(TIME_SERIES_TABLE_NAME) + .unwrap(); + + assert!(unpersisted.is_empty()); + } + + #[tokio::test] + async fn test_mark_batches_as_persisted_in_table_log_fails_if_table_log_does_not_exist() { + let (_temp_dir, wal) = new_empty_write_ahead_log().await; + + let result = + wal.mark_batches_as_persisted_in_table_log(TIME_SERIES_TABLE_NAME, HashSet::new()); + + assert_eq!( + result.err().unwrap().to_string(), + format!( + "Invalid State Error: Table log for table '{TIME_SERIES_TABLE_NAME}' does not exist.", + ) + ); + } + + #[tokio::test] + async fn test_unpersisted_batches_in_table_log_returns_all_when_none_persisted() { + let (_temp_dir, data_folder) = create_data_folder_with_time_series_table().await; + let wal = WriteAheadLog::try_new(&data_folder).await.unwrap(); + + let batch = table::uncompressed_time_series_table_record_batch(5); + wal.append_to_table_log(TIME_SERIES_TABLE_NAME, &batch) + .unwrap(); + wal.append_to_table_log(TIME_SERIES_TABLE_NAME, &batch) + .unwrap(); + + let unpersisted = wal + .unpersisted_batches_in_table_log(TIME_SERIES_TABLE_NAME) + .unwrap(); + + assert_eq!(unpersisted.len(), 2); + } + + #[tokio::test] + async fn test_unpersisted_batches_in_table_log_fails_if_table_log_does_not_exist() { + let (_temp_dir, wal) = new_empty_write_ahead_log().await; + + let result = wal.unpersisted_batches_in_table_log(TIME_SERIES_TABLE_NAME); + + assert_eq!( + result.err().unwrap().to_string(), + format!( + "Invalid State Error: Table log for table '{TIME_SERIES_TABLE_NAME}' does not exist.", + ) + ); + } + + async fn new_empty_write_ahead_log() -> (TempDir, WriteAheadLog) { + let temp_dir = tempfile::tempdir().unwrap(); + let data_folder = DataFolder::open_local(temp_dir.path()).await.unwrap(); + let wal = WriteAheadLog::try_new(&data_folder).await.unwrap(); + + (temp_dir, wal) + } + + // Tests for SegmentedLog. + #[test] + fn test_try_new_creates_active_segment() { + let temp_dir = tempfile::tempdir().unwrap(); + let (_folder_path, segmented_log) = new_segmented_log(&temp_dir); + + let active = segmented_log.active_segment.lock().unwrap(); + assert!(active.path.exists()); + assert_eq!(active.next_batch_id, 0); + + assert!(segmented_log.closed_segments.lock().unwrap().is_empty()); + } + + #[test] + fn test_read_all_empty_file() { + let temp_dir = tempfile::tempdir().unwrap(); + let (_folder_path, segmented_log) = new_segmented_log(&temp_dir); + + let batches = segmented_log.all_batches().unwrap(); + assert!(batches.is_empty()); + + let active = segmented_log.active_segment.lock().unwrap(); + assert_eq!(active.next_batch_id, 0); + } + + #[test] + fn test_append_and_read_single_batch() { + let temp_dir = tempfile::tempdir().unwrap(); + let (_folder_path, segmented_log) = new_segmented_log(&temp_dir); + + let batch = table::uncompressed_time_series_table_record_batch(5); + segmented_log.append_and_sync(&batch).unwrap(); + + let batches = segmented_log.all_batches().unwrap(); + assert_eq!(batches.len(), 1); + assert_eq!(batches[0], (0, batch)); + + let active = segmented_log.active_segment.lock().unwrap(); + assert_eq!(active.next_batch_id, 1); + } + + #[test] + fn test_append_and_read_multiple_batches() { + let temp_dir = tempfile::tempdir().unwrap(); + let (_folder_path, segmented_log) = new_segmented_log(&temp_dir); + + let batch_1 = table::uncompressed_time_series_table_record_batch(10); + let batch_2 = table::uncompressed_time_series_table_record_batch(20); + let batch_3 = table::uncompressed_time_series_table_record_batch(30); + + segmented_log.append_and_sync(&batch_1).unwrap(); + segmented_log.append_and_sync(&batch_2).unwrap(); + segmented_log.append_and_sync(&batch_3).unwrap(); + + let batches = segmented_log.all_batches().unwrap(); + assert_eq!(batches.len(), 3); + assert_eq!(batches[0], (0, batch_1)); + assert_eq!(batches[1], (1, batch_2)); + assert_eq!(batches[2], (2, batch_3)); + + let active = segmented_log.active_segment.lock().unwrap(); + assert_eq!(active.next_batch_id, 3); + } + + #[test] + fn test_segment_closes_at_threshold() { + let temp_dir = tempfile::tempdir().unwrap(); + let (_folder_path, segmented_log) = new_segmented_log(&temp_dir); + + let batch = table::uncompressed_time_series_table_record_batch(5); + + for _ in 0..SEGMENT_BATCH_COUNT_THRESHOLD { + segmented_log.append_and_sync(&batch).unwrap(); + } + + let closed = segmented_log.closed_segments.lock().unwrap(); + assert_eq!(closed.len(), 1); + assert_eq!(closed[0].start_id, 0); + assert_eq!(closed[0].end_id, SEGMENT_BATCH_COUNT_THRESHOLD - 1); + + let active = segmented_log.active_segment.lock().unwrap(); + assert_eq!(active.start_id, SEGMENT_BATCH_COUNT_THRESHOLD); + } + + #[test] + fn test_reopen_loads_closed_segments() { + let temp_dir = tempfile::tempdir().unwrap(); + let folder_path = temp_dir.path().join(TIME_SERIES_TABLE_NAME); + let metadata = table::time_series_table_metadata(); + + let batch = table::uncompressed_time_series_table_record_batch(10); + + // Write enough batches to close the active segment, then drop. + { + let segmented_log = + SegmentedLog::try_new(folder_path.clone(), &metadata.schema).unwrap(); + for _ in 0..SEGMENT_BATCH_COUNT_THRESHOLD { + segmented_log.append_and_sync(&batch).unwrap(); + } + } + + // The closed segment should be detected and the next id should continue. + let segmented_log = SegmentedLog::try_new(folder_path.clone(), &metadata.schema).unwrap(); + + let active = segmented_log.active_segment.lock().unwrap(); + assert_eq!(active.next_batch_id, SEGMENT_BATCH_COUNT_THRESHOLD); + assert_eq!(segmented_log.closed_segments.lock().unwrap().len(), 1); + } + + #[test] + fn test_reopen_and_append_continues_batch_ids() { + let temp_dir = tempfile::tempdir().unwrap(); + let folder_path = temp_dir.path().join(TIME_SERIES_TABLE_NAME); + let metadata = table::time_series_table_metadata(); + + let batch = table::uncompressed_time_series_table_record_batch(10); + + // Write enough batches to close the active segment, then drop. + { + let segmented_log = + SegmentedLog::try_new(folder_path.clone(), &metadata.schema).unwrap(); + for _ in 0..SEGMENT_BATCH_COUNT_THRESHOLD { + segmented_log.append_and_sync(&batch).unwrap(); + } + } + + let segmented_log = SegmentedLog::try_new(folder_path.clone(), &metadata.schema).unwrap(); + segmented_log.append_and_sync(&batch).unwrap(); + + let batches = segmented_log.all_batches().unwrap(); + assert_eq!(batches.len() as u64, SEGMENT_BATCH_COUNT_THRESHOLD + 1); + + let active = segmented_log.active_segment.lock().unwrap(); + assert_eq!(active.next_batch_id, SEGMENT_BATCH_COUNT_THRESHOLD + 1); + } + + #[test] + fn test_close_leftover_active_segment_on_reopen() { + let temp_dir = tempfile::tempdir().unwrap(); + let folder_path = temp_dir.path().join(TIME_SERIES_TABLE_NAME); + let metadata = table::time_series_table_metadata(); + + let batch = table::uncompressed_time_series_table_record_batch(5); + + // Write enough batches to close the active segment and append to a new active segment, + // then drop. + { + let segmented_log = + SegmentedLog::try_new(folder_path.clone(), &metadata.schema).unwrap(); + + for _ in 0..SEGMENT_BATCH_COUNT_THRESHOLD + 2 { + segmented_log.append_and_sync(&batch).unwrap(); + } + } + + // On re-open the leftover active segment should be closed, leaving two closed segments + // and a fresh active segment starting after them. + let segmented_log = SegmentedLog::try_new(folder_path.clone(), &metadata.schema).unwrap(); + + let closed = segmented_log.closed_segments.lock().unwrap(); + assert_eq!(closed.len(), 2); + assert_eq!(closed[0].start_id, 0); + assert_eq!(closed[0].end_id, SEGMENT_BATCH_COUNT_THRESHOLD - 1); + assert_eq!(closed[1].start_id, SEGMENT_BATCH_COUNT_THRESHOLD); + assert_eq!(closed[1].end_id, SEGMENT_BATCH_COUNT_THRESHOLD + 1); + + let active = segmented_log.active_segment.lock().unwrap(); + assert_eq!(active.next_batch_id, SEGMENT_BATCH_COUNT_THRESHOLD + 2); + } + + #[test] + fn test_delete_leftover_empty_active_segment_on_reopen() { + let temp_dir = tempfile::tempdir().unwrap(); + let folder_path = temp_dir.path().join(TIME_SERIES_TABLE_NAME); + let metadata = table::time_series_table_metadata(); + + // Create a segmented log and immediately drop it without writing anything. + // This leaves an empty "{start_id}-.arrows" active segment. + { + SegmentedLog::try_new(folder_path.clone(), &metadata.schema).unwrap(); + } + + // On re-open, the empty leftover active segment should be removed. + let segmented_log = SegmentedLog::try_new(folder_path.clone(), &metadata.schema).unwrap(); + + assert!(segmented_log.closed_segments.lock().unwrap().is_empty()); + let active = segmented_log.active_segment.lock().unwrap(); + assert_eq!(active.next_batch_id, 0); + assert!(active.path.exists()); + + // Only the new active segment file should exist. + let file_count = std::fs::read_dir(&folder_path).unwrap().count(); + assert_eq!(file_count, 1); + } + + #[test] + fn test_mark_batches_as_persisted_deletes_fully_persisted_segment() { + let temp_dir = tempfile::tempdir().unwrap(); + let (_folder_path, segmented_log) = new_segmented_log(&temp_dir); + + let batch = table::uncompressed_time_series_table_record_batch(5); + + // Fill and close one full segment. + for _ in 0..SEGMENT_BATCH_COUNT_THRESHOLD { + segmented_log.append_and_sync(&batch).unwrap(); + } + + let segment_path = segmented_log.closed_segments.lock().unwrap()[0] + .path + .clone(); + assert!(segment_path.exists()); + + let ids: HashSet = (0..SEGMENT_BATCH_COUNT_THRESHOLD).collect(); + segmented_log.mark_batches_as_persisted(ids).unwrap(); + + assert!(!segment_path.exists()); + assert!(segmented_log.closed_segments.lock().unwrap().is_empty()); + } + + #[test] + fn test_mark_batches_as_persisted_retains_partially_persisted_segment() { + let temp_dir = tempfile::tempdir().unwrap(); + let (_folder_path, segmented_log) = new_segmented_log(&temp_dir); + + let batch = table::uncompressed_time_series_table_record_batch(5); + + for _ in 0..SEGMENT_BATCH_COUNT_THRESHOLD { + segmented_log.append_and_sync(&batch).unwrap(); + } + + let segment_path = segmented_log.closed_segments.lock().unwrap()[0] + .path + .clone(); + + // Only persist a subset of the batch ids in the closed segment. + let partial_ids: HashSet = (0..SEGMENT_BATCH_COUNT_THRESHOLD - 1).collect(); + segmented_log + .mark_batches_as_persisted(partial_ids) + .unwrap(); + + // Segment should still exist since not all ids are persisted. + assert!(segment_path.exists()); + assert_eq!(segmented_log.closed_segments.lock().unwrap().len(), 1); + + // When persisting the last batch, the segment should be deleted. + segmented_log + .mark_batches_as_persisted(HashSet::from([SEGMENT_BATCH_COUNT_THRESHOLD - 1])) + .unwrap(); + + assert!(!segment_path.exists()); + assert!(segmented_log.closed_segments.lock().unwrap().is_empty()); + } + + #[test] + fn test_multiple_fully_persisted_segments_all_deleted() { + let temp_dir = tempfile::tempdir().unwrap(); + let (_folder_path, segmented_log) = new_segmented_log(&temp_dir); + + let batch = table::uncompressed_time_series_table_record_batch(5); + + // Close five full segments. + for _ in 0..SEGMENT_BATCH_COUNT_THRESHOLD * 5 { + segmented_log.append_and_sync(&batch).unwrap(); + } + + assert_eq!(segmented_log.closed_segments.lock().unwrap().len(), 5); + + let ids: HashSet = (0..SEGMENT_BATCH_COUNT_THRESHOLD * 5).collect(); + segmented_log.mark_batches_as_persisted(ids).unwrap(); + + assert!(segmented_log.closed_segments.lock().unwrap().is_empty()); + assert!(segmented_log.persisted_batch_ids.lock().unwrap().is_empty()); + } + + #[tokio::test] + async fn test_no_batch_ids_in_history_leaves_persisted_set_empty() { + let (temp_dir, data_folder) = create_data_folder_with_time_series_table().await; + let (_wal_dir, segmented_log) = new_segmented_log(&temp_dir); + + let delta_table = data_folder + .delta_table(TIME_SERIES_TABLE_NAME) + .await + .unwrap(); + + segmented_log + .load_persisted_batches_from_delta_table(delta_table) + .await + .unwrap(); + + assert!(segmented_log.persisted_batch_ids.lock().unwrap().is_empty()); + } + + #[tokio::test] + async fn test_load_persisted_batches_loads_single_commit() { + let (temp_dir, data_folder) = create_data_folder_with_time_series_table().await; + let (_wal_dir, segmented_log) = new_segmented_log(&temp_dir); + + let delta_table = + write_compressed_segments_with_batch_ids(&data_folder, HashSet::from([0, 1, 2])).await; + + segmented_log + .load_persisted_batches_from_delta_table(delta_table) + .await + .unwrap(); + + let persisted = segmented_log.persisted_batch_ids.lock().unwrap(); + assert_eq!(*persisted, BTreeSet::from([0, 1, 2])); + } + + #[tokio::test] + async fn test_load_persisted_batches_loads_multiple_commits() { + let (temp_dir, data_folder) = create_data_folder_with_time_series_table().await; + let (_wal_dir, segmented_log) = new_segmented_log(&temp_dir); + + write_compressed_segments_with_batch_ids(&data_folder, HashSet::from([0, 1, 2])).await; + let delta_table = + write_compressed_segments_with_batch_ids(&data_folder, HashSet::from([3, 4, 5])).await; + + segmented_log + .load_persisted_batches_from_delta_table(delta_table) + .await + .unwrap(); + + let persisted = segmented_log.persisted_batch_ids.lock().unwrap(); + assert_eq!(*persisted, BTreeSet::from([0, 1, 2, 3, 4, 5])); + } + + #[tokio::test] + async fn test_load_persisted_batches_deletes_fully_persisted_closed_segment() { + let (temp_dir, data_folder) = create_data_folder_with_time_series_table().await; + let (_wal_dir, segmented_log) = new_segmented_log(&temp_dir); + + let batch = table::uncompressed_time_series_table_record_batch(5); + for _ in 0..SEGMENT_BATCH_COUNT_THRESHOLD { + segmented_log.append_and_sync(&batch).unwrap(); + } + + let segment_path = segmented_log.closed_segments.lock().unwrap()[0] + .path + .clone(); + assert!(segment_path.exists()); + + let all_ids: HashSet = (0..SEGMENT_BATCH_COUNT_THRESHOLD).collect(); + let delta_table = write_compressed_segments_with_batch_ids(&data_folder, all_ids).await; + + segmented_log + .load_persisted_batches_from_delta_table(delta_table) + .await + .unwrap(); + + assert!(!segment_path.exists()); + assert!(segmented_log.closed_segments.lock().unwrap().is_empty()); + assert!(segmented_log.persisted_batch_ids.lock().unwrap().is_empty()); + } + + #[tokio::test] + async fn test_load_persisted_batches_retains_partially_persisted_closed_segment() { + let (temp_dir, data_folder) = create_data_folder_with_time_series_table().await; + let (_wal_dir, segmented_log) = new_segmented_log(&temp_dir); + + let batch = table::uncompressed_time_series_table_record_batch(5); + for _ in 0..SEGMENT_BATCH_COUNT_THRESHOLD { + segmented_log.append_and_sync(&batch).unwrap(); + } + + let segment_path = segmented_log.closed_segments.lock().unwrap()[0] + .path + .clone(); + + let partial_ids: HashSet = (0..SEGMENT_BATCH_COUNT_THRESHOLD - 1).collect(); + let delta_table = write_compressed_segments_with_batch_ids(&data_folder, partial_ids).await; + + segmented_log + .load_persisted_batches_from_delta_table(delta_table) + .await + .unwrap(); + + assert!(segment_path.exists()); + assert_eq!(segmented_log.closed_segments.lock().unwrap().len(), 1); + assert_eq!( + segmented_log.persisted_batch_ids.lock().unwrap().len() as u64, + SEGMENT_BATCH_COUNT_THRESHOLD - 1 + ); + } + + async fn create_data_folder_with_time_series_table() -> (TempDir, DataFolder) { + let temp_dir = tempfile::tempdir().unwrap(); + let data_folder = DataFolder::open_local(temp_dir.path()).await.unwrap(); + let metadata = table::time_series_table_metadata(); + + data_folder + .create_time_series_table(&metadata) + .await + .unwrap(); + + (temp_dir, data_folder) + } + + async fn write_compressed_segments_with_batch_ids( + data_folder: &DataFolder, + batch_ids: HashSet, + ) -> DeltaTable { + let compressed_segments = table::compressed_segments_record_batch(); + + data_folder + .write_record_batches_with_batch_ids( + TIME_SERIES_TABLE_NAME, + vec![compressed_segments], + batch_ids, + ) + .await + .unwrap() + } + + #[test] + fn test_unpersisted_batches_returns_empty_when_no_batches_written() { + let temp_dir = tempfile::tempdir().unwrap(); + let (_folder_path, segmented_log) = new_segmented_log(&temp_dir); + + let unpersisted = segmented_log.unpersisted_batches().unwrap(); + assert!(unpersisted.is_empty()); + } + + #[test] + fn test_unpersisted_batches_returns_all_when_none_persisted() { + let temp_dir = tempfile::tempdir().unwrap(); + let (_folder_path, segmented_log) = new_segmented_log(&temp_dir); + + let batch_1 = table::uncompressed_time_series_table_record_batch(10); + let batch_2 = table::uncompressed_time_series_table_record_batch(20); + segmented_log.append_and_sync(&batch_1).unwrap(); + segmented_log.append_and_sync(&batch_2).unwrap(); + + let unpersisted = segmented_log.unpersisted_batches().unwrap(); + assert_eq!(unpersisted.len(), 2); + assert_eq!(unpersisted[0], (0, batch_1)); + assert_eq!(unpersisted[1], (1, batch_2)); + } + + #[test] + fn test_unpersisted_batches_returns_empty_when_all_persisted() { + let temp_dir = tempfile::tempdir().unwrap(); + let (_folder_path, segmented_log) = new_segmented_log(&temp_dir); + + let batch = table::uncompressed_time_series_table_record_batch(10); + segmented_log.append_and_sync(&batch).unwrap(); + segmented_log.append_and_sync(&batch).unwrap(); + + let unpersisted = segmented_log.unpersisted_batches().unwrap(); + assert_eq!(unpersisted.len(), 2); + + segmented_log + .mark_batches_as_persisted(HashSet::from([0, 1])) + .unwrap(); + + let unpersisted = segmented_log.unpersisted_batches().unwrap(); + assert!(unpersisted.is_empty()); + } + + #[test] + fn test_unpersisted_batches_filters_persisted_batch_ids() { + let temp_dir = tempfile::tempdir().unwrap(); + let (_folder_path, segmented_log) = new_segmented_log(&temp_dir); + + let batch_1 = table::uncompressed_time_series_table_record_batch(10); + let batch_2 = table::uncompressed_time_series_table_record_batch(20); + let batch_3 = table::uncompressed_time_series_table_record_batch(30); + segmented_log.append_and_sync(&batch_1).unwrap(); + segmented_log.append_and_sync(&batch_2).unwrap(); + segmented_log.append_and_sync(&batch_3).unwrap(); + + let unpersisted = segmented_log.unpersisted_batches().unwrap(); + assert_eq!(unpersisted.len(), 3); + assert_eq!(unpersisted[1], (1, batch_2)); + + segmented_log + .mark_batches_as_persisted(HashSet::from([1])) + .unwrap(); + + let unpersisted = segmented_log.unpersisted_batches().unwrap(); + assert_eq!(unpersisted.len(), 2); + assert_eq!(unpersisted[0], (0, batch_1)); + assert_eq!(unpersisted[1], (2, batch_3)); + } + + #[test] + fn test_unpersisted_batches_returns_batches_across_closed_and_active_segments() { + let temp_dir = tempfile::tempdir().unwrap(); + let (_folder_path, segmented_log) = new_segmented_log(&temp_dir); + + let batch = table::uncompressed_time_series_table_record_batch(5); + + // Fill one full segment and write two more into the active segment. + for _ in 0..SEGMENT_BATCH_COUNT_THRESHOLD + 2 { + segmented_log.append_and_sync(&batch).unwrap(); + } + + // Persist one batch id in the closed segment and one in the active segment. + segmented_log + .mark_batches_as_persisted(HashSet::from([0, SEGMENT_BATCH_COUNT_THRESHOLD + 1])) + .unwrap(); + + assert_eq!(segmented_log.closed_segments.lock().unwrap().len(), 1); + + let unpersisted = segmented_log.unpersisted_batches().unwrap(); + assert_eq!(unpersisted.len() as u64, SEGMENT_BATCH_COUNT_THRESHOLD); + assert_eq!(unpersisted.first().unwrap(), &(1, batch.clone())); + assert_eq!( + unpersisted.last().unwrap(), + &(SEGMENT_BATCH_COUNT_THRESHOLD, batch) + ); + } + + fn new_segmented_log(temp_dir: &TempDir) -> (PathBuf, SegmentedLog) { + let folder_path = temp_dir.path().join(TIME_SERIES_TABLE_NAME); + let metadata = table::time_series_table_metadata(); + + let segmented_log = SegmentedLog::try_new(folder_path.clone(), &metadata.schema).unwrap(); + + (folder_path, segmented_log) + } +}