[mirotalksfu] - refactoring

هذا الالتزام موجود في:
Miroslav Pejic
2025-05-02 23:09:00 +02:00
الأصل 2c09ebbb9a
التزام d5a6678fe4
8 ملفات معدلة مع 216 إضافات و70 حذوفات

عرض الملف

@@ -92,6 +92,10 @@ module.exports = class Peer {
// TRANSPORT
// ####################################################
hasTransport(transport_id) {
return this.transports.has(transport_id);
}
getTransports() {
return JSON.parse(JSON.stringify([...this.transports]));
}
@@ -109,16 +113,28 @@ module.exports = class Peer {
}
async connectTransport(transport_id, dtlsParameters) {
if (!transport_id || !dtlsParameters) {
throw new Error('Missing required parameters for connecting a transport');
}
if (!this.transports.has(transport_id)) {
throw new Error(`Transport with ID ${transport_id} not found`);
}
const transport = this.transports.get(transport_id);
try {
await this.transports.get(transport_id).connect({
dtlsParameters: dtlsParameters,
// Connect the transport
await transport.connect({ dtlsParameters });
log.debug('Transport connected successfully', {
transport_id,
peer_name: this.peer_name,
});
} catch (error) {
log.error(`Failed to connect transport with ID ${transport_id}`, error);
log.error(`Failed to connect transport with ID ${transport_id}`, {
error: error.message,
peer_name: this.peer_name,
});
throw new Error(`Failed to connect transport with ID ${transport_id}`);
}
@@ -173,6 +189,10 @@ module.exports = class Peer {
}
async createProducer(producerTransportId, producer_rtpParameters, producer_kind, producer_type) {
if (!producerTransportId || !producer_rtpParameters || !producer_kind || !producer_type) {
throw new Error('Missing required parameters for creating a producer');
}
if (!this.transports.has(producerTransportId)) {
throw new Error(`Producer transport with ID ${producerTransportId} not found`);
}
@@ -186,10 +206,18 @@ module.exports = class Peer {
rtpParameters: producer_rtpParameters,
});
} catch (error) {
log.error(`Error creating producer for transport ID ${producerTransportId}:`, error);
log.error(`Error creating producer for transport ID ${producerTransportId}`, {
error: error.message,
producer_kind,
producer_type,
});
throw new Error(`Failed to create producer for transport ID ${producerTransportId}`);
}
if (!producer) {
throw new Error(`Producer creation failed for transport ID ${producerTransportId}`);
}
const { id, appData, type, kind, rtpParameters } = producer;
appData.mediaType = producer_type;
@@ -207,7 +235,7 @@ module.exports = class Peer {
temporalLayer,
});
} else {
log.debug('Producer ----->', { type, kind });
log.debug('Producer created ----->', { type, kind });
}
producer.on('transportclose', () => {
@@ -222,23 +250,29 @@ module.exports = class Peer {
if (!this.producers.has(producer_id)) return;
const producer = this.getProducer(producer_id);
const { id, kind, type, appData } = producer;
try {
producer.close();
log.debug('Producer closed successfully', {
producer_id: producer.id,
peer_name: this.peer_name,
kind: producer.kind,
type: producer.type,
appData: producer.appData,
});
} catch (error) {
log.warn('Close Producer', error.message);
log.error(`Error closing producer with ID ${producer_id}`, {
error: error.message,
peer_name: this.peer_name,
});
}
this.delProducer(producer_id);
log.debug('Producer closed and deleted', {
log.debug('Producer removed from peer', {
producer_id: producer.id,
peer_name: this.peer_name,
kind: kind,
type: type,
appData: appData,
producer_id: id,
producer_closed: producer.closed,
});
}
@@ -263,6 +297,10 @@ module.exports = class Peer {
}
async createConsumer(consumer_transport_id, producerId, rtpCapabilities) {
if (!consumer_transport_id || !producerId || !rtpCapabilities) {
throw new Error('Missing required parameters for creating a consumer');
}
if (!this.transports.has(consumer_transport_id)) {
throw new Error(`Consumer transport with ID ${consumer_transport_id} not found`);
}
@@ -275,14 +313,21 @@ module.exports = class Peer {
producerId,
rtpCapabilities,
enableRtx: true, // Enable NACK for OPUS.
paused: true,
ignoreDtx: true,
paused: true, // Start the consumer in a paused state
ignoreDtx: true, // Ignore DTX (Discontinuous Transmission)
});
} catch (error) {
log.error(`Error creating consumer for transport ID ${consumer_transport_id}`, error);
log.error(`Error creating consumer for transport ID ${consumer_transport_id}`, {
error: error.message,
producerId,
});
throw new Error(`Failed to create consumer for transport ID ${consumer_transport_id}`);
}
if (!consumer) {
throw new Error(`Consumer creation failed for transport ID ${consumer_transport_id}`);
}
const { id, type, kind, rtpParameters, producerPaused } = consumer;
this.addConsumer(id, consumer);
@@ -310,7 +355,7 @@ module.exports = class Peer {
});
}
} else {
log.debug('Consumer ----->', { type, kind });
log.debug('Consumer created ----->', { type, kind });
}
consumer.on('transportclose', () => {
@@ -319,14 +364,14 @@ module.exports = class Peer {
});
return {
consumer: consumer,
consumer,
params: {
producerId,
id: id,
kind: kind,
rtpParameters: rtpParameters,
type: type,
producerPaused: producerPaused,
id,
kind,
rtpParameters,
type,
producerPaused,
},
};
}
@@ -335,22 +380,28 @@ module.exports = class Peer {
if (!this.consumers.has(consumer_id)) return;
const consumer = this.getConsumer(consumer_id);
const { id, kind, type } = consumer;
try {
consumer.close();
log.debug('Consumer closed successfully', {
consumer_id: consumer.id,
peer_name: this.peer_name,
kind: consumer.kind,
type: consumer.type,
});
} catch (error) {
log.warn('Close Consumer', error.message);
log.error(`Error closing consumer with ID ${consumer_id}`, {
error: error.message,
peer_name: this.peer_name,
});
}
this.delConsumer(consumer_id);
log.debug('Consumer closed and deleted', {
log.debug('Consumer removed from peer', {
consumer_id: consumer.id,
peer_name: this.peer_name,
kind: kind,
type: type,
consumer_id: id,
consumer_closed: consumer.closed,
});
}
};

عرض الملف

@@ -328,11 +328,10 @@ module.exports = class Room {
}
closeRouter() {
this.stopAudioLevelObserver();
this.stopActiveSpeakerObserver();
this.router.close();
log.debug('Close Room router', {
router_id: this.router.id,
router_closed: this.router.closed,
});
log.debug('Router closed', { router_id: this.router.id });
}
// ####################################################
@@ -397,6 +396,14 @@ module.exports = class Room {
}
}
stopAudioLevelObserver() {
if (this.audioLevelObserver) {
this.audioLevelObserver.close();
this.audioLevelObserver = null;
log.debug('Audio Level Observer closed');
}
}
// ####################################################
// PRODUCER DOMINANT ACTIVE SPEAKER
// ####################################################
@@ -431,6 +438,14 @@ module.exports = class Room {
}
}
stopActiveSpeakerObserver() {
if (this.activeSpeakerObserver) {
this.activeSpeakerObserver.close();
this.activeSpeakerObserver = null;
log.debug('Active Speaker Observer closed');
}
}
// ####################################################
// ROOM MODERATOR
// ####################################################
@@ -537,31 +552,44 @@ module.exports = class Room {
// WebRTC TRANSPORT
// ####################################################
getWebRtcTransportOptions() {
const { iceConsentTimeout = 35, initialAvailableOutgoingBitrate, listenInfos } = this.webRtcTransport;
return {
...(this.webRtcServerActive ? { webRtcServer: this.webRtcServer } : { listenInfos: listenInfos }),
enableUdp: true,
enableTcp: true,
preferUdp: true,
iceConsentTimeout,
initialAvailableOutgoingBitrate,
};
}
async createWebRtcTransport(socket_id) {
if (!this.peers.has(socket_id)) {
throw new Error(`Peer with socket ID ${socket_id} not found in the room`);
}
const { maxIncomingBitrate, initialAvailableOutgoingBitrate, listenInfos } = this.webRtcTransport;
const webRtcTransportOptions = {
...(this.webRtcServerActive ? { webRtcServer: this.webRtcServer } : { listenInfos: listenInfos }),
enableUdp: true,
enableTcp: true,
preferUdp: true,
iceConsentTimeout: 35,
initialAvailableOutgoingBitrate,
};
const webRtcTransportOptions = this.getWebRtcTransportOptions();
log.debug('webRtcTransportOptions ----->', webRtcTransportOptions);
const transport = await this.router.createWebRtcTransport(webRtcTransportOptions);
let transport;
try {
transport = await this.router.createWebRtcTransport(webRtcTransportOptions);
if (!transport) {
throw new Error('Failed to create WebRTC Transport');
}
} catch (error) {
log.error('Error creating WebRTC Transport', { error: error.message, socket_id });
throw new Error('Error creating WebRTC Transport');
}
if (!transport) {
throw new Error('Failed to create WebRtc Transport');
if (transport.closed) {
throw new Error('Transport is already closed');
}
const { id, type, iceParameters, iceCandidates, dtlsParameters } = transport;
const { maxIncomingBitrate } = this.webRtcTransport;
if (maxIncomingBitrate) {
try {
@@ -580,7 +608,12 @@ module.exports = class Room {
throw new Error(`Failed to add peer transport ${id}`);
}
log.debug('Transport created', { transportId: id, transportType: type });
log.debug('Transport created', {
room_id: this.id,
transport_id: transport.id,
type: type,
peer_name: peer.peer_name,
});
const { peer_name } = peer;
@@ -597,12 +630,22 @@ module.exports = class Room {
});
transport.on('icestatechange', (iceState) => {
if (iceState === 'disconnected' || iceState === 'closed') {
log.warn('ICE state changed, closing peer', {
peer_name: peer_name,
transport_id: id,
iceState: iceState,
});
log.warn('ICE state changed', {
peer_name: peer_name,
transport_id: id,
iceState: iceState,
});
if (iceState === 'disconnected') {
log.warn(`ICE state disconnected for transport ${transport.id}, waiting before closing`);
setTimeout(() => {
if (transport.iceState === 'disconnected') {
log.warn(`Closing transport ${transport.id} due to prolonged ICE disconnection`);
transport.close();
}
}, 5000); // Wait 5 seconds before closing
} else if (iceState === 'closed') {
log.warn(`ICE state closed for transport ${transport.id}`);
transport.close();
}
});
@@ -642,6 +685,10 @@ module.exports = class Room {
}
async connectPeerTransport(socket_id, transport_id, dtlsParameters) {
if (!socket_id || !transport_id || !dtlsParameters) {
throw new Error('Missing required parameters for connecting peer transport');
}
if (!this.peers.has(socket_id)) {
throw new Error(`Peer with socket ID ${socket_id} not found in the room`);
}
@@ -650,8 +697,17 @@ module.exports = class Room {
try {
await peer.connectTransport(transport_id, dtlsParameters);
log.debug('Peer transport connected successfully', {
socket_id,
transport_id,
peer_name: peer.peer_name,
});
} catch (error) {
log.error(`Failed to connect peer transport for socket ID ${socket_id}`, error);
log.error(`Failed to connect peer transport for socket ID ${socket_id}`, {
transport_id,
error: error.message,
peer_name: peer.peer_name,
});
throw new Error(`Failed to connect transport for peer with socket ID ${socket_id}`);
}
@@ -663,22 +719,33 @@ module.exports = class Room {
// ####################################################
async produce(socket_id, producerTransportId, rtpParameters, kind, type) {
if (!socket_id || !producerTransportId || !rtpParameters || !kind || !type) {
throw new Error('Missing required parameters for producing media');
}
if (!this.peers.has(socket_id)) {
throw new Error(`Peer with socket ID ${socket_id} not found in the room`);
}
const peer = this.getPeer(socket_id);
const { peer_name, peer_info } = peer;
let peerProducer;
if (!peer.hasTransport(producerTransportId)) {
throw new Error(`Transport with ID ${producerTransportId} not found for peer ${socket_id}`);
}
let peerProducer;
try {
peerProducer = await peer.createProducer(producerTransportId, rtpParameters, kind, type);
} catch (error) {
log.error(`Error creating producer for peer ${peer_name} with socket ID ${socket_id}`, error);
log.error(`Error creating producer for peer ${peer.peer_name} with socket ID ${socket_id}`, {
producerTransportId,
kind,
type,
error: error.message,
});
throw new Error(
`Error creating producer for peer ${peer_name} with transport ID ${producerTransportId} type ${type} for peer ${socket_id}`,
`Failed to create producer for peer ${peer.peer_name} with transport ID ${producerTransportId}`,
);
}
@@ -700,6 +767,13 @@ module.exports = class Room {
},
]);
log.debug('Producer created successfully', {
producer_id: id,
peer_name: peer.peer_name,
kind,
type,
});
return id;
}
@@ -721,12 +795,15 @@ module.exports = class Room {
// ####################################################
async consume(socket_id, consumer_transport_id, producerId, rtpCapabilities, type) {
if (!socket_id || !consumer_transport_id || !producerId || !rtpCapabilities || !type) {
throw new Error('Missing required parameters for consuming media');
}
if (!this.peers.has(socket_id)) {
throw new Error(`Peer with socket ID ${socket_id} not found in the room`);
}
const peer = this.getPeer(socket_id);
const { peer_name } = peer;
if (!this.router.canConsume({ producerId, rtpCapabilities })) {
@@ -739,7 +816,12 @@ module.exports = class Room {
try {
peerConsumer = await peer.createConsumer(consumer_transport_id, producerId, rtpCapabilities);
} catch (error) {
log.error(`Error creating consumer for peer with socket ID ${socket_id}`, error);
log.error(`Error creating consumer for peer ${peer_name} with socket ID ${socket_id}`, {
consumer_transport_id,
producerId,
type,
error: error.message,
});
throw new Error(
`Failed to create consumer for peer ${peer_name} with transport ID ${consumer_transport_id} and producer ID ${producerId} type ${type} for peer ${socket_id}`,
);
@@ -752,11 +834,14 @@ module.exports = class Room {
}
const { consumer, params } = peerConsumer;
const { id, kind } = consumer;
consumer.on('producerclose', () => {
log.debug('Consumer closed due to "producerclose" event');
log.debug('Consumer closed due to "producerclose" event', {
consumer_id: id,
producer_id: producerId,
peer_name,
});
peer.removeConsumer(id);
@@ -767,6 +852,14 @@ module.exports = class Room {
});
});
log.debug('Consumer created successfully', {
consumer_id: id,
producer_id: producerId,
peer_name,
kind,
type,
});
return params;
}

عرض الملف

@@ -64,7 +64,7 @@ dev dependencies: {
* @license For commercial or closed source, contact us at license.mirotalk@gmail.com or purchase directly via CodeCanyon
* @license CodeCanyon: https://codecanyon.net/item/mirotalk-sfu-webrtc-realtime-video-conferences/40769970
* @author Miroslav Pejic - miroslav.pejic.85@gmail.com
* @version 1.8.35
* @version 1.8.36
*
*/

عرض الملف

@@ -1448,6 +1448,8 @@ module.exports = {
},
],
iceConsentTimeout: 35, // Timeout for ICE consent (seconds)
/**
* Bandwidth Control Settings
* Kubernetes implications: