Saturday, 21 February 2015

RPC, Protobuf, Scala

I have seen too many custom frameworks for RPC, asynchronous callbacks, custom serialization, etc. There are great frameworks out there that already handle this. Let's have a look at a simple Search service with an asynchronous request, using protobuf and Scala. First, some *.proto definitions, where the services and the data to marshal in and out are defined.
syntax = "proto2";

package myprotocol;

// Request message for the Search RPC.
// Carries a single required 32-bit identifier for the request.
message SearchRequest {
 // Identifier of this request; the demo channel copies it into the response.
 required int32 requestid = 1;
}

// Response message for the Search RPC.
// Both fields are required, so a response cannot be built without them.
message SearchResponse {
 // Identifier of the request being answered (the demo channel echoes the request's requestid).
 required int32 requestid = 1;
 // Identifier of this response (the demo channel hard-codes it to 2).
 required int32 responseid = 2;
}
A simple RPC call definition:
syntax = "proto2";
option java_generic_services = true;

package myprotocol;

import "Search.proto";

// Service exposing a single RPC method.
// This file sets java_generic_services, and the demo code obtains a stub via
// Services.SearchService.newStub(channel).
service SearchService {
  // Takes a SearchRequest and returns a SearchResponse.
  rpc Search (SearchRequest) returns (SearchResponse);
}
Then you define two classes. The one that implements RpcChannel is where you plug in your underlying messaging system, such as JMS, Solace, Tibco RV, or sockets; the other one implements RpcController.
package demo

import java.util.logging.Logger

import com.google.protobuf.Descriptors.MethodDescriptor
import com.google.protobuf.Message
import com.google.protobuf.RpcCallback
import com.google.protobuf.RpcChannel
import com.google.protobuf.RpcController

import myprotocol.Search.SearchRequest
import myprotocol.Search.SearchResponse

/**
 * Demo [[RpcChannel]] that answers requests in-process instead of sending them
 * over a transport. This is the place where a real messaging system would be
 * plugged in.
 */
class SChannel extends RpcChannel {
  val logger: Logger = Logger.getLogger("SChannel")

  /**
   * Handles one RPC invocation.
   *
   * A [[SearchRequest]] is answered immediately, on the calling thread, with a
   * [[SearchResponse]] that echoes the request id and uses a fixed response id of 2.
   * Any other request is logged and reported to the controller as a failure; the
   * callback is not invoked in that case.
   *
   * @param methodDescriptor descriptor of the invoked method (not used by this demo)
   * @param rpcController    controller that is told about failures; may be null
   * @param m1               the request message
   * @param m2               second message argument supplied by the stub (not used by
   *                         this demo; presumably the response prototype - confirm
   *                         against the protobuf RpcChannel documentation)
   * @param rpcCallback      callback that receives the response
   */
  override def callMethod(methodDescriptor: MethodDescriptor,
                          rpcController: RpcController,
                          m1: Message,
                          m2: Message,
                          rpcCallback: RpcCallback[Message]): Unit = {
    m1 match {
      case sr: SearchRequest =>
        val response = SearchResponse.newBuilder
          .setRequestid(sr.getRequestid)
          .setResponseid(2)
          .build
        rpcCallback.run(response)

      case _ =>
        val reason = s"Not handling message type $m1 yet"
        logger.warning(reason)
        // Surface the failure to the caller instead of only logging it.
        // The controller comes from Java-style calling code, so guard against null.
        Option(rpcController).foreach(_.setFailed(reason))
    }
  }
}

/**
 * Demo [[RpcController]] that logs every call and remembers the failure reported
 * through `setFailed`, so that `failed()` and `errorText()` reflect it.
 *
 * Cancellation is not supported: the cancellation-related methods only log.
 */
class SController extends RpcController {
  val logger: Logger = Logger.getLogger("SController")

  // Reason passed to setFailed; None while no failure has been reported.
  // Volatile because a channel may report the failure from another thread
  // than the one that later inspects the controller.
  @volatile private var failure: Option[String] = None

