grpc2.ts 3.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. import * as fs from 'fs'
  2. import { Subject, groupBy, mergeMap, toArray } from 'rxjs';
  3. import { ColorCode, GrpcMessage, MessageLog, ReportStatus } from '../interfaces/general.interface';
  4. import { GrpcService } from '../services/grpc.service';
  5. import { FisRetransmissionService } from '../services/fis.retransmission.service';
  6. import { GrpcServiceMethod } from '../services/service.method';
  7. // Subject for bidirectional communication
  8. const errorHandlingService: FisRetransmissionService = new FisRetransmissionService()
  9. const grpcService: GrpcService = new GrpcService(new GrpcServiceMethod())
  10. const messagesJSON: any = fs.readFileSync('payload.json')
  11. let incomingResponse: Subject<any> = grpcService.getIncomingResponse()
  12. let parsedMessages: any[] = JSON.parse(messagesJSON) // load the fake messages generated for this trial
  13. let messageToBeReleased: Subject<any> = new Subject() // Sample message to be transmitted over to target server
  14. let statusControl: Subject<ReportStatus> = new Subject() // Listening for error events and states
  15. let dataMessages = stream() // Emulate messges to be sent over to target server
  16. let server1: string = 'localhost:3000'
  17. let unaryRequestSubject: Subject<GrpcMessage> = new Subject()
  18. let array: any[] = []
  19. incomingResponse.subscribe({
  20. next: request => {
  21. array.push(request)
  22. console.log(`To be distributed to request:${request.id} => message: ${(request.message as MessageLog).appData.msgId}`)
  23. /* Here's the plan. Since each and every single call that the client make, the server will open a stream specifically for that
  24. client. Now. A new strategy to cater to the revised solution. Everytime a client start up, it will first make a request call
  25. spefically to open a stream specifically for response. THe demulitplexing will be delegated outside. The grpc instance should
  26. not be bothered with hashydabery that's going on. Server side just need to receive the request, (must terminate the stream
  27. response if it eats up too much memory), and then respond with the stream established earlier when the client first send the
  28. first request call.
  29. New dilemma, there will be some changes to be followed as well. Since i want to integrate the termination of the stream and in
  30. this case, the stream will terminate right after it sends a acknoledgement saying that the request is being processed. But due
  31. to the reconnection logic integration, the client will treat server.end as a server failure, and will try to execute the
  32. reconnection logic. Should not be hard to change.
  33. */
  34. }
  35. })
  36. /* Server Streaming Test case */
  37. errorHandlingService.handleMessage(unaryRequestSubject, statusControl).subscribe((messages) => {
  38. messageToBeReleased.next(messages)
  39. })
  40. grpcService.createGrpcInstance(server1, messageToBeReleased, statusControl, { instanceType: 'client', serviceMethod: 'server streaming' })
  41. setTimeout(() => {
  42. let request = {
  43. id: parsedMessages[1].appData.msgId,
  44. message: parsedMessages[1]
  45. }
  46. unaryRequestSubject.next(request)
  47. }, 1000)
  48. setTimeout(() => {
  49. let request = {
  50. id: parsedMessages[2].appData.msgId,
  51. message: parsedMessages[2]
  52. }
  53. unaryRequestSubject.next(request)
  54. }, 3000)
  55. setTimeout(() => {
  56. let request = {
  57. id: parsedMessages[3].appData.msgId,
  58. message: parsedMessages[3]
  59. }
  60. unaryRequestSubject.next(request)
  61. }, 5000)
  62. setTimeout(() => {
  63. console.log(`Full amount received: ${array.length}`)
  64. }, 10000)
  65. setTimeout(() => {
  66. console.log(`Full amount received: ${array.length}`)
  67. }, 12000)
  68. // this is just to publish an array of fake data as a Subject
  69. function stream(): Subject<any> {
  70. let result: Subject<any> = new Subject()
  71. let messages: any[] = parsedMessages
  72. let count = 0
  73. const intervalId = setInterval(() => {
  74. result.next(messages[count]);
  75. count++;
  76. if (count >= 1000) {
  77. clearInterval(intervalId);
  78. result.complete();
  79. }
  80. }, 500)
  81. return result
  82. }