|
@@ -1,7 +1,7 @@
|
|
|
import mongoose, { Model, Mongoose } from "mongoose";
|
|
|
import { Observable, Subject } from "rxjs";
|
|
|
import * as fs from 'fs'
|
|
|
-import { Storage } from "./query.service";
|
|
|
+import { Storage, StorageLocation } from "./query.service";
|
|
|
|
|
|
|
|
|
export class DataPrepService {
|
|
@@ -14,11 +14,17 @@ export class DataPrepService {
|
|
|
if (storage.type == `File`) {
|
|
|
this.streamFileData(storage, dataFromStorage)
|
|
|
} else {
|
|
|
- this.streamMongoData(storage, dataFromStorage)
|
|
|
+ if (storage.type != `observable`)
|
|
|
+ {
|
|
|
+ this.streamMongoData(storage as StorageLocation, dataFromStorage)
|
|
|
+ }
|
|
|
+ else{
|
|
|
+ //....
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private streamMongoData(storage: Storage, subjectStream: Subject<any>) {
|
|
|
+ private streamMongoData(storage: StorageLocation, subjectStream: Subject<any>) {
|
|
|
this.connectMongo(storage).then(() => {
|
|
|
let message: Model<any> = this.MongooseConnection.model('Message', require('../types/message.schema'))
|
|
|
let stream = message.find().cursor()
|
|
@@ -29,7 +35,7 @@ export class DataPrepService {
|
|
|
})
|
|
|
}
|
|
|
|
|
|
- private streamFileData(storage: Storage, dataFromStorage: Subject<any>) {
|
|
|
+ private streamFileData(storage: StorageLocation, dataFromStorage: Subject<any>) {
|
|
|
let data = fs.readFileSync(storage.url, 'utf-8')
|
|
|
let dataJson = JSON.parse(data)
|
|
|
let count = 0
|
|
@@ -45,7 +51,7 @@ export class DataPrepService {
|
|
|
}
|
|
|
|
|
|
// Conect to designated storage destination
|
|
|
- private async connectMongo(storage: Storage) {
|
|
|
+ private async connectMongo(storage: StorageLocation) {
|
|
|
return new Promise((resolve, reject) => {
|
|
|
try {
|
|
|
console.log(`Connecting to ${storage.url}`)
|