diff --git a/pom.xml b/pom.xml index 7bb9866..63fa134 100644 --- a/pom.xml +++ b/pom.xml @@ -17,14 +17,21 @@ com.loafle.overflow.container-java + 1.0.0-SNAPSHOT 1.0.0-SNAPSHOT 1.0.0-SNAPSHOT 1.0.0-SNAPSHOT + 4.1.24.Final 5.0.5.RELEASE + + com.loafle.commons + rpc-java + ${commons.rpc-java.version} + com.loafle.commons server-java @@ -41,6 +48,11 @@ ${overflow.crawler-java.version} + + io.netty + netty-handler + ${netty.version} + org.springframework spring-context diff --git a/src/main/java/com/loafle/overflow/container/server/RPCServerHandler.java b/src/main/java/com/loafle/overflow/container/server/RPCServerHandler.java new file mode 100644 index 0000000..7c1ebc0 --- /dev/null +++ b/src/main/java/com/loafle/overflow/container/server/RPCServerHandler.java @@ -0,0 +1,53 @@ +package com.loafle.overflow.container.server; + +import com.loafle.commons.rpc.RPCException; +import com.loafle.commons.rpc.protocol.RPCServerCodec; +import com.loafle.commons.rpc.protocol.RPCServerRequestCodec; +import com.loafle.commons.rpc.registry.RPCInvoker; +import com.loafle.commons.server.socket.netty.handler.codec.SocketFrame; +import com.loafle.commons.server.socket.netty.handler.codec.TextSocketFrame; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; + +/** + * RPCDecoder + */ +public class RPCServerHandler extends SimpleChannelInboundHandler { + private static final Logger logger = LoggerFactory.getLogger(RPCServerHandler.class); + private RPCServerCodec serverCodec; + private RPCInvoker rpcInvoker; + + public RPCServerHandler(RPCServerCodec serverCodec, RPCInvoker rpcInvoker) { + this.serverCodec = serverCodec; + this.rpcInvoker = rpcInvoker; + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, SocketFrame frame) throws Exception { + if (frame instanceof TextSocketFrame) { + byte[] requestBytes = null; + RPCServerRequestCodec requestCodec = null; + Object reply = null; + RPCException error = null; + try { + requestBytes = new byte[frame.content().readableBytes()]; + frame.content().readBytes(requestBytes); + + requestCodec = this.serverCodec.request(requestBytes); + reply = this.rpcInvoker.invoke(requestCodec); + + } catch (RPCException e) { + error = e; + } + + byte[] responseBytes = requestCodec.response(reply, error); + ctx.channel().writeAndFlush(responseBytes); + } else { + throw new UnsupportedOperationException(String.format("unsupported frame type: %s", frame.getClass().getName())); + } + } +} \ No newline at end of file