diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 4296abb0..e4c5ffec 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -75,6 +75,7 @@ jobs: - integration-s3-v0 - integration-opensearch-v0 - integration-kafka-v0 + - integration-kafka-connect-v0 juju-version: - juju-bootstrap-option: "2.9.51" juju-snap-channel: "2.9/stable" @@ -103,6 +104,8 @@ jobs: ubuntu-versions: {series: focal} - tox-environments: integration-kafka-v0 ubuntu-versions: {series: focal} + - tox-environments: integration-kafka-connect-v0 + ubuntu-versions: {series: focal} name: ${{ matrix.tox-environments }} Juju ${{ matrix.juju-version.juju-snap-channel}} -- ${{ matrix.ubuntu-versions.series }} needs: - lint diff --git a/lib/charms/data_platform_libs/v0/data_interfaces.py b/lib/charms/data_platform_libs/v0/data_interfaces.py index 5a2b4429..d242377e 100644 --- a/lib/charms/data_platform_libs/v0/data_interfaces.py +++ b/lib/charms/data_platform_libs/v0/data_interfaces.py @@ -417,6 +417,7 @@ def _on_subject_requested(self, event: SubjectRequestedEvent): from typing import ( Callable, Dict, + Final, ItemsView, KeysView, List, @@ -447,7 +448,7 @@ def _on_subject_requested(self, event: SubjectRequestedEvent): # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 52 +LIBPATCH = 53 PYDEPS = ["ops>=2.0.0"] @@ -4450,6 +4451,213 @@ def __init__( KarapaceRequirerEventHandlers.__init__(self, charm, self) +# Kafka Connect Events + + +class KafkaConnectProvidesEvent(RelationEvent): + """Base class for Kafka Connect Provider events.""" + + @property + def plugin_url(self) -> Optional[str]: + """Returns the REST endpoint URL which serves the connector plugin.""" + if not self.relation.app: + return None + + return self.relation.data[self.relation.app].get("plugin-url") + + +class IntegrationRequestedEvent(KafkaConnectProvidesEvent): + """Event emitted when a new integrator boots up and 
is ready to serve the connector plugin.""" + + +class KafkaConnectProvidesEvents(CharmEvents): + """Kafka Connect Provider Events.""" + + integration_requested = EventSource(IntegrationRequestedEvent) + + +class KafkaConnectRequiresEvent(AuthenticationEvent): + """Base class for Kafka Connect Requirer events.""" + + @property + def plugin_url(self) -> Optional[str]: + """Returns the REST endpoint URL which serves the connector plugin.""" + if not self.relation.app: + return None + + return self.relation.data[self.relation.app].get("plugin-url") + + +class IntegrationCreatedEvent(KafkaConnectRequiresEvent): + """Event emitted when the credentials are created for this integrator.""" + + +class IntegrationEndpointsChangedEvent(KafkaConnectRequiresEvent): + """Event emitted when Kafka Connect REST endpoints change.""" + + +class KafkaConnectRequiresEvents(CharmEvents): + """Kafka Connect Requirer Events.""" + + integration_created = EventSource(IntegrationCreatedEvent) + integration_endpoints_changed = EventSource(IntegrationEndpointsChangedEvent) + + +class KafkaConnectProviderData(ProviderData): + """Provider-side of the Kafka Connect relation.""" + + RESOURCE_FIELD = "plugin-url" + + def __init__(self, model: Model, relation_name: str) -> None: + super().__init__(model, relation_name) + + def set_endpoints(self, relation_id: int, endpoints: str) -> None: + """Sets REST endpoints of the Kafka Connect service.""" + self.update_relation_data(relation_id, {"endpoints": endpoints}) + + +class KafkaConnectProviderEventHandlers(EventHandlers): + """Provider-side implementation of the Kafka Connect event handlers.""" + + on = KafkaConnectProvidesEvents() # pyright: ignore [reportAssignmentType] + + def __init__(self, charm: CharmBase, relation_data: KafkaConnectProviderData) -> None: + super().__init__(charm, relation_data) + self.relation_data = relation_data + + def _on_relation_changed_event(self, event: RelationChangedEvent) -> None: + """Event emitted when the relation 
has changed.""" + # Leader only + if not self.relation_data.local_unit.is_leader(): + return + + # Check which data has changed to emit customs events. + diff = self._diff(event) + + if "plugin-url" in diff.added: + getattr(self.on, "integration_requested").emit( + event.relation, app=event.app, unit=event.unit + ) + + def _on_secret_changed_event(self, event: SecretChangedEvent): + """Event notifying about a new value of a secret.""" + pass + + +class KafkaConnectProvides(KafkaConnectProviderData, KafkaConnectProviderEventHandlers): + """Provider-side implementation of the Kafka Connect relation.""" + + def __init__(self, charm: CharmBase, relation_name: str) -> None: + KafkaConnectProviderData.__init__(self, charm.model, relation_name) + KafkaConnectProviderEventHandlers.__init__(self, charm, self) + + +# Sentinel value passed from Kafka Connect requirer side when it does not need to serve any plugins. +PLUGIN_URL_NOT_REQUIRED: Final[str] = "NOT-REQUIRED" + + +class KafkaConnectRequirerData(RequirerData): + """Requirer-side of the Kafka Connect relation.""" + + def __init__( + self, + model: Model, + relation_name: str, + plugin_url: str, + extra_user_roles: Optional[str] = None, + additional_secret_fields: Optional[List[str]] = [], + ): + """Manager of Kafka client relations.""" + super().__init__( + model, + relation_name, + extra_user_roles=extra_user_roles, + additional_secret_fields=additional_secret_fields, + ) + self.plugin_url = plugin_url + + @property + def plugin_url(self): + """The REST endpoint URL which serves the connector plugin.""" + return self._plugin_url + + @plugin_url.setter + def plugin_url(self, value): + self._plugin_url = value + + +class KafkaConnectRequirerEventHandlers(RequirerEventHandlers): + """Requirer-side of the Kafka Connect relation.""" + + on = KafkaConnectRequiresEvents() # pyright: ignore [reportAssignmentType] + + def __init__(self, charm: CharmBase, relation_data: KafkaConnectRequirerData) -> None: + 
super().__init__(charm, relation_data) + self.relation_data = relation_data + + def _on_relation_created_event(self, event: RelationCreatedEvent) -> None: + """Event emitted when the Kafka Connect relation is created.""" + super()._on_relation_created_event(event) + + if not self.relation_data.local_unit.is_leader(): + return + + relation_data = {"plugin-url": self.relation_data.plugin_url} + self.relation_data.update_relation_data(event.relation.id, relation_data) + + def _on_secret_changed_event(self, event: SecretChangedEvent): + """Event notifying about a new value of a secret.""" + pass + + def _on_relation_changed_event(self, event: RelationChangedEvent) -> None: + """Event emitted when the Kafka Connect relation has changed.""" + # Check which data has changed to emit customs events. + diff = self._diff(event) + + # Register all new secrets with their labels + if any(newval for newval in diff.added if self.relation_data._is_secret_field(newval)): + self.relation_data._register_secrets_to_relation(event.relation, diff.added) + + if self._main_credentials_shared(diff): + logger.info("integration created at %s", datetime.now()) + getattr(self.on, "integration_created").emit( + event.relation, app=event.app, unit=event.unit + ) + return + + # Emit an endpoints changed event if the provider added or + # changed this info in the relation databag. + if "endpoints" in diff.added or "endpoints" in diff.changed: + # Emit the default event (the one without an alias). 
+ logger.info("endpoints changed on %s", datetime.now()) + getattr(self.on, "integration_endpoints_changed").emit( + event.relation, app=event.app, unit=event.unit + ) + return + + +class KafkaConnectRequires(KafkaConnectRequirerData, KafkaConnectRequirerEventHandlers): + """Requirer-side implementation of the Kafka Connect relation.""" + + def __init__( + self, + charm: CharmBase, + relation_name: str, + plugin_url: str, + extra_user_roles: Optional[str] = None, + additional_secret_fields: Optional[List[str]] = [], + ) -> None: + KafkaConnectRequirerData.__init__( + self, + charm.model, + relation_name, + plugin_url, + extra_user_roles=extra_user_roles, + additional_secret_fields=additional_secret_fields, + ) + KafkaConnectRequirerEventHandlers.__init__(self, charm, self) + + # Opensearch related events diff --git a/tests/v0/integration/application-charm/metadata.yaml b/tests/v0/integration/application-charm/metadata.yaml index 9a497470..983b8301 100644 --- a/tests/v0/integration/application-charm/metadata.yaml +++ b/tests/v0/integration/application-charm/metadata.yaml @@ -29,3 +29,7 @@ requires: interface: opensearch_client opensearch-client-roles: interface: opensearch_client + connect-source: + interface: connect_client + connect-sink: + interface: connect_client diff --git a/tests/v0/integration/application-charm/src/charm.py b/tests/v0/integration/application-charm/src/charm.py index a34688ab..d5c6a55c 100755 --- a/tests/v0/integration/application-charm/src/charm.py +++ b/tests/v0/integration/application-charm/src/charm.py @@ -45,6 +45,14 @@ TopicEntityCreatedEvent, ) +if DATA_INTERFACES_VERSION > 52: + from charms.data_platform_libs.v0.data_interfaces import ( + IntegrationCreatedEvent, + IntegrationEndpointsChangedEvent, + KafkaConnectRequires, + ) + + logger = logging.getLogger(__name__) # Extra roles that this application needs when interacting with the database. 
@@ -52,6 +60,7 @@ EXTRA_USER_ROLES_KAFKA = "producer,consumer" EXTRA_USER_ROLES_OPENSEARCH = "admin,default" CONSUMER_GROUP_PREFIX = "test-prefix" +BAD_URL = "http://badurl" class ApplicationCharm(CharmBase): @@ -216,6 +225,24 @@ def __init__(self, *args): ) self.framework.observe(self.kafka.on.topic_created, self._on_kafka_topic_created) + # Kafka Connect events + + if DATA_INTERFACES_VERSION > 52: + self.connect_source = KafkaConnectRequires( + self, "connect-source", "http://10.10.10.10:8080" + ) + + self.connect_sink = KafkaConnectRequires(self, "connect-sink", BAD_URL) + + self.framework.observe( + self.connect_source.on.integration_created, self._on_connect_integration_created + ) + + self.framework.observe( + self.connect_source.on.integration_endpoints_changed, + self._on_connect_endpoints_changed, + ) + # OpenSearch events self.opensearch = OpenSearchRequires( @@ -412,6 +439,16 @@ def _on_kafka_entity_created(self, _: TopicEntityCreatedEvent) -> None: logger.info("On kafka entity created") self.unit.status = ActiveStatus("kafka_entity_created") + if DATA_INTERFACES_VERSION > 52: + + def _on_connect_integration_created(self, _: IntegrationCreatedEvent): + """Event triggered when Kafka Connect integration credentials are created for this application.""" + self.unit.status = ActiveStatus("connect_integration_created") + + def _on_connect_endpoints_changed(self, _: IntegrationEndpointsChangedEvent): + """Event triggered when Kafka Connect REST endpoints change.""" + self.unit.status = ActiveStatus("connect_endpoints_changed") + def _on_opensearch_index_created(self, _: IndexCreatedEvent): """Event triggered when an index was created for this application.""" logger.info("On opensearch index created event fired") diff --git a/tests/v0/integration/conftest.py b/tests/v0/integration/conftest.py index fa477d6b..cb8b66b7 100644 --- a/tests/v0/integration/conftest.py +++ b/tests/v0/integration/conftest.py @@ -58,6 +58,8 @@ def 
copy_data_interfaces_library_into_charm(ops_test: OpsTest): shutil.copyfile(library_path, install_path) install_path = "tests/v0/integration/opensearch-charm/" + library_path shutil.copyfile(library_path, install_path) + install_path = "tests/v0/integration/kafka-connect-charm/" + library_path + shutil.copyfile(library_path, install_path) @pytest.fixture(scope="module", autouse=True) @@ -118,6 +120,14 @@ async def kafka_charm(ops_test: OpsTest): return charm +@pytest.fixture(scope="module") +async def kafka_connect_charm(ops_test: OpsTest): + """Build the Kafka Connect dummy charm.""" + charm_path = "tests/v0/integration/kafka-connect-charm" + charm = await ops_test.build_charm(charm_path) + return charm + + @pytest.fixture(scope="module") async def opensearch_charm(ops_test: OpsTest): """Build the OpenSearch charm. diff --git a/tests/v0/integration/dummy-database-charm/lib/charms/data_platform_libs/v0/data_interfaces.py b/tests/v0/integration/dummy-database-charm/lib/charms/data_platform_libs/v0/data_interfaces.py index 0fab404d..46f3e25d 100644 --- a/tests/v0/integration/dummy-database-charm/lib/charms/data_platform_libs/v0/data_interfaces.py +++ b/tests/v0/integration/dummy-database-charm/lib/charms/data_platform_libs/v0/data_interfaces.py @@ -16,7 +16,7 @@ This library contains the Requires and Provides classes for handling the relation between an application and multiple managed application supported by the data-team: -MySQL, Postgresql, MongoDB, Redis, and Kafka. +MySQL, Postgresql, MongoDB, Redis, Kafka, and Karapace. ### Database (MySQL, Postgresql, MongoDB, and Redis) @@ -306,6 +306,105 @@ def _on_entity_requested(self, event: EntityRequestedEvent): It is preferred to subscribe to this event instead of relation changed event to avoid creating a new topic when other information other than a topic name is exchanged in the relation databag. + +### Karapace + +This library is the interface to use and interact with the Karapace charm. 
This library contains +custom events that add convenience to manage Karapace, and provides methods to consume the +application related data. + +#### Requirer Charm + +```python + +from charms.data_platform_libs.v0.data_interfaces import ( + EndpointsChangedEvent, + KarapaceRequires, + SubjectAllowedEvent, +) + +class ApplicationCharm(CharmBase): + + def __init__(self, *args): + super().__init__(*args) + self.karapace = KarapaceRequires(self, relation_name="karapace_client", subject="test-subject") + self.framework.observe( + self.karapace.on.server_changed, self._on_karapace_server_changed + ) + self.framework.observe( + self.karapace.on.subject_allowed, self._on_karapace_subject_allowed + ) + self.framework.observe( + self.karapace.on.subject_entity_created, self._on_subject_entity_created + ) + + + def _on_karapace_server_changed(self, event: EndpointsChangedEvent): + # Event triggered when a server endpoint was changed for this application + new_server = event.endpoints + ... + + def _on_karapace_subject_allowed(self, event: SubjectAllowedEvent): + # Event triggered when a subject was allowed for this application + username = event.username + password = event.password + tls = event.tls + endpoints = event.endpoints + ... + + def _on_subject_entity_created(self, event: SubjectEntityCreatedEvent): + # Event triggered when a subject entity was created this application + entity_name = event.entity_name + entity_password = event.entity_password + ... +``` + +As shown above, the library provides some custom events to handle specific situations, +which are listed below: + +- subject_allowed: event emitted when the requested subject is allowed. +- server_changed: event emitted when the server endpoints have changed. + +#### Provider Charm + +Following the previous example, this is an example of the provider charm. 
+ +```python +class SampleCharm(CharmBase): + +from charms.data_platform_libs.v0.data_interfaces import ( + KarapaceProvides, + SubjectRequestedEvent, +) + + def __init__(self, *args): + super().__init__(*args) + + # Default charm events. + self.framework.observe(self.on.start, self._on_start) + + # Charm events defined in the Karapace Provides charm library. + self.karapace_provider = KarapaceProvides(self, relation_name="karapace_client") + self.framework.observe(self.karapace_provider.on.subject_requested, self._on_subject_requested) + # Karapace generic helper + self.karapace = KarapaceHelper() + + def _on_subject_requested(self, event: SubjectRequestedEvent): + # Handle the on_subject_requested event. + + subject = event.subject + relation_id = event.relation.id + # set connection info in the databag relation + self.karapace_provider.set_endpoint(relation_id, self.karapace.get_endpoint()) + self.karapace_provider.set_credentials(relation_id, username=username, password=password) + self.karapace_provider.set_tls(relation_id, "False") +``` + +As shown above, the library provides a custom event (subject_requested) to handle +the situation when an application charm requests a new subject to be created. +It is preferred to subscribe to this event instead of relation changed event to avoid +creating a new subject when other information other than a subject name is +exchanged in the relation databag. 
""" import copy @@ -318,6 +417,7 @@ def _on_entity_requested(self, event: EntityRequestedEvent): from typing import ( Callable, Dict, + Final, ItemsView, KeysView, List, @@ -348,7 +448,7 @@ def _on_entity_requested(self, event: EntityRequestedEvent): # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 51 +LIBPATCH = 53 PYDEPS = ["ops>=2.0.0"] @@ -4010,6 +4110,557 @@ def __init__( KafkaRequirerEventHandlers.__init__(self, charm, self) +# Karapace related events + + +class KarapaceProvidesEvent(RelationEvent): + """Base class for Karapace events.""" + + @property + def subject(self) -> Optional[str]: + """Returns the subject that was requested.""" + if not self.relation.app: + return None + + return self.relation.data[self.relation.app].get("subject") + + +class SubjectRequestedEvent(KarapaceProvidesEvent): + """Event emitted when a new subject is requested for use on this relation.""" + + @property + def extra_user_roles(self) -> Optional[str]: + """Returns the extra user roles that were requested.""" + if not self.relation.app: + return None + + return self.relation.data[self.relation.app].get("extra-user-roles") + + +class SubjectEntityRequestedEvent(KarapaceProvidesEvent, EntityProvidesEvent): + """Event emitted when a new entity is requested for use on this relation.""" + + +class SubjectEntityPermissionsChangedEvent(KarapaceProvidesEvent, EntityProvidesEvent): + """Event emitted when existing entity permissions are changed on this relation.""" + + +class KarapaceProvidesEvents(CharmEvents): + """Karapace events. + + This class defines the events that the Karapace can emit. 
+ """ + + subject_requested = EventSource(SubjectRequestedEvent) + subject_entity_requested = EventSource(SubjectEntityRequestedEvent) + subject_entity_permissions_changed = EventSource(SubjectEntityPermissionsChangedEvent) + + +class KarapaceRequiresEvent(RelationEvent): + """Base class for Karapace events.""" + + @property + def subject(self) -> Optional[str]: + """Returns the subject.""" + if not self.relation.app: + return None + + return self.relation.data[self.relation.app].get("subject") + + @property + def endpoints(self) -> Optional[str]: + """Returns a comma-separated list of broker uris.""" + if not self.relation.app: + return None + + return self.relation.data[self.relation.app].get("endpoints") + + +class SubjectAllowedEvent(AuthenticationEvent, KarapaceRequiresEvent): + """Event emitted when a new subject ACL is created for use on this relation.""" + + +class SubjectEntityCreatedEvent(EntityRequiresEvent, KarapaceRequiresEvent): + """Event emitted when a new entity is created for use on this relation.""" + + +class EndpointsChangedEvent(AuthenticationEvent, KarapaceRequiresEvent): + """Event emitted when the endpoints are changed.""" + + +class KarapaceRequiresEvents(CharmEvents): + """Karapace events. + + This class defines the events that Karapace can emit. + """ + + subject_allowed = EventSource(SubjectAllowedEvent) + subject_entity_created = EventSource(SubjectEntityCreatedEvent) + server_changed = EventSource(EndpointsChangedEvent) + + +# Karapace Provides and Requires + + +class KarapaceProviderData(ProviderData): + """Provider-side of the Karapace relation.""" + + RESOURCE_FIELD = "subject" + + def __init__(self, model: Model, relation_name: str) -> None: + super().__init__(model, relation_name) + + def set_subject(self, relation_id: int, subject: str) -> None: + """Set subject name in the application relation databag. + + Args: + relation_id: the identifier for a particular relation. + subject: the subject name. 
+ """ + self.update_relation_data(relation_id, {"subject": subject}) + + def set_endpoint(self, relation_id: int, endpoint: str) -> None: + """Set the endpoint in the application relation databag. + + Args: + relation_id: the identifier for a particular relation. + endpoint: the server address. + """ + self.update_relation_data(relation_id, {"endpoints": endpoint}) + + +class KarapaceProviderEventHandlers(ProviderEventHandlers): + """Provider-side of the Karapace relation.""" + + on = KarapaceProvidesEvents() # pyright: ignore [reportAssignmentType] + + def __init__(self, charm: CharmBase, relation_data: KarapaceProviderData) -> None: + super().__init__(charm, relation_data) + # Just to keep lint quiet, can't resolve inheritance. The same happened in super().__init__() above + self.relation_data = relation_data + + def _on_relation_changed_event(self, event: RelationChangedEvent) -> None: + """Event emitted when the relation has changed.""" + super()._on_relation_changed_event(event) + + # Leader only + if not self.relation_data.local_unit.is_leader(): + return + + # Check which data has changed to emit customs events. + diff = self._diff(event) + + # Validate entity information is not dynamically changed + self._validate_entity_consistency(event, diff) + + # Emit a subject requested event if the setup key (subject name) + # was added to the relation databag, but the entity-type key was not. + if "subject" in diff.added and "entity-type" not in diff.added: + getattr(self.on, "subject_requested").emit( + event.relation, app=event.app, unit=event.unit + ) + + # To avoid unnecessary application restarts do not trigger other events. + return + + # Emit an entity requested event if the setup key (subject name) + # was added to the relation databag, in addition to the entity-type key. 
+ if "subject" in diff.added and "entity-type" in diff.added: + getattr(self.on, "subject_entity_requested").emit( + event.relation, app=event.app, unit=event.unit + ) + + # To avoid unnecessary application restarts do not trigger other events. + return + + # Emit a permissions changed event if the setup key (subject name) + # was added to the relation databag, and the entity-permissions key changed. + if ( + "subject" not in diff.added + and "entity-type" not in diff.added + and ("entity-permissions" in diff.added or "entity-permissions" in diff.changed) + ): + getattr(self.on, "subject_entity_permissions_changed").emit( + event.relation, app=event.app, unit=event.unit + ) + + # To avoid unnecessary application restarts do not trigger other events. + return + + def _on_secret_changed_event(self, event: SecretChangedEvent): + """Event notifying about a new value of a secret.""" + pass + + +class KarapaceProvides(KarapaceProviderData, KarapaceProviderEventHandlers): + """Provider-side of the Karapace relation.""" + + def __init__(self, charm: CharmBase, relation_name: str) -> None: + KarapaceProviderData.__init__(self, charm.model, relation_name) + KarapaceProviderEventHandlers.__init__(self, charm, self) + + +class KarapaceRequirerData(RequirerData): + """Requirer-side of the Karapace relation.""" + + def __init__( + self, + model: Model, + relation_name: str, + subject: str, + extra_user_roles: Optional[str] = None, + additional_secret_fields: Optional[List[str]] = [], + extra_group_roles: Optional[str] = None, + entity_type: Optional[str] = None, + entity_permissions: Optional[str] = None, + ): + """Manager of Karapace client relations.""" + super().__init__( + model, + relation_name, + extra_user_roles, + additional_secret_fields, + extra_group_roles, + entity_type, + entity_permissions, + ) + self.subject = subject + + @property + def subject(self): + """Topic to use in Karapace.""" + return self._subject + + @subject.setter + def subject(self, value): + # 
Avoid wildcards + if value == "*": + raise ValueError(f"Error on subject '{value}', cannot be a wildcard.") + self._subject = value + + +class KarapaceRequirerEventHandlers(RequirerEventHandlers): + """Requires-side of the Karapace relation.""" + + on = KarapaceRequiresEvents() # pyright: ignore [reportAssignmentType] + + def __init__(self, charm: CharmBase, relation_data: KarapaceRequirerData) -> None: + super().__init__(charm, relation_data) + # Just to keep lint quiet, can't resolve inheritance. The same happened in super().__init__() above + self.relation_data = relation_data + + def _on_relation_created_event(self, event: RelationCreatedEvent) -> None: + """Event emitted when the Karapace relation is created.""" + super()._on_relation_created_event(event) + + if not self.relation_data.local_unit.is_leader(): + return + + # Sets subject and extra user roles + relation_data = {"subject": self.relation_data.subject} + + if self.relation_data.extra_user_roles: + relation_data["extra-user-roles"] = self.relation_data.extra_user_roles + if self.relation_data.extra_group_roles: + relation_data["extra-group-roles"] = self.relation_data.extra_group_roles + if self.relation_data.entity_type: + relation_data["entity-type"] = self.relation_data.entity_type + if self.relation_data.entity_permissions: + relation_data["entity-permissions"] = self.relation_data.entity_permissions + + self.relation_data.update_relation_data(event.relation.id, relation_data) + + def _on_secret_changed_event(self, event: SecretChangedEvent): + """Event notifying about a new value of a secret.""" + pass + + def _on_relation_changed_event(self, event: RelationChangedEvent) -> None: + """Event emitted when the Karapace relation has changed.""" + # Check which data has changed to emit customs events. + diff = self._diff(event) + + # Check if the subject ACLs are created + # (the Karapace charm shared the credentials). 
+ + # Register all new secrets with their labels + if any(newval for newval in diff.added if self.relation_data._is_secret_field(newval)): + self.relation_data._register_secrets_to_relation(event.relation, diff.added) + + app_databag = get_encoded_dict(event.relation, event.app, "data") + if app_databag is None: + app_databag = {} + + if self._main_credentials_shared(diff) and "entity-type" not in app_databag: + # Emit the default event (the one without an alias). + logger.info("subject ACL created at %s", datetime.now()) + getattr(self.on, "subject_allowed").emit( + event.relation, app=event.app, unit=event.unit + ) + + # To avoid unnecessary application restarts do not trigger other events. + return + + if self._entity_credentials_shared(diff) and "entity-type" in app_databag: + # Emit the default event (the one without an alias). + logger.info("entity created at %s", datetime.now()) + getattr(self.on, "subject_entity_created").emit( + event.relation, app=event.app, unit=event.unit + ) + + # To avoid unnecessary application restarts do not trigger other events. + return + + # Emit an endpoints changed event if the Karapace endpoints added or changed + # this info in the relation databag. + if "endpoints" in diff.added or "endpoints" in diff.changed: + # Emit the default event (the one without an alias). + logger.info("endpoints changed on %s", datetime.now()) + getattr(self.on, "server_changed").emit( + event.relation, app=event.app, unit=event.unit + ) # here check if this is the right design + + # To avoid unnecessary application restarts do not trigger other events. 
+        return + + +class KarapaceRequires(KarapaceRequirerData, KarapaceRequirerEventHandlers): +    """Requirer-side of the Karapace relation.""" + +    def __init__( +        self, +        charm: CharmBase, +        relation_name: str, +        subject: str, +        extra_user_roles: Optional[str] = None, +        additional_secret_fields: Optional[List[str]] = [], +        extra_group_roles: Optional[str] = None, +        entity_type: Optional[str] = None, +        entity_permissions: Optional[str] = None, +    ) -> None: +        KarapaceRequirerData.__init__( +            self, +            charm.model, +            relation_name, +            subject, +            extra_user_roles, +            additional_secret_fields, +            extra_group_roles, +            entity_type, +            entity_permissions, +        ) +        KarapaceRequirerEventHandlers.__init__(self, charm, self) + + +# Kafka Connect Events + + +class KafkaConnectProvidesEvent(RelationEvent): +    """Base class for Kafka Connect Provider events.""" + +    @property +    def plugin_url(self) -> Optional[str]: +        """Returns the REST endpoint URL which serves the connector plugin.""" +        if not self.relation.app: +            return None + +        return self.relation.data[self.relation.app].get("plugin-url") + + +class IntegrationRequestedEvent(KafkaConnectProvidesEvent): +    """Event emitted when a new integrator boots up and is ready to serve the connector plugin.""" + + +class KafkaConnectProvidesEvents(CharmEvents): +    """Kafka Connect Provider Events.""" + +    integration_requested = EventSource(IntegrationRequestedEvent) + + +class KafkaConnectRequiresEvent(AuthenticationEvent): +    """Base class for Kafka Connect Requirer events.""" + +    @property +    def plugin_url(self) -> Optional[str]: +        """Returns the REST endpoint URL which serves the connector plugin.""" +        if not self.relation.app: +            return None + +        return self.relation.data[self.relation.app].get("plugin-url") + + +class IntegrationCreatedEvent(KafkaConnectRequiresEvent): +    """Event emitted when the credentials are created for this integrator.""" + + +class IntegrationEndpointsChangedEvent(KafkaConnectRequiresEvent): +    """Event emitted when Kafka 
Connect REST endpoints change.""" + + +class KafkaConnectRequiresEvents(CharmEvents): + """Kafka Connect Requirer Events.""" + + integration_created = EventSource(IntegrationCreatedEvent) + integration_endpoints_changed = EventSource(IntegrationEndpointsChangedEvent) + + +class KafkaConnectProviderData(ProviderData): + """Provider-side of the Kafka Connect relation.""" + + RESOURCE_FIELD = "plugin-url" + + def __init__(self, model: Model, relation_name: str) -> None: + super().__init__(model, relation_name) + + def set_endpoints(self, relation_id: int, endpoints: str) -> None: + """Sets REST endpoints of the Kafka Connect service.""" + self.update_relation_data(relation_id, {"endpoints": endpoints}) + + +class KafkaConnectProviderEventHandlers(EventHandlers): + """Provider-side implementation of the Kafka Connect event handlers.""" + + on = KafkaConnectProvidesEvents() # pyright: ignore [reportAssignmentType] + + def __init__(self, charm: CharmBase, relation_data: KafkaConnectProviderData) -> None: + super().__init__(charm, relation_data) + self.relation_data = relation_data + + def _on_relation_changed_event(self, event: RelationChangedEvent) -> None: + """Event emitted when the relation has changed.""" + # Leader only + if not self.relation_data.local_unit.is_leader(): + return + + # Check which data has changed to emit customs events. 
+ diff = self._diff(event) + + if "plugin-url" in diff.added: + getattr(self.on, "integration_requested").emit( + event.relation, app=event.app, unit=event.unit + ) + + def _on_secret_changed_event(self, event: SecretChangedEvent): + """Event notifying about a new value of a secret.""" + pass + + +class KafkaConnectProvides(KafkaConnectProviderData, KafkaConnectProviderEventHandlers): + """Provider-side implementation of the Kafka Connect relation.""" + + def __init__(self, charm: CharmBase, relation_name: str) -> None: + KafkaConnectProviderData.__init__(self, charm.model, relation_name) + KafkaConnectProviderEventHandlers.__init__(self, charm, self) + + +# Sentinel value passed from Kafka Connect requirer side when it does not need to serve any plugins. +PLUGIN_URL_NOT_REQUIRED: Final[str] = "NOT-REQUIRED" + + +class KafkaConnectRequirerData(RequirerData): + """Requirer-side of the Kafka Connect relation.""" + + def __init__( + self, + model: Model, + relation_name: str, + plugin_url: str, + extra_user_roles: Optional[str] = None, + additional_secret_fields: Optional[List[str]] = [], + ): + """Manager of Kafka client relations.""" + super().__init__( + model, + relation_name, + extra_user_roles=extra_user_roles, + additional_secret_fields=additional_secret_fields, + ) + self.plugin_url = plugin_url + + @property + def plugin_url(self): + """The REST endpoint URL which serves the connector plugin.""" + return self._plugin_url + + @plugin_url.setter + def plugin_url(self, value): + self._plugin_url = value + + +class KafkaConnectRequirerEventHandlers(RequirerEventHandlers): + """Requirer-side of the Kafka Connect relation.""" + + on = KafkaConnectRequiresEvents() # pyright: ignore [reportAssignmentType] + + def __init__(self, charm: CharmBase, relation_data: KafkaConnectRequirerData) -> None: + super().__init__(charm, relation_data) + self.relation_data = relation_data + + def _on_relation_created_event(self, event: RelationCreatedEvent) -> None: + """Event emitted 
when the Kafka Connect relation is created.""" + super()._on_relation_created_event(event) + + if not self.relation_data.local_unit.is_leader(): + return + + relation_data = {"plugin-url": self.relation_data.plugin_url} + self.relation_data.update_relation_data(event.relation.id, relation_data) + + def _on_secret_changed_event(self, event: SecretChangedEvent): + """Event notifying about a new value of a secret.""" + pass + + def _on_relation_changed_event(self, event: RelationChangedEvent) -> None: + """Event emitted when the Kafka Connect relation has changed.""" + # Check which data has changed to emit customs events. + diff = self._diff(event) + + # Register all new secrets with their labels + if any(newval for newval in diff.added if self.relation_data._is_secret_field(newval)): + self.relation_data._register_secrets_to_relation(event.relation, diff.added) + + secret_field_user = self.relation_data._generate_secret_field_name(SECRET_GROUPS.USER) + if ( + "username" in diff.added and "password" in diff.added + ) or secret_field_user in diff.added: + logger.info("integration created at %s", datetime.now()) + getattr(self.on, "integration_created").emit( + event.relation, app=event.app, unit=event.unit + ) + return + + # Emit an endpoints changed event if the provider added or + # changed this info in the relation databag. + if "endpoints" in diff.added or "endpoints" in diff.changed: + # Emit the default event (the one without an alias). 
+ logger.info("endpoints changed on %s", datetime.now()) + getattr(self.on, "integration_endpoints_changed").emit( + event.relation, app=event.app, unit=event.unit + ) + return + + +class KafkaConnectRequires(KafkaConnectRequirerData, KafkaConnectRequirerEventHandlers): + """Requirer-side implementation of the Kafka Connect relation.""" + + def __init__( + self, + charm: CharmBase, + relation_name: str, + plugin_url: str, + extra_user_roles: Optional[str] = None, + additional_secret_fields: Optional[List[str]] = [], + ) -> None: + KafkaConnectRequirerData.__init__( + self, + charm.model, + relation_name, + plugin_url, + extra_user_roles=extra_user_roles, + additional_secret_fields=additional_secret_fields, + ) + KafkaConnectRequirerEventHandlers.__init__(self, charm, self) + + # Opensearch related events diff --git a/tests/v0/integration/kafka-connect-charm/actions.yaml b/tests/v0/integration/kafka-connect-charm/actions.yaml new file mode 100644 index 00000000..79bafa15 --- /dev/null +++ b/tests/v0/integration/kafka-connect-charm/actions.yaml @@ -0,0 +1,13 @@ +# Copyright 2025 Canonical Ltd. +sync: + description: Change Kafka Connect provider username/password/endpoints. + params: + key: + type: string + description: | + one of the following values: username | password | endpoints + value: + type: string + description: New username/password/endpoints. + required: [key, value] + diff --git a/tests/v0/integration/kafka-connect-charm/charmcraft.yaml b/tests/v0/integration/kafka-connect-charm/charmcraft.yaml new file mode 100644 index 00000000..4a0e40bf --- /dev/null +++ b/tests/v0/integration/kafka-connect-charm/charmcraft.yaml @@ -0,0 +1,11 @@ +# Copyright 2025 Canonical Ltd. +# See LICENSE file for licensing details. 
+ +type: charm +bases: + - build-on: + - name: "ubuntu" + channel: "22.04" + run-on: + - name: "ubuntu" + channel: "22.04" diff --git a/tests/v0/integration/kafka-connect-charm/lib/charms/data_platform_libs/v0/.gitkeep b/tests/v0/integration/kafka-connect-charm/lib/charms/data_platform_libs/v0/.gitkeep new file mode 100644 index 00000000..e69de29b diff --git a/tests/v0/integration/kafka-connect-charm/metadata.yaml b/tests/v0/integration/kafka-connect-charm/metadata.yaml new file mode 100644 index 00000000..e93e1229 --- /dev/null +++ b/tests/v0/integration/kafka-connect-charm/metadata.yaml @@ -0,0 +1,16 @@ +# Copyright 2025 Canonical Ltd. +# See LICENSE file for licensing details. +name: kafka-connect +description: | + Toy charm used to emulate Kafka Connect in integration tests. +summary: | + Charm used to mimic the Kafka Connect for test purpose only. + + +peers: + worker: + interface: worker + +provides: + connect-client: + interface: connect_client diff --git a/tests/v0/integration/kafka-connect-charm/requirements.txt b/tests/v0/integration/kafka-connect-charm/requirements.txt new file mode 100644 index 00000000..365fa5eb --- /dev/null +++ b/tests/v0/integration/kafka-connect-charm/requirements.txt @@ -0,0 +1 @@ +ops >= 2.0.0 diff --git a/tests/v0/integration/kafka-connect-charm/src/charm.py b/tests/v0/integration/kafka-connect-charm/src/charm.py new file mode 100755 index 00000000..e82f69f3 --- /dev/null +++ b/tests/v0/integration/kafka-connect-charm/src/charm.py @@ -0,0 +1,157 @@ +#!/usr/bin/env python3 +# Copyright 2025 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Kafka Connect provider charm that accepts connections from application charms. + +This charm is meant to be used only for testing +of the libraries in this repository. 
+""" + +import logging +from typing import Dict, Optional + +from ops.charm import ActionEvent, CharmBase +from ops.main import main +from ops.model import ActiveStatus, MaintenanceStatus + +from charms.data_platform_libs.v0.data_interfaces import ( + IntegrationRequestedEvent, + KafkaConnectProvides, +) + +logger = logging.getLogger(__name__) + +PEER = "worker" +REL = "connect-client" +BAD_URL = "http://badurl" +SYNC_ACTIONS = ("username", "password", "endpoints") + + +class KafkaConnectCharm(CharmBase): + """Kafka connect charm that accepts connections from application charms.""" + + def __init__(self, *args): + super().__init__(*args) + + # Default charm events. + self.framework.observe(self.on.start, self._on_start) + + # Charm events defined in the Kafka Connect Provides charm library. + self.provider = KafkaConnectProvides(self, relation_name=REL) + self.framework.observe( + self.provider.on.integration_requested, self._on_integration_requested + ) + self.framework.observe(self.on[PEER].relation_joined, self._on_peer_relation_joined) + + # syncaction + self.framework.observe(self.on.sync_action, self._on_sync) + + def _on_peer_relation_joined(self, _): + pass + + @property + def app_peer_data(self) -> Dict: + """Application peer relation data object.""" + relation = self.model.get_relation(PEER) + if not relation: + return {} + + return relation.data[self.app] + + def get_secret(self, scope: str, key: str) -> str: + """Get secret from the secret storage.""" + if scope == "app": + return self.app_peer_data.get(key, "") + else: + raise RuntimeError("Unknown secret scope.") + + def set_secret(self, scope: str, key: str, value: Optional[str]) -> None: + """Set secret in the secret storage.""" + if scope == "app": + if not value: + del self.app_peer_data[key] + return + self.app_peer_data.update({key: value}) + else: + raise RuntimeError("Unknown secret scope.") + + def _on_start(self, _) -> None: + """Only sets an active status.""" + self.unit.status = 
ActiveStatus("Kafka Connect Ready!") + + def _download_plugin(self, plugin_url) -> bool: + """Fake plugin downloader, returns True on every URL except `BAD_URL`.""" + if plugin_url == BAD_URL: + return False + + return True + + def _on_integration_requested(self, event: IntegrationRequestedEvent): + """Handle the `on_integration_requested` event.""" + # retrieve `plugin-url` from the requirer side + plugin_url = event.plugin_url + self.unit.status = MaintenanceStatus(f"Retrieving plugin from client: {plugin_url}.") + + if not self._download_plugin(plugin_url): + event.defer() + return + + self.unit.status = MaintenanceStatus("Plugin downloaded successfully.") + relation_id = event.relation.id + + username = "integrator" + password = "password" + endpoints = "http://worker1:8083,http://worker2:8083" + self.set_secret("app", "username", username) + self.set_secret("app", "password", password) + self.set_secret("app", "endpoints", endpoints) + # set connection info in the databag relation + self.provider.set_endpoints(relation_id, endpoints) + self.provider.set_credentials(relation_id, username=username, password=password) + self.provider.set_tls(relation_id, "disabled") + self.provider.set_tls_ca(relation_id, "disabled") + self.unit.status = ActiveStatus( + f"Integration setup successful for relation {relation_id}!" 
+ ) + + def _on_sync(self, event: ActionEvent): + """Handler for `sync` action.""" + key = event.params.get("key") + if key not in SYNC_ACTIONS: + event.fail(f"Action '{key}' not permitted.") + return + + value = event.params.get("value", "") + self.set_secret("app", key, value) + + # update clients data + if len(self.provider.relations) > 0: + self._update_clients_data(key, value) + + event.set_results({key: value}) + self.unit.status = ActiveStatus(f"{key} changed on connect_client!") + + def _update_clients_data(self, key: str, value: str) -> None: + """Updates connect clients data.""" + for relation in self.provider.relations: + # compatibility: match/case isn't available in python version used in the lib + if key in ("username", "password"): + self.provider.set_credentials( + relation.id, + username=self.get_secret("app", "username"), + password=self.get_secret("app", "password"), + ) + elif key == "endpoints": + self.provider.set_endpoints(relation.id, value) + else: + pass + + def _on_reset_unit_status(self, event: ActionEvent): + """Reset the status message of the unit.""" + self.unit.status = ActiveStatus() + event.set_results({"Status": "Reset unit status message"}) + + +if __name__ == "__main__": + main(KafkaConnectCharm) diff --git a/tests/v0/integration/test_kafka_connect_charm.py b/tests/v0/integration/test_kafka_connect_charm.py new file mode 100644 index 00000000..b57c564c --- /dev/null +++ b/tests/v0/integration/test_kafka_connect_charm.py @@ -0,0 +1,157 @@ +#!/usr/bin/env python3 +# Copyright 2022 Canonical Ltd. +# See LICENSE file for licensing details. 
+import asyncio +import logging +from pathlib import Path + +import pytest +from pytest_operator.plugin import OpsTest + +from .helpers import get_application_relation_data, get_juju_secret + +logger = logging.getLogger(__name__) + +REQUIRER_APP_NAME = "requirer-app" +PROVIDER_APP_NAME = "kafka-connect" +APP_NAMES = [REQUIRER_APP_NAME, PROVIDER_APP_NAME] +SOURCE_REL = "connect-source" +SINK_REL = "connect-sink" +PROV_SECRET_PREFIX = "secret-" + + +@pytest.mark.abort_on_fail +@pytest.mark.skip_if_deployed +@pytest.mark.log_errors_allowed( + 'ERROR juju.worker.meterstatus error running "meter-status-changed": charm missing from disk' +) +async def test_deploy_charms( + ops_test: OpsTest, application_charm: Path, kafka_connect_charm: Path +): + """Test deployment of Kafka Connect provider and requirer toy charms.""" + await asyncio.gather( + ops_test.model.deploy( + application_charm, application_name=REQUIRER_APP_NAME, num_units=1, series="jammy" + ), + ops_test.model.deploy( + kafka_connect_charm, application_name=PROVIDER_APP_NAME, num_units=1, series="jammy" + ), + ) + + await ops_test.model.wait_for_idle( + apps=APP_NAMES, + idle_period=30, + timeout=1800, + status="active", + ) + + assert ops_test.model.applications[REQUIRER_APP_NAME].status == "active" + assert ops_test.model.applications[PROVIDER_APP_NAME].status == "active" + + +@pytest.mark.abort_on_fail +@pytest.mark.usefixtures("only_with_juju_secrets") +async def test_connect_client_relation_with_charm_libraries( + ops_test: OpsTest, request: pytest.FixtureRequest +): + """Test basic functionality of Kafka Connect client relation interface.""" + # Relate the charms and wait for them exchanging some connection data. 
+ await ops_test.model.add_relation(PROVIDER_APP_NAME, f"{REQUIRER_APP_NAME}:{SOURCE_REL}")
+ await ops_test.model.wait_for_idle(apps=APP_NAMES, status="active")
+
+ # check unit message on requirer side
+ for unit in ops_test.model.applications[REQUIRER_APP_NAME].units:
+ assert unit.workload_status_message == "connect_integration_created"
+ # check unit message on provider side
+ for unit in ops_test.model.applications[PROVIDER_APP_NAME].units:
+ assert "successful" in unit.workload_status_message
+
+ secret_uri = (
+ await get_application_relation_data(
+ ops_test, REQUIRER_APP_NAME, SOURCE_REL, f"{PROV_SECRET_PREFIX}user"
+ )
+ or ""
+ )
+
+ secret_content = await get_juju_secret(ops_test, secret_uri)
+ username = secret_content["username"]
+ password = secret_content["password"]
+
+ endpoints = await get_application_relation_data(
+ ops_test, REQUIRER_APP_NAME, SOURCE_REL, "endpoints"
+ )
+
+ request.config.cache.set("initial_password", password)
+ request.config.cache.set("initial_endpoints", endpoints)
+
+ assert username == "integrator"
+ assert password == "password"
+ assert endpoints == "http://worker1:8083,http://worker2:8083"
+
+
+@pytest.mark.abort_on_fail
+@pytest.mark.usefixtures("only_with_juju_secrets")
+async def test_kafka_connect_credentials_change(ops_test: OpsTest, request: pytest.FixtureRequest):
+ """Test Kafka Connect credentials change functionality."""
+ # Get current password
+ password = request.config.cache.get("initial_password", "")
+ assert password == "password"
+
+ # Change connect password
+ action = (
+ await ops_test.model.applications[PROVIDER_APP_NAME]
+ .units[0]
+ .run_action("sync", key="password", value="newpass")
+ )
+ await action.wait()
+
+ await ops_test.model.wait_for_idle(
+ apps=APP_NAMES,
+ idle_period=20,
+ timeout=600,
+ status="active",
+ )
+
+ secret_uri = (
+ await get_application_relation_data(
+ ops_test, REQUIRER_APP_NAME, SOURCE_REL, f"{PROV_SECRET_PREFIX}user"
+ )
+ or ""
+ )
+
+ secret_content = await 
 get_juju_secret(ops_test, secret_uri)
+ new_password = secret_content["password"]
+
+ assert password != new_password
+ assert new_password == "newpass"
+
+
+@pytest.mark.abort_on_fail
+@pytest.mark.usefixtures("only_with_juju_secrets")
+async def test_kafka_connect_endpoints_change(ops_test: OpsTest, request: pytest.FixtureRequest):
+ """Test Kafka Connect endpoints change functionality."""
+ # Get current endpoints
+ endpoints = request.config.cache.get("initial_endpoints", "")
+ assert endpoints == "http://worker1:8083,http://worker2:8083"
+
+ # Change connect endpoints
+ action = (
+ await ops_test.model.applications[PROVIDER_APP_NAME]
+ .units[0]
+ .run_action("sync", key="endpoints", value="http://worker1:8083")
+ )
+ await action.wait()
+
+ await ops_test.model.wait_for_idle(
+ apps=APP_NAMES,
+ idle_period=20,
+ timeout=600,
+ status="active",
+ )
+
+ new_endpoints = await get_application_relation_data(
+ ops_test, REQUIRER_APP_NAME, SOURCE_REL, "endpoints"
+ )
+
+ assert endpoints != new_endpoints
+ assert new_endpoints == "http://worker1:8083" diff --git a/tox.ini b/tox.ini index f051f219..f963244a 100644 --- a/tox.ini +++ b/tox.ini @@ -173,3 +173,17 @@ deps = -r {[vars]reqs_path}/v0/requirements.txt commands = pytest -v --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/v0/integration/test_secrets.py + + +[testenv:integration-kafka-connect-v0] +description = Run Kafka Connect integration tests +deps = + pytest<8.2.0 + juju{env:LIBJUJU_VERSION_SPECIFIER:==3.6.1.0} + pytest-operator<0.43 + pytest-mock + websockets{env:WEBSOCKETS_VERSION_SPECIFIER:} + -r {[vars]reqs_path}/v0/requirements.txt +commands = + pytest -v --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/v0/integration/test_kafka_connect_charm.py +