Akka Finite State Machine (FSM) and At Most Once Semantics
The original definition of Actors by Hewitt, Bishop and Steiger stated that i) everything is an actor ii) actors have their own memory and iii) actors communicate by sending and receiving messages asynchronously. When responding to messages an actor can i) send a finite number of messages ii) create a finite number of actors, and iii) designate new behaviour. Designating behaviour in Akka is done through the become
and unbecome
primitives. Although these primitives are adequate for systems with a small number of states, state management in large systems requires a higher level of abstraction. One of the abstractions used to control states is the Akka FSM Mixin. In this post we will describe how to use the Akka FSM Mixin through a classic Akka example - the Wire Transfer example. We will also extend this example to show how one can implement At Most Once semantics.
Introduction
The ability to designate behaviour is one of the cornerstones of Actor systems. Without the ability to designate behaviour Actor systems would be unable to manage state without resorting to mutability. Typically, behaviour is designated in Akka through become
and unbecome
. Although become
and unbecome
are adequate for small actors, it is generally preferable to design Actor systems at a higher level of abstraction. One of the abstraction tools provided by Akka is the Akka FSM Mixin. Before we dive into FSM in detail we will first go through a classic Akka example - the Wire Transfer - and work this out from first principles. The Wire Transfer example, as the name implies, refers to the problem of transferring money from one bank account to another.
First stop in this transaction adventure is Bank Account ville!
Bank Account
A BankAccount
actor can receive two types of messages: Deposit
and Withdraw
. Success and Failure business acknowledgements will be indicated by the Done
and Failed
messages respectively. These messages will be defined in the BankAccount
companion object.
object BankAccount {
  case class Deposit(amount: BigInt) {
    require(amount > 0)
  }
  case class Withdraw(amount: BigInt) {
    require(amount > 0)
  }
  case object Done
  case object Failed
}
The implementation of the Bank Account actor is the following:
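import akka.actor.Actor
import akka.event.LoggingReceive

class BankAccount extends Actor {
  import BankAccount._

  var balance = BigInt(0)

  def receive = LoggingReceive {
    case Deposit(amount) =>
      balance += amount
      sender() ! Done
    case Withdraw(amount) if amount <= balance =>
      balance -= amount
      sender() ! Done
    // Insufficient funds (or any unexpected message) is rejected.
    case _ => sender() ! Failed
  }
}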
When a client sends a Deposit
message the BankAccount
actor will add the deposited amount
to the balance
and reply with Done. When a client sends a Withdraw
message the BankAccount
actor will check whether there is sufficient balance
in the account and if this precondition is met, the balance is decremented and a Done
reply is sent to the client. If the precondition fails a Failed
message is sent to the client.
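The same rules can be checked without starting an actor system. The sketch below is a hypothetical pure model of the behaviour described above: the names BankAccountModel, Message, Reply and step are assumptions made for illustration and are not part of the original code. It leaves out the require checks and simply maps a balance and a message to the new balance and the reply the actor would send.

```scala
object BankAccountModel {
  sealed trait Message
  final case class Deposit(amount: BigInt) extends Message
  final case class Withdraw(amount: BigInt) extends Message

  sealed trait Reply
  case object Done extends Reply
  case object Failed extends Reply

  // Returns the new balance together with the reply the actor would send.
  def step(balance: BigInt, message: Message): (BigInt, Reply) = message match {
    case Deposit(amount)                       => (balance + amount, Done)
    case Withdraw(amount) if amount <= balance => (balance - amount, Done)
    case _                                     => (balance, Failed) // insufficient funds
  }
}
```

With a balance of 100, a Withdraw of 150 leaves the balance untouched and yields Failed, which mirrors the guard in the actor's receive block.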
Wire Transfer
In order to handle transfers between two BankAccount
s we will delegate this responsibility to the WireTransfer
actor. A WireTransfer
actor can understand primarily one message - Transfer
- which initiates a transfer of amount
money from a from: ActorRef
to a to: ActorRef
. We will define this message in the WireTransfer
companion object together with the Done
and Failed
business acknowledgments.
import akka.actor.ActorRef

object WireTransfer {
  case class Transfer(from: ActorRef, to: ActorRef, amount: BigInt)
  case object Done
  case object Failed
}
The implementation of the WireTransfer
is made up of three states:
- Initial State: In the Initial state the WireTransfer actor can receive a Transfer message, which initiates the transfer. When a transfer is initiated, a Withdraw message is sent to the from ActorRef and the behaviour is designated to the Await From state.
- Await From State: In this state the WireTransfer actor has sent a Withdraw message and is awaiting a business acknowledgement from the from ActorRef. If the from ActorRef replies with Done, then the Withdraw has been performed (by the from ActorRef) and the WireTransfer actor proceeds by sending a Deposit message to the to ActorRef. The WireTransfer actor then designates the behaviour to the Await To state. If the from ActorRef replies with Failed, the WireTransfer actor sends a Failed business acknowledgement to the original client and terminates.
- Await To State: In this state the WireTransfer actor has sent the Deposit message to the to ActorRef and is now expecting a reply. If the to ActorRef replies with Done, the WireTransfer actor sends a Done business acknowledgement to the original client and terminates. If the to ActorRef replies with Failed, the WireTransfer actor sends a Failed business acknowledgement to the original client and terminates.
This simple state machine can be represented diagrammatically as follows:
Using the become
primitive the WireTransfer state machine can be implemented as follows:
import akka.actor.{Actor, ActorRef}
import akka.event.LoggingReceive

class WireTransfer extends Actor {
  import WireTransfer._

  def receive: Receive = LoggingReceive {
    case Transfer(from, to, amount) =>
      from ! BankAccount.Withdraw(amount)
      context.become(awaitFrom(to, amount, sender()))
  }

  def awaitFrom(to: ActorRef, amount: BigInt, customer: ActorRef): Receive = LoggingReceive {
    case BankAccount.Done =>
      to ! BankAccount.Deposit(amount)
      context.become(awaitTo(customer))
    case BankAccount.Failed =>
      customer ! Failed
      context.stop(self)
  }

  def awaitTo(customer: ActorRef): Receive = LoggingReceive {
    case BankAccount.Done =>
      customer ! Done
      context.stop(self)
    // A failed deposit is reported to the original client as well.
    case BankAccount.Failed =>
      customer ! Failed
      context.stop(self)
  }
}
Note that in this case we do not make use of the unbecome
primitive. In reality the unbecome
primitive is rarely used.
Wire Transfer using FSM
The main problem with the above implementation is that things are not explicit. Although we move from one state to another given an event, the new state and the changes to the data are not explicit. Ideally the problem should be modeled as a set of relations of the form:
\[State(S) \times Event(E) \implies Actions(A), State(S')\]
which informally means that if we are in state \(State(S)\) and an \(Event(E)\) occurs then actions \(Actions(A)\) should be performed and the system should transition to \(State(S')\).
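To make the relation concrete, here is a minimal sketch of it as a pure Scala function for the Wire Transfer case. Everything in it is hypothetical and chosen for illustration: the WireTransferModel object, the event and action names, and the Finished state are assumptions, not part of Akka or of the actor code in this post. Unexpected events are assumed to produce no action and leave the state unchanged.

```scala
object WireTransferModel {
  sealed trait State
  case object Initial extends State
  case object AwaitFrom extends State
  case object AwaitTo extends State
  case object Finished extends State

  sealed trait Event
  case object TransferRequested extends Event
  case object WithdrawDone extends Event
  case object WithdrawFailed extends Event
  case object DepositDone extends Event
  case object DepositFailed extends Event

  sealed trait Action
  case object SendWithdraw extends Action
  case object SendDeposit extends Action
  case object ReplyDone extends Action
  case object ReplyFailed extends Action

  // State(S) x Event(E) => Actions(A), State(S')
  def transition(state: State, event: Event): (List[Action], State) =
    (state, event) match {
      case (Initial, TransferRequested) => (List(SendWithdraw), AwaitFrom)
      case (AwaitFrom, WithdrawDone)    => (List(SendDeposit), AwaitTo)
      case (AwaitFrom, WithdrawFailed)  => (List(ReplyFailed), Finished)
      case (AwaitTo, DepositDone)       => (List(ReplyDone), Finished)
      case (AwaitTo, DepositFailed)     => (List(ReplyFailed), Finished)
      case (other, _)                   => (Nil, other) // unexpected event: no action, stay put
    }
}
```

Each case reads as one row of the relation: the pair on the left is the current state and the event, the pair on the right is the list of actions and the next state. The FSM Mixin lets us write the actor in much the same shape.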
A state in FSM is defined as the current position on the state machine together with any data. In the Wire Transfer example above the actor may be in any one of these states (we will talk about the data part shortly!): Initial
, AwaitFrom
, AwaitTo
or Done
. These states (positions) can be expressed in Scala as follows:
sealed trait State
case object Initial extends State
case object AwaitFrom extends State
case object AwaitTo extends State
case object Done extends State
The data required by a WireTransfer
actor are the from
, to
and amount
data contained in the initial Transfer
message. Additionally the original sender
which should receive the business acknowledgement should be stored. We will package this data in the InitialisedWireTransferData
case class. Initially, prior to receiving the Transfer
message, the from
, to
, amount
and sender
will be undefined. We will represent this using the UninitializedWireTransferData
object as follows:
sealed trait Data
case object UninitializedWireTransferData extends Data
case class InitialisedWireTransferData(from: ActorRef, to: ActorRef, amount: BigInt, client: ActorRef) extends Data
Now that we have the state and data defined, we can implement the WireTransfer
FSM as follows:
import akka.actor.FSM

class WireTransfer extends FSM[State, Data] {
  import WireTransfer.Transfer

  startWith(Initial, UninitializedWireTransferData)

  when(Initial) {
    case Event(Transfer(from, to, amount), UninitializedWireTransferData) =>
      from ! BankAccount.Withdraw(amount)
      goto(AwaitFrom) using InitialisedWireTransferData(from, to, amount, sender())
  }

  when(AwaitFrom) {
    case Event(BankAccount.Done, InitialisedWireTransferData(_, to, amount, _)) =>
      to ! BankAccount.Deposit(amount)
      goto(AwaitTo)
    case Event(BankAccount.Failed, InitialisedWireTransferData(_, _, _, client)) =>
      client ! WireTransfer.Failed
      // stop() terminates the FSM, so no explicit transition to Done is needed.
      stop()
  }

  when(AwaitTo) {
    case Event(BankAccount.Done, InitialisedWireTransferData(_, _, _, client)) =>
      client ! WireTransfer.Done
      stop()
    case Event(BankAccount.Failed, InitialisedWireTransferData(_, _, _, client)) =>
      client ! WireTransfer.Failed
      stop()
  }

  initialize()
}
The code is pretty self-explanatory. The state machine will start in the Initial
state with all values uninitialised (represented by the UninitializedWireTransferData
). The only type of message which can be received in the Initial
state is the initial Transfer
request at which point a Withdraw
message is sent to the from
ActorRef and the state machine transitions to the AwaitFrom
state.
When the system is in the AwaitFrom
state the only two messages that can be received are Done
or Failed
from the from
ActorRef. If the Done
business acknowledgement is received the system will send a Deposit
message to the to
ActorRef and transition to the AwaitTo
state. The system will stop()
if a Failed
is received.
When the system is in the AwaitTo
state the only two messages that can be received are the Done
or Failed
from the to
ActorRef. If the Done
business acknowledgement is received the system will send a Done
business acknowledgement to the original sender
. Otherwise, if a Failed
is received, it sends the original sender a Failed
business acknowledgement.
At Most Once Semantics
When dealing with transactions in the above example we have assumed that messages (originating from the from
and to
actors) are unique and hence should always be processed. In general, this assumption does not hold. To guarantee that a message is received by an actor, the sender must be able to resend it until it is acknowledged (at-least-once delivery), so the same message may arrive more than once. The receiving actor then needs to determine whether a received message has already been processed, unless processing is idempotent. We can generalise this concept by creating our own custom AtMostOnceFSM
as follows:
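import akka.actor.FSM

class AtMostOnceFSM[S, D] extends FSM[S, D] {
  // Default: treat every message as new. Override to consult a record of
  // processed messages. The parameter is the incoming message (of type Any),
  // not the state type S.
  protected def hasBeenProcessed(message: Any): Boolean = false

  def AtMostOnce(stateFunction: StateFunction): StateFunction = new StateFunction {
    // Only events handled by the wrapped function are handled by the wrapper.
    override def isDefinedAt(event: Event): Boolean = stateFunction.isDefinedAt(event)

    override def apply(event: Event): State =
      if (hasBeenProcessed(event.event)) stay() // duplicate: ignore it and remain in the current state
      else stateFunction(event)
  }
}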
The AtMostOnceFSM
contains a function AtMostOnce
which wraps up an original StateFunction
. The AtMostOnce
state function will first determine whether the received message is defined on the original function (isDefinedAt
) and if defined, it will perform the at most once check by delegating to the hasBeenProcessed
function. In the WireTransfer
example we will use the default implementation which will assume that a message has not yet been processed. Using this custom FSM we can update the original WireTransfer
actor as follows:
class WireTransfer extends AtMostOnceFSM[State, Data] {
  import WireTransfer.Transfer

  startWith(Initial, UninitializedWireTransferData)

  when(Initial) (AtMostOnce {
    case Event(Transfer(from, to, amount), UninitializedWireTransferData) =>
      from ! BankAccount.Withdraw(amount)
      goto(AwaitFrom) using InitialisedWireTransferData(from, to, amount, sender())
  })

  when(AwaitFrom) (AtMostOnce {
    case Event(BankAccount.Done, InitialisedWireTransferData(_, to, amount, _)) =>
      to ! BankAccount.Deposit(amount)
      goto(AwaitTo)
    case Event(BankAccount.Failed, InitialisedWireTransferData(_, _, _, client)) =>
      client ! WireTransfer.Failed
      // stop() terminates the FSM, so no explicit transition to Done is needed.
      stop()
  })

  when(AwaitTo) (AtMostOnce {
    case Event(BankAccount.Done, InitialisedWireTransferData(_, _, _, client)) =>
      client ! WireTransfer.Done
      stop()
    case Event(BankAccount.Failed, InitialisedWireTransferData(_, _, _, client)) =>
      client ! WireTransfer.Failed
      stop()
  })

  initialize()
}
The real implementation of the hasBeenProcessed
will require a unique identifier for BankAccount
s and for each WireTransfer
. This is left as an exercise to the reader.
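As one possible starting point for that exercise, the sketch below records the identifiers it has seen in a set. It is only an illustration under stated assumptions: the Identified envelope, the String id and the ProcessedIds class are hypothetical, since the messages in this post carry no identifier, and the set grows without bound, which a real implementation would need to address.

```scala
import scala.collection.mutable

object Deduplication {
  // Hypothetical envelope: pairs a payload with a unique message id.
  final case class Identified[A](id: String, payload: A)

  // Not thread-safe on its own; intended to live inside a single actor,
  // where messages are processed one at a time.
  final class ProcessedIds {
    private val seen = mutable.Set.empty[String]

    // Returns true when the id was seen before; otherwise records it and
    // returns false. Set.add reports whether the element was newly added.
    def hasBeenProcessed(id: String): Boolean = !seen.add(id)
  }
}
```

Note that this version marks an id as processed at the moment it is checked. Whether to record the id before or after the message has been handled is a design decision that depends on how failures during processing should be treated.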
The observant reader might have noticed that the isDefinedAt
function uses the stateFunction
rather than the list of messages received by the proxy in the apply
function. This is deliberate since we only want to proxy any messages which were defined in the original function and ignore anything else.
Conclusion
Some have defined become
and unbecome
as the ‘evil’ Actor counterpart of Imperative goto
s. In large systems (with lots of states), using become
and unbecome
directly can lead to spaghetti code of states. In this post we have shown how we can use the Akka FSM Mixin to implement Finite State Machines which abstract away become
and unbecome
calls and make states (and data) more explicit. Stay safe and keep hacking!