Browse Source

added common library

enzo 1 week ago
parent
commit
8435546925

+ 3 - 0
.gitmodules

@@ -0,0 +1,3 @@
+[submodule "apps/fis-fingerprint/Fis-Fingerprint"]
+	path = apps/fis-fingerprint/Fis-Fingerprint
+	url = https://git.swopt.com/enzo/Fis-Fingerprint.git

+ 2 - 1
apps/fis-fingerprint/src/fis-fingerprint.module.ts

@@ -1,10 +1,11 @@
 import { Module } from '@nestjs/common';
 import { FisFingerprintController } from './fis-fingerprint.controller';
 import { FisFingerprintService } from './fis-fingerprint.service';
+import { FingerprintService, StorageService } from '../Fis-Fingerprint/src';
 
 @Module({
   imports: [],
   controllers: [FisFingerprintController],
-  providers: [FisFingerprintService],
+  providers: [FingerprintService, StorageService, FisFingerprintService],
 })
 export class FisFingerprintModule {}

+ 8 - 0
libs/common/src/common.module.ts

@@ -0,0 +1,8 @@
+import { Module } from '@nestjs/common';
+import { CommonService } from './common.service';
+
+@Module({
+  providers: [CommonService],
+  exports: [CommonService],
+})
+export class CommonModule {}

+ 18 - 0
libs/common/src/common.service.spec.ts

@@ -0,0 +1,18 @@
+import { Test, TestingModule } from '@nestjs/testing';
+import { CommonService } from './common.service';
+
+describe('CommonService', () => {
+  let service: CommonService;
+
+  beforeEach(async () => {
+    const module: TestingModule = await Test.createTestingModule({
+      providers: [CommonService],
+    }).compile();
+
+    service = module.get<CommonService>(CommonService);
+  });
+
+  it('should be defined', () => {
+    expect(service).toBeDefined();
+  });
+});

+ 4 - 0
libs/common/src/common.service.ts

@@ -0,0 +1,4 @@
+import { Injectable } from '@nestjs/common';
+
+@Injectable()
+export class CommonService {}

+ 1 - 0
libs/common/src/filters/index.ts

@@ -0,0 +1 @@
+export * from './rpc-exception.filter'

+ 17 - 0
libs/common/src/filters/rpc-exception.filter.ts

@@ -0,0 +1,17 @@
+import { Catch, RpcExceptionFilter, ArgumentsHost } from '@nestjs/common';
+import { Observable, throwError } from 'rxjs';
+import { BaseRpcExceptionFilter, RpcException } from '@nestjs/microservices';
+
+@Catch()
+export class AllExceptionsFilter extends BaseRpcExceptionFilter {
+  catch(exception: any, host: ArgumentsHost) {
+    return super.catch(exception, host);
+  }
+}
+
+@Catch(RpcException)
+export class ExceptionFilter implements RpcExceptionFilter<RpcException> {
+  catch(exception: RpcException, host: ArgumentsHost): Observable<any> {
+    return throwError(() => exception.getError());
+  }
+}

+ 5 - 0
libs/common/src/index.ts

@@ -0,0 +1,5 @@
+/* eslint-disable prettier/prettier */
+export * from './common.module';
+export * from './common.service';
+export * from './types';
+export * from './filters'

+ 1 - 0
libs/common/src/types/constant.ts

@@ -0,0 +1 @@
+export const MESSAGE_SERVICE = 'message';

+ 2 - 0
libs/common/src/types/index.ts

@@ -0,0 +1,2 @@
+export * from './message'
+export * from './interface'

+ 19 - 0
libs/common/src/types/interface.ts

@@ -0,0 +1,19 @@
+export interface Message {
+    id: string;
+    payload: any;
+}
+
+export interface ConnectionState {
+    uuid?: string | number;
+    status: 'BUFFER' | 'DIRECT_PUBLISH' | 'LIMIT_EXCEEDED' ,
+    reason?: string;
+    payload?: any;
+}
+
+
+export interface WrappedMessage {
+    timeReceived: Date,
+    payload: any
+    thisMessageID: string,
+    previousMessageID: string | null,
+}

+ 41 - 0
libs/common/src/types/message.ts

