Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ jobs:
- integration-s3-v0
- integration-opensearch-v0
- integration-kafka-v0
- integration-kafka-connect-v0
juju-version:
- juju-bootstrap-option: "2.9.51"
juju-snap-channel: "2.9/stable"
Expand Down Expand Up @@ -103,6 +104,8 @@ jobs:
ubuntu-versions: {series: focal}
- tox-environments: integration-kafka-v0
ubuntu-versions: {series: focal}
- tox-environments: integration-kafka-connect-v0
ubuntu-versions: {series: focal}
name: ${{ matrix.tox-environments }} Juju ${{ matrix.juju-version.juju-snap-channel}} -- ${{ matrix.ubuntu-versions.series }}
needs:
- lint
Expand Down
210 changes: 209 additions & 1 deletion lib/charms/data_platform_libs/v0/data_interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ def _on_subject_requested(self, event: SubjectRequestedEvent):
from typing import (
Callable,
Dict,
Final,
ItemsView,
KeysView,
List,
Expand Down Expand Up @@ -447,7 +448,7 @@ def _on_subject_requested(self, event: SubjectRequestedEvent):

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 52
LIBPATCH = 53

PYDEPS = ["ops>=2.0.0"]

Expand Down Expand Up @@ -4450,6 +4451,213 @@ def __init__(
KarapaceRequirerEventHandlers.__init__(self, charm, self)


# Kafka Connect Events


class KafkaConnectProvidesEvent(RelationEvent):
"""Base class for Kafka Connect Provider events."""

@property
def plugin_url(self) -> Optional[str]:
"""Returns the REST endpoint URL which serves the connector plugin."""
if not self.relation.app:
return None

return self.relation.data[self.relation.app].get("plugin-url")


class IntegrationRequestedEvent(KafkaConnectProvidesEvent):
"""Event emitted when a new integrator boots up and is ready to serve the connector plugin."""


class KafkaConnectProvidesEvents(CharmEvents):
"""Kafka Connect Provider Events."""

integration_requested = EventSource(IntegrationRequestedEvent)


class KafkaConnectRequiresEvent(AuthenticationEvent):
"""Base class for Kafka Connect Requirer events."""

@property
def plugin_url(self) -> Optional[str]:
"""Returns the REST endpoint URL which serves the connector plugin."""
if not self.relation.app:
return None

return self.relation.data[self.relation.app].get("plugin-url")


class IntegrationCreatedEvent(KafkaConnectRequiresEvent):
"""Event emitted when the credentials are created for this integrator."""


class IntegrationEndpointsChangedEvent(KafkaConnectRequiresEvent):
"""Event emitted when Kafka Connect REST endpoints change."""


class KafkaConnectRequiresEvents(CharmEvents):
"""Kafka Connect Requirer Events."""

integration_created = EventSource(IntegrationCreatedEvent)
integration_endpoints_changed = EventSource(IntegrationEndpointsChangedEvent)


class KafkaConnectProviderData(ProviderData):
"""Provider-side of the Kafka Connect relation."""

RESOURCE_FIELD = "plugin-url"

def __init__(self, model: Model, relation_name: str) -> None:
super().__init__(model, relation_name)

def set_endpoints(self, relation_id: int, endpoints: str) -> None:
"""Sets REST endpoints of the Kafka Connect service."""
self.update_relation_data(relation_id, {"endpoints": endpoints})


class KafkaConnectProviderEventHandlers(EventHandlers):
"""Provider-side implementation of the Kafka Connect event handlers."""

on = KafkaConnectProvidesEvents() # pyright: ignore [reportAssignmentType]

def __init__(self, charm: CharmBase, relation_data: KafkaConnectProviderData) -> None:
super().__init__(charm, relation_data)
self.relation_data = relation_data

def _on_relation_changed_event(self, event: RelationChangedEvent) -> None:
"""Event emitted when the relation has changed."""
# Leader only
if not self.relation_data.local_unit.is_leader():
return

# Check which data has changed to emit customs events.
diff = self._diff(event)

if "plugin-url" in diff.added:
getattr(self.on, "integration_requested").emit(
event.relation, app=event.app, unit=event.unit
)

def _on_secret_changed_event(self, event: SecretChangedEvent):
"""Event notifying about a new value of a secret."""
pass


class KafkaConnectProvides(KafkaConnectProviderData, KafkaConnectProviderEventHandlers):
"""Provider-side implementation of the Kafka Connect relation."""

def __init__(self, charm: CharmBase, relation_name: str) -> None:
KafkaConnectProviderData.__init__(self, charm.model, relation_name)
KafkaConnectProviderEventHandlers.__init__(self, charm, self)


# Sentinel value passed from Kafka Connect requirer side when it does not need to serve any plugins.
PLUGIN_URL_NOT_REQUIRED: Final[str] = "NOT-REQUIRED"


class KafkaConnectRequirerData(RequirerData):
"""Requirer-side of the Kafka Connect relation."""

def __init__(
self,
model: Model,
relation_name: str,
plugin_url: str,
extra_user_roles: Optional[str] = None,
additional_secret_fields: Optional[List[str]] = [],
):
"""Manager of Kafka client relations."""
super().__init__(
model,
relation_name,
extra_user_roles=extra_user_roles,
additional_secret_fields=additional_secret_fields,
)
self.plugin_url = plugin_url

@property
def plugin_url(self):
"""The REST endpoint URL which serves the connector plugin."""
return self._plugin_url

@plugin_url.setter
def plugin_url(self, value):
self._plugin_url = value


class KafkaConnectRequirerEventHandlers(RequirerEventHandlers):
"""Requirer-side of the Kafka Connect relation."""

on = KafkaConnectRequiresEvents() # pyright: ignore [reportAssignmentType]

def __init__(self, charm: CharmBase, relation_data: KafkaConnectRequirerData) -> None:
super().__init__(charm, relation_data)
self.relation_data = relation_data

def _on_relation_created_event(self, event: RelationCreatedEvent) -> None:
"""Event emitted when the Kafka Connect relation is created."""
super()._on_relation_created_event(event)

if not self.relation_data.local_unit.is_leader():
return

relation_data = {"plugin-url": self.relation_data.plugin_url}
self.relation_data.update_relation_data(event.relation.id, relation_data)

def _on_secret_changed_event(self, event: SecretChangedEvent):
"""Event notifying about a new value of a secret."""
pass

def _on_relation_changed_event(self, event: RelationChangedEvent) -> None:
"""Event emitted when the Kafka Connect relation has changed."""
# Check which data has changed to emit customs events.
diff = self._diff(event)

# Register all new secrets with their labels
if any(newval for newval in diff.added if self.relation_data._is_secret_field(newval)):
self.relation_data._register_secrets_to_relation(event.relation, diff.added)

if self._main_credentials_shared(diff):
logger.info("integration created at %s", datetime.now())
getattr(self.on, "integration_created").emit(
event.relation, app=event.app, unit=event.unit
)
return

# Emit an endpoints changed event if the provider added or
# changed this info in the relation databag.
if "endpoints" in diff.added or "endpoints" in diff.changed:
# Emit the default event (the one without an alias).
logger.info("endpoints changed on %s", datetime.now())
getattr(self.on, "integration_endpoints_changed").emit(
event.relation, app=event.app, unit=event.unit
)
return


class KafkaConnectRequires(KafkaConnectRequirerData, KafkaConnectRequirerEventHandlers):
"""Requirer-side implementation of the Kafka Connect relation."""

def __init__(
self,
charm: CharmBase,
relation_name: str,
plugin_url: str,
extra_user_roles: Optional[str] = None,
additional_secret_fields: Optional[List[str]] = [],
) -> None:
KafkaConnectRequirerData.__init__(
self,
charm.model,
relation_name,
plugin_url,
extra_user_roles=extra_user_roles,
additional_secret_fields=additional_secret_fields,
)
KafkaConnectRequirerEventHandlers.__init__(self, charm, self)


# Opensearch related events


Expand Down
4 changes: 4 additions & 0 deletions tests/v0/integration/application-charm/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,7 @@ requires:
interface: opensearch_client
opensearch-client-roles:
interface: opensearch_client
connect-source:
interface: connect_client
connect-sink:
interface: connect_client
37 changes: 37 additions & 0 deletions tests/v0/integration/application-charm/src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,22 @@
TopicEntityCreatedEvent,
)

if DATA_INTERFACES_VERSION > 52:
from charms.data_platform_libs.v0.data_interfaces import (
IntegrationCreatedEvent,
IntegrationEndpointsChangedEvent,
KafkaConnectRequires,
)


logger = logging.getLogger(__name__)

# Extra roles that this application needs when interacting with the database.
EXTRA_USER_ROLES = "SUPERUSER"
EXTRA_USER_ROLES_KAFKA = "producer,consumer"
EXTRA_USER_ROLES_OPENSEARCH = "admin,default"
CONSUMER_GROUP_PREFIX = "test-prefix"
BAD_URL = "http://badurl"


class ApplicationCharm(CharmBase):
Expand Down Expand Up @@ -216,6 +225,24 @@ def __init__(self, *args):
)
self.framework.observe(self.kafka.on.topic_created, self._on_kafka_topic_created)

