Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion plugins/k8saudit/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Audit events are logged by the API server when almost every cluster management t

This plugin supports consuming Kubernetes Audit Events coming from the [Webhook backend](https://kubernetes.io/docs/tasks/debug/debug-cluster/audit/#webhook-backend) or from a file. For webhooks, the plugin embeds a web server that listens on a configurable port and accepts POST requests. The posted JSON object comprises one or more events. The web server of the plugin can be configured as part of the plugin's init configuration and open parameters. For files, the plugin expects content to be in [JSONL format](https://jsonlines.org/), where each line represents a JSON object, containing one or more audit events.

The expected way of using the plugin with Falco is through a Webhook. File reading support can be used with Stratoshark or testing and development.
The expected way of using the plugin with Falco is through a Webhook. File reading support can be used with Stratoshark or testing and development. The `file://` scheme enables continuous file watching with log rotation support, useful for reading audit logs written to disk by the API server.

## Capabilities

Expand Down Expand Up @@ -135,6 +135,7 @@ load_plugins: [k8saudit, json]
**Open Parameters**:
- `http://<host>:<port>/<endpoint>`: Opens an event stream by listening on an HTTP web server
- `https://<host>:<port>/<endpoint>`: Opens an event stream by listening on an HTTPS web server
- `file://<filepath>`: Opens an event stream by continuously watching a file for new audit events. Handles log rotation automatically. Example: `file:///var/log/kube-apiserver/audit.log`
- `no scheme`: Opens an event stream by reading the events from a file on the local filesystem. The params string is interpreted as a filepath


Expand Down
1 change: 1 addition & 0 deletions plugins/k8saudit/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.15
require (
github.com/alecthomas/jsonschema v0.0.0-20220216202328-9eeeec9d044b
github.com/falcosecurity/plugin-sdk-go v0.8.3
github.com/fsnotify/fsnotify v1.9.0 // indirect
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why // indirect? 🤔

This should be a direct dependency since it's explicitly imported in source.go

github.com/iancoleman/orderedmap v0.3.0 // indirect
github.com/valyala/fastjson v1.6.4
)
Expand Down
4 changes: 4 additions & 0 deletions plugins/k8saudit/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/falcosecurity/plugin-sdk-go v0.8.3 h1:KsX7qt83dzC57qcNpZKaBrCjTXqpXgvxDcEXs6Z5sHI=
github.com/falcosecurity/plugin-sdk-go v0.8.3/go.mod h1:gEgxjvuopv5VF4wc8s0EHnmT9qrIKBtcJVBnRlEPU1A=
github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k=
github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
github.com/geraldcombs/fastjson v0.0.0-20250801170450-bf39244e60b8 h1:S2FAMWjJKPRR9fvtgYVWQ5joNsl0qQoRxmxYHKDDtx4=
github.com/geraldcombs/fastjson v0.0.0-20250801170450-bf39244e60b8/go.mod h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLrsQns1aXY=
github.com/iancoleman/orderedmap v0.0.0-20190318233801-ac98e3ecb4b0/go.mod h1:N0Wam8K1arqPXNWjMo21EXnBPOPp36vB07FNRdD2geA=
Expand All @@ -25,6 +27,8 @@ github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Expand Down
103 changes: 103 additions & 0 deletions plugins/k8saudit/pkg/k8saudit/filewatch/filewatch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// SPDX-License-Identifier: Apache-2.0
/*
Copyright (C) 2026 The Falco Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package filewatch_test
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since there's no filewatch package, why is this file here? 🤔


import (
"os"
"path/filepath"
"testing"
"time"

"github.com/falcosecurity/plugin-sdk-go/pkg/sdk"
"github.com/falcosecurity/plugins/plugins/k8saudit/pkg/k8saudit"
)

const testAuditEvent = `{"kind":"Event","apiVersion":"audit.k8s.io/v1","level":"Metadata","auditID":"test","stage":"ResponseComplete","requestURI":"/api","verb":"get","user":{"username":"test"},"sourceIPs":["127.0.0.1"],"objectRef":{"resource":"pods","namespace":"default","name":"test"},"responseStatus":{"metadata":{},"code":200},"requestReceivedTimestamp":"2023-01-01T00:00:00.000000Z","stageTimestamp":"2023-01-01T00:00:01.000000Z"}`

func newTestPlugin() *k8saudit.Plugin {
p := &k8saudit.Plugin{}
p.Config.Reset()
return p
}

func TestOpenFileWatch_DetectsNewContent(t *testing.T) {
tmpDir := t.TempDir()
filePath := filepath.Join(tmpDir, "audit.log")

if err := os.WriteFile(filePath, []byte{}, 0644); err != nil {
t.Fatal(err)
}

p := newTestPlugin()
inst, err := p.OpenFileWatch(filePath)
if err != nil {
t.Fatal(err)
}
defer inst.(sdk.Closer).Close()

time.Sleep(100 * time.Millisecond)
f, err := os.OpenFile(filePath, os.O_APPEND|os.O_WRONLY, 0644)
if err != nil {
t.Fatal(err)
}
f.WriteString(testAuditEvent + "\n")
f.Close()

time.Sleep(200 * time.Millisecond)
}

func TestOpenFileWatch_HandlesRotation(t *testing.T) {
tmpDir := t.TempDir()
filePath := filepath.Join(tmpDir, "audit.log")

if err := os.WriteFile(filePath, []byte(testAuditEvent+"\n"), 0644); err != nil {
t.Fatal(err)
}

p := newTestPlugin()
inst, err := p.OpenFileWatch(filePath)
if err != nil {
t.Fatal(err)
}
defer inst.(sdk.Closer).Close()

time.Sleep(100 * time.Millisecond)

os.Remove(filePath)
if err := os.WriteFile(filePath, []byte(testAuditEvent+"\n"), 0644); err != nil {
t.Fatal(err)
}

time.Sleep(200 * time.Millisecond)
}

func TestOpen_FileScheme(t *testing.T) {
tmpDir := t.TempDir()
filePath := filepath.Join(tmpDir, "audit.log")

if err := os.WriteFile(filePath, []byte(testAuditEvent+"\n"), 0644); err != nil {
t.Fatal(err)
}

p := newTestPlugin()
inst, err := p.Open("file://" + filePath)
if err != nil {
t.Fatal(err)
}
defer inst.(sdk.Closer).Close()
}
121 changes: 121 additions & 0 deletions plugins/k8saudit/pkg/k8saudit/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@ import (
"net/http"
"net/url"
"os"
"path/filepath"
"sort"
"strings"
"time"

"github.com/falcosecurity/plugin-sdk-go/pkg/sdk"
"github.com/falcosecurity/plugin-sdk-go/pkg/sdk/plugins/source"
"github.com/fsnotify/fsnotify"
"github.com/valyala/fastjson"
)

Expand All @@ -51,6 +53,8 @@ func (k *Plugin) Open(params string) (source.Instance, error) {
return k.OpenWebServer(u.Host, u.Path, false)
case "https":
return k.OpenWebServer(u.Host, u.Path, true)
case "file":
return k.OpenFileWatch(u.Path)
case "": // by default, fallback to opening a filepath
trimmed := strings.TrimSpace(params)

Expand Down Expand Up @@ -125,6 +129,123 @@ func (k *Plugin) OpenReader(r io.ReadCloser) (source.Instance, error) {
source.WithInstanceEventSize(uint32(k.Config.MaxEventSize)))
}

// OpenFileWatch opens a source.Instance that continuously watches a file for
// new K8S Audit Events using fsnotify. It watches the parent directory (as
// recommended by fsnotify) to handle atomic file replacements and log rotation.
func (k *Plugin) OpenFileWatch(path string) (source.Instance, error) {
absPath, err := filepath.Abs(path)
if err != nil {
return nil, fmt.Errorf("failed to get absolute path: %w", err)
}
dir := filepath.Dir(absPath)

watcher, err := fsnotify.NewWatcher()
if err != nil {
return nil, fmt.Errorf("failed to create file watcher: %w", err)
}

ctx, cancelCtx := context.WithCancel(context.Background())
evtC := make(chan source.PushEvent)

go func() {
defer close(evtC)
defer watcher.Close()

var parser fastjson.Parser
var file *os.File
var offset int64

openFile := func(seekEnd bool) bool {
if file != nil {
file.Close()
file = nil
}
f, err := os.Open(absPath)
if err != nil {
return false
}
file = f
if seekEnd {
offset, _ = file.Seek(0, io.SeekEnd)
} else {
offset = 0
}
return true
}

readNewLines := func() {
if file == nil {
return
}
file.Seek(offset, io.SeekStart)
scanner := bufio.NewScanner(file)
for scanner.Scan() {
line := scanner.Text()
offset += int64(len(line)) + 1
if len(line) > 0 {
k.parseAuditEventsAndPush(&parser, []byte(line), evtC)
}
}
}

openFile(true)

if err := watcher.Add(dir); err != nil {
if file != nil {
file.Close()
}
evtC <- source.PushEvent{Err: err}
return
}

for {
select {
case <-ctx.Done():
if file != nil {
file.Close()
}
return
case event, ok := <-watcher.Events:
if !ok {
if file != nil {
file.Close()
}
return
}
if event.Name != absPath {
continue
}
if event.Op&fsnotify.Write == fsnotify.Write {
readNewLines()
}
if event.Op&fsnotify.Create == fsnotify.Create {
openFile(false)
readNewLines()
}
if event.Op&(fsnotify.Rename|fsnotify.Remove) != 0 {
if file != nil {
file.Close()
file = nil
}
}
case _, ok := <-watcher.Errors:
if !ok {
if file != nil {
file.Close()
}
return
}
}
}
}()

return source.NewPushInstance(
evtC,
source.WithInstanceContext(ctx),
source.WithInstanceClose(cancelCtx),
source.WithInstanceEventSize(uint32(k.Config.MaxEventSize)))
}

// OpenWebServer opens a source.Instance event stream that receives K8S Audit
// Events by starting a server and listening for JSON webhooks. The expected
// JSON format is the one of K8S API Server webhook backend
Expand Down
Loading