-
Notifications
You must be signed in to change notification settings - Fork 1.1k
fix(agfs): fix s3 compatible for bytedance tos #742
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,8 +38,8 @@ def load_agfs_config() -> AGFSConfig: | |
| return None | ||
|
|
||
| return AGFSConfig(**agfs_data) | ||
| except Exception: | ||
| return None | ||
| except Exception as e: | ||
| raise Exception("config load error:") from e | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [Design] (non-blocking) 这个错误处理变更(从 `return None` 改为 `raise Exception("config load error:") from e`)会改变配置加载失败时调用方看到的行为。这个变更与 PR 的主要目的(修复 TOS 兼容性)无关。建议:
如果这是调试时的临时修改,考虑是否应该保留在最终版本中。 |
||
|
|
||
|
|
||
| AGFS_CONF = load_agfs_config() | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -42,19 +42,18 @@ def load_agfs_config() -> AGFSConfig: | |
| return None | ||
|
|
||
| return AGFSConfig(**agfs_data) | ||
| except Exception: | ||
| return None | ||
| except Exception as e: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [Design] (non-blocking) 同上:与前一个文件中 `load_agfs_config` 的异常处理变更(`return None` 改为 `raise Exception(...) from e`)是同一个问题。
|
||
| raise Exception("config load error:") from e | ||
|
|
||
|
|
||
| AGFS_CONF = load_agfs_config() | ||
| if AGFS_CONF is not None: | ||
| AGFS_CONF.mode = "http-client" | ||
|
|
||
| # 2. Skip tests if no S3 config found or backend is not S3 | ||
| pytestmark = pytest.mark.skipif( | ||
| AGFS_CONF is None or AGFS_CONF.backend != "s3", | ||
| reason="AGFS S3 configuration not found in ov.conf", | ||
| ) | ||
| AGFS_CONF.mode = "http-client" | ||
|
|
||
|
|
||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [Bug] (blocking) 如果 `AGFS_CONF` 为 `None`(`load_agfs_config()` 在找不到配置时会返回 `None`),无条件执行 `AGFS_CONF.mode = "http-client"` 会在模块导入时抛出 `AttributeError`,`skipif` 也就没有机会生效。原代码有条件检查:
if AGFS_CONF is not None:
    AGFS_CONF.mode = "http-client"
修复建议:
pytestmark = pytest.mark.skipif(
    AGFS_CONF is None or AGFS_CONF.backend != "s3",
    reason="AGFS S3 configuration not found in ov.conf",
)
if AGFS_CONF is not None:
    AGFS_CONF.mode = "http-client" |
