Skip to content

Commit 7290786

Browse files
feat: add pubsublite sink support for credentials settings (googleapis#251)
psl source support requires more work also unify credentialsProvider creation
1 parent c9c3beb commit 7290786

File tree

8 files changed

+113
-92
lines changed

8 files changed

+113
-92
lines changed

src/main/java/com/google/pubsub/kafka/common/ConnectorCredentialsProvider.java

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,29 +23,58 @@
2323
import java.io.IOException;
2424
import java.util.Arrays;
2525
import java.util.List;
26+
import java.util.Map;
2627

2728
public class ConnectorCredentialsProvider implements CredentialsProvider {
29+
private static final List<String> GCP_SCOPE =
30+
Arrays.asList("https://www.googleapis.com/auth/cloud-platform");
2831

29-
private static final List<String> CPS_SCOPE =
30-
Arrays.asList("https://www.googleapis.com/auth/pubsub");
32+
CredentialsProvider impl;
3133

32-
GoogleCredentials credentials;
34+
private ConnectorCredentialsProvider(CredentialsProvider impl) {
35+
this.impl = impl;
36+
}
37+
38+
public static ConnectorCredentialsProvider fromConfig(Map<String, Object> config) {
39+
String credentialsPath = config.get(ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG).toString();
40+
String credentialsJson = config.get(ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG).toString();
41+
if (!credentialsPath.isEmpty()) {
42+
if (!credentialsJson.isEmpty()) {
43+
throw new IllegalArgumentException(
44+
"May not set both "
45+
+ ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG
46+
+ " and "
47+
+ ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG);
48+
}
49+
return ConnectorCredentialsProvider.fromFile(credentialsPath);
50+
} else if (!credentialsJson.isEmpty()) {
51+
return ConnectorCredentialsProvider.fromJson(credentialsJson);
52+
} else {
53+
return ConnectorCredentialsProvider.fromDefault();
54+
}
55+
}
3356

34-
public void loadFromFile(String credentialPath) throws IOException {
35-
this.credentials = GoogleCredentials.fromStream(new FileInputStream(credentialPath));
57+
public static ConnectorCredentialsProvider fromFile(String credentialPath) {
58+
return new ConnectorCredentialsProvider(
59+
() ->
60+
GoogleCredentials.fromStream(new FileInputStream(credentialPath))
61+
.createScoped(GCP_SCOPE));
3662
}
3763

38-
public void loadJson(String credentialsJson) throws IOException {
39-
ByteArrayInputStream bs = new ByteArrayInputStream(credentialsJson.getBytes());
40-
this.credentials = credentials = GoogleCredentials.fromStream(bs);
64+
public static ConnectorCredentialsProvider fromJson(String credentialsJson) {
65+
return new ConnectorCredentialsProvider(
66+
() ->
67+
GoogleCredentials.fromStream(new ByteArrayInputStream(credentialsJson.getBytes()))
68+
.createScoped(GCP_SCOPE));
69+
}
70+
71+
public static ConnectorCredentialsProvider fromDefault() {
72+
return new ConnectorCredentialsProvider(
73+
() -> GoogleCredentials.getApplicationDefault().createScoped(GCP_SCOPE));
4174
}
4275

4376
@Override
4477
public Credentials getCredentials() throws IOException {
45-
if (this.credentials == null) {
46-
return GoogleCredentials.getApplicationDefault().createScoped(this.CPS_SCOPE);
47-
} else {
48-
return this.credentials.createScoped(this.CPS_SCOPE);
49-
}
78+
return impl.getCredentials();
5079
}
5180
}

src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkConnector.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -233,13 +233,13 @@ public ConfigDef config() {
233233
.define(
234234
ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG,
235235
Type.STRING,
236-
null,
236+
"",
237237
Importance.HIGH,
238238
"The path to the GCP credentials file")
239239
.define(
240240
ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG,
241241
Type.STRING,
242-
null,
242+
"",
243243
Importance.HIGH,
244244
"GCP JSON credentials")
245245
.define(

src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import com.google.pubsub.kafka.sink.CloudPubSubSinkConnector.OrderingKeySource;
3434
import com.google.pubsub.v1.ProjectTopicName;
3535
import com.google.pubsub.v1.PubsubMessage;
36-
import java.io.IOException;
3736
import java.nio.ByteBuffer;
3837
import java.util.ArrayList;
3938
import java.util.Collection;
@@ -136,24 +135,7 @@ public void start(Map<String, String> props) {
136135
orderingKeySource =
137136
OrderingKeySource.getEnum(
138137
(String) validatedProps.get(CloudPubSubSinkConnector.ORDERING_KEY_SOURCE));
139-
gcpCredentialsProvider = new ConnectorCredentialsProvider();
140-
String credentialsPath =
141-
(String) validatedProps.get(ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG);
142-
String credentialsJson =
143-
(String) validatedProps.get(ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG);
144-
if (credentialsPath != null) {
145-
try {
146-
gcpCredentialsProvider.loadFromFile(credentialsPath);
147-
} catch (IOException e) {
148-
throw new RuntimeException(e);
149-
}
150-
} else if (credentialsJson != null) {
151-
try {
152-
gcpCredentialsProvider.loadJson(credentialsJson);
153-
} catch (IOException e) {
154-
throw new RuntimeException(e);
155-
}
156-
}
138+
gcpCredentialsProvider = ConnectorCredentialsProvider.fromConfig(validatedProps);
157139
if (publisher == null) {
158140
// Only do this if we did not use the constructor.
159141
createPublisher();

src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceConnector.java

Lines changed: 7 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import com.google.pubsub.kafka.common.ConnectorCredentialsProvider;
2323
import com.google.pubsub.kafka.common.ConnectorUtils;
2424
import com.google.pubsub.v1.GetSubscriptionRequest;
25-
import java.io.IOException;
2625
import java.util.ArrayList;
2726
import java.util.Arrays;
2827
import java.util.HashMap;
@@ -138,25 +137,11 @@ public String version() {
138137
public void start(Map<String, String> props) {
139138
// Do a validation of configs here too so that we do not pass null objects to
140139
// verifySubscription().
141-
config().parse(props);
142-
String cpsProject = props.get(ConnectorUtils.CPS_PROJECT_CONFIG);
143-
String cpsSubscription = props.get(CPS_SUBSCRIPTION_CONFIG);
144-
String credentialsPath = props.get(ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG);
145-
String credentialsJson = props.get(ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG);
146-
ConnectorCredentialsProvider credentialsProvider = new ConnectorCredentialsProvider();
147-
if (credentialsPath != null) {
148-
try {
149-
credentialsProvider.loadFromFile(credentialsPath);
150-
} catch (IOException e) {
151-
throw new RuntimeException(e);
152-
}
153-
} else if (credentialsJson != null) {
154-
try {
155-
credentialsProvider.loadJson(credentialsJson);
156-
} catch (IOException e) {
157-
throw new RuntimeException(e);
158-
}
159-
}
140+
Map<String, Object> validated = config().parse(props);
141+
String cpsProject = validated.get(ConnectorUtils.CPS_PROJECT_CONFIG).toString();
142+
String cpsSubscription = validated.get(CPS_SUBSCRIPTION_CONFIG).toString();
143+
ConnectorCredentialsProvider credentialsProvider =
144+
ConnectorCredentialsProvider.fromConfig(validated);
160145

161146
verifySubscription(cpsProject, cpsSubscription, credentialsProvider);
162147
this.props = props;
@@ -271,13 +256,13 @@ public ConfigDef config() {
271256
.define(
272257
ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG,
273258
Type.STRING,
274-
null,
259+
"",
275260
Importance.HIGH,
276261
"The path to the GCP credentials file")
277262
.define(
278263
ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG,
279264
Type.STRING,
280-
null,
265+
"",
281266
Importance.HIGH,
282267
"GCP JSON credentials")
283268
.define(

src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceTask.java

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import com.google.pubsub.v1.ProjectSubscriptionName;
3535
import com.google.pubsub.v1.PubsubMessage;
3636
import com.google.pubsub.v1.ReceivedMessage;
37-
import java.io.IOException;
3837
import java.util.ArrayList;
3938
import java.util.Collections;
4039
import java.util.HashSet;
@@ -114,11 +113,6 @@ public void start(Map<String, String> props) {
114113
useKafkaHeaders = (Boolean) validatedProps.get(CloudPubSubSourceConnector.USE_KAFKA_HEADERS);
115114
makeOrderingKeyAttribute =
116115
(Boolean) validatedProps.get(CloudPubSubSourceConnector.CPS_MAKE_ORDERING_KEY_ATTRIBUTE);
117-
ConnectorCredentialsProvider gcpCredentialsProvider = new ConnectorCredentialsProvider();
118-
String gcpCredentialsFilePath =
119-
(String) validatedProps.get(ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG);
120-
String credentialsJson =
121-
(String) validatedProps.get(ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG);
122116
boolean useStreamingPull =
123117
(Boolean) validatedProps.get(CloudPubSubSourceConnector.CPS_STREAMING_PULL_ENABLED);
124118
long streamingPullBytes =
@@ -136,19 +130,9 @@ public void start(Map<String, String> props) {
136130
(Long)
137131
validatedProps.get(
138132
CloudPubSubSourceConnector.CPS_STREAMING_PULL_MAX_MS_PER_ACK_EXTENSION);
139-
if (gcpCredentialsFilePath != null) {
140-
try {
141-
gcpCredentialsProvider.loadFromFile(gcpCredentialsFilePath);
142-
} catch (IOException e) {
143-
throw new RuntimeException(e);
144-
}
145-
} else if (credentialsJson != null) {
146-
try {
147-
gcpCredentialsProvider.loadJson(credentialsJson);
148-
} catch (IOException e) {
149-
throw new RuntimeException(e);
150-
}
151-
}
133+
ConnectorCredentialsProvider gcpCredentialsProvider =
134+
ConnectorCredentialsProvider.fromConfig(validatedProps);
135+
152136
// Only do this if we did not set it through the constructor.
153137
if (subscriber == null) {
154138
if (useStreamingPull) {

src/main/java/com/google/pubsublite/kafka/sink/ConfigDefs.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package com.google.pubsublite.kafka.sink;
1717

18+
import com.google.pubsub.kafka.common.ConnectorUtils;
1819
import org.apache.kafka.common.config.ConfigDef;
1920
import org.apache.kafka.common.config.ConfigDef.Importance;
2021

@@ -49,6 +50,18 @@ static ConfigDef config() {
4950
ConfigDef.Type.STRING,
5051
OrderingMode.DEFAULT.name(),
5152
Importance.HIGH,
52-
"The ordering mode to use for publishing to Pub/Sub Lite. If set to `KAFKA`, messages will be republished to the same partition index they were read from on the source topic. Note that this means the Pub/Sub Lite topic *must* have the same number of partitions as the source Kafka topic.");
53+
"The ordering mode to use for publishing to Pub/Sub Lite. If set to `KAFKA`, messages will be republished to the same partition index they were read from on the source topic. Note that this means the Pub/Sub Lite topic *must* have the same number of partitions as the source Kafka topic.")
54+
.define(
55+
ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG,
56+
ConfigDef.Type.STRING,
57+
"",
58+
Importance.HIGH,
59+
"The path to the GCP credentials file")
60+
.define(
61+
ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG,
62+
ConfigDef.Type.STRING,
63+
"",
64+
Importance.HIGH,
65+
"GCP JSON credentials");
5366
}
5467
}

src/main/java/com/google/pubsublite/kafka/sink/PublisherFactoryImpl.java

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -37,17 +37,21 @@
3737
import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework;
3838
import com.google.cloud.pubsublite.internal.wire.RoutingMetadata;
3939
import com.google.cloud.pubsublite.internal.wire.SinglePartitionPublisherBuilder;
40+
import com.google.cloud.pubsublite.v1.AdminServiceClient;
41+
import com.google.cloud.pubsublite.v1.AdminServiceSettings;
4042
import com.google.cloud.pubsublite.v1.PublisherServiceClient;
4143
import com.google.cloud.pubsublite.v1.PublisherServiceSettings;
44+
import com.google.pubsub.kafka.common.ConnectorCredentialsProvider;
45+
import java.io.IOException;
4246
import java.util.Map;
4347
import java.util.Optional;
44-
import org.apache.kafka.common.config.ConfigValue;
4548

4649
class PublisherFactoryImpl implements PublisherFactory {
4750

4851
private static final Framework FRAMEWORK = Framework.of("KAFKA_CONNECT");
4952

50-
private PartitionPublisherFactory getPartitionPublisherFactory(TopicPath topic) {
53+
private PartitionPublisherFactory getPartitionPublisherFactory(
54+
TopicPath topic, ConnectorCredentialsProvider credentialsProvider) {
5155

5256
return new PartitionPublisherFactory() {
5357
private Optional<PublisherServiceClient> publisherServiceClient = Optional.empty();
@@ -61,9 +65,7 @@ private synchronized PublisherServiceClient getServiceClient() throws ApiExcepti
6165
addDefaultSettings(
6266
topic.location().extractRegion(),
6367
PublisherServiceSettings.newBuilder()
64-
.setCredentialsProvider(
65-
PublisherServiceSettings.defaultCredentialsProviderBuilder()
66-
.build()))));
68+
.setCredentialsProvider(credentialsProvider))));
6769
return publisherServiceClient.get();
6870
} catch (Throwable t) {
6971
throw toCanonical(t).underlying;
@@ -95,25 +97,38 @@ public void close() {}
9597

9698
@Override
9799
public Publisher<MessageMetadata> newPublisher(Map<String, String> params) {
98-
Map<String, ConfigValue> config = ConfigDefs.config().validateAll(params);
100+
Map<String, Object> config = ConfigDefs.config().parse(params);
101+
ConnectorCredentialsProvider credentialsProvider =
102+
ConnectorCredentialsProvider.fromConfig(config);
99103
CloudRegionOrZone location =
100-
CloudRegionOrZone.parse(config.get(ConfigDefs.LOCATION_FLAG).value().toString());
104+
CloudRegionOrZone.parse(config.get(ConfigDefs.LOCATION_FLAG).toString());
101105
PartitionCountWatchingPublisherSettings.Builder builder =
102106
PartitionCountWatchingPublisherSettings.newBuilder();
103107
TopicPath topic =
104108
TopicPath.newBuilder()
105109
.setProject(
106-
ProjectPath.parse("projects/" + config.get(ConfigDefs.PROJECT_FLAG).value())
107-
.project())
110+
ProjectPath.parse("projects/" + config.get(ConfigDefs.PROJECT_FLAG)).project())
108111
.setLocation(location)
109-
.setName(TopicName.of(config.get(ConfigDefs.TOPIC_NAME_FLAG).value().toString()))
112+
.setName(TopicName.of(config.get(ConfigDefs.TOPIC_NAME_FLAG).toString()))
110113
.build();
111114
builder.setTopic(topic);
112-
builder.setPublisherFactory(getPartitionPublisherFactory(topic));
113-
builder.setAdminClient(
114-
AdminClient.create(
115-
AdminClientSettings.newBuilder().setRegion(location.extractRegion()).build()));
116-
if (OrderingMode.valueOf(config.get(ConfigDefs.ORDERING_MODE_FLAG).value().toString())
115+
builder.setPublisherFactory(getPartitionPublisherFactory(topic, credentialsProvider));
116+
try {
117+
builder.setAdminClient(
118+
AdminClient.create(
119+
AdminClientSettings.newBuilder()
120+
.setRegion(location.extractRegion())
121+
.setServiceClient(
122+
AdminServiceClient.create(
123+
addDefaultSettings(
124+
location.extractRegion(),
125+
AdminServiceSettings.newBuilder()
126+
.setCredentialsProvider(credentialsProvider))))
127+
.build()));
128+
} catch (IOException e) {
129+
throw new IllegalStateException(e);
130+
}
131+
if (OrderingMode.valueOf(config.get(ConfigDefs.ORDERING_MODE_FLAG).toString())
117132
== OrderingMode.KAFKA) {
118133
builder.setRoutingPolicyFactory(KafkaPartitionRoutingPolicy::new);
119134
}

src/main/java/com/google/pubsublite/kafka/source/ConfigDefs.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package com.google.pubsublite.kafka.source;
1717

18+
import com.google.pubsub.kafka.common.ConnectorUtils;
1819
import org.apache.kafka.common.config.ConfigDef;
1920
import org.apache.kafka.common.config.ConfigDef.Importance;
2021

@@ -63,6 +64,18 @@ static ConfigDef config() {
6364
ConfigDef.Type.LONG,
6465
20_000_000,
6566
Importance.MEDIUM,
66-
"The number of outstanding bytes per-partition allowed. Set to 20MB by default.");
67+
"The number of outstanding bytes per-partition allowed. Set to 20MB by default.")
68+
.define(
69+
ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG,
70+
ConfigDef.Type.STRING,
71+
"",
72+
Importance.HIGH,
73+
"The path to the GCP credentials file")
74+
.define(
75+
ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG,
76+
ConfigDef.Type.STRING,
77+
"",
78+
Importance.HIGH,
79+
"GCP JSON credentials");
6780
}
6881
}

0 commit comments

Comments
 (0)