@@ -0,0 +1,41 @@
+/*  This Message constants and interfaces and function are mostly for GRPC microservice */
+import { GrpcMethod } from "@nestjs/microservices";
+import { Observable } from "rxjs";
+
+export const messageProtobufPackage = "message";
+
+export interface Request {
+    id: string;
+    message: string;
+}
+
+export interface Response {
+    id: string;
+    message: string;
+}
+
+export const MESSAGE_PACKAGE_NAME = "message";
+
+export interface MessageServiceClient {
+    returnResponse(request): Response
+    returnStreamResponse(request: Request): Observable<Response>;
+    bidirectionalStream(request: Observable<Request>): Observable<Response>
+}
+
+export interface GrpcMessageServiceController {
+    returnResponse(request): Response
+    returnStreamResponse(request: Request): Observable<Response>;
+    bidirectionalStream(request: Observable<Request>): Observable<Response>
+}
+
+export function GrpcMessageServiceControllerMethods() {
+    return function (constructor: Function) {
+        const grpcMethods: string[] = ["returnResponse", "returnStreamResponse", "bidirectionalStream"];
+        for (const method of grpcMethods) {
+            const descriptor: any = Reflect.getOwnPropertyDescriptor(constructor.prototype, method);
+            GrpcMethod("GrpcMessageService", method)(constructor.prototype[method], method, descriptor);
+        }
+    };
+}
+
+export const MESSAGE_SERVICE_NAME = "GrpcMessageService";

+ 134 - 0
libs/common/src/utils/buffer.ts

