Skip to content

in_calyptia_fleet: fix init error mem leaks. #10502

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 24 additions & 13 deletions plugins/in_calyptia_fleet/in_calyptia_fleet.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
static int fleet_cur_chdir(struct flb_in_calyptia_fleet_config *ctx);
static int get_calyptia_files(struct flb_in_calyptia_fleet_config *ctx,
time_t timestamp);
static void in_calyptia_fleet_destroy(struct flb_in_calyptia_fleet_config *ctx);

#ifndef FLB_SYSTEM_WINDOWS

Expand Down Expand Up @@ -633,8 +634,6 @@ static int execute_reload(struct flb_in_calyptia_fleet_config *ctx, flb_sds_t cf
flb_plg_error(ctx->ins, "unable to change to configuration directory");
}

fleet_cur_chdir(ctx);

pthread_attr_init(&ptha);
pthread_attr_setdetachstate(&ptha, PTHREAD_CREATE_DETACHED);
pthread_create(&pth, &ptha, do_reload, reload);
Expand Down Expand Up @@ -2214,7 +2213,7 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in,
/* Load the config map */
ret = flb_input_config_map_set(in, (void *) ctx);
if (ret == -1) {
flb_free(ctx);
in_calyptia_fleet_destroy(ctx);
flb_plg_error(in, "unable to load configuration");
return -1;
}
Expand All @@ -2225,15 +2224,15 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in,

if (tmpdir == NULL) {
flb_plg_error(in, "unable to find temporary directory (%%TEMP%%).");
flb_free(ctx);
in_calyptia_fleet_destroy(ctx);
return -1;
}

ctx->config_dir = flb_sds_create_size(CALYPTIA_MAX_DIR_SIZE);

if (ctx->config_dir == NULL) {
flb_plg_error(in, "unable to allocate config-dir.");
flb_free(ctx);
in_calyptia_fleet_destroy(ctx);
return -1;
}
flb_sds_printf(&ctx->config_dir, "%s" PATH_SEPARATOR "%s", tmpdir, "calyptia-fleet");
Expand All @@ -2251,7 +2250,7 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in,

if (!ctx->u) {
flb_plg_error(ctx->ins, "could not initialize upstream");
flb_free(ctx);
in_calyptia_fleet_destroy(ctx);
return -1;
}

Expand All @@ -2273,6 +2272,7 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in,
/* create fleet directory before creating the fleet header. */
if (create_fleet_directory(ctx) != 0) {
flb_plg_error(ctx->ins, "unable to create fleet directories");
in_calyptia_fleet_destroy(ctx);
return -1;
}

Expand Down Expand Up @@ -2301,8 +2301,7 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in,

if (ret == -1) {
flb_plg_error(ctx->ins, "could not initialize collector for fleet input plugin");
flb_upstream_destroy(ctx->u);
flb_free(ctx);
in_calyptia_fleet_destroy(ctx);
return -1;
}

Expand All @@ -2325,10 +2324,11 @@ static void cb_in_calyptia_fleet_resume(void *data, struct flb_config *config)
flb_input_collector_resume(ctx->collect_fd, ctx->ins);
}

static int in_calyptia_fleet_exit(void *data, struct flb_config *config)
static void in_calyptia_fleet_destroy(struct flb_in_calyptia_fleet_config *ctx)
{
(void) *config;
struct flb_in_calyptia_fleet_config *ctx = (struct flb_in_calyptia_fleet_config *)data;
if (!ctx) {
return;
}

if (ctx->fleet_url) {
flb_sds_destroy(ctx->fleet_url);
Expand All @@ -2342,10 +2342,21 @@ static int in_calyptia_fleet_exit(void *data, struct flb_config *config)
flb_sds_destroy(ctx->fleet_id);
}

flb_input_collector_delete(ctx->collect_fd, ctx->ins);
flb_upstream_destroy(ctx->u);
if (ctx->collect_fd >= 0) {
flb_input_collector_delete(ctx->collect_fd, ctx->ins);
}

if (ctx->u) {
flb_upstream_destroy(ctx->u);
}

flb_free(ctx);
}

