Skip to content

Not able to add value from feeder in Kafka Header #13

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
vijayjoshi16 opened this issue Jul 8, 2024 · 6 comments · May be fixed by #15
Closed

Not able to add value from feeder in Kafka Header #13

vijayjoshi16 opened this issue Jul 8, 2024 · 6 comments · May be fixed by #15

Comments

@vijayjoshi16
Copy link

Hi,
I'm trying to add value from a feeder to header in a Kafka request but not able to do so.
I initially set up feeders in a way similar to this:

scn = scn.feed(
  Iterator.continually(Map(UUID.toString -> getRandomValue(UUID, None)))
)

def getRandomValue(
      valueType: RandomValueTypes.Value,
      length: Option[Int]
  ): String = {
    valueType match {
      case UUID              => java.util.UUID.randomUUID().toString
  }
}

And I build my Kafka scenario as:

case kafkaRequest : KafkaRequest =>
val kafkaRequestKey = kafkaRequest.key.getOrElse("")
val kafkaRequestBody = kafkaRequest.body
val kafkaRequestHeaders = mapToRecordHeaders(kafkaRequest.headers)
scn = scn.group(getNameWithTestId(ReportSections.KAFKA_GROUP, testId)) {
  exec(
    kafka(kafkaRequest.name).send[String, String](
      key = kafkaRequestKey,
      payload = kafkaRequestBody,
      headers = kafkaRequestHeaders
    )
  )
}

private def mapToRecordHeaders(
    headersOpt: Option[Map[String, String]]
): RecordHeaders = {
val recordHeaders = new RecordHeaders()
headersOpt.foreach { headers =>
  headers.foreach { case (key, value) =>
    recordHeaders.add(key, value.getBytes("UTF-8"))
  }
}
recordHeaders
}

The feeders work fine for kafka key and kafka body but do not work for headers as I get #{UUID} printed as it is rather than the value

I found a similar issue on another plugin, but the steps over there are also not helping me: Tinkoff/gatling-kafka-plugin#110

Can anyone help me out since I see that this plugin doesn't seem to support feeders for headers?

@vijayjoshi16
Copy link
Author

@jigarkhwar @red-bashmak

@GharbiNoussair
Copy link

Hello there ! any updates on this ? i am facing the same issue and didn't find any solution for that so far.

@RussGarratt
Copy link
Contributor

Also would like to be able to use this please! any update on the merge request?
Thanks

@jigarkhwar
Copy link
Contributor

Hello guys, I'll try to review this PR by the weekend

@red-bashmak
Copy link
Contributor

red-bashmak commented Mar 11, 2025

@vijayjoshi16 It support session variables, but little different way. See this example:

import org.galaxio.gatling.kafka.Predef._
import io.gatling.core.Predef._

...

val kafkaRequestKey = kafkaRequest.key.getOrElse("")
val kafkaRequestBody = kafkaRequest.body

val feederUuid: Feeder[String] = Iterator.continually(Map("UUID" -> java.util.UUID.randomUUID().toString))

val header: Expression[Headers] = session => session("UUID").validate[String] // here UUID is name of session variable
    .map(uuid => new RecordHeaders().add("uuid-header", uuid.getBytes))

val scn = scenario("KafkaTest").feed(feederUuid)
    .exec(kafka("KafkaMessage").send[String, String](kafkaRequestKey, kafkaRequestBody, header))

@red-bashmak
Copy link
Contributor

Should work with #18

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

5 participants