From eaaa76b991f2740e2356387051caa0f253646cb0 Mon Sep 17 00:00:00 2001 From: FUJIWARA Shunichiro Date: Mon, 7 Oct 2019 13:41:23 +0900 Subject: [PATCH 1/8] refactoring --- supervisor.go | 50 ++++++++++++++++++++++++++++++-------------------- 1 file changed, 30 insertions(+), 20 deletions(-) diff --git a/supervisor.go b/supervisor.go index 270a56c..9ec1834 100644 --- a/supervisor.go +++ b/supervisor.go @@ -133,25 +133,17 @@ func StartSupervisor(conf *config.Config) (Supervisor, error) { } }() - // spawn command - for i := 0; i < conf.Provider.WorkerNum; i++ { - s.wgrp.Add(1) - go func() { - logf := logrus.Fields{"type": "cmd_worker"} - for c := range s.cmdq { - LogWithFields(logf).Debugf("invoking command: %s %s", c.command, string(c.input)) - src := bytes.NewBuffer(c.input) - out, err := InvokePipe(c.command, src) - if err != nil { - LogWithFields(logf).Errorf("(%s) %s", err.Error(), string(out)) - } else { - LogWithFields(logf).Debugf("Success to execute command") - } - } - s.wgrp.Done() - }() + if err := s.startCommandWorkers(conf); err != nil { + return Supervisor{}, err } + if err := s.startWorkers(conf, wqSize); err != nil { + return Supervisor{}, err + } + return s, nil +} + +func (s *Supervisor) startWorkers(conf *config.Config, wqSize int) error { // Spawn workers var err error for i := 0; i < conf.Provider.WorkerNum; i++ { @@ -195,11 +187,29 @@ func StartSupervisor(conf *config.Config) (Supervisor, error) { "worker_id": i, }).Debugf("Spawned worker-%d.", i) } + return err +} - if err != nil { - return Supervisor{}, err +func (s *Supervisor) startCommandWorkers(conf *config.Config) error { + // spawn command + for i := 0; i < conf.Provider.WorkerNum; i++ { + s.wgrp.Add(1) + go func() { + logf := logrus.Fields{"type": "cmd_worker"} + for c := range s.cmdq { + LogWithFields(logf).Debugf("invoking command: %s %s", c.command, string(c.input)) + src := bytes.NewBuffer(c.input) + out, err := InvokePipe(c.command, src) + if err != nil { + LogWithFields(logf).Errorf("(%s) %s", err.Error(), string(out)) + } else { + LogWithFields(logf).Debugf("Success to execute command") + } + } + s.wgrp.Done() + }() } - return s, nil + return nil } // Shutdown supervisor From 258119a283095c058d924d8da520e05a953c96e2 Mon Sep 17 00:00:00 2001 From: FUJIWARA Shunichiro Date: Mon, 7 Oct 2019 16:09:34 +0900 Subject: [PATCH 2/8] Support error_hook_to to output error payload into stdout/stderr. --- config/config.go | 1 + server.go | 11 +---- stat.go | 2 +- supervisor.go | 108 ++++++++++++++++++++++++++++++----------------- 4 files changed, 74 insertions(+), 48 deletions(-) diff --git a/config/config.go b/config/config.go index 00e9181..e2e9987 100644 --- a/config/config.go +++ b/config/config.go @@ -45,6 +45,7 @@ type SectionProvider struct { DebugPort int MaxConnections int `toml:"max_connections"` ErrorHook string `toml:"error_hook"` + ErrorHookTo string `toml:"error_hook_to"` } // SectionApns is the configure which is loaded from gunfish.toml diff --git a/server.go b/server.go index 2681bfd..5a9c361 100644 --- a/server.go +++ b/server.go @@ -34,7 +34,6 @@ type Provider struct { // Therefore, you can specifies hook command which is set at toml file. type ResponseHandler interface { OnResponse(Result) - HookCmd() string } // DefaultResponseHandler is the default ResponseHandler if not specified. @@ -46,12 +45,6 @@ type DefaultResponseHandler struct { func (rh DefaultResponseHandler) OnResponse(result Result) { } -// HookCmd returns hook command to execute after getting response from APNS -// only when to get error response. -func (rh DefaultResponseHandler) HookCmd() string { - return rh.Hook -} - // StartServer starts an apns provider server on http. func StartServer(conf config.Config, env Environment) { // Initialize DefaultResponseHandler if response handlers are not defined. @@ -60,7 +53,7 @@ func StartServer(conf config.Config, env Environment) { } if errorResponseHandler == nil { - InitErrorResponseHandler(DefaultResponseHandler{Hook: conf.Provider.ErrorHook}) + InitErrorResponseHandler(DefaultResponseHandler{}) } // Init Provider @@ -337,7 +330,7 @@ func (prov *Provider) StatsHandler() http.HandlerFunc { atomic.StoreInt64(&(srvStats.QueueSize), int64(len(prov.Sup.queue))) atomic.StoreInt64(&(srvStats.RetryQueueSize), int64(len(prov.Sup.retryq))) atomic.StoreInt64(&(srvStats.WorkersQueueSize), int64(wqs)) - atomic.StoreInt64(&(srvStats.CommandQueueSize), int64(len(prov.Sup.cmdq))) + atomic.StoreInt64(&(srvStats.ErrorQueueSize), int64(len(prov.Sup.errq))) res.WriteHeader(http.StatusOK) encoder := json.NewEncoder(res) err := encoder.Encode(srvStats.GetStats()) diff --git a/stat.go b/stat.go index 4391bde..076a089 100644 --- a/stat.go +++ b/stat.go @@ -20,7 +20,7 @@ type Stats struct { QueueSize int64 `json:"queue_size"` RetryQueueSize int64 `json:"retry_queue_size"` WorkersQueueSize int64 `json:"workers_queue_size"` - CommandQueueSize int64 `json:"cmdq_queue_size"` + ErrorQueueSize int64 `json:"error_queue_size"` RetryCount int64 `json:"retry_count"` RequestCount int64 `json:"req_count"` SentCount int64 `json:"sent_count"` diff --git a/supervisor.go b/supervisor.go index 9ec1834..13f22e8 100644 --- a/supervisor.go +++ b/supervisor.go @@ -5,9 +5,11 @@ import ( "errors" "fmt" "io" + "io/ioutil" "net/http" "os" "os/exec" + "strings" "sync" "sync/atomic" "syscall" @@ -24,7 +26,7 @@ import ( type Supervisor struct { queue chan *[]Request // supervisor's queue that recieves POST requests. retryq chan Request // enqueues this retry queue when to failed to send notification on the http layer. - cmdq chan Command // enqueues this command queue when to get error response from apns. + errq chan Error // enqueues this command queue when to get error response from apns. exit chan struct{} // exit channel is used to stop the supervisor. ticker *time.Ticker // ticker checks retry queue that has notifications to resend periodically. wgrp *sync.WaitGroup @@ -54,9 +56,8 @@ type SenderResponse struct { } // Command has execute command and input stream. -type Command struct { - command string - input []byte +type Error struct { + input []byte } // EnqueueClientRequest enqueues request to supervisor's queue from external application service @@ -96,7 +97,7 @@ func StartSupervisor(conf *config.Config) (Supervisor, error) { s := Supervisor{ queue: make(chan *[]Request, conf.Provider.QueueSize), retryq: make(chan Request, conf.Provider.RequestQueueSize*conf.Provider.WorkerNum), - cmdq: make(chan Command, wqSize*conf.Provider.WorkerNum), + errq: make(chan Error, wqSize*conf.Provider.WorkerNum), exit: make(chan struct{}, 1), ticker: time.NewTicker(RetryWaitTime), wgrp: swgrp, @@ -133,7 +134,7 @@ func StartSupervisor(conf *config.Config) (Supervisor, error) { } }() - if err := s.startCommandWorkers(conf); err != nil { + if err := s.startErrorWorkers(conf); err != nil { return Supervisor{}, err } @@ -190,20 +191,56 @@ func (s *Supervisor) startWorkers(conf *config.Config, wqSize int) error { return err } -func (s *Supervisor) startCommandWorkers(conf *config.Config) error { - // spawn command +func (s *Supervisor) startErrorWorkers(conf *config.Config) error { + hookCmd := conf.Provider.ErrorHook + hookTo := conf.Provider.ErrorHookTo + logf := func() logrus.Fields { + return logrus.Fields{"type": "error_worker"} + } + // not defined + if hookCmd == "" && hookTo == "" { + LogWithFields(logf()).Warnf("Neither of error_hook and error_hook_output are not definde.") + go func() { + <-s.errq // dispose simply + }() + return nil + } + + // stdout / stderr + if hookTo != "" { + var out io.Writer + switch strings.ToLower(hookTo) { + case "stdout": + out = os.Stdout + case "stderr": + out = os.Stderr + default: + LogWithFields(logf()).Warnf("error_hook_to allows stdout or stderr only. dispose hook payloads to /dev/null") + out = ioutil.Discard + } + go func() { + for e := range s.errq { + if _, err := out.Write(e.input); err != nil { + LogWithFields(logf()).Warnf("failed to write to %s: %s", hookTo, err) + return + } + } + }() + return nil + } + + // otherwise spawn command for i := 0; i < conf.Provider.WorkerNum; i++ { s.wgrp.Add(1) go func() { - logf := logrus.Fields{"type": "cmd_worker"} - for c := range s.cmdq { - LogWithFields(logf).Debugf("invoking command: %s %s", c.command, string(c.input)) - src := bytes.NewBuffer(c.input) - out, err := InvokePipe(c.command, src) + for e := range s.errq { + LogWithFields(logf()).Debugf("invoking command: %s %s", hookCmd, string(e.input)) + src := bytes.NewBuffer(e.input) + out, err := InvokePipe(hookCmd, src) if err != nil { - LogWithFields(logf).Errorf("(%s) %s", err.Error(), string(out)) + LogWithFields(logf()).Errorf("(%s) %s", err.Error(), string(out)) } else { - LogWithFields(logf).Debugf("Success to execute command") + LogWithFields(logf()).Debug("Success to execute command") } } s.wgrp.Done() @@ -223,7 +260,7 @@ func (s *Supervisor) Shutdown() { tryCnt := 0 for zeroCnt < RestartWaitCount { // if 's.counter' is not 0 potentially, here loop should not cancel to wait. - if len(s.queue)+len(s.cmdq)+len(s.retryq)+s.workersAllQueueLength() > 0 { + if len(s.queue)+len(s.errq)+len(s.retryq)+s.workersAllQueueLength() > 0 { zeroCnt = 0 tryCnt++ } else { @@ -242,7 +279,7 @@ func (s *Supervisor) Shutdown() { time.Sleep(ShutdownWaitTime) } close(s.exit) - close(s.cmdq) + close(s.errq) s.wgrp.Wait() close(s.queue) close(s.retryq) @@ -278,7 +315,7 @@ func (s *Supervisor) spawnWorker(w Worker, conf *config.Config) { case reqs := <-s.queue: w.receiveRequests(reqs) case resp := <-w.respq: - w.receiveResponse(resp, s.retryq, s.cmdq) + w.receiveResponse(resp, s.retryq, s.errq) case <-s.exit: return } @@ -289,7 +326,7 @@ func (s *Supervisor) spawnWorker(w Worker, conf *config.Config) { w.wgrp.Wait() } -func (w *Worker) receiveResponse(resp SenderResponse, retryq chan<- Request, cmdq chan Command) { +func (w *Worker) receiveResponse(resp SenderResponse, retryq chan<- Request, errq chan Error) { req := resp.Req switch t := req.Notification.(type) { @@ -307,7 +344,7 @@ func (w *Worker) receiveResponse(resp SenderResponse, retryq chan<- Request, cmd "response_time": resp.RespTime, "resp_uid": resp.UID, } - handleAPNsResponse(resp, retryq, cmdq, logf) + handleAPNsResponse(resp, retryq, errq, logf) case fcm.Payload: p := req.Notification.(fcm.Payload) logf := logrus.Fields{ @@ -321,14 +358,14 @@ func (w *Worker) receiveResponse(resp SenderResponse, retryq chan<- Request, cmd "response_time": resp.RespTime, "resp_uid": resp.UID, } - handleFCMResponse(resp, retryq, cmdq, logf) + handleFCMResponse(resp, retryq, errq, logf) default: LogWithFields(logrus.Fields{"type": "worker"}).Infof("Unknown request type:%s", t) } } -func handleAPNsResponse(resp SenderResponse, retryq chan<- Request, cmdq chan Command, logf logrus.Fields) { +func handleAPNsResponse(resp SenderResponse, retryq chan<- Request, errq chan Error, logf logrus.Fields) { req := resp.Req // Response handling @@ -342,7 +379,7 @@ func handleAPNsResponse(resp SenderResponse, retryq chan<- Request, cmdq chan Co logf["status"] = result.Status() LogWithFields(logf).Errorf("%s", resp.Err) // Error handling - onResponse(result, errorResponseHandler.HookCmd(), cmdq) + onResponse(result, errq) } else { // if 'result' is nil, HTTP connection error with APNS. retry(retryq, req, errors.New("http connection error between APNs"), logf) @@ -362,17 +399,17 @@ func handleAPNsResponse(resp SenderResponse, retryq chan<- Request, cmdq chan Co retry(retryq, req, err, logf) } - onResponse(result, errorResponseHandler.HookCmd(), cmdq) + onResponse(result, errq) LogWithFields(logf).Errorf("%s", err) } else { - onResponse(result, "", cmdq) + onResponse(result, errq) LogWithFields(logf).Info("Succeeded to send a notification") } } } } -func handleFCMResponse(resp SenderResponse, retryq chan<- Request, cmdq chan Command, logf logrus.Fields) { +func handleFCMResponse(resp SenderResponse, retryq chan<- Request, errq chan Error, logf logrus.Fields) { if resp.Err != nil { req := resp.Req LogWithFields(logf).Warnf("response is nil. reason: %s", resp.Err.Error()) @@ -408,7 +445,7 @@ func handleFCMResponse(resp SenderResponse, retryq chan<- Request, cmdq chan Com atomic.AddInt64(&(srvStats.ErrCount), 1) switch err.Error() { case fcm.InvalidRegistration.String(), fcm.NotRegistered.String(): - onResponse(result, errorResponseHandler.HookCmd(), cmdq) + onResponse(result, errq) LogWithFields(logf).Errorf("%s", err) default: LogWithFields(logf).Errorf("Unknown error message: %s", err) @@ -505,7 +542,7 @@ func (s Supervisor) workersAllQueueLength() int { return sum } -func onResponse(result Result, cmd string, cmdq chan<- Command) { +func onResponse(result Result, errq chan<- Error) { logf := logrus.Fields{ "provider": result.Provider(), "type": "on_response", @@ -521,20 +558,15 @@ func onResponse(result Result, cmd string, cmdq chan<- Command) { successResponseHandler.OnResponse(result) } - if cmd == "" { - return - } - b, _ := result.MarshalJSON() - command := Command{ - command: cmd, - input: b, + error := Error{ + input: b, } select { - case cmdq <- command: - LogWithFields(logf).Debugf("Enqueue command: %v", command) + case errq <- error: + LogWithFields(logf).Debugf("Enqueue error: %v", error) default: - LogWithFields(logf).Warnf("Command queue is full, so could not execute commnad: %v", command) + LogWithFields(logf).Warnf("Error queue is full. dropping error: %v", error) } } From 25b74f543cba939b868265ce2b03a50b651b990e Mon Sep 17 00:00:00 2001 From: FUJIWARA Shunichiro Date: Mon, 7 Oct 2019 16:12:54 +0900 Subject: [PATCH 3/8] fix readme. --- README.md | 42 ++++++------------------------------------ 1 file changed, 6 insertions(+), 36 deletions(-) diff --git a/README.md b/README.md index 1f6c305..3a0fd61 100644 --- a/README.md +++ b/README.md @@ -121,7 +121,7 @@ Response example: "queue_size": 0, "retry_queue_size": 0, "workers_queue_size": 0, - "cmdq_queue_size": 0, + "error_queue_size": 0, "retry_count": 0, "req_count": 0, "sent_count": 0, @@ -143,7 +143,7 @@ start\_at | The time of started queue\_size | queue size of requests retry\_queue\_size | queue size for resending notification workers\_queue\_size | summary of worker's queue size -command\_queue\_size | error hook command queue size +error\_queue\_size | error hook command queue size retry\_count | summary of retry count request\_count | request count to gunfish err\_count | count of recieving error response @@ -193,11 +193,13 @@ cert_file |optional| The cert file path. kid |optional| kid for APNs provider authentication token. team_id |optional| team id for APNs provider authentication token. error_hook |optional| Error hook command. This command runs when Gunfish catches an error response. +error_hook_to |optional| Error hook outputs into stdout/stderr. api_key |optional| FCM api key. If you want to delivery notifications to android, it is required. -## Error Hook +## Error Hook payload -Error hook command can get an each error response with JSON format by STDIN. +Error hook command/stdout/stderr will get an each error response with JSON format. +The command accepts a payload by STDIN. for example JSON structure: (>= v0.2.x) ```json5 @@ -265,41 +267,9 @@ $ go get github.com/lestrrat/go-server-starter/cmd/start_server $ start_server --port 38003 --pid-file gunfish.pid -- ./gunfish -c conf/gunfish.toml ``` -## Customize - -### How to Implement Response Handlers - -If you have to handle something on error or on success, you should implement error or success handlers. -For example handlers you should implement is given below: - -```go -type CustomYourErrorHandler struct { - hookCmd string -} - -func (ch CustomYourErrorHandler) OnResponse(result Result){ - // ... -} - -func (ch CustomYourErrorHandler) HookCmd( ) string { - return ch.hookCmd -} -``` - -Then you can use these handlers to set before to start gunfish server `( gunfish.StartServer( Config, Environment ) )`. - -```go -InitErrorResponseHandler(CustomYourErrorHandler{hookCmd: "echo 'on error!'"}) -``` - -You can implement a success custom handler in the same way but a hook command is not executed in the success handler in order not to make cpu resource too tight. - ### Test -Requires [dep](https://github.com/golang/dep/) for vendoring. - ``` -$ make get-deps $ make test ``` From 7d9097d0afeaab4f6337dc1c1687e33170ed4701 Mon Sep 17 00:00:00 2001 From: FUJIWARA Shunichiro Date: Mon, 7 Oct 2019 17:32:06 +0900 Subject: [PATCH 4/8] fix write to stdout/stderr. --- README.md | 24 ++++++++++++------------ supervisor.go | 14 ++++++++++---- 2 files changed, 22 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index 3a0fd61..b2b2196 100644 --- a/README.md +++ b/README.md @@ -183,18 +183,18 @@ api_key = "API key for FCM" param | status | description ---------------- | ------ | -------------------------------------------------------------------------------------- -port |optional| Listen port number. -worker_num |optional| Number of Gunfish owns http clients. -queue_size |optional| Limit number of posted JSON from the developer application. -max_request_size |optional| Limit size of Posted JSON array. -max_connections |optional| Max connections -key_file |required| The key file path. -cert_file |optional| The cert file path. -kid |optional| kid for APNs provider authentication token. -team_id |optional| team id for APNs provider authentication token. -error_hook |optional| Error hook command. This command runs when Gunfish catches an error response. -error_hook_to |optional| Error hook outputs into stdout/stderr. -api_key |optional| FCM api key. If you want to delivery notifications to android, it is required. +port |optional| Listen port number. +worker\_num |optional| Number of Gunfish owns http clients. +queue_size |optional| Limit number of posted JSON from the developer application. +max\_request\_size |optional| Limit size of Posted JSON array. +max\_connections |optional| Max connections +key\_file |required| The key file path. +cert\_file |optional| The cert file path. +kid |optional| kid for APNs provider authentication token. +team\_id |optional| team id for APNs provider authentication token. +error\_hook |optional| Error hook command. This command runs when Gunfish catches an error response. +error\_hook\_to |optional| Error hook outputs into stdout/stderr. +api\_key |optional| FCM api key. If you want to delivery notifications to android, it is required. ## Error Hook payload diff --git a/supervisor.go b/supervisor.go index 13f22e8..3189065 100644 --- a/supervisor.go +++ b/supervisor.go @@ -1,6 +1,7 @@ package gunfish import ( + "bufio" "bytes" "errors" "fmt" @@ -199,7 +200,7 @@ func (s *Supervisor) startErrorWorkers(conf *config.Config) error { } // not defined if hookCmd == "" && hookTo == "" { - LogWithFields(logf()).Warnf("Neither of error_hook and error_hook_output are not definde.") + LogWithFields(logf()).Warnf("Neither of error_hook and error_hook_to are not definde.") go func() { <-s.errq // dispose simply }() @@ -212,15 +213,20 @@ func (s *Supervisor) startErrorWorkers(conf *config.Config) error { switch strings.ToLower(hookTo) { case "stdout": out = os.Stdout + LogWithFields(logf()).Info("error_hook_to set output to stdout") case "stderr": out = os.Stderr + LogWithFields(logf()).Info("error_hook_to set output to stderr") default: - LogWithFields(logf()).Warnf("error_hook_to allows stdout or stderr only. dispose hook payloads to /dev/null") + LogWithFields(logf()).Warn("error_hook_to allows stdout or stderr only. dispose hook payloads to /dev/null") out = ioutil.Discard } go func() { + w := bufio.NewWriter(out) for e := range s.errq { - if _, err := out.Write(e.input); err != nil { + w.Write(e.input) + io.WriteString(w, "\n") + if err := w.Flush(); err != nil { LogWithFields(logf()).Warnf("failed to write to %s: %s", hookTo, err) return } @@ -412,7 +418,7 @@ func handleAPNsResponse(resp SenderResponse, retryq chan<- Request, errq chan Er func handleFCMResponse(resp SenderResponse, retryq chan<- Request, errq chan Error, logf logrus.Fields) { if resp.Err != nil { req := resp.Req - LogWithFields(logf).Warnf("response is nil. reason: %s", resp.Err.Error()) + LogWithFields(logf).Warnf("unexpected response. reason: %s", resp.Err.Error()) if req.Tries < SendRetryCount { req.Tries++ atomic.AddInt64(&(srvStats.RetryCount), 1) From 8547f28e0a26baebeba73cbdf36b6437eaca7ff9 Mon Sep 17 00:00:00 2001 From: FUJIWARA Shunichiro Date: Fri, 11 Oct 2019 13:54:45 +0900 Subject: [PATCH 5/8] refactoring --- supervisor.go | 85 +++++++++++++++++++++++++++++---------------------- 1 file changed, 49 insertions(+), 36 deletions(-) diff --git a/supervisor.go b/supervisor.go index 3189065..fe1db15 100644 --- a/supervisor.go +++ b/supervisor.go @@ -195,48 +195,61 @@ func (s *Supervisor) startWorkers(conf *config.Config, wqSize int) error { func (s *Supervisor) startErrorWorkers(conf *config.Config) error { hookCmd := conf.Provider.ErrorHook hookTo := conf.Provider.ErrorHookTo + logf := logrus.Fields{"type": "error_worker"} + + // stdout / stderr + if hookTo != "" { + return s.startErrorHookToWorker(hookTo) + } + + // cmd + if hookCmd != "" { + return s.startErrorCmdWorker(hookCmd, conf.Provider.WorkerNum) + } + + // not defined + LogWithFields(logf).Warnf("Neither of error_hook and error_hook_to are not defined.") + go func() { + <-s.errq // dispose simply + }() + return nil +} + +func (s *Supervisor) startErrorHookToWorker(hookTo string) error { logf := func() logrus.Fields { return logrus.Fields{"type": "error_worker"} } - // not defined - if hookCmd == "" && hookTo == "" { - LogWithFields(logf()).Warnf("Neither of error_hook and error_hook_to are not definde.") - go func() { - <-s.errq // dispose simply - }() - return nil + var out io.Writer + switch strings.ToLower(hookTo) { + case "stdout": + out = os.Stdout + LogWithFields(logf()).Info("error_hook_to set output to stdout") + case "stderr": + out = os.Stderr + LogWithFields(logf()).Info("error_hook_to set output to stderr") + default: + LogWithFields(logf()).Warn("error_hook_to allows stdout or stderr only. dispose hook payloads to /dev/null") + out = ioutil.Discard } - - // stdout / stderr - if hookTo != "" { - var out io.Writer - switch strings.ToLower(hookTo) { - case "stdout": - out = os.Stdout - LogWithFields(logf()).Info("error_hook_to set output to stdout") - case "stderr": - out = os.Stderr - LogWithFields(logf()).Info("error_hook_to set output to stderr") - default: - LogWithFields(logf()).Warn("error_hook_to allows stdout or stderr only. dispose hook payloads to /dev/null") - out = ioutil.Discard - } - go func() { - w := bufio.NewWriter(out) - for e := range s.errq { - w.Write(e.input) - io.WriteString(w, "\n") - if err := w.Flush(); err != nil { - LogWithFields(logf()).Warnf("failed to write to %s: %s", hookTo, err) - return - } + go func() { + w := bufio.NewWriter(out) + for e := range s.errq { + w.Write(e.input) + io.WriteString(w, "\n") + if err := w.Flush(); err != nil { + LogWithFields(logf()).Warnf("failed to write to %s: %s", hookTo, err) + return } - }() - return nil - } + } + }() + return nil +} - // otherwise spawn command - for i := 0; i < conf.Provider.WorkerNum; i++ { +func (s *Supervisor) startErrorCmdWorker(hookCmd string, workers int) error { + logf := func() logrus.Fields { + return logrus.Fields{"type": "error_worker"} + } + for i := 0; i < workers; i++ { s.wgrp.Add(1) go func() { for e := range s.errq { From 6e5460e9e66803e625eb294cbdfc320b185464c1 Mon Sep 17 00:00:00 2001 From: FUJIWARA Shunichiro Date: Fri, 11 Oct 2019 14:24:20 +0900 Subject: [PATCH 6/8] add error_hook_command_persistent. to keep a sub process while error processing. --- config/config.go | 17 +++++++------ supervisor.go | 65 +++++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 71 insertions(+), 11 deletions(-) diff --git a/config/config.go b/config/config.go index e2e9987..be7362e 100644 --- a/config/config.go +++ b/config/config.go @@ -38,14 +38,15 @@ type Config struct { // SectionProvider is Gunfish provider configuration type SectionProvider struct { - WorkerNum int `toml:"worker_num"` - QueueSize int `toml:"queue_size"` - RequestQueueSize int `toml:"max_request_size"` - Port int `toml:"port"` - DebugPort int - MaxConnections int `toml:"max_connections"` - ErrorHook string `toml:"error_hook"` - ErrorHookTo string `toml:"error_hook_to"` + WorkerNum int `toml:"worker_num"` + QueueSize int `toml:"queue_size"` + RequestQueueSize int `toml:"max_request_size"` + Port int `toml:"port"` + DebugPort int + MaxConnections int `toml:"max_connections"` + ErrorHook string `toml:"error_hook"` + ErrorHookTo string `toml:"error_hook_to"` + ErrorHookCommandPersistent bool `toml:"error_hook_command_persistent"` } // SectionApns is the configure which is loaded from gunfish.toml diff --git a/supervisor.go b/supervisor.go index fe1db15..38a88e6 100644 --- a/supervisor.go +++ b/supervisor.go @@ -204,7 +204,11 @@ func (s *Supervisor) startErrorWorkers(conf *config.Config) error { // cmd if hookCmd != "" { - return s.startErrorCmdWorker(hookCmd, conf.Provider.WorkerNum) + if conf.Provider.ErrorHookCommandPersistent { + return s.startErrorCmdPersistentWorker(hookCmd) + } else { + return s.startErrorCmdWorker(hookCmd, conf.Provider.WorkerNum) + } } // not defined @@ -231,7 +235,9 @@ func (s *Supervisor) startErrorHookToWorker(hookTo string) error { LogWithFields(logf()).Warn("error_hook_to allows stdout or stderr only. dispose hook payloads to /dev/null") out = ioutil.Discard } + s.wgrp.Add(1) go func() { + defer s.wgrp.Done() w := bufio.NewWriter(out) for e := range s.errq { w.Write(e.input) @@ -252,6 +258,7 @@ func (s *Supervisor) startErrorCmdWorker(hookCmd string, workers int) error { for i := 0; i < workers; i++ { s.wgrp.Add(1) go func() { + defer s.wgrp.Done() for e := range s.errq { LogWithFields(logf()).Debugf("invoking command: %s %s", hookCmd, string(e.input)) src := bytes.NewBuffer(e.input) @@ -262,12 +269,46 @@ func (s *Supervisor) startErrorCmdWorker(hookCmd string, workers int) error { LogWithFields(logf()).Debug("Success to execute command") } } - s.wgrp.Done() }() } return nil } +func (s *Supervisor) startErrorCmdPersistentWorker(hookCmd string) error { + logf := func() logrus.Fields { + return logrus.Fields{"type": "error_worker"} + } + _w, err := InvokePipePersistent(hookCmd) + if err != nil { + LogWithFields(logf()).Errorf("failed to invoke command %s", hookCmd) + return err + } + w := bufio.NewWriter(_w) + s.wgrp.Add(1) + go func() { + defer s.wgrp.Done() + LogWithFields(logf()).Debugf("invoking command: %s", hookCmd) + for e := range s.errq { + if w == nil { + _w, err := InvokePipePersistent(hookCmd) + if err != nil { + LogWithFields(logf()).Errorf("failed to invoke command %s", hookCmd) + LogWithFields(logf()).Warnf("failed to process error hook payload: %s", string(e.input)) + continue + } + w = bufio.NewWriter(_w) + } + w.Write(e.input) + w.WriteString("\n") + if err != w.Flush() { + LogWithFields(logf()).Warnf("failed to write STDIN %s, payload: %s", err, string(e.input)) + w = nil + } + } + }() + return nil +} + // Shutdown supervisor func (s *Supervisor) Shutdown() { LogWithFields(logrus.Fields{ @@ -589,6 +630,24 @@ func onResponse(result Result, errq chan<- Error) { } } +func InvokePipePersistent(hook string) (io.WriteCloser, error) { + cmd := exec.Command("sh", "-c", hook) + + stdin, err := cmd.StdinPipe() + if err != nil { + return nil, fmt.Errorf("failed: %v %s", cmd, err.Error()) + } + + // merge std(out|err) of command to gunfish + if OutputHookStdout { + cmd.Stdout = os.Stdout + } + if OutputHookStderr { + cmd.Stderr = os.Stderr + } + return stdin, cmd.Start() +} + func InvokePipe(hook string, src io.Reader) ([]byte, error) { logf := logrus.Fields{"type": "invoke_pipe"} cmd := exec.Command("sh", "-c", hook) @@ -616,7 +675,7 @@ func InvokePipe(hook string, src io.Reader) ([]byte, error) { if e, ok := err.(*os.PathError); ok && e.Err == syscall.EPIPE { LogWithFields(logf).Errorf(e.Error()) } else if err != nil { - LogWithFields(logf).Errorf("failed to write STDIN: cmd( %s ), error( %s )", hook, err.Error()) + LogWithFields(logf).Errorf("failed to write STDIN of %s. %s", hook, err.Error()) } stdin.Close() From def88f1a5a1b221e502c32610773477cbed1a409 Mon Sep 17 00:00:00 2001 From: FUJIWARA Shunichiro Date: Fri, 11 Oct 2019 15:07:37 +0900 Subject: [PATCH 7/8] fix error handling --- supervisor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/supervisor.go b/supervisor.go index 38a88e6..fd49fb8 100644 --- a/supervisor.go +++ b/supervisor.go @@ -300,7 +300,7 @@ func (s *Supervisor) startErrorCmdPersistentWorker(hookCmd string) error { } w.Write(e.input) w.WriteString("\n") - if err != w.Flush() { + if err := w.Flush(); err != nil { LogWithFields(logf()).Warnf("failed to write STDIN %s, payload: %s", err, string(e.input)) w = nil } From 9218b74b16cd514820dcf69f8b8cdcd5823d3b94 Mon Sep 17 00:00:00 2001 From: FUJIWARA Shunichiro Date: Fri, 11 Oct 2019 15:11:13 +0900 Subject: [PATCH 8/8] update readme for error_hook_command_persistent --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index b2b2196..a2a78b9 100644 --- a/README.md +++ b/README.md @@ -193,6 +193,7 @@ cert\_file |optional| The cert file path. kid |optional| kid for APNs provider authentication token. team\_id |optional| team id for APNs provider authentication token. error\_hook |optional| Error hook command. This command runs when Gunfish catches an error response. +error\_hook\_command\_persistent |optional|Persist error hook command. error\_hook\_to |optional| Error hook outputs into stdout/stderr. api\_key |optional| FCM api key. If you want to delivery notifications to android, it is required.