diff --git a/lib/syskit/network_generation/dataflow_dynamics.rb b/lib/syskit/network_generation/dataflow_dynamics.rb index 6b69b8bc3..a6267a593 100644 --- a/lib/syskit/network_generation/dataflow_dynamics.rb +++ b/lib/syskit/network_generation/dataflow_dynamics.rb @@ -553,6 +553,18 @@ def compute_connection_policies policy_graph end + # Merges two connection policies + # + # @param [Hash] default_policy the policy that has the lowest + # priority + # @param [Hash] explicit_policy the policy that has the highest + # priority + def merge_policy(default_policy, explicit_policy) + merged_policy = default_policy.merge(explicit_policy) + merged_policy.delete(:size) if merged_policy[:type] == :data + merged_policy + end + # @api private # # Compute the policies for all connections starting from a given task @@ -561,25 +573,20 @@ def compute_policies_from(connection_graph, source_task, policy_graph = {}) mappings = connection_graph.edge_info(source_task, sink_task) computed_policies = mappings.each_with_object({}) do |(port_pair, policy), h| - policy = policy.dup - fallback_policy = policy.delete(:fallback_policy) - if policy.empty? - h[port_pair] = - policy_for(source_task, *port_pair, sink_task, - fallback_policy) - else - h[port_pair] = policy - end + explicit_policy = policy.dup + fallback_policy = explicit_policy.delete(:fallback_policy) + h[port_pair] = policy_for( + source_task, *port_pair, sink_task, + fallback_policy, explicit_policy + ) end policy_graph[[source_task, sink_task]] = computed_policies end policy_graph end - # Given the current knowledge about the port dynamics, returns the - # policy for the provided connection - def policy_for( - source_task, source_port_name, sink_port_name, sink_task, fallback_policy + def policy_compute_data_element( + source_task, source_port_name, sink_task, sink_port_name, fallback_policy ) source_port = source_task.find_output_port(source_port_name) sink_port = sink_task.find_input_port(sink_port_name) @@ -618,9 +625,27 @@ def policy_for( "#{sink_port_m.required_connection_type} " \ "on #{sink_port}" end + policy + end + + def policy_for( + source_task, source_port_name, sink_port_name, sink_task, + fallback_policy, explicit_policy = {} + ) + computed_policy = + if explicit_policy[:type] + {} + else + policy_compute_data_element( + source_task, source_port_name, sink_task, + sink_port_name, fallback_policy + ) + end + + model = source_task.find_output_port(source_port_name).model + computed_policy[:init] = model.init_policy - source_port_m = source_port.model - policy.merge(init: source_port_m.init_policy?) + merge_policy(computed_policy, explicit_policy) end def compute_reliable_connection_policy( diff --git a/test/network_generation/test_dataflow_dynamics.rb b/test/network_generation/test_dataflow_dynamics.rb index b638b9856..9dded2e99 100644 --- a/test/network_generation/test_dataflow_dynamics.rb +++ b/test/network_generation/test_dataflow_dynamics.rb @@ -232,7 +232,7 @@ module NetworkGeneration task0.out_port.connect_to(task1.in_port) @dynamics.should_receive(:policy_for) - .with(task0, "out", "in", task1, nil) + .with(task0, "out", "in", task1, nil, {}) .and_return(type: :buffer, size: 42) policy_graph = @dynamics.compute_connection_policies @@ -252,7 +252,7 @@ module NetworkGeneration task0.out_port.connect_to(task1.in_port) @dynamics.should_receive(:policy_for) - .with(task0, "out", "in", task1, nil) + .with(task0, "out", "in", task1, nil, {}) .and_return(type: :buffer, size: 42, init: true) policy_graph = @dynamics.compute_connection_policies @@ -272,7 +272,7 @@ module NetworkGeneration task0.out_port.connect_to(task1.in_port) @dynamics.should_receive(:policy_for) - .with(task0, "out", "in", task1, nil) + .with(task0, "out", "in", task1, nil, {}) .and_return(type: :buffer, size: 42, init: false) policy_graph = @dynamics.compute_connection_policies @@ -290,7 +290,7 @@ module NetworkGeneration cmp.c_child.out_port.connect_to(task.in_port) @dynamics.should_receive(:policy_for) - .with(cmp.c_child, "out", "in", task, nil) + .with(cmp.c_child, "out", "in", task, nil, {}) .and_return(type: :buffer, size: 42) policy_graph = @dynamics.compute_connection_policies @@ -298,7 +298,7 @@ module NetworkGeneration policy_graph[[cmp.c_child, task]][%w[out in]]) end - it "uses in-graph policies over the computed ones" do + it "merges in-graph policies with the computed ones" do plan.add(task0 = @task_m.new) plan.add(task1 = @task_m.new) @@ -307,10 +307,13 @@ module NetworkGeneration task0.out_port.connect_to(task1.in_port, type: :buffer, size: 42) - @dynamics.should_receive(:policy_for).never + @dynamics + .should_receive(:policy_compute_data_element) + .with(task0, "out", "in", task1, nil) + .and_return(type: :buffer, size: 10, init: nil) policy_graph = @dynamics.compute_connection_policies - assert_equal({ type: :buffer, size: 42 }, + assert_equal({ type: :buffer, size: 42, init: nil }, policy_graph[[task0, task1]][%w[out in]]) end @@ -326,12 +329,12 @@ module NetworkGeneration ) @dynamics.should_receive(:policy_for) - .with(task0, "out", "in", task1, { type: :data }) - .and_return(type: :buffer, size: 42) + .with(task0, "out", "in", task1, { type: :data }, {}) + .and_return(type: :buffer, size: 42, init: nil) policy_graph = @dynamics.compute_connection_policies - assert_equal({ type: :buffer, size: 42 }, + assert_equal({ type: :buffer, size: 42, init: nil }, policy_graph[[task0, task1]][%w[out in]]) end @@ -348,9 +351,11 @@ module NetworkGeneration fallback_policy: { type: :data } ) - @dynamics.should_receive(:policy_for).never + @dynamics.should_receive(:policy_compute_data_element) + .with(task0, "out", "in", task1, { type: :data }) + .and_return({ type: :buffer, size: 42 }) policy_graph = @dynamics.compute_connection_policies - assert_equal({ type: :buffer, size: 42 }, + assert_equal({ type: :buffer, size: 42, init: nil }, policy_graph[[task0, task1]][%w[out in]]) end @@ -360,6 +365,145 @@ module NetworkGeneration add_agents(tasks[0, 2]) flexmock(@dynamics).should_receive(:propagate).with(tasks[0, 2]) end + + it "handles the case where the explicit policy sets the type to :data" do + plan.add(task0 = @task_m.new) + plan.add(task1 = @task_m.new) + + add_agents(tasks = [task0, task1]) + flexmock(@dynamics).should_receive(:propagate).with(tasks) + + task0.out_port.connect_to task1.in_port, type: :data + + flexmock(@dynamics) + .should_receive(:policy_compute_data_element) + .never + + policy_graph = @dynamics.compute_connection_policies + expected_policy = { type: :data, init: nil } + assert_equal(expected_policy, + policy_graph[[task0, task1]][%w[out in]]) + end + + it "makes sure the size is not overridden when explicit policy " \ + "sets the type to :buffer" do + plan.add(task0 = @task_m.new) + plan.add(task1 = @task_m.new) + + add_agents(tasks = [task0, task1]) + flexmock(@dynamics).should_receive(:propagate).with(tasks) + + task0.out_port.connect_to task1.in_port, type: :buffer, size: 42 + + flexmock(@dynamics) + .should_receive(:policy_compute_data_element) + .never + + policy_graph = @dynamics.compute_connection_policies + expected_policy = { type: :buffer, size: 42, init: nil } + assert_equal(expected_policy, + policy_graph[[task0, task1]][%w[out in]]) + end + end + + describe "merge_policy" do + before do + @dynamics = NetworkGeneration::DataFlowDynamics.new(plan) + end + + it "merges policies by preferring explicit values over " \ + "computed values" do + explicit_policy = { type: :buffer, size: 20, init: true } + computed_policy = { type: :buffer, size: 10, init: false } + + merged_policy = + @dynamics.merge_policy(computed_policy, explicit_policy) + + assert_equal({ type: :buffer, size: 20, init: true }, merged_policy) + end + + it "removes the size value when the type is set to :data" do + explicit_policy = { type: :data, init: true } + computed_policy = { type: :buffer, size: 10, init: false } + + merged_policy = + @dynamics.merge_policy(computed_policy, explicit_policy) + + assert_equal({ type: :data, init: true }, merged_policy) + end + + it "falls back to computed values when explicit values " \ + "are not provided" do + explicit_policy = { init: false } + computed_policy = { type: :buffer, size: 10, init: true } + + merged_policy = + @dynamics.merge_policy(computed_policy, explicit_policy) + + assert_equal({ type: :buffer, size: 10, init: false }, merged_policy) + end + end + + describe "#policy_compute_data_element" do + before do + @task_m = Syskit::TaskContext.new_submodel do + input_port "in", "/double" + output_port "out", "/double" + end + @source_task_m = @task_m.new_submodel + @sink_task_m = @task_m.new_submodel + plan.add(@source_t = @source_task_m.new) + plan.add(@sink_t = @sink_task_m.new) + end + + it "returns a data connection by default" do + policy = @dynamics.policy_compute_data_element( + @source_t, "out", @sink_t, "in", nil + ) + assert_equal :data, policy[:type] + end + + it "returns a buffer connection of size 1 if the sink's " \ + "required connection type is 'buffer'" do + @sink_task_m.in_port.needs_buffered_connection + policy = @dynamics.policy_compute_data_element( + @source_t, "out", @sink_t, "in", nil + ) + assert_equal :buffer, policy[:type] + assert_equal 1, policy[:size] + end + + it "raises if the required connection type is unknown " \ + "and needs_reliable_connection is not set" do + flexmock(@sink_task_m.in_port) + .should_receive(:required_connection_type).explicitly + .and_return(:something) + assert_raises(UnsupportedConnectionType) do + @dynamics.policy_compute_data_element( + @source_t, "out", @sink_t, "in", nil + ) + end + end + + it "returns the value from compute_reliable_connection_policy " \ + "if the sink port is marked as needs_reliable_connection" do + @sink_task_m.in_port.needs_reliable_connection + fallback_policy = flexmock + expected_policy = flexmock + + expected_policy + .should_receive(:merge) + .and_return(expected_policy) + + flexmock(@dynamics) + .should_receive(:compute_reliable_connection_policy) + .with(@source_t.out_port, @sink_t.in_port, fallback_policy) + .once.and_return(expected_policy) + policy = @dynamics.policy_compute_data_element( + @source_t, "out", @sink_t, "in", fallback_policy + ) + assert_equal expected_policy, policy + end end describe "#policy_for" do @@ -419,51 +563,34 @@ module NetworkGeneration it "returns the value from compute_reliable_connection_policy if " \ "the sink port is marked as needs_reliable_connection" do @sink_task_m.in_port.needs_reliable_connection - fallback_policy = flexmock - expected_policy = flexmock - - expected_policy - .should_receive(:merge) - .and_return(expected_policy) - flexmock(@dynamics) .should_receive(:compute_reliable_connection_policy) - .with(@source_t.out_port, @sink_t.in_port, fallback_policy) - .once.and_return(expected_policy) + .with(@source_t.out_port, @sink_t.in_port, { stage: "fallback" }) + .once.and_return({ stage: "reliable" }) + policy = @dynamics.policy_for( - @source_t, "out", "in", @sink_t, fallback_policy + @source_t, "out", "in", @sink_t, { stage: "fallback" } ) - assert_equal expected_policy, policy + assert_equal({ stage: "reliable", init: nil }, policy) end - it "merges init policy when sink requires reliable connection" do - @sink_task_m.in_port.needs_reliable_connection - @source_t.out_port.model.init_policy(true) - fallback_policy = flexmock - - flexmock(@dynamics) - .should_receive(:compute_reliable_connection_policy) - .with(@source_t.out_port, @sink_t.in_port, fallback_policy) - .once.and_return({}) + it "uses the init flag from the source port's model by default" do + flexmock(@source_t.out_port.model) + .should_receive(:init_policy?).explicitly + .and_return(true) policy = @dynamics.policy_for( - @source_t, "out", "in", @sink_t, fallback_policy + @source_t, "out", "in", @sink_t, nil ) - assert policy[:init] end - it "merges init policy when sink requires 'buffer' connection type" do - @sink_task_m.in_port.needs_buffered_connection - - flexmock(@source_t.out_port.model) - .should_receive(:init_policy?).explicitly - .and_return(true) - + it "uses the explicitly given init flag over the one from the port model" do @source_t.out_port.model.init_policy(true) - policy = @dynamics.policy_for(@source_t, "out", "in", @sink_t, nil) - - assert policy[:init] + policy = @dynamics.policy_for( + @source_t, "out", "in", @sink_t, {}, { init: false } + ) + refute policy[:init] end end diff --git a/test/network_generation/test_engine.rb b/test/network_generation/test_engine.rb index 0270ca9f7..afcbb2f0f 100644 --- a/test/network_generation/test_engine.rb +++ b/test/network_generation/test_engine.rb @@ -838,7 +838,7 @@ def deploy_dev_and_bus syskit_configure(cmp) assert_equal( - { %w[out in] => { type: :buffer, size: 20 } }, + { %w[out in] => { type: :buffer, size: 20, init: nil } }, RequiredDataFlow.edge_info(cmp.source_child, cmp.sink_child) ) end