import { Subject, from, interval, take } from 'rxjs';
import { Message, MessageLog, ConnectionRequest } from '../interfaces/general.interface';
import { GrpcServiceMethod } from '../services/grpc.service.method';
import { readFileSync } from 'fs';
import { ServerClientManager } from '../services/server-client.service';
import mongoose from 'mongoose';

/*
 * Manual test harness: host server G0 opens GRPC connections to two remote
 * peers and publishes fake messages loaded from payload.json to both of them.
 * Alternative scenarios are kept below as commented-out code so they can be
 * switched on by hand.
 */

// Connect to MongoDB
// mongoose.connect('mongodb://localhost:27017/grpc1')
// const Message = mongoose.model('Message', require('../models/message.schema'))

// Subject for bidirectional communication
const connectionService: ServerClientManager = new ServerClientManager()

// Read as UTF-8 text so JSON.parse receives a string rather than a Buffer.
const messagesJSON: string = readFileSync('payload.json', 'utf8')
const parsedMessages: any[] = JSON.parse(messagesJSON) // load the fake messages generated for this trial
if (!Array.isArray(parsedMessages)) {
    // Everything below indexes/iterates the payload as an array; fail early with a clear message.
    throw new Error('payload.json must contain a JSON array of messages')
}

const targetserver: string = 'localhost:3001'
const targetserver2: string = 'localhost:3002'
const hostServer: string = 'localhost:3000'
const array: any[] = [] // Used for testing: only the (currently disabled) receivers push into it
const intervalToStreamOutGoingMessage: number = 15 // delay in milliseconds between messages emitted by stream()


/* Simple Test: 1 to 1 */
// let connectionRequest: ConnectionRequest = {
//     server: {
//         name: 'g1',
//         serverUrl: hostServer,
//         connectionType: 'GRPC',
//         messageToBePublishedFromApplication: new Subject()
//     },
//     client: {
//         name: 'g2',
//         targetServer: targetserver,
//         connectionType: 'GRPC',
//         messageToBeReceivedFromRemote: new Subject()
//     }
// }

// connectionService.generateConnection(connectionRequest)

// let generateFakeMessagesToBePublished = stream().pipe(take(1000))
// let generateFakeMessagesToBePublished = from(parsedMessages).pipe(take(5000))
// generateFakeMessagesToBePublished.subscribe({
//     next: message => {
//         let payload: Message = {
//             id: hostServer,
//             message: message
//         }
//         connectionRequest.server.messageToBePublishedFromApplication.next(payload)
//     },
//     error: error => console.error(error),
//     complete: () => console.log(`Completed`)
// })

// stream().subscribe({
//     next: message => {
//         let payload = {
//             id: hostServer,
//             message: message
//         }
//         connectionRequest.server.messageToBePublishedFromApplication.next(payload)
//     }
// })

// connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
//     next: response => {
//         console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
//         // Message.create(response)
//         array.push(response)
//     },
//     error: error => console.error(error),
//     complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
// })


/* Complex Test: 1 to 1 */
// NOTE(review): this disabled scenario spells the property `messageToBePublishedfromApplication`
// (lowercase "f"), unlike the active code below, and its inner log statement reads `response`
// where only `request`/`responseMessage` are in scope. Fix both before re-enabling.
// let connectionRequest: ConnectionRequest = {
//     server: {
//         name: 'g1',
//         serverUrl: hostServer,
//         connectionType: 'GRPC',
//         messageToBePublishedfromApplication: new Subject()
//     },
//     client: {
//         name: 'g2',
//         targetServer: targetserver,
//         connectionType: 'GRPC',
//         messageToBeReceivedFromRemote: new Subject()
//     }
// }

// connectionService.generateConnection(connectionRequest)

// setTimeout(() => {
//     let message = {
//         id: parsedMessages[10].appData.msgId,
//         message: parsedMessages[10] // Chosen on purpose: the 11th message has its msgPayload property changed to 'Query' to emulate a request
//     }
//     connectionRequest.server.messageToBePublishedfromApplication.next(message)
// }, 3000)
// setTimeout(() => {
//     let message = {
//         id: parsedMessages[11].appData.msgId,
//         message: parsedMessages[11] // Chosen on purpose: the 12th message has its msgPayload property changed to 'Query' to emulate a request
//     }
//     connectionRequest.server.messageToBePublishedfromApplication.next(message)
// }, 4000)

