diff --git a/controlplane/ffi/agent.go b/controlplane/ffi/agent.go index 67ba8dae..c6453088 100644 --- a/controlplane/ffi/agent.go +++ b/controlplane/ffi/agent.go @@ -77,6 +77,42 @@ func (m ShmDeviceConfig) AsRawPtr() unsafe.Pointer { return unsafe.Pointer(m.ptr) } +type FFIAgent interface { + DPAgent + DPDataProvider +} + +type DPAgent interface { + ModuleAgent + FunctionAgent + PipelineAgent + DeviceAgent + TakeError() error + CleanError() + CleanUp() error + Close() error +} + +type ModuleAgent interface { + UpdateModules(modules []ModuleConfig) error + DeleteModuleConfig(configName string) error +} + +type FunctionAgent interface { + UpdateFunction(functionConfig FunctionConfig) error + DeleteFunction(name string) error +} + +type PipelineAgent interface { + UpdatePipeline(pipelineConfig PipelineConfig) error + DeletePipeline(name string) error +} + +type DeviceAgent interface { + UpdatePlainDevices(devices []DeviceConfig) error + UpdateDevices(devices []ShmDeviceConfig) error +} + type Agent struct { name string ptr *C.struct_agent @@ -133,8 +169,8 @@ func (m *Agent) UpdateModules(modules []ModuleConfig) error { return nil } -func (m *Agent) DPConfig() *DPConfig { - return &DPConfig{ +func (m *Agent) DPConfig() DPConfig { + return &DPConfigImpl{ ptr: C.agent_dp_config(m.ptr), } } diff --git a/controlplane/ffi/shm.go b/controlplane/ffi/shm.go index b72d0204..67faecb3 100644 --- a/controlplane/ffi/shm.go +++ b/controlplane/ffi/shm.go @@ -20,6 +20,36 @@ import ( "github.com/yanet-platform/yanet2/bindings/go/cerrors" ) +type DPDataProvider interface { + DPConfig() DPConfig +} + +type DPConfig interface { + DPObserver + CounterAggregator + AllModulePositions(string) []ModuleReference +} + +type DPObserver interface { + NumaIdx() uint32 + WorkerCount() uint32 + Modules() []DPModule + CPConfigs() []CPConfig + Functions() []Function + Pipelines() []Pipeline + Agents() []AgentInfo + Devices() []DeviceInfo +} + +type CounterAggregator interface { + DeviceCounters(deviceName string) []CounterInfo + PipelineCounters(deviceName string, pipelineName string) []CounterInfo + FunctionCounters(deviceName string, pipelineName string, functionName string) []CounterInfo + ChainCounters(deviceName string, pipelineName string, functionName string, chainName string) []CounterInfo + ModuleCounters(deviceName string, pipelineName string, functionName string, chainName string, moduleType string, moduleName string, counterQuery []string) []CounterInfo + PerformanceCounters(deviceName string, pipelineName string, functionName string, chainName string, moduleType string, moduleName string) (*PerformanceCounters, error) +} + // SharedMemory represents a handle to YANET shared memory segment. type SharedMemory struct { ptr *C.struct_yanet_shm @@ -64,10 +94,10 @@ func (m *SharedMemory) AsRawPtr() unsafe.Pointer { } // DPConfig gets configuration of the dataplane instance from shared memory. -func (m *SharedMemory) DPConfig(instanceIdx uint32) *DPConfig { +func (m *SharedMemory) DPConfig(instanceIdx uint32) DPConfig { ptr := C.yanet_shm_dp_config(m.ptr, C.uint32_t(instanceIdx)) - return &DPConfig{ptr: ptr} + return &DPConfigImpl{ptr: ptr} } // AgentAttach attaches a module agent to shared memory on the dataplane instance. @@ -136,24 +166,24 @@ func (m *SharedMemory) AgentsAttach( } // DPConfig represents a handle to dataplane configuration. -type DPConfig struct { +type DPConfigImpl struct { ptr *C.struct_dp_config } -func NewDPConfigFromRaw(ptr unsafe.Pointer) *DPConfig { - return &DPConfig{ptr: (*C.struct_dp_config)(ptr)} +func NewDPConfigFromRaw(ptr unsafe.Pointer) *DPConfigImpl { + return &DPConfigImpl{ptr: (*C.struct_dp_config)(ptr)} } -func (m *DPConfig) NumaIdx() uint32 { +func (m *DPConfigImpl) NumaIdx() uint32 { return uint32(C.dataplane_instance_numa_idx(m.ptr)) } -func (m *DPConfig) WorkerCount() uint32 { +func (m *DPConfigImpl) WorkerCount() uint32 { return uint32(C.dataplane_instance_worker_count(m.ptr)) } // Modules returns a list of dataplane modules available. -func (m *DPConfig) Modules() []DPModule { +func (m *DPConfigImpl) Modules() []DPModule { ptr := C.yanet_get_dp_module_list_info(m.ptr) defer C.dp_module_list_info_free(ptr) @@ -174,7 +204,7 @@ func (m *DPConfig) Modules() []DPModule { return out } -func (m *DPConfig) CPConfigs() []CPConfig { +func (m *DPConfigImpl) CPConfigs() []CPConfig { cpModulesListInfo := C.yanet_get_cp_module_list_info(m.ptr) defer C.cp_module_list_info_free(cpModulesListInfo) @@ -210,7 +240,7 @@ type Function struct { } // Functions returns all functions configurations from the dataplane. -func (m *DPConfig) Functions() []Function { +func (m *DPConfigImpl) Functions() []Function { functionListInfo := C.yanet_get_cp_function_list_info(m.ptr) defer C.cp_function_list_info_free(functionListInfo) @@ -251,7 +281,7 @@ func (m *DPConfig) Functions() []Function { } // Pipelines returns all pipeline configurations from the dataplane. -func (m *DPConfig) Pipelines() []Pipeline { +func (m *DPConfigImpl) Pipelines() []Pipeline { pipelineListInfo := C.yanet_get_cp_pipeline_list_info(m.ptr) defer C.cp_pipeline_list_info_free(pipelineListInfo) @@ -278,7 +308,7 @@ func (m *DPConfig) Pipelines() []Pipeline { } // Agents returns all agent information from the dataplane. -func (m *DPConfig) Agents() []AgentInfo { +func (m *DPConfigImpl) Agents() []AgentInfo { agentListInfo := C.yanet_get_cp_agent_list_info(m.ptr) defer C.cp_agent_list_info_free(agentListInfo) @@ -384,7 +414,7 @@ type DeviceInfo struct { } // Devices returns all device information from the dataplane. -func (m *DPConfig) Devices() []DeviceInfo { +func (m *DPConfigImpl) Devices() []DeviceInfo { deviceListInfo := C.yanet_get_cp_device_list_info(m.ptr) if deviceListInfo == nil { return nil @@ -446,7 +476,7 @@ type CounterInfo struct { Values [][]uint64 } -func (m *DPConfig) encodeCounters( +func (m *DPConfigImpl) encodeCounters( counters *C.struct_counter_handle_list, ) []CounterInfo { res := make([]CounterInfo, 0) @@ -481,7 +511,7 @@ func (m *DPConfig) encodeCounters( return res } -func (m *DPConfig) DeviceCounters( +func (m *DPConfigImpl) DeviceCounters( deviceName string, ) []CounterInfo { cDeviceName := C.CString(deviceName) @@ -497,7 +527,7 @@ func (m *DPConfig) DeviceCounters( } // PipelineCounters returns pipeline counters -func (m *DPConfig) PipelineCounters( +func (m *DPConfigImpl) PipelineCounters( deviceName string, pipelineName string, ) []CounterInfo { @@ -515,7 +545,7 @@ func (m *DPConfig) PipelineCounters( return m.encodeCounters(counters) } -func (m *DPConfig) FunctionCounters( +func (m *DPConfigImpl) FunctionCounters( deviceName string, pipelineName string, functionName string, @@ -541,7 +571,7 @@ func (m *DPConfig) FunctionCounters( return m.encodeCounters(counters) } -func (m *DPConfig) ChainCounters( +func (m *DPConfigImpl) ChainCounters( deviceName string, pipelineName string, functionName string, @@ -574,7 +604,7 @@ func (m *DPConfig) ChainCounters( // ModuleCounters returns module counters, optionally filtered by name. // // If counterQuery is nil or empty, returns all counters. -func (m *DPConfig) ModuleCounters( +func (m *DPConfigImpl) ModuleCounters( deviceName string, pipelineName string, functionName string, @@ -661,7 +691,7 @@ type PerformanceCounters struct { // Performance counters provide detailed timing and batch processing statistics // for module execution, including mean latency and latency distribution across // different batch sizes, as well as tx/rx packet and byte counters. -func (m *DPConfig) PerformanceCounters( +func (m *DPConfigImpl) PerformanceCounters( deviceName string, pipelineName string, functionName string, diff --git a/controlplane/internal/gateway/inspect_service.go b/controlplane/internal/gateway/inspect_service.go index efb116df..0e2fbc37 100644 --- a/controlplane/internal/gateway/inspect_service.go +++ b/controlplane/internal/gateway/inspect_service.go @@ -47,7 +47,7 @@ func (m *InspectService) Inspect( return response, nil } -func (m *InspectService) dpModules(dpConfig *ffi.DPConfig) []*ynpb.DPModuleInfo { +func (m *InspectService) dpModules(dpConfig ffi.DPConfig) []*ynpb.DPModuleInfo { modules := dpConfig.Modules() out := make([]*ynpb.DPModuleInfo, len(modules)) @@ -60,7 +60,7 @@ func (m *InspectService) dpModules(dpConfig *ffi.DPConfig) []*ynpb.DPModuleInfo return out } -func (m *InspectService) cpConfigs(dpConfig *ffi.DPConfig) []*ynpb.CPConfigInfo { +func (m *InspectService) cpConfigs(dpConfig ffi.DPConfig) []*ynpb.CPConfigInfo { configs := dpConfig.CPConfigs() out := make([]*ynpb.CPConfigInfo, len(configs)) @@ -75,7 +75,7 @@ func (m *InspectService) cpConfigs(dpConfig *ffi.DPConfig) []*ynpb.CPConfigInfo return out } -func (m *InspectService) functions(dpConfig *ffi.DPConfig) []*ynpb.FunctionInfo { +func (m *InspectService) functions(dpConfig ffi.DPConfig) []*ynpb.FunctionInfo { functions := dpConfig.Functions() if len(functions) == 0 { return nil @@ -109,7 +109,7 @@ func (m *InspectService) functions(dpConfig *ffi.DPConfig) []*ynpb.FunctionInfo return out } -func (m *InspectService) pipelines(dpConfig *ffi.DPConfig) []*ynpb.PipelineInfo { +func (m *InspectService) pipelines(dpConfig ffi.DPConfig) []*ynpb.PipelineInfo { pipelines := dpConfig.Pipelines() out := make([]*ynpb.PipelineInfo, len(pipelines)) @@ -128,7 +128,7 @@ func (m *InspectService) pipelines(dpConfig *ffi.DPConfig) []*ynpb.PipelineInfo return out } -func (m *InspectService) agents(dpConfig *ffi.DPConfig) []*ynpb.AgentInfo { +func (m *InspectService) agents(dpConfig ffi.DPConfig) []*ynpb.AgentInfo { agents := dpConfig.Agents() out := make([]*ynpb.AgentInfo, len(agents)) @@ -154,7 +154,7 @@ func (m *InspectService) agents(dpConfig *ffi.DPConfig) []*ynpb.AgentInfo { return out } -func (m *InspectService) devices(dpConfig *ffi.DPConfig) []*ynpb.DeviceInfo { +func (m *InspectService) devices(dpConfig ffi.DPConfig) []*ynpb.DeviceInfo { devices := dpConfig.Devices() if len(devices) == 0 { return nil @@ -189,6 +189,6 @@ func (m *InspectService) devices(dpConfig *ffi.DPConfig) []*ynpb.DeviceInfo { return out } -func (m *InspectService) numaIdx(dpConfig *ffi.DPConfig) uint32 { +func (m *InspectService) numaIdx(dpConfig ffi.DPConfig) uint32 { return dpConfig.NumaIdx() } diff --git a/modules/balancer/agent/go/ffi/agent.go b/modules/balancer/agent/go/ffi/agent.go index 4e96e993..9cb1b8e4 100644 --- a/modules/balancer/agent/go/ffi/agent.go +++ b/modules/balancer/agent/go/ffi/agent.go @@ -106,7 +106,7 @@ func (a *BalancerAgent) Inspect() *AgentInspect { return inspect } -func (a *BalancerAgent) DPConfig() *yanet.DPConfig { +func (a *BalancerAgent) DPConfig() yanet.DPConfig { dpConfig := C.balancer_agent_dp_config(a.handle) return yanet.NewDPConfigFromRaw(unsafe.Pointer(dpConfig)) } diff --git a/modules/forward/controlplane/backend.go b/modules/forward/controlplane/backend.go index f7d72c45..64dc61d2 100644 --- a/modules/forward/controlplane/backend.go +++ b/modules/forward/controlplane/backend.go @@ -41,3 +41,7 @@ func (m *backend) UpdateModule(name string, rules []cforward.ForwardRule) (Modul func (m *backend) DeleteModule(name string) error { return m.agent.DeleteModuleConfig(name) } + +func (m *backend) Agent() ffi.FFIAgent { + return m.agent +} diff --git a/modules/forward/controlplane/forwardpb/forward.proto b/modules/forward/controlplane/forwardpb/forward.proto index e70d6574..c5272f99 100644 --- a/modules/forward/controlplane/forwardpb/forward.proto +++ b/modules/forward/controlplane/forwardpb/forward.proto @@ -3,6 +3,7 @@ syntax = "proto3"; package forwardpb; import "common/filterpb/filter.proto"; +import "common/commonpb/metric.proto"; option go_package = "github.com/yanet-platform/yanet2/modules/forward/controlplane/forwardpb;forwardpb"; @@ -20,6 +21,8 @@ service ForwardService { // Allows to delete forward config if it is not referenced // by any pipeline. rpc DeleteConfig(DeleteConfigRequest) returns (DeleteConfigResponse); + + rpc GetMetrics(GetMetricsRequest) returns (GetMetricsResponse); } message Config { @@ -77,3 +80,7 @@ message DeleteConfigRequest { } message DeleteConfigResponse { bool deleted = 1; } + +message GetMetricsRequest {} + +message GetMetricsResponse { repeated commonpb.Metric metrics = 1; } diff --git a/modules/forward/controlplane/metrics.go b/modules/forward/controlplane/metrics.go new file mode 100644 index 00000000..726ea640 --- /dev/null +++ b/modules/forward/controlplane/metrics.go @@ -0,0 +1,80 @@ +package forward + +import ( + "github.com/yanet-platform/yanet2/common/commonpb" +) + +func (m *ForwardService) Metrics() ([]*commonpb.Metric, error) { + m.mu.Lock() + defer m.mu.Unlock() + return m.collectMetrics() +} + +func makeGauge(name string, value float64, labels ...*commonpb.Label) *commonpb.Metric { + return &commonpb.Metric{ + Name: name, + Labels: labels, + Value: &commonpb.Metric_Gauge{Gauge: value}, + } +} + +func makeCounter(name string, value uint64, labels ...*commonpb.Label) *commonpb.Metric { + return &commonpb.Metric{ + Name: name, + Labels: labels, + Value: &commonpb.Metric_Counter{Counter: value}, + } +} + +func (m *ForwardService) collectMetrics() ([]*commonpb.Metric, error) { + dpConfig := m.backend.Agent().DPConfig() + positions := dpConfig.AllModulePositions("forward") + + result := make([]*commonpb.Metric, 0) + for _, pos := range positions { + configName := pos.ModuleName + + baseLabels := []*commonpb.Label{ + {Name: "config", Value: configName}, + {Name: "device", Value: pos.Device}, + {Name: "pipeline", Value: pos.Pipeline}, + {Name: "function", Value: pos.Function}, + {Name: "chain", Value: pos.Chain}, + } + + counters := dpConfig.ModuleCounters( + pos.Device, + pos.Pipeline, + pos.Function, + pos.Chain, + "forward", + configName, + nil, + ) + + for _, counter := range counters { + // Sum values across all workers + var cntPackets, bytes uint64 + for _, workerVals := range counter.Values { + if len(workerVals) > 0 { + cntPackets += workerVals[0] + } + if len(workerVals) > 1 { + bytes += workerVals[1] + } + } + + if cntPackets == 0 && bytes == 0 { + continue + } + + result = append(result, + makeCounter(counter.Name+"_packets", cntPackets, baseLabels...), + makeCounter(counter.Name+"_bytes", bytes, baseLabels...), + ) + + } + } + + return result, nil +} diff --git a/modules/forward/controlplane/service.go b/modules/forward/controlplane/service.go index 2eb8b9b0..bdc50586 100644 --- a/modules/forward/controlplane/service.go +++ b/modules/forward/controlplane/service.go @@ -9,6 +9,7 @@ import ( "google.golang.org/grpc/status" "github.com/yanet-platform/yanet2/common/filterpb" + "github.com/yanet-platform/yanet2/controlplane/ffi" "github.com/yanet-platform/yanet2/modules/forward/bindings/go/cforward" "github.com/yanet-platform/yanet2/modules/forward/controlplane/forwardpb" ) @@ -25,6 +26,8 @@ type Backend interface { UpdateModule(name string, rules []cforward.ForwardRule) (ModuleHandle, error) // DeleteModule removes a module config. DeleteModule(name string) error + // Agent returns the FFI agent. + Agent() ffi.FFIAgent } type forwardConfig struct { @@ -191,3 +194,20 @@ func (m *ForwardService) DeleteConfig(ctx context.Context, req *forwardpb.Delete return &forwardpb.DeleteConfigResponse{Deleted: true}, nil } + +func (m *ForwardService) GetMetrics( + ctx context.Context, + req *forwardpb.GetMetricsRequest, +) (*forwardpb.GetMetricsResponse, error) { + m.mu.Lock() + defer m.mu.Unlock() + + metrics, err := m.collectMetrics() + if err != nil { + return nil, err + } + + return &forwardpb.GetMetricsResponse{ + Metrics: metrics, + }, nil +} diff --git a/modules/forward/controlplane/service_test.go b/modules/forward/controlplane/service_test.go index 6669c0f5..e86eef36 100644 --- a/modules/forward/controlplane/service_test.go +++ b/modules/forward/controlplane/service_test.go @@ -27,6 +27,10 @@ func (m *mockBackend) DeleteModule(name string) error { return nil } +func (m *mockBackend) Agent() forward.FFIAgent { + return nil +} + // Run with: go test -race func TestForwardServiceConcurrentAccess(t *testing.T) { svc := forward.NewForwardService(&mockBackend{})