import * as fs from 'fs'
import { Subject, groupBy, mergeMap, toArray } from 'rxjs';
import { ColorCode, ReportStatus } from '../interfaces/general.interface';
import { GrpcService } from '../services/grpc.service';
import { FisRetransmissionService } from '../services/fis.retransmission.service';

/**
 * Manual test harness for the gRPC "server streaming" client path.
 *
 * Loads fake messages from `payload.json`, pushes a handful of them into the
 * subject handed to `GrpcService.createGrpcInstance`, and counts every
 * response that arrives on `grpcService.getIncomingResponse()`.
 *
 * Message payloads are project-defined and not visible from this file, so
 * the subjects are deliberately typed `Subject<any>`.
 */

// Upper bound on how many fake messages stream() will publish.
const MAX_STREAMED_MESSAGES = 1000

// Subject for bidirectional communication
const errorHandlingService: FisRetransmissionService = new FisRetransmissionService()
const grpcService: GrpcService = new GrpcService()

// Read as UTF-8 text so JSON.parse receives a string rather than a Buffer.
const messagesJSON: string = fs.readFileSync('payload.json', 'utf8')
const incomingResponse: Subject<any> = grpcService.getIncomingResponse()
const parsedMessages: any[] = JSON.parse(messagesJSON) // load the fake messages generated for this trial
const messageToBeReleased: Subject<any> = new Subject() // Sample message to be transmitted over to target server
const statusControl: Subject<any> = new Subject() // Listening for error events and states
// Emulate messages to be sent over to target server.
// NOTE: nothing in this file subscribes to dataMessages; the call is kept
// because stream() starts its interval timer as soon as it is invoked.
const dataMessages = stream()
const server1: string = 'localhost:3000'
// NOTE: no value is ever pushed into unaryRequestSubject in this file.
const unaryRequestSubject: Subject<any> = new Subject()
// Every response received from the server, in arrival order.
const array: any[] = []

incomingResponse.subscribe({
  next: request => {
    console.log(`To be distributed to request:${request.id} => message: ${request.message.appData.msgId}`)
    array.push(request)
    // TODO: create a function that builds an observable/subject per client
    // request, so that responses can be streamed back to the right request.
    /* Author's notes:
       Right now each request has its own listener. If the client goes down,
       all listeners instantiated for all of its requests are terminated. The
       server does not care, because once it has processed the requests the
       responses are merged into a single channel. The (admittedly not so
       clever) solution: when the client starts, it first sends a request
       whose only purpose is to fetch the data that was lost, and then lets
       the internal application do the sorting. */
    /* To see how this works, responses are collected in an array. Several
       requests are pumped in, and the server side is currently hard-coded to
       stream 10 messages per request. The plan is to terminate the client
       halfway through and change the code so that, on restart, it sends only
       one request, thereby asking for 10 more messages. In the real
       implementation this could be an initializer that just checks whether
       there are any buffered messages waiting to be sent over. */
  }
})

/* Server Streaming Test case */
// Forward whatever the retransmission service emits into the outgoing subject.
errorHandlingService.handleMessage(unaryRequestSubject, statusControl).subscribe((messages) => {
  messageToBeReleased.next(messages)
})
grpcService.createGrpcInstance(server1, messageToBeReleased, statusControl, { instanceType: 'client', serviceMethod: 'server streaming' })

// Release four requests: one immediately, then at 2 s, 3 s and 5 s.
messageToBeReleased.next(parsedMessages[0])
setTimeout(() => {
  messageToBeReleased.next(parsedMessages[1])
}, 2000)
setTimeout(() => {
  messageToBeReleased.next(parsedMessages[2])
}, 3000)
setTimeout(() => {
  messageToBeReleased.next(parsedMessages[3])
}, 5000)

// Report how many responses have been collected 11 s after start-up.
setTimeout(() => {
  console.log(`Total messages received: ${array.length}`)
}, 11000)

/**
 * Publishes the fake messages loaded from `payload.json` as a Subject,
 * emitting one message every 500 ms.
 *
 * At most `MAX_STREAMED_MESSAGES` messages are emitted, and never more than
 * the payload actually contains, so `undefined` is not published when the
 * payload is shorter than the cap. The subject completes after the last
 * message, or on the first tick when there is nothing to emit.
 *
 * @returns a Subject that emits the messages in order and then completes.
 */
function stream(): Subject<any> {
  const result: Subject<any> = new Subject()
  // Guard against a payload whose top-level JSON value is not an array.
  const messages: any[] = Array.isArray(parsedMessages) ? parsedMessages : []
  const limit = Math.min(messages.length, MAX_STREAMED_MESSAGES)
  let count = 0
  const intervalId = setInterval(() => {
    if (count < limit) {
      result.next(messages[count]);
      count++;
    }
    if (count >= limit) {
      clearInterval(intervalId);
      result.complete();
    }
  }, 500)
  return result
}