Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 40 additions & 15 deletions lib/syskit/network_generation/dataflow_dynamics.rb
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,18 @@ def compute_connection_policies
policy_graph
end

# Merges two connection policies
#
# @param [Hash] default_policy the policy that has the lowest
# priority
# @param [Hash] explicit_policy the policy that has the highest
# priority
def merge_policy(default_policy, explicit_policy)
merged_policy = default_policy.merge(explicit_policy)
merged_policy.delete(:size) if merged_policy[:type] == :data
merged_policy
end

# @api private
#
# Compute the policies for all connections starting from a given task
Expand All @@ -561,25 +573,20 @@ def compute_policies_from(connection_graph, source_task, policy_graph = {})
mappings = connection_graph.edge_info(source_task, sink_task)
computed_policies =
mappings.each_with_object({}) do |(port_pair, policy), h|
policy = policy.dup
fallback_policy = policy.delete(:fallback_policy)
if policy.empty?
h[port_pair] =
policy_for(source_task, *port_pair, sink_task,
fallback_policy)
else
h[port_pair] = policy
end
explicit_policy = policy.dup
fallback_policy = explicit_policy.delete(:fallback_policy)
h[port_pair] = policy_for(
source_task, *port_pair, sink_task,
fallback_policy, explicit_policy
)
end
policy_graph[[source_task, sink_task]] = computed_policies
end
policy_graph
end

