Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions worker/base_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ type baseProvider struct {
timeout time.Duration
isMaster bool

cmd *cmdJob
logFileFd *os.File
isRunning atomic.Value
cmd *cmdJob
logFileFd *os.File
isRunning atomic.Value
successExitCodes []int

cgroup *cgroupHook
zfs *zfsHook
Expand Down Expand Up @@ -186,3 +187,18 @@ func (p *baseProvider) Terminate() error {
func (p *baseProvider) DataSize() string {
return ""
}

func (p *baseProvider) SetSuccessExitCodes(codes []int) {
if codes == nil {
p.successExitCodes = []int{}
} else {
p.successExitCodes = codes
}
}

func (p *baseProvider) GetSuccessExitCodes() []int {
if p.successExitCodes == nil {
return []int{}
}
return p.successExitCodes
}
6 changes: 6 additions & 0 deletions worker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ type globalConfig struct {

ExecOnSuccess []string `toml:"exec_on_success"`
ExecOnFailure []string `toml:"exec_on_failure"`

// merged with mirror-specific options. make sure you know what you are doing!
SuccessExitCodes []int `toml:"dangerous_global_success_exit_codes"`
}

type managerConfig struct {
Expand Down Expand Up @@ -169,6 +172,9 @@ type mirrorConfig struct {
ExecOnSuccessExtra []string `toml:"exec_on_success_extra"`
ExecOnFailureExtra []string `toml:"exec_on_failure_extra"`

// will be merged with global option
SuccessExitCodes []int `toml:"success_exit_codes"`

Command string `toml:"command"`
FailOnMatch string `toml:"fail_on_match"`
SizePattern string `toml:"size_pattern"`
Expand Down
56 changes: 56 additions & 0 deletions worker/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,4 +521,60 @@ rsync_options = ["--local"]
"--local", // from mirror.rsync_options
})
})

Convey("success_exit_codes should work globally and per mirror", t, func() {
tmpfile, err := os.CreateTemp("", "tunasync")
So(err, ShouldEqual, nil)
defer os.Remove(tmpfile.Name())

cfgBlob1 := `
[global]
name = "test_worker"
log_dir = "/var/log/tunasync/{{.Name}}"
mirror_dir = "/data/mirrors"
concurrent = 10
interval = 240
retry = 3
timeout = 86400
dangerous_global_success_exit_codes = [10, 20]

[manager]
api_base = "https://127.0.0.1:5000"
token = "some_token"

[server]
hostname = "worker1.example.com"
listen_addr = "127.0.0.1"
listen_port = 6000
ssl_cert = "/etc/tunasync.d/worker1.cert"
ssl_key = "/etc/tunasync.d/worker1.key"

[[mirrors]]
name = "foo"
provider = "rsync"
upstream = "rsync://foo.bar/"
interval = 720
retry = 2
timeout = 3600
mirror_dir = "/data/foo"
success_exit_codes = [30, 40]
`

err = os.WriteFile(tmpfile.Name(), []byte(cfgBlob1), 0644)
So(err, ShouldEqual, nil)
defer tmpfile.Close()

cfg, err := LoadConfig(tmpfile.Name())
So(err, ShouldBeNil)

providers := map[string]mirrorProvider{}
for _, m := range cfg.Mirrors {
p := newMirrorProvider(m, cfg)
providers[p.Name()] = p
}

p, ok := providers["foo"].(*rsyncProvider)
So(ok, ShouldBeTrue)
So(p.successExitCodes, ShouldResemble, []int{10, 20, 30, 40})
})
}
16 changes: 16 additions & 0 deletions worker/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ type mirrorProvider interface {
ExitContext() *Context
// return context
Context() *Context

// set in newMirrorProvider, used by cmdJob.Wait
SetSuccessExitCodes(codes []int)
GetSuccessExitCodes() []int
}

