import { BehaviorSubject, Subject } from 'rxjs';
import { ClientRequest, ConnectionAttribute, ConnectionRequest, ConnectionState, Message, ServerRequest, State } from '../interfaces/general.interface';
import { GrpcServiceMethod } from './grpc.service.method';
import { BufferService } from './buffer.service';
import * as dotenv from 'dotenv'
import * as _ from 'lodash'
dotenv.config()

/**
 * @deprecated This service will be removed in future versions.
 * We are using websocket now.
 *
 * Keeps a registry of connection attributes and, for every connection request,
 * wires a buffered outgoing message stream to the gRPC transport.
 */
export class ServerClientManager {
    /** Every connection registered through {@link generateConnection}. */
    private connectionAttributes: ConnectionAttribute[] = []
    private grpcService: GrpcServiceMethod = new GrpcServiceMethod()

    /** Used when a request carries no `server` section. The same instance (and Subject) is shared by all such requests. */
    private defaultServerAttribute: ServerRequest = {
        name: 'Default - Server',
        serverUrl: "localhost:3000",
        connectionType: 'GRPC',
        messageToBePublishedFromApplication: new Subject()
    }

    /** Used when a request carries no `client` section. The same instance (and Subject) is shared by all such requests. */
    private defaultClientAttribute: ClientRequest = {
        name: 'Default - Client',
        targetServer: "localhost:3001",
        connectionType: 'GRPC',
        messageToBeReceivedFromRemote: new Subject()
    }

    constructor() {
        // this.generateConnection({})
    }

    /**
     * Creates a connection based on the request from the application.
     *
     * Steps, in order:
     *  1. fill in the default server/client sections when they are missing
     *     (the passed-in `request` object is mutated, as before);
     *  2. create the buffering service that sits in front of the outgoing stream;
     *  3. verify that server and client agree on the connection type;
     *  4. build the connection attribute and register it unless one with the
     *     same local ConnectionID is already registered;
     *  5. start the transport for the connection type (only 'GRPC' is handled).
     *
     * @param request Connection request; `server`, `client` and `database` are optional.
     * @returns An object `{ message, requestedTo, data }`. On success `message` is
     *          "Channel Response" and `data` is the connection attribute. On a
     *          connection-type mismatch or a gRPC failure `message` is
     *          "Channel Response Error" and `data` is the error string.
     *          `requestedTo` is a deep copy of the request taken before defaults
     *          were applied.
     * @throws Unexpected errors (e.g. thrown while constructing the buffer) now
     *         reject the returned promise; previously they were lost inside an
     *         async Promise executor and the promise never settled.
     */
    // The response shape is kept as `any` so existing callers keep compiling.
    public async generateConnection(request: ConnectionRequest): Promise<any> {
        // Snapshot taken before defaults are written into `request`.
        const originalRequest = _.cloneDeep(request)

        // Callers may rely on the request being completed in place, so keep mutating it.
        const server = request.server ?? this.defaultServerAttribute
        const client = request.client ?? this.defaultClientAttribute
        request.server = server
        request.client = client

        // Truthiness check on purpose: an empty database name falls back to the generated one.
        const database: string = request.database ? request.database : server.name + client.name

        /* Inject retransmission here */
        const initialReport: ConnectionState = { status: 'BUFFER' }
        const reportSubject = new BehaviorSubject(initialReport)
        const retransmission = new BufferService(server.messageToBePublishedFromApplication, reportSubject, database)

        // Connection Type checking
        if (server.connectionType !== client.connectionType) {
            return {
                message: "Channel Response Error",
                requestedTo: originalRequest,
                data: "Connection Type DOES NOT MATCH!"
            }
        }

        const connectionAttribute: ConnectionAttribute = {
            ConnectionID: {
                local: server.name + client.name,
                remote: client.name + server.name
            },
            outGoing: {
                StreamID: server.name,
                PublisherID: server.name,
                SubscriberID: server.name,
                serverUrl: server.serverUrl,
                MessageToBePublished: retransmission.getMessages(),
                MessageToBeReceived: null
            },
            inComing: {
                StreamID: client.name,
                PublisherID: client.name,
                SubscriberID: client.name,
                serverUrl: client.targetServer,
                MessageToBePublished: null,
                MessageToBeReceived: client.messageToBeReceivedFromRemote
            },
            connectionStatus: reportSubject
        }

        // to prevent duplicate connection from being created
        // NOTE(review): a duplicate is only kept out of the registry; the transport
        // below is still started for it, exactly as before. Confirm this is intended.
        const alreadyRegistered = await this.checkConnectionAttribute(connectionAttribute)
        if (alreadyRegistered) {
            console.log(`Connection<${connectionAttribute.ConnectionID.local}> already exists `)
        } else {
            this.connectionAttributes.push(connectionAttribute)
            console.log(`Connection ${connectionAttribute.ConnectionID.local} registered...`)
        }
        console.log(`There is now ${this.connectionAttributes.length} connection Attributes`)

        // This is default connection
        if (!client.connectionType) {
            client.connectionType = 'GRPC'
        }

        // For each connection type:
        if (client.connectionType === 'GRPC') {
            try {
                // Awaited so that a failure is reflected in the returned response.
                // Before, the rejection handler ran only after the success response
                // had already been delivered, so it had no effect.
                // Assumes `create` settles once set-up is done — TODO confirm it does
                // not stay pending for the lifetime of the connection.
                await this.grpcService.create(connectionAttribute, this.connectionAttributes)
            } catch (error) {
                console.error(`Something wrong with gRPC methods`, error)
                return {
                    message: "Channel Response Error",
                    requestedTo: originalRequest,
                    data: `Something wrong with gRPC methods`
                }
            }
        }

        return {
            message: "Channel Response",
            requestedTo: originalRequest,
            data: connectionAttribute
        }
    }

    /**
     * Tells whether a connection with the same local ConnectionID is already registered.
     *
     * @param connectionAttribute Candidate connection.
     * @returns `true` when `connectionAttributes` already holds an entry whose
     *          `ConnectionID.local` equals the candidate's.
     */
    private async checkConnectionAttribute(connectionAttribute: ConnectionAttribute): Promise<boolean> {
        return this.connectionAttributes.some(
            connection => connection.ConnectionID.local === connectionAttribute.ConnectionID.local
        )
    }
}