From 7827c56759dcce5aa515d3e39689bac38aef0eaf Mon Sep 17 00:00:00 2001 From: Miroslav Pejic Date: Wed, 23 Apr 2025 21:57:23 +0200 Subject: [PATCH] [mirotalksfu] - add upload to s3 recording --- .env.template | 10 +- app/src/MutexManager.js | 29 +++++ app/src/Room.js | 1 + app/src/Server.js | 227 ++++++++++++++++++++++++++++--------- app/src/config.template.js | 55 ++++++++- package.json | 9 +- public/js/Brand.js | 2 +- public/js/Room.js | 4 +- public/js/RoomClient.js | 42 ++++++- 9 files changed, 313 insertions(+), 66 deletions(-) create mode 100644 app/src/MutexManager.js diff --git a/.env.template b/.env.template index 380da8b4..2f1f3369 100644 --- a/.env.template +++ b/.env.template @@ -54,10 +54,11 @@ CORS_ORIGIN=* # Allowed CORS origins (comma- # Recording RECORDING_ENABLED=false # Enable recording functionality (true|false) +RECORDING_UPLOAD_TO_S3=false # Upload recording to AWS S3 bucket [true/false] RECORDING_ENDPOINT= # Recording service endpoint es http://localhost:8080 # Rtmp streaming -RTMP_ENABLED=true # Enable RTMP streaming (true|false) +RTMP_ENABLED=false # Enable RTMP streaming (true|false) RTMP_FROM_FILE=true # Enable local file streaming RTMP_FROM_URL=true # Enable URL streaming RTMP_FROM_STREAM=true # Enable live stream (camera, microphone, screen, window) @@ -182,6 +183,13 @@ WEBHOOK_URL=https://your-site.com/webhook-endpoint # Webhook endpoint URL # IP Geolocation IP_LOOKUP_ENABLED=false # Enable IP lookup functionality (true|false) +# AWS S3 Configuration +AWS_S3_ENABLED=false # Enable AWS S3 storage (true|false) +AWS_S3_BUCKET_NAME=mirotalk # Name of your S3 bucket (must exist) +AWS_ACCESS_KEY_ID= # AWS Access Key ID (leave empty for IAM roles) +AWS_SECRET_ACCESS_KEY= # AWS Secret Access Key (leave empty for IAM roles) +AWS_REGION= # AWS region (e.g., us-east-2, eu-west-2) + # ---------------------------------------------------- # 7. UI Customization # ---------------------------------------------------- diff --git a/app/src/MutexManager.js b/app/src/MutexManager.js new file mode 100644 index 00000000..8de195d3 --- /dev/null +++ b/app/src/MutexManager.js @@ -0,0 +1,29 @@ +'use strict'; + +const { Mutex } = require('async-mutex'); + +// In-memory file mutex registry +const fileLocks = new Map(); + +function getFileMutex(filePath) { + if (!fileLocks.has(filePath)) { + fileLocks.set(filePath, new Mutex()); + } + return fileLocks.get(filePath); +} + +async function withFileLock(filePath, fn) { + const mutex = getFileMutex(filePath); + const release = await mutex.acquire(); + + try { + return await fn(); + } finally { + release(); + if (!mutex.isLocked()) { + fileLocks.delete(filePath); // Clean up when no one is waiting + } + } +} + +module.exports = { withFileLock }; diff --git a/app/src/Room.js b/app/src/Room.js index eeeab832..f6116fa2 100644 --- a/app/src/Room.js +++ b/app/src/Room.js @@ -34,6 +34,7 @@ module.exports = class Room { this._hostOnlyRecording = false; // ########################## this.recording = { + recSyncServerToS3: (config?.integrations?.aws?.enabled && config?.media?.recording?.uploadToS3) || false, recSyncServerRecording: config?.media?.recording?.enabled || false, recSyncServerEndpoint: config?.media?.recording?.endpoint || '', }; diff --git a/app/src/Server.js b/app/src/Server.js index ac1b7d14..62b254f3 100644 --- a/app/src/Server.js +++ b/app/src/Server.js @@ -64,12 +64,16 @@ dev dependencies: { * @license For commercial or closed source, contact us at license.mirotalk@gmail.com or purchase directly via CodeCanyon * @license CodeCanyon: https://codecanyon.net/item/mirotalk-sfu-webrtc-realtime-video-conferences/40769970 * @author Miroslav Pejic - miroslav.pejic.85@gmail.com - * @version 1.8.25 + * @version 1.8.26 * */ const express = require('express'); const { auth, requiresAuth } = require('express-openid-connect'); +const { withFileLock } = require('./MutexManager'); +const { PassThrough } = require('stream'); +const { S3Client } = require('@aws-sdk/client-s3'); +const { Upload } = require('@aws-sdk/lib-storage'); const cors = require('cors'); const compression = require('compression'); const socketIo = require('socket.io'); @@ -260,6 +264,18 @@ if (rtmpEnabled) { } } +// #################################################### +// AWS S3 SETUP +// #################################################### + +const s3Client = new S3Client({ + region: config?.integrations?.aws?.region, // Set your AWS region + credentials: { + accessKeyId: config?.integrations?.aws?.accessKeyId, + secretAccessKey: config?.integrations?.aws?.secretAccessKey, + }, +}); + // html views const views = { html: path.join(__dirname, '../../public/views'), @@ -845,85 +861,188 @@ function startServer() { }); // #################################################### - // KEEP RECORDING ON SERVER DIR + // UTILITY FUNCTIONS // #################################################### - app.post('/recSync', (req, res) => { - // Store recording... - if (serverRecordingEnabled) { - // - try { - const { fileName } = checkXSS(req.query); + function isValidRequest(req, fileName, roomId, checkContentType = true) { + const contentType = req.headers['content-type']; + if (checkContentType && contentType !== 'application/octet-stream') { + throw new Error('Invalid content type'); + } - if (!fileName) { - return res.status(400).send('Filename not provided'); - } + if (!fileName || sanitizeFilename(fileName) !== fileName || !Validator.isValidRecFileNameFormat(fileName)) { + throw new Error('Invalid file name'); + } - // Sanitize and validate filename - const safeFileName = sanitizeFilename(fileName); - if (safeFileName !== fileName || !Validator.isValidRecFileNameFormat(fileName)) { - log.warn('[RecSync] - Invalid file name:', fileName); - return res.status(400).send('Invalid file name'); - } + if (!roomList || typeof roomList.has !== 'function' || !roomList.has(roomId)) { + throw new Error('Invalid room ID'); + } + } - const parts = fileName.split('_'); - const roomId = parts[1]; + function getRoomIdFromFilename(fileName) { + const parts = fileName.split('_'); + if (parts.length >= 2) { + return parts[1]; + } + throw new Error('Invalid file name format'); + } - if (!roomList.has(roomId)) { - log.warn('[RecSync] - RoomID not exists in filename', fileName); - return res.status(400).send('Invalid file name'); - } + // #################################################### + // RECORDING HANDLERS + // #################################################### - // Ensure directory exists - if (!fs.existsSync(dir.rec)) { - fs.mkdirSync(dir.rec, { recursive: true }); - } + function deleteFile(filePath) { + if (!fs.existsSync(filePath)) return false; - // Resolve and validate file path - const filePath = path.resolve(dir.rec, fileName); - if (!filePath.startsWith(path.resolve(dir.rec))) { - log.warn('[RecSync] - Attempt to save file outside allowed directory:', fileName); - return res.status(400).send('Invalid file path'); - } + try { + fs.unlinkSync(filePath); + log.info(`[Upload] File ${filePath} removed from local after S3 upload`); + } catch (err) { + log.error(`[Upload] Failed to delete local file ${filePath}`, err.message); + } + } - //Validate content type - if (!['application/octet-stream'].includes(req.headers['content-type'])) { - log.warn('[RecSync] - Invalid content type:', req.headers['content-type']); - return res.status(400).send('Invalid content type'); - } + async function uploadToS3(filePath, fileName, roomId, bucket, s3Client) { + if (!fs.existsSync(filePath)) return false; - // Set up write stream and handle file upload + return withFileLock(filePath, async () => { + const fileStream = fs.createReadStream(filePath); + const key = `recordings/${roomId}/${fileName}`; + + const upload = new Upload({ + client: s3Client, + params: { + Bucket: bucket, + Key: key, + Body: fileStream, + Metadata: { + 'room-id': roomId, + 'file-name': fileName, + }, + }, + }); + + await upload.done(); + + return { success: true, fileName, key }; + }); + } + + async function saveLocally(filePath, req, recMaxFileSize) { + return withFileLock(filePath, () => { + return new Promise((resolve, reject) => { const writeStream = fs.createWriteStream(filePath, { flags: 'a' }); let receivedBytes = 0; req.on('data', (chunk) => { receivedBytes += chunk.length; if (receivedBytes > recMaxFileSize) { - req.destroy(); // Stop receiving data - writeStream.destroy(); // Stop writing data - log.warn('[RecSync] - File size exceeds limit:', fileName); - return res.status(413).send('File too large'); + req.destroy(); + writeStream.destroy(); + return reject(new Error('File size exceeds limit')); } }); req.pipe(writeStream); - writeStream.on('error', (err) => { - log.error('[RecSync] - Error writing to file:', err.message); - res.status(500).send('Internal Server Error'); - }); + writeStream.on('finish', () => resolve({ status: 'file_saved_locally', path: filePath })); + writeStream.on('error', reject); + }); + }); + } - writeStream.on('finish', () => { - log.debug('[RecSync] - File saved successfully:', fileName); - res.status(200).send('File uploaded successfully'); - }); - } catch (err) { - log.error('[RecSync] - Error processing upload', err.message); - res.status(500).send('Internal Server Error'); + // #################################################### + // ROUTE HANDLER + // #################################################### + + app.post('/recSync', async (req, res) => { + if (!serverRecordingEnabled) { + return res.status(403).json({ error: 'Recording disabled' }); + } + + if (!fs.existsSync(dir.rec)) { + fs.mkdirSync(dir.rec, { recursive: true }); + } + + try { + const start = Date.now(); + + const { fileName } = checkXSS(req.query); + const roomId = getRoomIdFromFilename(fileName); + + isValidRequest(req, fileName, roomId); + + const filePath = path.resolve(dir.rec, fileName); + const passThrough = new PassThrough(); + + let totalBytes = 0; + + passThrough.on('data', (chunk) => { + totalBytes += chunk.length; + }); + + req.pipe(passThrough); + + const localStream = passThrough.pipe(new PassThrough()); + + await saveLocally(filePath, localStream, recMaxFileSize); + + const duration = ((Date.now() - start) / 1000).toFixed(2); + const sizeMB = (totalBytes / 1024 / 1024).toFixed(2); + + log.info(`[Upload] Saved ${fileName} (${sizeMB} MB) in ${duration}s`); + + return res.status(200).json({ status: 'upload_complete', fileName }); + } catch (error) { + log.error('Upload error:', error.message); + + if (error.message.includes('exceeds limit')) { + res.status(413).json({ error: 'File too large' }); + } else if (['Invalid content type', 'Invalid file name', 'Invalid room ID'].includes(error.message)) { + res.status(400).json({ error: error.message }); + } else if (error.message.includes('already in progress')) { + res.status(429).json({ error: 'Upload already in progress' }); + } else { + res.status(500).json({ error: 'Internal Server Error' }); } } }); + app.post('/recSyncFinalize', async (req, res) => { + try { + const shouldUploadToS3 = config?.integrations?.aws?.enabled && config?.media?.recording?.uploadToS3; + if (!shouldUploadToS3 || !serverRecordingEnabled) { + return res.status(403).json({ error: 'Recording disabled' }); + } + const start = Date.now(); + + const { fileName } = checkXSS(req.query); + const roomId = getRoomIdFromFilename(fileName); + + isValidRequest(req, fileName, roomId, false); + + const filePath = path.resolve(dir.rec, fileName); + + if (!fs.existsSync(filePath)) { + return res.status(500).json({ error: 'Rec Finalization failed file not exists' }); + } + + const bucket = config?.integrations?.aws?.bucket; + const s3 = await uploadToS3(filePath, fileName, roomId, bucket, s3Client); + + const duration = ((Date.now() - start) / 1000).toFixed(2); + + log.info(`[Rec Finalization] done ${fileName} in ${duration}s`, { ...s3 }); + + deleteFile(filePath); // Delete local file after successful upload + + return res.status(200).json({ status: 's3_upload_complete', ...s3 }); + } catch (error) { + log.error('Rec Finalization error', error.message); + return res.status(500).json({ error: 'Rec Finalization failed' }); + } + }); + // ############################################################### // INCOMING STREAM (getUserMedia || getDisplayMedia) TO RTMP // ############################################################### diff --git a/app/src/config.template.js b/app/src/config.template.js index aa566cc5..e7e01f1b 100644 --- a/app/src/config.template.js +++ b/app/src/config.template.js @@ -155,6 +155,7 @@ module.exports = { * Core Settings: * ------------------------ * - enabled : Enable recording functionality + * - uploadToS3 : Upload recording to AWS S3 bucket [true/false] * - endpoint : Leave empty ('') to store recordings locally OR * - Set to a valid URL (e.g., 'http://localhost:8080/') to: * - Push recordings to a remote server @@ -173,6 +174,7 @@ module.exports = { */ recording: { enabled: process.env.RECORDING_ENABLED === 'true', + uploadToS3: process.env.RECORDING_UPLOAD_TO_S3 === 'true', endpoint: process.env.RECORDING_ENDPOINT || '', dir: 'rec', maxFileSize: 1 * 1024 * 1024 * 1024, // 1GB @@ -548,7 +550,7 @@ module.exports = { * (default: Streaming avatar instructions for MiroTalk SFU) */ videoAI: { - enabled: process.env.VIDEOAI_ENABLED !== 'false', + enabled: process.env.VIDEOAI_ENABLED === 'true', basePath: 'https://api.heygen.com', apiKey: process.env.VIDEOAI_API_KEY || '', systemLimit: process.env.VIDEOAI_SYSTEM_LIMIT || 'You are a streaming avatar from MiroTalk SFU...', @@ -807,6 +809,57 @@ module.exports = { return `https://get.geojs.io/v1/ip/geo/${ip}.json`; }, }, + + /** + * AWS S3 Storage Configuration + * =========================== + * Enables cloud file storage using Amazon Simple Storage Service (S3). + * + * Core Settings: + * -------------- + * - enabled: Enable/disable AWS S3 integration [true/false] + * + * Service Setup: + * ------------- + * 1. Create an S3 Bucket: + * - Sign in to AWS Management Console + * - Navigate to S3 service + * - Click "Create bucket" + * - Choose unique name (e.g., 'mirotalk') + * - Select region (must match AWS_REGION in config) + * - Enable desired settings (versioning, logging, etc.) + * + * 2. Get Security Credentials: + * - Create IAM user with programmatic access + * - Attach 'AmazonS3FullAccess' policy (or custom minimal policy) + * - Save Access Key ID and Secret Access Key + * + * 3. Configure CORS (for direct uploads): + * [ + * { + * "AllowedHeaders": ["*"], + * "AllowedMethods": ["PUT", "POST"], + * "AllowedOrigins": ["*"], + * "ExposeHeaders": [] + * } + * ] + * + * Technical Details: + * ----------------- + * - Default region: us-east-2 (Ohio) + * - Direct upload uses presigned URLs (expire after 1 hour by default) + * - Recommended permissions for direct upload: + * - s3:PutObject + * - s3:GetObject + * - s3:DeleteObject + */ + aws: { + enabled: process.env.AWS_S3_ENABLED === 'true', + accessKeyId: process.env.AWS_ACCESS_KEY_ID || 'your-access-key-id', + secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY || 'your-secret-access-key', + region: process.env.AWS_REGION || 'us-east-2', + bucket: process.env.AWS_S3_BUCKET || 'mirotalk', + }, }, // ============================================== diff --git a/package.json b/package.json index ce005dbc..f13d940e 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "mirotalksfu", - "version": "1.8.25", + "version": "1.8.26", "description": "WebRTC SFU browser-based video calls", "main": "Server.js", "scripts": { @@ -57,9 +57,12 @@ "node": ">=18" }, "dependencies": { + "@aws-sdk/client-s3": "^3.787.0", + "@aws-sdk/lib-storage": "^3.787.0", "@mattermost/client": "10.6.0", "@ngrok/ngrok": "1.5.0", - "@sentry/node": "^9.13.0", + "@sentry/node": "^9.14.0", + "async-mutex": "^0.5.0", "axios": "^1.8.4", "chokidar": "^4.0.3", "colors": "1.4.0", @@ -81,7 +84,7 @@ "mediasoup": "3.15.7", "mediasoup-client": "3.9.5", "nodemailer": "^6.10.1", - "openai": "^4.95.1", + "openai": "^4.96.0", "qs": "6.14.0", "sanitize-filename": "^1.6.3", "socket.io": "4.8.1", diff --git a/public/js/Brand.js b/public/js/Brand.js index 6837b55d..9e3a5bec 100644 --- a/public/js/Brand.js +++ b/public/js/Brand.js @@ -64,7 +64,7 @@ let BRAND = { }, about: { imageUrl: '../images/mirotalk-logo.gif', - title: 'WebRTC SFU v1.8.25', + title: 'WebRTC SFU v1.8.26', html: `