Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
af10188
Enhance deserialization of NGENRequest.
robertbartel Nov 1, 2022
97c6e60
Add calibration request messages.
robertbartel Nov 1, 2022
b35ab46
Add new types to communication package __init__.
robertbartel Nov 1, 2022
0c3bc77
Fix init of existing request handlers.
robertbartel Nov 2, 2022
89a1f1d
Fix tests for request handler init updates.
robertbartel Nov 2, 2022
7d9de51
Bug in request handler service_client lazy init.
robertbartel Nov 2, 2022
447c02b
Add new NgenCalibrationRequestHandler class.
robertbartel Nov 2, 2022
5e6ecd9
Add means for type-specific handler preprocessing.
robertbartel Nov 2, 2022
f9c5b0e
Update requestservice internal deps versions.
robertbartel Nov 2, 2022
fbb3875
Update requestservice for changes to handler init.
robertbartel Nov 2, 2022
249f898
resolve external requests init conflict
aaraney Dec 1, 2022
ece36f5
resolve request message import conflict in request service
aaraney Dec 1, 2022
224aecd
fix merge conflict
aaraney Dec 1, 2022
825de44
recursively search for available model subclasses
aaraney Dec 6, 2022
a8bbf84
add ngen-cal model_name to NgenCalibrationRequest type
aaraney Dec 6, 2022
41fd24b
deserialize NgenCalibrationRequest evaluation_time_range
aaraney Dec 6, 2022
b65bb4c
fix NgenCalibrationResponse subclass type.
aaraney Dec 6, 2022
ed83b8f
add missing await.
aaraney Dec 6, 2022
670ad6a
Fix bugs/spacing in request srv handler for eval.
robertbartel Jan 20, 2023
d7fba0a
Bump request-service package version to 0.6.1.
robertbartel Mar 15, 2023
43ec1b8
Add support for calibration jobs in scheduler.
robertbartel Mar 15, 2023
ace141f
NOTE: MAY REMOVE LATER. Hard coded dmod.client code for submitting ng…
aaraney Dec 6, 2022
d207276
Fix __init__/__int__ bug in AbstractInitRequest.
robertbartel Feb 16, 2023
86d0dba
Fix __init__/__int__ bug in DmodJobRequest.
robertbartel Feb 16, 2023
465cf9c
Move up some request attrs in class hierarchy.
robertbartel Feb 16, 2023
ddb2c0b
Abstracting NextGen job request to new type.
robertbartel Feb 17, 2023
fc75638
Move NGENRequest to new ExternalNextGenRequest.
robertbartel Feb 17, 2023
99a952f
Move calibration req to ExternalNextGenRequest.
robertbartel Feb 17, 2023
92636a3
Fix NGENRequest tests for session_secret change.
robertbartel Feb 17, 2023
31c4fb3
Fix client for remove evaluation_time_range.
robertbartel Feb 17, 2023
9bc1ad0
Ensure comms types are available from package.
robertbartel Mar 16, 2023
5708199
Refactor scheduler handling nextgen job CMD args.
robertbartel Mar 16, 2023
b20b2e8
Fix bug w/ calibration request default CPU count.
robertbartel Mar 16, 2023
8d78fd0
Fix bug in AbstractNextGenRequest deserializer.
robertbartel Mar 16, 2023
83ed51f
Fix/improve hashing in meta_data.py classes.
robertbartel Mar 16, 2023
ae784bf
Fixing test issues after serialization change.
robertbartel Mar 16, 2023
c396bf8
Convenience parser for TimeRange.
robertbartel Mar 17, 2023
2ebfc0b
Refactoring client setup for ngen jobs/requests.
robertbartel Mar 17, 2023
a64236a
Parameterizing client cached session file.
robertbartel Mar 21, 2023
0ebe5bf
Renaming abstract ngen request types.
robertbartel Mar 21, 2023
24af22b
Update calibration request for super name change.
robertbartel Mar 21, 2023
cf17afd
Update ngen request for super name change.
robertbartel Mar 21, 2023
d8c7ba7
Update AbstractNgenRequest name in scheduler.py.
robertbartel Mar 21, 2023
44e83a4
Fix dataservice tests after dependency change.
robertbartel Mar 22, 2023
92a6cc2
Fix scheduler tests after dependency change.
robertbartel Mar 22, 2023
f9883e5
Bump comms package version to 0.12.0.
robertbartel Mar 22, 2023
dfdef35
Bump scheduler service comms deps to latest.
robertbartel Mar 22, 2023
ad97a24
Fix bug w/ partition_cfg_data_requirement.
robertbartel Mar 22, 2023
6c2de91
Implement use_serial_ngen in calibration request.
robertbartel Mar 22, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 113 additions & 25 deletions python/lib/client/dmod/client/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
from . import name as package_name
from .dmod_client import YamlClientConfig, DmodClient
from dmod.communication.client import get_or_create_eventloop
from dmod.core.meta_data import ContinuousRestriction, DataCategory, DataDomain, DataFormat, DiscreteRestriction
from dmod.core.meta_data import ContinuousRestriction, DataCategory, DataDomain, DataFormat, DiscreteRestriction, \
TimeRange
from pathlib import Path
from typing import List, Optional, Tuple
from typing import Any, List, Optional, Tuple

