Exceptions & Retries

Introduction to exception handling and retries in a Kafka consumer

In the previous section of the course, we talked about errors that can take place when the Kafka consumer tries to deserialize a message from the topic. To recover from those errors, we configured our application to handle the error and send the message that caused it to a dead letter topic. What if the email notification microservice could not send an email message due to some other error? It could be that the email server was restarting and was temporarily unavailable. Notice that in this case the email notification microservice, our Kafka consumer, did consume the message successfully.

The message format was valid and the message was successfully consumed from the Kafka topic. Our microservice was simply not able to process it because of some other error. In this case, we need to identify whether the error is recoverable. In other words, will it help if we retry the operation? If the error took place because the email server was restarting, the operation is retryable and it makes sense to retry it a few seconds later. But if the error is not retryable, for example a NullPointerException, then retrying the operation again and again will not make sense. In this case, we will publish the message to a dead letter topic.
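
The distinction can be sketched in plain Java. The `ErrorClassifier` class and its choice of exception types are illustrative assumptions, not part of the course code: it treats connection and timeout failures as retryable and everything else, including `NullPointerException`, as not retryable.

```java
import java.net.ConnectException;
import java.net.SocketTimeoutException;

// Hypothetical helper (not part of the course code): decides whether
// retrying a failed operation has a chance of succeeding.
final class ErrorClassifier {

    private ErrorClassifier() {
    }

    static boolean isRetryable(Throwable error) {
        // Walk the cause chain, because low-level I/O errors are usually wrapped.
        for (Throwable t = error; t != null; t = t.getCause()) {
            if (t instanceof ConnectException || t instanceof SocketTimeoutException) {
                return true; // transient: the remote server may be back in a few seconds
            }
        }
        return false; // e.g. NullPointerException: a retry would fail the same way
    }
}
```

For example, `ErrorClassifier.isRetryable(new RuntimeException(new ConnectException("mail server down")))` returns `true`, while passing a `NullPointerException` returns `false`.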

Create Retryable and NotRetryable exception classes

Inside the EmailNotification microservice (the consumer), we will create the Retryable and NotRetryable exception classes, as shown below.

RetryableException


    package com.navabitsolutions.email.notification.ms.exceptions;

    public class RetryableException extends RuntimeException {
        public RetryableException(String message) {
            super(message);
        }
        public RetryableException(Throwable throwable) {
            super(throwable);
        }
    }

NotRetryableException


    package com.navabitsolutions.email.notification.ms.exceptions;

    public class NotRetryableException extends Exception {
        public NotRetryableException(String message) {
            super(message);
        }
        public NotRetryableException(Throwable throwable) {
            super(throwable);
        }
    }
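
A small usage sketch of the two classes follows. The nested copies of the exception classes and the `ExceptionWrappingDemo` name exist only to keep the example self-contained, and the `IOException` is an assumed stand-in for whatever low-level failure the handler catches. It shows that the unchecked `RetryableException` needs no `throws` clause, while the checked `NotRetryableException` must be declared by the method that throws it.

```java
import java.io.IOException;

// Hypothetical demo class; not part of the course code.
final class ExceptionWrappingDemo {

    // Local copies of the two exception classes above, trimmed to one constructor each.
    static class RetryableException extends RuntimeException {
        RetryableException(Throwable cause) {
            super(cause);
        }
    }

    static class NotRetryableException extends Exception {
        NotRetryableException(String message) {
            super(message);
        }
    }

    // Unchecked exception: no throws clause is required.
    static void failTemporarily() {
        throw new RetryableException(new IOException("email server is restarting"));
    }

    // Checked exception: the method has to declare it.
    static void failPermanently() throws NotRetryableException {
        throw new NotRetryableException("message can never be processed");
    }

    public static void main(String[] args) {
        try {
            failTemporarily();
        } catch (RetryableException ex) {
            // The original failure stays reachable through getCause().
            System.out.println("Retryable, cause: " + ex.getCause().getMessage());
        }
        try {
            failPermanently();
        } catch (NotRetryableException ex) {
            System.out.println("Not retryable: " + ex.getMessage());
        }
    }
}
```
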

Configure DefaultErrorHandler with a list of NotRetryable exceptions

Now we will learn how to register the NotRetryable exception with the DefaultErrorHandler. Go to the consumer microservice, which is the EmailNotification microservice, and open the KafkaConsumerConfiguration class. Its kafkaListenerContainerFactory method already contains the code that handles errors and sends the bad message to the dead letter topic.

We can use the addNotRetryableExceptions method to register one or more user-defined exception classes. When the listener throws one of them, the error handler does not retry, and the failed message is sent straight to the dead letter topic.


    DefaultErrorHandler errorHandler = new DefaultErrorHandler(new DeadLetterPublishingRecoverer(kafkaTemplate));
    errorHandler.addNotRetryableExceptions(NotRetryableException.class);

Register RetryableException and define the wait time interval

RetryableException can be registered with the same DefaultErrorHandler in the KafkaConsumerConfiguration class by calling addRetryableExceptions, which explicitly marks it as an exception that is worth retrying. Along with that, we can define the wait time between attempts using the FixedBackOff class, as shown in the code snippet below. Here 5000 is the wait interval in milliseconds (5 seconds) and 3 is the maximum number of times the message is retried after the first failed delivery.


    DefaultErrorHandler errorHandler = new DefaultErrorHandler(new DeadLetterPublishingRecoverer(kafkaTemplate),
                new FixedBackOff(5000, 3));
    errorHandler.addNotRetryableExceptions(NotRetryableException.class);
    errorHandler.addRetryableExceptions(RetryableException.class);
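
To make the two FixedBackOff arguments concrete, here is a plain-Java simulation of the retry loop. It is a sketch under assumptions, not Spring Kafka's implementation: `FixedBackOffSimulation`, `process`, and the nested `RetryableException` are hypothetical names, and returning `false` stands in for handing the record to the dead letter recoverer.

```java
// Plain-Java simulation (no Spring classes) of retrying with a fixed back-off.
final class FixedBackOffSimulation {

    // Local stand-in for the RetryableException defined earlier.
    static class RetryableException extends RuntimeException {
        RetryableException(String message) {
            super(message);
        }
    }

    /**
     * Runs the task once, then retries it up to maxRetries times, sleeping
     * intervalMillis between attempts. Only RetryableException triggers a retry.
     * Returns true on success, false when the record would go to the dead letter topic.
     */
    static boolean process(Runnable task, long intervalMillis, long maxRetries) {
        for (long attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                task.run();
                return true;
            } catch (RetryableException ex) {
                if (attempt == maxRetries) {
                    break;             // retries exhausted
                }
                sleep(intervalMillis); // fixed wait before the next attempt
            } catch (RuntimeException ex) {
                break;                 // not retryable: give up immediately
            }
        }
        return false;
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        }
    }
}
```

Called as `process(task, 5000, 3)` with a task that always throws the retryable exception, this runs the task four times (one delivery plus three retries) and spends about 15 seconds waiting before giving up. Any other runtime exception ends processing after the first attempt.
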

Throwing a Retryable Exception

Now we will learn how a retryable exception works, and this time we will work on a slightly more complex example. I want to introduce a problem for which retrying an operation makes sense. Let's assume that, after receiving a product created event, the code in our Kafka handler method needs to send an HTTP request to a different remote microservice. If the HTTP request to that microservice is not successful, then retrying the operation a few seconds later makes sense. This is because the external microservice may have failed to respond only because it was temporarily unavailable: there may have been a temporary network problem, it may have been too busy servicing other HTTP requests, or it may have been restarting. A few seconds later it will become available again.

So we will send the HTTP request to the external microservice. If we receive an exception in response, we will catch that exception and throw a RetryableException instead. Because the exception we throw is retryable, the message will be delivered again and the Kafka handler method will be invoked again.

In our Spring Boot application, we can use RestTemplate to call the remote service. Create a bean in the EmailNotification microservice as shown below.


    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }

Once the bean is created, we need to inject the RestTemplate into the product created event handler and implement the code that sends the HTTP request to the external microservice. Implement the changes below.


    package com.navabitsolutions.email.notification.ms.handler;

    import com.kafka.ws.core.ProductCreatedEvent;
    import com.navabitsolutions.email.notification.ms.exceptions.NotRetryableException;
    import com.navabitsolutions.email.notification.ms.exceptions.RetryableException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.http.HttpMethod;
    import org.springframework.http.HttpStatus;
    import org.springframework.http.ResponseEntity;
    import org.springframework.kafka.annotation.KafkaHandler;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    import org.springframework.web.client.HttpServerErrorException;
    import org.springframework.web.client.ResourceAccessException;
    import org.springframework.web.client.RestTemplate;

    @Component
    @KafkaListener(topics="product-created-events-topic")
    public class ProductCreatedEventHandler {

        private final Logger LOGGER = LoggerFactory.getLogger(this.getClass());

        @Autowired
        private RestTemplate restTemplate;

        @KafkaHandler
        public void handle(ProductCreatedEvent productCreatedEvent) throws NotRetryableException {
        //  if(true) throw new NotRetryableException("An error took place. No need to consume the message again");
            LOGGER.info("Received a new event {}", productCreatedEvent.getTitle());

            String requestUrl = "http://localhost:8082";
            try {
                ResponseEntity<String> response = restTemplate.exchange(requestUrl, HttpMethod.GET, null, String.class);
                if (response.getStatusCode().value() == HttpStatus.OK.value()) {
                    LOGGER.info("Received response from the remote service {}", response.getBody());
                }
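            } catch (ResourceAccessException ex) {
                // I/O failure (for example connection refused or timeout): the service may recover, so retry
                LOGGER.error(ex.getMessage());
                throw new RetryableException(ex);
            } catch (HttpServerErrorException ex) {
                // The service answered with a 5xx status: treated as not retryable here
                LOGGER.error(ex.getMessage());
                throw new NotRetryableException(ex);
            } catch (Exception ex) {
                // Any other failure is unexpected: do not retry
                LOGGER.error(ex.getMessage());
                throw new NotRetryableException(ex);
            }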
        }
    }

External microservice

I have created a simple external microservice called mock-service, which returns 200 and 500 HTTP status responses. I have uploaded the mock-service code to a GitHub repository; you can download it from the following link: mock-service

The updated full source code is available in the GitHub repository Kafka Consumer: Email Notification Microservice