dataprep.service.ts 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. import mongoose, { Model, Mongoose } from "mongoose";
  2. import { Observable, Subject } from "rxjs";
  3. import * as fs from 'fs'
  4. import { Storage, StorageLocation } from "./query.service";
  5. export class DataPrepService {
  6. private MongooseConnection : mongoose.Connection
  7. private connectionStatus = 0
  8. // Data preparations: Purely Observables
  9. public loadObsData(storage: Storage, dataFromStorage: Subject<any>) {
  10. if (storage.type == `File`) {
  11. this.streamFileData(storage, dataFromStorage)
  12. } else {
  13. if (storage.type != `observable`)
  14. {
  15. this.streamMongoData(storage as StorageLocation, dataFromStorage)
  16. }
  17. else{
  18. //....
  19. }
  20. }
  21. }
  22. private streamMongoData(storage: StorageLocation, subjectStream: Subject<any>) {
  23. this.connectMongo(storage).then(() => {
  24. let message: Model<any> = this.MongooseConnection.model('Message', require('../types/message.schema'))
  25. let stream = message.find().cursor()
  26. stream.on('data', (data: any) => subjectStream.next(data));
  27. stream.on('error', (error) => subjectStream.error(error));
  28. stream.on('end', () => subjectStream.complete());
  29. })
  30. }
  31. private streamFileData(storage: StorageLocation, dataFromStorage: Subject<any>) {
  32. let data = fs.readFileSync(storage.url, 'utf-8')
  33. let dataJson = JSON.parse(data)
  34. let count = 0
  35. const intervalId = setInterval(() => {
  36. dataFromStorage.next(dataJson[count]);
  37. count++;
  38. if (count >= 100) {
  39. clearInterval(intervalId);
  40. dataFromStorage.complete();
  41. }
  42. }, 250)
  43. }
  44. // Conect to designated storage destination
  45. private async connectMongo(storage: StorageLocation) {
  46. return new Promise((resolve, reject) => {
  47. try {
  48. console.log(`Connecting to ${storage.url}`)
  49. this.MongooseConnection = mongoose.createConnection(storage.url)
  50. this.connectionStatus = 1
  51. resolve(this.connectionStatus)
  52. }
  53. catch(error) {
  54. this.connectionStatus = 0
  55. console.error('An error occurred while connecting to the database:', error);
  56. setTimeout(() => {
  57. this.connectMongo(storage).then(() => {
  58. resolve(this.connectionStatus)
  59. })
  60. console.log(`Reconnecting...`)
  61. }, 3000);
  62. }
  63. })
  64. }
  65. }