diff --git a/README.md b/README.md index 1f6c305..a2a78b9 100644 --- a/README.md +++ b/README.md @@ -121,7 +121,7 @@ Response example: "queue_size": 0, "retry_queue_size": 0, "workers_queue_size": 0, - "cmdq_queue_size": 0, + "error_queue_size": 0, "retry_count": 0, "req_count": 0, "sent_count": 0, @@ -143,7 +143,7 @@ start\_at | The time of started queue\_size | queue size of requests retry\_queue\_size | queue size for resending notification workers\_queue\_size | summary of worker's queue size -command\_queue\_size | error hook command queue size +error\_queue\_size | error hook command queue size retry\_count | summary of retry count request\_count | request count to gunfish err\_count | count of recieving error response @@ -183,21 +183,24 @@ api_key = "API key for FCM" param | status | description ---------------- | ------ | -------------------------------------------------------------------------------------- -port |optional| Listen port number. -worker_num |optional| Number of Gunfish owns http clients. -queue_size |optional| Limit number of posted JSON from the developer application. -max_request_size |optional| Limit size of Posted JSON array. -max_connections |optional| Max connections -key_file |required| The key file path. -cert_file |optional| The cert file path. -kid |optional| kid for APNs provider authentication token. -team_id |optional| team id for APNs provider authentication token. -error_hook |optional| Error hook command. This command runs when Gunfish catches an error response. -api_key |optional| FCM api key. If you want to delivery notifications to android, it is required. - -## Error Hook - -Error hook command can get an each error response with JSON format by STDIN. +port |optional| Listen port number. +worker\_num |optional| Number of Gunfish owns http clients. +queue_size |optional| Limit number of posted JSON from the developer application. +max\_request\_size |optional| Limit size of Posted JSON array. 
+max\_connections |optional| Max connections +key\_file |required| The key file path. +cert\_file |optional| The cert file path. +kid |optional| kid for APNs provider authentication token. +team\_id |optional| team id for APNs provider authentication token. +error\_hook |optional| Error hook command. This command runs when Gunfish catches an error response. +error\_hook\_command\_persistent |optional| Keep the error hook command running and write each payload to its STDIN, instead of invoking the command once per error. +error\_hook\_to |optional| Write error hook payloads to `stdout` or `stderr`. Takes precedence over error\_hook. +api\_key |optional| FCM api key. If you want to deliver notifications to android, it is required. + +## Error Hook payload + +Each error response is passed to the error hook command, stdout, or stderr in JSON format. +The command receives the payload via STDIN. for example JSON structure: (>= v0.2.x) ```json5 @@ -265,41 +268,9 @@ $ go get github.com/lestrrat/go-server-starter/cmd/start_server $ start_server --port 38003 --pid-file gunfish.pid -- ./gunfish -c conf/gunfish.toml ``` -## Customize - -### How to Implement Response Handlers - -If you have to handle something on error or on success, you should implement error or success handlers. -For example handlers you should implement is given below: - -```go -type CustomYourErrorHandler struct { - hookCmd string -} - -func (ch CustomYourErrorHandler) OnResponse(result Result){ - // ... -} - -func (ch CustomYourErrorHandler) HookCmd( ) string { - return ch.hookCmd -} -``` - -Then you can use these handlers to set before to start gunfish server `( gunfish.StartServer( Config, Environment ) )`. - -```go -InitErrorResponseHandler(CustomYourErrorHandler{hookCmd: "echo 'on error!'"}) -``` - -You can implement a success custom handler in the same way but a hook command is not executed in the success handler in order not to make cpu resource too tight. - ### Test -Requires [dep](https://github.com/golang/dep/) for vendoring. 
- ``` -$ make get-deps $ make test ``` diff --git a/config/config.go b/config/config.go index 00e9181..be7362e 100644 --- a/config/config.go +++ b/config/config.go @@ -38,13 +38,15 @@ type Config struct { // SectionProvider is Gunfish provider configuration type SectionProvider struct { - WorkerNum int `toml:"worker_num"` - QueueSize int `toml:"queue_size"` - RequestQueueSize int `toml:"max_request_size"` - Port int `toml:"port"` - DebugPort int - MaxConnections int `toml:"max_connections"` - ErrorHook string `toml:"error_hook"` + WorkerNum int `toml:"worker_num"` + QueueSize int `toml:"queue_size"` + RequestQueueSize int `toml:"max_request_size"` + Port int `toml:"port"` + DebugPort int + MaxConnections int `toml:"max_connections"` + ErrorHook string `toml:"error_hook"` + ErrorHookTo string `toml:"error_hook_to"` + ErrorHookCommandPersistent bool `toml:"error_hook_command_persistent"` } // SectionApns is the configure which is loaded from gunfish.toml diff --git a/server.go b/server.go index 2681bfd..5a9c361 100644 --- a/server.go +++ b/server.go @@ -34,7 +34,6 @@ type Provider struct { // Therefore, you can specifies hook command which is set at toml file. type ResponseHandler interface { OnResponse(Result) - HookCmd() string } // DefaultResponseHandler is the default ResponseHandler if not specified. @@ -46,12 +45,6 @@ type DefaultResponseHandler struct { func (rh DefaultResponseHandler) OnResponse(result Result) { } -// HookCmd returns hook command to execute after getting response from APNS -// only when to get error response. -func (rh DefaultResponseHandler) HookCmd() string { - return rh.Hook -} - // StartServer starts an apns provider server on http. func StartServer(conf config.Config, env Environment) { // Initialize DefaultResponseHandler if response handlers are not defined. 
@@ -60,7 +53,7 @@ func StartServer(conf config.Config, env Environment) { } if errorResponseHandler == nil { - InitErrorResponseHandler(DefaultResponseHandler{Hook: conf.Provider.ErrorHook}) + InitErrorResponseHandler(DefaultResponseHandler{}) } // Init Provider @@ -337,7 +330,7 @@ func (prov *Provider) StatsHandler() http.HandlerFunc { atomic.StoreInt64(&(srvStats.QueueSize), int64(len(prov.Sup.queue))) atomic.StoreInt64(&(srvStats.RetryQueueSize), int64(len(prov.Sup.retryq))) atomic.StoreInt64(&(srvStats.WorkersQueueSize), int64(wqs)) - atomic.StoreInt64(&(srvStats.CommandQueueSize), int64(len(prov.Sup.cmdq))) + atomic.StoreInt64(&(srvStats.ErrorQueueSize), int64(len(prov.Sup.errq))) res.WriteHeader(http.StatusOK) encoder := json.NewEncoder(res) err := encoder.Encode(srvStats.GetStats()) diff --git a/stat.go b/stat.go index 4391bde..076a089 100644 --- a/stat.go +++ b/stat.go @@ -20,7 +20,7 @@ type Stats struct { QueueSize int64 `json:"queue_size"` RetryQueueSize int64 `json:"retry_queue_size"` WorkersQueueSize int64 `json:"workers_queue_size"` - CommandQueueSize int64 `json:"cmdq_queue_size"` + ErrorQueueSize int64 `json:"error_queue_size"` RetryCount int64 `json:"retry_count"` RequestCount int64 `json:"req_count"` SentCount int64 `json:"sent_count"` diff --git a/supervisor.go b/supervisor.go index 270a56c..fd49fb8 100644 --- a/supervisor.go +++ b/supervisor.go @@ -1,13 +1,16 @@ package gunfish import ( + "bufio" "bytes" "errors" "fmt" "io" + "io/ioutil" "net/http" "os" "os/exec" + "strings" "sync" "sync/atomic" "syscall" @@ -24,7 +27,7 @@ import ( type Supervisor struct { queue chan *[]Request // supervisor's queue that recieves POST requests. retryq chan Request // enqueues this retry queue when to failed to send notification on the http layer. - cmdq chan Command // enqueues this command queue when to get error response from apns. + errq chan Error // enqueues this command queue when to get error response from apns. 
exit chan struct{} // exit channel is used to stop the supervisor. ticker *time.Ticker // ticker checks retry queue that has notifications to resend periodically. wgrp *sync.WaitGroup @@ -54,9 +57,8 @@ type SenderResponse struct { } // Command has execute command and input stream. -type Command struct { - command string - input []byte +type Error struct { + input []byte } // EnqueueClientRequest enqueues request to supervisor's queue from external application service @@ -96,7 +98,7 @@ func StartSupervisor(conf *config.Config) (Supervisor, error) { s := Supervisor{ queue: make(chan *[]Request, conf.Provider.QueueSize), retryq: make(chan Request, conf.Provider.RequestQueueSize*conf.Provider.WorkerNum), - cmdq: make(chan Command, wqSize*conf.Provider.WorkerNum), + errq: make(chan Error, wqSize*conf.Provider.WorkerNum), exit: make(chan struct{}, 1), ticker: time.NewTicker(RetryWaitTime), wgrp: swgrp, @@ -133,25 +135,17 @@ func StartSupervisor(conf *config.Config) (Supervisor, error) { } }() - // spawn command - for i := 0; i < conf.Provider.WorkerNum; i++ { - s.wgrp.Add(1) - go func() { - logf := logrus.Fields{"type": "cmd_worker"} - for c := range s.cmdq { - LogWithFields(logf).Debugf("invoking command: %s %s", c.command, string(c.input)) - src := bytes.NewBuffer(c.input) - out, err := InvokePipe(c.command, src) - if err != nil { - LogWithFields(logf).Errorf("(%s) %s", err.Error(), string(out)) - } else { - LogWithFields(logf).Debugf("Success to execute command") - } - } - s.wgrp.Done() - }() + if err := s.startErrorWorkers(conf); err != nil { + return Supervisor{}, err + } + + if err := s.startWorkers(conf, wqSize); err != nil { + return Supervisor{}, err } + return s, nil +} +func (s *Supervisor) startWorkers(conf *config.Config, wqSize int) error { // Spawn workers var err error for i := 0; i < conf.Provider.WorkerNum; i++ { @@ -195,11 +189,124 @@ func StartSupervisor(conf *config.Config) (Supervisor, error) { "worker_id": i, }).Debugf("Spawned worker-%d.", i) } + 
return err +} + +func (s *Supervisor) startErrorWorkers(conf *config.Config) error { + hookCmd := conf.Provider.ErrorHook + hookTo := conf.Provider.ErrorHookTo + logf := logrus.Fields{"type": "error_worker"} + + // stdout / stderr + if hookTo != "" { + return s.startErrorHookToWorker(hookTo) + } + + // cmd + if hookCmd != "" { + if conf.Provider.ErrorHookCommandPersistent { + return s.startErrorCmdPersistentWorker(hookCmd) + } else { + return s.startErrorCmdWorker(hookCmd, conf.Provider.WorkerNum) + } + } + + // not defined + LogWithFields(logf).Warnf("Neither of error_hook and error_hook_to are not defined.") + go func() { + <-s.errq // dispose simply + }() + return nil +} + +func (s *Supervisor) startErrorHookToWorker(hookTo string) error { + logf := func() logrus.Fields { + return logrus.Fields{"type": "error_worker"} + } + var out io.Writer + switch strings.ToLower(hookTo) { + case "stdout": + out = os.Stdout + LogWithFields(logf()).Info("error_hook_to set output to stdout") + case "stderr": + out = os.Stderr + LogWithFields(logf()).Info("error_hook_to set output to stderr") + default: + LogWithFields(logf()).Warn("error_hook_to allows stdout or stderr only. 
dispose hook payloads to /dev/null") + out = ioutil.Discard + } + s.wgrp.Add(1) + go func() { + defer s.wgrp.Done() + w := bufio.NewWriter(out) + for e := range s.errq { + w.Write(e.input) + io.WriteString(w, "\n") + if err := w.Flush(); err != nil { + LogWithFields(logf()).Warnf("failed to write to %s: %s", hookTo, err) + return + } + } + }() + return nil +} + +func (s *Supervisor) startErrorCmdWorker(hookCmd string, workers int) error { + logf := func() logrus.Fields { + return logrus.Fields{"type": "error_worker"} + } + for i := 0; i < workers; i++ { + s.wgrp.Add(1) + go func() { + defer s.wgrp.Done() + for e := range s.errq { + LogWithFields(logf()).Debugf("invoking command: %s %s", hookCmd, string(e.input)) + src := bytes.NewBuffer(e.input) + out, err := InvokePipe(hookCmd, src) + if err != nil { + LogWithFields(logf()).Errorf("(%s) %s", err.Error(), string(out)) + } else { + LogWithFields(logf()).Debug("Success to execute command") + } + } + }() + } + return nil +} +func (s *Supervisor) startErrorCmdPersistentWorker(hookCmd string) error { + logf := func() logrus.Fields { + return logrus.Fields{"type": "error_worker"} + } + _w, err := InvokePipePersistent(hookCmd) if err != nil { - return Supervisor{}, err + LogWithFields(logf()).Errorf("failed to invoke command %s", hookCmd) + return err } - return s, nil + w := bufio.NewWriter(_w) + s.wgrp.Add(1) + go func() { + defer s.wgrp.Done() + LogWithFields(logf()).Debugf("invoking command: %s", hookCmd) + for e := range s.errq { + if w == nil { + _w, err := InvokePipePersistent(hookCmd) + if err != nil { + LogWithFields(logf()).Errorf("failed to invoke command %s", hookCmd) + LogWithFields(logf()).Warnf("failed to process error hook payload: %s", string(e.input)) + continue + } + w = bufio.NewWriter(_w) + } + w.Write(e.input) + w.WriteString("\n") + if err := w.Flush(); err != nil { + LogWithFields(logf()).Warnf("failed to write STDIN %s, payload: %s", err, string(e.input)) + w = nil + } + } + }() + return nil } // 
Shutdown supervisor @@ -213,7 +320,7 @@ func (s *Supervisor) Shutdown() { tryCnt := 0 for zeroCnt < RestartWaitCount { // if 's.counter' is not 0 potentially, here loop should not cancel to wait. - if len(s.queue)+len(s.cmdq)+len(s.retryq)+s.workersAllQueueLength() > 0 { + if len(s.queue)+len(s.errq)+len(s.retryq)+s.workersAllQueueLength() > 0 { zeroCnt = 0 tryCnt++ } else { @@ -232,7 +339,7 @@ func (s *Supervisor) Shutdown() { time.Sleep(ShutdownWaitTime) } close(s.exit) - close(s.cmdq) + close(s.errq) s.wgrp.Wait() close(s.queue) close(s.retryq) @@ -268,7 +375,7 @@ func (s *Supervisor) spawnWorker(w Worker, conf *config.Config) { case reqs := <-s.queue: w.receiveRequests(reqs) case resp := <-w.respq: - w.receiveResponse(resp, s.retryq, s.cmdq) + w.receiveResponse(resp, s.retryq, s.errq) case <-s.exit: return } @@ -279,7 +386,7 @@ func (s *Supervisor) spawnWorker(w Worker, conf *config.Config) { w.wgrp.Wait() } -func (w *Worker) receiveResponse(resp SenderResponse, retryq chan<- Request, cmdq chan Command) { +func (w *Worker) receiveResponse(resp SenderResponse, retryq chan<- Request, errq chan Error) { req := resp.Req switch t := req.Notification.(type) { @@ -297,7 +404,7 @@ func (w *Worker) receiveResponse(resp SenderResponse, retryq chan<- Request, cmd "response_time": resp.RespTime, "resp_uid": resp.UID, } - handleAPNsResponse(resp, retryq, cmdq, logf) + handleAPNsResponse(resp, retryq, errq, logf) case fcm.Payload: p := req.Notification.(fcm.Payload) logf := logrus.Fields{ @@ -311,14 +418,14 @@ func (w *Worker) receiveResponse(resp SenderResponse, retryq chan<- Request, cmd "response_time": resp.RespTime, "resp_uid": resp.UID, } - handleFCMResponse(resp, retryq, cmdq, logf) + handleFCMResponse(resp, retryq, errq, logf) default: LogWithFields(logrus.Fields{"type": "worker"}).Infof("Unknown request type:%s", t) } } -func handleAPNsResponse(resp SenderResponse, retryq chan<- Request, cmdq chan Command, logf logrus.Fields) { +func handleAPNsResponse(resp 
SenderResponse, retryq chan<- Request, errq chan Error, logf logrus.Fields) { req := resp.Req // Response handling @@ -332,7 +439,7 @@ func handleAPNsResponse(resp SenderResponse, retryq chan<- Request, cmdq chan Co logf["status"] = result.Status() LogWithFields(logf).Errorf("%s", resp.Err) // Error handling - onResponse(result, errorResponseHandler.HookCmd(), cmdq) + onResponse(result, errq) } else { // if 'result' is nil, HTTP connection error with APNS. retry(retryq, req, errors.New("http connection error between APNs"), logf) @@ -352,20 +459,20 @@ func handleAPNsResponse(resp SenderResponse, retryq chan<- Request, cmdq chan Co retry(retryq, req, err, logf) } - onResponse(result, errorResponseHandler.HookCmd(), cmdq) + onResponse(result, errq) LogWithFields(logf).Errorf("%s", err) } else { - onResponse(result, "", cmdq) + onResponse(result, errq) LogWithFields(logf).Info("Succeeded to send a notification") } } } } -func handleFCMResponse(resp SenderResponse, retryq chan<- Request, cmdq chan Command, logf logrus.Fields) { +func handleFCMResponse(resp SenderResponse, retryq chan<- Request, errq chan Error, logf logrus.Fields) { if resp.Err != nil { req := resp.Req - LogWithFields(logf).Warnf("response is nil. reason: %s", resp.Err.Error()) + LogWithFields(logf).Warnf("unexpected response. 
reason: %s", resp.Err.Error()) if req.Tries < SendRetryCount { req.Tries++ atomic.AddInt64(&(srvStats.RetryCount), 1) @@ -398,7 +505,7 @@ func handleFCMResponse(resp SenderResponse, retryq chan<- Request, cmdq chan Com atomic.AddInt64(&(srvStats.ErrCount), 1) switch err.Error() { case fcm.InvalidRegistration.String(), fcm.NotRegistered.String(): - onResponse(result, errorResponseHandler.HookCmd(), cmdq) + onResponse(result, errq) LogWithFields(logf).Errorf("%s", err) default: LogWithFields(logf).Errorf("Unknown error message: %s", err) @@ -495,7 +602,7 @@ func (s Supervisor) workersAllQueueLength() int { return sum } -func onResponse(result Result, cmd string, cmdq chan<- Command) { +func onResponse(result Result, errq chan<- Error) { logf := logrus.Fields{ "provider": result.Provider(), "type": "on_response", @@ -511,21 +618,34 @@ func onResponse(result Result, cmd string, cmdq chan<- Command) { successResponseHandler.OnResponse(result) } - if cmd == "" { - return - } - b, _ := result.MarshalJSON() - command := Command{ - command: cmd, - input: b, + error := Error{ + input: b, } select { - case cmdq <- command: - LogWithFields(logf).Debugf("Enqueue command: %v", command) + case errq <- error: + LogWithFields(logf).Debugf("Enqueue error: %v", error) default: - LogWithFields(logf).Warnf("Command queue is full, so could not execute commnad: %v", command) + LogWithFields(logf).Warnf("Error queue is full. 
dropping error: %v", error) + } +} + +func InvokePipePersistent(hook string) (io.WriteCloser, error) { + cmd := exec.Command("sh", "-c", hook) + + stdin, err := cmd.StdinPipe() + if err != nil { + return nil, fmt.Errorf("failed: %v %s", cmd, err.Error()) + } + + // merge std(out|err) of command to gunfish + if OutputHookStdout { + cmd.Stdout = os.Stdout + } + if OutputHookStderr { + cmd.Stderr = os.Stderr } + return stdin, cmd.Start() } func InvokePipe(hook string, src io.Reader) ([]byte, error) { @@ -555,7 +675,7 @@ func InvokePipe(hook string, src io.Reader) ([]byte, error) { if e, ok := err.(*os.PathError); ok && e.Err == syscall.EPIPE { LogWithFields(logf).Errorf(e.Error()) } else if err != nil { - LogWithFields(logf).Errorf("failed to write STDIN: cmd( %s ), error( %s )", hook, err.Error()) + LogWithFields(logf).Errorf("failed to write STDIN of %s. %s", hook, err.Error()) } stdin.Close()