Skip to content

Commit 6f79e5a

Browse files
author
Youen Péron
authored
feat(catp): execute process (#4)
* feat(catp): execute process * chore(changelog): add new entrie for v0.4.0
1 parent 8ae30c5 commit 6f79e5a

File tree

7 files changed

+470
-48
lines changed

7 files changed

+470
-48
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
.env
22
bin
33
.cache
4+
test/suites/stderr.txt

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ Types of changes
1414
- `Fixed` for any bug fixes.
1515
- `Security` in case of vulnerabilities.
1616

17+
## [0.4.0] 2022-07-05
18+
19+
- `Added` Cat Pipe can execute process and capture stdout or stderr stream
20+
1721
## [0.3.0] 2022-06-14
1822

1923
- `Added` Cat Pipe retry to connect to a failed tcp stream every second at startup.

cmd/catp/main.go

Lines changed: 81 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"os"
2525
"time"
2626

27+
"github.com/cgi-fr/cat-balancer/pkg/catp"
2728
"github.com/rs/zerolog"
2829
"github.com/rs/zerolog/log"
2930
"github.com/spf13/cobra"
@@ -44,9 +45,11 @@ func main() {
4445
log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr})
4546

4647
var (
47-
inAdrress string
48-
outAddress string
49-
verbosity string
48+
inAdrress string
49+
outAddress string
50+
captureStderr string
51+
captureStdout string
52+
verbosity string
5053
)
5154

5255
// nolint: exhaustivestruct
@@ -61,32 +64,26 @@ This is free software: you are free to change and redistribute it.
6164
There is NO WARRANTY, to the extent permitted by law.`, version, commit, buildDate, builtBy),
6265

6366
RunE: func(cmd *cobra.Command, args []string) error {
64-
switch verbosity {
65-
case "trace", "5":
66-
zerolog.SetGlobalLevel(zerolog.TraceLevel)
67-
log.Info().Msg("Logger level set to trace")
68-
case "debug", "4":
69-
zerolog.SetGlobalLevel(zerolog.DebugLevel)
70-
log.Info().Msg("Logger level set to debug")
71-
case "info", "3":
72-
zerolog.SetGlobalLevel(zerolog.InfoLevel)
73-
log.Info().Msg("Logger level set to info")
74-
case "warn", "2":
75-
zerolog.SetGlobalLevel(zerolog.WarnLevel)
76-
case "error", "1":
77-
zerolog.SetGlobalLevel(zerolog.ErrorLevel)
78-
default:
79-
zerolog.SetGlobalLevel(zerolog.Disabled)
67+
initLog(verbosity)
68+
69+
command := []string{}
70+
71+
if cmd.ArgsLenAtDash() > -1 {
72+
command = args[cmd.ArgsLenAtDash():]
8073
}
8174

82-
return run(cmd, inAdrress, outAddress)
75+
return run(cmd, inAdrress, outAddress, command, captureStderr, captureStdout)
8376
},
8477
}
8578

8679
rootCmd.PersistentFlags().StringVarP(&inAdrress, "in", "i", "",
8780
"input server's address (empty for stdin by default)")
8881
rootCmd.PersistentFlags().StringVarP(&outAddress, "out", "o", "",
8982
"output server's address (empty for stdout by default)")
83+
rootCmd.PersistentFlags().StringVarP(&captureStderr, "save-stderr", "E", "",
84+
"capture stderr output into a file")
85+
rootCmd.PersistentFlags().StringVarP(&captureStdout, "save-stdout", "O", "",
86+
"capture stdout output into a file")
9087
rootCmd.PersistentFlags().
9188
StringVarP(&verbosity,
9289
"verbosity",
@@ -101,43 +98,83 @@ There is NO WARRANTY, to the extent permitted by law.`, version, commit, buildDa
10198
}
10299
}
103100

