Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker/main/dataservice/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
ARG docker_internal_registry
################################################################################################################
FROM ${docker_internal_registry}/dmod-py-sources as sources
FROM ${docker_internal_registry}/dmod-py-sources:latest as sources

################################################################################################################
FROM python:3.8-alpine3.15
Expand Down
43 changes: 39 additions & 4 deletions docker/main/ngen/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ OUTPUT_DATASET_NAME="${5:?}"
HYDROFABRIC_DATASET_NAME="${6:?}"
REALIZATION_CONFIG_DATASET_NAME="${7:?}"
BMI_CONFIG_DATASET_NAME="${8:?}"
PARTITION_DATASET_NAME="${9:?}"
# Don't require a partitioning config when only using a single node
if [ ${MPI_NODE_COUNT:?} -gt 1 ]; then
PARTITION_DATASET_NAME="${9:?No argument for partition config dataset when expecting one for MPI-based job}"
fi

ACCESS_KEY_SECRET="object_store_exec_user_name"
SECRET_KEY_SECRET="object_store_exec_user_passwd"
Expand All @@ -36,14 +39,20 @@ fi

MPI_RUN="mpirun"
#NGEN_EXECUTABLE="ngen"
NGEN_SERIAL_EXECUTABLE="/ngen/ngen/cmake_build_serial/ngen"
NGEN_PARALLEL_EXECUTABLE="/ngen/ngen/cmake_build_parallel/ngen"
# This will be symlinked to the parallel one currently
NGEN_EXECUTABLE="/ngen/ngen/cmake_build/ngen"

ALL_DATASET_DIR="/dmod/datasets"
OUTPUT_DATASET_DIR="${ALL_DATASET_DIR}/output/${OUTPUT_DATASET_NAME}"
HYDROFABRIC_DATASET_DIR="${ALL_DATASET_DIR}/hydrofabric/${HYDROFABRIC_DATASET_NAME}"
REALIZATION_CONFIG_DATASET_DIR="${ALL_DATASET_DIR}/config/${REALIZATION_CONFIG_DATASET_NAME}"
BMI_CONFIG_DATASET_DIR="${ALL_DATASET_DIR}/config/${BMI_CONFIG_DATASET_NAME}"
PARTITION_DATASET_DIR="${ALL_DATASET_DIR}/config/${PARTITION_DATASET_NAME}"
# Don't require a partitioning dataset when only using a single node
if [ ${MPI_NODE_COUNT:?} -gt 1 ]; then
PARTITION_DATASET_DIR="${ALL_DATASET_DIR}/config/${PARTITION_DATASET_NAME:?No partition config dataset name for directory}"
fi

RUN_SENTINEL="/home/${MPI_USER}/.run_sentinel"

Expand Down Expand Up @@ -127,10 +136,32 @@ exec_main_worker_ngen_run()
return ${NGEN_RETURN}
}

# Run the serial build of the NextGen framework directly (no mpirun), for
# single-node jobs that do not use a partitioning config.
# Globals:   MPI_NODE_COUNT (read, for logging), NGEN_SERIAL_EXECUTABLE (read),
#            HYDROFABRIC_DATASET_DIR (read), REALIZATION_CONFIG_DATASET_DIR (read),
#            NGEN_RETURN (written)
# Outputs:   progress messages to stdout
# Returns:   the exit status of the serial ngen run
exec_serial_ngen_run()
{
    echo "$(print_date) Skipping host checks since job uses ${MPI_NODE_COUNT} worker hosts and framework will run serially"

    # Execute the model on the linked data; quote all expansions so paths with
    # spaces do not word-split (the empty "" args are positional placeholders
    # expected by the ngen CLI for subset IDs)
    echo "$(print_date) Executing serial build of ngen"
    "${NGEN_SERIAL_EXECUTABLE:?}" "${HYDROFABRIC_DATASET_DIR}/catchment_data.geojson" "" \
        "${HYDROFABRIC_DATASET_DIR}/nexus_data.geojson" "" \
        "${REALIZATION_CONFIG_DATASET_DIR}/realization_config.json"

    # Capture the return value to use as service exit code
    NGEN_RETURN=$?

    echo "$(print_date) serial ngen command finished with return value: ${NGEN_RETURN}"

    # Exit with the model's exit code
    return "${NGEN_RETURN}"
}

