grpc.service.method.ts 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. import * as grpc from '@grpc/grpc-js';
  2. import { Observable, Subject, Subscription } from "rxjs";
  3. import { Message, ConnectionAttribute, GrpcConnectionType, ConnectionState, MessageLog, ConnectionStatus, StreamAttribute } from "../interfaces/general.interface";
  4. import { Status } from '@grpc/grpc-js/build/src/constants';
  5. import { message_proto } from './protos/server.proto'
  6. import * as _ from 'lodash'
  7. export class GrpcServiceMethod {
  8. private server: grpc.Server | any
  9. private messageToBeSendOver: Message | any
  10. private clientRequest: Subject<ConnectionAttribute> = new Subject()
  11. private localServerStatus: Subject<any> = new Subject()
  12. // keep track of all the incoming channel request from the servers
  13. public getClientRequest() {
  14. return this.clientRequest
  15. }
  16. // keep track of all the incoming channel request from the servers
  17. public getLocalServerStatus() {
  18. return this.localServerStatus
  19. }
  20. public async create(connectionAttribute: ConnectionAttribute): Promise<any> {
  21. return new Promise((resolve, reject) => {
  22. this.createGrpcInstance({ instanceType: 'server' }, connectionAttribute)
  23. this.createGrpcInstance({ instanceType: 'client' }, connectionAttribute)
  24. resolve('Just putting it here for now....')
  25. })
  26. }
  27. private async createGrpcInstance(grpcType: GrpcConnectionType, connectionAttribute: ConnectionAttribute) {
  28. // Reconnection Logic here
  29. while (true) {
  30. try {
  31. let recreatePromise = new Promise((resolve) => {
  32. if (grpcType.instanceType == 'server') {
  33. this.createServerStreamingServer(connectionAttribute).then(() => {
  34. resolve('recreate')
  35. })
  36. }
  37. if (grpcType.instanceType == 'client') {
  38. this.createServerStreamingClient(connectionAttribute).then(() => {
  39. resolve('recreate')
  40. })
  41. }
  42. })
  43. await recreatePromise
  44. } catch (error) {
  45. console.error('Connection attempt failed:', error);
  46. }
  47. await new Promise(resolve => setTimeout(resolve, 1000)); // Wait for 1 second before the next attempt
  48. // timeout generate message to trigger this reconnection
  49. }
  50. }
  51. // Create Server Instance to stream all application Outgoing messages
  52. public async createServerStreamingServer(connectionAttribute: ConnectionAttribute): Promise<any> { // '0.0.0.0:3001'
  53. return new Promise((resolve, reject) => {
  54. try {
  55. if (!this.server) {
  56. this.server = new grpc.Server()
  57. this.localServerStatus.next({
  58. connectionStatus: 'ON',
  59. connectionIDlocal: connectionAttribute.ConnectionID.local,
  60. message: `${connectionAttribute.outGoing.serverUrl} started.`
  61. })
  62. } else {
  63. console.log(`Grpc server alrady started.`)
  64. this.localServerStatus.next({
  65. connectionStatus: 'ON',
  66. connectionIDlocal: connectionAttribute.ConnectionID.local,
  67. message: `${connectionAttribute.outGoing.serverUrl} already started.`
  68. })
  69. }
  70. this.server.addService(message_proto.Message.service, {
  71. HandleMessage: (call) => {
  72. /// add a checking for standard message request
  73. let clientInfo: ConnectionAttribute = JSON.parse(call.request.message)
  74. if (this.isConnectionAttribute(clientInfo)) {
  75. this.clientRequest.next(clientInfo)
  76. console.log(`Initializing stream. Opening Channel... Confirmation from ${call.request.id}`)
  77. if (connectionAttribute.outGoing.MessageToBePublished) {
  78. let subscription: Subscription = connectionAttribute.outGoing.MessageToBePublished.subscribe({
  79. next: (response: Message) => {
  80. console.log(`Sending from GRPC server: ${(response.message as MessageLog).appData.msgId} `)
  81. let message = {
  82. id: response.id,
  83. message: JSON.stringify(response.message)
  84. }
  85. call.write(message)
  86. },
  87. error: err => {
  88. console.error(err)
  89. subscription.unsubscribe()
  90. resolve('')
  91. },
  92. complete: () => {
  93. console.log(`Stream response completed for ${call.request.id}`)
  94. subscription.unsubscribe()
  95. resolve('')
  96. }
  97. })
  98. let report: ConnectionState = {
  99. status: 'DIRECT_PUBLISH'
  100. }
  101. connectionAttribute.connectionStatus!.next(report)
  102. }
  103. } else {
  104. console.error(`INVALID REQUEST`)
  105. }
  106. },
  107. Check: (_, callback) => {
  108. // for now it is just sending the status message over to tell the client it is alive
  109. // For simplicity, always return "SERVING" as status
  110. callback(null, { status: 'SERVING' });
  111. },
  112. });
  113. // Bind and start the server
  114. this.server.bindAsync(connectionAttribute.outGoing.serverUrl, grpc.ServerCredentials.createInsecure(), () => {
  115. console.log(`gRPC server is running on ${connectionAttribute.outGoing.serverUrl}`);
  116. this.server.start();
  117. });
  118. }
  119. catch (error) {
  120. resolve(error)
  121. }
  122. })
  123. }
  124. // Send a request over to the other server to open a channel for this server to emit/stream messages over
  125. public async createServerStreamingClient(connectionAttribute: ConnectionAttribute): Promise<string> {
  126. return new Promise(async (resolve, reject) => {
  127. const client = new message_proto.Message(connectionAttribute.inComing.serverUrl, grpc.credentials.createInsecure());
  128. let localInfo: ConnectionAttribute = { // need to make a new copy where it doesn't reference the subjects, otherwise circular ref error
  129. ConnectionID: connectionAttribute.ConnectionID,
  130. outGoing: {
  131. StreamID: connectionAttribute.outGoing.StreamID,
  132. PublisherID: connectionAttribute.outGoing.PublisherID,
  133. SubscriberID: connectionAttribute.outGoing.SubscriberID,
  134. serverUrl: connectionAttribute.outGoing.serverUrl,
  135. MessageToBePublished: null,
  136. MessageToBeReceived: null
  137. },
  138. inComing: {
  139. StreamID: connectionAttribute.inComing.StreamID,
  140. PublisherID: connectionAttribute.inComing.PublisherID,
  141. SubscriberID: connectionAttribute.inComing.SubscriberID,
  142. serverUrl: connectionAttribute.inComing.serverUrl,
  143. MessageToBePublished: null,
  144. MessageToBeReceived: null
  145. },
  146. connectionStatus: null
  147. }
  148. let call = client.HandleMessage({ id: connectionAttribute.inComing.serverUrl, message: JSON.stringify(localInfo) })
  149. console.log(`Sending request to ${connectionAttribute.inComing.serverUrl} to open response channel...`)
  150. call.on('status', (status: Status) => {
  151. if (status == grpc.status.OK) { // only returns a status when there's error. Otherwise it just waits
  152. console.log(`Message trasmission operation is successful`)
  153. // RPC completed successfully
  154. } if (status == grpc.status.UNAVAILABLE) {
  155. let report: ConnectionState = {
  156. status: 'BUFFER',
  157. reason: `Server doesn't seem to be alive. Error returned.`,
  158. payload: this.messageToBeSendOver ?? `There's no message at the moment...`
  159. }
  160. connectionAttribute.connectionStatus!.next(report)
  161. resolve('No connection established. Server is not responding..')
  162. }
  163. });
  164. call.on('data', (data: any) => {
  165. let response: Message = {
  166. id: data.id,
  167. message: JSON.parse(data.message)
  168. }
  169. if (connectionAttribute.inComing.MessageToBeReceived) {
  170. connectionAttribute.inComing.MessageToBeReceived.next(response)
  171. }
  172. });
  173. call.on('error', (err) => {
  174. console.error(err)
  175. resolve('')
  176. });
  177. })
  178. }
  179. // THis is no longer necesarry after the introduction of connection Attribute. But it is still useful for checking for the other side's health
  180. public async checkConnectionHealth(client: any, statusControl: Subject<ConnectionState>, alreadyHealthCheck: boolean): Promise<boolean> {
  181. return new Promise((resolve, reject) => {
  182. client.Check({}, (error, response) => {
  183. if (response) {
  184. console.log(`GRPC Health check status: ${response.status} Server Connected`);
  185. // Intepret the response status and implement code logic or handler
  186. resolve(response.status)
  187. } else {
  188. if (alreadyHealthCheck == false) console.error(`Health check failed: ${error}`);
  189. reject(false)
  190. }
  191. })
  192. })
  193. }
  194. private isConnectionAttribute(obj: any): obj is ConnectionAttribute {
  195. const isMatch = (
  196. typeof obj.ConnectionID === 'object' && // Further checks can be added based on the structure of ConnectionID
  197. isStreamAttribute(obj.outGoing) &&
  198. isStreamAttribute(obj.inComing) &&
  199. (obj.connectionStatus === null || obj.connectionStatus instanceof Subject)
  200. );
  201. if (isMatch) {
  202. console.log('gRPC client call matches ConnectionAttribute type');
  203. } else {
  204. console.log('gRPC client call does not match ConnectionAttribute type');
  205. }
  206. return isMatch;
  207. function isStreamAttribute(obj: any): obj is StreamAttribute {
  208. return (
  209. (typeof obj.StreamID === 'string' || obj.StreamID === undefined) &&
  210. (typeof obj.PublisherID === 'string' || obj.PublisherID === undefined) &&
  211. (typeof obj.SubscriberID === 'string' || obj.SubscriberID === undefined) &&
  212. // Check other properties like PublisherInstance and SubscriberInstance based on their expected types
  213. (typeof obj.serverUrl === 'string' || obj.serverUrl === undefined) &&
  214. // Check connectionState based on its type, assuming it's an enum or similar
  215. (obj.MessageToBePublished === null || obj.MessageToBePublished instanceof Observable) &&
  216. (obj.MessageToBeReceived === null || obj.MessageToBeReceived instanceof Subject)
  217. );
  218. }
  219. }
  220. }
  221. // https://github.com/grpc/proposal/blob/master/L5-node-client-interceptors.md