Use lance for dependency management #533
base: main
Conversation
Reviewer's Guide

Refactors the Dependencies backend from a pandas DataFrame to an in-memory PyArrow table with indexed lookups and adds Lance as the primary on-disk format (with CSV and Parquet compatibility), updating loading/saving, cache semantics, and tests accordingly.

Sequence diagram for resolving dependencies with cache and backend

```mermaid
sequenceDiagram
actor Client
participant API as api.dependencies
participant Deps as Dependencies
participant Backend as BackendInterface
Client->>API: dependencies(name, version, cache_root)
API->>API: resolve db_root
API->>Deps: deps = Dependencies()
API->>Deps: deps.load(db_root/DB.lance)
Deps-->>API: raise or return
alt Cached load succeeds
API-->>Client: return deps
else Cached load fails
API->>Backend: backend_interface = lookup_backend(name, version)
API->>Backend: exists(/name/DB.lance, version)
alt Lance exists
API->>Backend: get_file(/name/DB.lance, tmp_root/DB.lance, version)
API->>Deps: deps.load(tmp_root/DB.lance)
else Lance missing
API->>Backend: exists(/name/DB.parquet, version)
alt Parquet exists
API->>Backend: get_file(/name/DB.parquet, tmp_root/DB.parquet, version)
API->>Deps: deps.load(tmp_root/DB.parquet)
else Parquet missing
API->>Backend: get_archive(/name/DB.zip, tmp_root, version)
API->>Deps: deps.load(tmp_root/DB.csv)
end
end
API->>Deps: deps.save(db_root/DB.lance)
API-->>Client: return deps
end
```
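To make the resolution order above concrete, here is a minimal Python sketch of the same flow. The names `Dependencies`, `lookup_backend`, `download_dependencies`, and the `DB.lance`/`DB.parquet`/`DB.zip` paths come from the diagrams; `resolve_db_root` is a hypothetical stand-in for however the PR computes the cache directory, so treat this as an illustration rather than the actual implementation.

```python
import os
import tempfile


def download_dependencies(backend_interface, name, version, tmp_root):
    """Fetch the dependency file, preferring Lance over Parquet over legacy CSV."""
    remote = f"/{name}/DB.lance"
    if backend_interface.exists(remote, version):
        local_deps_file = os.path.join(tmp_root, "DB.lance")
        backend_interface.get_file(remote, local_deps_file, version)
    elif backend_interface.exists(f"/{name}/DB.parquet", version):
        local_deps_file = os.path.join(tmp_root, "DB.parquet")
        backend_interface.get_file(f"/{name}/DB.parquet", local_deps_file, version)
    else:
        # Legacy databases ship the CSV inside DB.zip
        backend_interface.get_archive(f"/{name}/DB.zip", tmp_root, version)
        local_deps_file = os.path.join(tmp_root, "DB.csv")
    deps = Dependencies()
    deps.load(local_deps_file)
    return deps


def dependencies(name, version, cache_root=None):
    db_root = resolve_db_root(name, version, cache_root)  # hypothetical helper
    deps = Dependencies()
    try:
        # Fast path: a previously cached Lance file
        deps.load(os.path.join(db_root, "DB.lance"))
        return deps
    except Exception:
        pass  # cache miss or unreadable cache: fall back to the backend
    backend_interface = lookup_backend(name, version)
    with tempfile.TemporaryDirectory() as tmp_root:
        deps = download_dependencies(backend_interface, name, version, tmp_root)
    deps.save(os.path.join(db_root, "DB.lance"))  # refresh the cache as Lance
    return deps
```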
Class diagram for updated Dependencies storage and operations

```mermaid
classDiagram
class Dependencies {
- pa.Schema _schema
- pa.Table _table
- dict~str,int~ _file_index
+ Dependencies()
+ __call__() pd.DataFrame
+ __contains__(file str) bool
+ __eq__(other Dependencies) bool
+ __getitem__(file str) list
+ __len__() int
+ __str__() str
+ __getstate__() dict
+ __setstate__(state dict) None
+ archives() list~str~
+ attachments() list~str~
+ attachment_ids() list~str~
+ files() list~str~
+ media() list~str~
+ removed_media() list~str~
+ table_ids() list~str~
+ tables() list~str~
+ archive(file str) str
+ bit_depth(file str) int
+ channels(file str) int
+ checksum(file str) str
+ duration(file str) float
+ format(file str) str
+ removed(file str) bool
+ sampling_rate(file str) int
+ type(file str) int
+ version(file str) str
+ load(path str) None
+ save(path str) None
- _add_attachment(file str, archive str, checksum str, version str) None
- _add_media(values list~tuple~) None
- _add_meta(file str, checksum str, version str) None
- _column_loc(column str, file str, dtype type) any
- _rebuild_index() None
- _dataframe_to_table(df pd.DataFrame, file_column bool) pa.Table
- _table_to_dataframe(table pa.Table) pd.DataFrame
- _drop(files Sequence~str~) None
- _remove(file str) None
- _set_dtypes(df pd.DataFrame) pd.DataFrame
- _update_media(values list~tuple~) None
- _update_media_version(files list~str~, version str) None
}
class LanceFileReader {
+ LanceFileReader(path str)
+ read_all() LanceReaderResult
}
class LanceFileWriter {
+ LanceFileWriter(path str, schema pa.Schema)
+ write_batch(table pa.Table) None
+ __enter__() LanceFileWriter
+ __exit__(exc_type type, exc_val BaseException, exc_tb object) None
}
class BackendInterface {
+ join(root str, name str, file str) str
+ exists(path str, version str) bool
+ get_file(remote str, local str, version str, verbose bool) None
+ get_archive(remote str, local_root str, version str, verbose bool) None
}
Dependencies ..> pa.Schema
Dependencies ..> pa.Table
Dependencies ..> LanceFileReader
Dependencies ..> LanceFileWriter
Dependencies ..> BackendInterface
```
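The key change the class diagram captures is the storage swap: rows live in a `pa.Table` and a plain dict maps file names to row positions, so membership tests and per-file lookups avoid scanning a DataFrame. A minimal, abbreviated sketch (the schema below is shortened to four columns for illustration, and only a handful of the methods above are shown):

```python
import pyarrow as pa


class Dependencies:
    def __init__(self):
        # Abbreviated schema; the real one carries more columns
        # (bit_depth, channels, duration, format, removed, sampling_rate, type, ...)
        self._schema = pa.schema(
            [
                ("file", pa.string()),
                ("archive", pa.string()),
                ("checksum", pa.string()),
                ("version", pa.string()),
            ]
        )
        self._table = self._schema.empty_table()
        self._file_index = {}  # file name -> row number

    def _rebuild_index(self):
        # Recomputed whenever self._table changes
        files = self._table.column("file").to_pylist()
        self._file_index = {file: row for row, file in enumerate(files)}

    def __contains__(self, file: str) -> bool:
        return file in self._file_index  # O(1) dict lookup, no table scan

    def _column_loc(self, column: str, file: str, dtype=None):
        # Single-cell lookup via the index
        row = self._file_index[file]
        value = self._table.column(column)[row].as_py()
        return dtype(value) if dtype is not None else value

    def archive(self, file: str) -> str:
        return self._column_loc("archive", file)
```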
Flow diagram for dependency file format resolution and loading

```mermaid
flowchart TD
A_start["api.dependencies(name, version)"] --> B_has_cached
B_has_cached["Cached file path = db_root/DB.lance"] --> C_try_load_cached
C_try_load_cached["Dependencies.load(DB.lance)"] --> D_cached_ok{Loaded successfully?}
D_cached_ok -- Yes --> Z_return_cached["return deps"]
D_cached_ok -- No --> E_lookup_backend
E_lookup_backend["backend_interface = lookup_backend(name, version)"] --> F_download_deps
subgraph DownloadDependencies
F_download_deps["download_dependencies(backend_interface, name, version)"] --> G_try_lance
G_try_lance["remote = /name/DB.lance\nbackend_interface.exists(remote, version)"] --> H_lance_exists{Exists?}
H_lance_exists -- Yes --> I_get_lance["get_file(remote, local DB.lance)"] --> J_local_path_lance["local_deps_file = DB.lance"]
H_lance_exists -- No --> K_try_parquet
K_try_parquet["remote = /name/DB.parquet\nbackend_interface.exists(remote, version)"] --> L_parquet_exists{Exists?}
L_parquet_exists -- Yes --> M_get_parquet["get_file(remote, local DB.parquet)"] --> N_local_path_parquet["local_deps_file = DB.parquet"]
L_parquet_exists -- No --> O_fallback_legacy
O_fallback_legacy["remote = /name/DB.zip"] --> P_get_legacy["get_archive(remote, tmp_root)"] --> Q_local_path_legacy["local_deps_file = DB.csv (legacy)"]
end
J_local_path_lance --> R_load_downloaded
N_local_path_parquet --> R_load_downloaded
Q_local_path_legacy --> R_load_downloaded
R_load_downloaded["deps = Dependencies(); deps.load(local_deps_file)"] --> S_save_cache
S_save_cache["deps.save(db_root/DB.lance)"] --> T_return_downloaded["return deps"]
subgraph Dependencies.load
U_start_load["load(path)"] --> V_ext
V_ext["extension = file_extension(path)"] --> W_check_ext
W_check_ext{"ext in [csv, parquet, lance]?"} -- No --> X_error["raise ValueError"]
W_check_ext -- Yes --> Y_branch
Y_branch{Extension} -->|lance| Y1_lance["reader = LanceFileReader(path)\nresults = reader.read_all()\ntable = results.to_table()"]
Y_branch -->|csv| Y2_csv["table = csv.read_csv(path, schema=_schema)"]
Y_branch -->|parquet| Y3_parquet["table = parquet.read_table(path)"]
Y1_lance --> Z_set_table
Y2_csv --> Z_set_table
Y3_parquet --> Z_set_table
Z_set_table["self._table = table\nself._rebuild_index()"] --> AA_end_load["return None"]
end
subgraph Dependencies.save
AB_start_save["save(path)"] --> AC_choose_ext
AC_choose_ext{path suffix} -->|.csv| AD_save_csv
AC_choose_ext -->|.parquet| AE_save_parquet
AC_choose_ext -->|.lance| AF_save_lance
AD_save_csv["df = self()\ntable = _dataframe_to_table(df)\ncsv.write_csv(table, path)"] --> AG_end_save["return None"]
AE_save_parquet["df = self()\ntable = _dataframe_to_table(df, file_column=True)\nparquet.write_table(table, path)"] --> AG_end_save
AF_save_lance["if exists(path): os.remove(path)\nwith LanceFileWriter(path, schema=_schema) as writer:\n writer.write_batch(self._table)"] --> AG_end_save
end
```
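A compact sketch of the load/save dispatch shown in the flowchart. The Lance calls (`lance.file.LanceFileReader`/`LanceFileWriter`, `read_all().to_table()`, `write_batch`) mirror the diagram. Two simplifications to note: `pyarrow.csv.read_csv` takes column types via `ConvertOptions` rather than a `schema=` argument, so the diagram's `csv.read_csv(path, schema=_schema)` is shorthand; and the PR routes CSV/Parquet saves through a DataFrame conversion (`_dataframe_to_table`), which this sketch skips by writing `self._table` directly.

```python
import os

import pyarrow.csv as csv
import pyarrow.parquet as parquet
from lance.file import LanceFileReader, LanceFileWriter


class Dependencies:
    # ... schema, table, and file index as sketched above ...

    def load(self, path: str) -> None:
        extension = os.path.splitext(path)[1].lstrip(".").lower()
        if extension not in ("csv", "parquet", "lance"):
            raise ValueError(f"Unsupported file format: {extension}")
        if extension == "lance":
            self._table = LanceFileReader(path).read_all().to_table()
        elif extension == "parquet":
            self._table = parquet.read_table(path)
        else:  # csv
            convert_options = csv.ConvertOptions(column_types=self._schema)
            self._table = csv.read_csv(path, convert_options=convert_options)
        self._rebuild_index()  # keep the file index in sync with the table

    def save(self, path: str) -> None:
        if path.endswith(".lance"):
            if os.path.exists(path):
                os.remove(path)  # the diagram removes any stale file first
            with LanceFileWriter(path, schema=self._schema) as writer:
                writer.write_batch(self._table)
        elif path.endswith(".parquet"):
            parquet.write_table(self._table, path)
        else:
            csv.write_csv(self._table, path)
```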
File-Level Changes
Possibly linked issues
The current implementation only uses …
Try the lance approach proposed in #517
Benchmark results
Summary by Sourcery
Store and manage database dependencies with an in-memory PyArrow table persisted as Lance files, deprecating the old pickle/parquet-centric cache and updating loading, publishing, and tests accordingly.
New Features:
Enhancements:
Build:
Tests:
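Rough usage sketch of the resulting API (the database name, version, and file path below are made up for illustration; the entry point mirrors `api.dependencies` from the diagrams):

```python
deps = dependencies("mydb", "1.0.0")       # loads the cached Lance file, else downloads
print(len(deps), "files tracked")
if "audio/001.wav" in deps:                # O(1) membership via the file index
    print(deps.duration("audio/001.wav"))  # per-file lookups stay cheap
df = deps()                                # pandas DataFrame view when needed
```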