@@ -0,0 +1,134 @@
+import { BehaviorSubject, buffer, distinct, distinctUntilChanged, from, Observable, Subject, takeWhile } from "rxjs";
+import { v4 as uuidV4 } from 'uuid';
+import ConsoleLogger from "./log.utils";
+import { WrappedMessage } from "../types";
+import { sortMessageBasedOnDate } from "./message-ordering";
+
+export class BufferService {
+    private console: ConsoleLogger = new ConsoleLogger(`RetransmissionService`, ['retransmission'])
+    private currentMessageId!: string | null
+    private sortMessage: boolean = false
+    private bufferReleaseSignal: Subject<void> = new Subject()
+    private receiverConnectionState: BehaviorSubject<ConnectionState> = new BehaviorSubject<ConnectionState>('OFFLINE')
+    private transmissionState: BehaviorSubject<TransmissionState> = new BehaviorSubject<TransmissionState>('ARRAY EMPTY')
+    private arrayToBeTransmitted: Subject<WrappedMessage[]> = new Subject()
+    private toBeWrapped: Subject<any> = new Subject()
+    private wrappedMessageToBeBuffered: Subject<WrappedMessage> = new Subject()
+    private messageToBeTransmitted: Subject<WrappedMessage> = new Subject()
+
+    // Interface
+    public implementRetransmission(payloadToBeTransmitted: Observable<any>, eventListener: Observable<ConnectionState>, wantMessageOrdering?: boolean) {
+        if (wantMessageOrdering) {
+            this.sortMessage = true
+            this.console.log({ message: `Message ordering is set to ${this.sortMessage}` })
+        }
+        eventListener.pipe(distinctUntilChanged()).subscribe(event => this.receiverConnectionState.next(event))
+
+        this.startWrappingOperation()
+        this.startBufferTransmisionProcess()
+        this.linkEventListenerToBufferSignal()
+
+        payloadToBeTransmitted.subscribe((message) => {
+            this.toBeWrapped.next(message)
+        })
+    }
+
+    public returnSubjectForBufferedItems(): Observable<WrappedMessage> {
+        return this.messageToBeTransmitted.asObservable()
+    }
+
+    private startWrappingOperation() {
+        this.toBeWrapped.subscribe(message => {
+            this.wrappedMessageToBeBuffered.next(this.wrapMessageWithTimeReceived(message, this.currentMessageId ? this.currentMessageId : null))
+        })
+
+        // wrappedMessageToBeBuffered will then be pushed to buffer
+        this.wrappedMessageToBeBuffered.pipe(buffer(this.bufferReleaseSignal)).subscribe((bufferedMessages: WrappedMessage[]) => {
+            this.console.log({ message: `${bufferedMessages.length > 0 ? `${bufferedMessages.length} buffered messages` : `No buffered messages at the moment`} ` })
+            // console.log(`Released buffered message: ${bufferedMessages.length} total messages. To Be sorted.`)
+            this.arrayToBeTransmitted.next(sortMessageBasedOnDate(bufferedMessages))
+            // this.arrayToBeTransmitted.next((this.sortMessage && bufferedMessages.length > 0) ? sortMessageBasedOnDate(bufferedMessages) : bufferedMessages)
+        });
+    }
+
+    private wrapMessageWithTimeReceived(message: any, previousMessageID: string | null): WrappedMessage {
+        // check if message has already a time received property if so no need to add anymore
+        if (!message.timeReceived) {
+            let WrappedMessage: WrappedMessage = {
+                timeReceived: new Date(),
+                payload: message,
+                thisMessageID: uuidV4(),
+                previousMessageID: previousMessageID
+            }
+            // console.log(`Current`, WrappedMessage.thisMessageID, 'Previous for this message:', WrappedMessage.previousMessageID)
+            this.currentMessageId = WrappedMessage.thisMessageID as string
+            // console.log(`Updating: `, this.currentMessageId)
+            return WrappedMessage
+        } else {
+            return message as WrappedMessage
+        }
+    }
+
+    private startBufferTransmisionProcess() {
+        this.console.log({ message: `StartBufferTransmissionProcess` })
+        this.arrayToBeTransmitted.subscribe(array => {
+            if (array.length > 0) {
+                this.transmissionState.next('TRANSMITTING')
+                from(array).subscribe({
+                    next: (message: WrappedMessage) => {
+                        if (this.receiverConnectionState.getValue() == 'OFFLINE') {
+                            // buffer this message. Flush it back to buffer
+                            this.wrappedMessageToBeBuffered.next(message)
+                        }
+                        if (this.receiverConnectionState.getValue() == 'ONLINE') {
+                            this.messageToBeTransmitted.next(message)
+                        }
+                    },
+                    error: err => console.error(err),
+                    complete: () => {
+                        // update transmission state to indicate this batch is completed
+                        this.transmissionState.next('ARRAY EMPTY');
+
+                        if (this.receiverConnectionState.getValue() === 'ONLINE' && this.transmissionState.getValue() === 'ARRAY EMPTY') {
+                            setTimeout(() => {
+                                this.bufferReleaseSignal.next()
+                            }, 1000)
+                        }
+                        // Do nothing if the receiver connection is offline
+                    }
+                });
+            } else {
+                // If I don't do setTimeout, then bufferrelasesignal will be overloaded
+                if (this.receiverConnectionState.getValue() === 'ONLINE') {
+                    setTimeout(() => {
+                        this.bufferReleaseSignal.next()
+                    }, 3000)
+                }
+            }
+        }
+        )
+    }
+
+    private linkEventListenerToBufferSignal() {
+        this.receiverConnectionState.pipe(
+            distinctUntilChanged()
+        ).subscribe(clientState => {
+            this.console.log({ message: `Client is now ${clientState}. ${(clientState === 'OFFLINE') ? 'Buffering Mode Active...' : 'Releasing Buffered Messages...'}` })
+            if (clientState == 'OFFLINE') {
+                this.console.log({ message: `Current transmission state: ${this.transmissionState.getValue()}` })
+                // just keep buffering
+            }
+            if (clientState == 'ONLINE') {
+                this.console.log({ message: `Current transmission state: ${this.transmissionState.getValue()}` })
+                // get the stored messages to pump it back into the buffer to be ready to be processed immediately
+                if (this.transmissionState.getValue() == 'ARRAY EMPTY') {
+                    this.bufferReleaseSignal.next()
+                }
+
+            }
+        })
+    }
+}
+
+type ConnectionState = 'ONLINE' | 'OFFLINE'
+type TransmissionState = 'TRANSMITTING' | 'IDLE' | 'ARRAY EMPTY' | 'STORING DATA' | 'GETTING STORED DATA'

+ 30 - 0
libs/common/src/utils/general.utils.ts