# Sanity check that the output, hydrofabric, and config datasets are available (i.e., their directories are in place)
check_for_dataset_dir "${REALIZATION_CONFIG_DATASET_DIR}"
check_for_dataset_dir "${BMI_CONFIG_DATASET_DIR}"
check_for_dataset_dir "${PARTITION_DATASET_DIR}"
# Don't require a partitioning dataset when only using a single node
if [ ${MPI_NODE_COUNT:?} -gt 1 ]; then
check_for_dataset_dir "${PARTITION_DATASET_DIR:?No partition dataset directory defined}"
fi
check_for_dataset_dir "${HYDROFABRIC_DATASET_DIR}"
check_for_dataset_dir "${OUTPUT_DATASET_DIR}"

Expand All @@ -139,7 +170,11 @@ cd ${OUTPUT_DATASET_DIR}

if [ "${WORKER_INDEX}" = "0" ]; then
if [ "$(whoami)" = "${MPI_USER}" ]; then
exec_main_worker_ngen_run
if [ ${MPI_NODE_COUNT:-1} -gt 1 ]; then
exec_main_worker_ngen_run
else
exec_serial_ngen_run
fi
else
echo "$(print_date) Starting SSH daemon on main worker"
/usr/sbin/sshd -D &
Expand Down
2 changes: 1 addition & 1 deletion docker/py-sources/py-sources.Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
ARG docker_internal_registry
FROM ${docker_internal_registry}/dmod-py-deps as basis
FROM ${docker_internal_registry}/dmod-py-deps:latest as basis
# Copy these needed for sourced functions used by build scripts in later stages
RUN mkdir -p /dmod/scripts/shared
COPY ./scripts/dist_package.sh /dmod/scripts
Expand Down
2 changes: 1 addition & 1 deletion python/lib/communication/dmod/communication/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.10.1'
__version__ = '0.11.0'
Original file line number Diff line number Diff line change
Expand Up @@ -219,13 +219,15 @@ def data_requirements(self) -> List[DataRequirement]:
List[DataRequirement]
List of all the explicit and implied data requirements for this request.
"""
return [
requirements = [
self.bmi_cfg_data_requirement,
self.forcing_data_requirement,
self.hydrofabric_data_requirement,
self.partition_cfg_data_requirement,
self.realization_cfg_data_requirement,
]
if self.use_parallel_ngen:
requirements.append(self.partition_cfg_data_requirement)
return requirements

@property
def bmi_config_data_id(self) -> str:
Expand Down Expand Up @@ -291,6 +293,7 @@ def forcing_data_requirement(self) -> DataRequirement:
if self._forcing_data_requirement is None:
# TODO: going to need to address the CSV usage later
forcing_domain = DataDomain(
# TODO: come back to this to change to other type
data_format=DataFormat.AORC_CSV,
continuous_restrictions=[self._time_range],
discrete_restrictions=[self._gen_catchments_domain_restriction()],
Expand Down Expand Up @@ -357,6 +360,54 @@ def hydrofabric_uid(self) -> str:
"""
return self._hydrofabric_uid

@property
def use_parallel_ngen(self) -> bool:
    """
    Whether this request specifies the variant of the NextGen framework compiled for parallel execution.

    NextGen may be compiled to execute either serially or using parallelization.  DMOD and its NextGen job
    workers can support either.  This property indicates whether this request specifies that parallel
    execution should be used.

    In the current implementation, this property is ``True`` IFF ::method:`use_serial_ngen` is ``False``.
    Note that this means a CPU count of ``0`` or a negative number — should such a value ever occur — would
    also cause this property to return ``True``.

    Returns
    -------
    bool
        Whether this request specifies parallel NextGen execution for the job.

    See Also
    -------
    use_serial_ngen
    """
    return not self.use_serial_ngen

