
Kafka and Go - producing messages

// Björn Gylling

How do you approach using Kafka to store your data when building an HTTP API? In this series we build an application that would traditionally be backed by a classic database: the basic ‘Create-Read-Update-Delete’ API. Instead of a traditional database we store our data in Kafka, and over the series we look at some of the issues with this approach and how you can solve them. If you want to read from the beginning, you can find Part 1 here.

1. Introduction

In Part 1 we built a simple application that consumes data from Kafka and exposes it on an HTTP endpoint. This post describes how we can produce messages to Kafka in order to add and update users.

2. Compaction

In Part 1 we created this topic with the cleanup policy compact. This instructs Kafka to keep the latest message for each key no matter how old it is; Kafka will remove old records when a newer one with the same key exists. There may be more than one message for a given key for a period of time, since Kafka will not run the cleanup policy on the active log segment. You can read more about log compaction in the Kafka documentation; log compaction and cleanup is a deep topic with many configuration parameters to tune and consider.
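
For reference, a compacted topic like this can be created with the kafka-topics.sh tool that ships with Kafka. The broker address is assumed to be a local one, and partition and replication settings are left to the broker defaults here:

$ kafka-topics.sh --bootstrap-server localhost:9092 --create \
    --topic user --config cleanup.policy=compact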

Our user topic, created with cleanup.policy=compact, is usually called a compacted topic. This means we can produce a new, updated user record, and the previous record for that user will be deleted by the log cleaner at some point. Since records have a guaranteed order within a partition (and records with the same key end up on the same partition), the latest record for a given key always reflects the most up-to-date state for that user. For our application this means we do not need to worry about multiple records with the same key; the last one always wins.

The following diagram illustrates this concept, where the record with offset 3 has the same key (u1@mail.com) as the record with offset 1. This means record 3 updates record 1 and when Kafka runs compaction the record at offset 1 will be deleted.

(Figure: Kafka topic compaction example)

3. Creating and updating users

Currently we store users on a topic called user. To create a new user, or to update an existing one, we simply produce a record to this topic. Let’s extend the HTTP handler to accept PUT requests.

mux.HandleFunc("/user", func(w http.ResponseWriter, r *http.Request) {
   email := r.URL.Query().Get("email")
   fmt.Printf("http: %s /user?email=%s\n", r.Method, email)
   switch r.Method {
   case http.MethodGet:
      ...
   case http.MethodPut:
      u := User{}
      b, err := io.ReadAll(r.Body)
      if err != nil {
         panic(err)
      }
      if err := json.Unmarshal(b, &u); err != nil {
         panic(err)
      }
      // Fall back to the email in the request body if the query parameter is missing.
      if email == "" {
         email = u.Email
      }
      v, err := json.Marshal(u)
      if err != nil {
         panic(err)
      }
      // Produce the updated user synchronously, keyed by email.
      res := kClient.ProduceSync(r.Context(), &kgo.Record{Key: []byte(email), Value: v, Topic: "user"})
      if err := res.FirstErr(); err != nil {
         http.Error(w, "failed to update user", http.StatusInternalServerError)
         return
      }
      w.WriteHeader(http.StatusOK)
      return
   }
})

Here I have decided to use the synchronous producer in order to handle the error directly and respond correctly to the caller. To increase performance we could produce asynchronously (sketched below), but in that case we have to provide the produce call with a different context. In the example above I use the context tied to the HTTP request; this context is cancelled once the response has been served. If we were to use the HTTP request context in an asynchronous produce, the record would most likely never be sent to Kafka.
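
For illustration, a minimal sketch of an asynchronous produce with franz-go could look like this. It assumes the same handler scope as above (kClient, email and v), plus the context and log packages. Note that context.Background() replaces the request context, and the callback can only log the error since the HTTP response may already have been sent:

kClient.Produce(context.Background(), &kgo.Record{Key: []byte(email), Value: v, Topic: "user"},
   func(rec *kgo.Record, err error) {
      // Runs once the broker has acknowledged the record, or the produce has failed.
      if err != nil {
         log.Printf("async produce failed for key %s: %v", string(rec.Key), err)
      }
   })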

Test it out with curl:

$ curl "localhost:8080/user?email=bob_smith@example.com"
{"name":"Bob Smith","email":"bob_smith@example.com"}

$ curl -X PUT localhost:8080/user -d '{"name":"Bob Davis","email":"bob_smith@example.com"}'

$ curl "localhost:8080/user?email=bob_smith@example.com"
{"name":"Bob Davis","email":"bob_smith@example.com"}

4. Deleting users

Deleting a user is very easy: all we have to do is produce a tombstone record for the key we want to delete. A tombstone record is a record where the value is null. Let’s add support for the DELETE request and produce the tombstone message.

switch r.Method {
case http.MethodGet:
   ...
case http.MethodPut:
   ...
case http.MethodDelete:
   // A nil value produces a tombstone record for the key.
   res := kClient.ProduceSync(r.Context(), &kgo.Record{Key: []byte(email), Value: nil, Topic: "user"})
   if err := res.FirstErr(); err != nil {
      http.Error(w, "failed to delete user", http.StatusInternalServerError)
      return
   }
   w.WriteHeader(http.StatusOK)
   return
}

We also need to handle these tombstone messages and remove the user from our internal state; right now the consuming loop would fail due to invalid JSON. Let’s fix that!

for _, record := range p.Records {  
   fmt.Printf("%s (p=%d): %s\n", string(record.Key), record.Partition, string(record.Value))  
  
   // Handle tombstones and continue with next record  
   if len(record.Value) == 0 {  
      users.Delete(string(record.Key))  
      continue  
   }  
  
   // Update state  
   ...
}

The new Delete method on our UserStore is similar to the Set method, but it deletes the entry from the map instead of updating it.
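
Part 1 defines the UserStore, so a minimal sketch is enough here. Assuming it wraps a map guarded by a mutex, Delete could look like this (the field names are illustrative; match them to your definitions from Part 1):

// Delete removes a user from the in-memory state.
// Field names (m, users) are illustrative.
func (s *UserStore) Delete(email string) {
   s.m.Lock()
   defer s.m.Unlock()
   delete(s.users, email)
}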

Example with curl:

$ curl "localhost:8080/user?email=bob_smith@example.com"
{"name":"Bob Davis","email":"bob_smith@example.com"}

$ curl -X DELETE "localhost:8080/user?email=bob_smith@example.com"

$ curl "localhost:8080/user?email=bob_smith@example.com"
404 page not found

5. Summary

In this post we have produced messages to Kafka, and we have learned about compacted topics and tombstone messages. Next up we will dive into partitions and consumer groups.

Full source is available on GitHub.

Part 3 is available here!
