From b7ec96bbe3463700b11f79f73ec40409a2087435 Mon Sep 17 00:00:00 2001 From: crusader Date: Mon, 2 Jul 2018 15:30:47 +0900 Subject: [PATCH] ing --- package-lock.json | 37 ++++++-- package.json | 2 + .../loafer/ng-rpc/src/lib/client/RPCClient.ts | 51 ++++++----- .../ng-rpc/src/lib/client/RPCClientRWC.ts | 8 +- .../rwc/websocket/RPCClientWebsocketRWC.ts | 18 ++-- .../rwc/websocket/RxWebsocketSubject.ts | 85 +++++++++++++------ projects/loafer/ng-rpc/src/lib/codec/codec.ts | 33 +++++++ .../ng-rpc/src/lib/codec/compression_codec.ts | 38 +++++++++ projects/loafer/ng-rpc/src/lib/core/type.ts | 1 + .../ng-rpc/src/lib/protocol/RPCClientCodec.ts | 5 +- .../lib/protocol/json/JSONRPCClientCodec.ts | 49 ++++++----- 11 files changed, 236 insertions(+), 91 deletions(-) create mode 100644 projects/loafer/ng-rpc/src/lib/codec/codec.ts create mode 100644 projects/loafer/ng-rpc/src/lib/codec/compression_codec.ts create mode 100644 projects/loafer/ng-rpc/src/lib/core/type.ts diff --git a/package-lock.json b/package-lock.json index 42c7c4f..3f5e53d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -478,6 +478,12 @@ "integrity": "sha512-jRHfWsvyMtXdbhnz5CVHxaBgnV6duZnPlQuRSo/dm/GnmikNcmZhxIES4E9OZjUmQ8C+HCl4KJux+cXN/ErGDQ==", "dev": true }, + "@types/pako": { + "version": "1.0.0", + "resolved": "https://nexus.loafle.net/repository/npm-all/@types/pako/-/pako-1.0.0.tgz", + "integrity": "sha1-6q6DZNG391LiY7w/1o3+yY5hNsU=", + "dev": true + }, "@types/q": { "version": "0.0.32", "resolved": "https://nexus.loafle.net/repository/npm-all/@types/q/-/q-0.0.32.tgz", @@ -4060,12 +4066,14 @@ "balanced-match": { "version": "1.0.0", "bundled": true, - "dev": true + "dev": true, + "optional": true }, "brace-expansion": { "version": "1.1.11", "bundled": true, "dev": true, + "optional": true, "requires": { "balanced-match": "^1.0.0", "concat-map": "0.0.1" @@ -4080,17 +4088,20 @@ "code-point-at": { "version": "1.1.0", "bundled": true, - "dev": true + "dev": true, + "optional": true }, "concat-map": { "version": "0.0.1", "bundled": true, - "dev": true + "dev": true, + "optional": true }, "console-control-strings": { "version": "1.1.0", "bundled": true, - "dev": true + "dev": true, + "optional": true }, "core-util-is": { "version": "1.0.2", @@ -4207,7 +4218,8 @@ "inherits": { "version": "2.0.3", "bundled": true, - "dev": true + "dev": true, + "optional": true }, "ini": { "version": "1.3.5", @@ -4219,6 +4231,7 @@ "version": "1.0.0", "bundled": true, "dev": true, + "optional": true, "requires": { "number-is-nan": "^1.0.0" } @@ -4233,6 +4246,7 @@ "version": "3.0.4", "bundled": true, "dev": true, + "optional": true, "requires": { "brace-expansion": "^1.1.7" } @@ -4240,12 +4254,14 @@ "minimist": { "version": "0.0.8", "bundled": true, - "dev": true + "dev": true, + "optional": true }, "minipass": { "version": "2.2.4", "bundled": true, "dev": true, + "optional": true, "requires": { "safe-buffer": "^5.1.1", "yallist": "^3.0.0" @@ -4264,6 +4280,7 @@ "version": "0.5.1", "bundled": true, "dev": true, + "optional": true, "requires": { "minimist": "0.0.8" } @@ -4344,7 +4361,8 @@ "number-is-nan": { "version": "1.0.1", "bundled": true, - "dev": true + "dev": true, + "optional": true }, "object-assign": { "version": "4.1.1", @@ -4356,6 +4374,7 @@ "version": "1.4.0", "bundled": true, "dev": true, + "optional": true, "requires": { "wrappy": "1" } @@ -4477,6 +4496,7 @@ "version": "1.0.2", "bundled": true, "dev": true, + "optional": true, "requires": { "code-point-at": "^1.0.0", "is-fullwidth-code-point": "^1.0.0", @@ -7899,8 +7919,7 @@ "pako": { "version": "1.0.6", "resolved": "https://nexus.loafle.net/repository/npm-all/pako/-/pako-1.0.6.tgz", - "integrity": "sha512-lQe48YPsMJAig+yngZ87Lus+NF+3mtu7DVOBu6b/gHO1YpKwIj5AWjZ/TOS7i46HD/UixzWb1zeWDZfGZ3iYcg==", - "dev": true + "integrity": "sha512-lQe48YPsMJAig+yngZ87Lus+NF+3mtu7DVOBu6b/gHO1YpKwIj5AWjZ/TOS7i46HD/UixzWb1zeWDZfGZ3iYcg==" }, "parallel-transform": { "version": "1.1.0", diff --git a/package.json b/package.json index 9c04d13..56c8369 100644 --- a/package.json +++ b/package.json @@ -24,6 +24,7 @@ "@loafer/decorator": "^0.0.1", "@ngrx/store": "^5.2.0", "core-js": "^2.5.4", + "pako": "^1.0.6", "rxjs": "^6.0.0", "zone.js": "^0.8.26" }, @@ -40,6 +41,7 @@ "@types/jasmine": "~2.8.6", "@types/jasminewd2": "~2.0.3", "@types/node": "~8.9.4", + "@types/pako": "^1.0.0", "codelyzer": "~4.2.1", "jasmine-core": "~2.99.1", "jasmine-spec-reporter": "~4.2.1", diff --git a/projects/loafer/ng-rpc/src/lib/client/RPCClient.ts b/projects/loafer/ng-rpc/src/lib/client/RPCClient.ts index 2ceb174..a10ac35 100644 --- a/projects/loafer/ng-rpc/src/lib/client/RPCClient.ts +++ b/projects/loafer/ng-rpc/src/lib/client/RPCClient.ts @@ -8,6 +8,7 @@ import { RPCClientResponseCodec, RPCClientNotificationCodec, } from '../protocol/RPCClientCodec'; +import { RPCMessage } from '../core/type'; export interface RPCRequestState { subject: Subject; @@ -17,32 +18,37 @@ export interface RPCRequestState { }; } -export abstract class RPCClient { - private _requestID: number; +export abstract class RPCClient { + private requestID: number; - private _pendingRequestsCount: number; - private _pendingRequests: Map; + private pendingRequestsCount: number; + private pendingRequests: Map; + + protected rpcClientCodec: RPCClientCodec; + protected rpcClientRWC: RPCClientRWC; public constructor( - private _codec: RPCClientCodec, - private _rwc: RPCClientRWC, + rpcClientCodec: RPCClientCodec, + rpcClientRWC: RPCClientRWC, ) { - this._requestID = 0; - this._pendingRequestsCount = 0; - this._pendingRequests = new Map(); + this.rpcClientCodec = rpcClientCodec; + this.rpcClientRWC = rpcClientRWC; + this.requestID = 0; + this.pendingRequestsCount = 0; + this.pendingRequests = new Map(); } private getRequestID(): number { - return ++this._requestID; + return ++this.requestID; } /** * connect */ public connect(queryString?: string): void { - this._rwc.connect(queryString); - this._rwc.readResponse().subscribe( - (value: T) => { + this.rpcClientRWC.connect(queryString); + this.rpcClientRWC.readResponse().subscribe( + (value: RPCMessage) => { this.onMessage(value); }, (error: any) => { @@ -58,7 +64,7 @@ export abstract class RPCClient { * close */ public disconnect() { - this._rwc.disconnect(); + this.rpcClientRWC.disconnect(); } /** @@ -96,12 +102,11 @@ export abstract class RPCClient { params: args, } }; - this._pendingRequests.set(id, reqState); - this._pendingRequestsCount++; + this.pendingRequests.set(id, reqState); + this.pendingRequestsCount++; } - const req = this._codec.request(method, args, id); - this._rwc.writeRequest(req); + this.rpcClientRWC.writeRequest(this.rpcClientCodec.request(method, args, id)); if (undefined !== resSubject) { return resSubject.asObservable(); @@ -109,8 +114,8 @@ export abstract class RPCClient { return undefined; } - private onMessage(message: Object): void { - const resCodec = this._codec.response(message); + private onMessage(message: RPCMessage): void { + const resCodec = this.rpcClientCodec.response(message); if (resCodec.isNotification()) { this.onNotification(resCodec.notification()); @@ -124,10 +129,10 @@ export abstract class RPCClient { const result = resCodec.result(); const error = resCodec.error(); - const reqState: RPCRequestState = this._pendingRequests.get(id); + const reqState: RPCRequestState = this.pendingRequests.get(id); - this._pendingRequests.delete(id); - this._pendingRequestsCount--; + this.pendingRequests.delete(id); + this.pendingRequestsCount--; if (undefined !== error) { const rpcClientError: RPCClientError = { diff --git a/projects/loafer/ng-rpc/src/lib/client/RPCClientRWC.ts b/projects/loafer/ng-rpc/src/lib/client/RPCClientRWC.ts index 29b3466..6116b0f 100644 --- a/projects/loafer/ng-rpc/src/lib/client/RPCClientRWC.ts +++ b/projects/loafer/ng-rpc/src/lib/client/RPCClientRWC.ts @@ -1,9 +1,11 @@ import { Observable } from 'rxjs'; +import { RPCMessage } from '../core/type'; -export interface RPCClientRWC { + +export interface RPCClientRWC { connect(queryString?: string): void; - readResponse(): Observable; - writeRequest(data: any): void; + readResponse(): Observable; + writeRequest(data: RPCMessage): void; disconnect(): void; connectionStatus(): Observable; } diff --git a/projects/loafer/ng-rpc/src/lib/client/rwc/websocket/RPCClientWebsocketRWC.ts b/projects/loafer/ng-rpc/src/lib/client/rwc/websocket/RPCClientWebsocketRWC.ts index 213d899..6da8d8e 100644 --- a/projects/loafer/ng-rpc/src/lib/client/rwc/websocket/RPCClientWebsocketRWC.ts +++ b/projects/loafer/ng-rpc/src/lib/client/rwc/websocket/RPCClientWebsocketRWC.ts @@ -6,15 +6,17 @@ import { } from './RxWebsocketSubject'; import { RPCClientRWC } from '../../RPCClientRWC'; +import { RPCMessage } from '../../../core/type'; -export class RPCClientWebsocketRWC implements RPCClientRWC { - private _wsSocketSubject: RxWebsocketSubject; - private _responseSubject: Subject; + +export class RPCClientWebsocketRWC implements RPCClientRWC { + private _wsSocketSubject: RxWebsocketSubject; + private _responseSubject: Subject; public constructor( private _config: RxWebsocketSubjectConfig, ) { - this._wsSocketSubject = new RxWebsocketSubject(this._config); + this._wsSocketSubject = new RxWebsocketSubject(this._config); } public connect(queryString?: string): void { @@ -23,7 +25,7 @@ export class RPCClientWebsocketRWC implements RPCClientRWC { } this._wsSocketSubject.connect(); this._wsSocketSubject.subscribe( - (value: T) => { + (value: RPCMessage) => { if (undefined !== this._responseSubject) { this._responseSubject.next(value); } @@ -47,14 +49,14 @@ export class RPCClientWebsocketRWC implements RPCClientRWC { return this._wsSocketSubject.connectionStatus; } - public readResponse(): Observable { + public readResponse(): Observable { if (undefined === this._responseSubject) { - this._responseSubject = new Subject(); + this._responseSubject = new Subject(); } return this._responseSubject.asObservable(); } - public writeRequest(data: any): void { + public writeRequest(data: RPCMessage): void { this._wsSocketSubject.write(data); } } diff --git a/projects/loafer/ng-rpc/src/lib/client/rwc/websocket/RxWebsocketSubject.ts b/projects/loafer/ng-rpc/src/lib/client/rwc/websocket/RxWebsocketSubject.ts index ab59088..1301f9c 100644 --- a/projects/loafer/ng-rpc/src/lib/client/rwc/websocket/RxWebsocketSubject.ts +++ b/projects/loafer/ng-rpc/src/lib/client/rwc/websocket/RxWebsocketSubject.ts @@ -1,18 +1,38 @@ import { Observable, Observer, Subject, interval } from 'rxjs'; import { share, distinctUntilChanged, takeWhile } from 'rxjs/operators'; import { WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/webSocket'; +import { RPCMessage } from '../../../core/type'; -export interface RxWebsocketSubjectConfig { - url: string; - protocol?: string | Array; +export interface RxWebsocketSubjectConfig extends WebSocketSubjectConfig { reconnectInterval?: 5000; reconnectRetry?: 10; + compressionThreshold?: 1024; } -export class RxWebsocketSubject extends Subject { +const DEFAULT_RX_WEBSOCKET_CONFIG: RxWebsocketSubjectConfig = { + url: '', + deserializer: (e: MessageEvent) => e, + serializer: (value: any) => value, + openObserver: { + next: (e: Event) => { + this._connectionObserver.next(true); + } + }, + closeObserver: { + next: (e: CloseEvent) => { + this._socket = null; + this._connectionObserver.next(false); + } + }, + reconnectInterval: 5000, + reconnectRetry: 10, + compressionThreshold: 1024, +}; + +export class RxWebsocketSubject extends Subject { private _reconnectionObservable: Observable; - private _wsSubjectConfig: WebSocketSubjectConfig; - private _socket: WebSocketSubject; + private _wsSubjectConfig: RxWebsocketSubjectConfig; + private _socket: WebSocketSubject; private _connectionObserver: Observer; private _connectionStatus: Observable; private _queryString: string; @@ -27,22 +47,37 @@ export class RxWebsocketSubject extends Subject { distinctUntilChanged(), ); - this._wsSubjectConfig = { - url: _config.url, - protocol: _config.protocol, - serializer: (value: any) => value, - closeObserver: { - next: (e: CloseEvent) => { - this._socket = null; - this._connectionObserver.next(false); + this._wsSubjectConfig = this._config = { ...DEFAULT_RX_WEBSOCKET_CONFIG }; + for (const key in _config) { + if (_config.hasOwnProperty(key)) { + switch (key) { + case 'openObserver': + const oo = _config[key]; + this._wsSubjectConfig[key] = { + next: (e: Event) => { + this._connectionObserver.next(true); + oo.next(e); + } + }; + + break; + case 'closeObserver': + const co = _config[key]; + this._wsSubjectConfig[key] = { + next: (e: CloseEvent) => { + this._socket = null; + this._connectionObserver.next(false); + co.next(e); + } + }; + + break; + default: + this._wsSubjectConfig[key] = _config[key]; + break; } - }, - openObserver: { - next: (e: Event) => { - this._connectionObserver.next(true); - } - }, - }; + } + } this._connectionStatus.subscribe((isConnected: boolean) => { if (!this._reconnectionObservable && typeof(isConnected) === 'boolean' && !isConnected) { @@ -69,12 +104,12 @@ export class RxWebsocketSubject extends Subject { wsSubjectConfig.url = wsSubjectConfig.url + '?' + this._queryString; } - this._socket = new WebSocketSubject(wsSubjectConfig); + this._socket = new WebSocketSubject(wsSubjectConfig); this._socket.subscribe( - (m) => { + (m: RPCMessage) => { this.next(m); }, - (error: Event) => { + (e: Event) => { if (!this._socket) { this.reconnect(); } @@ -107,7 +142,7 @@ export class RxWebsocketSubject extends Subject { ); } - public write(data: any): void { + public write(data: RPCMessage): void { this._socket.next(data); } } diff --git a/projects/loafer/ng-rpc/src/lib/codec/codec.ts b/projects/loafer/ng-rpc/src/lib/codec/codec.ts new file mode 100644 index 0000000..eea52e6 --- /dev/null +++ b/projects/loafer/ng-rpc/src/lib/codec/codec.ts @@ -0,0 +1,33 @@ +import { RPCMessage } from '../core/type'; + +export interface Codec { + encode(message: string): RPCMessage; + decode(message: RPCMessage): string; +} + +export class DefaultCodec implements Codec { + public encode(message: string): RPCMessage { + return message; + } + public decode(message: RPCMessage): string { + return message; + } +} + +export const defaultCodec = new DefaultCodec(); + +export interface CodecSelector { + encode(message: string): RPCMessage; + decode(message: RPCMessage): string; +} + +export class DefaultCodecSelector implements CodecSelector { + public encode(message: string): RPCMessage { + return defaultCodec.encode(message); + } + public decode(message: RPCMessage): string { + return defaultCodec.decode(message); + } +} + +export const defaultCodecSelector = new DefaultCodecSelector(); diff --git a/projects/loafer/ng-rpc/src/lib/codec/compression_codec.ts b/projects/loafer/ng-rpc/src/lib/codec/compression_codec.ts new file mode 100644 index 0000000..ab1b238 --- /dev/null +++ b/projects/loafer/ng-rpc/src/lib/codec/compression_codec.ts @@ -0,0 +1,38 @@ +import { Codec, CodecSelector, defaultCodec } from './codec'; +import { RPCMessage } from '../core/type'; +import { gzip, ungzip } from 'pako'; + +export class GZipCodec implements Codec { + public encode(message: string): RPCMessage { + return gzip(message).buffer; + } + public decode(message: RPCMessage): string { + return ungzip(Buffer.from(message), {to: 'string'}); + } +} + +export const gZipCodec = new GZipCodec(); + +export class CompressionCodecSelector implements CodecSelector { + private readonly threshold: number; + + public constructor(threshold: number) { + this.threshold = threshold; + } + + public encode(message: string): RPCMessage { + if (this.threshold < Buffer.byteLength(message)) { + return gZipCodec.encode(message); + } else { + return defaultCodec.encode(message); + } + } + + public decode(message: RPCMessage): string { + if (message instanceof ArrayBuffer) { + return gZipCodec.decode(message); + } else { + return defaultCodec.decode(message); + } + } +} diff --git a/projects/loafer/ng-rpc/src/lib/core/type.ts b/projects/loafer/ng-rpc/src/lib/core/type.ts new file mode 100644 index 0000000..6e8903c --- /dev/null +++ b/projects/loafer/ng-rpc/src/lib/core/type.ts @@ -0,0 +1 @@ +export type RPCMessage = string | ArrayBuffer; diff --git a/projects/loafer/ng-rpc/src/lib/protocol/RPCClientCodec.ts b/projects/loafer/ng-rpc/src/lib/protocol/RPCClientCodec.ts index 94af99e..a4705a4 100644 --- a/projects/loafer/ng-rpc/src/lib/protocol/RPCClientCodec.ts +++ b/projects/loafer/ng-rpc/src/lib/protocol/RPCClientCodec.ts @@ -1,8 +1,9 @@ import { RPCError } from './RPCError'; +import { RPCMessage } from '../core/type'; export interface RPCClientCodec { - request(method: string, args: any[], id: number): any; - response(res: any): RPCClientResponseCodec; + request(method: string, args: any[], id: number): RPCMessage; + response(message: RPCMessage): RPCClientResponseCodec; } export interface RPCClientResponseCodec { diff --git a/projects/loafer/ng-rpc/src/lib/protocol/json/JSONRPCClientCodec.ts b/projects/loafer/ng-rpc/src/lib/protocol/json/JSONRPCClientCodec.ts index a18971d..04f65f9 100644 --- a/projects/loafer/ng-rpc/src/lib/protocol/json/JSONRPCClientCodec.ts +++ b/projects/loafer/ng-rpc/src/lib/protocol/json/JSONRPCClientCodec.ts @@ -7,6 +7,8 @@ import { import { RPCError, } from '../RPCError'; +import { RPCMessage } from '../../core/type'; +import { CodecSelector, defaultCodecSelector } from '../../codec/codec'; export interface ClientNotification { method: string; @@ -28,7 +30,13 @@ export interface ClientResponse { } export class JSONRPCClientCodec implements RPCClientCodec { - public request(method: string, args: any[], id?: number): any { + private readonly codecSelector: CodecSelector; + + public constructor(codecSelector: CodecSelector = defaultCodecSelector) { + this.codecSelector = codecSelector; + } + + public request(method: string, args: any[], id?: number): RPCMessage { const params = convertParamsToStringArray(args); const req: ClientRequest = { @@ -37,16 +45,12 @@ export class JSONRPCClientCodec implements RPCClientCodec { params: 0 === params.length ? null : params, id: id, }; - return JSON.stringify(req); + + return this.codecSelector.encode(JSON.stringify(req)); } - public response(res: any): RPCClientResponseCodec { - const _res: ClientResponse = { - id: res.id, - jsonrpc: res.jsonrpc, - result: res.result, - error: res.error, - }; - return new JSONRPCClientResponseCodec(_res); + + public response(message: RPCMessage): RPCClientResponseCodec { + return new JSONRPCClientResponseCodec(this.codecSelector.decode(message)); } } @@ -80,17 +84,20 @@ function convertParamsToStringArray(args: any[]): string[] | undefined { } export class JSONRPCClientResponseCodec implements RPCClientResponseCodec { - public constructor(private _res: ClientResponse) { + private res: ClientResponse; + + public constructor(json: string) { + this.res = JSON.parse(json); } public id(): number | undefined { - return this._res.id; + return this.res.id; } public error(): RPCError | undefined { - return this._res.error; + return this.res.error; } public result(): any | undefined { - return this._res.result; + return this.res.result; } public isNotification(): boolean { @@ -104,22 +111,22 @@ export class JSONRPCClientResponseCodec implements RPCClientResponseCodec { if (undefined !== this.id() || undefined === this.result()) { return undefined; } - const _noti: ClientNotification = { - method: this._res.result.method, - params: this._res.result.params, + const noti: ClientNotification = { + method: this.res.result.method, + params: this.res.result.params, }; - return new JSONRPCClientNotificationCodec(_noti); + return new JSONRPCClientNotificationCodec(noti); } } export class JSONRPCClientNotificationCodec implements RPCClientNotificationCodec { - public constructor(private _noti: ClientNotification) { + public constructor(private noti: ClientNotification) { } public method(): string { - return this._noti.method; + return this.noti.method; } public params(): any[] | undefined { - return this._noti.params; + return this.noti.params; } }