瀏覽代碼

implement chunk slicing

Enzo 1 年之前
父節點
當前提交
7332597520
共有 3 個文件被更改,包括 27 次插入4 次删除
  1. 6 0
      services/utility.service.ts
  2. 2 2
      test/consumer_1.ts
  3. 19 2
      test/publisher.ts

+ 6 - 0
services/utility.service.ts

@@ -36,5 +36,11 @@ export class UtilityService {
         let maxHeapSize = heapStats.heap_size_limit / (1024 * 1024); // Convert to MB
         console.log(`Current maximum heap size: ${maxHeapSize.toFixed(2)} MB`);
     }
+
+    public checkCPU() {
+        let os = require('os')
+        let cpuUsage = os.cpus()
+        console.log(cpuUsage)
+    }
 }
 

+ 2 - 2
test/consumer_1.ts

@@ -34,7 +34,7 @@ function createWebsocketServer() {
 
             let toProceed = false;
 
-            if (element >= 2) {
+            if (element >= 50) {
                 toProceed = true
             } else {
                 toProceed = false
@@ -75,7 +75,7 @@ function connectWebSocket() {
 
     // Receive payload from publisher and push them into local Subject
     socket.on('payload', (data: any[]) => {
-        console.log(data.length)
+        console.log(`Message received from publisher: ${data.length}`)
         data.forEach(element => {
             payload.next(element)
             // trafficControl.next(util.checkHeapSize())

+ 19 - 2
test/publisher.ts

@@ -4,6 +4,8 @@ import { StorageLocation } from '../types/interface';
 import { Server } from 'socket.io'
 import { io } from 'socket.io-client';
 import { UtilityService } from '../services/utility.service';
+import { Worker } from "worker_threads"
+
 
 let msgData = new DataPrepService()
 let util = new UtilityService()
@@ -30,8 +32,23 @@ function createWebsocketServer() {
         const subscription = msgPayload.pipe(
             buffer(notifier)
         ).subscribe((element) => {
-            console.log(`Emitting ${element.length} messages`)
-            socket.emit(`payload`, element);
+            if (element.length >= 25000) {
+                const chunkSize = 1024; // Specify the desired chunk size in bytes
+                const totalChunks = Math.ceil(element.length / chunkSize);
+
+                Array.from({ length: totalChunks }, (_, i) => {
+                    const start = i * chunkSize;
+                    const end = start + chunkSize;
+                    const chunk = element.slice(start, end);
+
+                    console.log(`Emitting ${element.length} messages`)
+                    socket.emit('payload', chunk);
+                });
+                socket.emit(`end`)
+            } else {
+                console.log(`Emitting ${element.length} messages`)
+                socket.emit(`payload`, element);
+            }
 
         });