diff --git a/quickwit/quickwit-config/src/lib.rs b/quickwit/quickwit-config/src/lib.rs index 04771add011..b1d675fc1f6 100644 --- a/quickwit/quickwit-config/src/lib.rs +++ b/quickwit/quickwit-config/src/lib.rs @@ -80,7 +80,8 @@ pub use crate::node_config::{ use crate::source_config::serialize::{SourceConfigV0_7, SourceConfigV0_8, VersionedSourceConfig}; pub use crate::storage_config::{ AzureStorageConfig, FileStorageConfig, GoogleCloudStorageConfig, RamStorageConfig, - S3StorageConfig, StorageBackend, StorageBackendFlavor, StorageConfig, StorageConfigs, + S3EncryptionConfig, S3StorageConfig, StorageBackend, StorageBackendFlavor, StorageConfig, + StorageConfigs, }; /// Returns true if the ingest API v2 is enabled. diff --git a/quickwit/quickwit-config/src/storage_config.rs b/quickwit/quickwit-config/src/storage_config.rs index 3547a33e483..c821db04c90 100644 --- a/quickwit/quickwit-config/src/storage_config.rs +++ b/quickwit/quickwit-config/src/storage_config.rs @@ -308,6 +308,29 @@ impl fmt::Debug for AzureStorageConfig { } } +#[derive(Clone, Eq, PartialEq, Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum S3EncryptionConfig { + /// This is the standard AES256 SSE-C header config. Key is expected to be a + /// 256-bit base64-encoded string, and key_md5 is expected to be the + /// base64-encoded MD5 digest of the (binary) key. Akamai gen1 buckets don't + /// respect this (only a 32 hex char key is expected). + SseC { key: String, key_md5: String }, +} + +impl fmt::Debug for S3EncryptionConfig { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + S3EncryptionConfig::SseC { key_md5, .. } => f + .debug_struct("S3EncryptionConfig") + .field("type", &"sse_c") + .field("key", &"***redacted***") + .field("key_md5", key_md5) + .finish(), + } + } +} + #[derive(Clone, Default, Eq, PartialEq, Serialize, Deserialize)] #[serde(deny_unknown_fields)] pub struct S3StorageConfig { @@ -329,6 +352,8 @@ pub struct S3StorageConfig { pub disable_multi_object_delete: bool, #[serde(default)] pub disable_multipart_upload: bool, + #[serde(default)] + pub encryption: Option<S3EncryptionConfig>, } impl S3StorageConfig { @@ -685,4 +710,29 @@ mod tests { assert_eq!(s3_storage_config.flavor, Some(StorageBackendFlavor::MinIO)); } } + + #[test] + fn test_storage_s3_config_encryption_serde() { + { + let s3_storage_config_yaml = r#" + endpoint: http://localhost:4566 + encryption: + type: sse_c + key: test-customer-key + key_md5: test-customer-key-md5 + "#; + let s3_storage_config: S3StorageConfig = + serde_yaml::from_str(s3_storage_config_yaml).unwrap(); + + let expected_s3_config = S3StorageConfig { + endpoint: Some("http://localhost:4566".to_string()), + encryption: Some(S3EncryptionConfig::SseC { + key: "test-customer-key".to_string(), + key_md5: "test-customer-key-md5".to_string(), + }), + ..Default::default() + }; + assert_eq!(s3_storage_config, expected_s3_config); + } + } } diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs index 1401a48d998..a3f2189a79e 100644 --- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs @@ -38,7 +38,7 @@ use quickwit_aws::{aws_behavior_version, get_aws_config}; use quickwit_common::retry::{Retry, RetryParams}; use quickwit_common::uri::Uri; use quickwit_common::{chunk_range, into_u64_range}; -use quickwit_config::S3StorageConfig; +use 
quickwit_config::{S3EncryptionConfig, S3StorageConfig}; use regex::Regex; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt, BufReader, ReadBuf}; use tokio::sync::Semaphore; @@ -89,6 +89,7 @@ pub struct S3CompatibleObjectStorage { retry_params: RetryParams, disable_multi_object_delete: bool, disable_multipart_upload: bool, + encryption: Option<S3EncryptionConfig>, } impl fmt::Debug for S3CompatibleObjectStorage { @@ -97,6 +98,7 @@ impl fmt::Debug for S3CompatibleObjectStorage { .debug_struct("S3CompatibleObjectStorage") .field("bucket", &self.bucket) .field("prefix", &self.prefix) + .field("encryption", &self.encryption) .finish() } } @@ -184,6 +186,7 @@ impl S3CompatibleObjectStorage { retry_params, disable_multi_object_delete, disable_multipart_upload, + encryption: s3_storage_config.encryption.clone(), }) } @@ -201,6 +204,7 @@ impl S3CompatibleObjectStorage { retry_params: self.retry_params, disable_multi_object_delete: self.disable_multi_object_delete, disable_multipart_upload: self.disable_multipart_upload, + encryption: self.encryption, } } @@ -289,12 +293,23 @@ impl S3CompatibleObjectStorage { .await .map_err(|io_error| Retry::Permanent(StorageError::from(io_error)))?; - self.s3_client + let mut req_builder = self + .s3_client .put_object() .bucket(bucket) .key(key) .body(body) - .content_length(len as i64) + .content_length(len as i64); + match &self.encryption { + Some(S3EncryptionConfig::SseC { key, key_md5 }) => { + req_builder = req_builder + .set_sse_customer_algorithm(Some("AES256".to_string())) + .set_sse_customer_key(Some(key.clone())) + .set_sse_customer_key_md5(Some(key_md5.clone())); + } + None => {} + } + req_builder .send() .with_count_and_upload_metrics(ActionLabel::PutObject, len) .await @@ -326,10 +341,21 @@ impl S3CompatibleObjectStorage { async fn create_multipart_upload(&self, key: &str) -> StorageResult<MultipartUploadId> { let upload_id = aws_retry(&self.retry_params, || async { - self.s3_client + let mut req_builder = self + .s3_client .create_multipart_upload() .bucket(self.bucket.clone()) - .key(key) + .key(key); + match &self.encryption { + Some(S3EncryptionConfig::SseC { key, key_md5 }) => { + req_builder = req_builder + .set_sse_customer_algorithm(Some("AES256".to_string())) + .set_sse_customer_key(Some(key.clone())) + .set_sse_customer_key_md5(Some(key_md5.clone())); + } + None => {} + } + req_builder .send() .with_count_metric(ActionLabel::CreateMultipartUpload) .await @@ -421,7 +447,7 @@ impl S3CompatibleObjectStorage { .map_err(Retry::Permanent)?; let md5 = BASE64_STANDARD.encode(part.md5.0); - let upload_part_output = self + let mut req_builder = self .s3_client .upload_part() .bucket(self.bucket.clone()) .content_length(part.len() as i64) .content_md5(md5) .part_number(part.part_number as i32) - .upload_id(upload_id.0) + .upload_id(upload_id.0); + match &self.encryption { + Some(S3EncryptionConfig::SseC { key, key_md5 }) => { + req_builder = req_builder + .set_sse_customer_algorithm(Some("AES256".to_string())) + .set_sse_customer_key(Some(key.clone())) + .set_sse_customer_key_md5(Some(key_md5.clone())); + } + _ => {} + } + let upload_part_output = req_builder .send() .with_count_and_upload_metrics(ActionLabel::UploadPart, part.len()) .await @@ -542,12 +578,22 @@ impl S3CompatibleObjectStorage { let key = self.key(path); let range_str = range_opt.map(|range| format!("bytes={}-{}", range.start, range.end - 1)); - let get_object_output = self + let mut req_builder = self .s3_client .get_object() .bucket(self.bucket.clone()) 
.key(key) - .set_range(range_str) + .set_range(range_str); + match &self.encryption { + Some(S3EncryptionConfig::SseC { key, key_md5 }) => { + req_builder = req_builder + .set_sse_customer_algorithm(Some("AES256".to_string())) + .set_sse_customer_key(Some(key.clone())) + .set_sse_customer_key_md5(Some(key_md5.clone())); + } + _ => {} + } + let get_object_output = req_builder .send() .with_count_and_duration_metrics(ActionLabel::GetObject) .await?; @@ -843,10 +889,17 @@ impl Storage for S3CompatibleObjectStorage { let bucket = self.bucket.clone(); let key = self.key(path); let head_object_output = aws_retry(&self.retry_params, || async { - self.s3_client - .head_object() - .bucket(&bucket) - .key(&key) + let mut req_builder = self.s3_client.head_object().bucket(&bucket).key(&key); + match &self.encryption { + Some(S3EncryptionConfig::SseC { key, key_md5 }) => { + req_builder = req_builder + .set_sse_customer_algorithm(Some("AES256".to_string())) + .set_sse_customer_key(Some(key.clone())) + .set_sse_customer_key_md5(Some(key_md5.clone())); + } + _ => {} + } + req_builder .send() .with_count_metric(ActionLabel::HeadObject) .await @@ -948,6 +1001,7 @@ mod tests { retry_params: RetryParams::for_test(), disable_multi_object_delete: false, disable_multipart_upload: false, + encryption: None, }; assert_eq!( s3_storage.relative_path("indexes/foo"), @@ -995,6 +1049,7 @@ mod tests { retry_params: RetryParams::for_test(), disable_multi_object_delete: true, disable_multipart_upload: false, + encryption: None, }; let _ = s3_storage .bulk_delete(&[Path::new("foo"), Path::new("bar")]) @@ -1032,6 +1087,7 @@ mod tests { retry_params: RetryParams::for_test(), disable_multi_object_delete: false, disable_multipart_upload: false, + encryption: None, }; let _ = s3_storage .bulk_delete(&[Path::new("foo"), Path::new("bar")]) @@ -1114,6 +1170,7 @@ mod tests { retry_params: RetryParams::for_test(), disable_multi_object_delete: false, disable_multipart_upload: false, + encryption: None, }; let bulk_delete_error = s3_storage .bulk_delete(&[ @@ -1205,10 +1262,610 @@ mod tests { retry_params: RetryParams::for_test(), disable_multi_object_delete: false, disable_multipart_upload: false, + encryption: None, }; s3_storage .put(Path::new("my-path"), Box::new(vec![1, 2, 3])) .await .unwrap(); } + + #[tokio::test] + async fn test_sse_c_headers_in_regular_put() { + let client = StaticReplayClient::new(vec![ReplayEvent::new( + http::Request::builder().body(SdkBody::empty()).unwrap(), + http::Response::builder() + .status(200) + .body(SdkBody::empty()) + .unwrap(), + )]); + + let credentials = Credentials::new("mock_key", "mock_secret", None, None, "mock"); + let config = aws_sdk_s3::Config::builder() + .behavior_version(aws_behavior_version()) + .region(Some(Region::new("Foo"))) + .http_client(client.clone()) + .credentials_provider(credentials) + .build(); + + let s3_client = S3Client::from_conf(config); + let uri = Uri::for_test("s3://test-bucket/prefix"); + + let s3_storage = S3CompatibleObjectStorage { + s3_client, + uri, + bucket: "test-bucket".to_string(), + prefix: PathBuf::from("prefix"), + multipart_policy: MultiPartPolicy::default(), + retry_params: RetryParams::for_test(), + disable_multi_object_delete: false, + disable_multipart_upload: false, + encryption: Some(S3EncryptionConfig::SseC { + key: "dGVzdGtleWZvcmVuY3J5cHRpb24xMjM0NTY3OA==".to_string(), + key_md5: "SomeBase64MD5Value=".to_string(), + }), + }; + + let small_payload = vec![1u8; 100]; + let _ = s3_storage + .put(Path::new("small-file"), 
Box::new(small_payload)) + .await; + + let requests = client.actual_requests().collect::<Vec<_>>(); + assert_eq!(requests.len(), 1, "Expected exactly 1 PutObject request"); + + let request = &requests[0]; + let headers = request.headers(); + + assert!(headers.contains_key("x-amz-server-side-encryption-customer-algorithm")); + assert!(headers.contains_key("x-amz-server-side-encryption-customer-key")); + assert!(headers.contains_key("x-amz-server-side-encryption-customer-key-md5")); + } + + #[tokio::test] + async fn test_sse_c_headers_in_multipart_upload() { + let client = StaticReplayClient::new(vec![ + ReplayEvent::new( + http::Request::builder().body(SdkBody::empty()).unwrap(), + http::Response::builder() + .status(200) + .body(SdkBody::from( + r#" + <InitiateMultipartUploadResult> + <Bucket>test-bucket</Bucket> + <Key>large-file</Key> + <UploadId>test-upload-id</UploadId> + </InitiateMultipartUploadResult>"#, + )) + .unwrap(), + ), + ReplayEvent::new( + http::Request::builder().body(SdkBody::empty()).unwrap(), + http::Response::builder() + .status(200) + .header("ETag", "\"etag1\"") + .body(SdkBody::empty()) + .unwrap(), + ), + ReplayEvent::new( + http::Request::builder().body(SdkBody::empty()).unwrap(), + http::Response::builder() + .status(200) + .header("ETag", "\"etag2\"") + .body(SdkBody::empty()) + .unwrap(), + ), + ReplayEvent::new( + http::Request::builder().body(SdkBody::empty()).unwrap(), + http::Response::builder() + .status(200) + .body(SdkBody::empty()) + .unwrap(), + ), + ]); + + let credentials = Credentials::new("mock_key", "mock_secret", None, None, "mock"); + let config = aws_sdk_s3::Config::builder() + .behavior_version(aws_behavior_version()) + .region(Some(Region::new("Foo"))) + .http_client(client.clone()) + .credentials_provider(credentials) + .build(); + + let s3_client = S3Client::from_conf(config); + let uri = Uri::for_test("s3://test-bucket/prefix"); + + // Use a custom multipart policy with a low threshold to trigger multipart + let multipart_policy = MultiPartPolicy { + target_part_num_bytes: 5 * 1024 * 1024, // 5MB parts + multipart_threshold_num_bytes: 10 * 1024 * 1024, // 10MB threshold + max_num_parts: 10_000, + max_object_num_bytes: 5_000_000_000_000u64, + max_concurrent_uploads: 100, + }; + + let s3_storage = S3CompatibleObjectStorage { + s3_client, + uri, + bucket: "test-bucket".to_string(), + prefix: PathBuf::from("prefix"), + multipart_policy, + retry_params: RetryParams::for_test(), + disable_multi_object_delete: false, + disable_multipart_upload: false, + encryption: Some(S3EncryptionConfig::SseC { + key: "dGVzdGtleWZvcmVuY3J5cHRpb24xMjM0NTY3OA==".to_string(), + key_md5: "SomeBase64MD5Value=".to_string(), + }), + }; + + // Test multipart upload with large payload that triggers multipart (15MB > 10MB threshold) + let large_payload = vec![2u8; 15 * 1024 * 1024]; // 15MB to trigger multipart + let _ = s3_storage + .put(Path::new("large-file"), Box::new(large_payload)) + .await; + + // Verify captured requests have SSE-C headers + let requests = client.actual_requests().collect::<Vec<_>>(); + + // Should have: CreateMultipartUpload + N UploadParts + CompleteMultipartUpload + assert!( + requests.len() >= 3, + "Expected at least 3 requests, got {}", + requests.len() + ); + + // Check CreateMultipartUpload and UploadPart requests for SSE-C headers + // CompleteMultipartUpload does not require SSE-C headers + for (i, request) in requests[..requests.len() - 1].iter().enumerate() { + let headers = request.headers(); + assert!( + headers.contains_key("x-amz-server-side-encryption-customer-algorithm"), + "Request {i}: Missing SSE-C algorithm header in CreateMultipartUpload" + ); + 
assert!( + headers.contains_key("x-amz-server-side-encryption-customer-key"), + "Request {i}: Missing SSE-C key header in CreateMultipartUpload" + ); + assert!( + headers.contains_key("x-amz-server-side-encryption-customer-key-md5"), + "Request {i}: Missing SSE-C key MD5 header in CreateMultipartUpload", + ); + } + } +} + +/// These tests serve as a playground to test how S3 providers react to +/// encryption headers. They require valid credentials to be configured as +/// environment variables. They are ignored by default and need to be run +/// ad-hoc. +#[cfg(test)] +mod provider_tests { + use std::path::Path; + + use aws_sdk_s3::types::ServerSideEncryption; + use quickwit_common::uri::Uri; + use quickwit_config::{S3EncryptionConfig, S3StorageConfig}; + + use crate::{MultiPartPolicy, S3CompatibleObjectStorage, Storage}; + + /// Checks that a file was encrypted with a managed encryption (SSE-S3) + /// + /// Does not work with SSE-C (meta headers are not the same) + async fn assert_auto_encrypted(storage: &S3CompatibleObjectStorage, path: &Path) { + let meta = storage + .s3_client + .head_object() + .bucket(&storage.bucket) + .key(&storage.key(path)) + .send() + .await + .unwrap(); + + assert_eq!( + meta.server_side_encryption(), + Some(&ServerSideEncryption::Aes256) + ); + } + + #[tokio::test] + #[ignore] + async fn test_akamai_s3_encryption() { + let access_key_id = + quickwit_common::get_from_env_opt("TEST_AKAMAI_S3_ACCESS_KEY_ID", false); + let secret_access_key = + quickwit_common::get_from_env_opt("TEST_AKAMAI_S3_SECRET_ACCESS_KEY", true); + let endpoint = quickwit_common::get_from_env_opt("TEST_AKAMAI_S3_ENDPOINT_URL", false); + let bucket = "s3://remi-encryption-tests"; + + let config_enc = S3StorageConfig { + access_key_id: access_key_id.clone(), + secret_access_key: secret_access_key.clone(), + endpoint: endpoint.clone(), + region: Some("akamai".to_string()), + encryption: Some(S3EncryptionConfig::SseC { + key: "Ia6VCtDyKCRi3N88na+m7pgiMNeicLVq70Swq1fdDOU=".to_string(), + key_md5: "viLkS9avbUl3bNFYDdAJRQ==".to_string(), + }), + + ..Default::default() + }; + let storage_enc = S3CompatibleObjectStorage::from_uri(&config_enc, &Uri::for_test(bucket)) + .await + .unwrap(); + + let config_plain = S3StorageConfig { + access_key_id: access_key_id.clone(), + secret_access_key: secret_access_key.clone(), + endpoint: endpoint.clone(), + region: Some("akamai".to_string()), + encryption: None, + ..Default::default() + }; + let storage_plain = + S3CompatibleObjectStorage::from_uri(&config_plain, &Uri::for_test(bucket)) + .await + .unwrap(); + + storage_enc + .put( + Path::new("hello_enc"), + Box::new("Test content".as_bytes().to_vec()), + ) + .await + .unwrap(); + + storage_plain + .put( + Path::new("hello_plain"), + Box::new("Test content".as_bytes().to_vec()), + ) + .await + .unwrap(); + + // Nominal SSE-C path (write and read with the key) + let enc_obj_enc_read = storage_enc.get_all(Path::new("hello_enc")).await.unwrap(); + assert_eq!(enc_obj_enc_read, "Test content".as_bytes()); + let num_bytes = storage_enc + .file_num_bytes(Path::new("hello_enc")) + .await + .unwrap(); + assert_eq!(num_bytes, "Test content".len() as u64); + + // On Akamai, you can read plain objects with the encryption headers + let plain_obj_enc_read = storage_enc.get_all(Path::new("hello_plain")).await.unwrap(); + assert_eq!(plain_obj_enc_read, "Test content".as_bytes()); + let num_bytes = storage_enc + .file_num_bytes(Path::new("hello_plain")) + .await + .unwrap(); + assert_eq!(num_bytes, "Test content".len() as 
u64); + + // Nominal plain path (write and read without the key) + // no SSE-S3 on Akamai, so no auto-encryption + let plain_obj_plain_read = storage_plain + .get_all(Path::new("hello_plain")) + .await + .unwrap(); + assert_eq!(plain_obj_plain_read, "Test content".as_bytes()); + + // Encrypted objects should not be readable without the key + storage_plain + .get_all(Path::new("hello_enc")) + .await + .unwrap_err(); + + storage_enc.delete(Path::new("hello_enc")).await.unwrap(); + storage_enc.delete(Path::new("hello_plain")).await.unwrap(); + } + + #[tokio::test] + #[ignore] + async fn test_akamai_s3_sse_c_multipart() { + let access_key_id = + quickwit_common::get_from_env_opt("TEST_AKAMAI_S3_ACCESS_KEY_ID", false); + let secret_access_key = + quickwit_common::get_from_env_opt("TEST_AKAMAI_S3_SECRET_ACCESS_KEY", true); + let endpoint = quickwit_common::get_from_env_opt("TEST_AKAMAI_S3_ENDPOINT_URL", false); + let bucket = "s3://remi-encryption-tests"; + + let multipart_policy = MultiPartPolicy { + target_part_num_bytes: 5 * 1024 * 1024, + multipart_threshold_num_bytes: 10 * 1024 * 1024, + max_num_parts: 10_000, + max_object_num_bytes: 5_000_000_000_000u64, + max_concurrent_uploads: 100, + }; + let config_enc = S3StorageConfig { + access_key_id: access_key_id.clone(), + secret_access_key: secret_access_key.clone(), + endpoint: endpoint.clone(), + region: Some("akamai".to_string()), + encryption: Some(S3EncryptionConfig::SseC { + key: "Ia6VCtDyKCRi3N88na+m7pgiMNeicLVq70Swq1fdDOU=".to_string(), + key_md5: "viLkS9avbUl3bNFYDdAJRQ==".to_string(), + }), + ..Default::default() + }; + let mut storage_enc = + S3CompatibleObjectStorage::from_uri(&config_enc, &Uri::for_test(bucket)) + .await + .unwrap(); + storage_enc.multipart_policy = multipart_policy; + + storage_enc + .put( + Path::new("hello_multipart_enc"), + Box::new(vec![2u8; 15 * 1024 * 1024]), + ) + .await + .unwrap(); + + // Nominal SSE-C path (write and read with the key) + let enc_obj_enc_read = storage_enc + .get_all(Path::new("hello_multipart_enc")) + .await + .unwrap(); + assert_eq!(enc_obj_enc_read, vec![2u8; 15 * 1024 * 1024].as_slice()); + let num_bytes = storage_enc + .file_num_bytes(Path::new("hello_multipart_enc")) + .await + .unwrap(); + assert_eq!(num_bytes, 15 * 1024 * 1024); + } + + #[tokio::test] + #[ignore] + async fn test_aws_s3_encryption() { + let access_key_id = quickwit_common::get_from_env_opt("TEST_AWS_S3_ACCESS_KEY_ID", false); + let secret_access_key = + quickwit_common::get_from_env_opt("TEST_AWS_S3_SECRET_ACCESS_KEY", false); + let bucket = "s3://amzn-enc-test-sk"; + + let config_enc = S3StorageConfig { + access_key_id: access_key_id.clone(), + secret_access_key: secret_access_key.clone(), + region: Some("eu-west-1".to_string()), + encryption: Some(S3EncryptionConfig::SseC { + key: "Ia6VCtDyKCRi3N88na+m7pgiMNeicLVq70Swq1fdDOU=".to_string(), + key_md5: "viLkS9avbUl3bNFYDdAJRQ==".to_string(), + }), + ..Default::default() + }; + let storage_enc = S3CompatibleObjectStorage::from_uri(&config_enc, &Uri::for_test(bucket)) + .await + .unwrap(); + + let config_plain = S3StorageConfig { + access_key_id: access_key_id.clone(), + secret_access_key: secret_access_key.clone(), + region: Some("eu-west-1".to_string()), + encryption: None, + ..Default::default() + }; + let storage_plain = + S3CompatibleObjectStorage::from_uri(&config_plain, &Uri::for_test(bucket)) + .await + .unwrap(); + + storage_enc + .put( + Path::new("hello_enc"), + Box::new("Test content".as_bytes().to_vec()), + ) + .await + .unwrap(); + + storage_plain + 
.put( + Path::new("hello_plain"), + Box::new("Test content".as_bytes().to_vec()), + ) + .await + .unwrap(); + + // Nominal SSE-C path (write and read with the key) + let enc_obj_enc_read = storage_enc.get_all(Path::new("hello_enc")).await.unwrap(); + assert_eq!(enc_obj_enc_read, "Test content".as_bytes()); + let num_bytes = storage_enc + .file_num_bytes(Path::new("hello_enc")) + .await + .unwrap(); + assert_eq!(num_bytes, "Test content".len() as u64); + + // On AWS S3, you cannot read plain objects with the encryption headers + storage_enc + .get_all(Path::new("hello_plain")) + .await + .unwrap_err(); + + // SSE-S3 enabled on all AWS buckets by default + let plain_obj_plain_read = storage_plain + .get_all(Path::new("hello_plain")) + .await + .unwrap(); + assert_eq!(plain_obj_plain_read, "Test content".as_bytes()); + assert_auto_encrypted(&storage_plain, Path::new("hello_plain")).await; + + // Encrypted objects should not be readable without the key + storage_plain + .get_all(Path::new("hello_enc")) + .await + .unwrap_err(); + } + + /// Requires an OVH bucket **with encryption enabled**. + #[tokio::test] + #[ignore] + async fn test_ovh_omk_s3_encryption() { + let access_key_id = quickwit_common::get_from_env_opt("TEST_OVH_S3_ACCESS_KEY_ID", false); + let secret_access_key = + quickwit_common::get_from_env_opt("TEST_OVH_S3_SECRET_ACCESS_KEY", false); + let endpoint = quickwit_common::get_from_env_opt("TEST_OVH_S3_ENDPOINT_URL", false); + // OMK enabled on this bucket + let bucket = "s3://ambitious-walton"; + + let config_enc = S3StorageConfig { + access_key_id: access_key_id.clone(), + secret_access_key: secret_access_key.clone(), + region: Some("eu-west-par".to_string()), + endpoint: endpoint.clone(), + encryption: Some(S3EncryptionConfig::SseC { + key: "Ia6VCtDyKCRi3N88na+m7pgiMNeicLVq70Swq1fdDOU=".to_string(), + key_md5: "viLkS9avbUl3bNFYDdAJRQ==".to_string(), + }), + ..Default::default() + }; + let storage_enc = S3CompatibleObjectStorage::from_uri(&config_enc, &Uri::for_test(bucket)) + .await + .unwrap(); + + let config_plain = S3StorageConfig { + access_key_id: access_key_id.clone(), + secret_access_key: secret_access_key.clone(), + region: Some("eu-west-par".to_string()), + endpoint: endpoint.clone(), + encryption: None, + ..Default::default() + }; + let storage_plain = + S3CompatibleObjectStorage::from_uri(&config_plain, &Uri::for_test(bucket)) + .await + .unwrap(); + + storage_enc + .put( + Path::new("hello_enc"), + Box::new("Test content".as_bytes().to_vec()), + ) + .await + .unwrap(); + + storage_plain + .put( + Path::new("hello_plain"), + Box::new("Test content".as_bytes().to_vec()), + ) + .await + .unwrap(); + + // Nominal SSE-C path (write and read with the key) + let enc_obj_enc_read = storage_enc.get_all(Path::new("hello_enc")).await.unwrap(); + assert_eq!(enc_obj_enc_read, "Test content".as_bytes()); + let num_bytes = storage_enc + .file_num_bytes(Path::new("hello_enc")) + .await + .unwrap(); + assert_eq!(num_bytes, "Test content".len() as u64); + + // When the object was encrypted using OMK, you cannot read plain + // objects with the encryption headers + storage_enc + .get_all(Path::new("hello_plain")) + .await + .unwrap_err(); + + // Nominal plain-text path. 
The auto-encryption assertion will pass only + // on OVH buckets with SSE-OMK + let plain_obj_plain_read = storage_plain + .get_all(Path::new("hello_plain")) + .await + .unwrap(); + assert_eq!(plain_obj_plain_read, "Test content".as_bytes()); + assert_auto_encrypted(&storage_plain, Path::new("hello_plain")).await; + + // Encrypted objects should not be readable without the key + storage_plain + .get_all(Path::new("hello_enc")) + .await + .unwrap_err(); + } + + /// Requires an OVH bucket **with encryption disabled**. + #[tokio::test] + #[ignore] + async fn test_ovh_plain_s3_encryption() { + let access_key_id = quickwit_common::get_from_env_opt("TEST_OVH_S3_ACCESS_KEY_ID", false); + let secret_access_key = + quickwit_common::get_from_env_opt("TEST_OVH_S3_SECRET_ACCESS_KEY", false); + let endpoint = quickwit_common::get_from_env_opt("TEST_OVH_S3_ENDPOINT_URL", false); + // OMK disabled on this bucket + let bucket = "s3://dramatic-akasaki-plaintext"; + + let config_enc = S3StorageConfig { + access_key_id: access_key_id.clone(), + secret_access_key: secret_access_key.clone(), + region: Some("eu-west-par".to_string()), + endpoint: endpoint.clone(), + encryption: Some(S3EncryptionConfig::SseC { + key: "Ia6VCtDyKCRi3N88na+m7pgiMNeicLVq70Swq1fdDOU=".to_string(), + key_md5: "viLkS9avbUl3bNFYDdAJRQ==".to_string(), + }), + ..Default::default() + }; + let storage_enc = S3CompatibleObjectStorage::from_uri(&config_enc, &Uri::for_test(bucket)) + .await + .unwrap(); + + let config_plain = S3StorageConfig { + access_key_id: access_key_id.clone(), + secret_access_key: secret_access_key.clone(), + region: Some("eu-west-par".to_string()), + endpoint: endpoint.clone(), + encryption: None, + ..Default::default() + }; + let storage_plain = + S3CompatibleObjectStorage::from_uri(&config_plain, &Uri::for_test(bucket)) + .await + .unwrap(); + + storage_enc + .put( + Path::new("hello_enc"), + Box::new("Test content".as_bytes().to_vec()), + ) + .await + .unwrap(); + + storage_plain + .put( + Path::new("hello_plain"), + Box::new("Test content".as_bytes().to_vec()), + ) + .await + .unwrap(); + + // Nominal SSE-C path (write and read with the key) + let enc_obj_enc_read = storage_enc.get_all(Path::new("hello_enc")).await.unwrap(); + assert_eq!(enc_obj_enc_read, "Test content".as_bytes()); + + // When encryption is disabled on the bucket, you can read plain-text objects with the + // encryption headers + let plain_obj_enc_read = storage_enc.get_all(Path::new("hello_plain")).await.unwrap(); + assert_eq!(plain_obj_enc_read, "Test content".as_bytes()); + let num_bytes = storage_enc + .file_num_bytes(Path::new("hello_enc")) + .await + .unwrap(); + assert_eq!(num_bytes, "Test content".len() as u64); + + // Nominal plain-text path (write and read without the key) + let plain_obj_plain_read = storage_plain + .get_all(Path::new("hello_plain")) + .await + .unwrap(); + assert_eq!(plain_obj_plain_read, "Test content".as_bytes()); + let num_bytes = storage_enc + .file_num_bytes(Path::new("hello_enc")) + .await + .unwrap(); + assert_eq!(num_bytes, "Test content".len() as u64); + + // Encrypted objects should not be readable without the key + storage_plain + .get_all(Path::new("hello_enc")) + .await + .unwrap_err(); + } }
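A note on the `S3EncryptionConfig::SseC` fields introduced in storage_config.rs: per the enum's doc comment, `key` is the base64 encoding of the raw 256-bit customer key and `key_md5` is the base64 encoding of the MD5 digest of that same raw key (not of its base64 form). Below is a minimal sketch of producing such a pair, assuming the `rand`, `md5`, and `base64` crates (none of which this change depends on) and a purely illustrative helper name:

use base64::prelude::*;

/// Illustrative only, not part of this diff: build a (`key`, `key_md5`) pair
/// in the format expected by `S3EncryptionConfig::SseC`.
fn generate_sse_c_key_pair() -> (String, String) {
    // 256-bit random customer key, kept as raw bytes.
    let raw_key: [u8; 32] = rand::random();
    // `key` is the base64 of the raw key.
    let key = BASE64_STANDARD.encode(raw_key);
    // `key_md5` is the base64 of the MD5 digest of the raw (binary) key.
    let key_md5 = BASE64_STANDARD.encode(md5::compute(raw_key).0);
    (key, key_md5)
}

The two strings slot into the `encryption` block exercised by `test_storage_s3_config_encryption_serde` (`type: sse_c`, `key: ...`, `key_md5: ...`). As the doc comment notes, Akamai gen1 buckets deviate from this scheme and expect a 32 hex char key instead.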
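The SSE-C block (`set_sse_customer_algorithm` / `set_sse_customer_key` / `set_sse_customer_key_md5`) is repeated verbatim in the put, create-multipart-upload, upload-part, get-object, and head-object request builders. If that duplication ever becomes a maintenance concern, one possible consolidation, sketched here as a suggestion rather than as part of the change, is a small local macro; it works because each aws-sdk-s3 fluent builder involved exposes the same three setters (the macro name `apply_sse_c` is hypothetical):

use quickwit_config::S3EncryptionConfig;

// Hypothetical helper, not part of this diff: apply the SSE-C settings to any
// aws-sdk-s3 fluent request builder exposing the `set_sse_customer_*` setters.
macro_rules! apply_sse_c {
    ($req_builder:expr, $encryption:expr) => {
        match $encryption {
            Some(S3EncryptionConfig::SseC { key, key_md5 }) => $req_builder
                .set_sse_customer_algorithm(Some("AES256".to_string()))
                .set_sse_customer_key(Some(key.clone()))
                .set_sse_customer_key_md5(Some(key_md5.clone())),
            None => $req_builder,
        }
    };
}

// Example call site, mirroring the shape used in the diff:
// req_builder = apply_sse_c!(req_builder, &self.encryption);

Each arm returns the builder itself, so the macro can be used in expression position exactly where the repeated `match` blocks sit today, without changing behavior.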