grpc.service.method.bak 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. import * as grpc from '@grpc/grpc-js';
  2. import { Observable, Subject, Subscription } from "rxjs";
  3. import { Message, ConnectionAttribute, GrpcConnectionType, ConnectionState, MessageLog, ConnectionStatus, StreamAttribute, State } from "../interfaces/general.interface";
  4. import { Status } from '@grpc/grpc-js/build/src/constants';
  5. import { message_proto } from './protos/server.proto'
  6. import * as _ from 'lodash'
  7. export class GrpcServiceMethod {
  8. private connectionAttributes: ConnectionAttribute[] = []
  9. private updateConnectionStatusFlag: boolean = false
  10. private server: grpc.Server | any
  11. private messageToBeSendOver: Message | any
  12. private clientRequest: Subject<ConnectionAttribute> = new Subject()
  13. private localServerStatus: Subject<any> = new Subject()
  14. public async create(connectionAttribute: ConnectionAttribute, connectionAttributes: ConnectionAttribute[]): Promise<any> {
  15. this.connectionAttributes = connectionAttributes
  16. return new Promise((resolve, reject) => {
  17. this.createGrpcInstance({ instanceType: 'server' }, connectionAttribute)
  18. this.createGrpcInstance({ instanceType: 'client' }, connectionAttribute)
  19. resolve('Just putting it here for now....')
  20. })
  21. }
  22. private async createGrpcInstance(grpcType: GrpcConnectionType, connectionAttribute: ConnectionAttribute) {
  23. // Reconnection Logic here
  24. while (true) {
  25. try {
  26. let recreatePromise = new Promise((resolve) => {
  27. if (grpcType.instanceType == 'server' && !this.server) {
  28. this.createServerStreamingServer(connectionAttribute).then(() => {
  29. resolve('recreate')
  30. })
  31. }
  32. if (grpcType.instanceType == 'client') {
  33. this.createServerStreamingClient(connectionAttribute).then(() => {
  34. resolve('recreate')
  35. })
  36. }
  37. })
  38. await recreatePromise
  39. } catch (error) {
  40. console.error('Connection attempt failed:', error);
  41. }
  42. await new Promise(resolve => setTimeout(resolve, 1000)); // Wait for 1 second before the next attempt
  43. // timeout generate message to trigger this reconnection
  44. }
  45. }
  46. // Create Server Instance to stream all application Outgoing messages
  47. public async createServerStreamingServer(connectionAttribute: ConnectionAttribute): Promise<any> { // '0.0.0.0:3001'
  48. return new Promise((resolve, reject) => {
  49. try {
  50. let statusChain: State = 1
  51. if (statusChain == 1) {
  52. // Bind and start the server
  53. this.server = new grpc.Server()
  54. this.server.bindAsync(connectionAttribute.outGoing.serverUrl, grpc.ServerCredentials.createInsecure(), () => {
  55. console.log(`gRPC server is running on ${connectionAttribute.outGoing.serverUrl}`);
  56. this.server.start();
  57. });
  58. this.localServerStatus.next({
  59. connectionStatus: 'ON',
  60. connectionID: connectionAttribute.ConnectionID.local,
  61. message: `${connectionAttribute.outGoing.serverUrl} started.`
  62. })
  63. }
  64. if (statusChain == 1 && !this.server) {
  65. this.server.addService(message_proto.Message.service, {
  66. HandleMessage: (call) => {
  67. /// add a checking for standard message request
  68. let clientInfo: ConnectionAttribute = JSON.parse(call.request.message)
  69. if (this.isConnectionAttribute(clientInfo)) {
  70. // this.clientRequest.next(clientInfo)
  71. console.log(`Initializing stream. Opening Channel... Confirmation from ${call.request.id}`)
  72. let result: ConnectionAttribute | undefined = this.connectionAttributes.find(connectionAttribute => connectionAttribute.ConnectionID.local = clientInfo.ConnectionID.remote)
  73. if (!result) {
  74. console.log(`No connectionID match. This should be new connection...`)
  75. let subscription: Subscription = connectionAttribute.outGoing.MessageToBePublished!.subscribe({
  76. next: (response: Message) => {
  77. console.log(`Sending from GRPC server: ${(response.message as MessageLog).appData.msgId} `)
  78. let message = {
  79. id: response.id,
  80. message: JSON.stringify(response.message)
  81. }
  82. call.write(message)
  83. },
  84. error: err => {
  85. console.error(err)
  86. subscription.unsubscribe()
  87. resolve('')
  88. },
  89. complete: () => {
  90. console.log(`Stream response completed for ${call.request.id}`)
  91. subscription.unsubscribe()
  92. resolve('')
  93. }
  94. })
  95. let report: ConnectionState = {
  96. status: 'DIRECT_PUBLISH'
  97. }
  98. connectionAttribute.connectionStatus!.next(report)
  99. }
  100. if (result && result.outGoing.connectionState == `OFF`) {
  101. // reassign previously matched buffer
  102. console.log(`Connection info found in array matched. Assigning buffer....`)
  103. let subscription: Subscription = result.outGoing.MessageToBePublished!.subscribe({
  104. next: (response: Message) => {
  105. console.log(`Sending from GRPC server: ${(response.message as MessageLog).appData.msgId} `)
  106. let message = {
  107. id: response.id,
  108. message: JSON.stringify(response.message)
  109. }
  110. call.write(message)
  111. },
  112. error: err => {
  113. console.error(err)
  114. subscription.unsubscribe()
  115. resolve('')
  116. },
  117. complete: () => {
  118. console.log(`Stream response completed for ${call.request.id}`)
  119. subscription.unsubscribe()
  120. resolve('')
  121. }
  122. })
  123. let report: ConnectionState = {
  124. status: 'DIRECT_PUBLISH'
  125. }
  126. result.connectionStatus!.next(report)
  127. }
  128. } else {
  129. console.error(`INVALID REQUEST! Client request does not fulfill criteria`)
  130. }
  131. },
  132. Check: (_, callback) => {
  133. // for now it is just sending the status message over to tell the client it is alive
  134. // For simplicity, always return "SERVING" as status
  135. callback(null, { status: 'SERVING' });
  136. },
  137. });
  138. }
  139. }
  140. catch (error) {
  141. resolve(error)
  142. }
  143. })
  144. }
  145. // Send a request over to the other server to open a channel for this server to emit/stream messages over
  146. public async createServerStreamingClient(connectionAttribute: ConnectionAttribute): Promise<string> {
  147. return new Promise(async (resolve, reject) => {
  148. const client = new message_proto.Message(connectionAttribute.inComing.serverUrl, grpc.credentials.createInsecure());
  149. let localInfo: ConnectionAttribute = { // need to make a new copy where it doesn't reference the subjects, otherwise circular ref error
  150. ConnectionID: connectionAttribute.ConnectionID,
  151. outGoing: {
  152. StreamID: connectionAttribute.outGoing.StreamID,
  153. PublisherID: connectionAttribute.outGoing.PublisherID,
  154. SubscriberID: connectionAttribute.outGoing.SubscriberID,
  155. serverUrl: connectionAttribute.outGoing.serverUrl,
  156. MessageToBePublished: null,
  157. MessageToBeReceived: null
  158. },
  159. inComing: {
  160. StreamID: connectionAttribute.inComing.StreamID,
  161. PublisherID: connectionAttribute.inComing.PublisherID,
  162. SubscriberID: connectionAttribute.inComing.SubscriberID,
  163. serverUrl: connectionAttribute.inComing.serverUrl,
  164. MessageToBePublished: null,
  165. MessageToBeReceived: null
  166. },
  167. connectionStatus: null
  168. }
  169. let call = client.HandleMessage({ id: connectionAttribute.inComing.serverUrl, message: JSON.stringify(localInfo) })
  170. console.log(`Sending request to ${connectionAttribute.inComing.serverUrl} to open response channel...`)
  171. call.on('status', (status: Status) => {
  172. if (status == grpc.status.OK) { // only returns a status when there's error. Otherwise it just waits
  173. console.log(`Message trasmission operation is successful`)
  174. // RPC completed successfully
  175. } if (status == grpc.status.UNAVAILABLE) {
  176. let report: ConnectionState = {
  177. status: 'BUFFER',
  178. reason: `Server doesn't seem to be alive. Error returned.`,
  179. payload: this.messageToBeSendOver ?? `There's no message at the moment...`
  180. }
  181. connectionAttribute.connectionStatus!.next(report)
  182. let clientStatusUpdateInfo: any = {
  183. connectionStatus: 'OFF',
  184. connectionID: connectionAttribute.ConnectionID.remote,
  185. message: `${connectionAttribute.outGoing.serverUrl} started.`
  186. }
  187. this.clientRequest.next(clientStatusUpdateInfo)
  188. resolve('No connection established. Server is not responding..')
  189. }
  190. });
  191. call.on('data', (data: any) => {
  192. let response: Message = {
  193. id: data.id,
  194. message: JSON.parse(data.message)
  195. }
  196. if (connectionAttribute.inComing.MessageToBeReceived) {
  197. connectionAttribute.inComing.MessageToBeReceived.next(response)
  198. }
  199. });
  200. call.on('error', (err) => {
  201. console.error(err)
  202. resolve('')
  203. });
  204. })
  205. }
  206. // THis is no longer necesarry after the introduction of connection Attribute. But it is still useful for checking for the other side's health
  207. public async checkConnectionHealth(client: any, statusControl: Subject<ConnectionState>, alreadyHealthCheck: boolean): Promise<boolean> {
  208. return new Promise((resolve, reject) => {
  209. client.Check({}, (error, response) => {
  210. if (response) {
  211. console.log(`GRPC Health check status: ${response.status} Server Connected`);
  212. // Intepret the response status and implement code logic or handler
  213. resolve(response.status)
  214. } else {
  215. if (alreadyHealthCheck == false) console.error(`Health check failed: ${error}`);
  216. reject(false)
  217. }
  218. })
  219. })
  220. }
  221. // TO check or validate if the client request meets the criteria
  222. private isConnectionAttribute(obj: any): obj is ConnectionAttribute {
  223. const isMatch = (
  224. typeof obj.ConnectionID === 'object' && // Further checks can be added based on the structure of ConnectionID
  225. isStreamAttribute(obj.outGoing) &&
  226. isStreamAttribute(obj.inComing) &&
  227. (obj.connectionStatus === null || obj.connectionStatus instanceof Subject)
  228. );
  229. if (isMatch) {
  230. console.log('gRPC client call matches ConnectionAttribute type');
  231. } else {
  232. console.log('gRPC client call does not match ConnectionAttribute type');
  233. }
  234. return isMatch;
  235. function isStreamAttribute(obj: any): obj is StreamAttribute {
  236. return (
  237. (typeof obj.StreamID === 'string' || obj.StreamID === undefined) &&
  238. (typeof obj.PublisherID === 'string' || obj.PublisherID === undefined) &&
  239. (typeof obj.SubscriberID === 'string' || obj.SubscriberID === undefined) &&
  240. // Check other properties like PublisherInstance and SubscriberInstance based on their expected types
  241. (typeof obj.serverUrl === 'string' || obj.serverUrl === undefined) &&
  242. // Check connectionState based on its type, assuming it's an enum or similar
  243. (obj.MessageToBePublished === null || obj.MessageToBePublished instanceof Observable) &&
  244. (obj.MessageToBeReceived === null || obj.MessageToBeReceived instanceof Subject)
  245. );
  246. }
  247. }
  248. // update the referenced array
  249. // public updateConnectionStatus(connectionAttributeArray: ConnectionAttribute[]) {
  250. // if (this.updateConnectionStatusFlag === false) {
  251. // this.updateConnectionStatusFlag = true
  252. // this.localServerStatus.subscribe({
  253. // next: (notification: any) => {
  254. // if (notification.connectionStatus === `ON`) {
  255. // let connectionAttribute = connectionAttributeArray.find(connection => connection.ConnectionID.local == notification.connectionID)
  256. // if (connectionAttribute) {
  257. // connectionAttribute.outGoing.connectionState = 'ON'
  258. // console.log(`Local Connection ${notification.connectionID} updated. {${connectionAttribute.outGoing.connectionState}}`)
  259. // } else {
  260. // console.log(`Local Connection ${notification.connectionID} attribute is not found.`)
  261. // }
  262. // }
  263. // },
  264. // error: err => console.error(err),
  265. // complete: () => { }
  266. // })
  267. // this.clientRequest.subscribe({
  268. // next: (clientConnectionAttribute: ConnectionAttribute) => {
  269. // console.log(`Received a request from ${clientConnectionAttribute.outGoing.serverUrl}`)
  270. // let connectionAttribute = connectionAttributeArray.find(connection => connection.ConnectionID.remote == clientConnectionAttribute.ConnectionID.local)
  271. // if (connectionAttribute) {
  272. // connectionAttribute.inComing.connectionState = 'ON'
  273. // console.log(`Client Connection ${clientConnectionAttribute.ConnectionID.local} updated. {${connectionAttribute.inComing.connectionState}}`)
  274. // } else {
  275. // console.log(`Client Connection Attribute ${clientConnectionAttribute.inComing.PublisherID} is not found`)
  276. // }
  277. // },
  278. // error: err => console.error(err),
  279. // complete: () => { }
  280. // })
  281. // } else {
  282. // console.log(`Update Connection Status already called.`)
  283. // }
  284. // }
  285. }
  286. // https://github.com/grpc/proposal/blob/master/L5-node-client-interceptors.md