grpc1.ts 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. import * as fs from 'fs'
  2. import { Subject, groupBy, mergeMap, take, toArray } from 'rxjs';
  3. import { ColorCode, Message, MessageLog, ReportStatus } from '../interfaces/general.interface';
  4. import { GrpcService } from '../services/grpc.service';
  5. import { FisRetransmissionService } from '../services/fis.retransmission.service';
  6. import { GrpcServiceMethod } from '../services/service.method';
  7. // Subject for bidirectional communication
  8. const errorHandlingService: FisRetransmissionService = new FisRetransmissionService()
  9. const grpcService: GrpcService = new GrpcService(new GrpcServiceMethod())
  10. const messagesJSON: any = fs.readFileSync('payload.json')
  11. let incomingMessages: Subject<any> = grpcService.getIncomingMessage()
  12. let parsedMessages: any[] = JSON.parse(messagesJSON) // load the fake messages generated for this trial
  13. let messageToBeReleased: Subject<any> = new Subject() // Sample message to be transmitted over to target server
  14. let applicationOutgoingResponse: Subject<Message> = new Subject()
  15. let statusControl: Subject<ReportStatus> = new Subject() // Listening for error events and states
  16. let targetserver: string = 'localhost:3001'
  17. let hostServer: string = 'localhost:3000'
  18. let array: any[] = [] // Used for testing
  19. // Handler for the incoming Messages from the other side.
  20. incomingMessages.subscribe({
  21. next: request => {
  22. // Application logic comes here. This is where the asortment takes place, of decidiing whose messages it belongs of what it is
  23. if ((request.message as MessageLog).appData.msgPayload == 'Query') {
  24. generateFakeStreamResponse(request).subscribe({
  25. next: (responseMessage) => {
  26. // console.log(`Processing request:${request.id}....`)
  27. applicationOutgoingResponse.next(responseMessage)
  28. },
  29. error: error => console.error(error),
  30. complete: () => {
  31. console.log(`Stream request for ${request.id} is queued.`) // shpuld be indefinite
  32. }
  33. })
  34. } else {
  35. array.push(request)
  36. console.log(`Received messages from the other side: ${(request.message as MessageLog).appData.msgId}`)
  37. }
  38. },
  39. error: error => console.error(error),
  40. complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
  41. })
  42. // Open channel for sending messages across.
  43. errorHandlingService.handleMessage(applicationOutgoingResponse, statusControl).subscribe((messages) => {
  44. messageToBeReleased.next(messages)
  45. })
  46. grpcService.createGrpcInstance(hostServer, statusControl, { instanceType: 'server', serviceMethod: 'server streaming' }, messageToBeReleased)
  47. grpcService.createGrpcInstance(targetserver, statusControl, { instanceType: 'client', serviceMethod: 'server streaming' })
  48. setTimeout(() => {
  49. let message = {
  50. id: parsedMessages[10].appData.msgId,
  51. message: parsedMessages[10] // Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
  52. }
  53. applicationOutgoingResponse.next(message)
  54. }, 2000)
  55. setTimeout(() => {
  56. let message = {
  57. id: parsedMessages[11].appData.msgId,
  58. message: parsedMessages[11]// Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
  59. }
  60. applicationOutgoingResponse.next(message)
  61. }, 3000)
  62. setTimeout(() => {
  63. console.log(`All received data: ${array.length}`)
  64. }, 10000)
  65. setTimeout(() => {
  66. console.log(`All received data: ${array.length}`)
  67. }, 20000)
  68. // this is just to publish an array of fake data as a Subject
  69. function stream(): Subject<any> {
  70. let result: Subject<any> = new Subject()
  71. let messages: any[] = parsedMessages
  72. let count = 0
  73. const intervalId = setInterval(() => {
  74. result.next(messages[count]);
  75. count++;
  76. if (count >= 1000) {
  77. clearInterval(intervalId);
  78. result.complete();
  79. }
  80. }, 500)
  81. return result
  82. }
  83. function generateFakeStreamResponse(request: any): Subject<any> {
  84. let res: Subject<any> = new Subject()
  85. stream().pipe(take(7)).subscribe({
  86. next: element => {
  87. let message = {
  88. id: request.id, // Caller's
  89. message: element
  90. }
  91. res.next(message)
  92. },
  93. error: error => console.error(error),
  94. complete: () => console.log(`Stream response for ${request.id} has been prepared.`)
  95. })
  96. return res
  97. }
/* Extra notes:
Suppose this host makes a request and sends it over to the other side to process, and suppose the request
is a query, so it takes the other side some time to process the data. What happens if the other side goes
down? Then I won't get the result of my query. Is it the responsibility of the other side's application
logic to keep track of the requests that were sent over?
In my personal opinion, it should be the responsibility of the application logic to keep track of the requests it is
processing, especially when the server goes down while it is streaming back the response. The
retransmission service should not care about that: it only makes sure the message gets passed over, and buffers
the message when the established client instance cannot talk to the server on the other side. */