diff --git a/plugins/in_forward/fw.c b/plugins/in_forward/fw.c index 77da340d81e..d33b36fbadc 100644 --- a/plugins/in_forward/fw.c +++ b/plugins/in_forward/fw.c @@ -124,28 +124,37 @@ static int fw_unix_create(struct flb_in_fw_config *ctx) static int in_fw_collect(struct flb_input_instance *ins, struct flb_config *config, void *in_context) { + int state_backup; struct flb_connection *connection; struct fw_conn *conn; struct flb_in_fw_config *ctx; ctx = in_context; + state_backup = ctx->state; + ctx->state = FW_INSTANCE_STATE_ACCEPTING_CLIENT; + connection = flb_downstream_conn_get(ctx->downstream); if (connection == NULL) { flb_plg_error(ctx->ins, "could not accept new connection"); + ctx->state = state_backup; return -1; } if (!config->is_ingestion_active) { flb_downstream_conn_release(connection); + ctx->state = state_backup; + return -1; } if(ctx->is_paused) { flb_downstream_conn_release(connection); flb_plg_trace(ins, "TCP connection will be closed FD=%i", connection->fd); + ctx->state = state_backup; + return -1; } @@ -154,9 +163,17 @@ static int in_fw_collect(struct flb_input_instance *ins, conn = fw_conn_add(connection, ctx); if (!conn) { flb_downstream_conn_release(connection); + ctx->state = state_backup; + return -1; } + ctx->state = state_backup; + + if (ctx->state == FW_INSTANCE_STATE_PAUSED) { + fw_conn_del_all(ctx); + } + return 0; } @@ -263,6 +280,7 @@ static int in_fw_init(struct flb_input_instance *ins, return -1; } + ctx->state = FW_INSTANCE_STATE_RUNNING; ctx->coll_fd = -1; ctx->ins = ins; mk_list_init(&ctx->connections); @@ -386,7 +404,10 @@ static void in_fw_pause(void *data, struct flb_config *config) return; } - fw_conn_del_all(ctx); + if (ctx->state == FW_INSTANCE_STATE_RUNNING) { + fw_conn_del_all(ctx); + } + ctx->is_paused = FLB_TRUE; ret = pthread_mutex_unlock(&ctx->conn_mutex); if (ret != 0) { @@ -406,6 +427,8 @@ static void in_fw_pause(void *data, struct flb_config *config) if (config->is_ingestion_active == FLB_FALSE) { fw_conn_del_all(ctx); } + + ctx->state = FW_INSTANCE_STATE_PAUSED; } static void in_fw_resume(void *data, struct flb_config *config) { @@ -427,6 +450,8 @@ static void in_fw_resume(void *data, struct flb_config *config) { flb_plg_error(ctx->ins, "cannot unlock collector mutex"); return; } + + ctx->state = FW_INSTANCE_STATE_RUNNING; } } diff --git a/plugins/in_forward/fw.h b/plugins/in_forward/fw.h index 527b4859dd9..42557ef4ee1 100644 --- a/plugins/in_forward/fw.h +++ b/plugins/in_forward/fw.h @@ -25,6 +25,12 @@ #include #include +#define FW_INSTANCE_STATE_RUNNING 0 +#define FW_INSTANCE_STATE_ACCEPTING_CLIENT 1 +#define FW_INSTANCE_STATE_PROCESSING_PACKET 2 +#define FW_INSTANCE_STATE_PAUSED 3 + + enum { FW_HANDSHAKE_HELO = 1, FW_HANDSHAKE_PINGPONG = 2, @@ -76,6 +82,8 @@ struct flb_in_fw_config { pthread_mutex_t conn_mutex; + int state; + /* Plugin is paused */ int is_paused; }; diff --git a/plugins/in_forward/fw_conn.c b/plugins/in_forward/fw_conn.c index 116c56fb2fc..af503426f37 100644 --- a/plugins/in_forward/fw_conn.c +++ b/plugins/in_forward/fw_conn.c @@ -28,8 +28,7 @@ #include "fw_prot.h" #include "fw_conn.h" -/* Callback invoked every time an event is triggered for a connection */ -int fw_conn_event(void *data) +static int fw_conn_event_internal(struct flb_connection *connection) { int ret; int bytes; @@ -39,9 +38,6 @@ int fw_conn_event(void *data) struct fw_conn *conn; struct mk_event *event; struct flb_in_fw_config *ctx; - struct flb_connection *connection; - - connection = (struct flb_connection *) data; conn = connection->user_data; @@ -127,6 +123,37 @@ int fw_conn_event(void *data) return 0; } +/* Callback invoked every time an event is triggered for a connection */ +int fw_conn_event(void *data) +{ + struct flb_in_fw_config *ctx; + struct fw_conn *conn; + int result; + struct flb_connection *connection; + int state_backup; + + connection = (struct flb_connection *) data; + + conn = connection->user_data; + + ctx = conn->ctx; + + state_backup = ctx->state; + + ctx->state = FW_INSTANCE_STATE_PROCESSING_PACKET; + + result = fw_conn_event_internal(connection); + + if (ctx->state == FW_INSTANCE_STATE_PROCESSING_PACKET) { + ctx->state = state_backup; + } + else if (ctx->state == FW_INSTANCE_STATE_PAUSED) { + fw_conn_del_all(ctx); + } + + return result; +} + /* Create a new Forward request instance */ struct fw_conn *fw_conn_add(struct flb_connection *connection, struct flb_in_fw_config *ctx) {