From 02202aca88c33a7b605bfb72b5a027c8ef4a82c8 Mon Sep 17 00:00:00 2001 From: j-justin Date: Thu, 20 Sep 2018 10:19:32 +0200 Subject: [PATCH 1/8] ADD jaeger tracing for http requests - Tracer has been added, that reports spans to jaeger tracer - If jaeger is not running, no tracer is created and no spans are reported --- cmd/cloudAuth/main.go | 6 +++ cmd/cloudDiscovery/main.go | 5 +++ cmd/cloudStatusReporter/main.go | 6 +++ cmd/cloudStorage/main.go | 5 +++ cmd/localAuth/main.go | 6 +++ cmd/localDiscovery/main.go | 5 +++ cmd/localStatusReporter/main.go | 6 +++ cmd/localStorage/main.go | 5 +++ docker-compose.yml | 7 +++ service/authorizer/service.go | 1 + service/tracing/mockCloser.go | 8 ++++ service/tracing/tracing.go | 77 +++++++++++++++++++++++++++++++++ vendor/vendor.json | 14 +++--- 13 files changed, 144 insertions(+), 7 deletions(-) create mode 100644 service/tracing/mockCloser.go create mode 100644 service/tracing/tracing.go diff --git a/cmd/cloudAuth/main.go b/cmd/cloudAuth/main.go index ab06f913..df2b8197 100644 --- a/cmd/cloudAuth/main.go +++ b/cmd/cloudAuth/main.go @@ -25,6 +25,7 @@ import ( metricsServer "github.com/iryonetwork/wwm/metrics/server" "github.com/iryonetwork/wwm/service/authDataManager" "github.com/iryonetwork/wwm/service/authenticator" + "github.com/iryonetwork/wwm/service/tracing" statusServer "github.com/iryonetwork/wwm/status/server" "github.com/iryonetwork/wwm/storage/auth" "github.com/iryonetwork/wwm/utils" @@ -209,6 +210,11 @@ func main() { }).Handler(api.Serve(nil)) handler = logMW.APILogMiddleware(handler, logger) handler = apiMetrics.Middleware(handler) + // add tracer middleware + traceCloser := tracing.New("cloudAuth", "jaeger:5775") + defer traceCloser.Close() + handler = tracing.Middleware(handler) + server.SetHandler(handler) // Start servers diff --git a/cmd/cloudDiscovery/main.go b/cmd/cloudDiscovery/main.go index 63d92a9a..62d56bb3 100644 --- a/cmd/cloudDiscovery/main.go +++ b/cmd/cloudDiscovery/main.go @@ -22,6 +22,7 @@ import ( metricsServer "github.com/iryonetwork/wwm/metrics/server" "github.com/iryonetwork/wwm/service/authorizer" discoveryService "github.com/iryonetwork/wwm/service/discovery" + "github.com/iryonetwork/wwm/service/tracing" statusServer "github.com/iryonetwork/wwm/status/server" discoveryStorage "github.com/iryonetwork/wwm/storage/discovery" "github.com/iryonetwork/wwm/utils" @@ -115,6 +116,10 @@ func main() { AllowedHeaders: []string{"Authorization", "Content-Type"}, }).Handler(api.Serve(nil)) handler = m.Middleware(handler) + // add tracer middleware + traceCloser := tracing.New("cloudDiscovery", "jaeger:5775") + defer traceCloser.Close() + handler = tracing.Middleware(handler) server.SetHandler(handler) diff --git a/cmd/cloudStatusReporter/main.go b/cmd/cloudStatusReporter/main.go index 480df1db..21cf7537 100644 --- a/cmd/cloudStatusReporter/main.go +++ b/cmd/cloudStatusReporter/main.go @@ -16,6 +16,7 @@ import ( metricsServer "github.com/iryonetwork/wwm/metrics/server" "github.com/iryonetwork/wwm/service/statusReporter" "github.com/iryonetwork/wwm/service/statusReporter/polling" + "github.com/iryonetwork/wwm/service/tracing" ) func main() { @@ -85,6 +86,11 @@ func main() { AllowedHeaders: []string{"Authorization", "Content-Type"}, }).Handler(m.Middleware(log.APILogMiddleware(r.Handler("status"), logger))) + // add tracer middleware + traceCloser := tracing.New("cloudStatusReporter", "jaeger:5775") + defer traceCloser.Close() + handler = tracing.Middleware(handler) + httpServer := &http.Server{ Addr: fmt.Sprintf("%s:%d", cfg.ServerHost, cfg.ServerPortHTTP), Handler: handler, diff --git a/cmd/cloudStorage/main.go b/cmd/cloudStorage/main.go index 06fa3c18..5aeee787 100644 --- a/cmd/cloudStorage/main.go +++ b/cmd/cloudStorage/main.go @@ -25,6 +25,7 @@ import ( metricsServer "github.com/iryonetwork/wwm/metrics/server" "github.com/iryonetwork/wwm/service/authorizer" storage "github.com/iryonetwork/wwm/service/storage" + "github.com/iryonetwork/wwm/service/tracing" statusServer "github.com/iryonetwork/wwm/status/server" "github.com/iryonetwork/wwm/storage/s3" "github.com/iryonetwork/wwm/sync/storage/publisher" @@ -124,6 +125,10 @@ func main() { }).Handler(api.Serve(nil)) handler = logMW.APILogMiddleware(handler, logger) handler = m.Middleware(handler) + // add tracer middleware + traceCloser := tracing.New("cloudStorage", "jaeger:5775") + defer traceCloser.Close() + handler = tracing.Middleware(handler) server.SetHandler(handler) diff --git a/cmd/localAuth/main.go b/cmd/localAuth/main.go index 61df058f..7281262e 100644 --- a/cmd/localAuth/main.go +++ b/cmd/localAuth/main.go @@ -26,6 +26,7 @@ import ( "github.com/iryonetwork/wwm/service/authDataManager" "github.com/iryonetwork/wwm/service/authSync" "github.com/iryonetwork/wwm/service/authenticator" + "github.com/iryonetwork/wwm/service/tracing" statusServer "github.com/iryonetwork/wwm/status/server" "github.com/iryonetwork/wwm/storage/auth" "github.com/iryonetwork/wwm/utils" @@ -192,6 +193,11 @@ func main() { }).Handler(api.Serve(nil)) handler = logMW.APILogMiddleware(handler, logger) handler = apiMetrics.Middleware(handler) + // add tracer middleware + traceCloser := tracing.New("localAuth", "jaeger:5775") + defer traceCloser.Close() + handler = tracing.Middleware(handler) + server.SetHandler(handler) gocron.Every(5).Minutes().Do(authSync.Sync) diff --git a/cmd/localDiscovery/main.go b/cmd/localDiscovery/main.go index ce19ef92..3a3ea0ab 100644 --- a/cmd/localDiscovery/main.go +++ b/cmd/localDiscovery/main.go @@ -23,6 +23,7 @@ import ( metricsServer "github.com/iryonetwork/wwm/metrics/server" "github.com/iryonetwork/wwm/service/authorizer" discoveryService "github.com/iryonetwork/wwm/service/discovery" + "github.com/iryonetwork/wwm/service/tracing" statusServer "github.com/iryonetwork/wwm/status/server" discoveryStorage "github.com/iryonetwork/wwm/storage/discovery" "github.com/iryonetwork/wwm/utils" @@ -121,6 +122,10 @@ func main() { AllowedHeaders: []string{"Authorization", "Content-Type"}, }).Handler(api.Serve(nil)) handler = m.Middleware(handler) + // add tracer middleware + traceCloser := tracing.New("localDiscovery", "jaeger:5775") + defer traceCloser.Close() + handler = tracing.Middleware(handler) server.SetHandler(handler) diff --git a/cmd/localStatusReporter/main.go b/cmd/localStatusReporter/main.go index 52ef9bca..1e5bc798 100644 --- a/cmd/localStatusReporter/main.go +++ b/cmd/localStatusReporter/main.go @@ -16,6 +16,7 @@ import ( metricsServer "github.com/iryonetwork/wwm/metrics/server" "github.com/iryonetwork/wwm/service/statusReporter" "github.com/iryonetwork/wwm/service/statusReporter/polling" + "github.com/iryonetwork/wwm/service/tracing" ) func main() { @@ -99,6 +100,11 @@ func main() { AllowedHeaders: []string{"Authorization", "Content-Type"}, }).Handler(m.Middleware(log.APILogMiddleware(r.Handler("status"), logger))) + // add tracer middleware + traceCloser := tracing.New("localStatusReporter", "jaeger:5775") + defer traceCloser.Close() + handler = tracing.Middleware(handler) + httpServer := &http.Server{ Addr: fmt.Sprintf("%s:%d", cfg.ServerHost, cfg.ServerPortHTTP), Handler: handler, diff --git a/cmd/localStorage/main.go b/cmd/localStorage/main.go index 1b2472e3..4b955e7a 100644 --- a/cmd/localStorage/main.go +++ b/cmd/localStorage/main.go @@ -29,6 +29,7 @@ import ( metricsServer "github.com/iryonetwork/wwm/metrics/server" "github.com/iryonetwork/wwm/service/authorizer" storage "github.com/iryonetwork/wwm/service/storage" + "github.com/iryonetwork/wwm/service/tracing" statusServer "github.com/iryonetwork/wwm/status/server" "github.com/iryonetwork/wwm/storage/s3" storageSync "github.com/iryonetwork/wwm/sync/storage" @@ -179,6 +180,10 @@ func main() { }).Handler(api.Serve(nil)) handler = logMW.APILogMiddleware(handler, logger) handler = m.Middleware(handler) + // add tracer middleware + traceCloser := tracing.New("localStorage", "jaeger:5775") + defer traceCloser.Close() + handler = tracing.Middleware(handler) server.SetHandler(handler) diff --git a/docker-compose.yml b/docker-compose.yml index dfa1bf26..eb7552c3 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -383,3 +383,10 @@ services: - DB_USERNAME=reportgenerator - DB_PASSWORD=reportgenerator - REPORT_SPECS_FILEPATHS=/patientsReportSpec.json,/encountersReportSpec.json + + jaeger: + image: jaegertracing/all-in-one:1.3.0 + ports: + - 127.0.0.1:5775:5775/udp # accept zipkin.thrift over compact thrift protocol + - 127.0.0.1:16686:16686 # serve frontend + \ No newline at end of file diff --git a/service/authorizer/service.go b/service/authorizer/service.go index f11adb5e..3c123296 100644 --- a/service/authorizer/service.go +++ b/service/authorizer/service.go @@ -119,6 +119,7 @@ func (a *authorizer) Authorizer() runtime.Authorizer { } r.Header.Add("Authorization", request.Header.Get("Authorization")) r.Header.Add("Content-Type", "application/json") + r.Header.Add("Uber-Trace-Id", request.Header.Get("Uber-Trace-Id")) transport := &http.Transport{} netClient := &http.Client{ diff --git a/service/tracing/mockCloser.go b/service/tracing/mockCloser.go new file mode 100644 index 00000000..0638efb0 --- /dev/null +++ b/service/tracing/mockCloser.go @@ -0,0 +1,8 @@ +package tracing + +type MockCloser struct { +} + +func (c MockCloser) Close() error { + return nil +} diff --git a/service/tracing/tracing.go b/service/tracing/tracing.go new file mode 100644 index 00000000..6f3b96c5 --- /dev/null +++ b/service/tracing/tracing.go @@ -0,0 +1,77 @@ +package tracing + +import ( + "fmt" + "io" + "log" + "net/http" + "time" + + opentracing "github.com/opentracing/opentracing-go" + jaeger "github.com/uber/jaeger-client-go" + "github.com/uber/jaeger-client-go/config" +) + +var tracerIsSet = false + +// New sets opentracing.GlobalTracer() to tracer created from function options +// returns Closer, which is used to close the tracker +func New(serviceName, hostPort string) io.Closer { + log.Printf("Creating new tracer %s on host %s", serviceName, hostPort) + + cfg := config.Configuration{ + Sampler: &config.SamplerConfig{ + Type: "const", + Param: 1, + }, + Reporter: &config.ReporterConfig{ + LogSpans: true, + BufferFlushInterval: 1 * time.Second, + LocalAgentHostPort: hostPort, + }, + } + + tracer, closer, err := cfg.New( + serviceName, + config.Logger(jaeger.StdLogger), + ) + if err != nil { + log.Printf("Error initializing tracker: %v", err) + return MockCloser{} + } + + opentracing.SetGlobalTracer(tracer) + tracerIsSet = true + + return closer +} + +func Middleware(h http.Handler) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if tracerIsSet { + var sp opentracing.Span + spanName := fmt.Sprintf("HTTP %s", r.URL.Path) + wireContext, err := opentracing.GlobalTracer().Extract( + opentracing.HTTPHeaders, + opentracing.HTTPHeadersCarrier(r.Header)) + + if err != nil { + // If for whatever reason we can't join, go ahead an start a new root span. + sp = opentracing.StartSpan(spanName) + log.Printf("TRACE NOT FOUND, %v", err) + } else { + sp = opentracing.StartSpan(spanName, opentracing.ChildOf(wireContext)) + log.Printf("TRACE FOUND") + } + defer sp.Finish() + + sp.Tracer().Inject( + sp.Context(), + opentracing.HTTPHeaders, + opentracing.HTTPHeadersCarrier(r.Header)) + } else { + log.Printf("Tracer is not set") + } + h.ServeHTTP(w, r) + } +} diff --git a/vendor/vendor.json b/vendor/vendor.json index 6f69c103..4193c4c1 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -589,7 +589,7 @@ "revisionTime": "2018-03-19T19:41:35Z" }, { - "checksumSHA1": "ATnwV0POluBNQEMjPdylodz0oK0=", + "checksumSHA1": "AU3fA8Sm33Vj9PBoRPSeYfxLRuE=", "path": "github.com/lib/pq/oid", "revision": "88edab0803230a3898347e77b474f8c1820a1f20", "revisionTime": "2018-02-01T18:47:07Z" @@ -625,7 +625,7 @@ "revisionTime": "2016-07-23T06:10:19Z" }, { - "checksumSHA1": "JrFdmAFh7XYlVR/ONVkycAztGcc=", + "checksumSHA1": "B0jv3Wmurf7JvZPD/Tp6m2pRQTw=", "path": "github.com/minio/minio-go", "revision": "d218e4cb1bfc13dcef0eb5c3e74507a35be0dd3a", "revisionTime": "2018-01-13T00:13:38Z" @@ -709,7 +709,7 @@ "revisionTime": "2018-01-30T22:10:49Z" }, { - "checksumSHA1": "gBWCOqKsG1Uw/GJFFYGUhbNxkt0=", + "checksumSHA1": "iiNWEMdn9BdSYqbRGDdMak/OVIE=", "path": "github.com/nats-io/gnatsd/util", "revision": "f39cd53d0712265557576d74aac9351170a70c08", "revisionTime": "2018-01-30T22:10:49Z" @@ -949,7 +949,7 @@ "revisionTime": "2017-09-09T04:35:08Z" }, { - "checksumSHA1": "p5aJ1Wc4KA738r7q77NR/0qLKsA=", + "checksumSHA1": "G99Z/xW4ukmt8SHnsO9SIX71hFE=", "path": "golang.org/x/sys/unix", "revision": "dd9ec17814d5b1de388ebe438ec013d57d8b3779", "revisionTime": "2018-01-06T05:11:39Z" @@ -1003,19 +1003,19 @@ "revisionTime": "2018-03-01T21:24:51Z" }, { - "checksumSHA1": "1oQpUH9BjCWlqFPDahRH+UMlYy4=", + "checksumSHA1": "w8kDfZ1Ug+qAcVU0v8obbu3aDOY=", "path": "golang.org/x/text/unicode/bidi", "revision": "8c34f848e18c4bd34d02db7f19a0ed1a0a8f5852", "revisionTime": "2018-03-01T21:24:51Z" }, { - "checksumSHA1": "lN2xlA6Utu7tXy2iUoMF2+y9EUE=", + "checksumSHA1": "BCNYmf4Ek93G4lk5x3ucNi/lTwA=", "path": "golang.org/x/text/unicode/norm", "revision": "8c34f848e18c4bd34d02db7f19a0ed1a0a8f5852", "revisionTime": "2018-03-01T21:24:51Z" }, { - "checksumSHA1": "+2zgqW4XeI7g/N/QYeY7BPlHd2Y=", + "checksumSHA1": "zuieOGXKG9vXxNhEWv3H0X8RSXM=", "path": "golang.org/x/text/width", "revision": "e19ae1496984b1c655b8044a65c0300a3c878dd3", "revisionTime": "2017-12-24T20:31:28Z" From c0794b5d8a4041a299cef6b0dc7411d9dd8fb1e5 Mon Sep 17 00:00:00 2001 From: j-justin Date: Thu, 20 Sep 2018 10:57:11 +0200 Subject: [PATCH 2/8] ADD tracing of functions - Function tracing.NewSpan takes span name and function, that is exectued. It's execution time is reported to opantracing.GloblaTracer --- cmd/cloudAuth/main.go | 5 +- cmd/cloudDiscovery/main.go | 5 +- cmd/cloudStatusReporter/main.go | 5 +- cmd/cloudStorage/main.go | 5 +- cmd/localAuth/main.go | 5 +- cmd/localDiscovery/main.go | 5 +- cmd/localStatusReporter/main.go | 5 +- cmd/localStorage/main.go | 5 +- service/authenticator/auth.go | 70 +++++++------- service/authorizer/service.go | 156 +++++++++++++++++--------------- service/tracing/tracing.go | 70 +++++++++++++- 11 files changed, 211 insertions(+), 125 deletions(-) diff --git a/cmd/cloudAuth/main.go b/cmd/cloudAuth/main.go index df2b8197..b7cb622c 100644 --- a/cmd/cloudAuth/main.go +++ b/cmd/cloudAuth/main.go @@ -32,6 +32,9 @@ import ( ) func main() { + traceCloser := tracing.New("cloudAuth", "jaeger:5775") + defer traceCloser.Close() + logger := zerolog.New(os.Stdout).With(). Timestamp(). Str("service", "cloudAuth"). @@ -211,8 +214,6 @@ func main() { handler = logMW.APILogMiddleware(handler, logger) handler = apiMetrics.Middleware(handler) // add tracer middleware - traceCloser := tracing.New("cloudAuth", "jaeger:5775") - defer traceCloser.Close() handler = tracing.Middleware(handler) server.SetHandler(handler) diff --git a/cmd/cloudDiscovery/main.go b/cmd/cloudDiscovery/main.go index 62d56bb3..6b8ace6a 100644 --- a/cmd/cloudDiscovery/main.go +++ b/cmd/cloudDiscovery/main.go @@ -29,6 +29,9 @@ import ( ) func main() { + traceCloser := tracing.New("cloudDiscovery", "jaeger:5775") + defer traceCloser.Close() + // initialize logger logger := zerolog.New(os.Stdout).With(). Timestamp(). @@ -117,8 +120,6 @@ func main() { }).Handler(api.Serve(nil)) handler = m.Middleware(handler) // add tracer middleware - traceCloser := tracing.New("cloudDiscovery", "jaeger:5775") - defer traceCloser.Close() handler = tracing.Middleware(handler) server.SetHandler(handler) diff --git a/cmd/cloudStatusReporter/main.go b/cmd/cloudStatusReporter/main.go index 21cf7537..77e0d575 100644 --- a/cmd/cloudStatusReporter/main.go +++ b/cmd/cloudStatusReporter/main.go @@ -20,6 +20,9 @@ import ( ) func main() { + traceCloser := tracing.New("cloudStatusReporter", "jaeger:5775") + defer traceCloser.Close() + // initialize logger logger := zerolog.New(os.Stdout).With(). Timestamp(). @@ -87,8 +90,6 @@ func main() { }).Handler(m.Middleware(log.APILogMiddleware(r.Handler("status"), logger))) // add tracer middleware - traceCloser := tracing.New("cloudStatusReporter", "jaeger:5775") - defer traceCloser.Close() handler = tracing.Middleware(handler) httpServer := &http.Server{ diff --git a/cmd/cloudStorage/main.go b/cmd/cloudStorage/main.go index 5aeee787..325decf8 100644 --- a/cmd/cloudStorage/main.go +++ b/cmd/cloudStorage/main.go @@ -34,6 +34,9 @@ import ( ) func main() { + traceCloser := tracing.New("cloudStorage", "jaeger:5775") + defer traceCloser.Close() + // initialize logger logger := zerolog.New(os.Stdout).With(). Timestamp(). @@ -126,8 +129,6 @@ func main() { handler = logMW.APILogMiddleware(handler, logger) handler = m.Middleware(handler) // add tracer middleware - traceCloser := tracing.New("cloudStorage", "jaeger:5775") - defer traceCloser.Close() handler = tracing.Middleware(handler) server.SetHandler(handler) diff --git a/cmd/localAuth/main.go b/cmd/localAuth/main.go index 7281262e..6d26c68c 100644 --- a/cmd/localAuth/main.go +++ b/cmd/localAuth/main.go @@ -33,6 +33,9 @@ import ( ) func main() { + traceCloser := tracing.New("localAuth", "jaeger:5775") + defer traceCloser.Close() + logger := zerolog.New(os.Stdout).With(). Timestamp(). Str("service", "localAuth"). @@ -194,8 +197,6 @@ func main() { handler = logMW.APILogMiddleware(handler, logger) handler = apiMetrics.Middleware(handler) // add tracer middleware - traceCloser := tracing.New("localAuth", "jaeger:5775") - defer traceCloser.Close() handler = tracing.Middleware(handler) server.SetHandler(handler) diff --git a/cmd/localDiscovery/main.go b/cmd/localDiscovery/main.go index 3a3ea0ab..2f7aaa6f 100644 --- a/cmd/localDiscovery/main.go +++ b/cmd/localDiscovery/main.go @@ -30,6 +30,9 @@ import ( ) func main() { + traceCloser := tracing.New("localDiscovery", "jaeger:5775") + defer traceCloser.Close() + // initialize logger logger := zerolog.New(os.Stdout).With(). Timestamp(). @@ -123,8 +126,6 @@ func main() { }).Handler(api.Serve(nil)) handler = m.Middleware(handler) // add tracer middleware - traceCloser := tracing.New("localDiscovery", "jaeger:5775") - defer traceCloser.Close() handler = tracing.Middleware(handler) server.SetHandler(handler) diff --git a/cmd/localStatusReporter/main.go b/cmd/localStatusReporter/main.go index 1e5bc798..08887b41 100644 --- a/cmd/localStatusReporter/main.go +++ b/cmd/localStatusReporter/main.go @@ -20,6 +20,9 @@ import ( ) func main() { + traceCloser := tracing.New("localStatusReporter", "jaeger:5775") + defer traceCloser.Close() + // initialize logger logger := zerolog.New(os.Stdout).With(). Timestamp(). @@ -101,8 +104,6 @@ func main() { }).Handler(m.Middleware(log.APILogMiddleware(r.Handler("status"), logger))) // add tracer middleware - traceCloser := tracing.New("localStatusReporter", "jaeger:5775") - defer traceCloser.Close() handler = tracing.Middleware(handler) httpServer := &http.Server{ diff --git a/cmd/localStorage/main.go b/cmd/localStorage/main.go index 4b955e7a..3aa23216 100644 --- a/cmd/localStorage/main.go +++ b/cmd/localStorage/main.go @@ -39,6 +39,9 @@ import ( ) func main() { + traceCloser := tracing.New("localStorage", "jaeger:5775") + defer traceCloser.Close() + // initialize logger logger := zerolog.New(os.Stdout).With(). Timestamp(). @@ -181,8 +184,6 @@ func main() { handler = logMW.APILogMiddleware(handler, logger) handler = m.Middleware(handler) // add tracer middleware - traceCloser := tracing.New("localStorage", "jaeger:5775") - defer traceCloser.Close() handler = tracing.Middleware(handler) server.SetHandler(handler) diff --git a/service/authenticator/auth.go b/service/authenticator/auth.go index f104e34e..04379d4d 100644 --- a/service/authenticator/auth.go +++ b/service/authenticator/auth.go @@ -12,6 +12,8 @@ import ( "net/http" "strings" + "github.com/iryonetwork/wwm/service/tracing" + jwt "github.com/dgrijalva/jwt-go" "github.com/go-openapi/runtime" "github.com/go-openapi/swag" @@ -158,45 +160,47 @@ func (a *service) GetPrincipalFromToken(tokenString string) (*string, error) { func (a *service) Authorizer() runtime.Authorizer { return runtime.AuthorizerFunc(func(request *http.Request, principal interface{}) error { - userID, ok := principal.(*string) - if !ok { - return fmt.Errorf("Principal type was '%T', expected '*string'", principal) - } + return tracing.TraceFunctionSpan("Authorizer", request.Header, func() error { + userID, ok := principal.(*string) + if !ok { + return fmt.Errorf("Principal type was '%T', expected '*string'", principal) + } - // allow access for service operations without checking ACL - if strings.HasPrefix(*userID, servicePrincipal) { - keyID := (*userID)[len(servicePrincipal):] - s, ok := a.syncServices[keyID] - if ok && (request.URL.EscapedPath() == "/auth/validate" || s.glob.Match("/api"+request.URL.EscapedPath())) { - return nil + // allow access for service operations without checking ACL + if strings.HasPrefix(*userID, servicePrincipal) { + keyID := (*userID)[len(servicePrincipal):] + s, ok := a.syncServices[keyID] + if ok && (request.URL.EscapedPath() == "/auth/validate" || s.glob.Match("/api"+request.URL.EscapedPath())) { + return nil + } + return utils.NewError(utils.ErrForbidden, "You do not have permissions for this resource") } - return utils.NewError(utils.ErrForbidden, "You do not have permissions for this resource") - } - var action int64 - switch request.Method { - case http.MethodPost: - action = auth.Write - case http.MethodPut: - action = auth.Update - case http.MethodDelete: - action = auth.Delete - default: - action = auth.Read - } + var action int64 + switch request.Method { + case http.MethodPost: + action = auth.Write + case http.MethodPut: + action = auth.Update + case http.MethodDelete: + action = auth.Delete + default: + action = auth.Read + } - result := a.storage.FindACL(*userID, []*models.ValidationPair{{ - DomainType: &a.domainType, - DomainID: &a.domainID, - Actions: &action, - Resource: swag.String("/api" + request.URL.EscapedPath()), - }}) + result := a.storage.FindACL(*userID, []*models.ValidationPair{{ + DomainType: &a.domainType, + DomainID: &a.domainID, + Actions: &action, + Resource: swag.String("/api" + request.URL.EscapedPath()), + }}) - if !*result[0].Result { - return utils.NewError(utils.ErrForbidden, "You do not have permissions for this resource") - } + if !*result[0].Result { + return utils.NewError(utils.ErrForbidden, "You do not have permissions for this resource") + } - return nil + return nil + }) }) } diff --git a/service/authorizer/service.go b/service/authorizer/service.go index 3c123296..ec4a203d 100644 --- a/service/authorizer/service.go +++ b/service/authorizer/service.go @@ -15,6 +15,8 @@ import ( "net/http" "time" + "github.com/iryonetwork/wwm/service/tracing" + jwt "github.com/dgrijalva/jwt-go" "github.com/go-openapi/runtime" "github.com/go-openapi/swag" @@ -94,94 +96,98 @@ type responseAndError struct { func (a *authorizer) Authorizer() runtime.Authorizer { logger := a.logger.With().Str("cmd", "Authorizer").Logger() return runtime.AuthorizerFunc(func(request *http.Request, principal interface{}) error { - action := methodToAction(request.Method) - resource := "/api" + request.URL.EscapedPath() - pairs := []*models.ValidationPair{ - { - DomainType: &a.domainType, - DomainID: &a.domainID, - Actions: &action, - Resource: &resource, - }, - } - logger.Debug().Str("resource", resource).Msg("Authorizing...") + return tracing.TraceFunctionSpan("Authorizer", request.Header, func() error { + action := methodToAction(request.Method) + resource := "/api" + request.URL.EscapedPath() + pairs := []*models.ValidationPair{ + { + DomainType: &a.domainType, + DomainID: &a.domainID, + Actions: &action, + Resource: &resource, + }, + } + logger.Debug().Str("resource", resource).Msg("Authorizing...") - body, err := swag.WriteJSON(pairs) - if err != nil { - logger.Error().Err(err).Msg("WriteJSON failed") - return err - } + body, err := swag.WriteJSON(pairs) + if err != nil { + logger.Error().Err(err).Msg("WriteJSON failed") + return err + } - r, err := http.NewRequest(http.MethodPost, a.validateURL, bytes.NewBuffer(body)) - if err != nil { - logger.Error().Err(err).Msg("Initializing request failed") - return err - } - r.Header.Add("Authorization", request.Header.Get("Authorization")) - r.Header.Add("Content-Type", "application/json") - r.Header.Add("Uber-Trace-Id", request.Header.Get("Uber-Trace-Id")) - - transport := &http.Transport{} - netClient := &http.Client{ - Transport: transport, - Timeout: time.Second * 10, - } + r, err := http.NewRequest(http.MethodPost, a.validateURL, bytes.NewBuffer(body)) + if err != nil { + logger.Error().Err(err).Msg("Initializing request failed") + return err + } + r.Header.Add("Authorization", request.Header.Get("Authorization")) + r.Header.Add("Content-Type", "application/json") + if traceID := request.Header.Get("Uber-Trace-Id"); traceID != "" { + r.Header.Add("Uber-Trace-Id", traceID) + } - c := make(chan responseAndError) - go func() { - response, err := netClient.Do(r) - c <- responseAndError{response, err} - }() - - var response responseAndError - - select { - case <-request.Context().Done(): - transport.CancelRequest(r) - <-c // wait for canceld request - logger.Error().Err(request.Context().Err()).Msg("Context was done") - return fmt.Errorf("Context was done") - case response = <-c: - } + transport := &http.Transport{} + netClient := &http.Client{ + Transport: transport, + Timeout: time.Second * 10, + } - if response.err != nil { - logger.Error().Err(response.err).Msg("Making request failed") - return response.err - } - defer response.r.Body.Close() + c := make(chan responseAndError) + go func() { + response, err := netClient.Do(r) + c <- responseAndError{response, err} + }() + + var response responseAndError + + select { + case <-request.Context().Done(): + transport.CancelRequest(r) + <-c // wait for canceld request + logger.Error().Err(request.Context().Err()).Msg("Context was done") + return fmt.Errorf("Context was done") + case response = <-c: + } - responseBody, err := ioutil.ReadAll(response.r.Body) - if err != nil { - logger.Error().Err(err).Msg("Reading response failed") - return err - } + if response.err != nil { + logger.Error().Err(response.err).Msg("Making request failed") + return response.err + } + defer response.r.Body.Close() - if response.r.StatusCode == http.StatusOK { - validationResponse := []*models.ValidationResult{} - err := swag.ReadJSON(responseBody, &validationResponse) + responseBody, err := ioutil.ReadAll(response.r.Body) if err != nil { - logger.Error().Err(err).Msg("Parsing response failed") + logger.Error().Err(err).Msg("Reading response failed") return err } - if validationResponse[0].Result == nil || !*validationResponse[0].Result { - logger.Debug().Msg(ErrUnauthorized) - return fmt.Errorf(ErrUnauthorized) + if response.r.StatusCode == http.StatusOK { + validationResponse := []*models.ValidationResult{} + err := swag.ReadJSON(responseBody, &validationResponse) + if err != nil { + logger.Error().Err(err).Msg("Parsing response failed") + return err + } + + if validationResponse[0].Result == nil || !*validationResponse[0].Result { + logger.Debug().Msg(ErrUnauthorized) + return fmt.Errorf(ErrUnauthorized) + } + + logger.Debug().Msg("Authorized successfully") + return nil } - logger.Debug().Msg("Authorized successfully") - return nil - } - - jsonError := &models.Error{} - err = jsonError.UnmarshalBinary(responseBody) - if err != nil { - logger.Error().Err(err).Msg("Parsing error response failed") - return err - } + jsonError := &models.Error{} + err = jsonError.UnmarshalBinary(responseBody) + if err != nil { + logger.Error().Err(err).Msg("Parsing error response failed") + return err + } - logger.Error().Str("code", jsonError.Code).Str("errorMessage", jsonError.Message).Msg("Error authorizing") - return fmt.Errorf(jsonError.Message) + logger.Error().Str("code", jsonError.Code).Str("errorMessage", jsonError.Message).Msg("Error authorizing") + return fmt.Errorf(jsonError.Message) + }) }) } diff --git a/service/tracing/tracing.go b/service/tracing/tracing.go index 6f3b96c5..eed636e4 100644 --- a/service/tracing/tracing.go +++ b/service/tracing/tracing.go @@ -1,6 +1,7 @@ package tracing import ( + "context" "fmt" "io" "log" @@ -8,6 +9,7 @@ import ( "time" opentracing "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" jaeger "github.com/uber/jaeger-client-go" "github.com/uber/jaeger-client-go/config" ) @@ -36,6 +38,7 @@ func New(serviceName, hostPort string) io.Closer { config.Logger(jaeger.StdLogger), ) if err != nil { + // TODO: Could probably set tracer to send data even if there is no connection to host log.Printf("Error initializing tracker: %v", err) return MockCloser{} } @@ -43,6 +46,8 @@ func New(serviceName, hostPort string) io.Closer { opentracing.SetGlobalTracer(tracer) tracerIsSet = true + log.Printf("New tracer %s created on host %s", serviceName, hostPort) + return closer } @@ -50,7 +55,7 @@ func Middleware(h http.Handler) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if tracerIsSet { var sp opentracing.Span - spanName := fmt.Sprintf("HTTP %s", r.URL.Path) + spanName := fmt.Sprintf("Handler %s %s", r.Method, r.URL.Path) wireContext, err := opentracing.GlobalTracer().Extract( opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(r.Header)) @@ -69,9 +74,72 @@ func Middleware(h http.Handler) http.HandlerFunc { sp.Context(), opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(r.Header)) + + r = r.WithContext(context.WithValue(r.Context(), "header", r.Header)) } else { log.Printf("Tracer is not set") } h.ServeHTTP(w, r) } } + +// TraceFunctionSpan creates new span and then executes provided function. +// If opentracing.GlobalTracer() is not set, then no span is reported nor created +// If possible tracer is extracted from request. +// If you have no request handy, pass in nil +func TraceFunctionSpan(name string, header http.Header, f func() error) error { + + // Create new span if tracer is set + var sp opentracing.Span + if tracerIsSet { + sp = getSpan(name, header) + + defer sp.Finish() + } else { + log.Printf("Tracer is not set") + } + + // Execute function + err := f() + if err != nil && tracerIsSet { + ext.Error.Set(sp, true) + sp.LogEventWithPayload(fmt.Sprintf("Error"), err) + } + return err +} + +func getSpan(name string, header http.Header) opentracing.Span { + var out opentracing.Span + log.Printf("Creating new span for %s", name) + + // if header is present try to extract traceID and use it + // if there is no header create new span + if header != nil { + + wireContext, err := opentracing.GlobalTracer().Extract( + opentracing.HTTPHeaders, + opentracing.HTTPHeadersCarrier(header)) + + if err != nil { + // If for whatever reason we can't join, go ahead an start a new root span. + out = opentracing.StartSpan(name) + log.Printf("Trace not found, creating new span, %v", err) + + } else { + out = opentracing.StartSpan(name, opentracing.ChildOf(wireContext)) + log.Printf("Trace found, attaching to it") + } + out.Context().ForeachBaggageItem(func(k, v string) bool { + log.Print("k:" + k) + log.Print("v:" + v) + return true + }) + + } else { + out = opentracing.StartSpan(name) + log.Printf("Trace not found, creating new span") + + } + + return out +} From 61373880dbd11455b7bda4928d34ce14861e038e Mon Sep 17 00:00:00 2001 From: j-justin Date: Fri, 21 Sep 2018 00:28:06 +0200 Subject: [PATCH 3/8] CHG context instead of http request - instead of passign around the http request, trace span is now extracted out of context value, that is added to http request context, when root span is created - implemented InjectTracerInRequest function that injects span from provided context to request, which, if sent, be used to attach to a trace --- service/authenticator/auth.go | 2 +- service/authorizer/service.go | 7 ++-- service/tracing/tracing.go | 67 ++++++++++++++++++----------------- 3 files changed, 38 insertions(+), 38 deletions(-) diff --git a/service/authenticator/auth.go b/service/authenticator/auth.go index 04379d4d..d2acb4bb 100644 --- a/service/authenticator/auth.go +++ b/service/authenticator/auth.go @@ -160,7 +160,7 @@ func (a *service) GetPrincipalFromToken(tokenString string) (*string, error) { func (a *service) Authorizer() runtime.Authorizer { return runtime.AuthorizerFunc(func(request *http.Request, principal interface{}) error { - return tracing.TraceFunctionSpan("Authorizer", request.Header, func() error { + return tracing.TraceFunctionSpan("Authorizer", request.Context(), func() error { userID, ok := principal.(*string) if !ok { return fmt.Errorf("Principal type was '%T', expected '*string'", principal) diff --git a/service/authorizer/service.go b/service/authorizer/service.go index ec4a203d..4f364ecf 100644 --- a/service/authorizer/service.go +++ b/service/authorizer/service.go @@ -96,7 +96,7 @@ type responseAndError struct { func (a *authorizer) Authorizer() runtime.Authorizer { logger := a.logger.With().Str("cmd", "Authorizer").Logger() return runtime.AuthorizerFunc(func(request *http.Request, principal interface{}) error { - return tracing.TraceFunctionSpan("Authorizer", request.Header, func() error { + return tracing.TraceFunctionSpan("Authorizer", request.Context(), func() error { action := methodToAction(request.Method) resource := "/api" + request.URL.EscapedPath() pairs := []*models.ValidationPair{ @@ -122,9 +122,8 @@ func (a *authorizer) Authorizer() runtime.Authorizer { } r.Header.Add("Authorization", request.Header.Get("Authorization")) r.Header.Add("Content-Type", "application/json") - if traceID := request.Header.Get("Uber-Trace-Id"); traceID != "" { - r.Header.Add("Uber-Trace-Id", traceID) - } + + tracing.InjectTracerInRequest(request.Context(), r) transport := &http.Transport{} netClient := &http.Client{ diff --git a/service/tracing/tracing.go b/service/tracing/tracing.go index eed636e4..e162397d 100644 --- a/service/tracing/tracing.go +++ b/service/tracing/tracing.go @@ -51,31 +51,32 @@ func New(serviceName, hostPort string) io.Closer { return closer } +type spanContext struct{} // empty spanContext is used to extract opentracing.spanContext from context + func Middleware(h http.Handler) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if tracerIsSet { var sp opentracing.Span spanName := fmt.Sprintf("Handler %s %s", r.Method, r.URL.Path) + wireContext, err := opentracing.GlobalTracer().Extract( opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(r.Header)) if err != nil { - // If for whatever reason we can't join, go ahead an start a new root span. - sp = opentracing.StartSpan(spanName) + // If for whatever reason we can't join go ahead an start a new root span. log.Printf("TRACE NOT FOUND, %v", err) + sp = opentracing.StartSpan(spanName) + } else { - sp = opentracing.StartSpan(spanName, opentracing.ChildOf(wireContext)) log.Printf("TRACE FOUND") + sp = opentracing.StartSpan(spanName, opentracing.ChildOf(wireContext)) } + defer sp.Finish() - sp.Tracer().Inject( - sp.Context(), - opentracing.HTTPHeaders, - opentracing.HTTPHeadersCarrier(r.Header)) + r = r.WithContext(context.WithValue(r.Context(), spanContext{}, sp.Context())) - r = r.WithContext(context.WithValue(r.Context(), "header", r.Header)) } else { log.Printf("Tracer is not set") } @@ -83,17 +84,31 @@ func Middleware(h http.Handler) http.HandlerFunc { } } +// InjectTracerInRequest injects span in request, if span is present in ctx value +func InjectTracerInRequest(ctx context.Context, r *http.Request) { + if tracerIsSet { + spContext := ctx.Value(spanContext{}) + if spContext == nil { + return + } + + opentracing.GlobalTracer().Inject( + spContext.(opentracing.SpanContext), + opentracing.HTTPHeaders, + opentracing.HTTPHeadersCarrier(r.Header)) + } +} + // TraceFunctionSpan creates new span and then executes provided function. // If opentracing.GlobalTracer() is not set, then no span is reported nor created // If possible tracer is extracted from request. // If you have no request handy, pass in nil -func TraceFunctionSpan(name string, header http.Header, f func() error) error { +func TraceFunctionSpan(name string, ctx context.Context, f func() error) error { // Create new span if tracer is set var sp opentracing.Span if tracerIsSet { - sp = getSpan(name, header) - + sp = getSpan(name, ctx) defer sp.Finish() } else { log.Printf("Tracer is not set") @@ -108,37 +123,23 @@ func TraceFunctionSpan(name string, header http.Header, f func() error) error { return err } -func getSpan(name string, header http.Header) opentracing.Span { +func getSpan(name string, ctx context.Context) opentracing.Span { var out opentracing.Span log.Printf("Creating new span for %s", name) // if header is present try to extract traceID and use it // if there is no header create new span - if header != nil { - - wireContext, err := opentracing.GlobalTracer().Extract( - opentracing.HTTPHeaders, - opentracing.HTTPHeadersCarrier(header)) + wireContext := ctx.Value(spanContext{}) - if err != nil { - // If for whatever reason we can't join, go ahead an start a new root span. - out = opentracing.StartSpan(name) - log.Printf("Trace not found, creating new span, %v", err) - - } else { - out = opentracing.StartSpan(name, opentracing.ChildOf(wireContext)) - log.Printf("Trace found, attaching to it") - } - out.Context().ForeachBaggageItem(func(k, v string) bool { - log.Print("k:" + k) - log.Print("v:" + v) - return true - }) - - } else { + if wireContext == nil { + // If for whatever reason we can't join, go ahead an start a new root span. out = opentracing.StartSpan(name) log.Printf("Trace not found, creating new span") + } else { + wireContext := wireContext.(opentracing.SpanContext) + out = opentracing.StartSpan(name, opentracing.ChildOf(wireContext)) + log.Printf("Trace found, attaching to it") } return out From 1d4454db150c2757f191cab330940c57618d5436 Mon Sep 17 00:00:00 2001 From: j-justin Date: Fri, 21 Sep 2018 00:31:46 +0200 Subject: [PATCH 4/8] ADD s3 tracing - s3 calls in storage are traced and reported to agent - Add vendor files --- service/storage/storage.go | 181 ++++++++++++++++++++++++++----------- vendor/vendor.json | 152 +++++++++++++++++++++++++++++-- 2 files changed, 273 insertions(+), 60 deletions(-) diff --git a/service/storage/storage.go b/service/storage/storage.go index 84c5b60f..4fa6e380 100644 --- a/service/storage/storage.go +++ b/service/storage/storage.go @@ -8,6 +8,8 @@ import ( "io" "time" + "github.com/iryonetwork/wwm/service/tracing" + "github.com/agext/uuid" "github.com/go-openapi/strfmt" "github.com/rs/zerolog" @@ -109,7 +111,11 @@ func (s *service) FileList(ctx context.Context, bucketID string) ([]*models.File list := []*models.FileDescriptor{} // check if bucket exists - exists, err := s.s3.BucketExists(ctx, bucketID) + var exists bool + err := tracing.TraceFunctionSpan("s3 BucketExists", ctx, func() (err error) { + exists, err = s.s3.BucketExists(ctx, bucketID) + return err + }) if err != nil { s.logger.Info().Err(err).Str("bucket", bucketID).Msg("Failed to check if bucket exists") return nil, err @@ -119,7 +125,11 @@ func (s *service) FileList(ctx context.Context, bucketID string) ([]*models.File } // collect the list - l, err := s.s3.List(ctx, bucketID, "") + var l []*models.FileDescriptor + err = tracing.TraceFunctionSpan("s3 List", ctx, func() (err error) { + l, err = s.s3.List(ctx, bucketID, "") + return err + }) if err != nil { return nil, err } @@ -139,17 +149,23 @@ func (s *service) FileList(ctx context.Context, bucketID string) ([]*models.File return list, nil } -func (s *service) FileGet(ctx context.Context, bucketID, fileID string) (io.ReadCloser, *models.FileDescriptor, error) { - start := time.Now() - rc, fd, err := s.s3.Read(ctx, bucketID, fileID, "") - s.logger.Info().Str("method", "FileGet").Msgf("s3 read time %s", time.Since(start)) +func (s *service) FileGet(ctx context.Context, bucketID, fileID string) (rc io.ReadCloser, fd *models.FileDescriptor, err error) { + err = tracing.TraceFunctionSpan("s3 Read", ctx, func() (err error) { + start := time.Now() + rc, fd, err = s.s3.Read(ctx, bucketID, fileID, "") + s.logger.Info().Str("method", "FileGet").Msgf("s3 read time %s", time.Since(start)) + return err + }) return rc, fd, err } -func (s *service) FileGetVersion(ctx context.Context, bucketID, fileID, version string) (io.ReadCloser, *models.FileDescriptor, error) { +func (s *service) FileGetVersion(ctx context.Context, bucketID, fileID, version string) (rc io.ReadCloser, fd *models.FileDescriptor, err error) { start := time.Now() - rc, fd, err := s.s3.Read(ctx, bucketID, fileID, version) + err = tracing.TraceFunctionSpan("s3 Read", ctx, func() (err error) { + rc, fd, err = s.s3.Read(ctx, bucketID, fileID, version) + return err + }) s.logger.Info().Str("method", "FileGetVersion").Msgf("s3 read time %s", time.Since(start)) return rc, fd, err @@ -160,7 +176,11 @@ func (s *service) FileListVersions(ctx context.Context, bucketID, fileID string, list := []*models.FileDescriptor{} // check if bucket exists - exists, err := s.s3.BucketExists(ctx, bucketID) + var exists bool + err := tracing.TraceFunctionSpan("s3 BucketExists", ctx, func() (err error) { + exists, err = s.s3.BucketExists(ctx, bucketID) + return + }) if err != nil { s.logger.Info().Err(err).Str("bucket", bucketID).Msg("Failed to check if bucket exists") return nil, err @@ -169,7 +189,11 @@ func (s *service) FileListVersions(ctx context.Context, bucketID, fileID string, return list, nil } - l, err := s.s3.List(ctx, bucketID, fileID) + var l []*models.FileDescriptor + err = tracing.TraceFunctionSpan("s3 List", ctx, func() (err error) { + l, err = s.s3.List(ctx, bucketID, fileID) + return + }) if (createdAtSince == nil && createdAtUntil == nil) || err != nil { return l, err } @@ -215,10 +239,13 @@ func (s *service) FileNew(ctx context.Context, bucketID string, r io.Reader, con Labels: labels, } - start := time.Now() - fd, err := s.s3.Write(ctx, bucketID, no, &buf) - s.logger.Info().Str("method", "FileNew").Msgf("s3 write time %s", time.Since(start)) - + var fd *models.FileDescriptor + err = tracing.TraceFunctionSpan("s3 Write", ctx, func() (err error) { + start := time.Now() + fd, err = s.s3.Write(ctx, bucketID, no, &buf) + s.logger.Info().Str("method", "FileNew").Msgf("s3 write time %s", time.Since(start)) + return + }) if err == nil { s.publisher.PublishAsyncWithRetries( context.TODO(), @@ -237,11 +264,15 @@ func (s *service) FileNew(ctx context.Context, bucketID string, r io.Reader, con return fd, err } -func (s *service) FileUpdate(ctx context.Context, bucketID, fileID string, r io.Reader, contentType string, archetype string, labels []string) (*models.FileDescriptor, error) { +func (s *service) FileUpdate(ctx context.Context, bucketID, fileID string, r io.Reader, contentType string, archetype string, labels []string) (fd *models.FileDescriptor, err error) { // get the previous file - start := time.Now() - _, old, err := s.s3.Read(ctx, bucketID, fileID, "") - s.logger.Info().Str("method", "FileUpdate").Msgf("s3 read time %s", time.Since(start)) + var old *models.FileDescriptor + err = tracing.TraceFunctionSpan("s3 Read", ctx, func() (err error) { + start := time.Now() + _, old, err = s.s3.Read(ctx, bucketID, fileID, "") + s.logger.Info().Str("method", "FileUpdate").Msgf("s3 read time %s", time.Since(start)) + return + }) if err != nil { return nil, err @@ -269,9 +300,12 @@ func (s *service) FileUpdate(ctx context.Context, bucketID, fileID string, r io. Labels: labels, } - start = time.Now() - fd, err := s.s3.Write(ctx, bucketID, no, &buf) - s.logger.Info().Str("method", "FileUpdate").Msgf("s3 write time %s", time.Since(start)) + err = tracing.TraceFunctionSpan("s3 Write", ctx, func() (err error) { + start := time.Now() + fd, err = s.s3.Write(ctx, bucketID, no, &buf) + s.logger.Info().Str("method", "FileUpdate").Msgf("s3 write time %s", time.Since(start)) + return + }) if err == nil { s.publisher.PublishAsyncWithRetries( @@ -300,9 +334,13 @@ func (s *service) FileUpdate(ctx context.Context, bucketID, fileID string, r io. func (s *service) FileDelete(ctx context.Context, bucketID, fileID string) error { // get the previous file - start := time.Now() - _, fd, err := s.s3.Read(ctx, bucketID, fileID, "") - s.logger.Info().Str("method", "FileDelete").Msgf("s3 read time %s", time.Since(start)) + var fd *models.FileDescriptor + err := tracing.TraceFunctionSpan("s3 Read", ctx, func() (err error) { + start := time.Now() + _, fd, err = s.s3.Read(ctx, bucketID, fileID, "") + s.logger.Info().Str("method", "FileDelete").Msgf("s3 read time %s", time.Since(start)) + return err + }) if err != nil { return err @@ -320,10 +358,12 @@ func (s *service) FileDelete(ctx context.Context, bucketID, fileID string) error Operation: string(s3.Delete), Labels: fd.Labels, } - - start = time.Now() - fd, err = s.s3.Write(ctx, bucketID, no, &bytes.Buffer{}) - s.logger.Info().Str("method", "FileDelete").Msgf("s3 write time %s", time.Since(start)) + err = tracing.TraceFunctionSpan("s3 Write", ctx, func() (err error) { + start := time.Now() + fd, err = s.s3.Write(ctx, bucketID, no, &bytes.Buffer{}) + s.logger.Info().Str("method", "FileDelete").Msgf("s3 write time %s", time.Since(start)) + return err + }) if err == nil { s.publisher.PublishAsyncWithRetries( @@ -346,7 +386,11 @@ func (s *service) SyncFileList(ctx context.Context, bucketID string, createdAtSi list := []*models.FileDescriptor{} // check if bucket exists - exists, err := s.s3.BucketExists(ctx, bucketID) + var exists bool + err := tracing.TraceFunctionSpan("s3 BucketExists", ctx, func() (err error) { + exists, err = s.s3.BucketExists(ctx, bucketID) + return err + }) if err != nil { s.logger.Info().Err(err).Str("method", "SyncFileList").Str("bucket", bucketID).Msg("Failed to check if bucket exists") return nil, err @@ -356,7 +400,11 @@ func (s *service) SyncFileList(ctx context.Context, bucketID string, createdAtSi } // collect the list - l, err := s.s3.List(ctx, bucketID, "") + var l []*models.FileDescriptor + err = tracing.TraceFunctionSpan("s3 List", ctx, func() (err error) { + l, err = s.s3.List(ctx, bucketID, "") + return err + }) if err != nil { return nil, err } @@ -395,9 +443,13 @@ func (s *service) SyncFile(ctx context.Context, bucketID, fileID, version string } // try to fetch - start := time.Now() - _, fd, err := s.s3.Read(ctx, bucketID, fileID, version) - s.logger.Info().Str("method", "SyncFile").Msgf("s3 read time %s", time.Since(start)) + var fd *models.FileDescriptor + err = tracing.TraceFunctionSpan("s3 Read", ctx, func() (err error) { + start := time.Now() + _, fd, err = s.s3.Read(ctx, bucketID, fileID, version) + s.logger.Info().Str("method", "SyncFile").Msgf("s3 read time %s", time.Since(start)) + return err + }) switch { // Already exists and does not conflict @@ -410,9 +462,12 @@ func (s *service) SyncFile(ctx context.Context, bucketID, fileID, version string s.logger.Info().Str("method", "SyncFile"). Msg("File already exists and has conflicting checksum. Local file will be removed and replaced with sync file.") - start = time.Now() - err = s.s3.Delete(ctx, bucketID, fileID, version) - s.logger.Info().Str("method", "SyncFile").Msgf("s3 delete time %s", time.Since(start)) + err = tracing.TraceFunctionSpan("s3 Delete", ctx, func() (err error) { + start := time.Now() + err = s.s3.Delete(ctx, bucketID, fileID, version) + s.logger.Info().Str("method", "SyncFile").Msgf("s3 delete time %s", time.Since(start)) + return err + }) if err != nil { s.logger.Error().Err(err).Str("method", "SyncFile"). @@ -438,18 +493,25 @@ func (s *service) SyncFile(ctx context.Context, bucketID, fileID, version string Labels: labels, } - start = time.Now() - fd, err = s.s3.Write(ctx, bucketID, no, &buf) - s.logger.Info().Str("method", "SyncFile").Msgf("s3 write time %s", time.Since(start)) + err = tracing.TraceFunctionSpan("s3 Write", ctx, func() (err error) { + start := time.Now() + fd, err = s.s3.Write(ctx, bucketID, no, &buf) + s.logger.Info().Str("method", "SyncFile").Msgf("s3 write time %s", time.Since(start)) + return err + }) return fd, err } func (s *service) SyncFileDelete(ctx context.Context, bucketID, fileID, version string, created strfmt.DateTime) error { // get the previous file - start := time.Now() - _, fd, err := s.s3.Read(ctx, bucketID, fileID, "") - s.logger.Info().Str("method", "SyncFileDelete").Msgf("s3 read time %s", time.Since(start)) + var fd *models.FileDescriptor + err := tracing.TraceFunctionSpan("s3 Read", ctx, func() (err error) { + start := time.Now() + _, fd, err = s.s3.Read(ctx, bucketID, fileID, "") + s.logger.Info().Str("method", "SyncFileDelete").Msgf("s3 read time %s", time.Since(start)) + return + }) if err != nil { return err @@ -480,16 +542,23 @@ func (s *service) SyncFileDelete(ctx context.Context, bucketID, fileID, version Labels: fd.Labels, } - start = time.Now() - _, err = s.s3.Write(ctx, bucketID, no, &bytes.Buffer{}) - s.logger.Info().Str("method", "SyncFileDelete").Msgf("s3 write time %s", time.Since(start)) + err = tracing.TraceFunctionSpan("s3 Write", ctx, func() (err error) { + start := time.Now() + _, err = s.s3.Write(ctx, bucketID, no, &bytes.Buffer{}) + s.logger.Info().Str("method", "SyncFileDelete").Msgf("s3 write time %s", time.Since(start)) + return + }) return err } func (s *service) EnsureBucket(ctx context.Context, bucketID string) error { // make sure bucket exists - if err := s.s3.MakeBucket(ctx, bucketID); err != nil && err != s3.ErrAlreadyExists { + err := tracing.TraceFunctionSpan("s3 MakeBucket", ctx, func() (err error) { + return s.s3.MakeBucket(ctx, bucketID) + }) + + if err != nil && err != s3.ErrAlreadyExists { s.logger.Error().Err(err).Str("bucket", bucketID).Msg("Failed to ensure bucket") return err } @@ -499,10 +568,13 @@ func (s *service) EnsureBucket(ctx context.Context, bucketID string) error { func (s *service) updateFilesCollection(ctx context.Context, operation s3.Operation, bucketID, label string, fd *models.FileDescriptor) error { var c *filesCollection - - start := time.Now() - r, _, err := s.s3.Read(ctx, bucketID, label, "") - s.logger.Info().Str("method", "updateFilesCollection").Msgf("s3 read time %s", time.Since(start)) + var r io.ReadCloser + err := tracing.TraceFunctionSpan("s3 Read", ctx, func() (err error) { + start := time.Now() + r, _, err = s.s3.Read(ctx, bucketID, label, "") + s.logger.Info().Str("method", "updateFilesCollection").Msgf("s3 read time %s", time.Since(start)) + return err + }) if err != nil { if err != s3.ErrNotFound { @@ -552,9 +624,12 @@ func (s *service) updateFilesCollection(ctx context.Context, operation s3.Operat Labels: []string{labelFilesCollection}, } - start = time.Now() - fd, err = s.s3.Write(ctx, bucketID, no, &buf) - s.logger.Info().Str("method", "updateFilesCollection").Msgf("s3 write time %s", time.Since(start)) + err = tracing.TraceFunctionSpan("s3 Read", ctx, func() (err error) { + start := time.Now() + fd, err = s.s3.Write(ctx, bucketID, no, &buf) + s.logger.Info().Str("method", "updateFilesCollection").Msgf("s3 write time %s", time.Since(start)) + return + }) if err != nil { s.logger.Error().Err(err).Msg("failed to write file collection file") diff --git a/vendor/vendor.json b/vendor/vendor.json index 4193c4c1..731d4889 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -108,6 +108,12 @@ "revision": "1b8f581b3213fee8295c7dc7018fb16566eb2cb9", "revisionTime": "2018-01-06T05:10:40Z" }, + { + "checksumSHA1": "7gK+lSShSu1NRw83/A95BcgMqsI=", + "path": "github.com/codahale/hdrhistogram", + "revision": "3a0bb77429bd3a61596f5e8a3172445844342120", + "revisionTime": "2016-10-10T02:54:55Z" + }, { "checksumSHA1": "15ji1OWdG/OwU8mv+h/+gux6VuU=", "path": "github.com/coreos/bbolt", @@ -324,6 +330,12 @@ "revision": "b3e60bcdc577185fce3cf625fc96b62857ce5574", "revisionTime": "2017-12-16T01:26:12Z" }, + { + "checksumSHA1": "PD2yvdc0t2YyPvN9z423IG5t6j4=", + "path": "github.com/golang/mock/mockgen/model", + "revision": "600781dde9cca80734169b9e969d9054ccc57937", + "revisionTime": "2018-08-20T16:13:58Z" + }, { "checksumSHA1": "yqF125xVSkmfLpIVGrLlfE05IUk=", "path": "github.com/golang/protobuf/proto", @@ -787,10 +799,28 @@ "revisionTime": "2017-10-25T20:31:29Z" }, { - "checksumSHA1": "xCv4GBFyw07vZkVtKF/XrUnkHRk=", + "checksumSHA1": "1JLbBBy0amIitG9BZpDz0YL3BB0=", + "path": "github.com/opentracing/opentracing-go", + "revision": "6aa6febac7b98f836100ecaea478c04f30b6dbd0", + "revisionTime": "2018-09-08T21:19:32Z" + }, + { + "checksumSHA1": "uhDxBvLEqRAMZKgpTZ8MFuLIIM8=", + "path": "github.com/opentracing/opentracing-go/ext", + "revision": "6aa6febac7b98f836100ecaea478c04f30b6dbd0", + "revisionTime": "2018-09-08T21:19:32Z" + }, + { + "checksumSHA1": "tnkdNJbJxNKuPZMWapP1xhKIIGw=", + "path": "github.com/opentracing/opentracing-go/log", + "revision": "6aa6febac7b98f836100ecaea478c04f30b6dbd0", + "revisionTime": "2018-09-08T21:19:32Z" + }, + { + "checksumSHA1": "18YrywDvb67HU8xYF5vqKMgelx0=", "path": "github.com/pkg/errors", - "revision": "e881fd58d78e04cf6d0de1217f8707c8cc2249bc", - "revisionTime": "2017-12-16T07:03:16Z" + "revision": "c059e472caf75dbe73903f6521a20abac245b17f", + "revisionTime": "2018-09-11T06:21:13Z" }, { "checksumSHA1": "8T6L55DZtMEFFutCn2licvY7vHk=", @@ -889,10 +919,112 @@ "revisionTime": "2018-02-27T22:34:04Z" }, { - "checksumSHA1": "5q4Lyr1RgyO9j66pTmz1M63rQvs=", - "path": "github.com/tylerb/graceful", - "revision": "d72b0151351a13d0421b763b88f791469c4f5dc7", - "revisionTime": "2017-02-21T17:10:03Z" + "checksumSHA1": "/X4PHptdXEzVmpizWvQSaQKMMXA=", + "path": "github.com/uber/jaeger-client-go", + "revision": "ab5afa7b97b7a16aa6ae001f1c80af61a880e5d9", + "revisionTime": "2018-09-19T14:43:06Z" + }, + { + "checksumSHA1": "krwqWdnYTkSLTlLAut/HgGbEl0I=", + "path": "github.com/uber/jaeger-client-go/config", + "revision": "ab5afa7b97b7a16aa6ae001f1c80af61a880e5d9", + "revisionTime": "2018-09-19T14:43:06Z" + }, + { + "checksumSHA1": "KM5UXTWkHULmw0dDRNuk8ogWyGs=", + "path": "github.com/uber/jaeger-client-go/internal/baggage", + "revision": "ab5afa7b97b7a16aa6ae001f1c80af61a880e5d9", + "revisionTime": "2018-09-19T14:43:06Z" + }, + { + "checksumSHA1": "tZqlcHV1XoLdZp9jfnydzsZAvYo=", + "path": "github.com/uber/jaeger-client-go/internal/baggage/remote", + "revision": "ab5afa7b97b7a16aa6ae001f1c80af61a880e5d9", + "revisionTime": "2018-09-19T14:43:06Z" + }, + { + "checksumSHA1": "QB0L0GrzyMGQp6ivkkxp7a1DPsE=", + "path": "github.com/uber/jaeger-client-go/internal/spanlog", + "revision": "ab5afa7b97b7a16aa6ae001f1c80af61a880e5d9", + "revisionTime": "2018-09-19T14:43:06Z" + }, + { + "checksumSHA1": "79HRO/+ekkpwqDB/OMiW+AHJtlE=", + "path": "github.com/uber/jaeger-client-go/internal/throttler", + "revision": "ab5afa7b97b7a16aa6ae001f1c80af61a880e5d9", + "revisionTime": "2018-09-19T14:43:06Z" + }, + { + "checksumSHA1": "OVQDWFtFMs+NODe0F/S5kYViQco=", + "path": "github.com/uber/jaeger-client-go/internal/throttler/remote", + "revision": "ab5afa7b97b7a16aa6ae001f1c80af61a880e5d9", + "revisionTime": "2018-09-19T14:43:06Z" + }, + { + "checksumSHA1": "tMP/vxbHwNAbOEaUhic5/meKfac=", + "path": "github.com/uber/jaeger-client-go/log", + "revision": "ab5afa7b97b7a16aa6ae001f1c80af61a880e5d9", + "revisionTime": "2018-09-19T14:43:06Z" + }, + { + "checksumSHA1": "j4WrA/B2SJCqHxttCQ+TYbmtQqE=", + "path": "github.com/uber/jaeger-client-go/rpcmetrics", + "revision": "ab5afa7b97b7a16aa6ae001f1c80af61a880e5d9", + "revisionTime": "2018-09-19T14:43:06Z" + }, + { + "checksumSHA1": "+ffspyTBQLql2UiU6muvfWR/m1o=", + "path": "github.com/uber/jaeger-client-go/thrift", + "revision": "ab5afa7b97b7a16aa6ae001f1c80af61a880e5d9", + "revisionTime": "2018-09-19T14:43:06Z" + }, + { + "checksumSHA1": "fMIQ4sJFCkqFYhXvvLKIlofqxvY=", + "path": "github.com/uber/jaeger-client-go/thrift-gen/agent", + "revision": "ab5afa7b97b7a16aa6ae001f1c80af61a880e5d9", + "revisionTime": "2018-09-19T14:43:06Z" + }, + { + "checksumSHA1": "fRR2p+JAp7paApf32YuQuWU7yzY=", + "path": "github.com/uber/jaeger-client-go/thrift-gen/baggage", + "revision": "ab5afa7b97b7a16aa6ae001f1c80af61a880e5d9", + "revisionTime": "2018-09-19T14:43:06Z" + }, + { + "checksumSHA1": "JZkMEOmiOFFEuGCsDOVLK5RzvMM=", + "path": "github.com/uber/jaeger-client-go/thrift-gen/jaeger", + "revision": "ab5afa7b97b7a16aa6ae001f1c80af61a880e5d9", + "revisionTime": "2018-09-19T14:43:06Z" + }, + { + "checksumSHA1": "0teQUhTqTE1fLs+vbnTTzWOqdEQ=", + "path": "github.com/uber/jaeger-client-go/thrift-gen/sampling", + "revision": "ab5afa7b97b7a16aa6ae001f1c80af61a880e5d9", + "revisionTime": "2018-09-19T14:43:06Z" + }, + { + "checksumSHA1": "Nx5witfz05BSO2YlFzh2Gno6bA0=", + "path": "github.com/uber/jaeger-client-go/thrift-gen/zipkincore", + "revision": "ab5afa7b97b7a16aa6ae001f1c80af61a880e5d9", + "revisionTime": "2018-09-19T14:43:06Z" + }, + { + "checksumSHA1": "UlW+AcyeItWM0x1W4vT9hbUiOJs=", + "path": "github.com/uber/jaeger-client-go/transport", + "revision": "ab5afa7b97b7a16aa6ae001f1c80af61a880e5d9", + "revisionTime": "2018-09-19T14:43:06Z" + }, + { + "checksumSHA1": "DKwwIk9vq53IKO7RKccat9cnqeo=", + "path": "github.com/uber/jaeger-client-go/utils", + "revision": "ab5afa7b97b7a16aa6ae001f1c80af61a880e5d9", + "revisionTime": "2018-09-19T14:43:06Z" + }, + { + "checksumSHA1": "k1iaOSBmLp3TpGvHNnRQXyJfwyI=", + "path": "github.com/uber/jaeger-lib/metrics", + "revision": "a51202d6f4a7e5a219e3841a43614ff7187ae7f1", + "revisionTime": "2018-06-15T20:27:29Z" }, { "checksumSHA1": "CSMVjFF7FnylAUUKW1e/4r+VFXA=", @@ -948,6 +1080,12 @@ "revision": "6078986fec03a1dcc236c34816c71b0e05018fda", "revisionTime": "2017-09-09T04:35:08Z" }, + { + "checksumSHA1": "CkkyCjLPBm0OGzfMuTfaDsd45X4=", + "path": "golang.org/x/net/netutil", + "revision": "2f5d2388922f370f4355f327fcf4cfe9f5583908", + "revisionTime": "2018-09-20T20:40:41Z" + }, { "checksumSHA1": "G99Z/xW4ukmt8SHnsO9SIX71hFE=", "path": "golang.org/x/sys/unix", From c6078fa8c472b9b295d20ebd05663b61a1516c4b Mon Sep 17 00:00:00 2001 From: j-justin Date: Fri, 21 Sep 2018 13:25:03 +0200 Subject: [PATCH 5/8] ADD Set tracer address in config --- cmd/cloudAuth/main.go | 6 +++--- cmd/cloudDiscovery/main.go | 6 +++--- cmd/cloudStatusReporter/main.go | 6 +++--- cmd/cloudStorage/main.go | 6 +++--- cmd/localAuth/main.go | 6 +++--- cmd/localDiscovery/main.go | 6 +++--- cmd/localStatusReporter/main.go | 6 +++--- cmd/localStorage/main.go | 6 +++--- config/config.go | 1 + service/tracing/tracing.go | 3 +++ 10 files changed, 28 insertions(+), 24 deletions(-) diff --git a/cmd/cloudAuth/main.go b/cmd/cloudAuth/main.go index b7cb622c..9d87c8ca 100644 --- a/cmd/cloudAuth/main.go +++ b/cmd/cloudAuth/main.go @@ -32,9 +32,6 @@ import ( ) func main() { - traceCloser := tracing.New("cloudAuth", "jaeger:5775") - defer traceCloser.Close() - logger := zerolog.New(os.Stdout).With(). Timestamp(). Str("service", "cloudAuth"). @@ -50,6 +47,9 @@ func main() { logger.Fatal().Err(err).Msg("failed to get config") } + traceCloser := tracing.New("cloudAuth", cfg.TracerAddr) + defer traceCloser.Close() + swaggerSpec, err := loads.Analyzed(restapi.SwaggerJSON, "") if err != nil { logger.Fatal().Err(err).Msg("Failed to load swagger spec") diff --git a/cmd/cloudDiscovery/main.go b/cmd/cloudDiscovery/main.go index 6b8ace6a..0d1d3766 100644 --- a/cmd/cloudDiscovery/main.go +++ b/cmd/cloudDiscovery/main.go @@ -29,9 +29,6 @@ import ( ) func main() { - traceCloser := tracing.New("cloudDiscovery", "jaeger:5775") - defer traceCloser.Close() - // initialize logger logger := zerolog.New(os.Stdout).With(). Timestamp(). @@ -48,6 +45,9 @@ func main() { logger.Fatal().Err(err).Msg("failed to get config") } + traceCloser := tracing.New("cloudDiscovery", cfg.TracerAddr) + defer traceCloser.Close() + swaggerSpec, err := loads.Analyzed(restapi.SwaggerJSON, "") if err != nil { logger.Fatal().Err(err).Msg("Failed to load swagger spec") diff --git a/cmd/cloudStatusReporter/main.go b/cmd/cloudStatusReporter/main.go index 77e0d575..334b34db 100644 --- a/cmd/cloudStatusReporter/main.go +++ b/cmd/cloudStatusReporter/main.go @@ -20,9 +20,6 @@ import ( ) func main() { - traceCloser := tracing.New("cloudStatusReporter", "jaeger:5775") - defer traceCloser.Close() - // initialize logger logger := zerolog.New(os.Stdout).With(). Timestamp(). @@ -40,6 +37,9 @@ func main() { } logger.Print(cfg) + traceCloser := tracing.New("cloudStatusReporter", cfg.TracerAddr) + defer traceCloser.Close() + // initialize status reporter r := statusReporter.New(logger) diff --git a/cmd/cloudStorage/main.go b/cmd/cloudStorage/main.go index 325decf8..71cfc397 100644 --- a/cmd/cloudStorage/main.go +++ b/cmd/cloudStorage/main.go @@ -34,9 +34,6 @@ import ( ) func main() { - traceCloser := tracing.New("cloudStorage", "jaeger:5775") - defer traceCloser.Close() - // initialize logger logger := zerolog.New(os.Stdout).With(). Timestamp(). @@ -52,6 +49,9 @@ func main() { logger.Fatal().Err(err).Msg("failed to get config") } + traceCloser := tracing.New("cloudStorage", cfg.TracerAddr) + defer traceCloser.Close() + swaggerSpec, err := loads.Analyzed(restapi.SwaggerJSON, "") if err != nil { logger.Fatal().Err(err).Msg("Failed to load swagger spec") diff --git a/cmd/localAuth/main.go b/cmd/localAuth/main.go index 6d26c68c..c0411993 100644 --- a/cmd/localAuth/main.go +++ b/cmd/localAuth/main.go @@ -33,9 +33,6 @@ import ( ) func main() { - traceCloser := tracing.New("localAuth", "jaeger:5775") - defer traceCloser.Close() - logger := zerolog.New(os.Stdout).With(). Timestamp(). Str("service", "localAuth"). @@ -51,6 +48,9 @@ func main() { logger.Fatal().Err(err).Msg("failed to get config") } + traceCloser := tracing.New("localAuth", cfg.TracerAddr) + defer traceCloser.Close() + swaggerSpec, err := loads.Analyzed(restapi.SwaggerJSON, "") if err != nil { logger.Fatal().Err(err).Msg("Failed to load swagger spec") diff --git a/cmd/localDiscovery/main.go b/cmd/localDiscovery/main.go index 2f7aaa6f..a820d995 100644 --- a/cmd/localDiscovery/main.go +++ b/cmd/localDiscovery/main.go @@ -30,9 +30,6 @@ import ( ) func main() { - traceCloser := tracing.New("localDiscovery", "jaeger:5775") - defer traceCloser.Close() - // initialize logger logger := zerolog.New(os.Stdout).With(). Timestamp(). @@ -49,6 +46,9 @@ func main() { logger.Fatal().Err(err).Msg("failed to get config") } + traceCloser := tracing.New("localDiscovery", cfg.TracerAddr) + defer traceCloser.Close() + swaggerSpec, err := loads.Analyzed(restapi.SwaggerJSON, "") if err != nil { logger.Fatal().Err(err).Msg("Failed to load swagger spec") diff --git a/cmd/localStatusReporter/main.go b/cmd/localStatusReporter/main.go index 08887b41..5c6c9dff 100644 --- a/cmd/localStatusReporter/main.go +++ b/cmd/localStatusReporter/main.go @@ -20,9 +20,6 @@ import ( ) func main() { - traceCloser := tracing.New("localStatusReporter", "jaeger:5775") - defer traceCloser.Close() - // initialize logger logger := zerolog.New(os.Stdout).With(). Timestamp(). @@ -40,6 +37,9 @@ func main() { } logger.Print(cfg) + traceCloser := tracing.New("localStatusReporter", cfg.TracerAddr) + defer traceCloser.Close() + // initialize status reporter r := statusReporter.New(logger) diff --git a/cmd/localStorage/main.go b/cmd/localStorage/main.go index 3aa23216..23fa6157 100644 --- a/cmd/localStorage/main.go +++ b/cmd/localStorage/main.go @@ -39,9 +39,6 @@ import ( ) func main() { - traceCloser := tracing.New("localStorage", "jaeger:5775") - defer traceCloser.Close() - // initialize logger logger := zerolog.New(os.Stdout).With(). Timestamp(). @@ -58,6 +55,9 @@ func main() { logger.Fatal().Err(err).Msg("failed to get config") } + traceCloser := tracing.New("localStorage", cfg.TracerAddr) + defer traceCloser.Close() + swaggerSpec, err := loads.Analyzed(restapi.SwaggerJSON, "") if err != nil { logger.Fatal().Err(err).Msg("Failed to load swagger spec") diff --git a/config/config.go b/config/config.go index a1287b78..ea937e02 100644 --- a/config/config.go +++ b/config/config.go @@ -22,6 +22,7 @@ type Config struct { StoragePath string `env:"STORAGE_PATH" envDefault:"storage"` AuthHost string `env:"AUTH_HOST" envDefault:"localAuth"` AuthPath string `env:"AUTH_PATH" envDefault:"auth"` + TracerAddr string `env:"TRACER_ADDR" envDefault:"jaeger:5775"` } // New returns new instance of Config diff --git a/service/tracing/tracing.go b/service/tracing/tracing.go index e162397d..adbe914a 100644 --- a/service/tracing/tracing.go +++ b/service/tracing/tracing.go @@ -18,6 +18,7 @@ var tracerIsSet = false // New sets opentracing.GlobalTracer() to tracer created from function options // returns Closer, which is used to close the tracker +// If connection to agent cannot be established return MockCloser and do not set GlobalTracer func New(serviceName, hostPort string) io.Closer { log.Printf("Creating new tracer %s on host %s", serviceName, hostPort) @@ -53,6 +54,8 @@ func New(serviceName, hostPort string) io.Closer { type spanContext struct{} // empty spanContext is used to extract opentracing.spanContext from context +// Middleware injects existing(if provided in request) or new span in request's context +// Does nothing if GlobalTracer is not set func Middleware(h http.Handler) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if tracerIsSet { From 8092bc70c7a07e564a9ba6cd6eacfbbe3e3fd014 Mon Sep 17 00:00:00 2001 From: j-justin Date: Fri, 21 Sep 2018 13:25:21 +0200 Subject: [PATCH 6/8] ADD cloudStatusReporter to make up - cloudStatusReporter was not included in make up command --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index eef13a00..30f03694 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ lc = $(subst A,a,$(subst B,b,$(subst C,c,$(subst D,d,$(subst E,e,$(subst F,f,$(subst G,g,$(subst H,h,$(subst I,i,$(subst J,j,$(subst K,k,$(subst L,l,$(subst M,m,$(subst N,n,$(subst O,o,$(subst P,p,$(subst Q,q,$(subst R,r,$(subst S,s,$(subst T,t,$(subst U,u,$(subst V,v,$(subst W,w,$(subst X,x,$(subst Y,y,$(subst Z,z,$1)))))))))))))))))))))))))) -BASIC_SERVICES = traefik postgres cloudSymmetric pgweb localMinio cloudMinio localNats natsStreamingExporter localPrometheusPushGateway localPrometheus cloudPrometheus cloudAuth localAuth localStorage cloudStorage localNats storageSync waitlist localStatusReporter cloudDiscovery localDiscovery localSymmetric +BASIC_SERVICES = traefik postgres cloudSymmetric pgweb localMinio cloudMinio localNats natsStreamingExporter localPrometheusPushGateway localPrometheus cloudPrometheus cloudAuth localAuth localStorage cloudStorage localNats storageSync waitlist localStatusReporter cloudStatusReporter cloudDiscovery localDiscovery localSymmetric BIN_CMD ?= DOCKER_TAG ?= $(shell git rev-parse --short HEAD) DOCKER_REGISTRY ?= localhost:5000/ From cf921723e837d24c00e0a89d0d627ded581362c3 Mon Sep 17 00:00:00 2001 From: j-justin Date: Fri, 21 Sep 2018 14:23:43 +0200 Subject: [PATCH 7/8] RMV Error reporting on jaeger - spans are now only tagged as errored, they no longer contain lgo - moved spans to the base of s3 calls --- service/storage/storage.go | 181 +++++------------ service/tracing/tracing.go | 8 +- storage/s3/s3.go | 398 ++++++++++++++++++++----------------- 3 files changed, 272 insertions(+), 315 deletions(-) diff --git a/service/storage/storage.go b/service/storage/storage.go index 4fa6e380..84c5b60f 100644 --- a/service/storage/storage.go +++ b/service/storage/storage.go @@ -8,8 +8,6 @@ import ( "io" "time" - "github.com/iryonetwork/wwm/service/tracing" - "github.com/agext/uuid" "github.com/go-openapi/strfmt" "github.com/rs/zerolog" @@ -111,11 +109,7 @@ func (s *service) FileList(ctx context.Context, bucketID string) ([]*models.File list := []*models.FileDescriptor{} // check if bucket exists - var exists bool - err := tracing.TraceFunctionSpan("s3 BucketExists", ctx, func() (err error) { - exists, err = s.s3.BucketExists(ctx, bucketID) - return err - }) + exists, err := s.s3.BucketExists(ctx, bucketID) if err != nil { s.logger.Info().Err(err).Str("bucket", bucketID).Msg("Failed to check if bucket exists") return nil, err @@ -125,11 +119,7 @@ func (s *service) FileList(ctx context.Context, bucketID string) ([]*models.File } // collect the list - var l []*models.FileDescriptor - err = tracing.TraceFunctionSpan("s3 List", ctx, func() (err error) { - l, err = s.s3.List(ctx, bucketID, "") - return err - }) + l, err := s.s3.List(ctx, bucketID, "") if err != nil { return nil, err } @@ -149,23 +139,17 @@ func (s *service) FileList(ctx context.Context, bucketID string) ([]*models.File return list, nil } -func (s *service) FileGet(ctx context.Context, bucketID, fileID string) (rc io.ReadCloser, fd *models.FileDescriptor, err error) { - err = tracing.TraceFunctionSpan("s3 Read", ctx, func() (err error) { - start := time.Now() - rc, fd, err = s.s3.Read(ctx, bucketID, fileID, "") - s.logger.Info().Str("method", "FileGet").Msgf("s3 read time %s", time.Since(start)) - return err - }) +func (s *service) FileGet(ctx context.Context, bucketID, fileID string) (io.ReadCloser, *models.FileDescriptor, error) { + start := time.Now() + rc, fd, err := s.s3.Read(ctx, bucketID, fileID, "") + s.logger.Info().Str("method", "FileGet").Msgf("s3 read time %s", time.Since(start)) return rc, fd, err } -func (s *service) FileGetVersion(ctx context.Context, bucketID, fileID, version string) (rc io.ReadCloser, fd *models.FileDescriptor, err error) { +func (s *service) FileGetVersion(ctx context.Context, bucketID, fileID, version string) (io.ReadCloser, *models.FileDescriptor, error) { start := time.Now() - err = tracing.TraceFunctionSpan("s3 Read", ctx, func() (err error) { - rc, fd, err = s.s3.Read(ctx, bucketID, fileID, version) - return err - }) + rc, fd, err := s.s3.Read(ctx, bucketID, fileID, version) s.logger.Info().Str("method", "FileGetVersion").Msgf("s3 read time %s", time.Since(start)) return rc, fd, err @@ -176,11 +160,7 @@ func (s *service) FileListVersions(ctx context.Context, bucketID, fileID string, list := []*models.FileDescriptor{} // check if bucket exists - var exists bool - err := tracing.TraceFunctionSpan("s3 BucketExists", ctx, func() (err error) { - exists, err = s.s3.BucketExists(ctx, bucketID) - return - }) + exists, err := s.s3.BucketExists(ctx, bucketID) if err != nil { s.logger.Info().Err(err).Str("bucket", bucketID).Msg("Failed to check if bucket exists") return nil, err @@ -189,11 +169,7 @@ func (s *service) FileListVersions(ctx context.Context, bucketID, fileID string, return list, nil } - var l []*models.FileDescriptor - err = tracing.TraceFunctionSpan("s3 List", ctx, func() (err error) { - l, err = s.s3.List(ctx, bucketID, fileID) - return - }) + l, err := s.s3.List(ctx, bucketID, fileID) if (createdAtSince == nil && createdAtUntil == nil) || err != nil { return l, err } @@ -239,13 +215,10 @@ func (s *service) FileNew(ctx context.Context, bucketID string, r io.Reader, con Labels: labels, } - var fd *models.FileDescriptor - err = tracing.TraceFunctionSpan("s3 Write", ctx, func() (err error) { - start := time.Now() - fd, err = s.s3.Write(ctx, bucketID, no, &buf) - s.logger.Info().Str("method", "FileNew").Msgf("s3 write time %s", time.Since(start)) - return - }) + start := time.Now() + fd, err := s.s3.Write(ctx, bucketID, no, &buf) + s.logger.Info().Str("method", "FileNew").Msgf("s3 write time %s", time.Since(start)) + if err == nil { s.publisher.PublishAsyncWithRetries( context.TODO(), @@ -264,15 +237,11 @@ func (s *service) FileNew(ctx context.Context, bucketID string, r io.Reader, con return fd, err } -func (s *service) FileUpdate(ctx context.Context, bucketID, fileID string, r io.Reader, contentType string, archetype string, labels []string) (fd *models.FileDescriptor, err error) { +func (s *service) FileUpdate(ctx context.Context, bucketID, fileID string, r io.Reader, contentType string, archetype string, labels []string) (*models.FileDescriptor, error) { // get the previous file - var old *models.FileDescriptor - err = tracing.TraceFunctionSpan("s3 Read", ctx, func() (err error) { - start := time.Now() - _, old, err = s.s3.Read(ctx, bucketID, fileID, "") - s.logger.Info().Str("method", "FileUpdate").Msgf("s3 read time %s", time.Since(start)) - return - }) + start := time.Now() + _, old, err := s.s3.Read(ctx, bucketID, fileID, "") + s.logger.Info().Str("method", "FileUpdate").Msgf("s3 read time %s", time.Since(start)) if err != nil { return nil, err @@ -300,12 +269,9 @@ func (s *service) FileUpdate(ctx context.Context, bucketID, fileID string, r io. Labels: labels, } - err = tracing.TraceFunctionSpan("s3 Write", ctx, func() (err error) { - start := time.Now() - fd, err = s.s3.Write(ctx, bucketID, no, &buf) - s.logger.Info().Str("method", "FileUpdate").Msgf("s3 write time %s", time.Since(start)) - return - }) + start = time.Now() + fd, err := s.s3.Write(ctx, bucketID, no, &buf) + s.logger.Info().Str("method", "FileUpdate").Msgf("s3 write time %s", time.Since(start)) if err == nil { s.publisher.PublishAsyncWithRetries( @@ -334,13 +300,9 @@ func (s *service) FileUpdate(ctx context.Context, bucketID, fileID string, r io. func (s *service) FileDelete(ctx context.Context, bucketID, fileID string) error { // get the previous file - var fd *models.FileDescriptor - err := tracing.TraceFunctionSpan("s3 Read", ctx, func() (err error) { - start := time.Now() - _, fd, err = s.s3.Read(ctx, bucketID, fileID, "") - s.logger.Info().Str("method", "FileDelete").Msgf("s3 read time %s", time.Since(start)) - return err - }) + start := time.Now() + _, fd, err := s.s3.Read(ctx, bucketID, fileID, "") + s.logger.Info().Str("method", "FileDelete").Msgf("s3 read time %s", time.Since(start)) if err != nil { return err @@ -358,12 +320,10 @@ func (s *service) FileDelete(ctx context.Context, bucketID, fileID string) error Operation: string(s3.Delete), Labels: fd.Labels, } - err = tracing.TraceFunctionSpan("s3 Write", ctx, func() (err error) { - start := time.Now() - fd, err = s.s3.Write(ctx, bucketID, no, &bytes.Buffer{}) - s.logger.Info().Str("method", "FileDelete").Msgf("s3 write time %s", time.Since(start)) - return err - }) + + start = time.Now() + fd, err = s.s3.Write(ctx, bucketID, no, &bytes.Buffer{}) + s.logger.Info().Str("method", "FileDelete").Msgf("s3 write time %s", time.Since(start)) if err == nil { s.publisher.PublishAsyncWithRetries( @@ -386,11 +346,7 @@ func (s *service) SyncFileList(ctx context.Context, bucketID string, createdAtSi list := []*models.FileDescriptor{} // check if bucket exists - var exists bool - err := tracing.TraceFunctionSpan("s3 BucketExists", ctx, func() (err error) { - exists, err = s.s3.BucketExists(ctx, bucketID) - return err - }) + exists, err := s.s3.BucketExists(ctx, bucketID) if err != nil { s.logger.Info().Err(err).Str("method", "SyncFileList").Str("bucket", bucketID).Msg("Failed to check if bucket exists") return nil, err @@ -400,11 +356,7 @@ func (s *service) SyncFileList(ctx context.Context, bucketID string, createdAtSi } // collect the list - var l []*models.FileDescriptor - err = tracing.TraceFunctionSpan("s3 List", ctx, func() (err error) { - l, err = s.s3.List(ctx, bucketID, "") - return err - }) + l, err := s.s3.List(ctx, bucketID, "") if err != nil { return nil, err } @@ -443,13 +395,9 @@ func (s *service) SyncFile(ctx context.Context, bucketID, fileID, version string } // try to fetch - var fd *models.FileDescriptor - err = tracing.TraceFunctionSpan("s3 Read", ctx, func() (err error) { - start := time.Now() - _, fd, err = s.s3.Read(ctx, bucketID, fileID, version) - s.logger.Info().Str("method", "SyncFile").Msgf("s3 read time %s", time.Since(start)) - return err - }) + start := time.Now() + _, fd, err := s.s3.Read(ctx, bucketID, fileID, version) + s.logger.Info().Str("method", "SyncFile").Msgf("s3 read time %s", time.Since(start)) switch { // Already exists and does not conflict @@ -462,12 +410,9 @@ func (s *service) SyncFile(ctx context.Context, bucketID, fileID, version string s.logger.Info().Str("method", "SyncFile"). Msg("File already exists and has conflicting checksum. Local file will be removed and replaced with sync file.") - err = tracing.TraceFunctionSpan("s3 Delete", ctx, func() (err error) { - start := time.Now() - err = s.s3.Delete(ctx, bucketID, fileID, version) - s.logger.Info().Str("method", "SyncFile").Msgf("s3 delete time %s", time.Since(start)) - return err - }) + start = time.Now() + err = s.s3.Delete(ctx, bucketID, fileID, version) + s.logger.Info().Str("method", "SyncFile").Msgf("s3 delete time %s", time.Since(start)) if err != nil { s.logger.Error().Err(err).Str("method", "SyncFile"). @@ -493,25 +438,18 @@ func (s *service) SyncFile(ctx context.Context, bucketID, fileID, version string Labels: labels, } - err = tracing.TraceFunctionSpan("s3 Write", ctx, func() (err error) { - start := time.Now() - fd, err = s.s3.Write(ctx, bucketID, no, &buf) - s.logger.Info().Str("method", "SyncFile").Msgf("s3 write time %s", time.Since(start)) - return err - }) + start = time.Now() + fd, err = s.s3.Write(ctx, bucketID, no, &buf) + s.logger.Info().Str("method", "SyncFile").Msgf("s3 write time %s", time.Since(start)) return fd, err } func (s *service) SyncFileDelete(ctx context.Context, bucketID, fileID, version string, created strfmt.DateTime) error { // get the previous file - var fd *models.FileDescriptor - err := tracing.TraceFunctionSpan("s3 Read", ctx, func() (err error) { - start := time.Now() - _, fd, err = s.s3.Read(ctx, bucketID, fileID, "") - s.logger.Info().Str("method", "SyncFileDelete").Msgf("s3 read time %s", time.Since(start)) - return - }) + start := time.Now() + _, fd, err := s.s3.Read(ctx, bucketID, fileID, "") + s.logger.Info().Str("method", "SyncFileDelete").Msgf("s3 read time %s", time.Since(start)) if err != nil { return err @@ -542,23 +480,16 @@ func (s *service) SyncFileDelete(ctx context.Context, bucketID, fileID, version Labels: fd.Labels, } - err = tracing.TraceFunctionSpan("s3 Write", ctx, func() (err error) { - start := time.Now() - _, err = s.s3.Write(ctx, bucketID, no, &bytes.Buffer{}) - s.logger.Info().Str("method", "SyncFileDelete").Msgf("s3 write time %s", time.Since(start)) - return - }) + start = time.Now() + _, err = s.s3.Write(ctx, bucketID, no, &bytes.Buffer{}) + s.logger.Info().Str("method", "SyncFileDelete").Msgf("s3 write time %s", time.Since(start)) return err } func (s *service) EnsureBucket(ctx context.Context, bucketID string) error { // make sure bucket exists - err := tracing.TraceFunctionSpan("s3 MakeBucket", ctx, func() (err error) { - return s.s3.MakeBucket(ctx, bucketID) - }) - - if err != nil && err != s3.ErrAlreadyExists { + if err := s.s3.MakeBucket(ctx, bucketID); err != nil && err != s3.ErrAlreadyExists { s.logger.Error().Err(err).Str("bucket", bucketID).Msg("Failed to ensure bucket") return err } @@ -568,13 +499,10 @@ func (s *service) EnsureBucket(ctx context.Context, bucketID string) error { func (s *service) updateFilesCollection(ctx context.Context, operation s3.Operation, bucketID, label string, fd *models.FileDescriptor) error { var c *filesCollection - var r io.ReadCloser - err := tracing.TraceFunctionSpan("s3 Read", ctx, func() (err error) { - start := time.Now() - r, _, err = s.s3.Read(ctx, bucketID, label, "") - s.logger.Info().Str("method", "updateFilesCollection").Msgf("s3 read time %s", time.Since(start)) - return err - }) + + start := time.Now() + r, _, err := s.s3.Read(ctx, bucketID, label, "") + s.logger.Info().Str("method", "updateFilesCollection").Msgf("s3 read time %s", time.Since(start)) if err != nil { if err != s3.ErrNotFound { @@ -624,12 +552,9 @@ func (s *service) updateFilesCollection(ctx context.Context, operation s3.Operat Labels: []string{labelFilesCollection}, } - err = tracing.TraceFunctionSpan("s3 Read", ctx, func() (err error) { - start := time.Now() - fd, err = s.s3.Write(ctx, bucketID, no, &buf) - s.logger.Info().Str("method", "updateFilesCollection").Msgf("s3 write time %s", time.Since(start)) - return - }) + start = time.Now() + fd, err = s.s3.Write(ctx, bucketID, no, &buf) + s.logger.Info().Str("method", "updateFilesCollection").Msgf("s3 write time %s", time.Since(start)) if err != nil { s.logger.Error().Err(err).Msg("failed to write file collection file") diff --git a/service/tracing/tracing.go b/service/tracing/tracing.go index adbe914a..1722c536 100644 --- a/service/tracing/tracing.go +++ b/service/tracing/tracing.go @@ -69,13 +69,13 @@ func Middleware(h http.Handler) http.HandlerFunc { if err != nil { // If for whatever reason we can't join go ahead an start a new root span. log.Printf("TRACE NOT FOUND, %v", err) - sp = opentracing.StartSpan(spanName) + sp = opentracing.StartSpan(spanName) } else { log.Printf("TRACE FOUND") + sp = opentracing.StartSpan(spanName, opentracing.ChildOf(wireContext)) } - defer sp.Finish() r = r.WithContext(context.WithValue(r.Context(), spanContext{}, sp.Context())) @@ -107,9 +107,9 @@ func InjectTracerInRequest(ctx context.Context, r *http.Request) { // If possible tracer is extracted from request. // If you have no request handy, pass in nil func TraceFunctionSpan(name string, ctx context.Context, f func() error) error { + var sp opentracing.Span // Create new span if tracer is set - var sp opentracing.Span if tracerIsSet { sp = getSpan(name, ctx) defer sp.Finish() @@ -121,8 +121,8 @@ func TraceFunctionSpan(name string, ctx context.Context, f func() error) error { err := f() if err != nil && tracerIsSet { ext.Error.Set(sp, true) - sp.LogEventWithPayload(fmt.Sprintf("Error"), err) } + return err } diff --git a/storage/s3/s3.go b/storage/s3/s3.go index 650ee9cc..786d62f5 100644 --- a/storage/s3/s3.go +++ b/storage/s3/s3.go @@ -41,6 +41,8 @@ import ( "sort" "strings" + "github.com/iryonetwork/wwm/service/tracing" + "github.com/go-openapi/strfmt" "github.com/minio/minio-go/pkg/encrypt" "github.com/pkg/errors" @@ -146,245 +148,275 @@ func New(cfg *Config, keys KeyProvider, logger zerolog.Logger) (Storage, error) } // Check if bucket already exists -func (s *s3storage) BucketExists(_ context.Context, bucketID string) (bool, error) { - s.logger.Debug().Str("cmd", "s3::BucketExists").Msgf("('%s')", bucketID) +func (s *s3storage) BucketExists(ctx context.Context, bucketID string) (bool, error) { + exists := false + err := tracing.TraceFunctionSpan("s3::BucketExists", ctx, func() (err error) { - exists, err := s.client.BucketExists(bucketID) - if err != nil { - s.logger.Info().Err(err).Str("cmd", "s3::BucketExists").Msg("Failed to check if bucket exists") - return false, errors.Wrap(err, "Failed to check if bucket exists") - } + s.logger.Debug().Str("cmd", "s3::BucketExists").Msgf("('%s')", bucketID) - return exists, nil + exists, err = s.client.BucketExists(bucketID) + if err != nil { + s.logger.Info().Err(err).Str("cmd", "s3::BucketExists").Msg("Failed to check if bucket exists") + return errors.Wrap(err, "Failed to check if bucket exists") + } + + return nil + + }) + return exists, err } // MakeBucket creates a bucket, return ErrAlreadyExists if bucket already exists -func (s *s3storage) MakeBucket(_ context.Context, bucketID string) error { - s.logger.Debug().Str("cmd", "s3::MakeBucket").Msgf("('%s')", bucketID) +func (s *s3storage) MakeBucket(ctx context.Context, bucketID string) error { + return tracing.TraceFunctionSpan("s3::MakeBucket", ctx, func() (err error) { - exists, err := s.client.BucketExists(bucketID) - if err != nil { - return errors.Wrap(err, "Failed to check if bucket exists") - } - if exists { - return ErrAlreadyExists - } + s.logger.Debug().Str("cmd", "s3::MakeBucket").Msgf("('%s')", bucketID) - if !exists { - if err := s.client.MakeBucket(bucketID, s.cfg.Region); err != nil && strings.Contains(err.Error(), bucketExistsErrMsg) { - s.logger.Debug().Err(err).Msg("Looks like bucket actually existed when MakeBucket was called") + exists, err := s.client.BucketExists(bucketID) + if err != nil { + return errors.Wrap(err, "Failed to check if bucket exists") + } + if exists { return ErrAlreadyExists - } else if err != nil { - s.logger.Info().Err(err).Str("cmd", "s3::MakeBucket").Msg("Failed to create a new bucket") - return errors.Wrap(err, "Failed to create a new bucket") } - } - - return nil + if !exists { + if err := s.client.MakeBucket(bucketID, s.cfg.Region); err != nil && strings.Contains(err.Error(), bucketExistsErrMsg) { + s.logger.Debug().Err(err).Msg("Looks like bucket actually existed when MakeBucket was called") + return ErrAlreadyExists + } else if err != nil { + s.logger.Info().Err(err).Str("cmd", "s3::MakeBucket").Msg("Failed to create a new bucket") + return errors.Wrap(err, "Failed to create a new bucket") + } + } + return nil + }) } // ListBuckets returns a list of buckets -func (s *s3storage) ListBuckets(_ context.Context) ([]*models.BucketDescriptor, error) { - s.logger.Debug().Str("cmd", "s3::ListBuckets") +func (s *s3storage) ListBuckets(ctx context.Context) ([]*models.BucketDescriptor, error) { + var buckets []*models.BucketDescriptor + err := tracing.TraceFunctionSpan("s3::ListBuckets", ctx, func() (err error) { - b, err := s.client.ListBuckets() + s.logger.Debug().Str("cmd", "s3::ListBuckets") - if err != nil { - s.logger.Info().Err(err).Str("cmd", "s3::ListBuckets").Msg("Failed to list buckets") - return nil, errors.Wrap(err, "Failed to list buckets") - } + b, err := s.client.ListBuckets() - buckets := []*models.BucketDescriptor{} - for _, info := range b { - bd, err := bucketInfoToBucketDescriptor(info) if err != nil { - s.logger.Info().Err(err).Str("cmd", "s3::ListBuckets").Msg("Failed to convert bucketInfo to bucketDescriptor") - return nil, errors.Wrap(err, "Failed to convert bucketInfo to bucketDescriptor") + s.logger.Info().Err(err).Str("cmd", "s3::ListBuckets").Msg("Failed to list buckets") + return errors.Wrap(err, "Failed to list buckets") } - buckets = append(buckets, bd) - } - return buckets, nil -} + for _, info := range b { + bd, err := bucketInfoToBucketDescriptor(info) + if err != nil { + s.logger.Info().Err(err).Str("cmd", "s3::ListBuckets").Msg("Failed to convert bucketInfo to bucketDescriptor") + return errors.Wrap(err, "Failed to convert bucketInfo to bucketDescriptor") + } + buckets = append(buckets, bd) + } -// List returns a list of files stored inside a bucket -func (s *s3storage) List(_ context.Context, bucketID, prefix string) ([]*models.FileDescriptor, error) { - s.logger.Debug().Str("cmd", "s3::List").Msgf("('%s', '%s')", bucketID, prefix) + return nil - // Check if bucket exists first - exists, err := s.client.BucketExists(bucketID) - if err != nil { - s.logger.Info().Err(err).Str("cmd", "s3::List").Msg("Failed to check if bucket exists") - return nil, errors.Wrap(err, "Failed to check if bucket exists") - } - if !exists { - // Nothing to list - return []*models.FileDescriptor{}, nil - } + }) + return buckets, err +} - ch := make(chan struct{}) - defer close(ch) - infos := s.client.ListObjectsV2(bucketID, prefix, false, ch) +// List returns a list of files stored inside a bucket +func (s *s3storage) List(ctx context.Context, bucketID, prefix string) ([]*models.FileDescriptor, error) { + var files []*models.FileDescriptor + err := tracing.TraceFunctionSpan("s3::List", ctx, func() (err error) { - files := []*models.FileDescriptor{} - for info := range infos { - if info.Err != nil { - s.logger.Info().Err(info.Err).Str("cmd", "s3::List").Msg("Failed to read object from a list") - return nil, errors.Wrap(info.Err, "Failed to read object from a list") - } + s.logger.Debug().Str("cmd", "s3::List").Msgf("('%s', '%s')", bucketID, prefix) - fd, err := objectInfoToFileDescriptor(info, bucketID) + // Check if bucket exists first + exists, err := s.client.BucketExists(bucketID) if err != nil { - s.logger.Info().Err(err).Str("cmd", "s3::List").Msg("Failed to convert object to fileDescriptor") - return nil, errors.Wrap(err, "Failed to convert object to fileDescriptor") + s.logger.Info().Err(err).Str("cmd", "s3::List").Msg("Failed to check if bucket exists") + return errors.Wrap(err, "Failed to check if bucket exists") + } + if !exists { + // Nothing to list + return nil } - files = append(files, fd) - } + ch := make(chan struct{}) + defer close(ch) + infos := s.client.ListObjectsV2(bucketID, prefix, false, ch) + + for info := range infos { + if info.Err != nil { + s.logger.Info().Err(info.Err).Str("cmd", "s3::List").Msg("Failed to read object from a list") + return errors.Wrap(info.Err, "Failed to read object from a list") + } - sort.Sort(byCreated(files)) - return files, nil + fd, err := objectInfoToFileDescriptor(info, bucketID) + if err != nil { + s.logger.Info().Err(err).Str("cmd", "s3::List").Msg("Failed to convert object to fileDescriptor") + return errors.Wrap(err, "Failed to convert object to fileDescriptor") + } + + files = append(files, fd) + } + + sort.Sort(byCreated(files)) + return nil + }) + return files, err } // Read fetches contents from the storage func (s *s3storage) Read(ctx context.Context, bucketID, fileID, version string) (io.ReadCloser, *models.FileDescriptor, error) { - s.logger.Debug().Str("cmd", "s3::Read").Msgf("('%s', '%s', '%s')", bucketID, fileID, version) + var fd *models.FileDescriptor + var reader io.ReadCloser + err := tracing.TraceFunctionSpan("s3::Read", ctx, func() (err error) { - // find the file - prefix := fmt.Sprintf("%s.", fileID) - if version != "" { - prefix += fmt.Sprintf("%s.", version) - } - list, err := s.List(ctx, bucketID, prefix) - if err != nil { - s.logger.Info().Err(err).Str("cmd", "s3::Read").Msg("Failed to list files") - return nil, nil, errors.Wrap(err, "Failed to list files") - } - if len(list) == 0 { - return nil, nil, ErrNotFound - } - md, err := metadataFromFileDescriptor(list[0]) - if err != nil { - s.logger.Info().Err(err).Str("cmd", "s3::Read").Msg("Failed to parse metadata from fileDescriptor") - return nil, nil, errors.Wrap(err, "Failed to parse metadata from fileDescriptor") - } + s.logger.Debug().Str("cmd", "s3::Read").Msgf("('%s', '%s', '%s')", bucketID, fileID, version) - // read the key - em, err := getCBCKey(bucketID, s.keys) - if err != nil { - s.logger.Info().Err(err).Str("cmd", "s3::Read").Msg("Failed to set CBC key") - return nil, nil, errors.Wrap(err, "Failed to set CBC key") - } + // find the file + prefix := fmt.Sprintf("%s.", fileID) + if version != "" { + prefix += fmt.Sprintf("%s.", version) + } + list, err := s.List(ctx, bucketID, prefix) + if err != nil { + s.logger.Info().Err(err).Str("cmd", "s3::Read").Msg("Failed to list files") + return errors.Wrap(err, "Failed to list files") + } + if len(list) == 0 { + return ErrNotFound + } + md, err := metadataFromFileDescriptor(list[0]) + if err != nil { + s.logger.Info().Err(err).Str("cmd", "s3::Read").Msg("Failed to parse metadata from fileDescriptor") + return errors.Wrap(err, "Failed to parse metadata from fileDescriptor") + } - // fetch the file - reader, err := s.client.GetObjectWithContext(ctx, bucketID, md.String(), minio.GetObjectOptions{Materials: em}) + // read the key + em, err := getCBCKey(bucketID, s.keys) + if err != nil { + s.logger.Info().Err(err).Str("cmd", "s3::Read").Msg("Failed to set CBC key") + return errors.Wrap(err, "Failed to set CBC key") + } - if err != nil { - s.logger.Info().Err(err).Str("cmd", "s3::Read").Msg("Failed to fetch enc. object") - return nil, nil, errors.Wrap(err, "Failed to fetch enc. object") - } + // fetch the file + reader, err = s.client.GetObjectWithContext(ctx, bucketID, md.String(), minio.GetObjectOptions{Materials: em}) - return reader, list[0], nil + if err != nil { + s.logger.Info().Err(err).Str("cmd", "s3::Read").Msg("Failed to fetch enc. object") + return errors.Wrap(err, "Failed to fetch enc. object") + } + + fd = list[0] + return nil + }) + return reader, fd, err } // Write creates a new file in the storage -func (s *s3storage) Write(ctx context.Context, bucketID string, newFile *object.NewObjectInfo, r io.Reader) (*models.FileDescriptor, error) { - s.logger.Debug().Str("cmd", "s3::Write").Msgf("('%s', '%+v', reader)", bucketID, newFile) - - // validate operation - op := Operation(newFile.Operation) - if op != Write && op != Delete { - s.logger.Info().Str("cmd", "s3::Write").Msgf("Received an invalid operation '%s'", op) - return nil, fmt.Errorf("Received an invalid operation '%s'", op) - } +func (s *s3storage) Write(ctx context.Context, bucketID string, newFile *object.NewObjectInfo, r io.Reader) (fd *models.FileDescriptor, err error) { + err = tracing.TraceFunctionSpan("s3::Write", ctx, func() (err error) { - // get the key - em, err := getCBCKey(bucketID, s.keys) - if err != nil { - s.logger.Info().Err(err).Str("cmd", "s3::Write").Msg("Failed to set the CBC key") - return nil, errors.Wrap(err, "Failed to set the CBC key") - } + s.logger.Debug().Str("cmd", "s3::Write").Msgf("('%s', '%+v', reader)", bucketID, newFile) - // collect meta data - meta, err := metadataFromNewFile(newFile) - if err != nil { - s.logger.Info().Err(err).Str("cmd", "s3::Write").Msg("Failed to collect metadata from new file") - return nil, errors.Wrap(err, "Failed to collect metadata from new file") - } + // validate operation + op := Operation(newFile.Operation) + if op != Write && op != Delete { + s.logger.Info().Str("cmd", "s3::Write").Msgf("Received an invalid operation '%s'", op) + return fmt.Errorf("Received an invalid operation '%s'", op) + } - // upload the file - _, err = s.client.PutObjectWithContext(ctx, bucketID, meta.String(), r, -1, minio.PutObjectOptions{EncryptMaterials: em}) - if err != nil { - s.logger.Info().Err(err).Str("cmd", "s3::Write").Msg("Failed to call PutObject") - return nil, errors.Wrap(err, "Failed to call PutObjectWithContext") - } + // get the key + em, err := getCBCKey(bucketID, s.keys) + if err != nil { + s.logger.Info().Err(err).Str("cmd", "s3::Write").Msg("Failed to set the CBC key") + return errors.Wrap(err, "Failed to set the CBC key") + } - // generate the file descriptor - fd := &models.FileDescriptor{ - Name: newFile.Name, - Version: newFile.Version, - Archetype: newFile.Archetype, - ContentType: newFile.ContentType, - Checksum: newFile.Checksum, - Created: newFile.Created, - Labels: newFile.Labels, - Path: fmt.Sprintf("%s/%s/%s", bucketID, meta.filename, meta.version), - Size: newFile.Size, - Operation: string(op), - } + // collect meta data + meta, err := metadataFromNewFile(newFile) + if err != nil { + s.logger.Info().Err(err).Str("cmd", "s3::Write").Msg("Failed to collect metadata from new file") + return errors.Wrap(err, "Failed to collect metadata from new file") + } - return fd, nil + // upload the file + _, err = s.client.PutObjectWithContext(ctx, bucketID, meta.String(), r, -1, minio.PutObjectOptions{EncryptMaterials: em}) + if err != nil { + s.logger.Info().Err(err).Str("cmd", "s3::Write").Msg("Failed to call PutObject") + return errors.Wrap(err, "Failed to call PutObjectWithContext") + } + + // generate the file descriptor + fd = &models.FileDescriptor{ + Name: newFile.Name, + Version: newFile.Version, + Archetype: newFile.Archetype, + ContentType: newFile.ContentType, + Checksum: newFile.Checksum, + Created: newFile.Created, + Labels: newFile.Labels, + Path: fmt.Sprintf("%s/%s/%s", bucketID, meta.filename, meta.version), + Size: newFile.Size, + Operation: string(op), + } + + return nil + }) + return fd, err } // Delete removes files completely from storage, used only in case of conflicting files with the same ID and version -func (s *s3storage) Delete(_ context.Context, bucketID, fileID, version string) error { - s.logger.Debug().Str("cmd", "s3::Delete").Msgf("('%s', '%s', '%s')", bucketID, fileID, version) +func (s *s3storage) Delete(ctx context.Context, bucketID, fileID, version string) error { + return tracing.TraceFunctionSpan("s3::Write", ctx, func() (err error) { - // Check if bucket exists first - exists, err := s.client.BucketExists(bucketID) - if err != nil { - s.logger.Info().Err(err).Str("cmd", "s3::Delete").Msg("Failed to check if bucket exists") - return errors.Wrap(err, "Failed to check if bucket exists") - } - if !exists { - // Nothing to delete - return nil - } + s.logger.Debug().Str("cmd", "s3::Delete").Msgf("('%s', '%s', '%s')", bucketID, fileID, version) - // Set object prefix - prefix := fmt.Sprintf("%s.", fileID) - if version != "" { - prefix += fmt.Sprintf("%s.", version) - } + // Check if bucket exists first + exists, err := s.client.BucketExists(bucketID) + if err != nil { + s.logger.Info().Err(err).Str("cmd", "s3::Delete").Msg("Failed to check if bucket exists") + return errors.Wrap(err, "Failed to check if bucket exists") + } + if !exists { + // Nothing to delete + return nil + } - // first objects keys will be saved to array to prevent deleting any if listing fails - objKeys := []string{} - for info := range s.client.ListObjectsV2(bucketID, prefix, false, nil) { - if info.Err != nil { - s.logger.Error().Err(info.Err).Str("cmd", "s3::Delete").Msg("Failed to list all objects") - return errors.Wrap(info.Err, "Failed to list all objects") + // Set object prefix + prefix := fmt.Sprintf("%s.", fileID) + if version != "" { + prefix += fmt.Sprintf("%s.", version) } - objKeys = append(objKeys, info.Key) - } - // make objects channel - ch := make(chan string, len(objKeys)) - for _, objKey := range objKeys { - ch <- objKey - } - close(ch) + // first objects keys will be saved to array to prevent deleting any if listing fails + objKeys := []string{} + for info := range s.client.ListObjectsV2(bucketID, prefix, false, nil) { + if info.Err != nil { + s.logger.Error().Err(info.Err).Str("cmd", "s3::Delete").Msg("Failed to list all objects") + return errors.Wrap(info.Err, "Failed to list all objects") + } + objKeys = append(objKeys, info.Key) + } - for removeObjErr := range s.client.RemoveObjects(bucketID, ch) { - err = removeObjErr.Err - s.logger.Error().Err(err).Str("cmd", "s3::Delete").Msg("Failed to delete the object") - } + // make objects channel + ch := make(chan string, len(objKeys)) + for _, objKey := range objKeys { + ch <- objKey + } + close(ch) - if err != nil { - return errors.New("Failed to delete all matching objects") - } + for removeObjErr := range s.client.RemoveObjects(bucketID, ch) { + err = removeObjErr.Err + s.logger.Error().Err(err).Str("cmd", "s3::Delete").Msg("Failed to delete the object") + } + + if err != nil { + return errors.New("Failed to delete all matching objects") + } - return nil + return nil + }) } func objectInfoToFileDescriptor(info minio.ObjectInfo, bucketID string) (*models.FileDescriptor, error) { From b7e12c66318324e149de1534e938a875ec676f3b Mon Sep 17 00:00:00 2001 From: j-justin Date: Fri, 21 Sep 2018 15:38:43 +0200 Subject: [PATCH 8/8] FIX tests not passing --- storage/s3/s3.go | 1 + 1 file changed, 1 insertion(+) diff --git a/storage/s3/s3.go b/storage/s3/s3.go index 786d62f5..98d53b54 100644 --- a/storage/s3/s3.go +++ b/storage/s3/s3.go @@ -237,6 +237,7 @@ func (s *s3storage) List(ctx context.Context, bucketID, prefix string) ([]*model } if !exists { // Nothing to list + files = []*models.FileDescriptor{} return nil }