// retransmission.service.ts
import { BehaviorSubject, buffer, concatMap, distinctUntilChanged, from, Observable, Subject, Subscription, takeWhile } from "rxjs";
import { BaseMessage } from "../dependencies/logging/dependencies/msgutil/dependencies/dependencies";
import { RetransmissionInterface } from "../interfaces/retransmission.interface";
import { WrappedMessage } from "../interfaces/general.interface";
import { sortMessageBasedOnDate } from "./utility/message-ordering";
  /**
   * Store-and-forward relay between a message source and a receiver whose
   * connection may go up and down.
   *
   * Incoming payloads are wrapped with a receive timestamp and held in a buffer.
   * The buffer is released as a batch whenever `bufferReleaseSignal` fires; each
   * message of the batch is then either emitted to the consumer (receiver
   * `ONLINE`) or pushed back into the buffer (any other connection state).
   *
   * NOTE(review): all subjects are instance-level, so concurrent subscriptions to
   * the observable returned by `retransmission` share one buffer and one
   * connection state. Use one service instance per stream.
   */
  export class RetransmissionService implements RetransmissionInterface {
    /** When true, each released batch goes through `sortMessageBasedOnDate` before transmission. */
    private sortMessage: boolean = false;
    /** Notifier for the `buffer` operator: every emission flushes the buffered messages as one batch. */
    private readonly bufferReleaseSignal: Subject<void> = new Subject();
    /** Last connection state reported by the caller's event listener. */
    private readonly receiverConnectionState: BehaviorSubject<'OFFLINE' | 'ONLINE'> = new BehaviorSubject('OFFLINE');
    /** Progress marker for batch processing; only 'TRANSMITTING' and 'ARRAY EMPTY' are set by this class. */
    private readonly transmissionState: BehaviorSubject<'TRANSMITTING' | 'IDLE' | 'ARRAY EMPTY' | 'STORING DATA' | 'GETTING STORED DATA'> = new BehaviorSubject('ARRAY EMPTY');
    /** Batches released from the buffer, waiting to be transmitted. */
    private readonly arrayToBeTransmitted: Subject<WrappedMessage[]> = new Subject();
    /** Raw payloads received from the caller, not yet wrapped. */
    private readonly toBeWrapped: Subject<any> = new Subject();
    /** Wrapped messages entering (or re-entering) the buffer. */
    private readonly wrappedMessageToBeBuffered: Subject<WrappedMessage> = new Subject();
    /** Messages cleared for delivery to the consumer. */
    private readonly messageToBeTransmitted: Subject<WrappedMessage> = new Subject();
    /** Handles of scheduled buffer releases, kept so they can be cancelled on teardown. */
    private readonly pendingReleaseTimers = new Set<ReturnType<typeof setTimeout>>();

    // Interface
    /**
     * Relays `payloadToBeTransmitted` to the returned observable, holding
     * messages back while the receiver is not online.
     *
     * @param payloadToBeTransmitted Source of messages to deliver.
     * @param eventListener Stream of receiver connection states; values are fed
     *   straight into the internal connection state (expected 'ONLINE' / 'OFFLINE').
     * @param messageOrdering When truthy, released batches are sorted with
     *   `sortMessageBasedOnDate` before being transmitted.
     * @returns An observable emitting wrapped messages once the receiver is
     *   online. Errors from either input stream are forwarded to the subscriber.
     *   Unsubscribing tears down every internal subscription and pending timer;
     *   messages still held in the buffer at that point are discarded.
     */
    public retransmission(payloadToBeTransmitted: Observable<any>, eventListener: Observable<any>, messageOrdering?: boolean): Observable<any> {
      return new Observable((observer) => {
        if (messageOrdering) {
          this.sortMessage = true;
          console.log(`Message ordering is set to ${this.sortMessage}`);
        }

        // Everything started for this subscriber is collected here so that it
        // can be released together when the subscriber goes away.
        const pipeline = new Subscription();

        // Attach the consumer first so nothing emitted during setup can be missed.
        pipeline.add(this.messageToBeTransmitted.subscribe(message => observer.next(message)));
        pipeline.add(eventListener.subscribe({
          next: event => this.receiverConnectionState.next(event),
          error: err => observer.error(err),
        }));
        pipeline.add(this.startWrappingOperation());
        pipeline.add(this.startBufferTransmisionProcess());
        pipeline.add(this.releaseSignalManager());
        pipeline.add(payloadToBeTransmitted.subscribe({
          next: message => this.toBeWrapped.next(message),
          error: err => observer.error(err),
        }));
        pipeline.add(() => this.cancelPendingReleases());

        return pipeline;
      });
    }

    /**
     * Wraps every raw payload and feeds it into the buffer, and forwards each
     * released buffer as a batch to `arrayToBeTransmitted`.
     *
     * @returns A subscription covering both internal subscriptions.
     */
    private startWrappingOperation(): Subscription {
      const subscription = new Subscription();
      // ID of the message seen just before the current one; null for the first message.
      let previousMessageId: string | null = null;

      subscription.add(this.toBeWrapped.subscribe(message => {
        this.wrappedMessageToBeBuffered.next(this.wrapMessageWithTimeReceived(message, previousMessageId));
        // A raw message carries its ID under `header`; an already wrapped one under `payload.header`.
        previousMessageId = message?.header?.messageID || message?.payload?.header?.messageID || null;
      }));

      // wrappedMessageToBeBuffered is accumulated until bufferReleaseSignal fires.
      subscription.add(this.wrappedMessageToBeBuffered.pipe(buffer(this.bufferReleaseSignal)).subscribe((bufferedMessages: WrappedMessage[]) => {
        console.log(bufferedMessages.length + ' buffered messages');
        console.log(`Released buffered message: ${bufferedMessages.length} total messages. To Be sorted.`);
        // Sort only when ordering was requested; otherwise forward the batch
        // untouched so that messages are never dropped.
        const batch = this.sortMessage && bufferedMessages.length > 0
          ? sortMessageBasedOnDate(bufferedMessages)
          : bufferedMessages;
        this.arrayToBeTransmitted.next(batch);
      }));

      return subscription;
    }

    /**
     * Wraps a raw payload with the time it was received and the ID of the
     * message that preceded it.
     *
     * @param message Raw payload, or a message that is already wrapped.
     * @param previousMessageID ID of the preceding message, or null if there is none.
     * @returns The message unchanged when it already has a truthy `timeReceived`;
     *   otherwise a new wrapper around it.
     */
    private wrapMessageWithTimeReceived(message: any, previousMessageID: string | null): WrappedMessage {
      // Already wrapped: keep the original receive time.
      if (message?.timeReceived) {
        return message as WrappedMessage;
      }
      const wrapped: WrappedMessage = {
        timeReceived: new Date(),
        payload: message as BaseMessage,
        previousMessageID: previousMessageID
      };
      return wrapped;
    }

    /**
     * Processes each released batch: transmits its messages while the receiver
     * is online, re-buffers them otherwise, and schedules the next release.
     *
     * @returns The subscription to `arrayToBeTransmitted`.
     */
    private startBufferTransmisionProcess(): Subscription {
      console.log(`StartBufferTransmissionProcess`);
      return this.arrayToBeTransmitted.subscribe(array => {
        if (array.length === 0) {
          // Nothing to send: poll again later. The delay keeps an empty buffer
          // from re-triggering the release signal in a tight loop.
          this.scheduleBufferRelease(3000);
          return;
        }

        this.transmissionState.next('TRANSMITTING');
        from(array).subscribe({
          next: (message: WrappedMessage) => {
            if (this.receiverConnectionState.getValue() === 'ONLINE') {
              this.messageToBeTransmitted.next(message);
            } else {
              // Receiver is not reachable: flush the message back into the buffer.
              this.wrappedMessageToBeBuffered.next(message);
            }
          },
          error: err => console.error(err),
          complete: () => {
            // update transmission state to indicate this batch is completed
            console.log(`Processing buffered array completed. Changing transmission state to ARRAY EMPTY`);
            this.transmissionState.next('ARRAY EMPTY');
            // While offline nothing is scheduled; the next release is triggered
            // by releaseSignalManager when the receiver comes back online.
            if (this.receiverConnectionState.getValue() === 'ONLINE') {
              this.scheduleBufferRelease(1000);
            }
          }
        });
      });
    }

    /**
     * Releases the buffer as soon as the receiver switches to ONLINE, provided
     * no batch is currently being processed.
     *
     * @returns The subscription to the connection-state changes.
     */
    private releaseSignalManager(): Subscription {
      return this.receiverConnectionState.pipe(
        distinctUntilChanged()
      ).subscribe(clientState => {
        console.log(`Client is now ${clientState}`);
        console.log(`Current transmission state: ${this.transmissionState.getValue()}`);
        // OFFLINE: just keep buffering.
        if (clientState === 'ONLINE' && this.transmissionState.getValue() === 'ARRAY EMPTY') {
          this.bufferReleaseSignal.next();
        }
      });
    }

    /** Fires `bufferReleaseSignal` after `delayMs` milliseconds, tracking the timer for cancellation. */
    private scheduleBufferRelease(delayMs: number): void {
      const timer = setTimeout(() => {
        this.pendingReleaseTimers.delete(timer);
        this.bufferReleaseSignal.next();
      }, delayMs);
      this.pendingReleaseTimers.add(timer);
    }

    /** Cancels every buffer release that is scheduled but has not fired yet. */
    private cancelPendingReleases(): void {
      for (const timer of this.pendingReleaseTimers) {
        clearTimeout(timer);
      }
      this.pendingReleaseTimers.clear();
    }
  }