๐๐ปโโ๏ธ Subject
- Message Queue์ ๋ํ์ฌ ์์๋ณด์.
๐ฏ Goal for Research
[x] Message Queue์ ๊ฐ๋ ์ ์ดํด.
[x] Nestjs์์ Message Queue๋ฅผ ์ ํ ํ ์ ์๋ค.
[x] Message Queue์ Message ๋ฅผ ์ถ๊ฐํ ์ ์๋ค.
[x] Message Queue์์ Message๋ฅผ ์ฝ์ด ์ฌ์ฉํ ์ ์๋ค.
๐ Contents
๊ฐ๋
ํ๋ก๊ทธ๋จ ๊ฐ์ ๋ฐ์ดํฐ๋ฅผ ๊ตํํ ๋ ์ฌ์ฉํ๋ ํต์ ๋ฐฉ๋ฒ
๋ฉ์์ง ์งํฅ ๋ฏธ๋ค์จ์ด(Message Oriented Middleware:MOM)๋ฅผ ๊ตฌํํ ์์คํ
Message Queue์๋ 3๊ฐ์ ์ดํด๊ด๊ณ์๊ฐ ์กด์ฌ
Message Queue : Message ์์ ์ ์ฅ ๋ฒํผ๋ก Consumer๊ฐ ์ฝ์ ๋๊น์ง ๋ณด๊ด.
Producer : Message ์์ฑ์๋ก Message๋ฅผ ๋ง๋ค์ด MQ์ ์ฝ์ ํ๋ ์ญํ .
Consumer : Message Queue์ ์๋ ๋ฉ์ธ์ง๋ฅผ ์ฝ๊ณ ์ฒ๋ฆฌํ๋ ์๋ฒ.
Producer๊ฐ ์์ฑํ Message๋ฅผ Message Queue๋ Message ์์ ์ ์ฅ Buffer ์ญํ ์ ์ํํ๊ณ , ์ด๋ฅผ Consumer๊ฐ ๊ฐ์ ธ๊ฐ์ ์คํํ๊ฒ๋๋ค.
์ฌ์ฉ์ด์
์์ ์ฑ ํฅ์
- ํธ๋ํฝ ๊ณผ๋ค ํ์์ผ๋ก ๋ณ๋ชฉ์ด ๋ฐ์ํ ๊ฒฝ์ฐ Data Loss๊ฐ ๋ฐ์ํ ์ ์๋๋ฐ Message Queue์ ์ ์ฅํจ์ผ๋ก์จ ์๋ฒ๊ฐ ์ฃฝ์ด๋ ์ฌ์ฒ๋ฆฌ ๊ฐ๋ฅ.
๋น๋๊ธฐ์ฒ๋ฆฌ
Producer & Comsumer๊ฐ Request -> ๋๊ธฐ -> Response๋ฅผ ๊ธฐ๋ค๋ฆฌ์ง ์๊ณ ๋ ๋ฆฝ์ ์ผ๋ก ์๋ํ ์ ์๋๋ก ํจ. ์ฆ, ์ฒ๋ฆฌ ์๋๊ฐ ๋ค๋ฅธ ํ๋ก์ธ์ค ๊ฐ ๋ณ๋ชฉ ํ์ ๋ฐฉ์ง.
์ด๋ ์๋น์ค๊ฐ ์์กด์ฑ์ ๋ฎ์ถค.
์๋น์ค ํ์ฅ์ฑ
- Message Queue์ ๋ฑ๋ก๋ ๋ฐ์ดํฐ๋ ์ฌ๋ฌ Consumer๊ฐ ์ฌ์ฉํ ์ ์๋ค. ์ด๋ฅผ ํตํด ๋ถํ๋ถ์ฐ ํจ๊ณผ๊ฐ ์์ผ๋ฉฐ ํธ๋ํฝ ์ฆ๊ฐ์๋ ์์ ์ ๋์์ด ๊ฐ๋ฅํ๊ฒ ํจ.
์ฌ์ฉ ์์
์ฑํ ์์คํ
- ์ฌ์ฉ์๊ฐ ๋ฉ์ธ์ง ์ ์ก ์, Socket Emit์ด ๋๊ณ Onํ ์ชฝ์์ ๋ฆฌ์ค๋ํ๊ฒ๋๋๋ฐ ์ด๋ฌํ ๋ฉ์ธ์ง๋ฅผ ๋ฉ์ธ์ง ํ์ ์ ์ฅํ๊ณ , Onํ ์ชฝ์์ ์์ ์ด ์๋ฃ๋๋ฉด ๋ฉ์ธ์งํ์์ ์ญ์ ํจ์ผ๋ก์จ ๋ฉ์ธ์ง ์์ค์ ๋ฐฉ์ง.
์ค์๊ฐ ์๋ฆผ ์๋น์ค
- ์ฌ์ฉ์๊ฐ ํน์ ์ด๋ฒคํธ(์ชฝ์ง ๋ฑ)์ ๋ํ์ฌ ์๋ฆผ ๊ตฌ๋ ์, ๋ฉ์ธ์ง ํ์ ์ ๋ณด๋ฅผ ์ ์ฅ ํ ์ด๋ฒคํธ๊ฐ ๋ฐ์(ex> ์ฌ์ฉ์ ๋ก๊ทธ์ธ)ํ๋ฉด ๋ฉ์ธ์ง ํ๋ฅผ ํตํด ๊ตฌ๋ ์์๊ฒ ์๋ฆผ ์ ์ก.
Web3 ์ด๋ฒคํธ ์์
- Smart Contract์์ ๋ฐ์ํ๋ ์ด๋ฒคํธ๋ฅผ Message Queue์ ์ ์ฅํ๊ณ , xxx์ด๋ฒคํธ๊ฐ ๋ฐ์ํ์ ๋ ์ ์ ํ ์ฒ๋ฆฌํ๋ ์๋ฒ(Consumer) ๊ฐ ์ก์์ ์ฒ๋ฆฌ๋ฅผ ํ์ ๋ ์์ ์ฑ ๋๊ฒ Event ์์ ์ด ๊ฐ๋ฅํ๋ค.
Message Queue ๊ธฐ๋ณธ ์ฌ์ฉ ์ต์
Visibility Timeout (์ ํ์๊ฐ ์ด๊ณผ)
Consumer๊ฐ Message Queue ๋ฐ์ดํฐ๋ฅผ ์ฝ์ ๋ค, ์ฆ์ ๋ฐ์ดํฐ๋ฅผ ์ญ์ ํ์ง ์๊ณ ๋จ๊ฒจ๋๋ ๊ธฐ๊ฐ.
Consumer๊ฐ Message ์ฒ๋ฆฌ์ ๋ค์ ์๊ฐ์ด ์์๋๋ ๊ฒฝ์ฐ ์ ์ฉ.
์ฅ์ :
Consumer๊ฐ ๋ฉ์ธ์ง ์ฒ๋ฆฌ ์คํจ ์, ๋ค๋ฅธ Consumer๊ฐ ๋ฉ์ธ์ง ์ฒ๋ฆฌํ ์ ์์.
Message Data Loss ๋ฐฉ์ง
๋จ์ :
- Message Queue ์ฉ๋ ์๋ชจ๋ ์ฆ๋
Delay Queue (์ง์ฐ ํ)
๋ฉ์์ง๊ฐ ํ์ ์ ์ฅ๋ ํ ํน์ ์๊ฐ์ด ์ง๋์ผ Consumer๊ฐ ์ฝ์ ์ ์๋ ํ.
๋ฉ์์ง ์ ์ก์ ์ผ์ ์๊ฐ ์ง์ฐ์ํค๊ฑฐ๋, ํน์ ์์๋๋ก ๋ฉ์์ง๋ฅผ ์ฒ๋ฆฌํด์ผ ํ๋ ๊ฒฝ์ฐ ์ ์ฉ.
์ฅ์ :
๋ถํ ๋ถ์ฐ
๋ฉ์ธ์ง ์์๋๋ก ์ฒ๋ฆฌ ํ ์ ์ ์ฉํจ.
๋จ์ :
- ์ฒ๋ฆฌ์์๊ฐ ์ค์ํ์ง ์์ ๊ฒฝ์ฐ ์ค๋ฒํค๋๋ก ์์ฉ.
Exclusive Queue (๋ ์ ํ)
ํ ๋ฒ์ ํ๋์ Consumer๋ง ํ์ ์ ๊ทผํ ์ ์๋๋ก ํ๋ ํ
์ฅ์ :
- ๋ฐ์ดํฐ ์ถฉ๋ ๋ฐฉ์ง
๋จ์ :
- ๋น๋๊ธฐ ์ฒ๋ฆฌ ํจ์จ์ฑ ์ ํ
Message Queue ๊ตฌํ ๋ฐ๋ชจ
- Redis Docker๋ก ์คํ
docker pull redis
sudo docker run -p 6379:6379 redis
- bull ์ค์น
npm install --save @nestjs/bull bull
๋ฉ์ธ์ง๋ฅผ ๋ฑ๋กํ๋ producer์ฉ API๋ฅผ ์์ฑํฉ๋๋ค.
app.module์ Bull Module ๋ฑ๋ก
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
// Nestjs Bull
import { BullModule } from '@nestjs/bull';
import { AppConsumer } from './consumer.service';
@Module({
imports: [
BullModule.forRoot({
// Redis Host ์ค์ ์ต์
redis: {
host: '127.0.1',
port: 6379,
},
limiter : {
max : 3, // ์ด๋น ์ต๋ 1๊ฐ์ ์์
์ ํ์ฉ
duration : 1 // 1์ด๋์ ์ ํ ์ ์ฉ
}
}),
BullModule.registerQueue({
name : "my-queue"
}),
],
controllers: [AppController],
providers: [AppService, AppConsumer],
})
export class AppModule {}
- Controller์ API ์์ฑ
export class AppController {
constructor(private readonly appService: AppService) {}
@Get()
addMessage(@Body() data : any) {
return this.appService.addMessageQueue(data);
}
}
- Service ์์ฑ
export class AppService {
constructor(
@InjectQueue("my-queue") private testQueue : Queue
){}
async addMessageQueue(data : any) {
const job = await this.testQueue.add('my-queue', data, {
delay : 1500,
// priority : ์ฐ์ ์์ ์ค์
});
return job;
}
์ API๋ฅผ ํตํด Message๋ฅผ ๋ฑ๋กํ๋ฉด Redis์ ์๋์ฒ๋ผ ์์ฑ๋๋ ๊ฒ์ ํ์ธํ ์ ์๋ค.
- ์ด์ ๋ฉ์ธ์ง๋ฅผ ์๋นํ Consumer๋ฅผ ์์ฑํ๋ค.
// consumer.service.ts
import { Injectable } from '@nestjs/common';
import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';
@Processor('my-queue')
export class AppConsumer {
@Process('my-queue')
async processData(job: Job) {
console.log('id', job.id);
console.log('data :', job.data);
// ์๋ฃ์ฒ๋ฆฌ๋ฅผ ์งํํจ. ์ ์์ฒ๋ฆฌ๋์์ ๋ ๊ฒฐ๊ณผ๊ฐ์ ์๋์ ๋ฃ์ด์ค์ผํจ.
job.moveToCompleted("์์ฒ๋ฆฌ โ
");
}
}
- ์ฒ๋ฆฌ๊ฐ ์๋ฃ๋์์ ๋, ๊ฒฐ๊ณผ๊ฐ์ด ์๋์ ๊ฐ์ด ์ ๋ฐ์ดํธ ๋๋ ๊ฒ์ ํ์ธ ํ ์ ์๋ค.