Skip to content

Commit 48ffe64

Browse files
committed
in_udp: Add a capabiliy to inject source IP
Signed-off-by: Hiroshi Hatake <hatake@calyptia.com>
1 parent fb7d4c8 commit 48ffe64

File tree

4 files changed

+135
-3
lines changed

4 files changed

+135
-3
lines changed

plugins/in_udp/udp.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,11 @@ static struct flb_config_map config_map[] = {
174174
0, FLB_TRUE, offsetof(struct flb_in_udp_config, buffer_size_str),
175175
"Set the buffer size"
176176
},
177+
{
178+
FLB_CONFIG_MAP_STR, "source_address_key", (char *) NULL,
179+
0, FLB_TRUE, offsetof(struct flb_in_udp_config, source_address_key),
180+
"Key where the source address will be injected"
181+
},
177182
/* EOF */
178183
{0}
179184
};

plugins/in_udp/udp.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ struct flb_in_udp_config {
4343
char *port; /* Port */
4444
flb_sds_t raw_separator; /* Unescaped string delimiterr */
4545
flb_sds_t separator; /* String delimiter */
46+
flb_sds_t source_address_key; /* Source IP address */
4647
int collector_id; /* Listener collector id */
4748
struct flb_downstream *downstream; /* Client manager */
4849
struct udp_conn *dummy_conn; /* Datagram dummy connection */

plugins/in_udp/udp_conn.c

Lines changed: 126 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,84 @@ static inline void consume_bytes(char *buf, int bytes, int length)
3232
memmove(buf, buf + bytes, length - bytes);
3333
}
3434

