|
@@ -1,45 +1,91 @@
|
|
|
/* ----------------------- TEST4 {Mongo to Mongo} ----------------------- */
|
|
|
-/* Same with test 3 but this one will be working with CDMS or any other potential data. */
|
|
|
+/* Same with test 3 but this one it will be working with CDMS or any other potential data.
|
|
|
+Highly advisable to refer to test3c for the overall explanation of the logic flow in these
|
|
|
+test cases. Test4 is an adjusted version of test3 to cater for the need to deal with
|
|
|
+different types of data aside from messageLogs. */
|
|
|
import * as mongoose from 'mongoose'
|
|
|
import { Observable, map, Subject, takeUntil, take, of, timer, from } from "rxjs";
|
|
|
import { ErrorTrigger, MessageSynchronisationServiceSetting } from "../type/datatype";
|
|
|
import { StreamingService } from "./test-streamOBS";
|
|
|
import { MessageAuditorService } from "../services/message-auditor.service";
|
|
|
-import { LoggingService } from "../dependencies/fisloggingservice/interface/export";
|
|
|
-import { BaseMessage, ResponseMessage } from "../dependencies/fisappmessagejsutilty/interface/export";
|
|
|
-import { LogSetting } from "../dependencies/fisloggingservice/type/datatype";
|
|
|
-
|
|
|
-/* Pre - Defined Data && Settings */
|
|
|
-// This service will stream the messages from the local testRequest.json messages
|
|
|
-// into the designated location that will be specified later.
|
|
|
-const stream = new StreamingService()
|
|
|
-
|
|
|
-/* Using the instance of the streaming declared earlier, we feed 4 messages into the
|
|
|
-subscribers that are going to subsscribe to this source_payload. Please note that
|
|
|
-source_payload will emite the messages stream from the instance of stream service
|
|
|
-and further feed them into the other Subject which is called source_payload_subject. */
|
|
|
+import { LoggingService } from '../dependencies/log/interface/export';
|
|
|
+import { BaseMessage } from '../dependencies/msgutil/interface/export';
|
|
|
+import { LogSetting, MessageLog } from '../dependencies/log/type/datatype';
|
|
|
+import * as fs from "fs"
|
|
|
+
|
|
|
+/* Convert all the non message data in the database into messageLog type. This is to ensure it's compatibility
|
|
|
+to be used by the interface from logging and audit message features. */
|
|
|
+const Schema = mongoose.Schema;
|
|
|
+// Create the fingerprint schema. This is the type of the data to be transformed into messageLog type
|
|
|
+const fingerPrintSchema = new Schema({
|
|
|
+ uuid: { type: String, required: true, lowercase: true, unique: true },
|
|
|
+ fileName: { type: String, required: true, lowercase: true },
|
|
|
+ fileType: { type: String, required: true, lowercase: true },
|
|
|
+ entityName: { type: String, required: true, lowercase: true },
|
|
|
+ fileData: { type: Object, required: true },
|
|
|
+});
|
|
|
+
|
|
|
+// Use existing schema.
|
|
|
+const messageSchema = require('../dependencies/log/type/schemas/message.schema')
|
|
|
+
|
|
|
+function convertDataInMongo(url: string) {
|
|
|
+ // Create a subject to stream data received from query at mongo
|
|
|
+ let data: Subject<any> = new Subject()
|
|
|
+ let convertService = new LoggingService()
|
|
|
+
|
|
|
+ let dbConnection = mongoose.createConnection(url)
|
|
|
+ let dataModel = dbConnection.model('genericdata', fingerPrintSchema)
|
|
|
+ let messages = dbConnection.model('message', messageSchema)
|
|
|
+
|
|
|
+ // Once the data is queried, it will be streamed into the data Subject declared earlier
|
|
|
+ dataModel.find().then((res) => {
|
|
|
+ // console.log(res)
|
|
|
+ res.forEach((element) => {
|
|
|
+ data.next(element)
|
|
|
+ })
|
|
|
+ })
|
|
|
+
|
|
|
+ // Assign a `handler` so to speak to handle the element receivd in the data Subject
|
|
|
+ // This is where the transformation happens. The logic is written on the logging service side.
|
|
|
+ // Once that is done, the transformed data will be saved again bacn in the mongo database in a different databse/collection
|
|
|
+ data.subscribe((element) => {
|
|
|
+ let res = convertService.convertCDMStoMessageLog(element, settings.incomingSource.tags)
|
|
|
+ console.log(`Converting fingerprint .... ${res.appData.msgId}`)
|
|
|
+ messages.create(res)
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+let dbConnection = mongoose.createConnection("mongodb+srv://testDB:h1nt1OyXw6QeUnzS@cluster0.29sklte.mongodb.net/secondary")
|
|
|
+let dataModel = dbConnection.model('genericdata', fingerPrintSchema)
|
|
|
+
|
|
|
+function convertMessageLogToCDMS(args: MessageLog){
|
|
|
+ let converted = secondary_log.convertMessageLogtoCDMS(args)
|
|
|
+ dataModel.create(converted)
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+/* For basic explanation, pleas refer to test3c. Here we are just instantiating audit and logging service for both
|
|
|
+the primary and the secondary soures. And then the instantiation of the corresponding subjects.
|
|
|
+The idea is that the subject will receive the missing info provided by the auditor and then log the
|
|
|
+missing data in the designated database location.
|
|
|
+ */
|
|
|
const primary_sync = new MessageAuditorService()
|
|
|
const primary_Log = new LoggingService()
|
|
|
-const primary: Subject<BaseMessage> = new Subject()
|
|
|
-primary.subscribe((e) => {
|
|
|
- console.log(`Primary Received ${e.header.messageID}`)
|
|
|
+const primary: Subject<MessageLog> = new Subject()
|
|
|
+primary.subscribe((element) => {
|
|
|
+ console.log(`Primary Received ${element.appData.msgId}`)
|
|
|
})
|
|
|
|
|
|
-/* Same thing as the above. The only difference is the we feed only 2 messages
|
|
|
-to simulate streaming error. We want to see if it will sync the other 2 later
|
|
|
-on. But generall the declarative structure is the same as the above. */
|
|
|
const secondary_log = new LoggingService()
|
|
|
-const secondary: Subject<BaseMessage> = new Subject()
|
|
|
-secondary.subscribe((e) => {
|
|
|
- console.log(`Secondary Received ${e.header.messageID}`)
|
|
|
+const secondary: Subject<MessageLog> = new Subject()
|
|
|
+secondary.subscribe((element: MessageLog) => {
|
|
|
+ console.log(`Secondary Received ${element.appData.msgId}`)
|
|
|
+ convertMessageLogToCDMS(element)
|
|
|
})
|
|
|
|
|
|
|
|
|
-/* Declare the designated database. I am using windev's mongo storage to store the data.
|
|
|
-Hence here, is the block that definte the target and it's associated specifications.
|
|
|
-This will be the target and will receive the predefined set of data to be logged as
|
|
|
-prepared earlier in the code above.s */
|
|
|
-let publisher_storage: LogSetting = {
|
|
|
+/* For basic explanation, please refer to test3c. Declaration of the source and target location. */
|
|
|
+let primary_storage: LogSetting = {
|
|
|
cacheMessageLimit: 0,
|
|
|
storage: "MongoDB",
|
|
|
setting: {
|
|
@@ -53,11 +99,7 @@ let publisher_storage: LogSetting = {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-/* Same as above. Also declaring another designated database. But this one will be used
|
|
|
-as the target for synching. For such I purposely push only half the of the completed
|
|
|
-dataset in order to test out the sync later. I am using my own cloud atlas mongo
|
|
|
-database on this. The address can always be changed. */
|
|
|
-let subscriber_storage: LogSetting = {
|
|
|
+let secondary_storage: LogSetting = {
|
|
|
cacheMessageLimit: 0,
|
|
|
storage: "MongoDB",
|
|
|
setting: {
|
|
@@ -74,34 +116,22 @@ let subscriber_storage: LogSetting = {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-// Combine source and target storage to form MessageSynchronisationServiceSetting
|
|
|
+// Combine source and target storage to form MessageSynchronisationServiceSetting. This is required in messageAudit initialization
|
|
|
let settings: MessageSynchronisationServiceSetting = {
|
|
|
incomingSource: {
|
|
|
//all of the settings to be combined here
|
|
|
- ...publisher_storage,
|
|
|
- tags: ['Incoming']
|
|
|
+ ...primary_storage,
|
|
|
+ tags: ['Fingerprint']
|
|
|
}, //LogSetting & {tags:string[] },
|
|
|
target: {
|
|
|
- ...subscriber_storage,
|
|
|
- tags: ['Incoming']
|
|
|
+ ...secondary_storage,
|
|
|
+ tags: ['Fingerprint']
|
|
|
} //LogSetting & {tags:string[] }
|
|
|
}
|
|
|
+
|
|
|
|
|
|
/* -------- SYNCHRONIZATION --------- */
|
|
|
-// This is where the real test begin. THe process before this were merely just configuring
|
|
|
-// the settings of where to sync. Here the initial intialize data will first log the
|
|
|
-// messages into the designated database as specified earlier.
|
|
|
-function initializeData() { // To store the data into the designated databases.
|
|
|
- primary_Log.init(publisher_storage).then(() => {
|
|
|
- primary_Log.subscribe(primary) // Logging only occurs here
|
|
|
- })
|
|
|
- secondary_log.init(subscriber_storage).then(() => {
|
|
|
- secondary_log.subscribe(secondary) // Logging only occurs here
|
|
|
- })
|
|
|
-}
|
|
|
-
|
|
|
-// Done by appoximately 5-8 Seconds
|
|
|
-// initializeData() // Call the function to store the data into the designated databases.
|
|
|
+// Primary will call the syncrhonization service
|
|
|
primary_sync.init(settings)
|
|
|
|
|
|
/* This is where the synchronization logic is called. The errorSubject will act as a trigger
|
|
@@ -110,15 +140,13 @@ let errorSubject: Subject<ErrorTrigger> = new Subject()
|
|
|
// Subscribe to errorSubject notification
|
|
|
let sync = primary_sync.subscribe(errorSubject)
|
|
|
sync.subscribe({
|
|
|
- next: (msgToBeSynchronized) => {
|
|
|
- console.log(`passing missing message: ${msgToBeSynchronized.header.messageID} into target/secondary subject.`)
|
|
|
+ next: (msgToBeSynchronized: MessageLog) => {
|
|
|
+ console.log(`passing missing message: ${msgToBeSynchronized.appData.msgId} into target/secondary subject.`)
|
|
|
// the missing data returned will be pushed (next(message)) into the target payload.
|
|
|
secondary.next(msgToBeSynchronized)
|
|
|
}
|
|
|
})
|
|
|
|
|
|
-
|
|
|
-
|
|
|
// Set time oout for 5 seconds to allow the initial logging stage to complete it's logging
|
|
|
// implementation first before proceedint to trigger the sync
|
|
|
setTimeout(() => {
|
|
@@ -129,7 +157,7 @@ setTimeout(() => {
|
|
|
message: "NO. I dont want to work"
|
|
|
}
|
|
|
errorSubject.next(sampleError)
|
|
|
-}, 10000)
|
|
|
+}, 3000)
|
|
|
|
|
|
/* THis is testing for generating error message to be fed into the error subject
|
|
|
to act as additional trigger to exectute the synchronization when there's no internet
|
|
@@ -176,33 +204,11 @@ function countdown() {
|
|
|
|
|
|
countdown()
|
|
|
|
|
|
+// convertDataInMongo('mongodb://192.168.100.59:27017/primary')
|
|
|
+// convertDataInMongo('mongodb+srv://testDB:h1nt1OyXw6QeUnzS@cluster0.29sklte.mongodb.net/secondary')
|
|
|
|
|
|
-const Schema = mongoose.Schema;
|
|
|
-const fingerPrintSchema = new Schema({
|
|
|
- uuid: { type: String, required: true, lowercase: true, unique: true },
|
|
|
- fileName: { type: String, required: true, lowercase: true },
|
|
|
- fileType: { type: String, required: true, lowercase: true },
|
|
|
- entityName: { type: String, required: true, lowercase: true },
|
|
|
- fileData: { type: Object, required: true },
|
|
|
-});
|
|
|
-
|
|
|
-function convertDataInMongo(url: string) {
|
|
|
- let data: Subject<any> = new Subject()
|
|
|
- let convertService = new LoggingService()
|
|
|
-
|
|
|
- let dbConnection = mongoose.createConnection(url)
|
|
|
- let dataModel = dbConnection.model('genericdata', fingerPrintSchema)
|
|
|
- dataModel.find().then((res) => {
|
|
|
- // console.log(res)
|
|
|
- res.forEach((element) => {
|
|
|
- data.next(element)
|
|
|
- })
|
|
|
- })
|
|
|
-
|
|
|
- data.subscribe((element) => {
|
|
|
- convertService.
|
|
|
- })
|
|
|
-}
|
|
|
+// // Manually log the missing data given by audit
|
|
|
+secondary_log.init(settings.target).then(() => {
|
|
|
+ secondary_log.subscribe(secondary)
|
|
|
+})
|
|
|
|
|
|
-convertDataInMongo('mongodb+srv://testDB:h1nt1OyXw6QeUnzS@cluster0.29sklte.mongodb.net/test')
|
|
|
-convertDataInMongo('mongodb://192.168.100.59:27017/hq')
|