Tuesday, September 27, 2011

Non-blocking composition using Redis and Futures

scala-redis now supports pooling of Redis clients. Using RedisClientPool you can run independent Redis operations concurrently, in non-blocking mode, and get improved throughput for your application.

Suppose you have a bunch of operations that can, in principle, execute in parallel: say, a few operations on disjoint lists and a few on key/value pairs, like the following snippets:

import com.redis._

val clients = new RedisClientPool("localhost", 6379)

// left push to a list
def lp(msgs: List[String]) = {
  clients.withClient {client => {
    msgs.foreach(client.lpush("list-l", _))
    client.llen("list-l")
  }}
}

// right push to another list
def rp(msgs: List[String]) = {
  clients.withClient {client => {
    msgs.foreach(client.rpush("list-r", _))
    client.llen("list-r")
  }}
}

// key/set operations
def set(msgs: List[String]) = {
  clients.withClient {client => {
    msgs.zipWithIndex.foreach { case (v, i) =>
      client.set("key-%d".format(i), v)
    }
    Some(1000) // a dummy value, to fit the Option[Int] signature
  }}
}
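Each of these functions borrows a client through withClient, runs the closure, and (presumably) hands the client back to the pool afterwards: the classic loan pattern. To make that idea concrete, here is a minimal sketch of a fixed-size pool built on a blocking queue. It is not how scala-redis implements RedisClientPool; `SimplePool` and `FakeClient` are hypothetical stand-ins so that the sketch runs without a Redis server.

```scala
import java.util.concurrent.ArrayBlockingQueue

// Hypothetical stand-in for a Redis connection: it only counts the commands it receives.
final class FakeClient(val id: Int) {
  @volatile var commands: Int = 0
  def lpush(key: String, value: String): Unit = commands += 1
}

// Hypothetical fixed-size pool illustrating the loan pattern behind withClient.
final class SimplePool[C](clients: Seq[C]) {
  require(clients.nonEmpty, "a pool needs at least one client")

  // Idle clients wait here; take() blocks while every client is on loan.
  private val idle = new ArrayBlockingQueue[C](clients.size)
  clients.foreach(c => idle.put(c))

  // Borrow a client, run the body, and always return the client,
  // even when the body throws.
  def withClient[T](body: C => T): T = {
    val client = idle.take()
    try body(client)
    finally idle.put(client)
  }

  // Number of clients currently sitting idle in the pool.
  def available: Int = idle.size
}
```

With three clients, up to three closures can hold a connection at the same time; a fourth caller simply blocks in take() until one is returned.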

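Redis itself is single threaded and executes commands one at a time, but with client pooling you can allocate multiple clients (and hence multiple connections) and fork these operations concurrently, so that independent operations no longer wait on each other's round trips. Here's a snippet that runs these operations asynchronously using Scala futures: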

// generate some arbitrary values
val l = (0 until 5000).map(_.toString).toList

// prepare the list of functions to invoke
val fns = List[List[String] => Option[Int]](lp, rp, set)

// schedule the futures
val tasks = fns map (fn => scala.actors.Futures.future { fn(l) })

// wait for results
val results = tasks map (future => future.apply())
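scala.actors has since been deprecated and removed from the standard Scala distribution, so here is a sketch of the same fork/join shape with scala.concurrent.Future, assuming Scala 2.12 or later. `ForkJoinSketch` and its lp, rp and set are hypothetical in-memory stand-ins for the Redis-backed functions above; only the structure (one future per function, a single wait at the end) carries over.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ForkJoinSketch {
  // Hypothetical stand-ins for lp, rp and set. Each returns an Option[Int],
  // like the Redis-backed versions: the list length after pushing into an
  // empty list, or a dummy value for set.
  def lp(msgs: List[String]): Option[Int] = Some(msgs.size)
  def rp(msgs: List[String]): Option[Int] = Some(msgs.size)
  def set(msgs: List[String]): Option[Int] = Some(1000)

  def run(values: List[String]): List[Option[Int]] = {
    val fns = List[List[String] => Option[Int]](lp, rp, set)
    // Fork: each Future starts running on the global execution context right away.
    val tasks: List[Future[Option[Int]]] = fns.map(fn => Future(fn(values)))
    // Join: Future.sequence turns List[Future[A]] into Future[List[A]],
    // and we block only once, at the very end.
    Await.result(Future.sequence(tasks), 10.seconds)
  }
}
```

Calling `ForkJoinSketch.run((0 until 5000).map(_.toString).toList)` yields one result per function, in the order the functions were listed.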

And while we are on the topic of using futures for non-blocking Redis operations: Twitter has a library, Finagle, that offers lots of composition combinators on Futures along with other non-blocking RPC mechanisms. Over the weekend I used some of them to implement scatter/gather algorithms over Redis. I won't go into the details of what I did, but here's a simple example of what you can do with RedisClientPool and Finagle's Future implementation.

The essential idea is to compose futures and write non-blocking code all the way down. This is made possible by monadic, non-blocking map and flatMap operations and a host of utility functions built on top of them. Here's an example:

def collect[A](fs: Seq[Future[A]]): Future[Seq[A]] = { //..

It uses flatMap and map to collect the results from the given list of futures into a new future of Seq[A].
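The body of collect is elided above. As a rough illustration of how such a combinator can be built from flatMap and map alone, here is a sketch against scala.concurrent.Future rather than Finagle's Future; `CollectSketch` is a hypothetical name, and Finagle's own implementation may well differ. Because the input futures are already running by the time we fold over them, the fold only sequences the waiting, not the work.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object CollectSketch {
  // Fold the futures into a single Future[Seq[A]] using only flatMap and map:
  // wait (without blocking) for the accumulator, then for the next future,
  // and append its value. The order of the input sequence is preserved.
  def collect[A](fs: Seq[Future[A]]): Future[Seq[A]] =
    fs.foldLeft(Future.successful(Vector.empty[A])) { (accF, f) =>
      accF.flatMap(acc => f.map(a => acc :+ a))
    }
}
```

If any input future fails, the combined future fails too, with the first failure in sequence order.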

Let's look at a specific example where we push a number of elements into 100 lists concurrently using a pool of futures backed by an ExecutorService. This is the scatter phase of the algorithm. The function listPush does the actual push using a RedisClientPool, and each of these operations runs within a Future. FuturePool gives you a Future on which you can specify timeouts and exception handlers using Scala closures.

Note how we use the collect combinator for concurrent composition of the futures. The future that collect returns completes only when all the underlying futures have completed.

After the scatter phase we prepare for the gather phase by pipelining the future computation with flatMap. Unlike collect, flatMap is a combinator for sequential composition. In the following snippet, once allPushes completes, its result is pipelined into the closure passed to flatMap, which generates another Future. The whole operation completes only when both futures have completed, or as soon as either of them fails with an exception.

For more details on how to use these combinators on Future abstractions, have a look at the tutorial that the Twitter guys published recently.

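import java.util.concurrent.Executors
import com.twitter.conversions.time._
import com.twitter.util.{Future, FuturePool, JavaTimer, TimeoutException}
import com.redis._

implicit val timer = new JavaTimer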

// set up Executors
val futures = FuturePool(Executors.newFixedThreadPool(8))

// abstracting the flow with future
private[this] def flow[A](noOfRecipients: Int, opsPerClient: Int, fn: (Int, String) => A) = {
  val fs = (1 to noOfRecipients) map {i => 
    futures {
      fn(opsPerClient, "list_" + i)
    }.within(40.seconds) handle {
      case _: TimeoutException => null.asInstanceOf[A]
    }
  }
  Future.collect(fs)
}

// scatter across clients and gather them to do a sum
def scatterGatherWithList(opsPerClient: Int)(implicit clients: RedisClientPool) = {
  // scatter
  val allPushes: Future[Seq[String]] = flow(100, opsPerClient, listPush)
  val allSum = allPushes flatMap {result =>
    // gather
    val allPops: Future[Seq[Long]] = flow(100, opsPerClient, listPop)
    allPops map {members => members.sum}
  }
  allSum.apply
}
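For readers without Finagle at hand, here is a rough analogue of flow and scatterGatherWithList using only the Scala standard library, assuming Scala 2.12 or later. A fixed thread pool plays the role of FuturePool, a small `within` helper built on a Promise and a scheduled task stands in for Finagle's within, `recover` plays the role of `handle`, and an in-memory map replaces Redis. `ScatterGatherSketch`, `within`, `listPush`, `listPop` and the store are all hypothetical helpers for illustration, not the code in the scala-redis repo.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory, TimeUnit, TimeoutException}
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object ScatterGatherSketch {
  // Daemon threads, so the sketch does not keep the JVM alive once it is done.
  private val daemons: ThreadFactory = new ThreadFactory {
    def newThread(r: Runnable): Thread = { val t = new Thread(r); t.setDaemon(true); t }
  }

  // A fixed pool of 8 threads, in the role of FuturePool(Executors.newFixedThreadPool(8)).
  implicit val ec: ExecutionContext =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(8, daemons))

  // A single scheduler thread, used only to fire timeouts.
  private val scheduler = Executors.newSingleThreadScheduledExecutor(daemons)

  // Hypothetical helper: fail with a TimeoutException unless f completes within `timeout`.
  def within[A](f: Future[A], timeout: FiniteDuration): Future[A] = {
    val p = Promise[A]()
    val timer = scheduler.schedule(new Runnable {
      def run(): Unit = { p.tryFailure(new TimeoutException(s"timed out after $timeout")); () }
    }, timeout.toMillis, TimeUnit.MILLISECONDS)
    f.onComplete { result => timer.cancel(false); p.tryComplete(result) }
    p.future
  }

  // In-memory stand-in for Redis: list key -> elements.
  private val store = new ConcurrentHashMap[String, List[Long]]()

  // Hypothetical listPush: stores the values 1..count under key and returns the key.
  // Each key is written by exactly one task, so a single put is enough here.
  def listPush(count: Int, key: String): String = {
    store.put(key, (1 to count).map(_.toLong).toList)
    key
  }

  // Hypothetical listPop: removes the list under key and sums up to `count` of its elements.
  def listPop(count: Int, key: String): Long =
    Option(store.remove(key)).getOrElse(Nil).take(count).sum

  // Analogue of flow: one future per recipient, each guarded by a timeout that
  // falls back to an explicit `fallback` value (instead of null), joined with Future.sequence.
  def flow[A](recipients: Int, opsPerClient: Int, fn: (Int, String) => A, fallback: A): Future[Seq[A]] = {
    val fs = (1 to recipients).map { i =>
      within(Future(fn(opsPerClient, "list_" + i)), 40.seconds).recover {
        case _: TimeoutException => fallback
      }
    }
    Future.sequence(fs)
  }

  def scatterGatherWithList(opsPerClient: Int): Long = {
    // Scatter: fill 100 lists concurrently.
    val allPushes: Future[Seq[String]] = flow[String](100, opsPerClient, listPush, "")
    // Gather: flatMap starts the pops only after every push has completed.
    val allSum: Future[Long] = allPushes.flatMap { _ =>
      flow[Long](100, opsPerClient, listPop, 0L).map(_.sum)
    }
    Await.result(allSum, 1.minute)
  }
}
```

With 10 operations per client, each of the 100 lists holds 1 to 10, so `scatterGatherWithList(10)` sums to 100 × 55 = 5500. Passing an explicit fallback keeps timed-out slots type-safe, where `null.asInstanceOf[A]` would silently become 0 for a Long.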

For complete implementations of these patterns, such as scatter/gather over Redis, have a look at the scala-redis repo on GitHub.