import { Injectable, Inject } from '@angular/core';

import { Subscription, Observable, Subject } from 'rxjs';
import { share, switchMap, retryWhen, delay, finalize } from 'rxjs/operators';

import { QueueingSubject } from 'queueing-subject';
import {
  makeWebSocketObservable,
  GetWebSocketResponses,
  NormalClosureMessage
} from '@ucap-webmessenger/web-socket';

import { _MODULE_CONFIG } from '../types/token';
import { ModuleConfig } from '../types/module-config';
import { PacketBody } from '../protocols/packet';
import {
  PacketBodyValueDivider,
  PacketBodyDivider
} from '../types/packet-body-divider';
import { PacketBodyValue } from '../types/packet-body-value.type';
import { SSVC_TYPE_ERROR_RES, ServerErrorCode } from '../types/service';
import { ProtocolMessage } from '../protocols/protocol';
import { NGXLogger } from 'ngx-logger';

/**
 * Bookkeeping for one in-flight `call()`: the subject the response is pushed
 * into, plus the request that was sent (used to match responses by service
 * type).
 */
interface RequestState {
  subject: Subject<ProtocolMessage>;
  request: {
    serviceType: number | null;
    subServiceType: number | null;
    bodyList: PacketBody[];
  };
}

/**
 * Sends and receives text packets over a websocket and correlates responses
 * with pending requests through a request id carried in the packet body.
 *
 * Incoming messages that match a pending request are delivered to that
 * request's observable; everything else is published on `serverMessage`.
 */
@Injectable({
  providedIn: 'root'
})
export class ProtocolService {
  // tslint:disable-next-line: variable-name
  private _requestId: number | null = null;

  /** In-flight requests keyed by request id. */
  private readonly pendingRequests: Map<number, RequestState>;
  /** Outgoing packets; handed to the socket's response getter in `connect()`. */
  private readonly input$: QueueingSubject<string>;

  // NOTE(review): the type argument of GetWebSocketResponses is inferred from
  // usage (string packets) — confirm against the web-socket package.
  private socket$: Observable<GetWebSocketResponses<string>>;
  private messages$: Observable<string>;
  private messagesSubscription: Subscription | null = null;

  private readonly serverMessageSubject: Subject<ProtocolMessage>;
  private readonly serverMessage$: Observable<ProtocolMessage>;

  constructor(
    @Inject(_MODULE_CONFIG) private moduleConfig: ModuleConfig,
    private logger: NGXLogger
  ) {
    this.pendingRequests = new Map();
    this.input$ = new QueueingSubject();
    this.serverMessageSubject = new Subject();
    this.serverMessage$ = this.serverMessageSubject
      .asObservable()
      .pipe(share());
  }

  /**
   * Opens the websocket and starts dispatching incoming messages.
   *
   * The connection is only created when the returned observable is
   * subscribed. It emits (without a value) every time the socket observable
   * yields a response getter, and it never completes. Unsubscribing from it
   * does not close the socket; use `disconnect()` for that.
   *
   * @param serverIp Optional suffix appended to the configured base URL.
   * @returns Observable that signals when the socket is ready for traffic and
   *   errors if setting up the pipeline throws synchronously.
   */
  public connect(serverIp: string | null = null): Observable<void> {
    return new Observable<void>(subscriber => {
      try {
        // Release a previous connection first; otherwise its subscription
        // would be overwritten below and could never be unsubscribed.
        this.disconnect();

        this.socket$ = makeWebSocketObservable(
          `${this.moduleConfig.urls.base}${serverIp ? serverIp : ''}`
        );

        this.messages$ = this.socket$.pipe(
          switchMap(getResponses => {
            subscriber.next();
            return getResponses(this.input$);
          }),
          // Resubscribe to the socket after the configured delay on error.
          retryWhen(errors =>
            errors.pipe(delay(this.moduleConfig.reconnect.delay))
          ),
          share()
        );

        this.messagesSubscription = this.messages$.subscribe(
          (message: string) => this.handleMessage(message),
          (error: Error) => {
            const { message } = error;
            if (message === NormalClosureMessage) {
              this.logger.info(
                'server closed the websocket connection normally'
              );
            } else {
              this.logger.error(
                'socket was disconnected due to error:',
                message
              );
            }
          },
          () => {
            // The clean termination only happens in response to the last
            // subscription to the observable being unsubscribed, any
            // other closure is considered an error.
            this.logger.info(
              'the connection was closed in response to the user'
            );
          }
        );
      } catch (error) {
        subscriber.error(error);
      }
    });
  }

