import { buffer, bufferCount, bufferTime, bufferToggle, catchError, combineLatest, filter, finalize, from, map, mapTo, merge, mergeAll, Observable, of, startWith, Subject, Subscriber, Subscription, switchMap, take, takeUntil, tap, throwError, timer } from "rxjs";
import { ClientNotificationState } from "../../interfaces/general.interface";
import { BaseMessage } from "../../dependencies/logging/services/logging-service";

/**
 * Switches `sourceObs` between a buffering mode and a pass-through mode, driven
 * by the `message` field of the notifications emitted on `notificationObs`.
 *
 * - The first `'Disconnected'` notification starts buffering: every value from
 *   `sourceObs` is collected into a single array.
 * - A `'Subscribed'` notification releases that array (it is emitted as one
 *   value) and then switches to pass-through, where each source value is
 *   emitted individually.
 * - Any other notification is ignored while buffering, so the collected
 *   messages are not discarded. Outside of buffering it (re)selects
 *   pass-through, exactly as before.
 *
 * Nothing is emitted until the first notification arrives. All internal
 * subscriptions are created when the returned observable is subscribed to and
 * are released when that subscription ends.
 *
 * NOTE(review): buffering only engages for the first `'Disconnected'` seen by a
 * subscription (`alreadyDisconnected` is never reset) - confirm this is intended.
 *
 * @param sourceObs Stream of messages to buffer or pass through.
 * @param notificationObs Stream of client notifications that selects the mode.
 * @returns An observable emitting either one array of buffered messages (on
 *          release) or individual messages (in pass-through mode).
 */
export function rxjsBufferToggle<T = BaseMessage>(sourceObs: Subject<T>, notificationObs: Subject<ClientNotificationState>): Observable<T | T[]> {
    return new Observable<T | T[]>(subscriber => {
        const releaseTrigger = new Subject<string>()
        let alreadyDisconnected = false
        let isBuffering = false
        // Kept outside the callback so it persists between notifications.
        let status: 'Online' | 'Offline-NotSubscribed' | 'Offline-Subscribed' | null = null

        // Only used to decide when to stop buffering.
        // This must be subscribed BEFORE the mode pipeline below: a Subject notifies
        // its observers in subscription order, and the open buffer has to be closed
        // (and emitted) before switchMap replaces it with the pass-through stream.
        const releaseSubscription = notificationObs.subscribe((notification: ClientNotificationState) => {
            switch (notification.message) {
                case 'Reconnected':
                    status = 'Online'
                    break
                case 'Disconnected':
                    status = 'Offline-NotSubscribed'
                    break
                case 'Subscribed':
                    // Technically speaking, we only ever need this, just for the release signal.
                    if (status !== 'Offline-Subscribed') {
                        releaseTrigger.next('Release')
                        status = 'Offline-Subscribed'
                    } else {
                        console.log(`Retransmission already subscribed to buffer output. `)
                    }
                    break
            }
        })

        const streamSubscription = notificationObs.pipe(
            // true = start buffering, false = release, null = no decision
            map((notif): boolean | null => {
                if (notif.message === 'Disconnected' && !alreadyDisconnected) {
                    alreadyDisconnected = true
                    return true
                }
                if (notif.message === 'Subscribed') {
                    return false
                }
                return null
            }),
            // While buffering, a "no decision" notification must not reach switchMap:
            // switching away would unsubscribe the open buffer and lose its contents.
            filter(toBuffer => toBuffer !== null || !isBuffering),
            switchMap((toBuffer): Observable<T | T[]> => {
                isBuffering = toBuffer === true
                if (toBuffer) {
                    console.log(`Rxjs Buffering...`)
                    // Open a single buffer immediately; close it on the release signal.
                    return sourceObs.pipe(bufferToggle(from([0]), () => releaseTrigger))
                }
                console.log(`Rxjs Releasing...`)
                return sourceObs
            })
        ).subscribe(subscriber)

        return () => {
            streamSubscription.unsubscribe()
            releaseSubscription.unsubscribe()
        }
    })
}

