Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 188 additions & 7 deletions cmd/agent/daemon/pipeline/storage_info_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package pipeline
import (
"bufio"
"context"
"encoding/json"
"fmt"
"math"
"os"
"path/filepath"
"regexp"
"strconv"
"strings"
"syscall"
Expand All @@ -18,6 +20,8 @@ import (
"golang.org/x/sys/unix"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding/gzip"
"gopkg.in/yaml.v3"
"k8s.io/apimachinery/pkg/api/resource"

kubepb "github.com/castai/kvisor/api/v1/kube"
"github.com/castai/kvisor/pkg/logging"
Expand Down Expand Up @@ -82,13 +86,15 @@ type FilesystemMetric struct {

// NodeStatsSummaryMetric represents node-level filesystem statistics from kubelet
//
// NodeFsCapacityBytes and NodeFsAllocatableBytes are filled in together by
// CollectNodeStatsSummary, and only when a positive node filesystem capacity
// was found; otherwise both stay nil.
type NodeStatsSummaryMetric struct {
	NodeName               string    `avro:"node_name"`
	NodeTemplate           *string   `avro:"node_template"`
	ImageFsSizeBytes       *int64    `avro:"image_fs_size_bytes"`
	ImageFsUsedBytes       *int64    `avro:"image_fs_used_bytes"`
	ContainerFsSizeBytes   *int64    `avro:"container_fs_size_bytes"`
	ContainerFsUsedBytes   *int64    `avro:"container_fs_used_bytes"`
	NodeFsCapacityBytes    *int64    `avro:"node_fs_capacity_bytes"`
	NodeFsAllocatableBytes *int64    `avro:"node_fs_allocatable_bytes"`
	Timestamp              time.Time `avro:"ts"`
}

// K8sPodVolumeMetric represents pod volume information from Kubernetes
Expand Down Expand Up @@ -136,6 +142,14 @@ type SysfsStorageInfoProvider struct {
kubeClient kubepb.KubeAPIClient
nodeCache *freelru.SyncedLRU[string, *kubepb.Node]
wellKnownPathDeviceID map[string]uint64
kubeletConfig *KubeletConfig
}

// KubeletConfig holds the subset of the kubelet configuration file that is
// consulted when computing allocatable node storage. It can be decoded from
// either the YAML or the JSON form of the file.
type KubeletConfig struct {
	// EvictionHard maps eviction signals to thresholds; the
	// "nodefs.available" entry is the one read by this package.
	EvictionHard map[string]string `yaml:"evictionHard" json:"evictionHard"`
	// KubeReserved maps resource names to reserved quantities; the
	// "ephemeral-storage" entry is the one read by this package.
	KubeReserved map[string]string `yaml:"kubeReserved" json:"kubeReserved"`
	// SystemReserved maps resource names to reserved quantities; the
	// "ephemeral-storage" entry is the one read by this package.
	SystemReserved map[string]string `yaml:"systemReserved" json:"systemReserved"`
}

const (
Expand Down Expand Up @@ -185,6 +199,11 @@ func NewStorageInfoProvider(log *logging.Logger, kubeClient kubepb.KubeAPIClient
Warn("failed to stat crio path")
}

kubeletConfig, err := readKubeletConfig(hostPathRoot, log)
if err != nil {
log.With("error", err).Warn("failed to read kubelet config, allocatable calculation will use capacity only")
}

return &SysfsStorageInfoProvider{
storageState: &storageMetricsState{
blockDevices: make(map[string]*BlockDeviceMetric),
Expand All @@ -198,6 +217,7 @@ func NewStorageInfoProvider(log *logging.Logger, kubeClient kubepb.KubeAPIClient
kubeClient: kubeClient,
nodeCache: nodeCache,
wellKnownPathDeviceID: wellKnownPathDeviceID,
kubeletConfig: kubeletConfig,
}, nil
}

Expand Down Expand Up @@ -309,6 +329,13 @@ func (s *SysfsStorageInfoProvider) CollectNodeStatsSummary(ctx context.Context)
}
}

nodefsCapacity := s.getNodefsCapacity()
if nodefsCapacity > 0 {
allocatable := s.calculateAllocatableBytes(nodefsCapacity)
metric.NodeFsCapacityBytes = lo.ToPtr(nodefsCapacity)
metric.NodeFsAllocatableBytes = lo.ToPtr(allocatable)
}

return metric, nil
}

Expand Down Expand Up @@ -1061,3 +1088,157 @@ func buildFilesystemLabels(log *logging.Logger, fsMountPointDeviceID uint64, wel

return labels
}

// kubeletConfigPaths lists, in lookup order, the host-relative locations where
// a kubelet configuration file is searched for. Entries ending in ".json" are
// decoded as JSON by readKubeletConfig; the rest are decoded as YAML.
var kubeletConfigPaths = []string{
	// kubeadm, AKS
	"/var/lib/kubelet/config.yaml",
	// GKE
	"/home/kubernetes/kubelet-config.yaml",
	// EKS
	"/etc/kubernetes/kubelet/kubelet-config.json",
}

// readKubeletConfig loads the kubelet configuration from the first entry of
// kubeletConfigPaths, resolved under hostRootPath, that can be both read and
// parsed. Paths ending in ".json" are decoded as JSON, all others as YAML.
//
// Missing files are skipped silently. Any other read failure, and any parse
// failure, is logged at debug level before the next candidate is tried. An
// error is returned only when no candidate produced a configuration.
func readKubeletConfig(hostRootPath string, log *logging.Logger) (*KubeletConfig, error) {
	for _, candidate := range kubeletConfigPaths {
		hostPath := filepath.Join(hostRootPath, candidate)

		raw, readErr := os.ReadFile(hostPath)
		switch {
		case readErr == nil:
			// Readable; proceed to parsing below.
		case os.IsNotExist(readErr):
			// Expected on most distributions: only one of the paths exists.
			continue
		default:
			log.With("path", hostPath).With("error", readErr).Debug("failed to read kubelet config")
			continue
		}

		parsed := new(KubeletConfig)
		var parseErr error
		if isJSON := strings.HasSuffix(candidate, ".json"); isJSON {
			parseErr = json.Unmarshal(raw, parsed)
		} else {
			parseErr = yaml.Unmarshal(raw, parsed)
		}
		if parseErr != nil {
			log.With("path", hostPath).With("error", parseErr).Debug("failed to parse kubelet config")
			continue
		}

		log.With("path", hostPath).Debug("loaded kubelet config")
		return parsed, nil
	}

	return nil, fmt.Errorf("kubelet config not found in any of: %v", kubeletConfigPaths)
}

// percentageRegex matches a non-negative decimal number followed by a percent
// sign and captures the numeric part.
var percentageRegex = regexp.MustCompile(`^(\d+(?:\.\d+)?)%$`)

// parseEvictionThreshold converts a Kubernetes eviction threshold into bytes.
//
// An empty threshold yields zero. A percentage such as "10%" is resolved
// against capacityBytes and truncated toward zero. Anything else is handed to
// parseQuantity as an absolute quantity (e.g., "1Gi", "500Mi").
func parseEvictionThreshold(threshold string, capacityBytes int64) (int64, error) {
	if len(threshold) == 0 {
		return 0, nil
	}

	groups := percentageRegex.FindStringSubmatch(threshold)
	if len(groups) != 2 {
		// Not a percentage: treat the value as an absolute quantity.
		return parseQuantity(threshold)
	}

	pct, err := strconv.ParseFloat(groups[1], 64)
	if err != nil {
		return 0, fmt.Errorf("parsing percentage %q: %w", groups[1], err)
	}
	return int64(float64(capacityBytes) * pct / 100.0), nil
}

// parseQuantity converts a Kubernetes quantity string (e.g., "1Gi", "500Mi",
// "1G", "500M", "1000") into its integer value. An empty string yields zero
// without an error; a string the quantity parser rejects yields that error.
func parseQuantity(s string) (int64, error) {
	if len(s) == 0 {
		return 0, nil
	}

	switch parsed, err := resource.ParseQuantity(s); {
	case err != nil:
		return 0, err
	default:
		return parsed.Value(), nil
	}
}

// calculateAllocatableBytes calculates the allocatable storage bytes using the Kubernetes formula:
// allocatable = capacity - evictionReserve - kubeReserved - systemReserved
//
// The eviction reserve comes from the "nodefs.available" hard eviction
// threshold; kubeReserved and systemReserved come from their
// "ephemeral-storage" entries. When a kubelet configuration was loaded but it
// does not set evictionHard at all, the kubelet's documented default of "10%"
// for nodefs.available is applied. An evictionHard map that is present but
// lacks the key contributes no reserve. A value that fails to parse is logged
// as a warning and counted as zero.
//
// Without any kubelet configuration the capacity is returned unchanged. The
// result is never negative.
func (s *SysfsStorageInfoProvider) calculateAllocatableBytes(capacityBytes int64) int64 {
	// defaultNodefsAvailable is the kubelet's default hard eviction threshold
	// for nodefs.available, in effect when the config omits evictionHard.
	const defaultNodefsAvailable = "10%"

	cfg := s.kubeletConfig
	if cfg == nil {
		s.log.With("capacityBytes", capacityBytes).Debug("calculateAllocatableBytes: kubeletConfig is nil, returning capacity")
		return capacityBytes
	}

	threshold := defaultNodefsAvailable
	if cfg.EvictionHard != nil {
		// Assumes the kubelet's default behavior of not merging its defaults
		// into an explicitly configured evictionHard: a missing key then means
		// "no threshold", not "use the default".
		threshold = cfg.EvictionHard["nodefs.available"]
	}
	evictionReserve, err := parseEvictionThreshold(threshold, capacityBytes)
	if err != nil {
		s.log.With("value", threshold).With("error", err).Warn("failed to parse eviction threshold")
	}

	// Indexing a nil map yields "", which parseQuantity turns into zero, so no
	// nil guards are needed for the reserved maps.
	kubeValue := cfg.KubeReserved["ephemeral-storage"]
	kubeReserved, err := parseQuantity(kubeValue)
	if err != nil {
		s.log.With("value", kubeValue).With("error", err).Warn("failed to parse kubeReserved ephemeral-storage")
	}

	systemValue := cfg.SystemReserved["ephemeral-storage"]
	systemReserved, err := parseQuantity(systemValue)
	if err != nil {
		s.log.With("value", systemValue).With("error", err).Warn("failed to parse systemReserved ephemeral-storage")
	}

	allocatable := capacityBytes - evictionReserve - kubeReserved - systemReserved
	s.log.With("capacityBytes", capacityBytes).
		With("evictionReserve", evictionReserve).
		With("kubeReserved", kubeReserved).
		With("systemReserved", systemReserved).
		With("allocatable", allocatable).
		Debug("calculateAllocatableBytes: computed allocatable")

	// Reservations larger than the capacity leave nothing allocatable.
	if allocatable < 0 {
		return 0
	}
	return allocatable
}

// getNodefsCapacity sums the filesystem size of the kubelet and containerd
// well-known paths. A path is counted only if its device ID was recorded in
// wellKnownPathDeviceID, and each device ID is counted once so that paths
// sharing a filesystem are not double-counted. Paths whose stats cannot be
// read are logged at debug level and skipped. A warning is logged when the
// resulting total is zero.
func (s *SysfsStorageInfoProvider) getNodefsCapacity() int64 {
	var (
		total   int64
		visited = map[uint64]struct{}{}
	)

	for _, wellKnown := range []string{kubeletPath, containerdPath} {
		deviceID, known := s.wellKnownPathDeviceID[wellKnown]
		if !known {
			continue
		}
		if _, dup := visited[deviceID]; dup {
			continue
		}
		// Mark the device before reading stats so a failing path is not retried
		// through another path on the same device.
		visited[deviceID] = struct{}{}

		hostPath := filepath.Join(s.hostRootPath, wellKnown)
		size, _, _, _, _, statErr := getFilesystemStats(hostPath)
		if statErr != nil {
			s.log.With("path", hostPath).With("error", statErr).
				Debug("failed to get capacity for path")
			continue
		}
		total += size
	}

	if total == 0 {
		s.log.Warn("no capacity found for any well-known storage path")
	}
	return total
}
Loading