This post is an attempt to implement folding using Akka and Scala. Folding here means that when several messages with the same key are waiting in the mailbox, the actor only receives the latest one. My approach is to create a custom mailbox: a class that extends akka.dispatch.MailboxType and whose create method returns a custom queue, where the folding logic is implemented.

    import akka.actor.{ActorRef, ActorSystem}
    import akka.dispatch.{MailboxType, MessageQueue}
    import com.typesafe.config.Config

    class FoldableMailbox extends MailboxType {
      // Akka instantiates mailbox types reflectively through this constructor
      def this(settings: ActorSystem.Settings, config: Config) = this()

      final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
        new FoldableQueue
    }

The queue is a class that extends akka.dispatch.MessageQueue. I have delegated the processing to another class: the idea is to provide a Java implementation later and compare its performance with the pure Scala version. (Please note that more work is needed for clean-up and so on. Check out the Akka source code for this.)

    import akka.dispatch.Envelope

    // TLogger is a logging helper trait that is not shown in this post
    class FoldableQueue extends MessageQueue with TLogger {
      val struct = new FoldableStruct

      override def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = struct.clean
      override def numberOfMessages: Int = struct.nbMessages
      override def hasMessages: Boolean = struct.hasMessages
      override def enqueue(receiver: ActorRef, envelope: Envelope): Unit = struct.add(envelope)

      // dequeue returns null, not an Option, when the queue is empty
      override def dequeue(): Envelope = struct.get.orNull
    }

The struct for the real processing:

    sealed class FoldableStruct extends TLogger {
      private var elements: List[Envelope] = List()
      private var foldableMap: Map[Any, Envelope] = Map()

      // Reset both structures, with the assignments inside the lock
      def clean: Unit = synchronized {
        elements = List()
        foldableMap = Map()
      }

      def nbMessages = synchronized { elements.size }
      def hasMessages = synchronized { elements.nonEmpty }

      def add(env: Envelope) = synchronized {
        env.message match {
          case fm: FoldableMessage =>
            // Always remember the latest envelope for this key...
            foldableMap += fm.foldableKey -> env
            // ...but queue a slot only if none is pending for it
            if (!elements.contains(env)) ++(env)
          case _ => ++(env)
        }
      }

      private def ++(e: Envelope) = elements = elements :+ e

      def get: Option[Envelope] = synchronized {
        if (hasMessages) {
          val head = elements.head
          elements = elements.tail
          head.message match {
            case fm: FoldableMessage => Some(foldableMap(fm.foldableKey))
            case _ => Some(head)
          }
        } else {
          None
        }
      }
    }

add checks whether the message is foldable, that is, whether its class mixes in the FoldableMessage trait:

    trait FoldableMessage {
      def foldableKey: Any

      // Two foldable messages are equal when their keys are equal
      override def hashCode = foldableKey.hashCode
      override def equals(other: Any) = other match {
        case fm: FoldableMessage => fm.foldableKey.equals(foldableKey)
        case _ => false
      }
    }

If the message is a FoldableMessage, the map entry for its foldableKey is updated with the new Envelope. If the list already contains an envelope equal to it (which relies on the key-based equals above), nothing else happens; otherwise the envelope is appended to the back of the list via the ++ method.
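
To see why contains is enough to detect a pending message with the same key, here is a minimal sketch that runs without Akka. It is not the code from this post: it copies the trait so the snippet stands alone, swaps Envelope for a hypothetical Quote message, and uses immutable values instead of the struct's vars. In the real mailbox the comparison happens on Envelope, so treat this as an illustration of the idea only.

```scala
// Copy of the trait from the post, so the snippet stands alone.
trait FoldableMessage {
  def foldableKey: Any
  override def hashCode: Int = foldableKey.hashCode
  override def equals(other: Any): Boolean = other match {
    case fm: FoldableMessage => fm.foldableKey == foldableKey
    case _ => false
  }
}

// Hypothetical message type, for illustration only.
final class Quote(val id: String, val price: Double) extends FoldableMessage {
  override def foldableKey: Any = id
}

object FoldingDemo {
  // Mirrors the add logic on bare messages instead of Envelopes:
  // always store the latest value, queue a slot only if none is pending.
  def add(pending: List[Quote], latest: Map[Any, Quote], q: Quote): (List[Quote], Map[Any, Quote]) = {
    val newLatest = latest + (q.foldableKey -> q)
    val newPending = if (pending.contains(q)) pending else pending :+ q
    (newPending, newLatest)
  }

  // Feeds three quotes, two of which share the key "a".
  def run(): (List[Quote], Map[Any, Quote]) =
    List(new Quote("a", 1.0), new Quote("b", 2.0), new Quote("a", 3.0))
      .foldLeft((List.empty[Quote], Map.empty[Any, Quote])) {
        case ((pending, latest), q) => add(pending, latest, q)
      }
}
```

After run(), the pending list holds one slot per key, in arrival order, while the map holds the most recent quote for each key.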
The get method takes the head of the envelope list and, if it holds a FoldableMessage, returns the latest envelope stored in the map for that key; otherwise it returns the head itself.

The test: write an application.conf with the following (all the code above is in a package called mbfolding):

    Foldable-Dispatcher {
      mailbox-type = mbfolding.FoldableMailbox
    }

Then the main looks like this, with the actor definition and the message:

    class FoldedActor extends Actor with TLogger {
      override def receive = {
        case fm: FoldableMsg => info(s"actor <- $fm")
        case _ =>
      }
    }

    class FoldableMsg(id: String, price: Double) extends FoldableMessage {
      override def foldableKey = id
      override def toString = s"FoldableMsg($id, price=$price)"
    }

Main:

    val actorSystem = ActorSystem("MySystem")
    val folderActor = actorSystem.actorOf(Props(new FoldedActor).withDispatcher("Foldable-Dispatcher"))

    // 100,000 messages spread over only three keys
    for (i <- 1 to 100 * 1000) {
      val j = i % 3
      folderActor ! new FoldableMsg(s"msg $j", i)
    }

Seems to be working ;-)

PS-1: A cleaner get for the struct (thanks to C.C):

    def get: Option[Envelope] = synchronized {
      elements.headOption match {
        case Some(msg) =>
          elements = elements.tail
          msg.message match {
            case fm: FoldableMessage => Some(foldableMap(fm.foldableKey))
            case _ => Some(msg)
          }
        case _ => None
      }
    }

PS-2: Another idea from C.C is to remove the key from the map once processed, to avoid the contains on the list :-)
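
One possible reading of the PS-2 idea, as a rough sketch rather than a tested mailbox: if the key is dropped from the map when its message is handed out, then "the key is in the map" means "a slot is already queued", and the linear contains on the list becomes a map lookup. The KeyedFoldingQueue class and its method names are hypothetical, and it works on plain keys and values instead of Envelopes so that it runs without Akka.

```scala
import scala.collection.immutable.Queue

// Hypothetical stand-in for the struct; names are illustrative, not from the post.
final class KeyedFoldingQueue[K, V] {
  // Left = plain message, Right = key of a foldable message waiting in `latest`
  private var order: Queue[Either[V, K]] = Queue.empty
  private var latest: Map[K, V] = Map.empty

  def addPlain(value: V): Unit = synchronized {
    order = order :+ Left(value)
  }

  def addFoldable(key: K, value: V): Unit = synchronized {
    // A key present in the map means a slot is already queued: only refresh the value.
    if (!latest.contains(key)) order = order :+ Right(key)
    latest += key -> value
  }

  def get: Option[V] = synchronized {
    order.dequeueOption.map { case (slot, rest) =>
      order = rest
      slot match {
        case Left(value) => value
        case Right(key) =>
          val value = latest(key)
          latest -= key // forget the key so the next message with it gets a new slot
          value
      }
    }
  }

  def size: Int = synchronized { order.size }
}
```

A short usage example: adding values for "a", "b" and then "a" again leaves two slots, and get hands out the latest value for "a" first because its slot was queued first.

```scala
val q = new KeyedFoldingQueue[String, Double]
q.addFoldable("a", 1.0)
q.addFoldable("b", 2.0)
q.addFoldable("a", 3.0)
val first = q.get  // latest value stored for "a"
val second = q.get // value stored for "b"
```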