commit f74352ab95bc3ff1d741d7cfee9f58d331af7bf6 Author: crusader Date: Sun Apr 22 20:00:44 2018 +0900 ing diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..60af490 --- /dev/null +++ b/.gitignore @@ -0,0 +1,88 @@ +# Created by .ignore support plugin (hsz.mobi) +### JetBrains template +# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and Webstorm +# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 + +# User-specific stuff: +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/dictionaries + +# Sensitive or high-churn files: +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.xml +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml + +# Gradle: +.idea/**/gradle.xml +.idea/**/libraries + +# Mongo Explorer plugin: +.idea/**/mongoSettings.xml + +## File-based project format: +*.iws + +## Plugin-specific files: + +# IntelliJ +/out/ + +# mpeltonen/sbt-idea plugin +.idea_modules/ + +# JIRA plugin +atlassian-ide-plugin.xml + +# Crashlytics plugin (for Android Studio and IntelliJ) +com_crashlytics_export_strings.xml +crashlytics.properties +crashlytics-build.properties +fabric.properties +### Maven template + +pom.xml.tag +pom.xml.releaseBackup +pom.xml.versionsBackup +pom.xml.next +release.properties +dependency-reduced-pom.xml +buildNumber.properties +.mvn/timing.properties + +# Avoid ignoring Maven wrapper jar file (.jar files are usually ignored) +!/.mvn/wrapper/maven-wrapper.jar +### Java template +# Compiled class file +*.class + +# Log file +*.log + +# BlueJ files +*.ctxt +14 +# Mobile Tools for Java (J2ME) +.mtj.tmp/ + +# Package Files # +*.jar +*.war +*.ear +*.zip +*.tar.gz +*.rar + +# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml +hs_err_pid* + +.idea/ +*.iml +/target/ +.settings/ +.classpath +.project diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..81d81a5 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,27 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + + { + "type": "java", + "name": "Debug", + "request": "launch", + "cwd": "${workspaceFolder}", + "console": "internalConsole", + "stopOnEntry": false, + "mainClass": "com.loafle.overflow.central.Central", + "projectName": "central", + "args": "50006" + }, + { + "type": "java", + "name": "Debug (Attach)", + "request": "attach", + "hostName": "localhost", + "port": 0 + } + ] +} \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..1133129 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "java.configuration.updateBuildConfiguration": "automatic" +} \ No newline at end of file diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..7665523 --- /dev/null +++ b/pom.xml @@ -0,0 +1,34 @@ + + + 4.0.0 + + + com.loafle + maven_parent_jar + 1.0.0-RELEASE + + + com.loafle.commons + server-java + jar + 1.0.0-SNAPSHOT + com.loafle.commons.server-java + + + 4.1.17.Final + + + + + io.netty + netty-all + ${netty.version} + + + + + + + \ No newline at end of file diff --git a/src/main/java/com/loafle/commons/server/socket/netty/handler/codec/BinaryWebSocketFrame.java b/src/main/java/com/loafle/commons/server/socket/netty/handler/codec/BinaryWebSocketFrame.java new file mode 100644 index 0000000..140046c --- /dev/null +++ b/src/main/java/com/loafle/commons/server/socket/netty/handler/codec/BinaryWebSocketFrame.java @@ -0,0 +1,85 @@ +package com.loafle.commons.server.socket.netty.handler.codec; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + +/** + * Web Socket frame containing binary data + */ +public class BinaryWebSocketFrame extends WebSocketFrame { + + /** + * Creates a new empty binary frame. + */ + public BinaryWebSocketFrame() { + super(Unpooled.buffer(0)); + } + + /** + * Creates a new binary frame with the specified binary data. The final fragment flag is set to true. + * + * @param binaryData + * the content of the frame. + */ + public BinaryWebSocketFrame(ByteBuf binaryData) { + super(binaryData); + } + + /** + * Creates a new binary frame with the specified binary data and the final fragment flag. + * + * @param finalFragment + * flag indicating if this frame is the final fragment + * @param rsv + * reserved bits used for protocol extensions + * @param binaryData + * the content of the frame. + */ + public BinaryWebSocketFrame(boolean finalFragment, int rsv, ByteBuf binaryData) { + super(finalFragment, rsv, binaryData); + } + + @Override + public BinaryWebSocketFrame copy() { + return (BinaryWebSocketFrame) super.copy(); + } + + @Override + public BinaryWebSocketFrame duplicate() { + return (BinaryWebSocketFrame) super.duplicate(); + } + + @Override + public BinaryWebSocketFrame retainedDuplicate() { + return (BinaryWebSocketFrame) super.retainedDuplicate(); + } + + @Override + public BinaryWebSocketFrame replace(ByteBuf content) { + return new BinaryWebSocketFrame(isFinalFragment(), rsv(), content); + } + + @Override + public BinaryWebSocketFrame retain() { + super.retain(); + return this; + } + + @Override + public BinaryWebSocketFrame retain(int increment) { + super.retain(increment); + return this; + } + + @Override + public BinaryWebSocketFrame touch() { + super.touch(); + return this; + } + + @Override + public BinaryWebSocketFrame touch(Object hint) { + super.touch(hint); + return this; + } +} \ No newline at end of file diff --git a/src/main/java/com/loafle/commons/server/socket/netty/handler/codec/CloseWebSocketFrame.java b/src/main/java/com/loafle/commons/server/socket/netty/handler/codec/CloseWebSocketFrame.java new file mode 100644 index 0000000..c1d8612 --- /dev/null +++ b/src/main/java/com/loafle/commons/server/socket/netty/handler/codec/CloseWebSocketFrame.java @@ -0,0 +1,168 @@ +package com.loafle.commons.server.socket.netty.handler.codec; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.util.CharsetUtil; +import io.netty.util.internal.StringUtil; + +/** + * Web Socket Frame for closing the connection + */ +public class CloseWebSocketFrame extends WebSocketFrame { + + /** + * Creates a new empty close frame. + */ + public CloseWebSocketFrame() { + super(Unpooled.buffer(0)); + } + + /** + * Creates a new empty close frame with closing getStatus code and reason text + * + * @param statusCode + * Integer status code as per RFC 6455. For + * example, 1000 indicates normal closure. + * @param reasonText + * Reason text. Set to null if no text. + */ + public CloseWebSocketFrame(int statusCode, String reasonText) { + this(true, 0, statusCode, reasonText); + } + + /** + * Creates a new close frame with no losing getStatus code and no reason text + * + * @param finalFragment + * flag indicating if this frame is the final fragment + * @param rsv + * reserved bits used for protocol extensions + */ + public CloseWebSocketFrame(boolean finalFragment, int rsv) { + this(finalFragment, rsv, Unpooled.buffer(0)); + } + + /** + * Creates a new close frame with closing status code and reason text + * + * @param finalFragment + * flag indicating if this frame is the final fragment + * @param rsv + * reserved bits used for protocol extensions + * @param statusCode + * Integer status code as per RFC 6455. For + * example, 1000 indicates normal closure. + * @param reasonText + * Reason text. Set to null if no text. + */ + public CloseWebSocketFrame(boolean finalFragment, int rsv, int statusCode, String reasonText) { + super(finalFragment, rsv, newBinaryData(statusCode, reasonText)); + } + + private static ByteBuf newBinaryData(int statusCode, String reasonText) { + if (reasonText == null) { + reasonText = StringUtil.EMPTY_STRING; + } + + ByteBuf binaryData = Unpooled.buffer(2 + reasonText.length()); + binaryData.writeShort(statusCode); + if (!reasonText.isEmpty()) { + binaryData.writeCharSequence(reasonText, CharsetUtil.UTF_8); + } + + binaryData.readerIndex(0); + return binaryData; + } + + /** + * Creates a new close frame + * + * @param finalFragment + * flag indicating if this frame is the final fragment + * @param rsv + * reserved bits used for protocol extensions + * @param binaryData + * the content of the frame. Must be 2 byte integer followed by optional UTF-8 encoded string. + */ + public CloseWebSocketFrame(boolean finalFragment, int rsv, ByteBuf binaryData) { + super(finalFragment, rsv, binaryData); + } + + /** + * Returns the closing status code as per RFC 6455. If + * a getStatus code is set, -1 is returned. + */ + public int statusCode() { + ByteBuf binaryData = content(); + if (binaryData == null || binaryData.capacity() == 0) { + return -1; + } + + binaryData.readerIndex(0); + int statusCode = binaryData.readShort(); + binaryData.readerIndex(0); + + return statusCode; + } + + /** + * Returns the reason text as per RFC 6455 If a reason + * text is not supplied, an empty string is returned. + */ + public String reasonText() { + ByteBuf binaryData = content(); + if (binaryData == null || binaryData.capacity() <= 2) { + return ""; + } + + binaryData.readerIndex(2); + String reasonText = binaryData.toString(CharsetUtil.UTF_8); + binaryData.readerIndex(0); + + return reasonText; + } + + @Override + public CloseWebSocketFrame copy() { + return (CloseWebSocketFrame) super.copy(); + } + + @Override + public CloseWebSocketFrame duplicate() { + return (CloseWebSocketFrame) super.duplicate(); + } + + @Override + public CloseWebSocketFrame retainedDuplicate() { + return (CloseWebSocketFrame) super.retainedDuplicate(); + } + + @Override + public CloseWebSocketFrame replace(ByteBuf content) { + return new CloseWebSocketFrame(isFinalFragment(), rsv(), content); + } + + @Override + public CloseWebSocketFrame retain() { + super.retain(); + return this; + } + + @Override + public CloseWebSocketFrame retain(int increment) { + super.retain(increment); + return this; + } + + @Override + public CloseWebSocketFrame touch() { + super.touch(); + return this; + } + + @Override + public CloseWebSocketFrame touch(Object hint) { + super.touch(hint); + return this; + } +} \ No newline at end of file diff --git a/src/main/java/com/loafle/commons/server/socket/netty/handler/codec/ContinuationWebSocketFrame.java b/src/main/java/com/loafle/commons/server/socket/netty/handler/codec/ContinuationWebSocketFrame.java new file mode 100644 index 0000000..4490dc1 --- /dev/null +++ b/src/main/java/com/loafle/commons/server/socket/netty/handler/codec/ContinuationWebSocketFrame.java @@ -0,0 +1,122 @@ +package com.loafle.commons.server.socket.netty.handler.codec; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.util.CharsetUtil; + +/** + * Web Socket continuation frame containing continuation text or binary data. This is used for + * fragmented messages where the contents of a messages is contained more than 1 frame. + */ +public class ContinuationWebSocketFrame extends WebSocketFrame { + + /** + * Creates a new empty continuation frame. + */ + public ContinuationWebSocketFrame() { + this(Unpooled.buffer(0)); + } + + /** + * Creates a new continuation frame with the specified binary data. The final fragment flag is + * set to true. + * + * @param binaryData the content of the frame. + */ + public ContinuationWebSocketFrame(ByteBuf binaryData) { + super(binaryData); + } + + /** + * Creates a new continuation frame with the specified binary data + * + * @param finalFragment + * flag indicating if this frame is the final fragment + * @param rsv + * reserved bits used for protocol extensions + * @param binaryData + * the content of the frame. + */ + public ContinuationWebSocketFrame(boolean finalFragment, int rsv, ByteBuf binaryData) { + super(finalFragment, rsv, binaryData); + } + + /** + * Creates a new continuation frame with the specified text data + * + * @param finalFragment + * flag indicating if this frame is the final fragment + * @param rsv + * reserved bits used for protocol extensions + * @param text + * text content of the frame. + */ + public ContinuationWebSocketFrame(boolean finalFragment, int rsv, String text) { + this(finalFragment, rsv, fromText(text)); + } + + /** + * Returns the text data in this frame + */ + public String text() { + return content().toString(CharsetUtil.UTF_8); + } + + /** + * Sets the string for this frame + * + * @param text + * text to store + */ + private static ByteBuf fromText(String text) { + if (text == null || text.isEmpty()) { + return Unpooled.EMPTY_BUFFER; + } else { + return Unpooled.copiedBuffer(text, CharsetUtil.UTF_8); + } + } + + @Override + public ContinuationWebSocketFrame copy() { + return (ContinuationWebSocketFrame) super.copy(); + } + + @Override + public ContinuationWebSocketFrame duplicate() { + return (ContinuationWebSocketFrame) super.duplicate(); + } + + @Override + public ContinuationWebSocketFrame retainedDuplicate() { + return (ContinuationWebSocketFrame) super.retainedDuplicate(); + } + + @Override + public ContinuationWebSocketFrame replace(ByteBuf content) { + return new ContinuationWebSocketFrame(isFinalFragment(), rsv(), content); + } + + @Override + public ContinuationWebSocketFrame retain() { + super.retain(); + return this; + } + + @Override + public ContinuationWebSocketFrame retain(int increment) { + super.retain(increment); + return this; + } + + @Override + public ContinuationWebSocketFrame touch() { + super.touch(); + return this; + } + + @Override + public ContinuationWebSocketFrame touch(Object hint) { + super.touch(hint); + return this; + } +} \ No newline at end of file diff --git a/src/main/java/com/loafle/commons/server/socket/netty/handler/codec/PingWebSocketFrame.java b/src/main/java/com/loafle/commons/server/socket/netty/handler/codec/PingWebSocketFrame.java new file mode 100644 index 0000000..78eb109 --- /dev/null +++ b/src/main/java/com/loafle/commons/server/socket/netty/handler/codec/PingWebSocketFrame.java @@ -0,0 +1,85 @@ +package com.loafle.commons.server.socket.netty.handler.codec; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + +/** + * Web Socket frame containing binary data + */ +public class PingWebSocketFrame extends WebSocketFrame { + + /** + * Creates a new empty ping frame. + */ + public PingWebSocketFrame() { + super(true, 0, Unpooled.buffer(0)); + } + + /** + * Creates a new ping frame with the specified binary data. + * + * @param binaryData + * the content of the frame. + */ + public PingWebSocketFrame(ByteBuf binaryData) { + super(binaryData); + } + + /** + * Creates a new ping frame with the specified binary data + * + * @param finalFragment + * flag indicating if this frame is the final fragment + * @param rsv + * reserved bits used for protocol extensions + * @param binaryData + * the content of the frame. + */ + public PingWebSocketFrame(boolean finalFragment, int rsv, ByteBuf binaryData) { + super(finalFragment, rsv, binaryData); + } + + @Override + public PingWebSocketFrame copy() { + return (PingWebSocketFrame) super.copy(); + } + + @Override + public PingWebSocketFrame duplicate() { + return (PingWebSocketFrame) super.duplicate(); + } + + @Override + public PingWebSocketFrame retainedDuplicate() { + return (PingWebSocketFrame) super.retainedDuplicate(); + } + + @Override + public PingWebSocketFrame replace(ByteBuf content) { + return new PingWebSocketFrame(isFinalFragment(), rsv(), content); + } + + @Override + public PingWebSocketFrame retain() { + super.retain(); + return this; + } + + @Override + public PingWebSocketFrame retain(int increment) { + super.retain(increment); + return this; + } + + @Override + public PingWebSocketFrame touch() { + super.touch(); + return this; + } + + @Override + public PingWebSocketFrame touch(Object hint) { + super.touch(hint); + return this; + } +} \ No newline at end of file diff --git a/src/main/java/com/loafle/commons/server/socket/netty/handler/codec/PongWebSocketFrame.java b/src/main/java/com/loafle/commons/server/socket/netty/handler/codec/PongWebSocketFrame.java new file mode 100644 index 0000000..72e2fb7 --- /dev/null +++ b/src/main/java/com/loafle/commons/server/socket/netty/handler/codec/PongWebSocketFrame.java @@ -0,0 +1,85 @@ +package com.loafle.commons.server.socket.netty.handler.codec; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + +/** + * Web Socket frame containing binary data + */ +public class PongWebSocketFrame extends WebSocketFrame { + + /** + * Creates a new empty pong frame. + */ + public PongWebSocketFrame() { + super(Unpooled.buffer(0)); + } + + /** + * Creates a new pong frame with the specified binary data. + * + * @param binaryData + * the content of the frame. + */ + public PongWebSocketFrame(ByteBuf binaryData) { + super(binaryData); + } + + /** + * Creates a new pong frame with the specified binary data + * + * @param finalFragment + * flag indicating if this frame is the final fragment + * @param rsv + * reserved bits used for protocol extensions + * @param binaryData + * the content of the frame. + */ + public PongWebSocketFrame(boolean finalFragment, int rsv, ByteBuf binaryData) { + super(finalFragment, rsv, binaryData); + } + + @Override + public PongWebSocketFrame copy() { + return (PongWebSocketFrame) super.copy(); + } + + @Override + public PongWebSocketFrame duplicate() { + return (PongWebSocketFrame) super.duplicate(); + } + + @Override + public PongWebSocketFrame retainedDuplicate() { + return (PongWebSocketFrame) super.retainedDuplicate(); + } + + @Override + public PongWebSocketFrame replace(ByteBuf content) { + return new PongWebSocketFrame(isFinalFragment(), rsv(), content); + } + + @Override + public PongWebSocketFrame retain() { + super.retain(); + return this; + } + + @Override + public PongWebSocketFrame retain(int increment) { + super.retain(increment); + return this; + } + + @Override + public PongWebSocketFrame touch() { + super.touch(); + return this; + } + + @Override + public PongWebSocketFrame touch(Object hint) { + super.touch(hint); + return this; + } +} \ No newline at end of file diff --git a/src/main/java/com/loafle/commons/server/socket/netty/handler/codec/TextWebSocketFrame.java b/src/main/java/com/loafle/commons/server/socket/netty/handler/codec/TextWebSocketFrame.java new file mode 100644 index 0000000..8dde6b4 --- /dev/null +++ b/src/main/java/com/loafle/commons/server/socket/netty/handler/codec/TextWebSocketFrame.java @@ -0,0 +1,125 @@ +package com.loafle.commons.server.socket.netty.handler.codec; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.util.CharsetUtil; + +/** + * Web Socket text frame + */ +public class TextWebSocketFrame extends WebSocketFrame { + + /** + * Creates a new empty text frame. + */ + public TextWebSocketFrame() { + super(Unpooled.buffer(0)); + } + + /** + * Creates a new text frame with the specified text string. The final fragment flag is set to true. + * + * @param text + * String to put in the frame + */ + public TextWebSocketFrame(String text) { + super(fromText(text)); + } + + /** + * Creates a new text frame with the specified binary data. The final fragment flag is set to true. + * + * @param binaryData + * the content of the frame. + */ + public TextWebSocketFrame(ByteBuf binaryData) { + super(binaryData); + } + + /** + * Creates a new text frame with the specified text string. The final fragment flag is set to true. + * + * @param finalFragment + * flag indicating if this frame is the final fragment + * @param rsv + * reserved bits used for protocol extensions + * @param text + * String to put in the frame + */ + public TextWebSocketFrame(boolean finalFragment, int rsv, String text) { + super(finalFragment, rsv, fromText(text)); + } + + private static ByteBuf fromText(String text) { + if (text == null || text.isEmpty()) { + return Unpooled.EMPTY_BUFFER; + } else { + return Unpooled.copiedBuffer(text, CharsetUtil.UTF_8); + } + } + + /** + * Creates a new text frame with the specified binary data. The final fragment flag is set to true. + * + * @param finalFragment + * flag indicating if this frame is the final fragment + * @param rsv + * reserved bits used for protocol extensions + * @param binaryData + * the content of the frame. + */ + public TextWebSocketFrame(boolean finalFragment, int rsv, ByteBuf binaryData) { + super(finalFragment, rsv, binaryData); + } + + /** + * Returns the text data in this frame + */ + public String text() { + return content().toString(CharsetUtil.UTF_8); + } + + @Override + public TextWebSocketFrame copy() { + return (TextWebSocketFrame) super.copy(); + } + + @Override + public TextWebSocketFrame duplicate() { + return (TextWebSocketFrame) super.duplicate(); + } + + @Override + public TextWebSocketFrame retainedDuplicate() { + return (TextWebSocketFrame) super.retainedDuplicate(); + } + + @Override + public TextWebSocketFrame replace(ByteBuf content) { + return new TextWebSocketFrame(isFinalFragment(), rsv(), content); + } + + @Override + public TextWebSocketFrame retain() { + super.retain(); + return this; + } + + @Override + public TextWebSocketFrame retain(int increment) { + super.retain(increment); + return this; + } + + @Override + public TextWebSocketFrame touch() { + super.touch(); + return this; + } + + @Override + public TextWebSocketFrame touch(Object hint) { + super.touch(hint); + return this; + } +} \ No newline at end of file diff --git a/src/main/java/com/loafle/commons/server/socket/netty/handler/codec/Utf8FrameValidator.java b/src/main/java/com/loafle/commons/server/socket/netty/handler/codec/Utf8FrameValidator.java new file mode 100644 index 0000000..31b929f --- /dev/null +++ b/src/main/java/com/loafle/commons/server/socket/netty/handler/codec/Utf8FrameValidator.java @@ -0,0 +1,77 @@ +package com.loafle.commons.server.socket.netty.handler.codec; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.CorruptedFrameException; + +/** + * + */ +public class Utf8FrameValidator extends ChannelInboundHandlerAdapter { + + private int fragmentedFramesCount; + private Utf8Validator utf8Validator; + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof WebSocketFrame) { + WebSocketFrame frame = (WebSocketFrame) msg; + + // Processing for possible fragmented messages for text and binary + // frames + if (((WebSocketFrame) msg).isFinalFragment()) { + // Final frame of the sequence. Apparently ping frames are + // allowed in the middle of a fragmented message + if (!(frame instanceof PingWebSocketFrame)) { + fragmentedFramesCount = 0; + + // Check text for UTF8 correctness + if ((frame instanceof TextWebSocketFrame) || (utf8Validator != null && utf8Validator.isChecking())) { + // Check UTF-8 correctness for this payload + checkUTF8String(ctx, frame.content()); + + // This does a second check to make sure UTF-8 + // correctness for entire text message + utf8Validator.finish(); + } + } + } else { + // Not final frame so we can expect more frames in the + // fragmented sequence + if (fragmentedFramesCount == 0) { + // First text or binary frame for a fragmented set + if (frame instanceof TextWebSocketFrame) { + checkUTF8String(ctx, frame.content()); + } + } else { + // Subsequent frames - only check if init frame is text + if (utf8Validator != null && utf8Validator.isChecking()) { + checkUTF8String(ctx, frame.content()); + } + } + + // Increment counter + fragmentedFramesCount++; + } + } + + super.channelRead(ctx, msg); + } + + private void checkUTF8String(ChannelHandlerContext ctx, ByteBuf buffer) { + try { + if (utf8Validator == null) { + utf8Validator = new Utf8Validator(); + } + utf8Validator.check(buffer); + } catch (CorruptedFrameException ex) { + if (ctx.channel().isActive()) { + ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); + } + } + } + +} \ No newline at end of file diff --git a/src/main/java/com/loafle/commons/server/socket/netty/handler/codec/Utf8Validator.java b/src/main/java/com/loafle/commons/server/socket/netty/handler/codec/Utf8Validator.java new file mode 100644 index 0000000..2e8c812 --- /dev/null +++ b/src/main/java/com/loafle/commons/server/socket/netty/handler/codec/Utf8Validator.java @@ -0,0 +1,65 @@ +package com.loafle.commons.server.socket.netty.handler.codec; + +import io.netty.buffer.ByteBuf; +import io.netty.handler.codec.CorruptedFrameException; +import io.netty.util.ByteProcessor; + +/** + * Checks UTF8 bytes for validity + */ +final class Utf8Validator implements ByteProcessor { + private static final int UTF8_ACCEPT = 0; + private static final int UTF8_REJECT = 12; + + private static final byte[] TYPES = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, + 1, 1, 1, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, + 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 8, 8, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, + 2, 2, 2, 2, 2, 2, 2, 10, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 3, 3, 11, 6, 6, 6, 5, 8, 8, 8, 8, 8, 8, 8, 8, 8, + 8, 8 }; + + private static final byte[] STATES = { 0, 12, 24, 36, 60, 96, 84, 12, 12, 12, 48, 72, 12, 12, 12, 12, 12, 12, 12, 12, + 12, 12, 12, 12, 12, 0, 12, 12, 12, 12, 12, 0, 12, 0, 12, 12, 12, 24, 12, 12, 12, 12, 12, 24, 12, 24, 12, 12, 12, + 12, 12, 12, 12, 12, 12, 24, 12, 12, 12, 12, 12, 24, 12, 12, 12, 12, 12, 12, 12, 24, 12, 12, 12, 12, 12, 12, 12, + 12, 12, 36, 12, 36, 12, 12, 12, 36, 12, 12, 12, 12, 12, 36, 12, 36, 12, 12, 12, 36, 12, 12, 12, 12, 12, 12, 12, + 12, 12, 12 }; + + @SuppressWarnings("RedundantFieldInitialization") + private int state = UTF8_ACCEPT; + private int codep; + private boolean checking; + + public void check(ByteBuf buffer) { + checking = true; + buffer.forEachByte(this); + } + + public void finish() { + checking = false; + codep = 0; + if (state != UTF8_ACCEPT) { + state = UTF8_ACCEPT; + throw new CorruptedFrameException("bytes are not UTF-8"); + } + } + + public boolean process(byte value) throws Exception { + byte type = TYPES[value & 0xFF]; + + codep = state != UTF8_ACCEPT ? value & 0x3f | codep << 6 : 0xff >> type & value; + + state = STATES[state + type]; + + if (state == UTF8_REJECT) { + checking = false; + throw new CorruptedFrameException("bytes are not UTF-8"); + } + return true; + } + + public boolean isChecking() { + return checking; + } +} \ No newline at end of file diff --git a/src/main/java/com/loafle/commons/server/socket/netty/handler/codec/WebSocket13FrameDecoder.java b/src/main/java/com/loafle/commons/server/socket/netty/handler/codec/WebSocket13FrameDecoder.java new file mode 100644 index 0000000..9dab43f --- /dev/null +++ b/src/main/java/com/loafle/commons/server/socket/netty/handler/codec/WebSocket13FrameDecoder.java @@ -0,0 +1,389 @@ +package com.loafle.commons.server.socket.netty.handler.codec; + +import static io.netty.buffer.ByteBufUtil.readBytes; + +import java.nio.ByteOrder; +import java.util.List; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.handler.codec.CorruptedFrameException; +import io.netty.handler.codec.TooLongFrameException; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; + +/** + * Decodes a web socket frame from wire protocol version 8 format. This code was forked from webbit and modified. + */ +public class WebSocket13FrameDecoder extends ByteToMessageDecoder implements WebSocketFrameDecoder { + + enum State { + READING_FIRST, READING_SECOND, READING_SIZE, MASKING_KEY, PAYLOAD, CORRUPT + } + + private static final InternalLogger logger = InternalLoggerFactory.getInstance(WebSocket13FrameDecoder.class); + + private static final byte OPCODE_CONT = 0x0; + private static final byte OPCODE_TEXT = 0x1; + private static final byte OPCODE_BINARY = 0x2; + private static final byte OPCODE_CLOSE = 0x8; + private static final byte OPCODE_PING = 0x9; + private static final byte OPCODE_PONG = 0xA; + + private final long maxFramePayloadLength; + private final boolean allowExtensions; + private final boolean expectMaskedFrames; + private final boolean allowMaskMismatch; + + private int fragmentedFramesCount; + private boolean frameFinalFlag; + private boolean frameMasked; + private int frameRsv; + private int frameOpcode; + private long framePayloadLength; + private byte[] maskingKey; + private int framePayloadLen1; + private State state = State.READING_FIRST; + + /** + * Constructor + * + * @param expectMaskedFrames + * Web socket servers must set this to true processed incoming masked payload. Client implementations + * must set this to false. + * @param allowExtensions + * Flag to allow reserved extension bits to be used or not + * @param maxFramePayloadLength + * Maximum length of a frame's payload. Setting this to an appropriate value for you application + * helps check for denial of services attacks. + */ + public WebSocket13FrameDecoder(boolean expectMaskedFrames, boolean allowExtensions, int maxFramePayloadLength) { + this(expectMaskedFrames, allowExtensions, maxFramePayloadLength, false); + } + + /** + * Constructor + * + * @param expectMaskedFrames + * Web socket servers must set this to true processed incoming masked payload. Client implementations + * must set this to false. + * @param allowExtensions + * Flag to allow reserved extension bits to be used or not + * @param maxFramePayloadLength + * Maximum length of a frame's payload. Setting this to an appropriate value for you application + * helps check for denial of services attacks. + * @param allowMaskMismatch + * When set to true, frames which are not masked properly according to the standard will still be + * accepted. + */ + public WebSocket13FrameDecoder(boolean expectMaskedFrames, boolean allowExtensions, int maxFramePayloadLength, + boolean allowMaskMismatch) { + this.expectMaskedFrames = expectMaskedFrames; + this.allowMaskMismatch = allowMaskMismatch; + this.allowExtensions = allowExtensions; + this.maxFramePayloadLength = maxFramePayloadLength; + } + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + switch (state) { + case READING_FIRST: + if (!in.isReadable()) { + return; + } + + framePayloadLength = 0; + + // FIN, RSV, OPCODE + byte b = in.readByte(); + frameFinalFlag = (b & 0x80) != 0; + frameRsv = (b & 0x70) >> 4; + frameOpcode = b & 0x0F; + + if (logger.isDebugEnabled()) { + logger.debug("Decoding WebSocket Frame opCode={}", frameOpcode); + } + + state = State.READING_SECOND; + case READING_SECOND: + if (!in.isReadable()) { + return; + } + // MASK, PAYLOAD LEN 1 + b = in.readByte(); + frameMasked = (b & 0x80) != 0; + framePayloadLen1 = b & 0x7F; + + if (frameRsv != 0 && !allowExtensions) { + protocolViolation(ctx, "RSV != 0 and no extension negotiated, RSV:" + frameRsv); + return; + } + + if (!allowMaskMismatch && expectMaskedFrames != frameMasked) { + protocolViolation(ctx, "received a frame that is not masked as expected"); + return; + } + + if (frameOpcode > 7) { // control frame (have MSB in opcode set) + + // control frames MUST NOT be fragmented + if (!frameFinalFlag) { + protocolViolation(ctx, "fragmented control frame"); + return; + } + + // control frames MUST have payload 125 octets or less + if (framePayloadLen1 > 125) { + protocolViolation(ctx, "control frame with payload length > 125 octets"); + return; + } + + // check for reserved control frame opcodes + if (!(frameOpcode == OPCODE_CLOSE || frameOpcode == OPCODE_PING || frameOpcode == OPCODE_PONG)) { + protocolViolation(ctx, "control frame using reserved opcode " + frameOpcode); + return; + } + + // close frame : if there is a body, the first two bytes of the + // body MUST be a 2-byte unsigned integer (in network byte + // order) representing a getStatus code + if (frameOpcode == 8 && framePayloadLen1 == 1) { + protocolViolation(ctx, "received close control frame with payload len 1"); + return; + } + } else { // data frame + // check for reserved data frame opcodes + if (!(frameOpcode == OPCODE_CONT || frameOpcode == OPCODE_TEXT || frameOpcode == OPCODE_BINARY)) { + protocolViolation(ctx, "data frame using reserved opcode " + frameOpcode); + return; + } + + // check opcode vs message fragmentation state 1/2 + if (fragmentedFramesCount == 0 && frameOpcode == OPCODE_CONT) { + protocolViolation(ctx, "received continuation data frame outside fragmented message"); + return; + } + + // check opcode vs message fragmentation state 2/2 + if (fragmentedFramesCount != 0 && frameOpcode != OPCODE_CONT && frameOpcode != OPCODE_PING) { + protocolViolation(ctx, "received non-continuation data frame while inside fragmented message"); + return; + } + } + + state = State.READING_SIZE; + case READING_SIZE: + + // Read frame payload length + if (framePayloadLen1 == 126) { + if (in.readableBytes() < 2) { + return; + } + framePayloadLength = in.readUnsignedShort(); + if (framePayloadLength < 126) { + protocolViolation(ctx, "invalid data frame length (not using minimal length encoding)"); + return; + } + } else if (framePayloadLen1 == 127) { + if (in.readableBytes() < 8) { + return; + } + framePayloadLength = in.readLong(); + // TODO: check if it's bigger than 0x7FFFFFFFFFFFFFFF, Maybe + // just check if it's negative? + + if (framePayloadLength < 65536) { + protocolViolation(ctx, "invalid data frame length (not using minimal length encoding)"); + return; + } + } else { + framePayloadLength = framePayloadLen1; + } + + if (framePayloadLength > maxFramePayloadLength) { + protocolViolation(ctx, "Max frame length of " + maxFramePayloadLength + " has been exceeded."); + return; + } + + if (logger.isDebugEnabled()) { + logger.debug("Decoding WebSocket Frame length={}", framePayloadLength); + } + + state = State.MASKING_KEY; + case MASKING_KEY: + if (frameMasked) { + if (in.readableBytes() < 4) { + return; + } + if (maskingKey == null) { + maskingKey = new byte[4]; + } + in.readBytes(maskingKey); + } + state = State.PAYLOAD; + case PAYLOAD: + if (in.readableBytes() < framePayloadLength) { + return; + } + + ByteBuf payloadBuffer = null; + try { + payloadBuffer = readBytes(ctx.alloc(), in, toFrameLength(framePayloadLength)); + + // Now we have all the data, the next checkpoint must be the next + // frame + state = State.READING_FIRST; + + // Unmask data if needed + if (frameMasked) { + unmask(payloadBuffer); + } + + // Processing ping/pong/close frames because they cannot be + // fragmented + if (frameOpcode == OPCODE_PING) { + out.add(new PingWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer)); + payloadBuffer = null; + return; + } + if (frameOpcode == OPCODE_PONG) { + out.add(new PongWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer)); + payloadBuffer = null; + return; + } + if (frameOpcode == OPCODE_CLOSE) { + checkCloseFrameBody(ctx, payloadBuffer); + out.add(new CloseWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer)); + payloadBuffer = null; + return; + } + + // Processing for possible fragmented messages for text and binary + // frames + if (frameFinalFlag) { + // Final frame of the sequence. Apparently ping frames are + // allowed in the middle of a fragmented message + if (frameOpcode != OPCODE_PING) { + fragmentedFramesCount = 0; + } + } else { + // Increment counter + fragmentedFramesCount++; + } + + // Return the frame + if (frameOpcode == OPCODE_TEXT) { + out.add(new TextWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer)); + payloadBuffer = null; + return; + } else if (frameOpcode == OPCODE_BINARY) { + out.add(new BinaryWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer)); + payloadBuffer = null; + return; + } else if (frameOpcode == OPCODE_CONT) { + out.add(new ContinuationWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer)); + payloadBuffer = null; + return; + } else { + throw new UnsupportedOperationException("Cannot decode web socket frame with opcode: " + frameOpcode); + } + } finally { + if (payloadBuffer != null) { + payloadBuffer.release(); + } + } + case CORRUPT: + if (in.isReadable()) { + // If we don't keep reading Netty will throw an exception saying + // we can't return null if no bytes read and state not changed. + in.readByte(); + } + return; + default: + throw new Error("Shouldn't reach here."); + } + } + + private void unmask(ByteBuf frame) { + int i = frame.readerIndex(); + int end = frame.writerIndex(); + + ByteOrder order = frame.order(); + + // Remark: & 0xFF is necessary because Java will do signed expansion from + // byte to int which we don't want. + int intMask = ((maskingKey[0] & 0xFF) << 24) | ((maskingKey[1] & 0xFF) << 16) | ((maskingKey[2] & 0xFF) << 8) + | (maskingKey[3] & 0xFF); + + // If the byte order of our buffers it little endian we have to bring our mask + // into the same format, because getInt() and writeInt() will use a reversed byte order + if (order == ByteOrder.LITTLE_ENDIAN) { + intMask = Integer.reverseBytes(intMask); + } + + for (; i + 3 < end; i += 4) { + int unmasked = frame.getInt(i) ^ intMask; + frame.setInt(i, unmasked); + } + for (; i < end; i++) { + frame.setByte(i, frame.getByte(i) ^ maskingKey[i % 4]); + } + } + + private void protocolViolation(ChannelHandlerContext ctx, String reason) { + protocolViolation(ctx, new CorruptedFrameException(reason)); + } + + private void protocolViolation(ChannelHandlerContext ctx, CorruptedFrameException ex) { + state = State.CORRUPT; + if (ctx.channel().isActive()) { + Object closeMessage = new CloseWebSocketFrame(1002, null); + ctx.writeAndFlush(closeMessage).addListener(ChannelFutureListener.CLOSE); + } + throw ex; + } + + private static int toFrameLength(long l) { + if (l > Integer.MAX_VALUE) { + throw new TooLongFrameException("Length:" + l); + } else { + return (int) l; + } + } + + /** */ + protected void checkCloseFrameBody(ChannelHandlerContext ctx, ByteBuf buffer) { + if (buffer == null || !buffer.isReadable()) { + return; + } + if (buffer.readableBytes() == 1) { + protocolViolation(ctx, "Invalid close frame body"); + } + + // Save reader index + int idx = buffer.readerIndex(); + buffer.readerIndex(0); + + // Must have 2 byte integer within the valid range + int statusCode = buffer.readShort(); + if (statusCode >= 0 && statusCode <= 999 || statusCode >= 1004 && statusCode <= 1006 + || statusCode >= 1012 && statusCode <= 2999) { + protocolViolation(ctx, "Invalid close frame getStatus code: " + statusCode); + } + + // May have UTF-8 message + if (buffer.isReadable()) { + try { + new Utf8Validator().check(buffer); + } catch (CorruptedFrameException ex) { + protocolViolation(ctx, ex); + } + } + + // Restore reader index + buffer.readerIndex(idx); + } +} \ No newline at end of file diff --git a/src/main/java/com/loafle/commons/server/socket/netty/handler/codec/WebSocket13FrameEncoder.java b/src/main/java/com/loafle/commons/server/socket/netty/handler/codec/WebSocket13FrameEncoder.java new file mode 100644 index 0000000..8c7b50a --- /dev/null +++ b/src/main/java/com/loafle/commons/server/socket/netty/handler/codec/WebSocket13FrameEncoder.java @@ -0,0 +1,178 @@ +package com.loafle.commons.server.socket.netty.handler.codec; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageEncoder; +import io.netty.handler.codec.TooLongFrameException; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.List; + +/** + *

