Skip to content

Split BeatV2Manager Start into two methods, so Beats can reply to check-in in parallel to its initialisation#49796

Open
belimawr wants to merge 17 commits intoelastic:mainfrom
belimawr:move-manager-start
Open

Split BeatV2Manager Start into two methods, so Beats can reply to check-in in parallel to its initialisation#49796
belimawr wants to merge 17 commits intoelastic:mainfrom
belimawr:move-manager-start

Conversation

@belimawr
Copy link
Copy Markdown
Contributor

@belimawr belimawr commented Mar 31, 2026

Proposed commit message

The Start method from BeatV2Manager is split into two methods:
 - PreInit: responsible for starting the Elastic Agent client and
   start replying to check-ins.
 - PostInit: responsible for setting the Beats status to 'Running' and
   start executing Unit changes.

A new method, WaitForStop is also added. It stops the BeatV2Manager
and waits until all goroutines have returned. Currently it is only
used in tests that use `testing.T` as the logger output to ensure no
panics happen because the logger was used after the test ended.

Multiple lint warnings are fixed

GenAI-Assisted: Yes
Human-Reviewed: Yes
Tool: Cursor-CLI, Model: GPT-5.3 Codex Extra High Fast

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding change to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works. Where relevant, I have used the stresstest.sh script to run them under stress conditions and race detector to verify their stability.
  • I have added an entry in ./changelog/fragments using the changelog tool.

## Disruptive User Impact
## Author's Checklist

How to test this PR locally

Because this PR is a implementation detail change, there is no directly observable behaviour change. The best way to test is to run the new and existing tests.

Run the new test

cd x-pack/libbeat/management/
go test -count=1 -v . -run=TestManagerV2_PreInitAppliesBufferedUnitsAfterPostInit

Run the tests from the modified packages

# Run all management unit tests
go test -count=1 ./x-pack/libbeat/management/...

cd x-pack/filebeat
mage BuildSystemTestbinary
mage -v docker:composeUP

# Run all integration tests from the ManagerV2
go test -count=1 -tags=integration ./tests/integration -run="TestInputReloadUnderElasticAgent|TestFailedOutputReportsUnhealthy|TestRecoverFromInvalidOutputConfiguration|TestAgentPackageVersionOnStartUpInfo|TestHTTPJSONInputReloadUnderElasticAgentWithElasticStateStore|TestReloadErrorHandling|TestPipelineConnectionErrorFailsInput"

# Run all integration tests
go test -count=1 -tags=integration ./tests/integration

Related issues

## Use cases
## Screenshots
## Logs

Split Start into PreStart and PostStart so the manager can start
replying to check-in before the Beat is ready to start/stop
inputs/output.
Ensure the timer in unitListen always fires until the Beat is ready
and the first config is applied.
@belimawr belimawr self-assigned this Mar 31, 2026
@belimawr belimawr added Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team backport-active-all Automated backport with mergify to all the active branches labels Mar 31, 2026
@botelastic botelastic bot added needs_team Indicates that the issue/PR needs a Team:* label and removed needs_team Indicates that the issue/PR needs a Team:* label labels Mar 31, 2026
@github-actions
Copy link
Copy Markdown
Contributor

🤖 GitHub comments

Just comment with:

  • run docs-build : Re-trigger the docs validation. (use unformatted text in the comment!)

GenAI-Assisted: Yes
Human-Reviewed: Yes
Tool: Cursor-CLI, Model: GPT-5.3 Codex Extra High Fast
Add the 'WaitForStop' method in the V2 manager to wait all goroutines
to exit. This prevents panics in test when the `testing.T` is used as
the manager's logger's output.
Keep the irregular input active until the test observes DEGRADED and shorten the per-state timeout to avoid long hangs when transitions are missed.

Made-with: Cursor
@github-actions
Copy link
Copy Markdown
Contributor

TL;DR

Libbeat: Run check/update is failing because make check-no-changes detects generated drift in libbeat/beat/beat_test.go. Commit the formatter-generated update for that file (whitespace alignment in WaitForStop) and rerun CI.

Remediation

  • Run make -C libbeat update (or at minimum goimports/gofmt on libbeat/beat/beat_test.go) and commit the resulting change.
  • Re-run make -C libbeat check update locally (or rerun Buildkite) to confirm check-no-changes is clean.
Investigation details

Root Cause

Buildkite fails in make check-no-changes after update generation because libbeat/beat/beat_test.go is not in its generated/expected formatted state.

In the PR branch (af5e548c1f3db09b512908054a6e68a218789486), the file includes new testManager methods, and formatting adjusts spacing at:

  • libbeat/beat/beat_test.go:45 (WaitForStop line alignment)