  /**
   * Stops receiving messages by unsubscribing from the message stream.
   * Safe to call when no connection has been established.
   */
  public disconnect(): void {
    if (this.messagesSubscription) {
      this.messagesSubscription.unsubscribe();
      this.messagesSubscription = null;
    }
  }

  /** Messages from the server that did not match any pending request. */
  public get serverMessage(): Observable<ProtocolMessage> {
    return this.serverMessage$;
  }

  /**
   * Sends a request and returns an observable of its response(s).
   *
   * The observable errors with the server's error code when an error response
   * carrying this request's id arrives. The pending entry is removed when the
   * observable is finalized.
   */
  public call(
    serviceType: number | null,
    subServiceType: number | null,
    ...bodyList: PacketBody[]
  ): Observable<ProtocolMessage> {
    return this.sendInternal(true, serviceType, subServiceType, bodyList);
  }

  /** Sends a packet without registering for a response. */
  public send(
    serviceType: number,
    subServiceType: number,
    ...bodyList: PacketBody[]
  ): void {
    this.sendInternal(false, serviceType, subServiceType, bodyList);
  }

  /**
   * Decodes one raw socket message and routes it to the matching pending
   * request, or to `serverMessage` when nothing matches.
   */
  private handleMessage(message: string): void {
    const arg = message.split(PacketBodyDivider);
    const res = 2 > arg.length ? null : this.decodePacket(arg);
    if (null === res) {
      // A malformed packet must not throw inside the subscriber callback;
      // report it and keep the stream alive.
      this.logger.warn('ProtocolService::malformed packet was dropped');
      return;
    }

    // Truthiness check kept on purpose: a missing (null), NaN or zero request
    // id is treated as "no pending request".
    const requestState = res.requestId
      ? this.pendingRequests.get(res.requestId)
      : undefined;

    if (SSVC_TYPE_ERROR_RES === res.message.subServiceType) {
      const errorCode: ServerErrorCode = res.message
        .bodyList[0] as ServerErrorCode;
      // Error responses without a matching pending request are discarded.
      if (requestState) {
        requestState.subject.error(errorCode);
      }
      return;
    }

    if (
      requestState &&
      requestState.request.serviceType === res.message.serviceType
    ) {
      requestState.subject.next(res.message);
      return;
    }

    this.serverMessageSubject.next(res.message);
  }

  /**
   * Encodes and queues a packet. When `hasResponse` is true a request id is
   * appended to the body and a pending-request entry is registered.
   *
   * @returns The response observable when `hasResponse` is true, otherwise
   *   `undefined`.
   */
  private sendInternal(
    hasResponse: boolean,
    serviceType: number | null,
    subServiceType: number | null,
    bodyList: PacketBody[]
  ): Observable<ProtocolMessage> | undefined {
    if (!hasResponse) {
      this.input$.next(
        this.encodePacket(serviceType, subServiceType, bodyList)
      );
      return undefined;
    }

    const requestId = this.requestId;
    const packet = this.encodePacket(serviceType, subServiceType, [
      ...bodyList,
      { type: PacketBodyValue.RequestId, value: requestId }
    ]);

    // NOTE(review): relies on Subject#pipe returning a Subject-compatible
    // object (hence the cast) — confirm against the RxJS version in use.
    const responseSubject = new Subject<ProtocolMessage>().pipe(
      finalize(() => {
        this.pendingRequests.delete(requestId);
        this.logger.debug(
          'ProtocolService::pendingRequests.size',
          this.pendingRequests.size
        );
      })
    ) as Subject<ProtocolMessage>;

    // Register before queueing so the entry exists when a response arrives.
    this.pendingRequests.set(requestId, {
      subject: responseSubject,
      request: { serviceType, subServiceType, bodyList }
    });

    this.input$.next(packet);

    return responseSubject.asObservable();
  }

