diff --git a/plugins/in_calyptia_fleet/in_calyptia_fleet.c b/plugins/in_calyptia_fleet/in_calyptia_fleet.c index e47fb6cf74b..52e45d406f7 100644 --- a/plugins/in_calyptia_fleet/in_calyptia_fleet.c +++ b/plugins/in_calyptia_fleet/in_calyptia_fleet.c @@ -63,6 +63,7 @@ static int fleet_cur_chdir(struct flb_in_calyptia_fleet_config *ctx); static int get_calyptia_files(struct flb_in_calyptia_fleet_config *ctx, time_t timestamp); +static void in_calyptia_fleet_destroy(struct flb_in_calyptia_fleet_config *ctx); #ifndef FLB_SYSTEM_WINDOWS @@ -633,8 +634,6 @@ static int execute_reload(struct flb_in_calyptia_fleet_config *ctx, flb_sds_t cf flb_plg_error(ctx->ins, "unable to change to configuration directory"); } - fleet_cur_chdir(ctx); - pthread_attr_init(&ptha); pthread_attr_setdetachstate(&ptha, PTHREAD_CREATE_DETACHED); pthread_create(&pth, &ptha, do_reload, reload); @@ -2214,7 +2213,7 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in, /* Load the config map */ ret = flb_input_config_map_set(in, (void *) ctx); if (ret == -1) { - flb_free(ctx); + in_calyptia_fleet_destroy(ctx); flb_plg_error(in, "unable to load configuration"); return -1; } @@ -2225,7 +2224,7 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in, if (tmpdir == NULL) { flb_plg_error(in, "unable to find temporary directory (%%TEMP%%)."); - flb_free(ctx); + in_calyptia_fleet_destroy(ctx); return -1; } @@ -2233,7 +2232,7 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in, if (ctx->config_dir == NULL) { flb_plg_error(in, "unable to allocate config-dir."); - flb_free(ctx); + in_calyptia_fleet_destroy(ctx); return -1; } flb_sds_printf(&ctx->config_dir, "%s" PATH_SEPARATOR "%s", tmpdir, "calyptia-fleet"); @@ -2251,7 +2250,7 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in, if (!ctx->u) { flb_plg_error(ctx->ins, "could not initialize upstream"); - flb_free(ctx); + in_calyptia_fleet_destroy(ctx); return -1; } @@ -2273,6 +2272,7 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in, /* create fleet directory before creating the fleet header. */ if (create_fleet_directory(ctx) != 0) { flb_plg_error(ctx->ins, "unable to create fleet directories"); + in_calyptia_fleet_destroy(ctx); return -1; } @@ -2301,8 +2301,7 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in, if (ret == -1) { flb_plg_error(ctx->ins, "could not initialize collector for fleet input plugin"); - flb_upstream_destroy(ctx->u); - flb_free(ctx); + in_calyptia_fleet_destroy(ctx); return -1; } @@ -2325,10 +2324,11 @@ static void cb_in_calyptia_fleet_resume(void *data, struct flb_config *config) flb_input_collector_resume(ctx->collect_fd, ctx->ins); } -static int in_calyptia_fleet_exit(void *data, struct flb_config *config) +static void in_calyptia_fleet_destroy(struct flb_in_calyptia_fleet_config *ctx) { - (void) *config; - struct flb_in_calyptia_fleet_config *ctx = (struct flb_in_calyptia_fleet_config *)data; + if (!ctx) { + return; + } if (ctx->fleet_url) { flb_sds_destroy(ctx->fleet_url); @@ -2342,10 +2342,21 @@ static int in_calyptia_fleet_exit(void *data, struct flb_config *config) flb_sds_destroy(ctx->fleet_id); } - flb_input_collector_delete(ctx->collect_fd, ctx->ins); - flb_upstream_destroy(ctx->u); + if (ctx->collect_fd >= 0) { + flb_input_collector_delete(ctx->collect_fd, ctx->ins); + } + + if (ctx->u) { + flb_upstream_destroy(ctx->u); + } + flb_free(ctx); +} +static int in_calyptia_fleet_exit(void *data, struct flb_config *config) +{ + (void) *config; + in_calyptia_fleet_destroy((struct flb_in_calyptia_fleet_config *)data); return 0; } diff --git a/plugins/out_calyptia/calyptia.c b/plugins/out_calyptia/calyptia.c index 5f145075c1e..d26270583af 100644 --- a/plugins/out_calyptia/calyptia.c +++ b/plugins/out_calyptia/calyptia.c @@ -33,6 +33,7 @@ #include flb_sds_t custom_calyptia_pipeline_config_get(struct flb_config *ctx); +static void calyptia_ctx_destroy(struct flb_calyptia *ctx); static int get_io_flags(struct flb_output_instance *ins) { @@ -733,21 +734,20 @@ static struct flb_calyptia *config_init(struct flb_output_instance *ins, /* Load the config map */ ret = flb_output_config_map_set(ins, (void *) ctx); if (ret == -1) { - flb_free(ctx); + calyptia_ctx_destroy(ctx); return NULL; } ctx->metrics_endpoint = flb_sds_create_size(256); if (!ctx->metrics_endpoint) { - flb_free(ctx); + calyptia_ctx_destroy(ctx); return NULL; } #ifdef FLB_HAVE_CHUNK_TRACE ctx->trace_endpoint = flb_sds_create_size(256); if (!ctx->trace_endpoint) { - flb_sds_destroy(ctx->metrics_endpoint); - flb_free(ctx); + calyptia_ctx_destroy(ctx); return NULL; } #endif @@ -755,13 +755,14 @@ static struct flb_calyptia *config_init(struct flb_output_instance *ins, /* api_key */ if (!ctx->api_key) { flb_plg_error(ctx->ins, "configuration 'api_key' is missing"); - flb_free(ctx); + calyptia_ctx_destroy(ctx); return NULL; } /* parse 'add_label' */ ret = config_add_labels(ins, ctx); if (ret == -1) { + calyptia_ctx_destroy(ctx); return NULL; } @@ -775,6 +776,8 @@ static struct flb_calyptia *config_init(struct flb_output_instance *ins, if (ctx->store_path) { ret = store_init(ctx); if (ret == -1) { + flb_output_set_context(ins, NULL); + calyptia_ctx_destroy(ctx); return NULL; } } @@ -782,6 +785,8 @@ static struct flb_calyptia *config_init(struct flb_output_instance *ins, /* the machine-id is provided by custom calyptia, which invokes this plugin. */ if (!ctx->machine_id) { flb_plg_error(ctx->ins, "machine_id has not been set"); + flb_output_set_context(ins, NULL); + calyptia_ctx_destroy(ctx); return NULL; } @@ -793,6 +798,8 @@ static struct flb_calyptia *config_init(struct flb_output_instance *ins, ctx->cloud_host, ctx->cloud_port, flags, ctx->ins->tls); if (!ctx->u) { + flb_output_set_context(ins, NULL); + calyptia_ctx_destroy(ctx); return NULL; } @@ -878,12 +885,10 @@ static void debug_payload(struct flb_calyptia *ctx, void *data, size_t bytes) cmt_destroy(cmt); } -static int cb_calyptia_exit(void *data, struct flb_config *config) +static void calyptia_ctx_destroy(struct flb_calyptia *ctx) { - struct flb_calyptia *ctx = data; - if (!ctx) { - return 0; + return; } if (ctx->u) { @@ -910,7 +915,7 @@ static int cb_calyptia_exit(void *data, struct flb_config *config) if (ctx->trace_endpoint) { flb_sds_destroy(ctx->trace_endpoint); } -#endif /* FLB_HAVE_CHUNK_TRACE */ +#endif if (ctx->fs) { flb_fstore_destroy(ctx->fs); @@ -918,7 +923,12 @@ static int cb_calyptia_exit(void *data, struct flb_config *config) flb_kv_release(&ctx->kv_labels); flb_free(ctx); +} +static int cb_calyptia_exit(void *data, struct flb_config *config) +{ + (void) config; + calyptia_ctx_destroy((struct flb_calyptia *) data); return 0; }