5 changes: 3 additions & 2 deletions gradle/libs.versions.toml
@@ -7,7 +7,7 @@
avro = "1.12.0"
avroPlugin = "1.9.1"
dependencyManagement = "1.1.7"
kotlin = "2.2.10"
mockitoKotlin = "6.0.0"
mockk = "1.14.5"
msal4j = "1.23.0"
sonarqube = "6.3.1.5724"
spotless = "7.2.1"
@@ -22,7 +22,7 @@
junitJupiterApi = { group = "org.junit.jupiter", name = "junit-jupiter-api" }
junitJupiterEngine = { group = "org.junit.jupiter", name = "junit-jupiter-engine" }
junitPlatformLauncher = { group = "org.junit.platform", name = "junit-platform-launcher" }
kafkaClients = { group = "org.apache.kafka", name = "kafka-clients" }
mockitoKotlin = { group = "org.mockito.kotlin", name = "mockito-kotlin", version.ref = "mockitoKotlin" }
mockk = { group = "io.mockk", name = "mockk", version.ref = "mockk" }
msal = { group = "com.microsoft.azure", name = "msal4j", version.ref = "msal4j" }
slf4jApi = { group = "org.slf4j", name = "slf4j-api" }
kotlinLoggingJvm = { group = "io.github.oshai", name = "kotlin-logging-jvm", version.ref = "kotlinLogging" }
@@ -33,6 +33,7 @@
springBootStarterTest = { group = "org.springframework.boot", name = "spring-boot-starter-test" }
springBootTest = { group = "org.springframework.boot", name = "spring-boot-test" }
springContext = { group = "org.springframework", name = "spring-context" }
springKafka = { group = "org.springframework.kafka", name = "spring-kafka" }
springKafkaTest = { group = "org.springframework.kafka", name = "spring-kafka-test" }
springTest = { group = "org.springframework", name = "spring-test" }

[plugins]
20 changes: 19 additions & 1 deletion kafka-message-signing/build.gradle.kts
@@ -17,11 +17,29 @@
dependencies {
    testImplementation(libs.springTest)
    testImplementation(libs.springBootTest)
    testImplementation(libs.springBootStarter)
    testImplementation(libs.mockitoKotlin)
    testImplementation(libs.mockk)

    testRuntimeOnly(libs.junitPlatformLauncher)
}

tasks.test {
    useJUnitPlatform()
}

testing {
    suites {
        val integrationTest by registering(JvmTestSuite::class) {
            useJUnitJupiter()
            dependencies {
                implementation(project())
                implementation(libs.springBootStarterTest)
                implementation(libs.springKafka)
                implementation(libs.springKafkaTest)
                implementation(libs.kafkaClients)
                implementation(libs.assertJ)
                implementation(libs.avro)
                implementation(project(":kafka-avro"))
            }
        }
    }
}
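Not part of this diff, but for context: with Gradle's JVM Test Suite plugin the registered suite gets a task of the same name, so the new tests should be runnable with ./gradlew :kafka-message-signing:integrationTest. A minimal sketch of how the suite could additionally be wired into the standard check lifecycle, if desired (an assumption, not something this change does):

tasks.named("check") {
    // Run the integration test suite whenever "check" runs.
    dependsOn(testing.suites.named("integrationTest"))
}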
@@ -0,0 +1,124 @@
// SPDX-FileCopyrightText: Copyright Contributors to the GXF project
//
// SPDX-License-Identifier: Apache-2.0
package com.gxf.utilities.kafka.message

import com.gxf.utilities.kafka.avro.AvroDeserializer
import com.gxf.utilities.kafka.avro.AvroSerializer
import com.gxf.utilities.kafka.message.signing.MessageSigner
import com.gxf.utilities.kafka.message.signing.interceptors.MessageSigningAvroProducerInterceptor
import com.gxf.utilities.kafka.message.signing.interceptors.MessageSigningByteArrayProducerInterceptor
import java.util.UUID
import org.apache.avro.Schema
import org.apache.avro.specific.SpecificRecordBase
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.test.EmbeddedKafkaBroker
import org.springframework.kafka.test.utils.KafkaTestUtils

object IntegrationTestHelper {
    // Creates a consumer of String keys and raw ByteArray values, already subscribed to the given topic.
    fun createByteArrayKafkaConsumer(
        embeddedKafkaBroker: EmbeddedKafkaBroker,
        topic: String,
    ): Consumer<String, ByteArray> {
        val consumerFactory =
            DefaultKafkaConsumerFactory(
                KafkaTestUtils.consumerProps(UUID.randomUUID().toString(), "true", embeddedKafkaBroker),
                StringDeserializer(),
                ByteArrayDeserializer(),
            )
        val consumer = consumerFactory.createConsumer()
        embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, topic)
        return consumer
    }

    // Creates a producer whose records pass through MessageSigningByteArrayProducerInterceptor.
    fun createByteArrayKafkaProducer(
        embeddedKafkaBroker: EmbeddedKafkaBroker,
        messageSigner: MessageSigner,
    ): Producer<String, ByteArray> {
        val producerProps: Map<String, Any> =
            HashMap(byteArrayProducerProps(embeddedKafkaBroker.brokersAsString, messageSigner))
        val producerFactory = DefaultKafkaProducerFactory(producerProps, StringSerializer(), ByteArraySerializer())
        return producerFactory.createProducer()
    }

    // Producer properties that register the signing interceptor and hand it the MessageSigner
    // via the "message.signer" configuration entry.
    private fun byteArrayProducerProps(brokers: String, messageSigner: MessageSigner): Map<String, Any> {
        return mapOf(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to brokers,
            ProducerConfig.BATCH_SIZE_CONFIG to "16384",
            ProducerConfig.LINGER_MS_CONFIG to 1,
            ProducerConfig.BUFFER_MEMORY_CONFIG to "33554432",
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to ByteArraySerializer::class.java,
            ProducerConfig.INTERCEPTOR_CLASSES_CONFIG to MessageSigningByteArrayProducerInterceptor::class.java.name,
            "message.signer" to messageSigner,
        )
    }

    // Avro variants of the helpers above, using the Message schema defined below.
    fun createAvroKafkaConsumer(
        embeddedKafkaBroker: EmbeddedKafkaBroker,
        topic: String,
    ): Consumer<String, SpecificRecordBase> {
        val consumerFactory =
            DefaultKafkaConsumerFactory(
                KafkaTestUtils.consumerProps(UUID.randomUUID().toString(), "true", embeddedKafkaBroker),
                StringDeserializer(),
                AvroDeserializer(listOf(Message.getClassSchema())),
            )
        val consumer = consumerFactory.createConsumer()
        embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, topic)
        return consumer
    }

    fun createAvroKafkaProducer(
        embeddedKafkaBroker: EmbeddedKafkaBroker,
        messageSigner: MessageSigner,
    ): Producer<String, SpecificRecordBase> {
        val producerProps: Map<String, Any> =
            HashMap(avroProducerProps(embeddedKafkaBroker.brokersAsString, messageSigner))
        val producerFactory = DefaultKafkaProducerFactory(producerProps, StringSerializer(), AvroSerializer())
        return producerFactory.createProducer()
    }

    private fun avroProducerProps(brokers: String, messageSigner: MessageSigner): Map<String, Any> {
        return mapOf(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to brokers,
            ProducerConfig.BATCH_SIZE_CONFIG to "16384",
            ProducerConfig.LINGER_MS_CONFIG to 1,
            ProducerConfig.BUFFER_MEMORY_CONFIG to "33554432",
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to AvroSerializer::class.java,
            ProducerConfig.INTERCEPTOR_CLASSES_CONFIG to MessageSigningAvroProducerInterceptor::class.java.name,
            "message.signer" to messageSigner,
        )
    }
}

// Minimal hand-rolled Avro record used as the integration-test payload.
class Message(private var message: String?) : SpecificRecordBase() {
    constructor() : this(null)

    companion object {
        fun getClassSchema(): Schema =
            Schema.Parser()
                .parse(
                    """{"type":"record","name":"Message","namespace":"com.gxf.utilities.kafka.message","fields":[{"name":"message","type":{"type":"string","avro.java.string":"String"}}]}"""
                )
    }

    override fun getSchema() = getClassSchema()

    override fun get(field: Int): Any {
        return message!!
    }

    override fun put(field: Int, value: Any) {
        message = value.toString()
    }
}
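For reference, a minimal round-trip sketch of how this hand-rolled record plugs into the serializers used by the helper above. It assumes AvroSerializer and AvroDeserializer from :kafka-avro implement the standard Kafka Serializer/Deserializer interfaces, as their use in the producer and consumer factories suggests; the exact deserialization behaviour depends on that implementation.

val serializer = AvroSerializer()
val deserializer = AvroDeserializer(listOf(Message.getClassSchema()))

// Encode the test record to Avro bytes and decode it again.
val bytes = serializer.serialize("test-topic", Message("value"))
val decoded = deserializer.deserialize("test-topic", bytes)
// decoded.get(0) is expected to hold the original "value" payload.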
@@ -0,0 +1,71 @@
// SPDX-FileCopyrightText: Copyright Contributors to the GXF project
//
// SPDX-License-Identifier: Apache-2.0
package com.gxf.utilities.kafka.message

import com.gxf.utilities.kafka.message.IntegrationTestHelper.createAvroKafkaConsumer
import com.gxf.utilities.kafka.message.IntegrationTestHelper.createAvroKafkaProducer
import com.gxf.utilities.kafka.message.IntegrationTestHelper.createByteArrayKafkaConsumer
import com.gxf.utilities.kafka.message.IntegrationTestHelper.createByteArrayKafkaProducer
import com.gxf.utilities.kafka.message.signing.MessageSigner
import com.gxf.utilities.kafka.message.signing.MessageSigningAutoConfiguration
import java.time.Duration
import org.apache.avro.specific.SpecificRecordBase
import org.apache.kafka.clients.producer.ProducerRecord
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.kafka.test.EmbeddedKafkaBroker
import org.springframework.kafka.test.context.EmbeddedKafka
import org.springframework.test.annotation.DirtiesContext

@SpringBootTest(classes = [MessageSigningAutoConfiguration::class])
@EmbeddedKafka(topics = ["test-topic"])
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
class MessageSigningInterceptorIT {
    @Autowired private lateinit var embeddedKafkaBroker: EmbeddedKafkaBroker
    @Autowired private lateinit var messageSigner: MessageSigner

    val topic = "test-topic"

    @Test
    fun `can sign ByteArray message with interceptor`() {
        val producer = createByteArrayKafkaProducer(embeddedKafkaBroker, messageSigner)
        val consumer = createByteArrayKafkaConsumer(embeddedKafkaBroker, topic)

        val unsignedRecord = ProducerRecord(topic, "key", "value".toByteArray())
        producer.send(unsignedRecord)

        val records = consumer.poll(Duration.ofSeconds(5)).records(topic)

        assertThat(records).hasSize(1)

        val receivedRecord = records.first()
        val signatureHeader = receivedRecord.headers().lastHeader(MessageSigner.RECORD_HEADER_KEY_SIGNATURE)

        assertThat(signatureHeader).isNotNull
        assertThat(signatureHeader.value()).isNotEmpty()
        assertThat(messageSigner.verifyByteArrayRecordUsingHeader(receivedRecord)).isTrue()
    }

    @Test
    fun `can sign Avro message with interceptor`() {
        val producer = createAvroKafkaProducer(embeddedKafkaBroker, messageSigner)
        val consumer = createAvroKafkaConsumer(embeddedKafkaBroker, topic)

        val unsignedRecord = ProducerRecord<String, SpecificRecordBase>(topic, "key", Message("value"))
        producer.send(unsignedRecord)

        val polled = consumer.poll(Duration.ofSeconds(5))
        val records = polled.records(topic)

        assertThat(records).hasSize(1)

        val receivedRecord = records.first()
        val signatureHeader = receivedRecord.headers().lastHeader(MessageSigner.RECORD_HEADER_KEY_SIGNATURE)

        assertThat(signatureHeader).isNotNull
        assertThat(signatureHeader.value()).isNotEmpty()
    }
}
@@ -0,0 +1,5 @@
message-signing:
  signing-enabled: true
  strip-avro-header: true
  private-key-file: classpath:rsa-private.pem
  public-key-file: classpath:rsa-public.pem
@@ -0,0 +1,28 @@
-----BEGIN PRIVATE KEY-----
MIIEuwIBADANBgkqhkiG9w0BAQEFAASCBKUwggShAgEAAoIBAQC4gFY4wg/M+lFb
J22C9WdT/QREetu8bNp6/OL7h1/3MOkNHgX3/QmC524yvfiYrT4eYoTdB77+xzqV
EOs3KyaYpLneIPFRVg5/H7mWVcaEli/7WWvgjJV2MDMMkd6e6tcBtTibF9Eo7GHu
aIO5qlpnDoONNE0ON3iuuhCBGMsAnH0nqNlfl6oAwJvQT7PJEarVK6a3EuLD3qNP
4j/XkxLkl2LST9cKG1HMmxBjhHVijHbi7SS/NxCMrJj5rrwOiirHk7GaygCE/gk+
jqMS5TedIzboUnEzcLNJzRut4iyvNtkGk/EuaBmeT2fhiYXTMhMdUe/LWxSIPrbn
+Qfg9peLAgMBAAECgf911YE7vrVYtIZBMW0/tnvHqFCRa+Xq8ZqX8esFufAQb6xo
NESbnX/1mtlGjw22dO63eTRh9hCFp7hCfAu9sFF2K76Jn4B/8fZQOnLNEPs4srLy
VKoRP7g1Q5NW89K6rEGlVZhRdWSgPlhNa3K1oZcbOqnKBR0xdzwcW7kuuibsu++K
0Lo9GNCqzgX34xT9CvmlEs2VsZGV/dYufu+pFFkyp2iahnVbJWGOfcbrk47OwO3I
LjDL6sXmad6OWWhwW7aPcIujEeeBGprZv3T07ikOpTFFrZMbkM/AhP4WPE5gtqJ7
UsD9R2QEXbS2k4Epow3xh+KxkxGbsySzjzor+5ECgYEA3Y2kIZHJjCnvDAPLTktJ
lWkaROiBmzvk8ExPZ3X8q07IDZAqLQ/z3vvT4tluGezilfOsVs4tX6qX9Yx5omJu
4exQ3bB3ra2l/RNKftZrGqsBrr3+tx0HTf1+Zuavt6LFmNKWcS7IXmX6ntF3NwZT
TZwTCinLHv/wIEsQJSR7dPsCgYEA1S/v8N6FOZvqUL3IbYPZXScZStsZQmVvQHFw
Nt1B1xocq8ETj4ua8VLvXW7qjPZZp7qhJBgDUaije4wuHFpFUmDkt5yi5R9bAhAY
miFAx6HMlytQDWhk0y+bC4K62EvTFzAkRlvzr8vkfDeOcafahGZ+k9xXLmVwpBOW
y6u2QrECgYB3vYGjkwN2+YL032gV3KLcWX2VGIRTvb8yEEwqAp6Yh2+fxPbGfAS1
0yzlQdY7tMeRe6z9DVmAhtayI0Xp+YEsIWhjKGjGOT+o07BDdOdV9m5mXtE3bjzw
bbzPKIZ3nUVmHwqoCTzJqBwXkeX4mzaSj3PK3mOlUXYaPfdv25PN2QKBgFxNSA66
2WXK+tWAhgHcn0T0w7+kQzh7IIL/Wn12qKYQOS+oBecVo70uklKazlS/6Kt1Y4V0
HCPD5xx74g8GipMTPpO87s5TGB05iN1a3mhQxnsBFsTnWRgSuYdvT6SMl9WnU11f
PI/1sHSTvUm6SiMfGVi9gsWkx/NSQ+zk6KHRAoGBALI0HbHBfv8flVf18wJpeFpP
X2aEf1IVD+RNnpqaQK64JBu8SoUjt/3qyluptKS/ilAGVPU1KUQqKMHQiGtnKl/m
ZfjuNlnsjh9C5X7Fl/ja9T1SxAyIWPUWT8QIpbOxPmMmyFfK/ZRJ1DDHUA9dohVA
cwNSRNC5PwoEgT8+tfO1
-----END PRIVATE KEY-----
@@ -0,0 +1,9 @@
-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAuIBWOMIPzPpRWydtgvVn
U/0ERHrbvGzaevzi+4df9zDpDR4F9/0JguduMr34mK0+HmKE3Qe+/sc6lRDrNysm
mKS53iDxUVYOfx+5llXGhJYv+1lr4IyVdjAzDJHenurXAbU4mxfRKOxh7miDuapa
Zw6DjTRNDjd4rroQgRjLAJx9J6jZX5eqAMCb0E+zyRGq1SumtxLiw96jT+I/15MS
5Jdi0k/XChtRzJsQY4R1Yox24u0kvzcQjKyY+a68Dooqx5OxmsoAhP4JPo6jEuU3
nSM26FJxM3CzSc0breIsrzbZBpPxLmgZnk9n4YmF0zITHVHvy1sUiD625/kH4PaX
iwIDAQAB
-----END PUBLIC KEY-----
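The two PEM resources above are a PKCS#8 private key and an X.509 (SubjectPublicKeyInfo) public key for the same RSA test key pair, referenced from application.yaml via private-key-file and public-key-file. A minimal, illustrative sketch of loading such a pair with plain JCA — not necessarily how MessageSigner itself loads them:

import java.security.KeyFactory
import java.security.PrivateKey
import java.security.PublicKey
import java.security.spec.PKCS8EncodedKeySpec
import java.security.spec.X509EncodedKeySpec
import java.util.Base64

fun loadRsaPrivateKey(pem: String): PrivateKey =
    KeyFactory.getInstance("RSA").generatePrivate(PKCS8EncodedKeySpec(decodePem(pem, "PRIVATE KEY")))

fun loadRsaPublicKey(pem: String): PublicKey =
    KeyFactory.getInstance("RSA").generatePublic(X509EncodedKeySpec(decodePem(pem, "PUBLIC KEY")))

// Strips the BEGIN/END markers and Base64-decodes the body (MIME decoder tolerates line breaks).
private fun decodePem(pem: String, type: String): ByteArray =
    Base64.getMimeDecoder()
        .decode(pem.replace("-----BEGIN $type-----", "").replace("-----END $type-----", ""))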