Explorar o código

server streaming grpc set up

enzo hai 11 meses
pai
achega
d7dd32c588
Modificáronse 4 ficheiros con 49 adicións e 87 borrados
  1. 1 1
      README.md
  2. 9 9
      grpc/grpc1.ts
  3. 15 28
      grpc/grpc2.ts
  4. 24 49
      services/grpc.service.ts

+ 1 - 1
README.md

@@ -10,7 +10,7 @@ To get started with FIs-Retransmission, you can clone the repository and install
 
 ## Execution Intsructions
 
-Several batch files are prepared for ease of use. After building and the project, please change the path in the startgrpc.bat && startgrpc2.bat to your own path, and just type in start "<startgrpc or batchfile of your choosing>.bat"
+Several batch files are prepared for ease of use. After building and the project, please change the path in the startgrpc.bat && startgrpc2.bat to your own path, and just type in start "startgrpc or batchfile of your choosing.bat"
 
 
 ```bash

+ 9 - 9
grpc/grpc1.ts

@@ -14,19 +14,19 @@ let messageToBePublished: Subject<any> = new Subject()
 let statusControl: Subject<ReportStatus> = new Subject()
 
 /* For server streaming */
-// errorHandlingService.handleMessage(dataMessages, statusControl).subscribe((messages) => {
-//   messageToBePublished.next(messages)
-// })
-// let server1 = 'localhost:3000'
-// gprcService.createGrpcInstance(server1, messageToBePublished, statusControl, { instanceType: 'server', serviceMethod: 'server streaming' })
-
-
-/* For bidiretional streaming*/
 errorHandlingService.handleMessage(dataMessages, statusControl).subscribe((messages) => {
   messageToBePublished.next(messages)
 })
 let server1 = 'localhost:3000'
-gprcService.createGrpcInstance(server1, messageToBePublished, statusControl, { instanceType: 'server', serviceMethod: 'bidirectional' })
+gprcService.createGrpcInstance(server1, messageToBePublished, statusControl, { instanceType: 'server', serviceMethod: 'server streaming' })
+
+
+/* For bidiretional streaming*/
+// errorHandlingService.handleMessage(dataMessages, statusControl).subscribe((messages) => {
+//   messageToBePublished.next(messages)
+// })
+// let server1 = 'localhost:3000'
+// gprcService.createGrpcInstance(server1, messageToBePublished, statusControl, { instanceType: 'server', serviceMethod: 'bidirectional' })
 
 
 // setTimeout(() => {

+ 15 - 28
grpc/grpc2.ts

@@ -16,40 +16,27 @@ let server1: string = 'localhost:3000'
 let unaryRequestSubject: Subject<any> = new Subject()
 
 /* Server Streaming Test case */
-// errorHandlingService.handleMessage(unaryRequestSubject, statusControl).subscribe((messages) => {
-//   messageToBeReleased.next(messages)
-// })
-// grpcService.createGrpcInstance(server1, messageToBeReleased, statusControl, { instanceType: 'client', serviceMethod: 'server streaming' })
-
-/* Bidirectional streaming test case */
-errorHandlingService.handleMessage(dataMessages, statusControl).subscribe((messages) => {
+errorHandlingService.handleMessage(unaryRequestSubject, statusControl).subscribe((messages) => {
   messageToBeReleased.next(messages)
 })
-grpcService.createGrpcInstance(server1, messageToBeReleased, statusControl, { instanceType: 'client', serviceMethod: 'bidirectional' })
-
-
+grpcService.createGrpcInstance(server1, messageToBeReleased, statusControl, { instanceType: 'client', serviceMethod: 'server streaming' })
 
-let testMessageRequest = {
-  appLogLocId: "68ca0bae-2acd-44f2-b54c-836d6af92890",
-  appData: {
-    msgId: "74023eec-2cf9-422c-ab15-e65c6e08b213",
-    msgLogDateTime: "2023-09-10T17:07:35.262Z",
-    msgDateTime: "2023-01-16T04:51:29.595Z",
-    msgTag: [
-      "free",
-      "enterprise",
-      "rich"
-    ],
-    msgPayload: "Autus ducimus deinde thema. Succurro tui denuncio nostrum summisse aiunt statua. Cribro commemoro utique.\nUlterius apparatus copia argentum solium textor denego inventore thymbra aegre. Acsi cometes color perspiciatis. Pax caste derelinquo amicitia tui molestiae culpo cohaero.\nRepudiandae desipio tero decretum atrocitas. Trado aptus sunt utor arcus quos molestias. Tabella enim curto clibanus cavus usus villa.\nCondico viriliter reprehenderit unus curriculum. Numquam velut adsuesco adversus veritatis callide delibero umquam vulariter deporto. Inventore astrum cavus ambulo creptio.\nSuspendo demo carus fuga. Decerno dolores deficio accusator. Aestus quod dedico contigo magni."
-  }
-}
+/* Bidirectional streaming test case */
+// errorHandlingService.handleMessage(dataMessages, statusControl).subscribe((messages) => {
+//   messageToBeReleased.next(messages)
+// })
+// grpcService.createGrpcInstance(server1, messageToBeReleased, statusControl, { instanceType: 'client', serviceMethod: 'bidirectional' })
+// }
 
 setTimeout(() => {
-  // messageToBeReleased.next(testMessageRequest)
+  messageToBeReleased.next(parsedMessages[0])
 }, 1000)
-// setTimeout(() => {
-//   unaryRequestSubject.next(testMessageRequest)
-// }, 7000)
+setTimeout(() => {
+  messageToBeReleased.next(parsedMessages[1])
+}, 4000)
+setTimeout(() => {
+  unaryRequestSubject.next(parsedMessages[2])
+}, 7000)
 
 // this is just to publish an array of fake data as a Subject
 function stream(): Subject<any> {

+ 24 - 49
services/grpc.service.ts

@@ -213,12 +213,12 @@ export class GrpcService {
 
             this.checkConnectionHealth(client, statusControl, alreadyHealthCheck)
 
-            call.on('status', (status: Status) => {
+            call.on('status', (status: Status) => { // this is useless in streaming(on for unary)
                 // console.log(status) // For more info: https://grpc.github.io/grpc/core/md_doc_statuscodes.html
                 // https://grpc.io/docs/what-is-grpc/core-concepts/#streaming
-                if (status == grpc.status.UNAVAILABLE) { // only returns a status when there's error. Otherwise it just waits
-                    resolve('No connection established. Server is not responding..')
-                }
+                // if (status == grpc.status.UNAVAILABLE) { // only returns a status when there's error. Otherwise it just waits
+                //     resolve('No connection established. Server is not responding..')
+                // }
             });
 
             // All the grpc operations are here
@@ -238,11 +238,11 @@ export class GrpcService {
 
             call.on('data', (data: any) => {
                 let message = JSON.parse(data.message)
-                console.log(`Received acknowledgement from Server: ${message.msgId ?? message.appData?.msgId ?? `Invalid`}`);
+                console.log(`Received message from Server: ${message.msgId ?? message.appData?.msgId ?? `Invalid`}`);
             });
 
             call.on('error', (err) => {
-                console.log(`Something wrong with RPC call...`)
+                // console.log(`Something wrong with RPC call...`)
                 if (!unsubscribed && subscription) { // kill subcription to prevent memory leaks
                     subscription.unsubscribe();
                     unsubscribed = true;
@@ -266,20 +266,22 @@ export class GrpcService {
             try {
                 // https://github.com/grpc/proposal/blob/master/L5-node-client-interceptors.md
                 let server: grpc.Server = new grpc.Server();
-                let onHold: any
                 // Add the streamingData function to the gRPC service
                 // Define your message_proto.Message service methods
 
                 server.addService(message_proto.Message.service, {
                     HandleMessage: (call) => { // this is for bidirectional streaming. Need to have another one for unary calls for web clients
+                        let requestId = call.request // unary request from client to be responded with a stream
+                        console.log(`Processing unary call.... requestId: ${requestId.id}`)
                         console.log(`Client connected from: ${call.getPeer()}`);
-                        // let request = call.request // just putting it here to verify unary call request
-                        let report: ReportStatus = {
+
+                        let report: ReportStatus = { //let the flow come through 
                             code: ColorCode.GREEN,
                             message: `Client connected!!`
                         }
                         statusControl.next(report)
-                        let subscription: Subscription = messageToBeStream.subscribe({
+
+                        let subscription: Subscription = messageToBeStream.pipe(take(15)).subscribe({
                             next: (payload: any) => {
                                 let noConnection = call.cancelled // check connection for each and every message
                                 if (noConnection === true) { // that means there's no connection, beccause the cancel operation is determined to check 
@@ -291,10 +293,10 @@ export class GrpcService {
                                     statusControl.next(report)
                                     subscription.unsubscribe()
                                 } else {
-                                    console.log(`Sending ${payload.appData.msgId}`)
+                                    console.log(`Sending ${payload.appData.msgId} in respond to unary ${requestId.id}`)
                                     let message: string = JSON.stringify(payload)
                                     call.write({ message })
-                                    // onHold = null
+
                                 }
                             },
                             error: err => {
@@ -304,41 +306,14 @@ export class GrpcService {
                                     message: `Message streaming error`
                                 }
                                 statusControl.next(report)
+                                subscription.unsubscribe()
                             },
-                            complete: () => console.log(``)  //it will never complete
-                        })
-
-                        call.on('data', (data: any) => {
-                            // console.log(data) // it does return in string format
-                            let payload = JSON.parse(data.message)
-                            console.log(data)
-                            // console.log(`Received Message from Client: ${payload.appData?.msgId}`);
-                            // Forward the received message to the RxJS subject
-                            let respmsg: any = {
-                                msgId: payload.appData?.msgId,
-                                confirmationMessage: `Message ${payload.appData?.msgId} acknowledged!`
+                            complete: () => {
+                                console.log(`Stream response completed for ${requestId.id}`)
+                                subscription.unsubscribe()
+                                // call.end()
                             }
-                            let message: string = JSON.stringify(respmsg)
-                            console.log(`Responding to client: ${respmsg.msgId}`);
-                            // Note: The parameter here MUST BE STRICTLY be the same letter as defined in proto. Eg: message MessageRequest { string >>'message'<< = 1 }
-                            call.write({ message });
-                        });
-
-                        call.on('end', () => {
-                            console.log('Client stream ended');
-                            // but the stream never ends. THis is not a reliable way to tell if a client is disconnected
-                        });
-
-                        call.on('error', (err) => {
-                            // Error that may occue during the rpc call. Id there's an error, put a callbacn function there to check the connection for client
-                            // emit a yellow report to halt message release. If the server does not reply to the callback function, then emit a red card
-                            // the call back function will be to write and then the client should response immediately through test
-                        });
-
-                        call.on('close', () => {
-                            console.log('Unknown cause for diconnectivity');
-                            // Handle client closure, which may be due to errors or manual termination
-                        });
+                        })
                     },
 
                     Check: (_, callback) => {
@@ -371,11 +346,11 @@ export class GrpcService {
             unaryRequestSubject.subscribe({
                 next: (request: any) => {
                     let message = {
-                        id: '123',
+                        id: request.appData?.msgId,
                         message: JSON.stringify(request)
                     }
 
-                    console.log(`Sending request: ${message.id} over to server....`)
+                    console.log(`<${message.id}> Sending request: ${message.id} over to server....`)
                     const call = client.HandleMessage(message)
 
                     call.on('status', (status: Status) => {
@@ -395,8 +370,7 @@ export class GrpcService {
                     });
 
                     call.on('data', (data: any) => {
-                        let message = JSON.parse(data.message)
-                        console.log(`Received data from Server: ${message.appData?.msgId ?? `Invalid`}`);
+                        console.log(`Received stream response from Server. Receiver: ${message.id}`);
                     });
 
                     call.on('error', (err) => {
@@ -409,6 +383,7 @@ export class GrpcService {
                     });
 
                     call.on('end', () => { // this is for gracefull || willfull termination from the server
+                        console.log(`Streaming Response is completed`)
                         let report = {
                             code: ColorCode.YELLOW,
                             message: `Server doesn't seem to be alive. Error returned.`