grpc.service.method.ts 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. import * as grpc from '@grpc/grpc-js';
  2. import { Observable, Subject, Subscription } from "rxjs";
  3. import { Message, ConnectionAttribute, GrpcConnectionType, ConnectionState, StreamAttribute, MessageLog } from "../interfaces/general.interface";
  4. import { Status } from '@grpc/grpc-js/build/src/constants';
  5. import { message_proto } from './protos/server.proto'
  6. import * as _ from 'lodash'
  7. import { ServerClientManager } from './server-client.service';
  8. export class GrpcServiceMethod {
  9. private connectionAttribute: ConnectionAttribute | undefined
  10. private connectionAttributes: ConnectionAttribute[] = []
  11. private server: grpc.Server | any
  12. private messageToBeSendOver: Message | any
  13. private clientRequest: Subject<ConnectionAttribute> = new Subject()
  14. // public interface for service client to establish connection. They will give server and client information as a pair
  15. public async create(connectionAttribute: ConnectionAttribute, connectionAttributes: ConnectionAttribute[]): Promise<any> {
  16. this.connectionAttribute = connectionAttribute
  17. this.connectionAttributes = connectionAttributes
  18. return new Promise((resolve, reject) => {
  19. this.createGrpcInstance({ instanceType: 'server' }, connectionAttribute)
  20. this.createGrpcInstance({ instanceType: 'client' }, connectionAttribute)
  21. resolve('Just putting it here for now....')
  22. })
  23. }
  24. private async createGrpcInstance(grpcType: GrpcConnectionType, connectionAttribute: ConnectionAttribute) {
  25. // Reconnection Logic
  26. while (true) {
  27. try {
  28. let recreatePromise = new Promise((resolve) => {
  29. if (grpcType.instanceType == 'server' && !this.server) {
  30. this.createServerStreamingServer().then(() => {
  31. resolve('recreate')
  32. })
  33. }
  34. if (grpcType.instanceType == 'client') {
  35. this.createServerStreamingClient(connectionAttribute).then(() => {
  36. resolve('recreate')
  37. })
  38. }
  39. })
  40. await recreatePromise
  41. } catch (error) {
  42. console.error('Connection attempt failed:', error);
  43. }
  44. await new Promise(resolve => setTimeout(resolve, 1000)); // Wait for 1 second before the next attempt
  45. // timeout generate message to trigger this reconnection
  46. }
  47. }
  48. // Create Server Instance to stream all application Outgoing messages
  49. public async createServerStreamingServer(): Promise<any> { // '0.0.0.0:3001'
  50. return new Promise((resolve, reject) => {
  51. this.server = new grpc.Server()
  52. this.server.addService(message_proto.Message.service, {
  53. HandleMessage: (call) => {
  54. let clientRequest: ConnectionAttribute = JSON.parse(call.request.message)
  55. /* Receive request, it will talk server client service, scs will do the checking instead of doing them here,
  56. two scenario. if this is a new client, then a new subject is assigned.
  57. IF this is previous channel, an existing subject is assigned back. */
  58. // client Request validation
  59. if (this.isConnectionAttribute(clientRequest)) {
  60. // Check if this connection exists
  61. let result: ConnectionAttribute | undefined = this.connectionAttributes.find((connectionAttribute: ConnectionAttribute) => connectionAttribute.ConnectionID.local === clientRequest.ConnectionID.remote)
  62. if (result) {
  63. // if exist, reassign back the buffer
  64. let subscription: Subscription = result.outGoing.MessageToBePublished!.subscribe({
  65. next: (outGoingMessage: Message) => {
  66. let message = {
  67. id: outGoingMessage.id,
  68. message: JSON.stringify(outGoingMessage.message)
  69. }
  70. console.log(`Sending ${(outGoingMessage.message as MessageLog).appData.msgId} to ${clientRequest.outGoing.PublisherID}`)
  71. call.write(message)
  72. },
  73. error: err => {
  74. console.error(err)
  75. subscription.unsubscribe()
  76. resolve(``)
  77. },
  78. complete: () => {
  79. subscription.unsubscribe()
  80. resolve(``)
  81. }
  82. })
  83. let report: ConnectionState = {
  84. status: `DIRECT_PUBLISH`
  85. }
  86. result.connectionStatus!.next(report)
  87. }
  88. if (!result) {
  89. console.log(`No matching results.... leaving the logic blank for now...`)
  90. /* Currently haven't thought of a solution for this. Even if i do , the simplest one
  91. woul be to assisgn a new buffer, which means the server client service will
  92. have to instantiate a new one for the incoming new client. Right now, there
  93. is no need since the amount of clients and their ID are predetermined.
  94. TO be discuseed further. */
  95. }
  96. }
  97. }
  98. })
  99. this.server.bindAsync(this.connectionAttribute!.outGoing.serverUrl, grpc.ServerCredentials.createInsecure(), () => {
  100. console.log(`gRPC server is running on ${this.connectionAttribute?.outGoing.serverUrl}`)
  101. this.server.start()
  102. })
  103. })
  104. }
  105. // Send a request over to the other server to open a channel for this server to emit/stream messages over
  106. public async createServerStreamingClient(connectionAttribute: ConnectionAttribute): Promise<string> {
  107. return new Promise(async (resolve, reject) => {
  108. const client = new message_proto.Message(connectionAttribute.inComing.serverUrl, grpc.credentials.createInsecure());
  109. let localInfo: ConnectionAttribute = { // need to make a new copy where it doesn't reference the subjects, otherwise circular ref error
  110. ConnectionID: connectionAttribute.ConnectionID,
  111. outGoing: {
  112. StreamID: connectionAttribute.outGoing.StreamID,
  113. PublisherID: connectionAttribute.outGoing.PublisherID,
  114. SubscriberID: connectionAttribute.outGoing.SubscriberID,
  115. serverUrl: connectionAttribute.outGoing.serverUrl,
  116. MessageToBePublished: null,
  117. MessageToBeReceived: null
  118. },
  119. inComing: {
  120. StreamID: connectionAttribute.inComing.StreamID,
  121. PublisherID: connectionAttribute.inComing.PublisherID,
  122. SubscriberID: connectionAttribute.inComing.SubscriberID,
  123. serverUrl: connectionAttribute.inComing.serverUrl,
  124. MessageToBePublished: null,
  125. MessageToBeReceived: null
  126. },
  127. connectionStatus: null
  128. }
  129. let call = client.HandleMessage({ id: connectionAttribute.inComing.serverUrl, message: JSON.stringify(localInfo) })
  130. console.log(`Sending request to ${connectionAttribute.inComing.serverUrl} to open response channel...`)
  131. call.on('status', (status: Status) => {
  132. if (status == grpc.status.OK) { // only returns a status when there's error. Otherwise it just waits
  133. console.log(`Message trasmission operation is successful`)
  134. // RPC completed successfully
  135. } if (status == grpc.status.UNAVAILABLE) {
  136. let report: ConnectionState = {
  137. status: 'BUFFER',
  138. reason: `Server doesn't seem to be alive. Error returned.`,
  139. payload: this.messageToBeSendOver ?? `There's no message at the moment...`
  140. }
  141. connectionAttribute.connectionStatus!.next(report)
  142. resolve('No connection established. Server is not responding..')
  143. }
  144. });
  145. call.on('data', (data: any) => {
  146. let response: Message = {
  147. id: data.id,
  148. message: JSON.parse(data.message)
  149. }
  150. if (connectionAttribute.inComing.MessageToBeReceived) {
  151. connectionAttribute.inComing.MessageToBeReceived.next(response)
  152. }
  153. });
  154. call.on('error', (err) => {
  155. // console.error(err)
  156. let report: ConnectionState = {
  157. status: 'BUFFER',
  158. reason: `Server doesn't seem to be alive. Error returned.`,
  159. payload: this.messageToBeSendOver ?? `There's no message at the moment...`
  160. }
  161. connectionAttribute.connectionStatus!.next(report)
  162. resolve('')
  163. });
  164. })
  165. }
  166. // TO check or validate if the client request meets the criteria
  167. private isConnectionAttribute(obj: any): obj is ConnectionAttribute {
  168. const isMatch = (
  169. typeof obj.ConnectionID === 'object' && // Further checks can be added based on the structure of ConnectionID
  170. isStreamAttribute(obj.outGoing) &&
  171. isStreamAttribute(obj.inComing) &&
  172. (obj.connectionStatus === null || obj.connectionStatus instanceof Subject)
  173. );
  174. if (isMatch) {
  175. console.log('gRPC client call matches ConnectionAttribute type');
  176. } else {
  177. console.log('gRPC client call does not match ConnectionAttribute type');
  178. }
  179. return isMatch;
  180. function isStreamAttribute(obj: any): obj is StreamAttribute {
  181. return (
  182. (typeof obj.StreamID === 'string' || obj.StreamID === undefined) &&
  183. (typeof obj.PublisherID === 'string' || obj.PublisherID === undefined) &&
  184. (typeof obj.SubscriberID === 'string' || obj.SubscriberID === undefined) &&
  185. // Check other properties like PublisherInstance and SubscriberInstance based on their expected types
  186. (typeof obj.serverUrl === 'string' || obj.serverUrl === undefined) &&
  187. // Check connectionState based on its type, assuming it's an enum or similar
  188. (obj.MessageToBePublished === null || obj.MessageToBePublished instanceof Observable) &&
  189. (obj.MessageToBeReceived === null || obj.MessageToBeReceived instanceof Subject)
  190. );
  191. }
  192. }
  193. }