From 2601745009e610764fb1140a881f14b3195b4394 Mon Sep 17 00:00:00 2001 From: Luca Foppiano Date: Fri, 29 Sep 2023 11:14:43 +0900 Subject: [PATCH 01/13] ignore idea stuff --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 15e1172..c37af38 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ env *.pyc *.egg-info +.idea \ No newline at end of file From ae9b27f3e279418f716f3532a7472c9b1c417914 Mon Sep 17 00:00:00 2001 From: Luca Foppiano Date: Fri, 29 Sep 2023 11:23:04 +0900 Subject: [PATCH 02/13] add text_archive and decoupling data model and serialisation --- lm_dataformat/__init__.py | 153 +++++++++++++++++++++++++++----------- 1 file changed, 109 insertions(+), 44 deletions(-) diff --git a/lm_dataformat/__init__.py b/lm_dataformat/__init__.py index ee1955b..cb9ba69 100644 --- a/lm_dataformat/__init__.py +++ b/lm_dataformat/__init__.py @@ -1,24 +1,46 @@ -import os -import zstandard -import ujson as json -import time -import tarfile import codecs -from functools import reduce -import jsonlines -import io -from zipfile import ZipFile import gzip -from math import ceil +import io import mmap import multiprocessing as mp +import os +import re +import tarfile +import time +from functools import reduce +from math import ceil from pathlib import Path +import zipfile + +import jsonlines +import ujson as json +import zstandard + +VALID_EXTENSIONS = ['openwebtext.tar.xz', '_data.xz', '.dat.zst', '.jsonl', '.jsonl.zst', '.jsonl.zst.tar', '.json.zst', + '.txt', '.zip', '.tar.gz', '.json.gz', '.gz'] + +LM_DATAFORMAT_FORMAT = "lm_dataformat" +TEXT_FORMAT = "txt" +JSON_FORMAT = "json" + +SUPPORTED_FORMATS = [ + TEXT_FORMAT, + LM_DATAFORMAT_FORMAT, + JSON_FORMAT +] + + +def filter_newlines(text): + return re.sub("\n{3,}", "\n\n", text) + +def handle_unicode_errors(txt): + return txt.encode('utf-8', 'replace').decode() -VALID_EXTENSIONS = ['openwebtext.tar.xz', '_data.xz', '.dat.zst', '.jsonl', '.jsonl.zst', '.jsonl.zst.tar', '.json.zst', '.txt', '.zip', '.tar.gz', '.json.gz', '.gz'] def has_valid_extension(file): return any([file.endswith(ext) for ext in VALID_EXTENSIONS]) + def _listdir_or_file(x): if isinstance(x, list): return reduce(lambda x, y: x + y, map(listdir_or_file, sorted(x))) @@ -29,9 +51,11 @@ def _listdir_or_file(x): else: raise FileNotFoundError(f"{x} not found") + def listdir_or_file(x): return list(filter(has_valid_extension, _listdir_or_file(x))) + def tarfile_reader(file, streaming=False): # we need our own tarfile parser because `tarfile` doesn't work well for # big tarfiles; it seems to be reading the entire file to get a list of @@ -49,10 +73,10 @@ def tarfile_reader(file, streaming=False): # https://www.gnu.org/software/tar/manual/html_node/Standard.html # end at 135 not 136 because of \0 terminator - if hdr[124:135] == b'\0'*11: + if hdr[124:135] == b'\0' * 11: # end of record break - + fname = hdr[:100].split(b'\0')[0] # if the file is too big to fit in the size field, tarfiles will actually @@ -71,11 +95,13 @@ def tarfile_reader(file, streaming=False): if type == 'x': meta = file.read(padded_size)[:size] + def kv(x): return x.decode('utf-8').split(' ')[1].split('=') + paxfileattrs = { - kv(x)[0]: kv(x)[1] - for x in meta.split(b'\n') if x + kv(x)[0]: kv(x)[1] + for x in meta.split(b'\n') if x } paxfilesize = int(paxfileattrs['size']) @@ -101,6 +127,7 @@ def kv(x): yield file.read(padded_size)[:size] offset += padded_size + def handle_jsonl(jsonl_reader, get_meta, autojoin_paragraphs, para_joiner, key='text'): for ob in jsonl_reader: 
# naive jsonl where each object is just the string itself, with no meta. For legacy compatibility. @@ -123,7 +150,7 @@ def handle_jsonl(jsonl_reader, get_meta, autojoin_paragraphs, para_joiner, key=' class Reader: def __init__(self, in_path): self.in_path = in_path - + def stream_data(self, get_meta=False, threaded=False): if not threaded: yield from self._stream_data(get_meta) @@ -136,7 +163,7 @@ def stream_data(self, get_meta=False, threaded=False): res = q.get() if res is None: break yield res - + def _stream_data_threaded(self, q, get_meta=False): for data in self._stream_data(get_meta): q.put(data) @@ -166,14 +193,14 @@ def _stream_data(self, get_meta=False, jsonl_key="text"): elif f.endswith('.jsonl.zst'): yield from self.read_jsonl_zst(f, get_meta, key=jsonl_key) elif f.endswith('.jsonl.zst.tar'): - yield from self.read_jsonl_tar(f, get_meta, jsonl_key=key) + yield from self.read_jsonl_tar(f, get_meta, key=jsonl_key) elif f.endswith('.json.zst'): assert not get_meta yield from self.read_json(f) elif f.endswith('.txt'): assert not get_meta - + yield from self.read_txt(f) elif f.endswith('.zip'): assert not get_meta @@ -185,11 +212,11 @@ def _stream_data(self, get_meta=False, jsonl_key="text"): yield from self.read_tgz(f) elif f.endswith('.json.gz'): assert not get_meta - + yield from self.read_jsongz(f) elif f.endswith('.gz'): assert not get_meta - + yield from self.read_gz(f) else: # shouldn't be reached @@ -200,23 +227,23 @@ def read_txt(self, file): yield fh.read() def read_zip(self, file): - archive = ZipFile(file, 'r') + archive = zipfile.ZipFile(file, 'r') for f in archive.namelist(): yield archive.read(f).decode('UTF-8') def read_tgz(self, file): gz = gzip.open(file) yield from (x.decode('utf-8') for x in tarfile_reader(gz, streaming=False)) - - def read_gz(self, file): + + def read_gz(self, file): with gzip.open(file, 'rb') as f: for line in f: yield line.decode('utf-8') - - def read_jsongz(self, file): + + def read_jsongz(self, file): for line in self.read_gz(file): yield json.loads(line) - + def read_json(self, file): with open(file, 'rb') as fh: cctx = zstandard.ZstdDecompressor() @@ -240,7 +267,7 @@ def read_dat(self, file): def read_jsonl(self, file, get_meta=False, autojoin_paragraphs=True, para_joiner='\n\n', key='text'): with jsonlines.open(file) as rdr: yield from handle_jsonl(rdr, get_meta, autojoin_paragraphs, para_joiner, key) - + def read_jsonl_zst(self, file, get_meta=False, autojoin_paragraphs=True, para_joiner='\n\n', key='text'): with open(file, 'rb') as fh: cctx = zstandard.ZstdDecompressor() @@ -256,7 +283,7 @@ def read_jsonl_tar(self, file, get_meta=False, autojoin_paragraphs=True, para_jo rdr = jsonlines.Reader(reader) yield from handle_jsonl(rdr, get_meta, autojoin_paragraphs, para_joiner, key) f.close() - + def read_owt(self, file): tar = tarfile.open(file, encoding='utf-8') utf8reader = codecs.getreader('utf-8') @@ -283,19 +310,20 @@ def __init__(self, out_dir, compression_level=3, threads=8): self.out_dir = out_dir os.makedirs(out_dir, exist_ok=True) self.i = 0 - + self.fh = open(self.out_dir + '/current_chunk_incomplete', 'wb') self.cctx = zstandard.ZstdCompressor(level=compression_level, threads=threads) self.compressor = self.cctx.stream_writer(self.fh) - - + def add_data(self, data, meta={}): - self.compressor.write(json.dumps({'text': data, 'meta': meta}).encode('UTF-8') + b'\n') - + out_str = TextArchive.to_text(data) + self.compressor.write(json.dumps({'text': out_str, 'meta': meta}).encode('UTF-8') + b'\n') + def commit(self, 
archive_name='default'): - fname = self.out_dir + '/data_' + str(self.i) + '_time' + str(int(time.time())) + '_' + archive_name + '.jsonl.zst' + fname = self.out_dir + '/data_' + str(self.i) + '_time' + str( + int(time.time())) + '_' + archive_name + '.jsonl.zst' self.compressor.flush(zstandard.FLUSH_FRAME) - + self.fh.flush() self.fh.close() os.rename(self.out_dir + '/current_chunk_incomplete', fname) @@ -313,10 +341,10 @@ def __init__(self, out_dir): self.i = 0 if os.path.exists(out_dir) and len(os.listdir(out_dir)) > 0: self.i = max(map(lambda x: int(x.split('_')[1].split('.')[0]), os.listdir(out_dir))) + 1 - + def add_data(self, data): - self.data.append(data) - + self.data.append(TextArchive.to_text(data)) + def commit(self, archive_name=None): # TODO: streaming cctx = zstandard.ZstdCompressor(level=3) @@ -324,7 +352,8 @@ def commit(self, archive_name=None): if archive_name is None: archive_name = str(int(time.time())) - res = b''.join(map(lambda x: ("%016d" % len(x)).encode('UTF-8') + x, map(lambda x: x.encode('UTF-8'), self.data))) + res = b''.join( + map(lambda x: ("%016d" % len(x)).encode('UTF-8') + x, map(lambda x: x.encode('UTF-8'), self.data))) cdata = cctx.compress(res) with open(self.out_dir + '/data_' + str(self.i) + '_' + archive_name + '.dat.zst', 'wb') as fh: @@ -333,6 +362,7 @@ def commit(self, archive_name=None): self.i += 1 self.data = [] + class JSONArchive: def __init__(self, out_dir): self.out_dir = out_dir @@ -341,16 +371,51 @@ def __init__(self, out_dir): self.i = 0 if os.path.exists(out_dir) and len(os.listdir(out_dir)) > 0: self.i = max(map(lambda x: int(x.split('_')[1].split('.')[0]), os.listdir(out_dir))) + 1 - + def add_data(self, data): self.data.append(data) - - def commit(self): + + def commit(self, name): cctx = zstandard.ZstdCompressor(level=3) - + cdata = cctx.compress(json.dumps(self.data).encode('UTF-8')) with open(self.out_dir + '/data_' + str(self.i) + '_' + str(int(time.time())) + '.json.zst', 'wb') as fh: fh.write(cdata) self.i += 1 self.data = [] + + +class TextArchive: + def __init__(self, out_dir): + self.out_dir = out_dir + os.makedirs(out_dir, exist_ok=True) + self.data = [] + + def add_data(self, data): + self.data.append(TextArchive.to_text(data)) + + @staticmethod + def to_text(data): + out_str = "" + out_str += 'Q:\n\n' + out_str += '{}\n\n'.format(data['question']['title']) + out_str += '{}\n\n'.format(data['question']['body']) + for answer in data['answers']: + out_str += 'A:\n\n{}\n\n'.format(answer['body']) + + try: + out_str = filter_newlines(out_str) + except: + out_str = filter_newlines(handle_unicode_errors(out_str)) + + return out_str + + def commit(self, archive_name): + fname = self.out_dir + '/data' + '_time' + str(int(time.time())) + '_' + archive_name + '.txt.zip' + with zipfile.ZipFile(fname, 'w', zipfile.ZIP_DEFLATED) as zipf: + for idx, example in enumerate(self.data): + filename = 'data_' + str(idx) + '_' + str(int(time.time())) + '.txt' + zipf.writestr(filename, example.encode('UTF-8')) + + self.data = [] From 7e6c54eb1f202fe213d57b3354183b241e1f3b3f Mon Sep 17 00:00:00 2001 From: Luca Foppiano Date: Fri, 29 Sep 2023 11:31:10 +0900 Subject: [PATCH 03/13] cleanup --- lm_dataformat/__init__.py | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/lm_dataformat/__init__.py b/lm_dataformat/__init__.py index cb9ba69..2e06b4e 100644 --- a/lm_dataformat/__init__.py +++ b/lm_dataformat/__init__.py @@ -30,13 +30,6 @@ ] -def filter_newlines(text): - return re.sub("\n{3,}", "\n\n", text) - 
-def handle_unicode_errors(txt): - return txt.encode('utf-8', 'replace').decode() - - def has_valid_extension(file): return any([file.endswith(ext) for ext in VALID_EXTENSIONS]) @@ -316,8 +309,7 @@ def __init__(self, out_dir, compression_level=3, threads=8): self.compressor = self.cctx.stream_writer(self.fh) def add_data(self, data, meta={}): - out_str = TextArchive.to_text(data) - self.compressor.write(json.dumps({'text': out_str, 'meta': meta}).encode('UTF-8') + b'\n') + self.compressor.write(json.dumps({'text': data, 'meta': meta}).encode('UTF-8') + b'\n') def commit(self, archive_name='default'): fname = self.out_dir + '/data_' + str(self.i) + '_time' + str( @@ -343,7 +335,7 @@ def __init__(self, out_dir): self.i = max(map(lambda x: int(x.split('_')[1].split('.')[0]), os.listdir(out_dir))) + 1 def add_data(self, data): - self.data.append(TextArchive.to_text(data)) + self.data.append(data) def commit(self, archive_name=None): # TODO: streaming @@ -395,6 +387,15 @@ def __init__(self, out_dir): def add_data(self, data): self.data.append(TextArchive.to_text(data)) + @staticmethod + def filter_newlines(text): + return re.sub("\n{3,}", "\n\n", text) + + @staticmethod + def handle_unicode_errors(txt): + return txt.encode('utf-8', 'replace').decode() + + @staticmethod def to_text(data): out_str = "" @@ -405,9 +406,9 @@ def to_text(data): out_str += 'A:\n\n{}\n\n'.format(answer['body']) try: - out_str = filter_newlines(out_str) + out_str = TextArchive.filter_newlines(out_str) except: - out_str = filter_newlines(handle_unicode_errors(out_str)) + out_str = TextArchive.filter_newlines(TextArchive.handle_unicode_errors(out_str)) return out_str From c7606f6b0193da2b08a107f3a306eb7c313d74e2 Mon Sep 17 00:00:00 2001 From: Luca Foppiano Date: Fri, 29 Sep 2023 11:34:50 +0900 Subject: [PATCH 04/13] improving naming --- lm_dataformat/__init__.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lm_dataformat/__init__.py b/lm_dataformat/__init__.py index 2e06b4e..b33fa40 100644 --- a/lm_dataformat/__init__.py +++ b/lm_dataformat/__init__.py @@ -367,11 +367,11 @@ def __init__(self, out_dir): def add_data(self, data): self.data.append(data) - def commit(self, name): + def commit(self, archive_name): cctx = zstandard.ZstdCompressor(level=3) cdata = cctx.compress(json.dumps(self.data).encode('UTF-8')) - with open(self.out_dir + '/data_' + str(self.i) + '_' + str(int(time.time())) + '.json.zst', 'wb') as fh: + with open(self.out_dir + '/data_' + str(self.i) + '_' + archive_name + '.json.zst', 'wb') as fh: fh.write(cdata) self.i += 1 @@ -413,7 +413,7 @@ def to_text(data): return out_str def commit(self, archive_name): - fname = self.out_dir + '/data' + '_time' + str(int(time.time())) + '_' + archive_name + '.txt.zip' + fname = self.out_dir + '/data' + '_' + archive_name + '.txt.zip' with zipfile.ZipFile(fname, 'w', zipfile.ZIP_DEFLATED) as zipf: for idx, example in enumerate(self.data): filename = 'data_' + str(idx) + '_' + str(int(time.time())) + '.txt' From e2a4ac1f07d759ca9e445d6729cd1aba47f76428 Mon Sep 17 00:00:00 2001 From: Luca Foppiano Date: Sun, 5 Nov 2023 08:57:40 +0900 Subject: [PATCH 05/13] Enable Github actions (#2) * enable github actions * python 3.9 * add requirements.txt --- .github/workflows/ci-build.yml | 36 ++++++++++++++++++++++++ .travis.yml | 12 -------- pytest.ini | 2 ++ requirements.txt | 3 ++ test/{ => resources}/blns.jsonl.zst.tar | Bin test/{ => resources}/blns.txt | 0 test/{ => resources}/blns.txt.tar.gz | Bin test/{ => resources}/blns.txt.zip | Bin test/{ => 
resources}/testtarfile.tar | Bin test/test_dat_archive_reader.py | 28 +++++++++--------- 10 files changed, 55 insertions(+), 26 deletions(-) create mode 100644 .github/workflows/ci-build.yml delete mode 100644 .travis.yml create mode 100644 pytest.ini create mode 100644 requirements.txt rename test/{ => resources}/blns.jsonl.zst.tar (100%) rename test/{ => resources}/blns.txt (100%) rename test/{ => resources}/blns.txt.tar.gz (100%) rename test/{ => resources}/blns.txt.zip (100%) rename test/{ => resources}/testtarfile.tar (100%) diff --git a/.github/workflows/ci-build.yml b/.github/workflows/ci-build.yml new file mode 100644 index 0000000..3bd37dc --- /dev/null +++ b/.github/workflows/ci-build.yml @@ -0,0 +1,36 @@ +name: Build unstable + +on: [push] + +concurrency: + group: unstable +# cancel-in-progress: true + + +jobs: + build: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v2 + - name: Set up Python 3.9 + uses: actions/setup-python@v4 + with: + python-version: "3.9" +# cache: 'pip' + - name: Cleanup more disk space + run: sudo rm -rf /usr/share/dotnet && sudo rm -rf /opt/ghc && sudo rm -rf "/usr/local/share/boost" && sudo rm -rf "$AGENT_TOOLSDIRECTORY" + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install --upgrade flake8 pytest pycodestyle + if [ -f requirements.txt ]; then pip install -r requirements.txt; fi + - name: Lint with flake8 + run: | + # stop the build if there are Python syntax errors or undefined names + flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics + # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide + flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics + - name: Test with pytest + run: | + python -m pytest --rootdir . \ No newline at end of file diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index 60505e3..0000000 --- a/.travis.yml +++ /dev/null @@ -1,12 +0,0 @@ -language: python -python: - - "3.6" -install: - - pip install pytest - - pip install pytest-cov - - pip install coveralls - - pip install -e . 
-script: - - pytest --cov=lm_dataformat/ -after_success: - - coveralls diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..54fc9ba --- /dev/null +++ b/pytest.ini @@ -0,0 +1,2 @@ +[pytest] +testpaths = test \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..bf0c7f1 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +jsonlines +ujson +zstandard \ No newline at end of file diff --git a/test/blns.jsonl.zst.tar b/test/resources/blns.jsonl.zst.tar similarity index 100% rename from test/blns.jsonl.zst.tar rename to test/resources/blns.jsonl.zst.tar diff --git a/test/blns.txt b/test/resources/blns.txt similarity index 100% rename from test/blns.txt rename to test/resources/blns.txt diff --git a/test/blns.txt.tar.gz b/test/resources/blns.txt.tar.gz similarity index 100% rename from test/blns.txt.tar.gz rename to test/resources/blns.txt.tar.gz diff --git a/test/blns.txt.zip b/test/resources/blns.txt.zip similarity index 100% rename from test/blns.txt.zip rename to test/resources/blns.txt.zip diff --git a/test/testtarfile.tar b/test/resources/testtarfile.tar similarity index 100% rename from test/testtarfile.tar rename to test/resources/testtarfile.tar diff --git a/test/test_dat_archive_reader.py b/test/test_dat_archive_reader.py index 0fd0a5f..831f87f 100644 --- a/test/test_dat_archive_reader.py +++ b/test/test_dat_archive_reader.py @@ -10,7 +10,7 @@ def sha256str(s): def test_dat(): archive = lmd.DatArchive('test_dir') - blns = open('test/blns.txt').read() + blns = open('test/resources/blns.txt').read() archive.add_data(blns) archive.add_data('testing 123') archive.add_data(blns) @@ -29,12 +29,12 @@ def test_dat(): def test_json(): archive = lmd.JSONArchive('test_dir') - blns = open('test/blns.txt').read() + blns = open('test/resources/blns.txt').read() archive.add_data(blns) archive.add_data('testing 123') archive.add_data(blns) archive.add_data('testing 123456789') - archive.commit() + archive.commit("archive_name") reader = lmd.Reader('test_dir') @@ -48,7 +48,7 @@ def test_json(): def test_jsonl(): archive = lmd.Archive('test_dir') - blns = open('test/blns.txt').read() + blns = open('test/resources/blns.txt').read() archive.add_data(blns) archive.add_data('testing 123', meta={'testing': 123}) archive.add_data(blns, meta={'testing2': 456, 'testing': ['a','b']}) @@ -67,7 +67,7 @@ def test_jsonl(): def test_jsonl_paras(): archive = lmd.Archive('test_dir') - blns = open('test/blns.txt').read() + blns = open('test/resources/blns.txt').read() archive.add_data(blns) archive.add_data(['testing 123', 'testing 345'], meta={'testing': 123}) archive.add_data(blns, meta={'testing2': 456, 'testing': ['a','b']}) @@ -85,8 +85,8 @@ def test_jsonl_paras(): shutil.rmtree('test_dir') def test_jsonl_tar(): - blns = open('test/blns.txt').read() - reader = lmd.Reader('test/blns.jsonl.zst.tar') + blns = open('test/resources/blns.txt').read() + reader = lmd.Reader('test/resources/blns.jsonl.zst.tar') data = list(reader.stream_data(get_meta=True)) @@ -101,8 +101,8 @@ def test_jsonl_tar(): assert data[7] == ('testing 123456789', {}) def test_txt_read(): - reader = lmd.Reader('test/blns.txt') - blns = open('test/blns.txt').read() + reader = lmd.Reader('test/resources/blns.txt') + blns = open('test/resources/blns.txt').read() data = list(reader.stream_data(get_meta=False)) @@ -110,8 +110,8 @@ def test_txt_read(): assert len(data) == 1 def test_zip_read(): - reader = lmd.Reader('test/blns.txt.zip') - blns = open('test/blns.txt').read() + 
reader = lmd.Reader('test/resources/blns.txt.zip')
+    blns = open('test/resources/blns.txt').read()
 
     data = list(reader.stream_data(get_meta=False))
 
@@ -119,8 +119,8 @@ def test_zip_read():
 
 
 def test_tgz_read():
-    reader = lmd.Reader('test/blns.txt.tar.gz')
-    blns = open('test/blns.txt').read()
+    reader = lmd.Reader('test/resources/blns.txt.tar.gz')
+    blns = open('test/resources/blns.txt').read()
 
     data = list(reader.stream_data(get_meta=False))
 
@@ -128,7 +128,7 @@ def test_tgz_read():
 
 
 def test_tarfile_reader():
-    rdr = lmd.tarfile_reader(open('test/testtarfile.tar', 'rb'), streaming=True)
+    rdr = lmd.tarfile_reader(open('test/resources/testtarfile.tar', 'rb'), streaming=True)
 
     hashes = map(lambda doc: sha256str(doc.read()), rdr)

From 334dd2b4fceb2ef25856a27dc435b19f2a76cd65 Mon Sep 17 00:00:00 2001
From: Luca Foppiano
Date: Sun, 5 Nov 2023 09:01:54 +0900
Subject: [PATCH 06/13] update README

---
 README.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/README.md b/README.md
index aecfe90..a79514e 100644
--- a/README.md
+++ b/README.md
@@ -2,6 +2,8 @@
 
 Utilities for storing data for LM training.
 
+**NOTE**: The original project seems abandoned; I've continued the development to support more data formats and integration with stackexchange_dataset.
+
 ## Basic Usage

From b9764057d4baab81dcf79d531dc379eda487ca34 Mon Sep 17 00:00:00 2001
From: Luca Foppiano
Date: Sun, 5 Nov 2023 10:07:17 +0900
Subject: [PATCH 07/13] add status badge

---
 README.md | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/README.md b/README.md
index a79514e..292b5f3 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,6 @@
-# LM_Dataformat [![Build Status](https://travis-ci.org/leogao2/lm_dataformat.svg?branch=master)](https://travis-ci.org/leogao2/lm_dataformat) [![Coverage Status](https://coveralls.io/repos/github/leogao2/lm_dataformat/badge.svg?branch=master)](https://coveralls.io/github/leogao2/lm_dataformat?branch=master)
+# LM_Dataformat
+
+[![Build unstable](https://github.com/lfoppiano/lm_dataformat/actions/workflows/ci-build.yml/badge.svg)](https://github.com/lfoppiano/lm_dataformat/actions/workflows/ci-build.yml)
 
 Utilities for storing data for LM training.
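For context on the stackexchange_dataset integration mentioned in the README note above: the `TextArchive.to_text` serialiser introduced in PATCH 02 consumes question/answer records. Below is a minimal sketch of that record shape — the dictionary keys (`question`, `title`, `body`, `answers`) come straight from the diff, while the example values and the output directory are hypothetical:

```python
import lm_dataformat as lmd

# Record shape read by TextArchive.to_text (keys as in the PATCH 02 diff);
# the values here are purely illustrative.
record = {
    'question': {
        'title': 'How do I stream a .jsonl.zst archive?',
        'body': 'I want to iterate over documents without loading the whole file.',
    },
    'answers': [
        {'body': 'Use lm_dataformat.Reader and its stream_data() generator.'},
    ],
}

archive = lmd.TextArchive('out_dir')  # hypothetical output directory
archive.add_data(record)              # rendered as "Q:\n\n<title>\n\n<body>\n\nA:\n\n<answer>\n\n"
archive.commit('example')
```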
From fed718b582d5589ed0d0e880c5fd3dca1fb9c565 Mon Sep 17 00:00:00 2001 From: Luca Foppiano Date: Mon, 6 Nov 2023 18:41:45 +0900 Subject: [PATCH 08/13] correct how number of files already present is calculated --- lm_dataformat/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lm_dataformat/__init__.py b/lm_dataformat/__init__.py index b33fa40..bec9024 100644 --- a/lm_dataformat/__init__.py +++ b/lm_dataformat/__init__.py @@ -362,7 +362,7 @@ def __init__(self, out_dir): self.data = [] self.i = 0 if os.path.exists(out_dir) and len(os.listdir(out_dir)) > 0: - self.i = max(map(lambda x: int(x.split('_')[1].split('.')[0]), os.listdir(out_dir))) + 1 + self.i = max(map(lambda x: int(x.split('_')[1].split('.')[0]), filter(lambda y: y.endswith(".json.zst"), os.listdir(out_dir)))) + 1 def add_data(self, data): self.data.append(data) From 7cc037d285112910a76b24475868e0bc65d2ac80 Mon Sep 17 00:00:00 2001 From: Luca Foppiano Date: Mon, 4 Dec 2023 10:13:29 +0900 Subject: [PATCH 09/13] stream data directly to the output when questions and answers are completed --- lm_dataformat/__init__.py | 45 +++++++++++++++++++++++++-------------- 1 file changed, 29 insertions(+), 16 deletions(-) diff --git a/lm_dataformat/__init__.py b/lm_dataformat/__init__.py index bec9024..6eb5dae 100644 --- a/lm_dataformat/__init__.py +++ b/lm_dataformat/__init__.py @@ -356,36 +356,45 @@ def commit(self, archive_name=None): class JSONArchive: - def __init__(self, out_dir): + def __init__(self, out_dir, compression_level=3, threads=8): self.out_dir = out_dir os.makedirs(out_dir, exist_ok=True) self.data = [] self.i = 0 - if os.path.exists(out_dir) and len(os.listdir(out_dir)) > 0: - self.i = max(map(lambda x: int(x.split('_')[1].split('.')[0]), filter(lambda y: y.endswith(".json.zst"), os.listdir(out_dir)))) + 1 + self.fh = open(self.out_dir + '/current_chunk_incomplete', 'wb') + self.cctx = zstandard.ZstdCompressor(level=compression_level, threads=threads) + self.compressor = self.cctx.stream_writer(self.fh) def add_data(self, data): - self.data.append(data) + self.compressor.write(json.dumps(data).encode('UTF-8') + b'\n') + # self.data.append(data) def commit(self, archive_name): - cctx = zstandard.ZstdCompressor(level=3) + fname = self.out_dir + '/data_' + str(self.i) + '_' + archive_name + '.json.zst' + self.compressor.flush(zstandard.FLUSH_FRAME) - cdata = cctx.compress(json.dumps(self.data).encode('UTF-8')) - with open(self.out_dir + '/data_' + str(self.i) + '_' + archive_name + '.json.zst', 'wb') as fh: - fh.write(cdata) + self.fh.flush() + self.fh.close() + os.rename(self.out_dir + '/current_chunk_incomplete', fname) + self.fh = open(self.out_dir + '/current_chunk_incomplete', 'wb') + self.compressor = self.cctx.stream_writer(self.fh) self.i += 1 self.data = [] class TextArchive: - def __init__(self, out_dir): + def __init__(self, out_dir, compression_level=3, threads=8): self.out_dir = out_dir os.makedirs(out_dir, exist_ok=True) self.data = [] + self.i = 0 + self.fh = open(self.out_dir + '/current_chunk_incomplete', 'wb') + self.cctx = zstandard.ZstdCompressor(level=compression_level, threads=threads) + self.compressor = self.cctx.stream_writer(self.fh) def add_data(self, data): - self.data.append(TextArchive.to_text(data)) + self.compressor.write(TextArchive.to_text(data).encode('UTF-8') + b'\n') @staticmethod def filter_newlines(text): @@ -395,7 +404,6 @@ def filter_newlines(text): def handle_unicode_errors(txt): return txt.encode('utf-8', 'replace').decode() - @staticmethod def 
to_text(data): out_str = "" @@ -413,10 +421,15 @@ def to_text(data): return out_str def commit(self, archive_name): - fname = self.out_dir + '/data' + '_' + archive_name + '.txt.zip' - with zipfile.ZipFile(fname, 'w', zipfile.ZIP_DEFLATED) as zipf: - for idx, example in enumerate(self.data): - filename = 'data_' + str(idx) + '_' + str(int(time.time())) + '.txt' - zipf.writestr(filename, example.encode('UTF-8')) + fname = self.out_dir + '/data' + '_' + archive_name + '.txt.zst' + + self.compressor.flush(zstandard.FLUSH_FRAME) + self.fh.flush() + self.fh.close() + os.rename(self.out_dir + '/current_chunk_incomplete', fname) + self.fh = open(self.out_dir + '/current_chunk_incomplete', 'wb') + self.compressor = self.cctx.stream_writer(self.fh) + + self.i += 1 self.data = [] From 83cbd428443027cfe9ca0f6eb8653c26ee52c9c7 Mon Sep 17 00:00:00 2001 From: Luca Foppiano Date: Mon, 4 Dec 2023 11:18:48 +0900 Subject: [PATCH 10/13] remove json format and use jsonl format with streaming directly in the output file --- lm_dataformat/__init__.py | 64 +++++---------------------------- test/test_dat_archive_reader.py | 40 ++++++++------------- 2 files changed, 23 insertions(+), 81 deletions(-) diff --git a/lm_dataformat/__init__.py b/lm_dataformat/__init__.py index 6eb5dae..a57018b 100644 --- a/lm_dataformat/__init__.py +++ b/lm_dataformat/__init__.py @@ -19,14 +19,14 @@ VALID_EXTENSIONS = ['openwebtext.tar.xz', '_data.xz', '.dat.zst', '.jsonl', '.jsonl.zst', '.jsonl.zst.tar', '.json.zst', '.txt', '.zip', '.tar.gz', '.json.gz', '.gz'] -LM_DATAFORMAT_FORMAT = "lm_dataformat" +# LM_DATAFORMAT_FORMAT = "lm_dataformat" TEXT_FORMAT = "txt" -JSON_FORMAT = "json" +JSONL_FORMAT = "jsonl" SUPPORTED_FORMATS = [ TEXT_FORMAT, - LM_DATAFORMAT_FORMAT, - JSON_FORMAT + # LM_DATAFORMAT_FORMAT, + JSONL_FORMAT ] @@ -355,45 +355,11 @@ def commit(self, archive_name=None): self.data = [] -class JSONArchive: - def __init__(self, out_dir, compression_level=3, threads=8): - self.out_dir = out_dir - os.makedirs(out_dir, exist_ok=True) - self.data = [] - self.i = 0 - self.fh = open(self.out_dir + '/current_chunk_incomplete', 'wb') - self.cctx = zstandard.ZstdCompressor(level=compression_level, threads=threads) - self.compressor = self.cctx.stream_writer(self.fh) - - def add_data(self, data): - self.compressor.write(json.dumps(data).encode('UTF-8') + b'\n') - # self.data.append(data) - - def commit(self, archive_name): - fname = self.out_dir + '/data_' + str(self.i) + '_' + archive_name + '.json.zst' - self.compressor.flush(zstandard.FLUSH_FRAME) - - self.fh.flush() - self.fh.close() - os.rename(self.out_dir + '/current_chunk_incomplete', fname) - self.fh = open(self.out_dir + '/current_chunk_incomplete', 'wb') - self.compressor = self.cctx.stream_writer(self.fh) - - self.i += 1 - self.data = [] - - -class TextArchive: - def __init__(self, out_dir, compression_level=3, threads=8): - self.out_dir = out_dir - os.makedirs(out_dir, exist_ok=True) - self.data = [] - self.i = 0 - self.fh = open(self.out_dir + '/current_chunk_incomplete', 'wb') - self.cctx = zstandard.ZstdCompressor(level=compression_level, threads=threads) - self.compressor = self.cctx.stream_writer(self.fh) +class TextArchive(Archive): + def __init__(self, out_dir): + super().__init__(out_dir) - def add_data(self, data): + def add_data(self, data, **kwargs): self.compressor.write(TextArchive.to_text(data).encode('UTF-8') + b'\n') @staticmethod @@ -419,17 +385,3 @@ def to_text(data): out_str = TextArchive.filter_newlines(TextArchive.handle_unicode_errors(out_str)) return 
out_str - - def commit(self, archive_name): - fname = self.out_dir + '/data' + '_' + archive_name + '.txt.zst' - - self.compressor.flush(zstandard.FLUSH_FRAME) - - self.fh.flush() - self.fh.close() - os.rename(self.out_dir + '/current_chunk_incomplete', fname) - self.fh = open(self.out_dir + '/current_chunk_incomplete', 'wb') - self.compressor = self.cctx.stream_writer(self.fh) - - self.i += 1 - self.data = [] diff --git a/test/test_dat_archive_reader.py b/test/test_dat_archive_reader.py index 831f87f..c7403c5 100644 --- a/test/test_dat_archive_reader.py +++ b/test/test_dat_archive_reader.py @@ -3,11 +3,13 @@ import hashlib import shutil + def sha256str(s): h = hashlib.sha256() h.update(s) return h.hexdigest() + def test_dat(): archive = lmd.DatArchive('test_dir') blns = open('test/resources/blns.txt').read() @@ -27,31 +29,13 @@ def test_dat(): assert data[3] == 'testing 123456789' shutil.rmtree('test_dir') -def test_json(): - archive = lmd.JSONArchive('test_dir') - blns = open('test/resources/blns.txt').read() - archive.add_data(blns) - archive.add_data('testing 123') - archive.add_data(blns) - archive.add_data('testing 123456789') - archive.commit("archive_name") - - reader = lmd.Reader('test_dir') - - data = list(reader.stream_data()) - - assert data[0] == blns - assert data[1] == 'testing 123' - assert data[2] == blns - assert data[3] == 'testing 123456789' - shutil.rmtree('test_dir') def test_jsonl(): archive = lmd.Archive('test_dir') blns = open('test/resources/blns.txt').read() archive.add_data(blns) archive.add_data('testing 123', meta={'testing': 123}) - archive.add_data(blns, meta={'testing2': 456, 'testing': ['a','b']}) + archive.add_data(blns, meta={'testing2': 456, 'testing': ['a', 'b']}) archive.add_data('testing 123456789') archive.commit() @@ -61,16 +45,17 @@ def test_jsonl(): assert data[0] == (blns, {}) assert data[1] == ('testing 123', {'testing': 123}) - assert data[2] == (blns, {'testing2': 456, 'testing': ['a','b']}) + assert data[2] == (blns, {'testing2': 456, 'testing': ['a', 'b']}) assert data[3] == ('testing 123456789', {}) shutil.rmtree('test_dir') + def test_jsonl_paras(): archive = lmd.Archive('test_dir') blns = open('test/resources/blns.txt').read() archive.add_data(blns) archive.add_data(['testing 123', 'testing 345'], meta={'testing': 123}) - archive.add_data(blns, meta={'testing2': 456, 'testing': ['a','b']}) + archive.add_data(blns, meta={'testing2': 456, 'testing': ['a', 'b']}) archive.add_data('testing 123456789') archive.commit() @@ -80,10 +65,11 @@ def test_jsonl_paras(): assert data[0] == (blns, {}) assert data[1] == ('testing 123\n\ntesting 345', {'testing': 123}) - assert data[2] == (blns, {'testing2': 456, 'testing': ['a','b']}) + assert data[2] == (blns, {'testing2': 456, 'testing': ['a', 'b']}) assert data[3] == ('testing 123456789', {}) shutil.rmtree('test_dir') + def test_jsonl_tar(): blns = open('test/resources/blns.txt').read() reader = lmd.Reader('test/resources/blns.jsonl.zst.tar') @@ -92,14 +78,15 @@ def test_jsonl_tar(): assert data[0] == (blns, {}) assert data[1] == ('testing 123\n\ntesting 345', {'testing': 123}) - assert data[2] == (blns, {'testing2': 456, 'testing': ['a','b']}) + assert data[2] == (blns, {'testing2': 456, 'testing': ['a', 'b']}) assert data[3] == ('testing 123456789', {}) assert data[4] == (blns, {}) assert data[5] == ('testing 123\n\ntesting 345', {'testing': 123}) - assert data[6] == (blns, {'testing2': 456, 'testing': ['a','b']}) + assert data[6] == (blns, {'testing2': 456, 'testing': ['a', 'b']}) assert data[7] == 
('testing 123456789', {}) + def test_txt_read(): reader = lmd.Reader('test/resources/blns.txt') blns = open('test/resources/blns.txt').read() @@ -109,6 +96,7 @@ def test_txt_read(): assert data[0] == blns assert len(data) == 1 + def test_zip_read(): reader = lmd.Reader('test/resources/blns.txt.zip') blns = open('test/resources/blns.txt').read() @@ -118,6 +106,7 @@ def test_zip_read(): assert data[0] == blns assert len(data) == 1 + def test_tgz_read(): reader = lmd.Reader('test/resources/blns.txt.tar.gz') blns = open('test/resources/blns.txt').read() @@ -127,9 +116,10 @@ def test_tgz_read(): assert data[0] == blns assert len(data) == 1 + def test_tarfile_reader(): rdr = lmd.tarfile_reader(open('test/resources/testtarfile.tar', 'rb'), streaming=True) - + hashes = map(lambda doc: sha256str(doc.read()), rdr) expected = [ From 92e04b9863d21ba38344c7097ec57957fd4c2247 Mon Sep 17 00:00:00 2001 From: Luca Foppiano Date: Wed, 6 Dec 2023 18:08:47 +0900 Subject: [PATCH 11/13] fix extension for txt --- lm_dataformat/__init__.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/lm_dataformat/__init__.py b/lm_dataformat/__init__.py index a57018b..99f2ac1 100644 --- a/lm_dataformat/__init__.py +++ b/lm_dataformat/__init__.py @@ -311,9 +311,9 @@ def __init__(self, out_dir, compression_level=3, threads=8): def add_data(self, data, meta={}): self.compressor.write(json.dumps({'text': data, 'meta': meta}).encode('UTF-8') + b'\n') - def commit(self, archive_name='default'): - fname = self.out_dir + '/data_' + str(self.i) + '_time' + str( - int(time.time())) + '_' + archive_name + '.jsonl.zst' + def commit(self, archive_name='default', extension="jsonl"): + fname = "{}/data_{}_time{}_{}.{}.zst".format(self.out_dir, str(self.i), str(int(time.time())), archive_name, extension) + # fname = self.out_dir + '/data_' + str(self.i) + '_time' + str(int(time.time())) + '_' + archive_name + '.jsonl.zst' self.compressor.flush(zstandard.FLUSH_FRAME) self.fh.flush() @@ -362,6 +362,8 @@ def __init__(self, out_dir): def add_data(self, data, **kwargs): self.compressor.write(TextArchive.to_text(data).encode('UTF-8') + b'\n') + def commit(self, archive_name='default', extension="txt"): + super().commit(archive_name, extension) @staticmethod def filter_newlines(text): return re.sub("\n{3,}", "\n\n", text) From 7ebda11c0c67da7c30ea2507538ba521956e1636 Mon Sep 17 00:00:00 2001 From: Luca Foppiano Date: Wed, 6 Dec 2023 18:43:32 +0900 Subject: [PATCH 12/13] split archives when more than 10M records --- lm_dataformat/__init__.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lm_dataformat/__init__.py b/lm_dataformat/__init__.py index 99f2ac1..011c9a5 100644 --- a/lm_dataformat/__init__.py +++ b/lm_dataformat/__init__.py @@ -312,7 +312,8 @@ def add_data(self, data, meta={}): self.compressor.write(json.dumps({'text': data, 'meta': meta}).encode('UTF-8') + b'\n') def commit(self, archive_name='default', extension="jsonl"): - fname = "{}/data_{}_time{}_{}.{}.zst".format(self.out_dir, str(self.i), str(int(time.time())), archive_name, extension) + fname = "{}/data_{}_time{}_{}.{}.zst".format(self.out_dir, str(self.i), str(int(time.time())), archive_name, + extension) # fname = self.out_dir + '/data_' + str(self.i) + '_time' + str(int(time.time())) + '_' + archive_name + '.jsonl.zst' self.compressor.flush(zstandard.FLUSH_FRAME) @@ -364,6 +365,7 @@ def add_data(self, data, **kwargs): def commit(self, archive_name='default', extension="txt"): super().commit(archive_name, extension) + @staticmethod 
def filter_newlines(text):
         return re.sub("\n{3,}", "\n\n", text)

From 0fb78060baf976535069b469dbe4ecaadf852b91 Mon Sep 17 00:00:00 2001
From: Luca Foppiano
Date: Thu, 7 Dec 2023 10:35:07 +0900
Subject: [PATCH 13/13] set the time at the creation of the archive so that
 all files share the same timestamp

---
 lm_dataformat/__init__.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/lm_dataformat/__init__.py b/lm_dataformat/__init__.py
index 011c9a5..e6ef6ba 100644
--- a/lm_dataformat/__init__.py
+++ b/lm_dataformat/__init__.py
@@ -303,6 +303,7 @@ def __init__(self, out_dir, compression_level=3, threads=8):
         self.out_dir = out_dir
         os.makedirs(out_dir, exist_ok=True)
         self.i = 0
+        self.time = int(time.time())
 
         self.fh = open(self.out_dir + '/current_chunk_incomplete', 'wb')
         self.cctx = zstandard.ZstdCompressor(level=compression_level, threads=threads)
@@ -312,7 +313,7 @@ def add_data(self, data, meta={}):
         self.compressor.write(json.dumps({'text': data, 'meta': meta}).encode('UTF-8') + b'\n')
 
     def commit(self, archive_name='default', extension="jsonl"):
-        fname = "{}/data_{}_time{}_{}.{}.zst".format(self.out_dir, str(self.i), str(int(time.time())), archive_name,
+        fname = "{}/data_{}_time{}_{}.{}.zst".format(self.out_dir, str(self.i), str(self.time), archive_name,
                                                      extension)
         # fname = self.out_dir + '/data_' + str(self.i) + '_time' + str(int(time.time())) + '_' + archive_name + '.jsonl.zst'
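Taken together, the series replaces in-memory buffering with streaming compression: every `add_data` call is written straight into the current `.zst` chunk, and `commit` stamps each finished file with the timestamp taken at the archive's creation. A minimal sketch of the resulting round trip — the output directory and payloads below are made up for illustration:

```python
import lm_dataformat as lmd

# Writer: records are compressed as they arrive rather than buffered in a list.
archive = lmd.Archive('out_dir')  # hypothetical output directory
archive.add_data('first document', meta={'source': 'example'})
archive.add_data('second document')
archive.commit(archive_name='example')  # -> out_dir/data_0_time<ts>_example.jsonl.zst

# Reader: stream the (text, meta) pairs back out of the directory.
reader = lmd.Reader('out_dir')
for text, meta in reader.stream_data(get_meta=True):
    print(meta, text)
```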