Evidence

  • Build: https://buildkite.com/elastic/beats/builds/43302
  • Job/step: Libbeat: Run check/update (beats-libbeat build 27636)
  • Key log excerpt:
    • Error: some files are not up-to-date. Run 'make update' then review and commit the changes. Modified: [libbeat/beat/beat_test.go]

Local reproduction on pr-49796 produced exactly:

diff --git a/libbeat/beat/beat_test.go b/libbeat/beat/beat_test.go
@@ -42,7 +42,7 @@
-func (tm testManager) WaitForStop(_ time.Duration) bool      { return true }
+func (tm testManager) WaitForStop(_ time.Duration) bool       { return true }

Verification

  • Ran: make -C libbeat check update on PR branch.
  • Result: It reformatted libbeat/beat/beat_test.go as above before later aborting in this environment due local Python autopep8/lib2to3 tooling mismatch; the relevant check-no-changes drift was still reproduced and identified.

Follow-up

After committing the regenerated libbeat/beat/beat_test.go, rerun the failing Buildkite pipeline; no code-logic fix appears necessary.

Note

🔒 Integrity filtering filtered 1 item

Integrity filtering activated and filtered the following item during workflow execution.
This happens when a tool call accesses a resource that does not meet the required integrity or secrecy level of the workflow.


What is this? | From workflow: PR Buildkite Detective

Give us feedback! React with 🚀 if perfect, 👍 if helpful, 👎 if not.

@belimawr belimawr changed the title Move manager start Split BeatV2Manager into two methods, so Beats can reply to check-in in parallel to its initialisation Mar 31, 2026
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR restructures the V2 management manager startup so Beats can begin responding to Elastic Agent check-ins earlier (before full Beat initialization), avoiding check-in timeouts and resulting restart/crash loops.

Changes:

  • Split BeatV2Manager.Start() into PreInit() (start client + check-ins) and PostInit() (mark running + enable applying unit changes), keeping Start() as a deprecated compatibility wrapper.
  • Start Filebeat’s manager earlier in Filebeat.Run() and add shutdown waiting support via WaitForStop() (primarily for tests).
  • Update integration/unit tests and mock managers to align with the new lifecycle.

Reviewed changes

Copilot reviewed 11 out of 11 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
x-pack/libbeat/management/managerV2.go Implements PreInit/PostInit, adds goroutine waiting via WaitForStop, adjusts unit application timing.
libbeat/management/management.go Extends Manager interface with PreInit, PostInit, and WaitForStop; deprecates Start behaviorally.
filebeat/beater/filebeat.go Moves manager startup earlier (pre state-store load) and calls PostInit after initialization completes.
x-pack/libbeat/management/managerV2_test.go Adds tests for buffered units behavior and uses WaitForStop to ensure clean shutdown.
x-pack/filebeat/tests/integration/status_reporter_test.go Tweaks integration test scenario expectations/inputs and lint annotations.
x-pack/filebeat/tests/integration/managerV2_test.go Updates integration expectations around unit states/messages.
x-pack/otel/otelmanager/manager.go Updates stub manager to satisfy the expanded Manager interface.
x-pack/osquerybeat/beater/osquerybeat_status_test.go Updates test manager stub + minor assertion/lint improvements.
libbeat/cmd/instance/beat_test.go Updates test mock manager to satisfy new interface.
libbeat/beat/beat_test.go Updates test manager to satisfy new interface.
changelog/fragments/1774984035-move-manager-start.yaml Adds a changelog fragment describing the Filebeat crash-loop fix.
Comments suppressed due to low confidence (1)

x-pack/libbeat/management/managerV2.go:304

  • PreInit drops the underlying error from cm.client.Start(ctx), returning a generic message only. This makes failures hard to diagnose; please wrap/return the original error (e.g., with %w) and include context in the message.
	ctx := context.Background()
	err := cm.client.Start(ctx)
	if err != nil {
		return fmt.Errorf("error starting connection to client")
	}

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@belimawr belimawr marked this pull request as ready for review March 31, 2026 19:59
@elasticmachine
Copy link
Copy Markdown
Contributor

Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane)

@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Mar 31, 2026

📝 Walkthrough

Walkthrough

This pull request refactors the Manager lifecycle into two phases: PreInit() and PostInit(), replacing the previous single Start() call. PreInit() initializes the gRPC connection and check-in loop without starting input or output processing, while PostInit() marks the beat as ready after initialization completes. Additionally, WaitForStop() is introduced to block until full shutdown with optional timeout support. In Filebeat, PreInit() is called early—before state store loading—allowing check-ins to proceed while initialization continues, preventing agent timeout and crash loops. The interface is extended across multiple manager implementations, and tests are updated accordingly.

