diff --git a/spec/api/command_spec.cr b/spec/api/command_spec.cr index db6b1a2..64a98a5 100644 --- a/spec/api/command_spec.cr +++ b/spec/api/command_spec.cr @@ -74,7 +74,7 @@ module PlaceOS::Core::Api resource_manager.try &.stop end - it "returns 404 for non-lazy module that is not loaded" do + it "recovers a module that is not loaded but should be" do _, _, mod = setup(role: PlaceOS::Model::Driver::Role::Service) mod_id = mod.id.as(String) @@ -84,7 +84,7 @@ module PlaceOS::Core::Api route = File.join(namespace, mod_id, "execute") response = client.post(route, headers: json_headers, body: EXEC_PAYLOAD) - response.status_code.should eq 404 + response.status_code.should eq 200 ensure module_manager.try &.stop end diff --git a/src/api/command.cr b/src/api/command.cr index 179fbd4..3d2d005 100644 --- a/src/api/command.cr +++ b/src/api/command.cr @@ -27,18 +27,7 @@ module PlaceOS::Core::Api @[AC::Param::Info(description: "the user context for the execution", example: "user-1234")] user_id : String? = nil, ) : Nil - # Check if module is loaded, or is a lazy module that can be loaded on demand - unless module_manager.process_manager(module_id, &.module_loaded?(module_id)) - # Not loaded - check if it's a lazy module - unless module_manager.lazy_module?(module_id) - # Not a registered lazy module - check DB for launch_on_execute flag - mod = Model::Module.find(module_id) - unless mod && mod.launch_on_execute - Log.info { {module_id: module_id, message: "module not loaded"} } - raise Error::NotFound.new("module #{module_id} not loaded") - end - end - end + manager, mod_orm = module_manager.process_manager(module_id) # NOTE:: we don't use the AC body helper for performance reasons. # we're just proxying the JSON to the driver without parsing it @@ -49,9 +38,7 @@ module PlaceOS::Core::Api raise Error::NotAcceptable.new(message) end - execute_output = module_manager.process_manager(module_id) do |manager| - manager.execute(module_id, body, user_id: user_id) - end + execute_output = manager.execute(module_id, body, user_id: user_id, mod: mod_orm) # NOTE:: we are not using the typical response processing # as we don't need to deserialise @@ -73,7 +60,8 @@ module PlaceOS::Core::Api module_id : String, ) : Nil # Forward debug messages to the websocket - module_manager.process_manager(module_id, &.attach_debugger(module_id, socket)) + manager, _mod_orm = module_manager.process_manager(module_id) + manager.attach_debugger(module_id, socket) end @[AC::Route::Exception(PlaceOS::Driver::RemoteException, status_code: HTTP::Status::NON_AUTHORITATIVE_INFORMATION)] diff --git a/src/placeos-core/module_manager.cr b/src/placeos-core/module_manager.cr index d3f21d2..164f70d 100644 --- a/src/placeos-core/module_manager.cr +++ b/src/placeos-core/module_manager.cr @@ -307,27 +307,39 @@ module PlaceOS::Core end ############################################################################################### - # Delegate `Model::Module` to a `ProcessManager`, either local or on an edge # - def process_manager(mod : Model::Module | String, & : ProcessManager ->) - edge_id = case mod + def process_manager(mod : String) : Tuple(ProcessManager, Model::Module) + mod_orm = case mod in Model::Module - mod.edge_id if mod.on_edge? + mod in String - # TODO: Cache `Module` to `Edge` relation in `ModuleManager` - Model::Module.find!(mod).edge_id if Model::Module.has_edge_hint?(mod) + mod_lookup = Model::Module.find?(mod) + raise ModuleError.new("Could not locate module #{mod}, no matching database record") unless mod_lookup + mod_lookup end - if edge_id + if mod_orm.on_edge? && (edge_id = mod_orm.edge_id) + if (manager = edge_processes.for?(edge_id)).nil? + raise ModuleError.new("Could not locate module #{mod_orm.id}, missing edge manager for #{edge_id}") + end + return {manager, mod_orm} + end + + {local_processes, mod_orm} + end + + # if we have the module loaded + def process_manager(mod : Model::Module, & : ProcessManager ->) + if mod.on_edge? && (edge_id = mod.edge_id) if (manager = edge_processes.for?(edge_id)).nil? Log.error { "missing edge manager for #{edge_id}" } return end yield manager - else - yield local_processes end + + yield local_processes end def process_manager(driver_key : String, edge_id : String?) : ProcessManager? diff --git a/src/placeos-core/process_manager/common.cr b/src/placeos-core/process_manager/common.cr index 7b6b28d..36a17f3 100644 --- a/src/placeos-core/process_manager/common.cr +++ b/src/placeos-core/process_manager/common.cr @@ -2,22 +2,7 @@ require "hardware" # Methods for interacting with module processes common across a local and edge node module PlaceOS::Core::ProcessManager::Common - def execute(module_id : String, payload : String | IO, user_id : String?) - manager = protocol_manager_by_module?(module_id) - - raise ModuleError.new("No protocol manager for #{module_id}") if manager.nil? - - request_body = payload.is_a?(IO) ? payload.gets_to_end : payload - manager.execute( - module_id, - request_body, - user_id: user_id, - ) - rescue error : PlaceOS::Driver::RemoteException - raise error - rescue exception - raise module_error(module_id, exception) - end + abstract def execute(module_id : String, payload : String | IO, user_id : String?, mod : Model::Module? = nil) def start(module_id : String, payload : String) manager = protocol_manager_by_module?(module_id) @@ -268,7 +253,7 @@ module PlaceOS::Core::ProcessManager::Common ################################################################################################# macro module_error(module_id, exception) - if {{ exception }}.is_a?(ModuleError) + if {{ exception }}.is_a?(::PlaceOS::Core::ModuleError) {{ exception }} else %driver_path = path_for?({{ module_id }}) @@ -276,7 +261,7 @@ module PlaceOS::Core::ProcessManager::Common %context << "module manager not present" unless protocol_manager_by_module?({{ module_id }}) %context << "driver expected at #{%driver_path} not present" if %driver_path && !File.exists?(%driver_path) %message = "When invoking {{@def.name}} on #{{{module_id}}} #{%context.join(", ")}" - ModuleError.new(%message, cause: {{ exception }}) + ::PlaceOS::Core::ModuleError.new(%message, cause: {{ exception }}) end end end diff --git a/src/placeos-core/process_manager/edge.cr b/src/placeos-core/process_manager/edge.cr index 659f9bc..fb30d68 100644 --- a/src/placeos-core/process_manager/edge.cr +++ b/src/placeos-core/process_manager/edge.cr @@ -69,7 +69,7 @@ module PlaceOS::Core } } end - def execute(module_id : String, payload : String, user_id : String?) + def execute(module_id : String, payload : String, user_id : String?, mod : Model::Module? = nil) response = Protocol.request(Protocol::Message::Execute.new(module_id, payload, user_id), expect: Protocol::Message::ExecuteResponse, preserve_response: true) if response.nil? raise PlaceOS::Driver::RemoteException.new("No response received from edge received", IO::TimeoutError.class.to_s) diff --git a/src/placeos-core/process_manager/local.cr b/src/placeos-core/process_manager/local.cr index 9587ed0..39f757c 100644 --- a/src/placeos-core/process_manager/local.cr +++ b/src/placeos-core/process_manager/local.cr @@ -60,19 +60,29 @@ module PlaceOS::Core false end - def execute(module_id : String, payload : String | IO, user_id : String?) - # Check if this is a lazy module that needs to be loaded - mod = Model::Module.find?(module_id) - if mod && mod.launch_on_execute - return execute_lazy(mod, payload, user_id) - end + def execute(module_id : String, payload : String | IO, user_id : String?, mod : Model::Module? = nil) + mod = mod || Model::Module.find?(module_id) + raise ModuleError.new("Could not locate module #{module_id}, no matching database record") unless mod - super - rescue exception : ModuleError + # Check if this is a lazy module that needs to be loaded + return execute_lazy(mod, payload, user_id) if mod.launch_on_execute + raise ModuleError.new("Could not locate module #{module_id}, it is stopped") unless mod.running + + # the module should be running and have a management module + manager = protocol_manager_by_module?(module_id) || ensure_lazy_module_loaded(mod) + request_body = payload.is_a?(IO) ? payload.gets_to_end : payload + manager.execute( + module_id, + request_body, + user_id: user_id, + ) + rescue error : PlaceOS::Driver::RemoteException + raise error + rescue exception if exception.message =~ /module #{module_id} not running on this host/ raise no_module_error(module_id, exception) else - raise exception + raise module_error(module_id, exception) end end @@ -85,12 +95,9 @@ module PlaceOS::Core begin # Ensure driver is spawned and module is loaded - ensure_lazy_module_loaded(mod) + manager = ensure_lazy_module_loaded(mod) # Execute the request - manager = protocol_manager_by_module?(module_id) - raise ModuleError.new("No protocol manager for lazy module #{module_id}") if manager.nil? - request_body = payload.is_a?(IO) ? payload.gets_to_end : payload manager.execute(module_id, request_body, user_id: user_id) ensure @@ -105,9 +112,9 @@ module PlaceOS::Core module_id = mod.id.as(String) # Already loaded? - if protocol_manager_by_module?(module_id) + if manager = protocol_manager_by_module?(module_id) Log.debug { {message: "lazy module already loaded", module_id: module_id} } - return + return manager end driver = mod.driver! @@ -128,6 +135,7 @@ module PlaceOS::Core Log.info { {message: "spawned driver for lazy module execution", module_id: module_id, name: mod.name} } end + manager.as(Driver::Protocol::Management) end # Schedule unload of lazy module after idle timeout @@ -236,18 +244,21 @@ module PlaceOS::Core def on_exec(request : Request, response_callback : Request ->) module_manager = ModuleManager.instance module_id = request.id - request = if module_manager.process_manager(module_id, &.module_loaded?(module_id)) - local_execute(request) + + manager, mod_orm = module_manager.process_manager(module_id) + request = if manager.module_loaded?(module_id) + local_execute(request, module_id, mod_orm) else core_uri = which_core(module_id) if core_uri == discovery.uri # If the module maps to this node - local_execute(request) + local_execute(request, module_id, mod_orm) else # Otherwise, dial core node responsible for the module remote_execute(core_uri, request) end end + response_callback.call(request) rescue error request.set_error(error) @@ -291,35 +302,10 @@ module PlaceOS::Core request end - protected def local_execute(request) - module_id = request.id - - module_manager = ModuleManager.instance - unless module_manager.process_manager(module_id, &.module_loaded?(module_id)) - Log.info { {module_id: module_id, message: "module not loaded"} } - # Attempt to start the module, recover from abnormal conditions - begin - module_manager.start_module(Model::Module.find!(module_id)) - rescue exception - raise no_module_error(module_id, exception) - end - raise no_module_error(module_id) unless module_manager.process_manager(module_id, &.module_loaded?(module_id)) - end - - begin - response = module_manager.process_manager(module_id) { |manager| - manager.execute(module_id, request.payload.as(String), user_id: request.user_id) - } || {"".as(String?), 500} - request.code = response[1] - request.payload = response[0] - rescue exception - if exception.message.try(&.includes?("module #{module_id} not running on this host")) - raise no_module_error(module_id, exception) - else - raise exception - end - end - + protected def local_execute(request, module_id, mod_orm) + response = execute(module_id, request.payload.as(String), request.user_id, mod_orm) || {"".as(String?), 500} + request.code = response[1] + request.payload = response[0] request.cmd = :result request end diff --git a/src/placeos-edge/client.cr b/src/placeos-edge/client.cr index 9e89b48..84a02be 100644 --- a/src/placeos-edge/client.cr +++ b/src/placeos-edge/client.cr @@ -65,6 +65,26 @@ module PlaceOS::Edge @uri = uri end + alias ModuleError = ::PlaceOS::Core::ModuleError + + # Implement the abstract method from Common + def execute(module_id : String, payload : String | IO, user_id : String?, mod : Model::Module? = nil) + manager = protocol_manager_by_module?(module_id) + + raise ModuleError.new("No protocol manager for #{module_id}") if manager.nil? + + request_body = payload.is_a?(IO) ? payload.gets_to_end : payload + manager.execute( + module_id, + request_body, + user_id: user_id, + ) + rescue error : PlaceOS::Driver::RemoteException + raise error + rescue exception + raise module_error(module_id, exception) + end + # Initialize the WebSocket API # # Optionally accepts a block called after connection has been established.