Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 56 additions & 8 deletions tools/runtime/sflow/sflow-to-rrd-handler
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#
# sflow-to-rrd-handler
#
# Copyright (C) 2009 - 2019 Internet Neutral Exchange Association Company Limited By Guarantee.
# Copyright (C) 2009 - 2025 Internet Neutral Exchange Association Company Limited By Guarantee.
# All Rights Reserved.
#
# This file is part of IXP Manager.
Expand Down Expand Up @@ -36,16 +36,21 @@ use Time::HiRes qw(ualarm gettimeofday tv_interval);
use JSON;
use REST::Client;

use threads;

use FindBin qw($Bin);
use File::Spec;
use lib File::Spec->catdir( $Bin, File::Spec->updir(), File::Spec->updir(), 'perl-lib', 'IXPManager', 'lib' );

use IXPManager::Config;

my $ixp = new IXPManager::Config (dbase_disable => 1); # (configfile => $configfile);
my $ixp = new IXPManager::Config; # (configfile => $configfile);
my $dbh = $ixp->{db};

my $debug = defined($ixp->{ixp}->{debug}) ? $ixp->{ixp}->{debug} : 0;
my $insanedebug = 0;
my $map_vlans = defined($ixp->{ixp}->{map_vlans}) ? $ixp->{ixp}->{map_vlans} : 0;
my $map_vlan_query = $ixp->{ixp}->{map_vlan_query};
my $rrdcached = defined($ixp->{ixp}->{sflow_rrdcached}) ? $ixp->{ixp}->{sflow_rrdcached} : 1;
my $sflowtool = defined($ixp->{ixp}->{sflowtool}) ? $ixp->{ixp}->{sflowtool} : '/usr/bin/sflowtool';
my $sflowtool_opts = defined($ixp->{ixp}->{sflowtool_opts}) ? $ixp->{ixp}->{sflowtool_opts} : '-l';
Expand All @@ -59,6 +64,7 @@ my $daemon = 1;
my $infraid = undef;
my $macdbtype = defined($ixp->{ixp}->{macdbtype}) ? $ixp->{ixp}->{macdbtype} : '';
my $macdbrest;
my $vlan_mappings;

# conundrum: do we run GetOptions() before creating a new IXPManager::Config
# object, which would allow us to set the configfile location on the command
Expand Down Expand Up @@ -106,10 +112,17 @@ my $mactable = reload_mactable($client, $macdbrest);
$mactable || die "FATAL: could not read IXP Manager API call on $apibaseurl$macdbrest\n";
my $matrix = matrix_init($mactable, $infraid);

if ($map_vlans) {
$vlan_mappings = reload_vlan_mappings($dbh,$map_vlan_query);
$debug && print Dumper ($vlan_mappings);
}

my $execute_periodic = 0;
my $quit_after_periodic = 0;
my $mactablereloadfails = 0;

my $thread;

