From f853216ba96836e146bbebbe00d3603ab9ed9d4d Mon Sep 17 00:00:00 2001 From: Ani Sinha Date: Wed, 1 Apr 2026 12:50:48 +0530 Subject: [PATCH 1/3] feat(dhcp): Add network manager lease parsing capability (#6829) DHCP leases can be directly obtained from network manager through appropriate command to the network manager cli. Add a couple of helper functions to get the lease information from network manager. In a subsequent patch, we will use the helper function from cloud stack datasource to get the lease information from network manager. Signed-off-by: Ani Sinha Co-authored-by: Chad Smith --- cloudinit/net/dhcp.py | 83 ++++++++++++++++++++++++++++++++ tests/unittests/net/test_dhcp.py | 67 ++++++++++++++++++++++++++ 2 files changed, 150 insertions(+) diff --git a/cloudinit/net/dhcp.py b/cloudinit/net/dhcp.py index 6620b15da26..8eb17b2d7c4 100644 --- a/cloudinit/net/dhcp.py +++ b/cloudinit/net/dhcp.py @@ -29,6 +29,7 @@ DHCLIENT_FALLBACK_LEASE_DIR = "/var/lib/dhclient" # Match something.lease or something.leases DHCLIENT_FALLBACK_LEASE_REGEX = r".+\.leases?$" +NMCLI = "nmcli" UDHCPC_SCRIPT = """#!/bin/sh log() { echo "udhcpc[$PPID]" "$interface: $2" @@ -147,6 +148,88 @@ def networkd_get_option_from_leases(keyname, leases_d=None): return None +def run_nmcli(nmcli_opts): + nmcli_path = subp.which(NMCLI) + if not nmcli_path: + raise NoDHCPLeaseMissingDhclientError() + + command = [nmcli_path] + nmcli_opts + try: + out, _ = subp.subp( + command, + ) + except subp.ProcessExecutionError as error: + LOG.debug( + "nmcli command exited with code: %s stderr: %r stdout: %r", + error.exit_code, + error.stderr, + error.stdout, + ) + raise NoDHCPLeaseError from error + return out + + +def network_manager_load_leases(device, dhcp6=False): + """Return a dictionary of lease options as + returned by network manager cli""" + + if dhcp6: + opts = ["--fields", "DHCP6", "device", "show"] + else: + opts = ["--fields", "DHCP4", "device", "show"] + + opts.append(device) + nmcli_out = run_nmcli(opts) + + content = [] + for line in nmcli_out.splitlines(): + line = line.partition(":")[2].strip() + content.append(line) + + return dict(configobj.ConfigObj(content, list_values=False)) + + +def find_correct_device_nmcli(device): + """If device is specified, return the value of the lease parameter + specified by 'keyname' for that device. Else return the lease value for + the first connected device as returned by 'nmcli'""" + + device_list_opts = ["--terse", "device"] + nmcli_out = run_nmcli(device_list_opts) + + for line in nmcli_out.splitlines(): + if line == "": + continue + dev_name = line.split(":")[0] + state = line.split(":")[2] + + # skip devices that are not connected + if state != "connected": + continue + # skip loopback + if dev_name == "lo": + continue + # if no device name is passed, use the first one found. + if device is None: + return dev_name + # else use the name of the device passed. + elif device and device.lower() == dev_name.lower(): + return dev_name + return None + + +def network_manager_get_option_from_leases(keyname, device=None, dhcp6=False): + + leases = None + dev = find_correct_device_nmcli(device) + if dev: + leases = network_manager_load_leases(dev, dhcp6) + + if leases and leases.get(keyname): + return leases[keyname] + return None + + class DhcpClient(abc.ABC): client_name = "" timeout = 10 diff --git a/tests/unittests/net/test_dhcp.py b/tests/unittests/net/test_dhcp.py index ec1b8ef83ac..7aa77f10ddb 100644 --- a/tests/unittests/net/test_dhcp.py +++ b/tests/unittests/net/test_dhcp.py @@ -20,7 +20,9 @@ NoDHCPLeaseInterfaceError, NoDHCPLeaseMissingDhclientError, Udhcpc, + find_correct_device_nmcli, maybe_perform_dhcp_discovery, + network_manager_load_leases, networkd_load_leases, ) from cloudinit.net.ephemeral import EphemeralDHCPv4 @@ -1392,3 +1394,68 @@ def test_none_and_missing_fallback(self): with pytest.raises(NoDHCPLeaseInterfaceError): distro = mock.Mock(fallback_interface=None) maybe_perform_dhcp_discovery(distro, None) + + +class TestNMDhcpLeases: + def test_find_correct_device_firstmatch(self): + with mock.patch( + "cloudinit.net.dhcp.run_nmcli", + return_value=dedent( + """ + ens160:ethernet:connected:Wired connection 1 + ens256:ethernet:connected:Wired connection 2 + lo:loopback:connected (externally):lo + """ + ), + ): + ret = find_correct_device_nmcli(None) + assert ret == "ens160" + + def test_find_correct_device_nomatch(self): + with mock.patch( + "cloudinit.net.dhcp.run_nmcli", + return_value=dedent( + """ + ens160:ethernet:connected:Wired connection 1 + ens256:ethernet:connected:Wired connection 2 + lo:loopback:connected (externally):lo + """ + ), + ): + ret = find_correct_device_nmcli("ens250") + assert ret is None + + def test_find_correct_device_second_match(self): + with mock.patch( + "cloudinit.net.dhcp.run_nmcli", + return_value=dedent( + """ + ens160:ethernet:connected:Wired connection 1 + ens256:ethernet:connected:Wired connection 2 + lo:loopback:connected (externally):lo + """ + ), + ): + ret = find_correct_device_nmcli("ens256") + assert ret == "ens256" + + def test_network_manager_load_leases(self): + expected_return = { + "broadcast_address": "172.16.127.255", + "dhcp_client_identifier": "01:00:0c:29:bf:c5:56", + "dhcp_lease_time": "1800", + "dhcp_server_identifier": "172.16.127.254", + } + with mock.patch( + "cloudinit.net.dhcp.run_nmcli", + return_value=dedent( + """ + DHCP4.OPTION[1]: broadcast_address = 172.16.127.255 + DHCP4.OPTION[2]: dhcp_client_identifier = 01:00:0c:29:bf:c5:56 + DHCP4.OPTION[3]: dhcp_lease_time = 1800 + DHCP4.OPTION[4]: dhcp_server_identifier = 172.16.127.254 + """ + ), + ): + ret = network_manager_load_leases("ens10") + assert ret == expected_return From 7f01176df25b0319891c2d5043393d682b05849e Mon Sep 17 00:00:00 2001 From: Ani Sinha Date: Wed, 1 Apr 2026 18:22:47 +0530 Subject: [PATCH 2/3] test: fix test_get_domainname_isc_dhclient mock leak (#6829) Without mocking out dhcp.IscDhclient.get_newest_lease_file_from_distro, the functions on some platforms returns None. This means get_key_from_latest_lease() bails out early without calling parse_leases() to parse the lease file. Fix it. Signed-off-by: Ani Sinha --- tests/unittests/sources/test_cloudstack.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/unittests/sources/test_cloudstack.py b/tests/unittests/sources/test_cloudstack.py index f276f274e64..9e524cd2e95 100644 --- a/tests/unittests/sources/test_cloudstack.py +++ b/tests/unittests/sources/test_cloudstack.py @@ -108,6 +108,10 @@ def test_get_domainname_isc_dhclient(self, cloudstack_ds, mocker): DHCP_MOD_PATH + ".networkd_get_option_from_leases", get_networkd_domain, ) + mocker.patch( + MOD_PATH + ".dhcp.IscDhclient.get_newest_lease_file_from_distro", + return_value=True, + ) with patch( MOD_PATH + ".util.load_text_file", From 96a8a02c884e7cf4a4adf259752fb8741d360d22 Mon Sep 17 00:00:00 2001 From: Ani Sinha Date: Wed, 1 Apr 2026 13:14:05 +0530 Subject: [PATCH 3/3] fix(cloudstack): get domain name information from network manager leases (#6829) Red Hat uses network manager as the supported dhcp client. If network manager cli is available, we should try to get domain name information directly from the network manager leases before asking distro specific dhcp client (dhcpcd by default). Signed-off-by: Ani Sinha Co-authored-by: Chad Smith --- cloudinit/sources/DataSourceCloudStack.py | 15 +- tests/unittests/sources/test_cloudstack.py | 172 +++++++++++++++++++++ 2 files changed, 186 insertions(+), 1 deletion(-) diff --git a/cloudinit/sources/DataSourceCloudStack.py b/cloudinit/sources/DataSourceCloudStack.py index 265727cd60c..bf63605c62b 100644 --- a/cloudinit/sources/DataSourceCloudStack.py +++ b/cloudinit/sources/DataSourceCloudStack.py @@ -90,6 +90,7 @@ def _get_domainname(self): """Try obtaining a "domain-name" DHCP lease parameter: - From systemd-networkd lease (case-insensitive) - From ISC dhclient + - From network manager dhcp client - From dhcpcd (ephemeral) - Return empty string if not found (non-fatal) """ @@ -113,7 +114,19 @@ def _get_domainname(self): LOG.debug( "Could not obtain FQDN from ISC dhclient leases. Falling back to " - "%s", + "Network Manager leases" + ) + with suppress( + dhcp.NoDHCPLeaseMissingDhclientError, dhcp.NoDHCPLeaseError + ): + domain_name = dhcp.network_manager_get_option_from_leases( + "domain_name" + ) + if domain_name: + return domain_name.strip() + + LOG.debug( + "Could not obtain FQDN from NM leases. Falling back to %s", self.distro.dhcp_client.client_name, ) try: diff --git a/tests/unittests/sources/test_cloudstack.py b/tests/unittests/sources/test_cloudstack.py index 9e524cd2e95..a29fc4554b1 100644 --- a/tests/unittests/sources/test_cloudstack.py +++ b/tests/unittests/sources/test_cloudstack.py @@ -67,6 +67,7 @@ def setup(self, mocker, tmp_path): self.hostname = "vm-hostname" self.networkd_domainname = "networkd.local" self.isc_dhclient_domainname = "dhclient.local" + self.nm_domainname = "nm.local" get_hostname_parent = mock.MagicMock( return_value=DataSourceHostname(self.hostname, True) @@ -145,6 +146,171 @@ def test_get_domainname_isc_dhclient(self, cloudstack_ds, mocker): result = cloudstack_ds._get_domainname() assert self.isc_dhclient_domainname == result + def test_get_domainname_network_manager(self, cloudstack_ds, mocker): + """ + Test if DataSourceCloudStack._get_domainname() + gets domain name from nmcli lease information + """ + nmcliop = f"DHCP4.OPTION[5]: domain_name = {self.nm_domainname}" + get_networkd_domain = mock.MagicMock(return_value=None) + mocker.patch( + DHCP_MOD_PATH + ".networkd_get_option_from_leases", + get_networkd_domain, + ) + mocker.patch( + MOD_PATH + ".dhcp.IscDhclient.get_newest_lease_file_from_distro", + return_value=True, + ) + + mocker.patch( + MOD_PATH + ".util.load_text_file", + return_value=None, + ) + + mocker.patch( + DHCP_MOD_PATH + ".find_correct_device_nmcli", + return_value="ens120", + ) + + with patch( + DHCP_MOD_PATH + ".run_nmcli", + return_value=dedent( + """ + DHCP4.OPTION[1]: broadcast_address = 172.16.127.255 + DHCP4.OPTION[2]: dhcp_client_identifier = 01:00:0c:29:bf:c5:56 + DHCP4.OPTION[3]: dhcp_lease_time = 1800 + DHCP4.OPTION[4]: dhcp_server_identifier = 172.16.127.254 + """ + + nmcliop + + """ + DHCP4.OPTION[6]: domain_name_servers = 172.16.127.2 + DHCP4.OPTION[7]: expiry = 1775029195 + DHCP4.OPTION[8]: ip_address = 172.16.127.135 + DHCP4.OPTION[9]: next_server = 172.16.127.254 + """ + ), + ): + result = cloudstack_ds._get_domainname() + assert self.nm_domainname == result + + def test_get_domainname_nm_multi_conn_nic(self, cloudstack_ds, mocker): + """ + Test if DataSourceCloudStack._get_domainname() + can handle multi connected nic environments. + """ + domain_name = f"{self.nm_domainname}" + get_networkd_domain = mock.MagicMock(return_value=None) + mocker.patch( + DHCP_MOD_PATH + ".networkd_get_option_from_leases", + get_networkd_domain, + ) + mocker.patch( + MOD_PATH + ".dhcp.IscDhclient.get_newest_lease_file_from_distro", + return_value=True, + ) + + mocker.patch( + MOD_PATH + ".util.load_text_file", + return_value=None, + ) + + mocker.patch( + DHCP_MOD_PATH + ".run_nmcli", + return_value=dedent( + """ + ens160:ethernet:connected:Wired connection 1 + ens256:ethernet:connected:Wired connection 2 + lo:loopback:connected (externally):lo + """ + ), + ) + + with patch( + DHCP_MOD_PATH + ".network_manager_load_leases", + return_value={"domain_name": domain_name}, + ): + result = cloudstack_ds._get_domainname() + assert self.nm_domainname == result + + def test_get_domainname_nm_single_conn_nic(self, cloudstack_ds, mocker): + """ + Test if DataSourceCloudStack._get_domainname() + can handle one connected nic environments. + """ + domain_name = f"{self.nm_domainname}" + get_networkd_domain = mock.MagicMock(return_value=None) + mocker.patch( + DHCP_MOD_PATH + ".networkd_get_option_from_leases", + get_networkd_domain, + ) + mocker.patch( + MOD_PATH + ".dhcp.IscDhclient.get_newest_lease_file_from_distro", + return_value=True, + ) + + mocker.patch( + MOD_PATH + ".util.load_text_file", + return_value=None, + ) + + mocker.patch( + DHCP_MOD_PATH + ".run_nmcli", + return_value=dedent( + """ + ens160:ethernet:connected:Wired connection 1 + lo:loopback:connected (externally):lo + ens256:ethernet:unavailable: + """ + ), + ) + + with patch( + DHCP_MOD_PATH + ".network_manager_load_leases", + return_value={"domain_name": domain_name}, + ): + result = cloudstack_ds._get_domainname() + assert self.nm_domainname == result + + def test_get_domainname_nm_no_conn_nic( + self, cloudstack_ds, mocker, caplog + ): + """ + Test if DataSourceCloudStack._get_domainname() + can handle one connected nic environments. + """ + get_networkd_domain = mock.MagicMock(return_value=None) + mocker.patch( + DHCP_MOD_PATH + ".networkd_get_option_from_leases", + get_networkd_domain, + ) + mocker.patch( + MOD_PATH + ".dhcp.IscDhclient.get_newest_lease_file_from_distro", + return_value=True, + ) + + mocker.patch( + MOD_PATH + ".util.load_text_file", + return_value=None, + ) + + mocker.patch( + DHCP_MOD_PATH + ".run_nmcli", + return_value=dedent( + """ + lo:loopback:connected (externally):lo + ens256:ethernet:unavailable: + """ + ), + ) + + result = cloudstack_ds._get_domainname() + assert "Could not obtain FQDN from NM leases" in caplog.text + assert ( + "No domain name found in any DHCP lease; returning empty" + in caplog.text + ) + assert result == "" + def test_get_hostname_non_fqdn(self, cloudstack_ds): """ Test get_hostname() method implementation @@ -190,6 +356,12 @@ def test_get_hostname_fqdn_fallback(self, cloudstack_ds, mocker): get_networkd_domain, ) + get_nm_domain = mock.MagicMock(return_value=None) + mocker.patch( + DHCP_MOD_PATH + ".network_manager_get_option_from_leases", + get_nm_domain, + ) + mocker.patch( "cloudinit.distros.net.find_fallback_nic", return_value="eth0",