Explorar o código

statusChain refactoring

Enzo hai 11 meses
pai
achega
980ba7925a
Modificáronse 9 ficheiros con 339 adicións e 114 borrados
  1. 86 0
      attributes.json
  2. 2 2
      interfaces/general.interface.ts
  3. 1 0
      package.json
  4. 7 5
      services/grpc.service.method.ts
  5. 127 51
      services/server-client.service.ts
  6. 30 30
      test/grpc1.ts
  7. 13 13
      test/grpc2.ts
  8. 13 13
      test/grpc3.ts
  9. 60 0
      test/test2.ts

+ 86 - 0
attributes.json

@@ -0,0 +1,86 @@
+[
+    {
+        "ConnectionID": {
+            "local": "G0G1",
+            "remote": "G1G0"
+        },
+        "outGoing": {
+            "StreamID": "G0",
+            "PublisherID": "G0",
+            "SubscriberID": "G0",
+            "MessageToBePublished": null,
+            "MessageToBeReceived": null
+        },
+        "inComing": {
+            "StreamID": "G1",
+            "PublisherID": "G1",
+            "SubscriberID": "G1",
+            "MessageToBePublished": null,
+            "MessageToBeReceived": null
+        },
+        "connectionStatus": null
+    },
+    {
+        "ConnectionID": {
+            "local": "G0G2",
+            "remote": "G2G0"
+        },
+        "outGoing": {
+            "StreamID": "G0",
+            "PublisherID": "G0",
+            "SubscriberID": "G0",
+            "MessageToBePublished": null,
+            "MessageToBeReceived": null
+        },
+        "inComing": {
+            "StreamID": "G2",
+            "PublisherID": "G2",
+            "SubscriberID": "G2",
+            "MessageToBePublished": null,
+            "MessageToBeReceived": null
+        },
+        "connectionStatus": null
+    },
+    {
+        "ConnectionID": {
+            "local": "G1G0",
+            "remote": "G0G1"
+        },
+        "outGoing": {
+            "StreamID": "G1",
+            "PublisherID": "G1",
+            "SubscriberID": "G1",
+            "MessageToBePublished": null,
+            "MessageToBeReceived": null
+        },
+        "inComing": {
+            "StreamID": "G0",
+            "PublisherID": "G0",
+            "SubscriberID": "G0",
+            "MessageToBePublished": null,
+            "MessageToBeReceived": null
+        },
+        "connectionStatus": null
+    },
+    {
+        "ConnectionID": {
+            "local": "G2G0",
+            "remote": "G0G2"
+        },
+        "outGoing": {
+            "StreamID": "G2",
+            "PublisherID": "G2",
+            "SubscriberID": "G2",
+            "MessageToBePublished": null,
+            "MessageToBeReceived": null
+        },
+        "inComing": {
+            "StreamID": "G0",
+            "PublisherID": "G0",
+            "SubscriberID": "G0",
+            "MessageToBePublished": null,
+            "MessageToBeReceived": null
+        },
+        "connectionStatus": null
+    }
+]

+ 2 - 2
interfaces/general.interface.ts

