Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions cloudinit/net/dhcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
DHCLIENT_FALLBACK_LEASE_DIR = "/var/lib/dhclient"
# Match something.lease or something.leases
DHCLIENT_FALLBACK_LEASE_REGEX = r".+\.leases?$"
NMCLI = "nmcli"
UDHCPC_SCRIPT = """#!/bin/sh
log() {
echo "udhcpc[$PPID]" "$interface: $2"
Expand Down Expand Up @@ -147,6 +148,88 @@ def networkd_get_option_from_leases(keyname, leases_d=None):
return None


def run_nmcli(nmcli_opts):
nmcli_path = subp.which(NMCLI)
if not nmcli_path:
raise NoDHCPLeaseMissingDhclientError()
Comment thread
ani-sinha marked this conversation as resolved.

command = [nmcli_path] + nmcli_opts
try:
out, _ = subp.subp(
command,
)
except subp.ProcessExecutionError as error:
LOG.debug(
"nmcli command exited with code: %s stderr: %r stdout: %r",
error.exit_code,
error.stderr,
error.stdout,
)
raise NoDHCPLeaseError from error
return out


def network_manager_load_leases(device, dhcp6=False):
"""Return a dictionary of lease options as
returned by network manager cli"""

if dhcp6:
opts = ["--fields", "DHCP6", "device", "show"]
else:
opts = ["--fields", "DHCP4", "device", "show"]

opts.append(device)
nmcli_out = run_nmcli(opts)

content = []
for line in nmcli_out.splitlines():
line = line.partition(":")[2].strip()
content.append(line)

return dict(configobj.ConfigObj(content, list_values=False))
Comment thread
ani-sinha marked this conversation as resolved.


def find_correct_device_nmcli(device):
"""If device is specified, return the value of the lease parameter
specified by 'keyname' for that device. Else return the lease value for
the first connected device as returned by 'nmcli'"""

device_list_opts = ["--terse", "device"]
nmcli_out = run_nmcli(device_list_opts)

for line in nmcli_out.splitlines():
if line == "":
continue
dev_name = line.split(":")[0]
state = line.split(":")[2]

# skip devices that are not connected
if state != "connected":
continue
# skip loopback
if dev_name == "lo":
continue
# if no device name is passed, use the first one found.
if device is None:
return dev_name
# else use the name of the device passed.
elif device and device.lower() == dev_name.lower():
return dev_name
return None


def network_manager_get_option_from_leases(keyname, device=None, dhcp6=False):

leases = None
dev = find_correct_device_nmcli(device)
if dev:
leases = network_manager_load_leases(dev, dhcp6)

if leases and leases.get(keyname):
return leases[keyname]
return None


class DhcpClient(abc.ABC):
client_name = ""
timeout = 10
Expand Down
15 changes: 14 additions & 1 deletion cloudinit/sources/DataSourceCloudStack.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ def _get_domainname(self):
"""Try obtaining a "domain-name" DHCP lease parameter:
- From systemd-networkd lease (case-insensitive)
- From ISC dhclient
- From network manager dhcp client
- From dhcpcd (ephemeral)
- Return empty string if not found (non-fatal)
"""
Expand All @@ -113,7 +114,19 @@ def _get_domainname(self):

LOG.debug(
"Could not obtain FQDN from ISC dhclient leases. Falling back to "
"%s",
"Network Manager leases"
)
with suppress(
dhcp.NoDHCPLeaseMissingDhclientError, dhcp.NoDHCPLeaseError
):
domain_name = dhcp.network_manager_get_option_from_leases(
"domain_name"
)
if domain_name:
return domain_name.strip()

Comment thread
ani-sinha marked this conversation as resolved.
LOG.debug(
"Could not obtain FQDN from NM leases. Falling back to %s",
self.distro.dhcp_client.client_name,
)
try:
Expand Down
67 changes: 67 additions & 0 deletions tests/unittests/net/test_dhcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
NoDHCPLeaseInterfaceError,
NoDHCPLeaseMissingDhclientError,
Udhcpc,
find_correct_device_nmcli,
maybe_perform_dhcp_discovery,
network_manager_load_leases,
networkd_load_leases,
)
from cloudinit.net.ephemeral import EphemeralDHCPv4
Expand Down Expand Up @@ -1392,3 +1394,68 @@ def test_none_and_missing_fallback(self):
with pytest.raises(NoDHCPLeaseInterfaceError):
distro = mock.Mock(fallback_interface=None)
maybe_perform_dhcp_discovery(distro, None)


class TestNMDhcpLeases:
def test_find_correct_device_firstmatch(self):
with mock.patch(
"cloudinit.net.dhcp.run_nmcli",
return_value=dedent(
"""
ens160:ethernet:connected:Wired connection 1
ens256:ethernet:connected:Wired connection 2
lo:loopback:connected (externally):lo
"""
),
):
ret = find_correct_device_nmcli(None)
assert ret == "ens160"

def test_find_correct_device_nomatch(self):
with mock.patch(
"cloudinit.net.dhcp.run_nmcli",
return_value=dedent(
"""
ens160:ethernet:connected:Wired connection 1
ens256:ethernet:connected:Wired connection 2
lo:loopback:connected (externally):lo
"""
),
):
ret = find_correct_device_nmcli("ens250")
assert ret is None

def test_find_correct_device_second_match(self):
with mock.patch(
"cloudinit.net.dhcp.run_nmcli",
return_value=dedent(
"""
ens160:ethernet:connected:Wired connection 1
ens256:ethernet:connected:Wired connection 2
lo:loopback:connected (externally):lo
"""
),
):
ret = find_correct_device_nmcli("ens256")
assert ret == "ens256"

def test_network_manager_load_leases(self):
expected_return = {
"broadcast_address": "172.16.127.255",
"dhcp_client_identifier": "01:00:0c:29:bf:c5:56",
"dhcp_lease_time": "1800",
"dhcp_server_identifier": "172.16.127.254",
}
with mock.patch(
"cloudinit.net.dhcp.run_nmcli",
return_value=dedent(
"""
DHCP4.OPTION[1]: broadcast_address = 172.16.127.255
DHCP4.OPTION[2]: dhcp_client_identifier = 01:00:0c:29:bf:c5:56
DHCP4.OPTION[3]: dhcp_lease_time = 1800
DHCP4.OPTION[4]: dhcp_server_identifier = 172.16.127.254
"""
),
):
ret = network_manager_load_leases("ens10")
assert ret == expected_return
Loading
Loading