diff --git a/src/main/java/com/loafle/overflow/container/server/handler/RPCServerHandler.java b/src/main/java/com/loafle/overflow/container/server/handler/RPCServerHandler.java index d73d80b..034ceb1 100644 --- a/src/main/java/com/loafle/overflow/container/server/handler/RPCServerHandler.java +++ b/src/main/java/com/loafle/overflow/container/server/handler/RPCServerHandler.java @@ -32,6 +32,8 @@ public class RPCServerHandler extends SimpleChannelInboundHandler { @Qualifier(Container.RPC_INVOKER) private RPCInvoker rpcInvoker; + private ChannelHandlerContext ctx; + public RPCServerHandler() { } @@ -50,7 +52,7 @@ public class RPCServerHandler extends SimpleChannelInboundHandler { requestCodec = this.serverCodec.request(requestBytes); reply = this.rpcInvoker.invoke(requestCodec); - + } catch (RPCException e) { logger.debug("Error", e); error = e; @@ -61,7 +63,47 @@ public class RPCServerHandler extends SimpleChannelInboundHandler { byte[] responseBytes = requestCodec.response(reply, error); ctx.channel().writeAndFlush(responseBytes); } else { - throw new UnsupportedOperationException(String.format("unsupported frame type: %s", frame.getClass().getName())); + throw new UnsupportedOperationException(String.format("unsupported frame type: %s", frame.getClass().getName())); } } + + public void send(String method, Object... params) { + if (null == this.ctx || !this.ctx.channel().isOpen()) { + logger.warn("ChannelHandlerContext is not valid"); + return; + } + try { + byte[] notificationBytes = this.serverCodec.notification(method, params); + this.ctx.channel().writeAndFlush(notificationBytes); + } catch (RPCException e) { + logger.error("cannot create notification", e); + } + } + + @Override + public void channelRegistered(ChannelHandlerContext ctx) throws Exception { + // Initialize early if channel is active already. + if (ctx.channel().isActive()) { + this.ctx = ctx; + } + super.channelRegistered(ctx); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + this.ctx = ctx; + super.channelActive(ctx); + } + + /** + * Calls {@link ChannelHandlerContext#fireChannelInactive()} to forward + * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}. + * + * Sub-classes may override this method to change behavior. + */ + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + this.ctx = null; + super.channelInactive(ctx); + } } \ No newline at end of file diff --git a/src/main/java/com/loafle/overflow/container/service/ContainerProbeService.java b/src/main/java/com/loafle/overflow/container/service/ContainerProbeService.java index cb3cada..47a9fd7 100644 --- a/src/main/java/com/loafle/overflow/container/service/ContainerProbeService.java +++ b/src/main/java/com/loafle/overflow/container/service/ContainerProbeService.java @@ -1,14 +1,19 @@ package com.loafle.overflow.container.service; +import com.loafle.overflow.container.server.handler.RPCServerHandler; import com.loafle.overflow.core.annotation.RPCService; import com.loafle.overflow.core.exception.OverflowException; import com.loafle.overflow.service.container.ProbeService; +import org.springframework.beans.factory.annotation.Autowired; + /** * ProbeService */ @RPCService("ProbeService") public class ContainerProbeService implements ProbeService { + @Autowired() + private RPCServerHandler rpcServerHandler; @Override public void initService() throws Exception { @@ -31,6 +36,6 @@ public class ContainerProbeService implements ProbeService { } public void send(String method, Object... params) throws OverflowException { - + this.rpcServerHandler.send(method, params); } } \ No newline at end of file