diff --git a/src/main/java/com/loafle/overflow/container/configuration/ContainerConfiguration.java b/src/main/java/com/loafle/overflow/container/configuration/ContainerConfiguration.java index 86493c9..f8f7dad 100644 --- a/src/main/java/com/loafle/overflow/container/configuration/ContainerConfiguration.java +++ b/src/main/java/com/loafle/overflow/container/configuration/ContainerConfiguration.java @@ -5,7 +5,14 @@ import java.net.SocketAddress; import java.util.Arrays; import java.util.List; +import com.google.gson.Gson; +import com.loafle.commons.rpc.protocol.RPCServerCodec; +import com.loafle.commons.rpc.protocol.json.JSONRPCServerCodec; +import com.loafle.commons.rpc.registry.RPCRegistry; +import com.loafle.commons.rpc.registry.pojo.POJORPCRegistry; import com.loafle.commons.server.Server; +import com.loafle.commons.server.socket.handler.codec.SocketServerProtocolHandler; +import com.loafle.overflow.container.server.handler.RPCServerHandler; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; @@ -29,9 +36,12 @@ public class ContainerConfiguration { @Autowired private List pipelineChannelHandlers; + @Autowired + private Gson gson; + @Bean - public List> channelClasses() { - return Arrays.asList(NioServerSocketChannel.class); + public Class channelClass() { + return NioServerSocketChannel.class; } @Bean @@ -40,21 +50,37 @@ public class ContainerConfiguration { } @Bean - public List channelHandlers() { - return Arrays.asList(new LoggingHandler(LogLevel.INFO)); + public ChannelHandler channelHandler() { + return new LoggingHandler(LogLevel.INFO); + } + + @Bean + public RPCServerCodec rpcServerCodec() { + if (null != gson) { + return new JSONRPCServerCodec(gson); + } else { + return new JSONRPCServerCodec(); + } + } + + @Bean + public RPCRegistry rpcRegistry() { + return new POJORPCRegistry(); } @Bean public ChannelInitializer channelInitializer() { return new ChannelInitializer() { - @Override - public void initChannel(SocketChannel ch) throws Exception { - ChannelPipeline cp = ch.pipeline(); - for (ChannelHandler channelHandler : pipelineChannelHandlers) { - cp.addLast(channelHandler); - } - } - }; + @Override + public void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline cp = ch.pipeline(); + cp.addLast(new SocketServerProtocolHandler(true)); + cp.addLast(new RPCServerHandler()); + for (ChannelHandler channelHandler : pipelineChannelHandlers) { + cp.addLast(channelHandler); + } + } + }; } @Bean diff --git a/src/main/java/com/loafle/overflow/container/server/handler/RPCServerHandler.java b/src/main/java/com/loafle/overflow/container/server/handler/RPCServerHandler.java index 46897f7..a2155d0 100644 --- a/src/main/java/com/loafle/overflow/container/server/handler/RPCServerHandler.java +++ b/src/main/java/com/loafle/overflow/container/server/handler/RPCServerHandler.java @@ -10,6 +10,7 @@ import com.loafle.commons.server.socket.handler.codec.TextSocketFrame; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; @@ -17,6 +18,7 @@ import io.netty.channel.SimpleChannelInboundHandler; /** * RPCDecoder */ +@Component public class RPCServerHandler extends SimpleChannelInboundHandler { private static final Logger logger = LoggerFactory.getLogger(RPCServerHandler.class); @@ -25,9 +27,7 @@ public class RPCServerHandler extends SimpleChannelInboundHandler { @Autowired private RPCInvoker rpcInvoker; - public RPCServerHandler(RPCServerCodec serverCodec, RPCInvoker rpcInvoker) { - this.serverCodec = serverCodec; - this.rpcInvoker = rpcInvoker; + public RPCServerHandler() { } @Override diff --git a/src/main/java/com/loafle/overflow/container/service/CrawlerService.java b/src/main/java/com/loafle/overflow/container/service/CrawlerService.java index 6c363bb..115eccd 100644 --- a/src/main/java/com/loafle/overflow/container/service/CrawlerService.java +++ b/src/main/java/com/loafle/overflow/container/service/CrawlerService.java @@ -6,6 +6,7 @@ import javax.annotation.Resource; import com.loafle.overflow.container.Container; import com.loafle.overflow.crawler.Crawler; +import com.loafle.overflow.module.core.annotation.RPCService; import com.loafle.overflow.module.core.exception.OverflowException; import com.loafle.overflow.module.core.interfaces.Service; import com.loafle.overflow.module.sensorconfig.model.SensorConfig; @@ -17,7 +18,7 @@ import org.springframework.beans.factory.annotation.Autowired; /** * CrawlerService */ -@org.springframework.stereotype.Service("CrawlerService") +@RPCService("CrawlerService") public class CrawlerService implements Service { private static final Logger logger = LoggerFactory.getLogger(CrawlerService.class); diff --git a/src/main/java/com/loafle/overflow/container/service/ProbeService.java b/src/main/java/com/loafle/overflow/container/service/ProbeService.java index 7664c3b..89761ec 100644 --- a/src/main/java/com/loafle/overflow/container/service/ProbeService.java +++ b/src/main/java/com/loafle/overflow/container/service/ProbeService.java @@ -1,12 +1,13 @@ package com.loafle.overflow.container.service; +import com.loafle.overflow.module.core.annotation.RPCService; import com.loafle.overflow.module.core.exception.OverflowException; import com.loafle.overflow.module.core.interfaces.Service; /** * ProbeService */ -@org.springframework.stereotype.Service("ProbeService") +@RPCService("ProbeService") public class ProbeService implements Service { @Override diff --git a/src/main/java/com/loafle/overflow/container/service/SensorConfigService.java b/src/main/java/com/loafle/overflow/container/service/SensorConfigService.java index 8d5aa30..639978d 100644 --- a/src/main/java/com/loafle/overflow/container/service/SensorConfigService.java +++ b/src/main/java/com/loafle/overflow/container/service/SensorConfigService.java @@ -4,6 +4,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import com.loafle.overflow.module.core.annotation.RPCService; import com.loafle.overflow.module.core.exception.OverflowException; import com.loafle.overflow.module.core.interfaces.Service; import com.loafle.overflow.module.sensorconfig.model.SensorConfig; @@ -14,7 +15,7 @@ import org.slf4j.LoggerFactory; /** * SensorConfigService */ -@org.springframework.stereotype.Service("SensorConfigService") +@RPCService("SensorConfigService") public class SensorConfigService implements Service { private static final Logger logger = LoggerFactory.getLogger(SensorConfigService.class); diff --git a/src/main/java/com/loafle/overflow/container/service/Service.java b/src/main/java/com/loafle/overflow/container/service/Service.java new file mode 100644 index 0000000..fb41f6d --- /dev/null +++ b/src/main/java/com/loafle/overflow/container/service/Service.java @@ -0,0 +1,53 @@ +package com.loafle.overflow.container.service; + +import java.util.Map; + +import com.loafle.commons.rpc.RPCException; +import com.loafle.commons.rpc.registry.RPCRegistry; +import com.loafle.overflow.module.core.annotation.RPCService; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.stereotype.Component; + +/** + * Service + */ +@Component +public class Service implements InitializingBean, ApplicationContextAware { + private static final Logger logger = LoggerFactory.getLogger(Service.class); + + private ApplicationContext applicationContext; + + @Autowired + private RPCRegistry rpcRegistry; + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + this.applicationContext = applicationContext; + } + + @Override + public void afterPropertiesSet() throws Exception { + Map services = this.applicationContext.getBeansWithAnnotation(RPCService.class); + if (null == services || 0 == services.size()) { + logger.debug("there is not service"); + return; + } + + services.forEach((name, bean) -> { + logger.debug("bean %s", bean.getClass().getName()); + try { + this.rpcRegistry.registerService(bean, bean.getClass(), name); + } catch (RPCException e) { + logger.error("RPCRegistry", e); + } + }); + } + +} \ No newline at end of file