grpc2.ts 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344
  1. import { Subject, from, take } from 'rxjs';
  2. import { Message, MessageLog, ConnectionRequest } from '../interfaces/general.interface';
  3. import { GrpcServiceMethod } from '../services/grpc.service.method';
  4. import { readFileSync } from 'fs';
  5. import { ServerClientManager } from '../services/server-client.service';
  // Shared fixtures for the manual gRPC connection trials in this script.

  /** Connection manager used by every trial below, constructed with a GrpcServiceMethod. */
  const connectionService: ServerClientManager = new ServerClientManager(new GrpcServiceMethod())

  // Read the payload as UTF-8 text so JSON.parse receives a string, not a Buffer.
  const messagesJSON: string = readFileSync('payload.json', 'utf-8')
  const decodedPayload: unknown = JSON.parse(messagesJSON)
  if (!Array.isArray(decodedPayload)) {
    // The trials index into and iterate over the payload, so anything but an array is unusable.
    throw new Error('payload.json must contain a JSON array of messages')
  }
  /** Fake messages generated for this trial, loaded from payload.json. */
  const parsedMessages: any[] = decodedPayload

  /** Remote server the client side connects to. */
  const targetserver: string = 'localhost:3000'
  /** Second remote server; only referenced by the commented-out "1 to Many" trials. */
  const targetserver2: string = 'localhost:3002'
  /** Address this process serves on; also used as the `id` of outgoing messages. */
  const hostServer: string = 'localhost:3001'
  /** Collects every received message so the timers at the end of the file can report a count. */
  const array: any[] = [] // Used for testing
  /** Delay passed to setInterval by stream() between two emitted messages. */
  const intervalToStreamOutGoingMessage: number = 1
  /* Simple Test: 1 to 1 */

  /** msgId whose arrival triggers the server restart in the receive handler below. */
  const restartTriggerMsgId = 'ebf94479-44fe-470d-827c-9f1389396d6a'

  /** Connection description: this process serves as 'g2' and connects as a client to 'g1'. */
  const connectionRequest: ConnectionRequest = {
    server: {
      name: 'g2',
      serverUrl: hostServer,
      connectionType: 'GRPC',
      messageToBePublishedfromApplication: new Subject<Message>()
    },
    client: {
      name: 'g1',
      targetServer: targetserver,
      connectionType: 'GRPC',
      messageToBeReceivedFromRemote: new Subject<Message>()
    }
  }
  connectionService.generateConnection(connectionRequest)

  // 10000th message == 848438e1-da50-4d98-aa12-e44d6d6a1489
  // let generateFakeMessagesToBePublished = stream().pipe(take(1000))
  const generateFakeMessagesToBePublished = from(parsedMessages).pipe(take(20000))
  generateFakeMessagesToBePublished.subscribe({
    next: message => {
      // Publishing is currently disabled (next line but one is commented out), so the
      // payload is built and then discarded; re-enable that line to send from this side.
      const payload: Message = {
        id: hostServer,
        message: message
      }
      // connectionRequest.server.messageToBePublishedfromApplication.next(payload)
    }
  })

  // Handle every message arriving from the remote server.
  connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
    next: response => {
      // Read the id once instead of repeating the cast for each use.
      const { msgId } = (response.message as MessageLog).appData
      if (msgId === restartTriggerMsgId) {
        console.log(`Received the 1000th message. Running the test. Initiating server restart....`)
        connectionService.restartServerInDuration(10)
      }
      console.log(`Received ${msgId} from ${connectionRequest.client.targetServer}`)
      array.push(response)
    },
    error: error => console.error(error),
    complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
  })
  55. /* Complex Test: 1 to 1*/
  56. // let connectionRequest: ConnectionRequest = {
  57. // server: {
  58. // name: 'g1',
  59. // serverUrl: hostServer,
  60. // connectionType: 'GRPC',
  61. // messageToBePublishedfromApplication: new Subject<Message>()
  62. // },
  63. // client: {
  64. // name: 'g2',
  65. // targetServer: targetserver,
  66. // connectionType: 'GRPC',
  67. // messageToBeReceivedFromRemote: new Subject<Message>()
  68. // }
  69. // }
  70. // connectionService.generateConnection(connectionRequest)
  71. // setTimeout(() => {
  72. // let message = {
  73. // id: parsedMessages[10].appData.msgId,
  74. // message: parsedMessages[10] // Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
  75. // }
  76. // connectionRequest.server.messageToBePublishedfromApplication.next(message)
  77. // }, 3000)
  78. // setTimeout(() => {
  79. // let message = {
  80. // id: parsedMessages[11].appData.msgId,
  81. // message: parsedMessages[11] // Choose this number, because i purposely use the 12th message and change the msgPayload property to query to emulate a request
  82. // }
  83. // connectionRequest.server.messageToBePublishedfromApplication.next(message)
  84. // }, 4000)
  85. // Handler for the incoming Messages from the other side.
  86. // connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
  87. // next: request => {
  88. // // Application logic comes here. This is where incoming messages are sorted: deciding whom each message belongs to and what it is
  89. // if ((request.message as MessageLog).appData.msgPayload == 'Query') {
  90. // generateFakeStreamResponse(request).subscribe({
  91. // next: (responseMessage: Message) => {
  92. // console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
  93. // connectionRequest.server.messageToBePublishedfromApplication.next(responseMessage)
  94. // },
  95. // error: error => console.error(error),
  96. // complete: () => {
  97. // console.log(`Stream request for ${request.id} is queued.`) // should be indefinite
  98. // }
  99. // })
  100. // } else {
  101. // array.push(request)
  102. // console.log(`Received message: ${(request.message as MessageLog).appData.msgId} from ${request.id}`)
  103. // }
  104. // },
  105. // error: error => console.error(error),
  106. // complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
  107. // })
  108. /* Simple Test: 1 to Many */
  109. // let connectionRequest: ConnectionRequest = {
  110. // server: {
  111. // name: 'g1',
  112. // serverUrl: hostServer,
  113. // connectionType: 'GRPC',
  114. // messageToBePublishedfromApplication: new Subject<Message>()
  115. // },
  116. // client: {
  117. // name: 'g2',
  118. // targetServer: targetserver,
  119. // connectionType: 'GRPC',
  120. // messageToBeReceivedFromRemote: new Subject<Message>()
  121. // }
  122. // }
  123. // let connectionRequest2: ConnectionRequest = {
  124. // server: {
  125. // name: 'g1',
  126. // serverUrl: hostServer,
  127. // connectionType: 'GRPC',
  128. // messageToBePublishedfromApplication: new Subject<Message>()
  129. // },
  130. // client: {
  131. // name: 'g3',
  132. // targetServer: targetserver2,
  133. // connectionType: 'GRPC',
  134. // messageToBeReceivedFromRemote: new Subject<Message>()
  135. // }
  136. // }
  137. // connectionService.generateConnection(connectionRequest)
  138. // connectionService.generateConnection(connectionRequest2)
  139. // let generateFakeMessagesToBePublished = stream().pipe(take(10))
  140. // generateFakeMessagesToBePublished.subscribe({
  141. // next: message => {
  142. // let payload: Message = {
  143. // id: hostServer,
  144. // message: message
  145. // }
  146. // connectionRequest.server.messageToBePublishedfromApplication.next(payload)
  147. // }
  148. // })
  149. // connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
  150. // next: request => {
  151. // console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
  152. // array.push(request)
  153. // },
  154. // error: error => console.error(error),
  155. // complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
  156. // })
  157. // connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
  158. // next: request => {
  159. // console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
  160. // array.push(request)
  161. // },
  162. // error: error => console.error(error),
  163. // complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
  164. // })
  165. // connectionRequest2.client.messageToBeReceivedFromRemote.subscribe({
  166. // next: request => {
  167. // array.push(request)
  168. // },
  169. // error: error => console.error(error),
  170. // complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
  171. // })
  172. /* Complex Test: 1 to Many */
  173. // let connectionRequest: ConnectionRequest = {
  174. // server: {
  175. // name: 'g1',
  176. // serverUrl: hostServer,
  177. // connectionType: 'GRPC',
  178. // messageToBePublishedfromApplication: new Subject<Message>()
  179. // },
  180. // client: {
  181. // name: 'g2',
  182. // targetServer: targetserver,
  183. // connectionType: 'GRPC',
  184. // messageToBeReceivedFromRemote: new Subject<Message>()
  185. // }
  186. // }
  187. // let connectionRequest2: ConnectionRequest = {
  188. // server: {
  189. // name: 'g1',
  190. // serverUrl: hostServer,
  191. // connectionType: 'GRPC',
  192. // messageToBePublishedfromApplication: new Subject<Message>()
  193. // },
  194. // client: {
  195. // name: 'g3',
  196. // targetServer: targetserver2,
  197. // connectionType: 'GRPC',
  198. // messageToBeReceivedFromRemote: new Subject<Message>()
  199. // }
  200. // }
  201. // connectionService.generateConnection(connectionRequest)
  202. // connectionService.generateConnection(connectionRequest2)
  203. // setTimeout(() => {
  204. // let message = {
  205. // id: parsedMessages[10].appData.msgId,
  206. // message: parsedMessages[10] // Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
  207. // }
  208. // connectionRequest.server.messageToBePublishedfromApplication.next(message)
  209. // connectionRequest2.server.messageToBePublishedfromApplication.next(message)
  210. // }, 3000)
  211. // setTimeout(() => {
  212. // let message = {
  213. // id: parsedMessages[11].appData.msgId,
  214. // message: parsedMessages[11] // Choose this number, because i purposely use the 12th message and change the msgPayload property to query to emulate a request
  215. // }
  216. // connectionRequest.server.messageToBePublishedfromApplication.next(message)
  217. // connectionRequest2.server.messageToBePublishedfromApplication.next(message)
  218. // }, 4000)
  219. // connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
  220. // next: request => {
  221. // // Application logic comes here. This is where incoming messages are sorted: deciding whom each message belongs to and what it is
  222. // if ((request.message as MessageLog).appData.msgPayload == 'Query') {
  223. // generateFakeStreamResponse(request).subscribe({
  224. // next: (responseMessage: Message) => {
  225. // connectionRequest.server.messageToBePublishedfromApplication.next(responseMessage)
  226. // },
  227. // error: error => console.error(error),
  228. // complete: () => {
  229. // console.log(`Stream request for ${request.id} is queued.`) // should be indefinite
  230. // }
  231. // })
  232. // } else {
  233. // console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
  234. // array.push(request)
  235. // console.log(`Received message: ${(request.message as MessageLog).appData.msgId} from ${request.id}`)
  236. // }
  237. // },
  238. // error: error => console.error(error),
  239. // complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
  240. // })
  241. // connectionRequest2.client.messageToBeReceivedFromRemote.subscribe({
  242. // next: request => {
  243. // // Application logic comes here. This is where incoming messages are sorted: deciding whom each message belongs to and what it is
  244. // if ((request.message as MessageLog).appData.msgPayload == 'Query') {
  245. // generateFakeStreamResponse(request).subscribe({
  246. // next: (responseMessage: Message) => {
  247. // connectionRequest2.server.messageToBePublishedfromApplication.next(responseMessage)
  248. // },
  249. // error: error => console.error(error),
  250. // complete: () => {
  251. // console.log(`Stream request for ${request.id} is queued.`) // should be indefinite
  252. // }
  253. // })
  254. // } else {
  255. // console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
  256. // array.push(request)
  257. // console.log(`Received message: ${(request.message as MessageLog).appData.msgId} from ${request.id}`)
  258. // }
  259. // },
  260. // error: error => console.error(error),
  261. // complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
  262. // })
  /**
   * Publishes the fake messages in `parsedMessages` one at a time on a timer.
   *
   * A message is emitted every `intervalToStreamOutGoingMessage` (the setInterval
   * delay), starting from index 0. The subject completes after 10000 messages or
   * after the last loaded message, whichever comes first; an empty payload
   * completes on the first tick without emitting anything.
   *
   * The timer starts as soon as this function is called and is only cleared when
   * the limit is reached, so it keeps ticking even after subscribers unsubscribe
   * (for example via `take`).
   *
   * @returns A subject that emits the fake messages and then completes.
   */
  function stream(): Subject<any> {
    const result: Subject<any> = new Subject()
    const messages: any[] = parsedMessages
    const maxMessages = 10000
    // Never read past the end of the payload: that would emit `undefined` values.
    const limit = Math.min(messages.length, maxMessages)
    let count = 0
    const intervalId = setInterval(() => {
      if (count < limit) {
        result.next(messages[count])
        count++
      }
      if (count >= limit) {
        clearInterval(intervalId)
        result.complete()
      }
    }, intervalToStreamOutGoingMessage)
    return result
  }
  /**
   * Builds a fake streamed response for an incoming request.
   *
   * Takes the first 7 messages produced by stream() and re-emits each one on the
   * returned subject, wrapped as `{ id, message }` where `id` is the caller's
   * `request.id`. Errors from the source are logged, and completion of the source
   * only logs a line; the returned subject itself is never completed or errored here.
   *
   * @param request The incoming request; only its `id` is read.
   * @returns A subject carrying the wrapped response messages.
   */
  function generateFakeStreamResponse(request: any): Subject<any> {
    const responses = new Subject<any>()
    const firstSeven = stream().pipe(take(7))
    firstSeven.subscribe({
      // Tag every element with the caller's id before forwarding it.
      next: (element) => responses.next({ id: request.id, message: element }),
      error: (error) => console.error(error),
      complete: () => console.log(`Stream response for ${request.id} has been prepared.`)
    })
    return responses
  }
  /* Checking the values by the end of the test */
  // Log how many messages have been collected so far at 5 s, 10 s, 15 s and 20 s.
  for (const delayMs of [5000, 10000, 15000, 20000]) {
    setTimeout(() => {
      console.log(`All received data: ${array.length}`)
    }, delayMs)
  }