Skip to content

Commit 9a1a8c0

Browse files
authored
Update with deadlock fix (#29)
1 parent 12d37c8 commit 9a1a8c0

File tree

8 files changed

+89
-9
lines changed

8 files changed

+89
-9
lines changed

iceberg_rust_ffi/Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

iceberg_rust_ffi/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "iceberg_rust_ffi"
3-
version = "0.5.0"
3+
version = "0.6.0"
44
edition = "2021"
55

66
[lib]
@@ -12,7 +12,7 @@ default = ["julia"]
1212
julia = []
1313

1414
[dependencies]
15-
iceberg = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "f2e1fa29c983244d607d5b61e789e810b291f810" }
15+
iceberg = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "0589677785239560695742a18b6aa9afc1afa86b" }
1616
object_store_ffi = { git = "https://github.com/RelationalAI/object_store_ffi", rev = "79b08071c7a1642532b5891253280861eca9e44e", default-features = false }
1717
tokio = { version = "1.0", features = ["full"] }
1818
futures = "0.3"

iceberg_rust_ffi/src/full.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,13 @@ impl_scan_builder_method!(
4343
n: usize
4444
);
4545

46+
impl_scan_builder_method!(
47+
iceberg_scan_with_manifest_file_concurrency_limit,
48+
IcebergScan,
49+
with_manifest_file_concurrency_limit,
50+
n: usize
51+
);
52+
4653
impl_scan_builder_method!(
4754
iceberg_scan_with_manifest_entry_concurrency_limit,
4855
IcebergScan,

iceberg_rust_ffi/src/incremental.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,17 +105,24 @@ pub extern "C" fn iceberg_new_incremental_scan(
105105
// Use macros from scan_common for shared functionality
106106
impl_select_columns!(iceberg_incremental_select_columns, IcebergIncrementalScan);
107107

108+
impl_scan_builder_method!(
109+
iceberg_incremental_scan_with_manifest_file_concurrency_limit,
110+
IcebergIncrementalScan,
111+
with_manifest_file_concurrency_limit,
112+
n: usize
113+
);
114+
108115
impl_scan_builder_method!(
109116
iceberg_incremental_scan_with_data_file_concurrency_limit,
110117
IcebergIncrementalScan,
111-
with_concurrency_limit_data_files,
118+
with_data_file_concurrency_limit,
112119
n: usize
113120
);
114121

115122
impl_scan_builder_method!(
116123
iceberg_incremental_scan_with_manifest_entry_concurrency_limit,
117124
IcebergIncrementalScan,
118-
with_concurrency_limit_manifest_entries,
125+
with_manifest_entry_concurrency_limit,
119126
n: usize
120127
);
121128

src/RustyIceberg.jl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@ export FILE_COLUMN, POS_COLUMN
2121
const rust_lib = iceberg_rust_ffi_jll.libiceberg_rust_ffi
2222

2323
"""
24+
struct StaticConfig
25+
2426
Runtime configuration for the Iceberg library.
27+
An `n_threads` value of 0 means all CPU cores are used, regardless of the number of threads in the Julia runtime.
2528
"""
2629
struct StaticConfig
2730
n_threads::Culonglong
@@ -89,7 +92,7 @@ This starts a `tokio` runtime for handling Iceberg requests.
8992
It must be called before sending a request.
9093
"""
9194
function init_runtime(
92-
config::StaticConfig=StaticConfig(0);
95+
config::StaticConfig=StaticConfig(Threads.nthreads());
9396
on_rust_panic::Function=default_panic_hook
9497
)
9598
global _PANIC_HOOK

src/full.jl

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,23 @@ function with_data_file_concurrency_limit!(scan::Scan, n::UInt)
7373
return nothing
7474
end
7575

76+
"""
77+
with_manifest_file_concurrency_limit!(scan::Scan, n::UInt)
78+
79+
Sets the manifest file concurrency level for the full scan.
80+
"""
81+
function with_manifest_file_concurrency_limit!(scan::Scan, n::UInt)
82+
result = @ccall rust_lib.iceberg_scan_with_manifest_file_concurrency_limit(
83+
convert(Ptr{Ptr{Cvoid}}, pointer_from_objref(scan))::Ptr{Ptr{Cvoid}},
84+
n::Csize_t
85+
)::Cint
86+
87+
if result != 0
88+
error("Failed to set data file concurrency limit for incremental scan")
89+
end
90+
return nothing
91+
end
92+
7693
"""
7794
with_manifest_entry_concurrency_limit!(scan::Scan, n::UInt)
7895

src/incremental.jl

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,23 @@ function select_columns!(scan::IncrementalScan, column_names::Vector{String})
109109
return nothing
110110
end
111111

112+
"""
113+
with_manifest_file_concurrency_limit!(scan::IncrementalScan, n::UInt)
114+
115+
Sets the manifest file concurrency level for the incremental scan.
116+
"""
117+
function with_manifest_file_concurrency_limit!(scan::IncrementalScan, n::UInt)
118+
result = @ccall rust_lib.iceberg_incremental_scan_with_manifest_file_concurrency_limit(
119+
convert(Ptr{Ptr{Cvoid}}, pointer_from_objref(scan))::Ptr{Ptr{Cvoid}},
120+
n::Csize_t
121+
)::Cint
122+
123+
if result != 0
124+
error("Failed to set data file concurrency limit for incremental scan")
125+
end
126+
return nothing
127+
end
128+
112129
"""
113130
with_data_file_concurrency_limit!(scan::IncrementalScan, n::UInt)
114131

test/runtests.jl

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,9 @@ using Arrow
88
# Test runtime initialization - this should work
99
@test_nowarn init_runtime()
1010

11-
# Test that we can initialize multiple times safely
12-
@test_nowarn init_runtime()
11+
# Test that we can initialize multiple times safely. Note that a new static
12+
# config passed on a later call does not take effect; it is silently ignored.
13+
@test_nowarn init_runtime(StaticConfig(1))
1314

1415
println("✅ Runtime initialization successful")
1516
end
@@ -397,6 +398,11 @@ end
397398
@test scan3.ptr != C_NULL
398399
println("✅ Incremental scan created with nothing for both snapshot IDs")
399400

401+
RustyIceberg.with_manifest_file_concurrency_limit!(scan3, UInt(2))
402+
RustyIceberg.with_manifest_entry_concurrency_limit!(scan3, UInt(256))
403+
RustyIceberg.with_data_file_concurrency_limit!(scan3, UInt(1024))
404+
RustyIceberg.with_batch_size!(scan3, UInt(50))
405+
400406
inserts_stream3, deletes_stream3 = RustyIceberg.scan!(scan3)
401407
@test inserts_stream3 != C_NULL
402408
@test deletes_stream3 != C_NULL
@@ -671,6 +677,28 @@ end
671677
end
672678
end
673679

680+
@testset "with_manifest_file_concurrency_limit! - Full Scan" begin
681+
table = RustyIceberg.table_open(customer_path)
682+
scan = RustyIceberg.new_scan(table)
683+
684+
# Set concurrency limit (should not error)
685+
@test_nowarn RustyIceberg.with_manifest_file_concurrency_limit!(scan, UInt(4))
686+
stream = RustyIceberg.scan!(scan)
687+
688+
try
689+
batch_ptr = RustyIceberg.next_batch(stream)
690+
while batch_ptr != C_NULL
691+
RustyIceberg.free_batch(batch_ptr)
692+
batch_ptr = RustyIceberg.next_batch(stream)
693+
end
694+
println("✅ with_manifest_file_concurrency_limit! test passed for full scan")
695+
finally
696+
RustyIceberg.free_stream(stream)
697+
RustyIceberg.free_scan!(scan)
698+
RustyIceberg.free_table(table)
699+
end
700+
end
701+
674702
@testset "with_manifest_entry_concurrency_limit! - Incremental Scan" begin
675703
table = RustyIceberg.table_open(incremental_path)
676704
scan = new_incremental_scan(table, from_snapshot_id, to_snapshot_id)
@@ -736,6 +764,7 @@ end
736764
RustyIceberg.select_columns!(scan, ["n"])
737765
RustyIceberg.with_batch_size!(scan, UInt(5))
738766
RustyIceberg.with_data_file_concurrency_limit!(scan, UInt(2))
767+
RustyIceberg.with_manifest_file_concurrency_limit!(scan, UInt(2))
739768
RustyIceberg.with_manifest_entry_concurrency_limit!(scan, UInt(2))
740769

741770
inserts_stream, deletes_stream = RustyIceberg.scan!(scan)

0 commit comments

Comments
 (0)