1
1
import functools
2
2
import inspect
3
+ from collections .abc import Awaitable , Callable , Sequence
3
4
from dataclasses import dataclass
4
- from datetime import datetime , timezone
5
- from pathlib import Path
6
- from typing import Awaitable , Callable , Concatenate , ParamSpec , TypeAlias , TypeVar
5
+ from typing import Concatenate , Literal , ParamSpec , TypeAlias , TypeVar
7
6
8
- import orjson
9
- import structlog as slog
10
7
from latch_config .config import DatadogConfig , LoggingMode , read_config
11
8
from opentelemetry import trace
12
9
from opentelemetry .exporter .otlp .proto .grpc .trace_exporter import OTLPSpanExporter
13
10
from opentelemetry .sdk .resources import Attributes , LabelValue , Resource
14
- from opentelemetry .sdk .trace import TracerProvider
15
- from opentelemetry .sdk .trace .export import BatchSpanProcessor
11
+ from opentelemetry .sdk .trace import ReadableSpan , TracerProvider
12
+ from opentelemetry .sdk .trace .export import (
13
+ BatchSpanProcessor ,
14
+ ConsoleSpanExporter ,
15
+ SimpleSpanProcessor ,
16
+ SpanExporter ,
17
+ SpanExportResult ,
18
+ )
16
19
from opentelemetry .trace import Tracer
17
- from opentelemetry .trace .span import INVALID_SPAN , INVALID_SPAN_CONTEXT , Span
18
- from structlog .types import EventDict , WrappedLogger
20
+ from opentelemetry .trace .span import Span
19
21
20
22
21
- @dataclass (frozen = True )
22
- class Config :
23
- datadog : DatadogConfig
24
- logging_mode : LoggingMode = LoggingMode .console_json
23
+ class NoopSpanExporter (SpanExporter ):
24
+ def export (self , spans : Sequence [ReadableSpan ]) -> SpanExportResult :
25
+ return SpanExportResult .SUCCESS
25
26
27
+ def shutdown (self ) -> None : ...
26
28
27
- config = read_config ( Config )
28
-
29
+ def force_flush ( self , timeout_millis : int = 30000 ) -> bool :
30
+ return True
29
31
30
- def add_timestamp (logger : WrappedLogger , name : str , x : EventDict ) -> EventDict :
31
- # the default slog.processors.TimeStamper is not great with timezones
32
- utc = datetime .now (timezone .utc )
33
- x ["timestamp" ] = utc .astimezone ().isoformat ()
34
- return x
35
32
33
+ def format_console_span (x : ReadableSpan ) -> str :
34
+ ctx = x .context
36
35
37
- def add_dd_otel ( logger : WrappedLogger , name : str , x : EventDict ) -> EventDict :
38
- s = trace . get_current_span ()
39
- if s == INVALID_SPAN :
40
- return x
36
+ parent_ctx = x . parent
37
+ parent_id = " root "
38
+ if parent_ctx is not None :
39
+ parent_id = f" { parent_ctx . span_id :x } "
41
40
42
- ctx = s .get_span_context ()
43
- if ctx == INVALID_SPAN_CONTEXT :
44
- return x
41
+ s_id = "?" * 16
42
+ if ctx is not None :
43
+ s_id = f"{ ctx .span_id :x} "
44
+ return f"<{ parent_id } > |- <{ s_id } > { x .name } \n "
45
45
46
- x ["trace_id" ] = format (ctx .trace_id , "032x" )
47
- x ["span_id" ] = format (ctx .span_id , "016x" )
48
46
49
- x ["dd.trace_id" ] = str (ctx .trace_id & 0xFFFF_FFFF_FFFF_FFFF )
50
- x ["dd.span_id" ] = str (ctx .span_id )
51
-
52
- x ["dd.service" ] = config .datadog .service_name
53
- x ["dd.version" ] = config .datadog .service_version
54
- x ["dd.env" ] = config .datadog .deployment_environment
47
+ @dataclass (frozen = True )
48
+ class Config :
49
+ datadog : DatadogConfig
50
+ logging_mode : LoggingMode = LoggingMode .console_json
55
51
56
- return x
57
52
53
+ config = read_config (Config )
58
54
59
55
app_tracer : Tracer
60
- # fixme(maximsmol): add support for .exception
61
- log : slog .stdlib .BoundLogger
62
56
63
57
64
- def setup ():
65
- if config .logging_mode == LoggingMode .file_json :
66
- log_file = Path (__file__ ).parent .parent / "o11y_debug" / "log.json"
67
- else :
68
- log_file = Path ("/dev/stdout" )
69
-
58
+ def setup (* , span_exporter : Literal ["otlp" , "console" , "noop" ] = "otlp" ) -> None :
70
59
service_data : Attributes = {
71
60
"service.name" : config .datadog .service_name ,
72
61
"service.version" : config .datadog .service_version ,
73
62
"deployment.environment" : config .datadog .deployment_environment ,
74
63
}
75
64
76
65
tracer_provider = TracerProvider (resource = Resource (service_data ))
77
- tracer_provider .add_span_processor (BatchSpanProcessor (OTLPSpanExporter ()))
66
+
67
+ if span_exporter == "otlp" :
68
+ tracer_provider .add_span_processor (BatchSpanProcessor (OTLPSpanExporter ()))
69
+ elif span_exporter == "console" :
70
+ tracer_provider .add_span_processor (
71
+ SimpleSpanProcessor (ConsoleSpanExporter (formatter = format_console_span ))
72
+ )
73
+ elif span_exporter == "noop" :
74
+ tracer_provider .add_span_processor (SimpleSpanProcessor (NoopSpanExporter ()))
75
+
78
76
trace .set_tracer_provider (tracer_provider )
79
77
80
78
global app_tracer
81
79
# todo(maximsmol): setup trace sampling based on datadog settings
82
80
# todo(maximsmol): port over stuff from https://github.com/open-telemetry/opentelemetry-python-contrib/blob/934af7ea4f9b1e0294ced6a014d6eefdda156b2b/exporter/opentelemetry-exporter-datadog/src/opentelemetry/exporter/datadog/exporter.py
83
81
app_tracer = trace .get_tracer (__name__ )
84
82
85
- slog .configure_once (
86
- processors = [
87
- # todo(maximsmol): allow named loggers (needs a custom logging base)
88
- # slog.stdlib.add_logger_name,
89
- slog .stdlib .add_log_level ,
90
- add_timestamp ,
91
- add_dd_otel ,
92
- ]
93
- + (
94
- [slog .dev .ConsoleRenderer ()]
95
- if config .logging_mode == LoggingMode .console
96
- else [
97
- slog .processors .ExceptionRenderer (
98
- slog .tracebacks .ExceptionDictTransformer ()
99
- ),
100
- slog .processors .JSONRenderer (orjson .dumps ),
101
- ]
102
- ),
103
- logger_factory = slog .BytesLoggerFactory (file = log_file .open ("wb" ))
104
- if config .logging_mode != LoggingMode .console
105
- else None ,
106
- wrapper_class = slog .stdlib .AsyncBoundLogger ,
107
- cache_logger_on_first_use = True ,
108
- )
109
-
110
- global log
111
- log = slog .stdlib .get_logger ()
112
-
113
83
114
84
T = TypeVar ("T" )
115
85
P = ParamSpec ("P" )
@@ -121,7 +91,7 @@ def _trace_function_with_span_async(
121
91
[Callable [Concatenate [Span , P ], Awaitable [T ]]], Callable [P , Awaitable [T ]]
122
92
]:
123
93
def decorator (
124
- f : Callable [Concatenate [Span , P ], Awaitable [T ]]
94
+ f : Callable [Concatenate [Span , P ], Awaitable [T ]],
125
95
) -> Callable [P , Awaitable [T ]]:
126
96
@functools .wraps (f )
127
97
async def inner (* args : P .args , ** kwargs : P .kwargs ) -> T :
@@ -163,7 +133,7 @@ def inner(*args: P.args, **kwargs: P.kwargs) -> T:
163
133
164
134
165
135
def trace_app_function_with_span (
166
- f : Callable [Concatenate [Span , P ], T ]
136
+ f : Callable [Concatenate [Span , P ], T ],
167
137
) -> Callable [P , T ]:
168
138
return trace_function_with_span (app_tracer )(f )
169
139
@@ -201,13 +171,13 @@ def trace_app_function(f: Callable[P, T]) -> Callable[P, T]:
201
171
return trace_function (app_tracer )(f )
202
172
203
173
204
- AttributesDict : TypeAlias = dict [str , LabelValue | " AttributesDict" ]
174
+ AttributesDict : TypeAlias = " dict[str, LabelValue | AttributesDict]"
205
175
206
176
207
177
def dict_to_attrs (x : AttributesDict , prefix : str ) -> Attributes :
208
178
res : Attributes = {}
209
179
210
- def inner (x : LabelValue | AttributesDict , prefix : str ):
180
+ def inner (x : LabelValue | AttributesDict , prefix : str ) -> None :
211
181
if isinstance (x , list ):
212
182
for i , y in enumerate (x ):
213
183
inner (y , f"{ prefix } .{ i } " )
0 commit comments