diff --git a/plugins/k8saudit/README.md b/plugins/k8saudit/README.md index 39dadb88..edcb36aa 100644 --- a/plugins/k8saudit/README.md +++ b/plugins/k8saudit/README.md @@ -9,7 +9,7 @@ Audit events are logged by the API server when almost every cluster management t This plugin supports consuming Kubernetes Audit Events coming from the [Webhook backend](https://kubernetes.io/docs/tasks/debug/debug-cluster/audit/#webhook-backend) or from a file. For webhooks, the plugin embeds a web server that listens on a configurable port and accepts POST requests. The posted JSON object comprises one or more events. The web server of the plugin can be configured as part of the plugin's init configuration and open parameters. For files, the plugin expects content to be in [JSONL format](https://jsonlines.org/), where each line represents a JSON object, containing one or more audit events. -The expected way of using the plugin with Falco is through a Webhook. File reading support can be used with Stratoshark or testing and development. +The expected way of using the plugin with Falco is through a Webhook. File reading support can be used with Stratoshark or testing and development. The `file://` scheme enables continuous file watching with log rotation support, useful for reading audit logs written to disk by the API server. ## Capabilities @@ -135,6 +135,7 @@ load_plugins: [k8saudit, json] **Open Parameters**: - `http://:/`: Opens an event stream by listening on an HTTP web server - `https://:/`: Opens an event stream by listening on an HTTPS web server +- `file://`: Opens an event stream by continuously watching a file for new audit events. Handles log rotation automatically. Example: `file:///var/log/kube-apiserver/audit.log` - `no scheme`: Opens an event stream by reading the events from a file on the local filesystem. The params string is interpreted as a filepath diff --git a/plugins/k8saudit/go.mod b/plugins/k8saudit/go.mod index 931bdbad..a83fde7a 100644 --- a/plugins/k8saudit/go.mod +++ b/plugins/k8saudit/go.mod @@ -5,6 +5,7 @@ go 1.15 require ( github.com/alecthomas/jsonschema v0.0.0-20220216202328-9eeeec9d044b github.com/falcosecurity/plugin-sdk-go v0.8.3 + github.com/fsnotify/fsnotify v1.9.0 // indirect github.com/iancoleman/orderedmap v0.3.0 // indirect github.com/valyala/fastjson v1.6.4 ) diff --git a/plugins/k8saudit/go.sum b/plugins/k8saudit/go.sum index 389c983c..ea06fcc8 100644 --- a/plugins/k8saudit/go.sum +++ b/plugins/k8saudit/go.sum @@ -5,6 +5,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/falcosecurity/plugin-sdk-go v0.8.3 h1:KsX7qt83dzC57qcNpZKaBrCjTXqpXgvxDcEXs6Z5sHI= github.com/falcosecurity/plugin-sdk-go v0.8.3/go.mod h1:gEgxjvuopv5VF4wc8s0EHnmT9qrIKBtcJVBnRlEPU1A= +github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k= +github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= github.com/geraldcombs/fastjson v0.0.0-20250801170450-bf39244e60b8 h1:S2FAMWjJKPRR9fvtgYVWQ5joNsl0qQoRxmxYHKDDtx4= github.com/geraldcombs/fastjson v0.0.0-20250801170450-bf39244e60b8/go.mod h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLrsQns1aXY= github.com/iancoleman/orderedmap v0.0.0-20190318233801-ac98e3ecb4b0/go.mod h1:N0Wam8K1arqPXNWjMo21EXnBPOPp36vB07FNRdD2geA= @@ -25,6 +27,8 @@ github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2 github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ= github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/plugins/k8saudit/pkg/k8saudit/filewatch/filewatch_test.go b/plugins/k8saudit/pkg/k8saudit/filewatch/filewatch_test.go new file mode 100644 index 00000000..ef0a1f52 --- /dev/null +++ b/plugins/k8saudit/pkg/k8saudit/filewatch/filewatch_test.go @@ -0,0 +1,103 @@ +// SPDX-License-Identifier: Apache-2.0 +/* +Copyright (C) 2026 The Falco Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filewatch_test + +import ( + "os" + "path/filepath" + "testing" + "time" + + "github.com/falcosecurity/plugin-sdk-go/pkg/sdk" + "github.com/falcosecurity/plugins/plugins/k8saudit/pkg/k8saudit" +) + +const testAuditEvent = `{"kind":"Event","apiVersion":"audit.k8s.io/v1","level":"Metadata","auditID":"test","stage":"ResponseComplete","requestURI":"/api","verb":"get","user":{"username":"test"},"sourceIPs":["127.0.0.1"],"objectRef":{"resource":"pods","namespace":"default","name":"test"},"responseStatus":{"metadata":{},"code":200},"requestReceivedTimestamp":"2023-01-01T00:00:00.000000Z","stageTimestamp":"2023-01-01T00:00:01.000000Z"}` + +func newTestPlugin() *k8saudit.Plugin { + p := &k8saudit.Plugin{} + p.Config.Reset() + return p +} + +func TestOpenFileWatch_DetectsNewContent(t *testing.T) { + tmpDir := t.TempDir() + filePath := filepath.Join(tmpDir, "audit.log") + + if err := os.WriteFile(filePath, []byte{}, 0644); err != nil { + t.Fatal(err) + } + + p := newTestPlugin() + inst, err := p.OpenFileWatch(filePath) + if err != nil { + t.Fatal(err) + } + defer inst.(sdk.Closer).Close() + + time.Sleep(100 * time.Millisecond) + f, err := os.OpenFile(filePath, os.O_APPEND|os.O_WRONLY, 0644) + if err != nil { + t.Fatal(err) + } + f.WriteString(testAuditEvent + "\n") + f.Close() + + time.Sleep(200 * time.Millisecond) +} + +func TestOpenFileWatch_HandlesRotation(t *testing.T) { + tmpDir := t.TempDir() + filePath := filepath.Join(tmpDir, "audit.log") + + if err := os.WriteFile(filePath, []byte(testAuditEvent+"\n"), 0644); err != nil { + t.Fatal(err) + } + + p := newTestPlugin() + inst, err := p.OpenFileWatch(filePath) + if err != nil { + t.Fatal(err) + } + defer inst.(sdk.Closer).Close() + + time.Sleep(100 * time.Millisecond) + + os.Remove(filePath) + if err := os.WriteFile(filePath, []byte(testAuditEvent+"\n"), 0644); err != nil { + t.Fatal(err) + } + + time.Sleep(200 * time.Millisecond) +} + +func TestOpen_FileScheme(t *testing.T) { + tmpDir := t.TempDir() + filePath := filepath.Join(tmpDir, "audit.log") + + if err := os.WriteFile(filePath, []byte(testAuditEvent+"\n"), 0644); err != nil { + t.Fatal(err) + } + + p := newTestPlugin() + inst, err := p.Open("file://" + filePath) + if err != nil { + t.Fatal(err) + } + defer inst.(sdk.Closer).Close() +} diff --git a/plugins/k8saudit/pkg/k8saudit/source.go b/plugins/k8saudit/pkg/k8saudit/source.go index 4081eadb..90ee4511 100644 --- a/plugins/k8saudit/pkg/k8saudit/source.go +++ b/plugins/k8saudit/pkg/k8saudit/source.go @@ -26,12 +26,14 @@ import ( "net/http" "net/url" "os" + "path/filepath" "sort" "strings" "time" "github.com/falcosecurity/plugin-sdk-go/pkg/sdk" "github.com/falcosecurity/plugin-sdk-go/pkg/sdk/plugins/source" + "github.com/fsnotify/fsnotify" "github.com/valyala/fastjson" ) @@ -51,6 +53,8 @@ func (k *Plugin) Open(params string) (source.Instance, error) { return k.OpenWebServer(u.Host, u.Path, false) case "https": return k.OpenWebServer(u.Host, u.Path, true) + case "file": + return k.OpenFileWatch(u.Path) case "": // by default, fallback to opening a filepath trimmed := strings.TrimSpace(params) @@ -125,6 +129,123 @@ func (k *Plugin) OpenReader(r io.ReadCloser) (source.Instance, error) { source.WithInstanceEventSize(uint32(k.Config.MaxEventSize))) } +// OpenFileWatch opens a source.Instance that continuously watches a file for +// new K8S Audit Events using fsnotify. It watches the parent directory (as +// recommended by fsnotify) to handle atomic file replacements and log rotation. +func (k *Plugin) OpenFileWatch(path string) (source.Instance, error) { + absPath, err := filepath.Abs(path) + if err != nil { + return nil, fmt.Errorf("failed to get absolute path: %w", err) + } + dir := filepath.Dir(absPath) + + watcher, err := fsnotify.NewWatcher() + if err != nil { + return nil, fmt.Errorf("failed to create file watcher: %w", err) + } + + ctx, cancelCtx := context.WithCancel(context.Background()) + evtC := make(chan source.PushEvent) + + go func() { + defer close(evtC) + defer watcher.Close() + + var parser fastjson.Parser + var file *os.File + var offset int64 + + openFile := func(seekEnd bool) bool { + if file != nil { + file.Close() + file = nil + } + f, err := os.Open(absPath) + if err != nil { + return false + } + file = f + if seekEnd { + offset, _ = file.Seek(0, io.SeekEnd) + } else { + offset = 0 + } + return true + } + + readNewLines := func() { + if file == nil { + return + } + file.Seek(offset, io.SeekStart) + scanner := bufio.NewScanner(file) + for scanner.Scan() { + line := scanner.Text() + offset += int64(len(line)) + 1 + if len(line) > 0 { + k.parseAuditEventsAndPush(&parser, []byte(line), evtC) + } + } + } + + openFile(true) + + if err := watcher.Add(dir); err != nil { + if file != nil { + file.Close() + } + evtC <- source.PushEvent{Err: err} + return + } + + for { + select { + case <-ctx.Done(): + if file != nil { + file.Close() + } + return + case event, ok := <-watcher.Events: + if !ok { + if file != nil { + file.Close() + } + return + } + if event.Name != absPath { + continue + } + if event.Op&fsnotify.Write == fsnotify.Write { + readNewLines() + } + if event.Op&fsnotify.Create == fsnotify.Create { + openFile(false) + readNewLines() + } + if event.Op&(fsnotify.Rename|fsnotify.Remove) != 0 { + if file != nil { + file.Close() + file = nil + } + } + case _, ok := <-watcher.Errors: + if !ok { + if file != nil { + file.Close() + } + return + } + } + } + }() + + return source.NewPushInstance( + evtC, + source.WithInstanceContext(ctx), + source.WithInstanceClose(cancelCtx), + source.WithInstanceEventSize(uint32(k.Config.MaxEventSize))) +} + // OpenWebServer opens a source.Instance event stream that receives K8S Audit // Events by starting a server and listening for JSON webhooks. The expected // JSON format is the one of K8S API Server webhook backend