[mirotalksfu] - add RTMP server and multi-source streaming!, update dep

هذا الالتزام موجود في:
Miroslav Pejic
2024-06-29 18:49:10 +02:00
الأصل aaf5fe44ed
التزام 3929212631
52 ملفات معدلة مع 3986 إضافات و132 حذوفات

عرض الملف

@@ -8,6 +8,7 @@
███████ ███████ ██  ██   ████   ███████ ██  ██             
dependencies: {
@ffmpeg-installer/ffmpeg: https://www.npmjs.com/package/@ffmpeg-installer/ffmpeg
@sentry/node : https://www.npmjs.com/package/@sentry/node
@sentry/integrations : https://www.npmjs.com/package/@sentry/integrations
axios : https://www.npmjs.com/package/axios
@@ -18,6 +19,7 @@ dependencies: {
crypto-js : https://www.npmjs.com/package/crypto-js
express : https://www.npmjs.com/package/express
express-openid-connect : https://www.npmjs.com/package/express-openid-connect
fluent-ffmpeg : https://www.npmjs.com/package/fluent-ffmpeg
httpolyglot : https://www.npmjs.com/package/httpolyglot
jsonwebtoken : https://www.npmjs.com/package/jsonwebtoken
js-yaml : https://www.npmjs.com/package/js-yaml
@@ -42,7 +44,7 @@ dependencies: {
* @license For commercial or closed source, contact us at license.mirotalk@gmail.com or purchase directly via CodeCanyon
* @license CodeCanyon: https://codecanyon.net/item/mirotalk-sfu-webrtc-realtime-video-conferences/40769970
* @author Miroslav Pejic - miroslav.pejic.85@gmail.com
* @version 1.4.51
* @version 1.4.70
*
*/
@@ -50,6 +52,7 @@ const express = require('express');
const { auth, requiresAuth } = require('express-openid-connect');
const cors = require('cors');
const compression = require('compression');
const socketIo = require('socket.io');
const https = require('httpolyglot');
const mediasoup = require('mediasoup');
const mediasoupClient = require('mediasoup-client');
@@ -75,6 +78,17 @@ const { CaptureConsole } = require('@sentry/integrations');
const restrictAccessByIP = require('./middleware/IpWhitelist.js');
const packageJson = require('../../package.json');
// Incoming Stream to RTPM
const { v4: uuidv4 } = require('uuid');
const crypto = require('crypto-js');
const RtmpStreamer = require('./RtmpStreamer.js'); // Import the RtmpStreamer class
const rtmpCfg = config.server.rtmp;
const rtmpDir = rtmpCfg && rtmpCfg.dir ? rtmpCfg.dir : 'rtmp';
// File and Url Rtmp streams count
let rtmpFileStreamsCount = 0;
let rtmpUrlStreamsCount = 0;
// Email alerts and notifications
const nodemailer = require('./lib/nodemailer');
@@ -98,7 +112,7 @@ const corsOptions = {
};
const httpsServer = https.createServer(options, app);
const io = require('socket.io')(httpsServer, {
const io = socketIo(httpsServer, {
maxHttpBufferSize: 1e7,
transports: ['websocket'],
cors: corsOptions,
@@ -169,7 +183,7 @@ if (config.chatGPT.enabled) {
};
chatGPT = new OpenAI(configuration);
} else {
log.warning('ChatGPT seems enabled, but you missing the apiKey!');
log.warn('ChatGPT seems enabled, but you missing the apiKey!');
}
}
@@ -200,13 +214,16 @@ const views = {
permission: path.join(__dirname, '../../', 'public/views/permission.html'),
privacy: path.join(__dirname, '../../', 'public/views/privacy.html'),
room: path.join(__dirname, '../../', 'public/views/Room.html'),
rtmpStreamer: path.join(__dirname, '../../', 'public/views/RtmpStreamer.html'),
};
const authHost = new Host(); // Authenticated IP by Login
const roomList = new Map(); // All Rooms
const presenters = {}; // collect presenters grp by roomId
const presenters = {}; // Collect presenters grp by roomId
const streams = {}; // Collect all rtmp streams
const webRtcServerActive = config.mediasoup.webRtcServerActive;
@@ -288,24 +305,28 @@ function startServer() {
// Start the app
app.use(cors(corsOptions));
app.use(compression());
app.use(express.json());
app.use(express.json({ limit: '50mb' })); // Ensure the body parser can handle large files
app.use(express.static(dir.public));
app.use(bodyParser.urlencoded({ extended: true }));
app.use(bodyParser.raw({ type: 'video/webm', limit: '50mb' })); // handle raw binary data
app.use(bodyParser.raw({ type: 'application/octet-stream', limit: '50mb' })); // handle raw binary data
app.use(restApi.basePath + '/docs', swaggerUi.serve, swaggerUi.setup(swaggerDocument)); // api docs
// IP Whitelist check ...
app.use(restrictAccessByIP);
// Logs requests
/*
app.use((req, res, next) => {
log.debug('New request:', {
// headers: req.headers,
headers: req.headers,
body: req.body,
method: req.method,
path: req.originalUrl,
});
next();
});
*/
// POST start from here...
app.post('*', function (next) {
@@ -407,6 +428,14 @@ function startServer() {
}
});
// Route to display rtmp streamer
app.get('/rtmp', OIDCAuth, (req, res) => {
if (!rtmpCfg || !rtmpCfg.fromStream) {
return res.json({ message: 'The RTMP Streamer is currently disabled.' });
}
return res.sendFile(views.rtmpStreamer);
});
// set new room name and join
app.get(['/newroom'], OIDCAuth, (req, res) => {
//log.info('/newroom - hostCfg ----->', hostCfg);
@@ -655,6 +684,128 @@ function startServer() {
}
});
// ###############################################################
// INCOMING STREAM (getUserMedia || getDisplayMedia) TO RTMP
// ###############################################################
function checkRTMPApiSecret(req, res, next) {
const expectedApiSecret = rtmpCfg && rtmpCfg.apiSecret;
const apiSecret = req.headers.authorization;
if (!apiSecret || apiSecret !== expectedApiSecret) {
log.warn('RTMP apiSecret Unauthorized', {
apiSecret: apiSecret,
expectedApiSecret: expectedApiSecret,
});
return res.status(401).send('Unauthorized');
}
next();
}
function checkMaxStreams(req, res, next) {
const maxStreams = (rtmpCfg && rtmpCfg.maxStreams) || 1; // Set your maximum allowed streams here
const activeStreams = Object.keys(streams).length;
if (activeStreams >= maxStreams) {
log.warn('Maximum number of streams reached', activeStreams);
return res.status(429).send('Maximum number of streams reached, please try later!');
}
next();
}
app.get('/activeStreams', checkRTMPApiSecret, (req, res) => {
const activeStreams = Object.keys(streams).length;
log.info('Active Streams', activeStreams);
res.json(activeStreams);
});
app.get('/rtmpEnabled', (req, res) => {
const rtmpEnabled = rtmpCfg && rtmpCfg.enabled;
log.debug('RTMP enabled', rtmpEnabled);
res.json({ enabled: rtmpEnabled });
});
app.post('/initRTMP', checkRTMPApiSecret, checkMaxStreams, async (req, res) => {
if (!rtmpCfg || !rtmpCfg.enabled) {
return res.status(400).send('RTMP server is not enabled or missing the config');
}
const domainName = config.ngrok.enabled ? 'localhost' : req.headers.host.split(':')[0];
const rtmpServer = rtmpCfg.server != '' ? rtmpCfg.server : false;
const rtmpServerAppName = rtmpCfg.appName != '' ? rtmpCfg.appName : 'live';
const rtmpStreamKey = rtmpCfg.streamKey != '' ? rtmpCfg.streamKey : uuidv4();
const rtmpServerSecret = rtmpCfg.secret != '' ? rtmpCfg.secret : false;
const expirationHours = rtmpCfg.expirationHours || 4;
const rtmpServerURL = rtmpServer ? rtmpServer : `rtmp://${domainName}:1935`;
const rtmpServerPath = '/' + rtmpServerAppName + '/' + rtmpStreamKey;
const rtmp = rtmpServerSecret
? generateRTMPUrl(rtmpServerURL, rtmpServerPath, rtmpServerSecret, expirationHours)
: rtmpServerURL + rtmpServerPath;
log.info('initRTMP', {
headers: req.headers,
rtmpServer,
rtmpServerSecret,
rtmpServerURL,
rtmpServerPath,
expirationHours,
rtmpStreamKey,
rtmp,
});
const stream = new RtmpStreamer(rtmp, rtmpStreamKey);
streams[rtmpStreamKey] = stream;
log.info('Active RTMP Streams', Object.keys(streams).length);
return res.json({ rtmp });
});
app.post('/streamRTMP', checkRTMPApiSecret, (req, res) => {
if (!rtmpCfg || !rtmpCfg.enabled) {
return res.status(400).send('RTMP server is not enabled');
}
if (!req.body || req.body.length === 0) {
return res.status(400).send('Invalid video data');
}
const rtmpStreamKey = req.query.key;
const stream = streams[rtmpStreamKey];
if (!stream || !stream.isRunning()) {
delete streams[rtmpStreamKey];
log.debug('Stream not found', { rtmpStreamKey, streams: Object.keys(streams).length });
return res.status(404).send('FFmpeg Stream not found');
}
log.debug('Received video data', {
// data: req.body.slice(0, 20).toString('hex'),
key: rtmpStreamKey,
size: bytesToSize(req.headers['content-length']),
});
stream.write(Buffer.from(req.body));
res.sendStatus(200);
});
app.post('/stopRTMP', checkRTMPApiSecret, (req, res) => {
if (!rtmpCfg || !rtmpCfg.enabled) {
return res.status(400).send('RTMP server is not enabled');
}
const rtmpStreamKey = req.query.key;
const stream = streams[rtmpStreamKey];
if (stream) {
stream.end();
delete streams[rtmpStreamKey];
log.debug('Active RTMP Streams', Object.keys(streams).length);
}
res.sendStatus(200);
});
// ####################################################
// REST API
// ####################################################
@@ -1098,6 +1249,10 @@ function startServer() {
log.info('[Join] - current active rooms', activeRooms);
const activeStreams = getRTMPActiveStreams();
log.info('[Join] - current active RTMP streams', activeStreams);
if (!(socket.room_id in presenters)) presenters[socket.room_id] = {};
// Set the presenters
@@ -2135,6 +2290,93 @@ function startServer() {
}
});
socket.on('getRTMP', async ({}, cb) => {
if (!roomList.has(socket.room_id)) return;
const room = roomList.get(socket.room_id);
const rtmpFiles = await room.getRTMP(rtmpDir);
cb(rtmpFiles);
});
socket.on('startRTMP', async (dataObject, cb) => {
if (!roomList.has(socket.room_id)) return;
if (rtmpCfg && rtmpFileStreamsCount >= rtmpCfg.maxStreams) {
log.warn('RTMP max file streams reached', rtmpFileStreamsCount);
return cb(false);
}
const data = checkXSS(dataObject);
const { peer_name, peer_uuid, file } = data;
const isPresenter = await isPeerPresenter(socket.room_id, socket.id, peer_name, peer_uuid);
if (!isPresenter) return cb(false);
const room = roomList.get(socket.room_id);
const host = config.ngrok.enabled ? 'localhost' : socket.handshake.headers.host.split(':')[0];
const rtmp = await room.startRTMP(socket.id, room, host, 1935, `../${rtmpDir}/${file}`);
if (rtmp !== false) rtmpFileStreamsCount++;
log.debug('startRTMP - rtmpFileStreamsCount ---->', rtmpFileStreamsCount);
cb(rtmp);
});
socket.on('stopRTMP', async () => {
if (!roomList.has(socket.room_id)) return;
const room = roomList.get(socket.room_id);
rtmpFileStreamsCount--;
log.debug('stopRTMP - rtmpFileStreamsCount ---->', rtmpFileStreamsCount);
await room.stopRTMP();
});
socket.on('endOrErrorRTMP', async () => {
if (!roomList.has(socket.room_id)) return;
rtmpFileStreamsCount--;
log.debug('endRTMP - rtmpFileStreamsCount ---->', rtmpFileStreamsCount);
});
socket.on('startRTMPfromURL', async (dataObject, cb) => {
if (!roomList.has(socket.room_id)) return;
if (rtmpCfg && rtmpUrlStreamsCount >= rtmpCfg.maxStreams) {
log.warn('RTMP max Url streams reached', rtmpUrlStreamsCount);
return cb(false);
}
const data = checkXSS(dataObject);
const { peer_name, peer_uuid, inputVideoURL } = data;
const isPresenter = await isPeerPresenter(socket.room_id, socket.id, peer_name, peer_uuid);
if (!isPresenter) return cb(false);
const room = roomList.get(socket.room_id);
const host = config.ngrok.enabled ? 'localhost' : socket.handshake.headers.host.split(':')[0];
const rtmp = await room.startRTMPfromURL(socket.id, room, host, 1935, inputVideoURL);
if (rtmp !== false) rtmpUrlStreamsCount++;
log.debug('startRTMPfromURL - rtmpUrlStreamsCount ---->', rtmpUrlStreamsCount);
cb(rtmp);
});
socket.on('stopRTMPfromURL', async () => {
if (!roomList.has(socket.room_id)) return;
const room = roomList.get(socket.room_id);
rtmpUrlStreamsCount--;
log.debug('stopRTMPfromURL - rtmpUrlStreamsCount ---->', rtmpUrlStreamsCount);
await room.stopRTMPfromURL();
});
socket.on('endOrErrorRTMPfromURL', async () => {
if (!roomList.has(socket.room_id)) return;
rtmpUrlStreamsCount--;
log.debug('endRTMPfromURL - rtmpUrlStreamsCount ---->', rtmpUrlStreamsCount);
});
socket.on('disconnect', async () => {
if (!roomList.has(socket.room_id)) return;
@@ -2152,6 +2394,8 @@ function startServer() {
if (room.getPeers().size === 0) {
//
stopRTMPActiveStreams(isPresenter, room);
roomList.delete(socket.room_id);
delete presenters[socket.room_id];
@@ -2161,6 +2405,10 @@ function startServer() {
const activeRooms = getActiveRooms();
log.info('[Disconnect] - Last peer - current active rooms', activeRooms);
const activeStreams = getRTMPActiveStreams();
log.info('[Disconnect] - Last peer - current active RTMP streams', activeStreams);
}
room.broadCast(socket.id, 'removeMe', removeMeData(room, peer_name, isPresenter));
@@ -2193,6 +2441,8 @@ function startServer() {
if (room.getPeers().size === 0) {
//
stopRTMPActiveStreams(isPresenter, room);
roomList.delete(socket.room_id);
delete presenters[socket.room_id];
@@ -2202,6 +2452,10 @@ function startServer() {
const activeRooms = getActiveRooms();
log.info('[REMOVE ME] - Last peer - current active rooms', activeRooms);
const activeStreams = getRTMPActiveStreams();
log.info('[REMOVE ME] - Last peer - current active RTMP streams', activeStreams);
}
socket.room_id = null;
@@ -2270,15 +2524,54 @@ function startServer() {
log.debug('[REMOVE ME DATA]', data);
return data;
}
function bytesToSize(bytes) {
let sizes = ['Bytes', 'KB', 'MB', 'GB', 'TB'];
if (bytes == 0) return '0 Byte';
let i = parseInt(Math.floor(Math.log(bytes) / Math.log(1024)));
return Math.round(bytes / Math.pow(1024, i), 2) + ' ' + sizes[i];
}
});
function generateRTMPUrl(baseURL, streamPath, secretKey, expirationHours = 4) {
const currentTime = Math.floor(Date.now() / 1000);
const expirationTime = currentTime + expirationHours * 3600;
const hashValue = crypto.MD5(`${streamPath}-${expirationTime}-${secretKey}`).toString();
const rtmpUrl = `${baseURL}${streamPath}?sign=${expirationTime}-${hashValue}`;
log.debug('generateRTMPUrl', {
currentTime,
expirationTime,
hashValue,
rtmpUrl,
});
return rtmpUrl;
}
function getRTMPActiveStreams() {
return {
rtmpStreams: Object.keys(streams).length,
rtmpFileStreamsCount,
rtmpUrlStreamsCount,
};
}
function stopRTMPActiveStreams(isPresenter, room) {
if (isPresenter) {
if (room.isRtmpFileStreamerActive()) {
room.stopRTMP();
rtmpFileStreamsCount--;
log.info('[REMOVE ME] - Stop RTMP Stream From FIle', rtmpFileStreamsCount);
}
if (room.isRtmpUrlStreamerActive()) {
room.stopRTMPfromURL();
rtmpUrlStreamsCount--;
log.info('[REMOVE ME] - Stop RTMP Stream From URL', rtmpUrlStreamsCount);
}
}
}
function bytesToSize(bytes) {
const sizes = ['Bytes', 'KB', 'MB', 'GB', 'TB'];
if (bytes == 0) return '0 Byte';
const i = parseInt(Math.floor(Math.log(bytes) / Math.log(1024)));
return Math.round(bytes / Math.pow(1024, i), 2) + ' ' + sizes[i];
}
function clone(value) {
if (value === undefined) return undefined;
if (Number.isNaN(value)) return NaN;