Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""OTLP Exporter"""
"""OTLP Exporter

This module provides a mixin class for OTLP exporters that send telemetry data
to an OTLP-compatible receiver via gRPC. It includes configurable reconnection
logic to handle transient collector outages.


"""

import random
import threading
Expand Down Expand Up @@ -251,20 +258,27 @@ def _get_credentials(
if certificate_file:
client_key_file = environ.get(client_key_file_env_key)
client_certificate_file = environ.get(client_certificate_file_env_key)
return _load_credentials(
credentials = _load_credentials(
certificate_file, client_key_file, client_certificate_file
)
if credentials is not None:
return credentials
return ssl_channel_credentials()


# pylint: disable=no-member
class OTLPExporterMixin(
ABC, Generic[SDKDataT, ExportServiceRequestT, ExportResultT, ExportStubT]
):
"""OTLP span exporter
"""OTLP gRPC exporter mixin.

This class provides the base functionality for OTLP exporters that send
telemetry data (for example, spans or metrics) to an OTLP-compatible
receiver via gRPC. It includes a configurable reconnection mechanism to
handle transient receiver outages.

Args:
endpoint: OpenTelemetry Collector receiver endpoint
endpoint: OTLP-compatible receiver endpoint
insecure: Connection type
credentials: ChannelCredentials object for server authentication
headers: Headers to send when exporting
Expand Down Expand Up @@ -308,6 +322,8 @@ def __init__(
if parsed_url.netloc:
self._endpoint = parsed_url.netloc

self._insecure = insecure
self._credentials = credentials
self._headers = headers or environ.get(OTEL_EXPORTER_OTLP_HEADERS)
if isinstance(self._headers, str):
temp_headers = parse_env_headers(self._headers, liberal=True)
Expand Down Expand Up @@ -336,37 +352,51 @@ def __init__(
)
self._collector_kwargs = None

compression = (
self._compression = (
environ_to_compression(OTEL_EXPORTER_OTLP_COMPRESSION)
if compression is None
else compression
) or Compression.NoCompression

if insecure:
self._channel = insecure_channel(
self._endpoint,
compression=compression,
options=self._channel_options,
)
else:
self._channel = None
self._client = None

self._shutdown_in_progress = threading.Event()
self._shutdown = False

if not self._insecure:
self._credentials = _get_credentials(
credentials,
self._credentials,
_OTEL_PYTHON_EXPORTER_OTLP_GRPC_CREDENTIAL_PROVIDER,
OTEL_EXPORTER_OTLP_CERTIFICATE,
OTEL_EXPORTER_OTLP_CLIENT_KEY,
OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE,
)

self._initialize_channel_and_stub()

def _initialize_channel_and_stub(self):
    """Create a new gRPC channel and the client stub bound to it.

    Called at the end of ``__init__`` and again by the export path when an
    ``UNAVAILABLE`` error triggers a reconnection.

    An insecure channel is opened when ``self._insecure`` is truthy;
    otherwise a secure channel is opened with ``self._credentials``. Both
    variants use ``self._endpoint``, ``self._compression`` and
    ``self._channel_options``. The results are stored on ``self._channel``
    and ``self._client``.
    """
    if self._insecure:
        self._channel = insecure_channel(
            self._endpoint,
            compression=self._compression,
            options=self._channel_options,
        )
    else:
        self._channel = secure_channel(
            self._endpoint,
            self._credentials,
            # Must be the stored attribute: the constructor's local
            # ``compression`` name does not exist inside this method.
            compression=self._compression,
            options=self._channel_options,
        )
    self._client = self._stub(self._channel)  # type: ignore [reportCallIssue]

self._shutdown_in_progress = threading.Event()
self._shutdown = False

@abstractmethod
def _translate_data(
self,
Expand Down Expand Up @@ -407,6 +437,25 @@ def _export(
retry_info.retry_delay.seconds
+ retry_info.retry_delay.nanos / 1.0e9
)

# On an UNAVAILABLE error during the first attempt (retry_num == 0), reinitialize the channel to force reconnection
if error.code() == StatusCode.UNAVAILABLE and retry_num == 0: # type: ignore
logger.debug(
"Reinitializing gRPC channel for %s exporter due to UNAVAILABLE error",
self._exporting,
)
try:
self._channel.close()
except Exception as e:
logger.debug(
"Error closing channel for %s exporter to %s: %s",
self._exporting,
self._endpoint,
str(e),
)
# Build a fresh channel and stub so that subsequent calls use a new connection
self._initialize_channel_and_stub()

if (
error.code() not in _RETRYABLE_ERROR_CODES # type: ignore [reportAttributeAccessIssue]
or retry_num + 1 == _MAX_RETRYS
Expand Down Expand Up @@ -436,6 +485,12 @@ def _export(
return self._result.FAILURE # type: ignore [reportReturnType]

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
"""
Shut down the exporter.

