import { Subject, from, take, takeUntil } from "rxjs"; import { GrpcService } from "../services/grpc.service"; import * as fs from 'fs' import { FisRetransmissionService } from "../services/fis.retransmission.service"; import { ReportStatus } from "../interfaces/general.interface"; import { GrpcServiceMethod } from "../services/service.method"; const messagesJSON: any = fs.readFileSync('payload.json') const errorHandlingService: FisRetransmissionService = new FisRetransmissionService() const gprcService: GrpcService = new GrpcService(new GrpcServiceMethod()) let incomingRequest: Subject = gprcService.getIncomingRequest() let applicationOutgoingResponse: Subject = new Subject() let parsedMessages: any[] = JSON.parse(messagesJSON) // load the fake messages generated for this trial let dataMessages = stream() // Emulate messges to be sent over to target server let messageToBePublished: Subject = new Subject() let statusControl: Subject = new Subject() incomingRequest.subscribe({ next: request => { // this whole thing is for 1 request if (request.id == '0000') { console.log(`Just checking for buffer. Dont do anything else!`) } else { generateFakeStreamResponse(request).subscribe({ next: (responseMessage) => { // console.log(`Processing request:${request.id}....`) applicationOutgoingResponse.next(responseMessage) }, error: error => console.error(error), complete: () => { console.log(`Stream request for ${request.id} is queued.`) // shpuld be indefinite } }) } }, error: error => console.error(error), complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`) }) /* For server streaming */ errorHandlingService.handleMessage(applicationOutgoingResponse, statusControl).subscribe((messages) => { messageToBePublished.next(messages) }) let server1 = 'localhost:3000' gprcService.createGrpcInstance(server1, messageToBePublished, statusControl, { instanceType: 'server', serviceMethod: 'server streaming' }) // this is just to publish an array of fake data as a Subject function stream(): Subject { let result: Subject = new Subject() let messages: any[] = parsedMessages let count = 0 const intervalId = setInterval(() => { result.next(messages[count]); count++; if (count >= 1000) { clearInterval(intervalId); result.complete(); } }, 500) return result } function generateFakeStreamResponse(request: any): Subject { let res: Subject = new Subject() stream().pipe(take(10)).subscribe({ next: element => { let message = { id: request.id, // Caller's message: element } res.next(message) }, error: error => console.error(error), complete: () => console.log(`Stream response for ${request.id} has been prepared.`) }) return res }