1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677 |
- import { Subject, from, take, takeUntil } from "rxjs";
- import { GrpcService } from "../services/grpc.service";
- import * as fs from 'fs'
- import { FisRetransmissionService } from "../services/fis.retransmission.service";
- import { ReportStatus } from "../interfaces/general.interface";
// Raw contents of payload.json, read synchronously at module load.
const messagesJSON: any = fs.readFileSync('payload.json')

// Service instances used by the wiring below.
const errorHandlingService = new FisRetransmissionService()
const gprcService = new GrpcService()

// Requests handed to this module by the gRPC service.
const incomingRequest: Subject<any> = gprcService.getIncomingRequest()
// Responses produced by this module, before they pass through errorHandlingService.
const applicationOutgoingResponse = new Subject<any>()

// Load the fake messages generated for this trial.
const parsedMessages: any[] = JSON.parse(messagesJSON)
// Emulate messages to be sent over to the target server.
// NOTE(review): this starts a stream() timer at load time; no subscriber is visible in this file.
const dataMessages = stream()

// Messages forwarded to the gRPC instance, and the status channel shared with it.
const messageToBePublished = new Subject<any>()
const statusControl = new Subject<ReportStatus>()
// React to every request emitted by incomingRequest. Each `next` call handles exactly one request.
incomingRequest.subscribe({
  next: (request) => {
    // Requests with id '0000' are only logged (per the log text, a buffer check); no response stream is created.
    // NOTE(review): loose equality is kept deliberately to preserve existing behaviour — confirm ids are always strings before switching to ===.
    if (request.id == '0000') {
      console.log(`Just checking for buffer. Dont do anything else!`)
      return
    }

    // Build the fake response stream for this request and forward every message it emits.
    generateFakeStreamResponse(request).subscribe({
      next: (responseMessage) => applicationOutgoingResponse.next(responseMessage),
      error: (err) => console.error(err),
      // Expected never to fire: the response stream should be indefinite.
      complete: () => console.log(`Stream request for ${request.id} is queued.`),
    })
  },
  error: (err) => console.error(err),
  complete: () =>
    console.log(`Response for incoming generated. But this will never stop, and should not either.`),
})
/* For server streaming */
// Pass outgoing responses through errorHandlingService.handleMessage and republish
// whatever it emits on messageToBePublished. Only `next` is forwarded; completion
// and errors of the handled stream are not propagated.
errorHandlingService.handleMessage(applicationOutgoingResponse, statusControl).subscribe({
  next: (messages) => {
    messageToBePublished.next(messages)
  },
})

// Address handed to the gRPC service when creating the instance.
const server1 = 'localhost:3000'
const serverStreamingOptions = { instanceType: 'server', serviceMethod: 'server streaming' }
gprcService.createGrpcInstance(server1, messageToBePublished, statusControl, serverStreamingOptions)
/**
 * Publishes the pre-loaded fake messages (`parsedMessages`) one at a time on a fresh Subject.
 *
 * One message is emitted every 500 ms, starting 500 ms after the call. At most
 * 1000 messages are emitted; the subject completes on the same tick as the last
 * emission. Emission is bounded by the actual payload length, so a payload
 * shorter than 1000 entries no longer produces `undefined` messages, and an
 * empty (or non-array) payload completes on the first tick without emitting.
 *
 * The timer starts immediately, whether or not anything has subscribed, and the
 * returned Subject does not replay: late subscribers miss earlier messages.
 *
 * @returns a Subject emitting the fake messages in array order, then completing.
 */
function stream(): Subject<any> {
  const maxMessages = 1000
  const emitIntervalMs = 500

  const result: Subject<any> = new Subject()
  const messages: any[] = parsedMessages
  // Guard against a payload that is not an array (JSON.parse can return anything);
  // without this, a NaN limit would keep the interval alive forever.
  const limit = Array.isArray(messages) ? Math.min(messages.length, maxMessages) : 0
  let count = 0

  const intervalId = setInterval(() => {
    if (count < limit) {
      result.next(messages[count])
      count++
    }
    // Stop the timer and complete as soon as the last message has gone out.
    if (count >= limit) {
      clearInterval(intervalId)
      result.complete()
    }
  }, emitIntervalMs)

  return result
}
/**
 * Builds a fake streamed response for a single request.
 *
 * Takes the first 10 messages from a fresh `stream()` and re-emits each one on
 * the returned Subject, wrapped as `{ id: request.id, message }`.
 *
 * This function never completes or errors the returned Subject: when the
 * source finishes, only a log line is written.
 *
 * @param request the incoming request; only its `id` is read.
 * @returns a Subject emitting the wrapped messages.
 */
function generateFakeStreamResponse(request: any): Subject<any> {
  const responses = new Subject<any>()

  // Tag each payload with the caller's id so it can be correlated downstream.
  const toEnvelope = (payload: any) => ({
    id: request.id, // Caller's
    message: payload,
  })

  const firstTen = stream().pipe(take(10))
  firstTen.subscribe({
    next: (payload) => responses.next(toEnvelope(payload)),
    error: (err) => console.error(err),
    complete: () => console.log(`Stream response for ${request.id} has been prepared.`),
  })

  return responses
}
|