import * as fs from 'fs' import { Subject, groupBy, mergeMap, take, toArray } from 'rxjs'; import { ColorCode, Message, MessageLog, ReportStatus } from '../interfaces/general.interface'; import { GrpcService } from '../services/grpc.service'; import { FisRetransmissionService } from '../services/fis.retransmission.service'; import { GrpcServiceMethod } from '../services/service.method'; // Subject for bidirectional communication const errorHandlingService: FisRetransmissionService = new FisRetransmissionService() const grpcService: GrpcService = new GrpcService(new GrpcServiceMethod()) const messagesJSON: any = fs.readFileSync('payload.json') let incomingMessages: Subject = grpcService.getIncomingMessage() let parsedMessages: any[] = JSON.parse(messagesJSON) // load the fake messages generated for this trial let messageToBeReleased: Subject = new Subject() // Sample message to be transmitted over to target server let applicationOutgoingResponse: Subject = new Subject() let statusControl: Subject = new Subject() // Listening for error events and states let targetserver: string = 'localhost:3001' let hostServer: string = 'localhost:3000' let array: any[] = [] // Used for testing // Handler for the incoming Messages from the other side. incomingMessages.subscribe({ next: request => { // Application logic comes here. This is where the asortment takes place, of decidiing whose messages it belongs of what it is if ((request.message as MessageLog).appData.msgPayload == 'Query') { generateFakeStreamResponse(request).subscribe({ next: (responseMessage) => { // console.log(`Processing request:${request.id}....`) applicationOutgoingResponse.next(responseMessage) }, error: error => console.error(error), complete: () => { console.log(`Stream request for ${request.id} is queued.`) // shpuld be indefinite } }) } else { array.push(request) console.log(`Received messages from the other side: ${(request.message as MessageLog).appData.msgId}`) } }, error: error => console.error(error), complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`) }) // Open channel for sending messages across. errorHandlingService.handleMessage(applicationOutgoingResponse, statusControl).subscribe((messages) => { messageToBeReleased.next(messages) }) grpcService.createGrpcInstance(hostServer, statusControl, { instanceType: 'server', serviceMethod: 'server streaming' }, messageToBeReleased) grpcService.createGrpcInstance(targetserver, statusControl, { instanceType: 'client', serviceMethod: 'server streaming' }) setTimeout(() => { let message = { id: parsedMessages[10].appData.msgId, message: parsedMessages[10] // Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request } applicationOutgoingResponse.next(message) }, 2000) setTimeout(() => { let message = { id: parsedMessages[11].appData.msgId, message: parsedMessages[11]// Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request } applicationOutgoingResponse.next(message) }, 3000) setTimeout(() => { console.log(`All received data: ${array.length}`) }, 10000) setTimeout(() => { console.log(`All received data: ${array.length}`) }, 20000) // this is just to publish an array of fake data as a Subject function stream(): Subject { let result: Subject = new Subject() let messages: any[] = parsedMessages let count = 0 const intervalId = setInterval(() => { result.next(messages[count]); count++; if (count >= 1000) { clearInterval(intervalId); result.complete(); } }, 500) return result } function generateFakeStreamResponse(request: any): Subject { let res: Subject = new Subject() stream().pipe(take(7)).subscribe({ next: element => { let message = { id: request.id, // Caller's message: element } res.next(message) }, error: error => console.error(error), complete: () => console.log(`Stream response for ${request.id} has been prepared.`) }) return res } /* Extra NOTEs: So let's say when this host makes a request and send it over to the other side to process. And say this request is actually a query, so it will take some times for the other side to process the data. But what happens when the other side down, that means i won't get my query. Is this the responsibility of the application logic of the other side? To keep track of the message request sent over? Personal opinion it should be the responsibility of the application logic to keep track of the request they are processing, especially when the server goes down whilst they are streaning back the response. Because for this retransmission service, it shouldn't care anymore. It just make sure to get the message to pass over and buffer the message when established client instance could not talk to the server on the other side.*/