diff --git a/buffer.go b/buffer.go index 627c0fd..e34c4ba 100644 --- a/buffer.go +++ b/buffer.go @@ -19,6 +19,13 @@ type Buffer struct { lastRead readOp // last read operation, so that Unread* can work correctly. } +func NewBuffer(length int) *Buffer { + return &Buffer{ + buf: make([]byte, 0, length), + lastRead: opInvalid, + } +} + // The readOp constants describe the last action performed on // the buffer, so that UnreadRune and UnreadByte can check for // invalid usage. opReadRuneX constants are chosen such that diff --git a/chunk.go b/chunk.go index f296a7e..6abed5c 100644 --- a/chunk.go +++ b/chunk.go @@ -4,8 +4,10 @@ import ( "context" "encoding/csv" "fmt" + "io" "os" "path" + "path/filepath" "strconv" "time" @@ -37,9 +39,20 @@ func (cc *commPCallback) OnSuccess(buf *Buffer, graphName, payloadCid, fsDetail log.Infof("piece cid: %s, payload size: %d, size: %d ", cpRes.Root.String(), cpRes.PayloadSize, cpRes.Size) buf.SeekStart() - if err := os.WriteFile(path.Join(cc.carDir, cpRes.Root.String()+".car"), buf.Bytes(), 0o644); err != nil { + carFileName := filepath.Join(cc.carDir, cpRes.Root.String()) + if !cc.rename { + carFileName += ".car" + } + carFile, err := os.OpenFile(carFileName, os.O_RDWR|os.O_CREATE, 0o644) + if err != nil { + log.Fatalf("failed to create car file: %s", err) + } + defer carFile.Close() + + if _, err = io.Copy(carFile, buf); err != nil { log.Fatalf("failed to write car file: %s", err) } + buf.Reset() // Add node inof to manifest.csv manifestPath := path.Join(cc.carDir, "manifest.csv") @@ -167,7 +180,7 @@ func Chunk(ctx context.Context, sliceSize int64, parentPath, targetPath, carDir, cumuSize += fileSize graphFiles = append(graphFiles, item) // todo build ipld from graphFiles - BuildIpldGraph(ctx, graphFiles, GenGraphName(graphName, graphSliceCount, sliceTotal), parentPath, carDir, parallel, cb) + BuildIpldGraph(ctx, graphFiles, GenGraphName(graphName, graphSliceCount, sliceTotal), parentPath, carDir, parallel, cb, sliceSize) log.Infof("cumu-size: %d", cumuSize) log.Infof("%s", GenGraphName(graphName, graphSliceCount, sliceTotal)) log.Infof("=================") @@ -193,7 +206,7 @@ func Chunk(ctx context.Context, sliceSize int64, parentPath, targetPath, carDir, }) fileSliceCount++ // todo build ipld from graphFiles - BuildIpldGraph(ctx, graphFiles, GenGraphName(graphName, graphSliceCount, sliceTotal), parentPath, carDir, parallel, cb) + BuildIpldGraph(ctx, graphFiles, GenGraphName(graphName, graphSliceCount, sliceTotal), parentPath, carDir, parallel, cb, sliceSize) log.Infof("cumu-size: %d", cumuSize+firstCut) log.Infof("%s", GenGraphName(graphName, graphSliceCount, sliceTotal)) log.Infof("=================") @@ -219,7 +232,7 @@ func Chunk(ctx context.Context, sliceSize int64, parentPath, targetPath, carDir, fileSliceCount++ if seekEnd-seekStart == sliceSize-1 { // todo build ipld from graphFiles - BuildIpldGraph(ctx, graphFiles, GenGraphName(graphName, graphSliceCount, sliceTotal), parentPath, carDir, parallel, cb) + BuildIpldGraph(ctx, graphFiles, GenGraphName(graphName, graphSliceCount, sliceTotal), parentPath, carDir, parallel, cb, sliceSize) log.Infof("cumu-size: %d", sliceSize) log.Infof("%s", GenGraphName(graphName, graphSliceCount, sliceTotal)) log.Infof("=================") @@ -232,7 +245,7 @@ func Chunk(ctx context.Context, sliceSize int64, parentPath, targetPath, carDir, } if cumuSize > 0 { // todo build ipld from graphFiles - BuildIpldGraph(ctx, graphFiles, GenGraphName(graphName, graphSliceCount, sliceTotal), parentPath, carDir, parallel, cb) + BuildIpldGraph(ctx, graphFiles, GenGraphName(graphName, graphSliceCount, sliceTotal), parentPath, carDir, parallel, cb, sliceSize) log.Infof("cumu-size: %d", cumuSize) log.Infof("%s", GenGraphName(graphName, graphSliceCount, sliceTotal)) log.Infof("=================") diff --git a/utils.go b/utils.go index 8330c8e..c842743 100644 --- a/utils.go +++ b/utils.go @@ -131,8 +131,15 @@ func (b *FSBuilder) getNodeByLink(ln *ipld.Link) (fn fsNode, err error) { return } -func BuildIpldGraph(ctx context.Context, fileList []Finfo, graphName, parentPath, carDir string, parallel int, cb GraphBuildCallback) { - buf, payloadCid, fsDetail, err := buildIpldGraph(ctx, fileList, parentPath, parallel) +func BuildIpldGraph(ctx context.Context, + fileList []Finfo, + graphName, parentPath, + carDir string, + parallel int, + cb GraphBuildCallback, + sliceSize int64, +) { + buf, payloadCid, fsDetail, err := buildIpldGraph(ctx, fileList, parentPath, parallel, sliceSize) if err != nil { // log.Fatal(err) cb.OnError(err) @@ -141,7 +148,7 @@ func BuildIpldGraph(ctx context.Context, fileList []Finfo, graphName, parentPath cb.OnSuccess(buf, graphName, payloadCid, fsDetail) } -func buildIpldGraph(ctx context.Context, fileList []Finfo, parentPath string, parallel int) (*Buffer, string, string, error) { +func buildIpldGraph(ctx context.Context, fileList []Finfo, parentPath string, parallel int, sliceSize int64) (*Buffer, string, string, error) { bs2 := bstore.NewBlockstore(dss.MutexWrap(datastore.NewMapDatastore())) dagServ := dag.NewDAGService(blockservice.New(bs2, offline.Exchange(bs2))) @@ -282,7 +289,7 @@ func buildIpldGraph(ctx context.Context, fileList []Finfo, parentPath string, pa log.Infof("start to generate car for %s", rootNode.Cid()) genCarStartTime := time.Now() // car - buf := new(Buffer) + buf := NewBuffer(int(sliceSize)) selector := allSelector() sc := car.NewSelectiveCar(ctx, bs2, []car.Dag{{Root: rootNode.Cid(), Selector: selector}}) err = sc.Write(buf)