diff --git a/src/batch_exporter.hpp b/src/batch_exporter.hpp index a2e65b12..285024be 100644 --- a/src/batch_exporter.hpp +++ b/src/batch_exporter.hpp @@ -21,6 +21,10 @@ class BatchExporter { opentelemetry::trace::SpanId parent; uint64_t start; uint64_t end; + enum SpanKind{ + CLIENT = opentelemetry::proto::trace::v1::Span::SPAN_KIND_CLIENT, + SERVER = opentelemetry::proto::trace::v1::Span::SPAN_KIND_SERVER + } type; }; class Span { @@ -32,7 +36,7 @@ class BatchExporter { : span(span) { span->set_kind( - opentelemetry::proto::trace::v1::Span::SPAN_KIND_SERVER); + (opentelemetry::proto::trace::v1::Span::SpanKind) info.type); // Short setters, like set_name(), use additional std::string as an // intermediary at least up to v21.5 of protobuf. diff --git a/src/http_module.cpp b/src/http_module.cpp index 78a5e896..4681f3cc 100644 --- a/src/http_module.cpp +++ b/src/http_module.cpp @@ -17,6 +17,20 @@ struct OtelCtx { TraceContext current; }; +struct OtelUpstreamCtx { + OtelCtx *reqCtx; + ngx_http_request_t *req; + void *data; // original peer conn data + ngx_uint_t childProcessingStart; + ngx_int_t (*originalGetRequest) (ngx_peer_connection_t *pc, void *data); + void (*originalFreeRequest)(ngx_peer_connection_t *pc, void *data, + ngx_uint_t state); + void (*originalNotify) (ngx_peer_connection_t *pc, void *data, + ngx_uint_t type); + ngx_int_t (*originalSetSession) (ngx_peer_connection_t *pc, void *data); + void (*originalSaveSession)(ngx_peer_connection_t *pc, void *data); +}; + struct MainConfBase { ngx_str_t endpoint; ngx_msec_t interval; @@ -46,11 +60,17 @@ struct LocationConf { ngx_array_t spanAttrs; }; +struct ServerConf { + ngx_http_upstream_init_pt original_init_upstream; + ngx_http_upstream_init_peer_pt original_init_peer; +}; + char* setExporter(ngx_conf_t* cf, ngx_command_t* cmd, void* conf); char* addResourceAttr(ngx_conf_t* cf, ngx_command_t* cmd, void* conf); char* addSpanAttr(ngx_conf_t* cf, ngx_command_t* cmd, void* conf); char* setTrustedCertificate(ngx_conf_t* cf, ngx_command_t* cmd, void* conf); char* addExporterHeader(ngx_conf_t* cf, ngx_command_t* cmd, void* conf); +char* onUpstreamConfiguration(ngx_conf_t* cf, ngx_command_t *cmd, void *conf); namespace Propagation { @@ -107,6 +127,11 @@ ngx_command_t gCommands[] = { addSpanAttr, NGX_HTTP_LOC_CONF_OFFSET }, + { ngx_string("otel_upstream_span_enable"), + NGX_HTTP_UPS_CONF|NGX_CONF_NOARGS, + onUpstreamConfiguration, + NGX_HTTP_SRV_CONF_OFFSET }, + ngx_null_command }; @@ -159,6 +184,11 @@ ngx_str_t toNgxStr(StrView str) return ngx_str_t{str.size(), (u_char*)str.data()}; } +uint64_t toNanoSec(time_t sec, ngx_msec_t msec) +{ + return (sec * 1000 + msec) * 1000000; +} + bool iremovePrefix(ngx_str_t* str, StrView p) { if (str->len >= p.size() && @@ -171,6 +201,10 @@ bool iremovePrefix(ngx_str_t* str, StrView p) return false; } +void cleanupOtelCtx(void* data) +{ +} + MainConf* getMainConf(ngx_conf_t* cf) { return static_cast( @@ -188,8 +222,21 @@ LocationConf* getLocationConf(ngx_http_request_t* r) return (LocationConf*)ngx_http_get_module_loc_conf(r, gHttpModule); } -void cleanupOtelCtx(void* data) +OtelCtx* createOtelCtx(ngx_http_request_t* r) { + static_assert(std::is_trivially_destructible::value, ""); + + auto storage = ngx_pool_cleanup_add(r->pool, sizeof(OtelCtx)); + if (storage == NULL) { + return NULL; + } + + storage->handler = cleanupOtelCtx; + + auto ctx = new (storage->data) OtelCtx{}; + ngx_http_set_ctx(r, ctx, gHttpModule); + + return ctx; } OtelCtx* getOtelCtx(ngx_http_request_t* r) @@ -198,11 +245,26 @@ OtelCtx* getOtelCtx(ngx_http_request_t* r) // restore module context if it was reset by e.g. internal redirect if (ctx == NULL && (r->internal || r->filter_finalize)) { - for (auto cln = r->pool->cleanup; cln; cln = cln->next) { if (cln->handler == cleanupOtelCtx) { - ctx = (OtelCtx*)cln->data; - ngx_http_set_ctx(r, ctx, gHttpModule); + + // restore module context if it was reset by finalize filter + if (r->filter_finalize) { + ctx = (OtelCtx*)cln->data; + ngx_http_set_ctx(r, ctx, gHttpModule); + + // create child context if it was reset by internal redirect + } else if (r->internal) { + auto ctx_orig = (OtelCtx*)cln->data; + ctx = createOtelCtx(r); + if (ctx == NULL) { + return NULL; + } + ctx->parent = ctx_orig->current; + ctx->current = + TraceContext::generate(false, ctx->parent); + } + break; } } @@ -211,23 +273,6 @@ OtelCtx* getOtelCtx(ngx_http_request_t* r) return ctx; } -OtelCtx* createOtelCtx(ngx_http_request_t* r) -{ - static_assert(std::is_trivially_destructible::value, ""); - - auto storage = ngx_pool_cleanup_add(r->pool, sizeof(OtelCtx)); - if (storage == NULL) { - return NULL; - } - - storage->handler = cleanupOtelCtx; - - auto ctx = new (storage->data) OtelCtx{}; - ngx_http_set_ctx(r, ctx, gHttpModule); - - return ctx; -} - ngx_table_elt_t* findHeader(ngx_list_t* list, ngx_uint_t hash, StrView key) { auto part = &list->part; @@ -338,6 +383,10 @@ OtelCtx* ensureOtelCtx(ngx_http_request_t* r) return ctx; } + if (r->internal) { + return NULL; + } + ctx = createOtelCtx(r); if (!ctx) { return NULL; @@ -355,14 +404,9 @@ OtelCtx* ensureOtelCtx(ngx_http_request_t* r) ngx_int_t onRequestStart(ngx_http_request_t* r) { - // don't let internal redirects to override sampling decision - if (r->internal) { - return NGX_DECLINED; - } - bool sampled = false; - auto lcf = getLocationConf(r); + if (lcf->trace != NULL) { ngx_str_t trace; if (ngx_http_complex_value(r, lcf->trace, &trace) != NGX_OK) { @@ -372,7 +416,7 @@ ngx_int_t onRequestStart(ngx_http_request_t* r) sampled = toStrView(trace) == "on" || toStrView(trace) == "1"; } - if (!lcf->traceContext && !sampled) { + if (!lcf->traceContext && !sampled && !r->internal) { return NGX_DECLINED; } @@ -381,6 +425,9 @@ ngx_int_t onRequestStart(ngx_http_request_t* r) return NGX_ERROR; } + if (r->internal) { + sampled = ctx->parent.sampled; + } ctx->current.sampled = sampled; ngx_int_t rc = NGX_OK; @@ -405,22 +452,19 @@ StrView getServerName(ngx_http_request_t* r) return toStrView(name); } -void addDefaultAttrs(BatchExporter::Span& span, ngx_http_request_t* r) +void addDefaultSrvAttrs(BatchExporter::Span& span, ngx_http_request_t* r) { // based on trace semantic conventions for HTTP from 1.16.0 OTel spec span.add("http.method", toStrView(r->method_name)); - span.add("http.target", toStrView(r->unparsed_uri)); - + span.add("http.scheme", r->connection->ssl ? "https" : "http"); auto clcf = (ngx_http_core_loc_conf_t*) ngx_http_get_module_loc_conf(r, ngx_http_core_module); if (clcf->name.len) { span.add("http.route", toStrView(clcf->name)); } - span.add("http.scheme", r->connection->ssl ? "https" : "http"); - auto protocol = toStrView(r->http_protocol); if (protocol.size() > 5) { // "HTTP/" span.add("http.flavor", protocol.substr(5)); @@ -433,20 +477,9 @@ void addDefaultAttrs(BatchExporter::Span& span, ngx_http_request_t* r) auto received = r->headers_in.content_length_n; span.add("http.request_content_length", received > 0 ? received : 0); - auto sent = r->connection->sent - (off_t)r->header_size; - span.add("http.response_content_length", sent > 0 ? sent : 0); - - auto status = r->err_status ? r->err_status : r->headers_out.status; - if (status) { - span.add("http.status_code", status); - - if (status >= 500) { - span.setError(); - } - } - span.add("net.host.name", getServerName(r)); - + span.add("net.sock.peer.addr", toStrView(r->connection->addr_text)); + span.add("net.sock.peer.port", ngx_inet_get_port(r->connection->sockaddr)); if (ngx_connection_local_sockaddr(r->connection, NULL, 0) == NGX_OK) { auto port = ngx_inet_get_port(r->connection->local_sockaddr); auto defaultPort = r->connection->ssl ? 443 : 80; @@ -456,12 +489,100 @@ void addDefaultAttrs(BatchExporter::Span& span, ngx_http_request_t* r) } } - span.add("net.sock.peer.addr", toStrView(r->connection->addr_text)); - span.add("net.sock.peer.port", ngx_inet_get_port(r->connection->sockaddr)); + auto sent = r->connection->sent - (off_t)r->header_size; + span.add("http.response_content_length", sent > 0 ? sent : 0); + + auto status = r->err_status ? r->err_status : r->headers_out.status; + if (status) { + span.add("http.status_code", status); + + if (status >= 500) { + span.setError(); + } + } } -StrView getSpanName(ngx_http_request_t* r) +void addDefaultClientAttrs(BatchExporter::Span& span, ngx_http_request_t* r) { + if (!r->upstream->peer.name) { + throw std::runtime_error("upstream peer has no name"); + } + + if (!r->upstream->schema.data || !r->upstream->schema.len) { + throw std::runtime_error("upstream has no schema"); + } + + if (!r->upstream->uri.data || !r->upstream->uri.len) { + throw std::runtime_error("upstream has no uri"); + } + + if (!r->upstream->upstream || !r->upstream->upstream->host.len) { + throw std::runtime_error("upstream has no upstream or empty host"); + } + + uint64_t addr_len = 0; + // set len at last instance of ':' + for (int i = 0; i < r->upstream->peer.name->len; i++) { + if (r->upstream->peer.name->data[i] == ':') { + addr_len = i; + } + } + + addr_len = addr_len ? addr_len : r->upstream->peer.name->len; + StrView addr((const char *) r->upstream->peer.name->data, addr_len); + span.add("server.address", addr); + + if (addr_len < r->upstream->peer.name->len) { + uint64_t port_len = (r->upstream->peer.name->len - addr_len) + 1; + StrView port((const char *) r->upstream->peer.name->data + addr_len + 1, + port_len); + span.add("server.port", port); + } + + uint64_t url_len = r->upstream->schema.len + + r->upstream->upstream->host.len + + r->upstream->uri.len + 3; + char *url_buf = (char *) ngx_palloc(r->pool, url_len); + if (!ngx_cpystrn((u_char *) url_buf, r->upstream->schema.data, + r->upstream->schema.len + 1)) { + throw std::runtime_error("failed to copy URL schema"); + } + char *cursor = url_buf + r->upstream->schema.len + 1; + if (!ngx_cpystrn((u_char *) cursor, r->upstream->upstream->host.data, + r->upstream->upstream->host.len + 1)) { + throw std::runtime_error("failed to copy URL host"); + } + cursor += r->upstream->upstream->host.len + 1; + if (!ngx_cpystrn((u_char *) cursor, r->upstream->uri.data, + r->upstream->uri.len + 1)) { + throw std::runtime_error("failed to copy URL path"); + } + + StrView url(url_buf, url_len); + span.add("url.full", url); + + auto sent = r->connection->sent - (off_t)r->header_size; + span.add("http.response_content_length", sent > 0 ? sent : 0); + + if (r->upstream->method.len) { + span.add("http.request.method", toStrView(r->upstream->method)); + } else { + span.add("http.request.method", toStrView(r->method_name)); + } + + auto status = r->err_status ? r->err_status : r->headers_out.status; + if (status) { + span.add("http.status_code", status); + + if (status >= 500) { + span.setError(); + } + } +} + +StrView getSrvSpanName( + ngx_http_request_t* r +) { auto lcf = getLocationConf(r); if (lcf->spanName) { @@ -469,12 +590,11 @@ StrView getSpanName(ngx_http_request_t* r) if (ngx_http_complex_value(r, lcf->spanName, &result) != NGX_OK) { throw std::runtime_error("failed to compute complex value"); } - return toStrView(result); + } else { auto clcf = (ngx_http_core_loc_conf_t*) ngx_http_get_module_loc_conf(r, ngx_http_core_module); - return toStrView(clcf->name); } } @@ -502,6 +622,21 @@ void addCustomAttrs(BatchExporter::Span& span, ngx_http_request_t* r) } } +// bool return for convenience in short circuiting +bool log_drop(ngx_http_request_t *r) { + static size_t dropped = 0; + static time_t lastLog = 0; + + ++dropped; + if (lastLog != ngx_time()) { + lastLog = ngx_time(); + ngx_log_error(NGX_LOG_NOTICE, r->connection->log, 0, + "OTel dropped records: %uz", dropped); + } + + return true; +} + ngx_int_t onRequestEnd(ngx_http_request_t* r) { auto ctx = getOtelCtx(r); @@ -511,31 +646,21 @@ ngx_int_t onRequestEnd(ngx_http_request_t* r) auto now = ngx_timeofday(); - auto toNanoSec = [](time_t sec, ngx_msec_t msec) -> uint64_t { - return (sec * 1000 + msec) * 1000000; - }; + // subrequests can only make upstream spans + if (r->internal) { + return NGX_DECLINED; + } try { - BatchExporter::SpanInfo info{ - getSpanName(r), ctx->current, ctx->parent.spanId, + BatchExporter::SpanInfo server{ + getSrvSpanName(r), ctx->current, ctx->parent.spanId, toNanoSec(r->start_sec, r->start_msec), - toNanoSec(now->sec, now->msec)}; - - bool ok = gExporter->add(info, [r](BatchExporter::Span& span) { - addDefaultAttrs(span, r); + toNanoSec(now->sec, now->msec), + BatchExporter::SpanInfo::SpanKind::SERVER}; + gExporter->add(server, [r](BatchExporter::Span& span) { + addDefaultSrvAttrs(span, r); addCustomAttrs(span, r); - }); - - if (!ok) { - static size_t dropped = 0; - static time_t lastLog = 0; - ++dropped; - if (lastLog != ngx_time()) { - lastLog = ngx_time(); - ngx_log_error(NGX_LOG_NOTICE, r->connection->log, 0, - "OTel dropped records: %uz", dropped); - } - } + }) || log_drop(r); } catch (const std::exception& e) { ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, @@ -546,6 +671,151 @@ ngx_int_t onRequestEnd(ngx_http_request_t* r) return NGX_DECLINED; } +ngx_int_t onUpstreamStart(ngx_peer_connection_t *pc, void *data) +{ + OtelUpstreamCtx *d = (OtelUpstreamCtx *) data; + auto now = ngx_timeofday(); + + if (!d) { + return NGX_ERROR; + } + + d->childProcessingStart = (now->sec * 1000 + now->msec) * 1000000; + + if (d->originalGetRequest) { + return d->originalGetRequest(pc, d->data); + } else { + return NGX_OK; + } +} + +void onUpstreamFinish(ngx_peer_connection_t *pc, void *data, ngx_uint_t state) +{ + OtelUpstreamCtx *d = (OtelUpstreamCtx *) data; + if (!d) { + return; + } + + auto now = ngx_timeofday(); + if (d->reqCtx && d->req) { + auto childContext = \ + TraceContext::generate(d->reqCtx->current.sampled, d->reqCtx->current); + BatchExporter::SpanInfo child{ + toStrView(d->req->uri), childContext, d->reqCtx->current.spanId, + d->childProcessingStart, toNanoSec(now->sec, now->msec), + BatchExporter::SpanInfo::SpanKind::CLIENT}; + gExporter->add(child, [d](BatchExporter::Span& span) { + addDefaultClientAttrs(span, d->req); + addCustomAttrs(span, d->req); + }) || log_drop(d->req); + } + + if (d->originalFreeRequest) { + d->originalFreeRequest(pc, d->data, state); + } +} + +void onUpstreamNotify(ngx_peer_connection_t *pc, void *data, ngx_uint_t type) +{ + OtelUpstreamCtx *d = (OtelUpstreamCtx *) data; + if (d && d->originalNotify) { + d->originalNotify(pc, d->data, type); + } +} + +ngx_int_t onUpstreamSetSes(ngx_peer_connection_t *pc, void *data) +{ + OtelUpstreamCtx *d = (OtelUpstreamCtx *) data; + if (d && d->originalSetSession) { + return d->originalSetSession(pc, d->data); + } + + // This path should never happen + return NGX_ERROR; +} + +void onUpstreamSaveSes(ngx_peer_connection_t *pc, void *data) +{ + OtelUpstreamCtx *d = (OtelUpstreamCtx *) data; + if (d && d->originalSaveSession) { + d->originalSaveSession(pc, d->data); + } +} + +ngx_int_t +onUpstreamInitPeer(ngx_http_request_t *r, ngx_http_upstream_srv_conf_t *us) +{ + if (!r->upstream) { + return NGX_DECLINED; + } + + // run original init func + auto kcf = (ServerConf *) ngx_http_conf_upstream_srv_conf(us, gHttpModule); + if (kcf->original_init_peer && kcf->original_init_peer(r, us) != NGX_OK) { + return NGX_ERROR; + } + + auto ctx = getOtelCtx(r); + if (!ctx) { + return NGX_ERROR; + } + + if (ctx->current.sampled) { + auto dat = (OtelUpstreamCtx *) ngx_palloc(r->pool, sizeof(OtelUpstreamCtx)); + dat->data = r->upstream->peer.data; + dat->req = r; + dat->reqCtx = ctx; + dat->originalGetRequest = r->upstream->peer.get; + dat->originalFreeRequest = r->upstream->peer.free; + dat->originalNotify = r->upstream->peer.notify; + + r->upstream->peer.data = dat; + r->upstream->peer.get = onUpstreamStart; + r->upstream->peer.free = onUpstreamFinish; + r->upstream->peer.notify = onUpstreamNotify; + + dat->originalSetSession = r->upstream->peer.set_session; + dat->originalSaveSession = r->upstream->peer.save_session; + r->upstream->peer.set_session = onUpstreamSetSes; + r->upstream->peer.save_session = onUpstreamSaveSes; + } + + return NGX_OK; +} + +ngx_int_t +onUpstreamInit(ngx_conf_t *cf, ngx_http_upstream_srv_conf_t *us) +{ + ServerConf *conf = + (ServerConf *) ngx_http_conf_upstream_srv_conf(us, gHttpModule); + + if (conf->original_init_upstream + && conf->original_init_upstream(cf, us) != NGX_OK) { + return NGX_ERROR; + } + + conf->original_init_peer = us->peer.init; + us->peer.init = onUpstreamInitPeer; + + return NGX_OK; +} + +char * +onUpstreamConfiguration(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) +{ + ngx_http_upstream_srv_conf_t *uscf; + ServerConf *sc = (ServerConf *) conf; + + uscf = (ngx_http_upstream_srv_conf_t *) + ngx_http_conf_get_module_srv_conf(cf, ngx_http_upstream_module); + sc->original_init_upstream = + (ngx_http_upstream_init_pt) uscf->peer.init_upstream + ? uscf->peer.init_upstream + : ngx_http_upstream_init_round_robin; + uscf->peer.init_upstream = &onUpstreamInit; + return NGX_CONF_OK; +} + ngx_int_t initModule(ngx_conf_t* cf) { auto cmcf = (ngx_http_core_main_conf_t*)ngx_http_conf_get_module_main_conf( @@ -972,6 +1242,12 @@ char* mergeLocationConf(ngx_conf_t* cf, void* parent, void* child) return NGX_CONF_OK; } +void * +createSrvConf(ngx_conf_t *cf) +{ + return ngx_pcalloc(cf->pool, sizeof(ServerConf)); +} + ngx_http_module_t gHttpModuleCtx = { addVariables, /* preconfiguration */ initModule, /* postconfiguration */ @@ -979,7 +1255,7 @@ ngx_http_module_t gHttpModuleCtx = { createMainConf, /* create main configuration */ initMainConf, /* init main configuration */ - NULL, /* create server configuration */ + createSrvConf, /* create server configuration */ NULL, /* merge server configuration */ createLocationConf, /* create location configuration */ diff --git a/tests/test_otel.py b/tests/test_otel.py index fef771a5..f1468ddf 100644 --- a/tests/test_otel.py +++ b/tests/test_otel.py @@ -32,6 +32,11 @@ otel_trace on; {{ resource_attrs }} + upstream uptest { + otel_upstream_span_enable; + server 127.0.0.1:18080; + } + server { listen 127.0.0.1:18443 ssl; listen 127.0.0.1:18443 quic; @@ -93,6 +98,12 @@ add_header "X-Otel-Tracestate" $http_tracestate; return 204; } + + location /upstream_trace { + otel_trace on; + otel_trace_context inject; + proxy_pass http://uptest/ok; + } } } @@ -140,7 +151,7 @@ def get_http09(host, port, path): assert get_http09("127.0.0.1", 18080, "/ok") == "OK" - span = trace_service.get_span() + span = trace_service.get_span(1) assert span.name == "/ok" @@ -154,8 +165,7 @@ def test_default_attributes(client, trace_service, http_ver, path, status): if http_ver == "3.0": client.quic_cache_layer.add_domain("127.0.0.1", port) r = client.get(f"{scheme}://127.0.0.1:{port}{path}", verify=False) - - span = trace_service.get_span() + span = trace_service.get_span(1) assert span.name == path assert get_attr(span, "http.method") == "GET" @@ -178,7 +188,7 @@ def test_default_attributes(client, trace_service, http_ver, path, status): def test_custom_attributes(client, trace_service): assert client.get("http://127.0.0.1:18080/custom").status_code == 200 - span = trace_service.get_span() + span = trace_service.get_span(1) assert span.name == "custom_location" assert get_attr(span, "http.request.completion") == "OK" @@ -193,12 +203,22 @@ def test_trace_off(client, trace_service): time.sleep(0.01) # wait for spans assert len(trace_service.batches) == 0 +def test_upstream_tracing(client, trace_service): + assert client.get("http://127.0.0.1:18080/upstream_trace").status_code == 200 + spans = trace_service.get_span(3) + client_spans = [x for x in spans if x.kind.__str__() == '2'] + server_span = [x for x in spans if x.kind.__str__() == '3'] + assert len(client_spans) == 2 + assert len(server_span) == 1 + for i in client_spans: + assert get_attr(i, "http.status_code") == 200 + @pytest.mark.parametrize("parent", [None, parent_ctx]) def test_variables(client, trace_service, parent): r = client.get("http://127.0.0.1:18080/vars", headers=trace_headers(parent)) - span = trace_service.get_span() + span = trace_service.get_span(1) if parent: assert span.trace_id.hex() == parent.trace_id @@ -220,7 +240,7 @@ def test_context(client, trace_service, parent, path): r = client.get(f"http://127.0.0.1:18080{path}", headers=headers) - span = trace_service.get_span() + span = trace_service.get_span(1) if path in ["/extract", "/propagate"] and parent: assert span.trace_id.hex() == parent.trace_id @@ -308,7 +328,7 @@ def test_custom_resource_attributes(client, trace_service): def test_exporter_headers(client, trace_service): assert client.get("http://127.0.0.1:18080/ok").status_code == 200 - assert trace_service.get_span().name == "/ok" + assert trace_service.get_span(1).name == "/ok" headers = dict(trace_service.last_metadata) assert headers["x-api-token"] == "api.value" @@ -328,4 +348,4 @@ def test_exporter_headers(client, trace_service): def test_tls_export(client, trace_service): assert client.get("http://127.0.0.1:18080/ok").status_code == 200 - assert trace_service.get_span().name == "/ok" + assert trace_service.get_span(1).name == "/ok" diff --git a/tests/trace_service.py b/tests/trace_service.py index 5ef2bc67..f16a3f94 100644 --- a/tests/trace_service.py +++ b/tests/trace_service.py @@ -19,16 +19,20 @@ def get_batch(self): for _ in range(10): if len(self.batches): break - time.sleep(0.001) + time.sleep(1) assert len(self.batches) == 1 assert len(self.batches[0]) == 1 return self.batches.pop()[0] - def get_span(self): + def get_span(self, n): batch = self.get_batch() assert len(batch.scope_spans) == 1 - assert len(batch.scope_spans[0].spans) == 1 - return batch.scope_spans[0].spans.pop() + l = len(batch.scope_spans[0].spans) + assert l == n + s = batch.scope_spans[0].spans[l - n:] + for _ in range(n): + batch.scope_spans[0].spans.pop() + return s[0] if len(s) == 1 else s @pytest.fixture(scope="module")