import * as grpc from '@grpc/grpc-js'; import { Observable, Subject, Subscription } from "rxjs"; import { Message, ConnectionAttribute, GrpcConnectionType, ConnectionState, StreamAttribute, MessageLog } from "../interfaces/general.interface"; import { Status } from '@grpc/grpc-js/build/src/constants'; import { message_proto } from './protos/server.proto' import * as _ from 'lodash' import { ServerClientManager } from './server-client.service'; export class GrpcServiceMethod { private connectionAttribute: ConnectionAttribute | undefined private connectionAttributes: ConnectionAttribute[] = [] private server: grpc.Server | any private messageToBeSendOver: Message | any private clientRequest: Subject = new Subject() // public interface for service client to establish connection. They will give server and client information as a pair public async create(connectionAttribute: ConnectionAttribute, connectionAttributes: ConnectionAttribute[]): Promise { this.connectionAttribute = connectionAttribute this.connectionAttributes = connectionAttributes return new Promise((resolve, reject) => { this.createGrpcInstance({ instanceType: 'server' }, connectionAttribute) this.createGrpcInstance({ instanceType: 'client' }, connectionAttribute) resolve('Just putting it here for now....') }) } private async createGrpcInstance(grpcType: GrpcConnectionType, connectionAttribute: ConnectionAttribute) { // Reconnection Logic while (true) { try { let recreatePromise = new Promise((resolve) => { if (grpcType.instanceType == 'server' && !this.server) { this.createServerStreamingServer().then(() => { resolve('recreate') }) } if (grpcType.instanceType == 'client') { this.createServerStreamingClient(connectionAttribute).then(() => { resolve('recreate') }) } }) await recreatePromise } catch (error) { console.error('Connection attempt failed:', error); } await new Promise(resolve => setTimeout(resolve, 1000)); // Wait for 1 second before the next attempt // timeout generate message to trigger this reconnection } } // Create Server Instance to stream all application Outgoing messages public async createServerStreamingServer(): Promise { // '0.0.0.0:3001' return new Promise((resolve, reject) => { this.server = new grpc.Server() this.server.addService(message_proto.Message.service, { HandleMessage: (call) => { let clientRequest: ConnectionAttribute = JSON.parse(call.request.message) /* Receive request, it will talk server client service, scs will do the checking instead of doing them here, two scenario. if this is a new client, then a new subject is assigned. IF this is previous channel, an existing subject is assigned back. */ // client Request validation if (this.isConnectionAttribute(clientRequest)) { // Check if this connection exists let result: ConnectionAttribute | undefined = this.connectionAttributes.find((connectionAttribute: ConnectionAttribute) => connectionAttribute.ConnectionID.local === clientRequest.ConnectionID.remote) if (result) { // if exist, reassign back the buffer let subscription: Subscription = result.outGoing.MessageToBePublished!.subscribe({ next: (outGoingMessage: Message) => { let message = { id: outGoingMessage.id, message: JSON.stringify(outGoingMessage.message) } console.log(`Sending ${(outGoingMessage.message as MessageLog).appData.msgId} to ${clientRequest.outGoing.PublisherID}`) call.write(message) }, error: err => { console.error(err) subscription.unsubscribe() resolve(``) }, complete: () => { subscription.unsubscribe() resolve(``) } }) let report: ConnectionState = { status: `DIRECT_PUBLISH` } result.connectionStatus!.next(report) } if (!result) { console.log(`No matching results.... leaving the logic blank for now...`) /* Currently haven't thought of a solution for this. Even if i do , the simplest one woul be to assisgn a new buffer, which means the server client service will have to instantiate a new one for the incoming new client. Right now, there is no need since the amount of clients and their ID are predetermined. TO be discuseed further. */ } } } }) this.server.bindAsync(this.connectionAttribute!.outGoing.serverUrl, grpc.ServerCredentials.createInsecure(), () => { console.log(`gRPC server is running on ${this.connectionAttribute?.outGoing.serverUrl}`) this.server.start() }) }) } // Send a request over to the other server to open a channel for this server to emit/stream messages over public async createServerStreamingClient(connectionAttribute: ConnectionAttribute): Promise { return new Promise(async (resolve, reject) => { const client = new message_proto.Message(connectionAttribute.inComing.serverUrl, grpc.credentials.createInsecure()); let localInfo: ConnectionAttribute = { // need to make a new copy where it doesn't reference the subjects, otherwise circular ref error ConnectionID: connectionAttribute.ConnectionID, outGoing: { StreamID: connectionAttribute.outGoing.StreamID, PublisherID: connectionAttribute.outGoing.PublisherID, SubscriberID: connectionAttribute.outGoing.SubscriberID, serverUrl: connectionAttribute.outGoing.serverUrl, MessageToBePublished: null, MessageToBeReceived: null }, inComing: { StreamID: connectionAttribute.inComing.StreamID, PublisherID: connectionAttribute.inComing.PublisherID, SubscriberID: connectionAttribute.inComing.SubscriberID, serverUrl: connectionAttribute.inComing.serverUrl, MessageToBePublished: null, MessageToBeReceived: null }, connectionStatus: null } let call = client.HandleMessage({ id: connectionAttribute.inComing.serverUrl, message: JSON.stringify(localInfo) }) console.log(`Sending request to ${connectionAttribute.inComing.serverUrl} to open response channel...`) call.on('status', (status: Status) => { if (status == grpc.status.OK) { // only returns a status when there's error. Otherwise it just waits console.log(`Message trasmission operation is successful`) // RPC completed successfully } if (status == grpc.status.UNAVAILABLE) { let report: ConnectionState = { status: 'BUFFER', reason: `Server doesn't seem to be alive. Error returned.`, payload: this.messageToBeSendOver ?? `There's no message at the moment...` } connectionAttribute.connectionStatus!.next(report) resolve('No connection established. Server is not responding..') } }); call.on('data', (data: any) => { let response: Message = { id: data.id, message: JSON.parse(data.message) } if (connectionAttribute.inComing.MessageToBeReceived) { connectionAttribute.inComing.MessageToBeReceived.next(response) } }); call.on('error', (err) => { // console.error(err) let report: ConnectionState = { status: 'BUFFER', reason: `Server doesn't seem to be alive. Error returned.`, payload: this.messageToBeSendOver ?? `There's no message at the moment...` } connectionAttribute.connectionStatus!.next(report) resolve('') }); }) } // TO check or validate if the client request meets the criteria private isConnectionAttribute(obj: any): obj is ConnectionAttribute { const isMatch = ( typeof obj.ConnectionID === 'object' && // Further checks can be added based on the structure of ConnectionID isStreamAttribute(obj.outGoing) && isStreamAttribute(obj.inComing) && (obj.connectionStatus === null || obj.connectionStatus instanceof Subject) ); if (isMatch) { console.log('gRPC client call matches ConnectionAttribute type'); } else { console.log('gRPC client call does not match ConnectionAttribute type'); } return isMatch; function isStreamAttribute(obj: any): obj is StreamAttribute { return ( (typeof obj.StreamID === 'string' || obj.StreamID === undefined) && (typeof obj.PublisherID === 'string' || obj.PublisherID === undefined) && (typeof obj.SubscriberID === 'string' || obj.SubscriberID === undefined) && // Check other properties like PublisherInstance and SubscriberInstance based on their expected types (typeof obj.serverUrl === 'string' || obj.serverUrl === undefined) && // Check connectionState based on its type, assuming it's an enum or similar (obj.MessageToBePublished === null || obj.MessageToBePublished instanceof Observable) && (obj.MessageToBeReceived === null || obj.MessageToBeReceived instanceof Subject) ); } } }