static int in_calyptia_fleet_exit(void *data, struct flb_config *config)
{
(void) *config;
in_calyptia_fleet_destroy((struct flb_in_calyptia_fleet_config *)data);
return 0;
}

Expand Down
30 changes: 20 additions & 10 deletions plugins/out_calyptia/calyptia.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include <cmetrics/cmt_encode_influx.h>

flb_sds_t custom_calyptia_pipeline_config_get(struct flb_config *ctx);
static void calyptia_ctx_destroy(struct flb_calyptia *ctx);

static int get_io_flags(struct flb_output_instance *ins)
{
Expand Down Expand Up @@ -733,35 +734,35 @@ static struct flb_calyptia *config_init(struct flb_output_instance *ins,
/* Load the config map */
ret = flb_output_config_map_set(ins, (void *) ctx);
if (ret == -1) {
flb_free(ctx);
calyptia_ctx_destroy(ctx);
return NULL;
}

ctx->metrics_endpoint = flb_sds_create_size(256);
if (!ctx->metrics_endpoint) {
flb_free(ctx);
calyptia_ctx_destroy(ctx);
return NULL;
}

#ifdef FLB_HAVE_CHUNK_TRACE
ctx->trace_endpoint = flb_sds_create_size(256);
if (!ctx->trace_endpoint) {
flb_sds_destroy(ctx->metrics_endpoint);
flb_free(ctx);
calyptia_ctx_destroy(ctx);
return NULL;
}
#endif

/* api_key */
if (!ctx->api_key) {
flb_plg_error(ctx->ins, "configuration 'api_key' is missing");
flb_free(ctx);
calyptia_ctx_destroy(ctx);
return NULL;
}

/* parse 'add_label' */
ret = config_add_labels(ins, ctx);
if (ret == -1) {
calyptia_ctx_destroy(ctx);
return NULL;
}

Expand All @@ -775,13 +776,17 @@ static struct flb_calyptia *config_init(struct flb_output_instance *ins,
if (ctx->store_path) {
ret = store_init(ctx);
if (ret == -1) {
flb_output_set_context(ins, NULL);
calyptia_ctx_destroy(ctx);
return NULL;
}
}

/* the machine-id is provided by custom calyptia, which invokes this plugin. */
if (!ctx->machine_id) {
flb_plg_error(ctx->ins, "machine_id has not been set");
flb_output_set_context(ins, NULL);
calyptia_ctx_destroy(ctx);
return NULL;
}

Expand All @@ -793,6 +798,8 @@ static struct flb_calyptia *config_init(struct flb_output_instance *ins,
ctx->cloud_host, ctx->cloud_port,
flags, ctx->ins->tls);
if (!ctx->u) {
flb_output_set_context(ins, NULL);
calyptia_ctx_destroy(ctx);
return NULL;
}

Expand Down Expand Up @@ -878,12 +885,10 @@ static void debug_payload(struct flb_calyptia *ctx, void *data, size_t bytes)
cmt_destroy(cmt);
}

static int cb_calyptia_exit(void *data, struct flb_config *config)
static void calyptia_ctx_destroy(struct flb_calyptia *ctx)
{
struct flb_calyptia *ctx = data;

if (!ctx) {
return 0;
return;
}

if (ctx->u) {
Expand All @@ -910,15 +915,20 @@ static int cb_calyptia_exit(void *data, struct flb_config *config)
if (ctx->trace_endpoint) {
flb_sds_destroy(ctx->trace_endpoint);
}
#endif /* FLB_HAVE_CHUNK_TRACE */
#endif

if (ctx->fs) {
flb_fstore_destroy(ctx->fs);
}

flb_kv_release(&ctx->kv_labels);
flb_free(ctx);
}

static int cb_calyptia_exit(void *data, struct flb_config *config)
{
(void) config;
calyptia_ctx_destroy((struct flb_calyptia *) data);
return 0;
}

Expand Down
Loading