import { fa } from "@faker-js/faker";
import { Subject, interval, Observable, buffer, switchMap, bufferToggle, map, scan, startWith, BehaviorSubject, tap } from "rxjs";
import { prepareResponseMessages } from "../services/utility/prepareFISmessage";

/*
 * Experiments with RxJS buffering.
 *
 * Goal: `syncSource` is the input and the returned stream is the output.
 * While "online", the source should flow straight to the output. When an
 * "offline" signal arrives, the source should be buffered instead so that no
 * message is lost. When the next "online" signal arrives (reconnection), the
 * buffered messages are released to the output and the source flows through
 * directly again.
 *
 * The initial state is assumed to be online, so the first toggle means offline.
 *
 * How `buffer` works: it has no pass-through mode. It subscribes to a second
 * observable that acts as the release signal. It always buffers, emits the
 * collected values as an array when that signal fires, and then keeps
 * buffering. Switching between buffering and normal streaming therefore has to
 * be done outside of `buffer` (see bufferTest2 onwards).
 */

/** Timer-driven toggle signal: fires every 5 seconds. */
const toggle = interval(5000);

/** Synchronous-style source: emits an incrementing number every 500 ms. */
const syncSource = interval(500);

/** Subject that relays every value of `syncSource` to the experiments below. */
const asyncSource = new Subject<any>();

// Module-level side effect: starts forwarding as soon as this file is loaded.
syncSource.subscribe(value => {
    asyncSource.next(value);
});

/**
 * Version 1: plain `buffer`.
 *
 * Collects the values of a 1-second interval and logs them as an array each
 * time `toggle` fires.
 */
function bufferTest1() {
    const intervalEvents = interval(1000);
    const buffered = intervalEvents.pipe(buffer(toggle));
    buffered.subscribe(values => console.log(values));
}

/**
 * Version 2: toggle between buffering and normal emitting, driven by `toggle`.
 *
 * Every `toggle` emission flips a boolean state that starts as `false`
 * (not buffering), so the first toggle switches to buffering. The state and the
 * resulting values are logged to the console.
 */
function bufferTest2() {
    const toggleBuffer$ = toggle.pipe(
        // Track the toggle state: true means buffering, false means normal.
        scan((isBuffering) => !isBuffering, false),
        // Use the state to switch between buffer mode and normal mode.
        switchMap(isBuffering => {
            // Idea for later: inspect a notification message here and drive
            // dedicated open/close subjects instead of the timer.
            console.log(isBuffering);
            if (isBuffering) {
                // Buffer mode: open immediately (startWith) and on every
                // toggle; close on the next toggle.
                return asyncSource.pipe(
                    bufferToggle(toggle.pipe(startWith(0)), () => toggle)
                );
            }
            // Normal mode: emit values as they arrive.
            return asyncSource;
        })
    );

    // Subscribe to the toggled stream.
    toggleBuffer$.subscribe(values => {
        console.log('Values:', values);
    });
}

// bufferTest2()
// bufferTest1()

/** Connectivity notifications: 'online' or 'offline'. Starts as 'online'. */
const notificationSubject = new BehaviorSubject<string>('online');

/** Signal used as the buffer-opening notifier in bufferTest3/bufferTest4. */
const keep = new Subject<any>();

/** Signal used as the buffer-closing notifier in bufferTest3/bufferTest4. */
const release = new Subject<any>();

// Example usage:
// bufferTest3().subscribe(values => {
//     console.log(`Values: ${values}`)
// })
// emulateNotification(`offline`, 3000)
// emulateNotification(`online`, 7000)
// emulateNotification(`offline`, 10000)
// emulateNotification(`online`, 15000)

