@@ -1,21 +1,19 @@
 use std::collections::HashMap;
 use std::sync::Arc;

-use arrow_array::types::Int32Type;
 use arrow_array::{
-    ArrayRef, BinaryArray, Float64Array, Int32Array, LargeBinaryArray, RecordBatch, StringArray,
+    ArrayRef, Float64Array, Int32Array, LargeBinaryArray, StringArray,
 };
 use futures::TryStreamExt;
-use geo_types::{Coord, Geometry, LineString, Point, Polygon, geometry};
+use geo_types::{Geometry, Point};
 use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
-use iceberg::transaction::{ApplyTransactionAction, Transaction};
 use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
 use iceberg::writer::file_writer::ParquetWriterBuilder;
 use iceberg::writer::file_writer::location_generator::{
     DefaultFileNameGenerator, DefaultLocationGenerator,
 };
 use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
-use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
+use iceberg::writer::IcebergWriterBuilder;
 use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation, TableIdent};
 use iceberg_catalog_rest::{REST_CATALOG_PROP_URI, RestCatalogBuilder};
 use parquet::file::properties::WriterProperties;
@@ -152,19 +150,27 @@ async fn main() { |
     let table_ident = TableIdent::new(namespace_ident.clone(), TABLE_NAME.to_string());

     println!("Checking if table exists...");
-    let table_exists = catalog.table_exists(&table_ident).await.map_err(|e| {
-        eprintln!("Failed to check if table exists: {:?}", e);
-        eprintln!("Error: {}", e);
-        e
-    }).unwrap();
+    let table_exists = catalog
+        .table_exists(&table_ident)
+        .await
+        .map_err(|e| {
+            eprintln!("Failed to check if table exists: {:?}", e);
+            eprintln!("Error: {}", e);
+            e
+        })
+        .unwrap();

     if table_exists {
         println!("Table {TABLE_NAME} already exists, dropping now.");
-        catalog.drop_table(&table_ident).await.map_err(|e| {
-            eprintln!("Failed to drop table: {:?}", e);
-            eprintln!("Error: {}", e);
-            e
-        }).unwrap();
+        catalog
+            .drop_table(&table_ident)
+            .await
+            .map_err(|e| {
+                eprintln!("Failed to drop table: {:?}", e);
+                eprintln!("Error: {}", e);
+                e
+            })
+            .unwrap();
     }

     let iceberg_schema = Schema::builder()
@@ -284,7 +290,7 @@ async fn main() { |
         file_name_generator.clone(),
     );
     let data_file_writer_builder = DataFileWriterBuilder::new(rolling_file_writer_builder);
-    let mut data_file_writer = data_file_writer_builder.build(None).await.unwrap();
+    let data_file_writer = data_file_writer_builder.build(None).await.unwrap();

     let features = mock_sample_features();
     let ids: ArrayRef = Arc::new(Int32Array::from_iter_values(features.iter().map(|f| f.id)));
@@ -323,25 +329,25 @@ async fn main() { |
             .iter()
             .map(|f| f.properties.get("population").unwrap().as_str()),
     ));
-    //TODO: make write with credentials
+    //TODO: make write with credentials
     /*let record_batch = RecordBatch::try_new(schema.clone(), vec![
-    ids,
-    names,
-    geometries_wkb,
-    geometry_types,
-    srids,
-    bbox_min_xs,
-    bbox_min_ys,
-    bbox_max_xs,
-    bbox_max_ys,
-    countries,
-    populations,
-    ])
-    .unwrap();
-
-    data_file_writer.write(record_batch.clone()).await.unwrap();
-    let data_file = data_file_writer.close().await.unwrap();
-*/
+        ids,
+        names,
+        geometries_wkb,
+        geometry_types,
+        srids,
+        bbox_min_xs,
+        bbox_min_ys,
+        bbox_max_xs,
+        bbox_max_ys,
+        countries,
+        populations,
+    ])
+    .unwrap();
+
+    data_file_writer.write(record_batch.clone()).await.unwrap();
+    let data_file = data_file_writer.close().await.unwrap();
+    */

     let loaded_table = catalog.load_table(&table_ident).await.unwrap();
     println!("Table {TABLE_NAME} loaded!\n\nTable: {loaded_table:?}");
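
Note on the `//TODO: make write with credentials` block above: once credentials are sorted out, re-enabling the commented-out write path would roughly amount to the sketch below. This is not part of this commit; it assumes `RecordBatch` and `IcebergWriter` are imported again, that `data_file_writer` is declared `mut` once more, and that `schema` and the column arrays built above are in scope.

    // Sketch only, not part of the commit. Assumes `schema`, the column ArrayRefs
    // built above, and a `mut data_file_writer` are in scope, with `RecordBatch`
    // and `IcebergWriter` re-imported.
    let record_batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            ids, names, geometries_wkb, geometry_types, srids,
            bbox_min_xs, bbox_min_ys, bbox_max_xs, bbox_max_ys,
            countries, populations,
        ],
    )
    .unwrap();

    data_file_writer.write(record_batch).await.unwrap();
    let data_files = data_file_writer.close().await.unwrap();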