From b88367cfb52ddd132a4d2b93c65e008db0a96f9f Mon Sep 17 00:00:00 2001 From: realstone3639 Date: Sun, 26 Oct 2025 20:59:05 +0900 Subject: [PATCH 1/2] =?UTF-8?q?=EA=B8=B0=EB=8A=A5=20=EC=8B=9C=EC=97=B0=20?= =?UTF-8?q?=EC=BD=94=EB=93=9C=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- WriteTest.py | 254 +++++++++++++++++ demo_atio_usage.py | 100 ++++--- src/atio/__init__.py | 6 +- src/atio/core.py | 70 +++-- test_core_features.py | 567 ++++++++++++++++++++++++++++++++++++++ writeModelSanpshotTest.py | 221 +++++++++++++++ writeSnapshotTest.py | 263 ++++++++++++++++++ 7 files changed, 1415 insertions(+), 66 deletions(-) create mode 100644 WriteTest.py create mode 100644 test_core_features.py create mode 100644 writeModelSanpshotTest.py create mode 100644 writeSnapshotTest.py diff --git a/WriteTest.py b/WriteTest.py new file mode 100644 index 0000000..2908c99 --- /dev/null +++ b/WriteTest.py @@ -0,0 +1,254 @@ +#!/usr/bin/env python3 + +# Copyright 2025 Atio Developers +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Atio Atomic Write (`atio.write`) 기능 테스트 스크립트 +""" + +# --- 모듈 경로 설정을 위한 코드 --- +import sys +import os +import shutil +import time + +# 현재 스크립트의 경로를 기준으로 'src' 폴더의 절대 경로를 계산합니다. +project_root = os.path.dirname(os.path.abspath(__file__)) +src_path = os.path.join(project_root, 'src') +if src_path not in sys.path: + sys.path.insert(0, src_path) +# ------------------------------------ + +import atio +import pandas as pd +import numpy as np +import polars as pl +from sqlalchemy import create_engine + +# --- 전역 테스트 설정 --- +TEST_OUTPUT_DIR = "atio_atomic_write_tests" # 디렉토리 이름 변경 + +# --- 헬퍼 함수 --- +def print_test_header(title): + print("\n" + "=" * 60) + print(f"🧪 테스트 시작: {title}") + print("=" * 60) + +def print_test_result(success, message=""): + if success: + print(f" ✅ [성공] {message}") + else: + print(f" ❌ [실패] {message}") + +def setup_test_environment(): + """테스트 실행 전 테스트 디렉토리를 정리하고 생성합니다.""" + print(f"테스트 환경 설정: '{TEST_OUTPUT_DIR}' 디렉토리 정리 및 생성...") + if os.path.exists(TEST_OUTPUT_DIR): + shutil.rmtree(TEST_OUTPUT_DIR) + os.makedirs(TEST_OUTPUT_DIR, exist_ok=True) + # 모델 관련 디렉토리 생성은 필요 없음 + print("✅ 테스트 환경 준비 완료.") + +# --- atio.write() 테스트 함수들 --- + +def test_write_pandas(): + """atio.write() - Pandas DataFrame 지원 포맷 테스트""" + print_test_header("atio.write() - Pandas") + try: + df = pd.DataFrame({ + "A": [1, 2, 3], + "B": ["foo", "bar", "baz"], + "C": [0.1, 0.2, 0.3] + }) + formats_to_test = ['csv', 'parquet', 'json', 'pickle', 'html', 'xlsx'] + for fmt in formats_to_test: + try: + target_path = os.path.join(TEST_OUTPUT_DIR, f"pd_test.{fmt}") + kwargs = {} + format_to_write = fmt + if fmt == 'xlsx': + kwargs['engine'] = 'openpyxl' + format_to_write = 'excel' + atio.write(df, target_path, format=format_to_write, **kwargs) + print_test_result(True, f"format='{format_to_write}' (.{fmt}) 저장") + except Exception as e: + print_test_result(False, f"format='{format_to_write}' (.{fmt}) 저장 중 오류: {e}") + # SQL 테스트 + try: + engine = create_engine('sqlite:///:memory:') + atio.write(df, format="sql", name='pd_test_table', con=engine, index=False) + with engine.connect() as conn: + read_df = pd.read_sql("SELECT * FROM pd_test_table", conn) + assert len(read_df) == 3 + print_test_result(True, "format='sql' 저장 (in-memory)") + except Exception as e: + print_test_result(False, f"format='sql' 저장 중 오류: {e}") + except Exception as e: + print(f"Pandas 테스트 중 예기치 않은 오류: {e}") + +def test_write_polars(): + """atio.write() - Polars DataFrame 지원 포맷 테스트""" + print_test_header("atio.write() - Polars") + try: + df = pl.DataFrame({ + "X": [10, 20, 30], + "Y": [True, False, True] + }) + formats_to_test = ['csv', 'parquet', 'json', 'ipc', 'avro'] + for fmt in formats_to_test: + try: + target_path = os.path.join(TEST_OUTPUT_DIR, f"pl_test.{fmt}") + atio.write(df, target_path, format=fmt) + print_test_result(True, f"format='{fmt}' 저장") + except Exception as e: + print_test_result(False, f"format='{fmt}' 저장 중 오류: {e}") + except Exception as e: + print(f"Polars 테스트 중 예기치 않은 오류: {e}") + +def test_write_numpy(): + """atio.write() - NumPy Array 지원 포맷 테스트""" + print_test_header("atio.write() - NumPy") + try: + arr_1d = np.array([1, 2, 3, 4, 5]) + arr_2d = np.random.rand(5, 3) + arr_dict = {'a': arr_1d, 'b': arr_2d} + atio.write(arr_2d, os.path.join(TEST_OUTPUT_DIR, "np_test.npy"), "npy") + print_test_result(True, "format='npy' 저장") + atio.write(arr_dict, os.path.join(TEST_OUTPUT_DIR, "np_test.npz"), "npz") + print_test_result(True, "format='npz' 저장") + atio.write(arr_dict, os.path.join(TEST_OUTPUT_DIR, "np_test_comp.npz"), "npz_compressed") + print_test_result(True, "format='npz_compressed' 저장") + atio.write(arr_2d, os.path.join(TEST_OUTPUT_DIR, "np_test.csv"), "csv") + print_test_result(True, "format='csv' 저장") + atio.write(arr_1d.astype(np.float32), os.path.join(TEST_OUTPUT_DIR, "np_test.bin"), "bin") + print_test_result(True, "format='bin' 저장") + except Exception as e: + print(f" ❌ NumPy 테스트 중 예기치 않은 오류: {e}") + +def test_write_options(): + """atio.write() - show_progress 및 verbose 옵션 테스트""" + print_test_header("atio.write() - 옵션 (show_progress, verbose)") + try: + print("\n [1] show_progress=True 테스트 (진행도 표시줄이 나타나야 함):") + large_df = pd.DataFrame(np.random.randn(100000000, 5), columns=list("ABCDE")) + atio.write(large_df, os.path.join(TEST_OUTPUT_DIR, "large.parquet"), "parquet", show_progress=True) + print_test_result(True, "show_progress=True 실행 완료 (시각적 확인 필요)") + print("\n [2] verbose=True 테스트 (상세 로그 출력):") + small_df = pd.DataFrame({"id": [1]}) + atio.write(small_df, os.path.join(TEST_OUTPUT_DIR, "verbose.csv"), "csv", verbose=True) + print_test_result(True, "verbose=True 실행 완료") + except Exception as e: + print(f"옵션 테스트 중 예기치 않은 오류: {e}") + +def test_write_database(): + """atio.write() - 데이터베이스 쓰기 (Pandas, Polars) 테스트 및 내용 확인""" + print_test_header("atio.write() - 데이터베이스 쓰기 (sql, database)") + + # 1. Pandas (sql) - in-memory + try: + pd_df = pd.DataFrame({"pd_col": [1, 2]}) + engine_memory = create_engine('sqlite:///:memory:') + atio.write(pd_df, format="sql", name='pd_sql_test', con=engine_memory) + + # ==================== DB에 저장된 내용 출력 ========================== + with engine_memory.connect() as conn: + read_pd_df = pd.read_sql_table('pd_sql_test', conn) + print("\n --- Pandas (in-memory) DB 저장 내용 ---") + print(read_pd_df) + print(" --------------------------------------") + print_test_result(True, "Pandas format='sql' (in-memory) 저장 및 내용 출력") + except Exception as e: + print_test_result(False, f"Pandas format='sql' 저장 중 오류: {e}") + + # 2. Polars (database) - file-based + db_path = os.path.join(TEST_OUTPUT_DIR, "polars.db") + engine_uri = f"sqlite:///{db_path}" + + try: + pl_df = pl.DataFrame({"pl_col": [True, False]}) + + # --- 쓰기 --- + atio.write(pl_df, format="database", table_name='pl_db_test', + connection_uri=engine_uri, connection=engine_uri) + print_test_result(True, "Polars format='database' (file-based) 저장") + + # ==================== DB에 저장된 내용 출력 ========================== + print("\n --- Polars (file-based) DB 저장 내용 ---") + # 파일 DB에 연결 + verify_engine = create_engine(engine_uri) + read_back_df_pd = None + with verify_engine.connect() as conn: + # Pandas를 사용하여 DB에서 데이터 읽기 + read_back_df_pd = pd.read_sql_table('pl_db_test', conn) + + # 읽어온 DataFrame 출력 + print(read_back_df_pd) + print(" --------------------------------------") + print_test_result(True, "Polars DB 내용 읽기 및 출력 완료") + # --- 출력 끝 --- + + except Exception as e: + # 쓰기 또는 읽기 단계에서 오류 발생 시 + print_test_result(False, f"Polars format='database' 저장 또는 읽기 중 오류: {e}") + import traceback + traceback.print_exc() + +# 파일 정리 +def cleanup_demo_files(): + """테스트 실행 후 생성된 파일들을 정리합니다.""" + print("\n" + "=" * 60) + print("🧹 테스트 파일 정리") + print("=" * 60) + if not os.path.exists(TEST_OUTPUT_DIR): + print(f"🗑️ 정리할 디렉토리가 없습니다: '{TEST_OUTPUT_DIR}'") + return + print(f"🗑️ 생성된 테스트 디렉토리: '{TEST_OUTPUT_DIR}'") + print("\n❓ 테스트 디렉토리를 삭제하시겠습니까? (y/n): ", end="") + try: + response = input().lower().strip() + except (EOFError, KeyboardInterrupt): + response = 'n' + print("\n입력 없이 종료하여 파일을 보존합니다.") + if response == 'y': + try: + shutil.rmtree(TEST_OUTPUT_DIR) + print(f"\n✅ '{TEST_OUTPUT_DIR}' 디렉토리와 모든 내용이 삭제되었습니다.") + except Exception as e: + print(f"\n❌ 디렉토리 삭제 중 오류 발생: {e}") + else: + print(f"\n📁 '{TEST_OUTPUT_DIR}' 디렉토리가 보존되었습니다.") + +def main(): + + try: + setup_test_environment() + # --- atio.write() 테스트 호출 + # test_write_pandas() + # test_write_polars() + # test_write_numpy() + # test_write_options() + test_write_database() + except Exception as e: + print(f"\n" + "=" * 60) + print(f"테스트 실행 중 오류 발생: {e}") + import traceback + traceback.print_exc() + print("=" * 60) + finally: + cleanup_demo_files() + print("\n" + "=" * 60) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/demo_atio_usage.py b/demo_atio_usage.py index cc3d536..ba82ac7 100644 --- a/demo_atio_usage.py +++ b/demo_atio_usage.py @@ -33,11 +33,12 @@ import atio import pandas as pd import numpy as np +import polars as pl import time from sqlalchemy import create_engine import shutil import json - +# from .src.atio.core import export_to_datalake def demo_basic_usage(): """ 기본 파일 기반 쓰기 사용법 데모 @@ -47,36 +48,37 @@ def demo_basic_usage(): print("1. 기본 사용법 (파일 쓰기)") print("=" * 50) - # 간단한 DataFrame 생성 - df = pd.DataFrame({ - "name": ["Alice", "Bob", "Charlie", "Diana"], - "age": [25, 30, 35, 28], - "city": ["Seoul", "Busan", "Incheon", "Daegu"], - "salary": [50000, 60000, 70000, 55000] - }) - - print("📊 생성된 데이터:") - print(df) - print() - - # 다양한 형식으로 저장 - print("💾 파일 저장 중...") - atio.write(df, "users.parquet", format="parquet") - print("✅ users.parquet 저장 완료") + + data_array = np.array([1, 2, 3, 4, 5]) + + data_array2 = np.array([[1, 2, 3], + [4, 5, 6]]) - # to_csv에 추가 인자(index=False)를 전달하는 예시 - atio.write(df, "users.csv", format="csv", index=False) - print("✅ users.csv 저장 완료 (인덱스 제외)") + atio.write(data_array, "data.npy", "npy") + # atio.write(data_array2, "data.npz", "npz") + # atio.write(data_array2, "data.npz_compressed", "npz_compressed") + atio.write(data_array, "data.csv", "csv") + atio.write(data_array, "data.bin", "bin") + + + # atio.write(df, "users.sql", format="sql") + + + + + # # to_csv에 추가 인자(index=False)를 전달하는 예시 + # atio.write(df, "users.csv", format="csv", index=False) + # print("✅ users.csv 저장 완료 (인덱스 제외)") - print("\n📁 생성된 파일들:") - for file in ["users.parquet", "users.csv"]: - if os.path.exists(file): - size = os.path.getsize(file) - print(f" - {file} ({size} bytes)") - # _SUCCESS 파일 확인 - success_file = os.path.join(os.path.dirname(file), f".{os.path.basename(file)}._SUCCESS") - if os.path.exists(success_file): - print(f" └─ {os.path.basename(success_file)} (저장 완료 플래그)") + # print("\n📁 생성된 파일들:") + # for file in ["users.parquet", "users.csv"]: + # if os.path.exists(file): + # size = os.path.getsize(file) + # print(f" - {file} ({size} bytes)") + # # _SUCCESS 파일 확인 + # success_file = os.path.join(os.path.dirname(file), f".{os.path.basename(file)}._SUCCESS") + # if os.path.exists(success_file): + # print(f" └─ {os.path.basename(success_file)} (저장 완료 플래그)") def demo_excel_and_sql(): """ @@ -144,7 +146,7 @@ def demo_large_data(): print("=" * 50) print("📊 대용량 데이터 생성 중...") - large_df = pd.DataFrame(np.random.randn(200000, 5), columns=list("ABCDE")) + large_df = pd.DataFrame(np.random.randn(2000000, 5), columns=list("ABCDE")) print(f"생성된 데이터 크기: {large_df.shape}") print(f"메모리 사용량: {large_df.memory_usage(deep=True).sum() / 1024 / 1024:.1f} MB") @@ -237,6 +239,14 @@ def demo_polars_integration(): except ImportError: print("⚠️ Polars가 설치되지 않았습니다. (pip install polars)") +def dataLake(): + TABLE_DIR = "snapshot_demo_table" + df1 = pd.DataFrame({'id': [1], 'value_a': ['A1']}) + atio.write_snapshot(df1, TABLE_DIR) # v1 생성 + atio.export_to_datalake(TABLE_DIR, version=1, output_path="datalake/v1.parquet") + + + def demo_snapshots(): """ 스냅샷 생성, 롤백, 삭제 및 가비지 컬렉션(GC) 기능을 시연합니다. @@ -379,6 +389,17 @@ def cleanup_demo_files(): else: print("\n📁 데모 파일들이 보존되었습니다.") +def modelSnapshot(): + TABLE_DIR = "snapshot_demo_table" + # atio.write_model_snapshot("C:/Users/reals/Desktop/OSS/weight/output_weights/distilroberta_weights.pth", TABLE_DIR, show_progress=True) + # atio.write_model_snapshot("C:/Users/reals/Desktop/OSS/weight/output_weights/mnli_weights.pth", TABLE_DIR) + # atio.read_model_snapshot(TABLE_DIR, version=None, mode='auto', weights_only=True) + # atio.read_model_snapshot(TABLE_DIR, version=None, mode='restore', destination_path='destination', weights_only=True) + atio.rollback(TABLE_DIR, version_id=1) + atio.delete_version(TABLE_DIR, version_id=2) + # atio.read_model_snapshot(TABLE_DIR, version=None, mode='auto', weights_only=True) + + def main(): """ 메인 데모 실행 함수 @@ -388,15 +409,16 @@ def main(): print("안전한 원자적 파일 쓰기와 데이터베이스 쓰기의 다양한 기능을 보여줍니다.") # 각 데모 실행 - demo_basic_usage() - demo_excel_and_sql() - demo_large_data() - demo_performance_analysis() - demo_numpy_arrays() - demo_error_handling() - demo_polars_integration() - demo_snapshots() - + # demo_basic_usage() + # demo_excel_and_sql() + # demo_large_data() + # demo_performance_analysis() + # demo_numpy_arrays() + # demo_error_handling() + # demo_polars_integration() + # demo_snapshots() + # dataLake() + modelSnapshot() # 파일 정리 cleanup_demo_files() diff --git a/src/atio/__init__.py b/src/atio/__init__.py index f15cb1e..2415340 100644 --- a/src/atio/__init__.py +++ b/src/atio/__init__.py @@ -28,7 +28,8 @@ read_model_snapshot, tag_version, list_snapshots, - revert + revert, + export_to_datalake ) __all__ = [ @@ -41,5 +42,6 @@ "read_model_snapshot", "tag_version", "list_snapshots" - "revert" + "revert", + "export_to_datalake" ] \ No newline at end of file diff --git a/src/atio/core.py b/src/atio/core.py index d782019..af90f1e 100644 --- a/src/atio/core.py +++ b/src/atio/core.py @@ -67,6 +67,9 @@ def write(obj, target_path=None, format=None, show_progress=False, verbose=False elif format == 'database': # Polars if 'table_name' not in kwargs or 'connection_uri' not in kwargs: raise ValueError("'table_name'과 'connection_uri' 인자는 'database' 포맷에 필수입니다.") + + # [버그 수정] Polars는 'connection' 인자를 사용하므로 'connection_uri'를 변경합니다. + kwargs['connection'] = kwargs.pop('connection_uri') # target_path는 무시하고 **kwargs로 받은 인자들을 사용하여 DB에 직접 씁니다. writer_func(**kwargs) @@ -976,6 +979,7 @@ def write_model_snapshot(model_path: str, table_path: str, show_progress: bool = raise ValueError(f"지원하지 않는 모델 형식입니다: {model_path}") # --- 3. 생산자-소비자 패턴으로 병렬 처리 --- + # ▼▼▼ [핵심 수정] all_files_info 초기화를 루프 밖으로 이동 ▼▼▼ all_files_info = [] new_chunks_to_write = {} executor = get_process_pool() @@ -985,32 +989,33 @@ def write_model_snapshot(model_path: str, table_path: str, show_progress: bool = relative_path = os.path.relpath(file_path, model_path_base).replace('\\', '/') file_info = {"path": relative_path, "chunks": []} - with open(file_path, 'rb') as f: - # 생산자: fastcdc가 청크 '정보'를 하나씩 생성하는 제너레이터 - cdc = fastcdc.fastcdc(f, avg_size=65536, fat=True) - - # [핵심] 청크 정보를 만들면서 '동시에' 작업을 제출하는 future 제너레이터를 생성 - def submit_tasks_generator(): - for chunk in cdc: - job_ticket = (file_path, chunk.offset, chunk.length, data_dir) - yield executor.submit(_process_chunk_from_file_task, job_ticket) - - # 총 청크 수를 알 수 없으므로 파일 크기를 기준으로 진행률 표시 - file_size = os.path.getsize(file_path) - progress_desc = os.path.basename(file_path) - - with tqdm(total=file_size, desc=f"Processing {progress_desc}", unit='B', unit_scale=True, disable=not show_progress, leave=False) as pbar: - # as_completed는 future가 완료되는 대로 결과를 반환 - for future in as_completed(submit_tasks_generator()): - chunk_hash, chunk_content = future.result() - file_info["chunks"].append(chunk_hash) - if chunk_content is not None: - new_chunks_to_write[chunk_hash] = chunk_content - - # 대략적인 청크 크기만큼 진행률 업데이트 - pbar.update(65536) + with open(file_path, 'rb') as f: + # 생산자: fastcdc가 청크 '정보'를 하나씩 생성하는 제너레이터 + cdc = fastcdc.fastcdc(f, avg_size=65536, fat=True) + + # [핵심] 청크 정보를 만들면서 '동시에' 작업을 제출하는 future 제너레이터를 생성 + def submit_tasks_generator(): + for chunk in cdc: + job_ticket = (file_path, chunk.offset, chunk.length, data_dir) + yield executor.submit(_process_chunk_from_file_task, job_ticket) + + # 총 청크 수를 알 수 없으므로 파일 크기를 기준으로 진행률 표시 + file_size = os.path.getsize(file_path) + progress_desc = os.path.basename(file_path) + + with tqdm(total=file_size, desc=f"Processing {progress_desc}", unit='B', unit_scale=True, disable=not show_progress, leave=False) as pbar: + # as_completed는 future가 완료되는 대로 결과를 반환 + for future in as_completed(submit_tasks_generator()): + chunk_hash, chunk_content = future.result() + file_info["chunks"].append(chunk_hash) + if chunk_content is not None: + new_chunks_to_write[chunk_hash] = chunk_content + + # 대략적인 청크 크기만큼 진행률 업데이트 + pbar.update(65536) - all_files_info.append(file_info) + # ▼▼▼ 루프 안에서는 append만 수행 ▼▼▼ + all_files_info.append(file_info) logger.info(f"데이터 병렬 처리 완료") @@ -1022,6 +1027,7 @@ def submit_tasks_generator(): snapshot_id = int(time.time()) snapshot_filename = f"snapshot-{snapshot_id}-{uuid.uuid4()}.json" + # 이제 all_files_info에는 모든 파일 정보가 누적되어 있을 것입니다. new_snapshot = {'snapshot_id': snapshot_id, 'timestamp': time.time(), 'files': sorted(all_files_info, key=lambda x: x['path'])} write_json(new_snapshot, os.path.join(metadata_dir, snapshot_filename)) @@ -1101,6 +1107,7 @@ def _reassemble_from_chunks_threaded(table_path, snapshot, destination_path=None f_out.write(chunk_data) return output_path + # --- TensorFlow 디렉토리 구조 처리 --- # --- TensorFlow 디렉토리 구조 처리 --- else: if destination_path is None: @@ -1108,8 +1115,18 @@ def _reassemble_from_chunks_threaded(table_path, snapshot, destination_path=None os.makedirs(destination_path, exist_ok=True) + # ▼▼▼ 디버깅 print 추가 ▼▼▼ + print(f"DEBUG: Starting TensorFlow reassembly. Number of files in snapshot: {len(files_info)}") + if files_info: + print(f"DEBUG: First file info: {files_info[0]}") # 첫번째 파일 정보 확인 + iterable = tqdm(files_info, desc="Reassembling TensorFlow model", disable=not show_progress, unit=" file") + loop_count = 0 # 루프 실행 횟수 카운터 for file_info in iterable: + loop_count += 1 # 카운터 증가 + # ▼▼▼ 디버깅 print 추가 ▼▼▼ + print(f"DEBUG: Reassembling loop iteration {loop_count}: Processing file '{file_info.get('path', 'N/A')}'") + dir_name = os.path.dirname(file_info['path']) if dir_name: os.makedirs(os.path.join(destination_path, dir_name), exist_ok=True) @@ -1124,6 +1141,9 @@ def _reassemble_from_chunks_threaded(table_path, snapshot, destination_path=None # 파일 쓰기 시에는 내부 tqdm 없이 바로 조합 f_out.write(b"".join(all_chunk_data)) + # [수정] for 루프가 끝난 후에 return 하도록 들여쓰기 수정 + # ▼▼▼ 디버깅 print 추가 ▼▼▼ + print(f"DEBUG: Finished TensorFlow reassembly loop after {loop_count} iterations.") return destination_path def read_model_snapshot(table_path, version=None, mode='auto', destination_path=None, max_workers=None, show_progress=False): diff --git a/test_core_features.py b/test_core_features.py new file mode 100644 index 0000000..da54580 --- /dev/null +++ b/test_core_features.py @@ -0,0 +1,567 @@ +#!/usr/bin/env python3 + +# Copyright 2025 Atio Developers +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Atio Core 기능 테스트 스크립트 +core.py의 주요 기능들(write, snapshot, model_snapshot 등)을 테스트합니다. +""" + +# --- 모듈 경로 설정을 위한 코드 --- +import sys +import os +import shutil +import time + +# 현재 스크립트의 경로를 기준으로 'src' 폴더의 절대 경로를 계산합니다. +project_root = os.path.dirname(os.path.abspath(__file__)) +src_path = os.path.join(project_root, 'src') +if src_path not in sys.path: + sys.path.insert(0, src_path) +# ------------------------------------ + +import atio +import pandas as pd +import numpy as np +import polars as pl +import pyarrow as pa +from sqlalchemy import create_engine + +# --- 선택적 라이브러리 임포트 (모델 테스트용) --- +try: + import torch + _TORCH_AVAILABLE = True +except ImportError: + _TORCH_AVAILABLE = False + print("⚠️ PyTorch가 설치되지 않았습니다. 모델 스냅샷(PyTorch) 테스트를 건너뜁니다.") + +try: + import tensorflow as tf + _TENSORFLOW_AVAILABLE = True +except ImportError: + _TENSORFLOW_AVAILABLE = False + print("⚠️ TensorFlow가 설치되지 않았습니다. 모델 스냅샷(TensorFlow) 테스트를 건너뜁니다.") + + +# --- 전역 테스트 설정 --- +TEST_OUTPUT_DIR = "atio_functional_tests" +DATA_SNAPSHOT_PATH = os.path.join(TEST_OUTPUT_DIR, "data_table") +MODEL_SNAPSHOT_PATH = os.path.join(TEST_OUTPUT_DIR, "model_table") +MODEL_RESTORE_PATH = os.path.join(TEST_OUTPUT_DIR, "model_restore") + +# --- 헬퍼 함수 --- +def print_test_header(title): + print("\n" + "=" * 60) + print(f"🧪 테스트 시작: {title}") + print("=" * 60) + +def print_test_result(success, message=""): + if success: + print(f" ✅ [성공] {message}") + else: + print(f" ❌ [실패] {message}") + +def setup_test_environment(): + """테스트 실행 전 테스트 디렉토리를 정리하고 생성합니다.""" + print(f"🔧 테스트 환경 설정: '{TEST_OUTPUT_DIR}' 디렉토리 정리 및 생성...") + if os.path.exists(TEST_OUTPUT_DIR): + shutil.rmtree(TEST_OUTPUT_DIR) + os.makedirs(TEST_OUTPUT_DIR, exist_ok=True) + os.makedirs(MODEL_RESTORE_PATH, exist_ok=True) + print("✅ 테스트 환경 준비 완료.") + +def create_dummy_models(): + """테스트에 사용할 더미 모델 파일(.pth, saved_model)을 생성합니다.""" + print("🔧 더미 모델 파일 생성 중...") + + # 1. PyTorch 더미 모델 생성 + if _TORCH_AVAILABLE: + try: + pth_path = os.path.join(TEST_OUTPUT_DIR, "dummy_model_v1.pth") + state_dict = {'layer1.weight': torch.randn(10, 5), 'layer1.bias': torch.randn(10)} + torch.save(state_dict, pth_path) + print(f" ✅ PyTorch 더미 모델 생성 완료: {pth_path}") + except Exception as e: + print(f" ❌ PyTorch 더미 모델 생성 실패: {e}") + + # 2. TensorFlow 더미 모델 생성 + if _TENSORFLOW_AVAILABLE: + try: + tf_path = os.path.join(TEST_OUTPUT_DIR, "dummy_model_tf_v1") + + class SimpleModule(tf.Module): + def __init__(self, name=None): + super().__init__(name=name) + self.v = tf.Variable(5.0, name="v1") + @tf.function + def __call__(self, x): + return self.v * x + + model = SimpleModule() + tf.saved_model.save(model, tf_path) + print(f" ✅ TensorFlow 더미 모델 생성 완료: {tf_path}") + except Exception as e: + print(f" ❌ TensorFlow 더미 모델 생성 실패: {e}") + +# --- 1. atio.write() 테스트 --- + +def test_write_pandas(): + """atio.write() - Pandas DataFrame 지원 포맷 테스트""" + print_test_header("atio.write() - Pandas") + try: + df = pd.DataFrame({ + "A": [1, 2, 3], + "B": ["foo", "bar", "baz"], + "C": [0.1, 0.2, 0.3] + }) + + # 테스트할 포맷 리스트 (sql은 별도 처리) + # (수정) 'excel' 포맷의 대상 경로를 '.xlsx'로 변경 + formats_to_test = ['csv', 'parquet', 'json', 'pickle', 'html', 'xlsx'] + + for fmt in formats_to_test: + try: + # (수정) 파일명 포맷을 fmt 그대로 사용 (pd_test.xlsx) + target_path = os.path.join(TEST_OUTPUT_DIR, f"pd_test.{fmt}") + kwargs = {} + format_to_write = fmt + if fmt == 'xlsx': + # openpyxl 필요, format은 'excel'로 지정 + kwargs['engine'] = 'openpyxl' + format_to_write = 'excel' + + atio.write(df, target_path, format=format_to_write, **kwargs) + print_test_result(True, f"format='{format_to_write}' (.{fmt}) 저장") + except Exception as e: + print_test_result(False, f"format='{format_to_write}' (.{fmt}) 저장 중 오류: {e}") + + # SQL 테스트 (in-memory) + try: + engine = create_engine('sqlite:///:memory:') + atio.write(df, format="sql", name='pd_test_table', con=engine, index=False) + # 검증 + with engine.connect() as conn: + read_df = pd.read_sql("SELECT * FROM pd_test_table", conn) + assert len(read_df) == 3 + print_test_result(True, "format='sql' 저장 (in-memory)") + except Exception as e: + print_test_result(False, f"format='sql' 저장 중 오류: {e}") + + except Exception as e: + print(f" ❌ Pandas 테스트 중 예기치 않은 오류: {e}") + +def test_write_polars(): + """atio.write() - Polars DataFrame 지원 포맷 테스트""" + print_test_header("atio.write() - Polars") + try: + df = pl.DataFrame({ + "X": [10, 20, 30], + "Y": [True, False, True] + }) + + formats_to_test = ['csv', 'parquet', 'json', 'ipc', 'avro'] + + for fmt in formats_to_test: + try: + target_path = os.path.join(TEST_OUTPUT_DIR, f"pl_test.{fmt}") + atio.write(df, target_path, format=fmt) + print_test_result(True, f"format='{fmt}' 저장") + except Exception as e: + print_test_result(False, f"format='{fmt}' 저장 중 오류: {e}") + + except Exception as e: + print(f" ❌ Polars 테스트 중 예기치 않은 오류: {e}") + +def test_write_numpy(): + """atio.write() - NumPy Array 지원 포맷 테스트""" + print_test_header("atio.write() - NumPy") + try: + arr_1d = np.array([1, 2, 3, 4, 5]) + arr_2d = np.random.rand(5, 3) + arr_dict = {'a': arr_1d, 'b': arr_2d} + + # npy (1D/2D) + atio.write(arr_2d, os.path.join(TEST_OUTPUT_DIR, "np_test.npy"), "npy") + print_test_result(True, "format='npy' 저장") + + # npz + atio.write(arr_dict, os.path.join(TEST_OUTPUT_DIR, "np_test.npz"), "npz") + print_test_result(True, "format='npz' 저장") + + # npz_compressed + atio.write(arr_dict, os.path.join(TEST_OUTPUT_DIR, "np_test_comp.npz"), "npz_compressed") + print_test_result(True, "format='npz_compressed' 저장") + + # csv (1D 또는 2D) + atio.write(arr_2d, os.path.join(TEST_OUTPUT_DIR, "np_test.csv"), "csv") + print_test_result(True, "format='csv' 저장") + + # bin (1D) + atio.write(arr_1d.astype(np.float32), os.path.join(TEST_OUTPUT_DIR, "np_test.bin"), "bin") + print_test_result(True, "format='bin' 저장") + + except Exception as e: + print(f" ❌ NumPy 테스트 중 예기치 않은 오류: {e}") + +def test_write_options(): + """atio.write() - show_progress 및 verbose 옵션 테스트""" + print_test_header("atio.write() - 옵션 (show_progress, verbose)") + try: + # 1. show_progress=True 테스트 + print("\n [1] show_progress=True 테스트 (진행도 표시줄이 나타나야 함):") + large_df = pd.DataFrame(np.random.randn(1000000, 5), columns=list("ABCDE")) + atio.write(large_df, os.path.join(TEST_OUTPUT_DIR, "large.parquet"), "parquet", show_progress=True) + print_test_result(True, "show_progress=True 실행 완료 (시각적 확인 필요)") + + # 2. verbose=True 테스트 + print("\n [2] verbose=True 테스트 (상세 로그가 출력되어야 함):") + small_df = pd.DataFrame({"id": [1]}) + atio.write(small_df, os.path.join(TEST_OUTPUT_DIR, "verbose.csv"), "csv", verbose=True) + print_test_result(True, "verbose=True 실행 완료 (로그 확인 필요)") + + except Exception as e: + print(f" ❌ 옵션 테스트 중 예기치 않은 오류: {e}") + +def test_write_database(): + """atio.write() - 데이터베이스 쓰기 (Pandas, Polars) 테스트""" + print_test_header("atio.write() - 데이터베이스 쓰기 (sql, database)") + + # 1. Pandas (sql) - in-memory + try: + pd_df = pd.DataFrame({"pd_col": [1, 2]}) + engine = create_engine('sqlite:///:memory:') + atio.write(pd_df, format="sql", name='pd_sql_test', con=engine) + print_test_result(True, "Pandas format='sql' (in-memory) 저장") + except Exception as e: + print_test_result(False, f"Pandas format='sql' 저장 중 오류: {e}") + + # 2. Polars (database) - file-based (connection_uri) + try: + pl_df = pl.DataFrame({"pl_col": [True, False]}) + db_path = os.path.join(TEST_OUTPUT_DIR, "polars.db") + engine_uri = f"sqlite:///{db_path}" + + # (수정) Polars는 'connection' 인자를, core.py의 검증 로직은 'connection_uri'를 기대 + # 두 인자를 모두 전달하여 core.py의 검증과 polars의 실행을 모두 만족시킴 + atio.write(pl_df, format="database", table_name='pl_db_test', + connection_uri=engine_uri, connection=engine_uri) + print_test_result(True, "Polars format='database' (file-based) 저장") + except Exception as e: + print_test_result(False, f"Polars format='database' 저장 중 오류: {e}") + + +# --- 2. 데이터 스냅샷 테스트 (write_snapshot, read_table, rollback, delete, export) --- + +def test_data_snapshot_lifecycle(): + """데이터 스냅샷의 전체 생명주기(생성, 추가, 읽기, 롤백, 삭제, 내보내기)를 테스트합니다.""" + print_test_header("데이터 스냅샷 생명주기 (종합 테스트)") + + try: + # --- 1. write_snapshot (overwrite) 테스트 --- + print("\n [1] write_snapshot (overwrite) - 다양한 타입") + + # v1: Pandas + df_pd = pd.DataFrame({'id': [1, 2], 'pd_val': ['a', 'b']}) + atio.write_snapshot(df_pd, DATA_SNAPSHOT_PATH) + print_test_result(True, "v1 (Pandas, 2 rows) 생성") + + # v2: Polars + df_pl = pl.DataFrame({'id': [1, 2], 'pl_val': [True, False]}) + atio.write_snapshot(df_pl, DATA_SNAPSHOT_PATH) + print_test_result(True, "v2 (Polars, 2 rows) 생성") + + # v3: NumPy + arr_np = np.array([[1.1, 1.2], [2.1, 2.2]]) + atio.write_snapshot(arr_np, DATA_SNAPSHOT_PATH) + print_test_result(True, "v3 (NumPy, 2 rows) 생성") + + # v4: Arrow + arr_pa = pa.Table.from_pydict({'id': [10], 'pa_val': ['arrow']}) + atio.write_snapshot(arr_pa, DATA_SNAPSHOT_PATH) + print_test_result(True, "v4 (Arrow, 1 row) 생성") + + # --- 2. write_snapshot (append) 테스트 --- + print("\n [2] write_snapshot (mode='append')") + + # (수정) v4 (1 row)에 append 하므로, 1 row짜리 데이터프레임을 생성해야 함 + df_append = pd.DataFrame({'appended_col': [100]}) + + # v4에 'appended_col'을 추가하여 v5 생성 + atio.write_snapshot(df_append, DATA_SNAPSHOT_PATH, mode='append') + print_test_result(True, "v5 (Append, 1 row) 생성") + + # --- 3. read_table() 테스트 --- + print("\n [3] read_table() - output_as 옵션") + + # v1(Pandas) -> Pandas + read_v1 = atio.read_table(DATA_SNAPSHOT_PATH, version=1, output_as='pandas') + assert isinstance(read_v1, pd.DataFrame) and 'pd_val' in read_v1.columns + print_test_result(True, "v1(Pandas) -> output_as='pandas' 읽기") + + # v2(Polars) -> Polars + read_v2 = atio.read_table(DATA_SNAPSHOT_PATH, version=2, output_as='polars') + assert isinstance(read_v2, pl.DataFrame) and 'pl_val' in read_v2.columns + print_test_result(True, "v2(Polars) -> output_as='polars' 읽기") + + # v3(NumPy) -> NumPy + read_v3 = atio.read_table(DATA_SNAPSHOT_PATH, version=3, output_as='numpy') + assert isinstance(read_v3, np.ndarray) and read_v3.shape == (2, 2) + print_test_result(True, "v3(NumPy) -> output_as='numpy' 읽기") + + # v4(Arrow) -> Arrow + read_v4 = atio.read_table(DATA_SNAPSHOT_PATH, version=4, output_as='arrow') + assert isinstance(read_v4, pa.Table) and 'pa_val' in read_v4.schema.names + print_test_result(True, "v4(Arrow) -> output_as='arrow' 읽기") + + # v5(Append) -> Pandas + read_v5 = atio.read_table(DATA_SNAPSHOT_PATH, version=5, output_as='pandas') + # (수정) v5는 1 row 여야 함 + assert 'pa_val' in read_v5.columns and 'appended_col' in read_v5.columns and len(read_v5) == 1 + print_test_result(True, "v5(Append) 읽기 (v4 + appended_col)") + + # 최신 버전(v5) 읽기 + read_latest = atio.read_table(DATA_SNAPSHOT_PATH, version=None, output_as='pandas') + assert 'appended_col' in read_latest.columns + print_test_result(True, "최신 버전 (v5) 읽기 (version=None)") + + + # --- 4. rollback() 테스트 --- + print("\n [4] rollback() - v5 -> v3 롤백") + # Pre-condition: 현재 v5 + atio.rollback(DATA_SNAPSHOT_PATH, version_id=3) + read_after_rollback = atio.read_table(DATA_SNAPSHOT_PATH, output_as='numpy') + assert read_after_rollback.shape == (2, 2) + print_test_result(True, "v3 롤백 성공 (최신 버전이 v3가 됨)") + + # --- 5. delete_version() 테스트 --- + print("\n [5] delete_version() - v5 삭제") + # Pre-condition: 현재 v3이므로 v5는 최신이 아님 -> 삭제 가능 + delete_result = atio.delete_version(DATA_SNAPSHOT_PATH, version_id=5) + assert delete_result is True + print_test_result(True, "v5 (비활성 버전) 삭제 성공") + + # 삭제된 v5 읽기 시도 (실패해야 정상) + try: + atio.read_table(DATA_SNAPSHOT_PATH, version=5) + print_test_result(False, "삭제된 v5 읽기 시도 (오류가 발생해야 함)") + except FileNotFoundError: + print_test_result(True, "삭제된 v5 읽기 시도 (예상대로 FileNotFoundError 발생)") + except Exception: + # core.py 구현에 따라 다른 오류가 날 수도 있음 + print_test_result(True, "삭제된 v5 읽기 시도 (예상대로 오류 발생)") + + # [Pre condition] 최신 버전(v3) 삭제 시도 (실패해야 정상) + print("\n [5-2] delete_version() - v3 (최신) 삭제 시도") + delete_latest_result = atio.delete_version(DATA_SNAPSHOT_PATH, version_id=3) + assert delete_latest_result is False + print_test_result(True, "v3 (활성 버전) 삭제 시도 (예상대로 거부됨)") + + # --- 6. export_to_datalake() 테스트 --- + print("\n [6] export_to_datalake() - v2(Polars) 내보내기") + export_path = os.path.join(TEST_OUTPUT_DIR, "datalake_export.parquet") + atio.export_to_datalake(DATA_SNAPSHOT_PATH, version=2, output_path=export_path) + assert os.path.exists(export_path) + print_test_result(True, f"Parquet 파일 생성 완료: {export_path}") + + except Exception as e: + print_test_result(False, f"데이터 스냅샷 테스트 중 예기치 않은 오류: {e}") + # 오류 발생 시에도 테스트가 계속 진행되도록 함 (필요시) + import traceback + traceback.print_exc() + + +# --- 3. 모델 스냅샷 테스트 (write_model_snapshot, read_model_snapshot, rollback, delete) --- + +def test_model_snapshot_lifecycle(): + """모델 스냅샷의 전체 생명주기(생성, 읽기, 롤백, 삭제)를 테스트합니다.""" + print_test_header("모델 스냅샷 생명주기 (종합 테스트)") + + if not _TORCH_AVAILABLE and not _TENSORFLOW_AVAILABLE: + print(" ⚠️ Torch와 TensorFlow가 모두 없어 모델 스냅샷 테스트를 건너뜁니다.") + return + + try: + # --- 1. write_model_snapshot (PyTorch) --- + if _TORCH_AVAILABLE: + print("\n [1] write_model_snapshot (PyTorch)") + pth_path = os.path.join(TEST_OUTPUT_DIR, "dummy_model_v1.pth") + if not os.path.exists(pth_path): + print_test_result(False, "PyTorch 더미 모델 파일 없음. (create_dummy_models 실패?)") + return + + atio.write_model_snapshot(pth_path, MODEL_SNAPSHOT_PATH, show_progress=True) + print_test_result(True, "v1 (PyTorch) 생성") + else: + print("\n [1] write_model_snapshot (PyTorch) - 건너뜀") + + # --- 2. write_model_snapshot (TensorFlow) --- + if _TENSORFLOW_AVAILABLE: + print("\n [2] write_model_snapshot (TensorFlow)") + tf_path = os.path.join(TEST_OUTPUT_DIR, "dummy_model_tf_v1") + if not os.path.exists(tf_path): + print_test_result(False, "TensorFlow 더미 모델 파일 없음. (create_dummy_models 실패?)") + return + + atio.write_model_snapshot(tf_path, MODEL_SNAPSHOT_PATH, show_progress=True) + print_test_result(True, "v2 (TensorFlow) 생성") + else: + print("\n [2] write_model_snapshot (TensorFlow) - 건너뜀") + + # --- 3. read_model_snapshot (mode='auto') --- + print("\n [3] read_model_snapshot (mode='auto')") + if _TORCH_AVAILABLE: + try: + model_v1 = atio.read_model_snapshot(MODEL_SNAPSHOT_PATH, version=1, mode='auto') + assert isinstance(model_v1, dict) and 'layer1.weight' in model_v1 + print_test_result(True, "v1 (PyTorch) 'auto' 모드 로딩 성공") + except Exception as e: + print_test_result(False, f"v1 (PyTorch) 'auto' 로딩 실패: {e}") + + if _TENSORFLOW_AVAILABLE: + try: + # v2가 있어야 함 (torch만 테스트 시 v2가 없음) + if not os.path.exists(os.path.join(MODEL_SNAPSHOT_PATH, 'metadata', 'v2.metadata.json')): + print(" - v2 (TensorFlow) 스냅샷이 없어 'auto' 읽기 테스트를 건너뜁니다.") + else: + model_v2 = atio.read_model_snapshot(MODEL_SNAPSHOT_PATH, version=2, mode='auto') + assert "v" in dir(model_v2) # SimpleModule의 'v' 변수 확인 + print_test_result(True, "v2 (TensorFlow) 'auto' 모드 로딩 성공") + except Exception as e: + print_test_result(False, f"v2 (TensorFlow) 'auto' 로딩 실패: {e}") + + # --- 4. read_model_snapshot (mode='restore') --- + print("\n [4] read_model_snapshot (mode='restore')") + if _TORCH_AVAILABLE: + try: + restore_pth_path = os.path.join(MODEL_RESTORE_PATH, "restored_v1.pth") + atio.read_model_snapshot(MODEL_SNAPSHOT_PATH, version=1, mode='restore', destination_path=restore_pth_path, show_progress=True) + assert os.path.exists(restore_pth_path) + print_test_result(True, f"v1 (PyTorch) 'restore' 모드 복원 성공: {restore_pth_path}") + except Exception as e: + print_test_result(False, f"v1 (PyTorch) 'restore' 복원 실패: {e}") + + if _TENSORFLOW_AVAILABLE: + try: + if not os.path.exists(os.path.join(MODEL_SNAPSHOT_PATH, 'metadata', 'v2.metadata.json')): + print(" - v2 (TensorFlow) 스냅샷이 없어 'restore' 읽기 테스트를 건너뜁니다.") + else: + restore_tf_path = os.path.join(MODEL_RESTORE_PATH, "restored_v2_tf") + atio.read_model_snapshot(MODEL_SNAPSHOT_PATH, version=2, mode='restore', destination_path=restore_tf_path, show_progress=True) + assert os.path.exists(os.path.join(restore_tf_path, "saved_model.pb")) + print_test_result(True, f"v2 (TensorFlow) 'restore' 모드 복원 성공: {restore_tf_path}") + except Exception as e: + print_test_result(False, f"v2 (TensorFlow) 'restore' 복원 실패: {e}") + + # --- 5. rollback() 테스트 --- + print("\n [5] rollback() - v2 -> v1 롤백") + if _TORCH_AVAILABLE and _TENSORFLOW_AVAILABLE: + # Pre-condition: 현재 v2 + atio.rollback(MODEL_SNAPSHOT_PATH, version_id=1) + model_after_rollback = atio.read_model_snapshot(MODEL_SNAPSHOT_PATH, mode='auto') + assert isinstance(model_after_rollback, dict) # PyTorch(v1)인지 확인 + print_test_result(True, "v1 롤백 성공 (최신 버전이 v1가 됨)") + else: + print(" - 롤백 테스트는 PyTorch와 TensorFlow 스냅샷이 모두 필요하므로 건너뜁니다.") + + # --- 6. delete_version() 테스트 --- + print("\n [6] delete_version() - v2 삭제") + if _TORCH_AVAILABLE and _TENSORFLOW_AVAILABLE: + # Pre-condition: 현재 v1이므로 v2는 최신이 아님 -> 삭제 가능 + delete_result = atio.delete_version(MODEL_SNAPSHOT_PATH, version_id=2) + assert delete_result is True + print_test_result(True, "v2 (비활성 버전) 삭제 성공") + + # 삭제된 v2 읽기 시도 (실패해야 정상) + try: + atio.read_model_snapshot(MODEL_SNAPSHOT_PATH, version=2, mode='auto') + print_test_result(False, "삭제된 v2 읽기 시도 (오류가 발생해야 함)") + except FileNotFoundError: + print_test_result(True, "삭제된 v2 읽기 시도 (예상대로 FileNotFoundError 발생)") + except Exception: + print_test_result(True, "삭제된 v2 읽기 시도 (예상대로 오류 발생)") + else: + print(" - 삭제 테스트는 PyTorch와 TensorFlow 스냅샷이 모두 필요하므로 건너뜁니다.") + + except Exception as e: + print_test_result(False, f"모델 스냅샷 테스트 중 예기치 않은 오류: {e}") + import traceback + traceback.print_exc() + +def cleanup_demo_files(): + """데모 실행 후 생성된 파일들을 정리합니다.""" + print("\n" + "=" * 60) + print("🧹 테스트 파일 정리") + print("=" * 60) + + if not os.path.exists(TEST_OUTPUT_DIR): + print(f"🗑️ 정리할 디렉토리가 없습니다: '{TEST_OUTPUT_DIR}'") + return + + print(f"🗑️ 생성된 테스트 디렉토리: '{TEST_OUTPUT_DIR}'") + + print("\n❓ 테스트 디렉토리를 삭제하시겠습니까? (y/n): ", end="") + try: + response = input().lower().strip() + except (EOFError, KeyboardInterrupt): + response = 'n' + print("\n입력 없이 종료하여 파일을 보존합니다.") + + if response == 'y': + try: + shutil.rmtree(TEST_OUTPUT_DIR) + print(f"\n✅ '{TEST_OUTPUT_DIR}' 디렉토리와 모든 내용이 삭제되었습니다.") + except Exception as e: + print(f"\n❌ 디렉토리 삭제 중 오류 발생: {e}") + else: + print(f"\n📁 '{TEST_OUTPUT_DIR}' 디렉토리가 보존되었습니다.") + +def main(): + """ + 메인 기능 테스트 실행 함수 + Atio의 모든 주요 기능을 순차적으로 테스트합니다. + """ + try: + setup_test_environment() + create_dummy_models() + + # --- atio.write() 테스트 --- + test_write_pandas() + test_write_polars() + test_write_numpy() + test_write_options() + test_write_database() + + # --- 데이터 스냅샷 테스트 --- + test_data_snapshot_lifecycle() + + # --- 모델 스냅샷 테스트 --- + test_model_snapshot_lifecycle() + + except Exception as e: + print(f"\n" + "!" * 60) + print(f" CRITICAL: 테스트 실행 중 치명적인 오류 발생: {e}") + import traceback + traceback.print_exc() + print("!" * 60) + finally: + # 파일 정리 + cleanup_demo_files() + + print("\n" + "=" * 60) + print("🎉 Atio 기능 테스트 완료!") + print("=" * 60) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/writeModelSanpshotTest.py b/writeModelSanpshotTest.py new file mode 100644 index 0000000..d067897 --- /dev/null +++ b/writeModelSanpshotTest.py @@ -0,0 +1,221 @@ +#!/usr/bin/env python3 + +# Copyright 2025 Atio Developers +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Atio Model Snapshot 기능 테스트 스크립트 +(write_model_snapshot, read_model_snapshot, rollback, delete_version) +""" + +# --- 모듈 경로 설정을 위한 코드 --- +import sys +import os +import shutil +import time +import tempfile # TensorFlow 로딩 시 임시 폴더 사용 + +# 현재 스크립트의 경로를 기준으로 'src' 폴더의 절대 경로를 계산합니다. +project_root = os.path.dirname(os.path.abspath(__file__)) +src_path = os.path.join(project_root, 'src') +if src_path not in sys.path: + sys.path.insert(0, src_path) +# ------------------------------------ + +import atio +# 데이터 관련 라이브러리(pandas, numpy 등)는 여기서 필요 없음 + +# --- 선택적 라이브러리 임포트 (모델 테스트용) --- +try: + import torch + _TORCH_AVAILABLE = True +except ImportError: + _TORCH_AVAILABLE = False + print("⚠️ PyTorch가 설치되지 않았습니다. 모델 스냅샷(PyTorch) 테스트를 건너뜁니다.") + +try: + import tensorflow as tf + _TENSORFLOW_AVAILABLE = True +except ImportError: + _TENSORFLOW_AVAILABLE = False + print("⚠️ TensorFlow가 설치되지 않았습니다. 모델 스냅샷(TensorFlow) 테스트를 건너뜁니다.") + + +# --- 전역 테스트 설정 --- +TEST_OUTPUT_DIR = "atio_model_snapshot_tests" # 디렉토리 이름 변경 +MODEL_SNAPSHOT_PATH = os.path.join(TEST_OUTPUT_DIR, "model_table") +MODEL_RESTORE_PATH = os.path.join(TEST_OUTPUT_DIR, "model_restore") +# 데이터 스냅샷 경로는 필요 없음 + +# --- 헬퍼 함수 --- +def print_test_header(title): + print("\n" + "=" * 60) + print(f"🧪 테스트 시작: {title}") + print("=" * 60) + +def print_test_result(success, message=""): + if success: + print(f" ✅ [성공] {message}") + else: + print(f" ❌ [실패] {message}") + +def setup_test_environment(): + """테스트 실행 전 테스트 디렉토리를 정리하고 생성합니다.""" + print(f"🔧 테스트 환경 설정: '{TEST_OUTPUT_DIR}' 디렉토리 정리 및 생성...") + if os.path.exists(TEST_OUTPUT_DIR): + shutil.rmtree(TEST_OUTPUT_DIR) + os.makedirs(TEST_OUTPUT_DIR, exist_ok=True) + # os.makedirs(MODEL_RESTORE_PATH, exist_ok=True) # 모델 복원 경로는 필요 + print("✅ 테스트 환경 준비 완료.") + +def create_dummy_models(): + """테스트에 사용할 더미 모델 파일(.pth, saved_model)을 생성합니다.""" + print("🔧 더미 모델 파일 생성 중...") + # 1. PyTorch 더미 모델 생성 + if _TORCH_AVAILABLE: + try: + pth_path = os.path.join(TEST_OUTPUT_DIR, "dummy_model_v1.pth") + state_dict = {'layer1.weight': torch.randn(10, 5), 'layer1.bias': torch.randn(10)} + torch.save(state_dict, pth_path) + print(f" ✅ PyTorch 더미 모델 생성 완료: {pth_path}") + except Exception as e: + print(f" ❌ PyTorch 더미 모델 생성 실패: {e}") + # 2. TensorFlow 더미 모델 생성 + if _TENSORFLOW_AVAILABLE: + try: + tf_path = os.path.join(TEST_OUTPUT_DIR, "dummy_model_tf_v1") + class SimpleModule(tf.Module): + def __init__(self, name=None): + super().__init__(name=name) + self.v = tf.Variable(5.0, name="v1") + @tf.function + def __call__(self, x): + return self.v * x + model = SimpleModule() + tf.saved_model.save(model, tf_path) + print(f" ✅ TensorFlow 더미 모델 생성 완료: {tf_path}") + except Exception as e: + print(f" ❌ TensorFlow 더미 모델 생성 실패: {e}") + +# --- 모델 스냅샷 테스트 함수 --- + +def test_model_snapshot_lifecycle(): + """모델 스냅샷의 전체 생명주기(생성, 읽기, 롤백, 삭제)를 테스트합니다.""" + print_test_header("모델 스냅샷 생명주기 (종합 테스트)") + if not _TORCH_AVAILABLE and not _TENSORFLOW_AVAILABLE: + print(" ⚠️ Torch와 TensorFlow가 모두 없어 모델 스냅샷 테스트를 건너뜁니다.") + return + try: + # --- 1. write_model_snapshot (PyTorch) --- + if _TORCH_AVAILABLE: + print("\n [1] write_model_snapshot (PyTorch)") + pth_path = os.path.join(TEST_OUTPUT_DIR, "dummy_model_v1.pth") + if not os.path.exists(pth_path): + print_test_result(False, "PyTorch 더미 모델 파일 없음.") + return + atio.write_model_snapshot(pth_path, MODEL_SNAPSHOT_PATH, show_progress=True) + print_test_result(True, "v1 (PyTorch) 생성") + else: + print("\n [1] write_model_snapshot (PyTorch) - 건너뜀") + + # --- 2. write_model_snapshot (TensorFlow) --- + if _TENSORFLOW_AVAILABLE: + print("\n [2] write_model_snapshot (TensorFlow)") + tf_path = os.path.join(TEST_OUTPUT_DIR, "dummy_model_tf_v1") + if not os.path.exists(tf_path): + print_test_result(False, "TensorFlow 더미 모델 파일 없음.") + return + atio.write_model_snapshot(tf_path, MODEL_SNAPSHOT_PATH, show_progress=True) + print_test_result(True, "v2 (TensorFlow) 생성") + else: + print("\n [2] write_model_snapshot (TensorFlow) - 건너뜀") + + # --- 5. rollback() --- + print("\n [5] rollback() - v2 -> v1 롤백") + if _TORCH_AVAILABLE and _TENSORFLOW_AVAILABLE: + atio.rollback(MODEL_SNAPSHOT_PATH, version_id=1) + model_after_rollback = atio.read_model_snapshot(MODEL_SNAPSHOT_PATH, mode='auto') + assert isinstance(model_after_rollback, dict) + print_test_result(True, "v1 롤백 성공 (최신 버전이 v1가 됨)") + else: + print(" - 롤백 테스트는 PyTorch와 TensorFlow 스냅샷이 모두 필요하므로 건너뜁니다.") + + # --- 6. delete_version() --- + print("\n [6] delete_version() - v2 삭제") + if _TORCH_AVAILABLE and _TENSORFLOW_AVAILABLE: + delete_result = atio.delete_version(MODEL_SNAPSHOT_PATH, version_id=2) + assert delete_result is True + print_test_result(True, "v2 (비활성 버전) 삭제 성공") + try: + atio.read_model_snapshot(MODEL_SNAPSHOT_PATH, version=2, mode='auto') + print_test_result(False, "삭제된 v2 읽기 시도 (오류가 발생해야 함)") + except FileNotFoundError: + print_test_result(True, "삭제된 v2 읽기 시도 (예상대로 FileNotFoundError 발생)") + except Exception: + print_test_result(True, "삭제된 v2 읽기 시도 (예상대로 오류 발생)") + else: + print(" - 삭제 테스트는 PyTorch와 TensorFlow 스냅샷이 모두 필요하므로 건너뜁니다.") + + except Exception as e: + print_test_result(False, f"모델 스냅샷 테스트 중 예기치 않은 오류: {e}") + import traceback + traceback.print_exc() + +# --- 파일 정리 함수 --- +def cleanup_demo_files(): + """테스트 실행 후 생성된 파일들을 정리합니다.""" + print("\n" + "=" * 60) + print("🧹 테스트 파일 정리") + print("=" * 60) + if not os.path.exists(TEST_OUTPUT_DIR): + print(f"🗑️ 정리할 디렉토리가 없습니다: '{TEST_OUTPUT_DIR}'") + return + print(f"🗑️ 생성된 테스트 디렉토리: '{TEST_OUTPUT_DIR}'") + print("\n❓ 테스트 디렉토리를 삭제하시겠습니까? (y/n): ", end="") + try: + response = input().lower().strip() + except (EOFError, KeyboardInterrupt): + response = 'n' + print("\n입력 없이 종료하여 파일을 보존합니다.") + if response == 'y': + try: + shutil.rmtree(TEST_OUTPUT_DIR) + print(f"\n✅ '{TEST_OUTPUT_DIR}' 디렉토리와 모든 내용이 삭제되었습니다.") + except Exception as e: + print(f"\n❌ 디렉토리 삭제 중 오류 발생: {e}") + else: + print(f"\n📁 '{TEST_OUTPUT_DIR}' 디렉토리가 보존되었습니다.") + +# --- 메인 실행 함수 --- +def main(): + """모델 스냅샷 기능 테스트 실행 함수""" + try: + setup_test_environment() + create_dummy_models() # 모델 스냅샷 테스트는 더미 모델 생성이 필요 + # --- 모델 스냅샷 테스트 호출 --- + test_model_snapshot_lifecycle() + except Exception as e: + print(f"\n" + "!" * 60) + print(f" CRITICAL: 테스트 실행 중 치명적인 오류 발생: {e}") + import traceback + traceback.print_exc() + print("!" * 60) + finally: + cleanup_demo_files() + print("\n" + "=" * 60) + print("🎉 Atio Model Snapshot 기능 테스트 완료!") + print("=" * 60) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/writeSnapshotTest.py b/writeSnapshotTest.py new file mode 100644 index 0000000..26017c3 --- /dev/null +++ b/writeSnapshotTest.py @@ -0,0 +1,263 @@ +#!/usr/bin/env python3 + +# Copyright 2025 Atio Developers +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Atio Data Snapshot 기능 테스트 스크립트 +(write_snapshot, read_table, rollback, delete_version, export_to_datalake) +""" + +# --- 모듈 경로 설정을 위한 코드 --- +import sys +import os +import shutil +import time + +# 현재 스크립트의 경로를 기준으로 'src' 폴더의 절대 경로를 계산합니다. +project_root = os.path.dirname(os.path.abspath(__file__)) +src_path = os.path.join(project_root, 'src') +if src_path not in sys.path: + sys.path.insert(0, src_path) +# ------------------------------------ + +import atio +import pandas as pd +import numpy as np +import polars as pl +import pyarrow as pa +# sqlalchemy는 이 파일에서 필요 없음 + +# --- 전역 테스트 설정 --- +TEST_OUTPUT_DIR = "atio_data_snapshot_tests" # 디렉토리 이름 변경 +DATA_SNAPSHOT_PATH = os.path.join(TEST_OUTPUT_DIR, "data_table") +# 모델 관련 경로는 필요 없음 + +# --- 헬퍼 함수 --- +def print_test_header(title): + print("\n" + "=" * 60) + print(f"🧪 테스트 시작: {title}") + print("=" * 60) + +def print_test_result(success, message=""): + if success: + print(f" ✅ [성공] {message}") + else: + print(f" ❌ [실패] {message}") + +def setup_test_environment(): + """테스트 실행 전 테스트 디렉토리를 정리하고 생성합니다.""" + print(f"테스트 환경 설정: '{TEST_OUTPUT_DIR}' 디렉토리 정리 및 생성...") + if os.path.exists(TEST_OUTPUT_DIR): + shutil.rmtree(TEST_OUTPUT_DIR) + os.makedirs(TEST_OUTPUT_DIR, exist_ok=True) + # 모델 복원 디렉토리 생성은 필요 없음 + print("✅ 테스트 환경 준비 완료.") + +# --- 데이터 스냅샷 테스트 함수 --- + +def test_data_snapshot_lifecycle(): + """데이터 스냅샷의 전체 생명주기(생성, 추가, 읽기, 롤백, 삭제, 내보내기)를 테스트합니다.""" + print_test_header("데이터 스냅샷 생명주기 (종합 테스트)") + try: + # # --- write_snapshot (overwrite) --- + # print("\n [1] write_snapshot (overwrite) - 다양한 타입") + + # # Pandas DataFrame + # df_pd = pd.DataFrame({'id': [1, 2], 'pd_val': ['a', 'b']}) + # atio.write_snapshot(df_pd, DATA_SNAPSHOT_PATH) + # print_test_result(True, "v1 (Pandas, 2 rows) 생성") + + # # Polars DataFrame + # df_pl = pl.DataFrame({'id': [1, 2], 'pl_val': [True, False]}) + # atio.write_snapshot(df_pl, DATA_SNAPSHOT_PATH) + # print_test_result(True, "v2 (Polars, 2 rows) 생성") + + # # NumPy Array + # arr_np = np.array([[1.1, 1.2], [2.1, 2.2]]) + # atio.write_snapshot(arr_np, DATA_SNAPSHOT_PATH) + # print_test_result(True, "v3 (NumPy, 2 rows) 생성") + + # # PyArrow Table + # arr_pa = pa.Table.from_pydict({'id': [10], 'pa_val': ['arrow']}) + # atio.write_snapshot(arr_pa, DATA_SNAPSHOT_PATH) + # print_test_result(True, "v4 (Arrow, 1 row) 생성") + + # # --- 3. read_table() 테스트 --- + # print("\n [3] read_table() - output_as 옵션 및 내용 확인") # 제목 수정 + + # # v1(Pandas) -> Pandas + # read_v1 = atio.read_table(DATA_SNAPSHOT_PATH, version=1, output_as='pandas') + # assert isinstance(read_v1, pd.DataFrame) and 'pd_val' in read_v1.columns + # print_test_result(True, "v1(Pandas) -> output_as='pandas' 읽기 성공") + # print(" --- v1 (Pandas) 내용 ---") + # print(read_v1) # 읽어온 Pandas DataFrame 출력 + # print(" -------------------------") + + # # v2(Polars) -> Polars + # read_v2 = atio.read_table(DATA_SNAPSHOT_PATH, version=2, output_as='polars') + # assert isinstance(read_v2, pl.DataFrame) and 'pl_val' in read_v2.columns + # print_test_result(True, "v2(Polars) -> output_as='polars' 읽기 성공") + # print(" --- v2 (Polars) 내용 ---") + # print(read_v2) # 읽어온 Polars DataFrame 출력 + # print(" -------------------------") + + # # v3(NumPy) -> NumPy + # read_v3 = atio.read_table(DATA_SNAPSHOT_PATH, version=3, output_as='numpy') + # assert isinstance(read_v3, np.ndarray) and read_v3.shape == (2, 2) + # print_test_result(True, "v3(NumPy) -> output_as='numpy' 읽기 성공") + # print(" --- v3 (NumPy) 내용 ---") + # print(read_v3) # 읽어온 NumPy Array 출력 + # print(" ------------------------") + + # # v4(Arrow) -> Arrow + # read_v4 = atio.read_table(DATA_SNAPSHOT_PATH, version=4, output_as='arrow') + # assert isinstance(read_v4, pa.Table) and 'pa_val' in read_v4.schema.names + # print_test_result(True, "v4(Arrow) -> output_as='arrow' 읽기 성공") + # print(" --- v4 (Arrow) 내용 ---") + # print(read_v4) # 읽어온 PyArrow Table 출력 + # print(" ------------------------") + + + # # Pandas DataFrame V1 생성 + # df_pd = pd.DataFrame({'id': [1, 2], 'pd_val': ['a', 'b']}) + # atio.write_snapshot(df_pd, DATA_SNAPSHOT_PATH) + # print_test_result(True, "v1 (Pandas, 2 rows) 생성") + + # # --- 2. write_snapshot (append) --- V1에 열 추가하여 저장 + # print("\n [2] write_snapshot (mode='append')") + # df_append = pd.DataFrame({'appended_col': [100, 200]}) + # atio.write_snapshot(df_append, DATA_SNAPSHOT_PATH, mode='append') + # print_test_result(True, "v2 (Append, 2 rows) 생성") + + # read_v1 = atio.read_table(DATA_SNAPSHOT_PATH, version=1, output_as='pandas') + # assert isinstance(read_v1, pd.DataFrame) and 'pd_val' in read_v1.columns + # print_test_result(True, "v1(Pandas) -> output_as='pandas' 읽기 성공") + # print(" --- v1 (Pandas) 내용 ---") + # print(read_v1) # 읽어온 Pandas DataFrame 출력 + # print(" -------------------------") + + # read_v2 = atio.read_table(DATA_SNAPSHOT_PATH, version=2, output_as='pandas') + # assert isinstance(read_v2, pd.DataFrame) and 'pd_val' in read_v2.columns + # print_test_result(True, "v2(Pandas) -> output_as='pandas' 읽기 성공") + # print(" --- v2 (Pandas) 내용 ---") + # print(read_v2) # 읽어온 Pandas DataFrame 출력 + # print(" -------------------------") + + + # 테스트용 스냅샷 4개 생성 rollback 테스트 + + # # NumPy Array + # arr_np = np.array([[1.1, 1.2], [2.1, 2.2]]) + # atio.write_snapshot(arr_np, DATA_SNAPSHOT_PATH) + # print_test_result(True, "v3 (NumPy, 2 rows) 생성") + + # # PyArrow Table + # arr_pa = pa.Table.from_pydict({'id': [10], 'pa_val': ['arrow']}) + # atio.write_snapshot(arr_pa, DATA_SNAPSHOT_PATH) + # print_test_result(True, "v4 (Arrow, 1 row) 생성") + + # # --- 4. rollback() --- + # print("\n [4] rollback() - v4 -> v2 롤백") + # atio.rollback(DATA_SNAPSHOT_PATH, version_id=2) + # read_after_rollback = atio.read_table(DATA_SNAPSHOT_PATH, output_as='numpy') + # assert read_after_rollback.shape == (2, 2) + # print_test_result(True, "v2 롤백 성공 (최신 버전이 v2가 됨)") + + # # --- 5. delete_version() --- + # print("\n [5] delete_version() - v4 삭제") + # delete_result = atio.delete_version(DATA_SNAPSHOT_PATH, version_id=4) + # assert delete_result is True + # print_test_result(True, "v4 (구버전) 삭제 성공") + # try: + # atio.read_table(DATA_SNAPSHOT_PATH, version=4) + # print_test_result(False, "삭제된 v4 읽기 시도 (오류가 발생해야 함)") + # except FileNotFoundError: + # print_test_result(True, "삭제된 v4 읽기 시도 (예상대로 FileNotFoundError 발생)") + # except Exception: + # print_test_result(True, "삭제된 v4 읽기 시도 (예상대로 오류 발생)") + # print("\n [5-2] delete_version() - v2 (최신) 삭제 시도") + # delete_latest_result = atio.delete_version(DATA_SNAPSHOT_PATH, version_id=2) + # assert delete_latest_result is False + # print_test_result(True, "v2 (최신 버전) 삭제 시도") + + + # Pandas DataFrame + df_pd = pd.DataFrame({'id': [1, 2], 'pd_val': ['a', 'b']}) + atio.write_snapshot(df_pd, DATA_SNAPSHOT_PATH) + print_test_result(True, "v1 (Pandas, 2 rows) 생성") + + # Polars DataFrame + df_pl = pl.DataFrame({'id': [1, 2], 'pl_val': [True, False]}) + atio.write_snapshot(df_pl, DATA_SNAPSHOT_PATH) + print_test_result(True, "v2 (Polars, 2 rows) 생성") + + # --- 6. export_to_datalake() --- + print("\n [6] export_to_datalake() - v2(Polars) 내보내기") + export_path = os.path.join(TEST_OUTPUT_DIR, "datalake_export.parquet") + atio.export_to_datalake(DATA_SNAPSHOT_PATH, version=2, output_path=export_path) + assert os.path.exists(export_path) + print_test_result(True, f"Parquet 파일 생성 완료: {export_path}") + + except Exception as e: + print_test_result(False, f"데이터 스냅샷 테스트 중 예기치 않은 오류: {e}") + import traceback + traceback.print_exc() + +# --- 파일 정리 함수 --- +def cleanup_demo_files(): + """테스트 실행 후 생성된 파일들을 정리합니다.""" + print("\n" + "=" * 60) + print("🧹 테스트 파일 정리") + print("=" * 60) + if not os.path.exists(TEST_OUTPUT_DIR): + print(f"🗑️ 정리할 디렉토리가 없습니다: '{TEST_OUTPUT_DIR}'") + return + print(f"🗑️ 생성된 테스트 디렉토리: '{TEST_OUTPUT_DIR}'") + print("\n❓ 테스트 디렉토리를 삭제하시겠습니까? (y/n): ", end="") + try: + response = input().lower().strip() + except (EOFError, KeyboardInterrupt): + response = 'n' + print("\n입력 없이 종료하여 파일을 보존합니다.") + if response == 'y': + try: + shutil.rmtree(TEST_OUTPUT_DIR) + print(f"\n✅ '{TEST_OUTPUT_DIR}' 디렉토리와 모든 내용이 삭제되었습니다.") + except Exception as e: + print(f"\n❌ 디렉토리 삭제 중 오류 발생: {e}") + else: + print(f"\n📁 '{TEST_OUTPUT_DIR}' 디렉토리가 보존되었습니다.") + +# --- 메인 실행 함수 --- +def main(): + """데이터 스냅샷 기능 테스트 실행 함수""" + try: + setup_test_environment() + # --- 데이터 스냅샷 테스트 호출 --- + test_data_snapshot_lifecycle() + except Exception as e: + print(f"\n" + "!" * 60) + print(f" CRITICAL: 테스트 실행 중 치명적인 오류 발생: {e}") + import traceback + traceback.print_exc() + print("!" * 60) + finally: + cleanup_demo_files() + print("\n" + "=" * 60) + print("🎉 Atio Data Snapshot 기능 테스트 완료!") + print("=" * 60) + +if __name__ == "__main__": + main() \ No newline at end of file From afebbdedc47bda19216e18edf67a884df4aa1c40 Mon Sep 17 00:00:00 2001 From: realstone3639 Date: Sun, 26 Oct 2025 21:00:19 +0900 Subject: [PATCH 2/2] =?UTF-8?q?=EA=B5=AC=EB=B2=84=EC=A0=84=20demo=20?= =?UTF-8?q?=EC=82=AD=EC=A0=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- demo_atio_usage.py | 430 --------------------------------------------- 1 file changed, 430 deletions(-) delete mode 100644 demo_atio_usage.py diff --git a/demo_atio_usage.py b/demo_atio_usage.py deleted file mode 100644 index ba82ac7..0000000 --- a/demo_atio_usage.py +++ /dev/null @@ -1,430 +0,0 @@ -#!/usr/bin/env python3 - -# Copyright 2025 Atio Developers -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Atio 사용 예제 데모 -안전한 원자적 파일 쓰기와 데이터베이스 쓰기의 다양한 사용법을 보여줍니다. -""" -# --- 모듈 경로 설정을 위한 코드 --- -import sys -import os - -# 현재 스크립트의 경로를 기준으로 'src' 폴더의 절대 경로를 계산합니다. -# 이렇게 하면 어떤 위치에서 스크립트를 실행하더라도 'atio' 모듈을 찾을 수 있습니다. -project_root = os.path.dirname(os.path.abspath(__file__)) -src_path = os.path.join(project_root, 'src') -if src_path not in sys.path: - sys.path.insert(0, src_path) -# ------------------------------------ - -import atio -import pandas as pd -import numpy as np -import polars as pl -import time -from sqlalchemy import create_engine -import shutil -import json -# from .src.atio.core import export_to_datalake -def demo_basic_usage(): - """ - 기본 파일 기반 쓰기 사용법 데모 - atio.write() 함수의 기본적인 사용법을 보여줍니다. - """ - print("=" * 50) - print("1. 기본 사용법 (파일 쓰기)") - print("=" * 50) - - - data_array = np.array([1, 2, 3, 4, 5]) - - data_array2 = np.array([[1, 2, 3], - [4, 5, 6]]) - - atio.write(data_array, "data.npy", "npy") - # atio.write(data_array2, "data.npz", "npz") - # atio.write(data_array2, "data.npz_compressed", "npz_compressed") - atio.write(data_array, "data.csv", "csv") - atio.write(data_array, "data.bin", "bin") - - - # atio.write(df, "users.sql", format="sql") - - - - - # # to_csv에 추가 인자(index=False)를 전달하는 예시 - # atio.write(df, "users.csv", format="csv", index=False) - # print("✅ users.csv 저장 완료 (인덱스 제외)") - - # print("\n📁 생성된 파일들:") - # for file in ["users.parquet", "users.csv"]: - # if os.path.exists(file): - # size = os.path.getsize(file) - # print(f" - {file} ({size} bytes)") - # # _SUCCESS 파일 확인 - # success_file = os.path.join(os.path.dirname(file), f".{os.path.basename(file)}._SUCCESS") - # if os.path.exists(success_file): - # print(f" └─ {os.path.basename(success_file)} (저장 완료 플래그)") - -def demo_excel_and_sql(): - """ - Excel 및 SQL 데이터베이스 쓰기 데모 - 파일 시스템을 사용하지 않는 경우의 atio.write() 사용법을 보여줍니다. - """ - print("\n" + "=" * 50) - print("2. Excel 및 SQL 데이터베이스 쓰기") - print("=" * 50) - - df = pd.DataFrame({ - "product_id": [101, 102, 103], - "product_name": ["Laptop", "Mouse", "Keyboard"], - "price": [1200, 25, 75] - }) - print("📊 예제 데이터 (Products):") - print(df) - print() - - # --- Excel 쓰기 데모 --- - print("💾 Excel 파일 저장 중...") - try: - # to_excel에 추가 인자를 kwargs로 전달 - atio.write(df, "products.xlsx", format="excel", index=False, sheet_name="Stock") - print("✅ products.xlsx 저장 완료 (Sheet: Stock, 인덱스 제외)") - except Exception as e: - print(f"❌ Excel 저장 실패: {e}") - print(" (필요 라이브러리 설치: pip install openpyxl)") - - print("-" * 20) - - # --- SQL 쓰기 데모 --- - print("💾 SQL 데이터베이스에 저장 중...") - try: - # DB 쓰기를 위한 SQLAlchemy 엔진 생성 (인메모리 SQLite 사용) - # 실제 사용 시에는 PostgreSQL, MySQL 등의 DB 연결 문자열 사용 - engine = create_engine('sqlite:///:memory:') - - # DB 쓰기 시 target_path는 None 또는 생략 - # 'name' (테이블명), 'con' (커넥션)은 kwargs로 전달 - atio.write(df, format="sql", name="products", con=engine, if_exists='replace', index=False) - print("✅ 'products' 테이블에 데이터 저장 완료 (in-memory SQLite)") - - # 검증: 데이터베이스에서 다시 읽어오기 - print("\n🔍 검증: 데이터베이스에서 데이터 읽기...") - with engine.connect() as connection: - read_df = pd.read_sql("SELECT * FROM products", connection) - print(read_df) - print("✅ 검증 완료: 데이터가 성공적으로 저장되었습니다.") - - except ImportError: - print("❌ SQL 저장 실패: sqlalchemy 라이브러리가 필요합니다.") - print(" (설치: pip install sqlalchemy)") - except Exception as e: - print(f"❌ SQL 저장 중 오류 발생: {e}") - - -def demo_large_data(): - """ - 대용량 데이터 저장 데모 - show_progress=True 옵션을 사용한 진행도 표시 기능을 보여줍니다. - """ - print("\n" + "=" * 50) - print("3. 대용량 데이터 저장 (진행도 표시)") - print("=" * 50) - - print("📊 대용량 데이터 생성 중...") - large_df = pd.DataFrame(np.random.randn(2000000, 5), columns=list("ABCDE")) - - print(f"생성된 데이터 크기: {large_df.shape}") - print(f"메모리 사용량: {large_df.memory_usage(deep=True).sum() / 1024 / 1024:.1f} MB") - print() - - print("💾 대용량 파일 저장 중 (진행도 표시)...") - atio.write(large_df, "large_data.parquet", format="parquet", show_progress=True) - -def demo_performance_analysis(): - """ - 성능 분석 데모 - verbose=True 옵션을 사용한 상세한 성능 진단 정보를 보여줍니다. - """ - print("\n" + "=" * 50) - print("4. 성능 분석 (verbose 모드)") - print("=" * 50) - - df = pd.DataFrame(np.random.randn(10000, 3), columns=list("xyz")) - - print("📊 성능 분석용 데이터 생성 완료") - print(f"데이터 크기: {df.shape}") - print() - - print("🔍 성능 분석 모드로 저장 중...") - atio.write(df, "performance_test.parquet", format="parquet", verbose=True) - print("✅ 성능 분석 완료") - -def demo_numpy_arrays(): - """ - NumPy 배열 저장 데모 - NumPy 배열의 다양한 저장 형식을 보여줍니다. - """ - print("\n" + "=" * 50) - print("5. NumPy 배열 저장") - print("=" * 50) - - arr = np.random.randn(1000, 100) - print(f"📊 NumPy 배열 생성: {arr.shape}, dtype: {arr.dtype}") - print() - - print("💾 NumPy 배열 저장 중...") - atio.write(arr, "array.npy", format="npy", show_progress=True) - - # 딕셔너리를 npz로 저장 - atio.write({'arr1': arr, 'arr2': arr * 2}, "arrays.npz", format="npz") - print("✅ array.npy 및 arrays.npz 저장 완료") - -def demo_error_handling(): - """ - 오류 처리 데모 - Atio의 안전한 오류 처리 기능을 보여줍니다. - """ - print("\n" + "=" * 50) - print("6. 오류 처리 데모") - print("=" * 50) - - df = pd.DataFrame({"a": [1, 2, 3]}) - - print("❌ 지원하지 않는 형식으로 저장 시도...") - try: - atio.write(df, "test.xyz", format="xyz") - except ValueError as e: - print(f"✅ 예상된 오류 발생: {e}") - print(" → 원본 파일은 보존되고 임시 파일만 정리됨") - -def demo_polars_integration(): - """ - Polars 통합 데모 - Polars DataFrame의 저장 기능을 보여줍니다. - """ - print("\n" + "=" * 50) - print("7. Polars 통합") - print("=" * 50) - - try: - import polars as pl - - df = pl.DataFrame({ - "name": ["Alice", "Bob"], "score": [95.5, 87.3], "active": [True, False] - }) - - print("📊 Polars DataFrame 생성:") - print(df) - print() - - print("💾 Polars DataFrame 저장 중...") - atio.write(df, "polars_data.parquet", format="parquet") - print("✅ polars_data.parquet 저장 완료") - - except ImportError: - print("⚠️ Polars가 설치되지 않았습니다. (pip install polars)") - -def dataLake(): - TABLE_DIR = "snapshot_demo_table" - df1 = pd.DataFrame({'id': [1], 'value_a': ['A1']}) - atio.write_snapshot(df1, TABLE_DIR) # v1 생성 - atio.export_to_datalake(TABLE_DIR, version=1, output_path="datalake/v1.parquet") - - - -def demo_snapshots(): - """ - 스냅샷 생성, 롤백, 삭제 및 가비지 컬렉션(GC) 기능을 시연합니다. - """ - try: - # atio의 스냅샷 관련 모든 기능을 import 합니다. - from atio import write_snapshot, read_table, delete_version, rollback - import shutil - - print("\n" + "=" * 50) - print("8. 스냅샷 관리 데모") - print("=" * 50) - - TABLE_DIR = "snapshot_demo_table" - - def list_files(title, table_path): - """테이블의 현재 파일 목록을 출력하는 헬퍼 함수""" - print(f"\n--- {title} ---") - for dirpath, _, filenames in os.walk(table_path): - relative_path = os.path.relpath(dirpath, table_path) - if relative_path == '.': relative_path = '' - indent = ' ' * (relative_path.count(os.sep) * 2) - print(f"{indent}[{os.path.basename(dirpath)} 폴더]") - for f in sorted(filenames): - print(f"{indent} - {f}") - print("-" * (len(title) + 4)) - - # 데모 시작 전 이전 폴더 정리 - if os.path.exists(TABLE_DIR): - shutil.rmtree(TABLE_DIR) - - # --- 1. 테스트용 버전 3개 생성 --- - print("\n1. 테스트를 위해 v1, v2, v3 버전을 생성합니다...") - df1 = pd.DataFrame({'id': [1], 'value_a': ['A1']}) - write_snapshot(df1, TABLE_DIR) # v1 생성 - - df2 = pd.DataFrame({'id': [1], 'value_b': ['B2']}) - write_snapshot(df2, TABLE_DIR) # v2 생성 (id는 v1과 중복) - - df3 = pd.DataFrame({'value_c': ['C3']}) - write_snapshot(df3, TABLE_DIR) # v3 생성 - - list_files("v1, v2, v3 생성 후 파일 상태", TABLE_DIR) - print("🔍 분석: v1과 v2의 'id' 컬럼은 내용이 같아 data 폴더에 하나의 파일만 저장되었습니다.") - - # --- 2. 최신 버전 삭제 시도 (안전장치 확인) --- - print("\n\n2. 최신 버전(v3) 삭제를 시도합니다...") - delete_version(TABLE_DIR, version_id=3) - print("-> 예상대로 안전장치가 작동하여 삭제가 거부되었습니다.") - - # --- 3. 롤백 후 버전 삭제 --- - print("\n\n3. v2로 롤백한 후, 더 이상 최신이 아닌 v3을 삭제합니다.") - rollback(TABLE_DIR, version_id=2) - - print("\n[삭제 미리보기 (dry_run=True)]") - delete_version(TABLE_DIR, version_id=3, dry_run=True) - - print("\n[실제 삭제 실행]") - delete_version(TABLE_DIR, version_id=3, dry_run=False) - - list_files("v3 삭제 및 GC 후 파일 상태", TABLE_DIR) - print("🔍 분석: v3의 메타데이터와 고유 데이터('value_c')가 모두 정리되었습니다.") - - # --- 4. 최종 상태 검증 --- - print("\n\n4. 최종 상태를 검증합니다.") - print(" - 삭제된 v3 읽기 시도...") - try: - read_table(TABLE_DIR, version=3) - except Exception: - print(" -> 성공: 예상대로 v3을 읽을 수 없습니다.") - - print(" - 남아있는 v2 읽기 시도...") - df2_read = read_table(TABLE_DIR, version=2) - if df2_read is not None: - print(" -> 성공: v2는 정상적으로 읽을 수 있습니다.") - - print("\n✅ 스냅샷 데모 완료!") - # 최종 폴더 정리 - shutil.rmtree(TABLE_DIR) - - except ImportError: - print("\n⚠️ atio 라이브러리 또는 해당 기능(write_snapshot 등)을 찾을 수 없습니다.") - except Exception as e: - print(f"\n❌ 스냅샷 데모 중 오류 발생: {e}") - -def cleanup_demo_files(): - """데모 실행 후 생성된 파일들을 정리합니다.""" - print("\n" + "=" * 50) - print("8. 데모 파일 정리") - print("=" * 50) - - # 개별 파일 목록 - demo_files = [ - "users.parquet", "users.csv", "products.xlsx", - "large_data.parquet", "performance_test.parquet", - "array.npy", "arrays.npz", "polars_data.parquet" - ] - - # (핵심 수정) 디렉토리 목록 추가 - demo_dirs = ["snapshot_demo_table"] - - all_files_to_check = [] - for f in demo_files: - all_files_to_check.append(f) - success_flag = os.path.join(os.path.dirname(f), f".{os.path.basename(f)}._SUCCESS") - all_files_to_check.append(success_flag) - - found_files = [f for f in all_files_to_check if os.path.exists(f)] - found_dirs = [d for d in demo_dirs if os.path.exists(d)] - - if not found_files and not found_dirs: - print("🗑️ 정리할 데모 파일이나 디렉토리가 없습니다.") - return - - print("🗑️ 생성된 데모 파일 및 디렉토리 목록:") - for item in found_files + found_dirs: - print(f" - {item}") - - print("\n❓ 데모 파일들을 삭제하시겠습니까? (y/n): ", end="") - try: - response = input().lower().strip() - except (EOFError, KeyboardInterrupt): - response = 'n' - print("\n입력 없이 종료하여 파일을 보존합니다.") - - if response == 'y': - # 파일 삭제 - for file in found_files: - if os.path.exists(file): - os.remove(file) - print(f"🗑️ 파일 '{file}' 삭제됨") - - # (핵심 수정) 디렉토리 삭제 - for directory in found_dirs: - if os.path.exists(directory): - shutil.rmtree(directory) - print(f"🗑️ 디렉토리 '{directory}' 삭제됨") - - print("\n✅ 모든 데모 파일이 정리되었습니다.") - else: - print("\n📁 데모 파일들이 보존되었습니다.") - -def modelSnapshot(): - TABLE_DIR = "snapshot_demo_table" - # atio.write_model_snapshot("C:/Users/reals/Desktop/OSS/weight/output_weights/distilroberta_weights.pth", TABLE_DIR, show_progress=True) - # atio.write_model_snapshot("C:/Users/reals/Desktop/OSS/weight/output_weights/mnli_weights.pth", TABLE_DIR) - # atio.read_model_snapshot(TABLE_DIR, version=None, mode='auto', weights_only=True) - # atio.read_model_snapshot(TABLE_DIR, version=None, mode='restore', destination_path='destination', weights_only=True) - atio.rollback(TABLE_DIR, version_id=1) - atio.delete_version(TABLE_DIR, version_id=2) - # atio.read_model_snapshot(TABLE_DIR, version=None, mode='auto', weights_only=True) - - -def main(): - """ - 메인 데모 실행 함수 - Atio의 모든 주요 기능을 순차적으로 실행합니다. - """ - print("🚀 Atio 사용 예제 데모 시작!") - print("안전한 원자적 파일 쓰기와 데이터베이스 쓰기의 다양한 기능을 보여줍니다.") - - # 각 데모 실행 - # demo_basic_usage() - # demo_excel_and_sql() - # demo_large_data() - # demo_performance_analysis() - # demo_numpy_arrays() - # demo_error_handling() - # demo_polars_integration() - # demo_snapshots() - # dataLake() - modelSnapshot() - # 파일 정리 - cleanup_demo_files() - - print("\n" + "=" * 50) - print("🎉 Atio 데모 완료!") - print("=" * 50) - -if __name__ == "__main__": - main()