DEFAULT_CLIENT_CONFIG_BASENAME = '.dmod_client_config.yml'

Expand All @@ -19,6 +20,52 @@ class DmodCliArgumentError(ValueError):
pass


def _create_ngen_based_exec_parser(subcommand_container: Any, parser_name: str,
                                   default_alloc_paradigm: AllocationParadigm) -> argparse.ArgumentParser:
    """
    Helper function to create a nested parser under the ``exec`` command for different NextGen-related workflows.

    The new parser is given the optional ``--partition-config-data-id``, ``--allocation-paradigm`` and
    ``--catchment-ids`` arguments, followed by the positional ``time_range``, ``hydrofabric_data_id``,
    ``hydrofabric_uid``, ``config_data_id``, ``bmi_cfg_data_id`` and ``cpu_count`` arguments.

    Parameters
    ----------
    subcommand_container
        The ``workflow`` subcommand "special action object" created by ::method:`ArgumentParser.add_subparsers`, which
        is a child of the ``exec`` parser, and to which the new nested parser is to be added.
    parser_name : str
        The name to give to the new parser to be added.
    default_alloc_paradigm : AllocationParadigm
        The default ::class:`AllocationParadigm` value to use when adding the ``--allocation-paradigm`` argument to the
        parser.

    Returns
    -------
    argparse.ArgumentParser
        The newly created and associated subparser.
    """
    new_parser = subcommand_container.add_parser(parser_name)

    # Optional arguments
    new_parser.add_argument('--partition-config-data-id', dest='partition_cfg_data_id', default=None,
                            help='Provide data_id for desired partition config dataset.')
    # NOTE(review): argparse applies ``type`` before checking ``choices``; if ``get_from_name`` returns an
    # ``AllocationParadigm`` member rather than a string, these lowercase-name choices can never match - confirm.
    new_parser.add_argument('--allocation-paradigm',
                            dest='allocation_paradigm',
                            type=AllocationParadigm.get_from_name,
                            choices=[val.name.lower() for val in AllocationParadigm],
                            default=default_alloc_paradigm,
                            help='Specify job resource allocation paradigm to use.')
    new_parser.add_argument('--catchment-ids', dest='catchments', nargs='+', help='Specify catchment subset.')

    # Human-readable rendering of the expected date/time layout, used only in the help text
    print_date_format = 'YYYY-mm-dd HH:MM:SS'

    # Positional arguments
    new_parser.add_argument('time_range', type=TimeRange.parse_from_string,
                            help='Model time range ({} to {})'.format(print_date_format, print_date_format))
    new_parser.add_argument('hydrofabric_data_id', help='Identifier of dataset of required hydrofabric')
    new_parser.add_argument('hydrofabric_uid', help='Unique identifier of required hydrofabric')
    new_parser.add_argument('config_data_id', help='Identifier of dataset of required realization config')
    new_parser.add_argument('bmi_cfg_data_id', help='Identifier of dataset of required BMI init configs')
    new_parser.add_argument('cpu_count', type=int, help='Provide the desired number of processes for the execution')

    return new_parser