// Handler for the incoming Messages from the other side.
// connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
//     next: request => {
//         // Application logic comes here. This is where the sorting takes place: deciding whom a message belongs to and what it is
//         if ((request.message as MessageLog).appData.msgPayload == 'Query') {
//             generateFakeStreamResponse(request).subscribe({
//                 next: (responseMessage: Message) => {
//                     console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
//                     connectionRequest.server.messageToBePublishedfromApplication.next(responseMessage)
//                 },
//                 error: error => console.error(error),
//                 complete: () => {
//                     console.log(`Stream request for ${request.id} is queued.`) // should be indefinite
//                 }
//             })
//         } else {
//             array.push(request)
//             console.log(`Received message: ${(request.message as MessageLog).appData.msgId} from ${request.id}`)
//         }
//     },
//     error: error => console.error(error),
//     complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
// })


/* Simple Test: 1 to Many */
// Both requests describe the same host server (G0) paired with a different remote peer.
const connectionRequest1: ConnectionRequest = {
    server: {
        name: 'G0',
        serverUrl: hostServer,
        connectionType: 'GRPC',
        messageToBePublishedFromApplication: new Subject()
    },
    client: {
        name: 'G1',
        targetServer: targetserver,
        connectionType: 'GRPC',
        messageToBeReceivedFromRemote: new Subject()
    }
}

const connectionRequest2: ConnectionRequest = {
    server: {
        name: 'G0',
        serverUrl: hostServer,
        connectionType: 'GRPC',
        messageToBePublishedFromApplication: new Subject()
    },
    client: {
        name: 'G2',
        targetServer: targetserver2,
        connectionType: 'GRPC',
        messageToBeReceivedFromRemote: new Subject()
    }
}

// Log connection failures instead of leaving the promise rejection unhandled.
connectionService.generateConnection(connectionRequest1)
    .then((res) => {
        // console.log(res)
    })
    .catch((error: unknown) => console.error(error))
connectionService.generateConnection(connectionRequest2)
    .then((res) => {
        // console.log(res)
    })
    .catch((error: unknown) => console.error(error))

// Burst variant: walks the first 50 parsed messages synchronously. Publishing is
// currently disabled, so this subscription only builds each payload and logs `Completed`.
const generateFakeMessagesToBePublished = from(parsedMessages).pipe(take(50))
generateFakeMessagesToBePublished.subscribe({
    next: message => {
        const payload: Message = {
            id: hostServer,
            message: message
        }
        // connectionRequest1.server!.messageToBePublishedFromApplication.next(payload)
        // connectionRequest2.server!.messageToBePublishedFromApplication.next(payload)
    },
    error: error => console.error(error),
    complete: () => console.log(`Completed`)
})

// Timed variant: every message emitted by stream() is published to both connections.
stream().subscribe({
    next: message => {
        const payload = {
            id: hostServer,
            message: message
        }
        connectionRequest1.server!.messageToBePublishedFromApplication.next(payload)
        connectionRequest2.server!.messageToBePublishedFromApplication.next(payload)
    }
})

// Receivers for the two connections above (disabled).
// connectionRequest1.client.messageToBeReceivedFromRemote.subscribe({
//     next: request => {
//         console.log(`Received ${(request.message as MessageLog).appData.msgId} from ${connectionRequest1.client.targetServer}`)
//         array.push(request)
//     },
//     error: error => console.error(error),
//     complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
// })

// connectionRequest2.client.messageToBeReceivedFromRemote.subscribe({
//     next: request => {
//         console.log(`Received ${(request.message as MessageLog).appData.msgId} from ${connectionRequest2.client.targetServer}`)
//         array.push(request)
//     },
//     error: error => console.error(error),
//     complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
// })


/* Complex Test: 1 to Many */
// NOTE(review): this disabled scenario spells the property `messageToBePublishedfromApplication`
// (lowercase "f"), declares `connectionRequest` where the active code uses `connectionRequest1`,
// and its log statements read `response` where only `request` is in scope. Fix before re-enabling.
// let connectionRequest: ConnectionRequest = {
//     server: {
//         name: 'g1',
//         serverUrl: hostServer,
//         connectionType: 'GRPC',
//         messageToBePublishedfromApplication: new Subject()
//     },
//     client: {
//         name: 'g2',
//         targetServer: targetserver,
//         connectionType: 'GRPC',
//         messageToBeReceivedFromRemote: new Subject()
//     }
// }

// let connectionRequest2: ConnectionRequest = {
//     server: {
//         name: 'g1',
//         serverUrl: hostServer,
//         connectionType: 'GRPC',
//         messageToBePublishedfromApplication: new Subject()
//     },
//     client: {
//         name: 'g3',
//         targetServer: targetserver2,
//         connectionType: 'GRPC',
//         messageToBeReceivedFromRemote: new Subject()
//     }
// }

// connectionService.generateConnection(connectionRequest)
// connectionService.generateConnection(connectionRequest2)

