123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172 |
- import { Subject, interval, Observable, buffer, switchMap, bufferToggle, map, scan, startWith, BehaviorSubject, tap } from "rxjs"
- import { prepareResponseMessages } from "../services/utility/prepareFISmessage"
// Shared fixtures for the buffering experiments in this file.
// Ticks every 5 s; used by the experiments as the buffer open/close signal.
let toggle = interval(5000)
// Emits an incrementing counter every 500 ms.
const syncSource = interval(500) // sync
// Subject through which the experiments consume the ticks.
let asyncSource = new Subject<any>() //async
// Relay each tick from the interval into the subject (only `next` is forwarded).
syncSource.subscribe({
    next: tick => asyncSource.next(tick),
})
/* So, output is the final destination and syncSource is the input. The first trigger should trigger the buffer:
initially, source should push to output; when toggle is triggered, source should be unsubscribed,
and buffered should be instantiated immediately to keep receiving the messages.
When the next trigger comes in (reconnection), then connect source to output again, whilst also
releasing buffered into output as well. */
- // assuming the initial state is online, so source will stream straight to output, but the first toggle will be offline
/* How buffer works: It doesn't have a normal (pass-through) mode. It subscribes to another observable that acts as a signal to release.
Its default state is that it always buffers until the subscribed observable emits a signal to release the buffer,
and then it continues to buffer. There's no way to adjust buffer to stream normally. */
/**
 * Experiment 1: plain `buffer`.
 * Collects the ticks of a 1 s interval and logs them as one array each time
 * `toggle` emits.
 */
function bufferTest1() {
    interval(1000)
        .pipe(buffer(toggle))
        .subscribe(batch => console.log(batch));
}
- // VERSION 2 <with the help of chatGPT>
- // Toggle between buffering and normal emitting
/**
 * Experiment 2: alternate between buffering and pass-through on every `toggle` tick.
 *
 * A boolean flag is flipped on each `toggle` emission (seeded with false, so the
 * first emission yields true = buffering). Depending on the flag, the output is
 * switched to either a `bufferToggle`d view of `asyncSource` or to `asyncSource`
 * itself. Everything that comes out is logged with a 'Values:' prefix.
 *
 * NOTE(review): the closing notifier is a fresh subscription to `toggle`, created
 * after the outer one; it presumably fires around the moment the outer tick switches
 * away from the buffering branch - verify whether buffered values are ever delivered.
 */
function bufferTest2() {
    // true means buffering, false means normal emitting
    const bufferingFlag$ = toggle.pipe(scan(previous => !previous, false));

    const toggleBuffer$ = bufferingFlag$.pipe(
        switchMap(buffering => {
            // Idea for later: inspect a notification message here and drive dedicated
            // open/close subjects instead of reusing `toggle`.
            console.log(buffering)
            if (!buffering) {
                // Emit values normally
                return asyncSource;
            }
            // Open a buffer immediately (startWith) and close it on the next `toggle` tick
            return asyncSource.pipe(
                bufferToggle(toggle.pipe(startWith(0)), () => toggle)
            );
        })
    );

    // Subscribe to the toggled stream
    toggleBuffer$.subscribe(values => {
        console.log('Values:', values);
    });
}
- // bufferTest2()
- // bufferTest1()
// Connectivity notifications: carries the strings 'online' / 'offline'; the initial value is 'online'.
let notificationSubject = new BehaviorSubject<string>('online')
// Signal passed to bufferToggle as the opening notifier (start buffering).
let keep = new Subject<any>()
// Signal passed to bufferToggle as the closing notifier (flush the buffer).
let release = new Subject<any>()
- // bufferTest3().subscribe(values => {
- // console.log(`Values: ${values}`)
- // })
- // emulateNotification(`offline`, 3000)
- // emulateNotification(`online`, 7000)
- // emulateNotification(`offline`, 10000)
- // emulateNotification(`online`, 15000)
/**
 * Experiment 3: notification-driven buffering.
 *
 * Returns a stream that relays `asyncSource` values one by one while the latest
 * notification is anything other than 'offline', and collects them into an array
 * while it is 'offline'. The collected array is emitted when 'online' arrives.
 *
 * The keep/release signalling runs inside the returned pipeline (via `tap`)
 * instead of in a separate, never-unsubscribed subscription. Nothing listens to
 * `notificationSubject` until a caller subscribes, and the listener is torn down
 * together with the caller's subscription.
 *
 * @returns stream of single values (online) or arrays of buffered values (flushed on reconnect)
 */