# Kafka Connect events

if DATA_INTERFACES_VERSION > 52:
self.connect_source = KafkaConnectRequires(
self, "connect-source", "http://10.10.10.10:8080"
)

self.connect_sink = KafkaConnectRequires(self, "connect-sink", BAD_URL)

self.framework.observe(
self.connect_source.on.integration_created, self._on_connect_integration_created
)

self.framework.observe(
self.connect_source.on.integration_endpoints_changed,
self._on_connect_endpoints_changed,
)

# OpenSearch events

self.opensearch = OpenSearchRequires(
Expand Down Expand Up @@ -412,6 +439,16 @@ def _on_kafka_entity_created(self, _: TopicEntityCreatedEvent) -> None:
logger.info("On kafka entity created")
self.unit.status = ActiveStatus("kafka_entity_created")

if DATA_INTERFACES_VERSION > 52:

def _on_connect_integration_created(self, _: IntegrationCreatedEvent):
"""Event triggered when Kafka Connect integration credentials are created for this application."""
self.unit.status = ActiveStatus("connect_integration_created")

def _on_connect_endpoints_changed(self, _: IntegrationEndpointsChangedEvent):
"""Event triggered when Kafka Connect REST endpoints change."""
self.unit.status = ActiveStatus("connect_endpoints_changed")

def _on_opensearch_index_created(self, _: IndexCreatedEvent):
"""Event triggered when an index was created for this application."""
logger.info("On opensearch index created event fired")
Expand Down
10 changes: 10 additions & 0 deletions tests/v0/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ def copy_data_interfaces_library_into_charm(ops_test: OpsTest):
shutil.copyfile(library_path, install_path)
install_path = "tests/v0/integration/opensearch-charm/" + library_path
shutil.copyfile(library_path, install_path)
install_path = "tests/v0/integration/kafka-connect-charm/" + library_path
shutil.copyfile(library_path, install_path)


@pytest.fixture(scope="module", autouse=True)
Expand Down Expand Up @@ -118,6 +120,14 @@ async def kafka_charm(ops_test: OpsTest):
return charm


@pytest.fixture(scope="module")
async def kafka_connect_charm(ops_test: OpsTest):
"""Build the Kafka Connect dummy charm."""
charm_path = "tests/v0/integration/kafka-connect-charm"
charm = await ops_test.build_charm(charm_path)
return charm


@pytest.fixture(scope="module")
async def opensearch_charm(ops_test: OpsTest):
"""Build the OpenSearch charm.
Expand Down
Loading
Loading