import { Observable, Subject, Subscription } from 'rxjs';

import { Message } from '../core/type';
import {
  ClientCodec,
  ClientNotificationCodec,
  ClientResponseCodec,
} from '../protocol/client_codec';
import { RPCClientError } from '../protocol/error';

import { ClientRWC } from './client_rwc';

/**
 * Bookkeeping for a request that has been written and is still waiting for
 * its response.
 */
export interface RequestState {
  /** Subject through which the result (or the error) is delivered to the caller. */
  subject: Subject<any>;
  /** The request as it was sent; attached to the error object when the response carries an error. */
  request: {
    method: string;
    params: any[] | null;
  };
}

/**
 * Base class of an RPC client.
 *
 * Outgoing requests are encoded with `clientCodec` and written through
 * `clientRWC`. Incoming messages are read from `clientRWC`, decoded with
 * `clientCodec` and dispatched either to the notification stream or to the
 * pending request that has the same ID.
 */
export abstract class Client {
  /** Last request ID handed out; the first ID produced is 1. */
  private requestID: number;
  /** Requests awaiting a response, keyed by request ID. */
  private readonly pendingRequests: Map<number, RequestState>;
  /** Created lazily by `notification()`; notifications are dropped while it is undefined. */
  private notiSubject: Subject<ClientNotificationCodec> | undefined;
  /** Subscription to `clientRWC.read()` created by `connect()`. */
  private readSubscription: Subscription | undefined;

  protected clientCodec: ClientCodec;
  protected clientRWC: ClientRWC;

  public constructor(
    clientCodec: ClientCodec,
    clientRWC: ClientRWC,
  ) {
    this.clientCodec = clientCodec;
    this.clientRWC = clientRWC;
    this.requestID = 0;
    this.pendingRequests = new Map();
  }

  /** Returns the next request ID (monotonically increasing, starting at 1). */
  private getRequestID(): number {
    return ++this.requestID;
  }

  /**
   * Number of requests that have been sent with `call` and have not yet
   * received a response.
   */
  public getPendingRequestsCount(): number {
    // The map is the single source of truth; a separate counter could drift.
    return this.pendingRequests.size;
  }

  /**
   * Connects the underlying `clientRWC` and starts dispatching the messages
   * it reads.
   *
   * @param queryString Passed through unchanged to `clientRWC.connect`.
   */
  public connect(queryString?: string): void {
    // Release a subscription left over from an earlier connect() so that a
    // message is never dispatched by more than one handler.
    this.unsubscribeRead();

    this.clientRWC.connect(queryString);

    this.readSubscription = this.clientRWC.read().subscribe({
      next: (value: Message) => {
        try {
          this.onMessage(value);
        } catch (error) {
          // An exception escaping this handler can terminate the read
          // subscription (depending on the RxJS version), which would make
          // the client ignore every later message. Log it and keep reading.
          console.error(error);
        }
      },
      error: (error: unknown) => {
        console.error(error);
      },
      complete: () => {
        console.log('');
      },
    });
  }

  /**
   * Disconnects the underlying `clientRWC` and stops listening to its
   * message stream.
   */
  public disconnect(): void {
    this.clientRWC.disconnect();
    this.unsubscribeRead();
  }

  /**
   * Sends a request for which no response is tracked. The request is encoded
   * with the ID `-1`.
   *
   * @param method Name of the remote method.
   * @param args Parameters of the remote method.
   */
  public send(method: string, ...args: any[]): void {
    this.sendInternal(false, method, args);
  }

  /**
   * Sends a request and returns an Observable that emits the result of the
   * matching response and then completes, or errors with an `RPCClientError`
   * when the response carries an error.
   *
   * The request is written immediately, not on subscription, and the
   * Observable is backed by a plain `Subject`: a subscriber attached after
   * the response has arrived does not receive it.
   *
   * @param method Name of the remote method.
   * @param args Parameters of the remote method.
   * @throws Error when no response Observable could be created.
   */
  public call<T>(method: string, ...args: any[]): Observable<T> {
    const o = this.sendInternal<T>(true, method, args);
    if (undefined === o) {
      throw new Error('Error');
    }
    return o;
  }

