grpc2.ts 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. import * as fs from 'fs'
  2. import { Subject } from 'rxjs';
  3. import { ColorCode, ReportStatus } from '../interfaces/general.interface';
  4. import { GrpcService } from '../services/grpc.service';
  5. import { FisErrorHandlingService } from '../services/error.handling.service.fis';
  6. // Subject for bidirectional communication
  7. const errorHandlingService: FisErrorHandlingService = new FisErrorHandlingService()
  8. const grpcService: GrpcService = new GrpcService()
  9. const messagesJSON: any = fs.readFileSync('payload.json')
  10. let parsedMessages: any[] = JSON.parse(messagesJSON) // load the fake messages generated for this trial
  11. let messageToBeReleased: Subject<any> = new Subject() // Sample message to be transmitted over to target server
  12. let statusControl: Subject<ReportStatus> = new Subject() // Listening for error events and states
  13. let dataMessages = stream() // Emulate messges to be sent over to target server
  14. let server1: string = 'localhost:3000'
  15. let unaryRequestSubject: Subject<any> = new Subject()
  16. /* Server Streaming Test case */
  17. errorHandlingService.handleMessage(unaryRequestSubject, statusControl).subscribe((messages) => {
  18. messageToBeReleased.next(messages)
  19. })
  20. grpcService.createGrpcInstance(server1, messageToBeReleased, statusControl, { instanceType: 'client', serviceMethod: 'server streaming' })
  21. /* Bidirectional streaming test case */
  22. // errorHandlingService.handleMessage(dataMessages, statusControl).subscribe((messages) => {
  23. // messageToBeReleased.next(messages)
  24. // })
  25. // grpcService.createGrpcInstance(server1, messageToBeReleased, statusControl, { instanceType: 'client', serviceMethod: 'bidirectional' })
  26. // }
  27. setTimeout(() => {
  28. messageToBeReleased.next(parsedMessages[0])
  29. }, 1000)
  30. setTimeout(() => {
  31. messageToBeReleased.next(parsedMessages[1])
  32. }, 4000)
  33. setTimeout(() => {
  34. unaryRequestSubject.next(parsedMessages[2])
  35. }, 7000)
  36. // this is just to publish an array of fake data as a Subject
  37. function stream(): Subject<any> {
  38. let result: Subject<any> = new Subject()
  39. let messages: any[] = parsedMessages
  40. let count = 0
  41. const intervalId = setInterval(() => {
  42. result.next(messages[count]);
  43. count++;
  44. if (count >= 1000) {
  45. clearInterval(intervalId);
  46. result.complete();
  47. }
  48. }, 500)
  49. return result
  50. }