123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120 |
- import { Subject, Subscription, take, takeUntil } from 'rxjs';
- import { ColorCode, GrpcConnectionType, Message, ReportStatus } from '../interfaces/general.interface';
- import { GrpcServiceMethod } from './service.method';
/**
 * Creates gRPC client/server instances through `GrpcServiceMethod`, keeps
 * re-creating them whenever the underlying call settles, and reports
 * connection health on a caller-supplied status subject.
 */
export class GrpcService {
  /**
   * A loop iteration that took longer than this (in ms) is treated as a
   * connection that was alive for a while, so the failure bookkeeping is reset.
   */
  private static readonly STABLE_CONNECTION_MS = 2000;

  /** Pause (in ms) between two consecutive connection attempts. */
  private static readonly RETRY_DELAY_MS = 1000;

  // Server handles keyed by server URL. The object is handed to
  // `createServerStreamingServer`, which presumably registers the server
  // it creates in here -- confirm against GrpcServiceMethod.
  private grpcServerConnection: any = {};

  // Shared stream handed to the streaming client so it can publish what it receives.
  private incomingMessage: Subject<any> = new Subject();

  constructor(private grpcServiceMethod: GrpcServiceMethod) { }

  /**
   * @returns the subject on which messages received by client instances are published.
   */
  public getIncomingMessage(): Subject<Message> {
    return this.incomingMessage;
  }

  /**
   * Force-shuts-down the server registered under `serverUrl` and removes it
   * from the connection registry.
   *
   * @param serverUrl key of the server in the connection registry.
   * @returns whatever the server's `forceShutdown()` returns.
   * @throws Error (as a rejected promise) when no server is registered under `serverUrl`.
   */
  public async stopServer(serverUrl: string): Promise<any> {
    const server = this.grpcServerConnection[serverUrl];
    if (!server) {
      console.log(`Server${serverUrl} is not running.`);
      // Reject with a real Error so callers get a message and a stack trace
      // instead of `undefined`.
      throw new Error(`gRPC server ${serverUrl} is not running.`);
    }

    console.log(`Shutting down the gRPC server:${serverUrl} ...`);
    // A graceful `tryShutdown` was used here previously; the server is now
    // forced down immediately.
    const shutdownResult = server.forceShutdown();
    console.log(`Server ${serverUrl} is forced to shut down!`);

    // Deleting the entry only drops our reference to the server; the actual
    // shutdown is the `forceShutdown()` call above.
    console.log(`Deleting grpc connection instance:${serverUrl} .....`);
    delete this.grpcServerConnection[serverUrl];
    return shutdownResult;
  }

  /**
   * Logs and returns the live connection registry (not a copy).
   */
  public getAllGrpcServerConnectionInstance(): any {
    console.log(this.grpcServerConnection);
    return this.grpcServerConnection;
  }

