123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143 |
- import { resolve } from "path";
- import { Observable, map, Subject, takeUntil, take } from "rxjs";
- import { BaseMessage } from "../dependencies/fisappmessagejsutilty/dependencies/dependencies";
- import { LogSetting, MessageLog } from "../dependencies/fisloggingservice/type/datatype";
- import { AcknowledgementService } from "../services/acknowledgement.service";
- import { IncomingMessageService } from "../services/incomingMessage.service";
- import { MessageSyncrhonizationService } from "../services/synchronization.service";
- import { MessageSynchronisationServiceSetting } from "../type/datatype";
- import { StreamingService } from "./test-streamOBS";
- /* Pre-defined data and settings */
- // Message generator shared by the source and target pipelines below.
- const stream = new StreamingService()
- // Source side (file storage): stands in for the store that holds the full log.
- const source_synchronize = new MessageSyncrhonizationService()
- const source_payload: Observable<BaseMessage> = stream.stream()
- const source_incoming = new IncomingMessageService()
- const source_payload_subject = new Subject<BaseMessage>()
- // Relay each streamed message into the subject. Only `next` is relayed, so the
- // stream completing does not complete the subject.
- source_payload.subscribe((message) => source_payload_subject.next(message))
- // The same stream with every message serialised to a JSON string.
- const source_payload_string = source_payload.pipe(
-     map((message) => JSON.stringify(message))
- )
- // Target side (MongoDB storage): stands in for a store that holds only part of the log.
- const target_syncrhonize = new MessageSyncrhonizationService()
- // take(2) limits the target to the first two streamed messages.
- const target_payload: Observable<BaseMessage> = stream.stream().pipe(take(2))
- const target_payload_subject = new Subject<BaseMessage>()
- const target_incoming = new IncomingMessageService()
- // Relay each of those messages into the subject. Only `next` is relayed, so the
- // subject stays open after take(2) completes the stream.
- target_payload.subscribe((message) => target_payload_subject.next(message))
- // The same two-message stream with every message serialised to a JSON string.
- const target_payload_string = target_payload.pipe(
-     map((message) => JSON.stringify(message))
- )
- // Trace subscriber: logs the ID of everything pushed into the target subject, which
- // makes it visible when missing messages are later sent in by the synchronization step.
- target_payload_subject.subscribe((element) => {
-     console.log(`target_payload_subject emits : ${element.header.messageID}`)
- })
- // Source storage: file-based log store.
- const source_storage: LogSetting = {
-     storage: "File",
-     setting: {
-         appName: "Default from client",
-         appLocName: "To be generated in client",
-         logLocName: "To be generated in client",
-     },
- }
- // Source storage settings plus the observable that feeds messages into it.
- // customSetting is copied explicitly even though source_storage does not define one,
- // so the key is present (as undefined) on the resulting object.
- const source_dataSet: LogSetting & { incomingObservable: Observable<BaseMessage> } = {
-     storage: source_storage.storage,
-     setting: source_storage.setting,
-     customSetting: source_storage.customSetting,
-     incomingObservable: source_payload_subject,
- }
- // Target storage: MongoDB-backed log store.
- //
- // NOTE(review): the connection values below, including the password, are committed to
- // source control. Each one can be overridden through an environment variable
- // (MONGO_USER, MONGO_PASSWORD, MONGO_SERVER, MONGO_COLLECTION). The literals remain only
- // as fallbacks so the script behaves exactly as before when none of them is set.
- // TODO: rotate this credential and remove the password fallback.
- const target_storage: LogSetting = {
-     storage: "MongoDB",
-     setting: {
-         appName: "Default from client",
-         appLocName: "To be generated in client",
-         logLocName: "To be generated in client",
-     },
-     customSetting: {
-         srv: true,
-         user: process.env.MONGO_USER ?? "testDB",
-         password: process.env.MONGO_PASSWORD ?? "h1nt1OyXw6QeUnzS",
-         server: process.env.MONGO_SERVER ?? "cluster0.29sklte.mongodb.net",
-         collection: process.env.MONGO_COLLECTION ?? "log",
-     },
- }
- // Target storage settings plus the observable that feeds messages into it.
- const target_dataSet: LogSetting & { incomingObservable: Observable<BaseMessage> } = {
-     storage: target_storage.storage,
-     setting: target_storage.setting,
-     customSetting: target_storage.customSetting,
-     incomingObservable: target_payload_subject,
- }
- // Pair the source and target storage definitions into the single setting object
- // handed to the synchronisation service. Each side is its storage definition plus
- // a tag list, i.e. LogSetting & { tags: string[] }; both are tagged 'Incoming'.
- const settings: MessageSynchronisationServiceSetting = {
-     incomingSource: { ...source_storage, tags: ['Incoming'] },
-     target: { ...target_storage, tags: ['Incoming'] },
- }
- /* -------- SYNCHRONIZATION --------- */
- /**
-  * Initialises both incoming-message services, passing each one its storage settings
-  * together with the observable it reads from: the file-storage data set for the source
-  * and the MongoDB data set for the target.
-  *
-  * Both `init` calls are started in order and their results are awaited, so the returned
-  * promise settles only after both have finished, and a rejection from either one is
-  * surfaced through that promise instead of being detached.
-  *
-  * NOTE(review): assumes `IncomingMessageService.init` may return a promise; confirm.
-  * Awaiting a plain (non-promise) value is harmless.
-  *
-  * @returns a promise that resolves once both services have been initialised.
-  */
- async function initializeData(): Promise<void> {
-     await Promise.all([
-         source_incoming.init(source_dataSet),
-         target_incoming.init(target_dataSet),
-     ])
- }
- // initializeData().then(() => {
- // source_synchronize.init(settings)
- // }).then(() => {
- // let stream: Observable<BaseMessage> = new MessageSyncrhonizationService().subscribe(source_payload_string)
- // stream.subscribe({
- // next: (msgToBeSynced) => {
- // target_payload_subject.next(msgToBeSynced)
- // console.log(msgToBeSynced.header.messageID)
- // }
- // })
- // })
- /* Seed the stores first: per the original note this puts 4 messages in File storage and 2 in Mongo. */
- initializeData()
- /* Type 1 synchronization */
- /* Note: this operation assumes data already exists in the designated storage. It cannot yet
- synchronize dynamically, in real time, while a live stream is still occurring. */
- source_synchronize.init(settings).then(() => {
-     // Observable of the messages that the synchronisation service hands back for the
-     // JSON-serialised source stream.
-     const toBeSynchronized = source_synchronize.subscribe(source_payload_string)
-     // Push each of them into the target subject so it reaches the target side.
-     toBeSynchronized.subscribe((msgToBeSynchronized) => {
-         target_payload_subject.next(msgToBeSynchronized)
-     })
- })
|