From 83c5ecae965623ba0840f087ef63717aee686175 Mon Sep 17 00:00:00 2001 From: ulya-sidorina Date: Tue, 30 Sep 2025 14:29:58 +0200 Subject: [PATCH] feat(backup_service): support backup encryption --- .env | 1 + .github/workflows/unit-test.yml | 26 +- Dockerfile | 1 + cmd/integration/make_encrypted_backup/main.go | 260 ++++++++++++++++++ cmd/ydbcp/main.go | 29 +- docker-compose.yaml | 1 + internal/backup_operations/make_backup.go | 93 ++++++- internal/config/config.go | 6 +- internal/connectors/client/connector.go | 22 ++ .../connectors/client/prepare_items_test.go | 18 +- internal/connectors/db/process_result_set.go | 97 ++++++- internal/connectors/db/yql/queries/write.go | 42 +++ internal/connectors/s3/connector.go | 38 +++ internal/connectors/s3/mock.go | 47 +++- internal/handlers/delete_backup_test.go | 53 +--- internal/handlers/schedule_backup.go | 8 + internal/handlers/schedule_backup_test.go | 3 +- internal/handlers/take_backup_retry.go | 19 +- internal/handlers/take_backup_retry_test.go | 33 ++- internal/handlers/take_backup_test.go | 46 +--- internal/kms/dummy.go | 62 +++++ .../server/services/backup/backup_service.go | 55 +++- .../backup_schedule_service.go | 18 +- internal/types/backup.go | 44 +-- internal/types/operation.go | 8 +- internal/types/settings.go | 50 ++-- .../schedule_watcher/schedule_watcher_test.go | 11 +- local_config.yaml | 3 +- .../20251128000000_add_encryption_columns.sql | 26 ++ pkg/proto/ydbcp/v1alpha1/operation.pb.go | 57 ++-- pkg/proto/ydbcp/v1alpha1/operation.proto | 3 + 31 files changed, 957 insertions(+), 223 deletions(-) create mode 100644 cmd/integration/make_encrypted_backup/main.go create mode 100644 internal/kms/dummy.go create mode 100644 migrations/yql/20251128000000_add_encryption_columns.sql diff --git a/.env b/.env index 59c470e8..0e79add7 100644 --- a/.env +++ b/.env @@ -5,6 +5,7 @@ S3_BUCKET = "test-bucket" S3_REGION = "us-east-1" YDB_NAME = "local-ydb" ENABLE_NEW_PATHS_FORMAT = true +ENABLE_BACKUPS_ENCRYPTION = true # local-ydb image that was built from main # Image: https://github.com/ydb-platform/ydb/pkgs/container/local-ydb/551703770 # Built from revision 07aeccc41b43c9fc0b7da7680340fbac01b81427 diff --git a/.github/workflows/unit-test.yml b/.github/workflows/unit-test.yml index 60b5d614..d7c50fea 100644 --- a/.github/workflows/unit-test.yml +++ b/.github/workflows/unit-test.yml @@ -54,9 +54,16 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - enable_new_paths_format: [ true, false ] + include: + - enable_new_paths_format: false + enable_backups_encryption: false + - enable_new_paths_format: true + enable_backups_encryption: false + - enable_new_paths_format: true + enable_backups_encryption: true env: ENABLE_NEW_PATHS_FORMAT: ${{ matrix.enable_new_paths_format }} + ENABLE_BACKUPS_ENCRYPTION: ${{ matrix.enable_backups_encryption }} steps: - uses: actions/checkout@v4 - name: supply with s3 access keys @@ -110,3 +117,20 @@ jobs: if: ${{ matrix.enable_new_paths_format }} run: | docker compose down + - name: docker compose up + if: ${{ matrix.enable_backups_encryption }} + run: | + docker compose up -d + - name: run make_encrypted_backup tests + if: ${{ matrix.enable_backups_encryption }} + run: | + while [ "$(docker inspect -f {{.State.Health.Status}} local-ydbcp)" != "healthy" ]; do + echo "Waiting for container to become healthy..." + sleep 1 + done + echo "Starting make_encrypted_backup tests!" + docker exec local-ydbcp sh -c './make_encrypted_backup' + - name: docker compose down + if: ${{ matrix.enable_backups_encryption }} + run: | + docker compose down diff --git a/Dockerfile b/Dockerfile index 53faa930..7f770eda 100644 --- a/Dockerfile +++ b/Dockerfile @@ -21,6 +21,7 @@ RUN go build -o ./make_backup ./cmd/integration/make_backup/main.go RUN go build -o ./list_entities ./cmd/integration/list_entities/main.go RUN go build -o ./orm ./cmd/integration/orm/main.go RUN go build -o ./test_new_paths_format ./cmd/integration/new_paths_format/main.go +RUN go build -o ./make_encrypted_backup ./cmd/integration/make_encrypted_backup/main.go # Command to run the executable CMD ["./main", "--config=local_config.yaml"] diff --git a/cmd/integration/make_encrypted_backup/main.go b/cmd/integration/make_encrypted_backup/main.go new file mode 100644 index 00000000..30606f78 --- /dev/null +++ b/cmd/integration/make_encrypted_backup/main.go @@ -0,0 +1,260 @@ +package main + +import ( + "context" + "log" + "time" + "ydbcp/cmd/integration/common" + "ydbcp/internal/types" + pb "ydbcp/pkg/proto/ydbcp/v1alpha1" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +const ( + containerID = "abcde" + databaseName = "/local" + ydbcpEndpoint = "0.0.0.0:50051" + databaseEndpoint = "grpcs://local-ydb:2135" + testKmsKeyID = "test-kms-key-id-123" +) + +type encryptedBackupScenario struct { + name string + request *pb.MakeBackupRequest +} + +type negativeEncryptedBackupScenario struct { + name string + request *pb.MakeBackupRequest + expectedStatus codes.Code +} + +func newEncryptedBackupRequest(rootPath string, sourcePaths []string, kmsKeyID string) *pb.MakeBackupRequest { + encryptionSettings := &pb.EncryptionSettings{ + Algorithm: pb.EncryptionSettings_AES_256_GCM, + } + + if len(kmsKeyID) > 0 { + encryptionSettings.KeyEncryptionKey = &pb.EncryptionSettings_KmsKey_{ + KmsKey: &pb.EncryptionSettings_KmsKey{ + KeyId: kmsKeyID, + }, + } + } + + return &pb.MakeBackupRequest{ + ContainerId: containerID, + DatabaseName: databaseName, + DatabaseEndpoint: databaseEndpoint, + RootPath: rootPath, + SourcePaths: sourcePaths, + EncryptionSettings: encryptionSettings, + } +} + +func runEncryptedBackupScenario(ctx context.Context, backupClient pb.BackupServiceClient, opClient pb.OperationServiceClient, scenario encryptedBackupScenario) { + log.Printf("Running scenario: %s", scenario.name) + + tbwr, err := backupClient.MakeBackup(ctx, scenario.request) + if err != nil { + log.Panicf("scenario %s: failed to make backup: %v", scenario.name, err) + } + + op, err := opClient.GetOperation( + ctx, &pb.GetOperationRequest{ + Id: tbwr.Id, + }, + ) + if err != nil { + log.Panicf("scenario %s: failed to get operation: %v", scenario.name, err) + } + + if op.EncryptionSettings == nil { + log.Panicf("scenario %s: encryption settings are nil", scenario.name) + } + + if op.EncryptionSettings.GetKmsKey() == nil { + log.Panicf("scenario %s: KMS key is nil", scenario.name) + } + + if op.EncryptionSettings.GetKmsKey().GetKeyId() != testKmsKeyID { + log.Panicf("scenario %s: KMS key ID mismatch, expected %s, got %s", + scenario.name, testKmsKeyID, op.EncryptionSettings.GetKmsKey().GetKeyId()) + } + + if op.EncryptionSettings.GetAlgorithm() != pb.EncryptionSettings_AES_256_GCM { + log.Panicf("scenario %s: encryption algorithm is not AES_256_GCM", scenario.name) + } + + // Wait for operation handler + time.Sleep(time.Second * 3) + + ops, err := opClient.ListOperations( + ctx, &pb.ListOperationsRequest{ + ContainerId: containerID, + DatabaseNameMask: databaseName, + OperationTypes: []string{types.OperationTypeTB.String()}, + }, + ) + if err != nil { + log.Panicf("failed to list operations: %v", err) + } + + var tbOperation *pb.Operation + for _, op := range ops.Operations { + if op.GetParentOperationId() == tbwr.Id { + tbOperation = op + break + } + } + + if tbOperation == nil { + log.Panicf("scenario %s: TB operation not found", scenario.name) + } + + // Wait for backup to complete + done := false + var backup *pb.Backup + for range 30 { + backup, err = backupClient.GetBackup( + ctx, + &pb.GetBackupRequest{Id: tbOperation.BackupId}, + ) + if err != nil { + log.Panicf("scenario %s: failed to get backup: %v", scenario.name, err) + } + if backup.GetStatus().String() == types.BackupStateAvailable { + done = true + break + } + time.Sleep(time.Second) + } + if !done { + log.Panicf("scenario %s: failed to complete backup in 30 seconds", scenario.name) + } + + // Verify the backup has encryption settings + if backup.EncryptionSettings == nil { + log.Panicf("scenario %s: backup should have encryption settings", scenario.name) + } + + if backup.EncryptionSettings.GetKmsKey() == nil { + log.Panicf("scenario %s: backup should have KMS key", scenario.name) + } + + if backup.EncryptionSettings.GetKmsKey().GetKeyId() != testKmsKeyID { + log.Panicf("scenario %s: KMS key ID mismatch, expected %s, got %s", + scenario.name, testKmsKeyID, backup.EncryptionSettings.GetKmsKey().GetKeyId()) + } + + if backup.EncryptionSettings.GetAlgorithm() != pb.EncryptionSettings_AES_256_GCM { + log.Panicf("scenario %s: encryption algorithm is not AES_256_GCM", scenario.name) + } + + restoreRequest := &pb.MakeRestoreRequest{ + ContainerId: containerID, + BackupId: backup.Id, + DatabaseName: databaseName, + DatabaseEndpoint: databaseEndpoint, + DestinationPath: "/restored_backup_" + backup.Id, + SourcePaths: scenario.request.SourcePaths, + } + + restoreOperation, err := backupClient.MakeRestore(ctx, restoreRequest) + if err != nil { + log.Panicf("scenario %s: failed to make restore: %v", scenario.name, err) + } + + // Wait for restore operation to complete + done = false + for range 30 { + op, err := opClient.GetOperation( + ctx, + &pb.GetOperationRequest{Id: restoreOperation.Id}, + ) + if err != nil { + log.Panicf("scenario %s: failed to get restore operation: %v", scenario.name, err) + } + if op.GetStatus().String() == types.OperationStateDone.String() { + done = true + break + } + time.Sleep(time.Second) + } + if !done { + log.Panicf("scenario %s: failed to complete restore in 30 seconds", scenario.name) + } + + log.Printf("scenario %s: passed", scenario.name) +} + +func runNegativeEncryptedBackupScenario(ctx context.Context, backupClient pb.BackupServiceClient, opClient pb.OperationServiceClient, scenario negativeEncryptedBackupScenario) { + log.Printf("Running negative scenario: %s", scenario.name) + + _, err := backupClient.MakeBackup(ctx, scenario.request) + if err != nil { + st, ok := status.FromError(err) + if !ok { + log.Panicf("scenario %s: MakeBackup failed but couldn't extract status: %v", scenario.name, err) + } + if st.Code() != scenario.expectedStatus { + log.Panicf("scenario %s: expected status code %v, got %v: %v", scenario.name, scenario.expectedStatus, st.Code(), err) + } + log.Printf("scenario %s: passed", scenario.name) + } else { + log.Panicf("scenario %s: MakeBackup should fail with status code %v, but it succeeded", scenario.name, scenario.expectedStatus) + } +} + +func main() { + conn := common.CreateGRPCClient(ydbcpEndpoint) + defer func(conn *grpc.ClientConn) { + err := conn.Close() + if err != nil { + log.Panicln("failed to close connection") + } + }(conn) + backupClient := pb.NewBackupServiceClient(conn) + opClient := pb.NewOperationServiceClient(conn) + + ctx := context.Background() + + positiveScenarios := []encryptedBackupScenario{ + { + name: "full encrypted backup", + request: newEncryptedBackupRequest("", nil, testKmsKeyID), + }, + { + name: "partial encrypted backup", + request: newEncryptedBackupRequest("", []string{"kv_test"}, testKmsKeyID), + }, + { + name: "full encrypted backup with root path", + request: newEncryptedBackupRequest("stocks", nil, testKmsKeyID), + }, + { + name: "partial encrypted backup with root path", + request: newEncryptedBackupRequest("stocks", []string{"orders"}, testKmsKeyID), + }, + } + + for _, scenario := range positiveScenarios { + runEncryptedBackupScenario(ctx, backupClient, opClient, scenario) + time.Sleep(time.Second) + } + + negativeScenarios := []negativeEncryptedBackupScenario{ + { + name: "encrypted backup with empty kms key id", + request: newEncryptedBackupRequest("", nil, ""), + expectedStatus: codes.InvalidArgument, + }, + } + + for _, scenario := range negativeScenarios { + runNegativeEncryptedBackupScenario(ctx, backupClient, opClient, scenario) + } +} diff --git a/cmd/ydbcp/main.go b/cmd/ydbcp/main.go index f9b86981..02ac0969 100644 --- a/cmd/ydbcp/main.go +++ b/cmd/ydbcp/main.go @@ -4,7 +4,6 @@ import ( "context" "flag" "fmt" - "github.com/ydb-platform/ydb-go-sdk/v3/log" "net/http" _ "net/http/pprof" "os" @@ -19,6 +18,7 @@ import ( "ydbcp/internal/connectors/db/yql/queries" "ydbcp/internal/connectors/s3" "ydbcp/internal/handlers" + "ydbcp/internal/kms" "ydbcp/internal/metrics" "ydbcp/internal/processor" "ydbcp/internal/server" @@ -32,6 +32,9 @@ import ( "ydbcp/internal/watchers/schedule_watcher" "ydbcp/internal/watchers/ttl_watcher" ap "ydbcp/pkg/plugins/auth" + kp "ydbcp/pkg/plugins/kms" + + "github.com/ydb-platform/ydb-go-sdk/v3/log" "github.com/jonboulle/clockwork" @@ -114,6 +117,24 @@ func main() { } }() xlog.Info(ctx, "Initialized AuthProvider") + + var kmsProvider kp.KmsProvider + if len(configInstance.KMS.PluginPath) == 0 { + kmsProvider, err = kms.NewDummyKmsProvider(ctx) + } else { + kmsProvider, err = kms.NewKmsProvider(ctx, configInstance.KMS) + } + if err != nil { + xlog.Error(ctx, "Error init KmsProvider", zap.Error(err)) + os.Exit(1) + } + defer func() { + if err := kmsProvider.Close(ctx); err != nil { + xlog.Error(ctx, "Error close kms provider", zap.Error(err)) + } + }() + xlog.Info(ctx, "Initialized KmsProvider") + metrics.InitializeMetricsRegistry(ctx, &wg, &configInstance.MetricsServer, clockwork.NewRealClock()) xlog.Info(ctx, "Initialized metrics registry") audit.EventsDestination = configInstance.Audit.EventsDestination @@ -143,7 +164,9 @@ func main() { backup.NewBackupService( dbConnector, clientConnector, + s3Connector, authProvider, + kmsProvider, *configInstance, ).Register(server) operation.NewOperationService(dbConnector, authProvider).Register(server) @@ -189,9 +212,11 @@ func main() { handlers.NewTBWROperationHandler( dbConnector, clientConnector, + s3Connector, queries.NewWriteTableQuery, clockwork.NewRealClock(), *configInstance, + kmsProvider, ), ); err != nil { xlog.Error(ctx, "failed to register TBWR handler", zap.Error(err)) @@ -210,7 +235,7 @@ func main() { xlog.Info(ctx, "Created TtlWatcher") } - backupScheduleHandler := handlers.NewBackupScheduleHandler(queries.NewWriteTableQuery, clockwork.NewRealClock()) + backupScheduleHandler := handlers.NewBackupScheduleHandler(queries.NewWriteTableQuery, clockwork.NewRealClock(), configInstance.FeatureFlags) schedule_watcher.NewScheduleWatcher( ctx, &wg, configInstance.OperationProcessor.ProcessorIntervalSeconds, dbConnector, diff --git a/docker-compose.yaml b/docker-compose.yaml index bcb64fd3..636fe4e1 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -109,6 +109,7 @@ services: S3_SECRET_KEY: ${S3_SECRET_KEY} YDB_NAME: ${YDB_NAME} ENABLE_NEW_PATHS_FORMAT: ${ENABLE_NEW_PATHS_FORMAT} + ENABLE_BACKUPS_ENCRYPTION: ${ENABLE_BACKUPS_ENCRYPTION} depends_on: setup_ydb: condition: service_completed_successfully diff --git a/internal/backup_operations/make_backup.go b/internal/backup_operations/make_backup.go index bb8ec5e6..b29c2a10 100644 --- a/internal/backup_operations/make_backup.go +++ b/internal/backup_operations/make_backup.go @@ -2,20 +2,24 @@ package backup_operations import ( "context" + "crypto/rand" "errors" "fmt" - "github.com/jonboulle/clockwork" - "github.com/ydb-platform/ydb-go-sdk/v3" "path" "regexp" "strings" "time" "ydbcp/internal/config" "ydbcp/internal/connectors/client" + s3connector "ydbcp/internal/connectors/s3" "ydbcp/internal/types" "ydbcp/internal/util/xlog" + kp "ydbcp/pkg/plugins/kms" pb "ydbcp/pkg/proto/ydbcp/v1alpha1" + "github.com/jonboulle/clockwork" + "github.com/ydb-platform/ydb-go-sdk/v3" + "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -36,6 +40,7 @@ type MakeBackupInternalRequest struct { ScheduleID *string Ttl *time.Duration ParentOperationID *string + EncryptionSettings *pb.EncryptionSettings } func FromBackupSchedule(schedule *types.BackupSchedule) MakeBackupInternalRequest { @@ -65,6 +70,7 @@ func FromTBWROperation(tbwr *types.TakeBackupWithRetryOperation) MakeBackupInter ScheduleID: tbwr.ScheduleID, Ttl: tbwr.Ttl, ParentOperationID: &tbwr.ID, + EncryptionSettings: tbwr.EncryptionSettings, } } @@ -282,9 +288,35 @@ func IsEmptyBackup(backup *types.Backup) bool { return backup.Size == 0 && backup.S3Endpoint == "" } +func GetEncryptionParams(settings *pb.EncryptionSettings) ([]byte, string, error) { + var algorithm string + var length int + + switch settings.Algorithm { + case pb.EncryptionSettings_UNSPECIFIED: + case pb.EncryptionSettings_AES_128_GCM: + algorithm = "AES-128-GCM" + length = 16 + case pb.EncryptionSettings_AES_256_GCM: + algorithm = "AES-256-GCM" + length = 32 + case pb.EncryptionSettings_CHACHA20_POLY1305: + algorithm = "ChaCha20-Poly1305" + length = 32 + } + + dek := make([]byte, length) + _, err := rand.Read(dek) + if err != nil { + return nil, "", err + } + return dek, algorithm, nil +} + func MakeBackup( ctx context.Context, clientConn client.ClientConnector, + s3Connector s3connector.S3Connector, s3 config.S3Config, allowedEndpointDomains []string, allowInsecureEndpoint bool, @@ -292,6 +324,7 @@ func MakeBackup( subject string, clock clockwork.Clock, featureFlags config.FeatureFlagsConfig, + kmsProvider kp.KmsProvider, ) (*types.Backup, *types.TakeBackupOperation, error) { if req.ScheduleID != nil { ctx = xlog.With(ctx, zap.String("ScheduleID", *req.ScheduleID)) @@ -359,6 +392,48 @@ func MakeBackup( S3ForcePathStyle: s3.S3ForcePathStyle, } + if req.EncryptionSettings != nil { + if featureFlags.EnableBackupsEncryption { + dek, algorithm, err := GetEncryptionParams(req.EncryptionSettings) + if err != nil { + return nil, nil, err + } + + s3Settings.EncryptionKey = dek + s3Settings.EncryptionAlgorithm = algorithm + + kmsKey := req.EncryptionSettings.GetKmsKey() + if kmsKey == nil { + xlog.Error(ctx, "kms key is not specified") + return nil, nil, status.Errorf(codes.InvalidArgument, "kms key is not specified") + } + + encryptResp, err := kmsProvider.Encrypt( + ctx, + &kp.EncryptRequest{ + KeyID: kmsKey.GetKeyId(), + Plaintext: dek, + }, + ) + + if err != nil { + xlog.Error(ctx, "can't encrypt data encryption key", zap.Error(err)) + return nil, nil, err + } + + dekKey := path.Join(destinationPrefix, "dek.encrypted") + err = s3Connector.PutObject(dekKey, s3.Bucket, encryptResp.Ciphertext) + if err != nil { + xlog.Error(ctx, "can't save encrypted DEK to S3", zap.Error(err), zap.String("dekKey", dekKey)) + return nil, nil, status.Errorf(codes.Internal, "can't save encrypted DEK to S3: %v", err) + } + xlog.Info(ctx, "encrypted DEK saved to S3", zap.String("dekKey", dekKey)) + } else { + xlog.Warn(ctx, "backup encryption is not enabled, using unencrypted backup") + req.EncryptionSettings = nil + } + } + clientOperationID, err := clientConn.ExportToS3(ctx, client, s3Settings, featureFlags) if err != nil { xlog.Error(ctx, "can't start export operation", zap.Error(err)) @@ -388,9 +463,10 @@ func MakeBackup( CreatedAt: now, Creator: subject, }, - ScheduleID: req.ScheduleID, - ExpireAt: expireAt, - SourcePaths: pathsForExport, + ScheduleID: req.ScheduleID, + ExpireAt: expireAt, + SourcePaths: pathsForExport, + EncryptionSettings: req.EncryptionSettings, } op := &types.TakeBackupOperation{ @@ -409,9 +485,10 @@ func MakeBackup( CreatedAt: now, Creator: subject, }, - YdbOperationId: clientOperationID, - UpdatedAt: now, - ParentOperationID: req.ParentOperationID, + YdbOperationId: clientOperationID, + UpdatedAt: now, + ParentOperationID: req.ParentOperationID, + EncryptionSettings: req.EncryptionSettings, } return backup, op, nil diff --git a/internal/config/config.go b/internal/config/config.go index 921345cb..fa2d9d60 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -65,8 +65,9 @@ type MetricsServerConfig struct { } type FeatureFlagsConfig struct { - DisableTTLDeletion bool `yaml:"disable_ttl_deletion" default:"false"` - EnableNewPathsFormat bool `yaml:"enable_new_paths_format" default:"false"` + DisableTTLDeletion bool `yaml:"disable_ttl_deletion" default:"false"` + EnableNewPathsFormat bool `yaml:"enable_new_paths_format" default:"false"` + EnableBackupsEncryption bool `yaml:"enable_backups_encryption" default:"false"` } type LogConfig struct { @@ -96,6 +97,7 @@ type Config struct { ClientConnection ClientConnectionConfig `yaml:"client_connection"` S3 S3Config `yaml:"s3"` Auth PluginConfig `yaml:"auth"` + KMS PluginConfig `yaml:"kms"` GRPCServer GRPCServerConfig `yaml:"grpc_server"` MetricsServer MetricsServerConfig `yaml:"metrics_server"` OperationProcessor OperationProcessorConfig `yaml:"operation_processor"` diff --git a/internal/connectors/client/connector.go b/internal/connectors/client/connector.go index a5bfb2fb..8513c09f 100644 --- a/internal/connectors/client/connector.go +++ b/internal/connectors/client/connector.go @@ -272,6 +272,17 @@ func (d *ClientYdbConnector) ExportToS3( exportRequest.Settings.DestinationPrefix = s3Settings.DestinationPrefix } + if featureFlags.EnableBackupsEncryption && len(s3Settings.EncryptionKey) > 0 { + exportRequest.Settings.EncryptionSettings = &Ydb_Export.EncryptionSettings{ + EncryptionAlgorithm: s3Settings.EncryptionAlgorithm, + Key: &Ydb_Export.EncryptionSettings_SymmetricKey_{ + SymmetricKey: &Ydb_Export.EncryptionSettings_SymmetricKey{ + Key: s3Settings.EncryptionKey, + }, + }, + } + } + response, err := exportClient.ExportToS3(ctx, exportRequest) if err != nil { @@ -425,6 +436,17 @@ func (d *ClientYdbConnector) ImportFromS3( importRequest.Settings.DestinationPath = path.Join(clientDb.Name(), s3Settings.DestinationPath) } + if len(s3Settings.EncryptionKey) > 0 { + importRequest.Settings.EncryptionSettings = &Ydb_Export.EncryptionSettings{ + EncryptionAlgorithm: s3Settings.EncryptionAlgorithm, + Key: &Ydb_Export.EncryptionSettings_SymmetricKey_{ + SymmetricKey: &Ydb_Export.EncryptionSettings_SymmetricKey{ + Key: s3Settings.EncryptionKey, + }, + }, + } + } + response, err := importClient.ImportFromS3(ctx, importRequest) if err != nil { diff --git a/internal/connectors/client/prepare_items_test.go b/internal/connectors/client/prepare_items_test.go index 1059f7b9..b8ec2b97 100644 --- a/internal/connectors/client/prepare_items_test.go +++ b/internal/connectors/client/prepare_items_test.go @@ -1,12 +1,12 @@ package client import ( - "github.com/aws/aws-sdk-go/aws" - "github.com/stretchr/testify/assert" - "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Import" "testing" "ydbcp/internal/connectors/s3" "ydbcp/internal/types" + + "github.com/stretchr/testify/assert" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Import" ) func deref(items []*Ydb_Import.ImportFromS3Settings_Item) []Ydb_Import.ImportFromS3Settings_Item { @@ -20,15 +20,9 @@ func deref(items []*Ydb_Import.ImportFromS3Settings_Item) []Ydb_Import.ImportFro func TestPrepareItemsForImport(t *testing.T) { s3ObjectsMap := make(map[string]s3.Bucket) s3ObjectsMap["bucket"] = s3.Bucket{ - "local/table_1/scheme.pb": { - Key: aws.String("local/table_1/scheme.pb"), - }, - "local/table_2/scheme.pb": { - Key: aws.String("local/table_2/scheme.pb"), - }, - "local/folder/table_3/scheme.pb": { - Key: aws.String("local/folder/table_3/scheme.pb"), - }, + "local/table_1/scheme.pb": []byte{}, + "local/table_2/scheme.pb": []byte{}, + "local/folder/table_3/scheme.pb": []byte{}, } s3Settings := types.ImportSettings{ diff --git a/internal/connectors/db/process_result_set.go b/internal/connectors/db/process_result_set.go index fd755ffd..d04da083 100644 --- a/internal/connectors/db/process_result_set.go +++ b/internal/connectors/db/process_result_set.go @@ -78,6 +78,9 @@ func ReadBackupFromResultSet(res query.Row) (*types.Backup, error) { completedAt *time.Time createdAt *time.Time expireAt *time.Time + + encryptionAlgorithm *string + kmsKeyId *string ) err := res.ScanNamed( @@ -99,6 +102,8 @@ func ReadBackupFromResultSet(res query.Row) (*types.Backup, error) { query.Named("created_at", &createdAt), query.Named("completed_at", &completedAt), query.Named("initiated", &creator), + query.Named("encryption_algorithm", &encryptionAlgorithm), + query.Named("kms_key_id", &kmsKeyId), ) if err != nil { return nil, err @@ -112,22 +117,39 @@ func ReadBackupFromResultSet(res query.Row) (*types.Backup, error) { } } + var encryptionSettings *pb.EncryptionSettings + if kmsKeyId != nil { + encryptionSettings = &pb.EncryptionSettings{} + encryptionSettings.KeyEncryptionKey = &pb.EncryptionSettings_KmsKey_{ + KmsKey: &pb.EncryptionSettings_KmsKey{ + KeyId: *kmsKeyId, + }, + } + + if encryptionAlgorithm != nil { + if algorithmValue, ok := pb.EncryptionSettings_Algorithm_value[*encryptionAlgorithm]; ok { + encryptionSettings.Algorithm = pb.EncryptionSettings_Algorithm(algorithmValue) + } + } + } + return &types.Backup{ - ID: backupId, - ContainerID: containerId, - DatabaseName: databaseName, - DatabaseEndpoint: databaseEndpoint, - S3Endpoint: StringOrEmpty(s3endpoint), - S3Region: StringOrEmpty(s3region), - S3Bucket: StringOrEmpty(s3bucket), - S3PathPrefix: StringOrEmpty(s3pathPrefix), - Status: StringOrDefault(status, types.BackupStateUnknown), - Message: StringOrEmpty(message), - AuditInfo: auditFromDb(creator, createdAt, completedAt), - Size: Int64OrZero(size), - ScheduleID: scheduleId, - ExpireAt: expireAt, - SourcePaths: sourcePathsSlice, + ID: backupId, + ContainerID: containerId, + DatabaseName: databaseName, + DatabaseEndpoint: databaseEndpoint, + S3Endpoint: StringOrEmpty(s3endpoint), + S3Region: StringOrEmpty(s3region), + S3Bucket: StringOrEmpty(s3bucket), + S3PathPrefix: StringOrEmpty(s3pathPrefix), + Status: StringOrDefault(status, types.BackupStateUnknown), + Message: StringOrEmpty(message), + AuditInfo: auditFromDb(creator, createdAt, completedAt), + Size: Int64OrZero(size), + ScheduleID: scheduleId, + ExpireAt: expireAt, + SourcePaths: sourcePathsSlice, + EncryptionSettings: encryptionSettings, }, nil } @@ -157,6 +179,9 @@ func ReadOperationFromResultSet(res query.Row) (types.Operation, error) { retries *uint32 retriesCount *uint32 maxBackoff *time.Duration + + encryptionAlgorithm *string + kmsKeyId *string ) err := res.ScanNamed( query.Named("id", &operationId), @@ -183,6 +208,8 @@ func ReadOperationFromResultSet(res query.Row) (types.Operation, error) { query.Named("retries", &retries), query.Named("retries_count", &retriesCount), query.Named("retries_max_backoff", &maxBackoff), + query.Named("encryption_algorithm", &encryptionAlgorithm), + query.Named("kms_key_id", &kmsKeyId), ) if err != nil { return nil, err @@ -211,6 +238,22 @@ func ReadOperationFromResultSet(res query.Row) (types.Operation, error) { updatedTs = timestamppb.New(*updatedAt) } + var encryptionSettings *pb.EncryptionSettings + if kmsKeyId != nil { + encryptionSettings = &pb.EncryptionSettings{} + encryptionSettings.KeyEncryptionKey = &pb.EncryptionSettings_KmsKey_{ + KmsKey: &pb.EncryptionSettings_KmsKey{ + KeyId: *kmsKeyId, + }, + } + + if encryptionAlgorithm != nil { + if algorithmValue, ok := pb.EncryptionSettings_Algorithm_value[*encryptionAlgorithm]; ok { + encryptionSettings.Algorithm = pb.EncryptionSettings_Algorithm(algorithmValue) + } + } + } + if operationType == string(types.OperationTypeTB) { if backupId == nil { return nil, fmt.Errorf("failed to read backup_id for TB operation: %s", operationId) @@ -232,6 +275,7 @@ func ReadOperationFromResultSet(res query.Row) (types.Operation, error) { Audit: auditFromDb(creator, createdAt, completedAt), UpdatedAt: updatedTs, ParentOperationID: parentOperationID, + EncryptionSettings: encryptionSettings, }, nil } else if operationType == string(types.OperationTypeRB) { if backupId == nil { @@ -309,6 +353,7 @@ func ReadOperationFromResultSet(res query.Row) (types.Operation, error) { SourcePathsToExclude: sourcePathsToExcludeSlice, Audit: auditFromDb(creator, createdAt, completedAt), UpdatedAt: updatedTs, + EncryptionSettings: encryptionSettings, }, ScheduleID: scheduleID, Ttl: ttl, @@ -342,6 +387,9 @@ func ReadBackupScheduleFromResultSet(res query.Row, withRPOInfo bool) (*types.Ba lastSuccessfulBackupID *string recoveryPoint *time.Time nextLaunch *time.Time + + encryptionAlgorithm *string + kmsKeyId *string ) namedValues := []query.NamedDestination{ @@ -361,6 +409,8 @@ func ReadBackupScheduleFromResultSet(res query.Row, withRPOInfo bool) (*types.Ba query.Named("paths_to_exclude", &sourcePathsToExclude), query.Named("recovery_point_objective", &recoveryPointObjective), query.Named("next_launch", &nextLaunch), + query.Named("encryption_algorithm", &encryptionAlgorithm), + query.Named("kms_key_id", &kmsKeyId), } if withRPOInfo { namedValues = append(namedValues, query.Named("last_backup_id", &lastBackupID)) @@ -401,6 +451,22 @@ func ReadBackupScheduleFromResultSet(res query.Row, withRPOInfo bool) (*types.Ba rpoDuration = durationpb.New(*recoveryPointObjective) } + var encryptionSettings *pb.EncryptionSettings + if kmsKeyId != nil { + encryptionSettings = &pb.EncryptionSettings{} + encryptionSettings.KeyEncryptionKey = &pb.EncryptionSettings_KmsKey_{ + KmsKey: &pb.EncryptionSettings_KmsKey{ + KeyId: *kmsKeyId, + }, + } + + if encryptionAlgorithm != nil { + if algorithmValue, ok := pb.EncryptionSettings_Algorithm_value[*encryptionAlgorithm]; ok { + encryptionSettings.Algorithm = pb.EncryptionSettings_Algorithm(algorithmValue) + } + } + } + return &types.BackupSchedule{ ID: ID, ContainerID: containerID, @@ -416,6 +482,7 @@ func ReadBackupScheduleFromResultSet(res query.Row, withRPOInfo bool) (*types.Ba SchedulePattern: &pb.BackupSchedulePattern{Crontab: crontab}, Ttl: ttlDuration, RecoveryPointObjective: rpoDuration, + EncryptionSettings: encryptionSettings, }, NextLaunch: nextLaunch, LastBackupID: lastBackupID, diff --git a/internal/connectors/db/yql/queries/write.go b/internal/connectors/db/yql/queries/write.go index c62da9ff..17aa94b5 100644 --- a/internal/connectors/db/yql/queries/write.go +++ b/internal/connectors/db/yql/queries/write.go @@ -132,6 +132,16 @@ func BuildCreateOperationQuery(operation types.Operation, index int) WriteSingle if tb.ParentOperationID != nil { d.AddValueParam("$parent_operation_id", table_types.StringValueFromString(*tb.ParentOperationID)) } + if tb.EncryptionSettings != nil { + d.AddValueParam( + "$encryption_algorithm", + table_types.StringValueFromString(tb.EncryptionSettings.GetAlgorithm().String()), + ) + d.AddValueParam( + "$kms_key_id", + table_types.StringValueFromString(tb.EncryptionSettings.GetKmsKey().GetKeyId()), + ) + } } else if operation.GetType() == types.OperationTypeTBWR { tbwr, ok := operation.(*types.TakeBackupWithRetryOperation) if !ok { @@ -184,6 +194,16 @@ func BuildCreateOperationQuery(operation types.Operation, index int) WriteSingle if tbwr.Ttl != nil { d.AddValueParam("$ttl", table_types.IntervalValueFromDuration(*tbwr.Ttl)) } + if tbwr.EncryptionSettings != nil { + d.AddValueParam( + "$encryption_algorithm", + table_types.StringValueFromString(tbwr.EncryptionSettings.GetAlgorithm().String()), + ) + d.AddValueParam( + "$kms_key_id", + table_types.StringValueFromString(tbwr.EncryptionSettings.GetKmsKey().GetKeyId()), + ) + } } else if operation.GetType() == types.OperationTypeRB { rb, ok := operation.(*types.RestoreBackupOperation) if !ok { @@ -336,6 +356,17 @@ func BuildCreateBackupQuery(b types.Backup, index int) WriteSingleTableQueryImpl if b.ExpireAt != nil { d.AddValueParam("$expire_at", table_types.TimestampValueFromTime(*b.ExpireAt)) } + + if b.EncryptionSettings != nil { + d.AddValueParam( + "$encryption_algorithm", + table_types.StringValueFromString(b.EncryptionSettings.GetAlgorithm().String()), + ) + d.AddValueParam( + "$kms_key_id", + table_types.StringValueFromString(b.EncryptionSettings.GetKmsKey().GetKeyId()), + ) + } return d } @@ -389,6 +420,17 @@ func BuildCreateBackupScheduleQuery(schedule types.BackupSchedule, index int) Wr "$recovery_point_objective", table_types.IntervalValueFromDuration(schedule.ScheduleSettings.RecoveryPointObjective.AsDuration()), ) + + if schedule.ScheduleSettings.EncryptionSettings != nil { + d.AddValueParam( + "$encryption_algorithm", + table_types.StringValueFromString(schedule.ScheduleSettings.EncryptionSettings.GetAlgorithm().String()), + ) + d.AddValueParam( + "$kms_key_id", + table_types.StringValueFromString(schedule.ScheduleSettings.EncryptionSettings.GetKmsKey().GetKeyId()), + ) + } } if schedule.NextLaunch != nil { d.AddValueParam("$next_launch", table_types.TimestampValueFromTime(*schedule.NextLaunch)) diff --git a/internal/connectors/s3/connector.go b/internal/connectors/s3/connector.go index 1b642066..a90097b4 100644 --- a/internal/connectors/s3/connector.go +++ b/internal/connectors/s3/connector.go @@ -1,7 +1,9 @@ package s3 import ( + "bytes" "fmt" + "io" "strings" "ydbcp/internal/config" @@ -16,6 +18,8 @@ type S3Connector interface { ListObjects(pathPrefix string, bucket string) ([]string, int64, error) GetSize(pathPrefix string, bucket string) (int64, error) DeleteObjects(keys []string, bucket string) error + PutObject(key string, bucket string, data []byte) error + GetObject(key string, bucket string) ([]byte, error) } type ClientS3Connector struct { @@ -151,3 +155,37 @@ func (c *ClientS3Connector) DeleteObjects(keys []string, bucket string) error { return nil } + +func (c *ClientS3Connector) PutObject(key string, bucket string, data []byte) error { + _, err := c.s3.PutObject(&s3.PutObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + Body: bytes.NewReader(data), + }) + if err != nil { + return fmt.Errorf("failed to put object %s to bucket %s: %w", key, bucket, err) + } + return nil +} + +func (c *ClientS3Connector) GetObject(key string, bucket string) ([]byte, error) { + result, err := c.s3.GetObject(&s3.GetObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + }) + if err != nil { + return nil, fmt.Errorf("failed to get object %s from bucket %s: %w", key, bucket, err) + } + defer func() { + if result.Body != nil { + _ = result.Body.Close() + } + }() + + data, err := io.ReadAll(result.Body) + if err != nil { + return nil, fmt.Errorf("failed to read object %s from bucket %s: %w", key, bucket, err) + } + + return data, nil +} diff --git a/internal/connectors/s3/mock.go b/internal/connectors/s3/mock.go index 3ef7ac43..27b0ed3a 100644 --- a/internal/connectors/s3/mock.go +++ b/internal/connectors/s3/mock.go @@ -1,12 +1,14 @@ package s3 import ( + "fmt" "strings" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/s3" ) -type Bucket map[string]*s3.Object +type Bucket map[string][]byte type MockS3Connector struct { storage map[string]Bucket @@ -21,7 +23,7 @@ func (m *MockS3Connector) ListObjectsPages(input *s3.ListObjectsInput, fn func(* var s3objs []*s3.Object for i := 0; i < len(objects); i++ { s3objs = append(s3objs, &s3.Object{ - Key: &objects[i], + Key: aws.String(objects[i]), }) } @@ -40,13 +42,10 @@ func (m *MockS3Connector) ListObjects(pathPrefix string, bucketName string) ([]s var size int64 if bucket, ok := m.storage[bucketName]; ok { - for key, object := range bucket { + for key, data := range bucket { if strings.HasPrefix(key, pathPrefix) { objects = append(objects, key) - - if object.Size != nil { - size += *object.Size - } + size += int64(len(data)) } } } @@ -58,11 +57,9 @@ func (m *MockS3Connector) GetSize(pathPrefix string, bucketName string) (int64, var size int64 if bucket, ok := m.storage[bucketName]; ok { - for key, object := range bucket { + for key, data := range bucket { if strings.HasPrefix(key, pathPrefix) { - if object.Size != nil { - size += *object.Size - } + size += int64(len(data)) } } } @@ -79,3 +76,31 @@ func (m *MockS3Connector) DeleteObjects(objects []string, bucketName string) err return nil } + +func (m *MockS3Connector) PutObject(key string, bucketName string, data []byte) error { + if _, ok := m.storage[bucketName]; !ok { + m.storage[bucketName] = make(Bucket) + } + + dataCopy := make([]byte, len(data)) + copy(dataCopy, data) + m.storage[bucketName][key] = dataCopy + + return nil +} + +func (m *MockS3Connector) GetObject(key string, bucketName string) ([]byte, error) { + bucket, ok := m.storage[bucketName] + if !ok { + return nil, fmt.Errorf("bucket %s not found", bucketName) + } + + data, ok := bucket[key] + if !ok { + return nil, fmt.Errorf("object %s not found in bucket %s", key, bucketName) + } + + dataCopy := make([]byte, len(data)) + copy(dataCopy, data) + return dataCopy, nil +} diff --git a/internal/handlers/delete_backup_test.go b/internal/handlers/delete_backup_test.go index 961d695a..6cf067ed 100644 --- a/internal/handlers/delete_backup_test.go +++ b/internal/handlers/delete_backup_test.go @@ -3,7 +3,6 @@ package handlers import ( "context" "fmt" - "github.com/aws/aws-sdk-go/service/s3" "testing" "ydbcp/internal/metrics" @@ -14,7 +13,6 @@ import ( "ydbcp/internal/types" pb "ydbcp/pkg/proto/ydbcp/v1alpha1" - "github.com/aws/aws-sdk-go/aws" "github.com/stretchr/testify/assert" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -101,18 +99,9 @@ func TestDBOperationHandlerPendingOperationCompletedSuccessfully(t *testing.T) { backupMap[backupID] = backup opMap[opId] = &dbOp s3ObjectsMap["bucket"] = s3Client.Bucket{ - "pathPrefix/data_1.csv": { - Key: aws.String("data_1.csv"), - Size: aws.Int64(100), - }, - "pathPrefix/data_2.csv": { - Key: aws.String("data_2.csv"), - Size: aws.Int64(150), - }, - "pathPrefix/data_3.csv": { - Key: aws.String("data_3.csv"), - Size: aws.Int64(200), - }, + "pathPrefix/data_1.csv": make([]byte, 100), + "pathPrefix/data_2.csv": make([]byte, 150), + "pathPrefix/data_3.csv": make([]byte, 200), } dbConnector := db.NewMockDBConnector( @@ -181,18 +170,9 @@ func TestDBOperationHandlerRunningOperationCompletedSuccessfully(t *testing.T) { backupMap[backupID] = backup opMap[opId] = &dbOp s3ObjectsMap["bucket"] = s3Client.Bucket{ - "pathPrefix/data_1.csv": { - Key: aws.String("data_1.csv"), - Size: aws.Int64(100), - }, - "pathPrefix/data_2.csv": { - Key: aws.String("data_2.csv"), - Size: aws.Int64(150), - }, - "pathPrefix/data_3.csv": { - Key: aws.String("data_3.csv"), - Size: aws.Int64(200), - }, + "pathPrefix/data_1.csv": make([]byte, 100), + "pathPrefix/data_2.csv": make([]byte, 150), + "pathPrefix/data_3.csv": make([]byte, 200), } dbConnector := db.NewMockDBConnector( @@ -260,18 +240,9 @@ func TestDBOperationHandlerUnexpectedBackupStatus(t *testing.T) { backupMap[backupID] = backup opMap[opId] = &dbOp s3ObjectsMap["bucket"] = s3Client.Bucket{ - "pathPrefix/data_1.csv": { - Key: aws.String("data_1.csv"), - Size: aws.Int64(100), - }, - "pathPrefix/data_2.csv": { - Key: aws.String("data_2.csv"), - Size: aws.Int64(150), - }, - "pathPrefix/data_3.csv": { - Key: aws.String("data_3.csv"), - Size: aws.Int64(200), - }, + "pathPrefix/data_1.csv": make([]byte, 100), + "pathPrefix/data_2.csv": make([]byte, 150), + "pathPrefix/data_3.csv": make([]byte, 200), } dbConnector := db.NewMockDBConnector( @@ -335,11 +306,7 @@ func TestDBOperationHandlerDeleteMoreThanAllowedLimit(t *testing.T) { s3ObjectsMap["bucket"] = make(s3Client.Bucket) for i := 0; i < objectsListSize; i++ { bucket := s3ObjectsMap["bucket"] - - bucket[fmt.Sprintf("pathPrefix/data_%d.csv", i)] = &s3.Object{ - Key: aws.String(fmt.Sprintf("data_%d.csv", i)), - Size: aws.Int64(10), - } + bucket[fmt.Sprintf("pathPrefix/data_%d.csv", i)] = make([]byte, 10) } dbConnector := db.NewMockDBConnector( diff --git a/internal/handlers/schedule_backup.go b/internal/handlers/schedule_backup.go index 1708427c..e89441b4 100644 --- a/internal/handlers/schedule_backup.go +++ b/internal/handlers/schedule_backup.go @@ -8,6 +8,7 @@ import ( "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" "ydbcp/internal/audit" + "ydbcp/internal/config" "ydbcp/internal/connectors/db" "ydbcp/internal/connectors/db/yql/queries" "ydbcp/internal/types" @@ -20,11 +21,13 @@ type BackupScheduleHandlerType func(context.Context, db.DBConnector, *types.Back func NewBackupScheduleHandler( queryBuilderFactory queries.WriteQueryBuilderFactory, clock clockwork.Clock, + featureFlags config.FeatureFlagsConfig, ) BackupScheduleHandlerType { return func(ctx context.Context, driver db.DBConnector, schedule *types.BackupSchedule) error { return BackupScheduleHandler( ctx, driver, schedule, queryBuilderFactory, clock, + featureFlags, ) } } @@ -46,6 +49,7 @@ func BackupScheduleHandler( schedule *types.BackupSchedule, queryBuilderFactory queries.WriteQueryBuilderFactory, clock clockwork.Clock, + featureFlags config.FeatureFlagsConfig, ) error { if schedule.Status != types.BackupScheduleStateActive { xlog.Error(ctx, "backup schedule is not active", zap.String("ScheduleID", schedule.ID)) @@ -86,6 +90,10 @@ func BackupScheduleHandler( d := schedule.ScheduleSettings.Ttl.AsDuration() tbwr.Ttl = &d } + + if schedule.ScheduleSettings.EncryptionSettings != nil && featureFlags.EnableBackupsEncryption { + tbwr.EncryptionSettings = schedule.ScheduleSettings.EncryptionSettings + } } xlog.Info( diff --git a/internal/handlers/schedule_backup_test.go b/internal/handlers/schedule_backup_test.go index a4bc4b50..d5c4d797 100644 --- a/internal/handlers/schedule_backup_test.go +++ b/internal/handlers/schedule_backup_test.go @@ -6,6 +6,7 @@ import ( "github.com/stretchr/testify/assert" "testing" "time" + "ydbcp/internal/config" "ydbcp/internal/connectors/db" "ydbcp/internal/connectors/db/yql/queries" "ydbcp/internal/types" @@ -40,7 +41,7 @@ func TestBackupScheduleHandler(t *testing.T) { ) handler := NewBackupScheduleHandler( - queries.NewWriteTableQueryMock, clock, + queries.NewWriteTableQueryMock, clock, config.FeatureFlagsConfig{}, ) err := handler(ctx, dbConnector, &schedule) assert.Empty(t, err) diff --git a/internal/handlers/take_backup_retry.go b/internal/handlers/take_backup_retry.go index 0507866c..d3663342 100644 --- a/internal/handlers/take_backup_retry.go +++ b/internal/handlers/take_backup_retry.go @@ -4,10 +4,6 @@ import ( "context" "errors" "fmt" - "github.com/jonboulle/clockwork" - table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types" - "go.uber.org/zap" - "google.golang.org/protobuf/types/known/timestamppb" "math" "strings" "time" @@ -17,22 +13,31 @@ import ( "ydbcp/internal/connectors/client" "ydbcp/internal/connectors/db" "ydbcp/internal/connectors/db/yql/queries" + "ydbcp/internal/connectors/s3" "ydbcp/internal/metrics" "ydbcp/internal/types" "ydbcp/internal/util/xlog" + kp "ydbcp/pkg/plugins/kms" pb "ydbcp/pkg/proto/ydbcp/v1alpha1" + + "github.com/jonboulle/clockwork" + table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types" + "go.uber.org/zap" + "google.golang.org/protobuf/types/known/timestamppb" ) func NewTBWROperationHandler( db db.DBConnector, client client.ClientConnector, + s3Connector s3.S3Connector, queryBuilderFactory queries.WriteQueryBuilderFactory, clock clockwork.Clock, config config.Config, + kmsProvider kp.KmsProvider, ) types.OperationHandler { return func(ctx context.Context, op types.Operation) error { err := TBWROperationHandler( - ctx, op, db, client, config.S3, config.ClientConnection, queryBuilderFactory, clock, config.FeatureFlags, + ctx, op, db, client, s3Connector, config.S3, config.ClientConnection, queryBuilderFactory, clock, config.FeatureFlags, kmsProvider, ) if err == nil { metrics.GlobalMetricsRegistry.ReportOperationMetrics(op) @@ -223,11 +228,13 @@ func TBWROperationHandler( operation types.Operation, db db.DBConnector, clientConn client.ClientConnector, + s3Connector s3.S3Connector, s3 config.S3Config, clientConfig config.ClientConnectionConfig, queryBuilderFactory queries.WriteQueryBuilderFactory, clock clockwork.Clock, featureFlags config.FeatureFlagsConfig, + kmsProvider kp.KmsProvider, ) error { ctx = xlog.With(ctx, zap.String("OperationID", operation.GetID())) @@ -329,6 +336,7 @@ func TBWROperationHandler( backup, tb, err := backup_operations.MakeBackup( ctx, clientConn, + s3Connector, s3, clientConfig.AllowedEndpointDomains, clientConfig.AllowInsecureEndpoint, @@ -336,6 +344,7 @@ func TBWROperationHandler( types.OperationCreatorName, clock, featureFlags, + kmsProvider, ) tbwr.IncRetries() diff --git a/internal/handlers/take_backup_retry_test.go b/internal/handlers/take_backup_retry_test.go index 76418bb4..71d36dd2 100644 --- a/internal/handlers/take_backup_retry_test.go +++ b/internal/handlers/take_backup_retry_test.go @@ -3,20 +3,23 @@ package handlers import ( "context" "fmt" - "github.com/jonboulle/clockwork" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "google.golang.org/protobuf/types/known/durationpb" - "google.golang.org/protobuf/types/known/timestamppb" "testing" "time" "ydbcp/internal/config" "ydbcp/internal/connectors/client" "ydbcp/internal/connectors/db" "ydbcp/internal/connectors/db/yql/queries" + "ydbcp/internal/connectors/s3" + "ydbcp/internal/kms" "ydbcp/internal/metrics" "ydbcp/internal/types" pb "ydbcp/pkg/proto/ydbcp/v1alpha1" + + "github.com/jonboulle/clockwork" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/types/known/durationpb" + "google.golang.org/protobuf/types/known/timestamppb" ) var ( @@ -353,9 +356,11 @@ func TestTBWRHandlerSuccess(t *testing.T) { handler := NewTBWROperationHandler( dbConnector, clientConnector, + s3.NewMockS3Connector(make(map[string]s3.Bucket)), queries.NewWriteTableQueryMock, clock, config.Config{}, + kms.NewMockKmsProvider(nil), ) err := handler(ctx, &tbwr) assert.Empty(t, err) @@ -417,9 +422,11 @@ func TestTBWRHandlerSkipRunning(t *testing.T) { handler := NewTBWROperationHandler( dbConnector, clientConnector, + s3.NewMockS3Connector(make(map[string]s3.Bucket)), queries.NewWriteTableQueryMock, clock, config.Config{}, + kms.NewMockKmsProvider(nil), ) err := handler(ctx, &tbwr) assert.Empty(t, err) @@ -482,9 +489,11 @@ func TestTBWRHandlerSkipError(t *testing.T) { handler := NewTBWROperationHandler( dbConnector, clientConnector, + s3.NewMockS3Connector(make(map[string]s3.Bucket)), queries.NewWriteTableQueryMock, clockwork.NewFakeClockAt(t3.AsTime()), config.Config{}, + kms.NewMockKmsProvider(nil), ) err := handler(ctx, &tbwr) assert.Empty(t, err) @@ -539,9 +548,11 @@ func TestTBWRHandlerError(t *testing.T) { handler := NewTBWROperationHandler( dbConnector, clientConnector, + s3.NewMockS3Connector(make(map[string]s3.Bucket)), queries.NewWriteTableQueryMock, clock, config.Config{}, + kms.NewMockKmsProvider(nil), ) err := handler(ctx, &tbwr) assert.Empty(t, err) @@ -593,6 +604,7 @@ func TestTBWRHandlerAlwaysRunOnce(t *testing.T) { handler := NewTBWROperationHandler( dbConnector, clientConnector, + s3.NewMockS3Connector(make(map[string]s3.Bucket)), queries.NewWriteTableQueryMock, clock, config.Config{ @@ -604,6 +616,7 @@ func TestTBWRHandlerAlwaysRunOnce(t *testing.T) { AllowInsecureEndpoint: true, }, }, + kms.NewMockKmsProvider(nil), ) err := handler(ctx, &tbwr) assert.Empty(t, err) @@ -667,6 +680,7 @@ func TestTBWRHandlerEmptyDatabase(t *testing.T) { handler := NewTBWROperationHandler( dbConnector, clientConnector, + s3.NewMockS3Connector(make(map[string]s3.Bucket)), queries.NewWriteTableQueryMock, clock, config.Config{ @@ -678,6 +692,7 @@ func TestTBWRHandlerEmptyDatabase(t *testing.T) { AllowInsecureEndpoint: true, }, }, + kms.NewMockKmsProvider(nil), ) err := handler(ctx, &tbwr) assert.Empty(t, err) @@ -742,6 +757,7 @@ func TestTBWRHandlerInvalidEndpointRetry(t *testing.T) { handler := NewTBWROperationHandler( dbConnector, clientConnector, + s3.NewMockS3Connector(make(map[string]s3.Bucket)), queries.NewWriteTableQueryMock, clockwork.NewFakeClockAt(t1.AsTime()), config.Config{ @@ -753,6 +769,7 @@ func TestTBWRHandlerInvalidEndpointRetry(t *testing.T) { AllowInsecureEndpoint: true, }, }, + kms.NewMockKmsProvider(nil), ) err := handler(ctx, &tbwr) assert.Empty(t, err) @@ -802,6 +819,7 @@ func TestTBWRHandlerInvalidEndpointError(t *testing.T) { handler := NewTBWROperationHandler( dbConnector, clientConnector, + s3.NewMockS3Connector(make(map[string]s3.Bucket)), queries.NewWriteTableQueryMock, clockwork.NewFakeClockAt(t1.AsTime()), config.Config{ @@ -813,6 +831,7 @@ func TestTBWRHandlerInvalidEndpointError(t *testing.T) { AllowInsecureEndpoint: true, }, }, + kms.NewMockKmsProvider(nil), ) err := handler(ctx, &tbwr) assert.Empty(t, err) @@ -867,6 +886,7 @@ func TestTBWRHandlerStartCancel(t *testing.T) { handler := NewTBWROperationHandler( dbConnector, clientConnector, + s3.NewMockS3Connector(make(map[string]s3.Bucket)), queries.NewWriteTableQueryMock, clockwork.NewFakeClockAt(t1.AsTime()), config.Config{ @@ -878,6 +898,7 @@ func TestTBWRHandlerStartCancel(t *testing.T) { AllowInsecureEndpoint: true, }, }, + kms.NewMockKmsProvider(nil), ) err := handler(ctx, &tbwr) assert.Empty(t, err) @@ -946,6 +967,7 @@ func TestTBWRHandlerFullCancel(t *testing.T) { handler := NewTBWROperationHandler( dbConnector, clientConnector, + s3.NewMockS3Connector(make(map[string]s3.Bucket)), queries.NewWriteTableQueryMock, clock, config.Config{ @@ -957,6 +979,7 @@ func TestTBWRHandlerFullCancel(t *testing.T) { AllowInsecureEndpoint: true, }, }, + kms.NewMockKmsProvider(nil), ) err := handler(ctx, &tbwr) assert.Empty(t, err) diff --git a/internal/handlers/take_backup_test.go b/internal/handlers/take_backup_test.go index ee6aecc2..95307670 100644 --- a/internal/handlers/take_backup_test.go +++ b/internal/handlers/take_backup_test.go @@ -14,7 +14,6 @@ import ( "ydbcp/internal/types" pb "ydbcp/pkg/proto/ydbcp/v1alpha1" - "github.com/aws/aws-sdk-go/aws" "github.com/stretchr/testify/assert" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Operations" @@ -252,18 +251,9 @@ func TestTBOperationHandlerRunningOperationCompletedSuccessfully(t *testing.T) { opMap[opId] = &tbOp ydbOpMap["1"] = ydbOp s3ObjectsMap["bucket"] = s3Client.Bucket{ - "pathPrefix/data_1.csv": { - Key: aws.String("data_1.csv"), - Size: aws.Int64(100), - }, - "pathPrefix/data_2.csv": { - Key: aws.String("data_2.csv"), - Size: aws.Int64(150), - }, - "pathPrefix/data_3.csv": { - Key: aws.String("data_3.csv"), - Size: aws.Int64(200), - }, + "pathPrefix/data_1.csv": make([]byte, 100), + "pathPrefix/data_2.csv": make([]byte, 150), + "pathPrefix/data_3.csv": make([]byte, 200), } dbConnector := db.NewMockDBConnector( db.WithBackups(backupMap), @@ -571,18 +561,9 @@ func TestTBOperationHandlerCancellingOperationCompletedSuccessfully(t *testing.T opMap[opId] = &tbOp ydbOpMap["1"] = ydbOp s3ObjectsMap["bucket"] = s3Client.Bucket{ - "pathPrefix/data_1.csv": { - Key: aws.String("data_1.csv"), - Size: aws.Int64(100), - }, - "pathPrefix/data_2.csv": { - Key: aws.String("data_2.csv"), - Size: aws.Int64(150), - }, - "pathPrefix/data_3.csv": { - Key: aws.String("data_3.csv"), - Size: aws.Int64(200), - }, + "pathPrefix/data_1.csv": make([]byte, 100), + "pathPrefix/data_2.csv": make([]byte, 150), + "pathPrefix/data_3.csv": make([]byte, 200), } dbConnector := db.NewMockDBConnector( db.WithBackups(backupMap), @@ -740,18 +721,9 @@ func TestTBOperationHandlerCancellingOperationCancelledWithRemovingDataFromS3(t opMap[opId] = &tbOp ydbOpMap["1"] = ydbOp s3ObjectsMap["bucket"] = s3Client.Bucket{ - "pathPrefix/data_1.csv": { - Key: aws.String("data_1.csv"), - Size: aws.Int64(100), - }, - "pathPrefix/data_2.csv": { - Key: aws.String("data_2.csv"), - Size: aws.Int64(150), - }, - "pathPrefix/data_3.csv": { - Key: aws.String("data_3.csv"), - Size: aws.Int64(200), - }, + "pathPrefix/data_1.csv": make([]byte, 100), + "pathPrefix/data_2.csv": make([]byte, 150), + "pathPrefix/data_3.csv": make([]byte, 200), } dbConnector := db.NewMockDBConnector( db.WithBackups(backupMap), diff --git a/internal/kms/dummy.go b/internal/kms/dummy.go new file mode 100644 index 00000000..c2e2ad20 --- /dev/null +++ b/internal/kms/dummy.go @@ -0,0 +1,62 @@ +package kms + +import ( + "context" + "ydbcp/internal/util/xlog" + "ydbcp/pkg/plugins/kms" + + "go.uber.org/zap" +) + +type kmsProviderDummy struct { +} + +func (p *kmsProviderDummy) Init(ctx context.Context, _ string) error { + xlog.Info(ctx, "KmsProviderDummy was initialized successfully") + return nil +} + +func (p *kmsProviderDummy) Close(ctx context.Context) error { + xlog.Info(ctx, "KmsProviderDummy was closed") + return nil +} + +func (p *kmsProviderDummy) Encrypt( + ctx context.Context, + req *kms.EncryptRequest, +) (*kms.EncryptResponse, error) { + xlog.Info( + ctx, + "KmsProviderDummy Encrypt", + zap.String("KeyID", req.KeyID), + ) + + return &kms.EncryptResponse{ + KeyID: req.KeyID, + Ciphertext: req.Plaintext, + }, nil +} + +func (p *kmsProviderDummy) Decrypt( + ctx context.Context, + req *kms.DecryptRequest, +) (*kms.DecryptResponse, error) { + xlog.Info( + ctx, + "KmsProviderDummy Decrypt", + zap.String("KeyID", req.KeyID), + ) + + return &kms.DecryptResponse{ + KeyID: req.KeyID, + Plaintext: req.Ciphertext, + }, nil +} + +func NewDummyKmsProvider(ctx context.Context) (kms.KmsProvider, error) { + p := &kmsProviderDummy{} + if err := p.Init(ctx, ""); err != nil { + return nil, err + } + return p, nil +} diff --git a/internal/server/services/backup/backup_service.go b/internal/server/services/backup/backup_service.go index 46f1d623..6b267945 100644 --- a/internal/server/services/backup/backup_service.go +++ b/internal/server/services/backup/backup_service.go @@ -2,7 +2,7 @@ package backup import ( "context" - "github.com/jonboulle/clockwork" + "path" "strconv" "time" "ydbcp/internal/audit" @@ -12,14 +12,19 @@ import ( "ydbcp/internal/connectors/client" "ydbcp/internal/connectors/db" "ydbcp/internal/connectors/db/yql/queries" + s3connector "ydbcp/internal/connectors/s3" "ydbcp/internal/metrics" "ydbcp/internal/server" "ydbcp/internal/types" "ydbcp/internal/util/helpers" "ydbcp/internal/util/xlog" ap "ydbcp/pkg/plugins/auth" + kp "ydbcp/pkg/plugins/kms" pb "ydbcp/pkg/proto/ydbcp/v1alpha1" + "github.com/jonboulle/clockwork" + "google.golang.org/protobuf/proto" + table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types" "go.uber.org/zap" "google.golang.org/grpc/codes" @@ -31,8 +36,10 @@ type BackupService struct { pb.UnimplementedBackupServiceServer driver db.DBConnector clientConn client.ClientConnector + s3Connector s3connector.S3Connector s3 config.S3Config auth ap.AuthProvider + kms kp.KmsProvider allowedEndpointDomains []string allowInsecureEndpoint bool clock clockwork.Clock @@ -117,9 +124,19 @@ func (s *BackupService) MakeBackup(ctx context.Context, req *pb.MakeBackupReques return nil, status.Error(codes.Unimplemented, "backup root path is not supported yet") } + var encryptionSettings *pb.EncryptionSettings if req.EncryptionSettings != nil { - s.IncApiCallsCounter(methodName, codes.Unimplemented) - return nil, status.Error(codes.Unimplemented, "backup encryption is not supported yet") + if !s.featureFlags.EnableBackupsEncryption { + s.IncApiCallsCounter(methodName, codes.Unimplemented) + return nil, status.Error(codes.Unimplemented, "backup encryption is not supported yet") + } + + if req.EncryptionSettings.GetKeyEncryptionKey() == nil { + s.IncApiCallsCounter(methodName, codes.InvalidArgument) + return nil, status.Error(codes.InvalidArgument, "encryption key is required") + } + + encryptionSettings = proto.Clone(req.EncryptionSettings).(*pb.EncryptionSettings) } tbwr := &types.TakeBackupWithRetryOperation{ @@ -138,7 +155,8 @@ func (s *BackupService) MakeBackup(ctx context.Context, req *pb.MakeBackupReques Creator: subject, CreatedAt: now, }, - UpdatedAt: now, + UpdatedAt: now, + EncryptionSettings: encryptionSettings, }, Retries: 0, RetryConfig: &pb.RetryConfig{ @@ -397,6 +415,31 @@ func (s *BackupService) MakeRestore(ctx context.Context, req *pb.MakeRestoreRequ DestinationPath: req.GetDestinationPath(), } + if backup.EncryptionSettings != nil { + dekKey := path.Join(backup.S3PathPrefix, "dek.encrypted") + encryptedDEK, err := s.s3Connector.GetObject(dekKey, s.s3.Bucket) + if err != nil { + xlog.Error(ctx, "can't read encrypted DEK from S3", zap.Error(err), zap.String("dekKey", dekKey)) + s.IncApiCallsCounter(methodName, codes.Internal) + return nil, status.Errorf(codes.Internal, "can't read encrypted DEK from S3: %v", err) + } + + decryptResp, err := s.kms.Decrypt(ctx, &kp.DecryptRequest{ + KeyID: backup.EncryptionSettings.GetKmsKey().GetKeyId(), + Ciphertext: encryptedDEK, + }) + + if err != nil { + xlog.Error(ctx, "can't decrypt data encryption key", zap.Error(err)) + s.IncApiCallsCounter(methodName, codes.Internal) + return nil, status.Error(codes.Internal, "can't decrypt data encryption key") + } + + _, algorithm, _ := backup_operations.GetEncryptionParams(backup.EncryptionSettings) + s3Settings.EncryptionAlgorithm = algorithm + s3Settings.EncryptionKey = decryptResp.Plaintext + } + clientOperationID, err := s.clientConn.ImportFromS3(ctx, clientDriver, s3Settings, s.featureFlags) if err != nil { xlog.Error(ctx, "can't start import operation", zap.Error(err)) @@ -613,14 +656,18 @@ func (s *BackupService) Register(server server.Server) { func NewBackupService( driver db.DBConnector, clientConn client.ClientConnector, + s3Connector s3connector.S3Connector, auth ap.AuthProvider, + kms kp.KmsProvider, config config.Config, ) *BackupService { return &BackupService{ driver: driver, clientConn: clientConn, + s3Connector: s3Connector, s3: config.S3, auth: auth, + kms: kms, allowedEndpointDomains: config.ClientConnection.AllowedEndpointDomains, allowInsecureEndpoint: config.ClientConnection.AllowInsecureEndpoint, clock: clockwork.NewRealClock(), diff --git a/internal/server/services/backup_schedule/backup_schedule_service.go b/internal/server/services/backup_schedule/backup_schedule_service.go index 97c2e340..a772b9a0 100644 --- a/internal/server/services/backup_schedule/backup_schedule_service.go +++ b/internal/server/services/backup_schedule/backup_schedule_service.go @@ -3,9 +3,6 @@ package backup_schedule import ( "context" "fmt" - "github.com/jonboulle/clockwork" - "github.com/ydb-platform/ydb-go-sdk/v3/table" - "google.golang.org/protobuf/types/known/durationpb" "strconv" "time" "ydbcp/internal/audit" @@ -23,6 +20,10 @@ import ( ap "ydbcp/pkg/plugins/auth" pb "ydbcp/pkg/proto/ydbcp/v1alpha1" + "github.com/jonboulle/clockwork" + "github.com/ydb-platform/ydb-go-sdk/v3/table" + "google.golang.org/protobuf/types/known/durationpb" + table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types" "go.uber.org/zap" "google.golang.org/grpc/codes" @@ -120,8 +121,15 @@ func (s *BackupScheduleService) CreateBackupSchedule( } if request.ScheduleSettings.EncryptionSettings != nil { - s.IncApiCallsCounter(methodName, codes.Unimplemented) - return nil, status.Error(codes.Unimplemented, "backup encryption is not supported yet") + if !s.config.FeatureFlags.EnableBackupsEncryption { + s.IncApiCallsCounter(methodName, codes.Unimplemented) + return nil, status.Error(codes.Unimplemented, "backup encryption is not supported yet") + } + + if request.ScheduleSettings.EncryptionSettings.GetKeyEncryptionKey() == nil { + s.IncApiCallsCounter(methodName, codes.InvalidArgument) + return nil, status.Error(codes.InvalidArgument, "encryption key is required") + } } if request.ScheduleSettings.RecoveryPointObjective != nil && (request.ScheduleSettings.RecoveryPointObjective.Seconds == 0) { diff --git a/internal/types/backup.go b/internal/types/backup.go index 4b775f51..a1207bb4 100644 --- a/internal/types/backup.go +++ b/internal/types/backup.go @@ -37,21 +37,22 @@ func ParseObjectID(string string) (string, error) { } type Backup struct { - ID string - ContainerID string - DatabaseName string - DatabaseEndpoint string - S3Endpoint string - S3Region string - S3Bucket string - S3PathPrefix string - Status string - Message string - AuditInfo *pb.AuditInfo - Size int64 - ScheduleID *string - ExpireAt *time.Time - SourcePaths []string + ID string + ContainerID string + DatabaseName string + DatabaseEndpoint string + S3Endpoint string + S3Region string + S3Bucket string + S3PathPrefix string + Status string + Message string + AuditInfo *pb.AuditInfo + Size int64 + ScheduleID *string + ExpireAt *time.Time + SourcePaths []string + EncryptionSettings *pb.EncryptionSettings } func (o *Backup) String() string { @@ -77,12 +78,13 @@ func (o *Backup) Proto() *pb.Backup { Bucket: o.S3Bucket, PathPrefix: o.S3PathPrefix, }, - Audit: o.AuditInfo, - Size: o.Size, - Status: pb.Backup_Status(pb.Backup_Status_value[o.Status]), - Message: o.Message, - ExpireAt: nil, - SourcePaths: o.SourcePaths, + Audit: o.AuditInfo, + Size: o.Size, + Status: pb.Backup_Status(pb.Backup_Status_value[o.Status]), + Message: o.Message, + ExpireAt: nil, + SourcePaths: o.SourcePaths, + EncryptionSettings: o.EncryptionSettings, } if o.ScheduleID != nil { backup.ScheduleId = *o.ScheduleID diff --git a/internal/types/operation.go b/internal/types/operation.go index ea6a50fb..5eb3947b 100644 --- a/internal/types/operation.go +++ b/internal/types/operation.go @@ -51,6 +51,7 @@ type TakeBackupOperation struct { Audit *pb.AuditInfo UpdatedAt *timestamppb.Timestamp ParentOperationID *string + EncryptionSettings *pb.EncryptionSettings } func (o *TakeBackupOperation) GetID() string { @@ -117,6 +118,7 @@ func (o *TakeBackupOperation) Proto() *pb.Operation { Message: o.Message, UpdatedAt: o.UpdatedAt, TypeDescription: o.GetTypeDescription(), + EncryptionSettings: o.EncryptionSettings, } if o.ParentOperationID != nil { op.ParentOperationId = *o.ParentOperationID @@ -354,6 +356,7 @@ func (o *TakeBackupWithRetryOperation) Proto() *pb.Operation { UpdatedAt: o.UpdatedAt, RetryConfig: o.RetryConfig, TypeDescription: o.GetTypeDescription(), + EncryptionSettings: o.EncryptionSettings, } } @@ -376,8 +379,9 @@ func (o *TakeBackupWithRetryOperation) SpawnNewTBOperation(backupID string, subj Creator: subject, CreatedAt: timestamppb.Now(), }, - UpdatedAt: timestamppb.Now(), - ParentOperationID: &o.ID, + UpdatedAt: timestamppb.Now(), + ParentOperationID: &o.ID, + EncryptionSettings: o.EncryptionSettings, } } diff --git a/internal/types/settings.go b/internal/types/settings.go index 6f7661fb..4c290f44 100644 --- a/internal/types/settings.go +++ b/internal/types/settings.go @@ -11,30 +11,34 @@ func MakeYdbConnectionString(params YdbConnectionParams) string { } type ExportSettings struct { - Endpoint string - Region string - Bucket string - AccessKey string - SecretKey string - Description string - NumberOfRetries uint32 - RootPath string - SourcePaths []string - DestinationPrefix string - S3ForcePathStyle bool + Endpoint string + Region string + Bucket string + AccessKey string + SecretKey string + Description string + NumberOfRetries uint32 + RootPath string + SourcePaths []string + DestinationPrefix string + S3ForcePathStyle bool + EncryptionAlgorithm string + EncryptionKey []byte } type ImportSettings struct { - Endpoint string - Region string - Bucket string - AccessKey string - SecretKey string - Description string - NumberOfRetries uint32 - BackupID string - BucketDbRoot string - SourcePaths map[string]bool - S3ForcePathStyle bool - DestinationPath string + Endpoint string + Region string + Bucket string + AccessKey string + SecretKey string + Description string + NumberOfRetries uint32 + BackupID string + BucketDbRoot string + SourcePaths map[string]bool + S3ForcePathStyle bool + DestinationPath string + EncryptionAlgorithm string + EncryptionKey []byte } diff --git a/internal/watchers/schedule_watcher/schedule_watcher_test.go b/internal/watchers/schedule_watcher/schedule_watcher_test.go index eeb66306..e8704b2b 100644 --- a/internal/watchers/schedule_watcher/schedule_watcher_test.go +++ b/internal/watchers/schedule_watcher/schedule_watcher_test.go @@ -9,6 +9,7 @@ import ( "sync" "testing" "time" + "ydbcp/internal/config" "ydbcp/internal/connectors/db" "ydbcp/internal/connectors/db/yql/queries" "ydbcp/internal/handlers" @@ -65,7 +66,7 @@ func TestScheduleWatcherSimple(t *testing.T) { metrics.InitializeMockMetricsRegistry(metrics.WithClock(clock)) handler := handlers.NewBackupScheduleHandler( - queries.NewWriteTableQueryMock, clock, + queries.NewWriteTableQueryMock, clock, config.FeatureFlagsConfig{}, ) scheduleWatcherActionCompleted := make(chan struct{}) @@ -178,7 +179,7 @@ func TestScheduleWatcherTwoSchedulesOneBackup(t *testing.T) { metrics.InitializeMockMetricsRegistry(metrics.WithClock(clock)) handler := handlers.NewBackupScheduleHandler( - queries.NewWriteTableQueryMock, clock, + queries.NewWriteTableQueryMock, clock, config.FeatureFlagsConfig{}, ) scheduleWatcherActionCompleted := make(chan struct{}) @@ -299,7 +300,7 @@ func TestScheduleWatcherTwoBackups(t *testing.T) { metrics.InitializeMockMetricsRegistry(metrics.WithClock(clock)) handler := handlers.NewBackupScheduleHandler( - queries.NewWriteTableQueryMock, clock, + queries.NewWriteTableQueryMock, clock, config.FeatureFlagsConfig{}, ) scheduleWatcherActionCompleted := make(chan struct{}) @@ -416,7 +417,7 @@ func TestAllScheduleMetrics(t *testing.T) { metrics.InitializeMockMetricsRegistry(metrics.WithClock(clock)) handler := handlers.NewBackupScheduleHandler( - queries.NewWriteTableQueryMock, clock, + queries.NewWriteTableQueryMock, clock, config.FeatureFlagsConfig{}, ) scheduleWatcherActionCompleted := make(chan struct{}) @@ -527,7 +528,7 @@ func TestAllScheduleMetricsBeforeFirstBackup(t *testing.T) { metrics.InitializeMockMetricsRegistry(metrics.WithClock(clock)) handler := handlers.NewBackupScheduleHandler( - queries.NewWriteTableQueryMock, clock, + queries.NewWriteTableQueryMock, clock, config.FeatureFlagsConfig{}, ) scheduleWatcherActionCompleted := make(chan struct{}) diff --git a/local_config.yaml b/local_config.yaml index 37ac78de..584024bc 100644 --- a/local_config.yaml +++ b/local_config.yaml @@ -39,4 +39,5 @@ log: level: INFO feature_flags: - enable_new_paths_format: ${ENABLE_NEW_PATHS_FORMAT} \ No newline at end of file + enable_new_paths_format: ${ENABLE_NEW_PATHS_FORMAT} + enable_backups_encryption: ${ENABLE_BACKUPS_ENCRYPTION} \ No newline at end of file diff --git a/migrations/yql/20251128000000_add_encryption_columns.sql b/migrations/yql/20251128000000_add_encryption_columns.sql new file mode 100644 index 00000000..fb2a2bc2 --- /dev/null +++ b/migrations/yql/20251128000000_add_encryption_columns.sql @@ -0,0 +1,26 @@ +-- +goose Up +ALTER TABLE Backups + ADD COLUMN encryption_algorithm String, + ADD COLUMN kms_key_id String; + +ALTER TABLE Operations + ADD COLUMN encryption_algorithm String, + ADD COLUMN kms_key_id String; + +ALTER TABLE BackupSchedules + ADD COLUMN encryption_algorithm String, + ADD COLUMN kms_key_id String; + +-- +goose Down +ALTER TABLE BackupSchedules + DROP COLUMN kms_key_id, + DROP COLUMN encryption_algorithm; + +ALTER TABLE Operations + DROP COLUMN kms_key_id, + DROP COLUMN encryption_algorithm; + +ALTER TABLE Backups + DROP COLUMN kms_key_id, + DROP COLUMN encryption_algorithm; + diff --git a/pkg/proto/ydbcp/v1alpha1/operation.pb.go b/pkg/proto/ydbcp/v1alpha1/operation.pb.go index e15a9796..70debf28 100644 --- a/pkg/proto/ydbcp/v1alpha1/operation.pb.go +++ b/pkg/proto/ydbcp/v1alpha1/operation.pb.go @@ -128,6 +128,9 @@ type Operation struct { // Root path for all objects included in the backup, // it must be relative to the db root. RootPath string `protobuf:"bytes,18,opt,name=root_path,json=rootPath,proto3" json:"root_path,omitempty"` + // Backup encryption settings, + // unspecified field means the backup is unencrypted (only for MakeBackup operation). + EncryptionSettings *EncryptionSettings `protobuf:"bytes,19,opt,name=encryption_settings,json=encryptionSettings,proto3" json:"encryption_settings,omitempty"` } func (x *Operation) Reset() { @@ -288,6 +291,13 @@ func (x *Operation) GetRootPath() string { return "" } +func (x *Operation) GetEncryptionSettings() *EncryptionSettings { + if x != nil { + return x.EncryptionSettings + } + return nil +} + var File_ydbcp_v1alpha1_operation_proto protoreflect.FileDescriptor var file_ydbcp_v1alpha1_operation_proto_rawDesc = []byte{ @@ -299,7 +309,7 @@ var file_ydbcp_v1alpha1_operation_proto_rawDesc = []byte{ 0x64, 0x62, 0x63, 0x70, 0x2f, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2f, 0x72, 0x65, 0x74, 0x72, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, - 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xf5, 0x06, 0x0a, 0x09, 0x4f, + 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xca, 0x07, 0x0a, 0x09, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x21, 0x0a, 0x0c, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, @@ -346,20 +356,25 @@ var file_ydbcp_v1alpha1_operation_proto_rawDesc = []byte{ 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x11, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0f, 0x74, 0x79, 0x70, 0x65, 0x44, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1b, 0x0a, 0x09, 0x72, 0x6f, 0x6f, 0x74, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, 0x12, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x08, 0x72, 0x6f, 0x6f, 0x74, 0x50, 0x61, 0x74, 0x68, 0x22, 0x83, 0x01, 0x0a, - 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x16, 0x0a, 0x12, 0x53, 0x54, 0x41, 0x54, 0x55, - 0x53, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, - 0x0b, 0x0a, 0x07, 0x50, 0x45, 0x4e, 0x44, 0x49, 0x4e, 0x47, 0x10, 0x01, 0x12, 0x08, 0x0a, 0x04, - 0x44, 0x4f, 0x4e, 0x45, 0x10, 0x02, 0x12, 0x09, 0x0a, 0x05, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x10, - 0x03, 0x12, 0x0e, 0x0a, 0x0a, 0x43, 0x41, 0x4e, 0x43, 0x45, 0x4c, 0x4c, 0x49, 0x4e, 0x47, 0x10, - 0x04, 0x12, 0x0c, 0x0a, 0x08, 0x43, 0x41, 0x4e, 0x43, 0x45, 0x4c, 0x45, 0x44, 0x10, 0x05, 0x12, - 0x14, 0x0a, 0x10, 0x53, 0x54, 0x41, 0x52, 0x54, 0x5f, 0x43, 0x41, 0x4e, 0x43, 0x45, 0x4c, 0x4c, - 0x49, 0x4e, 0x47, 0x10, 0x06, 0x12, 0x0b, 0x0a, 0x07, 0x52, 0x55, 0x4e, 0x4e, 0x49, 0x4e, 0x47, - 0x10, 0x07, 0x42, 0x3e, 0x5a, 0x3c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, - 0x2f, 0x79, 0x64, 0x62, 0x2d, 0x70, 0x6c, 0x61, 0x74, 0x66, 0x6f, 0x72, 0x6d, 0x2f, 0x79, 0x64, - 0x62, 0x63, 0x70, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x79, 0x64, - 0x62, 0x63, 0x70, 0x2f, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x3b, 0x79, 0x64, 0x62, - 0x63, 0x70, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x28, 0x09, 0x52, 0x08, 0x72, 0x6f, 0x6f, 0x74, 0x50, 0x61, 0x74, 0x68, 0x12, 0x53, 0x0a, 0x13, + 0x65, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x73, 0x65, 0x74, 0x74, 0x69, + 0x6e, 0x67, 0x73, 0x18, 0x13, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x79, 0x64, 0x62, 0x63, + 0x70, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, + 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x65, 0x74, 0x74, 0x69, 0x6e, 0x67, 0x73, 0x52, 0x12, 0x65, + 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x65, 0x74, 0x74, 0x69, 0x6e, 0x67, + 0x73, 0x22, 0x83, 0x01, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x16, 0x0a, 0x12, + 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, + 0x45, 0x44, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x50, 0x45, 0x4e, 0x44, 0x49, 0x4e, 0x47, 0x10, + 0x01, 0x12, 0x08, 0x0a, 0x04, 0x44, 0x4f, 0x4e, 0x45, 0x10, 0x02, 0x12, 0x09, 0x0a, 0x05, 0x45, + 0x52, 0x52, 0x4f, 0x52, 0x10, 0x03, 0x12, 0x0e, 0x0a, 0x0a, 0x43, 0x41, 0x4e, 0x43, 0x45, 0x4c, + 0x4c, 0x49, 0x4e, 0x47, 0x10, 0x04, 0x12, 0x0c, 0x0a, 0x08, 0x43, 0x41, 0x4e, 0x43, 0x45, 0x4c, + 0x45, 0x44, 0x10, 0x05, 0x12, 0x14, 0x0a, 0x10, 0x53, 0x54, 0x41, 0x52, 0x54, 0x5f, 0x43, 0x41, + 0x4e, 0x43, 0x45, 0x4c, 0x4c, 0x49, 0x4e, 0x47, 0x10, 0x06, 0x12, 0x0b, 0x0a, 0x07, 0x52, 0x55, + 0x4e, 0x4e, 0x49, 0x4e, 0x47, 0x10, 0x07, 0x42, 0x3e, 0x5a, 0x3c, 0x67, 0x69, 0x74, 0x68, 0x75, + 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x79, 0x64, 0x62, 0x2d, 0x70, 0x6c, 0x61, 0x74, 0x66, 0x6f, + 0x72, 0x6d, 0x2f, 0x79, 0x64, 0x62, 0x63, 0x70, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x2f, 0x79, 0x64, 0x62, 0x63, 0x70, 0x2f, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, + 0x31, 0x3b, 0x79, 0x64, 0x62, 0x63, 0x70, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -382,17 +397,19 @@ var file_ydbcp_v1alpha1_operation_proto_goTypes = []any{ (*AuditInfo)(nil), // 2: ydbcp.v1alpha1.AuditInfo (*timestamppb.Timestamp)(nil), // 3: google.protobuf.Timestamp (*RetryConfig)(nil), // 4: ydbcp.v1alpha1.RetryConfig + (*EncryptionSettings)(nil), // 5: ydbcp.v1alpha1.EncryptionSettings } var file_ydbcp_v1alpha1_operation_proto_depIdxs = []int32{ 2, // 0: ydbcp.v1alpha1.Operation.audit:type_name -> ydbcp.v1alpha1.AuditInfo 0, // 1: ydbcp.v1alpha1.Operation.status:type_name -> ydbcp.v1alpha1.Operation.Status 3, // 2: ydbcp.v1alpha1.Operation.updated_at:type_name -> google.protobuf.Timestamp 4, // 3: ydbcp.v1alpha1.Operation.retry_config:type_name -> ydbcp.v1alpha1.RetryConfig - 4, // [4:4] is the sub-list for method output_type - 4, // [4:4] is the sub-list for method input_type - 4, // [4:4] is the sub-list for extension type_name - 4, // [4:4] is the sub-list for extension extendee - 0, // [0:4] is the sub-list for field type_name + 5, // 4: ydbcp.v1alpha1.Operation.encryption_settings:type_name -> ydbcp.v1alpha1.EncryptionSettings + 5, // [5:5] is the sub-list for method output_type + 5, // [5:5] is the sub-list for method input_type + 5, // [5:5] is the sub-list for extension type_name + 5, // [5:5] is the sub-list for extension extendee + 0, // [0:5] is the sub-list for field type_name } func init() { file_ydbcp_v1alpha1_operation_proto_init() } diff --git a/pkg/proto/ydbcp/v1alpha1/operation.proto b/pkg/proto/ydbcp/v1alpha1/operation.proto index 368b2d53..e9613ab9 100644 --- a/pkg/proto/ydbcp/v1alpha1/operation.proto +++ b/pkg/proto/ydbcp/v1alpha1/operation.proto @@ -57,4 +57,7 @@ message Operation { // Root path for all objects included in the backup, // it must be relative to the db root. string root_path = 18; + // Backup encryption settings, + // unspecified field means the backup is unencrypted (only for MakeBackup operation). + EncryptionSettings encryption_settings = 19; } \ No newline at end of file