From fe2b70df62308eb1c74376d6dd52d1e93bbfcd12 Mon Sep 17 00:00:00 2001 From: Zhanyuan Zhang Date: Thu, 4 Aug 2022 09:59:27 -0700 Subject: [PATCH 1/9] merged --- pirlib/backends/docker_batch.py | 2 ++ pirlib/backends/inproc.py | 2 ++ pirlib/handlers/v1.py | 12 ++++++++++++ pirlib/task.py | 22 ++++++++++++++++++++++ 4 files changed, 38 insertions(+) diff --git a/pirlib/backends/docker_batch.py b/pirlib/backends/docker_batch.py index c7fda49..ec610c7 100644 --- a/pirlib/backends/docker_batch.py +++ b/pirlib/backends/docker_batch.py @@ -126,7 +126,9 @@ def run_node(node, graph_inputs): outputs[out.id] = None events = HandlerV1Event(inputs, outputs) context = HandlerV1Context(node) + handler.setup(context) handler.run_handler(events, context) + handler.teardown(context) for out in node.outputs: path = f"/mnt/node_outputs/{node.id}/{out.id}" if out.iotype == "DATAFRAME": diff --git a/pirlib/backends/inproc.py b/pirlib/backends/inproc.py index 368347c..5783e6d 100644 --- a/pirlib/backends/inproc.py +++ b/pirlib/backends/inproc.py @@ -92,5 +92,7 @@ def _execute_node(self, node: pirlib.pir.Node, inputs: Dict[str, Any]): outputs[out.id] = None event = HandlerV1Event(inputs, outputs) context = HandlerV1Context(node) + handler.setup(context) handler.run_handler(event, context) + handler.teardown(context) return outputs diff --git a/pirlib/handlers/v1.py b/pirlib/handlers/v1.py index 997ea41..001a82c 100644 --- a/pirlib/handlers/v1.py +++ b/pirlib/handlers/v1.py @@ -31,3 +31,15 @@ def run_handler( context: HandlerV1Context, ) -> None: raise NotImplementedError + + def setup( + self, + context: HandlerV1Context, + ) -> None: + pass + + def teardown( + self, + context: HandlerV1Context, + ) -> None: + pass \ No newline at end of file diff --git a/pirlib/task.py b/pirlib/task.py index 744f0c8..7a78b66 100644 --- a/pirlib/task.py +++ b/pirlib/task.py @@ -2,6 +2,7 @@ import copy import functools import inspect +import importlib import typeguard from dataclasses import dataclass from typing import Any, Callable, Dict, Optional @@ -153,6 +154,18 @@ def run_handler( return_value, ) + def setup( + self, + context: HandlerV1Context, + ): + pass + + def teardown( + self, + context: HandlerV1Context, + ): + pass + def task( func: Optional[Callable] = None, @@ -160,6 +173,8 @@ def task( name: Optional[str] = None, config: Optional[dict] = None, framework: Optional[pirlib.pir.Framework] = None, + setup: str = None, + teardown: str = None, ) -> TaskDefinition: if framework: if config is None: @@ -174,6 +189,13 @@ def task( framework=framework, ) functools.update_wrapper(wrapper, func) + if setup: + module_name, handler_name = setup.split(":") + setattr(wrapper, "setup", getattr(module_name, handler_name)) + if teardown: + module_name, handler_name = setup.split(":") + setattr(wrapper, "teardown", getattr(module_name, handler_name)) + return wrapper From fda1a6a70ed0cfa8fc04cfd1f44b33fb4d2a4ce0 Mon Sep 17 00:00:00 2001 From: "zhanyuan.zhang" Date: Fri, 5 Aug 2022 13:03:49 -0700 Subject: [PATCH 2/9] WIP: added setup & teardown method --- example/docker-compose.yml | 34 +++++++-------- example/example.py | 44 +++++++++++-------- example/package_docker.yml | 29 +++---------- example/package_inproc.yml | 34 +-------------- example/run_inproc.sh | 1 - pirlib/handlers/v1.py | 15 ++++++- pirlib/task.py | 88 ++++++++++++++++++++++---------------- 7 files changed, 116 insertions(+), 129 deletions(-) diff --git a/example/docker-compose.yml b/example/docker-compose.yml index f1259ec..43e69cb 100644 --- a/example/docker-compose.yml +++ b/example/docker-compose.yml @@ -17,11 +17,10 @@ services: condition: service_completed_successfully train_pipeline.train: condition: service_completed_successfully - image: pircli-build:12abefff-89d7-483a-9b23-78153dd510dc + image: pircli-build:b8f7ebb4-2b49-4f87-8c1a-afdb85b956ac volumes: - node_outputs:/mnt/node_outputs - ${INPUT_train_dataset:?err}:/mnt/graph_inputs/train_dataset - - ${INPUT_translate_model:?err}:/mnt/graph_inputs/translate_model - ${INPUT_sentences:?err}:/mnt/graph_inputs/sentences - ${OUTPUT:?err}:/mnt/graph_outputs train_pipeline.clean: @@ -30,9 +29,9 @@ services: - -m - pirlib.backends.docker_batch - node - - gASVWwIAAAAAAACMCnBpcmxpYi5waXKUjAROb2RllJOUKYGUfZQojAJpZJSMBWNsZWFulIwLZW50cnlwb2ludHOUfZSMBG1haW6UaACMCkVudHJ5cG9pbnSUk5QpgZR9lCiMB3ZlcnNpb26UjAJ2MZSMB2hhbmRsZXKUjBVleGFtcGxlLmV4YW1wbGU6Y2xlYW6UjAdydW50aW1llIwKcHl0aG9uOjMuOZSMB2NvZGV1cmyUTowFaW1hZ2WUjDFwaXJjbGktYnVpbGQ6MTJhYmVmZmYtODlkNy00ODNhLTliMjMtNzgxNTNkZDUxMGRjlHVic4wJZnJhbWV3b3JrlE6MBmNvbmZpZ5R9lIwGaW5wdXRzlF2UaACMBUlucHV0lJOUKYGUfZQoaAWMB2RhdGFzZXSUjAZpb3R5cGWUjAlESVJFQ1RPUlmUjAZzb3VyY2WUaACMCkRhdGFTb3VyY2WUk5QpgZR9lCiMB25vZGVfaWSUTowLc3ViZ3JhcGhfaWSUTowJb3V0cHV0X2lklE6MDmdyYXBoX2lucHV0X2lklIwNdHJhaW5fZGF0YXNldJR1YowEbWV0YZRoAIwITWV0YWRhdGGUk5QpgZR9lCiMBG5hbWWUjAdkYXRhc2V0lIwLYW5ub3RhdGlvbnOUTnVidWJhjAdvdXRwdXRzlF2UaACMBk91dHB1dJSTlCmBlH2UKGgFjAZyZXR1cm6UaCGMCURJUkVDVE9SWZRoLWgvKYGUfZQoaDKMBnJldHVybpRoNE51YnViYWgtaC8pgZR9lChoMowFY2xlYW6UaDROdWJ1Yi4= - - gASVJAEAAAAAAABdlCiMCnBpcmxpYi5waXKUjApHcmFwaElucHV0lJOUKYGUfZQojAJpZJSMDXRyYWluX2RhdGFzZXSUjAZpb3R5cGWUjAlESVJFQ1RPUlmUjARtZXRhlGgBjAhNZXRhZGF0YZSTlCmBlH2UKIwEbmFtZZSMDXRyYWluX2RhdGFzZXSUjAthbm5vdGF0aW9uc5ROdWJ1YmgDKYGUfZQoaAaMD3RyYW5zbGF0ZV9tb2RlbJRoCIwERklMRZRoCmgMKYGUfZQoaA+MD3RyYW5zbGF0ZV9tb2RlbJRoEU51YnViaAMpgZR9lChoBowJc2VudGVuY2VzlGgIjAlESVJFQ1RPUlmUaApoDCmBlH2UKGgPjAlzZW50ZW5jZXOUaBFOdWJ1YmUu - image: pircli-build:12abefff-89d7-483a-9b23-78153dd510dc + - gASVWwIAAAAAAACMCnBpcmxpYi5waXKUjAROb2RllJOUKYGUfZQojAJpZJSMBWNsZWFulIwLZW50cnlwb2ludHOUfZSMBG1haW6UaACMCkVudHJ5cG9pbnSUk5QpgZR9lCiMB3ZlcnNpb26UjAJ2MZSMB2hhbmRsZXKUjBVleGFtcGxlLmV4YW1wbGU6Y2xlYW6UjAdydW50aW1llIwKcHl0aG9uOjMuOZSMB2NvZGV1cmyUTowFaW1hZ2WUjDFwaXJjbGktYnVpbGQ6YjhmN2ViYjQtMmI0OS00Zjg3LThjMWEtYWZkYjg1Yjk1NmFjlHVic4wJZnJhbWV3b3JrlE6MBmNvbmZpZ5R9lIwGaW5wdXRzlF2UaACMBUlucHV0lJOUKYGUfZQoaAWMB2RhdGFzZXSUjAZpb3R5cGWUjAlESVJFQ1RPUlmUjAZzb3VyY2WUaACMCkRhdGFTb3VyY2WUk5QpgZR9lCiMB25vZGVfaWSUTowLc3ViZ3JhcGhfaWSUTowJb3V0cHV0X2lklE6MDmdyYXBoX2lucHV0X2lklIwNdHJhaW5fZGF0YXNldJR1YowEbWV0YZRoAIwITWV0YWRhdGGUk5QpgZR9lCiMBG5hbWWUjAdkYXRhc2V0lIwLYW5ub3RhdGlvbnOUTnVidWJhjAdvdXRwdXRzlF2UaACMBk91dHB1dJSTlCmBlH2UKGgFjAZyZXR1cm6UaCGMCURJUkVDVE9SWZRoLWgvKYGUfZQoaDKMBnJldHVybpRoNE51YnViYWgtaC8pgZR9lChoMowFY2xlYW6UaDROdWJ1Yi4= + - gASV2gAAAAAAAABdlCiMCnBpcmxpYi5waXKUjApHcmFwaElucHV0lJOUKYGUfZQojAJpZJSMDXRyYWluX2RhdGFzZXSUjAZpb3R5cGWUjAlESVJFQ1RPUlmUjARtZXRhlGgBjAhNZXRhZGF0YZSTlCmBlH2UKIwEbmFtZZSMDXRyYWluX2RhdGFzZXSUjAthbm5vdGF0aW9uc5ROdWJ1YmgDKYGUfZQoaAaMCXNlbnRlbmNlc5RoCIwJRElSRUNUT1JZlGgKaAwpgZR9lChoD4wJc2VudGVuY2VzlGgRTnVidWJlLg== + image: pircli-build:b8f7ebb4-2b49-4f87-8c1a-afdb85b956ac volumes: - node_outputs:/mnt/node_outputs - ${INPUT_train_dataset:?err}:/mnt/graph_inputs/train_dataset @@ -42,12 +41,12 @@ services: - -m - pirlib.backends.docker_batch - node - - gASVCAMAAAAAAACMCnBpcmxpYi5waXKUjAROb2RllJOUKYGUfZQojAJpZJSMCGV2YWx1YXRllIwLZW50cnlwb2ludHOUfZSMBG1haW6UaACMCkVudHJ5cG9pbnSUk5QpgZR9lCiMB3ZlcnNpb26UjAJ2MZSMB2hhbmRsZXKUjBhleGFtcGxlLmV4YW1wbGU6ZXZhbHVhdGWUjAdydW50aW1llIwKcHl0aG9uOjMuOZSMB2NvZGV1cmyUTowFaW1hZ2WUjDFwaXJjbGktYnVpbGQ6MTJhYmVmZmYtODlkNy00ODNhLTliMjMtNzgxNTNkZDUxMGRjlHVic4wJZnJhbWV3b3JrlE6MBmNvbmZpZ5R9lIwGaW5wdXRzlF2UKGgAjAVJbnB1dJSTlCmBlH2UKGgFjBNrd2FyZ3MudGVzdF9kYXRhc2V0lIwGaW90eXBllIwJRElSRUNUT1JZlIwGc291cmNllGgAjApEYXRhU291cmNllJOUKYGUfZQojAdub2RlX2lklE6MC3N1YmdyYXBoX2lklE6MCW91dHB1dF9pZJROjA5ncmFwaF9pbnB1dF9pZJSMCXNlbnRlbmNlc5R1YowEbWV0YZRoAIwITWV0YWRhdGGUk5QpgZR9lCiMBG5hbWWUjBNrd2FyZ3MudGVzdF9kYXRhc2V0lIwLYW5ub3RhdGlvbnOUTnVidWJoHSmBlH2UKGgFjBJrd2FyZ3MucHJlZGljdGlvbnOUaCGMCURJUkVDVE9SWZRoI2glKYGUfZQoaCiMGGluZmVyX3BpcGVsaW5lLnNlbnRpbWVudJRoKU5oKowGcmV0dXJulGgrTnViaC1oLymBlH2UKGgyjBJrd2FyZ3MucHJlZGljdGlvbnOUaDROdWJ1YmWMB291dHB1dHOUXZRoAIwGT3V0cHV0lJOUKYGUfZQoaAWMBnJldHVybpRoIYwJREFUQUZSQU1FlGgtaC8pgZR9lChoMowGcmV0dXJulGg0TnVidWJhaC1oLymBlH2UKGgyjAhldmFsdWF0ZZRoNE51YnViLg== - - gASVJAEAAAAAAABdlCiMCnBpcmxpYi5waXKUjApHcmFwaElucHV0lJOUKYGUfZQojAJpZJSMDXRyYWluX2RhdGFzZXSUjAZpb3R5cGWUjAlESVJFQ1RPUlmUjARtZXRhlGgBjAhNZXRhZGF0YZSTlCmBlH2UKIwEbmFtZZSMDXRyYWluX2RhdGFzZXSUjAthbm5vdGF0aW9uc5ROdWJ1YmgDKYGUfZQoaAaMD3RyYW5zbGF0ZV9tb2RlbJRoCIwERklMRZRoCmgMKYGUfZQoaA+MD3RyYW5zbGF0ZV9tb2RlbJRoEU51YnViaAMpgZR9lChoBowJc2VudGVuY2VzlGgIjAlESVJFQ1RPUlmUaApoDCmBlH2UKGgPjAlzZW50ZW5jZXOUaBFOdWJ1YmUu + - gASVCAMAAAAAAACMCnBpcmxpYi5waXKUjAROb2RllJOUKYGUfZQojAJpZJSMCGV2YWx1YXRllIwLZW50cnlwb2ludHOUfZSMBG1haW6UaACMCkVudHJ5cG9pbnSUk5QpgZR9lCiMB3ZlcnNpb26UjAJ2MZSMB2hhbmRsZXKUjBhleGFtcGxlLmV4YW1wbGU6ZXZhbHVhdGWUjAdydW50aW1llIwKcHl0aG9uOjMuOZSMB2NvZGV1cmyUTowFaW1hZ2WUjDFwaXJjbGktYnVpbGQ6YjhmN2ViYjQtMmI0OS00Zjg3LThjMWEtYWZkYjg1Yjk1NmFjlHVic4wJZnJhbWV3b3JrlE6MBmNvbmZpZ5R9lIwGaW5wdXRzlF2UKGgAjAVJbnB1dJSTlCmBlH2UKGgFjBNrd2FyZ3MudGVzdF9kYXRhc2V0lIwGaW90eXBllIwJRElSRUNUT1JZlIwGc291cmNllGgAjApEYXRhU291cmNllJOUKYGUfZQojAdub2RlX2lklE6MC3N1YmdyYXBoX2lklE6MCW91dHB1dF9pZJROjA5ncmFwaF9pbnB1dF9pZJSMCXNlbnRlbmNlc5R1YowEbWV0YZRoAIwITWV0YWRhdGGUk5QpgZR9lCiMBG5hbWWUjBNrd2FyZ3MudGVzdF9kYXRhc2V0lIwLYW5ub3RhdGlvbnOUTnVidWJoHSmBlH2UKGgFjBJrd2FyZ3MucHJlZGljdGlvbnOUaCGMCURJUkVDVE9SWZRoI2glKYGUfZQoaCiMGGluZmVyX3BpcGVsaW5lLnNlbnRpbWVudJRoKU5oKowGcmV0dXJulGgrTnViaC1oLymBlH2UKGgyjBJrd2FyZ3MucHJlZGljdGlvbnOUaDROdWJ1YmWMB291dHB1dHOUXZRoAIwGT3V0cHV0lJOUKYGUfZQoaAWMBnJldHVybpRoIYwJREFUQUZSQU1FlGgtaC8pgZR9lChoMowGcmV0dXJulGg0TnVidWJhaC1oLymBlH2UKGgyjAhldmFsdWF0ZZRoNE51YnViLg== + - gASV2gAAAAAAAABdlCiMCnBpcmxpYi5waXKUjApHcmFwaElucHV0lJOUKYGUfZQojAJpZJSMDXRyYWluX2RhdGFzZXSUjAZpb3R5cGWUjAlESVJFQ1RPUlmUjARtZXRhlGgBjAhNZXRhZGF0YZSTlCmBlH2UKIwEbmFtZZSMDXRyYWluX2RhdGFzZXSUjAthbm5vdGF0aW9uc5ROdWJ1YmgDKYGUfZQoaAaMCXNlbnRlbmNlc5RoCIwJRElSRUNUT1JZlGgKaAwpgZR9lChoD4wJc2VudGVuY2VzlGgRTnVidWJlLg== depends_on: train_pipeline.infer_pipeline.sentiment: condition: service_completed_successfully - image: pircli-build:12abefff-89d7-483a-9b23-78153dd510dc + image: pircli-build:b8f7ebb4-2b49-4f87-8c1a-afdb85b956ac volumes: - node_outputs:/mnt/node_outputs - ${INPUT_sentences:?err}:/mnt/graph_inputs/sentences @@ -57,14 +56,14 @@ services: - -m - pirlib.backends.docker_batch - node - - gASV7QIAAAAAAACMCnBpcmxpYi5waXKUjAROb2RllJOUKYGUfZQojAJpZJSMGGluZmVyX3BpcGVsaW5lLnNlbnRpbWVudJSMC2VudHJ5cG9pbnRzlH2UjARtYWlulGgAjApFbnRyeXBvaW50lJOUKYGUfZQojAd2ZXJzaW9ulIwCdjGUjAdoYW5kbGVylIwZZXhhbXBsZS5leGFtcGxlOnNlbnRpbWVudJSMB3J1bnRpbWWUjApweXRob246My45lIwHY29kZXVybJROjAVpbWFnZZSMMXBpcmNsaS1idWlsZDoxMmFiZWZmZi04OWQ3LTQ4M2EtOWIyMy03ODE1M2RkNTEwZGOUdWJzjAlmcmFtZXdvcmuUTowGY29uZmlnlH2UjAZpbnB1dHOUXZQoaACMBUlucHV0lJOUKYGUfZQoaAWMBW1vZGVslIwGaW90eXBllIwERklMRZSMBnNvdXJjZZRoAIwKRGF0YVNvdXJjZZSTlCmBlH2UKIwHbm9kZV9pZJSMBXRyYWlulIwLc3ViZ3JhcGhfaWSUTowJb3V0cHV0X2lklIwGcmV0dXJulIwOZ3JhcGhfaW5wdXRfaWSUTnVijARtZXRhlGgAjAhNZXRhZGF0YZSTlCmBlH2UKIwEbmFtZZSMBW1vZGVslIwLYW5ub3RhdGlvbnOUTnVidWJoHSmBlH2UKGgFjAlzZW50ZW5jZXOUaCGMCURJUkVDVE9SWZRoI2glKYGUfZQoaCiMGmluZmVyX3BpcGVsaW5lLnRyYW5zbGF0ZV8xlGgqTmgrjAZyZXR1cm6UaC1OdWJoLmgwKYGUfZQoaDOMCXNlbnRlbmNlc5RoNU51YnViZYwHb3V0cHV0c5RdlGgAjAZPdXRwdXSUk5QpgZR9lChoBYwGcmV0dXJulGghjAlESVJFQ1RPUlmUaC5oMCmBlH2UKGgzjAZyZXR1cm6UaDVOdWJ1YmFoLmgwKYGUfZQoaDOMCXNlbnRpbWVudJRoNU51YnViLg== - - gASVJAEAAAAAAABdlCiMCnBpcmxpYi5waXKUjApHcmFwaElucHV0lJOUKYGUfZQojAJpZJSMDXRyYWluX2RhdGFzZXSUjAZpb3R5cGWUjAlESVJFQ1RPUlmUjARtZXRhlGgBjAhNZXRhZGF0YZSTlCmBlH2UKIwEbmFtZZSMDXRyYWluX2RhdGFzZXSUjAthbm5vdGF0aW9uc5ROdWJ1YmgDKYGUfZQoaAaMD3RyYW5zbGF0ZV9tb2RlbJRoCIwERklMRZRoCmgMKYGUfZQoaA+MD3RyYW5zbGF0ZV9tb2RlbJRoEU51YnViaAMpgZR9lChoBowJc2VudGVuY2VzlGgIjAlESVJFQ1RPUlmUaApoDCmBlH2UKGgPjAlzZW50ZW5jZXOUaBFOdWJ1YmUu + - gASV7QIAAAAAAACMCnBpcmxpYi5waXKUjAROb2RllJOUKYGUfZQojAJpZJSMGGluZmVyX3BpcGVsaW5lLnNlbnRpbWVudJSMC2VudHJ5cG9pbnRzlH2UjARtYWlulGgAjApFbnRyeXBvaW50lJOUKYGUfZQojAd2ZXJzaW9ulIwCdjGUjAdoYW5kbGVylIwZZXhhbXBsZS5leGFtcGxlOnNlbnRpbWVudJSMB3J1bnRpbWWUjApweXRob246My45lIwHY29kZXVybJROjAVpbWFnZZSMMXBpcmNsaS1idWlsZDpiOGY3ZWJiNC0yYjQ5LTRmODctOGMxYS1hZmRiODViOTU2YWOUdWJzjAlmcmFtZXdvcmuUTowGY29uZmlnlH2UjAZpbnB1dHOUXZQoaACMBUlucHV0lJOUKYGUfZQoaAWMBW1vZGVslIwGaW90eXBllIwERklMRZSMBnNvdXJjZZRoAIwKRGF0YVNvdXJjZZSTlCmBlH2UKIwHbm9kZV9pZJSMBXRyYWlulIwLc3ViZ3JhcGhfaWSUTowJb3V0cHV0X2lklIwGcmV0dXJulIwOZ3JhcGhfaW5wdXRfaWSUTnVijARtZXRhlGgAjAhNZXRhZGF0YZSTlCmBlH2UKIwEbmFtZZSMBW1vZGVslIwLYW5ub3RhdGlvbnOUTnVidWJoHSmBlH2UKGgFjAlzZW50ZW5jZXOUaCGMCURJUkVDVE9SWZRoI2glKYGUfZQoaCiMGmluZmVyX3BpcGVsaW5lLnRyYW5zbGF0ZV8xlGgqTmgrjAZyZXR1cm6UaC1OdWJoLmgwKYGUfZQoaDOMCXNlbnRlbmNlc5RoNU51YnViZYwHb3V0cHV0c5RdlGgAjAZPdXRwdXSUk5QpgZR9lChoBYwGcmV0dXJulGghjAlESVJFQ1RPUlmUaC5oMCmBlH2UKGgzjAZyZXR1cm6UaDVOdWJ1YmFoLmgwKYGUfZQoaDOMCXNlbnRpbWVudJRoNU51YnViLg== + - gASV2gAAAAAAAABdlCiMCnBpcmxpYi5waXKUjApHcmFwaElucHV0lJOUKYGUfZQojAJpZJSMDXRyYWluX2RhdGFzZXSUjAZpb3R5cGWUjAlESVJFQ1RPUlmUjARtZXRhlGgBjAhNZXRhZGF0YZSTlCmBlH2UKIwEbmFtZZSMDXRyYWluX2RhdGFzZXSUjAthbm5vdGF0aW9uc5ROdWJ1YmgDKYGUfZQoaAaMCXNlbnRlbmNlc5RoCIwJRElSRUNUT1JZlGgKaAwpgZR9lChoD4wJc2VudGVuY2VzlGgRTnVidWJlLg== depends_on: train_pipeline.infer_pipeline.translate_1: condition: service_completed_successfully train_pipeline.train: condition: service_completed_successfully - image: pircli-build:12abefff-89d7-483a-9b23-78153dd510dc + image: pircli-build:b8f7ebb4-2b49-4f87-8c1a-afdb85b956ac volumes: - node_outputs:/mnt/node_outputs train_pipeline.infer_pipeline.translate_1: @@ -73,12 +72,11 @@ services: - -m - pirlib.backends.docker_batch - node - - gASV5QIAAAAAAACMCnBpcmxpYi5waXKUjAROb2RllJOUKYGUfZQojAJpZJSMGmluZmVyX3BpcGVsaW5lLnRyYW5zbGF0ZV8xlIwLZW50cnlwb2ludHOUfZSMBG1haW6UaACMCkVudHJ5cG9pbnSUk5QpgZR9lCiMB3ZlcnNpb26UjAJ2MZSMB2hhbmRsZXKUjBlleGFtcGxlLmV4YW1wbGU6dHJhbnNsYXRllIwHcnVudGltZZSMCnB5dGhvbjozLjmUjAdjb2RldXJslE6MBWltYWdllIwxcGlyY2xpLWJ1aWxkOjEyYWJlZmZmLTg5ZDctNDgzYS05YjIzLTc4MTUzZGQ1MTBkY5R1YnOMCWZyYW1ld29ya5ROjAZjb25maWeUfZSMA2tleZSMBXZhbHVllHOMBmlucHV0c5RdlChoAIwFSW5wdXSUk5QpgZR9lChoBYwGYXJncy4wlIwGaW90eXBllIwERklMRZSMBnNvdXJjZZRoAIwKRGF0YVNvdXJjZZSTlCmBlH2UKIwHbm9kZV9pZJROjAtzdWJncmFwaF9pZJROjAlvdXRwdXRfaWSUTowOZ3JhcGhfaW5wdXRfaWSUjA90cmFuc2xhdGVfbW9kZWyUdWKMBG1ldGGUaACMCE1ldGFkYXRhlJOUKYGUfZQojARuYW1llIwGYXJncy4wlIwLYW5ub3RhdGlvbnOUTnVidWJoHymBlH2UKGgFjAZhcmdzLjGUaCOMCURJUkVDVE9SWZRoJWgnKYGUfZQoaCpOaCtOaCxOaC2MCXNlbnRlbmNlc5R1YmgvaDEpgZR9lChoNIwGYXJncy4xlGg2TnVidWJljAdvdXRwdXRzlF2UaACMBk91dHB1dJSTlCmBlH2UKGgFjAZyZXR1cm6UaCOMCURJUkVDVE9SWZRoL2gxKYGUfZQoaDSMBnJldHVybpRoNk51YnViYWgvaDEpgZR9lChoNIwLdHJhbnNsYXRlXzGUaDZOdWJ1Yi4= - - gASVJAEAAAAAAABdlCiMCnBpcmxpYi5waXKUjApHcmFwaElucHV0lJOUKYGUfZQojAJpZJSMDXRyYWluX2RhdGFzZXSUjAZpb3R5cGWUjAlESVJFQ1RPUlmUjARtZXRhlGgBjAhNZXRhZGF0YZSTlCmBlH2UKIwEbmFtZZSMDXRyYWluX2RhdGFzZXSUjAthbm5vdGF0aW9uc5ROdWJ1YmgDKYGUfZQoaAaMD3RyYW5zbGF0ZV9tb2RlbJRoCIwERklMRZRoCmgMKYGUfZQoaA+MD3RyYW5zbGF0ZV9tb2RlbJRoEU51YnViaAMpgZR9lChoBowJc2VudGVuY2VzlGgIjAlESVJFQ1RPUlmUaApoDCmBlH2UKGgPjAlzZW50ZW5jZXOUaBFOdWJ1YmUu - image: pircli-build:12abefff-89d7-483a-9b23-78153dd510dc + - gASViQIAAAAAAACMCnBpcmxpYi5waXKUjAROb2RllJOUKYGUfZQojAJpZJSMGmluZmVyX3BpcGVsaW5lLnRyYW5zbGF0ZV8xlIwLZW50cnlwb2ludHOUfZSMBG1haW6UaACMCkVudHJ5cG9pbnSUk5QpgZR9lCiMB3ZlcnNpb26UjAJ2MZSMB2hhbmRsZXKUjBlleGFtcGxlLmV4YW1wbGU6dHJhbnNsYXRllIwHcnVudGltZZSMCnB5dGhvbjozLjmUjAdjb2RldXJslE6MBWltYWdllIwxcGlyY2xpLWJ1aWxkOmI4ZjdlYmI0LTJiNDktNGY4Ny04YzFhLWFmZGI4NWI5NTZhY5R1YnOMCWZyYW1ld29ya5ROjAZjb25maWeUfZSMA2tleZSMBXZhbHVllHOMBmlucHV0c5RdlGgAjAVJbnB1dJSTlCmBlH2UKGgFjAlzZW50ZW5jZXOUjAZpb3R5cGWUjAlESVJFQ1RPUlmUjAZzb3VyY2WUaACMCkRhdGFTb3VyY2WUk5QpgZR9lCiMB25vZGVfaWSUTowLc3ViZ3JhcGhfaWSUTowJb3V0cHV0X2lklE6MDmdyYXBoX2lucHV0X2lklIwJc2VudGVuY2VzlHVijARtZXRhlGgAjAhNZXRhZGF0YZSTlCmBlH2UKIwEbmFtZZSMCXNlbnRlbmNlc5SMC2Fubm90YXRpb25zlE51YnViYYwHb3V0cHV0c5RdlGgAjAZPdXRwdXSUk5QpgZR9lChoBYwGcmV0dXJulGgjjAlESVJFQ1RPUlmUaC9oMSmBlH2UKGg0jAZyZXR1cm6UaDZOdWJ1YmFoL2gxKYGUfZQoaDSMC3RyYW5zbGF0ZV8xlGg2TnVidWIu + - gASV2gAAAAAAAABdlCiMCnBpcmxpYi5waXKUjApHcmFwaElucHV0lJOUKYGUfZQojAJpZJSMDXRyYWluX2RhdGFzZXSUjAZpb3R5cGWUjAlESVJFQ1RPUlmUjARtZXRhlGgBjAhNZXRhZGF0YZSTlCmBlH2UKIwEbmFtZZSMDXRyYWluX2RhdGFzZXSUjAthbm5vdGF0aW9uc5ROdWJ1YmgDKYGUfZQoaAaMCXNlbnRlbmNlc5RoCIwJRElSRUNUT1JZlGgKaAwpgZR9lChoD4wJc2VudGVuY2VzlGgRTnVidWJlLg== + image: pircli-build:b8f7ebb4-2b49-4f87-8c1a-afdb85b956ac volumes: - node_outputs:/mnt/node_outputs - - ${INPUT_translate_model:?err}:/mnt/graph_inputs/translate_model - ${INPUT_sentences:?err}:/mnt/graph_inputs/sentences train_pipeline.train: command: @@ -86,12 +84,12 @@ services: - -m - pirlib.backends.docker_batch - node - - gASVswIAAAAAAACMCnBpcmxpYi5waXKUjAROb2RllJOUKYGUfZQojAJpZJSMBXRyYWlulIwLZW50cnlwb2ludHOUfZSMBG1haW6UaACMCkVudHJ5cG9pbnSUk5QpgZR9lCiMB3ZlcnNpb26UjAJ2MZSMB2hhbmRsZXKUjBVleGFtcGxlLmV4YW1wbGU6dHJhaW6UjAdydW50aW1llIwKcHl0aG9uOjMuOZSMB2NvZGV1cmyUTowFaW1hZ2WUjDFwaXJjbGktYnVpbGQ6MTJhYmVmZmYtODlkNy00ODNhLTliMjMtNzgxNTNkZDUxMGRjlHVic4wJZnJhbWV3b3JrlGgAjAlGcmFtZXdvcmuUk5QpgZR9lCiMBG5hbWWUjAdhZGFwdGRslGgOTowGY29uZmlnlE51YmgefZQojBRhZGFwdGRsL21pbl9yZXBsaWNhc5RLAYwUYWRhcHRkbC9tYXhfcmVwbGljYXOUSwR1jAZpbnB1dHOUXZRoAIwFSW5wdXSUk5QpgZR9lChoBYwHZGF0YXNldJSMBmlvdHlwZZSMCURJUkVDVE9SWZSMBnNvdXJjZZRoAIwKRGF0YVNvdXJjZZSTlCmBlH2UKIwHbm9kZV9pZJSMBWNsZWFulIwLc3ViZ3JhcGhfaWSUTowJb3V0cHV0X2lklIwGcmV0dXJulIwOZ3JhcGhfaW5wdXRfaWSUTnVijARtZXRhlGgAjAhNZXRhZGF0YZSTlCmBlH2UKGgcjAdkYXRhc2V0lIwLYW5ub3RhdGlvbnOUTnVidWJhjAdvdXRwdXRzlF2UaACMBk91dHB1dJSTlCmBlH2UKGgFjAZyZXR1cm6UaCmMBEZJTEWUaDZoOCmBlH2UKGgcjAZyZXR1cm6UaDxOdWJ1YmFoNmg4KYGUfZQoaByMBXRyYWlulGg8TnVidWIu - - gASVJAEAAAAAAABdlCiMCnBpcmxpYi5waXKUjApHcmFwaElucHV0lJOUKYGUfZQojAJpZJSMDXRyYWluX2RhdGFzZXSUjAZpb3R5cGWUjAlESVJFQ1RPUlmUjARtZXRhlGgBjAhNZXRhZGF0YZSTlCmBlH2UKIwEbmFtZZSMDXRyYWluX2RhdGFzZXSUjAthbm5vdGF0aW9uc5ROdWJ1YmgDKYGUfZQoaAaMD3RyYW5zbGF0ZV9tb2RlbJRoCIwERklMRZRoCmgMKYGUfZQoaA+MD3RyYW5zbGF0ZV9tb2RlbJRoEU51YnViaAMpgZR9lChoBowJc2VudGVuY2VzlGgIjAlESVJFQ1RPUlmUaApoDCmBlH2UKGgPjAlzZW50ZW5jZXOUaBFOdWJ1YmUu + - gASVswIAAAAAAACMCnBpcmxpYi5waXKUjAROb2RllJOUKYGUfZQojAJpZJSMBXRyYWlulIwLZW50cnlwb2ludHOUfZSMBG1haW6UaACMCkVudHJ5cG9pbnSUk5QpgZR9lCiMB3ZlcnNpb26UjAJ2MZSMB2hhbmRsZXKUjBVleGFtcGxlLmV4YW1wbGU6dHJhaW6UjAdydW50aW1llIwKcHl0aG9uOjMuOZSMB2NvZGV1cmyUTowFaW1hZ2WUjDFwaXJjbGktYnVpbGQ6YjhmN2ViYjQtMmI0OS00Zjg3LThjMWEtYWZkYjg1Yjk1NmFjlHVic4wJZnJhbWV3b3JrlGgAjAlGcmFtZXdvcmuUk5QpgZR9lCiMBG5hbWWUjAdhZGFwdGRslGgOTowGY29uZmlnlE51YmgefZQojBRhZGFwdGRsL21pbl9yZXBsaWNhc5RLAYwUYWRhcHRkbC9tYXhfcmVwbGljYXOUSwR1jAZpbnB1dHOUXZRoAIwFSW5wdXSUk5QpgZR9lChoBYwHZGF0YXNldJSMBmlvdHlwZZSMCURJUkVDVE9SWZSMBnNvdXJjZZRoAIwKRGF0YVNvdXJjZZSTlCmBlH2UKIwHbm9kZV9pZJSMBWNsZWFulIwLc3ViZ3JhcGhfaWSUTowJb3V0cHV0X2lklIwGcmV0dXJulIwOZ3JhcGhfaW5wdXRfaWSUTnVijARtZXRhlGgAjAhNZXRhZGF0YZSTlCmBlH2UKGgcjAdkYXRhc2V0lIwLYW5ub3RhdGlvbnOUTnVidWJhjAdvdXRwdXRzlF2UaACMBk91dHB1dJSTlCmBlH2UKGgFjAZyZXR1cm6UaCmMBEZJTEWUaDZoOCmBlH2UKGgcjAZyZXR1cm6UaDxOdWJ1YmFoNmg4KYGUfZQoaByMBXRyYWlulGg8TnVidWIu + - gASV2gAAAAAAAABdlCiMCnBpcmxpYi5waXKUjApHcmFwaElucHV0lJOUKYGUfZQojAJpZJSMDXRyYWluX2RhdGFzZXSUjAZpb3R5cGWUjAlESVJFQ1RPUlmUjARtZXRhlGgBjAhNZXRhZGF0YZSTlCmBlH2UKIwEbmFtZZSMDXRyYWluX2RhdGFzZXSUjAthbm5vdGF0aW9uc5ROdWJ1YmgDKYGUfZQoaAaMCXNlbnRlbmNlc5RoCIwJRElSRUNUT1JZlGgKaAwpgZR9lChoD4wJc2VudGVuY2VzlGgRTnVidWJlLg== depends_on: train_pipeline.clean: condition: service_completed_successfully - image: pircli-build:12abefff-89d7-483a-9b23-78153dd510dc + image: pircli-build:b8f7ebb4-2b49-4f87-8c1a-afdb85b956ac volumes: - node_outputs:/mnt/node_outputs version: '3.9' diff --git a/example/example.py b/example/example.py index 3507677..3f7f19e 100644 --- a/example/example.py +++ b/example/example.py @@ -6,6 +6,7 @@ from pirlib.frameworks.adaptdl import AdaptDL from pirlib.iotypes import DirectoryPath, FilePath +from pirlib.handlers.v1 import HandlerV1Context from pirlib.task import task from pirlib.pipeline import pipeline @@ -23,6 +24,7 @@ def clean(dataset: DirectoryPath) -> DirectoryPath: @task(framework=AdaptDL(min_replicas=1, max_replicas=4)) def train(dataset: DirectoryPath) -> FilePath: task_ctx = task.context() + task_ctx.set("train accuracy", 0.83) with open(dataset / "file.txt") as f: print("train({}, config={})".format(f.read().strip(), task_ctx.config)) outfile = task.context().output @@ -46,20 +48,34 @@ def evaluate(kwargs: EvaluateInput) -> pandas.DataFrame: return df -@task -def translate(args: Tuple[FilePath, DirectoryPath]) -> DirectoryPath: - model, sentences = args +class TranslateModel(object): + def translate(self, inp: str) -> str: + output = f"translation: {inp}" + return output + +def translate_setup(context: HandlerV1Context) -> None: + context.set("translate_model", TranslateModel()) + print(">>> Initialized translation model.") + +def translate_teardown(context: HandlerV1Context) -> None: + context.reset("translate_model") + print(">>> Cleaned up translation model.") + +@task(setup="translate_setup", teardown="translate_teardown") +def translate(sentences: DirectoryPath) -> DirectoryPath: task_ctx = task.context() - with open(model) as f, open(sentences / "file.txt") as g: + model = task_ctx.get("translate_model") + with open(sentences / "file.txt") as g: + inp = g.read().strip() print( - "translate({}, {}, config={})".format( - f.read().strip(), - g.read().strip(), + "translate({}, config={})".format( + inp, task_ctx.config) ) + translate_result = model.translate(inp) outdir = task_ctx.output with open(outdir / "file.txt", "w") as f: - f.write("translate_result") + f.write(translate_result) return outdir @@ -74,21 +90,19 @@ def sentiment(model: FilePath, sentences: DirectoryPath) -> DirectoryPath: @pipeline -def infer_pipeline(translate_model: FilePath, - sentiment_model: FilePath, +def infer_pipeline(sentiment_model: FilePath, sentences: DirectoryPath) -> DirectoryPath: translate_1 = translate.instance("translate_1") translate_1.config["key"] = "value" - return sentiment(sentiment_model, translate_1((translate_model, sentences))) + return sentiment(sentiment_model, translate_1(sentences)) @pipeline def train_pipeline( train_dataset: DirectoryPath, - translate_model: FilePath, sentences: DirectoryPath) -> Tuple[FilePath, pandas.DataFrame]: sentiment_model = train(clean(train_dataset)) - sentiment = infer_pipeline(translate_model, sentiment_model, sentences) + sentiment = infer_pipeline(sentiment_model, sentences) eval_input = {"test_dataset": sentences, "predictions": sentiment} return sentiment_model, evaluate(eval_input) @@ -98,17 +112,13 @@ def train_pipeline( print(yaml.dump(asdict(package), sort_keys=False)) # Prepare inputs. dir_1 = tempfile.TemporaryDirectory() - file_2 = tempfile.NamedTemporaryFile() dir_3 = tempfile.TemporaryDirectory() with open(f"{dir_1.name}/file.txt", "w") as f: f.write("train_dataset") - with open(f"{file_2.name}", "w") as f: - f.write("translate_model") with open(f"{dir_3.name}/file.txt", "w") as f: f.write("sentences") # Test calling end-to-end pipeline. model_path, metrics = train_pipeline(DirectoryPath(dir_1.name), - FilePath(file_2.name), DirectoryPath(dir_3.name)) with open(model_path) as f: print("pipeline model: {}".format(f.read().strip())) diff --git a/example/package_docker.yml b/example/package_docker.yml index e4501b1..8ec29ed 100644 --- a/example/package_docker.yml +++ b/example/package_docker.yml @@ -8,7 +8,7 @@ graphs: handler: example.example:clean runtime: python:3.9 codeurl: null - image: pircli-build:12abefff-89d7-483a-9b23-78153dd510dc + image: pircli-build:b8f7ebb4-2b49-4f87-8c1a-afdb85b956ac framework: null config: {} inputs: @@ -38,7 +38,7 @@ graphs: handler: example.example:train runtime: python:3.9 codeurl: null - image: pircli-build:12abefff-89d7-483a-9b23-78153dd510dc + image: pircli-build:b8f7ebb4-2b49-4f87-8c1a-afdb85b956ac framework: name: adaptdl version: null @@ -72,7 +72,7 @@ graphs: handler: example.example:evaluate runtime: python:3.9 codeurl: null - image: pircli-build:12abefff-89d7-483a-9b23-78153dd510dc + image: pircli-build:b8f7ebb4-2b49-4f87-8c1a-afdb85b956ac framework: null config: {} inputs: @@ -112,22 +112,12 @@ graphs: handler: example.example:translate runtime: python:3.9 codeurl: null - image: pircli-build:12abefff-89d7-483a-9b23-78153dd510dc + image: pircli-build:b8f7ebb4-2b49-4f87-8c1a-afdb85b956ac framework: null config: key: value inputs: - - id: args.0 - iotype: FILE - source: - node_id: null - subgraph_id: null - output_id: null - graph_input_id: translate_model - meta: - name: args.0 - annotations: null - - id: args.1 + - id: sentences iotype: DIRECTORY source: node_id: null @@ -135,7 +125,7 @@ graphs: output_id: null graph_input_id: sentences meta: - name: args.1 + name: sentences annotations: null outputs: - id: return @@ -153,7 +143,7 @@ graphs: handler: example.example:sentiment runtime: python:3.9 codeurl: null - image: pircli-build:12abefff-89d7-483a-9b23-78153dd510dc + image: pircli-build:b8f7ebb4-2b49-4f87-8c1a-afdb85b956ac framework: null config: {} inputs: @@ -193,11 +183,6 @@ graphs: meta: name: train_dataset annotations: null - - id: translate_model - iotype: FILE - meta: - name: translate_model - annotations: null - id: sentences iotype: DIRECTORY meta: diff --git a/example/package_inproc.yml b/example/package_inproc.yml index 4ca7c87..6a3163d 100644 --- a/example/package_inproc.yml +++ b/example/package_inproc.yml @@ -13,17 +13,7 @@ graphs: config: key: value inputs: - - id: args.0 - iotype: FILE - source: - node_id: null - subgraph_id: null - output_id: null - graph_input_id: translate_model - meta: - name: args.0 - annotations: null - - id: args.1 + - id: sentences iotype: DIRECTORY source: node_id: null @@ -31,7 +21,7 @@ graphs: output_id: null graph_input_id: sentences meta: - name: args.1 + name: sentences annotations: null outputs: - id: return @@ -84,11 +74,6 @@ graphs: annotations: null subgraphs: [] inputs: - - id: translate_model - iotype: FILE - meta: - name: translate_model - annotations: null - id: sentiment_model iotype: FILE meta: @@ -224,16 +209,6 @@ graphs: graph_id: infer_pipeline config: {} inputs: - - id: translate_model - iotype: FILE - source: - node_id: null - subgraph_id: null - output_id: null - graph_input_id: translate_model - meta: - name: translate_model - annotations: null - id: sentiment_model iotype: FILE source: @@ -269,11 +244,6 @@ graphs: meta: name: train_dataset annotations: null - - id: translate_model - iotype: FILE - meta: - name: translate_model - annotations: null - id: sentences iotype: DIRECTORY meta: diff --git a/example/run_inproc.sh b/example/run_inproc.sh index 570c981..c306131 100644 --- a/example/run_inproc.sh +++ b/example/run_inproc.sh @@ -10,7 +10,6 @@ PYTHONPATH=$ROOTDIR $ROOTDIR/bin/pircli execute \ $EXAMPLEDIR/package_inproc.yml train_pipeline \ --target pirlib.backends.inproc:InprocBackend \ --input train_dataset=$EXAMPLEDIR/inputs/train_dataset \ - --input translate_model=$EXAMPLEDIR/inputs/translate_model.txt \ --input sentences=$EXAMPLEDIR/inputs/sentences \ --output return.0=$EXAMPLEDIR/outputs/return.0 \ --output return.1=$EXAMPLEDIR/outputs/return.1 diff --git a/pirlib/handlers/v1.py b/pirlib/handlers/v1.py index 001a82c..0626d92 100644 --- a/pirlib/handlers/v1.py +++ b/pirlib/handlers/v1.py @@ -1,5 +1,5 @@ from abc import abstractmethod -from dataclasses import dataclass +from dataclasses import dataclass, field from typing import Any, Dict from pirlib.pir import Node @@ -7,6 +7,19 @@ @dataclass class HandlerV1Context(object): node: Node + states: Dict[str, Any] = field(default_factory=dict) + + def set(self, key: str, value: Any): + self.states[key] = value + + def get(self, key: str, default: Any =None) -> Any: + return self.states.get(key, default) + + def reset(self, key: str) -> None: + del self.states[key] + + def sync_states(self, context) -> None: + self.states.update(context.states) @dataclass diff --git a/pirlib/task.py b/pirlib/task.py index 7a78b66..629355f 100644 --- a/pirlib/task.py +++ b/pirlib/task.py @@ -4,7 +4,7 @@ import inspect import importlib import typeguard -from dataclasses import dataclass +from dataclasses import dataclass, field from typing import Any, Callable, Dict, Optional import pirlib.pir @@ -20,6 +20,19 @@ class TaskContext: config: Dict[str, Any] output: Any + states: Dict = field(default_factory=dict) + + def set(self, key: str, value: Any) -> None: + self.states[key] = value + + def get(self, key: str, default: Any =None) -> Any: + return self.states.get(key, default) + + def reset(self, key: str) -> None: + del self.states[key] + + def sync_states(self, handler_context: HandlerV1Context) -> None: + self.states.update(handler_context.states) def task_context() -> TaskContext: @@ -78,12 +91,28 @@ def __init__( name: Optional[str] = None, config: Optional[dict] = None, framework: Optional[pirlib.pir.Framework] = None, + setup: Optional[str] = None, + teardown: Optional[str] = None, ): - self._func = func if func is None else typeguard.typechecked(func) + self._func = typeguard.typechecked(func) self._name = name if name else getattr(func, "__name__", None) self._config = copy.deepcopy(config) if config else None self._framework = framework + # Both setup and teardown methods need to come before func + # Otherwise they won't be found in this function + func_module = inspect.getmodule(self._func) + if setup: + if hasattr(func_module, setup): + setattr(self, "setup", getattr(func_module, setup)) + else: + raise ValueError(f"Couldn't find setup function {setup}.") + if teardown: + if hasattr(func_module, teardown): + setattr(self, "teardown", getattr(func_module, teardown)) + else: + raise ValueError(f"Couldn't find teardown function {teardown}.") + @property def func(self): return self._func @@ -101,15 +130,6 @@ def framework(self): return self._framework def __call__(self, *args, **kwargs): - if len(args) == 1 and callable(args[0]) and not kwargs: - wrapper = TaskDefinition( - func=args[0], - name=self.name, - config=self.config, - framework=self.framework, - ) - functools.update_wrapper(wrapper, args[0]) - return wrapper return self.instance(self.name)(*args, **kwargs) def instance(self, name: str) -> TaskInstance: @@ -132,6 +152,7 @@ def run_handler( inputs, outputs = event.inputs, event.outputs sig = inspect.signature(self.func) task_context = TaskContext(context.node.config, None) + task_context.sync_states(context) task_context.output = recurse_hint( lambda name, hint: outputs[name], "return", sig.return_annotation ) @@ -145,6 +166,7 @@ def run_handler( token = _TASK_CONTEXT.set(task_context) try: return_value = self.func(*args, **kwargs) + context.sync_states(task_context) finally: _TASK_CONTEXT.reset(token) recurse_hint( @@ -154,18 +176,6 @@ def run_handler( return_value, ) - def setup( - self, - context: HandlerV1Context, - ): - pass - - def teardown( - self, - context: HandlerV1Context, - ): - pass - def task( func: Optional[Callable] = None, @@ -182,21 +192,23 @@ def task( f_name = framework.name for k, v in framework.config.items(): config[f"{f_name}/{k}"] = v - wrapper = TaskDefinition( - func=func, - name=name, - config=config, - framework=framework, - ) - functools.update_wrapper(wrapper, func) - if setup: - module_name, handler_name = setup.split(":") - setattr(wrapper, "setup", getattr(module_name, handler_name)) - if teardown: - module_name, handler_name = setup.split(":") - setattr(wrapper, "teardown", getattr(module_name, handler_name)) - - return wrapper + + def wrapper(func) -> TaskDefinition: + task_dfn = TaskDefinition( + func=func, + name=name, + config=config, + framework=framework, + setup=setup, + teardown=teardown, + ) + functools.update_wrapper(task_dfn, func) + return task_dfn + + if func: + return wrapper(func) + else: + return wrapper task.context = task_context From 1360b21a6b4abe781f71b1bb0e38a844800f40d5 Mon Sep 17 00:00:00 2001 From: "zhanyuan.zhang" Date: Fri, 5 Aug 2022 13:07:42 -0700 Subject: [PATCH 3/9] fixed lint --- pirlib/handlers/v1.py | 4 ++-- pirlib/task.py | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/pirlib/handlers/v1.py b/pirlib/handlers/v1.py index 0626d92..61d4223 100644 --- a/pirlib/handlers/v1.py +++ b/pirlib/handlers/v1.py @@ -12,7 +12,7 @@ class HandlerV1Context(object): def set(self, key: str, value: Any): self.states[key] = value - def get(self, key: str, default: Any =None) -> Any: + def get(self, key: str, default: Any = None) -> Any: return self.states.get(key, default) def reset(self, key: str) -> None: @@ -55,4 +55,4 @@ def teardown( self, context: HandlerV1Context, ) -> None: - pass \ No newline at end of file + pass diff --git a/pirlib/task.py b/pirlib/task.py index 629355f..673795c 100644 --- a/pirlib/task.py +++ b/pirlib/task.py @@ -2,7 +2,6 @@ import copy import functools import inspect -import importlib import typeguard from dataclasses import dataclass, field from typing import Any, Callable, Dict, Optional @@ -25,7 +24,7 @@ class TaskContext: def set(self, key: str, value: Any) -> None: self.states[key] = value - def get(self, key: str, default: Any =None) -> Any: + def get(self, key: str, default: Any = None) -> Any: return self.states.get(key, default) def reset(self, key: str) -> None: From 321411e05df647dfbf08fa42c1062a92eb371e9a Mon Sep 17 00:00:00 2001 From: "zhanyuan.zhang" Date: Mon, 8 Aug 2022 00:26:38 -0700 Subject: [PATCH 4/9] hid handler context from users --- example/example.py | 23 ++++++++----- pirlib/backends/docker_batch.py | 4 +-- pirlib/backends/inproc.py | 4 +-- pirlib/handlers/v1.py | 7 ++-- pirlib/task.py | 60 ++++++++++++++++----------------- 5 files changed, 51 insertions(+), 47 deletions(-) diff --git a/example/example.py b/example/example.py index 3f7f19e..78466d5 100644 --- a/example/example.py +++ b/example/example.py @@ -53,15 +53,8 @@ def translate(self, inp: str) -> str: output = f"translation: {inp}" return output -def translate_setup(context: HandlerV1Context) -> None: - context.set("translate_model", TranslateModel()) - print(">>> Initialized translation model.") - -def translate_teardown(context: HandlerV1Context) -> None: - context.reset("translate_model") - print(">>> Cleaned up translation model.") -@task(setup="translate_setup", teardown="translate_teardown") +@task def translate(sentences: DirectoryPath) -> DirectoryPath: task_ctx = task.context() model = task_ctx.get("translate_model") @@ -79,6 +72,20 @@ def translate(sentences: DirectoryPath) -> DirectoryPath: return outdir +@translate.setup +def translate_setup() -> None: + task_ctx = task.context() + task_ctx.set("translate_model", TranslateModel()) + print(">>> Initialized translation model.") + + +@translate.teardown +def translate_teardown() -> None: + task_ctx = task.context() + task_ctx.reset("translate_model") + print(">>> Cleaned up translation model.") + + @task def sentiment(model: FilePath, sentences: DirectoryPath) -> DirectoryPath: with open(model) as f, open(sentences / "file.txt") as g: diff --git a/pirlib/backends/docker_batch.py b/pirlib/backends/docker_batch.py index ec610c7..ef1ae5b 100644 --- a/pirlib/backends/docker_batch.py +++ b/pirlib/backends/docker_batch.py @@ -126,9 +126,9 @@ def run_node(node, graph_inputs): outputs[out.id] = None events = HandlerV1Event(inputs, outputs) context = HandlerV1Context(node) - handler.setup(context) + handler.setup_handler(context) handler.run_handler(events, context) - handler.teardown(context) + handler.teardown_handler(context) for out in node.outputs: path = f"/mnt/node_outputs/{node.id}/{out.id}" if out.iotype == "DATAFRAME": diff --git a/pirlib/backends/inproc.py b/pirlib/backends/inproc.py index 5783e6d..35ce7dd 100644 --- a/pirlib/backends/inproc.py +++ b/pirlib/backends/inproc.py @@ -92,7 +92,7 @@ def _execute_node(self, node: pirlib.pir.Node, inputs: Dict[str, Any]): outputs[out.id] = None event = HandlerV1Event(inputs, outputs) context = HandlerV1Context(node) - handler.setup(context) + handler.setup_handler(context) handler.run_handler(event, context) - handler.teardown(context) + handler.teardown_handler(context) return outputs diff --git a/pirlib/handlers/v1.py b/pirlib/handlers/v1.py index 61d4223..a1e4177 100644 --- a/pirlib/handlers/v1.py +++ b/pirlib/handlers/v1.py @@ -18,9 +18,6 @@ def get(self, key: str, default: Any = None) -> Any: def reset(self, key: str) -> None: del self.states[key] - def sync_states(self, context) -> None: - self.states.update(context.states) - @dataclass class HandlerV1Event(object): @@ -45,13 +42,13 @@ def run_handler( ) -> None: raise NotImplementedError - def setup( + def setup_handler( self, context: HandlerV1Context, ) -> None: pass - def teardown( + def teardown_handler( self, context: HandlerV1Context, ) -> None: diff --git a/pirlib/task.py b/pirlib/task.py index 673795c..83763c8 100644 --- a/pirlib/task.py +++ b/pirlib/task.py @@ -1,3 +1,4 @@ +from ast import Call import contextvars import copy import functools @@ -30,9 +31,6 @@ def get(self, key: str, default: Any = None) -> Any: def reset(self, key: str) -> None: del self.states[key] - def sync_states(self, handler_context: HandlerV1Context) -> None: - self.states.update(handler_context.states) - def task_context() -> TaskContext: return _TASK_CONTEXT.get() @@ -90,27 +88,15 @@ def __init__( name: Optional[str] = None, config: Optional[dict] = None, framework: Optional[pirlib.pir.Framework] = None, - setup: Optional[str] = None, - teardown: Optional[str] = None, ): self._func = typeguard.typechecked(func) self._name = name if name else getattr(func, "__name__", None) self._config = copy.deepcopy(config) if config else None self._framework = framework + self._setup = None + self._teardown = None + self.ctx_token = None - # Both setup and teardown methods need to come before func - # Otherwise they won't be found in this function - func_module = inspect.getmodule(self._func) - if setup: - if hasattr(func_module, setup): - setattr(self, "setup", getattr(func_module, setup)) - else: - raise ValueError(f"Couldn't find setup function {setup}.") - if teardown: - if hasattr(func_module, teardown): - setattr(self, "teardown", getattr(func_module, teardown)) - else: - raise ValueError(f"Couldn't find teardown function {teardown}.") @property def func(self): @@ -150,8 +136,8 @@ def run_handler( ) -> None: inputs, outputs = event.inputs, event.outputs sig = inspect.signature(self.func) - task_context = TaskContext(context.node.config, None) - task_context.sync_states(context) + task_context = task.context() + task_context.config = context.node.config task_context.output = recurse_hint( lambda name, hint: outputs[name], "return", sig.return_annotation ) @@ -162,12 +148,7 @@ def run_handler( kwargs[param.name] = value else: args.append(value) - token = _TASK_CONTEXT.set(task_context) - try: - return_value = self.func(*args, **kwargs) - context.sync_states(task_context) - finally: - _TASK_CONTEXT.reset(token) + return_value = self.func(*args, **kwargs) recurse_hint( lambda n, h, v: outputs.__setitem__(n, v), "return", @@ -175,6 +156,29 @@ def run_handler( return_value, ) + def setup_handler( + self, + context: HandlerV1Context, + ) -> None: + task_context = TaskContext(None, None) + self.ctx_token = _TASK_CONTEXT.set(task_context) + if self._setup: + self._setup() + + def teardown_handler( + self, + context: HandlerV1Context, + ) -> None: + if self._teardown: + self._teardown() + _TASK_CONTEXT.reset(self.ctx_token) + + def setup(self, func) -> None: + setattr(self, "_setup", func) + + def teardown(self, func) -> None: + setattr(self, "_teardown", func) + def task( func: Optional[Callable] = None, @@ -182,8 +186,6 @@ def task( name: Optional[str] = None, config: Optional[dict] = None, framework: Optional[pirlib.pir.Framework] = None, - setup: str = None, - teardown: str = None, ) -> TaskDefinition: if framework: if config is None: @@ -198,8 +200,6 @@ def wrapper(func) -> TaskDefinition: name=name, config=config, framework=framework, - setup=setup, - teardown=teardown, ) functools.update_wrapper(task_dfn, func) return task_dfn From 1551517b63d2b5fea23e3cdc99fa08de14eaab7b Mon Sep 17 00:00:00 2001 From: "zhanyuan.zhang" Date: Mon, 8 Aug 2022 00:28:59 -0700 Subject: [PATCH 5/9] fixed lint --- pirlib/task.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/pirlib/task.py b/pirlib/task.py index 83763c8..92010cb 100644 --- a/pirlib/task.py +++ b/pirlib/task.py @@ -1,4 +1,3 @@ -from ast import Call import contextvars import copy import functools @@ -97,7 +96,6 @@ def __init__( self._teardown = None self.ctx_token = None - @property def func(self): return self._func From d591722a46ac8919fbba6fae6851d53dba9a7c12 Mon Sep 17 00:00:00 2001 From: "zhanyuan.zhang" Date: Mon, 8 Aug 2022 00:34:11 -0700 Subject: [PATCH 6/9] removed unused import and metric report --- example/example.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/example/example.py b/example/example.py index 78466d5..bc3c4b2 100644 --- a/example/example.py +++ b/example/example.py @@ -6,7 +6,6 @@ from pirlib.frameworks.adaptdl import AdaptDL from pirlib.iotypes import DirectoryPath, FilePath -from pirlib.handlers.v1 import HandlerV1Context from pirlib.task import task from pirlib.pipeline import pipeline @@ -24,7 +23,6 @@ def clean(dataset: DirectoryPath) -> DirectoryPath: @task(framework=AdaptDL(min_replicas=1, max_replicas=4)) def train(dataset: DirectoryPath) -> FilePath: task_ctx = task.context() - task_ctx.set("train accuracy", 0.83) with open(dataset / "file.txt") as f: print("train({}, config={})".format(f.read().strip(), task_ctx.config)) outfile = task.context().output From 6395eabd4f610d93bea0567564c1cb6655058515 Mon Sep 17 00:00:00 2001 From: "zhanyuan.zhang" Date: Tue, 9 Aug 2022 11:09:40 -0700 Subject: [PATCH 7/9] changed pipeline decorator mechanism and cleaned up context methods --- example/example.py | 6 +++--- pirlib/handlers/v1.py | 9 --------- pirlib/pipeline.py | 27 ++++++++++++--------------- pirlib/task.py | 10 ---------- 4 files changed, 15 insertions(+), 37 deletions(-) diff --git a/example/example.py b/example/example.py index bc3c4b2..8ff27c8 100644 --- a/example/example.py +++ b/example/example.py @@ -55,7 +55,7 @@ def translate(self, inp: str) -> str: @task def translate(sentences: DirectoryPath) -> DirectoryPath: task_ctx = task.context() - model = task_ctx.get("translate_model") + model = task_ctx.translate_model with open(sentences / "file.txt") as g: inp = g.read().strip() print( @@ -73,14 +73,14 @@ def translate(sentences: DirectoryPath) -> DirectoryPath: @translate.setup def translate_setup() -> None: task_ctx = task.context() - task_ctx.set("translate_model", TranslateModel()) + task_ctx.translate_model = TranslateModel() print(">>> Initialized translation model.") @translate.teardown def translate_teardown() -> None: task_ctx = task.context() - task_ctx.reset("translate_model") + del task_ctx.translate_model print(">>> Cleaned up translation model.") diff --git a/pirlib/handlers/v1.py b/pirlib/handlers/v1.py index a1e4177..aa0c49b 100644 --- a/pirlib/handlers/v1.py +++ b/pirlib/handlers/v1.py @@ -9,15 +9,6 @@ class HandlerV1Context(object): node: Node states: Dict[str, Any] = field(default_factory=dict) - def set(self, key: str, value: Any): - self.states[key] = value - - def get(self, key: str, default: Any = None) -> Any: - return self.states.get(key, default) - - def reset(self, key: str) -> None: - del self.states[key] - @dataclass class HandlerV1Event(object): diff --git a/pirlib/pipeline.py b/pirlib/pipeline.py index 7f439ca..fae7256 100644 --- a/pirlib/pipeline.py +++ b/pirlib/pipeline.py @@ -75,14 +75,6 @@ def config(self): return self._config def __call__(self, *args, **kwargs): - if len(args) == 1 and callable(args[0]) and not kwargs: - wrapper = PipelineDefinition( - func=args[0], - name=self.name, - config=self.config, - ) - functools.update_wrapper(wrapper, args[0]) - return wrapper return self.instance(self.name)(*args, **kwargs) def instance(self, name: str) -> PipelineInstance: @@ -98,10 +90,15 @@ def pipeline( name: Optional[str] = None, config: Optional[dict] = None, ) -> PipelineDefinition: - wrapper = PipelineDefinition( - func=func, - name=name, - config=config, - ) - functools.update_wrapper(wrapper, func) - return wrapper + def wrapper(func) -> PipelineDefinition: + pipeline_dfn = PipelineDefinition( + func=func, + name=name, + config=config, + ) + functools.update_wrapper(pipeline_dfn, func) + return pipeline_dfn + if func: + return wrapper(func) + else: + return wrapper diff --git a/pirlib/task.py b/pirlib/task.py index 92010cb..8260be0 100644 --- a/pirlib/task.py +++ b/pirlib/task.py @@ -19,16 +19,6 @@ class TaskContext: config: Dict[str, Any] output: Any - states: Dict = field(default_factory=dict) - - def set(self, key: str, value: Any) -> None: - self.states[key] = value - - def get(self, key: str, default: Any = None) -> Any: - return self.states.get(key, default) - - def reset(self, key: str) -> None: - del self.states[key] def task_context() -> TaskContext: From 1601280a6b1b37ea923deb8473c02e9376c63a03 Mon Sep 17 00:00:00 2001 From: "zhanyuan.zhang" Date: Tue, 9 Aug 2022 11:20:25 -0700 Subject: [PATCH 8/9] fixed lint --- pirlib/pipeline.py | 1 + pirlib/task.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/pirlib/pipeline.py b/pirlib/pipeline.py index fae7256..1a57e64 100644 --- a/pirlib/pipeline.py +++ b/pirlib/pipeline.py @@ -98,6 +98,7 @@ def wrapper(func) -> PipelineDefinition: ) functools.update_wrapper(pipeline_dfn, func) return pipeline_dfn + if func: return wrapper(func) else: diff --git a/pirlib/task.py b/pirlib/task.py index 8260be0..2a4f00e 100644 --- a/pirlib/task.py +++ b/pirlib/task.py @@ -3,7 +3,7 @@ import functools import inspect import typeguard -from dataclasses import dataclass, field +from dataclasses import dataclass from typing import Any, Callable, Dict, Optional import pirlib.pir From 8947ef9d758b7d5998ee8adf6f528c76d22f8a8e Mon Sep 17 00:00:00 2001 From: "zhanyuan.zhang" Date: Thu, 11 Aug 2022 00:11:48 -0700 Subject: [PATCH 9/9] added context log --- example/docker-compose.yml | 34 ++++++++++--------- example/example.py | 39 +++++++++++++--------- example/package_docker.yml | 29 ++++++++++++---- example/package_inproc.yml | 34 +++++++++++++++++-- example/run_inproc.sh | 1 + pirlib/backends/docker_batch.py | 7 +++- pirlib/backends/inproc.py | 8 +++-- pirlib/handlers/v1.py | 10 +++--- pirlib/task.py | 59 ++++++++++++++++++++++----------- pirlib/utils.py | 22 ++++++++++++ tests/test_example.py | 55 ++++++++++++++++++++---------- 11 files changed, 212 insertions(+), 86 deletions(-) diff --git a/example/docker-compose.yml b/example/docker-compose.yml index 43e69cb..35d8e11 100644 --- a/example/docker-compose.yml +++ b/example/docker-compose.yml @@ -17,10 +17,11 @@ services: condition: service_completed_successfully train_pipeline.train: condition: service_completed_successfully - image: pircli-build:b8f7ebb4-2b49-4f87-8c1a-afdb85b956ac + image: pircli-build:682c52f0-d8ae-43aa-b86e-78f8e40133b9 volumes: - node_outputs:/mnt/node_outputs - ${INPUT_train_dataset:?err}:/mnt/graph_inputs/train_dataset + - ${INPUT_translate_model:?err}:/mnt/graph_inputs/translate_model - ${INPUT_sentences:?err}:/mnt/graph_inputs/sentences - ${OUTPUT:?err}:/mnt/graph_outputs train_pipeline.clean: @@ -29,9 +30,9 @@ services: - -m - pirlib.backends.docker_batch - node - - gASVWwIAAAAAAACMCnBpcmxpYi5waXKUjAROb2RllJOUKYGUfZQojAJpZJSMBWNsZWFulIwLZW50cnlwb2ludHOUfZSMBG1haW6UaACMCkVudHJ5cG9pbnSUk5QpgZR9lCiMB3ZlcnNpb26UjAJ2MZSMB2hhbmRsZXKUjBVleGFtcGxlLmV4YW1wbGU6Y2xlYW6UjAdydW50aW1llIwKcHl0aG9uOjMuOZSMB2NvZGV1cmyUTowFaW1hZ2WUjDFwaXJjbGktYnVpbGQ6YjhmN2ViYjQtMmI0OS00Zjg3LThjMWEtYWZkYjg1Yjk1NmFjlHVic4wJZnJhbWV3b3JrlE6MBmNvbmZpZ5R9lIwGaW5wdXRzlF2UaACMBUlucHV0lJOUKYGUfZQoaAWMB2RhdGFzZXSUjAZpb3R5cGWUjAlESVJFQ1RPUlmUjAZzb3VyY2WUaACMCkRhdGFTb3VyY2WUk5QpgZR9lCiMB25vZGVfaWSUTowLc3ViZ3JhcGhfaWSUTowJb3V0cHV0X2lklE6MDmdyYXBoX2lucHV0X2lklIwNdHJhaW5fZGF0YXNldJR1YowEbWV0YZRoAIwITWV0YWRhdGGUk5QpgZR9lCiMBG5hbWWUjAdkYXRhc2V0lIwLYW5ub3RhdGlvbnOUTnVidWJhjAdvdXRwdXRzlF2UaACMBk91dHB1dJSTlCmBlH2UKGgFjAZyZXR1cm6UaCGMCURJUkVDVE9SWZRoLWgvKYGUfZQoaDKMBnJldHVybpRoNE51YnViYWgtaC8pgZR9lChoMowFY2xlYW6UaDROdWJ1Yi4= - - gASV2gAAAAAAAABdlCiMCnBpcmxpYi5waXKUjApHcmFwaElucHV0lJOUKYGUfZQojAJpZJSMDXRyYWluX2RhdGFzZXSUjAZpb3R5cGWUjAlESVJFQ1RPUlmUjARtZXRhlGgBjAhNZXRhZGF0YZSTlCmBlH2UKIwEbmFtZZSMDXRyYWluX2RhdGFzZXSUjAthbm5vdGF0aW9uc5ROdWJ1YmgDKYGUfZQoaAaMCXNlbnRlbmNlc5RoCIwJRElSRUNUT1JZlGgKaAwpgZR9lChoD4wJc2VudGVuY2VzlGgRTnVidWJlLg== - image: pircli-build:b8f7ebb4-2b49-4f87-8c1a-afdb85b956ac + - gASVWwIAAAAAAACMCnBpcmxpYi5waXKUjAROb2RllJOUKYGUfZQojAJpZJSMBWNsZWFulIwLZW50cnlwb2ludHOUfZSMBG1haW6UaACMCkVudHJ5cG9pbnSUk5QpgZR9lCiMB3ZlcnNpb26UjAJ2MZSMB2hhbmRsZXKUjBVleGFtcGxlLmV4YW1wbGU6Y2xlYW6UjAdydW50aW1llIwKcHl0aG9uOjMuOZSMB2NvZGV1cmyUTowFaW1hZ2WUjDFwaXJjbGktYnVpbGQ6NjgyYzUyZjAtZDhhZS00M2FhLWI4NmUtNzhmOGU0MDEzM2I5lHVic4wJZnJhbWV3b3JrlE6MBmNvbmZpZ5R9lIwGaW5wdXRzlF2UaACMBUlucHV0lJOUKYGUfZQoaAWMB2RhdGFzZXSUjAZpb3R5cGWUjAlESVJFQ1RPUlmUjAZzb3VyY2WUaACMCkRhdGFTb3VyY2WUk5QpgZR9lCiMB25vZGVfaWSUTowLc3ViZ3JhcGhfaWSUTowJb3V0cHV0X2lklE6MDmdyYXBoX2lucHV0X2lklIwNdHJhaW5fZGF0YXNldJR1YowEbWV0YZRoAIwITWV0YWRhdGGUk5QpgZR9lCiMBG5hbWWUjAdkYXRhc2V0lIwLYW5ub3RhdGlvbnOUTnVidWJhjAdvdXRwdXRzlF2UaACMBk91dHB1dJSTlCmBlH2UKGgFjAZyZXR1cm6UaCGMCURJUkVDVE9SWZRoLWgvKYGUfZQoaDKMBnJldHVybpRoNE51YnViYWgtaC8pgZR9lChoMowFY2xlYW6UaDROdWJ1Yi4= + - gASVJAEAAAAAAABdlCiMCnBpcmxpYi5waXKUjApHcmFwaElucHV0lJOUKYGUfZQojAJpZJSMDXRyYWluX2RhdGFzZXSUjAZpb3R5cGWUjAlESVJFQ1RPUlmUjARtZXRhlGgBjAhNZXRhZGF0YZSTlCmBlH2UKIwEbmFtZZSMDXRyYWluX2RhdGFzZXSUjAthbm5vdGF0aW9uc5ROdWJ1YmgDKYGUfZQoaAaMD3RyYW5zbGF0ZV9tb2RlbJRoCIwERklMRZRoCmgMKYGUfZQoaA+MD3RyYW5zbGF0ZV9tb2RlbJRoEU51YnViaAMpgZR9lChoBowJc2VudGVuY2VzlGgIjAlESVJFQ1RPUlmUaApoDCmBlH2UKGgPjAlzZW50ZW5jZXOUaBFOdWJ1YmUu + image: pircli-build:682c52f0-d8ae-43aa-b86e-78f8e40133b9 volumes: - node_outputs:/mnt/node_outputs - ${INPUT_train_dataset:?err}:/mnt/graph_inputs/train_dataset @@ -41,12 +42,12 @@ services: - -m - pirlib.backends.docker_batch - node - - gASVCAMAAAAAAACMCnBpcmxpYi5waXKUjAROb2RllJOUKYGUfZQojAJpZJSMCGV2YWx1YXRllIwLZW50cnlwb2ludHOUfZSMBG1haW6UaACMCkVudHJ5cG9pbnSUk5QpgZR9lCiMB3ZlcnNpb26UjAJ2MZSMB2hhbmRsZXKUjBhleGFtcGxlLmV4YW1wbGU6ZXZhbHVhdGWUjAdydW50aW1llIwKcHl0aG9uOjMuOZSMB2NvZGV1cmyUTowFaW1hZ2WUjDFwaXJjbGktYnVpbGQ6YjhmN2ViYjQtMmI0OS00Zjg3LThjMWEtYWZkYjg1Yjk1NmFjlHVic4wJZnJhbWV3b3JrlE6MBmNvbmZpZ5R9lIwGaW5wdXRzlF2UKGgAjAVJbnB1dJSTlCmBlH2UKGgFjBNrd2FyZ3MudGVzdF9kYXRhc2V0lIwGaW90eXBllIwJRElSRUNUT1JZlIwGc291cmNllGgAjApEYXRhU291cmNllJOUKYGUfZQojAdub2RlX2lklE6MC3N1YmdyYXBoX2lklE6MCW91dHB1dF9pZJROjA5ncmFwaF9pbnB1dF9pZJSMCXNlbnRlbmNlc5R1YowEbWV0YZRoAIwITWV0YWRhdGGUk5QpgZR9lCiMBG5hbWWUjBNrd2FyZ3MudGVzdF9kYXRhc2V0lIwLYW5ub3RhdGlvbnOUTnVidWJoHSmBlH2UKGgFjBJrd2FyZ3MucHJlZGljdGlvbnOUaCGMCURJUkVDVE9SWZRoI2glKYGUfZQoaCiMGGluZmVyX3BpcGVsaW5lLnNlbnRpbWVudJRoKU5oKowGcmV0dXJulGgrTnViaC1oLymBlH2UKGgyjBJrd2FyZ3MucHJlZGljdGlvbnOUaDROdWJ1YmWMB291dHB1dHOUXZRoAIwGT3V0cHV0lJOUKYGUfZQoaAWMBnJldHVybpRoIYwJREFUQUZSQU1FlGgtaC8pgZR9lChoMowGcmV0dXJulGg0TnVidWJhaC1oLymBlH2UKGgyjAhldmFsdWF0ZZRoNE51YnViLg== - - gASV2gAAAAAAAABdlCiMCnBpcmxpYi5waXKUjApHcmFwaElucHV0lJOUKYGUfZQojAJpZJSMDXRyYWluX2RhdGFzZXSUjAZpb3R5cGWUjAlESVJFQ1RPUlmUjARtZXRhlGgBjAhNZXRhZGF0YZSTlCmBlH2UKIwEbmFtZZSMDXRyYWluX2RhdGFzZXSUjAthbm5vdGF0aW9uc5ROdWJ1YmgDKYGUfZQoaAaMCXNlbnRlbmNlc5RoCIwJRElSRUNUT1JZlGgKaAwpgZR9lChoD4wJc2VudGVuY2VzlGgRTnVidWJlLg== + - gASVCAMAAAAAAACMCnBpcmxpYi5waXKUjAROb2RllJOUKYGUfZQojAJpZJSMCGV2YWx1YXRllIwLZW50cnlwb2ludHOUfZSMBG1haW6UaACMCkVudHJ5cG9pbnSUk5QpgZR9lCiMB3ZlcnNpb26UjAJ2MZSMB2hhbmRsZXKUjBhleGFtcGxlLmV4YW1wbGU6ZXZhbHVhdGWUjAdydW50aW1llIwKcHl0aG9uOjMuOZSMB2NvZGV1cmyUTowFaW1hZ2WUjDFwaXJjbGktYnVpbGQ6NjgyYzUyZjAtZDhhZS00M2FhLWI4NmUtNzhmOGU0MDEzM2I5lHVic4wJZnJhbWV3b3JrlE6MBmNvbmZpZ5R9lIwGaW5wdXRzlF2UKGgAjAVJbnB1dJSTlCmBlH2UKGgFjBNrd2FyZ3MudGVzdF9kYXRhc2V0lIwGaW90eXBllIwJRElSRUNUT1JZlIwGc291cmNllGgAjApEYXRhU291cmNllJOUKYGUfZQojAdub2RlX2lklE6MC3N1YmdyYXBoX2lklE6MCW91dHB1dF9pZJROjA5ncmFwaF9pbnB1dF9pZJSMCXNlbnRlbmNlc5R1YowEbWV0YZRoAIwITWV0YWRhdGGUk5QpgZR9lCiMBG5hbWWUjBNrd2FyZ3MudGVzdF9kYXRhc2V0lIwLYW5ub3RhdGlvbnOUTnVidWJoHSmBlH2UKGgFjBJrd2FyZ3MucHJlZGljdGlvbnOUaCGMCURJUkVDVE9SWZRoI2glKYGUfZQoaCiMGGluZmVyX3BpcGVsaW5lLnNlbnRpbWVudJRoKU5oKowGcmV0dXJulGgrTnViaC1oLymBlH2UKGgyjBJrd2FyZ3MucHJlZGljdGlvbnOUaDROdWJ1YmWMB291dHB1dHOUXZRoAIwGT3V0cHV0lJOUKYGUfZQoaAWMBnJldHVybpRoIYwJREFUQUZSQU1FlGgtaC8pgZR9lChoMowGcmV0dXJulGg0TnVidWJhaC1oLymBlH2UKGgyjAhldmFsdWF0ZZRoNE51YnViLg== + - gASVJAEAAAAAAABdlCiMCnBpcmxpYi5waXKUjApHcmFwaElucHV0lJOUKYGUfZQojAJpZJSMDXRyYWluX2RhdGFzZXSUjAZpb3R5cGWUjAlESVJFQ1RPUlmUjARtZXRhlGgBjAhNZXRhZGF0YZSTlCmBlH2UKIwEbmFtZZSMDXRyYWluX2RhdGFzZXSUjAthbm5vdGF0aW9uc5ROdWJ1YmgDKYGUfZQoaAaMD3RyYW5zbGF0ZV9tb2RlbJRoCIwERklMRZRoCmgMKYGUfZQoaA+MD3RyYW5zbGF0ZV9tb2RlbJRoEU51YnViaAMpgZR9lChoBowJc2VudGVuY2VzlGgIjAlESVJFQ1RPUlmUaApoDCmBlH2UKGgPjAlzZW50ZW5jZXOUaBFOdWJ1YmUu depends_on: train_pipeline.infer_pipeline.sentiment: condition: service_completed_successfully - image: pircli-build:b8f7ebb4-2b49-4f87-8c1a-afdb85b956ac + image: pircli-build:682c52f0-d8ae-43aa-b86e-78f8e40133b9 volumes: - node_outputs:/mnt/node_outputs - ${INPUT_sentences:?err}:/mnt/graph_inputs/sentences @@ -56,14 +57,14 @@ services: - -m - pirlib.backends.docker_batch - node - - gASV7QIAAAAAAACMCnBpcmxpYi5waXKUjAROb2RllJOUKYGUfZQojAJpZJSMGGluZmVyX3BpcGVsaW5lLnNlbnRpbWVudJSMC2VudHJ5cG9pbnRzlH2UjARtYWlulGgAjApFbnRyeXBvaW50lJOUKYGUfZQojAd2ZXJzaW9ulIwCdjGUjAdoYW5kbGVylIwZZXhhbXBsZS5leGFtcGxlOnNlbnRpbWVudJSMB3J1bnRpbWWUjApweXRob246My45lIwHY29kZXVybJROjAVpbWFnZZSMMXBpcmNsaS1idWlsZDpiOGY3ZWJiNC0yYjQ5LTRmODctOGMxYS1hZmRiODViOTU2YWOUdWJzjAlmcmFtZXdvcmuUTowGY29uZmlnlH2UjAZpbnB1dHOUXZQoaACMBUlucHV0lJOUKYGUfZQoaAWMBW1vZGVslIwGaW90eXBllIwERklMRZSMBnNvdXJjZZRoAIwKRGF0YVNvdXJjZZSTlCmBlH2UKIwHbm9kZV9pZJSMBXRyYWlulIwLc3ViZ3JhcGhfaWSUTowJb3V0cHV0X2lklIwGcmV0dXJulIwOZ3JhcGhfaW5wdXRfaWSUTnVijARtZXRhlGgAjAhNZXRhZGF0YZSTlCmBlH2UKIwEbmFtZZSMBW1vZGVslIwLYW5ub3RhdGlvbnOUTnVidWJoHSmBlH2UKGgFjAlzZW50ZW5jZXOUaCGMCURJUkVDVE9SWZRoI2glKYGUfZQoaCiMGmluZmVyX3BpcGVsaW5lLnRyYW5zbGF0ZV8xlGgqTmgrjAZyZXR1cm6UaC1OdWJoLmgwKYGUfZQoaDOMCXNlbnRlbmNlc5RoNU51YnViZYwHb3V0cHV0c5RdlGgAjAZPdXRwdXSUk5QpgZR9lChoBYwGcmV0dXJulGghjAlESVJFQ1RPUlmUaC5oMCmBlH2UKGgzjAZyZXR1cm6UaDVOdWJ1YmFoLmgwKYGUfZQoaDOMCXNlbnRpbWVudJRoNU51YnViLg== - - gASV2gAAAAAAAABdlCiMCnBpcmxpYi5waXKUjApHcmFwaElucHV0lJOUKYGUfZQojAJpZJSMDXRyYWluX2RhdGFzZXSUjAZpb3R5cGWUjAlESVJFQ1RPUlmUjARtZXRhlGgBjAhNZXRhZGF0YZSTlCmBlH2UKIwEbmFtZZSMDXRyYWluX2RhdGFzZXSUjAthbm5vdGF0aW9uc5ROdWJ1YmgDKYGUfZQoaAaMCXNlbnRlbmNlc5RoCIwJRElSRUNUT1JZlGgKaAwpgZR9lChoD4wJc2VudGVuY2VzlGgRTnVidWJlLg== + - gASV7QIAAAAAAACMCnBpcmxpYi5waXKUjAROb2RllJOUKYGUfZQojAJpZJSMGGluZmVyX3BpcGVsaW5lLnNlbnRpbWVudJSMC2VudHJ5cG9pbnRzlH2UjARtYWlulGgAjApFbnRyeXBvaW50lJOUKYGUfZQojAd2ZXJzaW9ulIwCdjGUjAdoYW5kbGVylIwZZXhhbXBsZS5leGFtcGxlOnNlbnRpbWVudJSMB3J1bnRpbWWUjApweXRob246My45lIwHY29kZXVybJROjAVpbWFnZZSMMXBpcmNsaS1idWlsZDo2ODJjNTJmMC1kOGFlLTQzYWEtYjg2ZS03OGY4ZTQwMTMzYjmUdWJzjAlmcmFtZXdvcmuUTowGY29uZmlnlH2UjAZpbnB1dHOUXZQoaACMBUlucHV0lJOUKYGUfZQoaAWMBW1vZGVslIwGaW90eXBllIwERklMRZSMBnNvdXJjZZRoAIwKRGF0YVNvdXJjZZSTlCmBlH2UKIwHbm9kZV9pZJSMBXRyYWlulIwLc3ViZ3JhcGhfaWSUTowJb3V0cHV0X2lklIwGcmV0dXJulIwOZ3JhcGhfaW5wdXRfaWSUTnVijARtZXRhlGgAjAhNZXRhZGF0YZSTlCmBlH2UKIwEbmFtZZSMBW1vZGVslIwLYW5ub3RhdGlvbnOUTnVidWJoHSmBlH2UKGgFjAlzZW50ZW5jZXOUaCGMCURJUkVDVE9SWZRoI2glKYGUfZQoaCiMGmluZmVyX3BpcGVsaW5lLnRyYW5zbGF0ZV8xlGgqTmgrjAZyZXR1cm6UaC1OdWJoLmgwKYGUfZQoaDOMCXNlbnRlbmNlc5RoNU51YnViZYwHb3V0cHV0c5RdlGgAjAZPdXRwdXSUk5QpgZR9lChoBYwGcmV0dXJulGghjAlESVJFQ1RPUlmUaC5oMCmBlH2UKGgzjAZyZXR1cm6UaDVOdWJ1YmFoLmgwKYGUfZQoaDOMCXNlbnRpbWVudJRoNU51YnViLg== + - gASVJAEAAAAAAABdlCiMCnBpcmxpYi5waXKUjApHcmFwaElucHV0lJOUKYGUfZQojAJpZJSMDXRyYWluX2RhdGFzZXSUjAZpb3R5cGWUjAlESVJFQ1RPUlmUjARtZXRhlGgBjAhNZXRhZGF0YZSTlCmBlH2UKIwEbmFtZZSMDXRyYWluX2RhdGFzZXSUjAthbm5vdGF0aW9uc5ROdWJ1YmgDKYGUfZQoaAaMD3RyYW5zbGF0ZV9tb2RlbJRoCIwERklMRZRoCmgMKYGUfZQoaA+MD3RyYW5zbGF0ZV9tb2RlbJRoEU51YnViaAMpgZR9lChoBowJc2VudGVuY2VzlGgIjAlESVJFQ1RPUlmUaApoDCmBlH2UKGgPjAlzZW50ZW5jZXOUaBFOdWJ1YmUu depends_on: train_pipeline.infer_pipeline.translate_1: condition: service_completed_successfully train_pipeline.train: condition: service_completed_successfully - image: pircli-build:b8f7ebb4-2b49-4f87-8c1a-afdb85b956ac + image: pircli-build:682c52f0-d8ae-43aa-b86e-78f8e40133b9 volumes: - node_outputs:/mnt/node_outputs train_pipeline.infer_pipeline.translate_1: @@ -72,11 +73,12 @@ services: - -m - pirlib.backends.docker_batch - node - - gASViQIAAAAAAACMCnBpcmxpYi5waXKUjAROb2RllJOUKYGUfZQojAJpZJSMGmluZmVyX3BpcGVsaW5lLnRyYW5zbGF0ZV8xlIwLZW50cnlwb2ludHOUfZSMBG1haW6UaACMCkVudHJ5cG9pbnSUk5QpgZR9lCiMB3ZlcnNpb26UjAJ2MZSMB2hhbmRsZXKUjBlleGFtcGxlLmV4YW1wbGU6dHJhbnNsYXRllIwHcnVudGltZZSMCnB5dGhvbjozLjmUjAdjb2RldXJslE6MBWltYWdllIwxcGlyY2xpLWJ1aWxkOmI4ZjdlYmI0LTJiNDktNGY4Ny04YzFhLWFmZGI4NWI5NTZhY5R1YnOMCWZyYW1ld29ya5ROjAZjb25maWeUfZSMA2tleZSMBXZhbHVllHOMBmlucHV0c5RdlGgAjAVJbnB1dJSTlCmBlH2UKGgFjAlzZW50ZW5jZXOUjAZpb3R5cGWUjAlESVJFQ1RPUlmUjAZzb3VyY2WUaACMCkRhdGFTb3VyY2WUk5QpgZR9lCiMB25vZGVfaWSUTowLc3ViZ3JhcGhfaWSUTowJb3V0cHV0X2lklE6MDmdyYXBoX2lucHV0X2lklIwJc2VudGVuY2VzlHVijARtZXRhlGgAjAhNZXRhZGF0YZSTlCmBlH2UKIwEbmFtZZSMCXNlbnRlbmNlc5SMC2Fubm90YXRpb25zlE51YnViYYwHb3V0cHV0c5RdlGgAjAZPdXRwdXSUk5QpgZR9lChoBYwGcmV0dXJulGgjjAlESVJFQ1RPUlmUaC9oMSmBlH2UKGg0jAZyZXR1cm6UaDZOdWJ1YmFoL2gxKYGUfZQoaDSMC3RyYW5zbGF0ZV8xlGg2TnVidWIu - - gASV2gAAAAAAAABdlCiMCnBpcmxpYi5waXKUjApHcmFwaElucHV0lJOUKYGUfZQojAJpZJSMDXRyYWluX2RhdGFzZXSUjAZpb3R5cGWUjAlESVJFQ1RPUlmUjARtZXRhlGgBjAhNZXRhZGF0YZSTlCmBlH2UKIwEbmFtZZSMDXRyYWluX2RhdGFzZXSUjAthbm5vdGF0aW9uc5ROdWJ1YmgDKYGUfZQoaAaMCXNlbnRlbmNlc5RoCIwJRElSRUNUT1JZlGgKaAwpgZR9lChoD4wJc2VudGVuY2VzlGgRTnVidWJlLg== - image: pircli-build:b8f7ebb4-2b49-4f87-8c1a-afdb85b956ac + - gASV5QIAAAAAAACMCnBpcmxpYi5waXKUjAROb2RllJOUKYGUfZQojAJpZJSMGmluZmVyX3BpcGVsaW5lLnRyYW5zbGF0ZV8xlIwLZW50cnlwb2ludHOUfZSMBG1haW6UaACMCkVudHJ5cG9pbnSUk5QpgZR9lCiMB3ZlcnNpb26UjAJ2MZSMB2hhbmRsZXKUjBlleGFtcGxlLmV4YW1wbGU6dHJhbnNsYXRllIwHcnVudGltZZSMCnB5dGhvbjozLjmUjAdjb2RldXJslE6MBWltYWdllIwxcGlyY2xpLWJ1aWxkOjY4MmM1MmYwLWQ4YWUtNDNhYS1iODZlLTc4ZjhlNDAxMzNiOZR1YnOMCWZyYW1ld29ya5ROjAZjb25maWeUfZSMA2tleZSMBXZhbHVllHOMBmlucHV0c5RdlChoAIwFSW5wdXSUk5QpgZR9lChoBYwGYXJncy4wlIwGaW90eXBllIwERklMRZSMBnNvdXJjZZRoAIwKRGF0YVNvdXJjZZSTlCmBlH2UKIwHbm9kZV9pZJROjAtzdWJncmFwaF9pZJROjAlvdXRwdXRfaWSUTowOZ3JhcGhfaW5wdXRfaWSUjA90cmFuc2xhdGVfbW9kZWyUdWKMBG1ldGGUaACMCE1ldGFkYXRhlJOUKYGUfZQojARuYW1llIwGYXJncy4wlIwLYW5ub3RhdGlvbnOUTnVidWJoHymBlH2UKGgFjAZhcmdzLjGUaCOMCURJUkVDVE9SWZRoJWgnKYGUfZQoaCpOaCtOaCxOaC2MCXNlbnRlbmNlc5R1YmgvaDEpgZR9lChoNIwGYXJncy4xlGg2TnVidWJljAdvdXRwdXRzlF2UaACMBk91dHB1dJSTlCmBlH2UKGgFjAZyZXR1cm6UaCOMCURJUkVDVE9SWZRoL2gxKYGUfZQoaDSMBnJldHVybpRoNk51YnViYWgvaDEpgZR9lChoNIwLdHJhbnNsYXRlXzGUaDZOdWJ1Yi4= + - gASVJAEAAAAAAABdlCiMCnBpcmxpYi5waXKUjApHcmFwaElucHV0lJOUKYGUfZQojAJpZJSMDXRyYWluX2RhdGFzZXSUjAZpb3R5cGWUjAlESVJFQ1RPUlmUjARtZXRhlGgBjAhNZXRhZGF0YZSTlCmBlH2UKIwEbmFtZZSMDXRyYWluX2RhdGFzZXSUjAthbm5vdGF0aW9uc5ROdWJ1YmgDKYGUfZQoaAaMD3RyYW5zbGF0ZV9tb2RlbJRoCIwERklMRZRoCmgMKYGUfZQoaA+MD3RyYW5zbGF0ZV9tb2RlbJRoEU51YnViaAMpgZR9lChoBowJc2VudGVuY2VzlGgIjAlESVJFQ1RPUlmUaApoDCmBlH2UKGgPjAlzZW50ZW5jZXOUaBFOdWJ1YmUu + image: pircli-build:682c52f0-d8ae-43aa-b86e-78f8e40133b9 volumes: - node_outputs:/mnt/node_outputs + - ${INPUT_translate_model:?err}:/mnt/graph_inputs/translate_model - ${INPUT_sentences:?err}:/mnt/graph_inputs/sentences train_pipeline.train: command: @@ -84,12 +86,12 @@ services: - -m - pirlib.backends.docker_batch - node - - gASVswIAAAAAAACMCnBpcmxpYi5waXKUjAROb2RllJOUKYGUfZQojAJpZJSMBXRyYWlulIwLZW50cnlwb2ludHOUfZSMBG1haW6UaACMCkVudHJ5cG9pbnSUk5QpgZR9lCiMB3ZlcnNpb26UjAJ2MZSMB2hhbmRsZXKUjBVleGFtcGxlLmV4YW1wbGU6dHJhaW6UjAdydW50aW1llIwKcHl0aG9uOjMuOZSMB2NvZGV1cmyUTowFaW1hZ2WUjDFwaXJjbGktYnVpbGQ6YjhmN2ViYjQtMmI0OS00Zjg3LThjMWEtYWZkYjg1Yjk1NmFjlHVic4wJZnJhbWV3b3JrlGgAjAlGcmFtZXdvcmuUk5QpgZR9lCiMBG5hbWWUjAdhZGFwdGRslGgOTowGY29uZmlnlE51YmgefZQojBRhZGFwdGRsL21pbl9yZXBsaWNhc5RLAYwUYWRhcHRkbC9tYXhfcmVwbGljYXOUSwR1jAZpbnB1dHOUXZRoAIwFSW5wdXSUk5QpgZR9lChoBYwHZGF0YXNldJSMBmlvdHlwZZSMCURJUkVDVE9SWZSMBnNvdXJjZZRoAIwKRGF0YVNvdXJjZZSTlCmBlH2UKIwHbm9kZV9pZJSMBWNsZWFulIwLc3ViZ3JhcGhfaWSUTowJb3V0cHV0X2lklIwGcmV0dXJulIwOZ3JhcGhfaW5wdXRfaWSUTnVijARtZXRhlGgAjAhNZXRhZGF0YZSTlCmBlH2UKGgcjAdkYXRhc2V0lIwLYW5ub3RhdGlvbnOUTnVidWJhjAdvdXRwdXRzlF2UaACMBk91dHB1dJSTlCmBlH2UKGgFjAZyZXR1cm6UaCmMBEZJTEWUaDZoOCmBlH2UKGgcjAZyZXR1cm6UaDxOdWJ1YmFoNmg4KYGUfZQoaByMBXRyYWlulGg8TnVidWIu - - gASV2gAAAAAAAABdlCiMCnBpcmxpYi5waXKUjApHcmFwaElucHV0lJOUKYGUfZQojAJpZJSMDXRyYWluX2RhdGFzZXSUjAZpb3R5cGWUjAlESVJFQ1RPUlmUjARtZXRhlGgBjAhNZXRhZGF0YZSTlCmBlH2UKIwEbmFtZZSMDXRyYWluX2RhdGFzZXSUjAthbm5vdGF0aW9uc5ROdWJ1YmgDKYGUfZQoaAaMCXNlbnRlbmNlc5RoCIwJRElSRUNUT1JZlGgKaAwpgZR9lChoD4wJc2VudGVuY2VzlGgRTnVidWJlLg== + - gASVswIAAAAAAACMCnBpcmxpYi5waXKUjAROb2RllJOUKYGUfZQojAJpZJSMBXRyYWlulIwLZW50cnlwb2ludHOUfZSMBG1haW6UaACMCkVudHJ5cG9pbnSUk5QpgZR9lCiMB3ZlcnNpb26UjAJ2MZSMB2hhbmRsZXKUjBVleGFtcGxlLmV4YW1wbGU6dHJhaW6UjAdydW50aW1llIwKcHl0aG9uOjMuOZSMB2NvZGV1cmyUTowFaW1hZ2WUjDFwaXJjbGktYnVpbGQ6NjgyYzUyZjAtZDhhZS00M2FhLWI4NmUtNzhmOGU0MDEzM2I5lHVic4wJZnJhbWV3b3JrlGgAjAlGcmFtZXdvcmuUk5QpgZR9lCiMBG5hbWWUjAdhZGFwdGRslGgOTowGY29uZmlnlE51YmgefZQojBRhZGFwdGRsL21pbl9yZXBsaWNhc5RLAYwUYWRhcHRkbC9tYXhfcmVwbGljYXOUSwR1jAZpbnB1dHOUXZRoAIwFSW5wdXSUk5QpgZR9lChoBYwHZGF0YXNldJSMBmlvdHlwZZSMCURJUkVDVE9SWZSMBnNvdXJjZZRoAIwKRGF0YVNvdXJjZZSTlCmBlH2UKIwHbm9kZV9pZJSMBWNsZWFulIwLc3ViZ3JhcGhfaWSUTowJb3V0cHV0X2lklIwGcmV0dXJulIwOZ3JhcGhfaW5wdXRfaWSUTnVijARtZXRhlGgAjAhNZXRhZGF0YZSTlCmBlH2UKGgcjAdkYXRhc2V0lIwLYW5ub3RhdGlvbnOUTnVidWJhjAdvdXRwdXRzlF2UaACMBk91dHB1dJSTlCmBlH2UKGgFjAZyZXR1cm6UaCmMBEZJTEWUaDZoOCmBlH2UKGgcjAZyZXR1cm6UaDxOdWJ1YmFoNmg4KYGUfZQoaByMBXRyYWlulGg8TnVidWIu + - gASVJAEAAAAAAABdlCiMCnBpcmxpYi5waXKUjApHcmFwaElucHV0lJOUKYGUfZQojAJpZJSMDXRyYWluX2RhdGFzZXSUjAZpb3R5cGWUjAlESVJFQ1RPUlmUjARtZXRhlGgBjAhNZXRhZGF0YZSTlCmBlH2UKIwEbmFtZZSMDXRyYWluX2RhdGFzZXSUjAthbm5vdGF0aW9uc5ROdWJ1YmgDKYGUfZQoaAaMD3RyYW5zbGF0ZV9tb2RlbJRoCIwERklMRZRoCmgMKYGUfZQoaA+MD3RyYW5zbGF0ZV9tb2RlbJRoEU51YnViaAMpgZR9lChoBowJc2VudGVuY2VzlGgIjAlESVJFQ1RPUlmUaApoDCmBlH2UKGgPjAlzZW50ZW5jZXOUaBFOdWJ1YmUu depends_on: train_pipeline.clean: condition: service_completed_successfully - image: pircli-build:b8f7ebb4-2b49-4f87-8c1a-afdb85b956ac + image: pircli-build:682c52f0-d8ae-43aa-b86e-78f8e40133b9 volumes: - node_outputs:/mnt/node_outputs version: '3.9' diff --git a/example/example.py b/example/example.py index 8ff27c8..0a6b774 100644 --- a/example/example.py +++ b/example/example.py @@ -14,7 +14,7 @@ def clean(dataset: DirectoryPath) -> DirectoryPath: with open(dataset / "file.txt") as f: print("clean({})".format(f.read().strip())) - outdir = task.context().output + outdir = clean.context.output with open(outdir / "file.txt", "w") as f: f.write("clean_result") return outdir @@ -22,12 +22,13 @@ def clean(dataset: DirectoryPath) -> DirectoryPath: @task(framework=AdaptDL(min_replicas=1, max_replicas=4)) def train(dataset: DirectoryPath) -> FilePath: - task_ctx = task.context() + task_ctx = train.context with open(dataset / "file.txt") as f: print("train({}, config={})".format(f.read().strip(), task_ctx.config)) - outfile = task.context().output + outfile = task_ctx.output with open(outfile, "w") as f: f.write("train_result") + task_ctx.logger.info(">>> Train accuracy: 0.83") return outfile @@ -53,13 +54,15 @@ def translate(self, inp: str) -> str: @task -def translate(sentences: DirectoryPath) -> DirectoryPath: - task_ctx = task.context() +def translate(args: Tuple[FilePath, DirectoryPath]) -> DirectoryPath: + model_path, sentences = args + task_ctx = translate.context model = task_ctx.translate_model - with open(sentences / "file.txt") as g: + with open(model_path) as f, open(sentences / "file.txt") as g: inp = g.read().strip() print( - "translate({}, config={})".format( + "translate({}, {}, config={})".format( + f.read().strip(), inp, task_ctx.config) ) @@ -72,42 +75,44 @@ def translate(sentences: DirectoryPath) -> DirectoryPath: @translate.setup def translate_setup() -> None: - task_ctx = task.context() + task_ctx = translate.context task_ctx.translate_model = TranslateModel() - print(">>> Initialized translation model.") + task_ctx.logger.info(">>> Initialized translation model.") @translate.teardown def translate_teardown() -> None: - task_ctx = task.context() + task_ctx = translate.context del task_ctx.translate_model - print(">>> Cleaned up translation model.") + task_ctx.logger.info(">>> Cleaned up translation model.") @task def sentiment(model: FilePath, sentences: DirectoryPath) -> DirectoryPath: with open(model) as f, open(sentences / "file.txt") as g: print("sentiment({}, {})".format(f.read().strip(), g.read().strip())) - outdir = task.context().output + outdir = sentiment.context.output with open(outdir / "file.txt", "w") as f: f.write("sentiment_result") return outdir @pipeline -def infer_pipeline(sentiment_model: FilePath, +def infer_pipeline(translate_model: FilePath, + sentiment_model: FilePath, sentences: DirectoryPath) -> DirectoryPath: translate_1 = translate.instance("translate_1") translate_1.config["key"] = "value" - return sentiment(sentiment_model, translate_1(sentences)) + return sentiment(sentiment_model, translate_1((translate_model, sentences))) @pipeline def train_pipeline( train_dataset: DirectoryPath, + translate_model: FilePath, sentences: DirectoryPath) -> Tuple[FilePath, pandas.DataFrame]: sentiment_model = train(clean(train_dataset)) - sentiment = infer_pipeline(sentiment_model, sentences) + sentiment = infer_pipeline(translate_model, sentiment_model, sentences) eval_input = {"test_dataset": sentences, "predictions": sentiment} return sentiment_model, evaluate(eval_input) @@ -117,13 +122,17 @@ def train_pipeline( print(yaml.dump(asdict(package), sort_keys=False)) # Prepare inputs. dir_1 = tempfile.TemporaryDirectory() + file_2 = tempfile.NamedTemporaryFile() dir_3 = tempfile.TemporaryDirectory() with open(f"{dir_1.name}/file.txt", "w") as f: f.write("train_dataset") + with open(f"{file_2.name}", "w") as f: + f.write("translate_model") with open(f"{dir_3.name}/file.txt", "w") as f: f.write("sentences") # Test calling end-to-end pipeline. model_path, metrics = train_pipeline(DirectoryPath(dir_1.name), + FilePath(file_2.name), DirectoryPath(dir_3.name)) with open(model_path) as f: print("pipeline model: {}".format(f.read().strip())) diff --git a/example/package_docker.yml b/example/package_docker.yml index 8ec29ed..d222453 100644 --- a/example/package_docker.yml +++ b/example/package_docker.yml @@ -8,7 +8,7 @@ graphs: handler: example.example:clean runtime: python:3.9 codeurl: null - image: pircli-build:b8f7ebb4-2b49-4f87-8c1a-afdb85b956ac + image: pircli-build:682c52f0-d8ae-43aa-b86e-78f8e40133b9 framework: null config: {} inputs: @@ -38,7 +38,7 @@ graphs: handler: example.example:train runtime: python:3.9 codeurl: null - image: pircli-build:b8f7ebb4-2b49-4f87-8c1a-afdb85b956ac + image: pircli-build:682c52f0-d8ae-43aa-b86e-78f8e40133b9 framework: name: adaptdl version: null @@ -72,7 +72,7 @@ graphs: handler: example.example:evaluate runtime: python:3.9 codeurl: null - image: pircli-build:b8f7ebb4-2b49-4f87-8c1a-afdb85b956ac + image: pircli-build:682c52f0-d8ae-43aa-b86e-78f8e40133b9 framework: null config: {} inputs: @@ -112,12 +112,22 @@ graphs: handler: example.example:translate runtime: python:3.9 codeurl: null - image: pircli-build:b8f7ebb4-2b49-4f87-8c1a-afdb85b956ac + image: pircli-build:682c52f0-d8ae-43aa-b86e-78f8e40133b9 framework: null config: key: value inputs: - - id: sentences + - id: args.0 + iotype: FILE + source: + node_id: null + subgraph_id: null + output_id: null + graph_input_id: translate_model + meta: + name: args.0 + annotations: null + - id: args.1 iotype: DIRECTORY source: node_id: null @@ -125,7 +135,7 @@ graphs: output_id: null graph_input_id: sentences meta: - name: sentences + name: args.1 annotations: null outputs: - id: return @@ -143,7 +153,7 @@ graphs: handler: example.example:sentiment runtime: python:3.9 codeurl: null - image: pircli-build:b8f7ebb4-2b49-4f87-8c1a-afdb85b956ac + image: pircli-build:682c52f0-d8ae-43aa-b86e-78f8e40133b9 framework: null config: {} inputs: @@ -183,6 +193,11 @@ graphs: meta: name: train_dataset annotations: null + - id: translate_model + iotype: FILE + meta: + name: translate_model + annotations: null - id: sentences iotype: DIRECTORY meta: diff --git a/example/package_inproc.yml b/example/package_inproc.yml index 6a3163d..4ca7c87 100644 --- a/example/package_inproc.yml +++ b/example/package_inproc.yml @@ -13,7 +13,17 @@ graphs: config: key: value inputs: - - id: sentences + - id: args.0 + iotype: FILE + source: + node_id: null + subgraph_id: null + output_id: null + graph_input_id: translate_model + meta: + name: args.0 + annotations: null + - id: args.1 iotype: DIRECTORY source: node_id: null @@ -21,7 +31,7 @@ graphs: output_id: null graph_input_id: sentences meta: - name: sentences + name: args.1 annotations: null outputs: - id: return @@ -74,6 +84,11 @@ graphs: annotations: null subgraphs: [] inputs: + - id: translate_model + iotype: FILE + meta: + name: translate_model + annotations: null - id: sentiment_model iotype: FILE meta: @@ -209,6 +224,16 @@ graphs: graph_id: infer_pipeline config: {} inputs: + - id: translate_model + iotype: FILE + source: + node_id: null + subgraph_id: null + output_id: null + graph_input_id: translate_model + meta: + name: translate_model + annotations: null - id: sentiment_model iotype: FILE source: @@ -244,6 +269,11 @@ graphs: meta: name: train_dataset annotations: null + - id: translate_model + iotype: FILE + meta: + name: translate_model + annotations: null - id: sentences iotype: DIRECTORY meta: diff --git a/example/run_inproc.sh b/example/run_inproc.sh index c306131..570c981 100644 --- a/example/run_inproc.sh +++ b/example/run_inproc.sh @@ -10,6 +10,7 @@ PYTHONPATH=$ROOTDIR $ROOTDIR/bin/pircli execute \ $EXAMPLEDIR/package_inproc.yml train_pipeline \ --target pirlib.backends.inproc:InprocBackend \ --input train_dataset=$EXAMPLEDIR/inputs/train_dataset \ + --input translate_model=$EXAMPLEDIR/inputs/translate_model.txt \ --input sentences=$EXAMPLEDIR/inputs/sentences \ --output return.0=$EXAMPLEDIR/outputs/return.0 \ --output return.1=$EXAMPLEDIR/outputs/return.1 diff --git a/pirlib/backends/docker_batch.py b/pirlib/backends/docker_batch.py index ef1ae5b..26d4b4b 100644 --- a/pirlib/backends/docker_batch.py +++ b/pirlib/backends/docker_batch.py @@ -3,11 +3,13 @@ import pickle import sys import yaml +from dataclasses import asdict from typing import Optional import pirlib.pir from pirlib.backends import Backend from pirlib.handlers.v1 import HandlerV1Context, HandlerV1Event +from pirlib.utils import get_logger def encode(x): @@ -125,7 +127,10 @@ def run_node(node, graph_inputs): else: outputs[out.id] = None events = HandlerV1Event(inputs, outputs) - context = HandlerV1Context(node) + context = HandlerV1Context( + asdict(node), + get_logger(node.id), + ) handler.setup_handler(context) handler.run_handler(events, context) handler.teardown_handler(context) diff --git a/pirlib/backends/inproc.py b/pirlib/backends/inproc.py index 35ce7dd..f766765 100644 --- a/pirlib/backends/inproc.py +++ b/pirlib/backends/inproc.py @@ -2,6 +2,7 @@ import importlib import pandas import tempfile +from dataclasses import asdict from typing import Any, Dict, Optional import pirlib.pir @@ -9,7 +10,7 @@ from pirlib.backends import Backend from pirlib.handlers.v1 import HandlerV1Context, HandlerV1Event from pirlib.iotypes import DirectoryPath, FilePath -from pirlib.utils import find_by_id +from pirlib.utils import find_by_id, get_logger class InprocBackend(Backend): @@ -91,7 +92,10 @@ def _execute_node(self, node: pirlib.pir.Node, inputs: Dict[str, Any]): else: outputs[out.id] = None event = HandlerV1Event(inputs, outputs) - context = HandlerV1Context(node) + context = HandlerV1Context( + asdict(node), + get_logger(node.id), + ) handler.setup_handler(context) handler.run_handler(event, context) handler.teardown_handler(context) diff --git a/pirlib/handlers/v1.py b/pirlib/handlers/v1.py index aa0c49b..627e456 100644 --- a/pirlib/handlers/v1.py +++ b/pirlib/handlers/v1.py @@ -1,13 +1,13 @@ from abc import abstractmethod -from dataclasses import dataclass, field -from typing import Any, Dict -from pirlib.pir import Node +from dataclasses import dataclass +from logging import Logger +from typing import Any, Dict, Optional @dataclass class HandlerV1Context(object): - node: Node - states: Dict[str, Any] = field(default_factory=dict) + node: Dict[str, Any] + logger: Optional[Logger] = None @dataclass diff --git a/pirlib/task.py b/pirlib/task.py index 2a4f00e..34fc0d7 100644 --- a/pirlib/task.py +++ b/pirlib/task.py @@ -1,9 +1,9 @@ -import contextvars import copy import functools import inspect import typeguard from dataclasses import dataclass +from logging import Logger from typing import Any, Callable, Dict, Optional import pirlib.pir @@ -12,17 +12,23 @@ from pirlib.package import recurse_hint, task_call, package_task -_TASK_CONTEXT = contextvars.ContextVar("_TASK_CONTEXT") - - -@dataclass +@dataclass(init=False) class TaskContext: config: Dict[str, Any] output: Any + node_ctx: HandlerV1Context + logger: Optional[Logger] = None - -def task_context() -> TaskContext: - return _TASK_CONTEXT.get() + def __init__( + self, + config: Dict[str, Any], + output: Any, + node_ctx: HandlerV1Context, + ): + self.config = config + self.output = output + self.node_ctx = node_ctx + self.logger = node_ctx.logger class TaskInstance(object): @@ -84,7 +90,7 @@ def __init__( self._framework = framework self._setup = None self._teardown = None - self.ctx_token = None + self.context = None @property def func(self): @@ -124,11 +130,22 @@ def run_handler( ) -> None: inputs, outputs = event.inputs, event.outputs sig = inspect.signature(self.func) - task_context = task.context() - task_context.config = context.node.config - task_context.output = recurse_hint( - lambda name, hint: outputs[name], "return", sig.return_annotation - ) + if self.context is None: + # If setup_handler is not called, + # need to initialize task context here. + self.context = TaskContext( + context.node["config"], + recurse_hint( + lambda name, hint: outputs[name], + "return", + sig.return_annotation, + ), + context, + ) + else: + self.context.output = recurse_hint( + lambda name, hint: outputs[name], "return", sig.return_annotation + ) args, kwargs = [], {} for param in sig.parameters.values(): value = recurse_hint(lambda name, hint: inputs[name], param.name, param.annotation) @@ -148,8 +165,12 @@ def setup_handler( self, context: HandlerV1Context, ) -> None: - task_context = TaskContext(None, None) - self.ctx_token = _TASK_CONTEXT.set(task_context) + # Initialize task context + self.context = TaskContext( + context.node["config"], + None, + context, + ) if self._setup: self._setup() @@ -159,7 +180,8 @@ def teardown_handler( ) -> None: if self._teardown: self._teardown() - _TASK_CONTEXT.reset(self.ctx_token) + # Reset task context + self.context = None def setup(self, func) -> None: setattr(self, "_setup", func) @@ -196,6 +218,3 @@ def wrapper(func) -> TaskDefinition: return wrapper(func) else: return wrapper - - -task.context = task_context diff --git a/pirlib/utils.py b/pirlib/utils.py index aaa997c..d3aa0a7 100644 --- a/pirlib/utils.py +++ b/pirlib/utils.py @@ -1,4 +1,26 @@ +import logging +import sys + + def find_by_id(iterable, id): for item in iterable: if item.id == id: return item + + +FORMATTER = logging.Formatter("%(asctime)s — %(name)s — %(levelname)s — %(message)s") + + +def get_console_handler(): + console_handler = logging.StreamHandler(sys.stdout) + console_handler.setFormatter(FORMATTER) + return console_handler + + +def get_logger(logger_name, logging_level=logging.INFO): + logger = logging.getLogger(logger_name) + logger.setLevel(logging_level) + if not logger.hasHandlers(): + logger.addHandler(get_console_handler()) + logger.propagate = False + return logger diff --git a/tests/test_example.py b/tests/test_example.py index e44a8eb..bcee3e2 100644 --- a/tests/test_example.py +++ b/tests/test_example.py @@ -12,7 +12,7 @@ def clean(dataset: DirectoryPath) -> DirectoryPath: with open(dataset / "file.txt") as f: print("clean({})".format(f.read().strip())) - outdir = task.context().output + outdir = clean.context.output with open(outdir / "file.txt", "w") as f: f.write("clean_result") return outdir @@ -20,11 +20,13 @@ def clean(dataset: DirectoryPath) -> DirectoryPath: @task(framework=AdaptDL(min_replicas=1, max_replicas=4)) def train(dataset: DirectoryPath) -> FilePath: + task_ctx = train.context with open(dataset / "file.txt") as f: - print("train({})".format(f.read().strip())) - outfile = task.context().output + print("train({}, config={})".format(f.read().strip(), task_ctx.config)) + outfile = task_ctx.output with open(outfile, "w") as f: f.write("train_result") + task_ctx.logger.info(">>> Train accuracy: 0.83") return outfile @@ -43,25 +45,46 @@ def evaluate(kwargs: EvaluateInput) -> pandas.DataFrame: return df +class TranslateModel(object): + def translate(self, inp: str) -> str: + output = f"translation: {inp}" + return output + + @task def translate(args: Tuple[FilePath, DirectoryPath]) -> DirectoryPath: - model, sentences = args - opctx = task.context() - with open(model) as f, open(sentences / "file.txt") as g: - print( - "translate({}, {}, config={})".format(f.read().strip(), g.read().strip(), opctx.config) - ) - outdir = opctx.output + model_path, sentences = args + task_ctx = translate.context + model = task_ctx.translate_model + with open(model_path) as f, open(sentences / "file.txt") as g: + inp = g.read().strip() + print("translate({}, {}, config={})".format(f.read().strip(), inp, task_ctx.config)) + translate_result = model.translate(inp) + outdir = task_ctx.output with open(outdir / "file.txt", "w") as f: - f.write("translate_result") + f.write(translate_result) return outdir +@translate.setup +def translate_setup() -> None: + task_ctx = translate.context + task_ctx.translate_model = TranslateModel() + task_ctx.logger.info(">>> Initialized translation model.") + + +@translate.teardown +def translate_teardown() -> None: + task_ctx = translate.context + del task_ctx.translate_model + task_ctx.logger.info(">>> Cleaned up translation model.") + + @task def sentiment(model: FilePath, sentences: DirectoryPath) -> DirectoryPath: with open(model) as f, open(sentences / "file.txt") as g: print("sentiment({}, {})".format(f.read().strip(), g.read().strip())) - outdir = task.context().output + outdir = sentiment.context.output with open(outdir / "file.txt", "w") as f: f.write("sentiment_result") return outdir @@ -69,9 +92,7 @@ def sentiment(model: FilePath, sentences: DirectoryPath) -> DirectoryPath: @pipeline def infer_pipeline( - translate_model: FilePath, - sentiment_model: FilePath, - sentences: DirectoryPath, + translate_model: FilePath, sentiment_model: FilePath, sentences: DirectoryPath ) -> DirectoryPath: translate_1 = translate.instance("translate_1") translate_1.config["key"] = "value" @@ -80,9 +101,7 @@ def infer_pipeline( @pipeline def train_pipeline( - train_dataset: DirectoryPath, - translate_model: FilePath, - sentences: DirectoryPath, + train_dataset: DirectoryPath, translate_model: FilePath, sentences: DirectoryPath ) -> Tuple[FilePath, pandas.DataFrame]: sentiment_model = train(clean(train_dataset)) sentiment = infer_pipeline(translate_model, sentiment_model, sentences)