Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 17 additions & 10 deletions ros2param/ros2param/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,21 +137,28 @@ def load_parameter_dict(*, node, node_name, parameter_dict):
print(msg, file=sys.stderr)


def load_parameter_file(*, node, node_name, parameter_file):
def load_parameter_file(*, node, node_name, parameter_file, use_wildcard):
# Remove leading slash and namespaces
with open(parameter_file, 'r') as f:
param_file = yaml.safe_load(f)
if node_name not in param_file:
param_keys = []
if use_wildcard and '/**' in param_file:
param_keys.append('/**')
if node_name in param_file:
param_keys.append(node_name)

if param_keys == []:
raise RuntimeError('Param file does not contain parameters for {}, '
' only for nodes: {}' .format(node_name, param_file.keys()))

value = param_file[node_name]
if type(value) != dict or 'ros__parameters' not in value:
raise RuntimeError('Invalid structure of parameter file for node {}'
'expected same format as provided by ros2 param dump'
.format(node_name))
load_parameter_dict(node=node, node_name=node_name,
parameter_dict=value['ros__parameters'])
param_dict = {}
for k in param_keys:
value = param_file[k]
if type(value) != dict or 'ros__parameters' not in value:
raise RuntimeError('Invalid structure of parameter file for node {}'
'expected same format as provided by ros2 param dump'
.format(k))
param_dict.update(value['ros__parameters'])
load_parameter_dict(node=node, node_name=node_name, parameter_dict=param_dict)


def call_describe_parameters(*, node, node_name, parameter_names=None):
Expand Down
8 changes: 6 additions & 2 deletions ros2param/ros2param/verb/load.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,11 @@ def add_arguments(self, parser, cli_name): # noqa: D102
parser.add_argument(
'--include-hidden-nodes', action='store_true',
help='Consider hidden nodes as well')
arg = parser.add_argument(
parser.add_argument(
'parameter_file', help='Parameter file')
parser.add_argument(
'--no-use-wildcard', action='store_true',
help="Do not load parameters in the '/**' namespace into the node")

def main(self, *, args): # noqa: D102
with NodeStrategy(args) as node:
Expand All @@ -47,4 +50,5 @@ def main(self, *, args): # noqa: D102
return 'Node not found'

with DirectNode(args) as node:
load_parameter_file(node=node, node_name=node_name, parameter_file=args.parameter_file)
load_parameter_file(node=node, node_name=node_name, parameter_file=args.parameter_file,
use_wildcard=not args.no_use_wildcard)
76 changes: 74 additions & 2 deletions ros2param/test/test_verb_load.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@

from ros2cli.node.strategy import NodeStrategy

import yaml

TEST_NODE = 'test_node'
TEST_NAMESPACE = '/foo'

Expand Down Expand Up @@ -71,6 +73,17 @@
' str_param: Bye World\n'
' use_sim_time: false\n'
)
INPUT_WILDCARD_PARAMETER_FILE = (
'/**:\n'
' ros__parameters:\n'
' str_param: Wildcard\n'
' int_param: 12345\n'
)
INPUT_NODE_OVERLAY_PARAMETER_FILE = (
f'{TEST_NAMESPACE}/{TEST_NODE}:\n'
' ros__parameters:\n'
' str_param: Override\n'
)

# Skip cli tests on Windows while they exhibit pathological behavior
# https://github.com/ros2/build_farmer/issues/248
Expand Down Expand Up @@ -191,10 +204,10 @@ def setUp(self):
if timed_out:
self.fail(f'CLI daemon failed to find test node after {TEST_TIMEOUT} seconds')

def _write_param_file(self, tmpdir, filename):
def _write_param_file(self, tmpdir, filename, contents=INPUT_PARAMETER_FILE):
yaml_path = os.path.join(tmpdir, filename)
with open(yaml_path, 'w') as f:
f.write(INPUT_PARAMETER_FILE)
f.write(contents)
return yaml_path

def _output_file(self):
Expand Down Expand Up @@ -280,3 +293,62 @@ def test_verb_load(self):
text=param_dump_command.output,
strict=True
)

def test_verb_load_wildcard(self):
with tempfile.TemporaryDirectory() as tmpdir:
# Try param file with only wildcard
filepath = self._write_param_file(tmpdir, 'params.yaml', INPUT_WILDCARD_PARAMETER_FILE)
with self.launch_param_load_command(
arguments=[f'{TEST_NAMESPACE}/{TEST_NODE}', filepath,
'--no-use-wildcard']
) as param_load_command:
assert param_load_command.wait_for_shutdown(timeout=TEST_TIMEOUT)
assert param_load_command.exit_code != launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=['Param file does not contain parameters for '
f'{TEST_NAMESPACE}/{TEST_NODE}'],
text=param_load_command.output,
strict=False
)

with self.launch_param_load_command(
arguments=[f'{TEST_NAMESPACE}/{TEST_NODE}', filepath]
) as param_load_command:
assert param_load_command.wait_for_shutdown(timeout=TEST_TIMEOUT)
assert param_load_command.exit_code == launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=[''],
text=param_load_command.output,
strict=True
)
# Dump with ros2 param and check that wildcard parameters are loaded
with self.launch_param_dump_command(
arguments=[f'{TEST_NAMESPACE}/{TEST_NODE}', '--print']
) as param_dump_command:
assert param_dump_command.wait_for_shutdown(timeout=TEST_TIMEOUT)
assert param_dump_command.exit_code == launch_testing.asserts.EXIT_OK
loaded_params = yaml.safe_load(param_dump_command.output)
params = loaded_params[f'{TEST_NAMESPACE}/{TEST_NODE}']['ros__parameters']
assert params['str_param'] == 'Wildcard'
assert params['int_param'] == 12345

# Concatenate wildcard + some overlays
filepath = self._write_param_file(tmpdir, 'params.yaml',
INPUT_WILDCARD_PARAMETER_FILE + '\n' +
INPUT_NODE_OVERLAY_PARAMETER_FILE)
with self.launch_param_load_command(
arguments=[f'{TEST_NAMESPACE}/{TEST_NODE}', filepath]
) as param_load_command:
assert param_load_command.wait_for_shutdown(timeout=TEST_TIMEOUT)
assert param_load_command.exit_code == launch_testing.asserts.EXIT_OK

# Dump and check that wildcard parameters were overriden if in node namespace
with self.launch_param_dump_command(
arguments=[f'{TEST_NAMESPACE}/{TEST_NODE}', '--print']
) as param_dump_command:
assert param_dump_command.wait_for_shutdown(timeout=TEST_TIMEOUT)
assert param_dump_command.exit_code == launch_testing.asserts.EXIT_OK
loaded_params = yaml.safe_load(param_dump_command.output)
params = loaded_params[f'{TEST_NAMESPACE}/{TEST_NODE}']['ros__parameters']
assert params['str_param'] == 'Override' # Overriden
assert params['int_param'] == 12345 # Wildcard namespace