import * as grpc from '@grpc/grpc-js'; import { Observable, Subject, Subscription } from "rxjs"; import { Message, ConnectionAttribute, GrpcConnectionType, ConnectionState, MessageLog, ConnectionStatus, StreamAttribute } from "../interfaces/general.interface"; import { Status } from '@grpc/grpc-js/build/src/constants'; import { message_proto } from './protos/server.proto' import * as _ from 'lodash' export class GrpcServiceMethod { private server: grpc.Server | any private messageToBeSendOver: Message | any private clientRequest: Subject = new Subject() private localServerStatus: Subject = new Subject() // keep track of all the incoming channel request from the servers public getClientRequest() { return this.clientRequest } // keep track of all the incoming channel request from the servers public getLocalServerStatus() { return this.localServerStatus } public async create(connectionAttribute: ConnectionAttribute): Promise { return new Promise((resolve, reject) => { this.createGrpcInstance({ instanceType: 'server' }, connectionAttribute) this.createGrpcInstance({ instanceType: 'client' }, connectionAttribute) resolve('Just putting it here for now....') }) } private async createGrpcInstance(grpcType: GrpcConnectionType, connectionAttribute: ConnectionAttribute) { // Reconnection Logic here while (true) { try { let recreatePromise = new Promise((resolve) => { if (grpcType.instanceType == 'server') { this.createServerStreamingServer(connectionAttribute).then(() => { resolve('recreate') }) } if (grpcType.instanceType == 'client') { this.createServerStreamingClient(connectionAttribute).then(() => { resolve('recreate') }) } }) await recreatePromise } catch (error) { console.error('Connection attempt failed:', error); } await new Promise(resolve => setTimeout(resolve, 1000)); // Wait for 1 second before the next attempt // timeout generate message to trigger this reconnection } } // Create Server Instance to stream all application Outgoing messages public async createServerStreamingServer(connectionAttribute: ConnectionAttribute): Promise { // '0.0.0.0:3001' return new Promise((resolve, reject) => { try { if (!this.server) { this.server = new grpc.Server() this.localServerStatus.next({ connectionStatus: 'ON', connectionIDlocal: connectionAttribute.ConnectionID.local, message: `${connectionAttribute.outGoing.serverUrl} started.` }) } else { console.log(`Grpc server alrady started.`) this.localServerStatus.next({ connectionStatus: 'ON', connectionIDlocal: connectionAttribute.ConnectionID.local, message: `${connectionAttribute.outGoing.serverUrl} already started.` }) } this.server.addService(message_proto.Message.service, { HandleMessage: (call) => { /// add a checking for standard message request let clientInfo: ConnectionAttribute = JSON.parse(call.request.message) if (this.isConnectionAttribute(clientInfo)) { this.clientRequest.next(clientInfo) console.log(`Initializing stream. Opening Channel... Confirmation from ${call.request.id}`) if (connectionAttribute.outGoing.MessageToBePublished) { let subscription: Subscription = connectionAttribute.outGoing.MessageToBePublished.subscribe({ next: (response: Message) => { console.log(`Sending from GRPC server: ${(response.message as MessageLog).appData.msgId} `) let message = { id: response.id, message: JSON.stringify(response.message) } call.write(message) }, error: err => { console.error(err) subscription.unsubscribe() resolve('') }, complete: () => { console.log(`Stream response completed for ${call.request.id}`) subscription.unsubscribe() resolve('') } }) let report: ConnectionState = { status: 'DIRECT_PUBLISH' } connectionAttribute.connectionStatus!.next(report) } } else { console.error(`INVALID REQUEST`) } }, Check: (_, callback) => { // for now it is just sending the status message over to tell the client it is alive // For simplicity, always return "SERVING" as status callback(null, { status: 'SERVING' }); }, }); // Bind and start the server this.server.bindAsync(connectionAttribute.outGoing.serverUrl, grpc.ServerCredentials.createInsecure(), () => { console.log(`gRPC server is running on ${connectionAttribute.outGoing.serverUrl}`); this.server.start(); }); } catch (error) { resolve(error) } }) } // Send a request over to the other server to open a channel for this server to emit/stream messages over public async createServerStreamingClient(connectionAttribute: ConnectionAttribute): Promise { return new Promise(async (resolve, reject) => { const client = new message_proto.Message(connectionAttribute.inComing.serverUrl, grpc.credentials.createInsecure()); let localInfo: ConnectionAttribute = { // need to make a new copy where it doesn't reference the subjects, otherwise circular ref error ConnectionID: connectionAttribute.ConnectionID, outGoing: { StreamID: connectionAttribute.outGoing.StreamID, PublisherID: connectionAttribute.outGoing.PublisherID, SubscriberID: connectionAttribute.outGoing.SubscriberID, serverUrl: connectionAttribute.outGoing.serverUrl, MessageToBePublished: null, MessageToBeReceived: null }, inComing: { StreamID: connectionAttribute.inComing.StreamID, PublisherID: connectionAttribute.inComing.PublisherID, SubscriberID: connectionAttribute.inComing.SubscriberID, serverUrl: connectionAttribute.inComing.serverUrl, MessageToBePublished: null, MessageToBeReceived: null }, connectionStatus: null } let call = client.HandleMessage({ id: connectionAttribute.inComing.serverUrl, message: JSON.stringify(localInfo) }) console.log(`Sending request to ${connectionAttribute.inComing.serverUrl} to open response channel...`) call.on('status', (status: Status) => { if (status == grpc.status.OK) { // only returns a status when there's error. Otherwise it just waits console.log(`Message trasmission operation is successful`) // RPC completed successfully } if (status == grpc.status.UNAVAILABLE) { let report: ConnectionState = { status: 'BUFFER', reason: `Server doesn't seem to be alive. Error returned.`, payload: this.messageToBeSendOver ?? `There's no message at the moment...` } connectionAttribute.connectionStatus!.next(report) resolve('No connection established. Server is not responding..') } }); call.on('data', (data: any) => { let response: Message = { id: data.id, message: JSON.parse(data.message) } if (connectionAttribute.inComing.MessageToBeReceived) { connectionAttribute.inComing.MessageToBeReceived.next(response) } }); call.on('error', (err) => { console.error(err) resolve('') }); }) } // THis is no longer necesarry after the introduction of connection Attribute. But it is still useful for checking for the other side's health public async checkConnectionHealth(client: any, statusControl: Subject, alreadyHealthCheck: boolean): Promise { return new Promise((resolve, reject) => { client.Check({}, (error, response) => { if (response) { console.log(`GRPC Health check status: ${response.status} Server Connected`); // Intepret the response status and implement code logic or handler resolve(response.status) } else { if (alreadyHealthCheck == false) console.error(`Health check failed: ${error}`); reject(false) } }) }) } private isConnectionAttribute(obj: any): obj is ConnectionAttribute { const isMatch = ( typeof obj.ConnectionID === 'object' && // Further checks can be added based on the structure of ConnectionID isStreamAttribute(obj.outGoing) && isStreamAttribute(obj.inComing) && (obj.connectionStatus === null || obj.connectionStatus instanceof Subject) ); if (isMatch) { console.log('gRPC client call matches ConnectionAttribute type'); } else { console.log('gRPC client call does not match ConnectionAttribute type'); } return isMatch; function isStreamAttribute(obj: any): obj is StreamAttribute { return ( (typeof obj.StreamID === 'string' || obj.StreamID === undefined) && (typeof obj.PublisherID === 'string' || obj.PublisherID === undefined) && (typeof obj.SubscriberID === 'string' || obj.SubscriberID === undefined) && // Check other properties like PublisherInstance and SubscriberInstance based on their expected types (typeof obj.serverUrl === 'string' || obj.serverUrl === undefined) && // Check connectionState based on its type, assuming it's an enum or similar (obj.MessageToBePublished === null || obj.MessageToBePublished instanceof Observable) && (obj.MessageToBeReceived === null || obj.MessageToBeReceived instanceof Subject) ); } } } // https://github.com/grpc/proposal/blob/master/L5-node-client-interceptors.md