5.4 Actor-based Concurrency

The concurrency models we have considered so far have the notion of shared state in common. Shared state can be accessed by multiple threads at the same time and must thus be protected, either by locking or by using transactions. Mutability and sharing of state are not only inherent to these models, they are also the root of their complexity. We now look at an entirely different approach that bans the notion of shared state altogether. State is still mutable, but it is exclusively coupled to single entities that are allowed to alter it, so-called actors.

The Actor Model

The actor model has its theoretical roots in concurrency modelling [Hew73] and message passing concepts [Hoa78]. The fundamental idea of the actor model is to use actors as concurrent primitives that can act upon receiving messages in different ways:

  1. Send a finite number of messages to other actors.
  2. Spawn a finite number of new actors.
  3. Change its own internal behavior, taking effect when the next incoming message is handled.

For communication, the actor model uses asynchronous message passing. In particular, it does not use any intermediate entities such as channels. Instead, each actor possesses a mailbox and can be addressed. These addresses are not to be confused with identities, and each actor can have no, one or multiple addresses. When an actor sends a message, it must know the address of the recipient. In addition, actors are allowed to send messages to themselves, which they will receive and handle later in a future step. Note that the mapping of addresses and actors is not part of the conceptual model (although it is a feature of implementations).

Figure 5.3: An example network of several actors. Each actor has its own mailbox and isolated state. Based on its designated behavior, the actor responds to incoming messages by sending new messages, spawning new actors and/or changing its future behavior.

Messages are sent asynchronously and can take arbitrarily long to eventually arrive in the mailbox of the receiver. Also, the actor model makes no guarantees on the ordering of messages. Queuing and dequeuing of messages in a mailbox are atomic operations, so there cannot be a race condition. An actor processes incoming messages from its mailbox sequentially, using the three aforementioned possibilities to react. The third possibility, changing its own internal behavior, is what eventually makes it possible to deal with mutable state. However, the new behavior is only applied after the current message has been handled. Thus, every message handling run still represents a side-effect free operation from a conceptual perspective. The actor model can be used for modelling inherently concurrent systems, as each actor is entirely independent of any other instances. There is no shared state and the interaction between actors is purely based on asynchronous messages, as shown in figure 5.3.
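The interplay of mailbox, sequential message handling and behavior replacement can be sketched in a few lines of plain Scala. The following is a minimal illustration and not a real actor library: the names MiniActor, Behavior and Get are hypothetical, each actor is simply backed by one daemon thread, and a Promise stands in for the reply address of the requester.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object MiniActorSketch {
  // A behavior handles one message and returns the behavior for the next message.
  trait Behavior { def apply(msg: Any): Behavior }

  // Hypothetical request message; the Promise stands in for a reply address.
  final case class Get(reply: Promise[Int])

  // Hypothetical minimal actor: a mailbox plus one thread handling messages sequentially.
  final class MiniActor(initial: Behavior) {
    private val mailbox = new LinkedBlockingQueue[Any]() // thread-safe enqueue/dequeue
    private val worker = new Thread(new Runnable {
      def run(): Unit = {
        var behavior = initial
        // The new behavior only takes effect for the next message.
        while (true) behavior = behavior(mailbox.take())
      }
    })
    worker.setDaemon(true)
    worker.start()

    // Asynchronous send: enqueue the message and return immediately.
    def !(msg: Any): Unit = mailbox.put(msg)
  }

  // State is modelled by replacing the behavior, not by sharing a variable.
  def counter(count: Int): Behavior = new Behavior {
    def apply(msg: Any): Behavior = msg match {
      case "visit" => counter(count + 1)
      case Get(reply) =>
        reply.success(count)
        this
      case _ => this // unknown messages are ignored
    }
  }

  def demo(): Int = {
    val actor = new MiniActor(counter(0))
    (1 to 3).foreach(_ => actor ! "visit")
    val reply = Promise[Int]()
    actor ! Get(reply)
    Await.result(reply.future, 2.seconds)
  }
}
```

Because all four messages in demo() are sent from the same thread into a FIFO queue, the Get request is handled after the three visits. This ordering is a property of the sketch, not of the abstract actor model.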

Actor Implementations for Concurrent Programming

Besides a theoretical model for concurrent systems, the idea of actors also represents the blueprint for a concurrent programming model. Several conceptual implementations have been considered [Agh90], ranging from strictly functional adaptations to extensions of the object-oriented paradigm [Gue07]. The first fairly popular programming language to incorporate the actor model for concurrency was Erlang [Vin07]. The actor model has recently become increasingly popular and finds its way into many new programming languages, often as a first-class language concept. In many other languages, the actor model is available through third-party libraries that build on top of conventional multithreading.

When implementing the actor model, it is important to adhere to the set of rules defined by the original idea. First and foremost, actors must not share any state. This forbids actors from passing references, pointers or any other kind of shared data as part of a message. Only immutable data and addresses (i.e. "names") of actors should be sent. Message passing between actors is often enriched with a few more guarantees compared to the entirely best-effort style of the original model. Most implementations ensure that two messages sent from one actor to another maintain their order at arrival. Messaging is always asynchronous and the interleaving of incoming messages sent by multiple actors is indeterminate.

For message handling, most implementations provide pattern matching. This enables the developer to distinguish between different types of messages and supply different handling code associated with each type of message. While receiving messages is a blocking operation from the actor's perspective, sending new messages is always non-blocking. Some implementations also provide selective receive semantics. Depending on its behavior, an actor may then wait for a specific message in the mailbox and temporarily defer others.
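As a small illustration of pattern matching on message types and of selective receive, consider the following sketch. It models the mailbox as an immutable Vector so that the semantics can be inspected without any threads. The message types and the functions handle, replay and selectiveReceive are hypothetical and not part of any actor library.

```scala
object PatternMatchSketch {
  // Hypothetical immutable message types for an account-like actor.
  sealed trait Message
  final case class Deposit(amount: Int) extends Message
  final case class Withdraw(amount: Int) extends Message
  case object Reset extends Message

  // Handle one message: the pattern decides which code runs for which message type.
  def handle(balance: Int, msg: Message): Int = msg match {
    case Deposit(a) if a > 0                  => balance + a
    case Withdraw(a) if a > 0 && a <= balance => balance - a
    case Reset                                => 0
    case _                                    => balance // invalid requests are ignored
  }

  // Process a whole mailbox sequentially, starting from an empty balance.
  def replay(mailbox: Seq[Message]): Int = mailbox.foldLeft(0)(handle)

  // Selective receive: pick the first message that matches `wanted` and
  // leave all other messages in the mailbox in their original order.
  def selectiveReceive(mailbox: Vector[Message])(
      wanted: Message => Boolean): Option[(Message, Vector[Message])] = {
    val i = mailbox.indexWhere(wanted)
    if (i < 0) None
    else Some((mailbox(i), mailbox.patch(i, Nil, 1)))
  }
}
```

For example, a selective receive for Reset on the mailbox Vector(Deposit(10), Reset, Withdraw(5)) yields Reset and leaves Deposit(10) and Withdraw(5) deferred in the mailbox.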

The underlying runtime platform must allocate possibly huge numbers of actors to restricted numbers of CPU cores and resources and schedule the execution of actors with pending messages. Most systems employ a principally lock-free implementation, as atomic behavior is only required for the mailbox operations. For instance, the Erlang virtual machine starts a single process and spawns a pool of threads based on the number of cores available [Lar08]. The internal scheduler organizes actors in process queues and works preemptively. When a running actor has handled a message or has executed a certain number of reductions (i.e. function calls), the scheduler switches to the next actor ready to run. For I/O operations, the Erlang virtual machine spawns decoupled operating system threads. The reduction limits and background I/O operations promote fairness and liveness, because no actor can monopolize CPU time for a longer period.
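The fairness argument can be made tangible with a deliberately simplified, single-threaded simulation. The sketch below is not how the Erlang virtual machine is implemented: it counts handled messages instead of reductions, and SimActor, run and budget are hypothetical names. It only shows how a per-turn budget prevents a busy actor from starving others.

```scala
import scala.collection.mutable

object SchedulerSketch {
  // Hypothetical simulated actor: a name and a mailbox of pending messages.
  final class SimActor(val name: String) {
    val mailbox: mutable.Queue[String] = mutable.Queue.empty[String]
  }

  // Runs all actors round-robin. In each turn an actor may handle at most
  // `budget` messages before it is preempted. Returns the handling order.
  def run(actors: Seq[SimActor], budget: Int): Vector[String] = {
    val runQueue = mutable.Queue.empty[SimActor]
    actors.filter(_.mailbox.nonEmpty).foreach(a => runQueue += a)
    val trace = Vector.newBuilder[String]
    while (runQueue.nonEmpty) {
      val actor = runQueue.dequeue()
      var used = 0
      while (used < budget && actor.mailbox.nonEmpty) {
        trace += s"${actor.name}:${actor.mailbox.dequeue()}"
        used += 1
      }
      // Preempted with work left: go to the back of the run queue.
      if (actor.mailbox.nonEmpty) runQueue += actor
    }
    trace.result()
  }

  def demo(): Vector[String] = {
    val a = new SimActor("a")
    val b = new SimActor("b")
    a.mailbox ++= Seq("1", "2", "3")
    b.mailbox ++= Seq("1")
    run(Seq(a, b), budget = 2)
  }
}
```

With a budget of two, actor a is preempted after its second message, so actor b is served before a handles its third message.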

Many implementations of the actor model provide additional features that are also ramifications of the actor model, namely distribution support and fault tolerance. Concurrency concepts normally target single machines. The actor model, however, does not postulate many guarantees for messaging. Asynchronous, unbounded messaging in fact resembles network-based communication. The isolation of state to actors means that no shared access between multiple actor instances is required. Actors are designated using addresses, which can easily provide location transparency. The actor model is inherently parallel, thus it is very easy to extend implementations of the actor model to support distributed deployments. For instance, distributed Erlang systems make use of multiple nodes running an Erlang virtual machine and transparently provide distributed message passing.

Conventional, thread-based concurrency makes fault tolerance hard to achieve. Nondeterminism and unpredictable scheduling combined with shared state and locking require very complex strategies for replication and snapshotting. The actor model comes up with other primitives that make replication much easier. Isolated states and incoming messages queued in actors' mailboxes are very similar to snapshots and logs. Message handling of an actor is single-threaded and provides implicit yielding points. Erlang embraces a "let it crash" philosophy [Arm07]. The isolated, shared-nothing trait of actors allows a single actor to fail without affecting any other actors. Furthermore, the actor model itself can be used for fault tolerance, by spawning hierarchy trees of actors for supervision. Once an actor crashes, the supervising actor receives a message and can react. A supervisor might restart the actor, stop other actors or escalate by sending an error message to its own supervisor.
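The restart-or-escalate decision of a supervisor can be sketched as follows. This is a synchronous simplification under stated assumptions: a caught exception stands in for the crash notification message, and Worker, Supervisor, Outcome and maxRestarts are hypothetical names that do not correspond to the API of Erlang/OTP or any other library.

```scala
import scala.util.{Failure, Success, Try}

object SupervisionSketch {
  // Hypothetical worker: accumulates a sum and crashes on negative input.
  final class Worker {
    private var sum = 0
    def handle(n: Int): Int = {
      if (n < 0) throw new IllegalArgumentException(s"negative input: $n")
      sum += n
      sum
    }
  }

  sealed trait Outcome
  final case class Handled(result: Int) extends Outcome
  case object Restarted extends Outcome
  case object Escalated extends Outcome

  // Hypothetical supervisor: restarts a crashed worker with fresh state,
  // and escalates once the restart budget is used up.
  final class Supervisor(maxRestarts: Int) {
    private var worker = new Worker
    private var restarts = 0

    def deliver(n: Int): Outcome = Try(worker.handle(n)) match {
      case Success(result) => Handled(result)
      case Failure(_) if restarts < maxRestarts =>
        restarts += 1
        worker = new Worker // "let it crash": discard the broken state
        Restarted
      case Failure(_) => Escalated
    }
  }

  def demo(): List[Outcome] = {
    val supervisor = new Supervisor(maxRestarts = 1)
    List(5, -1, 2, -1).map(supervisor.deliver)
  }
}
```

In demo(), the first crash leads to a restart, so the following input is handled by a fresh worker whose sum starts at zero again. The second crash exceeds the restart budget and is escalated.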

Actors have an isolating impact on state and effectively prevent shared mutable state. Also, no locks are necessary for concurrent programming. However, concurrency issues like deadlocks and race conditions are still not entirely expelled in this programming model, as they can be reintroduced by incorrect applications. Two actors waiting for a message from each other represent a cyclic dependency. In practice, the impending deadlock can be prevented by using timeouts. The arbitrary ordering of messages sent by actors might be interpreted as a traditional race condition by some developers. However, it is a characteristic property of the actor model that reflects its asynchrony. Hence, these developers ignore fundamental ideas of the actor model and the resulting "race condition" is actually a manifestation of inappropriate application design.
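A receive operation with a timeout can be sketched on top of a blocking queue. The helper receiveWithin and the two inboxes below are hypothetical; the example runs on a single thread and only illustrates how a timeout turns an indefinite wait into a situation the actor can react to.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

object TimeoutSketch {
  // Wait at most `millis` milliseconds for a message; None signals a timeout.
  def receiveWithin[A <: AnyRef](mailbox: LinkedBlockingQueue[A], millis: Long): Option[A] =
    Option(mailbox.poll(millis, TimeUnit.MILLISECONDS))

  def demo(): (Option[String], Option[String]) = {
    val inboxA = new LinkedBlockingQueue[String]()
    val inboxB = new LinkedBlockingQueue[String]()

    // A waits for B, but B would itself wait for A: without a timeout,
    // this receive would block forever.
    val first = receiveWithin(inboxA, 50)

    // On timeout, A breaks the cycle by sending first.
    if (first.isEmpty) inboxB.offer("ping from A")

    val second = receiveWithin(inboxB, 50)
    (first, second)
  }
}
```

The timeout does not remove the cyclic dependency from the design. It merely gives one participant the chance to detect the situation and take a different path.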

Programming with Actors

The actor model for concurrency is very different from thread-based concurrency with locks or STM. Isolated mutable state and asynchronous messaging yield other programming patterns than threads do.

First of all, it is important to understand that actors represent very lightweight primitives compared to threads. They can be spawned and destroyed with minimal overhead. Thus, it is totally feasible to create and use large numbers of instances in parallel. Actors can also execute arbitrarily complex computations in response to a message. Actors can send messages to themselves, which allows messaging patterns that recreate recursion. Furthermore, actors can send messages to other actors known by their address, so an actor-based program is essentially a highly dynamic network of actors (a directed graph). As a result, existing message-based patterns for application integration [Hoh03] provide a comprehensive set of patterns that can be used for concurrent programming with actors as well. This includes popular messaging patterns for routing, filtering, transformation and composition.

Isolating mutable state and enforcing immutable messages guarantee implicit synchronization. However, asynchronous messaging and the absence of global state make coordination challenging. An application may require consensus or a concerted view of state between multiple actors. When multiple actors must be strictly orchestrated in order to provide a distinct application function, correct messaging can become very demanding. Thus, many implementations provide higher-level abstractions that implement low-level coordination protocols based on complex message flows, but hide the internal complexity from the developer. For Erlang, OTP is a standard library that contains a rich set of abstractions, generic protocol implementations and behaviors.

Another common approach is the transactor. For example, multiple actors may need to modify their internal state in a coordinated manner. A transactor, which is a dedicated actor for coordinating transactional operations of multiple actors, can help in this situation by providing abstract transaction logic. Some transactors also apply STM concepts for transactional behavior [Les09,Les11].

Case Study: Concurrency in Scala

Scala is a general purpose, object-functional language that runs on the JVM. It interoperates with Java, but provides enhanced expressiveness, advanced programming concepts and many features of functional programming. For concurrency, Scala implements an actor-based concurrency model and supports explicit immutability of values. However, Scala applications can also fall back to concurrency primitives of the Java programming language.

While Erlang spawns multiple low-level threads and implements a custom scheduler for running actors, Scala is constrained by the multithreading implications of the JVM. Furthermore, the actor implementation of Scala is not part of the language core, but part of its standard library. This means that the actor library itself is implemented in Scala. One initial challenge of Scala actors has been introduced by the constraints of multithreading in the JVM. The number of possible threads is limited, there is no cooperative scheduling available and threads are conceptually less lightweight than actors are supposed to be. As a result, Scala provides a single concept of an actor, but two different mechanisms for message handling [Hal06,Hal08].

Thread-based Actors
When the receive primitive is used, the actor is internally backed by a dedicated thread. This obviously limits scalability and requires the thread to suspend and block when waiting for new messages.
Event-driven Actors
The react primitive allows an event-driven execution strategy, which does not directly couple actors to threads. Instead, a thread pool can be used for a number of actors. This approach uses a continuation closure to encapsulate the actor and its state. However, this mechanism has several limitations and obscures the control flow [Hal08]. Conceptually, this implementation is very similar to an event loop backed by a thread pool. Actors represent event handlers and messages resemble events.

Generally, react should be preferred, as it does not couple each actor to a dedicated thread. The react primitive thus yields better scalability results.
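The general idea of decoupling actors from threads can be sketched with a shared thread pool. The following is not the implementation behind react, which relies on continuation closures. It is a common alternative technique in which an actor is submitted to the pool as a task whenever its mailbox is non-empty. PooledActor and its members are hypothetical names.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, ExecutorService, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

object PooledActorSketch {
  // Hypothetical event-driven actor: it owns no thread, only a mailbox.
  final class PooledActor[A <: AnyRef](pool: ExecutorService)(handler: A => Unit) {
    private val mailbox = new ConcurrentLinkedQueue[A]()
    private val scheduled = new AtomicBoolean(false) // at most one drain task at a time

    def !(msg: A): Unit = { mailbox.add(msg); trySchedule() }

    private def trySchedule(): Unit =
      if (!mailbox.isEmpty && scheduled.compareAndSet(false, true))
        pool.execute(new Runnable { def run(): Unit = drain() })

    // Handle pending messages sequentially, then release the pool thread.
    private def drain(): Unit =
      try {
        var msg = mailbox.poll()
        while (msg != null) { handler(msg); msg = mailbox.poll() }
      } finally {
        scheduled.set(false)
        trySchedule() // a message may have arrived after the last poll
      }
  }

  // 100 actors share 4 threads; each actor counts its own messages.
  def demo(): Int = {
    val pool = Executors.newFixedThreadPool(4)
    val actorCount = 100
    val perActor = 10
    val done = new CountDownLatch(actorCount * perActor)
    val counts = new Array[Int](actorCount) // slot i is only written by actor i
    val actors = Vector.tabulate(actorCount) { i =>
      new PooledActor[String](pool)(_ => { counts(i) += 1; done.countDown() })
    }
    for (_ <- 1 to perActor; a <- actors) a ! "tick"
    done.await(5, TimeUnit.SECONDS)
    pool.shutdown()
    counts.sum
  }
}
```

The scheduled flag ensures that an actor is never executed by two pool threads at once, so the handler may use unsynchronized state just like a thread-backed actor.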

The syntax of Scala actors for messaging follows the Erlang style. Messages are supposed to be immutable values, but this is not enforced so far. Often, case classes are used, a special kind of class well suited for wrapping immutable data. They are especially helpful when pattern matching is used to determine the type of message on arrival. Scala also supports the distribution of actors by supplying remote actors that communicate over TCP/IP and rely on Java serialization.

The listings below illustrate an actor-based solution to our prior exemplary web application. The solution is written in Scala and uses the Play web application framework. The framework does not use regular Scala actors, but actor implementations provided by the akka library. As shown in the second listing, the application starts a single actor and registers a method for handling requests. This method sends an asynchronous "visit" message to the actor. By using the ? operator, a Future is returned that represents the eventual reply. If no timeout occurs, the actor reply is then used as response body of the SimpleResult, once available. The internals of CoutingActor, as shown in the first listing, are very simple. The actor provides a single counter variable and responds to "visit" messages by incrementing the value and sending it back to the original sender.

An actor in Scala, based on the akka actor implementation. The actor encapsulates a counter state, and responds to each "visit" message by returning the number of overall visits counted so far:

package actors

import akka.actor.Actor
import Actor._

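class CoutingActor extends Actor {

	// Mutable state that is only ever touched by this actor's message handler.
	var count = 0

	def receive = {
		// Increment the counter and reply to the sender with the new value.
		case "visit" =>
			count += 1
			sender ! count.toString
	}
}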

A minimalistic, concurrent web application written in Scala that returns the number of requests handled so far, using the Play web framework:

package controllers

import akka.util.Timeout
import akka.pattern.ask
import akka.actor._
import akka.util.duration._
import actors.CoutingActor
import play.api._
import play.api.mvc._
import play.api.libs.concurrent._
import play.api.libs.iteratee.Enumerator

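object Application extends Controller {

	// A single actor system and a single counting actor for the whole application.
	val system = ActorSystem("counter")
	val actor = system.actorOf(Props[CoutingActor])

	def index = Action {
		AsyncResult {
			// Upper bound for waiting on the actor's reply.
			implicit val timeout = Timeout(5.seconds)
			// Ask the actor asynchronously and build the response once the reply arrives.
			(actor ? "visit").mapTo[String].asPromise.map { result =>
				SimpleResult(
					header = ResponseHeader(200, Map(CONTENT_TYPE -> "text/plain")),
					body = Enumerator(result)
				)
			}
		}
	}
}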

Actors for Concurrent Application Logic

When application servers use the actor model, each incoming request is represented by a new actor. For parallelizing request operations, the actor spawns new actors and assigns work via messages. This enables parallel I/O-bound operations as well as parallel computations. Often, the flow of a single request represents a more or less complex message flow between multiple actors, using messaging patterns such as scatter/gather, router, enricher or aggregator [Hoh03].
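The shape of a scatter/gather flow can be sketched with futures from the Scala standard library. Here, futures stand in for worker actors and their reply messages, which is a simplification and not an actor implementation. The functions wordCount and handleRequest are hypothetical.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ScatterGatherSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical worker: counts the words of one chunk asynchronously.
  def wordCount(chunk: String): Future[Int] =
    Future(chunk.split("\\s+").count(_.nonEmpty))

  // Scatter: hand one chunk to each worker.
  // Gather: wait for all replies and aggregate them into a single result.
  def handleRequest(chunks: Seq[String]): Future[Int] =
    Future.sequence(chunks.map(wordCount)).map(_.sum)

  def demo(): Int =
    Await.result(handleRequest(Seq("a b c", "d e", "")), 2.seconds)
}
```

In an actor-based implementation, the request actor would send each chunk as a message to a worker actor and collect the replies in its own mailbox, typically guarded by a timeout.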

However, implementing request logic using actors differs clearly from sequential request logic implementations. The necessary coordination of multiple actors and the less apparent flow of execution due to asynchronous messaging provide an arguably less comfortable, but more realistic abstraction of concurrency.

Another feature of the actor model is the possibility to scale the actor system as a whole by adding new machines. For instance, Erlang enables multiple virtual machines to form a distributed system. In this case, remote actors can hold isolated application state that is nevertheless accessible via messaging to all other actors of the entire system.