瀏覽代碼

audit fixes and update

Enzo 1 年之前
父節點
當前提交
f63db81ef3

+ 1 - 1
dependencies/fisloggingservice

@@ -1 +1 @@
-Subproject commit 554c435043bb29595caa87aefe6015ce135a1cbf
+Subproject commit 37455fe1ef62f7ef1428f859f789f82d9e8df4a9

+ 36 - 0
documentations/auditmessage.plantuml

@@ -0,0 +1,36 @@
+@startuml MessageAuditor
+header Message MessageAuditor
+left to right direction
+
+package 1 {
+    [Publisher]
+    database "interface log location 1" as database1
+}
+package 2 {
+    database "interface log location 2" as database2
+    [Consumer]
+}
+[MessageAuditor]
+
+database1 <|-- Publisher #line:red;line.bold;text:Red : Publish into \ndesignated \nlog location 
+database2 <|-- Consumer #line:red;line.bold;text:Red : Publish into \ndesignated \nlog location
+Publisher --|> Consumer #green;line.dashed;text:green : Publish \nSubscribed \nData 
+Publisher <-- Consumer : Subscribe \nfor \ndata
+Error -> MessageAuditor
+
+
+database1 <--- MessageAuditor : Check and compare \nrelevant log location 
+MessageAuditor --> database2 : Check and compare \nrelevant log location 
+MessageAuditor --|> Publisher #blue;line.dotted;text:blue : Publish missing data
+
+
+@enduml
+'  Make changes to the line publish missing data
+
+/' Brief Summary :
+    When Error is emitted from relevant party, then the message auditor that
+    is subcribed to it will receive the message and perform the checking.
+    It will cross check against the two database to see what's missing, and
+    then will publish to the publisher for the missing data. The publisher 
+    will then include it in it's own stream.
+ '/

+ 1 - 1
interface/export.ts

@@ -1,6 +1,6 @@
 
 // All service classes to be exported will be set here.
 export * from "../services/acknowledgement.service"
-export * from "../services/synchronization.service"
+export * from "../services/message-auditor.service"
 export * from "../services/incomingMessage.service"
  

+ 3 - 0
services/acknowledgement.service.ts

@@ -7,6 +7,9 @@ import { LogSetting, MessageLog } from "../dependencies/fisloggingservice/type/d
 import { Acknowledgemeent, AcknowledgementLogSetting } from "../type/acknowledgement.interface";
 
 