@property
def use_serial_ngen(self) -> bool:
    """
    Whether this request specifies the variant of the NextGen framework compiled for serial execution.

    NextGen may be compiled to execute either serially or using parallelization.  DMOD and its NextGen job
    workers can support either.  This property indicates whether this request specifies that serial
    execution should be used.

    In the current implementation, this property is ``True`` IFF the request requires a CPU count of
    exactly ``1``.

    Returns
    -------
    bool
        Whether this request specifies serial NextGen execution for the job.

    See Also
    -------
    use_parallel_ngen
    """
    return self.cpu_count == 1



@property
def output_formats(self) -> List[DataFormat]:
"""
Expand Down Expand Up @@ -387,16 +438,16 @@ def partition_cfg_data_id(self) -> Optional[str]:
return self._part_config_data_id

@property
def partition_cfg_data_requirement(self) -> DataRequirement:
def partition_cfg_data_requirement(self) -> Optional[DataRequirement]:
"""
A requirement object defining of the partitioning configuration data needed to execute this request.

Returns
-------
DataRequirement
A requirement object defining of the partitioning configuration data needed to execute this request.
Optional[DataRequirement]
Requirement object defining of the partitioning configuration data needed to execute this request.
"""
if self._partition_cfg_data_requirement is None:
if self._partition_cfg_data_requirement is None and self.use_parallel_ngen:
d_restricts = []

# Add restriction on hydrofabric
Expand Down
57 changes: 56 additions & 1 deletion python/lib/communication/dmod/test/test_ngen_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import unittest
from ..communication.maas_request import NGENRequest, NGENRequestResponse
from ..test.test_ngen_request_response import TestNGENRequestResponse
from dmod.core.meta_data import TimeRange
from dmod.core.meta_data import DataFormat, TimeRange


class TestNGENRequest(unittest.TestCase):
Expand Down Expand Up @@ -95,6 +95,39 @@ def create_time_range(begin, end, var=None) -> TimeRange:
bmi_cfg_data_id='02468',
catchments=cat_ids_list))

# Example 2 - like example 0, but with a CPU count of 1 (which should not require partitioning)
time_range = create_time_range('2022-01-01 00:00:00', '2022-03-01 00:00:00')
cpu_count_ex_2 = 1
self.time_ranges.append(time_range)
self.request_strings.append(
'{"model": {"allocation_paradigm": "SINGLE_NODE", "bmi_config_data_id": "02468", "config_data_id": "02468", '
'"cpu_count": ' + str(cpu_count_ex_2) + ', "hydrofabric_data_id": "9876543210", '
'"hydrofabric_uid": "0123456789", "name": "ngen", "time_range": ' + time_range.to_json() + '}, '
'"session-secret": "f21f27ac3d443c0948aab924bddefc64891c455a756ca77a4d86ec2f697cd13c"}'
)
self.request_jsons.append({
'model': {
'name': 'ngen',
'allocation_paradigm': 'SINGLE_NODE',
'cpu_count': cpu_count_ex_2,
'time_range': time_range.to_dict(),
'hydrofabric_data_id': '9876543210',
'hydrofabric_uid': '0123456789',
'bmi_config_data_id': '02468',
'config_data_id': '02468'
},
'session-secret': 'f21f27ac3d443c0948aab924bddefc64891c455a756ca77a4d86ec2f697cd13c'
})
self.request_objs.append(
NGENRequest(session_secret='f21f27ac3d443c0948aab924bddefc64891c455a756ca77a4d86ec2f697cd13c',
cpu_count=cpu_count_ex_2,
allocation_paradigm='SINGLE_NODE',
time_range=time_range,
hydrofabric_uid="0123456789",
hydrofabric_data_id='9876543210',
bmi_cfg_data_id='02468',
config_data_id='02468'))

def test_factory_init_from_deserialized_json_0_a(self):
"""
Assert that :meth:`NGENRequest.factory_init_from_deserialized_json` produces an equal object to the
Expand Down Expand Up @@ -142,6 +175,28 @@ def test_factory_init_correct_response_subtype_1_a(self):
obj = NGENRequest.factory_init_correct_response_subtype(json_obj)
self.assertEqual(obj.__class__, NGENRequestResponse)

