diff --git a/docker/main/dataservice/Dockerfile b/docker/main/dataservice/Dockerfile index 804e4e2c1..113bd53b2 100644 --- a/docker/main/dataservice/Dockerfile +++ b/docker/main/dataservice/Dockerfile @@ -1,6 +1,6 @@ ARG docker_internal_registry ################################################################################################################ -FROM ${docker_internal_registry}/dmod-py-sources as sources +FROM ${docker_internal_registry}/dmod-py-sources:latest as sources ################################################################################################################ FROM python:3.8-alpine3.15 diff --git a/docker/main/ngen/entrypoint.sh b/docker/main/ngen/entrypoint.sh index 239b7fd88..b9f568935 100755 --- a/docker/main/ngen/entrypoint.sh +++ b/docker/main/ngen/entrypoint.sh @@ -19,7 +19,10 @@ OUTPUT_DATASET_NAME="${5:?}" HYDROFABRIC_DATASET_NAME="${6:?}" REALIZATION_CONFIG_DATASET_NAME="${7:?}" BMI_CONFIG_DATASET_NAME="${8:?}" -PARTITION_DATASET_NAME="${9:?}" +# Don't require a partitioning config when only using a single node +if [ ${MPI_NODE_COUNT:?} -gt 1 ]; then + PARTITION_DATASET_NAME="${9:?No argument for partition config dataset when expecting one for MPI-based job}" +fi ACCESS_KEY_SECRET="object_store_exec_user_name" SECRET_KEY_SECRET="object_store_exec_user_passwd" @@ -36,6 +39,9 @@ fi MPI_RUN="mpirun" #NGEN_EXECUTABLE="ngen" +NGEN_SERIAL_EXECUTABLE="/ngen/ngen/cmake_build_serial/ngen" +NGEN_PARALLEL_EXECUTABLE="/ngen/ngen/cmake_build_parallel/ngen" +# This will be symlinked to the parallel one currently NGEN_EXECUTABLE="/ngen/ngen/cmake_build/ngen" ALL_DATASET_DIR="/dmod/datasets" @@ -43,7 +49,10 @@ OUTPUT_DATASET_DIR="${ALL_DATASET_DIR}/output/${OUTPUT_DATASET_NAME}" HYDROFABRIC_DATASET_DIR="${ALL_DATASET_DIR}/hydrofabric/${HYDROFABRIC_DATASET_NAME}" REALIZATION_CONFIG_DATASET_DIR="${ALL_DATASET_DIR}/config/${REALIZATION_CONFIG_DATASET_NAME}" BMI_CONFIG_DATASET_DIR="${ALL_DATASET_DIR}/config/${BMI_CONFIG_DATASET_NAME}" 
-PARTITION_DATASET_DIR="${ALL_DATASET_DIR}/config/${PARTITION_DATASET_NAME}" +# Don't require a partitioning dataset when only using a single node +if [ ${MPI_NODE_COUNT:?} -gt 1 ]; then + PARTITION_DATASET_DIR="${ALL_DATASET_DIR}/config/${PARTITION_DATASET_NAME:?No partition config dataset name for directory}" +fi RUN_SENTINEL="/home/${MPI_USER}/.run_sentinel" @@ -127,10 +136,32 @@ exec_main_worker_ngen_run() return ${NGEN_RETURN} } +exec_serial_ngen_run() +{ + echo "$(print_date) Skipping host checks since job uses ${MPI_NODE_COUNT} worker hosts and framework will run serially" + + # Execute the model on the linked data + echo "$(print_date) Executing serial build of ngen" + ${NGEN_SERIAL_EXECUTABLE:?} ${HYDROFABRIC_DATASET_DIR}/catchment_data.geojson "" \ + ${HYDROFABRIC_DATASET_DIR}/nexus_data.geojson "" \ + ${REALIZATION_CONFIG_DATASET_DIR}/realization_config.json + + #Capture the return value to use as service exit code + NGEN_RETURN=$? + + echo "$(print_date) serial ngen command finished with return value: ${NGEN_RETURN}" + + # Exit with the model's exit code + return ${NGEN_RETURN} +} + # Sanity check that the output, hydrofabric, and config datasets are available (i.e., their directories are in place) check_for_dataset_dir "${REALIZATION_CONFIG_DATASET_DIR}" check_for_dataset_dir "${BMI_CONFIG_DATASET_DIR}" -check_for_dataset_dir "${PARTITION_DATASET_DIR}" +# Don't require a partitioning dataset when only using a single node +if [ ${MPI_NODE_COUNT:?} -gt 1 ]; then + check_for_dataset_dir "${PARTITION_DATASET_DIR:?No partition dataset directory defined}" +fi check_for_dataset_dir "${HYDROFABRIC_DATASET_DIR}" check_for_dataset_dir "${OUTPUT_DATASET_DIR}" @@ -139,7 +170,11 @@ cd ${OUTPUT_DATASET_DIR} if [ "${WORKER_INDEX}" = "0" ]; then if [ "$(whoami)" = "${MPI_USER}" ]; then - exec_main_worker_ngen_run + if [ ${MPI_NODE_COUNT:-1} -gt 1 ]; then + exec_main_worker_ngen_run + else + exec_serial_ngen_run + fi else echo "$(print_date) Starting SSH daemon on main 
worker" /usr/sbin/sshd -D & diff --git a/docker/py-sources/py-sources.Dockerfile b/docker/py-sources/py-sources.Dockerfile index 9e015ad6e..3d190d092 100644 --- a/docker/py-sources/py-sources.Dockerfile +++ b/docker/py-sources/py-sources.Dockerfile @@ -1,5 +1,5 @@ ARG docker_internal_registry -FROM ${docker_internal_registry}/dmod-py-deps as basis +FROM ${docker_internal_registry}/dmod-py-deps:latest as basis # Copy these needed for sourced functions used by build scripts in later stages RUN mkdir -p /dmod/scripts/shared COPY ./scripts/dist_package.sh /dmod/scripts diff --git a/python/lib/communication/dmod/communication/_version.py b/python/lib/communication/dmod/communication/_version.py index e754a834e..f323a57be 100644 --- a/python/lib/communication/dmod/communication/_version.py +++ b/python/lib/communication/dmod/communication/_version.py @@ -1 +1 @@ -__version__ = '0.10.1' +__version__ = '0.11.0' diff --git a/python/lib/communication/dmod/communication/maas_request/ngen/ngen_request.py b/python/lib/communication/dmod/communication/maas_request/ngen/ngen_request.py index 074045370..b1745952f 100644 --- a/python/lib/communication/dmod/communication/maas_request/ngen/ngen_request.py +++ b/python/lib/communication/dmod/communication/maas_request/ngen/ngen_request.py @@ -219,13 +219,15 @@ def data_requirements(self) -> List[DataRequirement]: List[DataRequirement] List of all the explicit and implied data requirements for this request. 
""" - return [ + requirements = [ self.bmi_cfg_data_requirement, self.forcing_data_requirement, self.hydrofabric_data_requirement, - self.partition_cfg_data_requirement, self.realization_cfg_data_requirement, ] + if self.use_parallel_ngen: + requirements.append(self.partition_cfg_data_requirement) + return requirements @property def bmi_config_data_id(self) -> str: @@ -291,6 +293,7 @@ def forcing_data_requirement(self) -> DataRequirement: if self._forcing_data_requirement is None: # TODO: going to need to address the CSV usage later forcing_domain = DataDomain( + # TODO: come back to this to change to other type data_format=DataFormat.AORC_CSV, continuous_restrictions=[self._time_range], discrete_restrictions=[self._gen_catchments_domain_restriction()], @@ -357,6 +360,54 @@ def hydrofabric_uid(self) -> str: """ return self._hydrofabric_uid + @property + def use_parallel_ngen(self) -> bool: + """ + Whether this request specifies to use the variant of the NextGen framework compiled for parallel execution. + + NextGen may be compiled to execute either serially or using parallelization. DMOD and its NextGen job workers + can now support either. This property indicates whether this request indicates that parallel execution should + be used. + + In the current implementation, this property is ``True`` IFF ::method:`use_serial_ngen` is ``False``. Note that + this will result in CPU counts of ``0`` or negative numbers, if they were to occur, also resulting in this + returning ``True``. + + Returns + ------- + bool + Whether this request specifies parallel NextGen execution for the job. + + See Also + ------- + use_serial_ngen + """ + return not self.use_serial_ngen + + @property + def use_serial_ngen(self) -> bool: + """ + Whether this request specifies to use the variant of the NextGen framework compiled for serial execution. + + NextGen may be compiled to execute either serially or using parallelization. DMOD and its NextGen job workers + can now support either. 
This property indicates whether this request indicates that serial execution should + be used. + + In the current implementation, this property is ``True`` IFF the request required a CPU count of exactly ``1``. + + Returns + ------- + bool + Whether this request specifies serial NextGen execution for the job. + + See Also + ------- + use_parallel_ngen + """ + return self.cpu_count == 1 + + + @property def output_formats(self) -> List[DataFormat]: """ @@ -387,16 +438,16 @@ def partition_cfg_data_id(self) -> Optional[str]: return self._part_config_data_id @property - def partition_cfg_data_requirement(self) -> DataRequirement: + def partition_cfg_data_requirement(self) -> Optional[DataRequirement]: """ A requirement object defining of the partitioning configuration data needed to execute this request. Returns ------- - DataRequirement - A requirement object defining of the partitioning configuration data needed to execute this request. + Optional[DataRequirement] + Requirement object defining the partitioning configuration data needed to execute this request. 
""" - if self._partition_cfg_data_requirement is None: + if self._partition_cfg_data_requirement is None and self.use_parallel_ngen: d_restricts = [] # Add restriction on hydrofabric diff --git a/python/lib/communication/dmod/test/test_ngen_request.py b/python/lib/communication/dmod/test/test_ngen_request.py index b26aee074..53528e086 100644 --- a/python/lib/communication/dmod/test/test_ngen_request.py +++ b/python/lib/communication/dmod/test/test_ngen_request.py @@ -2,7 +2,7 @@ import unittest from ..communication.maas_request import NGENRequest, NGENRequestResponse from ..test.test_ngen_request_response import TestNGENRequestResponse -from dmod.core.meta_data import TimeRange +from dmod.core.meta_data import DataFormat, TimeRange class TestNGENRequest(unittest.TestCase): @@ -95,6 +95,39 @@ def create_time_range(begin, end, var=None) -> TimeRange: bmi_cfg_data_id='02468', catchments=cat_ids_list)) + # Example 2 - like example 0, but with a CPU count of 1 (which should not require partitioning) + time_range = create_time_range('2022-01-01 00:00:00', '2022-03-01 00:00:00') + cpu_count_ex_2 = 1 + self.time_ranges.append(time_range) + self.request_strings.append( + '{"model": {"allocation_paradigm": "SINGLE_NODE", "bmi_config_data_id": "02468", "config_data_id": "02468", ' + '"cpu_count": ' + str(cpu_count_ex_2) + ', "hydrofabric_data_id": "9876543210", ' + '"hydrofabric_uid": "0123456789", "name": "ngen", "time_range": ' + time_range.to_json() + '}, ' + '"session-secret": "f21f27ac3d443c0948aab924bddefc64891c455a756ca77a4d86ec2f697cd13c"}' + ) + self.request_jsons.append({ + 'model': { + 'name': 'ngen', + 'allocation_paradigm': 'SINGLE_NODE', + 'cpu_count': cpu_count_ex_2, + 'time_range': time_range.to_dict(), + 'hydrofabric_data_id': '9876543210', + 'hydrofabric_uid': '0123456789', + 'bmi_config_data_id': '02468', + 'config_data_id': '02468' + }, + 'session-secret': 'f21f27ac3d443c0948aab924bddefc64891c455a756ca77a4d86ec2f697cd13c' + }) + self.request_objs.append( + 
NGENRequest(session_secret='f21f27ac3d443c0948aab924bddefc64891c455a756ca77a4d86ec2f697cd13c', + cpu_count=cpu_count_ex_2, + allocation_paradigm='SINGLE_NODE', + time_range=time_range, + hydrofabric_uid="0123456789", + hydrofabric_data_id='9876543210', + bmi_cfg_data_id='02468', + config_data_id='02468')) + def test_factory_init_from_deserialized_json_0_a(self): """ Assert that :meth:`NGENRequest.factory_init_from_deserialized_json` produces an equal object to the @@ -142,6 +175,28 @@ def test_factory_init_correct_response_subtype_1_a(self): obj = NGENRequest.factory_init_correct_response_subtype(json_obj) self.assertEqual(obj.__class__, NGENRequestResponse) + def test_data_requirements_0_a(self): + example_index = 0 + obj = self.request_objs[example_index] + self.assertIsNotNone(obj.partition_cfg_data_requirement) + + def test_data_requirements_0_b(self): + example_index = 0 + obj = self.request_objs[example_index] + partition_reqs = [r for r in obj.data_requirements if r.domain.data_format == DataFormat.NGEN_PARTITION_CONFIG] + self.assertEqual(len(partition_reqs), 1) + + def test_data_requirements_2_a(self): + example_index = 2 + obj = self.request_objs[example_index] + self.assertIsNone(obj.partition_cfg_data_requirement) + + def test_data_requirements_2_b(self): + example_index = 2 + obj = self.request_objs[example_index] + partition_reqs = [r for r in obj.data_requirements if r.domain.data_format == DataFormat.NGEN_PARTITION_CONFIG] + self.assertEqual(len(partition_reqs), 0) + def test_to_dict_0_a(self): """ Assert that the example object at the 0th index serializes to a dict as expected by comparing to the pre-set diff --git a/python/lib/scheduler/dmod/scheduler/_version.py b/python/lib/scheduler/dmod/scheduler/_version.py index 1f0478037..9d1bb721b 100644 --- a/python/lib/scheduler/dmod/scheduler/_version.py +++ b/python/lib/scheduler/dmod/scheduler/_version.py @@ -1 +1 @@ -__version__ = '0.9.2' +__version__ = '0.10.0' diff --git 
a/python/lib/scheduler/dmod/scheduler/scheduler.py b/python/lib/scheduler/dmod/scheduler/scheduler.py index 87f771bb3..1382009ce 100644 --- a/python/lib/scheduler/dmod/scheduler/scheduler.py +++ b/python/lib/scheduler/dmod/scheduler/scheduler.py @@ -418,9 +418,20 @@ def _generate_docker_cmd_args(self, job: 'Job', worker_index: int) -> List[str]: docker_cmd_args.append(bmi_config_dataset_names[0]) # $9 is the name of the partition config dataset (which will imply a directory location) - partition_config_dataset_names = self._ds_names_helper(job, worker_index, DataCategory.CONFIG, max_count=1, - data_format=DataFormat.NGEN_PARTITION_CONFIG) - docker_cmd_args.append(partition_config_dataset_names[0]) + # TODO: this probably will eventually break things if $10 is added for calibration config dataset + # TODO: need to overhaul entrypoint for ngen and ngen-calibration images with flag-based args + if job.cpu_count > 1: + partition_config_dataset_names = self._ds_names_helper(job, worker_index, DataCategory.CONFIG, + max_count=1, + data_format=DataFormat.NGEN_PARTITION_CONFIG) + docker_cmd_args.append(partition_config_dataset_names[0]) + + # $10 is the name of the calibration config dataset (which will imply a directory location) + # TODO: this *might* need to be added depending on how we decide to handle calibration + # configs. meaning if they are datasets or not. 
+ # calibration_config_dataset_names = self._ds_names_helper(job, worker_index, DataCategory.CONFIG, max_count=1, + # data_format=DataFormat.NGEN_CAL_CONFIG) + # docker_cmd_args.append(calibration_config_dataset_names[0]) # Also do a sanity check here to ensure there is at least one forcing dataset self._ds_names_helper(job, worker_index, DataCategory.FORCING) @@ -492,7 +503,6 @@ def determine_image_for_job(self, job: 'Job') -> str: """ if isinstance(job.model_request, NGENRequest): return "127.0.0.1:5000/ngen:latest" - # For now, this is the only thing supported else: msg = "Unable to determine correct scheduler image for job {} with request of {} type" raise DmodRuntimeError(msg.format(job.job_id, job.model_request.__class__.__name__)) diff --git a/python/services/dataservice/dmod/dataservice/_version.py b/python/services/dataservice/dmod/dataservice/_version.py index b703f5c96..9bdd4d277 100644 --- a/python/services/dataservice/dmod/dataservice/_version.py +++ b/python/services/dataservice/dmod/dataservice/_version.py @@ -1 +1 @@ -__version__ = '0.4.1' \ No newline at end of file +__version__ = '0.5.0' \ No newline at end of file diff --git a/python/services/dataservice/dmod/dataservice/service.py b/python/services/dataservice/dmod/dataservice/service.py index 04e1fa0af..88c362496 100644 --- a/python/services/dataservice/dmod/dataservice/service.py +++ b/python/services/dataservice/dmod/dataservice/service.py @@ -953,7 +953,7 @@ async def manage_required_data_checks(self): logging.info("All required data for {} is available.".format(job.job_id)) # Before moving to next successful step, also create output datasets and requirement entries self._create_output_datasets(job) - job.status_step = JobExecStep.AWAITING_PARTITIONING + job.status_step = JobExecStep.AWAITING_PARTITIONING if job.cpu_count > 1 else JobExecStep.AWAITING_ALLOCATION else: logging.error("Some or all required data for {} is unprovideable.".format(job.job_id)) job.status_step = 
JobExecStep.DATA_UNPROVIDEABLE diff --git a/python/services/dataservice/setup.py b/python/services/dataservice/setup.py index b839e55c9..5e83ded8b 100644 --- a/python/services/dataservice/setup.py +++ b/python/services/dataservice/setup.py @@ -17,7 +17,7 @@ author_email='', url='', license='', - install_requires=['dmod-core>=0.3.0', 'dmod-communication>=0.7.1', 'dmod-scheduler>=0.7.0', 'dmod-modeldata>=0.9.0', + install_requires=['dmod-core>=0.3.0', 'dmod-communication>=0.11.0', 'dmod-scheduler>=0.10.0', 'dmod-modeldata>=0.9.0', 'redis'], packages=find_namespace_packages(exclude=['dmod.test', 'deprecated', 'conf', 'schemas', 'ssl', 'src']) ) diff --git a/python/services/partitionerservice/dmod/partitionerservice/_version.py b/python/services/partitionerservice/dmod/partitionerservice/_version.py index fb13a3556..9dd16a345 100644 --- a/python/services/partitionerservice/dmod/partitionerservice/_version.py +++ b/python/services/partitionerservice/dmod/partitionerservice/_version.py @@ -1 +1 @@ -__version__ = '0.2.1' \ No newline at end of file +__version__ = '0.2.2' \ No newline at end of file diff --git a/python/services/partitionerservice/dmod/partitionerservice/service.py b/python/services/partitionerservice/dmod/partitionerservice/service.py index e57dd81cf..1fb244122 100644 --- a/python/services/partitionerservice/dmod/partitionerservice/service.py +++ b/python/services/partitionerservice/dmod/partitionerservice/service.py @@ -454,6 +454,12 @@ async def manage_job_partitioning(self): for job in [j for j in self._job_util.get_all_active_jobs() if j.status_step == JobExecStep.AWAITING_PARTITIONING]: + + if job.cpu_count == 1: + logging.warning("No need to partition job {} with only 1 CPU allocated".format(job.job_id)) + job.status_step = JobExecStep.AWAITING_ALLOCATION + continue + logging.info("Processing partitioning for active job {}".format(job.job_id)) try: # See if there is already an existing dataset to use for this diff --git 
a/python/services/requestservice/dmod/requestservice/_version.py b/python/services/requestservice/dmod/requestservice/_version.py index f93e0653b..83e147c62 100644 --- a/python/services/requestservice/dmod/requestservice/_version.py +++ b/python/services/requestservice/dmod/requestservice/_version.py @@ -1 +1 @@ -__version__ = '0.5.2' \ No newline at end of file +__version__ = '0.6.0' \ No newline at end of file diff --git a/python/services/requestservice/setup.py b/python/services/requestservice/setup.py index fafbc703d..08fd704e7 100644 --- a/python/services/requestservice/setup.py +++ b/python/services/requestservice/setup.py @@ -17,7 +17,7 @@ author_email='', url='', license='', - install_requires=['websockets', 'dmod-core>=0.1.0', 'dmod-communication>=0.7.0', 'dmod-access>=0.2.0', + install_requires=['websockets', 'dmod-core>=0.1.0', 'dmod-communication>=0.11.0', 'dmod-access>=0.2.0', 'dmod-externalrequests>=0.3.0'], packages=find_namespace_packages(exclude=['dmod.test', 'schemas', 'ssl', 'src']) ) diff --git a/python/services/schedulerservice/dmod/schedulerservice/_version.py b/python/services/schedulerservice/dmod/schedulerservice/_version.py index 7320e64e1..ccf9e6286 100644 --- a/python/services/schedulerservice/dmod/schedulerservice/_version.py +++ b/python/services/schedulerservice/dmod/schedulerservice/_version.py @@ -1 +1 @@ -__version__ = '0.7.1' \ No newline at end of file +__version__ = '0.8.0' \ No newline at end of file diff --git a/python/services/schedulerservice/setup.py b/python/services/schedulerservice/setup.py index df2b72527..d602afb64 100644 --- a/python/services/schedulerservice/setup.py +++ b/python/services/schedulerservice/setup.py @@ -17,6 +17,6 @@ author_email='', url='', license='', - install_requires=['dmod-core>=0.2.0', 'dmod-communication>=0.8.0', 'dmod-scheduler>=0.9.0'], + install_requires=['dmod-core>=0.2.0', 'dmod-communication>=0.11.0', 'dmod-scheduler>=0.10.0'], packages=find_namespace_packages(exclude=['dmod.test', 
'deprecated', 'conf', 'schemas', 'ssl', 'src']) )