@@ -55,8 +55,8 @@ export interface StreamAttribute {
 
 export interface ConnectionRequest {
     database?: string,
-    server: ServerRequest,
-    client: ClientRequest
+    server?: ServerRequest,
+    client?: ClientRequest
 }
 
 export interface ServerRequest {

+ 1 - 0
package.json

@@ -7,6 +7,7 @@
     "build": "tsc -p tsconfig.json",
     "watch": "tsc -p tsconfig.json --watch",
     "test": "echo \"Error: no test specified\" && exit 1",
+    "test2": "node test/test2.js",
     "generatedata": "node services/utility/generateData.js",
     "grpc1": "node test/grpc1.js",
     "grpc2": "node test/grpc2.js",

+ 7 - 5
services/grpc.service.method.ts

@@ -5,6 +5,8 @@ import { Status } from '@grpc/grpc-js/build/src/constants';
 import { message_proto } from './protos/server.proto'
 import { ServerWritableStreamImpl } from '@grpc/grpc-js/build/src/server-call';
 export class GrpcServiceMethod {
+    // Prefilled connection attribute and pass in grpc service method for reference
+    // Isolate connection attribute referencing issue to server-client service
     private server: grpc.Server | any
     private messageToBeSendOver: Message | any
     private clientInfo: any[] = []
@@ -12,8 +14,8 @@ export class GrpcServiceMethod {
 
     public async create(request: ConnectionRequest, connectionAttribute: ConnectionAttribute, outGoingInfo: OutGoingInfo): Promise<any> {
         // Assuming currently only one client
-        this.createGrpcInstance(request.server.serverUrl, { instanceType: 'server' }, connectionAttribute, outGoingInfo)
-        this.createGrpcInstance(request.client.targetServer, { instanceType: 'client' }, connectionAttribute, outGoingInfo)
+        // this.createGrpcInstance(request.server.serverUrl, { instanceType: 'server' }, connectionAttribute, outGoingInfo)
+        // this.createGrpcInstance(request.client.targetServer, { instanceType: 'client' }, connectionAttribute, outGoingInfo)
     }
 
     private async generateAdditionalAttributes(connectionAttribute: ConnectionAttribute, clientInfo?: any, localInfo?: any) {
@@ -77,9 +79,9 @@ export class GrpcServiceMethod {
 
                 this.server.addService(message_proto.Message.service, {
                     HandleMessage: (call) => {
-                        let clientInfo = JSON.parse(call.request.message)
-                        this.clientInfo.push(clientInfo)
-                        // this.generateAdditionalAttributes(connectionAttribute, clientInfo)
+                        let clientInfo: OutGoingInfo = JSON.parse(call.request.message)
+                        // console.log(clientInfo)
+                        this.generateAdditionalAttributes(connectionAttribute, clientInfo)
 
                         console.log(`Initializing stream. Opening Channel... Confirmation from ${call.request.id}`)
 

+ 127 - 51
services/server-client.service.ts

@@ -1,68 +1,144 @@
-import { BehaviorSubject } from 'rxjs';
-import { ConnectionAttribute, ConnectionRequest, ConnectionState, OutGoingInfo } from '../interfaces/general.interface';
+import { BehaviorSubject, Subject } from 'rxjs';
+import { ClientRequest, ConnectionAttribute, ConnectionRequest, ConnectionState, Message, OutGoingInfo, ServerRequest, State } from '../interfaces/general.interface';
 import { GrpcServiceMethod } from './grpc.service.method';
 import { BufferService } from './buffer.service';
-import { v4 as uuidv4 } from 'uuid'
 import * as dotenv from 'dotenv'
+import * as _ from 'lodash'
 dotenv.config()
 
 export class ServerClientManager {
-
     private connectionAttributes: ConnectionAttribute[] = []
-    private request: ConnectionRequest | any
-    private outGoingInfo: OutGoingInfo
     private grpcService: GrpcServiceMethod = new GrpcServiceMethod()
+    private defaultServerAttribute: ServerRequest = {
+        name: 'Default - Server',
+        serverUrl: "localhost:3000",
+        connectionType: 'GRPC',
+        messageToBePublishedFromApplication: new Subject<Message>()
+    }
+    private defaultClientAttribute: ClientRequest = {
+        name: 'Default - Client',
+        targetServer: "localhost:3001",
+        connectionType: 'GRPC',
+        messageToBeReceivedFromRemote: new Subject<Message>()
+    }
 
     constructor() {
-        this.outGoingInfo = {
-            StreamID: uuidv4(),
-            PublisherID: uuidv4(),
-            SubscriberID: uuidv4()
-        }
+        // logic here
     }
 
 
-    public generateConnection(request: ConnectionRequest) {
-        this.request = request
-        let database: string
-        if (request.database) {
-            database = request.database
-        } else {
-            database = request.server.name + request.client.name
-        }
-        /* Inject retransmission here */
-        let initialReport: ConnectionState = { status: 'BUFFER' }
-        let reportSubject: BehaviorSubject<ConnectionState> = new BehaviorSubject(initialReport)
-        let retransmission: BufferService = new BufferService(request.server.messageToBePublishedFromApplication, reportSubject, database)
-
-        let connectionAttribute: ConnectionAttribute = {
-            ConnectionID: {
-                local: '',
-                remote: ''
-            },
-            outGoing: {
-                MessageToBePublished: retransmission.getMessages(),
-                MessageToBeReceived: null
-            },
-            inComing: {
-                MessageToBePublished: null,
-                MessageToBeReceived: request.client.messageToBeReceivedFromRemote
-            },
-            connectionStatus: reportSubject
-        }
-
-        // connectionAttribute.outGoing.MessageToBePublished?.subscribe(e => console.log((e.message as MessageLog).appData.msgId))
-        // This is default connection`
-        if (!request.server.connectionType) {
-            request.server.connectionType = 'GRPC'
-        }
-        // For each connection type:
-        if (request.server.connectionType == 'GRPC') {
-            this.grpcService.create(request, connectionAttribute, this.outGoingInfo)
-            this.connectionAttributes.push(connectionAttribute)
-            console.log(`There is now ${this.connectionAttributes.length} connection Attributes`)
-        }
+    public async generateConnection(request: ConnectionRequest): Promise<any> {
+        return new Promise(async (resolve, reject) => {
+            let initialReport: ConnectionState
+            let reportSubject: BehaviorSubject<ConnectionState>
+            let retransmission: BufferService
+            // let originalRequest = JSON.parse(JSON.stringify(request))
+            let originalRequest = _.cloneDeep(request)
+            let database: string
+            let response: any = { message: `Fail to complete connection generation` }
+            let statusChain: State = 0
+            let connectionAttribute: ConnectionAttribute
+
+            if (statusChain == 0) {
+                if (!request.server) {
+                    request.server = this.defaultServerAttribute
+                }
+                if (!request.client) {
+                    request.client = this.defaultClientAttribute
+                }
+                if (request.database) {
+                    database = request.database
+                } else {
+                    database = request.server.name + request.client.name
+                }
+                /* Inject retransmission here */
+                initialReport = { status: 'BUFFER' }
+                reportSubject = new BehaviorSubject(initialReport)
+                retransmission = new BufferService(request.server.messageToBePublishedFromApplication, reportSubject, database)
+
+                statusChain = 1
+            }
+
+            if (statusChain == 1) {
+                // Connection Type checking
+                if (request.server!.connectionType != request.client!.connectionType) {
+                    console.log(`Connection Type DOES NOT MATCH!`)
+                    statusChain = 0
+                } else {
+                    statusChain = 1
+                }
+            }
+
+            if (statusChain == 1) {
+                connectionAttribute = {
+                    ConnectionID: {
+                        local: request.server!.name + request.client!.name,
+                        remote: request.client!.name + request.server!.name
+                    },
+                    outGoing: {
+                        StreamID: request.server!.name,
+                        PublisherID: request.server!.name,
+                        SubscriberID: request.server!.name,
+                        MessageToBePublished: retransmission!.getMessages(),
+                        MessageToBeReceived: null
+                    },
+                    inComing: {
+                        StreamID: request.client!.name,
+                        PublisherID: request.client!.name,
+                        SubscriberID: request.client!.name,
+                        MessageToBePublished: null,
+                        MessageToBeReceived: request.client!.messageToBeReceivedFromRemote
+                    },
+                    connectionStatus: reportSubject!
+                }
+                statusChain = 1
+            }
+
+            if (statusChain == 1) {
+                await this.checkConnectionAttribute(connectionAttribute!).then((res) => {
+                    if (res == true) {
+                        console.log(`Connection<${connectionAttribute.ConnectionID.local}> already exists `)
+                    }
+                    if (res == false) {
+                        this.connectionAttributes.push(connectionAttribute)
+                        console.log(`Connection ${connectionAttribute.ConnectionID.local} registered...`)
+                        response = {
+                            message: "Channel Response",
+                            requestedTo: originalRequest,
+                            data: connectionAttribute
+                        }
+                    }
+                    console.log(`There is now ${this.connectionAttributes.length} connection Attributes`)
+                })
+                statusChain = 1
+            }
+
+            if (statusChain == 1) {
+                // This is default connection`
+                if (!request.client!.connectionType) {
+                    request.client!.connectionType = 'GRPC'
+                }
+                // For each connection type:
+                if (request.client!.connectionType == 'GRPC') {
+                    // this.grpcService.create(request, connectionAttribute, this.outGoingInfo)
+                }
+            }
+
+            resolve(response);
+
+        })
     }
 
+    private async checkConnectionAttribute(connectionAttribute: ConnectionAttribute): Promise<boolean> {
+        return new Promise((resolve) => {
+            let result: boolean = this.connectionAttributes.some(connection =>
+                connection.ConnectionID.local === connectionAttribute.ConnectionID.local
+            );
+            console.log(`Checking ${connectionAttribute.ConnectionID.local} and returns ${result}`);
+            resolve(result);
+        });
+    }
+
+
 }
 

+ 30 - 30
test/grpc1.ts

@@ -167,19 +167,19 @@ let connectionRequest2: ConnectionRequest = {
 connectionService.generateConnection(connectionRequest)
 connectionService.generateConnection(connectionRequest2)
 
-let generateFakeMessagesToBePublished = from(parsedMessages).pipe(take(50))
-generateFakeMessagesToBePublished.subscribe({
-  next: message => {
-    let payload: Message = {
-      id: hostServer,
-      message: message
-    }
-    connectionRequest.server.messageToBePublishedFromApplication.next(payload)
-    connectionRequest2.server.messageToBePublishedFromApplication.next(payload)
-  },
-  error: error => console.error(error),
-  complete: () => console.log(`Completed`)
-})
+// let generateFakeMessagesToBePublished = from(parsedMessages).pipe(take(50))
+// generateFakeMessagesToBePublished.subscribe({
+//   next: message => {
+//     let payload: Message = {
+//       id: hostServer,
+//       message: message
+//     }
+//     connectionRequest.server.messageToBePublishedFromApplication.next(payload)
+//     connectionRequest2.server.messageToBePublishedFromApplication.next(payload)
+//   },
+//   error: error => console.error(error),
+//   complete: () => console.log(`Completed`)
+// })
 
 // let generateFakeMessagesToBePublished = stream().pipe(take(10))
 // generateFakeMessagesToBePublished.subscribe({
@@ -193,23 +193,23 @@ generateFakeMessagesToBePublished.subscribe({
 //   }
 // })
 
-connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
-  next: request => {
-    console.log(`Received ${(request.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
-    array.push(request)
-  },
-  error: error => console.error(error),
-  complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
-})
-
-connectionRequest2.client.messageToBeReceivedFromRemote.subscribe({
-  next: request => {
-    console.log(`Received ${(request.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
-    array.push(request)
-  },
-  error: error => console.error(error),
-  complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
-})
+// connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
+//   next: request => {
+//     console.log(`Received ${(request.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
+//     array.push(request)
+//   },
+//   error: error => console.error(error),
+//   complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
+// })
+
+// connectionRequest2.client.messageToBeReceivedFromRemote.subscribe({
+//   next: request => {
+//     console.log(`Received ${(request.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
+//     array.push(request)
+//   },
+//   error: error => console.error(error),
+//   complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
+// })
 
 
 

+ 13 - 13
test/grpc2.ts

@@ -50,19 +50,19 @@ connectionService.generateConnection(connectionRequest)
 // })
 
 
-connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
-  next: response => {
-    // if((response.message as MessageLog).appData.msgId == `ebf94479-44fe-470d-827c-9f1389396d6a`){
-    //   console.log(`Received the 1000th message. Running the test. Initiating server restart....`)
-    // connectionService.restartServerInDuration(10)
-    // }
-    console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
-    // Message.create(response)
-    array.push(response)
-  },
-  error: error => console.error(error),
-  complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
-})
+// connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
+//   next: response => {
+//     // if((response.message as MessageLog).appData.msgId == `ebf94479-44fe-470d-827c-9f1389396d6a`){
+//     //   console.log(`Received the 1000th message. Running the test. Initiating server restart....`)
+//     // connectionService.restartServerInDuration(10)
+//     // }
+//     console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
+//     // Message.create(response)
+//     array.push(response)
+//   },
+//   error: error => console.error(error),
+//   complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
+// })
 
 
 /* Complex Test: 1 to 1*/

+ 13 - 13
test/grpc3.ts

@@ -50,19 +50,19 @@ connectionService.generateConnection(connectionRequest)
 // })
 
 
-connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
-  next: response => {
-    // if((response.message as MessageLog).appData.msgId == `ebf94479-44fe-470d-827c-9f1389396d6a`){
-    //   console.log(`Received the 1000th message. Running the test. Initiating server restart....`)
-    // connectionService.restartServerInDuration(10)
-    // }
-    console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
-    // Message.create(response)
-    array.push(response)
-  },
-  error: error => console.error(error),
-  complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
-})
+// connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
+//   next: response => {
+//     // if((response.message as MessageLog).appData.msgId == `ebf94479-44fe-470d-827c-9f1389396d6a`){
+//     //   console.log(`Received the 1000th message. Running the test. Initiating server restart....`)
+//     // connectionService.restartServerInDuration(10)
+//     // }
+//     console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
+//     // Message.create(response)
+//     array.push(response)
+//   },
+//   error: error => console.error(error),
+//   complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
+// })
 
 
 /* Complex Test: 1 to 1*/

+ 60 - 0
test/test2.ts

@@ -0,0 +1,60 @@
+/* call server client service instance 3 times.  */
+
+import { Subject } from "rxjs";
+import { ConnectionRequest, Message } from "../interfaces/general.interface";
+import { ServerClientManager } from "../services/server-client.service";
+
+const serverClientManager = new ServerClientManager()
+
+let targetserver: string = 'localhost:3000'
+let targetserver2: string = 'localhost:3002'
+let hostServer: string = 'localhost:3001'
+
+let connectionRequest1: ConnectionRequest = {
+  server: {
+    name: 'G1',
+    serverUrl: hostServer,
+    connectionType: 'GRPC',
+    messageToBePublishedFromApplication: new Subject<Message>()
+  },
+  client: {
+    name: 'G2',
+    targetServer: targetserver,
+    connectionType: 'GRPC',
+    messageToBeReceivedFromRemote: new Subject<Message>()
+  }
+}
+
+let connectionRequest2: ConnectionRequest = {
+  server: {
+    name: 'G1',
+    serverUrl: hostServer,
+    connectionType: 'GRPC',
+    messageToBePublishedFromApplication: new Subject<Message>()
+  },
+  client: {
+    name: 'G0',
+    targetServer: targetserver2,
+    connectionType: 'GRPC',
+    messageToBeReceivedFromRemote: new Subject<Message>()
+  }
+}
+
+// Client 1 request connection
+serverClientManager.generateConnection(connectionRequest1).then((response) => {
+  console.log(response)
+  serverClientManager.generateConnection(connectionRequest2).then((response) => {
+    console.log(response)
+    serverClientManager.generateConnection(connectionRequest1);
+  })
+})
+// Array size =1;
+// serverClientManager.generateConnection(connectionRequest2)
+// Client 2 request connection
+// Array size =2;
+
+// Client 1 request connection again
+// setTimeout(() => {
+//   serverClientManager.generateConnection(connectionRequest1)
+// }, 4000)
+// Array size =2;