grpc1.ts 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359
  1. import { Subject, from, interval, take } from 'rxjs';
  2. import { Message, MessageLog, ConnectionRequest } from '../interfaces/general.interface';
  3. import { GrpcServiceMethod } from '../services/grpc.service.method';
  4. import { readFileSync } from 'fs';
  5. import { ServerClientManager } from '../services/server-client.service';
  6. import mongoose from 'mongoose';
  7. // Connect to MongoDB
  8. // mongoose.connect('mongodb://localhost:27017/grpc1')
  9. // const Message = mongoose.model('Message', require('../models/message.schema'))
  10. // Subject for bidirectional communication
  11. const connectionService: ServerClientManager = new ServerClientManager()
  12. const messagesJSON: any = readFileSync('payload.json')
  13. let parsedMessages: any[] = JSON.parse(messagesJSON) // load the fake messages generated for this trial
  14. let targetserver: string = 'localhost:3001'
  15. let targetserver2: string = 'localhost:3002'
  16. let hostServer: string = 'localhost:3000'
  17. let array: any[] = [] // Used for testing
  18. let intervalToStreamOutGoingMessage: number = 1
  19. /* Simple Test: 1 to 1 */
  20. // let connectionRequest: ConnectionRequest = {
  21. // server: {
  22. // name: 'g1',
  23. // serverUrl: hostServer,
  24. // connectionType: 'GRPC',
  25. // messageToBePublishedFromApplication: new Subject<Message>()
  26. // },
  27. // client: {
  28. // name: 'g2',
  29. // targetServer: targetserver,
  30. // connectionType: 'GRPC',
  31. // messageToBeReceivedFromRemote: new Subject<Message>()
  32. // }
  33. // }
  34. // connectionService.generateConnection(connectionRequest)
  35. // let generateFakeMessagesToBePublished = stream().pipe(take(1000))
  36. // let generateFakeMessagesToBePublished = from(parsedMessages).pipe(take(5000))
  37. // generateFakeMessagesToBePublished.subscribe({
  38. // next: message => {
  39. // let payload: Message = {
  40. // id: hostServer,
  41. // message: message
  42. // }
  43. // connectionRequest.server.messageToBePublishedFromApplication.next(payload)
  44. // },
  45. // error: error => console.error(error),
  46. // complete: () => console.log(`Completed`)
  47. // })
  48. // stream().subscribe({
  49. // next: message => {
  50. // let payload = {
  51. // id: hostServer,
  52. // message: message
  53. // }
  54. // connectionRequest.server.messageToBePublishedFromApplication.next(payload)
  55. // }
  56. // })
  57. // connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
  58. // next: response => {
  59. // console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
  60. // // Message.create(response)
  61. // array.push(response)
  62. // },
  63. // error: error => console.error(error),
  64. // complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
  65. // })
  66. /* Complex Test: 1 to 1*/
  67. // let connectionRequest: ConnectionRequest = {
  68. // server: {
  69. // name: 'g1',
  70. // serverUrl: hostServer,
  71. // connectionType: 'GRPC',
  72. // messageToBePublishedfromApplication: new Subject<Message>()
  73. // },
  74. // client: {
  75. // name: 'g2',
  76. // targetServer: targetserver,
  77. // connectionType: 'GRPC',
  78. // messageToBeReceivedFromRemote: new Subject<Message>()
  79. // }
  80. // }
  81. // connectionService.generateConnection(connectionRequest)
  82. // setTimeout(() => {
  83. // let message = {
  84. // id: parsedMessages[10].appData.msgId,
  85. // message: parsedMessages[10] // Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
  86. // }
  87. // connectionRequest.server.messageToBePublishedfromApplication.next(message)
  88. // }, 3000)
  89. // setTimeout(() => {
  90. // let message = {
  91. // id: parsedMessages[11].appData.msgId,
  92. // message: parsedMessages[11] // Choose this number, because i purposely use the 12th message and change the msgPayload property to query to emulate a request
  93. // }
  94. // connectionRequest.server.messageToBePublishedfromApplication.next(message)
  95. // }, 4000)
  96. // Handler for the incoming Messages from the other side.
  97. // connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
  98. // next: request => {
  99. // // Application logic comes here. This is where the asortment takes place, of decidiing whose messages it belongs of what it is
  100. // if ((request.message as MessageLog).appData.msgPayload == 'Query') {
  101. // generateFakeStreamResponse(request).subscribe({
  102. // next: (responseMessage: Message) => {
  103. // console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
  104. // connectionRequest.server.messageToBePublishedfromApplication.next(responseMessage)
  105. // },
  106. // error: error => console.error(error),
  107. // complete: () => {
  108. // console.log(`Stream request for ${request.id} is queued.`) // shpuld be indefinite
  109. // }
  110. // })
  111. // } else {
  112. // array.push(request)
  113. // console.log(`Received message: ${(request.message as MessageLog).appData.msgId} from ${request.id}`)
  114. // }
  115. // },
  116. // error: error => console.error(error),
  117. // complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
  118. // })
  119. /* Simple Test: 1 to Many */
  120. let connectionRequest1: ConnectionRequest = {
  121. server: {
  122. name: 'G0',
  123. serverUrl: hostServer,
  124. connectionType: 'GRPC',
  125. messageToBePublishedFromApplication: new Subject<Message>()
  126. },
  127. client: {
  128. name: 'G1',
  129. targetServer: targetserver,
  130. connectionType: 'GRPC',
  131. messageToBeReceivedFromRemote: new Subject<Message>()
  132. }
  133. }
  134. let connectionRequest2: ConnectionRequest = {
  135. server: {
  136. name: 'G0',
  137. serverUrl: hostServer,
  138. connectionType: 'GRPC',
  139. messageToBePublishedFromApplication: new Subject<Message>()
  140. },
  141. client: {
  142. name: 'G2',
  143. targetServer: targetserver2,
  144. connectionType: 'GRPC',
  145. messageToBeReceivedFromRemote: new Subject<Message>()
  146. }
  147. }
  148. connectionService.generateConnection(connectionRequest1).then((res) => {
  149. // console.log(res)
  150. })
  151. connectionService.generateConnection(connectionRequest2).then((res) => {
  152. // console.log(res)
  153. })
  154. let generateFakeMessagesToBePublished = from(parsedMessages).pipe(take(50))
  155. generateFakeMessagesToBePublished.subscribe({
  156. next: message => {
  157. let payload: Message = {
  158. id: hostServer,
  159. message: message
  160. }
  161. connectionRequest1.server!.messageToBePublishedFromApplication.next(payload)
  162. connectionRequest2.server!.messageToBePublishedFromApplication.next(payload)
  163. },
  164. error: error => console.error(error),
  165. complete: () => console.log(`Completed`)
  166. })
  167. // let generateFakeMessagesToBePublished = stream().pipe(take(10))
  168. // generateFakeMessagesToBePublished.subscribe({
  169. // next: message => {
  170. // let payload: Message = {
  171. // id: hostServer,
  172. // message: message
  173. // }
  174. // connectionRequest.server.messageToBePublishedFromApplication.next(payload)
  175. // connectionRequest2.server.messageToBePublishedFromApplication.next(payload)
  176. // }
  177. // })
  178. // connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
  179. // next: request => {
  180. // console.log(`Received ${(request.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
  181. // array.push(request)
  182. // },
  183. // error: error => console.error(error),
  184. // complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
  185. // })
  186. // connectionRequest2.client.messageToBeReceivedFromRemote.subscribe({
  187. // next: request => {
  188. // console.log(`Received ${(request.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
  189. // array.push(request)
  190. // },
  191. // error: error => console.error(error),
  192. // complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
  193. // })
  194. /* Complex Test: 1 to Many */
  195. // let connectionRequest: ConnectionRequest = {
  196. // server: {
  197. // name: 'g1',
  198. // serverUrl: hostServer,
  199. // connectionType: 'GRPC',
  200. // messageToBePublishedfromApplication: new Subject<Message>()
  201. // },
  202. // client: {
  203. // name: 'g2',
  204. // targetServer: targetserver,
  205. // connectionType: 'GRPC',
  206. // messageToBeReceivedFromRemote: new Subject<Message>()
  207. // }
  208. // }
  209. // let connectionRequest2: ConnectionRequest = {
  210. // server: {
  211. // name: 'g1',
  212. // serverUrl: hostServer,
  213. // connectionType: 'GRPC',
  214. // messageToBePublishedfromApplication: new Subject<Message>()
  215. // },
  216. // client: {
  217. // name: 'g3',
  218. // targetServer: targetserver2,
  219. // connectionType: 'GRPC',
  220. // messageToBeReceivedFromRemote: new Subject<Message>()
  221. // }
  222. // }
  223. // connectionService.generateConnection(connectionRequest)
  224. // connectionService.generateConnection(connectionRequest2)
  225. // setTimeout(() => {
  226. // let message = {
  227. // id: parsedMessages[10].appData.msgId,
  228. // message: parsedMessages[10] // Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
  229. // }
  230. // connectionRequest.server.messageToBePublishedfromApplication.next(message)
  231. // connectionRequest2.server.messageToBePublishedfromApplication.next(message)
  232. // }, 3000)
  233. // setTimeout(() => {
  234. // let message = {
  235. // id: parsedMessages[11].appData.msgId,
  236. // message: parsedMessages[11] // Choose this number, because i purposely use the 12th message and change the msgPayload property to query to emulate a request
  237. // }
  238. // connectionRequest.server.messageToBePublishedfromApplication.next(message)
  239. // connectionRequest2.server.messageToBePublishedfromApplication.next(message)
  240. // }, 4000)
  241. // connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
  242. // next: request => {
  243. // // Application logic comes here. This is where the asortment takes place, of deciding whose messages it belongs of what it is
  244. // if ((request.message as MessageLog).appData.msgPayload == 'Query') {
  245. // generateFakeStreamResponse(request).subscribe({
  246. // next: (responseMessage: Message) => {
  247. // connectionRequest.server.messageToBePublishedfromApplication.next(responseMessage)
  248. // },
  249. // error: error => console.error(error),
  250. // complete: () => {
  251. // console.log(`Stream request for ${request.id} is queued.`) // should be indefinite
  252. // }
  253. // })
  254. // } else {
  255. // console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
  256. // array.push(request)
  257. // console.log(`Received message: ${(request.message as MessageLog).appData.msgId} from ${request.id}`)
  258. // }
  259. // },
  260. // error: error => console.error(error),
  261. // complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
  262. // })
  263. // connectionRequest2.client.messageToBeReceivedFromRemote.subscribe({
  264. // next: request => {
  265. // // Application logic comes here. This is where the asortment takes place, of deciding whose messages it belongs of what it is
  266. // if ((request.message as MessageLog).appData.msgPayload == 'Query') {
  267. // generateFakeStreamResponse(request).subscribe({
  268. // next: (responseMessage: Message) => {
  269. // connectionRequest2.server.messageToBePublishedfromApplication.next(responseMessage)
  270. // },
  271. // error: error => console.error(error),
  272. // complete: () => {
  273. // console.log(`Stream request for ${request.id} is queued.`) // should be indefinite
  274. // }
  275. // })
  276. // } else {
  277. // console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
  278. // array.push(request)
  279. // console.log(`Received message: ${(request.message as MessageLog).appData.msgId} from ${request.id}`)
  280. // }
  281. // },
  282. // error: error => console.error(error),
  283. // complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
  284. // })
  285. // this is just to publish an array of fake data as a Subject
  286. function stream(): Subject<any> {
  287. let result: Subject<any> = new Subject()
  288. let messages: any[] = parsedMessages
  289. let count = 0
  290. const intervalId = setInterval(() => {
  291. result.next(messages[count]);
  292. count++;
  293. if (count >= 1000) {
  294. clearInterval(intervalId);
  295. result.complete();
  296. }
  297. }, intervalToStreamOutGoingMessage)
  298. return result
  299. }
  300. function generateFakeStreamResponse(request: any): Subject<any> {
  301. let res: Subject<any> = new Subject()
  302. stream().pipe(take(7)).subscribe({
  303. next: element => {
  304. let message = {
  305. id: request.id, // Caller's
  306. message: element
  307. }
  308. res.next(message)
  309. },
  310. error: error => console.error(error),
  311. complete: () => console.log(`Stream response for ${request.id} has been prepared.`)
  312. })
  313. return res
  314. }
  315. /* Checking the values by the end of the test */
  316. interval(5000).subscribe(() => {
  317. console.log(`All received data: ${array.length}`);
  318. });