Heiko's Blog

About programming and other fun things

We Are Reactive

Gabbler, a Reactive Chat App – part 2

| Comments

In my previous blog post we have built the client portion of Gabbler – a modern and reactive push-enabled chat application – as well as its server skeleton. For those of you who didn’t like the client stuff, here comes some good news: The client portion is already done and we’ll focus on the server in this and the upcoming blog posts. Today we will fill the missing pieces of the server to create a first fully functional, yet still simplistic, solution.

Gabblers are actors

It should be obvious, that we have to represent the connected users, which I will call gabblers from now on, in some way at runtime. I won’t use the term session here, because it might cause frowns or even worse, but at the end of the day we need a concept to keep track of these gabblers.

As we are already using Akka – what a lucky coincident, eh? – we will use actors to represent these gabblers. Actors fit quite naturally, because they have a lifecycle, i.e. can get created when a gabbler appears and get stopped somewhat later when the gabbler leaves. In addition, we can easily create millions of actors on commodity hardware, so chances are that we can sell our application to some social media startup.

All right, here comes the Gabbler actor:

1
class Gabbler extends Actor {  }

Long polling

Before we can define the Gabbler’s behavior, we have to spend some time thinking about its responsibilities.

We want to provide server-side push via long polling. What does that mean? Well, if a gabbler sends a GET request (“Give me some messages, dude!”), we don’t necessarily send a response right away, but only if there are messages available. So in terms of spray-routing we won’t complete a GET request immediately, but essentially keep the respective RequestContext around for later. By the way, in this first step we are not concerned about timeouts and such, but we are looking for a simple solution.

If a client sends a POST request (“Folks, here is a message for you!”), we have to send it to all gabblers which then can use the stored RequestContexts to complete the outstanding long polling GET requests with this message.

So basically we could implement the Gabbler’s behavior like this:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def receive: Receive = {
  case context: RequestContext =>
    if (messages.isEmpty)
      storedContext = Some(context)
    else
      context.complete(messages)
  case message: Message =>
    messages +:= message
    for (context <- storedContext) {
      context.complete(messages)
      messages = Nil
      storedCompleter = None
    }
}

As you can see we could send the RequestContext for a GET request to the respective Gabbler instance. It would either be stored for later or be used to complete the request immediately, if messages were already available.

A POST request would lead to a Message instance being sent to all Gabbler instances. In this behavior, such a message would first be stored and then all stored messags would be used to complete a former GET request, if a respective RequestContext was available.

Completing the abstraction

While this approach would work, it is really ugly, because the HTTP API leaks into the “business logic” via the RequestContext. Therefore spray-routing offers the produce directive, which produces – hence the name – a completion function A => Unit, where A is the type we want to get marshalled, and passes it into its inner route. We can then further pass along this harmless function and use it later to complete the request by simply applying it. We’ll look at produce when we turn our attention to GabblerService.

In our example we want the completion function to be a List[Message] => Unit. Therefore our Gabbler looks like this:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class Gabbler extends Actor {

  import GabblerService._

  var messages: List[Message] = Nil

  var storedCompleter: Option[Completer] = None

  def receive: Receive = {
    case completer: Completer =>
      if (messages.isEmpty)
        storedCompleter = Some(completer)
      else
        completer(messages)
    case message: Message =>
      messages +:= message
      for (completer <- storedCompleter) {
        completer(messages)
        messages = Nil
        storedCompleter = None
      }
  }
}

Notice that we are using the Completer type alias for List[Message] => Unit to make its purpose more expressive. When we receive such a completer, we either apply it to complete the request or store it for later. All the rest is like in the “ugly” approach above.

Producing completion functions

All that’s left to do is filling the missing pieces in GabblerService:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def apiRoute: Route =
// format: OFF
  authenticate(BasicAuth(UsernameEqualsPasswordAuthenticator, "Gabbler"))(user =>
    path("api" / "messages")(
      get(
        produce(instanceOf[List[Message]]){ completer => _ =>
          log.debug("User '{}' is asking for messages ...", user.username)
          gabblerFor(user.username) ! completer
        }
      ) ~
      post(
        entity(as[Message]) { message =>
          complete {
            log.debug("User '{}' has posted '{}'", user.username, message.text)
            val m = message.copy(username = user.username)
            context.children foreach (_ ! m)
            StatusCodes.NoContent
          }
        }
      )
    )
  )
// format: ON

def gabblerFor(username: String): ActorRef =
  context.child(username) getOrElse context.actorOf(Gabbler.props, username)

As you can see, we now use the produce directive when handling a GET request to create a completion function List[Message] => Unit, which we simply send to the Gabbler actor for the authenticated user. This is looked up amongst the children or, if not yet existing, created with the name of the authenticated user. Like for the entity directive, marshalling is a breeze and already working for us, because of the integration with spray-json and our Message companion object.

Of course we also have to fill the missing piece in the portion of the route handling POST request: We add the authenticated user to the unmarshalled message and send it to all Gabblers. That’s all.

Gabble away

We could use our beautiful HTML 5 UI and I encourage you to give it a try yourself, but for this blog post we will simply use good old curl to see our solution in action.

After running our GabblerServiceApp, we enter into one terminal session:

1
curl -u Heiko:Heiko http://localhost:8080/api/messages

We will see … nothing, which is expected, because we are long polling for messages and nobody is sending one. Let’s change that! In another terminal session we enter:

1
curl -u Roland:Roland -d '{ "text": "Akka rocks!", "username": "" }' -H "Content-Type: application/json" http://localhost:8080/api/messages

Which will immediately return and also make the following appear in the first terminal session:

1
2
3
4
[{
  "username": "Roland",
  "text": "Akka rocks!"
}]

No big surprise, but nice to see that it is working.

Conclusion

We have completed the server skeleton and implemented a first and simple solution for server-side push via long polling. In the next blog posts we will improve this solution, in particular make it easier to understand and also more robust.

The complete source code is available on GitHub, the current state under the tag step-02.

Comments