diff --git a/pkg/storage/chunk/client/baidubce/bos_storage_client.go b/pkg/storage/chunk/client/baidubce/bos_storage_client.go index d44bc004e52b6..0e96e0d5949aa 100644 --- a/pkg/storage/chunk/client/baidubce/bos_storage_client.go +++ b/pkg/storage/chunk/client/baidubce/bos_storage_client.go @@ -3,6 +3,7 @@ package baidubce import ( "context" "flag" + "fmt" "io" "time" @@ -41,6 +42,7 @@ type BOSStorageConfig struct { Endpoint string `yaml:"endpoint"` AccessKeyID string `yaml:"access_key_id"` SecretAccessKey flagext.Secret `yaml:"secret_access_key"` + PathPrefix string `yaml:"path_prefix"` } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -54,6 +56,7 @@ func (cfg *BOSStorageConfig) RegisterFlagsWithPrefix(prefix string, f *flag.Flag f.StringVar(&cfg.Endpoint, prefix+"baidubce.endpoint", DefaultEndpoint, "BOS endpoint to connect to.") f.StringVar(&cfg.AccessKeyID, prefix+"baidubce.access-key-id", "", "Baidu Cloud Engine (BCE) Access Key ID.") f.Var(&cfg.SecretAccessKey, prefix+"baidubce.secret-access-key", "Baidu Cloud Engine (BCE) Secret Access Key.") + f.StringVar(&cfg.PathPrefix, prefix+"baidubce.path-prefix", "", "BOS write prefix") } type BOSObjectStorage struct { @@ -84,6 +87,7 @@ func (b *BOSObjectStorage) PutObject(ctx context.Context, objectKey string, obje if err != nil { return err } + objectKey := fmt.Sprintf("%s/%s", b.cfg.PathPrefix, objectKey) _, err = b.client.BasicPutObject(b.cfg.BucketName, objectKey, body) return err }) @@ -93,6 +97,7 @@ func (b *BOSObjectStorage) GetObject(ctx context.Context, objectKey string) (io. var res *api.GetObjectResult err := instrument.CollectedRequest(ctx, "BOS.GetObject", bosRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { var requestErr error + objectKey := fmt.Sprintf("%s/%s", b.cfg.PathPrefix, objectKey) res, requestErr = b.client.BasicGetObject(b.cfg.BucketName, objectKey) return requestErr }) @@ -109,6 +114,7 @@ func (b *BOSObjectStorage) List(ctx context.Context, prefix string, delimiter st err := instrument.CollectedRequest(ctx, "BOS.List", bosRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { args := new(api.ListObjectsArgs) + prefix := fmt.Sprintf("%s/%s", b.cfg.PathPrefix, prefix) args.Prefix = prefix args.Delimiter = delimiter for { @@ -146,6 +152,7 @@ func (b *BOSObjectStorage) List(ctx context.Context, prefix string, delimiter st func (b *BOSObjectStorage) DeleteObject(ctx context.Context, objectKey string) error { return instrument.CollectedRequest(ctx, "BOS.DeleteObject", bosRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + objectKey := fmt.Sprintf("%s/%s", b.cfg.PathPrefix, objectKey) err := b.client.DeleteObject(b.cfg.BucketName, objectKey) return err }) diff --git a/pkg/storage/stores/shipper/storage/cached_client.go b/pkg/storage/stores/shipper/storage/cached_client.go index 2d3c839724e7d..8fcceabe38dd5 100644 --- a/pkg/storage/stores/shipper/storage/cached_client.go +++ b/pkg/storage/stores/shipper/storage/cached_client.go @@ -159,9 +159,9 @@ func (c *cachedObjectClient) buildCache(ctx context.Context, forceRefresh bool) } ss := strings.Split(object.Key, delimiter) - if len(ss) < 2 || len(ss) > 3 { - return fmt.Errorf("invalid key: %s", object.Key) - } + // if len(ss) < 2 || len(ss) > 3 { + // return fmt.Errorf("invalid key: %s", object.Key) + // } tableName := ss[0] tbl, ok := c.tables[tableName]