// grpc3.ts (3.6 KB)
  1. import * as fs from 'fs'
  2. import { Subject, groupBy, mergeMap, take, toArray } from 'rxjs';
  3. import { ColorCode, Message, MessageLog, ReportStatus } from '../interfaces/general.interface';
  4. import { GrpcService } from '../services/grpc.service';
  5. import { FisRetransmissionService } from '../services/fis.retransmission.service';
  6. import { GrpcServiceMethod } from '../services/service.method';
  // Endpoints: this process hosts a gRPC server on `hostServer` and connects as a
  // client to `targetserver`.
  const targetserver = 'localhost:3000'
  const hostServer = 'localhost:3001'

  // Requests that are not queries are collected here (see the incoming subscription).
  const array: any[] = []

  // Services used by this script.
  const errorHandlingService = new FisRetransmissionService()
  const grpcService = new GrpcService(new GrpcServiceMethod())

  // Raw content of payload.json; parsed further down into the fake messages
  // generated for this trial.
  const messagesJSON: any = fs.readFileSync('payload.json')

  // Stream of incoming messages exposed by the gRPC service.
  const incomingMessages: Subject<any> = grpcService.getIncomingMessage()
  const parsedMessages: any[] = JSON.parse(messagesJSON)

  // Subjects used for bidirectional communication:
  //  - messageToBeReleased: messages to be transmitted over to the target server
  //  - applicationOutgoingResponse: responses produced by the application logic
  //  - statusControl: error events and state reports
  const messageToBeReleased = new Subject<any>()
  const applicationOutgoingResponse = new Subject<Message>()
  const statusControl = new Subject<ReportStatus>()
  // Application logic: sort each incoming request by what it is.
  //  - A request whose payload is 'Query' is answered with a generated fake stream,
  //    and every generated message is pushed to `applicationOutgoingResponse`.
  //  - Anything else is treated as a message received from the other side and is
  //    stored in `array`.
  incomingMessages.subscribe({
    next: (request) => {
      const incoming = request.message as MessageLog

      // Not a query: just record it and report its message id.
      if (incoming.appData.msgPayload != 'Query') {
        array.push(request)
        console.log(`Received messages from the other side: ${incoming.appData.msgId}`)
        return
      }

      const responseObserver = {
        // Debug hint: console.log(`Processing request:${request.id}....`)
        next: (responseMessage: any) => applicationOutgoingResponse.next(responseMessage),
        error: (err: unknown) => console.error(err),
        // NOTE: the original author marked this response stream as one that should be indefinite.
        complete: () => console.log(`Stream request for ${request.id} is queued.`),
      }
      generateFakeStreamResponse(request).subscribe(responseObserver)
    },
    error: (err) => console.error(err),
    complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`),
  })
  // Open the channel for sending messages across: whatever the retransmission
  // service emits for the outgoing responses is forwarded to `messageToBeReleased`.
  errorHandlingService
    .handleMessage(applicationOutgoingResponse, statusControl)
    .subscribe((messages) => messageToBeReleased.next(messages))

  // Server instance on the host address; it is handed `messageToBeReleased`.
  grpcService.createGrpcInstance(
    hostServer,
    statusControl,
    { instanceType: 'server', serviceMethod: 'server streaming' },
    messageToBeReleased
  )

  // Client instance pointed at the target server.
  grpcService.createGrpcInstance(
    targetserver,
    statusControl,
    { instanceType: 'client', serviceMethod: 'server streaming' }
  )

  // Report how many non-query messages have been collected after 10 s and after 20 s.
  for (const delayMs of [10000, 20000]) {
    setTimeout(() => {
      console.log(`All received data: ${array.length}`)
    }, delayMs)
  }
  /**
   * Publishes the fake messages held in `parsedMessages` as a Subject, one
   * message every 500 ms, and completes the Subject after the last one.
   *
   * At most 1000 messages are emitted. When the payload holds fewer than 1000
   * entries the stream stops at the end of the payload instead of emitting
   * `undefined` for the missing indices; an empty payload completes on the first
   * tick without emitting anything.
   *
   * Emission is driven by a timer started when this function is called, not on
   * subscription, so callers should subscribe right away.
   *
   * @returns a Subject that emits the fake messages in order and then completes.
   */
  function stream(): Subject<any> {
    const MAX_MESSAGES = 1000
    const INTERVAL_MS = 500

    const result: Subject<any> = new Subject()
    const messages: any[] = parsedMessages
    // Never read past the end of the payload.
    const limit = Math.min(messages.length, MAX_MESSAGES)
    let count = 0

    const intervalId = setInterval(() => {
      if (count < limit) {
        result.next(messages[count])
        count++
      }
      // Stop the timer as soon as the last message has gone out.
      if (count >= limit) {
        clearInterval(intervalId)
        result.complete()
      }
    }, INTERVAL_MS)

    return result
  }
  /**
   * Builds a fake streamed response for a request: the first five messages from
   * `stream()` are each wrapped as `{ id, message }`, where `id` is the caller's
   * request id, and pushed to the returned Subject.
   *
   * The returned Subject is never completed or errored by this function; errors
   * and completion of the underlying stream are only logged.
   *
   * @param request the incoming request; only its `id` is read here.
   * @returns a Subject emitting the wrapped fake messages.
   */
  function generateFakeStreamResponse(request: any): Subject<any> {
    const res = new Subject<any>()
    const firstFive = stream().pipe(take(5))

    firstFive.subscribe({
      // Tag every fake message with the caller's id before handing it on.
      next: (element) => res.next({ id: request.id, message: element }),
      error: (error) => console.error(error),
      complete: () => console.log(`Stream response for ${request.id} has been prepared.`),
    })

    return res
  }