/**
 * Trial driver script for the gRPC retransmission set-up.
 *
 * What this file does, as far as the code here shows:
 *  - loads fake messages from `payload.json`,
 *  - subscribes to the incoming-message stream exposed by `GrpcService`,
 *  - answers every incoming 'Query' message with a short stream of fake messages,
 *  - passes outgoing responses through `FisRetransmissionService.handleMessage`
 *    and forwards whatever it emits to `messageToBeReleased`,
 *  - creates one 'server' and one 'client' gRPC instance via `GrpcService`.
 *
 * NOTE(review): the subjects are typed `Subject<any>` because the service
 * signatures are not visible from this file — tighten once confirmed.
 */
import * as fs from 'fs';
import { Subject, groupBy, mergeMap, take, toArray } from 'rxjs';
import { ColorCode, Message, MessageLog, ReportStatus } from '../interfaces/general.interface';
import { GrpcService } from '../services/grpc.service';
import { FisRetransmissionService } from '../services/fis.retransmission.service';
import { GrpcServiceMethod } from '../services/service.method';

/** Upper bound on how many fake messages a single `stream()` call may publish. */
const MAX_STREAMED_MESSAGES = 1000;
/** Delay between two published fake messages, in milliseconds. */
const STREAM_INTERVAL_MS = 500;

// Subject for bidirectional communication
const errorHandlingService: FisRetransmissionService = new FisRetransmissionService();
const grpcService: GrpcService = new GrpcService(new GrpcServiceMethod());
// Read as text so JSON.parse receives a string rather than a Buffer.
const messagesJSON: string = fs.readFileSync('payload.json', 'utf-8');
const incomingMessages: Subject<any> = grpcService.getIncomingMessage();
const parsedMessages: any[] = JSON.parse(messagesJSON); // load the fake messages generated for this trial
const messageToBeReleased: Subject<any> = new Subject<any>(); // Sample message to be transmitted over to target server
const applicationOutgoingResponse: Subject<any> = new Subject<any>();
const statusControl: Subject<any> = new Subject<any>(); // Listening for error events and states
const targetserver: string = 'localhost:3000';
const hostServer: string = 'localhost:3001';
const array: any[] = []; // every non-'Query' message received so far

incomingMessages.subscribe({
  next: request => {
    // Application logic comes here. This is where the sorting takes place:
    // deciding whom a message belongs to and what kind of message it is.
    if ((request.message as MessageLog).appData.msgPayload === 'Query') {
      generateFakeStreamResponse(request).subscribe({
        next: (responseMessage) => {
          // console.log(`Processing request:${request.id}....`)
          applicationOutgoingResponse.next(responseMessage);
        },
        error: error => console.error(error),
        complete: () => {
          // Should be indefinite: the subject returned by generateFakeStreamResponse
          // is never completed, so this callback does not fire today.
          console.log(`Stream request for ${request.id} is queued.`);
        }
      });
    } else {
      array.push(request);
      console.log(`Received messages from the other side: ${(request.message as MessageLog).appData.msgId}`);
    }
  },
  error: error => console.error(error),
  complete: () => console.log(`Response for incoming generated. \nBut this will never stop, and should not either.`)
});

// Open channel for sending messages across.
errorHandlingService.handleMessage(applicationOutgoingResponse, statusControl).subscribe((messages) => {
  messageToBeReleased.next(messages);
});
grpcService.createGrpcInstance(hostServer, statusControl, { instanceType: 'server', serviceMethod: 'server streaming' }, messageToBeReleased);
grpcService.createGrpcInstance(targetserver, statusControl, { instanceType: 'client', serviceMethod: 'server streaming' });

// Report how many non-'Query' messages have been collected after 10 s and 20 s.
setTimeout(() => {
  console.log(`All received data: ${array.length}`);
}, 10000);
setTimeout(() => {
  console.log(`All received data: ${array.length}`);
}, 20000);

/**
 * Publishes the fake messages loaded from `payload.json` as a Subject, one
 * message every `STREAM_INTERVAL_MS` milliseconds.
 *
 * At most `MAX_STREAMED_MESSAGES` messages are published, and never more than
 * the payload actually contains, so no `undefined` entries are emitted for a
 * short or empty payload. The timer is also stopped as soon as the subject has
 * no subscribers left at a tick (e.g. after the consumer's `take(5)` has
 * finished), so it does not keep running in the background.
 *
 * @returns a Subject that emits the fake messages and then completes.
 */
function stream(): Subject<any> {
  const result: Subject<any> = new Subject<any>();
  // Guard against a payload file whose top-level value is not an array.
  const messages: any[] = Array.isArray(parsedMessages) ? parsedMessages : [];
  const limit = Math.min(messages.length, MAX_STREAMED_MESSAGES);
  let count = 0;
  const intervalId = setInterval(() => {
    // Nobody is listening any more: stop ticking instead of leaking the timer.
    if (!result.observed) {
      clearInterval(intervalId);
      result.complete();
      return;
    }
    if (count < limit) {
      result.next(messages[count]);
      count++;
    }
    if (count >= limit) {
      clearInterval(intervalId);
      result.complete();
    }
  }, STREAM_INTERVAL_MS);
  return result;
}

/**
 * Builds a fake response stream for one incoming request: the first five
 * messages published by `stream()`, each wrapped together with the request id.
 *
 * The returned subject is never completed or errored here; completion of the
 * inner stream is only logged.
 *
 * @param request the incoming request; only `request.id` is read.
 * @returns a Subject emitting `{ id, message }` objects.
 */
function generateFakeStreamResponse(request: any): Subject<any> {
  const res: Subject<any> = new Subject<any>();
  stream().pipe(take(5)).subscribe({
    next: element => {
      const message = {
        id: request.id, // Caller's
        message: element
      };
      res.next(message);
    },
    error: error => console.error(error),
    complete: () => console.log(`Stream response for ${request.id} has been prepared.`)
  });
  return res;
}