From b50b7b104e855bc338f83884aed9b48b91c64520 Mon Sep 17 00:00:00 2001 From: Richard Tweed Date: Thu, 22 Jan 2026 16:59:47 +0000 Subject: [PATCH 1/7] feat(k8saudit): add file watching support via tail:// scheme Add continuous file watching capability for audit logs using the tail:// URL scheme. This allows monitoring files for new entries and handles log rotation via inode detection and file truncation. Closes #191 Co-Authored-By: Claude Opus 4.5 Signed-off-by: Richard Tweed --- plugins/k8saudit/pkg/k8saudit/config.go | 2 + plugins/k8saudit/pkg/k8saudit/source.go | 87 ++++++++++ .../k8saudit/pkg/k8saudit/tail/tail_test.go | 153 ++++++++++++++++++ 3 files changed, 242 insertions(+) create mode 100644 plugins/k8saudit/pkg/k8saudit/tail/tail_test.go diff --git a/plugins/k8saudit/pkg/k8saudit/config.go b/plugins/k8saudit/pkg/k8saudit/config.go index 16b0eb05b..2c9a910a2 100644 --- a/plugins/k8saudit/pkg/k8saudit/config.go +++ b/plugins/k8saudit/pkg/k8saudit/config.go @@ -24,6 +24,7 @@ type PluginConfig struct { UseAsync bool `json:"useAsync" jsonschema:"title=Use async extraction,description=If true then async extraction optimization is enabled (Default: true),default=true"` MaxEventSize uint64 `json:"maxEventSize" jsonschema:"title=Maximum event size,description=Maximum size of single audit event (Default: 262144),default=262144"` WebhookMaxBatchSize uint64 `json:"webhookMaxBatchSize" jsonschema:"title=Maximum webhook request size,description=Maximum size of incoming webhook POST request bodies (Default: 12582912),default=12582912"` + WatchPollIntervalMs uint64 `json:"watchPollIntervalMs" jsonschema:"title=Watch poll interval,description=Polling interval in milliseconds when watching a file with tail:// scheme (Default: 250),default=250"` } // Resets sets the configuration to its default values @@ -38,4 +39,5 @@ func (k *PluginConfig) Reset() { // The following values have been chosen by increasing by ~20% the default // values of the K8S docs k.WebhookMaxBatchSize = 12 * 1024 * 1024 + k.WatchPollIntervalMs = 250 } diff --git a/plugins/k8saudit/pkg/k8saudit/source.go b/plugins/k8saudit/pkg/k8saudit/source.go index 4081eadb6..9f9cb0a33 100644 --- a/plugins/k8saudit/pkg/k8saudit/source.go +++ b/plugins/k8saudit/pkg/k8saudit/source.go @@ -28,6 +28,7 @@ import ( "os" "sort" "strings" + "syscall" "time" "github.com/falcosecurity/plugin-sdk-go/pkg/sdk" @@ -35,6 +36,13 @@ import ( "github.com/valyala/fastjson" ) +func fileInode(info os.FileInfo) uint64 { + if stat, ok := info.Sys().(*syscall.Stat_t); ok { + return stat.Ino + } + return 0 +} + const ( webServerShutdownTimeoutSecs = 5 webServerEventChanBufSize = 50 @@ -51,6 +59,8 @@ func (k *Plugin) Open(params string) (source.Instance, error) { return k.OpenWebServer(u.Host, u.Path, false) case "https": return k.OpenWebServer(u.Host, u.Path, true) + case "tail": + return k.OpenFileTail(u.Path) case "": // by default, fallback to opening a filepath trimmed := strings.TrimSpace(params) @@ -125,6 +135,83 @@ func (k *Plugin) OpenReader(r io.ReadCloser) (source.Instance, error) { source.WithInstanceEventSize(uint32(k.Config.MaxEventSize))) } +// OpenFileTail opens a source.Instance that continuously watches a file for +// new K8S Audit Events, similar to "tail -f". It handles log rotation by +// detecting file truncation or inode changes. +func (k *Plugin) OpenFileTail(path string) (source.Instance, error) { + ctx, cancelCtx := context.WithCancel(context.Background()) + evtC := make(chan source.PushEvent) + + go func() { + defer close(evtC) + var parser fastjson.Parser + var offset int64 + var lastInode uint64 + + pollInterval := time.Duration(k.Config.WatchPollIntervalMs) * time.Millisecond + + for { + select { + case <-ctx.Done(): + return + default: + } + + file, err := os.Open(path) + if err != nil { + select { + case <-ctx.Done(): + return + case <-time.After(pollInterval): + continue + } + } + + info, err := file.Stat() + if err != nil { + file.Close() + continue + } + + inode := fileInode(info) + if lastInode != 0 && inode != lastInode { + offset = 0 // file rotated (new inode) + } else if info.Size() < offset { + offset = 0 // file truncated + } + lastInode = inode + + if offset > 0 { + file.Seek(offset, io.SeekStart) + } + + scanner := bufio.NewScanner(file) + for scanner.Scan() { + line := scanner.Text() + if len(line) > 0 { + k.parseAuditEventsAndPush(&parser, []byte(line), evtC) + } + } + + pos, _ := file.Seek(0, io.SeekCurrent) + offset = pos + file.Close() + + select { + case <-ctx.Done(): + return + case <-time.After(pollInterval): + } + } + }() + + return source.NewPushInstance( + evtC, + source.WithInstanceContext(ctx), + source.WithInstanceClose(cancelCtx), + source.WithInstanceEventSize(uint32(k.Config.MaxEventSize))) +} + // OpenWebServer opens a source.Instance event stream that receives K8S Audit // Events by starting a server and listening for JSON webhooks. The expected // JSON format is the one of K8S API Server webhook backend diff --git a/plugins/k8saudit/pkg/k8saudit/tail/tail_test.go b/plugins/k8saudit/pkg/k8saudit/tail/tail_test.go new file mode 100644 index 000000000..606710b91 --- /dev/null +++ b/plugins/k8saudit/pkg/k8saudit/tail/tail_test.go @@ -0,0 +1,153 @@ +// SPDX-License-Identifier: Apache-2.0 +/* +Copyright (C) 2023 The Falco Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tail_test + +import ( + "os" + "path/filepath" + "testing" + "time" + + "github.com/falcosecurity/plugin-sdk-go/pkg/sdk" + "github.com/falcosecurity/plugins/plugins/k8saudit/pkg/k8saudit" +) + +const testAuditEvent = `{"kind":"Event","apiVersion":"audit.k8s.io/v1","level":"Metadata","auditID":"test","stage":"ResponseComplete","requestURI":"/api","verb":"get","user":{"username":"test"},"sourceIPs":["127.0.0.1"],"objectRef":{"resource":"pods","namespace":"default","name":"test"},"responseStatus":{"metadata":{},"code":200},"requestReceivedTimestamp":"2023-01-01T00:00:00.000000Z","stageTimestamp":"2023-01-01T00:00:01.000000Z"}` + +func newTestPlugin() *k8saudit.Plugin { + p := &k8saudit.Plugin{} + p.Config.Reset() + p.Config.WatchPollIntervalMs = 50 + return p +} + +func TestOpenFileTail_ReadsExistingContent(t *testing.T) { + tmpDir := t.TempDir() + filePath := filepath.Join(tmpDir, "audit.log") + + if err := os.WriteFile(filePath, []byte(testAuditEvent+"\n"), 0644); err != nil { + t.Fatal(err) + } + + p := newTestPlugin() + inst, err := p.OpenFileTail(filePath) + if err != nil { + t.Fatal(err) + } + defer inst.(sdk.Closer).Close() + + // Allow time for initial read + time.Sleep(100 * time.Millisecond) +} + +func TestOpenFileTail_DetectsNewContent(t *testing.T) { + tmpDir := t.TempDir() + filePath := filepath.Join(tmpDir, "audit.log") + + if err := os.WriteFile(filePath, []byte{}, 0644); err != nil { + t.Fatal(err) + } + + p := newTestPlugin() + inst, err := p.OpenFileTail(filePath) + if err != nil { + t.Fatal(err) + } + defer inst.(sdk.Closer).Close() + + // Write new content after opening + time.Sleep(100 * time.Millisecond) + f, err := os.OpenFile(filePath, os.O_APPEND|os.O_WRONLY, 0644) + if err != nil { + t.Fatal(err) + } + f.WriteString(testAuditEvent + "\n") + f.Close() + + // Allow time for polling to detect new content + time.Sleep(200 * time.Millisecond) +} + +func TestOpenFileTail_HandlesRotation(t *testing.T) { + tmpDir := t.TempDir() + filePath := filepath.Join(tmpDir, "audit.log") + + if err := os.WriteFile(filePath, []byte(testAuditEvent+"\n"), 0644); err != nil { + t.Fatal(err) + } + + p := newTestPlugin() + inst, err := p.OpenFileTail(filePath) + if err != nil { + t.Fatal(err) + } + defer inst.(sdk.Closer).Close() + + time.Sleep(100 * time.Millisecond) + + // Simulate rotation: remove and recreate with new content + os.Remove(filePath) + if err := os.WriteFile(filePath, []byte(testAuditEvent+"\n"), 0644); err != nil { + t.Fatal(err) + } + + // Allow time for polling to detect rotation + time.Sleep(200 * time.Millisecond) +} + +func TestOpenFileTail_HandlesTruncation(t *testing.T) { + tmpDir := t.TempDir() + filePath := filepath.Join(tmpDir, "audit.log") + + if err := os.WriteFile(filePath, []byte(testAuditEvent+"\n"+testAuditEvent+"\n"), 0644); err != nil { + t.Fatal(err) + } + + p := newTestPlugin() + inst, err := p.OpenFileTail(filePath) + if err != nil { + t.Fatal(err) + } + defer inst.(sdk.Closer).Close() + + time.Sleep(100 * time.Millisecond) + + // Truncate the file + if err := os.WriteFile(filePath, []byte(testAuditEvent+"\n"), 0644); err != nil { + t.Fatal(err) + } + + // Allow time for polling to detect truncation + time.Sleep(200 * time.Millisecond) +} + +func TestOpen_TailScheme(t *testing.T) { + tmpDir := t.TempDir() + filePath := filepath.Join(tmpDir, "audit.log") + + if err := os.WriteFile(filePath, []byte(testAuditEvent+"\n"), 0644); err != nil { + t.Fatal(err) + } + + p := newTestPlugin() + inst, err := p.Open("tail://" + filePath) + if err != nil { + t.Fatal(err) + } + defer inst.(sdk.Closer).Close() +} From b9c7cfd6f650245399f507bb2193e0154383b3c8 Mon Sep 17 00:00:00 2001 From: Richard Tweed Date: Thu, 22 Jan 2026 17:03:40 +0000 Subject: [PATCH 2/7] docs(k8saudit): document tail:// file watching scheme Add documentation for the new tail:// URL scheme and watchPollIntervalMs configuration option. Co-Authored-By: Claude Opus 4.5 Signed-off-by: Richard Tweed --- plugins/k8saudit/README.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/plugins/k8saudit/README.md b/plugins/k8saudit/README.md index 39dadb882..5fb6080da 100644 --- a/plugins/k8saudit/README.md +++ b/plugins/k8saudit/README.md @@ -9,7 +9,7 @@ Audit events are logged by the API server when almost every cluster management t This plugin supports consuming Kubernetes Audit Events coming from the [Webhook backend](https://kubernetes.io/docs/tasks/debug/debug-cluster/audit/#webhook-backend) or from a file. For webhooks, the plugin embeds a web server that listens on a configurable port and accepts POST requests. The posted JSON object comprises one or more events. The web server of the plugin can be configured as part of the plugin's init configuration and open parameters. For files, the plugin expects content to be in [JSONL format](https://jsonlines.org/), where each line represents a JSON object, containing one or more audit events. -The expected way of using the plugin with Falco is through a Webhook. File reading support can be used with Stratoshark or testing and development. +The expected way of using the plugin with Falco is through a Webhook. File reading support can be used with Stratoshark or testing and development. The `tail://` scheme enables continuous file watching with log rotation support, useful for reading audit logs written to disk by the API server. ## Capabilities @@ -131,11 +131,13 @@ load_plugins: [k8saudit, json] - `maxEventSize`: Maximum size of single audit event (Default: 262144) - `webhookMaxBatchSize`: Maximum size of incoming webhook POST request bodies (Default: 12582912) - `useAsync`: If true, then async extraction optimization is enabled (Default: true) +- `watchPollIntervalMs`: Polling interval in milliseconds when watching a file with the `tail://` scheme (Default: 250) **Open Parameters**: - `http://:/`: Opens an event stream by listening on an HTTP web server - `https://:/`: Opens an event stream by listening on an HTTPS web server -- `no scheme`: Opens an event stream by reading the events from a file on the local filesystem. The params string is interpreted as a filepath +- `tail://`: Opens an event stream by continuously watching a file for new audit events, similar to `tail -f`. Handles log rotation (inode changes) and file truncation automatically. Example: `tail:///var/log/kube-apiserver/audit.log` +- `no scheme`: Opens an event stream by reading the events from a file on the local filesystem. The params string is interpreted as a filepath (one-shot read, exits at EOF) **NOTE**: There is also a full tutorial on how to run the k8saudit plugin in a Kubernetes cluster using minikube: From b4696a614148482909f4e483c2906685abfe2151 Mon Sep 17 00:00:00 2001 From: Richard Tweed Date: Thu, 22 Jan 2026 17:11:31 +0000 Subject: [PATCH 3/7] docs(k8saudit): correct documentation for default file read Signed-off-by: Richard Tweed --- plugins/k8saudit/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/k8saudit/README.md b/plugins/k8saudit/README.md index 5fb6080da..cbbb6f393 100644 --- a/plugins/k8saudit/README.md +++ b/plugins/k8saudit/README.md @@ -137,7 +137,7 @@ load_plugins: [k8saudit, json] - `http://:/`: Opens an event stream by listening on an HTTP web server - `https://:/`: Opens an event stream by listening on an HTTPS web server - `tail://`: Opens an event stream by continuously watching a file for new audit events, similar to `tail -f`. Handles log rotation (inode changes) and file truncation automatically. Example: `tail:///var/log/kube-apiserver/audit.log` -- `no scheme`: Opens an event stream by reading the events from a file on the local filesystem. The params string is interpreted as a filepath (one-shot read, exits at EOF) +- `no scheme`: Opens an event stream by reading the events from a file on the local filesystem. The params string is interpreted as a filepath **NOTE**: There is also a full tutorial on how to run the k8saudit plugin in a Kubernetes cluster using minikube: From 2b7e0543fd8bd2a6df423e8bbc6eec2724a16ef6 Mon Sep 17 00:00:00 2001 From: Richard Tweed Date: Mon, 26 Jan 2026 17:00:35 +0000 Subject: [PATCH 4/7] Update plugins/k8saudit/pkg/k8saudit/tail/tail_test.go Co-authored-by: Iacopo Rozzo Signed-off-by: Richard Tweed --- plugins/k8saudit/pkg/k8saudit/tail/tail_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/k8saudit/pkg/k8saudit/tail/tail_test.go b/plugins/k8saudit/pkg/k8saudit/tail/tail_test.go index 606710b91..fc7813f35 100644 --- a/plugins/k8saudit/pkg/k8saudit/tail/tail_test.go +++ b/plugins/k8saudit/pkg/k8saudit/tail/tail_test.go @@ -1,6 +1,6 @@ // SPDX-License-Identifier: Apache-2.0 /* -Copyright (C) 2023 The Falco Authors. +Copyright (C) 2026 The Falco Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. From c12e1e05aeb32140116ec6dbc5482c1ab53c8827 Mon Sep 17 00:00:00 2001 From: Richard Tweed Date: Mon, 26 Jan 2026 17:25:47 +0000 Subject: [PATCH 5/7] refactor(k8saudit): use event-driven file watching with fsnotify Replace polling-based file watching with fsnotify for better efficiency. - Use fsnotify to watch parent directory (per maintainer recommendation) - Rename scheme from tail:// to file:// - Remove watchPollIntervalMs config (no longer needed) - Rename test package from tail to filewatch Co-Authored-By: Claude Opus 4.5 Signed-off-by: Richard Tweed --- plugins/k8saudit/README.md | 5 +- plugins/k8saudit/go.mod | 1 + plugins/k8saudit/go.sum | 4 + plugins/k8saudit/pkg/k8saudit/config.go | 2 - .../filewatch_test.go} | 64 +-------- plugins/k8saudit/pkg/k8saudit/source.go | 134 +++++++++++------- 6 files changed, 98 insertions(+), 112 deletions(-) rename plugins/k8saudit/pkg/k8saudit/{tail/tail_test.go => filewatch/filewatch_test.go} (61%) diff --git a/plugins/k8saudit/README.md b/plugins/k8saudit/README.md index cbbb6f393..edcb36aaa 100644 --- a/plugins/k8saudit/README.md +++ b/plugins/k8saudit/README.md @@ -9,7 +9,7 @@ Audit events are logged by the API server when almost every cluster management t This plugin supports consuming Kubernetes Audit Events coming from the [Webhook backend](https://kubernetes.io/docs/tasks/debug/debug-cluster/audit/#webhook-backend) or from a file. For webhooks, the plugin embeds a web server that listens on a configurable port and accepts POST requests. The posted JSON object comprises one or more events. The web server of the plugin can be configured as part of the plugin's init configuration and open parameters. For files, the plugin expects content to be in [JSONL format](https://jsonlines.org/), where each line represents a JSON object, containing one or more audit events. -The expected way of using the plugin with Falco is through a Webhook. File reading support can be used with Stratoshark or testing and development. The `tail://` scheme enables continuous file watching with log rotation support, useful for reading audit logs written to disk by the API server. +The expected way of using the plugin with Falco is through a Webhook. File reading support can be used with Stratoshark or testing and development. The `file://` scheme enables continuous file watching with log rotation support, useful for reading audit logs written to disk by the API server. ## Capabilities @@ -131,12 +131,11 @@ load_plugins: [k8saudit, json] - `maxEventSize`: Maximum size of single audit event (Default: 262144) - `webhookMaxBatchSize`: Maximum size of incoming webhook POST request bodies (Default: 12582912) - `useAsync`: If true, then async extraction optimization is enabled (Default: true) -- `watchPollIntervalMs`: Polling interval in milliseconds when watching a file with the `tail://` scheme (Default: 250) **Open Parameters**: - `http://:/`: Opens an event stream by listening on an HTTP web server - `https://:/`: Opens an event stream by listening on an HTTPS web server -- `tail://`: Opens an event stream by continuously watching a file for new audit events, similar to `tail -f`. Handles log rotation (inode changes) and file truncation automatically. Example: `tail:///var/log/kube-apiserver/audit.log` +- `file://`: Opens an event stream by continuously watching a file for new audit events. Handles log rotation automatically. Example: `file:///var/log/kube-apiserver/audit.log` - `no scheme`: Opens an event stream by reading the events from a file on the local filesystem. The params string is interpreted as a filepath diff --git a/plugins/k8saudit/go.mod b/plugins/k8saudit/go.mod index 931bdbad0..a83fde7a9 100644 --- a/plugins/k8saudit/go.mod +++ b/plugins/k8saudit/go.mod @@ -5,6 +5,7 @@ go 1.15 require ( github.com/alecthomas/jsonschema v0.0.0-20220216202328-9eeeec9d044b github.com/falcosecurity/plugin-sdk-go v0.8.3 + github.com/fsnotify/fsnotify v1.9.0 // indirect github.com/iancoleman/orderedmap v0.3.0 // indirect github.com/valyala/fastjson v1.6.4 ) diff --git a/plugins/k8saudit/go.sum b/plugins/k8saudit/go.sum index 389c983c5..ea06fcc85 100644 --- a/plugins/k8saudit/go.sum +++ b/plugins/k8saudit/go.sum @@ -5,6 +5,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/falcosecurity/plugin-sdk-go v0.8.3 h1:KsX7qt83dzC57qcNpZKaBrCjTXqpXgvxDcEXs6Z5sHI= github.com/falcosecurity/plugin-sdk-go v0.8.3/go.mod h1:gEgxjvuopv5VF4wc8s0EHnmT9qrIKBtcJVBnRlEPU1A= +github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k= +github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= github.com/geraldcombs/fastjson v0.0.0-20250801170450-bf39244e60b8 h1:S2FAMWjJKPRR9fvtgYVWQ5joNsl0qQoRxmxYHKDDtx4= github.com/geraldcombs/fastjson v0.0.0-20250801170450-bf39244e60b8/go.mod h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLrsQns1aXY= github.com/iancoleman/orderedmap v0.0.0-20190318233801-ac98e3ecb4b0/go.mod h1:N0Wam8K1arqPXNWjMo21EXnBPOPp36vB07FNRdD2geA= @@ -25,6 +27,8 @@ github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2 github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ= github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/plugins/k8saudit/pkg/k8saudit/config.go b/plugins/k8saudit/pkg/k8saudit/config.go index 2c9a910a2..16b0eb05b 100644 --- a/plugins/k8saudit/pkg/k8saudit/config.go +++ b/plugins/k8saudit/pkg/k8saudit/config.go @@ -24,7 +24,6 @@ type PluginConfig struct { UseAsync bool `json:"useAsync" jsonschema:"title=Use async extraction,description=If true then async extraction optimization is enabled (Default: true),default=true"` MaxEventSize uint64 `json:"maxEventSize" jsonschema:"title=Maximum event size,description=Maximum size of single audit event (Default: 262144),default=262144"` WebhookMaxBatchSize uint64 `json:"webhookMaxBatchSize" jsonschema:"title=Maximum webhook request size,description=Maximum size of incoming webhook POST request bodies (Default: 12582912),default=12582912"` - WatchPollIntervalMs uint64 `json:"watchPollIntervalMs" jsonschema:"title=Watch poll interval,description=Polling interval in milliseconds when watching a file with tail:// scheme (Default: 250),default=250"` } // Resets sets the configuration to its default values @@ -39,5 +38,4 @@ func (k *PluginConfig) Reset() { // The following values have been chosen by increasing by ~20% the default // values of the K8S docs k.WebhookMaxBatchSize = 12 * 1024 * 1024 - k.WatchPollIntervalMs = 250 } diff --git a/plugins/k8saudit/pkg/k8saudit/tail/tail_test.go b/plugins/k8saudit/pkg/k8saudit/filewatch/filewatch_test.go similarity index 61% rename from plugins/k8saudit/pkg/k8saudit/tail/tail_test.go rename to plugins/k8saudit/pkg/k8saudit/filewatch/filewatch_test.go index fc7813f35..ef0a1f520 100644 --- a/plugins/k8saudit/pkg/k8saudit/tail/tail_test.go +++ b/plugins/k8saudit/pkg/k8saudit/filewatch/filewatch_test.go @@ -15,7 +15,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package tail_test +package filewatch_test import ( "os" @@ -32,30 +32,10 @@ const testAuditEvent = `{"kind":"Event","apiVersion":"audit.k8s.io/v1","level":" func newTestPlugin() *k8saudit.Plugin { p := &k8saudit.Plugin{} p.Config.Reset() - p.Config.WatchPollIntervalMs = 50 return p } -func TestOpenFileTail_ReadsExistingContent(t *testing.T) { - tmpDir := t.TempDir() - filePath := filepath.Join(tmpDir, "audit.log") - - if err := os.WriteFile(filePath, []byte(testAuditEvent+"\n"), 0644); err != nil { - t.Fatal(err) - } - - p := newTestPlugin() - inst, err := p.OpenFileTail(filePath) - if err != nil { - t.Fatal(err) - } - defer inst.(sdk.Closer).Close() - - // Allow time for initial read - time.Sleep(100 * time.Millisecond) -} - -func TestOpenFileTail_DetectsNewContent(t *testing.T) { +func TestOpenFileWatch_DetectsNewContent(t *testing.T) { tmpDir := t.TempDir() filePath := filepath.Join(tmpDir, "audit.log") @@ -64,13 +44,12 @@ func TestOpenFileTail_DetectsNewContent(t *testing.T) { } p := newTestPlugin() - inst, err := p.OpenFileTail(filePath) + inst, err := p.OpenFileWatch(filePath) if err != nil { t.Fatal(err) } defer inst.(sdk.Closer).Close() - // Write new content after opening time.Sleep(100 * time.Millisecond) f, err := os.OpenFile(filePath, os.O_APPEND|os.O_WRONLY, 0644) if err != nil { @@ -79,11 +58,10 @@ func TestOpenFileTail_DetectsNewContent(t *testing.T) { f.WriteString(testAuditEvent + "\n") f.Close() - // Allow time for polling to detect new content time.Sleep(200 * time.Millisecond) } -func TestOpenFileTail_HandlesRotation(t *testing.T) { +func TestOpenFileWatch_HandlesRotation(t *testing.T) { tmpDir := t.TempDir() filePath := filepath.Join(tmpDir, "audit.log") @@ -92,7 +70,7 @@ func TestOpenFileTail_HandlesRotation(t *testing.T) { } p := newTestPlugin() - inst, err := p.OpenFileTail(filePath) + inst, err := p.OpenFileWatch(filePath) if err != nil { t.Fatal(err) } @@ -100,43 +78,15 @@ func TestOpenFileTail_HandlesRotation(t *testing.T) { time.Sleep(100 * time.Millisecond) - // Simulate rotation: remove and recreate with new content os.Remove(filePath) if err := os.WriteFile(filePath, []byte(testAuditEvent+"\n"), 0644); err != nil { t.Fatal(err) } - // Allow time for polling to detect rotation - time.Sleep(200 * time.Millisecond) -} - -func TestOpenFileTail_HandlesTruncation(t *testing.T) { - tmpDir := t.TempDir() - filePath := filepath.Join(tmpDir, "audit.log") - - if err := os.WriteFile(filePath, []byte(testAuditEvent+"\n"+testAuditEvent+"\n"), 0644); err != nil { - t.Fatal(err) - } - - p := newTestPlugin() - inst, err := p.OpenFileTail(filePath) - if err != nil { - t.Fatal(err) - } - defer inst.(sdk.Closer).Close() - - time.Sleep(100 * time.Millisecond) - - // Truncate the file - if err := os.WriteFile(filePath, []byte(testAuditEvent+"\n"), 0644); err != nil { - t.Fatal(err) - } - - // Allow time for polling to detect truncation time.Sleep(200 * time.Millisecond) } -func TestOpen_TailScheme(t *testing.T) { +func TestOpen_FileScheme(t *testing.T) { tmpDir := t.TempDir() filePath := filepath.Join(tmpDir, "audit.log") @@ -145,7 +95,7 @@ func TestOpen_TailScheme(t *testing.T) { } p := newTestPlugin() - inst, err := p.Open("tail://" + filePath) + inst, err := p.Open("file://" + filePath) if err != nil { t.Fatal(err) } diff --git a/plugins/k8saudit/pkg/k8saudit/source.go b/plugins/k8saudit/pkg/k8saudit/source.go index 9f9cb0a33..90ee45118 100644 --- a/plugins/k8saudit/pkg/k8saudit/source.go +++ b/plugins/k8saudit/pkg/k8saudit/source.go @@ -26,23 +26,17 @@ import ( "net/http" "net/url" "os" + "path/filepath" "sort" "strings" - "syscall" "time" "github.com/falcosecurity/plugin-sdk-go/pkg/sdk" "github.com/falcosecurity/plugin-sdk-go/pkg/sdk/plugins/source" + "github.com/fsnotify/fsnotify" "github.com/valyala/fastjson" ) -func fileInode(info os.FileInfo) uint64 { - if stat, ok := info.Sys().(*syscall.Stat_t); ok { - return stat.Ino - } - return 0 -} - const ( webServerShutdownTimeoutSecs = 5 webServerEventChanBufSize = 50 @@ -59,8 +53,8 @@ func (k *Plugin) Open(params string) (source.Instance, error) { return k.OpenWebServer(u.Host, u.Path, false) case "https": return k.OpenWebServer(u.Host, u.Path, true) - case "tail": - return k.OpenFileTail(u.Path) + case "file": + return k.OpenFileWatch(u.Path) case "": // by default, fallback to opening a filepath trimmed := strings.TrimSpace(params) @@ -135,72 +129,112 @@ func (k *Plugin) OpenReader(r io.ReadCloser) (source.Instance, error) { source.WithInstanceEventSize(uint32(k.Config.MaxEventSize))) } -// OpenFileTail opens a source.Instance that continuously watches a file for -// new K8S Audit Events, similar to "tail -f". It handles log rotation by -// detecting file truncation or inode changes. -func (k *Plugin) OpenFileTail(path string) (source.Instance, error) { +// OpenFileWatch opens a source.Instance that continuously watches a file for +// new K8S Audit Events using fsnotify. It watches the parent directory (as +// recommended by fsnotify) to handle atomic file replacements and log rotation. +func (k *Plugin) OpenFileWatch(path string) (source.Instance, error) { + absPath, err := filepath.Abs(path) + if err != nil { + return nil, fmt.Errorf("failed to get absolute path: %w", err) + } + dir := filepath.Dir(absPath) + + watcher, err := fsnotify.NewWatcher() + if err != nil { + return nil, fmt.Errorf("failed to create file watcher: %w", err) + } + ctx, cancelCtx := context.WithCancel(context.Background()) evtC := make(chan source.PushEvent) go func() { defer close(evtC) + defer watcher.Close() + var parser fastjson.Parser + var file *os.File var offset int64 - var lastInode uint64 - - pollInterval := time.Duration(k.Config.WatchPollIntervalMs) * time.Millisecond - - for { - select { - case <-ctx.Done(): - return - default: - } - file, err := os.Open(path) - if err != nil { - select { - case <-ctx.Done(): - return - case <-time.After(pollInterval): - continue - } + openFile := func(seekEnd bool) bool { + if file != nil { + file.Close() + file = nil } - - info, err := file.Stat() + f, err := os.Open(absPath) if err != nil { - file.Close() - continue + return false } - - inode := fileInode(info) - if lastInode != 0 && inode != lastInode { - offset = 0 // file rotated (new inode) - } else if info.Size() < offset { - offset = 0 // file truncated + file = f + if seekEnd { + offset, _ = file.Seek(0, io.SeekEnd) + } else { + offset = 0 } - lastInode = inode + return true + } - if offset > 0 { - file.Seek(offset, io.SeekStart) + readNewLines := func() { + if file == nil { + return } - + file.Seek(offset, io.SeekStart) scanner := bufio.NewScanner(file) for scanner.Scan() { line := scanner.Text() + offset += int64(len(line)) + 1 if len(line) > 0 { k.parseAuditEventsAndPush(&parser, []byte(line), evtC) } } + } + + openFile(true) - pos, _ := file.Seek(0, io.SeekCurrent) - offset = pos - file.Close() + if err := watcher.Add(dir); err != nil { + if file != nil { + file.Close() + } + evtC <- source.PushEvent{Err: err} + return + } + for { select { case <-ctx.Done(): + if file != nil { + file.Close() + } return - case <-time.After(pollInterval): + case event, ok := <-watcher.Events: + if !ok { + if file != nil { + file.Close() + } + return + } + if event.Name != absPath { + continue + } + if event.Op&fsnotify.Write == fsnotify.Write { + readNewLines() + } + if event.Op&fsnotify.Create == fsnotify.Create { + openFile(false) + readNewLines() + } + if event.Op&(fsnotify.Rename|fsnotify.Remove) != 0 { + if file != nil { + file.Close() + file = nil + } + } + case _, ok := <-watcher.Errors: + if !ok { + if file != nil { + file.Close() + } + return + } } } }() From d55af3c756e061542b4a99366285a3c18c94a2d4 Mon Sep 17 00:00:00 2001 From: Richard Tweed Date: Thu, 5 Feb 2026 21:59:57 +0000 Subject: [PATCH 6/7] fix(k8saudit): restore truncation handling and relocate watch tests Move tests from orphaned filewatch/ package into k8saudit alongside the code they test, and restore truncation detection that was lost in the fsnotify refactor. - Detect file truncation via size check before seeking (copytruncate) - Move filewatch/filewatch_test.go to watch_test.go in k8saudit package - Restore TestOpenFileWatch_HandlesTruncation test - Remove empty filewatch/ directory - Mark fsnotify as direct dependency in go.mod Co-Authored-By: Claude Opus 4.6 Signed-off-by: Richard Tweed --- plugins/k8saudit/go.mod | 2 +- plugins/k8saudit/pkg/k8saudit/source.go | 4 +++ .../filewatch_test.go => watch_test.go} | 32 ++++++++++++++++--- 3 files changed, 33 insertions(+), 5 deletions(-) rename plugins/k8saudit/pkg/k8saudit/{filewatch/filewatch_test.go => watch_test.go} (79%) diff --git a/plugins/k8saudit/go.mod b/plugins/k8saudit/go.mod index a83fde7a9..dd501fbf5 100644 --- a/plugins/k8saudit/go.mod +++ b/plugins/k8saudit/go.mod @@ -5,7 +5,7 @@ go 1.15 require ( github.com/alecthomas/jsonschema v0.0.0-20220216202328-9eeeec9d044b github.com/falcosecurity/plugin-sdk-go v0.8.3 - github.com/fsnotify/fsnotify v1.9.0 // indirect + github.com/fsnotify/fsnotify v1.9.0 github.com/iancoleman/orderedmap v0.3.0 // indirect github.com/valyala/fastjson v1.6.4 ) diff --git a/plugins/k8saudit/pkg/k8saudit/source.go b/plugins/k8saudit/pkg/k8saudit/source.go index 90ee45118..c9ad27d3d 100644 --- a/plugins/k8saudit/pkg/k8saudit/source.go +++ b/plugins/k8saudit/pkg/k8saudit/source.go @@ -177,6 +177,10 @@ func (k *Plugin) OpenFileWatch(path string) (source.Instance, error) { if file == nil { return } + // Detect file truncation (e.g. logrotate copytruncate) + if info, err := file.Stat(); err == nil && info.Size() < offset { + offset = 0 + } file.Seek(offset, io.SeekStart) scanner := bufio.NewScanner(file) for scanner.Scan() { diff --git a/plugins/k8saudit/pkg/k8saudit/filewatch/filewatch_test.go b/plugins/k8saudit/pkg/k8saudit/watch_test.go similarity index 79% rename from plugins/k8saudit/pkg/k8saudit/filewatch/filewatch_test.go rename to plugins/k8saudit/pkg/k8saudit/watch_test.go index ef0a1f520..a507871d7 100644 --- a/plugins/k8saudit/pkg/k8saudit/filewatch/filewatch_test.go +++ b/plugins/k8saudit/pkg/k8saudit/watch_test.go @@ -15,7 +15,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package filewatch_test +package k8saudit import ( "os" @@ -24,13 +24,12 @@ import ( "time" "github.com/falcosecurity/plugin-sdk-go/pkg/sdk" - "github.com/falcosecurity/plugins/plugins/k8saudit/pkg/k8saudit" ) const testAuditEvent = `{"kind":"Event","apiVersion":"audit.k8s.io/v1","level":"Metadata","auditID":"test","stage":"ResponseComplete","requestURI":"/api","verb":"get","user":{"username":"test"},"sourceIPs":["127.0.0.1"],"objectRef":{"resource":"pods","namespace":"default","name":"test"},"responseStatus":{"metadata":{},"code":200},"requestReceivedTimestamp":"2023-01-01T00:00:00.000000Z","stageTimestamp":"2023-01-01T00:00:01.000000Z"}` -func newTestPlugin() *k8saudit.Plugin { - p := &k8saudit.Plugin{} +func newTestPlugin() *Plugin { + p := &Plugin{} p.Config.Reset() return p } @@ -86,6 +85,31 @@ func TestOpenFileWatch_HandlesRotation(t *testing.T) { time.Sleep(200 * time.Millisecond) } +func TestOpenFileWatch_HandlesTruncation(t *testing.T) { + tmpDir := t.TempDir() + filePath := filepath.Join(tmpDir, "audit.log") + + if err := os.WriteFile(filePath, []byte(testAuditEvent+"\n"+testAuditEvent+"\n"), 0644); err != nil { + t.Fatal(err) + } + + p := newTestPlugin() + inst, err := p.OpenFileWatch(filePath) + if err != nil { + t.Fatal(err) + } + defer inst.(sdk.Closer).Close() + + time.Sleep(100 * time.Millisecond) + + // Truncate and rewrite with less data (simulates logrotate copytruncate) + if err := os.WriteFile(filePath, []byte(testAuditEvent+"\n"), 0644); err != nil { + t.Fatal(err) + } + + time.Sleep(200 * time.Millisecond) +} + func TestOpen_FileScheme(t *testing.T) { tmpDir := t.TempDir() filePath := filepath.Join(tmpDir, "audit.log") From 16ded85295827763a0da74e4e7c179c66aa72262 Mon Sep 17 00:00:00 2001 From: Richard Tweed Date: Thu, 5 Feb 2026 22:31:07 +0000 Subject: [PATCH 7/7] fix(k8saudit): improve error handling and test assertions for file watcher - Align scanner buffer with MaxEventSize to avoid silently dropping large events - Log scanner errors instead of swallowing them - Log fsnotify watcher errors instead of discarding them - Add real assertions to watch tests using sentinel-based content verification Co-Authored-By: Claude Opus 4.6 Signed-off-by: Richard Tweed --- plugins/k8saudit/pkg/k8saudit/source.go | 7 ++- plugins/k8saudit/pkg/k8saudit/watch_test.go | 60 +++++++++++++++++---- 2 files changed, 56 insertions(+), 11 deletions(-) diff --git a/plugins/k8saudit/pkg/k8saudit/source.go b/plugins/k8saudit/pkg/k8saudit/source.go index c9ad27d3d..c2ef30fc5 100644 --- a/plugins/k8saudit/pkg/k8saudit/source.go +++ b/plugins/k8saudit/pkg/k8saudit/source.go @@ -183,6 +183,7 @@ func (k *Plugin) OpenFileWatch(path string) (source.Instance, error) { } file.Seek(offset, io.SeekStart) scanner := bufio.NewScanner(file) + scanner.Buffer(nil, int(k.Config.MaxEventSize)) for scanner.Scan() { line := scanner.Text() offset += int64(len(line)) + 1 @@ -190,6 +191,9 @@ func (k *Plugin) OpenFileWatch(path string) (source.Instance, error) { k.parseAuditEventsAndPush(&parser, []byte(line), evtC) } } + if err := scanner.Err(); err != nil { + k.logger.Println(err.Error()) + } } openFile(true) @@ -232,13 +236,14 @@ func (k *Plugin) OpenFileWatch(path string) (source.Instance, error) { file = nil } } - case _, ok := <-watcher.Errors: + case err, ok := <-watcher.Errors: if !ok { if file != nil { file.Close() } return } + k.logger.Println("file watcher error:", err) } } }() diff --git a/plugins/k8saudit/pkg/k8saudit/watch_test.go b/plugins/k8saudit/pkg/k8saudit/watch_test.go index a507871d7..4de99e763 100644 --- a/plugins/k8saudit/pkg/k8saudit/watch_test.go +++ b/plugins/k8saudit/pkg/k8saudit/watch_test.go @@ -18,8 +18,10 @@ limitations under the License. package k8saudit import ( + "log" "os" "path/filepath" + "sync" "testing" "time" @@ -28,10 +30,31 @@ import ( const testAuditEvent = `{"kind":"Event","apiVersion":"audit.k8s.io/v1","level":"Metadata","auditID":"test","stage":"ResponseComplete","requestURI":"/api","verb":"get","user":{"username":"test"},"sourceIPs":["127.0.0.1"],"objectRef":{"resource":"pods","namespace":"default","name":"test"},"responseStatus":{"metadata":{},"code":200},"requestReceivedTimestamp":"2023-01-01T00:00:00.000000Z","stageTimestamp":"2023-01-01T00:00:01.000000Z"}` -func newTestPlugin() *Plugin { +// safeBuffer is a thread-safe buffer for capturing log output in tests. +type safeBuffer struct { + mu sync.Mutex + buf []byte +} + +func (b *safeBuffer) Write(p []byte) (int, error) { + b.mu.Lock() + defer b.mu.Unlock() + b.buf = append(b.buf, p...) + return len(p), nil +} + +func (b *safeBuffer) String() string { + b.mu.Lock() + defer b.mu.Unlock() + return string(b.buf) +} + +func newTestPlugin() (*Plugin, *safeBuffer) { p := &Plugin{} p.Config.Reset() - return p + buf := &safeBuffer{} + p.logger = log.New(buf, "", 0) + return p, buf } func TestOpenFileWatch_DetectsNewContent(t *testing.T) { @@ -42,7 +65,7 @@ func TestOpenFileWatch_DetectsNewContent(t *testing.T) { t.Fatal(err) } - p := newTestPlugin() + p, logBuf := newTestPlugin() inst, err := p.OpenFileWatch(filePath) if err != nil { t.Fatal(err) @@ -54,10 +77,16 @@ func TestOpenFileWatch_DetectsNewContent(t *testing.T) { if err != nil { t.Fatal(err) } - f.WriteString(testAuditEvent + "\n") + // Write an invalid line; if the watcher reads the appended content, + // parseAuditEventsAndPush logs a parse error proving the content was read. + f.WriteString("SENTINEL\n") f.Close() time.Sleep(200 * time.Millisecond) + + if logBuf.String() == "" { + t.Error("watcher did not process appended file content") + } } func TestOpenFileWatch_HandlesRotation(t *testing.T) { @@ -68,7 +97,7 @@ func TestOpenFileWatch_HandlesRotation(t *testing.T) { t.Fatal(err) } - p := newTestPlugin() + p, logBuf := newTestPlugin() inst, err := p.OpenFileWatch(filePath) if err != nil { t.Fatal(err) @@ -77,23 +106,29 @@ func TestOpenFileWatch_HandlesRotation(t *testing.T) { time.Sleep(100 * time.Millisecond) + // Simulate rotation: remove and recreate os.Remove(filePath) - if err := os.WriteFile(filePath, []byte(testAuditEvent+"\n"), 0644); err != nil { + if err := os.WriteFile(filePath, []byte("SENTINEL\n"), 0644); err != nil { t.Fatal(err) } time.Sleep(200 * time.Millisecond) + + if logBuf.String() == "" { + t.Error("watcher did not process rotated file content") + } } func TestOpenFileWatch_HandlesTruncation(t *testing.T) { tmpDir := t.TempDir() filePath := filepath.Join(tmpDir, "audit.log") + // Write two events so the initial offset is large if err := os.WriteFile(filePath, []byte(testAuditEvent+"\n"+testAuditEvent+"\n"), 0644); err != nil { t.Fatal(err) } - p := newTestPlugin() + p, logBuf := newTestPlugin() inst, err := p.OpenFileWatch(filePath) if err != nil { t.Fatal(err) @@ -102,12 +137,17 @@ func TestOpenFileWatch_HandlesTruncation(t *testing.T) { time.Sleep(100 * time.Millisecond) - // Truncate and rewrite with less data (simulates logrotate copytruncate) - if err := os.WriteFile(filePath, []byte(testAuditEvent+"\n"), 0644); err != nil { + // Truncate and rewrite with less data (simulates logrotate copytruncate). + // Without truncation detection the watcher would seek past EOF and miss this. + if err := os.WriteFile(filePath, []byte("SENTINEL\n"), 0644); err != nil { t.Fatal(err) } time.Sleep(200 * time.Millisecond) + + if logBuf.String() == "" { + t.Error("watcher did not detect file truncation") + } } func TestOpen_FileScheme(t *testing.T) { @@ -118,7 +158,7 @@ func TestOpen_FileScheme(t *testing.T) { t.Fatal(err) } - p := newTestPlugin() + p, _ := newTestPlugin() inst, err := p.Open("file://" + filePath) if err != nil { t.Fatal(err)