diff --git a/aries_cloudagent/core/conductor.py b/aries_cloudagent/core/conductor.py index 951e7d785a..d8c4743f0b 100644 --- a/aries_cloudagent/core/conductor.py +++ b/aries_cloudagent/core/conductor.py @@ -531,9 +531,11 @@ async def _queue_external( targets = ( [outbound.target] if outbound.target else (outbound.target_list or []) ) + queue = await self.outbound_transport_manager.encode_message_external_queue(profile, outbound) + for target in targets: await self.outbound_queue.enqueue_message( - outbound.payload, target.endpoint + queue.payload, target.endpoint ) return OutboundSendStatus.SENT_TO_EXTERNAL_QUEUE diff --git a/aries_cloudagent/transport/outbound/manager.py b/aries_cloudagent/transport/outbound/manager.py index b9086445cb..2ec5f26d0f 100644 --- a/aries_cloudagent/transport/outbound/manager.py +++ b/aries_cloudagent/transport/outbound/manager.py @@ -234,6 +234,29 @@ def get_transport_instance(self, transport_id: str) -> BaseOutboundTransport: """Get an instance of a running transport by ID.""" return self.running_transports[transport_id] + + async def encode_message_external_queue(self, profile: Profile, outbound: OutboundMessage): + """ + Add an outbound message to the queue. + + Args: + profile: The active profile for the request + outbound: The outbound message to deliver + """ + targets = [outbound.target] if outbound.target else (outbound.target_list or []) + transport_id = None + for target in targets: + endpoint = target.endpoint + + queued = QueuedOutboundMessage(profile, outbound, target, None) + + if queued.message and queued.message.enc_payload: + queued.payload = queued.message.enc_payload + else: + await self.perform_encode(queued) + + return queued + def enqueue_message(self, profile: Profile, outbound: OutboundMessage): """ Add an outbound message to the queue. @@ -423,8 +446,9 @@ def encode_queued_message(self, queued: QueuedOutboundMessage) -> asyncio.Task: async def perform_encode(self, queued: QueuedOutboundMessage): """Perform message encoding.""" - transport = self.get_transport_instance(queued.transport_id) - wire_format = transport.wire_format or self.context.inject(BaseWireFormat) + #transport = self.get_transport_instance(queued.transport_id) + #wire_format = transport.wire_format or self.context.inject(BaseWireFormat) + wire_format = self.context.inject(BaseWireFormat) session = await queued.profile.session() queued.payload = await wire_format.encode_message( session,