From 8b88ecfc71b4059a7c674d3104c159fe79941da1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jens=20G=C3=A5semyr=20Magnus?= Date: Thu, 20 Jan 2022 20:28:30 +0100 Subject: [PATCH 1/3] Tagged Stores --- xun/functions/store/disk.py | 128 ++++++++++- xun/functions/store/layered.py | 11 +- xun/functions/store/memory.py | 27 ++- xun/functions/store/store.py | 387 +++++++++++++++++++++++++++++++++ xun/tests/helpers.py | 1 + xun/tests/test_stores.py | 11 - 6 files changed, 546 insertions(+), 19 deletions(-) diff --git a/xun/functions/store/disk.py b/xun/functions/store/disk.py index 6245f5b..e3192c6 100644 --- a/xun/functions/store/disk.py +++ b/xun/functions/store/disk.py @@ -1,8 +1,12 @@ from ... import serialization from .store import Store +from .store import TagDB from collections import namedtuple from pathlib import Path +import base64 import contextlib +import shutil +import sqlite3 import tempfile @@ -17,6 +21,7 @@ def __init__(self, dir, create_dirs=True): (self.dir / 'values').mkdir(parents=True, exist_ok=True) elif not self.dir.exists(): raise ValueError(f'Store Directory {str(self.dir)} does not exist') + self._tagdb = DiskTagDB(self, create_dirs) def paths(self, key, root=None): """ Key Paths @@ -61,12 +66,14 @@ def _load_value(self, key): return serialization.load(f) def _load_tags(self, key): - raise NotImplementedError + return self._tagdb.tags(key) def filter(self, *conditions): - raise NotImplementedError + return self._tagdb.query(*conditions) def _store(self, key, value, **tags): + self._tagdb.update(key, tags) + with tempfile.TemporaryDirectory(dir=self.dir) as tmpdir: tmpdir = Path(tmpdir) @@ -91,9 +98,126 @@ def remove(self, key): paths = self.paths(key) paths.key.unlink() paths.val.unlink() + self._tagdb.remove(key) def __getstate__(self): return self.dir def __setstate__(self, state): self.dir = state + self._tagdb = DiskTagDB(self) + + +class DiskTagDB(TagDB): + def __init__(self, store, create_dirs=True): + super().__init__(store) + self.dir = store.dir / 'db' + if create_dirs: + self.dir.mkdir(parents=True, exist_ok=True) + + def refresh(self): + reconciled = False + with contextlib.ExitStack() as stack: + stack.enter_context(self.savepoint()) + databases = {} + for f in self.dir.iterdir(): + checkpoint_hash = base64.urlsafe_b64decode(f.name.encode()) + if f.is_file() and not self.has_checkpoint(checkpoint_hash): + try: + uri = f'file:{str(f)}?mode=ro' + + # Connecting to the database opens a file handle. In + # the case of the file being deleted, we still have a + # handle to it and can read from it. + con = sqlite3.connect(uri, uri=True) + + # Attach our memory database to this connection + # imediately as this will fail with a DatabaseError if + # the database we just connected to is not a valid + # database. + con.execute( + f'ATTACH DATABASE \'{self.uri}\' AS _xun_main' + ) + + # Keep connections (and file handles) alive until we + # leave the context + ctx = contextlib.closing(con) + databases[f] = stack.enter_context(ctx) + except sqlite3.OperationalError: + pass # Database was deleted before we could connect + except sqlite3.DatabaseError: + pass # File is not a database + for con in databases.values(): + self.reconcile(con, read_from=None, write_to='_xun_main') + reconciled = True + + self.create_views() + if not reconciled: + return + checkpoint_name = self.checkpoint() + + if self.dump(checkpoint_name): + # Now that our changes have been reconciled and dumped. Delete any + # databases ours is comprising. + for db in databases: + print('unlinking', db) + db.unlink() + + def reconcile(self, con, read_from=None, write_to=None): + print('RECONCILING') + if read_from is None and write_to is None: + raise ValueError('Cannot read and write to main database') + + def latest_common_checkpoint(): + _, checkpoint = con.execute(''' + SELECT MAX(this.journal_id), this.hash + FROM _xun_main._xun_journal AS this + INNER JOIN _xun_journal as other + ON + this.hash = other.hash AND + this.journal_id = other.journal_id + ''').fetchone() + return checkpoint + + with self.savepoint(): + checkpoint = latest_common_checkpoint() + + diff_con_results, diff_con_tags = self.diff(checkpoint, con=con) + diff_mem_results, diff_mem_tags = self.diff(checkpoint) + + new_results = sorted( + diff_con_results + diff_mem_results, + key=lambda el: (el[3], el[1]) + ) + new_tags = sorted( + diff_con_tags + diff_mem_tags, + key=lambda el: (el[4], el[1]) + ) + + self.reset_to_checkpoint(checkpoint) + + self.mem.executemany(''' + INSERT INTO _xun_results_table + (result_id, callnode, timestamp, deleted) + VALUES + (?, ?, ?, ?) + ''', new_results) + self.mem.executemany(''' + INSERT INTO _xun_tags_table + (result_id, name, value, timestamp, deleted) + VALUES + (?, ?, ?, ?, ?) + ''', new_tags) + + def dump(self, name): + if self.mem.in_transaction: + raise RuntimeError('Database Busy') + if (self.dir / name).exists(): + return False + with tempfile.TemporaryDirectory() as tmpdir: + tmpdir = Path(tmpdir) + tmp = tmpdir / 'sql' + with contextlib.closing(sqlite3.connect(tmp)) as bck: + self.mem.backup(bck) + shutil.copy(tmp, self.dir / name) + return True diff --git a/xun/functions/store/layered.py b/xun/functions/store/layered.py index f9fabb2..2c255f3 100644 --- a/xun/functions/store/layered.py +++ b/xun/functions/store/layered.py @@ -54,10 +54,17 @@ def remove(self, callnode): del self._layers[0][callnode] def _load_tags(self, callnode): - raise NotImplementedError + for layer in self._layers: + if callnode in layer: + return layer._load_tags(callnode) + raise KeyError(repr(callnode)) def filter(self, *tag_conditions): - raise NotImplementedError + results = set() + for layer in reversed(self._layers): + res = layer.filter(*tag_conditions) + results.update(res) + return list(results) def __getstate__(self): return self._layers diff --git a/xun/functions/store/memory.py b/xun/functions/store/memory.py index 34f8b93..42c5b98 100644 --- a/xun/functions/store/memory.py +++ b/xun/functions/store/memory.py @@ -1,5 +1,6 @@ from ..errors import CopyError from .store import Store +from .store import TagDB class Memory(Store): @@ -10,8 +11,23 @@ class Memory(Store): may make them incompatible with multiprocessing drivers. """ + class MemoryTagDB(TagDB): + """ + Memory Stores don't persist state, so these methods can be ignored + """ + + def refresh(self): + pass + + def checkpoint(self): + pass + + def dump(self, name): + pass + def __init__(self): self._container = {} + self._tagdb = self.MemoryTagDB(self) def __contains__(self, callnode): return callnode in self._container @@ -20,16 +36,19 @@ def _load_value(self, callnode): value = self._container[callnode] return value - def remove(self, callnode): - del self._store[callnode] def _store(self, callnode, value, **tags): self._container[callnode] = value + self._tagdb.update(callnode, tags) + + def remove(self, callnode): + del self._container[callnode] + self._tagdb.remove(callnode) def _load_tags(self, callnode): - raise NotImplementedError + return self._tagdb.tags(callnode) def filter(self, *tag_conditions): - raise NotImplementedError + return self._tagdb.query(*tag_conditions) def __copy__(self): raise CopyError('Cannot copy in-memory store') diff --git a/xun/functions/store/store.py b/xun/functions/store/store.py index 68245d0..9c0f958 100644 --- a/xun/functions/store/store.py +++ b/xun/functions/store/store.py @@ -1,6 +1,12 @@ from ... import serialization from ...fs.queries import parse from abc import ABC, abstractmethod +from uuid import uuid4 +import base64 +import contextlib +import hashlib +import sqlite3 +import struct def restructure(data, shape): @@ -179,6 +185,8 @@ def __init__(self, store): self.store = store def __getitem__(self, key): + if key not in self.store: + raise KeyError(repr(key)) return self.store._load_tags(key) def __getattr__(self, name): @@ -253,3 +261,382 @@ def __str__(self): return f'{self.tag}{self.op}{self.value}' else: return self.tag + + +class TagDB: + @abstractmethod + def refresh(self): + pass + + @abstractmethod + def dump(self, name): + pass + + def __init__(self, store): + self.uri = f'file:{id(store)}?mode=memory&cache=shared' + self.mem = sqlite3.connect(self.uri, + uri=True, + isolation_level=None, + check_same_thread=False) + self.mem.executescript(''' + -- PRIVATE + ---------- + + CREATE TABLE _xun_results_table ( + journal_id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + result_id BLOB NOT NULL, + callnode TEXT NOT NULL, + timestamp TEXT NOT NULL, + deleted INTEGER NOT NULL + ); + + CREATE TABLE _xun_tags_table ( + journal_id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + result_id BLOB NOT NULL, + name TEXT NOT NULL, + value TEXT NOT NULL, + timestamp TEXT NOT NULL, + deleted INTEGER NOT NULL, + FOREIGN KEY (result_id) REFERENCES _xun_results(result_id) + ); + + CREATE TABLE _xun_journal ( + journal_id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + hash BLOB NOT NULL UNIQUE, + result_journal_id INTEGER NOT NULL, + tag_journal_id INTEGER NOT NULL + ); + INSERT INTO _xun_journal(hash, result_journal_id, tag_journal_id) + VALUES (X'', -1, -1); + + -- RESULTS INTERFACE + -------------------- + + CREATE VIEW _xun_results(journal_id, result_id, callnode) AS + SELECT + MAX(journal_id), result_id, callnode + FROM + _xun_results_table + GROUP BY + result_id, + callnode -- ? -- + HAVING + NOT deleted + ORDER BY journal_id; + + CREATE TRIGGER _xun_results_insert + INSTEAD OF INSERT ON _xun_results + BEGIN + INSERT INTO _xun_results_table + (result_id, callnode, timestamp, deleted) + VALUES + (NEW.result_id, NEW.callnode, datetime('now'), 0); + END; + + -- TAGS INTERFACE + ----------------- + + CREATE VIEW _xun_tags(journal_id, result_id, name, value) AS + SELECT + MAX(journal_id), result_id, name, value + FROM + _xun_tags_table + GROUP BY + result_id, + name + HAVING + NOT deleted + ORDER BY journal_id; + + CREATE TRIGGER _xun_tags_insert + INSTEAD OF INSERT ON _xun_tags + BEGIN + INSERT INTO _xun_tags_table + (result_id, name, value, timestamp, deleted) + VALUES + (NEW.result_id, NEW.name, NEW.value, datetime('now'), 0); + END; + + CREATE TRIGGER _xun_tags_delete + INSTEAD OF DELETE ON _xun_tags + BEGIN + INSERT INTO _xun_tags_table + (result_id, name, value, timestamp, deleted) + VALUES + (OLD.result_id, OLD.name, OLD.value, datetime('now'), 1); + END; + + -- DB INTERFACE + --------------- + + CREATE TRIGGER _xun_delete INSTEAD OF DELETE ON _xun_results + FOR EACH ROW + BEGIN + -- Mark row as deleted in _xun_results_table + INSERT INTO _xun_results_table + (result_id, callnode, timestamp, deleted) + VALUES + (OLD.result_id, OLD.callnode, datetime('now'), 1); + + -- Mark row as deleted in _xun_tags_table + DELETE FROM _xun_tags WHERE result_id = OLD.result_id; + END; + ''') + + def __del__(self): + self.mem.close() + + def tags(self, callnode): + self.refresh() + result_id = callnode.sha256(encode=False) + result = self.mem.execute(''' + SELECT name, value FROM _xun_tags + WHERE result_id = :result_id + ''', {'result_id': result_id}) + return dict(result) + + def query(self, *conditions): + self.refresh() + joins = ' '.join( + f'INNER JOIN [{c.tag}] ON [{c.tag}].result_id = ' + '_xun_results.result_id' + for c in conditions + ) + where = ' AND '.join( + f'[{c.tag}] {c.op} :{c.tag}' + for c in conditions + if c.op is not None + ) + if where: + where = 'WHERE ' + where + result = self.mem.execute( + 'SELECT _xun_results.callnode ' + f'from _xun_results {joins} {where}', + {c.tag: c.value for c in conditions} + ) + return [serialization.loads(r) for r, *_ in result if r is not None] + + def update(self, callnode, tags): + print(f'writing {callnode}::{callnode.sha256()} {tags}') + prev_tags = self.tags(callnode) + print(prev_tags) + if prev_tags == tags: + # Don't need a savepoint if we're not altering anything + print(f'nothing {callnode}::{callnode.sha256()}') + return + + with self.savepoint(): + if prev_tags: + self.remove(callnode) + result_id = callnode.sha256(encode=False) + serialized = serialization.dumps(callnode) + self.mem.execute(''' + INSERT INTO _xun_results(result_id, callnode) + VALUES(:result_id, :serialized) + ''', { + 'result_id': result_id, + 'serialized': serialized, + }) + for tag, tag_value in tags.items(): + if '[' in tag or ']' in tag: + raise ValueError('tag name cannot contain "[" or "]"') + self.mem.execute(''' + INSERT INTO _xun_tags(result_id, name, value) + VALUES(:result_id, :tag, :tag_value) + ''', { + 'result_id': result_id, + 'tag': tag, + 'tag_value': tag_value, + }) + self.create_views(tag) + checkpoint_name = self.checkpoint() + self.dump(checkpoint_name) + + def create_views(self, *tags): + with self.savepoint(): + if not tags: + tags = [ + tag for tag, in + self.mem.execute('SELECT DISTINCT name FROM _xun_tags') + ] + + for tag in tags: + self.mem.execute(f''' + CREATE INDEX IF NOT EXISTS [_xun_tag_index_{tag}] + ON _xun_tags_table(result_id) + WHERE name = {self.sql_literal(tag)} AND NOT deleted + ''') + self.mem.execute( #nosec + f''' + CREATE VIEW IF NOT EXISTS [{tag}](result_id, [{tag}]) AS + SELECT + result_id, value + FROM + _xun_tags + WHERE + name = {self.sql_literal(tag)} + ''') + + def unique_tags(self): + self.refresh() + result_id = callnode.sha256(encode=False) + result = self.mem.execute(''' + SELECT name, value FROM _xun_tags + WHERE result_id = :result_id + ''', {'result_id': result_id}) + return dict(result) + + def remove(self, callnode): + result_id = callnode.sha256(encode=False) + with self.savepoint(): + self.mem.execute('DELETE FROM _xun_results WHERE result_id = ?', + (result_id,)) + + def checkpoint(self): + with self.savepoint(): + max_result_id, = self.mem.execute( + 'SELECT MAX(journal_id) FROM _xun_results_table').fetchone() + max_tag_id, = self.mem.execute( + 'SELECT MAX(journal_id) FROM _xun_tags_table').fetchone() + if max_result_id is None or max_tag_id is None: + return + + _, checkpoint = self.mem.execute(''' + SELECT MAX(journal_id), hash FROM _xun_journal + ''').fetchone() + diff_results, diff_tags = self.diff(checkpoint) + sha256 = self.sha256(checkpoint, *diff_results, *diff_tags) + + self.mem.execute(''' + INSERT INTO _xun_journal + (hash, result_journal_id, tag_journal_id) + VALUES (?, ?, ?) + ''', (sha256, max_result_id, max_tag_id)) + checkpoint_name = base64.urlsafe_b64encode(sha256).decode() + return checkpoint_name + + def has_checkpoint(self, checkpoint): + result = list(self.mem.execute( + 'SELECT * FROM _xun_journal WHERE hash = ?', + (checkpoint,), + )) + return len(result) > 0 + + def diff(self, checkpoint, con=None): + if con is None: + con = self.mem + result_journal_id, tag_journal_id = con.execute(''' + SELECT + result_journal_id, tag_journal_id + FROM + _xun_journal + WHERE + hash = :checkpoint + ''', { + 'checkpoint': checkpoint, + }).fetchone() + + results = list(con.execute(''' + SELECT + result_id, callnode, timestamp, deleted + FROM + _xun_results_table + WHERE + journal_id > :journal_id + ORDER BY + timestamp, result_id + ''', { + 'journal_id': result_journal_id, + })) + + tags = list(con.execute(''' + SELECT + result_id, name, value, timestamp, deleted + FROM + _xun_tags_table + WHERE + journal_id > :journal_id + ORDER BY + timestamp, result_id + ''', { + 'journal_id': tag_journal_id, + })) + + return results, tags + + def reset_to_checkpoint(self, checkpoint): + with self.savepoint(): + result_journal_id, tag_journal_id = self.mem.execute(''' + SELECT + result_journal_id, tag_journal_id + FROM + _xun_journal + WHERE + hash = :checkpoint + ''', { + 'checkpoint': checkpoint, + }).fetchone() + + # Delete any rows older than the checkpoint and reset the + # autoincrement in SQLITE_SEQUENCE. This will cause any new row + # inserted to start incrementing from this checkpoint. + + self.mem.execute(''' + DELETE FROM _xun_results_table WHERE journal_id > :id + ''', { + 'id': result_journal_id, + }) + self.mem.execute(''' + UPDATE SQLITE_SEQUENCE SET SEQ=:new_journal_id + WHERE NAME='_xun_results_table' + ''', { + 'new_journal_id': result_journal_id, + }) + + self.mem.execute(''' + DELETE FROM _xun_tags_table WHERE journal_id > :id + ''', { + 'id': tag_journal_id, + }) + self.mem.execute(''' + UPDATE SQLITE_SEQUENCE SET SEQ=:new_journal_id + WHERE NAME='_xun_tags_table' + ''', { + 'new_journal_id': tag_journal_id, + }) + + def sql_literal(self, value): + return self.mem.execute('SELECT QUOTE(?)', (value,)).fetchone()[0] + + def sha256(self, checkpoint, *rows): + def coerce(obj): + if isinstance(obj, bytes): # BLOB type + return obj + elif isinstance(obj, str): # TEXT type + return obj.encode() + elif isinstance(obj, type(None)): # NULL type + return b'\x00' + elif isinstance(obj, int): # INTEGER type + return struct.pack('q', obj) + elif isinstance(obj, float): # REAL type + return struct.pack('d', obj) + else: + raise ValueError(f'cound not coerce obj {obj}') + + sha256 = hashlib.sha256(checkpoint) + for row in rows: + for el in row: + sha256.update(coerce(el)) + return sha256.digest() + + @contextlib.contextmanager + def savepoint(self): + try: + savepoint_name = str(uuid4()) + self.mem.execute(f'SAVEPOINT [{savepoint_name}]') + yield + except: + self.mem.execute(f'ROLLBACK TO [{savepoint_name}]') + raise + finally: + self.mem.execute(f'RELEASE [{savepoint_name}]') diff --git a/xun/tests/helpers.py b/xun/tests/helpers.py index caafe4e..ff5e60e 100644 --- a/xun/tests/helpers.py +++ b/xun/tests/helpers.py @@ -69,6 +69,7 @@ def __getstate__(self): def __setstate__(self, state): self.id = state self._container = PicklableMemoryStore._cached_stores[self.id]._container + self._tagdb = PicklableMemoryStore._cached_stores[self.id]._tagdb def __enter__(self): PicklableMemoryStore._cached_stores.setdefault(self.id, self) diff --git a/xun/tests/test_stores.py b/xun/tests/test_stores.py index 7ac3c6e..ec21a1b 100644 --- a/xun/tests/test_stores.py +++ b/xun/tests/test_stores.py @@ -132,7 +132,6 @@ def test_store_implementation(cls, contents): assert store[key] == value -@pytest.mark.xfail(reason='Tagged stores not implemented') @pytest.mark.parametrize('cls', stores) @settings(phases=[Phase.generate, Phase.target, Phase.explain], deadline=500, @@ -148,7 +147,6 @@ def test_store_tags(cls, contents): assert store.tags[key] == tags -@pytest.mark.xfail(reason='Tagged stores not implemented') @pytest.mark.parametrize('cls', stores) @pytest.mark.parametrize('op', [operator.eq, operator.gt, @@ -168,7 +166,6 @@ def test_store_select_operator(cls, op): assert result == expected -@pytest.mark.xfail(reason='Tagged stores not implemented') @pytest.mark.parametrize('cls', stores) def test_store_select_shape(cls): with create_instance(cls) as (store, callnodes): @@ -209,7 +206,6 @@ def test_store_select_shape(cls): } -@pytest.mark.xfail(reason='Tagged stores not implemented') @pytest.mark.parametrize('cls', stores) def test_store_query(cls): with create_instance(cls) as (store, callnodes): @@ -225,7 +221,6 @@ def test_store_query(cls): } -@pytest.mark.xfail(reason='Tagged stores not implemented') @pytest.mark.parametrize('cls', stores) def test_store_query_argument(cls): with create_instance(cls) as (store, callnodes): @@ -249,7 +244,6 @@ def test_store_query_argument(cls): } -@pytest.mark.xfail(reason='Tagged stores not implemented') @pytest.mark.parametrize('cls', stores) def test_store_query_advanced(cls): with create_instance(cls) as (store, callnodes): @@ -290,7 +284,6 @@ def test_store_query_advanced(cls): } -@pytest.mark.xfail(reason='Tagged stores not implemented') @pytest.mark.parametrize('cls', stores - {Layered}) def test_store_remove(cls): with create_instance(cls) as (store, callnodes): @@ -313,7 +306,6 @@ def test_store_remove(cls): assert tags == [(0,), (0,), (0,), (1,), (1,), (1,)] -@pytest.mark.xfail(reason='Tagged stores not implemented') @pytest.mark.parametrize('cls', stores - {Layered}) def test_store_missing_tags_raise(cls): with create_instance(cls) as (store, callnodes): @@ -322,7 +314,6 @@ def test_store_missing_tags_raise(cls): store.tags[callnodes.f_0] -@pytest.mark.xfail(reason='Tagged stores not implemented') @pytest.mark.parametrize('cls', stores) def test_store_rewrite_tags(cls): with create_instance(cls) as (store, callnodes): @@ -331,7 +322,6 @@ def test_store_rewrite_tags(cls): assert store.tags[callnodes.f_1] == new_tags -@pytest.mark.xfail(reason='Tagged stores not implemented') @pytest.mark.parametrize('cls', stores - {Layered}) def test_tags_are_not_duplicated_on_double_write(cls): with create_instance(cls) as (store, callnodes): @@ -364,7 +354,6 @@ def test_tags_are_not_duplicated_on_double_write(cls): assert tags == tags_after -@pytest.mark.xfail(reason='Tagged stores not implemented') @pytest.mark.parametrize('cls', stores - {Memory, Layered}) def test_store_must_be_picklable(cls): with create_instance(cls) as (store, callnodes): From 7cf62e5b18e166b007bff26f962f5280e4bba3f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jens=20G=C3=A5semyr=20Magnus?= Date: Wed, 26 Jan 2022 14:46:18 +0100 Subject: [PATCH 2/3] xun-tagging --- requirements.txt | 1 + xun/__init__.py | 4 -- xun/core.py | 55 --------------------------- xun/functions/driver/dask.py | 21 ++++++++--- xun/functions/driver/driver.py | 27 ++++++++++++-- xun/functions/driver/sequential.py | 4 +- xun/functions/function.py | 24 ++++++------ xun/functions/function_image.py | 13 +++++-- xun/functions/store/store.py | 7 ++-- xun/functions/transformations.py | 2 + xun/tests/test_functions.py | 6 +-- xun/tests/test_xun.py | 60 ++++++++++++++++++++++++++++++ 12 files changed, 133 insertions(+), 91 deletions(-) delete mode 100644 xun/core.py diff --git a/requirements.txt b/requirements.txt index 10dd0cd..ea89ed1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,4 +9,5 @@ numpy pandas paramiko pyparsing +pytz yapf diff --git a/xun/__init__.py b/xun/__init__.py index 10068a4..7a57b3d 100644 --- a/xun/__init__.py +++ b/xun/__init__.py @@ -7,10 +7,6 @@ from . import fs from . import serialization -from .core import ExportError -from .core import SchemaError -from .core import args_hash -from .core import filename_from_args from .functions import ComputeError from .functions import CopyError from .functions import Function diff --git a/xun/core.py b/xun/core.py deleted file mode 100644 index 0fdd586..0000000 --- a/xun/core.py +++ /dev/null @@ -1,55 +0,0 @@ -import hashlib - - -class ExportError(Exception): pass -class SchemaError(Exception): pass - - -def args_hash(args): - """Hash command line arguments - - Converts full command line arguments to a hash to use as filename - - Parameters - ---------- - args : argparse.Namespace - The argument namespace as parsed by argparse - - Returns - ------- - str - Hash digest value as a string of hexadecimal digits - """ - args = vars(args) - strings = sorted('{}={}'.format(k, v) for k, v in args.items()) - param_str = ','.join(strings) - - return hashlib.sha256(param_str.encode('utf-8')).hexdigest() - - -def filename_from_args(args, prefix='', postfix=''): - """Filename from command line arguments - - Parameters - ---------- - args : argparse.Namespace - The argument namespace as parsed by argparse - prefix : str, optional - Prefix of the file name - postfix : str, optional - Postfix of the file name - - Returns - ------- - str - File name - - Examples - -------- - - >>> filename_from_args(args, prefix='prefix', postfix='.txt') - 'prefixe3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855.txt' - - """ - hash = args_hash(args) - return prefix + hash + postfix diff --git a/xun/functions/driver/dask.py b/xun/functions/driver/dask.py index 7821e34..88f6849 100644 --- a/xun/functions/driver/dask.py +++ b/xun/functions/driver/dask.py @@ -29,15 +29,17 @@ def _exec(self, scheduler = DaskSchedule(self.client, function_images, store, - global_resources) + global_resources, + entry_call, + self.timestamp()) return scheduler(entry_call, graph) -def compute_proxy(store, func): +def compute_proxy(store, func, tags): """Dask 'handles' functools.partial so that we can't use it. Create a new function instead.""" def λ(node): - Driver.compute_and_store(node, func, store) + Driver.compute_and_store(node, func, store, tags) return node functools.update_wrapper(λ, func) return λ @@ -55,10 +57,14 @@ def __init__(self, client, function_images, store, - global_resources): + global_resources, + entry_call, + start_time): self.client = client self.function_images = function_images self.store = store + self.entry_call = entry_call + self.start_time = start_time self.errored = False self.futures = {} self.semaphores = { @@ -108,6 +114,7 @@ async def visit(self, node, atomic_graph, queue): else: async with contextlib.AsyncExitStack() as stack: func_img = self.function_images[node.function_name] + callable = func_img['callable'] semaphores = [ stack.enter_async_context(self.semaphores[res]) @@ -119,7 +126,11 @@ async def visit(self, node, atomic_graph, queue): await asyncio.gather(*semaphores) logger.info(f'Submitting {node}') - func = compute_proxy(self.store, func_img['callable']) + tags = Driver.create_tags(callable, + self.entry_call, + node, + self.start_time) + func = compute_proxy(self.store, callable, tags) kwargs = {} for res, value in (func_img['worker_resources'].items()): kwargs.setdefault('resources', {})[res] = value diff --git a/xun/functions/driver/driver.py b/xun/functions/driver/driver.py index 371cd2c..1836a9a 100644 --- a/xun/functions/driver/driver.py +++ b/xun/functions/driver/driver.py @@ -4,7 +4,10 @@ from abc import abstractmethod from copy import deepcopy import contextvars +import datetime +import getpass import logging +import pytz logger = logging.getLogger(__name__) @@ -66,7 +69,25 @@ def value_computed(callnode, store): return callnode in store @staticmethod - def compute_and_store(callnode, func, store): + def timestamp(): + return datetime.datetime.utcnow().astimezone(pytz.utc).isoformat() + + @classmethod + def create_tags(cls, func, entry_call, callnode, start_time): + return { + 'created_by': getpass.getuser(), + 'entry_point': entry_call.function_name, + 'function_name': func.__name__, + 'start_time': start_time, + 'timestamp': cls.timestamp(), + **{ + k: v.format(*callnode.args, *callnode.kwargs.values()) + for k, v in func.tags.items() + } + } + + @staticmethod + def compute_and_store(callnode, func, store, tags): cached_store = store.cached() results = func(*callnode.args, **callnode.kwargs) results.send(None) @@ -77,7 +98,7 @@ def compute_and_store(callnode, func, store): if func.can_write_to(result_call): logger.debug(f'Storing result for {result_call} ' f'(interface of {callnode})') - store.store(result_call, result) + store.store(result_call, result, **tags) else: msg = (f'Call {callnode} attempted to write an interface ' f'[{result_call.function_name}' @@ -87,7 +108,7 @@ def compute_and_store(callnode, func, store): raise XunInterfaceError(msg) except StopIteration as result: logger.debug(f'Storing result for {callnode}') - store.store(callnode, result.value) + store.store(callnode, result.value, **tags) return result.value except Exception as e: raise e from func.Raise() diff --git a/xun/functions/driver/sequential.py b/xun/functions/driver/sequential.py index b869c7b..1cd8c5a 100644 --- a/xun/functions/driver/sequential.py +++ b/xun/functions/driver/sequential.py @@ -21,6 +21,7 @@ def _exec(self, assert nx.is_directed_acyclic_graph(graph) schedule = list(nx.topological_sort(graph)) + start_time = self.timestamp() for node in schedule: if not isinstance(node, CallNode): @@ -36,7 +37,8 @@ def _exec(self, logger.info('Running {}'.format(node)) try: - self.compute_and_store(node, func, store) + tags = self.create_tags(func, entry_call, node, start_time) + self.compute_and_store(node, func, store, tags) except Exception as e: logger.error( '{} failed with {}'.format(node, str(e)) diff --git a/xun/functions/function.py b/xun/functions/function.py index a63bd1f..0213a12 100644 --- a/xun/functions/function.py +++ b/xun/functions/function.py @@ -258,11 +258,11 @@ class Function(AbstractFunction): function : Function decorator to create xun functions """ - def __init__(self, desc, dependencies, max_parallel): + def __init__(self, desc, dependencies, tags): self.desc = desc self._dependencies = dependencies self.interfaces = {} - self.max_parallel = max_parallel + self.tags = tags self._global_resources = {} self._worker_resources = {} self._hash = self.sha256() @@ -304,7 +304,7 @@ def globals(self): } @staticmethod - def from_function(func, max_parallel=None): + def from_function(func, **tags): """From Function Creates a xun function from a python function @@ -313,18 +313,14 @@ def from_function(func, max_parallel=None): ---------- func : python function The function definition to create the xun function from - max_parallel : int - The maximum parallel jobs allowed for this function + **tags : Dict[str, Callable[*args, **kwargs, str]] + Custom tag functions Returns ------- Function The `Function` representation of the given function """ - if max_parallel is not None: - msg = 'Limiting parallel execution not yet implemented' - raise NotImplementedError(msg) - desc = describe(func) dependencies = { g.name: g @@ -332,7 +328,7 @@ def from_function(func, max_parallel=None): if isinstance(g, AbstractFunction) } - f = Function(desc, dependencies, max_parallel) + f = Function(desc, dependencies, tags) # Add f to it's dependencies, to allow recursive dependencies f.dependencies[f.name] = f @@ -381,6 +377,7 @@ def callable(self): yields, globals=self.globals, hash=self.hash, + tags=self.tags, original_source_code=self.code.source, interface_hashes=frozenset( i.hash for i in self.interfaces.values() @@ -395,7 +392,7 @@ def interface(self, func): return interface -def function(max_parallel=None): +def function(**tags): """xun.function Function decorator used to create xun functions from python functions @@ -431,7 +428,7 @@ def function(max_parallel=None): xun function created from the decorated function """ def decorator(func): - return Function.from_function(func, max_parallel) + return Function.from_function(func, **tags) return decorator @@ -545,5 +542,6 @@ def callable(self): self._callable = xform.assemble(self.desc, interface, globals=self.globals, - hash=self.hash) + hash=self.hash, + tags=self.target.tags) return self._callable diff --git a/xun/functions/function_image.py b/xun/functions/function_image.py index 395abb3..74943fb 100644 --- a/xun/functions/function_image.py +++ b/xun/functions/function_image.py @@ -81,6 +81,7 @@ def __init__(self, annotations, module_name, globals, + tags, referenced_modules, original_source_code=None, interface_hashes=frozenset(), @@ -92,6 +93,7 @@ def __init__(self, self.__annotations__ = annotations self.__module__ = module_name self.globals = globals + self.tags = tags self.referenced_modules = referenced_modules self.hash = hash self.original_source_code = original_source_code @@ -145,6 +147,7 @@ def from_description(desc, hash=None): desc.annotations, desc.module, desc.globals, + None, desc.referenced_modules, hash=hash, ) @@ -237,6 +240,7 @@ def __getstate__(self): self.__annotations__, self.__module__, self.globals, + self.tags, self.referenced_modules, self.original_source_code, self.interface_hashes, @@ -255,10 +259,11 @@ def __setstate__(self, state): self.__annotations__ = state[4] self.__module__ = state[5] self.globals = state[6] - self.referenced_modules = state[7] - self.original_source_code = state[8] - self.interface_hashes = state[9] - self.hash = state[10] + self.tags = state[7] + self.referenced_modules = state[8] + self.original_source_code = state[9] + self.interface_hashes = state[10] + self.hash = state[11] self._func = None def __repr__(self): diff --git a/xun/functions/store/store.py b/xun/functions/store/store.py index 9c0f958..d25e7cd 100644 --- a/xun/functions/store/store.py +++ b/xun/functions/store/store.py @@ -7,6 +7,7 @@ import hashlib import sqlite3 import struct +import threading def restructure(data, shape): @@ -274,6 +275,7 @@ def dump(self, name): def __init__(self, store): self.uri = f'file:{id(store)}?mode=memory&cache=shared' + self._lock = threading.RLock() self.mem = sqlite3.connect(self.uri, uri=True, isolation_level=None, @@ -417,12 +419,9 @@ def query(self, *conditions): return [serialization.loads(r) for r, *_ in result if r is not None] def update(self, callnode, tags): - print(f'writing {callnode}::{callnode.sha256()} {tags}') prev_tags = self.tags(callnode) - print(prev_tags) if prev_tags == tags: # Don't need a savepoint if we're not altering anything - print(f'nothing {callnode}::{callnode.sha256()}') return with self.savepoint(): @@ -632,6 +631,7 @@ def coerce(obj): @contextlib.contextmanager def savepoint(self): try: + self._lock.acquire() savepoint_name = str(uuid4()) self.mem.execute(f'SAVEPOINT [{savepoint_name}]') yield @@ -640,3 +640,4 @@ def savepoint(self): raise finally: self.mem.execute(f'RELEASE [{savepoint_name}]') + self._lock.release() diff --git a/xun/functions/transformations.py b/xun/functions/transformations.py index a123793..4e19a6f 100644 --- a/xun/functions/transformations.py +++ b/xun/functions/transformations.py @@ -35,6 +35,7 @@ def assemble(desc, *nodes, globals=None, hash=None, + tags=None, original_source_code=None, interface_hashes=frozenset()): """Assemble serializable `FunctionImage` representation @@ -88,6 +89,7 @@ def assemble(desc, desc.annotations, desc.module, globals if globals is not None else desc.globals, + tags, desc.referenced_modules, original_source_code=original_source_code, interface_hashes=interface_hashes, diff --git a/xun/tests/test_functions.py b/xun/tests/test_functions.py index 3d1b1b6..6fc4e63 100644 --- a/xun/tests/test_functions.py +++ b/xun/tests/test_functions.py @@ -528,7 +528,7 @@ def f(): f1 = f - w1 = xun.functions.Function(workflow.desc, {'f': f1}, None) + w1 = xun.functions.Function(workflow.desc, {'f': f1}, {}) assert driver.value_computed(f0.callnode(), store) assert driver.value_computed(w0.callnode(), store) @@ -654,11 +654,11 @@ def script(): dependencies = script.dependencies dependencies['g'] = before_edit - script.__init__(script.desc, dependencies, script.max_parallel) + script.__init__(script.desc, dependencies, script.tags) assert '{name}' == script.blueprint().run(driver=driver, store=store) dependencies['g'] = after_edit - script.__init__(script.desc, dependencies, script.max_parallel) + script.__init__(script.desc, dependencies, script.tags) assert 'World' == script.blueprint().run(driver=driver, store=store) diff --git a/xun/tests/test_xun.py b/xun/tests/test_xun.py index e69de29..3386558 100644 --- a/xun/tests/test_xun.py +++ b/xun/tests/test_xun.py @@ -0,0 +1,60 @@ +from .helpers import run_in_process +import xun + + +def test_xun_result_references(): + @xun.function() + def return_reference(): + data = b'hello world!\n' + return xun.Reference(data) + + @xun.function() + def use_reference(): + with ...: + ref = return_reference() + assert ref.value == b'hello world!\n' + + run_in_process(use_reference.blueprint()) + + +def test_tagged_stores(): + driver = xun.functions.driver.Sequential() + store = xun.functions.store.Memory() + + @xun.function(custom_tag='hello {1}!') + def f(a, b): + pass + + @xun.function() + def main(): + with ...: + f(1, 'argument') + + main.blueprint().run(driver=driver, store=store) + + main_tags = store.tags[main.callnode()] + f_tags = store.tags[f.callnode(1, 'argument')] + + assert set(main_tags.keys()) == { + 'created_by', + 'entry_point', + 'function_name', + 'start_time', + 'timestamp', + } + + assert set(f_tags.keys()) == { + 'created_by', + 'custom_tag', + 'entry_point', + 'function_name', + 'start_time', + 'timestamp', + } + + assert main_tags['entry_point'] == 'main' + assert main_tags['function_name'] == 'main' + + assert f_tags['entry_point'] == 'main' + assert f_tags['function_name'] == 'f' + assert f_tags['custom_tag'] == 'hello argument!' From 0332c0c5f5b739f8f96058381a77099da67a99e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jens=20G=C3=A5semyr=20Magnus?= Date: Wed, 19 Jan 2022 15:13:18 +0100 Subject: [PATCH 3/3] File system --- xun/cli.py | 33 +++ xun/fs/__init__.py | 9 + xun/fs/cli.py | 45 +++ xun/fs/filesystem.py | 280 ++++++++++++++++++ xun/functions/function.py | 8 +- xun/functions/store/disk.py | 9 +- xun/functions/store/layered.py | 8 + xun/functions/store/memory.py | 6 + xun/functions/store/store.py | 10 + xun/tests/test_data/query.xunql | 8 + ...mzmKz-kQjB5eoTjgql7gwKyB4NQMVlAPe9HiuontU= | Bin 0 -> 163840 bytes ...Q8VBqqPhvWx8UxdGXAQ5fnT2DHc6qpzFHC0DhxrAs= | Bin 0 -> 94208 bytes ...P_NmX6ZFh21uwALyci8NDMoo1JxTQGiTVsUzhYlwc= | 7 + ...hmbEKFuHsxAssR-gdIhx5mAljsKg0LotLBuRrZnng= | 7 + ...KWyz-WXKQN5JqoXxcoJ_rdt4L3z7LFsceotmVd8MM= | 7 + ...IGpJ5CMsMuBUlpwrapOWBLi_0wnbdPdSh1H1iZbEY= | 7 + ...61R9ZvhrLqMJTd5uI__mIctKDd0B8lhcnGuEK46wg= | 7 + ...asCwYq-xgxF31_sLzWoFC-nityLw-B5mxymJHqcM4= | 7 + ...8_BA-3noPrHY43vfgbYIG5QUz-8UmdAZ0PufjuFAg= | 7 + ...VEipdbWNDxXvSsmtVtKt6dn6PS2YLK8ppLoEcdefc= | 7 + ...P_NmX6ZFh21uwALyci8NDMoo1JxTQGiTVsUzhYlwc= | 2 + ...hmbEKFuHsxAssR-gdIhx5mAljsKg0LotLBuRrZnng= | 2 + ...KWyz-WXKQN5JqoXxcoJ_rdt4L3z7LFsceotmVd8MM= | 2 + ...IGpJ5CMsMuBUlpwrapOWBLi_0wnbdPdSh1H1iZbEY= | 2 + ...61R9ZvhrLqMJTd5uI__mIctKDd0B8lhcnGuEK46wg= | 2 + ...asCwYq-xgxF31_sLzWoFC-nityLw-B5mxymJHqcM4= | 2 + ...8_BA-3noPrHY43vfgbYIG5QUz-8UmdAZ0PufjuFAg= | 2 + ...VEipdbWNDxXvSsmtVtKt6dn6PS2YLK8ppLoEcdefc= | 2 + xun/tests/test_filesystem.py | 165 +++++++++++ 29 files changed, 650 insertions(+), 3 deletions(-) create mode 100644 xun/fs/cli.py create mode 100644 xun/fs/filesystem.py create mode 100644 xun/tests/test_data/query.xunql create mode 100644 xun/tests/test_data/xun-fs-store/db/1ZmzmKz-kQjB5eoTjgql7gwKyB4NQMVlAPe9HiuontU= create mode 100644 xun/tests/test_data/xun-fs-store/db/FLQ8VBqqPhvWx8UxdGXAQ5fnT2DHc6qpzFHC0DhxrAs= create mode 100644 xun/tests/test_data/xun-fs-store/keys/-aP_NmX6ZFh21uwALyci8NDMoo1JxTQGiTVsUzhYlwc= create mode 100644 xun/tests/test_data/xun-fs-store/keys/FyhmbEKFuHsxAssR-gdIhx5mAljsKg0LotLBuRrZnng= create mode 100644 xun/tests/test_data/xun-fs-store/keys/KtKWyz-WXKQN5JqoXxcoJ_rdt4L3z7LFsceotmVd8MM= create mode 100644 xun/tests/test_data/xun-fs-store/keys/XDIGpJ5CMsMuBUlpwrapOWBLi_0wnbdPdSh1H1iZbEY= create mode 100644 xun/tests/test_data/xun-fs-store/keys/i-61R9ZvhrLqMJTd5uI__mIctKDd0B8lhcnGuEK46wg= create mode 100644 xun/tests/test_data/xun-fs-store/keys/mrasCwYq-xgxF31_sLzWoFC-nityLw-B5mxymJHqcM4= create mode 100644 xun/tests/test_data/xun-fs-store/keys/sH8_BA-3noPrHY43vfgbYIG5QUz-8UmdAZ0PufjuFAg= create mode 100644 xun/tests/test_data/xun-fs-store/keys/uJVEipdbWNDxXvSsmtVtKt6dn6PS2YLK8ppLoEcdefc= create mode 100644 xun/tests/test_data/xun-fs-store/values/-aP_NmX6ZFh21uwALyci8NDMoo1JxTQGiTVsUzhYlwc= create mode 100644 xun/tests/test_data/xun-fs-store/values/FyhmbEKFuHsxAssR-gdIhx5mAljsKg0LotLBuRrZnng= create mode 100644 xun/tests/test_data/xun-fs-store/values/KtKWyz-WXKQN5JqoXxcoJ_rdt4L3z7LFsceotmVd8MM= create mode 100644 xun/tests/test_data/xun-fs-store/values/XDIGpJ5CMsMuBUlpwrapOWBLi_0wnbdPdSh1H1iZbEY= create mode 100644 xun/tests/test_data/xun-fs-store/values/i-61R9ZvhrLqMJTd5uI__mIctKDd0B8lhcnGuEK46wg= create mode 100644 xun/tests/test_data/xun-fs-store/values/mrasCwYq-xgxF31_sLzWoFC-nityLw-B5mxymJHqcM4= create mode 100644 xun/tests/test_data/xun-fs-store/values/sH8_BA-3noPrHY43vfgbYIG5QUz-8UmdAZ0PufjuFAg= create mode 100644 xun/tests/test_data/xun-fs-store/values/uJVEipdbWNDxXvSsmtVtKt6dn6PS2YLK8ppLoEcdefc= create mode 100644 xun/tests/test_filesystem.py diff --git a/xun/cli.py b/xun/cli.py index 1f244de..453490b 100644 --- a/xun/cli.py +++ b/xun/cli.py @@ -15,6 +15,7 @@ Also see (1) from http://click.pocoo.org/5/setuptools/#setuptools-integration """ from pathlib import Path +from textwrap import dedent import argparse import sys @@ -53,6 +54,38 @@ def main(args=None): parser_fgraph_action.add_argument('--dot', action='store_true') +# +# XunFS +# +try: + from .fs import cli as fs + parser_mnt = subparsers.add_parser('mount', description=dedent('''\ + arguments after a bare -- are forwarded to fuse as is. The fuse + argument for mountpoint is required. + + Example + ------- + xun mount -s disk /path/to/store -q '() => ...' -- /mnt/pnt + '''), formatter_class=argparse.RawTextHelpFormatter) + parser_mnt.set_defaults(func=fs.main) + parser_mnt_store = parser_mnt.add_mutually_exclusive_group(required=True) + parser_mnt_store.add_argument('-s', '--store', + nargs='+', + action=fs.StoreAction) + parser_mnt_store.add_argument('--store-pickle', + type=fs.store_pickle, + dest='store') + parser_mnt_query = parser_mnt.add_mutually_exclusive_group(required=True) + parser_mnt_query.add_argument('-q', '--query', + nargs='+', + action=fs.QueryAction) + parser_mnt_query.add_argument('--query-file', + type=fs.query_file, + dest='query') + parser_mnt.add_argument('fuse_args', nargs=argparse.REMAINDER) +except NotImplementedError: + pass + # # create new project from cookiecutter template # diff --git a/xun/fs/__init__.py b/xun/fs/__init__.py index 5b54b09..90903c8 100644 --- a/xun/fs/__init__.py +++ b/xun/fs/__init__.py @@ -1 +1,10 @@ from . import queries + +try: + from .filesystem import mount +except NotImplementedError: + pass + fuse_available = False +else: + from . import cli + fuse_available = True diff --git a/xun/fs/cli.py b/xun/fs/cli.py new file mode 100644 index 0000000..ad2ada5 --- /dev/null +++ b/xun/fs/cli.py @@ -0,0 +1,45 @@ +import argparse +import base64 +import pickle + + +def main(args): + from .filesystem import fuse, Fuse, XunFS + server = XunFS(args.store, + args.query, + usage="XunFS\n" + Fuse.fusage, + version="%prog " + fuse.__version__) + fuse_args = list(args.fuse_args)[1:] # Strip -- from previous parse + fuse_args.append('-f') # Run in foreground + server.parse(fuse_args) + server.main() + + +class StoreAction(argparse.Action): + class StoreConstructors: + @staticmethod + def disk(path): + from ..functions.store import Disk + return Disk(path, create_dirs=False) + + def __call__(self, parser, namespace, values, option_string=None): + store_name, *args = values + try: + store = getattr(StoreAction.StoreConstructors, store_name)(*args) + setattr(namespace, self.dest, store) + except Exception as e: + parser.error(f'Invalid store ({store_name}) arguments ({args})') + + +class QueryAction(argparse.Action): + def __call__(self, parser, namespace, values, option_string=None): + setattr(namespace, self.dest, ' '.join(values)) + + +def store_pickle(s): + return pickle.loads(base64.urlsafe_b64decode(s.encode())) + + +def query_file(filename): + with open(filename, 'r') as f: + return f.read() diff --git a/xun/fs/filesystem.py b/xun/fs/filesystem.py new file mode 100644 index 0000000..2561328 --- /dev/null +++ b/xun/fs/filesystem.py @@ -0,0 +1,280 @@ +from . import cli +from io import BytesIO +from pathlib import Path +from textwrap import dedent +import contextlib +import errno +import networkx as nx +import os +import stat +import subprocess +import time + + +try: + import fuse + from fuse import Fuse + if not hasattr(fuse, '__version__'): + raise RuntimeError("your fuse-py doesn't know of fuse.__version__, probably it's too old.") + fuse.fuse_python_api = (0, 2) +except Exception: + raise NotImplementedError('Missing fuse') + + +@contextlib.contextmanager +def mount(store, query, mountpoint, capture_output=True, timeout=5): + import pickle + import base64 + cmd = [ + 'python3', + '-m', + 'xun', + 'mount', + '--store-pickle', base64.urlsafe_b64encode(pickle.dumps(store)), + '--query', query, + '--', + str(mountpoint) + ] + kwargs = {} if not capture_output else { + 'stdout': subprocess.PIPE, + 'stderr': subprocess.PIPE, + } + proc = subprocess.Popen(cmd, **kwargs) + try: + wait_for_ctrl(str(mountpoint), timeout=timeout) + yield Path(mountpoint) + except TimeoutError: + if capture_output: + out, err = proc.communicate() + msg = dedent(f'''\ + xun mount stdout: + {out.decode()} + + xun mount stderr: + {err.decode()} + ''') + raise RuntimeError(f'failed to mount\n{msg}') + else: + raise RuntimeError('failed to mount') + finally: + proc.terminate() + try: + proc.wait(5) + except subprocess.TimeoutExpired: + proc.kill() + proc.wait() + + +def wait_for_ctrl(path, timeout): + interval = 0.1 + remaining = timeout + ctrl_path = os.path.join(path, 'control') + while True: + time.sleep(interval) + remaining -= interval + + if os.path.exists(ctrl_path): + return + + if remaining <= 0: + raise TimeoutError + + +class XunFS(Fuse): + class File: + def __init__(self, mode=0): + self.mode = mode + + def open(self, flags): + pass + + def read(self, size, offset): + return -errno.EACCES + + def truncate(self, size): + return -errno.EACCES + + def write(self, buf, offset): + return -errno.EACCES + + def getattr(self): + st = Stat() + st.st_mode = stat.S_IFREG | self.mode + st.st_nlink = 1 + return st + + class Refresh(File): + def __init__(self): + super().__init__(stat.S_IRUSR | stat.S_IXUSR) + self.contents = dedent('''\ + #!/bin/bash + echo refresh > `dirname $0`/control + ''') + + def read(self, size, offset): + return self.contents[offset:offset + size].encode() + + def getattr(self): + st = Stat() + st.st_mode = stat.S_IFREG | self.mode + st.st_size = len(self.contents) + st.st_nlink = 1 + return st + + class Control(File): + def __init__(self, fs): + super().__init__(stat.S_IWUSR) + self.commands = { + b'refresh': lambda *_: fs.refresh(), + } + + def truncate(self, size): + pass + + def write(self, buf, offset): + io = BytesIO(buf) + actions = [] + lines = io.readlines() + + for ln in lines: + ln = ln.strip() + if ln in self.commands: + actions.append(ln) + else: + return -errno.EINVAL + for action in actions: + self.commands[action]() + + return len(buf) + + def __init__(self, store, query, *args, **kwargs): + super().__init__(*args, **kwargs) + self.store = store + self.query = query + self.refresh() + + def refresh(self): + graph = nx.DiGraph() + + graph.add_node('/control', file=self.Control(self)) + graph.add_node('/refresh', file=self.Refresh()) + + graph = self.add_path_edges('/control', graph) + graph = self.add_path_edges('/refresh', graph) + graph = self.add_path_edges('/store', graph) + + structure = self.store.query(self.query) + graph = self.add_structure_to_graph(structure, graph) + + self.graph = graph + + def getattr(self, path): + if path not in self.graph: + return -errno.ENOENT + if self.is_file(path): + return self.file(path).getattr() + else: + st = Stat() + st.st_mode = stat.S_IFDIR | 0o500 + st.st_nlink = 1 + return st + + def open(self, path, flags): + try: + return self.file(path).open(flags) + except KeyError: + return -errno.ENOENT + + def read(self, path, size, offset): + if path not in self.graph: + return -errno.ENOENT + return self.file(path).read(size, offset) + + def readdir(self, path, offset): + for node in self.graph.successors(path): + yield fuse.Direntry(os.path.basename(node)) + + def readlink(self, path): + raise RuntimeError + + def rmdir(self, path): + try: + self.graph.remove_node(path) + except: + self.refresh() + raise + + def truncate(self, path, size): + if self.is_file(path): + return self.file(path).truncate(size) + return -errno.EINVAL + + def unlink(self, path): + if not self.is_file(path): + return -errno.EINVAL + self.graph.remove_node(path) + try: + sha256 = os.path.basename(path) + del self.store[self.store.from_sha256(sha256)] + except Exception: + self.refresh() + raise + + def write(self, path, buf, offset): + if self.is_file(path): + return self.file(path).write(buf, offset) + return -errno.EINVAL + + def is_file(self, path): + return path in self.graph and 'file' in self.graph.nodes[path] + + def file(self, path): + return self.graph.nodes[path]['file'] + + @staticmethod + def add_path_edges(path, graph): + graph = graph.copy() + parent, _ = os.path.split(path) + while parent and parent != '/': + graph.add_edge(parent, path) + path = parent + parent, _ = os.path.split(path) + graph.add_edge('/', path) + return graph + + @staticmethod + def add_structure_to_graph(structure, graph, prefix='/store', copy=True): + if copy: + graph = graph.copy() + + if isinstance(structure, dict): + for name, children in structure.items(): + name = os.path.join(prefix, name) + graph.add_edge(prefix, name) + graph = XunFS.add_structure_to_graph( + children, + graph, + prefix=name, + copy=False, + ) + else: + for callnode in structure: + name = os.path.join(prefix, callnode.sha256()) + graph.add_node(name, file=XunFS.File(0o600)) + graph.add_edge(prefix, name) + + return graph + + +class Stat(fuse.Stat): + def __init__(self): + self.st_mode = 0 + self.st_ino = 0 + self.st_dev = 0 + self.st_nlink = 0 + self.st_uid = 0 + self.st_gid = 0 + self.st_size = 0 + self.st_atime = 0 + self.st_mtime = 0 + self.st_ctime = 0 diff --git a/xun/functions/function.py b/xun/functions/function.py index 0213a12..552c2ad 100644 --- a/xun/functions/function.py +++ b/xun/functions/function.py @@ -408,8 +408,12 @@ def function(**tags): ... def get_b(): ... return 'b' ... - >>> @xun.function - ... def workflow(prefix): + >>> @xun.function(custom_tag_name=''' + ... Format string supplied with function arguments, in this case + ... 'prefix' and 'another_argument', which would be {0} and {1} + ... respectively''' + ... ) + ... def workflow(prefix, another_argument): ... return prefix + a + b ... with ...: ... a = get_a() diff --git a/xun/functions/store/disk.py b/xun/functions/store/disk.py index e3192c6..1df4f65 100644 --- a/xun/functions/store/disk.py +++ b/xun/functions/store/disk.py @@ -60,7 +60,7 @@ def _load_value(self, key): self.key_invariant(key) if not self.__contains__(key): - raise KeyError('KeyError: {}'.format(str(key))) + raise KeyError(f'KeyError: {str(key)}') with self.paths(key).val.open() as f: return serialization.load(f) @@ -68,6 +68,13 @@ def _load_value(self, key): def _load_tags(self, key): return self._tagdb.tags(key) + def from_sha256(self, sha256): + path = self.dir / 'keys' / sha256 + if not path.is_file(): + raise KeyError(f'KeyError: {str(sha256)}') + with path.open() as f: + return serialization.load(f) + def filter(self, *conditions): return self._tagdb.query(*conditions) diff --git a/xun/functions/store/layered.py b/xun/functions/store/layered.py index 2c255f3..3d73652 100644 --- a/xun/functions/store/layered.py +++ b/xun/functions/store/layered.py @@ -59,6 +59,14 @@ def _load_tags(self, callnode): return layer._load_tags(callnode) raise KeyError(repr(callnode)) + def from_sha256(self, sha256): + for layer in self._layers: + try: + return layer.from_sha256(sha256) + except KeyError: + pass + raise KeyError(f'KeyError: {str(sha256)}') + def filter(self, *tag_conditions): results = set() for layer in reversed(self._layers): diff --git a/xun/functions/store/memory.py b/xun/functions/store/memory.py index 42c5b98..fa16b66 100644 --- a/xun/functions/store/memory.py +++ b/xun/functions/store/memory.py @@ -36,6 +36,12 @@ def _load_value(self, callnode): value = self._container[callnode] return value + def from_sha256(self, sha256): + for key in self._store.keys(): + if key.sha256() == sha256: + return key + raise KeyError(f'KeyError: {str(sha256)}') + def _store(self, callnode, value, **tags): self._container[callnode] = value self._tagdb.update(callnode, tags) diff --git a/xun/functions/store/store.py b/xun/functions/store/store.py index d25e7cd..cf2d05c 100644 --- a/xun/functions/store/store.py +++ b/xun/functions/store/store.py @@ -92,6 +92,10 @@ def store(self, key, value, **tags): self._store(proxy_callnode, value._referencing) self._store(key, value, **tags) + @abstractmethod + def from_sha256(self, sha256): + pass + def load_callnode(self, callnode): result = self._load_value(callnode._replace(subscript=())) for subscript in callnode.subscript: @@ -133,6 +137,9 @@ def _load_value(self, key): def _load_tags(self, key): return self._wrapped_store._load_tags(key) + def from_sha256(self, sha256): + return self._wrapped_store.from_sha256(key) + def filter(self, *conditions): return self._wrapped_store.filter(*conditions) @@ -168,6 +175,9 @@ def _load_value(self, key): def _load_tags(self, key): return self._wrapped_store._load_tags(key) + def from_sha256(self, sha256): + return self._wrapped_store.from_sha256(key) + def filter(self, *conditions): return self._wrapped_store.filter(*conditions) diff --git a/xun/tests/test_data/query.xunql b/xun/tests/test_data/query.xunql new file mode 100644 index 0000000..7939c25 --- /dev/null +++ b/xun/tests/test_data/query.xunql @@ -0,0 +1,8 @@ +(start_time>="2030-01-02" entry_point function_name) => + start_time { + entry_point { + function_name { + ... + } + } + } diff --git a/xun/tests/test_data/xun-fs-store/db/1ZmzmKz-kQjB5eoTjgql7gwKyB4NQMVlAPe9HiuontU= b/xun/tests/test_data/xun-fs-store/db/1ZmzmKz-kQjB5eoTjgql7gwKyB4NQMVlAPe9HiuontU= new file mode 100644 index 0000000000000000000000000000000000000000..2326da47f886043d008978a349e6fd7a669be0d9 GIT binary patch literal 163840 zcmeHw378b+wRTr;)qAlIvPdJ0Gb{tWOwXnWGtjdz8w>*s;4pOebPofwz$~mW+D&U* zqWMh>DhY(^lAm~u7mXt3W^s*i=VIW77`(<9lPDT6LC|X6ocMpH_X|#fg2kIC12O zlOb+#Vwx&WhE5VE!^VjdVWc>*IK&Ch+l6N}I^iLW@S^aLa76rxKBNFCKnjooqyQ;E z3XlS%04YEUkOHItDex~;V3bzp<{CD2w>5NxJG)!DIvctI!ItoP;V&A4W(cQCPtIJ^ z-rdm_Xwi<)>85ED@UB3Ugyr(Z?H`)54^aO%3U9-2mnZ7~$Au4s_mV%P0O<=UKnjoo zqyQ;E3XlS%04YEUkOHItDL@K*aTORJOBXk8Ty(2A9UEOJPRB$ih||l~h?CJiyExSf zuW5u+;)Fh=04YEUkOHItDL@L40;B*bKnjooqyQ=K#aAFxYtU41+C%sL=$?nq-f4Q~ z4<8J_!}8$AHp?$Bx6gSYtU2}5%ct$xb!a`8&vOQijpP3LPm8!$TH(Jn!sp_IKBNFC zKnjooqyQ;E3XlS%04YEUkOHItDR3z%Fj-f@$)y3bI-Si7^?#mwLu3ED^%L`l{CkF@ zy2IjE`dpd{#PvL{srcGNuW9PR?XS$b;n5fO>2eWe`uCO=9oty6tMm&~4M^k5kGeKV4y{@H~Ip6Dz7K|bgnLN7S3xdj&zl{Jx$&&`ttKUVPBvznCo`sG)D4W?ocq#x%E0rXLqnO z)X}`IOA%F^E9xyCm&cRka%Z``PPezflV9M=j_ZwN16A9=>n+H3$MpuX0ofp~*OLv% z264TPY@lizNPPn>*??>i*K=e8vOzNb{~g5te>0!pKQjDT_XqBGmuB!wTRngRac(M{ z=VQI5+dh7x*)LM&ORF)_xSFk2PUN7_|h--7w!MhbO~kz+`UnKG9$35 z$?FQ0)OEVr%B%948iQMw6)&wUZ*HvGyeQbTYvNKtCBgAXWVInk0#{chW?4Mgtn@{q`4aanU z;@%q|j-;P1Sq0);t#rA+)?<8NSCQwr8HTdvXZQUee^vQy|8VVmvbsC7dqUm4Eyb7M zsqUo|K+1g*7dX%3+#>RTdYgPH4OBxOP`w8hk_Xi50Vxf%fIOgj4^)!}RPTW*@_@=c zkc|Hi^}YZ92zTg`4RvY5L4i2ukWTC!`yRF$r+zltop#;zdw+6x_ky48n$|H>h`!&_ zanGF}tb6?u4EXzv0-&1zfI|Q{J9$9m7x%c{1|Fc%K&l-LBn<o?g5Jb)e!%Wl+JT{9H4sJ`w`>;m0#=Q+;H-M>OC-w zJfM0H3?&b!*8@@>a0q!o^&Sw&11k3b#s6xE|J~AgPWgWI#{VwzfXc7+ac(AgK=mG& zK^{=O2eQZms`tQj@_>3hAl3h;kq1=nfn@yumd5_M^|bjUf86kA-MiddU;da_1$!9R zPlD51eEZk_Z_m5+?iFRQ{UeCz55RyiM{;Ej`H0l<2RqW1c!RM&mZbp zIIpod(pBR2EZFo~!&kk8@+m1M2mF6#I=K52)S) zmyril?}5?e0o8jTjXa=w4~!xYsMiCDvJl1pYKZ?=Nay+A@Be2`_)KHnhQr4+Gc&F@ ze&E&nZZCTw+jxtwWl>(;-TWnZBS7}~Hv(*G^14DLb)BxZ@~XV1#^9D^#Y-#8o7H=s zOR?W_@__0+u#7yQdJoh|9=O(Hd|+3R=eZe%vgT*^{UCo;`ECDj?R>JjJF|O2-MuZv zmtb4qrM3X~{Qsro0rg%7qKGaLFqiF_+P#EdB9AkHs!48C@iU57}?xiHGgff zx5huKciFZ;Yq-D}v8>$~=xFLJa3+)}?r7f^z?@3PGcnB$#cCTv*pIbw|*Bi+T1tV^EqbuSH<$Lm6u6%DW81(u)c{#b^a4?YL zb9*EC;fOaB@#crKBNu)ajL+%z75F>_o_vV^eVWZ0;R#`lFv0OF$1RR)?0>cIvDe#& z*i(g7OxL6vr~NPOP1>ut5mEl6wwOxUH*W zbHlp!=C-cZKy#bN>+*vNK)9n>^pum@1md%dJ%Z|#!j>;`jO+C2%mD$^`eZcWrx1f!shfs1_&L zDHj%uDn zCD+~{+&-5bRgpx+AVpOuQ4OlaNk&9-*ip@9M|BN5s#)x)u9m0<)#4;&zlt4I0XwRE zc2s#1)u39Oq*l4?sB)N5CFB1M8eyN%B3$7({3%=U)uMq8fsL+dVU zt<_?A+Ooki-TbckZgYj1Gwn02H97f1{H^?K<4NQ9jVp{p3@;eA7-s6<)8DJF(rd)} zf1~a)?d#fY+C1(f?mliQXVW~R*#Pl>X%uQP&;yq9pnP9C3bhzUR8WgyLIi(wUpmPer+!;TFqGOXB2qfnFq z(d7Cp2-`zZh7lDMWq?ZV{TCP8szgt*Gzvu-MpRIgVMGN*8Aeo4lwm{#MHwbkN1R|pf1CR3hFY9sGu$bRD)^s zCE1}a!-xv%GK{F8F2jgQBT)^e|B%$`dS*;tWyW+JGp4UFW7^7$X$vc+^3A|B_{@63 z2<4ktl5J#2R=PnFY%slsq}jWfF?C5ygXuFQ0>9EuW=tK-nAS66TE~p3ofT7g8!!#N zw~#!1t+h}iSm#?lw>)AA zSw@>*Gsn!iremgmGc7S$`KS4AeyZ_*jCUE!3}+2{4PnC={TuqYK415-?hf4yG5T-Q zI=Dx;D$U2@N@PFvQ7BH$B35F3Blwg2r@R4v7 zN-&J5pajE+%Eyi>n;n&x9hHX_Re976RICExsEaAvOonWw(HW9xay<=1jxviK)pUt! zF#U&QFch7}j%q49swwQKGTBjOu%arDUI|pJV&iDKv|)qkKS+L<%#LajJE|+#QB7n= z<&>xf(|<_F4)Om~&Gi~#w=l=?j^hT0)Bd!5m0e@oZJT3#$9jX+X?e-gVHsvMF6bsKb}wEMJ6xKFw5T#n|jNGkuSjY8IS zVwQYBa^Zj~HO^0TM(d-Hf`vFru3mIu+6&|nBz{QD0>7N^2jIVWoGkG}u2xBYNZ%^S z4;fu0`617%BtK+-fnV+f`~N+@kP}vtA2P*C@Ay%}{u)L3lkxxk8sP^* zjpH-N1CC1jNA}z8S+>_~tv0*$ht^8VN0!?ySz_J4)od4^`d7(+#NWvZ?)=4yX0-t9k$dyWfnI?es^;XevdAA=;5f}O<}WS*4dhjf&Z{E(wk zk{=RRz^_neHwM`(CHY~niIV&<0tNUL>g>i~Sc-D|wJ{j0!oIT`8>l;nqTLrU@+B!1>S$z(7V(<{ucQRz|Sykn#lInMqvn(0<$xgHVzm21h8#4rYTcIRB47gB|A4XvSKl7BLq(2PJP?8_U zYbeKW5!>cEU71Gst?;DqO`%mN6(%{(I*vFVcYMRK#^D#o1AJybY=6}Lb$gS&$Ufe7 z+V-aH5!)@cM%!%L80+7x2dw{Yy~!G|UTsaYd}4Xcvd6N`veJ@o8Daj5`DOEi<{Qi_ z%sJ+v;u8a2G~H*q&QxdenjHL}`RDoX@mu)CyonrpbdadrLZolqZx;Alx=|c*9krbG$tKj6i2z3}x5e76J z0~!S&xo*W$3}^`kv={@b#ef!JK!q6491LhS26PPuGz$Zoh5=2*fTmzTnHW$81~d`_ z8i4@~$AE@mKtll}*R`s_fEHpv3oxK+4CrbM=qe1T00YX$fUd-V(lMaP7|%L0r41+5kPWxDwJSA#Tbwu19D?PE(~ZU z1~e7}8iN5{h5;EcAUy`81CZQZ6!S2kxfsw43@8f&8jS&^VL;kRx(c(L1a{jVjmdaI zG}-?^mBztnl|$ILbD-5qU#mWJlW>CTRDXLn0i z;!k)J?R4jcKudRcinDN$#o6bx$Y0@~TbuF|%&jb}>%%tXLUbw171vZ(ru+az=Qd8< z{$NWuWqwIb_2LE2qGc&R!a}{<-ftG{OUjR>g-gn+N>Y9TimK{bXJfb}+!bz2nO9vi z&tD_1)7h(4Yz@zk^`@I{m#P_@}?7)>&PZvQP@T$X`-cmGTShjkI7{Rc&?3q63HuaQI~Q zis%dlT3XuL8$}bqqlm?a8MLmO-lNWxek)s2Sh3ij>XIa-NLuAzI-?IUeC;J~40MHI z1(_Lb?He;h)3~NwmBQ|?nitz@>j22OWbR33Dd`uDVFTn1{U`2;Y!)^ z3wyg~>)F`5XXVbUEaz--lClvDtUg)Kr=_QJacg386V~lH5kP+FZ$pp83yUiJNyABV zoS7+G1it85zPVAfX{{el2{`$x7FSd_7u1wh7S=3tmiw1E3m4Z`msN?(mHw*Q>HTO{ z2Rc_fiz=#%dY4{YRkm=kzduwm;6+y4e;M)W*|T@zHL>58rfklXT!S&KJe|`tw>5@0 zb*^t|?g}>qy1UvF-y2d?bNBsb6q_XzG)>!P==ij0)3}?4_v)Oueh>6j-(T%sta5@& zDvDyHsNP>Xxa2x({dM9om5ytFOrmj-1D(70$N{AbcfX_5f4#-kHU6@as)WIMuXt0O zHU46MjlXKHe^JWyvJWKnh%e9@;dm`e93Sb3cZSz@hucEo^SM-Cru+O`-vSz==%{wG~q5?!hl;`jRAYvFWf z;)i~nr=DKl1!9Dr+2)V3FA9q5Q~+6HVK$hhy0L0J7e{QjTcvEt&+nJ^ z@$KD_jOfZCeA;aBFiQt2^)UAWdyc1ckkT)Dw0jp*=AifTXUHG>41ph+kztAtg=3%k z=R1>7my0m?evM(9YCI_G97p<^%pff%Nd!8GvxTqUamQ=4oIs=_O8760QTS^d?Y%){m zpuzjXb|t5f%?z6_q;sFV@kMn`;!<|MkIs@} zJ?H49rt<*l;-tXbro8hvQa9*fGa3 z*8Zvefc+tR+`iJDXCG$!$o5Oy{kE^#mf5mxg7pLI3)bD%t=1)0x7BL-gXQOzdo7)o zN=t@?Grwhi!u$<$vw5z0yy=wbp!n{9n@u6pEK?ePf`5g7kiVW^#(R0Y@sHx@|L=%z z6I^JVZsZN`8h&EfZdhlSZ$*LlhWifJ%~f+##GUDL_x_dghY2Or9u(^TqEP<_h57~xbq5M{I|}u6 z6zVn@dm?A7E)1v>1M0wl)?+~HFraqOKyDnr=2;Z#`zX|{uvv0zYy&E}q5PU>P^hO- zsGp-yKSQB@ib6ezLOp;&-H$@uheG`u3Uvz#bu$X}H5BS56zWE>mz;B2F`yO6lwYY`old zZNPxKF`(rb&@zbs&5nmO!f%Afg*$}RLXj}e@u}m0_!E6d0aAbzAO%PPQh*d71xNu> zfD|AFNP&NW0uW=%l^r#|M4?_pp?-lvy?{bJk3v0%LOqN^{W}WvP@nt%GwA;Re}S!O zm81YEKnjooqyQ;E3XlS%04YEUkOHK@U=+9#6aRCgq5dD{v;)-t9dXA>N1kJt{h0k1 z_TBa^_FDT4yUF&h?I*VFwsp4ow#nAB)?Zs6vwqzgvCgrMv7EHLF24OQX1UgqV;N%p z!2G;To2k@v1^+pJnBU9a$~W@Y@S}~N7+*Dh-*|&@xiQ=5 zF#O5ztl_(c4Tc5cyZ`k1xApt=-_*D0i}g<38QrgRkLYgJ1$9^JMrn_0U)DaL{i=BP zzej81-shg?c5q!>6_+XQ%5VF~#_0BcE)Auo27}W46NNg9Lj41U`W%J&4260Rh59`T z^)3qaI~3|26zYEfC3mwu=Ru)HVNjYgDAe14lDkWrbE8l$6lx|4H3NmpLZL>YP$N*N z;V9HF6ly32rTIGwbsB{_g+hIbLj4VeI*LO57KM5Xh58K&bp%jyH-~f6QK)Gs)KnB| z3JNs@g%VIG2MT4!pfo2@sK266pP*1DP^e#{Q2&iW9Y&!J0ZQ(6Z7vgq%0QtK{Vv%x zrlYtfqfnDjs4Gw?8wzDbp)4qr8HF;TP&^7{#Go|CQK-M5P#>dEAE8jkP^b@4s1H!6 zU!hQMqEH7>sQ*Ht4xmtPpir*^O74z$ZXychM4=|2PzDrAk3#7%D9xWys6U}le?*~P zL!n+ppfD|AFNC8rS6d(mif&X;{ApYNb{cOGPDoRiau3XarQ*nPHw00)aj2$n7I7^2Q!aWegFUf literal 0 HcmV?d00001 diff --git a/xun/tests/test_data/xun-fs-store/db/FLQ8VBqqPhvWx8UxdGXAQ5fnT2DHc6qpzFHC0DhxrAs= b/xun/tests/test_data/xun-fs-store/db/FLQ8VBqqPhvWx8UxdGXAQ5fnT2DHc6qpzFHC0DhxrAs= new file mode 100644 index 0000000000000000000000000000000000000000..64d8d8ffc4d4ddbb3522e09e009683e76ffbb69d GIT binary patch literal 94208 zcmeHQ3v?URnbznvdMgluDS>7}Vr(aNqK9Qm4iM~kYzsS*k>%JX7^BC?DA=+i$$9W7 zwc}7Il$Jh#Qp!S0Pt%r`M=7v9J?*v}%C^v!lU>qfTiD$$hxU*h3VkdTO4<8sMwTX$ zGdJMX-UE82mG4+;1N;H4}^Rt5% zwx)d}GuN9;#f5!i`({#O*~s{q_)AD-)BDG!r;@2`{!9j)j!p=9x=|B%+$R{|@^Fc)J}qQsCbDD6ctZ(#sD}^nTq-gta2IlQMG>mw+5??jAQG#i%(r)P z!7i?o@8i+U=3_2hD+{nI7~=T$Z9QBlI3zpK!FT%uvLm2JQxms6lD?WtPwnL*nfxtq z$&?rcDFjjLl8aHy_)r){FdW2zLwE&=8@X6yVj?va7aF;2a#F}-Ba_pO`52TYz>fC5 zL4Kel4mr|bU+1QUv}@$xXMsE($qEoaeO+p5Z(Tzp=WN(wD&bKO0(ns~Ny(K`qI89V z+Y5%`M87OV3YkV0$t^M(KF~QQnAp`#YYYdC#YV{mTjUn_UCt^+I2rNc-hnG*(DME$ zN#a~cxvVF<8j=A%#pw%~rIFhcnV6|e5{vvGs}x4rlosP9e&=*HE~axbl@Zc~0%lR2 zMV##Ri*%M0%jePMrgMNFD$YvqQ`8y$T1;mL%2CJboHVej)*wIE?nw%J3k9uIu;*R8 z!_N;Da!viNshM;tGJ#6qMs6YRH0Fw3$w>`dySxw>KxN{#uq=3p%J%l*A~uxk%hSoO zgDfmN07QkTm%6??6ddg5Iz~z^Dg;(?3e=Px>}lWO4|K~8LAZ<6D8W!CsvtT>xPn%5 zyI6L$yVh`!EiOC6IAl;&xL|APP&sGI(X!YqZf@x!85*-3$%j`eD%?DLii?s$X};8| zqKv-_8HXke z5A80)@eFaOEI}B zO$oG6=qLfDE&xWdqINp(?+)Y)Rw(HjxDelkIvRm({D5fRe7Ui3)AE*aL%&)N={1r| zE-A$`!tNO%6%&s8sgWhuaVP6@wLH4AVeS%!t#LY;`K<*DNU;$60XY}Qj;LkKrRVT^2RJyG#=JI+y%`I+EEaDLauQSoy+7d~$x`n7K;feV?-ezCKDMVbp79kdkxTDT! zb4$V@xLcg9K4&!HGDXtknKmv*9-f(=C@}ln?u0iMO*ox#N5T>Fxx5aC&mE0M-5!^> zrBx83krt2Bo$v_>cP!!d3C)S9+tJ$EDtH86ED<*~aSl@g;+dREjX|~6#_bz-J7V3# z8Aqx&;2n=guNmnY+TNRt2lfv{$44&9j(g&v9j@puk9Y5dg=?WKY2y;6tM?W~wkth# zjgXp*OfN{`in^M;9#72einO$PTYN~9ge#JWIUJsNv>C;!#qIVsw>lE7tsbA(C3qt7 zXsgrF5>NOX&REpTU3O z+1|d6p2@5GdwAdQzK&~#)4_4xb;&?7(!>umbxgNjw!OpylaXZVgJ=V{yRF$duhmf- zP#er^wbTaG2J>1CwLw+eKx!MPsST(N=Cus9!3W$1sN-sNqA$w^^krU$zD(z!FXJlo zWw4dwVrNy#&Vs-X@1rG1#`byXZo&b%Cz42qVYE4HpAP7ZyKV88umGMmTl1=)!(Nd z)SGla&}DTE+LyJT(e`NG(>$sf*PNq1tiDOTg?XL1pBZATs;5-@At?0e@u1 zQcS&0U|OX-lPl8+rZvk*)$1ThwwWMV&n8K*n)<{<$H6DBb*i5|?H#t~V;@eahmUQr zn!j}ThyQ%`i}&u6-DX)kG?8NRHzKBWNctx~*ZJ`~cMd=Q<`sW>@a`8TH@y7CfBN#_ z7w3NZhr4_4>ppk?TSZS>lnS4liS=)ksFc=ZU{Gx!N7X=%s-7HG9XYD=C8}Fq|4#Rj zsT&`C&GFfnUj5D1zeUe^?7o+tUsHSVCqI6&Lsc@mXUfsCQm zT5?nzIjS|}s6I@N>RfVE=a8dXEm0}0AHp!h+2p7`M2_k#iAt%y!MMR1a#UwZR7~?ZgHgZ%}a#R+HYDqPYsFj%< zm5CgcksOtQ92Kk&hGs1R*pSJ@3S1)6T5$FWc_?ZScxxGGG7NbC>?;dGGEA&0f_# zETdCd80P)=P3W%F_Ft;(e??#PLlvM3Pz9(0Q~|00Re&l$6`%@G1*ig40jj`qQeeHN zk5L*1P-`?6Bh3G^%wd)FFXlfOkFu}mUeX*vKj~-LDnR%Dqx=8S{r~9xe{}ypx(#HT zv<)QP|Bvnligp8~`~Ow94RVWz=e2bIKWYQCMKs<2kJ_MF?Ek-_vi{Zlw((Ew8@gkf z-!ZQ&+s-eIco_xe8F>7Qz7e2G-v}V~0T_60k9wfGJpd0}QV&$O2jEFr>VYctfY<|M z;1Or)f$H`EJey8EP~9GYO$De2s@Vf#|NmvN|NkcYn(kH2ZH_*s(9-HaS_}f4-BBA<@2g>@F(BBekoNtmv__ETgkXnB>VazZ0Gb;Tlc&3fZm55#GscVGPWMsN)CKbz0rQ6zEQCG9q69)Oo0P!CkM2jHz0)C1M+0eIaA^+1(+AlD|O z{r{8fOfN(DJOghDgM6;^#*by#|92PK?sOHviTeNWdN=BUYA);N8F()r^+0ud0A8p_ zJy6{qfHz@M4^+1Y;1#Ce0XipCy<@@f?pf-ADlG?cW5KllzYII{CmH2=23{FY>-%cX z=krVh^*}tZzq&n8Pd!le9;l-psCo~aPd!le9;l@rs9q09>jBqO4^+7aX#f8tJJZV$ zKF>I%eEyd&zuzLf6Cd7lVB^RJ{ipB@fUwq18M8zma;NddmSi|Gy0T@~0Bz zdB#iY`zp`p^GqxCK-GJog?gasJ>a1psCo}HQx8@C%h_R}k zLP3e29%zTu*_EG3Pr#l_$*I&BY|WH#Ib5zLhqKAy=A7=f+%5D#ZAOL(%0x0ukh6#| z^}^hJ-72?X!{Ixg-FnC64_IHh`_NcTecd}RJu&yzFCKmRk!KD)E{wkMluW0|NHSGM zs$Q4`CPoIciKNJSU|I?UQ+gatcAI7G0FxU;nEWtAz79#Rv|+Z8%BJ^^O;06L*;3(i z!)$&SX$7jgAAiuS-|+5P&YJ6Qc=&rq?(6^ly&Kb;?X#~=r0=@@wdtQ@P{G3-gs9+| z076u-w44wXtWYLI1q)q)N@>G84B275Bq1tTc1MT`cJ?4d1sg;FmC}R|hE}l803j-P zT%8aVJoQY73Lc&%LRO?`v0XCf^LP`_R7@2RPX{BLR9b`5<*n)ItxNn@Ja+iRIsf+P%Wv( z5oL$n*$Gj>3!n*6!Fz-WQNinIfoe%Lj;Iy9)szqwyd0Ad6})4S5EZpl0xh%rTpF-3?mT}g~-j1*Jv6~Khi5S69fB7+}2qf)e#nxZnYm^9mbBQFFQfoqXn`Mh(KQX2riD@Zqgj}1r zr<)j47cnND7*i)PrfsB{dOLt=>1_lt`P!vuEv1bhGF?oJX)7_Ni-<8@NQ~(MiD@Zq z1X1xV#F*NMG5Lrw!QyEmQ@K5`1eXjGEaW7@1pWUFs#{g|`|TaJ|FzBAYOO!Ej#~|u zuURfJA2r`m`H z@rP$sz~7`iqPlqe%Z!nSZ>R|CM=HX4eMMNWs|f40m0%sc23V1Ym6ixB?tv3dnCzG6 zi8sM=U4_wom8IWXS^7PdrJt!ReO96;-g*?1Hd9&pbY~+RZ)8A{~IkwRQCJq7u)_Dt^Rjee`TGwHlaiGLlvM3Pz9(0Q~|00 zRe&l$6`%@G1(tyV;aPYTisVRXZh$vD3y(!rfEFH+ssJrKE>!_qcyy`)wD1@eaoW*Y zc%-T#ot9h(sX|#4(Ln(DK%6# z;(#{bfEsW>^*Eq99MCEp&>1+Ol{lc&aX_a5NNK|Td>l|M4ro0NXdMn{1rErL1G3?O ztN>D)96t{Sv=#@%;egiQfIf@^Iu{3Y4i3nI12W@)OgJDT4#`$kgF@YmActM3tQ8^k(ujFrsBfBJnfkHs|cAtlR=?ICxn%fIQ~E;w6GB#q%PyTi-^{LFS8KS*o=qpm z$Axq*h9WZerzm|!x{Ywi-`&lJ#E6L~&L0>E^X;8nu#4+NNr;?~kGXWMEWoZ{h~wM0 z^>Cr!knBVU-|Y{`jzC&8HF4V`>8rW))Lt%<$=?E(Oo>sDLJ;LCxfsQa4~0<#!$BN4 zgjbNbk&8tpCQ?&zp^<~_C(yde$?3*?3`!GVM|DkaAg1b~Pjee2UW-@_8edD^Mzu#3Db) zDuq!trNwxO-#ML)i|L$9WdyVrO1?x}RA&(A!ziVnHor+B4YP5yC)0iuEB`1|sZHrWE z+uMhW*if!7PbZ6Z*;;NPa%HlAcPKd6&vlH*SU`wEU?rzOP1(Vo_8tB}x9kv5ELNig zL!GFC=osM&TFvcZ+12h^!$G#V>=5ISK~>>`t))ZdoGnMoVzaoprHf=}%yJ|j9$ryW zD9x8zRh02}fxM&uDQE1Ao?TUo#BN$JcK%+VK>8)i{x%OaY_e3%)j74U$Xlr0-E#~rQK0i2%f9dDeYuTFG zT4vsyyT9Ud#eeJad7{0ej~A-<6gMEEmf5^YRJv74g z@*`aPU^wUxAm;6SAlz6&b5$gB71z-h>?mA481P>@$d^LJG#WcWz@^uqn&(6p$fDD5 zV3UTe*|?Fpab-cKLRFtrs_;`=DpuiS<@Az7K&s0`E4_0lV!1FsjFLo3mQqZvDpLY2 z6gEmgsp(&0hthCS`<(Z82Xg)>G?p5;5Z{IROMz|tfUGH91d%6LzgiFJHIhp%DMMt0 z-7`WeCLH%uBTKI1PS)pYcXVaL+$9WK<8(6fTMHJDl?^!;$d0IG%%$i50lbhekCaky z$w@6w9+?04vcFT=e}#4cydCWU*o}4pd=Kpd_-(WkVA9rUTZ483d;#qTcnjJQupR9Q z_!im~@SA8~z?kJC%ZJQ=GXKK-b@M^wxWaJM@MFXMhU*MN2A9E#0;C_R09Al0Koy`0Pz5Tk08AJt?Xece0mX1YQ5;YN z2XrM4XbcC`g9GZu0d?VkcpOkC4rm*Il&1f$zyWmtNNM_i6bE!U4rnJ1=rSD82o9(n z2XrwGXe$orA{@|#08*O%AI1R<;edAFfCh0u7vO-l;DFk2Kt2E|P5*~+Km$0S5Dw^4 z98fIG{~9pr+jO|GF`i?Jlds%$hbE>kNLjRUbu%=!Ysm6`%@G1x{UoIn`=@dgt;i_9&k7!w?Z1HGgZY`;}ERQaK6m=l9%(Tq% zFp>Wxmp_X0|Ido^|DUq&vh#M%_O9&(+e2tCz_@Lz?QH9xtvv^gYv^rroAqQ?2oRT_7A z&tjoIgN6Du7V37mMWxv9#{uob0qw;B?ZE-f!0jo`_p3gIh595G>NYIYCm`}l@ygD!LeGChABgmzs#54|Q3I~+J0Zrn7CU8Jk x="2030-01-02" entry_point function_name) => + start_time { + entry_point { + function_name { + ... + } + } + } +""" +expected = sorted([ + ('control', 0o100200), + ('refresh', 0o100500), + ('store/2030-01-02T13:37:00+00:00/main/f/' + 'i-61R9ZvhrLqMJTd5uI__mIctKDd0B8lhcnGuEK46wg=', 0o100600), + ('store/2030-01-02T13:37:00+00:00/main/f/' + 'mrasCwYq-xgxF31_sLzWoFC-nityLw-B5mxymJHqcM4=', 0o100600), + ('store/2030-01-02T13:37:00+00:00/main/main/' + 'KtKWyz-WXKQN5JqoXxcoJ_rdt4L3z7LFsceotmVd8MM=', 0o100600), + ('store/2030-01-03T13:37:00+00:00/main/f/' + 'FyhmbEKFuHsxAssR-gdIhx5mAljsKg0LotLBuRrZnng=', 0o100600), + ('store/2030-01-03T13:37:00+00:00/main/main/' + 'uJVEipdbWNDxXvSsmtVtKt6dn6PS2YLK8ppLoEcdefc=', 0o100600), +]) + + +def tree(path): + files = sorted( + os.path.relpath(os.path.join(dir, file), start=path) + for dir, _, files in os.walk(path) for file in files) + return [(f, os.stat(os.path.join(path, f)).st_mode) for f in files] + + +@contextlib.contextmanager +def simple_store_mnt(): + @xun.function() + def f(*args, **kwargs): + return args, kwargs + + store_contents = { + f.callnode('a'): ('hello', {'tag': 'tag'}), + f.callnode('b'): ('world', {'tag': 'tag'}), + f.callnode('c'): ('goodbye', {'tag': 'tag'}), + } + + with contextlib.ExitStack() as stack: + tmp = stack.enter_context(tempfile.TemporaryDirectory()) + + store = xun.functions.store.Disk(os.path.join(tmp, 'store')) + for callnode, (value, tags) in store_contents.items(): + store.store(callnode, value, **tags) + + mnt_pnt = os.path.join(tmp, 'mnt') + mnt_store = os.path.join(mnt_pnt, 'store') + os.mkdir(mnt_pnt) + stack.enter_context( + xun.fs.mount(store, '() => ...', mnt_pnt, capture_output=False) + ) + + callnodes = [callnode for callnode in store_contents] + yield mnt_pnt, store, callnodes, f + + +@pytest.mark.skipif(not xun.fs.fuse_available, reason='Fuse not available') +def test_filesystem(): + with tempfile.TemporaryDirectory() as tmp, xun.fs.mount(store, query, tmp): + assert tree(tmp) == expected + + +@pytest.mark.skipif(not xun.fs.fuse_available, reason='Fuse not available') +@pytest.mark.parametrize('cmd', [ + ['python3', '-m', 'xun', 'mount', *store_args, *query_args] + for store_args in [ + ['-s', 'disk', 'xun/tests/test_data/xun-fs-store'], + ['--store', 'disk', 'xun/tests/test_data/xun-fs-store'], + ['--store-pickle', urlsafe_b64encode(pickle.dumps(store)).decode()], + ] + for query_args in [ + ['-q', query], + ['--query', query], + ['--query-file', 'xun/tests/test_data/query.xunql'], + ] +]) +def test_filesystem_cli(cmd): + with tempfile.TemporaryDirectory() as tmp: + cmd += ['--', tmp] + proc = subprocess.Popen(cmd) + timeout = 5 + try: + from xun.fs.filesystem import wait_for_ctrl + wait_for_ctrl(tmp, timeout=timeout) + except TimeoutError: + msg = f'control file not mounted after {timeout} seconds' + raise RuntimeError(msg) + finally: + proc.terminate() + try: + proc.wait(5) + except subprocess.TimeoutExpired: + proc.kill() + + +@pytest.mark.skipif(not xun.fs.fuse_available, reason='Fuse not available') +def test_filesystem_control_refresh(): + with simple_store_mnt() as (mnt_pnt, store, callnodes, f): + hashes = sorted(callnode.sha256() for callnode in callnodes) + def ls(): + return sorted(os.listdir(os.path.join(mnt_pnt, 'store'))) + assert ls() == hashes + + new = f.callnode('d') + store.store(new, 'hello', tag='tag') + assert ls() == hashes + + refresh = os.path.abspath(os.path.join(mnt_pnt, 'refresh')) + subprocess.check_call(refresh) + assert ls() == sorted(hashes + [new.sha256()]) + + +@pytest.mark.skipif(not xun.fs.fuse_available, reason='Fuse not available') +def test_delete_mounted_deletes_store(): + with simple_store_mnt() as (mnt_pnt, store, callnodes, f): + a = callnodes[0] + + assert a in store + os.unlink(os.path.join(mnt_pnt, 'store', a.sha256())) + assert a not in store + + +@pytest.mark.skipif(not xun.fs.fuse_available, reason='Fuse not available') +def test_rmdir_deletes_store_items(): + with contextlib.ExitStack() as exit_stack: + mnt_pnt = exit_stack.enter_context(tempfile.TemporaryDirectory()) + (store, callnodes) = exit_stack.enter_context(create_instance(TmpDisk)) + exit_stack.enter_context( + xun.fs.mount(store, query, mnt_pnt, capture_output=False) + ) + + path = os.path.join( + mnt_pnt, + 'store', + '2030-01-02T13:37:00+00:00', + ) + assert callnodes.f_0 in store + assert callnodes.f_1 in store + assert callnodes.main_0 in store + shutil.rmtree(path) + assert not os.path.exists(path) + assert callnodes.f_0 not in store + assert callnodes.f_1 not in store + assert callnodes.main_0 not in store