This commit is contained in:
crusader 2018-04-25 21:30:44 +09:00
parent b1253242d9
commit 55c6a58f37
4 changed files with 18 additions and 5 deletions

View File

@ -7,5 +7,8 @@ public class Container {
public static final String PIDFILE_PATH = "CONTAINER_PIDFILE_PATH"; public static final String PIDFILE_PATH = "CONTAINER_PIDFILE_PATH";
public static final String CRAWLERS = "CONTAINER_CRAWLERS"; public static final String CRAWLERS = "CONTAINER_CRAWLERS";
public static final String PIPELINE_CHANNEL_HANDLERS = "CONTAINER_PIPELINE_CHANNEL_HANDLERS"; public static final String PIPELINE_CHANNEL_HANDLERS = "CONTAINER_PIPELINE_CHANNEL_HANDLERS";
public static final String RPC_SERVER_CODEC = "CONTAINER_RPC_SERVER_CODEC";
public static final String RPC_REGISTRY = "CONTAINER_RPC_REGISTRY";
public static final String RPC_INVOKER = "CONTAINER_RPC_INVOKER";
} }

View File

@ -60,7 +60,7 @@ public class ContainerConfiguration {
return new LoggingHandler(LogLevel.INFO); return new LoggingHandler(LogLevel.INFO);
} }
@Bean @Bean(Container.RPC_SERVER_CODEC)
public RPCServerCodec rpcServerCodec() { public RPCServerCodec rpcServerCodec() {
if (null != gson) { if (null != gson) {
return new JSONRPCServerCodec(gson); return new JSONRPCServerCodec(gson);
@ -69,7 +69,7 @@ public class ContainerConfiguration {
} }
} }
@Bean @Bean({Container.RPC_INVOKER, Container.RPC_REGISTRY})
public RPCRegistry rpcRegistry() { public RPCRegistry rpcRegistry() {
return new RPCRegistry(); return new RPCRegistry();
} }

View File

@ -6,12 +6,15 @@ import com.loafle.commons.rpc.protocol.RPCServerRequestCodec;
import com.loafle.commons.rpc.registry.RPCInvoker; import com.loafle.commons.rpc.registry.RPCInvoker;
import com.loafle.commons.server.socket.handler.codec.SocketFrame; import com.loafle.commons.server.socket.handler.codec.SocketFrame;
import com.loafle.commons.server.socket.handler.codec.TextSocketFrame; import com.loafle.commons.server.socket.handler.codec.TextSocketFrame;
import com.loafle.overflow.container.Container;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
@ -23,8 +26,10 @@ public class RPCServerHandler extends SimpleChannelInboundHandler<SocketFrame> {
private static final Logger logger = LoggerFactory.getLogger(RPCServerHandler.class); private static final Logger logger = LoggerFactory.getLogger(RPCServerHandler.class);
@Autowired @Autowired
@Qualifier(Container.RPC_SERVER_CODEC)
private RPCServerCodec serverCodec; private RPCServerCodec serverCodec;
@Autowired @Autowired()
@Qualifier(Container.RPC_INVOKER)
private RPCInvoker rpcInvoker; private RPCInvoker rpcInvoker;
public RPCServerHandler() { public RPCServerHandler() {
@ -33,13 +38,15 @@ public class RPCServerHandler extends SimpleChannelInboundHandler<SocketFrame> {
@Override @Override
protected void channelRead0(ChannelHandlerContext ctx, SocketFrame frame) throws Exception { protected void channelRead0(ChannelHandlerContext ctx, SocketFrame frame) throws Exception {
if (frame instanceof TextSocketFrame) { if (frame instanceof TextSocketFrame) {
ByteBuf buff = null;
byte[] requestBytes = null; byte[] requestBytes = null;
RPCServerRequestCodec requestCodec = null; RPCServerRequestCodec requestCodec = null;
Object reply = null; Object reply = null;
RPCException error = null; RPCException error = null;
try { try {
requestBytes = new byte[frame.content().readableBytes()]; buff = frame.content();
frame.content().readBytes(requestBytes); requestBytes = new byte[buff.readableBytes()];
buff.getBytes(buff.readerIndex(), requestBytes);
requestCodec = this.serverCodec.request(requestBytes); requestCodec = this.serverCodec.request(requestBytes);
reply = this.rpcInvoker.invoke(requestCodec); reply = this.rpcInvoker.invoke(requestCodec);

View File

@ -6,6 +6,7 @@ import java.util.Map;
import com.loafle.commons.rpc.RPCException; import com.loafle.commons.rpc.RPCException;
import com.loafle.commons.rpc.registry.RPCRegistry; import com.loafle.commons.rpc.registry.RPCRegistry;
import com.loafle.overflow.container.Container;
import com.loafle.overflow.core.annotation.ProbeAPI; import com.loafle.overflow.core.annotation.ProbeAPI;
import com.loafle.overflow.core.annotation.RPCService; import com.loafle.overflow.core.annotation.RPCService;
@ -14,6 +15,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException; import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware; import org.springframework.context.ApplicationContextAware;
import org.springframework.core.annotation.AnnotationUtils; import org.springframework.core.annotation.AnnotationUtils;
@ -29,6 +31,7 @@ public class Service implements InitializingBean, ApplicationContextAware {
private ApplicationContext applicationContext; private ApplicationContext applicationContext;
@Autowired @Autowired
@Qualifier(Container.RPC_REGISTRY)
private RPCRegistry rpcRegistry; private RPCRegistry rpcRegistry;
@Override @Override