1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889 |
import * as fs from 'fs'
import { Subject, groupBy, mergeMap, toArray } from 'rxjs'
import { ColorCode, GrpcMessage, MessageLog, ReportStatus } from '../interfaces/general.interface'
import { GrpcService } from '../services/grpc.service'
import { FisRetransmissionService } from '../services/fis.retransmission.service'
import { GrpcServiceMethod } from '../services/service.method'
- // Subject for bidirectional communication
- const errorHandlingService: FisRetransmissionService = new FisRetransmissionService()
- const grpcService: GrpcService = new GrpcService(new GrpcServiceMethod())
- const messagesJSON: any = fs.readFileSync('payload.json')
- let incomingResponse: Subject<any> = grpcService.getIncomingResponse()
- let parsedMessages: any[] = JSON.parse(messagesJSON) // load the fake messages generated for this trial
- let messageToBeReleased: Subject<any> = new Subject() // Sample message to be transmitted over to target server
- let statusControl: Subject<ReportStatus> = new Subject() // Listening for error events and states
- let dataMessages = stream() // Emulate messges to be sent over to target server
- let server1: string = 'localhost:3000'
- let unaryRequestSubject: Subject<GrpcMessage> = new Subject()
- let array: any[] = []
- incomingResponse.subscribe({
- next: request => {
- array.push(request)
- console.log(`To be distributed to request:${request.id} => message: ${(request.message as MessageLog).appData.msgId}`)
- /* Here's the plan. Since each and every single call that the client make, the server will open a stream specifically for that
- client. Now. A new strategy to cater to the revised solution. Everytime a client start up, it will first make a request call
- spefically to open a stream specifically for response. THe demulitplexing will be delegated outside. The grpc instance should
- not be bothered with hashydabery that's going on. Server side just need to receive the request, (must terminate the stream
- response if it eats up too much memory), and then respond with the stream established earlier when the client first send the
- first request call.
- New dilemma, there will be some changes to be followed as well. Since i want to integrate the termination of the stream and in
- this case, the stream will terminate right after it sends a acknoledgement saying that the request is being processed. But due
- to the reconnection logic integration, the client will treat server.end as a server failure, and will try to execute the
- reconnection logic. Should not be hard to change.
- */
- }
- })
- /* Server Streaming Test case */
- errorHandlingService.handleMessage(unaryRequestSubject, statusControl).subscribe((messages) => {
- messageToBeReleased.next(messages)
- })
- grpcService.createGrpcInstance(server1, messageToBeReleased, statusControl, { instanceType: 'client', serviceMethod: 'server streaming' })
- setTimeout(() => {
- let request = {
- id: parsedMessages[1].appData.msgId,
- message: parsedMessages[1]
- }
- unaryRequestSubject.next(request)
- }, 1000)
- setTimeout(() => {
- let request = {
- id: parsedMessages[2].appData.msgId,
- message: parsedMessages[2]
- }
- unaryRequestSubject.next(request)
- }, 3000)
- setTimeout(() => {
- let request = {
- id: parsedMessages[3].appData.msgId,
- message: parsedMessages[3]
- }
- unaryRequestSubject.next(request)
- }, 5000)
- setTimeout(() => {
- console.log(`Full amount received: ${array.length}`)
- }, 10000)
- setTimeout(() => {
- console.log(`Full amount received: ${array.length}`)
- }, 12000)
- // this is just to publish an array of fake data as a Subject
- function stream(): Subject<any> {
- let result: Subject<any> = new Subject()
- let messages: any[] = parsedMessages
- let count = 0
- const intervalId = setInterval(() => {
- result.next(messages[count]);
- count++;
- if (count >= 1000) {
- clearInterval(intervalId);
- result.complete();
- }
- }, 500)
- return result
- }
|