diff --git a/code/go/0chain.net/blobbercore/config/config.go b/code/go/0chain.net/blobbercore/config/config.go
index 9420b2276..b2fdf3eaf 100644
--- a/code/go/0chain.net/blobbercore/config/config.go
+++ b/code/go/0chain.net/blobbercore/config/config.go
@@ -171,6 +171,9 @@ type Config struct {
 	PebbleCache        int64
 	PebbleMemtableSize int64
 	PebbleMaxOpenFiles int
+
+	// EnableDirectIO enables the O_DIRECT flag for file operations (bypasses the OS page cache)
+	EnableDirectIO bool
 }
 
 /*Configuration of the system */
@@ -321,6 +324,8 @@ func ReadConfig(deploymentMode int) {
 	Configuration.PebbleCache = viper.GetInt64("kv.pebble_cache")
 	Configuration.PebbleMemtableSize = viper.GetInt64("kv.pebble_memtable_size")
 	Configuration.PebbleMaxOpenFiles = viper.GetInt("kv.pebble_max_open_files")
+
+	Configuration.EnableDirectIO = viper.GetBool("storage.enable_direct_io")
 }
 
 // StorageSCConfiguration will include all the required sc configs to operate blobber
diff --git a/code/go/0chain.net/blobbercore/filestore/storage.go b/code/go/0chain.net/blobbercore/filestore/storage.go
index e4749b0fd..258eba627 100644
--- a/code/go/0chain.net/blobbercore/filestore/storage.go
+++ b/code/go/0chain.net/blobbercore/filestore/storage.go
@@ -41,7 +41,9 @@ import (
 	"sync"
 	"syscall"
 	"time"
+	"unsafe"
 
+	"github.com/0chain/blobber/code/go/0chain.net/blobbercore/config"
 	"github.com/0chain/blobber/code/go/0chain.net/core/common"
 	"github.com/0chain/blobber/code/go/0chain.net/core/encryption"
 	"github.com/0chain/blobber/code/go/0chain.net/core/logging"
@@ -61,10 +63,44 @@ const (
 	ThumbnailSuffix = "_thumbnail"
 )
 
+// getSectorSize returns the sector size for the given file path.
+// It tries the BLKSSZGET ioctl for block devices and falls back to statfs for regular files.
+func getSectorSize(path string) (int, error) {
+	f, err := os.Open(path)
+	if err != nil {
+		return 0, err
+	}
+	defer f.Close()
+
+	const BLKSSZGET = 0x1268
+	var sectorSize int32
+	_, _, errno := syscall.Syscall(
+		syscall.SYS_IOCTL,
+		f.Fd(),
+		BLKSSZGET,
+		uintptr(unsafe.Pointer(&sectorSize)),
+	)
+	if errno == 0 && sectorSize > 0 {
+		return int(sectorSize), nil
+	}
+
+	// Fallback: use statfs for regular files
+	var stat syscall.Statfs_t
+	err = syscall.Statfs(path, &stat)
+	if err != nil {
+		return 0, err
+	}
+	if stat.Bsize > 0 {
+		return int(stat.Bsize), nil
+	}
+	return 512, nil // fallback default
+}
+
 func (fs *FileStore) WriteFile(allocID, conID string, fileData *FileInputData, infile multipart.File) (*FileOutputData, error) {
 	tempFilePath := fs.getTempPathForFile(allocID, fileData.Name, fileData.FilePathHash, conID)
 	var (
 		initialSize int64
+		writtenSize int64
 	)
 	finfo, err := os.Stat(tempFilePath)
 	if err != nil && !errors.Is(err, os.ErrNotExist) {
@@ -77,7 +113,28 @@ func (fs *FileStore) WriteFile(allocID, conID string, fileData *FileInputData, i
 	if err = createDirs(filepath.Dir(tempFilePath)); err != nil {
 		return nil, common.NewError("dir_creation_error", err.Error())
 	}
-	f, err := os.OpenFile(tempFilePath, os.O_CREATE|os.O_RDWR, 0644)
+
+	useDirectIO := false
+	var f *os.File
+	var sectorSize int
+	if config.Configuration.EnableDirectIO {
+		fd, derr := unix.Open(tempFilePath, unix.O_WRONLY|unix.O_CREAT|unix.O_DIRECT, 0644)
+		if derr == nil {
+			useDirectIO = true
+			f, err = os.NewFile(uintptr(fd), tempFilePath), nil
+			// Detect the sector size for alignment; default to 512 bytes if detection fails
+			sectorSize, _ = getSectorSize(tempFilePath)
+			if sectorSize <= 0 {
+				sectorSize = 512 // fallback
+			}
+		} else {
+			logging.Logger.Warn("O_DIRECT not supported, falling back to regular file operations", zap.String("file", tempFilePath), zap.Error(derr))
zap.String("file", tempFilePath), zap.Error(derr)) + } + } + if !useDirectIO { + f, err = os.OpenFile(tempFilePath, os.O_CREATE|os.O_RDWR, 0644) + sectorSize = 0 // not needed + } if err != nil { return nil, common.NewError("file_open_error", err.Error()) } @@ -87,10 +144,32 @@ func (fs *FileStore) WriteFile(allocID, conID string, fileData *FileInputData, i if err != nil { return nil, common.NewError("file_seek_error", err.Error()) } - buf := make([]byte, BufferSize) - writtenSize, err := io.CopyBuffer(f, infile, buf) - if err != nil { - return nil, common.NewError("file_write_error", err.Error()) + + if useDirectIO { + // Round BufferSize up to the next multiple of sectorSize for O_DIRECT alignment requirements + // This ensures the buffer is properly aligned for direct I/O + alignedBufSize := BufferSize + if sectorSize > 0 { + alignedBufSize = (BufferSize + sectorSize - 1) &^ (sectorSize - 1) + } + alignedBuf := make([]byte, alignedBufSize) + writtenSize, err = copyWithDirectIO(f, infile, alignedBuf, BufferSize, sectorSize) + if err != nil { + return nil, common.NewError("file_write_error", err.Error()) + } + // Truncate the file to the actual data size to remove padding + if writtenSize > 0 { + err = f.Truncate(fileData.UploadOffset + writtenSize) + if err != nil { + return nil, common.NewError("file_truncate_error", err.Error()) + } + } + } else { + buf := make([]byte, BufferSize) + writtenSize, err = io.CopyBuffer(f, infile, buf) + if err != nil { + return nil, common.NewError("file_write_error", err.Error()) + } } finfo, err = f.Stat() @@ -1071,3 +1150,39 @@ func sanitizeFileName(fileName string) string { fileName = filepath.Base(fileName) return fileName } + +// copyWithDirectIO performs I/O operations that are compatible with O_DIRECT +// O_DIRECT requires aligned buffers and aligned I/O operations +func copyWithDirectIO(dst io.Writer, src io.Reader, buf []byte, maxBufferSize int, alignment int) (int64, error) { + var totalWritten int64 + + for { + n, err := src.Read(buf[:maxBufferSize]) + if n > 0 { + alignedSize := (n + alignment - 1) &^ (alignment - 1) + if alignedSize > len(buf) { + alignedSize = len(buf) + } + if alignedSize > n { + for i := n; i < alignedSize; i++ { + buf[i] = 0 + } + } + written, writeErr := dst.Write(buf[:alignedSize]) + if written > n { + written = n // Don't count padding bytes + } + totalWritten += int64(written) + if writeErr != nil { + return totalWritten, writeErr + } + } + if err != nil { + if err == io.EOF { + break + } + return totalWritten, err + } + } + return totalWritten, nil +} diff --git a/code/go/0chain.net/blobbercore/filestore/storage_benchmark_test.go b/code/go/0chain.net/blobbercore/filestore/storage_benchmark_test.go new file mode 100644 index 000000000..34ebeca6a --- /dev/null +++ b/code/go/0chain.net/blobbercore/filestore/storage_benchmark_test.go @@ -0,0 +1,134 @@ +package filestore + +import ( + "flag" + "fmt" + "math/rand" + "mime/multipart" + "os" + "path/filepath" + "sync" + "testing" + "time" + + "github.com/0chain/blobber/code/go/0chain.net/blobbercore/config" + "github.com/0chain/blobber/code/go/0chain.net/core/logging" + gozap "go.uber.org/zap" +) + +var enableDirectIO = flag.Bool("enable_directio", false, "Enable O_DIRECT/direct I/O for WriteFile benchmark") + +var minFileSize = flag.Float64("min_file_size", 1.0, "Minimum file size in MB (can be fractional, e.g., 0.01 for 10KB)") +var maxFileSize = flag.Float64("max_file_size", 10.0, "Maximum file size in MB (can be fractional)") +var nFiles = flag.Int("n_files", 5000, 
"Number of files to generate") + +// Helper to generate a random file of given size with a unique index +func generateRandomFileWithIndex(path string, size int64, idx int) error { + f, err := os.Create(path) + if err != nil { + return err + } + defer f.Close() + + buf := make([]byte, 1024*1024) // 1MB buffer + var written int64 + rng := rand.New(rand.NewSource(time.Now().UnixNano() + int64(idx))) + for written < size-8 { + n := int64(len(buf)) + if size-8-written < n { + n = size - 8 - written + } + _, _ = rng.Read(buf[:n]) + _, err := f.Write(buf[:n]) + if err != nil { + return err + } + written += n + } + // Write the index as the last 8 bytes + idxBytes := []byte(fmt.Sprintf("%08d", idx)) + _, err = f.Write(idxBytes) + return err +} + +// generateRandomFilesInRange generates n files in dir with random sizes between minMB and maxMB (in MB, float64), +// each file has a unique index in its name and content. Returns the list of file paths and their sizes. +func generateRandomFilesInRange(dir string, n int, minMB, maxMB float64) ([]string, []int64, error) { + if minMB > maxMB || minMB <= 0 || n <= 0 { + return nil, nil, fmt.Errorf("invalid input parameters") + } + files := make([]string, n) + sizes := make([]int64, n) + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + for i := 0; i < n; i++ { + sizeMB := minMB + rng.Float64()*(maxMB-minMB) + size := int64(sizeMB * 1024 * 1024) + if size < 1 { + size = 1 // at least 1 byte + } + files[i] = filepath.Join(dir, fmt.Sprintf("src_%d.data", i)) + sizes[i] = size + if err := generateRandomFileWithIndex(files[i], size, i); err != nil { + return nil, nil, fmt.Errorf("failed to generate file %d: %w", i, err) + } + } + return files, sizes, nil +} + +// Minimal config and logger setup for the test +func setupTestConfigAndLogger() { + // Minimal config + config.Configuration = config.Config{ + EnableDirectIO: *enableDirectIO, + } + logging.Logger, _ = gozap.NewDevelopment() // Or zap.NewNop() for no output +} + +func BenchmarkWriteFile_O_DIRECT_Batch(b *testing.B) { + tmpDir := b.TempDir() + + // Generate files of random size given on the range of min and max file size + srcFiles, _, err := generateRandomFilesInRange(tmpDir, *nFiles, *minFileSize, *maxFileSize) + if err != nil { + b.Fatalf("failed to generate files: %v", err) + } + + // Prepare FileStore (mock as needed) + fs := &FileStore{ + mp: tmpDir, + mAllocs: make(map[string]*allocation), + rwMU: &sync.RWMutex{}, + } + + b.ResetTimer() + setupTestConfigAndLogger() + for bench := 0; bench < b.N; bench++ { + writtenFiles := make([]string, *nFiles) + for i := 0; i < *nFiles; i++ { + src, err := os.Open(srcFiles[i]) + if err != nil { + b.Fatalf("failed to open src file %d: %v", i, err) + } + fileName := fmt.Sprintf("testfile_%d.data", i) + fileData := &FileInputData{ + Name: fileName, + Path: "/" + fileName, + FilePathHash: fmt.Sprintf("dummyhash_%d", i), + // Size: fileSize, + } + var infile multipart.File = src + + _, err = fs.WriteFile("allocid", "conid", fileData, infile) + src.Close() + if err != nil { + b.Fatalf("WriteFile failed for file %d: %v", i, err) + } + writtenFiles[i] = fs.getTempPathForFile("allocid", fileData.Name, fileData.FilePathHash, "conid") + } + // Clean up written files after each batch + for _, f := range writtenFiles { + os.Remove(f) + } + } + b.StopTimer() +} diff --git a/code/go/0chain.net/blobbercore/filestore/store_test.go b/code/go/0chain.net/blobbercore/filestore/store_test.go index 235b8ca22..435c2cc59 100644 --- 
+++ b/code/go/0chain.net/blobbercore/filestore/store_test.go
@@ -1019,3 +1019,170 @@ mainloop:
 	mr = fixedMT.GetMerkleRoot()
 	return
 }
+
+func TestDirectIOFunctionality(t *testing.T) {
+	// Test O_DIRECT functionality with configuration
+	tests := []struct {
+		name           string
+		enableDirectIO bool
+		expectDirectIO bool
+	}{
+		{
+			name:           "O_DIRECT disabled",
+			enableDirectIO: false,
+			expectDirectIO: false,
+		},
+		{
+			name:           "O_DIRECT enabled",
+			enableDirectIO: true,
+			expectDirectIO: true,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			// Set configuration
+			config.Configuration.EnableDirectIO = tt.enableDirectIO
+
+			fs, cleanUp := setupStorage(t)
+			defer cleanUp()
+
+			// Create test data
+			allocID := randString(64)
+			connID := randString(32)
+			fileName := "test_file.txt"
+			filePathHash := randString(64)
+
+			// Create a simple test file
+			testData := []byte("Hello, O_DIRECT test!")
+			fileData := &FileInputData{
+				Name:         fileName,
+				Path:         "/test/path",
+				FilePathHash: filePathHash,
+				UploadOffset: 0,
+				Size:         int64(len(testData)),
+			}
+
+			// Create multipart file from bytes
+			multipartFile := &mockMultipartFile{
+				reader: bytes.NewReader(testData),
+				size:   int64(len(testData)),
+			}
+
+			// Test WriteFile function
+			result, err := fs.WriteFile(allocID, connID, fileData, multipartFile)
+
+			// Should succeed regardless of O_DIRECT setting
+			require.NoError(t, err)
+			require.NotNil(t, result)
+			require.Equal(t, int64(len(testData)), result.Size)
+			require.Equal(t, fileName, result.Name)
+			require.Equal(t, "/test/path", result.Path)
+
+			// Verify file was actually written
+			tempFilePath := fs.getTempPathForFile(allocID, fileName, filePathHash, connID)
+			fileInfo, err := os.Stat(tempFilePath)
+			require.NoError(t, err)
+			require.Equal(t, int64(len(testData)), fileInfo.Size())
+
+			// Read back the file to verify content
+			content, err := os.ReadFile(tempFilePath)
+			require.NoError(t, err)
+			require.Equal(t, testData, content)
+		})
+	}
+}
+
+// mockMultipartFile implements multipart.File interface for testing
+type mockMultipartFile struct {
+	reader io.Reader
+	size   int64
+}
+
+func (m *mockMultipartFile) Read(p []byte) (n int, err error) {
+	return m.reader.Read(p)
+}
+
+func (m *mockMultipartFile) Seek(offset int64, whence int) (int64, error) {
+	if seeker, ok := m.reader.(io.Seeker); ok {
+		return seeker.Seek(offset, whence)
+	}
+	return 0, errors.New("seeker not implemented")
+}
+
+func (m *mockMultipartFile) Close() error {
+	return nil
+}
+
+func (m *mockMultipartFile) ReadAt(p []byte, off int64) (n int, err error) {
+	if readerAt, ok := m.reader.(io.ReaderAt); ok {
+		return readerAt.ReadAt(p, off)
+	}
+	return 0, errors.New("ReadAt not implemented")
+}
+
+func TestCopyWithDirectIO(t *testing.T) {
+	// Test the copyWithDirectIO function directly
+	tests := []struct {
+		name        string
+		inputData   []byte
+		bufferSize  int
+		expectError bool
+	}{
+		{
+			name:        "Small data",
+			inputData:   []byte("Hello, World!"),
+			bufferSize:  1024,
+			expectError: false,
+		},
+		{
+			name:        "Large data",
+			inputData:   make([]byte, 8192), // 8KB
+			bufferSize:  4096,
+			expectError: false,
+		},
+		{
+			name:        "Empty data",
+			inputData:   []byte{},
+			bufferSize:  1024,
+			expectError: false,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			// Fill large data with random content
+			if len(tt.inputData) > 100 {
+				_, err := rand.Read(tt.inputData)
+				require.NoError(t, err)
+			}
+
+			src := bytes.NewReader(tt.inputData)
+			var dst bytes.Buffer
+
+			// Create aligned buffer for O_DIRECT
+			const alignment = 512
+			alignedBufSize := (tt.bufferSize + alignment - 1) &^ (alignment - 1)
+			buf := make([]byte, alignedBufSize)
+
+			written, err := copyWithDirectIO(&dst, src, buf, tt.bufferSize, 512)
+
+			if tt.expectError {
+				require.Error(t, err)
+			} else {
+				require.NoError(t, err)
+				require.Equal(t, int64(len(tt.inputData)), written)
+				out := dst.Bytes()
+				if len(tt.inputData) == 0 {
+					require.True(t, out == nil || len(out) == 0)
+				} else {
+					require.GreaterOrEqual(t, len(out), len(tt.inputData))
+					require.Equal(t, tt.inputData, out[:len(tt.inputData)])
+					for _, b := range out[len(tt.inputData):] {
+						require.Equal(t, byte(0), b)
+					}
+				}
+			}
+		})
+	}
+}
diff --git a/docs/O_DIRECT_IMPLEMENTATION.md b/docs/O_DIRECT_IMPLEMENTATION.md
new file mode 100644
index 000000000..0ee64c22a
--- /dev/null
+++ b/docs/O_DIRECT_IMPLEMENTATION.md
@@ -0,0 +1,116 @@
+# O_DIRECT Implementation in Blobber Storage
+
+## Overview
+
+The O_DIRECT flag implementation in the blobber storage system provides an option to bypass the operating system's page cache for file operations, enabling direct I/O to storage devices. This can improve performance for high-throughput storage systems by reducing memory usage and providing more predictable I/O performance.
+
+## Configuration
+
+The O_DIRECT functionality is controlled by the `enable_direct_io` option in the blobber configuration file:
+
+```yaml
+storage:
+  files_dir: "/path/to/hdd"
+  # Enable O_DIRECT flag for file operations (bypasses OS cache for better performance)
+  # This is useful for high-performance storage systems but may not be supported on all filesystems
+  enable_direct_io: false
+```
+
+## Implementation Details
+
+### 1. File Opening with O_DIRECT
+
+When `enable_direct_io` is set to `true`, the system attempts to open files with the O_DIRECT flag:
+
+```go
+fd, err := unix.Open(tempFilePath, unix.O_WRONLY|unix.O_CREAT|unix.O_DIRECT, 0644)
+```
+
+### 2. Fallback Mechanism
+
+If O_DIRECT is not supported by the filesystem or operating system, the implementation automatically falls back to regular file operations:
+
+```go
+if err != nil {
+	// Fall back to regular file operations if O_DIRECT is not supported
+	logging.Logger.Warn("O_DIRECT not supported, falling back to regular file operations",
+		zap.String("file", tempFilePath), zap.Error(err))
+	f, err := os.OpenFile(tempFilePath, os.O_CREATE|os.O_RDWR, 0644)
+	// ... regular file operations
+}
+```
+
+### 3. Buffer Alignment
+
+O_DIRECT requires aligned buffers and aligned I/O operations. The implementation detects the device sector size (defaulting to 512 bytes if detection fails) and rounds the I/O size up to a multiple of it:
+
+```go
+sectorSize, _ := getSectorSize(tempFilePath) // falls back to 512 if it cannot be detected
+alignedBufSize := (BufferSize + sectorSize - 1) &^ (sectorSize - 1) // round up to a sector boundary
+alignedBuf := make([]byte, alignedBufSize)
+```
+
+### 4. Direct I/O Copy Function
+
+A specialized `copyWithDirectIO` function handles the aligned I/O operations:
+
+```go
+func copyWithDirectIO(dst io.Writer, src io.Reader, buf []byte, maxBufferSize int, alignment int) (int64, error)
+```
+
+This function:
+- Reads up to `maxBufferSize` bytes from the source at a time
+- Pads short reads with zeros up to the next multiple of `alignment`
+- Writes only whole, alignment-sized chunks to the destination
+- Returns the actual number of bytes written (excluding padding)
+
+## Benefits
+
+1. **Performance**: Bypasses OS cache for better throughput on high-performance storage
+2. **Memory Efficiency**: Reduces memory usage by avoiding double-buffering
+3. **Predictable I/O**: Provides more consistent I/O performance
+4. **Direct Storage Access**: Ensures data goes directly to storage without intermediate caching
+
+## Considerations
+
+1. **Filesystem Support**: Not all filesystems support O_DIRECT
+2. **Buffer Alignment**: Requires proper buffer alignment (typically 512 bytes)
+3. **Performance Impact**: May not always improve performance, especially for small files
+4. **Compatibility**: Falls back gracefully when not supported
+
+## Usage
+
+To enable O_DIRECT:
+
+1. Set `enable_direct_io: true` in your blobber configuration
+2. Ensure your filesystem supports O_DIRECT
+3. Monitor performance to ensure it provides benefits for your use case
+
+To disable O_DIRECT:
+
+1. Set `enable_direct_io: false` in your blobber configuration (default)
+2. The system will use regular file operations with OS caching
+
+## Testing
+
+The implementation includes comprehensive tests in `store_test.go`:
+
+- `TestDirectIOFunctionality`: Tests O_DIRECT with different configuration settings
+- `TestCopyWithDirectIO`: Tests the direct I/O copy function with various data sizes
+
+## Monitoring
+
+The system logs warnings when O_DIRECT is not supported and falls back to regular operations:
+
+```
+WARN O_DIRECT not supported, falling back to regular file operations file=/path/to/file
+```
+
+## Troubleshooting
+
+If you experience issues with O_DIRECT:
+
+1. Check if your filesystem supports O_DIRECT
+2. Verify buffer alignment requirements
+3. Monitor system logs for fallback warnings
+4. Consider disabling O_DIRECT if performance doesn't improve
\ No newline at end of file
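A note on memory alignment (not covered by the patch above): the change rounds only the write *length* up to a sector multiple, while the buffer comes from a plain `make([]byte, ...)`, whose starting address is not guaranteed by the language to fall on a sector boundary. Strict O_DIRECT setups can reject writes from unaligned addresses with `EINVAL`. The sketch below shows one common workaround, over-allocating and re-slicing at an aligned offset; `alignedBlock` is a hypothetical helper that does not exist in the codebase, and it assumes a power-of-two alignment and Go's current non-moving garbage collector.

```go
package main

import (
	"fmt"
	"unsafe"
)

// alignedBlock returns a size-byte slice whose first byte lies on an
// align-byte boundary. align must be a power of two (e.g. 512 or 4096).
// Illustration only; it relies on the Go runtime not moving heap objects.
func alignedBlock(size, align int) []byte {
	raw := make([]byte, size+align)
	// Distance from the start of the allocation to the next aligned address.
	off := int(uintptr(unsafe.Pointer(&raw[0])) & uintptr(align-1))
	if off != 0 {
		off = align - off
	}
	return raw[off : off+size]
}

func main() {
	buf := alignedBlock(64*1024, 4096)
	fmt.Printf("start %% 4096 = %d, len = %d\n",
		uintptr(unsafe.Pointer(&buf[0]))%4096, len(buf))
}
```

If misaligned-address errors ever show up in practice, `alignedBuf` in `WriteFile` could be built this way instead of with a bare `make`, at the cost of `sectorSize` extra bytes per buffer.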