123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106 |
- import * as fs from 'fs'
- import { Subject, groupBy, mergeMap, take, toArray } from 'rxjs';
- import { ColorCode, Message, MessageLog, ReportStatus } from '../interfaces/general.interface';
- import { GrpcService } from '../services/grpc.service';
- import { FisRetransmissionService } from '../services/fis.retransmission.service';
- import { GrpcServiceMethod } from '../services/service.method';
// Module-level wiring for this trial script: service instances, the fake
// payload, and the Subjects that connect the handlers below.

// Passes outgoing messages through its handleMessage() before they are released.
const errorHandlingService: FisRetransmissionService = new FisRetransmissionService()
const grpcService: GrpcService = new GrpcService(new GrpcServiceMethod())
// Read as UTF-8 text so JSON.parse receives a string rather than a raw Buffer.
const messagesJSON: string = fs.readFileSync('payload.json', 'utf8')
// Messages arriving from the other side, as exposed by the gRPC service.
const incomingMessages: Subject<any> = grpcService.getIncomingMessage()
// Load the fake messages generated for this trial.
// NOTE(review): the payload shape is not validated here; later code reads `.appData.msgId` on entries.
const parsedMessages: any[] = JSON.parse(messagesJSON)
// Sample messages to be transmitted over to the target server.
const messageToBeReleased: Subject<any> = new Subject()
// Responses produced by the application logic, before they are handed to the retransmission service.
const applicationOutgoingResponse: Subject<Message> = new Subject()
// Listening for error events and states.
const statusControl: Subject<ReportStatus> = new Subject()
const targetserver: string = 'localhost:3000'
const hostServer: string = 'localhost:3001'
// Collects received non-query messages. Used for testing.
const array: any[] = []
// Handler for the incoming messages from the other side.
// A message whose payload is exactly 'Query' is treated as a request and answered
// with a fake stream response; every other message is stored in `array` and logged.
incomingMessages.subscribe({
    next: request => {
        // Application logic goes here: this is where incoming messages are sorted
        // according to what they are and who they belong to.
        // Optional chaining keeps a malformed message (no `message` / `appData`)
        // from throwing inside the subscriber; it falls through to the else branch.
        const log = request?.message as MessageLog | undefined
        if (log?.appData?.msgPayload === 'Query') {
            generateFakeStreamResponse(request).subscribe({
                next: (responseMessage) => {
                    // Forward each generated response to the outgoing pipeline.
                    applicationOutgoingResponse.next(responseMessage)
                },
                error: error => console.error(error),
                complete: () => {
                    console.log(`Stream request for ${request.id} is queued.`)
                }
            })
        } else {
            array.push(request)
            console.log(`Received messages from the other side: ${log?.appData?.msgId}`)
        }
    },
    error: error => console.error(error),
    // The incoming stream is meant to stay open, so this is not expected to run.
    complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
})
// Open channel for sending messages across.
// Whatever the retransmission service emits for the outgoing responses is
// pushed onto messageToBeReleased, which is handed to the server instance below.
errorHandlingService.handleMessage(applicationOutgoingResponse, statusControl).subscribe({
    next: (messages) => {
        messageToBeReleased.next(messages)
    },
    // Log failures the same way the other subscriptions in this file do,
    // instead of leaving them unhandled.
    error: error => console.error(error)
})
// Server instance on the host address; it is given messageToBeReleased as its message source.
grpcService.createGrpcInstance(hostServer, statusControl, { instanceType: 'server', serviceMethod: 'server streaming' }, messageToBeReleased)
// Client instance pointed at the target server.
grpcService.createGrpcInstance(targetserver, statusControl, { instanceType: 'client', serviceMethod: 'server streaming' })
// Send two payload entries as outgoing messages: index 10 after 2 s and index 11 after 3 s.
// Per the original author's note, an entry around the 11th message had its msgPayload
// changed to a query to emulate a request (index 10 is the 11th entry, index 11 the 12th).
for (const [payloadIndex, delayMs] of [[10, 2000], [11, 3000]]) {
    setTimeout(() => {
        const entry = parsedMessages[payloadIndex]
        const message = {
            id: entry.appData.msgId,
            message: entry
        }
        applicationOutgoingResponse.next(message)
    }, delayMs)
}
// Report how many non-query messages have been collected, at 10 s and again at 20 s.
for (const reportDelayMs of [10000, 20000]) {
    setTimeout(() => {
        console.log(`All received data: ${array.length}`)
    }, reportDelayMs)
}
/**
 * Publishes the fake payload messages as a Subject, one message every 500 ms.
 *
 * Emission stops, and the Subject completes, once `limit` messages have been
 * sent or the payload is exhausted, whichever comes first. An empty payload
 * completes on the first tick without emitting anything.
 *
 * NOTE: the timer is only cleared when that bound is reached; it keeps
 * running even after every subscriber has unsubscribed.
 *
 * @param limit Maximum number of messages to publish (defaults to 1000).
 * @returns A Subject that emits the messages and then completes.
 */
function stream(limit: number = 1000): Subject<any> {
    const result: Subject<any> = new Subject()
    const messages: any[] = parsedMessages
    // Never index past the end of the payload, which would emit `undefined`.
    const total = Math.min(limit, messages.length)
    let count = 0
    const intervalId = setInterval(() => {
        if (count < total) {
            result.next(messages[count])
            count++
        }
        if (count >= total) {
            clearInterval(intervalId)
            result.complete()
        }
    }, 500)
    return result
}
/**
 * Builds a fake streamed response for a request: takes the first five
 * messages from `stream()` and re-emits each one tagged with the caller's id.
 *
 * The returned Subject completes once the five messages have been forwarded
 * and errors if the underlying stream errors, so subscribers' `complete` and
 * `error` handlers actually run.
 *
 * @param request The incoming request; only its `id` is read here.
 * @returns A Subject emitting `{ id, message }` objects.
 */
function generateFakeStreamResponse(request: any): Subject<any> {
    const res: Subject<any> = new Subject()
    stream().pipe(take(5)).subscribe({
        next: element => {
            const message = {
                id: request.id, // Caller's id, so the response can be matched to the request
                message: element
            }
            res.next(message)
        },
        // Hand the failure to the subscriber rather than swallowing it here.
        error: error => res.error(error),
        complete: () => {
            console.log(`Stream response for ${request.id} has been prepared.`)
            res.complete()
        }
    })
    return res
}
|