diff --git a/src/main/java/com/google/pubsublite/kafka/source/PollerFactoryImpl.java b/src/main/java/com/google/pubsublite/kafka/source/PollerFactoryImpl.java index de607374..04f51818 100644 --- a/src/main/java/com/google/pubsublite/kafka/source/PollerFactoryImpl.java +++ b/src/main/java/com/google/pubsublite/kafka/source/PollerFactoryImpl.java @@ -15,7 +15,7 @@ */ package com.google.pubsublite.kafka.source; -import com.google.cloud.pubsublite.CloudZone; +import com.google.cloud.pubsublite.CloudRegionOrZone; import com.google.cloud.pubsublite.ProjectPath; import com.google.cloud.pubsublite.SubscriptionName; import com.google.cloud.pubsublite.SubscriptionPath; @@ -35,7 +35,8 @@ public Poller newPoller(Map params) { .setProject( ProjectPath.parse("projects/" + config.get(ConfigDefs.PROJECT_FLAG).value()) .project()) - .setLocation(CloudZone.parse(config.get(ConfigDefs.LOCATION_FLAG).value().toString())) + .setLocation( + CloudRegionOrZone.parse(config.get(ConfigDefs.LOCATION_FLAG).value().toString())) .setName( SubscriptionName.of( config.get(ConfigDefs.SUBSCRIPTION_NAME_FLAG).value().toString())) diff --git a/src/test/java/it/Base.java b/src/test/java/it/Base.java index db9be8be..fd74bc7d 100644 --- a/src/test/java/it/Base.java +++ b/src/test/java/it/Base.java @@ -243,7 +243,7 @@ protected static void createInstanceTemplate( .setKey("cps_source_connector_properties_name") .setValue(cpsSourceConnectorPropertiesGCSName) .build()) - .addItems(Items.newBuilder().setKey("psl_zone").setValue(location).build()) + .addItems(Items.newBuilder().setKey("psl_location").setValue(region).build()) .addItems( Items.newBuilder() .setKey("psl_sink_connector_properties_name") diff --git a/src/test/java/it/StandaloneIT.java b/src/test/java/it/StandaloneIT.java index 6de9723f..b8b03d6c 100644 --- a/src/test/java/it/StandaloneIT.java +++ b/src/test/java/it/StandaloneIT.java @@ -126,37 +126,63 @@ public class StandaloneIT extends Base { .setName(ReservationName.of((pslReservationId))) .build(); - private static final String kafkaPslSinkTestTopic = "psl-sink-test-topic"; + private static final String kafkaPslSinkZonalTestTopic = "psl-sink-test-topic-zonal"; + private static final String kafkaPslSinkRegionalTestTopic = "psl-sink-test-topic-regional"; private static final String pslSinkTopicId = "psl-sink-topic-" + runId; - private static final TopicPath pslSinkTopicPath = + private static final TopicPath pslSinkZonalTopicPath = TopicPath.newBuilder() .setProject(ProjectId.of(projectId)) .setLocation(CloudZone.of(CloudRegion.of(region), zone)) .setName(com.google.cloud.pubsublite.TopicName.of(pslSinkTopicId)) .build(); + private static final TopicPath pslSinkRegionalTopicPath = + TopicPath.newBuilder() + .setProject(ProjectId.of(projectId)) + .setLocation(CloudRegion.of(region)) + .setName(com.google.cloud.pubsublite.TopicName.of(pslSinkTopicId)) + .build(); private static final String pslSinkSubscriptionId = "psl-sink-subscription-" + runId; - private static final SubscriptionPath pslSinkSubscriptionPath = + private static final SubscriptionPath pslSinkZonalSubscriptionPath = SubscriptionPath.newBuilder() .setName(com.google.cloud.pubsublite.SubscriptionName.of(pslSinkSubscriptionId)) .setProject(ProjectId.of(projectId)) .setLocation(CloudZone.of(CloudRegion.of(region), zone)) .build(); + private static final SubscriptionPath pslSinkRegionalSubscriptionPath = + SubscriptionPath.newBuilder() + .setName(com.google.cloud.pubsublite.SubscriptionName.of(pslSinkSubscriptionId)) + .setProject(ProjectId.of(projectId)) + .setLocation(CloudRegion.of(region)) + .build(); - private static final String kafkaPslSourceTestTopic = "psl-source-test-topic"; + private static final String kafkaPslSourceZonalTestTopic = "psl-source-test-topic-zonal"; + private static final String kafkaPslSourceRegionalTestTopic = "psl-source-test-topic-regional"; private static final String pslSourceTopicId = "psl-source-topic-" + runId; - private static final TopicPath pslSourceTopicPath = + private static final TopicPath pslSourceZonalTopicPath = TopicPath.newBuilder() .setProject(ProjectId.of(projectId)) .setLocation(CloudZone.of(CloudRegion.of(region), zone)) .setName(com.google.cloud.pubsublite.TopicName.of(pslSourceTopicId)) .build(); + private static final TopicPath pslSourceRegionalTopicPath = + TopicPath.newBuilder() + .setProject(ProjectId.of(projectId)) + .setLocation(CloudRegion.of(region)) + .setName(com.google.cloud.pubsublite.TopicName.of(pslSourceTopicId)) + .build(); private static final String pslSourceSubscriptionId = "psl-source-subscription-" + runId; - private static final SubscriptionPath pslSourceSubscriptionPath = + private static final SubscriptionPath pslSourceZonalSubscriptionPath = SubscriptionPath.newBuilder() .setName(com.google.cloud.pubsublite.SubscriptionName.of(pslSourceSubscriptionId)) .setProject(ProjectId.of(projectId)) .setLocation(CloudZone.of(CloudRegion.of(region), zone)) .build(); + private static final SubscriptionPath pslSourceRegionalSubscriptionPath = + SubscriptionPath.newBuilder() + .setName(com.google.cloud.pubsublite.SubscriptionName.of(pslSourceSubscriptionId)) + .setProject(ProjectId.of(projectId)) + .setLocation(CloudRegion.of(region)) + .build(); private static final String instanceName = "kafka-it-" + runId; private static final String instanceTemplateName = "kafka-it-template-" + runId; @@ -186,7 +212,8 @@ public static void setUp() throws Exception { log.atInfo().log("Packaged connector jar."); uploadGCSResources(); setupCpsResources(); - setupPslResources(); + setupPslZonalResources(); + setupPslRegionalResources(); setupGceInstance(); } @@ -245,7 +272,28 @@ protected static void setupCpsResources() throws IOException { } } - protected static void setupPslResources() throws Exception { + protected static void setupPslZonalResources() throws Exception { + setupPslResources( + pslSinkZonalTopicPath, + pslSourceZonalTopicPath, + pslSinkZonalSubscriptionPath, + pslSourceZonalSubscriptionPath); + } + + protected static void setupPslRegionalResources() throws Exception { + setupPslResources( + pslSinkRegionalTopicPath, + pslSourceRegionalTopicPath, + pslSinkRegionalSubscriptionPath, + pslSourceRegionalSubscriptionPath); + } + + protected static void setupPslResources( + TopicPath pslSinkTopicPath, + TopicPath pslSourceTopicPath, + SubscriptionPath pslSinkSubscriptionPath, + SubscriptionPath pslSourceSubscriptionPath) + throws Exception { try (AdminClient pslAdminClient = AdminClient.create( AdminClientSettings.newBuilder().setRegion(CloudRegion.of(region)).build())) { @@ -384,25 +432,33 @@ public Void apply(Runnable runnable) { try (AdminClient pslAdminClient = AdminClient.create( AdminClientSettings.newBuilder().setRegion(CloudRegion.of(region)).build())) { - notFoundIgnoredClosureRunner.apply( - () -> { - pslAdminClient.deleteSubscription(pslSinkSubscriptionPath); - }); - notFoundIgnoredClosureRunner.apply( - () -> { - pslAdminClient.deleteSubscription(pslSourceSubscriptionPath); - }); - notFoundIgnoredClosureRunner.apply( - () -> { - pslAdminClient.deleteTopic(pslSinkTopicPath); - }); - notFoundIgnoredClosureRunner.apply( - () -> { - pslAdminClient.deleteTopic(pslSourceTopicPath); - }); + final SubscriptionPath[] subscriptionPaths = { + pslSinkZonalSubscriptionPath, + pslSinkRegionalSubscriptionPath, + pslSourceZonalSubscriptionPath, + pslSourceRegionalSubscriptionPath + }; + for (SubscriptionPath subscriptionPath : subscriptionPaths) { + notFoundIgnoredClosureRunner.apply( + () -> { + pslAdminClient.deleteSubscription(subscriptionPath); + }); + } + final TopicPath[] topicPaths = { + pslSinkZonalTopicPath, + pslSinkRegionalTopicPath, + pslSourceZonalTopicPath, + pslSourceRegionalTopicPath + }; + for (TopicPath topicPath : topicPaths) { + notFoundIgnoredClosureRunner.apply( + () -> { + pslAdminClient.deleteTopic(topicPath); + }); + } + log.atInfo().log("Deleted PSL topics and subscriptions."); } - try (InstancesClient instancesClient = InstancesClient.create()) { instancesClient.deleteAsync(projectId, location, instanceName).get(3, MINUTES); } @@ -555,7 +611,20 @@ public void onPartitionsAssigned(Collection collection) { } @Test - public void testPslSinkConnector() throws Exception { + public void testPslZonalSinkConnector() throws Exception { + testPslSinkConnector( + pslSinkZonalTopicPath, pslSinkZonalSubscriptionPath, kafkaPslSinkZonalTestTopic); + } + + // @Test + // public void testPslRegionalSinkConnector() throws Exception { + // testPslSinkConnector(pslSinkRegionalTopicPath, pslSinkRegionalSubscriptionPath, + // kafkaPslSinkRegionalTestTopic); + // } + + public void testPslSinkConnector( + TopicPath topicPath, SubscriptionPath pslSinkSubscriptionPath, String kafkaPslSinkTestTopic) + throws Exception { Properties props = new Properties(); props.put("bootstrap.servers", kafkaInstanceIpAddress + ":" + KAFKA_PORT); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); @@ -614,7 +683,17 @@ public void testPslSinkConnector() throws Exception { } @Test(timeout = 5 * 60 * 1000L) - public void testPslSourceConnector() throws Exception { + public void testPslZonalSourceConnector() throws Exception { + testPslSourceConnector(pslSinkZonalTopicPath, kafkaPslSourceZonalTestTopic); + } + + @Test(timeout = 5 * 60 * 1000L) + public void testPslRegionalSourceConnector() throws Exception { + testPslSourceConnector(pslSinkRegionalTopicPath, kafkaPslSourceRegionalTestTopic); + } + + public void testPslSourceConnector(TopicPath pslSourceTopicPath, String kafkaPslSourceTestTopic) + throws Exception { // Publish to CPS topic PublisherSettings publisherSettings = PublisherSettings.newBuilder() diff --git a/src/test/resources/kafka_vm_startup_script.sh b/src/test/resources/kafka_vm_startup_script.sh index b17fc8c8..89765cdb 100644 --- a/src/test/resources/kafka_vm_startup_script.sh +++ b/src/test/resources/kafka_vm_startup_script.sh @@ -38,11 +38,11 @@ ls -l $GCS_DIR/ # Prepare properties files for this run RUN_ID=$(curl http://metadata.google.internal/computeMetadata/v1/instance/attributes/run_id -H "Metadata-Flavor: Google") PROJECT_NAME=$(curl http://metadata.google.internal/computeMetadata/v1/instance/attributes/project_id -H "Metadata-Flavor: Google") -PSL_ZONE=$(curl http://metadata.google.internal/computeMetadata/v1/instance/attributes/psl_zone -H "Metadata-Flavor: Google") +PSL_LOCATION=$(curl http://metadata.google.internal/computeMetadata/v1/instance/attributes/psl_location -H "Metadata-Flavor: Google") sed -i "s//$RUN_ID/g" $GCS_DIR/*.properties sed -i "s//$PROJECT_NAME/g" $GCS_DIR/*.properties -sed -i "s//$PSL_ZONE/g" $GCS_DIR/*.properties +sed -i "s//$PSL_LOCATION/g" $GCS_DIR/*.properties # Install and run Kafka brokers KAFKA_VERSION=$(curl http://metadata.google.internal/computeMetadata/v1/instance/attributes/kafka_version -H "Metadata-Flavor: Google") diff --git a/src/test/resources/pubsub-lite-sink-connector-test.properties b/src/test/resources/pubsub-lite-sink-connector-test.properties index ab90cc7a..de19905c 100644 --- a/src/test/resources/pubsub-lite-sink-connector-test.properties +++ b/src/test/resources/pubsub-lite-sink-connector-test.properties @@ -17,7 +17,7 @@ connector.class=com.google.pubsublite.kafka.sink.PubSubLiteSinkConnector tasks.max=2 topics=psl-sink-test-topic pubsublite.project= -pubsublite.location= +pubsublite.location= pubsublite.topic=psl-sink-topic- key.converter=org.apache.kafka.connect.converters.ByteArrayConverter value.converter=org.apache.kafka.connect.converters.ByteArrayConverter diff --git a/src/test/resources/pubsub-lite-source-connector-test.properties b/src/test/resources/pubsub-lite-source-connector-test.properties index 1c7ae000..c0ee206c 100644 --- a/src/test/resources/pubsub-lite-source-connector-test.properties +++ b/src/test/resources/pubsub-lite-source-connector-test.properties @@ -16,7 +16,7 @@ name=PubSubLiteSourceConnector connector.class=com.google.pubsublite.kafka.source.PubSubLiteSourceConnector tasks.max=2 pubsublite.project= -pubsublite.location= +pubsublite.location= pubsublite.subscription=psl-source-subscription- kafka.topic=psl-source-test-topic key.converter=org.apache.kafka.connect.converters.ByteArrayConverter