Kafka Message Testing – DZone – Uplaza

This text affords an strategy to writing integration assessments for Kafka-based purposes that focuses on interplay specification, making assessments extra readable and simpler to keep up. The proposed methodology not solely enhances testing effectivity but additionally contributes to a greater understanding of the combination processes throughout the software.

The article builds on three concepts offered in related articles: writing assessments with a transparent separation of Organize-Act-Assert levels, isolation in Kafka assessments, and utilizing instruments to reinforce take a look at visibility. I like to recommend reviewing these earlier than delving into the fabric of this text.

Demonstration State of affairs

Let’s take a Telegram bot that forwards requests to the OpenAI API and returns the outcome to the consumer for example. If the request to OpenAI violates the system’s safety guidelines, the consumer will probably be notified. Moreover, a message will probably be despatched to Kafka for the behavioral management system in order that the supervisor can contact the consumer, clarify that their request was too delicate even for our bot, and ask them to overview their preferences.

The interplay contracts with companies are described in a simplified method to emphasise the core logic. Beneath is a sequence diagram demonstrating the appliance’s structure. I perceive that the design could increase questions from a system structure perspective, however please strategy it with understanding — the primary purpose right here is to show the strategy to writing assessments.

Message Seize

The primary testing instrument would be the message seize object — RecordCaptor. Its operation is kind of just like the outgoing request seize object — RequestCaptor, which might be examine within the article Ordering Chaos: Arranging HTTP Request Testing in Spring (linked earlier).

Message seize will probably be carried out by way of a regular Kafka shopper. The checklist of matters should be specified explicitly through a configuration parameter.

@KafkaListener(id = "recordCaptor", matters = "#{'${test.record-captor.topics}'.split(',')}", groupId = "test")
public void eventCaptorListener(ConsumerRecord

The RecordCaptor object accumulates info from captured messages.

Utilizing this strategy requires adhering to isolation in Kafka assessments. Ready for offset commit affirmation earlier than verifying take a look at outcomes ought to be carried out utilizing the KafkaSupport#waitForPartitionOffsetCommit methodology.

Take a look at Instance

Beneath is the take a look at code for the described state of affairs.

def "User Message Processing with OpenAI"() {
    setup:
    KafkaSupport.waitForPartitionAssignment(applicationContext)                           // 1
    and:                                                                                  // 2
    def openaiRequestCaptor = restExpectation.openai.completions(withBadRequest().contentType(APPLICATION_JSON)
            .physique("""{
                "error": {
                "code": "content_policy_violation",
                "message": "Your request was rejected as a result of our safety system."
                }
            }"""))
    def telegramRequestCaptor = restExpectation.telegram.sendMessage(withSuccess('{}', APPLICATION_JSON))
    when:
    mockMvc.carry out(publish("/telegram/webhook")                                             // 3
            .contentType(APPLICATION_JSON_VALUE)
            .content material("""{
                "message": {
                "from": {
                    "id": 10000000
                },
                "chat": {
                    "id": 20000000
                },
                "text": "Hello!"
                }
            }""".toString())
            .settle for(APPLICATION_JSON_VALUE))
            .andExpect(standing().isOk())
    KafkaSupport.waitForPartitionOffsetCommit(applicationContext)                         // 4
    then:
    openaiRequestCaptor.instances == 1                                                        // 5
    JSONAssert.assertEquals("""{
        "content": "Hello!"
    }""", openaiRequestCaptor.bodyString, false)
    and:
    telegramRequestCaptor.instances == 1
    JSONAssert.assertEquals("""{
        "chatId": "20000000",
        "text": "Your request was rejected as a result of our safety system."
    }""", telegramRequestCaptor.bodyString, false)
    when:                                                                                 // 6
    def message = recordCaptor.getRecords("topicC", "20000000").final
    then:
    message != null
    JSONAssert.assertEquals("""{
        "webhookMessage": {
        "message": {
            "chat": {
            "id": "20000000"
            },
            "text": "Hello!"
        }
        },
        "error": {
            "code": "content_policy_violation",
            "message": "Your request was rejected as a result of our safety system."
        }
    }""", message.worth as String, false)
}

Key steps:

  1. Anticipate partition task earlier than beginning the take a look at state of affairs.
  2. Mock requests to OpenAI and Telegram.
  3. Execute the take a look at state of affairs.
  4. Anticipate offset affirmation.
  5. Confirm requests to OpenAI and Telegram.
  6. Verify the message in Kafka.

Utilizing JSONAssert.assertEquals ensures consistency in information illustration throughout Kafka messages, logs, and assessments. This simplifies testing by offering flexibility as compared and accuracy in error prognosis.

The article offers an instance with JSON message format; different codecs should not coated, however the described strategy doesn’t impose format restrictions.

How To Discover Your Message in RecordCaptor

Messages in RecordCaptor are organized by matter identify and key. Within the offered take a look at, the important thing used is the Kafka message key. When sending, we explicitly specify it:

sendMessage("topicC", chatId, ...);
...
non-public void sendMessage(String matter, String key, Object payload) {
    Message message = MessageBuilder
            .withPayload(objectMapper.writeValueAsString(payload))
            .setHeader(KafkaHeaders.TOPIC, matter)
            .setHeader(KafkaHeaders.KEY, key)                          

To go looking by message key inside a subject:

when:                                                                                
def message = recordCaptor.getRecords("topicC", "20000000").final       

If this feature is just not appropriate, it’s essential describe your individual indexes primarily based on message parameters for setting up the search. An instance might be seen within the assessments PolicyViolationTestsCustomIndex.groovy.

Connecting RecordCaptor

The code for connecting RecordCaptor appears to be like as follows:

@TestConfiguration(proxyBeanMethods = false)
public class RecordCaptorConfiguration {
    @Bean
    RecordCaptor recordCaptor() {
        return new RecordCaptor();
    }

    @Bean
    RecordCaptorConsumer recordCaptorConsumer(RecordCaptor recordCaptor) {
        return new RecordCaptorConsumer(recordCaptor, new RecordSnapshotMapper());
    }
}

OffsetSnapshotFrame

Expertise has proven that working with Kafka-based purposes requires instruments to facilitate understanding the state of customers and message consumption standing. For this job, you’ll be able to examine matter offsets and shopper teams within the offset affirmation ready operation and log discrepancies, as illustrated within the picture:

The code for OffsetComparisonFrame is offered for overview.

Conclusion

Testing messages in Kafka utilizing the proposed strategy not solely simplifies take a look at writing but additionally makes it extra structured and comprehensible. Using instruments like RecordCaptor, in addition to adhering to isolation rules and clear separation of take a look at levels, ensures excessive accuracy and effectivity.

Thanks for studying the article, and good luck in your efforts to put in writing efficient and clear assessments!

Share This Article
Leave a comment

Leave a Reply

Your email address will not be published. Required fields are marked *

Exit mobile version