/**
 * Version 3: buffering driven by `notificationSubject` instead of a timer.
 *
 * @returns A stream that emits `asyncSource` values directly while the latest
 *          notification is not 'offline', and emits buffered arrays (via
 *          `bufferToggle` on `keep`/`release`) while it is 'offline'.
 *
 * NOTE(review): each call adds a subscription to `notificationSubject` that is
 * never unsubscribed. It is created before the returned stream is subscribed,
 * so `release` fires before the `switchMap` below switches away from the
 * buffering branch - keep that ordering in mind when refactoring.
 */
function bufferTest3(): Observable<any> {
    const messageStream = notificationSubject.pipe(
        // true while offline, i.e. while values must be buffered.
        map(notif => notif === 'offline'),
        switchMap(isOffline => {
            if (isOffline) {
                // Buffer mode: open immediately (startWith) and on every
                // `keep`; close on `release`.
                return asyncSource.pipe(
                    bufferToggle(keep.pipe(startWith(0)), () => release)
                );
            }
            // Normal mode: emit values as they arrive.
            return asyncSource;
        })
    );

    // Translate notifications into keep/release signals for bufferToggle.
    notificationSubject.subscribe(notif => {
        if (notif === 'online') {
            console.log(`received notification: ${notif}, releasing...`);
            release.next('release');
        }
        if (notif === 'offline') {
            console.log(`received notification: ${notif}, buffering...`);
            keep.next('keep');
        }
    });

    return messageStream;
}

/**
 * Pushes `status` into `notificationSubject` after `delay` milliseconds.
 *
 * @param status Notification to emit (the experiments react to 'online' and 'offline').
 * @param delay  Delay in milliseconds before the notification is emitted.
 */
function emulateNotification(status: string, delay: number) {
    setTimeout(() => {
        notificationSubject.next(status);
    }, delay);
}

/**
 * Version 4 (working version): same idea as bufferTest3, with the buffering
 * state logged whenever a notification arrives.
 *
 * @returns A stream that emits `asyncSource` values directly while the latest
 *          notification is not 'offline', and emits buffered arrays (via
 *          `bufferToggle` on `keep`/`release`) while it is 'offline'.
 *
 * NOTE(review): as in bufferTest3, each call adds a subscription to
 * `notificationSubject` that is never unsubscribed.
 */
function bufferTest4(): Observable<any> {
    // Track buffering state: true exactly when the latest notification is 'offline'.
    const bufferingState$ = notificationSubject.pipe(
        map(notif => notif === 'offline'),
        tap(isBuffering => {
            console.log(`Buffering state changed: ${isBuffering}`);
        })
    );

    // Message stream based on buffering state.
    const messageStream = bufferingState$.pipe(
        switchMap(isBuffering => {
            if (isBuffering) {
                // Buffer mode: open immediately (startWith) and on every
                // `keep`; close on `release`.
                return asyncSource.pipe(
                    bufferToggle(keep.pipe(startWith(0)), () => release)
                );
            }
            // Normal mode: emit values as they arrive.
            return asyncSource;
        })
    );

    // Translate notifications into keep/release signals for bufferToggle.
    notificationSubject.subscribe(notif => {
        console.log(`Received notification: ${notif}`);
        if (notif === 'online') {
            release.next(true);  // Release buffered values
        }
        if (notif === 'offline') {
            keep.next(true);  // Start buffering
        }
    });

    return messageStream;
}

// Source for bufferTest5, produced by the project helper.
// NOTE(review): presumably an observable of generated response messages
// (it is piped below); the meaning of the arguments is defined by the helper - confirm.
const messages = prepareResponseMessages(1000000, 1);

/**
 * Version 5: buffers `messages` and, each time a 300000 ms interval fires,
 * logs a running counter together with the length of the emitted buffer.
 */
function bufferTest5() {
    const clicks = interval(300000);
    const buffered = messages.pipe(buffer(clicks));
    let count = 0;
    buffered.subscribe({
        next: batch => {
            console.log(`${count++} && Buffer Length: ${batch.length}`);
        },
        error: err => console.error(err),
        complete: () => { }
    });
}

bufferTest5();