diff --git a/singer/src/main/java/com/pinterest/singer/writer/s3/S3Writer.java b/singer/src/main/java/com/pinterest/singer/writer/s3/S3Writer.java
index 93ff95e6..0ddf2d67 100644
--- a/singer/src/main/java/com/pinterest/singer/writer/s3/S3Writer.java
+++ b/singer/src/main/java/com/pinterest/singer/writer/s3/S3Writer.java
@@ -14,6 +14,7 @@
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.text.StringSubstitutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -24,7 +25,6 @@
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
@@ -33,13 +33,13 @@
 import java.util.Date;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import java.text.SimpleDateFormat;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import software.amazon.awssdk.services.s3.S3Client;
 
@@ -51,11 +51,20 @@ public class S3Writer implements LogStreamWriter {
   private static final String HOSTNAME = SingerUtils.HOSTNAME;
   private static final Logger LOG = LoggerFactory.getLogger(S3Writer.class);
   private static final SimpleDateFormat FORMATTER = new SimpleDateFormat("yyyyMMddHHmmssSSS");
+  private static final int BYTES_IN_MB = 1024 * 1024;
+  private static final int TIME_UPLOAD_SCHEDULER_THREAD_POOL_SIZE = 2;
+
+  // Time-based upload scheduler
+  private static final ScheduledExecutorService TIME_UPLOAD_SCHEDULER =
+      Executors.newScheduledThreadPool(TIME_UPLOAD_SCHEDULER_THREAD_POOL_SIZE, new ThreadFactoryBuilder()
+          .setNameFormat("S3Writer-TimeUpload-%d")
+          .setDaemon(true)
+          .build());
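+  // Scheduler threads are daemons and the pool is shared by every S3Writer instance,
+  // so this never-shutdown executor does not prevent JVM exit.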
+
+  private final Map<String, String> envMappings = System.getenv();
   private final LogStream logStream;
   private final String logName;
   private final String BUFFER_DIR;
-  private static final int BYTES_IN_MB = 1024 * 1024;
   private S3Uploader s3Uploader;
   private final S3WriterConfig s3WriterConfig;
 
@@ -66,6 +75,7 @@ public class S3Writer implements LogStreamWriter {
   // Disk-buffered file that will eventually be uploaded to S3 if size or time thresholds are met
   private BufferedOutputStream bufferedOutputStream;
   private File bufferFile;
+  private volatile long bufferFileCreatedTimeMs = 0;
 
   // Custom Thresholds
   private int maxFileSizeMB;
@@ -75,16 +85,10 @@ public class S3Writer implements LogStreamWriter {
   private boolean filenameParsingEnabled = false;
   private boolean matchAbsolutePath;
 
-  // Timer for scheduling uploads
-  private static ScheduledExecutorService fileUploadTimer;
-  private Future<?> uploadFuture;
-  private final Object objLock = new Object(); // used for synchronization locking
+  // Time-based upload management
+  private Future<?> timeBasedUploadTask = null;
 
-  static {
-    ScheduledThreadPoolExecutor tmpTimer = new ScheduledThreadPoolExecutor(1);
-    tmpTimer.setRemoveOnCancelPolicy(true);
-    fileUploadTimer = tmpTimer;
-  }
+  private final Object objLock = new Object(); // used for synchronization locking
 
   public enum DefaultTokens {
     UUID,
@@ -93,6 +97,12 @@ public enum DefaultTokens {
     LOGNAME;
   }
 
+  public enum TriggerType {
+    TIME,
+    SIZE,
+    CLOSE;
+  }
+
   /**
    * Constructs an S3Writer instance.
    *
@@ -113,7 +123,7 @@ public S3Writer(LogStream logStream, S3WriterConfig s3WriterConfig) {
   // Static factory method for testing
   @VisibleForTesting
   public S3Writer(LogStream logStream, S3WriterConfig s3WriterConfig, S3Uploader s3Uploader,
-      String path) {
+                  String path) {
     Preconditions.checkNotNull(logStream);
     Preconditions.checkNotNull(s3WriterConfig);
     Preconditions.checkNotNull(s3Uploader);
@@ -137,10 +147,26 @@ private void initialize() {
       this.fileNameTokens = s3WriterConfig.getFilenameTokens();
     }
 
+    scheduleTimeBasedUploadCheck();
+
     // Create directory if it does not exist
     new File(BUFFER_DIR).mkdirs();
     try {
-      resetBufferFile();
+      // Look for existing buffer file first (on startup/new log stream)
+      File existingBuffer = findExistingBufferFile();
+      if (existingBuffer != null) {
+        bufferFile = existingBuffer;
+        bufferFileCreatedTimeMs = extractTimestampFromFilename(bufferFile.getName());
+        if (bufferFileCreatedTimeMs == 0) {
+          LOG.warn("Failed to extract timestamp from existing buffer file: {}, using current time", bufferFile.getName());
+          bufferFileCreatedTimeMs = System.currentTimeMillis();
+        }
+        LOG.info("Found existing buffer file for log stream {}: {}, timestamp: {}",
+            logName, bufferFile.getName(), bufferFileCreatedTimeMs);
+        bufferedOutputStream = new BufferedOutputStream(new FileOutputStream(bufferFile, true));
+      } else {
+        resetBufferFile();
+      }
     } catch (IOException e) {
       throw new RuntimeException("Failed to create buffer file", e);
     }
@@ -183,8 +209,8 @@ public boolean isCommittableWriter() {
   }
 
   /**
-   * Get or construct buffer file name based on the log stream name and absolute path hash.
-   * The buffer file naming convention is "log_name.absolute_path_hash.buffer".
+   * Get or construct buffer file name based on the log stream name, absolute path hash, and timestamp.
+   * The buffer file naming convention is "log_name.absolute_path_hash.timestamp".
    *
    * @return the buffer file name
    */
@@ -192,6 +218,10 @@ public String getBufferFileName() {
     if (bufferFile != null) {
       return bufferFile.getName();
     }
+    return generateBufferFileName(System.currentTimeMillis());
+  }
+
+  private String generateBufferFileName(long timestamp) {
     try {
       MessageDigest digest = MessageDigest.getInstance("SHA-256");
       byte[] hash = digest.digest(logStream.getFullPathPrefix().getBytes(StandardCharsets.UTF_8));
@@ -202,7 +232,7 @@ public String getBufferFileName() {
         if (hex.length() == 1) hashedPath.append('0');
         hashedPath.append(hex);
       }
-      return logName + "." + hashedPath + ".buffer";
+      return logName + "." + hashedPath + "." + timestamp;
     } catch (Exception e) {
       if (e instanceof NoSuchAlgorithmException) {
         LOG.error("Failed to generate hash for log stream {} and absolute path {}", logName,
@@ -224,68 +254,111 @@ public String getBufferFileName() {
   public synchronized void startCommit(boolean isDraining) throws LogStreamWriterException {
     try {
       if (!bufferFile.exists()) {
+        // Buffer file missing - create a new one (recovery case)
         resetBufferFile();
       }
-      bufferedOutputStream = new BufferedOutputStream(new FileOutputStream(bufferFile, true));
+      // If buffer exists, bufferedOutputStream should already be set up correctly
     } catch (IOException e) {
       throw new RuntimeException("Failed to create buffer file: " + getBufferFileName(), e);
     }
-    if (uploadFuture == null) {
-      scheduleUploadTask();
-    }
   }
 
   /**
-   * Schedules a task to upload the buffer file at regular intervals.
-   * If the buffer file exists and has data, it is renamed and a new buffer file is created.
-   * The renamed file is then uploaded to S3.
+   * Schedules a periodic task to check for time-based uploads.
+   * Runs every 1/4 of the minUploadTime to ensure timely uploads even when no new messages arrive.
    */
-  private void scheduleUploadTask() {
-    synchronized (objLock) {
-      if (uploadFuture != null && !uploadFuture.isDone()) {
-        LOG.info("An upload task is already scheduled or running");
-        return;
-      }
-      uploadFuture = fileUploadTimer.schedule(() -> {
-        try {
-          synchronized (objLock) {
-            if (bufferFile.length() > 0) {
-              bufferedOutputStream.close();
-              uploadDiskBufferedFileToS3();
+  private void scheduleTimeBasedUploadCheck() {
+    if (minUploadTime <= 0) {
+      return;
+    }
+
+    // Check every 1/4 of the upload time interval, minimum of 1 second
+    long checkIntervalSeconds = Math.max(1, minUploadTime / 4);
+
+    timeBasedUploadTask = TIME_UPLOAD_SCHEDULER.scheduleWithFixedDelay(() -> {
+      try {
+        synchronized (objLock) {
+          // Check if buffer should be uploaded based on time
+          if (bufferFile != null && bufferFile.exists() && bufferFile.length() > 0) {
+            if (bufferFileCreatedTimeMs == 0) {
+              LOG.warn("Buffer file creation time not set for {} - skipping time-based upload",
+                  bufferFile.getName());
+            } else {
+              long currentTimeMs = System.currentTimeMillis();
+              long ageInSeconds = (currentTimeMs - bufferFileCreatedTimeMs) / 1000;
+              if (ageInSeconds >= minUploadTime) {
+                LOG.info("Buffer file for log {} is {} seconds old, minUploadTime is {}", logName, ageInSeconds, minUploadTime);
+                LOG.info("Periodic time-based upload triggered for log {}", logName);
+                uploadDiskBufferedFileToS3(TriggerType.TIME);
+              }
             }
-            uploadFuture = null;
           }
-        } catch (Exception e) {
-          LOG.error("Failed to upload buffer file to S3", e);
         }
-      }, minUploadTime, TimeUnit.SECONDS);
-    }
+      } catch (Exception e) {
+        LOG.error("Error in time-based upload checker for log {}", logName, e);
+      }
+    }, checkIntervalSeconds, checkIntervalSeconds, TimeUnit.SECONDS);
   }
 
   /**
-   * Helper function that uploads the disk buffered file to s3
-   * */
-  private void uploadDiskBufferedFileToS3() throws IOException {
-    File
-        fileToUpload =
-        new File(BUFFER_DIR, getBufferFileName() + "." + FORMATTER.format(new Date()));
-    String fileFormat = generateS3ObjectKey();
+   * Uploads the disk buffered file to S3 and manages bufferedOutputStream lifecycle.
+   *
+   * This method handles the upload cycle:
+   * 1. Closes the current bufferedOutputStream to ensure data is flushed to disk
+   * 2. Uploads the buffer file to S3
+   * 3. Cleans up the old buffer file and creates a new one
+   * 4. Reopens the bufferedOutputStream for continued writing
+   *
+   * If upload fails, the buffer file remains intact for crash recovery.
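+   *
+   * Illustrative example (hypothetical names): a TIME trigger on buffer
+   * "app_log.3f2a9c.1714003200000" closes the stream, uploads the file under the key
+   * from generateS3ObjectKey(), deletes the local file, and resetBufferFile() then
+   * creates "app_log.3f2a9c.&lt;current millis&gt;" with a fresh stream.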
+   *
+   * @param triggerType the type of trigger that initiated this upload (time, size, or close)
+   * @throws LogStreamWriterException if S3 upload fails or stream management fails
+   */
+  private void uploadDiskBufferedFileToS3(TriggerType triggerType) throws LogStreamWriterException {
+    // Close current stream to ensure all data is written to disk
     try {
-      Files.move(bufferFile.toPath(), fileToUpload.toPath());
-      resetBufferFile();
-      if (this.s3Uploader.upload(new S3ObjectUpload(fileFormat, fileToUpload))) {
-        OpenTsdbMetricConverter.incr(SingerMetrics.S3_WRITER + "num_uploads", 1,
-            "bucket=" + bucketName, "host=" + HOSTNAME,
-            "logName=" + logName);
-      } else {
-        OpenTsdbMetricConverter.incr(SingerMetrics.S3_WRITER + "num_failed_uploads", 1,
-            "bucket=" + bucketName, "host=" + HOSTNAME,
-            "logName=" + logName);
-      }
-      fileToUpload.delete();
+      bufferedOutputStream.close();
     } catch (IOException e) {
-      LOG.error("Failed to rename buffer file " + getBufferFileName(), e);
+      LOG.error("Failed to close bufferedOutputStream before S3 upload", e);
+      throw new LogStreamWriterException("Cannot close buffer stream before upload", e);
+    }
+
+    String s3Key = generateS3ObjectKey();
+    boolean uploadSuccess = this.s3Uploader.upload(new S3ObjectUpload(s3Key, bufferFile));
+
+    if (!uploadSuccess) {
+      LOG.error("S3 upload failed for buffer file {}", getBufferFileName());
+      OpenTsdbMetricConverter.incr(SingerMetrics.S3_WRITER + "num_failed_uploads", 1,
+          "bucket=" + bucketName, "host=" + HOSTNAME, "logName=" + logName, "trigger_type=" + triggerType.name());
+      throw new LogStreamWriterException("Buffer file S3 upload failed for log stream " + logName);
+    }
+
+    OpenTsdbMetricConverter.incr(SingerMetrics.S3_WRITER + "num_uploads", 1,
+        "bucket=" + bucketName, "host=" + HOSTNAME, "logName=" + logName, "trigger_type=" + triggerType.name());
+    LOG.info("Successfully uploaded buffer file {}", getBufferFileName());
+
+    // Always clean up buffer file and reopen stream after successful upload
+    boolean deleted = bufferFile.delete();
+    if (deleted) {
+      OpenTsdbMetricConverter.incr(SingerMetrics.S3_WRITER + "buffer_file_delete", 1,
+          "bucket=" + bucketName, "host=" + HOSTNAME, "logName=" + logName, "success=true");
+      LOG.debug("Deleted buffer file after successful upload: {}", getBufferFileName());
+    } else {
+      OpenTsdbMetricConverter.incr(SingerMetrics.S3_WRITER + "buffer_file_delete", 1,
+          "bucket=" + bucketName, "host=" + HOSTNAME, "logName=" + logName, "success=false");
+      LOG.warn("Failed to delete buffer file after successful upload: {}", getBufferFileName());
+    }
+
+    // Only create new buffer and reopen stream if not during close
+    if (triggerType != TriggerType.CLOSE) {
+      try {
+        resetBufferFile();
+      } catch (IOException e) {
+        LOG.error("Failed to reset buffer file after successful S3 upload of {}", getBufferFileName(), e);
+        OpenTsdbMetricConverter.incr(SingerMetrics.S3_WRITER + "buffer_reset_failed", 1,
+            "bucket=" + bucketName, "host=" + HOSTNAME, "logName=" + logName);
+        throw new LogStreamWriterException("IO exception after successful S3 upload", e);
+      }
     }
   }
 
@@ -298,52 +371,86 @@ private void uploadDiskBufferedFileToS3() throws IOException {
    */
   @Override
   public synchronized void writeLogMessageToCommit(LogMessageAndPosition logMessageAndPosition,
-      boolean isDraining)
+                                                   boolean isDraining)
       throws LogStreamWriterException {
+
+    synchronized (objLock) {
+      long currentBufferSize = bufferFile.length();
+      long messageSize = logMessageAndPosition.logMessage.getMessage().length;
+      long effectiveBufferSize = currentBufferSize + messageSize;
+
+      // Check if buffer size threshold is exceeded
+      if (effectiveBufferSize >= maxFileSizeMB * BYTES_IN_MB) {
+        LOG.info("Buffer file size {} has exceeded the size threshold of {}, attempting S3 upload",
+            effectiveBufferSize, maxFileSizeMB * BYTES_IN_MB);
+        uploadDiskBufferedFileToS3(TriggerType.SIZE);
+      }
+    }
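+    // The check adds the incoming message size, so the existing buffer is uploaded
+    // before an arriving message would push it past the threshold; a failed upload
+    // throws and leaves the buffer untouched.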
+
     try {
-      writeMessageToBuffer(logMessageAndPosition);
+      byte[] logMessageBytes = logMessageAndPosition.logMessage.getMessage();
+      bufferedOutputStream.write(logMessageBytes);
+      // Don't flush after each message - defer to endCommit
       OpenTsdbMetricConverter.incr(SingerMetrics.S3_WRITER + "num_messages_written",
           "bucket=" + bucketName, "host=" + HOSTNAME, "logName=" + logName);
     } catch (IOException e) {
-      // TODO: Verify if this is retry is needed
-      try {
-        bufferedOutputStream = new BufferedOutputStream(new FileOutputStream(bufferFile, true));
-        writeMessageToBuffer(logMessageAndPosition);
-      } catch (IOException ex) {
-        LOG.error("Failed to close buffer writer", ex);
-        throw new LogStreamWriterException("Failed to write log message to commit", e);
-      }
+      LOG.error("Failed to write message to buffer file", e);
+      throw new LogStreamWriterException("Failed to write log message to commit", e);
     }
   }
 
   /**
-   * This method writes the bytes of the log message to the buffered output stream
-   * and adds a newline character after each log message. It then flushes the output stream
-   * to ensure that the data is written to the buffer file.
+   * Resets the buffer file by creating a fresh one with a new timestamp.
+   * Used after successful uploads to start clean.
    *
-   * @param logMessageAndPosition the log message and its position to be written to the buffer
-   * @throws IOException if an error occurs while writing to the buffer file
+   * @throws IOException if the new buffer file cannot be created or opened
    */
-  private void writeMessageToBuffer(LogMessageAndPosition logMessageAndPosition)
-      throws IOException {
-    byte[] logMessageBytes = logMessageAndPosition.logMessage.getMessage();
-    bufferedOutputStream.write(logMessageBytes);
-    bufferedOutputStream.flush();
+  private void resetBufferFile() throws IOException {
+    if (bufferFile != null && bufferFile.exists()) {
+      LOG.warn("Buffer file still exists, skipping reset");
+      return;
+    }
+    long currentTime = System.currentTimeMillis();
+    String newFileName = generateBufferFileName(currentTime);
+    bufferFile = new File(BUFFER_DIR, newFileName);
+    bufferFileCreatedTimeMs = currentTime;
+    if (bufferFile.createNewFile()) {
+      LOG.info("Created new buffer file for log stream {}: {}", logName, newFileName);
+    }
+    bufferedOutputStream = new BufferedOutputStream(new FileOutputStream(bufferFile, true));
   }
 
   /**
-   * Resets the buffer file by creating a new buffer file and buffered output stream.
-   *
-   * @throws IOException
+   * Find existing buffer file for this log stream.
    */
-  private void resetBufferFile() throws IOException {
-    bufferFile = new File(BUFFER_DIR, getBufferFileName());
-    if (!bufferFile.createNewFile()) {
-      LOG.info(
-          "Buffer file for log stream {} already exists, continue with existing buffer file: {}",
-          logName, getBufferFileName());
+  private File findExistingBufferFile() {
+    File bufferDir = new File(BUFFER_DIR);
+    if (!bufferDir.exists()) {
+      return null;
    }
-    bufferedOutputStream = new BufferedOutputStream(new FileOutputStream(bufferFile, true));
+
+    // Generate expected filename prefix
+    // but without the timestamp part: logName.hash.
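+    // e.g. (illustrative) prefix "app_log.3f2a9c." matches buffer files
+    // "app_log.3f2a9c.1714003200000" and "app_log.3f2a9c.1714003300000"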
+    String dummyFileName = generateBufferFileName(0); // Use timestamp 0 as placeholder
+    String expectedPrefix = dummyFileName.substring(0, dummyFileName.lastIndexOf('.') + 1); // Keep "logName.hash."
+
+    File[] existingFiles = bufferDir.listFiles((dir, name) -> name.startsWith(expectedPrefix));
+    if (existingFiles == null || existingFiles.length == 0) {
+      return null;
+    }
+
+    // If multiple files exist, use the one with the latest timestamp from the filename
+    File bestMatch = null;
+    long latestTimestamp = 0;
+
+    for (File file : existingFiles) {
+      long timestamp = extractTimestampFromFilename(file.getName());
+      if (timestamp > latestTimestamp) {
+        latestTimestamp = timestamp;
+        bestMatch = file;
+      }
+    }
+    return bestMatch;
   }
 
   private Matcher extractTokensFromFilename(String logFileName) {
@@ -356,6 +463,29 @@ private Matcher extractTokensFromFilename(String logFileName) {
     return matcher;
   }
 
+  /**
+   * Extract timestamp from buffer filename.
+   * Expected format: logName.hash.timestamp
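+   * e.g. (illustrative) "app_log.3f2a9c.1714003200000" yields 1714003200000; only the
+   * last dot-separated segment is parsed, so dots inside logName are tolerated.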
+   */
+  private long extractTimestampFromFilename(String filename) {
+    if (filename == null) {
+      return 0;
+    }
+
+    String[] parts = filename.split("\\.");
+    if (parts.length >= 3) {
+      try {
+        // Last part should be timestamp
+        return Long.parseLong(parts[parts.length - 1]);
+      } catch (NumberFormatException e) {
+        LOG.warn("Failed to parse timestamp from filename: {}", filename);
+        return 0;
+      }
+    }
+
+    return 0;
+  }
+
   /**
    * Generates a map of default token values that can be used in the key format.
    *
@@ -454,23 +584,27 @@ public String generateS3ObjectKey() {
    *
    * @param numLogMessagesRead the number of log messages read
    * @param isDraining whether the system is in a draining state
-   * @throws LogStreamWriterException if an error occurs while ending the commit
+   * @throws LogStreamWriterException if an error occurs while ending the commit or if S3 upload fails when the buffer is full
    */
   public synchronized void endCommit(int numLogMessagesRead, boolean isDraining)
       throws LogStreamWriterException {
-    try {
-      synchronized (objLock) {
-        if (bufferFile.length() >= maxFileSizeMB * BYTES_IN_MB) {
-          if (uploadFuture != null) {
-            uploadFuture.cancel(true);
-          }
-          bufferedOutputStream.close();
-          uploadDiskBufferedFileToS3();
-          scheduleUploadTask();
-        }
+    synchronized (objLock) {
+      // Flush all buffered writes to disk
+      try {
+        bufferedOutputStream.flush();
+      } catch (IOException e) {
+        LOG.error("Failed to flush buffer file after batch write", e);
+        throw new LogStreamWriterException("Failed to flush batch writes to disk", e);
+      }
+
+      long currentBufferSize = bufferFile.length();
+
+      // Check if buffer size threshold is exceeded
+      if (currentBufferSize >= maxFileSizeMB * BYTES_IN_MB) {
+        LOG.info("Buffer file size {} has exceeded the size threshold of {}, attempting S3 upload",
+            currentBufferSize, maxFileSizeMB * BYTES_IN_MB);
+        uploadDiskBufferedFileToS3(TriggerType.SIZE);
       }
-    } catch (IOException e) {
-      throw new LogStreamWriterException("Failed to end commit", e);
     }
   }
 
@@ -487,50 +621,22 @@ public void writeLogMessages(List<LogMessage> messages) throws LogStreamWriterEx
   }
 
   /**
-   * Closes the S3Writer, ensuring that remaining buffered log messages are safely uploaded to S3.
-   *
-   * This method synchronizes on {@code objLock} to ensure thread safety while performing the
-   * following steps:
-   * Increments the close counter metric in OpenTsdb.
-   * If the buffer file has remaining data, it renames the file for upload, closes the buffer
-   * stream,
-   * and uploads the file to S3, recording relevant metrics.
-   * Cancels the upload future task if it is not already done.
-   * Attempts to close the buffered output stream if it is still open.
-   * Closes the S3 client connection.
-   * Any errors during these operations are logged accordingly.
+   * Closes the S3Writer by flushing any buffered data to disk and cancelling scheduled tasks.
    *
-   * @throws IOException
+   * @throws IOException if there's an error closing the buffered output stream
    */
   @Override
   public void close() throws IOException {
-    synchronized (objLock) {
-      OpenTsdbMetricConverter.incr(SingerMetrics.S3_WRITER + "num_singer_close", 1,
-          "bucket=" + bucketName, "host=" + HOSTNAME,
-          "logName=" + logName);
-
-      if (bufferFile.length() > 0) {
-        try {
-          bufferedOutputStream.close();
-          uploadDiskBufferedFileToS3();
-          bufferFile.delete();
-        } catch (IOException e) {
-          LOG.error("Failed to close bufferedWriter or upload buffer file: " + getBufferFileName(), e);
-        }
-      }
-
-      if (uploadFuture != null && !uploadFuture.isDone()) {
-        uploadFuture.cancel(true);
-      }
+    // Cancel periodic time-based upload task
+    if (timeBasedUploadTask != null && !timeBasedUploadTask.isCancelled()) {
+      timeBasedUploadTask.cancel(false);
+      LOG.debug("Cancelled time-based upload task for log {}", logName);
+    }
 
-      try {
-        if (bufferedOutputStream != null) {
-          bufferedOutputStream.close();
-        }
-      } catch (IOException e) {
-        LOG.error("Failed to close buffer writers", e);
-      }
-      // Refrain from closing the S3 client because it can be shared by other writers
+    // Close buffered output stream to ensure data is flushed to disk
+    if (bufferedOutputStream != null) {
+      bufferedOutputStream.close();
+      bufferedOutputStream = null;
     }
   }
 }
\ No newline at end of file
diff --git a/singer/src/test/java/com/pinterest/singer/writer/s3/S3WriterTest.java b/singer/src/test/java/com/pinterest/singer/writer/s3/S3WriterTest.java
index 751114ac..e24da4a7 100644
--- a/singer/src/test/java/com/pinterest/singer/writer/s3/S3WriterTest.java
+++ b/singer/src/test/java/com/pinterest/singer/writer/s3/S3WriterTest.java
@@ -3,6 +3,7 @@
 import com.pinterest.singer.SingerTestBase;
 import com.pinterest.singer.common.LogStream;
 import com.pinterest.singer.common.SingerLog;
+import com.pinterest.singer.common.errors.LogStreamWriterException;
 import com.pinterest.singer.thrift.LogMessage;
 import com.pinterest.singer.thrift.LogMessageAndPosition;
 import com.pinterest.singer.thrift.configuration.S3WriterConfig;
@@ -11,7 +12,6 @@
 import com.pinterest.singer.writer.s3.S3Writer.DefaultTokens;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.FilenameUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -28,6 +28,7 @@
 import static org.junit.Assert.assertNotEquals;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -97,78 +98,70 @@ public void testWriteLogMessageToCommit() throws Exception {
   }
 
   @Test
-  public void testUploadToS3WhenSizeThresholdMet() throws Exception {
-    // Prepare log message
-    LogMessage
-        logMessage =
-        new LogMessage(ByteBuffer.wrap(new byte[1024 * 1024])); // simulate 1MB message
-    LogMessageAndPosition logMessageAndPosition = new LogMessageAndPosition(logMessage, null);
-
-    // Mock upload behavior
-    when(mockS3Uploader.upload(any(S3ObjectUpload.class))).thenReturn(true);
-
-    // Write log messages to commit
-    s3Writer.startCommit(false);
-    for (int i = 0; i < 51; i++) { // Write enough to exceed the threshold
-      s3Writer.writeLogMessageToCommit(logMessageAndPosition, false);
-    }
-    s3Writer.endCommit(2, false);
-
-    // Verify upload was called
-    verify(mockS3Uploader, atLeastOnce()).upload(any(S3ObjectUpload.class));
-  }
+  public void testSmallMessagesDoNotTriggerSizeBasedUploads() throws Exception {
+    // Configure large buffer to ensure small messages don't trigger upload
+    s3WriterConfig.setMaxFileSizeMB(10); // 10MB limit
+    s3Writer = new S3Writer(logStream, s3WriterConfig, mockS3Uploader, tempPath);
 
-  @Test
-  public void testUploadIsScheduled() throws Exception {
-    // Prepare log message
-    ByteBuffer messageBuffer = ByteBuffer.wrap(new byte[1024]); // simulate 1KB message
+    // Prepare small message that won't exceed threshold
+    ByteBuffer messageBuffer = ByteBuffer.wrap("small test message".getBytes());
     LogMessage logMessage = new LogMessage(messageBuffer);
     LogMessageAndPosition logMessageAndPosition = new LogMessageAndPosition(logMessage, null);
 
-    // Mock upload behavior
-    when(mockS3Uploader.upload(any(S3ObjectUpload.class))).thenReturn(true);
-
-    // Write log messages to commit
+    // Write small message - should not trigger size-based upload
     s3Writer.startCommit(false);
     s3Writer.writeLogMessageToCommit(logMessageAndPosition, false);
-
-    // Simulate passage of time and scheduled upload
-    Thread.sleep((s3WriterConfig.getMinUploadTimeInSeconds() + 2) * 1000);
-
     s3Writer.endCommit(1, false);
 
-    // Verify upload was called
-    verify(mockS3Uploader, atLeastOnce()).upload(any(S3ObjectUpload.class));
+    verify(mockS3Uploader, never()).upload(any(S3ObjectUpload.class));
+
+    File bufferFile = new File(tempPath + "/" + s3Writer.getBufferFileName());
+    assertTrue("Buffer should contain data", bufferFile.exists() && bufferFile.length() > 0);
   }
 
   @Test
-  public void testResumeFromExistingBufferFile() throws Exception {
-    // Prepare log message
-    ByteBuffer messageBuffer = ByteBuffer.wrap("This is message 1 :".getBytes());
+  public void testResumeFromExistingBufferFileAfterCrash() throws Exception {
+    // Simulate crash scenario: write data but don't close properly (no upload)
+    ByteBuffer messageBuffer = ByteBuffer.wrap("This is message 1".getBytes());
     LogMessage logMessage = new LogMessage(messageBuffer);
     LogMessageAndPosition logMessageAndPosition = new LogMessageAndPosition(logMessage, null);
 
-    // Write log message to commit
     s3Writer.startCommit(false);
     s3Writer.writeLogMessageToCommit(logMessageAndPosition, false);
+    s3Writer.endCommit(1, false);
 
-    // Create a new S3Writer with the same buffer file and write another message to simulate resuming
-    S3Writer
-        newS3Writer =
-        new S3Writer(logStream, s3WriterConfig, mockS3Uploader, tempPath);
+    // Verify buffer file exists with first message
+    File bufferFile = new File(tempPath + "/" + s3Writer.getBufferFileName());
+    assertTrue(bufferFile.exists());
+    assertTrue(bufferFile.length() > 0);
+    String fileName = bufferFile.getName();
+
+    // Simulate crash - skip calling close()
+
+    // Create new S3Writer to simulate restart after crash
+    // It should find and resume from the existing buffer file
+    S3Writer newS3Writer = new S3Writer(logStream, s3WriterConfig, mockS3Uploader, tempPath);
+
+    // Write another message - should append to existing buffer
     messageBuffer = ByteBuffer.wrap(" This is message 2".getBytes());
     logMessage = new LogMessage(messageBuffer);
     logMessageAndPosition = new LogMessageAndPosition(logMessage, null);
 
-    // Write log message to commit
     newS3Writer.startCommit(false);
     newS3Writer.writeLogMessageToCommit(logMessageAndPosition, false);
+    newS3Writer.endCommit(1, false);
+
+    // Verify it's using the same buffer file (resume scenario)
+    File resumedBufferFile = new File(tempPath + "/" + newS3Writer.getBufferFileName());
+    assertEquals("Should resume using same buffer file", fileName, resumedBufferFile.getName());
+
+    // Verify both messages are in the buffer
+    String content = new String(Files.readAllBytes(resumedBufferFile.toPath()));
+    assertTrue("Buffer should contain first message", content.contains("This is message 1"));
+    assertTrue("Buffer should contain second message", content.contains(" This is message 2"));
 
-    // Verify that the messages are written to the buffer file
-    File bufferFile = new File(tempPath + "/" + newS3Writer.getBufferFileName());
-    assertTrue(bufferFile.exists());
-    String content = new String(Files.readAllBytes(bufferFile.toPath()));
-    assertTrue(content.contains("This is message 1 : This is message 2"));
     newS3Writer.close();
   }
@@ -238,26 +231,259 @@ public void testObjectKeyGeneration() {
   }
 
   @Test
-  public void testClose() throws Exception {
-    // Prepare log message
-    ByteBuffer messageBuffer = ByteBuffer.wrap("test message".getBytes());
+  public void testTimedUploadSchedulerWithMultipleLogStreams() throws Exception {
+    s3WriterConfig.setMinUploadTimeInSeconds(1);
+    s3WriterConfig.setMaxFileSizeMB(100);
+
+    LogStream logStream1 = new LogStream(singerLog, "test_log_1");
+    LogStream logStream2 = new LogStream(singerLog, "test_log_2");
+    LogStream logStream3 = new LogStream(singerLog, "test_log_3");
+
+    S3Writer writer1 = new S3Writer(logStream1, s3WriterConfig, mockS3Uploader, tempPath);
+    S3Writer writer2 = new S3Writer(logStream2, s3WriterConfig, mockS3Uploader, tempPath);
+    S3Writer writer3 = new S3Writer(logStream3, s3WriterConfig, mockS3Uploader, tempPath);
+
+    try {
+      when(mockS3Uploader.upload(any(S3ObjectUpload.class))).thenReturn(true);
+
+      // Write small data to each stream (not enough to trigger size-based upload)
+      ByteBuffer messageBuffer = ByteBuffer.wrap("scheduler test data".getBytes());
+      LogMessage logMessage = new LogMessage(messageBuffer);
+      LogMessageAndPosition logMessageAndPosition = new LogMessageAndPosition(logMessage, null);
+
+      writer1.startCommit(false);
+      writer1.writeLogMessageToCommit(logMessageAndPosition, false);
+      writer1.endCommit(1, false);
+
+      Thread.sleep(300);
+      writer2.startCommit(false);
+      writer2.writeLogMessageToCommit(logMessageAndPosition, false);
+      writer2.endCommit(1, false);
+
+      Thread.sleep(300);
+      writer3.startCommit(false);
+      writer3.writeLogMessageToCommit(logMessageAndPosition, false);
+      writer3.endCommit(1, false);
+
+      File bufferFile1 = new File(tempPath + "/" + writer1.getBufferFileName());
+      File bufferFile2 = new File(tempPath + "/" + writer2.getBufferFileName());
+      File bufferFile3 = new File(tempPath + "/" + writer3.getBufferFileName());
+
+      assertTrue("Buffer 1 should contain data", bufferFile1.exists() && bufferFile1.length() > 0);
+      assertTrue("Buffer 2 should contain data", bufferFile2.exists() && bufferFile2.length() > 0);
+      assertTrue("Buffer 3 should contain data", bufferFile3.exists() && bufferFile3.length() > 0);
+
+      long initialSize1 = bufferFile1.length();
+      long initialSize2 = bufferFile2.length();
+      long initialSize3 = bufferFile3.length();
+
+      Thread.sleep(2500); // 2.5 seconds should be enough
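+      // (with minUploadTime = 1s the checker runs every Math.max(1, 1/4) = 1s, so
+      // 2.5s spans at least one check after each buffer crosses the age threshold)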
+
+      verify(mockS3Uploader, atLeastOnce()).upload(any(S3ObjectUpload.class));
+
+      long finalSize1 = bufferFile1.exists() ? bufferFile1.length() : 0;
+      long finalSize2 = bufferFile2.exists() ? bufferFile2.length() : 0;
+      long finalSize3 = bufferFile3.exists() ? bufferFile3.length() : 0;
+
+      assertTrue("Buffer 1 should be cleared after scheduled upload", finalSize1 < initialSize1);
+      assertTrue("Buffer 2 should be cleared after scheduled upload", finalSize2 < initialSize2);
+      assertTrue("Buffer 3 should be cleared after scheduled upload", finalSize3 < initialSize3);
+
+    } finally {
+      // Clean up
+      writer1.close();
+      writer2.close();
+      writer3.close();
+    }
+  }
+
+  @Test
+  public void testExceptionHandlingPreservesBufferOnFailure() throws Exception {
+    s3WriterConfig.setMaxFileSizeMB(1);
+    s3Writer = new S3Writer(logStream, s3WriterConfig, mockS3Uploader, tempPath);
+
+    ByteBuffer messageBuffer = ByteBuffer.wrap("important data that must not be lost".getBytes());
     LogMessage logMessage = new LogMessage(messageBuffer);
     LogMessageAndPosition logMessageAndPosition = new LogMessageAndPosition(logMessage, null);
 
-    // Write log message to commit
+    when(mockS3Uploader.upload(any(S3ObjectUpload.class))).thenReturn(false);
+
     s3Writer.startCommit(false);
     s3Writer.writeLogMessageToCommit(logMessageAndPosition, false);
     s3Writer.endCommit(1, false);
 
-    // Call close
-    s3Writer.close();
+    File bufferFile = new File(tempPath + "/" + s3Writer.getBufferFileName());
+    assertTrue("Buffer should contain data", bufferFile.length() > 0);
+    String originalContent = new String(Files.readAllBytes(bufferFile.toPath()));
+    long originalSize = bufferFile.length();
+
+    try {
+      byte[] moreData = new byte[2 * 1024 * 1024];
+      Arrays.fill(moreData, (byte) 'X');
+      ByteBuffer largeBuffer = ByteBuffer.wrap(moreData);
+      LogMessage largeMessage = new LogMessage(largeBuffer);
+      LogMessageAndPosition largeLogMessageAndPosition = new LogMessageAndPosition(largeMessage, null);
+
+      s3Writer.startCommit(false);
+      // This should trigger upload due to size, which will fail
+      s3Writer.writeLogMessageToCommit(largeLogMessageAndPosition, false);
+
+      fail("Expected LogStreamWriterException when upload fails and buffer would be full");
+    } catch (LogStreamWriterException e) {
+      // Expected - upload failed, exception propagated for batch retry
+    }
+
+    assertTrue("Buffer file should still exist after upload failure", bufferFile.exists());
+    assertEquals("Buffer size should be unchanged after upload failure", originalSize, bufferFile.length());
+
+    String preservedContent = new String(Files.readAllBytes(bufferFile.toPath()));
+    assertEquals("Buffer content should be preserved after upload failure", originalContent, preservedContent);
+
+    verify(mockS3Uploader, atLeastOnce()).upload(any(S3ObjectUpload.class));
+  }
+
+  @Test
+  public void testBackpressurePreventsBatchWritesWhenBufferFull() throws Exception {
+    s3WriterConfig.setMaxFileSizeMB(1); // 1MB limit
+    s3Writer = new S3Writer(logStream, s3WriterConfig, mockS3Uploader, tempPath);
+
+    when(mockS3Uploader.upload(any(S3ObjectUpload.class))).thenReturn(false);
+
+    byte[] data = new byte[512 * 1024];
+    Arrays.fill(data, (byte) 'A');
+    ByteBuffer messageBuffer = ByteBuffer.wrap(data);
+    LogMessage logMessage = new LogMessage(messageBuffer);
+    LogMessageAndPosition logMessageAndPosition = new LogMessageAndPosition(logMessage, null);
+
+    s3Writer.startCommit(false);
+    s3Writer.writeLogMessageToCommit(logMessageAndPosition, false);
+    s3Writer.endCommit(1, false);
+
+    File bufferFile = new File(tempPath + "/" + s3Writer.getBufferFileName());
+    assertTrue("Buffer should contain initial data", bufferFile.length() > 0);
+    long initialBufferSize = bufferFile.length();
+
+    byte[] moreData = new byte[600 * 1024];
+    Arrays.fill(moreData, (byte) 'B');
+    ByteBuffer largeBuffer = ByteBuffer.wrap(moreData);
+    LogMessage largeMessage = new LogMessage(largeBuffer);
+    LogMessageAndPosition largeLogMessageAndPosition = new LogMessageAndPosition(largeMessage, null);
+
+    // This should trigger upload attempt, fail, and throw exception (blocking the batch)
+    try {
+      s3Writer.startCommit(false);
+      s3Writer.writeLogMessageToCommit(largeLogMessageAndPosition, false);
+      fail("Expected LogStreamWriterException when buffer would overflow and upload fails");
+    } catch (LogStreamWriterException e) {
+      // Expected - backpressure prevents buffer overflow
+      assertTrue("Exception should mention upload failure",
+          e.getMessage().contains("upload failed") || e.getMessage().contains("Buffer file S3 upload failed"));
+    }
+
+    verify(mockS3Uploader).upload(any(S3ObjectUpload.class));
+
+    assertTrue("Buffer should still exist after failed upload", bufferFile.exists());
+    assertEquals("Buffer size should be unchanged after failed write attempt",
+        initialBufferSize, bufferFile.length());
+    assertTrue("Buffer should preserve original data", bufferFile.length() > 0);
+  }
+
+  @Test
+  public void testSuccessfulUploadCleansBufferAndResetsStream() throws Exception {
+    s3WriterConfig.setMaxFileSizeMB(1);
+    s3Writer = new S3Writer(logStream, s3WriterConfig, mockS3Uploader, tempPath);
+
+    when(mockS3Uploader.upload(any(S3ObjectUpload.class))).thenReturn(true);
+
+    byte[] largeData = new byte[2 * 1024 * 1024]; // 2MB message
+    Arrays.fill(largeData, (byte) 'S');
+    ByteBuffer messageBuffer = ByteBuffer.wrap(largeData);
+    LogMessage logMessage = new LogMessage(messageBuffer);
+    LogMessageAndPosition logMessageAndPosition = new LogMessageAndPosition(logMessage, null);
+
+    s3Writer.startCommit(false);
+
+    File bufferFile = new File(tempPath + "/" + s3Writer.getBufferFileName());
+    assertTrue("Buffer file should exist after startCommit", bufferFile.exists());
+
+    s3Writer.writeLogMessageToCommit(logMessageAndPosition, false);
+    s3Writer.endCommit(1, false);
 
-    // Verify that the buffer file was correctly handled
-    String
-        bufferFileName = s3Writer.getBufferFileName();
-    File bufferFile = new File(FilenameUtils.concat(tempPath, bufferFileName));
-    assertFalse(bufferFile.exists());
-    assertEquals(0, bufferFile.length());
     verify(mockS3Uploader, atLeastOnce()).upload(any(S3ObjectUpload.class));
+
+    File currentBufferFile = new File(tempPath + "/" + s3Writer.getBufferFileName());
+
+    // Verify buffer was cleaned up (reset to empty)
+    assertTrue("Buffer file should exist after successful upload (reset)", currentBufferFile.exists());
+    assertEquals("Buffer should be empty after successful upload cleanup", 0, currentBufferFile.length());
+
+    ByteBuffer newMessageBuffer = ByteBuffer.wrap("new data after reset".getBytes());
+    LogMessage newLogMessage = new LogMessage(newMessageBuffer);
+    LogMessageAndPosition newLogMessageAndPosition = new LogMessageAndPosition(newLogMessage, null);
+
+    s3Writer.startCommit(false);
+    s3Writer.writeLogMessageToCommit(newLogMessageAndPosition, false);
+    s3Writer.endCommit(1, false);
+
+    // Verify new data was written successfully
+    assertTrue("Buffer should contain new data after reset", currentBufferFile.length() > 0);
+    String content = new String(Files.readAllBytes(currentBufferFile.toPath()));
+    assertTrue("Buffer should contain new message", content.contains("new data after reset"));
+  }
+
+  @Test
+  public void testBufferFileRecoveryAfterRestartWithMultipleLogStreams() throws Exception {
+    // Create two different log streams (simulating different log files)
+    LogStream logStream1 = new LogStream(singerLog, "test_log_1");
+    LogStream logStream2 = new LogStream(singerLog, "test_log_2");
+
+    // Create S3Writers for each log stream and write data
+    S3Writer writer1 = new S3Writer(logStream1, s3WriterConfig, mockS3Uploader, tempPath);
+    S3Writer writer2 = new S3Writer(logStream2, s3WriterConfig, mockS3Uploader, tempPath);
+
+    ByteBuffer messageBuffer = ByteBuffer.wrap("test message".getBytes());
+    LogMessage logMessage = new LogMessage(messageBuffer);
+    LogMessageAndPosition logMessageAndPosition = new LogMessageAndPosition(logMessage, null);
+
+    // Write to both writers
+    writer1.startCommit(false);
+    writer1.writeLogMessageToCommit(logMessageAndPosition, false);
+    writer1.endCommit(1, false);
+
+    writer2.startCommit(false);
+    writer2.writeLogMessageToCommit(logMessageAndPosition, false);
+    writer2.endCommit(1, false);
+
+    // Get buffer file names
+    String bufferFile1Name = writer1.getBufferFileName();
+    String bufferFile2Name = writer2.getBufferFileName();
+
+    // Verify they have different buffer file names
+    assertNotEquals("Different log streams should have different buffer files", bufferFile1Name, bufferFile2Name);
+
+    // Close writers (simulating shutdown)
+    writer1.close();
+    writer2.close();
+
+    // Verify buffer files exist
+    File buffer1 = new File(tempPath + "/" + bufferFile1Name);
+    File buffer2 = new File(tempPath + "/" + bufferFile2Name);
+    assertTrue("Buffer file 1 should exist after close", buffer1.exists());
+    assertTrue("Buffer file 2 should exist after close", buffer2.exists());
+
+    // Simulate restart: create new S3Writers for the same log streams
+    S3Writer newWriter1 = new S3Writer(logStream1, s3WriterConfig, mockS3Uploader, tempPath);
+    S3Writer newWriter2 = new S3Writer(logStream2, s3WriterConfig, mockS3Uploader, tempPath);
+
+    // Verify each writer found its own buffer file (not the other's)
+    String recoveredBufferFile1Name = newWriter1.getBufferFileName();
+    String recoveredBufferFile2Name = newWriter2.getBufferFileName();
+
+    assertEquals("Writer 1 should recover its own buffer file", bufferFile1Name, recoveredBufferFile1Name);
+    assertEquals("Writer 2 should recover its own buffer file", bufferFile2Name, recoveredBufferFile2Name);
+    assertNotEquals("Recovered buffer files should still be different", recoveredBufferFile1Name, recoveredBufferFile2Name);
+
+    newWriter1.close();
+    newWriter2.close();
   }
 }
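Note: a minimal, illustrative sketch (not part of the patch) of the buffer-name contract that
findExistingBufferFile() and extractTimestampFromFilename() rely on: generateBufferFileName()
emits "logName.hash.timestamp", and the timestamp must survive a round trip even when logName
itself contains dots. Names and values below are hypothetical.

    public class BufferNameRoundTrip {
      public static void main(String[] args) {
        long ts = 1714003200000L;
        String name = "app.log" + "." + "3f2a9c" + "." + ts; // logName containing a dot
        String[] parts = name.split("\\.");
        long parsed = Long.parseLong(parts[parts.length - 1]); // last segment = timestamp
        System.out.println(parsed == ts); // prints: true
      }
    }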