|
@@ -1,6 +1,6 @@
|
|
import * as grpc from '@grpc/grpc-js';
|
|
import * as grpc from '@grpc/grpc-js';
|
|
-import { Subject, Subscription } from "rxjs";
|
|
|
|
-import { Message, ConnectionAttribute, GrpcConnectionType, ConnectionState, MessageLog, ConnectionStatus } from "../interfaces/general.interface";
|
|
|
|
|
|
+import { Observable, Subject, Subscription } from "rxjs";
|
|
|
|
+import { Message, ConnectionAttribute, GrpcConnectionType, ConnectionState, MessageLog, ConnectionStatus, StreamAttribute } from "../interfaces/general.interface";
|
|
import { Status } from '@grpc/grpc-js/build/src/constants';
|
|
import { Status } from '@grpc/grpc-js/build/src/constants';
|
|
import { message_proto } from './protos/server.proto'
|
|
import { message_proto } from './protos/server.proto'
|
|
import * as _ from 'lodash'
|
|
import * as _ from 'lodash'
|
|
@@ -27,24 +27,6 @@ export class GrpcServiceMethod {
|
|
resolve('Just putting it here for now....')
|
|
resolve('Just putting it here for now....')
|
|
})
|
|
})
|
|
}
|
|
}
|
|
-
|
|
|
|
- private async generateAdditionalAttributes(connectionAttribute: ConnectionAttribute, clientInfo?: any, localInfo?: any) {
|
|
|
|
- // if (clientInfo) {
|
|
|
|
- // connectionAttribute.inComing.StreamID = clientInfo.StreamID
|
|
|
|
- // connectionAttribute.inComing.PublisherID = clientInfo.PublisherID
|
|
|
|
- // connectionAttribute.inComing.SubscriberID = clientInfo.SubscriberID
|
|
|
|
- // }
|
|
|
|
- // if (localInfo) {
|
|
|
|
- // connectionAttribute.outGoing.StreamID = localInfo.StreamID
|
|
|
|
- // connectionAttribute.outGoing.PublisherID = localInfo.PublisherID
|
|
|
|
- // connectionAttribute.outGoing.SubscriberID = localInfo.SubscriberID
|
|
|
|
- // }
|
|
|
|
- // if (connectionAttribute.outGoing.StreamID && connectionAttribute.inComing.StreamID) {
|
|
|
|
- // connectionAttribute.ConnectionID.local = connectionAttribute.outGoing.StreamID + connectionAttribute.inComing.StreamID
|
|
|
|
- // connectionAttribute.ConnectionID.remote = connectionAttribute.inComing.StreamID + connectionAttribute.outGoing.StreamID
|
|
|
|
- // }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
private async createGrpcInstance(grpcType: GrpcConnectionType, connectionAttribute: ConnectionAttribute) {
|
|
private async createGrpcInstance(grpcType: GrpcConnectionType, connectionAttribute: ConnectionAttribute) {
|
|
// Reconnection Logic here
|
|
// Reconnection Logic here
|
|
while (true) {
|
|
while (true) {
|
|
@@ -78,45 +60,55 @@ export class GrpcServiceMethod {
|
|
this.server = new grpc.Server()
|
|
this.server = new grpc.Server()
|
|
this.localServerStatus.next({
|
|
this.localServerStatus.next({
|
|
connectionStatus: 'ON',
|
|
connectionStatus: 'ON',
|
|
- serverName: connectionAttribute.outGoing.PublisherID,
|
|
|
|
|
|
+ connectionIDlocal: connectionAttribute.ConnectionID.local,
|
|
message: `${connectionAttribute.outGoing.serverUrl} started.`
|
|
message: `${connectionAttribute.outGoing.serverUrl} started.`
|
|
})
|
|
})
|
|
} else {
|
|
} else {
|
|
console.log(`Grpc server alrady started.`)
|
|
console.log(`Grpc server alrady started.`)
|
|
|
|
+ this.localServerStatus.next({
|
|
|
|
+ connectionStatus: 'ON',
|
|
|
|
+ connectionIDlocal: connectionAttribute.ConnectionID.local,
|
|
|
|
+ message: `${connectionAttribute.outGoing.serverUrl} already started.`
|
|
|
|
+ })
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+
|
|
this.server.addService(message_proto.Message.service, {
|
|
this.server.addService(message_proto.Message.service, {
|
|
HandleMessage: (call) => {
|
|
HandleMessage: (call) => {
|
|
|
|
+ /// add a checking for standard message request
|
|
let clientInfo: ConnectionAttribute = JSON.parse(call.request.message)
|
|
let clientInfo: ConnectionAttribute = JSON.parse(call.request.message)
|
|
- this.clientRequest.next(clientInfo)
|
|
|
|
-
|
|
|
|
- console.log(`Initializing stream. Opening Channel... Confirmation from ${call.request.id}`)
|
|
|
|
-
|
|
|
|
- if (connectionAttribute.outGoing.MessageToBePublished) {
|
|
|
|
- let subscription: Subscription = connectionAttribute.outGoing.MessageToBePublished.subscribe({
|
|
|
|
- next: (response: Message) => {
|
|
|
|
- console.log(`Sending from GRPC server: ${(response.message as MessageLog).appData.msgId} `)
|
|
|
|
- let message = {
|
|
|
|
- id: response.id,
|
|
|
|
- message: JSON.stringify(response.message)
|
|
|
|
|
|
+ if (this.isConnectionAttribute(clientInfo)) {
|
|
|
|
+ this.clientRequest.next(clientInfo)
|
|
|
|
+ console.log(`Initializing stream. Opening Channel... Confirmation from ${call.request.id}`)
|
|
|
|
+
|
|
|
|
+ if (connectionAttribute.outGoing.MessageToBePublished) {
|
|
|
|
+ let subscription: Subscription = connectionAttribute.outGoing.MessageToBePublished.subscribe({
|
|
|
|
+ next: (response: Message) => {
|
|
|
|
+ console.log(`Sending from GRPC server: ${(response.message as MessageLog).appData.msgId} `)
|
|
|
|
+ let message = {
|
|
|
|
+ id: response.id,
|
|
|
|
+ message: JSON.stringify(response.message)
|
|
|
|
+ }
|
|
|
|
+ call.write(message)
|
|
|
|
+ },
|
|
|
|
+ error: err => {
|
|
|
|
+ console.error(err)
|
|
|
|
+ subscription.unsubscribe()
|
|
|
|
+ resolve('')
|
|
|
|
+ },
|
|
|
|
+ complete: () => {
|
|
|
|
+ console.log(`Stream response completed for ${call.request.id}`)
|
|
|
|
+ subscription.unsubscribe()
|
|
|
|
+ resolve('')
|
|
}
|
|
}
|
|
- call.write(message)
|
|
|
|
- },
|
|
|
|
- error: err => {
|
|
|
|
- console.error(err)
|
|
|
|
- subscription.unsubscribe()
|
|
|
|
- resolve('')
|
|
|
|
- },
|
|
|
|
- complete: () => {
|
|
|
|
- console.log(`Stream response completed for ${call.request.id}`)
|
|
|
|
- subscription.unsubscribe()
|
|
|
|
- resolve('')
|
|
|
|
|
|
+ })
|
|
|
|
+ let report: ConnectionState = {
|
|
|
|
+ status: 'DIRECT_PUBLISH'
|
|
}
|
|
}
|
|
- })
|
|
|
|
- let report: ConnectionState = {
|
|
|
|
- status: 'DIRECT_PUBLISH'
|
|
|
|
|
|
+ connectionAttribute.connectionStatus!.next(report)
|
|
}
|
|
}
|
|
- connectionAttribute.connectionStatus!.next(report)
|
|
|
|
|
|
+ } else {
|
|
|
|
+ console.error(`INVALID REQUEST`)
|
|
}
|
|
}
|
|
},
|
|
},
|
|
Check: (_, callback) => {
|
|
Check: (_, callback) => {
|
|
@@ -213,6 +205,38 @@ export class GrpcServiceMethod {
|
|
})
|
|
})
|
|
})
|
|
})
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ private isConnectionAttribute(obj: any): obj is ConnectionAttribute {
|
|
|
|
+ const isMatch = (
|
|
|
|
+ typeof obj.ConnectionID === 'object' && // Further checks can be added based on the structure of ConnectionID
|
|
|
|
+ isStreamAttribute(obj.outGoing) &&
|
|
|
|
+ isStreamAttribute(obj.inComing) &&
|
|
|
|
+ (obj.connectionStatus === null || obj.connectionStatus instanceof Subject)
|
|
|
|
+ );
|
|
|
|
+
|
|
|
|
+ if (isMatch) {
|
|
|
|
+ console.log('gRPC client call matches ConnectionAttribute type');
|
|
|
|
+ } else {
|
|
|
|
+ console.log('gRPC client call does not match ConnectionAttribute type');
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return isMatch;
|
|
|
|
+ function isStreamAttribute(obj: any): obj is StreamAttribute {
|
|
|
|
+ return (
|
|
|
|
+ (typeof obj.StreamID === 'string' || obj.StreamID === undefined) &&
|
|
|
|
+ (typeof obj.PublisherID === 'string' || obj.PublisherID === undefined) &&
|
|
|
|
+ (typeof obj.SubscriberID === 'string' || obj.SubscriberID === undefined) &&
|
|
|
|
+ // Check other properties like PublisherInstance and SubscriberInstance based on their expected types
|
|
|
|
+ (typeof obj.serverUrl === 'string' || obj.serverUrl === undefined) &&
|
|
|
|
+ // Check connectionState based on its type, assuming it's an enum or similar
|
|
|
|
+ (obj.MessageToBePublished === null || obj.MessageToBePublished instanceof Observable) &&
|
|
|
|
+ (obj.MessageToBeReceived === null || obj.MessageToBeReceived instanceof Subject)
|
|
|
|
+ );
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|