diff --git a/README.md b/README.md index d30016de..8e457d69 100644 --- a/README.md +++ b/README.md @@ -198,6 +198,7 @@ configurations: | cps.topic | String | REQUIRED (No default) | The Pub/Sub topic ID, e.g. "foo" for topic "/projects/bar/topics/foo". | | cps.project | String | REQUIRED (No default) | The project containing the Pub/Sub topic, e.g. "bar" from above. | | cps.endpoint | String | "pubsub.googleapis.com:443" | The [Pub/Sub endpoint](https://cloud.google.com/pubsub/docs/reference/service_apis_overview#service_endpoints) to use. | +| cps.useEmulator | Boolean | false | When true, use the Pub/Sub emulator instead of the production service. The emulator endpoint will be determined by the PUBSUB_EMULATOR_HOST environment variable, or fallback to the cps.endpoint configuration. | | maxBufferSize | Integer | 100 | The maximum number of messages that can be received for the messages on a topic partition before publishing them to Pub/Sub. | | maxBufferBytes | Long | 10,000,000 | The maximum number of bytes that can be received for the messages on a topic partition before publishing them to Pub/Sub. | | maxOutstandingRequestBytes | Long | Long.MAX_VALUE | The maximum number of total bytes that can be outstanding (including incomplete and pending batches) before the publisher will block further publishing. | diff --git a/src/main/java/com/google/pubsub/kafka/common/ConnectorUtils.java b/src/main/java/com/google/pubsub/kafka/common/ConnectorUtils.java index 70044257..63ad7c0c 100644 --- a/src/main/java/com/google/pubsub/kafka/common/ConnectorUtils.java +++ b/src/main/java/com/google/pubsub/kafka/common/ConnectorUtils.java @@ -29,6 +29,8 @@ public class ConnectorUtils { public static final String CPS_TOPIC_CONFIG = "cps.topic"; public static final String CPS_ENDPOINT = "cps.endpoint"; public static final String CPS_DEFAULT_ENDPOINT = "pubsub.googleapis.com:443"; + public static final String CPS_USE_EMULATOR = "cps.useEmulator"; + public static final String PUBSUB_EMULATOR_HOST = "PUBSUB_EMULATOR_HOST"; public static final String CPS_MESSAGE_KEY_ATTRIBUTE = "key"; public static final String CPS_ORDERING_KEY_ATTRIBUTE = "orderingKey"; public static final String GCP_CREDENTIALS_FILE_PATH_CONFIG = "gcp.credentials.file.path"; diff --git a/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkConnector.java b/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkConnector.java index d57988a2..9b8fcc8a 100644 --- a/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkConnector.java +++ b/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkConnector.java @@ -287,7 +287,13 @@ public ConfigDef config() { Type.STRING, ConnectorUtils.CPS_DEFAULT_ENDPOINT, Importance.LOW, - "The Pub/Sub endpoint to use."); + "The Pub/Sub endpoint to use.") + .define( + ConnectorUtils.CPS_USE_EMULATOR, + Type.BOOLEAN, + false, + Importance.LOW, + "When true, use the Pub/Sub emulator instead of the production service."); } @Override diff --git a/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java b/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java index ce5fe152..0606f484 100644 --- a/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java +++ b/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java @@ -69,6 +69,7 @@ public class CloudPubSubSinkTask extends SinkTask { private String cpsProject; private String cpsTopic; private String cpsEndpoint; + private boolean useEmulator; private String messageBodyName; private long maxBufferSize; private long maxBufferBytes; @@ -118,6 +119,7 @@ public void start(Map props) { cpsProject = validatedProps.get(ConnectorUtils.CPS_PROJECT_CONFIG).toString(); cpsTopic = validatedProps.get(ConnectorUtils.CPS_TOPIC_CONFIG).toString(); cpsEndpoint = validatedProps.get(ConnectorUtils.CPS_ENDPOINT).toString(); + useEmulator = (Boolean) validatedProps.get(ConnectorUtils.CPS_USE_EMULATOR); maxBufferSize = (Integer) validatedProps.get(CloudPubSubSinkConnector.MAX_BUFFER_SIZE_CONFIG); maxBufferBytes = (Long) validatedProps.get(CloudPubSubSinkConnector.MAX_BUFFER_BYTES_CONFIG); maxOutstandingRequestBytes = @@ -399,7 +401,6 @@ private void createPublisher() { com.google.cloud.pubsub.v1.Publisher.Builder builder = com.google.cloud.pubsub.v1.Publisher.newBuilder(fullTopic) - .setCredentialsProvider(gcpCredentialsProvider) .setBatchingSettings(batchingSettings.build()) .setRetrySettings( RetrySettings.newBuilder() @@ -413,8 +414,24 @@ private void createPublisher() { .setInitialRpcTimeout(Duration.ofSeconds(10)) .setRpcTimeoutMultiplier(2) .build()) - .setExecutorProvider(FixedExecutorProvider.create(getSystemExecutor())) - .setEndpoint(cpsEndpoint); + .setExecutorProvider(FixedExecutorProvider.create(getSystemExecutor())); + + // Configure endpoint, credentials and channel based on whether we're using emulator or production + if (useEmulator) { + // For emulator: use PUBSUB_EMULATOR_HOST env var, fallback to configured cps.endpoint, then default + String emulatorHost = System.getenv(ConnectorUtils.PUBSUB_EMULATOR_HOST); + String endpoint = emulatorHost != null ? emulatorHost : cpsEndpoint; + builder.setCredentialsProvider(com.google.api.gax.core.NoCredentialsProvider.create()) + .setChannelProvider( + com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.newBuilder() + .setEndpoint(endpoint) + .setChannelConfigurator(channel -> channel.usePlaintext()) + .build()); + } else { + // For production: use configured credentials and endpoint + builder.setCredentialsProvider(gcpCredentialsProvider) + .setEndpoint(cpsEndpoint); + } if (orderingKeySource != OrderingKeySource.NONE) { builder.setEnableMessageOrdering(true); } diff --git a/src/test/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTaskTest.java b/src/test/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTaskTest.java index d25918f3..19e9444c 100644 --- a/src/test/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTaskTest.java +++ b/src/test/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTaskTest.java @@ -16,6 +16,7 @@ package com.google.pubsub.kafka.sink; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.mock; @@ -38,6 +39,7 @@ import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; @@ -612,6 +614,42 @@ public void testPublisherShutdownOnStop() throws Exception { verify(publisher, times(1)).awaitTermination(maxShutdownTimeoutMs, TimeUnit.MILLISECONDS); } + /** Tests that the emulator configuration is properly defined and parsed. */ + @Test + public void testEmulatorConfiguration() { + CloudPubSubSinkConnector connector = new CloudPubSubSinkConnector(); + ConfigDef configDef = connector.config(); + + assertTrue("Emulator configuration should be defined", + configDef.names().contains(ConnectorUtils.CPS_USE_EMULATOR)); + + Map emulatorProps = new HashMap<>(); + emulatorProps.put(ConnectorUtils.CPS_TOPIC_CONFIG, CPS_TOPIC); + emulatorProps.put(ConnectorUtils.CPS_PROJECT_CONFIG, CPS_PROJECT); + emulatorProps.put(ConnectorUtils.CPS_USE_EMULATOR, "true"); + + Map parsedProps = configDef.parse(emulatorProps); + assertTrue("Emulator should be enabled", (Boolean) parsedProps.get(ConnectorUtils.CPS_USE_EMULATOR)); + } + + @Test + public void testCreatePublisherWithEmulatorEnabled() { + props.put(ConnectorUtils.CPS_USE_EMULATOR, "true"); + props.put(ConnectorUtils.CPS_ENDPOINT, "localhost:8085"); + CloudPubSubSinkTask task = new CloudPubSubSinkTask(publisher); + task.start(props); + assertEquals(CloudPubSubSinkTask.class, task.getClass()); + } + + @Test + public void testCreatePublisherWithEmulatorDisabled() { + props.put(ConnectorUtils.CPS_USE_EMULATOR, "false"); + props.put(ConnectorUtils.CPS_ENDPOINT, "pubsub.googleapis.com:443"); + CloudPubSubSinkTask task = new CloudPubSubSinkTask(publisher); + task.start(props); + assertEquals(CloudPubSubSinkTask.class, task.getClass()); + } + /** Get some sample SinkRecords's to use in the tests. */ private List getSampleRecords() { List records = new ArrayList<>();