  /**
   * Next request id. Starts at the configured minimum and wraps back to it
   * once the counter has exceeded the configured maximum.
   */
  private get requestId(): number {
    if (
      null === this._requestId ||
      this.moduleConfig.requestId.max < this._requestId
    ) {
      this._requestId = this.moduleConfig.requestId.min;
    }
    return this._requestId++;
  }

  /**
   * Parses the segments of a raw packet.
   *
   * Segment 0 holds service and sub-service type, segment 1 holds the sender
   * sequence, and every following segment is a `type`/`value` body entry.
   *
   * @param arg Packet already split on `PacketBodyDivider`.
   * @returns The decoded message and its request id (null when the packet
   *   carries none), or null when any segment has fewer than two values.
   */
  private decodePacket(
    arg: string[]
  ): { requestId: number | null; message: ProtocolMessage } | null {
    const cmdArg = arg[0].split(PacketBodyValueDivider);
    if (2 > cmdArg.length) {
      return null;
    }
    const serviceType = Number(cmdArg[0]);
    const subServiceType = Number(cmdArg[1]);

    const seqArg = arg[1].split(PacketBodyValueDivider);
    if (2 > seqArg.length) {
      return null;
    }
    const senderSeq = Number(seqArg[0]);

    const bodyList: any[] = [];
    let requestId: number | null = null;

    for (const segment of arg.slice(2)) {
      const bodyArg = segment.split(PacketBodyValueDivider);
      if (2 > bodyArg.length) {
        return null;
      }

      const valueType: PacketBodyValue = bodyArg[0] as PacketBodyValue;
      const value = bodyArg[1];

      switch (valueType) {
        case PacketBodyValue.Integer:
          bodyList.push(Number(value));
          break;
        case PacketBodyValue.RequestId:
          // The request id is lifted out of the body instead of being pushed.
          requestId = parseInt(value, 10);
          break;
        case PacketBodyValue.None:
        case PacketBodyValue.Binary:
        case PacketBodyValue.String:
        case PacketBodyValue.EncodedString:
        case PacketBodyValue.MultiByteString:
        case PacketBodyValue.WideString:
        case PacketBodyValue.IuId:
          bodyList.push(value);
          break;
        default:
          // Unknown value types are ignored.
          break;
      }
    }

    return {
      requestId,
      message: {
        serviceType,
        subServiceType,
        senderSeq,
        bodyList
      }
    };
  }

  /**
   * Serializes a request into its wire form: the service and sub-service type
   * (null or zero is written as '0'), followed by one `type`/`value` entry per
   * body item, each terminated by `PacketBodyDivider`.
   */
  private encodePacket(
    serviceType: number | null,
    subServiceType: number | null,
    bodyList: PacketBody[] = []
  ): string {
    const packet: string[] = [
      serviceType ? serviceType.toString() : '0',
      PacketBodyValueDivider,
      subServiceType ? subServiceType.toString() : '0',
      PacketBodyDivider
    ];

    for (const body of bodyList) {
      packet.push(body.type);
      packet.push(PacketBodyValueDivider);

      switch (body.type) {
        case PacketBodyValue.Integer:
        case PacketBodyValue.RequestId:
          packet.push(String(body.value));
          break;
        case PacketBodyValue.None:
        case PacketBodyValue.Binary:
        case PacketBodyValue.String:
        case PacketBodyValue.EncodedString:
        case PacketBodyValue.MultiByteString:
        case PacketBodyValue.WideString:
        case PacketBodyValue.IuId:
          packet.push(body.value);
          break;
        default:
          // Unknown types still emit their type tag, with an empty value.
          break;
      }

      packet.push(PacketBodyDivider);
    }

    return packet.join('');
  }
}