Skip to content

Commit e98f940

Browse files
committed
feat(backup_service): support backup encryption
1 parent fef87ae commit e98f940

File tree

19 files changed

+402
-95
lines changed

19 files changed

+402
-95
lines changed

cmd/ydbcp/main.go

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"flag"
66
"fmt"
7-
"github.com/ydb-platform/ydb-go-sdk/v3/log"
87
"net/http"
98
_ "net/http/pprof"
109
"os"
@@ -19,6 +18,7 @@ import (
1918
"ydbcp/internal/connectors/db/yql/queries"
2019
"ydbcp/internal/connectors/s3"
2120
"ydbcp/internal/handlers"
21+
"ydbcp/internal/kms"
2222
"ydbcp/internal/metrics"
2323
"ydbcp/internal/processor"
2424
"ydbcp/internal/server"
@@ -32,6 +32,9 @@ import (
3232
"ydbcp/internal/watchers/schedule_watcher"
3333
"ydbcp/internal/watchers/ttl_watcher"
3434
ap "ydbcp/pkg/plugins/auth"
35+
kp "ydbcp/pkg/plugins/kms"
36+
37+
"github.com/ydb-platform/ydb-go-sdk/v3/log"
3538

3639
"github.com/jonboulle/clockwork"
3740

@@ -114,6 +117,24 @@ func main() {
114117
}
115118
}()
116119
xlog.Info(ctx, "Initialized AuthProvider")
120+
121+
var kmsProvider kp.KmsProvider
122+
if len(configInstance.KMS.PluginPath) == 0 {
123+
kmsProvider, err = kms.NewDummyKmsProvider(ctx)
124+
} else {
125+
kmsProvider, err = kms.NewKmsProvider(ctx, configInstance.KMS)
126+
}
127+
if err != nil {
128+
xlog.Error(ctx, "Error init KmsProvider", zap.Error(err))
129+
os.Exit(1)
130+
}
131+
defer func() {
132+
if err := kmsProvider.Close(ctx); err != nil {
133+
xlog.Error(ctx, "Error close kms provider", zap.Error(err))
134+
}
135+
}()
136+
xlog.Info(ctx, "Initialized KmsProvider")
137+
117138
metrics.InitializeMetricsRegistry(ctx, &wg, &configInstance.MetricsServer, clockwork.NewRealClock())
118139
xlog.Info(ctx, "Initialized metrics registry")
119140
audit.EventsDestination = configInstance.Audit.EventsDestination
@@ -144,6 +165,7 @@ func main() {
144165
dbConnector,
145166
clientConnector,
146167
authProvider,
168+
kmsProvider,
147169
*configInstance,
148170
).Register(server)
149171
operation.NewOperationService(dbConnector, authProvider).Register(server)
@@ -192,6 +214,7 @@ func main() {
192214
queries.NewWriteTableQuery,
193215
clockwork.NewRealClock(),
194216
*configInstance,
217+
kmsProvider,
195218
),
196219
); err != nil {
197220
xlog.Error(ctx, "failed to register TBWR handler", zap.Error(err))
@@ -210,7 +233,7 @@ func main() {
210233
xlog.Info(ctx, "Created TtlWatcher")
211234
}
212235

213-
backupScheduleHandler := handlers.NewBackupScheduleHandler(queries.NewWriteTableQuery, clockwork.NewRealClock())
236+
backupScheduleHandler := handlers.NewBackupScheduleHandler(queries.NewWriteTableQuery, clockwork.NewRealClock(), configInstance.FeatureFlags)
214237

215238
schedule_watcher.NewScheduleWatcher(
216239
ctx, &wg, configInstance.OperationProcessor.ProcessorIntervalSeconds, dbConnector,

internal/backup_operations/make_backup.go

Lines changed: 72 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,9 @@ package backup_operations
22

33
import (
44
"context"
5+
"crypto/rand"
56
"errors"
67
"fmt"
7-
"github.com/jonboulle/clockwork"
8-
"github.com/ydb-platform/ydb-go-sdk/v3"
98
"path"
109
"regexp"
1110
"strings"
@@ -14,8 +13,12 @@ import (
1413
"ydbcp/internal/connectors/client"
1514
"ydbcp/internal/types"
1615
"ydbcp/internal/util/xlog"
16+
kp "ydbcp/pkg/plugins/kms"
1717
pb "ydbcp/pkg/proto/ydbcp/v1alpha1"
1818

19+
"github.com/jonboulle/clockwork"
20+
"github.com/ydb-platform/ydb-go-sdk/v3"
21+
1922
"go.uber.org/zap"
2023
"google.golang.org/grpc/codes"
2124
"google.golang.org/grpc/status"
@@ -36,6 +39,7 @@ type MakeBackupInternalRequest struct {
3639
ScheduleID *string
3740
Ttl *time.Duration
3841
ParentOperationID *string
42+
EncryptionSettings *pb.EncryptionSettings
3943
}
4044

4145
func FromBackupSchedule(schedule *types.BackupSchedule) MakeBackupInternalRequest {
@@ -65,6 +69,7 @@ func FromTBWROperation(tbwr *types.TakeBackupWithRetryOperation) MakeBackupInter
6569
ScheduleID: tbwr.ScheduleID,
6670
Ttl: tbwr.Ttl,
6771
ParentOperationID: &tbwr.ID,
72+
EncryptionSettings: tbwr.EncryptionSettings,
6873
}
6974
}
7075

@@ -282,6 +287,31 @@ func IsEmptyBackup(backup *types.Backup) bool {
282287
return backup.Size == 0 && backup.S3Endpoint == ""
283288
}
284289

290+
func GetEncryptionParams(settings *pb.EncryptionSettings) ([]byte, string, error) {
291+
var algorithm string
292+
var length int
293+
294+
switch settings.Algorithm {
295+
case pb.EncryptionSettings_UNSPECIFIED:
296+
case pb.EncryptionSettings_AES_128_GCM:
297+
algorithm = "AES-128-GCM"
298+
length = 16
299+
case pb.EncryptionSettings_AES_256_GCM:
300+
algorithm = "AES-256-GCM"
301+
length = 32
302+
case pb.EncryptionSettings_CHACHA20_POLY1305:
303+
algorithm = "ChaCha20-Poly1305"
304+
length = 32
305+
}
306+
307+
dek := make([]byte, length)
308+
_, err := rand.Read(dek)
309+
if err != nil {
310+
return nil, "", err
311+
}
312+
return dek, algorithm, nil
313+
}
314+
285315
func MakeBackup(
286316
ctx context.Context,
287317
clientConn client.ClientConnector,
@@ -292,6 +322,7 @@ func MakeBackup(
292322
subject string,
293323
clock clockwork.Clock,
294324
featureFlags config.FeatureFlagsConfig,
325+
kmsProvider kp.KmsProvider,
295326
) (*types.Backup, *types.TakeBackupOperation, error) {
296327
if req.ScheduleID != nil {
297328
ctx = xlog.With(ctx, zap.String("ScheduleID", *req.ScheduleID))
@@ -359,6 +390,37 @@ func MakeBackup(
359390
S3ForcePathStyle: s3.S3ForcePathStyle,
360391
}
361392

393+
if req.EncryptionSettings != nil && featureFlags.EnableBackupEncryption {
394+
dek, algorithm, err := GetEncryptionParams(req.EncryptionSettings)
395+
if err != nil {
396+
return nil, nil, err
397+
}
398+
399+
s3Settings.EncryptionKey = dek
400+
s3Settings.EncryptionAlgorithm = algorithm
401+
402+
kmsKey := req.EncryptionSettings.GetKmsKey()
403+
if kmsKey == nil {
404+
xlog.Error(ctx, "kms key is not specified")
405+
return nil, nil, status.Errorf(codes.InvalidArgument, "kms key is not specified")
406+
}
407+
408+
_, err = kmsProvider.Encrypt(
409+
ctx,
410+
&kp.EncryptRequest{
411+
KeyID: kmsKey.GetKeyId(),
412+
Plaintext: dek,
413+
},
414+
)
415+
416+
if err != nil {
417+
xlog.Error(ctx, "can't encrypt data encryption key", zap.Error(err))
418+
return nil, nil, err
419+
}
420+
421+
// TODO: save encrypted key to s3
422+
}
423+
362424
clientOperationID, err := clientConn.ExportToS3(ctx, client, s3Settings, featureFlags)
363425
if err != nil {
364426
xlog.Error(ctx, "can't start export operation", zap.Error(err))
@@ -388,9 +450,10 @@ func MakeBackup(
388450
CreatedAt: now,
389451
Creator: subject,
390452
},
391-
ScheduleID: req.ScheduleID,
392-
ExpireAt: expireAt,
393-
SourcePaths: pathsForExport,
453+
ScheduleID: req.ScheduleID,
454+
ExpireAt: expireAt,
455+
SourcePaths: pathsForExport,
456+
EncryptionSettings: req.EncryptionSettings,
394457
}
395458

396459
op := &types.TakeBackupOperation{
@@ -409,9 +472,10 @@ func MakeBackup(
409472
CreatedAt: now,
410473
Creator: subject,
411474
},
412-
YdbOperationId: clientOperationID,
413-
UpdatedAt: now,
414-
ParentOperationID: req.ParentOperationID,
475+
YdbOperationId: clientOperationID,
476+
UpdatedAt: now,
477+
ParentOperationID: req.ParentOperationID,
478+
EncryptionSettings: req.EncryptionSettings,
415479
}
416480

417481
return backup, op, nil

internal/config/config.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,9 @@ type MetricsServerConfig struct {
6565
}
6666

6767
type FeatureFlagsConfig struct {
68-
DisableTTLDeletion bool `yaml:"disable_ttl_deletion" default:"false"`
69-
EnableNewPathsFormat bool `yaml:"enable_new_paths_format" default:"false"`
68+
DisableTTLDeletion bool `yaml:"disable_ttl_deletion" default:"false"`
69+
EnableNewPathsFormat bool `yaml:"enable_new_paths_format" default:"false"`
70+
EnableBackupEncryption bool `yaml:"enable_backup_encryption" default:"false"`
7071
}
7172

7273
type LogConfig struct {
@@ -96,6 +97,7 @@ type Config struct {
9697
ClientConnection ClientConnectionConfig `yaml:"client_connection"`
9798
S3 S3Config `yaml:"s3"`
9899
Auth PluginConfig `yaml:"auth"`
100+
KMS PluginConfig `yaml:"kms"`
99101
GRPCServer GRPCServerConfig `yaml:"grpc_server"`
100102
MetricsServer MetricsServerConfig `yaml:"metrics_server"`
101103
OperationProcessor OperationProcessorConfig `yaml:"operation_processor"`

internal/connectors/client/connector.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,17 @@ func (d *ClientYdbConnector) ExportToS3(
272272
exportRequest.Settings.DestinationPrefix = s3Settings.DestinationPrefix
273273
}
274274

275+
if featureFlags.EnableBackupEncryption && len(s3Settings.EncryptionKey) > 0 {
276+
exportRequest.Settings.EncryptionSettings = &Ydb_Export.EncryptionSettings{
277+
EncryptionAlgorithm: s3Settings.EncryptionAlgorithm,
278+
Key: &Ydb_Export.EncryptionSettings_SymmetricKey_{
279+
SymmetricKey: &Ydb_Export.EncryptionSettings_SymmetricKey{
280+
Key: s3Settings.EncryptionKey,
281+
},
282+
},
283+
}
284+
}
285+
275286
response, err := exportClient.ExportToS3(ctx, exportRequest)
276287

277288
if err != nil {
@@ -425,6 +436,17 @@ func (d *ClientYdbConnector) ImportFromS3(
425436
importRequest.Settings.DestinationPath = path.Join(clientDb.Name(), s3Settings.DestinationPath)
426437
}
427438

439+
if len(s3Settings.EncryptionKey) > 0 {
440+
importRequest.Settings.EncryptionSettings = &Ydb_Export.EncryptionSettings{
441+
EncryptionAlgorithm: s3Settings.EncryptionAlgorithm,
442+
Key: &Ydb_Export.EncryptionSettings_SymmetricKey_{
443+
SymmetricKey: &Ydb_Export.EncryptionSettings_SymmetricKey{
444+
Key: s3Settings.EncryptionKey,
445+
},
446+
},
447+
}
448+
}
449+
428450
response, err := importClient.ImportFromS3(ctx, importRequest)
429451

430452
if err != nil {

internal/connectors/db/yql/queries/write.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,16 @@ func BuildCreateOperationQuery(operation types.Operation, index int) WriteSingle
132132
if tb.ParentOperationID != nil {
133133
d.AddValueParam("$parent_operation_id", table_types.StringValueFromString(*tb.ParentOperationID))
134134
}
135+
if tb.EncryptionSettings != nil {
136+
d.AddValueParam(
137+
"$encryption_algorithm",
138+
table_types.StringValueFromString(tb.EncryptionSettings.GetAlgorithm().String()),
139+
)
140+
d.AddValueParam(
141+
"$kms_key_id",
142+
table_types.StringValueFromString(tb.EncryptionSettings.GetKmsKey().GetKeyId()),
143+
)
144+
}
135145
} else if operation.GetType() == types.OperationTypeTBWR {
136146
tbwr, ok := operation.(*types.TakeBackupWithRetryOperation)
137147
if !ok {
@@ -184,6 +194,16 @@ func BuildCreateOperationQuery(operation types.Operation, index int) WriteSingle
184194
if tbwr.Ttl != nil {
185195
d.AddValueParam("$ttl", table_types.IntervalValueFromDuration(*tbwr.Ttl))
186196
}
197+
if tbwr.EncryptionSettings != nil {
198+
d.AddValueParam(
199+
"$encryption_algorithm",
200+
table_types.StringValueFromString(tbwr.EncryptionSettings.GetAlgorithm().String()),
201+
)
202+
d.AddValueParam(
203+
"$kms_key_id",
204+
table_types.StringValueFromString(tbwr.EncryptionSettings.GetKmsKey().GetKeyId()),
205+
)
206+
}
187207
} else if operation.GetType() == types.OperationTypeRB {
188208
rb, ok := operation.(*types.RestoreBackupOperation)
189209
if !ok {
@@ -336,6 +356,17 @@ func BuildCreateBackupQuery(b types.Backup, index int) WriteSingleTableQueryImpl
336356
if b.ExpireAt != nil {
337357
d.AddValueParam("$expire_at", table_types.TimestampValueFromTime(*b.ExpireAt))
338358
}
359+
360+
if b.EncryptionSettings != nil {
361+
d.AddValueParam(
362+
"$encryption_algorithm",
363+
table_types.StringValueFromString(b.EncryptionSettings.GetAlgorithm().String()),
364+
)
365+
d.AddValueParam(
366+
"$kms_key_id",
367+
table_types.StringValueFromString(b.EncryptionSettings.GetKmsKey().GetKeyId()),
368+
)
369+
}
339370
return d
340371
}
341372

@@ -389,6 +420,17 @@ func BuildCreateBackupScheduleQuery(schedule types.BackupSchedule, index int) Wr
389420
"$recovery_point_objective",
390421
table_types.IntervalValueFromDuration(schedule.ScheduleSettings.RecoveryPointObjective.AsDuration()),
391422
)
423+
424+
if schedule.ScheduleSettings.EncryptionSettings != nil {
425+
d.AddValueParam(
426+
"$encryption_algorithm",
427+
table_types.StringValueFromString(schedule.ScheduleSettings.EncryptionSettings.GetAlgorithm().String()),
428+
)
429+
d.AddValueParam(
430+
"$kms_key_id",
431+
table_types.StringValueFromString(schedule.ScheduleSettings.EncryptionSettings.GetKmsKey().GetKeyId()),
432+
)
433+
}
392434
}
393435
if schedule.NextLaunch != nil {
394436
d.AddValueParam("$next_launch", table_types.TimestampValueFromTime(*schedule.NextLaunch))

internal/handlers/schedule_backup.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"google.golang.org/protobuf/types/known/durationpb"
99
"google.golang.org/protobuf/types/known/timestamppb"
1010
"ydbcp/internal/audit"
11+
"ydbcp/internal/config"
1112
"ydbcp/internal/connectors/db"
1213
"ydbcp/internal/connectors/db/yql/queries"
1314
"ydbcp/internal/types"
@@ -20,11 +21,13 @@ type BackupScheduleHandlerType func(context.Context, db.DBConnector, *types.Back
2021
func NewBackupScheduleHandler(
2122
queryBuilderFactory queries.WriteQueryBuilderFactory,
2223
clock clockwork.Clock,
24+
featureFlags config.FeatureFlagsConfig,
2325
) BackupScheduleHandlerType {
2426
return func(ctx context.Context, driver db.DBConnector, schedule *types.BackupSchedule) error {
2527
return BackupScheduleHandler(
2628
ctx, driver, schedule,
2729
queryBuilderFactory, clock,
30+
featureFlags,
2831
)
2932
}
3033
}
@@ -46,6 +49,7 @@ func BackupScheduleHandler(
4649
schedule *types.BackupSchedule,
4750
queryBuilderFactory queries.WriteQueryBuilderFactory,
4851
clock clockwork.Clock,
52+
featureFlags config.FeatureFlagsConfig,
4953
) error {
5054
if schedule.Status != types.BackupScheduleStateActive {
5155
xlog.Error(ctx, "backup schedule is not active", zap.String("ScheduleID", schedule.ID))
@@ -86,6 +90,10 @@ func BackupScheduleHandler(
8690
d := schedule.ScheduleSettings.Ttl.AsDuration()
8791
tbwr.Ttl = &d
8892
}
93+
94+
if schedule.ScheduleSettings.EncryptionSettings != nil && featureFlags.EnableBackupEncryption {
95+
tbwr.EncryptionSettings = schedule.ScheduleSettings.EncryptionSettings
96+
}
8997
}
9098

9199
xlog.Info(

0 commit comments

Comments
 (0)