  /**
   * Not implemented: sends nothing and always returns `undefined`.
   */
  public callTimeout<T>(ms: number, method: string, ...args: any[]): Observable<T> | undefined {
    return undefined;
  }

  /**
   * Encodes and writes a request.
   *
   * @param hasResponse When true, a request ID is allocated and the request
   *   is registered as pending; otherwise the ID `-1` is used.
   * @param method Name of the remote method.
   * @param args Parameters; `undefined` is encoded as `null`.
   * @returns The response Observable when `hasResponse` is true, otherwise
   *   `undefined`.
   */
  private sendInternal<T>(hasResponse: boolean, method: string, args?: any[]): Observable<T> | undefined {
    let id = -1;
    let resSubject: Subject<T> | undefined;
    const params = undefined === args ? null : args;

    if (hasResponse) {
      id = this.getRequestID();
      resSubject = new Subject<T>();
      const reqState: RequestState = {
        subject: resSubject,
        request: {
          method,
          params,
        },
      };
      // Register before writing so that a response can never arrive for an
      // ID that is not known yet.
      this.pendingRequests.set(id, reqState);
    }

    try {
      this.clientRWC.write(this.clientCodec.request(method, params, id));
    } catch (error) {
      // The request was never written, so no response can arrive for it:
      // remove the entry instead of leaving it pending forever.
      if (hasResponse) {
        this.pendingRequests.delete(id);
      }
      throw error;
    }

    if (undefined !== resSubject) {
      return resSubject.asObservable();
    }
    return undefined;
  }

  /**
   * Decodes an incoming message and routes it: notifications go to the
   * notification stream (when one has been requested through
   * `notification()`), everything else goes to `onResponse`.
   */
  private onMessage(message: Message): void {
    const resCodec = this.clientCodec.response(message);

    if (!resCodec.isNotification()) {
      this.onResponse(resCodec);
      return;
    }

    const notiCodec = resCodec.notification();
    if (undefined !== notiCodec && undefined !== this.notiSubject) {
      // Forward the value that was checked instead of decoding a second time.
      this.notiSubject.next(notiCodec);
    }
  }

  /**
   * Delivers a response to the pending request with the same ID and removes
   * that request from the pending set.
   *
   * A response for an unknown ID is logged and ignored.
   *
   * @throws Error when the response has no ID.
   */
  protected onResponse(resCodec: ClientResponseCodec): void {
    const id = resCodec.id();
    const result = resCodec.result();
    const error = resCodec.error();

    if (undefined === id) {
      throw new Error(`This is not response because does not have ID. ${resCodec}`);
    }

    const reqState = this.pendingRequests.get(id);
    if (undefined === reqState) {
      console.log(`The message does not exist. ${resCodec}`);
      return;
    }

    this.pendingRequests.delete(id);

    if (undefined !== error) {
      const rpcClientError: RPCClientError = {
        request: reqState.request,
        response: error,
      };
      console.error(rpcClientError);
      reqState.subject.error(rpcClientError);
      return;
    }

    reqState.subject.next(result);
    // The request has just been removed from the pending set, so nothing
    // more can be delivered for it; complete the stream so that consumers
    // waiting for completion are released.
    reqState.subject.complete();
  }

  /**
   * Returns the stream of notifications received from the remote side.
   *
   * The backing subject is created on the first call; notifications received
   * before that are not delivered.
   */
  public notification(): Observable<ClientNotificationCodec> {
    if (undefined === this.notiSubject) {
      this.notiSubject = new Subject<ClientNotificationCodec>();
    }
    return this.notiSubject.asObservable();
  }

  /** Releases the subscription to the read stream, if there is one. */
  private unsubscribeRead(): void {
    if (undefined !== this.readSubscription) {
      this.readSubscription.unsubscribe();
      this.readSubscription = undefined;
    }
  }
}