test3.ts 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. // Test Observable //
  2. import mongoose, { Model } from "mongoose";
  3. import { Subject, from, map, of, interval } from "rxjs";
  // Snapshot of this process's memory usage, taken once when the module loads.
  const used = process.memoryUsage();

  // Storage descriptor for the experiments: a kind label plus the MongoDB connection string.
  let mongoStorage: any = {
    type: `MongoDB`,
    url: `mongodb://192.168.100.59:27017/fromEnzo`,
  }

  // Connection handle assigned by connectMongo(), together with its status flag:
  // connectMongo() sets the flag to 1 once it has a connection and back to 0 on error.
  let MongooseConnection: mongoose.Connection
  let connectionStatus = 0

  // Subject meant to receive the documents read from Mongo
  // (see the commented-out streamMongoData(mongoStorage, testSubject) call).
  let testSubject = new Subject<any>()

  // Counter and buffer; in the visible part of the file only commented-out code touches them.
  let count = 0
  let data: Array<any> = []
  14. // Connect to designated storage destination
  /**
   * Opens a Mongoose connection to `storage.url`, retrying until an attempt succeeds.
   *
   * On success the connection is stored in the module-level `MongooseConnection` and
   * `connectionStatus` is set to 1. After a failed attempt `connectionStatus` is reset
   * to 0, the error is logged, and the next attempt starts 3 seconds later.
   *
   * NOTE(review): `Storage` is not declared in the visible part of this file — confirm
   * which type it resolves to. Only its `url` property is read here.
   *
   * @param storage Storage descriptor; `storage.url` is used as the MongoDB connection string.
   * @returns A promise resolving with `connectionStatus` (1) once connected. It does not
   *          reject: failures are logged and retried.
   */
  async function connectMongo(storage: Storage): Promise<number> {
    const retryDelayMs = 3000
    while (true) {
      try {
        console.log(`Connecting to ${storage.url}`)
        const connection = mongoose.createConnection(storage.url)
        // createConnection() hands back the Connection object straight away, before the
        // connect has finished. Waiting on asPromise() makes a failed connect land in the
        // catch block below instead of being reported as a success.
        await connection.asPromise()
        MongooseConnection = connection
        connectionStatus = 1
        return connectionStatus
      }
      catch (error) {
        connectionStatus = 0
        console.error('An error occurred while connecting to the database:', error);
        console.log(`Reconnecting...`)
        // Pause before the next attempt so an unreachable server is not hammered.
        await new Promise<void>((resolve) => setTimeout(resolve, retryDelayMs))
      }
    }
  }
  35. // Acquire data from Mongo
  /**
   * Connects to MongoDB via connectMongo() and pushes documents of the `Message` model
   * into `subjectStream`.
   *
   * The query is limited to 10 documents. Each document read from the cursor is emitted
   * with `next`, a cursor error is forwarded with `error`, and the subject is completed
   * when the cursor ends. A failure while setting the stream up (loading the schema,
   * building the model, opening the cursor) is forwarded with `error` as well, so that
   * subscribers are notified instead of the rejection going unhandled.
   *
   * @param storage Storage descriptor passed through to connectMongo().
   * @param subjectStream Subject that receives the documents and the terminal notification.
   */
  function streamMongoData(storage: Storage, subjectStream: Subject<any>) {
    connectMongo(storage)
      .then(() => {
        // The schema is loaded from ../types/message.schema at call time.
        let message: Model<any> = MongooseConnection.model('Message', require('../types/message.schema'))
        let stream = message.find().limit(10).cursor()
        stream.on('data', (doc: any) => subjectStream.next(doc));
        stream.on('error', (error) => subjectStream.error(error));
        stream.on('end', () => subjectStream.complete());
      })
      .catch((error) => subjectStream.error(error))
  }
  45. /* --------------- TEST --------------- */
  46. // testSubject.subscribe({
  47. // next: (e) => {
  48. // setTimeout(() => {
  49. // count++
  50. // console.log(count + '. ' + e.appData.msgId) // No problem streaming all the elements from mongo streaming (2.9 million)
  51. // }, 10000) // But it doesn't wait 10 seconds
  52. // }
  53. // });
  /**
   * Picks a random whole number of seconds between 1 and 10 (inclusive) and, once that
   * many seconds have elapsed, logs a line naming the delay and `element`.
   *
   * The call itself returns immediately and returns nothing; only the log line is deferred.
   *
   * @param element Value to mention in the log line (interpolated into the message).
   */
  function doThis(element: any) {
    const shortestWait = 1;
    const longestWait = 10;
    const spread = longestWait - shortestWait + 1;
    const seconds = shortestWait + Math.floor(Math.random() * spread);

    const report = () => {
      // console.log(`Doing random task for 3 seconds for ${element.appData.msgId}`)
      console.log(`Doing random task for ${seconds} seconds for "${element}"`)
    };

    setTimeout(report, seconds * 1000)
  }
  /**
   * Simulates an asynchronous task: picks a random whole number of seconds between 1 and
   * 10 (inclusive), waits that long, logs a line naming `element` and the delay, and then
   * resolves with the same `element`.
   *
   * @param element Value to "process"; it is interpolated into the log line and handed back unchanged.
   * @returns A promise that resolves with `element` after the delay. It never rejects.
   */
  function doThat<T>(element: T): Promise<T> {
    const min = 1;
    const max = 10;
    const randomInt = Math.floor(Math.random() * (max - min + 1)) + min;
    // setTimeout is callback-based, so it is wrapped in a promise here.
    return new Promise<T>((resolve) => {
      setTimeout(() => {
        // console.log(`Processing ${element.appData.msgId} for 3 seconds`)
        console.log(`Processing "${element}" for ${randomInt} seconds`)
        resolve(element)
      }, randomInt * 1000)
    })
  }
  75. /* -------------- ACTION ------------------- */
  76. // streamMongoData(mongoStorage, testSubject)
  77. // testSubject.subscribe({
  78. // next(element) {
  79. // doThat(element).then((data) => {
  80. // doThis(data)
  81. // })
  82. // },
  83. // error(err) {
  84. // console.error('something wrong occurred: ' + err);
  85. // },
  86. // complete() {
  87. // console.log('done');
  88. // },
  89. // })
  90. /* --------- Just TESTING and understanding behaviour ---------- */
  91. /* Explanation: The producer emits at a constant rate of one value per second. Emitting the values continues
  92. without interruption, whereas the callbacks doThis() and doThat() are registered and called for each value.
  93. If the callback is asynchronous, its work is queued on the event loop to be executed, and it doesn't have to wait for the previous
  94. tasks to complete before it can start its own task.
  95. If the callback is synchronous, its work is likewise queued on the event loop to be executed. It doesn't wait for previous
  96. tasks to be completed before it can start its own task. */
  // Producer for the experiment: an interval observable created with a 1000 ms period.
  let waitForOneSecond = interval(1000)

  // Log every emitted value, then hand it to one of the two task helpers.
  waitForOneSecond.subscribe({
    next: (tick) => {
      console.log(`Broadcasting: ${tick}`)

      // Asynchronous variant:
      // doThat(tick).then((data) => {
      //   doThis(data)
      // })

      // Synchronous variant:
      doThis(tick)
    },
  })
  109. /* Ignore this for now; I just wanted to see how it behaves if the observable is created from arrays. The
  110. values were published too fast, so I couldn't derive any meaningful understanding from the data. Please refer to the
  111. study case above. */
  112. // let obsArray = from([1, 2, 3, 4, 5])
  113. // obsArray.subscribe({
  114. // next(element) {
  115. // console.log(`Emmitting: ${element}`)
  116. // doThat(element).then((data) => {
  117. // doThis(data)
  118. // })
  119. // }
  120. // })
  121. // of(1, 2, 3, 4, 5).subscribe({
  122. // next(element) {
  123. // console.log(`Streaming: ${element}`)
  124. // doThat(element).then((data) => {
  125. // doThis(data)
  126. // })
  127. // }
  128. // })