Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,8 @@
*/
package com.google.pubsub.kafka.source;

import com.google.api.gax.core.CredentialsProvider;
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.common.annotations.VisibleForTesting;
import com.google.pubsub.kafka.common.ConnectorCredentialsProvider;
import com.google.pubsub.kafka.common.ConnectorUtils;
import com.google.pubsub.kafka.common.Version;
import com.google.pubsub.v1.GetSubscriptionRequest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand All @@ -33,7 +27,6 @@
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -135,15 +128,6 @@ public String version() {

@Override
public void start(Map<String, String> props) {
// Do a validation of configs here too so that we do not pass null objects to
// verifySubscription().
Map<String, Object> validated = config().parse(props);
String cpsProject = validated.get(ConnectorUtils.CPS_PROJECT_CONFIG).toString();
String cpsSubscription = validated.get(CPS_SUBSCRIPTION_CONFIG).toString();
ConnectorCredentialsProvider credentialsProvider =
ConnectorCredentialsProvider.fromConfig(validated);

verifySubscription(cpsProject, cpsSubscription, credentialsProvider);
this.props = props;
log.info("Started the CloudPubSubSourceConnector");
}
Expand Down Expand Up @@ -286,36 +270,6 @@ public ConfigDef config() {
"The Pub/Sub endpoint to use.");
}

/**
* Check whether the user provided Cloud Pub/Sub subscription name specified by {@link
* #CPS_SUBSCRIPTION_CONFIG} exists or not.
*/
@VisibleForTesting
public void verifySubscription(
String cpsProject, String cpsSubscription, CredentialsProvider credentialsProvider) {
try {
SubscriberStubSettings subscriberStubSettings =
SubscriberStubSettings.newBuilder()
.setTransportChannelProvider(
SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
.setMaxInboundMessageSize(20 << 20) // 20MB
.build())
.setCredentialsProvider(credentialsProvider)
.build();
GrpcSubscriberStub stub = GrpcSubscriberStub.create(subscriberStubSettings);
GetSubscriptionRequest request =
GetSubscriptionRequest.newBuilder()
.setSubscription(
String.format(
ConnectorUtils.CPS_SUBSCRIPTION_FORMAT, cpsProject, cpsSubscription))
.build();
stub.getSubscriptionCallable().call(request);
} catch (Exception e) {
throw new ConnectException(
"Error verifying the subscription " + cpsSubscription + " for project " + cpsProject, e);
}
}

@Override
public void stop() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,12 @@
package com.google.pubsub.kafka.source;

import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;

import com.google.pubsub.kafka.common.ConnectorCredentialsProvider;
import com.google.pubsub.kafka.common.ConnectorUtils;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.errors.ConnectException;
import org.junit.Before;
import org.junit.Test;

Expand All @@ -52,24 +45,8 @@ public void setup() {
props.put(CloudPubSubSourceConnector.KAFKA_TOPIC_CONFIG, KAFKA_TOPIC);
}

@Test(expected = ConnectException.class)
public void testStartWhenSubscriptionNonexistant() {
doThrow(new ConnectException(""))
.when(connector)
.verifySubscription(anyString(), anyString(), any(ConnectorCredentialsProvider.class));
connector.start(props);
}

@Test(expected = ConfigException.class)
public void testStartWhenRequiredConfigMissing() {
connector.start(new HashMap<String, String>());
}

@Test
public void testTaskConfigs() {
doNothing()
.when(connector)
.verifySubscription(anyString(), anyString(), any(ConnectorCredentialsProvider.class));
connector.start(props);
List<Map<String, String>> taskConfigs = connector.taskConfigs(NUM_TASKS);
assertEquals(taskConfigs.size(), NUM_TASKS);
Expand Down
Loading