grpc1.ts 2.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. import { Subject, from, take, takeUntil } from "rxjs";
  2. import { GrpcService } from "../services/grpc.service";
  3. import * as fs from 'fs'
  4. import { FisRetransmissionService } from "../services/fis.retransmission.service";
  5. import { ReportStatus } from "../interfaces/general.interface";
  6. const messagesJSON: any = fs.readFileSync('payload.json')
  7. const errorHandlingService: FisRetransmissionService = new FisRetransmissionService()
  8. const gprcService: GrpcService = new GrpcService()
  9. let incomingRequest: Subject<any> = gprcService.getIncomingRequest()
  10. let applicationOutgoingResponse: Subject<any> = new Subject()
  11. let parsedMessages: any[] = JSON.parse(messagesJSON) // load the fake messages generated for this trial
  12. let dataMessages = stream() // Emulate messges to be sent over to target server
  13. let messageToBePublished: Subject<any> = new Subject()
  14. let statusControl: Subject<ReportStatus> = new Subject()
  15. incomingRequest.subscribe({
  16. next: request => { // this whole thing is for 1 request
  17. if (request.id == '0000') {
  18. console.log(`Just checking for buffer. Dont do anything else!`)
  19. } else {
  20. generateFakeStreamResponse(request).subscribe({
  21. next: (responseMessage) => {
  22. // console.log(`Processing request:${request.id}....`)
  23. applicationOutgoingResponse.next(responseMessage)
  24. },
  25. error: error => console.error(error),
  26. complete: () => {
  27. console.log(`Stream request for ${request.id} is queued.`) // shpuld be indefinite
  28. }
  29. })
  30. }
  31. },
  32. error: error => console.error(error),
  33. complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
  34. })
  35. /* For server streaming */
  36. errorHandlingService.handleMessage(applicationOutgoingResponse, statusControl).subscribe((messages) => {
  37. messageToBePublished.next(messages)
  38. })
  39. let server1 = 'localhost:3000'
  40. gprcService.createGrpcInstance(server1, messageToBePublished, statusControl, { instanceType: 'server', serviceMethod: 'server streaming' })
  41. // this is just to publish an array of fake data as a Subject
  42. function stream(): Subject<any> {
  43. let result: Subject<any> = new Subject()
  44. let messages: any[] = parsedMessages
  45. let count = 0
  46. const intervalId = setInterval(() => {
  47. result.next(messages[count]);
  48. count++;
  49. if (count >= 1000) {
  50. clearInterval(intervalId);
  51. result.complete();
  52. }
  53. }, 500)
  54. return result
  55. }
  56. function generateFakeStreamResponse(request: any): Subject<any> {
  57. let res: Subject<any> = new Subject()
  58. stream().pipe(take(10)).subscribe({
  59. next: element => {
  60. let message = {
  61. id: request.id, // Caller's
  62. message: element
  63. }
  64. res.next(message)
  65. },
  66. error: error => console.error(error),
  67. complete: () => console.log(`Stream response for ${request.id} has been prepared.`)
  68. })
  69. return res
  70. }