Args:
timeout_millis: Timeout in milliseconds for shutting down the exporter.
"""
if self._shutdown:
logger.warning("Exporter already shutdown, ignoring call")
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from unittest import TestCase
from unittest.mock import Mock, patch

import grpc
from google.protobuf.duration_pb2 import ( # pylint: disable=no-name-in-module
Duration,
)
Expand Down Expand Up @@ -89,8 +90,8 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
def _exporting(self):
return "traces"

def shutdown(self, timeout_millis: float = 30_000, **kwargs):
    """Delegate to ``OTLPExporterMixin.shutdown``, forwarding all arguments.

    Args:
        timeout_millis: Timeout in milliseconds passed through to the mixin.
        **kwargs: Extra keyword arguments passed through unchanged.
    """
    return OTLPExporterMixin.shutdown(self, timeout_millis, **kwargs)


class TraceServiceServicerWithExportParams(TraceServiceServicer):
Expand Down Expand Up @@ -511,6 +512,16 @@ def test_timeout_set_correctly(self):
self.assertEqual(mock_trace_service.num_requests, 2)
self.assertAlmostEqual(after - before, 1.4, 1)

def test_channel_options_set_correctly(self):
    """Test that the exporter forwards its channel options to gRPC."""
    # pylint: disable=protected-access
    # Patch the channel factory so that no real connection is opened and
    # the arguments it receives can be inspected.
    with patch(
        "opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel"
    ) as mock_channel:
        exporter = OTLPSpanExporterForTesting(insecure=True)

    self.assertTrue(mock_channel.called)
    # Asserting only ``called`` would pass even if the options were never
    # handed over, so check the keyword argument itself.
    _, kwargs = mock_channel.call_args
    self.assertIn("options", kwargs)
    self.assertEqual(kwargs["options"], exporter._channel_options)

def test_otlp_headers_from_env(self):
# pylint: disable=protected-access
# This ensures that there is no other header than standard user-agent.
Expand All @@ -534,3 +545,27 @@ def test_permanent_failure(self):
warning.records[-1].message,
"Failed to export traces to localhost:4317, error code: StatusCode.ALREADY_EXISTS",
)

def test_unavailable_reconnects(self):
"""Test that the exporter reconnects on UNAVAILABLE error"""
add_TraceServiceServicer_to_server(
TraceServiceServicerWithExportParams(StatusCode.UNAVAILABLE),
self.server,
)

# Spy on grpc.insecure_channel to verify it's called for reconnection
with patch(
"opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel",
side_effect=grpc.insecure_channel,
) as mock_insecure_channel:
# Mock sleep to avoid waiting
with patch("time.sleep"):
# We expect FAILURE because the server keeps returning UNAVAILABLE
# but we want to verify reconnection attempts happened
self.exporter.export([self.span])

# Verify that we attempted to reinitialize the channel (called insecure_channel)
# Since the initial channel was created in setUp (unpatched), this call
# must be from the reconnection logic.
self.assertTrue(mock_insecure_channel.called)
# Verify that reconnection enabled flag is set