Skip to content

Commit a5905b7

Browse files
committed
feat(backup_service): support backup encryption
1 parent c5f7b99 commit a5905b7

File tree

19 files changed

+371
-93
lines changed

19 files changed

+371
-93
lines changed

cmd/ydbcp/main.go

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"flag"
66
"fmt"
7-
"github.com/ydb-platform/ydb-go-sdk/v3/log"
87
"net/http"
98
_ "net/http/pprof"
109
"os"
@@ -19,6 +18,7 @@ import (
1918
"ydbcp/internal/connectors/db/yql/queries"
2019
"ydbcp/internal/connectors/s3"
2120
"ydbcp/internal/handlers"
21+
"ydbcp/internal/kms"
2222
"ydbcp/internal/metrics"
2323
"ydbcp/internal/processor"
2424
"ydbcp/internal/server"
@@ -32,6 +32,9 @@ import (
3232
"ydbcp/internal/watchers/schedule_watcher"
3333
"ydbcp/internal/watchers/ttl_watcher"
3434
ap "ydbcp/pkg/plugins/auth"
35+
kp "ydbcp/pkg/plugins/kms"
36+
37+
"github.com/ydb-platform/ydb-go-sdk/v3/log"
3538

3639
"github.com/jonboulle/clockwork"
3740

@@ -114,6 +117,24 @@ func main() {
114117
}
115118
}()
116119
xlog.Info(ctx, "Initialized AuthProvider")
120+
121+
var kmsProvider kp.KmsProvider
122+
if len(configInstance.KMS.PluginPath) == 0 {
123+
kmsProvider, err = kms.NewDummyKmsProvider(ctx)
124+
} else {
125+
kmsProvider, err = kms.NewKmsProvider(ctx, configInstance.KMS)
126+
}
127+
if err != nil {
128+
xlog.Error(ctx, "Error init KmsProvider", zap.Error(err))
129+
os.Exit(1)
130+
}
131+
defer func() {
132+
if err := kmsProvider.Close(ctx); err != nil {
133+
xlog.Error(ctx, "Error close kms provider", zap.Error(err))
134+
}
135+
}()
136+
xlog.Info(ctx, "Initialized KmsProvider")
137+
117138
metrics.InitializeMetricsRegistry(ctx, &wg, &configInstance.MetricsServer, clockwork.NewRealClock())
118139
xlog.Info(ctx, "Initialized metrics registry")
119140
audit.EventsDestination = configInstance.Audit.EventsDestination
@@ -144,6 +165,7 @@ func main() {
144165
dbConnector,
145166
clientConnector,
146167
authProvider,
168+
kmsProvider,
147169
*configInstance,
148170
).Register(server)
149171
operation.NewOperationService(dbConnector, authProvider).Register(server)
@@ -192,6 +214,7 @@ func main() {
192214
queries.NewWriteTableQuery,
193215
clockwork.NewRealClock(),
194216
*configInstance,
217+
kmsProvider,
195218
),
196219
); err != nil {
197220
xlog.Error(ctx, "failed to register TBWR handler", zap.Error(err))
@@ -210,7 +233,7 @@ func main() {
210233
xlog.Info(ctx, "Created TtlWatcher")
211234
}
212235

213-
backupScheduleHandler := handlers.NewBackupScheduleHandler(queries.NewWriteTableQuery, clockwork.NewRealClock())
236+
backupScheduleHandler := handlers.NewBackupScheduleHandler(queries.NewWriteTableQuery, clockwork.NewRealClock(), configInstance.FeatureFlags)
214237

215238
schedule_watcher.NewScheduleWatcher(
216239
ctx, &wg, configInstance.OperationProcessor.ProcessorIntervalSeconds, dbConnector,

internal/backup_operations/make_backup.go

Lines changed: 51 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package backup_operations
22

33
import (
44
"context"
5+
"crypto/rand"
56
"errors"
67
"fmt"
78
"github.com/jonboulle/clockwork"
@@ -36,6 +37,7 @@ type MakeBackupInternalRequest struct {
3637
ScheduleID *string
3738
Ttl *time.Duration
3839
ParentOperationID *string
40+
EncryptionSettings *pb.EncryptionSettings
3941
}
4042

4143
func FromBackupSchedule(schedule *types.BackupSchedule) MakeBackupInternalRequest {
@@ -65,6 +67,7 @@ func FromTBWROperation(tbwr *types.TakeBackupWithRetryOperation) MakeBackupInter
6567
ScheduleID: tbwr.ScheduleID,
6668
Ttl: tbwr.Ttl,
6769
ParentOperationID: &tbwr.ID,
70+
EncryptionSettings: tbwr.EncryptionSettings,
6871
}
6972
}
7073

@@ -282,6 +285,34 @@ func IsEmptyBackup(backup *types.Backup) bool {
282285
return backup.Size == 0 && backup.S3Endpoint == ""
283286
}
284287

288+
func GetEncryptionParams(settings *pb.EncryptionSettings) ([]byte, string, error) {
289+
var algorithm string
290+
var length int
291+
292+
switch settings.Algorithm {
293+
case pb.EncryptionSettings_UNSPECIFIED:
294+
case pb.EncryptionSettings_AES_128_GCM:
295+
algorithm = "AES-128-GCM"
296+
length = 16
297+
break
298+
case pb.EncryptionSettings_AES_256_GCM:
299+
algorithm = "AES-256-GCM"
300+
length = 32
301+
break
302+
case pb.EncryptionSettings_CHACHA20_POLY1305:
303+
algorithm = "ChaCha20-Poly1305"
304+
length = 32
305+
break
306+
}
307+
308+
dek := make([]byte, length)
309+
_, err := rand.Read(dek)
310+
if err != nil {
311+
return nil, "", err
312+
}
313+
return dek, algorithm, nil
314+
}
315+
285316
func MakeBackup(
286317
ctx context.Context,
287318
clientConn client.ClientConnector,
@@ -359,6 +390,18 @@ func MakeBackup(
359390
S3ForcePathStyle: s3.S3ForcePathStyle,
360391
}
361392

393+
if req.EncryptionSettings != nil && featureFlags.EnableBackupEncryption {
394+
dek, algorithm, err := GetEncryptionParams(req.EncryptionSettings)
395+
if err != nil {
396+
return nil, nil, err
397+
}
398+
399+
s3Settings.EncryptionKey = dek
400+
s3Settings.EncryptionAlgorithm = algorithm
401+
// TODO: encrypt the DEK using the specified KEK
402+
// TODO: stores the encrypted DEK in S3
403+
}
404+
362405
clientOperationID, err := clientConn.ExportToS3(ctx, client, s3Settings, featureFlags)
363406
if err != nil {
364407
xlog.Error(ctx, "can't start export operation", zap.Error(err))
@@ -388,9 +431,10 @@ func MakeBackup(
388431
CreatedAt: now,
389432
Creator: subject,
390433
},
391-
ScheduleID: req.ScheduleID,
392-
ExpireAt: expireAt,
393-
SourcePaths: pathsForExport,
434+
ScheduleID: req.ScheduleID,
435+
ExpireAt: expireAt,
436+
SourcePaths: pathsForExport,
437+
EncryptionSettings: req.EncryptionSettings,
394438
}
395439

396440
op := &types.TakeBackupOperation{
@@ -409,9 +453,10 @@ func MakeBackup(
409453
CreatedAt: now,
410454
Creator: subject,
411455
},
412-
YdbOperationId: clientOperationID,
413-
UpdatedAt: now,
414-
ParentOperationID: req.ParentOperationID,
456+
YdbOperationId: clientOperationID,
457+
UpdatedAt: now,
458+
ParentOperationID: req.ParentOperationID,
459+
EncryptionSettings: req.EncryptionSettings,
415460
}
416461

417462
return backup, op, nil

internal/config/config.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,9 @@ type MetricsServerConfig struct {
6565
}
6666

6767
type FeatureFlagsConfig struct {
68-
DisableTTLDeletion bool `yaml:"disable_ttl_deletion" default:"false"`
69-
EnableNewPathsFormat bool `yaml:"enable_new_paths_format" default:"false"`
68+
DisableTTLDeletion bool `yaml:"disable_ttl_deletion" default:"false"`
69+
EnableNewPathsFormat bool `yaml:"enable_new_paths_format" default:"false"`
70+
EnableBackupEncryption bool `yaml:"enable_backup_encryption" default:"false"`
7071
}
7172

7273
type LogConfig struct {
@@ -96,6 +97,7 @@ type Config struct {
9697
ClientConnection ClientConnectionConfig `yaml:"client_connection"`
9798
S3 S3Config `yaml:"s3"`
9899
Auth PluginConfig `yaml:"auth"`
100+
KMS PluginConfig `yaml:"kms"`
99101
GRPCServer GRPCServerConfig `yaml:"grpc_server"`
100102
MetricsServer MetricsServerConfig `yaml:"metrics_server"`
101103
OperationProcessor OperationProcessorConfig `yaml:"operation_processor"`

internal/connectors/client/connector.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,17 @@ func (d *ClientYdbConnector) ExportToS3(
272272
exportRequest.Settings.DestinationPrefix = s3Settings.DestinationPrefix
273273
}
274274

275+
if featureFlags.EnableBackupEncryption && len(s3Settings.EncryptionKey) > 0 {
276+
exportRequest.Settings.EncryptionSettings = &Ydb_Export.EncryptionSettings{
277+
EncryptionAlgorithm: s3Settings.EncryptionAlgorithm,
278+
Key: &Ydb_Export.EncryptionSettings_SymmetricKey_{
279+
SymmetricKey: &Ydb_Export.EncryptionSettings_SymmetricKey{
280+
Key: s3Settings.EncryptionKey,
281+
},
282+
},
283+
}
284+
}
285+
275286
response, err := exportClient.ExportToS3(ctx, exportRequest)
276287

277288
if err != nil {
@@ -425,6 +436,17 @@ func (d *ClientYdbConnector) ImportFromS3(
425436
importRequest.Settings.DestinationPath = path.Join(clientDb.Name(), s3Settings.DestinationPath)
426437
}
427438

439+
if len(s3Settings.EncryptionKey) > 0 {
440+
importRequest.Settings.EncryptionSettings = &Ydb_Export.EncryptionSettings{
441+
EncryptionAlgorithm: s3Settings.EncryptionAlgorithm,
442+
Key: &Ydb_Export.EncryptionSettings_SymmetricKey_{
443+
SymmetricKey: &Ydb_Export.EncryptionSettings_SymmetricKey{
444+
Key: s3Settings.EncryptionKey,
445+
},
446+
},
447+
}
448+
}
449+
428450
response, err := importClient.ImportFromS3(ctx, importRequest)
429451

430452
if err != nil {

internal/connectors/db/yql/queries/write.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,16 @@ func BuildCreateOperationQuery(operation types.Operation, index int) WriteSingle
132132
if tb.ParentOperationID != nil {
133133
d.AddValueParam("$parent_operation_id", table_types.StringValueFromString(*tb.ParentOperationID))
134134
}
135+
if tb.EncryptionSettings != nil {
136+
d.AddValueParam(
137+
"$encryption_algorithm",
138+
table_types.StringValueFromString(tb.EncryptionSettings.GetAlgorithm().String()),
139+
)
140+
d.AddValueParam(
141+
"$kms_key_id",
142+
table_types.StringValueFromString(tb.EncryptionSettings.GetKmsKey().GetKeyId()),
143+
)
144+
}
135145
} else if operation.GetType() == types.OperationTypeTBWR {
136146
tbwr, ok := operation.(*types.TakeBackupWithRetryOperation)
137147
if !ok {
@@ -184,6 +194,16 @@ func BuildCreateOperationQuery(operation types.Operation, index int) WriteSingle
184194
if tbwr.Ttl != nil {
185195
d.AddValueParam("$ttl", table_types.IntervalValueFromDuration(*tbwr.Ttl))
186196
}
197+
if tbwr.EncryptionSettings != nil {
198+
d.AddValueParam(
199+
"$encryption_algorithm",
200+
table_types.StringValueFromString(tbwr.EncryptionSettings.GetAlgorithm().String()),
201+
)
202+
d.AddValueParam(
203+
"$kms_key_id",
204+
table_types.StringValueFromString(tbwr.EncryptionSettings.GetKmsKey().GetKeyId()),
205+
)
206+
}
187207
} else if operation.GetType() == types.OperationTypeRB {
188208
rb, ok := operation.(*types.RestoreBackupOperation)
189209
if !ok {
@@ -336,6 +356,17 @@ func BuildCreateBackupQuery(b types.Backup, index int) WriteSingleTableQueryImpl
336356
if b.ExpireAt != nil {
337357
d.AddValueParam("$expire_at", table_types.TimestampValueFromTime(*b.ExpireAt))
338358
}
359+
360+
if b.EncryptionSettings != nil {
361+
d.AddValueParam(
362+
"$encryption_algorithm",
363+
table_types.StringValueFromString(b.EncryptionSettings.GetAlgorithm().String()),
364+
)
365+
d.AddValueParam(
366+
"$kms_key_id",
367+
table_types.StringValueFromString(b.EncryptionSettings.GetKmsKey().GetKeyId()),
368+
)
369+
}
339370
return d
340371
}
341372

@@ -389,6 +420,17 @@ func BuildCreateBackupScheduleQuery(schedule types.BackupSchedule, index int) Wr
389420
"$recovery_point_objective",
390421
table_types.IntervalValueFromDuration(schedule.ScheduleSettings.RecoveryPointObjective.AsDuration()),
391422
)
423+
424+
if schedule.ScheduleSettings.EncryptionSettings != nil {
425+
d.AddValueParam(
426+
"$encryption_algorithm",
427+
table_types.StringValueFromString(schedule.ScheduleSettings.EncryptionSettings.GetAlgorithm().String()),
428+
)
429+
d.AddValueParam(
430+
"$kms_key_id",
431+
table_types.StringValueFromString(schedule.ScheduleSettings.EncryptionSettings.GetKmsKey().GetKeyId()),
432+
)
433+
}
392434
}
393435
if schedule.NextLaunch != nil {
394436
d.AddValueParam("$next_launch", table_types.TimestampValueFromTime(*schedule.NextLaunch))

internal/connectors/db/yql/schema/create_tables.yql

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ CREATE TABLE Backups (
2222

2323
schedule_id String,
2424

25+
encryption_algorithm String,
26+
kms_key_id String,
27+
2528
INDEX idx_container_id GLOBAL ON (container_id),
2629
INDEX idx_created_at GLOBAL ON (created_at),
2730
INDEX idx_expire_at GLOBAL ON (status, expire_at),
@@ -56,6 +59,8 @@ CREATE TABLE Operations (
5659
paths_to_exclude String,
5760
operation_id String,
5861
parent_operation_id String,
62+
encryption_algorithm String,
63+
kms_key_id String,
5964
--used only in TBWR
6065
schedule_id String,
6166
ttl Interval,
@@ -87,6 +92,9 @@ CREATE TABLE BackupSchedules (
8792
initiated String,
8893
created_at Timestamp,
8994

95+
encryption_algorithm String,
96+
kms_key_id String,
97+
9098
recovery_point_objective Interval,
9199

92100
next_launch Timestamp,

internal/handlers/schedule_backup.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"google.golang.org/protobuf/types/known/durationpb"
99
"google.golang.org/protobuf/types/known/timestamppb"
1010
"ydbcp/internal/audit"
11+
"ydbcp/internal/config"
1112
"ydbcp/internal/connectors/db"
1213
"ydbcp/internal/connectors/db/yql/queries"
1314
"ydbcp/internal/types"
@@ -20,11 +21,13 @@ type BackupScheduleHandlerType func(context.Context, db.DBConnector, *types.Back
2021
func NewBackupScheduleHandler(
2122
queryBuilderFactory queries.WriteQueryBuilderFactory,
2223
clock clockwork.Clock,
24+
featureFlags config.FeatureFlagsConfig,
2325
) BackupScheduleHandlerType {
2426
return func(ctx context.Context, driver db.DBConnector, schedule *types.BackupSchedule) error {
2527
return BackupScheduleHandler(
2628
ctx, driver, schedule,
2729
queryBuilderFactory, clock,
30+
featureFlags,
2831
)
2932
}
3033
}
@@ -46,6 +49,7 @@ func BackupScheduleHandler(
4649
schedule *types.BackupSchedule,
4750
queryBuilderFactory queries.WriteQueryBuilderFactory,
4851
clock clockwork.Clock,
52+
featureFlags config.FeatureFlagsConfig,
4953
) error {
5054
if schedule.Status != types.BackupScheduleStateActive {
5155
xlog.Error(ctx, "backup schedule is not active", zap.String("ScheduleID", schedule.ID))
@@ -86,6 +90,10 @@ func BackupScheduleHandler(
8690
d := schedule.ScheduleSettings.Ttl.AsDuration()
8791
tbwr.Ttl = &d
8892
}
93+
94+
if schedule.ScheduleSettings.EncryptionSettings != nil && featureFlags.EnableBackupEncryption {
95+
tbwr.EncryptionSettings = schedule.ScheduleSettings.EncryptionSettings
96+
}
8997
}
9098

9199
xlog.Info(

internal/handlers/schedule_backup_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"github.com/stretchr/testify/assert"
77
"testing"
88
"time"
9+
"ydbcp/internal/config"
910
"ydbcp/internal/connectors/db"
1011
"ydbcp/internal/connectors/db/yql/queries"
1112
"ydbcp/internal/types"
@@ -40,7 +41,7 @@ func TestBackupScheduleHandler(t *testing.T) {
4041
)
4142

4243
handler := NewBackupScheduleHandler(
43-
queries.NewWriteTableQueryMock, clock,
44+
queries.NewWriteTableQueryMock, clock, config.FeatureFlagsConfig{},
4445
)
4546
err := handler(ctx, dbConnector, &schedule)
4647
assert.Empty(t, err)

0 commit comments

Comments
 (0)