function bufferTest3(): Observable<any> {
    return notificationSubject.pipe(
        // Signal first, switch second: on 'online' the active buffer has to be
        // released before switchMap unsubscribes from it, otherwise the buffered
        // values would be discarded with the inner subscription.
        tap(notif => {
            if (notif === 'online') {
                console.log(`received notification: ${notif}, releasing...`)
                release.next('release')
            }
            if (notif === 'offline') {
                console.log(`received notification: ${notif}, buffering...`)
                keep.next('keep')
            }
        }),
        map(notif => notif === 'offline'),
        switchMap(isOffline => {
            if (!isOffline) {
                // Emit values normally
                return asyncSource
            }
            // startWith(0) opens a buffer as soon as offline mode is entered;
            // `release` closes it and emits the collected values.
            return asyncSource.pipe(
                bufferToggle(keep.pipe(startWith(0)), () => release)
            )
        })
    )
}
/**
 * Schedules a fake connectivity notification.
 *
 * @param status value pushed into `notificationSubject` (the experiments react to 'online' / 'offline')
 * @param delay  milliseconds to wait before pushing the value
 */
function emulateNotification(status: string, delay: number) {
    const publish = () => {
        notificationSubject.next(status)
    }
    setTimeout(publish, delay)
}
- // working version
/**
 * Experiment 4: notification-driven buffering with state logging.
 *
 * Returns a stream that relays `asyncSource` values one by one while the latest
 * notification is anything other than 'offline', and collects them into an array
 * while it is 'offline'. The collected array is emitted when 'online' arrives.
 *
 * The keep/release signalling is part of the returned pipeline (first `tap`), so
 * calling this function no longer leaves a permanent subscription on
 * `notificationSubject`; the side effects live and die with the caller's subscription.
 *
 * @returns stream of single values (online) or arrays of buffered values (flushed on reconnect)
 */
function bufferTest4(): Observable<any> {
    return notificationSubject.pipe(
        // Must run before switchMap: releasing on 'online' has to reach the active
        // buffer before switchMap unsubscribes from it.
        tap(notif => {
            console.log(`Received notification: ${notif}`);
            if (notif === 'online') {
                release.next(true); // Release buffered values
            }
            if (notif === 'offline') {
                keep.next(true); // Start buffering
            }
        }),
        // The buffering state depends only on the latest notification, so a plain
        // map is sufficient (no accumulated state is needed).
        map(notif => notif === 'offline'),
        tap(isBuffering => {
            console.log(`Buffering state changed: ${isBuffering}`);
        }),
        switchMap(isBuffering => {
            if (!isBuffering) {
                // Emit values normally
                return asyncSource;
            }
            // startWith(0) opens a buffer immediately; `release` closes it and
            // emits the collected values.
            return asyncSource.pipe(
                bufferToggle(keep.pipe(startWith(0)), () => release)
            );
        })
    );
}
// Message source for the volume experiment.
// NOTE(review): presumably an Observable of prepared messages; the meaning of the
// two arguments is defined by prepareResponseMessages - confirm there.
let messages = prepareResponseMessages(1000000, 1)

/**
 * Experiment 5: buffers everything `messages` emits and, each time a
 * 300,000 ms interval ticks, logs a running batch index with the batch size.
 * Errors from the stream are written to console.error.
 */
function bufferTest5() {
    const flushSignal = interval(300000);
    let batchIndex = 0
    messages.pipe(buffer(flushSignal)).subscribe({
        next: batch => {
            console.log(`${batchIndex++} && Buffer Length: ${batch.length}`)
        },
        error: err => console.error(err),
    })
}

bufferTest5()
|