grpc.service.method.ts 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. import * as grpc from '@grpc/grpc-js';
  2. import { Subject, Subscription } from "rxjs";
  3. import { Message, ConnectionAttribute, ConnectionRequest, GrpcConnectionType, ConnectionState, MessageLog, State, OutGoingInfo } from "../interfaces/general.interface";
  4. import { Status } from '@grpc/grpc-js/build/src/constants';
  5. import { message_proto } from './protos/server.proto'
  6. import { ServerWritableStreamImpl } from '@grpc/grpc-js/build/src/server-call';
  7. export class GrpcServiceMethod {
  8. // Prefilled connection attribute and pass in grpc service method for reference
  9. // Isolate connection attribute referencing issue to server-client service
  10. private server: grpc.Server | any
  11. private messageToBeSendOver: Message | any
  12. private clientInfo: any[] = []
  13. // private callRequestsFromRemote: ServerWritableStreamImpl<any, ResponseType>[] = []
  14. public async create(request: ConnectionRequest, connectionAttribute: ConnectionAttribute, outGoingInfo: OutGoingInfo): Promise<any> {
  15. // Assuming currently only one client
  16. // this.createGrpcInstance(request.server.serverUrl, { instanceType: 'server' }, connectionAttribute, outGoingInfo)
  17. // this.createGrpcInstance(request.client.targetServer, { instanceType: 'client' }, connectionAttribute, outGoingInfo)
  18. }
  19. private async generateAdditionalAttributes(connectionAttribute: ConnectionAttribute, clientInfo?: any, localInfo?: any) {
  20. if (clientInfo) {
  21. connectionAttribute.inComing.StreamID = clientInfo.StreamID
  22. connectionAttribute.inComing.PublisherID = clientInfo.PublisherID
  23. connectionAttribute.inComing.SubscriberID = clientInfo.SubscriberID
  24. }
  25. if (localInfo) {
  26. connectionAttribute.outGoing.StreamID = localInfo.StreamID
  27. connectionAttribute.outGoing.PublisherID = localInfo.PublisherID
  28. connectionAttribute.outGoing.SubscriberID = localInfo.SubscriberID
  29. }
  30. if (connectionAttribute.outGoing.StreamID && connectionAttribute.inComing.StreamID) {
  31. connectionAttribute.ConnectionID.local = connectionAttribute.outGoing.StreamID + connectionAttribute.inComing.StreamID
  32. connectionAttribute.ConnectionID.remote = connectionAttribute.inComing.StreamID + connectionAttribute.outGoing.StreamID
  33. }
  34. }
  35. private async createGrpcInstance(
  36. serverUrl: string,
  37. grpcType: GrpcConnectionType,
  38. connectionAttribute: ConnectionAttribute,
  39. outGoingInfo: OutGoingInfo
  40. ) {
  41. while (true) {
  42. try {
  43. let recreatePromise = new Promise((resolve) => {
  44. if (grpcType.instanceType == 'server') {
  45. this.createServerStreamingServer(serverUrl, connectionAttribute).then(() => {
  46. resolve('recreate')
  47. })
  48. }
  49. if (grpcType.instanceType == 'client') {
  50. this.createServerStreamingClient(serverUrl, connectionAttribute, outGoingInfo).then(() => {
  51. resolve('recreate')
  52. })
  53. }
  54. })
  55. await recreatePromise
  56. } catch (error) {
  57. console.error('Connection attempt failed:', error);
  58. }
  59. await new Promise(resolve => setTimeout(resolve, 1000)); // Wait for 1 second before the next attempt
  60. // timeout generate message to trigger this reconnection
  61. }
  62. }
  63. // Create Server Instance to stream all application Outgoing messages
  64. public async createServerStreamingServer(
  65. serverUrl: string,
  66. connectionAttribute: ConnectionAttribute
  67. ): Promise<any> { // '0.0.0.0:3001'
  68. return new Promise((resolve, reject) => {
  69. try {
  70. if (!this.server) {
  71. this.server = new grpc.Server()
  72. } else {
  73. console.log(`Grpc server alrady started.`) // this kept calling, that means this function is resolving on it's own, prompting the reconnection logic
  74. }
  75. this.server.addService(message_proto.Message.service, {
  76. HandleMessage: (call) => {
  77. let clientInfo: OutGoingInfo = JSON.parse(call.request.message)
  78. // console.log(clientInfo)
  79. this.generateAdditionalAttributes(connectionAttribute, clientInfo)
  80. console.log(`Initializing stream. Opening Channel... Confirmation from ${call.request.id}`)
  81. if (connectionAttribute.outGoing.MessageToBePublished) {
  82. let subscription: Subscription = connectionAttribute.outGoing.MessageToBePublished.subscribe({
  83. next: (response: Message) => {
  84. console.log(`Sending from GRPC server: ${(response.message as MessageLog).appData.msgId} `)
  85. let message = {
  86. id: response.id,
  87. message: JSON.stringify(response.message)
  88. }
  89. call.write(message)
  90. },
  91. error: err => {
  92. console.error(err)
  93. subscription.unsubscribe()
  94. resolve('')
  95. },
  96. complete: () => {
  97. console.log(`Stream response completed for ${call.request.id}`)
  98. subscription.unsubscribe()
  99. resolve('')
  100. }
  101. })
  102. console.log(connectionAttribute)
  103. let report: ConnectionState = {
  104. status: 'DIRECT_PUBLISH'
  105. }
  106. connectionAttribute.connectionStatus.next(report)
  107. }
  108. },
  109. Check: (_, callback) => {
  110. // for now it is just sending the status message over to tell the client it is alive
  111. // For simplicity, always return "SERVING" as status
  112. callback(null, { status: 'SERVING' });
  113. },
  114. });
  115. // Bind and start the server
  116. this.server.bindAsync(serverUrl, grpc.ServerCredentials.createInsecure(), () => {
  117. console.log(`gRPC server is running on ${serverUrl}`);
  118. this.server.start();
  119. });
  120. }
  121. catch (error) {
  122. resolve(error)
  123. }
  124. })
  125. }
  126. // Send a request over to the other server to open a channel for this server to emit/stream messages over
  127. public async createServerStreamingClient(
  128. server: string,
  129. connectionAttribute: ConnectionAttribute,
  130. outGoingInfo: OutGoingInfo
  131. ): Promise<string> {
  132. return new Promise(async (resolve, reject) => {
  133. const client = new message_proto.Message(server, grpc.credentials.createInsecure());
  134. this.generateAdditionalAttributes(connectionAttribute, {}, outGoingInfo)
  135. let call = client.HandleMessage({ id: server, message: JSON.stringify(outGoingInfo) })
  136. console.log(`Sending request to ${server} to open response channel...`)
  137. call.on('status', (status: Status) => {
  138. if (status == grpc.status.OK) { // only returns a status when there's error. Otherwise it just waits
  139. console.log(`Message trasmission operation is successful`)
  140. // RPC completed successfully
  141. } if (status == grpc.status.UNAVAILABLE) {
  142. let report: ConnectionState = {
  143. status: 'BUFFER',
  144. reason: `Server doesn't seem to be alive. Error returned.`,
  145. payload: this.messageToBeSendOver ?? `There's no message at the moment...`
  146. }
  147. connectionAttribute.connectionStatus.next(report)
  148. resolve('No connection established. Server is not responding..')
  149. }
  150. });
  151. call.on('data', (data: any) => {
  152. let response: Message = {
  153. id: data.id,
  154. message: JSON.parse(data.message)
  155. }
  156. if (connectionAttribute.inComing.MessageToBeReceived) {
  157. connectionAttribute.inComing.MessageToBeReceived.next(response)
  158. }
  159. });
  160. call.on('error', (err) => {
  161. console.error(err)
  162. resolve('')
  163. });
  164. })
  165. }
  166. // THis is no longer necesarry after the introduction of connection Attribute. But it is still useful for checking for the other side's health
  167. public async checkConnectionHealth(client: any, statusControl: Subject<ConnectionState>, alreadyHealthCheck: boolean): Promise<boolean> {
  168. return new Promise((resolve, reject) => {
  169. client.Check({}, (error, response) => {
  170. if (response) {
  171. console.log(`GRPC Health check status: ${response.status} Server Connected`);
  172. // Intepret the response status and implement code logic or handler
  173. resolve(response.status)
  174. } else {
  175. if (alreadyHealthCheck == false) console.error(`Health check failed: ${error}`);
  176. reject(false)
  177. }
  178. })
  179. })
  180. }
  181. }
  182. // https://github.com/grpc/proposal/blob/master/L5-node-client-interceptors.md