import { Subject, from } from 'rxjs'; import { groupBy, mergeMap } from 'rxjs/operators'; // Sample response data const responseData = [ { who: 'user1', message: 'Hello from user1' }, { who: 'user2', message: 'Hi there from user2' }, { who: 'user1', message: 'Another message from user1' }, { who: 'user3', message: 'Message from user3' }, ]; // Create a Subject to receive the response const responseSubject = new Subject(); // Use groupBy to split the response into observables based on 'who' const groupedSubjects: Record> = {}; // Create Observables based on 'who' property const groupedObservables = responseData.map((message) => { if (!groupedSubjects[message.who]) { groupedSubjects[message.who] = new Subject(); } return groupedSubjects[message.who].asObservable(); }); // Merge all Observables into a single stream from(groupedObservables) .pipe( mergeMap((obs) => obs) ) .subscribe((message) => { console.log(`Received message from ${message.who}: ${message.message}`); }); // Push data into the responseSubject responseData.forEach((data) => { responseSubject.next(data); });