+ * Encodes a web socket frame into wire protocol version 8 format. This code was forked from webbit and modified. + *

+ */ +public class WebSocket13FrameEncoder extends MessageToMessageEncoder implements WebSocketFrameEncoder { + + private static final InternalLogger logger = InternalLoggerFactory.getInstance(WebSocket13FrameEncoder.class); + + private static final byte OPCODE_CONT = 0x0; + private static final byte OPCODE_TEXT = 0x1; + private static final byte OPCODE_BINARY = 0x2; + private static final byte OPCODE_CLOSE = 0x8; + private static final byte OPCODE_PING = 0x9; + private static final byte OPCODE_PONG = 0xA; + + /** + * The size threshold for gathering writes. Non-Masked messages bigger than this size will be be sent fragmented as + * a header and a content ByteBuf whereas messages smaller than the size will be merged into a single buffer and + * sent at once.
+ * Masked messages will always be sent at once. + */ + private static final int GATHERING_WRITE_THRESHOLD = 1024; + + private final boolean maskPayload; + + /** + * Constructor + * + * @param maskPayload + * Web socket clients must set this to true to mask payload. Server implementations must set this to + * false. + */ + public WebSocket13FrameEncoder(boolean maskPayload) { + this.maskPayload = maskPayload; + } + + @Override + protected void encode(ChannelHandlerContext ctx, WebSocketFrame msg, List out) throws Exception { + final ByteBuf data = msg.content(); + byte[] mask; + + byte opcode; + if (msg instanceof TextWebSocketFrame) { + opcode = OPCODE_TEXT; + } else if (msg instanceof PingWebSocketFrame) { + opcode = OPCODE_PING; + } else if (msg instanceof PongWebSocketFrame) { + opcode = OPCODE_PONG; + } else if (msg instanceof CloseWebSocketFrame) { + opcode = OPCODE_CLOSE; + } else if (msg instanceof BinaryWebSocketFrame) { + opcode = OPCODE_BINARY; + } else if (msg instanceof ContinuationWebSocketFrame) { + opcode = OPCODE_CONT; + } else { + throw new UnsupportedOperationException("Cannot encode frame of type: " + msg.getClass().getName()); + } + + int length = data.readableBytes(); + + if (logger.isDebugEnabled()) { + logger.debug("Encoding WebSocket Frame opCode=" + opcode + " length=" + length); + } + + int b0 = 0; + if (msg.isFinalFragment()) { + b0 |= 1 << 7; + } + b0 |= msg.rsv() % 8 << 4; + b0 |= opcode % 128; + + if (opcode == OPCODE_PING && length > 125) { + throw new TooLongFrameException("invalid payload for PING (payload length must be <= 125, was " + length); + } + + boolean release = true; + ByteBuf buf = null; + try { + int maskLength = maskPayload ? 4 : 0; + if (length <= 125) { + int size = 2 + maskLength; + if (maskPayload || length <= GATHERING_WRITE_THRESHOLD) { + size += length; + } + buf = ctx.alloc().buffer(size); + buf.writeByte(b0); + byte b = (byte) (maskPayload ? 0x80 | (byte) length : (byte) length); + buf.writeByte(b); + } else if (length <= 0xFFFF) { + int size = 4 + maskLength; + if (maskPayload || length <= GATHERING_WRITE_THRESHOLD) { + size += length; + } + buf = ctx.alloc().buffer(size); + buf.writeByte(b0); + buf.writeByte(maskPayload ? 0xFE : 126); + buf.writeByte(length >>> 8 & 0xFF); + buf.writeByte(length & 0xFF); + } else { + int size = 10 + maskLength; + if (maskPayload || length <= GATHERING_WRITE_THRESHOLD) { + size += length; + } + buf = ctx.alloc().buffer(size); + buf.writeByte(b0); + buf.writeByte(maskPayload ? 0xFF : 127); + buf.writeLong(length); + } + + // Write payload + if (maskPayload) { + int random = (int) (Math.random() * Integer.MAX_VALUE); + mask = ByteBuffer.allocate(4).putInt(random).array(); + buf.writeBytes(mask); + + ByteOrder srcOrder = data.order(); + ByteOrder dstOrder = buf.order(); + + int counter = 0; + int i = data.readerIndex(); + int end = data.writerIndex(); + + if (srcOrder == dstOrder) { + // Use the optimized path only when byte orders match + // Remark: & 0xFF is necessary because Java will do signed expansion from + // byte to int which we don't want. + int intMask = ((mask[0] & 0xFF) << 24) | ((mask[1] & 0xFF) << 16) | ((mask[2] & 0xFF) << 8) + | (mask[3] & 0xFF); + + // If the byte order of our buffers it little endian we have to bring our mask + // into the same format, because getInt() and writeInt() will use a reversed byte order + if (srcOrder == ByteOrder.LITTLE_ENDIAN) { + intMask = Integer.reverseBytes(intMask); + } + + for (; i + 3 < end; i += 4) { + int intData = data.getInt(i); + buf.writeInt(intData ^ intMask); + } + } + for (; i < end; i++) { + byte byteData = data.getByte(i); + buf.writeByte(byteData ^ mask[counter++ % 4]); + } + out.add(buf); + } else { + if (buf.writableBytes() >= data.readableBytes()) { + // merge buffers as this is cheaper then a gathering write if the payload is small enough + buf.writeBytes(data); + out.add(buf); + } else { + out.add(buf); + out.add(data.retain()); + } + } + release = false; + } finally { + if (release && buf != null) { + buf.release(); + } + } + } +} \ No newline at end of file diff --git a/src/main/java/com/loafle/commons/server/socket/netty/handler/codec/WebSocketFrame.java b/src/main/java/com/loafle/commons/server/socket/netty/handler/codec/WebSocketFrame.java new file mode 100644 index 0000000..47060a0 --- /dev/null +++ b/src/main/java/com/loafle/commons/server/socket/netty/handler/codec/WebSocketFrame.java @@ -0,0 +1,94 @@ +package com.loafle.commons.server.socket.netty.handler.codec; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.DefaultByteBufHolder; +import io.netty.util.internal.StringUtil; + +/** + * Base class for web socket frames + */ +public abstract class WebSocketFrame extends DefaultByteBufHolder { + + /** + * Flag to indicate if this frame is the final fragment in a message. The first fragment (frame) may also be the + * final fragment. + */ + private final boolean finalFragment; + + /** + * RSV1, RSV2, RSV3 used for extensions + */ + private final int rsv; + + protected WebSocketFrame(ByteBuf binaryData) { + this(true, 0, binaryData); + } + + protected WebSocketFrame(boolean finalFragment, int rsv, ByteBuf binaryData) { + super(binaryData); + this.finalFragment = finalFragment; + this.rsv = rsv; + } + + /** + * Flag to indicate if this frame is the final fragment in a message. The first fragment (frame) may also be the + * final fragment. + */ + public boolean isFinalFragment() { + return finalFragment; + } + + /** + * Bits used for extensions to the standard. + */ + public int rsv() { + return rsv; + } + + @Override + public WebSocketFrame copy() { + return (WebSocketFrame) super.copy(); + } + + @Override + public WebSocketFrame duplicate() { + return (WebSocketFrame) super.duplicate(); + } + + @Override + public WebSocketFrame retainedDuplicate() { + return (WebSocketFrame) super.retainedDuplicate(); + } + + @Override + public abstract WebSocketFrame replace(ByteBuf content); + + @Override + public String toString() { + return StringUtil.simpleClassName(this) + "(data: " + contentToString() + ')'; + } + + @Override + public WebSocketFrame retain() { + super.retain(); + return this; + } + + @Override + public WebSocketFrame retain(int increment) { + super.retain(increment); + return this; + } + + @Override + public WebSocketFrame touch() { + super.touch(); + return this; + } + + @Override + public WebSocketFrame touch(Object hint) { + super.touch(hint); + return this; + } +} \ No newline at end of file diff --git a/src/main/java/com/loafle/commons/server/socket/netty/handler/codec/WebSocketFrameDecoder.java b/src/main/java/com/loafle/commons/server/socket/netty/handler/codec/WebSocketFrameDecoder.java new file mode 100644 index 0000000..641b3d5 --- /dev/null +++ b/src/main/java/com/loafle/commons/server/socket/netty/handler/codec/WebSocketFrameDecoder.java @@ -0,0 +1,12 @@ +package com.loafle.commons.server.socket.netty.handler.codec; + +import io.netty.channel.ChannelInboundHandler; +import io.netty.channel.ChannelPipeline; + +/** + * Marker interface which all WebSocketFrame decoders need to implement. + * + * This makes it easier to access the added encoder later in the {@link ChannelPipeline}. + */ +public interface WebSocketFrameDecoder extends ChannelInboundHandler { +} \ No newline at end of file diff --git a/src/main/java/com/loafle/commons/server/socket/netty/handler/codec/WebSocketFrameEncoder.java b/src/main/java/com/loafle/commons/server/socket/netty/handler/codec/WebSocketFrameEncoder.java new file mode 100644 index 0000000..025d7d3 --- /dev/null +++ b/src/main/java/com/loafle/commons/server/socket/netty/handler/codec/WebSocketFrameEncoder.java @@ -0,0 +1,12 @@ +package com.loafle.commons.server.socket.netty.handler.codec; + +import io.netty.channel.ChannelOutboundHandler; +import io.netty.channel.ChannelPipeline; + +/** + * Marker interface which all WebSocketFrame encoders need to implement. + * + * This makes it easier to access the added encoder later in the {@link ChannelPipeline}. + */ +public interface WebSocketFrameEncoder extends ChannelOutboundHandler { +} \ No newline at end of file diff --git a/src/main/java/com/loafle/commons/server/socket/netty/handler/codec/WebSocketProtocolHandler.java b/src/main/java/com/loafle/commons/server/socket/netty/handler/codec/WebSocketProtocolHandler.java new file mode 100644 index 0000000..d419fcb --- /dev/null +++ b/src/main/java/com/loafle/commons/server/socket/netty/handler/codec/WebSocketProtocolHandler.java @@ -0,0 +1,29 @@ +package com.loafle.commons.server.socket.netty.handler.codec; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageDecoder; + +import java.util.List; + +abstract class WebSocketProtocolHandler extends MessageToMessageDecoder { + @Override + protected void decode(ChannelHandlerContext ctx, WebSocketFrame frame, List out) throws Exception { + if (frame instanceof PingWebSocketFrame) { + frame.content().retain(); + ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content())); + return; + } + if (frame instanceof PongWebSocketFrame) { + // Pong frames need to get ignored + return; + } + + out.add(frame.retain()); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + ctx.fireExceptionCaught(cause); + ctx.close(); + } +} \ No newline at end of file diff --git a/src/main/java/com/loafle/commons/server/socket/netty/handler/codec/WebSocketServerProtocolHandler.java b/src/main/java/com/loafle/commons/server/socket/netty/handler/codec/WebSocketServerProtocolHandler.java new file mode 100644 index 0000000..f62688d --- /dev/null +++ b/src/main/java/com/loafle/commons/server/socket/netty/handler/codec/WebSocketServerProtocolHandler.java @@ -0,0 +1,79 @@ +package com.loafle.commons.server.socket.netty.handler.codec; + +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandler; +import io.netty.channel.ChannelPipeline; + +import java.util.List; + +/** + * This handler does all the heavy lifting for you to run a websocket server. + * + * It takes care of websocket handshaking as well as processing of control frames (Close, Ping, Pong). Text and Binary + * data frames are passed to the next handler in the pipeline (implemented by you) for processing. + * + * See io.netty.example.http.websocketx.html5.WebSocketServer for usage. + * + * The implementation of this handler assumes that you just want to run a websocket server and not process other types + * HTTP requests (like GET and POST). If you wish to support both HTTP requests and websockets in the one server, refer + * to the io.netty.example.http.websocketx.server.WebSocketServer example. + * + * To know once a handshake was done you can intercept the + * {@link ChannelInboundHandler#userEventTriggered(ChannelHandlerContext, Object)} and check if the event was instance + * of {@link HandshakeComplete}, the event will contain extra information about the handshake such as the request and + * selected subprotocol. + */ +public class WebSocketServerProtocolHandler extends WebSocketProtocolHandler { + + private final boolean allowExtensions; + private final int maxFramePayloadLength; + private final boolean allowMaskMismatch; + + public WebSocketServerProtocolHandler(boolean allowExtensions) { + this(allowExtensions, 65536); + } + + public WebSocketServerProtocolHandler(boolean allowExtensions, int maxFrameSize) { + this(allowExtensions, maxFrameSize, false); + } + + public WebSocketServerProtocolHandler(boolean allowExtensions, int maxFrameSize, boolean allowMaskMismatch) { + this.allowExtensions = allowExtensions; + maxFramePayloadLength = maxFrameSize; + this.allowMaskMismatch = allowMaskMismatch; + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) { + ChannelPipeline cp = ctx.pipeline(); + + WebSocketFrameDecoder decoder = new WebSocket13FrameDecoder(true, allowExtensions, maxFramePayloadLength, + allowMaskMismatch); + WebSocketFrameEncoder encoder = new WebSocket13FrameEncoder(false); + + cp.addBefore(ctx.name(), WebSocketFrameDecoder.class.getName(), decoder); + cp.addBefore(ctx.name(), WebSocketFrameEncoder.class.getName(), encoder); + + if (cp.get(Utf8FrameValidator.class) == null) { + // Add the UFT8 checking before this one. + ctx.pipeline().addBefore(ctx.name(), Utf8FrameValidator.class.getName(), new Utf8FrameValidator()); + } + } + + @Override + protected void decode(ChannelHandlerContext ctx, WebSocketFrame frame, List out) throws Exception { + if (frame instanceof CloseWebSocketFrame) { + ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); + return; + } + super.decode(ctx, frame, out); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + ctx.fireExceptionCaught(cause); + ctx.close(); + } +} \ No newline at end of file diff --git a/src/main/resources/_ b/src/main/resources/_ new file mode 100644 index 0000000..e69de29 diff --git a/src/test/resources/logback.xml b/src/test/resources/logback.xml new file mode 100644 index 0000000..575281e --- /dev/null +++ b/src/test/resources/logback.xml @@ -0,0 +1,17 @@ + + + commons_java + + + + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{32} - %msg%n + + + + + + + + + \ No newline at end of file