test.ts 1.1 KB

1234567891011121314151617181920212223242526272829303132333435363738
  1. import { Subject, from } from 'rxjs';
  2. import { groupBy, mergeMap } from 'rxjs/operators';
  3. // Sample response data
  4. const responseData = [
  5. { who: 'user1', message: 'Hello from user1' },
  6. { who: 'user2', message: 'Hi there from user2' },
  7. { who: 'user1', message: 'Another message from user1' },
  8. { who: 'user3', message: 'Message from user3' },
  9. ];
  10. // Create a Subject to receive the response
  11. const responseSubject = new Subject();
  12. // Use groupBy to split the response into observables based on 'who'
  13. const groupedSubjects: Record<string, Subject<any>> = {};
  14. // Create Observables based on 'who' property
  15. const groupedObservables = responseData.map((message) => {
  16. if (!groupedSubjects[message.who]) {
  17. groupedSubjects[message.who] = new Subject();
  18. }
  19. return groupedSubjects[message.who].asObservable();
  20. });
  21. // Merge all Observables into a single stream
  22. from(groupedObservables)
  23. .pipe(
  24. mergeMap((obs) => obs)
  25. )
  26. .subscribe((message) => {
  27. console.log(`Received message from ${message.who}: ${message.message}`);
  28. });
  29. // Push data into the responseSubject
  30. responseData.forEach((data) => {
  31. responseSubject.next(data);
  32. });