- import * as fs from 'fs'
- import { Subject, groupBy, mergeMap, toArray } from 'rxjs';
- import { ColorCode, ReportStatus } from '../interfaces/general.interface';
- import { GrpcService } from '../services/grpc.service';
- import { FisRetransmissionService } from '../services/fis.retransmission.service';
- // Subject for bidirectional communication
- const errorHandlingService: FisRetransmissionService = new FisRetransmissionService()
- const grpcService: GrpcService = new GrpcService()
- const messagesJSON: any = fs.readFileSync('payload.json')
- let incomingResponse: Subject<any> = grpcService.getIncomingResponse()
- let parsedMessages: any[] = JSON.parse(messagesJSON) // load the fake messages generated for this trial
- let messageToBeReleased: Subject<any> = new Subject() // Sample message to be transmitted over to target server
- let statusControl: Subject<ReportStatus> = new Subject() // Listening for error events and states
- let dataMessages = stream() // Emulate messges to be sent over to target server
- let server1: string = 'localhost:3000'
- let unaryRequestSubject: Subject<any> = new Subject()
- let array: any[] = []
- incomingResponse.subscribe({
- next: request => {
- console.log(`To be distributed to request:${request.id} => message: ${request.message.appData.msgId}`)
- array.push(request)
- // Have to create a function that creates observables/subjects corresponding to the request made by the client to stream the responses
- /* now is one request will have it's own listener. If the client is down all listeners instantiated from all the request are terminated.
- Server now doesn't care because when it proceses all the request, the response are merged into one channel. My not so clever solution is that
- when client starts, it wil have to first send a request specifically just to grab the data that was loss, and then let the internal
- application do the sorting */
- /* To really see how this work, i will create an array. Since i wil pump in 3 request, the server side is hardcoded atm to stream 10 messagse
- for one request. I am going to terminate the client halfway through, change the code, so that when i start the client again, it will only send
- 1 request over, there making another request for 10 more. But of course, in the real implementation, this can be a initializer just to see if
- there's any buffered messages to be sent over
- */
- }
- })
- /* Server Streaming Test case */
- errorHandlingService.handleMessage(unaryRequestSubject, statusControl).subscribe((messages) => {
- messageToBeReleased.next(messages)
- })
- grpcService.createGrpcInstance(server1, messageToBeReleased, statusControl, { instanceType: 'client', serviceMethod: 'server streaming' })
- messageToBeReleased.next(parsedMessages[0])
- setTimeout(() => {
- messageToBeReleased.next(parsedMessages[1])
- }, 2000)
- setTimeout(() => {
- messageToBeReleased.next(parsedMessages[2])
- }, 3000)
- setTimeout(() => {
- messageToBeReleased.next(parsedMessages[3])
- }, 5000)
- setTimeout(() => {
- console.log(`Total messages received: ${array.length}`)
- }, 11000)
- // this is just to publish an array of fake data as a Subject
- function stream(): Subject<any> {
- let result: Subject<any> = new Subject()
- let messages: any[] = parsedMessages
- let count = 0
- const intervalId = setInterval(() => {
- result.next(messages[count]);
- count++;
- if (count >= 1000) {
- clearInterval(intervalId);
- result.complete();
- }
- }, 500)
- return result
- }