This commit is contained in:
crusader 2018-03-13 20:59:39 +09:00
parent 61feaf3d6a
commit 1fc94ebd7d
8 changed files with 1 additions and 207 deletions

View File

@ -1,28 +0,0 @@
import { Observable } from 'rxjs/Observable';
import { Subject } from 'rxjs/Subject';
import { RPCRegistry } from '../registry/rpc-registry';
export class RPCClient {
private socketSubject: Subject<any>;
private rpcRegistry: RPCRegistry;
private pendingRequests: Map<number, Observable<any>>;
public constructor() {
}
public call<T>(method: string, ...args: any[]): Observable<T> {
return undefined;
}
public send(method: string, ...args: any[]): void {
}
private sendInternal(method: string, ...args: any[]): Observable<any> {
return undefined;
}
}

View File

@ -1,4 +0,0 @@
export interface ClientCodec {
encode(method: string, args: any[], id?: number);
}

View File

@ -1,3 +0,0 @@
export interface ClientNotification {
}

View File

@ -1,3 +0,0 @@
export interface ClientRequest {
id: number;
}

View File

@ -1,6 +0,0 @@
export interface ClientResponse {
jsonrpc: string;
id?: number;
result?: any;
error?: Error;
}

View File

@ -1,36 +0,0 @@
export class RPCRegistry {
private services: Map<string, Object>;
public constructor() {
this.services = new Map();
}
public registerService(receiver: Object, name?: string): void {
if (undefined === name) {
name = receiver.constructor.name;
}
this.services.set(name, receiver);
}
public getService(name: string): any | undefined {
return this.services.get(name);
}
public hasMethod(method: string): boolean {
const names = method.split('.');
if (undefined === names || 2 !== names.length) {
throw new Error();
}
const serviceName = names[0];
const methodName = names[1];
const service = this.services.get(serviceName);
if (undefined === service || !service.hasOwnProperty(methodName)) {
return false;
}
return true;
}
}

View File

@ -1,122 +0,0 @@
import { Observable } from 'rxjs/Observable';
import { Observer } from 'rxjs/Observer';
import { Subject } from 'rxjs/Subject';
import {
WebSocketSubject,
WebSocketSubjectConfig
} from 'rxjs/observable/dom/WebSocketSubject';
import 'rxjs/add/operator/distinctUntilChanged';
import 'rxjs/add/operator/share';
import 'rxjs/add/operator/takeWhile';
import 'rxjs/add/observable/interval';
export interface Codec {
decode(e: MessageEvent): any;
encode(data: any): string;
}
export const defaultCodec: Codec = {
decode: (e: MessageEvent) => {
return JSON.parse(e.data);
},
encode: (data: any): string => {
return JSON.stringify(data);
}
};
export class RxWebsocketSubject<T> extends Subject<T> {
private reconnectionObservable: Observable<number>;
private wsSubjectConfig: WebSocketSubjectConfig;
private socket: WebSocketSubject<any>;
private connectionObserver: Observer<boolean>;
private _connectionStatus: Observable<boolean>;
private _reconnectInterval = 5000;
private _reconnectAttempts = 10;
public constructor(private url: string, private codec?: Codec) {
super();
this._connectionStatus = new Observable<boolean>((observer) => {
this.connectionObserver = observer;
}).share().distinctUntilChanged();
if (undefined === codec) {
this.codec = defaultCodec;
}
this.wsSubjectConfig = {
url: url,
closeObserver: {
next: (e: CloseEvent) => {
this.socket = null;
this.connectionObserver.next(false);
}
},
openObserver: {
next: (e: Event) => {
this.connectionObserver.next(true);
}
},
// resultSelector: this.codec.decode,
};
this._connectionStatus.subscribe((isConnected: boolean) => {
if (!this.reconnectionObservable && typeof(isConnected) === 'boolean' && !isConnected) {
this.reconnect();
}
});
}
public set reconnectInterval(reconnectInterval: number) {
this._reconnectInterval = reconnectInterval;
}
public set reconnectAttempts(reconnectAttempts: number) {
this._reconnectAttempts = reconnectAttempts;
}
public get connectionStatus(): Observable<boolean> {
return this._connectionStatus;
}
public connect(): void {
this.socket = new WebSocketSubject(this.wsSubjectConfig);
this.socket.subscribe(
(m) => {
this.next(m);
},
(error: Event) => {
if (!this.socket) {
this.reconnect();
}
}
);
}
private reconnect(): void {
this.reconnectionObservable = Observable.interval(this._reconnectInterval)
.takeWhile((v, index) => {
return index < this._reconnectAttempts && !this.socket;
});
this.reconnectionObservable.subscribe(
() => {
this.connect();
},
null,
() => {
this.reconnectionObservable = null;
if (!this.socket) {
this.complete();
this.connectionObserver.complete();
}
}
);
}
public send(data: any): void {
const s = this.codec.encode(data);
this.socket.next(this.codec.encode(data));
}
}

View File

@ -37,11 +37,7 @@ export class RESTClient {
}): Observable<T> {
return this._httpClient
.request<T>(method, Location.joinWithSlash(this._baseURL, entry), options)
.map(
(response: T) => {
return response;
}
)
.map(response => response)
.catch((error: HttpErrorResponse) => {
const aryMsg = error.error.message.split('|');
const resError: RESTError = {