In this lecture I will introduce you to transactions in Apache Kafka.
Why would you use transactions in Apache Kafka? To achieve a couple of very important goals.
Let's have a look at the following diagram. A typical Kafka application follows the pattern Consumer -> Process -> Producer. We will have a Kafka topic from which a Kafka consumer will consume messages. This topic can be called transfer, for example, and the microservice application will be called the transfers microservice. The goal of this imaginary application will be to transfer a certain amount of money from one account to another.
When a new event gets recorded to the transfer topic, the transfers microservice will consume it and start processing it. It is common for the same microservice application to act as a consumer and a producer at the same time. So while processing this Kafka message, the transfers microservice might produce a new message to another topic called withdrawal. To read messages from the withdrawal topic, you will have another microservice, the withdrawal microservice, which will withdraw the requested amount of money from the user's account. So your transfers microservice sends a new Kafka message to the withdrawal topic and continues to the next step. But then something happens and the application crashes. It did not finish processing this message successfully, so to Kafka the message in the transfer topic is not considered successfully consumed, which means it will be delivered again to another active instance of this microservice. When the message is consumed again, the producer API in this microservice will publish a new message to the withdrawal topic again. This time the application does not crash and continues its execution: it produces another message to a different topic called deposits, and the deposits microservice will be the one to process it.
We requested to withdraw money from the user's account twice, but deposited only once. We want to avoid situations like this, and Kafka transactions can help us with this. So let's see how this application works if we use transactions. The transfer topic receives a new message, the transfers microservice consumes it and starts processing it, and because transactions are enabled in this microservice, the following operations will be executed within one single transaction. The Kafka producer publishes a new message to the withdrawal topic, but because the transaction has not completed yet, the withdrawal microservice will not see this message. It will not see it because we are going to configure both this microservice and the deposits microservice to consume only those messages that have been successfully committed.
Until the transaction completes, the message in the Kafka topic is marked as uncommitted, and uncommitted messages are not visible to Kafka consumers that are configured to read committed messages only. If at this moment the transfers microservice crashes, or if an exception takes place, the transaction will be aborted and the message will remain in the topic marked as uncommitted. So technically the message is still in the topic, but consumers that are configured to read committed messages only will not see it. If everything goes well and no error takes place, the transfers microservice will publish a new message to the deposits topic.
This message will also initially be marked as uncommitted until the transaction completes. Both of these messages remain invisible to the consumer microservices, but if our method completes successfully, the transaction completes as well, and Kafka marks both messages as committed. Once these messages are committed, they become visible to the consumer microservices, which can then consume them. As you can see, transactions help us write to multiple topics atomically and achieve all-or-nothing behavior: if we need to write to multiple topics, either all writes succeed or none of them do.
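The same all-or-nothing behavior is exposed by the plain Kafka producer API, which Spring uses under the hood. Here is a minimal sketch of it; the broker address, transactional id, and string payloads are assumptions for illustration, and running it requires a reachable Kafka broker:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalProducerSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // A transactional.id is required to use the transactional API
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transfer-service-1");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            try {
                producer.beginTransaction();
                producer.send(new ProducerRecord<>("withdraw-money-topic", "withdrawal event"));
                producer.send(new ProducerRecord<>("deposit-money-topic", "deposit event"));
                // Both messages become visible to read_committed consumers together
                producer.commitTransaction();
            } catch (Exception ex) {
                // Neither message becomes visible to read_committed consumers
                producer.abortTransaction();
                throw ex;
            }
        }
    }
}
```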
Now, your transfers microservice can also have Java code that sends an HTTP request to another remote service. This HTTP request will not be part of the same transaction, and if the Kafka transaction is aborted, the HTTP request will not be rolled back. To handle situations like this, you will need to implement compensating transactions. The idea behind compensating transactions is to be able to undo operations that span multiple remote microservices. But in this section of the course I will not be covering compensating transactions; I will focus on Kafka transactions only.
It is similar with database transactions: if your transfers microservice needs to write to a database, then technically this will be a separate transaction, because Apache Kafka does not manage database transactions. However, even though Kafka does not manage database transactions, the Spring Framework does provide good integration for managing Kafka and database transactions together, and this integration will help us roll back both the database transaction and the Kafka transaction.
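As a preview of that integration: when the KafkaTemplate is backed by a transactional producer factory, a send performed inside an active database transaction joins it via Spring's transaction synchronization, so a database rollback also aborts the Kafka transaction. A sketch, assuming a JPA setup; TransferRepository and TransferEntity are hypothetical names:

```java
// Sketch only: assumes spring-kafka with a transactional ProducerFactory
// and a JPA transaction manager named "transactionManager".
@Service
public class TransferPersistenceService {

    private final TransferRepository repository;               // hypothetical JPA repository
    private final KafkaTemplate<String, Object> kafkaTemplate;

    public TransferPersistenceService(TransferRepository repository,
                                      KafkaTemplate<String, Object> kafkaTemplate) {
        this.repository = repository;
        this.kafkaTemplate = kafkaTemplate;
    }

    // The method runs in the JPA transaction; the Kafka send synchronizes with it,
    // so if repository.save() or a later step fails, the Kafka message is not committed.
    @Transactional("transactionManager")
    public void saveAndPublish(TransferEntity entity) {
        repository.save(entity);
        kafkaTemplate.send("transfer-events", entity);
    }
}
```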
I have written and uploaded the required microservices to a GitHub repository. Please clone or download the source code and import it into your IDE.
To enable Kafka transactions using application.properties, first open the Kafka producer, which is the TransferService, and add the properties below to its application.properties file.
spring.kafka.producer.transaction-id-prefix=transfer-service-${random.value}-
logging.level.org.springframework.kafka.transaction=TRACE
logging.level.org.springframework.transaction=TRACE
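On the consumer side, the withdrawal and deposit microservices must read committed messages only. Assuming they are also Spring Boot applications, this can be done with the following property in their application.properties files (the Kafka consumer's default isolation level is read_uncommitted):

spring.kafka.consumer.isolation-level=read_committed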
To enable Kafka transactions using a @Bean method instead, open the Kafka producer (TransferService) and add the code below to the KafkaConfig class. For the full code you can always refer to the repository attached above.
// Member variable declaration in KafkaConfig
@Value("${spring.kafka.producer.transaction-id-prefix}")
private String transactionalIdPrefix;

// Add to the map inside the method that builds the producer configuration properties
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalIdPrefix);

// Update the existing kafkaTemplate() method to take the ProducerFactory as an argument
@Bean
KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactory) {
    return new KafkaTemplate<>(producerFactory);
}

// Declare a new bean method for the Kafka transaction manager
@Bean
KafkaTransactionManager<String, Object> kafkaTransactionManager(ProducerFactory<String, Object> producerFactory) {
    return new KafkaTransactionManager<>(producerFactory);
}
Now we will learn how the Kafka producer sends both Kafka messages within a single transaction. Let's see the example given below. In TransferService we have implemented the transfer method, which sends two different requests to two different topics: one is the withdrawal event request and the other is the deposit event request. We need to change this method so that both messages are sent within a single transaction, so that either all messages are delivered, or the transaction is aborted and none of them are delivered to the consumer microservices. To do that, we can annotate this method with the @Transactional annotation.
If we annotate the transfer method with the @Transactional annotation, then all operations in this method that support transactions will be treated as a single unit of work, meaning they should either all succeed or none of them should succeed. If the method completes successfully, the transaction commits; otherwise, the transaction rolls back.
@Transactional(value = "kafkaTransactionManager")
@Override
public boolean transfer(TransferRestModel transferRestModel) {
    WithdrawalRequestedEvent withdrawalEvent = new WithdrawalRequestedEvent(transferRestModel.getSenderId(),
            transferRestModel.getRecepientId(), transferRestModel.getAmount());
    DepositRequestedEvent depositEvent = new DepositRequestedEvent(transferRestModel.getSenderId(),
            transferRestModel.getRecepientId(), transferRestModel.getAmount());
    try {
        kafkaTemplate.send(environment.getProperty("withdraw-money-topic", "withdraw-money-topic"),
                withdrawalEvent);
        LOGGER.info("Sent event to withdrawal topic.");

        // Business logic that causes an error
        callRemoteServce();

        kafkaTemplate.send(environment.getProperty("deposit-money-topic", "deposit-money-topic"), depositEvent);
        LOGGER.info("Sent event to deposit topic.");
    } catch (Exception ex) {
        LOGGER.error(ex.getMessage(), ex);
        throw new TransferServiceException(ex);
    }
    return true;
}
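As an alternative to the @Transactional annotation, spring-kafka's KafkaTemplate also offers executeInTransaction, which runs a callback inside a local Kafka transaction: it commits when the callback returns normally and aborts if it throws. A sketch, reusing the event types and topic names from the method above:

```java
public boolean transferInLocalTransaction(WithdrawalRequestedEvent withdrawalEvent,
                                          DepositRequestedEvent depositEvent) {
    // The callback's sends are committed together, or aborted together on an exception
    return kafkaTemplate.executeInTransaction(operations -> {
        operations.send("withdraw-money-topic", withdrawalEvent);
        operations.send("deposit-money-topic", depositEvent);
        return true;
    });
}
```

Note that executeInTransaction always starts its own transaction on a producer from the factory, independent of any transaction started by @Transactional, so it is best suited to code that runs outside an existing transactional context.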