grpc.service.ts 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. import { Subject, Subscription, take, takeUntil } from 'rxjs';
  2. import { ColorCode, GrpcConnectionType, Message, ReportStatus } from '../interfaces/general.interface';
  3. import { GrpcServiceMethod } from './service.method';
  4. export class GrpcService {
  5. private grpcServerConnection: any = {}
  6. private incomingMessage: Subject<any> = new Subject()
  7. constructor(private grpcServiceMethod: GrpcServiceMethod) { }
  8. public getIncomingMessage(): Subject<Message> {
  9. return this.incomingMessage
  10. }
  11. public async stopServer(serverUrl: string): Promise<any> {
  12. return new Promise((resolve, reject) => {
  13. if (this.grpcServerConnection[serverUrl]) {
  14. console.log(`Shutting down the gRPC server:${serverUrl} ...`);
  15. // this.grpcServerConnection[serverUrl].tryShutdown(() => {
  16. // console.log(`Server ${serverUrl} has been gracefully stopped.`);
  17. // resolve('')
  18. // })
  19. resolve(this.grpcServerConnection[serverUrl].forceShutdown())
  20. console.log(`Server ${serverUrl} is forced to shut down!`)
  21. // simply removing the reference to the GrpcService instance associated with the specific serverUrl from the grpcServerConnection object.
  22. // However, the gRPC server instance itself continues to run as long as it has not been explicitly shut down using methods like tryShutdown.
  23. console.log(`Deleting grpc connection instance:${serverUrl} .....`)
  24. delete this.grpcServerConnection[serverUrl];
  25. } else {
  26. console.log(`Server${serverUrl} is not running.`);
  27. reject()
  28. }
  29. })
  30. }
  31. public getAllGrpcServerConnectionInstance(): any {
  32. console.log(this.grpcServerConnection)
  33. return this.grpcServerConnection
  34. }
  35. // To be migrated into a service in the immediate future
  36. public async createGrpcInstance(serverUrl: string, reportStatus: Subject<ReportStatus>, connectionType: GrpcConnectionType, messageToBePublished?: Subject<Message>) {
  37. let messageToBeTransmitted: Subject<Message> = messageToBePublished ?? new Subject()
  38. let statusControl: Subject<ReportStatus> = reportStatus
  39. let consecutiveResolutions = 0;
  40. let lastResolutionTime = Date.now();
  41. let alreadyHealthCheck: boolean = false
  42. let yellowErrorEmission: boolean = false
  43. let redErrorEmission: boolean = false
  44. while (true) {
  45. try {
  46. if (connectionType.instanceType == 'client' && connectionType.serviceMethod == 'server streaming') {
  47. await this.grpcServiceMethod.createServerStreamingClient(serverUrl, alreadyHealthCheck, statusControl, this.incomingMessage);
  48. }
  49. if (connectionType.instanceType == 'server' && connectionType.serviceMethod == 'server streaming') {
  50. await this.grpcServiceMethod.createServerStreamingServer(serverUrl, this.grpcServerConnection, messageToBeTransmitted)
  51. }
  52. /* To be enabled again if there's a need for bidiretional streaming */
  53. // if (connectionType.instanceType == 'client' && connectionType.serviceMethod == 'bidirectional') {
  54. // await this.grpcServiceMethod.createBidirectionalStreamingClient(serverUrl, alreadyHealthCheck, messageToBeTransmitted, statusControl, this.incomingMessage);
  55. // }
  56. // if (connectionType.instanceType == 'server' && connectionType.serviceMethod == 'bidirectional') {
  57. // await this.grpcServiceMethod.createGrpcBidirectionalServer(serverUrl, messageToBeTransmitted, statusControl, this.grpcServerConnection)
  58. // }
  59. // If connection resolves (indicating failure), increment the count
  60. consecutiveResolutions++;
  61. // console.log(`Reconnection Attempt: ${consecutiveResolutions}`)
  62. alreadyHealthCheck = true
  63. // If there are x consecutive resolutions, log an error and break the loop
  64. if (consecutiveResolutions >= parseInt(process.env.ReconnectionAttempt as string) && redErrorEmission == false) {
  65. redErrorEmission = true
  66. console.error(`Connection failed ${consecutiveResolutions} times. Stopping connection attempts.`);
  67. let error: ReportStatus = {
  68. code: ColorCode.RED,
  69. message: 'Initiate Doomsday protocol....',
  70. from: `GRPC instance management`
  71. }
  72. statusControl.next(error)
  73. }
  74. if (consecutiveResolutions < parseInt(process.env.ReconnectionAttempt as string) && yellowErrorEmission == false) {
  75. yellowErrorEmission = true
  76. let error: ReportStatus = {
  77. code: ColorCode.YELLOW,
  78. // message: `Reconnection Attempt: ${consecutiveResolutions}. Server has yet to respond`
  79. message: `Attempting reconnection... Server has yet to respond`,
  80. from: `GRPC instance management`
  81. }
  82. statusControl.next(error);
  83. }
  84. } catch (error) {
  85. // Connection did not resolve, reset the count
  86. consecutiveResolutions = 0;
  87. console.error('Connection attempt failed:', error);
  88. }
  89. // Check for a pause of more than 3 seconds since the last resolution attempt
  90. const currentTime = Date.now();
  91. const timeSinceLastResolution = currentTime - lastResolutionTime;
  92. if (timeSinceLastResolution > 2000) {
  93. consecutiveResolutions = 0;
  94. yellowErrorEmission = false
  95. redErrorEmission = false
  96. alreadyHealthCheck = false
  97. }
  98. // Update the last resolution time
  99. lastResolutionTime = currentTime;
  100. await new Promise(resolve => setTimeout(resolve, 1000)); // Wait for 1 second before the next attempt
  101. // timeout generate message to trigger this reconnection
  102. }
  103. }
  104. }