Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
.idea
.vscode
cars2
cars3
config.toml
datas
datas2
Expand Down
63 changes: 33 additions & 30 deletions chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,34 +154,37 @@ func ErrCallback() GraphBuildCallback {
return &errCallback{}
}

func Chunk(ctx context.Context,
expectSliceSize int64,
parentPath,
targetPath,
carDir,
graphName string,
parallel int,
cb GraphBuildCallback,
ef *ExtraFile,
randomRenameSourceFile bool,
randomSelectFile bool,
) error {
// ChunkParams bundles the arguments of Chunk into a single value. Chunk
// forwards the same pointer to BuildIpldGraph for every graph slice it builds.
type ChunkParams struct {
// ExpectSliceSize is the expected size of one slice. Chunk returns an error
// when it is 0, and subtracts Ef's slice size from it to get the part size
// available for source data.
ExpectSliceSize int64
// ParentPath is handed to the graph builder as the parent path. Chunk
// overwrites an empty value with TargetPath, mutating the caller's struct.
ParentPath string
// TargetPath is the path passed to GetGraphCount to compute the total
// number of slices.
TargetPath string
// CarDir is the car directory; the CLI refuses to run when it does not exist.
// NOTE(review): not read by the code visible here — confirm where it is consumed.
CarDir string
// GraphName is the base name passed to GenGraphName for each slice.
GraphName string
// Parallel is forwarded to the graph builder. Chunk returns an error unless
// it is greater than 0.
Parallel int
// Cb receives OnError or OnSuccess for every graph BuildIpldGraph builds.
Cb GraphBuildCallback
// Ef supplies the extra files that are prepended to each slice's file list.
// Chunk dereferences it unconditionally, so it must not be nil.
Ef *ExtraFile
// RandomRenameSourceFile makes Chunk pass source entries through
// tryRenameFileName before they are added to a slice.
RandomRenameSourceFile bool
// RandomSelectFile is described by the CLI as "random select file to chunk".
// NOTE(review): its use is not visible here — confirm against the full Chunk body.
RandomSelectFile bool
// SkipFilename, when true, makes the graph builder replace the returned file
// detail with a JSON list of the distinct parent directories of the files
// (entries whose directory is "", "." or "/" are skipped).
SkipFilename bool
}

func Chunk(ctx context.Context, params *ChunkParams) error {
var cumuSize int64 = 0
graphSliceCount := 0
graphFiles := make([]Finfo, 0)
if expectSliceSize == 0 {
if params.ExpectSliceSize == 0 {
return fmt.Errorf("slice size has been set as 0")
}
if parallel <= 0 {
if params.Parallel <= 0 {
return fmt.Errorf("parallel has to be greater than 0")
}
if parentPath == "" {
parentPath = targetPath
if params.ParentPath == "" {
params.ParentPath = params.TargetPath
}

partSliceSize := expectSliceSize - ef.sliceSize
args := []string{targetPath}
sliceTotal := GetGraphCount(args, expectSliceSize)
partSliceSize := params.ExpectSliceSize - params.Ef.sliceSize
args := []string{params.TargetPath}
sliceTotal := GetGraphCount(args, params.ExpectSliceSize)
if sliceTotal == 0 {
log.Warn("Empty folder or file!")
return nil
Expand All @@ -197,7 +200,7 @@ func Chunk(ctx context.Context,

for _, item := range allFiles {
item := item
if randomRenameSourceFile {
if params.RandomRenameSourceFile {
item = tryRenameFileName([]Finfo{item})[0]
}
// log.Infof("name: %s", item.Name)
Expand All @@ -210,9 +213,9 @@ func Chunk(ctx context.Context,
cumuSize += fileSize
graphFiles = append(graphFiles, item)
// todo build ipld from graphFiles
BuildIpldGraph(ctx, append(ef.getFiles(), graphFiles...), GenGraphName(graphName, graphSliceCount, sliceTotal), parentPath, carDir, parallel, cb, expectSliceSize, ef)
BuildIpldGraph(ctx, append(params.Ef.getFiles(), graphFiles...), GenGraphName(params.GraphName, graphSliceCount, sliceTotal), params)
log.Infof("cumu-size: %d", cumuSize)
log.Infof("%s", GenGraphName(graphName, graphSliceCount, sliceTotal))
log.Infof("%s", GenGraphName(params.GraphName, graphSliceCount, sliceTotal))
log.Infof("=================")
cumuSize = 0
graphFiles = make([]Finfo, 0)
Expand All @@ -234,16 +237,16 @@ func Chunk(ctx context.Context,
SeekStart: seekStart,
SeekEnd: seekEnd,
}
if randomRenameSourceFile {
if params.RandomRenameSourceFile {
graphFiles = append(graphFiles, tryRenameFileName([]Finfo{fi})...)
} else {
graphFiles = append(graphFiles, fi)
}
fileSliceCount++
// todo build ipld from graphFiles
BuildIpldGraph(ctx, append(ef.getFiles(), graphFiles...), GenGraphName(graphName, graphSliceCount, sliceTotal), parentPath, carDir, parallel, cb, expectSliceSize, ef)
BuildIpldGraph(ctx, append(params.Ef.getFiles(), graphFiles...), GenGraphName(params.GraphName, graphSliceCount, sliceTotal), params)
log.Infof("cumu-size: %d", cumuSize+firstCut)
log.Infof("%s", GenGraphName(graphName, graphSliceCount, sliceTotal))
log.Infof("%s", GenGraphName(params.GraphName, graphSliceCount, sliceTotal))
log.Infof("=================")
cumuSize = 0
graphFiles = make([]Finfo, 0)
Expand All @@ -264,7 +267,7 @@ func Chunk(ctx context.Context,
SeekStart: seekStart,
SeekEnd: seekEnd,
}
if randomRenameSourceFile {
if params.RandomRenameSourceFile {
graphFiles = append(graphFiles, tryRenameFileName([]Finfo{fi})...)
} else {
graphFiles = append(graphFiles, fi)
Expand All @@ -273,9 +276,9 @@ func Chunk(ctx context.Context,
fileSliceCount++
if seekEnd-seekStart == partSliceSize-1 {
// todo build ipld from graphFiles
BuildIpldGraph(ctx, append(ef.getFiles(), graphFiles...), GenGraphName(graphName, graphSliceCount, sliceTotal), parentPath, carDir, parallel, cb, expectSliceSize, ef)
BuildIpldGraph(ctx, append(params.Ef.getFiles(), graphFiles...), GenGraphName(params.GraphName, graphSliceCount, sliceTotal), params)
log.Infof("cumu-size: %d", partSliceSize)
log.Infof("%s", GenGraphName(graphName, graphSliceCount, sliceTotal))
log.Infof("%s", GenGraphName(params.GraphName, graphSliceCount, sliceTotal))
log.Infof("=================")
cumuSize = 0
graphFiles = make([]Finfo, 0)
Expand All @@ -286,9 +289,9 @@ func Chunk(ctx context.Context,
}
if cumuSize > 0 {
// todo build ipld from graphFiles
BuildIpldGraph(ctx, append(ef.getFiles(), graphFiles...), GenGraphName(graphName, graphSliceCount, sliceTotal), parentPath, carDir, parallel, cb, expectSliceSize, ef)
BuildIpldGraph(ctx, append(params.Ef.getFiles(), graphFiles...), GenGraphName(params.GraphName, graphSliceCount, sliceTotal), params)
log.Infof("cumu-size: %d", cumuSize)
log.Infof("%s", GenGraphName(graphName, graphSliceCount, sliceTotal))
log.Infof("%s", GenGraphName(params.GraphName, graphSliceCount, sliceTotal))
log.Infof("=================")
}
return nil
Expand Down
26 changes: 23 additions & 3 deletions cmd/graphsplit/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ var chunkCmd = &cli.Command{
Usage: "random select file to chunk",
Value: true,
},
&cli.BoolFlag{
Name: "skip-filename",
Usage: "manifest csv detail not contain filename",
},
},
ArgsUsage: "<input path>",
Action: func(c *cli.Context) error {
Expand All @@ -110,6 +114,7 @@ var chunkCmd = &cli.Command{
graphName := c.String("graph-name")
randomRenameSourceFile := c.Bool("random-rename-source-file")
randomSelectFile := c.Bool("random-select-file")
skipFilename := c.Bool("skip-filename")
if !graphsplit.ExistDir(carDir) {
return fmt.Errorf("the path of car-dir does not exist")
}
Expand Down Expand Up @@ -151,7 +156,8 @@ var chunkCmd = &cli.Command{
return fmt.Errorf("slice size %d + extra file slice size %d exceeds 32 GiB", sliceSize, extraFileSliceSize)
}
log.Infof("extra file slice size: %d, random rename source file: %v, random select file: %v", extraFileSliceSize, randomRenameSourceFile, randomSelectFile)
rf, err := graphsplit.NewRealFile(strings.TrimSuffix(cfg.ExtraFilePath, "/"), int64(extraFileSliceSize), int64(sliceSize), randomRenameSourceFile)
log.Infof("skip filename: %v", skipFilename)
ef, err := graphsplit.NewExtraFile(strings.TrimSuffix(cfg.ExtraFilePath, "/"), int64(extraFileSliceSize), int64(sliceSize), randomRenameSourceFile)
if err != nil {
return err
}
Expand All @@ -166,15 +172,29 @@ var chunkCmd = &cli.Command{
cb = graphsplit.ErrCallback()
}

params := graphsplit.ChunkParams{
ExpectSliceSize: int64(sliceSize),
ParentPath: parentPath,
TargetPath: targetPath,
CarDir: carDir,
GraphName: graphName,
Parallel: int(parallel),
Cb: cb,
Ef: ef,
RandomRenameSourceFile: randomRenameSourceFile,
RandomSelectFile: randomSelectFile,
SkipFilename: skipFilename,
}

loop := c.Bool("loop")
fmt.Println("loop: ", loop)
if !loop {
fmt.Println("chunking once...")
return graphsplit.Chunk(ctx, int64(sliceSize), parentPath, targetPath, carDir, graphName, int(parallel), cb, rf, randomRenameSourceFile, randomSelectFile)
return graphsplit.Chunk(ctx, &params)
}
fmt.Println("loop chunking...")
for {
err = graphsplit.Chunk(ctx, int64(sliceSize), parentPath, targetPath, carDir, graphName, int(parallel), cb, rf, randomRenameSourceFile, randomSelectFile)
err = graphsplit.Chunk(ctx, &params)
if err != nil {
return fmt.Errorf("failed to chunk: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion extra_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type ExtraFile struct {
pieceRawSize int64
}

func NewRealFile(path string, sliceSize int64, pieceRawSize int64, randomRenameSourceFile bool) (*ExtraFile, error) {
func NewExtraFile(path string, sliceSize int64, pieceRawSize int64, randomRenameSourceFile bool) (*ExtraFile, error) {
rf := &ExtraFile{path: path, sliceSize: sliceSize, pieceRawSize: pieceRawSize}
if path != "" {
finfo, err := os.Stat(path)
Expand Down
42 changes: 32 additions & 10 deletions utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"math/rand"
"os"
"path"
"path/filepath"
"regexp"
"runtime"
"strings"
Expand Down Expand Up @@ -135,21 +136,17 @@ func (b *FSBuilder) getNodeByLink(ln *ipld.Link) (fn fsNode, err error) {

func BuildIpldGraph(ctx context.Context,
fileList []Finfo,
graphName,
parentPath,
carDir string,
parallel int,
cb GraphBuildCallback,
sliceSize int64,
ef *ExtraFile,
graphName string,
params *ChunkParams,
) {
buf, payloadCid, fsDetail, err := buildIpldGraph(ctx, fileList, parentPath, parallel, sliceSize, ef)
buf, payloadCid, fsDetail, err := buildIpldGraph(ctx, fileList, params.ParentPath, params.Parallel,
params.ExpectSliceSize, params.Ef, params.SkipFilename)
if err != nil {
// log.Fatal(err)
cb.OnError(err)
params.Cb.OnError(err)
return
}
cb.OnSuccess(buf, graphName, payloadCid, fsDetail)
params.Cb.OnSuccess(buf, graphName, payloadCid, fsDetail)
}

func buildIpldGraph(ctx context.Context,
Expand All @@ -158,6 +155,7 @@ func buildIpldGraph(ctx context.Context,
parallel int,
sliceSize int64,
ef *ExtraFile,
skipFilename bool,
) (*Buffer, string, string, error) {
bs2 := bstore.NewBlockstore(dss.MutexWrap(datastore.NewMapDatastore()))
dagServ := dag.NewDAGService(blockservice.New(bs2, offline.Exchange(bs2)))
Expand Down Expand Up @@ -330,6 +328,30 @@ func buildIpldGraph(ctx context.Context,
}
log.Info("++++++++++++ finished to build ipld +++++++++++++")

if skipFilename {
type path struct {
Path string `json:"path"`
}

var list []path
seen := make(map[string]struct{})
for _, f := range sfis {
dir := filepath.Dir(f.Path)
if dir == "" || dir == "." || dir == "/" {
continue
}
if _, ok := seen[dir]; !ok {
seen[dir] = struct{}{}
list = append(list, path{Path: dir})
}
}

fileInfo, err = json.Marshal(list)
if err != nil {
return nil, "", "", err
}
}

return buf, rootNode.Cid().String(), string(fileInfo), nil
}

Expand Down