From 12dce5e7dbf869d684d22ed3d0bbe8cb57fe6dd6 Mon Sep 17 00:00:00 2001 From: crusader Date: Thu, 1 Feb 2018 22:01:52 +0900 Subject: [PATCH] ing --- .../commons/api/service/api.service.spec.ts | 6 + src/app/commons/api/service/api.service.ts | 50 +++++--- .../commons/core/rpc/protocol/client-codec.ts | 4 + .../rpc/protocol/json/client-notification.ts | 3 + .../core/rpc/protocol/json/client-request.ts | 3 + .../core/rpc/protocol/json/client-response.ts | 6 + .../commons/core/rpc/registry/rpc-registry.ts | 36 ++++++ .../core/rx/websocket/rx-websocket-subject.ts | 121 ++++++++++++++++++ 8 files changed, 213 insertions(+), 16 deletions(-) create mode 100644 src/app/commons/core/rpc/protocol/client-codec.ts create mode 100644 src/app/commons/core/rpc/protocol/json/client-notification.ts create mode 100644 src/app/commons/core/rpc/protocol/json/client-request.ts create mode 100644 src/app/commons/core/rpc/protocol/json/client-response.ts create mode 100644 src/app/commons/core/rpc/registry/rpc-registry.ts create mode 100644 src/app/commons/core/rx/websocket/rx-websocket-subject.ts diff --git a/src/app/commons/api/service/api.service.spec.ts b/src/app/commons/api/service/api.service.spec.ts index eabfdd4..9b2fbe6 100644 --- a/src/app/commons/api/service/api.service.spec.ts +++ b/src/app/commons/api/service/api.service.spec.ts @@ -12,4 +12,10 @@ describe('APIService', () => { it('should be created', inject([APIService], (service: APIService) => { expect(service).toBeTruthy(); })); + + it('should be created', inject([APIService], (service: APIService) => { + + expect(service).toBeTruthy(); + })); + }); diff --git a/src/app/commons/api/service/api.service.ts b/src/app/commons/api/service/api.service.ts index 0f4e754..271952e 100644 --- a/src/app/commons/api/service/api.service.ts +++ b/src/app/commons/api/service/api.service.ts @@ -1,27 +1,45 @@ import { Injectable } from '@angular/core'; +import { RxWebsocketSubject } from 'app/commons/core/rx/websocket/rx-websocket-subject'; import { Observable } from 'rxjs/Observable'; -import { Observer } from 'rxjs/Observer'; -import { Subject } from 'rxjs/Subject'; -import { Subscription } from 'rxjs/Subscription'; -import { WebSocketSubject } from 'rxjs/observable/dom/WebSocketSubject'; @Injectable() export class APIService { - private socketSubject: WebSocketSubject; - private socket: Subscription; - private apiURL: string; + private wsSocketSubject: RxWebsocketSubject; - constructor() { } - - public connect(): void { - this.socketSubject = WebSocketSubject.create(this.apiURL); - this.socketSubject.subscribe({ - next: (data: MessageEvent) => { - - } - }); + constructor() { + this.wsSocketSubject = new RxWebsocketSubject(''); } + public connect(): void { + this.wsSocketSubject.connect(); + this.wsSocketSubject.subscribe( + (value: Object) => { + this.onMessage(value); + }, + (error: any) => { + this.onError(error); + }, + () => { + + } + ); + } + + public getConnectionStatus(): Observable { + return this.wsSocketSubject.connectionStatus; + } + + private onMessage(message: Object): void { + // + } + + private onError(error: any): void { + // + } + + private onDisconnected(): void { + // + } } diff --git a/src/app/commons/core/rpc/protocol/client-codec.ts b/src/app/commons/core/rpc/protocol/client-codec.ts new file mode 100644 index 0000000..52108df --- /dev/null +++ b/src/app/commons/core/rpc/protocol/client-codec.ts @@ -0,0 +1,4 @@ +export interface ClientCodec { + writeRequest(method: string, args: any[], id?: number); + +} diff --git a/src/app/commons/core/rpc/protocol/json/client-notification.ts b/src/app/commons/core/rpc/protocol/json/client-notification.ts new file mode 100644 index 0000000..c8be96e --- /dev/null +++ b/src/app/commons/core/rpc/protocol/json/client-notification.ts @@ -0,0 +1,3 @@ +export interface ClientNotification { + +} diff --git a/src/app/commons/core/rpc/protocol/json/client-request.ts b/src/app/commons/core/rpc/protocol/json/client-request.ts new file mode 100644 index 0000000..ac76013 --- /dev/null +++ b/src/app/commons/core/rpc/protocol/json/client-request.ts @@ -0,0 +1,3 @@ +export interface ClientRequest { + id: number; +} diff --git a/src/app/commons/core/rpc/protocol/json/client-response.ts b/src/app/commons/core/rpc/protocol/json/client-response.ts new file mode 100644 index 0000000..c857c4f --- /dev/null +++ b/src/app/commons/core/rpc/protocol/json/client-response.ts @@ -0,0 +1,6 @@ +export interface ClientResponse { + jsonrpc: string; + id?: number; + result?: any; + error?: Error; +} diff --git a/src/app/commons/core/rpc/registry/rpc-registry.ts b/src/app/commons/core/rpc/registry/rpc-registry.ts new file mode 100644 index 0000000..a1d4a68 --- /dev/null +++ b/src/app/commons/core/rpc/registry/rpc-registry.ts @@ -0,0 +1,36 @@ +export class RPCRegistry { + private services: Map; + + public constructor() { + this.services = new Map(); + } + + public registerService(receiver: Object, name?: string): void { + if (undefined === name) { + name = receiver.constructor.name; + } + this.services.set(name, receiver); + } + + public getService(name: string): any | undefined { + return this.services.get(name); + } + + public hasMethod(method: string): boolean { + const names = method.split('.'); + if (undefined === names || 2 !== names.length) { + throw new Error(); + } + const serviceName = names[0]; + const methodName = names[1]; + + const service = this.services.get(serviceName); + if (undefined === service || !service.hasOwnProperty(methodName)) { + return false; + } + + return true; + } + + +} diff --git a/src/app/commons/core/rx/websocket/rx-websocket-subject.ts b/src/app/commons/core/rx/websocket/rx-websocket-subject.ts new file mode 100644 index 0000000..1ea93d5 --- /dev/null +++ b/src/app/commons/core/rx/websocket/rx-websocket-subject.ts @@ -0,0 +1,121 @@ +import { Observable } from 'rxjs/Observable'; +import { Observer } from 'rxjs/Observer'; +import { Subject } from 'rxjs/Subject'; +import { + WebSocketSubject, + WebSocketSubjectConfig +} from 'rxjs/observable/dom/WebSocketSubject'; + +import 'rxjs/add/operator/distinctUntilChanged'; +import 'rxjs/add/operator/interval'; +import 'rxjs/add/operator/share'; +import 'rxjs/add/operator/takeWhile'; +import 'rxjs/add/observable/interval'; + +export interface Codec { + decode(e: MessageEvent): any; + encode(data: any): string; +} + +export const defaultCodec: Codec = { + encode: (e: MessageEvent) => { + return JSON.parse(e.data); + }, + decode: (data: any): string => { + return JSON.stringify(data); + } +}; + +export class RxWebsocketSubject extends Subject { + private reconnectionObservable: Observable; + private wsSubjectConfig: WebSocketSubjectConfig; + private socket: WebSocketSubject; + private connectionObserver: Observer; + private _connectionStatus: Observable; + + private _reconnectInterval = 5000; + private _reconnectAttempts = 10; + + public constructor(private url: string, private codec?: Codec) { + super(); + + this._connectionStatus = new Observable((observer) => { + this.connectionObserver = observer; + }).share().distinctUntilChanged(); + + if (undefined === codec) { + this.codec = defaultCodec; + } + + this.wsSubjectConfig = { + url: url, + closeObserver: { + next: (e: CloseEvent) => { + this.socket = null; + this.connectionObserver.next(false); + } + }, + openObserver: { + next: (e: Event) => { + this.connectionObserver.next(true); + } + }, + resultSelector: this.codec.decode, + }; + + this._connectionStatus.subscribe((isConnected: boolean) => { + if (!this.reconnectionObservable && typeof(isConnected) === 'boolean' && !isConnected) { + this.reconnect(); + } + }); + } + + public set reconnectInterval(reconnectInterval: number) { + this._reconnectInterval = reconnectInterval; + } + + public set reconnectAttempts(reconnectAttempts: number) { + this._reconnectAttempts = reconnectAttempts; + } + + public get connectionStatus(): Observable { + return this._connectionStatus; + } + + public connect(): void { + this.socket = new WebSocketSubject(this.wsSubjectConfig); + this.socket.subscribe( + (m) => { + this.next(m); + }, + (error: Event) => { + if (!this.socket) { + this.reconnect(); + } + } + ); + } + + private reconnect(): void { + this.reconnectionObservable = Observable.interval(this._reconnectInterval) + .takeWhile((v, index) => { + return index < this._reconnectAttempts && !this.socket; + }); + this.reconnectionObservable.subscribe( + () => { + this.connect(); + }, + null, + () => { + this.reconnectionObservable = null; + if (!this.socket) { + this.complete(); + this.connectionObserver.complete(); + } + }); + } + + public send(data: any): void { + this.socket.next(this.codec.encode(data)); + } +}