def _handle_config_command_args(parent_subparsers_container):
"""
Handle setup of arg parsing for 'config' command, which allows for various operations related to config.
Expand All @@ -43,6 +90,10 @@ def _handle_exec_command_args(parent_subparsers_container):
parent_subparsers_container
The top-level parent container for subparsers of various commands, including the 'exec' command, to which
some numbers of nested subparser containers and parsers will be added.

See Also
----------
_create_ngen_based_exec_parser
"""
# A parser for the 'exec' command itself, underneath the parent 'command' subparsers container
command_parser = parent_subparsers_container.add_parser('exec')
Expand All @@ -52,33 +103,66 @@ def _handle_exec_command_args(parent_subparsers_container):
workflow_subparsers.required = True

# Nested parser for the 'ngen' action
parser_ngen = workflow_subparsers.add_parser('ngen')

parser_ngen.add_argument('--partition-config-data-id', dest='partition_cfg_data_id', default=None,
help='Provide data_id for desired partition config dataset.')
parser_ngen.add_argument('--allocation-paradigm',
dest='allocation_paradigm',
type=AllocationParadigm.get_from_name,
choices=[val.name.lower() for val in AllocationParadigm],
default=AllocationParadigm.get_default_selection(),
help='Specify job resource allocation paradigm to use.')
parser_ngen.add_argument('--catchment-ids', dest='cat_ids', nargs='+', help='Specify catchment subset.')
date_format = DataDomain.get_datetime_str_format()
printable_date_format = 'YYYY-mm-dd HH:MM:SS'
parser_ngen = _create_ngen_based_exec_parser(subcommand_container=workflow_subparsers, parser_name='ngen',
default_alloc_paradigm=AllocationParadigm.get_default_selection())

# TODO: default alloc paradigm needs to be GROUPED_SINGLE_NODE once that has been approved and added
# Nested parser for the 'ngen_cal' action, which is very similar to the 'ngen' parser
parser_ngen_cal = _create_ngen_based_exec_parser(subcommand_container=workflow_subparsers, parser_name='ngen_cal',
default_alloc_paradigm=AllocationParadigm.get_default_selection())

def date_parser(date_time_str: str) -> datetime.datetime:
# Calibration parser needs a few more calibration-specific items
def positive_int(arg_val: str):
try:
return datetime.datetime.strptime(date_time_str, date_format)
arg_as_int = int(arg_val)
except ValueError:
raise argparse.ArgumentTypeError("Not a valid date: {}".format(date_time_str))
raise argparse.ArgumentTypeError("Non-integer value '%s' provided when positive integer expected" % arg_val)
if arg_as_int <= 0:
raise argparse.ArgumentTypeError("Invalid value '%s': expected integer greater than 0" % arg_val)
return arg_as_int

parser_ngen.add_argument('start', type=date_parser, help='Model start date and time ({})'.format(printable_date_format))
parser_ngen.add_argument('end', type=date_parser, help='Model end date and time ({})'.format(printable_date_format))
parser_ngen.add_argument('hydrofabric_data_id', help='Identifier of dataset of required hydrofabric')
parser_ngen.add_argument('hydrofabric_uid', help='Unique identifier of required hydrofabric')
parser_ngen.add_argument('realization_cfg_data_id', help='Identifier of dataset of required realization config')
parser_ngen.add_argument('bmi_cfg_data_id', help='Identifier of dataset of required BMI init configs')
parser_ngen.add_argument('cpu_count', type=int, help='Provide the desired number of processes for the execution')
def model_calibration_param(arg_val: str):
    """
    Parse one ``--calibrated-param`` value of the form ``<name>,<float>,<float>,<float>``.

    The three numeric fields may be supplied in any order; they are sorted so that the smallest is taken as the
    minimum, the largest as the maximum, and the remaining one as the initial value.

    Parameters
    ----------
    arg_val : str
        The raw, comma delimited command line value.

    Returns
    -------
    tuple
        ``(param_name, (min_value, max_value, init_value))``, with the three values as floats.

    Raises
    ------
    argparse.ArgumentTypeError
        If there are not exactly four comma delimited fields, or any of the last three is not a valid float.
    """
    error_msg = "Invalid arg '%s'; format must be <str>,<float>,<float>,<float>" % arg_val
    split_arg = arg_val.split(',')
    if len(split_arg) != 4:
        raise argparse.ArgumentTypeError(error_msg)
    try:
        # Support float args in any order by sorting, since min, max, and other/init will always be self-evident
        low, init, high = sorted(float(raw_val) for raw_val in split_arg[1:])
    except ValueError as err:
        # Only a failed float conversion is expected here; anything else (e.g. KeyboardInterrupt) must propagate
        raise argparse.ArgumentTypeError(error_msg) from err
    # Return is (param, (min, max, init))
    return split_arg[0], (low, high, init)

