123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212 |
- import * as grpc from '@grpc/grpc-js';
- import { Observable, Subject, Subscription } from "rxjs";
- import { Message, ConnectionAttribute, GrpcConnectionType, ConnectionState, StreamAttribute, MessageLog } from "../interfaces/general.interface";
- import { Status } from '@grpc/grpc-js/build/src/constants';
- import { message_proto } from './protos/server.proto'
- import * as _ from 'lodash'
- import { ServerClientManager } from './server-client.service';
/**
 * Bidirectional gRPC transport built from two server-streaming halves.
 *
 * - Server half: exposes `HandleMessage`. A remote peer sends its own
 *   `ConnectionAttribute` as JSON; the locally registered connection whose
 *   `ConnectionID.local` equals the request's `ConnectionID.remote` then has
 *   its `outGoing.MessageToBePublished` stream written back over the call.
 * - Client half: calls `HandleMessage` on the remote peer and forwards every
 *   received message into `inComing.MessageToBeReceived`.
 *
 * Connection health is reported through each attribute's `connectionStatus`
 * subject (`DIRECT_PUBLISH` when a peer attaches, `BUFFER` when the remote
 * peer cannot be reached).
 */
export class GrpcServiceMethod {
    private connectionAttribute: ConnectionAttribute | undefined
    private connectionAttributes: ConnectionAttribute[] = []
    private server: grpc.Server | undefined
    private messageToBeSendOver: Message | any
    private clientRequest: Subject<ConnectionAttribute> = new Subject()

    /**
     * Public entry point for a service client to establish a connection. The
     * caller supplies the server and client information as a pair.
     *
     * Both halves are started in the background; the returned promise does not
     * wait for either of them to connect.
     *
     * @param connectionAttribute  The connection to serve and to dial out for.
     * @param connectionAttributes All known connections; incoming peer requests are matched against this list.
     * @returns A placeholder string, resolved immediately.
     */
    public async create(connectionAttribute: ConnectionAttribute, connectionAttributes: ConnectionAttribute[]): Promise<any> {
        this.connectionAttribute = connectionAttribute
        this.connectionAttributes = connectionAttributes
        // Fire-and-forget: createGrpcInstance handles its own errors and never rejects.
        void this.createGrpcInstance({ instanceType: 'server' }, connectionAttribute)
        void this.createGrpcInstance({ instanceType: 'client' }, connectionAttribute)
        return 'Just putting it here for now....'
    }

    /**
     * Reconnection loop. Runs one connection attempt, waits for it to finish
     * (or fail), pauses one second and tries again.
     *
     * For the server half the loop ends as soon as a server instance already
     * exists: the running server keeps accepting calls on its own, so there is
     * nothing to recreate.
     */
    private async createGrpcInstance(grpcType: GrpcConnectionType, connectionAttribute: ConnectionAttribute): Promise<void> {
        while (true) {
            try {
                if (grpcType.instanceType === 'server') {
                    if (this.server) {
                        return
                    }
                    await this.createServerStreamingServer()
                } else if (grpcType.instanceType === 'client') {
                    await this.createServerStreamingClient(connectionAttribute)
                } else {
                    // Unknown instance type: nothing can ever be (re)created for it.
                    return
                }
            } catch (error) {
                console.error('Connection attempt failed:', error);
            }
            // Wait for 1 second before the next attempt
            await new Promise(resolve => setTimeout(resolve, 1000))
        }
    }

    /**
     * Creates the server instance that streams all application outgoing
     * messages, and binds it to `connectionAttribute.outGoing.serverUrl`
     * (e.g. '0.0.0.0:3001').
     *
     * @returns A promise that resolves (with an empty string) when an outgoing
     *          message stream attached to a peer errors or completes, and
     *          rejects when the server cannot be bound. On a bind failure the
     *          stored server instance is cleared so a later attempt can retry.
     */
    public createServerStreamingServer(): Promise<any> {
        return new Promise((resolve, reject) => {
            const server = new grpc.Server()
            this.server = server
            server.addService(message_proto.Message.service, {
                HandleMessage: (call: grpc.ServerWritableStream<{ message: string }, unknown>) => {
                    this.handleClientRequest(call, resolve)
                }
            })

            const serverUrl = this.connectionAttribute?.outGoing.serverUrl
            if (!serverUrl) {
                this.server = undefined
                reject(new Error('Cannot start gRPC server: no outgoing server URL configured'))
                return
            }

            server.bindAsync(serverUrl, grpc.ServerCredentials.createInsecure(), (error: Error | null) => {
                if (error) {
                    // Starting an unbound server would throw inside this callback; surface the failure instead.
                    this.server = undefined
                    reject(error)
                    return
                }
                console.log(`gRPC server is running on ${serverUrl}`)
                server.start()
            })
        })
    }

