Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 23 additions & 23 deletions go/agent/agent_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"io"
"net"
"net/http"
"strings"
Expand Down Expand Up @@ -87,9 +87,9 @@ func readResponse(res *http.Response, err error) ([]byte, error) {
if err != nil {
return nil, err
}
defer res.Body.Close()
defer func() { _ = res.Body.Close() }()

body, err := ioutil.ReadAll(res.Body)
body, err := io.ReadAll(res.Body)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -120,7 +120,7 @@ func SubmitAgent(hostname string, port int, token string) (string, error) {
}

// Try to discover topology instances when an agent submits
go DiscoverAgentInstance(hostname, port)
go func() { _ = DiscoverAgentInstance(hostname, port) }()

return hostname, err
}
Expand All @@ -129,18 +129,18 @@ func SubmitAgent(hostname string, port int, token string) (string, error) {
func DiscoverAgentInstance(hostname string, port int) error {
agent, err := GetAgent(hostname)
if err != nil {
log.Errorf("Couldn't get agent for %s: %v", hostname, err)
_ = log.Errorf("Couldn't get agent for %s: %v", hostname, err)
return err
}

instanceKey := agent.GetInstance()
instance, err := inst.ReadTopologyInstance(instanceKey)
if err != nil {
log.Errorf("Failed to read topology for %v. err=%+v", instanceKey, err)
_ = log.Errorf("Failed to read topology for %v. err=%+v", instanceKey, err)
return err
}
if instance == nil {
log.Errorf("Failed to read topology for %v", instanceKey)
_ = log.Errorf("Failed to read topology for %v", instanceKey)
return err
}
log.Infof("Discovered Agent Instance: %v", instance.Key)
Expand Down Expand Up @@ -177,7 +177,7 @@ func ReadOutdatedAgentsHosts() ([]string, error) {
})

if err != nil {
log.Errore(err)
_ = log.Errore(err)
}
return res, err
}
Expand Down Expand Up @@ -210,7 +210,7 @@ func ReadAgents() ([]Agent, error) {
})

if err != nil {
log.Errore(err)
_ = log.Errore(err)
}
return res, err

