Blogg
Här finns tekniska artiklar, presentationer och nyheter om arkitektur och systemutveckling. Håll dig uppdaterad, följ oss på LinkedIn
Här finns tekniska artiklar, presentationer och nyheter om arkitektur och systemutveckling. Håll dig uppdaterad, följ oss på LinkedIn
Event Driven Architectures in general and Apache Kafka specifically have gained lots of attention lately. To realize the full benefits of an Event Driven Architecture, the event delegation mechanism must be inherently asynchronous. There may however be some specific use cases/flows where a Synchronous Request-Reply semantics is needed. This blog post shows how to realize Request Reply using Apache Kafka.
Apache Kafka is by design inherently asynchronous. Hence Request-Reply semantics is not natural in Apache Kafka. This challenge is however not new. The Request Reply Enterprise Integration Pattern provides a proven mechanism for synchronous message exchange over asynchonous channels:
The Return Address pattern complements Request Reply with a mechanism for the requestor to specify to which address the reply should be sent:
Recently, Spring Kafka 2.1.3 added support for the Request Reply pattern out-of-the box, and version 2.2 polished some of it’s rough edges. Let’s have a look at how that support works:
The well known Template
abstraction forms the basis for the client-side part of the Spring Request-Reply mechanism.
@Bean
public ReplyingKafkaTemplate<String, Request, Reply> replyKafkaTemplate(ProducerFactory<String, Request> pf, KafkaMessageListenerContainer<String, Reply> lc) {
return new ReplyingKafkaTemplate<>(pf, lc);
}
That’s fairly straight forward: We setup a ReplyingKafkaTemplate that sends Request messages with String keys, and receives Reply messages with String keys. The ReplyingKafkaTemplate however needs to be backed by a Request ProducerFactory, a ReplyConsumerFactory and a MessageListenerContainer, with corresponding consumer and producer configs. Hence the needed config is rather extensive:
@Value("${kafka.topic.car.reply}")
private String replyTopic;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
return props;
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, Request> requestProducerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public ConsumerFactory<String, Reply> replyConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
new JsonSerializer<Reply>());
}
@Bean
public KafkaMessageListenerContainer<String, Reply> replyListenerContainer() {
ContainerProperties containerProperties = new ContainerProperties(replyTopic);
return new KafkaMessageListenerContainer<>(replyConsumerFactory(), containerProperties);
}
With that in place, using the replyKafkaTemplate to send a synchronous reqeust and get a reply back looks like this:
@Value("${kafka.topic.car.request}")
private String requestTopic;
@Value("${kafka.topic.car.reply}")
private String replyTopic;
@Autowired
private ReplyingKafkaTemplate<String, Request, Reply> requestReplyKafkaTemplate;
...
RequestReply request = RequestReply.request(...);
// create producer record
ProducerRecord<String, Request> record = new ProducerRecord<String, Request>(requestTopic, request);
// set reply topic in header
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes()));
// post requst to kafka topic, and asynchronously get reply on the specified reply topic
RequestReplyFuture<String, Request, Reply> sendAndReceive = requestReplyKafkaTemplate.sendAndReceive(record);
sendAndReceive.addCallback(new ListenableFutureCallback<ConsumerRecord<String, Reply>>() {
@Override
public void onSuccess(ConsumerRecord<String, Reply> result) {
// get consumer record value
Reply reply = result.value();
System.out.println("Reply: " + reply.toString());
}
});
Lots of boiler plate code and low level api’s there as well, and that old ListenableFuture
API instead of the modern CompletableFuture. The requestReplyKafkaTemplate takes care of generating and setting a KafkaHeaders.CORRELATION_ID
header, but we have to set the KafkaHeaders.REPLY_TOPIC
header on the request explicitly. Note also that this same reply topic was redundantly wired into the replyListenerContainer above. Yuck. Not quite what I expected from a Spring abstraction.
On the server side, a regular KafkaListener listening on the request topic is decorated with an additional @SendTo
annotation, to provide the reply message. The object returned by the listener method is automatically wrapped into a reply message, the CORRELATION_ID
added, and the reply is posted on the topic specified by the REPLY_TOPIC
.
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
return props;
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ConsumerFactory<String, Request> requestConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
new JsonSerializer<Request>());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Request>> requestListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Request> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(requestConsumerFactory());
factory.setReplyTemplate(replyTemplate());
return factory;
}
@Bean
public ProducerFactory<String, Reply> replyProducerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Reply> replyTemplate() {
return new KafkaTemplate<>(replyProducerFactory());
}
Also quite some configuration needed, but configuration of the listener is easier:
@KafkaListener(topics = "${kafka.topic.car.request}", containerFactory = "requestListenerContainerFactory")
@SendTo()
public Reply receive(Request request) {
Reply reply = ...;
return reply;
}
It sort of works, as long as we don’t use multiple consumer instances. If we have multiple client instances, we must make sure that the reply is sent back to the correct client instance. The Spring Kafka documentation suggests that each consumer may use a unique topic, or that an additional KafkaHeaders.REPLY_PARTITION
header value is sent with the request, a four byte field containing a BIG-ENDIAN representation of the partition integer. Using separate topics for different clients is clearly not very flexible, hence we opt for setting the REPLY_PARTITION
explicitly. The client will then need to know which partition it is assigned to. The documentation suggests using explicit configuration to select a specific partition. Let’s add that to our example:
@Value("${kafka.topic.car.reply.partition}")
private int replyPartition;
...
@Bean
public KafkaMessageListenerContainer<String, RequestReply> replyListenerContainer() {
ContainerProperties containerProperties = new ContainerProperties(replyTopic);
TopicPartitionInitialOffset initialOffset = new TopicPartitionInitialOffset(replyTopic, replyPartition);
return new KafkaMessageListenerContainer<>(replyConsumerFactory(), containerProperties, initialOffset);
}
private static byte[] intToBytesBigEndian(final int data) {
return new byte[] {(byte) ((data >> 24) & 0xff), (byte) ((data >> 16) & 0xff),
(byte) ((data >> 8) & 0xff), (byte) ((data >> 0) & 0xff),};
}
...
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes()));
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_PARTITION, intToBytesBigEndian(replyPartition)));
RequestReplyFuture<String, RequestReply, RequestReply> sendAndReceive = requestReplyKafkaTemplate.sendAndReceive(record);
...
Not pretty, but it works. The configuration needed is extensive, and the APIs are kind of low level. The need for explicit partition configuration adds complexity if we need to dynamically scale number of clients. Clearly, we could do better.
Let’s start with encapsulating the Return Address pattern, passing along the reply topic and partition. The Reply topic needs to be wired into the RequestReplyTemplate, and hence shouldn’t be present in the API at all. When it comes to the reply partition, let’s do it the other way around: Retrieve which partition(s) the reply topic listener has been assigned, and pass that partition along automatically. This eliminates the need for the client to care about these headers.
While doing this, let’s also complete the API to resemble the standard KafkaTemplate (overloading the sendAndReceive() method with simplified parameters, and adding corresponding overloaded methods which use a configured default topic):
public class PartitionAwareReplyingKafkaTemplate<K, V, R> extends ReplyingKafkaTemplate<K, V, R> {
public PartitionAwareReplyingKafkaTemplate(ProducerFactory<K, V> producerFactory,
GenericMessageListenerContainer<K, R> replyContainer) {
super(producerFactory, replyContainer);
}
private TopicPartition getFirstAssignedReplyTopicPartition() {
if (getAssignedReplyTopicPartitions() != null &&
getAssignedReplyTopicPartitions().iterator().hasNext()) {
TopicPartition replyPartition = getAssignedReplyTopicPartitions().iterator().next();
if (this.logger.isDebugEnabled()) {
this.logger.debug("Using partition " + replyPartition.partition());
}
return replyPartition;
} else {
throw new KafkaException("Illegal state: No reply partition is assigned to this instance");
}
}
private static byte[] intToBytesBigEndian(final int data) {
return new byte[] {(byte) ((data >> 24) & 0xff), (byte) ((data >> 16) & 0xff),
(byte) ((data >> 8) & 0xff), (byte) ((data >> 0) & 0xff),};
}
public RequestReplyFuture<K, V, R> sendAndReceiveDefault(@Nullable V data) {
return sendAndReceive(getDefaultTopic(), data);
}
public RequestReplyFuture<K, V, R> sendAndReceiveDefault(K key, @Nullable V data) {
return sendAndReceive(getDefaultTopic(), key, data);
}
...
public RequestReplyFuture<K, V, R> sendAndReceive(String topic, @Nullable V data) {
ProducerRecord<K, V> record = new ProducerRecord<>(topic, data);
return doSendAndReceive(record);
}
public RequestReplyFuture<K, V, R> sendAndReceive(String topic, K key, @Nullable V data) {
ProducerRecord<K, V> record = new ProducerRecord<>(topic, key, data);
return doSendAndReceive(record);
}
...
@Override
public RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record) {
return doSendAndReceive(record);
}
protected RequestReplyFuture<K, V, R> doSendAndReceive(ProducerRecord<K, V> record) {
TopicPartition replyPartition = getFirstAssignedReplyTopicPartition();
record.headers()
.add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, replyPartition.topic().getBytes()))
.add(new RecordHeader(KafkaHeaders.REPLY_PARTITION,
intToBytesBigEndian(replyPartition.partition())));
return super.sendAndReceive(record);
}
}
Next step: Let’s adapt the ListenableFuture to the more modern CompletableFuture.
public class CompletableFutureReplyingKafkaTemplate<K, V, R> extends PartitionAwareReplyingKafkaTemplate<K, V, R> {
public CompletableFutureReplyingKafkaTemplate(ProducerFactory<K, V> producerFactory,
GenericMessageListenerContainer<K, R> replyContainer) {
super(producerFactory, replyContainer);
}
public CompletableFuture<R> requestReplyDefault(V value) {
return adapt(sendAndReceiveDefault(value));
}
public CompletableFuture<R> requestReplyDefault(K key, V value) {
return adapt(sendAndReceiveDefault(key, value));
}
...
public CompletableFuture<R> requestReply(String topic, V value) {
return adapt(sendAndReceive(topic, value));
}
public CompletableFuture<R> requestReply(String topic, K key, V value) {
return adapt(sendAndReceive(topic, key, value));
}
...
private CompletableFuture<R> adapt(RequestReplyFuture<K, V, R> requestReplyFuture) {
CompletableFuture<R> completableResult = new CompletableFuture<R>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
boolean result = requestReplyFuture.cancel(mayInterruptIfRunning);
super.cancel(mayInterruptIfRunning);
return result;
}
};
// Add callback to the request sending result
requestReplyFuture.getSendFuture().addCallback(new ListenableFutureCallback<SendResult<K, V>>() {
@Override
public void onSuccess(SendResult<K, V> sendResult) {
// NOOP
}
@Override
public void onFailure(Throwable t) {
completableResult.completeExceptionally(t);
}
});
// Add callback to the reply
requestReplyFuture.addCallback(new ListenableFutureCallback<ConsumerRecord<K, R>>() {
@Override
public void onSuccess(ConsumerRecord<K, R> result) {
completableResult.complete(result.value());
}
@Override
public void onFailure(Throwable t) {
completableResult.completeExceptionally(t);
}
});
return completableResult;
}
}
Pack that up in a utility library, and we now have an API that is much more in line with the general Convention over Configuration design philosophy of Spring. This is the resulting client code:
@Autowired
private CompletableFutureReplyingKafkaTemplate<String,Request,Reply> requestReplyKafkaTemplate;
...
requestReplyKafkaTemplate.requestReply(request).thenAccept(reply ->
System.out.println("Reply: " + reply.toString())
);
To summarize, Spring for Kafka 2.2 provides a fully functional implementation of the Request-Reply pattern over Apache Kafka, but the API still have some rough edges. In this blog post, we have seen that some additional abstractions and API adaptations can give a more consistent, high-level API.
Caveat 1: One of the principal benefits of an Event Driven Architecture is the decoupling of event producers and consumers, allowing for much more flexible and evolvable systems. Relying on a synchronous Request-Reply semantics is the exact opposite, where the requestor and replyer are tightly coupled. Hence it should be used only when needed.
Caveat 2: If synchronous Request-Reply is required, an HTTP-based protocol is much simpler and more efficient than using an asynchronous channel like Apache Kafka.
Still, there may be scenarios when synchronous Request-Reply over Kafka makes sense. Choose wisely the best tool for the job.
A fully working example can be found at github.com/callistaenterprise/blog-synchronous-kafka.