    /**
     * Handles one incoming `HandleMessage` call.
     *
     * The request payload is parsed and validated. When it refers to a known
     * connection, that connection's outgoing messages are written to the call
     * and a `DIRECT_PUBLISH` state is reported. Requests that fail validation
     * or match no known connection are left unanswered; the number of clients
     * and their IDs are currently predetermined, so no new buffer is assigned.
     *
     * @param call           The server-streaming call opened by the remote peer.
     * @param onStreamClosed Invoked with an empty string when the outgoing stream errors or completes.
     */
    private handleClientRequest(call: grpc.ServerWritableStream<{ message: string }, unknown>, onStreamClosed: (value: string) => void): void {
        let parsed: unknown
        try {
            parsed = JSON.parse(call.request.message)
        } catch (error) {
            // The payload comes from the network; a malformed one must not take the server down.
            console.error('Discarding gRPC client request with an unparsable payload:', error)
            call.end()
            return
        }

        // client Request validation
        if (!this.isConnectionAttribute(parsed)) {
            return
        }
        const request: ConnectionAttribute = parsed

        // Check if this connection exists
        const result = this.connectionAttributes.find(
            (candidate: ConnectionAttribute) => candidate.ConnectionID.local === request.ConnectionID.remote
        )
        if (!result) {
            console.log(`No matching results.... leaving the logic blank for now...`)
            return
        }

        const outgoingMessages = result.outGoing.MessageToBePublished
        if (!outgoingMessages) {
            console.error('Matched connection has no outgoing message stream; nothing to publish.')
            return
        }

        // if exist, reassign back the buffer
        const subscription: Subscription = outgoingMessages.subscribe({
            next: (outGoingMessage: Message) => {
                const message = {
                    id: outGoingMessage.id,
                    message: JSON.stringify(outGoingMessage.message)
                }
                console.log(`Sending ${(outGoingMessage.message as MessageLog).appData.msgId} to ${request.outGoing.PublisherID}`)
                call.write(message)
            },
            // RxJS closes the subscription itself once error/complete is delivered.
            error: (err: unknown) => {
                console.error(err)
                onStreamClosed(``)
            },
            complete: () => {
                onStreamClosed(``)
            }
        })

        // Stop writing into a call the peer has abandoned; otherwise every reconnect adds another live subscriber.
        const releaseSubscription = (): void => subscription.unsubscribe()
        call.on('cancelled', releaseSubscription)
        call.on('close', releaseSubscription)

        const report: ConnectionState = {
            status: `DIRECT_PUBLISH`
        }
        result.connectionStatus?.next(report)
    }