/**
 * Collects values from `sourceObs` and emits them as an array every time
 * `releaseSignal` emits. Errors and completion of the buffered stream are
 * forwarded, and unsubscribing releases the underlying subscription.
 *
 * @see https://rxjs.dev/api/index/function/buffer
 * @param sourceObs Stream of messages to buffer.
 * @param releaseSignal Each emission flushes the current buffer.
 * @returns An observable of buffered message arrays.
 */
export function rxjsBufferDefault(sourceObs: Subject<BaseMessage>, releaseSignal: Observable<unknown>): Observable<BaseMessage[]> {
    return sourceObs.pipe(buffer(releaseSignal))
}

/**
 * Collects values from `sourceObs` and emits them as an array using the
 * `bufferTime` operator with the given `duration`.
 *
 * @see https://rxjs.dev/api/index/function/bufferTime
 * @param sourceObs Stream of messages to buffer.
 * @param duration Value passed to `bufferTime` as the buffer time span.
 * @returns An observable of buffered message arrays.
 */
export function rxjsBufferTime(sourceObs: Subject<BaseMessage>, duration: number): Observable<BaseMessage[]> {
    return sourceObs.pipe(bufferTime(duration))
}

/**
 * Collects values from `sourceObs` and emits them as an array using the
 * `bufferCount` operator with the given `bufferlimit`.
 *
 * @see https://rxjs.dev/api/index/function/bufferCount
 * @param sourceObs Stream of messages to buffer.
 * @param bufferlimit Value passed to `bufferCount` as the buffer size.
 * @returns An observable of buffered message arrays.
 */
export function rxjsBufferCount(sourceObs: Subject<BaseMessage>, bufferlimit: number): Observable<BaseMessage[]> {
    return sourceObs.pipe(bufferCount(bufferlimit))
}

// Only the first mode (plain release-signal buffering) is working. The others, such as the
// buffer duration and buffer size limit, are still faulty. Please refrain from using these for now.
/**
 * Buffers messages from `sourceObs` in one of four modes, selected by which of
 * `bufferDuration` / `bufferSizeLimit` are truthy (so `0` counts as "not set"):
 *
 * 1. Neither set: emit the collected array every time `releaseSignal` emits.
 * 2. Duration only: collect a single array that is closed by whichever comes
 *    first, the timer or `releaseSignal`. The array goes to `messageToBeLogged`
 *    (item by item) when provided, otherwise to the subscriber.
 * 3. Size only: emit the first `bufferSizeLimit` messages as one array, forward
 *    them to `messageToBeLogged` as well when provided, then complete.
 * 4. Both set: collect messages until either limit is hit; the next message
 *    after that flushes the collection to `messageToBeLogged` (or just logs
 *    that it is being dropped) and stops collecting.
 *
 * Every internal subscription is released when the returned observable is
 * unsubscribed, and source errors are forwarded to the subscriber.
 *
 * NOTE(review): only mode 1 is considered working by the original author.
 * Modes 3 and 4 ignore `releaseSignal`. Mode 4 never emits to the subscriber
 * and does not store the message that triggers the flush - confirm the
 * intended behaviour before relying on modes 2-4.
 *
 * @param sourceObs Stream of messages to buffer.
 * @param releaseSignal Flushes the buffer in modes 1 and 2.
 * @param bufferDuration Optional time limit in milliseconds.
 * @param bufferSizeLimit Optional maximum number of buffered messages.
 * @param messageToBeLogged Optional sink that receives overflowed messages one by one.
 * @returns An observable of buffered message arrays.
 */
