This commit is contained in:
crusader
2018-04-23 21:40:19 +09:00
parent 82719958f9
commit 842426c560
2 changed files with 65 additions and 0 deletions

View File

@@ -0,0 +1,53 @@
package com.loafle.overflow.container.server;
import com.loafle.commons.rpc.RPCException;
import com.loafle.commons.rpc.protocol.RPCServerCodec;
import com.loafle.commons.rpc.protocol.RPCServerRequestCodec;
import com.loafle.commons.rpc.registry.RPCInvoker;
import com.loafle.commons.server.socket.netty.handler.codec.SocketFrame;
import com.loafle.commons.server.socket.netty.handler.codec.TextSocketFrame;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* RPCDecoder
*/
public class RPCServerHandler extends SimpleChannelInboundHandler<SocketFrame> {
private static final Logger logger = LoggerFactory.getLogger(RPCServerHandler.class);
private RPCServerCodec serverCodec;
private RPCInvoker rpcInvoker;
public RPCServerHandler(RPCServerCodec serverCodec, RPCInvoker rpcInvoker) {
this.serverCodec = serverCodec;
this.rpcInvoker = rpcInvoker;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, SocketFrame frame) throws Exception {
if (frame instanceof TextSocketFrame) {
byte[] requestBytes = null;
RPCServerRequestCodec requestCodec = null;
Object reply = null;
RPCException error = null;
try {
requestBytes = new byte[frame.content().readableBytes()];
frame.content().readBytes(requestBytes);
requestCodec = this.serverCodec.request(requestBytes);
reply = this.rpcInvoker.invoke(requestCodec);
} catch (RPCException e) {
error = e;
}
byte[] responseBytes = requestCodec.response(reply, error);
ctx.channel().writeAndFlush(responseBytes);
} else {
throw new UnsupportedOperationException(String.format("unsupported frame type: %s", frame.getClass().getName()));
}
}
}