This commit is contained in:
crusader 2018-04-26 21:00:39 +09:00
parent a899f91968
commit 1ed55320c6
2 changed files with 50 additions and 3 deletions

View File

@ -32,6 +32,8 @@ public class RPCServerHandler extends SimpleChannelInboundHandler<SocketFrame> {
@Qualifier(Container.RPC_INVOKER) @Qualifier(Container.RPC_INVOKER)
private RPCInvoker rpcInvoker; private RPCInvoker rpcInvoker;
private ChannelHandlerContext ctx;
public RPCServerHandler() { public RPCServerHandler() {
} }
@ -61,7 +63,47 @@ public class RPCServerHandler extends SimpleChannelInboundHandler<SocketFrame> {
byte[] responseBytes = requestCodec.response(reply, error); byte[] responseBytes = requestCodec.response(reply, error);
ctx.channel().writeAndFlush(responseBytes); ctx.channel().writeAndFlush(responseBytes);
} else { } else {
throw new UnsupportedOperationException(String.format("unsupported frame type: %s", frame.getClass().getName())); throw new UnsupportedOperationException(String.format("unsupported frame type: %s", frame.getClass().getName()));
} }
} }
public void send(String method, Object... params) {
if (null == this.ctx || !this.ctx.channel().isOpen()) {
logger.warn("ChannelHandlerContext is not valid");
return;
}
try {
byte[] notificationBytes = this.serverCodec.notification(method, params);
this.ctx.channel().writeAndFlush(notificationBytes);
} catch (RPCException e) {
logger.error("cannot create notification", e);
}
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
// Initialize early if channel is active already.
if (ctx.channel().isActive()) {
this.ctx = ctx;
}
super.channelRegistered(ctx);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
super.channelActive(ctx);
}
/**
* Calls {@link ChannelHandlerContext#fireChannelInactive()} to forward
* to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
this.ctx = null;
super.channelInactive(ctx);
}
} }

View File

@ -1,14 +1,19 @@
package com.loafle.overflow.container.service; package com.loafle.overflow.container.service;
import com.loafle.overflow.container.server.handler.RPCServerHandler;
import com.loafle.overflow.core.annotation.RPCService; import com.loafle.overflow.core.annotation.RPCService;
import com.loafle.overflow.core.exception.OverflowException; import com.loafle.overflow.core.exception.OverflowException;
import com.loafle.overflow.service.container.ProbeService; import com.loafle.overflow.service.container.ProbeService;
import org.springframework.beans.factory.annotation.Autowired;
/** /**
* ProbeService * ProbeService
*/ */
@RPCService("ProbeService") @RPCService("ProbeService")
public class ContainerProbeService implements ProbeService { public class ContainerProbeService implements ProbeService {
@Autowired()
private RPCServerHandler rpcServerHandler;
@Override @Override
public void initService() throws Exception { public void initService() throws Exception {
@ -31,6 +36,6 @@ public class ContainerProbeService implements ProbeService {
} }
public void send(String method, Object... params) throws OverflowException { public void send(String method, Object... params) throws OverflowException {
this.rpcServerHandler.send(method, params);
} }
} }