- Overview
- 30-Second Quick Start
- Supported Formats & Libraries
- Architecture
- Performance Comparison
- Real-World Use Cases
- Core Features
- Advanced Usage
- Installation
- Documentation & Examples
- Why Choose Atio?
- License
## Overview

Atio is a Python library for safe, loss-free file writing. Every write is atomic, so existing data is protected even when an error occurs mid-write, and Atio supports a wide range of data formats and database connections.
- Zero Data Loss: Atomic operations guarantee file integrity
- High Performance: Minimal overhead with maximum safety
- Auto Rollback: Automatic recovery when errors occur
- Universal Support: Works with Pandas, Polars, NumPy, and more
- Simple API: Drop-in replacement for existing code
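Because the API is a drop-in replacement, migrating an existing pandas write is typically a one-line change. A minimal before/after sketch, using only the calls shown in this README:

```python
import atio
import pandas as pd

df = pd.DataFrame({"a": [1, 2, 3]})

# Before: a plain pandas write; a crash mid-write can leave a corrupt file
df.to_parquet("data.parquet")

# After: the same write through Atio's atomic path
atio.write(df, "data.parquet", format="parquet")
```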
## 30-Second Quick Start

```bash
pip install atio
```

```python
import atio
import pandas as pd

# Create sample data
df = pd.DataFrame({
    "name": ["Alice", "Bob", "Charlie"],
    "age": [25, 30, 35],
    "city": ["Seoul", "Busan", "Incheon"]
})

# Safe atomic writing
atio.write(df, "users.parquet", format="parquet")
# ✅ File saved safely with atomic operation!
```

## Supported Formats & Libraries

| Format | Pandas | Polars | NumPy | Description |
|---|---|---|---|---|
| CSV | ✅ | ✅ | ✅ | Comma-separated values |
| Parquet | ✅ | ✅ | ❌ | Columnar storage format |
| Excel | ✅ | ✅ | ❌ | Microsoft Excel files |
| JSON | ✅ | ✅ | ❌ | JavaScript Object Notation |
| SQL | ✅ | ✅ | ❌ | SQL database storage |
| Database | ✅ | ✅ | ❌ | Direct database connection |
| NPY/NPZ | ❌ | ❌ | ✅ | NumPy binary formats |
| Pickle | ✅ | ❌ | ❌ | Python serialization |
| HTML | ✅ | ❌ | ❌ | HTML table format |
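The same `atio.write` call covers these formats, selecting the matching writer (e.g., `to_parquet`, as the verbose log later in this README shows). A short sketch using the format strings that appear elsewhere in this document (`parquet`, `json`, `npy`):

```python
import atio
import numpy as np
import pandas as pd

df = pd.DataFrame({"x": [1, 2], "y": [3, 4]})

# DataFrame formats
atio.write(df, "data.parquet", format="parquet")
atio.write(df, "data.json", format="json")

# NumPy binary format
atio.write(np.arange(10), "data.npy", format="npy")
```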
## Architecture

```mermaid
graph LR
    A[Data Object] --> B[Temp File]
    B --> C[Validation]
    C --> D[Atomic Replace]
    D --> E[Success Flag]
    C -->|Error| F[Rollback]
    F --> G[Original File Preserved]
    style A fill:#e1f5fe
    style E fill:#c8e6c9
    style F fill:#ffcdd2
    style G fill:#c8e6c9
```
## Core Features

- Atomic Operations: Temporary file → Validation → Atomic replacement (see the sketch below)
- Rollback Mechanism: Automatic recovery on failure
- Progress Monitoring: Real-time progress for large files
- Version Management: Snapshot-based data versioning
- Auto Cleanup: Automatic cleanup of temporary files
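Conceptually, the write path follows the classic temp-file/atomic-rename pattern. The sketch below illustrates that general technique in plain Python; it is an illustration of the pattern, not Atio's internal code:

```python
import os
import tempfile

def atomic_write_text(path: str, data: str) -> None:
    """Illustration of temp file -> validation -> atomic replacement."""
    # Create the temp file in the target directory so the final
    # rename stays on one filesystem (a requirement for atomicity).
    fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(os.path.abspath(path)))
    try:
        with os.fdopen(fd, "w") as f:
            f.write(data)
            f.flush()
            os.fsync(f.fileno())  # ensure bytes are on disk before the swap
        os.replace(tmp_path, path)  # atomic replace on POSIX and Windows
    except BaseException:
        os.remove(tmp_path)  # rollback: the original file was never touched
        raise
```

Because the destination is only ever touched by the final `os.replace`, a crash at any earlier step leaves the original file intact.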
## Real-World Use Cases

```python
# ETL pipeline with automatic rollback
try:
    atio.write(processed_data, "final_results.parquet", format="parquet")
    print("✅ Pipeline completed successfully")
except Exception as e:
    print("❌ Pipeline failed, but original data is safe")
    # Original file remains untouched
```

```python
# Version-controlled experiment results
atio.write_snapshot(model_results, "experiment_v1", mode="overwrite")
atio.write_snapshot(improved_results, "experiment_v1", mode="append")

# Rollback to previous version if needed
atio.rollback("experiment_v1", version_id=1)
```

```python
# Progress monitoring for large datasets
atio.write(large_df, "big_data.parquet",
           format="parquet",
           show_progress=True)
# Shows: Writing big_data.parquet... [ 45.2 MB | 12.3 MB/s | 00:15 ]
```

## Advanced Usage

```python
# Safe writing with automatic rollback
atio.write(df, "data.parquet", format="parquet")
# Creates: data.parquet + .data.parquet._SUCCESS
```
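Downstream consumers can use the `_SUCCESS` marker as a readiness check before reading. A hypothetical sketch based on the marker path shown above (`.data.parquet._SUCCESS`):

```python
import os
import pandas as pd

def read_if_ready(path):
    # Marker name follows the pattern above: .<filename>._SUCCESS
    directory, name = os.path.split(os.path.abspath(path))
    marker = os.path.join(directory, f".{name}._SUCCESS")
    if not os.path.exists(marker):
        return None  # write unfinished or failed; skip the file
    return pd.read_parquet(path)

df = read_if_ready("data.parquet")
```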
```python
# Direct database storage
from sqlalchemy import create_engine

engine = create_engine('postgresql://user:pass@localhost/db')
atio.write(df, format="sql", name="users", con=engine, if_exists="replace")
```
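To verify the load, you can read the table back with plain pandas (standard pandas/SQLAlchemy usage, not an Atio API):

```python
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine('postgresql://user:pass@localhost/db')

# Read the table back to confirm the write landed
check = pd.read_sql("SELECT * FROM users", con=engine)
print(f"{len(check)} rows in users")
```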
```python
# Snapshot-based versioning
atio.write_snapshot(df, "my_table", mode="overwrite")   # v1
atio.write_snapshot(new_df, "my_table", mode="append") # v2
# Read specific version
df_v1 = atio.read_table("my_table", version=1)
```

```python
# Real-time progress for large files
atio.write(large_df, "data.parquet",
           format="parquet",
           show_progress=True,
           verbose=True)
```

```python
import polars as pl
import numpy as np
# Polars DataFrame
pl_df = pl.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
atio.write(pl_df, "data.parquet", format="parquet")
# NumPy Arrays
arr = np.random.randn(1000, 100)
atio.write(arr, "array.npy", format="npy")
# Multiple arrays
atio.write({'arr1': arr, 'arr2': arr*2}, "arrays.npz", format="npz")
```

```python
# Automatic rollback on failure
try:
    atio.write(df, "data.parquet", format="parquet")
except Exception as e:
    print(f"Write failed: {e}")
    # Original file is automatically preserved
```

```python
# Detailed performance analysis
atio.write(df, "data.parquet", format="parquet", verbose=True)
# Output:
# [INFO] Temporary directory created: /tmp/tmp12345
# [INFO] Writer to use: to_parquet (format: parquet)
# [INFO] ✅ File writing completed (total time: 0.1234s)
```

## Installation

```bash
pip install atio

# For Excel support
pip install atio[excel]
# For database support
pip install atio[database]
# For all features
pip install atio[all]
```

From source:

```bash
git clone https://github.com/seojaeohcode/atio.git
cd atio
pip install -e .
```

## Documentation & Examples

- Complete Documentation - Full API reference
- Quick Start Guide - Get started in minutes
- Advanced Usage - Power user features
```python
import atio
import pandas as pd
# Create sample data
df = pd.DataFrame({
    "name": ["Alice", "Bob", "Charlie"],
    "age": [25, 30, 35],
    "city": ["Seoul", "Busan", "Incheon"]
})
# Safe atomic writing
atio.write(df, "users.parquet", format="parquet")
print("β
File saved safely!")
# Read back to verify
df_read = pd.read_parquet("users.parquet")
print(df_read)
```

```python
import atio
import pandas as pd
import numpy as np
# Create large dataset
large_df = pd.DataFrame(np.random.randn(200000, 5), columns=list("ABCDE"))
# Save with progress monitoring
atio.write(large_df, "large_data.parquet",
           format="parquet",
           show_progress=True)
# Shows: Writing large_data.parquet... [ 45.2 MB | 12.3 MB/s | 00:15 ]
```

```python
import atio
import pandas as pd
# Version 1: Initial data
df_v1 = pd.DataFrame({"id": [1, 2, 3], "value": ["A", "B", "C"]})
atio.write_snapshot(df_v1, "my_table", mode="overwrite")
# Version 2: Append new data
df_v2 = pd.DataFrame({"score": [95, 87, 92]})
atio.write_snapshot(df_v2, "my_table", mode="append")
# Read specific version
df_latest = atio.read_table("my_table") # Latest version
df_v1 = atio.read_table("my_table", version=1)  # Version 1
```

## Performance Comparison

```python
import atio
import pandas as pd
import numpy as np
import time

# Performance comparison
df = pd.DataFrame(np.random.randn(100000, 10))
# Standard pandas
start = time.time()
df.to_parquet("standard.parquet")
pandas_time = time.time() - start
# Atio with safety
start = time.time()
atio.write(df, "safe.parquet", format="parquet", verbose=True)
atio_time = time.time() - start
print(f"Pandas: {pandas_time:.3f}s")
print(f"Atio: {atio_time:.3f}s")
print(f"Safety overhead: {((atio_time/pandas_time - 1) * 100):.1f}%")# test_interrupt.py
import atio
import pandas as pd
import numpy as np
print("Creating large dataset...")
df = pd.DataFrame(np.random.randn(1000000, 10))
print("Starting write operation...")
print("Press Ctrl+C to test interrupt safety!")
try:
    atio.write(df, "test_interrupt.parquet",
               format="parquet",
               show_progress=True)
    print("✅ Write completed successfully!")
except KeyboardInterrupt:
    print("❌ Interrupted by user!")
    print("Checking file safety...")
    import os
    if os.path.exists("test_interrupt.parquet"):
        print("⚠️ File exists but may be corrupted")
    else:
        print("✅ No corrupted file left behind!")
```

```python
# test_oom.py
import atio
import pandas as pd
import numpy as np
def simulate_oom():
    print("Creating extremely large dataset...")
    # This will likely cause OOM
    huge_df = pd.DataFrame(np.random.randn(10000000, 100))
    print("Attempting to save...")
    try:
        atio.write(huge_df, "huge_data.parquet", format="parquet")
        print("✅ Successfully saved!")
    except MemoryError:
        print("❌ Out of Memory error!")
        print("✅ But original file is safe!")
    except Exception as e:
        print(f"❌ Error: {e}")
        print("✅ Atio protected your data!")
# Run the test
simulate_oom()
```

```python
# ci_pipeline.py
import atio
import pandas as pd
import os
def deploy_artifacts():
    """Simulate CI/CD pipeline deployment"""
    # Generate deployment artifacts
    config = pd.DataFrame({
        "service": ["api", "web", "db"],
        "version": ["v1.2.3", "v1.2.3", "v1.2.3"],
        "status": ["ready", "ready", "ready"]
    })
    metrics = pd.DataFrame({
        "metric": ["cpu", "memory", "disk"],
        "value": [75.5, 68.2, 45.1],
        "unit": ["%", "%", "%"]
    })
    print("Starting deployment...")
    try:
        # Atomic deployment - either all succeed or all fail
        atio.write(config, "deployment_config.json", format="json")
        atio.write(metrics, "deployment_metrics.parquet", format="parquet")
        # Create success marker
        atio.write(pd.DataFrame({"status": ["deployed"]}),
                   "deployment_success.parquet", format="parquet")
        print("✅ Deployment completed successfully!")
        return True
    except Exception as e:
        print(f"❌ Deployment failed: {e}")
        print("Rolling back...")
        # Clean up any partial files
        for file in ["deployment_config.json", "deployment_metrics.parquet"]:
            if os.path.exists(file):
                os.remove(file)
        print("✅ Rollback completed - system is clean!")
        return False

# Test the pipeline
deploy_artifacts()
```

## Why Choose Atio?

- Zero data loss even during system failures
- Automatic rollback on any error
- File integrity guaranteed by atomic operations
- Minimal overhead (1.1-1.2x vs native libraries)
- Progress monitoring for large files
- Memory efficient processing
- Drop-in replacement for existing code
- Simple API with powerful features
- Comprehensive documentation and examples
- Multiple data formats (CSV, Parquet, Excel, JSON, etc.)
- Multiple libraries (Pandas, Polars, NumPy)
- Database integration (SQL, NoSQL)
## License

This project is distributed under the Apache 2.0 License. See the LICENSE file for details.
