grpc3.ts 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. import { Subject, take } from 'rxjs';
  2. import { Message, MessageLog, ConnectionRequest } from '../interfaces/general.interface';
  3. import { GrpcServiceMethod } from '../services/grpc.service.method';
  4. import { readFileSync } from 'fs';
  5. import { ServerClientManager } from '../services/server-client.service';
  // Connection manager wired to the gRPC service-method implementation.
  const connectionService: ServerClientManager = new ServerClientManager(new GrpcServiceMethod())

  // Read the payload as UTF-8 text so JSON.parse receives a real string instead of a
  // Buffer that only type-checks through `any`. The relative path is resolved against
  // the process working directory.
  const messagesJSON: string = readFileSync('payload.json', 'utf8')
  let parsedMessages: any[] = JSON.parse(messagesJSON) // load the fake messages generated for this trial

  // Address this instance connects to as a client.
  // NOTE(review): port 300 looks like it may be a truncated 3000/3001 - confirm before relying on it.
  let targetserver: string = 'localhost:300'
  // let targetserver2: string = 'localhost:3002'
  // Address this instance serves on.
  let hostServer: string = 'localhost:3002'
  let array: any[] = [] // Used for testing: collects every non-query message received

  // Connection description handed to the connection manager: one Subject carries
  // messages the application wants to publish, the other delivers messages
  // received from the remote side.
  let request: ConnectionRequest = {
    server: {
      serverUrl: hostServer,
      connectionType: 'GRPC',
      messageToBePublishedfromApplication: new Subject<Message>()
    },
    client: {
      targetServer: targetserver,
      connectionType: 'GRPC',
      messageToBeReceivedFromRemote: new Subject<Message>()
    },
  }
  // Handler for the incoming messages from the other side.
  //
  // The callback parameter is deliberately NOT called `request`: that name would shadow
  // the module-level ConnectionRequest, and publishing a response has to go through
  // `request.server.messageToBePublishedfromApplication`, which only exists on the
  // ConnectionRequest, not on an incoming Message.
  request.client.messageToBeReceivedFromRemote.subscribe({
    next: incoming => {
      // Application logic comes here: sort each incoming message by what it is.
      const incomingLog = incoming.message as MessageLog
      if (incomingLog.appData.msgPayload === 'Query') {
        // A query is answered with a stream of fake messages, each one published
        // back through the server-side Subject of the connection request.
        generateFakeStreamResponse(incoming).subscribe({
          next: (responseMessage) => {
            // console.log(`Processing request:${incoming.id}....`)
            request.server.messageToBePublishedfromApplication.next(responseMessage)
          },
          error: error => console.error(error),
          complete: () => {
            console.log(`Stream request for ${incoming.id} is queued.`) // should be indefinite
          }
        })
      } else {
        // Anything that is not a query is only recorded and logged.
        array.push(incoming)
        console.log(`Received messages from the other side: ${incomingLog.appData.msgId}`)
      }
    },
    error: error => console.error(error),
    complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
  })

  // Register the handler first, then hand the request to the connection manager.
  connectionService.generateConnection(request)
  50. // grpcService.createGrpcInstance(hostServer, { instanceType: 'server', serviceMethod: 'server streaming' }, messageToBeReleased)
  51. // grpcService.createGrpcInstance(targetserver, { instanceType: 'client', serviceMethod: 'server streaming' })
  52. // setTimeout(() => {
  53. // let message = {
  54. // id: parsedMessages[10].appData.msgId,
  55. // message: parsedMessages[10] // Index 10 is chosen on purpose: the 11th message has its msgPayload changed to 'Query' to emulate a request
  56. // }
  57. // applicationOutgoingResponse.next(message)
  58. // }, 3000)
  59. // setTimeout(() => {
  60. // let message = {
  61. // id: parsedMessages[11].appData.msgId,
  62. // message: parsedMessages[11] // Index 11 is chosen on purpose: the 12th message has its msgPayload changed to 'Query' to emulate a request
  63. // }
  64. // applicationOutgoingResponse.next(message)
  65. // }, 4000)
  66. // setTimeout(() => {
  67. // console.log(`All received data: ${array.length}`)
  68. // }, 10000)
  69. // setTimeout(() => {
  70. // console.log(`All received data: ${array.length}`)
  71. // }, 20000)
  /**
   * Publishes the fake messages held in `parsedMessages` on a fresh Subject,
   * one message every 500 ms, and completes the Subject afterwards.
   *
   * At most 1000 messages are emitted, and never more than the payload actually
   * contains, so a short payload no longer produces `undefined` emissions.
   *
   * The Subject does not replay: emission is driven by a timer started here,
   * so anything that subscribes later misses the messages already sent.
   * NOTE(review): the timer keeps running until the limit is reached even if
   * every subscriber has unsubscribed (e.g. after `take(7)`).
   *
   * @returns the Subject the messages are emitted on
   */
  function stream(): Subject<any> {
    const result: Subject<any> = new Subject()
    const messages: any[] = parsedMessages
    // Never index past the end of the payload.
    const limit = Math.min(messages.length, 1000)
    let count = 0
    const intervalId = setInterval(() => {
      if (count < limit) {
        result.next(messages[count]);
        count++;
      }
      if (count >= limit) {
        clearInterval(intervalId);
        result.complete();
      }
    }, 500)
    return result
  }
  /**
   * Builds a fake streamed reply for a caller's request.
   *
   * Takes the first seven elements produced by `stream()` and re-emits each one
   * on a new Subject, wrapped as `{ id, message }` where `id` is the caller's
   * request id. Errors from the source are logged; completion of the source is
   * logged as well, but the returned Subject itself is not completed here.
   *
   * @param request the incoming request; only its `id` is read
   * @returns the Subject the wrapped messages are emitted on
   */
  function generateFakeStreamResponse(request: any): Subject<any> {
    const responses = new Subject<any>()
    const firstSeven = stream().pipe(take(7))

    firstSeven.subscribe({
      // Tag every element with the caller's id before passing it on.
      next: (payload) => responses.next({ id: request.id, message: payload }),
      error: (err) => console.error(err),
      complete: () => console.log(`Stream response for ${request.id} has been prepared.`),
    })

    return responses
  }