diff --git a/CMakeLists.txt b/CMakeLists.txt index 05fac0673d3..b8d92333926 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -299,6 +299,7 @@ if(FLB_ALL) set(FLB_OUT_NULL 1) set(FLB_OUT_PLOT 1) set(FLB_OUT_FILE 1) + set(FLB_OUT_LOGROTATE 1) set(FLB_OUT_RETRY 1) set(FLB_OUT_TD 1) set(FLB_OUT_STDOUT 1) diff --git a/cmake/plugins_options.cmake b/cmake/plugins_options.cmake index 703073ff3f8..8f5ec837012 100644 --- a/cmake/plugins_options.cmake +++ b/cmake/plugins_options.cmake @@ -117,6 +117,7 @@ DEFINE_OPTION(FLB_OUT_DATADOG "Enable DataDog output plugin" DEFINE_OPTION(FLB_OUT_ES "Enable Elasticsearch output plugin" ON) DEFINE_OPTION(FLB_OUT_EXIT "Enable Exit output plugin" ON) DEFINE_OPTION(FLB_OUT_FILE "Enable file output plugin" ON) +DEFINE_OPTION(FLB_OUT_LOGROTATE "Enable logrotate output plugin" ON) DEFINE_OPTION(FLB_OUT_FLOWCOUNTER "Enable flowcount output plugin" ON) DEFINE_OPTION(FLB_OUT_FORWARD "Enable Forward output plugin" ON) DEFINE_OPTION(FLB_OUT_GELF "Enable GELF output plugin" ON) diff --git a/cmake/windows-setup.cmake b/cmake/windows-setup.cmake index 2791fffb4d9..48a13aa5c71 100644 --- a/cmake/windows-setup.cmake +++ b/cmake/windows-setup.cmake @@ -100,6 +100,7 @@ if(FLB_WINDOWS_DEFAULTS) set(FLB_OUT_NATS No) set(FLB_OUT_PLOT No) set(FLB_OUT_FILE Yes) + set(FLB_OUT_LOGROTATE Yes) set(FLB_OUT_TD No) set(FLB_OUT_RETRY No) set(FLB_OUT_SPLUNK Yes) diff --git a/conf/fluent-bit-logrotate.conf b/conf/fluent-bit-logrotate.conf new file mode 100644 index 00000000000..e1b18482e63 --- /dev/null +++ b/conf/fluent-bit-logrotate.conf @@ -0,0 +1,21 @@ +[SERVICE] + Flush 1 + Log_Level info + Parsers_File parsers.conf + +[INPUT] + Name dummy + Tag test.logrotate + Dummy {"message": "test log message", "level": "info"} + Rate 10 + +[OUTPUT] + Name logrotate + Match test.logrotate + Path /tmp/logs + File test.log + Format json + Max_Size 10M + Max_Files 5 + Gzip On + Mkdir On diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt index 65d417d7cbb..fd7d12af5a0 100644 --- a/plugins/CMakeLists.txt +++ b/plugins/CMakeLists.txt @@ -363,6 +363,7 @@ REGISTER_OUT_PLUGIN("out_datadog") REGISTER_OUT_PLUGIN("out_es") REGISTER_OUT_PLUGIN("out_exit") REGISTER_OUT_PLUGIN("out_file") +REGISTER_OUT_PLUGIN("out_logrotate") REGISTER_OUT_PLUGIN("out_forward") REGISTER_OUT_PLUGIN("out_http") REGISTER_OUT_PLUGIN("out_influxdb") diff --git a/plugins/out_logrotate/CMakeLists.txt b/plugins/out_logrotate/CMakeLists.txt new file mode 100644 index 00000000000..0f6fc53df21 --- /dev/null +++ b/plugins/out_logrotate/CMakeLists.txt @@ -0,0 +1,4 @@ +set(src + logrotate.c) + +FLB_PLUGIN(out_logrotate "${src}" "") diff --git a/plugins/out_logrotate/logrotate.c b/plugins/out_logrotate/logrotate.c new file mode 100644 index 00000000000..a5cbd2077bd --- /dev/null +++ b/plugins/out_logrotate/logrotate.c @@ -0,0 +1,1644 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#ifndef FLB_SYSTEM_WINDOWS +#include +#include /* dirname */ +#endif +#include +#include +#include +#include /* PATH_MAX */ +#include /* PRIu64 */ + +#ifdef FLB_SYSTEM_WINDOWS +#include +#include +#include +#endif + +#include "logrotate.h" + +#ifdef FLB_SYSTEM_WINDOWS +#define NEWLINE "\r\n" +#define S_ISDIR(m) (((m) & S_IFMT) == S_IFDIR) +#else +#define NEWLINE "\n" +#endif + +#ifdef FLB_SYSTEM_WINDOWS +#define FLB_PATH_SEPARATOR "\\" +#else +#define FLB_PATH_SEPARATOR "/" +#endif + +/* Constants for streaming gzip compression */ +#define GZIP_CHUNK_SIZE (64 * 1024) /* 64KB chunks for memory efficiency */ +#define GZIP_HEADER_SIZE 10 +#define GZIP_FOOTER_SIZE 8 + +struct logrotate_file_size { + flb_sds_t filename; + size_t size; + flb_lock_t lock; /* Mutex to protect file operations */ + struct mk_list _head; +}; + +struct flb_logrotate_conf { + const char *out_path; + const char *out_file; + const char *delimiter; + const char *label_delimiter; + const char *template; + int format; + int csv_column_names; + int mkdir; + size_t max_size; /* Max file size */ + int max_files; /* Maximum number of rotated files to keep */ + int gzip; /* Whether to gzip rotated files */ + struct mk_list file_sizes; /* Linked list to store file size per filename */ + struct flb_output_instance *ins; +}; + +static char *check_delimiter(const char *str) +{ + if (str == NULL) { + return NULL; + } + + if (!strcasecmp(str, "\\t") || !strcasecmp(str, "tab")) { + return "\t"; + } + else if (!strcasecmp(str, "space")) { + return " "; + } + else if (!strcasecmp(str, "comma")) { + return ","; + } + + return NULL; +} + +static int cb_logrotate_init(struct flb_output_instance *ins, + struct flb_config *config, + void *data) +{ + int ret; + const char *tmp; + char *ret_str; + (void) config; + (void) data; + struct flb_logrotate_conf *ctx; + + ctx = flb_calloc(1, sizeof(struct flb_logrotate_conf)); + if (!ctx) { + flb_errno(); + return -1; + } + ctx->ins = ins; + ctx->format = FLB_OUT_LOGROTATE_FMT_JSON; /* default */ + ctx->delimiter = NULL; + ctx->label_delimiter = NULL; + ctx->template = NULL; + + /* Initialize linked list to store file sizes per filename */ + mk_list_init(&ctx->file_sizes); + + ret = flb_output_config_map_set(ins, (void *) ctx); + if (ret == -1) { + flb_free(ctx); + return -1; + } + + /* Optional, file format */ + tmp = flb_output_get_property("Format", ins); + if (tmp) { + if (!strcasecmp(tmp, "csv")) { + ctx->format = FLB_OUT_LOGROTATE_FMT_CSV; + ctx->delimiter = ","; + } + else if (!strcasecmp(tmp, "ltsv")) { + ctx->format = FLB_OUT_LOGROTATE_FMT_LTSV; + ctx->delimiter = "\t"; + ctx->label_delimiter = ":"; + } + else if (!strcasecmp(tmp, "plain")) { + ctx->format = FLB_OUT_LOGROTATE_FMT_PLAIN; + ctx->delimiter = NULL; + ctx->label_delimiter = NULL; + } + else if (!strcasecmp(tmp, "msgpack")) { + ctx->format = FLB_OUT_LOGROTATE_FMT_MSGPACK; + ctx->delimiter = NULL; + ctx->label_delimiter = NULL; + } + else if (!strcasecmp(tmp, "template")) { + ctx->format = FLB_OUT_LOGROTATE_FMT_TEMPLATE; + } + else if (!strcasecmp(tmp, "out_logrotate")) { + /* for explicit setting */ + ctx->format = FLB_OUT_LOGROTATE_FMT_JSON; + } + else { + flb_plg_error(ctx->ins, "unknown format %s. abort.", tmp); + flb_free(ctx); + return -1; + } + } + + tmp = flb_output_get_property("delimiter", ins); + ret_str = check_delimiter(tmp); + if (ret_str != NULL) { + ctx->delimiter = ret_str; + } + + tmp = flb_output_get_property("label_delimiter", ins); + ret_str = check_delimiter(tmp); + if (ret_str != NULL) { + ctx->label_delimiter = ret_str; + } + + /* Set the context */ + flb_output_set_context(ins, ctx); + + /* Log resolved configuration values */ + flb_plg_info(ctx->ins, "logrotate plugin initialized with: max_size=%zu, max_files=%d, gzip=%s, path=%s", + ctx->max_size, ctx->max_files, + ctx->gzip == FLB_TRUE ? "true" : "false", + ctx->out_path ? ctx->out_path : "not set"); + + return 0; +} + +static int csv_output(FILE *fp, int column_names, + struct flb_time *tm, msgpack_object *obj, + struct flb_logrotate_conf *ctx) +{ + int i; + int map_size; + msgpack_object_kv *kv = NULL; + + if (obj->type == MSGPACK_OBJECT_MAP && obj->via.map.size > 0) { + kv = obj->via.map.ptr; + map_size = obj->via.map.size; + + if (column_names == FLB_TRUE) { + fprintf(fp, "timestamp%s", ctx->delimiter); + for (i = 0; i < map_size; i++) { + msgpack_object_print(fp, (kv+i)->key); + if (i + 1 < map_size) { + fprintf(fp, "%s", ctx->delimiter); + } + } + fprintf(fp, NEWLINE); + } + + fprintf(fp, "%lld.%.09ld%s", + (long long) tm->tm.tv_sec, tm->tm.tv_nsec, ctx->delimiter); + + for (i = 0; i < map_size - 1; i++) { + msgpack_object_print(fp, (kv+i)->val); + fprintf(fp, "%s", ctx->delimiter); + } + + msgpack_object_print(fp, (kv+(map_size-1))->val); + fprintf(fp, NEWLINE); + } + return 0; +} + +static int ltsv_output(FILE *fp, struct flb_time *tm, msgpack_object *obj, + struct flb_logrotate_conf *ctx) +{ + msgpack_object_kv *kv = NULL; + int i; + int map_size; + + if (obj->type == MSGPACK_OBJECT_MAP && obj->via.map.size > 0) { + kv = obj->via.map.ptr; + map_size = obj->via.map.size; + fprintf(fp, "\"time\"%s%f%s", + ctx->label_delimiter, + flb_time_to_double(tm), + ctx->delimiter); + + for (i = 0; i < map_size - 1; i++) { + msgpack_object_print(fp, (kv+i)->key); + fprintf(fp, "%s", ctx->label_delimiter); + msgpack_object_print(fp, (kv+i)->val); + fprintf(fp, "%s", ctx->delimiter); + } + + msgpack_object_print(fp, (kv+(map_size-1))->key); + fprintf(fp, "%s", ctx->label_delimiter); + msgpack_object_print(fp, (kv+(map_size-1))->val); + fprintf(fp, NEWLINE); + } + return 0; +} + +static int template_output_write(struct flb_logrotate_conf *ctx, + FILE *fp, struct flb_time *tm, msgpack_object *obj, + const char *key, int size) +{ + int i; + msgpack_object_kv *kv; + + /* + * Right now we treat "{time}" specially and fill the placeholder + * with the metadata timestamp (formatted as float). + */ + if (!strncmp(key, "time", size)) { + fprintf(fp, "%f", flb_time_to_double(tm)); + return 0; + } + + if (obj->type != MSGPACK_OBJECT_MAP) { + flb_plg_error(ctx->ins, "invalid object type (type=%i)", obj->type); + return -1; + } + + for (i = 0; i < obj->via.map.size; i++) { + kv = obj->via.map.ptr + i; + + if (size != kv->key.via.str.size) { + continue; + } + + if (!memcmp(key, kv->key.via.str.ptr, size)) { + if (kv->val.type == MSGPACK_OBJECT_STR) { + fwrite(kv->val.via.str.ptr, 1, kv->val.via.str.size, fp); + } + else { + msgpack_object_print(fp, kv->val); + } + return 0; + } + } + return -1; +} + +/* + * Python-like string templating for out_logrotate. + * + * This accepts a format string like "my name is {name}" and fills + * placeholders using corresponding values in a record. + * + * e.g. {"name":"Tom"} => "my name is Tom" + */ +static int template_output(FILE *fp, struct flb_time *tm, msgpack_object *obj, + struct flb_logrotate_conf *ctx) +{ + int i; + int len = strlen(ctx->template); + int keysize; + const char *key; + const char *pos; + const char *inbrace = NULL; /* points to the last open brace */ + + for (i = 0; i < len; i++) { + pos = ctx->template + i; + if (*pos == '{') { + if (inbrace) { + /* + * This means that we find another open brace inside + * braces (e.g. "{a{b}"). Ignore the previous one. + */ + fwrite(inbrace, 1, pos - inbrace, fp); + } + inbrace = pos; + } + else if (*pos == '}' && inbrace) { + key = inbrace + 1; + keysize = pos - inbrace - 1; + + if (template_output_write(ctx, fp, tm, obj, key, keysize)) { + fwrite(inbrace, 1, pos - inbrace + 1, fp); + } + inbrace = NULL; + } + else { + if (!inbrace) { + fputc(*pos, fp); + } + } + } + + /* Handle an unclosed brace like "{abc" */ + if (inbrace) { + fputs(inbrace, fp); + } + fputs(NEWLINE, fp); + return 0; +} + +static int plain_output(FILE *fp, msgpack_object *obj, size_t alloc_size, + int escape_unicode) +{ + char *buf; + + buf = flb_msgpack_to_json_str(alloc_size, obj, escape_unicode); + if (buf) { + fprintf(fp, "%s" NEWLINE, + buf); + flb_free(buf); + } + return 0; +} + +static void print_metrics_text(struct flb_output_instance *ins, + FILE *fp, + const void *data, size_t bytes) +{ + int ret; + size_t off = 0; + cfl_sds_t text; + struct cmt *cmt = NULL; + + /* get cmetrics context */ + ret = cmt_decode_msgpack_create(&cmt, (char *) data, bytes, &off); + if (ret != 0) { + flb_plg_error(ins, "could not process metrics payload"); + return; + } + + /* convert to text representation */ + text = cmt_encode_text_create(cmt); + + /* destroy cmt context */ + cmt_destroy(cmt); + + fprintf(fp, "%s", text); + cmt_encode_text_destroy(text); +} + +static int mkpath(struct flb_output_instance *ins, const char *dir) +{ + struct stat st; + char *dup_dir = NULL; +#ifdef FLB_SYSTEM_MACOS + char *parent_dir = NULL; +#endif +#ifdef FLB_SYSTEM_WINDOWS + char parent_path[MAX_PATH]; + DWORD err; + char *p; + char *sep; +#endif + int ret; + + if (!dir) { + errno = EINVAL; + return -1; + } + + if (strlen(dir) == 0) { + errno = EINVAL; + return -1; + } + + if (stat(dir, &st) == 0) { + if (S_ISDIR (st.st_mode)) { + return 0; + } + flb_plg_error(ins, "%s is not a directory", dir); + errno = ENOTDIR; + return -1; + } + +#ifdef FLB_SYSTEM_WINDOWS + if (strncpy_s(parent_path, MAX_PATH, dir, _TRUNCATE) != 0) { + flb_plg_error(ins, "path is too long: %s", dir); + errno = ENAMETOOLONG; + return -1; + } + + p = parent_path; + + /* Skip the drive letter if present (e.g., "C:") */ + if (p[1] == ':') { + p += 2; + } + + /* Normalize all forward slashes to backslashes */ + while (*p != '\0') { + if (*p == '/') { + *p = '\\'; + } + p++; + } + + flb_plg_debug(ins, "processing path '%s'", parent_path); + sep = strstr(parent_path, FLB_PATH_SEPARATOR); + if (sep != NULL && PathRemoveFileSpecA(parent_path)) { + flb_plg_debug(ins, "creating directory (recursive) %s", parent_path); + ret = mkpath(ins, parent_path); + if (ret != 0) { + /* If creating the parent failed, we cannot continue. */ + return -1; + } + } + + flb_plg_debug(ins, "attempting to create final directory '%s'", dir); + if (!CreateDirectoryA(dir, NULL)) { + err = GetLastError(); + + if (err != ERROR_ALREADY_EXISTS) { + flb_plg_error(ins, "could not create directory '%s' (error=%lu)", + dir, err); + return -1; + } + } + + return 0; +#elif FLB_SYSTEM_MACOS + dup_dir = flb_strdup(dir); + if (!dup_dir) { + return -1; + } + + /* macOS's dirname(3) should return current directory when slash + * charachter is not included in passed string. + * And note that macOS's dirname(3) does not modify passed string. + */ + parent_dir = dirname(dup_dir); + if (stat(parent_dir, &st) == 0 && strncmp(parent_dir, ".", 1)) { + if (S_ISDIR (st.st_mode)) { + flb_plg_debug(ins, "creating directory %s", dup_dir); + ret = mkdir(dup_dir, 0755); + flb_free(dup_dir); + return ret; + } + } + + ret = mkpath(ins, dirname(dup_dir)); + if (ret != 0) { + flb_free(dup_dir); + return ret; + } + flb_plg_debug(ins, "creating directory %s", dup_dir); + ret = mkdir(dup_dir, 0755); + flb_free(dup_dir); + return ret; +#else + dup_dir = flb_strdup(dir); + if (!dup_dir) { + return -1; + } + ret = mkpath(ins, dirname(dup_dir)); + flb_free(dup_dir); + if (ret != 0) { + return ret; + } + flb_plg_debug(ins, "creating directory %s", dir); + return mkdir(dir, 0755); +#endif +} + +/* Helper function to find a file size entry by filename */ +static struct logrotate_file_size *find_file_size_entry(struct flb_logrotate_conf *ctx, + const char *filename) +{ + struct mk_list *head; + struct logrotate_file_size *entry; + + mk_list_foreach(head, &ctx->file_sizes) { + entry = mk_list_entry(head, struct logrotate_file_size, _head); + if (entry->filename && strcmp(entry->filename, filename) == 0) { + return entry; + } + } + return NULL; +} + + +/* Helper function to update or create file size entry */ +static int update_file_size(struct flb_logrotate_conf *ctx, + const char *filename, size_t size) +{ + struct logrotate_file_size *entry; + flb_sds_t filename_copy; + + entry = find_file_size_entry(ctx, filename); + if (entry != NULL) { + /* Update existing entry */ + entry->size = size; + return 0; + } + + /* Create new entry */ + entry = flb_calloc(1, sizeof(struct logrotate_file_size)); + if (!entry) { + flb_errno(); + return -1; + } + + filename_copy = flb_sds_create(filename); + if (!filename_copy) { + flb_free(entry); + flb_errno(); + return -1; + } + + entry->filename = filename_copy; + entry->size = size; + + /* Initialize mutex for this file entry */ + if (flb_lock_init(&entry->lock) != 0) { + flb_plg_error(ctx->ins, "failed to initialize mutex for file %s", + filename); + flb_sds_destroy(filename_copy); + flb_free(entry); + return -1; + } + + mk_list_add(&entry->_head, &ctx->file_sizes); + + return 0; +} + +/* Helper function to remove file size entry */ +static void remove_file_size(struct flb_logrotate_conf *ctx, const char *filename) +{ + struct logrotate_file_size *entry; + + entry = find_file_size_entry(ctx, filename); + if (entry != NULL) { + mk_list_del(&entry->_head); + /* Destroy mutex before freeing entry */ + flb_lock_destroy(&entry->lock); + if (entry->filename) { + flb_sds_destroy(entry->filename); + } + flb_free(entry); + } +} + + +/* Function to update file size counter for a specific file */ +static void update_file_size_counter(struct flb_logrotate_conf *ctx, + const char *filename, FILE *fp) +{ + struct stat st; + size_t file_size; + struct logrotate_file_size *entry; + int ret; + + if (fstat(fileno(fp), &st) != 0 || st.st_size < 0) { + return; + } + + file_size = (size_t) st.st_size; + + /* Find or create file size entry */ + entry = find_file_size_entry(ctx, filename); + if (entry == NULL) { + /* Entry doesn't exist, create it */ + ret = update_file_size(ctx, filename, file_size); + if (ret == -1) { + flb_plg_warn(ctx->ins, "failed to create file size entry for %s", + filename); + return; + } + /* Find the entry we just created */ + entry = find_file_size_entry(ctx, filename); + if (entry == NULL) { + flb_plg_warn(ctx->ins, + "failed to find file size entry for %s after creation", + filename); + return; + } + } + + /* Acquire lock before updating size */ + if (flb_lock_acquire(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY) != 0) { + flb_plg_warn(ctx->ins, "failed to acquire lock for file %s", filename); + return; + } + + /* Update size atomically */ + entry->size = file_size; + + /* Release lock */ + flb_lock_release(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); +} + +/* Function to generate timestamp for rotated file */ +static void generate_timestamp(char *timestamp, size_t size) +{ + time_t now = time(NULL); + struct tm tm_info; +#ifdef FLB_SYSTEM_WINDOWS + localtime_s(&tm_info, &now); +#else + localtime_r(&now, &tm_info); +#endif + strftime(timestamp, size, "%Y%m%d_%H%M%S", &tm_info); +} + +/* Helper function to write gzip header (based on flb_gzip.c) */ +static void write_gzip_header(FILE *fp) +{ + uint8_t header[GZIP_HEADER_SIZE] = { + 0x1F, 0x8B, /* Magic bytes */ + 0x08, /* Compression method (deflate) */ + 0x00, /* Flags */ + 0x00, 0x00, 0x00, 0x00, /* Timestamp */ + 0x00, /* Compression flags */ + 0xFF /* OS (unknown) */ + }; + fwrite(header, 1, GZIP_HEADER_SIZE, fp); +} + +/* Helper function to write gzip footer */ +static void write_gzip_footer(FILE *fp, mz_ulong crc, size_t original_size) +{ + uint8_t footer[GZIP_FOOTER_SIZE]; + + /* Write CRC32 */ + footer[0] = crc & 0xFF; + footer[1] = (crc >> 8) & 0xFF; + footer[2] = (crc >> 16) & 0xFF; + footer[3] = (crc >> 24) & 0xFF; + + /* Write original size */ + footer[4] = original_size & 0xFF; + footer[5] = (original_size >> 8) & 0xFF; + footer[6] = (original_size >> 16) & 0xFF; + footer[7] = (original_size >> 24) & 0xFF; + + fwrite(footer, 1, GZIP_FOOTER_SIZE, fp); +} + +/* Function to compress a file using streaming gzip (memory-safe for large files) */ +static int gzip_compress_file(const char *input_filename, + const char *output_filename, + struct flb_output_instance *ins) +{ + FILE *src_fp = NULL, *dst_fp = NULL; + char *input_buffer = NULL, *output_buffer = NULL; + size_t bytes_read, output_buffer_size; + size_t total_input_size = 0; + mz_ulong crc = MZ_CRC32_INIT; + z_stream strm; + int ret = 0, flush, status; + int deflate_initialized = 0; + + /* Open source file */ + src_fp = fopen(input_filename, "rb"); + if (!src_fp) { + flb_plg_error(ins, "failed to open source file for gzip: %s", + input_filename); + return -1; + } + + /* Open destination file */ + dst_fp = fopen(output_filename, "wb"); + if (!dst_fp) { + flb_plg_error(ins, "failed to create gzip file: %s", + output_filename); + fclose(src_fp); + return -1; + } + + /* Allocate input and output buffers */ + input_buffer = flb_malloc(GZIP_CHUNK_SIZE); + output_buffer_size = compressBound(GZIP_CHUNK_SIZE); + output_buffer = flb_malloc(output_buffer_size); + + if (!input_buffer || !output_buffer) { + flb_plg_error(ins, "failed to allocate compression buffers"); + ret = -1; + goto cleanup; + } + + /* Write gzip header */ + write_gzip_header(dst_fp); + + /* Initialize deflate stream (raw deflate without gzip wrapper) */ + memset(&strm, 0, sizeof(strm)); + strm.zalloc = Z_NULL; + strm.zfree = Z_NULL; + strm.opaque = Z_NULL; + + status = deflateInit2(&strm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, + -Z_DEFAULT_WINDOW_BITS, 9, Z_DEFAULT_STRATEGY); + if (status != Z_OK) { + flb_plg_error(ins, "failed to initialize deflate stream"); + ret = -1; + goto cleanup; + } + deflate_initialized = 1; + + /* Process file in chunks (ensure Z_FINISH is always issued) */ + do { + bytes_read = fread(input_buffer, 1, GZIP_CHUNK_SIZE, src_fp); + if (bytes_read > 0) { + /* Update CRC and total size */ + crc = mz_crc32(crc, (const unsigned char *)input_buffer, bytes_read); + total_input_size += bytes_read; + + /* Set up deflate input */ + strm.next_in = (Bytef *)input_buffer; + strm.avail_in = bytes_read; + + /* Determine flush mode based on EOF after this read */ + flush = feof(src_fp) ? Z_FINISH : Z_NO_FLUSH; + + /* Compress chunk */ + do { + strm.next_out = (Bytef *)output_buffer; + strm.avail_out = output_buffer_size; + + status = deflate(&strm, flush); + if (status == Z_STREAM_ERROR) { + flb_plg_error(ins, + "deflate stream error during compression"); + ret = -1; + goto deflate_cleanup; + } + + /* Write compressed data */ + size_t compressed_bytes = output_buffer_size - strm.avail_out; + if (compressed_bytes > 0) { + if (fwrite(output_buffer, 1, compressed_bytes, dst_fp) != + compressed_bytes) { + flb_plg_error(ins, "failed to write compressed data"); + ret = -1; + goto deflate_cleanup; + } + } + } while (strm.avail_out == 0); + + /* Verify all input was consumed */ + if (strm.avail_in != 0) { + flb_plg_error(ins, + "deflate did not consume all input data"); + ret = -1; + goto deflate_cleanup; + } + } + } while (bytes_read > 0 && status != Z_STREAM_END); + + /* Verify compression completed successfully */ + if (status != Z_STREAM_END) { + flb_plg_error(ins, "compression did not complete properly"); + ret = -1; + } + else { + /* Write gzip footer (CRC32 + original size) */ + write_gzip_footer(dst_fp, crc, total_input_size); + } + +deflate_cleanup: + if (deflate_initialized) { + deflateEnd(&strm); + deflate_initialized = 0; + } + +cleanup: + if (input_buffer) { + flb_free(input_buffer); + input_buffer = NULL; + } + if (output_buffer) { + flb_free(output_buffer); + output_buffer = NULL; + } + if (src_fp) { + fclose(src_fp); + src_fp = NULL; + } + if (dst_fp) { + fclose(dst_fp); + dst_fp = NULL; + } + + return ret; +} + +/* Function to rotate file */ +static int rotate_file(struct flb_logrotate_conf *ctx, const char *filename) +{ + char timestamp[32]; + char rotated_filename[PATH_MAX]; + char gzip_filename[PATH_MAX]; + size_t file_size = 0; + struct logrotate_file_size *entry; + int ret = 0; + int lock_acquired = 0; + + /* Get file size entry and acquire lock */ + entry = find_file_size_entry(ctx, filename); + if (entry != NULL) { + file_size = entry->size; + /* Acquire lock before rotation operations */ + if (flb_lock_acquire(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY) != 0) { + flb_plg_error(ctx->ins, "failed to acquire lock for file %s", + filename); + return -1; + } + lock_acquired = 1; + } + + /* Log rotation event */ + flb_plg_info(ctx->ins, "rotating file: %s (current size: %zu bytes)", + filename, file_size); + + /* Generate timestamp */ + generate_timestamp(timestamp, sizeof(timestamp)); + + /* Create rotated filename with timestamp */ + snprintf(rotated_filename, PATH_MAX - 1, "%s.%s", filename, timestamp); + + /* Rename current file to rotated filename */ + if (rename(filename, rotated_filename) != 0) { + flb_plg_error(ctx->ins, "failed to rename file from %s to %s", + filename, rotated_filename); + ret = -1; + goto cleanup; + } + + /* If gzip is enabled, compress the rotated file */ + if (ctx->gzip == FLB_TRUE) { + snprintf(gzip_filename, PATH_MAX - 1, "%s.gz", rotated_filename); + flb_plg_debug(ctx->ins, "compressing file: %s to %s", + rotated_filename, gzip_filename); + ret = gzip_compress_file(rotated_filename, gzip_filename, ctx->ins); + if (ret == 0) { + /* Remove the uncompressed file */ + unlink(rotated_filename); + flb_plg_debug(ctx->ins, "rotated and compressed file: %s", + gzip_filename); + } + else { + /* Remove the failed gzip file */ + unlink(gzip_filename); + ret = -1; + goto cleanup; + } + } + else { + flb_plg_debug(ctx->ins, "rotated file: %s (no compression)", + rotated_filename); + } + +cleanup: + /* Release lock if we acquired it */ + if (lock_acquired && entry != NULL) { + flb_lock_release(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + } + + return ret; +} + +/* + * Function to validate if a filename matches the rotation pattern format + * Valid formats: + * - base_filename.YYYYMMDD_HHMMSS (15 chars after pattern) + * - base_filename.YYYYMMDD_HHMMSS.gz (18 chars after pattern) + */ +static int is_valid_rotation_filename(const char *filename, + const char *pattern) +{ + size_t pattern_len = strlen(pattern); + size_t filename_len = strlen(filename); + const char *suffix; + size_t suffix_len; + int i; + + /* Check that filename starts with pattern */ + if (strncmp(filename, pattern, pattern_len) != 0) { + return 0; + } + + /* Get the suffix after the pattern */ + suffix = filename + pattern_len; + suffix_len = filename_len - pattern_len; + + /* Must be exactly 15 or 18 characters */ + if (suffix_len != 15 && suffix_len != 18) { + return 0; + } + + /* For 18 characters, must end with .gz */ + if (suffix_len == 18) { + if (strcmp(suffix + 15, ".gz") != 0) { + return 0; + } + } + + /* Validate timestamp format: YYYYMMDD_HHMMSS + * - 8 digits (YYYYMMDD) + * - underscore at position 8 + * - 6 digits (HHMMSS) + */ + for (i = 0; i < 8; i++) { + if (suffix[i] < '0' || suffix[i] > '9') { + return 0; + } + } + if (suffix[8] != '_') { + return 0; + } + for (i = 9; i < 15; i++) { + if (suffix[i] < '0' || suffix[i] > '9') { + return 0; + } + } + + return 1; +} + +/* Function to clean up old rotated files */ +static int cleanup_old_files(struct flb_logrotate_conf *ctx, + const char *directory, + const char *base_filename) +{ + char pattern[PATH_MAX]; + char full_path[PATH_MAX]; + char search_path[PATH_MAX]; + char **files = NULL; + int file_count = 0; + int max_files = ctx->max_files; + int i, j; + + /* Create pattern to match rotated files */ + snprintf(pattern, PATH_MAX - 1, "%s.", base_filename); + +#ifdef FLB_SYSTEM_WINDOWS + HANDLE hFind; + WIN32_FIND_DATA findData; + + /* Create search path: directory\* */ + snprintf(search_path, PATH_MAX - 1, "%s" FLB_PATH_SEPARATOR "*", directory); + + hFind = FindFirstFileA(search_path, &findData); + if (hFind == INVALID_HANDLE_VALUE) { + return 0; /* Directory doesn't exist or can't be opened */ + } + + /* Count matching files */ + do { + /* Skip . and .. */ + if (strcmp(findData.cFileName, ".") == 0 || + strcmp(findData.cFileName, "..") == 0) { + continue; + } + /* Skip directories */ + if (findData.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) { + continue; + } + if (is_valid_rotation_filename(findData.cFileName, pattern)) { + file_count++; + } + } while (FindNextFileA(hFind, &findData) != 0); + + if (file_count <= max_files) { + FindClose(hFind); + return 0; + } + + /* Allocate array for file names */ + files = flb_calloc(file_count, sizeof(char *)); + if (!files) { + FindClose(hFind); + return -1; + } + + /* Collect file names - restart search */ + FindClose(hFind); + hFind = FindFirstFileA(search_path, &findData); + if (hFind == INVALID_HANDLE_VALUE) { + flb_free(files); + return -1; + } + + i = 0; + do { + /* Skip . and .. */ + if (strcmp(findData.cFileName, ".") == 0 || + strcmp(findData.cFileName, "..") == 0) { + continue; + } + /* Skip directories */ + if (findData.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) { + continue; + } + if (is_valid_rotation_filename(findData.cFileName, pattern) && + i < file_count) { + snprintf(full_path, PATH_MAX - 1, "%s" FLB_PATH_SEPARATOR "%s", + directory, findData.cFileName); + files[i] = flb_strdup(full_path); + i++; + } + } while (FindNextFileA(hFind, &findData) != 0 && i < file_count); + + FindClose(hFind); +#else + DIR *dir; + struct dirent *entry; + + dir = opendir(directory); + if (!dir) { + return 0; /* Directory doesn't exist or can't be opened */ + } + + /* Count matching files */ + while ((entry = readdir(dir)) != NULL) { + if (is_valid_rotation_filename(entry->d_name, pattern)) { + file_count++; + } + } + + if (file_count <= max_files) { + closedir(dir); + return 0; + } + + /* Allocate array for file names */ + files = flb_calloc(file_count, sizeof(char *)); + if (!files) { + closedir(dir); + return -1; + } + + /* Collect file names */ + rewinddir(dir); + i = 0; + while ((entry = readdir(dir)) != NULL && i < file_count) { + if (is_valid_rotation_filename(entry->d_name, pattern)) { + snprintf(full_path, PATH_MAX - 1, "%s" FLB_PATH_SEPARATOR "%s", + directory, entry->d_name); + files[i] = flb_strdup(full_path); + i++; + } + } + closedir(dir); +#endif + + /* Sort files by modification time (oldest first) */ + for (i = 0; i < file_count - 1; i++) { + for (j = i + 1; j < file_count; j++) { + struct stat st1; + struct stat st2; + if (stat(files[i], &st1) == 0 && stat(files[j], &st2) == 0) { + if (st1.st_mtime > st2.st_mtime) { + char *temp = files[i]; + files[i] = files[j]; + files[j] = temp; + } + } + } + } + + /* Remove oldest files */ + if (file_count > max_files) { + flb_plg_info(ctx->ins, + "cleaning up old rotated files: removing %d files (keeping %d)", + file_count - max_files, max_files); + } + for (i = 0; i < file_count - max_files; i++) { + if (unlink(files[i]) == 0) { + flb_plg_debug(ctx->ins, "removed old rotated file: %s", files[i]); + } + flb_free(files[i]); + } + + /* Free remaining file names */ + for (i = file_count - max_files; i < file_count; i++) { + flb_free(files[i]); + } + + flb_free(files); + return 0; +} + +static void cb_logrotate_flush(struct flb_event_chunk *event_chunk, + struct flb_output_flush *out_flush, + struct flb_input_instance *ins, + void *out_context, + struct flb_config *config) +{ + int ret; + int column_names; + FILE *fp; + size_t off = 0; + size_t last_off = 0; + size_t alloc_size = 0; + size_t total; + size_t file_size = 0; + char out_file[PATH_MAX]; + char *buf; + char *out_file_copy; + char directory[PATH_MAX]; + char base_filename[PATH_MAX]; + long file_pos; + bool have_directory; + struct flb_logrotate_conf *ctx = out_context; + struct flb_log_event_decoder log_decoder; + struct flb_log_event log_event; + struct logrotate_file_size *entry = NULL; + struct stat st; + int lock_acquired = 0; + + (void) config; + + /* Set the right output file */ + if (ctx->out_path) { + if (ctx->out_file) { + snprintf(out_file, PATH_MAX - 1, "%s" FLB_PATH_SEPARATOR "%s", + ctx->out_path, ctx->out_file); + } + else { + snprintf(out_file, PATH_MAX - 1, "%s" FLB_PATH_SEPARATOR "%s", + ctx->out_path, event_chunk->tag); + } + } + else { + if (ctx->out_file) { + snprintf(out_file, PATH_MAX - 1, "%s", ctx->out_file); + } + else { + snprintf(out_file, PATH_MAX - 1, "%s", event_chunk->tag); + } + } + + /* Find or create file size entry and acquire lock */ + entry = find_file_size_entry(ctx, out_file); + if (entry == NULL) { + /* Entry doesn't exist yet, create it with initial size 0 */ + if (update_file_size(ctx, out_file, 0) == 0) { + entry = find_file_size_entry(ctx, out_file); + } + } + + if (entry != NULL) { + /* Acquire lock before any file operations */ + if (flb_lock_acquire(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY) != 0) { + flb_plg_error(ctx->ins, "failed to acquire lock for file %s", + out_file); + FLB_OUTPUT_RETURN(FLB_ERROR); + } + lock_acquired = 1; + } + + /* Check if file needs rotation based on current size counter */ + if (entry != NULL) { + file_size = entry->size; + } + else { + /* Entry doesn't exist, check using stat */ + if (stat(out_file, &st) == 0 && st.st_size >= 0) { + file_size = (size_t) st.st_size; + } + } + + if (file_size >= ctx->max_size) { + have_directory = false; + directory[0] = '\0'; + /* Extract directory and base filename for cleanup */ + out_file_copy = flb_strdup(out_file); + if (out_file_copy) { +#ifdef FLB_SYSTEM_WINDOWS + PathRemoveFileSpecA(out_file_copy); + strncpy(directory, out_file_copy, PATH_MAX - 1); + directory[PATH_MAX - 1] = '\0'; +#else + strncpy(directory, dirname(out_file_copy), PATH_MAX - 1); + directory[PATH_MAX - 1] = '\0'; +#endif + flb_free(out_file_copy); + have_directory = true; + } + + /* Get base filename for cleanup */ + { + char *last_sep = strrchr(out_file, FLB_PATH_SEPARATOR[0]); + if (last_sep) { + strncpy(base_filename, last_sep + 1, PATH_MAX - 1); + } + else { + strncpy(base_filename, out_file, PATH_MAX - 1); + } + base_filename[PATH_MAX - 1] = '\0'; + } + + /* Release lock before rotation (rotate_file will acquire its own) */ + if (lock_acquired && entry != NULL) { + flb_lock_release(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + lock_acquired = 0; + } + + /* Rotate the file */ + if (rotate_file(ctx, out_file) == 0) { + /* Remove file size entry from list after rotation */ + remove_file_size(ctx, out_file); + entry = NULL; /* Entry was removed */ + /* Clean up old rotated files */ + if (have_directory) { + cleanup_old_files(ctx, directory, base_filename); + } + } + + /* Re-acquire lock after rotation */ + entry = find_file_size_entry(ctx, out_file); + if (entry == NULL) { + /* Create new entry after rotation */ + if (update_file_size(ctx, out_file, 0) == 0) { + entry = find_file_size_entry(ctx, out_file); + } + } + if (entry != NULL) { + if (flb_lock_acquire(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY) != 0) { + flb_plg_error(ctx->ins, + "failed to re-acquire lock for file %s", out_file); + FLB_OUTPUT_RETURN(FLB_ERROR); + } + lock_acquired = 1; + } + } + + /* Open output file with default name as the Tag */ + fp = fopen(out_file, "ab+"); + if (ctx->mkdir == FLB_TRUE && fp == NULL && errno == ENOENT) { + out_file_copy = flb_strdup(out_file); + if (out_file_copy) { +#ifdef FLB_SYSTEM_WINDOWS + PathRemoveFileSpecA(out_file_copy); + ret = mkpath(ctx->ins, out_file_copy); +#else + ret = mkpath(ctx->ins, dirname(out_file_copy)); +#endif + flb_free(out_file_copy); + if (ret == 0) { + fp = fopen(out_file, "ab+"); + } + } + } + if (fp == NULL) { + flb_errno(); + flb_plg_error(ctx->ins, "error opening: %s", out_file); + /* Release lock before returning */ + if (lock_acquired && entry != NULL) { + flb_lock_release(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + } + FLB_OUTPUT_RETURN(FLB_ERROR); + } + + /* Initialize file size counter if this is a new file */ + if (entry == NULL) { + /* File not in list, initialize it */ + update_file_size_counter(ctx, out_file, fp); + /* Re-find entry after initialization */ + entry = find_file_size_entry(ctx, out_file); + if (entry != NULL && !lock_acquired) { + /* Acquire lock if we didn't have it before */ + if (flb_lock_acquire(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY) == 0) { + lock_acquired = 1; + } + } + } + + /* + * Get current file stream position, we gather this in case 'csv' format + * needs to write the column names. + */ + file_pos = ftell(fp); + + /* Check if the event type is metrics, handle the payload differently */ + if (event_chunk->type == FLB_INPUT_METRICS) { + print_metrics_text(ctx->ins, fp, + event_chunk->data, event_chunk->size); + /* Update file size counter - we already hold the lock */ + if (lock_acquired && entry != NULL) { + if (fstat(fileno(fp), &st) == 0 && st.st_size >= 0) { + entry->size = (size_t) st.st_size; + } + } + else { + update_file_size_counter(ctx, out_file, fp); + } + fclose(fp); + /* Release lock before returning */ + if (lock_acquired && entry != NULL) { + flb_lock_release(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + } + FLB_OUTPUT_RETURN(FLB_OK); + } + + /* + * Msgpack output format used to create unit tests files, useful for + * Fluent Bit developers. + */ + if (ctx->format == FLB_OUT_LOGROTATE_FMT_MSGPACK) { + off = 0; + total = 0; + + do { + ret = fwrite((char *) event_chunk->data + off, 1, + event_chunk->size - off, fp); + if (ret < 0) { + flb_errno(); + fclose(fp); + /* Release lock before returning */ + if (lock_acquired && entry != NULL) { + flb_lock_release(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + } + FLB_OUTPUT_RETURN(FLB_RETRY); + } + total += ret; + } while (total < event_chunk->size); + + /* Update file size counter - we already hold the lock */ + if (lock_acquired && entry != NULL) { + if (fstat(fileno(fp), &st) == 0 && st.st_size >= 0) { + entry->size = (size_t) st.st_size; + } + } + else { + update_file_size_counter(ctx, out_file, fp); + } + fclose(fp); + /* Release lock before returning */ + if (lock_acquired && entry != NULL) { + flb_lock_release(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + } + FLB_OUTPUT_RETURN(FLB_OK); + } + + ret = flb_log_event_decoder_init(&log_decoder, + (char *) event_chunk->data, + event_chunk->size); + + if (ret != FLB_EVENT_DECODER_SUCCESS) { + flb_plg_error(ctx->ins, + "Log event decoder initialization error : %d", ret); + + /* Update file size counter before closing - we already hold the lock */ + if (lock_acquired && entry != NULL) { + if (fstat(fileno(fp), &st) == 0 && st.st_size >= 0) { + entry->size = (size_t) st.st_size; + } + } + else { + update_file_size_counter(ctx, out_file, fp); + } + fclose(fp); + /* Release lock before returning */ + if (lock_acquired && entry != NULL) { + flb_lock_release(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + } + FLB_OUTPUT_RETURN(FLB_ERROR); + } + + /* + * Upon flush, for each array, lookup the time and the first field + * of the map to use as a data point. + */ + while ((ret = flb_log_event_decoder_next( + &log_decoder, + &log_event)) == FLB_EVENT_DECODER_SUCCESS) { + alloc_size = (off - last_off) + 128; /* JSON is larger than msgpack */ + last_off = off; + + switch (ctx->format){ + case FLB_OUT_LOGROTATE_FMT_JSON: + buf = flb_msgpack_to_json_str(alloc_size, log_event.body, + config->json_escape_unicode); + if (buf) { + fprintf(fp, "%s: [%"PRIu64".%09lu, %s]" NEWLINE, + event_chunk->tag, + (uint64_t) log_event.timestamp.tm.tv_sec, + log_event.timestamp.tm.tv_nsec, buf); + flb_free(buf); + } + else { + /* Update file size counter - we already hold the lock */ + if (lock_acquired && entry != NULL) { + if (fstat(fileno(fp), &st) == 0 && st.st_size >= 0) { + entry->size = (size_t) st.st_size; + } + } + else { + update_file_size_counter(ctx, out_file, fp); + } + flb_log_event_decoder_destroy(&log_decoder); + fclose(fp); + /* Release lock before returning */ + if (lock_acquired && entry != NULL) { + flb_lock_release(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + } + FLB_OUTPUT_RETURN(FLB_RETRY); + } + break; + case FLB_OUT_LOGROTATE_FMT_CSV: + if (ctx->csv_column_names == FLB_TRUE && file_pos == 0) { + column_names = FLB_TRUE; + file_pos = 1; + } + else { + column_names = FLB_FALSE; + } + csv_output(fp, column_names, + &log_event.timestamp, + log_event.body, ctx); + break; + case FLB_OUT_LOGROTATE_FMT_LTSV: + ltsv_output(fp, + &log_event.timestamp, + log_event.body, ctx); + break; + case FLB_OUT_LOGROTATE_FMT_PLAIN: + plain_output(fp, log_event.body, alloc_size, config->json_escape_unicode); + break; + case FLB_OUT_LOGROTATE_FMT_TEMPLATE: + template_output(fp, + &log_event.timestamp, + log_event.body, ctx); + break; + } + } + + flb_log_event_decoder_destroy(&log_decoder); + + /* Update file size counter - we already hold the lock */ + if (lock_acquired && entry != NULL) { + if (fstat(fileno(fp), &st) == 0 && st.st_size >= 0) { + entry->size = (size_t) st.st_size; + } + } + else { + update_file_size_counter(ctx, out_file, fp); + } + fclose(fp); + + /* Release lock before returning */ + if (lock_acquired && entry != NULL) { + flb_lock_release(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + } + + FLB_OUTPUT_RETURN(FLB_OK); +} + +static int cb_logrotate_exit(void *data, struct flb_config *config) +{ + struct flb_logrotate_conf *ctx = data; + struct mk_list *head; + struct mk_list *tmp; + struct logrotate_file_size *entry; + + if (!ctx) { + return 0; + } + + /* Free all file size entries from linked list */ + mk_list_foreach_safe(head, tmp, &ctx->file_sizes) { + entry = mk_list_entry(head, struct logrotate_file_size, _head); + mk_list_del(&entry->_head); + /* Destroy mutex before freeing entry */ + flb_lock_destroy(&entry->lock); + if (entry->filename) { + flb_sds_destroy(entry->filename); + } + flb_free(entry); + } + + flb_free(ctx); + return 0; +} + +/* Configuration properties map */ +static struct flb_config_map config_map[] = { + { + FLB_CONFIG_MAP_STR, "path", NULL, + 0, FLB_TRUE, offsetof(struct flb_logrotate_conf, out_path), + "Absolute path to store the files. This parameter is optional" + }, + + { + FLB_CONFIG_MAP_STR, "file", NULL, + 0, FLB_TRUE, offsetof(struct flb_logrotate_conf, out_file), + "Name of the target file to write the records. If 'path' is specified, " + "the value is prefixed" + }, + + { + FLB_CONFIG_MAP_STR, "format", NULL, + 0, FLB_FALSE, 0, + "Specify the output data format, the available options are: plain (json), " + "csv, ltsv and template. If no value is set the outgoing data is formatted " + "using the tag and the record in json" + }, + + { + FLB_CONFIG_MAP_STR, "delimiter", NULL, + 0, FLB_FALSE, 0, + "Set a custom delimiter for the records" + }, + + { + FLB_CONFIG_MAP_STR, "label_delimiter", NULL, + 0, FLB_FALSE, 0, + "Set a custom label delimiter, to be used with 'ltsv' format" + }, + + { + FLB_CONFIG_MAP_STR, "template", "{time} {message}", + 0, FLB_TRUE, offsetof(struct flb_logrotate_conf, template), + "Set a custom template format for the data" + }, + + { + FLB_CONFIG_MAP_BOOL, "csv_column_names", "false", + 0, FLB_TRUE, offsetof(struct flb_logrotate_conf, csv_column_names), + "Add column names (keys) in the first line of the target file" + }, + + { + FLB_CONFIG_MAP_BOOL, "mkdir", "false", + 0, FLB_TRUE, offsetof(struct flb_logrotate_conf, mkdir), + "Recursively create output directory if it does not exist. Permissions set to 0755" + }, + + { + FLB_CONFIG_MAP_SIZE, "max_size", "100000000", + 0, FLB_TRUE, offsetof(struct flb_logrotate_conf, max_size), + "Maximum size of file before rotation (default: 100M)" + }, + + { + FLB_CONFIG_MAP_INT, "max_files", "7", + 0, FLB_TRUE, offsetof(struct flb_logrotate_conf, max_files), + "Maximum number of rotated files to keep (default: 7)" + }, + + { + FLB_CONFIG_MAP_BOOL, "gzip", "true", + 0, FLB_TRUE, offsetof(struct flb_logrotate_conf, gzip), + "Whether to gzip rotated files (default: true)" + }, + + /* EOF */ + {0} +}; + +struct flb_output_plugin out_logrotate_plugin = { + .name = "logrotate", + .description = "Generate log file with rotation", + .cb_init = cb_logrotate_init, + .cb_flush = cb_logrotate_flush, + .cb_exit = cb_logrotate_exit, + .flags = 0, + .workers = 1, + .event_type = FLB_OUTPUT_LOGS | FLB_OUTPUT_METRICS, + .config_map = config_map, +}; diff --git a/plugins/out_logrotate/logrotate.h b/plugins/out_logrotate/logrotate.h new file mode 100644 index 00000000000..82d77de732f --- /dev/null +++ b/plugins/out_logrotate/logrotate.h @@ -0,0 +1,32 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_OUT_LOGROTATE +#define FLB_OUT_LOGROTATE + +enum { + FLB_OUT_LOGROTATE_FMT_JSON, + FLB_OUT_LOGROTATE_FMT_CSV, + FLB_OUT_LOGROTATE_FMT_LTSV, + FLB_OUT_LOGROTATE_FMT_PLAIN, + FLB_OUT_LOGROTATE_FMT_MSGPACK, + FLB_OUT_LOGROTATE_FMT_TEMPLATE, +}; + +#endif diff --git a/tests/runtime/CMakeLists.txt b/tests/runtime/CMakeLists.txt index dd76c16faee..91fa07eb21a 100644 --- a/tests/runtime/CMakeLists.txt +++ b/tests/runtime/CMakeLists.txt @@ -234,6 +234,7 @@ if(FLB_IN_LIB) # These plugins work only on Linux if(NOT FLB_SYSTEM_WINDOWS) FLB_RT_TEST(FLB_OUT_FILE "out_file.c") + FLB_RT_TEST(FLB_OUT_LOGROTATE "out_logrotate.c") endif() FLB_RT_TEST(FLB_OUT_S3 "out_s3.c") FLB_RT_TEST(FLB_OUT_TD "out_td.c") diff --git a/tests/runtime/out_logrotate.c b/tests/runtime/out_logrotate.c new file mode 100644 index 00000000000..82f7c18ca7e --- /dev/null +++ b/tests/runtime/out_logrotate.c @@ -0,0 +1,1274 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +#include +#include +#include +#include +#include "flb_tests_runtime.h" +#include +#include +#include +#include +#include +#include +#include +#include + +/* Test data */ +#include "data/common/json_invalid.h" /* JSON_INVALID */ +#include "data/common/json_long.h" /* JSON_LONG */ +#include "data/common/json_small.h" /* JSON_SMALL */ + +/* Test functions */ +void flb_test_logrotate_basic_rotation(void); +void flb_test_logrotate_gzip_compression(void); +void flb_test_logrotate_max_files_cleanup(void); +void flb_test_logrotate_format_csv(void); +void flb_test_logrotate_format_ltsv(void); +void flb_test_logrotate_format_plain(void); +void flb_test_logrotate_format_msgpack(void); +void flb_test_logrotate_format_template(void); +void flb_test_logrotate_path(void); +void flb_test_logrotate_mkdir(void); +void flb_test_logrotate_delimiter(void); +void flb_test_logrotate_label_delimiter(void); +void flb_test_logrotate_csv_column_names(void); +void flb_test_logrotate_multithreaded(void); + +/* Test list */ +TEST_LIST = { + {"basic_rotation", flb_test_logrotate_basic_rotation}, + {"gzip_compression", flb_test_logrotate_gzip_compression}, + {"max_files_cleanup", flb_test_logrotate_max_files_cleanup}, + {"format_csv", flb_test_logrotate_format_csv}, + {"format_ltsv", flb_test_logrotate_format_ltsv}, + {"format_plain", flb_test_logrotate_format_plain}, + {"format_msgpack", flb_test_logrotate_format_msgpack}, + {"format_template", flb_test_logrotate_format_template}, + {"path", flb_test_logrotate_path}, + {"mkdir", flb_test_logrotate_mkdir}, + {"delimiter", flb_test_logrotate_delimiter}, + {"label_delimiter", flb_test_logrotate_label_delimiter}, + {"csv_column_names", flb_test_logrotate_csv_column_names}, + {"multithreaded", flb_test_logrotate_multithreaded}, + {NULL, NULL} +}; + +#define TEST_LOGFILE "flb_test_logrotate.log" +#define TEST_LOGPATH "out_logrotate" +#define TEST_TIMEOUT 10 + +/* Helper function to recursively delete directory and all its contents */ +static int recursive_delete_directory(const char *dir_path) +{ + DIR *dir; + struct dirent *entry; + struct stat statbuf; + char path[PATH_MAX]; + int ret = 0; + + if (dir_path == NULL) { + return -1; + } + + /* Check if directory exists */ + if (stat(dir_path, &statbuf) != 0) { + /* Directory doesn't exist, consider it success */ + return 0; + } + + /* Check if it's actually a directory */ + if (!S_ISDIR(statbuf.st_mode)) { + /* Not a directory, try to remove as file */ + return remove(dir_path); + } + + /* Open directory */ + dir = opendir(dir_path); + if (dir == NULL) { + return -1; + } + + /* Iterate through directory entries */ + while ((entry = readdir(dir)) != NULL) { + /* Skip . and .. */ + if (strcmp(entry->d_name, ".") == 0 || + strcmp(entry->d_name, "..") == 0) { + continue; + } + + /* Build full path */ + snprintf(path, sizeof(path), "%s/%s", dir_path, entry->d_name); + + /* Get file status */ + if (stat(path, &statbuf) != 0) { + continue; + } + + /* Recursively delete subdirectories */ + if (S_ISDIR(statbuf.st_mode)) { + if (recursive_delete_directory(path) != 0) { + ret = -1; + } + } + else { + /* Delete file */ + if (unlink(path) != 0) { + ret = -1; + } + } + } + + closedir(dir); + + /* Remove the directory itself */ + if (rmdir(dir_path) != 0) { + ret = -1; + } + + return ret; +} + +/* Helper function to count files in directory */ +static int count_files_in_directory(const char *dir_path, const char *prefix) +{ + DIR *dir; + struct dirent *entry; + int count = 0; + + dir = opendir(dir_path); + if (dir == NULL) { + return -1; + } + + while ((entry = readdir(dir)) != NULL) { + if (strncmp(entry->d_name, prefix, strlen(prefix)) == 0) { + count++; + } + } + + closedir(dir); + return count; +} + +/* + * Helper function: Wait for a file matching the pattern "prefix*gz" to appear + * in dir_path + */ +static int wait_for_file_pattern(const char *dir_path, const char *prefix, + const char *suffix, int time_limit) +{ + int elapsed_time, found = 0; + DIR *dir; + struct dirent *entry; + size_t prefix_len = strlen(prefix); + size_t suffix_len = strlen(suffix); + + for (elapsed_time = 0; elapsed_time < time_limit && !found; elapsed_time++) { + dir = opendir(dir_path); + if (dir) { + while ((entry = readdir(dir)) != NULL) { + if (strncmp(entry->d_name, prefix, prefix_len) == 0 && + strlen(entry->d_name) > prefix_len + suffix_len && + strcmp(entry->d_name + strlen(entry->d_name) - suffix_len, + suffix) == 0) { + found = 1; + break; + } + } + closedir(dir); + } + if (!found) { + flb_time_msleep(1000); + } + } + return found ? 0 : -1; +} + +/* Helper function to read file content into buffer */ +static char *read_file_content(const char *filename, size_t *out_size) +{ + FILE *fp; + char *buffer; + struct stat st; + size_t size; + + if (stat(filename, &st) != 0) { + return NULL; + } + + size = st.st_size; + fp = fopen(filename, "rb"); + if (!fp) { + return NULL; + } + + buffer = flb_malloc(size + 1); + if (!buffer) { + fclose(fp); + return NULL; + } + + if (fread(buffer, 1, size, fp) != size) { + flb_free(buffer); + fclose(fp); + return NULL; + } + + buffer[size] = '\0'; + fclose(fp); + *out_size = size; + return buffer; +} + +/* Format Tests */ +void flb_test_logrotate_format_csv(void) +{ + int i; + int ret; + int bytes; + char *p = (char *) JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + char logfile[512]; + char *content; + size_t content_size; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + mkdir(TEST_LOGPATH, 0755); + snprintf(logfile, sizeof(logfile), "%s/%s", TEST_LOGPATH, "test_csv.log"); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", + "Log_Level", "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, + "match", "test", + "file", logfile, + "format", "csv", + "max_size", "100M", + "gzip", "false", + NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write some data */ + for (i = 0; i < 3; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(1500); + + flb_stop(ctx); + flb_destroy(ctx); + + /* Verify CSV format - should contain commas as delimiters */ + content = read_file_content(logfile, &content_size); + TEST_CHECK(content != NULL); + if (content) { + /* CSV should contain commas */ + TEST_CHECK(strstr(content, ",") != NULL); + /* CSV should contain timestamp */ + TEST_CHECK(strstr(content, "1448403340") != NULL); + flb_free(content); + } + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_logrotate_format_ltsv(void) +{ + int i; + int ret; + int bytes; + char *p = (char *) JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + char logfile[512]; + char *content; + size_t content_size; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + mkdir(TEST_LOGPATH, 0755); + snprintf(logfile, sizeof(logfile), "%s/%s", TEST_LOGPATH, "test_ltsv.log"); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", + "Log_Level", "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, + "match", "test", + "file", logfile, + "format", "ltsv", + "max_size", "100M", + "gzip", "false", + NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write some data */ + for (i = 0; i < 3; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(1500); + + flb_stop(ctx); + flb_destroy(ctx); + + /* Verify LTSV format - should contain colons (label delimiter) and tabs */ + content = read_file_content(logfile, &content_size); + TEST_CHECK(content != NULL); + if (content) { + /* LTSV should contain colons for label:value pairs */ + TEST_CHECK(strstr(content, ":") != NULL); + /* Should contain "time" label */ + TEST_CHECK(strstr(content, "time") != NULL); + flb_free(content); + } + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_logrotate_format_plain(void) +{ + int i; + int ret; + int bytes; + char *p = (char *) JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + char logfile[512]; + char *content; + size_t content_size; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + mkdir(TEST_LOGPATH, 0755); + snprintf(logfile, sizeof(logfile), "%s/%s", TEST_LOGPATH, "test_plain.log"); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", + "Log_Level", "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, + "match", "test", + "file", logfile, + "format", "plain", + "max_size", "100M", + "gzip", "false", + NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write some data */ + for (i = 0; i < 3; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(1500); + + flb_stop(ctx); + flb_destroy(ctx); + + /* Verify plain format - should be JSON without tag/timestamp prefix */ + content = read_file_content(logfile, &content_size); + TEST_CHECK(content != NULL); + if (content) { + /* Plain format should contain JSON */ + TEST_CHECK(strstr(content, "{") != NULL); + /* Should not contain tag prefix like "test: [" */ + TEST_CHECK(strstr(content, "test: [") == NULL); + flb_free(content); + } + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_logrotate_format_msgpack(void) +{ + int i; + int ret; + int bytes; + char *p = (char *) JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + FILE *fp; + char logfile[512]; + struct stat st; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + mkdir(TEST_LOGPATH, 0755); + snprintf(logfile, sizeof(logfile), "%s/%s", TEST_LOGPATH, "test_msgpack.log"); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", + "Log_Level", "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, + "match", "test", + "file", logfile, + "format", "msgpack", + "max_size", "100M", + "gzip", "false", + NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write some data */ + for (i = 0; i < 3; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(1500); + + flb_stop(ctx); + flb_destroy(ctx); + + /* Verify msgpack format - should be binary data */ + if (stat(logfile, &st) == 0) { + TEST_CHECK(st.st_size > 0); + /* Msgpack files should not be readable as text (no newlines in first bytes) */ + fp = fopen(logfile, "rb"); + if (fp) { + unsigned char first_bytes[10]; + size_t read_bytes = fread(first_bytes, 1, 10, fp); + fclose(fp); + if (read_bytes > 0) { + /* + * Msgpack typically starts with array markers (0x91, 0x92, etc.) + * or map markers. Just verify it's not plain text JSON. + */ + TEST_CHECK(first_bytes[0] != '{' && first_bytes[0] != '['); + } + } + } + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_logrotate_format_template(void) +{ + int i; + int ret; + int bytes; + /* Use JSON with specific fields for template testing */ + const char *json_template = + "[1448403340, {\"message\": \"test log entry\", \"level\": \"info\"}]"; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + char logfile[512]; + char *content; + size_t content_size; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + mkdir(TEST_LOGPATH, 0755); + snprintf(logfile, sizeof(logfile), "%s/%s", TEST_LOGPATH, "test_template.log"); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", + "Log_Level", "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, + "match", "test", + "file", logfile, + "format", "template", + "template", "{time} {message}", + "max_size", "100M", + "gzip", "false", + NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write some data */ + for (i = 0; i < 3; i++) { + bytes = flb_lib_push(ctx, in_ffd, (char *)json_template, strlen(json_template)); + TEST_CHECK(bytes == strlen(json_template)); + } + + flb_time_msleep(1500); + + flb_stop(ctx); + flb_destroy(ctx); + + /* Verify template format - should contain substituted values */ + content = read_file_content(logfile, &content_size); + TEST_CHECK(content != NULL); + if (content) { + /* Template should contain the message value */ + TEST_CHECK(strstr(content, "test log entry") != NULL); + /* Should contain timestamp (as float) */ + TEST_CHECK(strstr(content, "1448403340") != NULL || strstr(content, ".") != NULL); + flb_free(content); + } + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +/* Configuration Option Tests */ +void flb_test_logrotate_path(void) +{ + int i; + int ret; + int bytes; + char *p = (char *) JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + FILE *fp; + char logfile[PATH_MAX]; + char test_path[PATH_MAX]; + + snprintf(test_path, sizeof(test_path), "%s/path_test", TEST_LOGPATH); + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + mkdir(TEST_LOGPATH, 0755); + /* Construct logfile path - test_path is short so this is safe */ + #pragma GCC diagnostic push + #pragma GCC diagnostic ignored "-Wformat-truncation" + snprintf(logfile, sizeof(logfile), "%s/path_test.log", test_path); + #pragma GCC diagnostic pop + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", + "Log_Level", "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, + "match", "test", + "path", test_path, + "file", "path_test.log", + "mkdir", "true", + "max_size", "100M", + "gzip", "false", + NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write some data */ + for (i = 0; i < 3; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(1500); + + flb_stop(ctx); + flb_destroy(ctx); + + /* Verify file was created in the specified path */ + fp = fopen(logfile, "r"); + TEST_CHECK(fp != NULL); + if (fp) { + fclose(fp); + } + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_logrotate_mkdir(void) +{ + int i; + int ret; + int bytes; + char *p = (char *) JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + FILE *fp; + char logfile[PATH_MAX]; + char nested_path[PATH_MAX]; + struct stat st; + + snprintf(nested_path, sizeof(nested_path), "%s/nested/deep/path", TEST_LOGPATH); + /* Construct logfile path - nested_path is short so this is safe */ + #pragma GCC diagnostic push + #pragma GCC diagnostic ignored "-Wformat-truncation" + snprintf(logfile, sizeof(logfile), "%s/test_mkdir.log", nested_path); + #pragma GCC diagnostic pop + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", + "Log_Level", "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, + "match", "test", + "file", logfile, + "mkdir", "true", + "max_size", "100M", + "gzip", "false", + NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write some data */ + for (i = 0; i < 3; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(1500); + + flb_stop(ctx); + flb_destroy(ctx); + + /* Verify nested directory was created */ + TEST_CHECK(stat(nested_path, &st) == 0); + TEST_CHECK(S_ISDIR(st.st_mode)); + + /* Verify file was created */ + fp = fopen(logfile, "r"); + TEST_CHECK(fp != NULL); + if (fp) { + fclose(fp); + } + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_logrotate_delimiter(void) +{ + int i; + int ret; + int bytes; + char *p = (char *) JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + char logfile[512]; + char *content; + size_t content_size; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + mkdir(TEST_LOGPATH, 0755); + snprintf(logfile, sizeof(logfile), "%s/%s", TEST_LOGPATH, "test_delimiter.log"); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", + "Log_Level", "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, + "match", "test", + "file", logfile, + "format", "csv", + "delimiter", "tab", + "max_size", "100M", + "gzip", "false", + NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write some data */ + for (i = 0; i < 3; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(1500); + + flb_stop(ctx); + flb_destroy(ctx); + + /* Verify tab delimiter is used (should contain tabs, not commas) */ + content = read_file_content(logfile, &content_size); + TEST_CHECK(content != NULL); + if (content) { + /* Should contain tab characters */ + int has_tab = 0; + int j; + for (j = 0; j < content_size; j++) { + if (content[j] == '\t') { + has_tab = 1; + break; + } + } + TEST_CHECK(has_tab); + flb_free(content); + } + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_logrotate_label_delimiter(void) +{ + int i; + int ret; + int bytes; + char *p = (char *) JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + char logfile[512]; + char *content; + size_t content_size; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + mkdir(TEST_LOGPATH, 0755); + snprintf(logfile, sizeof(logfile), "%s/%s", TEST_LOGPATH, "test_label_delimiter.log"); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", + "Log_Level", "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, + "match", "test", + "file", logfile, + "format", "ltsv", + "label_delimiter", "comma", + "max_size", "100M", + "gzip", "false", + NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write some data */ + for (i = 0; i < 3; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(1500); + + flb_stop(ctx); + flb_destroy(ctx); + + /* Verify custom label delimiter is used */ + content = read_file_content(logfile, &content_size); + TEST_CHECK(content != NULL); + if (content) { + /* Should contain "," as label delimiter (comma) */ + TEST_CHECK(strstr(content, ",") != NULL); + /* Should contain "time" label with comma delimiter */ + /* LTSV format prints "time" (with quotes) followed by delimiter */ + TEST_CHECK(strstr(content, "\"time\",") != NULL); + flb_free(content); + } + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_logrotate_csv_column_names(void) +{ + int i; + int ret; + int bytes; + char *p = (char *) JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + char logfile[512]; + char *content; + size_t content_size; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + mkdir(TEST_LOGPATH, 0755); + snprintf(logfile, sizeof(logfile), "%s/%s", TEST_LOGPATH, "test_csv_columns.log"); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", + "Log_Level", "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, + "match", "test", + "file", logfile, + "format", "csv", + "csv_column_names", "true", + "max_size", "100M", + "gzip", "false", + NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write some data */ + for (i = 0; i < 3; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(1500); + + flb_stop(ctx); + flb_destroy(ctx); + + /* Verify CSV column names header exists */ + content = read_file_content(logfile, &content_size); + TEST_CHECK(content != NULL); + if (content) { + /* First line should contain "timestamp" */ + TEST_CHECK(strstr(content, "timestamp") != NULL); + /* Should contain key names from JSON */ + TEST_CHECK(strstr(content, "key_0") != NULL); + flb_free(content); + } + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +/* Multithreaded Test */ +struct thread_data { + flb_ctx_t *ctx; + int in_ffd; + int thread_id; + int events_per_thread; + char *json_data; + size_t json_len; + int *success; + pthread_mutex_t *mutex; +}; + +static void *thread_worker(void *arg) +{ + struct thread_data *data = (struct thread_data *)arg; + int i; + int bytes; + + for (i = 0; i < data->events_per_thread; i++) { + bytes = flb_lib_push(data->ctx, data->in_ffd, data->json_data, data->json_len); + if (bytes != (int)data->json_len) { + pthread_mutex_lock(data->mutex); + *data->success = 0; + pthread_mutex_unlock(data->mutex); + return NULL; + } + /* Small delay to allow interleaving */ + flb_time_msleep(10); + } + + return NULL; +} + +void flb_test_logrotate_multithreaded(void) +{ + int ret; + int i; + char *p = (char *) JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + char logfile[512]; + pthread_t threads[8]; + struct thread_data thread_data[8]; + pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; + int success = 1; + int num_threads = 4; + int events_per_thread = 10; + FILE *fp; + char *content; + size_t content_size; + int line_count = 0; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + mkdir(TEST_LOGPATH, 0755); + snprintf(logfile, sizeof(logfile), "%s/%s", TEST_LOGPATH, "test_multithreaded.log"); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "0.5", "Grace", "2", + "Log_Level", "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, + "match", "test", + "file", logfile, + "max_size", "1M", + "max_files", "5", + "gzip", "false", + NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Prepare thread data */ + for (i = 0; i < num_threads; i++) { + thread_data[i].ctx = ctx; + thread_data[i].in_ffd = in_ffd; + thread_data[i].thread_id = i; + thread_data[i].events_per_thread = events_per_thread; + thread_data[i].json_data = p; + thread_data[i].json_len = strlen(p); + thread_data[i].success = &success; + thread_data[i].mutex = &mutex; + } + + /* Create and start threads */ + for (i = 0; i < num_threads; i++) { + ret = pthread_create(&threads[i], NULL, thread_worker, &thread_data[i]); + TEST_CHECK(ret == 0); + } + + /* Wait for all threads to complete */ + for (i = 0; i < num_threads; i++) { + pthread_join(threads[i], NULL); + } + + /* Wait for flush to complete - allow multiple flush cycles */ + flb_time_msleep(3000); + + /* Wait for file to exist and have content before stopping */ + ret = wait_for_file(logfile, 1000, TEST_TIMEOUT); + TEST_CHECK(ret == 0); + + flb_stop(ctx); + flb_destroy(ctx); + + /* Verify all data was written correctly */ + TEST_CHECK(success == 1); + + /* Verify file exists and has content */ + fp = fopen(logfile, "r"); + TEST_CHECK(fp != NULL); + if (fp) { + char line[4096]; + while (fgets(line, sizeof(line), fp) != NULL) { + line_count++; + } + fclose(fp); + } + + /* Should have at least num_threads * events_per_thread records */ + /* (may be more due to JSON format adding tag prefix) */ + TEST_CHECK(line_count >= num_threads * events_per_thread); + + /* Verify file content is valid - read and check for expected data */ + content = read_file_content(logfile, &content_size); + TEST_CHECK(content != NULL); + if (content) { + /* Should contain tag */ + TEST_CHECK(strstr(content, "test") != NULL); + /* Should contain timestamp */ + TEST_CHECK(strstr(content, "1448403340") != NULL); + /* Count occurrences of key_0 to verify records */ + int key_count = 0; + char *pos = content; + while ((pos = strstr(pos, "key_0")) != NULL) { + key_count++; + pos++; + } + TEST_CHECK(key_count >= num_threads * events_per_thread); + flb_free(content); + } + + pthread_mutex_destroy(&mutex); + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_logrotate_basic_rotation(void) +{ + int i; + int ret; + int bytes; + char *p = (char *) JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + FILE *fp; + char logfile[512]; + time_t now = time(NULL); + struct tm *tm_info = localtime(&now); + char timestamp[32]; + + strftime(timestamp, sizeof(timestamp), "%Y%m%d_%H%M%S", tm_info); + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + mkdir(TEST_LOGPATH, 0755); + snprintf(logfile, sizeof(logfile), "%s/%s", TEST_LOGPATH, TEST_LOGFILE); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", + "Log_Level", "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, + "match", "test", + "file", logfile, + "max_size", "5K", + "max_files", "3", + "gzip", "false", + NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write enough data to fill the file (JSON_SMALL is ~4KB, 4 events = ~16KB) */ + for (i = 0; i < 4; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + /* Wait for file to be created */ + ret = wait_for_file(logfile, 10*1024, TEST_TIMEOUT); + TEST_CHECK(ret == 0); + + /* Wait a bit more to ensure flush completes and file size is updated */ + flb_time_msleep(1500); + + /* Write additional data to trigger rotation (4 more events = ~16KB more) */ + for (i = 0; i < 4; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(1500); /* waiting flush */ + + flb_stop(ctx); + flb_destroy(ctx); + + /* Check that the original file exists */ + fp = fopen(logfile, "r"); + TEST_CHECK(fp != NULL); + if (fp != NULL) { + fclose(fp); + } + + /* Check that at least one rotated file exists: "flb_test_logrotate.log.*" */ + TEST_CHECK(count_files_in_directory(TEST_LOGPATH, "flb_test_logrotate.log.") >= 1); + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_logrotate_gzip_compression(void) +{ + int i; + int ret; + int bytes; + char *p = (char *) JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + char logfile[512]; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + mkdir(TEST_LOGPATH, 0755); + snprintf(logfile, sizeof(logfile), "%s/%s", TEST_LOGPATH, TEST_LOGFILE); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", + "Log_Level", "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, + "match", "test", + "file", logfile, + "max_size", "5K", + "max_files", "3", + "gzip", "true", + NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write enough data to for rotation to happen (JSON_SMALL is ~4KB) */ + for (i = 0; i < 4; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + /* Wait for file to be created */ + ret = wait_for_file(logfile, 10*1024, TEST_TIMEOUT); + TEST_CHECK(ret == 0); + + /* Write enough data to trigger rotation (JSON_SMALL is ~4KB) */ + for (i = 0; i < 4; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(2000); /* waiting flush and rotation/compression */ + + flb_stop(ctx); + flb_destroy(ctx); + + /* Check that a gzipped rotated file exists: "flb_test_logrotate.log.*.gz" */ + ret = wait_for_file_pattern(TEST_LOGPATH, "flb_test_logrotate.log.", + ".gz", TEST_TIMEOUT); + TEST_CHECK(ret == 0); + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_logrotate_max_files_cleanup(void) +{ + int i, j; + int ret; + int bytes; + char *p = (char *) JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + int file_count; + char logfile[512]; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + mkdir(TEST_LOGPATH, 0755); + snprintf(logfile, sizeof(logfile), "%s/%s", TEST_LOGPATH, TEST_LOGFILE); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", + "Log_Level", "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, + "match", "test", + "file", logfile, + "max_size", "5K", + "max_files", "3", + "gzip", "false", + NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write enough data to trigger multiple rotations */ + for (i = 0; i < 5; i++) { /* Write ~5MB to trigger multiple rotations */ + /* Write enough data to for rotation to happen (JSON_SMALL is ~4KB) */ + for (j = 0; j < 4; j++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(1500); /* waiting flush */ + + file_count = count_files_in_directory(TEST_LOGPATH, TEST_LOGFILE); + TEST_ASSERT(file_count >= 0); + TEST_CHECK(file_count <= 4); + } + + flb_time_msleep(1500); /* waiting flush */ + + flb_stop(ctx); + flb_destroy(ctx); + + /* Check that only Max_Files + 1 files exist (current + rotated) */ + file_count = count_files_in_directory(TEST_LOGPATH, TEST_LOGFILE); + TEST_ASSERT(file_count >= 0); + TEST_CHECK(file_count <= 4); /* Current file + 3 rotated files (max_files=3) */ + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +}