How to Set Up Kafka Integration Test – Grape Up

Jaime E. Love


Do you think about device screening as not ample remedy for trying to keep the application’s trustworthiness and stability? Are you fearful that by some means or someplace there is a probable bug hiding in the assumption that device tests really should cover all cases? And also is mocking Kafka not enough for task prerequisites? If even one particular reply is  ‘yes’, then welcome to a pleasant and straightforward manual on how to set up Integration Assessments for Kafka working with TestContainers and Embedded Kafka for Spring!

What is TestContainers?

TestContainers is an open-supply Java library specialised in supplying all necessary methods for the integration and screening of exterior sources. It usually means that we are able to mimic an real database, website server, or even an celebration bus ecosystem and take care of that as a trustworthy spot to take a look at application performance. All these fancy characteristics are hooked into docker images, outlined as containers. Do we need to test the database layer with real MongoDB? No worries, we have a check container for that. We can not also forget about about UI assessments – Selenium Container will do something that we essentially will need.
In our case, we will target on Kafka Testcontainer.

What is Embedded Kafka?

As the identify suggests, we are going to deal with an in-memory Kafka instance, completely ready to be utilized as a ordinary broker with entire functionality. It allows us to perform with producers and shoppers, as normal, earning our integration checks light-weight. 

In advance of we begin

The concept for our check is very simple – I would like to take a look at Kafka shopper and producer working with two distinct ways and verify how we can employ them in real cases. 

Kafka Messages are serialized employing Avro schemas.

Embedded Kafka – Producer Exam

The strategy is quick – let us make a basic task with the controller, which invokes a company method to press a Kafka Avro serialized message.


implementation "org.apache.avro:avro:1.10.1"
implementation 'org.springframework.boot:spring-boot-starter-validation'
implementation 'org.springframework.kafka:spring-kafka'

implementation 'org.projectlombok:lombok:1.18.16'

compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.kafka:spring-kafka-test'

Also really worth mentioning great plugin for Avro. Listed here plugins area:

id 'org.springframework.boot' variation '2.6.8'
id 'io.spring.dependency-management' variation '1..11.RELEASE'
id 'java'
id "com.github.davidmc24.gradle.plugin.avro" version "1.3."

Avro Plugin supports schema vehicle-producing. This is a will have to-have.

Website link to plugin:

Now let us define the Avro schema:

  "namespace": "com.grapeup.myawesome.myawesomeproducer",
  "style": "report",
  "name": "RegisterRequest",
  "fields": [
    "name": "id", "type": "long",
    "name": "address", "type": "string", "": "String"


Our ProducerService will be focused only on sending messages to Kafka employing a template, very little remarkable about that portion. Most important operation can be performed just utilizing this line:

ListenableFuture> long run = this.kafkaTemplate.mail("sign-up-request", kafkaMessage)

We simply cannot ignore about take a look at properties:

    enable-bean-definition-overriding: legitimate
      group-id: team_id
      car-offset-reset: earliest
      vital-deserializer: org.apache.kafka.typical.serialization.StringDeserializer
      price-deserializer: com.grapeup.myawesome.myawesomeconsumer.common.CustomKafkaAvroDeserializer
      auto.register.schemas: real
      vital-serializer: org.apache.kafka.typical.serialization.StringSerializer
      value-serializer: com.grapeup.myawesome.myawesomeconsumer.typical.CustomKafkaAvroSerializer
      specific.avro.reader: accurate

As we see in the pointed out take a look at qualities, we declare a customized deserializer/serializer for KafkaMessages. It is hugely proposed to use Kafka with Avro – don’t let JSONs retain object composition, let us use civilized mapper and object definition like Avro.


general public course CustomKafkaAvroSerializer extends KafkaAvroSerializer 
    general public CustomKafkaAvroSerializer() 
        tremendous.schemaRegistry = new MockSchemaRegistryClient()

    general public CustomKafkaAvroSerializer(SchemaRegistryClient consumer) 
        super(new MockSchemaRegistryClient())

    community CustomKafkaAvroSerializer(SchemaRegistryClient client, Map props) 
        super(new MockSchemaRegistryClient(), props)


general public course CustomKafkaAvroSerializer extends KafkaAvroSerializer 
    general public CustomKafkaAvroSerializer() 
        super.schemaRegistry = new MockSchemaRegistryClient()

    general public CustomKafkaAvroSerializer(SchemaRegistryClient consumer) 
        tremendous(new MockSchemaRegistryClient())

    public CustomKafkaAvroSerializer(SchemaRegistryClient shopper, Map props) 
        super(new MockSchemaRegistryClient(), props)

And we have almost everything to start off composing our exam.

@TestInstance(TestInstance.Lifecycle.For each_Course)
@ActiveProfiles("take a look at")
@EmbeddedKafka(partitions = 1, subjects = "sign up-ask for")
course ProducerControllerTest {

All we want to do is insert @EmbeddedKafka annotation with listed subject areas and partitions. Software Context will boot Kafka Broker with supplied configuration just like that. Maintain in mind that @TestInstance ought to be applied with particular thing to consider. Lifecycle.For each_Class will stay clear of creating the same objects/context for every single check system. Truly worth examining if exams are far too time-consuming.

Shopper consumerServiceTest
void Setup() 
DefaultKafkaConsumerFactory client = new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties()

consumerServiceTest = customer.createConsumer()
consumerServiceTest.subscribe(Collections.singletonList(Subject matter_Title))

Below we can declare the test client, primarily based on the Avro schema return form. All Kafka properties are previously furnished in the .yml file. That purchaser will be utilized as a look at if the producer essentially pushed a concept.

Here is the precise take a look at strategy:

void whenValidInput_therReturns200() throws Exception 
        RegisterRequestDto ask for = RegisterRequestDto.builder()


      ConsumerRecord consumedRegisterRequest =  KafkaTestUtils.getSingleRecord(consumerServiceTest, Subject_Name)

        RegisterRequest valueReceived = consumedRegisterRequest.benefit()

        assertEquals(12, valueReceived.getId())
        assertEquals("tempAddress", valueReceived.getAddress())

Very first of all, we use MockMvc to conduct an action on our endpoint. That endpoint utilizes ProducerService to push messages to Kafka. KafkaConsumer is made use of to confirm if the producer labored as envisioned. And that is it – we have a fully doing work exam with embedded Kafka.

Test Containers – Shopper Check

TestContainers are absolutely nothing else like unbiased docker pictures all set for becoming dockerized. The next check state of affairs will be improved by a MongoDB picture. Why not continue to keep our knowledge in the database appropriate after everything took place in Kafka move?

Dependencies are not much distinct than in the former case in point. The pursuing techniques are necessary for take a look at containers:

testImplementation 'org.testcontainers:junit-jupiter'
testImplementation 'org.testcontainers:kafka'
testImplementation 'org.testcontainers:mongodb'

set('testcontainersVersion', "1.17.1")

mavenBom "org.testcontainers:testcontainers-bom:$testcontainersVersion"

Let’s aim now on the Purchaser component. The take a look at circumstance will be uncomplicated – 1 purchaser assistance will be responsible for getting the Kafka concept and storing the parsed payload in the MongoDB assortment. All that we want to know about KafkaListeners, for now, is that annotation:

@KafkaListener(subject areas = "sign-up-ask for")

By the functionality of the annotation processor, KafkaListenerContainerFactory will be accountable to generate a listener on our system. From this second our approach will respond to any impending Kafka message with the described matter.

Avro serializer and deserializer configs are the exact as in the earlier exam.

Pertaining to TestContainer, we really should begin with the next annotations:

community class AbstractIntegrationTest {

During startup, all configured TestContainers modules will be activated. It suggests that we will get entry to the complete operating ecosystem of the selected resource. As illustration:

private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry

general public static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))

static MongoDBContainer mongoDBContainer = new MongoDBContainer("mongo:4.4.2").withExposedPorts(27017)

As a consequence of booting the test, we can hope two docker containers to begin with the provided configuration.

What is genuinely important for the mongo container – it gives us whole obtain to the databases working with just a uncomplicated relationship uri. With these types of a feature, we are able to take a look what is the present condition in our collections, even for the duration of debug method and prepared breakpoints.
Consider a appear also at the Ryuk container – it will work like overwatch and checks if our containers have began effectively.

And below is the final element of the configuration:

static void dataSourceProperties(DynamicPropertyRegistry registry) 
   registry.insert("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers)
   registry.add("spring.kafka.customer.bootstrap-servers", kafkaContainer::getBootstrapServers)
   registry.include("spring.kafka.producer.bootstrap-servers", kafkaContainer::getBootstrapServers)
   registry.insert("spring.details.mongodb.uri", mongoDBContainer::getReplicaSetUrl)

   mongoDBContainer.start off()

   mongoDBContainer.waitingFor(Hold out.forListeningPort()

public void beforeTest() 

           messageListenerContainer -> 
                       .waitForAssignment(messageListenerContainer, 1)


static void tearDown() 

DynamicPropertySource presents us the possibility to set all desired setting variables through the take a look at lifecycle. Strongly wanted for any config functions for TestContainers. Also, beforeTestClass kafkaListenerEndpointRegistry waits for each listener to get expected partitions through container startup.

And the final element of the Kafka take a look at containers journey – the major entire body of the exam:

community void containerStartsAndPublicPortIsAvailable() throws Exception 
   writeToTopic("sign up-ask for", RegisterRequest.newBuilder().setId(123).setAddress("dummyAddress").establish())

   //Wait for KafkaListener
   Assertions.assertEquals(1, taxiRepository.findAll().size())

personal KafkaProducer createProducer() 
   return new KafkaProducer<>(kafkaProperties.buildProducerProperties())

non-public void writeToTopic(String topicName, RegisterRequest... registerRequests) 

   check out (KafkaProducer producer = createProducer())
               .forEach(registerRequest -> 
                           ProducerRecord record = new ProducerRecord<>(topicName, registerRequest)
                           producer.send out(history)

The tailor made producer is accountable for composing our concept to KafkaBroker. Also, it is suggested to give some time for consumers to take care of messages properly. As we see, the message was not just eaten by the listener, but also stored in the MongoDB selection.


As we can see, current options for integration checks are fairly straightforward to employ and maintain in initiatives. There is no issue in trying to keep just device checks and counting on all traces lined as a signal of code/logic quality. Now the problem is, ought to we use an Embedded option or TestContainers? I propose to start with of all concentrating on the term “Embedded”. As a best integration check, we want to get an just about best duplicate of the creation ecosystem with all properties/capabilities involved. In-memory methods are good, but mostly, not sufficient for massive company initiatives. Surely, the benefit of Embedded services is the straightforward way to put into practice these types of checks and keep configuration, just when everything takes place in memory.
TestContainers at the first sight may search like overkill, but they give us the most vital attribute, which is a separate natural environment. We really don’t have to even count on existing docker photos – if we want we can use custom kinds. This is a big improvement for prospective test scenarios.
What about Jenkins? There is no rationale to be worried also to use TestContainers in Jenkins. I firmly recommend checking TestContainers documentation on how quickly we can established up the configuration for Jenkins agents.
To sum up – if there is no blocker or any unwanted issue for utilizing TestContainers, then don’t wait. It is generally great to retain all companies managed and secured with integration take a look at contracts.


Supply connection

Next Post

Which Retirement Plan is Best for your Small Business?

[ad_1] A 2021 poll found that 59% of Americans don’t believe they will ever save enough to retire. In fact, a 2020 PwC survey found that 25% of American adults have no retirement savings at all. Those who do aren’t saving nearly enough in their retirement plans. Due – Due […]