syntax = "proto2"; package myprotocol; message SearchRequest { required int32 requestid = 1; } message SearchResponse { required int32 requestid = 1; required int32 responseid = 2; }A simple RPC call definition:
syntax = "proto2"; option java_generic_services = true; package myprotocol; import "Search.proto"; service SearchService { rpc Search (SearchRequest) returns (SearchResponse); }Then you define two classes...the one that implements your RpcChannel is the one where you plugin your underlying messaging system, such as JMS, Solace, Tibco RV, or sockets...
package demo import java.util.logging.Logger import com.google.protobuf.Descriptors.MethodDescriptor import com.google.protobuf.Message import com.google.protobuf.RpcCallback import com.google.protobuf.RpcChannel import com.google.protobuf.RpcController import myprotocol.Search.SearchRequest import myprotocol.Search.SearchResponse class SChannel extends RpcChannel { val logger = Logger.getLogger("SChannel") override def callMethod(methodDescriptor: MethodDescriptor , rpcController: RpcController , m1: Message, m2: Message, rpcCallback: RpcCallback[Message] ) { m1 match { case sr: SearchRequest => { val response = SearchResponse.newBuilder.setRequestid(sr.getRequestid).setResponseid(2).build rpcCallback.run(response) } case _ => logger.warning(s"Not handling message type $m1 yet") } } // override def callMethod } class SController extends RpcController { val logger = Logger.getLogger("SController") override def reset() { logger.info("reset()") } override def failed(): Boolean = { logger.info("failed()") false } override def errorText(): String = { logger.info("errorText()") null } override def startCancel() { logger.info("startCancel()") } override def setFailed(s: String) { logger.info(s"setFailed($s") } override def isCanceled():Boolean = { logger.info("isCanceled()") false } override def notifyOnCancel(rpccallback: RpcCallback[Object]) { logger.info(s"notifyOnCancel($rpccallback)") } }Edited I Forgot the code to actually use those classes :-)
package demo import java.util.logging.Logger import com.google.protobuf.RpcCallback import myprotocol.Search.SearchRequest import myprotocol.Search.SearchResponse import myprotocol.Services object RPCDemo { val logger = Logger.getLogger("RPCDemo") def main(args: Array[String]): Unit = { val channel = new SChannel val controller = new SController val services = Services.SearchService.newStub(channel) val request = SearchRequest.newBuilder().setRequestid(1).build val callback = new RpcCallback[SearchResponse] { override def run(obj: SearchResponse) = logger.info(s"Received on RpcCallback $obj") } services.search(controller, request, callback) logger.info("Sleeping 5 seconds") Thread.sleep(5000) logger.info("Ciao") } }For example, if you are looking for a pre-built TCP/IP socket implementation on top of protobuf, check out protobuf-socket-rpc A detailed view of a custom channel can be seen there: RpcChannelImpl.java
Edited II You can obviously define an implict instead of this callback a la Java:
import scala.language.implicitConversions implicit def f2cb(f: (SearchResponse) => Unit) = new RpcCallback[SearchResponse] { override def run(sr: SearchResponse) = f(sr) } services.search(controller, request, (resp: SearchResponse) => { logger.info(s"Received on RpcCallback $resp") })... Voila.
No comments:
Post a Comment