Akka Finite State Machine (FSM) and At Most Once Semantics

The original definition of Actors by Hewitt, Bishop and Steiger stated that i) everything is an actor ii) actors have their own memory and iii) actors communicate by sending and receiving messages asynchronously. When responding to messages an actor can i) send a finite number of messages ii) create a finite number of actors, and iii) designate new behaviour. Designating behaviour in Akka is done through the become and unbecome primitives. Although these primitives are adequate for systems with a small number of states, state management in large systems requires a higher level of abstraction. One of the abstractions used to control states is the Akka FSM Mixin. In this post we will describe how to use the Akka FSM Mixin through a classic Akka example - the Wire Transfer example. We will also extend this example to show how one can implement At Most Once semantics.

Introduction

The ability to designate behaviour is one of the corner stones of Actor systems. Without the ability to designate behaviour Actor systems would be unable to manage state without resorting to mutability. Typically, behaviour is designated in Akka through become and unbecome. Although become and unbecome are adequate, in general it is desired to design Actor systems on a higher level of abstraction. One of the abstraction tools provided by Akka is the Akka FSM mixin. Before we dive into FSM in detail we will first go through a classic Akka example - the Wire Transfer - and work this out from first principles. The Wire Transfer example, as the name implies, refers to the problem of transferring money from one bank account to another.

First stop in this transaction adventure is Bank Account ville!

Bank Account

A BankAccount actor can receive two types of messages: Deposit and Withdraw. Success and Failure business acknowledgements will be indicated by the Done and Failed messages respectively. These messages will be defined in the BankAccount companion object.

object BankAccount {

  case class Deposit(amount: BigInt) {
    require(amount > 0)
  }

  case class Withdraw(amount: BigInt) {
    require(amount > 0)
  }

  case object Done
  case object Failed
}

The implementation of the Bank Account actor is the following:

class BankAccount extends Actor {
  var balance = BigInt(0)

  def receive = LoggingReceive {
    case Deposit(amount) =>
      balance += amount
      sender ! Done

    case Withdraw(amount) if amount <= balance =>
      balance -= amount
      sender ! Done

    case _ => sender ! Failed
  }
}

When a client sends a Deposit message the BankAccount actor will add the deposited amount to the balance. When a client sends a Withdraw message the BankAccount actor will check whether there is sufficient balance in the account and if this precondition is met, the balance is decremented and a Done reply is sent to the client. If the precondition fails a Failed message is sent to the client.

Wire Transfer

In order to handle transfers between two BankAccounts we will delegate this responsibility to the Wire Transfer actor . A WireTransfer actor can understand primarily one message - Transfer - which initiates a transfer of amount money from a from: ActorRef to a to: ActorRef. We will define this message in the WireTransfer companion object together with the Done and Failed business acknowledgments.

object WireTransfer {
  case class Transfer(from: ActorRef, to: ActorRef, amount: BigInt)

  case object Done

  case object Failed
}

The implementation of the WireTransfer is made up of three states:

  1. Initial State: In this Initial state the WireTransfer actor can receive a Transfer message which will initiate the transfer. When a transfer is initiated a Withdraw message is sent to the from ActorRef and the behaviour is designated to Awaiting From state.
  2. Awaiting From State: In this state the Wire Transfer actor has sent a Withdraw message and is awaiting a business acknowledgement from the from ActorRef. If the from ActorRef replies with Done then the Withdraw has been performed (by the from ActorRef) and the WireTransfer proceeds by sending a Deposit message to the to ActorRef. The WireTransfer actor will designate the behaviour to Await To state.
  3. Await To State: In this State the Wire Transfer actor has sent the Deposit message to the to ActorRef and is now expecting a reply. If the to ActorRef replies with Done, the WireTransfer actor sends a Done business acknowledgement to the original client and terminates. If the to ActorRef receives a Failed message the WireTransfer actor will send a Failed business acknowledgement to the original client and terminates.

This simple state machine can be represented diagrammatically as follows:

Using the become primitive the WireTransfer state machine can be implemented as follows:

class WireTransfer extends Actor {
  def receive: Receive = LoggingReceive {
    case Transfer(from, to, amount) =>
      from ! BankAccount.Withdraw(amount)
      context.become(awaitFrom(to, amount, sender))
  }

  def awaitFrom(to: ActorRef, amount: BigInt, customer: ActorRef): Receive = LoggingReceive {
    case BankAccount.Done =>
      to ! BankAccount.Deposit(amount)
      context.become(awaitTo(customer))
    case BankAccount.Failed =>
      customer ! Failed
      context.stop(self)
  }

  def awaitTo(customer: ActorRef): Receive = LoggingReceive {
    case BankAccount.Done =>
      customer ! Done
      context.stop(self)
  }
}

Note that in this case we do not make use of the unbecome primitive. In reality the unbecome primitive is rarely used.

Wire Transfer using FSM

The main problem with the above implementation is that things are not explicit. Although we move from one state to another given an event, the new state and the changes to the data are not explicit. Ideally the problem should be modeled as a set of relations of the form:

which informally means that if we are in state \(State(S)\) and an \(Event(E)\) occurs then actions \(Actions(A)\) should be performed and the system should transition to \(State(S’)\).

A state in FSM is defined as the current position on the state machine together with any data. In the Wire Transfer example above the user may be in either one of these states (we will talk about the data part shortly!): Initial, AwaitFrom, AwaitTo or Done. These states (positions) can be expressed in Scala as follows:

sealed trait State
object Initial extends State
object AwaitFrom extends State
object AwaitTo extends State
object Done extends State

The data required by a WireTransfer actor are the from, to and amount data contained in the initial Transfer message. Additionally the original sender which should receive the business acknowledgement should be stored. We will package this data in the InitialisedWireTransferData object. Initially, prior to receiving the Transfer message, the from, to, amount and sender will be undefined. We will represent this using the UninitializedWireTransferData object as follows:

