Tuesday, 4 June 2013

Another code snippet for Scala/Akka: Folding

Folding is the art of collapsing events by their unique id while still processing the latest data associated with that id. This pattern is usually employed when the sender can output millions of messages in a very short time frame but the receiver can only process a subset of them.
A typical example from the financial industry is a feed going into a pricer. A feed such as EUREX, with a size-weighted average for prices, (bid * ask size + ask * bid size) / (bid size + ask size), can generate 100+ updates per second. Such a feed can go into a pricer that calculates bonds or swaps (in the thousands). As the calculations usually require more steps (different pricing models, calculation methods, root-finding algorithms, etc.), the incoming feed outpaces the pricer, hence the need to "fold" the input: only take in the latest data that you can handle. There are several ways to implement this; I have successfully implemented it in plain Java.
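The idea itself can be sketched in plain Scala, independently of Akka. This is a minimal sketch assuming a hypothetical Quote type (id, bid, ask, bid size, ask size) and made-up ids and prices: weightedPrice applies the formula above, and foldBurst keeps only the latest quote per id, in the order in which each id first appeared in the burst.

```scala
// Hypothetical quote type, for illustration only.
final case class Quote(id: String, bid: Double, ask: Double, bidSize: Double, askSize: Double)

// Size-weighted price: (bid * askSize + ask * bidSize) / (bidSize + askSize)
def weightedPrice(q: Quote): Double =
  (q.bid * q.askSize + q.ask * q.bidSize) / (q.bidSize + q.askSize)

// Folds a burst of updates: one entry per id holding its latest quote,
// ordered by the first time each id appeared in the burst.
def foldBurst(burst: Seq[Quote]): Seq[Quote] = {
  val order  = burst.map(_.id).distinct
  val latest = burst.foldLeft(Map.empty[String, Quote])((acc, q) => acc + (q.id -> q))
  order.map(latest)
}

val burst = Seq(
  Quote("bond-1", 100.0, 100.2, 10, 30),
  Quote("bond-2", 110.0, 110.1, 5, 5),
  Quote("bond-1", 100.1, 100.3, 20, 20) // supersedes the first bond-1 update
)
val folded = foldBurst(burst)
folded.foreach(q => println(s"${q.id} -> ${weightedPrice(q)}"))
```

Three updates come in, but the pricer only has two prices to compute, and the first bond-1 quote is never looked at.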
This post is an attempt to implement folding using Akka and Scala.
My approach is to create a custom mailbox: a class that extends akka.dispatch.MailboxType. This class has a create method that returns a custom queue, where the folding logic is implemented.
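import akka.actor.{ActorRef, ActorSystem}
import akka.dispatch.{MailboxType, MessageQueue}
import com.typesafe.config.Config

class FoldableMailbox extends MailboxType {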
  def this(settings: ActorSystem.Settings, config: Config) = this()
  final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = new FoldableQueue
}
The queue is a class that extends akka.dispatch.MessageQueue. I have delegated the processing to another class: the idea is to provide a Java implementation and compare the performance of the pure Scala version with the Java one. (Please note that more work is needed for the clean-up and co.: for instance, cleanUp would normally pass any remaining envelopes on to deadLetters rather than simply drop them. Check out the Akka source code for this.)
import akka.actor.ActorRef
import akka.dispatch.{Envelope, MessageQueue}

class FoldableQueue extends MessageQueue with TLogger {
  val struct = new FoldableStruct
  
  override def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = struct.clean
  override def numberOfMessages: Int = struct.nbMessages
  override def hasMessages: Boolean = struct.hasMessages
  
  override def enqueue(receiver: ActorRef, envelope: Envelope): Unit = struct.add(envelope)
  
  // Akka expects null when there is nothing to dequeue
  override def dequeue(): Envelope = {
    struct.get match {
      case Some(e) => e
      case None => null
    }
  }
}
The struct that does the actual processing:
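import akka.dispatch.Envelope

sealed class FoldableStruct extends TLogger {
  private var elements: List[Envelope] = List()
  private var foldableMap: Map[Any, Envelope] = Map()
  
  // reset both the list and the map while holding the lock
  def clean: Unit = synchronized {
    elements = List()
    foldableMap = Map()
  }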
  def nbMessages = synchronized { elements.size }
  def hasMessages = synchronized { !elements.isEmpty }
  
  def add(env: Envelope) = synchronized {
    env.message match {
      case fm: FoldableMessage => {
        foldableMap += fm.foldableKey -> env
        if (!elements.contains(env)) ++(env)
      }
      case _ => ++(env)
    }
  }
  
  private def ++(e: Envelope) = elements = elements :+ e
  
  def get: Option[Envelope] = synchronized {
    if (hasMessages) {
      val head = elements.head
      elements = elements.tail
      head.message match {
        case fm: FoldableMessage => {
          Some(foldableMap(fm.foldableKey))
        }
        case _ => Some(head)
      }
    } else {
      None
    }
  }
}
The add method checks whether the message is foldable, i.e. an instance of a class that mixes in the FoldableMessage trait:
trait FoldableMessage {
  def foldableKey: Any
  override def hashCode = foldableKey.hashCode
  override def equals(other: Any) = other match {
    case fm:FoldableMessage => fm.foldableKey.equals(foldableKey)
    case _ => false
  }
}
If the message is a FoldableMessage, the Envelope stored in the map under its foldableKey is replaced with the new one. If the list already contains the envelope (FoldableMessage equality is based on the key alone), nothing else happens; otherwise the envelope is added at the back of the list via the ++ method.
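The contains check in add only works because the trait bases equals and hashCode on foldableKey alone: two updates for the same id compare equal whatever their price. A quick standalone check, copying the trait and FoldableMsg as defined in this post (the keys and prices are made up):

```scala
trait FoldableMessage {
  def foldableKey: Any
  override def hashCode = foldableKey.hashCode
  override def equals(other: Any) = other match {
    case fm: FoldableMessage => fm.foldableKey.equals(foldableKey)
    case _ => false
  }
}

class FoldableMsg(id: String, price: Double) extends FoldableMessage {
  override def foldableKey = id
  override def toString = s"FoldableMsg($id, price=$price)"
}

val older = new FoldableMsg("msg 1", 99.5)
val newer = new FoldableMsg("msg 1", 100.25)
val other = new FoldableMsg("msg 2", 100.25)

println(older == newer)              // true: same key, different price
println(older == other)              // false: different key
println(List(older).contains(newer)) // true: the kind of check add relies on
```

Inside the mailbox, contains compares whole Envelopes. As far as I can tell Envelope is a case class in Akka 2.x, so the sender takes part in that comparison too, and updates for one key sent by two different actors would each get their own slot in the list.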
The get method takes the head of the envelope list and, if it holds a FoldableMessage, returns the latest envelope for that key from the map; otherwise it returns the element itself.
The test: write an application.conf with the following (all the code above is in a package called mbfolding):
Foldable-Dispatcher {
  mailbox-type = mbfolding.FoldableMailbox
}
Then the main looks like this, with the actor definition and the message:
import akka.actor.Actor

class FoldedActor extends Actor with TLogger {
  override def receive = {
    case fm: FoldableMsg => info(s"actor <- $fm")
    case _ =>
  }
}
class FoldableMsg(id: String, price: Double) extends FoldableMessage {
  override def foldableKey = id
  override def toString = s"FoldableMsg($id, price=$price)"
}
Main:
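import akka.actor.{ActorSystem, Props}

object FoldingMain extends App {
  val actorSystem = ActorSystem("MySystem")
  val folderActor = actorSystem.actorOf(Props(new FoldedActor).withDispatcher("Foldable-Dispatcher"))

  // 100,000 updates spread over only three keys: "msg 0", "msg 1" and "msg 2"
  for (i <- 1 to 100 * 1000) {
    val j = i % 3
    folderActor ! new FoldableMsg(s"msg $j", i)
  }
}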
Seems to be working ;-)
PS-1: A cleaner get for the struct (thanks to C.C):
def get: Option[Envelope] = synchronized {
  elements.headOption match {
    case Some(msg) => {
      elements = elements.tail
      msg.message match {
        case fm:FoldableMessage => Some(foldableMap(fm.foldableKey))
        case _ => Some(msg)
      }
    }
    case _ => None
  }
}
PS-2: Another idea from C.C is to remove the key from the map once it has been processed, which avoids the contains call on the list :-)
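A minimal sketch of that idea, written against plain Scala types instead of Akka's Envelope so that it runs on its own. The names FoldingBuffer, keyOf and Price are hypothetical: keyOf returns Some(key) for foldable messages and None for messages that must be delivered as they are. The map now doubles as the membership test (a key is pending exactly when it is in the map), and get removes the key as it hands out the latest value. I have also swapped the List for an immutable Queue, since :+ on a List copies the whole list.

```scala
import scala.collection.immutable.Queue

// Hypothetical, Akka-free folding buffer illustrating the PS-2 idea.
// M is the message type; keyOf returns Some(key) for foldable messages.
final class FoldingBuffer[M](keyOf: M => Option[Any]) {
  // Delivery order: Left(key) is a pending foldable slot, Right(msg) a plain message.
  private var slots: Queue[Either[Any, M]] = Queue.empty
  // Latest message for every key that still has a pending slot.
  private var latest: Map[Any, M] = Map.empty

  def add(msg: M): Unit = synchronized {
    keyOf(msg) match {
      case Some(key) =>
        // The map is the membership test: no scan of the queue needed.
        if (!latest.contains(key)) slots = slots.enqueue(Left(key))
        latest += key -> msg
      case None =>
        slots = slots.enqueue(Right(msg))
    }
  }

  def get: Option[M] = synchronized {
    slots.dequeueOption match {
      case Some((Left(key), rest)) =>
        slots = rest
        val msg = latest(key)
        latest -= key // processed: the next update for this key opens a new slot
        Some(msg)
      case Some((Right(msg), rest)) =>
        slots = rest
        Some(msg)
      case None => None
    }
  }

  def size: Int = synchronized { slots.size }
}

final case class Price(id: String, value: Double)

val buffer = new FoldingBuffer[Any]({
  case p: Price => Some(p.id)
  case _        => None
})
buffer.add(Price("A", 1.0))
buffer.add(Price("B", 2.0))
buffer.add("heartbeat")
buffer.add(Price("A", 3.0)) // folded into the slot already pending for "A"

// Prints Price(A,3.0), then Price(B,2.0), then heartbeat
Iterator.continually(buffer.get).takeWhile(_.isDefined).flatten.foreach(println)
```

Dropping the key on get also means the map only holds ids that are still waiting to be processed, instead of every id ever seen.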
