diff --git a/singer-commons/src/main/thrift/config.thrift b/singer-commons/src/main/thrift/config.thrift index 46e574ad..60c41120 100644 --- a/singer-commons/src/main/thrift/config.thrift +++ b/singer-commons/src/main/thrift/config.thrift @@ -179,6 +179,9 @@ struct S3WriterConfig { 11: optional string region = "us-east-1"; // Use absolute path for filenamePattern regex matching. 12: optional bool matchAbsolutePath = false; + // The content type header to set for uploaded S3 objects. + // e.g. "application/json", "text/plain", etc. + 13: optional string contentType; } enum RealpinObjectType { diff --git a/singer/src/main/java/com/pinterest/singer/common/SingerConfigDef.java b/singer/src/main/java/com/pinterest/singer/common/SingerConfigDef.java index 200a5814..89b51a7f 100644 --- a/singer/src/main/java/com/pinterest/singer/common/SingerConfigDef.java +++ b/singer/src/main/java/com/pinterest/singer/common/SingerConfigDef.java @@ -127,5 +127,6 @@ public class SingerConfigDef { public static final String UPLOADER_CLASS = "uploaderClass"; public static final String REGION = "region"; public static final String MATCH_ABSOLUTE_PATH = "matchAbsolutePath"; + public static final String CONTENT_TYPE = "contentType"; public static final String NAMED_GROUP_PATTERN = "\\(\\?<([a-zA-Z][a-zA-Z0-9]*)>"; } \ No newline at end of file diff --git a/singer/src/main/java/com/pinterest/singer/utils/LogConfigUtils.java b/singer/src/main/java/com/pinterest/singer/utils/LogConfigUtils.java index 7d843fb5..3e1657f2 100644 --- a/singer/src/main/java/com/pinterest/singer/utils/LogConfigUtils.java +++ b/singer/src/main/java/com/pinterest/singer/utils/LogConfigUtils.java @@ -1033,6 +1033,11 @@ private static S3WriterConfig parseS3WriterConfig(AbstractConfiguration writerCo } config.setRegion(writerConfiguration.getString(SingerConfigDef.REGION)); } + + if (writerConfiguration.containsKey(SingerConfigDef.CONTENT_TYPE)) { + config.setContentType(writerConfiguration.getString(SingerConfigDef.CONTENT_TYPE)); + } + return config; } diff --git a/singer/src/main/java/com/pinterest/singer/writer/s3/PutObjectUploader.java b/singer/src/main/java/com/pinterest/singer/writer/s3/PutObjectUploader.java index 03d0cee3..7f408cd7 100644 --- a/singer/src/main/java/com/pinterest/singer/writer/s3/PutObjectUploader.java +++ b/singer/src/main/java/com/pinterest/singer/writer/s3/PutObjectUploader.java @@ -13,19 +13,20 @@ import java.io.File; public class PutObjectUploader extends S3Uploader { + private static final Logger LOG = LoggerFactory.getLogger(PutObjectUploader.class); private final int maxRetries; private static final long INITIAL_BACKOFF = 1000; // Initial backoff in milliseconds private static final long MAX_BACKOFF = 32000; // Maximum backoff in milliseconds public PutObjectUploader(S3WriterConfig s3WriterConfig, S3Client s3Client) { - super(s3WriterConfig, s3Client); - this.maxRetries = s3WriterConfig.getMaxRetries(); + super(s3WriterConfig, s3Client); + this.maxRetries = s3WriterConfig.getMaxRetries(); } /** - * Uploads a file to S3 using the PutObject API. - * Uses exponential backoff with a cap for retries. + * Uploads a file to S3 using the PutObject API. Uses exponential backoff with a cap for + * retries. * * @param s3ObjectUpload the object to upload * @return true if the file was successfully uploaded, false otherwise @@ -40,15 +41,20 @@ public boolean upload(S3ObjectUpload s3ObjectUpload) { while (attempts < maxRetries && !success) { attempts++; try { - PutObjectRequest putObjectRequest = PutObjectRequest.builder() - .bucket(bucket) - .key(s3Key) - .build(); + PutObjectRequest.Builder putObjectRequestBuilder = PutObjectRequest.builder() + .bucket(bucket) + .key(s3Key); + // Add optional fields if present if (cannedAcl != null) { - putObjectRequest = putObjectRequest.toBuilder().acl(cannedAcl).build(); + putObjectRequestBuilder = putObjectRequestBuilder.acl(cannedAcl); + } + if (contentType != null && !contentType.isEmpty()) { + putObjectRequestBuilder = putObjectRequestBuilder.contentType(contentType); } + PutObjectRequest putObjectRequest = putObjectRequestBuilder.build(); + PutObjectResponse putObjectResponse = s3Client.putObject(putObjectRequest, s3ObjectUpload.getFile().toPath()); diff --git a/singer/src/main/java/com/pinterest/singer/writer/s3/S3Uploader.java b/singer/src/main/java/com/pinterest/singer/writer/s3/S3Uploader.java index 0fff5629..358b6aae 100644 --- a/singer/src/main/java/com/pinterest/singer/writer/s3/S3Uploader.java +++ b/singer/src/main/java/com/pinterest/singer/writer/s3/S3Uploader.java @@ -12,12 +12,14 @@ public abstract class S3Uploader { protected S3WriterConfig s3WriterConfig; protected final ObjectCannedACL cannedAcl; protected final String bucket; + protected final String contentType; protected S3Client s3Client; public S3Uploader(S3WriterConfig s3WriterConfig, S3Client s3Client) { this.s3WriterConfig = s3WriterConfig; this.bucket = s3WriterConfig.getBucket(); this.cannedAcl = ObjectCannedACL.fromValue(s3WriterConfig.getCannedAcl()); + this.contentType = s3WriterConfig.getContentType(); this.s3Client = s3Client; } diff --git a/singer/src/test/java/com/pinterest/singer/utils/TestLogConfigUtils.java b/singer/src/test/java/com/pinterest/singer/utils/TestLogConfigUtils.java index adbda0da..af9aa031 100644 --- a/singer/src/test/java/com/pinterest/singer/utils/TestLogConfigUtils.java +++ b/singer/src/test/java/com/pinterest/singer/utils/TestLogConfigUtils.java @@ -471,7 +471,7 @@ public void testS3WriterConfigurations() throws Exception { "type=s3\n" + "s3.bucket=my-fav-bucket\n" + "s3.keyFormat=%{service}/%{index}/my_log\n" + "s3.maxFileSizeMB=100\n" + "s3.minUploadTimeInSeconds=1\n" + "s3.maxRetries=10\n" + "s3.filenamePattern=^(?[a-zA-Z0-9]+)_.*_(?\\\\d+)\\\\.log$\n" - + "s3.cannedAcl=bucket-owner-full-control"; + + "s3.cannedAcl=bucket-owner-full-control\n" + "s3.contentType=application/json"; PropertiesConfiguration conf = new PropertiesConfiguration(); conf.load(new ByteArrayInputStream(config.getBytes())); List tokens = new ArrayList<>();