diff --git a/internal/command/command_service.go b/internal/command/command_service.go index a242b92a2..83a4c62a2 100644 --- a/internal/command/command_service.go +++ b/internal/command/command_service.go @@ -104,12 +104,16 @@ func (cs *CommandService) UpdateDataPlaneStatus( cs.subscribeClientMutex.Unlock() return nil, errors.New("command service client is not initialized") } - response, updateError := cs.commandServiceClient.UpdateDataPlaneStatus(ctx, request) + + grpcCtx, cancel := context.WithTimeout(ctx, cs.agentConfig.Client.Grpc.ResponseTimeout) + defer cancel() + + response, updateError := cs.commandServiceClient.UpdateDataPlaneStatus(grpcCtx, request) cs.subscribeClientMutex.Unlock() validatedError := grpc.ValidateGrpcError(updateError) if validatedError != nil { - slog.ErrorContext(ctx, "Failed to send update data plane status", "error", validatedError) + slog.ErrorContext(grpcCtx, "Failed to send update data plane status", "error", validatedError) return nil, validatedError } @@ -384,13 +388,16 @@ func (cs *CommandService) dataPlaneHealthCallback( return nil, errors.New("command service client is not initialized") } - response, updateError := cs.commandServiceClient.UpdateDataPlaneHealth(ctx, request) + grpcCtx, cancel := context.WithTimeout(ctx, cs.agentConfig.Client.Grpc.ResponseTimeout) + defer cancel() + + response, updateError := cs.commandServiceClient.UpdateDataPlaneHealth(grpcCtx, request) cs.subscribeClientMutex.Unlock() validatedError := grpc.ValidateGrpcError(updateError) if validatedError != nil { - slog.ErrorContext(ctx, "Failed to send update data plane health", "error", validatedError) + slog.ErrorContext(grpcCtx, "Failed to send update data plane health", "error", validatedError) return nil, validatedError } @@ -558,13 +565,16 @@ func (cs *CommandService) connectCallback( request *mpi.CreateConnectionRequest, ) func() (*mpi.CreateConnectionResponse, error) { return func() (*mpi.CreateConnectionResponse, error) { + grpcCtx, cancel := context.WithTimeout(ctx, cs.agentConfig.Client.Grpc.ResponseTimeout) + defer cancel() + cs.subscribeClientMutex.Lock() - response, connectErr := cs.commandServiceClient.CreateConnection(ctx, request) + response, connectErr := cs.commandServiceClient.CreateConnection(grpcCtx, request) cs.subscribeClientMutex.Unlock() validatedError := grpc.ValidateGrpcError(connectErr) if validatedError != nil { - slog.ErrorContext(ctx, "Failed to create connection", "error", validatedError) + slog.ErrorContext(grpcCtx, "Failed to create connection", "error", validatedError) return nil, validatedError } diff --git a/internal/config/config.go b/internal/config/config.go index 75c829e64..1709190cd 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -629,6 +629,12 @@ func registerClientFlags(fs *flag.FlagSet) { "Max file size in bytes.", ) + fs.Duration( + ClientGRPCResponseTimeoutKey, + DefResponseTimeout, + "Duration to wait for a response before retrying request", + ) + fs.Int( ClientGRPCMaxParallelFileOperationsKey, DefMaxParallelFileOperations, @@ -1111,6 +1117,7 @@ func resolveClient() *Client { MaxMessageSendSize: viperInstance.GetInt(ClientGRPCMaxMessageSendSizeKey), MaxFileSize: viperInstance.GetUint32(ClientGRPCMaxFileSizeKey), FileChunkSize: viperInstance.GetUint32(ClientGRPCFileChunkSizeKey), + ResponseTimeout: viperInstance.GetDuration(ClientGRPCResponseTimeoutKey), MaxParallelFileOperations: viperInstance.GetInt(ClientGRPCMaxParallelFileOperationsKey), }, Backoff: &BackOff{ diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 5c4384ba8..6058eea24 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -1184,6 +1184,7 @@ func createConfig() *Config { MaxFileSize: 485753, FileChunkSize: 48575, MaxParallelFileOperations: 10, + ResponseTimeout: 30 * time.Second, }, Backoff: &BackOff{ InitialInterval: 200 * time.Millisecond, diff --git a/internal/config/defaults.go b/internal/config/defaults.go index 0f1e08075..8a34e6dd8 100644 --- a/internal/config/defaults.go +++ b/internal/config/defaults.go @@ -66,6 +66,7 @@ const ( DefMaxFileSize uint32 = 1048576 // 1MB DefFileChunkSize uint32 = 524288 // 0.5MB DefMaxParallelFileOperations = 5 + DefResponseTimeout = 10 * time.Second // Client HTTP Settings DefHTTPTimeout = 10 * time.Second diff --git a/internal/config/flags.go b/internal/config/flags.go index d0f664540..a6ac9aac9 100644 --- a/internal/config/flags.go +++ b/internal/config/flags.go @@ -41,6 +41,7 @@ var ( ClientGRPCMaxFileSizeKey = pre(ClientRootKey) + "grpc_max_file_size" ClientGRPCFileChunkSizeKey = pre(ClientRootKey) + "grpc_file_chunk_size" ClientGRPCMaxParallelFileOperationsKey = pre(ClientRootKey) + "grpc_max_parallel_file_operations" + ClientGRPCResponseTimeoutKey = pre(ClientRootKey) + "grpc_response_timeout" ClientBackoffInitialIntervalKey = pre(ClientRootKey) + "backoff_initial_interval" ClientBackoffMaxIntervalKey = pre(ClientRootKey) + "backoff_max_interval" diff --git a/internal/config/testdata/nginx-agent.conf b/internal/config/testdata/nginx-agent.conf index 2ac87b9ee..e09a2a5f3 100644 --- a/internal/config/testdata/nginx-agent.conf +++ b/internal/config/testdata/nginx-agent.conf @@ -53,6 +53,7 @@ client: max_message_receive_size: 1048575 max_message_send_size: 1048575 max_file_size: 485753 + response_timeout: 30s file_chunk_size: 48575 max_parallel_file_operations: 10 backoff: diff --git a/internal/config/types.go b/internal/config/types.go index 72eda1369..385be2a58 100644 --- a/internal/config/types.go +++ b/internal/config/types.go @@ -93,7 +93,8 @@ type ( //nolint:lll // max line limit exceeded GRPC struct { - KeepAlive *KeepAlive `yaml:"keepalive" mapstructure:"keepalive"` + KeepAlive *KeepAlive `yaml:"keepalive" mapstructure:"keepalive"` + ResponseTimeout time.Duration `yaml:"response_timeout" mapstructure:"response_timeout"` // if MaxMessageSize is size set then we use that value, // otherwise MaxMessageRecieveSize and MaxMessageSendSize for individual settings MaxMessageSize int `yaml:"max_message_size" mapstructure:"max_message_size"` diff --git a/internal/file/file_service_operator.go b/internal/file/file_service_operator.go index 19211c600..6fce049b3 100644 --- a/internal/file/file_service_operator.go +++ b/internal/file/file_service_operator.go @@ -182,12 +182,15 @@ func (fso *FileServiceOperator) UpdateOverview( "request", request, "parent_correlation_id", correlationID, ) - response, updateError := fso.fileServiceClient.UpdateOverview(newCtx, request) + grpcCtx, cancel := context.WithTimeout(ctx, fso.agentConfig.Client.Grpc.ResponseTimeout) + defer cancel() + + response, updateError := fso.fileServiceClient.UpdateOverview(grpcCtx, request) validatedError := internalgrpc.ValidateGrpcError(updateError) if validatedError != nil { - slog.ErrorContext(newCtx, "Failed to send update overview", "error", validatedError) + slog.ErrorContext(grpcCtx, "Failed to send update overview", "error", validatedError) return nil, validatedError } diff --git a/nginx-agent.conf b/nginx-agent.conf index 559754f43..a77d2dac8 100644 --- a/nginx-agent.conf +++ b/nginx-agent.conf @@ -17,6 +17,7 @@ allowed_directories: - /usr/share/nginx/modules - /var/run/nginx - /var/log/nginx + # # Command server settings to connect to a management plane server # diff --git a/test/config/agent/nginx-agent-with-auxiliary-command.conf b/test/config/agent/nginx-agent-with-auxiliary-command.conf index 759e8b8fd..5c9ced8ad 100644 --- a/test/config/agent/nginx-agent-with-auxiliary-command.conf +++ b/test/config/agent/nginx-agent-with-auxiliary-command.conf @@ -19,7 +19,10 @@ auxiliary_command: port: 9095 type: grpc - +client: + grpc: + response_timeout: 2s + allowed_directories: - /etc/nginx - /usr/local/etc/nginx diff --git a/test/config/agent/nginx-config-with-grpc-client.conf b/test/config/agent/nginx-config-with-grpc-client.conf index e04c7593e..1bd3b3cc1 100644 --- a/test/config/agent/nginx-config-with-grpc-client.conf +++ b/test/config/agent/nginx-config-with-grpc-client.conf @@ -13,6 +13,10 @@ command: port: 9092 type: grpc +client: + grpc: + response_timeout: 2s + allowed_directories: - /etc/nginx - /usr/local/etc/nginx diff --git a/test/config/agent/nginx-config-with-max-file-size.conf b/test/config/agent/nginx-config-with-max-file-size.conf index 0558d6cec..68d4a7227 100644 --- a/test/config/agent/nginx-config-with-max-file-size.conf +++ b/test/config/agent/nginx-config-with-max-file-size.conf @@ -14,7 +14,8 @@ command: client: - grpc: + grpc: + response_timeout: 2s max_file_size: 524288 file_chunk_size: 262144 diff --git a/test/integration/managementplane/config_upload_test.go b/test/integration/managementplane/config_upload_test.go index 92e9bf1ae..884ddc79c 100644 --- a/test/integration/managementplane/config_upload_test.go +++ b/test/integration/managementplane/config_upload_test.go @@ -38,7 +38,7 @@ func (s *MPITestSuite) TearDownTest() { func (s *MPITestSuite) SetupSuite() { slog.Info("starting MPI tests") s.ctx = context.Background() - s.teardownTest = utils.SetupConnectionTest(s.T(), true, false, false, + s.teardownTest = utils.SetupConnectionTest(s.T(), false, false, false, "../../config/agent/nginx-config-with-grpc-client.conf") s.nginxInstanceID = utils.VerifyConnection(s.T(), 2, utils.MockManagementPlaneAPIAddress) responses := utils.ManagementPlaneResponses(s.T(), 1, utils.MockManagementPlaneAPIAddress) diff --git a/test/integration/nginxless/nginx_less_mpi_connection_test.go b/test/integration/nginxless/nginx_less_mpi_connection_test.go index c5f58a50f..cf3246f7e 100644 --- a/test/integration/nginxless/nginx_less_mpi_connection_test.go +++ b/test/integration/nginxless/nginx_less_mpi_connection_test.go @@ -17,7 +17,7 @@ import ( // Verify that the agent sends a connection request to Management Plane even when Nginx is not present func TestNginxLessGrpc_Connection(t *testing.T) { slog.Info("starting nginxless connection test") - teardownTest := utils.SetupConnectionTest(t, true, true, false, + teardownTest := utils.SetupConnectionTest(t, false, true, false, "../../config/agent/nginx-config-with-grpc-client.conf") defer teardownTest(t) diff --git a/test/mock/grpc/mock_management_command_service.go b/test/mock/grpc/mock_management_command_service.go index f68c4c7cd..d39fa5269 100644 --- a/test/mock/grpc/mock_management_command_service.go +++ b/test/mock/grpc/mock_management_command_service.go @@ -33,19 +33,21 @@ import ( type CommandService struct { mpi.UnimplementedCommandServiceServer + instanceFiles map[string][]*mpi.File + firstConnectionCallCh chan struct{} server *gin.Engine connectionRequest *mpi.CreateConnectionRequest requestChan chan *mpi.ManagementPlaneRequest updateDataPlaneStatusRequest *mpi.UpdateDataPlaneStatusRequest updateDataPlaneHealthRequest *mpi.UpdateDataPlaneHealthRequest - instanceFiles map[string][]*mpi.File // key is instanceID - configDirectory string externalFileServer string + configDirectory string dataPlaneResponses []*mpi.DataPlaneResponse - updateDataPlaneHealthMutex sync.Mutex - connectionMutex sync.Mutex - updateDataPlaneStatusMutex sync.Mutex dataPlaneResponsesMutex sync.Mutex + updateDataPlaneStatusMutex sync.Mutex + connectionMutex sync.Mutex + updateDataPlaneHealthMutex sync.Mutex + firstConnectionCallFlag bool } func init() { @@ -66,6 +68,8 @@ func NewCommandService( configDirectory: configDirectory, externalFileServer: externalFileServer, instanceFiles: make(map[string][]*mpi.File), + firstConnectionCallCh: make(chan struct{}), + firstConnectionCallFlag: false, } handler := slog.NewTextHandler( @@ -109,13 +113,25 @@ func (cs *CommandService) CreateConnection( ) { slog.DebugContext(ctx, "Create connection request", "request", request) + // This checks if this is the first create connection call, this is done to test the logic in Agent where + // if Agent does not get a response to a request after a certain amount of time it will resend the request + if !cs.firstConnectionCallFlag { + cs.firstConnectionCallFlag = true + slog.DebugContext(ctx, "First CreateConnection call: blocking until second call") + <-cs.firstConnectionCallCh + } else { + slog.DebugContext(ctx, "Second CreateConnection call: unblocking first call") + close(cs.firstConnectionCallCh) + } + + cs.connectionMutex.Lock() + defer cs.connectionMutex.Unlock() + if request == nil { return nil, errors.New("empty connection request") } - cs.connectionMutex.Lock() cs.connectionRequest = request - cs.connectionMutex.Unlock() return &mpi.CreateConnectionResponse{ Response: &mpi.CommandResponse{