@@ -1469,6 +1469,13 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
14691469) (error , []byte ) {
14701470 metric := newMetric (runType , envId , "EOR" )
14711471 defer monitoring .TimerSendSingle (& metric , monitoring .Millisecond )()
1472+ eor := "EOR"
1473+ detectorDurations := map [dcspb.Detector ]time.Duration {}
1474+ start := time .Now ()
1475+
1476+ wholeMetric := newMetric (eor )
1477+ wholeMetric .AddTag ("detector" , "All" )
1478+ defer monitoring .TimerSendSingle (& wholeMetric , monitoring .Millisecond )()
14721479
14731480 var dcsEvent * dcspb.RunEvent
14741481 var err error
@@ -1581,6 +1588,7 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
15811588 }
15821589
15831590 detectorStatusMap [dcsEvent .GetDetector ()] = dcsEvent .GetState ()
1591+ detectorDurations [dcsEvent .GetDetector ()] = time .Since (start )
15841592 ecsDet := dcsToEcsDetector (dcsEvent .GetDetector ())
15851593
15861594 if dcsEvent .GetState () == dcspb .DetectorState_EOR_FAILURE {
@@ -1692,6 +1700,9 @@ func EORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
16921700 })
16931701 }
16941702 }
1703+
1704+ convertAndSendDetectorDurationsAndStates (eor , detectorStatusMap , detectorDurations , & wholeMetric )
1705+
16951706 return err , payloadJsonForKafka
16961707}
16971708
@@ -1701,6 +1712,13 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
17011712) (error , []byte ) {
17021713 metric := newMetric (runType , envId , "SOR" )
17031714 defer monitoring .TimerSendSingle (& metric , monitoring .Millisecond )()
1715+ sor := "SOR"
1716+ detectorDurations := map [dcspb.Detector ]time.Duration {}
1717+ start := time .Now ()
1718+
1719+ wholeMetric := newMetric (sor )
1720+ wholeMetric .AddTag ("detector" , "All" )
1721+ defer monitoring .TimerSendSingle (& wholeMetric , monitoring .Millisecond )()
17041722
17051723 var dcsEvent * dcspb.RunEvent
17061724 var err error
@@ -1813,6 +1831,7 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
18131831 }
18141832
18151833 detectorStatusMap [dcsEvent .GetDetector ()] = dcsEvent .GetState ()
1834+ detectorDurations [dcsEvent .GetDetector ()] = time .Since (start )
18161835 ecsDet := dcsToEcsDetector (dcsEvent .GetDetector ())
18171836
18181837 if dcsEvent .GetState () == dcspb .DetectorState_SOR_FAILURE {
@@ -1961,15 +1980,43 @@ func SORgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
19611980 })
19621981 }
19631982 }
1983+
1984+ convertAndSendDetectorDurationsAndStates (sor , detectorStatusMap , detectorDurations , & wholeMetric )
1985+
19641986 return err , payloadJsonForKafka
19651987}
19661988
1989+ func convertAndSendDetectorDurationsAndStates (method string , detectorStatusMap map [dcspb.Detector ]dcspb.DetectorState , detectorDurations map [dcspb.Detector ]time.Duration , wholeMetric * monitoring.Metric ) {
1990+ resultsMap := make (map [dcspb.DetectorState ]int )
1991+ for dcsDet , state := range detectorStatusMap {
1992+ metric := newMetric (method )
1993+ det := dcsToEcsDetector (dcsDet )
1994+ metric .AddTag ("detector" , det )
1995+ metric .AddTag ("state" , dcspb .DetectorState_name [int32 (state )])
1996+ resultsMap [state ] += 1
1997+ if duration , ok := detectorDurations [dcsDet ]; ok {
1998+ metric .SetFieldInt64 ("execution_time_ms" , duration .Milliseconds ())
1999+ monitoring .Send (& metric )
2000+ }
2001+ }
2002+ for detectorState , detectorCount := range resultsMap {
2003+ wholeMetric .SetFieldInt64 (dcspb .DetectorState_name [int32 (detectorState )], int64 (detectorCount ))
2004+ }
2005+ }
2006+
19672007func PFRgRPCCommunicationLoop (ctx context.Context , timeout time.Duration , call * callable.Call , envId string ,
19682008 payloadJsonForKafka []byte , stream dcspb.Configurator_StartOfRunClient , detectorStatusMap map [dcspb.Detector ]dcspb.DetectorState ,
19692009 callFailedStr string , payload map [string ]interface {}, runType string ,
19702010) (error , []byte ) {
19712011 metric := newMetric (runType , envId , "PFR" )
19722012 defer monitoring .TimerSendSingle (& metric , monitoring .Millisecond )()
2013+ pfr := "PFR"
2014+ detectorDurations := map [dcspb.Detector ]time.Duration {}
2015+ start := time .Now ()
2016+
2017+ wholeMetric := newMetric (pfr )
2018+ wholeMetric .AddTag ("detector" , "All" )
2019+ defer monitoring .TimerSendSingle (& wholeMetric , monitoring .Millisecond )()
19732020
19742021 var err error
19752022 var dcsEvent * dcspb.RunEvent
@@ -2083,6 +2130,7 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
20832130 }
20842131
20852132 detectorStatusMap [dcsEvent .GetDetector ()] = dcsEvent .GetState ()
2133+ detectorDurations [dcsEvent .GetDetector ()] = time .Since (start )
20862134 ecsDet := dcsToEcsDetector (dcsEvent .GetDetector ())
20872135
20882136 if dcsEvent .GetState () == dcspb .DetectorState_SOR_FAILURE {
@@ -2230,6 +2278,9 @@ func PFRgRPCCommunicationLoop(ctx context.Context, timeout time.Duration, call *
22302278 })
22312279 }
22322280 }
2281+
2282+ convertAndSendDetectorDurationsAndStates (pfr , detectorStatusMap , detectorDurations , & wholeMetric )
2283+
22332284 return err , payloadJsonForKafka
22342285}
22352286
0 commit comments