grpc1.ts 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338
  1. import { Subject, from, take } from 'rxjs';
  2. import { Message, MessageLog, ConnectionRequest } from '../interfaces/general.interface';
  3. import { GrpcServiceMethod } from '../services/grpc.service.method';
  4. import { readFileSync } from 'fs';
  5. import { ServerClientManager } from '../services/server-client.service';
  // Connection manager used by every scenario in this file; it is constructed with the gRPC service method.
  const connectionService: ServerClientManager = new ServerClientManager(new GrpcServiceMethod())

  // Read the payload as UTF-8 text so JSON.parse receives a string instead of relying on
  // implicit Buffer-to-string coercion hidden behind an `any` annotation.
  const messagesJSON: string = readFileSync('payload.json', 'utf8')

  // Validate the shape up front: everything below indexes into / iterates over this value as an array.
  const parsedPayload: unknown = JSON.parse(messagesJSON)
  if (!Array.isArray(parsedPayload)) {
    throw new Error('payload.json must contain a JSON array of messages')
  }
  const parsedMessages: any[] = parsedPayload // the fake messages generated for this trial

  const targetserver: string = 'localhost:3001'
  const targetserver2: string = 'localhost:3002' // only referenced by the commented-out "1 to Many" scenarios
  const hostServer: string = 'localhost:3000'
  const array: any[] = [] // Collects received messages; used for testing
  const intervalToStreamOutGoingMessage: number = 1 // delay passed to setInterval in stream(), in milliseconds
  /* Simple Test: 1 to 1 */
  // A single request: server side named 'g1' at hostServer, client side named 'g2' aimed at targetserver.
  // Each side gets its own Subject: one for outgoing application messages, one for incoming remote messages.
  const connectionRequest: ConnectionRequest = {
    server: {
      name: 'g1',
      serverUrl: hostServer,
      connectionType: 'GRPC',
      messageToBePublishedfromApplication: new Subject<Message>()
    },
    client: {
      name: 'g2',
      targetServer: targetserver,
      connectionType: 'GRPC',
      messageToBeReceivedFromRemote: new Subject<Message>()
    }
  }

  connectionService.generateConnection(connectionRequest)

  // Alternative timed source, kept for reference:
  // let generateFakeMessagesToBePublished = stream().pipe(take(1000))
  const generateFakeMessagesToBePublished = from(parsedMessages).pipe(take(3000))

  // Wrap every fake message with this host's address as its id and push it to the outgoing subject.
  generateFakeMessagesToBePublished.subscribe({
    next: (message) => {
      const outgoing: Message = { id: hostServer, message: message }
      connectionRequest.server.messageToBePublishedfromApplication.next(outgoing)
    }
  })

  // Log and collect everything that arrives on the incoming subject.
  connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
    next: (response) => {
      const { msgId } = (response.message as MessageLog).appData
      console.log(`Received ${msgId} from ${connectionRequest.client.targetServer}`)
      array.push(response)
    },
    error: (error) => console.error(error),
    complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
  })
  50. /* Complex Test: 1 to 1*/
  51. // let connectionRequest: ConnectionRequest = {
  52. // server: {
  53. // name: 'g1',
  54. // serverUrl: hostServer,
  55. // connectionType: 'GRPC',
  56. // messageToBePublishedfromApplication: new Subject<Message>()
  57. // },
  58. // client: {
  59. // name: 'g2',
  60. // targetServer: targetserver,
  61. // connectionType: 'GRPC',
  62. // messageToBeReceivedFromRemote: new Subject<Message>()
  63. // }
  64. // }
  65. // connectionService.generateConnection(connectionRequest)
  66. // setTimeout(() => {
  67. // let message = {
  68. // id: parsedMessages[10].appData.msgId,
  69. // message: parsedMessages[10] // Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
  70. // }
  71. // connectionRequest.server.messageToBePublishedfromApplication.next(message)
  72. // }, 3000)
  73. // setTimeout(() => {
  74. // let message = {
  75. // id: parsedMessages[11].appData.msgId,
  76. // message: parsedMessages[11] // Choose this number, because i purposely use the 12th message and change the msgPayload property to query to emulate a request
  77. // }
  78. // connectionRequest.server.messageToBePublishedfromApplication.next(message)
  79. // }, 4000)
  80. // Handler for the incoming Messages from the other side.
  81. // connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
  82. // next: request => {
  83. // // Application logic comes here. This is where the sorting takes place: deciding whose message it is and what kind of message it is
  84. // if ((request.message as MessageLog).appData.msgPayload == 'Query') {
  85. // generateFakeStreamResponse(request).subscribe({
  86. // next: (responseMessage: Message) => {
  87. // console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
  88. // connectionRequest.server.messageToBePublishedfromApplication.next(responseMessage)
  89. // },
  90. // error: error => console.error(error),
  91. // complete: () => {
  92. // console.log(`Stream request for ${request.id} is queued.`) // should be indefinite
  93. // }
  94. // })
  95. // } else {
  96. // array.push(request)
  97. // console.log(`Received message: ${(request.message as MessageLog).appData.msgId} from ${request.id}`)
  98. // }
  99. // },
  100. // error: error => console.error(error),
  101. // complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
  102. // })
  103. /* Simple Test: 1 to Many */
  104. // let connectionRequest: ConnectionRequest = {
  105. // server: {
  106. // name: 'g1',
  107. // serverUrl: hostServer,
  108. // connectionType: 'GRPC',
  109. // messageToBePublishedfromApplication: new Subject<Message>()
  110. // },
  111. // client: {
  112. // name: 'g2',
  113. // targetServer: targetserver,
  114. // connectionType: 'GRPC',
  115. // messageToBeReceivedFromRemote: new Subject<Message>()
  116. // }
  117. // }
  118. // let connectionRequest2: ConnectionRequest = {
  119. // server: {
  120. // name: 'g1',
  121. // serverUrl: hostServer,
  122. // connectionType: 'GRPC',
  123. // messageToBePublishedfromApplication: new Subject<Message>()
  124. // },
  125. // client: {
  126. // name: 'g3',
  127. // targetServer: targetserver2,
  128. // connectionType: 'GRPC',
  129. // messageToBeReceivedFromRemote: new Subject<Message>()
  130. // }
  131. // }
  132. // connectionService.generateConnection(connectionRequest)
  133. // connectionService.generateConnection(connectionRequest2)
  134. // let generateFakeMessagesToBePublished = stream().pipe(take(10))
  135. // generateFakeMessagesToBePublished.subscribe({
  136. // next: message => {
  137. // let payload: Message = {
  138. // id: hostServer,
  139. // message: message
  140. // }
  141. // connectionRequest.server.messageToBePublishedfromApplication.next(payload)
  142. // }
  143. // })
  144. // connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
  145. // next: request => {
  146. // console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
  147. // array.push(request)
  148. // },
  149. // error: error => console.error(error),
  150. // complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
  151. // })
  152. // connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
  153. // next: request => {
  154. // console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
  155. // array.push(request)
  156. // },
  157. // error: error => console.error(error),
  158. // complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
  159. // })
  160. // connectionRequest2.client.messageToBeReceivedFromRemote.subscribe({
  161. // next: request => {
  162. // array.push(request)
  163. // },
  164. // error: error => console.error(error),
  165. // complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
  166. // })
  167. /* Complex Test: 1 to Many */
  168. // let connectionRequest: ConnectionRequest = {
  169. // server: {
  170. // name: 'g1',
  171. // serverUrl: hostServer,
  172. // connectionType: 'GRPC',
  173. // messageToBePublishedfromApplication: new Subject<Message>()
  174. // },
  175. // client: {
  176. // name: 'g2',
  177. // targetServer: targetserver,
  178. // connectionType: 'GRPC',
  179. // messageToBeReceivedFromRemote: new Subject<Message>()
  180. // }
  181. // }
  182. // let connectionRequest2: ConnectionRequest = {
  183. // server: {
  184. // name: 'g1',
  185. // serverUrl: hostServer,
  186. // connectionType: 'GRPC',
  187. // messageToBePublishedfromApplication: new Subject<Message>()
  188. // },
  189. // client: {
  190. // name: 'g3',
  191. // targetServer: targetserver2,
  192. // connectionType: 'GRPC',
  193. // messageToBeReceivedFromRemote: new Subject<Message>()
  194. // }
  195. // }
  196. // connectionService.generateConnection(connectionRequest)
  197. // connectionService.generateConnection(connectionRequest2)
  198. // setTimeout(() => {
  199. // let message = {
  200. // id: parsedMessages[10].appData.msgId,
  201. // message: parsedMessages[10] // Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
  202. // }
  203. // connectionRequest.server.messageToBePublishedfromApplication.next(message)
  204. // connectionRequest2.server.messageToBePublishedfromApplication.next(message)
  205. // }, 3000)
  206. // setTimeout(() => {
  207. // let message = {
  208. // id: parsedMessages[11].appData.msgId,
  209. // message: parsedMessages[11] // Choose this number, because i purposely use the 12th message and change the msgPayload property to query to emulate a request
  210. // }
  211. // connectionRequest.server.messageToBePublishedfromApplication.next(message)
  212. // connectionRequest2.server.messageToBePublishedfromApplication.next(message)
  213. // }, 4000)
  214. // connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
  215. // next: request => {
  216. // // Application logic comes here. This is where the sorting takes place: deciding whose message it is and what kind of message it is
  217. // if ((request.message as MessageLog).appData.msgPayload == 'Query') {
  218. // generateFakeStreamResponse(request).subscribe({
  219. // next: (responseMessage: Message) => {
  220. // connectionRequest.server.messageToBePublishedfromApplication.next(responseMessage)
  221. // },
  222. // error: error => console.error(error),
  223. // complete: () => {
  224. // console.log(`Stream request for ${request.id} is queued.`) // should be indefinite
  225. // }
  226. // })
  227. // } else {
  228. // console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
  229. // array.push(request)
  230. // console.log(`Received message: ${(request.message as MessageLog).appData.msgId} from ${request.id}`)
  231. // }
  232. // },
  233. // error: error => console.error(error),
  234. // complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
  235. // })
  236. // connectionRequest2.client.messageToBeReceivedFromRemote.subscribe({
  237. // next: request => {
  238. // // Application logic comes here. This is where the sorting takes place: deciding whose message it is and what kind of message it is
  239. // if ((request.message as MessageLog).appData.msgPayload == 'Query') {
  240. // generateFakeStreamResponse(request).subscribe({
  241. // next: (responseMessage: Message) => {
  242. // connectionRequest2.server.messageToBePublishedfromApplication.next(responseMessage)
  243. // },
  244. // error: error => console.error(error),
  245. // complete: () => {
  246. // console.log(`Stream request for ${request.id} is queued.`) // should be indefinite
  247. // }
  248. // })
  249. // } else {
  250. // console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
  251. // array.push(request)
  252. // console.log(`Received message: ${(request.message as MessageLog).appData.msgId} from ${request.id}`)
  253. // }
  254. // },
  255. // error: error => console.error(error),
  256. // complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
  257. // })
  /**
   * Publishes the fake messages held in `parsedMessages` as a Subject, one message per timer tick.
   *
   * A `setInterval` timer (period `intervalToStreamOutGoingMessage` ms) emits the messages in order,
   * starting at index 0. At most 1000 messages are emitted, and never more than the payload actually
   * contains, so no `undefined` entries are published when the payload is shorter than 1000 items.
   * The subject completes on the same tick as the last emission; for an empty payload it completes
   * on the first tick without emitting anything.
   *
   * Emission starts asynchronously, so a caller that subscribes right after this function returns
   * receives every message.
   *
   * @returns the Subject on which the messages are emitted and which is completed afterwards
   */
  function stream(): Subject<any> {
    const result: Subject<any> = new Subject()
    const messages: any[] = parsedMessages
    const maxMessages = 1000
    // Bound by the real payload size so the timer never reads past the end of the array.
    const limit = Math.min(messages.length, maxMessages)
    let count = 0
    const intervalId = setInterval(() => {
      if (count < limit) {
        result.next(messages[count])
        count++
      }
      // Checked after emitting so completion lands on the same tick as the final message.
      if (count >= limit) {
        clearInterval(intervalId)
        result.complete()
      }
    }, intervalToStreamOutGoingMessage)
    return result
  }
  /**
   * Produces a short fake reply stream for a request.
   *
   * Takes the first 7 items from `stream()`, wraps each one as `{ id: request.id, message: item }`
   * and pushes it onto the returned Subject. Errors from the inner stream are written to
   * `console.error`, and its completion is only logged; the returned Subject itself is neither
   * completed nor errored by this function.
   *
   * @param request the incoming request; only its `id` is read (the caller's id)
   * @returns the Subject that receives the wrapped reply messages
   */
  function generateFakeStreamResponse(request: any): Subject<any> {
    const replies = new Subject<any>()
    const firstSeven = stream().pipe(take(7))

    firstSeven.subscribe({
      // Tag each element with the caller's id before forwarding it.
      next: (item) => replies.next({ id: request.id, message: item }),
      error: (err) => console.error(err),
      complete: () => console.log(`Stream response for ${request.id} has been prepared.`)
    })

    return replies
  }
  /* Checking the values by the end of the test */
  // Print how many messages have been collected in `array` at 5, 10, 15 and 20 seconds after start-up.
  for (const delayMs of [5000, 10000, 15000, 20000]) {
    setTimeout(() => {
      console.log(`All received data: ${array.length}`)
    }, delayMs)
  }
  298. setTimeout(() => {
  299. console.log(`All received data: ${array.length}`)
  300. }, 20000)