parser_ngen_cal.add_argument('--calibrated-param', dest='model_cal_params', type=model_calibration_param,
nargs='+', metavar='PARAM_NAME,MIN_VAL,MAX_VAL,INIT_VAL',
help='Description of parameters to calibrate, as comma delimited string')

parser_ngen_cal.add_argument('--job-name', default=None, dest='job_name', help='Optional job name.')
# TODO (later): add more choices once available
parser_ngen_cal.add_argument('--strategy', default='estimation', dest='cal_strategy_type',
choices=['estimation'], help='The ngen_cal calibration strategy.')
# TODO (later): need to add other supported algorithms (there should be a few more now)
parser_ngen_cal.add_argument('--algorithm', type=str, default='dds', dest='cal_strategy_algorithm',
choices=['dds'], help='The ngen_cal parameter search algorithm.')
parser_ngen_cal.add_argument('--objective-function', default='nnse', dest='cal_strategy_objective_func',
choices=["kling_gupta", "nnse", "custom", "single_peak", "volume"],
help='The ngen_cal objective function.')
def parse_bool_arg(arg_val: str) -> bool:
    """
    Convert a command line string such as ``true`` or ``no`` into the boolean it spells out.

    Raises
    ------
    argparse.ArgumentTypeError
        If the (case-insensitive, whitespace-stripped) value is not a recognized true/false spelling.
    """
    normalized = arg_val.strip().lower()
    if normalized in ('true', 't', 'yes', 'y', '1'):
        return True
    if normalized in ('false', 'f', 'no', 'n', '0'):
        return False
    raise argparse.ArgumentTypeError("Invalid value '%s': expected a boolean such as 'true' or 'false'" % arg_val)

# ``type=bool`` would turn every non-empty string (including "False") into True, so use an explicit converter
parser_ngen_cal.add_argument('--is-objective-func-minimized', type=parse_bool_arg, default=True,
                             dest='is_objective_func_minimized',
                             help='Whether the target of objective function is minimized or maximized.')
parser_ngen_cal.add_argument('--iterations', type=positive_int, default=100, dest='iterations',
help='The number of ngen_cal iterations.')
# TODO (later): in the future, figure out how to best handle this kind of scenario
#parser_ngen_cal.add_argument('--is-restart', action='store_true', dest='is_restart',
# help='Whether this is restarting a previous job.')
#ngen calibration strategies include
#uniform: Each catchment shares the same parameter space, evaluates at one observable nexus
#independent: Each catchment upstream of observable nexus gets its own permuted parameter space, evaluates at one observable nexus
#explicit: only calibrates basins in the realization_config with a "calibration" definition and an observable nexus
# TODO: add this kind of information to the help message
parser_ngen_cal.add_argument('--model-strategy', default='uniform', dest='model_strategy',
choices=["uniform", "independent", "explicit"],
help='The model calibration strategy used by ngen_cal.')


def _handle_dataset_command_args(parent_subparsers_container):
Expand Down Expand Up @@ -449,9 +533,13 @@ def execute_jobs_command(args, client: DmodClient):

def execute_workflow_command(args, client: DmodClient):
    """
    Submit the job request for the workflow selected on the command line and print the result.

    Parameters
    ----------
    args
        The parsed command line arguments; ``args.workflow`` selects the request type, and all parsed values are
        passed on to the client's submit method as keyword arguments.
    client : DmodClient
        The client used to submit the request.
    """
    async_loop = get_or_create_eventloop()
    # TODO: aaraney
    workflow = args.workflow
    if workflow == 'ngen':
        submission = client.submit_ngen_request(**vars(args))
    elif workflow == "ngen_cal":
        submission = client.submit_ngen_cal_request(**vars(args))
    else:
        print("ERROR: Unsupported execution workflow {}".format(args.workflow))
        exit(1)
    # Only reached for a supported workflow, since the branch above exits the process
    result = async_loop.run_until_complete(submission)
    print(result)
