import { Injectable, Inject } from '@angular/core';

import { Subscription, Observable, Subject } from 'rxjs';
import { share, switchMap, retryWhen, delay } from 'rxjs/operators';

import { QueueingSubject } from 'queueing-subject';

import {
  makeWebSocketObservable,
  GetWebSocketResponses,
  NormalClosureMessage
} from '@ucap-webmessenger/web-socket';

import { _MODULE_CONFIG } from '../types/token';
import { ModuleConfig } from '../types/module-config';
import { PacketBody } from '../models/packet';
import {
  PacketBodyValueDivider,
  PacketBodyDivider
} from '../types/packet-body-divider';
import { PacketBodyValue } from '../types/packet-body-value.type';
import { SSVC_TYPE_ERROR_RES, ServerErrorCode } from '../types/service';

/**
 * Book-keeping for a request that is still waiting for its response:
 * the subject the caller is subscribed to and the request as it was sent.
 */
interface RequestState {
  subject: Subject<ServerResponse>;
  request: {
    serviceType: number | null;
    subServiceType: number | null;
    bodyList: PacketBody[];
  };
}

/** A decoded packet received from the server. */
export interface ServerResponse {
  serviceType: number;
  subServiceType: number;
  senderSeq: number;
  bodyList: any[];
}

/**
 * Sends and receives protocol packets over a websocket.
 *
 * Outgoing packets are pushed into a queueing subject that feeds the socket.
 * Requests issued through `call` are tagged with a request id so the matching
 * response can be routed back to the caller.
 */
@Injectable({
  providedIn: 'root'
})
export class ProtocolService {
  // tslint:disable-next-line: variable-name
  private _requestId: number | null = null;
  private readonly pendingRequests: Map<number, RequestState>;
  private readonly input$: QueueingSubject<string>;
  private socket$: Observable<GetWebSocketResponses<string>> | null = null;
  private messages$: Observable<string> | null = null;
  private messagesSubscription: Subscription | null = null;

  constructor(@Inject(_MODULE_CONFIG) private moduleConfig: ModuleConfig) {
    this.pendingRequests = new Map<number, RequestState>();
    this.input$ = new QueueingSubject<string>();
  }

  /**
   * Opens the websocket and starts dispatching incoming messages.
   *
   * @param serverIp Optional suffix appended to the configured base URL.
   */
  public connect(serverIp: string | null = null): void {
    // Release a previous session first so calling connect twice does not
    // leave the earlier subscription running.
    this.disconnect();

    // The parentheses matter: without them the expression parses as
    // `(base + serverIp) ? serverIp : ''` and the base URL is discarded.
    const url = this.moduleConfig.urls.base + (serverIp ? serverIp : '');

    const socket$: Observable<
      GetWebSocketResponses<string>
    > = makeWebSocketObservable(url);

    const messages$: Observable<string> = socket$.pipe(
      switchMap(getResponses => getResponses(this.input$)),
      // On error, resubscribe after the configured delay.
      retryWhen(errors =>
        errors.pipe(delay(this.moduleConfig.reconnect.delay))
      ),
      share()
    );

    this.socket$ = socket$;
    this.messages$ = messages$;

    this.messagesSubscription = messages$.subscribe(
      (message: string) => this.handleMessage(message),
      (error: Error) => {
        const { message } = error;
        if (message === NormalClosureMessage) {
          console.log('server closed the websocket connection normally');
        } else {
          console.log('socket was disconnected due to error:', message);
        }
      },
      () => {
        // The clean termination only happens in response to the last
        // subscription to the observable being unsubscribed, any
        // other closure is considered an error.
        console.log('the connection was closed in response to the user');
      }
    );
  }

  /** Stops listening for messages. Safe to call when not connected. */
  public disconnect(): void {
    if (this.messagesSubscription) {
      this.messagesSubscription.unsubscribe();
      this.messagesSubscription = null;
    }
  }

  /**
   * Sends a request and returns an observable for its response.
   *
   * The observable emits the response once and completes, or errors with the
   * server error code when the server answers with an error packet.
   */
  public call(
    serviceType: number | null,
    subServiceType: number | null,
    ...bodyList: PacketBody[]
  ): Observable<ServerResponse> {
    return this.sendInternal(true, serviceType, subServiceType, bodyList);
  }

  /** Sends a packet without registering for a response. */
  public send(
    serviceType: number,
    subServiceType: number,
    ...bodyList: PacketBody[]
  ): void {
    this.sendInternal(false, serviceType, subServiceType, bodyList);
  }

  /**
   * Decodes one raw message and routes it to the pending request, if any.
   * Messages that cannot be decoded are dropped.
   */
  private handleMessage(message: string): void {
    const arg = message.split(PacketBodyDivider);
    if (2 > arg.length) {
      // OnError(3);
      return;
    }

    const res = this.decodePacket(arg);
    if (null === res) {
      // decodePacket rejects malformed packets by returning null.
      return;
    }

    let requestState: RequestState | undefined;
    // Explicit null check so that a request id of 0 is still matched.
    if (null !== res.requestId) {
      requestState = this.pendingRequests.get(res.requestId);
      this.pendingRequests.delete(res.requestId);
    }

    if (SSVC_TYPE_ERROR_RES === res.response.subServiceType) {
      const errorCode: ServerErrorCode = res.response
        .bodyList[0] as ServerErrorCode;
      if (requestState) {
        requestState.subject.error(errorCode);
      }
      return;
    }

    if (requestState) {
      requestState.subject.next(res.response);
      // The entry has been removed from pendingRequests, so nothing further
      // can be delivered; complete so subscribers are released.
      requestState.subject.complete();
    }
  }

