Blogg

Här finns tekniska artiklar, presentationer och nyheter om arkitektur och systemutveckling. Håll dig uppdaterad, följ oss på Twitter

Callista medarbetare Björn Gylling

Kafka and Go - consuming messages

// Björn Gylling

How do you approach using Kafka to store your data when building a HTTP api? In this series we build an application which would traditionally be backed by a classic database, the basic ‘Create-Read-Update-Delete’ api. Instead of a traditional database we store our data in Kafka and over the series look at some of the issues with this approach and how you can solve them.

1. Introduction

We begin by consuming data from Kafka and serve it over HTTP. The resource we are handling is a simple user object, but it could be anything. The users will be persisted as records on a Kafka topic and the application will expose a HTTP api to handle the users. My plan is to start off relatively simple and then we improve the application over a few blogposts, hopefully learning about Kafka and Go along the way. The goal is not to end up with a production-grade solution, but rather to learn about Kafka.

2. Kafka setup

I prefer using an alternative Kafka implementation called Redpanda when experimenting with Kafka applications. Redpanda is a bit easier and quicker to setup and it has a simple CLI tool to setup and interact with your Kafka cluster. See their documentation on how to install the tool. The application we are building will work fine with the official Kafka implementation as well but some instructions on how to setup and interact with Kafka will differ.

Start up a Redpanda cluster in Docker.

$ rpk container start

  NODE ID  ADDRESS
  0        127.0.0.1:50278

Take note of the address specified here as we will be using it for all our interactions with the cluster going forward.

Create our user topic, setting the partition count to 10, we will come back to partitions in the future. Note the cleanup policy we specify here is not the default one but it will serve us well for this usecase. We will come back to what this means in Part 2.

$ rpk topic --brokers 127.0.0.1:50278 create -p 10 -c cleanup.policy=compact user

Create a few users

$ echo 'bob_smith@example.com {"name":"Bob Smith","email":"bob_smith@example.com"}
sarah_black@example.com {"name":"Sarah Black","email":"sarah_black@example.com"}
emma_turner@example.com {"name":"Emma Turner","email":"emma_turner@example.com"}
helen_garcia@example.com {"name":"Helen Garcia","email":"helen_garcia@example.com"}' | rpk --brokers 127.0.0.1:50278 topic produce user -f '%k %v\n'

3. Consume messages with Go

We start off with a simple Go program which will consume users from the Kafka topic we have created. There are a few different Kafka client Go modules, I prefer github.com/twmb/franz-go.

func main() {
   var broker string  
   flag.StringVar(&broker, "b", "localhost:50278", "Kafka broker")
   flag.Parse()

   // Connect to the Redpanda broker and consume the user topic
   seeds := []string{broker}
   cl, err := kgo.NewClient(  
      kgo.SeedBrokers(seeds...),  
      kgo.ConsumeTopics("user"),  
   )  
   if err != nil {  
      panic(err)  
   }  
   defer cl.Close()  
  
   // Run our consume loop in a separate Go routine
   ctx := context.Background()  
   go consume(ctx, cl)  
  
   // Shutdown gracefully
   sigs := make(chan os.Signal, 2)  
   signal.Notify(sigs, os.Interrupt)  
  
   <-sigs  
   fmt.Println("received interrupt signal; closing client")  
   done := make(chan struct{})  
   go func() {  
      defer close(done)  
      cl.Close()  
   }()  
  
   select {  
   case <-sigs:  
      fmt.Println("received second interrupt signal; quitting without waiting for graceful close")  
   case <-done:  
   }  
}  
  
func consume(ctx context.Context, cl *kgo.Client) {  
   for {  
      fetches := cl.PollFetches(ctx)  
      fetches.EachPartition(func(p kgo.FetchTopicPartition) {  
         for _, record := range p.Records {  
            fmt.Printf("%s (p=%d): %s\n", string(record.Key), record.Partition, string(record.Value))  
         }  
      })  
   }  
}

As you can see if you run this program it will output the users.

4. Introduce state

The beginnings of our HTTP api will be a simple endpoint which takes a single parameter, the user email. It will return the user object if it exists on the Kafka topic. In order to have the user record available when a client requests it we will have to build up a state of all the users in our application as we consume them from Kafka.

func main() {  
   var broker, listenAddr string  
   flag.StringVar(&broker, "b", "localhost:50278", "Kafka broker")  
   flag.StringVar(&listenAddr, "l", ":8080", "HTTP listen address")  
   flag.Parse()  
  
   ...
  
   users := map[string]string{}  
   // Start serving HTTP requests  
   httpShutdown := serveHttp(listenAddr, users)  
  
   // Run our consume loop in a separate Go routine  
   ctx := context.Background()  
   go consume(ctx, cl, users)  
  
   ...
}

func serveHttp(addr string, users map[string]string) func(ctx context.Context) error {  
   mux := http.NewServeMux()  
  
   mux.HandleFunc("/user", func(w http.ResponseWriter, r *http.Request) {  
      email := r.URL.Query().Get("email")  
      fmt.Printf("http: %s: /user?email=%s\n", r.Method, email)  
      u, ok := users[email]  
      if !ok {  
         http.NotFound(w, r)  
         return  
      }  
      _, err := w.Write([]byte(u))  
      if err != nil {  
         panic(err)  
      }  
   })  
  
   s := http.Server{  
      Addr:    addr,  
      Handler: mux,  
   }  
   go func() {  
      if err := s.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {  
         panic(err)  
      }  
   }()  
  
   return s.Shutdown  
}

Running the program allows us to fetch a user using curl:

$ curl "localhost:8081/user?email=bob_smith@example.com"
{"name":"Bob Smith","email":"bob_smith@example.com"}

In this example we use a simple map[string]string to represent our state. This solution as it is leaves us open to a potential concurrency issue where the state could be read and written at the same time since the consumer runs in one goroutine and all HTTP requests are served in their own goroutines. See for yourself by running the program with go run -race main.go and do a request using curl. To avoid this we can protect the state using a sync.RWMutex.

type UserStore struct {  
   l sync.RWMutex  
   u map[string]string  
}  
  
func NewUserStore() *UserStore {  
   return &UserStore{u: map[string]string{}}  
}  
  
func (u *UserStore) Get(email string) (string, bool) {  
   u.l.RLock()  
   defer u.l.RUnlock()  
   user, ok := u.u[email]  
   return user, ok  
}  
  
func (u *UserStore) Set(email string, user string) {  
   u.l.Lock()  
   defer u.l.Unlock()  
   u.u[email] = user  
}

5. Summary

This is a basic demonstration of how to consume data from Kafka and exposing it over HTTP. There are flaws with this approach, perhaps the biggest one is the lack of a consumergroup in this setup. We will introduce consumer groups in a future post. But before that we will implement the HTTP endpoints to create, update and delete users.

Full source is available at github.

Part 2 is available here!

Tack för att du läser Callistas blogg.
Hjälp oss att nå ut med information genom att dela nyheter och artiklar i ditt nätverk.

Kommentarer