// grpc.service.ts (24 KB)
  1. import * as grpc from '@grpc/grpc-js';
  2. import { Subject, Subscription, take, takeUntil } from 'rxjs';
  3. import { ColorCode, GrpcConnectionType, MessageLog, ReportStatus } from '../interfaces/general.interface';
  4. import { Status } from '@grpc/grpc-js/build/src/constants';
  5. const message_proto = require('./protos/server.proto')
  /**
   * Creates gRPC servers and clients that carry JSON-serialised payloads between
   * RxJS subjects and gRPC streams, and reports connection health as
   * `ReportStatus` values on a caller-supplied status subject.
   *
   * Every payload travels as `{ message: <JSON string> }`; the field name
   * `message` must match the field declared in the proto definition.
   */
  export class GrpcService {
    /** gRPC servers created by this service, keyed by the URL they were asked to bind to. */
    private grpcServerConnection: Record<string, grpc.Server> = {}

    constructor() { }

    /**
     * Force-stops the gRPC server registered under `serverUrl` and forgets it.
     *
     * `forceShutdown()` is used (not `tryShutdown`), so in-flight calls are not
     * given a chance to finish.
     *
     * @param serverUrl URL the server was created with.
     * @returns A promise that resolves (with `undefined`) once the server has been shut down
     *          and removed from the registry.
     * @throws Rejects with an `Error` when no server is registered under `serverUrl`.
     */
    public async stopServer(serverUrl: string): Promise<any> {
      const server = this.grpcServerConnection[serverUrl];
      if (!server) {
        console.log(`Server${serverUrl} is not running.`);
        throw new Error(`No gRPC server is registered for ${serverUrl}`);
      }
      console.log(`Shutting down the gRPC server:${serverUrl} ...`);
      server.forceShutdown();
      console.log(`Server ${serverUrl} is forced to shut down!`);
      // The server is already stopped at this point; this only drops our reference to it.
      console.log(`Deleting grpc connection instance:${serverUrl} .....`);
      delete this.grpcServerConnection[serverUrl];
    }

    /**
     * Logs and returns the live registry of servers (the same object the service
     * mutates, not a copy).
     */
    public getAllGrpcServerConnectionInstance(): any {
      console.log(this.grpcServerConnection)
      return this.grpcServerConnection
    }

    /**
     * Runs a never-ending supervision loop that (re)creates the requested kind of
     * gRPC endpoint and reports connectivity problems on `reportStatus`.
     *
     * Each factory returns a promise that RESOLVES when the connection fails, so a
     * resolution is counted as a failed attempt. While the count stays below the
     * limit a single YELLOW report is emitted; once the count reaches the limit a
     * single RED report is emitted. If more than 2 seconds pass between two loop
     * iterations (i.e. a connection stayed up for a while) the counters and the
     * "already reported" flags are reset. The loop pauses 1 second per iteration.
     *
     * The limit comes from the `ReconnectionAttempt` environment variable; when it
     * is missing or not a number there is no limit (RED is never emitted).
     *
     * This method never returns. To be migrated into a service in the immediate future.
     *
     * @param serverUrl            Address to bind to (server) or connect to (client).
     * @param messageToBePublished Outgoing payloads to push over the gRPC stream.
     * @param reportStatus         Subject that receives GREEN / YELLOW / RED reports.
     * @param connectionType       Selects client vs server and the streaming style.
     */
    public async createGrpcInstance(serverUrl: string, messageToBePublished: Subject<MessageLog>, reportStatus: Subject<ReportStatus>, connectionType: GrpcConnectionType): Promise<void> {
      const messageToBeTransmitted: Subject<MessageLog> = messageToBePublished
      const statusControl: Subject<ReportStatus> = reportStatus
      const reconnectionLimit = this.getReconnectionLimit()
      let consecutiveResolutions = 0;
      let lastResolutionTime = Date.now();
      let alreadyHealthCheck = false
      let yellowErrorEmission = false
      let redErrorEmission = false
      while (true) {
        try {
          // The four combinations are mutually exclusive; an unrecognised combination
          // falls through and is counted as a failed attempt, as before.
          if (connectionType.instanceType == 'client' && connectionType.serviceMethod == 'bidirectional') {
            await this.createBidirectionalStreamingClient(serverUrl, alreadyHealthCheck, messageToBeTransmitted, statusControl);
          } else if (connectionType.instanceType == 'client' && connectionType.serviceMethod == 'server streaming') {
            await this.createServerStreamingClient(serverUrl, alreadyHealthCheck, messageToBeTransmitted, statusControl);
          } else if (connectionType.instanceType == 'server' && connectionType.serviceMethod == 'bidirectional') {
            await this.createGrpcBidirectionalServer(serverUrl, messageToBeTransmitted, statusControl)
          } else if (connectionType.instanceType == 'server' && connectionType.serviceMethod == 'server streaming') {
            await this.createServerStreamingServer(serverUrl, alreadyHealthCheck, messageToBeTransmitted, statusControl)
          }
          // The factory resolved, which signals a failed/ended connection.
          consecutiveResolutions++;
          alreadyHealthCheck = true
          if (consecutiveResolutions >= reconnectionLimit && redErrorEmission == false) {
            redErrorEmission = true
            console.error(`Connection failed ${consecutiveResolutions} times. Stopping connection attempts.`);
            const error: ReportStatus = {
              code: ColorCode.RED,
              message: 'Initiate Doomsday protocol....'
            }
            statusControl.next(error)
          }
          if (consecutiveResolutions < reconnectionLimit && yellowErrorEmission == false) {
            yellowErrorEmission = true
            const error: ReportStatus = {
              code: ColorCode.YELLOW,
              message: `Attempting reconnection... Server has yet to respond`
            }
            statusControl.next(error);
          }
        } catch (error) {
          // The factory rejected/threw rather than resolved: start counting afresh.
          consecutiveResolutions = 0;
          console.error('Connection attempt failed:', error);
        }
        // A gap of more than 2 seconds since the previous iteration means the last
        // connection lived for a while, so treat the next failure as a new incident.
        const currentTime = Date.now();
        const timeSinceLastResolution = currentTime - lastResolutionTime;
        if (timeSinceLastResolution > 2000) {
          consecutiveResolutions = 0;
          yellowErrorEmission = false
          redErrorEmission = false
          alreadyHealthCheck = false
        }
        lastResolutionTime = currentTime;
        await new Promise(resolve => setTimeout(resolve, 1000)); // Wait for 1 second before the next attempt
      }
    }

    /**
     * Reads the failure limit from `process.env.ReconnectionAttempt`.
     * Returns `Infinity` (no limit) when the variable is missing or not numeric,
     * because comparing against `NaN` would silently disable every status report.
     */
    private getReconnectionLimit(): number {
      const parsed = parseInt(process.env.ReconnectionAttempt ?? '', 10);
      if (Number.isNaN(parsed)) {
        console.warn('ReconnectionAttempt is not set to a number; RED status will never be reported.');
        return Number.POSITIVE_INFINITY;
      }
      return parsed;
    }

    /**
     * Parses the JSON string carried in the `message` field of a gRPC message.
     * Returns `undefined` (after logging) when the envelope or its JSON is invalid,
     * so a single bad message cannot throw out of a stream event handler.
     */
    private parseEnvelope(data: any): any {
      try {
        return JSON.parse(data.message);
      } catch (error) {
        console.error('Discarding gRPC message with invalid JSON payload:', error);
        return undefined;
      }
    }

    /**
     * Binds `server` to `serverUrl` with insecure credentials, starts it, and
     * registers it so `stopServer` can find it. When binding fails the server is
     * unregistered and `onFailure` is invoked with the bind error.
     */
    private bindAndStart(server: grpc.Server, serverUrl: string, onFailure: (reason: unknown) => void): void {
      server.bindAsync(serverUrl, grpc.ServerCredentials.createInsecure(), (error: Error | null) => {
        if (error) {
          console.error(`gRPC server failed to bind to ${serverUrl}:`, error);
          if (this.grpcServerConnection[serverUrl] === server) {
            delete this.grpcServerConnection[serverUrl];
          }
          server.forceShutdown();
          onFailure(error);
          return;
        }
        console.log(`gRPC server is running on ${serverUrl}`);
        server.start();
      });
      // Registered immediately (before the bind completes), as the original code did.
      this.grpcServerConnection[serverUrl] = server
    }

    /**
     * Starts a gRPC server exposing `sendMessageStream` (bidirectional stream) and
     * a `Check` health method that always answers `SERVING`.
     *
     * For every connected client a GREEN report is emitted and each payload from
     * `messageToBeStream` is written to that client. When a payload arrives after
     * the call was cancelled, a YELLOW report carrying that payload is emitted and
     * the forwarding subscription is dropped.
     *
     * @returns A promise that resolves with the error only if setting up or binding
     *          the server fails; while the server runs it stays pending.
     */
    private async createGrpcBidirectionalServer(serverUrl: string, messageToBeStream: Subject<any>, statusControl: Subject<ReportStatus>): Promise<any> { // '0.0.0.0:3001'
      return new Promise((resolve) => {
        try {
          // https://github.com/grpc/proposal/blob/master/L5-node-client-interceptors.md
          const server: grpc.Server = new grpc.Server();
          server.addService(message_proto.Message.service, {
            sendMessageStream: (call: any) => {
              console.log(`Client connected from: ${call.getPeer()}`);
              const connected: ReportStatus = {
                code: ColorCode.GREEN,
                message: `Client connected!!`
              }
              statusControl.next(connected)
              // Container created first so `next` can safely unsubscribe even if the
              // source emits synchronously during subscribe.
              const subscription = new Subscription();
              // Right now this is being broadcast to every connected client.
              subscription.add(messageToBeStream.subscribe({
                next: (payload: any) => {
                  // The connection is checked for each and every message.
                  if (call.cancelled === true) {
                    const report: ReportStatus = {
                      code: ColorCode.YELLOW,
                      message: `Client is not alive.....`,
                      payload: payload
                    }
                    statusControl.next(report) // tells the buffer service to stop releasing messages
                    subscription.unsubscribe() // this call is dead; stop forwarding to it
                  } else {
                    console.log(`Sending ${payload?.appData?.msgId}`)
                    const message: string = JSON.stringify(payload)
                    call.write({ message })
                  }
                },
                error: err => console.error(err),
                complete: () => { } // it will never complete
              }))
              call.on('data', (data: any) => {
                const payload = this.parseEnvelope(data)
                if (payload === undefined) return
                console.log(`Received Message from Client: ${payload?.appData?.msgId}`);
              });
              call.on('end', () => {
                // Not a reliable way to tell whether a client has disconnected.
                console.log('Client stream ended');
              });
              call.on('error', (err: unknown) => {
                // A listener must exist so a stream error is not thrown as an unhandled 'error' event.
                console.error('gRPC call error:', err);
              });
              call.on('close', () => {
                // Client closure, which may be due to errors or manual termination.
                console.log('Unknown cause for diconnectivity');
              });
            },
            Check: (_: any, callback: any) => {
              // Health check: for now always report "SERVING".
              callback(null, { status: 'SERVING' });
            },
          });
          this.bindAndStart(server, serverUrl, resolve)
        }
        catch (error) {
          resolve(error)
        }
      })
    }

    /**
     * Opens a bidirectional `sendMessageStream` call to `server` and forwards every
     * payload from `messageToBeTransmitted` over it.
     *
     * The promise resolves (it does not reject) when the call fails or ends, which
     * the supervision loop interprets as "reconnect". On resolution the forwarding
     * subscription is dropped and the client channel is closed exactly once.
     *
     * @returns A short reason string describing why the connection ended.
     */
    private async createBidirectionalStreamingClient(server: string, alreadyHealthCheck: boolean, messageToBeTransmitted: Subject<any>, statusControl: Subject<ReportStatus>): Promise<string> {
      return new Promise<string>((resolve) => {
        const client = new message_proto.Message(server, grpc.credentials.createInsecure());
        const call = client.sendMessageStream();
        const subscription = new Subscription();
        let settled = false
        const finish = (reason: string): void => {
          subscription.unsubscribe(); // kill subscription to prevent memory leaks
          if (settled) return
          settled = true
          client.close(); // release the channel; a new client is created on the next attempt
          resolve(reason);
        }
        void this.checkConnectionHealth(client, statusControl, alreadyHealthCheck)
        call.on('status', (status: grpc.StatusObject) => {
          // The event carries a status object; the numeric code lives in `status.code`.
          // For more info: https://grpc.github.io/grpc/core/md_doc_statuscodes.html
          if (status.code === grpc.status.UNAVAILABLE) {
            finish('No connection established. Server is not responding..')
          }
        });
        // Subscribe to the RxJS subject to send data to the server
        subscription.add(messageToBeTransmitted.subscribe({
          next: (payload: any) => {
            console.log(`Sending ${payload?.appData?.msgId}`)
            const message: string = JSON.stringify(payload)
            call.write({ message })
          },
          error: err => console.error(err),
          complete: () => { } // it will never complete
        }));
        call.on('data', (data: any) => {
          const message = this.parseEnvelope(data)
          if (message === undefined) return
          console.log(`Received acknowledgement from Server: ${message?.msgId ?? message?.appData?.msgId ?? `Invalid`}`);
        });
        call.on('error', (_err: unknown) => {
          console.log(`Something wrong with RPC call...`)
          finish('Server Error');
        });
        call.on('end', () => {
          finish('Server Error');
        });
      })
    }

    /**
     * Starts a gRPC server exposing `HandleMessage` and a `Check` health method
     * that always answers `SERVING`.
     *
     * For every call a GREEN report is emitted and each payload from
     * `messageToBeStream` is written to the caller; a payload arriving after the
     * call was cancelled produces a YELLOW report carrying that payload. Data
     * received on the call is acknowledged with a `{ msgId, confirmationMessage }`
     * message.
     * NOTE(review): if `HandleMessage` is declared server-streaming in the proto,
     * the `'data'` listener will presumably never fire - confirm against the proto.
     *
     * `alreadyHealthCheck` is accepted for signature parity and is not used.
     *
     * @returns A promise that resolves with the error only if setting up or binding
     *          the server fails; while the server runs it stays pending.
     */
    private async createServerStreamingServer(serverUrl: string, alreadyHealthCheck: boolean, messageToBeStream: Subject<any>, statusControl: Subject<ReportStatus>): Promise<any> { // '0.0.0.0:3001'
      return new Promise((resolve) => {
        try {
          // https://github.com/grpc/proposal/blob/master/L5-node-client-interceptors.md
          const server: grpc.Server = new grpc.Server();
          server.addService(message_proto.Message.service, {
            HandleMessage: (call: any) => {
              console.log(`Client connected from: ${call.getPeer()}`);
              const connected: ReportStatus = {
                code: ColorCode.GREEN,
                message: `Client connected!!`
              }
              statusControl.next(connected)
              // Container created first so `next` can safely unsubscribe even if the
              // source emits synchronously during subscribe.
              const subscription = new Subscription();
              subscription.add(messageToBeStream.subscribe({
                next: (payload: any) => {
                  // The connection is checked for each and every message.
                  if (call.cancelled === true) {
                    const report: ReportStatus = {
                      code: ColorCode.YELLOW,
                      message: `Client is not alive.....`,
                      payload: payload
                    }
                    statusControl.next(report)
                    subscription.unsubscribe()
                  } else {
                    console.log(`Sending ${payload?.appData?.msgId}`)
                    const message: string = JSON.stringify(payload)
                    call.write({ message })
                  }
                },
                error: err => {
                  console.error(err)
                  const report: ReportStatus = {
                    code: ColorCode.YELLOW,
                    message: `Message streaming error`
                  }
                  statusControl.next(report)
                },
                complete: () => console.log(``) // it will never complete
              }))
              call.on('data', (data: any) => {
                const payload = this.parseEnvelope(data)
                if (payload === undefined) return
                console.log(data)
                const respmsg: any = {
                  msgId: payload?.appData?.msgId,
                  confirmationMessage: `Message ${payload?.appData?.msgId} acknowledged!`
                }
                const message: string = JSON.stringify(respmsg)
                console.log(`Responding to client: ${respmsg.msgId}`);
                // Note: the key here MUST match the field name declared in the proto,
                // e.g. message MessageRequest { string message = 1 }
                call.write({ message });
              });
              call.on('end', () => {
                // Not a reliable way to tell whether a client has disconnected.
                console.log('Client stream ended');
              });
              call.on('error', (err: unknown) => {
                // A listener must exist so a stream error is not thrown as an unhandled 'error' event.
                console.error('gRPC call error:', err);
              });
              call.on('close', () => {
                // Client closure, which may be due to errors or manual termination.
                console.log('Unknown cause for diconnectivity');
              });
            },
            Check: (_: any, callback: any) => {
              // Health check: for now always report "SERVING".
              callback(null, { status: 'SERVING' });
            },
          });
          this.bindAndStart(server, serverUrl, resolve)
        }
        catch (error) {
          resolve(error)
        }
      })
    }

    /**
     * Issues one `HandleMessage` call per request emitted by `unaryRequestSubject`
     * and logs the data the server streams back.
     *
     * YELLOW reports are emitted when a call errors, ends, or finishes with status
     * UNAVAILABLE. The returned promise resolves (it does not reject) when a call
     * reports UNAVAILABLE or the request subject errors; at that point the request
     * subscription is dropped and the client channel is closed, so a subsequent
     * reconnect does not send each request twice. Other call errors deliberately do
     * not resolve, so the session keeps listening for further requests.
     *
     * @returns The reason the session ended (a string, or the subject's error).
     */
    private async createServerStreamingClient(server: string, alreadyHealthCheck: boolean, unaryRequestSubject: Subject<any>, statusControl: Subject<ReportStatus>): Promise<string> {
      return new Promise<string>((resolve) => {
        const client = new message_proto.Message(server, grpc.credentials.createInsecure());
        const subscription = new Subscription();
        let settled = false
        const finish = (reason: any): void => {
          subscription.unsubscribe();
          if (settled) return
          settled = true
          client.close();
          resolve(reason);
        }
        const reportServerDown = (): void => {
          const report = {
            code: ColorCode.YELLOW,
            message: `Server doesn't seem to be alive. Error returned.`
          }
          statusControl.next(report)
        }
        void this.checkConnectionHealth(client, statusControl, alreadyHealthCheck) // actually there's no need for this
        subscription.add(unaryRequestSubject.subscribe({
          next: (request: any) => {
            const message = {
              id: '123',
              message: JSON.stringify(request)
            }
            console.log(`Sending request: ${message.id} over to server....`)
            const call = client.HandleMessage(message)
            call.on('status', (status: grpc.StatusObject) => {
              // The event carries a status object; the numeric code lives in `status.code`.
              // For more info: https://grpc.github.io/grpc/core/md_doc_statuscodes.html
              if (status.code === grpc.status.OK) {
                console.log(`Message trasmission operation is successful`)
              }
              if (status.code === grpc.status.UNAVAILABLE) {
                finish('No connection established. Server is not responding..')
                reportServerDown()
              }
            });
            call.on('data', (data: any) => {
              const received = this.parseEnvelope(data)
              if (received === undefined) return
              console.log(`Received data from Server: ${received?.appData?.msgId ?? `Invalid`}`);
            });
            call.on('error', (_err: unknown) => {
              reportServerDown()
            });
            call.on('end', () => { // graceful or wilful termination from the server
              reportServerDown()
            });
          },
          error: error => {
            console.error(error)
            finish(error)
          },
          complete: () => { } // should not complete: this listens indefinitely for requests to transmit
        }))
      })
    }

    /**
     * Calls the server's `Check` method once and emits a GREEN report when a
     * response comes back.
     *
     * @param alreadyHealthCheck When `true`, a failed check is not logged (it was
     *                           already reported on an earlier attempt).
     * @returns `true` when the server answered, `false` when the check failed or
     *          could not be issued. The promise never rejects.
     */
    private async checkConnectionHealth(client: any, statusControl: Subject<ReportStatus>, alreadyHealthCheck: boolean): Promise<boolean> {
      return new Promise<boolean>((resolve) => {
        try {
          client.Check({}, (error: unknown, response: any) => {
            if (response) {
              console.log(`GRPC Health check status: ${response.status} Server Connected`);
              const report: ReportStatus = {
                code: ColorCode.GREEN,
                message: `Good to go!!!`
              }
              statusControl.next(report)
              resolve(true)
            } else {
              if (alreadyHealthCheck == false) console.error(`Health check failed: ${error}`);
              resolve(false)
            }
          })
        } catch (error) {
          // e.g. the client has no `Check` method; callers do not await this promise,
          // so it must not reject.
          console.error(`Health check could not be issued: ${error}`);
          resolve(false)
        }
      })
    }
  }