service.method.ts 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315
  1. import * as grpc from '@grpc/grpc-js';
  2. import { Subject, Subscription } from "rxjs";
  3. import { ReportStatus, ColorCode, Message, MessageLog } from "../interfaces/general.interface";
  4. import { Status } from '@grpc/grpc-js/build/src/constants';
  5. import { error } from 'console';
  6. const message_proto = require('./protos/server.proto')
  7. export class GrpcServiceMethod {
  8. // Create Server Instance to stream all application Outgoing messages
  9. public async createServerStreamingServer(
  10. serverUrl: string,
  11. grpcServerConnection: any,
  12. messageToBeStream: Subject<Message>
  13. ): Promise<any> { // '0.0.0.0:3001'
  14. return new Promise((resolve, reject) => {
  15. try {
  16. // https://github.com/grpc/proposal/blob/master/L5-node-client-interceptors.md
  17. let server: grpc.Server = new grpc.Server();
  18. // Add the streamingData function to the gRPC service
  19. // Define your message_proto.Message service methods
  20. server.addService(message_proto.Message.service, {
  21. HandleMessage: (call) => {
  22. // console.log(call.request)
  23. console.log(`Intializing stream. Opening Channel. Confirmation from ${call.getPeer()}`)
  24. let subscription: Subscription = messageToBeStream.subscribe({
  25. next: (response: Message) => {
  26. console.log(`Sending ${(response.message as MessageLog).appData.msgId}`)
  27. let message = {
  28. id: response.id,
  29. message: JSON.stringify(response.message)
  30. }
  31. call.write(message)
  32. },
  33. error: err => {
  34. console.error(err)
  35. subscription.unsubscribe()
  36. resolve('')
  37. },
  38. complete: () => {
  39. console.log(`Stream response completed for ${call.request.id}`)
  40. subscription.unsubscribe()
  41. resolve('')
  42. // call.end()
  43. }
  44. })
  45. },
  46. Check: (_, callback) => {
  47. // health check logic here
  48. // for now it is just sending the status message over to tell the client it is alive
  49. // For simplicity, always return "SERVING" as status
  50. callback(null, { status: 'SERVING' });
  51. },
  52. });
  53. // Bind and start the server
  54. server.bindAsync(serverUrl, grpc.ServerCredentials.createInsecure(), () => {
  55. console.log(`gRPC server is running on ${serverUrl}`);
  56. server.start();
  57. });
  58. grpcServerConnection[serverUrl] = server
  59. }
  60. catch (error) {
  61. resolve(error)
  62. }
  63. })
  64. }
  65. // Send a request over to the other server to open a channel for this server to emit/stream messages over
  66. public async createServerStreamingClient(
  67. server: string,
  68. alreadyHealthCheck: boolean,
  69. statusControl: Subject<ReportStatus>,
  70. incomingMessage: Subject<Message>
  71. ): Promise<string> {
  72. return new Promise(async (resolve, reject) => {
  73. const client = new message_proto.Message(server, grpc.credentials.createInsecure());
  74. // perform check to see if server is alive, if not terminate this grpc instant and create again
  75. this.checkConnectionHealth(client, statusControl, alreadyHealthCheck).catch((error) => {
  76. resolve('')
  77. })
  78. // this is where the request sending logic occurs
  79. let call = client.HandleMessage({ id: `0000`, message: `Intiate Main Stream Channel Response` })
  80. console.log(`Sending request to open response channel...`)
  81. call.on('status', (status: Status) => {
  82. if (status == grpc.status.OK) { // only returns a status when there's error. Otherwise it just waits
  83. console.log(`Message trasmission operation is successful`)
  84. // RPC completed successfully
  85. } if (status == grpc.status.UNAVAILABLE) {
  86. let report = {
  87. code: ColorCode.YELLOW,
  88. message: `Server doesn't seem to be alive. Error returned.`,
  89. from: `Server Streaming Client Instance`
  90. }
  91. statusControl.next(report)
  92. resolve('No connection established. Server is not responding..')
  93. }
  94. });
  95. call.on('data', (data: any) => {
  96. // standard procedure. convert back the data and pass to the application to be processed
  97. let response: Message = {
  98. id: data.id,
  99. message: JSON.parse(data.message)
  100. }
  101. incomingMessage.next(response)
  102. console.log((response.message as MessageLog).appData.msgId)
  103. });
  104. call.on('error', (err) => {
  105. resolve('')
  106. });
  107. // call.on('end', () => { // this is for gracefull || willfull termination from the server
  108. // console.log(`Terminating Stream Request. Directing response to main channel`)
  109. // resolve('')
  110. // });
  111. })
  112. }
  113. /* ----------------All the functions below are for Bi-directional streaming. Subject to be deleted if decided not in use---------------- */
  114. public async createGrpcBidirectionalServer(
  115. serverUrl: string,
  116. messageToBeStream: Subject<any>,
  117. statusControl: Subject<ReportStatus>,
  118. grpcServerConnection: any,
  119. ): Promise<any> { // '0.0.0.0:3001'
  120. return new Promise((resolve, reject) => {
  121. try {
  122. // https://github.com/grpc/proposal/blob/master/L5-node-client-interceptors.md
  123. let server: grpc.Server = new grpc.Server();
  124. // Add the streamingData function to the gRPC service
  125. // Define your message_proto.Message service methods
  126. server.addService(message_proto.Message.service, {
  127. sendMessageStream: (call) => {
  128. console.log(`Client connected from: ${call.getPeer()}`);
  129. let report: ReportStatus = {
  130. code: ColorCode.GREEN,
  131. message: `Client connected!!`,
  132. from: `Bidirectional Instance`
  133. }
  134. statusControl.next(report)
  135. // Right now this is being broadcast.
  136. let subscription: Subscription = messageToBeStream.subscribe({
  137. next: (payload: any) => {
  138. let noConnection = call.cancelled // check connection for each and every message
  139. if (noConnection === true) { // that means there's no connection, beccause the cancel operation is determined to check
  140. let report: ReportStatus = {
  141. code: ColorCode.YELLOW,
  142. message: `Client is not alive.....`,
  143. payload: payload,
  144. from: `Bidirectional Instance`
  145. }
  146. statusControl.next(report) // no connection. Tell buffer service to stop releasing messages
  147. subscription.unsubscribe() // i still dont understand why i wrote this here
  148. } else {
  149. console.log(`Sending ${payload.appData.msgId}`)
  150. let message: string = JSON.stringify(payload)
  151. call.write({ message })
  152. }
  153. },
  154. error: err => console.error(err),
  155. complete: () => { } //it will never complete
  156. })
  157. call.on('data', (data: any) => {
  158. // console.log(data) // it does return in string format
  159. let payload = JSON.parse(data.message)
  160. console.log(`Received Message from Client: ${payload.appData?.msgId}`);
  161. // Forward the received message to the RxJS subject
  162. // let respmsg: any = {
  163. // msgId: payload.appData?.msgId,
  164. // confirmationMessage: `Message ${payload.appData?.msgId} acknowledged!`
  165. // }
  166. // let message: string = JSON.stringify(respmsg)
  167. // console.log(`Responding to client: ${respmsg.msgId}`);
  168. // Note: The parameter here MUST BE STRICTLY be the same letter as defined in proto. Eg: message MessageRequest { string >>'message'<< = 1 }
  169. // call.write({ message });
  170. });
  171. call.on('end', () => {
  172. console.log('Client stream ended');
  173. // but the stream never ends. THis is not a reliable way to tell if a client is disconnected
  174. });
  175. call.on('error', (err) => {
  176. // Error that may occue during the rpc call. Id there's an error, put a callbacn function there to check the connection for client
  177. // emit a yellow report to halt message release. If the server does not reply to the callback function, then emit a red card
  178. // the call back function will be to write and then the client should response immediately through test
  179. });
  180. call.on('close', () => {
  181. console.log('Unknown cause for diconnectivity');
  182. // Handle client closure, which may be due to errors or manual termination
  183. });
  184. },
  185. Check: (_, callback) => {
  186. // health check logic here
  187. // for now it is just sending the status message over to tell the client it is alive
  188. // For simplicity, always return "SERVING" as status
  189. callback(null, { status: 'SERVING' });
  190. },
  191. });
  192. // Bind and start the server
  193. server.bindAsync(serverUrl, grpc.ServerCredentials.createInsecure(), () => {
  194. console.log(`gRPC server is running on ${serverUrl}`);
  195. server.start();
  196. });
  197. grpcServerConnection[serverUrl] = server
  198. }
  199. catch (error) {
  200. resolve(error)
  201. }
  202. })
  203. }
  204. public async createBidirectionalStreamingClient(
  205. server: string,
  206. alreadyHealthCheck: boolean,
  207. messageToBeTransmitted: Subject<any>,
  208. statusControl: Subject<ReportStatus>,
  209. incomingResponse: Subject<Message>
  210. ): Promise<string> {
  211. let subscription: any
  212. let unsubscribed: boolean = false
  213. return new Promise(async (resolve, reject) => {
  214. const client = new message_proto.Message(server, grpc.credentials.createInsecure());
  215. const call = client.sendMessageStream();
  216. this.checkConnectionHealth(client, statusControl, alreadyHealthCheck)
  217. call.on('status', (status: Status) => { // this is useless in streaming(on for unary)
  218. // console.log(status) // For more info: https://grpc.github.io/grpc/core/md_doc_statuscodes.html
  219. // https://grpc.io/docs/what-is-grpc/core-concepts/#streaming
  220. // if (status == grpc.status.UNAVAILABLE) { // only returns a status when there's error. Otherwise it just waits
  221. // resolve('No connection established. Server is not responding..')
  222. // }
  223. });
  224. // All the grpc operations are here
  225. // Subscribe to the RxJS subject to send data to the server
  226. subscription = messageToBeTransmitted.subscribe({
  227. next: (payload: any) => {
  228. if (!unsubscribed) {
  229. console.log(`Sending ${payload.appData.msgId}`)
  230. let message: string = JSON.stringify(payload)
  231. call.write({ message })
  232. }
  233. },
  234. error: err => console.error(err),
  235. complete: () => { } //it will never complete
  236. });
  237. call.on('data', (data: any) => {
  238. let message = JSON.parse(data.message)
  239. console.log(`Received message from Server: ${message.msgId ?? message.appData?.msgId ?? `Invalid`}`);
  240. });
  241. call.on('error', (err) => {
  242. // console.log(`Something wrong with RPC call...`)
  243. if (!unsubscribed && subscription) { // kill subcription to prevent memory leaks
  244. subscription.unsubscribe();
  245. unsubscribed = true;
  246. }
  247. resolve('Server Error');
  248. });
  249. call.on('end', () => {
  250. if (!unsubscribed && subscription) { // kill subcription to prevent memory leaks
  251. subscription.unsubscribe();
  252. unsubscribed = true;
  253. }
  254. resolve('Server Error');
  255. });
  256. })
  257. }
  258. // Check connection To be Update. This function is destroying my code flow
  259. public async checkConnectionHealth(client: any, statusControl: Subject<ReportStatus>, alreadyHealthCheck: boolean): Promise<boolean> {
  260. return new Promise((resolve, reject) => {
  261. client.Check({}, (error, response) => {
  262. if (response) {
  263. console.log(`GRPC Health check status: ${response.status} Server Connected`);
  264. let report: ReportStatus = {
  265. code: ColorCode.GREEN,
  266. message: `Good to go!!!`,
  267. from: `GRPC health check`
  268. }
  269. statusControl.next(report)
  270. } else {
  271. if (alreadyHealthCheck == false) console.error(`Health check failed: ${error}`);
  272. resolve(false)
  273. }
  274. })
  275. })
  276. }
  277. }