grpc.service.ts 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. import { Subject, Subscription, take, takeUntil } from 'rxjs';
  2. import { ColorCode, GrpcConnectionType, MessageLog, ReportStatus } from '../interfaces/general.interface';
  3. import { GrpcServiceMethod } from './service.method';
  4. export class GrpcService {
  5. private grpcServerConnection: any = {}
  6. private incomingRequest: Subject<any> = new Subject()
  7. private incomingResponse: Subject<any> = new Subject()
  8. constructor(private grpcServiceMethod: GrpcServiceMethod) { }
  9. public getIncomingRequest(): Subject<any> {
  10. return this.incomingRequest
  11. }
  12. public getIncomingResponse(): Subject<any> {
  13. return this.incomingResponse
  14. }
  15. public async stopServer(serverUrl: string): Promise<any> {
  16. return new Promise((resolve, reject) => {
  17. if (this.grpcServerConnection[serverUrl]) {
  18. console.log(`Shutting down the gRPC server:${serverUrl} ...`);
  19. // this.grpcServerConnection[serverUrl].tryShutdown(() => {
  20. // console.log(`Server ${serverUrl} has been gracefully stopped.`);
  21. // resolve('')
  22. // })
  23. resolve(this.grpcServerConnection[serverUrl].forceShutdown())
  24. console.log(`Server ${serverUrl} is forced to shut down!`)
  25. // simply removing the reference to the GrpcService instance associated with the specific serverUrl from the grpcServerConnection object.
  26. // However, the gRPC server instance itself continues to run as long as it has not been explicitly shut down using methods like tryShutdown.
  27. console.log(`Deleting grpc connection instance:${serverUrl} .....`)
  28. delete this.grpcServerConnection[serverUrl];
  29. } else {
  30. console.log(`Server${serverUrl} is not running.`);
  31. reject()
  32. }
  33. })
  34. }
  35. public getAllGrpcServerConnectionInstance(): any {
  36. console.log(this.grpcServerConnection)
  37. return this.grpcServerConnection
  38. }
  39. // To be migrated into a service in the immediate future
  40. public async createGrpcInstance(serverUrl: string, messageToBePublished: Subject<MessageLog>, reportStatus: Subject<ReportStatus>, connectionType: GrpcConnectionType) {
  41. let messageToBeTransmitted: Subject<MessageLog> = messageToBePublished
  42. let statusControl: Subject<ReportStatus> = reportStatus
  43. let consecutiveResolutions = 0;
  44. let lastResolutionTime = Date.now();
  45. let alreadyHealthCheck: boolean = false
  46. let yellowErrorEmission: boolean = false
  47. let redErrorEmission: boolean = false
  48. while (true) {
  49. try {
  50. if (connectionType.instanceType == 'client' && connectionType.serviceMethod == 'bidirectional') {
  51. await this.grpcServiceMethod.createBidirectionalStreamingClient(serverUrl, alreadyHealthCheck, messageToBeTransmitted, statusControl, this.incomingResponse);
  52. }
  53. if (connectionType.instanceType == 'client' && connectionType.serviceMethod == 'server streaming') {
  54. await this.grpcServiceMethod.createServerStreamingClient(serverUrl, alreadyHealthCheck, messageToBeTransmitted, statusControl, this.incomingResponse);
  55. }
  56. if (connectionType.instanceType == 'server' && connectionType.serviceMethod == 'bidirectional') {
  57. await this.grpcServiceMethod.createGrpcBidirectionalServer(serverUrl, messageToBeTransmitted, statusControl, this.grpcServerConnection, this.incomingRequest)
  58. }
  59. if (connectionType.instanceType == 'server' && connectionType.serviceMethod == 'server streaming') {
  60. await this.grpcServiceMethod.createServerStreamingServer(serverUrl, alreadyHealthCheck, messageToBePublished, statusControl, this.grpcServerConnection, this.incomingRequest)
  61. }
  62. // If connection resolves (indicating failure), increment the count
  63. consecutiveResolutions++;
  64. // console.log(`Reconnection Attempt: ${consecutiveResolutions}`)
  65. alreadyHealthCheck = true
  66. // If there are x consecutive resolutions, log an error and break the loop
  67. if (consecutiveResolutions >= parseInt(process.env.ReconnectionAttempt as string) && redErrorEmission == false) {
  68. redErrorEmission = true
  69. console.error(`Connection failed ${consecutiveResolutions} times. Stopping connection attempts.`);
  70. let error: ReportStatus = {
  71. code: ColorCode.RED,
  72. message: 'Initiate Doomsday protocol....',
  73. from: `GRPC instance management`
  74. }
  75. statusControl.next(error)
  76. }
  77. if (consecutiveResolutions < parseInt(process.env.ReconnectionAttempt as string) && yellowErrorEmission == false) {
  78. yellowErrorEmission = true
  79. let error: ReportStatus = {
  80. code: ColorCode.YELLOW,
  81. // message: `Reconnection Attempt: ${consecutiveResolutions}. Server has yet to respond`
  82. message: `Attempting reconnection... Server has yet to respond`,
  83. from: `GRPC instance management`
  84. }
  85. statusControl.next(error);
  86. }
  87. } catch (error) {
  88. // Connection did not resolve, reset the count
  89. consecutiveResolutions = 0;
  90. console.error('Connection attempt failed:', error);
  91. }
  92. // Check for a pause of more than 3 seconds since the last resolution attempt
  93. const currentTime = Date.now();
  94. const timeSinceLastResolution = currentTime - lastResolutionTime;
  95. if (timeSinceLastResolution > 2000) {
  96. consecutiveResolutions = 0;
  97. yellowErrorEmission = false
  98. redErrorEmission = false
  99. alreadyHealthCheck = false
  100. }
  101. // Update the last resolution time
  102. lastResolutionTime = currentTime;
  103. await new Promise(resolve => setTimeout(resolve, 1000)); // Wait for 1 second before the next attempt
  104. // timeout generate message to trigger this reconnection
  105. }
  106. }
  107. }