@@ -191,6 +191,7 @@ def perform_account_aggr_job(*args, **job_params):
191
191
values .extend (NetFlowBot .get_top_N_IPs_for_entity_interfaces (interval_label , last_used_ts , max_ts , time_between , direction , entity_id , entity_ip ))
192
192
values .extend (NetFlowBot .get_top_N_protocols_for_entity (interval_label , last_used_ts , max_ts , time_between , direction , entity_id , entity_ip ))
193
193
values .extend (NetFlowBot .get_top_N_protocols_for_entity_interfaces (interval_label , last_used_ts , max_ts , time_between , direction , entity_id , entity_ip ))
194
+ values .extend (NetFlowBot .get_top_N_connections_for_entity (interval_label , last_used_ts , max_ts , time_between , direction , entity_id , entity_ip ))
194
195
195
196
if not values :
196
197
log .warning ("No values found to be sent to Grafolean" )
@@ -419,6 +420,39 @@ def get_top_N_protocols_for_entity(interval_label, last_used_ts, max_ts, time_be
419
420
420
421
return values
421
422
423
+ @staticmethod
424
+ @slow_down
425
+ def get_top_N_connections_for_entity (interval_label , last_used_ts , max_ts , time_between , direction , entity_id , entity_ip ):
426
+ with get_db_cursor () as c :
427
+ values = []
428
+ c .execute (f"""
429
+ SELECT
430
+ f.ipv4_src_addr, f.ipv4_dst_addr,
431
+ sum(f.in_bytes) "traffic"
432
+ FROM
433
+ { DB_PREFIX } flows "f"
434
+ WHERE
435
+ f.client_ip = %s AND
436
+ f.ts > %s AND
437
+ f.ts <= %s AND
438
+ f.direction = %s
439
+ GROUP BY
440
+ f.ipv4_src_addr, f.ipv4_dst_addr
441
+ ORDER BY
442
+ traffic desc
443
+ LIMIT { TOP_N_MAX } ;
444
+ """ , (entity_ip , last_used_ts , max_ts , direction ,))
445
+
446
+ output_path_entity = NetFlowBot .construct_output_path_prefix (interval_label , direction , entity_id , interface = None )
447
+ for ipv4_src_addr , ipv4_dst_addr , traffic_bytes in c .fetchall ():
448
+ output_path = f"{ output_path_entity } .topconn.{ path_part_encode (ipv4_src_addr )} .{ path_part_encode (ipv4_dst_addr )} "
449
+ values .append ({
450
+ 'p' : output_path ,
451
+ 'v' : traffic_bytes / time_between , # Bps
452
+ })
453
+
454
+ return values
455
+
422
456
# @staticmethod
423
457
# @slow_down
424
458
# def get_top_N_protocols(output_path_prefix, from_time, to_time, interface_index, is_direction_in=True):
0 commit comments