diff --git a/python/lib/client/dmod/client/__main__.py b/python/lib/client/dmod/client/__main__.py index deb1c8542..da72481dd 100644 --- a/python/lib/client/dmod/client/__main__.py +++ b/python/lib/client/dmod/client/__main__.py @@ -5,9 +5,10 @@ from . import name as package_name from .dmod_client import YamlClientConfig, DmodClient from dmod.communication.client import get_or_create_eventloop -from dmod.core.meta_data import ContinuousRestriction, DataCategory, DataDomain, DataFormat, DiscreteRestriction +from dmod.core.meta_data import ContinuousRestriction, DataCategory, DataDomain, DataFormat, DiscreteRestriction, \ + TimeRange from pathlib import Path -from typing import List, Optional, Tuple +from typing import Any, List, Optional, Tuple DEFAULT_CLIENT_CONFIG_BASENAME = '.dmod_client_config.yml' @@ -19,6 +20,52 @@ class DmodCliArgumentError(ValueError): pass +def _create_ngen_based_exec_parser(subcommand_container: Any, parser_name: str, + default_alloc_paradigm: AllocationParadigm) -> argparse.ArgumentParser: + """ + Helper function to create a nested parser under the ``exec`` command for different NextGen-related workflows. + + Parameters + ---------- + subcommand_container + The ``workflow`` subcommand "special action object" created by ::method:`ArgumentParser.add_subparsers`, which + is a child of the ``exec`` parser, and to which the new nested parser is to be added. + parser_name : str + The name to give to the new parser to be added. + default_alloc_paradigm : AllocationParadigm + The default ::class:`AllocationParadigm` value to use when adding the ``--allocation-paradigm`` argument to the + parser. + + Returns + ------- + argparse.ArgumentParser + The newly created and associated subparser. 
+ """ + new_parser = subcommand_container.add_parser(parser_name) + new_parser.add_argument('--partition-config-data-id', dest='partition_cfg_data_id', default=None, + help='Provide data_id for desired partition config dataset.') + new_parser.add_argument('--allocation-paradigm', + dest='allocation_paradigm', + type=AllocationParadigm.get_from_name, + choices=[val.name.lower() for val in AllocationParadigm], + default=default_alloc_paradigm, + help='Specify job resource allocation paradigm to use.') + new_parser.add_argument('--catchment-ids', dest='catchments', nargs='+', help='Specify catchment subset.') + + date_format = DataDomain.get_datetime_str_format() + print_date_format = 'YYYY-mm-dd HH:MM:SS' + + new_parser.add_argument('time_range', type=TimeRange.parse_from_string, + help='Model time range ({} to {})'.format(print_date_format, print_date_format)) + new_parser.add_argument('hydrofabric_data_id', help='Identifier of dataset of required hydrofabric') + new_parser.add_argument('hydrofabric_uid', help='Unique identifier of required hydrofabric') + new_parser.add_argument('config_data_id', help='Identifier of dataset of required realization config') + new_parser.add_argument('bmi_cfg_data_id', help='Identifier of dataset of required BMI init configs') + new_parser.add_argument('cpu_count', type=int, help='Provide the desired number of processes for the execution') + + return new_parser + + def _handle_config_command_args(parent_subparsers_container): """ Handle setup of arg parsing for 'config' command, which allows for various operations related to config. @@ -43,6 +90,10 @@ def _handle_exec_command_args(parent_subparsers_container): parent_subparsers_container The top-level parent container for subparsers of various commands, including the 'exec' command, to which some numbers of nested subparser containers and parsers will be added. 
+ + See Also + ---------- + _create_ngen_based_exec_parser """ # A parser for the 'exec' command itself, underneath the parent 'command' subparsers container command_parser = parent_subparsers_container.add_parser('exec') @@ -52,33 +103,66 @@ def _handle_exec_command_args(parent_subparsers_container): workflow_subparsers.required = True # Nested parser for the 'ngen' action - parser_ngen = workflow_subparsers.add_parser('ngen') - - parser_ngen.add_argument('--partition-config-data-id', dest='partition_cfg_data_id', default=None, - help='Provide data_id for desired partition config dataset.') - parser_ngen.add_argument('--allocation-paradigm', - dest='allocation_paradigm', - type=AllocationParadigm.get_from_name, - choices=[val.name.lower() for val in AllocationParadigm], - default=AllocationParadigm.get_default_selection(), - help='Specify job resource allocation paradigm to use.') - parser_ngen.add_argument('--catchment-ids', dest='cat_ids', nargs='+', help='Specify catchment subset.') - date_format = DataDomain.get_datetime_str_format() - printable_date_format = 'YYYY-mm-dd HH:MM:SS' + parser_ngen = _create_ngen_based_exec_parser(subcommand_container=workflow_subparsers, parser_name='ngen', + default_alloc_paradigm=AllocationParadigm.get_default_selection()) + + # TODO: default alloc paradigm needs to be GROUPED_SINGLE_NODE once that has been approved and added + # Nested parser for the 'ngen_cal' action, which is very similar to the 'ngen' parser + parser_ngen_cal = _create_ngen_based_exec_parser(subcommand_container=workflow_subparsers, parser_name='ngen_cal', + default_alloc_paradigm=AllocationParadigm.get_default_selection()) - def date_parser(date_time_str: str) -> datetime.datetime: + # Calibration parser needs a few more calibration-specific items + def positive_int(arg_val: str): try: - return datetime.datetime.strptime(date_time_str, date_format) + arg_as_int = int(arg_val) except ValueError: - raise argparse.ArgumentTypeError("Not a valid date: 
{}".format(date_time_str)) + raise argparse.ArgumentTypeError("Non-integer value '%s' provided when positive integer expected" % arg_val) + if arg_as_int <= 0: + raise argparse.ArgumentTypeError("Invalid value '%s': expected integer greater than 0" % arg_val) + return arg_as_int - parser_ngen.add_argument('start', type=date_parser, help='Model start date and time ({})'.format(printable_date_format)) - parser_ngen.add_argument('end', type=date_parser, help='Model end date and time ({})'.format(printable_date_format)) - parser_ngen.add_argument('hydrofabric_data_id', help='Identifier of dataset of required hydrofabric') - parser_ngen.add_argument('hydrofabric_uid', help='Unique identifier of required hydrofabric') - parser_ngen.add_argument('realization_cfg_data_id', help='Identifier of dataset of required realization config') - parser_ngen.add_argument('bmi_cfg_data_id', help='Identifier of dataset of required BMI init configs') - parser_ngen.add_argument('cpu_count', type=int, help='Provide the desired number of processes for the execution') + def model_calibration_param(arg_val: str): + split_arg = arg_val.split(',') + try: + if len(split_arg) != 4: + raise RuntimeError + # Support float args in any order by sorting, since min, max, and other/init will always be self-evident + float_values = sorted([float(split_arg[i]) for i in [1, 2, 3]]) + # Return is (param, (min, max, init)) + return split_arg[0], (float_values[0], float_values[2], float_values[1]) + except: + raise argparse.ArgumentTypeError("Invalid arg '%s'; format must be <param_name>,<min_val>,<max_val>,<init_val>" % arg_val) + + parser_ngen_cal.add_argument('--calibrated-param', dest='model_cal_params', type=model_calibration_param, + nargs='+', metavar='PARAM_NAME,MIN_VAL,MAX_VAL,INIT_VAL', + help='Description of parameters to calibrate, as comma delimited string') + + parser_ngen_cal.add_argument('--job-name', default=None, dest='job_name', help='Optional job name.') + # TODO (later): add more choices once available + 
parser_ngen_cal.add_argument('--strategy', default='estimation', dest='cal_strategy_type', + choices=['estimation'], help='The ngen_cal calibration strategy.') + # TODO (later): need to add other supported algorithms (there should be a few more now) + parser_ngen_cal.add_argument('--algorithm', type=str, default='dds', dest='cal_strategy_algorithm', + choices=['dds'], help='The ngen_cal parameter search algorithm.') + parser_ngen_cal.add_argument('--objective-function', default='nnse', dest='cal_strategy_objective_func', + choices=["kling_gupta", "nnse", "custom", "single_peak", "volume"], + help='The ngen_cal objective function.') + parser_ngen_cal.add_argument('--is-objective-func-minimized', type=bool, default=True, + dest='is_objective_func_minimized', + help='Whether the target of objective function is minimized or maximized.') + parser_ngen_cal.add_argument('--iterations', type=positive_int, default=100, dest='iterations', + help='The number of ngen_cal iterations.') + # TODO (later): in the future, figure out how to best handle this kind of scenario + #parser_ngen_cal.add_argument('--is-restart', action='store_true', dest='is_restart', + # help='Whether this is restarting a previous job.') + #ngen calibration strategies include + #uniform: Each catchment shares the same parameter space, evaluates at one observable nexus + #independent: Each catchment upstream of observable nexus gets its own permuted parameter space, evaluates at one observable nexus + #explicit: only calibrates basins in the realization_config with a "calibration" definition and an observable nexus + # TODO: add this kind of information to the help message + parser_ngen_cal.add_argument('--model-strategy', default='uniform', dest='model_strategy', + choices=["uniform", "independent", "explicit"], + help='The model calibration strategy used by ngen_cal.') def _handle_dataset_command_args(parent_subparsers_container): @@ -449,9 +533,13 @@ def execute_jobs_command(args, client: DmodClient): 
def execute_workflow_command(args, client: DmodClient): async_loop = get_or_create_eventloop() + # TODO: aaraney if args.workflow == 'ngen': result = async_loop.run_until_complete(client.submit_ngen_request(**(vars(args)))) print(result) + elif args.workflow == "ngen_cal": + result = async_loop.run_until_complete(client.submit_ngen_cal_request(**(vars(args)))) + print(result) else: print("ERROR: Unsupported execution workflow {}".format(args.workflow)) exit(1) diff --git a/python/lib/client/dmod/client/dmod_client.py b/python/lib/client/dmod/client/dmod_client.py index 82ab83274..8fd2134b5 100644 --- a/python/lib/client/dmod/client/dmod_client.py +++ b/python/lib/client/dmod/client/dmod_client.py @@ -1,6 +1,6 @@ from dmod.core.execution import AllocationParadigm from dmod.core.meta_data import DataCategory, DataDomain, DataFormat, DiscreteRestriction -from .request_clients import DatasetClient, DatasetExternalClient, DatasetInternalClient, NgenRequestClient +from .request_clients import DatasetClient, DatasetExternalClient, DatasetInternalClient, NgenRequestClient, NgenCalRequestClient from .client_config import YamlClientConfig from datetime import datetime from pathlib import Path @@ -13,6 +13,7 @@ def __init__(self, client_config: YamlClientConfig, bypass_request_service: bool self._client_config = client_config self._dataset_client = None self._ngen_client = None + self._ngen_cal_client = None self._bypass_request_service = bypass_request_service @property @@ -103,6 +104,12 @@ def ngen_request_client(self) -> NgenRequestClient: self._ngen_client = NgenRequestClient(self.requests_endpoint_uri, self.requests_ssl_dir) return self._ngen_client + @property + def ngen_cal_request_client(self) -> NgenCalRequestClient: + if self._ngen_cal_client is None: + self._ngen_cal_client = NgenCalRequestClient(self.requests_endpoint_uri, self.requests_ssl_dir) + return self._ngen_cal_client + async def delete_dataset(self, dataset_name: str, **kwargs): return await 
self.dataset_client.delete_dataset(dataset_name, **kwargs) @@ -237,6 +244,14 @@ async def submit_ngen_request(self, start: datetime, end: datetime, hydrofabric_ cpu_count, realization_cfg_data_id, bmi_cfg_data_id, partition_cfg_data_id, cat_ids, allocation_paradigm) + async def submit_ngen_cal_request(self, start: datetime, end: datetime, hydrofabric_data_id: str, hydrofabric_uid: str, + cpu_count: int, realization_cfg_data_id: str, bmi_cfg_data_id: str, ngen_cal_cfg_data_id: str, + partition_cfg_data_id: Optional[str] = None, cat_ids: Optional[List[str]] = None, + allocation_paradigm: Optional[AllocationParadigm] = None, *args, **kwargs): + return await self.ngen_cal_request_client.request_exec(start, end, hydrofabric_data_id, hydrofabric_uid, + cpu_count, realization_cfg_data_id, bmi_cfg_data_id, ngen_cal_cfg_data_id, + partition_cfg_data_id, cat_ids, allocation_paradigm) + def print_config(self): print(self.client_config.print_config()) diff --git a/python/lib/client/dmod/client/request_clients.py b/python/lib/client/dmod/client/request_clients.py index 09251c01b..31c57010c 100644 --- a/python/lib/client/dmod/client/request_clients.py +++ b/python/lib/client/dmod/client/request_clients.py @@ -2,7 +2,8 @@ from datetime import datetime from dmod.core.execution import AllocationParadigm from dmod.communication import DataServiceClient, ExternalRequestClient, ManagementAction, ModelExecRequestClient, \ - NGENRequest, NGENRequestResponse + NGENRequest, NGENRequestResponse, \ + NgenCalibrationRequest, NgenCalibrationResponse from dmod.communication.client import R from dmod.communication.dataset_management_message import DatasetManagementMessage, DatasetManagementResponse, \ MaaSDatasetManagementMessage, MaaSDatasetManagementResponse, QueryType, DatasetQuery @@ -21,25 +22,34 @@ class NgenRequestClient(ModelExecRequestClient[NGENRequest, NGENRequestResponse]): # In particular needs - endpoint_uri: str, ssl_directory: Path - def __init__(self, *args, **kwargs): + def 
__init__(self, cached_session_file: Optional[Path] = None, *args, **kwargs): super().__init__(*args, **kwargs) - self._cached_session_file = Path.home().joinpath('.dmod_client_session') + if cached_session_file is None: + self._cached_session_file = Path.home().joinpath('.dmod_client_session') + else: + self._cached_session_file = cached_session_file + + # TODO: need some integration tests for this and CLI main and arg parsing + async def request_exec(self, *args, **kwargs) -> NGENRequestResponse: + await self._async_acquire_session_info() + request = NGENRequest(session_secret=self.session_secret, *args, **kwargs) + return await self.async_make_request(request) + + +class NgenCalRequestClient(ModelExecRequestClient[NgenCalibrationRequest, NgenCalibrationResponse]): + + # In particular needs - endpoint_uri: str, ssl_directory: Path + def __init__(self, cached_session_file: Optional[Path] = None, *args, **kwargs): + super().__init__(*args, **kwargs) + if cached_session_file is None: + self._cached_session_file = Path.home().joinpath('.dmod_client_session') + else: + self._cached_session_file = cached_session_file - async def request_exec(self, start: datetime, end: datetime, hydrofabric_data_id: str, hydrofabric_uid: str, - cpu_count: int, realization_cfg_data_id: str, bmi_cfg_data_id: str, - partition_cfg_data_id: Optional[str] = None, cat_ids: Optional[List[str]] = None, - allocation_paradigm: Optional[AllocationParadigm] = None) -> NGENRequestResponse: + # TODO: need some integration tests for this and CLI main and arg parsing + async def request_exec(self, *args, **kwargs) -> NgenCalibrationResponse: await self._async_acquire_session_info() - request = NGENRequest(session_secret=self.session_secret, - cpu_count=cpu_count, - allocation_paradigm=allocation_paradigm, - time_range=TimeRange(begin=start, end=end), - hydrofabric_uid=hydrofabric_uid, - hydrofabric_data_id=hydrofabric_data_id, - config_data_id=realization_cfg_data_id, - bmi_cfg_data_id=bmi_cfg_data_id, 
- partition_cfg_data_id=partition_cfg_data_id, - catchments=cat_ids) + request = NgenCalibrationRequest(session_secret=self.session_secret, *args, **kwargs) return await self.async_make_request(request) diff --git a/python/lib/communication/dmod/communication/__init__.py b/python/lib/communication/dmod/communication/__init__.py index 528c6575f..48645e039 100644 --- a/python/lib/communication/dmod/communication/__init__.py +++ b/python/lib/communication/dmod/communication/__init__.py @@ -2,8 +2,9 @@ from .client import DataServiceClient, InternalServiceClient, ModelExecRequestClient, ExternalRequestClient, \ PartitionerServiceClient, SchedulerClient from .maas_request import get_available_models, get_available_outputs, get_distribution_types, get_parameters, \ - get_request, Distribution, ExternalRequest, ExternalRequestResponse, ModelExecRequest, ModelExecRequestResponse, \ - NWMRequest, NWMRequestResponse, Scalar, NGENRequest, NGENRequestResponse + get_request, AbstractNgenRequest, Distribution, DmodJobRequest, ExternalRequest, ExternalRequestResponse,\ + ModelExecRequest, ModelExecRequestResponse, NWMRequest, NWMRequestResponse, Scalar, NGENRequest, \ + NGENRequestResponse, NgenCalibrationRequest, NgenCalibrationResponse from .message import AbstractInitRequest, MessageEventType, Message, Response, InvalidMessage, InvalidMessageResponse, \ InitRequestResponseReason from .metadata_message import MetadataPurpose, MetadataMessage, MetadataResponse diff --git a/python/lib/communication/dmod/communication/_version.py b/python/lib/communication/dmod/communication/_version.py index f323a57be..2c7bffbf8 100644 --- a/python/lib/communication/dmod/communication/_version.py +++ b/python/lib/communication/dmod/communication/_version.py @@ -1 +1 @@ -__version__ = '0.11.0' +__version__ = '0.12.0' diff --git a/python/lib/communication/dmod/communication/maas_request/__init__.py b/python/lib/communication/dmod/communication/maas_request/__init__.py index cad8e026e..6df109014 
100644 --- a/python/lib/communication/dmod/communication/maas_request/__init__.py +++ b/python/lib/communication/dmod/communication/maas_request/__init__.py @@ -5,10 +5,12 @@ get_request, ) from .distribution import Distribution +from .dmod_job_request import DmodJobRequest from .parameter import Scalar from .external_request import ExternalRequest from .external_request_response import ExternalRequestResponse from .model_exec_request import ModelExecRequest, get_available_models from .model_exec_request_response import ModelExecRequestResponse from .nwm import NWMRequest, NWMRequestResponse -from .ngen import NGENRequest, NGENRequestResponse +from .ngen import AbstractNgenRequest, NGENRequest, NGENRequestResponse, NgenCalibrationRequest, \ + NgenCalibrationResponse diff --git a/python/lib/communication/dmod/communication/maas_request/dmod_job_request.py b/python/lib/communication/dmod/communication/maas_request/dmod_job_request.py index 5288e7941..a0f09839e 100644 --- a/python/lib/communication/dmod/communication/maas_request/dmod_job_request.py +++ b/python/lib/communication/dmod/communication/maas_request/dmod_job_request.py @@ -1,7 +1,8 @@ from abc import ABC, abstractmethod -from typing import List +from typing import List, Optional, Union +from dmod.core.execution import AllocationParadigm from dmod.core.meta_data import DataFormat, DataRequirement from ..message import AbstractInitRequest @@ -11,8 +12,60 @@ class DmodJobRequest(AbstractInitRequest, ABC): The base class underlying all types of messages requesting execution of some kind of workflow job. """ - def __int__(self, *args, **kwargs): - super(DmodJobRequest, self).__int__(*args, **kwargs) + _DEFAULT_CPU_COUNT = 1 + """ The default number of CPUs to assume are being requested for the job, when not explicitly provided. 
""" + + def __init__(self, config_data_id: str, cpu_count: Optional[int] = None, + allocation_paradigm: Optional[Union[str, AllocationParadigm]] = None, *args, **kwargs): + super(DmodJobRequest, self).__init__(*args, **kwargs) + self._config_data_id = config_data_id + self._cpu_count = ( + cpu_count if cpu_count is not None else self._DEFAULT_CPU_COUNT + ) + if allocation_paradigm is None: + self._allocation_paradigm = AllocationParadigm.get_default_selection() + elif isinstance(allocation_paradigm, str): + self._allocation_paradigm = AllocationParadigm.get_from_name( + allocation_paradigm + ) + else: + self._allocation_paradigm = allocation_paradigm + + @property + def allocation_paradigm(self) -> AllocationParadigm: + """ + The allocation paradigm desired for use when allocating resources for this request. + + Returns + ------- + AllocationParadigm + The allocation paradigm desired for use with this request. + """ + return self._allocation_paradigm + + @property + def config_data_id(self) -> str: + """ + Value of ``data_id`` index to uniquely identify the dataset with the primary configuration for this request. + + Returns + ------- + str + Value of ``data_id`` identifying the dataset with the primary configuration applicable to this request. + """ + return self._config_data_id + + @property + def cpu_count(self) -> int: + """ + The number of processors requested for this job. + + Returns + ------- + int + The number of processors requested for this job. 
+ """ + return self._cpu_count @property @abstractmethod diff --git a/python/lib/communication/dmod/communication/maas_request/model_exec_request.py b/python/lib/communication/dmod/communication/maas_request/model_exec_request.py index de537091b..c637974db 100644 --- a/python/lib/communication/dmod/communication/maas_request/model_exec_request.py +++ b/python/lib/communication/dmod/communication/maas_request/model_exec_request.py @@ -1,8 +1,5 @@ from abc import ABC -from typing import Optional, Union - -from dmod.core.execution import AllocationParadigm from ..message import MessageEventType from .dmod_job_request import DmodJobRequest from .external_request import ExternalRequest @@ -12,12 +9,25 @@ def get_available_models() -> dict: """ :return: The names of all models mapped to their class """ - available_models = dict() + # TODO: the previous implementation; confirm reason this change was needed + # available_models = dict() + # + # for subclass in ModelExecRequest.__subclasses__(): # type: ModelExecRequest + # available_models[subclass.model_name] = subclass + # + # return available_models + + def recursively_get_all_model_subclasses(model_exec_request: "ModelExecRequest") -> dict: + available_models = dict() - for subclass in ModelExecRequest.__subclasses__(): # type: ModelExecRequest - available_models[subclass.model_name] = subclass + for subclass in model_exec_request.__subclasses__(): # type: ModelExecRequest + available_models[subclass.model_name] = subclass + # TODO: what to do if descendant subclass "overwrites" ancestor subclass? 
+ available_models.update(recursively_get_all_model_subclasses(subclass)) - return available_models + return available_models + + return recursively_get_all_model_subclasses(ModelExecRequest) class ModelExecRequest(ExternalRequest, DmodJobRequest, ABC): @@ -30,9 +40,6 @@ class ModelExecRequest(ExternalRequest, DmodJobRequest, ABC): model_name = None """(:class:`str`) The name of the model to be used""" - _DEFAULT_CPU_COUNT = 1 - """ The default number of CPUs to assume are being requested for the job, when not explicitly provided. """ - @classmethod def factory_init_correct_subtype_from_deserialized_json( cls, json_obj: dict @@ -73,35 +80,8 @@ def get_model_name(cls) -> str: """ return cls.model_name - def __init__( - self, - config_data_id: str, - cpu_count: Optional[int] = None, - allocation_paradigm: Optional[Union[str, AllocationParadigm]] = None, - *args, - **kwargs - ): - """ - Initialize model-exec-specific attributes and state of this request object common to all model exec requests. - - Parameters - ---------- - session_secret : str - The session secret for the right session when communicating with the request handler. 
- """ + def __init__(self, *args, **kwargs): super(ModelExecRequest, self).__init__(*args, **kwargs) - self._config_data_id = config_data_id - self._cpu_count = ( - cpu_count if cpu_count is not None else self._DEFAULT_CPU_COUNT - ) - if allocation_paradigm is None: - self._allocation_paradigm = AllocationParadigm.get_default_selection() - elif isinstance(allocation_paradigm, str): - self._allocation_paradigm = AllocationParadigm.get_from_name( - allocation_paradigm - ) - else: - self._allocation_paradigm = allocation_paradigm def __eq__(self, other): if not self._check_class_compatible_for_equality(other): @@ -121,39 +101,3 @@ def __eq__(self, other): if req not in other.data_requirements: return False return True - - @property - def allocation_paradigm(self) -> AllocationParadigm: - """ - The allocation paradigm desired for use when allocating resources for this request. - - Returns - ------- - AllocationParadigm - The allocation paradigm desired for use with this request. - """ - return self._allocation_paradigm - - @property - def config_data_id(self) -> str: - """ - Value of ``data_id`` index to uniquely identify the dataset with the primary configuration for this request. - - Returns - ------- - str - Value of ``data_id`` identifying the dataset with the primary configuration applicable to this request. - """ - return self._config_data_id - - @property - def cpu_count(self) -> int: - """ - The number of processors requested for this job. - - Returns - ------- - int - The number of processors requested for this job. 
- """ - return self._cpu_count diff --git a/python/lib/communication/dmod/communication/maas_request/ngen/__init__.py b/python/lib/communication/dmod/communication/maas_request/ngen/__init__.py index c53e6e3ee..c6a4ee593 100644 --- a/python/lib/communication/dmod/communication/maas_request/ngen/__init__.py +++ b/python/lib/communication/dmod/communication/maas_request/ngen/__init__.py @@ -1 +1,3 @@ +from .abstract_nextgen_request import AbstractNgenRequest from .ngen_request import NGENRequest, NGENRequestResponse +from .ngen_calibration_request import NgenCalibrationRequest, NgenCalibrationResponse diff --git a/python/lib/communication/dmod/communication/maas_request/ngen/abstract_nextgen_request.py b/python/lib/communication/dmod/communication/maas_request/ngen/abstract_nextgen_request.py new file mode 100644 index 000000000..d56ae3546 --- /dev/null +++ b/python/lib/communication/dmod/communication/maas_request/ngen/abstract_nextgen_request.py @@ -0,0 +1,625 @@ +from abc import ABC, abstractmethod +from numbers import Number +from typing import Dict, List, Optional, Set, Union + +from dmod.core.meta_data import ( + DataCategory, + DataDomain, + DataFormat, + DataRequirement, + DiscreteRestriction, + TimeRange, +) + +from ..dmod_job_request import DmodJobRequest +from ..model_exec_request import ExternalRequest + + +class AbstractNgenRequest(DmodJobRequest, ABC): + """ + Abstract extension of ::class:`DmodJobRequest` for requesting some kind of job involving the ngen framework. + + A representation of request for a job involving the ngen framework. 
As such it contains attributes/properties + inherent to running ngen within DMOD: + + - execution time range + - hydrofabric UID, dataset id, and ::class:`DataRequirement` + - primary config dataset id (i.e., the realization config) and ::class:`DataRequirement` + - BMI init configs dataset id and ::class:`DataRequirement` + - forcing ::class:`DataRequirement` + - list of each output dataset's ::class:`DataFormat` + - (Optional) partitioning config dataset id and ::class:`DataRequirement` + - (Optional) list of catchments + + This type provides the implementation for ::method:`factory_init_from_deserialized_json` for all subtypes. This + works by having each level of the class hierarchy be responsible for deserialization applicable to it, as described + below. + + Instead of implementing full deserialization, this type and subtypes include a function to deserialize from JSON the + type-specific keyword parameters passed to the individual type's ::method:`__init__`. This is the + ::method:`deserialize_for_init` class method. Subclass implementations should ensure they call superclass's version + and build on the returned dict of deserialized keyword params from ancestor levels. + + This abstract type also implements a version of ::method:`to_dict` for serializing all the state included at this + level. + """ + + @classmethod + def deserialize_for_init(cls, json_obj: dict) -> dict: + """ + Deserialize a JSON representation to the keyword args needed for use with this type's ::method:`__init__`. + + Parameters + ---------- + json_obj: dict + A serialized JSON representation of an instance. + + Returns + ------- + dict + A dictionary containing the keyword args (both required and any contained optional) necessary for + initializing an instance, with the values deserialized from the received JSON. 
+ """ + deserialized_kwargs = dict() + deserialized_kwargs["time_range"] = TimeRange.factory_init_from_deserialized_json(json_obj["time_range"]) + deserialized_kwargs["hydrofabric_uid"] = json_obj["hydrofabric_uid"] + deserialized_kwargs["hydrofabric_data_id"] = json_obj["hydrofabric_data_id"] + deserialized_kwargs["config_data_id"] = json_obj["config_data_id"] + deserialized_kwargs["bmi_cfg_data_id"] = json_obj["bmi_config_data_id"] + + if "cpu_count" in json_obj: + deserialized_kwargs["cpu_count"] = json_obj["cpu_count"] + if "allocation_paradigm" in json_obj: + deserialized_kwargs["allocation_paradigm"] = json_obj["allocation_paradigm"] + if "catchments" in json_obj: + deserialized_kwargs["catchments"] = json_obj["catchments"] + if "partition_config_data_id" in json_obj: + deserialized_kwargs["partition_config_data_id"] = json_obj["partition_config_data_id"] + + return deserialized_kwargs + + @classmethod + def factory_init_from_deserialized_json(cls, json_obj: dict) -> Optional["AbstractNgenRequest"]: + """ + Deserialize request formatted as JSON to an instance. + + See the documentation of this type's ::method:`to_dict` for an example of the format of valid JSON. + + Parameters + ---------- + json_obj : dict + The serialized JSON representation of a request object. + + Returns + ------- + The deserialized ::class:`NGENRequest`, or ``None`` if the JSON was not valid for deserialization. 
+ + See Also + ------- + ::method:`to_dict` + """ + try: + keyword_args = cls.deserialize_for_init(json_obj) + return cls(**keyword_args) + except Exception as e: + return None + + def __eq__(self, other): + return ( + self.time_range == other.time_range + and self.hydrofabric_data_id == other.hydrofabric_data_id + and self.hydrofabric_uid == other.hydrofabric_uid + and self.config_data_id == other.config_data_id + and self.bmi_config_data_id == other.bmi_config_data_id + and self.cpu_count == other.cpu_count + and self.partition_cfg_data_id == other.partition_cfg_data_id + and self.catchments == other.catchments + ) + + def __hash__(self): + hash_str = "{}-{}-{}-{}-{}-{}-{}-{}".format( + self.time_range.to_json(), + self.hydrofabric_data_id, + self.hydrofabric_uid, + self.config_data_id, + self.bmi_config_data_id, + self.cpu_count, + self.partition_cfg_data_id, + ",".join(self.catchments), + ) + return hash(hash_str) + + def __init__(self, + time_range: TimeRange, + hydrofabric_uid: str, + hydrofabric_data_id: str, + bmi_cfg_data_id: str, + catchments: Optional[Union[Set[str], List[str]]] = None, + partition_cfg_data_id: Optional[str] = None, + *args, + **kwargs): + """ + Initialize an instance. + + Parameters + ---------- + time_range : TimeRange + A definition of the time range for the requested model execution. + hydrofabric_uid : str + The unique ID of the applicable hydrofabric for modeling, which provides the outermost geospatial domain. + hydrofabric_data_id : str + A data identifier for the hydrofabric, for distinguishing between different hydrofabrics that cover the same + set of catchments and nexuses (i.e., the same sets of catchment and nexus ids). + catchments : Optional[Union[Set[str], List[str]]] + An optional collection of the catchment ids to narrow the geospatial domain, where the default of ``None`` + or an empty collection implies all catchments in the hydrofabric. 
+ bmi_cfg_data_id : Optional[str] + The optional BMI init config ``data_id`` index, for identifying the particular BMI init config datasets + applicable to this request. + + Keyword Args + ----------- + config_data_id : str + The config data id index, for identifying the particular configuration datasets applicable to this request. + session_secret : str + The session secret for the right session when communicating with the MaaS request handler + """ + super(AbstractNgenRequest, self).__init__(*args, **kwargs) + self._time_range = time_range + self._hydrofabric_uid = hydrofabric_uid + self._hydrofabric_data_id = hydrofabric_data_id + self._bmi_config_data_id = bmi_cfg_data_id + self._part_config_data_id = partition_cfg_data_id + # Convert an initial list to a set to remove duplicates + try: + catchments = set(catchments) + # TypeError should mean that we received `None`, so just use that to set _catchments + except TypeError: + self._catchments = catchments + # Assuming we have a set now, move this set back to list and sort + else: + self._catchments = list(catchments) + self._catchments.sort() + + self._hydrofabric_data_requirement = None + self._forcing_data_requirement = None + self._realization_cfg_data_requirement = None + self._bmi_cfg_data_requirement = None + self._partition_cfg_data_requirement = None + + def _gen_catchments_domain_restriction(self, var_name: str = "catchment_id") -> DiscreteRestriction: + """ + Generate a ::class:`DiscreteRestriction` that will restrict to the catchments applicable to this request. + + Note that if the ::attribute:`catchments` property is ``None`` or empty, then the generated restriction object + will reflect that with an empty list of values, implying "all catchments in hydrofabric." This is slightly + different than the internal behavior of ::class:`DiscreteRestriction` itself, which only infers this for empty + lists (i.e., not a ``values`` value of ``None``). 
This is intentional here, as the natural implication of + specific catchments not being provided as part of a job request is to include all of them. + + Parameters + ---------- + var_name : str + The value of the ::attribute:`DiscreteRestriction.variable` for the restriction; defaults to `catchment-id`. + + Returns + ------- + DiscreteRestriction + ::class:`DiscreteRestriction` that will restrict to the catchments applicable to this request. + """ + return DiscreteRestriction( + variable=var_name, + values=([] if self.catchments is None else self.catchments), + ) + + @property + def data_requirements(self) -> List[DataRequirement]: + """ + List of all the explicit and implied data requirements for this request, as needed for creating a job object. + + Returns + ------- + List[DataRequirement] + List of all the explicit and implied data requirements for this request. + """ + requirements = [ + self.bmi_cfg_data_requirement, + self.forcing_data_requirement, + self.hydrofabric_data_requirement, + self.realization_cfg_data_requirement, + ] + if self.use_parallel_ngen: + requirements.append(self.partition_cfg_data_requirement) + return requirements + + @property + def bmi_config_data_id(self) -> str: + """ + The index value of ``data_id`` to uniquely identify sets of BMI module config data that are otherwise similar. + + Returns + ------- + str + Index value of ``data_id`` to uniquely identify sets of BMI module config data that are otherwise similar. + """ + return self._bmi_config_data_id + + @property + def bmi_cfg_data_requirement(self) -> DataRequirement: + """ + A requirement object defining of the BMI configuration data needed to execute this request. + + Returns + ------- + DataRequirement + A requirement object defining of the BMI configuration data needed to execute this request. 
+ """ + if self._bmi_cfg_data_requirement is None: + bmi_config_restrict = [ + DiscreteRestriction( + variable="data_id", values=[self.bmi_config_data_id] + ) + ] + bmi_config_domain = DataDomain( + data_format=DataFormat.BMI_CONFIG, + discrete_restrictions=bmi_config_restrict, + ) + self._bmi_cfg_data_requirement = DataRequirement( + bmi_config_domain, True, DataCategory.CONFIG + ) + return self._bmi_cfg_data_requirement + + @property + def catchments(self) -> Optional[List[str]]: + """ + An optional list of catchment ids for those catchments in the request ngen execution. + + No list implies "all" known catchments. + + Returns + ------- + Optional[List[str]] + An optional list of catchment ids for those catchments in the request ngen execution. + """ + return self._catchments + + @property + def forcing_data_requirement(self) -> DataRequirement: + """ + A requirement object defining of the forcing data needed to execute this request. + + Returns + ------- + DataRequirement + A requirement object defining of the forcing data needed to execute this request. + """ + if self._forcing_data_requirement is None: + # TODO: going to need to address the CSV usage later + forcing_domain = DataDomain( + data_format=DataFormat.AORC_CSV, + continuous_restrictions=[self._time_range], + discrete_restrictions=[self._gen_catchments_domain_restriction()], + ) + self._forcing_data_requirement = DataRequirement( + domain=forcing_domain, is_input=True, category=DataCategory.FORCING + ) + return self._forcing_data_requirement + + @property + def hydrofabric_data_requirement(self) -> DataRequirement: + """ + A requirement object defining the hydrofabric data needed to execute this request. + + Returns + ------- + DataRequirement + A requirement object defining the hydrofabric data needed to execute this request. 
+ """ + if self._hydrofabric_data_requirement is None: + hydro_restrictions = [ + DiscreteRestriction( + variable="hydrofabric_id", values=[self._hydrofabric_uid] + ), + DiscreteRestriction( + variable="data_id", values=[self._hydrofabric_data_id] + ), + ] + hydro_domain = DataDomain( + data_format=DataFormat.NGEN_GEOJSON_HYDROFABRIC, + discrete_restrictions=hydro_restrictions, + ) + self._hydrofabric_data_requirement = DataRequirement( + domain=hydro_domain, is_input=True, category=DataCategory.HYDROFABRIC + ) + return self._hydrofabric_data_requirement + + @property + def hydrofabric_data_id(self) -> str: + """ + The data format ``data_id`` for the hydrofabric dataset to use in requested modeling. + + This identifier is needed to distinguish the correct hydrofabric dataset, and thus the correct hydrofabric, + expected for this modeling request. For arbitrary hydrofabric types, this may not be possible with the unique + id of the hydrofabric alone. E.g., a slight adjustment of catchment coordinates may be ignored with respect + to the hydrofabric's uid, but may be relevant with respect to a model request. + + Returns + ------- + str + The data format ``data_id`` for the hydrofabric dataset to use in requested modeling. + """ + return self._hydrofabric_data_id + + @property + def hydrofabric_uid(self) -> str: + """ + The unique id of the hydrofabric for this modeling request. + + Returns + ------- + str + The unique id of the hydrofabric for this modeling request. + """ + return self._hydrofabric_uid + + @property + def output_formats(self) -> List[DataFormat]: + """ + List of the formats of each required output dataset for the requested job. + + Returns + ------- + List[DataFormat] + List of the formats of each required output dataset for the requested job. + """ + return [DataFormat.NGEN_OUTPUT] + + @property + def partition_cfg_data_id(self) -> Optional[str]: + """ + The ``data_id`` for the partition config dataset to use in requested modeling. 
+ + This identifier is needed to distinguish the correct specific partition config dataset, and thus the correct + partition config, expected for this modeling request. However, this may not always be necessary, as it should + be possible to find a compatible partitioning config dataset of the right hydrofabric and size, so long as one + exists. + + Returns + ------- + Optional[str] + The data format ``data_id`` for the partition config dataset to use in requested modeling, or ``None``. + """ + return self._part_config_data_id + + @property + def partition_cfg_data_requirement(self) -> DataRequirement: + """ + A requirement object defining of the partitioning configuration data needed to execute this request. + + Returns + ------- + DataRequirement + A requirement object defining of the partitioning configuration data needed to execute this request. + """ + if self._partition_cfg_data_requirement is None and self.use_parallel_ngen: + d_restricts = [] + + # Add restriction on hydrofabric + d_restricts.append( + DiscreteRestriction( + variable="hydrofabric_id", values=[self.hydrofabric_uid] + ) + ) + + # Add restriction on partition count, which will be based on the number of request CPUs + d_restricts.append( + DiscreteRestriction(variable="length", values=[self.cpu_count]) + ) + + # If present, add restriction on data_id + if self.partition_cfg_data_id is not None: + d_restricts.append( + DiscreteRestriction( + variable="data_id", values=[self.partition_cfg_data_id] + ) + ) + part_domain = DataDomain( + data_format=DataFormat.NGEN_PARTITION_CONFIG, + discrete_restrictions=d_restricts, + ) + self._partition_cfg_data_requirement = DataRequirement( + domain=part_domain, is_input=True, category=DataCategory.CONFIG + ) + return self._partition_cfg_data_requirement + + @property + def realization_config_data_id(self) -> str: + """ + The index value of ``data_id`` to uniquely identify sets of realization config data that are otherwise similar. 
+ + For example, two realization configs may apply to the same time and catchments, but be very different. The + nature of the differences is not necessarily even possible to define generally, and certainly not through + (pre-existing) indices. As such, the `data_id` index is added for such differentiating purposes. + + Returns + ------- + str + The index value of ``data_id`` to uniquely identify the required realization config dataset. + """ + return self.config_data_id + + @property + def realization_cfg_data_requirement(self) -> DataRequirement: + """ + A requirement object defining of the realization configuration data needed to execute this request. + + Returns + ------- + DataRequirement + A requirement object defining of the realization configuration data needed to execute this request. + """ + if self._realization_cfg_data_requirement is None: + real_cfg_dis_restrict = [ + self._gen_catchments_domain_restriction(), + DiscreteRestriction( + variable="data_id", values=[self.realization_config_data_id] + ), + ] + real_cfg_domain = DataDomain( + data_format=DataFormat.NGEN_REALIZATION_CONFIG, + continuous_restrictions=[self.time_range], + discrete_restrictions=real_cfg_dis_restrict, + ) + self._realization_cfg_data_requirement = DataRequirement( + domain=real_cfg_domain, is_input=True, category=DataCategory.CONFIG + ) + return self._realization_cfg_data_requirement + + @property + def time_range(self) -> TimeRange: + """ + The time range for the requested model execution. + + Returns + ------- + TimeRange + The time range for the requested model execution. + """ + return self._time_range + + def to_dict(self) -> Dict[str, Union[str, Number, dict, list]]: + """ + Converts the request to a dictionary that may be passed to web requests. 
+ + Will look like: + + { + 'allocation_paradigm': , + 'cpu_count': , + 'time_range': { }, + 'hydrofabric_data_id': 'hy-data-id-val', + 'hydrofabric_uid': 'hy-uid-val', + 'config_data_id': 'config-data-id-val', + 'bmi_config_data_id': 'bmi-config-data-id', + 'partition_config_data_id': 'partition_config_data_id', + ['catchments': { },] + } + + As a reminder, the ``catchments`` item may be absent, which implies the object does not have a specified list of + catchment ids. + + Returns + ------- + Dict[str, Union[str, Number, dict, list]] + A dictionary containing all the data in such a way that it may be used by a web request + """ + serial = dict() + serial["allocation_paradigm"] = self.allocation_paradigm.name + serial["cpu_count"] = self.cpu_count + serial["time_range"] = self.time_range.to_dict() + serial["hydrofabric_data_id"] = self.hydrofabric_data_id + serial["hydrofabric_uid"] = self.hydrofabric_uid + serial["config_data_id"] = self.config_data_id + serial["bmi_config_data_id"] = self._bmi_config_data_id + if self.catchments is not None: + serial["catchments"] = self.catchments + if self.partition_cfg_data_id is not None: + serial["partition_config_data_id"] = self.partition_cfg_data_id + return serial + + @property + def use_parallel_ngen(self) -> bool: + """ + Whether this request specifies to use the variant of the Nextgen framework compiled for parallel execution. + + Nextgen may be compiled to execute either serially or using parallelization. DMOD and its Nextgen job workers + can now support either. This property indicates whether this request indicates that parallel execution should + be used. + + In the default implementation, this property is ``True`` IFF ::method:`use_serial_ngen` is ``False``. + + Returns + ------- + bool + Whether this request specifies parallel Nextgen execution for the job. 
+
+        See Also
+        -------
+        use_serial_ngen
+        """
+        return not self.use_serial_ngen
+
+    @property
+    @abstractmethod
+    def use_serial_ngen(self) -> bool:
+        """
+        Whether this request specifies to use the variant of the Nextgen framework compiled for serial execution.
+
+        Nextgen may be compiled to execute either serially or using parallelization. DMOD and its Nextgen job workers
+        can now support either. This property indicates whether this request indicates that serial execution should
+        be used.
+
+        Returns
+        -------
+        bool
+            Whether this request specifies serial Nextgen execution for the job.
+
+        See Also
+        -------
+        use_parallel_ngen
+        """
+        pass
+
+
+class ExternalAbstractNgenRequest(ExternalRequest, AbstractNgenRequest, ABC):
+    """
+    Abstract extension of both ::class:`AbstractNgenRequest` and ::class:`ExternalRequest`.
+
+    An abstract subclass of ::class:`AbstractNgenRequest` and ::class:`ExternalRequest` that, due to the latter,
+    contains a ::attribute:`session_secret` property. As such, the implementations of several functions from
+    ::class:`AbstractNgenRequest` are extended to properly account for this property (e.g., ::method:`__eq__`).
+    """
+
+    @classmethod
+    def deserialize_for_init(cls, json_obj: dict) -> dict:
+        """
+        Deserialize a JSON representation to the keyword args needed for use with this type's ::method:`__init__`.
+
+        Builds on the superclass's implementation by appending the ::attribute:`session_secret` property value.
+
+        Parameters
+        ----------
+        json_obj: dict
+            A serialized JSON representation of an instance.
+
+        Returns
+        -------
+        dict
+            A dictionary containing the keyword args (both required and any contained optional) necessary for
+            initializing an instance, with the values deserialized from the received JSON.
+ """ + deserialized_kwargs = super().deserialize_for_init(json_obj) + deserialized_kwargs["session_secret"] = json_obj["session_secret"] + return deserialized_kwargs + + def __eq__(self, other): + return super().__eq__(other) and self.session_secret == other.session_secret + + def __hash__(self): + hash_str = "{}-{}-{}-{}-{}-{}-{}-{}-{}".format( + self.time_range.to_json(), + self.hydrofabric_data_id, + self.hydrofabric_uid, + self.config_data_id, + self.bmi_config_data_id, + self.session_secret, + self.cpu_count, + self.partition_cfg_data_id, + ",".join(self.catchments), + ) + return hash(hash_str) + + def to_dict(self) -> Dict[str, Union[str, Number, dict, list]]: + serial = super().to_dict() + serial["session_secret"] = self.session_secret + return serial \ No newline at end of file diff --git a/python/lib/communication/dmod/communication/maas_request/ngen/ngen_calibration_request.py b/python/lib/communication/dmod/communication/maas_request/ngen/ngen_calibration_request.py new file mode 100644 index 000000000..345418308 --- /dev/null +++ b/python/lib/communication/dmod/communication/maas_request/ngen/ngen_calibration_request.py @@ -0,0 +1,189 @@ +from numbers import Number +from dmod.core.meta_data import TimeRange +from typing import Dict, List, Optional, Set, Tuple, Union + +from ...message import MessageEventType +from ...maas_request import ModelExecRequestResponse +from .ngen_request import ExternalAbstractNgenRequest + + +class NgenCalibrationRequest(ExternalAbstractNgenRequest): + """ + An extension of ::class:`ExternalAbstractNgenRequest` for requesting ngen framework calibration jobs using ngen-cal. 
+ """ + + event_type: MessageEventType = MessageEventType.CALIBRATION_REQUEST + job_exec_name = 'ngen-cal' #FIXME case sentitivity + + _KEY_CAL_STRATEGY_ALGO = 'strategy_algorithm' + _KEY_CAL_STRATEGY_OBJ_FUNC = 'strategy_objective_function' + _KEY_CAL_STRATEGY_TYPE = 'strategy_type' + _KEY_IS_OBJ_FUNC_MIN = 'is_obj_func_min' + _KEY_IS_RESTART = 'is_restart' + _KEY_ITERATIONS = 'iterations' + _KEY_JOB_NAME = 'job_name' + _KEY_MODEL_CAL_PARAMS = 'model_cal_params' + _KEY_MODEL_STRATEGY = 'model_strategy' + + @classmethod + def deserialize_for_init(cls, json_obj: dict) -> dict: + """ + Deserialize a JSON representation to the keyword args needed for use with this type's ::method:`__init__`. + + Parameters + ---------- + json_obj: dict + A serialized JSON representation of an instance. + + Returns + ------- + dict + A dictionary containing the keyword args (both required and any contained optional) necessary for + initializing an instance, with the values deserialized from the received JSON. 
+ """ + deserialized_init_params = super().deserialize_for_init(json_obj) + deserialized_init_params['cal_strategy_algorithm'] = json_obj[cls._KEY_CAL_STRATEGY_ALGO] + deserialized_init_params['cal_strategy_objective_func'] = json_obj[cls._KEY_CAL_STRATEGY_OBJ_FUNC] + deserialized_init_params['cal_strategy_type'] = json_obj[cls._KEY_CAL_STRATEGY_TYPE] + deserialized_init_params['is_objective_func_minimized'] = json_obj[cls._KEY_IS_OBJ_FUNC_MIN] + deserialized_init_params['is_restart'] = json_obj[cls._KEY_IS_RESTART] + deserialized_init_params['iterations'] = json_obj[cls._KEY_ITERATIONS] + deserialized_init_params['job_name'] = json_obj[cls._KEY_JOB_NAME] + deserialized_init_params['model_cal_params'] = json_obj[cls._KEY_MODEL_CAL_PARAMS] + deserialized_init_params['model_strategy'] = json_obj[cls._KEY_MODEL_STRATEGY] + return deserialized_init_params + + @classmethod + def factory_init_correct_response_subtype(cls, json_obj: dict) -> 'NgenCalibrationResponse': + """ + Init a :obj:`Response` instance of the appropriate subtype for this class from the provided JSON object. + + Parameters + ---------- + json_obj + + Returns + ------- + CalibrationJobResponse + A response of the correct type, with state details from the provided JSON. + """ + return NgenCalibrationResponse.factory_init_from_deserialized_json(json_obj=json_obj) + + def __init__(self, + model_cal_params: Dict[str, Tuple[float, float, float]], + iterations: int, + cal_strategy_type: str = 'estimation', + cal_strategy_algorithm: str = 'dds', + cal_strategy_objective_func: str = 'nnse', + is_objective_func_minimized: bool = True, + model_strategy: str = 'uniform', + job_name: Optional[str] = None, + is_restart: bool = False, + parallel: Optional[int] = None, + *args, + **kwargs): + """ + Initialize an instance. + + Parameters + ---------- + model_cal_params : Dict[str, Tuple[float, float, float]] + A collection of the calibratable params, keyed by name, with a tuple of the min, max, and initial values. 
+ iterations : int + The total number of search iterations to run. + cal_strategy_type : str + The ngen-cal general strategy type for the calibration config (default: ``estimation``). + cal_strategy_algorithm : str + Calibration strategy algorithm ("dds" by default). + cal_strategy_objective_func : str + The standard name ("kling_gupta", "nnse", "custom", "single_peak", "volume") or full ngen_cal package module + name for the objective function to use ("nnse" by default). + is_objective_func_minimized : bool + Whether to minimize the objective function (implies maximize when ``False``; default value: ``True``). + model_strategy : str + The ngen-cal model calibration strategy; one of : + 'uniform' : Each catchment shares the same parameter space, evaluates at one observable nexus + 'independent' : Each catchment upstream of observable nexus gets its own permutated parameter space, + evaluates at one observable nexus + 'explicit' : Only calibrates basins in the realization_config with a "calibration" definition and an + observable nexus + job_name : Optional[str] + Optional job name for the calibration run, which can be used by ngen-cal when generating files. + is_restart : bool + Whether this represents restarting a previous job; ``False`` by default. + parallel: Optional[int] + Optional setting for number of parallel ngen processes used by ngen-cal. + + Keyword Args + ----------- + time_range : TimeRange + A definition of the time range for the configured execution of the ngen framework. + hydrofabric_uid : str + The unique ID of the applicable hydrofabric for modeling, which provides the outermost geospatial domain. + hydrofabric_data_id : str + A data identifier for the hydrofabric, for distinguishing between different hydrofabrics that cover the same + set of catchments and nexuses (i.e., the same sets of catchment and nexus ids). 
+ catchments : Optional[Union[Set[str], List[str]]] + An optional collection of the catchment ids to narrow the geospatial domain, where the default of ``None`` + or an empty collection implies all catchments in the hydrofabric. + bmi_cfg_data_id : Optional[str] + The optional BMI init config ``data_id`` index, for identifying the particular BMI init config datasets + applicable to this request. + config_data_id : str + The config data id index, for identifying the particular configuration datasets applicable to this request. + session_secret : str + The session secret for the right session when communicating with the MaaS request handler + """ + super(NgenCalibrationRequest, self).__init__(*args, **kwargs) + self.model_cal_params = model_cal_params + self.iterations = iterations + self.cal_strategy_type = cal_strategy_type + self.cal_strategy_algorithm = cal_strategy_algorithm + self.cal_strategy_objective_function = cal_strategy_objective_func + self.is_objective_func_minimized = is_objective_func_minimized + self.model_strategy = model_strategy + self.job_name = job_name + self.parallel = parallel + + self.is_restart = is_restart + + # TODO: may need to modify this to have (realization) config dataset start empty (at least optionally) and apply + + def to_dict(self) -> Dict[str, Union[str, Number, dict, list]]: + serial = super().to_dict() + serial["name"] = self.job_exec_name + serial[self._KEY_MODEL_CAL_PARAMS] = self.model_cal_params + serial[self._KEY_CAL_STRATEGY_TYPE] = self.cal_strategy_type + serial[self._KEY_CAL_STRATEGY_ALGO] = self.cal_strategy_algorithm + serial[self._KEY_CAL_STRATEGY_OBJ_FUNC] = self.cal_strategy_objective_function + serial[self._KEY_IS_OBJ_FUNC_MIN] = self.is_objective_func_minimized + serial[self._KEY_ITERATIONS] = self.iterations + serial[self._KEY_JOB_NAME] = self.job_name + serial[self._KEY_MODEL_STRATEGY] = self.model_strategy + serial[self._KEY_IS_RESTART] = self.is_restart + return serial + + @property + def 
use_serial_ngen(self) -> bool: + return self.parallel is None or self.parallel < 2 + + # TODO: This should likely be created or determined if it already exsits on the fly + # @property + # def data_requirements(self) -> List[DataRequirement]: + # """ + # List of all the explicit and implied data requirements for this request, as needed fo r creating a job object. + + # Returns + # ------- + # List[DataRequirement] + # List of all the explicit and implied data requirements for this request. + # """ + # data_requirements = super().data_requirements + # return [self.calibration_cfg_data_requirement ,*data_requirements] + + +# TODO: aaraney. this looks unfinished +# class NgenCalibrationResponse(ExternalRequestResponse): +class NgenCalibrationResponse(ModelExecRequestResponse): + + response_to_type = NgenCalibrationRequest diff --git a/python/lib/communication/dmod/communication/maas_request/ngen/ngen_request.py b/python/lib/communication/dmod/communication/maas_request/ngen/ngen_request.py index b1745952f..e654ca891 100644 --- a/python/lib/communication/dmod/communication/maas_request/ngen/ngen_request.py +++ b/python/lib/communication/dmod/communication/maas_request/ngen/ngen_request.py @@ -2,20 +2,14 @@ from typing import Dict, List, Optional, Set, Union -from dmod.core.meta_data import ( - DataCategory, - DataDomain, - DataFormat, - DataRequirement, - DiscreteRestriction, - TimeRange, -) +from dmod.core.meta_data import TimeRange from ...message import MessageEventType +from .abstract_nextgen_request import ExternalAbstractNgenRequest from ..model_exec_request import ModelExecRequest from ..model_exec_request_response import ModelExecRequestResponse -class NGENRequest(ModelExecRequest): +class NGENRequest(ModelExecRequest, ExternalAbstractNgenRequest): event_type = MessageEventType.MODEL_EXEC_REQUEST """(:class:`MessageEventType`) The type of event for this message""" @@ -24,57 +18,36 @@ class NGENRequest(ModelExecRequest): """(:class:`str`) The name of the model 
to be used""" @classmethod - def factory_init_from_deserialized_json( - cls, json_obj: dict - ) -> Optional["NGENRequest"]: + def deserialize_for_init(cls, json_obj: dict) -> dict: """ - Deserialize request formated as JSON to an instance. + Deserialize a JSON representation to the keyword args needed for use with this type's ::method:`__init__`. - See the documentation of this type's ::method:`to_dict` for an example of the format of valid JSON. + Note that this type's implementation, as is the case with others, relies on the superclass's implementation for + a large part of the logic. However, since the serialized format of this type is a little shifted compared to + the superclass's (see docstring for this instance's ::method:`to_dict`), this function copies the received JSON, + flattens this copy, and sends this flattened copy to the superclass call. Parameters ---------- - json_obj : dict - The serialized JSON representation of a request object. + json_obj: dict + A serialized JSON representation of an instance. Returns ------- - The deserialized ::class:`NGENRequest`, or ``None`` if the JSON was not valid for deserialization. + dict + A dictionary containing the keyword args (both required and any contained optional) necessary for + initializing an instance, with the values deserialized from the received JSON. 
See Also ------- - ::method:`to_dict` + to_dict """ - try: - optional_kwargs_w_defaults = dict() - if "cpu_count" in json_obj["model"]: - optional_kwargs_w_defaults["cpu_count"] = json_obj["model"]["cpu_count"] - if "allocation_paradigm" in json_obj["model"]: - optional_kwargs_w_defaults["allocation_paradigm"] = json_obj["model"][ - "allocation_paradigm" - ] - if "catchments" in json_obj["model"]: - optional_kwargs_w_defaults["catchments"] = json_obj["model"][ - "catchments" - ] - if "partition_config_data_id" in json_obj["model"]: - optional_kwargs_w_defaults["partition_config_data_id"] = json_obj[ - "model" - ]["partition_config_data_id"] - - return cls( - time_range=TimeRange.factory_init_from_deserialized_json( - json_obj["model"]["time_range"] - ), - hydrofabric_uid=json_obj["model"]["hydrofabric_uid"], - hydrofabric_data_id=json_obj["model"]["hydrofabric_data_id"], - config_data_id=json_obj["model"]["config_data_id"], - bmi_cfg_data_id=json_obj["model"]["bmi_config_data_id"], - session_secret=json_obj["session-secret"], - **optional_kwargs_w_defaults - ) - except Exception as e: - return None + # Because of the weird formatting of the JSON, have to manipulate things before passing to the superclass method + flattened_copy = json_obj.copy() + model_part = flattened_copy.pop("model") + flattened_copy.update(model_part) + + return super().deserialize_for_init(flattened_copy) @classmethod def factory_init_correct_response_subtype( @@ -95,48 +68,11 @@ def factory_init_correct_response_subtype( json_obj=json_obj ) - def __eq__(self, other): - return ( - self.time_range == other.time_range - and self.hydrofabric_data_id == other.hydrofabric_data_id - and self.hydrofabric_uid == other.hydrofabric_uid - and self.config_data_id == other.config_data_id - and self.bmi_config_data_id == other.bmi_config_data_id - and self.session_secret == other.session_secret - and self.cpu_count == other.cpu_count - and self.partition_cfg_data_id == other.partition_cfg_data_id - and 
self.catchments == other.catchments - ) - - def __hash__(self): - hash_str = "{}-{}-{}-{}-{}-{}-{}-{}-{}".format( - self.time_range.to_json(), - self.hydrofabric_data_id, - self.hydrofabric_uid, - self.config_data_id, - self.bmi_config_data_id, - self.session_secret, - self.cpu_count, - self.partition_cfg_data_id, - ",".join(self.catchments), - ) - return hash(hash_str) - - def __init__( - self, - time_range: TimeRange, - hydrofabric_uid: str, - hydrofabric_data_id: str, - bmi_cfg_data_id: str, - catchments: Optional[Union[Set[str], List[str]]] = None, - partition_cfg_data_id: Optional[str] = None, - *args, - **kwargs - ): + def __init__(self, *args, **kwargs): """ Initialize an instance. - Parameters + Keyword Args ---------- time_range : TimeRange A definition of the time range for the requested model execution. @@ -151,387 +87,12 @@ def __init__( bmi_cfg_data_id : Optional[str] The optioanl BMI init config ``data_id`` index, for identifying the particular BMI init config datasets applicable to this request. - - Keyword Args - ----------- config_data_id : str The config data id index, for identifying the particular configuration datasets applicable to this request. 
session_secret : str The session secret for the right session when communicating with the MaaS request handler """ super().__init__(*args, **kwargs) - self._time_range = time_range - self._hydrofabric_uid = hydrofabric_uid - self._hydrofabric_data_id = hydrofabric_data_id - self._bmi_config_data_id = bmi_cfg_data_id - self._part_config_data_id = partition_cfg_data_id - # Convert an initial list to a set to remove duplicates - try: - catchments = set(catchments) - # TypeError should mean that we received `None`, so just use that to set _catchments - except TypeError: - self._catchments = catchments - # Assuming we have a set now, move this set back to list and sort - else: - self._catchments = list(catchments) - self._catchments.sort() - - self._hydrofabric_data_requirement = None - self._forcing_data_requirement = None - self._realization_cfg_data_requirement = None - self._bmi_cfg_data_requirement = None - self._partition_cfg_data_requirement = None - - def _gen_catchments_domain_restriction( - self, var_name: str = "catchment_id" - ) -> DiscreteRestriction: - """ - Generate a ::class:`DiscreteRestriction` that will restrict to the catchments applicable to this request. - - Note that if the ::attribute:`catchments` property is ``None`` or empty, then the generated restriction object - will reflect that with an empty list of values, implying "all catchments in hydrofabric." This is slightly - different than the internal behavior of ::class:`DiscreteRestriction` itself, which only infers this for empty - lists (i.e., not a ``values`` value of ``None``). This is intentional here, as the natural implication of - specific catchments not being provided as part of a job request is to include all of them. - - Parameters - ---------- - var_name : str - The value of the ::attribute:`DiscreteRestriction.variable` for the restriction; defaults to `catchment-id`. 
- - Returns - ------- - DiscreteRestriction - ::class:`DiscreteRestriction` that will restrict to the catchments applicable to this request. - """ - return DiscreteRestriction( - variable=var_name, - values=([] if self.catchments is None else self.catchments), - ) - - @property - def data_requirements(self) -> List[DataRequirement]: - """ - List of all the explicit and implied data requirements for this request, as needed for creating a job object. - - Returns - ------- - List[DataRequirement] - List of all the explicit and implied data requirements for this request. - """ - requirements = [ - self.bmi_cfg_data_requirement, - self.forcing_data_requirement, - self.hydrofabric_data_requirement, - self.realization_cfg_data_requirement, - ] - if self.use_parallel_ngen: - requirements.append(self.partition_cfg_data_requirement) - return requirements - - @property - def bmi_config_data_id(self) -> str: - """ - The index value of ``data_id`` to uniquely identify sets of BMI module config data that are otherwise similar. - - Returns - ------- - str - Index value of ``data_id`` to uniquely identify sets of BMI module config data that are otherwise similar. - """ - return self._bmi_config_data_id - - @property - def bmi_cfg_data_requirement(self) -> DataRequirement: - """ - A requirement object defining of the BMI configuration data needed to execute this request. - - Returns - ------- - DataRequirement - A requirement object defining of the BMI configuration data needed to execute this request. 
- """ - if self._bmi_cfg_data_requirement is None: - bmi_config_restrict = [ - DiscreteRestriction( - variable="data_id", values=[self._bmi_config_data_id] - ) - ] - bmi_config_domain = DataDomain( - data_format=DataFormat.BMI_CONFIG, - discrete_restrictions=bmi_config_restrict, - ) - self._bmi_cfg_data_requirement = DataRequirement( - bmi_config_domain, True, DataCategory.CONFIG - ) - return self._bmi_cfg_data_requirement - - @property - def catchments(self) -> Optional[List[str]]: - """ - An optional list of catchment ids for those catchments in the request ngen execution. - - No list implies "all" known catchments. - - Returns - ------- - Optional[List[str]] - An optional list of catchment ids for those catchments in the request ngen execution. - """ - return self._catchments - - @property - def forcing_data_requirement(self) -> DataRequirement: - """ - A requirement object defining of the forcing data needed to execute this request. - - Returns - ------- - DataRequirement - A requirement object defining of the forcing data needed to execute this request. - """ - if self._forcing_data_requirement is None: - # TODO: going to need to address the CSV usage later - forcing_domain = DataDomain( - # TODO: come back to this to change to other type - data_format=DataFormat.AORC_CSV, - continuous_restrictions=[self._time_range], - discrete_restrictions=[self._gen_catchments_domain_restriction()], - ) - self._forcing_data_requirement = DataRequirement( - domain=forcing_domain, is_input=True, category=DataCategory.FORCING - ) - return self._forcing_data_requirement - - @property - def hydrofabric_data_requirement(self) -> DataRequirement: - """ - A requirement object defining the hydrofabric data needed to execute this request. - - Returns - ------- - DataRequirement - A requirement object defining the hydrofabric data needed to execute this request. 
- """ - if self._hydrofabric_data_requirement is None: - hydro_restrictions = [ - DiscreteRestriction( - variable="hydrofabric_id", values=[self._hydrofabric_uid] - ), - DiscreteRestriction( - variable="data_id", values=[self._hydrofabric_data_id] - ), - ] - hydro_domain = DataDomain( - data_format=DataFormat.NGEN_GEOJSON_HYDROFABRIC, - discrete_restrictions=hydro_restrictions, - ) - self._hydrofabric_data_requirement = DataRequirement( - domain=hydro_domain, is_input=True, category=DataCategory.HYDROFABRIC - ) - return self._hydrofabric_data_requirement - - @property - def hydrofabric_data_id(self) -> str: - """ - The data format ``data_id`` for the hydrofabric dataset to use in requested modeling. - - This identifier is needed to distinguish the correct hydrofabric dataset, and thus the correct hydrofabric, - expected for this modeling request. For arbitrary hydrofabric types, this may not be possible with the unique - id of the hydrofabric alone. E.g., a slight adjustment of catchment coordinates may be ignored with respect - to the hydrofabric's uid, but may be relevant with respect to a model request. - - Returns - ------- - str - The data format ``data_id`` for the hydrofabric dataset to use in requested modeling. - """ - return self._hydrofabric_data_id - - @property - def hydrofabric_uid(self) -> str: - """ - The unique id of the hydrofabric for this modeling request. - - Returns - ------- - str - The unique id of the hydrofabric for this modeling request. - """ - return self._hydrofabric_uid - - @property - def use_parallel_ngen(self) -> bool: - """ - Whether this request specifies to use the variant of the NextGen framework compiled for parallel execution. - - NextGen may be compiled to execute either serially or using parallelization. DMOD and its NextGen job workers - can now support either. This property indicates whether this request indicates that parallel execution should - be used. 
- - In the current implementation, this property is ``True`` IFF ::method:`use_serial_ngen` is ``False``. Note that - this will result in CPU counts of ``0`` or negative numbers, if they were to occur, also resulting in this - returning ``True``. - - Returns - ------- - bool - Whether this request specifies parallel NextGen execution for the job. - - See Also - ------- - use_serial_ngen - """ - return not self.use_serial_ngen - - @property - def use_serial_ngen(self) -> bool: - """ - Whether this request specifies to use the variant of the NextGen framework compiled for serial execution. - - NextGen may be compiled to execute either serially or using parallelization. DMOD and its NextGen job workers - can now support either. This property indicates whether this request indicates that serially execution should - be used. - - In the current implementation, this property is ``True`` IFF the request required a CPU count of exactly ``1``. - - Returns - ------- - bool - Whether this request specifies serial NextGen execution for the job. - - See Also - ------- - use_parallel_ngen - """ - return self.cpu_count == 1 - - - - @property - def output_formats(self) -> List[DataFormat]: - """ - List of the formats of each required output dataset for the requested job. - - Returns - ------- - List[DataFormat] - List of the formats of each required output dataset for the requested job. - """ - return [DataFormat.NGEN_OUTPUT] - - @property - def partition_cfg_data_id(self) -> Optional[str]: - """ - The ``data_id`` for the partition config dataset to use in requested modeling. - - This identifier is needed to distinguish the correct specific partition config dataset, and thus the correct - partition config, expected for this modeling request. However, this may not always be necessary, as it should - be possible to find a compatible partitioning config dataset of the right hydrofabric and size, so long as one - exists. 
- - Returns - ------- - Optional[str] - The data format ``data_id`` for the partition config dataset to use in requested modeling, or ``None``. - """ - return self._part_config_data_id - - @property - def partition_cfg_data_requirement(self) -> Optional[DataRequirement]: - """ - A requirement object defining of the partitioning configuration data needed to execute this request. - - Returns - ------- - Optional[DataRequirement] - Requirement object defining of the partitioning configuration data needed to execute this request. - """ - if self._partition_cfg_data_requirement is None and self.use_parallel_ngen: - d_restricts = [] - - # Add restriction on hydrofabric - d_restricts.append( - DiscreteRestriction( - variable="hydrofabric_id", values=[self.hydrofabric_uid] - ) - ) - - # Add restriction on partition count, which will be based on the number of request CPUs - d_restricts.append( - DiscreteRestriction(variable="length", values=[self.cpu_count]) - ) - - # If present, add restriction on data_id - if self.partition_cfg_data_id is not None: - d_restricts.append( - DiscreteRestriction( - variable="data_id", values=[self.partition_cfg_data_id] - ) - ) - part_domain = DataDomain( - data_format=DataFormat.NGEN_PARTITION_CONFIG, - discrete_restrictions=d_restricts, - ) - self._partition_cfg_data_requirement = DataRequirement( - domain=part_domain, is_input=True, category=DataCategory.CONFIG - ) - return self._partition_cfg_data_requirement - - @property - def realization_config_data_id(self) -> str: - """ - The index value of ``data_id`` to uniquely identify sets of realization config data that are otherwise similar. - - For example, two realization configs may apply to the same time and catchments, but be very different. The - nature of the differences is not necessarily even possible to define generally, and certainly not through - (pre-existing) indices. As such, the `data_id` index is added for such differentiating purposes. 
- - Returns - ------- - str - The index value of ``data_id`` to uniquely identify the required realization config dataset. - """ - return self.config_data_id - - @property - def realization_cfg_data_requirement(self) -> DataRequirement: - """ - A requirement object defining of the realization configuration data needed to execute this request. - - Returns - ------- - DataRequirement - A requirement object defining of the realization configuration data needed to execute this request. - """ - if self._realization_cfg_data_requirement is None: - real_cfg_dis_restrict = [ - self._gen_catchments_domain_restriction(), - DiscreteRestriction( - variable="data_id", values=[self.realization_config_data_id] - ), - ] - real_cfg_domain = DataDomain( - data_format=DataFormat.NGEN_REALIZATION_CONFIG, - continuous_restrictions=[self.time_range], - discrete_restrictions=real_cfg_dis_restrict, - ) - self._realization_cfg_data_requirement = DataRequirement( - domain=real_cfg_domain, is_input=True, category=DataCategory.CONFIG - ) - return self._realization_cfg_data_requirement - - @property - def time_range(self) -> TimeRange: - """ - The time range for the requested model execution. - - Returns - ------- - TimeRange - The time range for the requested model execution. 
- """ - return self._time_range def to_dict(self) -> Dict[str, Union[str, Number, dict, list]]: """ @@ -553,7 +114,7 @@ def to_dict(self) -> Dict[str, Union[str, Number, dict, list]]: ['catchments': { },] 'version': 4.0 }, - 'session-secret': 'secret-string-val' + 'session_secret': 'secret-string-val' } As a reminder, the ``catchments`` item may be absent, which implies the object does not have a specified list of @@ -564,21 +125,33 @@ def to_dict(self) -> Dict[str, Union[str, Number, dict, list]]: Dict[str, Union[str, Number, dict, list]] A dictionary containing all the data in such a way that it may be used by a web request """ - model = dict() + model = super().to_dict() model["name"] = self.get_model_name() - model["allocation_paradigm"] = self.allocation_paradigm.name - model["cpu_count"] = self.cpu_count - model["time_range"] = self.time_range.to_dict() - model["hydrofabric_data_id"] = self.hydrofabric_data_id - model["hydrofabric_uid"] = self.hydrofabric_uid - model["config_data_id"] = self.config_data_id - model["bmi_config_data_id"] = self._bmi_config_data_id - if self.catchments is not None: - model["catchments"] = self.catchments - if self.partition_cfg_data_id is not None: - model["partition_config_data_id"] = self.partition_cfg_data_id - - return {"model": model, "session-secret": self.session_secret} + # Move this to outer layer + session_secret_val = model.pop("session_secret") + return {"model": model, "session_secret": session_secret_val} + + @property + def use_serial_ngen(self) -> bool: + """ + Whether this request specifies to use the variant of the Nextgen framework compiled for serial execution. + + Nextgen may be compiled to execute either serially or using parallelization. DMOD and its Nextgen job workers + can now support either. This property indicates whether this request indicates that serially execution should + be used. + + In the current implementation, this property is ``True`` IFF the request required a CPU count of exactly ``1``. 
+ + Returns + ------- + bool + Whether this request specifies serial Nextgen execution for the job. + + See Also + ------- + use_parallel_ngen + """ + return self.cpu_count == 1 class NGENRequestResponse(ModelExecRequestResponse): diff --git a/python/lib/communication/dmod/communication/message.py b/python/lib/communication/dmod/communication/message.py index bad2e4869..88dcc2cdc 100644 --- a/python/lib/communication/dmod/communication/message.py +++ b/python/lib/communication/dmod/communication/message.py @@ -92,8 +92,8 @@ class AbstractInitRequest(Message, ABC): interactions. """ - def __int__(self, *args, **kwargs): - super(AbstractInitRequest, self).__int__(*args, **kwargs) + def __init__(self, *args, **kwargs): + super(AbstractInitRequest, self).__init__(*args, **kwargs) class Response(ResultIndicator, Message, ABC): diff --git a/python/lib/communication/dmod/test/test_ngen_request.py b/python/lib/communication/dmod/test/test_ngen_request.py index 53528e086..946412754 100644 --- a/python/lib/communication/dmod/test/test_ngen_request.py +++ b/python/lib/communication/dmod/test/test_ngen_request.py @@ -32,7 +32,7 @@ def create_time_range(begin, end, var=None) -> TimeRange: '{"allocation_paradigm": "SINGLE_NODE", "bmi_config_data_id": "02468", "config_data_id": "02468", "cpu_count": ' + str(cpu_count_ex_0) + ', ' '"hydrofabric_data_id": "9876543210", "hydrofabric_uid": "0123456789", "name": "ngen", "time_range": ' + time_range.to_json() + '}, ' - '"session-secret": "f21f27ac3d443c0948aab924bddefc64891c455a756ca77a4d86ec2f697cd13c"}') + '"session_secret": "f21f27ac3d443c0948aab924bddefc64891c455a756ca77a4d86ec2f697cd13c"}') self.request_jsons.append({ 'model': { 'name': 'ngen', @@ -44,7 +44,7 @@ def create_time_range(begin, end, var=None) -> TimeRange: 'bmi_config_data_id': '02468', 'config_data_id': '02468' }, - 'session-secret': 'f21f27ac3d443c0948aab924bddefc64891c455a756ca77a4d86ec2f697cd13c' + 'session_secret': 
'f21f27ac3d443c0948aab924bddefc64891c455a756ca77a4d86ec2f697cd13c' }) self.request_objs.append( NGENRequest(session_secret='f21f27ac3d443c0948aab924bddefc64891c455a756ca77a4d86ec2f697cd13c', @@ -69,7 +69,7 @@ def create_time_range(begin, end, var=None) -> TimeRange: '{"allocation_paradigm": "ROUND_ROBIN", "bmi_config_data_id": "02468", "catchments": ' + cat_ids_str + ', "config_data_id": "02468", ' '"cpu_count": ' + str(cpu_count_ex_1) + ', "hydrofabric_data_id": "9876543210", ' '"hydrofabric_uid": "0123456789", "name": "ngen", "time_range": ' + time_range.to_json() + '}, ' - '"session-secret": "f21f27ac3d443c0948aab924bddefc64891c455a756ca77a4d86ec2f697cd13c"}') + '"session_secret": "f21f27ac3d443c0948aab924bddefc64891c455a756ca77a4d86ec2f697cd13c"}') self.request_jsons.append({ 'model': { 'name': 'ngen', @@ -82,7 +82,7 @@ def create_time_range(begin, end, var=None) -> TimeRange: 'bmi_config_data_id': '02468', 'catchments': cat_ids_list }, - 'session-secret': 'f21f27ac3d443c0948aab924bddefc64891c455a756ca77a4d86ec2f697cd13c' + 'session_secret': 'f21f27ac3d443c0948aab924bddefc64891c455a756ca77a4d86ec2f697cd13c' }) self.request_objs.append( NGENRequest(session_secret='f21f27ac3d443c0948aab924bddefc64891c455a756ca77a4d86ec2f697cd13c', diff --git a/python/lib/communication/dmod/test/test_scheduler_request_message.py b/python/lib/communication/dmod/test/test_scheduler_request_message.py index c9fc346a6..2af37f478 100644 --- a/python/lib/communication/dmod/test/test_scheduler_request_message.py +++ b/python/lib/communication/dmod/test/test_scheduler_request_message.py @@ -44,7 +44,7 @@ def setUp(self) -> None: allocation_paradigm='ROUND_ROBIN')) # Example 1 - NGenRequest - raw_json_str_1 = '{"allocation_paradigm": "SINGLE_NODE", "cpus": 288, "mem": 500000, "model_request": {"model": {"allocation_paradigm": "SINGLE_NODE", "bmi_config_data_id": "simple-bmi-cfe-1", "config_data_id": "huc01-simple-realization-config-1", "cpu_count": 288, "hydrofabric_data_id": 
"hydrofabric-huc01-copy-288", "hydrofabric_uid": "72c2a0220aa7315b50e55b6c5b68f927ac1d9b81", "name": "ngen", "time_range": {"begin": "2012-05-01 00:00:00", "datetime_pattern": "%Y-%m-%d %H:%M:%S", "end": "2012-05-31 23:00:00", "subclass": "TimeRange", "variable": "TIME"}}, "session-secret": "675b2f8826f69f97c01fe4d7add30420322cd21a790ddc68a5b3c149966de919"}, "user_id": "someone"}' + raw_json_str_1 = '{"allocation_paradigm": "SINGLE_NODE", "cpus": 288, "mem": 500000, "model_request": {"model": {"allocation_paradigm": "SINGLE_NODE", "bmi_config_data_id": "simple-bmi-cfe-1", "config_data_id": "huc01-simple-realization-config-1", "cpu_count": 288, "hydrofabric_data_id": "hydrofabric-huc01-copy-288", "hydrofabric_uid": "72c2a0220aa7315b50e55b6c5b68f927ac1d9b81", "name": "ngen", "time_range": {"begin": "2012-05-01 00:00:00", "datetime_pattern": "%Y-%m-%d %H:%M:%S", "end": "2012-05-31 23:00:00", "subclass": "TimeRange", "variable": "TIME"}}, "session_secret": "675b2f8826f69f97c01fe4d7add30420322cd21a790ddc68a5b3c149966de919"}, "user_id": "someone"}' raw_json_obj_1 = json.loads(raw_json_str_1) sorted_json_str_1 = json.dumps(raw_json_obj_1, sort_keys=True) self.request_strings.append(sorted_json_str_1) @@ -54,7 +54,7 @@ def setUp(self) -> None: "hydrofabric_uid": "72c2a0220aa7315b50e55b6c5b68f927ac1d9b81", "name": "ngen", "time_range": {"begin": "2012-05-01 00:00:00", "datetime_pattern": "%Y-%m-%d %H:%M:%S", "end": "2012-05-31 23:00:00", "subclass": "TimeRange", "variable": "TIME"}}, - "session-secret": "675b2f8826f69f97c01fe4d7add30420322cd21a790ddc68a5b3c149966de919"}, + "session_secret": "675b2f8826f69f97c01fe4d7add30420322cd21a790ddc68a5b3c149966de919"}, "user_id": "someone"}) time_range = TimeRange.factory_init_from_deserialized_json({"begin": "2012-05-01 00:00:00", diff --git a/python/lib/core/dmod/core/meta_data.py b/python/lib/core/dmod/core/meta_data.py index 9c74c9fe0..2bf5688f5 100644 --- a/python/lib/core/dmod/core/meta_data.py +++ 
b/python/lib/core/dmod/core/meta_data.py @@ -303,9 +303,8 @@ def __eq__(self, other): else: return False - def __hash__(self): - str_func = lambda x: str(x) if self._datetime_pattern is None else datetime.strptime(x, self._datetime_pattern) - hash('{}-{}-{}'.format(self.variable.name, str_func(self.begin), str_func(self.end))) + def __hash__(self) -> int: + return hash((self.variable.name, self.begin, self.end)) def contains(self, other: 'ContinuousRestriction') -> bool: """ @@ -377,8 +376,8 @@ def __eq__(self, other): else: return False - def __hash__(self): - hash('{}-{}'.format(self.variable.name, ','.join([str(v) for v in self.values]))) + def __hash__(self) -> int: + return hash((self.variable.name, *self.values)) def contains(self, other: 'DiscreteRestriction') -> bool: """ @@ -543,16 +542,13 @@ def __eq__(self, other): and self.discrete_restrictions == other.discrete_restrictions \ and self._custom_data_fields == other._custom_data_fields - def __hash__(self): - if self._custom_data_fields is None: - cu = '' - else: - cu = ','.join(['{}:{}'.format(f, self._custom_data_fields[f]) for f in sorted(self._custom_data_fields)]) - return hash('{}-{}-{}-{}'.format( - self.data_format, - ','.join([str(hash(self.continuous_restrictions[k])) for k in sorted(self.continuous_restrictions)]), - ','.join([str(hash(self.discrete_restrictions[k])) for k in sorted(self.discrete_restrictions)]), - cu)) + def __hash__(self) -> int: + cu = [] if self._custom_data_fields is None else [tup for tup in sorted(self._custom_data_fields.items())] + return hash((self.data_format.name, + *[v for _, v in sorted(self.continuous_restrictions.items(), key=lambda dt: dt[0].name)], + *[v for _, v in sorted(self.discrete_restrictions.items(), key=lambda dt: dt[0].name)], + *cu + )) def __init__(self, data_format: DataFormat, continuous_restrictions: Optional[List[ContinuousRestriction]] = None, discrete_restrictions: Optional[List[DiscreteRestriction]] = None, @@ -734,6 +730,47 @@ class 
TimeRange(ContinuousRestriction): Encapsulated representation of a time range. """ + @classmethod + def parse_from_string(cls, as_string: str, dt_format: Optional[str] = None, dt_str_length: int = 19) -> 'TimeRange': + """ + Parse a colloquial string representation of a time range to an object. + + Parse a string representation to an instance. Any string that begins and ends with independent, valid date+time + substrings qualifies; e.g., " to " or " - ". + + Parameters + ---------- + as_string: str + The representation of an instance in the form of a begin and end datetime string. + dt_format: Optional[str] + The optional datetime parsing format pattern, ``None`` by default, which is replaced with the pattern + returned by ::method:`get_datetime_str_format`. + dt_str_length: int + The length of a valid date+time substring, needed for individually parsing it, which should correspond to + the current ``dt_format`` (default: 19). + + Returns + ------- + TimeRange + The parsed instance. + """ + if dt_format is None: + dt_format = cls.get_datetime_str_format() + + # This can't be valid, so sanity check for it + if dt_str_length < len(dt_format): + raise ValueError("Invalid datetime substring length of {} for format {}".format(dt_str_length, dt_format)) + + # This is an absolute min + if len(as_string) < dt_str_length * 2: + raise ValueError("String '{}' cannot contain two datetime substrings".format(as_string)) + + try: + return cls(begin=datetime.strptime(as_string[:dt_str_length], dt_format), + end=datetime.strptime(as_string[(-1 * dt_str_length):], dt_format)) + except: + raise ValueError + def __init__(self, begin: Union[str, datetime], end: Union[str, datetime], datetime_pattern: Optional[str] = None, **kwargs): dt_ptrn = self.get_datetime_str_format() if datetime_pattern is None else datetime_pattern @@ -798,7 +835,7 @@ def __eq__(self, other): and self.category == other.category def __hash__(self): - return hash('{}-{}-{}'.format(hash(self.domain), self.is_input, 
self.category)) + return hash((self.domain, self.is_input, self.category)) def __init__(self, domain: DataDomain, is_input: bool, category: DataCategory, size: Optional[int] = None, fulfilled_by: Optional[str] = None, fulfilled_access_at: Optional[str] = None): diff --git a/python/lib/externalrequests/dmod/externalrequests/__init__.py b/python/lib/externalrequests/dmod/externalrequests/__init__.py index fea23d035..9778e48c0 100644 --- a/python/lib/externalrequests/dmod/externalrequests/__init__.py +++ b/python/lib/externalrequests/dmod/externalrequests/__init__.py @@ -1,8 +1,8 @@ from .auth_handler import AuthHandler from .maas_request_handlers import DatasetRequestHandler, MaaSRequestHandler, PartitionRequestHandler -from .model_exec_request_handler import ModelExecRequestHandler +from .model_exec_request_handler import ModelExecRequestHandler, NgenCalibrationRequestHandler from .evaluation_request_handler import EvaluationRequestHandler from .evaluation_request_handler import LaunchEvaluationMessage from .evaluation_request_handler import OpenEvaluationMessage -name = 'externalrequests' \ No newline at end of file +name = 'externalrequests' diff --git a/python/lib/externalrequests/dmod/externalrequests/maas_request_handlers.py b/python/lib/externalrequests/dmod/externalrequests/maas_request_handlers.py index dff0b45ac..92a2bb225 100644 --- a/python/lib/externalrequests/dmod/externalrequests/maas_request_handlers.py +++ b/python/lib/externalrequests/dmod/externalrequests/maas_request_handlers.py @@ -31,7 +31,7 @@ class MaaSRequestHandler(AbstractRequestHandler, ABC): """ def __init__(self, session_manager: SessionManager, authorizer: Authorizer, service_host: str, service_port: int, - service_ssl_dir: Path): + service_ssl_dir: Path, *args, **kwargs): self._session_manager = session_manager self._authorizer = authorizer self._service_host = service_host @@ -155,13 +155,23 @@ def service_url(self) -> str: class PartitionRequestHandler(MaaSRequestHandler): - def 
__init__(self, session_manager: SessionManager, authorizer: Authorizer, partition_service_host: str, - partition_service_port: int, partition_service_ssl_dir: Path): - super(PartitionRequestHandler, self).__init__(session_manager=session_manager, - authorizer=authorizer, - service_host=partition_service_host, - service_port=partition_service_port, - service_ssl_dir=partition_service_ssl_dir) + def __init__(self, *args, **kwargs): + """ + + Parameters + ---------- + args + kwargs + + Other Parameters + ---------- + session_manager + authorizer + service_host + service_port + service_ssl_dir + """ + super(PartitionRequestHandler, self).__init__(*args, **kwargs) # TODO: implement properly self._default_required_access_type = None @@ -196,7 +206,7 @@ def service_client(self) -> PartitionerServiceClient: return self._service_client async def handle_request(self, request: PartitionRequest, **kwargs) -> PartitionResponse: - session, is_authorized, reason, msg = self.get_authorized_session(request) + session, is_authorized, reason, msg = await self.get_authorized_session(request) if not is_authorized: return PartitionResponse(success=False, reason=reason.name, message=msg) # In this case, we actually can pass the request as-is straight through (i.e., after confirming authorization) @@ -209,13 +219,24 @@ async def handle_request(self, request: PartitionRequest, **kwargs) -> Partition class DatasetRequestHandler(MaaSRequestHandler): - def __init__(self, session_manager: SessionManager, authorizer: Authorizer, data_service_host: str, - data_service_port: int, data_service_ssl_dir: Path): - super(DatasetRequestHandler, self).__init__(session_manager=session_manager, - authorizer=authorizer, - service_host=data_service_host, - service_port=data_service_port, - service_ssl_dir=data_service_ssl_dir) + def __init__(self, *args, **kwargs): + """ + + Parameters + ---------- + args + kwargs + + Other Parameters + ---------- + session_manager + authorizer + service_host + service_port 
+ service_ssl_dir + + """ + super(DatasetRequestHandler, self).__init__(*args, **kwargs) # TODO: implement properly self._default_required_access_type = None diff --git a/python/lib/externalrequests/dmod/externalrequests/model_exec_request_handler.py b/python/lib/externalrequests/dmod/externalrequests/model_exec_request_handler.py index b827ca9c5..bc0a641b3 100644 --- a/python/lib/externalrequests/dmod/externalrequests/model_exec_request_handler.py +++ b/python/lib/externalrequests/dmod/externalrequests/model_exec_request_handler.py @@ -3,8 +3,8 @@ from pathlib import Path from dmod.access import Authorizer from dmod.communication import FullAuthSession, InitRequestResponseReason, ModelExecRequest, ModelExecRequestResponse, \ - NGENRequest, NGENRequestResponse, NWMRequest, NWMRequestResponse, SchedulerClient, SchedulerRequestMessage, \ - SchedulerRequestResponse, SessionManager + NGENRequest, NGENRequestResponse, NgenCalibrationRequest, NgenCalibrationResponse, NWMRequest, NWMRequestResponse, \ + SchedulerClient, SchedulerRequestMessage, SchedulerRequestResponse, SessionManager from .maas_request_handlers import MaaSRequestHandler from typing import Optional @@ -17,9 +17,24 @@ class ModelExecRequestHandler(MaaSRequestHandler): - def __init__(self, session_manager: SessionManager, authorizer: Authorizer, scheduler_host: str, - scheduler_port: int, scheduler_ssl_dir: Path): - super().__init__(session_manager, authorizer, scheduler_host, scheduler_port, scheduler_ssl_dir) + def __init__(self, *args, **kwargs): + """ + + Parameters + ---------- + args + kwargs + + Other Parameters + ---------- + session_manager + authorizer + service_host + service_port + service_ssl_dir + + """ + super().__init__(*args, **kwargs) # TODO: implement properly self._default_required_access_type = None @@ -104,25 +119,47 @@ async def determine_required_access_types(self, request: ModelExecRequest, user) # FIXME: for now, just use the default type (which happens to be "everything") return 
self._default_required_access_type, + async def _preprocess_request(self, request: ModelExecRequest): + """ + Execute any appropriate preprocessing steps for this request before passing it to the scheduler. + + The default implementation does not perform any actions. + + Parameters + ---------- + request + + Raises + ------- + RuntimeError + """ + pass + async def handle_request(self, request: ModelExecRequest, **kwargs) -> ModelExecRequestResponse: """ - Handle the given request for a new NWM job execution and return the resulting response. + Handle the given request for a new job execution and return the resulting response. Parameters ---------- request: ModelExecRequest - A ``ModelExecRequest`` message instance with details of the job being requested. + A ::class:`ModelExecRequest` (or subclass) instance with details of the job being requested. Returns ------- response: ModelExecRequestResponse - An appropriate ``NWMRequestResponse`` object. + An appropriate response object derived from ::class:`ModelExecRequestResponse`. 
""" session, is_authorized, reason, msg = await self.get_authorized_session(request) if not is_authorized: return self._generate_request_response(exec_request=request, success=False, reason=reason.name, message=msg, scheduler_response=None) + try: + await self._preprocess_request(request=request) + except RuntimeError as e: + return self._generate_request_response(exec_request=request, success=False, reason='Preprocessing Failure', + message=str(e), scheduler_response=None) + # The context manager manages a SINGLE connection to the scheduler server # Adhoc calls to the scheduler can be made for this connection via the scheduler_client # These adhoc calls will use the SAME connection the context was initialized with @@ -146,5 +183,83 @@ async def handle_request(self, request: ModelExecRequest, **kwargs) -> ModelExec @property def service_client(self) -> SchedulerClient: if self._scheduler_client is None: - self._scheduler_client = SchedulerClient(self.service_url, self.service_ssl_dir) + self._scheduler_client = SchedulerClient(ssl_directory=self.service_ssl_dir, endpoint_uri=self.service_url) return self._scheduler_client + + +class NgenCalibrationRequestHandler(ModelExecRequestHandler): + """ + An extension of ::class:`ModelExecRequestHandler` specifically for Nextgen calibration requests. + """ + + def __init__(self, *args, **kwargs): + """ + + Parameters + ---------- + args + kwargs + + Other Parameters + ---------- + session_manager + authorizer + service_host + service_port + service_ssl_dir + + """ + super().__init__(*args, **kwargs) + + # TODO: implement properly (yes, manually doing this again here) + self._default_cal_required_access_type = None + + def _generate_request_response(self, exec_request: NgenCalibrationRequest, success: bool, reason: str, message: str, + scheduler_response: Optional[SchedulerRequestResponse]) -> NgenCalibrationResponse: + """ + Generate a response message of the appropriate type for the given model exec request message. 
+ + Parameters + ---------- + exec_request : NgenCalibrationRequest + The originating ::class:`NgenCalibrationRequest` message requiring a response. + success : bool + Whether the request was successful. + reason : string + A summary of why the request was successful or not. + message : string + A more detailed description of why the request was successful or not. + scheduler_response : Optional[SchedulerRequestResponse] + Response message from the scheduler when processing the exec request resulted in a scheduler request. + Returns + ------- + NgenCalibrationResponse + A generated calibration response object. + """ + if not isinstance(exec_request, NgenCalibrationRequest): + msg = "{} cannot generate calibration response to unexpected {}" + raise RuntimeError(msg.format(self.__class__.__name__, exec_request.__class__.__name__)) + else: + return NgenCalibrationResponse(success=success, reason=reason, message=message, + scheduler_response=scheduler_response) + + async def determine_required_access_types(self, request: NgenCalibrationRequest, user) -> tuple: + """ + Determine what access is required for this request from this user to be accepted. + + Determine the necessary access types for which the given user needs to be authorized in order for the user to + be allowed to submit this request, in the context of the current state of the system. + + Parameters + ---------- + request + user + + Returns + ------- + A tuple of required access types required for authorization for the given request at this time. + """ + # TODO: implement; in particular, consider things like current job count for user, and whether different access + # types are required at different counts. 
+ # FIXME: for now, just use the default type (which happens to be "everything") + return self._default_cal_required_access_type, diff --git a/python/lib/externalrequests/dmod/test/it_model_exec_request_handler.py b/python/lib/externalrequests/dmod/test/it_model_exec_request_handler.py index 532d8f1c4..b5994b202 100644 --- a/python/lib/externalrequests/dmod/test/it_model_exec_request_handler.py +++ b/python/lib/externalrequests/dmod/test/it_model_exec_request_handler.py @@ -212,9 +212,9 @@ def setUp(self) -> None: #self._handler = None self.handler = ModelExecRequestHandler(session_manager=self.session_manager, authorizer=self.success_authorizer, - scheduler_host=self.scheduler_host, - scheduler_port=self.scheduler_port, - scheduler_ssl_dir=self.scheduler_ssl_dir) + service_host=self.scheduler_host, + service_port=self.scheduler_port, + service_ssl_dir=self.scheduler_ssl_dir) def tearDown(self) -> None: pass diff --git a/python/lib/scheduler/dmod/scheduler/scheduler.py b/python/lib/scheduler/dmod/scheduler/scheduler.py index 1382009ce..092164097 100644 --- a/python/lib/scheduler/dmod/scheduler/scheduler.py +++ b/python/lib/scheduler/dmod/scheduler/scheduler.py @@ -2,7 +2,7 @@ import logging from requests.exceptions import ReadTimeout -from dmod.communication import MessageEventType, NGENRequest, NWMRequest +from dmod.communication import AbstractNgenRequest, MessageEventType, NGENRequest, NWMRequest, NgenCalibrationRequest from dmod.core.exception import DmodRuntimeError from dmod.core.meta_data import DataCategory, DataFormat from os import getenv @@ -380,64 +380,108 @@ def _generate_docker_cmd_args(self, job: 'Job', worker_index: int) -> List[str]: https://docs.docker.com/engine/reference/builder/#understand-how-cmd-and-entrypoint-interact """ # TODO (later): handle non-model-exec jobs in the future - if job.model_request.event_type != MessageEventType.MODEL_EXEC_REQUEST: + valid_event_types = {MessageEventType.MODEL_EXEC_REQUEST, 
MessageEventType.CALIBRATION_REQUEST} + if job.model_request.event_type not in valid_event_types: raise RuntimeError("Unsupported requested job event type {}; cannot generate Docker CMD arg values".format( job.model_request.get_message_event_type())) # TODO (later): have something more intelligent than class type to determine right entrypoint format and # values, but for now assume/require a "standard" image - if not (isinstance(job.model_request, NWMRequest) or isinstance(job.model_request, NGENRequest)): + if not (isinstance(job.model_request, NWMRequest) or isinstance(job.model_request, AbstractNgenRequest)): raise RuntimeError("Unexpected request type {}: cannot build Docker CMD arg list".format( job.model_request.__class__.__name__)) # For now at least, all image arg lists start the same way (first 3 args: node count, host string, and job id) # TODO: this probably should be a documented standard for any future entrypoints # TODO (later): probably need to move all types to recognize and use explicit flags rather than order arguments - docker_cmd_args = [str(len(job.allocations)), self.build_host_list(job), job.job_id] + docker_cmd_args = [str(len(job.allocations)), self.build_host_list(job), str(job.job_id)] - if isinstance(job.model_request, NGENRequest): - # $4 is the worker index (where index 0 is assumed to be the lead node) - docker_cmd_args.append(str(worker_index)) - - # $5 is the name of the output dataset (which will imply a directory location) - output_dataset_names = self._ds_names_helper(job, worker_index, DataCategory.OUTPUT, max_count=1) - docker_cmd_args.append(output_dataset_names[0]) - - # $6 is the name of the hydrofabric dataset (which will imply a directory location) - hydrofabric_dataset_names = self._ds_names_helper(job, worker_index, DataCategory.HYDROFABRIC, max_count=1) - docker_cmd_args.append(hydrofabric_dataset_names[0]) - - # $7 is the name of the realization configuration dataset (which will imply a directory location) - 
realization_cfg_dataset_names = self._ds_names_helper(job, worker_index, DataCategory.CONFIG, max_count=1, - data_format=DataFormat.NGEN_REALIZATION_CONFIG) - docker_cmd_args.append(realization_cfg_dataset_names[0]) - - # $8 is the name of the BMI config dataset (which will imply a directory location) - bmi_config_dataset_names = self._ds_names_helper(job, worker_index, DataCategory.CONFIG, max_count=1, - data_format=DataFormat.BMI_CONFIG) - docker_cmd_args.append(bmi_config_dataset_names[0]) - - # $9 is the name of the partition config dataset (which will imply a directory location) - # TODO: this probably will eventually break things if $10 is added for calibration config dataset - # TODO: need to overhaul entrypoint for ngen and ngen-calibration images with flag-based args - if job.cpu_count > 1: - partition_config_dataset_names = self._ds_names_helper(job, worker_index, DataCategory.CONFIG, - max_count=1, - data_format=DataFormat.NGEN_PARTITION_CONFIG) - docker_cmd_args.append(partition_config_dataset_names[0]) - - # $10 is the name of the calibration config dataset (which will imply a directory location) - # TODO: this *might* need to be added depending on how we decide to handle calibration - # configs. meaning if they are datasets or not. 
- # calibration_config_dataset_names = self._ds_names_helper(job, worker_index, DataCategory.CONFIG, max_count=1, - # data_format=DataFormat.NGEN_CAL_CONFIG) - # docker_cmd_args.append(calibration_config_dataset_names[0]) - - # Also do a sanity check here to ensure there is at least one forcing dataset - self._ds_names_helper(job, worker_index, DataCategory.FORCING) + if isinstance(job.model_request, AbstractNgenRequest): + docker_cmd_args.extend(self._generate_nextgen_job_docker_cmd_args(job, worker_index)) return docker_cmd_args + def _generate_nextgen_job_docker_cmd_args(self, job: 'Job', worker_index: int) -> List[str]: + """ + Prepare the specific Docker CMD arg applicable to Nextgen-based jobs, which start with the 4th positional arg. + + Generate the necessary Docker CMD arguments required for starting a specified worker container that is part of a + Nextgen-based job executed within Docker. In general, this function should not be used except when called by + ::method:`_generate_docker_cmd_args`. Further, it only applies to Nextgen-based jobs: i.e., ngen model exec or + ngen calibration jobs. + + Since the general form of the required Docker CMD args (i.e., for any type, not only Nextgen-based jobs) + generated by ::method:`_generate_docker_cmd_args` always begins with the same 3 positional args, this function + effectively starts by producing positional argument number 4 and generates the remaining necessary CMD args. + + Parameters + ---------- + job : Job + The job to have worker Docker services started, with those services needing "CMD" arguments generated. + worker_index : int + The particular worker service index in question, which will have a specific set of data requirements. + + Returns + ------- + List[str] + The sublist (with index ``0`` of the sublist corresponding to index ``3`` of the final list) of Docker CMD + args for the associated job worker container. 
+ + See Also + ------- + _generate_docker_cmd_args + """ + # Start with a sanity check + if not isinstance(job.model_request, AbstractNgenRequest): + msg = "Cannot generate Nextgen-based Docker job CMD args for job {} with request of {} type" + raise RuntimeError(msg.format(str(job.job_id), job.model_request.__class__.__name__)) + + # Remember, this list will start with $4 in the eventual complete Docker CMD list + ngen_cmd_args = [] + + # $4 is the worker index (where index 0 is assumed to be the lead node) + ngen_cmd_args.append(str(worker_index)) + + # $5 is the name of the output dataset (which will imply a directory location) + output_dataset_names = self._ds_names_helper(job, worker_index, DataCategory.OUTPUT, max_count=1) + ngen_cmd_args.append(output_dataset_names[0]) + + # $6 is the name of the hydrofabric dataset (which will imply a directory location) + hydrofabric_dataset_names = self._ds_names_helper(job, worker_index, DataCategory.HYDROFABRIC, max_count=1) + ngen_cmd_args.append(hydrofabric_dataset_names[0]) + + # $7 is the name of the realization configuration dataset (which will imply a directory location) + realization_cfg_dataset_names = self._ds_names_helper(job, worker_index, DataCategory.CONFIG, max_count=1, + data_format=DataFormat.NGEN_REALIZATION_CONFIG) + ngen_cmd_args.append(realization_cfg_dataset_names[0]) + + # $8 is the name of the BMI config dataset (which will imply a directory location) + bmi_config_dataset_names = self._ds_names_helper(job, worker_index, DataCategory.CONFIG, max_count=1, + data_format=DataFormat.BMI_CONFIG) + ngen_cmd_args.append(bmi_config_dataset_names[0]) + + # $9 is the name of the partition config dataset (which will imply a directory location) + # TODO: this probably will eventually break things if $10 is added for calibration config dataset + # TODO: need to overhaul entrypoint for ngen and ngen-calibration images with flag-based args + if job.cpu_count > 1: + partition_config_dataset_names = 
self._ds_names_helper(job, worker_index, DataCategory.CONFIG, max_count=1, + data_format=DataFormat.NGEN_PARTITION_CONFIG) + ngen_cmd_args.append(partition_config_dataset_names[0]) + + # Also do a sanity check here to ensure there is at least one forcing dataset + self._ds_names_helper(job, worker_index, DataCategory.FORCING) + + # TODO: account for differences between regular ngen execution and calibration job + + # $10 is the name of the calibration config dataset (which will imply a directory location) + # TODO: this *might* need to be added depending on how we decide to handle calibration + # configs. meaning if they are datasets or not. + # calibration_config_dataset_names = self._ds_names_helper(job, worker_index, DataCategory.CONFIG, max_count=1, + # data_format=DataFormat.NGEN_CAL_CONFIG) + # ngen_cmd_args.append(calibration_config_dataset_names[0]) + + return ngen_cmd_args + def _get_required_obj_store_datasets_arg_strings(self, job: 'Job', worker_index: int) -> List[str]: """ Get list of colon-joined category+name strings for required object store datasets for this job worker. @@ -501,6 +545,11 @@ def determine_image_for_job(self, job: 'Job') -> str: str String name, including tag, of the appropriate Docker image for this job. 
""" + # For now, these are the only two requests supported + # TODO: move registry name into environment variable or other more appropriate place + if isinstance(job.model_request, NgenCalibrationRequest): + return "127.0.0.1:5000/ngen-cal:latest" + if isinstance(job.model_request, NGENRequest): + return "127.0.0.1:5000/ngen:latest" + else: diff --git a/python/lib/scheduler/dmod/test/test_job.py b/python/lib/scheduler/dmod/test/test_job.py index 9a9f8e5cc..716c35438 100644 --- a/python/lib/scheduler/dmod/test/test_job.py +++ b/python/lib/scheduler/dmod/test/test_job.py @@ -81,7 +81,7 @@ def create_time_range(begin, end, var=None) -> TimeRange: 'bmi_config_data_id': '02468', 'config_data_id': '02468' }, - 'session-secret': 'f21f27ac3d443c0948aab924bddefc64891c455a756ca77a4d86ec2f697cd13c' + 'session_secret': 'f21f27ac3d443c0948aab924bddefc64891c455a756ca77a4d86ec2f697cd13c' }) scheduler_request = SchedulerRequestMessage( model_request=NGENRequest.factory_init_from_deserialized_json(self._model_requests_json[2]), diff --git a/python/services/dataservice/dmod/test/test_service_manager.py b/python/services/dataservice/dmod/test/test_service_manager.py index 0c0f45e68..55090e099 100644 --- a/python/services/dataservice/dmod/test/test_service_manager.py +++ b/python/services/dataservice/dmod/test/test_service_manager.py @@ -163,7 +163,7 @@ def setUp(self) -> None: "variable": "TIME" } }, - "session-secret": "381191cc9b5917b4fb7135e12915dd36513d0483c3c3890bc331a7346cda1cb1" + "session_secret": "381191cc9b5917b4fb7135e12915dd36513d0483c3c3890bc331a7346cda1cb1" }, "user_id": "someone" }, @@ -223,7 +223,7 @@ def setUp(self) -> None: "variable": "TIME" } }, - "session-secret": "381191cc9b5917b4fb7135e12915dd36513d0483c3c3890bc331a7346cda1cb1" + "session_secret": "381191cc9b5917b4fb7135e12915dd36513d0483c3c3890bc331a7346cda1cb1" }, "user_id": "someone" }, @@ -289,7 +289,7 @@ def setUp(self) -> None: "variable": "TIME" } }, - "session-secret": 
"381191cc9b5917b4fb7135e12915dd36513d0483c3c3890bc331a7346cda1cb1" + "session_secret": "381191cc9b5917b4fb7135e12915dd36513d0483c3c3890bc331a7346cda1cb1" }, "user_id": "someone" }, @@ -353,7 +353,7 @@ def setUp(self) -> None: "variable": "TIME" } }, - "session-secret": "381191cc9b5917b4fb7135e12915dd36513d0483c3c3890bc331a7346cda1cb1" + "session_secret": "381191cc9b5917b4fb7135e12915dd36513d0483c3c3890bc331a7346cda1cb1" }, "user_id": "someone" }, @@ -423,7 +423,7 @@ def setUp(self) -> None: "variable": "TIME" } }, - "session-secret": "381191cc9b5917b4fb7135e12915dd36513d0483c3c3890bc331a7346cda1cb1" + "session_secret": "381191cc9b5917b4fb7135e12915dd36513d0483c3c3890bc331a7346cda1cb1" }, "user_id": "someone" }, @@ -581,7 +581,7 @@ def setUp(self) -> None: "variable": "TIME" } }, - "session-secret": "381191cc9b5917b4fb7135e12915dd36513d0483c3c3890bc331a7346cda1cb1" + "session_secret": "381191cc9b5917b4fb7135e12915dd36513d0483c3c3890bc331a7346cda1cb1" }, "user_id": "someone" }, @@ -733,7 +733,7 @@ def setUp(self) -> None: "variable": "TIME" } }, - "session-secret": "381191cc9b5917b4fb7135e12915dd36513d0483c3c3890bc331a7346cda1cb1" + "session_secret": "381191cc9b5917b4fb7135e12915dd36513d0483c3c3890bc331a7346cda1cb1" }, "user_id": "someone" }, diff --git a/python/services/requestservice/dmod/requestservice/_version.py b/python/services/requestservice/dmod/requestservice/_version.py index 83e147c62..3966a5f15 100644 --- a/python/services/requestservice/dmod/requestservice/_version.py +++ b/python/services/requestservice/dmod/requestservice/_version.py @@ -1 +1 @@ -__version__ = '0.6.0' \ No newline at end of file +__version__ = '0.6.1' \ No newline at end of file diff --git a/python/services/requestservice/dmod/requestservice/service.py b/python/services/requestservice/dmod/requestservice/service.py index f10a160f5..7e74d71f2 100755 --- a/python/services/requestservice/dmod/requestservice/service.py +++ b/python/services/requestservice/dmod/requestservice/service.py @@ 
-12,12 +12,13 @@ from dmod.access import DummyAuthUtil, RedisBackendSessionManager from dmod.communication import AbstractInitRequest, InvalidMessageResponse, MessageEventType, NGENRequest, NWMRequest, \ - PartitionRequest, WebSocketSessionsInterface, SessionInitMessage, SchedulerClient, UnsupportedMessageTypeResponse + NgenCalibrationRequest, PartitionRequest, WebSocketSessionsInterface, SessionInitMessage, SchedulerClient, \ + UnsupportedMessageTypeResponse from dmod.communication.dataset_management_message import MaaSDatasetManagementMessage -from dmod.externalrequests import AuthHandler, DatasetRequestHandler, ModelExecRequestHandler, PartitionRequestHandler -from dmod.externalrequests import EvaluationRequestHandler +from dmod.externalrequests import AuthHandler, DatasetRequestHandler, ModelExecRequestHandler, \ + NgenCalibrationRequestHandler, PartitionRequestHandler, EvaluationRequestHandler -from .alternate_service import EvaluationMessage +from .alternate_service import LaunchEvaluationMessage, OpenEvaluationMessage logging.basicConfig( level=logging.DEBUG, @@ -45,11 +46,13 @@ class RequestService(WebSocketSessionsInterface): """ _PARSEABLE_REQUEST_TYPES = [ SessionInitMessage, + NgenCalibrationRequest, NWMRequest, NGENRequest, MaaSDatasetManagementMessage, PartitionRequest, - EvaluationMessage + LaunchEvaluationMessage, + OpenEvaluationMessage ] """ Parseable request types, which are all authenticated ::class:`ExternalRequest` subtypes for this implementation. 
""" @@ -71,19 +74,19 @@ def __init__(self, listen_host='', scheduler_port: Union[str, int] = 3013, partitioner_host: str = 'partitioner-service', data_service_host: str = 'data-service', - evaluation_service_host: str = 'evaluation-service', + evaluation_service_host: str = 'evaluation-service', partitioner_port: Union[str, int] = 3014, data_service_port: Union[str, int] = 3015, - evaluation_service_port: Union[str, int] = 3016, + evaluation_service_port: Union[str, int] = 3016, ssl_dir=None, cert_pem=None, priv_key_pem=None, scheduler_ssl_dir=None, partitioner_ssl_dir=None, data_service_ssl_dir=None, - evaluation_service_ssl_dir=None, - **kwargs - ): + evaluation_service_ssl_dir=None, + **kwargs + ): super().__init__(listen_host=listen_host, port=port, ssl_dir=ssl_dir, cert_pem=cert_pem, priv_key_pem=priv_key_pem) self._session_manager: RedisBackendSessionManager = RedisBackendSessionManager() @@ -120,23 +123,30 @@ def __init__(self, listen_host='', self._model_exec_request_handler = ModelExecRequestHandler(session_manager=self._session_manager, authorizer=self.authorizer, - scheduler_host=scheduler_host, - scheduler_port=int(scheduler_port), - scheduler_ssl_dir=self.scheduler_client_ssl_dir) + service_host=scheduler_host, + service_port=int(scheduler_port), + service_ssl_dir=self.scheduler_client_ssl_dir) + + self._calibration_request_handler = NgenCalibrationRequestHandler(session_manager=self._session_manager, + authorizer=self.authorizer, + service_host=scheduler_host, + service_port=int(scheduler_port), + service_ssl_dir=self.scheduler_client_ssl_dir) self._partition_request_handler = PartitionRequestHandler(session_manager=self._session_manager, authorizer=self.authorizer, - partition_service_host=partitioner_host, - partition_service_port=int(partitioner_port), - partition_service_ssl_dir=self.partitioner_ssl_dir) + service_host=partitioner_host, + service_port=int(partitioner_port), + service_ssl_dir=self.partitioner_ssl_dir) self._data_service_handler = 
DatasetRequestHandler(session_manager=self._session_manager, authorizer=self.authorizer, - data_service_host=data_service_host, - data_service_port=int(data_service_port), - data_service_ssl_dir=self.data_service_ssl_dir) + service_host=data_service_host, + service_port=int(data_service_port), + service_ssl_dir=self.data_service_ssl_dir) self._evaluation_service_handler = EvaluationRequestHandler( + target_service='evaluation-service', service_host=evaluation_service_host, service_port=evaluation_service_port, ssl_directory=evaluation_service_ssl_dir @@ -159,7 +169,7 @@ async def listener(self, websocket: WebSocketServerProtocol, path): req_message = await self.deserialized_message(message_data=data) event_type = MessageEventType.INVALID if req_message is None else req_message.get_message_event_type() - if isinstance(req_message, EvaluationMessage): + if isinstance(req_message, LaunchEvaluationMessage) or isinstance(req_message, OpenEvaluationMessage): response = await self._evaluation_service_handler.handle_request( request=req_message, socket=websocket, @@ -195,6 +205,11 @@ async def listener(self, websocket: WebSocketServerProtocol, path): response = await self._partition_request_handler.handle_request(request=req_message) logging.debug('************************* Handled request response: {}'.format(str(response))) await websocket.send(str(response)) + elif event_type == MessageEventType.CALIBRATION_REQUEST: + logging.debug('Handled calibration request') + response = await self._calibration_request_handler.handle_request(request=req_message) + logging.debug('Processed calibration request; response was: {}'.format(str(response))) + await websocket.send(str(response)) # FIXME: add another message type for closing a session else: msg = 'Received valid ' + event_type.name + ' request, but listener does not currently support' diff --git a/python/services/requestservice/setup.py b/python/services/requestservice/setup.py index 08fd704e7..9c8b2bec5 100644 --- 
a/python/services/requestservice/setup.py +++ b/python/services/requestservice/setup.py @@ -17,7 +17,7 @@ author_email='', url='', license='', - install_requires=['websockets', 'dmod-core>=0.1.0', 'dmod-communication>=0.11.0', 'dmod-access>=0.2.0', - 'dmod-externalrequests>=0.3.0'], + install_requires=['websockets', 'dmod-core>=0.3.0', 'dmod-communication>=0.12.0', 'dmod-access>=0.2.0', + 'dmod-externalrequests>=0.4.0'], packages=find_namespace_packages(exclude=['dmod.test', 'schemas', 'ssl', 'src']) ) diff --git a/python/services/schedulerservice/setup.py b/python/services/schedulerservice/setup.py index d602afb64..697656b15 100644 --- a/python/services/schedulerservice/setup.py +++ b/python/services/schedulerservice/setup.py @@ -17,6 +17,6 @@ author_email='', url='', license='', - install_requires=['dmod-core>=0.2.0', 'dmod-communication>=0.11.0', 'dmod-scheduler>=0.10.0'], + install_requires=['dmod-core>=0.2.0', 'dmod-communication>=0.12.0', 'dmod-scheduler>=0.10.0'], packages=find_namespace_packages(exclude=['dmod.test', 'deprecated', 'conf', 'schemas', 'ssl', 'src']) )