PIOT-CDA-09-006-A: Add OBSERVE functionality to AsyncCoapClientConnector #217

@labbenchstudios

Description

  • Update the Python module named AsyncCoapClientConnector with support for OBSERVE requests, using the existing method definitions within IRequestResponseClient to support this request type.
    • NOTE: These instructions make use of the following CoAP library:
      • The aiocoap open source CoAP library, located at: aiocoap. Reference: Amsüss, Christian and Wasilak, Maciej. aiocoap: Python CoAP Library. Energy Harvesting Solutions, 2013–. http://github.com/chrysn/aiocoap/.

Review the README

  • Please see README.md for further information on, and use of, this content.
  • License for embedded documentation and source codes: PIOT-DOC-LIC

Estimated effort may vary greatly

  • The estimated level of effort for this exercise shown in the 'Estimate' section below is a very rough approximation. The actual level of effort may vary greatly depending on your development and test environment, experience with the requisite technologies, and many other factors.

Actions

Step 1: Create the OBSERVE start method

  • Implement the public startObserver() method
    • This method will take the following arguments:
      • resource (ResourceNameEnum): Used to create the general request URL
      • name (str): Used to extend the general request URL with further detail (if warranted)
      • ttl (int): The time to live for the observe request (not currently used in the implementation)
    • A general implementation may look like the following:
	def startObserver(self, resource: ResourceNameEnum = None, name: str = None, ttl: int = IRequestResponseClient.DEFAULT_TTL) -> bool:
		if resource or name:
			resourcePath = self._createResourcePath(resource, name)
		
			if resourcePath in self.observeTasks:
				logging.warning(f"Already observing resource {resourcePath}. Ignoring start observe request.")
				return False

			fullPath = self.uriPath + resourcePath

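			# run_coroutine_threadsafe() expects an asyncio event loop as its second
			# argument, so self._eventLoopThread must reference the running loop.
			task = asyncio.run_coroutine_threadsafe(
				self._handleStartObserveRequest(fullPath),
				self._eventLoopThread
			)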

			self.observeTasks[resourcePath] = task  # Store in Tasks

			logging.info(f"Started observing: {resourcePath}")
			return True
		else:
			logging.warning("Can't issue Async OBSERVE - GET - no path provided.")
			return False
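
The scheduling step above relies on an event loop that runs in a separate thread. The following is a minimal, standalone sketch of that pattern using only the standard library. The names `loop`, `loopThread`, and `fetchValue` are hypothetical and are not part of the connector; the sketch only illustrates how a coroutine can be submitted from the calling thread and its result collected.

```python
import asyncio
import threading

# Hypothetical stand-in for the connector's event loop, run in a background thread.
loop = asyncio.new_event_loop()
loopThread = threading.Thread(target=loop.run_forever, daemon=True)
loopThread.start()

async def fetchValue(path: str) -> str:
    # Simulates an asynchronous request; no network I/O is performed.
    await asyncio.sleep(0.01)
    return f"response for {path}"

# Submit the coroutine from the calling thread. The second argument is the loop itself.
future = asyncio.run_coroutine_threadsafe(fetchValue("/demo"), loop)

# Block the calling thread until the coroutine finishes (or the timeout expires).
result = future.result(timeout=2.0)
print(result)

# Stop the loop, wait for its thread to exit, then release its resources.
loop.call_soon_threadsafe(loop.stop)
loopThread.join(timeout=2.0)
loop.close()
```

The returned object is a `concurrent.futures.Future`, which is why startObserver() can store it and cancel it later from a non-async method.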

Step 2: Create the OBSERVE stop method

  • Implement the public stopObserver() method
    • This method will take the following arguments:
      • resource (ResourceNameEnum): Used to create the general request URL
      • name (str): Used to extend the general request URL with further detail (if warranted)
    • A general implementation may look like the following:
	def stopObserver(self, resource: ResourceNameEnum = None, name: str = None) -> bool:
		if resource or name:
			resourcePath = self._createResourcePath(resource, name)

			if resourcePath not in self.observeTasks:  # Check Tasks
				logging.warning(f"Resource {resourcePath} not being observed. Ignoring stop observe request.")
				return False
			
			task = self.observeTasks[resourcePath]  # Get from Tasks
			task.cancel()

			cleanup_future = asyncio.run_coroutine_threadsafe(
				self._handleStopObserveRequest(resourcePath, ignoreErr = True),
				self._eventLoopThread
			)

			try:
				cleanup_future.result(timeout = 5.0)
				logging.info(f"Stopped observing: {resourcePath}")
				del self.observeTasks[resourcePath]  # Delete from Tasks
				return True
			
			except Exception as e:
				logging.error(f"Error stopping observation: {e}")
				return False
		else:
			logging.warning("Can't cancel OBSERVE - GET - no path provided.")
			return False
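
The cancel-then-clean-up sequence above can be tried in isolation. This is a minimal sketch, assuming a background event loop like the one used by the connector; `activeRequests` and `observeForever` are hypothetical stand-ins, and no CoAP traffic is involved. It shows that cancelling the future returned by `asyncio.run_coroutine_threadsafe()` cancels the underlying coroutine, whose `finally` block then runs.

```python
import asyncio
import threading

loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

activeRequests = {}            # hypothetical stand-in for self.observeRequests
started = threading.Event()    # set once the coroutine is running
cleaned = threading.Event()    # set once the coroutine's cleanup has run

async def observeForever(key: str):
    activeRequests[key] = "request"
    started.set()
    try:
        # Simulates waiting for notifications until cancelled.
        while True:
            await asyncio.sleep(0.05)
    finally:
        # Runs when the task is cancelled, mirroring the handler's cleanup.
        activeRequests.pop(key, None)
        cleaned.set()

task = asyncio.run_coroutine_threadsafe(observeForever("/demo"), loop)
started.wait(timeout=2.0)

# Cancelling the concurrent future propagates to the task inside the loop.
cancelRequested = task.cancel()
cleanedUp = cleaned.wait(timeout=2.0)

loop.call_soon_threadsafe(loop.stop)
print(cancelRequested, cleanedUp, activeRequests)
```

Because the cleanup happens asynchronously in the loop thread, the sketch waits on an event before inspecting the dictionary; stopObserver() achieves a similar effect by waiting on the cleanup future with a timeout.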

Step 3: Create the internal handler for the OBSERVE start request

  • Implement the internal _handleStartObserveRequest() method
    • This method will issue a GET request with the observe option set, then pass the initial response and every subsequent notification to the _onGetResponse() callback. The server sends notifications asynchronously whenever the data associated with the initial request is updated.
    • _handleStartObserveRequest() will take the following arguments:
      • resourcePath (str): This will be the full resource path used for the request
  • A general implementation may look like the following:
	async def _handleStartObserveRequest(self, resourcePath: str = None):
		logging.info(f"Handle start observe invoked. Waiting for each input: {resourcePath}")
		
		try:
			msg = Message(code = Code.GET, uri = resourcePath, observe = 0)
			req = self.clientContext.request(msg)

			# store with relative path as key
			# needed for later cleanup
			relativePath = resourcePath.replace(self.uriPath, "")
			self.observeRequests[relativePath] = req

			# get initial response
			responseData = await req.response
			self._onGetResponse(responseData)
			
			# continue observation
			async for responseData in req.observation:
				self._onGetResponse(responseData)
				
		except asyncio.CancelledError:
			# expected
			logging.info(f"Observation cancelled for {resourcePath}")

		except Exception as e:
			logging.warning(f"Failed to execute OBSERVE - GET. Error: {e}")
			traceback.print_exception(type(e), e, e.__traceback__)

		finally:
			relativePath = resourcePath.replace(self.uriPath, "")

			if relativePath in self.observeRequests:
				del self.observeRequests[relativePath]
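
The handler's control flow (await the first response, then iterate over notifications) can be exercised without a CoAP server. The following is a sketch only: `FakeRequest` and `FakeObservation` are hypothetical stand-ins written for this example and are not aiocoap classes. They merely expose a `response` awaitable and an `observation` async iterator so the consumption pattern can be run locally.

```python
import asyncio

class FakeObservation:
    """Hypothetical stand-in: an async iterator that yields queued notifications."""
    def __init__(self, values):
        self._values = list(values)

    def __aiter__(self):
        return self

    async def __anext__(self):
        if not self._values:
            raise StopAsyncIteration
        await asyncio.sleep(0)  # yield control, as a real notification wait would
        return self._values.pop(0)

class FakeRequest:
    """Hypothetical stand-in exposing 'response' and 'observation' attributes."""
    def __init__(self, first, updates):
        self.response = asyncio.get_running_loop().create_future()
        self.response.set_result(first)
        self.observation = FakeObservation(updates)

received = []

async def consume():
    req = FakeRequest("initial", ["update-1", "update-2"])
    # First, the response to the initial GET.
    received.append(await req.response)
    # Then, each notification as it arrives.
    async for notification in req.observation:
        received.append(notification)

asyncio.run(consume())
print(received)
```

In the real handler the loop does not end on its own; it continues until the observation is cancelled or an error occurs, which is why the `finally` cleanup matters.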

Step 4: Create the internal handler for the OBSERVE stop request

  • Implement the internal _handleStopObserveRequest() method
    • This method will cancel the active observation for the given resource and remove the stored request from the observeRequests dictionary.
    • _handleStopObserveRequest() will take the following arguments:
      • resourcePath (str): This will be the relative resource path used as the key when the observe request was stored
      • ignoreErr (bool): If True, suppresses the warning messages logged when the cancel or cleanup fails, or when the resource is not under observation
  • A general implementation may look like the following:
	async def _handleStopObserveRequest(self, resourcePath: str = None, ignoreErr: bool = False):
		if resourcePath in self.observeRequests:
			logging.info(f"Handle stop observe invoked: {resourcePath}")

			try:
				observeRequest = self.observeRequests[resourcePath]
				observeRequest.observation.cancel()

			except Exception as e:
				if not ignoreErr:
					logging.warning(f"Failed to cancel OBSERVE - GET: {resourcePath}")

			try:
				del self.observeRequests[resourcePath]

			except Exception as e:
				if not ignoreErr:
					logging.warning(f"Failed to remove observable from list: {resourcePath}")

		else:
			if not ignoreErr:
				logging.warning(f"Resource not currently under observation. Ignoring: {resourcePath}")

IMPORTANT NOTE

  • Be sure to implement the stopObserver() functionality. After startObserver() is invoked, the CoAP server will keep attempting to send updates to the CoAP client until either the server decides to stop on its own (due to an error sending updates to the client) or the client explicitly tells the server to stop (via stopObserver()).
  • Keep in mind the CoAP server needs to implement the observe functionality in the appropriate handlers for observe to work properly.
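
One way to make sure the stop call always happens is to pair the start and stop calls in a try / finally block. The following is a minimal sketch of that idea. `StubClient` is a hypothetical stand-in that only records calls, and its methods take a simplified single `name` argument rather than the connector's actual parameters.

```python
class StubClient:
    """Hypothetical stand-in for the connector; it only tracks observed names."""
    def __init__(self):
        self.observing = set()

    def startObserver(self, name: str) -> bool:
        if name in self.observing:
            return False
        self.observing.add(name)
        return True

    def stopObserver(self, name: str) -> bool:
        if name not in self.observing:
            return False
        self.observing.discard(name)
        return True

client = StubClient()
stopped = False

try:
    client.startObserver("ActuatorCmd")
    # Simulate a failure that occurs while the observation is active.
    raise RuntimeError("simulated failure while observing")
except RuntimeError:
    pass
finally:
    # Runs whether or not the body raised, so the observation is always released.
    stopped = client.stopObserver("ActuatorCmd")

print(stopped, client.observing)
```

The same structure can be applied in a test case so that a failed assertion does not leave the server sending notifications to a client that is no longer listening.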

Estimate

  • Medium

Tests

  • Edit / add test cases
    • Update the test_CoapAsyncClientConnectorTest.py module (containing the CoapAsyncClientConnectorTest class) in CDA_HOME/tests/integration/connection as indicated below.
      • If not already done for you within the code, disable the existing tests by UNCOMMENTING the skip test annotation before each test case (change #@unittest.skip("Ignore for now.") to @unittest.skip("Ignore for now.")).
      • If not already done for you within the code, create a new test case named testActuatorCommandObserve() to handle the start and stop observation calls. It can also be helpful to create helper methods to support this test case, as follows:
#@unittest.skip("Ignore for now.")
def testActuatorCommandObserve(self):
	self._startObserver()
	sleep(20) # feel free to change this to a different value
	self._stopObserver()
	
def _startObserver(self):
	self.coapClient.startObserver(resource = ResourceNameEnum.CDA_ACTUATOR_CMD_RESOURCE)

def _stopObserver(self):
	self.coapClient.stopObserver(resource = ResourceNameEnum.CDA_ACTUATOR_CMD_RESOURCE)
  • Setup
    • Start Wireshark, and ensure it's watching the loopback adapter.
      • You can filter on 'coap' to track only CoAP messages, which is recommended for this test.
    • Start your GDA application with the CoAP server enabled. Make sure it runs for a couple of minutes - long enough for you to run the integration tests listed below.
      • To run your GDA from the command line, run the following from a shell (assuming you're starting from the parent directory containing your Java components, and that the directory is named 'piot-java-components'):
        • NOTE: Be sure to keep your GDA Java app running for a few minutes so you have time to run the tests.
cd piot-java-components
mvn install -DskipTests
java -jar target/gateway-device-app-0.0.1-jar-with-dependencies.jar
  • Run the OBSERVE tests
    • Run the testActuatorCommandObserve() test case and watch the output in the console for the client as well as within Wireshark.

      • NOTE 1: If you're a student in the Connected Devices course, be sure to follow the instructions in PIOT-INF-09-003.
      • NOTE 2: For the test to work as shown in the sample log output, you'll need to update your GDA's GetActuatorCommandResourceHandler class to do the following:
        • Handle actuator data updates (via a callback to the class from the IDataMessageListener, or using an internal thread / scheduler that updates the locally stored ActuatorData instance at regular intervals).
        • Notify any observers of the resource when the ActuatorData instance is updated, via the CoapResource instance's changed() method.
        • Handle the removal of any resource observers, as appropriate.
    • Your output will contain log content similar to the following:


Metadata

  • Project status: Lab Module 09 - CoAP Clients