grpc.service.method.ts 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. import * as grpc from '@grpc/grpc-js';
  2. import { Subject, Subscription } from "rxjs";
  3. import { ReportStatus, ColorCode, Message, MessageLog, ConnectionAttribute, ConnectionRequest, GrpcConnectionType } from "../interfaces/general.interface";
  4. import { Status } from '@grpc/grpc-js/build/src/constants';
  5. import { v4 as uuidv4 } from 'uuid'
  6. import { message_proto } from './protos/server.proto'
  7. export class GrpcServiceMethod {
  8. public async create(request: ConnectionRequest, connectionAttribute: ConnectionAttribute): Promise<any> {
  9. // Assuming currently only one client
  10. this.createGrpcInstance(request.server.serverUrl, { instanceType: 'server' }, connectionAttribute)
  11. this.createGrpcInstance(request.client.targetServer, { instanceType: 'client' }, connectionAttribute)
  12. }
  13. private async generateAdditionalAttributes(connectionAttribute: ConnectionAttribute, clientInfo?: any, localInfo?: any) {
  14. if (clientInfo) {
  15. connectionAttribute.inComing.ChannelID = clientInfo.channelID
  16. connectionAttribute.inComing.PublisherID = clientInfo.publisherID
  17. connectionAttribute.inComing.SubscriberID = clientInfo.subscriberID
  18. // let report: any = {
  19. // message: 'Remote Server Communication Established',
  20. // channelID: clientInfo.channelID
  21. // }
  22. // connectionAttribute.connectionStatus.next(report)
  23. }
  24. if (localInfo) {
  25. connectionAttribute.outGoing.ChannelID = localInfo.channelID
  26. connectionAttribute.outGoing.PublisherID = localInfo.publisherID
  27. connectionAttribute.outGoing.SubscriberID = localInfo.subscriberID
  28. // let report: any = {
  29. // message: 'Local Server Communication Established',
  30. // channelID: localInfo.channelID
  31. // }
  32. // connectionAttribute.connectionStatus.next(report)
  33. }
  34. if (connectionAttribute.outGoing.ChannelID && connectionAttribute.inComing.ChannelID) {
  35. connectionAttribute.ConnectionID.local = connectionAttribute.outGoing.ChannelID + connectionAttribute.inComing.ChannelID
  36. connectionAttribute.ConnectionID.remote = connectionAttribute.inComing.ChannelID + connectionAttribute.outGoing.ChannelID
  37. let report: ReportStatus = {
  38. code: ColorCode.GREEN,
  39. message: `ConnectionID acquired. Informing Restranmission to release Messages...`,
  40. }
  41. connectionAttribute.connectionStatus.next(report)
  42. console.log(connectionAttribute)
  43. }
  44. }
  45. // To be migrated into a service in the immediate future
  46. private async createGrpcInstance(
  47. serverUrl: string,
  48. grpcType: GrpcConnectionType,
  49. connectionAttribute: ConnectionAttribute,
  50. ) {
  51. let statusControl: Subject<ReportStatus> = connectionAttribute.connectionStatus
  52. let consecutiveResolutions = 0;
  53. let lastResolutionTime = Date.now();
  54. let alreadyHealthCheck: boolean = false
  55. let yellowErrorEmission: boolean = false
  56. let redErrorEmission: boolean = false
  57. while (true) {
  58. try {
  59. let recreatePromise = new Promise((resolve) => {
  60. if (grpcType.instanceType == 'server') {
  61. this.createServerStreamingServer(serverUrl, connectionAttribute).then(() => {
  62. resolve('recreate')
  63. })
  64. }
  65. if (grpcType.instanceType == 'client') {
  66. this.createServerStreamingClient(serverUrl, alreadyHealthCheck, connectionAttribute).then(() => {
  67. resolve('recreate')
  68. })
  69. }
  70. })
  71. await recreatePromise
  72. // If connection resolves (indicating failure), increment the count
  73. consecutiveResolutions++;
  74. // console.log(`Reconnection Attempt: ${consecutiveResolutions}`)
  75. alreadyHealthCheck = true
  76. if (redErrorEmission == false) {
  77. redErrorEmission = true
  78. // console.error(`Connection failed ${consecutiveResolutions} times. Stopping connection attempts.`);
  79. let error: ReportStatus = {
  80. code: ColorCode.RED,
  81. message: 'Server is not responding. Proceed to buffer.',
  82. }
  83. statusControl.next(error)
  84. }
  85. // Comment it out if Client wishes to use YELLOW for memory buffer instead of persistent storage buffer
  86. // if (consecutiveResolutions < parseInt(process.env.ReconnectionAttempt as string) && yellowErrorEmission == false) {
  87. // yellowErrorEmission = true
  88. // let error: ReportStatus = {
  89. // code: ColorCode.YELLOW,
  90. // // message: `Reconnection Attempt: ${consecutiveResolutions}. Server has yet to respond`
  91. // message: `Attempting reconnection... Server has yet to respond`,
  92. // }
  93. // statusControl.next(error);
  94. // }
  95. } catch (error) {
  96. // Connection did not resolve, reset the count
  97. consecutiveResolutions = 0;
  98. console.error('Connection attempt failed:', error);
  99. }
  100. // Check for a pause of more than 3 seconds since the last resolution attempt
  101. const currentTime = Date.now();
  102. const timeSinceLastResolution = currentTime - lastResolutionTime;
  103. if (timeSinceLastResolution > 2000) {
  104. consecutiveResolutions = 0;
  105. yellowErrorEmission = false
  106. redErrorEmission = false
  107. alreadyHealthCheck = false
  108. }
  109. // Update the last resolution time
  110. lastResolutionTime = currentTime;
  111. await new Promise(resolve => setTimeout(resolve, 1000)); // Wait for 1 second before the next attempt
  112. // timeout generate message to trigger this reconnection
  113. }
  114. }
  115. // Create Server Instance to stream all application Outgoing messages
  116. public async createServerStreamingServer(
  117. serverUrl: string,
  118. connectionAttribute: ConnectionAttribute
  119. ): Promise<any> { // '0.0.0.0:3001'
  120. return new Promise((resolve, reject) => {
  121. try {
  122. // https://github.com/grpc/proposal/blob/master/L5-node-client-interceptors.md
  123. let server: grpc.Server = new grpc.Server();
  124. server.addService(message_proto.Message.service, {
  125. HandleMessage: (call) => {
  126. // Assign channel uuid
  127. let clientInfo = JSON.parse(call.request.message)
  128. this.generateAdditionalAttributes(connectionAttribute, clientInfo)
  129. console.log(`Initializing stream. Opening Channel... Confirmation from ${call.request.id}`)
  130. if (connectionAttribute.outGoing.MessageToBePublished) {
  131. let subscription: Subscription = connectionAttribute.outGoing.MessageToBePublished.subscribe({
  132. next: (response: Message) => {
  133. console.log(`Sending ${(response.message as MessageLog).appData.msgId}`)
  134. let message = {
  135. id: response.id,
  136. message: JSON.stringify(response.message)
  137. }
  138. call.write(message)
  139. },
  140. error: err => {
  141. console.error(err)
  142. subscription.unsubscribe()
  143. resolve('')
  144. },
  145. complete: () => {
  146. console.log(`Stream response completed for ${call.request.id}`)
  147. subscription.unsubscribe()
  148. resolve('')
  149. }
  150. })
  151. }
  152. },
  153. Check: (_, callback) => {
  154. // for now it is just sending the status message over to tell the client it is alive
  155. // For simplicity, always return "SERVING" as status
  156. callback(null, { status: 'SERVING' });
  157. },
  158. });
  159. // Bind and start the server
  160. server.bindAsync(serverUrl, grpc.ServerCredentials.createInsecure(), () => {
  161. console.log(`gRPC server is running on ${serverUrl}`);
  162. server.start();
  163. });
  164. }
  165. catch (error) {
  166. resolve(error)
  167. }
  168. })
  169. }
  170. // Send a request over to the other server to open a channel for this server to emit/stream messages over
  171. public async createServerStreamingClient(
  172. server: string,
  173. alreadyHealthCheck: boolean,
  174. connectionAttribute: ConnectionAttribute
  175. ): Promise<string> {
  176. return new Promise(async (resolve, reject) => {
  177. const client = new message_proto.Message(server, grpc.credentials.createInsecure());
  178. // perform check to see if server is alive, if not terminate this grpc instant and create again
  179. let outGoingInfo: any = {
  180. channelID: uuidv4(),
  181. publisherID: uuidv4(),
  182. subscriberID: uuidv4()
  183. }
  184. this.generateAdditionalAttributes(connectionAttribute, {}, outGoingInfo)
  185. // connectionAttribute.ConnectionID = connectionAttribute.outGoing.ChannelID + (connectionAttribute.inComing.ChannelID ?? 'undefined')
  186. let call = client.HandleMessage({ id: server, message: JSON.stringify(outGoingInfo) })
  187. console.log(`Sending request to ${server} to open response channel...`)
  188. call.on('status', (status: Status) => {
  189. if (status == grpc.status.OK) { // only returns a status when there's error. Otherwise it just waits
  190. console.log(`Message trasmission operation is successful`)
  191. // RPC completed successfully
  192. } if (status == grpc.status.UNAVAILABLE) {
  193. let report: ReportStatus = {
  194. code: ColorCode.RED,
  195. message: `Server doesn't seem to be alive. Error returned.`,
  196. }
  197. connectionAttribute.connectionStatus.next(report)
  198. resolve('No connection established. Server is not responding..')
  199. }
  200. });
  201. call.on('data', (data: any) => {
  202. let response: Message = {
  203. id: data.id,
  204. message: JSON.parse(data.message)
  205. }
  206. if (connectionAttribute.inComing.MessageToBeReceived) {
  207. connectionAttribute.inComing.MessageToBeReceived.next(response)
  208. }
  209. console.log((response.message as MessageLog).appData.msgId)
  210. });
  211. call.on('error', (err) => {
  212. resolve('')
  213. });
  214. })
  215. }
  216. // THis is no longer necesarry after the introduction of connection Attribute. But it is still useful for checking for the other side's health
  217. public async checkConnectionHealth(client: any, statusControl: Subject<ReportStatus>, alreadyHealthCheck: boolean): Promise<boolean> {
  218. return new Promise((resolve, reject) => {
  219. client.Check({}, (error, response) => {
  220. if (response) {
  221. console.log(`GRPC Health check status: ${response.status} Server Connected`);
  222. // Intepret the response status and implement code logic or handler
  223. resolve(response.status)
  224. } else {
  225. if (alreadyHealthCheck == false) console.error(`Health check failed: ${error}`);
  226. reject(false)
  227. }
  228. })
  229. })
  230. }
  231. }