import { Injectable } from '@angular/core';

import { Observable, Subject } from 'rxjs';
import {
  map,
  takeWhile,
  timeout,
  take,
  share,
  filter,
  tap
} from 'rxjs/operators';

import { ProtocolService } from '@ucap-webmessenger/protocol';

import {
  InfoRequest,
  InfoResponse,
  encodeInfo,
  decodeInfo,
  decodeInfoData,
  InfoData
} from '../protocols/info';
import {
  SVC_TYPE_EVENT,
  SSVC_TYPE_EVENT_INFO_REQ,
  SSVC_TYPE_EVENT_INFO_RES,
  SSVC_TYPE_EVENT_INFO_DATA,
  SSVC_TYPE_EVENT_SEND_REQ,
  SSVC_TYPE_EVENT_PUSH_CL_REQ,
  SSVC_TYPE_EVENT_READ_REQ,
  SSVC_TYPE_EVENT_DEL_REQ,
  SSVC_TYPE_EVENT_CANCEL_REQ,
  SSVC_TYPE_EVENT_CANCEL_NOTI,
  SSVC_TYPE_EVENT_SEND_NOTI,
  SSVC_TYPE_EVENT_READ_NOTI,
  SSVC_TYPE_EVENT_DEL_RES,
  SSVC_TYPE_EVENT_SEND_RES,
  SSVC_TYPE_EVENT_READ_RES
} from '../types/service';
import {
  SendRequest,
  SendResponse,
  decodeSend,
  encodeSend,
  decodeSendNotification,
  SendNotification
} from '../protocols/send';
import { PushRequest, encodePush } from '../protocols/push';
import {
  ReadResponse,
  ReadRequest,
  encodeRead,
  decodeRead,
  decodeReadNotification,
  ReadNotification
} from '../protocols/read';
import {
  CancelRequest,
  CancelResponse,
  encodeCancel,
  decodeCancel,
  CancelNotification,
  decodeCancelNotification
} from '../protocols/cancel';
import {
  decodeDelNotification,
  DelNotification,
  DelRequest,
  DelResponse,
  encodeDel,
  decodeDel
} from '../protocols/del';

/** Union of every decoded payload that can be published on `notification$`. */
type Notifications =
  | SendNotification
  | ReadNotification
  | CancelNotification
  | DelNotification;

/**
 * Client for the EVENT service (`SVC_TYPE_EVENT`).
 *
 * Offers request methods (`info`, `send`, `push`, `read`, `del`, `cancel`)
 * that encode a request and hand it to `ProtocolService`, and a
 * `notification$` stream that republishes selected server messages after
 * decoding them.
 */
@Injectable({
  providedIn: 'root'
})
export class EventProtocolService {
  /** Internal sink that the constructor's server-message subscription feeds. */
  private readonly notificationSubject: Subject<Notifications>;

  /** Shared, read-only view of the decoded notifications. */
  public notification$: Observable<Notifications>;

  /**
   * Creates the notification stream and subscribes to
   * `protocolService.serverMessage` for the lifetime of the service; the
   * subscription is never torn down here.
   *
   * @param protocolService Transport used for requests and incoming messages.
   */
  constructor(private protocolService: ProtocolService) {
    this.notificationSubject = new Subject<Notifications>();
    this.notification$ = this.notificationSubject.asObservable().pipe(share());

    this.protocolService.serverMessage
      .pipe(
        // Only messages that belong to the EVENT service are considered.
        filter(message => message.serviceType === SVC_TYPE_EVENT),
        tap(message => {
          switch (message.subServiceType) {
            // SEND_RES is decoded with the notification decoder as well, so
            // both sub-types reach subscribers in the same shape.
            case SSVC_TYPE_EVENT_SEND_RES:
            case SSVC_TYPE_EVENT_SEND_NOTI:
              this.notificationSubject.next(decodeSendNotification(message));
              break;

            // READ_RES is handled the same way as READ_NOTI.
            case SSVC_TYPE_EVENT_READ_RES:
            case SSVC_TYPE_EVENT_READ_NOTI:
              this.notificationSubject.next(decodeReadNotification(message));
              break;

            case SSVC_TYPE_EVENT_CANCEL_NOTI:
              this.notificationSubject.next(decodeCancelNotification(message));
              break;

            // NOTE(review): deletion is published from DEL_RES using the
            // notification decoder; no DEL notification sub-type is
            // referenced in this file - confirm this is intentional.
            case SSVC_TYPE_EVENT_DEL_RES:
              this.notificationSubject.next(decodeDelNotification(message));
              break;

            // Every other sub-service type is ignored.
            default:
              break;
          }
        })
      )
      .subscribe();
  }

