grpc.service.method.ts 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. import * as grpc from '@grpc/grpc-js';
  2. import { Subject, Subscription } from "rxjs";
  3. import { Message, ConnectionAttribute, GrpcConnectionType, ConnectionState, MessageLog, ConnectionStatus } from "../interfaces/general.interface";
  4. import { Status } from '@grpc/grpc-js/build/src/constants';
  5. import { message_proto } from './protos/server.proto'
  6. import * as _ from 'lodash'
  7. export class GrpcServiceMethod {
  8. private server: grpc.Server | any
  9. private messageToBeSendOver: Message | any
  10. private clientRequest: Subject<ConnectionAttribute> = new Subject()
  11. private localServerStatus: Subject<any> = new Subject()
  12. // keep track of all the incoming channel request from the servers
  13. public getClientRequest() {
  14. return this.clientRequest
  15. }
  16. // keep track of all the incoming channel request from the servers
  17. public getLocalServerStatus() {
  18. return this.localServerStatus
  19. }
  20. public async create(connectionAttribute: ConnectionAttribute): Promise<any> {
  21. return new Promise((resolve, reject) => {
  22. this.createGrpcInstance({ instanceType: 'server' }, connectionAttribute)
  23. this.createGrpcInstance({ instanceType: 'client' }, connectionAttribute)
  24. resolve('Just putting it here for now....')
  25. })
  26. }
  27. private async generateAdditionalAttributes(connectionAttribute: ConnectionAttribute, clientInfo?: any, localInfo?: any) {
  28. // if (clientInfo) {
  29. // connectionAttribute.inComing.StreamID = clientInfo.StreamID
  30. // connectionAttribute.inComing.PublisherID = clientInfo.PublisherID
  31. // connectionAttribute.inComing.SubscriberID = clientInfo.SubscriberID
  32. // }
  33. // if (localInfo) {
  34. // connectionAttribute.outGoing.StreamID = localInfo.StreamID
  35. // connectionAttribute.outGoing.PublisherID = localInfo.PublisherID
  36. // connectionAttribute.outGoing.SubscriberID = localInfo.SubscriberID
  37. // }
  38. // if (connectionAttribute.outGoing.StreamID && connectionAttribute.inComing.StreamID) {
  39. // connectionAttribute.ConnectionID.local = connectionAttribute.outGoing.StreamID + connectionAttribute.inComing.StreamID
  40. // connectionAttribute.ConnectionID.remote = connectionAttribute.inComing.StreamID + connectionAttribute.outGoing.StreamID
  41. // }
  42. }
  43. private async createGrpcInstance(grpcType: GrpcConnectionType, connectionAttribute: ConnectionAttribute) {
  44. // Reconnection Logic here
  45. while (true) {
  46. try {
  47. let recreatePromise = new Promise((resolve) => {
  48. if (grpcType.instanceType == 'server') {
  49. this.createServerStreamingServer(connectionAttribute).then(() => {
  50. resolve('recreate')
  51. })
  52. }
  53. if (grpcType.instanceType == 'client') {
  54. this.createServerStreamingClient(connectionAttribute).then(() => {
  55. resolve('recreate')
  56. })
  57. }
  58. })
  59. await recreatePromise
  60. } catch (error) {
  61. console.error('Connection attempt failed:', error);
  62. }
  63. await new Promise(resolve => setTimeout(resolve, 1000)); // Wait for 1 second before the next attempt
  64. // timeout generate message to trigger this reconnection
  65. }
  66. }
  67. // Create Server Instance to stream all application Outgoing messages
  68. public async createServerStreamingServer(connectionAttribute: ConnectionAttribute): Promise<any> { // '0.0.0.0:3001'
  69. return new Promise((resolve, reject) => {
  70. try {
  71. if (!this.server) {
  72. this.server = new grpc.Server()
  73. this.localServerStatus.next({
  74. connectionStatus: 'ON',
  75. serverName: connectionAttribute.outGoing.PublisherID,
  76. message: `${connectionAttribute.outGoing.serverUrl} started.`
  77. })
  78. } else {
  79. console.log(`Grpc server alrady started.`)
  80. }
  81. this.server.addService(message_proto.Message.service, {
  82. HandleMessage: (call) => {
  83. let clientInfo: ConnectionAttribute = JSON.parse(call.request.message)
  84. this.clientRequest.next(clientInfo)
  85. console.log(`Initializing stream. Opening Channel... Confirmation from ${call.request.id}`)
  86. if (connectionAttribute.outGoing.MessageToBePublished) {
  87. let subscription: Subscription = connectionAttribute.outGoing.MessageToBePublished.subscribe({
  88. next: (response: Message) => {
  89. console.log(`Sending from GRPC server: ${(response.message as MessageLog).appData.msgId} `)
  90. let message = {
  91. id: response.id,
  92. message: JSON.stringify(response.message)
  93. }
  94. call.write(message)
  95. },
  96. error: err => {
  97. console.error(err)
  98. subscription.unsubscribe()
  99. resolve('')
  100. },
  101. complete: () => {
  102. console.log(`Stream response completed for ${call.request.id}`)
  103. subscription.unsubscribe()
  104. resolve('')
  105. }
  106. })
  107. let report: ConnectionState = {
  108. status: 'DIRECT_PUBLISH'
  109. }
  110. connectionAttribute.connectionStatus!.next(report)
  111. }
  112. },
  113. Check: (_, callback) => {
  114. // for now it is just sending the status message over to tell the client it is alive
  115. // For simplicity, always return "SERVING" as status
  116. callback(null, { status: 'SERVING' });
  117. },
  118. });
  119. // Bind and start the server
  120. this.server.bindAsync(connectionAttribute.outGoing.serverUrl, grpc.ServerCredentials.createInsecure(), () => {
  121. console.log(`gRPC server is running on ${connectionAttribute.outGoing.serverUrl}`);
  122. this.server.start();
  123. });
  124. }
  125. catch (error) {
  126. resolve(error)
  127. }
  128. })
  129. }
  130. // Send a request over to the other server to open a channel for this server to emit/stream messages over
  131. public async createServerStreamingClient(connectionAttribute: ConnectionAttribute): Promise<string> {
  132. return new Promise(async (resolve, reject) => {
  133. const client = new message_proto.Message(connectionAttribute.inComing.serverUrl, grpc.credentials.createInsecure());
  134. let localInfo: ConnectionAttribute = { // need to make a new copy where it doesn't reference the subjects, otherwise circular ref error
  135. ConnectionID: connectionAttribute.ConnectionID,
  136. outGoing: {
  137. StreamID: connectionAttribute.outGoing.StreamID,
  138. PublisherID: connectionAttribute.outGoing.PublisherID,
  139. SubscriberID: connectionAttribute.outGoing.SubscriberID,
  140. serverUrl: connectionAttribute.outGoing.serverUrl,
  141. MessageToBePublished: null,
  142. MessageToBeReceived: null
  143. },
  144. inComing: {
  145. StreamID: connectionAttribute.inComing.StreamID,
  146. PublisherID: connectionAttribute.inComing.PublisherID,
  147. SubscriberID: connectionAttribute.inComing.SubscriberID,
  148. serverUrl: connectionAttribute.inComing.serverUrl,
  149. MessageToBePublished: null,
  150. MessageToBeReceived: null
  151. },
  152. connectionStatus: null
  153. }
  154. let call = client.HandleMessage({ id: connectionAttribute.inComing.serverUrl, message: JSON.stringify(localInfo) })
  155. console.log(`Sending request to ${connectionAttribute.inComing.serverUrl} to open response channel...`)
  156. call.on('status', (status: Status) => {
  157. if (status == grpc.status.OK) { // only returns a status when there's error. Otherwise it just waits
  158. console.log(`Message trasmission operation is successful`)
  159. // RPC completed successfully
  160. } if (status == grpc.status.UNAVAILABLE) {
  161. let report: ConnectionState = {
  162. status: 'BUFFER',
  163. reason: `Server doesn't seem to be alive. Error returned.`,
  164. payload: this.messageToBeSendOver ?? `There's no message at the moment...`
  165. }
  166. connectionAttribute.connectionStatus!.next(report)
  167. resolve('No connection established. Server is not responding..')
  168. }
  169. });
  170. call.on('data', (data: any) => {
  171. let response: Message = {
  172. id: data.id,
  173. message: JSON.parse(data.message)
  174. }
  175. if (connectionAttribute.inComing.MessageToBeReceived) {
  176. connectionAttribute.inComing.MessageToBeReceived.next(response)
  177. }
  178. });
  179. call.on('error', (err) => {
  180. console.error(err)
  181. resolve('')
  182. });
  183. })
  184. }
  185. // THis is no longer necesarry after the introduction of connection Attribute. But it is still useful for checking for the other side's health
  186. public async checkConnectionHealth(client: any, statusControl: Subject<ConnectionState>, alreadyHealthCheck: boolean): Promise<boolean> {
  187. return new Promise((resolve, reject) => {
  188. client.Check({}, (error, response) => {
  189. if (response) {
  190. console.log(`GRPC Health check status: ${response.status} Server Connected`);
  191. // Intepret the response status and implement code logic or handler
  192. resolve(response.status)
  193. } else {
  194. if (alreadyHealthCheck == false) console.error(`Health check failed: ${error}`);
  195. reject(false)
  196. }
  197. })
  198. })
  199. }
  200. }
  201. // https://github.com/grpc/proposal/blob/master/L5-node-client-interceptors.md