123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217 |
- import { BehaviorSubject, catchError, Observable, of, Subject, Subscription, tap, timeout } from "rxjs"
- import { RetransmissionService } from "../../services/retransmission.service"
- import { BaseMessage } from "../../dependencies/logging/services/logging-service"
- import { v4 as uuidV4 } from 'uuid';
- const express = require('express');
- const http = require('http');
- // const { Server } = require('socket.io');
- import { Server } from 'socket.io'
/**
 * Demonstration-only Socket.IO server; the actual NestJS socket implementation may differ.
 *
 * Clients identify themselves with a `notification` event (`agenda: 'newClient'` or
 * `agenda: 'existingClient'`), send work with `request` events and receive results on
 * `response` events. Every known client keeps a {@link ClientInfo} record whose
 * `clientConnectionState` is flipped to `'OFFLINE'` on disconnect and back to `'ONLINE'`
 * when the same `clientName` reconnects; that state and the per-client response stream are
 * handed to the client's `RetransmissionService`.
 */
export class SocketService {
    /** Every client that has registered since start-up; records are kept after disconnect. */
    private connectedClients: ClientInfo[] = []
    /** Human-readable server events, logged to the console by the constructor. */
    private announcements: Subject<string> = new Subject()
    private app = express();
    private server = http.createServer(this.app);
    private io = new Server(this.server);
    /** Stream of responses produced by the application, correlated by `header.messageID`. */
    private responseFromApp: Subject<BaseMessage | any>
    /** Requests received from any socket, exposed through {@link getIncomingRequest}. */
    private incomingRequest: Subject<BaseMessage> = new Subject()

    /**
     * @param response Subject on which the application publishes its responses. A response
     *                 belongs to a request when both carry the same `header.messageID`.
     */
    constructor(response: Subject<BaseMessage>) {
        this.responseFromApp = response
        this.announcements.subscribe(announcement => {
            console.log(`Server Announcement: ${announcement}`)
        })
    }

    /** @returns A read-only stream of every request received from a registered client. */
    public getIncomingRequest(): Observable<BaseMessage> {
        return this.incomingRequest.asObservable()
    }

