diff --git a/requirements.txt b/requirements.txt index 10dd0cd0..ea89ed15 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,4 +9,5 @@ numpy pandas paramiko pyparsing +pytz yapf diff --git a/xun/__init__.py b/xun/__init__.py index 10068a41..7a57b3d2 100644 --- a/xun/__init__.py +++ b/xun/__init__.py @@ -7,10 +7,6 @@ from . import fs from . import serialization -from .core import ExportError -from .core import SchemaError -from .core import args_hash -from .core import filename_from_args from .functions import ComputeError from .functions import CopyError from .functions import Function diff --git a/xun/cli.py b/xun/cli.py index 1f244de1..453490ba 100644 --- a/xun/cli.py +++ b/xun/cli.py @@ -15,6 +15,7 @@ Also see (1) from http://click.pocoo.org/5/setuptools/#setuptools-integration """ from pathlib import Path +from textwrap import dedent import argparse import sys @@ -53,6 +54,38 @@ def main(args=None): parser_fgraph_action.add_argument('--dot', action='store_true') +# +# XunFS +# +try: + from .fs import cli as fs + parser_mnt = subparsers.add_parser('mount', description=dedent('''\ + arguments after a bare -- are forwarded to fuse as is. The fuse + argument for mountpoint is required. + + Example + ------- + xun mount -s disk /path/to/store -q '() => ...' 
-- /mnt/pnt + '''), formatter_class=argparse.RawTextHelpFormatter) + parser_mnt.set_defaults(func=fs.main) + parser_mnt_store = parser_mnt.add_mutually_exclusive_group(required=True) + parser_mnt_store.add_argument('-s', '--store', + nargs='+', + action=fs.StoreAction) + parser_mnt_store.add_argument('--store-pickle', + type=fs.store_pickle, + dest='store') + parser_mnt_query = parser_mnt.add_mutually_exclusive_group(required=True) + parser_mnt_query.add_argument('-q', '--query', + nargs='+', + action=fs.QueryAction) + parser_mnt_query.add_argument('--query-file', + type=fs.query_file, + dest='query') + parser_mnt.add_argument('fuse_args', nargs=argparse.REMAINDER) +except NotImplementedError: + pass + # # create new project from cookiecutter template # diff --git a/xun/core.py b/xun/core.py deleted file mode 100644 index 0fdd5868..00000000 --- a/xun/core.py +++ /dev/null @@ -1,55 +0,0 @@ -import hashlib - - -class ExportError(Exception): pass -class SchemaError(Exception): pass - - -def args_hash(args): - """Hash command line arguments - - Converts full command line arguments to a hash to use as filename - - Parameters - ---------- - args : argparse.Namespace - The argument namespace as parsed by argparse - - Returns - ------- - str - Hash digest value as a string of hexadecimal digits - """ - args = vars(args) - strings = sorted('{}={}'.format(k, v) for k, v in args.items()) - param_str = ','.join(strings) - - return hashlib.sha256(param_str.encode('utf-8')).hexdigest() - - -def filename_from_args(args, prefix='', postfix=''): - """Filename from command line arguments - - Parameters - ---------- - args : argparse.Namespace - The argument namespace as parsed by argparse - prefix : str, optional - Prefix of the file name - postfix : str, optional - Postfix of the file name - - Returns - ------- - str - File name - - Examples - -------- - - >>> filename_from_args(args, prefix='prefix', postfix='.txt') - 
class StoreAction(argparse.Action):
    """Build a store from the values given to ``-s/--store``.

    The first value names a constructor on `StoreConstructors`; any
    remaining values are passed to that constructor as positional
    arguments. The constructed store is assigned to ``self.dest`` on the
    namespace. Invalid input is reported through ``parser.error``, which
    prints the usage message and exits.
    """

    class StoreConstructors:
        """Namespace of the store constructors selectable by name."""

        @staticmethod
        def disk(path):
            """Return a `Disk` store rooted at `path` (directories are not
            created)."""
            from ..functions.store import Disk
            return Disk(path, create_dirs=False)

    def __call__(self, parser, namespace, values, option_string=None):
        store_name, *args = values

        # Only public attributes are store constructors. Without this guard
        # getattr also resolves names such as ``__class__``, and
        # ``-s __class__ x`` would silently produce ``str`` as the "store".
        constructor = None
        if not store_name.startswith('_'):
            constructor = getattr(
                StoreAction.StoreConstructors, store_name, None
            )
        if not callable(constructor):
            parser.error(f'Invalid store ({store_name}) arguments ({args})')

        try:
            store = constructor(*args)
        except Exception as e:
            # Keep the best-effort behaviour of reporting any constructor
            # failure as a usage error, but include the cause so that the
            # user can tell a wrong argument count from a missing directory.
            parser.error(
                f'Invalid store ({store_name}) arguments ({args}): {e}'
            )
        setattr(namespace, self.dest, store)
@contextlib.contextmanager
def mount(store, query, mountpoint, capture_output=True, timeout=5):
    """Mount a store as a XunFS file system for the duration of a ``with``.

    Starts ``xun mount`` in a child process, waits until the ``control``
    file is visible under `mountpoint`, yields the mount point, and stops
    the child process when the context exits.

    Parameters
    ----------
    store : object
        Store to expose. It is pickled and passed to the child process
        through ``--store-pickle``, so it must be picklable.
    query : str
        Query passed to the child process through ``--query``.
    mountpoint : str or pathlib.Path
        Directory to mount the file system on.
    capture_output : bool, optional
        Capture the child's stdout and stderr so they can be included in
        the error raised when mounting fails.
    timeout : float, optional
        Seconds to wait for the ``control`` file to appear.

    Yields
    ------
    pathlib.Path
        The mount point.

    Raises
    ------
    RuntimeError
        If the ``control`` file does not appear within `timeout` seconds.
    """
    import base64
    import pickle
    import sys

    encoded_store = base64.urlsafe_b64encode(pickle.dumps(store)).decode()
    cmd = [
        # Run the child with the interpreter that is running this code; a
        # bare ``python3`` from PATH may be an installation without xun.
        sys.executable or 'python3',
        '-m',
        'xun',
        'mount',
        '--store-pickle', encoded_store,
        '--query', query,
        '--',
        str(mountpoint),
    ]
    kwargs = {}
    if capture_output:
        kwargs = {'stdout': subprocess.PIPE, 'stderr': subprocess.PIPE}

    proc = subprocess.Popen(cmd, **kwargs)
    try:
        # Only a timeout while waiting for the mount is a mount failure. A
        # TimeoutError raised by the caller's ``with`` body must propagate
        # unchanged, so the yield is kept outside this inner try.
        try:
            wait_for_ctrl(str(mountpoint), timeout=timeout)
        except TimeoutError as exc:
            # communicate() waits for the child to exit, and the child may
            # still be running, so it has to be stopped first.
            _stop_process(proc)
            if not capture_output:
                raise RuntimeError('failed to mount') from exc
            out, err = proc.communicate()
            msg = '\n'.join([
                'xun mount stdout:',
                out.decode(errors='replace'),
                '',
                'xun mount stderr:',
                err.decode(errors='replace'),
                '',
            ])
            raise RuntimeError(f'failed to mount\n{msg}') from exc
        yield Path(mountpoint)
    finally:
        _stop_process(proc)


def _stop_process(proc):
    """Terminate `proc`, escalating to kill if it has not exited after 5 s."""
    proc.terminate()
    try:
        proc.wait(5)
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.wait()


def wait_for_ctrl(path, timeout):
    """Block until ``<path>/control`` exists.

    Parameters
    ----------
    path : str
        Directory in which the ``control`` file is expected.
    timeout : float
        Maximum number of seconds to wait.

    Raises
    ------
    TimeoutError
        If the file has not appeared when `timeout` seconds have elapsed.
    """
    interval = 0.1
    ctrl_path = os.path.join(path, 'control')
    # A monotonic deadline measures real elapsed time, including the time
    # spent in the existence checks, and is unaffected by clock changes.
    deadline = time.monotonic() + timeout
    # Check before sleeping so an already mounted file system returns at once.
    while not os.path.exists(ctrl_path):
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(
                f'{ctrl_path} did not appear within {timeout} seconds'
            )
        time.sleep(min(interval, remaining))
def write(self, buf, offset):
    """Run the commands written to the control file.

    `buf` is split into lines and each line is stripped of surrounding
    whitespace. Every line must be a key of ``self.commands``; the whole
    write is validated before any command is run.

    Parameters
    ----------
    buf : bytes
        Data written to the file.
    offset : int
        Write offset (not used).

    Returns
    -------
    int
        ``len(buf)`` on success, ``-errno.EINVAL`` if any line is not a
        known command (in which case no command is run).
    """
    requested = [raw.strip() for raw in BytesIO(buf).readlines()]

    # Reject the write as a whole rather than running a valid prefix.
    if any(name not in self.commands for name in requested):
        return -errno.EINVAL

    for name in requested:
        self.commands[name]()
    return len(buf)
def rmdir(self, path):
    """Remove the node at `path` from the file system graph.

    Parameters
    ----------
    path : str
        Path of the node to remove.

    Raises
    ------
    Exception
        Whatever ``self.graph.remove_node`` raises is propagated after the
        graph has been rebuilt with ``self.refresh()``.

    Notes
    -----
    NOTE(review): only ``self.graph`` is modified here, nothing is removed
    from the store, so the node presumably reappears on the next refresh;
    confirm that this is intended.
    """
    try:
        self.graph.remove_node(path)
    except Exception:
        # Rebuild the graph so that a failed removal cannot leave it in a
        # partially modified state. ``Exception`` rather than a bare
        # ``except`` so KeyboardInterrupt/SystemExit are not intercepted.
        self.refresh()
        raise
def compute_proxy(store, func, tags):
    """Wrap `func` in a function that computes and stores a call node.

    Dask 'handles' functools.partial so that we can't use it, hence a new
    function is created instead.

    Parameters
    ----------
    store : Store
        Store passed on to ``Driver.compute_and_store``.
    func : callable
        Function to compute; its metadata is copied onto the wrapper.
    tags : dict
        Tags passed on to ``Driver.compute_and_store``.

    Returns
    -------
    callable
        Function of one argument, the node, which computes and stores the
        node's result and returns the node itself.
    """
    @functools.wraps(func)
    def run_and_return(node):
        Driver.compute_and_store(node, func, store, tags)
        return node

    return run_and_return
@staticmethod
def timestamp():
    """Return the current time in UTC as an ISO 8601 string.

    Returns
    -------
    str
        Timezone-aware timestamp with a ``+00:00`` offset.
    """
    # Build an aware datetime directly. ``utcnow()`` returns a naive value
    # and ``astimezone()`` interprets naive values as local time, so
    # combining them shifts the result by the host's UTC offset.
    return datetime.datetime.now(datetime.timezone.utc).isoformat()
isinstance(node, CallNode): @@ -36,7 +37,8 @@ def _exec(self, logger.info('Running {}'.format(node)) try: - self.compute_and_store(node, func, store) + tags = self.create_tags(func, entry_call, node, start_time) + self.compute_and_store(node, func, store, tags) except Exception as e: logger.error( '{} failed with {}'.format(node, str(e)) diff --git a/xun/functions/function.py b/xun/functions/function.py index a63bd1fc..552c2ad2 100644 --- a/xun/functions/function.py +++ b/xun/functions/function.py @@ -258,11 +258,11 @@ class Function(AbstractFunction): function : Function decorator to create xun functions """ - def __init__(self, desc, dependencies, max_parallel): + def __init__(self, desc, dependencies, tags): self.desc = desc self._dependencies = dependencies self.interfaces = {} - self.max_parallel = max_parallel + self.tags = tags self._global_resources = {} self._worker_resources = {} self._hash = self.sha256() @@ -304,7 +304,7 @@ def globals(self): } @staticmethod - def from_function(func, max_parallel=None): + def from_function(func, **tags): """From Function Creates a xun function from a python function @@ -313,18 +313,14 @@ def from_function(func, max_parallel=None): ---------- func : python function The function definition to create the xun function from - max_parallel : int - The maximum parallel jobs allowed for this function + **tags : Dict[str, Callable[*args, **kwargs, str]] + Custom tag functions Returns ------- Function The `Function` representation of the given function """ - if max_parallel is not None: - msg = 'Limiting parallel execution not yet implemented' - raise NotImplementedError(msg) - desc = describe(func) dependencies = { g.name: g @@ -332,7 +328,7 @@ def from_function(func, max_parallel=None): if isinstance(g, AbstractFunction) } - f = Function(desc, dependencies, max_parallel) + f = Function(desc, dependencies, tags) # Add f to it's dependencies, to allow recursive dependencies f.dependencies[f.name] = f @@ -381,6 +377,7 @@ def 
callable(self): yields, globals=self.globals, hash=self.hash, + tags=self.tags, original_source_code=self.code.source, interface_hashes=frozenset( i.hash for i in self.interfaces.values() @@ -395,7 +392,7 @@ def interface(self, func): return interface -def function(max_parallel=None): +def function(**tags): """xun.function Function decorator used to create xun functions from python functions @@ -411,8 +408,12 @@ def function(max_parallel=None): ... def get_b(): ... return 'b' ... - >>> @xun.function - ... def workflow(prefix): + >>> @xun.function(custom_tag_name=''' + ... Format string supplied with function arguments, in this case + ... 'prefix' and 'another_argument', which would be {0} and {1} + ... respectively''' + ... ) + ... def workflow(prefix, another_argument): ... return prefix + a + b ... with ...: ... a = get_a() @@ -431,7 +432,7 @@ def function(max_parallel=None): xun function created from the decorated function """ def decorator(func): - return Function.from_function(func, max_parallel) + return Function.from_function(func, **tags) return decorator @@ -545,5 +546,6 @@ def callable(self): self._callable = xform.assemble(self.desc, interface, globals=self.globals, - hash=self.hash) + hash=self.hash, + tags=self.target.tags) return self._callable diff --git a/xun/functions/function_image.py b/xun/functions/function_image.py index 395abb37..74943fb3 100644 --- a/xun/functions/function_image.py +++ b/xun/functions/function_image.py @@ -81,6 +81,7 @@ def __init__(self, annotations, module_name, globals, + tags, referenced_modules, original_source_code=None, interface_hashes=frozenset(), @@ -92,6 +93,7 @@ def __init__(self, self.__annotations__ = annotations self.__module__ = module_name self.globals = globals + self.tags = tags self.referenced_modules = referenced_modules self.hash = hash self.original_source_code = original_source_code @@ -145,6 +147,7 @@ def from_description(desc, hash=None): desc.annotations, desc.module, desc.globals, + None, 
def from_sha256(self, sha256):
    """Load the key stored under a sha256 digest.

    Parameters
    ----------
    sha256 : str
        Name of the file to read in the store's ``keys`` directory.

    Returns
    -------
    object
        The result of ``serialization.load`` on that file.

    Raises
    ------
    KeyError
        If ``<dir>/keys/<sha256>`` is not an existing file.
    """
    key_file = self.dir.joinpath('keys', sha256)
    if key_file.is_file():
        with key_file.open() as stream:
            return serialization.load(stream)
    raise KeyError(f'KeyError: {str(sha256)}')
NotImplementedError + return self._tagdb.query(*conditions) def _store(self, key, value, **tags): + self._tagdb.update(key, tags) + with tempfile.TemporaryDirectory(dir=self.dir) as tmpdir: tmpdir = Path(tmpdir) @@ -91,9 +105,126 @@ def remove(self, key): paths = self.paths(key) paths.key.unlink() paths.val.unlink() + self._tagdb.remove(key) def __getstate__(self): return self.dir def __setstate__(self, state): self.dir = state + self._tagdb = DiskTagDB(self) + + +class DiskTagDB(TagDB): + def __init__(self, store, create_dirs=True): + super().__init__(store) + self.dir = store.dir / 'db' + if create_dirs: + self.dir.mkdir(parents=True, exist_ok=True) + + def refresh(self): + reconciled = False + with contextlib.ExitStack() as stack: + stack.enter_context(self.savepoint()) + databases = {} + for f in self.dir.iterdir(): + checkpoint_hash = base64.urlsafe_b64decode(f.name.encode()) + if f.is_file() and not self.has_checkpoint(checkpoint_hash): + try: + uri = f'file:{str(f)}?mode=ro' + + # Connecting to the database opens a file handle. In + # the case of the file being deleted, we still have a + # handle to it and can read from it. + con = sqlite3.connect(uri, uri=True) + + # Attach our memory database to this connection + # imediately as this will fail with a DatabaseError if + # the database we just connected to is not a valid + # database. 
+ con.execute( + f'ATTACH DATABASE \'{self.uri}\' AS _xun_main' + ) + + # Keep connections (and file handles) alive until we + # leave the context + ctx = contextlib.closing(con) + databases[f] = stack.enter_context(ctx) + except sqlite3.OperationalError: + pass # Database was deleted before we could connect + except sqlite3.DatabaseError: + pass # File is not a database + for con in databases.values(): + self.reconcile(con, read_from=None, write_to='_xun_main') + reconciled = True + + self.create_views() + if not reconciled: + return + checkpoint_name = self.checkpoint() + + if self.dump(checkpoint_name): + # Now that our changes have been reconciled and dumped. Delete any + # databases ours is comprising. + for db in databases: + print('unlinking', db) + db.unlink() + + def reconcile(self, con, read_from=None, write_to=None): + print('RECONCILING') + if read_from is None and write_to is None: + raise ValueError('Cannot read and write to main database') + + def latest_common_checkpoint(): + _, checkpoint = con.execute(''' + SELECT MAX(this.journal_id), this.hash + FROM _xun_main._xun_journal AS this + INNER JOIN _xun_journal as other + ON + this.hash = other.hash AND + this.journal_id = other.journal_id + ''').fetchone() + return checkpoint + + with self.savepoint(): + checkpoint = latest_common_checkpoint() + + diff_con_results, diff_con_tags = self.diff(checkpoint, con=con) + diff_mem_results, diff_mem_tags = self.diff(checkpoint) + + new_results = sorted( + diff_con_results + diff_mem_results, + key=lambda el: (el[3], el[1]) + ) + new_tags = sorted( + diff_con_tags + diff_mem_tags, + key=lambda el: (el[4], el[1]) + ) + + self.reset_to_checkpoint(checkpoint) + + self.mem.executemany(''' + INSERT INTO _xun_results_table + (result_id, callnode, timestamp, deleted) + VALUES + (?, ?, ?, ?) + ''', new_results) + self.mem.executemany(''' + INSERT INTO _xun_tags_table + (result_id, name, value, timestamp, deleted) + VALUES + (?, ?, ?, ?, ?) 
def dump(self, name):
    """Write a copy of the in-memory database to ``self.dir / name``.

    The database is first backed up to a file in a temporary directory,
    and that file is then copied to its destination.

    Parameters
    ----------
    name : str
        File name of the dump inside ``self.dir``.

    Returns
    -------
    bool
        True if the dump was written, False if a file with that name
        already exists.

    Raises
    ------
    RuntimeError
        If the in-memory connection is in a transaction.
    """
    if self.mem.in_transaction:
        raise RuntimeError('Database Busy')

    destination = self.dir / name
    if destination.exists():
        return False

    with tempfile.TemporaryDirectory() as scratch:
        snapshot = Path(scratch) / 'sql'
        backup_con = sqlite3.connect(snapshot)
        # Close the backup connection before the file is copied.
        with contextlib.closing(backup_con):
            self.mem.backup(backup_con)
        shutil.copy(snapshot, destination)
    return True
