Emit realtime chat messages from REST sends
بعض الفحوصات معلقة
Deploy To Ghaymah / deploy (push) Waiting to run
بعض الفحوصات معلقة
Deploy To Ghaymah / deploy (push) Waiting to run
هذا الالتزام موجود في:
19
src/modules/chat/chat-realtime.service.ts
Normal file
19
src/modules/chat/chat-realtime.service.ts
Normal file
@@ -0,0 +1,19 @@
|
|||||||
|
import { Injectable } from '@nestjs/common';
|
||||||
|
import { Server } from 'socket.io';
|
||||||
|
|
||||||
|
@Injectable()
|
||||||
|
export class ChatRealtimeService {
|
||||||
|
private server?: Server;
|
||||||
|
|
||||||
|
bindServer(server: Server): void {
|
||||||
|
this.server = server;
|
||||||
|
}
|
||||||
|
|
||||||
|
emitNewMessage(conversationId: string, message: unknown): void {
|
||||||
|
this.server?.to(this.conversationRoom(conversationId)).emit('new_message', message);
|
||||||
|
}
|
||||||
|
|
||||||
|
private conversationRoom(conversationId: string): string {
|
||||||
|
return `conversation:${conversationId}`;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -5,27 +5,34 @@ import {
|
|||||||
MessageBody,
|
MessageBody,
|
||||||
OnGatewayConnection,
|
OnGatewayConnection,
|
||||||
OnGatewayDisconnect,
|
OnGatewayDisconnect,
|
||||||
|
OnGatewayInit,
|
||||||
SubscribeMessage,
|
SubscribeMessage,
|
||||||
WebSocketGateway,
|
WebSocketGateway,
|
||||||
WebSocketServer,
|
WebSocketServer,
|
||||||
} from '@nestjs/websockets';
|
} from '@nestjs/websockets';
|
||||||
import { Server, Socket } from 'socket.io';
|
import { Server, Socket } from 'socket.io';
|
||||||
|
import { ChatRealtimeService } from './chat-realtime.service';
|
||||||
import { ChatService } from './chat.service';
|
import { ChatService } from './chat.service';
|
||||||
import { SendMessageDto } from './dto/send-message.dto';
|
import { SendMessageDto } from './dto/send-message.dto';
|
||||||
|
|
||||||
type SocketWithUser = Socket & { data: { userId?: string } };
|
type SocketWithUser = Socket & { data: { userId?: string } };
|
||||||
|
|
||||||
@WebSocketGateway({ cors: { origin: '*' }, namespace: 'chat' })
|
@WebSocketGateway({ cors: { origin: '*' }, namespace: 'chat' })
|
||||||
export class ChatGateway implements OnGatewayConnection, OnGatewayDisconnect {
|
export class ChatGateway implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect {
|
||||||
@WebSocketServer()
|
@WebSocketServer()
|
||||||
server!: Server;
|
server!: Server;
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
private readonly chatService: ChatService,
|
private readonly chatService: ChatService,
|
||||||
|
private readonly chatRealtimeService: ChatRealtimeService,
|
||||||
private readonly jwtService: JwtService,
|
private readonly jwtService: JwtService,
|
||||||
private readonly configService: ConfigService,
|
private readonly configService: ConfigService,
|
||||||
) {}
|
) {}
|
||||||
|
|
||||||
|
afterInit(server: Server) {
|
||||||
|
this.chatRealtimeService.bindServer(server);
|
||||||
|
}
|
||||||
|
|
||||||
async handleConnection(client: SocketWithUser) {
|
async handleConnection(client: SocketWithUser) {
|
||||||
const token = this.extractToken(client);
|
const token = this.extractToken(client);
|
||||||
if (!token) {
|
if (!token) {
|
||||||
@@ -78,7 +85,6 @@ export class ChatGateway implements OnGatewayConnection, OnGatewayDisconnect {
|
|||||||
if (!userId) return;
|
if (!userId) return;
|
||||||
|
|
||||||
const message = await this.chatService.sendMessage(userId, dto);
|
const message = await this.chatService.sendMessage(userId, dto);
|
||||||
this.server.to(this.conversationRoom(message.conversationId.toString())).emit('new_message', message);
|
|
||||||
return message;
|
return message;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import { StorageModule } from '../../infrastructure/storage/storage.module';
|
|||||||
import { UsersModule } from '../users/users.module';
|
import { UsersModule } from '../users/users.module';
|
||||||
import { ChatController } from './chat.controller';
|
import { ChatController } from './chat.controller';
|
||||||
import { ChatGateway } from './chat.gateway';
|
import { ChatGateway } from './chat.gateway';
|
||||||
|
import { ChatRealtimeService } from './chat-realtime.service';
|
||||||
import { ChatService } from './chat.service';
|
import { ChatService } from './chat.service';
|
||||||
import { ChatRepository } from './chat.repository';
|
import { ChatRepository } from './chat.repository';
|
||||||
import { ChatBlock, ChatBlockSchema } from './schemas/chat-block.schema';
|
import { ChatBlock, ChatBlockSchema } from './schemas/chat-block.schema';
|
||||||
@@ -27,7 +28,7 @@ import { Message, MessageSchema } from './schemas/message.schema';
|
|||||||
]),
|
]),
|
||||||
],
|
],
|
||||||
controllers: [ChatController],
|
controllers: [ChatController],
|
||||||
providers: [ChatService, ChatRepository, ChatGateway],
|
providers: [ChatService, ChatRepository, ChatRealtimeService, ChatGateway],
|
||||||
exports: [ChatService],
|
exports: [ChatService],
|
||||||
})
|
})
|
||||||
export class ChatModule {}
|
export class ChatModule {}
|
||||||
|
|||||||
105
src/modules/chat/chat.service.spec.ts
Normal file
105
src/modules/chat/chat.service.spec.ts
Normal file
@@ -0,0 +1,105 @@
|
|||||||
|
import { Types } from 'mongoose';
|
||||||
|
import { ChatService } from './chat.service';
|
||||||
|
|
||||||
|
describe('ChatService realtime message broadcasting', () => {
|
||||||
|
const senderId = new Types.ObjectId().toString();
|
||||||
|
const recipientId = new Types.ObjectId().toString();
|
||||||
|
const conversationId = new Types.ObjectId().toString();
|
||||||
|
|
||||||
|
let chatRepository: Record<string, jest.Mock>;
|
||||||
|
let notificationsService: Record<string, jest.Mock>;
|
||||||
|
let storageService: Record<string, jest.Mock>;
|
||||||
|
let chatRealtimeService: Record<string, jest.Mock>;
|
||||||
|
let service: ChatService;
|
||||||
|
|
||||||
|
const conversation = {
|
||||||
|
id: conversationId,
|
||||||
|
participantIds: [new Types.ObjectId(senderId), new Types.ObjectId(recipientId)],
|
||||||
|
};
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
chatRepository = {
|
||||||
|
findConversationById: jest.fn().mockResolvedValue(conversation),
|
||||||
|
findAnyBlockBetween: jest.fn().mockResolvedValue(null),
|
||||||
|
createMessage: jest.fn(),
|
||||||
|
updateConversationAfterNewMessage: jest.fn().mockResolvedValue(conversation),
|
||||||
|
findMessageById: jest.fn(),
|
||||||
|
};
|
||||||
|
notificationsService = {
|
||||||
|
createMessageNotification: jest.fn().mockResolvedValue(null),
|
||||||
|
};
|
||||||
|
storageService = {
|
||||||
|
saveFile: jest.fn().mockResolvedValue('/uploads/chat/media/message.jpg'),
|
||||||
|
deleteFile: jest.fn().mockResolvedValue(undefined),
|
||||||
|
};
|
||||||
|
chatRealtimeService = {
|
||||||
|
emitNewMessage: jest.fn(),
|
||||||
|
};
|
||||||
|
|
||||||
|
service = new ChatService(
|
||||||
|
chatRepository as any,
|
||||||
|
{} as any,
|
||||||
|
notificationsService as any,
|
||||||
|
storageService as any,
|
||||||
|
chatRealtimeService as any,
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('emits new_message after a text message is created through the service', async () => {
|
||||||
|
const message = {
|
||||||
|
id: new Types.ObjectId().toString(),
|
||||||
|
conversationId: new Types.ObjectId(conversationId),
|
||||||
|
senderId: new Types.ObjectId(senderId),
|
||||||
|
content: 'hello',
|
||||||
|
messageType: 'text',
|
||||||
|
};
|
||||||
|
chatRepository.createMessage.mockResolvedValue(message);
|
||||||
|
|
||||||
|
const result = await service.sendMessage(senderId, {
|
||||||
|
conversationId,
|
||||||
|
content: 'hello',
|
||||||
|
messageType: 'text',
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(result).toBe(message);
|
||||||
|
expect(chatRealtimeService.emitNewMessage).toHaveBeenCalledTimes(1);
|
||||||
|
expect(chatRealtimeService.emitNewMessage).toHaveBeenCalledWith(conversationId, message);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('emits new_message after a media upload message is created through the service', async () => {
|
||||||
|
const message = {
|
||||||
|
id: new Types.ObjectId().toString(),
|
||||||
|
conversationId: new Types.ObjectId(conversationId),
|
||||||
|
senderId: new Types.ObjectId(senderId),
|
||||||
|
content: '',
|
||||||
|
mediaUrl: '/uploads/chat/media/message.jpg',
|
||||||
|
messageType: 'image',
|
||||||
|
};
|
||||||
|
chatRepository.createMessage.mockResolvedValue(message);
|
||||||
|
|
||||||
|
const result = await service.sendMessageWithUpload(
|
||||||
|
senderId,
|
||||||
|
{
|
||||||
|
conversationId,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
mimetype: 'image/jpeg',
|
||||||
|
size: 1024,
|
||||||
|
buffer: Buffer.from('image'),
|
||||||
|
originalname: 'message.jpg',
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
expect(result).toBe(message);
|
||||||
|
expect(chatRepository.createMessage).toHaveBeenCalledWith(
|
||||||
|
expect.objectContaining({
|
||||||
|
conversationId,
|
||||||
|
senderId,
|
||||||
|
messageType: 'image',
|
||||||
|
mediaUrl: '/uploads/chat/media/message.jpg',
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
expect(chatRealtimeService.emitNewMessage).toHaveBeenCalledTimes(1);
|
||||||
|
expect(chatRealtimeService.emitNewMessage).toHaveBeenCalledWith(conversationId, message);
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -11,6 +11,7 @@ import { UsersRepository } from '../users/users.repository';
|
|||||||
import { CreateConversationDto } from './dto/create-conversation.dto';
|
import { CreateConversationDto } from './dto/create-conversation.dto';
|
||||||
import { MessageQueryDto } from './dto/message-query.dto';
|
import { MessageQueryDto } from './dto/message-query.dto';
|
||||||
import { SendMessageDto } from './dto/send-message.dto';
|
import { SendMessageDto } from './dto/send-message.dto';
|
||||||
|
import { ChatRealtimeService } from './chat-realtime.service';
|
||||||
import { ChatRepository } from './chat.repository';
|
import { ChatRepository } from './chat.repository';
|
||||||
|
|
||||||
@Injectable()
|
@Injectable()
|
||||||
@@ -22,6 +23,7 @@ export class ChatService {
|
|||||||
private readonly usersRepository: UsersRepository,
|
private readonly usersRepository: UsersRepository,
|
||||||
private readonly notificationsService: NotificationsService,
|
private readonly notificationsService: NotificationsService,
|
||||||
private readonly storageService: ManagedStorageService,
|
private readonly storageService: ManagedStorageService,
|
||||||
|
private readonly chatRealtimeService: ChatRealtimeService,
|
||||||
) {}
|
) {}
|
||||||
|
|
||||||
async createConversation(currentUserId: string, dto: CreateConversationDto) {
|
async createConversation(currentUserId: string, dto: CreateConversationDto) {
|
||||||
@@ -169,6 +171,7 @@ export class ChatService {
|
|||||||
currentUserId,
|
currentUserId,
|
||||||
preview,
|
preview,
|
||||||
);
|
);
|
||||||
|
this.chatRealtimeService.emitNewMessage(message.conversationId.toString(), message);
|
||||||
await this.dispatchMessageNotifications(
|
await this.dispatchMessageNotifications(
|
||||||
currentUserId,
|
currentUserId,
|
||||||
conversation.participantIds.map((id) => id.toString()),
|
conversation.participantIds.map((id) => id.toString()),
|
||||||
|
|||||||
المرجع في مشكلة جديدة
حظر مستخدم