Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 10 additions & 19 deletions frontend/csi/node_server.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 NetApp, Inc. All Rights Reserved.
// Copyright 2025 NetApp, Inc. All Rights Reserved.

package csi

Expand Down Expand Up @@ -31,6 +31,7 @@ const (
tridentDeviceInfoPath = "/var/lib/trident/tracking"
lockID = "csi_node_server"
AttachISCSIVolumeTimeoutShort = 20 * time.Second
ResizeISCSIVolumeTimeout = 20 * time.Second
iSCSINodeUnstageMaxDuration = 15 * time.Second
iSCSISelfHealingLockContext = "ISCSISelfHealingThread"
nvmeSelfHealingLockContext = "NVMeSelfHealingThread"
Expand Down Expand Up @@ -534,26 +535,16 @@ func nodePrepareISCSIVolumeForExpansion(
"filesystemType": publishInfo.FilesystemType,
}).Debug("PublishInfo for block device to expand.")

var err error

// Make sure device is ready.
if utils.IsAlreadyAttached(ctx, lunID, publishInfo.IscsiTargetIQN) {
// Rescan device to detect increased size.
if err = utils.ISCSIRescanDevices(
ctx, publishInfo.IscsiTargetIQN, publishInfo.IscsiLunNumber, requiredBytes); err != nil {
Logc(ctx).WithFields(LogFields{
"device": publishInfo.DevicePath,
"error": err,
}).Error("Unable to scan device.")
err = status.Error(codes.Internal, err.Error())
}
} else {
err = fmt.Errorf("device %s to expand is not attached", publishInfo.DevicePath)
Logc(ctx).WithField("devicePath", publishInfo.DevicePath).WithError(err).Error(
"Unable to expand volume.")
// Resize the volume.
if err := utils.ResizeVolumeRetry(ctx, publishInfo, requiredBytes, ResizeISCSIVolumeTimeout); err != nil {
Logc(ctx).WithFields(LogFields{
"lunID": publishInfo.IscsiLunNumber,
"devicePath": publishInfo.DevicePath,
}).WithError(err).Error("Unable to resize device(s) for LUN.")
return status.Error(codes.Internal, err.Error())
}
return err

return nil
}

// nodePrepareBlockOnFileVolumeForExpansion readies volume expansion for BlockOnFile volumes
Expand Down
113 changes: 81 additions & 32 deletions utils/devices.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 NetApp, Inc. All Rights Reserved.
// Copyright 2025 NetApp, Inc. All Rights Reserved.

package utils

Expand Down Expand Up @@ -141,50 +141,98 @@ func isDeviceUnformatted(ctx context.Context, device string) (bool, error) {
return true, nil
}

// filterDevicesBySize builds a map of disk devices to their size, filtered by a minimum size requirement.
// If any errors occur when checking the size of a device, it captures the error and moves onto the next device.
func filterDevicesBySize(
ctx context.Context, deviceInfo *ScsiDeviceInfo, minSize int64,
) (map[string]int64, error) {
errs := make([]string, 0)
deviceSizeMap := make(map[string]int64, 0)
for _, diskDevice := range deviceInfo.Devices {
size, err := getISCSIDiskSize(ctx, devPrefix+diskDevice)
if err != nil {
errs = append(errs, fmt.Sprintf("failed to get size for disk %s: %s", diskDevice, err))
// Only consider devices whose size can be gathered.
continue
}

if size < minSize {
// Only consider devices that are undersized.
deviceSizeMap[diskDevice] = size
}
}

if len(errs) != 0 {
return nil, fmt.Errorf(strings.Join(errs, ", "))
}
return deviceSizeMap, nil
}

// rescanDevices accepts a map of disk devices to sizes and initiates a rescan for each device.
// If any rescan fails it captures the error and moves onto the next rescanning the next device.
func rescanDevices(ctx context.Context, deviceSizeMap map[string]int64) error {
errs := make([]string, 0)
for diskDevice := range deviceSizeMap {
if err := iSCSIRescanDisk(ctx, diskDevice); err != nil {
errs = append(errs, fmt.Sprintf("failed to rescan disk %s: %s", diskDevice, err))
}
}

if len(errs) != 0 {
return fmt.Errorf(strings.Join(errs, ", "))
}
return nil
}