  /**
   * Encodes and queues a packet.
   *
   * When `hasResponse` is true a request id is appended to the body and a
   * subject is registered under that id; its observable is returned.
   * Otherwise the packet is queued as-is and `undefined` is returned.
   */
  private sendInternal(
    hasResponse: true,
    serviceType: number | null,
    subServiceType: number | null,
    bodyList: PacketBody[]
  ): Observable<ServerResponse>;
  private sendInternal(
    hasResponse: false,
    serviceType: number | null,
    subServiceType: number | null,
    bodyList: PacketBody[]
  ): undefined;
  private sendInternal(
    hasResponse: boolean,
    serviceType: number | null,
    subServiceType: number | null,
    bodyList: PacketBody[]
  ): Observable<ServerResponse> | undefined {
    if (!hasResponse) {
      this.input$.next(
        this.encodePacket(serviceType, subServiceType, bodyList)
      );
      return undefined;
    }

    // Read the getter exactly once: every read hands out a new id.
    const requestId = this.requestId;
    const packet = this.encodePacket(serviceType, subServiceType, [
      ...bodyList,
      { type: PacketBodyValue.RequestId, value: requestId }
    ]);

    const responseSubject = new Subject<ServerResponse>();
    this.pendingRequests.set(requestId, {
      subject: responseSubject,
      request: { serviceType, subServiceType, bodyList }
    });

    this.input$.next(packet);
    return responseSubject.asObservable();
  }

  /**
   * Hands out the next request id.
   *
   * Ids run from `requestId.min` to `requestId.max` (inclusive) and then wrap
   * around. NOTE: each read advances the counter.
   */
  private get requestId(): number {
    const { min, max } = this.moduleConfig.requestId;

    let next = this._requestId;
    if (null === next || max < next) {
      next = min;
    }

    this._requestId = next + 1;
    return next;
  }

  /**
   * Decodes the segments of a received packet.
   *
   * Segment 0 carries the service and sub-service type, segment 1 the sender
   * sequence, and every following segment one typed body value.
   *
   * @param arg The raw message split on `PacketBodyDivider`.
   * @returns The decoded response together with the request id found in the
   *   body (or `null` when there is none); `null` when the packet is malformed.
   */
  private decodePacket(
    arg: string[]
  ): { requestId: number | null; response: ServerResponse } | null {
    if (2 > arg.length) {
      // OnError(3);
      return null;
    }

    const cmdArg = arg[0].split(PacketBodyValueDivider);
    if (2 > cmdArg.length) {
      // OnError(3);
      return null;
    }
    const serviceType = Number(cmdArg[0]);
    const subServiceType = Number(cmdArg[1]);

    const seqArg = arg[1].split(PacketBodyValueDivider);
    if (2 > seqArg.length) {
      // OnError(3);
      return null;
    }
    const senderSeq = Number(seqArg[0]);

    const bodyList: any[] = [];
    let requestId: number | null = null;

    for (let i = 2; i < arg.length; i++) {
      // encodePacket terminates every body with a divider, so splitting such
      // a packet leaves an empty trailing segment. Skip empties instead of
      // rejecting the packet.
      // NOTE(review): assumes server packets may be terminated the same way
      // - confirm against the server protocol.
      if ('' === arg[i]) {
        continue;
      }

      const bodyArg = arg[i].split(PacketBodyValueDivider);
      if (2 > bodyArg.length) {
        // OnError(3);
        return null;
      }

      const valueType: PacketBodyValue = bodyArg[0] as PacketBodyValue;
      // Index 0 is the type tag; the value follows the divider.
      const value = bodyArg[1];

      switch (valueType) {
        case PacketBodyValue.Integer:
          bodyList.push(Number(value));
          break;
        case PacketBodyValue.RequestId:
          requestId = parseInt(value, 10);
          break;
        case PacketBodyValue.None:
        case PacketBodyValue.Binary:
        case PacketBodyValue.String:
        case PacketBodyValue.EncodedString:
        case PacketBodyValue.MultiByteString:
        case PacketBodyValue.WideString:
        case PacketBodyValue.IuId:
          bodyList.push(value);
          break;
        default:
          break;
      }
    }

    return {
      requestId,
      response: {
        serviceType,
        subServiceType,
        senderSeq,
        bodyList
      }
    };
  }

  /**
   * Serializes a request into its wire format: the service and sub-service
   * type, followed by one `type`/`value` pair per body entry, each terminated
   * by `PacketBodyDivider`.
   *
   * A null (or zero) service type is written as `'0'`. Body entries with an
   * unknown type are written with an empty value.
   */
  private encodePacket(
    serviceType: number | null,
    subServiceType: number | null,
    bodyList: PacketBody[] = []
  ): string {
    const packet: string[] = [
      serviceType ? serviceType.toString() : '0',
      PacketBodyValueDivider,
      subServiceType ? subServiceType.toString() : '0',
      PacketBodyDivider
    ];

    for (const body of bodyList) {
      packet.push(body.type);
      packet.push(PacketBodyValueDivider);

      switch (body.type) {
        case PacketBodyValue.None:
        case PacketBodyValue.Binary:
        case PacketBodyValue.Integer:
        case PacketBodyValue.RequestId:
          packet.push(body.value.toString());
          break;
        case PacketBodyValue.String:
        case PacketBodyValue.EncodedString:
        case PacketBodyValue.MultiByteString:
        case PacketBodyValue.WideString:
        case PacketBodyValue.IuId:
          packet.push(body.value);
          break;
        default:
          break;
      }

      packet.push(PacketBodyDivider);
    }

    return packet.join('');
  }
}