error.handling.service.fis.ts 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377
  1. import * as _ from 'lodash'
  2. import * as fs from 'fs'
  3. import mongoose, { Model, Schema } from 'mongoose';
  4. import { Observable, Subject, Subscription, from } from 'rxjs'
  5. import { ColorCode, MessageLog, ReportStatus } from '../interfaces/general.interface'
  6. require('dotenv').config();
  7. // Implement status chain refactoring
  8. export class FisErrorHandlingService {
  9. private mongoUrl: string = process.env.MONGO + 'emergencyStorage'
  10. private bufferedStorage: MessageLog[] = []
  11. private mongoConnection: any
  12. private messageModel: any
  13. private maximumBufferLength: number = parseInt(process.env.MaxBufferLoad as string) // right now just put as 15
  14. constructor() {
  15. // Connect to mongoDB.
  16. this.manageMongoConnection()
  17. }
  18. // Main function that intercepts outgoing messages by communicating || intepreting report status from grpc connection as indicator
  19. public handleMessage(messageToBePublished: Subject<MessageLog>, statusReport: Subject<ReportStatus>): Subject<MessageLog> {
  20. let releaseMessageSubject: Subject<MessageLog> = new Subject() // A return value
  21. // Using the concept of toggling to improve the eficacy of subscription control && data flow
  22. let messageReleaseSubscription: Subscription | null = null
  23. let messageBufferSubscription: Subscription | null = null
  24. let messageStreamToMongo: Subscription | null = null
  25. this.checkBufferLimit(messageToBePublished, statusReport)
  26. statusReport.subscribe((report: any) => {
  27. if (report.code == ColorCode.GREEN) {
  28. console.log(`Connection status report && ${report.message ?? 'No Message'}`)
  29. /* Status Chain begins */
  30. let status: Status = 1
  31. if (status === 1) {
  32. messageStreamToMongo = this.deactivateMongoStreamSubscription(messageStreamToMongo)
  33. if (messageStreamToMongo) status = -1
  34. }
  35. if (status === 1) {
  36. messageBufferSubscription = this.deactivateBufferSubscription(messageBufferSubscription)
  37. if (messageBufferSubscription) status = -1
  38. }
  39. if (status === 1) {
  40. messageReleaseSubscription = this.activateReleaseSubscription(messageReleaseSubscription, messageToBePublished, releaseMessageSubject)
  41. if (!messageReleaseSubscription) status = -1
  42. }
  43. if (status === 1) {
  44. this.releaseMessageFromLocalBuffer(this.bufferedStorage).then((resObs: Observable<MessageLog>) => {
  45. resObs.subscribe({
  46. next: message => releaseMessageSubject.next(message),
  47. error: err => console.error(err),
  48. complete: () => {
  49. this.bufferedStorage = []
  50. console.log(`Reset buffer Storage count: ${this.bufferedStorage.length}. All messages have been released back into the stream.`)
  51. }
  52. })
  53. }).catch((err) => {
  54. status = -1
  55. console.error(err)
  56. })
  57. }
  58. if (status === 1) {
  59. this.releaseMessageFromMongoStorage().then((resObs: Subject<MessageLog>) => {
  60. resObs.subscribe({
  61. next: message => releaseMessageSubject.next(message),
  62. error: err => console.error(err),
  63. complete: () => console.log(`All Mongo data are transferred `)
  64. })
  65. }).catch((err) => {
  66. status = -1
  67. console.error(err)
  68. })
  69. }
  70. if (status === -1) {
  71. console.log(`Something Went Wrong in handling ${ColorCode.RED} report.`)
  72. }
  73. }
  74. if (report.code == ColorCode.YELLOW) {
  75. if (report.payload) {
  76. console.log(`Rebuffering ${report.payload.appData?.msgId} into buffer...`)
  77. this.bufferedStorage.push(report.payload)
  78. }
  79. console.log(`Connection status report && ${report.message ?? 'No Message'}`)
  80. let status: Status = 1
  81. /* Status Chain begins */
  82. if (status === 1) {
  83. messageBufferSubscription = this.activateBufferSubscription(this.bufferedStorage, messageBufferSubscription, messageToBePublished)
  84. if (!messageBufferSubscription) status = -1
  85. }
  86. if (status === 1) {
  87. messageReleaseSubscription = this.deactivateReleaseSubscription(messageReleaseSubscription)
  88. if (messageReleaseSubscription) status = -1
  89. }
  90. if (status === -1) {
  91. console.log(`Something Went Wrong in handling ${ColorCode.RED} report.`)
  92. }
  93. }
  94. if (report.code == ColorCode.RED) {
  95. console.log(`Connection status report: Server down. ${report.message} lol`)
  96. let status: Status = 1
  97. if (status === 1) {
  98. messageStreamToMongo = this.activateMongoStreamSubscription(messageStreamToMongo, messageToBePublished)
  99. if (!messageStreamToMongo) status = -1
  100. }
  101. if (status === 1) {
  102. messageBufferSubscription = this.deactivateBufferSubscription(messageBufferSubscription)
  103. if (messageBufferSubscription) status = -1
  104. }
  105. if (status === 1) {
  106. this.transferBufferedMessagseToMongoStorage(this.bufferedStorage, messageBufferSubscription).then((res: MessageLog[]) => {
  107. if (res.length !== this.bufferedStorage.length || this.bufferedStorage.length > 0) status = -1 // this promise function should return an empty array
  108. })
  109. }
  110. if (status === -1) {
  111. console.log(`Something Went Wrong in handling ${ColorCode.RED} report.`)
  112. }
  113. }
  114. if (!report.code || report.code == "") {
  115. console.log(`Unknown message...`)
  116. }
  117. })
  118. return releaseMessageSubject
  119. }
  120. private checkBufferLimit(message: Subject<any>, statusReport: Subject<ReportStatus>) {
  121. message.subscribe(() => {
  122. if (this.bufferedStorage.length >= this.maximumBufferLength) {
  123. // for every messges that comes in, check the bufffer size, if it exceesd more than designated amount, push a red report status i
  124. console.log(`Buffer length exceeds limit imposed!!!`)
  125. let report: ReportStatus = {
  126. code: ColorCode.RED,
  127. message: `Buffer is exceeding limit. Initiate storage transfer to designated database. `
  128. }
  129. statusReport.next(report)
  130. }
  131. })
  132. }
  133. // Release the incoming Messages to be returned to the caller
  134. private activateReleaseSubscription(messageReleaseSubscription, messageToBePublished, releaseMessageSubject): Subscription | null {
  135. if (!messageReleaseSubscription) {
  136. messageReleaseSubscription = messageToBePublished.subscribe({
  137. next: (message: MessageLog) => {
  138. console.log(`Releasing ${message.appData.msgId}...`);
  139. releaseMessageSubject.next(message);
  140. },
  141. error: (err) => console.error(err),
  142. complete: () => { },
  143. });
  144. console.log(`Subscription message release activated.`);
  145. } else {
  146. console.log(`Subscription message release is already active.`);
  147. }
  148. return messageReleaseSubscription
  149. }
  150. // Stop the incoming Messaes to be returned to caller
  151. private deactivateReleaseSubscription(messageReleaseSubscription): Subscription | null {
  152. if (messageReleaseSubscription) {
  153. messageReleaseSubscription.unsubscribe();
  154. messageReleaseSubscription = null;
  155. console.log(`Subscription message release deactivated.`);
  156. } else {
  157. console.log(`Subscription message release is already deactivated.`);
  158. }
  159. return messageReleaseSubscription
  160. }
  161. // Begin to push the incoming messages into local instantarray
  162. private activateBufferSubscription(bufferStorage: MessageLog[], messageBufferSubscription: Subscription | null, messageToBePublished: Subject<any>): Subscription | null {
  163. if (!messageBufferSubscription) {
  164. messageBufferSubscription = messageToBePublished.subscribe({
  165. next: (message: MessageLog) => {
  166. console.log(`Buffering ${message.appData.msgId}... Local array length: ${bufferStorage.length}`);
  167. bufferStorage.push(message)
  168. },
  169. error: (err) => console.error(err),
  170. complete: () => { },
  171. });
  172. console.log(`Subscription message buffer activated.`);
  173. } else {
  174. console.log(`Subscription message buffer is already active.`);
  175. }
  176. return messageBufferSubscription
  177. }
  178. // Stop pushing the incoming messages into local instantarray
  179. private deactivateBufferSubscription(messageBufferSubscription: Subscription | null): Subscription | null {
  180. if (messageBufferSubscription) {
  181. messageBufferSubscription.unsubscribe();
  182. messageBufferSubscription = null;
  183. console.log(`Subscription message buffer deactivated.`);
  184. } else {
  185. console.log(`Subscription message buffer is already deactivated.`);
  186. }
  187. return null
  188. }
  189. // Change the streaming direction of the incoming messages into mongo streaming subject( to be saved in local databse )
  190. private activateMongoStreamSubscription(messageStreamToMongo: Subscription | null, messageToBePublished: Subject<any>): Subscription | null {
  191. if (!messageStreamToMongo) {
  192. messageStreamToMongo = messageToBePublished.subscribe({
  193. next: (message: MessageLog) => {
  194. console.log(`Saving ${message.appData.msgId}...`);
  195. this.saveToMongo(message)
  196. },
  197. error: (err) => console.error(err),
  198. complete: () => { },
  199. });
  200. console.log(`Subscription message streaming to Mongo activated.`);
  201. } else {
  202. console.log(`Subscription message streaming to Mongo is already active.`);
  203. }
  204. return messageStreamToMongo
  205. }
  206. // Stop or cut off the mongo streaming
  207. private deactivateMongoStreamSubscription(messageStreamToMongo: Subscription | null): Subscription | null {
  208. if (messageStreamToMongo) {
  209. messageStreamToMongo.unsubscribe();
  210. messageStreamToMongo = null;
  211. console.log(`Subscription message streaming to Mongo deactivated.`);
  212. } else {
  213. console.log(`Subscription message streaming to Mongo is already deactivated.`);
  214. }
  215. return messageStreamToMongo
  216. }
  217. // Store in json file in this project folder. To be enabled in future
  218. private async transferMessageToLocalStorage(message: Subject<MessageLog>): Promise<void> {
  219. let localArray: MessageLog[] = this.bufferedStorage
  220. let filename = `localstorage.json`;
  221. while (localArray.length > 0) {
  222. let objectToWrite = this.bufferedStorage[0];
  223. await writeMessage(objectToWrite, filename)
  224. }
  225. message.subscribe((message: MessageLog) => {
  226. writeMessage(message, filename)
  227. })
  228. if (localArray.length < 1) this.bufferedStorage = localArray
  229. console.log('Local Array is empty. Finished transferring to files.')
  230. async function writeMessage(message: MessageLog, filename: string) {
  231. try {
  232. let stringifiedMessage = JSON.stringify(message);
  233. await fs.promises.appendFile(filename, stringifiedMessage + "\r\n")
  234. console.log(`Successfully transferred ${filename}`);
  235. localArray.shift();
  236. } catch (err) {
  237. console.error(`Error trasferring ${filename}:`, err);
  238. }
  239. }
  240. }
  241. // To be used by mongoStreamSubscription to perform the saving execution
  242. private async saveToMongo(message: MessageLog): Promise<boolean> {
  243. return new Promise((resolve, reject) => {
  244. // let messageModel: Model<any> = this.mongoConnection.model('Message', require('../models/message.schema'))
  245. this.messageModel.create(message).then(() => {
  246. console.log(`Saved MessageID ${message.appData.msgId} into ${this.mongoUrl}`);
  247. resolve(true)
  248. }).catch((err) => {
  249. console.log(`MongoSaveError: ${err.message}`)
  250. reject(err)
  251. })
  252. })
  253. }
  254. // As the name implies, transder all the messages from the local instance into mongoStorage. Local instance should be emptied after transfer is completed
  255. private async transferBufferedMessagseToMongoStorage(bufferedMessage: MessageLog[], messageBufferSubscription): Promise<MessageLog[]> {
  256. return new Promise((resolve, reject) => {
  257. let bufferedStorage: Observable<MessageLog> = from(bufferedMessage)
  258. bufferedStorage.subscribe({
  259. next: (message: MessageLog) => {
  260. this.saveToMongo(message).then((res) => {
  261. console.log(`Message ${message.appData.msgId} saved successfully...`)
  262. }).catch((err) => console.error(err))
  263. },
  264. error: (error) => {
  265. reject(error)
  266. console.error(error)
  267. },
  268. complete: () => {
  269. this.bufferedStorage = []
  270. if (messageBufferSubscription) {
  271. console.log(`All ${bufferedMessage.length} buffered messages have been sent for transfer to ${this.mongoUrl}. Current length: ${this.bufferedStorage.length}`)
  272. }
  273. resolve(this.bufferedStorage)
  274. }
  275. })
  276. })
  277. }
  278. // Transfer stored messages from the local instance back into the stream to be returned to the caller.
  279. private async releaseMessageFromLocalBuffer(bufferedStorage: MessageLog[]): Promise<Observable<MessageLog>> {
  280. return new Promise((resolve, reject) => {
  281. if (bufferedStorage.length > 1) {
  282. let caseVariable = this.bufferedStorage.length > 1;
  283. console.log(`Releasing data from local buffer instance. There ${caseVariable ? "is" : "are"} ${this.bufferedStorage.length} messages...`);
  284. let returnArrayObs: Observable<MessageLog> = from(bufferedStorage)
  285. resolve(returnArrayObs)
  286. } else {
  287. let message = `There is no data in stored in local instance`
  288. reject(message)
  289. }
  290. })
  291. }
  292. // Transder all the stored messages in designated mongo databases. It should be empty after all the data has been transferred.
  293. private async releaseMessageFromMongoStorage(): Promise<Subject<MessageLog>> {
  294. return new Promise((resolve, reject) => {
  295. let dataSubject: Subject<MessageLog> = new Subject()
  296. this.extractAllMessages(dataSubject)
  297. resolve(dataSubject)
  298. })
  299. }
  300. // Connect to designated mongodatabase.
  301. private async connectToMongoDatabase(): Promise<any> {
  302. return new Promise((resolve, reject) => {
  303. console.log(this.mongoUrl)
  304. this.mongoConnection = mongoose.createConnection(this.mongoUrl)
  305. this.mongoConnection.on('error', (error) => {
  306. console.error('Connection error:', error);
  307. resolve('')
  308. });
  309. this.mongoConnection.once('open', () => {
  310. console.log(`Connected to ${process.env.MONGO}`);
  311. this.messageModel = this.mongoConnection.model('Message', require('../models/message.schema'));
  312. });
  313. })
  314. }
  315. // Manage mongoCOnnectino. The logic used would be different across differnet application. This will loop the process indefinitely os it is always trying to connect to database.
  316. private async manageMongoConnection(): Promise<boolean> {
  317. while (true) {
  318. try {
  319. await this.connectToMongoDatabase()
  320. } catch (error) {
  321. console.log(`Something Wrong occured. Please check at manageMongoConnection`)
  322. }
  323. await new Promise(resolve => setTimeout(resolve, 1000)); // Wait for 1 second before the next attempt
  324. }
  325. }
  326. public async extractAllMessages(subjectArgs: Subject<any>): Promise<void> {
  327. if (this.messageModel) {
  328. const eventStream = this.messageModel.find().lean().cursor()
  329. eventStream.on('data', (message) => {
  330. // Emit each document to the subject
  331. subjectArgs.next(message);
  332. });
  333. eventStream.on('end', async () => {
  334. // All data has been streamed, complete the subject
  335. subjectArgs.complete();
  336. // Delete the data once it has been streamed
  337. try {
  338. await this.messageModel.deleteMany({});
  339. console.log('Data in Mongo deleted successfully.');
  340. } catch (err) {
  341. console.error('Error deleting data:', err);
  342. }
  343. });
  344. } else {
  345. console.log(`Error: Message Model is ${this.messageModel}!! Please set up the mongoose connectino properly!`)
  346. }
  347. }
  348. }
  349. type Status = -1 | 0 | 1 // For status chain effect