func ISCSIRescanDevices(ctx context.Context, targetIQN string, lunID int32, minSize int64) error {
GenerateRequestContextForLayer(ctx, LogLayerUtils)

fields := LogFields{"targetIQN": targetIQN, "lunID": lunID}
Logc(ctx).WithFields(fields).Debug(">>>> devices.ISCSIRescanDevices")
defer Logc(ctx).WithFields(fields).Debug("<<<< devices.ISCSIRescanDevices")
Logc(ctx).WithFields(fields).Debug(">>>> iscsi.RescanDevices")
defer Logc(ctx).WithFields(fields).Debug("<<<< iscsi.RescanDevices")

hostSessionMap := IscsiUtils.GetISCSIHostSessionMapForTarget(ctx, targetIQN)
if len(hostSessionMap) == 0 {
return fmt.Errorf("error getting iSCSI device information: no host session found")
}
deviceInfo, err := getDeviceInfoForLUN(ctx, int(lunID), targetIQN, false, false)
if err != nil {
return fmt.Errorf("error getting iSCSI device information: %s", err)
} else if deviceInfo == nil {
return fmt.Errorf("could not get iSCSI device information for LUN: %d", lunID)
}

allLargeEnough := true
for _, diskDevice := range deviceInfo.Devices {
size, err := getISCSIDiskSize(ctx, devPrefix+diskDevice)
if err != nil {
return err
// Get all disk devices that require a rescan.
devicesBySize, err := filterDevicesBySize(ctx, deviceInfo, minSize)
if err != nil {
Logc(ctx).WithError(err).Error("Failed to read disk size for devices.")
return err
}

if len(devicesBySize) != 0 {
fields = LogFields{
"lunID": lunID,
"devices": devicesBySize,
"minSize": minSize,
}
if size < minSize {
allLargeEnough = false
} else {
continue

Logc(ctx).WithFields(fields).Debug("Found devices that require a rescan.")
if err := rescanDevices(ctx, devicesBySize); err != nil {
Logc(ctx).WithError(err).Error("Failed to initiate rescanning for devices.")
return err
}

err = iSCSIRescanDisk(ctx, diskDevice)
// Sleep for a second to give the SCSI subsystem time to rescan the devices.
time.Sleep(time.Second)

// Reread the devices to check if any are undersized.
devicesBySize, err = filterDevicesBySize(ctx, deviceInfo, minSize)
if err != nil {
Logc(ctx).WithField("diskDevice", diskDevice).Error("Failed to rescan disk.")
return fmt.Errorf("failed to rescan disk %s: %s", diskDevice, err)
Logc(ctx).WithError(err).Error("Failed to read disk size for devices after rescan.")
return err
}
}

if !allLargeEnough {
time.Sleep(time.Second)
for _, diskDevice := range deviceInfo.Devices {
size, err := getISCSIDiskSize(ctx, devPrefix+diskDevice)
if err != nil {
return err
}
if size < minSize {
Logc(ctx).Error("Disk size not large enough after resize.")
return fmt.Errorf("disk size not large enough after resize: %d, %d", size, minSize)
}
if len(devicesBySize) != 0 {
Logc(ctx).WithFields(fields).Error("Some devices are still undersized after rescan.")
return errors.New("devices are still undersized after rescan")
}
}

Expand All @@ -198,15 +246,16 @@ func ISCSIRescanDevices(ctx context.Context, targetIQN string, lunID int32, minS
fields = LogFields{"size": size, "minSize": minSize}
if size < minSize {
Logc(ctx).WithFields(fields).Debug("Reloading the multipath device.")
err := reloadMultipathDevice(ctx, multipathDevice)
if err != nil {
if err := reloadMultipathDevice(ctx, multipathDevice); err != nil {
return err
}
time.Sleep(time.Second)
size, err = getISCSIDiskSize(ctx, devPrefix+multipathDevice)

size, err := getISCSIDiskSize(ctx, devPrefix+multipathDevice)
if err != nil {
return err
}

if size < minSize {
Logc(ctx).Error("Multipath device not large enough after resize.")
return fmt.Errorf("multipath device not large enough after resize: %d < %d", size, minSize)
Expand Down
107 changes: 106 additions & 1 deletion utils/iscsi.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 NetApp, Inc. All Rights Reserved.
// Copyright 2025 NetApp, Inc. All Rights Reserved.

package utils

Expand Down Expand Up @@ -410,6 +410,111 @@ func parseInitiatorIQNs(ctx context.Context, contents string) []string {
return iqns
}

// resizeVolume accepts a LUN ID, target IQN, and list of portals.
// It reads the sysfs paths for the LUN and ensures that all devices are readable.
// If sessions are not found for the LUN, it attempts to heal the attachment by establishing sessions on all portals.
// It verifies that the devices are readable and that there is disk to path to portal parity.
// If all checks pass, it initiates rescans on all devices or disks. If any step fails, it returns an error.
func resizeVolume(ctx context.Context, publishInfo *VolumePublishInfo, minSize int64) error {
// Extract the LUN ID, target IQN, and portals from the publish info.
lunID, targetIQN := int(publishInfo.IscsiLunNumber), publishInfo.IscsiTargetIQN
portals := make([]string, 0)
for _, p := range publishInfo.IscsiPortals {
portals = append(portals, ensureHostportFormatted(p))
}
portals = append(portals, ensureHostportFormatted(publishInfo.IscsiTargetPortal))

// Set up logging fields with entry and exit log messages.
fields := LogFields{
"lunID": lunID,
"targetIQN": targetIQN,
"portals": portals,
}
Logc(ctx).WithFields(fields).Debug(">>>> iscsi.resizeVolume")
defer Logc(ctx).WithFields(fields).Debug("<<<< iscsi.resizeVolume")

// Build a set of host sessions.
// From the host sessions, ensure the number of paths in sysfs is congruent with the number of portals.
hostSessions := IscsiUtils.GetISCSIHostSessionMapForTarget(ctx, targetIQN)
paths := IscsiUtils.GetSysfsBlockDirsForLUN(lunID, hostSessions)

// Ensure the number of visible paths is congruent with the number of portals.
// If not, attempt to heal the sessions and initiate a SCSI rescan to recover the path.
// NOTE: Hypothetically, this can lead to momentarily incongruence between device sizes for the same LUN
// on the host, but it should correct itself after the later rescans complete.
if len(paths) != len(portals) {
// If we're missing a session, attempt to heal it. If we can't establish the sessions, return an error.
Logc(ctx).WithFields(fields).Debug("Paths are missing for LUN; attempting to establish missing paths.")
if _, err := EnsureISCSISessions(ctx, publishInfo, portals); err != nil {
return fmt.Errorf("failed to establish sessions for LUN %v; %w", lunID, err)
}

// Grant some time for the SCSI subsystem to process the recovered sessions and paths.
time.Sleep(time.Second)

// Reread the host session map to detect new host session entries.
hostSessions = IscsiUtils.GetISCSIHostSessionMapForTarget(ctx, targetIQN)
paths = IscsiUtils.GetSysfsBlockDirsForLUN(lunID, hostSessions)
}

// Parity should exist between the number of paths and the number of portals.
// If it doesn't fail and retry. If this never resolves, it implies a network connectivity issue.
if len(paths) != len(portals) {
return fmt.Errorf("paths missing for LUN %v; current paths: %v; expected portals: %v", lunID, paths, portals)
}
fields["paths"] = paths

// Scan the target and wait for the device(s) to appear.
if err := waitForDeviceScan(ctx, lunID, targetIQN); err != nil {
return fmt.Errorf("failed to scan for devices for LUN %v; %w", lunID, err)
}

// Ensure that all devices are present and readable.
devices, err := IscsiUtils.GetDevicesForLUN(paths)
if err != nil {
return fmt.Errorf("failed to get devices for LUN %v; %w", lunID, err)
}
fields["devices"] = devices

// At this point, parity should exist between the number of devices and the number of paths.
if len(devices) != len(paths) {
return fmt.Errorf("device and path count are incongruent for LUN %v", lunID)
}

// Initiate rescans across all devices for the LUN.
if err = ISCSIRescanDevices(ctx, targetIQN, int32(lunID), minSize); err != nil {
return fmt.Errorf("failed to rescan devices for LUN %v; %w", lunID, err)
}

return nil
}

func ResizeVolumeRetry(
ctx context.Context, publishInfo *VolumePublishInfo, minSize int64, timeout time.Duration,
) error {
Logc(ctx).Debug(">>>> iscsi.ResizeVolumeRetry")
defer Logc(ctx).Debug("<<<< iscsi.ResizeVolumeRetry")

checkPreconditions := func() error {
return resizeVolume(ctx, publishInfo, minSize)
}

checkNotify := func(err error, duration time.Duration) {
Logc(ctx).WithFields(LogFields{
"increment": duration,
"error": err,
}).Debug("Resize iSCSI volume is not complete, waiting.")
}

checkBackoff := backoff.NewExponentialBackOff()
checkBackoff.InitialInterval = 1 * time.Second
checkBackoff.Multiplier = 1.414 // approx sqrt(2)
checkBackoff.RandomizationFactor = 0.1
checkBackoff.MaxElapsedTime = timeout

return backoff.RetryNotify(checkPreconditions, checkBackoff, checkNotify)
}

// GetSysfsBlockDirsForLUN returns the list of directories in sysfs where the block devices should appear
// after the scan is successful. One directory is returned for each path in the host session map.
func (h *IscsiReconcileHelper) GetSysfsBlockDirsForLUN(lunID int, hostSessionMap map[int]int) []string {
Expand Down
Loading