import { BehaviorSubject, Subject } from 'rxjs'; import { ClientRequest, ConnectionAttribute, ConnectionRequest, ConnectionState, Message, ServerRequest, State } from '../interfaces/general.interface'; import { GrpcServiceMethod } from './grpc.service.method'; import { BufferService } from './buffer.service'; import * as dotenv from 'dotenv' import * as _ from 'lodash' dotenv.config() export class ServerClientManager { private connectionAttributes: ConnectionAttribute[] = [] private grpcService: GrpcServiceMethod = new GrpcServiceMethod() private defaultServerAttribute: ServerRequest = { name: 'Default - Server', serverUrl: "localhost:3000", connectionType: 'GRPC', messageToBePublishedFromApplication: new Subject() } private defaultClientAttribute: ClientRequest = { name: 'Default - Client', targetServer: "localhost:3001", connectionType: 'GRPC', messageToBeReceivedFromRemote: new Subject() } constructor() { } public async generateConnection(request: ConnectionRequest): Promise { return new Promise(async (resolve, reject) => { let initialReport: ConnectionState let reportSubject: BehaviorSubject let retransmission: BufferService let errorString: string let originalRequest = _.cloneDeep(request) let database: string let response: any = { message: `Fail to complete connection generation` } let statusChain: State = 1 let connectionAttribute: ConnectionAttribute if (statusChain == 1) { if (!request.server) { request.server = this.defaultServerAttribute } if (!request.client) { request.client = this.defaultClientAttribute } if (request.database) { database = request.database } else { database = request.server.name + request.client.name } /* Inject retransmission here */ initialReport = { status: 'BUFFER' } reportSubject = new BehaviorSubject(initialReport) retransmission = new BufferService(request.server.messageToBePublishedFromApplication, reportSubject, database) } if (statusChain == 1) { // Connection Type checking if (request.server!.connectionType != request.client!.connectionType) { statusChain = -1 errorString = "Connection Type DOES NOT MATCH!" } else { statusChain = 1 } } if (statusChain == 1) { connectionAttribute = { ConnectionID: { local: request.server!.name + request.client!.name, remote: request.client!.name + request.server!.name }, outGoing: { StreamID: request.server!.name, PublisherID: request.server!.name, SubscriberID: request.server!.name, serverUrl: request.server?.serverUrl, connectionState: `OFF`, MessageToBePublished: retransmission!.getMessages(), MessageToBeReceived: null }, inComing: { StreamID: request.client!.name, PublisherID: request.client!.name, SubscriberID: request.client!.name, serverUrl: request.client?.targetServer, connectionState: `OFF`, MessageToBePublished: null, MessageToBeReceived: request.client!.messageToBeReceivedFromRemote }, connectionStatus: reportSubject! } } if (statusChain == 1) { await this.checkConnectionAttribute(connectionAttribute!).then((res) => { if (res == true) { console.log(`Connection<${connectionAttribute.ConnectionID.local}> already exists `) } if (res == false) { this.connectionAttributes.push(connectionAttribute) console.log(`Connection ${connectionAttribute.ConnectionID.local} registered...`) } console.log(`There is now ${this.connectionAttributes.length} connection Attributes`) }) } if (statusChain == 1) { // This is default connection` if (!request.client!.connectionType) { request.client!.connectionType = 'GRPC' } // For each connection type: if (request.client!.connectionType == 'GRPC') { this.grpcServerStatusHandler() this.grpcClientStatusHandler() this.grpcService.create(connectionAttribute!).then(() => { // logic here }).catch(() => { errorString = `Something wrong with gRPC methods` statusChain = -1 }) } } if (statusChain == 1) { response = { message: "Channel Response", requestedTo: originalRequest, data: connectionAttribute! } resolve(response); } else if (statusChain == -1) { response = { message: "Channel Response Error", requestedTo: originalRequest, data: errorString! // put error string here } resolve(response); } }) } private async checkConnectionAttribute(connectionAttribute: ConnectionAttribute): Promise { return new Promise((resolve) => { let result: boolean = this.connectionAttributes.some(connection => connection.ConnectionID.local === connectionAttribute.ConnectionID.local ); resolve(result); }); } private grpcServerStatusHandler() { this.grpcService.getLocalServerStatus().subscribe({ next: (notification: any) => { if (notification.connectionStatus === `ON`) { let connectionAttribute = this.connectionAttributes.find(connection => connection.ConnectionID.local == notification.connectionIDlocal) if (connectionAttribute) { connectionAttribute.outGoing.connectionState = 'ON' console.log(`Connection ${notification.connectionIDlocal} updated`) } else { console.log(`Connection ${notification.connectionIDlocal} attribute is not found.`) } } }, error: err => console.error(err), complete: () => { } }) } private grpcClientStatusHandler() { this.grpcService.getClientRequest().subscribe({ next: (clientConnectionAttribute: ConnectionAttribute) => { console.log(`Received a request from ${clientConnectionAttribute.outGoing.serverUrl}`) let connectionAttribute = this.connectionAttributes.find(connection => connection.ConnectionID.remote == clientConnectionAttribute.ConnectionID.local) if (connectionAttribute) { console.log(`Connection ${clientConnectionAttribute.ConnectionID.local} updated`) connectionAttribute.inComing.connectionState = 'ON' } else { console.log(`Connection Attribut ${clientConnectionAttribute.inComing.PublisherID} is not found`) } }, error: err => console.error(err), complete: () => { } }) } }