Expand Down Expand Up @@ -322,7 +322,7 @@ func GetAgent(hostname string) (Agent, error) {
err = json.Unmarshal(body, &agent.AvailableLocalSnapshots)
}
if err != nil {
log.Errore(err)
_ = log.Errore(err)
}
}
{
Expand Down Expand Up @@ -359,7 +359,7 @@ func GetAgent(hostname string) (Agent, error) {
mySQLRunningUri := fmt.Sprintf("%s/mysql-status?token=%s", uri, token)
body, err := readResponse(httpGet(mySQLRunningUri))
if err == nil {
err = json.Unmarshal(body, &agent.MySQLRunning)
_ = json.Unmarshal(body, &agent.MySQLRunning)
}
// Actually an error is OK here since "status" returns with non-zero exit code when MySQL not running
Comment on lines 359 to 364
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

rg -n -C3 'mysql-status|seed-command-completed|seed-command-succeeded|MySQLRunning|seedCommandCompleted|seedCommandSucceeded|Checking MySQL status on target|10 iterations have passed without progress' go

Repository: ProxySQL/orchestrator

Length of output: 4012


Handle JSON decode failures separately from legitimate false values.

Lines 359–364 and 522–533 silently discard json.Unmarshal errors, causing boolean fields to default to false when responses contain malformed JSON. This pattern masks decode failures:

  • Line 685 checks MySQLRunning to prevent executing MySQL operations on a running instance
  • Line 751's polling loop depends on seedCommandCompleted returning an accurate completion state

If an agent endpoint returns malformed JSON with a successful HTTP status, the code treats it identically to a legitimate false response. Consider explicitly handling decode errors or separating error states from boolean results.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@go/agent/agent_dao.go` around lines 359 - 364, The code currently swallows
json.Unmarshal errors when populating agent.MySQLRunning (from
readResponse(httpGet(...))) and similarly for seedCommandCompleted, causing
malformed JSON to be indistinguishable from a legitimate false; update the
unmarshalling logic to detect and surface decode errors instead of silently
defaulting to false: when calling json.Unmarshal for the MySQL status and for
seed command/status responses, capture the error and either (a) return/log it
and avoid overwriting the existing boolean, or (b) unmarshal into a *bool
(pointer) or a small struct so you can tell nil (decode failure) vs false, and
propagate that error/state up (or set a separate error field like
MySQLRunningErr/SeedCommandErr) so callers (e.g., the MySQL guard that checks
MySQLRunning and the polling loop that calls seedCommandCompleted) can
distinguish malformed responses from a real false.

}
Expand Down Expand Up @@ -434,7 +434,7 @@ func executeAgentCommandWithMethodFunc(hostname string, command string, methodFu
if onResponse != nil {
(*onResponse)(body)
}
auditAgentOperation("agent-command", &agent, command)
_ = auditAgentOperation("agent-command", &agent, command)

return agent, err
}
Expand Down Expand Up @@ -519,7 +519,7 @@ func CustomCommand(hostname string, cmd string) (output string, err error) {
func seedCommandCompleted(hostname string, seedId int64) (Agent, bool, error) {
result := false
onResponse := func(body []byte) {
json.Unmarshal(body, &result)
_ = json.Unmarshal(body, &result)
}
agent, err := executeAgentCommand(hostname, fmt.Sprintf("seed-command-completed/%d", seedId), &onResponse)
return agent, result, err
Expand All @@ -529,7 +529,7 @@ func seedCommandCompleted(hostname string, seedId int64) (Agent, bool, error) {
func seedCommandSucceeded(hostname string, seedId int64) (Agent, bool, error) {
result := false
onResponse := func(body []byte) {
json.Unmarshal(body, &result)
_ = json.Unmarshal(body, &result)
}
agent, err := executeAgentCommand(hostname, fmt.Sprintf("seed-command-succeeded/%d", seedId), &onResponse)
return agent, result, err
Expand All @@ -543,7 +543,7 @@ func AbortSeed(seedId int64) error {
}

for _, seedOperation := range seedOperations {
AbortSeedCommand(seedOperation.TargetHostname, seedId)
_, _ = AbortSeedCommand(seedOperation.TargetHostname, seedId)
AbortSeedCommand(seedOperation.SourceHostname, seedId)
}
updateSeedComplete(seedId, errors.New("Aborted"))
Comment on lines 545 to 549
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don't mark the seed aborted when the abort RPCs are unchecked.

This loop discards both agent-side abort results and then unconditionally marks the seed as aborted on Line 549. If either agent misses the abort, orchestrator can report success while the copy is still running.

Proposed fix
 func AbortSeed(seedId int64) error {
 	seedOperations, err := AgentSeedDetails(seedId)
 	if err != nil {
 		return log.Errore(err)
 	}

+	var abortErr error
 	for _, seedOperation := range seedOperations {
-		_, _ = AbortSeedCommand(seedOperation.TargetHostname, seedId)
-		AbortSeedCommand(seedOperation.SourceHostname, seedId)
+		if _, err := AbortSeedCommand(seedOperation.TargetHostname, seedId); err != nil && abortErr == nil {
+			abortErr = err
+		}
+		if _, err := AbortSeedCommand(seedOperation.SourceHostname, seedId); err != nil && abortErr == nil {
+			abortErr = err
+		}
 	}
+	if abortErr != nil {
+		return log.Errore(abortErr)
+	}
 	updateSeedComplete(seedId, errors.New("Aborted"))
 	return nil
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@go/agent/agent_dao.go` around lines 545 - 549, The loop currently ignores
AbortSeedCommand results and then unconditionally calls
updateSeedComplete(seedId, errors.New("Aborted")); change it to inspect and
aggregate the errors returned by AbortSeedCommand for both
seedOperation.TargetHostname and seedOperation.SourceHostname (for each
seedOperation in seedOperations), and only mark the seed as aborted via
updateSeedComplete(seedId, err) when all abort RPCs succeeded (or pass a
combined error when any fail); specifically, replace the discarded returns from
AbortSeedCommand with captured error variables, accumulate non-nil errors (e.g.,
in a slice or combined error) and use that aggregated error when calling
updateSeedComplete so the orchestrator won’t report success if any abort RPC
failed.

Expand Down Expand Up @@ -702,8 +702,8 @@ func executeSeed(seedId int64, targetHostname string, sourceHostname string) err
if err != nil {
return updateSeedStateEntry(seedStateId, err)
}
sourceAgent, err = GetAgent(sourceHostname)
seedStateId, _ = submitSeedStateEntry(seedId, fmt.Sprintf("MySQL data volume on source host %s is %d bytes", sourceHostname, sourceAgent.MountPoint.MySQLDiskUsage), "")
sourceAgent, _ = GetAgent(sourceHostname)
_, _ = submitSeedStateEntry(seedId, fmt.Sprintf("MySQL data volume on source host %s is %d bytes", sourceHostname, sourceAgent.MountPoint.MySQLDiskUsage), "")
Comment on lines +705 to +706
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Preserve the GetAgent error after mounting.

If this refresh fails, sourceAgent.MountPoint.MySQLDiskUsage falls back to stale/zero data and the free-space guard on Line 720 can pass incorrectly. That can wipe the target and continue the seed without ever re-reading the mounted volume size.

Proposed fix
-	sourceAgent, _ = GetAgent(sourceHostname)
-	_, _ = submitSeedStateEntry(seedId, fmt.Sprintf("MySQL data volume on source host %s is %d bytes", sourceHostname, sourceAgent.MountPoint.MySQLDiskUsage), "")
+	sourceAgent, err = GetAgent(sourceHostname)
+	if err != nil {
+		_, _ = Unmount(sourceHostname)
+		return updateSeedStateEntry(seedStateId, err)
+	}
+	_, _ = submitSeedStateEntry(seedId, fmt.Sprintf("MySQL data volume on source host %s is %d bytes", sourceHostname, sourceAgent.MountPoint.MySQLDiskUsage), "")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@go/agent/agent_dao.go` around lines 705 - 706, After remounting the source
volume you must preserve and check the error returned by GetAgent instead of
discarding it; update the code around the call to GetAgent so that the returned
error is assigned (e.g., sourceAgent, err := GetAgent(sourceHostname)), verify
err is nil before using sourceAgent.MountPoint.MySQLDiskUsage, and if GetAgent
fails return or log and abort the seed operation (using seedId and
submitSeedStateEntry to record the failure) so stale/zero MySQLDiskUsage cannot
bypass the free-space guard.


seedStateId, _ = submitSeedStateEntry(seedId, fmt.Sprintf("Erasing MySQL data on %s", targetHostname), "")
_, err = deleteMySQLDatadir(targetHostname)
Expand All @@ -719,14 +719,14 @@ func executeSeed(seedId int64, targetHostname string, sourceHostname string) err

if sourceAgent.MountPoint.MySQLDiskUsage > targetAgent.MySQLDatadirDiskFree {
Unmount(sourceHostname)
return updateSeedStateEntry(seedStateId, fmt.Errorf("Not enough disk space on target host %s. Required: %d, available: %d. Bailing out.", targetHostname, sourceAgent.MountPoint.MySQLDiskUsage, targetAgent.MySQLDatadirDiskFree))
return updateSeedStateEntry(seedStateId, fmt.Errorf("not enough disk space on target host %s, required: %d, available: %d, bailing out", targetHostname, sourceAgent.MountPoint.MySQLDiskUsage, targetAgent.MySQLDatadirDiskFree))
}

// ...
seedStateId, _ = submitSeedStateEntry(seedId, fmt.Sprintf("%s will now receive data in background", targetHostname), "")
_, _ = submitSeedStateEntry(seedId, fmt.Sprintf("%s will now receive data in background", targetHostname), "")
ReceiveMySQLSeedData(targetHostname, seedId)

seedStateId, _ = submitSeedStateEntry(seedId, fmt.Sprintf("Waiting %d seconds for %s to start listening for incoming data", config.Config.SeedWaitSecondsBeforeSend, targetHostname), "")
_, _ = submitSeedStateEntry(seedId, fmt.Sprintf("Waiting %d seconds for %s to start listening for incoming data", config.Config.SeedWaitSecondsBeforeSend, targetHostname), "")
time.Sleep(time.Duration(config.Config.SeedWaitSecondsBeforeSend) * time.Second)

seedStateId, _ = submitSeedStateEntry(seedId, fmt.Sprintf("%s will now send data to %s in background", sourceHostname, targetHostname), "")
Expand Down Expand Up @@ -762,7 +762,7 @@ func executeSeed(seedId int64, targetHostname string, sourceHostname string) err
AbortSeedCommand(sourceHostname, seedId)
AbortSeedCommand(targetHostname, seedId)
Unmount(sourceHostname)
return updateSeedStateEntry(seedStateId, errors.New("10 iterations have passed without progress. Bailing out."))
return updateSeedStateEntry(seedStateId, errors.New("10 iterations have passed without progress, bailing out"))
}

var copyPct int64 = 0
Expand Down Expand Up @@ -795,10 +795,10 @@ func executeSeed(seedId int64, targetHostname string, sourceHostname string) err
return updateSeedStateEntry(seedStateId, err)
}

seedStateId, _ = submitSeedStateEntry(seedId, fmt.Sprintf("Submitting MySQL instance for discovery: %s", targetHostname), "")
_, _ = submitSeedStateEntry(seedId, fmt.Sprintf("Submitting MySQL instance for discovery: %s", targetHostname), "")
SeededAgents <- &targetAgent

seedStateId, _ = submitSeedStateEntry(seedId, "Done", "")
_, _ = submitSeedStateEntry(seedId, "Done", "")

return nil
}
Expand Down
4 changes: 2 additions & 2 deletions go/agent/instance_topology_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func SyncReplicaRelayLogs(instance, otherInstance *inst.Instance) (*inst.Instanc
var nextCoordinates *inst.BinlogCoordinates
var content string
onResponse := func(contentBytes []byte) {
json.Unmarshal(contentBytes, &content)
_ = json.Unmarshal(contentBytes, &content)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

head -n 100 go/agent/instance_topology_agent.go | tail -n 75

Repository: ProxySQL/orchestrator

Length of output: 2621


🏁 Script executed:

sed -n '1,100p' go/agent/instance_topology_agent.go

Repository: ProxySQL/orchestrator

Length of output: 2655


🏁 Script executed:

rg -n "func RelaylogContentsTail" --type go

Repository: ProxySQL/orchestrator

Length of output: 218


🏁 Script executed:

rg -A 20 "func RelaylogContentsTail" --type go | head -50

Repository: ProxySQL/orchestrator

Length of output: 663


🏁 Script executed:

rg -A 30 "func executeAgentCommand" go/agent/agent_dao.go | head -50

Repository: ProxySQL/orchestrator

Length of output: 1896


Don't ignore relay-log JSON decode failures.

Line 33 discards json.Unmarshal errors; that allows invalid or empty payloads to flow into ApplyRelaylogContents. The callback is invoked synchronously before RelaylogContentsTail returns, so you can capture the error and propagate it through the existing error handling path.

Suggested fix
 func SyncReplicaRelayLogs(instance, otherInstance *inst.Instance) (*inst.Instance, error) {
 	var err error
+	var decodeErr error
 	var found bool
 	var nextCoordinates *inst.BinlogCoordinates
 	var content string
 	onResponse := func(contentBytes []byte) {
-		_ = json.Unmarshal(contentBytes, &content)
+		if e := json.Unmarshal(contentBytes, &content); e != nil {
+			decodeErr = e
+		}
 	}
@@
 	if _, err := RelaylogContentsTail(otherInstance.Key.Hostname, nextCoordinates, &onResponse); err != nil {
 		goto Cleanup
 	}
+	if decodeErr != nil {
+		err = decodeErr
+		goto Cleanup
+	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
_ = json.Unmarshal(contentBytes, &content)
func SyncReplicaRelayLogs(instance, otherInstance *inst.Instance) (*inst.Instance, error) {
var err error
var decodeErr error
var found bool
var nextCoordinates *inst.BinlogCoordinates
var content string
onResponse := func(contentBytes []byte) {
if e := json.Unmarshal(contentBytes, &content); e != nil {
decodeErr = e
}
}
// ... (intervening code)
if _, err := RelaylogContentsTail(otherInstance.Key.Hostname, nextCoordinates, &onResponse); err != nil {
goto Cleanup
}
if decodeErr != nil {
err = decodeErr
goto Cleanup
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@go/agent/instance_topology_agent.go` at line 33, Replace the ignored
json.Unmarshal call so that decoding errors are captured and propagated: change
the `_ = json.Unmarshal(contentBytes, &content)` usage in RelaylogContentsTail
(the call that eventually invokes ApplyRelaylogContents) to check the returned
error, log or return it through the existing error path, and avoid calling
ApplyRelaylogContents with invalid/empty content; ensure the function signature
and callers of RelaylogContentsTail (and any error-handling path already
present) are used to propagate the JSON decode error instead of discarding it.

}
log.Debugf("SyncReplicaRelayLogs: stopping replication")

Expand Down Expand Up @@ -72,7 +72,7 @@ Cleanup:
return instance, log.Errore(err)
}
// and we're done (pending deferred functions)
inst.AuditOperation("align-via-relaylogs", &instance.Key, fmt.Sprintf("aligned %+v by relaylogs from %+v", instance.Key, otherInstance.Key))
_ = inst.AuditOperation("align-via-relaylogs", &instance.Key, fmt.Sprintf("aligned %+v by relaylogs from %+v", instance.Key, otherInstance.Key))

return instance, err
}
Loading
Loading