    /**
     * Registers the Socket.IO handlers and starts the HTTP server on `process.env.PORT`
     * (falling back to 3000).
     *
     * @returns A promise that resolves once the handlers are registered and `listen` has
     *          been called; it does not wait for the server to be listening.
     */
    public async setUpConnection(): Promise<void> {
        this.io.on('connection', (socket) => {
            this.announcements.next('a client is connected:' + socket.id);

            // Identity bound to this socket. Stays null until the client sends a
            // 'notification' with a recognised agenda.
            let clientInfo: ClientInfo | null = null

            // Pushes one buffered/produced response to this particular socket.
            const deliver = (output: unknown): void => {
                socket.emit('response', output)
            }

            socket.on('connect', (msg) => {
                // this is reserved....
            });

            socket.on('notification', (msg) => {
                console.log(msg)
                // `msg` comes from the network, so never assume its shape.
                const agenda = msg?.agenda

                if (agenda === 'newClient') {
                    const created = this.registerNewClient(socket.id)
                    clientInfo = created
                    // Send data over for client to persist.
                    // NOTE(review): this serialises the whole record, including the RxJS
                    // subjects and the buffer. The client presumably only needs
                    // id/clientName/connectedAt — confirm before trimming the payload.
                    socket.emit('notification', {
                        notification: 'Your credentials',
                        createdAt: new Date(),
                        socketInfo: created
                    })
                    // These are the responses that are supposed to be pushed to this client.
                    this.forwardWhileOnline(created, deliver)
                }

                if (agenda === 'existingClient') {
                    const requestedName = msg?.data?.clientName
                    const known = this.connectedClients.find(client => client.clientName === requestedName)
                    if (known) {
                        clientInfo = known
                        console.log('Existing client found')
                        // The client kept its name but arrived on a new socket.
                        known.id = socket.id
                        // Send data over for client to persist.
                        socket.emit('notification', {
                            notification: 'Your updated credentials',
                            connectedAt: new Date(),
                            updatedId: socket.id
                        })
                        // Subscribe first, then flip the state: the connection state held
                        // by the retransmission buffer must change to 'ONLINE' for the
                        // buffered values to be released.
                        this.releaseBufferedItems(known, deliver)
                        known.clientConnectionState.next('ONLINE')
                    } else {
                        console.log(this.connectedClients)
                        console.log(`Existing Client is not found`)
                    }
                }
            })

            // Listen for messages from the client.
            socket.on('request', (request: BaseMessage) => {
                // Capture the identity now so that responses go to the client that issued
                // the request even if this socket re-registers later.
                const client = clientInfo
                if (!client) {
                    console.log(`Client is still not defined. Please have this client set up the credentials`)
                    socket.emit('notification', {
                        notification: 'Failed Request',
                        data: request,
                        message: 'Client Credentials is not properply set up! Cannot process requests at the moment.'
                    })
                    return
                }

                // Without a message id no response can be correlated; reject instead of
                // throwing inside the event handler.
                const messageID = request?.header?.messageID
                if (messageID == null) {
                    socket.emit('notification', {
                        notification: 'Failed Request',
                        data: request,
                        message: 'Request is missing header.messageID; cannot correlate a response.'
                    })
                    return
                }

                this.announcements.next(`Received Message: ${messageID} from ${client.clientName}`);

                // Subscribe to the response stream BEFORE publishing the request, so a
                // response produced synchronously by a request listener is not lost.
                this.processRequest(request).subscribe({
                    next: message => {
                        client.responseObs.next(message)
                    },
                    error: err => console.error(err),
                    complete: () => { }
                })
                this.incomingRequest.next(request)
            });

            // Handle disconnection.
            socket.on('disconnect', () => {
                if (clientInfo) {
                    clientInfo.clientConnectionState.next('OFFLINE') // signal to start buffering
                    this.announcements.next(`Client ${clientInfo.id} disconnected`);
                    // The record is intentionally kept so that the client can reconnect
                    // under the same clientName.
                    // this.deleteClientById(socket.id)
                }
            });
        });

        this.io.engine.on("connection_error", (err) => {
            console.log(err.req); // the request object
            console.log(err.code); // the error code, for example 1
            console.log(err.message); // the error message, for example "Session ID unknown"
            console.log(err.context); // some additional error context
        });

        // Start the server.
        const PORT = process.env.PORT || 3000;
        this.server.listen(PORT, () => {
            console.log(`Server listening on port ${PORT}`);
        });
    }

    // Utils

    /**
     * Creates the record for a brand-new client and stores it in `connectedClients`.
     *
     * @param socketId Id of the socket the client is currently connected with.
     * @returns The stored record; `clientName` is a freshly generated UUID v4.
     */
    private registerNewClient(socketId: string): ClientInfo {
        const created: ClientInfo = {
            id: socketId,
            clientName: uuidV4(),
            connectedAt: new Date(),
            clientConnectionState: new BehaviorSubject<'ONLINE' | 'OFFLINE'>('ONLINE'),
            requests: [],
            buffer: new RetransmissionService(),
            responseObs: new Subject<any>()
        }
        this.connectedClients.push(created);
        return created
    }

    /**
     * Wires the client's response stream and connection state into its retransmission
     * buffer and forwards the buffer's output while the client is `'ONLINE'`. The first
     * output that arrives while the client is `'OFFLINE'` is dropped and ends the forwarding.
     *
     * @param client  Record of the client whose responses are forwarded.
     * @param deliver Sends one output to the client's socket.
     */
    private forwardWhileOnline(client: ClientInfo, deliver: (output: unknown) => void): void {
        client.buffer.retransmission(client.responseObs, client.clientConnectionState)

        // `subscription` is only assigned once subscribe() returns. The flag covers a
        // source that emits synchronously during subscribe().
        let stopRequested = false
        let subscription: Subscription | undefined
        subscription = client.buffer.returnBufferedMessages().subscribe(output => {
            if (client.clientConnectionState.getValue() === 'ONLINE') {
                deliver(output)
            } else {
                stopRequested = true
                subscription?.unsubscribe()
            }
        })
        if (stopRequested) {
            subscription.unsubscribe()
        }
    }

