123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198 |
- import { buffer, bufferCount, bufferTime, bufferToggle, catchError, combineLatest, filter, finalize, from, map, mapTo, merge, mergeAll, Observable, of, startWith, Subject, Subscriber, Subscription, switchMap, take, takeUntil, tap, throwError, timer } from "rxjs";
- import { ClientNotificationState } from "../../interfaces/general.interface";
- import { BaseMessage } from "../../dependencies/logging/services/logging-service";
- export function rxjsBufferToggle(sourceObs: Subject<BaseMessage>, notificationObs: Subject<ClientNotificationState>): Observable<BaseMessage | BaseMessage[]> {
- let releaseTrigger = new Subject<any>()
- let alreadyDisconnected = false
- const messageStream: Observable<BaseMessage | BaseMessage[]> = notificationObs.pipe(
- map(notif => {
- if (notif.message == 'Disconnected' && !alreadyDisconnected) {
- alreadyDisconnected = true
- return true
- }
- // if (notif.message == 'Subscribed' || notif.status == 'ONLINE' || notif.message == 'Reconnected') {
- else if (notif.message == 'Subscribed') {
- return false
- } else {
- return null
- }
- }),
- switchMap(toBuffer => {
- if (toBuffer) {
- console.log(`Rxjs Buffering...`)
- return sourceObs.pipe(
- // tap(value => console.log(`rxjs buffering ${value?.header?.messageID ?? 'unknown'}`)),
- bufferToggle(
- // bufferTrigger.pipe(startWith(0)),
- // of(0),
- from([0]),
- () => releaseTrigger
- ));
- } else {
- console.log(`Rxjs Releasing...`)
- return sourceObs
- }
- })
- )
- // only for triggering when to stop buffering
- notificationObs.subscribe((notification: ClientNotificationState) => {
- let status: 'Online' | 'Offline-NotSubscribed' | 'Offline-Subscribed' | null = null
- if (notification.message == 'Reconnected') {
- status = 'Online'
- }
- if (notification.message == 'Disconnected' && status != `Offline-NotSubscribed`) {
- status = 'Offline-NotSubscribed'
- }
- // Technically speaking, we only ever need this, just for the release signal.
- if (notification.message == 'Subscribed' && status != 'Offline-Subscribed') {
- releaseTrigger.next('Release')
- status = 'Offline-Subscribed'
- }
- if (notification.message == 'Subscribed' && status != 'Offline-NotSubscribed') {
- console.log(`Retransmission already subscribed to buffer output. `)
- }
- })
- return messageStream
- }
- // https://rxjs.dev/api/index/function/buffer
- export function rxjsBufferDefault(sourceObs: Subject<BaseMessage>, releaseSignal: Observable<any>): Observable<BaseMessage[]> {
- return new Observable((observer: Subscriber<BaseMessage[]>) => {
- let bufferedMessages = sourceObs.pipe(buffer(releaseSignal));
- bufferedMessages.subscribe((bufferedMessages: BaseMessage[]) => {
- observer.next(bufferedMessages)
- });
- })
- }
- // https://rxjs.dev/api/index/function/bufferTime
- export function rxjsBufferTime(sourceObs: Subject<BaseMessage>, duration: number): Observable<BaseMessage[]> {
- return new Observable((observer: Subscriber<BaseMessage[]>) => {
- let bufferedMessages = sourceObs.pipe(bufferTime(duration))
- bufferedMessages.subscribe((bufferedMessages: BaseMessage[]) => {
- observer.next(bufferedMessages)
- })
- })
- }
- // https://rxjs.dev/api/index/function/bufferCount
- export function rxjsBufferCount(sourceObs: Subject<BaseMessage>, bufferlimit: number): Observable<BaseMessage[]> {
- return new Observable((observer: Subscriber<BaseMessage[]>) => {
- let bufferedMessages = sourceObs.pipe(bufferCount(bufferlimit))
- bufferedMessages.subscribe((bufferedMessages: BaseMessage[]) => {
- observer.next(bufferedMessages)
- })
- })
- }
- // Only the first one is working. The others like buffer duration and buffersize limit is still faulty. Please refrain from using these for now.
- export function rxjsBuffer(sourceObs: Subject<BaseMessage>, releaseSignal: Observable<any>, bufferDuration?: number, bufferSizeLimit?: number, messageToBeLogged?: Subject<BaseMessage>): Observable<BaseMessage[]> {
- return new Observable((observer: Subscriber<BaseMessage[]>) => {
- if (!bufferDuration && !bufferSizeLimit) {
- console.log(`Default Rxjs Buffering....`)
- sourceObs.pipe(buffer(releaseSignal)).subscribe((bufferedMessages: BaseMessage[]) => {
- observer.next(bufferedMessages)
- });
- }
- if (bufferDuration && !bufferSizeLimit) {
- console.log(`Rxjs Buffering with bufferDuration: ${bufferDuration}`)
- // Observable that emits after a certain period (e.g., 5 seconds)
- let stopBuffering = timer(bufferDuration)
- // let stopBuffering = () => timer(bufferDuration)
- let releaseBuffering = merge(stopBuffering, releaseSignal)
- sourceObs.pipe(
- bufferToggle(of(0), () => releaseBuffering)
- ).subscribe(((bufferedMessage: BaseMessage[]) => {
- console.log(`Buffering Duration Exceeded. Please don't send me any more data.`)
- // Should emit an event that duration limit has been exceeded
- if (messageToBeLogged) {
- bufferedMessage.forEach((message: BaseMessage) => {
- messageToBeLogged.next(message)
- })
- } else {
- observer.next(bufferedMessage)
- }
- }))
- }
- if (!bufferDuration && bufferSizeLimit) {
- console.log(`rxjs Buffering with buffer limit: ${bufferSizeLimit}`)
- sourceObs.pipe(
- bufferCount(bufferSizeLimit),
- take(1), // take(1) then takes that first buffered array and completes the observable, meaning it will not allow any further emissions or buffering.
- tap(bufferedMessage => {
- if (bufferedMessage.length > bufferSizeLimit) {
- throw new Error('Amount exceeded')
- } else {
- // keep buffering
- }
- }),
- catchError(err => {
- console.error(err)
- // maybe emit an event for this as well
- return throwError(() => new Error('Rxjs Buffering STOP!'))
- })
- ).subscribe({
- next: (bufferedMessage: BaseMessage[]) => {
- if (messageToBeLogged) {
- from(bufferedMessage).subscribe({
- next: (message: BaseMessage) => {
- messageToBeLogged.next(message)
- },
- error: err => console.error(err),
- complete: () => {
- // emit event to indicate storage logging is completed
- }
- })
- }
- observer.next(bufferedMessage)
- },
- error: err => {
- // console.error(err)
- observer.error(err)
- },
- complete: () => {
- observer.complete()
- }
- })
- }
- if (bufferDuration && bufferSizeLimit) {
- console.log(`rxjs Buffering with bufferDuration: ${bufferDuration} ms && bufferSizeLimit: ${bufferSizeLimit} items`);
- let islimitExceeded = false
- let bufferedArray: BaseMessage[] = []
- // buffer duration limit.
- timer(bufferDuration).subscribe(() => islimitExceeded = true)
- // buffer size limit.
- sourceObs.pipe(
- bufferCount(bufferSizeLimit),
- take(1) // this take will close the subscription as well, preventing memory leak
- ).subscribe(() => islimitExceeded = true)
- let subscription: Subscription = sourceObs.subscribe(message => {
- if (!islimitExceeded) {
- bufferedArray.push(message)
- } else {
- if (messageToBeLogged) {
- from(bufferedArray).subscribe({
- next: (message: BaseMessage) => {
- messageToBeLogged.next(message)
- },
- error: err => console.error(err),
- complete: () => {
- subscription.unsubscribe()
- // emit event to indicate storage logging is completed
- }
- })
- } else {
- console.log(`bufferArray exceeded. Removing all buffered messages`)
- // observer.next(bufferedArray)
- subscription.unsubscribe()
- }
- }
- })
- }
- })
- }
|