Skip to content

Commit 3ff2b1f

Browse files
authored
Merge pull request #94 from OSGP/feature/FDP-3387-signing-byte-array-record
FDP-3387: Signing for ByteArray messages and via ProducerInterceptor
2 parents 950f6fd + 79f5595 commit 3ff2b1f

File tree

15 files changed

+526
-41
lines changed

15 files changed

+526
-41
lines changed

gradle/libs.versions.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ avro = "1.12.0"
77
avroPlugin = "1.9.1"
88
dependencyManagement = "1.1.7"
99
kotlin = "2.2.10"
10-
mockitoKotlin = "6.0.0"
10+
mockk = "1.14.5"
1111
msal4j = "1.23.0"
1212
sonarqube = "6.3.1.5724"
1313
spotless = "7.2.1"
@@ -22,7 +22,7 @@ junitJupiterApi = { group = "org.junit.jupiter", name = "junit-jupiter-api" }
2222
junitJupiterEngine = { group = "org.junit.jupiter", name = "junit-jupiter-engine" }
2323
junitPlatformLauncher = { group = "org.junit.platform", name = "junit-platform-launcher" }
2424
kafkaClients = { group = "org.apache.kafka", name = "kafka-clients" }
25-
mockitoKotlin = { group = "org.mockito.kotlin", name = "mockito-kotlin", version.ref = "mockitoKotlin" }
25+
mockk = { group = "io.mockk", name = "mockk", version.ref = "mockk" }
2626
msal = { group = "com.microsoft.azure", name = "msal4j", version.ref = "msal4j" }
2727
slf4jApi = { group = "org.slf4j", name = "slf4j-api" }
2828
kotlinLoggingJvm = { group = "io.github.oshai", name = "kotlin-logging-jvm", version.ref = "kotlinLogging" }
@@ -33,6 +33,7 @@ springBootStarterTest = { group = "org.springframework.boot", name = "spring-boo
3333
springBootTest = { group = "org.springframework.boot", name = "spring-boot-test" }
3434
springContext = { group = "org.springframework", name = "spring-context" }
3535
springKafka = { group = "org.springframework.kafka", name = "spring-kafka" }
36+
springKafkaTest = { group = "org.springframework.kafka", name = "spring-kafka-test" }
3637
springTest = { group = "org.springframework", name = "spring-test" }
3738

3839
[plugins]

kafka-message-signing/build.gradle.kts

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,29 @@ dependencies {
1717
testImplementation(libs.springTest)
1818
testImplementation(libs.springBootTest)
1919
testImplementation(libs.springBootStarter)
20-
testImplementation(libs.mockitoKotlin)
20+
testImplementation(libs.mockk)
2121

2222
testRuntimeOnly(libs.junitPlatformLauncher)
2323
}
2424

2525
tasks.test {
2626
useJUnitPlatform()
2727
}
28+
29+
testing {
30+
suites {
31+
val integrationTest by registering(JvmTestSuite::class) {
32+
useJUnitJupiter()
33+
dependencies {
34+
implementation(project())
35+
implementation(libs.springBootStarterTest)
36+
implementation(libs.springKafka)
37+
implementation(libs.springKafkaTest)
38+
implementation(libs.kafkaClients)
39+
implementation(libs.assertJ)
40+
implementation(libs.avro)
41+
implementation(project(":kafka-avro"))
42+
}
43+
}
44+
}
45+
}
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
// SPDX-FileCopyrightText: Copyright Contributors to the GXF project
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
package com.gxf.utilities.kafka.message
5+
6+
import com.gxf.utilities.kafka.avro.AvroDeserializer
7+
import com.gxf.utilities.kafka.avro.AvroSerializer
8+
import com.gxf.utilities.kafka.message.signing.MessageSigner
9+
import com.gxf.utilities.kafka.message.signing.interceptors.MessageSigningAvroProducerInterceptor
10+
import com.gxf.utilities.kafka.message.signing.interceptors.MessageSigningByteArrayProducerInterceptor
11+
import java.util.UUID
12+
import org.apache.avro.Schema
13+
import org.apache.avro.specific.SpecificRecordBase
14+
import org.apache.kafka.clients.consumer.Consumer
15+
import org.apache.kafka.clients.producer.Producer
16+
import org.apache.kafka.clients.producer.ProducerConfig
17+
import org.apache.kafka.common.serialization.ByteArrayDeserializer
18+
import org.apache.kafka.common.serialization.ByteArraySerializer
19+
import org.apache.kafka.common.serialization.StringDeserializer
20+
import org.apache.kafka.common.serialization.StringSerializer
21+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
22+
import org.springframework.kafka.core.DefaultKafkaProducerFactory
23+
import org.springframework.kafka.test.EmbeddedKafkaBroker
24+
import org.springframework.kafka.test.utils.KafkaTestUtils
25+
26+
object IntegrationTestHelper {
27+
fun createByteArrayKafkaConsumer(
28+
embeddedKafkaBroker: EmbeddedKafkaBroker,
29+
topic: String,
30+
): Consumer<String, ByteArray> {
31+
val consumerFactory =
32+
DefaultKafkaConsumerFactory(
33+
KafkaTestUtils.consumerProps(UUID.randomUUID().toString(), "true", embeddedKafkaBroker),
34+
StringDeserializer(),
35+
ByteArrayDeserializer(),
36+
)
37+
val consumer = consumerFactory.createConsumer()
38+
embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, topic)
39+
return consumer
40+
}
41+
42+
fun createByteArrayKafkaProducer(
43+
embeddedKafkaBroker: EmbeddedKafkaBroker,
44+
messageSigner: MessageSigner,
45+
): Producer<String, ByteArray> {
46+
val producerProps: Map<String, Any> =
47+
HashMap(byteArrayProducerProps(embeddedKafkaBroker.brokersAsString, messageSigner))
48+
val producerFactory = DefaultKafkaProducerFactory(producerProps, StringSerializer(), ByteArraySerializer())
49+
return producerFactory.createProducer()
50+
}
51+
52+
private fun byteArrayProducerProps(brokers: String, messageSigner: MessageSigner): Map<String, Any> {
53+
return mapOf(
54+
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to brokers,
55+
ProducerConfig.BATCH_SIZE_CONFIG to "16384",
56+
ProducerConfig.LINGER_MS_CONFIG to 1,
57+
ProducerConfig.BUFFER_MEMORY_CONFIG to "33554432",
58+
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
59+
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to ByteArraySerializer::class.java,
60+
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG to MessageSigningByteArrayProducerInterceptor::class.java.name,
61+
"message.signer" to messageSigner,
62+
)
63+
}
64+
65+
fun createAvroKafkaConsumer(
66+
embeddedKafkaBroker: EmbeddedKafkaBroker,
67+
topic: String,
68+
): Consumer<String, SpecificRecordBase> {
69+
val consumerFactory =
70+
DefaultKafkaConsumerFactory(
71+
KafkaTestUtils.consumerProps(UUID.randomUUID().toString(), "true", embeddedKafkaBroker),
72+
StringDeserializer(),
73+
AvroDeserializer(listOf(Message.getClassSchema())),
74+
)
75+
val consumer = consumerFactory.createConsumer()
76+
embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, topic)
77+
return consumer
78+
}
79+
80+
fun createAvroKafkaProducer(
81+
embeddedKafkaBroker: EmbeddedKafkaBroker,
82+
messageSigner: MessageSigner,
83+
): Producer<String, SpecificRecordBase> {
84+
val producerProps: Map<String, Any> =
85+
HashMap(avroProducerProps(embeddedKafkaBroker.brokersAsString, messageSigner))
86+
val producerFactory = DefaultKafkaProducerFactory(producerProps, StringSerializer(), AvroSerializer())
87+
return producerFactory.createProducer()
88+
}
89+
90+
private fun avroProducerProps(brokers: String, messageSigner: MessageSigner): Map<String, Any> {
91+
return mapOf(
92+
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to brokers,
93+
ProducerConfig.BATCH_SIZE_CONFIG to "16384",
94+
ProducerConfig.LINGER_MS_CONFIG to 1,
95+
ProducerConfig.BUFFER_MEMORY_CONFIG to "33554432",
96+
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
97+
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to AvroSerializer::class.java,
98+
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG to MessageSigningAvroProducerInterceptor::class.java.name,
99+
"message.signer" to messageSigner,
100+
)
101+
}
102+
}
103+
104+
class Message(private var message: String?) : SpecificRecordBase() {
105+
constructor() : this(null) {}
106+
107+
companion object {
108+
fun getClassSchema(): Schema =
109+
Schema.Parser()
110+
.parse(
111+
"""{"type":"record","name":"Message","namespace":"com.gxf.utilities.kafka.message","fields":[{"name":"message","type":{"type":"string","avro.java.string":"String"}}]}"""
112+
)
113+
}
114+
115+
override fun getSchema() = getClassSchema()
116+
117+
override fun get(field: Int): Any {
118+
return message!!
119+
}
120+
121+
override fun put(field: Int, value: Any) {
122+
message = value.toString()
123+
}
124+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
// SPDX-FileCopyrightText: Copyright Contributors to the GXF project
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
package com.gxf.utilities.kafka.message
5+
6+
import com.gxf.utilities.kafka.message.IntegrationTestHelper.createAvroKafkaConsumer
7+
import com.gxf.utilities.kafka.message.IntegrationTestHelper.createAvroKafkaProducer
8+
import com.gxf.utilities.kafka.message.IntegrationTestHelper.createByteArrayKafkaConsumer
9+
import com.gxf.utilities.kafka.message.IntegrationTestHelper.createByteArrayKafkaProducer
10+
import com.gxf.utilities.kafka.message.signing.MessageSigner
11+
import com.gxf.utilities.kafka.message.signing.MessageSigningAutoConfiguration
12+
import java.time.Duration
13+
import org.apache.avro.specific.SpecificRecordBase
14+
import org.apache.kafka.clients.producer.ProducerRecord
15+
import org.assertj.core.api.Assertions.assertThat
16+
import org.junit.jupiter.api.Test
17+
import org.springframework.beans.factory.annotation.Autowired
18+
import org.springframework.boot.test.context.SpringBootTest
19+
import org.springframework.kafka.test.EmbeddedKafkaBroker
20+
import org.springframework.kafka.test.context.EmbeddedKafka
21+
import org.springframework.test.annotation.DirtiesContext
22+
23+
@SpringBootTest(classes = [MessageSigningAutoConfiguration::class])
24+
@EmbeddedKafka(topics = ["test-topic"])
25+
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
26+
class MessageSigningInterceptorIT {
27+
@Autowired private lateinit var embeddedKafkaBroker: EmbeddedKafkaBroker
28+
@Autowired private lateinit var messageSigner: MessageSigner
29+
30+
val topic = "test-topic"
31+
32+
@Test
33+
fun `can sign ByteArray message with interceptor`() {
34+
val producer = createByteArrayKafkaProducer(embeddedKafkaBroker, messageSigner)
35+
val consumer = createByteArrayKafkaConsumer(embeddedKafkaBroker, topic)
36+
37+
val unsignedRecord = ProducerRecord(topic, "key", "value".toByteArray())
38+
producer.send(unsignedRecord)
39+
40+
val records = consumer.poll(Duration.ofSeconds(5)).records(topic)
41+
42+
assertThat(records).hasSize(1)
43+
44+
val receivedRecord = records.first()
45+
val signatureHeader = receivedRecord.headers().lastHeader(MessageSigner.RECORD_HEADER_KEY_SIGNATURE)
46+
47+
assertThat(signatureHeader).isNotNull
48+
assertThat(signatureHeader.value()).isNotEmpty()
49+
assertThat(messageSigner.verifyByteArrayRecordUsingHeader(receivedRecord)).isTrue()
50+
}
51+
52+
@Test
53+
fun `can sign Avro message with interceptor`() {
54+
val producer = createAvroKafkaProducer(embeddedKafkaBroker, messageSigner)
55+
val consumer = createAvroKafkaConsumer(embeddedKafkaBroker, topic)
56+
57+
val unsignedRecord = ProducerRecord<String, SpecificRecordBase>(topic, "key", Message("value"))
58+
producer.send(unsignedRecord)
59+
60+
val polled = consumer.poll(Duration.ofSeconds(5))
61+
val records = polled.records(topic)
62+
63+
assertThat(records).hasSize(1)
64+
65+
val receivedRecord = records.first()
66+
val signatureHeader = receivedRecord.headers().lastHeader(MessageSigner.RECORD_HEADER_KEY_SIGNATURE)
67+
68+
assertThat(signatureHeader).isNotNull
69+
assertThat(signatureHeader.value()).isNotEmpty()
70+
}
71+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
message-signing:
2+
signing-enabled: true
3+
strip-avro-header: true
4+
private-key-file: classpath:rsa-private.pem
5+
public-key-file: classpath:rsa-public.pem
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
-----BEGIN PRIVATE KEY-----
2+
MIIEuwIBADANBgkqhkiG9w0BAQEFAASCBKUwggShAgEAAoIBAQC4gFY4wg/M+lFb
3+
J22C9WdT/QREetu8bNp6/OL7h1/3MOkNHgX3/QmC524yvfiYrT4eYoTdB77+xzqV
4+
EOs3KyaYpLneIPFRVg5/H7mWVcaEli/7WWvgjJV2MDMMkd6e6tcBtTibF9Eo7GHu
5+
aIO5qlpnDoONNE0ON3iuuhCBGMsAnH0nqNlfl6oAwJvQT7PJEarVK6a3EuLD3qNP
6+
4j/XkxLkl2LST9cKG1HMmxBjhHVijHbi7SS/NxCMrJj5rrwOiirHk7GaygCE/gk+
7+
jqMS5TedIzboUnEzcLNJzRut4iyvNtkGk/EuaBmeT2fhiYXTMhMdUe/LWxSIPrbn
8+
+Qfg9peLAgMBAAECgf911YE7vrVYtIZBMW0/tnvHqFCRa+Xq8ZqX8esFufAQb6xo
9+
NESbnX/1mtlGjw22dO63eTRh9hCFp7hCfAu9sFF2K76Jn4B/8fZQOnLNEPs4srLy
10+
VKoRP7g1Q5NW89K6rEGlVZhRdWSgPlhNa3K1oZcbOqnKBR0xdzwcW7kuuibsu++K
11+
0Lo9GNCqzgX34xT9CvmlEs2VsZGV/dYufu+pFFkyp2iahnVbJWGOfcbrk47OwO3I
12+
LjDL6sXmad6OWWhwW7aPcIujEeeBGprZv3T07ikOpTFFrZMbkM/AhP4WPE5gtqJ7
13+
UsD9R2QEXbS2k4Epow3xh+KxkxGbsySzjzor+5ECgYEA3Y2kIZHJjCnvDAPLTktJ
14+
lWkaROiBmzvk8ExPZ3X8q07IDZAqLQ/z3vvT4tluGezilfOsVs4tX6qX9Yx5omJu
15+
4exQ3bB3ra2l/RNKftZrGqsBrr3+tx0HTf1+Zuavt6LFmNKWcS7IXmX6ntF3NwZT
16+
TZwTCinLHv/wIEsQJSR7dPsCgYEA1S/v8N6FOZvqUL3IbYPZXScZStsZQmVvQHFw
17+
Nt1B1xocq8ETj4ua8VLvXW7qjPZZp7qhJBgDUaije4wuHFpFUmDkt5yi5R9bAhAY
18+
miFAx6HMlytQDWhk0y+bC4K62EvTFzAkRlvzr8vkfDeOcafahGZ+k9xXLmVwpBOW
19+
y6u2QrECgYB3vYGjkwN2+YL032gV3KLcWX2VGIRTvb8yEEwqAp6Yh2+fxPbGfAS1
20+
0yzlQdY7tMeRe6z9DVmAhtayI0Xp+YEsIWhjKGjGOT+o07BDdOdV9m5mXtE3bjzw
21+
bbzPKIZ3nUVmHwqoCTzJqBwXkeX4mzaSj3PK3mOlUXYaPfdv25PN2QKBgFxNSA66
22+
2WXK+tWAhgHcn0T0w7+kQzh7IIL/Wn12qKYQOS+oBecVo70uklKazlS/6Kt1Y4V0
23+
HCPD5xx74g8GipMTPpO87s5TGB05iN1a3mhQxnsBFsTnWRgSuYdvT6SMl9WnU11f
24+
PI/1sHSTvUm6SiMfGVi9gsWkx/NSQ+zk6KHRAoGBALI0HbHBfv8flVf18wJpeFpP
25+
X2aEf1IVD+RNnpqaQK64JBu8SoUjt/3qyluptKS/ilAGVPU1KUQqKMHQiGtnKl/m
26+
ZfjuNlnsjh9C5X7Fl/ja9T1SxAyIWPUWT8QIpbOxPmMmyFfK/ZRJ1DDHUA9dohVA
27+
cwNSRNC5PwoEgT8+tfO1
28+
-----END PRIVATE KEY-----
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
-----BEGIN PUBLIC KEY-----
2+
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAuIBWOMIPzPpRWydtgvVn
3+
U/0ERHrbvGzaevzi+4df9zDpDR4F9/0JguduMr34mK0+HmKE3Qe+/sc6lRDrNysm
4+
mKS53iDxUVYOfx+5llXGhJYv+1lr4IyVdjAzDJHenurXAbU4mxfRKOxh7miDuapa
5+
Zw6DjTRNDjd4rroQgRjLAJx9J6jZX5eqAMCb0E+zyRGq1SumtxLiw96jT+I/15MS
6+
5Jdi0k/XChtRzJsQY4R1Yox24u0kvzcQjKyY+a68Dooqx5OxmsoAhP4JPo6jEuU3
7+
nSM26FJxM3CzSc0breIsrzbZBpPxLmgZnk9n4YmF0zITHVHvy1sUiD625/kH4PaX
8+
iwIDAQAB
9+
-----END PUBLIC KEY-----

0 commit comments

Comments
 (0)