import * as fs from 'fs'
import { Subject, groupBy, mergeMap, toArray } from 'rxjs';
import { ColorCode, GrpcMessage, MessageLog, ReportStatus } from '../interfaces/general.interface';
import { GrpcService } from '../services/grpc.service';
import { FisRetransmissionService } from '../services/fis.retransmission.service';
import { GrpcServiceMethod } from '../services/service.method';

/*
 * Manual test harness for the gRPC client in "server streaming" mode.
 *
 * It loads fake messages from payload.json, pushes three of them through the
 * retransmission service into a gRPC client instance, collects every response
 * delivered on the incoming-response subject, and prints how many responses
 * were received after 10 s and 12 s.
 *
 * NOTE(review): the Subject type arguments are `any` because the exact payload
 * types expected by GrpcService / FisRetransmissionService are not visible
 * from this file. GrpcMessage and ReportStatus are imported and look like the
 * intended element types; tighten once confirmed.
 */

// Subject for bidirectional communication
const errorHandlingService: FisRetransmissionService = new FisRetransmissionService()
const grpcService: GrpcService = new GrpcService(new GrpcServiceMethod())

// Read as text so JSON.parse receives a string rather than a Buffer.
const messagesJSON: string = fs.readFileSync('payload.json', 'utf8')
const incomingResponse: Subject<any> = grpcService.getIncomingResponse()

// Load the fake messages generated for this trial.
const parsedPayload: unknown = JSON.parse(messagesJSON)
if (!Array.isArray(parsedPayload)) {
    // Everything below indexes into the payload, so fail early with a clear reason.
    throw new Error('payload.json must contain a JSON array of messages')
}
const parsedMessages: any[] = parsedPayload

const messageToBeReleased: Subject<any> = new Subject<any>() // Sample message to be transmitted over to target server
const statusControl: Subject<any> = new Subject<any>() // Listening for error events and states
// Emulate messages to be sent over to target server.
// NOTE(review): nothing visible in this file subscribes to dataMessages, but the
// call is kept because stream() starts a timer as a side effect.
const dataMessages = stream()
const server1: string = 'localhost:3000'
const unaryRequestSubject: Subject<any> = new Subject<any>()
const array: any[] = [] // every response received so far

incomingResponse.subscribe({
    next: request => {
        array.push(request)
        console.log(`To be distributed to request:${request.id} => message: ${(request.message as MessageLog).appData.msgId}`)
        /*
         * Here's the plan. For each and every call that the client makes, the
         * server opens a stream specifically for that client.
         *
         * A new strategy to cater to the revised solution: every time a client
         * starts up, it first makes a request call specifically to open a
         * stream dedicated to responses. The demultiplexing is delegated to
         * code outside the gRPC instance; the gRPC instance should not be
         * bothered with any of that bookkeeping.
         *
         * The server side just needs to receive the request (it must terminate
         * the response stream if it eats up too much memory) and then respond
         * on the stream that was established when the client sent its first
         * request call.
         *
         * New dilemma: some further changes will have to follow. I want to
         * integrate termination of the stream, and in this case the stream
         * terminates right after it sends an acknowledgement saying that the
         * request is being processed. But because of the reconnection logic,
         * the client treats server.end as a server failure and tries to run
         * the reconnection logic. Should not be hard to change.
         */
    }
})

/* Server Streaming Test case */
// Whatever the retransmission service emits is forwarded to the gRPC client.
errorHandlingService.handleMessage(unaryRequestSubject, statusControl).subscribe((messages) => {
    messageToBeReleased.next(messages)
})
grpcService.createGrpcInstance(server1, messageToBeReleased, statusControl, { instanceType: 'client', serviceMethod: 'server streaming' })

// Send three sample requests, two seconds apart.
scheduleRequest(1, 1000)
scheduleRequest(2, 3000)
scheduleRequest(3, 5000)

// Report the running total twice so late responses are visible.
setTimeout(() => {
    console.log(`Full amount received: ${array.length}`)
}, 10000)
setTimeout(() => {
    console.log(`Full amount received: ${array.length}`)
}, 12000)

/**
 * Schedules one request built from `parsedMessages[index]` to be pushed into
 * `unaryRequestSubject` after `delayMs` milliseconds.
 *
 * If the payload has no entry (or no `appData`) at that index, the request is
 * skipped with an error log instead of throwing inside the timer callback.
 *
 * @param index position of the message inside the loaded payload
 * @param delayMs delay before the request is emitted, in milliseconds
 */
function scheduleRequest(index: number, delayMs: number): void {
    setTimeout(() => {
        const message = parsedMessages[index]
        if (message == null || message.appData == null) {
            console.error(`Skipping request: payload.json has no usable message at index ${index}`)
            return
        }
        unaryRequestSubject.next({
            id: message.appData.msgId,
            message
        })
    }, delayMs)
}

/**
 * Publishes the loaded fake messages as a Subject, one message every 500 ms.
 *
 * At most 1000 messages are emitted, and never more than the payload actually
 * contains; the subject completes right after the last emission. An empty
 * payload completes on the first tick without emitting anything.
 *
 * @returns a Subject that emits the fake messages in payload order
 */
function stream(): Subject<any> {
    const maxEmissions = 1000
    const intervalMs = 500

    const result: Subject<any> = new Subject<any>()
    const messages: any[] = parsedMessages
    // Never read past the end of the payload (that would emit `undefined`).
    const total = Math.min(messages.length, maxEmissions)
    let count = 0

    const intervalId = setInterval(() => {
        if (count < total) {
            result.next(messages[count])
            count++
        }
        if (count >= total) {
            clearInterval(intervalId)
            result.complete()
        }
    }, intervalMs)

    return result
}