test5.ts 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. /* --------------------- TEST 5 : Filtering -------------------------- */
  2. /* This test is specifically design for testing the audit with additional fitlers. When the primary source want to perform
  3. audit on the designated target, they will impose one or many condition, in that only the data that meets the criteria
  4. will be taken into consideratoin for auditng. */
  5. import { Observable, Subject, take } from "rxjs"
  6. import { MessageAuditorService } from "../services/message-auditor.service"
  7. import { MessageAuditorServiceInterface, MessageSynchronisationServiceSetting } from "../type/datatype"
  8. import { LogSetting, MessageLog } from "../dependencies/log/type/datatype"
  9. import { _ } from 'lodash'
  10. import { StreamingService } from "./test-streamOBS"
  11. import { LoggingService } from "../dependencies/log/services/logging-service"
  12. import { ResponseMessage } from "../dependencies/log/dependencies/msgutil/interface/export"
  13. const auditService: MessageAuditorServiceInterface = new MessageAuditorService()
  14. const publisherloggingService: LoggingService = new LoggingService()
  15. const subscriberloggingService: LoggingService = new LoggingService()
  16. const stream = new StreamingService()
  17. let triggerSyncSubject: Subject<any> = new Subject()
  18. const publisher_take_four_messages: Observable<any> = stream.stream().pipe(take(4))
  19. const publisher: Subject<any> = new Subject()
  20. publisher_take_four_messages.subscribe({
  21. next: (data) => {
  22. publisher.next(data)
  23. }
  24. })
  25. const subscriber_take_two_messagse: Observable<any> = stream.stream().pipe(take(2))
  26. const subscriber: Subject<any> = new Subject()
  27. subscriber_take_two_messagse.subscribe({
  28. next: (data) => {
  29. subscriber.next(<ResponseMessage>data)
  30. }
  31. })
  32. let source: LogSetting = {
  33. cacheMessageLimit: 0,
  34. storage: "MongoDB",
  35. setting: {
  36. appName: 'Deafult from source',
  37. appLocName: 'To be generated in source',
  38. logLocName: 'To be generated in source',
  39. },
  40. customSetting: {
  41. url: 'mongodb+srv://testDB:h1nt1OyXw6QeUnzS@cluster0.29sklte.mongodb.net/test1'
  42. }
  43. }
  44. let target: LogSetting = {
  45. cacheMessageLimit: 0,
  46. storage: "MongoDB",
  47. setting: {
  48. appName: 'Default from target',
  49. appLocName: 'To be generated in target',
  50. logLocName: 'To be generated in target',
  51. },
  52. customSetting: {
  53. url: 'mongodb+srv://testDB:h1nt1OyXw6QeUnzS@cluster0.29sklte.mongodb.net/test2'
  54. }
  55. }
  56. // Combine source and target storage to form MessageSynchronisationServiceSetting. This is required in messageAudit initialization
  57. let settings: MessageSynchronisationServiceSetting = {
  58. incomingSource: {
  59. //all of the settings to be combined here
  60. ...source,
  61. tags: ['default'],
  62. },
  63. target: {
  64. ...target,
  65. tags: ['default'],
  66. },
  67. // Set Filters here, since it's part of the settings
  68. filters: {
  69. 'data.data.data.personCode': 'w002',
  70. 'header.messageProducerInformation.origin.userApplication.userAppId': 'Content Delivery Management Server'
  71. }
  72. }
  73. /* ------- Calling the functions to be tested ----------- */
  74. intializeLogging(source, target)
  75. initializeAuditService(settings)
  76. setTimeout(() => {
  77. performSync({ status: 1, message: "GO! GO! GO!" })
  78. }, 5000)
  79. // Basically start up all the functions and relevant subscription service in Audit Service.
  80. async function initializeAuditService(configuration: MessageSynchronisationServiceSetting) {
  81. auditService.init(configuration) // Configure two points of audit and also adding filter
  82. auditService.subscribe(triggerSyncSubject).subscribe((missingElements: MessageLog) => {
  83. let message = JSON.parse(missingElements.appData.msgPayload as any)
  84. subscriber.next(message)
  85. })
  86. }
  87. // Emit an args into the synchronization trigger stream to perform a sync
  88. async function performSync(args: any) {
  89. triggerSyncSubject.next(args)
  90. }
  91. // Set up logging point
  92. async function intializeLogging(source: LogSetting, target: LogSetting) {
  93. publisherloggingService.init(source).then(() => {
  94. publisherloggingService.subscribe(publisher)
  95. })
  96. subscriberloggingService.init(target).then(() => {
  97. subscriberloggingService.subscribe(subscriber)
  98. })
  99. }