From 9edce8a3050c765e1ead6ee1b0ae98284800a788 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Mon, 28 Jul 2025 19:28:48 -0700 Subject: [PATCH 1/7] initial commit --- .../geneva-uploader/src/bench.rs | 2 +- .../geneva-uploader/src/client.rs | 2 +- .../src/payload_encoder/otlp_encoder.rs | 67 +++++++++++++++---- 3 files changed, 57 insertions(+), 14 deletions(-) diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/bench.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/bench.rs index 78e18c2b..13e1ed8d 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader/src/bench.rs +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/bench.rs @@ -151,7 +151,7 @@ mod benchmarks { // To run: $cargo test --release encode_log_batch_benchmark -- --nocapture --ignored fn encode_log_batch_benchmark() { let mut criterion = Criterion::default(); - let encoder = OtlpEncoder::new(); + let encoder = OtlpEncoder::new("benchmark".to_string()); let metadata = "namespace=benchmark/eventVersion=Ver1v0"; // Benchmark 1: Different numbers of attributes diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/client.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/client.rs index 72bf5dd4..a0c076c7 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader/src/client.rs +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/client.rs @@ -87,7 +87,7 @@ impl GenevaClient { }); Ok(Self { uploader: Arc::new(uploader), - encoder: OtlpEncoder::new(), + encoder: OtlpEncoder::new(cfg.namespace), metadata, max_concurrent_uploads, }) diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs index 99d2c3df..0f32d15b 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs @@ -28,12 +28,14 @@ const FIELD_BODY: &str = "body"; pub(crate) struct OtlpEncoder { // TODO - limit cache size or use LRU eviction, and/or add feature flag for caching schema_cache: SchemaCache, + namespace: String, } impl OtlpEncoder { - pub(crate) fn new() -> Self { + pub(crate) fn new(namespace: String) -> Self { OtlpEncoder { schema_cache: Arc::new(RwLock::new(HashMap::new())), + namespace, } } @@ -102,9 +104,12 @@ impl OtlpEncoder { }; // 1. Get schema with optimized single-pass field collection and schema ID calculation - let (field_info, schema_id) = + let (field_info, base_schema_id) = Self::determine_fields_and_schema_id(log_record, event_name_str); + // Include namespace in schema ID to differentiate schemas with same structure but different namespaces + let schema_id = Self::calculate_final_schema_id(base_schema_id, &self.namespace); + let schema_entry = self.get_or_create_schema(schema_id, field_info.as_slice()); // 2. Encode row let row_buffer = self.write_row_data(log_record, &field_info); @@ -265,10 +270,9 @@ impl OtlpEncoder { } } - // Only clone field_info when we actually need to create a new schema - // Investigate if we can avoid cloning by using Cow using Arc to fields_info + // Use namespace from the struct let schema = - BondEncodedSchema::from_fields("OtlpLogRecord", "telemetry", field_info.to_vec()); //TODO - use actual struct name and namespace + BondEncodedSchema::from_fields("MdsContainer", &self.namespace, field_info.to_vec()); let schema_bytes = schema.as_bytes(); let schema_md5 = md5::compute(schema_bytes).0; @@ -287,6 +291,19 @@ impl OtlpEncoder { } } + + /// Calculate final schema ID by combining base schema ID with namespace + /// This ensures different namespaces create different schema cache entries + fn calculate_final_schema_id(base_schema_id: u64, namespace: &str) -> u64 { + use std::collections::hash_map::DefaultHasher; + use std::hash::{Hash, Hasher}; + + let mut hasher = DefaultHasher::new(); + base_schema_id.hash(&mut hasher); + namespace.hash(&mut hasher); + hasher.finish() + } + /// Write row data directly from LogRecord fn write_row_data(&self, log: &LogRecord, sorted_fields: &[FieldDef]) -> Vec { let mut buffer = Vec::with_capacity(sorted_fields.len() * 50); //TODO - estimate better @@ -399,7 +416,7 @@ mod tests { #[test] fn test_encoding() { - let encoder = OtlpEncoder::new(); + let encoder = OtlpEncoder::new("testNamespace".to_string()); let mut log = LogRecord { observed_time_unix_nano: 1_700_000_000_000_000_000, @@ -432,7 +449,7 @@ mod tests { #[test] fn test_schema_caching() { - let encoder = OtlpEncoder::new(); + let encoder = OtlpEncoder::new("test".to_string()); let log1 = LogRecord { observed_time_unix_nano: 1_700_000_000_000_000_000, @@ -464,7 +481,7 @@ mod tests { #[test] fn test_single_event_single_schema() { - let encoder = OtlpEncoder::new(); + let encoder = OtlpEncoder::new("test".to_string()); let log = LogRecord { observed_time_unix_nano: 1_700_000_000_000_000_000, @@ -483,7 +500,7 @@ mod tests { #[test] fn test_same_event_name_multiple_schemas() { - let encoder = OtlpEncoder::new(); + let encoder = OtlpEncoder::new("test".to_string()); // Schema 1: Basic log let log1 = LogRecord { @@ -528,7 +545,7 @@ mod tests { #[test] fn test_different_event_names() { - let encoder = OtlpEncoder::new(); + let encoder = OtlpEncoder::new("test".to_string()); let log1 = LogRecord { event_name: "login".to_string(), @@ -557,7 +574,7 @@ mod tests { #[test] fn test_empty_event_name_defaults_to_log() { - let encoder = OtlpEncoder::new(); + let encoder = OtlpEncoder::new("test".to_string()); let log = LogRecord { event_name: "".to_string(), @@ -574,7 +591,7 @@ mod tests { #[test] fn test_mixed_scenario() { - let encoder = OtlpEncoder::new(); + let encoder = OtlpEncoder::new("test".to_string()); // event_name1 with schema1 let log1 = LogRecord { @@ -641,4 +658,30 @@ mod tests { // Should have 4 different schemas cached assert_eq!(encoder.schema_cache.read().unwrap().len(), 4); } + + + #[test] + fn test_schema_uses_different_namespaces() { + let log = LogRecord { + observed_time_unix_nano: 1_700_000_000_000_000_000, + event_name: "test_event".to_string(), + severity_number: 9, + ..Default::default() + }; + + // Test with different namespaces + let encoder1 = OtlpEncoder::new("customNamespace".to_string()); + let encoder2 = OtlpEncoder::new("anotherNamespace".to_string()); + + let metadata = "eventVersion=Ver1v0"; + let result1 = encoder1.encode_log_batch([log.clone()].iter(), metadata); + let result2 = encoder2.encode_log_batch([log].iter(), metadata); + + assert!(!result1.is_empty()); + assert!(!result2.is_empty()); + + // Each encoder should have its own schema cache + assert_eq!(encoder1.schema_cache.read().unwrap().len(), 1); + assert_eq!(encoder2.schema_cache.read().unwrap().len(), 1); + } } From a086677a7cbf007e967612655e06f860ead97ace Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Mon, 28 Jul 2025 19:34:32 -0700 Subject: [PATCH 2/7] Fix --- .../src/payload_encoder/otlp_encoder.rs | 20 +++---------------- 1 file changed, 3 insertions(+), 17 deletions(-) diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs index 0f32d15b..fc67344b 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs @@ -107,10 +107,7 @@ impl OtlpEncoder { let (field_info, base_schema_id) = Self::determine_fields_and_schema_id(log_record, event_name_str); - // Include namespace in schema ID to differentiate schemas with same structure but different namespaces - let schema_id = Self::calculate_final_schema_id(base_schema_id, &self.namespace); - - let schema_entry = self.get_or_create_schema(schema_id, field_info.as_slice()); + let schema_entry = self.get_or_create_schema(base_schema_id, field_info.as_slice()); // 2. Encode row let row_buffer = self.write_row_data(log_record, &field_info); let level = log_record.severity_number as u8; @@ -135,13 +132,13 @@ impl OtlpEncoder { } // 4. Add schema entry if not already present (multiple schemas per event_name batch) - if !entry.schemas.iter().any(|s| s.id == schema_id) { + if !entry.schemas.iter().any(|s| s.id == base_schema_id) { entry.schemas.push(schema_entry); } // 5. Create CentralEventEntry directly (optimization: no intermediate EncodedRow) let central_event = CentralEventEntry { - schema_id, + schema_id: base_schema_id, level, event_name: Arc::new(event_name_str.to_string()), row: row_buffer, @@ -292,17 +289,6 @@ impl OtlpEncoder { } - /// Calculate final schema ID by combining base schema ID with namespace - /// This ensures different namespaces create different schema cache entries - fn calculate_final_schema_id(base_schema_id: u64, namespace: &str) -> u64 { - use std::collections::hash_map::DefaultHasher; - use std::hash::{Hash, Hasher}; - - let mut hasher = DefaultHasher::new(); - base_schema_id.hash(&mut hasher); - namespace.hash(&mut hasher); - hasher.finish() - } /// Write row data directly from LogRecord fn write_row_data(&self, log: &LogRecord, sorted_fields: &[FieldDef]) -> Vec { From 7af48069282b77ee0c724046951cc72190b5cd5c Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Mon, 28 Jul 2025 19:39:02 -0700 Subject: [PATCH 3/7] fix --- .../geneva-uploader/src/payload_encoder/otlp_encoder.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs index fc67344b..29bec0a4 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs @@ -104,10 +104,10 @@ impl OtlpEncoder { }; // 1. Get schema with optimized single-pass field collection and schema ID calculation - let (field_info, base_schema_id) = + let (field_info, schema_id) = Self::determine_fields_and_schema_id(log_record, event_name_str); - let schema_entry = self.get_or_create_schema(base_schema_id, field_info.as_slice()); + let schema_entry = self.get_or_create_schema(schema_id, field_info.as_slice()); // 2. Encode row let row_buffer = self.write_row_data(log_record, &field_info); let level = log_record.severity_number as u8; @@ -132,13 +132,13 @@ impl OtlpEncoder { } // 4. Add schema entry if not already present (multiple schemas per event_name batch) - if !entry.schemas.iter().any(|s| s.id == base_schema_id) { + if !entry.schemas.iter().any(|s| s.id == schema_id) { entry.schemas.push(schema_entry); } // 5. Create CentralEventEntry directly (optimization: no intermediate EncodedRow) let central_event = CentralEventEntry { - schema_id: base_schema_id, + schema_id, level, event_name: Arc::new(event_name_str.to_string()), row: row_buffer, From 2e5c9436683f61ace79ebae98411bae548211d0e Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Mon, 28 Jul 2025 19:52:50 -0700 Subject: [PATCH 4/7] fix --- .../geneva-uploader/src/bench.rs | 25 +++++++++----- .../geneva-uploader/src/client.rs | 6 ++-- .../src/payload_encoder/otlp_encoder.rs | 34 +++++++++---------- 3 files changed, 36 insertions(+), 29 deletions(-) diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/bench.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/bench.rs index 13e1ed8d..fae7ebd6 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader/src/bench.rs +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/bench.rs @@ -151,7 +151,7 @@ mod benchmarks { // To run: $cargo test --release encode_log_batch_benchmark -- --nocapture --ignored fn encode_log_batch_benchmark() { let mut criterion = Criterion::default(); - let encoder = OtlpEncoder::new("benchmark".to_string()); + let encoder = OtlpEncoder::new(); let metadata = "namespace=benchmark/eventVersion=Ver1v0"; // Benchmark 1: Different numbers of attributes @@ -178,8 +178,11 @@ mod benchmarks { .collect(); b.iter(|| { - let res = - encoder.encode_log_batch(black_box(logs.iter()), black_box(metadata)); + let res = encoder.encode_log_batch( + black_box(logs.iter()), + black_box("benchmark"), + black_box(metadata), + ); black_box(res); // double sure the return value is generated }); }, @@ -206,9 +209,11 @@ mod benchmarks { .collect(); b.iter(|| { - let res = black_box( - encoder.encode_log_batch(black_box(logs.iter()), black_box(metadata)), - ); + let res = black_box(encoder.encode_log_batch( + black_box(logs.iter()), + black_box("benchmark"), + black_box(metadata), + )); black_box(res); // double sure the return value is generated }); }, @@ -231,9 +236,11 @@ mod benchmarks { .collect(); b.iter(|| { - let res = black_box( - encoder.encode_log_batch(black_box(logs.iter()), black_box(metadata)), - ); + let res = black_box(encoder.encode_log_batch( + black_box(logs.iter()), + black_box("benchmark"), + black_box(metadata), + )); black_box(res); }); }); diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/client.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/client.rs index a0c076c7..9b961968 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader/src/client.rs +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/client.rs @@ -32,6 +32,7 @@ pub struct GenevaClient { uploader: Arc, encoder: OtlpEncoder, metadata: String, + namespace: String, max_concurrent_uploads: usize, } @@ -87,8 +88,9 @@ impl GenevaClient { }); Ok(Self { uploader: Arc::new(uploader), - encoder: OtlpEncoder::new(cfg.namespace), + encoder: OtlpEncoder::new(), metadata, + namespace: cfg.namespace, max_concurrent_uploads, }) } @@ -101,7 +103,7 @@ impl GenevaClient { .flat_map(|scope_log| scope_log.log_records.iter()); // TODO: Investigate using tokio::spawn_blocking for event encoding to avoid blocking // the async executor thread for CPU-intensive work. - let blobs = self.encoder.encode_log_batch(log_iter, &self.metadata); + let blobs = self.encoder.encode_log_batch(log_iter, &self.namespace, &self.metadata); // create an iterator that yields futures for each upload let upload_futures = blobs.into_iter().map(|batch| { diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs index 29bec0a4..8621fa35 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs @@ -28,14 +28,12 @@ const FIELD_BODY: &str = "body"; pub(crate) struct OtlpEncoder { // TODO - limit cache size or use LRU eviction, and/or add feature flag for caching schema_cache: SchemaCache, - namespace: String, } impl OtlpEncoder { - pub(crate) fn new(namespace: String) -> Self { + pub(crate) fn new() -> Self { OtlpEncoder { schema_cache: Arc::new(RwLock::new(HashMap::new())), - namespace, } } @@ -46,7 +44,7 @@ impl OtlpEncoder { } /// Encode a batch of logs into a vector of (event_name, bytes, schema_ids, start_time_nanos, end_time_nanos) - pub(crate) fn encode_log_batch<'a, I>(&self, logs: I, metadata: &str) -> Vec + pub(crate) fn encode_log_batch<'a, I>(&self, logs: I, namespace: &str, metadata: &str) -> Vec where I: IntoIterator, { @@ -107,7 +105,7 @@ impl OtlpEncoder { let (field_info, schema_id) = Self::determine_fields_and_schema_id(log_record, event_name_str); - let schema_entry = self.get_or_create_schema(schema_id, field_info.as_slice()); + let schema_entry = self.get_or_create_schema(schema_id, field_info.as_slice(), namespace); // 2. Encode row let row_buffer = self.write_row_data(log_record, &field_info); let level = log_record.severity_number as u8; @@ -254,7 +252,7 @@ impl OtlpEncoder { } /// Get or create schema - fields are accessible via returned schema entry - fn get_or_create_schema(&self, schema_id: u64, field_info: &[FieldDef]) -> CentralSchemaEntry { + fn get_or_create_schema(&self, schema_id: u64, field_info: &[FieldDef], namespace: &str) -> CentralSchemaEntry { { if let Some((schema_arc, schema_md5)) = self.schema_cache.read().unwrap().get(&schema_id) @@ -267,9 +265,9 @@ impl OtlpEncoder { } } - // Use namespace from the struct + // Use namespace from the parameter let schema = - BondEncodedSchema::from_fields("MdsContainer", &self.namespace, field_info.to_vec()); + BondEncodedSchema::from_fields("MdsContainer", namespace, field_info.to_vec()); let schema_bytes = schema.as_bytes(); let schema_md5 = md5::compute(schema_bytes).0; @@ -402,7 +400,7 @@ mod tests { #[test] fn test_encoding() { - let encoder = OtlpEncoder::new("testNamespace".to_string()); + let encoder = OtlpEncoder::new(); let mut log = LogRecord { observed_time_unix_nano: 1_700_000_000_000_000_000, @@ -428,14 +426,14 @@ mod tests { }); let metadata = "namespace=testNamespace/eventVersion=Ver1v0"; - let result = encoder.encode_log_batch([log].iter(), metadata); + let result = encoder.encode_log_batch([log].iter(), "testNamespace", metadata); assert!(!result.is_empty()); } #[test] fn test_schema_caching() { - let encoder = OtlpEncoder::new("test".to_string()); + let encoder = OtlpEncoder::new(); let log1 = LogRecord { observed_time_unix_nano: 1_700_000_000_000_000_000, @@ -452,22 +450,22 @@ mod tests { let metadata = "namespace=test"; // First encoding creates schema - let _result1 = encoder.encode_log_batch([log1].iter(), metadata); + let _result1 = encoder.encode_log_batch([log1].iter(), "test", metadata); assert_eq!(encoder.schema_cache.read().unwrap().len(), 1); // Second encoding with same schema structure reuses schema - let _result2 = encoder.encode_log_batch([log2.clone()].iter(), metadata); + let _result2 = encoder.encode_log_batch([log2.clone()].iter(), "test", metadata); assert_eq!(encoder.schema_cache.read().unwrap().len(), 1); // Add trace_id to create different schema log2.trace_id = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]; - let _result3 = encoder.encode_log_batch([log2].iter(), metadata); + let _result3 = encoder.encode_log_batch([log2].iter(), "test", metadata); assert_eq!(encoder.schema_cache.read().unwrap().len(), 2); } #[test] fn test_single_event_single_schema() { - let encoder = OtlpEncoder::new("test".to_string()); + let encoder = OtlpEncoder::new(); let log = LogRecord { observed_time_unix_nano: 1_700_000_000_000_000_000, @@ -476,7 +474,7 @@ mod tests { ..Default::default() }; - let result = encoder.encode_log_batch([log].iter(), "test"); + let result = encoder.encode_log_batch([log].iter(), "test", "test"); assert_eq!(result.len(), 1); assert_eq!(result[0].event_name, "test_event"); @@ -486,7 +484,7 @@ mod tests { #[test] fn test_same_event_name_multiple_schemas() { - let encoder = OtlpEncoder::new("test".to_string()); + let encoder = OtlpEncoder::new(); // Schema 1: Basic log let log1 = LogRecord { @@ -516,7 +514,7 @@ mod tests { }), }); - let result = encoder.encode_log_batch([log1, log2, log3].iter(), "test"); + let result = encoder.encode_log_batch([log1, log2, log3].iter(), "test", "test"); // All should be in one batch with same event_name assert_eq!(result.len(), 1); From 4e36dcfc3989882625d3a925c59c20b1830fa21e Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Mon, 28 Jul 2025 21:36:25 -0700 Subject: [PATCH 5/7] fix --- .../src/payload_encoder/otlp_encoder.rs | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs index 8621fa35..948bb161 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs @@ -529,7 +529,7 @@ mod tests { #[test] fn test_different_event_names() { - let encoder = OtlpEncoder::new("test".to_string()); + let encoder = OtlpEncoder::new(); let log1 = LogRecord { event_name: "login".to_string(), @@ -543,7 +543,7 @@ mod tests { ..Default::default() }; - let result = encoder.encode_log_batch([log1, log2].iter(), "test"); + let result = encoder.encode_log_batch([log1, log2].iter(), "test", "test"); // Should create 2 separate batches assert_eq!(result.len(), 2); @@ -558,7 +558,7 @@ mod tests { #[test] fn test_empty_event_name_defaults_to_log() { - let encoder = OtlpEncoder::new("test".to_string()); + let encoder = OtlpEncoder::new(); let log = LogRecord { event_name: "".to_string(), @@ -566,7 +566,7 @@ mod tests { ..Default::default() }; - let result = encoder.encode_log_batch([log].iter(), "test"); + let result = encoder.encode_log_batch([log].iter(), "test", "test"); assert_eq!(result.len(), 1); assert_eq!(result[0].event_name, "Log"); // Should default to "Log" @@ -575,7 +575,7 @@ mod tests { #[test] fn test_mixed_scenario() { - let encoder = OtlpEncoder::new("test".to_string()); + let encoder = OtlpEncoder::new(); // event_name1 with schema1 let log1 = LogRecord { @@ -612,7 +612,7 @@ mod tests { }), }); - let result = encoder.encode_log_batch([log1, log2, log3, log4].iter(), "test"); + let result = encoder.encode_log_batch([log1, log2, log3, log4].iter(), "test", "test"); // Should create 3 batches: "user_action", "system_alert", "Log" assert_eq!(result.len(), 3); @@ -653,13 +653,13 @@ mod tests { ..Default::default() }; - // Test with different namespaces - let encoder1 = OtlpEncoder::new("customNamespace".to_string()); - let encoder2 = OtlpEncoder::new("anotherNamespace".to_string()); + // Test with different namespaces using separate encoder instances + let encoder1 = OtlpEncoder::new(); + let encoder2 = OtlpEncoder::new(); let metadata = "eventVersion=Ver1v0"; - let result1 = encoder1.encode_log_batch([log.clone()].iter(), metadata); - let result2 = encoder2.encode_log_batch([log].iter(), metadata); + let result1 = encoder1.encode_log_batch([log.clone()].iter(), "customNamespace", metadata); + let result2 = encoder2.encode_log_batch([log].iter(), "anotherNamespace", metadata); assert!(!result1.is_empty()); assert!(!result2.is_empty()); From f3b170e83d181dac57b90dfbb58ff9f8a67c3b36 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Mon, 28 Jul 2025 21:54:32 -0700 Subject: [PATCH 6/7] fix --- .../src/payload_encoder/otlp_encoder.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs index aa3e3782..b40457c0 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs @@ -90,7 +90,7 @@ impl OtlpEncoder { // 1. Get schema with optimized single-pass field collection and schema ID calculation let (field_info, schema_id) = Self::determine_fields_and_schema_id(log_record, event_name_str); - let schema_entry = self.create_schema(schema_id, field_info.as_slice(), namespace); + let schema_entry = Self::create_schema(schema_id, field_info.as_slice(), namespace); // 2. Encode row let row_buffer = self.write_row_data(log_record, &field_info); let level = log_record.severity_number as u8; @@ -679,8 +679,11 @@ mod tests { assert!(!result1.is_empty()); assert!(!result2.is_empty()); - // Each encoder should have its own schema cache - assert_eq!(encoder1.schema_cache.read().unwrap().len(), 1); - assert_eq!(encoder2.schema_cache.read().unwrap().len(), 1); + // Verify that both namespaces produce results + // Since schema_cache was removed, we just verify the functionality works + assert_eq!(result1.len(), 1); + assert_eq!(result2.len(), 1); + assert_eq!(result1[0].event_name, "test_event"); + assert_eq!(result2[0].event_name, "test_event"); } } From 5b1f8f6aab7b9cb1a4d2bd8047370013dbc71f53 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Mon, 28 Jul 2025 21:58:33 -0700 Subject: [PATCH 7/7] fix --- .../geneva-uploader/src/payload_encoder/otlp_encoder.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs index b40457c0..e51ca097 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs @@ -20,6 +20,8 @@ const FIELD_SEVERITY_NUMBER: &str = "SeverityNumber"; const FIELD_SEVERITY_TEXT: &str = "SeverityText"; const FIELD_BODY: &str = "body"; +const SCHEMA_TYPE_NAME: &str = "OtlpLogRecord"; + /// Encoder to write OTLP payload in bond form. #[derive(Clone)] pub(crate) struct OtlpEncoder; @@ -239,7 +241,7 @@ impl OtlpEncoder { /// Create schema - always creates a new CentralSchemaEntry fn create_schema(schema_id: u64, field_info: &[FieldDef], namespace: &str) -> CentralSchemaEntry { let schema = - BondEncodedSchema::from_fields("MdsContainer", namespace, field_info.to_vec()); + BondEncodedSchema::from_fields(SCHEMA_TYPE_NAME, namespace, field_info.to_vec()); let schema_bytes = schema.as_bytes(); let schema_md5 = md5::compute(schema_bytes).0;