Blog
Here you'll find technical articles, presentations and news about architecture and systems development. Stay up to date, follow us on LinkedIn
Taking a quick look at Erlang-style actor architecture using Go with Ergo Framework.
Disclaimer: I’ve never written a single line of Erlang in my life, and have zero experience developing using Actor-based patterns. So do NOT read this blog post as some guide on how to use Ergo or how to design Actor-based applications.
Ergo? I guess it's short for "ERlangGO", since Ergo is a framework for running Erlang-style apps in Go. The project's official GitHub page puts it rather well:
An actor-based framework with network transparency.
For creating event-driven architecture using technologies and design patterns of Erlang/OTP in Golang.
This “OTP” thing is short for “Open Telecom Platform”, which is a set of libraries and patterns commonly used when building Erlang applications.
Being a novice when it comes to the Actor programming model, I primarily refer to other great sources on the topic such as this intro from the Actor-based Akka framework for in-depth study. My view of Actor-based architecture is that it offers an enticing “decoupled-by-design” architectural pattern through:
The above tells us that if our Actor-based software solution consists of independently executing pieces of software (Actors) with well-defined purposes that only communicate through message passing, the architecture facilitates excellent scaling characteristics. Actors won't block each other: they accept a message, do some work, possibly emit a message of their own, and then either go back to sleep or start working on the next message in the mailbox.
Mailbox? In Ergo (as in most, if not all, Actor-based frameworks), Actors have mailboxes where incoming messages are stored in the order received before being processed.
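To make the mailbox idea concrete, here is a minimal stdlib-only Go sketch. It is not Ergo's implementation. Each actor owns a buffered channel that acts as its mailbox, and a single goroutine handles messages one at a time, in the order they arrived. The `Actor` type and its methods are hypothetical names chosen for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// Actor is a hypothetical minimal actor: a mailbox (buffered channel)
// drained by exactly one goroutine, so messages are handled one at a
// time in the order they were received.
type Actor struct {
	mailbox chan string
	wg      sync.WaitGroup
	handled []string // only touched by the actor's own goroutine
}

// NewActor creates an actor whose mailbox holds up to size pending messages.
func NewActor(size int) *Actor {
	a := &Actor{mailbox: make(chan string, size)}
	a.wg.Add(1)
	go a.loop()
	return a
}

// loop processes messages until the mailbox is closed.
func (a *Actor) loop() {
	defer a.wg.Done()
	for msg := range a.mailbox {
		a.handled = append(a.handled, msg) // "do some work"
	}
}

// Send puts a message in the mailbox; it blocks only if the mailbox is full.
func (a *Actor) Send(msg string) { a.mailbox <- msg }

// Stop closes the mailbox, waits until every queued message has been
// handled, and returns the messages in processing order.
func (a *Actor) Stop() []string {
	close(a.mailbox)
	a.wg.Wait()
	return a.handled
}

func main() {
	a := NewActor(16)
	for _, m := range []string{"first", "second", "third"} {
		a.Send(m)
	}
	fmt.Println(a.Stop()) // [first second third]
}
```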
It’s important to know about the three main types of message passing in Ergo:
On top of the above, we can also use a dispatcher to get at the Pub/Sub functionality of Ergo. The dispatcher registers itself as an "event emitter" through process.RegisterEvent(eventType, ...), while subscribers call process.MonitorEvent(eventType) to register themselves. When a message is published, the appropriate callback method is then invoked in each consumer.
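The register/monitor flow can be approximated in plain Go. The sketch below is a hypothetical stdlib-only dispatcher, not Ergo's API. Subscribers register a channel for an event type, and `Publish` delivers each message to every subscriber of that type. For simplicity, it blocks if a subscriber's buffer is full.

```go
package main

import (
	"fmt"
	"sync"
)

// Dispatcher is a hypothetical in-process pub/sub hub keyed by event type.
type Dispatcher struct {
	mu   sync.RWMutex
	subs map[string][]chan any
}

func NewDispatcher() *Dispatcher {
	return &Dispatcher{subs: make(map[string][]chan any)}
}

// Subscribe returns a buffered channel that receives every message
// published for eventType (loosely what MonitorEvent gives a subscriber).
func (d *Dispatcher) Subscribe(eventType string, buf int) <-chan any {
	ch := make(chan any, buf)
	d.mu.Lock()
	d.subs[eventType] = append(d.subs[eventType], ch)
	d.mu.Unlock()
	return ch
}

// Publish delivers msg to all current subscribers of eventType and returns
// how many subscribers received it. It blocks while a subscriber's buffer is full.
func (d *Dispatcher) Publish(eventType string, msg any) int {
	d.mu.RLock()
	defer d.mu.RUnlock()
	for _, ch := range d.subs[eventType] {
		ch <- msg
	}
	return len(d.subs[eventType])
}

func main() {
	d := NewDispatcher()
	ingest := d.Subscribe("traffic", 8)
	emergency := d.Subscribe("traffic", 8)
	n := d.Publish("traffic", "vehicle-42 at 59.33,18.06")
	fmt.Println(n, <-ingest, <-emergency)
}
```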
One of the main features (or perhaps characteristics is a better word) of Actor-based frameworks such as Erlang/OTP or Akka is the supervisor hierarchy pattern. Actors are set up in a hierarchy so that crashes, errors and other unexpected events are propagated to the supervising actor. If an actor crashes, the actor system will typically restart all of its subordinate actors. The proto.actor documentation (proto.actor is another Actor framework, for .NET and Go) explains this in detail.
A really simple example could be if our system has a Web API actor with a number of subordinate DB query actors. If the Web API actor crashes, we'd want all the DB query actors to restart as well. The supervisor makes sure that actor restarts follow the established hierarchy, or the selected supervisor strategy.
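A supervisor's core job, restarting a crashed child, can be sketched in a few lines of Go. This is a hypothetical one-for-one supervisor built on `recover`. It runs the child synchronously for simplicity and ignores real-world concerns such as restart intensity limits or one-for-all strategies. It is not how Ergo implements supervision.

```go
package main

import "fmt"

// supervise runs child and restarts it whenever it panics, up to
// maxRestarts times. It returns the number of restarts performed and
// whether the child eventually completed normally.
func supervise(child func(attempt int), maxRestarts int) (restarts int, ok bool) {
	for attempt := 0; attempt <= maxRestarts; attempt++ {
		if runChild(child, attempt) {
			return attempt, true
		}
		// The child crashed; loop around and start a fresh instance.
	}
	return maxRestarts, false
}

// runChild calls child and reports whether it returned without panicking.
func runChild(child func(attempt int), attempt int) (ok bool) {
	defer func() {
		if r := recover(); r != nil {
			fmt.Printf("child crashed on attempt %d: %v\n", attempt, r)
			ok = false
		}
	}()
	child(attempt)
	return true
}

func main() {
	// A hypothetical "DB query" child that crashes twice before succeeding.
	flaky := func(attempt int) {
		if attempt < 2 {
			panic("connection lost")
		}
	}
	restarts, ok := supervise(flaky, 5)
	fmt.Println("restarts:", restarts, "ok:", ok)
}
```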
That said, let's take Ergo for a spin. As a study case, I made up a make-believe traffic/vehicle monitoring system with the following high-level architecture:
In short - we have producers sending geo-position events tied to vehicle IDs to our system over UDP. To keep things structured and simple, we’ll be using msgpack as our message format.
The other use-case is some kind of BI/dashboard user that needs to be able to query the system over HTTP for vehicle positions given proximity to some Lat/Lon coordinate.
After the very brief intro above to the study application and Actors, let's take a closer look at how we'll use Ergo Framework to build a solution that supports the following make-believe use-cases:
Our solution to the above make-believe requirements boils down to the following Actor-based architecture:
On the right we see the UDP (and a TCP) actors, which accept traffic events and use the dispatcher to publish them to two subscribers: the "ingest worker pool" and the "emergency actor". The ingest workers unmarshal the msgpack payload and possibly perform validation, authenticity checks and so on. They then use Send to transmit a position update directly to the Storage actor, which is responsible for writing position updates to the DB. The emergency actor checks whether an event is an emergency and, if so, calls the external emergency API.
On the left we have the slightly simpler REST API. A web Actor provides an HTTPS/JSON API that accepts requests with a lat/lon coordinate and then uses the Call request/reply message type, sent directly to a pool of query workers. The query workers have read-only access to the DB and can use its geospatial capabilities to return vehicles and positions near the supplied lat/lon. By using worker pools (with both direct request/reply and pub/sub), the solution could scale really well in a real distributed setup.
In the hierarchy of our "traffic" application, the worker pools and UDP/TCP receivers run as "independent" gen.Process processes. The TrafficApp forms the root of a hierarchy. It has a direct subordinate (the Web actor) and a subordinate "common supervisor", which in turn is the parent of the storage, dispatcher and emergency actors.
I’m quite sure a seasoned Erlang/Actor/Ergo developer (rightfully) would bash the above hierarchy. It’s just an example of how hierarchies are formed. Note that supervisors can have subordinate supervisors.
I started by using the ergo tool to generate the boilerplate code, with a few actors and a default supervision strategy.
The generated directory structure roughly follows Go best-practices:
/apps/trafficapp <- supervisor, dispatcher, trafficapp, web, emergency, storage actors go here
/cmd <- main function bootstrapping the trafficapp stuff above + some discrete actors declared directly under the root "application"
The actors under the "trafficapp" hierarchy go into /apps/trafficapp. The main function that starts the server node (e.g. ergo.StartNodeWithContext(ctx, OptionNodeName, OptionNodeCookie, options)) goes into /cmd, along with the underlying processes, including the supervised trafficapp.
I won't cover most of the code involved, except for some important excerpts in the following sections. The full source code can be found at https://github.com/eriklupander/ergo-trafficapp.
I've yet to find a single unit test in the official Ergo examples. A simple unit test is no problem. I wrote one for my "storage" actor using an in-memory DB, where the test code calls the HandleInfo(process *ServerProcess, message etf.Term) callback. It works like any other unit test in Go. The catch is that I passed nil as the *ServerProcess. That is fine as long as your callback code doesn't do anything with process, such as sending a message... a rather important bit in Ergo!
To make things even more difficult, HandleInfo accepts a pointer to the concrete type gen.ServerProcess and not an interface. However, if we take a look at *gen.ServerProcess, we see that it has a ProcessState field, which in turn has a Process field. Process happens to be the interface declaring all the methods on *gen.ServerProcess, and is something we can mock!
process := &gen.ServerProcess{
ProcessState: gen.ProcessState{
Process: MOCK_GOES_HERE,
},
}
However, manually writing a mock that implements gen.Process is a daunting task. There are just too many methods to implement by hand: approximately 60 methods are declared in the Process interface.
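If you'd rather avoid generated mocks, a common Go trick is to embed the interface in a hand-written fake. The embedded (nil) interface satisfies every method at compile time, and you only override the ones the code under test actually calls. Calling a method you didn't override panics with a nil-pointer dereference, which surfaces unexpected calls. The small `Process` interface and the `handle` function below are hypothetical stand-ins, not Ergo's `gen.Process`.

```go
package main

import "fmt"

// Process is a hypothetical, much smaller stand-in for gen.Process.
type Process interface {
	Send(to any, message any) error
	Name() string
	Kill()
}

// fakeProcess embeds Process, so it satisfies the whole interface even
// though it only implements Send. Calling Name or Kill would panic,
// because the embedded interface value is nil.
type fakeProcess struct {
	Process
	sent []any
}

// Send records every outgoing message instead of delivering it.
func (f *fakeProcess) Send(to any, message any) error {
	f.sent = append(f.sent, message)
	return nil
}

// handle is a hypothetical piece of actor logic under test.
func handle(p Process, msg string) {
	_ = p.Send("storage", "stored:"+msg)
}

func main() {
	fake := &fakeProcess{}
	handle(fake, "pos-1")
	fmt.Println(fake.sent) // [stored:pos-1]
}
```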
Luckily, there are mock generation tools for Go, such as gomock, to help us out.
First, install mockgen, gomock's code generator: go install go.uber.org/mock/mockgen@latest
Then generate a mock for gen.Process:
$ mockgen -package mock_ergo github.com/ergo-services/ergo/gen Process > mock_ergo/mock_ergo.go
Then, by importing our generated mock into our test and using gomock.Controller, we can finally test usage of the process parameter. A more complete example test:
func TestStoreAndQuery(t *testing.T) {
ctrl := gomock.NewController(t)
processMock := mock_ergo.NewMockProcess(ctrl)
process := &gen.ServerProcess{
ProcessState: gen.ProcessState{
Process: processMock, // <-- Here, set our mock as Process.
},
}
// Here we set up an expectation that our code under test will call .Send, addressing "some-actor".
processMock.EXPECT().Send("some-actor", gomock.Any()).Times(1)
testee := createActorUnderTest()
// Invoke HandleInfo with our semi-mocked process.
serverStatus := testee.HandleInfo(process, SomeStruct{})
assert.Equal(t, gen.ServerStatusOK, serverStatus)
}
The test above will fail if the EXPECT is not fulfilled, i.e. we can assert that a message was sent from the actor under test. Of course, we can replace gomock.Any() with an exact expectation for the message, or use a gomock.Matcher to write our own matching logic.
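A custom matcher only needs the two methods of gomock's `Matcher` interface, `Matches(x any) bool` and `String() string`. The sketch below is stdlib-only so that it compiles on its own. The `PositionUpdate` type and the `positionNear` helper are hypothetical. In a real test, it would be passed in place of `gomock.Any()`, for example `processMock.EXPECT().Send("storage", positionNear(59.33, 18.06, 0.01))`.

```go
package main

import (
	"fmt"
	"math"
)

// PositionUpdate is a hypothetical message type sent by the actor under test.
type PositionUpdate struct {
	ID       string
	Lat, Lon float64
}

// nearMatcher satisfies gomock's Matcher interface (Matches + String) and
// accepts any PositionUpdate within tol degrees of the expected point.
type nearMatcher struct {
	lat, lon, tol float64
}

// positionNear is a hypothetical constructor for the matcher.
func positionNear(lat, lon, tol float64) nearMatcher {
	return nearMatcher{lat: lat, lon: lon, tol: tol}
}

func (m nearMatcher) Matches(x any) bool {
	p, ok := x.(PositionUpdate)
	if !ok {
		return false
	}
	return math.Abs(p.Lat-m.lat) <= m.tol && math.Abs(p.Lon-m.lon) <= m.tol
}

// String describes the expectation; gomock uses it in failure messages.
func (m nearMatcher) String() string {
	return fmt.Sprintf("is a PositionUpdate within %g of (%g, %g)", m.tol, m.lat, m.lon)
}

func main() {
	m := positionNear(59.33, 18.06, 0.01)
	fmt.Println(m.Matches(PositionUpdate{ID: "v1", Lat: 59.335, Lon: 18.061}))
	fmt.Println(m.Matches("not a position"))
}
```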
Ergo offers a UDP behaviour (actor) out of the box that's easy to set up:
func (us *UDPReceiver) InitUDP(process *gen.UDPProcess, args ...etf.Term) (gen.UDPOptions, error) {
var options gen.UDPOptions
options.Handler = createUDPReceiverHandler()
options.Port = 9092
options.NumHandlers = 4
return options, nil
}
The code above (with some supporting boilerplate) sets up a UDP listener on port 9092 having four actor instances. When a UDP packet is received, the following function is invoked by the Ergo Framework:
func (uh *UDPReceiverHandler) HandlePacket(process *gen.UDPHandlerProcess, data []byte, packet gen.UDPPacket) {
	event := events.TrafficEventMessage{Date: time.Now().UnixMilli(), Payload: data}
	if err := process.Send("dispatcher", event); err != nil {
		slog.Error("cannot send event message", slog.Any("error", err))
	}
	// Just for fun, show how to send a response packet.
	// err from the if-statement above is out of scope here, so declare a new one.
	if _, err := packet.Socket.Write([]byte("\n")); err != nil {
		slog.Error("writing response", slog.Any("error", err))
	}
}
What's that process.Send("dispatcher", events.TrafficEventMessage{Date: time.Now().UnixMilli(), Payload: data}) thing?
It's how we use the Pub/Sub functionality of Ergo! We put the []byte from the packet into our own events.TrafficEventMessage and pass it to dispatcher using a Send-type message. The dispatcher receives the events.TrafficEventMessage in its "Send"-receive callback, HandleInfo, where it performs the actual Pub/Sub publishing:
func (s *Dispatcher) HandleInfo(process *gen.ServerProcess, message etf.Term) gen.ServerStatus {
if err := process.SendEventMessage(events.TrafficEvent, message); err != nil {
slog.Error("Dispatcher: handle SendEventMessage(events.TrafficEvent) error", slog.Any("error", err))
}
return gen.ServerStatusOK
}
(from the dispatcher code)
Note: I personally think it would be convenient to do the Pub operation directly from the UDP actor, but doing so gives a "not an owner" error. The official docs seem to indicate that any actor should be able to be a producer. However, since the official getting-started guide uses the "dispatcher" pattern, I guess there's some reason why a web or UDP actor can't publish directly.
Setting up a pool of worker actors is a breeze with Ergo. In main.go I've declared two separate worker pools. One of them is ingestors_workers, whose members act as pub/sub subscribers.
process, err = trafficNode.Spawn("ingestors_workers", gen.ProcessOptions{}, createIngestors())
The underlying gen.PoolBehavior is created in the createIngestors() function, whose InitPool callback (called by the Ergo framework) sets things up for us:
func (p *Ingestors) InitPool(process *gen.PoolProcess, args ...etf.Term) (gen.PoolOptions, error) {
opts := gen.PoolOptions{
Worker: createIngestorsWorker(),
NumWorkers: 5,
WorkerOptions: gen.ProcessOptions{MailboxSize: 1024},
}
if err := process.MonitorEvent(events.TrafficEvent); err != nil {
slog.Error("can't monitor event", slog.Any("error", err))
return opts, err
}
return opts, nil
}
The above snippet shows two important things:
- The pool subscribes to events.TrafficEvent using process.MonitorEvent.
- The worker instance setup happens in createIngestorsWorker(), which creates a gen.PoolWorkerBehavior with appropriate callbacks for handling messages. The worker's HandleWorkerInfo(process *gen.PoolWorkerProcess, message etf.Term) callback gets invoked in a round-robin fashion each time an events.TrafficEvent is produced.
Currently, our HandleWorkerInfo doesn't do that much. It type-converts the etf.Term to the actual Go struct type, then calls msgpack.Unmarshal(event.Payload, &geoPosUpdate) to go from the original []byte payload sent over the wire to a Go struct, which is then sent to the storage actor using Send.
Strictly speaking, the work done by the ingest workers is quite minimal: just unmarshal from msgpack and send to storage. In a more realistic scenario, though, each payload received over UDP might contain a MAC or a checksum that needs to be verified for every message. At 10 or 100 msg/s, we could do that without a worker pool. At 1000/s or 25000/s, however, a worker pool (a real one, distributed across CPUs, servers, pods, clusters, whatever) could become necessary.
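As an example of the kind of per-message work that could justify a pool, here is a hypothetical HMAC-SHA256 check. The wire layout (payload followed by a 32-byte tag) and the shared key are assumptions made purely for illustration. The demo app does not do this.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"errors"
	"fmt"
)

// verifyPacket assumes a hypothetical layout: payload || HMAC-SHA256(key, payload).
// It returns the payload if the tag is valid.
func verifyPacket(key, packet []byte) ([]byte, error) {
	if len(packet) < sha256.Size {
		return nil, errors.New("packet too short")
	}
	payload := packet[:len(packet)-sha256.Size]
	tag := packet[len(packet)-sha256.Size:]

	mac := hmac.New(sha256.New, key)
	mac.Write(payload)
	// hmac.Equal compares in constant time to avoid timing leaks.
	if !hmac.Equal(tag, mac.Sum(nil)) {
		return nil, errors.New("invalid MAC")
	}
	return payload, nil
}

// signPacket builds a packet in the same hypothetical layout (producer side).
func signPacket(key, payload []byte) []byte {
	mac := hmac.New(sha256.New, key)
	mac.Write(payload)
	return mac.Sum(append([]byte{}, payload...)) // payload followed by tag
}

func main() {
	key := []byte("shared-secret")
	pkt := signPacket(key, []byte("msgpack bytes"))
	payload, err := verifyPacket(key, pkt)
	fmt.Printf("%q %v\n", payload, err)

	pkt[0] ^= 0xFF // tamper with the payload
	_, err = verifyPacket(key, pkt)
	fmt.Println(err)
}
```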
Many modern databases support storing geospatial data, including powerful query support. In a live scenario I’d perhaps pick something like PostGIS. For the sake of this blog post, I picked the Go-embeddable KV-store BuntDB which is simple, fast and has excellent support for geospatial indexes and queries.
Our spatial index is created when the storage actor is created. Excerpt:
if err := db.CreateSpatialIndex("fleet", "fleet:*:pos", buntdb.IndexRect); err != nil {
slog.Error(err.Error())
}
I think this is quite cool! We’re creating an index named “fleet” having the key pattern “fleet:*:pos”, of the “rectangular” 2D spatial index type. What this index does is that whenever a KV-pair in our KV-store is created that matches this pattern (such as “fleet:my-cool-vehicle-id:pos”) it becomes indexed and efficiently queryable. More on that in section 2.5.1.
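The key-pattern idea can be illustrated with the standard library, since `path.Match` uses similar glob semantics for `*`. This only approximates how BuntDB decides which keys belong to an index. The `vehicleKey` and `vehicleID` helpers are hypothetical. The article builds keys by string concatenation and extracts IDs with `strings.Split`, whereas trimming the prefix and suffix also copes with IDs that contain a colon.

```go
package main

import (
	"fmt"
	"path"
	"strings"
)

const fleetPattern = "fleet:*:pos"

// vehicleKey builds a key of the form fleet:<id>:pos (hypothetical helper).
func vehicleKey(id string) string {
	return "fleet:" + id + ":pos"
}

// vehicleID extracts <id> from fleet:<id>:pos and reports false for keys
// that don't match the index pattern (hypothetical helper).
func vehicleID(key string) (string, bool) {
	if ok, _ := path.Match(fleetPattern, key); !ok {
		return "", false
	}
	return strings.TrimSuffix(strings.TrimPrefix(key, "fleet:"), ":pos"), true
}

func main() {
	for _, k := range []string{vehicleKey("my-cool-vehicle-id"), "fleet:my-cool-vehicle-id:speed"} {
		id, ok := vehicleID(k)
		fmt.Println(k, "indexed:", ok, "id:", id)
	}
}
```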
Our storage mechanism is an Actor called storage, which the ingest workers call using the Send operation, i.e. direct message passing without a reply. An important trait of BuntDB is that it supports only a single writer at a time, but many concurrent readers. We therefore run only a single instance of storage, which could become a bottleneck in a high-load scenario. To partially mitigate this, declaring a really large mailbox size may be beneficial.
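The single-writer constraint maps naturally onto one actor that owns all writes. The hypothetical stdlib-only sketch below mimics that arrangement. It is neither BuntDB nor Ergo, just the same concurrency shape. One goroutine drains a buffered channel (the mailbox, whose capacity absorbs bursts) and is the only code that writes, while any number of readers query concurrently under a read lock.

```go
package main

import (
	"fmt"
	"sync"
)

// Position is a hypothetical stored value.
type Position struct{ Lon, Lat float64 }

// update is a hypothetical mailbox message for the writer.
type update struct {
	id  string
	pos Position
}

// Store has exactly one writer goroutine; readers share an RLock.
type Store struct {
	mu      sync.RWMutex
	data    map[string]Position
	updates chan update // the writer's mailbox
	done    chan struct{}
}

func NewStore(mailboxSize int) *Store {
	s := &Store{
		data:    make(map[string]Position),
		updates: make(chan update, mailboxSize),
		done:    make(chan struct{}),
	}
	go s.writer()
	return s
}

// writer is the only goroutine that modifies data.
func (s *Store) writer() {
	defer close(s.done)
	for u := range s.updates {
		s.mu.Lock()
		s.data[u.id] = u.pos
		s.mu.Unlock()
	}
}

// Put enqueues a write (fire-and-forget, like Send); it blocks only when the mailbox is full.
func (s *Store) Put(id string, pos Position) { s.updates <- update{id, pos} }

// Get may be called from many goroutines at once.
func (s *Store) Get(id string) (Position, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	p, ok := s.data[id]
	return p, ok
}

// Close applies all pending writes and stops the writer.
func (s *Store) Close() {
	close(s.updates)
	<-s.done
}

func main() {
	s := NewStore(1024)
	s.Put("vehicle-1", Position{Lon: 18.06, Lat: 59.33})
	s.Close() // wait until the write has been applied
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ { // concurrent readers
		wg.Add(1)
		go func() {
			defer wg.Done()
			p, ok := s.Get("vehicle-1")
			fmt.Println(p, ok)
		}()
	}
	wg.Wait()
}
```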
The Go code in the storage actor is quite straightforward. We'll use a dedicated write transaction against the DB:
func (s *Storage) HandleInfo(process *gen.ServerProcess, message etf.Term) gen.ServerStatus {
trafficEvt := message.(PositionUpdate)
// Store position on index using a write transaction
if err := s.db.Update(func(tx *buntdb.Tx) error {
_, _, err := tx.Set("fleet:"+trafficEvt.ID+":pos", buntdb.Point(trafficEvt.Lon, trafficEvt.Lat), nil)
return err
}); err != nil {
slog.Error(err.Error())
return gen.ServerStatusIgnore
}
slog.Debug("stored position event in DB", slog.Float64("lon", trafficEvt.Lon), slog.Float64("lat", trafficEvt.Lat))
return gen.ServerStatusOK
}
I'm not that fond of having to do a type assertion from etf.Term to our PositionUpdate struct. Other than that, the actual write happens inside the inlined tx function:
tx.Set("fleet:"+trafficEvt.ID+":pos", buntdb.Point(trafficEvt.Lon, trafficEvt.Lat), nil)
I.e. we create a key matching the fleet index pattern, and use buntdb.Point to store the geoposition in BuntDB's proprietary "[lon lat]" format.
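The query worker shown further down relies on a `coordToLonLat` helper that isn't included in the excerpts. Below is a plausible stdlib implementation, offered as a hypothetical sketch rather than the author's code. It assumes the stored value really has the "[lon lat]" form described above, and it does not depend on the exact float formatting BuntDB uses.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// coordToLonLat (hypothetical implementation) parses a "[lon lat]" point
// string into two floats and reports false if the value has another form.
func coordToLonLat(val string) (lon, lat float64, ok bool) {
	val = strings.TrimSpace(val)
	if !strings.HasPrefix(val, "[") || !strings.HasSuffix(val, "]") {
		return 0, 0, false
	}
	fields := strings.Fields(val[1 : len(val)-1])
	if len(fields) != 2 {
		return 0, 0, false
	}
	var err error
	if lon, err = strconv.ParseFloat(fields[0], 64); err != nil {
		return 0, 0, false
	}
	if lat, err = strconv.ParseFloat(fields[1], 64); err != nil {
		return 0, 0, false
	}
	return lon, lat, true
}

func main() {
	lon, lat, ok := coordToLonLat("[43.5454 76.232]")
	fmt.Println(lon, lat, ok) // 43.5454 76.232 true
}
```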
If all goes well, the method returns gen.ServerStatusOK to indicate that processing went fine.
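Because etf.Term is, as far as I can tell, an empty interface, a type switch (or comma-ok assertion) can replace the bare `message.(PositionUpdate)`, which panics on an unexpected message. Here is a hypothetical stdlib-only sketch. `Term`, `PositionUpdate` and the returned status strings are stand-ins for the Ergo and app types.

```go
package main

import "fmt"

// Term stands in for etf.Term (assumed to be an empty interface).
type Term any

// PositionUpdate stands in for the article's message struct.
type PositionUpdate struct {
	ID       string
	Lon, Lat float64
}

// handleInfo returns a status string instead of panicking on unknown messages.
func handleInfo(message Term) string {
	switch msg := message.(type) {
	case PositionUpdate:
		return fmt.Sprintf("store %s at [%g %g]", msg.ID, msg.Lon, msg.Lat)
	case *PositionUpdate:
		if msg == nil {
			return "ignore: nil update"
		}
		return handleInfo(*msg)
	default:
		return fmt.Sprintf("ignore: unexpected %T", message)
	}
}

func main() {
	fmt.Println(handleInfo(PositionUpdate{ID: "v1", Lon: 18.06, Lat: 59.33}))
	fmt.Println(handleInfo("hello"))
}
```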
The application also offers an HTTP API that an imaginary frontend, dashboard or BI system could use to query vehicle geospatial data, for example to get the 100 vehicles closest to a specified lat/lon position.
The API is implemented using a gen.WebBehavior actor whose gen.WebHandlerBehavior provides callback methods both for Ergo-type messaging and for the expected web-specific ones, such as HandleRequest(process *gen.WebHandlerProcess, request gen.WebMessageRequest). Note that this Web/HTTP API does not follow the Go stdlib HandlerFunc(rw http.ResponseWriter, req *http.Request) pattern. Instead, both the underlying HTTP request and the HTTP response we write to are accessed through the gen.WebMessageRequest.
On a higher level, I decided to make the REST API rather scalable by using another worker-style pattern for the actual querying:
By using the synchronous Call pattern and addressing the "queries_workers" worker pool directly, we let Ergo have one of the workers handle our request and return the response directly.
elems, err := process.Call("queries_workers", events.NearbyQueryEventMessage{Lon: lon, Lat: lat})
Inside the "queries_workers" actors, the HandleWorkerCall(process *gen.PoolWorkerProcess, message etf.Term) etf.Term callback is invoked whenever a Call is made. The implementation is rather straightforward, since each "queries worker" has a *buntdb.DB member allowing access to the database.
func (w *QueriesWorker) HandleWorkerCall(process *gen.PoolWorkerProcess, message etf.Term) etf.Term {
query := message.(events.NearbyQueryEventMessage)
elems := make([]events.VehiclePosition, 0)
// Call DB using read-only "view" transaction
if err := w.db.View(func(tx *buntdb.Tx) error {
return tx.Nearby("fleet", buntdb.Point(query.Lon, query.Lat), func(key, val string, dist float64) bool {
lon, lat, ok := coordToLonLat(val)
if !ok {
return false
}
// Convert to our output format. (Key is fleet:<uuid>:pos)
elems = append(elems, events.VehiclePosition{
ID: strings.Split(key, ":")[1],
Lon: lon,
Lat: lat,
})
return true
})
}); err != nil {
slog.Error("QueriesWorker query error", slog.Any("error", err))
}
return elems
}
Note that:
- We type-assert the etf.Term to our events.NearbyQueryEventMessage.
- We use a db.View read-only transaction. This is important since BuntDB allows concurrent read access but only a single writer. By using a read-only tx, we'll get much better performance.
- We return a []events.VehiclePosition while the method signature has etf.Term as its return type. I wonder if this could be made more explicit with generics? The issue is probably the (lack of) support for method-level generics in Go.
All in all, this conveniently allows us to access the []events.VehiclePosition directly in the Web Actor, where a simple json.Marshal(...) lets us return the contents as JSON to the caller.
data, err := json.Marshal(elems) // <- elems were returned from the queries worker
if err != nil {
request.Response.WriteHeader(http.StatusInternalServerError)
return gen.WebHandlerStatusDone // the error response has been written
}
request.Response.Header().Set("Content-Type", "application/json")
request.Response.Header().Set("Content-Length", fmt.Sprintf("%d", len(data)))
request.Response.WriteHeader(http.StatusOK)
_, _ = request.Response.Write(data)
return gen.WebHandlerStatusDone
If you want to try this, here’s how to do it (if you have Go 1.21 or later installed on your system):
git clone https://github.com/eriklupander/ergo-trafficapp.git
cd ergo-trafficapp
go run cmd/*.go -level debug
The console will print something similar to:
9:05PM INF trafficapp/dispatcher.go:22 Init process: <99C75413.0.1011> with name "dispatcher" and args []
9:05PM INF trafficapp/emergency.go:24 Init process: <99C75413.0.1012> with name "emergency" and args []
9:05PM INF trafficapp/storage.go:31 Init process: <99C75413.0.1013> with name "storage" and args []
... rest truncated
Since there’s no external database and Ergo provisions a self-signed certificate for HTTPS, it’s very easy to launch the application.
As a reminder, the Traffic Application has two APIs - one UDP API that ingests “traffic events” and a REST API for queries.
With the UDP listener listening at port 9092, the easiest way to submit a sample traffic message encoded with msgpack is to use netcat to transmit packets. A sample call that sends a pre-baked base64-encoded msgpack packet:
$ echo "hKJJRNkkNGUwOTEzOTYtNjJhZS00ZjNiLTg3MTUtN2I0ZmM5YzY0ZDE0o0xvbsvAU1R+Qxkx1qNMYXTLwEI9Ghs6ciCpRW1lcmdlbmN5wg==" | \
base64 -d | \
nc -u -w1 localhost 9092
The log should print something similar to:
10:47PM INF cmd/udpreceiver_handler.go:25 [UDP handler] got message queue_len=0 process_id=<99C75413.0.1027>
However, we want to be able to put this thing under load! Therefore, I implemented a really simple fake traffic generator in Go. You can find the source under /tools/udpclient in the source code repo.
The client is really easy to use (you need to have Go installed). The first parameter is host:port of the Traffic Application, the second the number of workers that will send a UDP traffic event per second (with a bit of randomness):
$ go run main.go localhost:9092 10
UDP packets sent: 0 ... 40 ... 90 ... 140 ... 190 ... 240 ... 290 ..
The program runs for an hour, or until Ctrl+C is pressed. There should be a lot of log messages in the traffic-app server log, and the database should have some entries stored. Play around with the number of workers. At some point you'll probably run into issues with insufficient OS file handles in the client, or full-mailbox errors on the server.
Our HTTP API listens at :9090/ and accepts lat/lon coordinates as query-parameters. A sample query using curl where we use jq to pretty-print the JSON response:
curl -k "https://localhost:9090/?lon=43.5454&lat=76.232" | jq .
[
{
"ID": "cfca6d73-143a-48f6-a0bd-fb17bbd1f7f8",
"Lon": 43.379289410669,
"Lat": 76.22768021122108,
"Date": 1701114364134
},
{
"ID": "d4cd55bd-832d-465f-88f3-c3beba88e5e4",
"Lon": 43.50006737771133,
"Lat": 76.69090415181678,
"Date": 1701114364134
}, ... rest truncated
Behind the scenes, our Web actor receives the HTTP request and reads the query params into float64 lat and lon variables. It then uses process.Call to perform a request/reply with the queries_workers worker pool, marshals the result into JSON, and writes it to the HTTP response.
A neat thing here is how Ergo automatically load-balances the Call messages from the Web actor across the available workers. If we call the API three times with debug-level logging enabled, we can see in the (truncated) logs that workers are picked in a round-robin fashion:
QueriesWorker received Call request process_id=<99C75413.0.1037>
QueriesWorker received Call request process_id=<99C75413.0.1038>
QueriesWorker received Call request process_id=<99C75413.0.1036>
(see process_id 1036, 1037, 1038)
We’ve already covered how the BuntDB is queried in section 2.5.1, so I guess this wraps up how to interact with our little demo application.
No, I won’t do a realistic load test of my Ergo app. What I can do, is to run some simple benchmarks locally on my 2019 MacBook Pro and see how many UDP packets per second we can ingest, and what kind of CPU and memory usage the Traffic Application will show.
Note that memory usage grows linearly with the number of vehicles ingested, since BuntDB keeps all data in memory. It also flushes the DB's state to disk so data can be kept between restarts. However, the DB file on disk is removed after each test round to keep the tests fair. Also, since BuntDB executes in-process, the benchmark may not be representative of Ergo performance: quite a lot of CPU cycles may be spent writing to the DB and keeping its indexes up to date.
| UDP packets/s | CPU | Memory | Errors | Notes |
|---|---|---|---|---|
| 0 | 0.2% | 18 MB | None | Starts at approx. 12 MB, grows to ~18 MB during the first 10 minutes. |
| 50 | 3-4% | 25 MB | None | |
| 500 | 30-35% | 27 MB | None | |
| 5000 | 145-150% | 37 MB | None | Need to increase the mailbox size of various actors. |
| 15000 | 300+% | 90+ MB | Yes | Client crashes with "write: no buffer space available" |
Is the performance "good"? I have no idea. This is, after all, tested on a single computer with an embedded DB. But at least it's not horrible, and up to 5000 UDP messages/s can be handled just fine, including storing them in the DB.
Running REST queries using curl while ingesting 5000 events/s works fine, with nice low latency:
9:32PM INF web/web_handler.go:66 WebHandler - served query duration=88.871µs vehicle_count=10 lon=43.5454 lat=76.232
Also, keep in mind that by default Ergo starts dropping messages when an actor's mailbox is full. I had to tweak the mailbox sizes in my gen.ProcessOptions once I started adding even moderate loads.
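Drop-when-full behaviour is easy to reproduce with a buffered channel and a non-blocking send. This hypothetical sketch is not Ergo's code. It shows why a small mailbox loses messages during a burst while a larger one absorbs it, as long as the consumer eventually catches up. The burst size of 5000 is borrowed from the benchmark above.

```go
package main

import "fmt"

// tryDeliver performs a non-blocking send: if the mailbox is full,
// the message is dropped and false is returned.
func tryDeliver(mailbox chan<- int, msg int) bool {
	select {
	case mailbox <- msg:
		return true
	default:
		return false
	}
}

// burst sends n messages to a mailbox of the given size while no consumer
// is running, and reports how many messages were dropped.
func burst(mailboxSize, n int) (dropped int) {
	mailbox := make(chan int, mailboxSize)
	for i := 0; i < n; i++ {
		if !tryDeliver(mailbox, i) {
			dropped++
		}
	}
	return dropped
}

func main() {
	fmt.Println("mailbox 100:", burst(100, 5000), "dropped")     // 4900
	fmt.Println("mailbox 10000:", burst(10000, 5000), "dropped") // 0
}
```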
As stated at the start of this taking-it-for-a-spin blog post, I'm a novice when it comes to actor-based architecture, Erlang etc. I attended a few talks on Akka around 2010. I found the concept of Actors intriguing in that pre-microservices era of Application Servers, .ear files and J2EE, but I never got a chance to try it out for real. Fast-forward more than a decade, and the Application Server is not exactly bleeding edge anymore. The microservices paradigm of isolated containers, container orchestrators, service meshes etc. has transformed how we build, deploy, scale and manage our applications. In a way, I can see how the actor programming model and OTP architecture were actually a precursor to microservices. But where does that leave actors in 2023 and beyond? Ergo Framework and similar frameworks offer isolation, message handling, supervisor hierarchies etc. in a well-defined and proven programming model. But these days, so do many containerized and orchestrated platforms built around message brokers, non-blocking IO and the full plethora of observability tooling now available. I'm wondering: what do I get with something like Ergo Framework that I won't get by running (well-done) microservices on Kubernetes?
This blog post won't (and can't) answer that. Ergo claims to support "Cloud Overlay Networking". I can imagine an actor framework that plays really nicely with the Kubernetes API, plus something like Terraform, combining the best of both worlds. "Actors" could be provisioned automatically in containers, with all the observability, availability, traffic monitoring, ingress/egress handling etc. required in the 2020s.
As for Ergo Framework in particular, I think it seems to do what it advertises on the localhost level, with a clean programming model, good defaults and useful actor types available out-of-the box. However, I’m not sure how I would go about making a production-ready solution running in the cloud using “native” Ergo. I think the documentation is a bit lacking beyond the “hello world”-level, and the godocs have a lot of holes. Still, if I were to design an event-driven Go-based system where excellent performance characteristics were important, I’d love to give Ergo a serious chance.
Thanks for reading! // Erik Lupander