grpc2.ts 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. import { Subject, take } from 'rxjs';
  2. import { Message, MessageLog, ConnectionRequest } from '../interfaces/general.interface';
  3. import { GrpcServiceMethod } from '../services/grpc.service.method';
  4. import { readFileSync } from 'fs';
  5. import { ServerClientManager } from '../services/server-client.service';
  6. // Subject for bidirectional communication
  7. const connectionService: ServerClientManager = new ServerClientManager(new GrpcServiceMethod())
  8. const messagesJSON: any = readFileSync('payload.json')
  9. let parsedMessages: any[] = JSON.parse(messagesJSON) // load the fake messages generated for this trial
  10. let targetserver: string = 'localhost:3000'
  11. let targetserver2: string = 'localhost:3002'
  12. let hostServer: string = 'localhost:3001'
  13. let array: any[] = [] // Used for testing
  14. let connectionRequest: ConnectionRequest = {
  15. database: 'grpc2',
  16. server: {
  17. serverUrl: hostServer,
  18. connectionType: 'GRPC',
  19. messageToBePublishedfromApplication: new Subject<Message>()
  20. },
  21. client: {
  22. targetServer: targetserver,
  23. connectionType: 'GRPC',
  24. messageToBeReceivedFromRemote: new Subject<Message>()
  25. }
  26. }
  27. // Handler for the incoming Messages from the other side.
  28. connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
  29. next: request => {
  30. // Application logic comes here. This is where the asortment takes place, of decidiing whose messages it belongs of what it is
  31. if ((request.message as MessageLog).appData.msgPayload == 'Query') {
  32. generateFakeStreamResponse(request).subscribe({
  33. next: (responseMessage: Message) => {
  34. // console.log(`Processing request:${request.id}....`)
  35. connectionRequest.server.messageToBePublishedfromApplication.next(responseMessage)
  36. },
  37. error: error => console.error(error),
  38. complete: () => {
  39. console.log(`Stream request for ${request.id} is queued.`) // shpuld be indefinite
  40. }
  41. })
  42. } else {
  43. array.push(request)
  44. console.log(`Received messages from the other side: ${(request.message as MessageLog).appData.msgId}`)
  45. }
  46. },
  47. error: error => console.error(error),
  48. complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
  49. })
  50. connectionService.generateConnection(connectionRequest)
  51. /* Simple Test */
  52. // let generateFakeMessagesToBePublished = stream().pipe(take(10))
  53. // generateFakeMessagesToBePublished.subscribe({
  54. // next: message => {
  55. // let payload: Message = {
  56. // id: hostServer,
  57. // message: message
  58. // }
  59. // connectionRequest.server.messageToBePublishedfromApplication.next(payload)
  60. // }
  61. // })
  62. /* Complex Test: Expected out come, both must receive 14 message by the end. Havent try to disconnect.*/
  63. setTimeout(() => {
  64. let message = {
  65. id: parsedMessages[10].appData.msgId,
  66. message: parsedMessages[10] // Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
  67. }
  68. connectionRequest.server.messageToBePublishedfromApplication.next(message)
  69. }, 3000)
  70. setTimeout(() => {
  71. let message = {
  72. id: parsedMessages[11].appData.msgId,
  73. message: parsedMessages[11]// Choose this number, because i purposely use the 12th message and change the msgPayload property to query to emulate a request
  74. }
  75. connectionRequest.server.messageToBePublishedfromApplication.next(message)
  76. }, 4000)
  77. setTimeout(() => {
  78. console.log(`All received data: ${array.length}`)
  79. }, 10000)
  80. setTimeout(() => {
  81. console.log(`All received data: ${array.length}`)
  82. }, 20000)
  83. // this is just to publish an array of fake data as a Subject
  84. function stream(): Subject<any> {
  85. let result: Subject<any> = new Subject()
  86. let messages: any[] = parsedMessages
  87. let count = 0
  88. const intervalId = setInterval(() => {
  89. result.next(messages[count]);
  90. count++;
  91. if (count >= 1000) {
  92. clearInterval(intervalId);
  93. result.complete();
  94. }
  95. }, 500)
  96. return result
  97. }
  98. function generateFakeStreamResponse(request: any): Subject<any> {
  99. let res: Subject<any> = new Subject()
  100. stream().pipe(take(7)).subscribe({
  101. next: element => {
  102. let message = {
  103. id: request.id, // Caller's
  104. message: element
  105. }
  106. res.next(message)
  107. },
  108. error: error => console.error(error),
  109. complete: () => console.log(`Stream response for ${request.id} has been prepared.`)
  110. })
  111. return res
  112. }