108 lines
3.0 KiB
TypeScript

import { Inject, Injectable, Type } from '@angular/core';
import * as jspb from 'google-protobuf';
import * as nats from 'nats.ws';
import { HEADER_CLIENT } from 'app/modules/protobuf/c2se/core/network_pb';
import { Client } from 'app/modules/protobuf/models/core/network_pb';
import { ModuleConfig } from '../config/module-config';
import { _MODULE_CONFIG } from '../config/token';
type DeserializeConstructor<T> = {
new (): T;
deserializeBinary(this: DeserializeConstructor<T>, bytes: Uint8Array): T;
};
const NAME_GET_ERROR = 'getError';
const NAME_GET_RESULT = 'getResult';
@Injectable({
providedIn: 'root',
})
export class NatsService {
private __conn?: nats.NatsConnection;
/**
* Constructor
*/
constructor(@Inject(_MODULE_CONFIG) private __config: ModuleConfig) {}
// -----------------------------------------------------------------------------------------------------
// @ Accessors
// -----------------------------------------------------------------------------------------------------
get connection(): nats.NatsConnection | undefined {
return this.__conn;
}
// -----------------------------------------------------------------------------------------------------
// @ Public methods
// -----------------------------------------------------------------------------------------------------
connect(): Promise<void> {
return new Promise<void>((resolve, reject) => {
if (!!this.__conn) {
return resolve();
}
nats
.connect(this.__config.connectionOptions)
.then((conn) => {
console.log('NATS connected', conn.info?.client_ip);
this.__conn = conn;
resolve();
})
.catch((e) => {
reject(e);
});
});
}
request<R>(
subject: string,
req: Uint8Array,
deserialize: (data: Uint8Array) => any,
opts?: nats.RequestOptions | undefined
): Promise<R> {
return new Promise<R>((resolve, reject) => {
let c = new Client();
c.setClientIp(this.__conn?.info?.client_ip + '');
c.setSessionId('');
c.setSiteUrl('');
let _opts: nats.RequestOptions = !!opts ? opts : { timeout: 3000 };
if (!_opts.headers) {
_opts.headers = nats.headers();
}
var decoder = new TextDecoder('utf8');
_opts.headers.append(
HEADER_CLIENT,
btoa(decoder.decode(c.serializeBinary()))
);
this.__conn?.request(subject, req, _opts).then((msg) => {
let res = deserialize(msg.data);
let get_error = (res as any)[NAME_GET_ERROR];
if (!!get_error) {
let error = get_error.call(res);
if (!!error) {
return reject(error);
}
}
let get_result = (res as any)[NAME_GET_RESULT];
if (!!get_result) {
let result = get_result.call(res);
if (!result) {
return reject('result is not exist');
}
return resolve(result);
}
return reject('protocol error');
});
});
}
}