syntax = "proto2";
package myprotocol;
message SearchRequest {
required int32 requestid = 1;
}
message SearchResponse {
required int32 requestid = 1;
required int32 responseid = 2;
}
A simple RPC call definition:
syntax = "proto2";
option java_generic_services = true;
package myprotocol;
import "Search.proto";
service SearchService {
rpc Search (SearchRequest) returns (SearchResponse);
}
Then you define two classes...the one that implements your RpcChannel is the one where you plugin your underlying messaging system, such as JMS, Solace, Tibco RV, or sockets...
package demo
import java.util.logging.Logger
import com.google.protobuf.Descriptors.MethodDescriptor
import com.google.protobuf.Message
import com.google.protobuf.RpcCallback
import com.google.protobuf.RpcChannel
import com.google.protobuf.RpcController
import myprotocol.Search.SearchRequest
import myprotocol.Search.SearchResponse
class SChannel extends RpcChannel {
val logger = Logger.getLogger("SChannel")
override def callMethod(methodDescriptor: MethodDescriptor ,
rpcController: RpcController ,
m1: Message,
m2: Message,
rpcCallback: RpcCallback[Message] ) {
m1 match {
case sr: SearchRequest => {
val response = SearchResponse.newBuilder.setRequestid(sr.getRequestid).setResponseid(2).build
rpcCallback.run(response)
}
case _ => logger.warning(s"Not handling message type $m1 yet")
}
} // override def callMethod
}
class SController extends RpcController {
val logger = Logger.getLogger("SController")
override def reset() {
logger.info("reset()")
}
override def failed(): Boolean = {
logger.info("failed()")
false
}
override def errorText(): String = {
logger.info("errorText()")
null
}
override def startCancel() {
logger.info("startCancel()")
}
override def setFailed(s: String) {
logger.info(s"setFailed($s")
}
override def isCanceled():Boolean = {
logger.info("isCanceled()")
false
}
override def notifyOnCancel(rpccallback: RpcCallback[Object]) {
logger.info(s"notifyOnCancel($rpccallback)")
}
}
Edited I
Forgot the code to actually use those classes :-)
package demo
import java.util.logging.Logger
import com.google.protobuf.RpcCallback
import myprotocol.Search.SearchRequest
import myprotocol.Search.SearchResponse
import myprotocol.Services
object RPCDemo {
val logger = Logger.getLogger("RPCDemo")
def main(args: Array[String]): Unit = {
val channel = new SChannel
val controller = new SController
val services = Services.SearchService.newStub(channel)
val request = SearchRequest.newBuilder().setRequestid(1).build
val callback = new RpcCallback[SearchResponse] {
override def run(obj: SearchResponse) = logger.info(s"Received on RpcCallback $obj")
}
services.search(controller, request, callback)
logger.info("Sleeping 5 seconds")
Thread.sleep(5000)
logger.info("Ciao")
}
}
For example, if you are looking for a pre-built TCP/IP socket implementation on top of protobuf, check out protobuf-socket-rpc
A detailed view of a custom channel can be seen there: RpcChannelImpl.java
Edited II You can obviously define an implict instead of this callback a la Java:
import scala.language.implicitConversions
implicit def f2cb(f: (SearchResponse) => Unit) = new RpcCallback[SearchResponse] {
override def run(sr: SearchResponse) = f(sr)
}
services.search(controller, request, (resp: SearchResponse) => {
logger.info(s"Received on RpcCallback $resp")
})
...
Voila.
No comments:
Post a Comment