Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 4 additions & 16 deletions src/api/command.cr
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,7 @@ module PlaceOS::Core::Api
@[AC::Param::Info(description: "the user context for the execution", example: "user-1234")]
user_id : String? = nil,
) : Nil
# Check if module is loaded, or is a lazy module that can be loaded on demand
unless module_manager.process_manager(module_id, &.module_loaded?(module_id))
# Not loaded - check if it's a lazy module
unless module_manager.lazy_module?(module_id)
# Not a registered lazy module - check DB for launch_on_execute flag
mod = Model::Module.find(module_id)
unless mod && mod.launch_on_execute
Log.info { {module_id: module_id, message: "module not loaded"} }
raise Error::NotFound.new("module #{module_id} not loaded")
end
end
end
manager, mod_orm = module_manager.process_manager(module_id)

# NOTE:: we don't use the AC body helper for performance reasons.
# we're just proxying the JSON to the driver without parsing it
Expand All @@ -49,9 +38,7 @@ module PlaceOS::Core::Api
raise Error::NotAcceptable.new(message)
end

execute_output = module_manager.process_manager(module_id) do |manager|
manager.execute(module_id, body, user_id: user_id)
end
execute_output = manager.execute(module_id, body, user_id: user_id, mod: mod_orm)

# NOTE:: we are not using the typical response processing
# as we don't need to deserialise
Expand All @@ -73,7 +60,8 @@ module PlaceOS::Core::Api
module_id : String,
) : Nil
# Forward debug messages to the websocket
module_manager.process_manager(module_id, &.attach_debugger(module_id, socket))
manager, _mod_orm = module_manager.process_manager(module_id)
manager.attach_debugger(module_id, socket)
end

@[AC::Route::Exception(PlaceOS::Driver::RemoteException, status_code: HTTP::Status::NON_AUTHORITATIVE_INFORMATION)]
Expand Down
30 changes: 21 additions & 9 deletions src/placeos-core/module_manager.cr
Original file line number Diff line number Diff line change
Expand Up @@ -307,27 +307,39 @@ module PlaceOS::Core
end

###############################################################################################

# Delegate `Model::Module` to a `ProcessManager`, either local or on an edge
#
def process_manager(mod : Model::Module | String, & : ProcessManager ->)
edge_id = case mod
def process_manager(mod : String) : Tuple(ProcessManager, Model::Module)
mod_orm = case mod
in Model::Module
mod.edge_id if mod.on_edge?
mod
in String
# TODO: Cache `Module` to `Edge` relation in `ModuleManager`
Model::Module.find!(mod).edge_id if Model::Module.has_edge_hint?(mod)
mod_lookup = Model::Module.find?(mod)
raise ModuleError.new("Could not locate module #{mod}, no matching database record") unless mod_lookup
mod_lookup
end

if edge_id
if mod_orm.on_edge? && (edge_id = mod_orm.edge_id)
if (manager = edge_processes.for?(edge_id)).nil?
raise ModuleError.new("Could not locate module #{mod_orm.id}, missing edge manager for #{edge_id}")
end
return {manager, mod_orm}
end

{local_processes, mod_orm}
end

# if we have the module loaded
def process_manager(mod : Model::Module, & : ProcessManager ->)
if mod.on_edge? && (edge_id = mod.edge_id)
if (manager = edge_processes.for?(edge_id)).nil?
Log.error { "missing edge manager for #{edge_id}" }
return
end
yield manager
else
yield local_processes
end

yield local_processes
end

def process_manager(driver_key : String, edge_id : String?) : ProcessManager?
Expand Down
21 changes: 3 additions & 18 deletions src/placeos-core/process_manager/common.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,7 @@ require "hardware"

# Methods for interacting with module processes common across a local and edge node
module PlaceOS::Core::ProcessManager::Common
def execute(module_id : String, payload : String | IO, user_id : String?)
manager = protocol_manager_by_module?(module_id)

raise ModuleError.new("No protocol manager for #{module_id}") if manager.nil?

request_body = payload.is_a?(IO) ? payload.gets_to_end : payload
manager.execute(
module_id,
request_body,
user_id: user_id,
)
rescue error : PlaceOS::Driver::RemoteException
raise error
rescue exception
raise module_error(module_id, exception)
end
abstract def execute(module_id : String, payload : String | IO, user_id : String?, mod : Model::Module? = nil)

def start(module_id : String, payload : String)
manager = protocol_manager_by_module?(module_id)
Expand Down Expand Up @@ -268,15 +253,15 @@ module PlaceOS::Core::ProcessManager::Common
#################################################################################################

macro module_error(module_id, exception)
if {{ exception }}.is_a?(ModuleError)
if {{ exception }}.is_a?(::PlaceOS::Core::ModuleError)
{{ exception }}
else
%driver_path = path_for?({{ module_id }})
%context = [] of String
%context << "module manager not present" unless protocol_manager_by_module?({{ module_id }})
%context << "driver expected at #{%driver_path} not present" if %driver_path && !File.exists?(%driver_path)
%message = "When invoking {{@def.name}} on #{{{module_id}}} #{%context.join(", ")}"
ModuleError.new(%message, cause: {{ exception }})
::PlaceOS::Core::ModuleError.new(%message, cause: {{ exception }})
end
end
end
2 changes: 1 addition & 1 deletion src/placeos-core/process_manager/edge.cr
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ module PlaceOS::Core
} }
end

