import { Subject, Subscription, take, takeUntil } from 'rxjs'; import { ColorCode, GrpcConnectionType, Message, ReportStatus } from '../interfaces/general.interface'; import { GrpcServiceMethod } from './service.method'; export class GrpcService { private grpcServerConnection: any = {} private incomingMessage: Subject = new Subject() constructor(private grpcServiceMethod: GrpcServiceMethod) { } public getIncomingMessage(): Subject { return this.incomingMessage } public async stopServer(serverUrl: string): Promise { return new Promise((resolve, reject) => { if (this.grpcServerConnection[serverUrl]) { console.log(`Shutting down the gRPC server:${serverUrl} ...`); // this.grpcServerConnection[serverUrl].tryShutdown(() => { // console.log(`Server ${serverUrl} has been gracefully stopped.`); // resolve('') // }) resolve(this.grpcServerConnection[serverUrl].forceShutdown()) console.log(`Server ${serverUrl} is forced to shut down!`) // simply removing the reference to the GrpcService instance associated with the specific serverUrl from the grpcServerConnection object. // However, the gRPC server instance itself continues to run as long as it has not been explicitly shut down using methods like tryShutdown. console.log(`Deleting grpc connection instance:${serverUrl} .....`) delete this.grpcServerConnection[serverUrl]; } else { console.log(`Server${serverUrl} is not running.`); reject() } }) } public getAllGrpcServerConnectionInstance(): any { console.log(this.grpcServerConnection) return this.grpcServerConnection } // To be migrated into a service in the immediate future public async createGrpcInstance(serverUrl: string, reportStatus: Subject, connectionType: GrpcConnectionType, messageToBePublished?: Subject) { let messageToBeTransmitted: Subject = messageToBePublished ?? new Subject() let statusControl: Subject = reportStatus let consecutiveResolutions = 0; let lastResolutionTime = Date.now(); let alreadyHealthCheck: boolean = false let yellowErrorEmission: boolean = false let redErrorEmission: boolean = false while (true) { try { if (connectionType.instanceType == 'client' && connectionType.serviceMethod == 'server streaming') { await this.grpcServiceMethod.createServerStreamingClient(serverUrl, alreadyHealthCheck, statusControl, this.incomingMessage); } if (connectionType.instanceType == 'server' && connectionType.serviceMethod == 'server streaming') { await this.grpcServiceMethod.createServerStreamingServer(serverUrl, this.grpcServerConnection, messageToBeTransmitted) } /* To be enabled again if there's a need for bidiretional streaming */ // if (connectionType.instanceType == 'client' && connectionType.serviceMethod == 'bidirectional') { // await this.grpcServiceMethod.createBidirectionalStreamingClient(serverUrl, alreadyHealthCheck, messageToBeTransmitted, statusControl, this.incomingMessage); // } // if (connectionType.instanceType == 'server' && connectionType.serviceMethod == 'bidirectional') { // await this.grpcServiceMethod.createGrpcBidirectionalServer(serverUrl, messageToBeTransmitted, statusControl, this.grpcServerConnection) // } // If connection resolves (indicating failure), increment the count consecutiveResolutions++; // console.log(`Reconnection Attempt: ${consecutiveResolutions}`) alreadyHealthCheck = true // If there are x consecutive resolutions, log an error and break the loop if (consecutiveResolutions >= parseInt(process.env.ReconnectionAttempt as string) && redErrorEmission == false) { redErrorEmission = true console.error(`Connection failed ${consecutiveResolutions} times. Stopping connection attempts.`); let error: ReportStatus = { code: ColorCode.RED, message: 'Initiate Doomsday protocol....', from: `GRPC instance management` } statusControl.next(error) } if (consecutiveResolutions < parseInt(process.env.ReconnectionAttempt as string) && yellowErrorEmission == false) { yellowErrorEmission = true let error: ReportStatus = { code: ColorCode.YELLOW, // message: `Reconnection Attempt: ${consecutiveResolutions}. Server has yet to respond` message: `Attempting reconnection... Server has yet to respond`, from: `GRPC instance management` } statusControl.next(error); } } catch (error) { // Connection did not resolve, reset the count consecutiveResolutions = 0; console.error('Connection attempt failed:', error); } // Check for a pause of more than 3 seconds since the last resolution attempt const currentTime = Date.now(); const timeSinceLastResolution = currentTime - lastResolutionTime; if (timeSinceLastResolution > 2000) { consecutiveResolutions = 0; yellowErrorEmission = false redErrorEmission = false alreadyHealthCheck = false } // Update the last resolution time lastResolutionTime = currentTime; await new Promise(resolve => setTimeout(resolve, 1000)); // Wait for 1 second before the next attempt // timeout generate message to trigger this reconnection } } }