grpc1.v2.bak 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. import { Subject, take } from 'rxjs';
  2. import { GrpcServiceMethod } from '../services/grpc.service.method';
  3. import { readFileSync } from 'fs';
  4. import { ConnectionRequest, Message } from '../interfaces/general.interface';
  5. import { ServerClientManager } from '../services/server-client.service';
  // Connection manager, constructed with a GrpcServiceMethod instance.
  const connectionService = new ServerClientManager(new GrpcServiceMethod())

  // Fake messages generated for this trial, loaded once at start-up from payload.json.
  const messagesJSON: any = readFileSync('payload.json')
  const parsedMessages: any[] = JSON.parse(String(messagesJSON))

  // Addresses used by this trial.
  const targetserver = 'localhost:3001'
  const targetserver2 = 'localhost:3002'
  const hostServer = 'localhost:3000'

  // Collects every non-query message received from the remote side (used for testing).
  const array: any[] = []

  // Connection description handed to the connection service: one server entry whose
  // Subject carries messages published by this application, and one client entry whose
  // Subject carries messages received from the remote side.
  const connectionRequest: ConnectionRequest = {
    server: {
      name: 'grpc1',
      serverUrl: hostServer,
      connectionType: 'GRPC',
      messageToBePublishedfromApplication: new Subject<Message>(),
    },
    client: [
      {
        name: 'grpc2',
        targetServer: targetserver,
        connectionType: 'GRPC',
        messageToBeReceivedFromRemote: new Subject<Message>(),
      },
    ],
  }
  // Sample connection attributes for two connections, "con1" and "con2".
  // Each entry has an outgoing and an incoming channel description plus a status Subject.
  //
  // The earlier draft mixed interface syntax (`Name?: string`, `ChannelID?: "..."`) into the
  // object literal and was missing commas, so it could not compile. It is written here as a
  // well-formed literal. The optional channel `Name` is left unset because the draft gave
  // only its type, never a value.
  //
  // NOTE(review): `ConnectionAttribute` and `ReportStatus` are not among the imports visible
  // in this file. Presumably they belong in '../interfaces/general.interface' — confirm and
  // import them before relying on this declaration.
  let client: ConnectionAttribute[] = [
    {
      name: 'con1',
      ConnectionID: 'aaa123-xxx123',
      outGoing: {
        ChannelID: 'aaa123',
        PublisherID: 'bbb123',
        SubscriberID: 'ccc123',
      },
      inComing: {
        ChannelID: 'xxx123',
        PublisherID: 'yyy123',
        SubscriberID: 'zzz123',
      },
      connectionStatus: new Subject<ReportStatus>(),
    },
    {
      name: 'con2',
      ConnectionID: 'aaa123xxx-xxx123xx',
      outGoing: {
        ChannelID: 'aaa123xxx',
        PublisherID: 'bbb123',
        SubscriberID: 'ccc123xxx',
      },
      inComing: {
        ChannelID: 'xxx123xx',
        PublisherID: 'yyy123xxx',
        SubscriberID: 'zzz123xxx',
      },
      connectionStatus: new Subject<ReportStatus>(),
    },
  ]
  // Handler for messages arriving from the other side, registered once per client entry.
  // Application logic lives here: each incoming message is sorted by what it is.
  //  - a message whose msgPayload equals 'Query' is answered with a fake response stream,
  //    each element of which is published back through the server Subject;
  //  - anything else is stored in `array` and logged.
  for (const remote of connectionRequest.client) {
    remote.messageToBeReceivedFromRemote.subscribe({
      next: (incoming) => {
        const appData = (incoming.message as MessageLog).appData

        if (appData.msgPayload != 'Query') {
          array.push(incoming)
          console.log(`Received messages from the other side: ${appData.msgId}`)
          return
        }

        const publishResponse = (responseMessage: Message): void => {
          // console.log(`Processing request:${incoming.id}....`)
          connectionRequest.server.messageToBePublishedfromApplication.next(responseMessage)
        }

        generateFakeStreamResponse(incoming).subscribe({
          next: publishResponse,
          error: (err) => console.error(err),
          // Should be indefinite.
          complete: () => {
            console.log(`Stream request for ${incoming.id} is queued.`)
          },
        })
      },
      error: (err) => console.error(err),
      complete: () =>
        console.log(`Response for incoming generated. But this will never stop, and should not either.`),
    })
  }

  // Hand the fully wired request to the connection service.
  connectionService.generateConnection(connectionRequest)
  94. /* Simple Test */
  95. // let generateFakeMessagesToBePublished = stream().pipe(take(10))
  96. // generateFakeMessagesToBePublished.subscribe({
  97. // next: message => {
  98. // let payload: Message = {
  99. // id: hostServer,
  100. // message: message
  101. // }
  102. // connectionRequest.server.messageToBePublishedfromApplication.next(payload)
  103. // }
  104. // })
  /* Complex test. Expected outcome: both sides must have received 14 messages by the end.
     Disconnecting has not been tried yet. */

  // [index into parsedMessages, delay in ms]. The 11th and 12th fake messages (indices 10
  // and 11) were purposely edited so that their msgPayload is a query, to emulate requests.
  const queryPublications: Array<[number, number]> = [
    [10, 3000],
    [11, 4000],
  ]

  for (const [index, delayMs] of queryPublications) {
    setTimeout(() => {
      const query = parsedMessages[index]
      connectionRequest.server.messageToBePublishedfromApplication.next({
        id: query.appData.msgId,
        message: query,
      })
    }, delayMs)
  }

  // Report how many non-query messages have been collected, after 10 s and again after 20 s.
  for (const delayMs of [10000, 20000]) {
    setTimeout(() => {
      console.log(`All received data: ${array.length}`)
    }, delayMs)
  }
  /**
   * Publishes the fake messages in `parsedMessages` as a Subject, one entry every 500 ms,
   * starting from index 0, and completes the Subject after the last entry.
   *
   * At most 1000 entries are emitted. Emission is also bounded by the number of messages
   * actually loaded, so a payload with fewer than 1000 entries (or none at all) never
   * produces `undefined` messages.
   *
   * NOTE: the interval is only cleared once the bound is reached. It is not cleared when
   * subscribers unsubscribe early (for example through `take`).
   *
   * @returns a Subject that emits the fake messages and then completes.
   */
  function stream(): Subject<any> {
    const maxMessages = 1000
    const result: Subject<any> = new Subject()
    const messages: any[] = parsedMessages
    // Never read past the end of the loaded payload.
    const limit = Math.min(messages.length, maxMessages)
    let count = 0
    const intervalId = setInterval(() => {
      if (count < limit) {
        result.next(messages[count])
        count++
      }
      if (count >= limit) {
        clearInterval(intervalId)
        result.complete()
      }
    }, 500)
    return result
  }
  /**
   * Builds a fake streamed response for a request: takes the first 7 elements produced by
   * `stream()` and re-emits each one wrapped in a message that carries the caller's id.
   *
   * The returned Subject is never completed or errored by this function. Completion of
   * the inner stream is only logged, and inner errors are only printed.
   *
   * @param request the incoming request; only its `id` is read.
   * @returns a Subject emitting `{ id, message }` objects.
   */
  function generateFakeStreamResponse(request: any): Subject<any> {
    const responses = new Subject<any>()
    const firstSeven = stream().pipe(take(7))

    // Tag every element with the caller's id before forwarding it.
    const forwardToCaller = (element: any): void => {
      responses.next({
        id: request.id,
        message: element,
      })
    }

    firstSeven.subscribe({
      next: forwardToCaller,
      error: (err) => console.error(err),
      complete: () => console.log(`Stream response for ${request.id} has been prepared.`),
    })

    return responses
  }