diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 4eb5fe0..37db1ce 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -7,7 +7,7 @@ avro = "1.12.0" avroPlugin = "1.9.1" dependencyManagement = "1.1.7" kotlin = "2.2.10" -mockitoKotlin = "6.0.0" +mockk = "1.14.5" msal4j = "1.23.0" sonarqube = "6.3.1.5724" spotless = "7.2.1" @@ -22,7 +22,7 @@ junitJupiterApi = { group = "org.junit.jupiter", name = "junit-jupiter-api" } junitJupiterEngine = { group = "org.junit.jupiter", name = "junit-jupiter-engine" } junitPlatformLauncher = { group = "org.junit.platform", name = "junit-platform-launcher" } kafkaClients = { group = "org.apache.kafka", name = "kafka-clients" } -mockitoKotlin = { group = "org.mockito.kotlin", name = "mockito-kotlin", version.ref = "mockitoKotlin" } +mockk = { group = "io.mockk", name = "mockk", version.ref = "mockk" } msal = { group = "com.microsoft.azure", name = "msal4j", version.ref = "msal4j" } slf4jApi = { group = "org.slf4j", name = "slf4j-api" } kotlinLoggingJvm = { group = "io.github.oshai", name = "kotlin-logging-jvm", version.ref = "kotlinLogging" } @@ -33,6 +33,7 @@ springBootStarterTest = { group = "org.springframework.boot", name = "spring-boo springBootTest = { group = "org.springframework.boot", name = "spring-boot-test" } springContext = { group = "org.springframework", name = "spring-context" } springKafka = { group = "org.springframework.kafka", name = "spring-kafka" } +springKafkaTest = { group = "org.springframework.kafka", name = "spring-kafka-test" } springTest = { group = "org.springframework", name = "spring-test" } [plugins] diff --git a/kafka-message-signing/build.gradle.kts b/kafka-message-signing/build.gradle.kts index bd68bcb..13f51c1 100644 --- a/kafka-message-signing/build.gradle.kts +++ b/kafka-message-signing/build.gradle.kts @@ -17,7 +17,7 @@ dependencies { testImplementation(libs.springTest) testImplementation(libs.springBootTest) testImplementation(libs.springBootStarter) - testImplementation(libs.mockitoKotlin) + testImplementation(libs.mockk) testRuntimeOnly(libs.junitPlatformLauncher) } @@ -25,3 +25,21 @@ dependencies { tasks.test { useJUnitPlatform() } + +testing { + suites { + val integrationTest by registering(JvmTestSuite::class) { + useJUnitJupiter() + dependencies { + implementation(project()) + implementation(libs.springBootStarterTest) + implementation(libs.springKafka) + implementation(libs.springKafkaTest) + implementation(libs.kafkaClients) + implementation(libs.assertJ) + implementation(libs.avro) + implementation(project(":kafka-avro")) + } + } + } +} diff --git a/kafka-message-signing/src/integrationTest/kotlin/com/gxf/utilities/kafka/message/IntegrationTestHelper.kt b/kafka-message-signing/src/integrationTest/kotlin/com/gxf/utilities/kafka/message/IntegrationTestHelper.kt new file mode 100644 index 0000000..c5c5570 --- /dev/null +++ b/kafka-message-signing/src/integrationTest/kotlin/com/gxf/utilities/kafka/message/IntegrationTestHelper.kt @@ -0,0 +1,124 @@ +// SPDX-FileCopyrightText: Copyright Contributors to the GXF project +// +// SPDX-License-Identifier: Apache-2.0 +package com.gxf.utilities.kafka.message + +import com.gxf.utilities.kafka.avro.AvroDeserializer +import com.gxf.utilities.kafka.avro.AvroSerializer +import com.gxf.utilities.kafka.message.signing.MessageSigner +import com.gxf.utilities.kafka.message.signing.interceptors.MessageSigningAvroProducerInterceptor +import com.gxf.utilities.kafka.message.signing.interceptors.MessageSigningByteArrayProducerInterceptor +import java.util.UUID +import org.apache.avro.Schema +import org.apache.avro.specific.SpecificRecordBase +import org.apache.kafka.clients.consumer.Consumer +import org.apache.kafka.clients.producer.Producer +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.apache.kafka.common.serialization.ByteArraySerializer +import org.apache.kafka.common.serialization.StringDeserializer +import org.apache.kafka.common.serialization.StringSerializer +import org.springframework.kafka.core.DefaultKafkaConsumerFactory +import org.springframework.kafka.core.DefaultKafkaProducerFactory +import org.springframework.kafka.test.EmbeddedKafkaBroker +import org.springframework.kafka.test.utils.KafkaTestUtils + +object IntegrationTestHelper { + fun createByteArrayKafkaConsumer( + embeddedKafkaBroker: EmbeddedKafkaBroker, + topic: String, + ): Consumer { + val consumerFactory = + DefaultKafkaConsumerFactory( + KafkaTestUtils.consumerProps(UUID.randomUUID().toString(), "true", embeddedKafkaBroker), + StringDeserializer(), + ByteArrayDeserializer(), + ) + val consumer = consumerFactory.createConsumer() + embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, topic) + return consumer + } + + fun createByteArrayKafkaProducer( + embeddedKafkaBroker: EmbeddedKafkaBroker, + messageSigner: MessageSigner, + ): Producer { + val producerProps: Map = + HashMap(byteArrayProducerProps(embeddedKafkaBroker.brokersAsString, messageSigner)) + val producerFactory = DefaultKafkaProducerFactory(producerProps, StringSerializer(), ByteArraySerializer()) + return producerFactory.createProducer() + } + + private fun byteArrayProducerProps(brokers: String, messageSigner: MessageSigner): Map { + return mapOf( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to brokers, + ProducerConfig.BATCH_SIZE_CONFIG to "16384", + ProducerConfig.LINGER_MS_CONFIG to 1, + ProducerConfig.BUFFER_MEMORY_CONFIG to "33554432", + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java, + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to ByteArraySerializer::class.java, + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG to MessageSigningByteArrayProducerInterceptor::class.java.name, + "message.signer" to messageSigner, + ) + } + + fun createAvroKafkaConsumer( + embeddedKafkaBroker: EmbeddedKafkaBroker, + topic: String, + ): Consumer { + val consumerFactory = + DefaultKafkaConsumerFactory( + KafkaTestUtils.consumerProps(UUID.randomUUID().toString(), "true", embeddedKafkaBroker), + StringDeserializer(), + AvroDeserializer(listOf(Message.getClassSchema())), + ) + val consumer = consumerFactory.createConsumer() + embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, topic) + return consumer + } + + fun createAvroKafkaProducer( + embeddedKafkaBroker: EmbeddedKafkaBroker, + messageSigner: MessageSigner, + ): Producer { + val producerProps: Map = + HashMap(avroProducerProps(embeddedKafkaBroker.brokersAsString, messageSigner)) + val producerFactory = DefaultKafkaProducerFactory(producerProps, StringSerializer(), AvroSerializer()) + return producerFactory.createProducer() + } + + private fun avroProducerProps(brokers: String, messageSigner: MessageSigner): Map { + return mapOf( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to brokers, + ProducerConfig.BATCH_SIZE_CONFIG to "16384", + ProducerConfig.LINGER_MS_CONFIG to 1, + ProducerConfig.BUFFER_MEMORY_CONFIG to "33554432", + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java, + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to AvroSerializer::class.java, + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG to MessageSigningAvroProducerInterceptor::class.java.name, + "message.signer" to messageSigner, + ) + } +} + +class Message(private var message: String?) : SpecificRecordBase() { + constructor() : this(null) {} + + companion object { + fun getClassSchema(): Schema = + Schema.Parser() + .parse( + """{"type":"record","name":"Message","namespace":"com.gxf.utilities.kafka.message","fields":[{"name":"message","type":{"type":"string","avro.java.string":"String"}}]}""" + ) + } + + override fun getSchema() = getClassSchema() + + override fun get(field: Int): Any { + return message!! + } + + override fun put(field: Int, value: Any) { + message = value.toString() + } +} diff --git a/kafka-message-signing/src/integrationTest/kotlin/com/gxf/utilities/kafka/message/MessageSigningInterceptorIT.kt b/kafka-message-signing/src/integrationTest/kotlin/com/gxf/utilities/kafka/message/MessageSigningInterceptorIT.kt new file mode 100644 index 0000000..95c9aaa --- /dev/null +++ b/kafka-message-signing/src/integrationTest/kotlin/com/gxf/utilities/kafka/message/MessageSigningInterceptorIT.kt @@ -0,0 +1,71 @@ +// SPDX-FileCopyrightText: Copyright Contributors to the GXF project +// +// SPDX-License-Identifier: Apache-2.0 +package com.gxf.utilities.kafka.message + +import com.gxf.utilities.kafka.message.IntegrationTestHelper.createAvroKafkaConsumer +import com.gxf.utilities.kafka.message.IntegrationTestHelper.createAvroKafkaProducer +import com.gxf.utilities.kafka.message.IntegrationTestHelper.createByteArrayKafkaConsumer +import com.gxf.utilities.kafka.message.IntegrationTestHelper.createByteArrayKafkaProducer +import com.gxf.utilities.kafka.message.signing.MessageSigner +import com.gxf.utilities.kafka.message.signing.MessageSigningAutoConfiguration +import java.time.Duration +import org.apache.avro.specific.SpecificRecordBase +import org.apache.kafka.clients.producer.ProducerRecord +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Test +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.kafka.test.EmbeddedKafkaBroker +import org.springframework.kafka.test.context.EmbeddedKafka +import org.springframework.test.annotation.DirtiesContext + +@SpringBootTest(classes = [MessageSigningAutoConfiguration::class]) +@EmbeddedKafka(topics = ["test-topic"]) +@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD) +class MessageSigningInterceptorIT { + @Autowired private lateinit var embeddedKafkaBroker: EmbeddedKafkaBroker + @Autowired private lateinit var messageSigner: MessageSigner + + val topic = "test-topic" + + @Test + fun `can sign ByteArray message with interceptor`() { + val producer = createByteArrayKafkaProducer(embeddedKafkaBroker, messageSigner) + val consumer = createByteArrayKafkaConsumer(embeddedKafkaBroker, topic) + + val unsignedRecord = ProducerRecord(topic, "key", "value".toByteArray()) + producer.send(unsignedRecord) + + val records = consumer.poll(Duration.ofSeconds(5)).records(topic) + + assertThat(records).hasSize(1) + + val receivedRecord = records.first() + val signatureHeader = receivedRecord.headers().lastHeader(MessageSigner.RECORD_HEADER_KEY_SIGNATURE) + + assertThat(signatureHeader).isNotNull + assertThat(signatureHeader.value()).isNotEmpty() + assertThat(messageSigner.verifyByteArrayRecordUsingHeader(receivedRecord)).isTrue() + } + + @Test + fun `can sign Avro message with interceptor`() { + val producer = createAvroKafkaProducer(embeddedKafkaBroker, messageSigner) + val consumer = createAvroKafkaConsumer(embeddedKafkaBroker, topic) + + val unsignedRecord = ProducerRecord(topic, "key", Message("value")) + producer.send(unsignedRecord) + + val polled = consumer.poll(Duration.ofSeconds(5)) + val records = polled.records(topic) + + assertThat(records).hasSize(1) + + val receivedRecord = records.first() + val signatureHeader = receivedRecord.headers().lastHeader(MessageSigner.RECORD_HEADER_KEY_SIGNATURE) + + assertThat(signatureHeader).isNotNull + assertThat(signatureHeader.value()).isNotEmpty() + } +} diff --git a/kafka-message-signing/src/integrationTest/resources/application.yaml b/kafka-message-signing/src/integrationTest/resources/application.yaml new file mode 100644 index 0000000..3d39100 --- /dev/null +++ b/kafka-message-signing/src/integrationTest/resources/application.yaml @@ -0,0 +1,5 @@ +message-signing: + signing-enabled: true + strip-avro-header: true + private-key-file: classpath:rsa-private.pem + public-key-file: classpath:rsa-public.pem diff --git a/kafka-message-signing/src/integrationTest/resources/rsa-private.pem b/kafka-message-signing/src/integrationTest/resources/rsa-private.pem new file mode 100644 index 0000000..2054aec --- /dev/null +++ b/kafka-message-signing/src/integrationTest/resources/rsa-private.pem @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEuwIBADANBgkqhkiG9w0BAQEFAASCBKUwggShAgEAAoIBAQC4gFY4wg/M+lFb +J22C9WdT/QREetu8bNp6/OL7h1/3MOkNHgX3/QmC524yvfiYrT4eYoTdB77+xzqV +EOs3KyaYpLneIPFRVg5/H7mWVcaEli/7WWvgjJV2MDMMkd6e6tcBtTibF9Eo7GHu +aIO5qlpnDoONNE0ON3iuuhCBGMsAnH0nqNlfl6oAwJvQT7PJEarVK6a3EuLD3qNP +4j/XkxLkl2LST9cKG1HMmxBjhHVijHbi7SS/NxCMrJj5rrwOiirHk7GaygCE/gk+ +jqMS5TedIzboUnEzcLNJzRut4iyvNtkGk/EuaBmeT2fhiYXTMhMdUe/LWxSIPrbn ++Qfg9peLAgMBAAECgf911YE7vrVYtIZBMW0/tnvHqFCRa+Xq8ZqX8esFufAQb6xo +NESbnX/1mtlGjw22dO63eTRh9hCFp7hCfAu9sFF2K76Jn4B/8fZQOnLNEPs4srLy +VKoRP7g1Q5NW89K6rEGlVZhRdWSgPlhNa3K1oZcbOqnKBR0xdzwcW7kuuibsu++K +0Lo9GNCqzgX34xT9CvmlEs2VsZGV/dYufu+pFFkyp2iahnVbJWGOfcbrk47OwO3I +LjDL6sXmad6OWWhwW7aPcIujEeeBGprZv3T07ikOpTFFrZMbkM/AhP4WPE5gtqJ7 +UsD9R2QEXbS2k4Epow3xh+KxkxGbsySzjzor+5ECgYEA3Y2kIZHJjCnvDAPLTktJ +lWkaROiBmzvk8ExPZ3X8q07IDZAqLQ/z3vvT4tluGezilfOsVs4tX6qX9Yx5omJu +4exQ3bB3ra2l/RNKftZrGqsBrr3+tx0HTf1+Zuavt6LFmNKWcS7IXmX6ntF3NwZT +TZwTCinLHv/wIEsQJSR7dPsCgYEA1S/v8N6FOZvqUL3IbYPZXScZStsZQmVvQHFw +Nt1B1xocq8ETj4ua8VLvXW7qjPZZp7qhJBgDUaije4wuHFpFUmDkt5yi5R9bAhAY +miFAx6HMlytQDWhk0y+bC4K62EvTFzAkRlvzr8vkfDeOcafahGZ+k9xXLmVwpBOW +y6u2QrECgYB3vYGjkwN2+YL032gV3KLcWX2VGIRTvb8yEEwqAp6Yh2+fxPbGfAS1 +0yzlQdY7tMeRe6z9DVmAhtayI0Xp+YEsIWhjKGjGOT+o07BDdOdV9m5mXtE3bjzw +bbzPKIZ3nUVmHwqoCTzJqBwXkeX4mzaSj3PK3mOlUXYaPfdv25PN2QKBgFxNSA66 +2WXK+tWAhgHcn0T0w7+kQzh7IIL/Wn12qKYQOS+oBecVo70uklKazlS/6Kt1Y4V0 +HCPD5xx74g8GipMTPpO87s5TGB05iN1a3mhQxnsBFsTnWRgSuYdvT6SMl9WnU11f +PI/1sHSTvUm6SiMfGVi9gsWkx/NSQ+zk6KHRAoGBALI0HbHBfv8flVf18wJpeFpP +X2aEf1IVD+RNnpqaQK64JBu8SoUjt/3qyluptKS/ilAGVPU1KUQqKMHQiGtnKl/m +ZfjuNlnsjh9C5X7Fl/ja9T1SxAyIWPUWT8QIpbOxPmMmyFfK/ZRJ1DDHUA9dohVA +cwNSRNC5PwoEgT8+tfO1 +-----END PRIVATE KEY----- diff --git a/kafka-message-signing/src/integrationTest/resources/rsa-public.pem b/kafka-message-signing/src/integrationTest/resources/rsa-public.pem new file mode 100644 index 0000000..6df90ed --- /dev/null +++ b/kafka-message-signing/src/integrationTest/resources/rsa-public.pem @@ -0,0 +1,9 @@ +-----BEGIN PUBLIC KEY----- +MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAuIBWOMIPzPpRWydtgvVn +U/0ERHrbvGzaevzi+4df9zDpDR4F9/0JguduMr34mK0+HmKE3Qe+/sc6lRDrNysm +mKS53iDxUVYOfx+5llXGhJYv+1lr4IyVdjAzDJHenurXAbU4mxfRKOxh7miDuapa +Zw6DjTRNDjd4rroQgRjLAJx9J6jZX5eqAMCb0E+zyRGq1SumtxLiw96jT+I/15MS +5Jdi0k/XChtRzJsQY4R1Yox24u0kvzcQjKyY+a68Dooqx5OxmsoAhP4JPo6jEuU3 +nSM26FJxM3CzSc0breIsrzbZBpPxLmgZnk9n4YmF0zITHVHvy1sUiD625/kH4PaX +iwIDAQAB +-----END PUBLIC KEY----- diff --git a/kafka-message-signing/src/main/kotlin/com/gxf/utilities/kafka/message/signing/MessageSigner.kt b/kafka-message-signing/src/main/kotlin/com/gxf/utilities/kafka/message/signing/MessageSigner.kt index 2db887c..c765a44 100644 --- a/kafka-message-signing/src/main/kotlin/com/gxf/utilities/kafka/message/signing/MessageSigner.kt +++ b/kafka-message-signing/src/main/kotlin/com/gxf/utilities/kafka/message/signing/MessageSigner.kt @@ -10,9 +10,15 @@ import java.io.IOException import java.io.UncheckedIOException import java.nio.ByteBuffer import java.nio.charset.StandardCharsets -import java.security.* +import java.security.GeneralSecurityException +import java.security.InvalidKeyException +import java.security.KeyFactory +import java.security.PrivateKey +import java.security.PublicKey +import java.security.Signature +import java.security.SignatureException import java.security.spec.X509EncodedKeySpec -import java.util.* +import java.util.Base64 import java.util.regex.Pattern import org.apache.avro.specific.SpecificRecordBase import org.apache.kafka.clients.consumer.ConsumerRecord @@ -28,7 +34,6 @@ import org.springframework.stereotype.Component // Only instantiate when no other bean has been configured @ConditionalOnMissingBean(MessageSigner::class) class MessageSigner(properties: MessageSigningProperties) { - val signingEnabled: Boolean = properties.signingEnabled private val stripAvroHeader: Boolean = properties.stripAvroHeader @@ -90,6 +95,16 @@ class MessageSigner(properties: MessageSigningProperties) { return producerRecord } + fun signByteArrayRecordUsingHeader( + producerRecord: ProducerRecord + ): ProducerRecord { + if (signingEnabled) { + val signature = signatureByteArray(producerRecord) + producerRecord.headers().add(RECORD_HEADER_KEY_SIGNATURE, signature.array()) + } + return producerRecord + } + /** * Determines the signature for the given `message`. * @@ -104,16 +119,14 @@ class MessageSigner(properties: MessageSigningProperties) { * @throws UncheckedSecurityException if the signing process throws a SignatureException. */ private fun signature(message: FlexibleSignableMessageWrapper<*>): ByteBuffer { - check(canSignMessages()) { - "This MessageSigner is not configured for signing, it can only be used for verification" - } + check(canSignMessages()) { KEY_NOT_FOR_SIGNING } val oldSignature = message.getSignature() message.clearSignature() val byteBuffer = toByteBuffer(message) try { return signature(byteBuffer) } catch (e: SignatureException) { - throw UncheckedSecurityException("Unable to sign message", e) + throw UncheckedSecurityException(UNABLE_TO_SIGN_MESSAGE, e) } finally { oldSignature?.let { message.setSignature(it) } } @@ -133,15 +146,23 @@ class MessageSigner(properties: MessageSigningProperties) { * @throws UncheckedSecurityException if the signing process throws a SignatureException. */ private fun signature(producerRecord: ProducerRecord): ByteBuffer { - check(canSignMessages()) { - "This MessageSigner is not configured for signing, it can only be used for verification" - } + check(canSignMessages()) { KEY_NOT_FOR_SIGNING } val specificRecordBase = producerRecord.value() val byteBuffer = toByteBuffer(specificRecordBase) try { return signature(byteBuffer) } catch (e: SignatureException) { - throw UncheckedSecurityException("Unable to sign message", e) + throw UncheckedSecurityException(UNABLE_TO_SIGN_MESSAGE, e) + } + } + + private fun signatureByteArray(producerRecord: ProducerRecord): ByteBuffer { + check(canSignMessages()) { KEY_NOT_FOR_SIGNING } + val byteBuffer = ByteBuffer.wrap(producerRecord.value()) + try { + return signature(byteBuffer) + } catch (e: SignatureException) { + throw UncheckedSecurityException(UNABLE_TO_SIGN_MESSAGE, e) } } @@ -169,7 +190,7 @@ class MessageSigner(properties: MessageSigningProperties) { */ fun verifyUsingField(message: FlexibleSignableMessageWrapper): Boolean { if (!canVerifyMessageSignatures()) { - logger.error("This MessageSigner is not configured for verification, it can only be used for signing") + logger.error(KEY_NOT_FOR_VERIFICATION) return false } @@ -184,7 +205,7 @@ class MessageSigner(properties: MessageSigningProperties) { message.clearSignature() return verifySignatureBytes(messageSignature, toByteBuffer(message)) } catch (e: Exception) { - logger.error("Unable to verify message signature", e) + logger.error(UNABLE_TO_VERIFY_SIGNATURE, e) return false } finally { message.setSignature(messageSignature) @@ -199,31 +220,43 @@ class MessageSigner(properties: MessageSigningProperties) { */ fun verifyUsingHeader(consumerRecord: ConsumerRecord): Boolean { if (!canVerifyMessageSignatures()) { - logger.error("This MessageSigner is not configured for verification, it can only be used for signing") - return false - } - - val header = consumerRecord.headers().lastHeader(RECORD_HEADER_KEY_SIGNATURE) - if (header == null) { - logger.error("This ProducerRecord does not contain a signature header") + logger.error(KEY_NOT_FOR_VERIFICATION) return false } - - val signatureBytes = header.value() - if (signatureBytes == null || signatureBytes.isEmpty()) { - logger.error("Signature header is empty") + try { + val signatureBytes = getSignatureBytes(consumerRecord) + val specificRecordBase: SpecificRecordBase = consumerRecord.value() + return verifySignatureBytes(ByteBuffer.wrap(signatureBytes), toByteBuffer(specificRecordBase)) + } catch (e: Exception) { + logger.error(UNABLE_TO_VERIFY_SIGNATURE, e) return false } + } + fun verifyByteArrayRecordUsingHeader(consumerRecord: ConsumerRecord): Boolean { try { - val specificRecordBase: SpecificRecordBase = consumerRecord.value() - return verifySignatureBytes(ByteBuffer.wrap(signatureBytes), toByteBuffer(specificRecordBase)) + if (!canVerifyMessageSignatures()) { + logger.error(KEY_NOT_FOR_VERIFICATION) + return false + } + val signatureBytes = getSignatureBytes(consumerRecord) + return verifySignatureBytes(ByteBuffer.wrap(signatureBytes), ByteBuffer.wrap(consumerRecord.value())) } catch (e: Exception) { - logger.error("Unable to verify message signature", e) + logger.error(UNABLE_TO_VERIFY_SIGNATURE, e) return false } } + private fun getSignatureBytes(consumerRecord: ConsumerRecord): ByteArray { + val header = + consumerRecord.headers().lastHeader(RECORD_HEADER_KEY_SIGNATURE) + ?: throw IllegalArgumentException("This ConsumerRecord does not contain a signature header") + + val signatureBytes = header.value() + require(!(signatureBytes == null || signatureBytes.isEmpty())) { "Signature header is empty" } + return signatureBytes + } + @Throws(SignatureException::class) private fun verifySignatureBytes(signatureBytes: ByteBuffer, messageByteBuffer: ByteBuffer): Boolean { val messageBytes: ByteBuffer = @@ -245,7 +278,7 @@ class MessageSigner(properties: MessageSigningProperties) { private fun stripAvroHeader(bytes: ByteBuffer): ByteBuffer { if (hasAvroHeader(bytes)) { - return ByteBuffer.wrap(Arrays.copyOfRange(bytes.array(), AVRO_HEADER_LENGTH, bytes.array().size)) + return ByteBuffer.wrap(bytes.array().copyOfRange(AVRO_HEADER_LENGTH, bytes.array().size)) } return bytes } @@ -289,6 +322,13 @@ class MessageSigner(properties: MessageSigningProperties) { private val PEM_REMOVAL_PATTERN: Pattern = Pattern.compile("-----(?:BEGIN|END) .*?-----|\\r|\\n") + const val KEY_NOT_FOR_SIGNING = + "This MessageSigner is not configured for signing, it can only be used for verification" + const val KEY_NOT_FOR_VERIFICATION = + "This MessageSigner is not configured for verification, it can only be used for signing" + const val UNABLE_TO_SIGN_MESSAGE = "Unable to sign message" + const val UNABLE_TO_VERIFY_SIGNATURE = "Unable to verify message signature" + val logger: Logger = LoggerFactory.getLogger(this::class.java) @JvmStatic diff --git a/kafka-message-signing/src/main/kotlin/com/gxf/utilities/kafka/message/signing/interceptors/MessageSigningAvroProducerInterceptor.kt b/kafka-message-signing/src/main/kotlin/com/gxf/utilities/kafka/message/signing/interceptors/MessageSigningAvroProducerInterceptor.kt new file mode 100644 index 0000000..118d4fd --- /dev/null +++ b/kafka-message-signing/src/main/kotlin/com/gxf/utilities/kafka/message/signing/interceptors/MessageSigningAvroProducerInterceptor.kt @@ -0,0 +1,30 @@ +// SPDX-FileCopyrightText: Copyright Contributors to the GXF project +// +// SPDX-License-Identifier: Apache-2.0 +package com.gxf.utilities.kafka.message.signing.interceptors + +import com.gxf.utilities.kafka.message.signing.MessageSigner +import org.apache.avro.specific.SpecificRecordBase +import org.apache.kafka.clients.producer.ProducerInterceptor +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.clients.producer.RecordMetadata + +class MessageSigningAvroProducerInterceptor() : ProducerInterceptor { + private lateinit var messageSigner: MessageSigner + + override fun onSend( + producerRecord: ProducerRecord + ): ProducerRecord = messageSigner.signUsingHeader(producerRecord) + + override fun onAcknowledgement(metadata: RecordMetadata?, exception: Exception?) { + // not used + } + + override fun close() { + // not used + } + + override fun configure(configs: MutableMap?) { + messageSigner = (configs?.get("message.signer") ?: throw Exception()) as MessageSigner + } +} diff --git a/kafka-message-signing/src/main/kotlin/com/gxf/utilities/kafka/message/signing/interceptors/MessageSigningByteArrayProducerInterceptor.kt b/kafka-message-signing/src/main/kotlin/com/gxf/utilities/kafka/message/signing/interceptors/MessageSigningByteArrayProducerInterceptor.kt new file mode 100644 index 0000000..6f0f303 --- /dev/null +++ b/kafka-message-signing/src/main/kotlin/com/gxf/utilities/kafka/message/signing/interceptors/MessageSigningByteArrayProducerInterceptor.kt @@ -0,0 +1,28 @@ +// SPDX-FileCopyrightText: Copyright Contributors to the GXF project +// +// SPDX-License-Identifier: Apache-2.0 +package com.gxf.utilities.kafka.message.signing.interceptors + +import com.gxf.utilities.kafka.message.signing.MessageSigner +import org.apache.kafka.clients.producer.ProducerInterceptor +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.clients.producer.RecordMetadata + +class MessageSigningByteArrayProducerInterceptor() : ProducerInterceptor { + private lateinit var messageSigner: MessageSigner + + override fun onSend(producerRecord: ProducerRecord): ProducerRecord = + messageSigner.signByteArrayRecordUsingHeader(producerRecord) + + override fun onAcknowledgement(metadata: RecordMetadata?, exception: Exception?) { + // not used + } + + override fun close() { + // not used + } + + override fun configure(configs: MutableMap?) { + messageSigner = (configs?.get("message.signer") ?: throw Exception()) as MessageSigner + } +} diff --git a/kafka-message-signing/src/test/kotlin/com/gxf/utilities/kafka/message/signing/MessageSignerTestByteArrayUsingHeader.kt b/kafka-message-signing/src/test/kotlin/com/gxf/utilities/kafka/message/signing/MessageSignerTestByteArrayUsingHeader.kt new file mode 100644 index 0000000..ccf05f4 --- /dev/null +++ b/kafka-message-signing/src/test/kotlin/com/gxf/utilities/kafka/message/signing/MessageSignerTestByteArrayUsingHeader.kt @@ -0,0 +1,85 @@ +// SPDX-FileCopyrightText: Copyright Contributors to the GXF project +// +// SPDX-License-Identifier: Apache-2.0 +package com.gxf.utilities.kafka.message.signing + +import com.gxf.utilities.kafka.message.signing.TestHelper.producerRecordToConsumerRecord +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.clients.producer.ProducerRecord +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Test + +class MessageSignerTestByteArrayUsingHeader { + private val messageSignerProperties = TestConstants.messageSignerProperties.apply { stripAvroHeader = false } + private val messageSigner = MessageSigner(messageSignerProperties) + + @Test + fun signsRecordHeaderWithoutSignature() { + val record: ProducerRecord = producerRecordByteArray() + + // Assert that the returned var is of exactly the same type as the input + val sameTypeResult: ProducerRecord = messageSigner.signByteArrayRecordUsingHeader(record) + + assertThat(record.headers().lastHeader(MessageSigner.RECORD_HEADER_KEY_SIGNATURE)).isNotNull() + } + + @Test + fun signsRecordHeaderReplacingSignature() { + val randomSignature = TestConstants.randomSignature() + val record = producerRecordByteArray() + record.headers().add(MessageSigner.RECORD_HEADER_KEY_SIGNATURE, randomSignature.array()) + + val actualSignatureBefore = record.headers().lastHeader(MessageSigner.RECORD_HEADER_KEY_SIGNATURE).value() + assertThat(actualSignatureBefore).isNotNull().isEqualTo(randomSignature.array()) + + messageSigner.signByteArrayRecordUsingHeader(record) + + val actualSignatureAfter = record.headers().lastHeader(MessageSigner.RECORD_HEADER_KEY_SIGNATURE).value() + assertThat(actualSignatureAfter).isNotNull().isNotEqualTo(randomSignature.array()) + } + + @Test + fun verifiesRecordsWithValidSignature() { + val signedRecord = properlySignedByteArrayRecord() + + val result = messageSigner.verifyByteArrayRecordUsingHeader(signedRecord) + + assertThat(result).isTrue() + } + + @Test + fun doesNotVerifyRecordsWithoutSignature() { + val consumerRecord = consumerRecordByteArray() + + val validSignature = messageSigner.verifyByteArrayRecordUsingHeader(consumerRecord) + + assertThat(validSignature).isFalse() + } + + @Test + fun doesNotVerifyRecordsWithInvalidSignature() { + val consumerRecord = consumerRecordByteArray() + val randomSignature = TestConstants.randomSignature() + consumerRecord.headers().add(MessageSigner.RECORD_HEADER_KEY_SIGNATURE, randomSignature.array()) + + val validSignature = messageSigner.verifyByteArrayRecordUsingHeader(consumerRecord) + + assertThat(validSignature).isFalse() + } + + private fun producerRecordByteArray(): ProducerRecord { + val value = "Test message".toByteArray() + return ProducerRecord("test-topic", "key1", value) + } + + private fun properlySignedByteArrayRecord(): ConsumerRecord { + val producerRecord = producerRecordByteArray() + messageSigner.signByteArrayRecordUsingHeader(producerRecord) + return producerRecordToConsumerRecord(producerRecord) + } + + private fun consumerRecordByteArray(): ConsumerRecord { + val value = "Test message".toByteArray() + return ConsumerRecord("test-topic", 0, 0L, "key1", value) + } +} diff --git a/kafka-message-signing/src/test/kotlin/com/gxf/utilities/kafka/message/signing/MessageSignerTestUsingHeader.kt b/kafka-message-signing/src/test/kotlin/com/gxf/utilities/kafka/message/signing/MessageSignerTestUsingHeader.kt index fc75163..d47379b 100644 --- a/kafka-message-signing/src/test/kotlin/com/gxf/utilities/kafka/message/signing/MessageSignerTestUsingHeader.kt +++ b/kafka-message-signing/src/test/kotlin/com/gxf/utilities/kafka/message/signing/MessageSignerTestUsingHeader.kt @@ -3,12 +3,11 @@ // SPDX-License-Identifier: Apache-2.0 package com.gxf.utilities.kafka.message.signing -import java.util.function.Consumer +import com.gxf.utilities.kafka.message.signing.TestHelper.producerRecordToConsumerRecord import org.apache.avro.Schema import org.apache.avro.specific.SpecificRecordBase import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.producer.ProducerRecord -import org.apache.kafka.common.header.Header import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.Test @@ -72,13 +71,6 @@ class MessageSignerTestUsingHeader { assertThat(validSignature).isFalse() } - private fun producerRecordToConsumerRecord(producerRecord: ProducerRecord): ConsumerRecord { - val consumerRecord = - ConsumerRecord(producerRecord.topic(), 0, 123L, producerRecord.key(), producerRecord.value()) - producerRecord.headers().forEach(Consumer { header: Header? -> consumerRecord.headers().add(header) }) - return consumerRecord - } - private fun properlySignedRecord(): ConsumerRecord { val producerRecord = producerRecord() messageSigner.signUsingHeader(producerRecord) diff --git a/kafka-message-signing/src/test/kotlin/com/gxf/utilities/kafka/message/signing/MessageSigningProducerInterceptorTest.kt b/kafka-message-signing/src/test/kotlin/com/gxf/utilities/kafka/message/signing/MessageSigningProducerInterceptorTest.kt new file mode 100644 index 0000000..3049247 --- /dev/null +++ b/kafka-message-signing/src/test/kotlin/com/gxf/utilities/kafka/message/signing/MessageSigningProducerInterceptorTest.kt @@ -0,0 +1,36 @@ +// SPDX-FileCopyrightText: Copyright Contributors to the GXF project +// +// SPDX-License-Identifier: Apache-2.0 +package com.gxf.utilities.kafka.message.signing + +import com.gxf.utilities.kafka.message.signing.interceptors.MessageSigningByteArrayProducerInterceptor +import io.mockk.every +import io.mockk.impl.annotations.InjectMockKs +import io.mockk.impl.annotations.MockK +import io.mockk.junit5.MockKExtension +import org.apache.kafka.clients.producer.ProducerRecord +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith + +@ExtendWith(MockKExtension::class) +class MessageSigningProducerInterceptorTest { + @InjectMockKs private lateinit var interceptor: MessageSigningByteArrayProducerInterceptor + @MockK private lateinit var messageSigner: MessageSigner + + @Test + fun testOnSend() { + val producerRecord = ProducerRecord("topic", "key", "value".toByteArray()) + every { messageSigner.signByteArrayRecordUsingHeader(producerRecord) } answers + { + producerRecord.apply { headers().add("signature", "signed".toByteArray()) } + } + + assertThat(producerRecord.headers()).isEmpty() + + interceptor.onSend(producerRecord) + + assertThat(producerRecord.headers()).isNotEmpty() + assertThat(producerRecord.headers().lastHeader("signature")).isNotNull + } +} diff --git a/kafka-message-signing/src/test/kotlin/com/gxf/utilities/kafka/message/signing/TestHelper.kt b/kafka-message-signing/src/test/kotlin/com/gxf/utilities/kafka/message/signing/TestHelper.kt new file mode 100644 index 0000000..9d7d9d0 --- /dev/null +++ b/kafka-message-signing/src/test/kotlin/com/gxf/utilities/kafka/message/signing/TestHelper.kt @@ -0,0 +1,18 @@ +// SPDX-FileCopyrightText: Copyright Contributors to the GXF project +// +// SPDX-License-Identifier: Apache-2.0 +package com.gxf.utilities.kafka.message.signing + +import java.util.function.Consumer +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.header.Header + +object TestHelper { + fun producerRecordToConsumerRecord(producerRecord: ProducerRecord): ConsumerRecord { + val consumerRecord = + ConsumerRecord(producerRecord.topic(), 0, 123L, producerRecord.key(), producerRecord.value()) + producerRecord.headers().forEach(Consumer { header: Header? -> consumerRecord.headers().add(header) }) + return consumerRecord + } +} diff --git a/oslp-message-signing/build.gradle.kts b/oslp-message-signing/build.gradle.kts index 58d2cb9..df636f7 100644 --- a/oslp-message-signing/build.gradle.kts +++ b/oslp-message-signing/build.gradle.kts @@ -3,7 +3,7 @@ dependencies { implementation(libs.slf4jApi) testImplementation(libs.junitJupiterApi) - testImplementation(libs.mockitoKotlin) + testImplementation(libs.mockk) testImplementation(libs.assertJ) testRuntimeOnly(libs.junitJupiterEngine) testRuntimeOnly(libs.junitPlatformLauncher)