grpc3.ts 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335
  1. import { Subject, take } from 'rxjs';
  2. import { Message, MessageLog, ConnectionRequest } from '../interfaces/general.interface';
  3. import { GrpcServiceMethod } from '../services/grpc.service.method';
  4. import { readFileSync } from 'fs';
  5. import { ServerClientManager } from '../services/server-client.service';
  // Connection manager used by every scenario in this script, constructed with a GrpcServiceMethod instance.
  const connectionService: ServerClientManager = new ServerClientManager(new GrpcServiceMethod())

  // Read the fixture as UTF-8 text so JSON.parse receives a string rather than a Buffer.
  const messagesJSON: string = readFileSync('payload.json', 'utf-8')
  const parsedPayload: unknown = JSON.parse(messagesJSON)
  // stream() indexes into this value, so fail fast if the fixture is not a JSON array.
  if (!Array.isArray(parsedPayload)) {
    throw new Error('payload.json must contain a JSON array of messages')
  }
  let parsedMessages: any[] = parsedPayload // load the fake messages generated for this trial

  // Addresses referenced by the connection requests in the scenarios below.
  let targetserver: string = 'localhost:3000'
  let targetserver2: string = 'localhost:3001'
  let hostServer: string = 'localhost:3002'

  let array: any[] = [] // Used for testing: collects every message received from the remote side
  // Delay in milliseconds between two messages emitted by stream().
  let intervalToStreamOutGoingMessage: number = 10
  /* Simple Test: 1 to 1 */
  // Connection descriptor: a server entry bound to hostServer and a client entry pointing at targetserver.
  let connectionRequest: ConnectionRequest = {
    server: {
      name: 'g2',
      serverUrl: hostServer,
      connectionType: 'GRPC',
      messageToBePublishedfromApplication: new Subject<Message>()
    },
    client: {
      name: 'g1',
      targetServer: targetserver,
      connectionType: 'GRPC',
      messageToBeReceivedFromRemote: new Subject<Message>()
    }
  }
  connectionService.generateConnection(connectionRequest)

  // Push the first 10 fixture messages into the outgoing subject, each tagged with hostServer as its id.
  let generateFakeMessagesToBePublished = stream().pipe(take(10))
  generateFakeMessagesToBePublished.subscribe({
    next(message) {
      const outgoing: Message = { id: hostServer, message: message }
      connectionRequest.server.messageToBePublishedfromApplication.next(outgoing)
    }
  })

  // Log and collect every message that arrives on the incoming subject.
  connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
    next(response) {
      console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
      array.push(response)
    },
    error(error) {
      console.error(error)
    },
    complete() {
      console.log(`Response for incoming generated. But this will never stop, and should not either.`)
    }
  })
  49. /* Complex Test: 1 to 1*/
  50. // let connectionRequest: ConnectionRequest = {
  51. // server: {
  52. // name: 'g1',
  53. // serverUrl: hostServer,
  54. // connectionType: 'GRPC',
  55. // messageToBePublishedfromApplication: new Subject<Message>()
  56. // },
  57. // client: {
  58. // name: 'g2',
  59. // targetServer: targetserver,
  60. // connectionType: 'GRPC',
  61. // messageToBeReceivedFromRemote: new Subject<Message>()
  62. // }
  63. // }
  64. // connectionService.generateConnection(connectionRequest)
  65. // setTimeout(() => {
  66. // let message = {
  67. // id: parsedMessages[10].appData.msgId,
  68. // message: parsedMessages[10] // Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
  69. // }
  70. // connectionRequest.server.messageToBePublishedfromApplication.next(message)
  71. // }, 3000)
  72. // setTimeout(() => {
  73. // let message = {
  74. // id: parsedMessages[11].appData.msgId,
  75. // message: parsedMessages[11] // Choose this number, because i purposely use the 12th message and change the msgPayload property to query to emulate a request
  76. // }
  77. // connectionRequest.server.messageToBePublishedfromApplication.next(message)
  78. // }, 4000)
  79. // Handler for the incoming Messages from the other side.
  80. // connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
  81. // next: request => {
  // // Application logic comes here. This is where the sorting takes place: deciding whom each message belongs to and what it is
  83. // if ((request.message as MessageLog).appData.msgPayload == 'Query') {
  84. // generateFakeStreamResponse(request).subscribe({
  85. // next: (responseMessage: Message) => {
  86. // console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
  87. // connectionRequest.server.messageToBePublishedfromApplication.next(responseMessage)
  88. // },
  89. // error: error => console.error(error),
  90. // complete: () => {
  // console.log(`Stream request for ${request.id} is queued.`) // should be indefinite
  92. // }
  93. // })
  94. // } else {
  95. // array.push(request)
  96. // console.log(`Received message: ${(request.message as MessageLog).appData.msgId} from ${request.id}`)
  97. // }
  98. // },
  99. // error: error => console.error(error),
  100. // complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
  101. // })
  102. /* Simple Test: 1 to Many */
  103. // let connectionRequest: ConnectionRequest = {
  104. // server: {
  105. // name: 'g1',
  106. // serverUrl: hostServer,
  107. // connectionType: 'GRPC',
  108. // messageToBePublishedfromApplication: new Subject<Message>()
  109. // },
  110. // client: {
  111. // name: 'g2',
  112. // targetServer: targetserver,
  113. // connectionType: 'GRPC',
  114. // messageToBeReceivedFromRemote: new Subject<Message>()
  115. // }
  116. // }
  117. // let connectionRequest2: ConnectionRequest = {
  118. // server: {
  119. // name: 'g1',
  120. // serverUrl: hostServer,
  121. // connectionType: 'GRPC',
  122. // messageToBePublishedfromApplication: new Subject<Message>()
  123. // },
  124. // client: {
  125. // name: 'g3',
  126. // targetServer: targetserver2,
  127. // connectionType: 'GRPC',
  128. // messageToBeReceivedFromRemote: new Subject<Message>()
  129. // }
  130. // }
  131. // connectionService.generateConnection(connectionRequest)
  132. // connectionService.generateConnection(connectionRequest2)
  133. // let generateFakeMessagesToBePublished = stream().pipe(take(10))
  134. // generateFakeMessagesToBePublished.subscribe({
  135. // next: message => {
  136. // let payload: Message = {
  137. // id: hostServer,
  138. // message: message
  139. // }
  140. // connectionRequest.server.messageToBePublishedfromApplication.next(payload)
  141. // }
  142. // })
  143. // connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
  144. // next: request => {
  // console.log(`Received ${(request.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
  146. // array.push(request)
  147. // },
  148. // error: error => console.error(error),
  149. // complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
  150. // })
  151. // connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
  152. // next: request => {
  // console.log(`Received ${(request.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
  154. // array.push(request)
  155. // },
  156. // error: error => console.error(error),
  157. // complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
  158. // })
  159. // connectionRequest2.client.messageToBeReceivedFromRemote.subscribe({
  160. // next: request => {
  161. // array.push(request)
  162. // },
  163. // error: error => console.error(error),
  164. // complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
  165. // })
  166. /* Complex Test: 1 to Many */
  167. // let connectionRequest: ConnectionRequest = {
  168. // server: {
  169. // name: 'g1',
  170. // serverUrl: hostServer,
  171. // connectionType: 'GRPC',
  172. // messageToBePublishedfromApplication: new Subject<Message>()
  173. // },
  174. // client: {
  175. // name: 'g2',
  176. // targetServer: targetserver,
  177. // connectionType: 'GRPC',
  178. // messageToBeReceivedFromRemote: new Subject<Message>()
  179. // }
  180. // }
  181. // let connectionRequest2: ConnectionRequest = {
  182. // server: {
  183. // name: 'g1',
  184. // serverUrl: hostServer,
  185. // connectionType: 'GRPC',
  186. // messageToBePublishedfromApplication: new Subject<Message>()
  187. // },
  188. // client: {
  189. // name: 'g3',
  190. // targetServer: targetserver2,
  191. // connectionType: 'GRPC',
  192. // messageToBeReceivedFromRemote: new Subject<Message>()
  193. // }
  194. // }
  195. // connectionService.generateConnection(connectionRequest)
  196. // connectionService.generateConnection(connectionRequest2)
  197. // setTimeout(() => {
  198. // let message = {
  199. // id: parsedMessages[10].appData.msgId,
  200. // message: parsedMessages[10] // Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
  201. // }
  202. // connectionRequest.server.messageToBePublishedfromApplication.next(message)
  203. // connectionRequest2.server.messageToBePublishedfromApplication.next(message)
  204. // }, 3000)
  205. // setTimeout(() => {
  206. // let message = {
  207. // id: parsedMessages[11].appData.msgId,
  208. // message: parsedMessages[11] // Choose this number, because i purposely use the 12th message and change the msgPayload property to query to emulate a request
  209. // }
  210. // connectionRequest.server.messageToBePublishedfromApplication.next(message)
  211. // connectionRequest2.server.messageToBePublishedfromApplication.next(message)
  212. // }, 4000)
  213. // connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
  214. // next: request => {
  // // Application logic comes here. This is where the sorting takes place: deciding whom each message belongs to and what it is
  216. // if ((request.message as MessageLog).appData.msgPayload == 'Query') {
  217. // generateFakeStreamResponse(request).subscribe({
  218. // next: (responseMessage: Message) => {
  219. // connectionRequest.server.messageToBePublishedfromApplication.next(responseMessage)
  220. // },
  221. // error: error => console.error(error),
  222. // complete: () => {
  223. // console.log(`Stream request for ${request.id} is queued.`) // should be indefinite
  224. // }
  225. // })
  226. // } else {
  // console.log(`Received ${(request.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
  228. // array.push(request)
  229. // console.log(`Received message: ${(request.message as MessageLog).appData.msgId} from ${request.id}`)
  230. // }
  231. // },
  232. // error: error => console.error(error),
  233. // complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
  234. // })
  235. // connectionRequest2.client.messageToBeReceivedFromRemote.subscribe({
  236. // next: request => {
  // // Application logic comes here. This is where the sorting takes place: deciding whom each message belongs to and what it is
  238. // if ((request.message as MessageLog).appData.msgPayload == 'Query') {
  239. // generateFakeStreamResponse(request).subscribe({
  240. // next: (responseMessage: Message) => {
  241. // connectionRequest2.server.messageToBePublishedfromApplication.next(responseMessage)
  242. // },
  243. // error: error => console.error(error),
  244. // complete: () => {
  245. // console.log(`Stream request for ${request.id} is queued.`) // should be indefinite
  246. // }
  247. // })
  248. // } else {
  // console.log(`Received ${(request.message as MessageLog).appData.msgId} from ${connectionRequest2.client.targetServer}`)
  250. // array.push(request)
  251. // console.log(`Received message: ${(request.message as MessageLog).appData.msgId} from ${request.id}`)
  252. // }
  253. // },
  254. // error: error => console.error(error),
  255. // complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
  256. // })
  /**
   * Publishes the entries of `parsedMessages` one at a time through a Subject.
   *
   * A timer fires every `intervalToStreamOutGoingMessage` milliseconds and emits the
   * next entry. The subject completes once every entry has been emitted, or after
   * 1000 entries, whichever comes first. An empty fixture completes on the first
   * tick without emitting anything.
   *
   * Values emitted before a subscriber attaches are not replayed to it.
   * NOTE: the timer keeps running until the limit is reached, even when every
   * subscriber has already unsubscribed (for example after `take(n)`).
   *
   * @returns the Subject on which the fixture entries are emitted
   */
  function stream(): Subject<any> {
    const result = new Subject<any>()
    const messages: any[] = parsedMessages
    // Never read past the end of the fixture; the previous fixed cap of 1000 stays as an upper bound.
    const limit = Math.min(messages.length, 1000)
    let count = 0
    const intervalId = setInterval(() => {
      if (count < limit) {
        result.next(messages[count])
        count++
      }
      if (count >= limit) {
        clearInterval(intervalId)
        result.complete()
      }
    }, intervalToStreamOutGoingMessage)
    return result
  }
  /**
   * Builds a fake streamed reply for an incoming request.
   *
   * Takes the first 7 values produced by `stream()` and re-emits each one on the
   * returned Subject, wrapped as `{ id, message }` with `id` copied from the request.
   * The returned Subject is never completed or errored by this function; errors and
   * completion of the inner stream are only logged.
   *
   * @param request the incoming request; only its `id` is read
   * @returns the Subject carrying the wrapped reply messages
   */
  function generateFakeStreamResponse(request: any): Subject<any> {
    const replySubject = new Subject<any>()
    const firstSeven = stream().pipe(take(7))

    firstSeven.subscribe({
      next(element) {
        // Reply messages carry the caller's id.
        replySubject.next({ id: request.id, message: element })
      },
      error(error) {
        console.error(error)
      },
      complete() {
        console.log(`Stream response for ${request.id} has been prepared.`)
      }
    })

    return replySubject
  }
  /* Checking the values by the end of the test */
  // Report how many messages have been collected 5, 10 and 15 seconds after start-up.
  for (const delayMs of [5000, 10000, 15000]) {
    setTimeout(() => {
      console.log(`All received data: ${array.length}`)
    }, delayMs)
  }