def execute(module_id : String, payload : String, user_id : String?)
def execute(module_id : String, payload : String, user_id : String?, mod : Model::Module? = nil)
response = Protocol.request(Protocol::Message::Execute.new(module_id, payload, user_id), expect: Protocol::Message::ExecuteResponse, preserve_response: true)
if response.nil?
raise PlaceOS::Driver::RemoteException.new("No response received from edge received", IO::TimeoutError.class.to_s)
Expand Down
80 changes: 33 additions & 47 deletions src/placeos-core/process_manager/local.cr
Original file line number Diff line number Diff line change
Expand Up @@ -60,19 +60,29 @@ module PlaceOS::Core
false
end

def execute(module_id : String, payload : String | IO, user_id : String?)
# Check if this is a lazy module that needs to be loaded
mod = Model::Module.find?(module_id)
if mod && mod.launch_on_execute
return execute_lazy(mod, payload, user_id)
end
def execute(module_id : String, payload : String | IO, user_id : String?, mod : Model::Module? = nil)
mod = mod || Model::Module.find?(module_id)
raise ModuleError.new("Could not locate module #{module_id}, no matching database record") unless mod

super
rescue exception : ModuleError
# Check if this is a lazy module that needs to be loaded
return execute_lazy(mod, payload, user_id) if mod.launch_on_execute
raise ModuleError.new("Could not locate module #{module_id}, it is stopped") unless mod.running

# the module should be running and have a management module
manager = protocol_manager_by_module?(module_id) || ensure_lazy_module_loaded(mod)
request_body = payload.is_a?(IO) ? payload.gets_to_end : payload
manager.execute(
module_id,
request_body,
user_id: user_id,
)
rescue error : PlaceOS::Driver::RemoteException
raise error
rescue exception
if exception.message =~ /module #{module_id} not running on this host/
raise no_module_error(module_id, exception)
else
raise exception
raise module_error(module_id, exception)
end
end

Expand All @@ -85,12 +95,9 @@ module PlaceOS::Core

begin
# Ensure driver is spawned and module is loaded
ensure_lazy_module_loaded(mod)
manager = ensure_lazy_module_loaded(mod)

# Execute the request
manager = protocol_manager_by_module?(module_id)
raise ModuleError.new("No protocol manager for lazy module #{module_id}") if manager.nil?

request_body = payload.is_a?(IO) ? payload.gets_to_end : payload
manager.execute(module_id, request_body, user_id: user_id)
ensure
Expand All @@ -105,9 +112,9 @@ module PlaceOS::Core
module_id = mod.id.as(String)

# Already loaded?
if protocol_manager_by_module?(module_id)
if manager = protocol_manager_by_module?(module_id)
Log.debug { {message: "lazy module already loaded", module_id: module_id} }
return
return manager
end

driver = mod.driver!
Expand All @@ -128,6 +135,7 @@ module PlaceOS::Core

Log.info { {message: "spawned driver for lazy module execution", module_id: module_id, name: mod.name} }
end
manager.as(Driver::Protocol::Management)
end

# Schedule unload of lazy module after idle timeout
Expand Down Expand Up @@ -236,18 +244,21 @@ module PlaceOS::Core
def on_exec(request : Request, response_callback : Request ->)
module_manager = ModuleManager.instance
module_id = request.id
request = if module_manager.process_manager(module_id, &.module_loaded?(module_id))
local_execute(request)

manager, mod_orm = module_manager.process_manager(module_id)
request = if manager.module_loaded?(module_id)
local_execute(request, module_id, mod_orm)
else
core_uri = which_core(module_id)
if core_uri == discovery.uri
# If the module maps to this node
local_execute(request)
local_execute(request, module_id, mod_orm)
else
# Otherwise, dial core node responsible for the module
remote_execute(core_uri, request)
end
end

response_callback.call(request)
rescue error
request.set_error(error)
Expand Down Expand Up @@ -291,35 +302,10 @@ module PlaceOS::Core
request
end

protected def local_execute(request)
module_id = request.id

module_manager = ModuleManager.instance
unless module_manager.process_manager(module_id, &.module_loaded?(module_id))
Log.info { {module_id: module_id, message: "module not loaded"} }
# Attempt to start the module, recover from abnormal conditions
begin
module_manager.start_module(Model::Module.find!(module_id))
rescue exception
raise no_module_error(module_id, exception)
end
raise no_module_error(module_id) unless module_manager.process_manager(module_id, &.module_loaded?(module_id))
end

begin
response = module_manager.process_manager(module_id) { |manager|
manager.execute(module_id, request.payload.as(String), user_id: request.user_id)
} || {"".as(String?), 500}
request.code = response[1]
request.payload = response[0]
rescue exception
if exception.message.try(&.includes?("module #{module_id} not running on this host"))
raise no_module_error(module_id, exception)
else
raise exception
end
end

protected def local_execute(request, module_id, mod_orm)
response = execute(module_id, request.payload.as(String), request.user_id, mod_orm) || {"".as(String?), 500}
request.code = response[1]
request.payload = response[0]
request.cmd = :result
request
end
Expand Down
20 changes: 20 additions & 0 deletions src/placeos-edge/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,26 @@ module PlaceOS::Edge
@uri = uri
end

alias ModuleError = ::PlaceOS::Core::ModuleError

# Implement the abstract method from Common
def execute(module_id : String, payload : String | IO, user_id : String?, mod : Model::Module? = nil)
manager = protocol_manager_by_module?(module_id)

raise ModuleError.new("No protocol manager for #{module_id}") if manager.nil?

request_body = payload.is_a?(IO) ? payload.gets_to_end : payload
manager.execute(
module_id,
request_body,
user_id: user_id,
)
rescue error : PlaceOS::Driver::RemoteException
raise error
rescue exception
raise module_error(module_id, exception)
end

# Initialize the WebSocket API
#
# Optionally accepts a block called after connection has been established.
Expand Down
Loading