Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions singer-commons/src/main/thrift/config.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@ struct S3WriterConfig {
11: optional string region = "us-east-1";
// Use absolute path for filenamePattern regex matching.
12: optional bool matchAbsolutePath = false;
// The content type header to set for uploaded S3 objects.
// e.g. "application/json", "text/plain", etc.
13: optional string contentType;
}

enum RealpinObjectType {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,5 +127,6 @@ public class SingerConfigDef {
public static final String UPLOADER_CLASS = "uploaderClass";
public static final String REGION = "region";
public static final String MATCH_ABSOLUTE_PATH = "matchAbsolutePath";
public static final String CONTENT_TYPE = "contentType";
public static final String NAMED_GROUP_PATTERN = "\\(\\?<([a-zA-Z][a-zA-Z0-9]*)>";
}
Original file line number Diff line number Diff line change
Expand Up @@ -1033,6 +1033,11 @@ private static S3WriterConfig parseS3WriterConfig(AbstractConfiguration writerCo
}
config.setRegion(writerConfiguration.getString(SingerConfigDef.REGION));
}

if (writerConfiguration.containsKey(SingerConfigDef.CONTENT_TYPE)) {
config.setContentType(writerConfiguration.getString(SingerConfigDef.CONTENT_TYPE));
}

return config;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,20 @@
import java.io.File;

public class PutObjectUploader extends S3Uploader {

private static final Logger LOG = LoggerFactory.getLogger(PutObjectUploader.class);
private final int maxRetries;
private static final long INITIAL_BACKOFF = 1000; // Initial backoff in milliseconds
private static final long MAX_BACKOFF = 32000; // Maximum backoff in milliseconds

public PutObjectUploader(S3WriterConfig s3WriterConfig, S3Client s3Client) {
super(s3WriterConfig, s3Client);
this.maxRetries = s3WriterConfig.getMaxRetries();
super(s3WriterConfig, s3Client);
this.maxRetries = s3WriterConfig.getMaxRetries();
}

/**
* Uploads a file to S3 using the PutObject API.
* Uses exponential backoff with a cap for retries.
* Uploads a file to S3 using the PutObject API. Uses exponential backoff with a cap for
* retries.
*
* @param s3ObjectUpload the object to upload
* @return true if the file was successfully uploaded, false otherwise
Expand All @@ -40,15 +41,20 @@ public boolean upload(S3ObjectUpload s3ObjectUpload) {
while (attempts < maxRetries && !success) {
attempts++;
try {
PutObjectRequest putObjectRequest = PutObjectRequest.builder()
.bucket(bucket)
.key(s3Key)
.build();
PutObjectRequest.Builder putObjectRequestBuilder = PutObjectRequest.builder()
.bucket(bucket)
.key(s3Key);

// Add optional fields if present
if (cannedAcl != null) {
putObjectRequest = putObjectRequest.toBuilder().acl(cannedAcl).build();
putObjectRequestBuilder = putObjectRequestBuilder.acl(cannedAcl);
}
if (contentType != null && !contentType.isEmpty()) {
putObjectRequestBuilder = putObjectRequestBuilder.contentType(contentType);
}

PutObjectRequest putObjectRequest = putObjectRequestBuilder.build();

PutObjectResponse
putObjectResponse =
s3Client.putObject(putObjectRequest, s3ObjectUpload.getFile().toPath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ public abstract class S3Uploader {
protected S3WriterConfig s3WriterConfig;
protected final ObjectCannedACL cannedAcl;
protected final String bucket;
protected final String contentType;
protected S3Client s3Client;

public S3Uploader(S3WriterConfig s3WriterConfig, S3Client s3Client) {
this.s3WriterConfig = s3WriterConfig;
this.bucket = s3WriterConfig.getBucket();
this.cannedAcl = ObjectCannedACL.fromValue(s3WriterConfig.getCannedAcl());
this.contentType = s3WriterConfig.getContentType();
this.s3Client = s3Client;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ public void testS3WriterConfigurations() throws Exception {
"type=s3\n" + "s3.bucket=my-fav-bucket\n" + "s3.keyFormat=%{service}/%{index}/my_log\n"
+ "s3.maxFileSizeMB=100\n" + "s3.minUploadTimeInSeconds=1\n" + "s3.maxRetries=10\n"
+ "s3.filenamePattern=^(?<service>[a-zA-Z0-9]+)_.*_(?<index>\\\\d+)\\\\.log$\n"
+ "s3.cannedAcl=bucket-owner-full-control";
+ "s3.cannedAcl=bucket-owner-full-control\n" + "s3.contentType=application/json";
PropertiesConfiguration conf = new PropertiesConfiguration();
conf.load(new ByteArrayInputStream(config.getBytes()));
List<String> tokens = new ArrayList<>();
Expand Down