def test_data_requirements_0_a(self):
    """
    Assert the example request at index 0 (multi-CPU) has a non-``None`` partition config data requirement.
    """
    example_index = 0
    obj = self.request_objs[example_index]
    self.assertIsNotNone(obj.partition_cfg_data_requirement)

def test_data_requirements_0_b(self):
    """
    Assert ``data_requirements`` for the example at index 0 (multi-CPU) contains exactly one requirement
    with the ``NGEN_PARTITION_CONFIG`` data format.
    """
    example_index = 0
    obj = self.request_objs[example_index]
    partition_reqs = [r for r in obj.data_requirements if r.domain.data_format == DataFormat.NGEN_PARTITION_CONFIG]
    self.assertEqual(len(partition_reqs), 1)

def test_data_requirements_2_a(self):
    """
    Assert the example request at index 2 (single-CPU, serial ngen) has a ``None`` partition config data
    requirement, since partitioning is not needed for serial execution.
    """
    example_index = 2
    obj = self.request_objs[example_index]
    self.assertIsNone(obj.partition_cfg_data_requirement)

def test_data_requirements_2_b(self):
    """
    Assert ``data_requirements`` for the example at index 2 (single-CPU, serial ngen) contains no
    requirement with the ``NGEN_PARTITION_CONFIG`` data format.
    """
    example_index = 2
    obj = self.request_objs[example_index]
    partition_reqs = [r for r in obj.data_requirements if r.domain.data_format == DataFormat.NGEN_PARTITION_CONFIG]
    self.assertEqual(len(partition_reqs), 0)

