Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 57 additions & 22 deletions lib/syskit/network_generation/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -132,17 +132,19 @@ def compute_deployed_network(
def apply_deployed_network_to_plan
# Finally, we map the deployed network to the currently
# running tasks
@deployment_tasks, @deployed_tasks =
@deployment_tasks, @reused_deployed_tasks, @new_deployed_tasks =
log_timepoint_group "finalize_deployed_tasks" do
finalize_deployed_tasks
end
@deployed_tasks = @reused_deployed_tasks + @new_deployed_tasks

sever_old_plan_from_new_plan

if @dataflow_dynamics
@dataflow_dynamics.apply_merges(merge_solver)
log_timepoint "apply_merged_to_dataflow_dynamics"
end

Engine.deployment_postprocessing.each do |block|
block.call(self, work_plan)
log_timepoint "postprocessing:#{block}"
Expand Down Expand Up @@ -387,8 +389,9 @@ def finalize_deployed_tasks
end
log_timepoint "select_deployments"

reused_deployed_tasks =
reconfigure_tasks_on_static_port_modification(reused_deployed_tasks)
reconfigure_tasks_on_static_port_modification(
reused_deployed_tasks, newly_deployed_tasks
)
log_timepoint "reconfigure_tasks_on_static_port_modification"

debug do
Expand All @@ -404,7 +407,7 @@ def finalize_deployed_tasks
merge_solver.merge_identical_tasks
log_timepoint "merge"

[selected_deployment_tasks, reused_deployed_tasks | newly_deployed_tasks]
[selected_deployment_tasks, reused_deployed_tasks, newly_deployed_tasks]
end

# Process a single deployment in {#finalize_deployed_tasks}
Expand Down Expand Up @@ -433,18 +436,18 @@ def handle_required_deployment(required, usable, not_reusable)
end

if usable
newly_deployed_tasks = []
reused_deployed_tasks = adapt_existing_deployment(required, usable)
new_deployed_tasks, reused_deployed_tasks =
adapt_existing_deployment(required, usable)
selected = usable
else
# Nothing to do, we leave the plan as it is
newly_deployed_tasks = required.each_executed_task
new_deployed_tasks = required.each_executed_task
reused_deployed_tasks = []
selected = required
end

selected.should_start_after(not_reusable.stop_event) if not_reusable
[selected, newly_deployed_tasks, reused_deployed_tasks]
[selected, new_deployed_tasks, reused_deployed_tasks]
end

# Validate that the usable deployment we found is actually usable
Expand Down Expand Up @@ -563,9 +566,9 @@ def import_existing_deployments(used_deployments)
# Note that tasks that are already reconfigured because of
# {#adapt_existing_deployment} will be fine as the task is not
# configured yet
def reconfigure_tasks_on_static_port_modification(deployed_tasks)
final_deployed_tasks = deployed_tasks.dup

def reconfigure_tasks_on_static_port_modification(
reused_deployed_tasks, newly_deployed_tasks
)
# We filter against 'deployed_tasks' to always select the tasks
# that have been selected in this deployment. It does mean that
# the task is always the 'current' one, that is we would pick
Expand All @@ -576,7 +579,7 @@ def reconfigure_tasks_on_static_port_modification(deployed_tasks)
.find_tasks(Syskit::TaskContext).not_finished.not_finishing
.find_all { |t| !t.read_only? }
.find_all do |t|
deployed_tasks.include?(t) && (t.setting_up? || t.setup?)
reused_deployed_tasks.include?(t) && (t.setting_up? || t.setup?)
end

already_setup_tasks.each do |t|
Expand All @@ -590,10 +593,9 @@ def reconfigure_tasks_on_static_port_modification(deployed_tasks)
new_task = t.execution_agent.task(t.orocos_name, t.concrete_model)
merge_solver.apply_merge_group(t => new_task)
new_task.should_configure_after t.stop_event
final_deployed_tasks.delete(t)
final_deployed_tasks << new_task
reused_deployed_tasks.delete(t)
newly_deployed_tasks << new_task
end
final_deployed_tasks
end

# Find the "last" deployed task in a set of related deployed tasks
Expand Down Expand Up @@ -635,8 +637,9 @@ def adapt_existing_deployment(deployment_task, existing_deployment_task)
(orocos_name_to_existing[t.orocos_name] ||= []) << t
end

applied_merges = Set.new
deployed_tasks = deployment_task.each_executed_task.to_a
new_deployed_tasks = []
reused_deployed_tasks = []
deployed_tasks.each do |task|
existing_tasks =
orocos_name_to_existing[task.orocos_name] || []
Expand Down Expand Up @@ -667,15 +670,17 @@ def adapt_existing_deployment(deployment_task, existing_deployment_task)

new_task.should_configure_after(previous_task.stop_event)
end
new_deployed_tasks << new_task
existing_task = new_task
else
reused_deployed_tasks << existing_task
end

merge_solver.apply_merge_group(task => existing_task)
applied_merges << existing_task
debug { " using #{existing_task} for #{task} (#{task.orocos_name})" }
end
work_plan.remove_task(deployment_task)
applied_merges
[new_deployed_tasks, reused_deployed_tasks]
end

# Computes the set of requirement tasks that should be used for
Expand Down Expand Up @@ -858,15 +863,15 @@ def apply_system_network_to_plan(
log_timepoint "final_network_postprocessing:#{block}"
end

# Finally, we should now only have deployed tasks. Verify it
# and compute the connection policies
if garbage_collect && validate_final_network
validate_final_network(required_instances, work_plan,
compute_deployments: compute_deployments)
log_timepoint "validate_final_network"
end

commit_work_plan

validate_reconfigured_tasks_are_not_held(@new_deployed_tasks)
end

def discard_work_plan
Expand Down Expand Up @@ -927,7 +932,13 @@ def handle_resolution_exception(e, on_error: :discard)
def validate_final_network(
required_instances, plan, compute_deployments: true
)
# Check that all device instances are proper tasks (not proxies)
validate_required_instances_are_tasks(required_instances)

super if defined? super
end

def validate_required_instances_are_tasks(required_instances)
# Check that the final set of root required instances are proper tasks
required_instances.each do |_req_task, task|
if task.transaction_proxy?
raise InternalError,
Expand All @@ -938,8 +949,32 @@ def validate_final_network(
"instance definition #{task} has been removed from plan"
end
end
end

super if defined? super
# Exception added to the plan when we detect that a task being reconfigured
# is held against garbage collection.
#
# This is an internal error (i.e. should not happen), but not triggering
# this causes the system to "hold on" forever.
class InternalErrorReconfiguredTaskIsHeld < Roby::LocalizedError
end

# Validate that "old" tasks in a reconfigured pair will be garbage collected
#
# This must be called at the very end
def validate_reconfigured_tasks_are_not_held(new_deployed_tasks)
reconfigured_tasks = new_deployed_tasks.flat_map do |task|
task.start_event.parent_objects(
Roby::EventStructure::SyskitConfigurationPrecedence
).map(&:task).to_a
end

useful_tasks = real_plan.useful_tasks
reconfigured_tasks.each do |t|
next unless useful_tasks.include?(t)

t.add_error(InternalErrorReconfiguredTaskIsHeld.new(t))
end
end

@@dot_index = 0
Expand Down
12 changes: 7 additions & 5 deletions lib/syskit/test/network_manipulation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,13 @@ def syskit_deploy(
resolve_options = Hash[on_error: :commit].merge(resolve_options)
begin
syskit_engine_resolve_handle_plan_export do
syskit_engine ||= Syskit::NetworkGeneration::Engine.new(plan)
syskit_engine.resolve(
default_deployment_group: default_deployment_group,
**resolve_options
)
execute do
syskit_engine ||= Syskit::NetworkGeneration::Engine.new(plan)
syskit_engine.resolve(
default_deployment_group: default_deployment_group,
**resolve_options
)
end
end
rescue StandardError => e
expect_execution do
Expand Down
Loading