@@ -0,0 +1,30 @@
+import * as fs from 'fs'
+import path from 'path';
+import { isObservable, Observable, Observer, Subject } from 'rxjs';
+import ConsoleLogger from './log.utils';
+const console: ConsoleLogger = new ConsoleLogger(`GeneralUtils`, ['util'])
+
+function isRxObservable(value: any): value is Observable<any> {
+    return isObservable(value);
+}
+function isRxObserver(value: any): value is Observer<any> {
+    return (
+        value &&
+        typeof value === 'object' &&
+        typeof value.next === 'function' &&
+        typeof value.error === 'function' &&
+        typeof value.complete === 'function'
+    );
+}
+
+// Check specifically if the value is a Subject
+function isRxSubject(value: any): value is Subject<any> {
+    return isRxObservable(value) && isRxObserver(value);
+}
+
+export function checkRxType(value: any): 'Subject' | 'Observable' | 'Observer' | 'Neither' {
+    if (isRxSubject(value)) return 'Observer' // for now returns observer, because subject passes as Observable as well. Can modify at later date
+    if (isRxObservable(value)) return 'Observable'
+    if (isRxObserver(value)) return 'Observer'
+    return 'Neither';
+}

+ 158 - 0
libs/common/src/utils/log.utils.ts

@@ -0,0 +1,158 @@
+import fs from "fs";
+import "source-map-support/register";
+// const chalk = require('chalk');
+import chalk from 'chalk'
+
+// const logColors: Record<string, (text: string) => string> = {
+//     base: chalk.bgRgb(69, 64, 74),
+//     managers: chalk.bgRgb(128, 20, 217),
+//     transmission: chalk.bgRgb(0, 106, 255),
+//     adapter: chalk.bgRgb(51, 130, 68),
+//     transport: chalk.bgRgb(173, 9, 0),
+//     error: chalk.rgb(212, 32, 0),
+//     util: chalk.rgb(200, 204, 177),
+//     details: chalk.rgb(255, 255, 97),
+//     location: chalk.rgb(241, 112, 255),
+//     retransmission: chalk.bgRgb(186, 87, 0)
+// };
+
+function applyColor(rgb: [number, number, number], isBackground: boolean = false) {
+    const [r, g, b] = rgb;
+    return isBackground ? chalk.bgRgb(r, g, b) : chalk.rgb(r, g, b);
+}
+
+const logColors: Record<string, [number, number, number]> = {
+    base: [69, 64, 74],
+    managers: [128, 20, 217],
+    transmission: [0, 106, 255],
+    adapter: [51, 130, 68],
+    transport: [173, 9, 0],
+    error: [212, 32, 0],
+    util: [200, 204, 177],
+    details: [255, 255, 97],
+    retransmission: [186, 87, 0],
+};
+
+
+class ConsoleLogger {
+    private categoryPath: string[] = []
+    private settings: Record<string, any>;
+    private className!: string
+    constructor(className: string, categoryPath: string[]) {
+        this.className = className
+        let configPath = "./logSetting.json"
+        this.settings = this.loadSettings(configPath);
+        this.categoryPath = categoryPath
+    }
+
+    private loadSettings(configPath: string): Record<string, any> {
+        try {
+            const config = fs.readFileSync(configPath, "utf-8");
+            return JSON.parse(config);
+        } catch (error) {
+            console.error("Failed to load log settings:", error);
+            return {};
+        }
+    }
+
+    private isCategoryEnabled(categoryPath: string[]): boolean {
+        let currentLevel = this.settings;
+
+        for (const part of categoryPath) {
+            if (currentLevel[part] === undefined) {
+                return false; // Category or subcategory does not exist
+            }
+            if (typeof currentLevel[part] === "boolean") {
+                return currentLevel[part];
+            }
+            currentLevel = currentLevel[part];
+        }
+
+        return false;
+    }
+
+    log(message: { message: string, details?: any }): void {
+        if (!this.isCategoryEnabled(this.categoryPath)) {
+            return; // Skip logging if the category is disabled
+        }
+
+        const category = this.categoryPath.join(" -> ").toUpperCase();
+        const location = this.getLogLocation();
+
+        const primaryCategory = this.categoryPath[0];
+        const rgb = logColors[primaryCategory] || [255, 255, 255]; // Default to white
+        const categoryStyle = applyColor(rgb, true); // Use bgRgb for category
+        const locationStyle = applyColor(rgb); // Use rgb for location
+
+        const formattedCategory = categoryStyle(`[${category}]`);
+        const formattedClassName = categoryStyle(`${this.className}`);
+        const formattedLocation = locationStyle(` ${location}`);
+
+        const formattedMessage = `${formattedClassName}${formattedLocation}: ${message.message}`;
+        console.log(formattedMessage, message.details ? applyColor([255, 255, 97])(message.details) : '');
+
+        if (message.details && this.isCategoryEnabled(["details"])) {
+            console.log(applyColor([255, 255, 97])('Details: '), message.details);
+        }
+    }
+
+
+
+    error(message: { message: string, details?: any }): void {
+        if (!this.isCategoryEnabled(this.categoryPath)) {
+            return; // Skip logging if the category is disabled
+        }
+
+        const category = this.categoryPath.join(" -> ").toUpperCase();
+        const location = this.getLogLocation();
+
+        const primaryCategory = this.categoryPath[0];
+        const rgb = logColors[primaryCategory] || [255, 255, 255]; // Default to white
+        const categoryStyle = applyColor(rgb, true); // Use bgRgb for category
+        const locationStyle = applyColor(rgb); // Use rgb for location
+        const messageStyle = applyColor([224, 0, 0])
+        const formattedCategory = categoryStyle(`[${category}]`);
+        const formattedClassName = categoryStyle(`${this.className}`);
+        const formattedLocation = locationStyle(`${location}`);
+        const formattedErrorMessage = messageStyle(`${message.message}`)
+
+        const formattedMessage = `${formattedClassName} ${formattedLocation}: ${formattedErrorMessage}`;
+        console.log(formattedMessage, message.details ? applyColor([224, 0, 0])(message.details) : '');
+
+        if (message.details && this.isCategoryEnabled(["details"])) {
+            console.log(applyColor([224, 0, 0])('Details: '), message.details);
+        }
+    }
+
+
+
+    reloadSettings(configPath: string): void {
+        this.settings = this.loadSettings(configPath);
+    }
+
+    private getLogLocation(): string {
+        if (!this.isCategoryEnabled(["location"])) {
+            return ""; // Don't display location if the category is disabled
+        }
+
+        const error = new Error();
+        // Captures the current stack trace
+        Error.captureStackTrace(error, this.getLogLocation);
+
+        const stack = error.stack?.split("\n") || [];
+        const callerLine = stack[2]; // Adjust index to get the correct caller line (this may vary based on environment)
+
+        // Extract only line and column numbers using regex
+        const match = callerLine?.match(/:(\d+):(\d+)\)/);
+        if (match) {
+            const [, line, column] = match;
+            // return `line ${line}, column ${column}`;
+            return `at line ${line}`;
+        }
+
+        return "at unknown location";
+    }
+}
+
+export default ConsoleLogger;
+

