| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198 | 
							- import { buffer, bufferCount, bufferTime, bufferToggle, catchError, combineLatest, filter, finalize, from, map, mapTo, merge, mergeAll, Observable, of, startWith, Subject, Subscriber, Subscription, switchMap, take, takeUntil, tap, throwError, timer } from "rxjs";
 
- import { ClientNotificationState } from "../../interfaces/general.interface";
 
- import { BaseMessage } from "../../dependencies/logging/services/logging-service";
 
/**
 * Gates `sourceObs` on connection notifications.
 *
 * - On the first `'Disconnected'` notification, messages from `sourceObs` are collected
 *   into a buffer instead of being forwarded.
 * - On `'Subscribed'`, the buffered messages are emitted as ONE array, after which
 *   messages are forwarded one by one again.
 * - Any other notification starts/keeps plain forwarding when nothing is being buffered,
 *   and is ignored while a buffer is open.
 *
 * Nothing is emitted until the first notification arrives. All state is created per
 * subscription, and unsubscribing from the returned observable releases the
 * subscriptions on both `notificationObs` and `sourceObs`.
 *
 * @param sourceObs Stream of messages to forward or buffer.
 * @param notificationObs Stream of connection notifications; only `message` is inspected.
 * @returns Observable emitting single messages while forwarding and one `BaseMessage[]`
 *          each time a buffer is released.
 */
export function rxjsBufferToggle(sourceObs: Subject<BaseMessage>, notificationObs: Subject<ClientNotificationState>): Observable<BaseMessage | BaseMessage[]> {
    return new Observable((observer: Subscriber<BaseMessage | BaseMessage[]>) => {
        // Closing notifier of the currently open buffer.
        const releaseTrigger = new Subject<string>()
        // True from the first 'Disconnected' until the next 'Subscribed'.
        let buffering = false

        // Translates a notification into: true = start buffering, false = forward, null = ignore.
        const decide = (notif: ClientNotificationState): boolean | null => {
            if (notif.message === 'Disconnected') {
                if (buffering) {
                    // Duplicate 'Disconnected': re-switching would drop the open buffer, so ignore it.
                    return null
                }
                buffering = true
                return true
            }
            if (notif.message === 'Subscribed') {
                if (buffering) {
                    buffering = false
                    // Must fire BEFORE switchMap switches below: bufferToggle only emits its
                    // buffer when the closing notifier fires, not when it is unsubscribed.
                    releaseTrigger.next('Release')
                } else {
                    console.log(`Retransmission already subscribed to buffer output. `)
                }
                return false
            }
            // Unrelated notification: never interrupt an open buffer; otherwise (re)start forwarding.
            return buffering ? null : false
        }

        // Returning the subscription makes it the teardown of the outer observable.
        return notificationObs.pipe(
            map(decide),
            filter((toBuffer): toBuffer is boolean => toBuffer !== null),
            switchMap(toBuffer => {
                if (toBuffer) {
                    console.log(`Rxjs Buffering...`)
                    // from([0]) opens exactly one buffer immediately; it stays open until releaseTrigger emits.
                    return sourceObs.pipe(
                        bufferToggle(
                            from([0]),
                            () => releaseTrigger
                        ));
                } else {
                    console.log(`Rxjs Releasing...`)
                    return sourceObs
                }
            })
        ).subscribe(observer)
    })
}
 
- // https://rxjs.dev/api/index/function/buffer
 
/**
 * Collects messages from `sourceObs` and emits them as one array every time
 * `releaseSignal` emits.
 *
 * The piped observable is returned directly, so unsubscribing from the result also
 * unsubscribes from `sourceObs` and `releaseSignal`, and errors/completion of the
 * source reach the subscriber instead of being dropped.
 *
 * @param sourceObs Stream of messages to buffer.
 * @param releaseSignal Each emission flushes the current buffer.
 * @returns Observable of buffered message arrays.
 */
export function rxjsBufferDefault(sourceObs: Subject<BaseMessage>, releaseSignal: Observable<any>): Observable<BaseMessage[]> {
    return sourceObs.pipe(buffer(releaseSignal))
}
 
- // https://rxjs.dev/api/index/function/bufferTime
 
/**
 * Collects messages from `sourceObs` and emits them as an array every `duration`
 * milliseconds.
 *
 * The piped observable is returned directly, so unsubscribing from the result stops
 * the interval timer and releases the subscription on `sourceObs`, and errors/completion
 * of the source reach the subscriber instead of being dropped.
 *
 * @param sourceObs Stream of messages to buffer.
 * @param duration Buffer window length in milliseconds.
 * @returns Observable of buffered message arrays.
 */
export function rxjsBufferTime(sourceObs: Subject<BaseMessage>, duration: number): Observable<BaseMessage[]> {
    return sourceObs.pipe(bufferTime(duration))
}
 
- // https://rxjs.dev/api/index/function/bufferCount
 
/**
 * Collects messages from `sourceObs` and emits them as an array each time
 * `bufferlimit` messages have been gathered.
 *
 * The piped observable is returned directly, so unsubscribing from the result also
 * unsubscribes from `sourceObs`, and errors/completion of the source reach the
 * subscriber instead of being dropped.
 *
 * @param sourceObs Stream of messages to buffer.
 * @param bufferlimit Number of messages per emitted array.
 * @returns Observable of buffered message arrays.
 */
export function rxjsBufferCount(sourceObs: Subject<BaseMessage>, bufferlimit: number): Observable<BaseMessage[]> {
    return sourceObs.pipe(bufferCount(bufferlimit))
}
 
