This commit is contained in:
crusader 2018-02-01 22:01:52 +09:00
parent d962b01763
commit 12dce5e7db
8 changed files with 213 additions and 16 deletions

View File

@ -12,4 +12,10 @@ describe('APIService', () => {
it('should be created', inject([APIService], (service: APIService) => {
expect(service).toBeTruthy();
}));
it('should be created', inject([APIService], (service: APIService) => {
expect(service).toBeTruthy();
}));
});

View File

@ -1,27 +1,45 @@
import { Injectable } from '@angular/core';
import { RxWebsocketSubject } from 'app/commons/core/rx/websocket/rx-websocket-subject';
import { Observable } from 'rxjs/Observable';
import { Observer } from 'rxjs/Observer';
import { Subject } from 'rxjs/Subject';
import { Subscription } from 'rxjs/Subscription';
import { WebSocketSubject } from 'rxjs/observable/dom/WebSocketSubject';
@Injectable()
export class APIService {
private socketSubject: WebSocketSubject<Object>;
private socket: Subscription;
private apiURL: string;
private wsSocketSubject: RxWebsocketSubject<Object>;
constructor() { }
constructor() {
this.wsSocketSubject = new RxWebsocketSubject<Object>('');
}
public connect(): void {
this.socketSubject = WebSocketSubject.create(this.apiURL);
this.socketSubject.subscribe({
next: (data: MessageEvent) => {
this.wsSocketSubject.connect();
this.wsSocketSubject.subscribe(
(value: Object) => {
this.onMessage(value);
},
(error: any) => {
this.onError(error);
},
() => {
}
});
);
}
public getConnectionStatus(): Observable<boolean> {
return this.wsSocketSubject.connectionStatus;
}
private onMessage(message: Object): void {
//
}
private onError(error: any): void {
//
}
private onDisconnected(): void {
//
}
}

View File

@ -0,0 +1,4 @@
export interface ClientCodec {
writeRequest(method: string, args: any[], id?: number);
}

View File

@ -0,0 +1,3 @@
export interface ClientNotification {
}

View File

@ -0,0 +1,3 @@
export interface ClientRequest {
id: number;
}

View File

@ -0,0 +1,6 @@
export interface ClientResponse {
jsonrpc: string;
id?: number;
result?: any;
error?: Error;
}

View File

@ -0,0 +1,36 @@
export class RPCRegistry {
private services: Map<string, Object>;
public constructor() {
this.services = new Map();
}
public registerService(receiver: Object, name?: string): void {
if (undefined === name) {
name = receiver.constructor.name;
}
this.services.set(name, receiver);
}
public getService(name: string): any | undefined {
return this.services.get(name);
}
public hasMethod(method: string): boolean {
const names = method.split('.');
if (undefined === names || 2 !== names.length) {
throw new Error();
}
const serviceName = names[0];
const methodName = names[1];
const service = this.services.get(serviceName);
if (undefined === service || !service.hasOwnProperty(methodName)) {
return false;
}
return true;
}
}

View File

@ -0,0 +1,121 @@
import { Observable } from 'rxjs/Observable';
import { Observer } from 'rxjs/Observer';
import { Subject } from 'rxjs/Subject';
import {
WebSocketSubject,
WebSocketSubjectConfig
} from 'rxjs/observable/dom/WebSocketSubject';
import 'rxjs/add/operator/distinctUntilChanged';
import 'rxjs/add/operator/interval';
import 'rxjs/add/operator/share';
import 'rxjs/add/operator/takeWhile';
import 'rxjs/add/observable/interval';
export interface Codec {
decode(e: MessageEvent): any;
encode(data: any): string;
}
export const defaultCodec: Codec = {
encode: (e: MessageEvent) => {
return JSON.parse(e.data);
},
decode: (data: any): string => {
return JSON.stringify(data);
}
};
export class RxWebsocketSubject<T> extends Subject<T> {
private reconnectionObservable: Observable<number>;
private wsSubjectConfig: WebSocketSubjectConfig;
private socket: WebSocketSubject<any>;
private connectionObserver: Observer<boolean>;
private _connectionStatus: Observable<boolean>;
private _reconnectInterval = 5000;
private _reconnectAttempts = 10;
public constructor(private url: string, private codec?: Codec) {
super();
this._connectionStatus = new Observable<boolean>((observer) => {
this.connectionObserver = observer;
}).share().distinctUntilChanged();
if (undefined === codec) {
this.codec = defaultCodec;
}
this.wsSubjectConfig = {
url: url,
closeObserver: {
next: (e: CloseEvent) => {
this.socket = null;
this.connectionObserver.next(false);
}
},
openObserver: {
next: (e: Event) => {
this.connectionObserver.next(true);
}
},
resultSelector: this.codec.decode,
};
this._connectionStatus.subscribe((isConnected: boolean) => {
if (!this.reconnectionObservable && typeof(isConnected) === 'boolean' && !isConnected) {
this.reconnect();
}
});
}
public set reconnectInterval(reconnectInterval: number) {
this._reconnectInterval = reconnectInterval;
}
public set reconnectAttempts(reconnectAttempts: number) {
this._reconnectAttempts = reconnectAttempts;
}
public get connectionStatus(): Observable<boolean> {
return this._connectionStatus;
}
public connect(): void {
this.socket = new WebSocketSubject(this.wsSubjectConfig);
this.socket.subscribe(
(m) => {
this.next(m);
},
(error: Event) => {
if (!this.socket) {
this.reconnect();
}
}
);
}
private reconnect(): void {
this.reconnectionObservable = Observable.interval(this._reconnectInterval)
.takeWhile((v, index) => {
return index < this._reconnectAttempts && !this.socket;
});
this.reconnectionObservable.subscribe(
() => {
this.connect();
},
null,
() => {
this.reconnectionObservable = null;
if (!this.socket) {
this.complete();
this.connectionObserver.complete();
}
});
}
public send(data: any): void {
this.socket.next(this.codec.encode(data));
}
}