Skip to content

Commit b1947ec

Browse files
authored
Download and Upload Files in Parallel (#1376)
1 parent 34a13e6 commit b1947ec

File tree

9 files changed

+92
-57
lines changed

9 files changed

+92
-57
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ require (
8181
go.uber.org/multierr v1.11.0
8282
go.uber.org/zap v1.27.0
8383
golang.org/x/mod v0.29.0
84+
golang.org/x/sync v0.17.0
8485
google.golang.org/protobuf v1.36.10
8586
)
8687

@@ -352,7 +353,6 @@ require (
352353
golang.org/x/arch v0.20.0 // indirect
353354
golang.org/x/exp v0.0.0-20251009144603-d2f985daa21b // indirect
354355
golang.org/x/oauth2 v0.32.0 // indirect
355-
golang.org/x/sync v0.17.0 // indirect
356356
golang.org/x/telemetry v0.0.0-20251008203120-078029d740a8 // indirect
357357
golang.org/x/time v0.14.0 // indirect
358358
golang.org/x/tools v0.38.0 // indirect

internal/config/config.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -628,6 +628,12 @@ func registerClientFlags(fs *flag.FlagSet) {
628628
DefMaxFileSize,
629629
"Max file size in bytes.",
630630
)
631+
632+
fs.Int(
633+
ClientGRPCMaxParallelFileOperationsKey,
634+
DefMaxParallelFileOperations,
635+
"Maximum number of file downloads or uploads performed in parallel",
636+
)
631637
}
632638

633639
func registerCommandFlags(fs *flag.FlagSet) {
@@ -1100,11 +1106,12 @@ func resolveClient() *Client {
11001106
Time: viperInstance.GetDuration(ClientKeepAliveTimeKey),
11011107
PermitWithoutStream: viperInstance.GetBool(ClientKeepAlivePermitWithoutStreamKey),
11021108
},
1103-
MaxMessageSize: viperInstance.GetInt(ClientGRPCMaxMessageSizeKey),
1104-
MaxMessageReceiveSize: viperInstance.GetInt(ClientGRPCMaxMessageReceiveSizeKey),
1105-
MaxMessageSendSize: viperInstance.GetInt(ClientGRPCMaxMessageSendSizeKey),
1106-
MaxFileSize: viperInstance.GetUint32(ClientGRPCMaxFileSizeKey),
1107-
FileChunkSize: viperInstance.GetUint32(ClientGRPCFileChunkSizeKey),
1109+
MaxMessageSize: viperInstance.GetInt(ClientGRPCMaxMessageSizeKey),
1110+
MaxMessageReceiveSize: viperInstance.GetInt(ClientGRPCMaxMessageReceiveSizeKey),
1111+
MaxMessageSendSize: viperInstance.GetInt(ClientGRPCMaxMessageSendSizeKey),
1112+
MaxFileSize: viperInstance.GetUint32(ClientGRPCMaxFileSizeKey),
1113+
FileChunkSize: viperInstance.GetUint32(ClientGRPCFileChunkSizeKey),
1114+
MaxParallelFileOperations: viperInstance.GetInt(ClientGRPCMaxParallelFileOperationsKey),
11081115
},
11091116
Backoff: &BackOff{
11101117
InitialInterval: viperInstance.GetDuration(ClientBackoffInitialIntervalKey),

internal/config/config_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1178,11 +1178,12 @@ func createConfig() *Config {
11781178
Time: 10 * time.Second,
11791179
PermitWithoutStream: false,
11801180
},
1181-
MaxMessageSize: 1048575,
1182-
MaxMessageReceiveSize: 1048575,
1183-
MaxMessageSendSize: 1048575,
1184-
MaxFileSize: 485753,
1185-
FileChunkSize: 48575,
1181+
MaxMessageSize: 1048575,
1182+
MaxMessageReceiveSize: 1048575,
1183+
MaxMessageSendSize: 1048575,
1184+
MaxFileSize: 485753,
1185+
FileChunkSize: 48575,
1186+
MaxParallelFileOperations: 10,
11861187
},
11871188
Backoff: &BackOff{
11881189
InitialInterval: 200 * time.Millisecond,

internal/config/defaults.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,12 @@ const (
6060
DefAuxiliaryCommandTLServerNameKey = ""
6161

6262
// Client GRPC Settings
63-
DefMaxMessageSize = 0 // 0 = unset
64-
DefMaxMessageRecieveSize = 4194304 // default 4 MB
65-
DefMaxMessageSendSize = 4194304 // default 4 MB
66-
DefMaxFileSize uint32 = 1048576 // 1MB
67-
DefFileChunkSize uint32 = 524288 // 0.5MB
63+
DefMaxMessageSize = 0 // 0 = unset
64+
DefMaxMessageRecieveSize = 4194304 // default 4 MB
65+
DefMaxMessageSendSize = 4194304 // default 4 MB
66+
DefMaxFileSize uint32 = 1048576 // 1MB
67+
DefFileChunkSize uint32 = 524288 // 0.5MB
68+
DefMaxParallelFileOperations = 5
6869

6970
// Client HTTP Settings
7071
DefHTTPTimeout = 10 * time.Second

internal/config/flags.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,13 @@ var (
3434
ClientKeepAliveTimeKey = pre(GrpcKeepAlive) + "time"
3535
ClientKeepAliveTimeoutKey = pre(GrpcKeepAlive) + "timeout"
3636

37-
ClientHTTPTimeoutKey = pre(ClientRootKey) + "http_timeout"
38-
ClientGRPCMaxMessageSendSizeKey = pre(ClientRootKey) + "grpc_max_message_send_size"
39-
ClientGRPCMaxMessageReceiveSizeKey = pre(ClientRootKey) + "grpc_max_message_receive_size"
40-
ClientGRPCMaxMessageSizeKey = pre(ClientRootKey) + "grpc_max_message_size"
41-
ClientGRPCMaxFileSizeKey = pre(ClientRootKey) + "grpc_max_file_size"
42-
ClientGRPCFileChunkSizeKey = pre(ClientRootKey) + "grpc_file_chunk_size"
37+
ClientHTTPTimeoutKey = pre(ClientRootKey) + "http_timeout"
38+
ClientGRPCMaxMessageSendSizeKey = pre(ClientRootKey) + "grpc_max_message_send_size"
39+
ClientGRPCMaxMessageReceiveSizeKey = pre(ClientRootKey) + "grpc_max_message_receive_size"
40+
ClientGRPCMaxMessageSizeKey = pre(ClientRootKey) + "grpc_max_message_size"
41+
ClientGRPCMaxFileSizeKey = pre(ClientRootKey) + "grpc_max_file_size"
42+
ClientGRPCFileChunkSizeKey = pre(ClientRootKey) + "grpc_file_chunk_size"
43+
ClientGRPCMaxParallelFileOperationsKey = pre(ClientRootKey) + "grpc_max_parallel_file_operations"
4344

4445
ClientBackoffInitialIntervalKey = pre(ClientRootKey) + "backoff_initial_interval"
4546
ClientBackoffMaxIntervalKey = pre(ClientRootKey) + "backoff_max_interval"

internal/config/testdata/nginx-agent.conf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ client:
5454
max_message_send_size: 1048575
5555
max_file_size: 485753
5656
file_chunk_size: 48575
57+
max_parallel_file_operations: 10
5758
backoff:
5859
initial_interval: 200ms
5960
max_interval: 10s

internal/config/types.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -91,15 +91,17 @@ type (
9191
Multiplier float64 `yaml:"multiplier" mapstructure:"multiplier"`
9292
}
9393

94+
//nolint:lll // max line limit exceeded
9495
GRPC struct {
9596
KeepAlive *KeepAlive `yaml:"keepalive" mapstructure:"keepalive"`
9697
// if MaxMessageSize is size set then we use that value,
9798
// otherwise MaxMessageRecieveSize and MaxMessageSendSize for individual settings
98-
MaxMessageSize int `yaml:"max_message_size" mapstructure:"max_message_size"`
99-
MaxMessageReceiveSize int `yaml:"max_message_receive_size" mapstructure:"max_message_receive_size"`
100-
MaxMessageSendSize int `yaml:"max_message_send_size" mapstructure:"max_message_send_size"`
101-
MaxFileSize uint32 `yaml:"max_file_size" mapstructure:"max_file_size"`
102-
FileChunkSize uint32 `yaml:"file_chunk_size" mapstructure:"file_chunk_size"`
99+
MaxMessageSize int `yaml:"max_message_size" mapstructure:"max_message_size"`
100+
MaxMessageReceiveSize int `yaml:"max_message_receive_size" mapstructure:"max_message_receive_size"`
101+
MaxMessageSendSize int `yaml:"max_message_send_size" mapstructure:"max_message_send_size"`
102+
MaxFileSize uint32 `yaml:"max_file_size" mapstructure:"max_file_size"`
103+
FileChunkSize uint32 `yaml:"file_chunk_size" mapstructure:"file_chunk_size"`
104+
MaxParallelFileOperations int `yaml:"max_parallel_file_operations" mapstructure:"max_parallel_file_operations"`
103105
}
104106

105107
KeepAlive struct {

internal/file/file_manager_service.go

Lines changed: 45 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"strconv"
1818
"sync"
1919

20+
"golang.org/x/sync/errgroup"
2021
"google.golang.org/grpc"
2122

2223
"github.com/nginx/agent/v3/internal/model"
@@ -288,28 +289,36 @@ func (fms *FileManagerService) ConfigUpdate(ctx context.Context,
288289
}
289290

290291
func (fms *FileManagerService) ConfigUpload(ctx context.Context, configUploadRequest *mpi.ConfigUploadRequest) error {
291-
var updatingFilesError error
292+
uploadFiles := configUploadRequest.GetOverview().GetFiles()
293+
if len(uploadFiles) == 0 {
294+
return nil
295+
}
292296

293-
for _, file := range configUploadRequest.GetOverview().GetFiles() {
294-
err := fms.fileServiceOperator.UpdateFile(
295-
ctx,
296-
configUploadRequest.GetOverview().GetConfigVersion().GetInstanceId(),
297-
file,
298-
)
299-
if err != nil {
300-
slog.ErrorContext(
301-
ctx,
302-
"Failed to update file",
303-
"instance_id", configUploadRequest.GetOverview().GetConfigVersion().GetInstanceId(),
304-
"file_name", file.GetFileMeta().GetName(),
305-
"error", err,
297+
errGroup, errGroupCtx := errgroup.WithContext(ctx)
298+
errGroup.SetLimit(fms.agentConfig.Client.Grpc.MaxParallelFileOperations)
299+
300+
for _, file := range uploadFiles {
301+
errGroup.Go(func() error {
302+
err := fms.fileServiceOperator.UpdateFile(
303+
errGroupCtx,
304+
configUploadRequest.GetOverview().GetConfigVersion().GetInstanceId(),
305+
file,
306306
)
307+
if err != nil {
308+
slog.ErrorContext(
309+
errGroupCtx,
310+
"Failed to update file",
311+
"instance_id", configUploadRequest.GetOverview().GetConfigVersion().GetInstanceId(),
312+
"file_name", file.GetFileMeta().GetName(),
313+
"error", err,
314+
)
315+
}
307316

308-
updatingFilesError = errors.Join(updatingFilesError, err)
309-
}
317+
return err
318+
})
310319
}
311320

312-
return updatingFilesError
321+
return errGroup.Wait()
313322
}
314323

315324
// DetermineFileActions compares two sets of files to determine the file action for each file. Returns a map of files
@@ -570,25 +579,36 @@ func (fms *FileManagerService) executeFileActions(ctx context.Context) (actionEr
570579
}
571580

572581
func (fms *FileManagerService) downloadUpdatedFilesToTempLocation(ctx context.Context) (updateError error) {
582+
var downloadFiles []*model.FileCache
573583
for _, fileAction := range fms.fileActions {
574584
if fileAction.Action == model.Add || fileAction.Action == model.Update {
585+
downloadFiles = append(downloadFiles, fileAction)
586+
}
587+
}
588+
589+
if len(downloadFiles) == 0 {
590+
slog.DebugContext(ctx, "No updated files to download")
591+
return nil
592+
}
593+
594+
errGroup, errGroupCtx := errgroup.WithContext(ctx)
595+
errGroup.SetLimit(fms.agentConfig.Client.Grpc.MaxParallelFileOperations)
596+
597+
for _, fileAction := range downloadFiles {
598+
errGroup.Go(func() error {
575599
tempFilePath := tempFilePath(fileAction.File.GetFileMeta().GetName())
576600

577601
slog.DebugContext(
578-
ctx,
602+
errGroupCtx,
579603
"Downloading file to temp location",
580604
"file", tempFilePath,
581605
)
582606

583-
updateErr := fms.fileUpdate(ctx, fileAction.File, tempFilePath)
584-
if updateErr != nil {
585-
updateError = updateErr
586-
break
587-
}
588-
}
607+
return fms.fileUpdate(errGroupCtx, fileAction.File, tempFilePath)
608+
})
589609
}
590610

591-
return updateError
611+
return errGroup.Wait()
592612
}
593613

594614
func (fms *FileManagerService) moveOrDeleteFiles(ctx context.Context, actionError error) error {

test/types/config.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ const (
2929
commonRandomizationFactor = 0.1
3030
commonMultiplier = 0.2
3131

32-
reloadMonitoringPeriod = 400 * time.Millisecond
32+
maxParallelFileOperations = 5
33+
reloadMonitoringPeriod = 400 * time.Millisecond
3334
)
3435

3536
// Produces a populated Agent Config for testing usage.
@@ -52,10 +53,11 @@ func AgentConfig() *config.Config {
5253
Time: clientTime,
5354
PermitWithoutStream: clientPermitWithoutStream,
5455
},
55-
MaxMessageReceiveSize: 1,
56-
MaxMessageSendSize: 1,
57-
MaxFileSize: 1,
58-
FileChunkSize: 1,
56+
MaxMessageReceiveSize: 1,
57+
MaxMessageSendSize: 1,
58+
MaxFileSize: 1,
59+
FileChunkSize: 1,
60+
MaxParallelFileOperations: maxParallelFileOperations,
5961
},
6062
Backoff: &config.BackOff{
6163
InitialInterval: commonInitialInterval,

0 commit comments

Comments
 (0)