fis.retransmission.service.ts 20 KB


  1. import * as _ from 'lodash'
  2. import * as fs from 'fs'
  3. import mongoose, { Model, Schema } from 'mongoose';
  4. import { Observable, Subject, Subscription, from } from 'rxjs'
  5. import { ColorCode, GrpcMessage, MessageLog, ReportStatus, Status } from '../interfaces/general.interface'
  6. require('dotenv').config();
  7. // Implement status chain refactoring
  8. export class FisRetransmissionService {
  9. private mongoUrl: string = process.env.MONGO + 'emergencyStorage'
  10. private bufferedStorage: any[] = []
  11. private mongoConnection: any
  12. private messageModel: any
  13. private maximumBufferLength: number = parseInt(process.env.MaxBufferLoad as string) // right now just put as 15
  14. constructor() {
  15. // Connect to mongoDB.
  16. this.manageMongoConnection()
  17. }
  18. // Main function that intercepts outgoing messages by communicating || intepreting report status from grpc connection as indicator
  19. public handleMessage(messageToBePublished: Subject<GrpcMessage>, statusReport: Subject<ReportStatus>): Subject<GrpcMessage> {
  20. let releaseMessageSubject: Subject<GrpcMessage> = new Subject() // A return value
  21. // Using the concept of toggling to improve the eficacy of subscription control && data flow
  22. let messageReleaseSubscription: Subscription | null = null
  23. let messageBufferSubscription: Subscription | null = null
  24. let messageStreamToMongo: Subscription | null = null
  25. this.checkBufferLimit(messageToBePublished, statusReport)
  26. statusReport.subscribe((report: ReportStatus) => {
  27. if (report.code == ColorCode.GREEN) {
  28. console.log(`Connection status report && ${report.message ?? 'No Message'}`)
  29. /* Status Chain begins */
  30. let status: Status = 1
  31. if (status === 1) {
  32. messageStreamToMongo = this.deactivateMongoStreamSubscription(messageStreamToMongo)
  33. if (messageStreamToMongo) status = 0
  34. }
  35. if (status === 1) {
  36. messageBufferSubscription = this.deactivateBufferSubscription(messageBufferSubscription)
  37. if (messageBufferSubscription) status = 0
  38. }
  39. if (status === 1) {
  40. messageReleaseSubscription = this.activateReleaseSubscription(messageReleaseSubscription, messageToBePublished, releaseMessageSubject)
  41. if (!messageReleaseSubscription) status = 0
  42. }
  43. if (status === 1) {
  44. this.releaseMessageFromLocalBuffer(this.bufferedStorage).then((resObs: Observable<GrpcMessage>) => {
  45. resObs.subscribe({
  46. next: message => releaseMessageSubject.next(message),
  47. error: err => console.error(err),
  48. complete: () => {
  49. this.bufferedStorage = []
  50. console.log(`Reset buffer Storage count: ${this.bufferedStorage.length}. All messages have been released back into the stream.`)
  51. }
  52. })
  53. }).catch((err) => {
  54. status = 0
  55. console.error(err)
  56. })
  57. }
  58. if (status === 1) {
  59. this.releaseMessageFromMongoStorage().then((resObs: Subject<GrpcMessage>) => {
  60. resObs.subscribe({
  61. next: message => releaseMessageSubject.next(message),
  62. error: err => console.error(err),
  63. complete: () => console.log(`All Mongo data are transferred `)
  64. })
  65. }).catch((err) => {
  66. status = 0
  67. console.error(err)
  68. })
  69. }
  70. if (status === 0) {
  71. console.log(`Something Went Wrong in handling ${ColorCode.RED} report.`)
  72. }
  73. }
  74. if (report.code == ColorCode.YELLOW) {
  75. if (report.payload) {
  76. console.log(`Rebuffering ${report.payload.message?.appData?.msgId} into buffer...`)
  77. this.bufferedStorage.push(report.payload)
  78. }
  79. console.log(`Connection status report && ${report.message ?? 'No Message'}`)
  80. let status: Status = 1
  81. /* Status Chain begins */
  82. if (status === 1) {
  83. messageBufferSubscription = this.activateBufferSubscription(this.bufferedStorage, messageBufferSubscription, messageToBePublished)
  84. if (!messageBufferSubscription) status = 0
  85. }
  86. if (status === 1) {
  87. messageReleaseSubscription = this.deactivateReleaseSubscription(messageReleaseSubscription)
  88. if (messageReleaseSubscription) status = 0
  89. }
  90. if (status === 0) {
  91. console.log(`Something Went Wrong in handling ${ColorCode.RED} report.`)
  92. }
  93. }
  94. if (report.code == ColorCode.RED) {
  95. console.log(`Connection status report: Server down. ${report.message} lol`)
  96. let status: Status = 1
  97. if (status === 1) {
  98. messageStreamToMongo = this.activateMongoStreamSubscription(messageStreamToMongo, messageToBePublished)
  99. if (!messageStreamToMongo) status = 0
  100. }
  101. if (status === 1) {
  102. messageBufferSubscription = this.deactivateBufferSubscription(messageBufferSubscription)
  103. if (messageBufferSubscription) status = 0
  104. }
  105. if (status === 1) {
  106. this.transferBufferedMessageToMongoStorage(this.bufferedStorage, messageBufferSubscription).then((res: any[]) => {
  107. if (res.length !== this.bufferedStorage.length || this.bufferedStorage.length > 0) status = -1 // this promise function should return an empty array
  108. })
  109. }
  110. if (status === 0) {
  111. console.log(`Something Went Wrong in handling ${ColorCode.RED} report.`)
  112. }
  113. }
  114. if (!report.code) {
  115. console.log(`Unknown message...`)
  116. }
  117. })
  118. return releaseMessageSubject
  119. }
  120. private checkBufferLimit(message: Subject<GrpcMessage>, statusReport: Subject<ReportStatus>) {
  121. let status: Status = 1
  122. if (status = 1) {
  123. message.subscribe(() => {
  124. if (this.bufferedStorage.length >= this.maximumBufferLength) {
  125. // for every messges that comes in, check the bufffer size, if it exceesd more than designated amount, push a red report status i
  126. console.log(`Buffer length exceeds limit imposed!!!`)
  127. let report: ReportStatus = {
  128. code: ColorCode.RED,
  129. message: `Buffer is exceeding limit. Initiate storage transfer to designated database.`,
  130. from: `Error Handling Service`
  131. }
  132. statusReport.next(report)
  133. }
  134. })
  135. }
  136. }
  137. // Release the incoming Messages to be returned to the caller
  138. private activateReleaseSubscription(messageReleaseSubscription: Subscription | null, messageToBePublished: Subject<GrpcMessage>, releaseMessageSubject: Subject<GrpcMessage>): Subscription | null {
  139. let status: Status = 1
  140. if (status = 1) {
  141. if (!messageReleaseSubscription) {
  142. messageReleaseSubscription = messageToBePublished.subscribe({
  143. next: (message: GrpcMessage) => {
  144. console.log(`Releasing ${(message.message as MessageLog).appData.msgId}...`);
  145. releaseMessageSubject.next(message);
  146. },
  147. error: (err) => console.error(err),
  148. complete: () => { },
  149. });
  150. console.log(`Subscription message release activated.`);
  151. } else {
  152. status = 0
  153. console.log(`Subscription message release is already active.`);
  154. }
  155. }
  156. return messageReleaseSubscription
  157. }
  158. // Stop the incoming Messaes to be returned to caller
  159. private deactivateReleaseSubscription(messageReleaseSubscription: Subscription | null): Subscription | null {
  160. let status: Status = 1
  161. if (status = 1) {
  162. if (messageReleaseSubscription) {
  163. messageReleaseSubscription.unsubscribe();
  164. messageReleaseSubscription = null;
  165. console.log(`Subscription message release deactivated.`);
  166. } else {
  167. console.log(`Subscription message release is already deactivated.`);
  168. }
  169. }
  170. return messageReleaseSubscription
  171. }
  172. // Begin to push the incoming messages into local instantarray
  173. private activateBufferSubscription(bufferStorage: GrpcMessage[], messageBufferSubscription: Subscription | null, messageToBePublished: Subject<GrpcMessage>): Subscription | null {
  174. let status: Status = 1
  175. if (status = 1) {
  176. if (!messageBufferSubscription) {
  177. messageBufferSubscription = messageToBePublished.subscribe({
  178. next: (message: any) => {
  179. console.log(`Buffering ${(message.message as MessageLog).appData.msgId}... Local array length: ${bufferStorage.length}`);
  180. bufferStorage.push(message)
  181. },
  182. error: (err) => console.error(err),
  183. complete: () => { },
  184. });
  185. console.log(`Subscription message buffer activated.`);
  186. } else {
  187. status = 0
  188. console.log(`Subscription message buffer is already active.`);
  189. }
  190. }
  191. return messageBufferSubscription
  192. }
  193. // Stop pushing the incoming messages into local instantarray
  194. private deactivateBufferSubscription(messageBufferSubscription: Subscription | null): Subscription | null {
  195. let status: Status = 1
  196. if (status) {
  197. if (messageBufferSubscription) {
  198. messageBufferSubscription.unsubscribe();
  199. messageBufferSubscription = null;
  200. console.log(`Subscription message buffer deactivated.`);
  201. } else {
  202. status = 0
  203. console.log(`Subscription message buffer is already deactivated.`);
  204. }
  205. }
  206. return null
  207. }
  208. // Change the streaming direction of the incoming messages into mongo streaming subject( to be saved in local databse )
  209. private activateMongoStreamSubscription(messageStreamToMongo: Subscription | null, messageToBePublished: Subject<GrpcMessage>): Subscription | null {
  210. let status: Status = 1
  211. if (status = 1) {
  212. if (!messageStreamToMongo) {
  213. messageStreamToMongo = messageToBePublished.subscribe({
  214. next: (message: any) => {
  215. console.log(`Saving ${(message.message as MessageLog).appData.msgId}...`);
  216. this.saveToMongo(message)
  217. },
  218. error: (err) => console.error(err),
  219. complete: () => { },
  220. });
  221. console.log(`Subscription message streaming to Mongo activated.`);
  222. } else {
  223. status = 0
  224. console.log(`Subscription message streaming to Mongo is already active.`);
  225. }
  226. }
  227. return messageStreamToMongo
  228. }
  229. // Stop or cut off the mongo streaming
  230. private deactivateMongoStreamSubscription(messageStreamToMongo: Subscription | null): Subscription | null {
  231. let status: Status = 1
  232. if (status = 1) {
  233. if (messageStreamToMongo) {
  234. messageStreamToMongo.unsubscribe();
  235. messageStreamToMongo = null;
  236. console.log(`Subscription message streaming to Mongo deactivated.`);
  237. } else {
  238. status = 0
  239. console.log(`Subscription message streaming to Mongo is already deactivated.`);
  240. }
  241. }
  242. return messageStreamToMongo
  243. }
  244. // To be used by mongoStreamSubscription to perform the saving execution
  245. private async saveToMongo(message: GrpcMessage): Promise<boolean> {
  246. return new Promise((resolve, reject) => {
  247. // let messageModel: Model<any> = this.mongoConnection.model('Message', require('../models/message.schema'))
  248. this.messageModel.create(message).then(() => {
  249. console.log(`Saved MessageID ${(message.message as MessageLog).appData.msgId} into ${this.mongoUrl}`);
  250. resolve(true)
  251. }).catch((err) => {
  252. console.log(`MongoSaveError: ${err.message}`)
  253. reject(err)
  254. })
  255. })
  256. }
  257. // As the name implies, transder all the messages from the local instance into mongoStorage. Local instance should be emptied after transfer is completed
  258. private async transferBufferedMessageToMongoStorage(bufferedMessage: GrpcMessage[], messageBufferSubscription: Subscription | null): Promise<GrpcMessage[]> {
  259. return new Promise((resolve, reject) => {
  260. let status: Status = 1
  261. if (status = 1) {
  262. let bufferedStorage: Observable<GrpcMessage> = from(bufferedMessage)
  263. bufferedStorage.subscribe({
  264. next: (message: any) => {
  265. this.saveToMongo(message).then((res) => {
  266. console.log(`Message ${(message.message as MessageLog).appData.msgId} saved successfully...`)
  267. }).catch((err) => console.error(err))
  268. },
  269. error: (error) => {
  270. reject(error)
  271. console.error(error)
  272. },
  273. complete: () => {
  274. this.bufferedStorage = []
  275. if (messageBufferSubscription) {
  276. console.log(`All ${bufferedMessage.length} buffered messages have been sent for transfer to ${this.mongoUrl}. Current length: ${this.bufferedStorage.length}`)
  277. }
  278. resolve(this.bufferedStorage)
  279. }
  280. })
  281. }
  282. })
  283. }
  284. // Transfer stored messages from the local instance back into the stream to be returned to the caller.
  285. private async releaseMessageFromLocalBuffer(bufferedStorage: GrpcMessage[]): Promise<Observable<GrpcMessage>> {
  286. return new Promise((resolve, reject) => {
  287. let status: Status = 1
  288. if (status = 1) {
  289. if (bufferedStorage.length > 1) {
  290. let caseVariable = this.bufferedStorage.length > 1;
  291. console.log(`Releasing data from local buffer instance. There ${caseVariable ? "is" : "are"} ${this.bufferedStorage.length} messages...`);
  292. let returnArrayObs: Observable<GrpcMessage> = from(bufferedStorage)
  293. resolve(returnArrayObs)
  294. } else {
  295. let message = `There is no data in stored in local instance`
  296. reject(message)
  297. }
  298. }
  299. })
  300. }
  301. // Transder all the stored messages in designated mongo databases. It should be empty after all the data has been transferred.
  302. private async releaseMessageFromMongoStorage(): Promise<Subject<GrpcMessage>> {
  303. return new Promise((resolve, reject) => {
  304. let status: Status = 1
  305. if (status = 1) {
  306. let dataSubject: Subject<GrpcMessage> = new Subject()
  307. this.extractAllMessages(dataSubject)
  308. resolve(dataSubject)
  309. }
  310. })
  311. }
  312. // Connect to designated mongodatabase.
  313. private async connectToMongoDatabase(): Promise<any> {
  314. return new Promise((resolve, reject) => {
  315. let status: Status = 1
  316. if (status = 1) {
  317. console.log(this.mongoUrl)
  318. this.mongoConnection = mongoose.createConnection(this.mongoUrl)
  319. this.mongoConnection.on('error', (error) => {
  320. console.error('Connection error:', error);
  321. resolve('')
  322. });
  323. this.mongoConnection.once('open', () => {
  324. console.log(`Connected to ${process.env.MONGO}`);
  325. this.messageModel = this.mongoConnection.model('Message', require('../models/message.schema'));
  326. });
  327. }
  328. })
  329. }
  330. // Manage mongoCOnnectino. The logic used would be different across differnet application. This will loop the process indefinitely os it is always trying to connect to database.
  331. private async manageMongoConnection(): Promise<boolean> {
  332. while (true) {
  333. try {
  334. await this.connectToMongoDatabase()
  335. } catch (error) {
  336. console.log(`Something Wrong occured. Please check at manageMongoConnection`)
  337. }
  338. await new Promise(resolve => setTimeout(resolve, 1000)); // Wait for 1 second before the next attempt
  339. }
  340. }
  341. public async extractAllMessages(subjectArgs: Subject<GrpcMessage>): Promise<void> {
  342. // Need to resolve the issue of streaming in a specific order that is sequential
  343. let status: Status = 1
  344. if (status = 1) {
  345. if (this.messageModel) {
  346. const eventStream = this.messageModel.find().lean().cursor()
  347. eventStream.on('data', (message) => {
  348. // Emit each document to the subject
  349. subjectArgs.next(message);
  350. });
  351. eventStream.on('end', async () => {
  352. // All data has been streamed, complete the subject
  353. subjectArgs.complete();
  354. // Delete the data once it has been streamed
  355. try {
  356. await this.messageModel.deleteMany({});
  357. console.log('Data in Mongo deleted successfully.');
  358. } catch (err) {
  359. console.error('Error deleting data:', err);
  360. }
  361. });
  362. } else {
  363. status = 0
  364. console.log(`Error: Message Model is ${this.messageModel}!! Please set up the mongoose connectino properly!`)
  365. }
  366. }
  367. }
  368. }
  369. // Store in json file in this project folder. To be enabled in future
  370. // private async transferMessageToLocalStorage(message: Subject<any>): Promise<void> {
  371. // let localArray: any[] = this.bufferedStorage
  372. // let filename = `localstorage.json`;
  373. // while (localArray.length > 0) {
  374. // let objectToWrite = this.bufferedStorage[0];
  375. // await writeMessage(objectToWrite, filename)
  376. // }
  377. // message.subscribe((message: any) => {
  378. // writeMessage(message, filename)
  379. // })
  380. // if (localArray.length < 1) this.bufferedStorage = localArray
  381. // console.log('Local Array is empty. Finished transferring to files.')
  382. // async function writeMessage(message: any, filename: string) {
  383. // try {
  384. // let stringifiedMessage = JSON.stringify(message);
  385. // await fs.promises.appendFile(filename, stringifiedMessage + "\r\n")
  386. // console.log(`Successfully transferred ${filename}`);
  387. // localArray.shift();
  388. // } catch (err) {
  389. // console.error(`Error trasferring ${filename}:`, err);
  390. // }
  391. // }
  392. // }