This commit is contained in:
crusader 2018-09-05 15:52:55 +09:00
parent de5e2d9a3d
commit d27b8b249f
4 changed files with 53 additions and 33 deletions

View File

@ -1,6 +1,6 @@
{ {
"name": "@overflow/rpc-js", "name": "@overflow/rpc-js",
"version": "0.0.4", "version": "0.0.5",
"description": "TypeScript library setup for multiple compilation targets using tsc and webpack", "description": "TypeScript library setup for multiple compilation targets using tsc and webpack",
"main": "./bundles/index.umd.js", "main": "./bundles/index.umd.js",
"module": "./esm5/index.js", "module": "./esm5/index.js",

View File

@ -4,7 +4,7 @@ import { Message } from '../core/type';
export interface ClientRWC { export interface ClientRWC {
connect(queryString?: string): void; connect(queryString?: string): void;
disconnect(): void; disconnect(): void;
connectionStatus(): Observable<boolean> | undefined; connectionStatus(): Observable<boolean>;
read(): Observable<Message>; read(): Observable<Message>;
write(data: Message): void; write(data: Message): void;

View File

@ -17,7 +17,7 @@ export interface WebSocketClientRWCConfig {
} }
export class WebSocketClientRWC implements ClientRWC { export class WebSocketClientRWC implements ClientRWC {
private wsSubject: RxWebSocketSubject<Message> | undefined; private wsSubject: RxWebSocketSubject<Message>;
private _rxConfig: RxWebSocketSubjectConfig<Message>; private _rxConfig: RxWebSocketSubjectConfig<Message>;
private resSubject: Subject<Message> | undefined; private resSubject: Subject<Message> | undefined;
@ -29,15 +29,8 @@ export class WebSocketClientRWC implements ClientRWC {
this._rxConfig.serializer = (value: Message) => value; this._rxConfig.serializer = (value: Message) => value;
this._rxConfig.deserializer = (e: MessageEvent) => e.data; this._rxConfig.deserializer = (e: MessageEvent) => e.data;
this._rxConfig.binaryType = 'arraybuffer'; this._rxConfig.binaryType = 'arraybuffer';
}
public connect(queryString?: string): void {
if (undefined === this.wsSubject) {
if (undefined !== queryString) {
this._rxConfig.queryString = queryString;
}
this.wsSubject = new RxWebSocketSubject(this._rxConfig); this.wsSubject = new RxWebSocketSubject(this._rxConfig);
}
this.wsSubject.subscribe( this.wsSubject.subscribe(
(value: Message) => { (value: Message) => {
if (undefined !== this.resSubject) { if (undefined !== this.resSubject) {
@ -50,25 +43,39 @@ export class WebSocketClientRWC implements ClientRWC {
} }
}, },
() => { () => {
console.log('sss'); console.log('Cannot connect to server');
}, },
); );
} }
public connect(queryString?: string): void {
if (undefined !== queryString) {
this.wsSubject.rxConfig.queryString = queryString;
}
if (this.wsSubject.isConnected()) {
console.log('connected already');
return;
}
this.wsSubject.connect();
}
public disconnect(): void { public disconnect(): void {
if (undefined !== this.wsSubject) { if (!this.wsSubject.isConnected()) {
console.log('not connected');
return;
}
this.wsSubject.disconnect(); this.wsSubject.disconnect();
} }
}
public connectionStatus(): Observable<boolean> | undefined { public connectionStatus(): Observable<boolean> {
if (undefined !== this.wsSubject) {
return this.wsSubject.connectionStatus; return this.wsSubject.connectionStatus;
} }
return undefined;
}
public read(): Observable<Message> { public read(): Observable<Message> {
if (undefined === this.resSubject) { if (undefined === this.resSubject) {
this.resSubject = new Subject<Message>(); this.resSubject = new Subject<Message>();
@ -78,8 +85,6 @@ export class WebSocketClientRWC implements ClientRWC {
} }
public write(data: Message): void { public write(data: Message): void {
if (undefined !== this.wsSubject) {
this.wsSubject.send(data); this.wsSubject.send(data);
} }
} }
}

View File

@ -17,19 +17,26 @@ export class RxWebSocketSubject<T> extends Subject<T> {
private connectionObserver: Observer<boolean> | undefined; private connectionObserver: Observer<boolean> | undefined;
public connectionStatus: Observable<boolean>; public connectionStatus: Observable<boolean>;
public rxConfig: RxWebSocketSubjectConfig<T>;
private readonly reconnectInterval: number; private readonly reconnectInterval: number;
private readonly reconnectRetry: number; private readonly reconnectRetry: number;
constructor(private _rxConfig: RxWebSocketSubjectConfig<T>) { private _isConnected: boolean;
constructor(_rxConfig: RxWebSocketSubjectConfig<T>) {
super(); super();
this.rxConfig = _rxConfig;
this._isConnected = false;
this.reconnectInterval = this.reconnectInterval =
undefined !== this._rxConfig.reconnectInterval undefined !== this.rxConfig.reconnectInterval
? this._rxConfig.reconnectInterval ? this.rxConfig.reconnectInterval
: 5000; : 5000;
this.reconnectRetry = this.reconnectRetry =
undefined !== this._rxConfig.reconnectRetry undefined !== this.rxConfig.reconnectRetry
? this._rxConfig.reconnectRetry ? this.rxConfig.reconnectRetry
: 10; : 10;
this.connectionStatus = new Observable((observer: Observer<boolean>) => { this.connectionStatus = new Observable((observer: Observer<boolean>) => {
@ -39,9 +46,10 @@ export class RxWebSocketSubject<T> extends Subject<T> {
distinctUntilChanged(), distinctUntilChanged(),
); );
this.wsSubjectConfig = Object.assign({}, this._rxConfig); this.wsSubjectConfig = Object.assign({}, this.rxConfig);
this.wsSubjectConfig.closeObserver = { this.wsSubjectConfig.closeObserver = {
next: (_e: CloseEvent) => { next: (_e: CloseEvent) => {
this._isConnected = false;
this.socket = undefined; this.socket = undefined;
if (undefined !== this.connectionObserver) { if (undefined !== this.connectionObserver) {
this.connectionObserver.next(false); this.connectionObserver.next(false);
@ -51,12 +59,12 @@ export class RxWebSocketSubject<T> extends Subject<T> {
this.wsSubjectConfig.openObserver = { this.wsSubjectConfig.openObserver = {
next: (_e: Event) => { next: (_e: Event) => {
if (undefined !== this.connectionObserver) { if (undefined !== this.connectionObserver) {
this._isConnected = true;
this.connectionObserver.next(true); this.connectionObserver.next(true);
} }
}, },
}; };
this.connect();
this.connectionStatus.subscribe(isConnected => { this.connectionStatus.subscribe(isConnected => {
if ( if (
!this.reconnectionObservable && !this.reconnectionObservable &&
@ -67,11 +75,11 @@ export class RxWebSocketSubject<T> extends Subject<T> {
}); });
} }
private connect(): void { public connect(): void {
this.wsSubjectConfig.url = this.wsSubjectConfig.url =
undefined !== this._rxConfig.queryString undefined !== this.rxConfig.queryString
? urljoin(this._rxConfig.url, this._rxConfig.queryString) ? urljoin(this.rxConfig.url, this.rxConfig.queryString)
: this._rxConfig.url; : this.rxConfig.url;
this.socket = new WebSocketSubject(this.wsSubjectConfig); this.socket = new WebSocketSubject(this.wsSubjectConfig);
this.socket.subscribe( this.socket.subscribe(
m => { m => {
@ -85,6 +93,13 @@ export class RxWebSocketSubject<T> extends Subject<T> {
); );
} }
/**
* isConnected
*/
public isConnected(): boolean {
return this._isConnected;
}
private reconnect(): void { private reconnect(): void {
this.reconnectionObservable = interval(this.reconnectInterval).pipe( this.reconnectionObservable = interval(this.reconnectInterval).pipe(
takeWhile((_v, index) => { takeWhile((_v, index) => {