diff --git a/build.gradle b/build.gradle index dd1e9e6675..c7a5f38c4e 100644 --- a/build.gradle +++ b/build.gradle @@ -27,7 +27,7 @@ ext { scapegoatVersion = '3.2.3' junitVersion = '6.0.1' - testContainerVersion = '0.43.6' + testContainerVersion = '0.44.0' scriptsLocation = 'gradle' + File.separator + 'scripts' + File.separator // location of script plugins diff --git a/src/test/scala/edu/ie3/simona/test/KafkaSpecLike.scala b/src/test/scala/edu/ie3/simona/test/KafkaSpecLike.scala index 4267fa9e29..82bed2a0f3 100644 --- a/src/test/scala/edu/ie3/simona/test/KafkaSpecLike.scala +++ b/src/test/scala/edu/ie3/simona/test/KafkaSpecLike.scala @@ -14,41 +14,48 @@ import org.testcontainers.utility.DockerImageName import java.util.concurrent.TimeUnit import scala.jdk.CollectionConverters.* +import scala.util.{Failure, Success, Try} /** Adapted from * https://kafka-tutorials.confluent.io/produce-consume-lang/scala.html */ -trait KafkaSpecLike extends BeforeAndAfterAll { - this: TestSuite => +trait KafkaSpecLike extends BeforeAndAfterAll { this: TestSuite => + /** Topics that should exist in the test broker */ protected val testTopics: Seq[Topic] - protected val kafka: KafkaContainer = KafkaContainer( - DockerImageName.parse("confluentinc/cp-kafka:7.3.1") - ) - protected lazy val admin: Admin = Admin.create( - Map[String, AnyRef]("bootstrap.servers" -> kafka.bootstrapServers).asJava - ) + /** Kafka container definition; started in [[beforeAll]] */ + protected lazy val kafka: KafkaContainer = + KafkaContainer(DockerImageName.parse("apache/kafka:3.7.0")) + + /** Create an Admin client once the container is running */ + protected def createAdmin(): Admin = + Admin.create(Map("bootstrap.servers" -> kafka.bootstrapServers).asJava) override def beforeAll(): Unit = { super.beforeAll() kafka.start() - val result = admin.createTopics( - testTopics.map { topic => - new NewTopic( - topic.name, - topic.partitions, - topic.replicationFactor, - ) - }.asJava - ) - - // wait for result, throw exception if applicable - result.all().get(1, TimeUnit.MINUTES) + + val result = Try { + val admin = createAdmin() + try { + val topics = testTopics + .map(t => new NewTopic(t.name, t.partitions, t.replicationFactor)) + .asJava + admin.createTopics(topics).all().get(15, TimeUnit.SECONDS) + } finally { + admin.close() + } + } + + result match { + case Success(_) => + case Failure(ex) => + throw new IllegalStateException("Failed to create Kafka topics", ex) + } } override def afterAll(): Unit = { - admin.close() kafka.stop() super.afterAll() }