104-
func run(cmd *cobra.Command, in string, out string) error {
105-
streamIn := cmd.InOrStdin()
106-
streamOut := cmd.OutOrStdout()
101+
func initLog(verbosity string) {
102+
switch verbosity {
103+
case "trace", "5":
104+
zerolog.SetGlobalLevel(zerolog.TraceLevel)
105+
log.Info().Msg("Logger level set to trace")
106+
case "debug", "4":
107+
zerolog.SetGlobalLevel(zerolog.DebugLevel)
108+
log.Info().Msg("Logger level set to debug")
109+
case "info", "3":
110+
zerolog.SetGlobalLevel(zerolog.InfoLevel)
111+
log.Info().Msg("Logger level set to info")
112+
case "warn", "2":
113+
zerolog.SetGlobalLevel(zerolog.WarnLevel)
114+
case "error", "1":
115+
zerolog.SetGlobalLevel(zerolog.ErrorLevel)
116+
default:
117+
zerolog.SetGlobalLevel(zerolog.Disabled)
118+
}
119+
}
107120

108-
if in != "" {
109-
for {
110-
conIn, err := net.Dial("tcp", in)
111-
if err == nil {
112-
defer conIn.Close()
121+
// openTCP try to open tcp stream every second until success.
122+
func openTCP(addr string) io.ReadWriteCloser {
123+
for {
124+
conIn, err := net.Dial("tcp", addr)
125+
if err == nil {
126+
return conIn
127+
}
113128

114-
streamIn = conIn
129+
log.Warn().Err(err).Msg("tcp stream failed to connect")
130+
time.Sleep(time.Second)
131+
}
132+
}
115133

116-
break
117-
}
134+
func run(
135+
cmd *cobra.Command,
136+
in string, out string,
137+
command []string,
138+
captureStderr string, captureStdout string,
139+
) error {
140+
streamIn := cmd.InOrStdin()
141+
streamOut := cmd.OutOrStdout()
118142

119-
log.Warn().Err(err).Msg("input tcp stream failed to connect")
120-
time.Sleep(time.Second)
121-
}
143+
if in != "" {
144+
conIn := openTCP(in)
145+
defer conIn.Close()
146+
streamIn = conIn
122147
}
123148

124149
if out != "" {
125-
for {
126-
conOut, err := net.Dial("tcp", out)
127-
if err == nil {
128-
defer conOut.Close()
150+
conOut := openTCP(out)
151+
defer conOut.Close()
152+
streamOut = conOut
153+
}
129154

130-
streamOut = conOut
155+
streamCaptureStderr := cmd.ErrOrStderr()
131156

132-
break
133-
}
157+
if captureStderr != "" {
158+
captureFile, err := os.Create(captureStderr)
159+
if err != nil {
160+
return fmt.Errorf("%w", err)
161+
}
162+
defer captureFile.Close()
134163

135-
log.Warn().Err(err).Msg("output tcp stream failed to connect")
136-
time.Sleep(time.Second)
164+
streamCaptureStderr = io.MultiWriter(captureFile, cmd.ErrOrStderr())
165+
}
166+
167+
if captureStdout != "" {
168+
captureFile, err := os.Create(captureStdout)
169+
if err != nil {
170+
return fmt.Errorf("%w", err)
137171
}
172+
defer captureFile.Close()
173+
174+
streamOut = io.MultiWriter(captureFile, streamOut)
138175
}
139176

140-
_, err := io.Copy(streamOut, streamIn)
177+
err := catp.Start(command, streamIn, streamOut, streamCaptureStderr)
141178
if err != nil {
142179
return fmt.Errorf("%w", err)
143180
}

cmd/catp/main_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ func Test_run(t *testing.T) {
8181
cmd := &cobra.Command{}
8282
cmd.SetIn(reader)
8383

84-
err := run(cmd, "", fmt.Sprintf("127.0.0.1:%d", producerPort))
84+
err := run(cmd, "", fmt.Sprintf("127.0.0.1:%d", producerPort), []string{}, "", "")
8585

8686
assert.Nil(t, err)
8787
wg.Done()
@@ -95,7 +95,7 @@ func Test_run(t *testing.T) {
9595
cmd := &cobra.Command{}
9696
cmd.SetOut(&writer)
9797

98-
err := run(cmd, fmt.Sprintf("127.0.0.1:%d", consumerPort), "")
98+
err := run(cmd, fmt.Sprintf("127.0.0.1:%d", consumerPort), "", []string{}, "", "")
9999
assert.Nil(t, err)
100100
assert.Equal(t, "hello\n", writer.String())
101101
wg.Done()
@@ -124,7 +124,7 @@ func Test_run_producer_first(t *testing.T) {
124124
cmd := &cobra.Command{}
125125
cmd.SetIn(reader)
126126

127-
err = run(cmd, "", fmt.Sprintf("127.0.0.1:%d", producerPort))
127+
err = run(cmd, "", fmt.Sprintf("127.0.0.1:%d", producerPort), []string{}, "", "")
128128

129129
assert.Nil(t, err)
130130

@@ -133,7 +133,7 @@ func Test_run_producer_first(t *testing.T) {
133133
cmd = &cobra.Command{}
134134
cmd.SetOut(&writer)
135135

136-
err = run(cmd, fmt.Sprintf("127.0.0.1:%d", consumerPort), "")
136+
err = run(cmd, fmt.Sprintf("127.0.0.1:%d", consumerPort), "", []string{}, "", "")
137137
assert.Nil(t, err)
138138
assert.Equal(t, "hello\n", writer.String())
139139
}

0 commit comments

Comments
 (0)