An Introduction To gosiris, An Actor Framework For Go

This post is an introduction to an actor framework for Golang: gosiris. First of all, I will introduce the context, then we will dig into the framework and its capabilities.

Go

Go (or golang) is a programming language created at Google and announced in 2009. It is compiled, statically typed, not object-oriented (even though it provides interfaces to define a set of methods) and has a garbage collector.

Initially, I started to learn Go because of its capabilities in terms of concurrent programming.

I think Node.js is not the best system to build a massive server web, I would use Go for that – Ryan Dahl, creator of Node.js

Actually, Go's concurrency model is based on the Communicating Sequential Processes (CSP) principle. In a nutshell, CSP is a concurrency model that avoids sharing memory across processes. Instead, the idea is to implement sequential processes and to have communication channels between these processes.

This principle is summarized by Rob Pike, co-creator of Go:

Don’t communicate by sharing memory; share memory by communicating – Rob Pike
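To make the principle concrete, here is a minimal sketch in plain Go (standard library only; the function names produce and sum are illustrative). Two sequential processes share no variable and only exchange values over a channel:

```go
package main

import "fmt"

// produce sends the integers 1..n on out, then closes it.
func produce(n int, out chan<- int) {
	for i := 1; i <= n; i++ {
		out <- i
	}
	close(out)
}

// sum receives values until in is closed and returns their total.
func sum(in <-chan int) int {
	total := 0
	for v := range in {
		total += v
	}
	return total
}

func main() {
	ch := make(chan int) // unbuffered: each send waits for a receive
	go produce(5, ch)
	fmt.Println(sum(ch)) // 15
}
```

The running total lives in a single goroutine, so no lock is needed.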

The actor model

The core idea of the actor model is the same as that of CSP, namely to avoid shared memory between processes and favor message passing instead. Yet there are two main differences.

The first one is that CSP is purely synchronous, meaning a channel writer is blocked until a channel reader reads the message. This limitation was tackled in Go, though, with the introduction of buffered channels, on which a write blocks only when the buffer is full.
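The difference between the two kinds of channel can be observed with a small sketch (plain Go; trySend is an illustrative helper, not a standard function). It attempts a send without waiting and reports whether the channel accepted the value:

```go
package main

import "fmt"

// trySend attempts a send without waiting and reports whether it succeeded.
func trySend(ch chan<- string, msg string) bool {
	select {
	case ch <- msg:
		return true
	default:
		return false
	}
}

func main() {
	unbuffered := make(chan string)
	buffered := make(chan string, 2)

	// No reader is waiting, so a send on the unbuffered channel would block.
	fmt.Println(trySend(unbuffered, "a")) // false

	// The buffered channel accepts sends until its capacity is reached.
	fmt.Println(trySend(buffered, "a")) // true
	fmt.Println(trySend(buffered, "b")) // true
	fmt.Println(trySend(buffered, "c")) // false: buffer full
}
```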

The second main difference is that in the CSP model, the processes are somewhat anonymous. A channel writer only has a reference to a channel, without actually knowing who will receive the message. The actor model, though, is a point-to-point integration. An actor has a reference to another actor (through its identifier for example) to communicate with it.
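As a rough illustration of addressing by identifier, the sketch below builds a tiny registry of named mailboxes in plain Go. It is not how gosiris is implemented; the registry, spawn and tell names are hypothetical and chosen for this example only:

```go
package main

import (
	"fmt"
	"sync"
)

// message carries a payload and the identifier of its sender.
type message struct {
	sender string
	data   string
}

// registry maps actor identifiers to mailboxes (hypothetical, not gosiris).
type registry struct {
	mu        sync.RWMutex
	mailboxes map[string]chan message
}

func newRegistry() *registry {
	return &registry{mailboxes: make(map[string]chan message)}
}

// spawn registers an actor under name and runs handle for each message.
func (r *registry) spawn(name string, handle func(message)) {
	mailbox := make(chan message, 16)
	r.mu.Lock()
	r.mailboxes[name] = mailbox
	r.mu.Unlock()
	go func() {
		for m := range mailbox {
			handle(m)
		}
	}()
}

// tell delivers data to the actor named to; it returns false if no such
// actor is registered.
func (r *registry) tell(to, from, data string) bool {
	r.mu.RLock()
	mailbox, ok := r.mailboxes[to]
	r.mu.RUnlock()
	if !ok {
		return false
	}
	mailbox <- message{sender: from, data: data}
	return true
}

func main() {
	r := newRegistry()
	done := make(chan string)
	r.spawn("child", func(m message) {
		done <- fmt.Sprintf("%s says: %s", m.sender, m.data)
	})
	r.tell("child", "parent", "Hi!")
	fmt.Println(<-done) // parent says: Hi!
}
```

The sender never touches the receiver's channel directly: it only knows a name, which is what distinguishes this style from passing a channel around.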

Most actor frameworks also implement a hierarchy concept between the different actors. This simple idea is actually really powerful. A parent actor, for example, could be directly notified of a child actor failure and then decide on a failure strategy (Restart the child? Fail itself to implement a kind of circuit breaker? Etc.).
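A restart strategy can be sketched in a few lines of plain Go. This is only an illustration of the idea, under the assumption that a child is a function returning an error; exitNotice, runChild and supervise are hypothetical names, not part of any framework:

```go
package main

import (
	"errors"
	"fmt"
)

// errBoom is the failure used by the example child.
var errBoom = errors.New("boom")

// exitNotice is what a parent receives when a child stops.
type exitNotice struct {
	child string
	err   error // nil when the child finished normally
}

// runChild executes work in its own goroutine and notifies the parent
// of the outcome.
func runChild(name string, work func() error, parent chan<- exitNotice) {
	go func() {
		parent <- exitNotice{child: name, err: work()}
	}()
}

// supervise restarts a failing child up to maxRestarts times. It returns
// the number of restarts performed and the last error (nil on success).
func supervise(name string, work func() error, maxRestarts int) (int, error) {
	notices := make(chan exitNotice)
	restarts := 0
	for {
		runChild(name, work, notices)
		n := <-notices
		if n.err == nil || restarts == maxRestarts {
			return restarts, n.err
		}
		restarts++ // strategy chosen here: restart the child
	}
}

func main() {
	attempts := 0
	flaky := func() error {
		attempts++
		if attempts < 3 {
			return errBoom
		}
		return nil
	}
	restarts, err := supervise("child", flaky, 5)
	fmt.Println(restarts, err) // 2 <nil>
}
```

Returning the error once the restart budget is exhausted is the point where a parent could decide to fail itself instead.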

gosiris

gosiris is an actor framework for Go allowing local or remote communications between actors and providing runtime discoverability plus distributed tracing.

The remote communications can be done using either an AMQP broker or Kafka. The runtime discoverability is achieved using an etcd registry and the distributed tracing is based on Zipkin.

Hello world example

We will see in the following example how to implement a local send-only interaction.

The first step is to create an actor system:

gosiris.InitActorSystem(gosiris.SystemOptions{
    ActorSystemName: "ActorSystem",
})
defer gosiris.CloseActorSystem()

We will see later on that an actor system can be distributed by simply configuring some options.

Then it’s time to create and register an actor:

parentActor := gosiris.Actor{}
defer parentActor.Close()

gosiris.ActorSystem().RegisterActor("parentActor", &parentActor, nil)

Each actor has a logical name, here parentActor.

Then we will create and register a child actor but we will implement a specific behavior on a given message type (message):

childActor := gosiris.Actor{}
defer childActor.Close()

childActor.React("message", func(context gosiris.Context) {
    context.Self.LogInfo(context, "Received %v\n", context.Data)
})

gosiris.ActorSystem().SpawnActor(&parentActor, "childActor", &childActor, nil)

An actor can be composed of multiple reactions. Each reaction is based on an event type and must define a specific behavior by implementing a simple function with a context parameter.

Finally, to send a message from the parent to the child:

parentActorRef, _ := gosiris.ActorSystem().ActorOf("parentActor")
childActorRef, _ := gosiris.ActorSystem().ActorOf("childActor")

childActorRef.Tell(gosiris.EmptyContext, "message", "Hi! How are you?", parentActorRef)

As you can see, we have not used the parentActor or childActor variable to send a message. This is not possible in gosiris. Instead we have done a lookup first in the actor system using the actor identifiers (ActorOf(string)). Each lookup returns an ActorRef structure which is simply a reference to an actor.

As a summary:

package main

import (
    "gosiris/gosiris"
)

func main() {
    gosiris.InitActorSystem(gosiris.SystemOptions{
        ActorSystemName: "ActorSystem",
    })
    defer gosiris.CloseActorSystem()

    parentActor := gosiris.Actor{}
    defer parentActor.Close()

    childActor := gosiris.Actor{}
    defer childActor.Close()
    childActor.React("message", func(context gosiris.Context) {
        context.Self.LogInfo(context, "Received %v\n", context.Data)
    })

    gosiris.ActorSystem().RegisterActor("parentActor", &parentActor, nil)
    gosiris.ActorSystem().SpawnActor(&parentActor, "childActor", &childActor, nil)

    parentActorRef, _ := gosiris.ActorSystem().ActorOf("parentActor")
    childActorRef, _ := gosiris.ActorSystem().ActorOf("childActor")

    childActorRef.Tell(gosiris.EmptyContext, "message", "Hi! How are you?", parentActorRef)
}

Stateful actor

It is also possible to manage a stateful actor like in the following example:

type StatefulActor struct {
    gosiris.Actor
    someState interface{}
}

statefulActor := new(StatefulActor).React("someEvent", func(context gosiris.Context) {
    //Some behavior
})

Actor supervision

As we’ve seen, one of the benefits of the actor model is the ability to organize actors in a hierarchy. In the following example, a parent actor is automatically notified of a child failure:

actor.React(gosiris.GosirisMsgChildClosed, func(context gosiris.Context) {
    context.Self.LogInfo(context, "My child is closed")
})

The parent actor can then decide on the strategy to adopt and decide for example to stop itself:

context.Self.AskForClose(context.Self)

Become/unbecome

In gosiris, it is also possible to implement the become/unbecome pattern familiar in Akka for example:

angry := func(context gosiris.Context) {
    if context.Data == "happy" {
        context.Self.LogInfo(context, "Unbecome\n")
        context.Self.Unbecome(context.MessageType)
    } else {
        context.Self.LogInfo(context, "Angrily receiving %v\n", context.Data)
    }
}

happy := func(context gosiris.Context) {
    if context.Data == "angry" {
        context.Self.LogInfo(context, "I shall become angry\n")
        context.Self.Become(context.MessageType, angry)
    } else {
        context.Self.LogInfo(context, "Happily receiving %v\n", context.Data)
    }
}

actor := gosiris.Actor{}
defer actor.Close()
actor.React("context", happy)

At first, the actor is configured to react with the happy behavior. Then, depending on the message it receives, it may change its behavior at runtime: it becomes angry, and it later unbecomes to return to happy.
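The mechanics behind become/unbecome can be pictured as a stack of behaviors. The following plain Go sketch does not use gosiris and makes no claim about its internals; the actor type and its methods are hypothetical and exist only to show the pattern:

```go
package main

import "fmt"

// behavior handles one message and returns a description of what it did.
type behavior func(a *actor, data string) string

// actor keeps a stack of behaviors; the top of the stack is the active one.
type actor struct {
	behaviors []behavior
}

// become pushes a new active behavior.
func (a *actor) become(b behavior) { a.behaviors = append(a.behaviors, b) }

// unbecome pops the active behavior, restoring the previous one.
// The initial behavior is never removed.
func (a *actor) unbecome() {
	if len(a.behaviors) > 1 {
		a.behaviors = a.behaviors[:len(a.behaviors)-1]
	}
}

// receive dispatches data to the active behavior.
func (a *actor) receive(data string) string {
	return a.behaviors[len(a.behaviors)-1](a, data)
}

func angry(a *actor, data string) string {
	if data == "happy" {
		a.unbecome()
		return "unbecome"
	}
	return "angrily receiving " + data
}

func happy(a *actor, data string) string {
	if data == "angry" {
		a.become(angry)
		return "I shall become angry"
	}
	return "happily receiving " + data
}

func main() {
	a := &actor{behaviors: []behavior{happy}}
	for _, msg := range []string{"hello", "angry", "hello", "happy", "hello"} {
		fmt.Println(a.receive(msg))
	}
}
```

The same "hello" message is handled happily, then angrily, then happily again, depending on which behavior is on top of the stack.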

Distributed example

Let’s see now how to create a distributed actor system:

gosiris.InitActorSystem(gosiris.SystemOptions{
    ActorSystemName: "ActorSystem",
    RegistryUrl:     "http://etcd:2379",
    ZipkinOptions: gosiris.ZipkinOptions{
        Url:      "http://zipkin:9411/api/v1/spans",
        Debug:    true,
        HostPort: "0.0.0.0",
        SameSpan: true,
    },
})
defer gosiris.CloseActorSystem()

Here we referenced an etcd server for the runtime discoverability and a Zipkin server for distributed tracing.

In addition, we will implement a request/reply interaction with one actor deployed on AMQP while the other will be deployed on Kafka:

actor1 := new(gosiris.Actor).React("reply", func(context gosiris.Context) {
    context.Self.LogInfo(context, "Received: %v", context.Data)

})
defer actor1.Close()
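// In Go, a chained call must end each line with the dot; a line starting
// with a dot does not compile because of automatic semicolon insertion.
gosiris.ActorSystem().RegisterActor("actor1", actor1, new(gosiris.ActorOptions).
    SetRemote(true).
    SetRemoteType(gosiris.Amqp).
    SetUrl("amqp://guest:guest@amqp:5672/").
    SetDestination("actor1"))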

actor2 := new(gosiris.Actor).React("context", func(context gosiris.Context) {
    context.Self.LogInfo(context, "Received: %v", context.Data)
    context.Sender.Tell(context, "reply", "hello back", context.Self)
})
defer actor2.Close()
gosiris.ActorSystem().SpawnActor(actor1, "actor2", actor2, new(gosiris.ActorOptions).
    SetRemote(true).
    SetRemoteType(gosiris.Kafka).
    SetUrl("kafka:9092").
    SetDestination("actor2"))

We simply changed the calls to RegisterActor() and SpawnActor() by adding actor options. Once an actor is defined as remote, we must at least configure the transport type (AMQP or Kafka), a URL and a destination. Every time a new actor is registered, each actor system will receive a notification and modify its internal actor map.

Last but not least, because the actor system has been initialized using a Zipkin reference, gosiris will also manage the distributed tracing part. It means each time an interaction is started (a tell with an empty context), a new parent span will be initialized. Then each reaction automatically becomes a child span and all logs are also forwarded to the Zipkin collector.

Conclusion

As you have seen in the last example, in fewer than 30 effective lines of code, we have used gosiris to implement a distributed request/reply interaction between two remote actors that are automatically discovered at runtime and traced with Zipkin.

If you are interested in gosiris, feel free to contact me @teivah.

Event Sourcing And Concurrent Updates

When we have to deal with concurrent updates, we generally think about two possibilities:

  • Pessimistic locking: an object is explicitly locked by a system
  • Optimistic concurrency control: the system determines whether another client has changed the object before updating it. There are several possible implementations, like using a compare-and-swap (CAS) or providing an expected version of the object.
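The optimistic variant with an expected version can be sketched with a compare-and-swap from Go's sync/atomic package. The versioned type is a hypothetical stand-in for an object whose only state is its version:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// versioned holds a version counter updated with compare-and-swap.
type versioned struct {
	version int64
}

// update succeeds only if the caller's expected version is still current;
// on success the version is incremented.
func (v *versioned) update(expected int64) bool {
	return atomic.CompareAndSwapInt64(&v.version, expected, expected+1)
}

func main() {
	obj := &versioned{version: 2}
	fmt.Println(obj.update(2)) // true: version becomes 3
	fmt.Println(obj.update(2)) // false: another client already moved to 3
}
```

The second caller has to reload the object and retry, which is the retry cost discussed next.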

The higher the chance of a concurrent update, the more it makes sense to favor a pessimistic locking approach. Indeed, if an optimistic transaction fails, retrying it may be very expensive (back and forth to the client, delay, retry etc.).

In addition, an Event Sourcing strategy can be a good solution to deal with highly concurrent environments.

A common strategy with Event Sourcing is to implement something on top of an optimistic concurrency control. For example, a client will request a change on a given object by providing an expected version. If the request fails because the current version is greater than the expected one, instead of naively throwing an error, the system can:

  • First, retrieve the event(s) persisted since the expected version
  • Then check whether these event(s) are really conflicting from a business perspective

Let’s take the following example:

In black, the events already persisted (CustomerCreated and FamilySituationUpdated). At this moment, the current version of the customer is v2.

Let’s now imagine the system receives two concurrent updates with the same expected version (v3). The first event, ContactInfoUpdated, will be persisted because the version matches, but what about the NewContractSubscribed one?

As we described above, the system can retrieve all the events since v3 (here only ContactInfoUpdated) and determine whether they are actually conflicting or not. In this example, we can imagine that adding a new contract does not conflict with an update of the contact information, so the system will commit the transaction.

To determine whether an event is in conflict with a set of already persisted events, there are several strategies.

We can, for example, implement a blacklist strategy. Each event type also registers a set of conflicting event types. If any of these events is found, the transaction is not committed.
Furthermore, we can also decide to do it with a custom conflict checker function. Bear in mind, though, that one of the benefits of this strategy is to be faster than a simple optimistic concurrency one. If our function takes too long to execute (and the chances of getting concurrent updates are high), it will impact the system throughput.
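A blacklist strategy could look like the following Go sketch. It is an in-memory illustration only: the stream type, the commit method and the entries of the conflicts table (including the CustomerDeleted event type) are assumptions made for this example, and the expected version is taken to be the last version the client has read:

```go
package main

import "fmt"

// event is a persisted domain event with the stream version it produced.
type event struct {
	kind    string
	version int
}

// conflicts lists, for each event type, the event types it cannot be
// committed after (illustrative entries, not taken from a real system).
var conflicts = map[string][]string{
	"ContactInfoUpdated":    {"ContactInfoUpdated"},
	"NewContractSubscribed": {"CustomerDeleted"},
}

// stream is an in-memory event stream for a single object.
type stream struct {
	events []event
}

// since returns the events persisted after the given version.
func (s *stream) since(version int) []event {
	var out []event
	for _, e := range s.events {
		if e.version > version {
			out = append(out, e)
		}
	}
	return out
}

// commit persists kind if no event persisted since the expected version
// is blacklisted for it. When the expected version is current, there is
// nothing to check and the event is always persisted.
func (s *stream) commit(kind string, expected int) bool {
	for _, e := range s.since(expected) {
		for _, blocked := range conflicts[kind] {
			if e.kind == blocked {
				return false
			}
		}
	}
	s.events = append(s.events, event{kind: kind, version: len(s.events) + 1})
	return true
}

func main() {
	s := &stream{events: []event{
		{kind: "CustomerCreated", version: 1},
		{kind: "FamilySituationUpdated", version: 2},
	}}
	// Both clients read the customer at version 2.
	fmt.Println(s.commit("ContactInfoUpdated", 2))    // true
	fmt.Println(s.commit("NewContractSubscribed", 2)) // true: no conflict
	fmt.Println(s.commit("ContactInfoUpdated", 2))    // false: conflicting
}
```

A custom conflict checker would replace the table lookup with a function call, at the cost of whatever time that function needs.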

Another possible strategy to deal with concurrent updates could be to vary the data model granularity. For example, instead of having a single event stream for a customer, we could split up its representation into several streams (e.g. GeneralInformation and ContractInformation). Then we will have to apply an optimistic concurrency control for each stream (GeneralInformation has its own version and ContractInformation also has its own version).
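A minimal sketch of per-stream versioning, assuming a hypothetical customer type that holds one version counter per stream name:

```go
package main

import "fmt"

// customer keeps one version counter per stream instead of a single one.
type customer struct {
	versions map[string]int
}

// commit applies an update to one stream if expected matches that
// stream's current version; other streams are not consulted.
func (c *customer) commit(stream string, expected int) bool {
	if c.versions[stream] != expected {
		return false
	}
	c.versions[stream]++
	return true
}

func main() {
	c := &customer{versions: map[string]int{
		"GeneralInformation":  1,
		"ContractInformation": 1,
	}}
	fmt.Println(c.commit("GeneralInformation", 1))  // true
	fmt.Println(c.commit("ContractInformation", 1)) // true: independent stream
	fmt.Println(c.commit("GeneralInformation", 1))  // false: stale version
}
```

Two updates that would have collided on a single customer version now go through, because they target different streams.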

Yet, this solution may have some limitations. Indeed we must guarantee the conflict resolution is stream-autonomous, meaning the events stored in one stream are enough to guarantee the integrity of our whole object.

Conversely, a coarse-grained representation (e.g. one stream for a single customer) may trigger the conflict resolution mechanism too often (and once again impact the system throughput). Therefore the granularity decision remains a very important one, regardless of our strategy.

Running Kafka 1.0 In Docker

With the recent release of Kafka 1.0.0, I have just released the Docker image teivah/kafka (forked from spotify/kafka).

You simply need to run the following command by replacing <DOCKER_HOST> with your Docker host IP:

docker run -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=<DOCKER_HOST> --env ADVERTISED_PORT=9092 teivah/kafka

The Docker image will instantiate a Zookeeper server (port 2181) and a Kafka 1.0.0 broker (port 9092).

To create a Kafka producer, you must run the following command inside the Docker container:

/opt/kafka_2.11-1.0.0/bin/kafka-console-producer.sh --broker-list <DOCKER_HOST>:9092 --topic test

And for a Kafka consumer:

/opt/kafka_2.11-1.0.0/bin/kafka-console-consumer.sh --bootstrap-server <DOCKER_HOST>:9092 --topic test

An Introduction To Reactive Streams

As you may know, Java 9 is about to be released on September 21st. Among many new features, the Reactive Streams integration was probably the one I was expecting the most. The goal of this very post is to introduce the original Reactive Streams initiative. As a second step in another post, I will describe what Java 9 will bring.

The original initiative

Reactive Streams is an initiative started in 2013 with different companies like Netflix, Pivotal, Lightbend etc. The core goal is to provide a standard for asynchronous stream processing with non-blocking back pressure.

To explain what the concept of back pressure is, let us take a concrete example. Your organization is composed of legacy applications. Some of them are, most likely, running on old technologies with limited scalability and performance capabilities. Then you put in place a new revolutionary system, capable of handling thousands of messages per second. If this new system communicates synchronously with some of your legacy applications (without any throttling mechanism provided by a service gateway in between), those applications could simply crash at some point due to the number of incoming requests sent by the new system. This scenario might seem a bit far-fetched but actually, I did experience it.

To tackle this problem, the core idea of back pressure is to not force a subscriber system to buffer arbitrary amounts of data sent by a publisher.

As mentioned, the back pressure problem could also be tackled by using a service gateway in between, or by using messaging middleware. For example, if you have a JMS middleware in between a publisher and a subscriber, the request buffering is not done by the subscriber itself. Yet from my perspective, the main benefit of Reactive Streams is the capability to achieve back pressure without being forced to use any middleware.

Basically, a subscriber can indicate directly to a publisher how many messages it will be able to manage:

  • During the initial subscription
  • And each time a new message is received (received does not necessarily mean processed)
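The demand signalling described in the two points above can be sketched with Go channels. This is not the Reactive Streams API, only an illustration of the idea; publish and consume are hypothetical names, and for brevity the subscriber renews its demand once per batch rather than after every message:

```go
package main

import "fmt"

// publish emits items on out, but never more than the subscriber has
// requested through demand. It closes out once all items are sent.
func publish(items []int, demand <-chan int, out chan<- int) {
	defer close(out)
	next := 0
	for n := range demand {
		for ; n > 0 && next < len(items); n-- {
			out <- items[next]
			next++
		}
		if next == len(items) {
			return
		}
	}
}

// consume requests batch items at a time and returns everything received.
func consume(items []int, batch int) []int {
	if batch < 1 {
		batch = 1 // a subscriber must request at least one item
	}
	demand := make(chan int, 1)
	out := make(chan int)
	go publish(items, demand, out)

	var received []int
	demand <- batch // initial subscription: announce capacity
	pending := batch
	for v := range out {
		received = append(received, v)
		pending--
		if pending == 0 {
			demand <- batch // ask for more only when ready
			pending = batch
		}
	}
	return received
}

func main() {
	fmt.Println(consume([]int{1, 2, 3, 4, 5}, 2)) // [1 2 3 4 5]
}
```

At no point does the publisher hold more than one batch of unrequested work for the subscriber, which is the property back pressure is after.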

We could also argue that the buffering problem is simply shifted somewhere else, to the publisher side. As an example, if the publisher is a GUI application which manages incoming user requests, then it is up to this very application to handle the buffering. From a design point of view, it does make sense to buffer them on the publisher and not on the subscriber. Furthermore, in the case of a synchronous interaction, some frameworks only release the initial request object from the publisher JVM (meaning it becomes available for garbage collection) once the reply has been sent by the subscriber. So in this case, the buffering is even done twice.

The Reactive Streams API

The Reactive Streams working group released an API for several languages (Java and .NET; the JavaScript API has not been released yet). This API is simply composed of four different interfaces (that belong to the org.reactivestreams package for Java):

Subscription

A simple interface acting as a kind of channel description and providing two methods:

  • cancel(): Used by a subscriber to cancel its subscription
  • request(long): Used by a subscriber to request n messages from a Publisher

Publisher

A generic interface providing a single subscribe(Subscriber) method. At first glance, considering the original publish/subscribe pattern, it might seem strange that the subscribe method is exposed by the publisher and takes the subscriber as an argument. But this is actually the strength of Reactive Streams: the capability for a publisher to adapt its flow of data according to a subscriber.

Subscriber

A generic interface providing 4 methods:

  • onComplete(): Notify the subscriber when a publisher is closed
  • onError(Throwable): Notify the subscriber when a publisher is closed with an error state
  • onNext(T): Notify the subscriber that a message was sent. If everything goes right, after having processed a message, the subscriber will then usually invoke Subscription.request(long).
  • onSubscribe(Subscription): Notify the subscriber that a publisher just started a subscription (through the Publisher.subscribe(Subscriber) method). This is also a way for the subscriber to keep a reference on the Subscription object provided.

Processor

A simple interface extending the Publisher and Subscriber interfaces. In other words, a Processor can act as both a Publisher and a Subscriber.

API summary

Bear in mind that a data stream does not necessarily have an end. So the sequence of actions is usually the following:

onSubscribe onNext* (onError | onComplete)?
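To see this sequence in action, here is a hedged Go transliteration of the Subscription, Subscriber and Publisher interfaces. It is not the official API (which exists for Java and .NET); the Go interface shapes, slicePublisher and recorder are assumptions made for this sketch, and the implementation is synchronous, single-threaded and skips the validation a compliant implementation performs (for instance on non-positive requests):

```go
package main

import "fmt"

// Go transliteration of the Java interfaces, for illustration only.
type Subscription interface {
	Request(n int64)
	Cancel()
}

type Subscriber interface {
	OnSubscribe(s Subscription)
	OnNext(item string)
	OnError(err error)
	OnComplete()
}

type Publisher interface {
	Subscribe(s Subscriber)
}

// slicePublisher publishes a fixed list of items to each subscriber.
type slicePublisher struct{ items []string }

func (p slicePublisher) Subscribe(s Subscriber) {
	s.OnSubscribe(&sliceSubscription{items: p.items, sub: s})
}

// sliceSubscription tracks outstanding demand for one subscriber.
type sliceSubscription struct {
	items    []string
	sub      Subscriber
	demand   int64
	draining bool
	done     bool
}

// Request adds n to the demand and emits items while demand remains.
// The draining flag keeps re-entrant calls made from OnNext from recursing.
func (s *sliceSubscription) Request(n int64) {
	s.demand += n
	if s.draining || s.done {
		return
	}
	s.draining = true
	for s.demand > 0 && len(s.items) > 0 && !s.done {
		item := s.items[0]
		s.items = s.items[1:]
		s.demand--
		s.sub.OnNext(item)
	}
	s.draining = false
	if len(s.items) == 0 && !s.done {
		s.done = true
		s.sub.OnComplete()
	}
}

func (s *sliceSubscription) Cancel() { s.done = true }

// recorder requests one item at a time and records every signal it gets.
type recorder struct {
	subscription Subscription
	signals      []string
}

func (r *recorder) OnSubscribe(s Subscription) {
	r.subscription = s
	r.signals = append(r.signals, "onSubscribe")
	s.Request(1)
}

func (r *recorder) OnNext(item string) {
	r.signals = append(r.signals, "onNext("+item+")")
	r.subscription.Request(1)
}

func (r *recorder) OnError(err error) { r.signals = append(r.signals, "onError") }
func (r *recorder) OnComplete()       { r.signals = append(r.signals, "onComplete") }

func main() {
	r := &recorder{}
	slicePublisher{items: []string{"a", "b"}}.Subscribe(r)
	fmt.Println(r.signals) // [onSubscribe onNext(a) onNext(b) onComplete]
}
```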

So far, several projects have released a Reactive Streams compliant implementation, like Akka, Vert.x, Reactor, RxJava etc. Nonetheless, Java 9 itself is not directly compatible with the original API, as the four interfaces are redefined as nested interfaces of the java.util.concurrent.Flow class. I will describe this point in more detail in an upcoming post.

Reactive Streams IO

You may have noticed that so far I was solely speaking about an API, not a protocol (just like JMS vs AMQP for instance). And obviously an API does not bring cross-language interoperability.

To tackle this, another working group is (was?) in charge of defining a protocol on top of network protocols (either uni or bi-directional, like TCP, WebSockets, HTTP/2 etc.). Unfortunately it seems there is not much activity on the GitHub project even though the first RC was initially planned for 2015. We can easily imagine this is not an easy job at all, but I am looking forward to getting some news.

When to use Reactive Streams?

From my perspective, I can imagine two applications of Reactive Streams.

First, you have to protect a legacy application exposing a synchronous API (as described in the example above) from the rest of the world. You can implement a kind of Back Pressure Layer (BPL) acting as a facade on top of the legacy application. This is obviously not the best possible application, as the request buffering is still not kept at the publisher level. Yet this could be a smarter implementation compared to the dumb throttling policy you can implement with a service gateway.

Second, you want to implement a system based on reactive principles (responsiveness, resiliency, elasticity and message-driven). In that case, Reactive Streams might appear as a de facto standard for component interactions without being forced to use a load-centric service or messaging middleware.

Conclusion

Reactive Streams does appear as a compelling solution to handle back pressure. Furthermore, the fact that Java 9 integrated this concept is definitely a step ahead for the original Reactive Streams initiative.

In my next post, I am going to describe what will be brought by Java 9 in more detail.

Why Is A Canonical Data Model An Anti-Pattern

A canonical data model is defined in the Enterprise Integration Patterns as the solution to minimize dependencies when integrating applications that use different data formats. In other words, a component (an application or a service) should communicate with another component through a data format that would be independent of both component data formats.

Two things to highlight here. First I defined a component as either an application or a service because such models are/were used in the context of application integration and service orientation. Secondly, we are talking here about a concept which should be used solely as a transport data format. You should not use a canonical data format as the internal structure of your data store.

Theoretically the advantages are pretty obvious. A canonical data format reduces the coupling between applications, reduces the number of translations to be developed for integrating a set of components etc. Pretty interesting right? One single data model understandable by the whole IT landscape and a set of people (developers, system analysts, business stakeholders etc.) who can share the same vision of a given concept.

For practical purposes, implementing such models is rarely efficient though.

A person is not the same concept for a marketing and a support department in an insurance company. A flight for an air traffic management system has a different meaning depending on whether it was filed by a pilot or is an ongoing flight crossing your airspace. A PLM part has a completely different representation depending on whether it has just been designed or has to be maintained by a support team.

In most contexts, designing a canonical data model results in a large data model with a full set of optional attributes and very few mandatory ones (like the identifiers). Even though it was primarily designed to ease component integration, you will just complicate it. In the meantime your model will create a lot of frustration among the users because of its inherent complexity (in terms of utilization and management). Furthermore, regarding the coupling issue, you are just shifting it somewhere else. Instead of being coupled to one component data format, you become tightly coupled to one common data format that will be used by the whole IT landscape and subject to very frequent changes.

In a nutshell, in most contexts, a canonical data model should be considered as an anti-pattern. What is the alternative, then, given that you still want to minimize the dependencies between two components exchanging data?

Domain Driven Design (DDD) recommends introducing the concept of bounded context. A bounded context is simply an explicit context in which a model applies, with clear boundaries with other bounded contexts. Depending on your organization, a bounded context could refer to a functional domain in which an object is utilized, it could refer to the object state itself etc.

In that case the canonical data model as such would simply be the yellow part in the following diagram:

The intersection of A, B and C represents the set of attributes that must be there regardless of the context (basically the mandatory attributes of the previous large CDM). This part should still be carefully designed due to its central role. Yet it is important to remain pragmatic. If in your context it does not make sense to have such a common model, you should simply discard it.

What is also important is an intersection between two contexts (A and B, B and C, C and A). How to map one object representation from one context to another? What are the explicit attributes that should be shared across two contexts? What are the common business constraints between two contexts? These questions should still be answered by a transverse team but from a business perspective, it makes sense to raise them. That was not always the case with one single CDM shared across potentially opposite contexts.

Nonetheless, the parts which are not shared with other contexts (in white) should not be part of an enterprise data model. You should be pragmatic. For example, if a subset is specific to a given domain, it should be up to the domain experts to model it themselves.

One key challenge, though, is to identify those bounded contexts, and it might be worth recalling Conway’s law here:

“Any organization that designs a system will inevitably produce a design whose structure is a copy of the organization’s communication structure.”

The bounded contexts shall not be necessarily mapped onto the current organization. For example, a bounded context can encompass several departments. Breaking organizational silos should still be an objective for most companies.

In addition, DDD introduces the concept of Anti-Corruption Layer (ACL). This pattern can refer to a solution for a legacy migration (by introducing an intermediation layer between the old and the new system to prevent data quality issues etc.). But in our context, when we talk about corruption, it is related to the data modeling debt you can introduce to solve short-term problems.

Let us take the example of two systems in charge of managing a given state of a PLM part (a part is a physical item produced or purchased and then assembled, like a helicopter rotor for instance). One legacy system (let’s call it SystemA) is in charge of managing the design phase and you must implement a new system (let’s call it SystemZ) in charge of managing the maintenance phase. The whole IT application landscape shares the same common part identifier (partId) except for SystemA, which is not aware of it. Instead of the partId, SystemA manages its own identifier, systemAId. Because SystemZ needs to call SystemA using systemAId, a shortcut could be to integrate systemAId as part of the SystemZ data model.

This is a common mistake you should avoid. You simply corrupted your data model because of a short-term situation.

The ACL pattern could have been a solution here. SystemZ could have implemented its own data format (without any external corruption like the systemAId). Then it would have been up to an intermediation layer to manage the translation between the partId and the systemAId.
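As a rough sketch of such an intermediation layer in Go: every type and function below is hypothetical (antiCorruptionLayer, DesignStatus, the identifiers in the mapping table), and SystemA is replaced by a stub function. The point is only that the translation lives in one place, outside the SystemZ model:

```go
package main

import (
	"errors"
	"fmt"
)

// Part is SystemZ's own model: it only knows the shared partId.
type Part struct {
	PartID string
}

// errUnknownPart is returned when no mapping exists for a partId.
var errUnknownPart = errors.New("unknown partId")

// systemAClient stands in for the legacy system, addressed by systemAId.
type systemAClient func(systemAID string) string

// antiCorruptionLayer owns the partId -> systemAId translation so that
// systemAId never leaks into the SystemZ data model.
type antiCorruptionLayer struct {
	ids     map[string]string
	systemA systemAClient
}

// DesignStatus looks up the design status of a part in SystemA.
func (l antiCorruptionLayer) DesignStatus(p Part) (string, error) {
	systemAID, ok := l.ids[p.PartID]
	if !ok {
		return "", errUnknownPart
	}
	return l.systemA(systemAID), nil
}

func main() {
	acl := antiCorruptionLayer{
		ids: map[string]string{"part-42": "A-0007"},
		systemA: func(systemAID string) string {
			return "design of " + systemAID + " approved"
		},
	}
	status, err := acl.DesignStatus(Part{PartID: "part-42"})
	fmt.Println(status, err) // design of A-0007 approved <nil>
}
```

If SystemA is later retired or learns about partId, only the layer changes; the Part type stays as it is.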

Applied to our topic, the ACL pattern requires implementing a layer in between two different bounded contexts. A component is not aware of how to call another component which is not part of its own bounded context. Instead, a component is only aware of how to map its own data structure onto the data format of the bounded context it belongs to.

By the way, this is a rule of thumb: a component shall belong to only one bounded context. This is also why DDD is a great fit for microservices architecture. Because of the fine granularity, it is easier to comply with this rule.

To summarize, a canonical model as such should be considered as an anti-pattern in most cases. You should try to implement the bounded context concept meaning one model per context with explicit boundaries between the contexts. Two components that are part of different bounded contexts should communicate through an Anti-Corruption Layer to prevent data modeling corruption.