def from_sha256(self, sha256):
    """Return the stored key whose sha256 digest equals `sha256`.

    Parameters
    ----------
    sha256 : str
        Digest to look up, compared against ``key.sha256()``.

    Returns
    -------
    object
        The matching key of the in-memory container.

    Raises
    ------
    KeyError
        If no stored key has that digest.
    """
    # The data lives in ``self._container``; ``self._store`` is the method
    # that writes to it and has no ``keys()``.
    for key in self._container:
        if key.sha256() == sha256:
            return key
    raise KeyError(f'KeyError: {str(sha256)}')
class TagDB:
    """Journaled tag database kept in a shared in-memory SQLite database.

    Results and tags are stored append-only: every insert or delete adds a
    row to ``_xun_results_table`` / ``_xun_tags_table`` and the views
    ``_xun_results`` / ``_xun_tags`` expose the latest non-deleted state.
    ``_xun_journal`` records checkpoints, each identified by a SHA-256 hash
    chained from the previous checkpoint hash and the rows added since.

    Subclasses provide `refresh` and `dump`.

    NOTE(review): the class does not derive from ``abc.ABC``, so the
    ``@abstractmethod`` markers below are not enforced at instantiation.
    """

    @abstractmethod
    def refresh(self):
        pass

    @abstractmethod
    def dump(self, name):
        pass

    def __init__(self, store):
        # The database name is derived from id(store); `cache=shared` lets
        # other connections in this process open the same in-memory database
        # through `self.uri`.
        self.uri = f'file:{id(store)}?mode=memory&cache=shared'
        self._lock = threading.RLock()
        # isolation_level=None puts the connection in autocommit mode, so
        # transactions are controlled explicitly through `savepoint`.
        self.mem = sqlite3.connect(self.uri,
                                   uri=True,
                                   isolation_level=None,
                                   check_same_thread=False)
        self.mem.executescript('''
            -- PRIVATE
            ----------

            CREATE TABLE _xun_results_table (
                journal_id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
                result_id BLOB NOT NULL,
                callnode TEXT NOT NULL,
                timestamp TEXT NOT NULL,
                deleted INTEGER NOT NULL
            );

            CREATE TABLE _xun_tags_table (
                journal_id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
                result_id BLOB NOT NULL,
                name TEXT NOT NULL,
                value TEXT NOT NULL,
                timestamp TEXT NOT NULL,
                deleted INTEGER NOT NULL,
                FOREIGN KEY (result_id) REFERENCES _xun_results(result_id)
            );

            CREATE TABLE _xun_journal (
                journal_id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
                hash BLOB NOT NULL UNIQUE,
                result_journal_id INTEGER NOT NULL,
                tag_journal_id INTEGER NOT NULL
            );
            INSERT INTO _xun_journal(hash, result_journal_id, tag_journal_id)
            VALUES (X'', -1, -1);

            -- RESULTS INTERFACE
            --------------------

            CREATE VIEW _xun_results(journal_id, result_id, callnode) AS
            SELECT
                MAX(journal_id), result_id, callnode
            FROM
                _xun_results_table
            GROUP BY
                result_id,
                callnode -- ? --
            HAVING
                NOT deleted
            ORDER BY journal_id;

            CREATE TRIGGER _xun_results_insert
            INSTEAD OF INSERT ON _xun_results
            BEGIN
                INSERT INTO _xun_results_table
                    (result_id, callnode, timestamp, deleted)
                VALUES
                    (NEW.result_id, NEW.callnode, datetime('now'), 0);
            END;

            -- TAGS INTERFACE
            -----------------

            CREATE VIEW _xun_tags(journal_id, result_id, name, value) AS
            SELECT
                MAX(journal_id), result_id, name, value
            FROM
                _xun_tags_table
            GROUP BY
                result_id,
                name
            HAVING
                NOT deleted
            ORDER BY journal_id;

            CREATE TRIGGER _xun_tags_insert
            INSTEAD OF INSERT ON _xun_tags
            BEGIN
                INSERT INTO _xun_tags_table
                    (result_id, name, value, timestamp, deleted)
                VALUES
                    (NEW.result_id, NEW.name, NEW.value, datetime('now'), 0);
            END;

            CREATE TRIGGER _xun_tags_delete
            INSTEAD OF DELETE ON _xun_tags
            BEGIN
                INSERT INTO _xun_tags_table
                    (result_id, name, value, timestamp, deleted)
                VALUES
                    (OLD.result_id, OLD.name, OLD.value, datetime('now'), 1);
            END;

            -- DB INTERFACE
            ---------------

            CREATE TRIGGER _xun_delete INSTEAD OF DELETE ON _xun_results
            FOR EACH ROW
            BEGIN
                -- Mark row as deleted in _xun_results_table
                INSERT INTO _xun_results_table
                    (result_id, callnode, timestamp, deleted)
                VALUES
                    (OLD.result_id, OLD.callnode, datetime('now'), 1);

                -- Mark row as deleted in _xun_tags_table
                DELETE FROM _xun_tags WHERE result_id = OLD.result_id;
            END;
        ''')

    def __del__(self):
        # `mem` is missing if __init__ raised before the connection was
        # opened; closing must not raise a second error in that case.
        mem = getattr(self, 'mem', None)
        if mem is not None:
            mem.close()

    @staticmethod
    def _check_tag_name(tag):
        """Reject tag names that would break ``[...]`` SQL quoting."""
        if '[' in tag or ']' in tag:
            raise ValueError('tag name cannot contain "[" or "]"')

    def tags(self, callnode):
        """Return the current tags of `callnode` as a ``{name: value}`` dict.

        The dict is empty when the callnode has no (non-deleted) tags.
        """
        self.refresh()
        result_id = callnode.sha256(encode=False)
        result = self.mem.execute('''
            SELECT name, value FROM _xun_tags
            WHERE result_id = :result_id
        ''', {'result_id': result_id})
        return dict(result)

    def query(self, *conditions):
        """Return the callnodes whose tags satisfy every condition.

        Each condition provides ``tag``, ``op`` and ``value`` attributes. A
        condition with ``op is None`` only requires the tag to be present.
        The per-tag views created by `create_views` must exist for every
        tag named in `conditions`.

        Raises
        ------
        ValueError
            If a tag name contains ``[`` or ``]``.
        """
        self.refresh()
        for c in conditions:
            self._check_tag_name(c.tag)

        # Join each tag view once, even when several conditions share a tag
        # (e.g. a lower and an upper bound on the same tag).
        joined_tags = list(dict.fromkeys(c.tag for c in conditions))
        joins = ' '.join(
            f'INNER JOIN [{tag}] ON [{tag}].result_id = '
            '_xun_results.result_id'
            for tag in joined_tags
        )

        # Parameters are named by position rather than by tag so that tag
        # names that are not valid SQL identifiers, and repeated tags, work.
        # NOTE(review): c.op is interpolated verbatim; presumably it is
        # restricted to comparison operators by the query parser - confirm.
        clauses = []
        parameters = {}
        for index, c in enumerate(conditions):
            if c.op is None:
                continue
            key = f'p{index}'
            clauses.append(f'[{c.tag}].[{c.tag}] {c.op} :{key}')
            parameters[key] = c.value
        where = 'WHERE ' + ' AND '.join(clauses) if clauses else ''

        result = self.mem.execute(
            'SELECT _xun_results.callnode '
            f'from _xun_results {joins} {where}',
            parameters
        )
        return [serialization.loads(r) for r, *_ in result if r is not None]

    def update(self, callnode, tags):
        """Replace the tags of `callnode` with `tags` and checkpoint.

        Nothing is written when the stored tags already equal `tags`.
        Otherwise the old entry is marked deleted, the new result and tag
        rows are journaled, per-tag views are created, and the resulting
        checkpoint is passed to `dump`.

        Raises
        ------
        ValueError
            If a tag name contains ``[`` or ``]``; the savepoint rolls back
            everything written by this call.
        """
        prev_tags = self.tags(callnode)
        if prev_tags == tags:
            # Don't need a savepoint if we're not altering anything
            return

        with self.savepoint():
            if prev_tags:
                self.remove(callnode)
            result_id = callnode.sha256(encode=False)
            serialized = serialization.dumps(callnode)
            self.mem.execute('''
                INSERT INTO _xun_results(result_id, callnode)
                VALUES(:result_id, :serialized)
            ''', {
                'result_id': result_id,
                'serialized': serialized,
            })
            for tag, tag_value in tags.items():
                self._check_tag_name(tag)
                self.mem.execute('''
                    INSERT INTO _xun_tags(result_id, name, value)
                    VALUES(:result_id, :tag, :tag_value)
                ''', {
                    'result_id': result_id,
                    'tag': tag,
                    'tag_value': tag_value,
                })
                self.create_views(tag)
            checkpoint_name = self.checkpoint()
        self.dump(checkpoint_name)

    def create_views(self, *tags):
        """Create the index and the view used to query each tag.

        With no arguments, views are created for every tag name currently
        present in ``_xun_tags``.

        Raises
        ------
        ValueError
            If a tag name contains ``[`` or ``]``.
        """
        with self.savepoint():
            if not tags:
                tags = [
                    tag for tag, in
                    self.mem.execute('SELECT DISTINCT name FROM _xun_tags')
                ]

            for tag in tags:
                # The tag name is used as a bracket-quoted identifier and,
                # through sql_literal, as a quoted string literal.
                self._check_tag_name(tag)
                literal = self.sql_literal(tag)
                self.mem.execute(f'''
                    CREATE INDEX IF NOT EXISTS [_xun_tag_index_{tag}]
                    ON _xun_tags_table(result_id)
                    WHERE name = {literal} AND NOT deleted
                ''')
                self.mem.execute(  # nosec
                    f'''
                    CREATE VIEW IF NOT EXISTS [{tag}](result_id, [{tag}]) AS
                    SELECT
                        result_id, value
                    FROM
                        _xun_tags
                    WHERE
                        name = {literal}
                    ''')

    def unique_tags(self):
        """Return the distinct tag names currently in use, sorted by name.

        The previous body was a copy of `tags` that referenced an undefined
        ``callnode`` and therefore always raised ``NameError``.
        """
        self.refresh()
        result = self.mem.execute(
            'SELECT DISTINCT name FROM _xun_tags ORDER BY name')
        return [name for name, in result]

    def remove(self, callnode):
        """Journal `callnode` and all of its tags as deleted."""
        result_id = callnode.sha256(encode=False)
        with self.savepoint():
            self.mem.execute('DELETE FROM _xun_results WHERE result_id = ?',
                             (result_id,))

    def checkpoint(self):
        """Record a checkpoint covering all rows journaled so far.

        Returns
        -------
        str or None
            URL-safe base64 encoding of the new checkpoint hash, or
            ``None`` when either journal table is still empty.
        """
        with self.savepoint():
            max_result_id, = self.mem.execute(
                'SELECT MAX(journal_id) FROM _xun_results_table').fetchone()
            max_tag_id, = self.mem.execute(
                'SELECT MAX(journal_id) FROM _xun_tags_table').fetchone()
            if max_result_id is None or max_tag_id is None:
                return

            # Hash of the most recent checkpoint; the new hash chains from it
            _, checkpoint = self.mem.execute('''
                SELECT MAX(journal_id), hash FROM _xun_journal
            ''').fetchone()
            diff_results, diff_tags = self.diff(checkpoint)
            sha256 = self.sha256(checkpoint, *diff_results, *diff_tags)

            self.mem.execute('''
                INSERT INTO _xun_journal
                    (hash, result_journal_id, tag_journal_id)
                VALUES (?, ?, ?)
            ''', (sha256, max_result_id, max_tag_id))
            checkpoint_name = base64.urlsafe_b64encode(sha256).decode()
            return checkpoint_name

    def has_checkpoint(self, checkpoint):
        """Return whether a checkpoint with hash `checkpoint` is recorded."""
        row = self.mem.execute(
            'SELECT 1 FROM _xun_journal WHERE hash = ? LIMIT 1',
            (checkpoint,),
        ).fetchone()
        return row is not None

    def diff(self, checkpoint, con=None):
        """Return the rows journaled after `checkpoint`.

        Parameters
        ----------
        checkpoint : bytes
            Hash of a checkpoint recorded in ``_xun_journal``.
        con : sqlite3.Connection, optional
            Connection to read from; defaults to ``self.mem``.

        Returns
        -------
        (list, list)
            Result rows and tag rows, each ordered by timestamp and
            result_id.
        """
        if con is None:
            con = self.mem
        result_journal_id, tag_journal_id = con.execute('''
            SELECT
                result_journal_id, tag_journal_id
            FROM
                _xun_journal
            WHERE
                hash = :checkpoint
        ''', {
            'checkpoint': checkpoint,
        }).fetchone()

        results = list(con.execute('''
            SELECT
                result_id, callnode, timestamp, deleted
            FROM
                _xun_results_table
            WHERE
                journal_id > :journal_id
            ORDER BY
                timestamp, result_id
        ''', {
            'journal_id': result_journal_id,
        }))

        tags = list(con.execute('''
            SELECT
                result_id, name, value, timestamp, deleted
            FROM
                _xun_tags_table
            WHERE
                journal_id > :journal_id
            ORDER BY
                timestamp, result_id
        ''', {
            'journal_id': tag_journal_id,
        }))

        return results, tags

    def reset_to_checkpoint(self, checkpoint):
        """Discard every row journaled after `checkpoint`."""
        with self.savepoint():
            result_journal_id, tag_journal_id = self.mem.execute('''
                SELECT
                    result_journal_id, tag_journal_id
                FROM
                    _xun_journal
                WHERE
                    hash = :checkpoint
            ''', {
                'checkpoint': checkpoint,
            }).fetchone()

            # Delete any rows newer than the checkpoint and reset the
            # autoincrement in SQLITE_SEQUENCE. This will cause any new row
            # inserted to start incrementing from this checkpoint.

            self.mem.execute('''
                DELETE FROM _xun_results_table WHERE journal_id > :id
            ''', {
                'id': result_journal_id,
            })
            self.mem.execute('''
                UPDATE SQLITE_SEQUENCE SET SEQ=:new_journal_id
                WHERE NAME='_xun_results_table'
            ''', {
                'new_journal_id': result_journal_id,
            })

            self.mem.execute('''
                DELETE FROM _xun_tags_table WHERE journal_id > :id
            ''', {
                'id': tag_journal_id,
            })
            self.mem.execute('''
                UPDATE SQLITE_SEQUENCE SET SEQ=:new_journal_id
                WHERE NAME='_xun_tags_table'
            ''', {
                'new_journal_id': tag_journal_id,
            })

    def sql_literal(self, value):
        """Return `value` quoted by SQLite as an SQL literal."""
        return self.mem.execute('SELECT QUOTE(?)', (value,)).fetchone()[0]

    def sha256(self, checkpoint, *rows):
        """Return the SHA-256 digest of `checkpoint` followed by `rows`.

        Every element of every row is converted to bytes according to its
        SQLite storage class before being fed to the hash.

        Raises
        ------
        ValueError
            If an element is not bytes, str, None, int or float.
        """
        def coerce(obj):
            if isinstance(obj, bytes):  # BLOB type
                return obj
            elif isinstance(obj, str):  # TEXT type
                return obj.encode()
            elif isinstance(obj, type(None)):  # NULL type
                return b'\x00'
            elif isinstance(obj, int):  # INTEGER type
                return struct.pack('q', obj)
            elif isinstance(obj, float):  # REAL type
                return struct.pack('d', obj)
            else:
                raise ValueError(f'could not coerce obj {obj}')

        sha256 = hashlib.sha256(checkpoint)
        for row in rows:
            for el in row:
                sha256.update(coerce(el))
        return sha256.digest()

    @contextlib.contextmanager
    def savepoint(self):
        """Run the ``with`` body inside a uniquely named SQLite savepoint.

        The instance lock is held for the duration. If the body raises,
        the savepoint is rolled back and the exception propagates; the
        savepoint is released in either case.
        """
        savepoint_name = str(uuid4())
        # `with` guarantees the lock is released even when one of the SQL
        # statements below fails.
        with self._lock:
            # Opened outside the try block: if SAVEPOINT itself fails there
            # is nothing to roll back or release, and the error must not be
            # masked by a failing ROLLBACK/RELEASE.
            self.mem.execute(f'SAVEPOINT [{savepoint_name}]')
            try:
                yield
            except BaseException:
                self.mem.execute(f'ROLLBACK TO [{savepoint_name}]')
                raise
            finally:
                self.mem.execute(f'RELEASE [{savepoint_name}]')