/**
 * Buffers messages from `sourceObs` in one of four modes, chosen by which optional
 * limits are supplied (a limit of `0` counts as "not supplied"):
 *
 * 1. No limits: emit the buffered messages every time `releaseSignal` emits.
 * 2. `bufferDuration` only: buffer once, until `bufferDuration` ms elapse or
 *    `releaseSignal` emits, whichever comes first.
 * 3. `bufferSizeLimit` only: emit the first `bufferSizeLimit` messages as one array,
 *    then complete.
 * 4. Both limits: collect messages until either limit is hit, then hand them to
 *    `messageToBeLogged` (or discard them when it is absent).
 *
 * WARNING: only mode 1 is considered working. The duration and size-limit modes are
 * still faulty; please refrain from using them for now.
 *
 * Every inner subscription and timer is tied to the returned observable, so
 * unsubscribing from it releases all of them.
 *
 * @param sourceObs Stream of messages to buffer.
 * @param releaseSignal Flush signal for modes 1 and 2.
 * @param bufferDuration Optional time limit in milliseconds.
 * @param bufferSizeLimit Optional maximum number of buffered messages.
 * @param messageToBeLogged Optional sink that receives buffered messages one by one.
 * @returns Observable of buffered message arrays (mode 4 never emits to the subscriber).
 */
export function rxjsBuffer(sourceObs: Subject<BaseMessage>, releaseSignal: Observable<any>, bufferDuration?: number, bufferSizeLimit?: number, messageToBeLogged?: Subject<BaseMessage>): Observable<BaseMessage[]> {
    return new Observable((observer: Subscriber<BaseMessage[]>) => {
        // Parent of every inner subscription; returned as the teardown of this observable.
        const subscriptions = new Subscription()

        if (!bufferDuration && !bufferSizeLimit) {
            console.log(`Default Rxjs Buffering....`)
            subscriptions.add(sourceObs.pipe(buffer(releaseSignal)).subscribe({
                next: (bufferedMessages: BaseMessage[]) => observer.next(bufferedMessages),
                error: err => observer.error(err),
                complete: () => observer.complete()
            }))
        } else if (bufferDuration && !bufferSizeLimit) {
            console.log(`Rxjs Buffering with bufferDuration: ${bufferDuration}`)
            // The single buffer closes when the timer fires or the release signal arrives.
            const releaseBuffering = merge(timer(bufferDuration), releaseSignal)
            subscriptions.add(sourceObs.pipe(
                // of(0) opens exactly one buffer, immediately on subscription.
                bufferToggle(of(0), () => releaseBuffering)
            ).subscribe({
                next: (bufferedMessage: BaseMessage[]) => {
                    // NOTE(review): this is also printed when releaseSignal (not the timer) closed the buffer.
                    console.log(`Buffering Duration Exceeded. Please don't send me any more data.`)
                    // Should emit an event that duration limit has been exceeded
                    if (messageToBeLogged) {
                        bufferedMessage.forEach((message: BaseMessage) => {
                            messageToBeLogged.next(message)
                        })
                    } else {
                        observer.next(bufferedMessage)
                    }
                },
                error: err => observer.error(err),
                complete: () => observer.complete()
            }))
        } else if (!bufferDuration && bufferSizeLimit) {
            console.log(`rxjs Buffering with buffer limit: ${bufferSizeLimit}`)
            subscriptions.add(sourceObs.pipe(
                // bufferCount never emits more than bufferSizeLimit items, so no size check is needed afterwards.
                bufferCount(bufferSizeLimit),
                take(1), // only the first full buffer is taken; the stream then completes and buffering stops.
                catchError(err => {
                    console.error(err)
                    // maybe emit an event for this as well
                    return throwError(() => new Error('Rxjs Buffering STOP!'))
                })
            ).subscribe({
                next: (bufferedMessage: BaseMessage[]) => {
                    if (messageToBeLogged) {
                        bufferedMessage.forEach((message: BaseMessage) => {
                            messageToBeLogged.next(message)
                        })
                    }
                    observer.next(bufferedMessage)
                },
                error: err => observer.error(err),
                complete: () => observer.complete()
            }))
        } else if (bufferDuration && bufferSizeLimit) {
            console.log(`rxjs Buffering with bufferDuration: ${bufferDuration} ms && bufferSizeLimit: ${bufferSizeLimit} items`);
            const sizeLimit = bufferSizeLimit
            const bufferedArray: BaseMessage[] = []
            let isLimitExceeded = false

            // Created up front so the message handler can always close it safely.
            const collector = new Subscription()
            subscriptions.add(collector)

            // Hands the collected messages to the log sink (or drops them) and stops collecting.
            // NOTE(review): nothing is emitted to `observer` in this mode — confirm that is intended.
            const stopBuffering = () => {
                if (messageToBeLogged) {
                    bufferedArray.forEach((message: BaseMessage) => {
                        messageToBeLogged.next(message)
                    })
                } else {
                    console.log(`bufferArray exceeded. Removing all buffered messages`)
                }
                collector.unsubscribe()
            }

            // buffer duration limit.
            subscriptions.add(timer(bufferDuration).subscribe(() => {
                isLimitExceeded = true
            }))

            collector.add(sourceObs.subscribe(message => {
                if (isLimitExceeded) {
                    // Duration elapsed earlier. NOTE(review): the flush only happens once another
                    // message arrives, and that message itself is not buffered.
                    stopBuffering()
                    return
                }
                bufferedArray.push(message)
                // buffer size limit: counted here so the message that reaches the limit is kept.
                if (bufferedArray.length >= sizeLimit) {
                    isLimitExceeded = true
                    stopBuffering()
                }
            }))
        }

        return subscriptions
    })
}
 
 
  |