Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion aries_cloudagent/core/conductor.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,9 +531,11 @@ async def _queue_external(
targets = (
[outbound.target] if outbound.target else (outbound.target_list or [])
)
queue = await self.outbound_transport_manager.encode_message_external_queue(profile, outbound)

for target in targets:
await self.outbound_queue.enqueue_message(
outbound.payload, target.endpoint
queue.payload, target.endpoint
)

return OutboundSendStatus.SENT_TO_EXTERNAL_QUEUE
Expand Down
28 changes: 26 additions & 2 deletions aries_cloudagent/transport/outbound/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,29 @@ def get_transport_instance(self, transport_id: str) -> BaseOutboundTransport:
"""Get an instance of a running transport by ID."""
return self.running_transports[transport_id]


async def encode_message_external_queue(self, profile: Profile, outbound: OutboundMessage):
"""
Add an outbound message to the queue.

Args:
profile: The active profile for the request
outbound: The outbound message to deliver
"""
targets = [outbound.target] if outbound.target else (outbound.target_list or [])
transport_id = None
for target in targets:
endpoint = target.endpoint

queued = QueuedOutboundMessage(profile, outbound, target, None)

if queued.message and queued.message.enc_payload:
queued.payload = queued.message.enc_payload
else:
await self.perform_encode(queued)

return queued

def enqueue_message(self, profile: Profile, outbound: OutboundMessage):
"""
Add an outbound message to the queue.
Expand Down Expand Up @@ -423,8 +446,9 @@ def encode_queued_message(self, queued: QueuedOutboundMessage) -> asyncio.Task:

async def perform_encode(self, queued: QueuedOutboundMessage):
"""Perform message encoding."""
transport = self.get_transport_instance(queued.transport_id)
wire_format = transport.wire_format or self.context.inject(BaseWireFormat)
#transport = self.get_transport_instance(queued.transport_id)
#wire_format = transport.wire_format or self.context.inject(BaseWireFormat)
wire_format = self.context.inject(BaseWireFormat)
session = await queued.profile.session()
queued.payload = await wire_format.encode_message(
session,
Expand Down