+ 29 - 0
libs/common/src/utils/message-ordering.ts

@@ -0,0 +1,29 @@
+import { Subject, takeWhile } from "rxjs";
+import { WrappedMessage } from "../types/interface";
+
+export function sortMessageBasedOnDate(array: WrappedMessage[]): WrappedMessage[] {
+    // console.log(`Sorting ${array.length} messages....`)
+    return array.sort((a, b) => {
+        return new Date(a.timeReceived).getTime() - new Date(b.timeReceived).getTime();
+    });
+}
+
+// SO concept will be that if the message behind it is received, then 
+export async function checkMessage(message: WrappedMessage, messageChecking: Subject<WrappedMessage>): Promise<any> {
+    return new Promise((resolve, reject) => {
+        if (message.previousMessageID) {
+            messageChecking.pipe(
+                takeWhile(item => message.previousMessageID === item.thisMessageID)
+            ).subscribe({
+                complete: () => {
+                    resolve('previousMessageID matched')
+                }
+            })
+        } else {
+            console.log('No previous messageID. This should be the first message')
+            resolve('No previous message ID. Please Proceed.')
+        }
+    })
+}
+
+

+ 9 - 0
libs/common/tsconfig.lib.json

@@ -0,0 +1,9 @@
+{
+  "extends": "../../tsconfig.json",
+  "compilerOptions": {
+    "declaration": true,
+    "outDir": "../../dist/libs/common"
+  },
+  "include": ["src/**/*"],
+  "exclude": ["node_modules", "dist", "test", "**/*spec.ts"]
+}

+ 19 - 0
proto/message.proto

@@ -0,0 +1,19 @@
+syntax = "proto3";
+
+package message;
+
+service GrpcMessageService {
+  rpc returnResponse (Request) returns (Response) {}
+  rpc returnStreamResponse (Request) returns (stream Response) {}
+  rpc bidirectionalStream(stream Request) returns (stream Response) {}
+}
+
+message Request {
+    string id = 1;
+    string message = 2;
+}
+
+message Response {
+  string id = 1;
+  string message = 2;
+}