retransmission.service.ts 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. import { BehaviorSubject, buffer, concatMap, distinctUntilChanged, from, Observable, Subject, takeWhile } from "rxjs";
  2. import { BaseMessage } from "../dependencies/logging/dependencies/msgutil/dependencies/dependencies";
  3. import { RetransmissionInterface } from "../interfaces/retransmission.interface";
  4. import { WrappedMessage } from "../interfaces/general.interface";
  5. import { sortMessageBasedOnDate } from "./utility/message-ordering";
  6. import { v4 as uuidV4 } from 'uuid';
  /**
   * Store-and-forward stage between a message producer and a receiver whose
   * connection may come and go.
   *
   * Incoming payloads are wrapped with a timestamp and a chained message id,
   * collected in a buffer, and released in batches. A released batch is emitted
   * to the consumer while the receiver is reported ONLINE; otherwise its
   * messages are pushed back into the buffer until the receiver comes back.
   *
   * NOTE: all pipeline state (subjects, connection state, ordering flag) lives
   * on the instance, so one instance is meant to drive one active
   * `retransmission()` subscription at a time.
   */
  export class RetransmissionService implements RetransmissionInterface {
    /** Delay before polling the buffer again after a batch has been processed while ONLINE. */
    private static readonly RELEASE_DELAY_AFTER_BATCH_MS = 1000
    /** Delay before polling the buffer again after it was released empty. */
    private static readonly RELEASE_DELAY_WHEN_EMPTY_MS = 3000

    /** Id of the most recently wrapped message; becomes `previousMessageID` of the next one. */
    private currentMessageId: string | null = null
    private sortMessage: boolean = false
    /** Handle of the single pending delayed buffer release, if any. */
    private pendingReleaseTimer: ReturnType<typeof setTimeout> | null = null
    private bufferReleaseSignal: Subject<void> = new Subject<void>()
    private receiverConnectionState: BehaviorSubject<'OFFLINE' | 'ONLINE'> = new BehaviorSubject<'OFFLINE' | 'ONLINE'>('OFFLINE')
    private transmissionState: BehaviorSubject<'TRANSMITTING' | 'IDLE' | 'ARRAY EMPTY' | 'STORING DATA' | 'GETTING STORED DATA'> = new BehaviorSubject<'TRANSMITTING' | 'IDLE' | 'ARRAY EMPTY' | 'STORING DATA' | 'GETTING STORED DATA'>('ARRAY EMPTY')
    private arrayToBeTransmitted: Subject<WrappedMessage[]> = new Subject<WrappedMessage[]>()
    private toBeWrapped: Subject<any> = new Subject<any>()
    private wrappedMessageToBeBuffered: Subject<WrappedMessage> = new Subject<WrappedMessage>()
    private messageToBeTransmitted: Subject<WrappedMessage> = new Subject<WrappedMessage>()

    // Interface
    /**
     * Builds the buffered/retransmitting stream for a payload source.
     *
     * Nothing is wired up until the returned observable is subscribed to.
     * Unsubscribing tears down every inner subscription and cancels the pending
     * release timer; messages still sitting in the buffer at that moment are
     * not delivered.
     *
     * @param payloadToBeTransmitted Source of raw payloads (or already wrapped messages).
     * @param eventListener Emits the receiver connection state; values are forwarded
     *   as-is into the internal state, which is typed as 'OFFLINE' | 'ONLINE'.
     * @param messageOrdering When truthy, released batches are passed through
     *   `sortMessageBasedOnDate` before transmission.
     * @returns Stream of wrapped messages released while the receiver is ONLINE.
     *   Errors raised by either input observable are forwarded to the subscriber.
     */
    public retransmission(payloadToBeTransmitted: Observable<any>, eventListener: Observable<any>, messageOrdering?: boolean): Observable<WrappedMessage> {
      return new Observable<WrappedMessage>((observer) => {
        if (messageOrdering) {
          this.sortMessage = true
          console.log(`Message ordering is set to ${this.sortMessage}`)
        }

        // Subscription order is deliberate: connection state first, then the
        // internal pipeline, then the payload source, and finally the output.
        const connectionSubscription = eventListener.subscribe({
          next: event => this.receiverConnectionState.next(event),
          error: err => observer.error(err)
        })
        const wrappingSubscription = this.startWrappingOperation()
        const transmissionSubscription = this.startBufferTransmissionProcess()
        const releaseSubscription = this.releaseSignalManager()
        const payloadSubscription = payloadToBeTransmitted.subscribe({
          next: message => this.toBeWrapped.next(message),
          // Completion is intentionally not forwarded: buffered messages may
          // still be waiting for the receiver to come back online.
          error: err => observer.error(err)
        })
        // Messages leaving the buffer for an ONLINE receiver are handed to the consumer.
        const outputSubscription = this.messageToBeTransmitted.subscribe(message => observer.next(message))

        // Teardown: without this, every (re)subscription would leave a live copy
        // of the pipeline and its timers behind.
        return () => {
          outputSubscription.unsubscribe()
          payloadSubscription.unsubscribe()
          releaseSubscription.unsubscribe()
          transmissionSubscription.unsubscribe()
          wrappingSubscription.unsubscribe()
          connectionSubscription.unsubscribe()
          this.cancelPendingRelease()
        }
      })
    }

    /**
     * Wires raw payloads into the buffer and the buffer into the batch stream.
     *
     * @returns A subscription that owns both inner subscriptions.
     */
    private startWrappingOperation() {
      const subscription = this.toBeWrapped.subscribe(message => {
        this.wrappedMessageToBeBuffered.next(this.wrapMessageWithTimeReceived(message, this.currentMessageId))
      })
      // Wrapped messages accumulate until bufferReleaseSignal fires, then leave as one batch.
      subscription.add(
        this.wrappedMessageToBeBuffered.pipe(buffer(this.bufferReleaseSignal)).subscribe((bufferedMessages: WrappedMessage[]) => {
          const batch = (this.sortMessage && bufferedMessages.length > 0)
            ? sortMessageBasedOnDate(bufferedMessages)
            : bufferedMessages
          this.arrayToBeTransmitted.next(batch)
        })
      )
      return subscription
    }

    /**
     * Wraps a raw payload with reception time, a fresh id and the id of the
     * previously wrapped message. Values that already carry a truthy
     * `timeReceived` are treated as wrapped and returned unchanged.
     *
     * @param message Raw payload or an already wrapped message.
     * @param previousMessageID Id to record as the predecessor of a newly wrapped message.
     * @returns The wrapped message.
     */
    private wrapMessageWithTimeReceived(message: any, previousMessageID: string | null): WrappedMessage {
      // Optional chaining keeps a null/undefined payload from throwing here.
      if (message?.timeReceived) {
        return message as WrappedMessage
      }
      const wrapped: WrappedMessage = {
        timeReceived: new Date(),
        payload: message as BaseMessage,
        thisMessageID: uuidV4(),
        previousMessageID: previousMessageID
      }
      this.currentMessageId = wrapped.thisMessageID
      return wrapped
    }

    /**
     * Processes every released batch: transmits its messages when the receiver
     * is ONLINE, re-buffers them otherwise, and arranges the next buffer release.
     *
     * @returns The subscription on the batch stream.
     */
    private startBufferTransmissionProcess() {
      console.log(`StartBufferTransmissionProcess`)
      return this.arrayToBeTransmitted.subscribe(batch => {
        if (batch.length === 0) {
          // Nothing buffered yet: poll again later instead of releasing in a tight loop.
          this.scheduleBufferRelease(RetransmissionService.RELEASE_DELAY_WHEN_EMPTY_MS)
          return
        }

        this.transmissionState.next('TRANSMITTING')
        for (const message of batch) {
          if (this.receiverConnectionState.getValue() === 'ONLINE') {
            this.messageToBeTransmitted.next(message)
          } else {
            // Anything other than ONLINE is treated as "not reachable": flush the
            // message back into the buffer rather than dropping it.
            this.wrappedMessageToBeBuffered.next(message)
          }
        }
        // This batch is done.
        this.transmissionState.next('ARRAY EMPTY')

        if (this.receiverConnectionState.getValue() === 'ONLINE') {
          this.scheduleBufferRelease(RetransmissionService.RELEASE_DELAY_AFTER_BATCH_MS)
        }
        // While the receiver is not ONLINE no release is scheduled; the
        // transition back to ONLINE triggers it (see releaseSignalManager).
      })
    }

    /**
     * Reacts to connection state changes: when the receiver becomes ONLINE and
     * no batch is in flight, the buffer is released immediately.
     *
     * @returns The subscription on the connection state.
     */
    private releaseSignalManager() {
      return this.receiverConnectionState.pipe(
        distinctUntilChanged()
      ).subscribe(clientState => {
        console.log(`Client is now ${clientState}`)
        console.log(`Current transmission state: ${this.transmissionState.getValue()}`)
        // OFFLINE: nothing to do, messages simply keep buffering.
        if (clientState === 'ONLINE' && this.transmissionState.getValue() === 'ARRAY EMPTY') {
          this.releaseBufferNow()
        }
      })
    }

    /**
     * Arms the (single) delayed buffer release, replacing any pending one so
     * that only one release chain is ever active.
     */
    private scheduleBufferRelease(delayMs: number): void {
      this.cancelPendingRelease()
      this.pendingReleaseTimer = setTimeout(() => {
        // Clear the handle first: emitting the signal re-arms the timer synchronously.
        this.pendingReleaseTimer = null
        this.bufferReleaseSignal.next()
      }, delayMs)
    }

    /** Releases the buffer immediately, superseding any pending delayed release. */
    private releaseBufferNow(): void {
      this.cancelPendingRelease()
      this.bufferReleaseSignal.next()
    }

    /** Cancels the pending delayed buffer release, if there is one. */
    private cancelPendingRelease(): void {
      if (this.pendingReleaseTimer !== null) {
        clearTimeout(this.pendingReleaseTimer)
        this.pendingReleaseTimer = null
      }
    }
  }