diff --git a/plugins/in_udp/udp_conn.c b/plugins/in_udp/udp_conn.c index ea85783df68..39106078dad 100644 --- a/plugins/in_udp/udp_conn.c +++ b/plugins/in_udp/udp_conn.c @@ -264,6 +264,7 @@ static ssize_t parse_payload_none(struct udp_conn *conn) char *buf; char *s; char *separator; + char *source_address; struct flb_in_udp_config *ctx; ctx = conn->ctx; @@ -282,17 +283,30 @@ static ssize_t parse_payload_none(struct udp_conn *conn) break; } else if (len > 0) { + source_address = NULL; ret = flb_log_event_encoder_begin_record(ctx->log_encoder); if (ret == FLB_EVENT_ENCODER_SUCCESS) { ret = flb_log_event_encoder_set_current_timestamp(ctx->log_encoder); } - + if (ctx->source_address_key != NULL) { + source_address = flb_connection_get_remote_address(conn->connection); + } if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_append_body_values( - ctx->log_encoder, - FLB_LOG_EVENT_CSTRING_VALUE("log"), - FLB_LOG_EVENT_STRING_VALUE(buf, len)); + if (source_address != NULL) { + ret = flb_log_event_encoder_append_body_values( + ctx->log_encoder, + FLB_LOG_EVENT_CSTRING_VALUE("log"), + FLB_LOG_EVENT_STRING_VALUE(buf, len), + FLB_LOG_EVENT_CSTRING_VALUE(ctx->source_address_key), + FLB_LOG_EVENT_CSTRING_VALUE(source_address)); + } + else { + ret = flb_log_event_encoder_append_body_values( + ctx->log_encoder, + FLB_LOG_EVENT_CSTRING_VALUE("log"), + FLB_LOG_EVENT_STRING_VALUE(buf, len)); + } } if (ret == FLB_EVENT_ENCODER_SUCCESS) { diff --git a/tests/runtime/in_udp.c b/tests/runtime/in_udp.c index df60c923fad..e59f0c368ad 100644 --- a/tests/runtime/in_udp.c +++ b/tests/runtime/in_udp.c @@ -431,10 +431,77 @@ void flb_test_format_none_separator() test_ctx_destroy(ctx); } +void flb_test_format_none_with_source_address() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + struct sockaddr_in addr; + flb_sockfd_t fd; + int ret; + int num; + ssize_t w_size; + + char *buf = "message\n"; + size_t size = strlen(buf); + + clear_output_num(); + + cb_data.cb = cb_check_result_json; + cb_data.data = "\"log\":\"message\",\"source_host\":\"udp://"; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "*", + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_input_set(ctx->flb, ctx->i_ffd, + "format", "none", + "source_address_key", "source_host", + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + /* use default host/port */ + fd = init_udp(NULL, -1, &addr); + if (!TEST_CHECK(fd >= 0)) { + exit(EXIT_FAILURE); + } + + w_size = sendto(fd, buf, size, 0, (const struct sockaddr *)&addr, sizeof(addr)); + if (!TEST_CHECK(w_size == size)) { + TEST_MSG("failed to send, errno=%d", errno); + flb_socket_close(fd); + test_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + /* waiting to flush */ + flb_time_msleep(1500); + + num = get_output_num(); + if (!TEST_CHECK(num > 0)) { + TEST_MSG("no outputs"); + } + + flb_socket_close(fd); + test_ctx_destroy(ctx); +} + TEST_LIST = { {"udp", flb_test_udp}, {"udp_with_source_address", flb_test_udp_with_source_address}, {"format_none", flb_test_format_none}, {"format_none_separator", flb_test_format_none_separator}, + {"format_none_with_source_address", flb_test_format_none_with_source_address}, {NULL, NULL} };