import * as grpc from '@grpc/grpc-js'; import { Subject, Subscription } from "rxjs"; import { ReportStatus, ColorCode, GrpcMessage, MessageLog } from "../interfaces/general.interface"; import { Status } from '@grpc/grpc-js/build/src/constants'; const message_proto = require('./protos/server.proto') export class GrpcServiceMethod { public async createServerStreamingServer( serverUrl: string, alreadyHealthCheck: boolean, messageToBeStream: Subject, statusControl: Subject, grpcServerConnection: any, incomingRequest: Subject ): Promise { // '0.0.0.0:3001' return new Promise((resolve, reject) => { try { // https://github.com/grpc/proposal/blob/master/L5-node-client-interceptors.md let server: grpc.Server = new grpc.Server(); // Add the streamingData function to the gRPC service // Define your message_proto.Message service methods server.addService(message_proto.Message.service, { HandleMessage: (call) => { incomingRequest.next(call.request) console.log(call.request) console.log(`Intializing main stream response. Confirmation from ${call.request.id}`) // This will be the main channel for streaming them response messages let report: ReportStatus = { //let the flow come through code: ColorCode.GREEN, message: `Client connected!!`, from: `Server Streaming Instance` } statusControl.next(report) let subscription: Subscription = messageToBeStream.subscribe({ next: (response: GrpcMessage) => { // console.log(`${response.id} vs ${request.id}`) // Check who's response it belongs to let noConnection = call.cancelled // check connection for each and every message if (noConnection === true) { // that means there's no connection, beccause the cancel operation is determined to check let report: ReportStatus = { code: ColorCode.YELLOW, message: `Client is not alive.....`, payload: response, from: `Server Streaming Instance` } statusControl.next(report) subscription.unsubscribe() } else { console.log(`Sending ${(response.message as MessageLog).appData.msgId} in respond to request: ${call.request.id}`) let message = { id: response.id, message: JSON.stringify(response.message) } call.write(message) } }, error: err => { console.error(err) let report: ReportStatus = { code: ColorCode.YELLOW, message: `Message streaming error`, from: `Server Streaming Instance` } statusControl.next(report) subscription.unsubscribe() }, complete: () => { console.log(`Stream response completed for ${call.request.id}`) subscription.unsubscribe() // call.end() } }) if (call.request.id != '0000') { console.log(call.request) /* Case from handling incoming request from clients. This no longer takes into consideration where the request is coming from. If the client is subscribed to the server, it will receive it's due. */ // console.log(`Client connected from: ${call.getPeer()}`); let request = call.request // unary request from client to be responded with a stream console.log(`Received unary call.... request: ${request.id}`) call.cancel() } }, Check: (_, callback) => { // health check logic here // for now it is just sending the status message over to tell the client it is alive // For simplicity, always return "SERVING" as status callback(null, { status: 'SERVING' }); }, }); // Bind and start the server server.bindAsync(serverUrl, grpc.ServerCredentials.createInsecure(), () => { console.log(`gRPC server is running on ${serverUrl}`); server.start(); }); grpcServerConnection[serverUrl] = server } catch (error) { resolve(error) } }) } // Create a server streaming call. Please note that the structure of the code would not be the same as bidirectional because of it's unary nature public async createServerStreamingClient( server: string, alreadyHealthCheck: boolean, unaryRequestSubject: Subject, statusControl: Subject, incomingResponse: Subject ): Promise { return new Promise(async (resolve, reject) => { const client = new message_proto.Message(server, grpc.credentials.createInsecure()); unaryRequestSubject.subscribe({ next: (request: any) => { let message = { id: request.id, message: JSON.stringify(request.message) } console.log(message) console.log(`Sending request: ${message.id} over to server....`) let call = client.HandleMessage(message) call.on('status', (status: Status) => { if (status == grpc.status.OK) { // only returns a status when there's error. Otherwise it just waits console.log(`Message trasmission operation is successful`) // RPC completed successfully } if (status == grpc.status.UNAVAILABLE) { resolve('No connection established. Server is not responding..') let report = { code: ColorCode.YELLOW, message: `Server doesn't seem to be alive. Error returned.`, payload: request, from: `Server Streaming Client Instance` } statusControl.next(report) } }); call.on('data', (data: any) => { let response: GrpcMessage = { id: data.id, message: JSON.parse(data.message) } incomingResponse.next(response) console.log((response.message as MessageLog).appData.msgId) }); call.on('error', (err) => { }); call.on('end', () => { // this is for gracefull || willfull termination from the server console.log(`Terminating Stream Request. Directing response to main channel`) }); }, error: error => { console.error(error), resolve(error) }, complete: () => { } // should not complete since this is an indefinite listening process to transmit requests made by relevant client application }) unaryRequestSubject.next({ id: `0000`, message: `Intiate Main Stream Channel Response` }) this.checkConnectionHealth(client, statusControl, alreadyHealthCheck) // perform check to see if server is alive, if not terminate this grpc instant and create again // initiate(statusControl, incomingResponse).then(() => { // streamRequest(unaryRequestSubject, statusControl) // }).catch(() => { // resolve('Trigger Reconnection logic. Terminate this client instance and creating new ones') // }) // async function intialize(statusControl: Subject, incomingResponse: Subject) { // async function initiate(statusControl: Subject, incomingResponse: Subject) { // let greenlight: ReportStatus = { // code: ColorCode.GREEN, // message: `Initial Client set up. Release unary Request`, // from: `Server Streaming Client Instance` // } // statusControl.next(greenlight) // let report: ReportStatus = { // code: ColorCode.YELLOW, // message: `Server doesn't seem to be alive. Error returned.`, // from: `Server Streaming Client Instance` // } // let call = client.HandleMessage({ // id: '0000', // message: `Establishing channel for response stream. Channel for response!` // }) // call.on('status', (status: Status) => { // // console.log(status) // For more info: https://grpc.github.io/grpc/core/md_doc_statuscodes.html // // https://grpc.io/docs/what-is-grpc/core-concepts/#streaming // if (status == grpc.status.OK) { // only returns a status when there's error. Otherwise it just waits // console.log(`Message trasmission operation is successful`) // resolve('') // } if (status == grpc.status.UNAVAILABLE) { // resolve('No connection established. Server is not responding..') // statusControl.next(report) // reject() // } // }); // // This is and should be the only channel for response. THe demultiplexing will be handled by application logic // call.on('data', (data: any) => { // // console.log(`Received stream response from Server. Receiver: ${message.id}`); // let response = { // id: data.id, // message: JSON.parse(data.message) // } // // console.log(response) // incomingResponse.next(response) // }); // call.on('error', (err) => { // statusControl.next(report) // }); // call.on('end', () => { // this is for gracefull || willfull termination from the server // console.log(`Streaming Response is completed`) // statusControl.next(report) // }); // } // // } // // function streamRequest(unaryRequestSubject: Subject, statusControl: Subject) { // // Just send request, no need to listen to response. IT will be handled by the channel above. // function streamRequest(unaryRequestSubject: Subject, statusControl: Subject) { // unaryRequestSubject.subscribe({ // next: (request: any) => { // let message = { // id: request.id, // message: JSON.stringify(request) // } // console.log(`Sending request: ${message.id} over to server....`) // const call = client.HandleMessage(message) // call.on('status', (status: Status) => { // if (status == grpc.status.OK) { // only returns a status when there's error. Otherwise it just waits // console.log(`Message trasmission operation is successful`) // // RPC completed successfully // } if (status == grpc.status.UNAVAILABLE) { // resolve('No connection established. Server is not responding..') // let report = { // code: ColorCode.YELLOW, // message: `Server doesn't seem to be alive. Error returned.`, // payload: request, // from: `Server Streaming Client Instance` // } // statusControl.next(report) // } // }); // // call.on('data', (data: any) => { // // let response = { // // data: data.id, // // message: JSON.parse(data.message) // // } // // console.log(response.message.appData.msgId) // // }); // call.on('error', (err) => { // }); // call.on('end', () => { // this is for gracefull || willfull termination from the server // console.log(`Terminating Stream Request. Directing response to main channel`) // }); // }, // error: error => { // console.error(error), // resolve(error) // }, // complete: () => { } // should not complete since this is an indefinite listening process to transmit requests made by relevant client application // }) // } }) } public async createGrpcBidirectionalServer( serverUrl: string, messageToBeStream: Subject, statusControl: Subject, grpcServerConnection: any, incomingRequest: Subject ): Promise { // '0.0.0.0:3001' return new Promise((resolve, reject) => { try { // https://github.com/grpc/proposal/blob/master/L5-node-client-interceptors.md let server: grpc.Server = new grpc.Server(); // Add the streamingData function to the gRPC service // Define your message_proto.Message service methods server.addService(message_proto.Message.service, { sendMessageStream: (call) => { console.log(`Client connected from: ${call.getPeer()}`); let report: ReportStatus = { code: ColorCode.GREEN, message: `Client connected!!`, from: `Bidirectional Instance` } statusControl.next(report) // Right now this is being broadcast. let subscription: Subscription = messageToBeStream.subscribe({ next: (payload: any) => { let noConnection = call.cancelled // check connection for each and every message if (noConnection === true) { // that means there's no connection, beccause the cancel operation is determined to check let report: ReportStatus = { code: ColorCode.YELLOW, message: `Client is not alive.....`, payload: payload, from: `Bidirectional Instance` } statusControl.next(report) // no connection. Tell buffer service to stop releasing messages subscription.unsubscribe() // i still dont understand why i wrote this here } else { console.log(`Sending ${payload.appData.msgId}`) let message: string = JSON.stringify(payload) call.write({ message }) } }, error: err => console.error(err), complete: () => { } //it will never complete }) call.on('data', (data: any) => { // console.log(data) // it does return in string format let payload = JSON.parse(data.message) console.log(`Received Message from Client: ${payload.appData?.msgId}`); // Forward the received message to the RxJS subject // let respmsg: any = { // msgId: payload.appData?.msgId, // confirmationMessage: `Message ${payload.appData?.msgId} acknowledged!` // } // let message: string = JSON.stringify(respmsg) // console.log(`Responding to client: ${respmsg.msgId}`); // Note: The parameter here MUST BE STRICTLY be the same letter as defined in proto. Eg: message MessageRequest { string >>'message'<< = 1 } // call.write({ message }); }); call.on('end', () => { console.log('Client stream ended'); // but the stream never ends. THis is not a reliable way to tell if a client is disconnected }); call.on('error', (err) => { // Error that may occue during the rpc call. Id there's an error, put a callbacn function there to check the connection for client // emit a yellow report to halt message release. If the server does not reply to the callback function, then emit a red card // the call back function will be to write and then the client should response immediately through test }); call.on('close', () => { console.log('Unknown cause for diconnectivity'); // Handle client closure, which may be due to errors or manual termination }); }, Check: (_, callback) => { // health check logic here // for now it is just sending the status message over to tell the client it is alive // For simplicity, always return "SERVING" as status callback(null, { status: 'SERVING' }); }, }); // Bind and start the server server.bindAsync(serverUrl, grpc.ServerCredentials.createInsecure(), () => { console.log(`gRPC server is running on ${serverUrl}`); server.start(); }); grpcServerConnection[serverUrl] = server } catch (error) { resolve(error) } }) } public async createBidirectionalStreamingClient( server: string, alreadyHealthCheck: boolean, messageToBeTransmitted: Subject, statusControl: Subject, incomingResponse: Subject ): Promise { let subscription: any let unsubscribed: boolean = false return new Promise(async (resolve, reject) => { const client = new message_proto.Message(server, grpc.credentials.createInsecure()); const call = client.sendMessageStream(); this.checkConnectionHealth(client, statusControl, alreadyHealthCheck) call.on('status', (status: Status) => { // this is useless in streaming(on for unary) // console.log(status) // For more info: https://grpc.github.io/grpc/core/md_doc_statuscodes.html // https://grpc.io/docs/what-is-grpc/core-concepts/#streaming // if (status == grpc.status.UNAVAILABLE) { // only returns a status when there's error. Otherwise it just waits // resolve('No connection established. Server is not responding..') // } }); // All the grpc operations are here // Subscribe to the RxJS subject to send data to the server subscription = messageToBeTransmitted.subscribe({ next: (payload: any) => { if (!unsubscribed) { console.log(`Sending ${payload.appData.msgId}`) let message: string = JSON.stringify(payload) call.write({ message }) } }, error: err => console.error(err), complete: () => { } //it will never complete }); call.on('data', (data: any) => { let message = JSON.parse(data.message) console.log(`Received message from Server: ${message.msgId ?? message.appData?.msgId ?? `Invalid`}`); }); call.on('error', (err) => { // console.log(`Something wrong with RPC call...`) if (!unsubscribed && subscription) { // kill subcription to prevent memory leaks subscription.unsubscribe(); unsubscribed = true; } resolve('Server Error'); }); call.on('end', () => { if (!unsubscribed && subscription) { // kill subcription to prevent memory leaks subscription.unsubscribe(); unsubscribed = true; } resolve('Server Error'); }); }) } // Check connection To be Update. This function is destroying my code flow public async checkConnectionHealth(client: any, statusControl: Subject, alreadyHealthCheck: boolean): Promise { return new Promise((resolve, reject) => { client.Check({}, (error, response) => { if (response) { console.log(`GRPC Health check status: ${response.status} Server Connected`); let report: ReportStatus = { code: ColorCode.GREEN, message: `Good to go!!!`, from: `GRPC health check` } statusControl.next(report) } else { if (alreadyHealthCheck == false) console.error(`Health check failed: ${error}`); } }) }) } }