123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360 |
- import { Subject, from, interval, take } from 'rxjs';
- import { Message, MessageLog, ConnectionRequest } from '../interfaces/general.interface';
- import { GrpcServiceMethod } from '../services/grpc.service.method';
- import { readFileSync } from 'fs';
- import { ServerClientManager } from '../services/server-client.service';
- import mongoose from 'mongoose';
- // Connect to MongoDB
- // mongoose.connect('mongodb://localhost:27017/grpc1')
- // const Message = mongoose.model('Message', require('../models/message.schema'))
- // Subject for bidirectional communication
- const connectionService: ServerClientManager = new ServerClientManager()
- const messagesJSON: any = readFileSync('payload.json')
- let parsedMessages: any[] = JSON.parse(messagesJSON) // load the fake messages generated for this trial
- let targetserver: string = 'localhost:3001'
- let targetserver2: string = 'localhost:3002'
- let hostServer: string = 'localhost:3000'
- let array: any[] = [] // Used for testing
- let intervalToStreamOutGoingMessage: number = 15
- /* Simple Test: 1 to 1 */
- // let connectionRequest: ConnectionRequest = {
- // server: {
- // name: 'g1',
- // serverUrl: hostServer,
- // connectionType: 'GRPC',
- // messageToBePublishedFromApplication: new Subject<Message>()
- // },
- // client: {
- // name: 'g2',
- // targetServer: targetserver,
- // connectionType: 'GRPC',
- // messageToBeReceivedFromRemote: new Subject<Message>()
- // }
- // }
- // connectionService.generateConnection(connectionRequest)
- // let generateFakeMessagesToBePublished = stream().pipe(take(1000))
- // let generateFakeMessagesToBePublished = from(parsedMessages).pipe(take(5000))
- // generateFakeMessagesToBePublished.subscribe({
- // next: message => {
- // let payload: Message = {
- // id: hostServer,
- // message: message
- // }
- // connectionRequest.server.messageToBePublishedFromApplication.next(payload)
- // },
- // error: error => console.error(error),
- // complete: () => console.log(`Completed`)
- // })
- // stream().subscribe({
- // next: message => {
- // let payload = {
- // id: hostServer,
- // message: message
- // }
- // connectionRequest.server.messageToBePublishedFromApplication.next(payload)
- // }
- // })
- // connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
- // next: response => {
- // console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
- // // Message.create(response)
- // array.push(response)
- // },
- // error: error => console.error(error),
- // complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
- // })
- /* Complex Test: 1 to 1*/
- // let connectionRequest: ConnectionRequest = {
- // server: {
- // name: 'g1',
- // serverUrl: hostServer,
- // connectionType: 'GRPC',
- // messageToBePublishedfromApplication: new Subject<Message>()
- // },
- // client: {
- // name: 'g2',
- // targetServer: targetserver,
- // connectionType: 'GRPC',
- // messageToBeReceivedFromRemote: new Subject<Message>()
- // }
- // }
- // connectionService.generateConnection(connectionRequest)
- // setTimeout(() => {
- // let message = {
- // id: parsedMessages[10].appData.msgId,
- // message: parsedMessages[10] // Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
- // }
- // connectionRequest.server.messageToBePublishedfromApplication.next(message)
- // }, 3000)
- // setTimeout(() => {
- // let message = {
- // id: parsedMessages[11].appData.msgId,
- // message: parsedMessages[11] // Choose this number, because i purposely use the 12th message and change the msgPayload property to query to emulate a request
- // }
- // connectionRequest.server.messageToBePublishedfromApplication.next(message)
- // }, 4000)
- // Handler for the incoming Messages from the other side.
- // connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
- // next: request => {
- // // Application logic comes here. This is where the asortment takes place, of decidiing whose messages it belongs of what it is
- // if ((request.message as MessageLog).appData.msgPayload == 'Query') {
- // generateFakeStreamResponse(request).subscribe({
- // next: (responseMessage: Message) => {
- // console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
- // connectionRequest.server.messageToBePublishedfromApplication.next(responseMessage)
- // },
- // error: error => console.error(error),
- // complete: () => {
- // console.log(`Stream request for ${request.id} is queued.`) // shpuld be indefinite
- // }
- // })
- // } else {
- // array.push(request)
- // console.log(`Received message: ${(request.message as MessageLog).appData.msgId} from ${request.id}`)
- // }
- // },
- // error: error => console.error(error),
- // complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
- // })
- /* Simple Test: 1 to Many */
- let connectionRequest1: ConnectionRequest = {
- server: {
- name: 'G0',
- serverUrl: hostServer,
- connectionType: 'GRPC',
- messageToBePublishedFromApplication: new Subject<Message>()
- },
- client: {
- name: 'G1',
- targetServer: targetserver,
- connectionType: 'GRPC',
- messageToBeReceivedFromRemote: new Subject<Message>()
- }
- }
- let connectionRequest2: ConnectionRequest = {
- server: {
- name: 'G0',
- serverUrl: hostServer,
- connectionType: 'GRPC',
- messageToBePublishedFromApplication: new Subject<Message>()
- },
- client: {
- name: 'G2',
- targetServer: targetserver2,
- connectionType: 'GRPC',
- messageToBeReceivedFromRemote: new Subject<Message>()
- }
- }
- connectionService.generateConnection(connectionRequest1).then((res) => {
- // console.log(res)
- })
- connectionService.generateConnection(connectionRequest2).then((res) => {
- // console.log(res)
- })
- let generateFakeMessagesToBePublished = from(parsedMessages).pipe(take(50))
- generateFakeMessagesToBePublished.subscribe({
- next: message => {
- let payload: Message = {
- id: hostServer,
- message: message
- }
- // connectionRequest1.server!.messageToBePublishedFromApplication.next(payload)
- // connectionRequest2.server!.messageToBePublishedFromApplication.next(payload)
- },
- error: error => console.error(error),
- complete: () => console.log(`Completed`)
- })
- stream().subscribe({
- next: message => {
- let payload = {
- id: hostServer,
- message: message
- }
- connectionRequest1.server!.messageToBePublishedFromApplication.next(payload)
- connectionRequest2.server!.messageToBePublishedFromApplication.next(payload)
- }
- })
- // connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
- // next: request => {
- // console.log(`Received ${(request.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
- // array.push(request)
- // },
- // error: error => console.error(error),
- // complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
- // })
- // connectionRequest2.client.messageToBeReceivedFromRemote.subscribe({
- // next: request => {
- // console.log(`Received ${(request.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
- // array.push(request)
- // },
- // error: error => console.error(error),
- // complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
- // })
- /* Complex Test: 1 to Many */
- // let connectionRequest: ConnectionRequest = {
- // server: {
- // name: 'g1',
- // serverUrl: hostServer,
- // connectionType: 'GRPC',
- // messageToBePublishedfromApplication: new Subject<Message>()
- // },
- // client: {
- // name: 'g2',
- // targetServer: targetserver,
- // connectionType: 'GRPC',
- // messageToBeReceivedFromRemote: new Subject<Message>()
- // }
- // }
- // let connectionRequest2: ConnectionRequest = {
- // server: {
- // name: 'g1',
- // serverUrl: hostServer,
- // connectionType: 'GRPC',
- // messageToBePublishedfromApplication: new Subject<Message>()
- // },
- // client: {
- // name: 'g3',
- // targetServer: targetserver2,
- // connectionType: 'GRPC',
- // messageToBeReceivedFromRemote: new Subject<Message>()
- // }
- // }
- // connectionService.generateConnection(connectionRequest)
- // connectionService.generateConnection(connectionRequest2)
- // setTimeout(() => {
- // let message = {
- // id: parsedMessages[10].appData.msgId,
- // message: parsedMessages[10] // Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
- // }
- // connectionRequest.server.messageToBePublishedfromApplication.next(message)
- // connectionRequest2.server.messageToBePublishedfromApplication.next(message)
- // }, 3000)
- // setTimeout(() => {
- // let message = {
- // id: parsedMessages[11].appData.msgId,
- // message: parsedMessages[11] // Choose this number, because i purposely use the 12th message and change the msgPayload property to query to emulate a request
- // }
- // connectionRequest.server.messageToBePublishedfromApplication.next(message)
- // connectionRequest2.server.messageToBePublishedfromApplication.next(message)
- // }, 4000)
- // connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
- // next: request => {
- // // Application logic comes here. This is where the asortment takes place, of deciding whose messages it belongs of what it is
- // if ((request.message as MessageLog).appData.msgPayload == 'Query') {
- // generateFakeStreamResponse(request).subscribe({
- // next: (responseMessage: Message) => {
- // connectionRequest.server.messageToBePublishedfromApplication.next(responseMessage)
- // },
- // error: error => console.error(error),
- // complete: () => {
- // console.log(`Stream request for ${request.id} is queued.`) // should be indefinite
- // }
- // })
- // } else {
- // console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
- // array.push(request)
- // console.log(`Received message: ${(request.message as MessageLog).appData.msgId} from ${request.id}`)
- // }
- // },
- // error: error => console.error(error),
- // complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
- // })
- // connectionRequest2.client.messageToBeReceivedFromRemote.subscribe({
- // next: request => {
- // // Application logic comes here. This is where the asortment takes place, of deciding whose messages it belongs of what it is
- // if ((request.message as MessageLog).appData.msgPayload == 'Query') {
- // generateFakeStreamResponse(request).subscribe({
- // next: (responseMessage: Message) => {
- // connectionRequest2.server.messageToBePublishedfromApplication.next(responseMessage)
- // },
- // error: error => console.error(error),
- // complete: () => {
- // console.log(`Stream request for ${request.id} is queued.`) // should be indefinite
- // }
- // })
- // } else {
- // console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
- // array.push(request)
- // console.log(`Received message: ${(request.message as MessageLog).appData.msgId} from ${request.id}`)
- // }
- // },
- // error: error => console.error(error),
- // complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
- // })
- // this is just to publish an array of fake data as a Subject
- function stream(): Subject<any> {
- let result: Subject<any> = new Subject()
- let messages: any[] = parsedMessages
- let count = 0
- const intervalId = setInterval(() => {
- result.next(messages[count]);
- count++;
- if (count >= 300) {
- clearInterval(intervalId);
- result.complete();
- }
- }, intervalToStreamOutGoingMessage)
- return result
- }
- function generateFakeStreamResponse(request: any): Subject<any> {
- let res: Subject<any> = new Subject()
- stream().pipe(take(7)).subscribe({
- next: element => {
- let message = {
- id: request.id, // Caller's
- message: element
- }
- res.next(message)
- },
- error: error => console.error(error),
- complete: () => console.log(`Stream response for ${request.id} has been prepared.`)
- })
- return res
- }
- /* Checking the values by the end of the test */
- interval(5000).subscribe(() => {
- console.log(`All received data: ${array.length}`);
- });
|