  /**
   * Creates the requested gRPC instance and keeps re-creating it forever.
   *
   * Every time the underlying create call resolves, it is counted as a failed
   * connection. A YELLOW status is emitted once while the count is below the
   * configured limit, and a RED status once when the limit is reached. The
   * flags and counter are reset when an iteration lasted longer than
   * `STABLE_CONNECTION_MS`.
   *
   * NOTE: this method never returns; the loop keeps retrying even after the
   * RED status has been emitted.
   *
   * To be migrated into a service in the immediate future.
   *
   * @param serverUrl address of the server to connect to / to host.
   * @param reportStatus subject that receives the YELLOW / RED status reports.
   * @param connectionType selects client vs. server and the streaming method.
   * @param messageToBePublished outgoing messages for a server instance;
   *        a fresh subject is used when omitted.
   */
  public async createGrpcInstance(serverUrl: string, reportStatus: Subject<ReportStatus>, connectionType: GrpcConnectionType, messageToBePublished?: Subject<Message>) {
    const messageToBeTransmitted: Subject<Message> = messageToBePublished ?? new Subject();
    const statusControl: Subject<ReportStatus> = reportStatus;
    let consecutiveResolutions = 0;
    let lastResolutionTime = Date.now();
    let alreadyHealthCheck = false;
    let yellowErrorEmission = false;
    let redErrorEmission = false;

    while (true) {
      try {
        if (connectionType.instanceType === 'client' && connectionType.serviceMethod === 'server streaming') {
          await this.grpcServiceMethod.createServerStreamingClient(serverUrl, alreadyHealthCheck, statusControl, this.incomingMessage);
        }
        if (connectionType.instanceType === 'server' && connectionType.serviceMethod === 'server streaming') {
          await this.grpcServiceMethod.createServerStreamingServer(serverUrl, this.grpcServerConnection, messageToBeTransmitted);
        }
        /* To be enabled again if there's a need for bidirectional streaming */
        // if (connectionType.instanceType == 'client' && connectionType.serviceMethod == 'bidirectional') {
        //   await this.grpcServiceMethod.createBidirectionalStreamingClient(serverUrl, alreadyHealthCheck, messageToBeTransmitted, statusControl, this.incomingMessage);
        // }
        // if (connectionType.instanceType == 'server' && connectionType.serviceMethod == 'bidirectional') {
        //   await this.grpcServiceMethod.createGrpcBidirectionalServer(serverUrl, messageToBeTransmitted, statusControl, this.grpcServerConnection)
        // }

        // The create call resolving means the connection ended, so count it as a failure.
        consecutiveResolutions++;
        alreadyHealthCheck = true;

        const reconnectionLimit = this.getReconnectionLimit();

        // Limit reached: report RED once. The loop itself keeps running.
        if (consecutiveResolutions >= reconnectionLimit && !redErrorEmission) {
          redErrorEmission = true;
          console.error(`Connection failed ${consecutiveResolutions} times. Stopping connection attempts.`);
          const error: ReportStatus = {
            code: ColorCode.RED,
            message: 'Initiate Doomsday protocol....',
            from: `GRPC instance management`
          };
          statusControl.next(error);
        }

        // Still below the limit: report YELLOW once per failure streak.
        if (consecutiveResolutions < reconnectionLimit && !yellowErrorEmission) {
          yellowErrorEmission = true;
          const error: ReportStatus = {
            code: ColorCode.YELLOW,
            message: `Attempting reconnection... Server has yet to respond`,
            from: `GRPC instance management`
          };
          statusControl.next(error);
        }
      } catch (error) {
        // The create call rejected instead of resolving: restart the count.
        consecutiveResolutions = 0;
        console.error('Connection attempt failed:', error);
      }

      // If more than STABLE_CONNECTION_MS passed since the previous iteration
      // finished, the connection was up for a while: start a new failure streak.
      const currentTime = Date.now();
      const timeSinceLastResolution = currentTime - lastResolutionTime;
      if (timeSinceLastResolution > GrpcService.STABLE_CONNECTION_MS) {
        consecutiveResolutions = 0;
        yellowErrorEmission = false;
        redErrorEmission = false;
        alreadyHealthCheck = false;
      }
      lastResolutionTime = currentTime;

      // Wait before the next attempt.
      await new Promise(resolve => setTimeout(resolve, GrpcService.RETRY_DELAY_MS));
    }
  }

  /**
   * Reads the reconnection limit from the `ReconnectionAttempt` environment
   * variable as a base-10 integer.
   *
   * An unset or non-numeric value yields `Infinity` ("no limit") instead of
   * `NaN`: with `NaN` both threshold comparisons are false and no status
   * would ever be reported.
   */
  private getReconnectionLimit(): number {
    const parsed = Number.parseInt(process.env.ReconnectionAttempt ?? '', 10);
    return Number.isNaN(parsed) ? Number.POSITIVE_INFINITY : parsed;
  }
}
|