grpc.service.method.ts 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274
  1. import * as grpc from '@grpc/grpc-js';
  2. import { Observable, Subject, Subscription } from "rxjs";
  3. import { Message, ConnectionAttribute, GrpcConnectionType, ConnectionState, MessageLog, ConnectionStatus, StreamAttribute } from "../interfaces/general.interface";
  4. import { Status } from '@grpc/grpc-js/build/src/constants';
  5. import { message_proto } from './protos/server.proto'
  6. import * as _ from 'lodash'
  7. export class GrpcServiceMethod {
  8. private server: grpc.Server | any
  9. private messageToBeSendOver: Message | any
  10. private clientRequest: Subject<ConnectionAttribute> = new Subject()
  11. private localServerStatus: Subject<any> = new Subject()
  12. public async create(connectionAttribute: ConnectionAttribute): Promise<any> {
  13. return new Promise((resolve, reject) => {
  14. this.createGrpcInstance({ instanceType: 'server' }, connectionAttribute)
  15. this.createGrpcInstance({ instanceType: 'client' }, connectionAttribute)
  16. resolve('Just putting it here for now....')
  17. })
  18. }
  19. private async createGrpcInstance(grpcType: GrpcConnectionType, connectionAttribute: ConnectionAttribute) {
  20. // Reconnection Logic here
  21. while (true) {
  22. try {
  23. let recreatePromise = new Promise((resolve) => {
  24. if (grpcType.instanceType == 'server') {
  25. this.createServerStreamingServer(connectionAttribute).then(() => {
  26. resolve('recreate')
  27. })
  28. }
  29. if (grpcType.instanceType == 'client') {
  30. this.createServerStreamingClient(connectionAttribute).then(() => {
  31. resolve('recreate')
  32. })
  33. }
  34. })
  35. await recreatePromise
  36. } catch (error) {
  37. console.error('Connection attempt failed:', error);
  38. }
  39. await new Promise(resolve => setTimeout(resolve, 1000)); // Wait for 1 second before the next attempt
  40. // timeout generate message to trigger this reconnection
  41. }
  42. }
  43. // Create Server Instance to stream all application Outgoing messages
  44. public async createServerStreamingServer(connectionAttribute: ConnectionAttribute): Promise<any> { // '0.0.0.0:3001'
  45. return new Promise((resolve, reject) => {
  46. try {
  47. if (!this.server) {
  48. this.server = new grpc.Server()
  49. this.localServerStatus.next({
  50. connectionStatus: 'ON',
  51. connectionID: connectionAttribute.ConnectionID.local,
  52. message: `${connectionAttribute.outGoing.serverUrl} started.`
  53. })
  54. } else {
  55. console.log(`Grpc server alrady started.`)
  56. this.localServerStatus.next({
  57. connectionStatus: 'ON',
  58. connectionID: connectionAttribute.ConnectionID.local,
  59. message: `${connectionAttribute.outGoing.serverUrl} already started.`
  60. })
  61. }
  62. this.server.addService(message_proto.Message.service, {
  63. HandleMessage: (call) => {
  64. /// add a checking for standard message request
  65. let clientInfo: ConnectionAttribute = JSON.parse(call.request.message)
  66. if (this.isConnectionAttribute(clientInfo)) {
  67. this.clientRequest.next(clientInfo)
  68. console.log(`Initializing stream. Opening Channel... Confirmation from ${call.request.id}`)
  69. if (connectionAttribute.outGoing.MessageToBePublished) {
  70. let subscription: Subscription = connectionAttribute.outGoing.MessageToBePublished.subscribe({
  71. next: (response: Message) => {
  72. console.log(`Sending from GRPC server: ${(response.message as MessageLog).appData.msgId} `)
  73. let message = {
  74. id: response.id,
  75. message: JSON.stringify(response.message)
  76. }
  77. call.write(message)
  78. },
  79. error: err => {
  80. console.error(err)
  81. subscription.unsubscribe()
  82. resolve('')
  83. },
  84. complete: () => {
  85. console.log(`Stream response completed for ${call.request.id}`)
  86. subscription.unsubscribe()
  87. resolve('')
  88. }
  89. })
  90. let report: ConnectionState = {
  91. status: 'DIRECT_PUBLISH'
  92. }
  93. connectionAttribute.connectionStatus!.next(report)
  94. }
  95. } else {
  96. console.error(`INVALID REQUEST! Client request does not fulfill criteria`)
  97. }
  98. },
  99. Check: (_, callback) => {
  100. // for now it is just sending the status message over to tell the client it is alive
  101. // For simplicity, always return "SERVING" as status
  102. callback(null, { status: 'SERVING' });
  103. },
  104. });
  105. // Bind and start the server
  106. this.server.bindAsync(connectionAttribute.outGoing.serverUrl, grpc.ServerCredentials.createInsecure(), () => {
  107. console.log(`gRPC server is running on ${connectionAttribute.outGoing.serverUrl}`);
  108. this.server.start();
  109. });
  110. }
  111. catch (error) {
  112. resolve(error)
  113. }
  114. })
  115. }
  116. // Send a request over to the other server to open a channel for this server to emit/stream messages over
  117. public async createServerStreamingClient(connectionAttribute: ConnectionAttribute): Promise<string> {
  118. return new Promise(async (resolve, reject) => {
  119. const client = new message_proto.Message(connectionAttribute.inComing.serverUrl, grpc.credentials.createInsecure());
  120. let localInfo: ConnectionAttribute = { // need to make a new copy where it doesn't reference the subjects, otherwise circular ref error
  121. ConnectionID: connectionAttribute.ConnectionID,
  122. outGoing: {
  123. StreamID: connectionAttribute.outGoing.StreamID,
  124. PublisherID: connectionAttribute.outGoing.PublisherID,
  125. SubscriberID: connectionAttribute.outGoing.SubscriberID,
  126. serverUrl: connectionAttribute.outGoing.serverUrl,
  127. MessageToBePublished: null,
  128. MessageToBeReceived: null
  129. },
  130. inComing: {
  131. StreamID: connectionAttribute.inComing.StreamID,
  132. PublisherID: connectionAttribute.inComing.PublisherID,
  133. SubscriberID: connectionAttribute.inComing.SubscriberID,
  134. serverUrl: connectionAttribute.inComing.serverUrl,
  135. MessageToBePublished: null,
  136. MessageToBeReceived: null
  137. },
  138. connectionStatus: null
  139. }
  140. let call = client.HandleMessage({ id: connectionAttribute.inComing.serverUrl, message: JSON.stringify(localInfo) })
  141. console.log(`Sending request to ${connectionAttribute.inComing.serverUrl} to open response channel...`)
  142. call.on('status', (status: Status) => {
  143. if (status == grpc.status.OK) { // only returns a status when there's error. Otherwise it just waits
  144. console.log(`Message trasmission operation is successful`)
  145. // RPC completed successfully
  146. } if (status == grpc.status.UNAVAILABLE) {
  147. let report: ConnectionState = {
  148. status: 'BUFFER',
  149. reason: `Server doesn't seem to be alive. Error returned.`,
  150. payload: this.messageToBeSendOver ?? `There's no message at the moment...`
  151. }
  152. connectionAttribute.connectionStatus!.next(report)
  153. let clientStatusUpdateInfo: any = {
  154. connectionStatus: 'OFF',
  155. connectionID: connectionAttribute.ConnectionID.remote,
  156. message: `${connectionAttribute.outGoing.serverUrl} started.`
  157. }
  158. this.clientRequest.next(clientStatusUpdateInfo)
  159. resolve('No connection established. Server is not responding..')
  160. }
  161. });
  162. call.on('data', (data: any) => {
  163. let response: Message = {
  164. id: data.id,
  165. message: JSON.parse(data.message)
  166. }
  167. if (connectionAttribute.inComing.MessageToBeReceived) {
  168. connectionAttribute.inComing.MessageToBeReceived.next(response)
  169. }
  170. });
  171. call.on('error', (err) => {
  172. console.error(err)
  173. resolve('')
  174. });
  175. })
  176. }
  177. // THis is no longer necesarry after the introduction of connection Attribute. But it is still useful for checking for the other side's health
  178. public async checkConnectionHealth(client: any, statusControl: Subject<ConnectionState>, alreadyHealthCheck: boolean): Promise<boolean> {
  179. return new Promise((resolve, reject) => {
  180. client.Check({}, (error, response) => {
  181. if (response) {
  182. console.log(`GRPC Health check status: ${response.status} Server Connected`);
  183. // Intepret the response status and implement code logic or handler
  184. resolve(response.status)
  185. } else {
  186. if (alreadyHealthCheck == false) console.error(`Health check failed: ${error}`);
  187. reject(false)
  188. }
  189. })
  190. })
  191. }
  192. // TO check or validate if the client request meets the criteria
  193. private isConnectionAttribute(obj: any): obj is ConnectionAttribute {
  194. const isMatch = (
  195. typeof obj.ConnectionID === 'object' && // Further checks can be added based on the structure of ConnectionID
  196. isStreamAttribute(obj.outGoing) &&
  197. isStreamAttribute(obj.inComing) &&
  198. (obj.connectionStatus === null || obj.connectionStatus instanceof Subject)
  199. );
  200. if (isMatch) {
  201. console.log('gRPC client call matches ConnectionAttribute type');
  202. } else {
  203. console.log('gRPC client call does not match ConnectionAttribute type');
  204. }
  205. return isMatch;
  206. function isStreamAttribute(obj: any): obj is StreamAttribute {
  207. return (
  208. (typeof obj.StreamID === 'string' || obj.StreamID === undefined) &&
  209. (typeof obj.PublisherID === 'string' || obj.PublisherID === undefined) &&
  210. (typeof obj.SubscriberID === 'string' || obj.SubscriberID === undefined) &&
  211. // Check other properties like PublisherInstance and SubscriberInstance based on their expected types
  212. (typeof obj.serverUrl === 'string' || obj.serverUrl === undefined) &&
  213. // Check connectionState based on its type, assuming it's an enum or similar
  214. (obj.MessageToBePublished === null || obj.MessageToBePublished instanceof Observable) &&
  215. (obj.MessageToBeReceived === null || obj.MessageToBeReceived instanceof Subject)
  216. );
  217. }
  218. }
  219. // update the referenced array
  220. public updateConnectionStatus(connectionAttributeArray: ConnectionAttribute[]) {
  221. this.localServerStatus.subscribe({
  222. next: (notification: any) => {
  223. if (notification.connectionStatus === `ON`) {
  224. let connectionAttribute = connectionAttributeArray.find(connection => connection.ConnectionID.local == notification.connectionID)
  225. if (connectionAttribute) {
  226. connectionAttribute.outGoing.connectionState = 'ON'
  227. console.log(`Connection ${notification.connectionID} updated`)
  228. } else {
  229. console.log(`Connection ${notification.connectionID} attribute is not found.`)
  230. }
  231. }
  232. },
  233. error: err => console.error(err),
  234. complete: () => { }
  235. })
  236. this.clientRequest.subscribe({
  237. next: (clientConnectionAttribute: ConnectionAttribute) => {
  238. console.log(`Received a request from ${clientConnectionAttribute.outGoing.serverUrl}`)
  239. let connectionAttribute = connectionAttributeArray.find(connection => connection.ConnectionID.remote == clientConnectionAttribute.ConnectionID.local)
  240. if (connectionAttribute) {
  241. console.log(`Connection ${clientConnectionAttribute.ConnectionID.local} updated`)
  242. connectionAttribute.inComing.connectionState = 'ON'
  243. } else {
  244. console.log(`Connection Attribut ${clientConnectionAttribute.inComing.PublisherID} is not found`)
  245. }
  246. },
  247. error: err => console.error(err),
  248. complete: () => { }
  249. })
  250. }
  251. }
  252. // https://github.com/grpc/proposal/blob/master/L5-node-client-interceptors.md