This commit is contained in:
crusader 2018-05-04 16:27:07 +09:00
parent 736a57e5b7
commit fca686637a
10 changed files with 196 additions and 181 deletions

View File

@ -4,11 +4,10 @@ package com.loafle.overflow.container;
* Container
*/
public class Container {
public static final String PIDFILE_PATH = "CONTAINER_PIDFILE_PATH";
public static final String PORT_NUMBER = "CONTAINER_PORT_NUMBER";
public static final String CRAWLERS = "CONTAINER_CRAWLERS";
public static final String SERVICES = "CONTAINER_SERVICES";
public static final String PIPELINE_CHANNEL_HANDLERS = "CONTAINER_PIPELINE_CHANNEL_HANDLERS";
public static final String RPC_SERVER_CODEC = "CONTAINER_RPC_SERVER_CODEC";
public static final String RPC_CLIENT_CODEC = "CONTAINER_RPC_CLIENT_CODEC";
public static final String RPC_REGISTRY = "CONTAINER_RPC_REGISTRY";
public static final String RPC_INVOKER = "CONTAINER_RPC_INVOKER";

View File

@ -0,0 +1,22 @@
package com.loafle.overflow.container.client;
import com.loafle.commons.server.websocket.client.Client;
/**
* ContainerClient
*/
public class ContainerClient extends Client {
protected void init() throws Exception {
// no op
}
protected void onStart() throws Exception {
// no op
}
protected void onStop() throws Exception {
// no op
}
protected void destroy() throws Exception {
// no op
}
}

View File

@ -1,80 +1,71 @@
package com.loafle.overflow.container.server.handler;
package com.loafle.overflow.container.client.handler;
import com.loafle.commons.rpc.RPCException;
import com.loafle.commons.rpc.protocol.RPCServerCodec;
import com.loafle.commons.rpc.protocol.RPCServerRequestCodec;
import com.loafle.commons.rpc.protocol.RPCClientCodec;
import com.loafle.commons.rpc.protocol.RPCClientResponseCodec;
import com.loafle.commons.rpc.registry.RPCInvoker;
import com.loafle.commons.server.socket.handler.codec.SocketFrame;
import com.loafle.commons.server.socket.handler.codec.TextSocketFrame;
import com.loafle.overflow.container.Container;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
/**
* RPCDecoder
* RPCClientHandler
*/
@Component
public class RPCServerHandler extends SimpleChannelInboundHandler<SocketFrame> {
private static final Logger logger = LoggerFactory.getLogger(RPCServerHandler.class);
public class RPCClientHandler extends SimpleChannelInboundHandler<WebSocketFrame> {
private static final Logger logger = LoggerFactory.getLogger(RPCClientHandler.class);
@Autowired
@Qualifier(Container.RPC_SERVER_CODEC)
private RPCServerCodec serverCodec;
@Qualifier(Container.RPC_CLIENT_CODEC)
private RPCClientCodec clientCodec;
@Autowired()
@Qualifier(Container.RPC_INVOKER)
private RPCInvoker rpcInvoker;
private ChannelHandlerContext ctx;
public RPCServerHandler() {
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, SocketFrame frame) throws Exception {
if (frame instanceof TextSocketFrame) {
@Override
protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
if (frame instanceof TextWebSocketFrame) {
ByteBuf buff = null;
byte[] requestBytes = null;
RPCServerRequestCodec requestCodec = null;
Object reply = null;
RPCException error = null;
byte[] responseBytes = null;
RPCClientResponseCodec responseCodec = null;
try {
buff = frame.content();
requestBytes = new byte[buff.readableBytes()];
buff.getBytes(buff.readerIndex(), requestBytes);
requestCodec = this.serverCodec.request(requestBytes);
reply = this.rpcInvoker.invoke(requestCodec);
responseBytes = new byte[buff.readableBytes()];
buff.getBytes(buff.readerIndex(), responseBytes);
responseCodec = this.clientCodec.response(responseBytes);
if (responseCodec.isNotification()) {
this.rpcInvoker.invoke(responseCodec.notification());
} else {
throw new UnsupportedOperationException(String.format("request is not unsupported"));
}
} catch (RPCException e) {
logger.debug("Error", e);
error = e;
throw e;
}
if (!requestCodec.hasResponse()) {
return;
}
byte[] responseBytes = requestCodec.response(reply, error);
ctx.channel().writeAndFlush(responseBytes);
} else {
throw new UnsupportedOperationException(String.format("unsupported frame type: %s", frame.getClass().getName()));
}
}
public void send(String method, Object... params) {
if (null == this.ctx || !this.ctx.channel().isOpen()) {
logger.warn("ChannelHandlerContext is not valid");
return;
}
try {
byte[] notificationBytes = this.serverCodec.notification(method, params);
this.ctx.channel().writeAndFlush(notificationBytes);
byte[] requestBytes = this.clientCodec.request(method, params, null);
this.ctx.channel().writeAndFlush(requestBytes);
} catch (RPCException e) {
logger.error("cannot create notification", e);
}

View File

@ -5,31 +5,25 @@ import java.net.SocketAddress;
import java.util.Arrays;
import java.util.List;
import com.loafle.commons.rpc.protocol.RPCServerCodec;
import com.loafle.commons.rpc.protocol.json.JSONRPCServerCodec;
import javax.annotation.Resource;
import com.loafle.commons.rpc.protocol.RPCClientCodec;
import com.loafle.commons.rpc.protocol.json.JSONRPCClientCodec;
import com.loafle.commons.rpc.registry.RPCRegistry;
import com.loafle.commons.server.Server;
import com.loafle.commons.server.socket.handler.codec.SocketServerProtocolHandler;
import com.loafle.commons.server.websocket.client.Client;
import com.loafle.overflow.container.Container;
import com.loafle.overflow.container.server.handler.RPCServerHandler;
import com.loafle.overflow.container.client.handler.RPCClientHandler;
import org.codehaus.jackson.map.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ServerChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* ContainerConfiguration
@ -38,34 +32,25 @@ import io.netty.handler.logging.LoggingHandler;
@ComponentScan(basePackages = { "com.loafle.overflow" })
@PropertySource({ "classpath:netty.properties" })
public class ContainerConfiguration {
@Autowired(required = false)
@Qualifier(Container.PIPELINE_CHANNEL_HANDLERS)
private List<ChannelHandler> pipelineChannelHandlers;
@Resource(name=Container.PORT_NUMBER)
private int portNumber;
@Autowired()
private RPCServerHandler rpcServerHandler;
@Autowired(required = false)
private ObjectMapper objectMapper;
@Bean(Server.CHANNEL_CLASS)
public Class<? extends ServerChannel> channelClass() {
return NioServerSocketChannel.class;
@Bean(Client.CHANNEL_CLASS)
public Class<? extends Channel> channelClass() {
return NioSocketChannel.class;
}
@Bean(Server.CHANNEL_OPTIONS)
public List<Server.ChannelOptionItem<?>> channelOptions() {
return Arrays.asList(new Server.ChannelOptionItem<>(ChannelOption.SO_BACKLOG, 100));
@Bean(Client.CHANNEL_HANDLERS)
public List<ChannelHandler> channelHandlers() {
return Arrays.asList(new RPCClientHandler());
}
@Bean(Server.CHANNEL_HANDLER)
public ChannelHandler channelHandler() {
return new LoggingHandler(LogLevel.INFO);
}
@Bean(Container.RPC_SERVER_CODEC)
public RPCServerCodec rpcServerCodec() {
return new JSONRPCServerCodec(this.objectMapper);
@Bean(Container.RPC_CLIENT_CODEC)
public RPCClientCodec rpcClientCodec() {
return new JSONRPCClientCodec(this.objectMapper);
}
@Bean({Container.RPC_INVOKER, Container.RPC_REGISTRY})
@ -73,25 +58,8 @@ public class ContainerConfiguration {
return new RPCRegistry();
}
@Bean(Server.CHANNEL_INITIALIZER)
public ChannelInitializer<?> channelInitializer() {
return new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline cp = ch.pipeline();
cp.addLast(new SocketServerProtocolHandler(true));
cp.addLast(rpcServerHandler);
if (null != pipelineChannelHandlers) {
for (ChannelHandler channelHandler : pipelineChannelHandlers) {
cp.addLast(channelHandler);
}
}
}
};
}
@Bean(Server.SOCKET_ADDRESS)
@Bean(Client.SOCKET_ADDRESS)
public SocketAddress address() {
return new InetSocketAddress("127.0.0.1", 60000);
return new InetSocketAddress("127.0.0.1", portNumber);
}
}

View File

@ -1,84 +0,0 @@
package com.loafle.overflow.container.server;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import javax.annotation.Resource;
import com.loafle.commons.server.Server;
import com.loafle.overflow.container.Container;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
/**
* ContainerServer
*/
public class ContainerServer extends Server {
@Resource(name=Container.PIDFILE_PATH)
private String pidFilePath;
private int portNumber;
public ContainerServer() {
}
public void setPidFilePath(String pidFilePath) {
this.pidFilePath = pidFilePath;
}
protected ChannelFuture bind(ServerBootstrap serverBootstrap) throws Exception {
ChannelFuture cf = null;
for (int i = 60000; i < 61000; i++) {
try {
cf = serverBootstrap.bind("127.0.0.1", i).sync();
this.portNumber = i;
break;
} catch (Exception e) {
System.out.println(e);
continue;
}
}
if (null == cf) {
throw new Exception("There is not available port");
}
try {
FileOutputStream outputStream = new FileOutputStream(this.pidFilePath);
outputStream.write(Integer.toString(this.portNumber).getBytes());
outputStream.close();
} catch (IOException e) {
throw e;
}
return cf;
}
@Override
protected void init() throws Exception {
}
@Override
protected void onStart() throws Exception {
}
@Override
protected void onStop() throws Exception {
try {
File f = new File(this.pidFilePath);
f.delete();
} catch (Exception e) {
throw e;
}
}
@Override
protected void destroy() throws Exception {
}
}

View File

@ -0,0 +1,102 @@
package com.loafle.overflow.container.service;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import com.loafle.overflow.core.annotation.RPCService;
import com.loafle.overflow.core.exception.OverflowException;
import com.loafle.overflow.model.sensorconfig.SensorConfig;
import com.loafle.overflow.service.container.CollectorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
/**
* ContainerCollectorService
*/
@RPCService("CollectorService")
public class ContainerCollectorService implements CollectorService {
private static final Logger logger = LoggerFactory.getLogger(ContainerCollectorService.class);
private ScheduledExecutorService scheduledExecutorService;
private Map<String, ScheduledFuture<?>> scheduleMap;
@Autowired
private ContainerProbeService probeService;
@Autowired
private ContainerCrawlerService crawlerService;
@Override
public void initService() throws Exception {
this.scheduledExecutorService = Executors.newScheduledThreadPool(20);
this.scheduleMap = new HashMap<>();
}
@Override
public void startService() throws Exception {
}
@Override
public void stopService() {
this.scheduledExecutorService.shutdown();
}
@Override
public void destroyService() {
}
public void addSensorConfigs(List<SensorConfig> sensorConfigs) throws OverflowException {
if (null == sensorConfigs || 0 == sensorConfigs.size()) {
return;
}
for (SensorConfig sensorConfig : sensorConfigs) {
try {
long interval = Long.valueOf(sensorConfig.getSchedule().getInterval()).longValue();
this.addSchedule(interval, sensorConfig);
logger.debug("scheduler of config[%s] has been added", sensorConfig.getConfigID());
} catch (Exception e) {
throw new OverflowException(String.format("Cannot convert interval[%s] %v", sensorConfig.getSchedule().getInterval()));
}
}
}
public void removeSensorConfigs(List<SensorConfig> sensorConfigs) throws OverflowException {
if (null == sensorConfigs || 0 == sensorConfigs.size()) {
return;
}
for (SensorConfig sensorConfig : sensorConfigs) {
this.removeSchedule(sensorConfig.getConfigID());
}
}
private void addSchedule(long interval, SensorConfig sensorConfig) {
Runnable task = () -> {
try {
Map<String, String> result = crawlerService.get(sensorConfig.getConfigID());
probeService.send("DataService.Metric", result);
} catch (Exception e) {
logger.error("Cannot get data from crawler[%s] %s", sensorConfig.getCrawler().getName(), e.getMessage());
}
};
ScheduledFuture<?> sf = this.scheduledExecutorService.schedule(task, interval, TimeUnit.SECONDS);
this.scheduleMap.put(sensorConfig.getConfigID(), sf);
}
private void removeSchedule(String configID) {
if (!this.scheduleMap.containsKey(configID)) {
return;
}
ScheduledFuture<?> sf = this.scheduleMap.get(configID);
this.scheduleMap.remove(configID);
sf.cancel(false);
}
}

View File

@ -60,8 +60,7 @@ public class ContainerCrawlerService implements CrawlerService {
crawler.auth(authInfoMap);
}
@Override
public Map<String, String> Get(String sensorConfigID) throws OverflowException {
public Map<String, String> get(String sensorConfigID) throws OverflowException {
SensorConfig sensorConfig = this.sensorConfigService.getSensorConfig(sensorConfigID);
if (null == sensorConfig) {
throw new OverflowException(String.format("There is no sensor config for id[%s]", sensorConfigID), null);

View File

@ -1,6 +1,6 @@
package com.loafle.overflow.container.service;
import com.loafle.overflow.container.server.handler.RPCServerHandler;
import com.loafle.overflow.container.client.handler.RPCClientHandler;
import com.loafle.overflow.core.annotation.RPCService;
import com.loafle.overflow.core.exception.OverflowException;
import com.loafle.overflow.service.container.ProbeService;
@ -13,7 +13,7 @@ import org.springframework.beans.factory.annotation.Autowired;
@RPCService("ProbeService")
public class ContainerProbeService implements ProbeService {
@Autowired()
private RPCServerHandler rpcServerHandler;
private RPCClientHandler rpcClientHandler;
@Override
public void initService() throws Exception {
@ -36,6 +36,6 @@ public class ContainerProbeService implements ProbeService {
}
public void send(String method, Object... params) throws OverflowException {
this.rpcServerHandler.send(method, params);
this.rpcClientHandler.send(method, params);
}
}

View File

@ -1,5 +1,7 @@
package com.loafle.overflow.container.service;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -11,6 +13,7 @@ import com.loafle.overflow.service.container.SensorConfigService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
/**
* SensorConfigService
@ -21,6 +24,9 @@ public class ContainerSensorConfigService implements SensorConfigService {
private Map<String, SensorConfig> sensorConfigs;
@Autowired
private ContainerCollectorService collectorService;
@Override
public void initService() throws Exception {
this.sensorConfigs = new HashMap<>();
@ -54,6 +60,9 @@ public class ContainerSensorConfigService implements SensorConfigService {
for (SensorConfig sensorConfig : sensorConfigs) {
this.sensorConfigs.put(sensorConfig.getConfigID(), sensorConfig);
}
this.collectorService.addSensorConfigs(sensorConfigs);
logger.debug("Sensor configs[%d] were added", sensorConfigs.size());
}
@ -70,6 +79,8 @@ public class ContainerSensorConfigService implements SensorConfigService {
this.sensorConfigs.put(configID, sensorConfig);
this.collectorService.addSensorConfigs(Arrays.asList(sensorConfig));
logger.debug("Sensor config[%s] was added", configID);
}
@ -87,6 +98,9 @@ public class ContainerSensorConfigService implements SensorConfigService {
this.sensorConfigs.remove(configID);
this.sensorConfigs.put(configID, sensorConfig);
this.collectorService.removeSensorConfigs(Arrays.asList(sensorConfig));
this.collectorService.addSensorConfigs(Arrays.asList(sensorConfig));
logger.debug("Sensor config[%s] was updated", configID);
}
@ -96,7 +110,10 @@ public class ContainerSensorConfigService implements SensorConfigService {
throw new OverflowException(String.format("Sensor config[%s] is not exist", configID), null);
}
SensorConfig sensorConfig = this.sensorConfigs.get(configID);
this.sensorConfigs.remove(configID);
this.collectorService.removeSensorConfigs(Arrays.asList(sensorConfig));
logger.debug("Sensor config[%s] was removed", configID);
}

View File

@ -32,6 +32,7 @@ public class Service implements InitializingBean, ApplicationContextAware {
ContainerProbeService.class,
ContainerSensorConfigService.class,
ContainerCrawlerService.class,
ContainerCollectorService.class,
};
private ApplicationContext applicationContext;