# Given the current knowledge about the port dynamics, returns the
# policy for the provided connection
def policy_for(
source_task, source_port_name, sink_port_name, sink_task, fallback_policy
def policy_compute_data_element(
source_task, source_port_name, sink_task, sink_port_name, fallback_policy
)
source_port = source_task.find_output_port(source_port_name)
sink_port = sink_task.find_input_port(sink_port_name)
Expand Down Expand Up @@ -618,9 +625,27 @@ def policy_for(
"#{sink_port_m.required_connection_type} " \
"on #{sink_port}"
end
policy
end

def policy_for(
source_task, source_port_name, sink_port_name, sink_task,
fallback_policy, explicit_policy = {}
)
computed_policy =
if explicit_policy[:type]
{}
else
policy_compute_data_element(
source_task, source_port_name, sink_task,
sink_port_name, fallback_policy
)
end

model = source_task.find_output_port(source_port_name).model
computed_policy[:init] = model.init_policy

source_port_m = source_port.model
policy.merge(init: source_port_m.init_policy?)
merge_policy(computed_policy, explicit_policy)
end

def compute_reliable_connection_policy(
Expand Down
215 changes: 171 additions & 44 deletions test/network_generation/test_dataflow_dynamics.rb
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ module NetworkGeneration
task0.out_port.connect_to(task1.in_port)

@dynamics.should_receive(:policy_for)
.with(task0, "out", "in", task1, nil)
.with(task0, "out", "in", task1, nil, {})
.and_return(type: :buffer, size: 42)
policy_graph = @dynamics.compute_connection_policies

Expand All @@ -252,7 +252,7 @@ module NetworkGeneration
task0.out_port.connect_to(task1.in_port)

@dynamics.should_receive(:policy_for)
.with(task0, "out", "in", task1, nil)
.with(task0, "out", "in", task1, nil, {})
.and_return(type: :buffer, size: 42, init: true)
policy_graph = @dynamics.compute_connection_policies

Expand All @@ -272,7 +272,7 @@ module NetworkGeneration
task0.out_port.connect_to(task1.in_port)

@dynamics.should_receive(:policy_for)
.with(task0, "out", "in", task1, nil)
.with(task0, "out", "in", task1, nil, {})
.and_return(type: :buffer, size: 42, init: false)
policy_graph = @dynamics.compute_connection_policies

Expand All @@ -290,15 +290,15 @@ module NetworkGeneration
cmp.c_child.out_port.connect_to(task.in_port)

@dynamics.should_receive(:policy_for)
.with(cmp.c_child, "out", "in", task, nil)
.with(cmp.c_child, "out", "in", task, nil, {})
.and_return(type: :buffer, size: 42)
policy_graph = @dynamics.compute_connection_policies

assert_equal({ type: :buffer, size: 42 },
policy_graph[[cmp.c_child, task]][%w[out in]])
end

it "uses in-graph policies over the computed ones" do
it "merges in-graph policies with the computed ones" do
plan.add(task0 = @task_m.new)
plan.add(task1 = @task_m.new)

Expand All @@ -307,10 +307,13 @@ module NetworkGeneration

task0.out_port.connect_to(task1.in_port, type: :buffer, size: 42)

@dynamics.should_receive(:policy_for).never
@dynamics
.should_receive(:policy_compute_data_element)
.with(task0, "out", "in", task1, nil)
.and_return(type: :buffer, size: 10, init: nil)
policy_graph = @dynamics.compute_connection_policies

assert_equal({ type: :buffer, size: 42 },
assert_equal({ type: :buffer, size: 42, init: nil },
policy_graph[[task0, task1]][%w[out in]])
end

Expand All @@ -326,12 +329,12 @@ module NetworkGeneration
)

@dynamics.should_receive(:policy_for)
.with(task0, "out", "in", task1, { type: :data })
.and_return(type: :buffer, size: 42)
.with(task0, "out", "in", task1, { type: :data }, {})
.and_return(type: :buffer, size: 42, init: nil)

policy_graph = @dynamics.compute_connection_policies

assert_equal({ type: :buffer, size: 42 },
assert_equal({ type: :buffer, size: 42, init: nil },
policy_graph[[task0, task1]][%w[out in]])
end

Expand All @@ -348,9 +351,11 @@ module NetworkGeneration
fallback_policy: { type: :data }
)

@dynamics.should_receive(:policy_for).never
@dynamics.should_receive(:policy_compute_data_element)
.with(task0, "out", "in", task1, { type: :data })
.and_return({ type: :buffer, size: 42 })
policy_graph = @dynamics.compute_connection_policies
assert_equal({ type: :buffer, size: 42 },
assert_equal({ type: :buffer, size: 42, init: nil },
policy_graph[[task0, task1]][%w[out in]])
end

Expand All @@ -360,6 +365,145 @@ module NetworkGeneration
add_agents(tasks[0, 2])
flexmock(@dynamics).should_receive(:propagate).with(tasks[0, 2])
end

it "handles the case where the explicit policy sets the type to :data" do
plan.add(task0 = @task_m.new)
plan.add(task1 = @task_m.new)

add_agents(tasks = [task0, task1])
flexmock(@dynamics).should_receive(:propagate).with(tasks)

task0.out_port.connect_to task1.in_port, type: :data

flexmock(@dynamics)
.should_receive(:policy_compute_data_element)
.never

policy_graph = @dynamics.compute_connection_policies
expected_policy = { type: :data, init: nil }
assert_equal(expected_policy,
policy_graph[[task0, task1]][%w[out in]])
end

it "makes sure the size is not overridden when explicit policy " \
"sets the type to :buffer" do
plan.add(task0 = @task_m.new)
plan.add(task1 = @task_m.new)

add_agents(tasks = [task0, task1])
flexmock(@dynamics).should_receive(:propagate).with(tasks)

task0.out_port.connect_to task1.in_port, type: :buffer, size: 42

flexmock(@dynamics)
.should_receive(:policy_compute_data_element)
.never

policy_graph = @dynamics.compute_connection_policies
expected_policy = { type: :buffer, size: 42, init: nil }
assert_equal(expected_policy,
policy_graph[[task0, task1]][%w[out in]])
end
end

describe "merge_policy" do
before do
@dynamics = NetworkGeneration::DataFlowDynamics.new(plan)
end

it "merges policies by preferring explicit values over " \
"computed values" do
explicit_policy = { type: :buffer, size: 20, init: true }
computed_policy = { type: :buffer, size: 10, init: false }

merged_policy =
@dynamics.merge_policy(computed_policy, explicit_policy)

assert_equal({ type: :buffer, size: 20, init: true }, merged_policy)
end

it "removes the size value when the type is set to :data" do
explicit_policy = { type: :data, init: true }
computed_policy = { type: :buffer, size: 10, init: false }

merged_policy =
@dynamics.merge_policy(computed_policy, explicit_policy)

assert_equal({ type: :data, init: true }, merged_policy)
end

it "falls back to computed values when explicit values " \
"are not provided" do
explicit_policy = { init: false }
computed_policy = { type: :buffer, size: 10, init: true }

merged_policy =
@dynamics.merge_policy(computed_policy, explicit_policy)

assert_equal({ type: :buffer, size: 10, init: false }, merged_policy)
end
end

describe "#policy_compute_data_element" do
before do
@task_m = Syskit::TaskContext.new_submodel do
input_port "in", "/double"
output_port "out", "/double"
end
@source_task_m = @task_m.new_submodel
@sink_task_m = @task_m.new_submodel
plan.add(@source_t = @source_task_m.new)
plan.add(@sink_t = @sink_task_m.new)
end

it "returns a data connection by default" do
policy = @dynamics.policy_compute_data_element(
@source_t, "out", @sink_t, "in", nil
)
assert_equal :data, policy[:type]
end

it "returns a buffer connection of size 1 if the sink's " \
"required connection type is 'buffer'" do
@sink_task_m.in_port.needs_buffered_connection
policy = @dynamics.policy_compute_data_element(
@source_t, "out", @sink_t, "in", nil
)
assert_equal :buffer, policy[:type]
assert_equal 1, policy[:size]
end

it "raises if the required connection type is unknown " \
"and needs_reliable_connection is not set" do
flexmock(@sink_task_m.in_port)
.should_receive(:required_connection_type).explicitly
.and_return(:something)
assert_raises(UnsupportedConnectionType) do
@dynamics.policy_compute_data_element(
@source_t, "out", @sink_t, "in", nil
)
end
end

it "returns the value from compute_reliable_connection_policy " \
"if the sink port is marked as needs_reliable_connection" do
@sink_task_m.in_port.needs_reliable_connection
fallback_policy = flexmock
expected_policy = flexmock

expected_policy
.should_receive(:merge)
.and_return(expected_policy)

flexmock(@dynamics)
.should_receive(:compute_reliable_connection_policy)
.with(@source_t.out_port, @sink_t.in_port, fallback_policy)
.once.and_return(expected_policy)
policy = @dynamics.policy_compute_data_element(
@source_t, "out", @sink_t, "in", fallback_policy
)
assert_equal expected_policy, policy
end
end

describe "#policy_for" do
Expand Down Expand Up @@ -419,51 +563,34 @@ module NetworkGeneration
it "returns the value from compute_reliable_connection_policy if " \
"the sink port is marked as needs_reliable_connection" do
@sink_task_m.in_port.needs_reliable_connection
fallback_policy = flexmock
expected_policy = flexmock

expected_policy
.should_receive(:merge)
.and_return(expected_policy)

flexmock(@dynamics)
.should_receive(:compute_reliable_connection_policy)
.with(@source_t.out_port, @sink_t.in_port, fallback_policy)
.once.and_return(expected_policy)
.with(@source_t.out_port, @sink_t.in_port, { stage: "fallback" })
.once.and_return({ stage: "reliable" })

policy = @dynamics.policy_for(
@source_t, "out", "in", @sink_t, fallback_policy
@source_t, "out", "in", @sink_t, { stage: "fallback" }
)
assert_equal expected_policy, policy
assert_equal({ stage: "reliable", init: nil }, policy)
end

it "merges init policy when sink requires reliable connection" do
@sink_task_m.in_port.needs_reliable_connection
@source_t.out_port.model.init_policy(true)
fallback_policy = flexmock

flexmock(@dynamics)
.should_receive(:compute_reliable_connection_policy)
.with(@source_t.out_port, @sink_t.in_port, fallback_policy)
.once.and_return({})
it "uses the init flag from the source port's model by default" do
flexmock(@source_t.out_port.model)
.should_receive(:init_policy?).explicitly
.and_return(true)

policy = @dynamics.policy_for(
@source_t, "out", "in", @sink_t, fallback_policy
@source_t, "out", "in", @sink_t, nil
)

assert policy[:init]
end

it "merges init policy when sink requires 'buffer' connection type" do
@sink_task_m.in_port.needs_buffered_connection

flexmock(@source_t.out_port.model)
.should_receive(:init_policy?).explicitly
.and_return(true)

it "uses the explicitly given init flag over the one from the port model" do
@source_t.out_port.model.init_policy(true)
policy = @dynamics.policy_for(@source_t, "out", "in", @sink_t, nil)

assert policy[:init]
policy = @dynamics.policy_for(
@source_t, "out", "in", @sink_t, {}, { init: false }
)
refute policy[:init]
end
end

Expand Down
2 changes: 1 addition & 1 deletion test/network_generation/test_engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -838,7 +838,7 @@ def deploy_dev_and_bus
syskit_configure(cmp)

assert_equal(
{ %w[out in] => { type: :buffer, size: 20 } },
{ %w[out in] => { type: :buffer, size: 20, init: nil } },
RequiredDataFlow.edge_info(cmp.source_child, cmp.sink_child)
)
end
Expand Down