  /**
   * Sends an INFO request and decodes each reply.
   *
   * Replies are forwarded until one with sub-type `SSVC_TYPE_EVENT_INFO_RES`
   * arrives; that reply is still emitted (inclusive `takeWhile`) and the
   * stream then completes.
   *
   * @param req Request to encode with `encodeInfo`.
   * @returns `InfoData` for `SSVC_TYPE_EVENT_INFO_DATA` replies, otherwise
   *   the result of `decodeInfo`.
   */
  public info(req: InfoRequest): Observable<InfoResponse | InfoData> {
    return this.protocolService
      .call(SVC_TYPE_EVENT, SSVC_TYPE_EVENT_INFO_REQ, ...encodeInfo(req))
      .pipe(
        takeWhile(res => SSVC_TYPE_EVENT_INFO_RES !== res.subServiceType, true),
        map(res => {
          if (SSVC_TYPE_EVENT_INFO_DATA === res.subServiceType) {
            return decodeInfoData(res);
          }

          return decodeInfo(res);
        })
      );
  }

  /**
   * Sends a SEND request and decodes the first reply.
   *
   * @param req Request to encode with `encodeSend`.
   * @returns The first reply decoded with `decodeSend`; completes after it.
   */
  public send(req: SendRequest): Observable<SendResponse> {
    return this.protocolService
      .call(SVC_TYPE_EVENT, SSVC_TYPE_EVENT_SEND_REQ, ...encodeSend(req))
      .pipe(
        take(1),
        map(res => decodeSend(res))
      );
  }

  /**
   * Sends a PUSH_CL request through `ProtocolService.send`; no reply is
   * observed by this method.
   *
   * @param req Request to encode with `encodePush`.
   */
  public push(req: PushRequest): void {
    return this.protocolService.send(
      SVC_TYPE_EVENT,
      SSVC_TYPE_EVENT_PUSH_CL_REQ,
      ...encodePush(req)
    );
  }

  /**
   * Sends a READ request and decodes the first reply.
   *
   * @param req Request to encode with `encodeRead`.
   * @returns The first reply decoded with `decodeRead`; completes after it.
   */
  public read(req: ReadRequest): Observable<ReadResponse> {
    return this.protocolService
      .call(SVC_TYPE_EVENT, SSVC_TYPE_EVENT_READ_REQ, ...encodeRead(req))
      .pipe(
        take(1),
        map(res => decodeRead(res))
      );
  }

  /**
   * Sends a DEL request and decodes the first reply.
   *
   * @param req Request to encode with `encodeDel`.
   * @returns The first reply decoded with `decodeDel`; completes after it.
   */
  public del(req: DelRequest): Observable<DelResponse> {
    return this.protocolService
      .call(SVC_TYPE_EVENT, SSVC_TYPE_EVENT_DEL_REQ, ...encodeDel(req))
      .pipe(
        take(1),
        map(res => decodeDel(res))
      );
  }

  /**
   * Sends a CANCEL request and decodes the first reply.
   *
   * @param req Request to encode with `encodeCancel`.
   * @returns The first reply decoded with `decodeCancel`; completes after it.
   */
  public cancel(req: CancelRequest): Observable<CancelResponse> {
    return this.protocolService
      .call(SVC_TYPE_EVENT, SSVC_TYPE_EVENT_CANCEL_REQ, ...encodeCancel(req))
      .pipe(
        take(1),
        map(res => decodeCancel(res))
      );
  }
}