desc.annotations, desc.module, globals if globals is not None else desc.globals, + tags, desc.referenced_modules, original_source_code=original_source_code, interface_hashes=interface_hashes, diff --git a/xun/tests/helpers.py b/xun/tests/helpers.py index caafe4e7..ff5e60e8 100644 --- a/xun/tests/helpers.py +++ b/xun/tests/helpers.py @@ -69,6 +69,7 @@ def __getstate__(self): def __setstate__(self, state): self.id = state self._container = PicklableMemoryStore._cached_stores[self.id]._container + self._tagdb = PicklableMemoryStore._cached_stores[self.id]._tagdb def __enter__(self): PicklableMemoryStore._cached_stores.setdefault(self.id, self) diff --git a/xun/tests/test_data/query.xunql b/xun/tests/test_data/query.xunql new file mode 100644 index 00000000..7939c253 --- /dev/null +++ b/xun/tests/test_data/query.xunql @@ -0,0 +1,8 @@ +(start_time>="2030-01-02" entry_point function_name) => + start_time { + entry_point { + function_name { + ... + } + } + } diff --git a/xun/tests/test_data/xun-fs-store/db/1ZmzmKz-kQjB5eoTjgql7gwKyB4NQMVlAPe9HiuontU= b/xun/tests/test_data/xun-fs-store/db/1ZmzmKz-kQjB5eoTjgql7gwKyB4NQMVlAPe9HiuontU= new file mode 100644 index 00000000..2326da47 Binary files /dev/null and b/xun/tests/test_data/xun-fs-store/db/1ZmzmKz-kQjB5eoTjgql7gwKyB4NQMVlAPe9HiuontU= differ diff --git a/xun/tests/test_data/xun-fs-store/db/FLQ8VBqqPhvWx8UxdGXAQ5fnT2DHc6qpzFHC0DhxrAs= b/xun/tests/test_data/xun-fs-store/db/FLQ8VBqqPhvWx8UxdGXAQ5fnT2DHc6qpzFHC0DhxrAs= new file mode 100644 index 00000000..64d8d8ff Binary files /dev/null and b/xun/tests/test_data/xun-fs-store/db/FLQ8VBqqPhvWx8UxdGXAQ5fnT2DHc6qpzFHC0DhxrAs= differ diff --git a/xun/tests/test_data/xun-fs-store/keys/-aP_NmX6ZFh21uwALyci8NDMoo1JxTQGiTVsUzhYlwc= b/xun/tests/test_data/xun-fs-store/keys/-aP_NmX6ZFh21uwALyci8NDMoo1JxTQGiTVsUzhYlwc= new file mode 100644 index 00000000..0f5b329c --- /dev/null +++ b/xun/tests/test_data/xun-fs-store/keys/-aP_NmX6ZFh21uwALyci8NDMoo1JxTQGiTVsUzhYlwc= @@ -0,0 +1,7 @@ 
+!xun/CallNodeFunctor::c28854635ca5ee81f476af73eb2f5c95849a1ea296ecca3b1b46f0e361791bf2 +args: !xun/TupleFunctor::933f8cbf11d0f0c9280093bbb352867eeba6513f9ef3cf39e4fb30777e5e9cfd +- 4 +function_hash: xg30cGXs0nKN8gdbzYFWMKidNySbgYZtg5dRV2bj58w= +function_name: f +kwargs: !xun/FrozenmapFunctor::2b24855c32a6786935cf2afc005db4e5e6338470f775982e5adb7106df901cb8 {} +subscript: !xun/TupleFunctor::933f8cbf11d0f0c9280093bbb352867eeba6513f9ef3cf39e4fb30777e5e9cfd [] diff --git a/xun/tests/test_data/xun-fs-store/keys/FyhmbEKFuHsxAssR-gdIhx5mAljsKg0LotLBuRrZnng= b/xun/tests/test_data/xun-fs-store/keys/FyhmbEKFuHsxAssR-gdIhx5mAljsKg0LotLBuRrZnng= new file mode 100644 index 00000000..e8288926 --- /dev/null +++ b/xun/tests/test_data/xun-fs-store/keys/FyhmbEKFuHsxAssR-gdIhx5mAljsKg0LotLBuRrZnng= @@ -0,0 +1,7 @@ +!xun/CallNodeFunctor::c28854635ca5ee81f476af73eb2f5c95849a1ea296ecca3b1b46f0e361791bf2 +args: !xun/TupleFunctor::933f8cbf11d0f0c9280093bbb352867eeba6513f9ef3cf39e4fb30777e5e9cfd +- 2 +function_hash: xg30cGXs0nKN8gdbzYFWMKidNySbgYZtg5dRV2bj58w= +function_name: f +kwargs: !xun/FrozenmapFunctor::2b24855c32a6786935cf2afc005db4e5e6338470f775982e5adb7106df901cb8 {} +subscript: !xun/TupleFunctor::933f8cbf11d0f0c9280093bbb352867eeba6513f9ef3cf39e4fb30777e5e9cfd [] diff --git a/xun/tests/test_data/xun-fs-store/keys/KtKWyz-WXKQN5JqoXxcoJ_rdt4L3z7LFsceotmVd8MM= b/xun/tests/test_data/xun-fs-store/keys/KtKWyz-WXKQN5JqoXxcoJ_rdt4L3z7LFsceotmVd8MM= new file mode 100644 index 00000000..0e4b8752 --- /dev/null +++ b/xun/tests/test_data/xun-fs-store/keys/KtKWyz-WXKQN5JqoXxcoJ_rdt4L3z7LFsceotmVd8MM= @@ -0,0 +1,7 @@ +!xun/CallNodeFunctor::c28854635ca5ee81f476af73eb2f5c95849a1ea296ecca3b1b46f0e361791bf2 +args: !xun/TupleFunctor::933f8cbf11d0f0c9280093bbb352867eeba6513f9ef3cf39e4fb30777e5e9cfd +- 0 +function_hash: ADdFftG12ZwLBHmkPHE9XxBzXrOgE2JSia-ES-Bp7ZM= +function_name: main +kwargs: !xun/FrozenmapFunctor::2b24855c32a6786935cf2afc005db4e5e6338470f775982e5adb7106df901cb8 {} +subscript: 
!xun/TupleFunctor::933f8cbf11d0f0c9280093bbb352867eeba6513f9ef3cf39e4fb30777e5e9cfd [] diff --git a/xun/tests/test_data/xun-fs-store/keys/XDIGpJ5CMsMuBUlpwrapOWBLi_0wnbdPdSh1H1iZbEY= b/xun/tests/test_data/xun-fs-store/keys/XDIGpJ5CMsMuBUlpwrapOWBLi_0wnbdPdSh1H1iZbEY= new file mode 100644 index 00000000..50dc457f --- /dev/null +++ b/xun/tests/test_data/xun-fs-store/keys/XDIGpJ5CMsMuBUlpwrapOWBLi_0wnbdPdSh1H1iZbEY= @@ -0,0 +1,7 @@ +!xun/CallNodeFunctor::c28854635ca5ee81f476af73eb2f5c95849a1ea296ecca3b1b46f0e361791bf2 +args: !xun/TupleFunctor::933f8cbf11d0f0c9280093bbb352867eeba6513f9ef3cf39e4fb30777e5e9cfd +- 3 +function_hash: xg30cGXs0nKN8gdbzYFWMKidNySbgYZtg5dRV2bj58w= +function_name: f +kwargs: !xun/FrozenmapFunctor::2b24855c32a6786935cf2afc005db4e5e6338470f775982e5adb7106df901cb8 {} +subscript: !xun/TupleFunctor::933f8cbf11d0f0c9280093bbb352867eeba6513f9ef3cf39e4fb30777e5e9cfd [] diff --git a/xun/tests/test_data/xun-fs-store/keys/i-61R9ZvhrLqMJTd5uI__mIctKDd0B8lhcnGuEK46wg= b/xun/tests/test_data/xun-fs-store/keys/i-61R9ZvhrLqMJTd5uI__mIctKDd0B8lhcnGuEK46wg= new file mode 100644 index 00000000..77b07968 --- /dev/null +++ b/xun/tests/test_data/xun-fs-store/keys/i-61R9ZvhrLqMJTd5uI__mIctKDd0B8lhcnGuEK46wg= @@ -0,0 +1,7 @@ +!xun/CallNodeFunctor::c28854635ca5ee81f476af73eb2f5c95849a1ea296ecca3b1b46f0e361791bf2 +args: !xun/TupleFunctor::933f8cbf11d0f0c9280093bbb352867eeba6513f9ef3cf39e4fb30777e5e9cfd +- 1 +function_hash: xg30cGXs0nKN8gdbzYFWMKidNySbgYZtg5dRV2bj58w= +function_name: f +kwargs: !xun/FrozenmapFunctor::2b24855c32a6786935cf2afc005db4e5e6338470f775982e5adb7106df901cb8 {} +subscript: !xun/TupleFunctor::933f8cbf11d0f0c9280093bbb352867eeba6513f9ef3cf39e4fb30777e5e9cfd [] diff --git a/xun/tests/test_data/xun-fs-store/keys/mrasCwYq-xgxF31_sLzWoFC-nityLw-B5mxymJHqcM4= b/xun/tests/test_data/xun-fs-store/keys/mrasCwYq-xgxF31_sLzWoFC-nityLw-B5mxymJHqcM4= new file mode 100644 index 00000000..5867a721 --- /dev/null +++ 
b/xun/tests/test_data/xun-fs-store/keys/mrasCwYq-xgxF31_sLzWoFC-nityLw-B5mxymJHqcM4= @@ -0,0 +1,7 @@ +!xun/CallNodeFunctor::c28854635ca5ee81f476af73eb2f5c95849a1ea296ecca3b1b46f0e361791bf2 +args: !xun/TupleFunctor::933f8cbf11d0f0c9280093bbb352867eeba6513f9ef3cf39e4fb30777e5e9cfd +- 0 +function_hash: xg30cGXs0nKN8gdbzYFWMKidNySbgYZtg5dRV2bj58w= +function_name: f +kwargs: !xun/FrozenmapFunctor::2b24855c32a6786935cf2afc005db4e5e6338470f775982e5adb7106df901cb8 {} +subscript: !xun/TupleFunctor::933f8cbf11d0f0c9280093bbb352867eeba6513f9ef3cf39e4fb30777e5e9cfd [] diff --git a/xun/tests/test_data/xun-fs-store/keys/sH8_BA-3noPrHY43vfgbYIG5QUz-8UmdAZ0PufjuFAg= b/xun/tests/test_data/xun-fs-store/keys/sH8_BA-3noPrHY43vfgbYIG5QUz-8UmdAZ0PufjuFAg= new file mode 100644 index 00000000..e674057b --- /dev/null +++ b/xun/tests/test_data/xun-fs-store/keys/sH8_BA-3noPrHY43vfgbYIG5QUz-8UmdAZ0PufjuFAg= @@ -0,0 +1,7 @@ +!xun/CallNodeFunctor::c28854635ca5ee81f476af73eb2f5c95849a1ea296ecca3b1b46f0e361791bf2 +args: !xun/TupleFunctor::933f8cbf11d0f0c9280093bbb352867eeba6513f9ef3cf39e4fb30777e5e9cfd +- d +function_hash: zQyxdYTwShxT-hMTHCPQ2WQqSx6jrAGMQfyuNJkF3RE= +function_name: f +kwargs: !xun/FrozenmapFunctor::2b24855c32a6786935cf2afc005db4e5e6338470f775982e5adb7106df901cb8 {} +subscript: !xun/TupleFunctor::933f8cbf11d0f0c9280093bbb352867eeba6513f9ef3cf39e4fb30777e5e9cfd [] diff --git a/xun/tests/test_data/xun-fs-store/keys/uJVEipdbWNDxXvSsmtVtKt6dn6PS2YLK8ppLoEcdefc= b/xun/tests/test_data/xun-fs-store/keys/uJVEipdbWNDxXvSsmtVtKt6dn6PS2YLK8ppLoEcdefc= new file mode 100644 index 00000000..2e36d7c5 --- /dev/null +++ b/xun/tests/test_data/xun-fs-store/keys/uJVEipdbWNDxXvSsmtVtKt6dn6PS2YLK8ppLoEcdefc= @@ -0,0 +1,7 @@ +!xun/CallNodeFunctor::c28854635ca5ee81f476af73eb2f5c95849a1ea296ecca3b1b46f0e361791bf2 +args: !xun/TupleFunctor::933f8cbf11d0f0c9280093bbb352867eeba6513f9ef3cf39e4fb30777e5e9cfd +- 1 +function_hash: ADdFftG12ZwLBHmkPHE9XxBzXrOgE2JSia-ES-Bp7ZM= +function_name: main +kwargs: 
!xun/FrozenmapFunctor::2b24855c32a6786935cf2afc005db4e5e6338470f775982e5adb7106df901cb8 {} +subscript: !xun/TupleFunctor::933f8cbf11d0f0c9280093bbb352867eeba6513f9ef3cf39e4fb30777e5e9cfd [] diff --git a/xun/tests/test_data/xun-fs-store/values/-aP_NmX6ZFh21uwALyci8NDMoo1JxTQGiTVsUzhYlwc= b/xun/tests/test_data/xun-fs-store/values/-aP_NmX6ZFh21uwALyci8NDMoo1JxTQGiTVsUzhYlwc= new file mode 100644 index 00000000..0520b426 --- /dev/null +++ b/xun/tests/test_data/xun-fs-store/values/-aP_NmX6ZFh21uwALyci8NDMoo1JxTQGiTVsUzhYlwc= @@ -0,0 +1,2 @@ +4 +... diff --git a/xun/tests/test_data/xun-fs-store/values/FyhmbEKFuHsxAssR-gdIhx5mAljsKg0LotLBuRrZnng= b/xun/tests/test_data/xun-fs-store/values/FyhmbEKFuHsxAssR-gdIhx5mAljsKg0LotLBuRrZnng= new file mode 100644 index 00000000..f336b8eb --- /dev/null +++ b/xun/tests/test_data/xun-fs-store/values/FyhmbEKFuHsxAssR-gdIhx5mAljsKg0LotLBuRrZnng= @@ -0,0 +1,2 @@ +1 +... diff --git a/xun/tests/test_data/xun-fs-store/values/KtKWyz-WXKQN5JqoXxcoJ_rdt4L3z7LFsceotmVd8MM= b/xun/tests/test_data/xun-fs-store/values/KtKWyz-WXKQN5JqoXxcoJ_rdt4L3z7LFsceotmVd8MM= new file mode 100644 index 00000000..f336b8eb --- /dev/null +++ b/xun/tests/test_data/xun-fs-store/values/KtKWyz-WXKQN5JqoXxcoJ_rdt4L3z7LFsceotmVd8MM= @@ -0,0 +1,2 @@ +1 +... diff --git a/xun/tests/test_data/xun-fs-store/values/XDIGpJ5CMsMuBUlpwrapOWBLi_0wnbdPdSh1H1iZbEY= b/xun/tests/test_data/xun-fs-store/values/XDIGpJ5CMsMuBUlpwrapOWBLi_0wnbdPdSh1H1iZbEY= new file mode 100644 index 00000000..de99e28c --- /dev/null +++ b/xun/tests/test_data/xun-fs-store/values/XDIGpJ5CMsMuBUlpwrapOWBLi_0wnbdPdSh1H1iZbEY= @@ -0,0 +1,2 @@ +3 +... diff --git a/xun/tests/test_data/xun-fs-store/values/i-61R9ZvhrLqMJTd5uI__mIctKDd0B8lhcnGuEK46wg= b/xun/tests/test_data/xun-fs-store/values/i-61R9ZvhrLqMJTd5uI__mIctKDd0B8lhcnGuEK46wg= new file mode 100644 index 00000000..f336b8eb --- /dev/null +++ b/xun/tests/test_data/xun-fs-store/values/i-61R9ZvhrLqMJTd5uI__mIctKDd0B8lhcnGuEK46wg= @@ -0,0 +1,2 @@ +1 +... 
diff --git a/xun/tests/test_data/xun-fs-store/values/mrasCwYq-xgxF31_sLzWoFC-nityLw-B5mxymJHqcM4= b/xun/tests/test_data/xun-fs-store/values/mrasCwYq-xgxF31_sLzWoFC-nityLw-B5mxymJHqcM4= new file mode 100644 index 00000000..a531cf89 --- /dev/null +++ b/xun/tests/test_data/xun-fs-store/values/mrasCwYq-xgxF31_sLzWoFC-nityLw-B5mxymJHqcM4= @@ -0,0 +1,2 @@ +0 +... diff --git a/xun/tests/test_data/xun-fs-store/values/sH8_BA-3noPrHY43vfgbYIG5QUz-8UmdAZ0PufjuFAg= b/xun/tests/test_data/xun-fs-store/values/sH8_BA-3noPrHY43vfgbYIG5QUz-8UmdAZ0PufjuFAg= new file mode 100644 index 00000000..a19e517d --- /dev/null +++ b/xun/tests/test_data/xun-fs-store/values/sH8_BA-3noPrHY43vfgbYIG5QUz-8UmdAZ0PufjuFAg= @@ -0,0 +1,2 @@ +hello +... diff --git a/xun/tests/test_data/xun-fs-store/values/uJVEipdbWNDxXvSsmtVtKt6dn6PS2YLK8ppLoEcdefc= b/xun/tests/test_data/xun-fs-store/values/uJVEipdbWNDxXvSsmtVtKt6dn6PS2YLK8ppLoEcdefc= new file mode 100644 index 00000000..f336b8eb --- /dev/null +++ b/xun/tests/test_data/xun-fs-store/values/uJVEipdbWNDxXvSsmtVtKt6dn6PS2YLK8ppLoEcdefc= @@ -0,0 +1,2 @@ +1 +... diff --git a/xun/tests/test_filesystem.py b/xun/tests/test_filesystem.py new file mode 100644 index 00000000..ccdb3af7 --- /dev/null +++ b/xun/tests/test_filesystem.py @@ -0,0 +1,165 @@ +from .test_stores import TmpDisk +from .test_stores import create_instance +from base64 import urlsafe_b64encode +import contextlib +import os +import pickle +import pytest +import shutil +import subprocess +import tempfile +import xun + + +store = xun.functions.store.Disk('xun/tests/test_data/xun-fs-store') +query = """ +(start_time>="2030-01-02" entry_point function_name) => + start_time { + entry_point { + function_name { + ... 
+ } + } + } +""" +expected = sorted([ + ('control', 0o100200), + ('refresh', 0o100500), + ('store/2030-01-02T13:37:00+00:00/main/f/' + 'i-61R9ZvhrLqMJTd5uI__mIctKDd0B8lhcnGuEK46wg=', 0o100600), + ('store/2030-01-02T13:37:00+00:00/main/f/' + 'mrasCwYq-xgxF31_sLzWoFC-nityLw-B5mxymJHqcM4=', 0o100600), + ('store/2030-01-02T13:37:00+00:00/main/main/' + 'KtKWyz-WXKQN5JqoXxcoJ_rdt4L3z7LFsceotmVd8MM=', 0o100600), + ('store/2030-01-03T13:37:00+00:00/main/f/' + 'FyhmbEKFuHsxAssR-gdIhx5mAljsKg0LotLBuRrZnng=', 0o100600), + ('store/2030-01-03T13:37:00+00:00/main/main/' + 'uJVEipdbWNDxXvSsmtVtKt6dn6PS2YLK8ppLoEcdefc=', 0o100600), +]) + + +def tree(path): + files = sorted( + os.path.relpath(os.path.join(dir, file), start=path) + for dir, _, files in os.walk(path) for file in files) + return [(f, os.stat(os.path.join(path, f)).st_mode) for f in files] + + +@contextlib.contextmanager +def simple_store_mnt(): + @xun.function() + def f(*args, **kwargs): + return args, kwargs + + store_contents = { + f.callnode('a'): ('hello', {'tag': 'tag'}), + f.callnode('b'): ('world', {'tag': 'tag'}), + f.callnode('c'): ('goodbye', {'tag': 'tag'}), + } + + with contextlib.ExitStack() as stack: + tmp = stack.enter_context(tempfile.TemporaryDirectory()) + + store = xun.functions.store.Disk(os.path.join(tmp, 'store')) + for callnode, (value, tags) in store_contents.items(): + store.store(callnode, value, **tags) + + mnt_pnt = os.path.join(tmp, 'mnt') + mnt_store = os.path.join(mnt_pnt, 'store') + os.mkdir(mnt_pnt) + stack.enter_context( + xun.fs.mount(store, '() => ...', mnt_pnt, capture_output=False) + ) + + callnodes = [callnode for callnode in store_contents] + yield mnt_pnt, store, callnodes, f + + +@pytest.mark.skipif(not xun.fs.fuse_available, reason='Fuse not available') +def test_filesystem(): + with tempfile.TemporaryDirectory() as tmp, xun.fs.mount(store, query, tmp): + assert tree(tmp) == expected + + +@pytest.mark.skipif(not xun.fs.fuse_available, reason='Fuse not available') 
+@pytest.mark.parametrize('cmd', [ + ['python3', '-m', 'xun', 'mount', *store_args, *query_args] + for store_args in [ + ['-s', 'disk', 'xun/tests/test_data/xun-fs-store'], + ['--store', 'disk', 'xun/tests/test_data/xun-fs-store'], + ['--store-pickle', urlsafe_b64encode(pickle.dumps(store)).decode()], + ] + for query_args in [ + ['-q', query], + ['--query', query], + ['--query-file', 'xun/tests/test_data/query.xunql'], + ] +]) +def test_filesystem_cli(cmd): + with tempfile.TemporaryDirectory() as tmp: + cmd += ['--', tmp] + proc = subprocess.Popen(cmd) + timeout = 5 + try: + from xun.fs.filesystem import wait_for_ctrl + wait_for_ctrl(tmp, timeout=timeout) + except TimeoutError: + msg = f'control file not mounted after {timeout} seconds' + raise RuntimeError(msg) + finally: + proc.terminate() + try: + proc.wait(5) + except subprocess.TimeoutExpired: + proc.kill() + + +@pytest.mark.skipif(not xun.fs.fuse_available, reason='Fuse not available') +def test_filesystem_control_refresh(): + with simple_store_mnt() as (mnt_pnt, store, callnodes, f): + hashes = sorted(callnode.sha256() for callnode in callnodes) + def ls(): + return sorted(os.listdir(os.path.join(mnt_pnt, 'store'))) + assert ls() == hashes + + new = f.callnode('d') + store.store(new, 'hello', tag='tag') + assert ls() == hashes + + refresh = os.path.abspath(os.path.join(mnt_pnt, 'refresh')) + subprocess.check_call(refresh) + assert ls() == sorted(hashes + [new.sha256()]) + + +@pytest.mark.skipif(not xun.fs.fuse_available, reason='Fuse not available') +def test_delete_mounted_deletes_store(): + with simple_store_mnt() as (mnt_pnt, store, callnodes, f): + a = callnodes[0] + + assert a in store + os.unlink(os.path.join(mnt_pnt, 'store', a.sha256())) + assert a not in store + + +@pytest.mark.skipif(not xun.fs.fuse_available, reason='Fuse not available') +def test_rmdir_deletes_store_items(): + with contextlib.ExitStack() as exit_stack: + mnt_pnt = exit_stack.enter_context(tempfile.TemporaryDirectory()) + (store, 
callnodes) = exit_stack.enter_context(create_instance(TmpDisk)) + exit_stack.enter_context( + xun.fs.mount(store, query, mnt_pnt, capture_output=False) + ) + + path = os.path.join( + mnt_pnt, + 'store', + '2030-01-02T13:37:00+00:00', + ) + assert callnodes.f_0 in store + assert callnodes.f_1 in store + assert callnodes.main_0 in store + shutil.rmtree(path) + assert not os.path.exists(path) + assert callnodes.f_0 not in store + assert callnodes.f_1 not in store + assert callnodes.main_0 not in store diff --git a/xun/tests/test_functions.py b/xun/tests/test_functions.py index 3d1b1b6e..6fc4e63c 100644 --- a/xun/tests/test_functions.py +++ b/xun/tests/test_functions.py @@ -528,7 +528,7 @@ def f(): f1 = f - w1 = xun.functions.Function(workflow.desc, {'f': f1}, None) + w1 = xun.functions.Function(workflow.desc, {'f': f1}, {}) assert driver.value_computed(f0.callnode(), store) assert driver.value_computed(w0.callnode(), store) @@ -654,11 +654,11 @@ def script(): dependencies = script.dependencies dependencies['g'] = before_edit - script.__init__(script.desc, dependencies, script.max_parallel) + script.__init__(script.desc, dependencies, script.tags) assert '{name}' == script.blueprint().run(driver=driver, store=store) dependencies['g'] = after_edit - script.__init__(script.desc, dependencies, script.max_parallel) + script.__init__(script.desc, dependencies, script.tags) assert 'World' == script.blueprint().run(driver=driver, store=store) diff --git a/xun/tests/test_stores.py b/xun/tests/test_stores.py index 7ac3c6e4..ec21a1b3 100644 --- a/xun/tests/test_stores.py +++ b/xun/tests/test_stores.py @@ -132,7 +132,6 @@ def test_store_implementation(cls, contents): assert store[key] == value -@pytest.mark.xfail(reason='Tagged stores not implemented') @pytest.mark.parametrize('cls', stores) @settings(phases=[Phase.generate, Phase.target, Phase.explain], deadline=500, @@ -148,7 +147,6 @@ def test_store_tags(cls, contents): assert store.tags[key] == tags 
-@pytest.mark.xfail(reason='Tagged stores not implemented') @pytest.mark.parametrize('cls', stores) @pytest.mark.parametrize('op', [operator.eq, operator.gt, @@ -168,7 +166,6 @@ def test_store_select_operator(cls, op): assert result == expected -@pytest.mark.xfail(reason='Tagged stores not implemented') @pytest.mark.parametrize('cls', stores) def test_store_select_shape(cls): with create_instance(cls) as (store, callnodes): @@ -209,7 +206,6 @@ def test_store_select_shape(cls): } -@pytest.mark.xfail(reason='Tagged stores not implemented') @pytest.mark.parametrize('cls', stores) def test_store_query(cls): with create_instance(cls) as (store, callnodes): @@ -225,7 +221,6 @@ def test_store_query(cls): } -@pytest.mark.xfail(reason='Tagged stores not implemented') @pytest.mark.parametrize('cls', stores) def test_store_query_argument(cls): with create_instance(cls) as (store, callnodes): @@ -249,7 +244,6 @@ def test_store_query_argument(cls): } -@pytest.mark.xfail(reason='Tagged stores not implemented') @pytest.mark.parametrize('cls', stores) def test_store_query_advanced(cls): with create_instance(cls) as (store, callnodes): @@ -290,7 +284,6 @@ def test_store_query_advanced(cls): } -@pytest.mark.xfail(reason='Tagged stores not implemented') @pytest.mark.parametrize('cls', stores - {Layered}) def test_store_remove(cls): with create_instance(cls) as (store, callnodes): @@ -313,7 +306,6 @@ def test_store_remove(cls): assert tags == [(0,), (0,), (0,), (1,), (1,), (1,)] -@pytest.mark.xfail(reason='Tagged stores not implemented') @pytest.mark.parametrize('cls', stores - {Layered}) def test_store_missing_tags_raise(cls): with create_instance(cls) as (store, callnodes): @@ -322,7 +314,6 @@ def test_store_missing_tags_raise(cls): store.tags[callnodes.f_0] -@pytest.mark.xfail(reason='Tagged stores not implemented') @pytest.mark.parametrize('cls', stores) def test_store_rewrite_tags(cls): with create_instance(cls) as (store, callnodes): @@ -331,7 +322,6 @@ def 
test_store_rewrite_tags(cls): assert store.tags[callnodes.f_1] == new_tags -@pytest.mark.xfail(reason='Tagged stores not implemented') @pytest.mark.parametrize('cls', stores - {Layered}) def test_tags_are_not_duplicated_on_double_write(cls): with create_instance(cls) as (store, callnodes): @@ -364,7 +354,6 @@ def test_tags_are_not_duplicated_on_double_write(cls): assert tags == tags_after -@pytest.mark.xfail(reason='Tagged stores not implemented') @pytest.mark.parametrize('cls', stores - {Memory, Layered}) def test_store_must_be_picklable(cls): with create_instance(cls) as (store, callnodes): diff --git a/xun/tests/test_xun.py b/xun/tests/test_xun.py index e69de29b..33865582 100644 --- a/xun/tests/test_xun.py +++ b/xun/tests/test_xun.py @@ -0,0 +1,60 @@ +from .helpers import run_in_process +import xun + + +def test_xun_result_references(): + @xun.function() + def return_reference(): + data = b'hello world!\n' + return xun.Reference(data) + + @xun.function() + def use_reference(): + with ...: + ref = return_reference() + assert ref.value == b'hello world!\n' + + run_in_process(use_reference.blueprint()) + + +def test_tagged_stores(): + driver = xun.functions.driver.Sequential() + store = xun.functions.store.Memory() + + @xun.function(custom_tag='hello {1}!') + def f(a, b): + pass + + @xun.function() + def main(): + with ...: + f(1, 'argument') + + main.blueprint().run(driver=driver, store=store) + + main_tags = store.tags[main.callnode()] + f_tags = store.tags[f.callnode(1, 'argument')] + + assert set(main_tags.keys()) == { + 'created_by', + 'entry_point', + 'function_name', + 'start_time', + 'timestamp', + } + + assert set(f_tags.keys()) == { + 'created_by', + 'custom_tag', + 'entry_point', + 'function_name', + 'start_time', + 'timestamp', + } + + assert main_tags['entry_point'] == 'main' + assert main_tags['function_name'] == 'main' + + assert f_tags['entry_point'] == 'main' + assert f_tags['function_name'] == 'f' + assert f_tags['custom_tag'] == 'hello argument!'