dataprep.service.ts 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. import mongoose, { Model, Mongoose } from "mongoose";
  2. import { Observable, Subject } from "rxjs";
  3. import * as fs from 'fs'
  4. import { StorageLocation } from "../types/interface";
  5. export class DataPrepService {
  6. private MongooseConnection : mongoose.Connection
  7. private connectionStatus = 0
  8. // Data preparations: Purely Observables
  9. public loadObsData(storage: StorageLocation, dataFromStorage: Subject<any>) {
  10. if (storage.type == `File`) {
  11. this.streamFileData(storage, dataFromStorage)
  12. } else {
  13. this.streamMongoData(storage, dataFromStorage)
  14. }
  15. }
  16. private streamMongoData(storage: StorageLocation, subjectStream: Subject<any>) {
  17. this.connectMongo(storage).then(() => {
  18. let message: Model<any> = this.MongooseConnection.model('Message', require('../types/message.schema'))
  19. let stream = message.find().cursor()
  20. stream.on('data', (data: any) => subjectStream.next(data));
  21. stream.on('error', (error) => subjectStream.error(error));
  22. stream.on('end', () => subjectStream.complete());
  23. })
  24. }
  25. private streamFileData(storage: StorageLocation, dataFromStorage: Subject<any>) {
  26. let data = fs.readFileSync(storage.url, 'utf-8')
  27. let dataJson = JSON.parse(data)
  28. let count = 0
  29. const intervalId = setInterval(() => {
  30. dataFromStorage.next(dataJson[count]);
  31. count++;
  32. if (count >= 100) {
  33. clearInterval(intervalId);
  34. dataFromStorage.complete();
  35. }
  36. }, 100)
  37. }
  38. // Conect to designated storage destination
  39. private async connectMongo(storage: StorageLocation) {
  40. return new Promise((resolve, reject) => {
  41. try {
  42. console.log(`Connecting to ${storage.url}`)
  43. this.MongooseConnection = mongoose.createConnection(storage.url)
  44. this.connectionStatus = 1
  45. resolve(this.connectionStatus)
  46. }
  47. catch(error) {
  48. this.connectionStatus = 0
  49. console.error('An error occurred while connecting to the database:', error);
  50. setTimeout(() => {
  51. this.connectMongo(storage).then(() => {
  52. resolve(this.connectionStatus)
  53. })
  54. console.log(`Reconnecting...`)
  55. }, 3000);
  56. }
  57. })
  58. }
  59. }