fis.retransmission.service.ts 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461
  1. import mongoose, { Model, Schema } from 'mongoose';
  2. import { BehaviorSubject, Observable, Subject, Subscription, from } from 'rxjs'
  3. import { ColorCode, Message, MessageLog, ReportStatus, Status } from '../interfaces/general.interface'
  4. import { resolve } from 'path';
  5. require('dotenv').config();
  6. // Implement status chain refactoring
  7. export class FisRetransmissionService {
  8. private mongoUrl: string = process.env.MONGO as string
  9. private bufferedStorage: Message[] = []
  10. private mongoConnection: any
  11. private messageModel: Model<any> | null | undefined
  12. private maximumBufferLength: number = parseInt(process.env.MaxBufferLoad as string) // please configure at environment
  13. constructor(private databaseName: string, private statusReport: BehaviorSubject<ReportStatus>) {
  14. // Connect to mongoDB.
  15. this.manageMongoConnection(databaseName)
  16. }
  17. // Main function that intercepts outgoing messages by communicating || intepreting report status from grpc connection as indicator
  18. public handleMessage(applicationOutgoingMessage: Subject<Message>): Subject<Message> {
  19. let releaseMessageSubject: Subject<Message> = new Subject() // Every message subscribed from applicationOutgoingMessage will be released through this subject
  20. let messageReleaseSubscription: Subscription | null = null
  21. let messageBufferSubscription: Subscription | null = null
  22. let messageStreamToMongo: Subscription | null = null
  23. this.checkBufferLimit(applicationOutgoingMessage, this.statusReport)
  24. this.statusReport.subscribe((report: ReportStatus) => {
  25. /* Green should release all data from buffer and mongo and also redirect the applicationOutgoingMessage back into the return subject(releaseMessageSubject)
  26. if there's any. */
  27. if (report.code == ColorCode.GREEN) {
  28. let status: Status = 1
  29. // console.log(`Connection status report && ${report.message ?? 'No Message'}`)
  30. /* Status Chain begins */
  31. if (status === 1) {
  32. messageStreamToMongo = this.deactivateMongoStreamSubscription(messageStreamToMongo)
  33. if (messageStreamToMongo) status = 0
  34. }
  35. if (status === 1) {
  36. messageBufferSubscription = this.deactivateBufferSubscription(messageBufferSubscription)
  37. if (messageBufferSubscription) status = 0
  38. }
  39. if (status === 1) {
  40. messageReleaseSubscription = this.activateReleaseSubscription(messageReleaseSubscription, applicationOutgoingMessage, releaseMessageSubject)
  41. if (!messageReleaseSubscription) status = 0
  42. }
  43. if (status === 1) {
  44. this.releaseMessageFromLocalBuffer(this.bufferedStorage).then((resObs: Observable<Message>) => {
  45. resObs.subscribe({
  46. next: message => releaseMessageSubject.next(message),
  47. error: err => console.error(err),
  48. complete: () => {
  49. this.bufferedStorage = []
  50. console.log(`Reset buffer Storage count: ${this.bufferedStorage.length}. All messages have been released back into the stream.`)
  51. }
  52. })
  53. }).catch((err) => {
  54. status = 0
  55. console.error(err)
  56. })
  57. }
  58. if (status === 1) {
  59. this.releaseMessageFromMongoStorage().then((resObs: Subject<Message>) => {
  60. resObs.subscribe({
  61. next: message => releaseMessageSubject.next(message),
  62. error: err => console.error(err),
  63. complete: () => console.log(`All Mongo data are transferred `)
  64. })
  65. }).catch((err) => {
  66. status = 0
  67. console.error(err)
  68. })
  69. }
  70. if (status === 0) {
  71. console.log(`Something Went Wrong in handling ${ColorCode.RED} report.`)
  72. }
  73. }
  74. /* Start buffering the messages coming in from applicationOutgonigMessages and also stop it from flowing into the release subject */
  75. if (report.code == ColorCode.YELLOW) {
  76. if (report.payload) {
  77. console.log(`Rebuffering ${report.payload.message?.appData?.msgId} into buffer...`)
  78. this.bufferedStorage.push(report.payload)
  79. }
  80. // console.log(`Connection status report && ${report.message ?? 'No Message'}`)
  81. let status: Status = 1
  82. /* Status Chain begins */
  83. if (status === 1) {
  84. messageBufferSubscription = this.activateBufferSubscription(this.bufferedStorage, messageBufferSubscription, applicationOutgoingMessage)
  85. if (!messageBufferSubscription) status = 0
  86. }
  87. if (status === 1) {
  88. messageReleaseSubscription = this.deactivateReleaseSubscription(messageReleaseSubscription)
  89. if (messageReleaseSubscription) status = 0
  90. }
  91. if (status === 0) {
  92. console.log(`Something Went Wrong in handling ${ColorCode.RED} report.`)
  93. }
  94. }
  95. /* Stop buffering the message in local instance, but start saving them in database. Must first transfer the ones in local buffer before redirecting the
  96. flow from applicationOutgoingMessage into Mongo */
  97. if (report.code == ColorCode.RED) {
  98. // console.log(`Connection status report: ${report.message}`)
  99. if (report.payload) {
  100. console.log(`Rebuffering ${report.payload.message?.appData?.msgId} into storage...`)
  101. this.saveToMongo(report.payload)
  102. }
  103. console.log(`Connection status report && ${report.message ?? 'No Message'}`)
  104. let status: Status = 1
  105. if (status === 1) {
  106. messageStreamToMongo = this.activateMongoStreamSubscription(messageStreamToMongo, applicationOutgoingMessage)
  107. if (!messageStreamToMongo) status = 0
  108. }
  109. if (status === 1) {
  110. messageReleaseSubscription = this.deactivateReleaseSubscription(messageReleaseSubscription)
  111. if (messageReleaseSubscription) status = 0
  112. }
  113. if (status === 1) {
  114. messageBufferSubscription = this.deactivateBufferSubscription(messageBufferSubscription)
  115. if (messageBufferSubscription) status = 0
  116. }
  117. if (status === 1) {
  118. this.transferBufferedMessageToMongoStorage(this.bufferedStorage, messageBufferSubscription).then((res: any[]) => {
  119. if (res.length !== this.bufferedStorage.length || this.bufferedStorage.length > 0) status = -1 // this promise function should return an empty array
  120. })
  121. }
  122. if (status === 0) {
  123. console.log(`Something Went Wrong in handling ${ColorCode.RED} report.`)
  124. }
  125. }
  126. if (!report.code) {
  127. console.log(`Unknown message...`)
  128. }
  129. })
  130. return releaseMessageSubject
  131. }
  132. private checkIfMongoBufferHasData(): Promise<boolean> {
  133. return new Promise(async (resolve, reject) => {
  134. if (this.messageModel) {
  135. try {
  136. const count = await this.messageModel.countDocuments();
  137. resolve(count > 0)
  138. }
  139. catch (error) {
  140. reject(error)
  141. }
  142. }
  143. })
  144. }
  145. // IF Buffer exceeds a certain limit, it will trigger RED. Configure in .env file. There's the concern of 2 RED status, one from this and another from other means.
  146. // Behaviour of this needs to be investigated further
  147. private checkBufferLimit(message: Subject<Message>, statusReport: Subject<ReportStatus>) {
  148. let status: Status = 1
  149. if (status = 1) {
  150. message.subscribe(() => {
  151. if (this.bufferedStorage.length >= this.maximumBufferLength) {
  152. // for every messges that comes in, check the bufffer size, if it exceesd more than designated amount, push a red report status i
  153. console.log(`Buffer length exceeds limit imposed!!!`)
  154. let report: ReportStatus = {
  155. code: ColorCode.RED,
  156. message: `Buffer is exceeding limit. Initiate storage transfer to designated database.`,
  157. }
  158. statusReport.next(report)
  159. }
  160. })
  161. }
  162. }
  163. // Release the incoming Messages to be returned to the caller
  164. private activateReleaseSubscription(messageReleaseSubscription: Subscription | null, applicationOutgoingMessage: Subject<Message>, releaseMessageSubject: Subject<Message>): Subscription | null {
  165. let status: Status = 1
  166. if (status = 1) {
  167. if (!messageReleaseSubscription) {
  168. messageReleaseSubscription = applicationOutgoingMessage.subscribe({
  169. next: (message: Message) => {
  170. console.log(`Releasing ${(message.message as MessageLog).appData.msgId}...`);
  171. releaseMessageSubject.next(message);
  172. },
  173. error: (err) => console.error(err),
  174. complete: () => { },
  175. });
  176. console.log(`Subscription message release activated.`);
  177. } else {
  178. status = 0
  179. console.log(`Subscription message release is already active.`);
  180. }
  181. }
  182. return messageReleaseSubscription
  183. }
  184. // Stop the incoming Messages to be returned to caller
  185. private deactivateReleaseSubscription(messageReleaseSubscription: Subscription | null): Subscription | null {
  186. let status: Status = 1
  187. if (status = 1) {
  188. if (messageReleaseSubscription) {
  189. messageReleaseSubscription.unsubscribe();
  190. messageReleaseSubscription = null;
  191. console.log(`Subscription message release deactivated.`);
  192. } else {
  193. console.log(`Subscription message release is already deactivated.`);
  194. }
  195. }
  196. return messageReleaseSubscription
  197. }
  198. // Begin to push the incoming messages into local instantarray
  199. private activateBufferSubscription(bufferStorage: Message[], messageBufferSubscription: Subscription | null, applicationOutgoingMessage: Subject<Message>): Subscription | null {
  200. let status: Status = 1
  201. if (status = 1) {
  202. if (!messageBufferSubscription) {
  203. messageBufferSubscription = applicationOutgoingMessage.subscribe({
  204. next: (message: any) => {
  205. console.log(`Buffering ${(message.message as MessageLog).appData.msgId}... Local array length: ${bufferStorage.length}`);
  206. bufferStorage.push(message)
  207. },
  208. error: (err) => console.error(err),
  209. complete: () => { },
  210. });
  211. console.log(`Subscription message buffer activated.`);
  212. } else {
  213. status = 0
  214. console.log(`Subscription message buffer is already active.`);
  215. }
  216. }
  217. return messageBufferSubscription
  218. }
  219. // Stop pushing the incoming messages into local instantarray
  220. private deactivateBufferSubscription(messageBufferSubscription: Subscription | null): Subscription | null {
  221. let status: Status = 1
  222. if (status) {
  223. if (messageBufferSubscription) {
  224. messageBufferSubscription.unsubscribe();
  225. messageBufferSubscription = null;
  226. console.log(`Subscription message buffer deactivated.`);
  227. } else {
  228. status = 0
  229. console.log(`Subscription message buffer is already deactivated.`);
  230. }
  231. }
  232. return null
  233. }
  234. // Change the streaming direction of the incoming messages into mongo streaming subject( to be saved in local databse )
  235. private activateMongoStreamSubscription(messageStreamToMongo: Subscription | null, applicationOutgoingMessage: Subject<Message>): Subscription | null {
  236. let status: Status = 1
  237. if (status = 1) {
  238. if (!messageStreamToMongo) {
  239. messageStreamToMongo = applicationOutgoingMessage.subscribe({
  240. next: (message: any) => {
  241. console.log(`Saving ${(message.message as MessageLog).appData.msgId}...`);
  242. this.saveToMongo(message)
  243. },
  244. error: (err) => console.error(err),
  245. complete: () => { },
  246. });
  247. console.log(`Subscription message streaming to Mongo activated.`);
  248. } else {
  249. status = 0
  250. console.log(`Subscription message streaming to Mongo is already active.`);
  251. }
  252. }
  253. return messageStreamToMongo
  254. }
  255. // Stop or cut off the mongo streaming
  256. private deactivateMongoStreamSubscription(messageStreamToMongo: Subscription | null): Subscription | null {
  257. let status: Status = 1
  258. if (status = 1) {
  259. if (messageStreamToMongo) {
  260. messageStreamToMongo.unsubscribe();
  261. messageStreamToMongo = null;
  262. console.log(`Subscription message streaming to Mongo deactivated.`);
  263. } else {
  264. status = 0
  265. console.log(`Subscription message streaming to Mongo is already deactivated.`);
  266. }
  267. }
  268. return messageStreamToMongo
  269. }
  270. // To be used by mongoStreamSubscription to perform the saving execution
  271. private async saveToMongo(message: Message): Promise<boolean> {
  272. return new Promise((resolve, reject) => {
  273. // let messageModel: Model<any> = this.mongoConnection.model('Message', require('../models/message.schema'))
  274. if (this.messageModel) {
  275. this.messageModel.create(message).then(() => {
  276. console.log(`Saved MessageID ${(message.message as MessageLog).appData.msgId} into ${this.mongoUrl}`);
  277. resolve(true)
  278. }).catch((err) => {
  279. console.log(`MongoSaveError: ${err.message}`)
  280. reject(err)
  281. })
  282. } else {
  283. console.log(`Cant save message. Message Model is absent or not properly initialized`)
  284. }
  285. })
  286. }
  287. // As the name implies, transder all the messages from the local instance into mongoStorage. Local instance should be emptied after transfer is completed
  288. private async transferBufferedMessageToMongoStorage(bufferedMessage: Message[], messageBufferSubscription: Subscription | null): Promise<Message[]> {
  289. return new Promise((resolve, reject) => {
  290. let status: Status = 1
  291. if (status = 1) {
  292. let bufferedStorage: Observable<Message> = from(bufferedMessage)
  293. bufferedStorage.subscribe({
  294. next: (message: any) => {
  295. this.saveToMongo(message).then((res) => {
  296. console.log(`Message ${(message.message as MessageLog).appData.msgId} saved successfully...`)
  297. }).catch((err) => console.error(err))
  298. },
  299. error: (error) => {
  300. reject(error)
  301. console.error(error)
  302. },
  303. complete: () => {
  304. this.bufferedStorage = []
  305. if (messageBufferSubscription) {
  306. console.log(`All ${bufferedMessage.length} buffered messages have been sent for transfer to ${this.mongoUrl}. Current length: ${this.bufferedStorage.length}`)
  307. }
  308. resolve(this.bufferedStorage)
  309. }
  310. })
  311. }
  312. })
  313. }
  314. // Transfer stored messages from the local instance back into the stream to be returned to the caller.
  315. private async releaseMessageFromLocalBuffer(bufferedStorage: Message[]): Promise<Observable<Message>> {
  316. return new Promise((resolve, reject) => {
  317. let status: Status = 1
  318. if (status = 1) {
  319. if (bufferedStorage.length > 1) {
  320. let caseVariable = this.bufferedStorage.length > 1;
  321. console.log(`Releasing data from local buffer instance. There ${caseVariable ? "is" : "are"} ${this.bufferedStorage.length} messages...`);
  322. let returnArrayObs: Observable<Message> = from(bufferedStorage)
  323. resolve(returnArrayObs)
  324. } else {
  325. let message = `There is no data in stored in local instance`
  326. reject(message)
  327. }
  328. }
  329. })
  330. }
  331. // Transder all the stored messages in designated mongo databases. It should be empty after all the data has been transferred.
  332. private async releaseMessageFromMongoStorage(): Promise<Subject<Message>> {
  333. return new Promise((resolve, reject) => {
  334. let status: Status = 1
  335. if (status = 1) {
  336. let dataSubject: Subject<Message> = new Subject()
  337. this.extractAllMessages(dataSubject)
  338. resolve(dataSubject)
  339. }
  340. })
  341. }
  342. // Connect to designated mongodatabase.
  343. private async connectToMongoDatabase(databaseName: string): Promise<any> {
  344. return new Promise((resolve, reject) => {
  345. let status: Status = 1
  346. if (status = 1) {
  347. let database = this.mongoUrl + databaseName
  348. console.log(`Connected to ${database}`)
  349. this.mongoConnection = mongoose.createConnection(database)
  350. this.mongoConnection.on('error', (error) => {
  351. console.error('Connection error:', error);
  352. resolve('')
  353. });
  354. this.mongoConnection.once('open', () => {
  355. // console.log(`Connected to ${process.env.MONGO}`);
  356. let report: ReportStatus = {
  357. code: ColorCode.RED,
  358. message: `Mongo storage available`
  359. }
  360. this.messageModel = this.mongoConnection.model('Message', require('../models/message.schema'));
  361. this.statusReport.next(report)
  362. });
  363. }
  364. })
  365. }
  366. // Manage mongoCOnnectino. The logic used would be different across differnet application. This will loop the process indefinitely os it is always trying to connect to database.
  367. private async manageMongoConnection(databaseName: string): Promise<boolean> {
  368. while (true) {
  369. try {
  370. await this.connectToMongoDatabase(databaseName)
  371. } catch (error) {
  372. console.log(`Something Wrong occured. Please check at manageMongoConnection`)
  373. }
  374. await new Promise(resolve => setTimeout(resolve, 1000)); // Wait for 1 second before the next attempt
  375. }
  376. }
  377. // This will be used to release all the hostage messages once the light is green.
  378. public async extractAllMessages(subjectArgs: Subject<Message>): Promise<void> {
  379. // Need to resolve the issue of streaming in a specific order that is sequential
  380. let status: Status = 1
  381. if (status = 1) {
  382. if (this.messageModel) {
  383. const eventStream = this.messageModel.find().lean().cursor()
  384. eventStream.on('data', (message) => {
  385. // Emit each document to the subject
  386. subjectArgs.next(message);
  387. });
  388. eventStream.on('end', async () => {
  389. // All data has been streamed, complete the subject
  390. subjectArgs.complete();
  391. // Delete the data once it has been streamed
  392. try {
  393. if (this.messageModel) {
  394. await this.messageModel.deleteMany({});
  395. console.log('Data in Mongo deleted successfully.');
  396. } else {
  397. console.log(`Message Mongoose Model is not intiated properly...`)
  398. }
  399. } catch (err) {
  400. console.error('Error deleting data:', err);
  401. }
  402. });
  403. } else {
  404. status = 0
  405. console.log(`Error: Message Model is ${this.messageModel}!! Please set up the mongoose connection properly!`)
  406. }
  407. }
  408. }
  409. }
  410. // Store in json file in this project folder. To be enabled in future
  411. // private async transferMessageToLocalStorage(message: Subject<any>): Promise<void> {
  412. // let localArray: any[] = this.bufferedStorage
  413. // let filename = `localstorage.json`;
  414. // while (localArray.length > 0) {
  415. // let objectToWrite = this.bufferedStorage[0];
  416. // await writeMessage(objectToWrite, filename)
  417. // }
  418. // message.subscribe((message: any) => {
  419. // writeMessage(message, filename)
  420. // })
  421. // if (localArray.length < 1) this.bufferedStorage = localArray
  422. // console.log('Local Array is empty. Finished transferring to files.')
  423. // async function writeMessage(message: any, filename: string) {
  424. // try {
  425. // let stringifiedMessage = JSON.stringify(message);
  426. // await fs.promises.appendFile(filename, stringifiedMessage + "\r\n")
  427. // console.log(`Successfully transferred ${filename}`);
  428. // localArray.shift();
  429. // } catch (err) {
  430. // console.error(`Error trasferring ${filename}:`, err);
  431. // }
  432. // }
  433. // }