This commit is contained in:
crusader 2018-07-02 15:30:47 +09:00
parent abba37caae
commit b7ec96bbe3
11 changed files with 236 additions and 91 deletions

37
package-lock.json generated
View File

@ -478,6 +478,12 @@
"integrity": "sha512-jRHfWsvyMtXdbhnz5CVHxaBgnV6duZnPlQuRSo/dm/GnmikNcmZhxIES4E9OZjUmQ8C+HCl4KJux+cXN/ErGDQ==",
"dev": true
},
"@types/pako": {
"version": "1.0.0",
"resolved": "https://nexus.loafle.net/repository/npm-all/@types/pako/-/pako-1.0.0.tgz",
"integrity": "sha1-6q6DZNG391LiY7w/1o3+yY5hNsU=",
"dev": true
},
"@types/q": {
"version": "0.0.32",
"resolved": "https://nexus.loafle.net/repository/npm-all/@types/q/-/q-0.0.32.tgz",
@ -4060,12 +4066,14 @@
"balanced-match": {
"version": "1.0.0",
"bundled": true,
"dev": true
"dev": true,
"optional": true
},
"brace-expansion": {
"version": "1.1.11",
"bundled": true,
"dev": true,
"optional": true,
"requires": {
"balanced-match": "^1.0.0",
"concat-map": "0.0.1"
@ -4080,17 +4088,20 @@
"code-point-at": {
"version": "1.1.0",
"bundled": true,
"dev": true
"dev": true,
"optional": true
},
"concat-map": {
"version": "0.0.1",
"bundled": true,
"dev": true
"dev": true,
"optional": true
},
"console-control-strings": {
"version": "1.1.0",
"bundled": true,
"dev": true
"dev": true,
"optional": true
},
"core-util-is": {
"version": "1.0.2",
@ -4207,7 +4218,8 @@
"inherits": {
"version": "2.0.3",
"bundled": true,
"dev": true
"dev": true,
"optional": true
},
"ini": {
"version": "1.3.5",
@ -4219,6 +4231,7 @@
"version": "1.0.0",
"bundled": true,
"dev": true,
"optional": true,
"requires": {
"number-is-nan": "^1.0.0"
}
@ -4233,6 +4246,7 @@
"version": "3.0.4",
"bundled": true,
"dev": true,
"optional": true,
"requires": {
"brace-expansion": "^1.1.7"
}
@ -4240,12 +4254,14 @@
"minimist": {
"version": "0.0.8",
"bundled": true,
"dev": true
"dev": true,
"optional": true
},
"minipass": {
"version": "2.2.4",
"bundled": true,
"dev": true,
"optional": true,
"requires": {
"safe-buffer": "^5.1.1",
"yallist": "^3.0.0"
@ -4264,6 +4280,7 @@
"version": "0.5.1",
"bundled": true,
"dev": true,
"optional": true,
"requires": {
"minimist": "0.0.8"
}
@ -4344,7 +4361,8 @@
"number-is-nan": {
"version": "1.0.1",
"bundled": true,
"dev": true
"dev": true,
"optional": true
},
"object-assign": {
"version": "4.1.1",
@ -4356,6 +4374,7 @@
"version": "1.4.0",
"bundled": true,
"dev": true,
"optional": true,
"requires": {
"wrappy": "1"
}
@ -4477,6 +4496,7 @@
"version": "1.0.2",
"bundled": true,
"dev": true,
"optional": true,
"requires": {
"code-point-at": "^1.0.0",
"is-fullwidth-code-point": "^1.0.0",
@ -7899,8 +7919,7 @@
"pako": {
"version": "1.0.6",
"resolved": "https://nexus.loafle.net/repository/npm-all/pako/-/pako-1.0.6.tgz",
"integrity": "sha512-lQe48YPsMJAig+yngZ87Lus+NF+3mtu7DVOBu6b/gHO1YpKwIj5AWjZ/TOS7i46HD/UixzWb1zeWDZfGZ3iYcg==",
"dev": true
"integrity": "sha512-lQe48YPsMJAig+yngZ87Lus+NF+3mtu7DVOBu6b/gHO1YpKwIj5AWjZ/TOS7i46HD/UixzWb1zeWDZfGZ3iYcg=="
},
"parallel-transform": {
"version": "1.1.0",

View File

@ -24,6 +24,7 @@
"@loafer/decorator": "^0.0.1",
"@ngrx/store": "^5.2.0",
"core-js": "^2.5.4",
"pako": "^1.0.6",
"rxjs": "^6.0.0",
"zone.js": "^0.8.26"
},
@ -40,6 +41,7 @@
"@types/jasmine": "~2.8.6",
"@types/jasminewd2": "~2.0.3",
"@types/node": "~8.9.4",
"@types/pako": "^1.0.0",
"codelyzer": "~4.2.1",
"jasmine-core": "~2.99.1",
"jasmine-spec-reporter": "~4.2.1",

View File

@ -8,6 +8,7 @@ import {
RPCClientResponseCodec,
RPCClientNotificationCodec,
} from '../protocol/RPCClientCodec';
import { RPCMessage } from '../core/type';
export interface RPCRequestState {
subject: Subject<any>;
@ -17,32 +18,37 @@ export interface RPCRequestState {
};
}
export abstract class RPCClient<T> {
private _requestID: number;
export abstract class RPCClient {
private requestID: number;
private _pendingRequestsCount: number;
private _pendingRequests: Map<number, RPCRequestState>;
private pendingRequestsCount: number;
private pendingRequests: Map<number, RPCRequestState>;
protected rpcClientCodec: RPCClientCodec;
protected rpcClientRWC: RPCClientRWC;
public constructor(
private _codec: RPCClientCodec,
private _rwc: RPCClientRWC<T>,
rpcClientCodec: RPCClientCodec,
rpcClientRWC: RPCClientRWC,
) {
this._requestID = 0;
this._pendingRequestsCount = 0;
this._pendingRequests = new Map();
this.rpcClientCodec = rpcClientCodec;
this.rpcClientRWC = rpcClientRWC;
this.requestID = 0;
this.pendingRequestsCount = 0;
this.pendingRequests = new Map();
}
private getRequestID(): number {
return ++this._requestID;
return ++this.requestID;
}
/**
* connect
*/
public connect(queryString?: string): void {
this._rwc.connect(queryString);
this._rwc.readResponse().subscribe(
(value: T) => {
this.rpcClientRWC.connect(queryString);
this.rpcClientRWC.readResponse().subscribe(
(value: RPCMessage) => {
this.onMessage(value);
},
(error: any) => {
@ -58,7 +64,7 @@ export abstract class RPCClient<T> {
* close
*/
public disconnect() {
this._rwc.disconnect();
this.rpcClientRWC.disconnect();
}
/**
@ -96,12 +102,11 @@ export abstract class RPCClient<T> {
params: args,
}
};
this._pendingRequests.set(id, reqState);
this._pendingRequestsCount++;
this.pendingRequests.set(id, reqState);
this.pendingRequestsCount++;
}
const req = this._codec.request(method, args, id);
this._rwc.writeRequest(req);
this.rpcClientRWC.writeRequest(this.rpcClientCodec.request(method, args, id));
if (undefined !== resSubject) {
return resSubject.asObservable();
@ -109,8 +114,8 @@ export abstract class RPCClient<T> {
return undefined;
}
private onMessage(message: Object): void {
const resCodec = this._codec.response(message);
private onMessage(message: RPCMessage): void {
const resCodec = this.rpcClientCodec.response(message);
if (resCodec.isNotification()) {
this.onNotification(resCodec.notification());
@ -124,10 +129,10 @@ export abstract class RPCClient<T> {
const result = resCodec.result();
const error = resCodec.error();
const reqState: RPCRequestState = this._pendingRequests.get(id);
const reqState: RPCRequestState = this.pendingRequests.get(id);
this._pendingRequests.delete(id);
this._pendingRequestsCount--;
this.pendingRequests.delete(id);
this.pendingRequestsCount--;
if (undefined !== error) {
const rpcClientError: RPCClientError = {

View File

@ -1,9 +1,11 @@
import { Observable } from 'rxjs';
import { RPCMessage } from '../core/type';
export interface RPCClientRWC<T> {
export interface RPCClientRWC {
connect(queryString?: string): void;
readResponse(): Observable<T>;
writeRequest(data: any): void;
readResponse(): Observable<RPCMessage>;
writeRequest(data: RPCMessage): void;
disconnect(): void;
connectionStatus(): Observable<boolean>;
}

View File

@ -6,15 +6,17 @@ import {
} from './RxWebsocketSubject';
import { RPCClientRWC } from '../../RPCClientRWC';
import { RPCMessage } from '../../../core/type';
export class RPCClientWebsocketRWC<T> implements RPCClientRWC<T> {
private _wsSocketSubject: RxWebsocketSubject<T>;
private _responseSubject: Subject<T>;
export class RPCClientWebsocketRWC implements RPCClientRWC {
private _wsSocketSubject: RxWebsocketSubject;
private _responseSubject: Subject<RPCMessage>;
public constructor(
private _config: RxWebsocketSubjectConfig,
) {
this._wsSocketSubject = new RxWebsocketSubject<T>(this._config);
this._wsSocketSubject = new RxWebsocketSubject(this._config);
}
public connect(queryString?: string): void {
@ -23,7 +25,7 @@ export class RPCClientWebsocketRWC<T> implements RPCClientRWC<T> {
}
this._wsSocketSubject.connect();
this._wsSocketSubject.subscribe(
(value: T) => {
(value: RPCMessage) => {
if (undefined !== this._responseSubject) {
this._responseSubject.next(value);
}
@ -47,14 +49,14 @@ export class RPCClientWebsocketRWC<T> implements RPCClientRWC<T> {
return this._wsSocketSubject.connectionStatus;
}
public readResponse(): Observable<T> {
public readResponse(): Observable<RPCMessage> {
if (undefined === this._responseSubject) {
this._responseSubject = new Subject<T>();
this._responseSubject = new Subject<RPCMessage>();
}
return this._responseSubject.asObservable();
}
public writeRequest(data: any): void {
public writeRequest(data: RPCMessage): void {
this._wsSocketSubject.write(data);
}
}

View File

@ -1,18 +1,38 @@
import { Observable, Observer, Subject, interval } from 'rxjs';
import { share, distinctUntilChanged, takeWhile } from 'rxjs/operators';
import { WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/webSocket';
import { RPCMessage } from '../../../core/type';
export interface RxWebsocketSubjectConfig {
url: string;
protocol?: string | Array<string>;
export interface RxWebsocketSubjectConfig<T = RPCMessage> extends WebSocketSubjectConfig<T> {
reconnectInterval?: 5000;
reconnectRetry?: 10;
compressionThreshold?: 1024;
}
export class RxWebsocketSubject<T> extends Subject<T> {
const DEFAULT_RX_WEBSOCKET_CONFIG: RxWebsocketSubjectConfig<any> = {
url: '',
deserializer: (e: MessageEvent) => e,
serializer: (value: any) => value,
openObserver: {
next: (e: Event) => {
this._connectionObserver.next(true);
}
},
closeObserver: {
next: (e: CloseEvent) => {
this._socket = null;
this._connectionObserver.next(false);
}
},
reconnectInterval: 5000,
reconnectRetry: 10,
compressionThreshold: 1024,
};
export class RxWebsocketSubject extends Subject<RPCMessage> {
private _reconnectionObservable: Observable<number>;
private _wsSubjectConfig: WebSocketSubjectConfig<T>;
private _socket: WebSocketSubject<any>;
private _wsSubjectConfig: RxWebsocketSubjectConfig;
private _socket: WebSocketSubject<RPCMessage>;
private _connectionObserver: Observer<boolean>;
private _connectionStatus: Observable<boolean>;
private _queryString: string;
@ -27,22 +47,37 @@ export class RxWebsocketSubject<T> extends Subject<T> {
distinctUntilChanged(),
);
this._wsSubjectConfig = {
url: _config.url,
protocol: _config.protocol,
serializer: (value: any) => value,
closeObserver: {
next: (e: CloseEvent) => {
this._socket = null;
this._connectionObserver.next(false);
this._wsSubjectConfig = this._config = { ...DEFAULT_RX_WEBSOCKET_CONFIG };
for (const key in _config) {
if (_config.hasOwnProperty(key)) {
switch (key) {
case 'openObserver':
const oo = _config[key];
this._wsSubjectConfig[key] = {
next: (e: Event) => {
this._connectionObserver.next(true);
oo.next(e);
}
};
break;
case 'closeObserver':
const co = _config[key];
this._wsSubjectConfig[key] = {
next: (e: CloseEvent) => {
this._socket = null;
this._connectionObserver.next(false);
co.next(e);
}
};
break;
default:
this._wsSubjectConfig[key] = _config[key];
break;
}
},
openObserver: {
next: (e: Event) => {
this._connectionObserver.next(true);
}
},
};
}
}
this._connectionStatus.subscribe((isConnected: boolean) => {
if (!this._reconnectionObservable && typeof(isConnected) === 'boolean' && !isConnected) {
@ -69,12 +104,12 @@ export class RxWebsocketSubject<T> extends Subject<T> {
wsSubjectConfig.url = wsSubjectConfig.url + '?' + this._queryString;
}
this._socket = new WebSocketSubject(wsSubjectConfig);
this._socket = new WebSocketSubject<RPCMessage>(wsSubjectConfig);
this._socket.subscribe(
(m) => {
(m: RPCMessage) => {
this.next(m);
},
(error: Event) => {
(e: Event) => {
if (!this._socket) {
this.reconnect();
}
@ -107,7 +142,7 @@ export class RxWebsocketSubject<T> extends Subject<T> {
);
}
public write(data: any): void {
public write(data: RPCMessage): void {
this._socket.next(data);
}
}

View File

@ -0,0 +1,33 @@
import { RPCMessage } from '../core/type';
export interface Codec {
encode(message: string): RPCMessage;
decode(message: RPCMessage): string;
}
export class DefaultCodec implements Codec {
public encode(message: string): RPCMessage {
return message;
}
public decode(message: RPCMessage): string {
return <string>message;
}
}
export const defaultCodec = new DefaultCodec();
export interface CodecSelector {
encode(message: string): RPCMessage;
decode(message: RPCMessage): string;
}
export class DefaultCodecSelector implements CodecSelector {
public encode(message: string): RPCMessage {
return defaultCodec.encode(message);
}
public decode(message: RPCMessage): string {
return defaultCodec.decode(message);
}
}
export const defaultCodecSelector = new DefaultCodecSelector();

View File

@ -0,0 +1,38 @@
import { Codec, CodecSelector, defaultCodec } from './codec';
import { RPCMessage } from '../core/type';
import { gzip, ungzip } from 'pako';
export class GZipCodec implements Codec {
public encode(message: string): RPCMessage {
return <ArrayBuffer>gzip(message).buffer;
}
public decode(message: RPCMessage): string {
return ungzip(Buffer.from(<ArrayBuffer>message), {to: 'string'});
}
}
export const gZipCodec = new GZipCodec();
export class CompressionCodecSelector implements CodecSelector {
private readonly threshold: number;
public constructor(threshold: number) {
this.threshold = threshold;
}
public encode(message: string): RPCMessage {
if (this.threshold < Buffer.byteLength(message)) {
return gZipCodec.encode(message);
} else {
return defaultCodec.encode(message);
}
}
public decode(message: RPCMessage): string {
if (message instanceof ArrayBuffer) {
return gZipCodec.decode(message);
} else {
return defaultCodec.decode(message);
}
}
}

View File

@ -0,0 +1 @@
export type RPCMessage = string | ArrayBuffer;

View File

@ -1,8 +1,9 @@
import { RPCError } from './RPCError';
import { RPCMessage } from '../core/type';
export interface RPCClientCodec {
request(method: string, args: any[], id: number): any;
response(res: any): RPCClientResponseCodec;
request(method: string, args: any[], id: number): RPCMessage;
response(message: RPCMessage): RPCClientResponseCodec;
}
export interface RPCClientResponseCodec {

View File

@ -7,6 +7,8 @@ import {
import {
RPCError,
} from '../RPCError';
import { RPCMessage } from '../../core/type';
import { CodecSelector, defaultCodecSelector } from '../../codec/codec';
export interface ClientNotification {
method: string;
@ -28,7 +30,13 @@ export interface ClientResponse {
}
export class JSONRPCClientCodec implements RPCClientCodec {
public request(method: string, args: any[], id?: number): any {
private readonly codecSelector: CodecSelector;
public constructor(codecSelector: CodecSelector = defaultCodecSelector) {
this.codecSelector = codecSelector;
}
public request(method: string, args: any[], id?: number): RPCMessage {
const params = convertParamsToStringArray(args);
const req: ClientRequest = {
@ -37,16 +45,12 @@ export class JSONRPCClientCodec implements RPCClientCodec {
params: 0 === params.length ? null : params,
id: id,
};
return JSON.stringify(req);
return this.codecSelector.encode(JSON.stringify(req));
}
public response(res: any): RPCClientResponseCodec {
const _res: ClientResponse = {
id: res.id,
jsonrpc: res.jsonrpc,
result: res.result,
error: res.error,
};
return new JSONRPCClientResponseCodec(_res);
public response(message: RPCMessage): RPCClientResponseCodec {
return new JSONRPCClientResponseCodec(this.codecSelector.decode(message));
}
}
@ -80,17 +84,20 @@ function convertParamsToStringArray(args: any[]): string[] | undefined {
}
export class JSONRPCClientResponseCodec implements RPCClientResponseCodec {
public constructor(private _res: ClientResponse) {
private res: ClientResponse;
public constructor(json: string) {
this.res = JSON.parse(json);
}
public id(): number | undefined {
return this._res.id;
return this.res.id;
}
public error(): RPCError | undefined {
return this._res.error;
return this.res.error;
}
public result(): any | undefined {
return this._res.result;
return this.res.result;
}
public isNotification(): boolean {
@ -104,22 +111,22 @@ export class JSONRPCClientResponseCodec implements RPCClientResponseCodec {
if (undefined !== this.id() || undefined === this.result()) {
return undefined;
}
const _noti: ClientNotification = {
method: this._res.result.method,
params: this._res.result.params,
const noti: ClientNotification = {
method: this.res.result.method,
params: this.res.result.params,
};
return new JSONRPCClientNotificationCodec(_noti);
return new JSONRPCClientNotificationCodec(noti);
}
}
export class JSONRPCClientNotificationCodec implements RPCClientNotificationCodec {
public constructor(private _noti: ClientNotification) {
public constructor(private noti: ClientNotification) {
}
public method(): string {
return this._noti.method;
return this.noti.method;
}
public params(): any[] | undefined {
return this._noti.params;
return this.noti.params;
}
}