// service.method.ts
  import * as grpc from '@grpc/grpc-js';
  import { Subject, Subscription } from "rxjs";
  import { ReportStatus, ColorCode, GrpcMessage, MessageLog } from "../interfaces/general.interface";
  import { Status } from '@grpc/grpc-js/build/src/constants';
  const message_proto = require('./protos/server.proto')
  /**
   * Creates the gRPC servers and clients used to move messages between RxJS
   * subjects and a remote peer, in two flavours: server streaming and
   * bidirectional streaming.
   *
   * Promise convention shared by the four `create*` methods: the returned promise
   * stays pending while the transport is usable and RESOLVES (it never rejects)
   * with a reason once the transport cannot be set up or has failed.
   * NOTE(review): callers presumably treat the resolution as the signal to tear
   * the instance down and reconnect - confirm against the call sites.
   *
   * Connection state is published on `statusControl` as `ReportStatus` values:
   * GREEN when a peer is reachable, YELLOW (optionally carrying the undelivered
   * payload) when a message could not be delivered.
   */
  export class GrpcServiceMethod {
    /** Request id the streaming client sends to open the long-lived response stream. */
    private static readonly MAIN_STREAM_REQUEST_ID = '0000';

    /**
     * Starts a server whose `HandleMessage` RPC is server streaming.
     *
     * Every incoming request is forwarded unchanged to `incomingRequest` and
     * answered with a GREEN report. Only the request whose id is `'0000'` keeps
     * its call open: everything emitted on `messageToBeStream` is written to that
     * call. Any other request is treated as a one-shot request and its call is
     * ended immediately; its answer travels on the main stream.
     *
     * A lost client is detected lazily: when the next message is about to be
     * written and `call.cancelled` is set, a YELLOW report carrying that message
     * is emitted and the stream subscription is dropped.
     *
     * @param serverUrl address to bind, e.g. `'0.0.0.0:3001'`
     * @param alreadyHealthCheck unused here; kept for signature compatibility
     * @param messageToBeStream outgoing messages to write to the main stream
     * @param statusControl sink for connection status reports
     * @param grpcServerConnection registry object; the server is stored under `serverUrl`
     * @param incomingRequest sink for every request received from clients
     * @returns a promise that stays pending while the server is set up and
     *          resolves with the error if set-up or binding fails
     */
    public createServerStreamingServer(
      serverUrl: string,
      alreadyHealthCheck: boolean,
      messageToBeStream: Subject<any>,
      statusControl: Subject<ReportStatus>,
      grpcServerConnection: any,
      incomingRequest: Subject<GrpcMessage>
    ): Promise<any> {
      return new Promise((resolve) => {
        try {
          // https://github.com/grpc/proposal/blob/master/L5-node-client-interceptors.md
          const server = new grpc.Server();
          server.addService(message_proto.Message.service, {
            HandleMessage: (call: grpc.ServerWritableStream<any, any>) => {
              const request = call.request;
              incomingRequest.next(request);
              console.log(request);
              // Let the flow come through: a request arriving means a client is reachable.
              statusControl.next({
                code: ColorCode.GREEN,
                message: `Client connected!!`,
                from: `Server Streaming Instance`
              });

              if (request.id !== GrpcServiceMethod.MAIN_STREAM_REQUEST_ID) {
                // One-shot request: where it came from no longer matters, the
                // response is delivered on the main stream. Server calls have no
                // cancel(); end() completes the RPC so the client sees OK + 'end'.
                console.log(`Received unary call.... request: ${request.id}`);
                call.end();
                return;
              }

              console.log(`Intializing main stream response. Confirmation from ${request.id}`);
              // Parent subscription created first so the callbacks below can
              // unsubscribe even if the subject emits synchronously on subscribe.
              const subscription = new Subscription();
              subscription.add(
                messageToBeStream.subscribe({
                  next: (response: GrpcMessage) => {
                    // Check the connection for each and every message.
                    if (call.cancelled) {
                      statusControl.next({
                        code: ColorCode.YELLOW,
                        message: `Client is not alive.....`,
                        payload: response,
                        from: `Server Streaming Instance`
                      });
                      subscription.unsubscribe();
                      return;
                    }
                    console.log(`Sending ${(response.message as MessageLog)?.appData?.msgId} in respond to request: ${request.id}`);
                    call.write({
                      id: response.id,
                      message: JSON.stringify(response.message)
                    });
                  },
                  error: (err) => {
                    console.error(err);
                    statusControl.next({
                      code: ColorCode.YELLOW,
                      message: `Message streaming error`,
                      from: `Server Streaming Instance`
                    });
                  },
                  complete: () => {
                    console.log(`Stream response completed for ${request.id}`);
                  }
                })
              );
            },
            Check: GrpcServiceMethod.reportServing,
          });
          this.bindAndStart(server, serverUrl, resolve);
          grpcServerConnection[serverUrl] = server;
        } catch (error) {
          resolve(error);
        }
      });
    }

    /**
     * Creates the client side of the server-streaming transport.
     *
     * Each value emitted on `unaryRequestSubject` is sent as its own
     * `HandleMessage` call (with `message` JSON-encoded). Right after
     * subscribing, a request with id `'0000'` is pushed onto the subject to open
     * the main response stream. Data received on any call is JSON-decoded and
     * forwarded to `incomingResponse`.
     *
     * When a call finishes with status UNAVAILABLE the instance gives up: it
     * stops listening to `unaryRequestSubject`, resolves the promise and emits a
     * YELLOW report carrying the request that could not be sent.
     *
     * @param server address of the gRPC server
     * @param alreadyHealthCheck forwarded to `checkConnectionHealth`
     * @param unaryRequestSubject requests to transmit
     * @param statusControl sink for connection status reports
     * @param incomingResponse sink for decoded responses
     * @returns a promise that stays pending while the client is usable and
     *          resolves with a reason once the server is unavailable or the
     *          request subject errors
     */
    public createServerStreamingClient(
      server: string,
      alreadyHealthCheck: boolean,
      unaryRequestSubject: Subject<any>,
      statusControl: Subject<ReportStatus>,
      incomingResponse: Subject<GrpcMessage>
    ): Promise<string> {
      return new Promise((resolve) => {
        const client = new message_proto.Message(server, grpc.credentials.createInsecure());
        const outbound = new Subscription();
        // Stop transmitting before handing control back, mirroring the
        // bidirectional client; resolving more than once is harmless.
        const giveUp = (reason: any): void => {
          outbound.unsubscribe();
          resolve(reason);
        };

        outbound.add(
          unaryRequestSubject.subscribe({
            next: (request: any) => {
              const message = {
                id: request.id,
                message: JSON.stringify(request.message)
              };
              console.log(message);
              console.log(`Sending request: ${message.id} over to server....`);
              const call = client.HandleMessage(message);

              // 'status' delivers a StatusObject; the numeric code is in `.code`.
              call.on('status', (status: grpc.StatusObject) => {
                const code: Status = status.code;
                if (code === grpc.status.OK) {
                  console.log(`Message trasmission operation is successful`);
                } else if (code === grpc.status.UNAVAILABLE) {
                  giveUp('No connection established. Server is not responding..');
                  const report: ReportStatus = {
                    code: ColorCode.YELLOW,
                    message: `Server doesn't seem to be alive. Error returned.`,
                    payload: request,
                    from: `Server Streaming Client Instance`
                  };
                  statusControl.next(report);
                }
              });

              call.on('data', (data: any) => {
                const decoded = this.parseJson(data?.message, `Server Streaming Client Instance`);
                if (decoded === undefined) return; // malformed frame, already logged
                const response: GrpcMessage = {
                  id: data.id,
                  message: decoded
                };
                incomingResponse.next(response);
                console.log((response.message as MessageLog)?.appData?.msgId);
              });

              // A listener must stay registered: an 'error' event without one
              // is thrown by the emitter. Failures are acted on in 'status'.
              call.on('error', (err: Error) => {
                console.error(err);
              });

              // Graceful / wilful termination from the server.
              call.on('end', () => {
                console.log(`Terminating Stream Request. Directing response to main channel`);
              });
            },
            error: (error) => {
              console.error(error);
              giveUp(error);
            },
            // Should not complete: this listens indefinitely for requests made
            // by the client application.
            complete: () => { }
          })
        );

        unaryRequestSubject.next({
          id: GrpcServiceMethod.MAIN_STREAM_REQUEST_ID,
          message: `Intiate Main Stream Channel Response`
        });
        // Fire-and-forget probe; its outcome is reported through statusControl.
        void this.checkConnectionHealth(client, statusControl, alreadyHealthCheck);
      });
    }

    /**
     * Starts a server whose `sendMessageStream` RPC is bidirectional streaming.
     *
     * For each connecting client a GREEN report is emitted and everything
     * emitted on `messageToBeStream` is written to that client (a broadcast to
     * every connected call). A lost client is detected lazily, exactly as in
     * `createServerStreamingServer`. Messages received from the client are
     * decoded and logged.
     *
     * NOTE(review): received messages are not forwarded to `incomingRequest`;
     * the parameter is currently unused - confirm whether that is intended.
     *
     * @param serverUrl address to bind, e.g. `'0.0.0.0:3001'`
     * @param messageToBeStream outgoing payloads, JSON-encoded before writing
     * @param statusControl sink for connection status reports
     * @param grpcServerConnection registry object; the server is stored under `serverUrl`
     * @param incomingRequest currently unused
     * @returns a promise that stays pending while the server is set up and
     *          resolves with the error if set-up or binding fails
     */
    public createGrpcBidirectionalServer(
      serverUrl: string,
      messageToBeStream: Subject<any>,
      statusControl: Subject<ReportStatus>,
      grpcServerConnection: any,
      incomingRequest: Subject<GrpcMessage>
    ): Promise<any> {
      return new Promise((resolve) => {
        try {
          // https://github.com/grpc/proposal/blob/master/L5-node-client-interceptors.md
          const server = new grpc.Server();
          server.addService(message_proto.Message.service, {
            sendMessageStream: (call: grpc.ServerDuplexStream<any, any>) => {
              console.log(`Client connected from: ${call.getPeer()}`);
              statusControl.next({
                code: ColorCode.GREEN,
                message: `Client connected!!`,
                from: `Bidirectional Instance`
              });

              // Parent subscription created first so `next` can unsubscribe
              // even if the subject emits synchronously on subscribe.
              const subscription = new Subscription();
              subscription.add(
                messageToBeStream.subscribe({
                  next: (payload: any) => {
                    // Check the connection for each and every message.
                    if (call.cancelled) {
                      // No connection: tell the buffer service to stop
                      // releasing messages and hand back the undelivered one.
                      statusControl.next({
                        code: ColorCode.YELLOW,
                        message: `Client is not alive.....`,
                        payload: payload,
                        from: `Bidirectional Instance`
                      });
                      // This call is dead; stop feeding it.
                      subscription.unsubscribe();
                      return;
                    }
                    console.log(`Sending ${payload?.appData?.msgId}`);
                    // The field name must match the proto definition exactly
                    // (e.g. `message MessageRequest { string message = 1 }`).
                    call.write({ message: JSON.stringify(payload) });
                  },
                  error: (err) => console.error(err),
                  complete: () => { } // it will never complete
                })
              );

              call.on('data', (data: any) => {
                const payload = this.parseJson(data?.message, `Bidirectional Instance`);
                if (payload === undefined) return; // malformed frame, already logged
                console.log(`Received Message from Client: ${payload?.appData?.msgId}`);
              });

              call.on('end', () => {
                // The stream may never end, so this is not a reliable way to
                // tell that a client has disconnected.
                console.log('Client stream ended');
              });

              call.on('error', (err: Error) => {
                // A listener must stay registered: an 'error' event without one
                // is thrown by the emitter.
                // TODO: probe the client here, emit a YELLOW report to halt
                // message release, and escalate if the probe gets no reply.
                console.error(err);
              });

              call.on('close', () => {
                // Client closure, which may be due to errors or manual termination.
                console.log('Unknown cause for diconnectivity');
              });
            },
            Check: GrpcServiceMethod.reportServing,
          });
          this.bindAndStart(server, serverUrl, resolve);
          grpcServerConnection[serverUrl] = server;
        } catch (error) {
          resolve(error);
        }
      });
    }

    /**
     * Creates the client side of the bidirectional transport.
     *
     * Opens one `sendMessageStream` call, writes every payload emitted on
     * `messageToBeTransmitted` to it (JSON-encoded) and logs what the server
     * sends back. When the call errors or ends, the subscription to
     * `messageToBeTransmitted` is released and the promise resolves.
     *
     * NOTE(review): server messages are only logged, not forwarded to
     * `incomingResponse`; the parameter is currently unused - confirm whether
     * that is intended.
     *
     * @param server address of the gRPC server
     * @param alreadyHealthCheck forwarded to `checkConnectionHealth`
     * @param messageToBeTransmitted payloads to send to the server
     * @param statusControl sink for connection status reports (health check only)
     * @param incomingResponse currently unused
     * @returns a promise that stays pending while the stream is open and
     *          resolves with `'Server Error'` once the call errors or ends
     */
    public createBidirectionalStreamingClient(
      server: string,
      alreadyHealthCheck: boolean,
      messageToBeTransmitted: Subject<any>,
      statusControl: Subject<ReportStatus>,
      incomingResponse: Subject<GrpcMessage>
    ): Promise<string> {
      return new Promise((resolve) => {
        const client = new message_proto.Message(server, grpc.credentials.createInsecure());
        const call = client.sendMessageStream();
        // Fire-and-forget probe; its outcome is reported through statusControl.
        void this.checkConnectionHealth(client, statusControl, alreadyHealthCheck);

        const outbound = new Subscription();
        // Kill the subscription to prevent leaks, then hand control back.
        // Safe to run for both 'error' and 'end': unsubscribe is idempotent and
        // a promise only settles once.
        const shutDown = (): void => {
          outbound.unsubscribe();
          resolve('Server Error');
        };

        outbound.add(
          messageToBeTransmitted.subscribe({
            next: (payload: any) => {
              console.log(`Sending ${payload?.appData?.msgId}`);
              call.write({ message: JSON.stringify(payload) });
            },
            error: (err) => console.error(err),
            complete: () => { } // it will never complete
          })
        );

        call.on('data', (data: any) => {
          const message = this.parseJson(data?.message, `Bidirectional Client Instance`);
          if (message === undefined) return; // malformed frame, already logged
          console.log(`Received message from Server: ${message?.msgId ?? message?.appData?.msgId ?? `Invalid`}`);
        });
        call.on('error', shutDown);
        call.on('end', shutDown);
      });
    }

    /**
     * Calls the server's `Check` RPC once and reports the outcome.
     *
     * On a response, a GREEN report is emitted on `statusControl`. On failure
     * the error is logged, but only when `alreadyHealthCheck` is `false`.
     *
     * @param client gRPC client exposing a `Check(request, callback)` method
     * @param statusControl sink for the GREEN report
     * @param alreadyHealthCheck when not `false`, failures are not logged
     * @returns `true` when the server answered, `false` otherwise
     */
    public checkConnectionHealth(client: any, statusControl: Subject<ReportStatus>, alreadyHealthCheck: boolean): Promise<boolean> {
      return new Promise((resolve) => {
        client.Check({}, (error: unknown, response: any) => {
          if (response) {
            console.log(`GRPC Health check status: ${response.status} Server Connected`);
            statusControl.next({
              code: ColorCode.GREEN,
              message: `Good to go!!!`,
              from: `GRPC health check`
            });
            resolve(true);
            return;
          }
          if (alreadyHealthCheck === false) console.error(`Health check failed: ${error}`);
          resolve(false);
        });
      });
    }

    /** Health-check handler: for simplicity it always answers `SERVING`. */
    private static reportServing(_call: grpc.ServerUnaryCall<any, any>, callback: grpc.sendUnaryData<any>): void {
      callback(null, { status: 'SERVING' });
    }

    /** Binds `server` to `serverUrl` without TLS and starts it; a bind error goes to `onFailure` instead of starting. */
    private bindAndStart(server: grpc.Server, serverUrl: string, onFailure: (reason: unknown) => void): void {
      server.bindAsync(serverUrl, grpc.ServerCredentials.createInsecure(), (bindError: Error | null) => {
        if (bindError) {
          console.error(`gRPC server failed to bind on ${serverUrl}`, bindError);
          onFailure(bindError);
          return;
        }
        console.log(`gRPC server is running on ${serverUrl}`);
        server.start();
      });
    }

    /** Decodes a JSON wire payload; logs and returns `undefined` (never produced by valid JSON) when it is malformed. */
    private parseJson(raw: string, origin: string): any {
      try {
        return JSON.parse(raw);
      } catch (error) {
        console.error(`${origin}: discarding malformed JSON payload`, error);
        return undefined;
      }
    }
  }