  /** Clears any recorded failure so the controller can be reused for another call. */
  override def reset(): Unit = {
    logger.info("reset()")
    failure = None
  }

  /** @return true once `setFailed` has been called (and `reset` has not been called since) */
  override def failed(): Boolean = {
    logger.info("failed()")
    failure.isDefined
  }

  /** @return the reason given to `setFailed`, or null when no failure was recorded */
  override def errorText(): String = {
    logger.info("errorText()")
    // The Java interface returns a plain String, so absence is expressed as null.
    failure.orNull
  }

  /** Logs the request; cancellation is not implemented by this demo. */
  override def startCancel(): Unit = {
    logger.info("startCancel()")
  }

  /**
   * Records the call as failed.
   *
   * @param s human-readable failure reason, later returned by `errorText()`
   */
  override def setFailed(s: String): Unit = {
    logger.info(s"setFailed($s)")
    failure = Some(s)
  }

  /** @return always false, since this demo never cancels a call */
  override def isCanceled(): Boolean = {
    logger.info("isCanceled()")
    false
  }

  /**
   * Logs the registration; the callback is never invoked because this demo
   * does not implement cancellation.
   *
   * @param rpccallback callback the caller wants notified on cancellation
   */
  override def notifyOnCancel(rpccallback: RpcCallback[Object]): Unit = {
    logger.info(s"notifyOnCancel($rpccallback)")
  }
}
Edit I: I forgot the code that actually uses those classes :-)
package demo

import java.util.logging.Logger

import com.google.protobuf.RpcCallback

import myprotocol.Search.SearchRequest
import myprotocol.Search.SearchResponse
import myprotocol.Services

/**
 * Entry point of the demo: wires the channel and controller into the generated
 * SearchService stub, issues one search request and logs the response.
 */
object RPCDemo {
  val logger: Logger = Logger.getLogger("RPCDemo")

  /**
   * Sends a single SearchRequest (request id 1) through the stub and logs whatever
   * the callback receives, then waits five seconds before logging a goodbye.
   *
   * @param args command-line arguments (ignored)
   */
  def main(args: Array[String]): Unit = {
    val rpcChannel = new SChannel
    val rpcController = new SController
    val stub = Services.SearchService.newStub(rpcChannel)
    val searchRequest = SearchRequest.newBuilder().setRequestid(1).build

    // Java-style anonymous callback that just logs the response it is given.
    val onResponse = new RpcCallback[SearchResponse] {
      override def run(obj: SearchResponse): Unit = {
        logger.info(s"Received on RpcCallback $obj")
      }
    }

    stub.search(rpcController, searchRequest, onResponse)

    // Keep the JVM alive for a while so a response arriving later could still be logged.
    logger.info("Sleeping 5 seconds")
    Thread.sleep(5000)
    logger.info("Ciao")
  }
}
For example, if you are looking for a pre-built TCP/IP socket implementation on top of protobuf, check out protobuf-socket-rpc. A detailed view of a custom channel can be seen there: RpcChannelImpl.java

Edit II: You can obviously define an implicit conversion instead of writing this callback à la Java:
  import scala.language.implicitConversions
  /**
   * Implicit view that lets a plain `SearchResponse => Unit` function be passed
   * wherever an `RpcCallback[SearchResponse]` is expected.
   *
   * The result type is written out explicitly so the conversion's target is visible
   * to readers and does not depend on type inference.
   *
   * @param f function to run with the response
   * @return a callback whose `run` delegates to `f`
   */
  implicit def f2cb(f: (SearchResponse) => Unit): RpcCallback[SearchResponse] =
    new RpcCallback[SearchResponse] {
      override def run(sr: SearchResponse): Unit = f(sr)
    }

  // The function literal is adapted to an RpcCallback by the implicit f2cb conversion.
  services.search(
    controller,
    request,
    (resp: SearchResponse) => logger.info(s"Received on RpcCallback $resp")
  )
... Voilà.

No comments:

Blog Archive