grpc1.ts 2.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. import { Subject, from, take, takeUntil } from "rxjs";
  2. import { GrpcService } from "../services/grpc.service";
  3. import * as fs from 'fs'
  4. import { FisRetransmissionService } from "../services/fis.retransmission.service";
  5. import { ReportStatus } from "../interfaces/general.interface";
  6. import { GrpcServiceMethod } from "../services/service.method";
  // Raw text of the fixture file. An explicit encoding makes readFileSync return a string,
  // which is what JSON.parse expects, rather than a Buffer that is only stringified by coercion.
  const messagesJSON: string = fs.readFileSync('payload.json', 'utf8')
  const errorHandlingService: FisRetransmissionService = new FisRetransmissionService()
  const gprcService: GrpcService = new GrpcService(new GrpcServiceMethod())
  // Requests arriving through the gRPC service.
  const incomingRequest: Subject<any> = gprcService.getIncomingRequest()
  // Responses produced by this application before they are handed to the retransmission service.
  const applicationOutgoingResponse: Subject<any> = new Subject()
  // Load the fake messages generated for this trial. stream() indexes into this value,
  // so fail fast with a clear error when the file does not hold a JSON array.
  const parsedPayload: unknown = JSON.parse(messagesJSON)
  if (!Array.isArray(parsedPayload)) {
    throw new Error('payload.json must contain a JSON array of messages')
  }
  const parsedMessages: any[] = parsedPayload
  // Emulated messages to be sent over to the target server. NOTE: calling stream() here starts
  // its timer immediately; nothing in this file subscribes to this particular instance.
  const dataMessages = stream()
  const messageToBePublished: Subject<any> = new Subject()
  const statusControl: Subject<ReportStatus> = new Subject()
  // Handle every incoming request: each one gets its own fake response stream whose
  // messages are pushed into applicationOutgoingResponse.
  incomingRequest.subscribe({
    next: (request) => {
      // Requests with id '0000' are only logged; no response stream is generated for them.
      if (request.id == '0000') {
        console.log(`Just checking for buffer. Dont do anything else!`)
        return
      }

      // Observer for the response stream belonging to this single request.
      const responseObserver = {
        next: (responseMessage: any) => {
          applicationOutgoingResponse.next(responseMessage)
        },
        error: (error: any) => console.error(error),
        complete: () => {
          console.log(`Stream request for ${request.id} is queued.`)
        },
      }
      generateFakeStreamResponse(request).subscribe(responseObserver)
    },
    error: (error) => console.error(error),
    // The request stream is expected to stay open, so this is not expected to run.
    complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`),
  })
  /* For server streaming */
  // Address handed to createGrpcInstance below.
  const server1: string = 'localhost:3000'
  // Options describing the kind of gRPC instance to create.
  const serverStreamingOptions = { instanceType: 'server', serviceMethod: 'server streaming' }

  // Pass application responses through the retransmission service and forward whatever it
  // emits to messageToBePublished, the subject given to the gRPC instance.
  errorHandlingService
    .handleMessage(applicationOutgoingResponse, statusControl)
    .subscribe({
      next: (messages) => {
        messageToBePublished.next(messages)
      },
    })

  gprcService.createGrpcInstance(server1, messageToBePublished, statusControl, serverStreamingOptions)
  /**
   * Publishes the fake messages from `parsedMessages` as a Subject, one message every 500 ms.
   *
   * Each call starts its own timer. The timer runs until the last message has been emitted,
   * whether or not anything is subscribed, and the Subject is completed at that point.
   *
   * @returns a Subject that emits at most 1000 messages, in array order, and then completes.
   */
  function stream(): Subject<any> {
    const emitIntervalMs = 500
    const maxMessages = 1000
    const result: Subject<any> = new Subject()
    const messages: any[] = parsedMessages
    // Never read past the end of the payload: a shorter (or empty) array would otherwise
    // emit `undefined` on every tick until the hard cap is reached.
    const limit = Math.min(messages.length, maxMessages)
    let count = 0
    const intervalId = setInterval(() => {
      if (count < limit) {
        result.next(messages[count])
        count++
      }
      if (count >= limit) {
        clearInterval(intervalId)
        result.complete()
      }
    }, emitIntervalMs)
    return result
  }
  /**
   * Builds the fake response stream for a single request.
   *
   * Takes the first 10 messages from a fresh `stream()` and re-emits each one wrapped as
   * `{ id, message }`, where `id` is the caller's request id.
   *
   * @param request the incoming request; only its `id` is read.
   * @returns a Subject that emits the wrapped messages and then terminates together with
   *          the underlying stream (completes after the last message, or errors if it fails).
   */
  function generateFakeStreamResponse(request: any): Subject<any> {
    const res: Subject<any> = new Subject()
    stream().pipe(take(10)).subscribe({
      next: element => {
        res.next({
          id: request.id, // Caller's
          message: element
        })
      },
      // Hand failures to the consumer instead of leaving the returned Subject open forever.
      error: error => res.error(error),
      complete: () => {
        console.log(`Stream response for ${request.id} has been prepared.`)
        // Signal the end of the response so subscribers' `complete` callbacks run and
        // their subscriptions are released.
        res.complete()
      }
    })
    return res
  }