Expand Down
17 changes: 16 additions & 1 deletion python/lib/client/dmod/client/dmod_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from dmod.core.execution import AllocationParadigm
from dmod.core.meta_data import DataCategory, DataDomain, DataFormat, DiscreteRestriction
from .request_clients import DatasetClient, DatasetExternalClient, DatasetInternalClient, NgenRequestClient
from .request_clients import DatasetClient, DatasetExternalClient, DatasetInternalClient, NgenRequestClient, NgenCalRequestClient
from .client_config import YamlClientConfig
from datetime import datetime
from pathlib import Path
Expand All @@ -13,6 +13,7 @@ def __init__(self, client_config: YamlClientConfig, bypass_request_service: bool
self._client_config = client_config
self._dataset_client = None
self._ngen_client = None
self._ngen_cal_client = None
self._bypass_request_service = bypass_request_service

@property
Expand Down Expand Up @@ -103,6 +104,12 @@ def ngen_request_client(self) -> NgenRequestClient:
self._ngen_client = NgenRequestClient(self.requests_endpoint_uri, self.requests_ssl_dir)
return self._ngen_client

@property
def ngen_cal_request_client(self) -> NgenCalRequestClient:
    """
    Client for submitting ngen calibration requests, created on first access and reused afterwards.

    Returns
    -------
    NgenCalRequestClient
        The lazily initialized calibration request client for this instance.
    """
    if self._ngen_cal_client is None:
        # Pass these by keyword: the first positional parameter of ``NgenCalRequestClient.__init__`` is
        # ``cached_session_file``, so positional values would be bound to it instead of reaching the superclass init.
        self._ngen_cal_client = NgenCalRequestClient(endpoint_uri=self.requests_endpoint_uri,
                                                     ssl_directory=self.requests_ssl_dir)
    return self._ngen_cal_client

async def delete_dataset(self, dataset_name: str, **kwargs):
return await self.dataset_client.delete_dataset(dataset_name, **kwargs)

Expand Down Expand Up @@ -237,6 +244,14 @@ async def submit_ngen_request(self, start: datetime, end: datetime, hydrofabric_
cpu_count, realization_cfg_data_id, bmi_cfg_data_id,
partition_cfg_data_id, cat_ids, allocation_paradigm)

async def submit_ngen_cal_request(self, start: datetime, end: datetime, hydrofabric_data_id: str, hydrofabric_uid: str,
                                  cpu_count: int, realization_cfg_data_id: str, bmi_cfg_data_id: str, ngen_cal_cfg_data_id: str,
                                  partition_cfg_data_id: Optional[str] = None, cat_ids: Optional[List[str]] = None,
                                  allocation_paradigm: Optional[AllocationParadigm] = None, *args, **kwargs):
    """
    Submit an ngen calibration job request through the calibration request client.

    The named parameters are forwarded positionally, in signature order, to the client's ``request_exec``. Any
    additional ``*args`` and ``**kwargs`` are accepted but not forwarded.

    NOTE(review): the ``exec ngen_cal`` CLI parser stores its values under names such as ``time_range``,
    ``config_data_id`` and ``catchments``, not ``start``/``end``/``realization_cfg_data_id``/``cat_ids`` - confirm
    this signature matches what callers actually supply.

    Returns
    -------
    The awaited result of the client's ``request_exec`` call.
    """
    exec_args = (start, end, hydrofabric_data_id, hydrofabric_uid, cpu_count, realization_cfg_data_id,
                 bmi_cfg_data_id, ngen_cal_cfg_data_id, partition_cfg_data_id, cat_ids, allocation_paradigm)
    cal_client = self.ngen_cal_request_client
    return await cal_client.request_exec(*exec_args)

def print_config(self):
print(self.client_config.print_config())

Expand Down
44 changes: 27 additions & 17 deletions python/lib/client/dmod/client/request_clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
from datetime import datetime
from dmod.core.execution import AllocationParadigm
from dmod.communication import DataServiceClient, ExternalRequestClient, ManagementAction, ModelExecRequestClient, \
NGENRequest, NGENRequestResponse
NGENRequest, NGENRequestResponse, \
NgenCalibrationRequest, NgenCalibrationResponse
from dmod.communication.client import R
from dmod.communication.dataset_management_message import DatasetManagementMessage, DatasetManagementResponse, \
MaaSDatasetManagementMessage, MaaSDatasetManagementResponse, QueryType, DatasetQuery
Expand All @@ -21,25 +22,34 @@
class NgenRequestClient(ModelExecRequestClient[NGENRequest, NGENRequestResponse]):

# In particular needs - endpoint_uri: str, ssl_directory: Path
def __init__(self, *args, **kwargs):
def __init__(self, cached_session_file: Optional[Path] = None, *args, **kwargs):
super().__init__(*args, **kwargs)
self._cached_session_file = Path.home().joinpath('.dmod_client_session')
if cached_session_file is None:
self._cached_session_file = Path.home().joinpath('.dmod_client_session')
else:
self._cached_session_file = cached_session_file

# TODO: need some integration tests for this and CLI main and arg parsing
async def request_exec(self, *args, **kwargs) -> NGENRequestResponse:
await self._async_acquire_session_info()
request = NGENRequest(session_secret=self.session_secret, *args, **kwargs)
return await self.async_make_request(request)


class NgenCalRequestClient(ModelExecRequestClient[NgenCalibrationRequest, NgenCalibrationResponse]):

# In particular needs - endpoint_uri: str, ssl_directory: Path
def __init__(self, cached_session_file: Optional[Path] = None, *args, **kwargs):
super().__init__(*args, **kwargs)
if cached_session_file is None:
self._cached_session_file = Path.home().joinpath('.dmod_client_session')
else:
self._cached_session_file = cached_session_file

async def request_exec(self, start: datetime, end: datetime, hydrofabric_data_id: str, hydrofabric_uid: str,
cpu_count: int, realization_cfg_data_id: str, bmi_cfg_data_id: str,
partition_cfg_data_id: Optional[str] = None, cat_ids: Optional[List[str]] = None,
allocation_paradigm: Optional[AllocationParadigm] = None) -> NGENRequestResponse:
# TODO: need some integration tests for this and CLI main and arg parsing
async def request_exec(self, *args, **kwargs) -> NgenCalibrationResponse:
await self._async_acquire_session_info()
request = NGENRequest(session_secret=self.session_secret,
cpu_count=cpu_count,
allocation_paradigm=allocation_paradigm,
time_range=TimeRange(begin=start, end=end),
hydrofabric_uid=hydrofabric_uid,
hydrofabric_data_id=hydrofabric_data_id,
config_data_id=realization_cfg_data_id,
bmi_cfg_data_id=bmi_cfg_data_id,
partition_cfg_data_id=partition_cfg_data_id,
catchments=cat_ids)
request = NgenCalibrationRequest(session_secret=self.session_secret, *args, **kwargs)
return await self.async_make_request(request)


Expand Down
5 changes: 3 additions & 2 deletions python/lib/communication/dmod/communication/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
from .client import DataServiceClient, InternalServiceClient, ModelExecRequestClient, ExternalRequestClient, \
PartitionerServiceClient, SchedulerClient
from .maas_request import get_available_models, get_available_outputs, get_distribution_types, get_parameters, \
get_request, Distribution, ExternalRequest, ExternalRequestResponse, ModelExecRequest, ModelExecRequestResponse, \
NWMRequest, NWMRequestResponse, Scalar, NGENRequest, NGENRequestResponse
get_request, AbstractNgenRequest, Distribution, DmodJobRequest, ExternalRequest, ExternalRequestResponse,\
ModelExecRequest, ModelExecRequestResponse, NWMRequest, NWMRequestResponse, Scalar, NGENRequest, \
NGENRequestResponse, NgenCalibrationRequest, NgenCalibrationResponse
from .message import AbstractInitRequest, MessageEventType, Message, Response, InvalidMessage, InvalidMessageResponse, \
InitRequestResponseReason
from .metadata_message import MetadataPurpose, MetadataMessage, MetadataResponse
Expand Down
2 changes: 1 addition & 1 deletion python/lib/communication/dmod/communication/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.11.0'
__version__ = '0.12.0'
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
get_request,
)
from .distribution import Distribution
from .dmod_job_request import DmodJobRequest
from .parameter import Scalar
from .external_request import ExternalRequest
from .external_request_response import ExternalRequestResponse
from .model_exec_request import ModelExecRequest, get_available_models
from .model_exec_request_response import ModelExecRequestResponse
from .nwm import NWMRequest, NWMRequestResponse
from .ngen import NGENRequest, NGENRequestResponse
from .ngen import AbstractNgenRequest, NGENRequest, NGENRequestResponse, NgenCalibrationRequest, \
NgenCalibrationResponse
Loading