diff --git a/.data/.file b/.data/.file deleted file mode 100644 index 1917ccee65..0000000000 --- a/.data/.file +++ /dev/null @@ -1 +0,0 @@ -This directory contains runtime data only. This file serves as a Git placeholder in order to ensure its presence in development environments. diff --git a/Dockerfile b/Dockerfile index caff36914e..608e8bb715 100644 --- a/Dockerfile +++ b/Dockerfile @@ -5,10 +5,12 @@ ARG goversion FROM --platform=${BUILDPLATFORM} golang:${goversion}-alpine3.22 as base WORKDIR /spire RUN apk --no-cache --update add file bash clang lld pkgconfig git make -COPY go.* ./ +COPY spire/go.* ./ +COPY spire-plugin-sdk/ /spire-plugin-sdk/ +COPY spire-api-sdk/ /spire-api-sdk/ # https://go.dev/ref/mod#module-cache RUN --mount=type=cache,target=/go/pkg/mod go mod download -COPY . . +COPY spire/ . # xx is a helper for cross-compilation # when bumping to a new version analyze the new version for security issues @@ -31,7 +33,7 @@ RUN --mount=type=cache,target=/root/.cache/go-build \ make build-static git_tag=$TAG git_dirty="" && \ for f in $(find bin -executable -type f); do xx-verify --static $f; done -FROM --platform=${BUILDPLATFORM} scratch AS spire-base +FROM --platform=${BUILDPLATFORM} golang:${goversion}-alpine3.22 AS spire-base COPY --link --from=builder --chown=root:root --chmod=755 /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ WORKDIR /opt/spire diff --git a/Dockerfile.windows b/Dockerfile.windows index a0510926ea..a40d75a459 100644 --- a/Dockerfile.windows +++ b/Dockerfile.windows @@ -10,14 +10,14 @@ CMD [] # SPIRE Server FROM spire-base-windows AS spire-server-windows ENTRYPOINT ["c:/spire/bin/spire-server.exe", "run"] -COPY bin/spire-server.exe C:/spire/bin/spire-server.exe +COPY spire/bin/spire-server.exe C:/spire/bin/spire-server.exe # SPIRE Agent FROM spire-base-windows AS spire-agent-windows ENTRYPOINT ["c:/spire/bin/spire-agent.exe", "run"] -COPY ./bin/spire-agent.exe C:/spire/bin/spire-agent.exe +COPY spire/bin/spire-agent.exe C:/spire/bin/spire-agent.exe # OIDC Discovery Provider FROM spire-base-windows AS oidc-discovery-provider-windows ENTRYPOINT ["c:/spire/bin/oidc-discovery-provider.exe"] -COPY ./bin/oidc-discovery-provider.exe c:/spire/bin/oidc-discovery-provider.exe +COPY spire/bin/oidc-discovery-provider.exe c:/spire/bin/oidc-discovery-provider.exe diff --git a/Makefile b/Makefile index f9ca7f8311..ce3ea239fa 100644 --- a/Makefile +++ b/Makefile @@ -184,7 +184,7 @@ git_dirty := $(shell git status -s) protos := \ proto/private/server/journal/journal.proto \ - proto/spire/common/common.proto \ + proto/spire/common/common.proto api-protos := \ @@ -342,7 +342,7 @@ $1: $3 container-builder --target $2 \ -o type=oci,dest=$2-image.tar \ -f $3 \ - . + .. endef @@ -373,7 +373,7 @@ $1: $3 --target $2 \ -t $2 -t $2:latest-local \ -f $3 \ - . + .. endef @@ -477,7 +477,7 @@ endif .PHONY: dev-shell dev-image dev-image: - $(E)docker build -t spire-dev -f Dockerfile.dev . + $(E)docker build -t spire-dev -f Dockerfile.dev .. dev-shell: | go-check $(E)docker run --rm -v "$(call goenv,GOCACHE)":/root/.cache/go-build -v "$(DIR):/spire" -v "$(call goenv,GOPATH)/pkg/mod":/root/go/pkg/mod -it -h spire-dev spire-dev diff --git a/cmd/spire-agent/cli/run/run.go b/cmd/spire-agent/cli/run/run.go index c5a334d7a9..541c4dd460 100644 --- a/cmd/spire-agent/cli/run/run.go +++ b/cmd/spire-agent/cli/run/run.go @@ -67,6 +67,7 @@ type Config struct { type agentConfig struct { DataDir string `hcl:"data_dir"` AdminSocketPath string `hcl:"admin_socket_path"` + BrokerSocketPath string `hcl:"broker_socket_path"` InsecureBootstrap bool `hcl:"insecure_bootstrap"` RebootstrapMode string `hcl:"rebootstrap_mode"` RebootstrapDelay string `hcl:"rebootstrap_delay"` @@ -509,6 +510,15 @@ func NewAgentConfig(c *Config, logOptions []log.Option, allowUnknownConfig bool) } ac.AdminBindAddress = adminAddr } + + if c.Agent.hasBrokerAddr() { + brokerAddr, err := c.Agent.getBrokerAddr() + if err != nil { + return nil, err + } + ac.BrokerBindAddress = brokerAddr + } + // Handle join token - read from file if specified if c.Agent.JoinTokenFile != "" { tokenBytes, err := os.ReadFile(c.Agent.JoinTokenFile) diff --git a/cmd/spire-agent/cli/run/run_posix.go b/cmd/spire-agent/cli/run/run_posix.go index 4c8ddd9d03..5bb8cde0e1 100644 --- a/cmd/spire-agent/cli/run/run_posix.go +++ b/cmd/spire-agent/cli/run/run_posix.go @@ -53,6 +53,30 @@ func (c *agentConfig) hasAdminAddr() bool { return c.AdminSocketPath != "" } +func (c *agentConfig) getBrokerAddr() (net.Addr, error) { + socketPathAbs, err := filepath.Abs(c.SocketPath) + if err != nil { + return nil, fmt.Errorf("failed to get absolute path for socket_path: %w", err) + } + brokerSocketPathAbs, err := filepath.Abs(c.BrokerSocketPath) + if err != nil { + return nil, fmt.Errorf("failed to get absolute path for broker_socket_path: %w", err) + } + + if strings.HasPrefix(brokerSocketPathAbs, filepath.Dir(socketPathAbs)+"/") { + return nil, errors.New("broker socket cannot be in the same directory or a subdirectory as that containing the Workload API socket") + } + + return &net.UnixAddr{ + Name: brokerSocketPathAbs, + Net: "unix", + }, nil +} + +func (c *agentConfig) hasBrokerAddr() bool { + return c.BrokerSocketPath != "" +} + // validateOS performs posix specific validations of the agent config func (c *agentConfig) validateOS() error { if c.Experimental.NamedPipeName != "" { @@ -88,5 +112,16 @@ func prepareEndpoints(c *agent.Config) error { } } + if c.BrokerBindAddress != nil { + // Create uds dir and parents if not exists + brokerDir := filepath.Dir(c.BrokerBindAddress.String()) + if _, statErr := os.Stat(brokerDir); os.IsNotExist(statErr) { + c.Log.WithField("dir", brokerDir).Infof("Creating broker UDS directory") + if err := os.MkdirAll(brokerDir, 0755); err != nil { + return err + } + } + } + return nil } diff --git a/cmd/spire-agent/cli/run/run_windows.go b/cmd/spire-agent/cli/run/run_windows.go index 015bbc3420..6ffcac1031 100644 --- a/cmd/spire-agent/cli/run/run_windows.go +++ b/cmd/spire-agent/cli/run/run_windows.go @@ -32,6 +32,14 @@ func (c *agentConfig) hasAdminAddr() bool { return c.Experimental.AdminNamedPipeName != "" } +func (c *agentConfig) getBrokerAddr() (net.Addr, error) { + return nil, errors.New("broker_socket_path is not supported in this platform") +} + +func (c *agentConfig) hasBrokerAddr() bool { + return false +} + // validateOS performs windows specific validations of the agent config func (c *agentConfig) validateOS() error { if c.SocketPath != "" { diff --git a/conf/agent/agent.conf b/conf/agent/agent.conf index cf1fcb353a..b10788ab03 100644 --- a/conf/agent/agent.conf +++ b/conf/agent/agent.conf @@ -6,6 +6,11 @@ agent { socket_path ="/tmp/spire-agent/public/api.sock" trust_bundle_path = "./conf/agent/dummy_root_ca.crt" trust_domain = "example.org" + + authorized_delegates = [ + "spiffe://example.org/broker", + ] + broker_socket_path = "/tmp/spire-agent/broker/api.sock" } plugins { diff --git a/conf/agent/agent_full.conf b/conf/agent/agent_full.conf index b7e0d3f52c..41c524611e 100644 --- a/conf/agent/agent_full.conf +++ b/conf/agent/agent_full.conf @@ -86,6 +86,9 @@ agent { # "spiffe://example.org/authorized_client1", # ] + # broker_socket_path: Location to bind the SPIFFE Broker Endpoint. + # broker_socket_path = "" + # sds: Optional SDS configuration section. # sds = { # # default_svid_name: The TLS Certificate resource name to use for the default diff --git a/go.mod b/go.mod index 7c2475a170..5ac294940d 100644 --- a/go.mod +++ b/go.mod @@ -329,3 +329,6 @@ require ( sigs.k8s.io/structured-merge-diff/v6 v6.3.2-0.20260122202528-d9cc6641c482 // indirect sigs.k8s.io/yaml v1.6.0 // indirect ) + +replace github.com/spiffe/spire-plugin-sdk => ../spire-plugin-sdk +replace github.com/spiffe/spire-api-sdk => ../spire-api-sdk diff --git a/go.sum b/go.sum index 79a63afbc8..0086f1448b 100644 --- a/go.sum +++ b/go.sum @@ -810,8 +810,6 @@ github.com/spiffe/go-spiffe/v2 v2.6.0 h1:l+DolpxNWYgruGQVV0xsfeya3CsC7m8iBzDnMps github.com/spiffe/go-spiffe/v2 v2.6.0/go.mod h1:gm2SeUoMZEtpnzPNs2Csc0D/gX33k1xIx7lEzqblHEs= github.com/spiffe/spire-api-sdk v1.2.5-0.20260115194754-bcd1999bdd05 h1:wJPyPSXHtKEc9SuK83sr40OoRSaYUTf1+wncvWvogHE= github.com/spiffe/spire-api-sdk v1.2.5-0.20260115194754-bcd1999bdd05/go.mod h1:9hXJcMzatM1KwAtBDO3s6HccDCic++/5c2yOc5Iln8Y= -github.com/spiffe/spire-plugin-sdk v1.4.4-0.20251120194148-791bbd274dc7 h1:OAvr7TNirmBpXnAp82cTosuB+JAus5cyFCRqXHE0WHs= -github.com/spiffe/spire-plugin-sdk v1.4.4-0.20251120194148-791bbd274dc7/go.mod h1:QvrRDiBlXiJ7kNd176ZHsF5eklxxeTRgJSu2CXe0MKw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index ceda18da67..b04c8c60c6 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -17,6 +17,7 @@ import ( admin_api "github.com/spiffe/spire/pkg/agent/api" node_attestor "github.com/spiffe/spire/pkg/agent/attestor/node" workload_attestor "github.com/spiffe/spire/pkg/agent/attestor/workload" + "github.com/spiffe/spire/pkg/agent/broker" "github.com/spiffe/spire/pkg/agent/catalog" "github.com/spiffe/spire/pkg/agent/endpoints" "github.com/spiffe/spire/pkg/agent/manager" @@ -255,6 +256,24 @@ func (a *Agent) Run(ctx context.Context) error { tasks = append(tasks, adminEndpoints.ListenAndServe) } + if a.c.BrokerBindAddress != nil { + brokerEndpoints, err := broker.New(&broker.Config{ + BindAddr: a.c.BrokerBindAddress, + Manager: manager, + Log: a.c.Log, + Metrics: metrics, + Attestor: workloadAttestor, + TrustDomain: a.c.TrustDomain, + AuthorizedDelegates: a.c.AuthorizedDelegates, + SVIDSource: as, + BundleSource: manager.GetX509Bundle(), + }) + if err != nil { + return fmt.Errorf("failed to create broker endpoints: %w", err) + } + tasks = append(tasks, brokerEndpoints.ListenAndServe) + } + if a.c.LogReopener != nil { tasks = append(tasks, a.c.LogReopener) } diff --git a/pkg/agent/api/delegatedidentity/v1/service_test.go b/pkg/agent/api/delegatedidentity/v1/service_test.go index 6cdc659ab0..c5d1dae147 100644 --- a/pkg/agent/api/delegatedidentity/v1/service_test.go +++ b/pkg/agent/api/delegatedidentity/v1/service_test.go @@ -36,6 +36,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/protobuf/types/known/anypb" ) var ( @@ -870,6 +871,11 @@ func (fa FakeWorkloadPIDAttestor) Attest(_ context.Context, _ int) ([]*common.Se return fa.selectors, fa.err } +func (fa FakeWorkloadPIDAttestor) AttestReference(_ context.Context, _ *anypb.Any) ([]*common.Selector, error) { + // TODO(arndt) do we want more logic here? + return fa.selectors, fa.err +} + type FakeManager struct { manager.Manager diff --git a/pkg/agent/attestor/node/node.go b/pkg/agent/attestor/node/node.go index ce4559ba28..a4a3d5c2a5 100644 --- a/pkg/agent/attestor/node/node.go +++ b/pkg/agent/attestor/node/node.go @@ -36,13 +36,6 @@ const ( roundRobinServiceConfig = `{ "loadBalancingConfig": [ { "round_robin": {} } ] }` ) -type AttestationResult struct { - SVID []*x509.Certificate - Key keymanager.Key - Bundle *spiffebundle.Bundle - Reattestable bool -} - type Attestor interface { Attest(ctx context.Context) (*AttestationResult, error) } diff --git a/pkg/agent/attestor/node/result.go b/pkg/agent/attestor/node/result.go new file mode 100644 index 0000000000..8001af040b --- /dev/null +++ b/pkg/agent/attestor/node/result.go @@ -0,0 +1,40 @@ +package attestor + +import ( + "crypto/x509" + "fmt" + + "github.com/spiffe/go-spiffe/v2/bundle/spiffebundle" + "github.com/spiffe/go-spiffe/v2/bundle/x509bundle" + "github.com/spiffe/go-spiffe/v2/spiffeid" + "github.com/spiffe/go-spiffe/v2/svid/x509svid" + "github.com/spiffe/spire/pkg/agent/plugin/keymanager" +) + +// Allow AttestationResult to be used as go-spiffe SVID and bundle sources. +// TODO(arndt): Check whether the key of the agent is ok to be exposed to other parties. +var ( + _ x509svid.Source = (*AttestationResult)(nil) + _ x509bundle.Source = (*AttestationResult)(nil) +) + +type AttestationResult struct { + SVID []*x509.Certificate + Key keymanager.Key + Bundle *spiffebundle.Bundle + Reattestable bool +} + +func (ar *AttestationResult) GetX509SVID() (*x509svid.SVID, error) { + return &x509svid.SVID{ + Certificates: ar.SVID, + PrivateKey: ar.Key, + }, nil +} + +func (ar *AttestationResult) GetX509BundleForTrustDomain(trustDomain spiffeid.TrustDomain) (*x509bundle.Bundle, error) { + if ar.Bundle.TrustDomain() != trustDomain { + return nil, fmt.Errorf("bundle for trust domain %q not found", trustDomain) + } + return ar.Bundle.X509Bundle(), nil +} diff --git a/pkg/agent/attestor/workload/workload.go b/pkg/agent/attestor/workload/workload.go index 1ff65b42c5..222b84b722 100644 --- a/pkg/agent/attestor/workload/workload.go +++ b/pkg/agent/attestor/workload/workload.go @@ -3,7 +3,6 @@ package attestor import ( "context" "fmt" - "os" "github.com/sirupsen/logrus" "github.com/spiffe/spire/pkg/agent/catalog" @@ -11,6 +10,7 @@ import ( "github.com/spiffe/spire/pkg/common/telemetry" telemetry_workload "github.com/spiffe/spire/pkg/common/telemetry/agent/workloadapi" "github.com/spiffe/spire/proto/spire/common" + "google.golang.org/protobuf/types/known/anypb" ) type attestor struct { @@ -19,6 +19,7 @@ type attestor struct { type Attestor interface { Attest(ctx context.Context, pid int) ([]*common.Selector, error) + AttestReference(ctx context.Context, reference *anypb.Any) ([]*common.Selector, error) } func New(config *Config) Attestor { @@ -45,18 +46,50 @@ type Config struct { // Attest invokes all workload attestor plugins against the provided PID. If an error // is encountered, it is logged and selectors from the failing plugin are discarded. func (wla *attestor) Attest(ctx context.Context, pid int) ([]*common.Selector, error) { + log := wla.c.Log.WithField(telemetry.PID, pid) + + return wla.attest(ctx, func(a workloadattestor.WorkloadAttestor) ([]*common.Selector, error) { + var err error + counter := telemetry_workload.StartAttestorCall(wla.c.Metrics, a.Name()) + defer counter.Done(&err) + + selectors, err := a.Attest(ctx, pid) + if err != nil { + log.WithError(err).Errorf("workload attestor %q failed", a.Name()) + return nil, fmt.Errorf("workload attestor %q failed: %w", a.Name(), err) + } + return selectors, nil + }) +} + +func (wla *attestor) AttestReference(ctx context.Context, reference *anypb.Any) ([]*common.Selector, error) { + // TODO(arndt) add references to log context + log := wla.c.Log + return wla.attest(ctx, func(a workloadattestor.WorkloadAttestor) ([]*common.Selector, error) { + var err error + counter := telemetry_workload.StartAttestorCall(wla.c.Metrics, a.Name()) + defer counter.Done(&err) + + selectors, err := a.AttestReference(ctx, reference) + if err != nil { + log.WithError(err).Errorf("workload attestor %q failed", a.Name()) + return nil, fmt.Errorf("workload attestor %q failed: %w", a.Name(), err) + } + return selectors, nil + }) +} + +func (wla *attestor) attest(ctx context.Context, attestFunc func(attestor workloadattestor.WorkloadAttestor) ([]*common.Selector, error)) ([]*common.Selector, error) { counter := telemetry_workload.StartAttestationCall(wla.c.Metrics) defer counter.Done(nil) - log := wla.c.Log.WithField(telemetry.PID, pid) - plugins := wla.c.Catalog.GetWorkloadAttestors() sChan := make(chan []*common.Selector) errChan := make(chan error) for _, p := range plugins { go func(p workloadattestor.WorkloadAttestor) { - if selectors, err := wla.invokeAttestor(ctx, p, pid); err == nil { + if selectors, err := attestFunc(p); err == nil { sChan <- selectors } else { errChan <- err @@ -72,34 +105,22 @@ func (wla *attestor) Attest(ctx context.Context, pid int) ([]*common.Selector, e selectors = append(selectors, s...) wla.c.selectorHook(selectors) case err := <-errChan: - log.WithError(err).Error("Failed to collect all selectors for PID") + wla.c.Log.WithError(err).Error("Failed to collect all selectors") case <-ctx.Done(): // If the client times out before all workload attestation plugins have reported selectors or an error, // it can be helpful to see the partial set of selectors discovered for debugging purposes. - log.WithField(telemetry.PartialSelectors, selectors).Error("Timed out collecting selectors for PID") + wla.c.Log.WithField(telemetry.PartialSelectors, selectors).Error("Timed out collecting selectors") return nil, ctx.Err() } } telemetry_workload.AddDiscoveredSelectorsSample(wla.c.Metrics, float32(len(selectors))) - // The agent health check currently exercises the Workload API. Since this - // can happen with some frequency, it has a tendency to fill up logs with - // hard-to-filter details if we're not careful (e.g. issue #1537). Only log - // if it is not the agent itself. - if pid != os.Getpid() { - log.WithField(telemetry.Selectors, selectors).Debug("PID attested to have selectors") - } - return selectors, nil -} - -// invokeAttestor invokes attestation against the supplied plugin. Should be called from a goroutine. -func (wla *attestor) invokeAttestor(ctx context.Context, a workloadattestor.WorkloadAttestor, pid int) (_ []*common.Selector, err error) { - counter := telemetry_workload.StartAttestorCall(wla.c.Metrics, a.Name()) - defer counter.Done(&err) - - selectors, err := a.Attest(ctx, pid) - if err != nil { - return nil, fmt.Errorf("workload attestor %q failed: %w", a.Name(), err) - } + // // The agent health check currently exercises the Workload API. Since this + // // can happen with some frequency, it has a tendency to fill up logs with + // // hard-to-filter details if we're not careful (e.g. issue #1537). Only log + // // if it is not the agent itself. + // if pid != os.Getpid() { + // log.WithField(telemetry.Selectors, selectors).Debug("PID attested to have selectors") + // } return selectors, nil } diff --git a/pkg/agent/broker/api/service.go b/pkg/agent/broker/api/service.go new file mode 100644 index 0000000000..6f6c3dfd90 --- /dev/null +++ b/pkg/agent/broker/api/service.go @@ -0,0 +1,358 @@ +package api + +import ( + "context" + "crypto/x509" + "errors" + "fmt" + "time" + + "github.com/sirupsen/logrus" + "github.com/spiffe/go-spiffe/v2/spiffegrpc/grpccredentials" + "github.com/spiffe/go-spiffe/v2/spiffeid" + "github.com/spiffe/spire-api-sdk/proto/spiffe/broker" + "github.com/spiffe/spire/pkg/agent/api/rpccontext" + workloadattestor "github.com/spiffe/spire/pkg/agent/attestor/workload" + "github.com/spiffe/spire/pkg/agent/client" + "github.com/spiffe/spire/pkg/agent/manager" + "github.com/spiffe/spire/pkg/agent/manager/cache" + "github.com/spiffe/spire/pkg/common/bundleutil" + "github.com/spiffe/spire/pkg/common/idutil" + "github.com/spiffe/spire/pkg/common/telemetry" + "github.com/spiffe/spire/pkg/common/telemetry/agent/adminapi" + "github.com/spiffe/spire/pkg/common/x509util" + "github.com/spiffe/spire/proto/spire/common" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// RegisterService registers the delegated identity service on the provided server +func RegisterService(s *grpc.Server, service *Service) { + broker.RegisterAPIServer(s, service) +} + +type Config struct { + Log logrus.FieldLogger + Metrics telemetry.Metrics + Manager manager.Manager + Attestor workloadattestor.Attestor +} + +func New(config Config) *Service { + return &Service{ + manager: config.Manager, + peerAttestor: config.Attestor, + metrics: config.Metrics, + } +} + +// Service implements the delegated identity server +type Service struct { + broker.UnimplementedAPIServer + + manager manager.Manager + peerAttestor workloadattestor.Attestor + metrics telemetry.Metrics +} + +func (s *Service) getCallerContext(ctx context.Context) (spiffeid.ID, error) { + peer, ok := grpccredentials.PeerIDFromContext(ctx) + if !ok { + return spiffeid.ID{}, status.Error(codes.Unauthenticated, "unable to determine caller identity") + } + return peer, nil +} + +func (s *Service) SubscribeToX509SVID(req *broker.SubscribeToX509SVIDRequest, stream broker.API_SubscribeToX509SVIDServer) error { + latency := adminapi.StartFirstX509SVIDUpdateLatency(s.metrics) + ctx := stream.Context() + log := rpccontext.Logger(ctx) + var receivedFirstUpdate bool + + // peer, err := s.getCallerContext(ctx) + // if err != nil { + // return err + // } + // log = log.WithField("broker_peer", peer.String()) + + selectors, err := s.constructValidSelectorsFromReference(ctx, log, req.Reference) + if err != nil { + return err + } + + log.WithFields(logrus.Fields{ + "request_selectors": selectors, + }).Debug("Subscribing to cache changes") + + subscriber, err := s.manager.SubscribeToCacheChanges(ctx, selectors) + if err != nil { + log.WithError(err).Error("Subscribe to cache changes failed") + return err + } + defer subscriber.Finish() + + for { + select { + case update := <-subscriber.Updates(): + if len(update.Identities) > 0 && !receivedFirstUpdate { + // emit latency metric for first update containing an SVID. + latency.Measure() + receivedFirstUpdate = true + } + + if err := sendX509SVIDResponse(update, stream, log); err != nil { + return err + } + case <-ctx.Done(): + return nil + } + } +} + +func (s *Service) SubscribeToX509Bundles(_ *broker.SubscribeToX509BundlesRequest, stream broker.API_SubscribeToX509BundlesServer) error { + ctx := stream.Context() + + peer, err := s.getCallerContext(ctx) + if err != nil { + return err + } + _ = rpccontext.Logger(ctx).WithField("broker_peer", peer.String()) + + subscriber := s.manager.SubscribeToBundleChanges() + + // send initial update.... + caCerts := make(map[string][]byte) + for td, bundle := range subscriber.Value() { + caCerts[td.IDString()] = marshalBundle(bundle.X509Authorities()) + } + + resp := &broker.SubscribeToX509BundlesResponse{ + Bundles: caCerts, + } + + if err := stream.Send(resp); err != nil { + return err + } + + for { + select { + case <-subscriber.Changes(): + for td, bundle := range subscriber.Next() { + caCerts[td.IDString()] = marshalBundle(bundle.X509Authorities()) + } + + resp := &broker.SubscribeToX509BundlesResponse{ + Bundles: caCerts, + } + + if err := stream.Send(resp); err != nil { + return err + } + + case <-ctx.Done(): + return nil + } + } +} + +func (s *Service) FetchJWTSVID(ctx context.Context, req *broker.FetchJWTSVIDRequest) (*broker.FetchJWTSVIDResponse, error) { + log := rpccontext.Logger(ctx) + if len(req.Audience) == 0 { + log.Error("Missing required audience parameter") + return nil, status.Error(codes.InvalidArgument, "audience must be specified") + } + + peer, err := s.getCallerContext(ctx) + if err != nil { + return nil, err + } + log = log.WithField("broker_peer", peer.String()) + + selectors, err := s.constructValidSelectorsFromReference(ctx, log, req.Reference) + if err != nil { + return nil, err + } + + resp := new(broker.FetchJWTSVIDResponse) + entries := s.manager.MatchingRegistrationEntries(selectors) + for _, entry := range entries { + spiffeID, err := spiffeid.FromString(entry.SpiffeId) + if err != nil { + log.WithField(telemetry.SPIFFEID, entry.SpiffeId).WithError(err).Error("Invalid requested SPIFFE ID") + return nil, status.Errorf(codes.InvalidArgument, "invalid requested SPIFFE ID: %v", err) + } + + loopLog := log.WithField(telemetry.SPIFFEID, spiffeID.String()) + + var svid *client.JWTSVID + svid, err = s.manager.FetchJWTSVID(ctx, entry, req.Audience) + if err != nil { + loopLog.WithError(err).Error("Could not fetch JWT-SVID") + return nil, status.Errorf(codes.Unavailable, "could not fetch JWT-SVID: %v", err) + } + resp.Svids = append(resp.Svids, &broker.JWTSVID{ + SpiffeId: spiffeID.String(), + Hint: entry.Hint, + Svid: svid.Token, + }) + + ttl := time.Until(svid.ExpiresAt) + loopLog.WithField(telemetry.TTL, ttl.Seconds()).Debug("Fetched JWT SVID") + } + + if len(resp.Svids) == 0 { + log.Error("No identity issued") + return nil, status.Error(codes.PermissionDenied, "no identity issued") + } + + return resp, nil +} + +func (s *Service) SubscribeToJWTBundles(_ *broker.SubscribeToJWTBundlesRequest, stream broker.API_SubscribeToJWTBundlesServer) error { + ctx := stream.Context() + + peer, err := s.getCallerContext(ctx) + if err != nil { + return err + } + _ = rpccontext.Logger(ctx).WithField("broker_peer", peer.String()) + + subscriber := s.manager.SubscribeToBundleChanges() + + // send initial update.... + jwtbundles := make(map[string][]byte) + for td, bundle := range subscriber.Value() { + jwksBytes, err := bundleutil.Marshal(bundle, bundleutil.NoX509SVIDKeys(), bundleutil.StandardJWKS()) + if err != nil { + return err + } + jwtbundles[td.IDString()] = jwksBytes + } + + resp := &broker.SubscribeToJWTBundlesResponse{ + Bundles: jwtbundles, + } + + if err := stream.Send(resp); err != nil { + return err + } + for { + select { + case <-subscriber.Changes(): + for td, bundle := range subscriber.Next() { + jwksBytes, err := bundleutil.Marshal(bundle, bundleutil.NoX509SVIDKeys(), bundleutil.StandardJWKS()) + if err != nil { + return err + } + jwtbundles[td.IDString()] = jwksBytes + } + + resp := &broker.SubscribeToJWTBundlesResponse{ + Bundles: jwtbundles, + } + if err := stream.Send(resp); err != nil { + return err + } + case <-ctx.Done(): + return nil + } + } +} + +func (s *Service) constructValidSelectorsFromReference(ctx context.Context, log logrus.FieldLogger, ref *broker.WorkloadReference) ([]*common.Selector, error) { + if ref == nil { + log.Error("No workload reference provided") + return nil, status.Error(codes.InvalidArgument, "workload reference must be provided") + } + + selectors, err := s.peerAttestor.AttestReference(ctx, ref.Reference) + if err != nil { + log.WithError(err).Error("Workload attestation failed") + return nil, status.Errorf(codes.Unauthenticated, "workload attestation failed: %v", err) + } + + return selectors, nil +} + +func sendX509SVIDResponse(update *cache.WorkloadUpdate, stream broker.API_SubscribeToX509SVIDServer, log logrus.FieldLogger) (err error) { + resp, err := composeX509SVIDBySelectors(update) + if err != nil { + log.WithError(err).Error("Could not serialize X.509 SVID response") + return status.Error(codes.Internal, "could not serialize response") + } + + if err := stream.Send(resp); err != nil { + log.WithError(err).Error("Failed to send X.509 SVID response") + return err + } + + log = log.WithField(telemetry.Count, len(resp.Svids)) + + // log details on each SVID + // a response has already been sent so nothing is + // blocked on this logic + for i, svid := range resp.Svids { + // Ideally ID Proto parsing should succeed, but if it fails, + // ignore the error and still log with empty spiffe_id. + ttl := time.Until(update.Identities[i].SVID[0].NotAfter) + log.WithFields(logrus.Fields{ + telemetry.SPIFFEID: svid.SpiffeId, + telemetry.TTL: ttl.Seconds(), + }).Debug("Fetched X.509 SVID for broker") + } + + return nil +} + +func composeX509SVIDBySelectors(update *cache.WorkloadUpdate) (*broker.SubscribeToX509SVIDResponse, error) { + resp := new(broker.SubscribeToX509SVIDResponse) + resp.Svids = make([]*broker.X509SVID, 0, len(update.Identities)) + resp.FederatedBundles = make(map[string][]byte, len(update.FederatedBundles)) + + x509Bundle := marshalBundle(update.Bundle.X509Authorities()) + for _, identity := range update.Identities { + // Do not send admin nor downstream SVIDs to the caller + if identity.Entry.Admin || identity.Entry.Downstream { + continue + } + + // check if SVIDs exist for the identity + if len(identity.SVID) == 0 { + return nil, errors.New("unable to get SVID from identity") + } + + id, err := idutil.IDProtoFromString(identity.Entry.SpiffeId) + if err != nil { + return nil, fmt.Errorf("error during SPIFFE ID parsing: %w", err) + } + + keyData, err := x509.MarshalPKCS8PrivateKey(identity.PrivateKey) + if err != nil { + return nil, fmt.Errorf("marshal key for %v: %w", id, err) + } + + svid := &broker.X509SVID{ + SpiffeId: id.String(), + X509Svid: x509util.DERFromCertificates(identity.SVID), + Bundle: x509Bundle, + Hint: identity.Entry.Hint, + X509SvidKey: keyData, + } + resp.Svids = append(resp.Svids, svid) + } + + for td, bundle := range update.FederatedBundles { + resp.FederatedBundles[td.IDString()] = marshalBundle(bundle.X509Authorities()) + } + + return resp, nil +} + +func marshalBundle(certs []*x509.Certificate) []byte { + bundle := []byte{} + for _, c := range certs { + bundle = append(bundle, c.Raw...) + } + return bundle +} diff --git a/pkg/agent/broker/endpoints.go b/pkg/agent/broker/endpoints.go new file mode 100644 index 0000000000..225c994195 --- /dev/null +++ b/pkg/agent/broker/endpoints.go @@ -0,0 +1,165 @@ +package broker + +import ( + "context" + "fmt" + "net" + + "github.com/sirupsen/logrus" + "github.com/spiffe/go-spiffe/v2/spiffeid" + "github.com/spiffe/go-spiffe/v2/spiffetls/tlsconfig" + attestor "github.com/spiffe/spire/pkg/agent/attestor/workload" + brokerapi "github.com/spiffe/spire/pkg/agent/broker/api" + "github.com/spiffe/spire/pkg/agent/endpoints" + "github.com/spiffe/spire/pkg/agent/manager" + "github.com/spiffe/spire/pkg/common/api/middleware" + "github.com/spiffe/spire/pkg/common/telemetry" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/reflection" + + "github.com/spiffe/go-spiffe/v2/bundle/x509bundle" + "github.com/spiffe/go-spiffe/v2/svid/x509svid" +) + +type Config struct { + BindAddr net.Addr + + Manager manager.Manager + + Log logrus.FieldLogger + + Metrics telemetry.Metrics + + Attestor attestor.Attestor + + TrustDomain spiffeid.TrustDomain + + AuthorizedDelegates []string + + SVIDSource x509svid.Source + BundleSource x509bundle.Source +} + +type Endpoints struct { + c *Config +} + +func New(c *Config) (*Endpoints, error) { + switch { + case c.BindAddr == nil: + return nil, fmt.Errorf("BindAddr is required") + case c.Manager == nil: + return nil, fmt.Errorf("Manager is required") + case c.Log == nil: + return nil, fmt.Errorf("Log is required") + case c.Metrics == nil: + return nil, fmt.Errorf("Metrics is required") + case c.Attestor == nil: + return nil, fmt.Errorf("Attestor is required") + case c.TrustDomain == spiffeid.TrustDomain{}: + return nil, fmt.Errorf("TrustDomain is required") + case c.SVIDSource == nil: + return nil, fmt.Errorf("SVIDSource is required") + case c.BundleSource == nil: + return nil, fmt.Errorf("BundleSource is required") + } + return &Endpoints{ + c: c, + }, nil +} + +func (e *Endpoints) ListenAndServe(ctx context.Context) error { + unaryInterceptor, streamInterceptor := middleware.Interceptors( + endpoints.Middleware(e.c.Log, e.c.Metrics), + ) + + // TODO(arndt): Delegated Identity API allows to be served without any authorized peer. + // I think it's better to fail as it's a misconfiguration and having that socket up + // without any authorized peer is just a potential security risk. + if len(e.c.AuthorizedDelegates) == 0 { + return fmt.Errorf("at least one authorized delegate is required") + } + + authorizedDelegates, err := delegatesFromStrings(e.c.AuthorizedDelegates, e.c.TrustDomain) + if err != nil { + return fmt.Errorf("failed to parse authorized delegates: %w", err) + } + + // In comparison to the admin endpoints, this API is secured by mutual TLS using X.509 SVIDs. + // Clients of this API are expected to use the Workload API to obtain their SVIDs first. + // This is to accommodate environments where this API is served over network. + tlsConfig := tlsconfig.MTLSServerConfig(e.c.SVIDSource, e.c.BundleSource, tlsconfig.AuthorizeOneOf(authorizedDelegates...)) + server := grpc.NewServer( + grpc.Creds(credentials.NewTLS(tlsConfig)), + grpc.UnaryInterceptor(unaryInterceptor), + grpc.StreamInterceptor(streamInterceptor), + ) + + e.registerBrokerAPI(server) + reflection.Register(server) + + var l net.Listener + switch e.c.BindAddr.Network() { + case "unix": + l, err = createUDSListener(e.c.BindAddr) + if err != nil { + return err + } + defer l.Close() + default: + return fmt.Errorf("unsupported network type %q for broker endpoint", e.c.BindAddr.Network()) + } + + log := e.c.Log.WithFields(logrus.Fields{ + telemetry.Network: l.Addr().Network(), + telemetry.Address: l.Addr().String()}) + log.Info("Starting SPIFFE Broker Endpoint") + + errChan := make(chan error) + go func() { errChan <- server.Serve(l) }() + + select { + case err = <-errChan: + log.WithError(err).Error("SPIFFE Broker Endpoint stopped prematurely") + return err + case <-ctx.Done(): + log.Info("Stopping SPIFFE Broker Endpoint") + server.Stop() + <-errChan + log.Info("SPIFFE Broker Endpoint has stopped") + return nil + } +} + +func (e *Endpoints) registerBrokerAPI(server *grpc.Server) { + service := brokerapi.New(brokerapi.Config{ + Manager: e.c.Manager, + Attestor: e.c.Attestor, + Metrics: e.c.Metrics, + Log: e.c.Log.WithField(telemetry.SubsystemName, telemetry.BrokerAPI), + }) + + brokerapi.RegisterService(server, service) +} + +func delegatesFromStrings(delegates []string, trustDomain spiffeid.TrustDomain) ([]spiffeid.ID, error) { + var ids []spiffeid.ID + for _, d := range delegates { + id, err := spiffeid.FromString(d) + if err != nil { + return nil, err + } + // TODO(arndt): This also differs from the admin API where it's possible to define delegates + // from other trust domains. Here we enforce that delegates must be in the same trust domain + // as the agent. + // I suppose technically it's not possible on the admin API to encounter this, but considering that + // the SPIFFE Broker Endpoint may be offered over the network in the future I think it's better to + // enforce this here. + if id.TrustDomain() != trustDomain { + return nil, fmt.Errorf("delegate %q is not in trust domain %q", d, trustDomain.Name()) + } + ids = append(ids, id) + } + return ids, nil +} diff --git a/pkg/agent/broker/endpoints_fallback.go b/pkg/agent/broker/endpoints_fallback.go new file mode 100644 index 0000000000..cf4fc353d8 --- /dev/null +++ b/pkg/agent/broker/endpoints_fallback.go @@ -0,0 +1,12 @@ +//go:build windows + +package broker + +import ( + "fmt" + "net" +) + +func createUDSListener(_ net.Addr) (net.Listener, error) { + return nil, fmt.Errorf("unsupported platform for broker API") +} diff --git a/pkg/agent/broker/endpoints_posix.go b/pkg/agent/broker/endpoints_posix.go new file mode 100644 index 0000000000..ef0f359c00 --- /dev/null +++ b/pkg/agent/broker/endpoints_posix.go @@ -0,0 +1,29 @@ +//go:build !windows + +package broker + +import ( + "fmt" + "net" + "os" + + "github.com/spiffe/spire/pkg/common/util" +) + +func createUDSListener(bindAddr net.Addr) (net.Listener, error) { + if bindAddr.Network() != "unix" { + return nil, fmt.Errorf("unsupported network type %q for UDS listener", bindAddr.Network()) + } + + // Remove uds if already exists + os.Remove(bindAddr.String()) + + l, err := net.ListenUnix(bindAddr.Network(), util.GetUnixAddr(bindAddr.String())) + if err != nil { + return nil, fmt.Errorf("error creating UDS listener: %w", err) + } + if err := os.Chmod(bindAddr.String(), 0770); err != nil { + return nil, fmt.Errorf("unable to change UDS permissions: %w", err) + } + return l, nil +} diff --git a/pkg/agent/catalog/workloadattestor.go b/pkg/agent/catalog/workloadattestor.go index 4e367815bc..7328c5da37 100644 --- a/pkg/agent/catalog/workloadattestor.go +++ b/pkg/agent/catalog/workloadattestor.go @@ -23,7 +23,7 @@ func (repo *workloadAttestorRepository) Constraints() catalog.Constraints { } func (repo *workloadAttestorRepository) Versions() []catalog.Version { - return []catalog.Version{workloadAttestorV1{}} + return []catalog.Version{workloadAttestorV1{}, workloadAttestorV2{}} } func (repo *workloadAttestorRepository) BuiltIns() []catalog.BuiltIn { @@ -40,3 +40,8 @@ type workloadAttestorV1 struct{} func (workloadAttestorV1) New() catalog.Facade { return new(workloadattestor.V1) } func (workloadAttestorV1) Deprecated() bool { return false } + +type workloadAttestorV2 struct{} + +func (workloadAttestorV2) New() catalog.Facade { return new(workloadattestor.V2) } +func (workloadAttestorV2) Deprecated() bool { return true } diff --git a/pkg/agent/config.go b/pkg/agent/config.go index 0163d2f7b9..5d35fd15ba 100644 --- a/pkg/agent/config.go +++ b/pkg/agent/config.go @@ -31,6 +31,9 @@ type Config struct { // Directory to bind the admin api to AdminBindAddress net.Addr + // Address to bind the broker endpoint to + BrokerBindAddress net.Addr + // The Validation Context resource name to use when fetching X.509 bundle together with federated bundles with Envoy SDS DefaultAllBundlesName string diff --git a/pkg/agent/endpoints/peertracker_test.go b/pkg/agent/endpoints/peertracker_test.go index 3f559c2ccc..dbaf726281 100644 --- a/pkg/agent/endpoints/peertracker_test.go +++ b/pkg/agent/endpoints/peertracker_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/assert" "google.golang.org/grpc/codes" "google.golang.org/grpc/peer" + "google.golang.org/protobuf/types/known/anypb" ) func TestPeerTrackerAttestor(t *testing.T) { @@ -44,6 +45,11 @@ func (a FakeAttestor) Attest(_ context.Context, pid int) ([]*common.Selector, er return nil, nil } +func (a FakeAttestor) AttestReference(_ context.Context, _ *anypb.Any) ([]*common.Selector, error) { + // TODO(arndt) do we want more logic here? + return []*common.Selector{{Type: "Type", Value: "Value"}}, nil +} + func WithFakeWatcher(alive bool) context.Context { return peer.NewContext(context.Background(), &peer.Peer{ AuthInfo: peertracker.AuthInfo{ diff --git a/pkg/agent/manager/cache/bundle_cache.go b/pkg/agent/manager/cache/bundle_cache.go index 1f275ac630..5ffffc1d2a 100644 --- a/pkg/agent/manager/cache/bundle_cache.go +++ b/pkg/agent/manager/cache/bundle_cache.go @@ -1,15 +1,19 @@ package cache import ( + "fmt" "maps" "github.com/imkira/go-observer" "github.com/spiffe/go-spiffe/v2/bundle/spiffebundle" + "github.com/spiffe/go-spiffe/v2/bundle/x509bundle" "github.com/spiffe/go-spiffe/v2/spiffeid" ) type Bundle = spiffebundle.Bundle +var _ x509bundle.Source = (*BundleCache)(nil) + type BundleCache struct { trustDomain spiffeid.TrustDomain bundles observer.Property @@ -43,6 +47,15 @@ func (c *BundleCache) SubscribeToBundleChanges() *BundleStream { return NewBundleStream(c.bundles.Observe()) } +func (c *BundleCache) GetX509BundleForTrustDomain(trustDomain spiffeid.TrustDomain) (*x509bundle.Bundle, error) { + bundles := c.Bundles() + bundle, ok := bundles[trustDomain] + if !ok { + return nil, fmt.Errorf("bundle for trust domain %q not found", trustDomain) + } + return bundle.X509Bundle(), nil +} + // Wraps an observer stream to provide a type safe interface type BundleStream struct { stream observer.Stream diff --git a/pkg/agent/manager/cache/lru_cache.go b/pkg/agent/manager/cache/lru_cache.go index e66e475db6..f21f545f8b 100644 --- a/pkg/agent/manager/cache/lru_cache.go +++ b/pkg/agent/manager/cache/lru_cache.go @@ -11,6 +11,7 @@ import ( "github.com/andres-erbsen/clock" "github.com/sirupsen/logrus" "github.com/spiffe/go-spiffe/v2/bundle/spiffebundle" + "github.com/spiffe/go-spiffe/v2/bundle/x509bundle" "github.com/spiffe/go-spiffe/v2/spiffeid" "github.com/spiffe/spire/pkg/common/backoff" "github.com/spiffe/spire/pkg/common/telemetry" @@ -585,6 +586,10 @@ func (c *LRUCache) SyncSVIDsWithSubscribers() { c.syncSVIDsWithSubscribers() } +func (c *LRUCache) X509Bundle() x509bundle.Source { + return c.BundleCache +} + // scheduleRotation processes SVID entries in batches, removing those tainted by X.509 authorities. // The process continues at regular intervals until all entries have been processed or the context is cancelled. func (c *LRUCache) scheduleRotation(ctx context.Context, entryIDs []string, taintedX509Authorities []*x509.Certificate) { diff --git a/pkg/agent/manager/manager.go b/pkg/agent/manager/manager.go index 72013a9440..1b57cf9aa5 100644 --- a/pkg/agent/manager/manager.go +++ b/pkg/agent/manager/manager.go @@ -11,6 +11,7 @@ import ( "github.com/andres-erbsen/clock" observer "github.com/imkira/go-observer" "github.com/spiffe/go-spiffe/v2/bundle/spiffebundle" + "github.com/spiffe/go-spiffe/v2/bundle/x509bundle" "github.com/spiffe/go-spiffe/v2/spiffeid" "github.com/spiffe/spire/pkg/agent/client" "github.com/spiffe/spire/pkg/agent/manager/cache" @@ -93,6 +94,9 @@ type Manager interface { // GetBundle get latest cached bundle GetBundle() *cache.Bundle + + // GetX509Bundle returns an X509 bundle source + GetX509Bundle() x509bundle.Source } // Cache stores each registration entry, signed X509-SVIDs for those entries, @@ -135,6 +139,8 @@ type Cache interface { // Identities get all identities in cache Identities() []cache.Identity + + X509Bundle() x509bundle.Source } type manager struct { @@ -429,6 +435,10 @@ func (m *manager) GetBundle() *cache.Bundle { return m.cache.Bundle() } +func (m *manager) GetX509Bundle() x509bundle.Source { + return m.cache.X509Bundle() +} + func (m *manager) runSVIDObserver(ctx context.Context) error { svidStream := m.SubscribeToSVIDChanges() for { diff --git a/pkg/agent/plugin/workloadattestor/k8s/k8s.go b/pkg/agent/plugin/workloadattestor/k8s/k8s.go index 2b3ddac8c3..0f3aed6676 100644 --- a/pkg/agent/plugin/workloadattestor/k8s/k8s.go +++ b/pkg/agent/plugin/workloadattestor/k8s/k8s.go @@ -20,7 +20,8 @@ import ( "github.com/andres-erbsen/clock" "github.com/hashicorp/go-hclog" "github.com/hashicorp/hcl" - workloadattestorv1 "github.com/spiffe/spire-plugin-sdk/proto/spire/plugin/agent/workloadattestor/v1" + "github.com/spiffe/spire-api-sdk/proto/spiffe/reference" + workloadattestorv2 "github.com/spiffe/spire-plugin-sdk/proto/spire/plugin/agent/workloadattestor/v2" configv1 "github.com/spiffe/spire-plugin-sdk/proto/spire/service/common/config/v1" "github.com/spiffe/spire/pkg/agent/common/sigstore" "github.com/spiffe/spire/pkg/common/catalog" @@ -31,6 +32,7 @@ import ( "golang.org/x/sync/singleflight" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/anypb" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" ) @@ -52,7 +54,7 @@ func BuiltIn() catalog.BuiltIn { func builtin(p *Plugin) catalog.BuiltIn { return catalog.MakeBuiltIn(pluginName, - workloadattestorv1.WorkloadAttestorPluginServer(p), + workloadattestorv2.WorkloadAttestorPluginServer(p), configv1.ConfigServiceServer(p), ) } @@ -257,11 +259,11 @@ func (p *Plugin) buildConfig(coreConfig catalog.CoreConfig, hclText string, stat type ContainerHelper interface { Configure(config *HCLConfig, log hclog.Logger) error - GetPodUIDAndContainerID(pID int32, log hclog.Logger) (types.UID, string, error) + GetPodUIDAndContainerID(reference *anypb.Any, log hclog.Logger) (types.UID, string, bool, error) } type Plugin struct { - workloadattestorv1.UnsafeWorkloadAttestorServer + workloadattestorv2.UnsafeWorkloadAttestorServer configv1.UnsafeConfigServer log hclog.Logger @@ -290,13 +292,15 @@ func (p *Plugin) SetLogger(log hclog.Logger) { p.log = log } -func (p *Plugin) Attest(ctx context.Context, req *workloadattestorv1.AttestRequest) (*workloadattestorv1.AttestResponse, error) { +func (p *Plugin) AttestReference(ctx context.Context, req *workloadattestorv2.AttestReferenceRequest) (*workloadattestorv2.AttestReferenceResponse, error) { + p.log.Info("skip kubelet verification", p.config.SkipKubeletVerification) + config, containerHelper, sigstoreVerifier, err := p.getConfig() if err != nil { return nil, err } - podUID, containerID, err := containerHelper.GetPodUIDAndContainerID(req.Pid, p.log) + podUID, containerID, _, err := containerHelper.GetPodUIDAndContainerID(req.Reference, p.log) if err != nil { return nil, err } @@ -304,7 +308,7 @@ func (p *Plugin) Attest(ctx context.Context, req *workloadattestorv1.AttestReque // Not a Kubernetes pod if containerID == "" { - return &workloadattestorv1.AttestResponse{}, nil + return &workloadattestorv2.AttestReferenceResponse{}, nil } log := p.log.With( @@ -323,7 +327,7 @@ func (p *Plugin) Attest(ctx context.Context, req *workloadattestorv1.AttestReque return nil, err } - var attestResponse *workloadattestorv1.AttestResponse + var attestResponse *workloadattestorv2.AttestReferenceResponse for podKey, podValue := range podList { if podKnown { if podKey != string(podUID) { @@ -375,7 +379,7 @@ func (p *Plugin) Attest(ctx context.Context, req *workloadattestorv1.AttestReque log.Warn("Two pods found with same container Id") return nil, status.Error(codes.Internal, "two pods found with same container Id") } - attestResponse = &workloadattestorv1.AttestResponse{SelectorValues: selectorValues} + attestResponse = &workloadattestorv2.AttestReferenceResponse{SelectorValues: selectorValues} } } @@ -846,3 +850,23 @@ func newCertPool(certs []*x509.Certificate) *x509.CertPool { } return certPool } + +func extractRelevantReference(ref *anypb.Any) (types.UID, int32, error) { + switch ref.TypeUrl { + case "type.googleapis.com/spiffe.reference.WorkloadPIDReference": + var pidRef reference.WorkloadPIDReference + if err := ref.UnmarshalTo(&pidRef); err != nil { + return "", -1, fmt.Errorf("unable to unmarshal PID reference: %w", err) + } + return "", pidRef.Pid, nil + case "type.googleapis.com/spiffe.reference.KubernetesPodUIDReference": + var podUIDRef reference.KubernetesPodUIDReference + if err := ref.UnmarshalTo(&podUIDRef); err != nil { + return "", -1, fmt.Errorf("unable to unmarshal Pod UID reference: %w", err) + } + uid := types.UID(podUIDRef.Uid) + return uid, -1, nil + default: + return "", -1, fmt.Errorf("unsupported reference type: %s", ref.TypeUrl) + } +} diff --git a/pkg/agent/plugin/workloadattestor/k8s/k8s_posix.go b/pkg/agent/plugin/workloadattestor/k8s/k8s_posix.go index 35510de46c..1688836656 100644 --- a/pkg/agent/plugin/workloadattestor/k8s/k8s_posix.go +++ b/pkg/agent/plugin/workloadattestor/k8s/k8s_posix.go @@ -3,6 +3,7 @@ package k8s import ( + "fmt" "io" "log" "os" @@ -16,6 +17,7 @@ import ( "github.com/spiffe/spire/pkg/common/containerinfo" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/anypb" "k8s.io/apimachinery/pkg/types" ) @@ -34,6 +36,7 @@ func createHelper(c *Plugin) ContainerHelper { } return &containerHelper{ rootDir: rootDir, + log: c.log, } } @@ -41,6 +44,7 @@ type containerHelper struct { rootDir string useNewContainerLocator bool verboseContainerLocatorLogs bool + log hclog.Logger } func (h *containerHelper) Configure(config *HCLConfig, log hclog.Logger) error { @@ -55,12 +59,34 @@ func (h *containerHelper) Configure(config *HCLConfig, log hclog.Logger) error { return nil } -func (h *containerHelper) GetPodUIDAndContainerID(pID int32, log hclog.Logger) (types.UID, string, error) { +func (h *containerHelper) GetPodUIDAndContainerID(ref *anypb.Any, log hclog.Logger) (types.UID, string, bool, error) { + podUID, pid, err := extractRelevantReference(ref) + + h.log.Info("Extracted reference from AttestReference request", "podUID", podUID, "pid", pid) + + switch { + case err != nil: + return "", "", false, fmt.Errorf("unable to extract relevant reference: %w", err) + case pid != -1: + // instruct that in the later state we require a container ID match in the pod spec. + requiredContainerIDMatch := true + podUID, containerID, err := h.getPodUIDAndContainerIDFromPID(pid, log) + h.log.Info("Obtained pod UID and container ID from PID reference", "podUID", podUID, "containerID", containerID, "pid", pid) + return podUID, containerID, requiredContainerIDMatch, err + case podUID != "": + return podUID, "", false, nil + default: + return "", "", false, status.Errorf(codes.InvalidArgument, "reference did not contain a valid Pod UID or PID") + } +} + +func (h *containerHelper) getPodUIDAndContainerIDFromPID(pID int32, log hclog.Logger) (types.UID, string, error) { if !h.useNewContainerLocator { cgroups, err := cgroups.GetCgroups(pID, dirFS(h.rootDir)) if err != nil { return "", "", status.Errorf(codes.Internal, "unable to obtain cgroups: %v", err) } + h.log.Info("Obtained cgroups for PID", "pid", pID, "cgroups", cgroups) return getPodUIDAndContainerIDFromCGroups(cgroups) } diff --git a/pkg/agent/plugin/workloadattestor/k8s/k8s_windows.go b/pkg/agent/plugin/workloadattestor/k8s/k8s_windows.go index 2d55ea3b24..1c70cf9c5a 100644 --- a/pkg/agent/plugin/workloadattestor/k8s/k8s_windows.go +++ b/pkg/agent/plugin/workloadattestor/k8s/k8s_windows.go @@ -9,6 +9,7 @@ import ( "github.com/spiffe/spire/pkg/common/container/process" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/anypb" "k8s.io/apimachinery/pkg/types" ) @@ -30,13 +31,18 @@ func (h *containerHelper) Configure(_ *HCLConfig, _ hclog.Logger) error { return nil } -func (h *containerHelper) GetPodUIDAndContainerID(pID int32, log hclog.Logger) (types.UID, string, error) { - containerID, err := h.ph.GetContainerIDByProcess(pID, log) +func (h *containerHelper) GetPodUIDAndContainerID(ref *anypb.Any, log hclog.Logger) (types.UID, string, bool, error) { + _, pid, err := extractRelevantReference(ref) if err != nil { - return types.UID(""), "", status.Errorf(codes.Internal, "failed to get container ID: %v", err) + return "", "", false, status.Errorf(codes.Internal, "failed to extract relevant reference: %v", err) } - return types.UID(""), containerID, nil + containerID, err := h.ph.GetContainerIDByProcess(pid, log) + if err != nil { + return types.UID(""), "", false, status.Errorf(codes.Internal, "failed to get container ID: %v", err) + } + + return types.UID(""), containerID, true, nil } func (p *Plugin) defaultKubeletCAPath() string { diff --git a/pkg/agent/plugin/workloadattestor/k8s/k8s_windows_test.go b/pkg/agent/plugin/workloadattestor/k8s/k8s_windows_test.go index 90efb51628..a9801ca49b 100644 --- a/pkg/agent/plugin/workloadattestor/k8s/k8s_windows_test.go +++ b/pkg/agent/plugin/workloadattestor/k8s/k8s_windows_test.go @@ -8,10 +8,12 @@ import ( "testing" "github.com/hashicorp/go-hclog" + "github.com/spiffe/spire-api-sdk/proto/spiffe/reference" "github.com/spiffe/spire/test/spiretest" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/grpc/codes" + "google.golang.org/protobuf/types/known/anypb" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" ) @@ -48,17 +50,22 @@ func (h *fakeContainerHelper) GetOSSelectors(context.Context, hclog.Logger, *cor return h.osSelectors, nil } -func (h *fakeContainerHelper) GetPodUIDAndContainerID(pID int32, _ hclog.Logger) (types.UID, string, error) { +func (h *fakeContainerHelper) GetPodUIDAndContainerID(ref *anypb.Any, _ hclog.Logger) (types.UID, string, bool, error) { if h.err != nil { - return types.UID(""), "", h.err + return types.UID(""), "", false, h.err } - cID, ok := h.cIDs[pID] + _, pid, err := extractRelevantReference(ref) + if err != nil { + return "", "", false, err + } + + cID, ok := h.cIDs[pid] if !ok { - return types.UID(""), "", nil + return types.UID(""), "", false, nil } - return types.UID(""), cID, nil + return types.UID(""), cID, true, nil } func (s *Suite) addGetContainerResponsePidInPod() { @@ -75,7 +82,9 @@ func TestContainerHelper(t *testing.T) { t.Run("containerID found", func(t *testing.T) { fakeHelper.containerID = "123" - podID, containerID, err := cHelper.GetPodUIDAndContainerID(123, nil) + ref, err := buildReferenceWithPID(123) + require.NoError(t, err) + podID, containerID, _, err := cHelper.GetPodUIDAndContainerID(ref, nil) require.NoError(t, err) assert.Empty(t, podID) @@ -84,7 +93,9 @@ func TestContainerHelper(t *testing.T) { t.Run("get fails", func(t *testing.T) { fakeHelper.err = errors.New("oh no") - podID, containerID, err := cHelper.GetPodUIDAndContainerID(123, nil) + ref, err := buildReferenceWithPID(123) + require.NoError(t, err) + podID, containerID, _, err := cHelper.GetPodUIDAndContainerID(ref, nil) spiretest.RequireGRPCStatus(t, err, codes.Internal, "failed to get container ID: oh no") assert.Empty(t, podID) @@ -104,3 +115,10 @@ func (f *fakeProcessHelper) GetContainerIDByProcess(int32, hclog.Logger) (string return f.containerID, nil } + +func buildReferenceWithPID(pid int32) (*anypb.Any, error) { + pidReference := reference.WorkloadPIDReference{ + Pid: pid, + } + return anypb.New(&pidReference) +} diff --git a/pkg/agent/plugin/workloadattestor/v1.go b/pkg/agent/plugin/workloadattestor/v1.go index 329a7a17de..2442fce867 100644 --- a/pkg/agent/plugin/workloadattestor/v1.go +++ b/pkg/agent/plugin/workloadattestor/v1.go @@ -2,12 +2,16 @@ package workloadattestor import ( "context" + "errors" "fmt" + "github.com/spiffe/spire-api-sdk/proto/spiffe/reference" workloadattestorv1 "github.com/spiffe/spire-plugin-sdk/proto/spire/plugin/agent/workloadattestor/v1" "github.com/spiffe/spire/pkg/common/plugin" "github.com/spiffe/spire/pkg/common/util" "github.com/spiffe/spire/proto/spire/common" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" ) type V1 struct { @@ -39,3 +43,23 @@ func (v1 *V1) Attest(ctx context.Context, pid int) ([]*common.Selector, error) { } return selectors, nil } + +func (v1 *V1) AttestReference(ctx context.Context, reference *anypb.Any) ([]*common.Selector, error) { + pid, err := extractPIDReference(reference) + if err != nil { + return nil, v1.WrapErr(err) + } + + return v1.Attest(ctx, int(pid)) +} + +func extractPIDReference(ref *anypb.Any) (int32, error) { + if ref.GetTypeUrl() == "type.googleapis.com/spiffe.reference.WorkloadPIDReference" { + var pidRef reference.WorkloadPIDReference + if err := anypb.UnmarshalTo(ref, &pidRef, proto.UnmarshalOptions{}); err != nil { + return 0, fmt.Errorf("unmarshaling PID reference: %w", err) + } + return pidRef.Pid, nil + } + return -1, errors.New("PID reference not found") +} diff --git a/pkg/agent/plugin/workloadattestor/v2.go b/pkg/agent/plugin/workloadattestor/v2.go new file mode 100644 index 0000000000..de25e76c4f --- /dev/null +++ b/pkg/agent/plugin/workloadattestor/v2.go @@ -0,0 +1,56 @@ +package workloadattestor + +import ( + "context" + "fmt" + + workloadattestorv2 "github.com/spiffe/spire-plugin-sdk/proto/spire/plugin/agent/workloadattestor/v2" + + "github.com/spiffe/spire-api-sdk/proto/spiffe/reference" + "github.com/spiffe/spire/pkg/common/plugin" + "github.com/spiffe/spire/pkg/common/util" + "github.com/spiffe/spire/proto/spire/common" + "google.golang.org/protobuf/types/known/anypb" +) + +type V2 struct { + plugin.Facade + workloadattestorv2.WorkloadAttestorPluginClient +} + +func (v2 *V2) Attest(ctx context.Context, pid int) ([]*common.Selector, error) { + pidInt32, err := util.CheckedCast[int32](pid) + if err != nil { + return nil, v2.WrapErr(fmt.Errorf("invalid value for PID: %w", err)) + } + pidReference := reference.WorkloadPIDReference{ + Pid: pidInt32, + } + anyPidReference, err := anypb.New(&pidReference) + if err != nil { + return nil, v2.WrapErr(err) + } + + return v2.AttestReference(ctx, anyPidReference) +} + +func (v2 *V2) AttestReference(ctx context.Context, reference *anypb.Any) ([]*common.Selector, error) { + resp, err := v2.WorkloadAttestorPluginClient.AttestReference(ctx, &workloadattestorv2.AttestReferenceRequest{ + Reference: reference, + }) + if err != nil { + return nil, v2.WrapErr(err) + } + + var selectors []*common.Selector + if resp.SelectorValues != nil { + selectors = make([]*common.Selector, 0, len(resp.SelectorValues)) + for _, selectorValue := range resp.SelectorValues { + selectors = append(selectors, &common.Selector{ + Type: v2.Name(), + Value: selectorValue, + }) + } + } + return selectors, nil +} diff --git a/pkg/agent/plugin/workloadattestor/workloadattestor.go b/pkg/agent/plugin/workloadattestor/workloadattestor.go index b17a586866..ba4458f0d9 100644 --- a/pkg/agent/plugin/workloadattestor/workloadattestor.go +++ b/pkg/agent/plugin/workloadattestor/workloadattestor.go @@ -5,10 +5,12 @@ import ( "github.com/spiffe/spire/pkg/common/catalog" "github.com/spiffe/spire/proto/spire/common" + "google.golang.org/protobuf/types/known/anypb" ) type WorkloadAttestor interface { catalog.PluginInfo Attest(ctx context.Context, pid int) ([]*common.Selector, error) + AttestReference(ctx context.Context, reference *anypb.Any) ([]*common.Selector, error) } diff --git a/pkg/common/containerinfo/extract.go b/pkg/common/containerinfo/extract.go index ae4cb2d1d4..c741d50cb2 100644 --- a/pkg/common/containerinfo/extract.go +++ b/pkg/common/containerinfo/extract.go @@ -64,11 +64,14 @@ func (e *Extractor) extractInfo(pid int32, log hclog.Logger, extractPodUID bool) return "", "", err } + log.Info("Extracted pod UID and container ID from mount info", "podUID", podUID, "containerID", containerID, "pid", pid) + if containerID == "" { podUID, containerID, err = e.extractPodUIDAndContainerIDFromCGroups(pid, log, extractPodUID) if err != nil { return "", "", err } + log.Info("Extracted pod UID and container ID from cgroups", "podUID", podUID, "containerID", containerID, "pid", pid) } return podUID, containerID, nil diff --git a/pkg/common/telemetry/names.go b/pkg/common/telemetry/names.go index e2bc775448..a04a177470 100644 --- a/pkg/common/telemetry/names.go +++ b/pkg/common/telemetry/names.go @@ -849,6 +849,9 @@ const ( // DelegatedIdentityAPI functionality related to delegated identity endpoints DelegatedIdentityAPI = "delegated_identity_api" + // BrokerAPI functionality related to delegated identity endpoints + BrokerAPI = "broker_api" + // DeleteFederatedBundle functionality related to deleting a federated bundle DeleteFederatedBundle = "delete_federated_bundle"