# handle signals gracefully
$SIG{TERM} = sub { $execute_periodic = 1; $quit_after_periodic = 1; };
$SIG{QUIT} = sub { $execute_periodic = 1; $quit_after_periodic = 1; };
Expand All @@ -132,7 +145,7 @@ my $sflowpid = open (SFLOWTOOL, '-|', $sflowtool, split(' ', $sflowtool_opts));
# methodology is to throw away as much crap as possible before parsing
while (<SFLOWTOOL>) {
next unless (substr($_, 0, 4) eq 'FLOW'); # don't use regexp here for performance reasons
my ($ipprotocol);
my ($ipprotocol,$pvlan);

chomp;

Expand All @@ -152,6 +165,14 @@ while (<SFLOWTOOL>) {
next;
}

$pvlan = $vlan;

# if enabled, correct VLAN if there are VLAN translations e.g. for resold customers:
if ($vlan_mappings->{$vlan}->{'vlan'}) {
print STDERR "DEBUG: [VLAN mapping: VLAN $vlan -> $vlan_mappings->{$vlan}->{'vlan'}]\n" if ($insanedebug);
$vlan = $vlan_mappings->{$vlan}->{'vlan'};
}

my $srcvli = getvlifrommactable ($mactable, $infraid, $vlan, $srcmac);
my $dstvli = getvlifrommactable ($mactable, $infraid, $vlan, $dstmac);

Expand All @@ -164,7 +185,7 @@ while (<SFLOWTOOL>) {
if ($ipprotocol && $srcvli && $dstvli && ($srcvli != $dstvli) ) {
$insanedebug && print STDERR "DEBUG: accepted update for: ".
"protocol: $ipprotocol ".
"vlan: $vlan ".
"vlan: $vlan (pvlan: $pvlan) ".
"srcvli: $srcvli ".
"dstvli: $dstvli ".
"pktsize: $pktsize ".
Expand All @@ -179,7 +200,7 @@ while (<SFLOWTOOL>) {
} else {
$debug && print STDERR "DEBUG: dropped update for: ".
"protocol: $ipprotocol ".
"vlan: $vlan ".
"vlan: $vlan (pvlan: $pvlan) ".
"srcvli: $srcvli ".
"dstvli: $dstvli ".
"pktsize: $pktsize ".
Expand All @@ -196,8 +217,11 @@ while (<SFLOWTOOL>) {
my $newtv = [gettimeofday()];
my $interval = tv_interval($tv, $newtv);
$tv = $newtv;
my $flush_start_time = time();
$debug && print STDERR "DEBUG: starting rrd flush at time interval: $interval, time: ".time()."\n";
process_rrd($interval, $matrix, $rrdcached);
$thread = threads->create('process_rrd', $interval, $matrix, $rrdcached);
$thread->detach();

if ($quit_after_periodic) {
$debug && print STDERR "DEBUG: orderly quit at ".time()."\n";
exit 0;
Expand All @@ -217,7 +241,8 @@ while (<SFLOWTOOL>) {
die "FATAL: could not reload mactable after $mactabletimeout seconds. Aborting.\n";
}
}
$debug && print STDERR "DEBUG: flush completed at ".time()."\n";
my $flush_run_time = time() - $flush_start_time;
$debug && print STDERR "DEBUG: flush completed at ".time()." (".$flush_run_time."s)\n";
}
}

Expand All @@ -229,6 +254,7 @@ kill 9, $sflowpid;
# oops, we should never exit
die "Oops, input pipe died. Aborting.\n";


#
# write traffic matrix out to RRD file while calculating totals
#
Expand Down Expand Up @@ -283,7 +309,7 @@ sub process_rrd {

# write per-vlan aggregates out to rrd
$rrdfile = sprintf ("$basedir/ipv$ipprotocol/$rrdtype/aggregate/aggregate.ipv$ipprotocol.$rrdtype.vlan%05d.rrd", $vlan);
$debug && print STDERR "DEBUG: aggregate: building update for vlan: $vlan type: $rrdtype protocol: $ipprotocol file: $rrdfile\n";
$insanedebug && print STDERR "DEBUG: aggregate: building update for vlan: $vlan type: $rrdtype protocol: $ipprotocol file: $rrdfile\n";

my $in = $matrix->{aggregate}->{$ipprotocol}->{$rrdtype}->{$vlan}->{in};
my $out = $matrix->{aggregate}->{$ipprotocol}->{$rrdtype}->{$vlan}->{out};
Expand Down Expand Up @@ -440,3 +466,25 @@ sub reload_mactable

return $json;
}

#
# VLAN translation mappings
#
sub reload_vlan_mappings {

my ($d,$sql) = @_;

return undef unless ($sql);

my ($s, $mapping);

$debug && print STDERR "DEBUG: sql $sql\n";

$s = $d->prepare("$sql");
$s->execute();

$mapping = $s->fetchall_hashref('pvlan');

return $mapping;

}