fis.retransmission.service.ts 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446
  1. import mongoose, { Model, Schema } from 'mongoose';
  2. import { BehaviorSubject, Observable, Subject, Subscription, from } from 'rxjs'
  3. import { ColorCode, Message, MessageLog, ReportStatus, Status } from '../interfaces/general.interface'
  4. require('dotenv').config();
  5. // Implement status chain refactoring
  6. export class FisRetransmissionService {
  7. private mongoUrl: string = process.env.MONGO as string
  8. private bufferedStorage: Message[] = []
  9. private mongoConnection: any
  10. private messageModel: Model<any> | null | undefined
  11. private maximumBufferLength: number = parseInt(process.env.MaxBufferLoad as string) // please configure at environment
  12. constructor(private databaseName: string, private statusReport: BehaviorSubject<ReportStatus>) {
  13. // Connect to mongoDB.
  14. this.manageMongoConnection(databaseName)
  15. }
  16. // Main function that intercepts outgoing messages by communicating || intepreting report status from grpc connection as indicator
  17. public handleMessage(applicationOutgoingMessage: Subject<Message>): Subject<Message> {
  18. let releaseMessageSubject: Subject<Message> = new Subject() // Every message subscribed from applicationOutgoingMessage will be released through this subject
  19. let messageReleaseSubscription: Subscription | null = null
  20. let messageBufferSubscription: Subscription | null = null
  21. let messageStreamToMongo: Subscription | null = null
  22. this.checkBufferLimit(applicationOutgoingMessage, this.statusReport)
  23. this.statusReport.subscribe((report: ReportStatus) => {
  24. /* Green should release all data from buffer and mongo and also redirect the applicationOutgoingMessage back into the return subject(releaseMessageSubject)
  25. if there's any. */
  26. if (report.code == ColorCode.GREEN) {
  27. // console.log(`Connection status report && ${report.message ?? 'No Message'}`)
  28. /* Status Chain begins */
  29. let status: Status = 1
  30. if (status === 1) {
  31. messageStreamToMongo = this.deactivateMongoStreamSubscription(messageStreamToMongo)
  32. if (messageStreamToMongo) status = 0
  33. }
  34. if (status === 1) {
  35. messageBufferSubscription = this.deactivateBufferSubscription(messageBufferSubscription)
  36. if (messageBufferSubscription) status = 0
  37. }
  38. if (status === 1) {
  39. messageReleaseSubscription = this.activateReleaseSubscription(messageReleaseSubscription, applicationOutgoingMessage, releaseMessageSubject)
  40. if (!messageReleaseSubscription) status = 0
  41. }
  42. if (status === 1) {
  43. this.releaseMessageFromLocalBuffer(this.bufferedStorage).then((resObs: Observable<Message>) => {
  44. resObs.subscribe({
  45. next: message => releaseMessageSubject.next(message),
  46. error: err => console.error(err),
  47. complete: () => {
  48. this.bufferedStorage = []
  49. console.log(`Reset buffer Storage count: ${this.bufferedStorage.length}. All messages have been released back into the stream.`)
  50. }
  51. })
  52. }).catch((err) => {
  53. status = 0
  54. console.error(err)
  55. })
  56. }
  57. if (status === 1) {
  58. this.releaseMessageFromMongoStorage().then((resObs: Subject<Message>) => {
  59. resObs.subscribe({
  60. next: message => releaseMessageSubject.next(message),
  61. error: err => console.error(err),
  62. complete: () => console.log(`All Mongo data are transferred `)
  63. })
  64. }).catch((err) => {
  65. status = 0
  66. console.error(err)
  67. })
  68. }
  69. if (status === 0) {
  70. console.log(`Something Went Wrong in handling ${ColorCode.RED} report.`)
  71. }
  72. }
  73. /* Start buffering the messages coming in from applicationOutgonigMessages and also stop it from flowing into the release subject */
  74. if (report.code == ColorCode.YELLOW) {
  75. if (report.payload) {
  76. console.log(`Rebuffering ${report.payload.message?.appData?.msgId} into buffer...`)
  77. this.bufferedStorage.push(report.payload)
  78. }
  79. // console.log(`Connection status report && ${report.message ?? 'No Message'}`)
  80. let status: Status = 1
  81. /* Status Chain begins */
  82. if (status === 1) {
  83. messageBufferSubscription = this.activateBufferSubscription(this.bufferedStorage, messageBufferSubscription, applicationOutgoingMessage)
  84. if (!messageBufferSubscription) status = 0
  85. }
  86. if (status === 1) {
  87. messageReleaseSubscription = this.deactivateReleaseSubscription(messageReleaseSubscription)
  88. if (messageReleaseSubscription) status = 0
  89. }
  90. if (status === 0) {
  91. console.log(`Something Went Wrong in handling ${ColorCode.RED} report.`)
  92. }
  93. }
  94. /* Stop buffering the message in local instance, but start saving them in database. Must first transfer the ones in local buffer before redirecting the
  95. flow from applicationOutgoingMessage into Mongo */
  96. if (report.code == ColorCode.RED) {
  97. // console.log(`Connection status report: ${report.message}`)
  98. if (report.payload) {
  99. console.log(`Rebuffering ${report.payload.message?.appData?.msgId} into storage...`)
  100. this.saveToMongo(report.payload)
  101. }
  102. console.log(`Connection status report && ${report.message ?? 'No Message'}`)
  103. let status: Status = 1
  104. if (status === 1) {
  105. messageStreamToMongo = this.activateMongoStreamSubscription(messageStreamToMongo, applicationOutgoingMessage)
  106. if (!messageStreamToMongo) status = 0
  107. }
  108. if (status === 1) {
  109. messageReleaseSubscription = this.deactivateReleaseSubscription(messageReleaseSubscription)
  110. if (messageReleaseSubscription) status = 0
  111. }
  112. if (status === 1) {
  113. messageBufferSubscription = this.deactivateBufferSubscription(messageBufferSubscription)
  114. if (messageBufferSubscription) status = 0
  115. }
  116. if (status === 1) {
  117. this.transferBufferedMessageToMongoStorage(this.bufferedStorage, messageBufferSubscription).then((res: any[]) => {
  118. if (res.length !== this.bufferedStorage.length || this.bufferedStorage.length > 0) status = -1 // this promise function should return an empty array
  119. })
  120. }
  121. if (status === 0) {
  122. console.log(`Something Went Wrong in handling ${ColorCode.RED} report.`)
  123. }
  124. }
  125. if (!report.code) {
  126. console.log(`Unknown message...`)
  127. }
  128. })
  129. return releaseMessageSubject
  130. }
  131. // IF Buffer exceeds a certain limit, it will trigger RED. Configure in .env file. There's the concern of 2 RED status, one from this and another from other means.
  132. // Behaviour of this needs to be investigated further
  133. private checkBufferLimit(message: Subject<Message>, statusReport: Subject<ReportStatus>) {
  134. let status: Status = 1
  135. if (status = 1) {
  136. message.subscribe(() => {
  137. if (this.bufferedStorage.length >= this.maximumBufferLength) {
  138. // for every messges that comes in, check the bufffer size, if it exceesd more than designated amount, push a red report status i
  139. console.log(`Buffer length exceeds limit imposed!!!`)
  140. let report: ReportStatus = {
  141. code: ColorCode.RED,
  142. message: `Buffer is exceeding limit. Initiate storage transfer to designated database.`,
  143. }
  144. statusReport.next(report)
  145. }
  146. })
  147. }
  148. }
  149. // Release the incoming Messages to be returned to the caller
  150. private activateReleaseSubscription(messageReleaseSubscription: Subscription | null, applicationOutgoingMessage: Subject<Message>, releaseMessageSubject: Subject<Message>): Subscription | null {
  151. let status: Status = 1
  152. if (status = 1) {
  153. if (!messageReleaseSubscription) {
  154. messageReleaseSubscription = applicationOutgoingMessage.subscribe({
  155. next: (message: Message) => {
  156. console.log(`Releasing ${(message.message as MessageLog).appData.msgId}...`);
  157. releaseMessageSubject.next(message);
  158. },
  159. error: (err) => console.error(err),
  160. complete: () => { },
  161. });
  162. console.log(`Subscription message release activated.`);
  163. } else {
  164. status = 0
  165. console.log(`Subscription message release is already active.`);
  166. }
  167. }
  168. return messageReleaseSubscription
  169. }
  170. // Stop the incoming Messages to be returned to caller
  171. private deactivateReleaseSubscription(messageReleaseSubscription: Subscription | null): Subscription | null {
  172. let status: Status = 1
  173. if (status = 1) {
  174. if (messageReleaseSubscription) {
  175. messageReleaseSubscription.unsubscribe();
  176. messageReleaseSubscription = null;
  177. console.log(`Subscription message release deactivated.`);
  178. } else {
  179. console.log(`Subscription message release is already deactivated.`);
  180. }
  181. }
  182. return messageReleaseSubscription
  183. }
  184. // Begin to push the incoming messages into local instantarray
  185. private activateBufferSubscription(bufferStorage: Message[], messageBufferSubscription: Subscription | null, applicationOutgoingMessage: Subject<Message>): Subscription | null {
  186. let status: Status = 1
  187. if (status = 1) {
  188. if (!messageBufferSubscription) {
  189. messageBufferSubscription = applicationOutgoingMessage.subscribe({
  190. next: (message: any) => {
  191. console.log(`Buffering ${(message.message as MessageLog).appData.msgId}... Local array length: ${bufferStorage.length}`);
  192. bufferStorage.push(message)
  193. },
  194. error: (err) => console.error(err),
  195. complete: () => { },
  196. });
  197. console.log(`Subscription message buffer activated.`);
  198. } else {
  199. status = 0
  200. console.log(`Subscription message buffer is already active.`);
  201. }
  202. }
  203. return messageBufferSubscription
  204. }
  205. // Stop pushing the incoming messages into local instantarray
  206. private deactivateBufferSubscription(messageBufferSubscription: Subscription | null): Subscription | null {
  207. let status: Status = 1
  208. if (status) {
  209. if (messageBufferSubscription) {
  210. messageBufferSubscription.unsubscribe();
  211. messageBufferSubscription = null;
  212. console.log(`Subscription message buffer deactivated.`);
  213. } else {
  214. status = 0
  215. console.log(`Subscription message buffer is already deactivated.`);
  216. }
  217. }
  218. return null
  219. }
  220. // Change the streaming direction of the incoming messages into mongo streaming subject( to be saved in local databse )
  221. private activateMongoStreamSubscription(messageStreamToMongo: Subscription | null, applicationOutgoingMessage: Subject<Message>): Subscription | null {
  222. let status: Status = 1
  223. if (status = 1) {
  224. if (!messageStreamToMongo) {
  225. messageStreamToMongo = applicationOutgoingMessage.subscribe({
  226. next: (message: any) => {
  227. console.log(`Saving ${(message.message as MessageLog).appData.msgId}...`);
  228. this.saveToMongo(message)
  229. },
  230. error: (err) => console.error(err),
  231. complete: () => { },
  232. });
  233. console.log(`Subscription message streaming to Mongo activated.`);
  234. } else {
  235. status = 0
  236. console.log(`Subscription message streaming to Mongo is already active.`);
  237. }
  238. }
  239. return messageStreamToMongo
  240. }
  241. // Stop or cut off the mongo streaming
  242. private deactivateMongoStreamSubscription(messageStreamToMongo: Subscription | null): Subscription | null {
  243. let status: Status = 1
  244. if (status = 1) {
  245. if (messageStreamToMongo) {
  246. messageStreamToMongo.unsubscribe();
  247. messageStreamToMongo = null;
  248. console.log(`Subscription message streaming to Mongo deactivated.`);
  249. } else {
  250. status = 0
  251. console.log(`Subscription message streaming to Mongo is already deactivated.`);
  252. }
  253. }
  254. return messageStreamToMongo
  255. }
  256. // To be used by mongoStreamSubscription to perform the saving execution
  257. private async saveToMongo(message: Message): Promise<boolean> {
  258. return new Promise((resolve, reject) => {
  259. // let messageModel: Model<any> = this.mongoConnection.model('Message', require('../models/message.schema'))
  260. if (this.messageModel) {
  261. this.messageModel.create(message).then(() => {
  262. console.log(`Saved MessageID ${(message.message as MessageLog).appData.msgId} into ${this.mongoUrl}`);
  263. resolve(true)
  264. }).catch((err) => {
  265. console.log(`MongoSaveError: ${err.message}`)
  266. reject(err)
  267. })
  268. } else {
  269. console.log(`Cant save message. Message Model is absent or not properly initialized`)
  270. }
  271. })
  272. }
  273. // As the name implies, transder all the messages from the local instance into mongoStorage. Local instance should be emptied after transfer is completed
  274. private async transferBufferedMessageToMongoStorage(bufferedMessage: Message[], messageBufferSubscription: Subscription | null): Promise<Message[]> {
  275. return new Promise((resolve, reject) => {
  276. let status: Status = 1
  277. if (status = 1) {
  278. let bufferedStorage: Observable<Message> = from(bufferedMessage)
  279. bufferedStorage.subscribe({
  280. next: (message: any) => {
  281. this.saveToMongo(message).then((res) => {
  282. console.log(`Message ${(message.message as MessageLog).appData.msgId} saved successfully...`)
  283. }).catch((err) => console.error(err))
  284. },
  285. error: (error) => {
  286. reject(error)
  287. console.error(error)
  288. },
  289. complete: () => {
  290. this.bufferedStorage = []
  291. if (messageBufferSubscription) {
  292. console.log(`All ${bufferedMessage.length} buffered messages have been sent for transfer to ${this.mongoUrl}. Current length: ${this.bufferedStorage.length}`)
  293. }
  294. resolve(this.bufferedStorage)
  295. }
  296. })
  297. }
  298. })
  299. }
  300. // Transfer stored messages from the local instance back into the stream to be returned to the caller.
  301. private async releaseMessageFromLocalBuffer(bufferedStorage: Message[]): Promise<Observable<Message>> {
  302. return new Promise((resolve, reject) => {
  303. let status: Status = 1
  304. if (status = 1) {
  305. if (bufferedStorage.length > 1) {
  306. let caseVariable = this.bufferedStorage.length > 1;
  307. console.log(`Releasing data from local buffer instance. There ${caseVariable ? "is" : "are"} ${this.bufferedStorage.length} messages...`);
  308. let returnArrayObs: Observable<Message> = from(bufferedStorage)
  309. resolve(returnArrayObs)
  310. } else {
  311. let message = `There is no data in stored in local instance`
  312. reject(message)
  313. }
  314. }
  315. })
  316. }
  317. // Transder all the stored messages in designated mongo databases. It should be empty after all the data has been transferred.
  318. private async releaseMessageFromMongoStorage(): Promise<Subject<Message>> {
  319. return new Promise((resolve, reject) => {
  320. let status: Status = 1
  321. if (status = 1) {
  322. let dataSubject: Subject<Message> = new Subject()
  323. this.extractAllMessages(dataSubject)
  324. resolve(dataSubject)
  325. }
  326. })
  327. }
  328. // Connect to designated mongodatabase.
  329. private async connectToMongoDatabase(databaseName: string): Promise<any> {
  330. return new Promise((resolve, reject) => {
  331. let status: Status = 1
  332. if (status = 1) {
  333. let database = this.mongoUrl + databaseName
  334. console.log(`Connected to ${database}`)
  335. this.mongoConnection = mongoose.createConnection(database)
  336. this.mongoConnection.on('error', (error) => {
  337. console.error('Connection error:', error);
  338. resolve('')
  339. });
  340. this.mongoConnection.once('open', () => {
  341. // console.log(`Connected to ${process.env.MONGO}`);
  342. let report: ReportStatus = {
  343. code: ColorCode.RED,
  344. message: `Mongo storage available`
  345. }
  346. this.messageModel = this.mongoConnection.model('Message', require('../models/message.schema'));
  347. this.statusReport.next(report)
  348. });
  349. }
  350. })
  351. }
  352. // Manage mongoCOnnectino. The logic used would be different across differnet application. This will loop the process indefinitely os it is always trying to connect to database.
  353. private async manageMongoConnection(databaseName: string): Promise<boolean> {
  354. while (true) {
  355. try {
  356. await this.connectToMongoDatabase(databaseName)
  357. } catch (error) {
  358. console.log(`Something Wrong occured. Please check at manageMongoConnection`)
  359. }
  360. await new Promise(resolve => setTimeout(resolve, 1000)); // Wait for 1 second before the next attempt
  361. }
  362. }
  363. // This will be used to release all the hostage messages once the light is green.
  364. public async extractAllMessages(subjectArgs: Subject<Message>): Promise<void> {
  365. // Need to resolve the issue of streaming in a specific order that is sequential
  366. let status: Status = 1
  367. if (status = 1) {
  368. if (this.messageModel) {
  369. const eventStream = this.messageModel.find().lean().cursor()
  370. eventStream.on('data', (message) => {
  371. // Emit each document to the subject
  372. subjectArgs.next(message);
  373. });
  374. eventStream.on('end', async () => {
  375. // All data has been streamed, complete the subject
  376. subjectArgs.complete();
  377. // Delete the data once it has been streamed
  378. try {
  379. if (this.messageModel) {
  380. await this.messageModel.deleteMany({});
  381. console.log('Data in Mongo deleted successfully.');
  382. } else {
  383. console.log(`Message Mongoose Model is not intiated properly...`)
  384. }
  385. } catch (err) {
  386. console.error('Error deleting data:', err);
  387. }
  388. });
  389. } else {
  390. status = 0
  391. console.log(`Error: Message Model is ${this.messageModel}!! Please set up the mongoose connection properly!`)
  392. }
  393. }
  394. }
  395. }
  396. // Store in json file in this project folder. To be enabled in future
  397. // private async transferMessageToLocalStorage(message: Subject<any>): Promise<void> {
  398. // let localArray: any[] = this.bufferedStorage
  399. // let filename = `localstorage.json`;
  400. // while (localArray.length > 0) {
  401. // let objectToWrite = this.bufferedStorage[0];
  402. // await writeMessage(objectToWrite, filename)
  403. // }
  404. // message.subscribe((message: any) => {
  405. // writeMessage(message, filename)
  406. // })
  407. // if (localArray.length < 1) this.bufferedStorage = localArray
  408. // console.log('Local Array is empty. Finished transferring to files.')
  409. // async function writeMessage(message: any, filename: string) {
  410. // try {
  411. // let stringifiedMessage = JSON.stringify(message);
  412. // await fs.promises.appendFile(filename, stringifiedMessage + "\r\n")
  413. // console.log(`Successfully transferred ${filename}`);
  414. // localArray.shift();
  415. // } catch (err) {
  416. // console.error(`Error trasferring ${filename}:`, err);
  417. // }
  418. // }
  419. // }