1- import type { IntegrationFn , Span , SpanV2JSON } from '@sentry/core' ;
1+ import type { Client , IntegrationFn , Span , SpanV2JSON } from '@sentry/core' ;
22import {
33 createSpanV2Envelope ,
44 debug ,
55 defineIntegration ,
66 getDynamicSamplingContextFromSpan ,
7+ getRootSpan as getSegmentSpan ,
78 isV2BeforeSendSpanCallback ,
9+ reparentChildSpans ,
10+ shouldIgnoreSpan ,
11+ showSpanDropWarning ,
812 spanToV2JSON ,
913} from '@sentry/core' ;
1014import { DEBUG_BUILD } from '../debug-build' ;
@@ -13,7 +17,7 @@ export interface SpanStreamingOptions {
1317 batchLimit : number ;
1418}
1519
16- const _spanStreamingIntegration = ( ( userOptions ?: Partial < SpanStreamingOptions > ) => {
20+ export const spanStreamingIntegration = defineIntegration ( ( ( userOptions ?: Partial < SpanStreamingOptions > ) => {
1721 const validatedUserProvidedBatchLimit =
1822 userOptions ?. batchLimit && userOptions . batchLimit <= 1000 && userOptions . batchLimit >= 1
1923 ? userOptions . batchLimit
@@ -31,7 +35,8 @@ const _spanStreamingIntegration = ((userOptions?: Partial<SpanStreamingOptions>)
3135 ...userOptions ,
3236 } ;
3337
34- const traceMap = new Map < string , Set < Span > > ( ) ;
38+ // key: traceId-segmentSpanId
39+ const spanTreeMap = new Map < string , Set < Span > > ( ) ;
3540
3641 return {
3742 name : 'SpanStreaming' ,
@@ -54,57 +59,118 @@ const _spanStreamingIntegration = ((userOptions?: Partial<SpanStreamingOptions>)
5459 }
5560
5661 client . on ( 'spanEnd' , span => {
57- const spanBuffer = traceMap . get ( span . spanContext ( ) . traceId ) ;
62+ const spanTreeMapKey = getSpanTreeMapKey ( span ) ;
63+ const spanBuffer = spanTreeMap . get ( spanTreeMapKey ) ;
5864 if ( spanBuffer ) {
5965 spanBuffer . add ( span ) ;
6066 } else {
61- traceMap . set ( span . spanContext ( ) . traceId , new Set ( [ span ] ) ) ;
67+ spanTreeMap . set ( spanTreeMapKey , new Set ( [ span ] ) ) ;
6268 }
6369 } ) ;
6470
6571 // For now, we send all spans on local segment (root) span end.
6672 // TODO: This will change once we have more concrete ideas about a universal SDK data buffer.
67- client . on ( 'segmentSpanEnd' , segmentSpan => {
68- const traceId = segmentSpan . spanContext ( ) . traceId ;
69- const spansOfTrace = traceMap . get ( traceId ) ;
73+ client . on (
74+ 'segmentSpanEnd' ,
75+ segmentSpan => ( ) =>
76+ processAndSendSpans ( segmentSpan , {
77+ spanTreeMap : spanTreeMap ,
78+ client,
79+ batchLimit : options . batchLimit ,
80+ beforeSendSpan,
81+ } ) ,
82+ ) ;
83+ } ,
84+ } ;
85+ } ) satisfies IntegrationFn ) ;
7086
71- if ( ! spansOfTrace ?. size ) {
72- traceMap . delete ( traceId ) ;
73- return ;
74- }
87+ interface SpanProcessingOptions {
88+ client : Client ;
89+ spanTreeMap : Map < string , Set < Span > > ;
90+ batchLimit : number ;
91+ beforeSendSpan : ( ( span : SpanV2JSON ) => SpanV2JSON ) | undefined ;
92+ }
7593
76- const serializedSpans = Array . from ( spansOfTrace ?? [ ] ) . map ( span => {
77- const serializedSpan = spanToV2JSON ( span ) ;
78- return beforeSendSpan ? beforeSendSpan ( serializedSpan ) : serializedSpan ;
79- } ) ;
94+ function getSpanTreeMapKey ( span : Span ) : string {
95+ return `${ span . spanContext ( ) . traceId } -${ getSegmentSpan ( span ) . spanContext ( ) . spanId } ` ;
96+ }
8097
81- const batches : SpanV2JSON [ ] [ ] = [ ] ;
82- for ( let i = 0 ; i < serializedSpans . length ; i += options . batchLimit ) {
83- batches . push ( serializedSpans . slice ( i , i + options . batchLimit ) ) ;
84- }
98+ function processAndSendSpans (
99+ segmentSpan : Span ,
100+ { client, spanTreeMap, batchLimit, beforeSendSpan } : SpanProcessingOptions ,
101+ ) : void {
102+ const traceId = segmentSpan . spanContext ( ) . traceId ;
103+ const spanTreeMapKey = getSpanTreeMapKey ( segmentSpan ) ;
104+ const spansOfTrace = spanTreeMap . get ( spanTreeMapKey ) ;
105+
106+ if ( ! spansOfTrace ?. size ) {
107+ spanTreeMap . delete ( spanTreeMapKey ) ;
108+ return ;
109+ }
85110
86- DEBUG_BUILD &&
87- debug . log ( `Sending trace ${ traceId } in ${ batches . length } batche${ batches . length === 1 ? '' : 's' } ` ) ;
111+ const { ignoreSpans } = client . getOptions ( ) ;
88112
89- // TODO: Apply scopes to spans
90- // TODO: Apply ignoreSpans to spans
113+ // TODO: Apply scopes to spans
91114
92- const dsc = getDynamicSamplingContextFromSpan ( segmentSpan ) ;
115+ // 1. Check if the entire span tree is ignored by ignoreSpans
116+ const segmentSpanJson = spanToV2JSON ( segmentSpan ) ;
117+ if ( ignoreSpans ?. length && shouldIgnoreSpan ( segmentSpanJson , ignoreSpans ) ) {
118+ client . recordDroppedEvent ( 'before_send' , 'span' , spansOfTrace . size ) ;
119+ spanTreeMap . delete ( spanTreeMapKey ) ;
120+ return ;
121+ }
93122
94- for ( const batch of batches ) {
95- const envelope = createSpanV2Envelope ( batch , dsc , client ) ;
96- // no need to handle client reports for network errors,
97- // buffer overflows or rate limiting here. All of this is handled
98- // by client and transport.
99- client . sendEnvelope ( envelope ) . then ( null , reason => {
100- DEBUG_BUILD && debug . error ( 'Error while sending span stream envelope:' , reason ) ;
101- } ) ;
102- }
123+ const serializedSpans = Array . from ( spansOfTrace ?? [ ] ) . map ( spanToV2JSON ) ;
103124
104- traceMap . delete ( traceId ) ;
105- } ) ;
106- } ,
107- } ;
108- } ) satisfies IntegrationFn ;
125+ const processedSpans = [ ] ;
126+ let ignoredSpanCount = 0 ;
127+
128+ for ( const span of serializedSpans ) {
129+ // 2. Check if child spans should be ignored
130+ const isChildSpan = span . span_id !== segmentSpan . spanContext ( ) . spanId ;
131+ if ( ignoreSpans ?. length && isChildSpan && shouldIgnoreSpan ( span , ignoreSpans ) ) {
132+ reparentChildSpans ( serializedSpans , span ) ;
133+ ignoredSpanCount ++ ;
134+ // drop this span by not adding it to the processedSpans array
135+ continue ;
136+ }
109137
110- export const spanStreamingIntegration = defineIntegration ( _spanStreamingIntegration ) ;
138+ // 3. Apply beforeSendSpan callback
139+ const processedSpan = beforeSendSpan ? applyBeforeSendSpanCallback ( span , beforeSendSpan ) : span ;
140+ processedSpans . push ( processedSpan ) ;
141+ }
142+
143+ if ( ignoredSpanCount ) {
144+ client . recordDroppedEvent ( 'before_send' , 'span' , ignoredSpanCount ) ;
145+ }
146+
147+ const batches : SpanV2JSON [ ] [ ] = [ ] ;
148+ for ( let i = 0 ; i < processedSpans . length ; i += batchLimit ) {
149+ batches . push ( processedSpans . slice ( i , i + batchLimit ) ) ;
150+ }
151+
152+ DEBUG_BUILD && debug . log ( `Sending trace ${ traceId } in ${ batches . length } batche${ batches . length === 1 ? '' : 's' } ` ) ;
153+
154+ const dsc = getDynamicSamplingContextFromSpan ( segmentSpan ) ;
155+
156+ for ( const batch of batches ) {
157+ const envelope = createSpanV2Envelope ( batch , dsc , client ) ;
158+ // no need to handle client reports for network errors,
159+ // buffer overflows or rate limiting here. All of this is handled
160+ // by client and transport.
161+ client . sendEnvelope ( envelope ) . then ( null , reason => {
162+ DEBUG_BUILD && debug . error ( 'Error while sending span stream envelope:' , reason ) ;
163+ } ) ;
164+ }
165+
166+ spanTreeMap . delete ( spanTreeMapKey ) ;
167+ }
168+
169+ function applyBeforeSendSpanCallback ( span : SpanV2JSON , beforeSendSpan : ( span : SpanV2JSON ) => SpanV2JSON ) : SpanV2JSON {
170+ const modifedSpan = beforeSendSpan ( span ) ;
171+ if ( ! modifedSpan ) {
172+ showSpanDropWarning ( ) ;
173+ return span ;
174+ }
175+ return modifedSpan ;
176+ }
0 commit comments