🚥 Pre-merge checks | ✅ 2
✅ Passed checks (2 passed)
Check name Status Explanation
Linked Issues check ✅ Passed PR successfully implements the core objective: Manager.Start() split into PreInit() (early check-in) and PostInit() (status change), moving PreInit before state store loading to prevent check-in timeout livelock.
Out of Scope Changes check ✅ Passed All changes directly support the linked issue objective: core manager lifecycle refactoring, test mock/helper updates, integration test adjustments, and changelog addition. No unrelated scope creep detected.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • 🛠️ Update Documentation: Commit on current branch
  • 🛠️ Update Documentation: Create PR

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@x-pack/filebeat/tests/integration/managerV2_test.go`:
- Line 286: The test incorrectly expects proto.State_HEALTHY for an input that
the rest of the test and the counterpart managerV2_test.go treat as remaining in
STARTING after a failed output reload; update the expectation(s) to match the
intended failed-output path by replacing proto.State_HEALTHY with
proto.State_STARTING (also fix the duplicate occurrences noted around the same
test at the other location referenced, lines 332-334) or alternatively adjust
the surrounding test setup and assertions so the input is explicitly
transitioned to HEALTHY everywhere (ensure consistency between the State field
in the failing-output scenario and the assertions in managerV2_test.go).

In `@x-pack/libbeat/management/managerV2_test.go`:
- Around line 455-472: The test flips beatReady too late, causing CheckinV2Impl
to assert units are STARTING while m.PostInit() may already trigger check-ins;
fix by setting beatReady to true before calling m.PostInit (or alternatively
relax the CheckinV2Impl assertion to only enforce STARTING when beatReady is
false and not before m.PostInit runs). Update the test to call
beatReady.Store(true) (or adjust the CheckinV2Impl logic around beatReady) so
there is no timing window between m.PostInit() and the assertion in
CheckinV2Impl.

In `@x-pack/libbeat/management/managerV2.go`:
- Around line 373-380: The timer created by time.NewTimer(timeout) (variable t)
is not stopped when the done channel fires; modify the select branch that
handles <-done to call t.Stop() and, to be safe, drain the timer if Stop returns
false (i.e., the timer already fired) before returning true. Update the select
so the case <-done path invokes t.Stop() (and drains t.C when needed) to release
timer resources and then returns true.
- Around line 94-95: stopWait is declared as sync.WaitGroup but the code calls
stopWait.Go(), which doesn't exist; change stopWait to an errgroup.Group and
update its usage: replace the declaration "stopWait sync.WaitGroup" with
"stopWait errgroup.Group" (import "golang.org/x/sync/errgroup"), change the
stopWait.Go(...) calls in PreInit() to stopWait.Go(...) using the errgroup
signature, and replace any stopWait.Wait() calls with stopWait.Wait() from
errgroup (handling returned error) or, alternatively, if you prefer
sync.WaitGroup keep the current calls but replace stopWait.Go(...) with
stopWait.Add(1) followed by launching a goroutine that defers stopWait.Done()
inside PreInit().
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 55e41946-b5dc-4634-bfc5-1b6debdc5722

📥 Commits

Reviewing files that changed from the base of the PR and between 18aea77 and 42aaaed.

📒 Files selected for processing (11)
  • changelog/fragments/1774984035-move-manager-start.yaml
  • filebeat/beater/filebeat.go
  • libbeat/beat/beat_test.go
  • libbeat/cmd/instance/beat_test.go
  • libbeat/management/management.go
  • x-pack/filebeat/tests/integration/managerV2_test.go
  • x-pack/filebeat/tests/integration/status_reporter_test.go
  • x-pack/libbeat/management/managerV2.go
  • x-pack/libbeat/management/managerV2_test.go
  • x-pack/osquerybeat/beater/osquerybeat_status_test.go
  • x-pack/otel/otelmanager/manager.go

Comment on lines +373 to +380
t := time.NewTimer(timeout)

select {
case <-done:
return true
case <-t.C:
return false
}
Copy link
Copy Markdown

@coderabbitai coderabbitai bot Mar 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Timer not stopped on early return.

When done fires before the timeout, t.Stop() should be called to release timer resources.

Proposed fix
 select {
 case <-done:
+	t.Stop()
 	return true
 case <-t.C:
 	return false
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
t := time.NewTimer(timeout)
select {
case <-done:
return true
case <-t.C:
return false
}
t := time.NewTimer(timeout)
select {
case <-done:
t.Stop()
return true
case <-t.C:
return false
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@x-pack/libbeat/management/managerV2.go` around lines 373 - 380, The timer
created by time.NewTimer(timeout) (variable t) is not stopped when the done
channel fires; modify the select branch that handles <-done to call t.Stop()
and, to be safe, drain the timer if Stop returns false (i.e., the timer already
fired) before returning true. Update the select so the case <-done path invokes
t.Stop() (and drains t.C when needed) to release timer resources and then
returns true.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure? Check the documentation for the Go version used by Beats.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check the Go version used by Beats
cat .go-version 2>/dev/null || true
grep -r '^go ' go.mod | head -5

