[mirotalksfu] - refactoring

هذا الالتزام موجود في:
Miroslav Pejic
2024-04-27 18:39:14 +02:00
الأصل dbfe4349e5
التزام 37d6aa5501
6 ملفات معدلة مع 192 إضافات و260 حذوفات

عرض الملف

@@ -106,8 +106,8 @@ module.exports = class Peer {
close() {
this.transports.forEach((transport, transport_id) => {
log.debug('Close and delete peer transports', {
transport_id: transport_id,
//transportInternal: transport.internal,
//transport_id: transport_id,
transportInternal: transport.internal,
});
transport.close();
this.delTransport(transport_id);
@@ -131,55 +131,40 @@ module.exports = class Peer {
}
async createProducer(producerTransportId, producer_rtpParameters, producer_kind, producer_type) {
try {
if (!producerTransportId) {
return 'Invalid producer transport ID';
}
if (!this.transports.has(producerTransportId)) return;
const producerTransport = this.transports.get(producerTransportId);
const producerTransport = this.transports.get(producerTransportId);
if (!producerTransport) {
return `Producer transport with ID ${producerTransportId} not found`;
}
const producer = await producerTransport.produce({
kind: producer_kind,
rtpParameters: producer_rtpParameters,
});
const producer = await producerTransport.produce({
kind: producer_kind,
rtpParameters: producer_rtpParameters,
const { id, appData, type, kind, rtpParameters } = producer;
appData.mediaType = producer_type;
this.producers.set(id, producer);
if (['simulcast', 'svc'].includes(type)) {
const { scalabilityMode } = rtpParameters.encodings[0];
const spatialLayer = parseInt(scalabilityMode.substring(1, 2)); // 1/2/3
const temporalLayer = parseInt(scalabilityMode.substring(3, 4)); // 1/2/3
log.debug(`Producer [${type}-${kind}] ----->`, {
scalabilityMode,
spatialLayer,
temporalLayer,
});
if (!producer) {
return `Producer type: ${producer_type} kind: ${producer_kind} not found`;
}
const { id, appData, type, kind, rtpParameters } = producer;
appData.mediaType = producer_type;
this.producers.set(id, producer);
if (['simulcast', 'svc'].includes(type)) {
const { scalabilityMode } = rtpParameters.encodings[0];
const spatialLayer = parseInt(scalabilityMode.substring(1, 2)); // 1/2/3
const temporalLayer = parseInt(scalabilityMode.substring(3, 4)); // 1/2/3
log.debug(`Producer [${type}-${kind}] ----->`, {
scalabilityMode,
spatialLayer,
temporalLayer,
});
} else {
log.debug('Producer ----->', { type: type, kind: kind });
}
producer.on('transportclose', () => {
log.debug('Producer "transportclose" event');
this.closeProducer(id);
});
return producer;
} catch (error) {
log.error('Error creating producer', error.message);
return error.message;
} else {
log.debug('Producer ----->', { type: type, kind: kind });
}
producer.on('transportclose', () => {
log.debug('Producer "transportclose" event');
this.closeProducer(id);
});
return producer;
}
closeProducer(producer_id) {
@@ -222,74 +207,57 @@ module.exports = class Peer {
}
async createConsumer(consumer_transport_id, producer_id, rtpCapabilities) {
try {
if (!consumer_transport_id) {
return 'Invalid consumer transport ID';
}
if (!this.transports.has(consumer_transport_id)) return;
const consumerTransport = this.transports.get(consumer_transport_id);
const consumerTransport = this.transports.get(consumer_transport_id);
if (!consumerTransport) {
return `Consumer transport with id ${consumer_transport_id} not found`;
}
const consumer = await consumerTransport.consume({
producerId: producer_id,
rtpCapabilities,
enableRtx: true, // Enable NACK for OPUS.
paused: false,
});
const consumer = await consumerTransport.consume({
producerId: producer_id,
rtpCapabilities,
enableRtx: true, // Enable NACK for OPUS.
paused: false,
});
const { id, type, kind, rtpParameters, producerPaused } = consumer;
if (!consumer) {
return `Consumer for producer ID ${producer_id} not found`;
}
this.consumers.set(id, consumer);
const { id, type, kind, rtpParameters, producerPaused } = consumer;
this.consumers.set(id, consumer);
if (['simulcast', 'svc'].includes(type)) {
// simulcast - L1T3/L2T3/L3T3 | svc - L3T3
const { scalabilityMode } = rtpParameters.encodings[0];
const spatialLayer = parseInt(scalabilityMode.substring(1, 2)); // 1/2/3
const temporalLayer = parseInt(scalabilityMode.substring(3, 4)); // 1/2/3
try {
await consumer.setPreferredLayers({
spatialLayer: spatialLayer,
temporalLayer: temporalLayer,
});
log.debug(`Consumer [${type}-${kind}] ----->`, {
scalabilityMode,
spatialLayer,
temporalLayer,
});
} catch (error) {
return `Error to set Consumer preferred layers: ${error.message}`;
}
} else {
log.debug('Consumer ----->', { type: type, kind: kind });
}
consumer.on('transportclose', () => {
log.debug('Consumer "transportclose" event');
this.removeConsumer(id);
});
return {
consumer: consumer,
params: {
producerId: producer_id,
id: id,
kind: kind,
rtpParameters: rtpParameters,
type: type,
producerPaused: producerPaused,
},
};
} catch (error) {
log.error('Error creating consumer', error.message);
return error.message;
if (['simulcast', 'svc'].includes(type)) {
// simulcast - L1T3/L2T3/L3T3 | svc - L3T3
const { scalabilityMode } = rtpParameters.encodings[0];
const spatialLayer = parseInt(scalabilityMode.substring(1, 2)); // 1/2/3
const temporalLayer = parseInt(scalabilityMode.substring(3, 4)); // 1/2/3
try {
await consumer.setPreferredLayers({
spatialLayer: spatialLayer,
temporalLayer: temporalLayer,
});
log.debug(`Consumer [${type}-${kind}] ----->`, {
scalabilityMode,
spatialLayer,
temporalLayer,
});
} catch (error) {}
} else {
log.debug('Consumer ----->', { type: type, kind: kind });
}
consumer.on('transportclose', () => {
log.debug('Consumer "transportclose" event');
this.removeConsumer(id);
});
return {
consumer: consumer,
params: {
producerId: producer_id,
id: id,
kind: kind,
rtpParameters: rtpParameters,
type: type,
producerPaused: producerPaused,
},
};
}
removeConsumer(consumer_id) {

عرض الملف

@@ -88,6 +88,7 @@ module.exports = class Room {
}
closeRouter() {
log.debug('Close Room router id', this.router.id);
this.router.close();
}
@@ -204,11 +205,7 @@ module.exports = class Room {
}
getPeer(socket_id) {
//
if (!this.peers.has(socket_id)) {
log.error('---> Peer not found for socket ID', socket_id);
return null;
}
if (!this.peers.has(socket_id)) return;
const peer = this.peers.get(socket_id);
@@ -240,11 +237,9 @@ module.exports = class Room {
}
async removePeer(socket_id) {
const peer = this.getPeer(socket_id);
if (!this.peers.has(socket_id)) return;
if (!peer || typeof peer !== 'object') {
return;
}
const peer = this.getPeer(socket_id);
const { id, peer_name } = peer;
@@ -274,6 +269,8 @@ module.exports = class Room {
// ####################################################
async createWebRtcTransport(socket_id) {
if (!this.peers.has(socket_id)) return;
const { maxIncomingBitrate, initialAvailableOutgoingBitrate, listenInfos } = this.webRtcTransport;
const webRtcTransportOptions = {
@@ -290,7 +287,7 @@ module.exports = class Room {
const transport = await this.router.createWebRtcTransport(webRtcTransportOptions);
if (!transport) {
return this.callback('[Room|createWebRtcTransport] Failed to create WebRTC transport');
throw new Error('Create WebRtc Transport failed!');
}
const { id, iceParameters, iceCandidates, dtlsParameters } = transport;
@@ -298,16 +295,14 @@ module.exports = class Room {
if (maxIncomingBitrate) {
try {
await transport.setMaxIncomingBitrate(maxIncomingBitrate);
} catch (error) {
log.debug('Transport setMaxIncomingBitrate error', error.message);
}
} catch (error) {}
}
const peer = this.getPeer(socket_id);
if (!peer || typeof peer !== 'object') {
return this.callback(`[Room|createWebRtcTransport] Peer object not found for socket ID: ${socket_id}`);
}
peer.addTransport(transport);
log.debug('Transport created', { transportId: id });
const { peer_name } = peer;
@@ -342,13 +337,12 @@ module.exports = class Room {
});
transport.on('close', () => {
log.debug('Transport closed', { peer_name: peer_name, transport_id: transport.id });
log.debug('Transport closed', {
peer_name: peer_name,
transport_id: transport.id,
});
});
peer.addTransport(transport);
log.debug('Transport created', { transportId: id });
return {
id: id,
iceParameters: iceParameters,
@@ -358,28 +352,13 @@ module.exports = class Room {
}
async connectPeerTransport(socket_id, transport_id, dtlsParameters) {
try {
if (!socket_id || !transport_id || !dtlsParameters) {
return this.callback('[Room|connectPeerTransport] Invalid input parameters');
}
if (!this.peers.has(socket_id)) return;
const peer = this.getPeer(socket_id);
const peer = this.getPeer(socket_id);
if (!peer || typeof peer !== 'object') {
return this.callback(`[Room|connectPeerTransport] Peer object not found for socket ID: ${socket_id}`);
}
await peer.connectTransport(transport_id, dtlsParameters);
const connectTransport = await peer.connectTransport(transport_id, dtlsParameters);
if (!connectTransport) {
return this.callback(`[Room|connectPeerTransport] error: Transport with ID ${transport_id} not found`);
}
return '[Room|connectPeerTransport] done';
} catch (error) {
log.error('Error connecting peer transport', error.message);
return this.callback(`[Room|connectPeerTransport] error: ${error.message}`);
}
return '[Room|connectPeerTransport] done';
}
// ####################################################
@@ -387,21 +366,14 @@ module.exports = class Room {
// ####################################################
async produce(socket_id, producerTransportId, rtpParameters, kind, type) {
//
if (!socket_id || !producerTransportId || !rtpParameters || !kind || !type) {
return this.callback('[Room|produce] Invalid input parameters');
}
if (!this.peers.has(socket_id)) return;
const peer = this.getPeer(socket_id);
if (!peer || typeof peer !== 'object') {
return this.callback(`[Room|produce] Peer object not found for socket ID: ${socket_id}`);
}
const peerProducer = await peer.createProducer(producerTransportId, rtpParameters, kind, type);
if (!peerProducer || !peerProducer.id) {
return this.callback(`[Room|produce] Peer producer error: '${peerProducer}'`);
if (!peerProducer) {
throw new Error(`Peer producer kind ${kind} with id ${producerTransportId} not found`);
}
const { id } = peerProducer;
@@ -422,14 +394,10 @@ module.exports = class Room {
}
closeProducer(socket_id, producer_id) {
if (!socket_id || !producer_id) return;
if (!this.peers.has(socket_id)) return;
const peer = this.getPeer(socket_id);
if (!peer || typeof peer !== 'object') {
return;
}
peer.closeProducer(producer_id);
}
@@ -438,53 +406,47 @@ module.exports = class Room {
// ####################################################
async consume(socket_id, consumer_transport_id, producer_id, rtpCapabilities) {
try {
if (!socket_id || !consumer_transport_id || !producer_id || !rtpCapabilities) {
return this.callback('[Room|consume] Invalid input parameters');
}
if (!this.peers.has(socket_id)) return;
if (!this.router.canConsume({ producerId: producer_id, rtpCapabilities })) {
log.warn('Cannot consume', {
socket_id,
consumer_transport_id,
producer_id,
});
return this.callback(`[Room|consume] Room router cannot consume producer_id: '${producer_id}'`);
}
const peer = this.getPeer(socket_id);
if (!peer || typeof peer !== 'object') {
return this.callback(`[Room|consume] Peer object not found for socket ID: ${socket_id}`);
}
const peerConsumer = await peer.createConsumer(consumer_transport_id, producer_id, rtpCapabilities);
if (!peerConsumer || !peerConsumer.consumer || !peerConsumer.params) {
log.debug('peerConsumer or params are not defined');
return this.callback(`[Room|consume] peerConsumer error: '${peerConsumer}'`);
}
const { consumer, params } = peerConsumer;
const { id, kind } = consumer;
consumer.on('producerclose', () => {
log.debug('Consumer closed due to "producerclose" event');
peer.removeConsumer(id);
// Notify the client that consumer is closed
this.send(socket_id, 'consumerClosed', {
consumer_id: id,
consumer_kind: kind,
});
if (
!this.router.canConsume({
producerId: producer_id,
rtpCapabilities,
})
) {
log.warn('Cannot consume', {
socket_id,
consumer_transport_id,
producer_id,
});
return params;
} catch (error) {
log.error('Error occurred during consumption', error.message);
return this.callback(`[Room|consume] ${error.message}`);
return;
}
const peer = this.getPeer(socket_id);
const peerConsumer = await peer.createConsumer(consumer_transport_id, producer_id, rtpCapabilities);
if (!peerConsumer) {
throw new Error(`Peer consumer kind ${kind} with id ${consumer_transport_id} not found`);
}
const { consumer, params } = peerConsumer;
const { id, kind } = consumer;
consumer.on('producerclose', () => {
log.debug('Consumer closed due to "producerclose" event');
peer.removeConsumer(id);
// Notify the client that consumer is closed
this.send(socket_id, 'consumerClosed', {
consumer_id: id,
consumer_kind: kind,
});
});
return params;
}
// ####################################################
@@ -543,14 +505,6 @@ module.exports = class Room {
this._hostOnlyRecording = status;
}
// ####################################################
// ERRORS
// ####################################################
callback(message) {
return { error: message };
}
// ####################################################
// SENDER
// ####################################################

عرض الملف

@@ -41,7 +41,7 @@ dependencies: {
* @license For commercial or closed source, contact us at license.mirotalk@gmail.com or purchase directly via CodeCanyon
* @license CodeCanyon: https://codecanyon.net/item/mirotalk-sfu-webrtc-realtime-video-conferences/40769970
* @author Miroslav Pejic - miroslav.pejic.85@gmail.com
* @version 1.4.23
* @version 1.4.24
*
*/
@@ -907,11 +907,9 @@ function startServer() {
log.info('User joined', data);
const { peer_token } = data.peer_info;
const room = roomList.get(socket.room_id);
const { peer_name, peer_id, peer_uuid, os_name, os_version, browser_name, browser_version } =
const { peer_name, peer_id, peer_uuid, peer_token, os_name, os_version, browser_name, browser_version } =
data.peer_info;
let is_presenter = true;
@@ -1063,7 +1061,7 @@ function startServer() {
callback(getRouterRtpCapabilities);
} catch (err) {
log.error('Get RouterRtpCapabilities error', err.message);
log.error('Get RouterRtpCapabilities error', err);
callback({
error: err.message,
});
@@ -1086,7 +1084,7 @@ function startServer() {
callback(createWebRtcTransport);
} catch (err) {
log.error('Create WebRtc Transport error', err.message);
log.error('Create WebRtc Transport error', err);
callback({
error: err.message,
});
@@ -1111,7 +1109,7 @@ function startServer() {
callback(connectTransport);
} catch (err) {
log.error('Connect transport error', err.message);
log.error('Connect transport error', err);
callback({
error: err.message,
});
@@ -1134,7 +1132,9 @@ function startServer() {
try {
const transport = peer.getTransport(transport_id);
if (!transport) throw new Error(`Restart ICE, transport with id "${transport_id}" not found`);
if (!transport) {
throw new Error(`Restart ICE, transport with id "${transport_id}" not found`);
}
const iceParameters = await transport.restartIce();
@@ -1142,7 +1142,7 @@ function startServer() {
callback(iceParameters);
} catch (err) {
log.error('Restart ICE error', err.message);
log.error('Restart ICE error', err);
callback({
error: err.message,
});
@@ -1200,7 +1200,7 @@ function startServer() {
producer_id,
});
} catch (err) {
log.error('Producer transport error', err.message);
log.error('Producer transport error', err);
callback({
error: err.message,
});
@@ -1229,7 +1229,7 @@ function startServer() {
callback(params);
} catch (err) {
log.error('Consumer transport error', err.message);
log.error('Consumer transport error', err);
callback({
error: err.message,
});