Explorar el Código

integrate mongoDB data query

Enzo hace 1 año
padre
commit
eab501ed10
Se han modificado 6 ficheros con 224 adiciones y 22 borrados
  1. 2 1
      package.json
  2. 69 0
      services/dataprep.service.ts
  3. 11 20
      services/query.service.ts
  4. 1 1
      test/test1.ts
  5. 103 0
      test/test2.ts
  6. 38 0
      types/message.schema.ts

+ 2 - 1
package.json

@@ -8,7 +8,8 @@
     "build": "tsc -p tsconfig.json",
     "test": "echo \"Error: no test specified\" && exit 1",
     "start": "node index.js",
-    "test1": "node test/test1.js"
+    "test1": "node test/test1.js",
+    "test2": "node test/test2.js"
   },
   "repository": {
     "type": "git",

+ 69 - 0
services/dataprep.service.ts

@@ -0,0 +1,69 @@
+import mongoose, { Model, Mongoose } from "mongoose";
+import { Observable, Subject } from "rxjs";
+import * as fs from 'fs'
+import { Storage } from "./query.service";
+
+
+export class DataPrepService {
+
+    private MongooseConnection : mongoose.Connection
+    private connectionStatus = 0
+
+    // Data preparations: Purely Observables
+    public loadObsData(storage: Storage, dataFromStorage: Subject<any>) {
+        if (storage.type == `File`) {
+            this.streamFileData(storage, dataFromStorage)
+        } else {
+            this.streamMongoData(storage, dataFromStorage)
+        }
+    }
+
+    private streamMongoData(storage: Storage, subjectStream: Subject<any>) {
+        this.connectMongo(storage).then(() => {
+            let message: Model<any> = this.MongooseConnection.model('Message', require('../types/message.schema'))
+            let stream = message.find().cursor()
+    
+            stream.on('data', (data: any) => subjectStream.next(data));
+            stream.on('error', (error) => subjectStream.error(error));
+            stream.on('end', () => subjectStream.complete());
+        })
+    }
+
+    private streamFileData(storage: Storage, dataFromStorage: Subject<any>) {
+        let data = fs.readFileSync(storage.url, 'utf-8')
+        let dataJson = JSON.parse(data)
+        let count = 0
+        const intervalId = setInterval(() => {
+            dataFromStorage.next(dataJson[count]);
+            count++;
+            if (count >= 100) {
+                clearInterval(intervalId);
+                dataFromStorage.complete();
+            }
+        }, 250)
+
+    }
+
+    // Conect to designated storage destination
+    private async connectMongo(storage: Storage) {
+        return new Promise((resolve, reject) => {
+            try {
+                console.log(`Connecting to ${storage.url}`)
+                this.MongooseConnection = mongoose.createConnection(storage.url)
+                this.connectionStatus = 1
+                resolve(this.connectionStatus)
+            }
+            catch(error) {
+                this.connectionStatus = 0
+                console.error('An error occurred while connecting to the database:', error);
+                setTimeout(() => {
+                    this.connectMongo(storage).then(() => {
+                        resolve(this.connectionStatus)
+                    })
+                    console.log(`Reconnecting...`)
+                }, 3000);
+            }
+        })
+    }
+}
+

+ 11 - 20
services/query.service.ts

@@ -1,32 +1,24 @@
 import * as fs from 'fs'
 import { _, isObject, get } from 'lodash'
 import { Observable, Subject, interval, map, of } from 'rxjs'
+import { DataPrepService } from './dataprep.service'
 
 export class queryService {
 
+    private dataPrepService : DataPrepService
+    constructor(){
+        this.dataPrepService = new DataPrepService()
+    }
+
     public query(storageAddress: Storage, ...conditions: Conditions[]): Observable<any> {
         let dataFromStorage: Subject<any> = new Subject()
         let filteredResult: Subject<any> = new Subject()
-        this.loadObsData(storageAddress.address, dataFromStorage)
+        this.dataPrepService.loadObsData(storageAddress, dataFromStorage)
         this.filterFromObs(dataFromStorage, filteredResult, ...conditions)
         return filteredResult.pipe()
     }
 
-    // Data preparations: Purely Observables
-    private loadObsData(location: string, dataFromStorage: Subject<any>) {
-        //  Temporary version. More defined design will be implemented to cater for different storage locations
-        let data = fs.readFileSync(location, 'utf-8')
-        let dataJson = JSON.parse(data)
-        let count = 0
-        const intervalId = setInterval(() => {
-            dataFromStorage.next(dataJson[count]);
-            count++;
-            if (count >= 100) {
-                clearInterval(intervalId);
-                dataFromStorage.complete();
-            }
-        }, 250)
-    }
+    
 
     // Search and Filter: Pure Observables
     private filterFromObs(dataFromStorage: Subject<any>, filteredResult: Subject<any>, ...conditions: Conditions[]) {
@@ -35,7 +27,7 @@ export class queryService {
                 if (this.filterByKeyValue(element, ...conditions)) {
                     filteredResult.next(element)
                 } else {
-                    // console.log(`${element.header.messageName} does not match search criteria`)
+                    console.log(`${element.appData.msgId} does not match search criteria`)
                 }
             }
         })
@@ -171,7 +163,6 @@ export class queryService {
 export interface Conditions {
     $regex?: string,
     $dateRange?: DateRange,
-    $msgTag?: string[],
     [key: string]: string | Date | DateRange | string[]
 }
 export interface DateRange {
@@ -181,5 +172,5 @@ export interface DateRange {
 }
 export interface Storage {
     type: string,
-    address: string
-}
+    url: string
+}

+ 1 - 1
test/test1.ts

@@ -12,7 +12,7 @@ let query = new queryService()
 //For now just local file storage. More will be preapred in the design phase later.
 let storageAddress: Storage = {
     type: "File",
-    address: "payload.json"
+    url: "payload.json"
 }
 
 // Array inquiry: should return mutiple data

+ 103 - 0
test/test2.ts

@@ -0,0 +1,103 @@
+/* the key is to do it in one line. Client just pass 2 arguments, one is the location of the data, which could be file, sql or mongodb, and also
+pass in the conditions of their search enquiries. We will aslo have to cater to different file storage location to determine how to prep the
+data to be filtered
+ */
+import { Observable } from "rxjs"
+import { queryService } from "../services/query.service"
+import { Conditions, Storage } from "../services/query.service"
+import { _, isObject } from 'lodash'
+
+let query = new queryService()
+
+let mongoStorage : any = {
+    type: `MongoDB`,
+    url: `mongodb://192.168.100.59:27017/fromEnzo`
+}
+
+// Array inquiry: should return mutiple data
+let conditions1: Conditions[] = [
+    { 'msgTag': ['Incoming'] }
+]
+
+// Basic inquiry, but with multi search: should return one data
+let conditions2: Conditions[] = [
+    { "msgId": "4f710c4b-a258-4c7e-a4b6-6095bb7028e9" },
+    { "msgLogDateTime": "2023-01-14T21:50:19.917Z" },
+]
+
+// Value only argument! : should return one data
+let conditions3: Conditions[] = [
+    { "$regex": "cum incidunt maxime voluptatibus" }
+]
+
+// Date Range inquiry: Should return 1 data
+let conditions4: Conditions[] = [
+    {
+        "$dateRange": {
+            "startDate": "2022-04-29T00:00:00.000Z",
+            'endDate': "2022-04-30T00:00:00.000Z",
+            'column': "data.data.appData.msgDateTime"
+        }
+    },
+]
+
+// Multi conditions except for regex search: Should return at least 1 data
+let conditions5: Conditions[] = [
+    {
+        "$dateRange": {
+            "startDate": "2022-04-29T00:00:00.000Z",
+            'endDate': "2022-04-30T00:00:00.000Z",
+            'column': "data.data.appData.msgDateTime"
+        }
+    },
+    { 'msgTag': ['basic'] },
+    { "msgId": "4f710c4b-a258-4c7e-a4b6-6095bb7028e9" },
+    { "msgLogDateTime": "2023-01-14T21:50:19.917Z" }
+]
+
+// Ultimate search. With all conditions piling at once: Should at least returns 1 data
+let conditions6: Conditions[] = [
+    {
+        "$dateRange": {
+            "startDate": "2022-04-29T00:00:00.000Z",
+            'endDate': "2022-04-30T00:00:00.000Z",
+            'column': "data.data.appData.msgDateTime"
+        }
+    },
+    { "$regex": "maxime voluptatibus ad quasi eveniet" },
+    { 'msgTag': ['basic'] },
+    { "msgId": "4f710c4b-a258-4c7e-a4b6-6095bb7028e9" },
+]
+
+// should return 1 data
+let conditions7: Conditions[] = [
+    {
+        "$dateRange": {
+            "startDate": "2022-04-29T00:00:00.000Z",
+            'endDate': "2022-04-30T00:00:00.000Z",
+            'column': "data.data.appData.msgDateTime"
+        }
+    },
+    { "$regex": "maxime voluptatibus ad quasi eveniet" },
+    // { 'data.data.appData.msgTag': ['basic'] },
+    { "data.data.appData.msgId": "4f710c4b-a258-4c7e-a4b6-6095bb7028e9" },
+    { "msgLogDateTime": "2023-01-14T21:50:19.917Z" }
+]
+// should not return anything
+let conditions8: Conditions[] = [
+    {
+        "$dateRange": {
+            "startDate": "2022-04-29T00:00:00.000Z",
+            'endDate': "2022-04-30T00:00:00.000Z",
+            'column': "data.data.appData.msgDateTime"
+        }
+    },
+    { "$regex": "maxime voluptatibus ad quasi eveniet" },
+    { 'msgTag': ['basic'] },
+    { "data.data.appDatamsgId": "4f710c4b-a258-4c7e-a4b6-6095bb7028e9" },
+    { "header.msgLogDateTime": "2023-01-14T21:50:19.917Z" }
+]
+
+query.query(mongoStorage, ...conditions1).subscribe((element) => {
+    console.log(element.appData.msgId)
+})

+ 38 - 0
types/message.schema.ts

@@ -0,0 +1,38 @@
+import mongoose from 'mongoose';
+const { Schema } = mongoose;
+
+const appData = {
+    msgId: {
+        type: String,
+        required: true,
+    },
+    msgLogDateTime: {
+        type: Date,
+        required: true,
+        default: () => Date.now()
+    },
+    msgDateTime: {
+        type: Date,
+        required: true,
+        default: () => Date.now()
+    },
+    msgTag: [String],
+    msgPayload: {
+        type: String,
+        required: true
+    }
+}
+
+const appDataSchema = new mongoose.Schema(
+    appData
+)
+const messageSchema = new mongoose.Schema({
+    appLogLocId: {
+        type: String,
+        ref: `appLogLoc`,
+        required: true
+    },
+    appData: appData
+});
+
+module.exports = messageSchema