fis.retransmission.service.ts 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425
  1. import * as _ from 'lodash'
  2. import mongoose, { Model, Schema } from 'mongoose';
  3. import { BehaviorSubject, Observable, Subject, Subscription, from } from 'rxjs'
  4. import { ColorCode, Message, MessageLog, ReportStatus, Status } from '../interfaces/general.interface'
  5. require('dotenv').config();
  6. // Implement status chain refactoring
  7. export class FisRetransmissionService {
  8. private mongoUrl: string = process.env.MONGO + 'emergencyStorage'
  9. private bufferedStorage: Message[] = []
  10. private mongoConnection: any
  11. private messageModel: any
  12. private maximumBufferLength: number = parseInt(process.env.MaxBufferLoad as string) // please configure at environment
  13. // private statusReport: Subject<ReportStatus> = new Subject()
  14. constructor() {
  15. // Connect to mongoDB.
  16. this.manageMongoConnection()
  17. }
  18. // Main function that intercepts outgoing messages by communicating || intepreting report status from grpc connection as indicator
  19. public handleMessage(applicationOutgoingMessage: Subject<Message>, statusReport: BehaviorSubject<ReportStatus>): Subject<Message> {
  20. let releaseMessageSubject: Subject<Message> = new Subject() // Every message subscribed from applicationOutgoingMessage will be released through this subject
  21. let messageReleaseSubscription: Subscription | null = null
  22. let messageBufferSubscription: Subscription | null = null
  23. let messageStreamToMongo: Subscription | null = null
  24. this.checkBufferLimit(applicationOutgoingMessage, statusReport)
  25. statusReport.subscribe((report: ReportStatus) => {
  26. /* Green should release all data from buffer and mongo and also redirect the applicationOutgoingMessage back into the return subject(releaseMessageSubject)
  27. if there's any. */
  28. if (report.code == ColorCode.GREEN) {
  29. console.log(`Connection status report && ${report.message ?? 'No Message'}`)
  30. /* Status Chain begins */
  31. let status: Status = 1
  32. if (status === 1) {
  33. messageStreamToMongo = this.deactivateMongoStreamSubscription(messageStreamToMongo)
  34. if (messageStreamToMongo) status = 0
  35. }
  36. if (status === 1) {
  37. messageBufferSubscription = this.deactivateBufferSubscription(messageBufferSubscription)
  38. if (messageBufferSubscription) status = 0
  39. }
  40. if (status === 1) {
  41. messageReleaseSubscription = this.activateReleaseSubscription(messageReleaseSubscription, applicationOutgoingMessage, releaseMessageSubject)
  42. if (!messageReleaseSubscription) status = 0
  43. }
  44. if (status === 1) {
  45. this.releaseMessageFromLocalBuffer(this.bufferedStorage).then((resObs: Observable<Message>) => {
  46. resObs.subscribe({
  47. next: message => releaseMessageSubject.next(message),
  48. error: err => console.error(err),
  49. complete: () => {
  50. this.bufferedStorage = []
  51. console.log(`Reset buffer Storage count: ${this.bufferedStorage.length}. All messages have been released back into the stream.`)
  52. }
  53. })
  54. }).catch((err) => {
  55. status = 0
  56. console.error(err)
  57. })
  58. }
  59. if (status === 1) {
  60. this.releaseMessageFromMongoStorage().then((resObs: Subject<Message>) => {
  61. resObs.subscribe({
  62. next: message => releaseMessageSubject.next(message),
  63. error: err => console.error(err),
  64. complete: () => console.log(`All Mongo data are transferred `)
  65. })
  66. }).catch((err) => {
  67. status = 0
  68. console.error(err)
  69. })
  70. }
  71. if (status === 0) {
  72. console.log(`Something Went Wrong in handling ${ColorCode.RED} report.`)
  73. }
  74. }
  75. /* Start buffering the messages coming in from applicationOutgonigMessages and also stop it from flowing into the release subject */
  76. if (report.code == ColorCode.YELLOW) {
  77. if (report.payload) {
  78. console.log(`Rebuffering ${report.payload.message?.appData?.msgId} into buffer...`)
  79. this.bufferedStorage.push(report.payload)
  80. }
  81. console.log(`Connection status report && ${report.message ?? 'No Message'}`)
  82. let status: Status = 1
  83. /* Status Chain begins */
  84. if (status === 1) {
  85. messageBufferSubscription = this.activateBufferSubscription(this.bufferedStorage, messageBufferSubscription, applicationOutgoingMessage)
  86. if (!messageBufferSubscription) status = 0
  87. }
  88. if (status === 1) {
  89. messageReleaseSubscription = this.deactivateReleaseSubscription(messageReleaseSubscription)
  90. if (messageReleaseSubscription) status = 0
  91. }
  92. if (status === 0) {
  93. console.log(`Something Went Wrong in handling ${ColorCode.RED} report.`)
  94. }
  95. }
  96. /* Stop buffering the message in local instance, but start saving them in database. Must first transfer the ones in local buffer before redirecting the
  97. flow from applicationOutgoingMessage into Mongo */
  98. if (report.code == ColorCode.RED) {
  99. console.log(`Connection status report: Server down. ${report.message} lol`)
  100. let status: Status = 1
  101. if (status === 1) {
  102. messageStreamToMongo = this.activateMongoStreamSubscription(messageStreamToMongo, applicationOutgoingMessage)
  103. if (!messageStreamToMongo) status = 0
  104. }
  105. if (status === 1) {
  106. messageBufferSubscription = this.deactivateBufferSubscription(messageBufferSubscription)
  107. if (messageBufferSubscription) status = 0
  108. }
  109. if (status === 1) {
  110. this.transferBufferedMessageToMongoStorage(this.bufferedStorage, messageBufferSubscription).then((res: any[]) => {
  111. if (res.length !== this.bufferedStorage.length || this.bufferedStorage.length > 0) status = -1 // this promise function should return an empty array
  112. })
  113. }
  114. if (status === 0) {
  115. console.log(`Something Went Wrong in handling ${ColorCode.RED} report.`)
  116. }
  117. }
  118. if (!report.code) {
  119. console.log(`Unknown message...`)
  120. }
  121. })
  122. return releaseMessageSubject
  123. }
  124. // IF Buffer exceeds a certain limit, it will trigger RED. Configure in .env file. There's the concern of 2 RED status, one from this and another from other means.
  125. // Behaviour of this needs to be investigated further
  126. private checkBufferLimit(message: Subject<Message>, statusReport: Subject<ReportStatus>) {
  127. let status: Status = 1
  128. if (status = 1) {
  129. message.subscribe(() => {
  130. if (this.bufferedStorage.length >= this.maximumBufferLength) {
  131. // for every messges that comes in, check the bufffer size, if it exceesd more than designated amount, push a red report status i
  132. console.log(`Buffer length exceeds limit imposed!!!`)
  133. let report: ReportStatus = {
  134. code: ColorCode.RED,
  135. message: `Buffer is exceeding limit. Initiate storage transfer to designated database.`,
  136. }
  137. statusReport.next(report)
  138. }
  139. })
  140. }
  141. }
  142. // Release the incoming Messages to be returned to the caller
  143. private activateReleaseSubscription(messageReleaseSubscription: Subscription | null, applicationOutgoingMessage: Subject<Message>, releaseMessageSubject: Subject<Message>): Subscription | null {
  144. let status: Status = 1
  145. if (status = 1) {
  146. if (!messageReleaseSubscription) {
  147. messageReleaseSubscription = applicationOutgoingMessage.subscribe({
  148. next: (message: Message) => {
  149. console.log(`Releasing ${(message.message as MessageLog).appData.msgId}...`);
  150. releaseMessageSubject.next(message);
  151. },
  152. error: (err) => console.error(err),
  153. complete: () => { },
  154. });
  155. console.log(`Subscription message release activated.`);
  156. } else {
  157. status = 0
  158. console.log(`Subscription message release is already active.`);
  159. }
  160. }
  161. return messageReleaseSubscription
  162. }
  163. // Stop the incoming Messages to be returned to caller
  164. private deactivateReleaseSubscription(messageReleaseSubscription: Subscription | null): Subscription | null {
  165. let status: Status = 1
  166. if (status = 1) {
  167. if (messageReleaseSubscription) {
  168. messageReleaseSubscription.unsubscribe();
  169. messageReleaseSubscription = null;
  170. console.log(`Subscription message release deactivated.`);
  171. } else {
  172. console.log(`Subscription message release is already deactivated.`);
  173. }
  174. }
  175. return messageReleaseSubscription
  176. }
  177. // Begin to push the incoming messages into local instantarray
  178. private activateBufferSubscription(bufferStorage: Message[], messageBufferSubscription: Subscription | null, applicationOutgoingMessage: Subject<Message>): Subscription | null {
  179. let status: Status = 1
  180. if (status = 1) {
  181. if (!messageBufferSubscription) {
  182. messageBufferSubscription = applicationOutgoingMessage.subscribe({
  183. next: (message: any) => {
  184. console.log(`Buffering ${(message.message as MessageLog).appData.msgId}... Local array length: ${bufferStorage.length}`);
  185. bufferStorage.push(message)
  186. },
  187. error: (err) => console.error(err),
  188. complete: () => { },
  189. });
  190. console.log(`Subscription message buffer activated.`);
  191. } else {
  192. status = 0
  193. console.log(`Subscription message buffer is already active.`);
  194. }
  195. }
  196. return messageBufferSubscription
  197. }
  198. // Stop pushing the incoming messages into local instantarray
  199. private deactivateBufferSubscription(messageBufferSubscription: Subscription | null): Subscription | null {
  200. let status: Status = 1
  201. if (status) {
  202. if (messageBufferSubscription) {
  203. messageBufferSubscription.unsubscribe();
  204. messageBufferSubscription = null;
  205. console.log(`Subscription message buffer deactivated.`);
  206. } else {
  207. status = 0
  208. console.log(`Subscription message buffer is already deactivated.`);
  209. }
  210. }
  211. return null
  212. }
  213. // Change the streaming direction of the incoming messages into mongo streaming subject( to be saved in local databse )
  214. private activateMongoStreamSubscription(messageStreamToMongo: Subscription | null, applicationOutgoingMessage: Subject<Message>): Subscription | null {
  215. let status: Status = 1
  216. if (status = 1) {
  217. if (!messageStreamToMongo) {
  218. messageStreamToMongo = applicationOutgoingMessage.subscribe({
  219. next: (message: any) => {
  220. console.log(`Saving ${(message.message as MessageLog).appData.msgId}...`);
  221. this.saveToMongo(message)
  222. },
  223. error: (err) => console.error(err),
  224. complete: () => { },
  225. });
  226. console.log(`Subscription message streaming to Mongo activated.`);
  227. } else {
  228. status = 0
  229. console.log(`Subscription message streaming to Mongo is already active.`);
  230. }
  231. }
  232. return messageStreamToMongo
  233. }
  234. // Stop or cut off the mongo streaming
  235. private deactivateMongoStreamSubscription(messageStreamToMongo: Subscription | null): Subscription | null {
  236. let status: Status = 1
  237. if (status = 1) {
  238. if (messageStreamToMongo) {
  239. messageStreamToMongo.unsubscribe();
  240. messageStreamToMongo = null;
  241. console.log(`Subscription message streaming to Mongo deactivated.`);
  242. } else {
  243. status = 0
  244. console.log(`Subscription message streaming to Mongo is already deactivated.`);
  245. }
  246. }
  247. return messageStreamToMongo
  248. }
  249. // To be used by mongoStreamSubscription to perform the saving execution
  250. private async saveToMongo(message: Message): Promise<boolean> {
  251. return new Promise((resolve, reject) => {
  252. // let messageModel: Model<any> = this.mongoConnection.model('Message', require('../models/message.schema'))
  253. this.messageModel.create(message).then(() => {
  254. console.log(`Saved MessageID ${(message.message as MessageLog).appData.msgId} into ${this.mongoUrl}`);
  255. resolve(true)
  256. }).catch((err) => {
  257. console.log(`MongoSaveError: ${err.message}`)
  258. reject(err)
  259. })
  260. })
  261. }
  262. // As the name implies, transder all the messages from the local instance into mongoStorage. Local instance should be emptied after transfer is completed
  263. private async transferBufferedMessageToMongoStorage(bufferedMessage: Message[], messageBufferSubscription: Subscription | null): Promise<Message[]> {
  264. return new Promise((resolve, reject) => {
  265. let status: Status = 1
  266. if (status = 1) {
  267. let bufferedStorage: Observable<Message> = from(bufferedMessage)
  268. bufferedStorage.subscribe({
  269. next: (message: any) => {
  270. this.saveToMongo(message).then((res) => {
  271. console.log(`Message ${(message.message as MessageLog).appData.msgId} saved successfully...`)
  272. }).catch((err) => console.error(err))
  273. },
  274. error: (error) => {
  275. reject(error)
  276. console.error(error)
  277. },
  278. complete: () => {
  279. this.bufferedStorage = []
  280. if (messageBufferSubscription) {
  281. console.log(`All ${bufferedMessage.length} buffered messages have been sent for transfer to ${this.mongoUrl}. Current length: ${this.bufferedStorage.length}`)
  282. }
  283. resolve(this.bufferedStorage)
  284. }
  285. })
  286. }
  287. })
  288. }
  289. // Transfer stored messages from the local instance back into the stream to be returned to the caller.
  290. private async releaseMessageFromLocalBuffer(bufferedStorage: Message[]): Promise<Observable<Message>> {
  291. return new Promise((resolve, reject) => {
  292. let status: Status = 1
  293. if (status = 1) {
  294. if (bufferedStorage.length > 1) {
  295. let caseVariable = this.bufferedStorage.length > 1;
  296. console.log(`Releasing data from local buffer instance. There ${caseVariable ? "is" : "are"} ${this.bufferedStorage.length} messages...`);
  297. let returnArrayObs: Observable<Message> = from(bufferedStorage)
  298. resolve(returnArrayObs)
  299. } else {
  300. let message = `There is no data in stored in local instance`
  301. reject(message)
  302. }
  303. }
  304. })
  305. }
  306. // Transder all the stored messages in designated mongo databases. It should be empty after all the data has been transferred.
  307. private async releaseMessageFromMongoStorage(): Promise<Subject<Message>> {
  308. return new Promise((resolve, reject) => {
  309. let status: Status = 1
  310. if (status = 1) {
  311. let dataSubject: Subject<Message> = new Subject()
  312. this.extractAllMessages(dataSubject)
  313. resolve(dataSubject)
  314. }
  315. })
  316. }
  317. // Connect to designated mongodatabase.
  318. private async connectToMongoDatabase(): Promise<any> {
  319. return new Promise((resolve, reject) => {
  320. let status: Status = 1
  321. if (status = 1) {
  322. console.log(this.mongoUrl)
  323. this.mongoConnection = mongoose.createConnection(this.mongoUrl)
  324. this.mongoConnection.on('error', (error) => {
  325. console.error('Connection error:', error);
  326. resolve('')
  327. });
  328. this.mongoConnection.once('open', () => {
  329. console.log(`Connected to ${process.env.MONGO}`);
  330. this.messageModel = this.mongoConnection.model('Message', require('../models/message.schema'));
  331. });
  332. }
  333. })
  334. }
  335. // Manage mongoCOnnectino. The logic used would be different across differnet application. This will loop the process indefinitely os it is always trying to connect to database.
  336. private async manageMongoConnection(): Promise<boolean> {
  337. while (true) {
  338. try {
  339. await this.connectToMongoDatabase()
  340. } catch (error) {
  341. console.log(`Something Wrong occured. Please check at manageMongoConnection`)
  342. }
  343. await new Promise(resolve => setTimeout(resolve, 1000)); // Wait for 1 second before the next attempt
  344. }
  345. }
  346. // This will be used to release all the hostage messages once the light is green.
  347. public async extractAllMessages(subjectArgs: Subject<Message>): Promise<void> {
  348. // Need to resolve the issue of streaming in a specific order that is sequential
  349. let status: Status = 1
  350. if (status = 1) {
  351. if (this.messageModel) {
  352. const eventStream = this.messageModel.find().lean().cursor()
  353. eventStream.on('data', (message) => {
  354. // Emit each document to the subject
  355. subjectArgs.next(message);
  356. });
  357. eventStream.on('end', async () => {
  358. // All data has been streamed, complete the subject
  359. subjectArgs.complete();
  360. // Delete the data once it has been streamed
  361. try {
  362. await this.messageModel.deleteMany({});
  363. console.log('Data in Mongo deleted successfully.');
  364. } catch (err) {
  365. console.error('Error deleting data:', err);
  366. }
  367. });
  368. } else {
  369. status = 0
  370. console.log(`Error: Message Model is ${this.messageModel}!! Please set up the mongoose connection properly!`)
  371. }
  372. }
  373. }
  374. }
  375. // Store in json file in this project folder. To be enabled in future
  376. // private async transferMessageToLocalStorage(message: Subject<any>): Promise<void> {
  377. // let localArray: any[] = this.bufferedStorage
  378. // let filename = `localstorage.json`;
  379. // while (localArray.length > 0) {
  380. // let objectToWrite = this.bufferedStorage[0];
  381. // await writeMessage(objectToWrite, filename)
  382. // }
  383. // message.subscribe((message: any) => {
  384. // writeMessage(message, filename)
  385. // })
  386. // if (localArray.length < 1) this.bufferedStorage = localArray
  387. // console.log('Local Array is empty. Finished transferring to files.')
  388. // async function writeMessage(message: any, filename: string) {
  389. // try {
  390. // let stringifiedMessage = JSON.stringify(message);
  391. // await fs.promises.appendFile(filename, stringifiedMessage + "\r\n")
  392. // console.log(`Successfully transferred ${filename}`);
  393. // localArray.shift();
  394. // } catch (err) {
  395. // console.error(`Error trasferring ${filename}:`, err);
  396. // }
  397. // }
  398. // }