|
@@ -0,0 +1,306 @@
|
|
|
+import * as grpc from '@grpc/grpc-js';
|
|
|
+import { Observable, Subject, Subscription } from "rxjs";
|
|
|
+import { Message, ConnectionAttribute, GrpcConnectionType, ConnectionState, MessageLog, ConnectionStatus, StreamAttribute, State } from "../interfaces/general.interface";
|
|
|
+import { Status } from '@grpc/grpc-js/build/src/constants';
|
|
|
+import { message_proto } from './protos/server.proto'
|
|
|
+import * as _ from 'lodash'
|
|
|
+export class GrpcServiceMethod {
|
|
|
+ private connectionAttributes: ConnectionAttribute[] = []
|
|
|
+ private updateConnectionStatusFlag: boolean = false
|
|
|
+ private server: grpc.Server | any
|
|
|
+ private messageToBeSendOver: Message | any
|
|
|
+ private clientRequest: Subject<ConnectionAttribute> = new Subject()
|
|
|
+ private localServerStatus: Subject<any> = new Subject()
|
|
|
+
|
|
|
+ public async create(connectionAttribute: ConnectionAttribute, connectionAttributes: ConnectionAttribute[]): Promise<any> {
|
|
|
+ this.connectionAttributes = connectionAttributes
|
|
|
+ return new Promise((resolve, reject) => {
|
|
|
+ this.createGrpcInstance({ instanceType: 'server' }, connectionAttribute)
|
|
|
+ this.createGrpcInstance({ instanceType: 'client' }, connectionAttribute)
|
|
|
+ resolve('Just putting it here for now....')
|
|
|
+ })
|
|
|
+ }
|
|
|
+
|
|
|
+ private async createGrpcInstance(grpcType: GrpcConnectionType, connectionAttribute: ConnectionAttribute) {
|
|
|
+ // Reconnection Logic here
|
|
|
+ while (true) {
|
|
|
+ try {
|
|
|
+ let recreatePromise = new Promise((resolve) => {
|
|
|
+ if (grpcType.instanceType == 'server' && !this.server) {
|
|
|
+ this.createServerStreamingServer(connectionAttribute).then(() => {
|
|
|
+ resolve('recreate')
|
|
|
+ })
|
|
|
+ }
|
|
|
+ if (grpcType.instanceType == 'client') {
|
|
|
+ this.createServerStreamingClient(connectionAttribute).then(() => {
|
|
|
+ resolve('recreate')
|
|
|
+ })
|
|
|
+ }
|
|
|
+ })
|
|
|
+ await recreatePromise
|
|
|
+ } catch (error) {
|
|
|
+ console.error('Connection attempt failed:', error);
|
|
|
+ }
|
|
|
+ await new Promise(resolve => setTimeout(resolve, 1000)); // Wait for 1 second before the next attempt
|
|
|
+ // timeout generate message to trigger this reconnection
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Create Server Instance to stream all application Outgoing messages
|
|
|
+ public async createServerStreamingServer(connectionAttribute: ConnectionAttribute): Promise<any> { // '0.0.0.0:3001'
|
|
|
+ return new Promise((resolve, reject) => {
|
|
|
+ try {
|
|
|
+ let statusChain: State = 1
|
|
|
+ if (statusChain == 1) {
|
|
|
+ // Bind and start the server
|
|
|
+ this.server = new grpc.Server()
|
|
|
+ this.server.bindAsync(connectionAttribute.outGoing.serverUrl, grpc.ServerCredentials.createInsecure(), () => {
|
|
|
+ console.log(`gRPC server is running on ${connectionAttribute.outGoing.serverUrl}`);
|
|
|
+ this.server.start();
|
|
|
+ });
|
|
|
+ this.localServerStatus.next({
|
|
|
+ connectionStatus: 'ON',
|
|
|
+ connectionID: connectionAttribute.ConnectionID.local,
|
|
|
+ message: `${connectionAttribute.outGoing.serverUrl} started.`
|
|
|
+ })
|
|
|
+ }
|
|
|
+
|
|
|
+ if (statusChain == 1 && !this.server) {
|
|
|
+ this.server.addService(message_proto.Message.service, {
|
|
|
+ HandleMessage: (call) => {
|
|
|
+ /// add a checking for standard message request
|
|
|
+ let clientInfo: ConnectionAttribute = JSON.parse(call.request.message)
|
|
|
+ if (this.isConnectionAttribute(clientInfo)) {
|
|
|
+ // this.clientRequest.next(clientInfo)
|
|
|
+ console.log(`Initializing stream. Opening Channel... Confirmation from ${call.request.id}`)
|
|
|
+ let result: ConnectionAttribute | undefined = this.connectionAttributes.find(connectionAttribute => connectionAttribute.ConnectionID.local = clientInfo.ConnectionID.remote)
|
|
|
+ if (!result) {
|
|
|
+ console.log(`No connectionID match. This should be new connection...`)
|
|
|
+ let subscription: Subscription = connectionAttribute.outGoing.MessageToBePublished!.subscribe({
|
|
|
+ next: (response: Message) => {
|
|
|
+ console.log(`Sending from GRPC server: ${(response.message as MessageLog).appData.msgId} `)
|
|
|
+ let message = {
|
|
|
+ id: response.id,
|
|
|
+ message: JSON.stringify(response.message)
|
|
|
+ }
|
|
|
+ call.write(message)
|
|
|
+ },
|
|
|
+ error: err => {
|
|
|
+ console.error(err)
|
|
|
+ subscription.unsubscribe()
|
|
|
+ resolve('')
|
|
|
+ },
|
|
|
+ complete: () => {
|
|
|
+ console.log(`Stream response completed for ${call.request.id}`)
|
|
|
+ subscription.unsubscribe()
|
|
|
+ resolve('')
|
|
|
+ }
|
|
|
+ })
|
|
|
+ let report: ConnectionState = {
|
|
|
+ status: 'DIRECT_PUBLISH'
|
|
|
+ }
|
|
|
+ connectionAttribute.connectionStatus!.next(report)
|
|
|
+ }
|
|
|
+ if (result && result.outGoing.connectionState == `OFF`) {
|
|
|
+ // reassign previously matched buffer
|
|
|
+ console.log(`Connection info found in array matched. Assigning buffer....`)
|
|
|
+ let subscription: Subscription = result.outGoing.MessageToBePublished!.subscribe({
|
|
|
+ next: (response: Message) => {
|
|
|
+ console.log(`Sending from GRPC server: ${(response.message as MessageLog).appData.msgId} `)
|
|
|
+ let message = {
|
|
|
+ id: response.id,
|
|
|
+ message: JSON.stringify(response.message)
|
|
|
+ }
|
|
|
+ call.write(message)
|
|
|
+ },
|
|
|
+ error: err => {
|
|
|
+ console.error(err)
|
|
|
+ subscription.unsubscribe()
|
|
|
+ resolve('')
|
|
|
+ },
|
|
|
+ complete: () => {
|
|
|
+ console.log(`Stream response completed for ${call.request.id}`)
|
|
|
+ subscription.unsubscribe()
|
|
|
+ resolve('')
|
|
|
+ }
|
|
|
+ })
|
|
|
+ let report: ConnectionState = {
|
|
|
+ status: 'DIRECT_PUBLISH'
|
|
|
+ }
|
|
|
+ result.connectionStatus!.next(report)
|
|
|
+ }
|
|
|
+
|
|
|
+ } else {
|
|
|
+ console.error(`INVALID REQUEST! Client request does not fulfill criteria`)
|
|
|
+ }
|
|
|
+ },
|
|
|
+ Check: (_, callback) => {
|
|
|
+ // for now it is just sending the status message over to tell the client it is alive
|
|
|
+ // For simplicity, always return "SERVING" as status
|
|
|
+ callback(null, { status: 'SERVING' });
|
|
|
+ },
|
|
|
+ });
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (error) {
|
|
|
+ resolve(error)
|
|
|
+ }
|
|
|
+ })
|
|
|
+ }
|
|
|
+
|
|
|
+ // Send a request over to the other server to open a channel for this server to emit/stream messages over
|
|
|
+ public async createServerStreamingClient(connectionAttribute: ConnectionAttribute): Promise<string> {
|
|
|
+ return new Promise(async (resolve, reject) => {
|
|
|
+ const client = new message_proto.Message(connectionAttribute.inComing.serverUrl, grpc.credentials.createInsecure());
|
|
|
+ let localInfo: ConnectionAttribute = { // need to make a new copy where it doesn't reference the subjects, otherwise circular ref error
|
|
|
+ ConnectionID: connectionAttribute.ConnectionID,
|
|
|
+ outGoing: {
|
|
|
+ StreamID: connectionAttribute.outGoing.StreamID,
|
|
|
+ PublisherID: connectionAttribute.outGoing.PublisherID,
|
|
|
+ SubscriberID: connectionAttribute.outGoing.SubscriberID,
|
|
|
+ serverUrl: connectionAttribute.outGoing.serverUrl,
|
|
|
+ MessageToBePublished: null,
|
|
|
+ MessageToBeReceived: null
|
|
|
+ },
|
|
|
+ inComing: {
|
|
|
+ StreamID: connectionAttribute.inComing.StreamID,
|
|
|
+ PublisherID: connectionAttribute.inComing.PublisherID,
|
|
|
+ SubscriberID: connectionAttribute.inComing.SubscriberID,
|
|
|
+ serverUrl: connectionAttribute.inComing.serverUrl,
|
|
|
+ MessageToBePublished: null,
|
|
|
+ MessageToBeReceived: null
|
|
|
+ },
|
|
|
+ connectionStatus: null
|
|
|
+ }
|
|
|
+ let call = client.HandleMessage({ id: connectionAttribute.inComing.serverUrl, message: JSON.stringify(localInfo) })
|
|
|
+
|
|
|
+ console.log(`Sending request to ${connectionAttribute.inComing.serverUrl} to open response channel...`)
|
|
|
+
|
|
|
+ call.on('status', (status: Status) => {
|
|
|
+ if (status == grpc.status.OK) { // only returns a status when there's error. Otherwise it just waits
|
|
|
+ console.log(`Message trasmission operation is successful`)
|
|
|
+ // RPC completed successfully
|
|
|
+ } if (status == grpc.status.UNAVAILABLE) {
|
|
|
+ let report: ConnectionState = {
|
|
|
+ status: 'BUFFER',
|
|
|
+ reason: `Server doesn't seem to be alive. Error returned.`,
|
|
|
+ payload: this.messageToBeSendOver ?? `There's no message at the moment...`
|
|
|
+ }
|
|
|
+ connectionAttribute.connectionStatus!.next(report)
|
|
|
+ let clientStatusUpdateInfo: any = {
|
|
|
+ connectionStatus: 'OFF',
|
|
|
+ connectionID: connectionAttribute.ConnectionID.remote,
|
|
|
+ message: `${connectionAttribute.outGoing.serverUrl} started.`
|
|
|
+ }
|
|
|
+ this.clientRequest.next(clientStatusUpdateInfo)
|
|
|
+ resolve('No connection established. Server is not responding..')
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ call.on('data', (data: any) => {
|
|
|
+ let response: Message = {
|
|
|
+ id: data.id,
|
|
|
+ message: JSON.parse(data.message)
|
|
|
+ }
|
|
|
+ if (connectionAttribute.inComing.MessageToBeReceived) {
|
|
|
+ connectionAttribute.inComing.MessageToBeReceived.next(response)
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ call.on('error', (err) => {
|
|
|
+ console.error(err)
|
|
|
+ resolve('')
|
|
|
+ });
|
|
|
+ })
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ // THis is no longer necesarry after the introduction of connection Attribute. But it is still useful for checking for the other side's health
|
|
|
+ public async checkConnectionHealth(client: any, statusControl: Subject<ConnectionState>, alreadyHealthCheck: boolean): Promise<boolean> {
|
|
|
+ return new Promise((resolve, reject) => {
|
|
|
+ client.Check({}, (error, response) => {
|
|
|
+ if (response) {
|
|
|
+ console.log(`GRPC Health check status: ${response.status} Server Connected`);
|
|
|
+ // Intepret the response status and implement code logic or handler
|
|
|
+ resolve(response.status)
|
|
|
+ } else {
|
|
|
+ if (alreadyHealthCheck == false) console.error(`Health check failed: ${error}`);
|
|
|
+ reject(false)
|
|
|
+ }
|
|
|
+ })
|
|
|
+ })
|
|
|
+ }
|
|
|
+
|
|
|
+ // TO check or validate if the client request meets the criteria
|
|
|
+ private isConnectionAttribute(obj: any): obj is ConnectionAttribute {
|
|
|
+ const isMatch = (
|
|
|
+ typeof obj.ConnectionID === 'object' && // Further checks can be added based on the structure of ConnectionID
|
|
|
+ isStreamAttribute(obj.outGoing) &&
|
|
|
+ isStreamAttribute(obj.inComing) &&
|
|
|
+ (obj.connectionStatus === null || obj.connectionStatus instanceof Subject)
|
|
|
+ );
|
|
|
+
|
|
|
+ if (isMatch) {
|
|
|
+ console.log('gRPC client call matches ConnectionAttribute type');
|
|
|
+ } else {
|
|
|
+ console.log('gRPC client call does not match ConnectionAttribute type');
|
|
|
+ }
|
|
|
+
|
|
|
+ return isMatch;
|
|
|
+ function isStreamAttribute(obj: any): obj is StreamAttribute {
|
|
|
+ return (
|
|
|
+ (typeof obj.StreamID === 'string' || obj.StreamID === undefined) &&
|
|
|
+ (typeof obj.PublisherID === 'string' || obj.PublisherID === undefined) &&
|
|
|
+ (typeof obj.SubscriberID === 'string' || obj.SubscriberID === undefined) &&
|
|
|
+ // Check other properties like PublisherInstance and SubscriberInstance based on their expected types
|
|
|
+ (typeof obj.serverUrl === 'string' || obj.serverUrl === undefined) &&
|
|
|
+ // Check connectionState based on its type, assuming it's an enum or similar
|
|
|
+ (obj.MessageToBePublished === null || obj.MessageToBePublished instanceof Observable) &&
|
|
|
+ (obj.MessageToBeReceived === null || obj.MessageToBeReceived instanceof Subject)
|
|
|
+ );
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // update the referenced array
|
|
|
+ // public updateConnectionStatus(connectionAttributeArray: ConnectionAttribute[]) {
|
|
|
+ // if (this.updateConnectionStatusFlag === false) {
|
|
|
+ // this.updateConnectionStatusFlag = true
|
|
|
+ // this.localServerStatus.subscribe({
|
|
|
+ // next: (notification: any) => {
|
|
|
+ // if (notification.connectionStatus === `ON`) {
|
|
|
+ // let connectionAttribute = connectionAttributeArray.find(connection => connection.ConnectionID.local == notification.connectionID)
|
|
|
+ // if (connectionAttribute) {
|
|
|
+ // connectionAttribute.outGoing.connectionState = 'ON'
|
|
|
+ // console.log(`Local Connection ${notification.connectionID} updated. {${connectionAttribute.outGoing.connectionState}}`)
|
|
|
+ // } else {
|
|
|
+ // console.log(`Local Connection ${notification.connectionID} attribute is not found.`)
|
|
|
+ // }
|
|
|
+ // }
|
|
|
+ // },
|
|
|
+ // error: err => console.error(err),
|
|
|
+ // complete: () => { }
|
|
|
+ // })
|
|
|
+ // this.clientRequest.subscribe({
|
|
|
+ // next: (clientConnectionAttribute: ConnectionAttribute) => {
|
|
|
+ // console.log(`Received a request from ${clientConnectionAttribute.outGoing.serverUrl}`)
|
|
|
+ // let connectionAttribute = connectionAttributeArray.find(connection => connection.ConnectionID.remote == clientConnectionAttribute.ConnectionID.local)
|
|
|
+ // if (connectionAttribute) {
|
|
|
+ // connectionAttribute.inComing.connectionState = 'ON'
|
|
|
+ // console.log(`Client Connection ${clientConnectionAttribute.ConnectionID.local} updated. {${connectionAttribute.inComing.connectionState}}`)
|
|
|
+ // } else {
|
|
|
+ // console.log(`Client Connection Attribute ${clientConnectionAttribute.inComing.PublisherID} is not found`)
|
|
|
+ // }
|
|
|
+ // },
|
|
|
+ // error: err => console.error(err),
|
|
|
+ // complete: () => { }
|
|
|
+ // })
|
|
|
+ // } else {
|
|
|
+ // console.log(`Update Connection Status already called.`)
|
|
|
+ // }
|
|
|
+ // }
|
|
|
+}
|
|
|
+// https://github.com/grpc/proposal/blob/master/L5-node-client-interceptors.md
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|