diff --git a/build.gradle b/build.gradle index 2fb9743..9cde66f 100644 --- a/build.gradle +++ b/build.gradle @@ -66,6 +66,11 @@ configurations { details.useVersion "4.2.7.Final" details.because "Eliminating CVEs on earlier patch versions. io.netty is brought in by marklogic-data-hub. " } + + if (details.requested.group.equals("org.springframework") && details.requested.version.startsWith("6.2.")) { + details.useVersion "6.2.12" + details.because "Forcing latest Spring version in marklogic-data-hub to minimize CVEs." + } } } } diff --git a/docs/configuring-the-connector.md b/docs/configuring-the-connector.md index 83a4b07..7ea9dfc 100644 --- a/docs/configuring-the-connector.md +++ b/docs/configuring-the-connector.md @@ -47,8 +47,8 @@ Regardless of the required authentication strategy, you must configure the follo - `ml.connection.port` = the port of the MarkLogic app server you wish to connect to - `ml.connection.securityContextType` = the authentication strategy required by the MarkLogic app server; defaults to DIGEST -The choices for `ml.connection.securityContextType` are DIGEST, BASIC, CERTIFICATE, KERBEROS, and NONE. The additional -properties required for each are described in the following sections. +The choices for `ml.connection.securityContextType` are `DIGEST`, `BASIC`, `CERTIFICATE`, `KERBEROS`, `CLOUD`, and +`NONE`. The additional properties required for each are described in the following sections. ### Configuring digest and basic authentication @@ -57,20 +57,22 @@ Both digest and basic authentication require the following properties to be conf - `ml.connection.username` = the name of the MarkLogic user to authenticate as - `ml.connection.password` = the password of the MarkLogic user -### Configuring MarkLogic Cloud authentication +### Configuring Progress Data Cloud authentication Cloud authentication requires the following properties to be configured: -- `ml.connection.basePath` = the base path in your MarkLogic Cloud instance that points to the REST API server you +- `ml.connection.securityContextType=CLOUD` +- `ml.connection.basePath` = the base path in your Progress Data Cloud instance that points to the REST API server you wish to connect to -- `ml.connection.cloudApiKey` = the API key for authenticating with your MarkLogic Cloud instance +- `ml.connection.cloudApiKey` = the API key for authenticating with your Progress Data Cloud instance -You should also set `ml.connection.port` to 443 for connecting to MarkLogic Cloud. +You should also set `ml.connection.port` to 443 for connecting to Progress Data Cloud. ### Configuring certificate authentication Certificate authentication requires the following properties to be configured: +- `ml.connection.securityContextType=CERTIFICATE` - `ml.connection.certFile` = path to a PKCS12 certificate file - `ml.connection.certPassword` = password for the PKCS12 certificate file @@ -78,6 +80,7 @@ Certificate authentication requires the following properties to be configured: Kerberos authentication requires the following property to be configured: +- `ml.connection.securityContextType=KERBEROS` - `ml.connection.externalName` = the name of the principal to be used in Kerberos authentication ### Configuring no authentication diff --git a/src/main/java/com/marklogic/kafka/connect/MarkLogicConfig.java b/src/main/java/com/marklogic/kafka/connect/MarkLogicConfig.java index 2510aa1..ea3e46e 100644 --- a/src/main/java/com/marklogic/kafka/connect/MarkLogicConfig.java +++ b/src/main/java/com/marklogic/kafka/connect/MarkLogicConfig.java @@ -35,7 +35,7 @@ public class MarkLogicConfig extends AbstractConfig { public static final String SSL_HOST_VERIFIER = "ml.connection.customSsl.hostNameVerifier"; public static final String SSL_MUTUAL_AUTH = "ml.connection.customSsl.mutualAuth"; - private static final CustomRecommenderAndValidator CONNECTION_SECURITY_CONTEXT_TYPE_RV = new CustomRecommenderAndValidator("DIGEST", "BASIC", "CERTIFICATE", "KERBEROS", "NONE"); + private static final CustomRecommenderAndValidator CONNECTION_SECURITY_CONTEXT_TYPE_RV = new CustomRecommenderAndValidator("DIGEST", "BASIC", "CERTIFICATE", "KERBEROS", "CLOUD", "NONE"); private static final CustomRecommenderAndValidator CONNECTION_TYPE_RV = new CustomRecommenderAndValidator("DIRECT", "GATEWAY", ""); private static final CustomRecommenderAndValidator SSL_HOST_VERIFIER_RV = new CustomRecommenderAndValidator("ANY", "COMMON", "STRICT"); @@ -73,7 +73,7 @@ public static void addDefinitions(ConfigDef configDef) { "External name for 'KERBEROS' authentication", GROUP, -1, ConfigDef.Width.MEDIUM, "Kerberos External Name") .define(CONNECTION_CLOUD_API_KEY, Type.STRING, null, Importance.MEDIUM, - "API key for connecting to MarkLogic Cloud. Should set port to 443 when connecting to MarkLogic Cloud.", + "API key for connecting to Progress Data Cloud. Should set port to 443 when connecting to Progress Data Cloud.", GROUP, -1, ConfigDef.Width.MEDIUM, "Cloud API Key") .define(CONNECTION_TYPE, Type.STRING, "", CONNECTION_TYPE_RV, Importance.MEDIUM, "Set to 'GATEWAY' when the host identified by ml.connection.host is a load balancer. See https://docs.marklogic.com/guide/java/data-movement#id_26583 for more information.", diff --git a/src/test/java/com/marklogic/kafka/connect/BuildDatabaseClientConfigTest.java b/src/test/java/com/marklogic/kafka/connect/BuildDatabaseClientConfigTest.java index 77f3a7c..337f2f8 100644 --- a/src/test/java/com/marklogic/kafka/connect/BuildDatabaseClientConfigTest.java +++ b/src/test/java/com/marklogic/kafka/connect/BuildDatabaseClientConfigTest.java @@ -182,7 +182,7 @@ void testInvalidAuthentication() { securityContextConfig.put(MarkLogicSinkConfig.CONNECTION_SECURITY_CONTEXT_TYPE, "IncorrectValue"); ConfigException ex = assertThrows(ConfigException.class, () -> MarkLogicSinkConfig.CONFIG_DEF.parse(securityContextConfig), "Should throw ConfigException when an invalid authentication type is provided."); - assertEquals("Invalid value: IncorrectValue; must be one of: [DIGEST, BASIC, CERTIFICATE, KERBEROS, NONE]", ex.getMessage()); + assertEquals("Invalid value: IncorrectValue; must be one of: [DIGEST, BASIC, CERTIFICATE, KERBEROS, CLOUD, NONE]", ex.getMessage()); } @Test diff --git a/src/test/java/com/marklogic/kafka/connect/sink/MarkLogicSinkConnectorConfigBuilder.java b/src/test/java/com/marklogic/kafka/connect/sink/MarkLogicSinkConnectorConfigBuilder.java deleted file mode 100644 index 209d5d9..0000000 --- a/src/test/java/com/marklogic/kafka/connect/sink/MarkLogicSinkConnectorConfigBuilder.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright (c) 2019-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved. - */ -package com.marklogic.kafka.connect.sink; - -import org.apache.kafka.connect.runtime.ConnectorConfig; - -import java.util.Properties; -import java.util.UUID; - -public class MarkLogicSinkConnectorConfigBuilder { - - private String topic = String.format("topic-%s", UUID.randomUUID().toString()); - - private String key = String.format("key-%s", UUID.randomUUID().toString()); - - private final Properties connectorProps = new Properties(); - - public MarkLogicSinkConnectorConfigBuilder withTopic(final String topic) { - this.topic = topic; - return this; - } - - public MarkLogicSinkConnectorConfigBuilder withKey(final String key) { - this.key = key; - return this; - } - - public MarkLogicSinkConnectorConfigBuilder with(final String propertyName, final T value) { - connectorProps.put(propertyName, value); - return this; - } - - public MarkLogicSinkConnectorConfigBuilder withAll(final Properties connectorProps) { - this.connectorProps.putAll(connectorProps); - return this; - } - - private void ifNonExisting(final String propertyName, final T value) { - if (connectorProps.get(propertyName) != null) return; - connectorProps.put(propertyName, value); - } - - public Properties build() { - - ifNonExisting(ConnectorConfig.NAME_CONFIG, "marklogic-sink"); - ifNonExisting(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "MarkLogicSinkConnector"); - ifNonExisting(ConnectorConfig.TASKS_MAX_CONFIG, "1"); - ifNonExisting("topics", topic); - ifNonExisting("key", key); - - ifNonExisting(MarkLogicSinkConfig.CONNECTION_HOST, "localhost"); - ifNonExisting(MarkLogicSinkConfig.CONNECTION_PORT, "8000"); - ifNonExisting(MarkLogicSinkConfig.CONNECTION_SECURITY_CONTEXT_TYPE, "DIGEST"); - ifNonExisting(MarkLogicSinkConfig.CONNECTION_USERNAME, "admin"); - ifNonExisting(MarkLogicSinkConfig.CONNECTION_PASSWORD, "admin"); - ifNonExisting(MarkLogicSinkConfig.DOCUMENT_FORMAT, "JSON"); - ifNonExisting(MarkLogicSinkConfig.DOCUMENT_COLLECTIONS, "kafka-data"); - ifNonExisting(MarkLogicSinkConfig.DOCUMENT_PERMISSIONS, "rest-reader,read,rest-writer,update"); - ifNonExisting(MarkLogicSinkConfig.DOCUMENT_URI_PREFIX, "/kafka-data/"); - ifNonExisting(MarkLogicSinkConfig.DOCUMENT_URI_SUFFIX, ".json"); - ifNonExisting(MarkLogicSinkConfig.DMSDK_BATCH_SIZE, "100"); - ifNonExisting(MarkLogicSinkConfig.DMSDK_THREAD_COUNT, "8"); - ifNonExisting("key.converter", "org.apache.kafka.connect.storage.StringConverter"); - ifNonExisting("value.converter", "org.apache.kafka.connect.storage.StringConverter"); - ifNonExisting("errors.log.enable", true); - - final Properties copyOfConnectorProps = new Properties(); - copyOfConnectorProps.putAll(connectorProps); - - return copyOfConnectorProps; - } - - public static MarkLogicSinkConnectorConfigBuilder create() { - return new MarkLogicSinkConnectorConfigBuilder(); - } -} diff --git a/src/test/java/com/marklogic/kafka/connect/source/MarkLogicSourceConnectorConfigBuilder.java b/src/test/java/com/marklogic/kafka/connect/source/MarkLogicSourceConnectorConfigBuilder.java deleted file mode 100644 index 24118e2..0000000 --- a/src/test/java/com/marklogic/kafka/connect/source/MarkLogicSourceConnectorConfigBuilder.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright (c) 2019-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved. - */ -package com.marklogic.kafka.connect.source; - -import org.apache.kafka.connect.runtime.ConnectorConfig; - -import java.util.Properties; -import java.util.UUID; - -public class MarkLogicSourceConnectorConfigBuilder { - - private String topic = String.format("topic-%s", UUID.randomUUID()); - - private String dsl; - - private final Properties connectorProps = new Properties(); - - public MarkLogicSourceConnectorConfigBuilder withTopic(final String topic) { - this.topic = topic; - return this; - } - - public MarkLogicSourceConnectorConfigBuilder withDsl(final String dsl) { - this.dsl = dsl; - return this; - } - - public MarkLogicSourceConnectorConfigBuilder with(final String propertyName, final T value) { - connectorProps.put(propertyName, value); - return this; - } - - private void ifNonExisting(final String propertyName, final T value) { - if (connectorProps.get(propertyName) != null) return; - connectorProps.put(propertyName, value); - } - - public Properties build() { - - ifNonExisting(ConnectorConfig.NAME_CONFIG, "marklogic-source"); - ifNonExisting(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "MarkLogicSourceConnector"); - ifNonExisting(ConnectorConfig.TASKS_MAX_CONFIG, "1"); - ifNonExisting(MarkLogicSourceConfig.DSL_QUERY, dsl); - ifNonExisting(MarkLogicSourceConfig.TOPIC, topic); - - ifNonExisting(MarkLogicSourceConfig.CONNECTION_HOST, "localhost"); - ifNonExisting(MarkLogicSourceConfig.CONNECTION_PORT, "8000"); - ifNonExisting(MarkLogicSourceConfig.CONNECTION_SECURITY_CONTEXT_TYPE, "DIGEST"); - ifNonExisting(MarkLogicSourceConfig.CONNECTION_USERNAME, "admin"); - ifNonExisting(MarkLogicSourceConfig.CONNECTION_PASSWORD, "admin"); - ifNonExisting("key.converter", "org.apache.kafka.connect.storage.StringConverter"); - ifNonExisting("value.converter", "org.apache.kafka.connect.storage.StringConverter"); - ifNonExisting("errors.log.enable", true); - - final Properties copyOfConnectorProps = new Properties(); - copyOfConnectorProps.putAll(connectorProps); - - return copyOfConnectorProps; - } - - public static MarkLogicSourceConnectorConfigBuilder create() { - return new MarkLogicSourceConnectorConfigBuilder(); - } -} diff --git a/src/test/java/com/marklogic/kafka/connect/source/ReadRowsViaOpticDslTest.java b/src/test/java/com/marklogic/kafka/connect/source/ReadRowsViaOpticDslTest.java index 1f8a64c..02bdd22 100644 --- a/src/test/java/com/marklogic/kafka/connect/source/ReadRowsViaOpticDslTest.java +++ b/src/test/java/com/marklogic/kafka/connect/source/ReadRowsViaOpticDslTest.java @@ -3,6 +3,7 @@ */ package com.marklogic.kafka.connect.source; +import com.marklogic.client.ProgressDataCloudException; import org.apache.kafka.connect.source.SourceRecord; import org.junit.jupiter.api.Test; @@ -11,9 +12,7 @@ import java.util.List; import java.util.Map; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.*; class ReadRowsViaOpticDslTest extends AbstractIntegrationSourceTest { @@ -46,6 +45,23 @@ void readFifteenAuthorsAsJson() throws InterruptedException { verifyRecordKeysAreSetToIDColumn(records); } + @Test + void cloudAuth() { + ProgressDataCloudException ex = assertThrows(ProgressDataCloudException.class, () -> startSourceTask( + MarkLogicSourceConfig.CONNECTION_SECURITY_CONTEXT_TYPE, "cloud", + MarkLogicSourceConfig.CONNECTION_CLOUD_API_KEY, "abc123", + MarkLogicSourceConfig.DSL_QUERY, AUTHORS_ORDERED_BY_ID_OPTIC_DSL, + MarkLogicSourceConfig.TOPIC, AUTHORS_TOPIC, + MarkLogicSourceConfig.KEY_COLUMN, "Medical.Authors.ID" + )); + + String message = ex.getMessage(); + assertTrue(message.contains("Unable to call token endpoint"), + "We expect this test to fail because it can't talk to PDC, and that's fine. What this verifies " + + "is that the user can configure the connector to talk to PDC - i.e. 'cloud' is accepted as a " + + "security context type. Actual error: " + message); + } + @Test void includeColumnTypes() throws InterruptedException { loadFifteenAuthorsIntoMarkLogic();