From fca686637ae0d8de44dfc7350865475199ced78f Mon Sep 17 00:00:00 2001 From: crusader Date: Fri, 4 May 2018 16:27:07 +0900 Subject: [PATCH] ing --- .../loafle/overflow/container/Container.java | 5 +- .../container/client/ContainerClient.java | 22 ++++ .../handler/RPCClientHandler.java} | 63 +++++------ .../configuration/ContainerConfiguration.java | 74 ++++--------- .../container/server/ContainerServer.java | 84 --------------- .../service/ContainerCollectorService.java | 102 ++++++++++++++++++ .../service/ContainerCrawlerService.java | 3 +- .../service/ContainerProbeService.java | 6 +- .../service/ContainerSensorConfigService.java | 17 +++ .../overflow/container/service/Service.java | 1 + 10 files changed, 196 insertions(+), 181 deletions(-) create mode 100644 src/main/java/com/loafle/overflow/container/client/ContainerClient.java rename src/main/java/com/loafle/overflow/container/{server/handler/RPCServerHandler.java => client/handler/RPCClientHandler.java} (58%) delete mode 100644 src/main/java/com/loafle/overflow/container/server/ContainerServer.java create mode 100644 src/main/java/com/loafle/overflow/container/service/ContainerCollectorService.java diff --git a/src/main/java/com/loafle/overflow/container/Container.java b/src/main/java/com/loafle/overflow/container/Container.java index 023bf19..f68afa6 100644 --- a/src/main/java/com/loafle/overflow/container/Container.java +++ b/src/main/java/com/loafle/overflow/container/Container.java @@ -4,11 +4,10 @@ package com.loafle.overflow.container; * Container */ public class Container { - public static final String PIDFILE_PATH = "CONTAINER_PIDFILE_PATH"; + public static final String PORT_NUMBER = "CONTAINER_PORT_NUMBER"; public static final String CRAWLERS = "CONTAINER_CRAWLERS"; public static final String SERVICES = "CONTAINER_SERVICES"; - public static final String PIPELINE_CHANNEL_HANDLERS = "CONTAINER_PIPELINE_CHANNEL_HANDLERS"; - public static final String RPC_SERVER_CODEC = "CONTAINER_RPC_SERVER_CODEC"; + public static final String RPC_CLIENT_CODEC = "CONTAINER_RPC_CLIENT_CODEC"; public static final String RPC_REGISTRY = "CONTAINER_RPC_REGISTRY"; public static final String RPC_INVOKER = "CONTAINER_RPC_INVOKER"; diff --git a/src/main/java/com/loafle/overflow/container/client/ContainerClient.java b/src/main/java/com/loafle/overflow/container/client/ContainerClient.java new file mode 100644 index 0000000..614b240 --- /dev/null +++ b/src/main/java/com/loafle/overflow/container/client/ContainerClient.java @@ -0,0 +1,22 @@ +package com.loafle.overflow.container.client; + +import com.loafle.commons.server.websocket.client.Client; + +/** + * ContainerClient + */ +public class ContainerClient extends Client { + + protected void init() throws Exception { + // no op + } + protected void onStart() throws Exception { + // no op + } + protected void onStop() throws Exception { + // no op + } + protected void destroy() throws Exception { + // no op + } +} \ No newline at end of file diff --git a/src/main/java/com/loafle/overflow/container/server/handler/RPCServerHandler.java b/src/main/java/com/loafle/overflow/container/client/handler/RPCClientHandler.java similarity index 58% rename from src/main/java/com/loafle/overflow/container/server/handler/RPCServerHandler.java rename to src/main/java/com/loafle/overflow/container/client/handler/RPCClientHandler.java index 034ceb1..0c36d6d 100644 --- a/src/main/java/com/loafle/overflow/container/server/handler/RPCServerHandler.java +++ b/src/main/java/com/loafle/overflow/container/client/handler/RPCClientHandler.java @@ -1,80 +1,71 @@ -package com.loafle.overflow.container.server.handler; +package com.loafle.overflow.container.client.handler; import com.loafle.commons.rpc.RPCException; -import com.loafle.commons.rpc.protocol.RPCServerCodec; -import com.loafle.commons.rpc.protocol.RPCServerRequestCodec; +import com.loafle.commons.rpc.protocol.RPCClientCodec; +import com.loafle.commons.rpc.protocol.RPCClientResponseCodec; import com.loafle.commons.rpc.registry.RPCInvoker; -import com.loafle.commons.server.socket.handler.codec.SocketFrame; -import com.loafle.commons.server.socket.handler.codec.TextSocketFrame; import com.loafle.overflow.container.Container; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.stereotype.Component; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import io.netty.handler.codec.http.websocketx.WebSocketFrame; /** - * RPCDecoder + * RPCClientHandler */ -@Component -public class RPCServerHandler extends SimpleChannelInboundHandler { - private static final Logger logger = LoggerFactory.getLogger(RPCServerHandler.class); +public class RPCClientHandler extends SimpleChannelInboundHandler { + private static final Logger logger = LoggerFactory.getLogger(RPCClientHandler.class); @Autowired - @Qualifier(Container.RPC_SERVER_CODEC) - private RPCServerCodec serverCodec; + @Qualifier(Container.RPC_CLIENT_CODEC) + private RPCClientCodec clientCodec; @Autowired() @Qualifier(Container.RPC_INVOKER) private RPCInvoker rpcInvoker; private ChannelHandlerContext ctx; - public RPCServerHandler() { - } - - @Override - protected void channelRead0(ChannelHandlerContext ctx, SocketFrame frame) throws Exception { - if (frame instanceof TextSocketFrame) { + @Override + protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception { + if (frame instanceof TextWebSocketFrame) { ByteBuf buff = null; - byte[] requestBytes = null; - RPCServerRequestCodec requestCodec = null; - Object reply = null; - RPCException error = null; + byte[] responseBytes = null; + RPCClientResponseCodec responseCodec = null; try { buff = frame.content(); - requestBytes = new byte[buff.readableBytes()]; - buff.getBytes(buff.readerIndex(), requestBytes); - - requestCodec = this.serverCodec.request(requestBytes); - reply = this.rpcInvoker.invoke(requestCodec); + responseBytes = new byte[buff.readableBytes()]; + buff.getBytes(buff.readerIndex(), responseBytes); + responseCodec = this.clientCodec.response(responseBytes); + if (responseCodec.isNotification()) { + this.rpcInvoker.invoke(responseCodec.notification()); + } else { + throw new UnsupportedOperationException(String.format("request is not unsupported")); + } } catch (RPCException e) { logger.debug("Error", e); - error = e; + throw e; } - if (!requestCodec.hasResponse()) { - return; - } - byte[] responseBytes = requestCodec.response(reply, error); - ctx.channel().writeAndFlush(responseBytes); } else { throw new UnsupportedOperationException(String.format("unsupported frame type: %s", frame.getClass().getName())); } } - + public void send(String method, Object... params) { if (null == this.ctx || !this.ctx.channel().isOpen()) { logger.warn("ChannelHandlerContext is not valid"); return; } try { - byte[] notificationBytes = this.serverCodec.notification(method, params); - this.ctx.channel().writeAndFlush(notificationBytes); + byte[] requestBytes = this.clientCodec.request(method, params, null); + this.ctx.channel().writeAndFlush(requestBytes); } catch (RPCException e) { logger.error("cannot create notification", e); } diff --git a/src/main/java/com/loafle/overflow/container/configuration/ContainerConfiguration.java b/src/main/java/com/loafle/overflow/container/configuration/ContainerConfiguration.java index 6fd506a..9355475 100644 --- a/src/main/java/com/loafle/overflow/container/configuration/ContainerConfiguration.java +++ b/src/main/java/com/loafle/overflow/container/configuration/ContainerConfiguration.java @@ -5,31 +5,25 @@ import java.net.SocketAddress; import java.util.Arrays; import java.util.List; -import com.loafle.commons.rpc.protocol.RPCServerCodec; -import com.loafle.commons.rpc.protocol.json.JSONRPCServerCodec; +import javax.annotation.Resource; + +import com.loafle.commons.rpc.protocol.RPCClientCodec; +import com.loafle.commons.rpc.protocol.json.JSONRPCClientCodec; import com.loafle.commons.rpc.registry.RPCRegistry; -import com.loafle.commons.server.Server; -import com.loafle.commons.server.socket.handler.codec.SocketServerProtocolHandler; +import com.loafle.commons.server.websocket.client.Client; import com.loafle.overflow.container.Container; -import com.loafle.overflow.container.server.handler.RPCServerHandler; +import com.loafle.overflow.container.client.handler.RPCClientHandler; import org.codehaus.jackson.map.ObjectMapper; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.PropertySource; +import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.ServerChannel; -import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.handler.logging.LogLevel; -import io.netty.handler.logging.LoggingHandler; +import io.netty.channel.socket.nio.NioSocketChannel; /** * ContainerConfiguration @@ -38,34 +32,25 @@ import io.netty.handler.logging.LoggingHandler; @ComponentScan(basePackages = { "com.loafle.overflow" }) @PropertySource({ "classpath:netty.properties" }) public class ContainerConfiguration { - @Autowired(required = false) - @Qualifier(Container.PIPELINE_CHANNEL_HANDLERS) - private List pipelineChannelHandlers; + @Resource(name=Container.PORT_NUMBER) + private int portNumber; @Autowired() - private RPCServerHandler rpcServerHandler; - - @Autowired(required = false) private ObjectMapper objectMapper; - @Bean(Server.CHANNEL_CLASS) - public Class channelClass() { - return NioServerSocketChannel.class; + @Bean(Client.CHANNEL_CLASS) + public Class channelClass() { + return NioSocketChannel.class; } - @Bean(Server.CHANNEL_OPTIONS) - public List> channelOptions() { - return Arrays.asList(new Server.ChannelOptionItem<>(ChannelOption.SO_BACKLOG, 100)); + @Bean(Client.CHANNEL_HANDLERS) + public List channelHandlers() { + return Arrays.asList(new RPCClientHandler()); } - @Bean(Server.CHANNEL_HANDLER) - public ChannelHandler channelHandler() { - return new LoggingHandler(LogLevel.INFO); - } - - @Bean(Container.RPC_SERVER_CODEC) - public RPCServerCodec rpcServerCodec() { - return new JSONRPCServerCodec(this.objectMapper); + @Bean(Container.RPC_CLIENT_CODEC) + public RPCClientCodec rpcClientCodec() { + return new JSONRPCClientCodec(this.objectMapper); } @Bean({Container.RPC_INVOKER, Container.RPC_REGISTRY}) @@ -73,25 +58,8 @@ public class ContainerConfiguration { return new RPCRegistry(); } - @Bean(Server.CHANNEL_INITIALIZER) - public ChannelInitializer channelInitializer() { - return new ChannelInitializer() { - @Override - public void initChannel(SocketChannel ch) throws Exception { - ChannelPipeline cp = ch.pipeline(); - cp.addLast(new SocketServerProtocolHandler(true)); - cp.addLast(rpcServerHandler); - if (null != pipelineChannelHandlers) { - for (ChannelHandler channelHandler : pipelineChannelHandlers) { - cp.addLast(channelHandler); - } - } - } - }; - } - - @Bean(Server.SOCKET_ADDRESS) + @Bean(Client.SOCKET_ADDRESS) public SocketAddress address() { - return new InetSocketAddress("127.0.0.1", 60000); + return new InetSocketAddress("127.0.0.1", portNumber); } } \ No newline at end of file diff --git a/src/main/java/com/loafle/overflow/container/server/ContainerServer.java b/src/main/java/com/loafle/overflow/container/server/ContainerServer.java deleted file mode 100644 index 57e2be7..0000000 --- a/src/main/java/com/loafle/overflow/container/server/ContainerServer.java +++ /dev/null @@ -1,84 +0,0 @@ -package com.loafle.overflow.container.server; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; - -import javax.annotation.Resource; - -import com.loafle.commons.server.Server; -import com.loafle.overflow.container.Container; - -import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.ChannelFuture; - -/** - * ContainerServer - */ -public class ContainerServer extends Server { - @Resource(name=Container.PIDFILE_PATH) - private String pidFilePath; - - private int portNumber; - - public ContainerServer() { - } - - public void setPidFilePath(String pidFilePath) { - this.pidFilePath = pidFilePath; - } - - protected ChannelFuture bind(ServerBootstrap serverBootstrap) throws Exception { - ChannelFuture cf = null; - - for (int i = 60000; i < 61000; i++) { - try { - cf = serverBootstrap.bind("127.0.0.1", i).sync(); - this.portNumber = i; - break; - } catch (Exception e) { - System.out.println(e); - continue; - } - } - - if (null == cf) { - throw new Exception("There is not available port"); - } - - try { - FileOutputStream outputStream = new FileOutputStream(this.pidFilePath); - outputStream.write(Integer.toString(this.portNumber).getBytes()); - outputStream.close(); - } catch (IOException e) { - throw e; - } - - return cf; - } - - @Override - protected void init() throws Exception { - - } - - @Override - protected void onStart() throws Exception { - - } - - @Override - protected void onStop() throws Exception { - try { - File f = new File(this.pidFilePath); - f.delete(); - } catch (Exception e) { - throw e; - } - } - - @Override - protected void destroy() throws Exception { - - } -} \ No newline at end of file diff --git a/src/main/java/com/loafle/overflow/container/service/ContainerCollectorService.java b/src/main/java/com/loafle/overflow/container/service/ContainerCollectorService.java new file mode 100644 index 0000000..3198227 --- /dev/null +++ b/src/main/java/com/loafle/overflow/container/service/ContainerCollectorService.java @@ -0,0 +1,102 @@ +package com.loafle.overflow.container.service; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import com.loafle.overflow.core.annotation.RPCService; +import com.loafle.overflow.core.exception.OverflowException; +import com.loafle.overflow.model.sensorconfig.SensorConfig; +import com.loafle.overflow.service.container.CollectorService; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; + +/** + * ContainerCollectorService + */ +@RPCService("CollectorService") +public class ContainerCollectorService implements CollectorService { + private static final Logger logger = LoggerFactory.getLogger(ContainerCollectorService.class); + + private ScheduledExecutorService scheduledExecutorService; + private Map> scheduleMap; + + @Autowired + private ContainerProbeService probeService; + @Autowired + private ContainerCrawlerService crawlerService; + + @Override + public void initService() throws Exception { + this.scheduledExecutorService = Executors.newScheduledThreadPool(20); + this.scheduleMap = new HashMap<>(); + } + + @Override + public void startService() throws Exception { + } + + @Override + public void stopService() { + this.scheduledExecutorService.shutdown(); + } + + @Override + public void destroyService() { + + } + + public void addSensorConfigs(List sensorConfigs) throws OverflowException { + if (null == sensorConfigs || 0 == sensorConfigs.size()) { + return; + } + + for (SensorConfig sensorConfig : sensorConfigs) { + try { + long interval = Long.valueOf(sensorConfig.getSchedule().getInterval()).longValue(); + this.addSchedule(interval, sensorConfig); + logger.debug("scheduler of config[%s] has been added", sensorConfig.getConfigID()); + } catch (Exception e) { + throw new OverflowException(String.format("Cannot convert interval[%s] %v", sensorConfig.getSchedule().getInterval())); + } + } + } + + public void removeSensorConfigs(List sensorConfigs) throws OverflowException { + if (null == sensorConfigs || 0 == sensorConfigs.size()) { + return; + } + + for (SensorConfig sensorConfig : sensorConfigs) { + this.removeSchedule(sensorConfig.getConfigID()); + } + } + + private void addSchedule(long interval, SensorConfig sensorConfig) { + Runnable task = () -> { + try { + Map result = crawlerService.get(sensorConfig.getConfigID()); + probeService.send("DataService.Metric", result); + } catch (Exception e) { + logger.error("Cannot get data from crawler[%s] %s", sensorConfig.getCrawler().getName(), e.getMessage()); + } + }; + ScheduledFuture sf = this.scheduledExecutorService.schedule(task, interval, TimeUnit.SECONDS); + this.scheduleMap.put(sensorConfig.getConfigID(), sf); + } + + private void removeSchedule(String configID) { + if (!this.scheduleMap.containsKey(configID)) { + return; + } + ScheduledFuture sf = this.scheduleMap.get(configID); + this.scheduleMap.remove(configID); + sf.cancel(false); + } +} \ No newline at end of file diff --git a/src/main/java/com/loafle/overflow/container/service/ContainerCrawlerService.java b/src/main/java/com/loafle/overflow/container/service/ContainerCrawlerService.java index e711271..8b10fd9 100644 --- a/src/main/java/com/loafle/overflow/container/service/ContainerCrawlerService.java +++ b/src/main/java/com/loafle/overflow/container/service/ContainerCrawlerService.java @@ -60,8 +60,7 @@ public class ContainerCrawlerService implements CrawlerService { crawler.auth(authInfoMap); } - @Override - public Map Get(String sensorConfigID) throws OverflowException { + public Map get(String sensorConfigID) throws OverflowException { SensorConfig sensorConfig = this.sensorConfigService.getSensorConfig(sensorConfigID); if (null == sensorConfig) { throw new OverflowException(String.format("There is no sensor config for id[%s]", sensorConfigID), null); diff --git a/src/main/java/com/loafle/overflow/container/service/ContainerProbeService.java b/src/main/java/com/loafle/overflow/container/service/ContainerProbeService.java index 47a9fd7..991e9c4 100644 --- a/src/main/java/com/loafle/overflow/container/service/ContainerProbeService.java +++ b/src/main/java/com/loafle/overflow/container/service/ContainerProbeService.java @@ -1,6 +1,6 @@ package com.loafle.overflow.container.service; -import com.loafle.overflow.container.server.handler.RPCServerHandler; +import com.loafle.overflow.container.client.handler.RPCClientHandler; import com.loafle.overflow.core.annotation.RPCService; import com.loafle.overflow.core.exception.OverflowException; import com.loafle.overflow.service.container.ProbeService; @@ -13,7 +13,7 @@ import org.springframework.beans.factory.annotation.Autowired; @RPCService("ProbeService") public class ContainerProbeService implements ProbeService { @Autowired() - private RPCServerHandler rpcServerHandler; + private RPCClientHandler rpcClientHandler; @Override public void initService() throws Exception { @@ -36,6 +36,6 @@ public class ContainerProbeService implements ProbeService { } public void send(String method, Object... params) throws OverflowException { - this.rpcServerHandler.send(method, params); + this.rpcClientHandler.send(method, params); } } \ No newline at end of file diff --git a/src/main/java/com/loafle/overflow/container/service/ContainerSensorConfigService.java b/src/main/java/com/loafle/overflow/container/service/ContainerSensorConfigService.java index b210411..b213d68 100644 --- a/src/main/java/com/loafle/overflow/container/service/ContainerSensorConfigService.java +++ b/src/main/java/com/loafle/overflow/container/service/ContainerSensorConfigService.java @@ -1,5 +1,7 @@ package com.loafle.overflow.container.service; +import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -11,6 +13,7 @@ import com.loafle.overflow.service.container.SensorConfigService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; /** * SensorConfigService @@ -21,6 +24,9 @@ public class ContainerSensorConfigService implements SensorConfigService { private Map sensorConfigs; + @Autowired + private ContainerCollectorService collectorService; + @Override public void initService() throws Exception { this.sensorConfigs = new HashMap<>(); @@ -54,6 +60,9 @@ public class ContainerSensorConfigService implements SensorConfigService { for (SensorConfig sensorConfig : sensorConfigs) { this.sensorConfigs.put(sensorConfig.getConfigID(), sensorConfig); } + + this.collectorService.addSensorConfigs(sensorConfigs); + logger.debug("Sensor configs[%d] were added", sensorConfigs.size()); } @@ -70,6 +79,8 @@ public class ContainerSensorConfigService implements SensorConfigService { this.sensorConfigs.put(configID, sensorConfig); + this.collectorService.addSensorConfigs(Arrays.asList(sensorConfig)); + logger.debug("Sensor config[%s] was added", configID); } @@ -87,6 +98,9 @@ public class ContainerSensorConfigService implements SensorConfigService { this.sensorConfigs.remove(configID); this.sensorConfigs.put(configID, sensorConfig); + this.collectorService.removeSensorConfigs(Arrays.asList(sensorConfig)); + this.collectorService.addSensorConfigs(Arrays.asList(sensorConfig)); + logger.debug("Sensor config[%s] was updated", configID); } @@ -96,7 +110,10 @@ public class ContainerSensorConfigService implements SensorConfigService { throw new OverflowException(String.format("Sensor config[%s] is not exist", configID), null); } + SensorConfig sensorConfig = this.sensorConfigs.get(configID); + this.sensorConfigs.remove(configID); + this.collectorService.removeSensorConfigs(Arrays.asList(sensorConfig)); logger.debug("Sensor config[%s] was removed", configID); } diff --git a/src/main/java/com/loafle/overflow/container/service/Service.java b/src/main/java/com/loafle/overflow/container/service/Service.java index ef1f5d9..122012e 100644 --- a/src/main/java/com/loafle/overflow/container/service/Service.java +++ b/src/main/java/com/loafle/overflow/container/service/Service.java @@ -32,6 +32,7 @@ public class Service implements InitializingBean, ApplicationContextAware { ContainerProbeService.class, ContainerSensorConfigService.class, ContainerCrawlerService.class, + ContainerCollectorService.class, }; private ApplicationContext applicationContext;