||
| @pytest.fixture(scope="module") | ||
|
|
@@ -69,6 +68,7 @@ def s3_client(): | |
| region_name=s3_conf.region, | ||
| endpoint_url=s3_conf.endpoint, | ||
| use_ssl=s3_conf.use_ssl, | ||
| # use_path_style=s3_conf.use_path_style, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [Suggestion] (non-blocking) 被注释的代码应该直接删除,而不是保留注释。如果将来需要,可以从 git 历史中恢复。建议:`# 删除这行`。注释掉的死代码会降低可读性,让后续维护者困惑。 |
||
| ) | ||
|
|
||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,82 @@ | ||
| package main | ||
|
|
||
| import ( | ||
| "encoding/json" | ||
| "fmt" | ||
| "os" | ||
| "testing" | ||
| "time" | ||
|
|
||
| "github.com/c4pt0r/agfs/agfs-server/pkg/filesystem" | ||
| ) | ||
|
|
||
| // CGO_ENABLED=1 go test -v main.go main_test.go | ||
|
|
||
| // TestAGFS_Read_TOS_Logic tests the core logic of AGFS_Read (using globalFS.Read) | ||
| // with the configuration from ov-byted-tos.conf. | ||
| func TestAGFS_Read_TOS_Logic(t *testing.T) { | ||
| // 1. Read configuration from OPENVIKING_CONFIG_FILE environment variable | ||
| confPath := os.Getenv("OPENVIKING_CONFIG_FILE") | ||
| confData, err := os.ReadFile(confPath) | ||
| if err != nil { | ||
| t.Fatalf("failed to read config file: %v", err) | ||
| } | ||
|
|
||
| var fullConfig map[string]interface{} | ||
| if err := json.Unmarshal(confData, &fullConfig); err != nil { | ||
| t.Fatalf("failed to unmarshal config: %v", err) | ||
| } | ||
|
|
||
| storage := fullConfig["storage"].(map[string]interface{}) | ||
| agfs := storage["agfs"].(map[string]interface{}) | ||
| s3Config := agfs["s3"].(map[string]interface{}) | ||
|
|
||
| // 2. Map configuration parameters for s3fs plugin | ||
| mappedConfig := make(map[string]interface{}) | ||
| for k, v := range s3Config { | ||
| switch k { | ||
| case "access_key": | ||
| mappedConfig["access_key_id"] = v | ||
| case "secret_key": | ||
| mappedConfig["secret_access_key"] = v | ||
| case "use_ssl": | ||
| mappedConfig["disable_ssl"] = !(v.(bool)) | ||
| default: | ||
| mappedConfig[k] = v | ||
| } | ||
| } | ||
|
|
||
| // 3. Mount S3 plugin using the globalFS initialized in main.go | ||
| mountPath := "/s3" | ||
| err = globalFS.MountPlugin("s3fs", mountPath, mappedConfig) | ||
| if err != nil { | ||
| t.Fatalf("Failed to mount s3fs: %v", err) | ||
| } | ||
| fmt.Printf("Mounted s3fs at %s\n", mountPath) | ||
|
|
||
| // 4. Prepare test data with unique path | ||
| testPath := fmt.Sprintf("/s3/test_read_logic_%d.txt", time.Now().Unix()) | ||
| testContent := "test data for pybinding s3 read logic" | ||
| contentBytes := []byte(testContent) | ||
|
|
||
| // Write data using globalFS with create and truncate flags | ||
| n, err := globalFS.Write(testPath, contentBytes, -1, filesystem.WriteFlagCreate|filesystem.WriteFlagTruncate) | ||
| if err != nil { | ||
| t.Fatalf("Failed to write to s3: %v", err) | ||
| } | ||
| fmt.Printf("Written %d bytes\n", n) | ||
|
|
||
| // 5. Test the core logic of AGFS_Read (fs.Read) | ||
| // This corresponds to main.go:216 | ||
| data, err := globalFS.Read(testPath, 0, int64(len(contentBytes))) | ||
| if err != nil && err.Error() != "EOF" { | ||
| t.Fatalf("fs.Read failed: %v", err) | ||
| } | ||
|
|
||
| // Verify the data returned by fs.Read | ||
| if string(data) != testContent { | ||
| t.Errorf("Content mismatch: expected %q, got %q", testContent, string(data)) | ||
| } else { | ||
| fmt.Printf("Read logic verification successful: %s\n", string(data)) | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,10 +10,12 @@ import ( | |
| "time" | ||
|
|
||
| "github.com/aws/aws-sdk-go-v2/aws" | ||
| v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4" | ||
| "github.com/aws/aws-sdk-go-v2/config" | ||
| "github.com/aws/aws-sdk-go-v2/credentials" | ||
| "github.com/aws/aws-sdk-go-v2/service/s3" | ||
| "github.com/aws/aws-sdk-go-v2/service/s3/types" | ||
| "github.com/aws/smithy-go/middleware" | ||
| log "github.com/sirupsen/logrus" | ||
| ) | ||
|
|
||
|
|
@@ -27,14 +29,15 @@ type S3Client struct { | |
|
|
||
| // S3Config holds S3 client configuration | ||
| type S3Config struct { | ||
| Region string | ||
| Bucket string | ||
| AccessKeyID string | ||
| SecretAccessKey string | ||
| Endpoint string // Optional custom endpoint (for S3-compatible services) | ||
| Prefix string // Optional prefix for all keys | ||
| DisableSSL bool // For testing with local S3 | ||
| UsePathStyle bool // Whether to use path-style addressing (true) or virtual-host-style (false) | ||
| Region string | ||
| Bucket string | ||
| AccessKeyID string | ||
| SecretAccessKey string | ||
| Endpoint string // Optional custom endpoint (for S3-compatible services) | ||
| Prefix string // Optional prefix for all keys | ||
| DisableSSL bool // For testing with local S3 | ||
| UsePathStyle bool // Whether to use path-style addressing (true) or virtual-host-style (false) | ||
| DisableContentSHA256 bool // Disable x-amz-content-sha256 header (some S3-compatible providers require this) | ||
| } | ||
|
|
||
| // NewS3Client creates a new S3 client | ||
|
|
@@ -64,13 +67,24 @@ func NewS3Client(cfg S3Config) (*S3Client, error) { | |
| // Create S3 client options | ||
| clientOpts := []func(*s3.Options){} | ||
|
|
||
| // Add DisableContentSHA256 if needed | ||
| if cfg.DisableContentSHA256 { | ||
| clientOpts = append(clientOpts, func(o *s3.Options) { | ||
| o.APIOptions = append(o.APIOptions, func(s *middleware.Stack) error { | ||
| return v4.SwapComputePayloadSHA256ForUnsignedPayloadMiddleware(s) | ||
| }) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [Good] (informational) 使用 `v4.SwapComputePayloadSHA256ForUnsignedPayloadMiddleware` 将请求切换为 unsigned payload 是合适的做法。 |
||
| o.RequestChecksumCalculation = aws.RequestChecksumCalculationWhenRequired | ||
| o.ResponseChecksumValidation = aws.ResponseChecksumValidationWhenRequired | ||
| }) | ||
| } | ||
|
|
||
| // Set custom endpoint if provided (for MinIO, LocalStack, TOS, etc.) | ||
| if cfg.Endpoint != "" { | ||
| clientOpts = append(clientOpts, func(o *s3.Options) { | ||
| o.BaseEndpoint = aws.String(cfg.Endpoint) | ||
| // true represent UsePathStyle for MinIO and some S3-compatible services | ||
| // false represent VirtualHostStyle for TOS and some S3-compatible services | ||
| o.UsePathStyle = cfg.UsePathStyle | ||
| o.UsePathStyle = cfg.UsePathStyle | ||
| }) | ||
| } | ||
|
|
||
|
|
@@ -398,7 +412,7 @@ func (c *S3Client) DirectoryExists(ctx context.Context, path string) (bool, erro | |
| if !strings.HasSuffix(dirKey, "/") { | ||
| dirKey += "/" | ||
| } | ||
|
|
||
| // Try HeadObject to check if directory marker exists | ||
| _, err := c.client.HeadObject(ctx, &s3.HeadObjectInput{ | ||
| Bucket: aws.String(c.bucket), | ||
|
|
@@ -408,7 +422,7 @@ func (c *S3Client) DirectoryExists(ctx context.Context, path string) (bool, erro | |
| // Directory marker exists | ||
| return true, nil | ||
| } | ||
|
|
||
| // If directory marker doesn't exist, check if there are any objects with this prefix | ||
| prefix := dirKey | ||
| result, err := c.client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{ | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[Suggestion] (non-blocking)
配置项的文档可以更详细,帮助用户理解何时需要启用此选项: