|
@@ -1,4 +1,3 @@
|
|
|
-import { resolve } from "path";
|
|
|
import { map, Observable, of, Subject } from "rxjs";
|
|
|
import { BaseMessage } from "../dependencies/fisappmessagejsutilty/dependencies/dependencies";
|
|
|
import { LoggingService } from "../dependencies/fisloggingservice/services/logging-service";
|
|
@@ -11,12 +10,17 @@ export class MessageSyncrhonizationService implements MessageSynchronisationServ
|
|
|
private sourceSrc: LoggingService = new LoggingService()
|
|
|
private targetSrc: LoggingService = new LoggingService()
|
|
|
|
|
|
+ /* Set up the targets or points of synchronization. This is where it will register the 2 different location of
|
|
|
+ the data to be synchronized */
|
|
|
public init(settings: MessageSynchronisationServiceSetting): void {
|
|
|
this.settings = settings;
|
|
|
}
|
|
|
|
|
|
- /* This functions will subsribe to the designated error triggers. The error will trigger the need to
|
|
|
- sync, should the user or circumstances necessitates it. */
|
|
|
+ /* This is the main interface of the message sync service. The argument will take in an observable stream of
|
|
|
+ error notifications, prompting it to perform the subscription of the targeted sources and it's corresponding
|
|
|
+ target. Essentially, this does not in synchronize, but rather it checks against the two sources and compare
|
|
|
+ and return the missing data, which will then be passed into the targeted subject stream as specified by the
|
|
|
+ respective client. They can choose how they want to handle the missing messages returned. */
|
|
|
public subscribe(obsTrigger: Observable<ErrorTrigger>): Observable<BaseMessage> {
|
|
|
// Create a subject as a means to return the missing messages if there's any.
|
|
|
let msg: Subject<BaseMessage> = new Subject()
|
|
@@ -24,6 +28,8 @@ export class MessageSyncrhonizationService implements MessageSynchronisationServ
|
|
|
// Subsribe to the errorTrigger obs to listen to any notification.
|
|
|
obsTrigger.subscribe({
|
|
|
next: obsTrigger => {
|
|
|
+ console.log(obsTrigger.message)// just checking the message
|
|
|
+
|
|
|
let missingMsg = this.dataConversion()
|
|
|
missingMsg.subscribe({
|
|
|
next: element => {
|
|
@@ -32,13 +38,15 @@ export class MessageSyncrhonizationService implements MessageSynchronisationServ
|
|
|
})
|
|
|
}
|
|
|
})
|
|
|
- if (!obsTrigger) {
|
|
|
- this.dataConversion()
|
|
|
- }
|
|
|
+ // Not sure why this piece of code is here. It generally doesn't affect the function
|
|
|
+ // if (!obsTrigger) {
|
|
|
+ // this.dataConversion()
|
|
|
+ // }
|
|
|
let result: Observable<BaseMessage> = msg.asObservable()
|
|
|
return result
|
|
|
}
|
|
|
|
|
|
+ // Need to change the data to json format first
|
|
|
private dataConversion(): Observable<BaseMessage> {
|
|
|
// let subjectOutput = this.syncrhonize()
|
|
|
let obsOutput: Observable<BaseMessage> = this.synchronize().pipe(
|
|
@@ -51,15 +59,22 @@ export class MessageSyncrhonizationService implements MessageSynchronisationServ
|
|
|
}
|
|
|
|
|
|
|
|
|
- // Returns all the missing data to be synchronized in the observables later
|
|
|
+ /* This is where the 'synching' operation takes place. */
|
|
|
private synchronize(): Subject<any> {
|
|
|
let subjectOutput = new Subject()
|
|
|
+ // Acquire the data from both location and return them as an array respectively.
|
|
|
this.acquireData().then((data: { arr1: MessageLog[], arr2: MessageLog[] }) => {
|
|
|
+ // Check for length first. If the length matches, then there's no need to sync
|
|
|
+ // since there's nothing missing.
|
|
|
if (data.arr1.length === data.arr2.length) {
|
|
|
console.log(`No synchronization needed`)
|
|
|
} else {
|
|
|
+ // In the case where there are differences in the array lengthh, then extensive comparison
|
|
|
+ // will be carried to filter out the differences. Differences are the missing data.
|
|
|
this.checkArrayDifferences(data).then((data: MessageLog[]) => {
|
|
|
data.forEach(msgElement => {
|
|
|
+ // Once the missing data has been weeded out, it is then passed into the Subject
|
|
|
+ // to be returned for the subscribe method.
|
|
|
subjectOutput.next(msgElement)
|
|
|
})
|
|
|
})
|
|
@@ -68,9 +83,10 @@ export class MessageSyncrhonizationService implements MessageSynchronisationServ
|
|
|
return subjectOutput
|
|
|
}
|
|
|
|
|
|
- // Acquires the available data from designated target and source storage
|
|
|
+ /* This is where the targeted data is queried. The process is pretty straightforward. */
|
|
|
private async acquireData(): Promise<any> {
|
|
|
const promiseQuery: Promise<any> = new Promise((resolve, reject) => {
|
|
|
+ // declare what to expect.
|
|
|
let allSets: {
|
|
|
arr1: MessageLog[],
|
|
|
arr2: MessageLog[]
|
|
@@ -81,8 +97,10 @@ export class MessageSyncrhonizationService implements MessageSynchronisationServ
|
|
|
let set1
|
|
|
let set2
|
|
|
|
|
|
+ // Initiate the source to find the location of the targeted data to be synched.
|
|
|
this.sourceSrc.init(this.settings.incomingSource).then(() => {
|
|
|
this.targetSrc.init(this.settings.target).then(() => {
|
|
|
+ // Filter also carries out the query aspect of the operation, allowing it to acquire all the relevant data.
|
|
|
this.sourceSrc.filter({ msgTag: this.settings.incomingSource.tags[0] }).then((data: MessageLog[]) => {
|
|
|
set1 = data
|
|
|
}).then(() => {
|
|
@@ -104,6 +122,9 @@ export class MessageSyncrhonizationService implements MessageSynchronisationServ
|
|
|
return new Promise((resolve, reject) => {
|
|
|
let missingMsg: MessageLog[] = []
|
|
|
args.arr1.forEach((msgElement: MessageLog) => {
|
|
|
+ // In this case, we are just checking if the msgId matches within the given the array.
|
|
|
+ // Just to save time, there's no need to check the entire message structure unless
|
|
|
+ // the circumstances necessitates it.
|
|
|
if (args.arr2.some(obj => obj.appData.msgId === msgElement.appData.msgId)) {
|
|
|
console.log(`Item Found!`)
|
|
|
} else {
|