    /**
     * Forwards the buffer's output to a reconnected client. If nothing is emitted for 10
     * seconds the stream is ended; it is re-established only while the client is still
     * `'ONLINE'`.
     *
     * @param client  Record of the reconnected client.
     * @param deliver Sends one output to the client's current socket.
     */
    private releaseBufferedItems(client: ClientInfo, deliver: (output: unknown) => void): void {
        let subscription: Subscription | undefined
        subscription = client.buffer.returnBufferedMessages().pipe(
            tap(message => {
                // A message surfacing while the client is offline is fed back into the
                // response stream (presumably so that the buffer keeps it — confirm
                // against RetransmissionService).
                if (client.clientConnectionState.getValue() === 'OFFLINE') {
                    client.responseObs.next(message)
                }
            }),
            timeout(10000), // Error out if no value is emitted within 10 seconds
            catchError(err => {
                if (err?.name === 'TimeoutError') {
                    console.log('TimeoutError: No value emitted within 10 seconds.');
                    if (client.clientConnectionState.getValue() === 'ONLINE') {
                        this.releaseBufferedItems(client, deliver); // Re-arm while still online
                    } else {
                        subscription?.unsubscribe()
                    }
                }
                // Complete this stream without emitting anything.
                return of();
            })
        ).subscribe({
            next: output => {
                deliver(output)
            },
            error: err => console.error(err),
            complete: () => { }
        })
    }

    /**
     * Removes the client whose current socket id equals `id` from `connectedClients`
     * (mutating the array). Does nothing when no such client exists.
     *
     * @param id Socket id to look for.
     */
    private deleteClientById(id: string): void {
        const index = this.connectedClients.findIndex(item => item.id === id);
        if (index !== -1) {
            this.connectedClients.splice(index, 1);
        }
    }

    /**
     * Builds the response stream of a single request.
     *
     * @param request The request whose `header.messageID` identifies matching responses.
     * @returns An observable that emits every application response carrying the same
     *          message id and completes after the response flagged `complete`. Errors and
     *          completion of the application stream are forwarded. The subscription to
     *          the application stream is released on completion or unsubscribe.
     */
    private processRequest(request: BaseMessage): Observable<any> {
        return new Observable((observer) => {
            const inner = this.responseFromApp.subscribe({
                next: message => {
                    if (message?.header?.messageID !== request.header.messageID) {
                        return
                    }
                    observer.next(message)
                    if (message.complete) {
                        observer.complete()
                    }
                },
                error: err => observer.error(err),
                complete: () => observer.complete()
            })
            // Teardown: without it every request would leave a subscriber attached to
            // `responseFromApp` forever.
            return () => inner.unsubscribe()
        })
    }
}
/**
 * Server-side record describing one client of the socket server.
 */
export interface ClientInfo {
    /** Id of the socket the client is connected with; overwritten with the new socket id when the client reconnects. */
    id: string;
    /** Stable identity of the client (a UUID v4 assigned at registration), used to find the record on reconnect. */
    clientName: string;
    /** Moment the record was created. */
    connectedAt: Date;
    /** Current connection state; set to 'OFFLINE' on disconnect and to 'ONLINE' on registration and reconnect. */
    clientConnectionState: BehaviorSubject<'ONLINE' | 'OFFLINE'>;
    /** Requests tracked for this client together with their completion flag; initialised empty. */
    requests: Array<{ message: any; completed: boolean }>;
    /** Retransmission buffer that receives `responseObs` and `clientConnectionState`. */
    buffer: RetransmissionService;
    /** Responses destined for this client. */
    responseObs: Subject<any>;
}
|