35+
static int append_message_to_record_data(char **result_buffer,
36+
size_t *result_size,
37+
flb_sds_t message_key_name,
38+
char *base_object_buffer,
39+
size_t base_object_size,
40+
char *message_buffer,
41+
size_t message_size,
42+
int message_type)
43+
{
44+
int result;
45+
char *modified_data_buffer;
46+
int modified_data_size;
47+
msgpack_object_kv *new_map_entries[1];
48+
msgpack_object_kv message_entry;
49+
*result_buffer = NULL;
50+
*result_size = 0;
51+
modified_data_buffer = NULL;
52+
53+
if (message_key_name != NULL) {
54+
new_map_entries[0] = &message_entry;
55+
56+
message_entry.key.type = MSGPACK_OBJECT_STR;
57+
message_entry.key.via.str.size = flb_sds_len(message_key_name);
58+
message_entry.key.via.str.ptr = message_key_name;
59+
60+
if (message_type == MSGPACK_OBJECT_BIN) {
61+
message_entry.val.type = MSGPACK_OBJECT_BIN;
62+
message_entry.val.via.bin.size = message_size;
63+
message_entry.val.via.bin.ptr = message_buffer;
64+
}
65+
else if (message_type == MSGPACK_OBJECT_STR) {
66+
message_entry.val.type = MSGPACK_OBJECT_BIN;
67+
message_entry.val.via.str.size = message_size;
68+
message_entry.val.via.str.ptr = message_buffer;
69+
}
70+
else {
71+
result = FLB_MAP_NOT_MODIFIED;
72+
73+
return result;
74+
}
75+
76+
result = flb_msgpack_expand_map(base_object_buffer,
77+
base_object_size,
78+
new_map_entries, 1,
79+
&modified_data_buffer,
80+
&modified_data_size);
81+
if (result == 0) {
82+
result = FLB_MAP_EXPAND_SUCCESS;
83+
}
84+
}
85+
86+
if (result != FLB_MAP_EXPAND_SUCCESS) {
87+
result = FLB_MAP_EXPANSION_ERROR;
88+
89+
return result;
90+
}
91+
92+
*result_buffer = modified_data_buffer;
93+
*result_size = modified_data_size;
94+
95+
return result;
96+
}
97+
3598
static inline int process_pack(struct udp_conn *conn,
3699
char *pack, size_t size)
37100
{
38101
int ret;
39102
size_t off = 0;
40103
msgpack_unpacked result;
41104
msgpack_object entry;
105+
msgpack_sbuffer sbuf;
106+
msgpack_packer pck;
42107
struct flb_in_udp_config *ctx;
108+
char *appended_address_buffer;
109+
size_t appended_address_size;
110+
char *source_address;
111+
int i;
112+
int len;
43113

44114
ctx = conn->ctx;
45115

@@ -50,23 +120,72 @@ static inline int process_pack(struct udp_conn *conn,
50120
while (msgpack_unpack_next(&result, pack, size, &off) == MSGPACK_UNPACK_SUCCESS) {
51121
entry = result.data;
52122

123+
appended_address_buffer = NULL;
124+
source_address = NULL;
53125

54126
ret = flb_log_event_encoder_begin_record(ctx->log_encoder);
55127

56128
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
57129
ret = flb_log_event_encoder_set_current_timestamp(ctx->log_encoder);
58130
}
59131

132+
if (ctx->source_address_key != NULL) {
133+
source_address = flb_connection_get_remote_address(conn->connection);
134+
}
135+
60136
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
61137
if (entry.type == MSGPACK_OBJECT_MAP) {
62-
ret = flb_log_event_encoder_set_body_from_msgpack_object(
63-
ctx->log_encoder, &entry);
138+
if (source_address != NULL) {
139+
msgpack_sbuffer_init(&sbuf);
140+
msgpack_packer_init(&pck, &sbuf, msgpack_sbuffer_write);
141+
142+
len = entry.via.map.size;
143+
msgpack_pack_map(&pck, len);
144+
145+
for (i=0; i<len; i++) {
146+
msgpack_pack_object(&pck, entry.via.map.ptr[i].key);
147+
msgpack_pack_object(&pck, entry.via.map.ptr[i].val);
148+
}
149+
150+
ret = append_message_to_record_data(&appended_address_buffer,
151+
&appended_address_size,
152+
ctx->source_address_key,
153+
sbuf.data,
154+
sbuf.size,
155+
source_address,
156+
strlen(source_address),
157+
MSGPACK_OBJECT_STR);
158+
msgpack_sbuffer_destroy(&sbuf);
159+
}
160+
161+
if (ret == FLB_MAP_EXPANSION_ERROR) {
162+
flb_plg_debug(ctx->ins, "error expanding source_address : %d", ret);
163+
}
164+
165+
if (appended_address_buffer != NULL) {
166+
ret = flb_log_event_encoder_set_body_from_raw_msgpack(
167+
ctx->log_encoder, appended_address_buffer, appended_address_size);
168+
}
169+
else {
170+
ret = flb_log_event_encoder_set_body_from_msgpack_object(
171+
ctx->log_encoder, &entry);
172+
}
64173
}
65174
else if (entry.type == MSGPACK_OBJECT_ARRAY) {
66-
ret = flb_log_event_encoder_append_body_values(
175+
if (source_address != NULL) {
176+
ret = flb_log_event_encoder_append_body_values(
177+
ctx->log_encoder,
178+
FLB_LOG_EVENT_CSTRING_VALUE("msg"),
179+
FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&entry),
180+
FLB_LOG_EVENT_CSTRING_VALUE(ctx->source_address_key),
181+
FLB_LOG_EVENT_CSTRING_VALUE(source_address));
182+
}
183+
else {
184+
ret = flb_log_event_encoder_append_body_values(
67185
ctx->log_encoder,
68186
FLB_LOG_EVENT_CSTRING_VALUE("msg"),
69187
FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&entry));
188+
}
70189
}
71190
else {
72191
ret = FLB_EVENT_ENCODER_ERROR_INVALID_VALUE_TYPE;
@@ -76,6 +195,10 @@ static inline int process_pack(struct udp_conn *conn,
76195
ret = flb_log_event_encoder_commit_record(ctx->log_encoder);
77196
}
78197

198+
if (appended_address_buffer != NULL) {
199+
flb_free(appended_address_buffer);
200+
}
201+
79202
if (ret != FLB_EVENT_ENCODER_SUCCESS) {
80203
break;
81204
}

plugins/in_udp/udp_conn.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@
2525

2626
#define FLB_IN_UDP_CHUNK "32768"
2727

28+
#define FLB_MAP_EXPAND_SUCCESS 0
29+
#define FLB_MAP_NOT_MODIFIED -1
30+
#define FLB_MAP_EXPANSION_ERROR -2
2831

2932
struct udp_conn_stream {
3033
char *tag;

0 commit comments

Comments
 (0)