// setTimeout(() => {
//     let message = {
//         id: parsedMessages[10].appData.msgId,
//         message: parsedMessages[10] // Chosen on purpose: the 11th message has its msgPayload property changed to 'Query' to emulate a request
//     }
//     connectionRequest.server.messageToBePublishedfromApplication.next(message)
//     connectionRequest2.server.messageToBePublishedfromApplication.next(message)
// }, 3000)
// setTimeout(() => {
//     let message = {
//         id: parsedMessages[11].appData.msgId,
//         message: parsedMessages[11] // Chosen on purpose: the 12th message has its msgPayload property changed to 'Query' to emulate a request
//     }
//     connectionRequest.server.messageToBePublishedfromApplication.next(message)
//     connectionRequest2.server.messageToBePublishedfromApplication.next(message)
// }, 4000)

// connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
//     next: request => {
//         // Application logic comes here. This is where the sorting takes place: deciding whom a message belongs to and what it is
//         if ((request.message as MessageLog).appData.msgPayload == 'Query') {
//             generateFakeStreamResponse(request).subscribe({
//                 next: (responseMessage: Message) => {
//                     connectionRequest.server.messageToBePublishedfromApplication.next(responseMessage)
//                 },
//                 error: error => console.error(error),
//                 complete: () => {
//                     console.log(`Stream request for ${request.id} is queued.`) // should be indefinite
//                 }
//             })
//         } else {
//             console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
//             array.push(request)
//             console.log(`Received message: ${(request.message as MessageLog).appData.msgId} from ${request.id}`)
//         }
//     },
//     error: error => console.error(error),
//     complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
// })

// connectionRequest2.client.messageToBeReceivedFromRemote.subscribe({
//     next: request => {
//         // Application logic comes here. This is where the sorting takes place: deciding whom a message belongs to and what it is
//         if ((request.message as MessageLog).appData.msgPayload == 'Query') {
//             generateFakeStreamResponse(request).subscribe({
//                 next: (responseMessage: Message) => {
//                     connectionRequest2.server.messageToBePublishedfromApplication.next(responseMessage)
//                 },
//                 error: error => console.error(error),
//                 complete: () => {
//                     console.log(`Stream request for ${request.id} is queued.`) // should be indefinite
//                 }
//             })
//         } else {
//             console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
//             array.push(request)
//             console.log(`Received message: ${(request.message as MessageLog).appData.msgId} from ${request.id}`)
//         }
//     },
//     error: error => console.error(error),
//     complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
// })


/**
 * Publishes the fake messages loaded from payload.json as a timed stream.
 *
 * One message is emitted every `intervalToStreamOutGoingMessage` milliseconds,
 * starting with the first parsed message. The stream completes after 300
 * messages, or earlier when the payload holds fewer entries, so that no
 * `undefined` value is ever emitted for an index past the end of the payload.
 * An empty payload completes on the first tick without emitting.
 *
 * The timer starts as soon as this function is called, so callers must
 * subscribe before the first tick to receive every message.
 *
 * @returns A subject that emits the parsed messages in order and then completes.
 */
function stream(): Subject<any> {
    const result = new Subject<any>()
    const messages: any[] = parsedMessages
    const limit = Math.min(300, messages.length)
    let count = 0
    const intervalId = setInterval(() => {
        if (count < limit) {
            result.next(messages[count])
            count++
        }
        if (count >= limit) {
            clearInterval(intervalId)
            result.complete()
        }
    }, intervalToStreamOutGoingMessage)
    return result
}

/**
 * Builds a fake streamed response for an incoming request: the first 7
 * messages from `stream()` are re-published, each tagged with the id of the
 * request so the response can be matched to its caller.
 *
 * NOTE(review): the returned subject is never completed or errored; completion
 * and errors of the inner stream are only logged. A comment at a disabled call
 * site says the response stream "should be indefinite", so this is left as is.
 * Confirm the intent before relying on a `complete` callback.
 *
 * @param request The incoming request; only its `id` is read.
 * @returns A subject that emits `{ id, message }` objects.
 */
function generateFakeStreamResponse(request: any): Subject<any> {
    const res = new Subject<any>()
    stream().pipe(take(7)).subscribe({
        next: element => {
            const message = {
                id: request.id, // Caller's
                message: element
            }
            res.next(message)
        },
        error: error => console.error(error),
        complete: () => console.log(`Stream response for ${request.id} has been prepared.`)
    })
    return res
}

/* Checking the values by the end of the test */
// Reports every 5 seconds how many messages the receivers have collected so far.
interval(5000).subscribe(() => {
    console.log(`All received data: ${array.length}`);
});