From 69f1d1faa776cd13048a40b7c02eba587bd282ad Mon Sep 17 00:00:00 2001 From: Anna Cocuzzo Date: Tue, 6 Aug 2024 13:43:10 -0400 Subject: [PATCH 1/3] add regional topic methods --- .../kafka/source/PollerFactoryImpl.java | 4 +- src/test/java/it/Base.java | 2 +- src/test/java/it/StandaloneIT.java | 110 +++++++++++++----- src/test/resources/kafka_vm_startup_script.sh | 4 +- ...pubsub-lite-sink-connector-test.properties | 2 +- ...bsub-lite-source-connector-test.properties | 2 +- 6 files changed, 89 insertions(+), 35 deletions(-) diff --git a/src/main/java/com/google/pubsublite/kafka/source/PollerFactoryImpl.java b/src/main/java/com/google/pubsublite/kafka/source/PollerFactoryImpl.java index de607374..4f142aeb 100644 --- a/src/main/java/com/google/pubsublite/kafka/source/PollerFactoryImpl.java +++ b/src/main/java/com/google/pubsublite/kafka/source/PollerFactoryImpl.java @@ -15,7 +15,7 @@ */ package com.google.pubsublite.kafka.source; -import com.google.cloud.pubsublite.CloudZone; +import com.google.cloud.pubsublite.CloudRegionOrZone; import com.google.cloud.pubsublite.ProjectPath; import com.google.cloud.pubsublite.SubscriptionName; import com.google.cloud.pubsublite.SubscriptionPath; @@ -35,7 +35,7 @@ public Poller newPoller(Map params) { .setProject( ProjectPath.parse("projects/" + config.get(ConfigDefs.PROJECT_FLAG).value()) .project()) - .setLocation(CloudZone.parse(config.get(ConfigDefs.LOCATION_FLAG).value().toString())) + .setLocation(CloudRegionOrZone.parse(config.get(ConfigDefs.LOCATION_FLAG).value().toString())) .setName( SubscriptionName.of( config.get(ConfigDefs.SUBSCRIPTION_NAME_FLAG).value().toString())) diff --git a/src/test/java/it/Base.java b/src/test/java/it/Base.java index db9be8be..fd74bc7d 100644 --- a/src/test/java/it/Base.java +++ b/src/test/java/it/Base.java @@ -243,7 +243,7 @@ protected static void createInstanceTemplate( .setKey("cps_source_connector_properties_name") .setValue(cpsSourceConnectorPropertiesGCSName) .build()) - .addItems(Items.newBuilder().setKey("psl_zone").setValue(location).build()) + .addItems(Items.newBuilder().setKey("psl_location").setValue(region).build()) .addItems( Items.newBuilder() .setKey("psl_sink_connector_properties_name") diff --git a/src/test/java/it/StandaloneIT.java b/src/test/java/it/StandaloneIT.java index 6de9723f..9769697d 100644 --- a/src/test/java/it/StandaloneIT.java +++ b/src/test/java/it/StandaloneIT.java @@ -126,38 +126,64 @@ public class StandaloneIT extends Base { .setName(ReservationName.of((pslReservationId))) .build(); - private static final String kafkaPslSinkTestTopic = "psl-sink-test-topic"; + private static final String kafkaPslSinkZonalTestTopic = "psl-sink-test-topic-zonal"; + // private static final String kafkaPslSinkRegionalTestTopic = "psl-sink-test-topic-regional"; private static final String pslSinkTopicId = "psl-sink-topic-" + runId; - private static final TopicPath pslSinkTopicPath = + private static final TopicPath pslSinkZonalTopicPath = TopicPath.newBuilder() .setProject(ProjectId.of(projectId)) .setLocation(CloudZone.of(CloudRegion.of(region), zone)) .setName(com.google.cloud.pubsublite.TopicName.of(pslSinkTopicId)) .build(); + private static final TopicPath pslSinkRegionalTopicPath = + TopicPath.newBuilder() + .setProject(ProjectId.of(projectId)) + .setLocation(CloudRegion.of(region)) + .setName(com.google.cloud.pubsublite.TopicName.of(pslSinkTopicId)) + .build(); private static final String pslSinkSubscriptionId = "psl-sink-subscription-" + runId; - private static final SubscriptionPath pslSinkSubscriptionPath = + private static final SubscriptionPath pslSinkZonalSubscriptionPath = SubscriptionPath.newBuilder() .setName(com.google.cloud.pubsublite.SubscriptionName.of(pslSinkSubscriptionId)) .setProject(ProjectId.of(projectId)) .setLocation(CloudZone.of(CloudRegion.of(region), zone)) .build(); - - private static final String kafkaPslSourceTestTopic = "psl-source-test-topic"; + private static final SubscriptionPath pslSinkRegionalSubscriptionPath = + SubscriptionPath.newBuilder() + .setName(com.google.cloud.pubsublite.SubscriptionName.of(pslSinkSubscriptionId)) + .setProject(ProjectId.of(projectId)) + .setLocation(CloudRegion.of(region)) + .build(); + +// private static final String kafkaPslSourceZonalTestTopic = "psl-source-test-topic-zonal"; + private static final String kafkaPslSourceRegionalTestTopic = "psl-source-test-topic-regional"; private static final String pslSourceTopicId = "psl-source-topic-" + runId; - private static final TopicPath pslSourceTopicPath = + private static final TopicPath pslSourceZonalTopicPath = TopicPath.newBuilder() .setProject(ProjectId.of(projectId)) .setLocation(CloudZone.of(CloudRegion.of(region), zone)) .setName(com.google.cloud.pubsublite.TopicName.of(pslSourceTopicId)) .build(); + private static final TopicPath pslSourceRegionalTopicPath = + TopicPath.newBuilder() + .setProject(ProjectId.of(projectId)) + .setLocation(CloudRegion.of(region)) + .setName(com.google.cloud.pubsublite.TopicName.of(pslSourceTopicId)) + .build(); private static final String pslSourceSubscriptionId = "psl-source-subscription-" + runId; - private static final SubscriptionPath pslSourceSubscriptionPath = + private static final SubscriptionPath pslSourceZonalSubscriptionPath = SubscriptionPath.newBuilder() .setName(com.google.cloud.pubsublite.SubscriptionName.of(pslSourceSubscriptionId)) .setProject(ProjectId.of(projectId)) .setLocation(CloudZone.of(CloudRegion.of(region), zone)) .build(); - + private static final SubscriptionPath pslSourceRegionalSubscriptionPath = + SubscriptionPath.newBuilder() + .setName(com.google.cloud.pubsublite.SubscriptionName.of(pslSourceSubscriptionId)) + .setProject(ProjectId.of(projectId)) + .setLocation(CloudRegion.of(region)) + .build(); + private static final String instanceName = "kafka-it-" + runId; private static final String instanceTemplateName = "kafka-it-template-" + runId; private static AtomicBoolean cpsMessageReceived = new AtomicBoolean(false); @@ -186,7 +212,8 @@ public static void setUp() throws Exception { log.atInfo().log("Packaged connector jar."); uploadGCSResources(); setupCpsResources(); - setupPslResources(); + setupPslZonalResources(); + // setupPslRegionalResources(); setupGceInstance(); } @@ -245,7 +272,18 @@ protected static void setupCpsResources() throws IOException { } } - protected static void setupPslResources() throws Exception { + protected static void setupPslZonalResources() throws Exception { + setupPslResources(pslSinkZonalTopicPath, pslSourceZonalTopicPath, pslSinkZonalSubscriptionPath, + pslSourceZonalSubscriptionPath); + } + + protected static void setupPslRegionalResources() throws Exception { + setupPslResources(pslSinkRegionalTopicPath, pslSourceRegionalTopicPath, pslSinkRegionalSubscriptionPath, + pslSourceRegionalSubscriptionPath); + } + + protected static void setupPslResources(TopicPath pslSinkTopicPath, TopicPath pslSourceTopicPath, + SubscriptionPath pslSinkSubscriptionPath, SubscriptionPath pslSourceSubscriptionPath) throws Exception { try (AdminClient pslAdminClient = AdminClient.create( AdminClientSettings.newBuilder().setRegion(CloudRegion.of(region)).build())) { @@ -384,25 +422,23 @@ public Void apply(Runnable runnable) { try (AdminClient pslAdminClient = AdminClient.create( AdminClientSettings.newBuilder().setRegion(CloudRegion.of(region)).build())) { - notFoundIgnoredClosureRunner.apply( - () -> { - pslAdminClient.deleteSubscription(pslSinkSubscriptionPath); - }); - notFoundIgnoredClosureRunner.apply( - () -> { - pslAdminClient.deleteSubscription(pslSourceSubscriptionPath); - }); - notFoundIgnoredClosureRunner.apply( - () -> { - pslAdminClient.deleteTopic(pslSinkTopicPath); - }); - notFoundIgnoredClosureRunner.apply( + final SubscriptionPath[] subscriptionPaths = {pslSinkZonalSubscriptionPath, pslSinkRegionalSubscriptionPath, pslSourceZonalSubscriptionPath, pslSourceRegionalSubscriptionPath}; + for (SubscriptionPath subscriptionPath : subscriptionPaths ) { + notFoundIgnoredClosureRunner.apply( + () -> { + pslAdminClient.deleteSubscription(subscriptionPath); + }); + } + final TopicPath[] topicPaths = {pslSinkZonalTopicPath, pslSinkRegionalTopicPath, pslSourceZonalTopicPath, pslSourceRegionalTopicPath}; + for (TopicPath topicPath : topicPaths) { + notFoundIgnoredClosureRunner.apply( () -> { - pslAdminClient.deleteTopic(pslSourceTopicPath); + pslAdminClient.deleteTopic(topicPath); }); + } + log.atInfo().log("Deleted PSL topics and subscriptions."); } - try (InstancesClient instancesClient = InstancesClient.create()) { instancesClient.deleteAsync(projectId, location, instanceName).get(3, MINUTES); } @@ -412,7 +448,7 @@ public Void apply(Runnable runnable) { instanceTemplatesClient.deleteAsync(projectId, instanceTemplateName).get(3, MINUTES); } log.atInfo().log("Deleted Compute Engine instance template."); - } +} @Test public void testCpsSinkConnector() throws Exception { @@ -555,7 +591,16 @@ public void onPartitionsAssigned(Collection collection) { } @Test - public void testPslSinkConnector() throws Exception { + public void testPslZonalSinkConnector() throws Exception { + testPslSinkConnector(pslSinkZonalTopicPath, pslSinkZonalSubscriptionPath, kafkaPslSinkZonalTestTopic); + } + + //@Test + //public void testPslRegionalSinkConnector() throws Exception { + // testPslSinkConnector(pslSinkRegionalTopicPath, pslSinkRegionalSubscriptionPath, kafkaPslSinkRegionalTestTopic); + //} + + public void testPslSinkConnector(TopicPath topicPath, SubscriptionPath pslSinkSubscriptionPath, String kafkaPslSinkTestTopic) throws Exception { Properties props = new Properties(); props.put("bootstrap.servers", kafkaInstanceIpAddress + ":" + KAFKA_PORT); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); @@ -613,8 +658,17 @@ public void testPslSinkConnector() throws Exception { assertThat(this.pslMessageReceived.get()).isTrue(); } + //@Test(timeout = 5 * 60 * 1000L) + // public void testPslZonalSourceConnector() throws Exception { + // testPslSourceConnector(pslSinkZonalTopicPath, kafkaPslSourceZonalTestTopic); + // } + @Test(timeout = 5 * 60 * 1000L) - public void testPslSourceConnector() throws Exception { + public void testPslRegionalSourceConnector() throws Exception { + testPslSourceConnector(pslSinkRegionalTopicPath, kafkaPslSourceRegionalTestTopic); + } + + public void testPslSourceConnector(TopicPath pslSourceTopicPath, String kafkaPslSourceTestTopic) throws Exception { // Publish to CPS topic PublisherSettings publisherSettings = PublisherSettings.newBuilder() diff --git a/src/test/resources/kafka_vm_startup_script.sh b/src/test/resources/kafka_vm_startup_script.sh index b17fc8c8..89765cdb 100644 --- a/src/test/resources/kafka_vm_startup_script.sh +++ b/src/test/resources/kafka_vm_startup_script.sh @@ -38,11 +38,11 @@ ls -l $GCS_DIR/ # Prepare properties files for this run RUN_ID=$(curl http://metadata.google.internal/computeMetadata/v1/instance/attributes/run_id -H "Metadata-Flavor: Google") PROJECT_NAME=$(curl http://metadata.google.internal/computeMetadata/v1/instance/attributes/project_id -H "Metadata-Flavor: Google") -PSL_ZONE=$(curl http://metadata.google.internal/computeMetadata/v1/instance/attributes/psl_zone -H "Metadata-Flavor: Google") +PSL_LOCATION=$(curl http://metadata.google.internal/computeMetadata/v1/instance/attributes/psl_location -H "Metadata-Flavor: Google") sed -i "s//$RUN_ID/g" $GCS_DIR/*.properties sed -i "s//$PROJECT_NAME/g" $GCS_DIR/*.properties -sed -i "s//$PSL_ZONE/g" $GCS_DIR/*.properties +sed -i "s//$PSL_LOCATION/g" $GCS_DIR/*.properties # Install and run Kafka brokers KAFKA_VERSION=$(curl http://metadata.google.internal/computeMetadata/v1/instance/attributes/kafka_version -H "Metadata-Flavor: Google") diff --git a/src/test/resources/pubsub-lite-sink-connector-test.properties b/src/test/resources/pubsub-lite-sink-connector-test.properties index ab90cc7a..de19905c 100644 --- a/src/test/resources/pubsub-lite-sink-connector-test.properties +++ b/src/test/resources/pubsub-lite-sink-connector-test.properties @@ -17,7 +17,7 @@ connector.class=com.google.pubsublite.kafka.sink.PubSubLiteSinkConnector tasks.max=2 topics=psl-sink-test-topic pubsublite.project= -pubsublite.location= +pubsublite.location= pubsublite.topic=psl-sink-topic- key.converter=org.apache.kafka.connect.converters.ByteArrayConverter value.converter=org.apache.kafka.connect.converters.ByteArrayConverter diff --git a/src/test/resources/pubsub-lite-source-connector-test.properties b/src/test/resources/pubsub-lite-source-connector-test.properties index 1c7ae000..c0ee206c 100644 --- a/src/test/resources/pubsub-lite-source-connector-test.properties +++ b/src/test/resources/pubsub-lite-source-connector-test.properties @@ -16,7 +16,7 @@ name=PubSubLiteSourceConnector connector.class=com.google.pubsublite.kafka.source.PubSubLiteSourceConnector tasks.max=2 pubsublite.project= -pubsublite.location= +pubsublite.location= pubsublite.subscription=psl-source-subscription- kafka.topic=psl-source-test-topic key.converter=org.apache.kafka.connect.converters.ByteArrayConverter From d7095cfcbb93192fb0535edecef3ecaf86e4c3a7 Mon Sep 17 00:00:00 2001 From: Anna Cocuzzo Date: Tue, 6 Aug 2024 13:56:37 -0400 Subject: [PATCH 2/3] add regional tests --- src/test/java/it/StandaloneIT.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/test/java/it/StandaloneIT.java b/src/test/java/it/StandaloneIT.java index 9769697d..3d505fec 100644 --- a/src/test/java/it/StandaloneIT.java +++ b/src/test/java/it/StandaloneIT.java @@ -127,7 +127,7 @@ public class StandaloneIT extends Base { .build(); private static final String kafkaPslSinkZonalTestTopic = "psl-sink-test-topic-zonal"; - // private static final String kafkaPslSinkRegionalTestTopic = "psl-sink-test-topic-regional"; + private static final String kafkaPslSinkRegionalTestTopic = "psl-sink-test-topic-regional"; private static final String pslSinkTopicId = "psl-sink-topic-" + runId; private static final TopicPath pslSinkZonalTopicPath = TopicPath.newBuilder() @@ -155,7 +155,7 @@ public class StandaloneIT extends Base { .setLocation(CloudRegion.of(region)) .build(); -// private static final String kafkaPslSourceZonalTestTopic = "psl-source-test-topic-zonal"; + private static final String kafkaPslSourceZonalTestTopic = "psl-source-test-topic-zonal"; private static final String kafkaPslSourceRegionalTestTopic = "psl-source-test-topic-regional"; private static final String pslSourceTopicId = "psl-source-topic-" + runId; private static final TopicPath pslSourceZonalTopicPath = @@ -213,7 +213,7 @@ public static void setUp() throws Exception { uploadGCSResources(); setupCpsResources(); setupPslZonalResources(); - // setupPslRegionalResources(); + setupPslRegionalResources(); setupGceInstance(); } @@ -658,10 +658,10 @@ public void testPslSinkConnector(TopicPath topicPath, SubscriptionPath pslSinkSu assertThat(this.pslMessageReceived.get()).isTrue(); } - //@Test(timeout = 5 * 60 * 1000L) - // public void testPslZonalSourceConnector() throws Exception { - // testPslSourceConnector(pslSinkZonalTopicPath, kafkaPslSourceZonalTestTopic); - // } + @Test(timeout = 5 * 60 * 1000L) + public void testPslZonalSourceConnector() throws Exception { + testPslSourceConnector(pslSinkZonalTopicPath, kafkaPslSourceZonalTestTopic); + } @Test(timeout = 5 * 60 * 1000L) public void testPslRegionalSourceConnector() throws Exception { From d688020ea4d99e7714d3e4ad220fc12896edc44f Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Tue, 6 Aug 2024 18:00:29 +0000 Subject: [PATCH 3/3] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20po?= =?UTF-8?q?st-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- .../kafka/source/PollerFactoryImpl.java | 3 +- src/test/java/it/StandaloneIT.java | 111 +++++++++++------- 2 files changed, 70 insertions(+), 44 deletions(-) diff --git a/src/main/java/com/google/pubsublite/kafka/source/PollerFactoryImpl.java b/src/main/java/com/google/pubsublite/kafka/source/PollerFactoryImpl.java index 4f142aeb..04f51818 100644 --- a/src/main/java/com/google/pubsublite/kafka/source/PollerFactoryImpl.java +++ b/src/main/java/com/google/pubsublite/kafka/source/PollerFactoryImpl.java @@ -35,7 +35,8 @@ public Poller newPoller(Map params) { .setProject( ProjectPath.parse("projects/" + config.get(ConfigDefs.PROJECT_FLAG).value()) .project()) - .setLocation(CloudRegionOrZone.parse(config.get(ConfigDefs.LOCATION_FLAG).value().toString())) + .setLocation( + CloudRegionOrZone.parse(config.get(ConfigDefs.LOCATION_FLAG).value().toString())) .setName( SubscriptionName.of( config.get(ConfigDefs.SUBSCRIPTION_NAME_FLAG).value().toString())) diff --git a/src/test/java/it/StandaloneIT.java b/src/test/java/it/StandaloneIT.java index 3d505fec..b8b03d6c 100644 --- a/src/test/java/it/StandaloneIT.java +++ b/src/test/java/it/StandaloneIT.java @@ -136,11 +136,11 @@ public class StandaloneIT extends Base { .setName(com.google.cloud.pubsublite.TopicName.of(pslSinkTopicId)) .build(); private static final TopicPath pslSinkRegionalTopicPath = - TopicPath.newBuilder() - .setProject(ProjectId.of(projectId)) - .setLocation(CloudRegion.of(region)) - .setName(com.google.cloud.pubsublite.TopicName.of(pslSinkTopicId)) - .build(); + TopicPath.newBuilder() + .setProject(ProjectId.of(projectId)) + .setLocation(CloudRegion.of(region)) + .setName(com.google.cloud.pubsublite.TopicName.of(pslSinkTopicId)) + .build(); private static final String pslSinkSubscriptionId = "psl-sink-subscription-" + runId; private static final SubscriptionPath pslSinkZonalSubscriptionPath = SubscriptionPath.newBuilder() @@ -149,12 +149,12 @@ public class StandaloneIT extends Base { .setLocation(CloudZone.of(CloudRegion.of(region), zone)) .build(); private static final SubscriptionPath pslSinkRegionalSubscriptionPath = - SubscriptionPath.newBuilder() - .setName(com.google.cloud.pubsublite.SubscriptionName.of(pslSinkSubscriptionId)) - .setProject(ProjectId.of(projectId)) - .setLocation(CloudRegion.of(region)) - .build(); - + SubscriptionPath.newBuilder() + .setName(com.google.cloud.pubsublite.SubscriptionName.of(pslSinkSubscriptionId)) + .setProject(ProjectId.of(projectId)) + .setLocation(CloudRegion.of(region)) + .build(); + private static final String kafkaPslSourceZonalTestTopic = "psl-source-test-topic-zonal"; private static final String kafkaPslSourceRegionalTestTopic = "psl-source-test-topic-regional"; private static final String pslSourceTopicId = "psl-source-topic-" + runId; @@ -165,11 +165,11 @@ public class StandaloneIT extends Base { .setName(com.google.cloud.pubsublite.TopicName.of(pslSourceTopicId)) .build(); private static final TopicPath pslSourceRegionalTopicPath = - TopicPath.newBuilder() - .setProject(ProjectId.of(projectId)) - .setLocation(CloudRegion.of(region)) - .setName(com.google.cloud.pubsublite.TopicName.of(pslSourceTopicId)) - .build(); + TopicPath.newBuilder() + .setProject(ProjectId.of(projectId)) + .setLocation(CloudRegion.of(region)) + .setName(com.google.cloud.pubsublite.TopicName.of(pslSourceTopicId)) + .build(); private static final String pslSourceSubscriptionId = "psl-source-subscription-" + runId; private static final SubscriptionPath pslSourceZonalSubscriptionPath = SubscriptionPath.newBuilder() @@ -178,12 +178,12 @@ public class StandaloneIT extends Base { .setLocation(CloudZone.of(CloudRegion.of(region), zone)) .build(); private static final SubscriptionPath pslSourceRegionalSubscriptionPath = - SubscriptionPath.newBuilder() - .setName(com.google.cloud.pubsublite.SubscriptionName.of(pslSourceSubscriptionId)) - .setProject(ProjectId.of(projectId)) - .setLocation(CloudRegion.of(region)) - .build(); - + SubscriptionPath.newBuilder() + .setName(com.google.cloud.pubsublite.SubscriptionName.of(pslSourceSubscriptionId)) + .setProject(ProjectId.of(projectId)) + .setLocation(CloudRegion.of(region)) + .build(); + private static final String instanceName = "kafka-it-" + runId; private static final String instanceTemplateName = "kafka-it-template-" + runId; private static AtomicBoolean cpsMessageReceived = new AtomicBoolean(false); @@ -273,17 +273,27 @@ protected static void setupCpsResources() throws IOException { } protected static void setupPslZonalResources() throws Exception { - setupPslResources(pslSinkZonalTopicPath, pslSourceZonalTopicPath, pslSinkZonalSubscriptionPath, + setupPslResources( + pslSinkZonalTopicPath, + pslSourceZonalTopicPath, + pslSinkZonalSubscriptionPath, pslSourceZonalSubscriptionPath); } protected static void setupPslRegionalResources() throws Exception { - setupPslResources(pslSinkRegionalTopicPath, pslSourceRegionalTopicPath, pslSinkRegionalSubscriptionPath, + setupPslResources( + pslSinkRegionalTopicPath, + pslSourceRegionalTopicPath, + pslSinkRegionalSubscriptionPath, pslSourceRegionalSubscriptionPath); } - protected static void setupPslResources(TopicPath pslSinkTopicPath, TopicPath pslSourceTopicPath, - SubscriptionPath pslSinkSubscriptionPath, SubscriptionPath pslSourceSubscriptionPath) throws Exception { + protected static void setupPslResources( + TopicPath pslSinkTopicPath, + TopicPath pslSourceTopicPath, + SubscriptionPath pslSinkSubscriptionPath, + SubscriptionPath pslSourceSubscriptionPath) + throws Exception { try (AdminClient pslAdminClient = AdminClient.create( AdminClientSettings.newBuilder().setRegion(CloudRegion.of(region)).build())) { @@ -422,19 +432,29 @@ public Void apply(Runnable runnable) { try (AdminClient pslAdminClient = AdminClient.create( AdminClientSettings.newBuilder().setRegion(CloudRegion.of(region)).build())) { - final SubscriptionPath[] subscriptionPaths = {pslSinkZonalSubscriptionPath, pslSinkRegionalSubscriptionPath, pslSourceZonalSubscriptionPath, pslSourceRegionalSubscriptionPath}; - for (SubscriptionPath subscriptionPath : subscriptionPaths ) { + final SubscriptionPath[] subscriptionPaths = { + pslSinkZonalSubscriptionPath, + pslSinkRegionalSubscriptionPath, + pslSourceZonalSubscriptionPath, + pslSourceRegionalSubscriptionPath + }; + for (SubscriptionPath subscriptionPath : subscriptionPaths) { notFoundIgnoredClosureRunner.apply( () -> { pslAdminClient.deleteSubscription(subscriptionPath); }); - } - final TopicPath[] topicPaths = {pslSinkZonalTopicPath, pslSinkRegionalTopicPath, pslSourceZonalTopicPath, pslSourceRegionalTopicPath}; + } + final TopicPath[] topicPaths = { + pslSinkZonalTopicPath, + pslSinkRegionalTopicPath, + pslSourceZonalTopicPath, + pslSourceRegionalTopicPath + }; for (TopicPath topicPath : topicPaths) { notFoundIgnoredClosureRunner.apply( - () -> { - pslAdminClient.deleteTopic(topicPath); - }); + () -> { + pslAdminClient.deleteTopic(topicPath); + }); } log.atInfo().log("Deleted PSL topics and subscriptions."); @@ -448,7 +468,7 @@ public Void apply(Runnable runnable) { instanceTemplatesClient.deleteAsync(projectId, instanceTemplateName).get(3, MINUTES); } log.atInfo().log("Deleted Compute Engine instance template."); -} + } @Test public void testCpsSinkConnector() throws Exception { @@ -592,15 +612,19 @@ public void onPartitionsAssigned(Collection collection) { @Test public void testPslZonalSinkConnector() throws Exception { - testPslSinkConnector(pslSinkZonalTopicPath, pslSinkZonalSubscriptionPath, kafkaPslSinkZonalTestTopic); + testPslSinkConnector( + pslSinkZonalTopicPath, pslSinkZonalSubscriptionPath, kafkaPslSinkZonalTestTopic); } - //@Test - //public void testPslRegionalSinkConnector() throws Exception { - // testPslSinkConnector(pslSinkRegionalTopicPath, pslSinkRegionalSubscriptionPath, kafkaPslSinkRegionalTestTopic); - //} + // @Test + // public void testPslRegionalSinkConnector() throws Exception { + // testPslSinkConnector(pslSinkRegionalTopicPath, pslSinkRegionalSubscriptionPath, + // kafkaPslSinkRegionalTestTopic); + // } - public void testPslSinkConnector(TopicPath topicPath, SubscriptionPath pslSinkSubscriptionPath, String kafkaPslSinkTestTopic) throws Exception { + public void testPslSinkConnector( + TopicPath topicPath, SubscriptionPath pslSinkSubscriptionPath, String kafkaPslSinkTestTopic) + throws Exception { Properties props = new Properties(); props.put("bootstrap.servers", kafkaInstanceIpAddress + ":" + KAFKA_PORT); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); @@ -662,13 +686,14 @@ public void testPslSinkConnector(TopicPath topicPath, SubscriptionPath pslSinkSu public void testPslZonalSourceConnector() throws Exception { testPslSourceConnector(pslSinkZonalTopicPath, kafkaPslSourceZonalTestTopic); } - + @Test(timeout = 5 * 60 * 1000L) public void testPslRegionalSourceConnector() throws Exception { testPslSourceConnector(pslSinkRegionalTopicPath, kafkaPslSourceRegionalTestTopic); } - - public void testPslSourceConnector(TopicPath pslSourceTopicPath, String kafkaPslSourceTestTopic) throws Exception { + + public void testPslSourceConnector(TopicPath pslSourceTopicPath, String kafkaPslSourceTestTopic) + throws Exception { // Publish to CPS topic PublisherSettings publisherSettings = PublisherSettings.newBuilder()