export function rxjsBuffer(sourceObs: Subject<BaseMessage>, releaseSignal: Observable<unknown>, bufferDuration?: number, bufferSizeLimit?: number, messageToBeLogged?: Subject<BaseMessage>): Observable<BaseMessage[]> {
    return new Observable<BaseMessage[]>((observer: Subscriber<BaseMessage[]>) => {
        // Parent subscription: everything added here is released on unsubscribe.
        const teardown = new Subscription()

        // Pushes each buffered message into the logging sink, in order.
        const flushToLog = (messages: BaseMessage[], log: Subject<BaseMessage>): void => {
            messages.forEach((message: BaseMessage) => log.next(message))
        }

        if (!bufferDuration && !bufferSizeLimit) {
            console.log(`Default Rxjs Buffering....`)
            teardown.add(sourceObs.pipe(buffer(releaseSignal)).subscribe({
                next: (bufferedMessages: BaseMessage[]) => observer.next(bufferedMessages),
                error: err => observer.error(err),
                complete: () => observer.complete()
            }))
        } else if (bufferDuration && !bufferSizeLimit) {
            console.log(`Rxjs Buffering with bufferDuration: ${bufferDuration}`)
            // The single buffer closes on the timer or the release signal, whichever fires first.
            const releaseBuffering = merge(timer(bufferDuration), releaseSignal)
            teardown.add(sourceObs.pipe(
                bufferToggle(of(0), () => releaseBuffering)
            ).subscribe({
                next: (bufferedMessage: BaseMessage[]) => {
                    // NOTE(review): this is also logged when releaseSignal (not the timer) closed the buffer.
                    console.log(`Buffering Duration Exceeded. Please don't send me any more data.`)
                    // Should emit an event that duration limit has been exceeded
                    if (messageToBeLogged) {
                        flushToLog(bufferedMessage, messageToBeLogged)
                    } else {
                        observer.next(bufferedMessage)
                    }
                },
                error: err => observer.error(err),
                complete: () => observer.complete()
            }))
        } else if (!bufferDuration && bufferSizeLimit) {
            console.log(`rxjs Buffering with buffer limit: ${bufferSizeLimit}`)
            const sizeLimit = bufferSizeLimit
            teardown.add(sourceObs.pipe(
                bufferCount(sizeLimit),
                take(1), // only the first full buffer is taken; the stream then completes
                tap(bufferedMessage => {
                    // Defensive guard: for an integer limit bufferCount never yields more than `sizeLimit` items.
                    if (bufferedMessage.length > sizeLimit) {
                        throw new Error('Amount exceeded')
                    }
                }),
                catchError(err => {
                    console.error(err)
                    // maybe emit an event for this as well
                    return throwError(() => new Error('Rxjs Buffering STOP!'))
                })
            ).subscribe({
                next: (bufferedMessage: BaseMessage[]) => {
                    if (messageToBeLogged) {
                        flushToLog(bufferedMessage, messageToBeLogged)
                    }
                    observer.next(bufferedMessage)
                },
                error: err => observer.error(err),
                complete: () => observer.complete()
            }))
        } else if (bufferDuration && bufferSizeLimit) {
            console.log(`rxjs Buffering with bufferDuration: ${bufferDuration} ms && bufferSizeLimit: ${bufferSizeLimit} items`);
            let isLimitExceeded = false
            const bufferedArray: BaseMessage[] = []

            // buffer duration limit.
            teardown.add(timer(bufferDuration).subscribe(() => { isLimitExceeded = true }))

            // buffer size limit. Subscribed before the collector below, so the flag is
            // already set when the collector sees the message that reaches the limit.
            teardown.add(sourceObs.pipe(
                bufferCount(bufferSizeLimit),
                take(1)
            ).subscribe({
                next: () => { isLimitExceeded = true },
                error: () => { /* source errors are forwarded by the collector below */ }
            }))

            const collector: Subscription = sourceObs.subscribe({
                next: (message: BaseMessage) => {
                    if (!isLimitExceeded) {
                        bufferedArray.push(message)
                        return
                    }
                    if (messageToBeLogged) {
                        flushToLog(bufferedArray, messageToBeLogged)
                        // emit event to indicate storage logging is completed
                    } else {
                        console.log(`bufferArray exceeded. Removing all buffered messages`)
                    }
                    // Stop collecting once the limit has been handled.
                    collector.unsubscribe()
                },
                error: err => observer.error(err)
            })
            teardown.add(collector)
        }

        return teardown
    })
}