// grpc2.ts — gRPC server-streaming test harness: replays fake payload messages
// through the retransmission service and counts the responses streamed back.
  1. import * as fs from 'fs'
  2. import { Subject, groupBy, mergeMap, toArray } from 'rxjs';
  3. import { ColorCode, ReportStatus } from '../interfaces/general.interface';
  4. import { GrpcService } from '../services/grpc.service';
  5. import { FisRetransmissionService } from '../services/fis.retransmission.service';
// Subject for bidirectional communication
const errorHandlingService: FisRetransmissionService = new FisRetransmissionService()
const grpcService: GrpcService = new GrpcService()
// NOTE(review): no encoding passed, so readFileSync returns a Buffer; JSON.parse
// only works via its implicit toString(), and the `any` type hides that — confirm intent.
const messagesJSON: any = fs.readFileSync('payload.json')
// Merged channel on which ALL server-streamed responses arrive (see subscribe below).
let incomingResponse: Subject<any> = grpcService.getIncomingResponse()
let parsedMessages: any[] = JSON.parse(messagesJSON) // load the fake messages generated for this trial
let messageToBeReleased: Subject<any> = new Subject() // Sample message to be transmitted over to target server
let statusControl: Subject<ReportStatus> = new Subject() // Listening for error events and states
// NOTE(review): starts the 500 ms emitter immediately, but nothing subscribes to
// dataMessages in this file — looks vestigial; confirm before removing.
let dataMessages = stream() // Emulate messages to be sent over to target server
let server1: string = 'localhost:3000'
let unaryRequestSubject: Subject<any> = new Subject()
let array: any[] = [] // accumulates every streamed response for the final count at the bottom
// Collect every server-streamed response. Each request has its own stream on the
// server side, but all responses arrive merged on this single channel.
incomingResponse.subscribe({
  next: request => {
    console.log(`To be distributed to request:${request.id} => message: ${request.message.appData.msgId}`)
    array.push(request)
    // Have to create a function that creates observables/subjects corresponding to
    // the requests made by the client, to stream the responses.
    /* Right now each request has its own listener. If the client goes down, all
       listeners instantiated from all the requests are terminated. The server doesn't
       care, because when it processes all the requests the responses are merged into
       one channel. My not-so-clever solution: when the client starts, it first sends
       a request specifically to grab the data that was lost, and then lets the
       internal application do the sorting. */
    /* To really see how this works, I will create an array. Since I will pump in
       3 requests, the server side is hardcoded at the moment to stream 10 messages
       per request. I am going to terminate the client halfway through, change the
       code so that when I start the client again it only sends 1 request over,
       thereby asking for 10 more. Of course, in the real implementation this can be
       an initializer just to check whether there are buffered messages to be sent. */
  }
})
/* Server Streaming Test case */
// Anything the retransmission service releases (fresh or buffered) is forwarded
// onto the outbound channel consumed by the gRPC client instance below.
errorHandlingService.handleMessage(unaryRequestSubject, statusControl).subscribe((messages) => {
  messageToBeReleased.next(messages)
})
// Open a client in server-streaming mode against the local test server;
// statusControl carries error events/states back to the retransmission service.
grpcService.createGrpcInstance(server1, messageToBeReleased, statusControl, { instanceType: 'client', serviceMethod: 'server streaming' })
// Fire four requests on a staggered schedule: immediately, then at 2 s, 3 s and 5 s.
messageToBeReleased.next(parsedMessages[0])
setTimeout(() => {
  messageToBeReleased.next(parsedMessages[1])
}, 2000)
setTimeout(() => {
  messageToBeReleased.next(parsedMessages[2])
}, 3000)
setTimeout(() => {
  messageToBeReleased.next(parsedMessages[3])
}, 5000)
// After 11 s (past the last request plus the server's streaming window),
// report how many responses were collected in total.
setTimeout(() => {
  console.log(`Total messages received: ${array.length}`)
}, 11000)
  52. // this is just to publish an array of fake data as a Subject
  53. function stream(): Subject<any> {
  54. let result: Subject<any> = new Subject()
  55. let messages: any[] = parsedMessages
  56. let count = 0
  57. const intervalId = setInterval(() => {
  58. result.next(messages[count]);
  59. count++;
  60. if (count >= 1000) {
  61. clearInterval(intervalId);
  62. result.complete();
  63. }
  64. }, 500)
  65. return result
  66. }