// newProvider creates a mirrorProvider instance
Expand Down Expand Up @@ -249,5 +253,17 @@ func newMirrorProvider(mirror mirrorConfig, cfg *Config) mirrorProvider {
}
addHookFromCmdList(mirror.ExecOnFailureExtra, execOnFailure)

successExitCodes := []int{}
if cfg.Global.SuccessExitCodes != nil {
successExitCodes = append(successExitCodes, cfg.Global.SuccessExitCodes...)
}
if mirror.SuccessExitCodes != nil {
successExitCodes = append(successExitCodes, mirror.SuccessExitCodes...)
}
if len(successExitCodes) > 0 {
logger.Infof("Non-zero success exit codes set for mirror %s: %v", mirror.Name, successExitCodes)
provider.SetSuccessExitCodes(successExitCodes)
}

return provider
}
53 changes: 53 additions & 0 deletions worker/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,59 @@ sleep 10
So(provider.DataSize(), ShouldBeEmpty)
})
})
Convey("Command Provider with successExitCodes should work", t, func(ctx C) {
tmpDir, err := os.MkdirTemp("", "tunasync")
defer os.RemoveAll(tmpDir)
So(err, ShouldBeNil)
scriptFile := filepath.Join(tmpDir, "cmd.sh")
tmpFile := filepath.Join(tmpDir, "log_file")

c := cmdConfig{
name: "tuna-cmd",
upstreamURL: "http://mirrors.tuna.moe/",
command: "bash " + scriptFile,
workingDir: tmpDir,
logDir: tmpDir,
logFile: tmpFile,
interval: 600 * time.Second,
}

provider, err := newCmdProvider(c)
provider.SetSuccessExitCodes([]int{199, 200})
So(err, ShouldBeNil)

So(provider.Type(), ShouldEqual, provCommand)
So(provider.Name(), ShouldEqual, c.name)
So(provider.WorkingDir(), ShouldEqual, c.workingDir)
So(provider.LogDir(), ShouldEqual, c.logDir)
So(provider.LogFile(), ShouldEqual, c.logFile)
So(provider.Interval(), ShouldEqual, c.interval)
So(provider.GetSuccessExitCodes(), ShouldResemble, []int{199, 200})

Convey("Command exits with configured successExitCodes", func() {
scriptContent := `exit 199`
err = os.WriteFile(scriptFile, []byte(scriptContent), 0755)
So(err, ShouldBeNil)
readedScriptContent, err := os.ReadFile(scriptFile)
So(err, ShouldBeNil)
So(readedScriptContent, ShouldResemble, []byte(scriptContent))

err = provider.Run(make(chan empty, 1))
So(err, ShouldBeNil)
})

Convey("Command exits with unknown exit code", func() {
scriptContent := `exit 201`
err = os.WriteFile(scriptFile, []byte(scriptContent), 0755)
So(err, ShouldBeNil)
readedScriptContent, err := os.ReadFile(scriptFile)
So(err, ShouldBeNil)
So(readedScriptContent, ShouldResemble, []byte(scriptContent))

err = provider.Run(make(chan empty, 1))
So(err, ShouldNotBeNil)
})
})
}

func TestTwoStageRsyncProvider(t *testing.T) {
Expand Down
14 changes: 12 additions & 2 deletions worker/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"os"
"os/exec"
"slices"
"strings"
"sync"
"syscall"
Expand Down Expand Up @@ -171,9 +172,18 @@ func (c *cmdJob) Wait() error {
return c.retErr
default:
err := c.cmd.Wait()
c.retErr = err
close(c.finished)
return err
if err != nil {
code := err.(*exec.ExitError).ExitCode()
allowedCodes := c.provider.GetSuccessExitCodes()
if slices.Contains(allowedCodes, code) {
// process exited with non-success status
logger.Infof("Command %s exited with code %d: treated as success (allowed: %v)", c.cmd.Args, code, allowedCodes)
} else {
c.retErr = err
}
}
return c.retErr
}
}

Expand Down