grpc.service.ts 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470
  1. import * as grpc from '@grpc/grpc-js';
  2. import { Subject, Subscription, take, takeUntil } from 'rxjs';
  3. import { ColorCode, GrpcConnectionType, MessageLog, ReportStatus } from '../interfaces/general.interface';
  4. import { Status } from '@grpc/grpc-js/build/src/constants';
  5. const message_proto = require('./protos/server.proto')
  6. export class GrpcService {
  7. private grpcServerConnection: any = {}
  8. private incomingRequest: Subject<any> = new Subject()
  9. private incomingResponse: Subject<any> = new Subject()
  10. constructor() { }
  11. public getIncomingRequest(): Subject<any> {
  12. return this.incomingRequest
  13. }
  14. public getIncomingResponse(): Subject<any> {
  15. return this.incomingResponse
  16. }
  17. public async stopServer(serverUrl: string): Promise<any> {
  18. return new Promise((resolve, reject) => {
  19. if (this.grpcServerConnection[serverUrl]) {
  20. console.log(`Shutting down the gRPC server:${serverUrl} ...`);
  21. // this.grpcServerConnection[serverUrl].tryShutdown(() => {
  22. // console.log(`Server ${serverUrl} has been gracefully stopped.`);
  23. // resolve('')
  24. // })
  25. resolve(this.grpcServerConnection[serverUrl].forceShutdown())
  26. console.log(`Server ${serverUrl} is forced to shut down!`)
  27. // simply removing the reference to the GrpcService instance associated with the specific serverUrl from the grpcServerConnection object.
  28. // However, the gRPC server instance itself continues to run as long as it has not been explicitly shut down using methods like tryShutdown.
  29. console.log(`Deleting grpc connection instance:${serverUrl} .....`)
  30. delete this.grpcServerConnection[serverUrl];
  31. } else {
  32. console.log(`Server${serverUrl} is not running.`);
  33. reject()
  34. }
  35. })
  36. }
  37. public getAllGrpcServerConnectionInstance(): any {
  38. console.log(this.grpcServerConnection)
  39. return this.grpcServerConnection
  40. }
  41. // To be migrated into a service in the immediate future
  42. public async createGrpcInstance(serverUrl: string, messageToBePublished: Subject<MessageLog>, reportStatus: Subject<ReportStatus>, connectionType: GrpcConnectionType) {
  43. let messageToBeTransmitted: Subject<MessageLog> = messageToBePublished
  44. let statusControl: Subject<ReportStatus> = reportStatus
  45. let consecutiveResolutions = 0;
  46. let lastResolutionTime = Date.now();
  47. let alreadyHealthCheck: boolean = false
  48. let yellowErrorEmission: boolean = false
  49. let redErrorEmission: boolean = false
  50. while (true) {
  51. try {
  52. if (connectionType.instanceType == 'client' && connectionType.serviceMethod == 'bidirectional') {
  53. await this.createBidirectionalStreamingClient(serverUrl, alreadyHealthCheck, messageToBeTransmitted, statusControl);
  54. }
  55. if (connectionType.instanceType == 'client' && connectionType.serviceMethod == 'server streaming') {
  56. await this.createServerStreamingClient(serverUrl, alreadyHealthCheck, messageToBeTransmitted, statusControl);
  57. }
  58. if (connectionType.instanceType == 'server' && connectionType.serviceMethod == 'bidirectional') {
  59. await this.createGrpcBidirectionalServer(serverUrl, messageToBeTransmitted, statusControl)
  60. }
  61. if (connectionType.instanceType == 'server' && connectionType.serviceMethod == 'server streaming') {
  62. await this.createServerStreamingServer(serverUrl, alreadyHealthCheck, messageToBePublished, statusControl)
  63. }
  64. // If connection resolves (indicating failure), increment the count
  65. consecutiveResolutions++;
  66. // console.log(`Reconnection Attempt: ${consecutiveResolutions}`)
  67. alreadyHealthCheck = true
  68. // If there are x consecutive resolutions, log an error and break the loop
  69. if (consecutiveResolutions >= parseInt(process.env.ReconnectionAttempt as string) && redErrorEmission == false) {
  70. redErrorEmission = true
  71. console.error(`Connection failed ${consecutiveResolutions} times. Stopping connection attempts.`);
  72. let error: ReportStatus = {
  73. code: ColorCode.RED,
  74. message: 'Initiate Doomsday protocol....',
  75. from: `GRPC instance management`
  76. }
  77. statusControl.next(error)
  78. }
  79. if (consecutiveResolutions < parseInt(process.env.ReconnectionAttempt as string) && yellowErrorEmission == false) {
  80. yellowErrorEmission = true
  81. let error: ReportStatus = {
  82. code: ColorCode.YELLOW,
  83. // message: `Reconnection Attempt: ${consecutiveResolutions}. Server has yet to respond`
  84. message: `Attempting reconnection... Server has yet to respond`,
  85. from: `GRPC instance management`
  86. }
  87. statusControl.next(error);
  88. }
  89. } catch (error) {
  90. // Connection did not resolve, reset the count
  91. consecutiveResolutions = 0;
  92. console.error('Connection attempt failed:', error);
  93. }
  94. // Check for a pause of more than 3 seconds since the last resolution attempt
  95. const currentTime = Date.now();
  96. const timeSinceLastResolution = currentTime - lastResolutionTime;
  97. if (timeSinceLastResolution > 2000) {
  98. consecutiveResolutions = 0;
  99. yellowErrorEmission = false
  100. redErrorEmission = false
  101. alreadyHealthCheck = false
  102. }
  103. // Update the last resolution time
  104. lastResolutionTime = currentTime;
  105. await new Promise(resolve => setTimeout(resolve, 1000)); // Wait for 1 second before the next attempt
  106. // timeout generate message to trigger this reconnection
  107. }
  108. }
  109. private async createGrpcBidirectionalServer(serverUrl: string, messageToBeStream: Subject<any>, statusControl: Subject<ReportStatus>): Promise<any> { // '0.0.0.0:3001'
  110. return new Promise((resolve, reject) => {
  111. try {
  112. // https://github.com/grpc/proposal/blob/master/L5-node-client-interceptors.md
  113. let server: grpc.Server = new grpc.Server();
  114. // Add the streamingData function to the gRPC service
  115. // Define your message_proto.Message service methods
  116. server.addService(message_proto.Message.service, {
  117. sendMessageStream: (call) => {
  118. console.log(`Client connected from: ${call.getPeer()}`);
  119. let report: ReportStatus = {
  120. code: ColorCode.GREEN,
  121. message: `Client connected!!`,
  122. from: `Bidirectional Instance`
  123. }
  124. statusControl.next(report)
  125. // Right now this is being broadcast.
  126. let subscription: Subscription = messageToBeStream.subscribe({
  127. next: (payload: any) => {
  128. let noConnection = call.cancelled // check connection for each and every message
  129. if (noConnection === true) { // that means there's no connection, beccause the cancel operation is determined to check
  130. let report: ReportStatus = {
  131. code: ColorCode.YELLOW,
  132. message: `Client is not alive.....`,
  133. payload: payload,
  134. from: `Bidirectional Instance`
  135. }
  136. statusControl.next(report) // no connection. Tell buffer service to stop releasing messages
  137. subscription.unsubscribe() // i still dont understand why i wrote this here
  138. } else {
  139. console.log(`Sending ${payload.appData.msgId}`)
  140. let message: string = JSON.stringify(payload)
  141. call.write({ message })
  142. }
  143. },
  144. error: err => console.error(err),
  145. complete: () => { } //it will never complete
  146. })
  147. call.on('data', (data: any) => {
  148. // console.log(data) // it does return in string format
  149. let payload = JSON.parse(data.message)
  150. console.log(`Received Message from Client: ${payload.appData?.msgId}`);
  151. // Forward the received message to the RxJS subject
  152. // let respmsg: any = {
  153. // msgId: payload.appData?.msgId,
  154. // confirmationMessage: `Message ${payload.appData?.msgId} acknowledged!`
  155. // }
  156. // let message: string = JSON.stringify(respmsg)
  157. // console.log(`Responding to client: ${respmsg.msgId}`);
  158. // Note: The parameter here MUST BE STRICTLY be the same letter as defined in proto. Eg: message MessageRequest { string >>'message'<< = 1 }
  159. // call.write({ message });
  160. });
  161. call.on('end', () => {
  162. console.log('Client stream ended');
  163. // but the stream never ends. THis is not a reliable way to tell if a client is disconnected
  164. });
  165. call.on('error', (err) => {
  166. // Error that may occue during the rpc call. Id there's an error, put a callbacn function there to check the connection for client
  167. // emit a yellow report to halt message release. If the server does not reply to the callback function, then emit a red card
  168. // the call back function will be to write and then the client should response immediately through test
  169. });
  170. call.on('close', () => {
  171. console.log('Unknown cause for diconnectivity');
  172. // Handle client closure, which may be due to errors or manual termination
  173. });
  174. },
  175. Check: (_, callback) => {
  176. // health check logic here
  177. // for now it is just sending the status message over to tell the client it is alive
  178. // For simplicity, always return "SERVING" as status
  179. callback(null, { status: 'SERVING' });
  180. },
  181. });
  182. // Bind and start the server
  183. server.bindAsync(serverUrl, grpc.ServerCredentials.createInsecure(), () => {
  184. console.log(`gRPC server is running on ${serverUrl}`);
  185. server.start();
  186. });
  187. this.grpcServerConnection[serverUrl] = server
  188. }
  189. catch (error) {
  190. resolve(error)
  191. }
  192. })
  193. }
  194. private async createBidirectionalStreamingClient(server: string, alreadyHealthCheck: boolean, messageToBeTransmitted: Subject<any>, statusControl: Subject<ReportStatus>): Promise<string> {
  195. let subscription: any
  196. let unsubscribed: boolean = false
  197. return new Promise(async (resolve, reject) => {
  198. const client = new message_proto.Message(server, grpc.credentials.createInsecure());
  199. const call = client.sendMessageStream();
  200. this.checkConnectionHealth(client, statusControl, alreadyHealthCheck)
  201. call.on('status', (status: Status) => { // this is useless in streaming(on for unary)
  202. // console.log(status) // For more info: https://grpc.github.io/grpc/core/md_doc_statuscodes.html
  203. // https://grpc.io/docs/what-is-grpc/core-concepts/#streaming
  204. // if (status == grpc.status.UNAVAILABLE) { // only returns a status when there's error. Otherwise it just waits
  205. // resolve('No connection established. Server is not responding..')
  206. // }
  207. });
  208. // All the grpc operations are here
  209. // Subscribe to the RxJS subject to send data to the server
  210. subscription = messageToBeTransmitted.subscribe({
  211. next: (payload: any) => {
  212. if (!unsubscribed) {
  213. console.log(`Sending ${payload.appData.msgId}`)
  214. let message: string = JSON.stringify(payload)
  215. call.write({ message })
  216. }
  217. },
  218. error: err => console.error(err),
  219. complete: () => { } //it will never complete
  220. });
  221. call.on('data', (data: any) => {
  222. let message = JSON.parse(data.message)
  223. console.log(`Received message from Server: ${message.msgId ?? message.appData?.msgId ?? `Invalid`}`);
  224. });
  225. call.on('error', (err) => {
  226. // console.log(`Something wrong with RPC call...`)
  227. if (!unsubscribed && subscription) { // kill subcription to prevent memory leaks
  228. subscription.unsubscribe();
  229. unsubscribed = true;
  230. }
  231. resolve('Server Error');
  232. });
  233. call.on('end', () => {
  234. if (!unsubscribed && subscription) { // kill subcription to prevent memory leaks
  235. subscription.unsubscribe();
  236. unsubscribed = true;
  237. }
  238. resolve('Server Error');
  239. });
  240. })
  241. }
  242. private async createServerStreamingServer(serverUrl: string, alreadyHealthCheck: boolean, messageToBeStream: Subject<any>, statusControl: Subject<ReportStatus>): Promise<any> { // '0.0.0.0:3001'
  243. return new Promise((resolve, reject) => {
  244. try {
  245. // https://github.com/grpc/proposal/blob/master/L5-node-client-interceptors.md
  246. let server: grpc.Server = new grpc.Server();
  247. // Add the streamingData function to the gRPC service
  248. // Define your message_proto.Message service methods
  249. server.addService(message_proto.Message.service, {
  250. HandleMessage: (call) => { // this is for bidirectional streaming. Need to have another one for unary calls for web clients
  251. let report: ReportStatus = { //let the flow come through
  252. code: ColorCode.GREEN,
  253. message: `Client connected!!`,
  254. from: `Server Streaming Instance`
  255. }
  256. statusControl.next(report)
  257. let request = call.request // unary request from client to be responded with a stream
  258. console.log(`Received unary call.... request: ${request.id}`)
  259. this.incomingRequest.next(request)
  260. console.log(`Client connected from: ${call.getPeer()}`);
  261. let subscription: Subscription = messageToBeStream.subscribe({
  262. next: (response: any) => {
  263. if (response.id == request.id) {
  264. // console.log(`${response.id} vs ${request.id}`)
  265. // Check who's response it belongs to
  266. let noConnection = call.cancelled // check connection for each and every message
  267. if (noConnection === true) { // that means there's no connection, beccause the cancel operation is determined to check
  268. let report: ReportStatus = {
  269. code: ColorCode.YELLOW,
  270. message: `Client is not alive.....`,
  271. payload: response,
  272. from: `Server Streaming Instance`
  273. }
  274. statusControl.next(report)
  275. subscription.unsubscribe()
  276. } else {
  277. console.log(`Sending ${response.message.appData.msgId} in respond to unary ${request.id}`)
  278. // let respond: string = JSON.stringify(response.message)
  279. let message = {
  280. id: response.id,
  281. message: JSON.stringify(response.message)
  282. }
  283. // console.log(message)
  284. call.write(message)
  285. }
  286. }
  287. },
  288. error: err => {
  289. console.error(err)
  290. let report: ReportStatus = {
  291. code: ColorCode.YELLOW,
  292. message: `Message streaming error`,
  293. from: `Server Streaming Instance`
  294. }
  295. statusControl.next(report)
  296. subscription.unsubscribe()
  297. },
  298. complete: () => {
  299. console.log(`Stream response completed for ${request.id}`)
  300. subscription.unsubscribe()
  301. // call.end()
  302. }
  303. })
  304. },
  305. Check: (_, callback) => {
  306. // health check logic here
  307. // for now it is just sending the status message over to tell the client it is alive
  308. // For simplicity, always return "SERVING" as status
  309. callback(null, { status: 'SERVING' });
  310. },
  311. });
  312. // Bind and start the server
  313. server.bindAsync(serverUrl, grpc.ServerCredentials.createInsecure(), () => {
  314. console.log(`gRPC server is running on ${serverUrl}`);
  315. server.start();
  316. });
  317. this.grpcServerConnection[serverUrl] = server
  318. }
  319. catch (error) {
  320. resolve(error)
  321. }
  322. })
  323. }
  324. // Create a server streaming call. Please note that the structure of the code would not be the same as bidirectional because of it's unary nature
  325. private async createServerStreamingClient(server: string, alreadyHealthCheck: boolean, unaryRequestSubject: Subject<any>, statusControl: Subject<ReportStatus>): Promise<string> {
  326. return new Promise(async (resolve, reject) => {
  327. const client = new message_proto.Message(server, grpc.credentials.createInsecure());
  328. this.checkConnectionHealth(client, statusControl, alreadyHealthCheck) // perform check to see if server is alive, if not terminate this grpc instant and create again
  329. /* Memory leak some where here */
  330. unaryRequestSubject.subscribe({
  331. next: (request: any) => {
  332. let message = {
  333. id: request.appData?.msgId,
  334. message: JSON.stringify(request)
  335. }
  336. console.log(`<${message.id}> Sending request: ${message.id} over to server....`)
  337. const call = client.HandleMessage(message)
  338. call.on('status', (status: Status) => {
  339. // console.log(status) // For more info: https://grpc.github.io/grpc/core/md_doc_statuscodes.html
  340. // https://grpc.io/docs/what-is-grpc/core-concepts/#streaming
  341. if (status == grpc.status.OK) { // only returns a status when there's error. Otherwise it just waits
  342. console.log(`Message trasmission operation is successful`)
  343. // RPC completed successfully
  344. } if (status == grpc.status.UNAVAILABLE) {
  345. resolve('No connection established. Server is not responding..')
  346. let report = {
  347. code: ColorCode.YELLOW,
  348. message: `Server doesn't seem to be alive. Error returned.`,
  349. from: `Server Streaming Client Instance`
  350. }
  351. statusControl.next(report)
  352. }
  353. });
  354. call.on('data', (data: any) => {
  355. // console.log(`Received stream response from Server. Receiver: ${message.id}`);
  356. let response = {
  357. id: data.id,
  358. message: JSON.parse(data.message)
  359. }
  360. console.log(response)
  361. this.incomingResponse.next(response)
  362. });
  363. call.on('error', (err) => {
  364. let report = {
  365. code: ColorCode.YELLOW,
  366. message: `Server doesn't seem to be alive. Error returned.`,
  367. from: `Server Streaming Client Instance`
  368. }
  369. statusControl.next(report)
  370. // resolve(err)
  371. });
  372. call.on('end', () => { // this is for gracefull || willfull termination from the server
  373. console.log(`Streaming Response is completed`)
  374. let report = {
  375. code: ColorCode.YELLOW,
  376. message: `Server doesn't seem to be alive. Error returned.`,
  377. from: `Server Streaming Client Instance`
  378. }
  379. statusControl.next(report)
  380. // subscription.unsubscribe(); // this is not correct i am just destroying the entire operation. i should be terminating the instance to which i think it does by it self
  381. // resolve('Server Error');
  382. });
  383. /* Avoid rsolving at the moment. Because initially it was intended for the bi directional streaming to continue to instantiate the client
  384. should there be any rpc errors or internet connection errors. In this case, we just want to listen to incoming unary call without terminating the session
  385. A separate resolve will be prepared for the subject should it fails in its operation */
  386. },
  387. error: error => {
  388. console.error(error),
  389. resolve(error)
  390. },
  391. complete: () => { } // should not complete since this is an indefinite listening process to transmit requests made by relevant client application
  392. })
  393. })
  394. }
  395. // Check connection To be Update. This function is destroying my code flow
  396. private async checkConnectionHealth(client: any, statusControl: Subject<ReportStatus>, alreadyHealthCheck: boolean): Promise<boolean> {
  397. return new Promise((resolve, reject) => {
  398. client.Check({}, (error, response) => {
  399. if (response) {
  400. console.log(`GRPC Health check status: ${response.status} Server Connected`);
  401. let report: ReportStatus = {
  402. code: ColorCode.GREEN,
  403. message: `Good to go!!!`,
  404. from: `GRPC health check`
  405. }
  406. statusControl.next(report)
  407. } else {
  408. if (alreadyHealthCheck == false) console.error(`Health check failed: ${error}`);
  409. }
  410. })
  411. })
  412. }
  413. }