From 98fb24ce1fcc7c5f6a7904e9b7160176242d00f3 Mon Sep 17 00:00:00 2001 From: sunchuanbao Date: Wed, 18 Mar 2026 19:55:43 +0800 Subject: [PATCH] fix(agfs): fix s3 compatible for bytedance tos --- openviking/agfs_manager.py | 1 + openviking_cli/utils/config/agfs_config.py | 5 ++ tests/agfs/test_fs_binding_s3.py | 4 +- tests/agfs/test_fs_s3.py | 7 +- .../agfs-server/cmd/pybinding/main_test.go | 82 +++++++++++++++++++ .../agfs-server/pkg/plugins/s3fs/client.go | 36 +++++--- .../agfs/agfs-server/pkg/plugins/s3fs/s3fs.go | 26 ++++-- 7 files changed, 136 insertions(+), 25 deletions(-) create mode 100644 third_party/agfs/agfs-server/cmd/pybinding/main_test.go diff --git a/openviking/agfs_manager.py b/openviking/agfs_manager.py index 14ed124a..8ec97075 100644 --- a/openviking/agfs_manager.py +++ b/openviking/agfs_manager.py @@ -164,6 +164,7 @@ def _generate_config(self) -> Path: "prefix": self.s3_config.prefix, "disable_ssl": not self.s3_config.use_ssl, "use_path_style": self.s3_config.use_path_style, + "disable_content_sha256": self.s3_config.disable_content_sha256, }, } elif self.backend == "memory": diff --git a/openviking_cli/utils/config/agfs_config.py b/openviking_cli/utils/config/agfs_config.py index 8e3061fe..870fd793 100644 --- a/openviking_cli/utils/config/agfs_config.py +++ b/openviking_cli/utils/config/agfs_config.py @@ -46,6 +46,11 @@ class S3Config(BaseModel): description="true represent UsePathStyle for MinIO and some S3-compatible services; false represent VirtualHostStyle for TOS and some S3-compatible services.", ) + disable_content_sha256: bool = Field( + default=False, + description="Disable x-amz-content-sha256 header. Required for some S3-compatible services like TOS.", + ) + model_config = {"extra": "forbid"} def validate_config(self): diff --git a/tests/agfs/test_fs_binding_s3.py b/tests/agfs/test_fs_binding_s3.py index 692b869d..bbb7c465 100644 --- a/tests/agfs/test_fs_binding_s3.py +++ b/tests/agfs/test_fs_binding_s3.py @@ -37,8 +37,8 @@ def load_agfs_config() -> AGFSConfig: return None return AGFSConfig(**agfs_data) - except Exception: - return None + except Exception as e: + raise Exception("config load error:") from e AGFS_CONF = load_agfs_config() diff --git a/tests/agfs/test_fs_s3.py b/tests/agfs/test_fs_s3.py index 330c7089..7e0d5ee5 100644 --- a/tests/agfs/test_fs_s3.py +++ b/tests/agfs/test_fs_s3.py @@ -41,18 +41,18 @@ def load_agfs_config() -> AGFSConfig: return None return AGFSConfig(**agfs_data) - except Exception: - return None + except Exception as e: + raise Exception("config load error:") from e AGFS_CONF = load_agfs_config() -AGFS_CONF.mode = "http-client" # 2. Skip tests if no S3 config found or backend is not S3 pytestmark = pytest.mark.skipif( AGFS_CONF is None or AGFS_CONF.backend != "s3", reason="AGFS S3 configuration not found in ov.conf", ) +AGFS_CONF.mode = "http-client" @pytest.fixture(scope="module") @@ -67,6 +67,7 @@ def s3_client(): region_name=s3_conf.region, endpoint_url=s3_conf.endpoint, use_ssl=s3_conf.use_ssl, + # use_path_style=s3_conf.use_path_style, ) diff --git a/third_party/agfs/agfs-server/cmd/pybinding/main_test.go b/third_party/agfs/agfs-server/cmd/pybinding/main_test.go new file mode 100644 index 00000000..cbdf0dac --- /dev/null +++ b/third_party/agfs/agfs-server/cmd/pybinding/main_test.go @@ -0,0 +1,82 @@ +package main + +import ( + "encoding/json" + "fmt" + "os" + "testing" + "time" + + "github.com/c4pt0r/agfs/agfs-server/pkg/filesystem" +) + +// CGO_ENABLED=1 go test -v main.go main_test.go + +// TestAGFS_Read_TOS_Logic tests the core logic of AGFS_Read (using globalFS.Read) +// with the configuration from ov-byted-tos.conf. +func TestAGFS_Read_TOS_Logic(t *testing.T) { + // 1. Read configuration from OPENVIKING_CONFIG_FILE environment variable + confPath := os.Getenv("OPENVIKING_CONFIG_FILE") + confData, err := os.ReadFile(confPath) + if err != nil { + t.Fatalf("failed to read config file: %v", err) + } + + var fullConfig map[string]interface{} + if err := json.Unmarshal(confData, &fullConfig); err != nil { + t.Fatalf("failed to unmarshal config: %v", err) + } + + storage := fullConfig["storage"].(map[string]interface{}) + agfs := storage["agfs"].(map[string]interface{}) + s3Config := agfs["s3"].(map[string]interface{}) + + // 2. Map configuration parameters for s3fs plugin + mappedConfig := make(map[string]interface{}) + for k, v := range s3Config { + switch k { + case "access_key": + mappedConfig["access_key_id"] = v + case "secret_key": + mappedConfig["secret_access_key"] = v + case "use_ssl": + mappedConfig["disable_ssl"] = !(v.(bool)) + default: + mappedConfig[k] = v + } + } + + // 3. Mount S3 plugin using the globalFS initialized in main.go + mountPath := "/s3" + err = globalFS.MountPlugin("s3fs", mountPath, mappedConfig) + if err != nil { + t.Fatalf("Failed to mount s3fs: %v", err) + } + fmt.Printf("Mounted s3fs at %s\n", mountPath) + + // 4. Prepare test data with unique path + testPath := fmt.Sprintf("/s3/test_read_logic_%d.txt", time.Now().Unix()) + testContent := "test data for pybinding s3 read logic" + contentBytes := []byte(testContent) + + // Write data using globalFS with create and truncate flags + n, err := globalFS.Write(testPath, contentBytes, -1, filesystem.WriteFlagCreate|filesystem.WriteFlagTruncate) + if err != nil { + t.Fatalf("Failed to write to s3: %v", err) + } + fmt.Printf("Written %d bytes\n", n) + + // 5. Test the core logic of AGFS_Read (fs.Read) + // This corresponds to main.go:216 + data, err := globalFS.Read(testPath, 0, int64(len(contentBytes))) + if err != nil && err.Error() != "EOF" { + t.Fatalf("fs.Read failed: %v", err) + } + + // Verify the data returned by fs.Read + if string(data) != testContent { + t.Errorf("Content mismatch: expected %q, got %q", testContent, string(data)) + } else { + fmt.Printf("Read logic verification successful: %s\n", string(data)) + } +} diff --git a/third_party/agfs/agfs-server/pkg/plugins/s3fs/client.go b/third_party/agfs/agfs-server/pkg/plugins/s3fs/client.go index c7ba54bb..5bb9f39b 100644 --- a/third_party/agfs/agfs-server/pkg/plugins/s3fs/client.go +++ b/third_party/agfs/agfs-server/pkg/plugins/s3fs/client.go @@ -10,10 +10,12 @@ import ( "time" "github.com/aws/aws-sdk-go-v2/aws" + v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/aws/smithy-go/middleware" log "github.com/sirupsen/logrus" ) @@ -27,14 +29,15 @@ type S3Client struct { // S3Config holds S3 client configuration type S3Config struct { - Region string - Bucket string - AccessKeyID string - SecretAccessKey string - Endpoint string // Optional custom endpoint (for S3-compatible services) - Prefix string // Optional prefix for all keys - DisableSSL bool // For testing with local S3 - UsePathStyle bool // Whether to use path-style addressing (true) or virtual-host-style (false) + Region string + Bucket string + AccessKeyID string + SecretAccessKey string + Endpoint string // Optional custom endpoint (for S3-compatible services) + Prefix string // Optional prefix for all keys + DisableSSL bool // For testing with local S3 + UsePathStyle bool // Whether to use path-style addressing (true) or virtual-host-style (false) + DisableContentSHA256 bool // Disable x-amz-content-sha256 header (some S3-compatible providers require this) } // NewS3Client creates a new S3 client @@ -64,13 +67,24 @@ func NewS3Client(cfg S3Config) (*S3Client, error) { // Create S3 client options clientOpts := []func(*s3.Options){} + // Add DisableContentSHA256 if needed + if cfg.DisableContentSHA256 { + clientOpts = append(clientOpts, func(o *s3.Options) { + o.APIOptions = append(o.APIOptions, func(s *middleware.Stack) error { + return v4.SwapComputePayloadSHA256ForUnsignedPayloadMiddleware(s) + }) + o.RequestChecksumCalculation = aws.RequestChecksumCalculationWhenRequired + o.ResponseChecksumValidation = aws.ResponseChecksumValidationWhenRequired + }) + } + // Set custom endpoint if provided (for MinIO, LocalStack, TOS, etc.) if cfg.Endpoint != "" { clientOpts = append(clientOpts, func(o *s3.Options) { o.BaseEndpoint = aws.String(cfg.Endpoint) // true represent UsePathStyle for MinIO and some S3-compatible services // false represent VirtualHostStyle for TOS and some S3-compatible services - o.UsePathStyle = cfg.UsePathStyle + o.UsePathStyle = cfg.UsePathStyle }) } @@ -398,7 +412,7 @@ func (c *S3Client) DirectoryExists(ctx context.Context, path string) (bool, erro if !strings.HasSuffix(dirKey, "/") { dirKey += "/" } - + // Try HeadObject to check if directory marker exists _, err := c.client.HeadObject(ctx, &s3.HeadObjectInput{ Bucket: aws.String(c.bucket), @@ -408,7 +422,7 @@ func (c *S3Client) DirectoryExists(ctx context.Context, path string) (bool, erro // Directory marker exists return true, nil } - + // If directory marker doesn't exist, check if there are any objects with this prefix prefix := dirKey result, err := c.client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{ diff --git a/third_party/agfs/agfs-server/pkg/plugins/s3fs/s3fs.go b/third_party/agfs/agfs-server/pkg/plugins/s3fs/s3fs.go index befe7599..c3816a84 100644 --- a/third_party/agfs/agfs-server/pkg/plugins/s3fs/s3fs.go +++ b/third_party/agfs/agfs-server/pkg/plugins/s3fs/s3fs.go @@ -564,7 +564,7 @@ func (p *S3FSPlugin) Validate(cfg map[string]interface{}) error { // Check for unknown parameters allowedKeys := []string{ "bucket", "region", "access_key_id", "secret_access_key", "endpoint", "prefix", "disable_ssl", "mount_path", - "cache_enabled", "cache_ttl", "stat_cache_ttl", "cache_max_size", "use_path_style", + "cache_enabled", "cache_ttl", "stat_cache_ttl", "cache_max_size", "use_path_style", "disable_content_sha256", } if err := config.ValidateOnlyKnownKeys(cfg, allowedKeys); err != nil { return err @@ -605,14 +605,15 @@ func (p *S3FSPlugin) Initialize(config map[string]interface{}) error { // Parse S3 configuration cfg := S3Config{ - Region: getStringConfig(config, "region", "us-east-1"), - Bucket: getStringConfig(config, "bucket", ""), - AccessKeyID: getStringConfig(config, "access_key_id", ""), - SecretAccessKey: getStringConfig(config, "secret_access_key", ""), - Endpoint: getStringConfig(config, "endpoint", ""), - Prefix: getStringConfig(config, "prefix", ""), - DisableSSL: getBoolConfig(config, "disable_ssl", false), - UsePathStyle: getBoolConfig(config, "use_path_style", true), + Region: getStringConfig(config, "region", "us-east-1"), + Bucket: getStringConfig(config, "bucket", ""), + AccessKeyID: getStringConfig(config, "access_key_id", ""), + SecretAccessKey: getStringConfig(config, "secret_access_key", ""), + Endpoint: getStringConfig(config, "endpoint", ""), + Prefix: getStringConfig(config, "prefix", ""), + DisableSSL: getBoolConfig(config, "disable_ssl", false), + UsePathStyle: getBoolConfig(config, "use_path_style", true), + DisableContentSHA256: getBoolConfig(config, "disable_content_sha256", false), } if cfg.Bucket == "" { @@ -704,6 +705,13 @@ func (p *S3FSPlugin) GetConfigParams() []plugin.ConfigParameter { Default: "true", Description: "Whether to use path-style addressing (true) or virtual-host-style (false). Defaults to false for TOS, true for other services.", }, + { + Name: "disable_content_sha256", + Type: "bool", + Required: false, + Default: "false", + Description: "Disable x-amz-content-sha256 header (required for some S3-compatible providers)", + }, { Name: "cache_enabled", Type: "bool",