def test_to_dict_0_a(self):
"""
Assert that the example object at the 0th index serializes to a dict as expected by comparing to the pre-set
Expand Down
2 changes: 1 addition & 1 deletion python/lib/scheduler/dmod/scheduler/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.9.2'
__version__ = '0.10.0'
18 changes: 14 additions & 4 deletions python/lib/scheduler/dmod/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,9 +418,20 @@ def _generate_docker_cmd_args(self, job: 'Job', worker_index: int) -> List[str]:
docker_cmd_args.append(bmi_config_dataset_names[0])

# $9 is the name of the partition config dataset (which will imply a directory location)
partition_config_dataset_names = self._ds_names_helper(job, worker_index, DataCategory.CONFIG, max_count=1,
data_format=DataFormat.NGEN_PARTITION_CONFIG)
docker_cmd_args.append(partition_config_dataset_names[0])
# TODO: this probably will eventually break things if $10 is added for calibration config dataset
# TODO: need to overhaul entrypoint for ngen and ngen-calibration images with flag-based args
if job.cpu_count > 1:
partition_config_dataset_names = self._ds_names_helper(job, worker_index, DataCategory.CONFIG,
max_count=1,
data_format=DataFormat.NGEN_PARTITION_CONFIG)
docker_cmd_args.append(partition_config_dataset_names[0])

# $10 is the name of the calibration config dataset (which will imply a directory location)
# TODO: this *might* need to be added depending on how we decide to handle calibration
# configs. meaning if they are datasets or not.
# calibration_config_dataset_names = self._ds_names_helper(job, worker_index, DataCategory.CONFIG, max_count=1,
# data_format=DataFormat.NGEN_CAL_CONFIG)
# docker_cmd_args.append(calibration_config_dataset_names[0])

# Also do a sanity check here to ensure there is at least one forcing dataset
self._ds_names_helper(job, worker_index, DataCategory.FORCING)
Expand Down Expand Up @@ -492,7 +503,6 @@ def determine_image_for_job(self, job: 'Job') -> str:
"""
if isinstance(job.model_request, NGENRequest):
return "127.0.0.1:5000/ngen:latest"
# For now, this is the only thing supported
else:
msg = "Unable to determine correct scheduler image for job {} with request of {} type"
raise DmodRuntimeError(msg.format(job.job_id, job.model_request.__class__.__name__))
Expand Down
2 changes: 1 addition & 1 deletion python/services/dataservice/dmod/dataservice/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.4.1'
__version__ = '0.5.0'
2 changes: 1 addition & 1 deletion python/services/dataservice/dmod/dataservice/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -953,7 +953,7 @@ async def manage_required_data_checks(self):
logging.info("All required data for {} is available.".format(job.job_id))
# Before moving to next successful step, also create output datasets and requirement entries
self._create_output_datasets(job)
job.status_step = JobExecStep.AWAITING_PARTITIONING
job.status_step = JobExecStep.AWAITING_PARTITIONING if job.cpu_count > 1 else JobExecStep.AWAITING_ALLOCATION
else:
logging.error("Some or all required data for {} is unprovideable.".format(job.job_id))
job.status_step = JobExecStep.DATA_UNPROVIDEABLE
Expand Down
2 changes: 1 addition & 1 deletion python/services/dataservice/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
author_email='',
url='',
license='',
install_requires=['dmod-core>=0.3.0', 'dmod-communication>=0.7.1', 'dmod-scheduler>=0.7.0', 'dmod-modeldata>=0.9.0',
install_requires=['dmod-core>=0.3.0', 'dmod-communication>=0.11.0', 'dmod-scheduler>=0.10.0', 'dmod-modeldata>=0.9.0',
'redis'],
packages=find_namespace_packages(exclude=['dmod.test', 'deprecated', 'conf', 'schemas', 'ssl', 'src'])
)
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.2.1'
__version__ = '0.2.2'
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,12 @@ async def manage_job_partitioning(self):

for job in [j for j in self._job_util.get_all_active_jobs() if
j.status_step == JobExecStep.AWAITING_PARTITIONING]:

if job.cpu_count == 1:
logging.warning("No need to partition job {} with only 1 CPU allocated".format(job.job_id))
job.status_step = JobExecStep.AWAITING_ALLOCATION
continue

logging.info("Processing partitioning for active job {}".format(job.job_id))
try:
# See if there is already an existing dataset to use for this
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.5.2'
__version__ = '0.6.0'
2 changes: 1 addition & 1 deletion python/services/requestservice/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
author_email='',
url='',
license='',
install_requires=['websockets', 'dmod-core>=0.1.0', 'dmod-communication>=0.7.0', 'dmod-access>=0.2.0',
install_requires=['websockets', 'dmod-core>=0.1.0', 'dmod-communication>=0.11.0', 'dmod-access>=0.2.0',
'dmod-externalrequests>=0.3.0'],
packages=find_namespace_packages(exclude=['dmod.test', 'schemas', 'ssl', 'src'])
)
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.7.1'
__version__ = '0.8.0'
2 changes: 1 addition & 1 deletion python/services/schedulerservice/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@
author_email='',
url='',
license='',
install_requires=['dmod-core>=0.2.0', 'dmod-communication>=0.8.0', 'dmod-scheduler>=0.9.0'],
install_requires=['dmod-core>=0.2.0', 'dmod-communication>=0.11.0', 'dmod-scheduler>=0.10.0'],
packages=find_namespace_packages(exclude=['dmod.test', 'deprecated', 'conf', 'schemas', 'ssl', 'src'])
)