123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456 |
- import * as grpc from '@grpc/grpc-js';
- import { Subject, Subscription, take, takeUntil } from 'rxjs';
- import { ColorCode, GrpcConnectionType, MessageLog, ReportStatus } from '../interfaces/general.interface';
- import { Status } from '@grpc/grpc-js/build/src/constants';
- const message_proto = require('./protos/server.proto')
- export class GrpcService {
- private grpcServerConnection: any = {}
- constructor() { }
- public async stopServer(serverUrl: string): Promise<any> {
- return new Promise((resolve, reject) => {
- if (this.grpcServerConnection[serverUrl]) {
- console.log(`Shutting down the gRPC server:${serverUrl} ...`);
- // this.grpcServerConnection[serverUrl].tryShutdown(() => {
- // console.log(`Server ${serverUrl} has been gracefully stopped.`);
- // resolve('')
- // })
- resolve(this.grpcServerConnection[serverUrl].forceShutdown())
- console.log(`Server ${serverUrl} is forced to shut down!`)
- // simply removing the reference to the GrpcService instance associated with the specific serverUrl from the grpcServerConnection object.
- // However, the gRPC server instance itself continues to run as long as it has not been explicitly shut down using methods like tryShutdown.
- console.log(`Deleting grpc connection instance:${serverUrl} .....`)
- delete this.grpcServerConnection[serverUrl];
- } else {
- console.log(`Server${serverUrl} is not running.`);
- reject()
- }
- })
- }
- public getAllGrpcServerConnectionInstance(): any {
- console.log(this.grpcServerConnection)
- return this.grpcServerConnection
- }
- // To be migrated into a service in the immediate future
- public async createGrpcInstance(serverUrl: string, messageToBePublished: Subject<MessageLog>, reportStatus: Subject<ReportStatus>, connectionType: GrpcConnectionType) {
- let messageToBeTransmitted: Subject<MessageLog> = messageToBePublished
- let statusControl: Subject<ReportStatus> = reportStatus
- let consecutiveResolutions = 0;
- let lastResolutionTime = Date.now();
- let alreadyHealthCheck: boolean = false
- let yellowErrorEmission: boolean = false
- let redErrorEmission: boolean = false
- while (true) {
- try {
- if (connectionType.instanceType == 'client' && connectionType.serviceMethod == 'bidirectional') {
- await this.createBidirectionalStreamingClient(serverUrl, alreadyHealthCheck, messageToBeTransmitted, statusControl);
- }
- if (connectionType.instanceType == 'client' && connectionType.serviceMethod == 'server streaming') {
- await this.createServerStreamingClient(serverUrl, alreadyHealthCheck, messageToBeTransmitted, statusControl);
- }
- if (connectionType.instanceType == 'server' && connectionType.serviceMethod == 'bidirectional') {
- await this.createGrpcBidirectionalServer(serverUrl, messageToBeTransmitted, statusControl)
- }
- if (connectionType.instanceType == 'server' && connectionType.serviceMethod == 'server streaming') {
- await this.createServerStreamingServer(serverUrl, alreadyHealthCheck, messageToBePublished, statusControl)
- }
- // If connection resolves (indicating failure), increment the count
- consecutiveResolutions++;
- // console.log(`Reconnection Attempt: ${consecutiveResolutions}`)
- alreadyHealthCheck = true
- // If there are x consecutive resolutions, log an error and break the loop
- if (consecutiveResolutions >= parseInt(process.env.ReconnectionAttempt as string) && redErrorEmission == false) {
- redErrorEmission = true
- console.error(`Connection failed ${consecutiveResolutions} times. Stopping connection attempts.`);
- let error: ReportStatus = {
- code: ColorCode.RED,
- message: 'Initiate Doomsday protocol....'
- }
- statusControl.next(error)
- }
- if (consecutiveResolutions < parseInt(process.env.ReconnectionAttempt as string) && yellowErrorEmission == false) {
- yellowErrorEmission = true
- let error: ReportStatus = {
- code: ColorCode.YELLOW,
- // message: `Reconnection Attempt: ${consecutiveResolutions}. Server has yet to respond`
- message: `Attempting reconnection... Server has yet to respond`
- }
- statusControl.next(error);
- }
- } catch (error) {
- // Connection did not resolve, reset the count
- consecutiveResolutions = 0;
- console.error('Connection attempt failed:', error);
- }
- // Check for a pause of more than 3 seconds since the last resolution attempt
- const currentTime = Date.now();
- const timeSinceLastResolution = currentTime - lastResolutionTime;
- if (timeSinceLastResolution > 2000) {
- consecutiveResolutions = 0;
- yellowErrorEmission = false
- redErrorEmission = false
- alreadyHealthCheck = false
- }
- // Update the last resolution time
- lastResolutionTime = currentTime;
- await new Promise(resolve => setTimeout(resolve, 1000)); // Wait for 1 second before the next attempt
- // timeout generate message to trigger this reconnection
- }
- }
- private async createGrpcBidirectionalServer(serverUrl: string, messageToBeStream: Subject<any>, statusControl: Subject<ReportStatus>): Promise<any> { // '0.0.0.0:3001'
- return new Promise((resolve, reject) => {
- try {
- // https://github.com/grpc/proposal/blob/master/L5-node-client-interceptors.md
- let server: grpc.Server = new grpc.Server();
- // Add the streamingData function to the gRPC service
- // Define your message_proto.Message service methods
- server.addService(message_proto.Message.service, {
- sendMessageStream: (call) => {
- console.log(`Client connected from: ${call.getPeer()}`);
- let report: ReportStatus = {
- code: ColorCode.GREEN,
- message: `Client connected!!`
- }
- statusControl.next(report)
- // Right now this is being broadcast.
- let subscription: Subscription = messageToBeStream.subscribe({
- next: (payload: any) => {
- let noConnection = call.cancelled // check connection for each and every message
- if (noConnection === true) { // that means there's no connection, beccause the cancel operation is determined to check
- let report: ReportStatus = {
- code: ColorCode.YELLOW,
- message: `Client is not alive.....`,
- payload: payload
- }
- statusControl.next(report) // no connection. Tell buffer service to stop releasing messages
- subscription.unsubscribe() // i still dont understand why i wrote this here
- } else {
- console.log(`Sending ${payload.appData.msgId}`)
- let message: string = JSON.stringify(payload)
- call.write({ message })
- }
- },
- error: err => console.error(err),
- complete: () => { } //it will never complete
- })
- call.on('data', (data: any) => {
- // console.log(data) // it does return in string format
- let payload = JSON.parse(data.message)
- console.log(`Received Message from Client: ${payload.appData?.msgId}`);
- // Forward the received message to the RxJS subject
- // let respmsg: any = {
- // msgId: payload.appData?.msgId,
- // confirmationMessage: `Message ${payload.appData?.msgId} acknowledged!`
- // }
- // let message: string = JSON.stringify(respmsg)
- // console.log(`Responding to client: ${respmsg.msgId}`);
- // Note: The parameter here MUST BE STRICTLY be the same letter as defined in proto. Eg: message MessageRequest { string >>'message'<< = 1 }
- // call.write({ message });
- });
- call.on('end', () => {
- console.log('Client stream ended');
- // but the stream never ends. THis is not a reliable way to tell if a client is disconnected
- });
- call.on('error', (err) => {
- // Error that may occue during the rpc call. Id there's an error, put a callbacn function there to check the connection for client
- // emit a yellow report to halt message release. If the server does not reply to the callback function, then emit a red card
- // the call back function will be to write and then the client should response immediately through test
- });
- call.on('close', () => {
- console.log('Unknown cause for diconnectivity');
- // Handle client closure, which may be due to errors or manual termination
- });
- },
- Check: (_, callback) => {
- // health check logic here
- // for now it is just sending the status message over to tell the client it is alive
- // For simplicity, always return "SERVING" as status
- callback(null, { status: 'SERVING' });
- },
- });
- // Bind and start the server
- server.bindAsync(serverUrl, grpc.ServerCredentials.createInsecure(), () => {
- console.log(`gRPC server is running on ${serverUrl}`);
- server.start();
- });
- this.grpcServerConnection[serverUrl] = server
- }
- catch (error) {
- resolve(error)
- }
- })
- }
- private async createBidirectionalStreamingClient(server: string, alreadyHealthCheck: boolean, messageToBeTransmitted: Subject<any>, statusControl: Subject<ReportStatus>): Promise<string> {
- let subscription: any
- let unsubscribed: boolean = false
- return new Promise(async (resolve, reject) => {
- const client = new message_proto.Message(server, grpc.credentials.createInsecure());
- const call = client.sendMessageStream();
- this.checkConnectionHealth(client, statusControl, alreadyHealthCheck)
- call.on('status', (status: Status) => {
- // console.log(status) // For more info: https://grpc.github.io/grpc/core/md_doc_statuscodes.html
- // https://grpc.io/docs/what-is-grpc/core-concepts/#streaming
- if (status == grpc.status.UNAVAILABLE) { // only returns a status when there's error. Otherwise it just waits
- resolve('No connection established. Server is not responding..')
- }
- });
- // All the grpc operations are here
- // Subscribe to the RxJS subject to send data to the server
- subscription = messageToBeTransmitted.subscribe({
- next: (payload: any) => {
- if (!unsubscribed) {
- console.log(`Sending ${payload.appData.msgId}`)
- let message: string = JSON.stringify(payload)
- call.write({ message })
- }
- },
- error: err => console.error(err),
- complete: () => { } //it will never complete
- });
- call.on('data', (data: any) => {
- let message = JSON.parse(data.message)
- console.log(`Received acknowledgement from Server: ${message.msgId ?? message.appData?.msgId ?? `Invalid`}`);
- });
- call.on('error', (err) => {
- console.log(`Something wrong with RPC call...`)
- if (!unsubscribed && subscription) { // kill subcription to prevent memory leaks
- subscription.unsubscribe();
- unsubscribed = true;
- }
- resolve('Server Error');
- });
- call.on('end', () => {
- if (!unsubscribed && subscription) { // kill subcription to prevent memory leaks
- subscription.unsubscribe();
- unsubscribed = true;
- }
- resolve('Server Error');
- });
- })
- }
- private async createServerStreamingServer(serverUrl: string, alreadyHealthCheck: boolean, messageToBeStream: Subject<any>, statusControl: Subject<ReportStatus>): Promise<any> { // '0.0.0.0:3001'
- return new Promise((resolve, reject) => {
- try {
- // https://github.com/grpc/proposal/blob/master/L5-node-client-interceptors.md
- let server: grpc.Server = new grpc.Server();
- let onHold: any
- // Add the streamingData function to the gRPC service
- // Define your message_proto.Message service methods
- server.addService(message_proto.Message.service, {
- HandleMessage: (call) => { // this is for bidirectional streaming. Need to have another one for unary calls for web clients
- console.log(`Client connected from: ${call.getPeer()}`);
- // let request = call.request // just putting it here to verify unary call request
- let report: ReportStatus = {
- code: ColorCode.GREEN,
- message: `Client connected!!`
- }
- statusControl.next(report)
- let subscription: Subscription = messageToBeStream.subscribe({
- next: (payload: any) => {
- let noConnection = call.cancelled // check connection for each and every message
- if (noConnection === true) { // that means there's no connection, beccause the cancel operation is determined to check
- let report: ReportStatus = {
- code: ColorCode.YELLOW,
- message: `Client is not alive.....`,
- payload: payload
- }
- statusControl.next(report)
- subscription.unsubscribe()
- } else {
- console.log(`Sending ${payload.appData.msgId}`)
- let message: string = JSON.stringify(payload)
- call.write({ message })
- // onHold = null
- }
- },
- error: err => {
- console.error(err)
- let report: ReportStatus = {
- code: ColorCode.YELLOW,
- message: `Message streaming error`
- }
- statusControl.next(report)
- },
- complete: () => console.log(``) //it will never complete
- })
- call.on('data', (data: any) => {
- // console.log(data) // it does return in string format
- let payload = JSON.parse(data.message)
- console.log(data)
- // console.log(`Received Message from Client: ${payload.appData?.msgId}`);
- // Forward the received message to the RxJS subject
- let respmsg: any = {
- msgId: payload.appData?.msgId,
- confirmationMessage: `Message ${payload.appData?.msgId} acknowledged!`
- }
- let message: string = JSON.stringify(respmsg)
- console.log(`Responding to client: ${respmsg.msgId}`);
- // Note: The parameter here MUST BE STRICTLY be the same letter as defined in proto. Eg: message MessageRequest { string >>'message'<< = 1 }
- call.write({ message });
- });
- call.on('end', () => {
- console.log('Client stream ended');
- // but the stream never ends. THis is not a reliable way to tell if a client is disconnected
- });
- call.on('error', (err) => {
- // Error that may occue during the rpc call. Id there's an error, put a callbacn function there to check the connection for client
- // emit a yellow report to halt message release. If the server does not reply to the callback function, then emit a red card
- // the call back function will be to write and then the client should response immediately through test
- });
- call.on('close', () => {
- console.log('Unknown cause for diconnectivity');
- // Handle client closure, which may be due to errors or manual termination
- });
- },
- Check: (_, callback) => {
- // health check logic here
- // for now it is just sending the status message over to tell the client it is alive
- // For simplicity, always return "SERVING" as status
- callback(null, { status: 'SERVING' });
- },
- });
- // Bind and start the server
- server.bindAsync(serverUrl, grpc.ServerCredentials.createInsecure(), () => {
- console.log(`gRPC server is running on ${serverUrl}`);
- server.start();
- });
- this.grpcServerConnection[serverUrl] = server
- }
- catch (error) {
- resolve(error)
- }
- })
- }
- // Create a server streaming call. Please note that the structure of the code would not be the same as bidirectional because of it's unary nature
- private async createServerStreamingClient(server: string, alreadyHealthCheck: boolean, unaryRequestSubject: Subject<any>, statusControl: Subject<ReportStatus>): Promise<string> {
- return new Promise(async (resolve, reject) => {
- const client = new message_proto.Message(server, grpc.credentials.createInsecure());
- this.checkConnectionHealth(client, statusControl, alreadyHealthCheck) // atcually there's no need for this
- unaryRequestSubject.subscribe({
- next: (request: any) => {
- let message = {
- id: '123',
- message: JSON.stringify(request)
- }
- console.log(`Sending request: ${message.id} over to server....`)
- const call = client.HandleMessage(message)
- call.on('status', (status: Status) => {
- // console.log(status) // For more info: https://grpc.github.io/grpc/core/md_doc_statuscodes.html
- // https://grpc.io/docs/what-is-grpc/core-concepts/#streaming
- if (status == grpc.status.OK) { // only returns a status when there's error. Otherwise it just waits
- console.log(`Message trasmission operation is successful`)
- // RPC completed successfully
- } if (status == grpc.status.UNAVAILABLE) {
- resolve('No connection established. Server is not responding..')
- let report = {
- code: ColorCode.YELLOW,
- message: `Server doesn't seem to be alive. Error returned.`
- }
- statusControl.next(report)
- }
- });
- call.on('data', (data: any) => {
- let message = JSON.parse(data.message)
- console.log(`Received data from Server: ${message.appData?.msgId ?? `Invalid`}`);
- });
- call.on('error', (err) => {
- let report = {
- code: ColorCode.YELLOW,
- message: `Server doesn't seem to be alive. Error returned.`
- }
- statusControl.next(report)
- // resolve(err)
- });
- call.on('end', () => { // this is for gracefull || willfull termination from the server
- let report = {
- code: ColorCode.YELLOW,
- message: `Server doesn't seem to be alive. Error returned.`
- }
- statusControl.next(report)
- // subscription.unsubscribe(); // this is not correct i am just destroying the entire operation. i should be terminating the instance to which i think it does by it self
- // resolve('Server Error');
- });
- /* Avoid rsolving at the moment. Because initially it was intended for the bi directional streaming to continue to instantiate the client
- should there be any rpc errors or internet connection errors. In this case, we just want to listen to incoming unary call without terminating the session
- A separate resolve will be prepared for the subject should it fails in its operation */
- },
- error: error => {
- console.error(error),
- resolve(error)
- },
- complete: () => { } // should not complete since this is an indefinite listening process to transmit requests made by relevant client application
- })
- })
- }
- // Check connection To be Update. This function is destroying my code flow
- private async checkConnectionHealth(client: any, statusControl: Subject<ReportStatus>, alreadyHealthCheck: boolean): Promise<boolean> {
- return new Promise((resolve, reject) => {
- client.Check({}, (error, response) => {
- if (response) {
- console.log(`GRPC Health check status: ${response.status} Server Connected`);
- let report: ReportStatus = {
- code: ColorCode.GREEN,
- message: `Good to go!!!`
- }
- statusControl.next(report)
- } else {
- if (alreadyHealthCheck == false) console.error(`Health check failed: ${error}`);
- }
- })
- })
- }
- }
|