Gabbler, a Reactive Chat App – part 3

In the last blog post I showed how we can use Akka I/O and spray-can to implement a simple solution for server-side push via long polling. Today we are going to improve it with regard to maintainability and robustness.

Changing behavior instead of mutable state

Currently our Gabbler contains the two mutable fields messages and storedCompleter. There is nothing fundamentally wrong with actors having mutable state. On the contrary, it's one of the strengths of the actor model to encapsulate mutable state in a thread-safe way.

Yet in our case the behavior of the actor depends on the two mutable fields in an already slightly entangled fashion: when we receive a completer, we either apply it to complete the request or store it for later, depending on the value of the messages field. And when we receive a message, we have to check if we have a stored completer.
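
To recall what that logic roughly looks like, here is a sketch of the mutable-field approach described above. It is only an approximation of the code from the previous post, not a verbatim copy:

class Gabbler extends Actor {

  import GabblerService._

  // Mutable state: messages not yet delivered and a pending completer, if any
  var messages = List.empty[Message]
  var storedCompleter = Option.empty[Completer]

  def receive: Receive = {
    case completer: Completer =>
      if (messages.nonEmpty) {
        completer(messages)               // complete the request right away ...
        messages = Nil
      } else
        storedCompleter = Some(completer) // ... or store the completer for later
    case message: Message =>
      messages = message :: messages
      storedCompleter foreach { completer => // complete a pending request, if there is one
        completer(messages)
        messages = Nil
        storedCompleter = None
      }
  }
}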

As we will see later, we will have to add another "dimension" to the behavior. Using another mutable field for that would result in a pretty entangled mess which would be hard to understand and therefore lead to poor maintainability.

How can we solve this issue? Maybe we should think in terms of logical states instead of fields which are implementation details. Or put another way, we should think about a state machine for a Gabbler. Why not give it a try?

All right, when a Gabbler actor gets created, it is waiting for a message or a completer. Let’s call that state waiting. When it gets a GET request in this waiting state, it receives a completer, but in order to complete the request, it needs to wait for a message. Therefore let’s call that state waiting for message. When it gets a POST request in the waiting state, it receives a message and needs to wait for a completer, so let’s call that state waiting for completer. Here’s a state diagram showing these states and possible transitions:

Gabbler states

When a Gabbler is in waiting for message, receiving another completer doesn't cause a state transition. We would just drop the old completer and use the new one. But when a message is received, the request is completed and the Gabbler transitions back to waiting again.

Similarly, when a Gabbler is in waiting for completer, receiving another message doesn't cause a state transition. In this case we would add the new message to the one(s) already received. When a completer is received, the request is completed and the Gabbler transitions back to waiting again.

As you can see, the behavior of a Gabbler depends on the state it is in. In other words, we have to change the behavior to build this state machine. Luckily that's easy: we just have to use the ActorContext.become method which takes the new behavior as a Receive parameter:

class Gabbler extends Actor {

  import GabblerService._

  def receive: Receive =
    waiting

  def waiting: Receive = {
    case completer: Completer => context become waitingForMessage(completer)
    case message: Message     => context become waitingForCompleter(message :: Nil)
  }

  def waitingForMessage(completer: Completer): Receive = {
    case completer: Completer => context become waitingForMessage(completer)
    case message: Message     => completeAndWait(completer, message :: Nil)
  }

  def waitingForCompleter(messages: List[Message]): Receive = {
    case completer: Completer => completeAndWait(completer, messages)
    case message: Message     => context become waitingForCompleter(message :: messages)
  }

  def completeAndWait(completer: Completer, messages: List[Message]): Unit = {
    completer(messages)
    context become waiting
  }
}

As you can see, instead of the mutable fields we have introduced three methods, each returning a behavior specific to one of the three states. While we have effectively added four lines of code compared to the old implementation, I strongly believe that this implementation is superior, because the notion of a state machine makes it easier to understand. In addition we have gotten rid of the mutable fields, and the logic within each state is extremely simple and in particular doesn't contain any nested conditions.

When we use our AngularJS frontend which uses long polling, i.e. sends GET requests asking for messages, our Gabbler actors will be in the waiting for message state most of the time: every time we complete a request and a Gabbler transitions to waiting, the AngularJS frontend will send a new GET request, i.e. a new completer, within a very short timeframe. Depending on the network latency this period, in which the Gabbler actors are in waiting or possibly in waiting for completer, will typically be in the range between 100 milliseconds and one second.

Robustness

So far we haven't given any thought to robustness. One reason for that is that Akka offers fault tolerance out of the box. But of course we have to put in some extra effort to build a really robust system.

Let's go back to the usage example from the last blog post: first start the application, then ask for messages by sending a GET request with curl and then … just wait until the request times out.

~$ curl -u Heiko:Heiko http://localhost:8080/api/messages
The server was not able to produce a timely response to your request.~$

After waiting a little while – 20 seconds, which is spray-can's default request timeout – the pending request is answered with the timeout response shown above and curl returns. We cannot expect a long polling request to stay open forever anyway, so we should make sure that we don't run into any issues on the server when it is terminated.

Let’s see what happens if we now send a message using a POST request:

$ curl -u Roland:Roland -d '{ "text": "Akka rocks!", "username": "" }' -H "Content-Type: application/json" http://localhost:8080/api/messages
$

We don’t see anything on the client side, but the server log shows the following:

09:32:25 ERROR [akka://gabbler-service-system/user/gabbler-service/Heiko] - null
java.lang.NullPointerException: null
  at spray.can.server.ResponseReceiverRef.unhandledMessage(ResponseReceiverRef.scala:80) ~[spray-can-1.2-20130628.jar:1.2-20

09:32:25 DEBUG [akka://gabbler-service-system/user/gabbler-service/Heiko] - restarting
09:32:25 DEBUG [akka://gabbler-service-system/user/gabbler-service/Heiko] - restarted

Obviously the Gabbler actor which was created on behalf of the initial GET request was still alive even though the request behind its completer had already been terminated. This isn't a big issue, because Akka is fault tolerant. Yet the default behavior of restarting a Gabbler actor in such a situation doesn't make sense. Therefore the first thing we change is the supervisorStrategy of the GabblerService:

override def supervisorStrategy: SupervisorStrategy =
  OneForOneStrategy() { case _ => SupervisorStrategy.Stop }

As you can see, we simply stop a Gabbler actor in any case, because there are no cases where it makes sense to restart or resume it.
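
To put this into context, here is a simplified sketch of a parent actor applying that strategy to its per-user Gabbler children. Apart from the supervisorStrategy override, everything here is illustrative and not taken from the actual GabblerService:

import akka.actor.{ Actor, OneForOneStrategy, Props, SupervisorStrategy }

class GabblerParent extends Actor {

  // A failing Gabbler is stopped, never restarted or resumed
  override def supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy() { case _ => SupervisorStrategy.Stop }

  def receive: Receive = {
    case (username: String, payload: Any) =>
      // Look up or create the per-user child, named after the user,
      // and forward the payload (a Completer or a Message) to it
      val gabbler = context.child(username) getOrElse context.actorOf(Props(new Gabbler), username)
      gabbler forward payload
  }
}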

But the essence of the issue still hasn't been resolved: we should prevent the long polling request from timing out in the first place. Instead we need our own server-side timeout which completes the current request so that the client sends a new one.

A simple approach would just make use of the receive timeout of an actor and stop a Gabbler after a reasonable time. This would work in many cases, but what if a message arrives after the Gabbler has been stopped, but before the client has reconnected and a new Gabbler has been created? That message would never be delivered to the respective user.
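
For illustration, here is a minimal sketch of that simpler approach using Akka's built-in receive timeout. This is not part of Gabbler, and the 15 seconds are an arbitrary example value:

import akka.actor.{ Actor, ReceiveTimeout }
import scala.concurrent.duration._

class NaiveGabbler extends Actor {

  // Akka sends us a ReceiveTimeout message if nothing arrives for 15 seconds
  context.setReceiveTimeout(15.seconds)

  def receive: Receive = {
    case ReceiveTimeout => context.stop(self)
    // ... handle Completer and Message as in the state machine above ...
  }
}

The problem described above remains: once the actor is stopped, any message arriving before the next GET request creates a new Gabbler simply ends up in dead letters.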

Therefore we have to schedule a timeout ourselves every time a Gabbler actor enters a particular state. If this state is waiting for message, i.e. if the Gabbler has a completer, we’ll complete the request after the timeout with an empty list and transition into waiting. This gives the long polling client the chance to send a new GET request with a new completer.

Gabbler states

In the two other states, i.e. in waiting and waiting for completer, the Gabbler is waiting for a completer. If none arrives within the timeout period, we can safely assume that the user has closed the browser window or that some other severe reason prevents the client from sending another long polling request, hence we can stop the Gabbler actor.

Here’s the accordingly changed Gabbler code:

object Gabbler {

  private case class Timeout(id: Int)

  def props(timeout: FiniteDuration): Props =
    Props(new Gabbler(timeout))
}

final class Gabbler(timeoutDuration: FiniteDuration) extends Actor {

  import Gabbler._
  import GabblerService._
  import context.dispatcher

  def receive: Receive =
    waiting(scheduleTimeout(Timeout(0)))

  def waiting(timeout: Timeout): Receive = {
    case completer: Completer => context become waitingForMessage(completer, newTimeout(timeout))
    case message: Message     => context become waitingForCompleter(message :: Nil, timeout)
    case `timeout`            => context.stop(self)
  }

  def waitingForMessage(completer: Completer, timeout: Timeout): Receive = {
    case completer: Completer => context become waitingForMessage(completer, newTimeout(timeout))
    case message: Message     => completeAndWait(completer, message :: Nil, timeout)
    case `timeout`            => completeAndWait(completer, Nil, timeout)
  }

  def waitingForCompleter(messages: List[Message], timeout: Timeout): Receive = {
    case completer: Completer => completeAndWait(completer, messages, timeout)
    case message: Message     => context become waitingForCompleter(message :: messages, timeout)
    case `timeout`            => context.stop(self)
  }

  private def newTimeout(timeout: Timeout): Timeout =
    scheduleTimeout(timeout.copy(timeout.id + 1))

  private def scheduleTimeout(timeout: Timeout): Timeout = {
    context.system.scheduler.scheduleOnce(timeoutDuration, self, timeout)
    timeout
  }

  private def completeAndWait(completer: Completer, messages: List[Message], timeout: Timeout): Unit = {
    completer(messages)
    context become waiting(newTimeout(timeout))
  }
}

As you can see, we have introduced a parameterized Timeout message. This is important in order to schedule new timeouts and ignore old ones. Of course we don't hard code the timeout duration, but instead use a value from the configuration. In order to stay below spray-can's 20-second request timeout we need a smaller value. For the sake of demonstration, let's simply go for 5 seconds, because then we don't have to wait too long if we want to watch what's going on.
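
As a sketch, reading the timeout from the configuration could look roughly like this; the settings path gabbler-service.timeout is an assumption for this example, not necessarily the one used in the project:

import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor.ActorSystem
import scala.concurrent.duration.{ Duration, FiniteDuration }

object GabblerTimeoutSketch {

  // application.conf would contain something like: gabbler-service.timeout = 5 seconds
  def timeout(system: ActorSystem): FiniteDuration =
    Duration(system.settings.config.getDuration("gabbler-service.timeout", MILLISECONDS), MILLISECONDS)

  // The value is then passed along when creating a Gabbler, e.g.
  // context.actorOf(Gabbler.props(timeout), username)
}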

If we rerun the above usage example, the request no longer times out; instead we receive an empty list after 5 seconds. Then we can safely send a message without getting any errors on the server side:

~$ curl -u Heiko:Heiko http://localhost:8080/api/messages
[]~$
~$ curl -u Roland:Roland -d '{ "text": "Akka rocks!", "username": "" }' -H "Content-Type: application/json" http://localhost:8080/api/messages
~$ curl -u Heiko:Heiko http://localhost:8080/api/messages
[{
  "username": "Roland",
  "text": "Akka rocks!"
}]~$

If we send the message and then again ask for messages quickly enough, all within 5 seconds, we'll even receive the message that has been sent. This is because after timing out in waiting for message, the Gabbler actor transitions into waiting. There it receives the message and transitions into waiting for completer. In this state timing out would mean stopping, therefore we have to send the completer quickly. In other words: even if the long polling connection hasn't yet been reopened, no messages will get lost.

Conclusion

That’s it, we’re done! We have built Gabbler, a simple push-enabled chat application. While simple, Gabbler is robust and shows how easily we can build such applications with Scala, Akka, spray and AngularJS.

The complete source code is available on GitHub.
