
Synchronous Request-Reply using Apache Kafka

// Björn Beskow

Event Driven Architectures in general, and Apache Kafka specifically, have gained lots of attention lately. To realize the full benefits of an Event Driven Architecture, the event delegation mechanism must be inherently asynchronous. There may however be specific use cases or flows where synchronous Request-Reply semantics is needed. This blog post shows how to realize Request-Reply using Apache Kafka.

Apache Kafka is by design inherently asynchronous. Hence Request-Reply semantics is not natural in Apache Kafka. This challenge is however not new. The Request-Reply Enterprise Integration Pattern provides a proven mechanism for synchronous message exchange over asynchronous channels:

[Figure: The Request-Reply pattern]

The Return Address pattern complements Request-Reply with a mechanism for the requestor to specify to which address the reply should be sent:

[Figure: The Return Address pattern]
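
In Kafka terms, both the correlation identifier and the return address are typically carried as record headers on the request. A minimal sketch using the plain Kafka producer API (the header keys, topic names and producer variable here are illustrative only; Spring Kafka uses its own KafkaHeaders constants, as we will see below):

  // Sketch only: assumes an already configured KafkaProducer<String, String> named producer
  ProducerRecord<String, String> record = new ProducerRecord<>("requests", "some payload");
  // The correlation id lets the requestor match an incoming reply to its pending request ...
  record.headers().add(new RecordHeader("correlation-id",
      UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8)));
  // ... and the return address tells the replier where to post the reply
  record.headers().add(new RecordHeader("reply-to", "replies".getBytes(StandardCharsets.UTF_8)));
  producer.send(record);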

Recently, Spring Kafka added support for the Request-Reply pattern out of the box. Let’s have a look at how that support works:

Client Side: ReplyingKafkaTemplate

The well-known Template abstraction forms the basis for the client-side part of the Spring Request-Reply mechanism.

  @Bean
  public ReplyingKafkaTemplate<String, RequestReply, RequestReply> replyKafkaTemplate(
      ProducerFactory<String, RequestReply> pf,
      KafkaMessageListenerContainer<String, RequestReply> lc) {
    return new ReplyingKafkaTemplate<>(pf, lc);
  }

That’s fairly straightforward: we set up a ReplyingKafkaTemplate that sends RequestReply request messages with String keys, and receives RequestReply reply messages with String keys. The ReplyingKafkaTemplate however needs to be backed by a request ProducerFactory, a reply ConsumerFactory and a KafkaMessageListenerContainer, with corresponding consumer and producer configs. Hence the needed configuration is rather extensive:

  @Value("${kafka.topic.car.reply}")
  private String replyTopic;

  @Bean
  public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

    return props;
  }

  @Bean
  public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return props;
  }

  @Bean
  public ProducerFactory<String, RequestReply> requestProducerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
  }

  @Bean
  public ConsumerFactory<String, RequestReply> replyConsumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
        new JsonDeserializer<>(RequestReply.class));
  }

  @Bean
  public KafkaMessageListenerContainer<String, RequestReply> replyListenerContainer() {
    ContainerProperties containerProperties = new ContainerProperties(replyTopic);
    return new KafkaMessageListenerContainer<>(replyConsumerFactory(), containerProperties);
  }

With that in place, using the replyKafkaTemplate to send a synchronous request and get a reply back looks like this:

  @Value("${kafka.topic.car.request}")
  private String requestTopic;

  @Value("${kafka.topic.car.reply}")
  private String replyTopic;

  @Autowired
  private ReplyingKafkaTemplate<String, RequestReply, RequestReply> requestReplyKafkaTemplate;

...
  RequestReply request = RequestReply.request(...);
  // create producer record
  ProducerRecord<String, RequestReply> record = new ProducerRecord<String, RequestReply>(requestTopic, request);
  // set reply topic in header
  record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, replyTopic.getBytes()));
  // post in kafka topic
  RequestReplyFuture<String, RequestReply, RequestReply> sendAndReceive = requestReplyKafkaTemplate.sendAndReceive(record);
  sendAndReceive.addCallback(new ListenableFutureCallback<ConsumerRecord<String, RequestReply>>() {
      @Override
      public void onSuccess(ConsumerRecord<String, RequestReply> result) {
        // extract the reply payload from the consumer record
        RequestReply requestReply = result.value();
        System.out.println("Reply: " + requestReply.getReply().toString());
      }
      @Override
      public void onFailure(Throwable t) {
        t.printStackTrace();
      }
  });
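
If a plain blocking call is preferred over a callback, the returned future can instead be waited on with a timeout (a sketch; the 10-second bound is an arbitrary choice, and the checked exceptions are left unhandled here):

  // Block the calling thread until the reply arrives, or fail with a TimeoutException after 10 seconds
  ConsumerRecord<String, RequestReply> response = sendAndReceive.get(10, TimeUnit.SECONDS);
  System.out.println("Reply: " + response.value().getReply().toString());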

Lots of boilerplate code and low-level APIs there as well, and that old ListenableFuture API instead of the modern CompletableFuture. The requestReplyKafkaTemplate takes care of generating and setting a KafkaHeaders.CORRELATION_ID header, but we have to set the KafkaHeaders.REPLY_TOPIC header on the request explicitly. Note also that this same reply topic was redundantly wired into the replyListenerContainer above. Yuck. Not quite what I expected from a Spring abstraction.

Server Side: @SendTo

On the server side, a regular KafkaListener listening on the request topic is decorated with an additional @SendTo annotation to provide the reply message. The object returned by the listener method is automatically wrapped into a reply message, the CORRELATION_ID header is copied over from the request, and the reply is posted on the topic specified by the REPLY_TOPIC header.

  @Value("${kafka.bootstrap-servers}")  // assumed property name
  private String bootstrapServers;

  @Value("${kafka.consumer.group-id}")  // assumed property name
  private String groupId;

  @Bean
  public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

    return props;
  }

  @Bean
  public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    return props;
  }

  @Bean
  public ConsumerFactory<String, RequestReply> requestConsumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
        new JsonDeserializer<>(RequestReply.class));
  }

  @Bean
  public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, RequestReply>> requestListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, RequestReply> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(requestConsumerFactory());
    factory.setReplyTemplate(replyTemplate());
    return factory;
  }

  @Bean
  public ProducerFactory<String, RequestReply> replyProducerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
  }

  @Bean
  public KafkaTemplate<String, RequestReply> replyTemplate() {
    return new KafkaTemplate<>(replyProducerFactory());
  }

Quite a lot of configuration is needed here as well, but the listener itself is easier:

  @KafkaListener(topics = "${kafka.topic.car.request}", containerFactory = "requestListenerContainerFactory")
  @SendTo
  public RequestReply receive(RequestReply requestReply) {
    Request request = requestReply.getRequest();
    Reply reply = ...;
    return RequestReply.reply(reply);
  }

But what is that RequestReply class, used as both parameter type and return type in the template? The signature of ReplyingKafkaTemplate<K, V, R> suggests that different request and reply types can be used. In practice, however, the server-side implementation using the @SendTo annotation seems to prevent different types for the request and the reply, since the requestListenerContainerFactory needs to be configured with a reply template of the same value type as the listener consumes. A strange inconsistency in the API. Hence the common container type used here for convenience:

public class RequestReply {
  private Object request;
  private Object reply;

  public Object getRequest() {
    return request;
  }

  public Object getReply() {
    return reply;
  }

  public static RequestReply request(Object request) {
    RequestReply requestReply = new RequestReply();
    requestReply.request = request;
    return requestReply;
  }

  public static RequestReply reply(Object reply) {
    RequestReply requestReply = new RequestReply();
    requestReply.reply = reply;
    return requestReply;
  }
}

But what about multiple consumer instances?

It sort of works, as long as we run only a single client instance. With multiple client instances, we must make sure that a reply is routed back to the instance that sent the corresponding request. The Spring Kafka documentation suggests two options: each client instance uses its own, unique reply topic, or an additional KafkaHeaders.REPLY_PARTITION header is sent with the request, a four-byte field containing a big-endian representation of the partition integer. Since separate topics for different clients is clearly not very flexible, we opt for setting REPLY_PARTITION explicitly. The client then needs to know which partition it is assigned to; the documentation suggests using explicit configuration to select a specific partition. Let’s add that to our example:

  @Value("${kafka.topic.car.reply.partition}")
  private int replyPartition;
  
  ...
  
  @Bean
  public KafkaMessageListenerContainer<String, RequestReply> replyListenerContainer() {
    ContainerProperties containerProperties = new ContainerProperties(replyTopic);
    TopicPartitionInitialOffset initialOffset = new TopicPartitionInitialOffset(replyTopic, replyPartition);
    return new KafkaMessageListenerContainer<>(replyConsumerFactory(), containerProperties, initialOffset);
  }

  private static byte[] intToBytesBigEndian(final int data) {
    // big-endian encoding, equivalent to ByteBuffer.allocate(4).putInt(data).array()
    return new byte[] {(byte) ((data >> 24) & 0xff), (byte) ((data >> 16) & 0xff),
        (byte) ((data >> 8) & 0xff), (byte) (data & 0xff)};
  }

  ...
  // set reply topic in header
  record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, replyTopic.getBytes()));
  record.headers().add(new RecordHeader(KafkaHeaders.REPLY_PARTITION, intToBytesBigEndian(replyPartition)));
  // post in kafka topic
  RequestReplyFuture<String, RequestReply, RequestReply> sendAndReceive = requestReplyKafkaTemplate.sendAndReceive(record);
  ...

Not pretty, but it works. The configuration needed is extensive, the APIs are rather low-level, and the explicit partition configuration adds complexity if we need to scale the number of clients dynamically. Clearly, we could do better.

Encapsulating reply topic and partition handling

Let’s start with encapsulating the Return Address pattern, passing along the reply topic and partition automatically. The reply topic is already wired into the ReplyingKafkaTemplate via its listener container, and hence shouldn’t have to appear in the client API at all. When it comes to the reply partition, let’s do it the other way around: retrieve which partition(s) the reply topic listener has actually been assigned, and pass that partition along. This eliminates the need for the client to care about these headers.

public class PartitionAwareReplyingKafkaTemplate<K, V, R> extends ReplyingKafkaTemplate<K, V, R> {

  public PartitionAwareReplyingKafkaTemplate(ProducerFactory<K, V> producerFactory,
      GenericMessageListenerContainer<K, R> replyContainer) {
    super(producerFactory, replyContainer);
  }

  private TopicPartition getFirstAssignedReplyTopicPartition() {
    if (getAssignedReplyTopicPartitions() != null &&
        getAssignedReplyTopicPartitions().iterator().hasNext()) {
      TopicPartition replyPartition = getAssignedReplyTopicPartitions().iterator().next();
      if (this.logger.isDebugEnabled()) {
        this.logger.debug("Using partition " + replyPartition.partition());
      }
      return replyPartition;
    } else {
      throw new KafkaException("Illegal state: No reply partition is assigned to this instance");
    }
  }

  private static byte[] intToBytesBigEndian(final int data) {
    // big-endian encoding, equivalent to ByteBuffer.allocate(4).putInt(data).array()
    return new byte[] {(byte) ((data >> 24) & 0xff), (byte) ((data >> 16) & 0xff),
        (byte) ((data >> 8) & 0xff), (byte) (data & 0xff)};
  }

  @Override
  public RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record) {
    TopicPartition replyPartition = getFirstAssignedReplyTopicPartition();
    record.headers()
        .add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, replyPartition.topic().getBytes()))
        .add(new RecordHeader(KafkaHeaders.REPLY_PARTITION,
            intToBytesBigEndian(replyPartition.partition())));
    return super.sendAndReceive(record);
  }

}

Next step: Let’s adapt the ListenableFuture to the more modern CompletableFuture. While doing that, let’s streamline the API to resemble the standard KafkaTemplate:

public class CompletableFutureReplyingKafkaTemplate<K, V, R> extends PartitionAwareReplyingKafkaTemplate<K, V, R> {

  private volatile String defaultTopic;

  public CompletableFutureReplyingKafkaTemplate(ProducerFactory<K, V> producerFactory,
      GenericMessageListenerContainer<K, R> replyContainer) {
    super(producerFactory, replyContainer);
  }

  /**
   * The default topic for send methods where a topic is not
   * provided.
   * @return the topic.
   */
  public String getDefaultTopic() {
      return this.defaultTopic;
  }

  /**
   * Set the default topic for send methods where a topic is not
   * provided.
   * @param defaultTopic the topic.
   */
  public void setDefaultTopic(String defaultTopic) {
      this.defaultTopic = defaultTopic;
  }

  public CompletableFuture<R> sendAndReceiveDefault(V value) {
    return sendAndReceiveDefault(null, value);
  }

  public CompletableFuture<R> sendAndReceiveDefault(K key, V value) {
    return sendAndReceive(this.defaultTopic, key, value);
  }

  public CompletableFuture<R> sendAndReceive(String topic, V value) {
    return sendAndReceive(topic, null, value);
  }

  public CompletableFuture<R> sendAndReceive(String requestTopic, K key, V value) {
    ProducerRecord<K, V> record = new ProducerRecord<>(requestTopic, key, value);
    RequestReplyFuture<K, V, R> reply = super.sendAndReceive(record);
    CompletableFuture<R> completableResult = new CompletableFuture<R>() {
      @Override
      public boolean cancel(boolean mayInterruptIfRunning) {
        boolean result = reply.cancel(mayInterruptIfRunning);
        super.cancel(mayInterruptIfRunning);
        return result;
      }
    };
    // Add callback to the request sending result
    reply.getSendFuture().addCallback(new ListenableFutureCallback<SendResult<K, V>>() {
      @Override
      public void onSuccess(SendResult<K, V> sendResult) {
        // No action needed on a successful send; completion happens when the reply arrives
      }
      @Override
      public void onFailure(Throwable t) {
        completableResult.completeExceptionally(t);
      }
    });
    // Add callback to the reply
    reply.addCallback(new ListenableFutureCallback<ConsumerRecord<K, R>>() {
      @Override
      public void onSuccess(ConsumerRecord<K, R> result) {
        completableResult.complete(result.value());
      }
      @Override
      public void onFailure(Throwable t) {
        completableResult.completeExceptionally(t);
      }
    });
    return completableResult;
  }

}
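
Wiring up the new template mirrors the earlier configuration. A sketch, assuming the same requestProducerFactory and replyListenerContainer beans as before, with the request topic injected from the kafka.topic.car.request property to serve as the default topic:

  @Value("${kafka.topic.car.request}")
  private String requestTopic;

  @Bean
  public CompletableFutureReplyingKafkaTemplate<String, RequestReply, RequestReply> requestReplyKafkaTemplate(
      ProducerFactory<String, RequestReply> pf,
      KafkaMessageListenerContainer<String, RequestReply> lc) {
    CompletableFutureReplyingKafkaTemplate<String, RequestReply, RequestReply> template =
        new CompletableFutureReplyingKafkaTemplate<>(pf, lc);
    // A default topic lets clients call sendAndReceiveDefault without naming the topic explicitly
    template.setDefaultTopic(requestTopic);
    return template;
  }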

Pack that up in a utility library, and we now have an API that is much more in line with the general Convention over Configuration design philosophy of Spring. This is the resulting client code:

  @Autowired
  private CompletableFutureReplyingKafkaTemplate<String, RequestReply, RequestReply> requestReplyKafkaTemplate;

...

  requestReplyKafkaTemplate.sendAndReceiveDefault(request).thenAccept(requestReply -> {
    System.out.println("Reply: " + requestReply.getReply().toString());
  });
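
And when a blocking call is what’s needed, CompletableFuture makes that a one-liner (get with a timeout would work equally well):

  RequestReply requestReply = requestReplyKafkaTemplate.sendAndReceiveDefault(request).join();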

Summing up

To summarize, Spring for Apache Kafka 2.1 provides a fully functional implementation of the Request-Reply pattern on top of Apache Kafka, but the API still has some rough edges. In this blog post, we have seen how some additional abstractions and API adaptations can give a much more consistent, high-level API.

Caveat 1: One of the principal benefits of an Event Driven Architecture is the decoupling of event producers and consumers, allowing for much more flexible and evolvable systems. Relying on synchronous Request-Reply semantics is the exact opposite, where the requestor and replier are tightly coupled. Hence it should be used only when needed.

Caveat 2: If synchronous Request-Reply is required, an HTTP-based protocol is much simpler and more efficient than using an asynchronous channel like Apache Kafka.

Still, there may be scenarios when synchronous Request-Reply over Kafka makes sense. Choose wisely the best tool for the job.

A fully working example can be found at github.com/callistaenterprise/blog-synchronous-kafka.
