diff --git a/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceConnector.java b/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceConnector.java index e77831f7..c19722be 100644 --- a/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceConnector.java +++ b/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceConnector.java @@ -15,14 +15,8 @@ */ package com.google.pubsub.kafka.source; -import com.google.api.gax.core.CredentialsProvider; -import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub; -import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings; -import com.google.common.annotations.VisibleForTesting; -import com.google.pubsub.kafka.common.ConnectorCredentialsProvider; import com.google.pubsub.kafka.common.ConnectorUtils; import com.google.pubsub.kafka.common.Version; -import com.google.pubsub.v1.GetSubscriptionRequest; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -33,7 +27,6 @@ import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.connector.Task; -import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.source.SourceConnector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -135,15 +128,6 @@ public String version() { @Override public void start(Map props) { - // Do a validation of configs here too so that we do not pass null objects to - // verifySubscription(). - Map validated = config().parse(props); - String cpsProject = validated.get(ConnectorUtils.CPS_PROJECT_CONFIG).toString(); - String cpsSubscription = validated.get(CPS_SUBSCRIPTION_CONFIG).toString(); - ConnectorCredentialsProvider credentialsProvider = - ConnectorCredentialsProvider.fromConfig(validated); - - verifySubscription(cpsProject, cpsSubscription, credentialsProvider); this.props = props; log.info("Started the CloudPubSubSourceConnector"); } @@ -286,36 +270,6 @@ public ConfigDef config() { "The Pub/Sub endpoint to use."); } - /** - * Check whether the user provided Cloud Pub/Sub subscription name specified by {@link - * #CPS_SUBSCRIPTION_CONFIG} exists or not. - */ - @VisibleForTesting - public void verifySubscription( - String cpsProject, String cpsSubscription, CredentialsProvider credentialsProvider) { - try { - SubscriberStubSettings subscriberStubSettings = - SubscriberStubSettings.newBuilder() - .setTransportChannelProvider( - SubscriberStubSettings.defaultGrpcTransportProviderBuilder() - .setMaxInboundMessageSize(20 << 20) // 20MB - .build()) - .setCredentialsProvider(credentialsProvider) - .build(); - GrpcSubscriberStub stub = GrpcSubscriberStub.create(subscriberStubSettings); - GetSubscriptionRequest request = - GetSubscriptionRequest.newBuilder() - .setSubscription( - String.format( - ConnectorUtils.CPS_SUBSCRIPTION_FORMAT, cpsProject, cpsSubscription)) - .build(); - stub.getSubscriptionCallable().call(request); - } catch (Exception e) { - throw new ConnectException( - "Error verifying the subscription " + cpsSubscription + " for project " + cpsProject, e); - } - } - @Override public void stop() {} } diff --git a/src/test/java/com/google/pubsub/kafka/source/CloudPubSubSourceConnectorTest.java b/src/test/java/com/google/pubsub/kafka/source/CloudPubSubSourceConnectorTest.java index 6d31a018..436dc010 100644 --- a/src/test/java/com/google/pubsub/kafka/source/CloudPubSubSourceConnectorTest.java +++ b/src/test/java/com/google/pubsub/kafka/source/CloudPubSubSourceConnectorTest.java @@ -16,19 +16,12 @@ package com.google.pubsub.kafka.source; import static org.junit.Assert.assertEquals; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.spy; -import com.google.pubsub.kafka.common.ConnectorCredentialsProvider; import com.google.pubsub.kafka.common.ConnectorUtils; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.kafka.common.config.ConfigException; -import org.apache.kafka.connect.errors.ConnectException; import org.junit.Before; import org.junit.Test; @@ -52,24 +45,8 @@ public void setup() { props.put(CloudPubSubSourceConnector.KAFKA_TOPIC_CONFIG, KAFKA_TOPIC); } - @Test(expected = ConnectException.class) - public void testStartWhenSubscriptionNonexistant() { - doThrow(new ConnectException("")) - .when(connector) - .verifySubscription(anyString(), anyString(), any(ConnectorCredentialsProvider.class)); - connector.start(props); - } - - @Test(expected = ConfigException.class) - public void testStartWhenRequiredConfigMissing() { - connector.start(new HashMap()); - } - @Test public void testTaskConfigs() { - doNothing() - .when(connector) - .verifySubscription(anyString(), anyString(), any(ConnectorCredentialsProvider.class)); connector.start(props); List> taskConfigs = connector.taskConfigs(NUM_TASKS); assertEquals(taskConfigs.size(), NUM_TASKS);