diff --git a/.github/workflows/ci-feat-warc-by-cdx.yaml b/.github/workflows/ci-feat-warc-by-cdx.yaml new file mode 100644 index 0000000..8688e8c --- /dev/null +++ b/.github/workflows/ci-feat-warc-by-cdx.yaml @@ -0,0 +1,56 @@ +name: CI (only feature) + +on: + push: + branches: + - main + pull_request: + branches: + - main + +jobs: + unit-tests: + runs-on: ${{ matrix.os }} + strategy: + fail-fast: true + max-parallel: 1 # avoids ever triggering a rate limit + matrix: + python-version: ['3.8', '3.9', '3.10', '3.11', '3.12'] + os: [ubuntu-latest] + EXTRA: [false] # used to force includes to get included + include: + - python-version: '3.12' + os: ubuntu-latest + EXTRA: true + env: + LOGLEVEL=DEBUG + - python-version: '3.8' + os: ubuntu-22.04 # oldest version on github actions + EXTRA: true + + steps: + - name: checkout + uses: actions/checkout@v4 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Install setuptools on python 3.12+ + if: ${{ matrix.python-version >= '3.12' }} + run: | + pip install setuptools + + - name: Install cdx_toolkit + run: pip install .[test] + + - name: Run tests (feature only) + run: | + PYTHONPATH=. py.test -rA -s --doctest-modules --cov-report=xml --cov-append --cov cdx_toolkit tests/warc_by_cdx tests/unit -v -v + coverage report + + - name: Upload coverage to Codecov + uses: codecov/codecov-action@v4 + with: + token: ${{ secrets.CODECOV_TOKEN }} diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index c43c4ca..7f471c7 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -1,13 +1,15 @@ name: CI -on: - # runtime is erratic and up to an hour - push: - branches: - - main - pull_request: - branches: - - main +on: workflow_dispatch +# Disabled for this feature +# on: +# # runtime is erratic and up to an hour +# push: +# branches: +# - main +# pull_request: +# branches: +# - main jobs: unit-tests: diff --git a/cdx_toolkit/cli.py b/cdx_toolkit/cli.py index 6ffa393..2c3fa60 100644 --- a/cdx_toolkit/cli.py +++ b/cdx_toolkit/cli.py @@ -6,7 +6,15 @@ import os import cdx_toolkit -from cdx_toolkit.commoncrawl import normalize_crawl + +from cdx_toolkit.utils import get_version, setup + +from cdx_toolkit.filter_cdx import run_filter_cdx +from cdx_toolkit.filter_cdx.args import add_filter_cdx_args + +from cdx_toolkit.warcer_by_cdx import run_warcer_by_cdx +from cdx_toolkit.warcer_by_cdx.args import add_warcer_by_cdx_args + LOGGER = logging.getLogger(__name__) @@ -54,6 +62,14 @@ def main(args=None): warc.add_argument('url') warc.set_defaults(func=warcer) + warc_by_cdx = subparsers.add_parser('warc_by_cdx', help='iterate over capture content based on an CDX index file, creating a warc') + add_warcer_by_cdx_args(warc_by_cdx) + warc_by_cdx.set_defaults(func=run_warcer_by_cdx) + + filter_cdx = subparsers.add_parser('filter_cdx', help='Filter CDX files based on SURT prefixes whitelist') + add_filter_cdx_args(filter_cdx) + filter_cdx.set_defaults(func=run_filter_cdx) + size = subparsers.add_parser('size', help='imprecise count of how many results are available') size.add_argument('--details', action='store_true', help='show details of each subindex') size.add_argument('url') @@ -89,48 +105,6 @@ def set_loglevel(cmd): LOGGER.info('set loglevel to %s', str(loglevel)) -def get_version(): - return cdx_toolkit.__version__ - - -def setup(cmd): - kwargs = {} - kwargs['source'] = 'cc' if cmd.crawl else cmd.cc or cmd.ia or cmd.source or None - if 
kwargs['source'] is None: - raise ValueError('must specify --cc, --ia, or a --source') - if cmd.wb: - kwargs['wb'] = cmd.wb - if cmd.cc_mirror: - kwargs['cc_mirror'] = cmd.cc_mirror - if cmd.crawl: - kwargs['crawl'] = normalize_crawl([cmd.crawl]) # currently a string, not a list - if getattr(cmd, 'warc_download_prefix', None) is not None: - kwargs['warc_download_prefix'] = cmd.warc_download_prefix - - cdx = cdx_toolkit.CDXFetcher(**kwargs) - - kwargs = {} - if cmd.limit: - kwargs['limit'] = cmd.limit - if 'from' in vars(cmd) and vars(cmd)['from']: # python, uh, from is a reserved word - kwargs['from_ts'] = vars(cmd)['from'] - if cmd.to: - kwargs['to'] = cmd.to - if cmd.closest: - if not cmd.get: # pragma: no cover - LOGGER.info('note: --closest works best with --get') - kwargs['closest'] = cmd.closest - if cmd.filter: - kwargs['filter'] = cmd.filter - - if cmd.cmd == 'warc' and cmd.size: - kwargs['size'] = cmd.size - - if cmd.cmd == 'size' and cmd.details: - kwargs['details'] = cmd.details - - return cdx, kwargs - def winnow_fields(cmd, fields, obj): if cmd.all_fields: @@ -213,9 +187,15 @@ def warcer(cmd, cmdline): LOGGER.warning('revisit record being resolved for url %s %s', url, timestamp) writer.write_record(record) + writer.close() + def sizer(cmd, cmdline): cdx, kwargs = setup(cmd) size = cdx.get_size_estimate(cmd.url, **kwargs) print(size) + + +if __name__ == "__main__": + main() diff --git a/cdx_toolkit/filter_cdx/__init__.py b/cdx_toolkit/filter_cdx/__init__.py new file mode 100644 index 0000000..75c9de0 --- /dev/null +++ b/cdx_toolkit/filter_cdx/__init__.py @@ -0,0 +1,217 @@ +import logging +import os +import time +import sys +from concurrent.futures import ProcessPoolExecutor, as_completed +from functools import partial + +import fsspec +from surt import surt + +from cdx_toolkit.filter_cdx.matcher import TupleMatcher, TrieMatcher + + +logger = logging.getLogger(__name__) + + +def run_filter_cdx(args, cmdline: str): + """Filter CDX index files based on a given URL or SURT whitelist. + + - If a URL filter is provided, it is converted to a SURT filter. + - A index entry's SURT must start with one of the SURTs from the whitelist to be considered. + - All other index entries are discarded. + - All input/output paths can be local or remote paths (S3, ...) and compressed (*.gz). + """ + logger.info('Filtering CDX files based on whitelist') + + # Start timing + start_time = time.time() + + # Resolve input and output paths using glob pattern + # This should support glob via S3 (e.g., to fetch the indices from s3://commoncrawl/cc-index/collections/* ...) 
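+    # Illustrative example (hypothetical values): with input_base_path='s3://commoncrawl/cc-index/collections'
+    # and input_glob='/*/indexes/*.gz', every matched index file is written below output_base_path
+    # at the same relative path (see resolve_paths below).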
+ input_paths, output_paths = resolve_paths( + input_base_path=args.input_base_path, + input_glob=args.input_glob, + output_base_path=args.output_base_path, + ) + validate_resolved_paths(output_paths, args.overwrite) + + logger.info(f'Found {len(input_paths)} files matching pattern: {args.input_base_path}/{args.input_glob}') + + # Load URL or SURT prefixes from file (each line is a surt) + filter_fs, filter_fs_path = fsspec.url_to_fs(args.filter_file) + logger.info('Loading whitelist from %s', filter_fs_path) + + if not filter_fs.exists(filter_fs_path): # Check that surts file exists + logger.error(f'Filter file not found: {filter_fs_path}') + sys.exit(1) + + with filter_fs.open(filter_fs_path, 'rt') as input_f: + include_prefixes = [line.strip() for line in input_f.readlines()] + + # Convert to SURT if filter file contains URLs + if args.filter_type == 'url': + logger.info('Converting urls to surts ...') + include_surt_prefixes = [surt(url) for url in include_prefixes] + else: + # Filter is already given as surts + include_surt_prefixes = include_prefixes + + # Create matcher based on selected approach + matcher_classes = { + 'trie': TrieMatcher, + 'tuple': TupleMatcher, + } + + matcher = matcher_classes[args.matching_approach](include_surt_prefixes) + + logger.info(f'Loaded {len(include_surt_prefixes):,} filter entries using {args.matching_approach} approach') + + # Process files in parallel or sequentially + n_parallel = args.parallel + limit = 0 if args.limit is None else args.limit + total_lines_n = 0 + total_included_n = 0 + total_errors_n = 0 + + if n_parallel > 1: + # Parallel processing + logger.info('Parallel processes: %i', n_parallel) + with ProcessPoolExecutor(max_workers=n_parallel) as executor: + # Create partial function with common arguments + process_file_partial = partial(_process_single_file, matcher=matcher, limit=limit) + + # Submit all jobs + future_to_paths = { + executor.submit(process_file_partial, input_path, output_path): (input_path, output_path) + for input_path, output_path in zip(input_paths, output_paths) + } + + # Collect results + for future in as_completed(future_to_paths): + input_path, output_path = future_to_paths[future] + try: + lines_n, included_n = future.result() + logger.info( + f'File statistics for {input_path}: included_n={included_n}; lines_n={lines_n}; ratio={included_n / lines_n:.4f}' + ) + total_lines_n += lines_n + total_included_n += included_n + + except Exception as exc: + logger.error(f'File {input_path} generated an exception: {exc}') + total_errors_n += 1 + else: + # Sequential processing + logger.info('Sequential processing') + for input_path, output_path in zip(input_paths, output_paths): + try: + lines_n, included_n = _process_single_file(input_path, output_path, matcher, limit) + logger.info( + f'File statistics for {input_path}: included_n={included_n}; lines_n={lines_n}; ratio={included_n / lines_n:.4f}' + ) + total_lines_n += lines_n + total_included_n += included_n + + except Exception as exc: + logger.error(f'File {input_path} generated an exception: {exc}') + total_errors_n += 1 + logger.info( + f'Total statistics: included_n={total_included_n}; lines_n={total_lines_n}; ratio={total_included_n / total_lines_n:.4f}' + ) + if total_errors_n > 0: + logger.error('Processing errors: %i', total_errors_n) + + # End timing and log execution time + end_time = time.time() + execution_time = end_time - start_time + + logger.info(f'Script execution time: {execution_time:.3f} seconds') + + +def resolve_paths(input_base_path: str, 
input_glob: str, output_base_path: str): + """Resolve input paths from glob pattern and generate corresponding output paths.""" + # Use fsspec to handle local and remote file systems + input_fs, input_fs_base_path = fsspec.url_to_fs(input_base_path) + input_full_glob = input_fs_base_path + input_glob + + # Get input files from glob pattern + input_fs_file_paths = sorted(input_fs.glob(input_full_glob)) + if not input_fs_file_paths: + logger.error(f'No files found matching glob pattern: {input_full_glob}') + sys.exit(1) + + # Generate corresponding output paths + output_file_paths = [] + input_file_paths = [] + for input_path in input_fs_file_paths: + # Get relative path from input_base_path without last slash + rel_path = input_path[len(input_fs_base_path) + 1 :] + + # Create corresponding full input and output path + output_file_paths.append(os.path.join(output_base_path, rel_path)) + input_file_paths.append(os.path.join(input_base_path, rel_path)) + + return input_file_paths, output_file_paths + + +def _process_single_file(input_path, output_path, matcher, limit: int = 0, log_every_n: int = 100_000): + """Process a single input/output file pair. Returns (lines_n, included_n).""" + lines_n = 0 + included_n = 0 + + logger.info('Reading index from %s', input_path) + logger.info('Writing filter output to %s', output_path) + + # Input/output from local or remote file system + input_fs, input_fs_path = fsspec.url_to_fs(input_path) + output_fs, output_fs_path = fsspec.url_to_fs(output_path) + + # Make sure output directory exists + output_fs.makedirs(output_fs._parent(output_fs_path), exist_ok=True) + + # Read and write compressed file if needed + compression = 'gzip' if input_fs_path.endswith('.gz') else None + + with output_fs.open(output_fs_path, 'w', compression=compression) as output_f: + with input_fs.open(input_fs_path, 'rt', compression=compression) as input_f: + for i, line in enumerate(input_f, 1): + # Read CDX line + surt_length = line.find(' ') # we do not need to parse the full line + record_surt = line[:surt_length] + lines_n += 1 + + # Use SURT matcher + include_record = matcher.matches(record_surt) + + if include_record: + output_f.write(line) + included_n += 1 + + if limit > 0 and included_n >= limit: + logger.info('Limit reached at %i from %s', limit, input_path) + break + + if (i % log_every_n) == 0: + logger.info(f'Lines completed: {i:,} (matched: {included_n:,}) from {input_path}') + + # Delete file if empty + if included_n == 0: + logger.warning('Output file is empty, removing it: %s', output_fs_path) + output_fs.rm(output_fs_path) + + return lines_n, included_n + + +def validate_resolved_paths(output_paths, overwrite): + """Validate resolved output paths and create directories if needed.""" + # Check if output files exist and overwrite flag + if not overwrite: + output_fs, _ = fsspec.url_to_fs(output_paths[0]) + for output_path in output_paths: + if output_fs.exists(output_path): + logger.error(f'Output file already exists: {output_path}. 
Use --overwrite to overwrite existing files.') + sys.exit(1) + + # Make sure directory exists + output_fs.makedirs(output_fs._parent(output_path), exist_ok=True) diff --git a/cdx_toolkit/filter_cdx/args.py b/cdx_toolkit/filter_cdx/args.py new file mode 100644 index 0000000..5e72553 --- /dev/null +++ b/cdx_toolkit/filter_cdx/args.py @@ -0,0 +1,46 @@ +import argparse + + +def add_filter_cdx_args(parser: argparse.ArgumentParser): + """Add command line arguments.""" + parser.add_argument( + 'input_base_path', + help='Base directory path on the local file system or remote URL for one or multiple CDX files (e.g., URL to S3 bucket)', + ) + parser.add_argument( + 'filter_file', + help='Path to file containing URL or SURT prefixes to filter for (one per line)', + ) + parser.add_argument( + 'output_base_path', + help='Base directory path for output files (directory structure will be replicated from input_base_path)', + ) + parser.add_argument( + '--filter-type', + type=str, + default='url', + help='Type of filter entries (options: `url` or `surt`, defaults to `url`)', + ) + parser.add_argument( + '--input-glob', + help="Glob pattern relative to input_base_path (e.g., '**/*.cdx.gz' or 'collections/*/indexes/*.gz')", + ) + parser.add_argument( + '--matching-approach', + choices=['trie', 'tuple'], + default='trie', + help='Matching approach to use (default: trie)', + ) + parser.add_argument( + '--overwrite', + action='store_true', + help='Allow overwriting existing output files', + ) + parser.add_argument( + '--parallel', + type=int, + default=1, + help='Number of parallel workers for processing multiple input files (default: 1, sequential processing)', + ) + + return parser diff --git a/cdx_toolkit/filter_cdx/matcher.py b/cdx_toolkit/filter_cdx/matcher.py new file mode 100644 index 0000000..d6da16d --- /dev/null +++ b/cdx_toolkit/filter_cdx/matcher.py @@ -0,0 +1,86 @@ +from typing import List, Tuple, Union +import logging +from abc import ABC, abstractmethod + +logger = logging.getLogger(__name__) + + +class Matcher(ABC): + """Base class for all matching approaches.""" + + @abstractmethod + def __init__(self, prefixes: Union[Tuple[str], List[str]]): + """Initialize the matcher with a list of prefixes.""" + pass + + @abstractmethod + def matches(self, text: str) -> bool: + """Check if text starts with any of the prefixes.""" + pass + + @staticmethod + def validate_prefixes(prefixes: Union[Tuple[str], List[str]]) -> Tuple[str]: + valid_prefixes = [] + + for prefix in prefixes: + if prefix is None or not isinstance(prefix, str): + raise ValueError('Prefix must be a string and not none.') + + # remove white spaces + prefix = prefix.strip() + + if len(prefix) == 0: + raise ValueError('Empty prefixes are not allowed') + + valid_prefixes.append(prefix) + + return tuple(valid_prefixes) + + +class TrieNode: + def __init__(self): + self.children = {} + self.is_end = False + + +class TrieMatcher(Matcher): + """Trie-based matching approach.""" + + def __init__(self, prefixes: Union[Tuple[str], List[str]]): + logger.info(f'Building trie matcher based on {len(prefixes):,} inputs') + self.root = self._build_trie(self.validate_prefixes(prefixes)) + + def _build_trie(self, prefixes: Tuple[str]): + """Build a trie from a collection of prefixes.""" + root = TrieNode() + for prefix in prefixes: + node = root + for char in prefix: + if char not in node.children: + node.children[char] = TrieNode() + node = node.children[char] + node.is_end = True + return root + + def matches(self, text: str) -> bool: + """Check if text 
starts with any prefix in the trie.""" + node = self.root + for char in text: + if char not in node.children: + return False + node = node.children[char] + if node.is_end: + return True + return False + + +class TupleMatcher(Matcher): + """Tuple-based matching approach using the built-in method `str.startswith`.""" + + def __init__(self, prefixes: Union[Tuple[str], List[str]]): + logger.info(f'Building Tuple matcher based on {len(prefixes):,} inputs') + self.prefixes_Tuple = self.validate_prefixes(prefixes) + + def matches(self, text: str) -> bool: + """Check if text starts with any prefix in the Tuple.""" + return text.startswith(self.prefixes_Tuple) diff --git a/cdx_toolkit/utils.py b/cdx_toolkit/utils.py new file mode 100644 index 0000000..55f0d20 --- /dev/null +++ b/cdx_toolkit/utils.py @@ -0,0 +1,49 @@ +import cdx_toolkit +from cdx_toolkit.commoncrawl import normalize_crawl + +import logging + +LOGGER = logging.getLogger(__name__) + + +def get_version(): + return cdx_toolkit.__version__ + + +def setup(cmd): + kwargs = {} + kwargs['source'] = 'cc' if cmd.crawl else cmd.cc or cmd.ia or cmd.source or None + if kwargs['source'] is None: + raise ValueError('must specify --cc, --ia, or a --source') + if cmd.wb: + kwargs['wb'] = cmd.wb + if cmd.cc_mirror: + kwargs['cc_mirror'] = cmd.cc_mirror + if cmd.crawl: + kwargs['crawl'] = normalize_crawl([cmd.crawl]) # currently a string, not a list + if getattr(cmd, 'warc_download_prefix', None) is not None: + kwargs['warc_download_prefix'] = cmd.warc_download_prefix + + cdx = cdx_toolkit.CDXFetcher(**kwargs) + + kwargs = {} + if cmd.limit: + kwargs['limit'] = cmd.limit + if 'from' in vars(cmd) and vars(cmd)['from']: # python, uh, from is a reserved word + kwargs['from_ts'] = vars(cmd)['from'] + if cmd.to: + kwargs['to'] = cmd.to + if cmd.closest: + if not cmd.get: # pragma: no cover + LOGGER.info('note: --closest works best with --get') + kwargs['closest'] = cmd.closest + if cmd.filter: + kwargs['filter'] = cmd.filter + + if cmd.cmd == 'warc' and cmd.size: + kwargs['size'] = cmd.size + + if cmd.cmd == 'size' and cmd.details: + kwargs['details'] = cmd.details + + return cdx, kwargs diff --git a/cdx_toolkit/warc.py b/cdx_toolkit/warc.py index ac8435e..391e755 100644 --- a/cdx_toolkit/warc.py +++ b/cdx_toolkit/warc.py @@ -5,6 +5,7 @@ import logging import sys +import fsspec from warcio import WARCWriter from warcio.recordloader import ArcWarcRecordLoader from warcio.bufferedreaders import DecompressingBufferedReader @@ -131,10 +132,19 @@ def fetch_warc_record(capture, warc_download_prefix): length = int(capture['length']) warc_url = warc_download_prefix + '/' + filename - headers = {'Range': 'bytes={}-{}'.format(offset, offset+length-1)} - resp = myrequests_get(warc_url, headers=headers) - record_bytes = resp.content + if warc_url.startswith("s3:"): + # fetch from S3 + with fsspec.open(warc_url, 'rb') as f: + f.seek(offset) + record_bytes = f.read(length) + else: + # fetch over HTTP + headers = {'Range': 'bytes={}-{}'.format(offset, offset+length-1)} + + resp = myrequests_get(warc_url, headers=headers) + record_bytes = resp.content + stream = DecompressingBufferedReader(BytesIO(record_bytes)) record = ArcWarcRecordLoader().parse_record_stream(stream) @@ -152,6 +162,9 @@ def fetch_warc_record(capture, warc_download_prefix): class CDXToolkitWARCWriter: + """Writer for WARC files. 
+ + The fsspec package is used for writting to local or remote file system, e.g., S3.""" def __init__(self, prefix, subprefix, info, size=1000000000, gzip=True, warc_version=None): self.prefix = prefix self.subprefix = subprefix @@ -161,6 +174,9 @@ def __init__(self, prefix, subprefix, info, size=1000000000, gzip=True, warc_ver self.warc_version = warc_version self.segment = 0 self.writer = None + self.file_handler = None + self.file_system, self.file_system_prefix = fsspec.url_to_fs(self.prefix) + self._file_context = None def write_record(self, *args, **kwargs): if self.writer is None: @@ -175,21 +191,21 @@ def write_record(self, *args, **kwargs): self.writer.write_record(*args, **kwargs) - fsize = os.fstat(self.fd.fileno()).st_size - if fsize > self.size: - self.fd.close() + # Compare file size of current segment with max. file size + if self.file_handler and self.file_handler.tell() > self.size: + self._close_current_file() self.writer = None self.segment += 1 def _unique_warc_filename(self): while True: - name = self.prefix + '-' + name = self.file_system_prefix + '-' if self.subprefix is not None: name += self.subprefix + '-' name += '{:06d}'.format(self.segment) + '.extracted.warc' if self.gzip: name += '.gz' - if os.path.exists(name): + if self.file_system.exists(name): self.segment += 1 else: break @@ -197,12 +213,24 @@ def _unique_warc_filename(self): def _start_new_warc(self): self.filename = self._unique_warc_filename() - self.fd = open(self.filename, 'wb') + self._file_context = self.file_system.open(self.filename, 'wb') + self.file_handler = self._file_context.__enter__() LOGGER.info('opening new warc file %s', self.filename) - self.writer = WARCWriter(self.fd, gzip=self.gzip, warc_version=self.warc_version) + self.writer = WARCWriter(self.file_handler, gzip=self.gzip, warc_version=self.warc_version) warcinfo = self.writer.create_warcinfo_record(self.filename, self.info) self.writer.write_record(warcinfo) + def _close_current_file(self): + # Close the handler of the current file (needed for fsspec abstraction) + if self._file_context is not None: + self._file_context.__exit__(None, None, None) + self._file_context = None + self.file_handler = None + + def close(self): + # Close the WARC writer (this must be called at the end) + self._close_current_file() + def get_writer(prefix, subprefix, info, **kwargs): return CDXToolkitWARCWriter(prefix, subprefix, info, **kwargs) diff --git a/cdx_toolkit/warcer_by_cdx/__init__.py b/cdx_toolkit/warcer_by_cdx/__init__.py new file mode 100644 index 0000000..d12ac45 --- /dev/null +++ b/cdx_toolkit/warcer_by_cdx/__init__.py @@ -0,0 +1,144 @@ +import logging +import sys +import time +from typing import List, Literal, Optional + +import fsspec + + +from cdx_toolkit.utils import get_version, setup +from cdx_toolkit.warcer_by_cdx.aioboto3_warcer import filter_warc_by_cdx_via_aioboto3 +from cdx_toolkit.warcer_by_cdx.fsspec_warcer import filter_warc_by_cdx_via_fsspec + + +logger = logging.getLogger(__name__) + +ImplementationType = Literal['fsspec', 'aioboto3'] + + +def run_warcer_by_cdx(args, cmdline): + """Like warcer but fetches WARC records based on one or more CDX index files. + + The CDX files can be filtered using the `filter_cdx` commands based a given URL/SURT list. + + Approach: + - Iterate over one or more CDX files to extract capture object (file, offset, length) + - Fetch WARC record based on capture object + - Write to new WARC file with metadata including resource record with index. 
+ - The CDX resource record is written to the WARC directly before for response records that matches to the CDX. + """ + logger.info('Filtering WARC files based on CDX') + + cdx, kwargs = setup(args) + + # Start timing + start_time = time.time() + + implementation: ImplementationType = args.implementation + + write_paths_as_resource_records = args.write_paths_as_resource_records + write_paths_as_resource_records_metadata = args.write_paths_as_resource_records_metadata + + if write_paths_as_resource_records and write_paths_as_resource_records_metadata: + if len(write_paths_as_resource_records) != len(write_paths_as_resource_records_metadata): + raise ValueError("Number of paths to resource records must be equal to metadata paths.") + + if not write_paths_as_resource_records and write_paths_as_resource_records_metadata: + raise ValueError("Metadata paths are set but resource records paths are missing.") + + ispartof = args.prefix + if args.subprefix: + ispartof += '-' + args.subprefix + + info = { + 'software': 'pypi_cdx_toolkit/' + get_version(), + 'isPartOf': ispartof, + 'description': 'warc extraction based on CDX generated with: ' + cmdline, + 'format': 'WARC file version 1.0', + } + if args.creator: + info['creator'] = args.creator + if args.operator: + info['operator'] = args.operator + + writer_kwargs = {} + if 'size' in kwargs: + writer_kwargs['size'] = kwargs['size'] + del kwargs['size'] + + n_parallel = args.parallel + log_every_n = 5 + limit = 0 if args.limit is None else args.limit + prefix_path = str(args.prefix) + prefix_fs, prefix_fs_path = fsspec.url_to_fs(prefix_path) + + # make sure the base dir exists + prefix_fs.makedirs(prefix_fs._parent(prefix_fs_path), exist_ok=True) + + index_paths = get_index_paths( + args.index_path, + args.index_glob, + ) + + if implementation == 'fsspec': + records_n = filter_warc_by_cdx_via_fsspec( + index_paths=index_paths, + prefix_path=prefix_path, + writer_info=info, + writer_subprefix=args.subprefix, + write_paths_as_resource_records=write_paths_as_resource_records, + write_paths_as_resource_records_metadata=write_paths_as_resource_records_metadata, + limit=limit, + log_every_n=log_every_n, + warc_download_prefix=cdx.warc_download_prefix, + n_parallel=n_parallel, + writer_kwargs=writer_kwargs, + ) + elif implementation == 'aioboto3': + records_n = filter_warc_by_cdx_via_aioboto3( + index_paths=index_paths, + prefix_path=prefix_path, + writer_info=info, + writer_subprefix=args.subprefix, + write_paths_as_resource_records=write_paths_as_resource_records, + write_paths_as_resource_records_metadata=write_paths_as_resource_records_metadata, + limit=limit, + log_every_n=log_every_n, + warc_download_prefix=cdx.warc_download_prefix, + n_parallel=n_parallel, + writer_kwargs=writer_kwargs, + ) + else: + raise ValueError(f'Invalid implementation: {implementation}') + + logger.info('WARC records extracted: %i', records_n) + + # End timing and log execution time + end_time = time.time() + execution_time = end_time - start_time + + logger.info(f'Script execution time: {execution_time:.3f} seconds') + + +def get_index_paths(index_path: str, index_glob: Optional[str] = None) -> List[str]: + if index_glob is None: + # Read from a single index + index_paths = [index_path] + else: + # Prepare index paths + index_fs, index_fs_path = fsspec.url_to_fs(index_path) + + # Fetch multiple indicies via glob + full_glob = index_fs_path + index_glob + + logger.info('glob pattern from %s (%s)', full_glob, index_fs.protocol) + + index_paths = 
sorted(index_fs.glob(full_glob)) + + logger.info('glob pattern found %i index files in %s', len(index_paths), index_fs_path) + + if not index_paths: + logger.error('no index files found via glob') + sys.exit(1) + + return index_paths diff --git a/cdx_toolkit/warcer_by_cdx/aioboto3_utils.py b/cdx_toolkit/warcer_by_cdx/aioboto3_utils.py new file mode 100644 index 0000000..1ea64e8 --- /dev/null +++ b/cdx_toolkit/warcer_by_cdx/aioboto3_utils.py @@ -0,0 +1,195 @@ +import asyncio +import logging +import time +from dataclasses import dataclass +from typing import Dict, List, Optional, Tuple +from os import urandom + +from botocore.exceptions import ClientError, EndpointConnectionError + +_STOP = object() + +logger = logging.getLogger(__name__) + + +@dataclass +class ThroughputTracker: + """Track throughput metrics for fetchers and consumers.""" + + start_time: float = 0.0 + total_bytes: int = 0 + total_requests: int = 0 + + def start(self): + self.start_time = time.time() + + def add_bytes(self, bytes_count: int): + self.total_bytes += bytes_count + self.total_requests += 1 + + def get_stats(self) -> dict: + elapsed = time.time() - self.start_time + + return { + 'elapsed': elapsed, + 'total_bytes': self.total_bytes, + 'total_requests': self.total_requests, + 'bytes_per_sec': self.total_bytes / elapsed if elapsed > 0 else 0, + 'mb_per_sec': (self.total_bytes / elapsed) / (1024 * 1024) if elapsed > 0 else 0, + 'requests_per_sec': self.total_requests / elapsed if elapsed > 0 else 0, + } + + +@dataclass(frozen=True) +class RangeJob: + bucket: str + key: str + offset: int + length: int + + +@dataclass(frozen=True) +class RangePayload: + job: RangeJob + data: bytes + + +def _backoff(attempt: int, base_backoff_seconds: float) -> float: + """Time to sleep based on number of attempts""" + base = base_backoff_seconds * (2 ** (attempt - 1)) + + # Add random jitter between 80-120% of base delay + return max(0.05, base * (0.8 + 0.4 * urandom(1)[0] / 255)) + + +def parse_s3_uri(uri: str) -> Tuple[str, str]: + if not uri.startswith('s3://'): + raise ValueError(f'Not an S3 URI: {uri}') + rest = uri[5:] + i = rest.find('/') + if i <= 0 or i == len(rest) - 1: + raise ValueError(f'Malformed S3 URI: {uri}') + return rest[:i], rest[i + 1 :] + + +async def with_retries(coro_factory, *, op_name: str, max_attempts: int, base_backoff_seconds: float): + last_exc = None + for attempt in range(1, max_attempts + 1): + try: + return await coro_factory() + except (TimeoutError, ClientError, EndpointConnectionError) as exc: + last_exc = exc + if attempt >= max_attempts: + logger.error('%s failed after %d attempts: %r', op_name, attempt, exc) + break + sleep_s = _backoff(attempt, base_backoff_seconds) + logger.warning( + '%s failed (attempt %d/%d) – retrying in %.2fs', + op_name, + attempt, + max_attempts, + sleep_s, + ) + await asyncio.sleep(sleep_s) + raise last_exc + + +async def get_object_stream(s3, bucket: str, key: str, max_attempts: int, base_backoff_seconds: float): + resp = await with_retries( + lambda: s3.get_object(Bucket=bucket, Key=key), + op_name=f'get_object {bucket}/{key}', + max_attempts=max_attempts, + base_backoff_seconds=base_backoff_seconds, + ) + return resp['Body'] + + +async def ranged_get_bytes( + s3, + bucket: str, + key: str, + offset: int, + length: int, + max_attempts: int, + base_backoff_seconds: float, +) -> bytes: + end = offset + length - 1 # inclusive + resp = await with_retries( + lambda: s3.get_object(Bucket=bucket, Key=key, Range=f'bytes={offset}-{end}'), + op_name=f'ranged_get 
{bucket}/{key}[{offset}:{end}]', + max_attempts=max_attempts, + base_backoff_seconds=base_backoff_seconds, + ) + return await resp['Body'].read() + + +async def mpu_create( + s3, + bucket: str, + key: str, + *, + content_type: Optional[str], + max_attempts: int, + base_backoff_seconds: float, +): + kwargs = {'Bucket': bucket, 'Key': key} + if content_type: + kwargs['ContentType'] = content_type + resp = await with_retries( + lambda: s3.create_multipart_upload(**kwargs), + op_name=f'create_multipart_upload {bucket}/{key}', + max_attempts=max_attempts, + base_backoff_seconds=base_backoff_seconds, + ) + return resp['UploadId'] + + +async def mpu_upload_part( + s3, + bucket: str, + key: str, + upload_id: str, + part_number: int, + body: bytes, + max_attempts: int, + base_backoff_seconds: float, +) -> str: + resp = await with_retries( + lambda: s3.upload_part( + Bucket=bucket, + Key=key, + UploadId=upload_id, + PartNumber=part_number, + Body=body, + ), + op_name=f'upload_part {bucket}/{key}#{part_number}', + max_attempts=max_attempts, + base_backoff_seconds=base_backoff_seconds, + ) + return resp['ETag'] + + +async def mpu_complete( + s3, + bucket: str, + key: str, + upload_id: str, + parts: List[Dict], + max_attempts: int, + base_backoff_seconds: float, +): + await with_retries( + lambda: s3.complete_multipart_upload( + Bucket=bucket, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts} + ), + op_name=f'complete_multipart_upload {bucket}/{key}', + max_attempts=max_attempts, + base_backoff_seconds=base_backoff_seconds, + ) + + +async def mpu_abort(s3, bucket: str, key: str, upload_id: str): + try: + await s3.abort_multipart_upload(Bucket=bucket, Key=key, UploadId=upload_id) + except Exception: + logger.exception('Failed to abort MPU %s on %s/%s', upload_id, bucket, key) diff --git a/cdx_toolkit/warcer_by_cdx/aioboto3_warcer.py b/cdx_toolkit/warcer_by_cdx/aioboto3_warcer.py new file mode 100644 index 0000000..511e3e6 --- /dev/null +++ b/cdx_toolkit/warcer_by_cdx/aioboto3_warcer.py @@ -0,0 +1,485 @@ +import asyncio +from io import BytesIO +import logging +from typing import List, Optional, Dict + +import aioboto3 +from botocore.config import Config +from warcio import WARCWriter + +from cdx_toolkit.warcer_by_cdx.aioboto3_utils import ( + _STOP, + RangeJob, + RangePayload, + ThroughputTracker, + parse_s3_uri, + ranged_get_bytes, +) +from cdx_toolkit.warcer_by_cdx.aioboto3_writer import ShardWriter +from cdx_toolkit.warcer_by_cdx.cdx_utils import ( + iter_cdx_index_from_path, +) +from cdx_toolkit.warcer_by_cdx.warc_utils import get_bytes_from_warc_record, get_resource_record_from_path + + +logger = logging.getLogger(__name__) + + +def filter_warc_by_cdx_via_aioboto3( + index_paths: List[str], + prefix_path: str, + writer_info: Dict, + writer_subprefix: Optional[str] = None, + write_paths_as_resource_records: Optional[List[str]] = None, + write_paths_as_resource_records_metadata: Optional[List[str]] = None, + limit: int = 0, + log_every_n: int = 1000, + warc_download_prefix: Optional[str] = None, + n_parallel: int = 1, + writer_kwargs: Optional[Dict] = None, +) -> int: + try: + return asyncio.run( + filter_warc_by_cdx_via_aioboto3_async( + index_paths=index_paths, + prefix_path=prefix_path, + writer_info=writer_info, + writer_subprefix=writer_subprefix, + write_paths_as_resource_records=write_paths_as_resource_records, + write_paths_as_resource_records_metadata=write_paths_as_resource_records_metadata, + limit=limit, + log_every_n=log_every_n, + warc_download_prefix=warc_download_prefix, + 
writer_kwargs=writer_kwargs, + n_parallel=n_parallel, + ) + ) + except KeyboardInterrupt: + logger.warning('Interrupted by user.') + + return -1 + + +async def filter_warc_by_cdx_via_aioboto3_async( + index_paths: List[str], + prefix_path: str, + writer_info: Dict, + writer_subprefix: Optional[str] = None, + write_paths_as_resource_records: Optional[List[str]] = None, + write_paths_as_resource_records_metadata: Optional[List[str]] = None, + limit: int = 0, + log_every_n: int = 1000, + warc_download_prefix: Optional[str] = None, + n_parallel: int = 1, + writer_kwargs: Optional[Dict] = None, + max_attempts: int = 5, + key_queue_size: int = 1000, + item_queue_size: int = 200, + base_backoff_seconds=0.5, + s3_region_name: str = 'us-east-1', +) -> int: + n_records = 0 + fetcher_to_consumer_ratio = 6 + num_fetchers = n_parallel + num_consumers = max(int(num_fetchers / fetcher_to_consumer_ratio), 1) + + key_queue: asyncio.Queue = asyncio.Queue(maxsize=key_queue_size) + item_queue: asyncio.Queue = asyncio.Queue(maxsize=item_queue_size) + + boto_cfg = Config( + region_name=s3_region_name, + retries={'max_attempts': max(2, max_attempts), 'mode': 'standard'}, + connect_timeout=10, + read_timeout=120, + ) + + session = aioboto3.Session() + + async with session.client('s3', config=boto_cfg) as s3: + # Fetch file paths and ranges (offset, length) from index files + logger.info('Starting lister, %d fetchers, %d consumers', num_fetchers, num_consumers) + lister_task = asyncio.create_task( + get_range_jobs_from_index_paths( + key_queue=key_queue, + index_paths=index_paths, + warc_download_prefix=warc_download_prefix, + num_fetchers=num_fetchers, + limit=limit, + ) + ) + + # Read WARC records based on file paths and ranges + fetchers = [ + asyncio.create_task( + fetch_warc_ranges( + fetcher_id=i, + key_queue=key_queue, + item_queue=item_queue, + s3=s3, + max_attempts=max_attempts, + base_backoff_seconds=base_backoff_seconds, + log_every_n=log_every_n, + ) + ) + for i in range(num_fetchers) + ] + + # Write WARC records + consumers = [ + asyncio.create_task( + write_warc( + consumer_id=i, + item_queue=item_queue, + s3=s3, + prefix_path=prefix_path, + max_attempts=max_attempts, + base_backoff_seconds=base_backoff_seconds, + write_paths_as_resource_records=write_paths_as_resource_records, + write_paths_as_resource_records_metadata=write_paths_as_resource_records_metadata, + writer_info=writer_info, + writer_subprefix=writer_subprefix, + writer_kwargs=writer_kwargs, + log_every_n=log_every_n, + gzip=index_paths[0].endswith('.gz') if index_paths else False, + ) + ) + for i in range(num_consumers) + ] + + await lister_task + logger.info('Lister completed, waiting for fetchers to finish') + + await asyncio.gather(*fetchers) + logger.info('All fetchers completed') + + # Send stop signals to consumers + for _ in range(num_consumers): + await item_queue.put(_STOP) + + consumer_results = await asyncio.gather(*consumers) + n_records = sum([result['stats']['total_requests'] for result in consumer_results]) + + logger.info('All consumers completed') + + return n_records + + +async def get_range_jobs_from_index_paths( + key_queue: asyncio.Queue, + index_paths: List[str], + warc_download_prefix: str, + num_fetchers: int, + limit: int = 0, +): + """Stage 1: stream the CDX paths, parse lines -> RangeJob (WARC files and offets) -> key_queue.""" + + logger.info('Range index limit: %i', limit) + count = 0 + + if not index_paths: + logger.error('No index paths provided!') + + else: + # Iterate over index files + for index_path in 
index_paths: + # Fetch range queries from index + try: + for warc_url, offset, length in iter_cdx_index_from_path( + index_path, warc_download_prefix=warc_download_prefix + ): + # Convert the CDX record back to a RangeJob + bucket, key = parse_s3_uri(warc_url) + job = RangeJob(bucket=bucket, key=key, offset=offset, length=length) + await key_queue.put(job) + count += 1 + + if limit > 0 and count >= limit: + logger.warning('Index limit reached at %i', count) + break + + except Exception as e: + logger.error('Failed to read CDX index from %s: %s', index_path, e) + + if limit > 0 and count >= limit: + logger.warning('Limit reached at %i', count) + break + + # signal fetchers to stop + for _ in range(num_fetchers): + await key_queue.put(_STOP) + + logger.info('Lister enqueued %d jobs from %s', count, index_path) + + +async def fetch_warc_ranges( + fetcher_id: int, + key_queue: asyncio.Queue, + item_queue: asyncio.Queue, + s3, + max_attempts: int, + base_backoff_seconds: float, + log_every_n: int = 1000, +): + """Stage 2: ranged GET per job -> enqueue RangePayload.""" + tracker = ThroughputTracker() + tracker.start() + counter = 0 + + while True: + job = await key_queue.get() + try: + if job is _STOP: + stats = tracker.get_stats() + logger.info( + 'Fetcher %d stopping. Stats: %.1fs, %d requests, %.1f MB, %.2f MB/s, %.2f req/s', + fetcher_id, + stats['elapsed'], + stats['total_requests'], + stats['total_bytes'] / (1024 * 1024), + stats['mb_per_sec'], + stats['requests_per_sec'], + ) + break # Exit loop, but still execute finally block + assert isinstance(job, RangeJob) + data = await ranged_get_bytes( + s3, + job.bucket, + job.key, + job.offset, + job.length, + max_attempts, + base_backoff_seconds, + ) + tracker.add_bytes(len(data)) + counter += 1 + + # Log progress every 10 items + if counter % log_every_n == 0: + stats = tracker.get_stats() + logger.info( + 'Fetcher %d: %d items, %.1f MB, %.2f MB/s, %.2f req/s', + fetcher_id, + counter, + stats['total_bytes'] / (1024 * 1024), + stats['mb_per_sec'], + stats['requests_per_sec'], + ) + + await item_queue.put(RangePayload(job=job, data=data)) + except Exception: + logger.exception( + 'Fetcher %d failed on %s/%s [%d,%d]', + fetcher_id, + getattr(job, 'bucket', '?'), + getattr(job, 'key', '?'), + getattr(job, 'offset', -1), + getattr(job, 'length', -1), + ) + finally: + key_queue.task_done() + + +def generate_warc_filename( + dest_prefix: str, + consumer_id: int, + sequence: int, + writer_subprefix: Optional[str] = None, + gzip: bool = False, +) -> str: + file_name = dest_prefix + '-' + if writer_subprefix is not None: + file_name += writer_subprefix + '-' + file_name += '{:06d}-{:03d}'.format(consumer_id, sequence) + '.extracted.warc' + if gzip: + file_name += '.gz' + + return file_name + + +async def create_new_writer_with_header( + s3, + consumer_id: int, + sequence: int, + dest_bucket: str, + dest_prefix: str, + max_attempts: int, + base_backoff_seconds: float, + min_part_size: int, + writer_info: Dict, + warc_version: str = '1.0', + writer_subprefix: Optional[str] = None, + gzip: bool = False, + content_type: Optional[str] = None, +): + filename = generate_warc_filename( + dest_prefix=dest_prefix, + consumer_id=consumer_id, + sequence=sequence, + writer_subprefix=writer_subprefix, + gzip=gzip, + ) + + new_writer = ShardWriter( + filename, + dest_bucket, + content_type, + min_part_size, + max_attempts, + base_backoff_seconds, + ) + + # Initialize writer + await new_writer.start(s3) + + # Write WARC header + buffer = BytesIO() + warc_writer = 
WARCWriter(buffer, gzip=gzip, warc_version=warc_version) + warcinfo = warc_writer.create_warcinfo_record(filename, writer_info) + warc_writer.write_record(warcinfo) + header_data = buffer.getvalue() + await new_writer.write(s3, header_data) + + return new_writer, len(header_data) + + +async def write_warc( + consumer_id: int, + item_queue: asyncio.Queue, + s3, + max_attempts: int, + base_backoff_seconds: float, + prefix_path: str, + writer_info: Dict, + writer_subprefix: Optional[str] = None, + write_paths_as_resource_records: Optional[List[str]] = None, + write_paths_as_resource_records_metadata: Optional[List[str]] = None, + writer_kwargs: Optional[Dict] = None, + warc_version: str = '1.0', + log_every_n: int = 1000, + gzip: bool = False, + content_type=None, + min_part_size: int = 5 * 1024 * 1024, # 5 MiB (for upload) + max_file_size: Optional[int] = 1 * 1024 * 1024 * 1024, # 1 GiB (for WARC outputs) +): + """Stage 3: Write WARC. Each consumer owns ONE shard MPU and appends ranges to it.""" + + dest_bucket, dest_prefix = parse_s3_uri(prefix_path) + + # File rotation tracking + current_file_sequence = 1 + current_file_size = 0 + + # Initialize first writer with header + writer, header_size = await create_new_writer_with_header( + s3, + consumer_id=consumer_id, + sequence=current_file_sequence, + dest_bucket=dest_bucket, + dest_prefix=dest_prefix, + max_attempts=max_attempts, + base_backoff_seconds=base_backoff_seconds, + writer_info=writer_info, + warc_version=warc_version, + writer_subprefix=writer_subprefix, + gzip=gzip, + content_type=content_type, + min_part_size=min_part_size, + ) + current_file_size = header_size + + tracker = ThroughputTracker() + tracker.start() + counter = 0 + + # Write WARC resource records + if write_paths_as_resource_records: + logger.info(f'Writing {len(write_paths_as_resource_records)} resource records to WARC ... ') + + # Resource records are written at the beginning the WARC file. + for i, resource_record_path in enumerate(write_paths_as_resource_records): + logger.info(f'Writing resource record from {resource_record_path} ...') + resource_record = get_resource_record_from_path( + file_path=resource_record_path, + metadata_path=( + write_paths_as_resource_records_metadata[i] if write_paths_as_resource_records_metadata else None + ), + ) + record_data = get_bytes_from_warc_record(resource_record, warc_version=warc_version, gzip=gzip) + + await writer.write(s3, record_data) + + # Keep track but do not rotate resource records + current_file_size += len(record_data) + + logger.info(f'Resource records added: {len(write_paths_as_resource_records)}') + + try: + while True: + item = await item_queue.get() + counter += 1 + try: + if item is _STOP: + stats = tracker.get_stats() + logger.info( + 'Consumer %d stopping. 
Stats: %.1fs, %d items, %.1f MB written, %.2f MB/s write speed', + consumer_id, + stats['elapsed'], + stats['total_requests'], + stats['total_bytes'] / (1024 * 1024), + stats['mb_per_sec'], + ) + should_stop = True + else: + should_stop = False + assert isinstance(item, RangePayload) + + # Check if we need to rotate files due to size limit + if max_file_size and current_file_size + len(item.data) > max_file_size: + await writer.close(s3) + current_file_sequence += 1 + + writer, header_size = await create_new_writer_with_header( + s3, + consumer_id=consumer_id, + sequence=current_file_sequence, + dest_bucket=dest_bucket, + dest_prefix=dest_prefix, + max_attempts=max_attempts, + base_backoff_seconds=base_backoff_seconds, + writer_info=writer_info, + warc_version=warc_version, + writer_subprefix=writer_subprefix, + gzip=gzip, + content_type=content_type, + min_part_size=min_part_size, + ) + + current_file_size = header_size + logger.info(f'Rotated to new WARC file sequence {current_file_sequence} due to size limit') + + await writer.write(s3, item.data) + current_file_size += len(item.data) + tracker.add_bytes(len(item.data)) + + # Log progress every 10 items + if counter % log_every_n == 0: + stats = tracker.get_stats() + logger.info( + 'Consumer %d: %d items, %.1f MB written, %.2f MB/s', + consumer_id, + counter, + stats['total_bytes'] / (1024 * 1024), + stats['mb_per_sec'], + ) + except Exception: + logger.exception('Consumer %d failed on %s', consumer_id, getattr(item, 'job', None)) + should_stop = False + finally: + item_queue.task_done() + + if should_stop: + break + finally: + await writer.close(s3) + + return {'consumer_id': consumer_id, 'stats': tracker.get_stats()} diff --git a/cdx_toolkit/warcer_by_cdx/aioboto3_writer.py b/cdx_toolkit/warcer_by_cdx/aioboto3_writer.py new file mode 100644 index 0000000..f262a88 --- /dev/null +++ b/cdx_toolkit/warcer_by_cdx/aioboto3_writer.py @@ -0,0 +1,101 @@ +import logging +from typing import List, Dict, Optional + +from cdx_toolkit.warcer_by_cdx.aioboto3_utils import ( + mpu_abort, + mpu_complete, + mpu_create, + mpu_upload_part, +) + +logger = logging.getLogger(__name__) + + +class ShardWriter: + """Manages one MPU: buffers bytes, uploads >=5 MiB parts, completes on close.""" + + def __init__( + self, + shard_key: str, + dest_bucket: str, + content_type: Optional[str], + min_part_size: int, + max_attempts: int, + base_backoff_seconds: float, + ): + self.shard_key = shard_key + self.dest_bucket = dest_bucket + self.content_type = content_type + self.min_part_size = min_part_size + self.max_attempts = max_attempts + self.base_backoff_seconds = base_backoff_seconds + self.upload_id: Optional[str] = None + self.part_number = 1 + self.parts: List[Dict] = [] + self.buffer = bytearray() + + async def start(self, s3): + self.upload_id = await mpu_create( + s3, + self.dest_bucket, + self.shard_key, + content_type=self.content_type, + max_attempts=self.max_attempts, + base_backoff_seconds=self.base_backoff_seconds, + ) + logger.info('Started MPU for %s (UploadId=%s)', self.shard_key, self.upload_id) + + async def _flush_full_parts(self, s3): + while len(self.buffer) >= self.min_part_size: + chunk = self.buffer[: self.min_part_size] + del self.buffer[: self.min_part_size] + etag = await mpu_upload_part( + s3, + self.dest_bucket, + self.shard_key, + self.upload_id, + self.part_number, + bytes(chunk), + self.max_attempts, + self.base_backoff_seconds, + ) + self.parts.append({'PartNumber': self.part_number, 'ETag': etag}) + self.part_number += 1 + + async def 
write(self, s3, data: bytes): + self.buffer.extend(data) + await self._flush_full_parts(s3) + + async def close(self, s3): + try: + if self.buffer: + etag = await mpu_upload_part( + s3, + self.dest_bucket, + self.shard_key, + self.upload_id, + self.part_number, + bytes(self.buffer), + self.max_attempts, + self.base_backoff_seconds, + ) + self.parts.append({'PartNumber': self.part_number, 'ETag': etag}) + self.part_number += 1 + self.buffer.clear() + + if self.parts: + await mpu_complete( + s3, + self.dest_bucket, + self.shard_key, + self.upload_id, + self.parts, + self.max_attempts, + self.base_backoff_seconds, + ) + logger.info('Completed MPU for %s with %d parts.', self.shard_key, len(self.parts)) + except Exception: + logger.exception('Completing MPU failed for %s; attempting abort.', self.shard_key) + if self.upload_id: + await mpu_abort(s3, self.dest_bucket, self.shard_key, self.upload_id) + raise diff --git a/cdx_toolkit/warcer_by_cdx/args.py b/cdx_toolkit/warcer_by_cdx/args.py new file mode 100644 index 0000000..c54496d --- /dev/null +++ b/cdx_toolkit/warcer_by_cdx/args.py @@ -0,0 +1,57 @@ +import logging +import argparse + + +logger = logging.getLogger(__name__) + + +def add_warcer_by_cdx_args(parser: argparse.ArgumentParser): + parser.add_argument('index_path', help='Path to CDX index file (local or remote, e.g. S3)') + parser.add_argument( + '--index-glob', + type=str, + default=None, + help='a glob pattern for read from multiple indices', + ) + parser.add_argument('--prefix', default='TEST', help='prefix for the warc filename') + parser.add_argument( + '--subprefix', + type=str, + default=None, + help='subprefix for the warc filename, default None', + ) + parser.add_argument( + '--size', + type=int, + default=1000000000, + help='target for the warc filesize in bytes', + ) + parser.add_argument( + '--creator', + action='store', + help='creator of the warc: person, organization, service', + ) + parser.add_argument('--operator', action='store', help='a person, if the creator is an organization') + parser.add_argument( + '--warc-download-prefix', + action='store', + help='prefix for downloading content, automatically set for CC', + ) + parser.add_argument( + '--write-paths-as-resource-records', # --write-index-as-record + nargs="*", + help='Paths to multiple files. 
File content is written to as a resource record to each the WARC file', + ) + parser.add_argument( + '--write-paths-as-resource-records-metadata', + nargs="*", + help='Paths to multiple metadata files (JSON) for resource records from `--write-paths-as-resource-records`', + ) + parser.add_argument( + '--parallel', + type=int, + default=1, + help='Number of parallel workers for fetchin WARC records (default: 1, sequential processing)', + ) + parser.add_argument('--implementation', type=str, default='fsspec', help='implementation (fsspec, aioboto3)') + return parser diff --git a/cdx_toolkit/warcer_by_cdx/cdx_utils.py b/cdx_toolkit/warcer_by_cdx/cdx_utils.py new file mode 100644 index 0000000..0f2bc30 --- /dev/null +++ b/cdx_toolkit/warcer_by_cdx/cdx_utils.py @@ -0,0 +1,88 @@ +import json +from pathlib import Path + +from io import BytesIO +from typing import Iterable, Optional, Tuple, Union + +import fsspec +import logging + +from warcio import WARCWriter +from warcio.recordloader import ArcWarcRecord + + +logger = logging.getLogger(__name__) + + +def get_index_as_string_from_path( + index_path: Union[str, Path], + index_fs: Optional[fsspec.AbstractFileSystem] = None + ) -> str: + """Fetch (and decompress) index content as string from local or remote path.""" + logger.info('Fetching index from %s ...', index_path) + if index_fs is None: + index_fs, index_fs_path = fsspec.url_to_fs(index_path) + else: + index_fs_path = index_path + + compression = 'gzip' if index_fs_path.endswith('.gz') else None + + with index_fs.open(index_fs_path, 'rt', compression=compression) as f: + return f.read() + + +def get_index_record(index: str, index_path: str, encoding: str = 'utf-8') -> ArcWarcRecord: + """Build WARC resource record for index.""" + return WARCWriter(None).create_warc_record( + uri=index_path, # TODO this could be a local / internal path + record_type='resource', + payload=BytesIO(index.encode(encoding)), + http_headers=None, + warc_content_type='application/cdx', + warc_headers_dict=None, # TODO should we add some other metadata headers? + ) + + +def read_cdx_line(line: str, warc_download_prefix: str) -> Tuple[str, int, int]: + cols = line.split(' ', maxsplit=2) + + if len(cols) == 3: + # TODO can there be a different format? + # surt, timestamp, json_data = cols + # + # CC seems to not follow the specification from https://iipc.github.io/warc-specifications/specifications/cdx-format/cdx-2015/ + # > The default first line of a CDX file is: + # > CDX A b e a m s c k r V v D d g M n + data = json.loads(cols[2]) + data['timestamp'] = cols[1] + else: + raise ValueError(f'Cannot parse line: {line}') + + filename = data['filename'] + offset = int(data['offset']) + length = int(data['length']) + + warc_url = warc_download_prefix + '/' + filename + + return (warc_url, offset, length) + + +def iter_cdx_index_from_path(index_path: str, warc_download_prefix: str) -> Iterable[Tuple[str, int, int]]: + """ + Iterate CDX records from a file path (gzipped; local or remote). 
+ """ + # if not s3_path.startswith("s3://"): + # raise ValueError(f"Invalid S3 path: {s3_path}") + + logger.info('Reading CDX from %s', index_path) + + with fsspec.open(index_path, 'rt', compression='gzip' if index_path.endswith('.gz') else None) as f: + for line in f: + try: + yield read_cdx_line(line, warc_download_prefix) + except Exception: + # Skip malformed lines + logger.error('Invalid CDX line: %s', line) + continue + + logger.info(f'CDX completed from {index_path}') diff --git a/cdx_toolkit/warcer_by_cdx/fsspec_warcer.py b/cdx_toolkit/warcer_by_cdx/fsspec_warcer.py new file mode 100644 index 0000000..7e0fefe --- /dev/null +++ b/cdx_toolkit/warcer_by_cdx/fsspec_warcer.py @@ -0,0 +1,179 @@ +import json +import logging +from typing import Dict, Iterable, List, Optional + +import cdx_toolkit +from concurrent.futures import ThreadPoolExecutor, as_completed + +from warcio.recordloader import ArcWarcRecord + +from cdx_toolkit.warcer_by_cdx.cdx_utils import get_index_as_string_from_path +from cdx_toolkit.warcer_by_cdx.warc_utils import get_resource_record_from_path + + +logger = logging.getLogger(__name__) + + +def filter_warc_by_cdx_via_fsspec( + index_paths: List[str], + prefix_path: str, + writer_info: Dict, + writer_subprefix: Optional[str] = None, + write_paths_as_resource_records: Optional[List[str]] = None, + write_paths_as_resource_records_metadata: Optional[List[str]] = None, + limit: int = 0, + log_every_n: int = 1000, + warc_download_prefix: Optional[str] = None, + n_parallel: int = 1, + writer_kwargs: Optional[Dict] = None, +) -> int: + writer = cdx_toolkit.warc.get_writer( + prefix_path, + writer_subprefix, + writer_info, + **(writer_kwargs if writer_kwargs else {}), + ) + + # Iterate over index files + records_n = 0 + for index_path in index_paths: + logger.info('Filtering WARC based on CDX from %s', index_path) + + # Read index completely (for the WARC resource record) + index = get_index_as_string_from_path(index_path) + + if not index: + # skip empty indicies + continue + + # Write file content from paths as resource records to WARC + if write_paths_as_resource_records: + logger.info('Writing resource records to WARC ... ') + + # Resource records are written at the beginning the WARC file. + for i, resource_record_path in enumerate(write_paths_as_resource_records): + logger.info(f'Writing resource record from {resource_record_path} ...') + resource_record = get_resource_record_from_path( + file_path=resource_record_path, + metadata_path=( + write_paths_as_resource_records_metadata[i] + if write_paths_as_resource_records_metadata + else None + ), + ) + writer.write_record(resource_record) + + logger.info(f'Resource records added: {len(write_paths_as_resource_records)}') + + # The index file holds all the information to download specific objects (file, offset, length etc.) 
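+        # A CDX line has the shape "SURT timestamp JSON" (hypothetical example; parsed in
+        # generate_caputure_objects_from_index below):
+        #   com,example)/ 20240101000000 {"url": "https://example.com/", "filename": "crawl-data/.../x.warc.gz", "offset": "123", "length": "456"}
+        # The "filename", "offset" and "length" fields locate each record inside its source WARC file.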
+ index_lines = index.splitlines() + index_limit = limit - records_n + + if index_limit > 0: + index_lines = index_lines[:index_limit] + + records_gen = fetch_records_from_index( + index_lines=index_lines, warc_download_prefix=warc_download_prefix, n_parallel=n_parallel + ) + # records_gen = tqdm(fetch_records_from_index( + # index_lines=index_lines, warc_download_prefix=cdx.warc_download_prefix, n_parallel=n_parallel + # ), desc="Fetch and write WARC", total=len(index_lines)) + + for record in records_gen: + writer.write_record(record) + records_n += 1 + + if (records_n % log_every_n) == 0: + logger.info(f'Record progress: {records_n:,} from {index_path}') + + if limit > 0 and records_n >= limit: + # stop index loop + logger.info('Limit reached') + break + + logger.info('Filtering completed (index file: %s)', index_path) + + writer.close() + + return records_n + + +def fetch_single_record(obj): + """Fetch a single WARC record with error handling.""" + url = obj['url'] + timestamp = obj['timestamp'] + + try: + record = obj.fetch_warc_record() + if obj.is_revisit(): + logger.warning('revisit record being resolved for url %s %s', url, timestamp) + return record + except RuntimeError: # pragma: no cover + logger.warning('skipping capture for RuntimeError 404: %s %s', url, timestamp) + return None + + +def fetch_records_from_index( + index_lines: List[str], warc_download_prefix=None, limit: int = 0, n_parallel: int = 1 +) -> Iterable[ArcWarcRecord]: + """Fetch WARC records based on CDX index.""" + + if n_parallel <= 1: + # Sequential processing + for obj in generate_caputure_objects_from_index( + index_lines=index_lines, + warc_download_prefix=warc_download_prefix, + limit=limit, + ): + record = fetch_single_record(obj) + if record is not None: + yield record + else: + # Parallel processing + logger.info(f'Fetch records in parallel with {n_parallel=}') + objects = list( + generate_caputure_objects_from_index( + index_lines=index_lines, + warc_download_prefix=warc_download_prefix, + limit=limit, + ) + ) + + with ThreadPoolExecutor(max_workers=n_parallel) as executor: + # Submit all tasks + future_to_obj = {executor.submit(fetch_single_record, obj): obj for obj in objects} + + # Yield results as they complete + for future in as_completed(future_to_obj): + record = future.result() + if record is not None: + yield record + + +def generate_caputure_objects_from_index( + index_lines: List[str], warc_download_prefix=None, limit: int = 0, progress_bar: bool = False +) -> Iterable[cdx_toolkit.CaptureObject]: + """Read CDX index and generate CaptureObject objects.""" + + if limit > 0: + index_lines = index_lines[:limit] + + # if progress_bar: + # index_lines = tqdm(index_lines, desc="Extracting from WARC", total=len(index_lines)) + + for i, line in enumerate(index_lines, 1): + cols = line.split(' ', maxsplit=2) + + if len(cols) == 3: + # TODO can there be a different format? 
+                # surt, timestamp, json_data = cols
+                #
+                # CC does not seem to follow the specification from https://iipc.github.io/warc-specifications/specifications/cdx-format/cdx-2015/
+                # > The default first line of a CDX file is:
+                # > CDX A b e a m s c k r V v D d g M n
+                data = json.loads(cols[2])
+                data['timestamp'] = cols[1]
+            else:
+                raise ValueError(f'Cannot parse line: {line}')
+
+            yield cdx_toolkit.CaptureObject(data=data, wb=None, warc_download_prefix=warc_download_prefix)
diff --git a/cdx_toolkit/warcer_by_cdx/warc_utils.py b/cdx_toolkit/warcer_by_cdx/warc_utils.py
new file mode 100644
index 0000000..0eb5b71
--- /dev/null
+++ b/cdx_toolkit/warcer_by_cdx/warc_utils.py
@@ -0,0 +1,78 @@
+from io import BytesIO
+import json
+from pathlib import Path
+import fsspec
+from warcio.recordloader import ArcWarcRecord
+from warcio import WARCWriter
+
+from typing import Optional, Union
+
+import mimetypes
+
+def get_bytes_from_warc_record(
+    record,
+    warc_version: str = '1.0',
+    gzip: bool = False,
+    ):
+    buffer = BytesIO()
+    warc_writer = WARCWriter(buffer, gzip=gzip, warc_version=warc_version)
+    warc_writer.write_record(record)
+
+    return buffer.getvalue()
+
+def get_resource_record_from_path(
+    file_path: Union[str, Path],
+    metadata_path: Optional[Union[str, Path]] = None,
+    ) -> ArcWarcRecord:
+    """Build a WARC resource record for a file path and an optional metadata path.
+
+    The metadata file must be valid JSON and may contain the following fields:
+    - warc_content_type
+    - uri
+    - http_headers
+    - warc_headers_dict
+
+    If uri is not provided in the metadata, the file_path is used.
+    If warc_content_type is not provided in the metadata, the type is guessed.
+    """
+    # Cast to string
+    file_path = str(file_path)
+
+    with fsspec.open(file_path, "rb") as f:
+        file_bytes = BytesIO(f.read())
+
+    if metadata_path:
+        # Load metadata from path
+        metadata_path = str(metadata_path)
+
+        if not metadata_path.endswith(".json"):
+            raise ValueError("Metadata must be provided as a JSON file (path ending in .json)")
+
+        with fsspec.open(metadata_path) as f:
+            metadata = json.load(f)
+
+        warc_content_type = metadata.get("warc_content_type", None)
+        uri = metadata.get("uri", None)
+        http_headers = metadata.get("http_headers", None)
+        warc_headers_dict = metadata.get("warc_headers_dict", None)
+    else:
+        # Without metadata
+        warc_content_type = None
+        uri = None
+        http_headers = None
+        warc_headers_dict = None
+
+    if warc_content_type is None:
+        warc_content_type = mimetypes.guess_type(file_path)[0]
+
+    if uri is None:
+        uri = file_path
+
+    return WARCWriter(None).create_warc_record(
+        uri=uri,
+        record_type='resource',
+        payload=file_bytes,
+        http_headers=http_headers,
+        warc_content_type=warc_content_type,
+        warc_headers_dict=warc_headers_dict,
+    )
\ No newline at end of file
diff --git a/examples/iter-and-warc.py b/examples/iter-and-warc.py
index 73ea3dd..b346d3f 100755
--- a/examples/iter-and-warc.py
+++ b/examples/iter-and-warc.py
@@ -32,3 +32,5 @@
     writer.write_record(record)
     print(' wrote', url)
 
+
+writer.close()
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index 2d0357f..e6d8a91 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -3,12 +3,16 @@
 requests==2.25.1
 warcio==1.7.4
+fsspec[s3]
+surt>=0.3.1
+tqdm>=4.67.1
 
 # used by Makefile
 pytest==6.2.4
 pytest-cov==2.12.1
 pytest-sugar==0.9.4
 coveralls==3.1.0
+botocore>=1.39.11
 
 # packaging
 twine==3.4.1
diff --git a/scripts/cdx_iter b/scripts/cdx_iter
index 8b0c5a3..99445c0 100644
--- a/scripts/cdx_iter
+++ b/scripts/cdx_iter
@@ -143,6 +143,8 @@ elif args.warc:
         if
obj.is_revisit(): LOGGER.warning('revisit record being resolved for url %s %s', url, timestamp) writer.write_record(record) + + writer.close() else: for obj in cdx.iter(args.url, **kwargs): printme = winnow_fields(obj) diff --git a/setup.py b/setup.py index bceaefb..7d5c205 100755 --- a/setup.py +++ b/setup.py @@ -2,17 +2,15 @@ from os import path -from setuptools import setup +from setuptools import setup, find_packages -packages = [ - 'cdx_toolkit', -] +packages = find_packages(include=['cdx_toolkit*']) # remember: keep requires synchronized with requirements.txt -requires = ['requests', 'warcio'] +requires = ['requests', 'warcio', 'fsspec[s3]', 'aioboto3', 'surt', 'tqdm'] -test_requirements = ['pytest', 'pytest-cov'] +test_requirements = ['pytest', 'pytest-cov', 'boto3'] package_requirements = ['twine', 'setuptools', 'setuptools-scm'] diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..b662111 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,25 @@ +from pathlib import Path +import pytest +import boto3 +from botocore.exceptions import NoCredentialsError, ClientError + +TEST_DATA_PATH = Path(__file__).parent / "data" + + +def check_aws_s3_access(): + """Check if AWS S3 access is available.""" + try: + s3_client = boto3.client('s3') + # Try to list buckets as a simple check + s3_client.list_buckets() + return True + except (NoCredentialsError, ClientError): + return False + + +def requires_aws_s3(func): + """Pytest decorator that skips test if AWS S3 access is not available.""" + return pytest.mark.skipif( + not check_aws_s3_access(), + reason="AWS S3 access not available (no credentials or permissions)" + )(func) \ No newline at end of file diff --git a/tests/data/filter_cdx/whitelist_10_surts.txt b/tests/data/filter_cdx/whitelist_10_surts.txt new file mode 100644 index 0000000..0754ed2 --- /dev/null +++ b/tests/data/filter_cdx/whitelist_10_surts.txt @@ -0,0 +1,10 @@ +com,example)/ +edu,si)/ +com,youtube)/ +gov,archives)/ +gov,census)/ +com,741,onlinedegrees)/online_university_degree_program.html +com,72pines,star)/2007/06/25/%e6%8f%90%e5%8f%96%e5%85%ac%e7%a7%af%e9%87%91/trackback +fr,missiondefrance,bibliotheque)/ +fr,mnhn,biodiv)/fr/taxonomy +fr,mobilierpourchr,wip)/produit/t-837 diff --git a/tests/data/filter_cdx/whitelist_10_urls.txt b/tests/data/filter_cdx/whitelist_10_urls.txt new file mode 100644 index 0000000..475f73d --- /dev/null +++ b/tests/data/filter_cdx/whitelist_10_urls.txt @@ -0,0 +1,10 @@ +example.com +si.edu +youtube.com +archive.gov +census.gov +onlinedegrees.741.com/online_university_degree_program.html +star.72pines.com/2007/06/25/%e6%8f%90%e5%8f%96%e5%85%ac%e7%a7%af%e9%87%91/trackback +bibliotheque.missiondefrance.fr +biodiv.mnhn.fr/fr/taxonomy +wip.mobilierpourchr.fr/produit/t-837 diff --git a/tests/data/filter_cdx/whitelist_11_surts.txt b/tests/data/filter_cdx/whitelist_11_surts.txt new file mode 100644 index 0000000..a2ee272 --- /dev/null +++ b/tests/data/filter_cdx/whitelist_11_surts.txt @@ -0,0 +1,11 @@ +com,example)/ +edu,si)/ +com,youtube)/ +gov,archives)/ +gov,census)/ +com,741,onlinedegrees)/online_university_degree_program.html +com,72pines,star)/2007/06/25/%e6%8f%90%e5%8f%96%e5%85%ac%e7%a7%af%e9%87%91/trackback +fr,missiondefrance,bibliotheque)/ +fr,mnhn,biodiv)/fr/taxonomy +fr,mobilierpourchr,wip)/produit/t-837 +fr,tie-up)/ \ No newline at end of file diff --git a/tests/data/warc_by_cdx/filtered_CC-MAIN-2024-30_cdx-00187.gz 
b/tests/data/warc_by_cdx/filtered_CC-MAIN-2024-30_cdx-00187.gz new file mode 100644 index 0000000..47941ed Binary files /dev/null and b/tests/data/warc_by_cdx/filtered_CC-MAIN-2024-30_cdx-00187.gz differ diff --git a/tests/data/warc_by_cdx/filtered_CC-MAIN-2024-30_cdx-00187.metadata.json b/tests/data/warc_by_cdx/filtered_CC-MAIN-2024-30_cdx-00187.metadata.json new file mode 100644 index 0000000..6a4103f --- /dev/null +++ b/tests/data/warc_by_cdx/filtered_CC-MAIN-2024-30_cdx-00187.metadata.json @@ -0,0 +1,4 @@ +{ + "uri": "filter_cdx.cdx.gz", + "warc_content_type": "application/cdx" +} \ No newline at end of file diff --git a/tests/unit/test_warc.py b/tests/unit/test_warc.py index e5df43f..e2474ff 100644 --- a/tests/unit/test_warc.py +++ b/tests/unit/test_warc.py @@ -1,6 +1,5 @@ import cdx_toolkit.warc - def test_wb_redir_to_original(): location = 'https://web.archive.org/web/20110209062054id_/http://commoncrawl.org/' ret = 'http://commoncrawl.org/' diff --git a/tests/warc_by_cdx/__init__.py b/tests/warc_by_cdx/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/warc_by_cdx/test_cdx_utils.py b/tests/warc_by_cdx/test_cdx_utils.py new file mode 100644 index 0000000..414a867 --- /dev/null +++ b/tests/warc_by_cdx/test_cdx_utils.py @@ -0,0 +1,28 @@ +import fsspec +import pytest +from cdx_toolkit.warcer_by_cdx.cdx_utils import get_index_as_string_from_path, read_cdx_line +from tests.conftest import TEST_DATA_PATH + + +def test_get_index_as_string_from_path(): + cdx_path = TEST_DATA_PATH / "warc_by_cdx/filtered_CC-MAIN-2024-30_cdx-00187.gz" + + index = get_index_as_string_from_path(cdx_path) + + assert len(index) == 568010 + + +def test_get_index_as_string_from_path_with_fs(): + fs, cdx_path = fsspec.url_to_fs(TEST_DATA_PATH / "warc_by_cdx/filtered_CC-MAIN-2024-30_cdx-00187.gz") + + index = get_index_as_string_from_path(cdx_path, fs) + + assert len(index) == 568010 + +get_index_as_string_from_path + +def test_read_cdx_line_error(): + with pytest.raises(ValueError) as ec_info: + read_cdx_line("this_is_a_bad_CDX-line", warc_download_prefix="http://") + + assert ec_info.match("Cannot parse line") \ No newline at end of file diff --git a/tests/warc_by_cdx/test_filter_cdx.py b/tests/warc_by_cdx/test_filter_cdx.py new file mode 100644 index 0000000..8eb8980 --- /dev/null +++ b/tests/warc_by_cdx/test_filter_cdx.py @@ -0,0 +1,164 @@ +import pytest + +from cdx_toolkit.cli import main +from cdx_toolkit.filter_cdx import resolve_paths, validate_resolved_paths +from tests.conftest import requires_aws_s3, TEST_DATA_PATH + +fixture_path = TEST_DATA_PATH / 'filter_cdx' + + +@requires_aws_s3 +def test_cli_filter_cdx_with_surts(tmpdir, caplog): + # check if expected number is reached + index_path = 's3://commoncrawl/cc-index/collections' + index_glob = '/CC-MAIN-2024-30/indexes/cdx-00187.gz' + whitelist_path = fixture_path / 'whitelist_10_surts.txt' # matches on first domain and after 100k and 200k lines + + main( + args=[ + '-v', + '--limit=1140', + 'filter_cdx', + f'{index_path}', + f'{str(whitelist_path)}', + f'{tmpdir}', + '--filter-type=surt', + f'--input-glob={index_glob}' + ] + ) + + assert 'Limit reached' in caplog.text + + +@requires_aws_s3 +def test_cli_filter_cdx_with_urls(tmpdir, caplog): + # check if expected number is reached + index_path = 's3://commoncrawl/cc-index/collections' + index_glob = '/CC-MAIN-2024-30/indexes/cdx-00187.gz' + whitelist_path = fixture_path / 'whitelist_10_urls.txt' # matches on first domain and after 100k and 200k lines + + main( + args=[ + '-v', + 
'--limit=1140', + 'filter_cdx', + f'{index_path}', + f'{str(whitelist_path)}', + f'{tmpdir}', + '--filter-type=url', + f'--input-glob={index_glob}' + ] + ) + + assert 'Limit reached' in caplog.text + + +@requires_aws_s3 +def test_resolve_cdx_paths_from_cc_s3_to_local(tmpdir): + tmpdir = str(tmpdir) + base_path = 's3://commoncrawl/cc-index/collections' + glob_pattern = '/CC-MAIN-2016-30/indexes/*.gz' + + input_files, output_files = resolve_paths(base_path, glob_pattern, output_base_path=tmpdir) + + assert len(input_files) == len(output_files), 'Input and output count must be the same' + assert len(input_files) == 300, 'Invalid input count' + assert input_files[0] == base_path + '/CC-MAIN-2016-30/indexes/cdx-00000.gz', 'Invalid input file' + assert output_files[0] == tmpdir + '/CC-MAIN-2016-30/indexes/cdx-00000.gz', 'Invalid output file' + assert input_files[-1] == base_path + '/CC-MAIN-2016-30/indexes/cdx-00299.gz' + + +@requires_aws_s3 +def test_resolve_cdx_paths_from_cc_s3_to_another_s3(): + output_base_path = 's3://some-other-bucket/filter-cdx' + base_path = 's3://commoncrawl/cc-index/collections' + glob_pattern = '/CC-MAIN-2016-30/indexes/cdx-000*.gz' + + input_files, output_files = resolve_paths(base_path, glob_pattern, output_base_path=output_base_path) + + assert len(input_files) == len(output_files), 'Input and output count must be the same' + assert len(input_files) == 100, 'Invalid input count' + assert input_files[0] == base_path + '/CC-MAIN-2016-30/indexes/cdx-00000.gz', 'Invalid input file' + assert output_files[0] == output_base_path + '/CC-MAIN-2016-30/indexes/cdx-00000.gz', 'Invalid output file' + assert input_files[-1] == base_path + '/CC-MAIN-2016-30/indexes/cdx-00099.gz' + + +@requires_aws_s3 +def test_filter_cdx_nonexistent_surt_file_exits(tmpdir, caplog): + index_path = 's3://commoncrawl/cc-index/collections' + index_glob = '/CC-MAIN-2024-30/indexes/cdx-00187.gz' + nonexistent_surt_file = str(tmpdir / 'nonexistent_surts.txt') + + # Test that the command exits when SURT file doesn't exist + with pytest.raises(SystemExit) as exc_info: + main( + args=[ + '-v', + '--limit=1140', + 'filter_cdx', + f'{index_path}', + f'{nonexistent_surt_file}', + f'{tmpdir}', + f'--input-glob={index_glob}' + ] + ) + + assert exc_info.value.code == 1 + assert f'Filter file not found: {nonexistent_surt_file}' in caplog.text + + +def test_resolve_paths_no_files_found_exits(tmpdir, caplog): + # Test that resolve_paths exits when no files match the glob pattern + with pytest.raises(SystemExit) as exc_info: + resolve_paths(input_base_path=str(tmpdir), input_glob='/nonexistent-pattern-*.gz', output_base_path=str(tmpdir)) + + assert exc_info.value.code == 1 + assert 'No files found matching glob pattern:' in caplog.text + + +def test_validate_resolved_paths_existing_file_exits(tmpdir, caplog): + # Create an existing output file + existing_file = tmpdir / 'existing_output.txt' + existing_file.write_text('existing content', encoding='utf-8') + + output_paths = [str(existing_file)] + + # Test that validate_resolved_paths exits when output file exists and overwrite=False + with pytest.raises(SystemExit) as exc_info: + validate_resolved_paths(output_paths, overwrite=False) + + assert exc_info.value.code == 1 + assert f'Output file already exists: {str(existing_file)}' in caplog.text + assert 'Use --overwrite to overwrite existing files' in caplog.text + + +@requires_aws_s3 +def test_cli_filter_cdx_with_parallel_processing(tmpdir, caplog): + """Test that parallel processing works correctly and processes 
multiple files.""" + index_path = 's3://commoncrawl/cc-index/collections' + index_glob = '/CC-MAIN-2024-30/indexes/cdx-0018[78].gz' # Multiple files pattern + whitelist_path = fixture_path / 'whitelist_11_surts.txt' # Additonal entry for cdx-00188.gz + + # Run with parallel processing (2 workers) + main( + args=[ + '-v', + '--limit=10', + 'filter_cdx', + f'{index_path}', + f'{str(whitelist_path)}', + f'{tmpdir}', + '--filter-type=surt', + f'--input-glob={index_glob}', + '--parallel=2' + ] + ) + + # Check that multiple files were processed in parallel + assert 'Found' in caplog.text and 'files matching pattern' in caplog.text + assert 'File statistics for' in caplog.text + assert 'Total statistics:' in caplog.text + + # Should have processed multiple files (pattern matches 2 files: cdx-00187.gz and cdx-00188.gz) + file_stats_count = caplog.text.count('File statistics for') + assert file_stats_count == 2, 'Should process exactly 2 files with the glob pattern' diff --git a/tests/warc_by_cdx/test_matcher.py b/tests/warc_by_cdx/test_matcher.py new file mode 100644 index 0000000..b449fc9 --- /dev/null +++ b/tests/warc_by_cdx/test_matcher.py @@ -0,0 +1,314 @@ +import pytest +from cdx_toolkit.filter_cdx.matcher import TupleMatcher, TrieMatcher + + +@pytest.mark.parametrize( + 'prefixes,test_strings,expected_results', + [ + # Basic functionality + ( + ['http://', 'https://'], + ['http://example.com', 'https://example.com', 'ftp://example.com'], + [True, True, False], + ), + # Empty prefix list + ([], ['any string', '', 'test'], [False, False, False]), + # Single character prefixes + ( + ['a', 'b', 'c'], + ['apple', 'banana', 'cherry', 'dog', ''], + [True, True, True, False, False], + ), + # Overlapping prefixes + ( + ['test', 'testing', 'te'], + ['test', 'testing', 'tea', 'other'], + [True, True, True, False], + ), + # Unicode characters + ( + ['café', 'naïve', 'résumé'], + ['café au lait', 'naïve person', 'résumé.pdf', 'regular text'], + [True, True, True, False], + ), + # Special characters + ( + ['[test]', '.*', '\\n'], + ['[test] case', '.*regex', '\\newline', 'normal'], + [True, True, True, False], + ), + # Case sensitivity + ( + ['HTTP', 'Https'], + ['HTTP://example.com', 'https://example.com', 'HTTPS://EXAMPLE.COM'], + [True, False, True], + ), + # Very long prefixes + ( + ['a' * 1000], + ['a' * 1000 + 'suffix', 'a' * 999, 'b' * 1000], + [True, False, False], + ), + # Duplicate prefixes + ( + ['test', 'test', 'demo'], + ['testing', 'demo version', 'other'], + [True, True, False], + ), + # Prefixes that are substrings of each other + ( + ['ab', 'abc', 'abcd'], + ['ab', 'abc', 'abcd', 'abcde', 'a'], + [True, True, True, True, False], + ), + # Numbers and mixed content + ( + ['123', '4.56'], + ['123test', '4.56789', '789', 'test123'], + [True, True, False, False], + ), + # Whitespace handling (note: whitespace is stripped from prefixes, so " test" becomes "test") + ( + [' test', '\tindent', '\nline'], + ['test case', 'indented', 'line break', ' test case', 'nowhitespace'], + [True, True, True, False, False], + ), + ], +) +def test_matcher_approaches(prefixes, test_strings, expected_results): + """Test that TupleMatcher and TrieMatcher produce identical results.""" + tuple_matcher = TupleMatcher(prefixes) + trie_matcher = TrieMatcher(prefixes) + + for test_string, expected_result in zip(test_strings, expected_results): + tuple_result = tuple_matcher.matches(test_string) + trie_result = trie_matcher.matches(test_string) + + # Both matchers should agree with each other + assert tuple_result == 
trie_result, ( + f'TupleMatcher({tuple_result}) != TrieMatcher({trie_result}) ' + f"for prefixes {prefixes} and string '{test_string}'" + ) + + # Both should match the expected result + assert tuple_result == expected_result, ( + f"Expected {expected_result}, got {tuple_result} for prefixes {prefixes} and string '{test_string}'" + ) + + +@pytest.mark.parametrize( + 'invalid_prefixes,expected_error', + [ + # Empty string prefixes + ([''], 'Empty prefixes are not allowed'), + # Whitespace-only prefixes (should be stripped to empty and raise error) + ([' '], 'Empty prefixes are not allowed'), + (['\t\n '], 'Empty prefixes are not allowed'), + # None values + ([None], 'Prefix must be a string and not none'), + (['test', None, 'demo'], 'Prefix must be a string and not none'), + # Non-string types + ([123], 'Prefix must be a string and not none'), + (['test', 456, 'demo'], 'Prefix must be a string and not none'), + ([[], {}, set()], 'Prefix must be a string and not none'), + ], +) +def test_prefix_validation_errors(invalid_prefixes, expected_error): + """Test that invalid prefixes raise appropriate ValueErrors.""" + + with pytest.raises(ValueError, match=expected_error): + TupleMatcher(invalid_prefixes) + + with pytest.raises(ValueError, match=expected_error): + TrieMatcher(invalid_prefixes) + + +@pytest.mark.parametrize( + 'test_string,expected', + [ + ('test', True), + ('testing', True), + ('demo', True), + ('demonstration', True), + ('example', True), + ('examples', True), + (' test', False), # Leading whitespace in test string shouldn't match + ('other', False), + ], +) +def test_whitespace_stripping(test_string, expected): + """Test that whitespace is properly stripped from prefixes.""" + + # Prefixes with leading/trailing whitespace should be stripped + prefixes_with_whitespace = [' test ', '\tdemo\n', ' example '] + + tuple_matcher = TupleMatcher(prefixes_with_whitespace) + trie_matcher = TrieMatcher(prefixes_with_whitespace) + + tuple_result = tuple_matcher.matches(test_string) + trie_result = trie_matcher.matches(test_string) + + assert tuple_result == trie_result == expected, ( + f"Whitespace stripping test failed for '{test_string}': " + f'expected {expected}, got Tuple({tuple_result}), Trie({trie_result})' + ) + + +@pytest.mark.parametrize('test_string', ['anything', '', 'test', 'a', '123']) +def test_empty_prefix_list(test_string): + """Test with empty prefix list - should never match anything.""" + empty_prefixes = [] + + tuple_matcher = TupleMatcher(empty_prefixes) + trie_matcher = TrieMatcher(empty_prefixes) + + tuple_result = tuple_matcher.matches(test_string) + trie_result = trie_matcher.matches(test_string) + + # Both should return False for empty prefix list + assert not tuple_result and not trie_result, ( + f"Both matchers should return False for '{test_string}' with empty prefixes, " + f'got Tuple({tuple_result}), Trie({trie_result})' + ) + + +def test_empty_string_against_prefixes(): + """Test matching empty strings against non-empty prefixes.""" + non_empty_prefixes = ['test', 'demo', 'example'] + empty_test_string = '' + + tuple_matcher = TupleMatcher(non_empty_prefixes) + trie_matcher = TrieMatcher(non_empty_prefixes) + + tuple_result = tuple_matcher.matches(empty_test_string) + trie_result = trie_matcher.matches(empty_test_string) + + # Both should return False when testing empty string against non-empty prefixes + assert not tuple_result and not trie_result, ( + f'Both matchers should return False for empty string with non-empty prefixes, ' + f'got 
Tuple({tuple_result}), Trie({trie_result})' + ) + + +@pytest.mark.parametrize( + 'test_string,expected', + [ + ('a', True), + ('1', True), + ('!', True), + ('ab', True), + ('12', True), + ('!@', True), + ('other', False), + ('', False), + ], +) +def test_single_character_edge_cases(test_string, expected): + """Test single character prefixes and strings (without empty prefixes).""" + prefixes = ['a', '1', '!'] + + tuple_matcher = TupleMatcher(prefixes) + trie_matcher = TrieMatcher(prefixes) + + tuple_result = tuple_matcher.matches(test_string) + trie_result = trie_matcher.matches(test_string) + + assert tuple_result == trie_result == expected, ( + f"Mismatch for '{test_string}': Tuple({tuple_result}), Trie({trie_result}), Expected({expected})" + ) + + +def test_performance_with_many_prefixes(): + """Test with a large number of prefixes to ensure both matchers handle it.""" + # Create many prefixes + prefixes = [f'prefix_{i}' for i in range(1000)] + test_strings = ['prefix_500test', 'prefix_999', 'nomatch', 'prefix_1000'] + + tuple_matcher = TupleMatcher(prefixes) + trie_matcher = TrieMatcher(prefixes) + + for test_string in test_strings: + tuple_result = tuple_matcher.matches(test_string) + trie_result = trie_matcher.matches(test_string) + assert tuple_result == trie_result + + +@pytest.mark.parametrize( + 'test_string,expected', + [ + ('', False), + ('a', True), + ('ab', True), + ('abc', True), + ('abcd', True), + ('abcde', True), + ('abcdef', True), + ('b', False), + ('ac', True), + ], +) +def test_nested_prefixes(test_string, expected): + """Test with prefixes that are nested within each other.""" + prefixes = ['a', 'ab', 'abc', 'abcd', 'abcde'] + + tuple_matcher = TupleMatcher(prefixes) + trie_matcher = TrieMatcher(prefixes) + + tuple_result = tuple_matcher.matches(test_string) + trie_result = trie_matcher.matches(test_string) + + assert tuple_result == trie_result == expected, ( + f"Nested prefix test failed for '{test_string}': " + f'expected {expected}, got Tuple({tuple_result}), Trie({trie_result})' + ) + + +@pytest.mark.parametrize( + 'test_string,expected', + [ + ('🌟test', True), + ('café au lait', True), + ('𝓤𝓷𝓲𝓬𝓸𝓭𝓮 text', True), + ('regular', False), + ('', False), + ], +) +def test_unicode_edge_cases(test_string, expected): + """Test Unicode handling edge cases (without empty prefixes).""" + prefixes = ['🌟', 'café', '𝓤𝓷𝓲𝓬𝓸𝓭𝓮'] + + tuple_matcher = TupleMatcher(prefixes) + trie_matcher = TrieMatcher(prefixes) + + tuple_result = tuple_matcher.matches(test_string) + trie_result = trie_matcher.matches(test_string) + + assert tuple_result == trie_result == expected, ( + f"Unicode mismatch for '{test_string}': Tuple({tuple_result}), Trie({trie_result}), Expected({expected})" + ) + + +def test_with_list_and_tuple_inputs(): + """Test that both list and tuple inputs work identically.""" + prefixes_list = ['test', 'demo', 'example'] + prefixes_tuple = ('test', 'demo', 'example') + test_strings = ['testing', 'demo version', 'example.com', 'other'] + + # Test with list input + tuple_matcher_list = TupleMatcher(prefixes_list) + trie_matcher_list = TrieMatcher(prefixes_list) + + # Test with tuple input + tuple_matcher_tuple = TupleMatcher(prefixes_tuple) + trie_matcher_tuple = TrieMatcher(prefixes_tuple) + + for test_string in test_strings: + # All four matchers should give same result + results = [ + tuple_matcher_list.matches(test_string), + trie_matcher_list.matches(test_string), + tuple_matcher_tuple.matches(test_string), + trie_matcher_tuple.matches(test_string), + ] + + assert all(r == 
results[0] for r in results), f"Inconsistent results for '{test_string}': {results}" diff --git a/tests/warc_by_cdx/test_warc_by_cdx.py b/tests/warc_by_cdx/test_warc_by_cdx.py new file mode 100644 index 0000000..b9b1724 --- /dev/null +++ b/tests/warc_by_cdx/test_warc_by_cdx.py @@ -0,0 +1,252 @@ +import os +from typing import List, Optional + +import fsspec +from cdx_toolkit.cli import main +from cdx_toolkit.warcer_by_cdx.cdx_utils import ( + get_index_as_string_from_path, +) +from cdx_toolkit.warcer_by_cdx.fsspec_warcer import ( + generate_caputure_objects_from_index, +) +import pytest +from warcio.archiveiterator import ArchiveIterator + +from tests.conftest import requires_aws_s3, TEST_DATA_PATH + + +fixture_path = TEST_DATA_PATH / 'warc_by_cdx' + + +def assert_cli_warc_by_cdx(warc_download_prefix, base_prefix, caplog, extra_args: Optional[List[str]] = None): + # test cli and check output + index_path = fixture_path / 'filtered_CC-MAIN-2024-30_cdx-00187.gz' + resource_record_path = TEST_DATA_PATH / 'filter_cdx/whitelist_10_urls.txt' + + if extra_args is None: + extra_args = [] + + main( + args=[ + '-v', + '--cc', + '--limit=10', + 'warc_by_cdx', + str(index_path), + '--write-paths-as-resource-records', + str(resource_record_path), + f'--prefix={str(base_prefix)}/TEST_warc_by_index', + '--creator=foo', + '--operator=bob', + f'--warc-download-prefix={warc_download_prefix}', + ] + + extra_args + ) + + # Check log + assert 'Limit reached' in caplog.text + + # Validate extracted WARC + warc_filename = 'TEST_warc_by_index-000000.extracted.warc.gz' + warc_path = str(base_prefix) + '/' + warc_filename + + info_record = None + response_records = [] + + resource_record = None + resource_record_content = None + + with fsspec.open(warc_path, 'rb') as stream: + for record in ArchiveIterator(stream): + if record.rec_type == 'warcinfo': + info_record = record.content_stream().read().decode('utf-8') + + if record.rec_type == 'response': + response_records.append(record) + + if record.rec_type == 'resource': + resource_record = record + resource_record_content = record.content_stream().read().decode('utf-8') + + assert len(response_records) == 10 + + assert resource_record is not None + assert resource_record.length == 294, 'Invalid resource record' + assert resource_record_content[:10] == 'example.co', 'Invalid resource record' + assert resource_record_content[-20:-1] == 'hr.fr/produit/t-837', 'Invalid resource record' + + assert info_record is not None, 'Invalid info record' + assert 'operator: bob' in info_record, 'Invalid info record' + + +def test_cli_warc_by_cdx_over_http(tmpdir, caplog): + assert_cli_warc_by_cdx('https://data.commoncrawl.org', base_prefix=tmpdir, caplog=caplog) + + +def test_cli_warc_by_cdx_over_http_in_parallel(tmpdir, caplog): + assert_cli_warc_by_cdx( + 'https://data.commoncrawl.org', base_prefix=tmpdir, caplog=caplog, extra_args=['--parallel=3'] + ) + + +@requires_aws_s3 +def test_cli_warc_by_cdx_over_s3(tmpdir, caplog): + assert_cli_warc_by_cdx('s3://commoncrawl', base_prefix=tmpdir, caplog=caplog) + + +@requires_aws_s3 +def test_cli_warc_by_cdx_over_s3_to_s3(tmpdir, caplog): + assert_cli_warc_by_cdx( + 's3://commoncrawl', base_prefix='s3://commoncrawl-dev/cdx_toolkit/ci/test-outputs' + str(tmpdir), caplog=caplog + ) + + +@requires_aws_s3 +def test_cli_warc_by_cdx_over_s3_to_s3_in_parallel(tmpdir, caplog): + assert_cli_warc_by_cdx( + 's3://commoncrawl', + base_prefix='s3://commoncrawl-dev/cdx_toolkit/ci/test-outputs' + str(tmpdir), + caplog=caplog, + 
extra_args=['--parallel=3'], + ) + + +def test_get_caputure_objects_from_index(): + index_path = fixture_path / 'filtered_CC-MAIN-2024-30_cdx-00187.gz' + + for obj in generate_caputure_objects_from_index(get_index_as_string_from_path(index_path).splitlines()): + break + + assert obj.data['length'] == '9754' + + +def test_warc_by_cdx_no_index_files_found_exits(tmpdir, caplog): + # Test that warc_by_cdx exits when no index files match the glob pattern + with pytest.raises(SystemExit) as exc_info: + main( + args=[ + '-v', + '--cc', + '--cc-mirror=https://index.commoncrawl.org/', + 'warc_by_cdx', + f'{str(tmpdir)}', + f'--prefix={str(tmpdir)}/TEST', + '--index-glob=/nonexistent-pattern-*.gz', + ] + ) + + assert exc_info.value.code == 1 + assert 'no index files found' in caplog.text + + +def test_generate_caputure_objects_invalid_cdx_line(): + # Test invalid CDX line parsing (line with wrong number of columns) + with pytest.raises(ValueError): + list(generate_caputure_objects_from_index('invalid-format')) + + +def test_generate_caputure_objects_with_limit(): + # Test limit functionality in get_caputure_objects_from_index + index_path = fixture_path / 'filtered_CC-MAIN-2024-30_cdx-00187.gz' + index_content = get_index_as_string_from_path(index_path) + + # Count objects with limit=2 + objects = list(generate_caputure_objects_from_index(index_content.splitlines(), limit=2)) + + # Should stop after 2 objects + assert len(objects) == 2 + + +def test_warc_by_cdx_subprefix_and_metadata(tmpdir): + # Test subprefix functionality and creator/operator metadata + index_path = fixture_path / 'filtered_CC-MAIN-2024-30_cdx-00187.gz' + + main( + args=[ + '-v', + '--cc', + '--cc-mirror=https://index.commoncrawl.org/', + '--limit=1', + 'warc_by_cdx', + f'{str(index_path)}', + f'--prefix={str(tmpdir)}/TEST', + '--subprefix=SUB', + '--creator=test_creator', + '--operator=test_operator', + ] + ) + + # Check that WARC file was created with subprefix + warc_path = os.path.join(tmpdir, 'TEST-SUB-000000.extracted.warc.gz') + assert os.path.exists(warc_path) + + # Validate metadata in warcinfo record + info_record = None + with open(warc_path, 'rb') as stream: + for record in ArchiveIterator(stream): + if record.rec_type == 'warcinfo': + info_record = record.content_stream().read().decode('utf-8') + break + + assert info_record is not None + assert 'creator: test_creator' in info_record + assert 'operator: test_operator' in info_record + + +def test_warc_by_cdx_without_creator_operator(tmpdir): + # Test that creator and operator are optional (lines 44-47) + index_path = fixture_path / 'filtered_CC-MAIN-2024-30_cdx-00187.gz' + + main( + args=[ + '-v', + '--cc', + '--cc-mirror=https://index.commoncrawl.org/', + '--limit=1', + 'warc_by_cdx', + f'{str(index_path)}', + f'--prefix={str(tmpdir)}/TEST_NO_META', + ] + ) + + # Check that WARC file was created + warc_path = os.path.join(tmpdir, 'TEST_NO_META-000000.extracted.warc.gz') + assert os.path.exists(warc_path) + + # Validate that creator/operator are not in warcinfo record + info_record = None + with open(warc_path, 'rb') as stream: + for record in ArchiveIterator(stream): + if record.rec_type == 'warcinfo': + info_record = record.content_stream().read().decode('utf-8') + break + + assert info_record is not None + assert 'creator:' not in info_record + assert 'operator:' not in info_record + + +def test_resource_records_paths_mismatch(): + # Test if mismatch of number of paths for resource records and their metdata is raised. 
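+    # (hedged reading of the intended CLI contract: --write-paths-as-resource-records-metadata,
+    # when given, is expected to supply exactly one metadata path per resource record path)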
+ with pytest.raises(ValueError) as exc_info: + main( + args=[ + '-v', + '--cc', + 'warc_by_cdx', + 'foo/bar', + '--write-paths-as-resource-records', + 'resource1', + 'resource2', + '--write-paths-as-resource-records-metadata', + 'metadata2', + ] + ) + assert exc_info.match('Number of paths to resource records') + + +def test_metadata_paths_without_resource_records_paths(): + # Test if error of missing resource records paths is raised. + with pytest.raises(ValueError) as exc_info: + main(args=['-v', '--cc', 'warc_by_cdx', 'foo/bar', '--write-paths-as-resource-records-metadata', 'metadata2']) + assert exc_info.match('Metadata paths are set but') diff --git a/tests/warc_by_cdx/test_warc_by_cdx_aioboto3.py b/tests/warc_by_cdx/test_warc_by_cdx_aioboto3.py new file mode 100644 index 0000000..5cceead --- /dev/null +++ b/tests/warc_by_cdx/test_warc_by_cdx_aioboto3.py @@ -0,0 +1,229 @@ +import asyncio +from io import BytesIO +from typing import List, Optional + +import aioboto3 +import fsspec +from cdx_toolkit.cli import main +from warcio.archiveiterator import ArchiveIterator + +from tests.conftest import requires_aws_s3, TEST_DATA_PATH + +from warcio import WARCWriter +from cdx_toolkit.warcer_by_cdx.aioboto3_warcer import get_range_jobs_from_index_paths, write_warc +from cdx_toolkit.warcer_by_cdx.aioboto3_utils import RangePayload, _STOP + +fixture_path = TEST_DATA_PATH / 'warc_by_cdx' + + +def assert_cli_warc_by_cdx(warc_download_prefix, base_prefix, caplog, extra_args: Optional[List[str]] = None): + # test cli and check output + index_path = fixture_path / 'filtered_CC-MAIN-2024-30_cdx-00187.gz' + resource_record_path = TEST_DATA_PATH / 'filter_cdx/whitelist_10_urls.txt' + + if extra_args is None: + extra_args = [] + + main( + args=[ + '-v', + '--cc', + '--limit=10', + 'warc_by_cdx', + str(index_path), + '--write-paths-as-resource-records', + str(resource_record_path), + f'--prefix={str(base_prefix)}/TEST_warc_by_index', + '--creator=foo', + '--operator=bob', + f'--warc-download-prefix={warc_download_prefix}', + ] + + extra_args, + ) + + # Check log + assert 'Limit reached' in caplog.text + + # Validate extracted WARC + warc_filename = 'TEST_warc_by_index-000000-001.extracted.warc.gz' + warc_path = str(base_prefix) + '/' + warc_filename + + info_record = None + response_records = [] + response_contents = [] + + resource_record = None + resource_record_content = None + + with fsspec.open(warc_path, 'rb') as stream: + for record in ArchiveIterator(stream): + if record.rec_type == 'warcinfo': + info_record = record.content_stream().read().decode('utf-8') + + if record.rec_type == 'response': + response_records.append(record) + response_contents.append(record.content_stream().read().decode('utf-8', errors='ignore')) + + if record.rec_type == 'resource': + resource_record = record + resource_record_content = record.content_stream().read().decode('utf-8') + + assert len(response_records) == 10, 'Invalid record count' + # assert resource_record is not None + # assert resource_record.length == 568010 + + assert 'Catalogue en ligne Mission de France' in response_contents[0], 'Invalid response content' + assert 'dojo/dijit/themes/tundra/tundra' in response_contents[9], 'Invalid response content' + + assert info_record is not None, 'Invalid info record' + assert 'operator: bob' in info_record, 'Invalid info record' + + assert resource_record is not None + assert resource_record.length == 294, 'Invalid resource record' + assert resource_record_content[:10] == 'example.co', 'Invalid resource record' + 
assert resource_record_content[-20:-1] == 'hr.fr/produit/t-837', 'Invalid resource record' + + +@requires_aws_s3 +def test_cli_warc_by_cdx_over_s3_to_s3_in_parallel_aioboto3(tmpdir, caplog): + assert_cli_warc_by_cdx( + 's3://commoncrawl', + base_prefix='s3://commoncrawl-dev/cdx_toolkit/ci/test-outputs' + str(tmpdir), + caplog=caplog, + extra_args=[ + '--parallel=3', + '--implementation=aioboto3', + ], + ) + + +def test_warc_info(): + warc_version = '1.0' + gzip = False + file_handler = BytesIO() + filename = 'foo.warc' + + info = { + 'software': 'pypi_cdx_toolkit/123', + 'isPartOf': 'bar', + 'description': 'warc extraction based on CDX generated with: xx', + 'format': 'WARC file version 1.0', + } + + writer = WARCWriter(file_handler, gzip=gzip, warc_version=warc_version) + warcinfo = writer.create_warcinfo_record(filename, info) + + writer.write_record(warcinfo) + + file_value = file_handler.getvalue().decode('utf-8') + + assert 'pypi_cdx_toolkit/123' in file_value + + +@requires_aws_s3 +def test_write_warc_with_file_rotation(tmpdir): + """Test write_warc function with file size rotation""" + + async def run_test(): + # Setup test data + index_path = fixture_path / 'filtered_CC-MAIN-2024-30_cdx-00187.gz' + warc_download_prefix = 's3://commoncrawl' + prefix_path = f's3://commoncrawl-dev/cdx_toolkit/ci/test-outputs{tmpdir}/file_rotation_test' + + # Use small file size to force rotation (100 KB) + max_file_size = 100 * 1024 # 100 KB + + # Create asyncio queues + key_queue = asyncio.Queue() + item_queue = asyncio.Queue() + + # Writer info for WARC header + writer_info = { + 'software': 'cdx_toolkit test', + 'operator': 'test', + 'creator': 'test', + 'description': 'Test WARC with file rotation', + } + + # Setup S3 client + from botocore.config import Config + + boto_cfg = Config( + region_name='us-east-1', + retries={'max_attempts': 3, 'mode': 'standard'}, + connect_timeout=10, + read_timeout=120, + ) + + session = aioboto3.Session() + + async with session.client('s3', config=boto_cfg) as s3: + # Generate range jobs from CDX file + await get_range_jobs_from_index_paths( + key_queue=key_queue, + index_paths=[str(index_path)], + warc_download_prefix=warc_download_prefix, + num_fetchers=1, + limit=10, # Use 10 records to ensure we have enough data + ) + + # Collect all range jobs + range_jobs = [] + while not key_queue.empty(): + job = await key_queue.get() + if job is not _STOP: + range_jobs.append(job) + key_queue.task_done() + + # Create mock RangePayload objects with dummy data to simulate large content + # Each payload will be ~30KB to force multiple file rotations + dummy_data = b'A' * (30 * 1024) # 30KB of dummy data + + for job in range_jobs: + payload = RangePayload(job=job, data=dummy_data) + await item_queue.put(payload) + + # Add stop signal + await item_queue.put(_STOP) + + # Run write_warc function + await write_warc( + consumer_id=0, + item_queue=item_queue, + s3=s3, + max_attempts=3, + base_backoff_seconds=0.5, + prefix_path=prefix_path, + writer_info=writer_info, + max_file_size=max_file_size, + gzip=True, + ) + + # Verify that multiple WARC files were created + dest_bucket = 'commoncrawl-dev' + dest_prefix = f'cdx_toolkit/ci/test-outputs{tmpdir}/file_rotation_test' + + # List objects to find all created WARC files + response = await s3.list_objects_v2(Bucket=dest_bucket, Prefix=dest_prefix) + + warc_files = [] + if 'Contents' in response: + for obj in response['Contents']: + if obj['Key'].endswith('.extracted.warc.gz'): + warc_files.append(obj['Key']) + + # Assert that more than 
one WARC file was created + assert len(warc_files) == 4, f'Expected multiple WARC files, but found {len(warc_files)}: {warc_files}' + + # Verify filename pattern includes sequence numbers + for warc_file in warc_files: + filename = warc_file.split('/')[-1] + # Should match pattern: prefix-000000-XXX.extracted.warc.gz + assert '-000000-' in filename, f"Filename doesn't contain expected sequence pattern: {filename}" + + # Clean up created files + for warc_file in warc_files: + await s3.delete_object(Bucket=dest_bucket, Key=warc_file) + + # Run the async test + asyncio.run(run_test()) diff --git a/tests/warc_by_cdx/test_warc_from_fs.py b/tests/warc_by_cdx/test_warc_from_fs.py new file mode 100644 index 0000000..ccceaf0 --- /dev/null +++ b/tests/warc_by_cdx/test_warc_from_fs.py @@ -0,0 +1,53 @@ +from tests.conftest import requires_aws_s3 +from cdx_toolkit.warc import fetch_warc_record + + +def test_fetch_warc_record_from_http(): + encoding = 'utf-8' + capture = { + 'url': 'https://bibliotheque.missiondefrance.fr/index.php?lvl=bulletin_display&id=319', + 'mime': 'text/html', + 'mime-detected': 'application/xhtml+xml', + 'status': '200', + 'digest': 'D5K3FUWDRAOMMTJC2CTWV7L2ABFIJ5BP', + 'length': '9754', + 'offset': '111440525', + 'filename': 'crawl-data/CC-MAIN-2024-30/segments/1720763514759.37/warc/CC-MAIN-20240716142214-20240716172214-00337.warc.gz', + 'charset': 'UTF-8', + 'languages': 'fra', + 'timestamp': '20240716153155', + } + warc_download_prefix = 'https://data.commoncrawl.org' + + record = fetch_warc_record(capture, warc_download_prefix) + record_content = record.content_stream().read().decode(encoding, errors='ignore') + + assert record.rec_type == 'response' + assert record.length == 75825 + assert 'Catalogue en ligne Mission de France' in record_content + + +@requires_aws_s3 +def test_fetch_warc_record_from_s3(): + encoding = 'utf-8' + capture = { + 'url': 'https://bibliotheque.missiondefrance.fr/index.php?lvl=bulletin_display&id=319', + 'mime': 'text/html', + 'mime-detected': 'application/xhtml+xml', + 'status': '200', + 'digest': 'D5K3FUWDRAOMMTJC2CTWV7L2ABFIJ5BP', + 'length': '9754', + 'offset': '111440525', + 'filename': 'crawl-data/CC-MAIN-2024-30/segments/1720763514759.37/warc/CC-MAIN-20240716142214-20240716172214-00337.warc.gz', + 'charset': 'UTF-8', + 'languages': 'fra', + 'timestamp': '20240716153155', + } + warc_download_prefix = 's3://commoncrawl' + + record = fetch_warc_record(capture, warc_download_prefix) + record_content = record.content_stream().read().decode(encoding, errors='ignore') + + assert record.rec_type == 'response' + assert record.length == 75825 + assert 'Catalogue en ligne Mission de France' in record_content diff --git a/tests/warc_by_cdx/test_warc_utils.py b/tests/warc_by_cdx/test_warc_utils.py new file mode 100644 index 0000000..f95fbe5 --- /dev/null +++ b/tests/warc_by_cdx/test_warc_utils.py @@ -0,0 +1,31 @@ +import pytest +from cdx_toolkit.warcer_by_cdx.warc_utils import get_resource_record_from_path +from tests.conftest import TEST_DATA_PATH + + +def test_get_resource_record_from_path(): + resource_path = TEST_DATA_PATH / "filter_cdx/whitelist_10_urls.txt" + record = get_resource_record_from_path(resource_path) + + assert record.content_type == "text/plain" + + record_headers = dict(record.rec_headers.headers) + assert record_headers["WARC-Target-URI"] == str(resource_path) + + +def test_get_resource_record_from_path_with_metadata(): + resource_path = TEST_DATA_PATH / "warc_by_cdx/filtered_CC-MAIN-2024-30_cdx-00187.gz" + metadata_path = 
TEST_DATA_PATH / "warc_by_cdx/filtered_CC-MAIN-2024-30_cdx-00187.metadata.json" + + record = get_resource_record_from_path(resource_path, metadata_path) + + assert record.content_type == "application/cdx" + + record_headers = dict(record.rec_headers.headers) + assert record_headers["WARC-Target-URI"] == "filter_cdx.cdx.gz" + + +def test_get_resource_record_from_path_with_invalid_metadata_path(): + with pytest.raises(ValueError): + resource_path = TEST_DATA_PATH / "filter_cdx/whitelist_10_urls.txt" + get_resource_record_from_path(resource_path, "invalid_metadata.xy") \ No newline at end of file diff --git a/tests/warc_by_cdx/test_warc_writer.py b/tests/warc_by_cdx/test_warc_writer.py new file mode 100644 index 0000000..70ca45b --- /dev/null +++ b/tests/warc_by_cdx/test_warc_writer.py @@ -0,0 +1,126 @@ +from io import BytesIO +import fsspec +import pytest +import cdx_toolkit + +from tests.conftest import requires_aws_s3 + +from warcio import WARCWriter +from warcio.archiveiterator import ArchiveIterator + + +@pytest.mark.parametrize( + 'prefix,gzip', + [ + pytest.param('test-prefix', False, id='File name prefix on local'), + pytest.param('test-prefix', True, id='File name prefix on local with gzip'), + # raised FileNotFound error (parent dir does not exist) + # pytest.param("test-prefix-folder/file-prefix", None, id="Folder as prefix"), + ], +) +def test_write_to_local(prefix, gzip, tmpdir): + info = { + 'software': 'pypi_cdx_toolkit/test', + 'description': 'test', + 'format': 'WARC file version 1.0', + } + encoding = 'utf-8' + full_prefix = str(tmpdir) + '/' + prefix + fs, fs_prefix_path = fsspec.url_to_fs(full_prefix) + + writer = cdx_toolkit.warc.get_writer(full_prefix, None, info, gzip=gzip) + + # single record + input_resource_record_text = 'foo bar text' + writer.write_record( + WARCWriter(None).create_warc_record( + uri='foo/bar', + record_type='resource', + payload=BytesIO(input_resource_record_text.encode(encoding)), + warc_content_type='text/plain', + ) + ) + writer.close() + + # Check that WARC file was created + warc_path = fs_prefix_path + '-000000.extracted.warc' + if gzip: + warc_path += '.gz' + + assert fs.exists(warc_path) + + # Validate that creator/operator are not in warcinfo record + info_record = None + resource_record = None + with open(warc_path, 'rb') as stream: + for record in ArchiveIterator(stream): + if record.rec_type == 'warcinfo': + info_record = record.content_stream().read().decode(encoding) + + if record.rec_type == 'resource': + resource_record = record.content_stream().read().decode(encoding) + break + + assert resource_record is not None + assert info_record is not None + + assert 'description: test' in info_record + assert resource_record == input_resource_record_text + + +@requires_aws_s3 +@pytest.mark.parametrize( + 'prefix', + [ + pytest.param('s3://commoncrawl-dev/cdx_toolkit/ci/test-outputs', id='S3 prefix'), + ], +) +def test_write_to_s3(prefix, tmpdir): + info = { + 'software': 'pypi_cdx_toolkit/test', + 'description': 'test', + 'format': 'WARC file version 1.0', + } + encoding = 'utf-8' + full_prefix = prefix + str(tmpdir) # append tmp dir on S3 + fs, fs_prefix_path = fsspec.url_to_fs(full_prefix) + + # remove all existing paths from S3 dir + if fs.exists(prefix): + fs.rm(prefix, recursive=True) + + writer = cdx_toolkit.warc.get_writer(full_prefix, None, info) + + # single record + input_resource_record_text = 'foo bar text' + writer.write_record( + WARCWriter(None).create_warc_record( + uri='foo/bar', + record_type='resource', + 
payload=BytesIO(input_resource_record_text.encode(encoding)), + warc_content_type='text/plain', + ) + ) + writer.close() + + # Check that WARC file was created + warc_path = fs_prefix_path + '-000000.extracted.warc.gz' + assert fs.exists(warc_path) + + # Validate that creator/operator are not in warcinfo record + info_record = None + resource_record = None + with fs.open(warc_path, 'rb') as stream: + for record in ArchiveIterator(stream): + if record.rec_type == 'warcinfo': + info_record = record.content_stream().read().decode(encoding) + + if record.rec_type == 'resource': + resource_record = record.content_stream().read().decode(encoding) + break + + assert resource_record is not None + assert info_record is not None + + assert 'description: test' in info_record + assert resource_record == input_resource_record_text