+/**
+ * @deprecated The acknowledgement will be covered by MessageAuditorService.
+ */
 export class AcknowledgementService implements Acknowledgemeent {
 
     private messageUtil: FisCreateMessageUtility = new FisCreateMessageUtility("FisAppID/Name")

+ 3 - 0
services/incomingMessage.service.ts

@@ -4,6 +4,9 @@ import { LoggingService } from "../dependencies/fisloggingservice/services/loggi
 import { LogSetting, MessageLog } from "../dependencies/fisloggingservice/type/datatype";
 import { IncomingMessageServiceInterface } from "../type/datatype";
 
+/**
+ * @deprecated The logging is now supported by the Fis-Logging library.
+ */
 export class IncomingMessageService implements IncomingMessageServiceInterface {
 
     constructor(private logService?: LoggingService) {

+ 3 - 4
services/synchronization.service.ts → services/message-auditor.service.ts

@@ -1,11 +1,10 @@
 import { map, Observable, of, Subject } from "rxjs";
 import { BaseMessage } from "../dependencies/fisappmessagejsutilty/dependencies/dependencies";
 import { LoggingService } from "../dependencies/fisloggingservice/services/logging-service";
-import { MessageLog } from "../dependencies/fisloggingservice/type/datatype";
-import { MessageSynchronisationServiceInterface, MessageSynchronisationServiceSetting, ErrorTrigger } from "../type/datatype";
-
-export class MessageSyncrhonizationService implements MessageSynchronisationServiceInterface {
+import { LogSetting, MessageLog } from "../dependencies/fisloggingservice/type/datatype";
+import { ErrorTrigger, MessageAuditorServiceInterface, MessageSynchronisationServiceSetting } from "../type/datatype";
 
+export class MessageAuditorService implements MessageAuditorServiceInterface {
     private settings: MessageSynchronisationServiceSetting
     private sourceSrc: LoggingService = new LoggingService()
     private targetSrc: LoggingService = new LoggingService()

+ 2 - 2
test/test1a.ts

@@ -1,13 +1,13 @@
 import { AcknowledgementService } from "../services/acknowledgement.service";
 import { StreamingService } from "./test-streamOBS";
-import { MessageSyncrhonizationService } from "../services/synchronization.service";
+import { MessageAuditorService } from "../services/message-auditor.service";
 import { IncomingMessageService } from "../services/incomingMessage.service";
 import { map, Observable } from "rxjs";
 import { LogSetting, MessageLog } from "../dependencies/fisloggingservice/type/datatype";
 import { BaseMessage } from "../dependencies/fisappmessagejsutilty/dependencies/dependencies";
 const incoming = new IncomingMessageService()
 const acknowledge = new AcknowledgementService()
-const syncrhonize = new MessageSyncrhonizationService()
+const syncrhonize = new MessageAuditorService()
 const streamService = new StreamingService()
 
 /* --------------  TEST -------------------- */

+ 2 - 2
test/test1b.ts

@@ -1,13 +1,13 @@
 import { AcknowledgementService } from "../services/acknowledgement.service";
 import { StreamingService } from "./test-streamOBS";
-import { MessageSyncrhonizationService } from "../services/synchronization.service";
+import { MessageAuditorService } from "../services/message-auditor.service";
 import { IncomingMessageService } from "../services/incomingMessage.service";
 import { map, Observable } from "rxjs";
 import { LogSetting, MessageLog } from "../dependencies/fisloggingservice/type/datatype";
 import { BaseMessage } from "../dependencies/fisappmessagejsutilty/dependencies/dependencies";
 const incoming = new IncomingMessageService()
 const acknowledge = new AcknowledgementService()
-const syncrhonize = new MessageSyncrhonizationService()
+const syncrhonize = new MessageAuditorService()
 const streamService = new StreamingService()
 
 /* --------------  TEST -------------------- */

+ 2 - 2
test/test2a.ts

@@ -1,12 +1,12 @@
 import { AcknowledgementService } from "../services/acknowledgement.service";
 import { StreamingService } from "./test-streamOBS";
-import { MessageSyncrhonizationService } from "../services/synchronization.service";
+import { MessageAuditorService } from "../services/message-auditor.service";
 import { IncomingMessageService } from "../services/incomingMessage.service";
 import { map, Observable, take } from "rxjs";
 import { LogSetting, MessageLog } from "../dependencies/fisloggingservice/type/datatype";
 import { BaseMessage } from "../dependencies/fisappmessagejsutilty/dependencies/dependencies";
 const incoming = new IncomingMessageService()
-const syncrhonize = new MessageSyncrhonizationService()
+const syncrhonize = new MessageAuditorService()
 const acknowledge = new AcknowledgementService()
 const streamService = new StreamingService()
 

+ 2 - 2
test/test2b.ts

@@ -1,12 +1,12 @@
 import { AcknowledgementService } from "../services/acknowledgement.service";
 import { StreamingService } from "./test-streamOBS";
-import { MessageSyncrhonizationService } from "../services/synchronization.service";
+import { MessageAuditorService } from "../services/message-auditor.service";
 import { IncomingMessageService } from "../services/incomingMessage.service";
 import { map, Observable, take } from "rxjs";
 import { LogSetting, MessageLog } from "../dependencies/fisloggingservice/type/datatype";
 import { BaseMessage } from "../dependencies/fisappmessagejsutilty/dependencies/dependencies";
 const incoming = new IncomingMessageService()
-const syncrhonize = new MessageSyncrhonizationService()
+const syncrhonize = new MessageAuditorService()
 const acknowledge = new AcknowledgementService()
 const streamService = new StreamingService()
 

+ 2 - 2
test/test3a.ts

@@ -7,7 +7,7 @@ import { Observable, map, Subject, takeUntil, take, of, timer, from } from "rxjs
 import { BaseMessage } from "../dependencies/fisappmessagejsutilty/dependencies/dependencies";
 import { LogSetting, MessageLog } from "../dependencies/fisloggingservice/type/datatype";
 import { IncomingMessageService } from "../services/incomingMessage.service";
-import { MessageSyncrhonizationService } from "../services/synchronization.service";
+import { MessageAuditorService } from "../services/message-auditor.service";
 import { ErrorTrigger, MessageSynchronisationServiceSetting } from "../type/datatype";
 import { StreamingService } from "./test-streamOBS";
 
@@ -15,7 +15,7 @@ import { StreamingService } from "./test-streamOBS";
 const stream = new StreamingService()
 
 // Declare source Services && Observables (Using File Storage) Simulating Full Logs
-const source_synchronize = new MessageSyncrhonizationService()
+const source_synchronize = new MessageAuditorService()
 const source_payload: Observable<BaseMessage> = stream.stream().pipe(take(4))
 const source_incoming = new IncomingMessageService()
 const source_payload_subject: Subject<BaseMessage> = new Subject()

+ 2 - 2
test/test3b.ts

@@ -8,7 +8,7 @@ import { BaseMessage } from "../dependencies/fisappmessagejsutilty/dependencies/
 import { LogSetting, MessageLog } from "../dependencies/fisloggingservice/type/datatype";
 import { AcknowledgementService } from "../services/acknowledgement.service";
 import { IncomingMessageService } from "../services/incomingMessage.service";
-import { MessageSyncrhonizationService } from "../services/synchronization.service";
+import { MessageAuditorService } from "../services/message-auditor.service";
 import { ErrorTrigger, MessageSynchronisationServiceSetting } from "../type/datatype";
 import { StreamingService } from "./test-streamOBS";
 
@@ -16,7 +16,7 @@ import { StreamingService } from "./test-streamOBS";
 const stream = new StreamingService()
 
 // Declare source Services && Observables (Using File Storage) Simulating Full Logs
-const source_synchronize = new MessageSyncrhonizationService()
+const source_synchronize = new MessageAuditorService()
 const source_payload: Observable<BaseMessage> = stream.stream().pipe(take(2))
 const source_incoming = new IncomingMessageService()
 const source_payload_subject: Subject<BaseMessage> = new Subject()

+ 28 - 40
test/test3c.ts

@@ -1,15 +1,15 @@
-/*  -----------------------       TEST3A {Mongo to Mongo}    -----------------------   */
+/*  -----------------------       TEST3C {Mongo to Mongo}    -----------------------   */
 /* This test is focusing on comparing 2 different arrays of message logs from 2 different storage. 
 Which is local file mongo as the control/source, and then comparing the data from cloud mongoDB
 server data, and then synchronizing them */
 
 import { Observable, map, Subject, takeUntil, take, of, timer, from } from "rxjs";
-import { BaseMessage } from "../dependencies/fisappmessagejsutilty/dependencies/dependencies";
+import { BaseMessage, ResponseMessage } from "../dependencies/fisappmessagejsutilty/dependencies/dependencies";
 import { LogSetting, MessageLog } from "../dependencies/fisloggingservice/type/datatype";
-import { IncomingMessageService } from "../services/incomingMessage.service";
-import { MessageSyncrhonizationService } from "../services/synchronization.service";
 import { ErrorTrigger, MessageSynchronisationServiceSetting } from "../type/datatype";
 import { StreamingService } from "./test-streamOBS";
+import { MessageAuditorService } from "../services/message-auditor.service";
+import { LoggingService } from "../dependencies/fisloggingservice/services/logging-service";
 
 /* Pre - Defined Data && Settings */
 // This service will stream the messages from the local testRequest.json messages
@@ -20,25 +20,25 @@ const stream = new StreamingService()
 subscribers that are going to subsscribe to this source_payload. Please note that 
 source_payload will emite the messages stream from the instance of stream service
 and further feed them into the other Subject which is called source_payload_subject. */
-const source_synchronize = new MessageSyncrhonizationService()
-const source_incoming = new IncomingMessageService()
-const source_payload: Observable<BaseMessage> = stream.stream().pipe(take(4))
-const source_payload_subject: Subject<BaseMessage> = new Subject()
-source_payload.subscribe({
+const publisher_sync = new MessageAuditorService()
+const publisher_Log = new LoggingService()
+const publisher_take_four_messages: Observable<BaseMessage> = stream.stream().pipe(take(4))
+const publisher: Subject<BaseMessage> = new Subject()
+publisher_take_four_messages.subscribe({
     next: (data) => {
-        source_payload_subject.next(data)
+        publisher.next(data)
     }
 })
 
 /* Same thing as the above. The only difference is the we feed only 2 messages 
 to simulate streaming error. We want to see if it will sync the other 2 later 
 on. But generall the declarative structure is the same as the above. */
-const target_incoming = new IncomingMessageService()
-const target_payload: Observable<BaseMessage> = stream.stream().pipe(take(2))
-const target_payload_subject: Subject<BaseMessage> = new Subject()
-target_payload.subscribe({
+const subscriber_log = new LoggingService()
+const subscriber_take_two_messagse: Observable<BaseMessage> = stream.stream().pipe(take(2))
+const subscriber: Subject<BaseMessage> = new Subject()
+subscriber_take_two_messagse.subscribe({
     next: (data) => {
-        target_payload_subject.next(<BaseMessage>data)
+        subscriber.next(<ResponseMessage>data)
     },
     error: e => console.error(e),
     complete: () => { `Target Payload Completed` }
@@ -49,7 +49,7 @@ target_payload.subscribe({
 Hence here, is the block that definte the target and it's associated specifications.
 This will be the target and will receive the predefined set of data to be logged as 
 prepared earlier in the code above.s */
-let source_storage: LogSetting = {
+let publisher_storage: LogSetting = {
     cacheMessageLimit: 0,
     storage: "MongoDB",
     setting: {
@@ -63,19 +63,11 @@ let source_storage: LogSetting = {
     }
 }
 
-// I forgot what this is for lol...
-let source_dataSet: LogSetting & { incomingObservable: Observable<BaseMessage> } = {
-    storage: source_storage.storage,
-    setting: source_storage.setting,
-    customSetting: source_storage.customSetting,
-    incomingObservable: source_payload_subject
-}
-
 /* Same as above. Also declaring another designated database. But this one will be used
 as the target for synching. For such I purposely push only half the of the completed 
 dataset in order to test out the sync later. I am using my own cloud atlas mongo
 database on this. The address can always be changed. */
-let target_storage: LogSetting = {
+let subscriber_storage: LogSetting = {
     cacheMessageLimit: 0,
     storage: "MongoDB",
     setting: {
@@ -92,22 +84,15 @@ let target_storage: LogSetting = {
     }
 }
 
-let target_dataSet: LogSetting & { incomingObservable: Observable<BaseMessage> } = {
-    storage: target_storage.storage,
-    setting: target_storage.setting,
-    customSetting: target_storage.customSetting,
-    incomingObservable: target_payload_subject
-}
-
 // Combine source and target storage to form MessageSynchronisationServiceSetting
 let settings: MessageSynchronisationServiceSetting = {
     incomingSource: {
         //all of the settings to be combined here
-        ...source_storage,
+        ...publisher_storage,
         tags: ['Incoming']
     }, //LogSetting & {tags:string[] },   
     target: {
-        ...target_storage,
+        ...subscriber_storage,
         tags: ['Incoming']
     }  //LogSetting & {tags:string[] }  
 }
@@ -117,25 +102,28 @@ let settings: MessageSynchronisationServiceSetting = {
 // the settings of where to sync. Here the initial intialize data will first log the 
 // messages into the designated database as specified earlier.
 function initializeData() { // To store the data into the designated databases.
-    source_incoming.init(source_dataSet)
-    target_incoming.init(target_dataSet)
+    publisher_Log.init(publisher_storage).then(() => {
+        publisher_Log.subscribe(publisher)
+    })
+    subscriber_log.init(subscriber_storage).then(() => {
+        subscriber_log.subscribe(subscriber)
+    })
 }
 
 // Done by appoximately 5-8 Seconds
 initializeData() // Call the function to store the data into the designated databases.
-source_synchronize.init(settings)
+publisher_sync.init(settings)
 
 /* This is where the synchronization logic is called. The errorSubject will act as a trigger
 mechanism to execute the synchronization. */
-
 let errorSubject: Subject<ErrorTrigger> = new Subject()
 // Subscribe to errorSubject notification 
-let sync = source_synchronize.subscribe(errorSubject)
+let sync = publisher_sync.subscribe(errorSubject)
 sync.subscribe({
     next: (msgToBeSynched) => {
         console.log(`synching ... ${msgToBeSynched.header.messageID}`)
         // the missing data returned will be pushed (next(message)) into the target payload.
-        target_payload_subject.next(msgToBeSynched)
+        subscriber.next(msgToBeSynched)
     }
 })
 

+ 3 - 0
type/acknowledgement.interface.ts

@@ -3,6 +3,9 @@ import { ResponseMessage } from "../dependencies/fisappmessagejsutilty/dependenc
 import { BaseMessage } from "../dependencies/fisloggingservice/services/logging-service";
 import { LogSetting } from "../dependencies/fisloggingservice/type/datatype";
 
+/**
+ * @deprecated The acknowledgement will be covered by MessageAuditorService.
+ */
 export interface Acknowledgemeent {
     init(settings: LogSetting): void;
 

+ 1 - 1
type/datatype.ts

@@ -25,7 +25,7 @@ interface Tags {
     tags: string[]
 }
 // Acknowledgement Service Class
-export interface MessageSynchronisationServiceInterface {
+export interface MessageAuditorServiceInterface {
     // Set default setting
     init(settings: MessageSynchronisationServiceSetting): void;
     // Subscribe to trigger