diff --git a/managedkafka/snippets/connect/clusters/delete_connect_cluster.py b/managedkafka/snippets/connect/clusters/delete_connect_cluster.py index 84258fe830..01e27875a2 100644 --- a/managedkafka/snippets/connect/clusters/delete_connect_cluster.py +++ b/managedkafka/snippets/connect/clusters/delete_connect_cluster.py @@ -41,14 +41,14 @@ def delete_connect_cluster( # region = "us-central1" # connect_cluster_id = "my-connect-cluster" - client = ManagedKafkaConnectClient() + connect_client = ManagedKafkaConnectClient() request = managedkafka_v1.DeleteConnectClusterRequest( - name=client.connect_cluster_path(project_id, region, connect_cluster_id), + name=connect_client.connect_cluster_path(project_id, region, connect_cluster_id), ) try: - operation = client.delete_connect_cluster(request=request) + operation = connect_client.delete_connect_cluster(request=request) print(f"Waiting for operation {operation.operation.name} to complete...") operation.result() print("Deleted Connect cluster") diff --git a/managedkafka/snippets/connect/connectors/connectors_test.py b/managedkafka/snippets/connect/connectors/connectors_test.py index 73d7323f14..ade860ae40 100644 --- a/managedkafka/snippets/connect/connectors/connectors_test.py +++ b/managedkafka/snippets/connect/connectors/connectors_test.py @@ -20,14 +20,23 @@ import create_mirrormaker2_source_connector import create_pubsub_sink_connector import create_pubsub_source_connector +import delete_connector +import get_connector from google.api_core.operation import Operation from google.cloud import managedkafka_v1 +import list_connectors +import pause_connector import pytest +import restart_connector +import resume_connector +import stop_connector +import update_connector PROJECT_ID = "test-project-id" REGION = "us-central1" CONNECT_CLUSTER_ID = "test-connect-cluster-id" +CONNECTOR_ID = "test-connector-id" @mock.patch( @@ -194,3 +203,203 @@ def test_create_bigquery_sink_connector( assert "Created Connector" in out assert connector_id in out mock_method.assert_called_once() + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.list_connectors" +) +def test_list_connectors( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +) -> None: + connector = managedkafka_v1.types.Connector() + connector.name = managedkafka_v1.ManagedKafkaConnectClient.connector_path( + PROJECT_ID, REGION, CONNECT_CLUSTER_ID, CONNECTOR_ID + ) + mock_method.return_value = [connector] + + list_connectors.list_connectors( + project_id=PROJECT_ID, + region=REGION, + connect_cluster_id=CONNECT_CLUSTER_ID, + ) + + out, _ = capsys.readouterr() + assert "Got connector" in out + assert CONNECTOR_ID in out + mock_method.assert_called_once() + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.get_connector" +) +def test_get_connector( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +) -> None: + connector = managedkafka_v1.types.Connector() + connector.name = managedkafka_v1.ManagedKafkaConnectClient.connector_path( + PROJECT_ID, REGION, CONNECT_CLUSTER_ID, CONNECTOR_ID + ) + mock_method.return_value = connector + + get_connector.get_connector( + project_id=PROJECT_ID, + region=REGION, + connect_cluster_id=CONNECT_CLUSTER_ID, + connector_id=CONNECTOR_ID, + ) + + out, _ = capsys.readouterr() + assert "Got connector" in out + assert CONNECTOR_ID in out + mock_method.assert_called_once() + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.update_connector" +) +def test_update_connector( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +) -> None: + configs = {"tasks.max": "6", "value.converter.schemas.enable": "true"} + operation = mock.MagicMock(spec=Operation) + connector = managedkafka_v1.types.Connector() + connector.name = managedkafka_v1.ManagedKafkaConnectClient.connector_path( + PROJECT_ID, REGION, CONNECT_CLUSTER_ID, CONNECTOR_ID + ) + operation.result = mock.MagicMock(return_value=connector) + mock_method.return_value = operation + + update_connector.update_connector( + project_id=PROJECT_ID, + region=REGION, + connect_cluster_id=CONNECT_CLUSTER_ID, + connector_id=CONNECTOR_ID, + configs=configs, + ) + + out, _ = capsys.readouterr() + assert "Updated connector" in out + assert CONNECTOR_ID in out + mock_method.assert_called_once() + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.delete_connector" +) +def test_delete_connector( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +) -> None: + operation = mock.MagicMock(spec=Operation) + operation.result = mock.MagicMock(return_value=None) + mock_method.return_value = operation + + delete_connector.delete_connector( + project_id=PROJECT_ID, + region=REGION, + connect_cluster_id=CONNECT_CLUSTER_ID, + connector_id=CONNECTOR_ID, + ) + + out, _ = capsys.readouterr() + assert "Deleted connector" in out + mock_method.assert_called_once() + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.pause_connector" +) +def test_pause_connector( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +) -> None: + operation = mock.MagicMock(spec=Operation) + operation.result = mock.MagicMock(return_value=None) + mock_method.return_value = operation + + pause_connector.pause_connector( + project_id=PROJECT_ID, + region=REGION, + connect_cluster_id=CONNECT_CLUSTER_ID, + connector_id=CONNECTOR_ID, + ) + + out, _ = capsys.readouterr() + assert "Paused connector" in out + assert CONNECTOR_ID in out + mock_method.assert_called_once() + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.resume_connector" +) +def test_resume_connector( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +) -> None: + operation = mock.MagicMock(spec=Operation) + operation.result = mock.MagicMock(return_value=None) + mock_method.return_value = operation + + resume_connector.resume_connector( + project_id=PROJECT_ID, + region=REGION, + connect_cluster_id=CONNECT_CLUSTER_ID, + connector_id=CONNECTOR_ID, + ) + + out, _ = capsys.readouterr() + assert "Resumed connector" in out + assert CONNECTOR_ID in out + mock_method.assert_called_once() + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.stop_connector" +) +def test_stop_connector( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +) -> None: + operation = mock.MagicMock(spec=Operation) + operation.result = mock.MagicMock(return_value=None) + mock_method.return_value = operation + + stop_connector.stop_connector( + project_id=PROJECT_ID, + region=REGION, + connect_cluster_id=CONNECT_CLUSTER_ID, + connector_id=CONNECTOR_ID, + ) + + out, _ = capsys.readouterr() + assert "Stopped connector" in out + assert CONNECTOR_ID in out + mock_method.assert_called_once() + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.restart_connector" +) +def test_restart_connector( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +) -> None: + operation = mock.MagicMock(spec=Operation) + operation.result = mock.MagicMock(return_value=None) + mock_method.return_value = operation + + restart_connector.restart_connector( + project_id=PROJECT_ID, + region=REGION, + connect_cluster_id=CONNECT_CLUSTER_ID, + connector_id=CONNECTOR_ID, + ) + + out, _ = capsys.readouterr() + assert "Restarted connector" in out + assert CONNECTOR_ID in out + mock_method.assert_called_once() diff --git a/managedkafka/snippets/connect/connectors/create_bigquery_sink_connector.py b/managedkafka/snippets/connect/connectors/create_bigquery_sink_connector.py index b6719e82ea..129872d66d 100644 --- a/managedkafka/snippets/connect/connectors/create_bigquery_sink_connector.py +++ b/managedkafka/snippets/connect/connectors/create_bigquery_sink_connector.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. + def create_bigquery_sink_connector( project_id: str, region: str, diff --git a/managedkafka/snippets/connect/connectors/create_mirrormaker2_source_connector.py b/managedkafka/snippets/connect/connectors/create_mirrormaker2_source_connector.py index f81730f556..2252ac2c2f 100644 --- a/managedkafka/snippets/connect/connectors/create_mirrormaker2_source_connector.py +++ b/managedkafka/snippets/connect/connectors/create_mirrormaker2_source_connector.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. + def create_mirrormaker2_source_connector( project_id: str, region: str, @@ -75,8 +76,9 @@ def create_mirrormaker2_source_connector( "target.cluster.alias": target_cluster_alias, # This is usually the primary cluster. # Replicate all topics from the source "topics": topics, - # The value for bootstrap.servers is a comma-separated list of hostname:port pairs - # for one or more Kafka brokers in the source/target cluster. + # The value for bootstrap.servers is a hostname:port pair for the Kafka broker in + # the source/target cluster. + # For example: "kafka-broker:9092" "source.cluster.bootstrap.servers": source_bootstrap_servers, "target.cluster.bootstrap.servers": target_bootstrap_servers, # You can define an exclusion policy for topics as follows: diff --git a/managedkafka/snippets/connect/connectors/create_pubsub_sink_connector.py b/managedkafka/snippets/connect/connectors/create_pubsub_sink_connector.py index 2742d8166d..7f455059a8 100644 --- a/managedkafka/snippets/connect/connectors/create_pubsub_sink_connector.py +++ b/managedkafka/snippets/connect/connectors/create_pubsub_sink_connector.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. + def create_pubsub_sink_connector( project_id: str, region: str, diff --git a/managedkafka/snippets/connect/connectors/create_pubsub_source_connector.py b/managedkafka/snippets/connect/connectors/create_pubsub_source_connector.py index d5c2acc701..19f891fd38 100644 --- a/managedkafka/snippets/connect/connectors/create_pubsub_source_connector.py +++ b/managedkafka/snippets/connect/connectors/create_pubsub_source_connector.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. + def create_pubsub_source_connector( project_id: str, region: str, diff --git a/managedkafka/snippets/connect/connectors/delete_connector.py b/managedkafka/snippets/connect/connectors/delete_connector.py new file mode 100644 index 0000000000..84ee0e3ecf --- /dev/null +++ b/managedkafka/snippets/connect/connectors/delete_connector.py @@ -0,0 +1,61 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def delete_connector( + project_id: str, + region: str, + connect_cluster_id: str, + connector_id: str, +) -> None: + """ + Delete a connector. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + connect_cluster_id: ID of the Kafka Connect cluster. + connector_id: ID of the connector. + + Raises: + This method will raise the GoogleAPICallError exception if the operation errors. + """ + # [START managedkafka_delete_connector] + from google.api_core.exceptions import GoogleAPICallError + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ( + ManagedKafkaConnectClient, + ) + from google.cloud import managedkafka_v1 + + # TODO(developer) + # project_id = "my-project-id" + # region = "us-central1" + # connect_cluster_id = "my-connect-cluster" + # connector_id = "my-connector" + + connect_client = ManagedKafkaConnectClient() + + request = managedkafka_v1.DeleteConnectorRequest( + name=connect_client.connector_path(project_id, region, connect_cluster_id, connector_id), + ) + + try: + operation = connect_client.delete_connector(request=request) + print(f"Waiting for operation {operation.operation.name} to complete...") + operation.result() + print("Deleted connector") + except GoogleAPICallError as e: + print(f"The operation failed with error: {e}") + + # [END managedkafka_delete_connector] diff --git a/managedkafka/snippets/connect/connectors/get_connector.py b/managedkafka/snippets/connect/connectors/get_connector.py new file mode 100644 index 0000000000..a3477ef4c7 --- /dev/null +++ b/managedkafka/snippets/connect/connectors/get_connector.py @@ -0,0 +1,60 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def get_connector( + project_id: str, + region: str, + connect_cluster_id: str, + connector_id: str, +) -> None: + """ + Get details of a specific connector. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + connect_cluster_id: ID of the Kafka Connect cluster. + connector_id: ID of the connector. + + Raises: + This method will raise the NotFound exception if the connector is not found. + """ + # [START managedkafka_get_connector] + from google.api_core.exceptions import NotFound + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ManagedKafkaConnectClient + from google.cloud import managedkafka_v1 + + # TODO(developer) + # project_id = "my-project-id" + # region = "us-central1" + # connect_cluster_id = "my-connect-cluster" + # connector_id = "my-connector" + + connect_client = ManagedKafkaConnectClient() + + connector_path = connect_client.connector_path( + project_id, region, connect_cluster_id, connector_id + ) + request = managedkafka_v1.GetConnectorRequest( + name=connector_path, + ) + + try: + connector = connect_client.get_connector(request=request) + print("Got connector:", connector) + except NotFound as e: + print(f"Failed to get connector {connector_id} with error: {e}") + + # [END managedkafka_get_connector] diff --git a/managedkafka/snippets/connect/connectors/list_connectors.py b/managedkafka/snippets/connect/connectors/list_connectors.py new file mode 100644 index 0000000000..f707df0945 --- /dev/null +++ b/managedkafka/snippets/connect/connectors/list_connectors.py @@ -0,0 +1,54 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def list_connectors( + project_id: str, + region: str, + connect_cluster_id: str, +) -> None: + """ + List all connectors in a Kafka Connect cluster. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + connect_cluster_id: ID of the Kafka Connect cluster. + """ + # [START managedkafka_list_connectors] + from google.cloud import managedkafka_v1 + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ( + ManagedKafkaConnectClient, + ) + from google.api_core.exceptions import GoogleAPICallError + + # TODO(developer) + # project_id = "my-project-id" + # region = "us-central1" + # connect_cluster_id = "my-connect-cluster" + + connect_client = ManagedKafkaConnectClient() + + request = managedkafka_v1.ListConnectorsRequest( + parent=connect_client.connect_cluster_path(project_id, region, connect_cluster_id), + ) + + try: + response = connect_client.list_connectors(request=request) + for connector in response: + print("Got connector:", connector) + except GoogleAPICallError as e: + print(f"Failed to list connectors with error: {e}") + + # [END managedkafka_list_connectors] diff --git a/managedkafka/snippets/connect/connectors/pause_connector.py b/managedkafka/snippets/connect/connectors/pause_connector.py new file mode 100644 index 0000000000..35f184c244 --- /dev/null +++ b/managedkafka/snippets/connect/connectors/pause_connector.py @@ -0,0 +1,61 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def pause_connector( + project_id: str, + region: str, + connect_cluster_id: str, + connector_id: str, +) -> None: + """ + Pause a connector. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + connect_cluster_id: ID of the Kafka Connect cluster. + connector_id: ID of the connector. + + Raises: + This method will raise the GoogleAPICallError exception if the operation errors. + """ + # [START managedkafka_pause_connector] + from google.api_core.exceptions import GoogleAPICallError + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ( + ManagedKafkaConnectClient, + ) + from google.cloud import managedkafka_v1 + + # TODO(developer) + # project_id = "my-project-id" + # region = "us-central1" + # connect_cluster_id = "my-connect-cluster" + # connector_id = "my-connector" + + connect_client = ManagedKafkaConnectClient() + + request = managedkafka_v1.PauseConnectorRequest( + name=connect_client.connector_path(project_id, region, connect_cluster_id, connector_id), + ) + + try: + operation = connect_client.pause_connector(request=request) + print(f"Waiting for operation {operation.operation.name} to complete...") + operation.result() + print(f"Paused connector {connector_id}") + except GoogleAPICallError as e: + print(f"Failed to pause connector {connector_id} with error: {e}") + + # [END managedkafka_pause_connector] diff --git a/managedkafka/snippets/connect/connectors/restart_connector.py b/managedkafka/snippets/connect/connectors/restart_connector.py new file mode 100644 index 0000000000..72714de7aa --- /dev/null +++ b/managedkafka/snippets/connect/connectors/restart_connector.py @@ -0,0 +1,63 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def restart_connector( + project_id: str, + region: str, + connect_cluster_id: str, + connector_id: str, +) -> None: + """ + Restart a connector. + Note: This operation is used to restart a failed connector. To start + a stopped connector, use the `resume_connector` operation instead. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + connect_cluster_id: ID of the Kafka Connect cluster. + connector_id: ID of the connector. + + Raises: + This method will raise the GoogleAPICallError exception if the operation errors. + """ + # [START managedkafka_restart_connector] + from google.api_core.exceptions import GoogleAPICallError + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ( + ManagedKafkaConnectClient, + ) + from google.cloud import managedkafka_v1 + + # TODO(developer) + # project_id = "my-project-id" + # region = "us-central1" + # connect_cluster_id = "my-connect-cluster" + # connector_id = "my-connector" + + connect_client = ManagedKafkaConnectClient() + + request = managedkafka_v1.RestartConnectorRequest( + name=connect_client.connector_path(project_id, region, connect_cluster_id, connector_id), + ) + + try: + operation = connect_client.restart_connector(request=request) + print(f"Waiting for operation {operation.operation.name} to complete...") + operation.result() + print(f"Restarted connector {connector_id}") + except GoogleAPICallError as e: + print(f"Failed to restart connector {connector_id} with error: {e}") + + # [END managedkafka_restart_connector] diff --git a/managedkafka/snippets/connect/connectors/resume_connector.py b/managedkafka/snippets/connect/connectors/resume_connector.py new file mode 100644 index 0000000000..3787368ef1 --- /dev/null +++ b/managedkafka/snippets/connect/connectors/resume_connector.py @@ -0,0 +1,61 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def resume_connector( + project_id: str, + region: str, + connect_cluster_id: str, + connector_id: str, +) -> None: + """ + Resume a paused connector. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + connect_cluster_id: ID of the Kafka Connect cluster. + connector_id: ID of the connector. + + Raises: + This method will raise the GoogleAPICallError exception if the operation errors. + """ + # [START managedkafka_resume_connector] + from google.api_core.exceptions import GoogleAPICallError + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ( + ManagedKafkaConnectClient, + ) + from google.cloud import managedkafka_v1 + + # TODO(developer) + # project_id = "my-project-id" + # region = "us-central1" + # connect_cluster_id = "my-connect-cluster" + # connector_id = "my-connector" + + connect_client = ManagedKafkaConnectClient() + + request = managedkafka_v1.ResumeConnectorRequest( + name=connect_client.connector_path(project_id, region, connect_cluster_id, connector_id), + ) + + try: + operation = connect_client.resume_connector(request=request) + print(f"Waiting for operation {operation.operation.name} to complete...") + operation.result() + print(f"Resumed connector {connector_id}") + except GoogleAPICallError as e: + print(f"Failed to resume connector {connector_id} with error: {e}") + + # [END managedkafka_resume_connector] diff --git a/managedkafka/snippets/connect/connectors/stop_connector.py b/managedkafka/snippets/connect/connectors/stop_connector.py new file mode 100644 index 0000000000..cd3767075b --- /dev/null +++ b/managedkafka/snippets/connect/connectors/stop_connector.py @@ -0,0 +1,61 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def stop_connector( + project_id: str, + region: str, + connect_cluster_id: str, + connector_id: str, +) -> None: + """ + Stop a connector. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + connect_cluster_id: ID of the Kafka Connect cluster. + connector_id: ID of the connector. + + Raises: + This method will raise the GoogleAPICallError exception if the operation errors. + """ + # [START managedkafka_stop_connector] + from google.api_core.exceptions import GoogleAPICallError + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ( + ManagedKafkaConnectClient, + ) + from google.cloud import managedkafka_v1 + + # TODO(developer) + # project_id = "my-project-id" + # region = "us-central1" + # connect_cluster_id = "my-connect-cluster" + # connector_id = "my-connector" + + connect_client = ManagedKafkaConnectClient() + + request = managedkafka_v1.StopConnectorRequest( + name=connect_client.connector_path(project_id, region, connect_cluster_id, connector_id), + ) + + try: + operation = connect_client.stop_connector(request=request) + print(f"Waiting for operation {operation.operation.name} to complete...") + operation.result() + print(f"Stopped connector {connector_id}") + except GoogleAPICallError as e: + print(f"Failed to stop connector {connector_id} with error: {e}") + + # [END managedkafka_stop_connector] diff --git a/managedkafka/snippets/connect/connectors/update_connector.py b/managedkafka/snippets/connect/connectors/update_connector.py new file mode 100644 index 0000000000..b0357079cd --- /dev/null +++ b/managedkafka/snippets/connect/connectors/update_connector.py @@ -0,0 +1,79 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def update_connector( + project_id: str, + region: str, + connect_cluster_id: str, + connector_id: str, + configs: dict, +) -> None: + """ + Update a connector's configuration. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + connect_cluster_id: ID of the Kafka Connect cluster. + connector_id: ID of the connector. + configs: Dictionary containing the updated configuration. + + Raises: + This method will raise the GoogleAPICallError exception if the operation errors. + """ + # [START managedkafka_update_connector] + from google.api_core.exceptions import GoogleAPICallError + from google.cloud import managedkafka_v1 + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ( + ManagedKafkaConnectClient, + ) + from google.cloud.managedkafka_v1.types import Connector + from google.protobuf import field_mask_pb2 + + # TODO(developer) + # project_id = "my-project-id" + # region = "us-central1" + # connect_cluster_id = "my-connect-cluster" + # connector_id = "my-connector" + # configs = { + # "tasks.max": "6", + # "value.converter.schemas.enable": "true" + # } + + connect_client = ManagedKafkaConnectClient() + + connector = Connector() + connector.name = connect_client.connector_path( + project_id, region, connect_cluster_id, connector_id + ) + connector.configs = configs + update_mask = field_mask_pb2.FieldMask() + update_mask.paths.append("config") + + # For a list of editable fields, one can check https://cloud.google.com/managed-service-for-apache-kafka/docs/connect-cluster/update-connector#editable-properties. + request = managedkafka_v1.UpdateConnectorRequest( + update_mask=update_mask, + connector=connector, + ) + + try: + operation = connect_client.update_connector(request=request) + print(f"Waiting for operation {operation.operation.name} to complete...") + response = operation.result() + print("Updated connector:", response) + except GoogleAPICallError as e: + print(f"The operation failed with error: {e}") + + # [END managedkafka_update_connector]