Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 76 additions & 107 deletions apps/controller_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,7 @@
HCI_READ_BD_ADDR_COMMAND,
HCI_READ_BUFFER_SIZE_COMMAND,
HCI_READ_LOCAL_NAME_COMMAND,
HCI_SUCCESS,
CodecID,
HCI_Command,
HCI_Command_Complete_Event,
HCI_Command_Status_Event,
HCI_LE_Read_Buffer_Size_Command,
HCI_LE_Read_Buffer_Size_V2_Command,
HCI_LE_Read_Maximum_Advertising_Data_Length_Command,
Expand All @@ -59,87 +55,74 @@
from bumble.transport import open_transport


# -----------------------------------------------------------------------------
def command_succeeded(response):
if isinstance(response, HCI_Command_Status_Event):
return response.status == HCI_SUCCESS
if isinstance(response, HCI_Command_Complete_Event):
return response.return_parameters.status == HCI_SUCCESS
return False


# -----------------------------------------------------------------------------
async def get_classic_info(host: Host) -> None:
if host.supports_command(HCI_READ_BD_ADDR_COMMAND):
response = await host.send_command(HCI_Read_BD_ADDR_Command())
if command_succeeded(response):
print()
print(
color('Public Address:', 'yellow'),
response.return_parameters.bd_addr.to_string(False),
)
response1 = await host.send_sync_command(HCI_Read_BD_ADDR_Command())
print()
print(
color('Public Address:', 'yellow'),
response1.bd_addr.to_string(False),
)

if host.supports_command(HCI_READ_LOCAL_NAME_COMMAND):
response = await host.send_command(HCI_Read_Local_Name_Command())
if command_succeeded(response):
print()
print(
color('Local Name:', 'yellow'),
map_null_terminated_utf8_string(response.return_parameters.local_name),
)
response2 = await host.send_sync_command(HCI_Read_Local_Name_Command())
print()
print(
color('Local Name:', 'yellow'),
map_null_terminated_utf8_string(response2.local_name),
)


# -----------------------------------------------------------------------------
async def get_le_info(host: Host) -> None:
print()

