[mirotalksfu] - add upload to s3 recording

هذا الالتزام موجود في:
Miroslav Pejic
2025-04-23 21:57:23 +02:00
الأصل 3510929589
التزام 7827c56759
9 ملفات معدلة مع 313 إضافات و66 حذوفات

29
app/src/MutexManager.js Normal file
عرض الملف

@@ -0,0 +1,29 @@
'use strict';
const { Mutex } = require('async-mutex');
// In-memory file mutex registry
const fileLocks = new Map();
function getFileMutex(filePath) {
if (!fileLocks.has(filePath)) {
fileLocks.set(filePath, new Mutex());
}
return fileLocks.get(filePath);
}
async function withFileLock(filePath, fn) {
const mutex = getFileMutex(filePath);
const release = await mutex.acquire();
try {
return await fn();
} finally {
release();
if (!mutex.isLocked()) {
fileLocks.delete(filePath); // Clean up when no one is waiting
}
}
}
module.exports = { withFileLock };

عرض الملف

@@ -34,6 +34,7 @@ module.exports = class Room {
this._hostOnlyRecording = false;
// ##########################
this.recording = {
recSyncServerToS3: (config?.integrations?.aws?.enabled && config?.media?.recording?.uploadToS3) || false,
recSyncServerRecording: config?.media?.recording?.enabled || false,
recSyncServerEndpoint: config?.media?.recording?.endpoint || '',
};

عرض الملف

@@ -64,12 +64,16 @@ dev dependencies: {
* @license For commercial or closed source, contact us at license.mirotalk@gmail.com or purchase directly via CodeCanyon
* @license CodeCanyon: https://codecanyon.net/item/mirotalk-sfu-webrtc-realtime-video-conferences/40769970
* @author Miroslav Pejic - miroslav.pejic.85@gmail.com
* @version 1.8.25
* @version 1.8.26
*
*/
const express = require('express');
const { auth, requiresAuth } = require('express-openid-connect');
const { withFileLock } = require('./MutexManager');
const { PassThrough } = require('stream');
const { S3Client } = require('@aws-sdk/client-s3');
const { Upload } = require('@aws-sdk/lib-storage');
const cors = require('cors');
const compression = require('compression');
const socketIo = require('socket.io');
@@ -260,6 +264,18 @@ if (rtmpEnabled) {
}
}
// ####################################################
// AWS S3 SETUP
// ####################################################
const s3Client = new S3Client({
region: config?.integrations?.aws?.region, // Set your AWS region
credentials: {
accessKeyId: config?.integrations?.aws?.accessKeyId,
secretAccessKey: config?.integrations?.aws?.secretAccessKey,
},
});
// html views
const views = {
html: path.join(__dirname, '../../public/views'),
@@ -845,85 +861,188 @@ function startServer() {
});
// ####################################################
// KEEP RECORDING ON SERVER DIR
// UTILITY FUNCTIONS
// ####################################################
app.post('/recSync', (req, res) => {
// Store recording...
if (serverRecordingEnabled) {
//
try {
const { fileName } = checkXSS(req.query);
function isValidRequest(req, fileName, roomId, checkContentType = true) {
const contentType = req.headers['content-type'];
if (checkContentType && contentType !== 'application/octet-stream') {
throw new Error('Invalid content type');
}
if (!fileName) {
return res.status(400).send('Filename not provided');
}
if (!fileName || sanitizeFilename(fileName) !== fileName || !Validator.isValidRecFileNameFormat(fileName)) {
throw new Error('Invalid file name');
}
// Sanitize and validate filename
const safeFileName = sanitizeFilename(fileName);
if (safeFileName !== fileName || !Validator.isValidRecFileNameFormat(fileName)) {
log.warn('[RecSync] - Invalid file name:', fileName);
return res.status(400).send('Invalid file name');
}
if (!roomList || typeof roomList.has !== 'function' || !roomList.has(roomId)) {
throw new Error('Invalid room ID');
}
}
const parts = fileName.split('_');
const roomId = parts[1];
function getRoomIdFromFilename(fileName) {
const parts = fileName.split('_');
if (parts.length >= 2) {
return parts[1];
}
throw new Error('Invalid file name format');
}
if (!roomList.has(roomId)) {
log.warn('[RecSync] - RoomID not exists in filename', fileName);
return res.status(400).send('Invalid file name');
}
// ####################################################
// RECORDING HANDLERS
// ####################################################
// Ensure directory exists
if (!fs.existsSync(dir.rec)) {
fs.mkdirSync(dir.rec, { recursive: true });
}
function deleteFile(filePath) {
if (!fs.existsSync(filePath)) return false;
// Resolve and validate file path
const filePath = path.resolve(dir.rec, fileName);
if (!filePath.startsWith(path.resolve(dir.rec))) {
log.warn('[RecSync] - Attempt to save file outside allowed directory:', fileName);
return res.status(400).send('Invalid file path');
}
try {
fs.unlinkSync(filePath);
log.info(`[Upload] File ${filePath} removed from local after S3 upload`);
} catch (err) {
log.error(`[Upload] Failed to delete local file ${filePath}`, err.message);
}
}
//Validate content type
if (!['application/octet-stream'].includes(req.headers['content-type'])) {
log.warn('[RecSync] - Invalid content type:', req.headers['content-type']);
return res.status(400).send('Invalid content type');
}
async function uploadToS3(filePath, fileName, roomId, bucket, s3Client) {
if (!fs.existsSync(filePath)) return false;
// Set up write stream and handle file upload
return withFileLock(filePath, async () => {
const fileStream = fs.createReadStream(filePath);
const key = `recordings/${roomId}/${fileName}`;
const upload = new Upload({
client: s3Client,
params: {
Bucket: bucket,
Key: key,
Body: fileStream,
Metadata: {
'room-id': roomId,
'file-name': fileName,
},
},
});
await upload.done();
return { success: true, fileName, key };
});
}
async function saveLocally(filePath, req, recMaxFileSize) {
return withFileLock(filePath, () => {
return new Promise((resolve, reject) => {
const writeStream = fs.createWriteStream(filePath, { flags: 'a' });
let receivedBytes = 0;
req.on('data', (chunk) => {
receivedBytes += chunk.length;
if (receivedBytes > recMaxFileSize) {
req.destroy(); // Stop receiving data
writeStream.destroy(); // Stop writing data
log.warn('[RecSync] - File size exceeds limit:', fileName);
return res.status(413).send('File too large');
req.destroy();
writeStream.destroy();
return reject(new Error('File size exceeds limit'));
}
});
req.pipe(writeStream);
writeStream.on('error', (err) => {
log.error('[RecSync] - Error writing to file:', err.message);
res.status(500).send('Internal Server Error');
});
writeStream.on('finish', () => resolve({ status: 'file_saved_locally', path: filePath }));
writeStream.on('error', reject);
});
});
}
writeStream.on('finish', () => {
log.debug('[RecSync] - File saved successfully:', fileName);
res.status(200).send('File uploaded successfully');
});
} catch (err) {
log.error('[RecSync] - Error processing upload', err.message);
res.status(500).send('Internal Server Error');
// ####################################################
// ROUTE HANDLER
// ####################################################
app.post('/recSync', async (req, res) => {
if (!serverRecordingEnabled) {
return res.status(403).json({ error: 'Recording disabled' });
}
if (!fs.existsSync(dir.rec)) {
fs.mkdirSync(dir.rec, { recursive: true });
}
try {
const start = Date.now();
const { fileName } = checkXSS(req.query);
const roomId = getRoomIdFromFilename(fileName);
isValidRequest(req, fileName, roomId);
const filePath = path.resolve(dir.rec, fileName);
const passThrough = new PassThrough();
let totalBytes = 0;
passThrough.on('data', (chunk) => {
totalBytes += chunk.length;
});
req.pipe(passThrough);
const localStream = passThrough.pipe(new PassThrough());
await saveLocally(filePath, localStream, recMaxFileSize);
const duration = ((Date.now() - start) / 1000).toFixed(2);
const sizeMB = (totalBytes / 1024 / 1024).toFixed(2);
log.info(`[Upload] Saved ${fileName} (${sizeMB} MB) in ${duration}s`);
return res.status(200).json({ status: 'upload_complete', fileName });
} catch (error) {
log.error('Upload error:', error.message);
if (error.message.includes('exceeds limit')) {
res.status(413).json({ error: 'File too large' });
} else if (['Invalid content type', 'Invalid file name', 'Invalid room ID'].includes(error.message)) {
res.status(400).json({ error: error.message });
} else if (error.message.includes('already in progress')) {
res.status(429).json({ error: 'Upload already in progress' });
} else {
res.status(500).json({ error: 'Internal Server Error' });
}
}
});
app.post('/recSyncFinalize', async (req, res) => {
try {
const shouldUploadToS3 = config?.integrations?.aws?.enabled && config?.media?.recording?.uploadToS3;
if (!shouldUploadToS3 || !serverRecordingEnabled) {
return res.status(403).json({ error: 'Recording disabled' });
}
const start = Date.now();
const { fileName } = checkXSS(req.query);
const roomId = getRoomIdFromFilename(fileName);
isValidRequest(req, fileName, roomId, false);
const filePath = path.resolve(dir.rec, fileName);
if (!fs.existsSync(filePath)) {
return res.status(500).json({ error: 'Rec Finalization failed file not exists' });
}
const bucket = config?.integrations?.aws?.bucket;
const s3 = await uploadToS3(filePath, fileName, roomId, bucket, s3Client);
const duration = ((Date.now() - start) / 1000).toFixed(2);
log.info(`[Rec Finalization] done ${fileName} in ${duration}s`, { ...s3 });
deleteFile(filePath); // Delete local file after successful upload
return res.status(200).json({ status: 's3_upload_complete', ...s3 });
} catch (error) {
log.error('Rec Finalization error', error.message);
return res.status(500).json({ error: 'Rec Finalization failed' });
}
});
// ###############################################################
// INCOMING STREAM (getUserMedia || getDisplayMedia) TO RTMP
// ###############################################################

عرض الملف

@@ -155,6 +155,7 @@ module.exports = {
* Core Settings:
* ------------------------
* - enabled : Enable recording functionality
* - uploadToS3 : Upload recording to AWS S3 bucket [true/false]
* - endpoint : Leave empty ('') to store recordings locally OR
* - Set to a valid URL (e.g., 'http://localhost:8080/') to:
* - Push recordings to a remote server
@@ -173,6 +174,7 @@ module.exports = {
*/
recording: {
enabled: process.env.RECORDING_ENABLED === 'true',
uploadToS3: process.env.RECORDING_UPLOAD_TO_S3 === 'true',
endpoint: process.env.RECORDING_ENDPOINT || '',
dir: 'rec',
maxFileSize: 1 * 1024 * 1024 * 1024, // 1GB
@@ -548,7 +550,7 @@ module.exports = {
* (default: Streaming avatar instructions for MiroTalk SFU)
*/
videoAI: {
enabled: process.env.VIDEOAI_ENABLED !== 'false',
enabled: process.env.VIDEOAI_ENABLED === 'true',
basePath: 'https://api.heygen.com',
apiKey: process.env.VIDEOAI_API_KEY || '',
systemLimit: process.env.VIDEOAI_SYSTEM_LIMIT || 'You are a streaming avatar from MiroTalk SFU...',
@@ -807,6 +809,57 @@ module.exports = {
return `https://get.geojs.io/v1/ip/geo/${ip}.json`;
},
},
/**
* AWS S3 Storage Configuration
* ===========================
* Enables cloud file storage using Amazon Simple Storage Service (S3).
*
* Core Settings:
* --------------
* - enabled: Enable/disable AWS S3 integration [true/false]
*
* Service Setup:
* -------------
* 1. Create an S3 Bucket:
* - Sign in to AWS Management Console
* - Navigate to S3 service
* - Click "Create bucket"
* - Choose unique name (e.g., 'mirotalk')
* - Select region (must match AWS_REGION in config)
* - Enable desired settings (versioning, logging, etc.)
*
* 2. Get Security Credentials:
* - Create IAM user with programmatic access
* - Attach 'AmazonS3FullAccess' policy (or custom minimal policy)
* - Save Access Key ID and Secret Access Key
*
* 3. Configure CORS (for direct uploads):
* [
* {
* "AllowedHeaders": ["*"],
* "AllowedMethods": ["PUT", "POST"],
* "AllowedOrigins": ["*"],
* "ExposeHeaders": []
* }
* ]
*
* Technical Details:
* -----------------
* - Default region: us-east-2 (Ohio)
* - Direct upload uses presigned URLs (expire after 1 hour by default)
* - Recommended permissions for direct upload:
* - s3:PutObject
* - s3:GetObject
* - s3:DeleteObject
*/
aws: {
enabled: process.env.AWS_S3_ENABLED === 'true',
accessKeyId: process.env.AWS_ACCESS_KEY_ID || 'your-access-key-id',
secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY || 'your-secret-access-key',
region: process.env.AWS_REGION || 'us-east-2',
bucket: process.env.AWS_S3_BUCKET || 'mirotalk',
},
},
// ==============================================