grpc.service.method.ts 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273
  1. import * as grpc from '@grpc/grpc-js';
  2. import { Subject, Subscription } from "rxjs";
  3. import { ReportStatus, ColorCode, Message, MessageLog, ConnectionAttribute, ConnectionRequest, GrpcConnectionType, ConnectionState } from "../interfaces/general.interface";
  4. import { Status } from '@grpc/grpc-js/build/src/constants';
  5. import { v4 as uuidv4 } from 'uuid'
  6. import { message_proto } from './protos/server.proto'
  7. import { ServerWritableStreamImpl } from '@grpc/grpc-js/build/src/server-call';
  8. export class GrpcServiceMethod {
  9. private server: grpc.Server | any
  10. private messageToBeSendOver: Message | any
  11. private callRequestsFromRemote: ServerWritableStreamImpl<any, ResponseType>[] = []
  12. public async create(request: ConnectionRequest, connectionAttribute: ConnectionAttribute): Promise<any> {
  13. // Assuming currently only one client
  14. this.createGrpcInstance(request.server.serverUrl, { instanceType: 'server' }, connectionAttribute)
  15. this.createGrpcInstance(request.client.targetServer, { instanceType: 'client' }, connectionAttribute)
  16. // connectionAttribute.outGoing.MessageToBePublished?.subscribe(e => console.log((e.message as MessageLog).appData.msgId))
  17. }
  18. // For testing only
  19. public async shutDownServer(): Promise<string> {
  20. return new Promise((resolve, reject) => {
  21. console.log(`Shutting down servers...`)
  22. if (this.server) {
  23. this.callRequestsFromRemote[0].destroy()
  24. this.callRequestsFromRemote[0].end()
  25. this.server.forceShutdown()
  26. let message: string = `Server shut down successfully!`
  27. resolve(message)
  28. }
  29. if (!this.server) {
  30. let errorMsg: string = `There's no active server here`
  31. reject(errorMsg)
  32. }
  33. })
  34. }
  35. private async generateAdditionalAttributes(connectionAttribute: ConnectionAttribute, clientInfo?: any, localInfo?: any) {
  36. if (clientInfo) {
  37. connectionAttribute.inComing.StreamID = clientInfo.channelID
  38. connectionAttribute.inComing.PublisherID = clientInfo.publisherID
  39. connectionAttribute.inComing.SubscriberID = clientInfo.subscriberID
  40. }
  41. if (localInfo) {
  42. connectionAttribute.outGoing.StreamID = localInfo.channelID
  43. connectionAttribute.outGoing.PublisherID = localInfo.publisherID
  44. connectionAttribute.outGoing.SubscriberID = localInfo.subscriberID
  45. }
  46. if (connectionAttribute.outGoing.StreamID && connectionAttribute.inComing.StreamID) {
  47. connectionAttribute.ConnectionID.local = connectionAttribute.outGoing.StreamID + connectionAttribute.inComing.StreamID
  48. connectionAttribute.ConnectionID.remote = connectionAttribute.inComing.StreamID + connectionAttribute.outGoing.StreamID
  49. // let report: ReportStatus = {
  50. // code: ColorCode.GREEN,
  51. // message: `ConnectionID acquired. Informing Restranmission to release Messages...`,
  52. // }
  53. }
  54. }
  55. // To be migrated into a service in the immediate future
  56. private async createGrpcInstance(
  57. serverUrl: string,
  58. grpcType: GrpcConnectionType,
  59. connectionAttribute: ConnectionAttribute,
  60. ) {
  61. let statusControl: Subject<ConnectionState> = connectionAttribute.connectionStatus
  62. let consecutiveResolutions = 0;
  63. let lastResolutionTime = Date.now();
  64. let yellowErrorEmission: boolean = false
  65. let redErrorEmission: boolean = false
  66. while (true) {
  67. try {
  68. let recreatePromise = new Promise((resolve) => {
  69. if (grpcType.instanceType == 'server') {
  70. this.createServerStreamingServer(serverUrl, connectionAttribute).then(() => {
  71. resolve('recreate')
  72. })
  73. }
  74. if (grpcType.instanceType == 'client') {
  75. this.createServerStreamingClient(serverUrl, connectionAttribute).then(() => {
  76. resolve('recreate')
  77. })
  78. }
  79. })
  80. await recreatePromise
  81. // If connection resolves (indicating failure), increment the count
  82. consecutiveResolutions++;
  83. // console.log(`Reconnection Attempt: ${consecutiveResolutions}`)
  84. if (redErrorEmission == false) {
  85. redErrorEmission = true
  86. console.error(`Connection failed ${consecutiveResolutions} times. Stopping connection attempts.`);
  87. // let error: ReportStatus = {
  88. // code: ColorCode.YELLOW,
  89. // message: 'Server is not responding. Proceed to buffer.',
  90. // }
  91. let error: ConnectionState = {
  92. status: 'BUFFER',
  93. reason: `Server is not responding...`
  94. }
  95. statusControl.next(error)
  96. }
  97. } catch (error) {
  98. // Connection did not resolve, reset the count
  99. consecutiveResolutions = 0;
  100. console.error('Connection attempt failed:', error);
  101. }
  102. // Check for a pause of more than 2 seconds since the last resolution attempt
  103. const currentTime = Date.now();
  104. const timeSinceLastResolution = currentTime - lastResolutionTime;
  105. if (timeSinceLastResolution > 2000) {
  106. consecutiveResolutions = 0;
  107. yellowErrorEmission = false
  108. redErrorEmission = false
  109. }
  110. // Update the last resolution time
  111. lastResolutionTime = currentTime;
  112. await new Promise(resolve => setTimeout(resolve, 1000)); // Wait for 1 second before the next attempt
  113. // timeout generate message to trigger this reconnection
  114. }
  115. }
  116. // Create Server Instance to stream all application Outgoing messages
  117. public async createServerStreamingServer(
  118. serverUrl: string,
  119. connectionAttribute: ConnectionAttribute
  120. ): Promise<any> { // '0.0.0.0:3001'
  121. return new Promise((resolve, reject) => {
  122. try {
  123. if (!this.server) {
  124. this.server = new grpc.Server();
  125. }
  126. this.server.addService(message_proto.Message.service, {
  127. HandleMessage: (call) => {
  128. this.callRequestsFromRemote.push(call)
  129. let clientInfo = JSON.parse(call.request.message)
  130. this.generateAdditionalAttributes(connectionAttribute, clientInfo)
  131. // console.log(`Hello 123`)
  132. // connectionAttribute.outGoing.MessageToBePublished?.subscribe(e => (e.message as MessageLog).appData.msgId)
  133. console.log(`Initializing stream. Opening Channel... Confirmation from ${call.request.id}`)
  134. if (connectionAttribute.outGoing.MessageToBePublished) {
  135. let subscription: Subscription = connectionAttribute.outGoing.MessageToBePublished.subscribe({
  136. next: (response: Message) => {
  137. console.log(`Sending from GRPC server: ${(response.message as MessageLog).appData.msgId} `)
  138. this.messageToBeSendOver = response
  139. let message = {
  140. id: response.id,
  141. message: JSON.stringify(response.message)
  142. }
  143. call.write(message)
  144. },
  145. error: err => {
  146. console.error(err)
  147. subscription.unsubscribe()
  148. resolve('')
  149. },
  150. complete: () => {
  151. console.log(`Stream response completed for ${call.request.id}`)
  152. subscription.unsubscribe()
  153. resolve('')
  154. }
  155. })
  156. }
  157. console.log(connectionAttribute)
  158. let report: ConnectionState = {
  159. status: 'DIRECT_PUBLISH'
  160. }
  161. connectionAttribute.connectionStatus.next(report)
  162. },
  163. Check: (_, callback) => {
  164. // for now it is just sending the status message over to tell the client it is alive
  165. // For simplicity, always return "SERVING" as status
  166. callback(null, { status: 'SERVING' });
  167. },
  168. });
  169. // Bind and start the server
  170. this.server.bindAsync(serverUrl, grpc.ServerCredentials.createInsecure(), () => {
  171. console.log(`gRPC server is running on ${serverUrl}`);
  172. this.server.start();
  173. });
  174. }
  175. catch (error) {
  176. resolve(error)
  177. }
  178. })
  179. }
  180. // Send a request over to the other server to open a channel for this server to emit/stream messages over
  181. public async createServerStreamingClient(
  182. server: string,
  183. connectionAttribute: ConnectionAttribute
  184. ): Promise<string> {
  185. return new Promise(async (resolve, reject) => {
  186. const client = new message_proto.Message(server, grpc.credentials.createInsecure());
  187. let outGoingInfo: any = {
  188. channelID: uuidv4(),
  189. publisherID: uuidv4(),
  190. subscriberID: uuidv4()
  191. }
  192. this.generateAdditionalAttributes(connectionAttribute, {}, outGoingInfo)
  193. // connectionAttribute.ConnectionID = connectionAttribute.outGoing.ChannelID + (connectionAttribute.inComing.ChannelID ?? 'undefined')
  194. let call = client.HandleMessage({ id: server, message: JSON.stringify(outGoingInfo) })
  195. console.log(`Sending request to ${server} to open response channel...`)
  196. call.on('status', (status: Status) => {
  197. if (status == grpc.status.OK) { // only returns a status when there's error. Otherwise it just waits
  198. console.log(`Message trasmission operation is successful`)
  199. // RPC completed successfully
  200. } if (status == grpc.status.UNAVAILABLE) {
  201. // let report: ReportStatus = {
  202. // code: ColorCode.YELLOW,
  203. // message: `Server doesn't seem to be alive. Error returned.`,
  204. // payload: this.messageToBeSendOver ?? `There's no message at the moment...`
  205. // }
  206. let report: ConnectionState = {
  207. status: 'BUFFER',
  208. reason: `Server doesn't seem to be alive. Error returned.`,
  209. payload: this.messageToBeSendOver ?? `There's no message at the moment...`
  210. }
  211. connectionAttribute.connectionStatus.next(report)
  212. resolve('No connection established. Server is not responding..')
  213. }
  214. });
  215. call.on('data', (data: any) => {
  216. let response: Message = {
  217. id: data.id,
  218. message: JSON.parse(data.message)
  219. }
  220. if (connectionAttribute.inComing.MessageToBeReceived) {
  221. connectionAttribute.inComing.MessageToBeReceived.next(response)
  222. }
  223. // console.log(`Received ${(response.message as MessageLog).appData.msgId}`)
  224. });
  225. call.on('error', (err) => {
  226. resolve('')
  227. });
  228. })
  229. }
  230. // THis is no longer necesarry after the introduction of connection Attribute. But it is still useful for checking for the other side's health
  231. public async checkConnectionHealth(client: any, statusControl: Subject<ReportStatus>, alreadyHealthCheck: boolean): Promise<boolean> {
  232. return new Promise((resolve, reject) => {
  233. client.Check({}, (error, response) => {
  234. if (response) {
  235. console.log(`GRPC Health check status: ${response.status} Server Connected`);
  236. // Intepret the response status and implement code logic or handler
  237. resolve(response.status)
  238. } else {
  239. if (alreadyHealthCheck == false) console.error(`Health check failed: ${error}`);
  240. reject(false)
  241. }
  242. })
  243. })
  244. }
  245. }
  246. // https://github.com/grpc/proposal/blob/master/L5-node-client-interceptors.md