    /**
     * Sends a request to the remote server asking it to open a channel over
     * which it can stream messages to this side.
     *
     * Received messages are forwarded to `connectionAttribute.inComing.MessageToBeReceived`.
     * When the call fails, a `BUFFER` state is reported once on
     * `connectionAttribute.connectionStatus`.
     *
     * @param connectionAttribute The connection whose `inComing.serverUrl` is dialled.
     * @returns A promise that resolves once the call has terminated, whether it
     *          failed or was ended by the server. A synchronous failure while
     *          setting up the call rejects the promise.
     */
    public createServerStreamingClient(connectionAttribute: ConnectionAttribute): Promise<string> {
        return new Promise((resolve) => {
            const client = new message_proto.Message(connectionAttribute.inComing.serverUrl, grpc.credentials.createInsecure());
            // Send a copy without the subjects/observables, otherwise JSON.stringify hits a circular reference.
            const localInfo: ConnectionAttribute = {
                ConnectionID: connectionAttribute.ConnectionID,
                outGoing: this.withoutStreams(connectionAttribute.outGoing),
                inComing: this.withoutStreams(connectionAttribute.inComing),
                connectionStatus: null
            }
            const call = client.HandleMessage({ id: connectionAttribute.inComing.serverUrl, message: JSON.stringify(localInfo) })
            console.log(`Sending request to ${connectionAttribute.inComing.serverUrl} to open response channel...`)

            // A failed call raises both 'error' and 'status'; settle (and report) only once.
            let settled = false
            const settle = (outcome: string, reportBuffering: boolean): void => {
                if (settled) {
                    return
                }
                settled = true
                if (reportBuffering) {
                    const report: ConnectionState = {
                        status: 'BUFFER',
                        reason: `Server doesn't seem to be alive. Error returned.`,
                        payload: this.messageToBeSendOver ?? `There's no message at the moment...`
                    }
                    connectionAttribute.connectionStatus?.next(report)
                }
                // Each attempt creates its own client; release its channel once the call is over.
                client.close()
                resolve(outcome)
            }

            // The 'status' event carries a status object; the numeric code lives in `.code`.
            call.on('status', (status: { code: Status }) => {
                if (status.code === grpc.status.OK) {
                    console.log(`Message trasmission operation is successful`)
                    settle('Stream closed by the server.', false)
                } else if (status.code === grpc.status.UNAVAILABLE) {
                    settle('No connection established. Server is not responding..', true)
                }
            });
            call.on('data', (data: any) => {
                let response: Message
                try {
                    response = {
                        id: data.id,
                        message: JSON.parse(data.message)
                    }
                } catch (error) {
                    console.error('Discarding incoming gRPC message with an unparsable payload:', error)
                    return
                }
                connectionAttribute.inComing.MessageToBeReceived?.next(response)
            });
            call.on('error', () => {
                settle('', true)
            });
        })
    }

    /** Copies a stream attribute's plain fields, replacing its subjects/observables with `null`. */
    private withoutStreams(stream: StreamAttribute): StreamAttribute {
        return {
            StreamID: stream.StreamID,
            PublisherID: stream.PublisherID,
            SubscriberID: stream.SubscriberID,
            serverUrl: stream.serverUrl,
            MessageToBePublished: null,
            MessageToBeReceived: null
        }
    }

    /**
     * Checks whether a client request has the shape of a `ConnectionAttribute`.
     *
     * Accepts any value, including `null`, primitives and objects with missing
     * members, and returns `false` for them instead of throwing.
     * `ConnectionID` must be a non-null object.
     *
     * @param obj The value to validate (typically the result of `JSON.parse`).
     * @returns `true` when `obj` matches the expected structure.
     */
    private isConnectionAttribute(obj: unknown): obj is ConnectionAttribute {
        const isRecord = (value: unknown): value is Record<string, unknown> =>
            typeof value === 'object' && value !== null
        const isOptionalString = (value: unknown): boolean =>
            value === undefined || typeof value === 'string'
        const isStreamAttribute = (stream: unknown): stream is StreamAttribute =>
            isRecord(stream) &&
            isOptionalString(stream.StreamID) &&
            isOptionalString(stream.PublisherID) &&
            isOptionalString(stream.SubscriberID) &&
            isOptionalString(stream.serverUrl) &&
            (stream.MessageToBePublished === null || stream.MessageToBePublished instanceof Observable) &&
            (stream.MessageToBeReceived === null || stream.MessageToBeReceived instanceof Subject)

        const isMatch =
            isRecord(obj) &&
            isRecord(obj.ConnectionID) && // Further checks can be added based on the structure of ConnectionID
            isStreamAttribute(obj.outGoing) &&
            isStreamAttribute(obj.inComing) &&
            (obj.connectionStatus === null || obj.connectionStatus instanceof Subject)

        if (isMatch) {
            console.log('gRPC client call matches ConnectionAttribute type');
        } else {
            console.log('gRPC client call does not match ConnectionAttribute type');
        }
        return isMatch
    }
}
|