From ad6da6754d688e0cc436808f4ff8889e96996fa0 Mon Sep 17 00:00:00 2001 From: boutmoun123 Date: Thu, 28 May 2026 01:15:31 +0300 Subject: [PATCH] Emit realtime chat messages from REST sends --- src/modules/chat/chat-realtime.service.ts | 19 ++++ src/modules/chat/chat.gateway.ts | 10 ++- src/modules/chat/chat.module.ts | 3 +- src/modules/chat/chat.service.spec.ts | 105 ++++++++++++++++++++++ src/modules/chat/chat.service.ts | 3 + 5 files changed, 137 insertions(+), 3 deletions(-) create mode 100644 src/modules/chat/chat-realtime.service.ts create mode 100644 src/modules/chat/chat.service.spec.ts diff --git a/src/modules/chat/chat-realtime.service.ts b/src/modules/chat/chat-realtime.service.ts new file mode 100644 index 0000000..68b9c0a --- /dev/null +++ b/src/modules/chat/chat-realtime.service.ts @@ -0,0 +1,19 @@ +import { Injectable } from '@nestjs/common'; +import { Server } from 'socket.io'; + +@Injectable() +export class ChatRealtimeService { + private server?: Server; + + bindServer(server: Server): void { + this.server = server; + } + + emitNewMessage(conversationId: string, message: unknown): void { + this.server?.to(this.conversationRoom(conversationId)).emit('new_message', message); + } + + private conversationRoom(conversationId: string): string { + return `conversation:${conversationId}`; + } +} diff --git a/src/modules/chat/chat.gateway.ts b/src/modules/chat/chat.gateway.ts index 7193755..5cbe4fb 100644 --- a/src/modules/chat/chat.gateway.ts +++ b/src/modules/chat/chat.gateway.ts @@ -5,27 +5,34 @@ import { MessageBody, OnGatewayConnection, OnGatewayDisconnect, + OnGatewayInit, SubscribeMessage, WebSocketGateway, WebSocketServer, } from '@nestjs/websockets'; import { Server, Socket } from 'socket.io'; +import { ChatRealtimeService } from './chat-realtime.service'; import { ChatService } from './chat.service'; import { SendMessageDto } from './dto/send-message.dto'; type SocketWithUser = Socket & { data: { userId?: string } }; @WebSocketGateway({ cors: { origin: '*' }, namespace: 'chat' }) -export class ChatGateway implements OnGatewayConnection, OnGatewayDisconnect { +export class ChatGateway implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect { @WebSocketServer() server!: Server; constructor( private readonly chatService: ChatService, + private readonly chatRealtimeService: ChatRealtimeService, private readonly jwtService: JwtService, private readonly configService: ConfigService, ) {} + afterInit(server: Server) { + this.chatRealtimeService.bindServer(server); + } + async handleConnection(client: SocketWithUser) { const token = this.extractToken(client); if (!token) { @@ -78,7 +85,6 @@ export class ChatGateway implements OnGatewayConnection, OnGatewayDisconnect { if (!userId) return; const message = await this.chatService.sendMessage(userId, dto); - this.server.to(this.conversationRoom(message.conversationId.toString())).emit('new_message', message); return message; } diff --git a/src/modules/chat/chat.module.ts b/src/modules/chat/chat.module.ts index 1d13bed..252ec19 100644 --- a/src/modules/chat/chat.module.ts +++ b/src/modules/chat/chat.module.ts @@ -7,6 +7,7 @@ import { StorageModule } from '../../infrastructure/storage/storage.module'; import { UsersModule } from '../users/users.module'; import { ChatController } from './chat.controller'; import { ChatGateway } from './chat.gateway'; +import { ChatRealtimeService } from './chat-realtime.service'; import { ChatService } from './chat.service'; import { ChatRepository } from './chat.repository'; import { ChatBlock, ChatBlockSchema } from './schemas/chat-block.schema'; @@ -27,7 +28,7 @@ import { Message, MessageSchema } from './schemas/message.schema'; ]), ], controllers: [ChatController], - providers: [ChatService, ChatRepository, ChatGateway], + providers: [ChatService, ChatRepository, ChatRealtimeService, ChatGateway], exports: [ChatService], }) export class ChatModule {} diff --git a/src/modules/chat/chat.service.spec.ts b/src/modules/chat/chat.service.spec.ts new file mode 100644 index 0000000..5454cb6 --- /dev/null +++ b/src/modules/chat/chat.service.spec.ts @@ -0,0 +1,105 @@ +import { Types } from 'mongoose'; +import { ChatService } from './chat.service'; + +describe('ChatService realtime message broadcasting', () => { + const senderId = new Types.ObjectId().toString(); + const recipientId = new Types.ObjectId().toString(); + const conversationId = new Types.ObjectId().toString(); + + let chatRepository: Record; + let notificationsService: Record; + let storageService: Record; + let chatRealtimeService: Record; + let service: ChatService; + + const conversation = { + id: conversationId, + participantIds: [new Types.ObjectId(senderId), new Types.ObjectId(recipientId)], + }; + + beforeEach(() => { + chatRepository = { + findConversationById: jest.fn().mockResolvedValue(conversation), + findAnyBlockBetween: jest.fn().mockResolvedValue(null), + createMessage: jest.fn(), + updateConversationAfterNewMessage: jest.fn().mockResolvedValue(conversation), + findMessageById: jest.fn(), + }; + notificationsService = { + createMessageNotification: jest.fn().mockResolvedValue(null), + }; + storageService = { + saveFile: jest.fn().mockResolvedValue('/uploads/chat/media/message.jpg'), + deleteFile: jest.fn().mockResolvedValue(undefined), + }; + chatRealtimeService = { + emitNewMessage: jest.fn(), + }; + + service = new ChatService( + chatRepository as any, + {} as any, + notificationsService as any, + storageService as any, + chatRealtimeService as any, + ); + }); + + it('emits new_message after a text message is created through the service', async () => { + const message = { + id: new Types.ObjectId().toString(), + conversationId: new Types.ObjectId(conversationId), + senderId: new Types.ObjectId(senderId), + content: 'hello', + messageType: 'text', + }; + chatRepository.createMessage.mockResolvedValue(message); + + const result = await service.sendMessage(senderId, { + conversationId, + content: 'hello', + messageType: 'text', + }); + + expect(result).toBe(message); + expect(chatRealtimeService.emitNewMessage).toHaveBeenCalledTimes(1); + expect(chatRealtimeService.emitNewMessage).toHaveBeenCalledWith(conversationId, message); + }); + + it('emits new_message after a media upload message is created through the service', async () => { + const message = { + id: new Types.ObjectId().toString(), + conversationId: new Types.ObjectId(conversationId), + senderId: new Types.ObjectId(senderId), + content: '', + mediaUrl: '/uploads/chat/media/message.jpg', + messageType: 'image', + }; + chatRepository.createMessage.mockResolvedValue(message); + + const result = await service.sendMessageWithUpload( + senderId, + { + conversationId, + }, + { + mimetype: 'image/jpeg', + size: 1024, + buffer: Buffer.from('image'), + originalname: 'message.jpg', + }, + ); + + expect(result).toBe(message); + expect(chatRepository.createMessage).toHaveBeenCalledWith( + expect.objectContaining({ + conversationId, + senderId, + messageType: 'image', + mediaUrl: '/uploads/chat/media/message.jpg', + }), + ); + expect(chatRealtimeService.emitNewMessage).toHaveBeenCalledTimes(1); + expect(chatRealtimeService.emitNewMessage).toHaveBeenCalledWith(conversationId, message); + }); +}); diff --git a/src/modules/chat/chat.service.ts b/src/modules/chat/chat.service.ts index a5f557c..efdffb7 100644 --- a/src/modules/chat/chat.service.ts +++ b/src/modules/chat/chat.service.ts @@ -11,6 +11,7 @@ import { UsersRepository } from '../users/users.repository'; import { CreateConversationDto } from './dto/create-conversation.dto'; import { MessageQueryDto } from './dto/message-query.dto'; import { SendMessageDto } from './dto/send-message.dto'; +import { ChatRealtimeService } from './chat-realtime.service'; import { ChatRepository } from './chat.repository'; @Injectable() @@ -22,6 +23,7 @@ export class ChatService { private readonly usersRepository: UsersRepository, private readonly notificationsService: NotificationsService, private readonly storageService: ManagedStorageService, + private readonly chatRealtimeService: ChatRealtimeService, ) {} async createConversation(currentUserId: string, dto: CreateConversationDto) { @@ -169,6 +171,7 @@ export class ChatService { currentUserId, preview, ); + this.chatRealtimeService.emitNewMessage(message.conversationId.toString(), message); await this.dispatchMessageNotifications( currentUserId, conversation.participantIds.map((id) => id.toString()),