diff --git a/docs/beam_integration.md b/docs/beam_integration.md deleted file mode 100644 index 5314ebb..0000000 --- a/docs/beam_integration.md +++ /dev/null @@ -1,509 +0,0 @@ -# Apache Beam Integration - -ArrayRecord provides comprehensive Apache Beam integration for large-scale data processing and conversion workflows. This integration enables you to process ArrayRecord files in distributed Beam pipelines on various runners including Google Cloud Dataflow. - -## Overview - -The Beam integration provides: - -- **PTransform for writing**: `WriteToArrayRecord` for disk-based output -- **DoFn for GCS**: `ConvertToArrayRecordGCS` for cloud storage output -- **Pre-built pipelines**: Ready-to-use conversion utilities -- **Format conversion**: Seamless TFRecord to ArrayRecord conversion - -## Installation - -Install ArrayRecord with Beam support: - -```bash -pip install array_record[beam] -``` - -This includes: -- Apache Beam with GCP support (>=2.53.0) -- Google Cloud Storage client library -- TensorFlow for TFRecord compatibility - -## Quick Start - -### Basic File Conversion - -Convert TFRecord files to ArrayRecord format: - -```python -from array_record.beam.pipelines import convert_tf_to_arrayrecord_disk -from apache_beam.options.pipeline_options import PipelineOptions - -# Convert TFRecords to ArrayRecords on local disk -pipeline = convert_tf_to_arrayrecord_disk( - num_shards=4, - args=['--input', '/path/to/tfrecords/*', '--output', '/path/to/arrayrecords/output'], - pipeline_options=PipelineOptions() -) - -result = pipeline.run() -result.wait_until_finish() -``` - -### Cloud Storage Conversion - -Convert files and upload to Google Cloud Storage: - -```python -from array_record.beam.pipelines import convert_tf_to_arrayrecord_gcs -from apache_beam.options.pipeline_options import PipelineOptions - -pipeline = convert_tf_to_arrayrecord_gcs( - args=[ - '--input', 'gs://source-bucket/tfrecords/*', - '--output', 'gs://dest-bucket/arrayrecords/' - ], - pipeline_options=PipelineOptions([ - '--runner=DataflowRunner', - '--project=my-project', - '--region=us-central1' - ]) -) - -result = pipeline.run() -result.wait_until_finish() -``` - -## Core Components - -### WriteToArrayRecord PTransform - -For writing ArrayRecord files to disk-based filesystems: - -```python -import apache_beam as beam -from array_record.beam.arrayrecordio import WriteToArrayRecord - -with beam.Pipeline() as pipeline: - # Create some data - data = pipeline | beam.Create([ - b'record 1', - b'record 2', - b'record 3' - ]) - - # Write to ArrayRecord files - data | WriteToArrayRecord( - file_path_prefix='/tmp/output', - file_name_suffix='.array_record', - num_shards=2 - ) -``` - -**Important**: `WriteToArrayRecord` only works with local/disk-based paths, not cloud storage URLs. 
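The shards written by `WriteToArrayRecord` can be opened with the regular ArrayRecord reader. Below is a minimal sketch (assuming the pipeline above produced shards matching `/tmp/output*.array_record`, and that `ArrayRecordDataSource` accepts a list of shard paths):

```python
import glob

from array_record.python import array_record_data_source

# Gather whatever shards the pipeline wrote and count the records they contain.
shards = sorted(glob.glob('/tmp/output*.array_record'))
with array_record_data_source.ArrayRecordDataSource(shards) as ds:
    print(f'{len(ds)} records across {len(shards)} shards')
```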
- -### ConvertToArrayRecordGCS DoFn - -For writing ArrayRecord files to Google Cloud Storage: - -```python -import apache_beam as beam -from array_record.beam.dofns import ConvertToArrayRecordGCS - -# Prepare data as (filename, records) tuples -file_data = [ - ('file1.tfrecord', [b'record1', b'record2']), - ('file2.tfrecord', [b'record3', b'record4']) -] - -with beam.Pipeline() as pipeline: - data = pipeline | beam.Create(file_data) - - data | beam.ParDo( - ConvertToArrayRecordGCS(), - path='gs://my-bucket/arrayrecords/', - file_path_suffix='.array_record' - ) -``` - -## Pre-built Pipelines - -### Disk-based Conversion - -```python -from array_record.beam.pipelines import convert_tf_to_arrayrecord_disk - -# Convert with specific number of shards -pipeline = convert_tf_to_arrayrecord_disk( - num_shards=10, - args=['--input', 'gs://bucket/tfrecords/*', '--output', '/local/arrayrecords/output'] -) -``` - -### Matching Shard Count - -Convert while preserving the number of input files: - -```python -from array_record.beam.pipelines import convert_tf_to_arrayrecord_disk_match_shards - -# Output will have same number of files as input -pipeline = convert_tf_to_arrayrecord_disk_match_shards( - args=['--input', '/path/to/tfrecords/*', '--output', '/path/to/arrayrecords/output'] -) -``` - -### GCS Conversion - -```python -from array_record.beam.pipelines import convert_tf_to_arrayrecord_gcs - -pipeline = convert_tf_to_arrayrecord_gcs( - overwrite_extension=True, # Replace .tfrecord with .array_record - args=[ - '--input', 'gs://input-bucket/tfrecords/*', - '--output', 'gs://output-bucket/arrayrecords/' - ] -) -``` - -## Command Line Usage - -Run conversions directly from the command line: - -```bash -# Local conversion -python -m array_record.beam.pipelines \ - --input /path/to/tfrecords/* \ - --output /path/to/arrayrecords/output \ - --num_shards 5 - -# GCS conversion with Dataflow -python -m array_record.beam.pipelines \ - --input gs://source-bucket/tfrecords/* \ - --output gs://dest-bucket/arrayrecords/ \ - --runner DataflowRunner \ - --project my-project \ - --region us-central1 \ - --temp_location gs://my-bucket/temp -``` - -## Google Cloud Dataflow - -### Basic Dataflow Setup - -```python -from apache_beam.options.pipeline_options import PipelineOptions -from array_record.beam.pipelines import convert_tf_to_arrayrecord_gcs - -dataflow_options = PipelineOptions([ - '--runner=DataflowRunner', - '--project=my-project', - '--region=us-central1', - '--temp_location=gs://my-bucket/temp', - '--staging_location=gs://my-bucket/staging', - '--max_num_workers=20', - '--disk_size_gb=100' -]) - -pipeline = convert_tf_to_arrayrecord_gcs( - args=[ - '--input', 'gs://large-dataset/tfrecords/*', - '--output', 'gs://processed-data/arrayrecords/' - ], - pipeline_options=dataflow_options -) - -result = pipeline.run() -result.wait_until_finish() -``` - -### Monitoring Dataflow Jobs - -Monitor your conversion jobs through: -- [Google Cloud Console](https://console.cloud.google.com/dataflow) -- Beam metrics and logging -- Custom monitoring DoFns - -```python -class MonitoringDoFn(beam.DoFn): - def __init__(self): - self.records_processed = Metrics.counter('conversion', 'records_processed') - - def process(self, element): - self.records_processed.inc() - yield element - -# Add to your pipeline -data | beam.ParDo(MonitoringDoFn()) | ... 
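# Note: MonitoringDoFn assumes `from apache_beam.metrics import Metrics`
# (the import is shown in the Progress Monitoring example later in this guide),
# and the trailing `| ...` is a placeholder for your downstream transforms.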
-``` - -## Custom Pipelines - -### Reading ArrayRecord Files - -```python -import apache_beam as beam -from array_record.python import array_record_data_source - -class ReadArrayRecordDoFn(beam.DoFn): - def process(self, file_path): - with array_record_data_source.ArrayRecordDataSource(file_path) as ds: - for i in range(len(ds)): - yield ds[i] - -with beam.Pipeline() as pipeline: - files = pipeline | beam.Create(['file1.array_record', 'file2.array_record']) - records = files | beam.ParDo(ReadArrayRecordDoFn()) - - # Process records further - records | beam.Map(lambda x: len(x)) | beam.io.WriteToText('record_lengths.txt') -``` - -### Custom Conversion Logic - -```python -import apache_beam as beam -from array_record.python import array_record_module -import tempfile -import os - -class CustomArrayRecordWriterDoFn(beam.DoFn): - def process(self, element): - filename, records = element - - # Create temporary file - with tempfile.NamedTemporaryFile(delete=False, suffix='.array_record') as tmp: - writer = array_record_module.ArrayRecordWriter( - tmp.name, - 'group_size:1000,brotli:9' # Custom options - ) - - for record in records: - # Apply custom transformation - transformed = self.transform_record(record) - writer.write(transformed) - - writer.close() - - # Yield the result - yield (filename, tmp.name) - - def transform_record(self, record): - # Custom record transformation logic - return record.upper() - -# Use in pipeline -with beam.Pipeline() as pipeline: - file_data = pipeline | beam.Create([ - ('input1.txt', [b'hello', b'world']), - ('input2.txt', [b'foo', b'bar']) - ]) - - transformed = file_data | beam.ParDo(CustomArrayRecordWriterDoFn()) -``` - -## Performance Optimization - -### Writer Configuration - -Optimize ArrayRecord writer settings for your use case: - -```python -# For high compression (slower) -high_compression_options = 'group_size:10000,brotli:11,max_parallelism:1' - -# For fast writing (larger files) -fast_writing_options = 'group_size:1000,snappy,max_parallelism:8' - -# Balanced -balanced_options = 'group_size:2000,brotli:6,max_parallelism:4' -``` - -### Dataflow Optimization - -```python -dataflow_options = PipelineOptions([ - '--runner=DataflowRunner', - '--max_num_workers=50', - '--num_workers=10', - '--worker_machine_type=n1-highmem-4', - '--disk_size_gb=200', - '--use_public_ips=false', # For better network performance - '--network=my-vpc', - '--subnetwork=my-subnet' -]) -``` - -### Batch Processing - -Process files in batches for better resource utilization: - -```python -class BatchProcessingDoFn(beam.DoFn): - def process(self, element, batch_size=100): - filename, records = element - - # Process in batches - for i in range(0, len(records), batch_size): - batch = records[i:i + batch_size] - yield self.process_batch(filename, batch) - - def process_batch(self, filename, batch): - # Process batch of records - pass -``` - -## Error Handling and Monitoring - -### Robust Error Handling - -```python -import logging -from apache_beam.transforms.util import Reshuffle - -class RobustConversionDoFn(beam.DoFn): - def process(self, element): - try: - filename, records = element - - # Conversion logic here - result = self.convert_file(filename, records) - - yield beam.pvalue.TaggedOutput('success', result) - - except Exception as e: - logging.error(f"Failed to process {filename}: {e}") - yield beam.pvalue.TaggedOutput('failed', (filename, str(e))) - - def convert_file(self, filename, records): - # Your conversion logic - pass - -# Use with error handling -with beam.Pipeline() 
as pipeline: - input_data = pipeline | beam.Create(file_data) - - results = input_data | beam.ParDo(RobustConversionDoFn()).with_outputs( - 'success', 'failed', main='success' - ) - - # Handle successful conversions - results.success | beam.Map(lambda x: f"Converted: {x}") - - # Handle failures - results.failed | beam.Map(lambda x: f"Failed: {x[0]} - {x[1]}") -``` - -### Progress Monitoring - -```python -from apache_beam.metrics import Metrics - -class MonitoredConversionDoFn(beam.DoFn): - def __init__(self): - self.files_processed = Metrics.counter('conversion', 'files_processed') - self.records_written = Metrics.counter('conversion', 'records_written') - self.bytes_written = Metrics.counter('conversion', 'bytes_written') - - def process(self, element): - filename, records = element - - self.files_processed.inc() - - # Process file - total_bytes = 0 - for record in records: - # Write record - total_bytes += len(record) - self.records_written.inc() - - self.bytes_written.inc(total_bytes) - - yield f"Processed {filename}: {len(records)} records, {total_bytes} bytes" -``` - -## Best Practices - -### File Organization - -```python -# Use meaningful file patterns -input_pattern = 'gs://data-bucket/year=2024/month=*/day=*/tfrecords/*.tfrecord' -output_prefix = 'gs://processed-bucket/year=2024/arrayrecords/data' - -# Include metadata in filenames -output_filename = f"{output_prefix}-{datetime.now().strftime('%Y%m%d')}" -``` - -### Resource Management - -```python -# Use appropriate machine types -# For CPU-intensive compression: n1-highcpu-* -# For memory-intensive operations: n1-highmem-* -# For balanced workloads: n1-standard-* - -worker_options = [ - '--worker_machine_type=n1-standard-4', - '--disk_size_gb=100', - '--max_num_workers=20' -] -``` - -### Testing - -Test your pipelines locally before running on Dataflow: - -```python -# Local testing -local_options = PipelineOptions(['--runner=DirectRunner']) - -# Test with small dataset -test_pipeline = convert_tf_to_arrayrecord_disk( - num_shards=1, - args=['--input', 'test_data/*.tfrecord', '--output', 'test_output/'], - pipeline_options=local_options -) -``` - -## Troubleshooting - -### Common Issues - -1. **Import errors**: Ensure `array_record[beam]` is installed -2. **Permission errors**: Check GCS bucket permissions -3. **Out of disk space**: Increase worker disk size -4. **Memory errors**: Use appropriate machine types -5. 
**Slow performance**: Tune parallelism and batch sizes - -### Debug Tips - -```python -# Enable debug logging -import logging -logging.getLogger().setLevel(logging.DEBUG) - -# Add debug outputs -debug_data = input_data | beam.Map(lambda x: logging.info(f"Processing: {x}")) -``` - -### Performance Monitoring - -Use Dataflow's built-in monitoring or add custom metrics: - -```python -# Custom timing metrics -from apache_beam.metrics import Metrics -import time - -class TimedConversionDoFn(beam.DoFn): - def __init__(self): - self.conversion_time = Metrics.distribution('conversion', 'time_ms') - - def process(self, element): - start_time = time.time() - - # Conversion logic - result = self.convert(element) - - end_time = time.time() - self.conversion_time.update(int((end_time - start_time) * 1000)) - - yield result -``` diff --git a/docs/beam_reference.rst b/docs/beam_reference.rst deleted file mode 100644 index 69c0eef..0000000 --- a/docs/beam_reference.rst +++ /dev/null @@ -1,338 +0,0 @@ -Apache Beam Integration Reference -=================================== - -ArrayRecord provides comprehensive Apache Beam integration for large-scale data processing. This integration allows you to read from and write to ArrayRecord files in distributed Beam pipelines. - - -Overview --------- - -The ArrayRecord Beam integration provides: - -* **WriteToArrayRecord**: PTransform for writing ArrayRecord files to disk -* **ConvertToArrayRecordGCS**: DoFn for writing ArrayRecord files to Google Cloud Storage -* **Pipeline utilities**: Pre-built pipelines for common conversion tasks - -Installation ------------- - -Install ArrayRecord with Beam support: - -.. code-block:: bash - - pip install array_record[beam] - -This includes Apache Beam with GCP support and Google Cloud Storage client libraries. - -Core Components ---------------- - -WriteToArrayRecord -~~~~~~~~~~~~~~~~~~ - -A PTransform for writing data to ArrayRecord files on disk-based filesystems. - - A PTransform for writing data to ArrayRecord files on disk-based filesystems. - - **Important**: This sink only works with disk-based paths. It does not support cloud storage URLs (gs://, s3://, etc.) directly. - - **Parameters:** - - * ``file_path_prefix`` (str): Path prefix for output files - * ``file_name_suffix`` (str, optional): Suffix for output files (default: "") - * ``num_shards`` (int, optional): Number of output shards (default: 0 for auto) - * ``shard_name_template`` (str, optional): Template for shard naming - * ``coder`` (Coder, optional): Beam coder for encoding records - * ``compression_type`` (str, optional): Compression type - - **Example:** - - .. code-block:: python - - import apache_beam as beam - from array_record.beam.arrayrecordio import WriteToArrayRecord - - with beam.Pipeline() as pipeline: - data = pipeline | beam.Create([b'record1', b'record2', b'record3']) - data | WriteToArrayRecord( - file_path_prefix='/tmp/output', - file_name_suffix='.array_record', - num_shards=2 - ) - -DoFn Components -~~~~~~~~~~~~~~~ - -The DoFn components provide custom processing functions for Beam pipelines. - -ConvertToArrayRecordGCS -~~~~~~~~~~~~~~~~~~~~~~~ - -A DoFn that writes ArrayRecord files to Google Cloud Storage. - - A DoFn that writes ArrayRecord files to Google Cloud Storage. This DoFn processes - tuples of (filename, records) and uploads the resulting ArrayRecord files to GCS. 
- - **Parameters:** - - * ``path`` (str): GCS bucket path prefix (e.g., "gs://my-bucket/path/") - * ``write_dir`` (str, optional): Local temporary directory (default: "/tmp/") - * ``file_path_suffix`` (str, optional): File suffix (default: ".arrayrecord") - * ``overwrite_extension`` (bool, optional): Replace existing extension (default: False) - - **Example:** - - .. code-block:: python - - import apache_beam as beam - from array_beam.beam.dofns import ConvertToArrayRecordGCS - - def read_tfrecord_with_filename(file_pattern): - # Custom function to read TFRecords and return (filename, records) tuples - pass - - with beam.Pipeline() as pipeline: - file_data = pipeline | beam.Create([ - ('file1.tfrecord', [b'record1', b'record2']), - ('file2.tfrecord', [b'record3', b'record4']), - ]) - - file_data | beam.ParDo( - ConvertToArrayRecordGCS(), - path='gs://my-bucket/arrayrecords/', - file_path_suffix='.array_record' - ) - -Pipeline Utilities ------------------- - -The pipelines module provides several ready-to-use pipeline functions for common data conversion tasks. - -Pre-built Pipelines -~~~~~~~~~~~~~~~~~~~~ - -The pipelines module provides several ready-to-use pipeline functions: - -convert_tf_to_arrayrecord_disk -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -Converts TFRecord files to ArrayRecord files on disk with configurable sharding. - - Converts TFRecord files to ArrayRecord files on disk with configurable sharding. - - **Parameters:** - - * ``num_shards`` (int): Number of output shards - * ``args`` (list): Command line arguments - * ``pipeline_options`` (PipelineOptions): Beam pipeline options - - **Example:** - - .. code-block:: python - - from array_record.beam.pipelines import convert_tf_to_arrayrecord_disk - from apache_beam.options.pipeline_options import PipelineOptions - - # Convert with 4 output shards - pipeline = convert_tf_to_arrayrecord_disk( - num_shards=4, - args=['--input', 'gs://bucket/tfrecords/*', '--output', '/tmp/arrayrecords/output'], - pipeline_options=PipelineOptions() - ) - pipeline.run().wait_until_finish() - -convert_tf_to_arrayrecord_gcs -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -Converts TFRecord files to ArrayRecord files on Google Cloud Storage. - - Converts TFRecord files to ArrayRecord files on Google Cloud Storage. - - **Parameters:** - - * ``overwrite_extension`` (bool): Whether to replace file extensions - * ``file_path_suffix`` (str): Suffix for output files - * ``args`` (list): Command line arguments - * ``pipeline_options`` (PipelineOptions): Beam pipeline options - - **Example:** - - .. code-block:: python - - from array_record.beam.pipelines import convert_tf_to_arrayrecord_gcs - - pipeline = convert_tf_to_arrayrecord_gcs( - overwrite_extension=True, - file_path_suffix='.array_record', - args=['--input', 'gs://input-bucket/tfrecords/*', - '--output', 'gs://output-bucket/arrayrecords/'], - pipeline_options=PipelineOptions() - ) - -convert_tf_to_arrayrecord_disk_match_shards -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -Converts TFRecord files to ArrayRecord files with matching number of shards. - - Converts TFRecord files to ArrayRecord files with matching number of shards. - - **Example:** - - .. 
code-block:: python - - from array_record.beam.pipelines import convert_tf_to_arrayrecord_disk_match_shards - - # Output will have same number of files as input - pipeline = convert_tf_to_arrayrecord_disk_match_shards( - args=['--input', '/path/to/tfrecords/*', '--output', '/path/to/arrayrecords/output'] - ) - -Command Line Usage ------------------- - -The pipeline utilities can be run from the command line: - -.. code-block:: bash - - # Convert TFRecords to ArrayRecords on disk - python -m array_record.beam.pipelines \ - --input gs://bucket/tfrecords/* \ - --output /tmp/arrayrecords/output \ - --num_shards 10 - - # Convert to GCS - python -m array_record.beam.pipelines \ - --input gs://input-bucket/tfrecords/* \ - --output gs://output-bucket/arrayrecords/ \ - --runner DataflowRunner \ - --project my-project \ - --region us-central1 - -Configuration Options ---------------------- - -Writer Configuration -~~~~~~~~~~~~~~~~~~~~ - -Configure ArrayRecord writer options in your pipelines: - -.. code-block:: python - - from array_record.beam.arrayrecordio import _ArrayRecordSink - - # The sink uses 'group_size:1' by default - # You can modify this by subclassing _ArrayRecordSink - -Reader Configuration -~~~~~~~~~~~~~~~~~~~~ - -When reading ArrayRecord files in Beam pipelines, you can use the standard -ArrayRecord API within your DoFns: - -.. code-block:: python - - import apache_beam as beam - from array_record.python import array_record_data_source - - class ReadArrayRecordDoFn(beam.DoFn): - def process(self, file_path): - with array_record_data_source.ArrayRecordDataSource(file_path) as ds: - for i in range(len(ds)): - yield ds[i] - - with beam.Pipeline() as pipeline: - files = pipeline | beam.Create(['file1.array_record', 'file2.array_record']) - records = files | beam.ParDo(ReadArrayRecordDoFn()) - -Google Cloud Dataflow ----------------------- - -ArrayRecord Beam integration works with Google Cloud Dataflow: - -.. code-block:: python - - from apache_beam.options.pipeline_options import PipelineOptions - - dataflow_options = PipelineOptions([ - '--runner=DataflowRunner', - '--project=my-project', - '--region=us-central1', - '--temp_location=gs://my-bucket/temp', - '--staging_location=gs://my-bucket/staging', - ]) - - pipeline = convert_tf_to_arrayrecord_gcs( - args=['--input', 'gs://input/tfrecords/*', '--output', 'gs://output/arrayrecords/'], - pipeline_options=dataflow_options - ) - -Best Practices --------------- - -1. **File Size Management**: Use appropriate sharding to avoid very large files - -2. **Temporary Storage**: Ensure sufficient disk space for temporary files when using GCS DoFn - -3. **Error Handling**: Implement proper error handling in custom DoFns - -4. **Resource Management**: Use context managers for file operations - -5. **Monitoring**: Monitor Dataflow jobs through the Google Cloud Console - -Example: Complete Conversion Pipeline -------------------------------------- - -Here's a complete example of converting TFRecord files to ArrayRecord: - -.. 
code-block:: python - - import apache_beam as beam - from apache_beam.options.pipeline_options import PipelineOptions - from array_record.beam.pipelines import convert_tf_to_arrayrecord_gcs - - def main(): - pipeline_options = PipelineOptions([ - '--runner=DataflowRunner', - '--project=my-project', - '--region=us-central1', - '--temp_location=gs://my-bucket/temp', - '--max_num_workers=10', - ]) - - pipeline = convert_tf_to_arrayrecord_gcs( - overwrite_extension=True, - file_path_suffix='.array_record', - args=[ - '--input', 'gs://source-bucket/tfrecords/*.tfrecord', - '--output', 'gs://dest-bucket/arrayrecords/' - ], - pipeline_options=pipeline_options - ) - - result = pipeline.run() - result.wait_until_finish() - - if __name__ == '__main__': - main() - -Troubleshooting ---------------- - -Common Issues -~~~~~~~~~~~~~ - -1. **"Module not found" errors**: Ensure you installed with ``pip install array_record[beam]`` - -2. **GCS permission errors**: Check that your service account has proper GCS permissions - -3. **Disk space errors**: Increase disk size for Dataflow workers or use smaller batch sizes - -4. **Memory errors**: Reduce parallelism or increase worker memory - -Performance Tips -~~~~~~~~~~~~~~~~ - -1. Use appropriate worker machine types for your data size -2. Tune the number of workers based on your input data -3. Use regional persistent disks for better I/O performance -4. Monitor resource usage through Dataflow monitoring diff --git a/docs/changelog.md b/docs/changelog.md deleted file mode 100644 index 77f8e4e..0000000 --- a/docs/changelog.md +++ /dev/null @@ -1,298 +0,0 @@ -# Changelog - -All notable changes to ArrayRecord will be documented in this file. - -The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), -and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). 
- -## [Unreleased] - -### Added -- Comprehensive Sphinx documentation -- Performance optimization guides -- Extended examples for machine learning workflows -- Multi-modal data storage examples - -### Changed -- Improved documentation structure and navigation -- Enhanced API reference documentation - -### Fixed -- Documentation formatting and cross-references - -## [0.8.1] - 2024-01-15 - -### Added -- Support for Python 3.13 -- Enhanced error handling in Python bindings -- Improved memory management in C++ components - -### Changed -- Updated dependencies to latest versions -- Optimized default buffer sizes for better performance - -### Fixed -- Memory leaks in certain edge cases -- Compatibility issues with newer TensorFlow versions -- Build issues on macOS with Apple Silicon - -### Security -- Updated dependencies to address security vulnerabilities - -## [0.8.0] - 2023-12-01 - -### Added -- New `index_storage_option` for memory optimization -- Support for offloaded index storage to reduce memory usage -- Enhanced parallel reading capabilities -- Improved random access performance - -### Changed -- **BREAKING**: Changed default group size from 32768 to 65536 -- Improved compression ratio with better chunk organization -- Enhanced Apache Beam integration with better error handling - -### Fixed -- Race conditions in parallel writing scenarios -- Incorrect record counts in certain file configurations -- Memory usage spikes during large file processing - -### Deprecated -- Old-style option parsing (will be removed in v1.0.0) - -## [0.7.2] - 2023-10-15 - -### Added -- Support for transposition with custom bucket sizes -- New writer option `transpose_bucket_size` -- Enhanced debugging capabilities - -### Fixed -- Transposition issues with variable-length records -- Performance regression in sequential access -- Build failures on certain Linux distributions - -## [0.7.1] - 2023-09-01 - -### Fixed -- Critical bug in record indexing for large files -- Memory corruption in multi-threaded scenarios -- Incorrect file size calculations - -### Changed -- Improved error messages for better debugging -- Enhanced logging in debug builds - -## [0.7.0] - 2023-08-15 - -### Added -- Apache Beam integration for large-scale data processing -- Support for Google Cloud Storage via Beam DoFns -- Pre-built pipelines for TFRecord conversion -- Command-line utilities for data conversion - -### Changed -- Restructured project layout for better modularity -- Improved build system with better dependency management - -### Fixed -- Threading issues in concurrent read scenarios -- File handle leaks in error conditions - -## [0.6.3] - 2023-07-01 - -### Added -- Support for macOS on Apple Silicon (aarch64) -- Enhanced protocol buffer support -- New parallel reading methods with index support - -### Fixed -- Build issues on newer macOS versions -- Performance degradation in certain access patterns -- Memory alignment issues on ARM processors - -## [0.6.2] - 2023-06-01 - -### Fixed -- Critical bug in chunk boundary calculations -- Data corruption issues in specific compression scenarios -- Build compatibility with newer Bazel versions - -### Changed -- Improved error handling and reporting -- Enhanced validation of writer options - -## [0.6.1] - 2023-05-15 - -### Added -- Support for Python 3.12 -- Enhanced compression options with Zstd support -- New writer option `saturation_delay_ms` - -### Changed -- Improved default compression settings -- Better memory usage patterns in large file scenarios - -### Fixed -- Compatibility 
issues with newer Python versions -- Memory leaks in certain error scenarios - -## [0.6.0] - 2023-04-01 - -### Added -- **New Feature**: Parallel writing capabilities -- Support for custom thread pools -- Enhanced random access performance -- New reader options for optimization - -### Changed -- **BREAKING**: Modified ArrayRecordReader API for better performance -- Improved chunk indexing for faster seeks -- Enhanced compression efficiency - -### Fixed -- Race conditions in multi-threaded environments -- Incorrect record ordering in parallel scenarios - -### Deprecated -- Legacy reader initialization methods - -## [0.5.2] - 2023-03-01 - -### Fixed -- Critical data corruption bug in specific compression scenarios -- Build issues on CentOS and RHEL systems -- Memory usage optimization for large files - -### Changed -- Improved error messages and diagnostics -- Enhanced validation of input parameters - -## [0.5.1] - 2023-02-15 - -### Added -- Support for Python 3.11 -- Enhanced debugging and profiling capabilities -- New utility functions for file inspection - -### Fixed -- Performance regression in sequential reading -- Build compatibility with newer GCC versions -- Memory alignment issues on certain architectures - -## [0.5.0] - 2023-01-15 - -### Added -- **Major Feature**: Random access by record index -- Support for multiple compression algorithms (Brotli, Zstd, Snappy) -- Configurable group sizes for performance tuning -- Enhanced Python API with context manager support - -### Changed -- **BREAKING**: New file format with improved indexing -- Restructured C++ API for better performance -- Improved memory usage patterns - -### Fixed -- Data integrity issues in certain edge cases -- Build system improvements for better portability - -### Migration Guide -- Files created with v0.4.x need to be recreated with v0.5.0+ -- Update code to use new ArrayRecordDataSource API -- Review compression settings for optimal performance - -## [0.4.3] - 2022-12-01 - -### Fixed -- Memory leaks in long-running processes -- Compatibility with TensorFlow 2.11+ -- Build issues on Ubuntu 22.04 - -### Changed -- Improved error handling in Python bindings -- Enhanced logging for debugging - -## [0.4.2] - 2022-11-01 - -### Added -- Support for Linux aarch64 (ARM64) -- Enhanced error reporting and diagnostics - -### Fixed -- Segmentation faults in certain error conditions -- Build compatibility with newer Python versions - -## [0.4.1] - 2022-10-15 - -### Fixed -- Critical bug in record boundary detection -- Performance issues with small records -- Build system improvements - -### Changed -- Optimized buffer management for better performance -- Improved documentation and examples - -## [0.4.0] - 2022-10-01 - -### Added -- **Initial public release** -- Core ArrayRecord format implementation -- Python bindings with basic read/write functionality -- C++ API for high-performance applications -- Support for Linux x86_64 -- Basic compression support with Brotli - -### Features -- Sequential reading and writing -- Configurable compression levels -- Thread-safe operations -- Integration with Riegeli format - ---- - -## Version Support - -| Version | Python Support | Platform Support | Status | -|---------|----------------|------------------|---------| -| 0.8.x | 3.10, 3.11, 3.12, 3.13 | Linux (x86_64, aarch64), macOS (aarch64) | Active | -| 0.7.x | 3.10, 3.11, 3.12 | Linux (x86_64, aarch64), macOS (aarch64) | Security fixes only | -| 0.6.x | 3.10, 3.11, 3.12 | Linux (x86_64, aarch64), macOS (aarch64) | End of life | -| 0.5.x 
| 3.10, 3.11 | Linux (x86_64, aarch64) | End of life | -| < 0.5 | 3.8, 3.9, 3.10 | Linux (x86_64) | End of life | - -## Migration Notes - -### Upgrading from 0.7.x to 0.8.x - -- No breaking changes -- New features available but optional -- Recommended to update for performance improvements - -### Upgrading from 0.6.x to 0.7.x - -- Apache Beam integration requires separate installation: `pip install array_record[beam]` -- New command-line tools available -- Existing code continues to work without changes - -### Upgrading from 0.5.x to 0.6.x - -- Python 3.12 support added -- New compression options available -- Performance improvements in random access - -### Upgrading from 0.4.x to 0.5.x - -- **File format changed**: Files need to be recreated -- **API changes**: Update to new ArrayRecordDataSource -- **Performance**: Significant improvements in random access -- **Compression**: New algorithms available - -## Contributing - -See [CONTRIBUTING.md](contributing.md) for information about contributing to ArrayRecord. - -## License - -ArrayRecord is licensed under the Apache License 2.0. See [LICENSE](../LICENSE) for details. diff --git a/docs/conf.py b/docs/conf.py index 4d2eaa5..f18eaf1 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -22,8 +22,8 @@ author = 'ArrayRecord Contributors' # The full version, including alpha/beta/rc tags -release = '0.8.1' -version = '0.8.1' +release = '0.8.2' +version = '0.8.2' # -- General configuration --------------------------------------------------- diff --git a/docs/contributing.md b/docs/contributing.md deleted file mode 100644 index c51d4c4..0000000 --- a/docs/contributing.md +++ /dev/null @@ -1,462 +0,0 @@ -# Contributing to ArrayRecord - -We welcome contributions to ArrayRecord! This document provides guidelines for contributing to the project. - -## Getting Started - -### Development Environment - -1. **Clone the repository**: - ```bash - git clone https://github.com/google/array_record.git - cd array_record - ``` - -2. **Set up development environment**: - ```bash - # Create virtual environment - python -m venv venv - source venv/bin/activate # On Windows: venv\Scripts\activate - - # Install development dependencies - pip install -e .[test,beam] - ``` - -3. **Install build dependencies**: - ```bash - # Install Bazel (for C++ components) - # Follow instructions at https://bazel.build/install - - # Verify installation - bazel version - ``` - -### Building from Source - -```bash -# Build all targets -bazel build //... - -# Build specific components -bazel build //python:array_record_module -bazel build //cpp:array_record_reader -bazel build //beam:all -``` - -### Running Tests - -```bash -# Run all tests -bazel test //... - -# Run specific test suites -bazel test //python:array_record_module_test -bazel test //cpp:array_record_reader_test -bazel test //beam:all - -# Run Python tests with pytest -pytest python/ -v -``` - -## Code Style and Standards - -### Python Code Style - -We follow PEP 8 with some modifications. Use the following tools: - -```bash -# Install formatting tools -pip install black isort pylint mypy - -# Format code -black . -isort . - -# Check style -pylint array_record/ -mypy array_record/ -``` - -### C++ Code Style - -We follow the Google C++ Style Guide. 
Use clang-format: - -```bash -# Format C++ code -find cpp/ -name "*.cc" -o -name "*.h" | xargs clang-format -i -``` - -### Documentation Style - -- Use Google-style docstrings for Python -- Use Doxygen-style comments for C++ -- Write clear, concise documentation -- Include code examples where helpful - -## Types of Contributions - -### Bug Reports - -When reporting bugs, please include: - -1. **Clear description** of the issue -2. **Steps to reproduce** the problem -3. **Expected vs. actual behavior** -4. **Environment information** (OS, Python version, ArrayRecord version) -5. **Minimal code example** that demonstrates the issue - -**Bug Report Template**: -```markdown -**Describe the bug** -A clear and concise description of what the bug is. - -**To Reproduce** -Steps to reproduce the behavior: -1. Create ArrayRecord file with... -2. Read using... -3. See error... - -**Expected behavior** -A clear description of what you expected to happen. - -**Environment:** -- OS: [e.g., Ubuntu 20.04] -- Python version: [e.g., 3.10.8] -- ArrayRecord version: [e.g., 0.8.1] -- Installation method: [e.g., pip, source] - -**Additional context** -Add any other context about the problem here. -``` - -### Feature Requests - -For feature requests, please include: - -1. **Clear description** of the proposed feature -2. **Use case** and motivation -3. **Proposed API** (if applicable) -4. **Implementation considerations** - -### Code Contributions - -#### Pull Request Process - -1. **Fork the repository** and create a feature branch -2. **Make your changes** following the coding standards -3. **Add tests** for new functionality -4. **Update documentation** as needed -5. **Run tests** to ensure everything works -6. **Submit a pull request** - -#### Pull Request Guidelines - -- **One feature per PR**: Keep changes focused and atomic -- **Clear commit messages**: Use descriptive commit messages -- **Add tests**: All new code should have corresponding tests -- **Update docs**: Update documentation for API changes -- **Follow style guides**: Ensure code follows project standards - -#### Example Workflow - -```bash -# 1. Fork and clone your fork -git clone https://github.com/yourusername/array_record.git -cd array_record - -# 2. Create feature branch -git checkout -b feature/my-new-feature - -# 3. Make changes and commit -git add . -git commit -m "Add new feature: description of changes" - -# 4. Run tests -bazel test //... - -# 5. Push to your fork -git push origin feature/my-new-feature - -# 6. Create pull request on GitHub -``` - -## Development Guidelines - -### Adding New Features - -#### Python Features - -1. **Add implementation** in appropriate module -2. **Write comprehensive tests**: - ```python - # python/my_feature_test.py - import unittest - from array_record.python import my_feature - - class MyFeatureTest(unittest.TestCase): - def test_basic_functionality(self): - # Test implementation - pass - ``` - -3. **Update BUILD files**: - ```python - # python/BUILD - py_test( - name = "my_feature_test", - srcs = ["my_feature_test.py"], - deps = [":my_feature"], - ) - ``` - -4. **Add documentation**: - ```python - def my_function(arg1: str, arg2: int) -> bool: - """Brief description of the function. - - Args: - arg1: Description of arg1. - arg2: Description of arg2. - - Returns: - Description of return value. - - Raises: - ValueError: When invalid arguments are provided. - - Example: - >>> result = my_function("test", 42) - >>> print(result) - True - """ - ``` - -#### C++ Features - -1. 
**Add header file**: - ```cpp - // cpp/my_feature.h - #ifndef ARRAY_RECORD_CPP_MY_FEATURE_H_ - #define ARRAY_RECORD_CPP_MY_FEATURE_H_ - - namespace array_record { - - class MyFeature { - public: - // Public interface - }; - - } // namespace array_record - - #endif // ARRAY_RECORD_CPP_MY_FEATURE_H_ - ``` - -2. **Add implementation**: - ```cpp - // cpp/my_feature.cc - #include "cpp/my_feature.h" - - namespace array_record { - - // Implementation - - } // namespace array_record - ``` - -3. **Add tests**: - ```cpp - // cpp/my_feature_test.cc - #include "cpp/my_feature.h" - #include "gtest/gtest.h" - - namespace array_record { - - TEST(MyFeatureTest, BasicFunctionality) { - // Test implementation - } - - } // namespace array_record - ``` - -4. **Update BUILD files**: - ```python - # cpp/BUILD - cc_library( - name = "my_feature", - srcs = ["my_feature.cc"], - hdrs = ["my_feature.h"], - deps = [ - # Dependencies - ], - ) - - cc_test( - name = "my_feature_test", - srcs = ["my_feature_test.cc"], - deps = [ - ":my_feature", - "@googletest//:gtest_main", - ], - ) - ``` - -### Documentation - -#### API Documentation - -- **Python**: Use Google-style docstrings -- **C++**: Use Doxygen-style comments -- **Include examples** in docstrings -- **Document all public APIs** - -#### User Documentation - -- **Update relevant guides** when adding features -- **Add examples** to the examples section -- **Update performance guide** if applicable -- **Keep documentation up to date** with code changes - -### Testing - -#### Test Categories - -1. **Unit tests**: Test individual components -2. **Integration tests**: Test component interactions -3. **Performance tests**: Benchmark critical paths -4. **Compatibility tests**: Test across Python versions and platforms - -#### Test Guidelines - -- **Comprehensive coverage**: Test normal and edge cases -- **Clear test names**: Describe what is being tested -- **Independent tests**: Each test should be self-contained -- **Fast execution**: Keep tests fast and reliable - -#### Example Test Structure - -```python -class ArrayRecordFeatureTest(unittest.TestCase): - def setUp(self): - """Set up test fixtures.""" - self.temp_dir = tempfile.mkdtemp() - self.test_file = os.path.join(self.temp_dir, 'test.array_record') - - def tearDown(self): - """Clean up test fixtures.""" - shutil.rmtree(self.temp_dir) - - def test_normal_case(self): - """Test normal operation.""" - # Test implementation - pass - - def test_edge_case(self): - """Test edge case handling.""" - # Test implementation - pass - - def test_error_handling(self): - """Test error conditions.""" - with self.assertRaises(ValueError): - # Code that should raise ValueError - pass -``` - -## Release Process - -### Version Management - -ArrayRecord uses semantic versioning (MAJOR.MINOR.PATCH): - -- **MAJOR**: Incompatible API changes -- **MINOR**: Backward-compatible functionality additions -- **PATCH**: Backward-compatible bug fixes - -### Release Checklist - -1. **Update version numbers** in: - - `setup.py` - - `docs/conf.py` - - Version constants in code - -2. **Update CHANGELOG.md** with: - - New features - - Bug fixes - - Breaking changes - - Performance improvements - -3. **Run full test suite**: - ```bash - bazel test //... - ``` - -4. **Build and test packages**: - ```bash - python setup.py sdist bdist_wheel - twine check dist/* - ``` - -5. 
**Create release tag**: - ```bash - git tag -a v0.8.2 -m "Release version 0.8.2" - git push origin v0.8.2 - ``` - -## Community Guidelines - -### Code of Conduct - -We follow the [Google Open Source Community Guidelines](https://opensource.google/conduct/). Please: - -- **Be respectful** and inclusive -- **Be collaborative** and constructive -- **Be mindful** of cultural differences -- **Focus on the issue**, not the person - -### Communication - -- **GitHub Issues**: For bug reports and feature requests -- **Pull Requests**: For code contributions -- **Discussions**: For general questions and design discussions - -### Getting Help - -- **Documentation**: Check the documentation first -- **Search existing issues**: Your question might already be answered -- **Create new issue**: If you can't find an answer -- **Provide context**: Include relevant details when asking for help - -## Advanced Topics - -### Performance Optimization - -When contributing performance improvements: - -1. **Benchmark before and after** changes -2. **Profile code** to identify bottlenecks -3. **Consider different use cases** (sequential vs. random access) -4. **Document performance implications** - -### Cross-Platform Compatibility - -- **Test on multiple platforms** when possible -- **Consider platform-specific optimizations** -- **Use appropriate build configurations** -- **Document platform limitations** - -### Security Considerations - -- **Validate inputs** thoroughly -- **Handle errors gracefully** -- **Avoid buffer overflows** in C++ code -- **Consider security implications** of new features - -## Resources - -- [ArrayRecord GitHub Repository](https://github.com/google/array_record) -- [Bazel Documentation](https://bazel.build/docs) -- [Google C++ Style Guide](https://google.github.io/styleguide/cppguide.html) -- [PEP 8 Python Style Guide](https://www.python.org/dev/peps/pep-0008/) -- [Apache Beam Documentation](https://beam.apache.org/documentation/) - -Thank you for contributing to ArrayRecord! Your contributions help make high-performance data storage accessible to everyone. diff --git a/docs/core_concepts.md b/docs/core_concepts.md index ab1e599..59bf46f 100644 --- a/docs/core_concepts.md +++ b/docs/core_concepts.md @@ -1,277 +1,391 @@ # Core Concepts -This document explains the fundamental concepts behind ArrayRecord and how to use them effectively. + +This document explains the fundamental concepts behind ArrayRecord and how to use it efficiently. ## What is ArrayRecord? -ArrayRecord is a file format designed for high-performance storage and retrieval of sequential data. It's built on top of [Riegeli](https://github.com/google/riegeli) and provides: +ArrayRecord functions as a serialization format engineered to accommodate three +primary access methodologies: sequential, batch, and random read. To effectively +address these diverse operational requirements, ArrayRecord is designed to +support various configurations, each optimized for a specific access pattern. +Certain configurations are adjustable at runtime, permitting modifications +post-serialization, while others necessitate user decisions at the time of file +creation or serialization. To maximize the utility of ArrayRecord for a specific +application, a detailed examination of its internal architecture and the core +principles of advanced systems engineering is required. 
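As a small illustration of this split (a sketch using the writer and reader options described in the sections below): group size and compression are fixed when a file is written, whereas a reader option such as `index_storage_option` can be chosen anew each time the file is opened.

```python
from array_record.python import array_record_module

# Write-time decisions: group size and compression are baked into the file.
writer = array_record_module.ArrayRecordWriter(
    'output.array_record', 'group_size:1,zstd:3'
)
writer.write(b'a record')
writer.close()

# Read-time decision: the same file can be reopened with different reader options.
reader = array_record_module.ArrayRecordReader(
    'output.array_record', 'index_storage_option:offloaded'
)
```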
-- **Parallel I/O**: Multiple threads can read different parts of the file simultaneously -- **Random Access**: Jump to any record by index without scanning the entire file -- **Compression**: Multiple compression algorithms with configurable levels -- **Chunked Storage**: Data is organized in chunks for optimal compression and access patterns ## File Structure -ArrayRecord files are organized hierarchically: +An ArrayRecord file is structured as a sequence of individually compressed +chunks. A user data chunk may encapsulate one or more records, the quantity of +which is determined by the write configuration. Subsequent to the user data +chunks are the footer chunk, which stores the file offsets for each chunk, and a +postscript section designated for file metadata. This organization is +illustrated below. ``` ┌─────────────────────┐ │ User Data │ -│ Riegeli Chunk 1 │ +│ Chunk 1 │ ├─────────────────────┤ │ User Data │ -│ Riegeli Chunk 2 │ +│ Chunk 2 │ ├─────────────────────┤ │ ... │ ├─────────────────────┤ │ User Data │ -│ Riegeli Chunk N │ +│ Chunk N │ ├─────────────────────┤ │ Footer Chunk │ -│ (Index Data) │ +│ (Index Data) │ ├─────────────────────┤ │ Postscript │ -│ (File Metadata) │ +│ (File Metadata) │ └─────────────────────┘ ``` -### Key Components - -1. **User Data Chunks**: Contain your actual records, compressed using the specified algorithm -2. **Footer Chunk**: Contains index information for random access -3. **Postscript**: File metadata and chunk offsets (fits in 64KB) -## Records and Chunks +## Write-time Configuration +The subsequent options govern the methodology by which ArrayRecord serializes +data to the disk medium. Within both the C++ and Python APIs, ArrayRecord +employs an option string for the encoding of these configurations. The option +string is comprised of one or more option specifications, delineated by commas. +Each individual option is inherently optional. Typically, each option is +succeeded by a colon (:) and its corresponding value, with the exception of +`uncompressed` and `snappy`, which function as boolean flags. The following +illustrates an example configuration: -### Records +```python +from array_record.python import array_record_module -A record is the basic unit of data in ArrayRecord. Records can be: -- Raw bytes (any binary data) -- Protocol buffer messages -- Serialized objects -- Text data (encoded as bytes) +zstd_writer = array_record_module.ArrayRecordWriter( + 'output.array_record', + 'group_size:1024,zstd:5,window_log:10' +) -### Chunks (Groups) +snappy_writer = array_record_module.ArrayRecordWriter( + 'output.array_record', + 'group_size:1024,snappy' +) +``` -Records are organized into chunks (also called "groups") before compression. The `group_size` parameter controls how many records are packed into each chunk. +The aforementioned code snippet initializes an `output.array_record` file where +`group_size` is set to 1024, the compression algorithm is Zstandard with level +5, and the compression tuning option `window_log` is set to 10. Adjusting these +parameters facilitates the optimization of the on-disk data representation for +various read access patterns. + +### Group size + +The `group_size` option dictates the quantity of records to be consolidated and +compressed into a single chunk. This parameter represents the most critical +tuning variable for accommodating diverse usage scenarios. 
Given that each group +is compressed autonomously, accessing a single record within a chunk +necessitates the complete decompression of that chunk. While the decompressed +chunk is subsequently cached for later access, this mechanism does not benefit +random access patterns unless the requested record resides in the currently +cached chunk. Consequently, as a general guideline, an ArrayRecord file +primarily intended for random access should consistently employ a `group_size` +of one. + +In contrast to the random access use case, ArrayRecord achieves a superior +compression ratio and reduced I/O overhead when configured with a larger group +size. This configuration is particularly advantageous for sequential and batch +access of contiguous records. + +```{attention} +When creating ArrayRecord files for `array_record_data_source`, +users must set the `group_size` to 1. +``` -**Trade-offs:** -- **Larger group_size**: Better compression ratio, but slower random access -- **Smaller group_size**: Faster random access, but less compression +### Compression +ArrayRecord provides support for multiple compression algorithms. Users select +the algorithm by `algo[:level]`, where `algo` is one of the following: `zstd`, +`brotli`, `snappy`, or `uncompressed`, and the `level` specifies the compression +level which differs by the algorithm type. + +#### Zstd (Default) +Zstandard (zstd) is a fast, lossless data compression algorithm developed by +Yann Collet at Facebook (now Meta). It is engineered to strike an effective +balance between compression speed and compression ratio, rendering it suitable +for a broad spectrum of applications. The configurable tuning parameters are: + +- Levels -131072 to 22 (default: 3). A higher numerical value corresponds to a + greater compression ratio, while negative values activate "fast" compression + algorithms. Level 0 is functionally equivalent to the default compression + level, which is 3. +- `window_log` 10 to 30 (default 20). This represents the logarithm of the LZ77 + sliding window size. This option tunes the trade-off between compression + density and memory utilization (where a higher value results in improved + density but increased memory consumption). + +The following example configuration is optimized for rapid random access with a +minimal memory footprint for the compression sliding window: ```python -# High compression, slower random access -writer = ArrayRecordWriter('file.array_record', 'group_size:10000') - -# Fast random access, less compression -writer = ArrayRecordWriter('file.array_record', 'group_size:100') +writer = array_record_module.ArrayRecordWriter( + 'output.array_record', + 'group_size:1,zstd:1,window_log:12' +) ``` -## Compression Options +#### Brotli +Brotli is an open-source compression algorithm developed by Google that +typically yields superior compression ratios compared to gzip for web content, +thereby facilitating faster loading times and reduced bandwidth consumption. In +comparison to zstd, Brotli often achieves a better compression ratio but may +entail slower compression and decompression speeds. The configurable tuning +parameters are: -ArrayRecord supports multiple compression algorithms: +- Levels 0 to 11 (default: 6). A higher numerical value indicates a greater + compression ratio. +- `window_log` 10 to 30 (default 22). This represents the logarithm of the LZ77 + sliding window size. 
This option tunes the trade-off between compression + density and memory utilization (where a higher value results in improved + density but increased memory consumption). -### Brotli (Default) -- Best overall compression ratio -- Good balance of speed and size -- Levels 0-11 (default: 6) +The following is an example configuration tailored for a high compression ratio +suitable for batch access: ```python -writer = ArrayRecordWriter('file.array_record', 'brotli:9') # High compression +writer = ArrayRecordWriter( + 'output.array_record', + 'group_size:65536,brotli:9,window_log:26' +) ``` -### Zstd -- Very fast compression/decompression -- Good compression ratio -- Levels -131072 to 22 (default: 3) +#### Snappy +Snappy is a rapid and efficient compression/decompression library developed by +Google, where speed is prioritized over maximizing file size reduction. It is +extensively utilized in big data systems, such as Hadoop, Spark, and LevelDB, +where real-time processing capability is paramount. While Snappy possesses +certain tuning parameters, these are currently considered experimental, and thus +the tuning options have been disabled. + +The following represents an example Snappy configuration for random access: ```python -writer = ArrayRecordWriter('file.array_record', 'zstd:1') # Fast compression +writer = array_record_module.ArrayRecordWriter( + 'output.array_record', + 'group_size:1,snappy' +) ``` -### Snappy -- Extremely fast compression/decompression -- Lower compression ratio -- No compression levels +#### Uncompressed +Finally, ArrayRecord offers the uncompressed option. It is important to note +that for optimal fast random access, the user should still specify a group size +of 1 to minimize I/O operations. ```python -writer = ArrayRecordWriter('file.array_record', 'snappy') +writer = ArrayRecordWriter('output.array_record', 'group_size:1,uncompressed') ``` -### Uncompressed -- No compression overhead -- Largest file size -- Fastest access +## Read-time configuration + +ArrayRecord furnishes two distinct APIs for data retrieval: +- `array_record_module.Reader`: This component offers a direct, one-to-one + correspondence with the underlying C++ API, thereby providing comprehensive + support for all operational scenarios, including random access, batch access, + and sequential read operations. +- `array_record_data_source`: This serves as a multi-file adapter specifically + designed for integration with pygrain, primarily architected to facilitate + random access to the encapsulated records. + +Configuration of the `array_record_module.Reader` options is executed using a +syntax analogous to that of the writer: a string of comma-separated key-value +pairs. Conversely, for `array_record_data_source`, the options are specified via a +Python dictionary. The following code snippet illustrates the initialization of +both reader types: ```python -writer = ArrayRecordWriter('file.array_record', 'uncompressed') +from array_record.python import array_record_module +from array_record.python import array_record_data_source + +reader = array_record_module.ArrayRecordReader( + 'output.array_record', + 'index_storage_option:offloaded,readahead_buffer_size=0' +) +ds = array_record_data_source( + 'output.array_record', + reader_options={ + 'index_storage_option': 'offloaded', + } +) ``` -## Access Patterns +### Index Storage Option -### Sequential Access +The `index_storage_option` constitutes the primary tuning variable for +optimizing read-time performance. 
Two options are currently available: -Optimized by default with readahead buffering: +- `in_memory` (default): This configuration facilitates rapid random access by + loading the chunk offset index directly into memory. However, this may result + in a substantial memory footprint if the number of records is exceptionally + large, particularly when utilized with ArrayRecordDataSource, where the + indices from multiple ArrayRecord files would collectively reside in memory. +- `offloaded`: In this mode, the chunk offset index is not loaded into memory; + instead, it is retrieved from the disk for each access operation. -```python -# Default settings optimize for sequential access -data_source = ArrayRecordDataSource('file.array_record') +### Read-Ahead for Sequential Access -for i in range(len(data_source)): - record = data_source[i] - # Process record sequentially +```{caution} +This feature is only available in `array_record_module.Reader` and is not supported by `array_record_data_source`. ``` -### Random Access - -Disable readahead for pure random access: +Sequential access is an access pattern employed in the majority of Google's +storage formats. This paradigm involves a user opening a file and iteratively +invoking a read() operation to retrieve the file content and advance the +internal cursor until the end-of-file is reached. See the demonstration below: ```python -reader_options = { - 'readahead_buffer_size': '0', - 'max_parallelism': '0', -} - -data_source = ArrayRecordDataSource('file.array_record', reader_options=reader_options) - -# Now random access is optimized -import random -random_index = random.randint(0, len(data_source) - 1) -record = data_source[random_index] +from array_record.python import array_record_module + +reader = array_record_module.ArrayRecordReader( + 'output.array_record', + 'readahead_buffer_size:65536,max_parallelism:8' +) +for _ in range(reader.num_records()): + record = reader.read() ``` -### Batch Access +Given the predictable nature of this access pattern, ArrayRecord incorporates an +integrated read-ahead system to prefetch subsequent records. The following +parameters govern the configuration of this read-ahead functionality: -Read multiple records efficiently: +- `readahead_buffer_size`: This specifies the size of the read-ahead buffer per + thread, measured in bytes. Setting this parameter to zero effectively disables + the read-ahead mechanism. +- `max_parallelism`: This parameter dictates the number of dedicated threads + utilized for read-ahead prefetching. -```python -# Read specific indices -indices = [10, 100, 1000, 5000] -batch = data_source[indices] -# Read a range -range_batch = data_source[100:200] # If supported by implementation -``` +## Access Patterns -## Parallel Processing +ArrayRecord is designed to efficiently support three primary data access +patterns. Following these best practices for each pattern can help you optimize +performance and resource utilization. -ArrayRecord supports parallel operations: +### Random Access +Random access is used when you need to quickly retrieve individual, +non-contiguous records from the dataset. To optimize for this: + +- Configure the writer to set `group_size:1`. This ensures each record is a + self-contained group, minimizing the data read for a single lookup. +- Enable compression (e.g., `zstd`) for text and numerical data to reduce file + size and improve I/O speed. Compression is not typically needed for data + already compressed by specific algorithms. 
+- For image data (e.g., JPEG, PNG), which is typically compressed using
+  format-specific algorithms, it is better to keep it in its original binary
+  form rather than applying an additional generic compression layer.
+- The optimal compression algorithm and level require experimentation, but the
+  default setting, `zstd:3`, provides a good balance of speed and ratio for
+  most general-purpose data.
+- If the memory footprint of the index becomes a concern, configure the reader
+  to offload the index (`index_storage_option:offloaded`). This trades
+  increased latency for lower memory consumption.
+- Whenever possible, prefer batch access (even for non-contiguous records) to
+  leverage the underlying C++ thread pool for better overall performance due to
+  parallel I/O and decompression.
-### Parallel Writing
```python
-# Configure parallel writing
-options = 'group_size:1000,max_parallelism:4'
-writer = ArrayRecordWriter('file.array_record', options)
+from array_record.python import array_record_module
+from array_record.python import array_record_data_source
+
+# Writer with the default compression option, which is zstd:3
+writer = array_record_module.ArrayRecordWriter(
+    'output.array_record',
+    'group_size:1'
+)
+
+# Reader/data-source with in-memory index
+reader = array_record_module.ArrayRecordReader('output.array_record')
+ds = array_record_data_source.ArrayRecordDataSource('output.array_record')
+
+# Reader/data-source with offloaded index
+reader = array_record_module.ArrayRecordReader(
+    'output.array_record',
+    'index_storage_option:offloaded'
+)
+ds = array_record_data_source.ArrayRecordDataSource(
+    'output.array_record',
+    reader_options={
+        'index_storage_option': 'offloaded',
+    }
+)
```
-### Parallel Reading
+### Sequential Access
+Sequential access is used for iterating through the dataset in order (e.g., for
+training loops). To optimize for this:
+
+- A larger `group_size` improves the compression ratio (by grouping more data)
+  and reduces the number of I/O operations needed to read the file.
+- While optimal compression requires tuning, the default `zstd:3` is generally a
+  good starting point for generic data in sequential reads.
+- Prefer using the Batch Access API over traditional prefetch mechanisms for
+  better thread utilization and throughput.
+
```python
-# Configure parallel reading
-reader_options = {
-    'max_parallelism': '4',
-    'readahead_buffer_size': '16M',
-}
-data_source = ArrayRecordDataSource('file.array_record', reader_options=reader_options)
+from array_record.python import array_record_module
+
+# Writer with the default compression option, which is zstd:3
+writer = array_record_module.ArrayRecordWriter(
+    'output.array_record',
+    'group_size:1024'
+)
+
+# Optimize sequential access with a read-ahead buffer.
+reader = array_record_module.ArrayRecordReader(
+    'output.array_record',
+    'readahead_buffer_size:16M'
+)
+
+# Sequential access with the read-ahead buffer (slow)
+for _ in range(reader.num_records()):
+    record = reader.read()
+
+# Read all data with a thread pool (fast)
+records = reader.read_all()
```
-## Performance Considerations
+### Batch Access
+Batch access involves reading multiple records in a single function call, which
+is the most efficient way to use ArrayRecord.
-### Group Size Selection
+
+- Batch access is the recommended way to use ArrayRecord for high-performance
+  reading.
+- The API supports reading records using multiple methods: batch all (read the
+  entire file), batch range (read a contiguous subset), and batch with indices
+  (read an arbitrary set of records).
+- When using an offloaded index reader with batch access, the index chunk is
+  only read once per batch invocation. This greatly reduces the amortized
+  latency per record compared to reading the index for every single record
+  (see the sketch after the examples below).
-Choose group size based on your access pattern:
+#### `array_record_module.ArrayRecordReader` batch access APIs
```python
-# For mostly sequential access with some random access
-writer = ArrayRecordWriter('file.array_record', 'group_size:1000')
+from array_record.python import array_record_module
-# For heavy random access
-writer = ArrayRecordWriter('file.array_record', 'group_size:100')
-
-# For maximum compression (sequential only)
-writer = ArrayRecordWriter('file.array_record', 'group_size:10000')
-```
+reader = array_record_module.ArrayRecordReader('output.array_record')
-### Memory Usage
+# Read all the records
+records = reader.read_all()
-Control memory usage with buffer settings:
+# Read records by range
+records = reader.read(0, 100)
-```python
-# Low memory usage
-reader_options = {
-    'readahead_buffer_size': '1M',
-    'max_parallelism': '1',
-}
-
-# High performance (more memory)
-reader_options = {
-    'readahead_buffer_size': '64M',
-    'max_parallelism': '8',
-}
+# Read records by indices
+records = reader.read([0, 1, 100])
```
-## Data Types and Serialization
-
-### Binary Data
+#### `array_record_data_source` batch access APIs
```python
-# Raw bytes
-writer.write(b'binary data')
-
-# Numpy arrays
-import numpy as np
-arr = np.array([1, 2, 3, 4, 5])
-writer.write(arr.tobytes())
-```
+from array_record.python import array_record_data_source
-### Protocol Buffers
-```python
-# Assuming you have a protobuf message
-message = MyProtoMessage()
-message.field = "value"
-writer.write(message.SerializeToString())
-```
+ds = array_record_data_source.ArrayRecordDataSource('output.array_record')
-### JSON Data
-```python
-import json
-data = {"key": "value", "number": 42}
-writer.write(json.dumps(data).encode('utf-8'))
+# Read records by indices
+records = ds.__getitems__([10, 100, 200])
```
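+The following is a minimal sketch combining an offloaded index with a batch
+read, as described in the list above; it assumes the reader option and batch
+APIs shown earlier in this section:
+
+```python
+from array_record.python import array_record_module
+
+# Keep the chunk offset index on disk; a batch call fetches the index
+# chunk once and amortizes that cost across all requested records.
+reader = array_record_module.ArrayRecordReader(
+    'output.array_record',
+    'index_storage_option:offloaded'
+)
+records = reader.read([0, 1, 100])
+reader.close()
+```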
-
-## Best Practices
-
-### Writing
-1. Always call `close()` on writers
-2. Choose appropriate group size for your access pattern
-3. Use compression for storage efficiency
-4. Consider using context managers for automatic cleanup
-
-### Reading
-1. Configure reader options based on access pattern
-2. Use batch reading for better performance
-3. Cache frequently accessed records
-4. Use appropriate parallelism settings
-
-### File Management
-1. Use descriptive file extensions (e.g., `.array_record`)
-2. Include metadata in filenames when helpful
-3. Consider file size limits for your storage system
-4. Plan for concurrent access if needed
-
-## Integration with Other Systems
-
-### Apache Beam
-ArrayRecord integrates seamlessly with Apache Beam for large-scale data processing. See [Beam Integration](beam_integration.md) for details.
-
-### Machine Learning Frameworks
-ArrayRecord works well with:
-- TensorFlow (via tf.data)
-- JAX/Grain (native support)
-- PyTorch (via custom datasets)
-
-### Cloud Storage
-ArrayRecord files can be stored on:
-- Local filesystems
-- Google Cloud Storage (via Beam)
-- Amazon S3 (via appropriate readers)
-- Network filesystems (NFS, etc.)
diff --git a/docs/cpp_reference.rst b/docs/cpp_reference.rst
deleted file mode 100644
index 2913925..0000000
--- a/docs/cpp_reference.rst
+++ /dev/null
@@ -1,436 +0,0 @@
-C++ API Reference
-==================
-
-ArrayRecord provides a high-performance C++ API built on top of Riegeli.
The C++ API offers the best performance for applications that can integrate directly with C++. - -.. note:: - The C++ API is primarily for advanced users who need maximum performance or are integrating ArrayRecord into C++ applications. Most users should use the Python API. - -Core Classes ------------- - -ArrayRecordReader -~~~~~~~~~~~~~~~~~ - -The ``ArrayRecordReader`` class provides high-performance reading of ArrayRecord files with support for parallel processing and random access. - -**Header**: ``cpp/array_record_reader.h`` - -**Template Declaration**: - -.. code-block:: cpp - - template - class ArrayRecordReader : public ArrayRecordReaderBase - -**Basic Usage**: - -.. code-block:: cpp - - #include "cpp/array_record_reader.h" - #include "riegeli/bytes/fd_reader.h" - - // Read from file - ArrayRecordReader reader(riegeli::Maker("input.array_record")); - - if (!reader.ok()) { - // Handle error - return reader.status(); - } - - std::cout << "Total records: " << reader.NumRecords() << std::endl; - - // Read records sequentially - absl::string_view record; - while (reader.ReadRecord(&record)) { - // Process record - std::cout << "Record: " << record << std::endl; - } - - if (!reader.Close()) { - return reader.status(); - } - -ArrayRecordReaderBase -~~~~~~~~~~~~~~~~~~~~~ - -Base class containing template-independent functionality. - -**Key Methods**: - -.. code-block:: cpp - - class ArrayRecordReaderBase : public riegeli::Object { - public: - // Get total number of records - uint64_t NumRecords() const; - - // Get current record index - uint64_t RecordIndex() const; - - // Seek to specific record - bool SeekRecord(uint64_t record_index); - - // Read current record - bool ReadRecord(absl::string_view* record); - bool ReadRecord(google::protobuf::MessageLite* record); - - // Parallel reading methods - absl::Status ParallelReadRecords( - absl::FunctionRef callback) const; - - absl::Status ParallelReadRecordsWithIndices( - absl::Span indices, - absl::FunctionRef callback) const; - - absl::Status ParallelReadRecordsInRange( - uint64_t begin, uint64_t end, - absl::FunctionRef callback) const; - }; - -**Options Class**: - -.. code-block:: cpp - - class ArrayRecordReaderBase::Options { - public: - // Parse options from string - static absl::StatusOr FromString(absl::string_view text); - - // Set readahead buffer size (default: 16MB, set to 0 for random access) - Options& set_readahead_buffer_size(uint64_t size); - - // Set maximum parallelism (default: auto, set to 0 for random access) - Options& set_max_parallelism(std::optional parallelism); - - // Set index storage option - Options& set_index_storage_option(IndexStorageOption option); - }; - -ArrayRecordWriter -~~~~~~~~~~~~~~~~~ - -The ``ArrayRecordWriter`` class provides high-performance writing of ArrayRecord files with configurable compression and parallel processing. - -**Header**: ``cpp/array_record_writer.h`` - -**Template Declaration**: - -.. code-block:: cpp - - template - class ArrayRecordWriter : public ArrayRecordWriterBase - -**Basic Usage**: - -.. 
code-block:: cpp - - #include "cpp/array_record_writer.h" - #include "riegeli/bytes/fd_writer.h" - - // Write to file - ArrayRecordWriter writer(riegeli::Maker("output.array_record")); - - if (!writer.ok()) { - return writer.status(); - } - - // Write records - for (int i = 0; i < 1000; ++i) { - std::string record = absl::StrCat("Record ", i); - if (!writer.WriteRecord(record)) { - return writer.status(); - } - } - - if (!writer.Close()) { - return writer.status(); - } - -ArrayRecordWriterBase -~~~~~~~~~~~~~~~~~~~~~ - -Base class containing template-independent functionality. - -**Key Methods**: - -.. code-block:: cpp - - class ArrayRecordWriterBase : public riegeli::Object { - public: - // Write records of various types - bool WriteRecord(const google::protobuf::MessageLite& record); - bool WriteRecord(absl::string_view record); - bool WriteRecord(const absl::Cord& record); - bool WriteRecord(const void* data, size_t num_bytes); - - template - bool WriteRecord(absl::Span record); - }; - -**Options Class**: - -.. code-block:: cpp - - class ArrayRecordWriterBase::Options { - public: - // Parse options from string - static absl::StatusOr FromString(absl::string_view text); - - // Set group size (number of records per chunk) - Options& set_group_size(uint32_t group_size); - - // Set maximum parallelism - Options& set_max_parallelism(std::optional parallelism); - - // Compression options - Options& set_uncompressed(); - Options& set_brotli(int level = kDefaultBrotli); - Options& set_zstd(int level = kDefaultZstd); - Options& set_snappy(); - - // Advanced options - Options& set_transpose(bool transpose); - Options& set_transpose_bucket_size(uint64_t size); - Options& set_window_log(std::optional window_log); - Options& set_pad_to_block_boundary(bool pad); - Options& set_metadata(const std::optional& metadata); - }; - -Usage Examples --------------- - -Reading with Different Sources -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -.. code-block:: cpp - - #include "cpp/array_record_reader.h" - #include "riegeli/bytes/string_reader.h" - #include "riegeli/bytes/cord_reader.h" - #include "riegeli/bytes/fd_reader.h" - - // Read from string - std::string data = /* ... */; - ArrayRecordReader string_reader(riegeli::Maker(data)); - - // Read from cord - absl::Cord cord = /* ... */; - ArrayRecordReader cord_reader(riegeli::Maker(&cord)); - - // Read from file - ArrayRecordReader file_reader(riegeli::Maker("file.array_record")); - -Writing with Different Destinations -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -.. code-block:: cpp - - #include "cpp/array_record_writer.h" - #include "riegeli/bytes/string_writer.h" - #include "riegeli/bytes/cord_writer.h" - #include "riegeli/bytes/fd_writer.h" - - // Write to string - std::string output; - ArrayRecordWriter string_writer(riegeli::Maker(&output)); - - // Write to cord - absl::Cord cord; - ArrayRecordWriter cord_writer(riegeli::Maker(&cord)); - - // Write to file - ArrayRecordWriter file_writer(riegeli::Maker("output.array_record")); - -Parallel Reading -~~~~~~~~~~~~~~~~ - -.. 
code-block:: cpp - - ArrayRecordReader reader(riegeli::Maker("large_file.array_record")); - - // Read all records in parallel - absl::Status status = reader.ParallelReadRecords( - [](uint64_t record_index, absl::string_view record_data) -> absl::Status { - // Process record - std::cout << "Record " << record_index << ": " << record_data << std::endl; - return absl::OkStatus(); - }); - - if (!status.ok()) { - std::cerr << "Error: " << status << std::endl; - } - - // Read specific indices - std::vector indices = {10, 100, 1000, 5000}; - status = reader.ParallelReadRecordsWithIndices( - indices, - [&indices](uint64_t indices_index, absl::string_view record_data) -> absl::Status { - uint64_t record_index = indices[indices_index]; - std::cout << "Record " << record_index << ": " << record_data << std::endl; - return absl::OkStatus(); - }); - - // Read a range - status = reader.ParallelReadRecordsInRange( - 1000, 2000, // Read records 1000-1999 - [](uint64_t record_index, absl::string_view record_data) -> absl::Status { - std::cout << "Record " << record_index << ": " << record_data << std::endl; - return absl::OkStatus(); - }); - -Protocol Buffer Support -~~~~~~~~~~~~~~~~~~~~~~~ - -.. code-block:: cpp - - #include "your_proto.pb.h" - - // Writing protocol buffers - ArrayRecordWriter writer(riegeli::Maker("protos.array_record")); - - YourProtoMessage message; - message.set_field("value"); - writer.WriteRecord(message); - - // Reading protocol buffers - ArrayRecordReader reader(riegeli::Maker("protos.array_record")); - - // Parallel reading with automatic proto parsing - absl::Status status = reader.ParallelReadRecords( - [](uint64_t record_index, YourProtoMessage proto) -> absl::Status { - std::cout << "Record " << record_index << ": " << proto.field() << std::endl; - return absl::OkStatus(); - }); - -Advanced Configuration -~~~~~~~~~~~~~~~~~~~~~~ - -.. code-block:: cpp - - // Writer with custom options - ArrayRecordWriterBase::Options writer_options; - writer_options - .set_group_size(1000) - .set_brotli(9) // High compression - .set_max_parallelism(4) - .set_transpose(true); // For proto messages - - ArrayRecordWriter writer( - riegeli::Maker("optimized.array_record"), - writer_options); - - // Reader optimized for random access - ArrayRecordReaderBase::Options reader_options; - reader_options - .set_readahead_buffer_size(0) // Disable readahead - .set_max_parallelism(0) // Disable parallel readahead - .set_index_storage_option( - ArrayRecordReaderBase::Options::IndexStorageOption::kInMemory); - - ArrayRecordReader reader( - riegeli::Maker("data.array_record"), - reader_options); - -Error Handling --------------- - -Always check for errors in C++ code: - -.. 
code-block:: cpp - - ArrayRecordWriter writer(riegeli::Maker("output.array_record")); - - if (!writer.ok()) { - std::cerr << "Failed to create writer: " << writer.status() << std::endl; - return -1; - } - - if (!writer.WriteRecord("test data")) { - std::cerr << "Failed to write record: " << writer.status() << std::endl; - return -1; - } - - if (!writer.Close()) { - std::cerr << "Failed to close writer: " << writer.status() << std::endl; - return -1; - } - -Thread Safety -------------- - -- ``ArrayRecordReader`` and ``ArrayRecordWriter`` are **thread-compatible** but not **thread-safe** -- Multiple readers can safely read from the same file simultaneously if each has its own instance -- Writers should not be accessed from multiple threads without external synchronization -- The parallel reading methods handle their own thread safety internally - -Performance Considerations --------------------------- - -Reading Performance -~~~~~~~~~~~~~~~~~~~ - -.. code-block:: cpp - - // For sequential access (default) - ArrayRecordReaderBase::Options sequential_options; - sequential_options.set_readahead_buffer_size(64 * 1024 * 1024); // 64MB - - // For random access - ArrayRecordReaderBase::Options random_options; - random_options - .set_readahead_buffer_size(0) - .set_max_parallelism(0); - -Writing Performance -~~~~~~~~~~~~~~~~~~~ - -.. code-block:: cpp - - // High throughput writing - ArrayRecordWriterBase::Options high_throughput; - high_throughput - .set_group_size(10000) // Larger groups for better compression - .set_max_parallelism(8) // More parallel encoders - .set_brotli(3); // Lower compression for speed - - // Balanced performance - ArrayRecordWriterBase::Options balanced; - balanced - .set_group_size(1000) - .set_brotli(6); // Default compression - -Build Integration ------------------ - -To use ArrayRecord in your C++ project, you'll need to integrate with the build system: - -Bazel -~~~~~ - -.. code-block:: starlark - - cc_binary( - name = "my_app", - srcs = ["my_app.cc"], - deps = [ - "//cpp:array_record_reader", - "//cpp:array_record_writer", - "@com_google_riegeli//riegeli/bytes:fd_reader", - "@com_google_riegeli//riegeli/bytes:fd_writer", - ], - ) - -CMake -~~~~~ - -.. code-block:: cmake - - find_package(ArrayRecord REQUIRED) - find_package(Riegeli REQUIRED) - - target_link_libraries(my_app - ArrayRecord::array_record_reader - ArrayRecord::array_record_writer - Riegeli::riegeli - ) diff --git a/docs/examples.md b/docs/examples.md deleted file mode 100644 index c9cd4f1..0000000 --- a/docs/examples.md +++ /dev/null @@ -1,811 +0,0 @@ -# Examples - -This section provides practical examples of using ArrayRecord in various scenarios, from basic file operations to advanced machine learning workflows. 
- -## Basic File Operations - -### Writing and Reading Simple Data - -```python -from array_record.python import array_record_module, array_record_data_source - -# Writing data -writer = array_record_module.ArrayRecordWriter('simple_data.array_record') - -# Write different types of data -data_samples = [ - b"Hello, ArrayRecord!", - b"This is record 2", - b"Binary data: \x00\x01\x02\x03", - "Unicode text: 你好".encode('utf-8') -] - -for data in data_samples: - writer.write(data) - -writer.close() - -# Reading data back -with array_record_data_source.ArrayRecordDataSource('simple_data.array_record') as ds: - print(f"Total records: {len(ds)}") - - for i in range(len(ds)): - record = ds[i] - print(f"Record {i}: {record}") -``` - -### Working with JSON Data - -```python -import json -from array_record.python import array_record_module, array_record_data_source - -# Sample data -users = [ - {"id": 1, "name": "Alice", "age": 30, "email": "alice@example.com"}, - {"id": 2, "name": "Bob", "age": 25, "email": "bob@example.com"}, - {"id": 3, "name": "Charlie", "age": 35, "email": "charlie@example.com"}, -] - -# Writing JSON data -writer = array_record_module.ArrayRecordWriter('users.array_record') - -for user in users: - json_bytes = json.dumps(user).encode('utf-8') - writer.write(json_bytes) - -writer.close() - -# Reading JSON data -with array_record_data_source.ArrayRecordDataSource('users.array_record') as ds: - for i in range(len(ds)): - json_bytes = ds[i] - user = json.loads(json_bytes.decode('utf-8')) - print(f"User {user['id']}: {user['name']} ({user['age']} years old)") -``` - -### Working with tf.train.Example - -ArrayRecord integrates seamlessly with TensorFlow's `tf.train.Example` format for structured ML data: - -```python -import tensorflow as tf -import grain -import dataclasses -from array_record.python import array_record_module, array_record_data_source - -# Writing tf.train.Example records -def create_tf_example(text_data, is_tokens=False): - if is_tokens: - # Integer tokens - features = {'text': tf.train.Feature(int64_list=tf.train.Int64List(value=text_data))} - else: - # String text - features = {'text': tf.train.Feature(bytes_list=tf.train.BytesList(value=[text_data.encode('utf-8')]))} - - return tf.train.Example(features=tf.train.Features(feature=features)) - -# Write examples to ArrayRecord -writer = array_record_module.ArrayRecordWriter('tf_examples.array_record', 'group_size:1') - -# Write string examples -for text in ["The quick brown fox", "Machine learning is powerful", "ArrayRecord stores data efficiently"]: - example = create_tf_example(text) - writer.write(example.SerializeToString()) # Already bytes, no .encode() needed - -# Write token examples -for tokens in [[1, 2, 3, 4, 5], [6, 7, 8, 9], [10, 11, 12, 13, 14, 15]]: - example = create_tf_example(tokens, is_tokens=True) - writer.write(example.SerializeToString()) - -writer.close() -print("Created tf.train.Example dataset with string and token examples") - -# MaxText-style parsing with Grain -@dataclasses.dataclass -class ParseFeatures(grain.MapTransform): - """Parse tf.train.Example records (from MaxText).""" - def __init__(self, data_columns, tokenize): - self.data_columns = data_columns - self.dtype = tf.string if tokenize else tf.int64 - - def map(self, element): - return tf.io.parse_example( - element, - {col: tf.io.FixedLenSequenceFeature([], dtype=self.dtype, allow_missing=True) - for col in self.data_columns} - ) - -@dataclasses.dataclass -class NormalizeFeatures(grain.MapTransform): - """Normalize features (from 
MaxText).""" - def __init__(self, column_names, tokenize): - self.column_names = column_names - self.tokenize = tokenize - - def map(self, element): - if self.tokenize: - return {col: element[col].numpy()[0].decode() for col in self.column_names} - else: - return {col: element[col].numpy() for col in self.column_names} - -# Create MaxText-style training pipeline for strings -data_source = array_record_data_source.ArrayRecordDataSource('tf_examples.array_record') -string_dataset = ( - grain.MapDataset.source(data_source) - .map(ParseFeatures(['text'], tokenize=True)) # Parse tf.train.Example - .map(NormalizeFeatures(['text'], tokenize=True)) # Normalize features - .batch(batch_size=2) - .shuffle(seed=42) -) - -# Create MaxText-style training pipeline for tokens -token_dataset = ( - grain.MapDataset.source(data_source) - .map(ParseFeatures(['text'], tokenize=False)) # Parse tf.train.Example - .map(NormalizeFeatures(['text'], tokenize=False)) # Normalize features - .batch(batch_size=2) - .shuffle(seed=42) -) - -print("Created MaxText-style pipelines for both string and token data") -``` - -This example demonstrates the complete workflow for using tf.train.Example with ArrayRecord and MaxText-style data processing, supporting both string text and integer token formats. - -## Machine Learning Examples - -### Storing Image Data - -```python -import numpy as np -import json -from array_record.python import array_record_module, array_record_data_source -from PIL import Image -import io - -def create_image_dataset(): - """Create a dataset with images and metadata.""" - writer = array_record_module.ArrayRecordWriter( - 'image_dataset.array_record', - 'group_size:100,brotli:6' # Optimize for image data - ) - - # Generate sample images - for i in range(1000): - # Create a simple colored image - image = np.random.randint(0, 256, (64, 64, 3), dtype=np.uint8) - - # Convert to PNG bytes - pil_image = Image.fromarray(image) - img_buffer = io.BytesIO() - pil_image.save(img_buffer, format='PNG') - img_bytes = img_buffer.getvalue() - - # Create record with image and metadata - record = { - 'image_data': img_bytes.hex(), # Store as hex string - 'width': 64, - 'height': 64, - 'channels': 3, - 'label': i % 10, # Simple label - 'filename': f'image_{i:04d}.png' - } - - writer.write(json.dumps(record).encode('utf-8')) - - writer.close() - print("Image dataset created!") - -def read_image_dataset(): - """Read and process the image dataset.""" - with array_record_data_source.ArrayRecordDataSource('image_dataset.array_record') as ds: - print(f"Dataset contains {len(ds)} images") - - # Read a few samples - for i in range(5): - record_bytes = ds[i] - record = json.loads(record_bytes.decode('utf-8')) - - # Decode image - img_bytes = bytes.fromhex(record['image_data']) - image = Image.open(io.BytesIO(img_bytes)) - - print(f"Image {i}: {record['filename']}, " - f"size: {record['width']}x{record['height']}, " - f"label: {record['label']}") - -# Create and read the dataset -create_image_dataset() -read_image_dataset() -``` - -### Text Processing with Embeddings - -```python -import numpy as np -import json -from array_record.python import array_record_module, array_record_data_source - -def create_text_embedding_dataset(): - """Create a dataset with text and embeddings.""" - writer = array_record_module.ArrayRecordWriter( - 'text_embeddings.array_record', - 'group_size:1000,brotli:9' # High compression for text - ) - - # Sample texts - texts = [ - "The quick brown fox jumps over the lazy dog", - "Machine learning is 
transforming technology", - "ArrayRecord provides efficient data storage", - "Natural language processing enables understanding", - "Deep learning models require large datasets" - ] - - for i, text in enumerate(texts * 200): # Repeat for larger dataset - # Simulate text embedding (normally from a model like BERT) - embedding = np.random.randn(768).astype(np.float32) # 768-dim embedding - - record = { - 'id': i, - 'text': text, - 'embedding': embedding.tolist(), # Convert to list for JSON - 'length': len(text), - 'word_count': len(text.split()) - } - - writer.write(json.dumps(record).encode('utf-8')) - - writer.close() - print(f"Text embedding dataset created with {len(texts) * 200} records!") - -def search_similar_texts(query_embedding, top_k=5): - """Find similar texts using cosine similarity.""" - query_embedding = np.array(query_embedding) - similarities = [] - - with array_record_data_source.ArrayRecordDataSource('text_embeddings.array_record') as ds: - for i in range(len(ds)): - record_bytes = ds[i] - record = json.loads(record_bytes.decode('utf-8')) - - embedding = np.array(record['embedding']) - similarity = np.dot(query_embedding, embedding) / ( - np.linalg.norm(query_embedding) * np.linalg.norm(embedding) - ) - - similarities.append((similarity, record)) - - # Sort by similarity and return top-k - similarities.sort(key=lambda x: x[0], reverse=True) - return similarities[:top_k] - -# Create dataset and search -create_text_embedding_dataset() - -# Example search -query = np.random.randn(768).astype(np.float32) -results = search_similar_texts(query, top_k=3) - -print("Top similar texts:") -for similarity, record in results: - print(f"Similarity: {similarity:.4f}, Text: {record['text'][:50]}...") -``` - -## Apache Beam Examples - -### Large-Scale Data Conversion - -```python -import apache_beam as beam -from apache_beam.options.pipeline_options import PipelineOptions -from array_record.beam.pipelines import convert_tf_to_arrayrecord_gcs - -def convert_large_dataset(): - """Convert a large TFRecord dataset to ArrayRecord on GCS.""" - - # Configure pipeline for large dataset - pipeline_options = PipelineOptions([ - '--runner=DataflowRunner', - '--project=my-project', - '--region=us-central1', - '--temp_location=gs://my-bucket/temp', - '--staging_location=gs://my-bucket/staging', - '--max_num_workers=50', - '--worker_machine_type=n1-standard-4', - '--disk_size_gb=100' - ]) - - # Run conversion - pipeline = convert_tf_to_arrayrecord_gcs( - overwrite_extension=True, - file_path_suffix='.array_record', - args=[ - '--input', 'gs://large-dataset/tfrecords/*.tfrecord', - '--output', 'gs://processed-dataset/arrayrecords/' - ], - pipeline_options=pipeline_options - ) - - result = pipeline.run() - result.wait_until_finish() - print("Conversion completed!") - -# Run the conversion -convert_large_dataset() -``` - -### Custom Beam Pipeline - -```python -import apache_beam as beam -from apache_beam.options.pipeline_options import PipelineOptions -from array_record.python import array_record_module -import tempfile -import json - -class ProcessAndWriteArrayRecordDoFn(beam.DoFn): - """Custom DoFn to process data and write ArrayRecord files.""" - - def process(self, element): - batch_id, records = element - - # Create temporary ArrayRecord file - with tempfile.NamedTemporaryFile(suffix='.array_record', delete=False) as tmp_file: - writer = array_record_module.ArrayRecordWriter( - tmp_file.name, - 'group_size:1000,brotli:6' - ) - - processed_count = 0 - for record in records: - # Process each record - 
processed_record = self.process_record(record) - writer.write(json.dumps(processed_record).encode('utf-8')) - processed_count += 1 - - writer.close() - - yield { - 'batch_id': batch_id, - 'file_path': tmp_file.name, - 'record_count': processed_count - } - - def process_record(self, record): - """Apply custom processing to each record.""" - # Example: add processing timestamp and normalize text - processed = { - 'original': record, - 'processed_text': record.get('text', '').lower().strip(), - 'word_count': len(record.get('text', '').split()), - 'processing_timestamp': beam.utils.timestamp.Timestamp.now().to_utc_datetime().isoformat() - } - return processed - -def run_custom_pipeline(): - """Run a custom pipeline that processes data and creates ArrayRecord files.""" - - # Sample input data - sample_data = [ - {'id': 1, 'text': 'Hello World', 'category': 'greeting'}, - {'id': 2, 'text': 'Machine Learning', 'category': 'tech'}, - {'id': 3, 'text': 'Data Processing', 'category': 'tech'}, - ] * 100 # Repeat for larger dataset - - pipeline_options = PipelineOptions(['--runner=DirectRunner']) - - with beam.Pipeline(options=pipeline_options) as pipeline: - # Create input data - input_data = pipeline | 'CreateData' >> beam.Create(sample_data) - - # Group into batches - batched_data = ( - input_data - | 'AddKeys' >> beam.Map(lambda x: (x['category'], x)) - | 'GroupByCategory' >> beam.GroupByKey() - | 'CreateBatches' >> beam.Map(lambda x: (x[0], list(x[1]))) - ) - - # Process and write ArrayRecord files - results = ( - batched_data - | 'ProcessAndWrite' >> beam.ParDo(ProcessAndWriteArrayRecordDoFn()) - ) - - # Output results - results | 'PrintResults' >> beam.Map( - lambda x: print(f"Processed batch {x['batch_id']}: " - f"{x['record_count']} records -> {x['file_path']}") - ) - -run_custom_pipeline() -``` - -## Performance Optimization Examples - -### Optimizing for Different Access Patterns - -```python -from array_record.python import array_record_module, array_record_data_source -import time -import random - -def create_test_dataset(filename, num_records=10000): - """Create a test dataset for performance testing.""" - writer = array_record_module.ArrayRecordWriter(filename) - - for i in range(num_records): - data = f"Record {i}: " + "x" * random.randint(100, 1000) - writer.write(data.encode('utf-8')) - - writer.close() - -def benchmark_sequential_access(filename): - """Benchmark sequential access performance.""" - print("Benchmarking sequential access...") - - # Default settings (optimized for sequential access) - start_time = time.time() - - with array_record_data_source.ArrayRecordDataSource(filename) as ds: - total_bytes = 0 - for i in range(len(ds)): - record = ds[i] - total_bytes += len(record) - - end_time = time.time() - - print(f"Sequential access: {end_time - start_time:.2f}s, " - f"{total_bytes / (1024*1024):.2f} MB processed") - -def benchmark_random_access(filename): - """Benchmark random access performance.""" - print("Benchmarking random access...") - - # Settings optimized for random access - reader_options = { - 'readahead_buffer_size': '0', - 'max_parallelism': '0' - } - - start_time = time.time() - - with array_record_data_source.ArrayRecordDataSource( - filename, reader_options=reader_options - ) as ds: - # Random access pattern - total_bytes = 0 - indices = random.sample(range(len(ds)), min(1000, len(ds))) - - for idx in indices: - record = ds[idx] - total_bytes += len(record) - - end_time = time.time() - - print(f"Random access: {end_time - start_time:.2f}s, " - f"{total_bytes / 
(1024*1024):.2f} MB processed") - -def benchmark_batch_access(filename): - """Benchmark batch access performance.""" - print("Benchmarking batch access...") - - start_time = time.time() - - with array_record_data_source.ArrayRecordDataSource(filename) as ds: - batch_size = 100 - total_bytes = 0 - - for start_idx in range(0, len(ds), batch_size): - end_idx = min(start_idx + batch_size, len(ds)) - indices = list(range(start_idx, end_idx)) - - batch = ds[indices] - for record in batch: - total_bytes += len(record) - - end_time = time.time() - - print(f"Batch access: {end_time - start_time:.2f}s, " - f"{total_bytes / (1024*1024):.2f} MB processed") - -# Run performance benchmarks -test_file = 'performance_test.array_record' -create_test_dataset(test_file) - -benchmark_sequential_access(test_file) -benchmark_random_access(test_file) -benchmark_batch_access(test_file) -``` - -### Compression Comparison - -```python -from array_record.python import array_record_module, array_record_data_source -import os -import time - -def test_compression_options(data, base_filename): - """Test different compression options.""" - - compression_options = [ - ('uncompressed', 'uncompressed'), - ('snappy', 'snappy'), - ('brotli_fast', 'brotli:1'), - ('brotli_default', 'brotli:6'), - ('brotli_max', 'brotli:11'), - ('zstd_fast', 'zstd:1'), - ('zstd_default', 'zstd:3'), - ('zstd_max', 'zstd:22'), - ] - - results = [] - - for name, options in compression_options: - filename = f"{base_filename}_{name}.array_record" - - # Write with compression - start_time = time.time() - writer = array_record_module.ArrayRecordWriter(filename, options) - - for item in data: - writer.write(item.encode('utf-8')) - - writer.close() - write_time = time.time() - start_time - - # Check file size - file_size = os.path.getsize(filename) - - # Read back (for read performance) - start_time = time.time() - with array_record_data_source.ArrayRecordDataSource(filename) as ds: - for i in range(len(ds)): - _ = ds[i] - read_time = time.time() - start_time - - results.append({ - 'name': name, - 'write_time': write_time, - 'read_time': read_time, - 'file_size_mb': file_size / (1024 * 1024), - 'compression_ratio': file_size / results[0]['file_size_mb'] if results else 1.0 - }) - - print(f"{name:15} | Write: {write_time:6.2f}s | " - f"Read: {read_time:6.2f}s | Size: {file_size/(1024*1024):7.2f} MB") - - return results - -# Generate test data -test_data = [] -for i in range(5000): - # Mix of repetitive and random data - if i % 2 == 0: - data = f"Repetitive data entry {i % 100}: " + "Lorem ipsum " * 20 - else: - data = f"Random entry {i}: " + os.urandom(200).hex() - test_data.append(data) - -print("Compression Performance Comparison") -print("=" * 70) -print(f"{'Method':<15} | {'Write Time':<12} | {'Read Time':<11} | {'File Size'}") -print("-" * 70) - -results = test_compression_options(test_data, 'compression_test') -``` - -## Integration Examples - -### Using with TensorFlow - -```python -import tensorflow as tf -from array_record.python import array_record_data_source -import json - -def create_tf_compatible_dataset(): - """Create ArrayRecord dataset compatible with TensorFlow.""" - from array_record.python import array_record_module - - writer = array_record_module.ArrayRecordWriter('tf_dataset.array_record') - - # Create sample data similar to tf.data format - for i in range(1000): - # Simulate image classification data - features = { - 'image': tf.random.normal([224, 224, 3]).numpy().tolist(), - 'label': i % 10, - 'id': i - } - 
writer.write(json.dumps(features).encode('utf-8')) - - writer.close() - -def create_tf_dataset_from_arrayrecord(filename): - """Create TensorFlow dataset from ArrayRecord file.""" - - def generator(): - with array_record_data_source.ArrayRecordDataSource(filename) as ds: - for i in range(len(ds)): - record_bytes = ds[i] - record = json.loads(record_bytes.decode('utf-8')) - - # Convert back to tensors - image = tf.constant(record['image'], dtype=tf.float32) - label = tf.constant(record['label'], dtype=tf.int32) - - yield {'image': image, 'label': label} - - # Create TensorFlow dataset - dataset = tf.data.Dataset.from_generator( - generator, - output_signature={ - 'image': tf.TensorSpec(shape=[224, 224, 3], dtype=tf.float32), - 'label': tf.TensorSpec(shape=[], dtype=tf.int32) - } - ) - - return dataset - -# Create and use TensorFlow dataset -create_tf_compatible_dataset() -tf_dataset = create_tf_dataset_from_arrayrecord('tf_dataset.array_record') - -# Use in training pipeline -tf_dataset = tf_dataset.batch(32).prefetch(tf.data.AUTOTUNE) - -print("TensorFlow dataset created from ArrayRecord:") -for batch in tf_dataset.take(1): - print(f"Batch shape - Images: {batch['image'].shape}, Labels: {batch['label'].shape}") -``` - -### Using with PyTorch - -```python -import torch -from torch.utils.data import Dataset, DataLoader -from array_record.python import array_record_data_source -import json -import numpy as np - -class ArrayRecordDataset(Dataset): - """PyTorch Dataset wrapper for ArrayRecord files.""" - - def __init__(self, filename, transform=None): - self.filename = filename - self.transform = transform - - # Open data source - self.data_source = array_record_data_source.ArrayRecordDataSource(filename) - self.length = len(self.data_source) - - def __len__(self): - return self.length - - def __getitem__(self, idx): - # Read record - record_bytes = self.data_source[idx] - record = json.loads(record_bytes.decode('utf-8')) - - # Convert to PyTorch tensors - image = torch.tensor(record['image'], dtype=torch.float32) - label = torch.tensor(record['label'], dtype=torch.long) - - if self.transform: - image = self.transform(image) - - return image, label - - def __del__(self): - # Clean up - if hasattr(self, 'data_source'): - self.data_source.__exit__(None, None, None) - -def create_pytorch_dataloader(): - """Create PyTorch DataLoader from ArrayRecord file.""" - - # Create dataset - dataset = ArrayRecordDataset('tf_dataset.array_record') # Reuse from TF example - - # Create DataLoader - dataloader = DataLoader( - dataset, - batch_size=32, - shuffle=True, - num_workers=4, - pin_memory=True - ) - - return dataloader - -# Use with PyTorch -dataloader = create_pytorch_dataloader() - -print("PyTorch DataLoader created from ArrayRecord:") -for batch_idx, (images, labels) in enumerate(dataloader): - print(f"Batch {batch_idx}: Images shape: {images.shape}, Labels shape: {labels.shape}") - if batch_idx >= 2: # Just show first few batches - break -``` - -## Advanced Use Cases - -### Multi-Modal Data Storage - -```python -import json -import base64 -from array_record.python import array_record_module, array_record_data_source - -def create_multimodal_dataset(): - """Create a dataset with multiple data modalities.""" - - writer = array_record_module.ArrayRecordWriter( - 'multimodal_dataset.array_record', - 'group_size:100,brotli:6' - ) - - for i in range(500): - # Simulate multi-modal record - record = { - 'id': i, - 'text': f"This is sample text for item {i}", - 'image_data': 
base64.b64encode(np.random.bytes(1024)).decode('utf-8'), # Fake image - 'audio_features': np.random.randn(128).tolist(), # Audio features - 'metadata': { - 'timestamp': f"2024-01-{(i % 30) + 1:02d}T10:00:00Z", - 'source': 'synthetic', - 'quality_score': np.random.uniform(0.5, 1.0) - }, - 'labels': { - 'category': ['tech', 'science', 'art'][i % 3], - 'sentiment': np.random.choice(['positive', 'negative', 'neutral']), - 'confidence': np.random.uniform(0.7, 0.99) - } - } - - writer.write(json.dumps(record).encode('utf-8')) - - writer.close() - print("Multi-modal dataset created!") - -def query_multimodal_dataset(category_filter=None, min_quality=0.8): - """Query the multi-modal dataset with filters.""" - - results = [] - - with array_record_data_source.ArrayRecordDataSource('multimodal_dataset.array_record') as ds: - for i in range(len(ds)): - record_bytes = ds[i] - record = json.loads(record_bytes.decode('utf-8')) - - # Apply filters - if category_filter and record['labels']['category'] != category_filter: - continue - - if record['metadata']['quality_score'] < min_quality: - continue - - results.append(record) - - return results - -# Create and query multi-modal dataset -create_multimodal_dataset() - -# Query examples -tech_records = query_multimodal_dataset(category_filter='tech', min_quality=0.9) -print(f"Found {len(tech_records)} high-quality tech records") - -for record in tech_records[:3]: - print(f"ID: {record['id']}, Text: {record['text'][:50]}...") - print(f"Quality: {record['metadata']['quality_score']:.3f}, " - f"Sentiment: {record['labels']['sentiment']}") - print() -``` - - -This comprehensive set of examples covers the major use cases for ArrayRecord, from basic file operations to advanced machine learning workflows and integrations with popular frameworks. Each example includes practical code that can be adapted for specific needs. diff --git a/docs/index.rst b/docs/index.rst index 1ca6dc8..ee40bb4 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -1,7 +1,11 @@ ArrayRecord - High-Performance Data Storage ================================================== -ArrayRecord is a high-performance file format designed for machine learning workloads, derived from `Riegeli `_ and achieving a new frontier of IO efficiency. ArrayRecord supports parallel read, write, and random access by record index, making it ideal for machine learning workloads and large-scale data processing. +ArrayRecord is a high-performance file format designed for machine learning +workloads, derived from `Riegeli `_ and +achieving a new frontier of IO efficiency. ArrayRecord supports parallel read, +write, and random access by record index, making it ideal for machine learning +workloads and large-scale data processing. .. 
image:: https://img.shields.io/pypi/v/array-record-python.svg :target: https://pypi.org/project/array-record-python/ @@ -11,21 +15,8 @@ ArrayRecord is a high-performance file format designed for machine learning work :target: https://github.com/bzantium/array_record/blob/main/LICENSE :alt: License -Features --------- - -* **High Performance**: Optimized for both sequential and random access patterns -* **Parallel Processing**: Built-in support for concurrent read and write operations -* **Compression**: Multiple compression algorithms (Brotli, Zstd, Snappy) with configurable levels -* **Random Access**: Efficient random access by record index without full file scanning -* **Apache Beam Integration**: Seamless integration with Apache Beam for large-scale data processing -* **Cross-Platform**: Available for Linux (x86_64, aarch64) and macOS (aarch64) - -Quick Start ------------ - Installation -~~~~~~~~~~~~ +------------ .. code-block:: bash @@ -38,93 +29,115 @@ For Apache Beam integration: pip install array_record[beam] Basic Usage -~~~~~~~~~~~ +----------- Writing Records -^^^^^^^^^^^^^^^ +~~~~~~~~~~~~~~~ .. code-block:: python from array_record.python import array_record_module # Create a writer - writer = array_record_module.ArrayRecordWriter('output.array_record', 'group_size:1000') - + # Use `group_size` to configure the access granularity. + # `group_size:1` is optimized for random access, while a larger group size + # increases compression ratio and improves the performance of sequential and + # batch access. + writer = array_record_module.ArrayRecordWriter('output.array_record', 'group_size:1') + # Write some records for i in range(10000): data = f"Record {i}".encode('utf-8') writer.write(data) - + # Close the writer (important!) writer.close() Reading Records -^^^^^^^^^^^^^^^ +~~~~~~~~~~~~~~~ + +ArrayRecord provides two read APIs. One is at the file level +(``array_record_module``) which has one-to-one mapping of the underlying C++ API; +the other wraps around ``array_record_module`` for convenience to access +multiple ArrayRecord files (``array_record_data_source``). + +File-level read API ``array_record_module.reader`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ .. code-block:: python - from array_record.python import array_record_data_source + from array_record.python import array_record_module + + reader = array_record_module.ArrayRecordReader('output.array_record') + + # Read records sequentially with a read-ahead thread pool. + for _ in range(reader.num_records()): + record = reader.read() + + # Read all the records at once with an internal thread pool + records = reader.read_all() - # Create a data source - data_source = array_record_data_source.ArrayRecordDataSource('output.array_record') - - # Read all records - for i in range(len(data_source)): - record = data_source[i] - print(f"Record {i}: {record.decode('utf-8')}") + # Read records by range with an internal thread pool + records = reader.read(0, 100) - # Or read specific records - records = data_source[[0, 100, 500]] # Read records at indices 0, 100, and 500 + # Read records by indices with an internal thread pool + records = reader.read([0, 1, 100]) -Random Access -^^^^^^^^^^^^^ + # Releases the thread pool and the file handle. + reader.close() + +Multi-file read API ``array_record_data_source`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +ArrayRecord data source is designed specifically for pygrain. +It interfaces with multiple ArrayRecord files for random access. 
+To use ArrayRecord data source, the writer must specify the +``group_size`` to be 1. .. code-block:: python - # ArrayRecord supports efficient random access - with array_record_data_source.ArrayRecordDataSource('output.array_record') as ds: + import glob + from array_record.python import array_record_module + from array_record.python import array_record_data_source + + num_files = 5 + for i in range(num_files): + # array_record_data_source strictly require the group_size + # to be 1. + writer = array_record_module.ArrayRecordWriter( + f"output.array_record-{i:05d}-of-{num_files:05d}", + "group_size:1" + ) + for j in range(100): + data = f"File {i:05d}, Record {j}".encode("utf-8") + writer.write(data) + writer.close() + + # The data source object accepts multiple array record files + # as the input and creates a virtual concatenated view over the files + with array_record_data_source.ArrayRecordDataSource(glob.glob("output.array_record*")) as ds: # Get total number of records total_records = len(ds) - - # Read a specific record - record_1000 = ds[1000] - - # Read multiple specific records - batch = ds[[10, 20, 30, 40, 50]] -Table of Contents ------------------ + # Read a specific record from the files as if all the files were + # concatenated into a single virtual array. + record_1000 = ds[150] -.. toctree:: - :maxdepth: 2 - :caption: Getting Started + # Read multiple records. The records indexed in the same file would + # use a thread pool to speed up the reading performance. + batch = ds.__getitems__([10, 100, 200]) - installation - quickstart - performance + +Table of Contents +----------------- .. toctree:: :maxdepth: 2 :caption: User Guide core_concepts - beam_integration - examples - -.. toctree:: - :maxdepth: 2 - :caption: API Reference - + performance python_reference - beam_reference - cpp_reference - -.. toctree:: - :maxdepth: 1 - :caption: Development - - contributing - changelog Indices and tables ================== diff --git a/docs/installation.md b/docs/installation.md deleted file mode 100644 index ae2a930..0000000 --- a/docs/installation.md +++ /dev/null @@ -1,115 +0,0 @@ -# Installation - -## Requirements - -ArrayRecord requires Python 3.10 or later and is available for the following platforms: - -| Platform | x86_64 | aarch64 | -|----------|--------|---------| -| Linux | ✓ | ✓ | -| macOS | ✗ | ✓ | -| Windows | ✗ | ✗ | - -## Basic Installation - -Install ArrayRecord from PyPI: - -```bash -pip install array-record-python -``` - -## Optional Dependencies - -### Apache Beam Integration - -For large-scale data processing with Apache Beam: - -```bash -pip install array-record-python[beam] -``` - -This includes: -- `apache-beam[gcp]>=2.53.0` -- `google-cloud-storage>=2.11.0` -- `tensorflow>=2.20.0` - -### Development and Testing - -For development and testing: - -```bash -pip install array-record-python[test] -``` - -This includes: -- `jax` -- `grain` -- `tensorflow>=2.20.0` - -## Building from Source - -### Prerequisites - -- Python 3.10+ -- C++17 compatible compiler -- Bazel (for building C++ components) - -### Clone and Build - -```bash -git clone https://github.com/google/array_record.git -cd array_record -pip install -e . -``` - -### Building with Bazel - -```bash -# Build all targets -bazel build //... - -# Run tests -bazel test //... 
-``` - -## Verification - -Verify your installation: - -```python -import array_record.python.array_record_module as ar - -# Create a simple test -writer = ar.ArrayRecordWriter('/tmp/test.array_record') -writer.write(b'Hello, ArrayRecord!') -writer.close() - -reader = ar.ArrayRecordReader('/tmp/test.array_record') -print(f"Records: {reader.num_records()}") -reader.close() -``` - -## Common Issues - -### Import Errors - -If you encounter import errors, ensure you have the correct Python version and platform: - -```python -import sys -print(f"Python version: {sys.version}") -print(f"Platform: {sys.platform}") -``` - -### Missing Dependencies - -If you're using ArrayRecord with other libraries like TensorFlow or JAX, install them separately: - -```bash -pip install tensorflow>=2.20.0 -pip install jax -``` - -### Platform Compatibility - -ArrayRecord currently supports limited platforms. If you're on an unsupported platform, you may need to build from source or use a compatible environment like Docker. diff --git a/docs/performance.md b/docs/performance.md index a401728..f58f275 100644 --- a/docs/performance.md +++ b/docs/performance.md @@ -1,661 +1,161 @@ # Performance Guide -This guide covers performance optimization strategies for ArrayRecord, including configuration options, access patterns, and benchmarking techniques. +This guide covers how the configurations discussed in the previous section (core +concepts) affects the data compression ratio and read performance. Besides the +configurations discussed previously, the read performance can vary by other +factors: -## Understanding ArrayRecord Performance +1. File system. Local file systems and remote file systems (such as GCS) can + have different performance characteristics. Improving compression ratio on + remote file systems reduces both the storage and data transmission rate. + Conversely, the data transmission is basically free on a local file system. -ArrayRecord's performance characteristics depend on several factors: +2. Record data type. Text data is in general very compressible, but most media + formats (PNG, JPG, MP4, MPEG) are precompressed which do not benefit from + additional compression settings. Embedding data is moderately compressible, + but users may benefit more by applying custom quantization algorithms + instead. + +In this guide we only consider the most basic form: a local file system with +plain text generated by lorem ipsum python packages. Nevertheless, users should +still benefit from the benchmark results. -- **Access Pattern**: Sequential vs. random access -- **Group Size**: Number of records per compressed chunk -- **Compression Algorithm**: Trade-off between size and speed -- **Parallelism**: Number of concurrent operations -- **Buffer Sizes**: Memory usage vs. performance trade-offs -## Configuration for Different Use Cases +The lorem ipsum text data is generated with the following simple program: -### Writer Configuration +```python +from lorem_text import lorem -The writer configuration significantly impacts both file size and read performance. 
Choose the right settings based on your access patterns: - -#### Maximum Random Access Performance - -Use `group_size:1` for applications requiring ultra-fast random access: - -```python -from array_record.python import array_record_module - -# Ultra-fast random access (larger file size) -writer = array_record_module.ArrayRecordWriter( - 'random_access.array_record', - 'group_size:1' -) - -# Each record is stored individually for instant access -for i in range(10000): - data = f"Record {i}".encode('utf-8') - writer.write(data) -writer.close() - -# Benefits: -# - Zero decompression overhead for random access -# - Constant-time record retrieval O(1) -# - Ideal for ML training with random sampling -# -# Trade-offs: -# - Larger file size (no compression grouping) -# - Higher memory usage for index -``` - -#### Balanced Performance - -Use moderate group sizes for balanced performance: - -```python -# Balanced performance and compression -writer = array_record_module.ArrayRecordWriter( - 'balanced.array_record', - 'group_size:100,brotli:6' -) - -# Benefits: -# - Good compression ratio -# - Reasonable random access performance -# - Lower memory usage -``` - -#### Maximum Compression - -Use large group sizes for storage-optimized scenarios: - -```python -# Maximum compression (slower random access) -writer = array_record_module.ArrayRecordWriter( - 'compressed.array_record', - 'group_size:1000,brotli:9' -) - -# Benefits: -# - Smallest file size -# - Best for archival storage -# - Efficient for sequential access -# -# Trade-offs: -# - Slower random access (must decompress groups) -# - Higher CPU usage during reads -``` - -#### Configuration Comparison - -| Configuration | File Size | Random Access | Sequential | Use Case | -|---------------|-----------|---------------|------------|----------| -| `group_size:1` | Largest | Fastest | Fast | ML training, real-time | -| `group_size:100,brotli:6` | Medium | Good | Fast | General purpose | -| `group_size:1000,brotli:9` | Smallest | Slower | Fastest | Archival, batch processing | - -### Sequential Access (Default) - -Optimized for reading records in order: - -```python -from array_record.python import array_record_data_source - -# Default settings are optimized for sequential access -reader_options = { - 'readahead_buffer_size': '16MB', # Large buffer for prefetching - 'max_parallelism': 'auto', # Use available CPU cores -} - -data_source = array_record_data_source.ArrayRecordDataSource( - 'sequential_data.array_record', - reader_options=reader_options -) - -# Efficient sequential iteration -for i in range(len(data_source)): - record = data_source[i] - # Process record... -``` - -### Random Access - -Optimized for jumping to arbitrary record positions: - -```python -# Random access optimization -reader_options = { - 'readahead_buffer_size': '0', # Disable prefetching - 'max_parallelism': '0', # Disable parallel readahead - 'index_storage_option': 'in_memory' # Keep index in memory -} - -data_source = array_record_data_source.ArrayRecordDataSource( - 'random_data.array_record', - reader_options=reader_options -) - -# Efficient random access -import random -indices = random.sample(range(len(data_source)), 1000) -for idx in indices: - record = data_source[idx] - # Process record... 
-``` - -### Batch Processing - -Optimized for processing multiple records at once: - -```python -# Batch access with moderate buffering -reader_options = { - 'readahead_buffer_size': '4MB', - 'max_parallelism': '2', -} - -data_source = array_record_data_source.ArrayRecordDataSource( - 'batch_data.array_record', - reader_options=reader_options -) - -# Process in batches -batch_size = 1000 -for start in range(0, len(data_source), batch_size): - end = min(start + batch_size, len(data_source)) - indices = list(range(start, end)) - batch = data_source[indices] - - # Process batch efficiently - for record in batch: - # Process record... - pass -``` - -## Writing Performance - -### High Throughput Writing - -```python -from array_record.python import array_record_module - -# High throughput configuration -writer = array_record_module.ArrayRecordWriter( - 'high_throughput.array_record', - 'group_size:10000,max_parallelism:8,snappy' -) - -# Large group size for better compression and fewer I/O operations -# More parallel encoders for CPU utilization -# Snappy for fast compression -``` - -### Balanced Performance - -```python -# Balanced configuration -writer = array_record_module.ArrayRecordWriter( - 'balanced.array_record', - 'group_size:2000,max_parallelism:4,brotli:6' -) - -# Medium group size balances compression and access speed -# Moderate parallelism -# Default Brotli compression -``` - -### Low Latency Writing - -```python -# Low latency configuration -writer = array_record_module.ArrayRecordWriter( - 'low_latency.array_record', - 'group_size:100,max_parallelism:1,snappy' -) - -# Small group size for immediate availability -# Single thread to avoid coordination overhead -# Fast compression -``` - -## Compression Performance Comparison - -### Compression Algorithms - -| Algorithm | Compression Ratio | Compression Speed | Decompression Speed | Use Case | -|-----------|------------------|-------------------|---------------------|----------| -| Uncompressed | 1.0x | Fastest | Fastest | Maximum speed, unlimited storage | -| Snappy | 2-4x | Very Fast | Very Fast | High throughput, moderate storage | -| Brotli (level 1) | 3-5x | Fast | Fast | Balanced performance | -| Brotli (level 6) | 4-6x | Medium | Fast | Default choice | -| Brotli (level 11) | 5-7x | Slow | Fast | Maximum compression | -| Zstd (level 1) | 3-5x | Very Fast | Very Fast | Alternative to Snappy | -| Zstd (level 3) | 4-6x | Fast | Fast | Alternative to Brotli | -| Zstd (level 22) | 6-8x | Very Slow | Fast | Maximum compression | - -### Benchmark Example - -```python -import time -import os -from array_record.python import array_record_module, array_record_data_source - -def benchmark_compression(data, algorithms): - """Benchmark different compression algorithms.""" - results = {} - - for name, options in algorithms.items(): - filename = f'benchmark_{name}.array_record' - - # Write benchmark - start_time = time.time() - writer = array_record_module.ArrayRecordWriter(filename, options) - - for item in data: - writer.write(item) - - writer.close() - write_time = time.time() - start_time - - # File size - file_size = os.path.getsize(filename) - - # Read benchmark - start_time = time.time() - with array_record_data_source.ArrayRecordDataSource(filename) as ds: - for i in range(len(ds)): - _ = ds[i] - read_time = time.time() - start_time - - results[name] = { - 'write_time': write_time, - 'read_time': read_time, - 'file_size_mb': file_size / (1024 * 1024), - } - - # Cleanup - os.remove(filename) - - return results - -# Test data 
-test_data = [f"Test record {i}: " + "x" * 100 for i in range(10000)] - -algorithms = { - 'uncompressed': 'uncompressed', - 'snappy': 'snappy', - 'brotli_1': 'brotli:1', - 'brotli_6': 'brotli:6', - 'brotli_11': 'brotli:11', - 'zstd_1': 'zstd:1', - 'zstd_3': 'zstd:3', - 'zstd_22': 'zstd:22', -} - -results = benchmark_compression([data.encode() for data in test_data], algorithms) - -# Print results -print(f"{'Algorithm':<15} {'Write (s)':<10} {'Read (s)':<9} {'Size (MB)':<10}") -print("-" * 50) -for name, metrics in results.items(): - print(f"{name:<15} {metrics['write_time']:<10.2f} " - f"{metrics['read_time']:<9.2f} {metrics['file_size_mb']:<10.2f}") -``` - -## Memory Usage Optimization - -### Low Memory Configuration - -```python -# Minimize memory usage -reader_options = { - 'readahead_buffer_size': '1MB', # Small buffer - 'max_parallelism': '1', # Single thread - 'index_storage_option': 'offloaded' # Store index on disk -} - -writer_options = 'group_size:500,max_parallelism:1' - -# Use context managers to ensure cleanup -with array_record_data_source.ArrayRecordDataSource( - 'data.array_record', reader_options=reader_options -) as ds: - # Process data with minimal memory footprint - for i in range(len(ds)): - record = ds[i] - # Process immediately and don't store - process_record_immediately(record) -``` - -### High Memory Configuration - -```python -# Use more memory for better performance -reader_options = { - 'readahead_buffer_size': '128MB', # Large buffer - 'max_parallelism': '8', # Many threads - 'index_storage_option': 'in_memory' # Keep index in memory -} - -writer_options = 'group_size:5000,max_parallelism:8' - -# Batch processing with large buffers -batch_size = 10000 -with array_record_data_source.ArrayRecordDataSource( - 'data.array_record', reader_options=reader_options -) as ds: - for start in range(0, len(ds), batch_size): - end = min(start + batch_size, len(ds)) - batch = ds[list(range(start, end))] - process_batch(batch) -``` - -## Parallel Processing - -### Multi-threaded Reading - -```python -import concurrent.futures -from array_record.python import array_record_data_source - -def process_chunk(args): - """Process a chunk of records.""" - filename, start_idx, end_idx = args - - # Each thread gets its own data source - with array_record_data_source.ArrayRecordDataSource(filename) as ds: - results = [] - for i in range(start_idx, end_idx): - record = ds[i] - # Process record - result = len(record) # Example processing - results.append(result) - return results - -def parallel_process_file(filename, num_threads=4): - """Process file using multiple threads.""" - - # Get file size - with array_record_data_source.ArrayRecordDataSource(filename) as ds: - total_records = len(ds) - - # Divide work among threads - chunk_size = total_records // num_threads - chunks = [] - - for i in range(num_threads): - start_idx = i * chunk_size - end_idx = min((i + 1) * chunk_size, total_records) - if i == num_threads - 1: # Last chunk gets remainder - end_idx = total_records - chunks.append((filename, start_idx, end_idx)) - - # Process in parallel - with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor: - results = list(executor.map(process_chunk, chunks)) - - # Combine results - all_results = [] - for chunk_results in results: - all_results.extend(chunk_results) - - return all_results - -# Example usage -results = parallel_process_file('large_file.array_record', num_threads=8) -print(f"Processed {len(results)} records") -``` - -### Asynchronous Processing - 
-```python -import asyncio -from array_record.python import array_record_data_source - -async def async_process_records(filename, batch_size=1000): - """Asynchronously process records in batches.""" - - def process_batch_sync(batch): - # CPU-intensive processing - return [len(record) for record in batch] - - with array_record_data_source.ArrayRecordDataSource(filename) as ds: - total_records = len(ds) - - tasks = [] - for start in range(0, total_records, batch_size): - end = min(start + batch_size, total_records) - indices = list(range(start, end)) - batch = ds[indices] - - # Process batch asynchronously - task = asyncio.get_event_loop().run_in_executor( - None, process_batch_sync, batch - ) - tasks.append(task) - - # Wait for all batches to complete - results = await asyncio.gather(*tasks) - - # Flatten results - all_results = [] - for batch_results in results: - all_results.extend(batch_results) - - return all_results - -# Run async processing -async def main(): - results = await async_process_records('large_file.array_record') - print(f"Processed {len(results)} records asynchronously") - -# asyncio.run(main()) # Uncomment to run -``` - -## Benchmarking Tools - -### Performance Measurement - -```python -import time -import psutil -import os -from contextlib import contextmanager -from array_record.python import array_record_data_source, array_record_module - -@contextmanager -def measure_performance(): - """Context manager to measure performance metrics.""" - process = psutil.Process() - - # Initial measurements - start_time = time.time() - start_cpu = process.cpu_percent() - start_memory = process.memory_info().rss / 1024 / 1024 # MB - - yield - - # Final measurements - end_time = time.time() - end_cpu = process.cpu_percent() - end_memory = process.memory_info().rss / 1024 / 1024 # MB - - print(f"Execution time: {end_time - start_time:.2f} seconds") - print(f"CPU usage: {end_cpu:.1f}%") - print(f"Memory usage: {end_memory:.1f} MB (Δ{end_memory - start_memory:+.1f} MB)") - -def benchmark_access_patterns(filename): - """Benchmark different access patterns.""" - - with array_record_data_source.ArrayRecordDataSource(filename) as ds: - total_records = len(ds) - print(f"Benchmarking file with {total_records} records") - - # Sequential access - print("\n1. Sequential Access:") - with measure_performance(): - with array_record_data_source.ArrayRecordDataSource(filename) as ds: - for i in range(min(1000, total_records)): - _ = ds[i] - - # Random access - print("\n2. Random Access:") - reader_options = {'readahead_buffer_size': '0', 'max_parallelism': '0'} - with measure_performance(): - with array_record_data_source.ArrayRecordDataSource( - filename, reader_options=reader_options - ) as ds: - import random - indices = random.sample(range(total_records), min(1000, total_records)) - for idx in indices: - _ = ds[idx] - - # Batch access - print("\n3. 
Batch Access:") - with measure_performance(): - with array_record_data_source.ArrayRecordDataSource(filename) as ds: - batch_size = 100 - for start in range(0, min(1000, total_records), batch_size): - end = min(start + batch_size, total_records) - indices = list(range(start, end)) - _ = ds[indices] - -# Example usage -# Create test file first -test_file = 'performance_test.array_record' -if not os.path.exists(test_file): - writer = array_record_module.ArrayRecordWriter(test_file) - for i in range(10000): - writer.write(f"Test record {i}: " + "x" * 200) - writer.close() - -benchmark_access_patterns(test_file) -``` - -## Performance Best Practices - -### General Guidelines - -1. **Choose the right group size**: - - Small (100-500): Fast random access, larger files - - Medium (1000-5000): Balanced performance - - Large (10000+): Best compression, sequential access only - -2. **Select appropriate compression**: - - Snappy: Maximum speed, moderate compression - - Brotli 1-3: Fast with good compression - - Brotli 6: Default balanced choice - - Brotli 9-11: Maximum compression, slower - -3. **Configure parallelism**: - - Sequential access: Use available CPU cores - - Random access: Disable parallelism - - Mixed access: Use moderate parallelism (2-4 threads) - -4. **Optimize buffer sizes**: - - Sequential: Large buffers (16-64MB) - - Random: No buffering (0) - - Batch: Medium buffers (4-16MB) - -### Platform-Specific Optimizations - -#### Linux -```python -# Use larger buffer sizes on Linux -reader_options = { - 'readahead_buffer_size': '64MB', - 'max_parallelism': '8' -} -``` - -#### macOS -```python -# More conservative settings for macOS -reader_options = { - 'readahead_buffer_size': '16MB', - 'max_parallelism': '4' -} +num_words = 200 +num_records = 65536 +records = [] +n_bytes = 0 +for _ in range(num_records): + record = lorem.words(num_words).encode("utf-8") + records.append(record) + n_bytes += len(record) ``` -### Cloud Storage Optimizations - -When working with cloud storage (GCS, S3): - -```python -# Optimize for cloud storage latency -reader_options = { - 'readahead_buffer_size': '32MB', # Larger buffers for network latency - 'max_parallelism': '4', # Moderate parallelism -} - -# Use larger group sizes for cloud storage -writer_options = 'group_size:5000,brotli:6,max_parallelism:4' -``` - -### Memory-Constrained Environments - -```python -# Minimal memory configuration -reader_options = { - 'readahead_buffer_size': '512KB', - 'max_parallelism': '1', - 'index_storage_option': 'offloaded' -} - -writer_options = 'group_size:200,snappy,max_parallelism:1' -``` - -## Troubleshooting Performance Issues - -### Common Issues and Solutions - -1. **Slow random access**: - - Disable readahead: `'readahead_buffer_size': '0'` - - Disable parallelism: `'max_parallelism': '0'` - - Use smaller group sizes - -2. **High memory usage**: - - Reduce buffer sizes - - Use offloaded index storage - - Reduce parallelism - -3. **Slow sequential access**: - - Increase buffer sizes - - Enable parallelism - - Use larger group sizes - -4. 
**Poor compression ratio**:
-   - Increase group size
-   - Use higher compression levels
-   - Enable transposition for protocol buffers
-
-### Profiling Tools
-
-Use Python profiling tools to identify bottlenecks:
-
-```python
-import cProfile
-import pstats
-
-def profile_arrayrecord_operations():
-    """Profile ArrayRecord operations."""
-
-    def test_function():
-        with array_record_data_source.ArrayRecordDataSource('test.array_record') as ds:
-            for i in range(1000):
-                _ = ds[i]
-
-    # Profile the function
-    pr = cProfile.Profile()
-    pr.enable()
-    test_function()
-    pr.disable()
-
-    # Print results
-    stats = pstats.Stats(pr)
-    stats.sort_stats('cumulative')
-    stats.print_stats(10)  # Top 10 functions
-
-# profile_arrayrecord_operations()
-```
+## Compression ratio
-By following these performance guidelines and using the provided benchmarking tools, you can optimize ArrayRecord for your specific use case and achieve maximum performance.
+We first consider the compression ratio, which plays an important role in remote
+file systems. We tested four compression settings (zstd, brotli, snappy, and
+uncompressed), different `group_size` values, and, where supported, different
+compression levels. Note that the uncompressed file size can be larger than the
+original data because of the metadata and indices.
+
+### `group_size:1`
+
+Can be used by both ArrayRecordReader and ArrayRecordDataSource.
+
+| Compression Algorithm | Compressed size (% of original) |
+|-----------------------|---------------------------------|
+|zstd:1 | 54.66% |
+|zstd:3 | 54.12% |
+|zstd:5 | 53.58% |
+|zstd:7 | 53.52% |
+|brotli:1 | 55.74% |
+|brotli:3 | 53.11% |
+|brotli:5 | 53.72% |
+|brotli:7 | 53.79% |
+|snappy | 86.95% |
+|uncompressed | 103.87% |
+
+
+### `group_size:256`
+
+Can only be used by ArrayRecordReader.
+
+| Compression Algorithm | Compressed size (% of original) |
+|-----------------------|---------------------------------|
+|zstd:1 | 27.23% |
+|zstd:3 | 25.88% |
+|zstd:5 | 25.61% |
+|zstd:7 | 24.87% |
+|brotli:1 | 30.26% |
+|brotli:3 | 27.50% |
+|brotli:5 | 25.21% |
+|brotli:7 | 24.06% |
+|snappy | 38.49% |
+|uncompressed | 100.29% |
+
+
+For textual data, the difference between zstd and brotli across compression
+levels is minimal. The `group_size` parameter is a more important factor for the
+compression ratio. However, applications that require random access should keep
+`group_size` at 1.
+
+## Random access
+
+We now consider the read performance with random access. Datasets are created
+with the previous write benchmark, and we use only compression level 3 since
+varying the compression level didn't affect the compression ratio significantly.
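+
+As a rough illustration of that write benchmark (a minimal sketch, assuming the
+`records` list and `n_bytes` from the data-generation step above; the options
+string and output path are illustrative), each dataset can be written and its
+relative size measured like this:
+
+```python
+import os
+
+from array_record.python import array_record_module
+
+# Write every benchmark record using one particular options string.
+path = "/tmp/bench_gs1_zstd3.array_record"  # illustrative output path
+writer = array_record_module.ArrayRecordWriter(path, "group_size:1,zstd:3")
+for record in records:
+    writer.write(record)
+writer.close()
+
+# Compressed size relative to the raw record bytes, as reported in the tables
+# above (values above 100% mean the file is larger than the raw data).
+ratio = os.path.getsize(path) / n_bytes * 100
+print(f"group_size:1,zstd:3 -> {ratio:.2f}%")
+```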
+
+The random access indices are generated with a NumPy random permutation:
+
+```python
+import numpy as np
+rng = np.random.default_rng(42)
+num_records = 65536  # dataset size
+indices = [int(v) for v in rng.permutation(num_records)]
+```
+
+|compression | reader type | individual access (qps) | batch access (qps) |
+|------------|-------------|-------------------------|--------------------|
+|zstd | ArrayRecordReader | 4,933 | 310,120 |
+|zstd | ArrayRecordDataSource | 5,551 | 188,412 |
+|brotli | ArrayRecordReader | 5,433 | 328,267 |
+|brotli | ArrayRecordDataSource | 5,268 | 214,682 |
+|snappy | ArrayRecordReader | 5,333 | 451,081 |
+|snappy | ArrayRecordDataSource | 5,407 | 258,196 |
+|uncompressed | ArrayRecordReader | 5,407 | 490,610 |
+|uncompressed | ArrayRecordDataSource | 5,155 | 243,658 |
+
+The benchmark clearly demonstrates the superior performance of batch access
+compared to individual record access. The internal C++ thread pool employs
+atomic counters to efficiently manage and track workload distribution among
+threads. Consequently, even if individual access were implemented using Python
+threads or processes, it could not attain the same level of efficiency.
+
+
+It is important to note that although uncompressed data yields higher throughput
+in this specific benchmark, this result only reflects local file access
+efficiency. In remote file system environments, compressed records generally
+provide superior throughput.
+
+## Sequential access
+
+Finally, we examine the sequential access APIs that ArrayRecordReader provides.
+In contrast to ArrayRecordDataSource, ArrayRecordReader accepts a `group_size`
+larger than 1, which affects sequential access performance.
+
+We compare sequential access via repeated calls to `read()`, which uses
+read-ahead threads for prefetching, with the `read_all()` API, which uses a
+thread pool to process all items concurrently.
+
+|compression | `group_size` | `read_all` (qps) | sequential read (qps) |
+|------------|-------------|-------------------------|--------------------|
+|zstd | 1 | 260,210 | 229,386 |
+|zstd | 256 | 459,331 | 492,653 |
+|brotli | 1 | 222,285 | 252,891 |
+|brotli | 256 | 367,190 | 374,484 |
+|snappy | 1 | 493,140 | 544,226 |
+|snappy | 256 | 430,323 | 588,599 |
+|uncompressed | 1 | 645,149 | 619,762 |
+|uncompressed | 256 | 1,094,563 | 1,078,190 |
+
+To our surprise, repeated calls to `read()` perform better than the `read_all()`
+API. This may be due to the overhead of Python object creation in `read_all()`.
+In C++ benchmarks we typically see the opposite result.
+
+## Summary
+
+This guide has examined how the tuning parameters discussed in the core concepts
+section influence both file compression ratio and read performance. Although
+performance characteristics vary with the dataset (e.g., media type, record
+size, record count) and the underlying file system, setting `group_size:1` and
+using the batch access API is expected to give optimal results for most users
+employing ArrayRecord for random access.
diff --git a/docs/python_reference.md b/docs/python_reference.md
new file mode 100644
index 0000000..5e66981
--- /dev/null
+++ b/docs/python_reference.md
@@ -0,0 +1,160 @@
+# Python API Reference
+
+
+## `array_record.python.array_record_module.ArrayRecordWriter`
+
+### `ArrayRecordWriter(path: str, options: str)`
+
+* `path` (str): File path where the ArrayRecord file will be written.
+* `options` (str, optional): Comma-separated options string. Default: ""
+
+#### Options string format
+
+The options string can contain the following comma-separated options:
+
+* `group_size:N` - Number of records per chunk (default: 1)
+* `uncompressed` - Disable compression
+* `brotli[:N]` - Use Brotli compression with level N (0-11, default: 6)
+* `zstd[:N]` - Use Zstd compression with level N (-131072 to 22, default: 3)
+* `snappy` - Use Snappy compression
+* `window_log:N` - LZ77 window size (10-31) for zstd and brotli
+* `pad_to_block_boundary:true/false` - Pad chunks to 64KB boundaries (default:
+  false)
+
+Select only one of the compression options `zstd`, `brotli`, `snappy`, or
+`uncompressed`; otherwise an error is raised.
+
+### `ok() -> bool`
+
+Returns true when the writer object is in a healthy state.
+
+### `close()`
+
+Closes the file. May raise an error if it fails to do so.
+
+### `is_open() -> bool`
+
+Returns true when the file is open.
+
+### `write(record: bytes)`
+
+Writes a record to the file. May raise an error if it fails to do so.
+
+
+## `array_record.python.array_record_module.ArrayRecordReader`
+
+### `ArrayRecordReader(path: str, options: str)`
+
+* `path` (str): File path to read from.
+* `options` (str, optional): Comma-separated options string. Default: ""
+
+#### Options string format
+
+The options string can contain the following comma-separated options:
+
+* `readahead_buffer_size:N` - Read-ahead buffer size in bytes per thread
+  (default: 0)
+* `max_parallelism:N` - Number of read-ahead threads
+* `index_storage_options:in_memory/offloaded` - Whether to store the record
+  index in memory or on disk (default: `in_memory`)
+
+### `ok() -> bool`
+
+Returns true when the reader object is in a healthy state.
+
+### `close()`
+
+Closes the file. May raise an error if it fails to do so.
+
+### `is_open() -> bool`
+
+Returns true when the file is open.
+
+### `num_records() -> int`
+
+Returns the number of records in the file.
+
+### `record_index() -> int`
+
+Returns the current record index. This value is only relevant in sequential
+reading mode.
+
+### `writer_options_string() -> str`
+
+Returns the writer options string that was used when creating the ArrayRecord
+file.
+
+### `seek(index: int)`
+
+Updates the cursor to the specified index. Throws an error if the index is out
+of bounds.
+
+### `read() -> bytes`
+
+Reads a record and advances the cursor by one. Throws an error if the cursor
+reaches the end of the file.
+
+### `read(indices: Sequence) -> Sequence[bytes]`
+
+Reads the records at the given indices using an internal thread pool. Throws an
+error if any index is out of bounds.
+
+### `read(start: int, end: int) -> Sequence[bytes]`
+
+Reads the records in the given range using an internal thread pool. Throws an
+error if the range is out of bounds.
+
+### `read_all() -> Sequence[bytes]`
+
+Reads all records using an internal thread pool. May raise an error if reading
+fails.
+
+## `array_record.python.array_record_data_source.ArrayRecordDataSource`
+
+### `ArrayRecordDataSource(paths: Sequence[str], reader_options: str)`
+
+* `paths` (Sequence[str]): File paths to read from.
+* `reader_options` (str, optional): Comma-separated options string. Default: "".
+  See the `ArrayRecordReader` constructor options for details.
+
+### `__len__() -> int`
+
+Returns the total number of records across all the ArrayRecord files specified
+in the constructor.
+ +```python +from array_record.python import array_record_data_source +ds = array_record_data_source.ArrayRecordDataSource(glob.glob("output.array_record*")) +len(ds) +``` + +### `__iter__() -> Iterator[bytes]` + +Iterator interface for data access. + +```python +from array_record.python import array_record_data_source +ds = array_record_data_source.ArrayRecordDataSource(glob.glob("output.array_record*")) +it = iter(ds) +record = next(it) +``` + +### `__getitem__(index: int) -> bytes` + +Reads a record at the specified index. + +```python +from array_record.python import array_record_data_source +ds = array_record_data_source.ArrayRecordDataSource(glob.glob("output.array_record*")) +ds[idx] +``` + +### `__getitems__(indices: Sequence[int]) -> Sequence[bytes]` + +Reads a set of records of the specified indices. + +```python +from array_record.python import array_record_data_source +ds = array_record_data_source.ArrayRecordDataSource(glob.glob("output.array_record*")) +ds.__getitems__(indices) +``` diff --git a/docs/python_reference.rst b/docs/python_reference.rst deleted file mode 100644 index 592659c..0000000 --- a/docs/python_reference.rst +++ /dev/null @@ -1,233 +0,0 @@ -Python API Reference -==================== - -This section documents the Python API for ArrayRecord. The Python interface provides high-level access to ArrayRecord functionality through two main modules. - -Core Module ------------ - -The core Python module provides the fundamental ArrayRecord functionality through C++ bindings. - -ArrayRecordWriter -~~~~~~~~~~~~~~~~~ - -The ArrayRecordWriter class is used to create ArrayRecord files with various compression options and parallel writing capabilities. - - The ArrayRecordWriter class is used to create ArrayRecord files. It supports various compression - options and parallel writing capabilities. - - **Constructor Parameters:** - - * ``path`` (str): File path where the ArrayRecord will be written - * ``options`` (str, optional): Comma-separated options string. Default: "" - - **Options String Format:** - - The options string can contain the following comma-separated options: - - * ``group_size:N`` - Number of records per chunk (default: 65536) - * ``max_parallelism:N`` - Maximum parallel threads (default: auto) - * ``saturation_delay_ms:N`` - Delay when queue is saturated (default: 10) - * ``uncompressed`` - Disable compression - * ``brotli:N`` - Use Brotli compression with level N (0-11, default: 6) - * ``zstd:N`` - Use Zstd compression with level N (-131072 to 22, default: 3) - * ``snappy`` - Use Snappy compression - * ``transpose:true/false`` - Enable/disable transposition for proto messages - * ``transpose_bucket_size:N`` - Bucket size for transposition - * ``window_log:N`` - LZ77 window size (10-31) - * ``pad_to_block_boundary:true/false`` - Pad chunks to 64KB boundaries - - **Example:** - - .. code-block:: python - - from array_record.python import array_record_module - - # Basic usage - writer = array_record_module.ArrayRecordWriter('output.array_record') - writer.write(b'Hello, World!') - writer.close() - - # With options - writer = array_record_module.ArrayRecordWriter( - 'compressed.array_record', - 'group_size:1000,brotli:9,max_parallelism:4' - ) - -ArrayRecordReader -~~~~~~~~~~~~~~~~~ - -The ArrayRecordReader class provides low-level sequential access to ArrayRecord files. - - The ArrayRecordReader class provides low-level sequential access to ArrayRecord files. 
- - **Constructor Parameters:** - - * ``path`` (str): File path to read from - * ``options`` (str, optional): Comma-separated options string. Default: "" - - **Options String Format:** - - * ``readahead_buffer_size:N`` - Buffer size for readahead (default: 16MB, set to 0 for random access) - * ``max_parallelism:N`` - Maximum parallel threads (default: auto, set to 0 for random access) - * ``index_storage_option:in_memory/offloaded`` - How to store the index (default: in_memory) - - **Example:** - - .. code-block:: python - - from array_record.python import array_record_module - - # Sequential reading - reader = array_record_module.ArrayRecordReader('input.array_record') - - print(f"Total records: {reader.num_records()}") - - # Read records sequentially - reader.seek(0) - while True: - record = reader.read_record() - if not record: - break - print(f"Record: {record}") - - reader.close() - - # Random access optimized - reader = array_record_module.ArrayRecordReader( - 'input.array_record', - 'readahead_buffer_size:0,max_parallelism:0' - ) - -Data Source Module ------------------- - -The data source module provides high-level random access to ArrayRecord files. - -ArrayRecordDataSource -~~~~~~~~~~~~~~~~~~~~~ - -The ArrayRecordDataSource class provides high-level random access to ArrayRecord files with support for indexing and slicing. - - The ArrayRecordDataSource class provides high-level random access to ArrayRecord files. - It implements a Python sequence interface with support for indexing and slicing. - - **Constructor Parameters:** - - * ``paths`` (str, pathlib.Path, FileInstruction, or list): Path(s) to ArrayRecord file(s) - * ``reader_options`` (dict, optional): Dictionary of reader options - - **Reader Options:** - - * ``readahead_buffer_size`` (str): Buffer size (e.g., "16MB", "0" for random access) - * ``max_parallelism`` (str): Number of parallel threads (e.g., "4", "0" for random access) - * ``index_storage_option`` (str): "in_memory" or "offloaded" - - **Example:** - - .. code-block:: python - - from array_record.python import array_record_data_source - - # Basic usage - data_source = array_record_data_source.ArrayRecordDataSource('data.array_record') - - # Get number of records - print(f"Total records: {len(data_source)}") - - # Read single record - first_record = data_source[0] - - # Read multiple records - batch = data_source[[0, 10, 100]] - - # Context manager usage - with array_record_data_source.ArrayRecordDataSource('data.array_record') as ds: - records = ds[0:10] # Read first 10 records - - # Multiple files - files = ['part-00000.array_record', 'part-00001.array_record'] - data_source = array_record_data_source.ArrayRecordDataSource(files) - - # Optimized for random access - reader_options = { - 'readahead_buffer_size': '0', - 'max_parallelism': '0' - } - data_source = array_record_data_source.ArrayRecordDataSource( - 'data.array_record', - reader_options=reader_options - ) - -FileInstruction -~~~~~~~~~~~~~~~ - -FileInstruction allows you to specify a subset of records to read from a file, which can significantly speed up initialization when you only need part of a large file. - - FileInstruction allows you to specify a subset of records to read from a file, - which can significantly speed up initialization when you only need part of a large file. - - **Example:** - - .. 
code-block:: python - - from array_record.python.array_record_data_source import FileInstruction - - # Read only records 1000-2000 from the file - instruction = FileInstruction( - filename='large_file.array_record', - start=1000, - num_records=1000 - ) - - data_source = array_record_data_source.ArrayRecordDataSource([instruction]) - -Utility Functions ------------------ - -The module also provides several utility functions for working with ArrayRecord files, including functions for processing file instructions and validating group sizes. - -Error Handling --------------- - -ArrayRecord operations can raise various exceptions: - -* ``ValueError``: Invalid parameters or file paths -* ``RuntimeError``: File I/O errors or corruption -* ``OSError``: System-level file access errors - -**Example:** - -.. code-block:: python - - try: - data_source = array_record_data_source.ArrayRecordDataSource('nonexistent.array_record') - except (ValueError, RuntimeError, OSError) as e: - print(f"Error opening file: {e}") - -Performance Tips ----------------- - -1. **For Sequential Access**: Use default settings or increase buffer size - - .. code-block:: python - - reader_options = { - 'readahead_buffer_size': '64MB', - 'max_parallelism': '8' - } - -2. **For Random Access**: Disable readahead and parallelism - - .. code-block:: python - - reader_options = { - 'readahead_buffer_size': '0', - 'max_parallelism': '0' - } - -3. **For Large Files**: Use FileInstruction to read subsets - -4. **For Batch Processing**: Read multiple records at once using list indexing - -5. **Memory Management**: Use context managers to ensure proper cleanup diff --git a/docs/quickstart.md b/docs/quickstart.md deleted file mode 100644 index 07e35fe..0000000 --- a/docs/quickstart.md +++ /dev/null @@ -1,335 +0,0 @@ -# Quick Start Guide - -This guide will help you get started with ArrayRecord quickly. ArrayRecord is designed to be simple to use while providing high performance for both sequential and random access patterns. 
- -## Basic Writing and Reading - -### Writing Your First ArrayRecord File - -```python -from array_record.python import array_record_module - -# Create a writer with default settings -writer = array_record_module.ArrayRecordWriter('my_data.array_record') - -# Write some records -for i in range(1000): - data = f"Record number {i}".encode('utf-8') - writer.write(data) - -# Always close the writer to finalize the file -writer.close() -``` - -### Writer Configuration Options - -ArrayRecord allows you to configure the writer for different performance characteristics: - -```python -from array_record.python import array_record_module - -# Maximum random access performance -# group_size:1 stores each record individually for fastest random access -writer = array_record_module.ArrayRecordWriter( - 'random_access.array_record', - 'group_size:1' -) -for i in range(1000): - data = f"Record {i}".encode('utf-8') - writer.write(data) -writer.close() - -# High compression configuration -# group_size:1000 groups records together for better compression -writer = array_record_module.ArrayRecordWriter( - 'compressed.array_record', - 'group_size:1000,brotli:9' -) -for i in range(1000): - data = f"Record {i}".encode('utf-8') - writer.write(data) -writer.close() -``` - -**Key Configuration Parameters:** - -- **`group_size:N`**: Number of records per group - - `group_size:1` - Maximum random access speed (larger file size) - - `group_size:100` - Balanced performance (recommended default) - - `group_size:1000` - Better compression (slower random access) - -- **Compression Options:** - - `brotli:1-11` - Brotli compression (higher = better compression) - - `zstd:1-22` - Zstandard compression (fast compression/decompression) - - `snappy` - Very fast compression with moderate ratio - - `uncompressed` - No compression (fastest write/read) - -## Working with tf.train.Example - -ArrayRecord integrates seamlessly with TensorFlow's `tf.train.Example` format for structured ML data: - -```python -import tensorflow as tf -import grain -import dataclasses -from array_record.python import array_record_module, array_record_data_source - -# Writing tf.train.Example records -def create_tf_example(text_data, is_tokens=False): - if is_tokens: - # Integer tokens - features = {'text': tf.train.Feature(int64_list=tf.train.Int64List(value=text_data))} - else: - # String text - features = {'text': tf.train.Feature(bytes_list=tf.train.BytesList(value=[text_data.encode('utf-8')]))} - - return tf.train.Example(features=tf.train.Features(feature=features)) - -# Write examples to ArrayRecord -writer = array_record_module.ArrayRecordWriter('tf_examples.array_record', 'group_size:1') -for text in ["Sample text", "Another example"]: - example = create_tf_example(text) - writer.write(example.SerializeToString()) # Already bytes, no .encode() needed -writer.close() - -# MaxText-style parsing with Grain -@dataclasses.dataclass -class ParseFeatures(grain.MapTransform): - """Parse tf.train.Example records (from MaxText).""" - def __init__(self, data_columns, tokenize): - self.data_columns = data_columns - self.dtype = tf.string if tokenize else tf.int64 - - def map(self, element): - return tf.io.parse_example( - element, - {col: tf.io.FixedLenSequenceFeature([], dtype=self.dtype, allow_missing=True) - for col in self.data_columns} - ) - -@dataclasses.dataclass -class NormalizeFeatures(grain.MapTransform): - """Normalize features (from MaxText).""" - def __init__(self, column_names, tokenize): - self.column_names = column_names - self.tokenize = 
tokenize - - def map(self, element): - if self.tokenize: - return {col: element[col].numpy()[0].decode() for col in self.column_names} - else: - return {col: element[col].numpy() for col in self.column_names} - -# Create MaxText-style training pipeline -data_source = array_record_data_source.ArrayRecordDataSource('tf_examples.array_record') -dataset = ( - grain.MapDataset.source(data_source) - .map(ParseFeatures(['text'], tokenize=True)) # Parse tf.train.Example - .map(NormalizeFeatures(['text'], tokenize=True)) # Normalize features - .batch(batch_size=32) - .shuffle(seed=42) -) -``` - -**Benefits**: Standard TensorFlow format + ArrayRecord performance + MaxText compatibility for production LLM training. - -### Reading ArrayRecord Files - -```python -from array_record.python import array_record_data_source - -# Create a data source for reading -data_source = array_record_data_source.ArrayRecordDataSource('my_data.array_record') - -# Get the total number of records -print(f"Total records: {len(data_source)}") - -# Read the first record -first_record = data_source[0] -print(f"First record: {first_record.decode('utf-8')}") - -# Read multiple records by index -batch = data_source[[0, 10, 100, 500]] -for i, record in enumerate(batch): - print(f"Record: {record.decode('utf-8')}") -``` - -## Working with Binary Data - -ArrayRecord excels at storing binary data like serialized protocol buffers: - -```python -import json -from array_record.python import array_record_module - -# Writing JSON data as bytes -writer = array_record_module.ArrayRecordWriter('json_data.array_record') - -data_objects = [ - {"id": 1, "name": "Alice", "score": 95.5}, - {"id": 2, "name": "Bob", "score": 87.2}, - {"id": 3, "name": "Charlie", "score": 92.8}, -] - -for obj in data_objects: - json_bytes = json.dumps(obj).encode('utf-8') - writer.write(json_bytes) - -writer.close() - -# Reading JSON data back -from array_record.python import array_record_data_source - -data_source = array_record_data_source.ArrayRecordDataSource('json_data.array_record') - -for i in range(len(data_source)): - json_bytes = data_source[i] - obj = json.loads(json_bytes.decode('utf-8')) - print(f"Object {i}: {obj}") -``` - -## Configuration Options - -### Writer Options - -ArrayRecord provides several configuration options for optimization: - -```python -from array_record.python import array_record_module - -# Configure writer options -options = { - 'group_size': '1000', # Records per chunk (affects compression vs random access trade-off) - 'compression': 'brotli:6', # Compression algorithm and level -} - -writer = array_record_module.ArrayRecordWriter( - 'optimized.array_record', - ','.join([f'{k}:{v}' for k, v in options.items()]) -) - -# Write data... 
-writer.close() -``` - -### Reader Options - -```python -from array_record.python import array_record_data_source - -# Configure reader options for different access patterns -reader_options = { - 'readahead_buffer_size': '0', # Disable readahead for pure random access - 'max_parallelism': '4', # Number of parallel threads -} - -data_source = array_record_data_source.ArrayRecordDataSource( - 'optimized.array_record', - reader_options=reader_options -) -``` - -## Performance Tips - -### Sequential Access - -For sequential reading, use the default settings: - -```python -# Default settings are optimized for sequential access -data_source = array_record_data_source.ArrayRecordDataSource('data.array_record') - -# Iterate through all records -for i in range(len(data_source)): - record = data_source[i] - # Process record... -``` - -### Random Access - -For random access, disable readahead: - -```python -# Optimize for random access -reader_options = { - 'readahead_buffer_size': '0', - 'max_parallelism': '0', -} - -data_source = array_record_data_source.ArrayRecordDataSource( - 'data.array_record', - reader_options=reader_options -) - -# Random access is now optimized -import random -indices = random.sample(range(len(data_source)), 100) -batch = data_source[indices] -``` - -### Batch Processing - -Read multiple records at once for better performance: - -```python -data_source = array_record_data_source.ArrayRecordDataSource('data.array_record') - -# Process in batches -batch_size = 100 -total_records = len(data_source) - -for start in range(0, total_records, batch_size): - end = min(start + batch_size, total_records) - indices = list(range(start, end)) - batch = data_source[indices] - - # Process batch... - for record in batch: - # Process individual record... - pass -``` - -## Context Manager Usage - -Use context managers for automatic resource cleanup: - -```python -from array_record.python import array_record_data_source - -# Automatically handles cleanup -with array_record_data_source.ArrayRecordDataSource('data.array_record') as data_source: - # Read data... - for i in range(min(10, len(data_source))): - record = data_source[i] - print(f"Record {i}: {len(record)} bytes") -# File is automatically closed -``` - -## Error Handling - -Always handle potential errors: - -```python -from array_record.python import array_record_module, array_record_data_source - -try: - # Writing - writer = array_record_module.ArrayRecordWriter('output.array_record') - writer.write(b'test data') - writer.close() - - # Reading - data_source = array_record_data_source.ArrayRecordDataSource('output.array_record') - record = data_source[0] - print(f"Successfully read: {record}") - -except Exception as e: - print(f"Error: {e}") -``` - -## Next Steps - -- Learn about [Core Concepts](core_concepts.md) for deeper understanding -- Explore [Python API Reference](python_reference.rst) for complete API documentation -- Check out [Apache Beam Integration](beam_integration.md) for large-scale processing -- See [Examples](examples.md) for real-world use cases