grpc2.ts 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. import * as fs from 'fs'
  2. import { Subject, groupBy, mergeMap, take, toArray } from 'rxjs';
  3. import { ColorCode, Message, MessageLog, ReportStatus } from '../interfaces/general.interface';
  4. import { GrpcService } from '../services/grpc.service';
  5. import { FisRetransmissionService } from '../services/fis.retransmission.service';
  6. import { GrpcServiceMethod } from '../services/service.method';
  7. // Subject for bidirectional communication
  8. const errorHandlingService: FisRetransmissionService = new FisRetransmissionService()
  9. const grpcService: GrpcService = new GrpcService(new GrpcServiceMethod())
  10. const messagesJSON: any = fs.readFileSync('payload.json')
  11. let incomingMessages: Subject<any> = grpcService.getIncomingMessage()
  12. let parsedMessages: any[] = JSON.parse(messagesJSON) // load the fake messages generated for this trial
  13. let messageToBeReleased: Subject<any> = new Subject() // Sample message to be transmitted over to target server
  14. let applicationOutgoingResponse: Subject<Message> = new Subject()
  15. let statusControl: Subject<ReportStatus> = new Subject() // Listening for error events and states
  16. let targetserver: string = 'localhost:3000'
  17. let hostServer: string = 'localhost:3001'
  18. let array: any[] = []// Used for testing
  19. // Handler for the incoming Messages from the other side.
  20. incomingMessages.subscribe({
  21. next: request => {
  22. // Application logic comes here. This is where the asortment takes place, of decidiing whose messages it belongs of what it is
  23. if ((request.message as MessageLog).appData.msgPayload == 'Query') {
  24. generateFakeStreamResponse(request).subscribe({
  25. next: (responseMessage) => {
  26. // console.log(`Processing request:${request.id}....`)
  27. applicationOutgoingResponse.next(responseMessage)
  28. },
  29. error: error => console.error(error),
  30. complete: () => {
  31. console.log(`Stream request for ${request.id} is queued.`) // shpuld be indefinite
  32. }
  33. })
  34. } else {
  35. array.push(request)
  36. console.log(`Received messages from the other side: ${(request.message as MessageLog).appData.msgId}`)
  37. }
  38. },
  39. error: error => console.error(error),
  40. complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
  41. })
  42. // Open channel for sending messages across.
  43. errorHandlingService.handleMessage(applicationOutgoingResponse, statusControl).subscribe((messages) => {
  44. messageToBeReleased.next(messages)
  45. })
  46. grpcService.createGrpcInstance(hostServer, statusControl, { instanceType: 'server', serviceMethod: 'server streaming' }, messageToBeReleased)
  47. grpcService.createGrpcInstance(targetserver, statusControl, { instanceType: 'client', serviceMethod: 'server streaming' })
  48. setTimeout(() => {
  49. let message = {
  50. id: parsedMessages[10].appData.msgId,
  51. message: parsedMessages[10] // Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
  52. }
  53. applicationOutgoingResponse.next(message)
  54. }, 2000)
  55. setTimeout(() => {
  56. let message = {
  57. id: parsedMessages[11].appData.msgId,
  58. message: parsedMessages[11]// Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
  59. }
  60. applicationOutgoingResponse.next(message)
  61. }, 3000)
  62. setTimeout(() => {
  63. console.log(`All received data: ${array.length}`)
  64. }, 10000)
  65. setTimeout(() => {
  66. console.log(`All received data: ${array.length}`)
  67. }, 20000)
  68. // this is just to publish an array of fake data as a Subject
  69. function stream(): Subject<any> {
  70. let result: Subject<any> = new Subject()
  71. let messages: any[] = parsedMessages
  72. let count = 0
  73. const intervalId = setInterval(() => {
  74. result.next(messages[count]);
  75. count++;
  76. if (count >= 1000) {
  77. clearInterval(intervalId);
  78. result.complete();
  79. }
  80. }, 500)
  81. return result
  82. }
  83. function generateFakeStreamResponse(request: any): Subject<any> {
  84. let res: Subject<any> = new Subject()
  85. stream().pipe(take(5)).subscribe({
  86. next: element => {
  87. let message = {
  88. id: request.id, // Caller's
  89. message: element
  90. }
  91. res.next(message)
  92. },
  93. error: error => console.error(error),
  94. complete: () => console.log(`Stream response for ${request.id} has been prepared.`)
  95. })
  96. return res
  97. }