if host.supports_command(HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND):
response = await host.send_command(
response1 = await host.send_sync_command(
HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command()
)
if command_succeeded(response):
print(
color('LE Number Of Supported Advertising Sets:', 'yellow'),
response.return_parameters.num_supported_advertising_sets,
'\n',
)
print(
color('LE Number Of Supported Advertising Sets:', 'yellow'),
response1.num_supported_advertising_sets,
'\n',
)

if host.supports_command(HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND):
response = await host.send_command(
response2 = await host.send_sync_command(
HCI_LE_Read_Maximum_Advertising_Data_Length_Command()
)
if command_succeeded(response):
print(
color('LE Maximum Advertising Data Length:', 'yellow'),
response.return_parameters.max_advertising_data_length,
'\n',
)
print(
color('LE Maximum Advertising Data Length:', 'yellow'),
response2.max_advertising_data_length,
'\n',
)

if host.supports_command(HCI_LE_READ_MAXIMUM_DATA_LENGTH_COMMAND):
response = await host.send_command(HCI_LE_Read_Maximum_Data_Length_Command())
if command_succeeded(response):
print(
color('Maximum Data Length:', 'yellow'),
(
f'tx:{response.return_parameters.supported_max_tx_octets}/'
f'{response.return_parameters.supported_max_tx_time}, '
f'rx:{response.return_parameters.supported_max_rx_octets}/'
f'{response.return_parameters.supported_max_rx_time}'
),
'\n',
)
response3 = await host.send_sync_command(
HCI_LE_Read_Maximum_Data_Length_Command()
)
print(
color('Maximum Data Length:', 'yellow'),
(
f'tx:{response3.supported_max_tx_octets}/'
f'{response3.supported_max_tx_time}, '
f'rx:{response3.supported_max_rx_octets}/'
f'{response3.supported_max_rx_time}'
),
'\n',
)

if host.supports_command(HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND):
response = await host.send_command(
response4 = await host.send_sync_command(
HCI_LE_Read_Suggested_Default_Data_Length_Command()
)
if command_succeeded(response):
print(
color('Suggested Default Data Length:', 'yellow'),
f'{response.return_parameters.suggested_max_tx_octets}/'
f'{response.return_parameters.suggested_max_tx_time}',
'\n',
)
print(
color('Suggested Default Data Length:', 'yellow'),
f'{response4.suggested_max_tx_octets}/'
f'{response4.suggested_max_tx_time}',
'\n',
)

print(color('LE Features:', 'yellow'))
for feature in host.supported_le_features:
Expand All @@ -151,37 +134,31 @@ async def get_flow_control_info(host: Host) -> None:
print()

if host.supports_command(HCI_READ_BUFFER_SIZE_COMMAND):
response = await host.send_command(
HCI_Read_Buffer_Size_Command(), check_result=True
)
response1 = await host.send_sync_command(HCI_Read_Buffer_Size_Command())
print(
color('ACL Flow Control:', 'yellow'),
f'{response.return_parameters.hc_total_num_acl_data_packets} '
f'packets of size {response.return_parameters.hc_acl_data_packet_length}',
f'{response1.hc_total_num_acl_data_packets} '
f'packets of size {response1.hc_acl_data_packet_length}',
)

if host.supports_command(HCI_LE_READ_BUFFER_SIZE_V2_COMMAND):
response = await host.send_command(
HCI_LE_Read_Buffer_Size_V2_Command(), check_result=True
)
response2 = await host.send_sync_command(HCI_LE_Read_Buffer_Size_V2_Command())
print(
color('LE ACL Flow Control:', 'yellow'),
f'{response.return_parameters.total_num_le_acl_data_packets} '
f'packets of size {response.return_parameters.le_acl_data_packet_length}',
f'{response2.total_num_le_acl_data_packets} '
f'packets of size {response2.le_acl_data_packet_length}',
)
print(
color('LE ISO Flow Control:', 'yellow'),
f'{response.return_parameters.total_num_iso_data_packets} '
f'packets of size {response.return_parameters.iso_data_packet_length}',
f'{response2.total_num_iso_data_packets} '
f'packets of size {response2.iso_data_packet_length}',
)
elif host.supports_command(HCI_LE_READ_BUFFER_SIZE_COMMAND):
response = await host.send_command(
HCI_LE_Read_Buffer_Size_Command(), check_result=True
)
response3 = await host.send_sync_command(HCI_LE_Read_Buffer_Size_Command())
print(
color('LE ACL Flow Control:', 'yellow'),
f'{response.return_parameters.total_num_le_acl_data_packets} '
f'packets of size {response.return_parameters.le_acl_data_packet_length}',
f'{response3.total_num_le_acl_data_packets} '
f'packets of size {response3.le_acl_data_packet_length}',
)


Expand All @@ -190,52 +167,44 @@ async def get_codecs_info(host: Host) -> None:
print()

if host.supports_command(HCI_Read_Local_Supported_Codecs_V2_Command.op_code):
response = await host.send_command(
HCI_Read_Local_Supported_Codecs_V2_Command(), check_result=True
response1 = await host.send_sync_command(
HCI_Read_Local_Supported_Codecs_V2_Command()
)
print(color('Codecs:', 'yellow'))

for codec_id, transport in zip(
response.return_parameters.standard_codec_ids,
response.return_parameters.standard_codec_transports,
response1.standard_codec_ids,
response1.standard_codec_transports,
):
transport_name = HCI_Read_Local_Supported_Codecs_V2_Command.Transport(
transport
).name
codec_name = CodecID(codec_id).name
print(f' {codec_name} - {transport_name}')
print(f' {codec_id.name} - {transport.name}')

for codec_id, transport in zip(
response.return_parameters.vendor_specific_codec_ids,
response.return_parameters.vendor_specific_codec_transports,
for vendor_codec_id, vendor_transport in zip(
response1.vendor_specific_codec_ids,
response1.vendor_specific_codec_transports,
):
transport_name = HCI_Read_Local_Supported_Codecs_V2_Command.Transport(
transport
).name
company = name_or_number(COMPANY_IDENTIFIERS, codec_id >> 16)
print(f' {company} / {codec_id & 0xFFFF} - {transport_name}')
company = name_or_number(COMPANY_IDENTIFIERS, vendor_codec_id >> 16)
print(f' {company} / {vendor_codec_id & 0xFFFF} - {vendor_transport.name}')

if not response.return_parameters.standard_codec_ids:
if not response1.standard_codec_ids:
print(' No standard codecs')
if not response.return_parameters.vendor_specific_codec_ids:
if not response1.vendor_specific_codec_ids:
print(' No Vendor-specific codecs')

if host.supports_command(HCI_Read_Local_Supported_Codecs_Command.op_code):
response = await host.send_command(
HCI_Read_Local_Supported_Codecs_Command(), check_result=True
response2 = await host.send_sync_command(
HCI_Read_Local_Supported_Codecs_Command()
)
print(color('Codecs (BR/EDR):', 'yellow'))
for codec_id in response.return_parameters.standard_codec_ids:
codec_name = CodecID(codec_id).name
print(f' {codec_name}')
for codec_id in response2.standard_codec_ids:
print(f' {codec_id.name}')

for codec_id in response.return_parameters.vendor_specific_codec_ids:
company = name_or_number(COMPANY_IDENTIFIERS, codec_id >> 16)
print(f' {company} / {codec_id & 0xFFFF}')
for vendor_codec_id in response2.vendor_specific_codec_ids:
company = name_or_number(COMPANY_IDENTIFIERS, vendor_codec_id >> 16)
print(f' {company} / {vendor_codec_id & 0xFFFF}')

if not response.return_parameters.standard_codec_ids:
if not response2.standard_codec_ids:
print(' No standard codecs')
if not response.return_parameters.vendor_specific_codec_ids:
if not response2.vendor_specific_codec_ids:
print(' No Vendor-specific codecs')


Expand Down
20 changes: 11 additions & 9 deletions apps/controller_loopback.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def on_l2cap_pdu(self, connection_handle: int, cid: int, pdu: bytes):
print(color('@@@ Received last packet', 'green'))
self.done.set()

async def run(self):
async def run(self) -> None:
"""Run a loopback throughput test"""
print(color('>>> Connecting to HCI...', 'green'))
async with await open_transport(self.transport) as (
Expand All @@ -100,11 +100,15 @@ async def run(self):
# make sure data can fit in one l2cap pdu
l2cap_header_size = 4

max_packet_size = (
packet_queue = (
host.acl_packet_queue
if host.acl_packet_queue
else host.le_acl_packet_queue
).max_packet_size - l2cap_header_size
)
if packet_queue is None:
print(color('!!! No packet queue', 'red'))
return
max_packet_size = packet_queue.max_packet_size - l2cap_header_size
if self.packet_size > max_packet_size:
print(
color(
Expand All @@ -128,20 +132,18 @@ async def run(self):
loopback_mode = LoopbackMode.LOCAL

print(color('### Setting loopback mode', 'blue'))
await host.send_command(
await host.send_sync_command(
HCI_Write_Loopback_Mode_Command(loopback_mode=LoopbackMode.LOCAL),
check_result=True,
)

print(color('### Checking loopback mode', 'blue'))
response = await host.send_command(
HCI_Read_Loopback_Mode_Command(), check_result=True
)
if response.return_parameters.loopback_mode != loopback_mode:
response = await host.send_sync_command(HCI_Read_Loopback_Mode_Command())
if response.loopback_mode != loopback_mode:
print(color('!!! Loopback mode mismatch', 'red'))
return

await self.connection_event.wait()
assert self.connection_handle is not None
print(color('### Connected', 'cyan'))

print(color('=== Start sending', 'magenta'))
Expand Down
2 changes: 1 addition & 1 deletion bumble/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ def on_hci_command_packet(self, command: hci.HCI_Command) -> None:
hci.HCI_Command_Complete_Event(
num_hci_command_packets=1,
command_opcode=command.op_code,
return_parameters=result,
return_parameters=hci.HCI_GenericReturnParameters(data=result),
)
)

Expand Down
2 changes: 1 addition & 1 deletion bumble/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -923,7 +923,7 @@ class DeviceClass:
# pylint: enable=line-too-long

@staticmethod
def split_class_of_device(class_of_device):
def split_class_of_device(class_of_device: int) -> tuple[int, int, int]:
# Split the bit fields of the composite class of device value into:
# (service_classes, major_device_class, minor_device_class)
return (
Expand Down
Loading
Loading