sealed trait Data
case object UninitializedWireTransferData extends Data
case class InitialisedWireTransferData(from: ActorRef, to: ActorRef, amount: BigInt, client: ActorRef) extends Data

Now that we have the state and data defined, we can implement the WireTransfer FSM as follows:

class WireTransfer extends FSM[State, Data] {
  
  startWith(Initial, UninitializedWireTransferData)
  
  when(Initial) {
    case Event(Transfer(from, to, amount), UninitializedWireTransferData) =>
      from ! BankAccount.Withdraw(amount)
      goto(AwaitFrom) using InitialisedWireTransferData(from, to, amount, sender())
  }

  when(AwaitFrom) {
    case Event(BankAccount.Done, InitialisedWireTransferData(_, to, amount, _)) =>
      to ! BankAccount.Deposit(amount)
      goto(AwaitTo)
    case Event(BankAccount.Failed, InitialisedWireTransferData(_, _, _, client)) =>
      client ! WireTransfer.Failed
      goto(Done)
      stop()
  }

  when(AwaitTo) {
    case Event(BankAccount.Done, InitialisedWireTransferData(_, _, _, client)) =>
      client ! WireTransfer.Done
      goto(Done)
      stop()
    case Event(BankAccount.Failed, InitialisedWireTransferData(_, _, _, client)) =>
      client ! WireTransfer.Failed
      goto(Done)
      stop()
  }
  
  initialize()
}

The code is pretty self explanatory. The state machine will start in the Initial state with all values uninitialised (represented by the UninitializedWireTransferData). The only type of message which can be received in the Initial state is the initial Transfer request at which point a Withdraw message is sent to the from ActorRef and the state machine transitions to the AwaitFrom state.

When the system is in the AwaitFrom state the only two messages that can be received are Done or Failure from the from ActorRef. If the Done business acknowledgement is received the system will send a Deposit message to the to ActorRef and transition to the AwaitTo state. The system will stop() if a Failure is received.

When the system is in the AwaitTo state the only two messages that can be received are the Done or Failure from the to ActorRef. If the Done business acknowledgement is received the system will send a Done business acknowledgement to the original sender. Else if a Failure is received, it sends the sender a Failure business acknowledgement.

At Most Once Semantics

When dealing with transactions in the above example we have assumed that messages (originating from the from and to actors) are unique and hence should always be processed. In general, this assumption is wrong. In order to guarantee that a message is received by an actor we need to be able to send a message more than once (and at least once). When multiple identical messages can be sent to an actor, the actor needs to determine whether the received message is to be processed (unless a pure function is implemented). We can generify this concept by creating our own custom AtMostOnceFSM as follows:

class AtMostOnceFSM[S, D] extends FSM[S, D] {

  protected[this] def hasBeenProcessed(message: S): Boolean = {
    false
  }

  def AtMostOnce(stateFunction: StateFunction): StateFunction = new StateFunction() {
    override def isDefinedAt(msg: Event): Boolean = stateFunction.isDefinedAt(msg)

    override def apply(msg: Event): State = msg match {
      case msg@Event(s: S, _) =>
        if (!hasBeenProcessed(s)) {
          stateFunction(msg)
        } else {
          stay()
        }
      case msg@_ => stateFunction(msg)
    }
  }
}

The AtMostOnceFSM contains a function AtMostOnce which wraps up an original StateFunction. The AtMostOnce state function will first determine whether the received message is defined on the original function (isDefinedAt) and if defined, it will perform the at most once check by delegating to the hasBeenProcessed function. In the WireTransfer example we will use the default implementation which will assume that a message has not yet been processed. Using this custom FSM we can update the original WireTransfer actor as follows:

class WireTransfer extends AtMostOnceFSM[State, Data] {
  startWith(Initial, UninitializedWireTransferData)
  when(Initial) (AtMostOnce {
    case Event(Transfer(from, to, amount), UninitializedWireTransferData) =>
      from ! BankAccount.Withdraw(amount)
      goto(AwaitFrom) using InitialisedWireTransferData(from, to, amount, sender())
  })

  when(AwaitFrom) (AtMostOnce {
    case Event(BankAccount.Done, InitialisedWireTransferData(_, to, amount, _)) =>
      to ! BankAccount.Deposit(amount)
      goto(AwaitTo)
    case Event(BankAccount.Failed, InitialisedWireTransferData(_, _, _, client)) =>
      client ! WireTransfer.Failed
      goto(Done)
      stop()
  })

  when(AwaitTo) (AtMostOnce {
    case Event(BankAccount.Done, InitialisedWireTransferData(_, _, _, client)) =>
      client ! WireTransfer.Done
      goto(Done)
      stop()
    case Event(BankAccount.Failed, InitialisedWireTransferData(_, _, _, client)) =>
      client ! WireTransfer.Failed
      goto(Done)
      stop()
  })
  initialize()
}

The real implementation of the hasBeenProcessed will require unique identifier for BankAccounts and for each WireTransfer. This is left as an exercise to the reader.

The observant reader might have noticed that the isDefinedAt function uses the stateFunction rather than the list of messages received by the proxy in the apply function. This is deliberate since we only want to proxy any messages which were defined in the original function and ignore anything else.

Conclusion

Some have defined become and unbecome as the ‘evil’ Actor counterpart of Imperative gotos. In large systems (with lots of states), using become and unbecome directly can lead to a spaghetti code of states. In this post we have shown how we can use the Actor FSM Mixin to implement Finite State Machines which abstract away become and unbecome calls and make states (and data) more explicit. Stay safe and keep hacking!

Written on June 24, 2017