import * as grpc from '@grpc/grpc-js'; import { Subject, Subscription } from "rxjs"; import { ReportStatus, ColorCode, Message, MessageLog } from "../interfaces/general.interface"; import { Status } from '@grpc/grpc-js/build/src/constants'; import { error } from 'console'; const message_proto = require('./protos/server.proto') export class GrpcServiceMethod { // Create Server Instance to stream all application Outgoing messages public async createServerStreamingServer( serverUrl: string, grpcServerConnection: any, messageToBeStream: Subject ): Promise { // '0.0.0.0:3001' return new Promise((resolve, reject) => { try { // https://github.com/grpc/proposal/blob/master/L5-node-client-interceptors.md let server: grpc.Server = new grpc.Server(); // Add the streamingData function to the gRPC service // Define your message_proto.Message service methods server.addService(message_proto.Message.service, { HandleMessage: (call) => { // console.log(call.request) console.log(`Intializing stream. Opening Channel. Confirmation from ${call.getPeer()}`) let subscription: Subscription = messageToBeStream.subscribe({ next: (response: Message) => { console.log(`Sending ${(response.message as MessageLog).appData.msgId}`) let message = { id: response.id, message: JSON.stringify(response.message) } call.write(message) }, error: err => { console.error(err) subscription.unsubscribe() resolve('') }, complete: () => { console.log(`Stream response completed for ${call.request.id}`) subscription.unsubscribe() resolve('') // call.end() } }) }, Check: (_, callback) => { // health check logic here // for now it is just sending the status message over to tell the client it is alive // For simplicity, always return "SERVING" as status callback(null, { status: 'SERVING' }); }, }); // Bind and start the server server.bindAsync(serverUrl, grpc.ServerCredentials.createInsecure(), () => { console.log(`gRPC server is running on ${serverUrl}`); server.start(); }); grpcServerConnection[serverUrl] = server } catch (error) { resolve(error) } }) } // Send a request over to the other server to open a channel for this server to emit/stream messages over public async createServerStreamingClient( server: string, alreadyHealthCheck: boolean, statusControl: Subject, incomingMessage: Subject ): Promise { return new Promise(async (resolve, reject) => { const client = new message_proto.Message(server, grpc.credentials.createInsecure()); // perform check to see if server is alive, if not terminate this grpc instant and create again this.checkConnectionHealth(client, statusControl, alreadyHealthCheck).catch((error) => { resolve('') }) // this is where the request sending logic occurs let call = client.HandleMessage({ id: `0000`, message: `Intiate Main Stream Channel Response` }) console.log(`Sending request to open response channel...`) call.on('status', (status: Status) => { if (status == grpc.status.OK) { // only returns a status when there's error. Otherwise it just waits console.log(`Message trasmission operation is successful`) // RPC completed successfully } if (status == grpc.status.UNAVAILABLE) { let report = { code: ColorCode.YELLOW, message: `Server doesn't seem to be alive. Error returned.`, from: `Server Streaming Client Instance` } statusControl.next(report) resolve('No connection established. Server is not responding..') } }); call.on('data', (data: any) => { // standard procedure. convert back the data and pass to the application to be processed let response: Message = { id: data.id, message: JSON.parse(data.message) } incomingMessage.next(response) console.log((response.message as MessageLog).appData.msgId) }); call.on('error', (err) => { resolve('') }); // call.on('end', () => { // this is for gracefull || willfull termination from the server // console.log(`Terminating Stream Request. Directing response to main channel`) // resolve('') // }); }) } /* ----------------All the functions below are for Bi-directional streaming. Subject to be deleted if decided not in use---------------- */ public async createGrpcBidirectionalServer( serverUrl: string, messageToBeStream: Subject, statusControl: Subject, grpcServerConnection: any, ): Promise { // '0.0.0.0:3001' return new Promise((resolve, reject) => { try { // https://github.com/grpc/proposal/blob/master/L5-node-client-interceptors.md let server: grpc.Server = new grpc.Server(); // Add the streamingData function to the gRPC service // Define your message_proto.Message service methods server.addService(message_proto.Message.service, { sendMessageStream: (call) => { console.log(`Client connected from: ${call.getPeer()}`); let report: ReportStatus = { code: ColorCode.GREEN, message: `Client connected!!`, from: `Bidirectional Instance` } statusControl.next(report) // Right now this is being broadcast. let subscription: Subscription = messageToBeStream.subscribe({ next: (payload: any) => { let noConnection = call.cancelled // check connection for each and every message if (noConnection === true) { // that means there's no connection, beccause the cancel operation is determined to check let report: ReportStatus = { code: ColorCode.YELLOW, message: `Client is not alive.....`, payload: payload, from: `Bidirectional Instance` } statusControl.next(report) // no connection. Tell buffer service to stop releasing messages subscription.unsubscribe() // i still dont understand why i wrote this here } else { console.log(`Sending ${payload.appData.msgId}`) let message: string = JSON.stringify(payload) call.write({ message }) } }, error: err => console.error(err), complete: () => { } //it will never complete }) call.on('data', (data: any) => { // console.log(data) // it does return in string format let payload = JSON.parse(data.message) console.log(`Received Message from Client: ${payload.appData?.msgId}`); // Forward the received message to the RxJS subject // let respmsg: any = { // msgId: payload.appData?.msgId, // confirmationMessage: `Message ${payload.appData?.msgId} acknowledged!` // } // let message: string = JSON.stringify(respmsg) // console.log(`Responding to client: ${respmsg.msgId}`); // Note: The parameter here MUST BE STRICTLY be the same letter as defined in proto. Eg: message MessageRequest { string >>'message'<< = 1 } // call.write({ message }); }); call.on('end', () => { console.log('Client stream ended'); // but the stream never ends. THis is not a reliable way to tell if a client is disconnected }); call.on('error', (err) => { // Error that may occue during the rpc call. Id there's an error, put a callbacn function there to check the connection for client // emit a yellow report to halt message release. If the server does not reply to the callback function, then emit a red card // the call back function will be to write and then the client should response immediately through test }); call.on('close', () => { console.log('Unknown cause for diconnectivity'); // Handle client closure, which may be due to errors or manual termination }); }, Check: (_, callback) => { // health check logic here // for now it is just sending the status message over to tell the client it is alive // For simplicity, always return "SERVING" as status callback(null, { status: 'SERVING' }); }, }); // Bind and start the server server.bindAsync(serverUrl, grpc.ServerCredentials.createInsecure(), () => { console.log(`gRPC server is running on ${serverUrl}`); server.start(); }); grpcServerConnection[serverUrl] = server } catch (error) { resolve(error) } }) } public async createBidirectionalStreamingClient( server: string, alreadyHealthCheck: boolean, messageToBeTransmitted: Subject, statusControl: Subject, incomingResponse: Subject ): Promise { let subscription: any let unsubscribed: boolean = false return new Promise(async (resolve, reject) => { const client = new message_proto.Message(server, grpc.credentials.createInsecure()); const call = client.sendMessageStream(); this.checkConnectionHealth(client, statusControl, alreadyHealthCheck) call.on('status', (status: Status) => { // this is useless in streaming(on for unary) // console.log(status) // For more info: https://grpc.github.io/grpc/core/md_doc_statuscodes.html // https://grpc.io/docs/what-is-grpc/core-concepts/#streaming // if (status == grpc.status.UNAVAILABLE) { // only returns a status when there's error. Otherwise it just waits // resolve('No connection established. Server is not responding..') // } }); // All the grpc operations are here // Subscribe to the RxJS subject to send data to the server subscription = messageToBeTransmitted.subscribe({ next: (payload: any) => { if (!unsubscribed) { console.log(`Sending ${payload.appData.msgId}`) let message: string = JSON.stringify(payload) call.write({ message }) } }, error: err => console.error(err), complete: () => { } //it will never complete }); call.on('data', (data: any) => { let message = JSON.parse(data.message) console.log(`Received message from Server: ${message.msgId ?? message.appData?.msgId ?? `Invalid`}`); }); call.on('error', (err) => { // console.log(`Something wrong with RPC call...`) if (!unsubscribed && subscription) { // kill subcription to prevent memory leaks subscription.unsubscribe(); unsubscribed = true; } resolve('Server Error'); }); call.on('end', () => { if (!unsubscribed && subscription) { // kill subcription to prevent memory leaks subscription.unsubscribe(); unsubscribed = true; } resolve('Server Error'); }); }) } // Check connection To be Update. This function is destroying my code flow public async checkConnectionHealth(client: any, statusControl: Subject, alreadyHealthCheck: boolean): Promise { return new Promise((resolve, reject) => { client.Check({}, (error, response) => { if (response) { console.log(`GRPC Health check status: ${response.status} Server Connected`); let report: ReportStatus = { code: ColorCode.GREEN, message: `Good to go!!!`, from: `GRPC health check` } statusControl.next(report) } else { if (alreadyHealthCheck == false) console.error(`Health check failed: ${error}`); resolve(false) } }) }) } }