forked from dannycho7/split-file-stream
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathindex.js
More file actions
91 lines (74 loc) · 2.84 KB
/
index.js
File metadata and controls
91 lines (74 loc) · 2.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
const fs = require("fs");
const path = require("path");
const stream = require("stream");
// Builds the on-disk name for partition number `numFiles` of `rootFileName`,
// e.g. ("video.mp4", 3) -> "video.mp4.split-3".
const generateFilePath = (rootFileName, numFiles) => {
	return `${rootFileName}.split-${numFiles}`;
};
/**
 * Sequentially appends each partition file onto `combinationStream`.
 *
 * Recurses once per partition: reads partitionNames[partitionIndex] to the
 * end, then moves to the next index. When every partition has been copied,
 * ends the output stream and invokes `callback` (no arguments).
 *
 * Fix: the original pumped "data" events straight into
 * `combinationStream.write(chunk)` and ignored the return value, so
 * backpressure was never honored and a slow writer could buffer the whole
 * input in memory. `pipe(..., { end: false })` copies the same bytes while
 * pausing the reader whenever the writer's buffer is full; `end: false`
 * keeps the output open for the remaining partitions.
 *
 * @param {number} partitionIndex index of the partition to copy next
 * @param {string[]} partitionNames paths of the partition files, in order
 * @param {stream.Writable} combinationStream destination stream
 * @param {Function} callback invoked once after the last partition is copied
 */
const _mergeFiles = (partitionIndex, partitionNames, combinationStream, callback) => {
	if (partitionIndex === partitionNames.length) {
		combinationStream.end();
		return callback();
	}
	const partitionFileStream = fs.createReadStream(partitionNames[partitionIndex]);
	// "end" fires after the readable side is exhausted; recurse to the next file.
	partitionFileStream.on("end", () => _mergeFiles(partitionIndex + 1, partitionNames, combinationStream, callback));
	// end:false — do not close the destination until all partitions are merged.
	partitionFileStream.pipe(combinationStream, { end: false });
};
/**
 * Merges the given partition files into a single file at `outputPath`.
 * `callback` is invoked (with no arguments) once the merge completes.
 */
module.exports.mergeFilesToDisk = (partitionNames, outputPath, callback) => {
	const outputStream = fs.createWriteStream(outputPath);
	_mergeFiles(0, partitionNames, outputStream, callback);
};
/**
 * Merges the given partition files into an in-memory stream instead of a
 * file. `callback` receives the PassThrough stream immediately; consumers
 * learn of completion via the stream's own "end" event, so the merge-done
 * callback passed to _mergeFiles is a no-op.
 */
module.exports.mergeFilesToStream = (partitionNames, callback) => {
	const passThrough = new stream.PassThrough();
	_mergeFiles(0, partitionNames, passThrough, () => {});
	callback(passThrough);
};
/**
 * Splits `fileStream` into successive writable streams of at most
 * `partitionStreamSize` bytes each.
 *
 * `outStreamCreate(partitionNum)` is called to obtain each new output
 * stream. Once the source has ended AND every created output stream has
 * emitted "finish", `callback(outStreams)` is invoked with all streams in
 * creation order.
 *
 * Fix: the original never invoked `callback` for an empty source (no
 * partition is ever opened, so no "finish" event fires), and could also
 * miss it when the final partition's "finish" was processed before the
 * source's "end" event (the finish handler saw fileStreamEnded === false).
 * The "end" handler now re-checks the completion condition. No double
 * invocation is possible: any finish handler that already ran saw
 * fileStreamEnded === false, and any that runs later finds the counts
 * equal only once.
 *
 * @param {Function} outStreamCreate factory for partition write streams
 * @param {stream.Readable} fileStream source stream to split
 * @param {number} partitionStreamSize max bytes per partition
 * @param {Function} callback invoked with the array of output streams
 */
const _splitToStream = (outStreamCreate, fileStream, partitionStreamSize, callback) => {
	// NOTE(review): reads the private _readableState to recover the source's
	// highWaterMark as the per-read chunk size — works on Node's streams but
	// relies on an internal field.
	const outStreams = [], { highWaterMark: defaultChunkSize } = fileStream._readableState;
	let currentOutStream, currentFileSize = 0, fileStreamEnded = false, finishedWriteStreams = 0, openStream = false, partitionNum = 0;

	// Closes out the partition currently being written and resets per-partition state.
	const endCurrentWriteStream = () => {
		currentOutStream.end();
		currentOutStream = null;
		currentFileSize = 0;
		openStream = false;
	};

	// Fires once per partition "finish"; completes when the source has ended
	// and every opened partition has finished flushing.
	const writeStreamFinishHandler = () => {
		finishedWriteStreams++;
		if (fileStreamEnded && partitionNum === finishedWriteStreams) {
			callback(outStreams);
		}
	};

	fileStream.on("readable", () => {
		let chunk;
		// Never read past the current partition's remaining capacity.
		while (null !== (chunk = fileStream.read(Math.min(partitionStreamSize - currentFileSize, defaultChunkSize)))) {
			if (openStream === false) {
				currentOutStream = outStreamCreate(partitionNum);
				currentOutStream.on("finish", writeStreamFinishHandler);
				outStreams.push(currentOutStream);
				partitionNum++;
				openStream = true;
			}
			currentOutStream.write(chunk);
			currentFileSize += chunk.length;
			if (currentFileSize === partitionStreamSize) {
				endCurrentWriteStream();
			}
		}
	});

	fileStream.on("end", () => {
		if (currentOutStream) {
			endCurrentWriteStream();
		}
		fileStreamEnded = true;
		// Empty source (or all partitions already finished): without this
		// check the callback would never fire.
		if (partitionNum === finishedWriteStreams) {
			callback(outStreams);
		}
	});
};
/**
 * Splits `fileStream` into on-disk partition files of at most `maxFileSize`
 * bytes, named "<rootFilePath>.split-<n>". `callback` receives the list of
 * partition file paths once every partition has been fully written.
 */
const split = (fileStream, maxFileSize, rootFilePath, callback) => {
	// Each partition is backed by its own write stream on disk.
	const createPartitionStream = (partitionNum) =>
		fs.createWriteStream(generateFilePath(rootFilePath, partitionNum));
	_splitToStream(createPartitionStream, fileStream, maxFileSize, (fileWriteStreams) => {
		callback(fileWriteStreams.map((writeStream) => writeStream.path));
	});
};
// Public API: the file-level splitter plus the stream-level splitter for
// callers that supply their own output streams.
Object.assign(module.exports, { split, _splitToStream });