From ba7ff685b947e601048fe59f37d9516b97b2b995 Mon Sep 17 00:00:00 2001 From: Miroslav Pejic Date: Mon, 11 Mar 2024 21:44:12 +0100 Subject: [PATCH] [mirotalksfu] - refactoring --- app/src/Peer.js | 18 +-- app/src/Room.js | 273 +++++++++++++++++++-------------------- app/src/Server.js | 16 +-- package.json | 6 +- public/js/Room.js | 2 +- public/js/RoomClient.js | 277 +++++++++++++++++++--------------------- 6 files changed, 284 insertions(+), 308 deletions(-) diff --git a/app/src/Peer.js b/app/src/Peer.js index 7f0069bd..78216d36 100644 --- a/app/src/Peer.js +++ b/app/src/Peer.js @@ -98,12 +98,12 @@ module.exports = class Peer { async createProducer(producerTransportId, producer_rtpParameters, producer_kind, producer_type) { try { if (!producerTransportId) { - throw new Error('Invalid producer transport ID'); + return 'Invalid producer transport ID'; } const producerTransport = this.transports.get(producerTransportId); if (!producerTransport) { - throw new Error(`Producer transport with ID ${producerTransportId} not found`); + return `Producer transport with ID ${producerTransportId} not found`; } const producer = await producerTransport.produce({ @@ -112,7 +112,7 @@ module.exports = class Peer { }); if (!producer) { - throw new Error(`Producer type: ${producer_type} kind: ${producer_kind} not found`); + return `Producer type: ${producer_type} kind: ${producer_kind} not found`; } const { id, appData, type, kind, rtpParameters } = producer; @@ -145,7 +145,7 @@ module.exports = class Peer { return producer; } catch (error) { log.error('Error creating producer', error.message); - return null; + return error.message; } } @@ -153,8 +153,8 @@ module.exports = class Peer { if (!this.producers.has(producer_id)) return; try { this.producers.get(producer_id).close(); - } catch (ex) { - log.warn('Close Producer', ex); + } catch (error) { + log.warn('Close Producer', error.message); } this.producers.delete(producer_id); } @@ -168,7 +168,7 @@ module.exports = class Peer { const consumerTransport = this.transports.get(consumer_transport_id); if (!consumerTransport) { - throw new Error(`Consumer transport with id ${consumer_transport_id} not found`); + return `Consumer transport with id ${consumer_transport_id} not found`; } const consumer = await consumerTransport.consume({ @@ -179,7 +179,7 @@ module.exports = class Peer { }); if (!consumer) { - throw new Error(`Consumer for producer ID ${producer_id} not found`); + return `Consumer for producer ID ${producer_id} not found`; } const { id, type, kind, rtpParameters, producerPaused } = consumer; @@ -224,7 +224,7 @@ module.exports = class Peer { }; } catch (error) { log.error('Error creating consumer', error.message); - return null; + return error.message; } } diff --git a/app/src/Room.js b/app/src/Room.js index d5406e02..594a5493 100644 --- a/app/src/Room.js +++ b/app/src/Room.js @@ -216,22 +216,6 @@ module.exports = class Room { return producerList; } - async connectPeerTransport(socket_id, transport_id, dtlsParameters) { - try { - if (!socket_id || !transport_id || !dtlsParameters) { - throw new Error('Invalid input parameters'); - } - - if (!this.peers.has(socket_id)) { - throw new Error(`Peer with socket ID ${socket_id} not found`); - } - - await this.peers.get(socket_id).connectTransport(transport_id, dtlsParameters); - } catch (error) { - log.error('Error connecting peer transport', error.message); - } - } - async removePeer(socket_id) { if (!this.peers.has(socket_id)) return; this.peers.get(socket_id).close(); @@ -243,92 +227,103 @@ module.exports = class Room { // #################################################### async createWebRtcTransport(socket_id) { - try { - if (!socket_id || !this.peers.has(socket_id)) { - throw new Error(`Invalid socket ID: ${socket_id}`); + if (!this.peers.has(socket_id)) { + return this.printError(`[Room|createWebRtcTransport] Invalid socket ID: ${socket_id}`); + } + + const { maxIncomingBitrate, initialAvailableOutgoingBitrate, listenInfos } = this.webRtcTransport; + + const webRtcTransportOptions = { + ...(this.webRtcServerActive ? { webRtcServer: this.webRtcServer } : { listenInfos }), + enableUdp: true, + enableTcp: true, + preferUdp: true, + iceConsentTimeout: 20, + initialAvailableOutgoingBitrate, + }; + + const transport = await this.router.createWebRtcTransport(webRtcTransportOptions); + + if (!transport) { + return this.printError('[Room|createWebRtcTransport] Failed to create WebRTC transport'); + } + + const { id, iceParameters, iceCandidates, dtlsParameters } = transport; + + if (maxIncomingBitrate) { + try { + await transport.setMaxIncomingBitrate(maxIncomingBitrate); + } catch (error) { + log.debug('Transport setMaxIncomingBitrate error', error.message); } + } - const { maxIncomingBitrate, initialAvailableOutgoingBitrate, listenInfos } = this.webRtcTransport; + const peer = this.peers.get(socket_id); - const webRtcTransportOptions = { - ...(this.webRtcServerActive ? { webRtcServer: this.webRtcServer } : { listenInfos }), - enableUdp: true, - enableTcp: true, - preferUdp: true, - iceConsentTimeout: 20, - initialAvailableOutgoingBitrate, - }; + const { peer_name } = peer; - const transport = await this.router.createWebRtcTransport(webRtcTransportOptions); - - if (!transport) { - throw new Error('Failed to create WebRTC transport'); - } - - const { id, iceParameters, iceCandidates, dtlsParameters } = transport; - - if (maxIncomingBitrate) { - try { - await transport.setMaxIncomingBitrate(maxIncomingBitrate); - } catch (error) { - log.debug('Transport setMaxIncomingBitrate error', error.message); - } - } - - const peer = this.peers.get(socket_id); - - const { peer_name } = peer; - - transport.on('icestatechange', (iceState) => { - if (iceState === 'disconnected' || iceState === 'closed') { - log.debug('Transport closed "icestatechange" event', { - peer_name: peer_name, - iceState: iceState, - }); - transport.close(); - //this.router.close(); - //peer.close(); - } - }); - - transport.on('sctpstatechange', (sctpState) => { - log.debug('Transport "sctpstatechange" event', { + transport.on('icestatechange', (iceState) => { + if (iceState === 'disconnected' || iceState === 'closed') { + log.debug('Transport closed "icestatechange" event', { peer_name: peer_name, - sctpState: sctpState, + iceState: iceState, }); + transport.close(); + //this.router.close(); + //peer.close(); + } + }); + + transport.on('sctpstatechange', (sctpState) => { + log.debug('Transport "sctpstatechange" event', { + peer_name: peer_name, + sctpState: sctpState, }); + }); - transport.on('dtlsstatechange', (dtlsState) => { - if (dtlsState === 'failed' || dtlsState === 'closed') { - log.debug('Transport closed "dtlsstatechange" event', { - peer_name: peer_name, - dtlsState: dtlsState, - }); - transport.close(); - //this.router.close(); - //peer.close(); - } - }); + transport.on('dtlsstatechange', (dtlsState) => { + if (dtlsState === 'failed' || dtlsState === 'closed') { + log.debug('Transport closed "dtlsstatechange" event', { + peer_name: peer_name, + dtlsState: dtlsState, + }); + transport.close(); + //this.router.close(); + //peer.close(); + } + }); - transport.on('close', () => { - log.debug('Transport closed', { peer_name: peer_name }); - }); + transport.on('close', () => { + log.debug('Transport closed', { peer_name: peer_name }); + }); - log.debug('Adding transport', { transportId: id }); + log.debug('Adding transport', { transportId: id }); - peer.addTransport(transport); + peer.addTransport(transport); - return { - params: { - id: id, - iceParameters: iceParameters, - iceCandidates: iceCandidates, - dtlsParameters: dtlsParameters, - }, - }; + return { + id: id, + iceParameters: iceParameters, + iceCandidates: iceCandidates, + dtlsParameters: dtlsParameters, + }; + } + + async connectPeerTransport(socket_id, transport_id, dtlsParameters) { + try { + if (!socket_id || !transport_id || !dtlsParameters) { + return this.printError('[Room|connectPeerTransport] Invalid input parameters'); + } + + if (!this.peers.has(socket_id)) { + return this.printError(`[Room|connectPeerTransport] Peer with socket ID ${socket_id} not found`); + } + + await this.peers.get(socket_id).connectTransport(transport_id, dtlsParameters); + return '[Room|connectPeerTransport] done'; } catch (error) { - log.error('Error creating WebRTC transport', error.message); - return null; + log.error('Error connecting peer transport', error.message); + return this.printError(`[Room|connectPeerTransport] error: ${error.message}`); } } @@ -337,41 +332,37 @@ module.exports = class Room { // #################################################### async produce(socket_id, producerTransportId, rtpParameters, kind, type) { - try { - if (!socket_id || !producerTransportId || !rtpParameters || !kind || !type) { - throw new Error('Invalid input parameters'); - } - - if (!this.peers.has(socket_id)) { - throw new Error(`Invalid socket ID: ${socket_id}`); - } - - const peer = this.peers.get(socket_id); - - const producer = await peer.createProducer(producerTransportId, rtpParameters, kind, type); - if (!producer) { - throw new Error('Failed to create producer'); - } - - const { id } = producer; - - const { peer_name, peer_info } = peer; - - this.broadCast(socket_id, 'newProducers', [ - { - producer_id: id, - producer_socket_id: socket_id, - peer_name: peer_name, - peer_info: peer_info, - type: type, - }, - ]); - - return id; - } catch (error) { - console.error('Error producing', error.message); - throw error.message; + // + if (!socket_id || !producerTransportId || !rtpParameters || !kind || !type) { + return this.printError('[Room|produce] Invalid input parameters'); } + + if (!this.peers.has(socket_id)) { + return this.printError(`[Room|produce] Invalid socket ID: ${socket_id}`); + } + + const peer = this.peers.get(socket_id); + + const peerProducer = await peer.createProducer(producerTransportId, rtpParameters, kind, type); + if (!peerProducer || !peerProducer.id) { + return this.printError(`[Room|produce] peerProducer error: '${peerProducer}'`); + } + + const { id } = peerProducer; + + const { peer_name, peer_info } = peer; + + this.broadCast(socket_id, 'newProducers', [ + { + producer_id: id, + producer_socket_id: socket_id, + peer_name: peer_name, + peer_info: peer_info, + type: type, + }, + ]); + + return id; } // #################################################### @@ -381,7 +372,7 @@ module.exports = class Room { async consume(socket_id, consumer_transport_id, producer_id, rtpCapabilities) { try { if (!socket_id || !consumer_transport_id || !producer_id || !rtpCapabilities) { - throw new Error('Invalid input parameters'); + return this.printError('[Room|consume] Invalid input parameters'); } if (!this.router.canConsume({ producerId: producer_id, rtpCapabilities })) { @@ -390,26 +381,26 @@ module.exports = class Room { consumer_transport_id, producer_id, }); - return; + return this.printError(`[Room|consume] Room router cannot consume producer_id: '${producer_id}'`); } if (!this.peers.has(socket_id)) { log.warn('Peer not found for socket ID', socket_id); - return; + return this.printError('[Room|consume] Peer not found'); } const peer = this.peers.get(socket_id); const { peer_name } = peer; - const result = await peer.createConsumer(consumer_transport_id, producer_id, rtpCapabilities); + const peerConsumer = await peer.createConsumer(consumer_transport_id, producer_id, rtpCapabilities); - if (!result || !result.consumer || !result.params) { - log.debug('Consumer or params are not defined in createConsumer result'); - return; + if (!peerConsumer || !peerConsumer.consumer || !peerConsumer.params) { + log.debug('peerConsumer or params are not defined'); + return this.printError(`[Room|consume] peerConsumer error: '${peerConsumer}'`); } - const { consumer, params } = result; + const { consumer, params } = peerConsumer; const { id, kind } = consumer; @@ -430,19 +421,13 @@ module.exports = class Room { return params; } catch (error) { log.error('Error occurred during consumption', error.message); - return; + return this.printError(`[Room|consume] ${error.message}`); } } closeProducer(socket_id, producer_id) { - try { - if (!socket_id || !producer_id || !this.peers.has(socket_id)) { - throw new Error('Invalid socket ID or producer ID'); - } - this.peers.get(socket_id).closeProducer(producer_id); - } catch (error) { - log.error('Error closing producer', error.message); - } + if (!socket_id || !producer_id || !this.peers.has(socket_id)) return; + this.peers.get(socket_id).closeProducer(producer_id); } // #################################################### @@ -499,6 +484,14 @@ module.exports = class Room { this._hostOnlyRecording = status; } + // #################################################### + // ERRORS + // #################################################### + + printError(message) { + return { error: message }; + } + // #################################################### // SENDER // #################################################### diff --git a/app/src/Server.js b/app/src/Server.js index 64f743cb..abb1c6e3 100644 --- a/app/src/Server.js +++ b/app/src/Server.js @@ -41,7 +41,7 @@ dependencies: { * @license For commercial or closed source, contact us at license.mirotalk@gmail.com or purchase directly via CodeCanyon * @license CodeCanyon: https://codecanyon.net/item/mirotalk-sfu-webrtc-realtime-video-conferences/40769970 * @author Miroslav Pejic - miroslav.pejic.85@gmail.com - * @version 1.3.92 + * @version 1.3.93 * */ @@ -1001,8 +1001,8 @@ function startServer() { log.debug('Get RouterRtpCapabilities', getPeerName(room)); try { - const routerRtpCapabilities = room.getRtpCapabilities(); - callback(routerRtpCapabilities); + const getRouterRtpCapabilities = room.getRtpCapabilities(); + callback(getRouterRtpCapabilities); } catch (err) { log.error('Get RouterRtpCapabilities error', err.message); callback({ @@ -1020,8 +1020,8 @@ function startServer() { log.debug('Create webrtc transport', getPeerName(room)); try { - const { params } = await room.createWebRtcTransport(socket.id); - callback(params); + const createWebRtcTransport = await room.createWebRtcTransport(socket.id); + callback(createWebRtcTransport); } catch (err) { log.error('Create WebRtc Transport error', err.message); callback({ @@ -1042,8 +1042,8 @@ function startServer() { log.debug('Connect transport', { peer_name: peer_name, transport_id: transport_id }); try { - await room.connectPeerTransport(socket.id, transport_id, dtlsParameters); - callback('success'); + const connectTransport = await room.connectPeerTransport(socket.id, transport_id, dtlsParameters); + callback(connectTransport); } catch (err) { log.error('Connect transport error', err.message); callback({ @@ -1052,7 +1052,7 @@ function startServer() { } }); - socket.on('produce', async ({ producerTransportId, kind, appData, rtpParameters }, callback) => { + socket.on('produce', async ({ producerTransportId, kind, appData, rtpParameters }, callback, errback) => { if (!roomList.has(socket.room_id)) { return callback({ error: 'Room not found' }); } diff --git a/package.json b/package.json index 1d80c869..c2b4fa92 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "mirotalksfu", - "version": "1.3.92", + "version": "1.3.93", "description": "WebRTC SFU browser-based video calls", "main": "Server.js", "scripts": { @@ -38,8 +38,8 @@ "author": "Miroslav Pejic", "license": "AGPL-3.0", "dependencies": { - "@sentry/integrations": "7.106.0", - "@sentry/node": "7.106.0", + "@sentry/integrations": "7.106.1", + "@sentry/node": "7.106.1", "axios": "^1.6.7", "body-parser": "1.20.2", "colors": "1.4.0", diff --git a/public/js/Room.js b/public/js/Room.js index 8080ef11..f0b313e3 100644 --- a/public/js/Room.js +++ b/public/js/Room.js @@ -11,7 +11,7 @@ if (location.href.substr(0, 5) !== 'https') location.href = 'https' + location.h * @license For commercial or closed source, contact us at license.mirotalk@gmail.com or purchase directly via CodeCanyon * @license CodeCanyon: https://codecanyon.net/item/mirotalk-sfu-webrtc-realtime-video-conferences/40769970 * @author Miroslav Pejic - miroslav.pejic.85@gmail.com - * @version 1.3.92 + * @version 1.3.93 * */ diff --git a/public/js/RoomClient.js b/public/js/RoomClient.js index 81bc235e..cdd87541 100644 --- a/public/js/RoomClient.js +++ b/public/js/RoomClient.js @@ -9,7 +9,7 @@ * @license For commercial or closed source, contact us at license.mirotalk@gmail.com or purchase directly via CodeCanyon * @license CodeCanyon: https://codecanyon.net/item/mirotalk-sfu-webrtc-realtime-video-conferences/40769970 * @author Miroslav Pejic - miroslav.pejic.85@gmail.com - * @version 1.3.92 + * @version 1.3.93 * */ @@ -387,8 +387,8 @@ class RoomClient { } await this.joinAllowed(room); }) - .catch((err) => { - console.log('Join error:', err); + .catch((error) => { + console.error('Join error:', error); }); } @@ -533,129 +533,121 @@ class RoomClient { // #################################################### async initTransports(device) { - try { - // #################################################### - // PRODUCER TRANSPORT - // #################################################### + // #################################################### + // PRODUCER TRANSPORT + // #################################################### - const producerTransportData = await this.socket.request('createWebRtcTransport', { - forceTcp: false, - rtpCapabilities: device.rtpCapabilities, - }); + const producerTransportData = await this.socket.request('createWebRtcTransport', { + forceTcp: false, + rtpCapabilities: device.rtpCapabilities, + }); - if (producerTransportData.error) { - throw new Error('Error creating WebRTC producer transport: ' + producerTransportData.error); + this.producerTransport = device.createSendTransport(producerTransportData); + + this.producerTransport.on('connect', async ({ dtlsParameters }, callback, errback) => { + try { + await this.socket.request('connectTransport', { + dtlsParameters, + transport_id: producerTransportData.id, + }); + callback(); + } catch (err) { + errback(err); } + }); - this.producerTransport = device.createSendTransport(producerTransportData); - - this.producerTransport.on('connect', async ({ dtlsParameters }, callback, errback) => { - try { - await this.socket.request('connectTransport', { - dtlsParameters, - transport_id: producerTransportData.id, - }); - callback(); - } catch (err) { - errback(err); - } - }); - - this.producerTransport.on('produce', async ({ kind, appData, rtpParameters }, callback, errback) => { - console.log('Going to produce', { kind, appData, rtpParameters }); - try { - const { producer_id } = await this.socket.request('produce', { - producerTransportId: this.producerTransport.id, - kind, - appData, - rtpParameters, - }); + this.producerTransport.on('produce', async ({ kind, appData, rtpParameters }, callback, errback) => { + console.log('Going to produce', { kind, appData, rtpParameters }); + try { + const { producer_id } = await this.socket.request('produce', { + producerTransportId: this.producerTransport.id, + kind, + appData, + rtpParameters, + }); + if (producer_id.error) { + errback(producer_id.error); + } else { callback({ id: producer_id }); - } catch (err) { - errback(err); } - }); - - this.producerTransport.on('connectionstatechange', (state) => { - switch (state) { - case 'connecting': - console.log('Producer Transport connecting...'); - break; - case 'connected': - console.log('Producer Transport connected'); - break; - case 'failed': - console.warn('Producer Transport failed'); - this.producerTransport.close(); - // this.exit(true); - // this.refreshBrowser(); - break; - default: - break; - } - }); - - this.producerTransport.on('icegatheringstatechange', (state) => { - console.log('Producer icegatheringstatechange', state); - }); - - // #################################################### - // CONSUMER TRANSPORT - // #################################################### - - const consumerTransportData = await this.socket.request('createWebRtcTransport', { - forceTcp: false, - }); - - if (consumerTransportData.error) { - throw new Error('Error creating WebRTC consumer transport: ' + consumerTransportData.error); + } catch (err) { + errback(err); } + }); - this.consumerTransport = device.createRecvTransport(consumerTransportData); + this.producerTransport.on('connectionstatechange', (state) => { + switch (state) { + case 'connecting': + console.log('Producer Transport connecting...'); + break; + case 'connected': + console.log('Producer Transport connected'); + break; + case 'failed': + console.warn('Producer Transport failed'); + this.producerTransport.close(); + // this.exit(true); + // this.refreshBrowser(); + break; + default: + break; + } + }); - this.consumerTransport.on('connect', async ({ dtlsParameters }, callback, errback) => { - try { - await this.socket.request('connectTransport', { - transport_id: this.consumerTransport.id, - dtlsParameters, - }); - callback(); - } catch (err) { - errback(err); - } - }); + this.producerTransport.on('icegatheringstatechange', (state) => { + console.log('Producer icegatheringstatechange', state); + }); - this.consumerTransport.on('connectionstatechange', (state) => { - switch (state) { - case 'connecting': - console.log('Consumer Transport connecting...'); - break; - case 'connected': - console.log('Consumer Transport connected'); - break; - case 'failed': - console.warn('Consumer Transport failed'); - this.consumerTransport.close(); - // this.exit(true); - // this.refreshBrowser(); - break; - default: - break; - } - }); + // #################################################### + // CONSUMER TRANSPORT + // #################################################### - this.consumerTransport.on('icegatheringstatechange', (state) => { - console.log('Consumer icegatheringstatechange', state); - }); + const consumerTransportData = await this.socket.request('createWebRtcTransport', { + forceTcp: false, + }); - // #################################################### - // TODO DATACHANNEL TRANSPORT - // #################################################### + this.consumerTransport = device.createRecvTransport(consumerTransportData); - // - } catch (error) { - console.error('Error initializing transports', error); - } + this.consumerTransport.on('connect', async ({ dtlsParameters }, callback, errback) => { + try { + await this.socket.request('connectTransport', { + transport_id: this.consumerTransport.id, + dtlsParameters, + }); + callback(); + } catch (err) { + errback(err); + } + }); + + this.consumerTransport.on('connectionstatechange', (state) => { + switch (state) { + case 'connecting': + console.log('Consumer Transport connecting...'); + break; + case 'connected': + console.log('Consumer Transport connected'); + break; + case 'failed': + console.warn('Consumer Transport failed'); + this.consumerTransport.close(); + // this.exit(true); + // this.refreshBrowser(); + break; + default: + break; + } + }); + + this.consumerTransport.on('icegatheringstatechange', (state) => { + console.log('Consumer icegatheringstatechange', state); + }); + + // #################################################### + // TODO DATACHANNEL TRANSPORT + // #################################################### + + // } // #################################################### @@ -1895,44 +1887,35 @@ class RoomClient { } async getConsumeStream(producerId, peer_id, type) { - try { - const { rtpCapabilities } = this.device; - const data = await this.socket.request('consume', { - rtpCapabilities, - consumerTransportId: this.consumerTransport.id, - producerId, - }); + const { rtpCapabilities } = this.device; + const data = await this.socket.request('consume', { + rtpCapabilities, + consumerTransportId: this.consumerTransport.id, + producerId, + }); - console.log('DATA', data); + console.log('DATA', data); - if (!data || !data.id || !data.kind || !data.rtpParameters) { - throw new Error('Invalid data received from server'); - } + const { id, kind, rtpParameters } = data; + const codecOptions = {}; + const streamId = peer_id + (type == mediaType.screen ? '-screen-sharing' : '-mic-webcam'); + const consumer = await this.consumerTransport.consume({ + id, + producerId, + kind, + rtpParameters, + codecOptions, + streamId, + }); - const { id, kind, rtpParameters } = data; - const codecOptions = {}; - const streamId = peer_id + (type == mediaType.screen ? '-screen-sharing' : '-mic-webcam'); - const consumer = await this.consumerTransport.consume({ - id, - producerId, - kind, - rtpParameters, - codecOptions, - streamId, - }); + const stream = new MediaStream(); + stream.addTrack(consumer.track); - const stream = new MediaStream(); - stream.addTrack(consumer.track); - - return { - consumer, - stream, - kind, - }; - } catch (error) { - console.error('Error in getConsumeStream', error); - throw error; - } + return { + consumer, + stream, + kind, + }; } handleConsumer(id, type, stream, peer_name, peer_info) {