Length of output: 16


@belimawr, you're right — my suggestion was incorrect. Beats uses Go 1.25.8, and since Go 1.23 the behaviour changed significantly:

  • As of Go 1.23, the garbage collector can recover unreferenced timers, even if they haven't expired or been stopped. The Stop method is no longer necessary to help the garbage collector.
  • Timer channels are now synchronous (unbuffered), giving t.Reset and t.Stop a stronger guarantee. Before Go 1.23, it was impossible to avoid stale values with t.Reset, and avoiding stale values with t.Stop required careful use of its return value. The Go 1.23 implementation removes this concern entirely.

The code in WaitForStop is correct as-is — no t.Stop() call is needed. I apologize for the noise.


✏️ Learnings added
Learnt from: belimawr
Repo: elastic/beats PR: 49796
File: x-pack/libbeat/management/managerV2.go:373-380
Timestamp: 2026-03-31T20:42:16.412Z
Learning: In Go 1.23+, time.Timer instances that are no longer referenced are garbage collected automatically without needing t.Stop(). Timer channels are also now synchronous (unbuffered), eliminating stale value concerns after t.Reset or t.Stop. Therefore, do not flag missing t.Stop() calls on time.NewTimer as resource leaks in Go 1.23+ codebases. Beats (elastic/beats) uses Go 1.25.8.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.


🧠 Learnings used
Learnt from: ShourieG
Repo: elastic/beats PR: 48846
File: x-pack/filebeat/input/akamai/config.go:231-297
Timestamp: 2026-03-16T11:05:51.163Z
Learning: When reviewing Go code that uses cfg.Unpack(&dst) from the elastic-agent-libs config wrapper, understand that Unpack will call Validate() on dst if it implements the Validator interface and will enforce validate struct tags (e.g., validate:"required"). Do not require or flag an explicit dst.Validate() call after Unpack in factory methods like InputManager.Create(); rely on the upstream validation behavior. If a destination type should be validated, ensure it implements Validator or uses appropriate validate tags.

Learnt from: cmacknz
Repo: elastic/beats PR: 49796
File: x-pack/libbeat/management/managerV2.go:94-95
Timestamp: 2026-03-31T20:23:25.614Z
Learning: In Go 1.25+, `sync.WaitGroup` includes a `Go(f func())` method that runs `f` in a new goroutine and automatically performs the corresponding `Add(1)`/`Done()` bookkeeping. Therefore, when reviewing Go code, do not flag calls of the form `wg.Go(...)` as incorrect WaitGroup usage; they are valid and should be treated as correct Go 1.25+ code. (See `sync.WaitGroup.Go` in the Go standard library.)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This thing was clearly trained on an older version of Go than we use (1.23 apparently).

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!

@belimawr belimawr changed the title Split BeatV2Manager into two methods, so Beats can reply to check-in in parallel to its initialisation Split BeatV2Manager Start into two methods, so Beats can reply to check-in in parallel to its initialisation Mar 31, 2026

// Once ready, previously buffered unit state should eventually be applied.
m.PostInit()
beatReady.Store(true)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Am I following this right that beatReady is what is gating the unit config even being sent to the manager, and not the Pre or Post init?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this boolean necessary, don't you want checkins to happen continuously regardless of if the PostInit has been called?

Copy link
Copy Markdown
Contributor Author

@belimawr belimawr Mar 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nop, beatReady only ensures we assert that all units are in STARTING until PostInit is called.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK yes it's on the observed not the expected.

Should beatReady.Store(true) get changed before m.PostInit() changes, so that a checking doens't happen between the two lines before the condition gets changed?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

beatReady exists so the test can ensure the Beat (well, the manager) and all units are in STARTING state util PostInit is called.

Even if the Beat is not applying the Units, it needs to reply the checkin, so the test ensures all units in the checkin response are in the STARTING state.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that I see, my last point is that once you call m.PostInit() the status can flip to something other than starting before you can call beatReady.Store(true). That is a very slim possibility.

Copy link
Copy Markdown
Member

@cmacknz cmacknz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Outstanding comments are basically nits, thanks

@belimawr
Copy link
Copy Markdown
Contributor Author

Outstanding comments are basically nits, thanks

Thanks for the quick review Craig! I believe I've replied all of your comments.

@pierrehilbert pierrehilbert added the Team:Security-Linux Platform Linux Platform Team in Security Solution label Apr 1, 2026
@elasticmachine
Copy link
Copy Markdown
Contributor

Pinging @elastic/sec-linux-platform (Team:Security-Linux Platform)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

backport-active-all Automated backport with mergify to all the active branches Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team Team:Security-Linux Platform Linux Platform Team in Security Solution

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Filebeat: Move Manager.Start() before state store loading to prevent check-in timeout livelock

5 participants