grpc.service.ts 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431
  1. import * as grpc from '@grpc/grpc-js';
  2. import { Subject, Subscription, take, takeUntil } from 'rxjs';
  3. import { ColorCode, GrpcConnectionType, MessageLog, ReportStatus } from '../interfaces/general.interface';
  4. import { Status } from '@grpc/grpc-js/build/src/constants';
  5. const message_proto = require('./protos/server.proto')
  /**
   * Creates gRPC servers and clients for the `Message` service described by
   * `message_proto`, and keeps track of the servers it has started.
   *
   * Convention used throughout this class: the promise returned by one of the
   * private `create*` methods RESOLVES when the connection attempt has failed
   * or the connection has been lost. `createGrpcInstance` relies on this to
   * count consecutive failures and to retry.
   */
  export class GrpcService {
    /** gRPC servers created by this service, keyed by the URL they were asked to bind to. */
    private grpcServerConnection: Record<string, grpc.Server> = {}

    constructor() { }

    /**
     * Forcefully shuts down the server registered under `serverUrl` and removes
     * it from the registry.
     *
     * @param serverUrl URL the server was created with.
     * @returns A promise that resolves (with no value) once the server has been
     *          forced down and unregistered.
     * @throws Rejects with an `Error` when no server is registered for `serverUrl`.
     */
    public async stopServer(serverUrl: string): Promise<any> {
      const server = this.grpcServerConnection[serverUrl];
      if (!server) {
        console.log(`Server${serverUrl} is not running.`);
        // Reject with a reason so callers can tell what went wrong.
        throw new Error(`No gRPC server is registered for ${serverUrl}`);
      }

      console.log(`Shutting down the gRPC server:${serverUrl} ...`);
      // forceShutdown() is used instead of tryShutdown(): it does not wait for
      // open calls to finish.
      server.forceShutdown();
      console.log(`Server ${serverUrl} is forced to shut down!`);

      // Dropping the registry entry alone would not stop the server; that is
      // why it is shut down explicitly above before the reference is removed.
      console.log(`Deleting grpc connection instance:${serverUrl} .....`);
      delete this.grpcServerConnection[serverUrl];
    }

    /**
     * Logs and returns the registry of servers created by this service.
     *
     * @returns The live registry object (not a copy), keyed by server URL.
     */
    public getAllGrpcServerConnectionInstance(): any {
      console.log(this.grpcServerConnection)
      return this.grpcServerConnection
    }

    /**
     * Creates the requested kind of gRPC endpoint and re-creates it, forever,
     * every time the underlying `create*` promise resolves (i.e. fails).
     *
     * Status reporting on `reportStatus`:
     * - the first failure of a streak emits one YELLOW report;
     * - once the streak reaches the limit from `process.env.ReconnectionAttempt`
     *   one RED report is emitted.
     * Both flags and the streak counter are reset when more than 2 seconds
     * elapsed between two passes of the loop, i.e. when the previous attempt
     * stayed pending for a while instead of failing immediately.
     *
     * This method never returns; it is meant to be started and left running.
     *
     * @param serverUrl            Address to bind to (server) or connect to (client).
     * @param messageToBePublished Stream of outgoing messages / requests.
     * @param reportStatus         Sink for connection status reports.
     * @param connectionType       Selects client/server and the RPC style.
     */
    // To be migrated into a service in the immediate future
    public async createGrpcInstance(serverUrl: string, messageToBePublished: Subject<MessageLog>, reportStatus: Subject<ReportStatus>, connectionType: GrpcConnectionType) {
      const resetWindowMs = 2000; // gap between passes above which the failure streak is forgotten
      const retryDelayMs = 1000;  // pause before the next attempt

      let consecutiveResolutions = 0;
      let lastResolutionTime = Date.now();
      let alreadyHealthCheck = false;
      let yellowErrorEmission = false;
      let redErrorEmission = false;

      while (true) {
        try {
          if (connectionType.instanceType === 'client' && connectionType.serviceMethod === 'bidirectional') {
            await this.createBidirectionalStreamingClient(serverUrl, alreadyHealthCheck, messageToBePublished, reportStatus);
          } else if (connectionType.instanceType === 'client' && connectionType.serviceMethod === 'server streaming') {
            await this.createServerStreamingClient(serverUrl, alreadyHealthCheck, messageToBePublished, reportStatus);
          } else if (connectionType.instanceType === 'server' && connectionType.serviceMethod === 'bidirectional') {
            await this.createGrpcBidirectionalServer(serverUrl, messageToBePublished, reportStatus);
          } else if (connectionType.instanceType === 'server' && connectionType.serviceMethod === 'server streaming') {
            await this.createServerStreamingServer(serverUrl, alreadyHealthCheck, messageToBePublished, reportStatus);
          }

          // Reaching this point means the promise resolved, which signals a failure.
          consecutiveResolutions++;
          // Later attempts of the same streak stay quiet about failed health checks.
          alreadyHealthCheck = true;

          const reconnectionLimit = this.reconnectionLimit();

          if (consecutiveResolutions >= reconnectionLimit && !redErrorEmission) {
            redErrorEmission = true;
            console.error(`Connection failed ${consecutiveResolutions} times. Stopping connection attempts.`);
            reportStatus.next({
              code: ColorCode.RED,
              message: 'Initiate Doomsday protocol....'
            });
          }
          if (consecutiveResolutions < reconnectionLimit && !yellowErrorEmission) {
            yellowErrorEmission = true;
            reportStatus.next({
              code: ColorCode.YELLOW,
              message: `Attempting reconnection... Server has yet to respond`
            });
          }
        } catch (error) {
          // The attempt threw/rejected instead of resolving: start the streak over.
          consecutiveResolutions = 0;
          console.error('Connection attempt failed:', error);
        }

        // More than 2 seconds since the previous pass: treat this as a fresh outage.
        const currentTime = Date.now();
        if (currentTime - lastResolutionTime > resetWindowMs) {
          consecutiveResolutions = 0;
          yellowErrorEmission = false;
          redErrorEmission = false;
          alreadyHealthCheck = false;
        }
        lastResolutionTime = currentTime;

        await new Promise(resolve => setTimeout(resolve, retryDelayMs));
      }
    }

    /**
     * Reads the failure-streak limit from `process.env.ReconnectionAttempt`.
     *
     * @returns The parsed base-10 integer, or `Infinity` when the variable is
     *          missing or not a number. With `Infinity` the YELLOW report is
     *          still emitted and RED is never reached, instead of both reports
     *          being silently disabled by comparisons against `NaN`.
     */
    private reconnectionLimit(): number {
      const parsed = Number.parseInt(process.env.ReconnectionAttempt ?? '', 10);
      return Number.isNaN(parsed) ? Number.POSITIVE_INFINITY : parsed;
    }

    /**
     * Registers `server` under `serverUrl`, binds it with insecure credentials
     * and starts it once the bind succeeded.
     *
     * @param server    Server whose services have already been added.
     * @param serverUrl Address to bind to, e.g. '0.0.0.0:3001'.
     * @param onFailure Called with the bind error when binding fails; by then the
     *                  server has been shut down and unregistered.
     */
    private bindAndStart(server: grpc.Server, serverUrl: string, onFailure: (reason: unknown) => void): void {
      server.bindAsync(serverUrl, grpc.ServerCredentials.createInsecure(), (error: Error | null) => {
        if (error) {
          // Starting an unbound server is an error, so stop here and report.
          console.error(`gRPC server failed to bind to ${serverUrl}:`, error);
          if (this.grpcServerConnection[serverUrl] === server) {
            delete this.grpcServerConnection[serverUrl];
          }
          server.forceShutdown();
          onFailure(error);
          return;
        }
        console.log(`gRPC server is running on ${serverUrl}`);
        server.start();
      });
      this.grpcServerConnection[serverUrl] = server;
    }

    /**
     * Parses the JSON carried in the `message` field of a received gRPC message.
     *
     * @returns The parsed value, or `undefined` (after logging) when the field is
     *          missing or is not valid JSON, so a bad message cannot throw out of
     *          a stream event handler.
     */
    private parseMessage(data: any): any {
      try {
        return JSON.parse(data.message);
      } catch (error) {
        console.error('Discarding gRPC message that is not valid JSON:', error);
        return undefined;
      }
    }

    /** Closes a generated service client if it exposes `close()`; never throws. */
    private closeClient(client: any): void {
      try {
        if (typeof client?.close === 'function') {
          client.close();
        }
      } catch (error) {
        console.error('Failed to close gRPC client:', error);
      }
    }

    /**
     * Starts a server exposing `sendMessageStream` (bidirectional streaming) and
     * `Check` (health probe that always answers SERVING).
     *
     * Every connected client gets its own subscription to `messageToBeStream`,
     * so each message is written to every connected client. When a message
     * arrives for a call that has been cancelled, a YELLOW report carrying that
     * unsent message is emitted and the call's subscription is released.
     *
     * @param serverUrl         Address to bind to, e.g. '0.0.0.0:3001'.
     * @param messageToBeStream Messages to push to connected clients.
     * @param statusControl     Sink for connection status reports.
     * @returns A promise that resolves with the error when the server could not
     *          be created or bound; it stays pending while the server runs.
     */
    private async createGrpcBidirectionalServer(serverUrl: string, messageToBeStream: Subject<any>, statusControl: Subject<ReportStatus>): Promise<any> { // '0.0.0.0:3001'
      return new Promise((resolve) => {
        try {
          // https://github.com/grpc/proposal/blob/master/L5-node-client-interceptors.md
          const server = new grpc.Server();

          server.addService(message_proto.Message.service, {
            sendMessageStream: (call: grpc.ServerDuplexStream<any, any>) => {
              console.log(`Client connected from: ${call.getPeer()}`);
              statusControl.next({
                code: ColorCode.GREEN,
                message: `Client connected!!`
              });

              // Created up front so the callbacks below can always reference it,
              // even if the subject emits synchronously during subscribe().
              const subscription = new Subscription();
              subscription.add(messageToBeStream.subscribe({
                next: (payload: any) => {
                  // The connection is checked for each and every message.
                  if (call.cancelled === true) {
                    // Hand the unsent message back with the report so the
                    // receiver of the report can decide what to do with it.
                    statusControl.next({
                      code: ColorCode.YELLOW,
                      message: `Client is not alive.....`,
                      payload: payload
                    });
                    // This client is gone: stop feeding its call.
                    subscription.unsubscribe();
                    return;
                  }
                  console.log(`Sending ${payload?.appData?.msgId}`);
                  const message: string = JSON.stringify(payload);
                  // The field name must match the one declared in the proto.
                  call.write({ message });
                },
                error: err => console.error(err),
                complete: () => { } // not expected to complete
              }));

              call.on('data', (data: any) => {
                const payload = this.parseMessage(data);
                if (payload === undefined) return;
                console.log(`Received Message from Client: ${payload?.appData?.msgId}`);
              });
              call.on('end', () => {
                // Not a reliable disconnect signal according to the original author.
                console.log('Client stream ended');
              });
              call.on('error', (err: unknown) => {
                // Logged only; disconnects are detected through call.cancelled above.
                console.error(`RPC error on stream from ${call.getPeer()}:`, err);
              });
              call.on('close', () => {
                // Client closure, which may be due to errors or manual termination.
                console.log('Unknown cause for diconnectivity');
              });
            },
            Check: (_: unknown, callback: grpc.sendUnaryData<any>) => {
              // Health probe: for now always report SERVING.
              callback(null, { status: 'SERVING' });
            },
          });

          this.bindAndStart(server, serverUrl, resolve);
        }
        catch (error) {
          resolve(error)
        }
      })
    }

    /**
     * Opens a bidirectional stream to `server` and forwards every value emitted
     * by `messageToBeTransmitted` over it.
     *
     * @param server                 Address of the gRPC server.
     * @param alreadyHealthCheck     When true, a failed health check is not logged.
     * @param messageToBeTransmitted Messages to write to the stream.
     * @param statusControl          Sink for connection status reports.
     * @returns A promise that resolves with 'Server Error' when the stream errors
     *          or ends. Before resolving, the subject subscription is released
     *          and the client is closed so that repeated attempts do not pile up
     *          subscriptions or channels. Rejects if the client cannot be created.
     */
    private async createBidirectionalStreamingClient(server: string, alreadyHealthCheck: boolean, messageToBeTransmitted: Subject<any>, statusControl: Subject<ReportStatus>): Promise<string> {
      return new Promise<string>((resolve) => {
        const client = new message_proto.Message(server, grpc.credentials.createInsecure());
        const call = client.sendMessageStream();
        // Fire-and-forget: the outcome is reported through statusControl.
        void this.checkConnectionHealth(client, statusControl, alreadyHealthCheck);

        const subscription = new Subscription();
        let finished = false;
        // Single exit path shared by 'error' and 'end'.
        const finish = (): void => {
          if (finished) return;
          finished = true;
          subscription.unsubscribe(); // release the subject to prevent leaks
          this.closeClient(client);
          resolve('Server Error');
        };

        // Forward outgoing messages to the server.
        subscription.add(messageToBeTransmitted.subscribe({
          next: (payload: any) => {
            console.log(`Sending ${payload?.appData?.msgId}`);
            const message: string = JSON.stringify(payload);
            call.write({ message });
          },
          error: err => console.error(err),
          complete: () => { } // not expected to complete
        }));

        call.on('data', (data: any) => {
          const message = this.parseMessage(data);
          if (message === undefined) return;
          console.log(`Received message from Server: ${message?.msgId ?? message?.appData?.msgId ?? 'Invalid'}`);
        });
        call.on('error', () => finish());
        call.on('end', () => finish());
      })
    }

    /**
     * Starts a server exposing `HandleMessage` (one request answered with a
     * stream of messages) and `Check` (health probe that always answers SERVING).
     *
     * Each request is answered with at most 15 values taken from
     * `messageToBeStream`. The call is not ended when those values have been
     * written.
     *
     * @param serverUrl          Address to bind to, e.g. '0.0.0.0:3001'.
     * @param alreadyHealthCheck Unused here; kept for signature parity with the clients.
     * @param messageToBeStream  Messages to stream back to the requesting client.
     * @param statusControl      Sink for connection status reports.
     * @returns A promise that resolves with the error when the server could not
     *          be created or bound; it stays pending while the server runs.
     */
    private async createServerStreamingServer(serverUrl: string, alreadyHealthCheck: boolean, messageToBeStream: Subject<any>, statusControl: Subject<ReportStatus>): Promise<any> { // '0.0.0.0:3001'
      return new Promise((resolve) => {
        try {
          // https://github.com/grpc/proposal/blob/master/L5-node-client-interceptors.md
          const server = new grpc.Server();

          server.addService(message_proto.Message.service, {
            HandleMessage: (call: grpc.ServerWritableStream<any, any>) => {
              // Single request from the client, answered with a stream.
              const request = call.request;
              console.log(`Processing unary call.... requestId: ${request.id}`)
              console.log(`Client connected from: ${call.getPeer()}`);
              statusControl.next({ // let the flow come through
                code: ColorCode.GREEN,
                message: `Client connected!!`
              });

              // Created up front so the next-callback can always reference it,
              // even if the subject emits synchronously during subscribe().
              const subscription = new Subscription();
              subscription.add(messageToBeStream.pipe(take(15)).subscribe({
                next: (payload: any) => {
                  // The connection is checked for each and every message.
                  if (call.cancelled === true) {
                    statusControl.next({
                      code: ColorCode.YELLOW,
                      message: `Client is not alive.....`,
                      payload: payload
                    });
                    subscription.unsubscribe();
                    return;
                  }
                  console.log(`Sending ${payload?.appData?.msgId} in respond to unary ${request.id}`)
                  const message: string = JSON.stringify(payload);
                  call.write({ message });
                },
                // RxJS releases the subscription itself on error/complete, so no
                // manual unsubscribe is needed in the two callbacks below.
                error: err => {
                  console.error(err)
                  statusControl.next({
                    code: ColorCode.YELLOW,
                    message: `Message streaming error`
                  });
                },
                complete: () => {
                  console.log(`Stream response completed for ${request.id}`)
                  // NOTE(review): the call is intentionally left open here, as
                  // in the original code (call.end() was disabled).
                }
              }));
            },
            Check: (_: unknown, callback: grpc.sendUnaryData<any>) => {
              // Health probe: for now always report SERVING.
              callback(null, { status: 'SERVING' });
            },
          });

          this.bindAndStart(server, serverUrl, resolve);
        }
        catch (error) {
          resolve(error)
        }
      })
    }

    /**
     * Sends one `HandleMessage` request per value emitted by
     * `unaryRequestSubject` and listens to the streamed response of each.
     *
     * The structure differs from the bidirectional client because every request
     * is its own call. Errors and the end of a single call only produce a YELLOW
     * report; the session is handed back to the reconnection loop only when a
     * call finishes with status UNAVAILABLE or when the subject itself errors.
     *
     * @param server              Address of the gRPC server.
     * @param alreadyHealthCheck  When true, a failed health check is not logged.
     * @param unaryRequestSubject Requests to send; `appData.msgId` is used as the request id.
     * @param statusControl       Sink for connection status reports.
     * @returns A promise that resolves with a description (or the subject's
     *          error) once the session is given up. Before resolving, the
     *          subject subscription is released and the client is closed, so a
     *          re-created client does not send every request twice.
     */
    // Create a server streaming call. Please note that the structure of the code would not be the same as bidirectional because of it's unary nature
    private async createServerStreamingClient(server: string, alreadyHealthCheck: boolean, unaryRequestSubject: Subject<any>, statusControl: Subject<ReportStatus>): Promise<string> {
      return new Promise<string>((resolve) => {
        const client = new message_proto.Message(server, grpc.credentials.createInsecure());
        // Fire-and-forget: the outcome is reported through statusControl.
        void this.checkConnectionHealth(client, statusControl, alreadyHealthCheck);

        const subscription = new Subscription();
        let finished = false;
        const finish = (outcome: any): void => {
          if (finished) return;
          finished = true;
          subscription.unsubscribe();
          this.closeClient(client);
          resolve(outcome);
        };
        const reportServerProblem = (): void => {
          statusControl.next({
            code: ColorCode.YELLOW,
            message: `Server doesn't seem to be alive. Error returned.`
          });
        };

        subscription.add(unaryRequestSubject.subscribe({
          next: (request: any) => {
            const message = {
              id: request?.appData?.msgId,
              message: JSON.stringify(request)
            };
            console.log(`<${message.id}> Sending request: ${message.id} over to server....`)
            const call = client.HandleMessage(message);

            // The 'status' event carries a status object; the numeric code is in `.code`.
            // For more info: https://grpc.github.io/grpc/core/md_doc_statuscodes.html
            call.on('status', (status: { code: number }) => {
              if (status.code === grpc.status.OK) {
                console.log(`Message trasmission operation is successful`)
              }
              if (status.code === grpc.status.UNAVAILABLE) {
                reportServerProblem();
                finish('No connection established. Server is not responding..');
              }
            });
            call.on('data', (data: any) => {
              console.log(`Received stream response from Server. Receiver: ${message.id}`);
            });
            call.on('error', (err: unknown) => {
              // Report only; a failed call does not terminate the session.
              reportServerProblem();
            });
            call.on('end', () => { // graceful or wilful termination by the server
              console.log(`Streaming Response is completed`)
              // NOTE(review): a YELLOW report is emitted on normal completion
              // too; kept as in the original — confirm this is intended.
              reportServerProblem();
            });
          },
          error: error => {
            console.error(error);
            finish(error);
          },
          complete: () => { } // not expected to complete: requests keep coming indefinitely
        }));
      })
    }

    /**
     * Calls the server's `Check` method once and emits a GREEN report when a
     * response comes back.
     *
     * @param client             Generated `Message` service client.
     * @param statusControl      Sink for connection status reports.
     * @param alreadyHealthCheck When true, a failed check is not logged.
     * @returns A promise resolving to `true` when the server answered and to
     *          `false` otherwise. It never rejects, so it is safe to call
     *          without awaiting.
     */
    private async checkConnectionHealth(client: any, statusControl: Subject<ReportStatus>, alreadyHealthCheck: boolean): Promise<boolean> {
      return new Promise<boolean>((resolve) => {
        try {
          client.Check({}, (error: any, response: any) => {
            if (response) {
              console.log(`GRPC Health check status: ${response.status} Server Connected`);
              statusControl.next({
                code: ColorCode.GREEN,
                message: `Good to go!!!`
              });
              resolve(true);
              return;
            }
            if (alreadyHealthCheck === false) console.error(`Health check failed: ${error}`);
            resolve(false);
          });
        } catch (error) {
          // A synchronous failure of the call itself counts as a failed check.
          if (alreadyHealthCheck === false) console.error(`Health check failed: ${error}`);
          resolve(false);
        }
      })
    }
  }