[Message Queue - 1๋ถ€] Nestjs Message Queue & Redis

[Message Queue - 1๋ถ€] Nestjs Message Queue & Redis

ยท

4 min read

๐Ÿ’๐Ÿปโ€โ™‚๏ธ Subject

  • Message Queue์— ๋Œ€ํ•˜์—ฌ ์•Œ์•„๋ณด์ž.

๐ŸŽฏ Goal for Research

  • [x] Message Queue์˜ ๊ฐœ๋…์„ ์ดํ•ด.

  • [x] Nestjs์—์„œ Message Queue๋ฅผ ์…‹ํŒ…ํ•  ์ˆ˜ ์žˆ๋‹ค.

  • [x] Message Queue์— Message ๋ฅผ ์ถ”๊ฐ€ํ•  ์ˆ˜ ์žˆ๋‹ค.

  • [x] Message Queue์—์„œ Message๋ฅผ ์ฝ์–ด ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค.

๐Ÿ“ƒ Contents

๊ฐœ๋…

  • ํ”„๋กœ๊ทธ๋žจ ๊ฐ„์— ๋ฐ์ดํ„ฐ๋ฅผ ๊ตํ™˜ํ•  ๋•Œ ์‚ฌ์šฉํ•˜๋Š” ํ†ต์‹  ๋ฐฉ๋ฒ•

  • ๋ฉ”์‹œ์ง€ ์ง€ํ–ฅ ๋ฏธ๋“ค์›จ์–ด(Message Oriented Middleware:MOM)๋ฅผ ๊ตฌํ˜„ํ•œ ์‹œ์Šคํ…œ

  • Message Queue์—๋Š” 3๊ฐœ์˜ ์ดํ•ด๊ด€๊ณ„์ž๊ฐ€ ์กด์žฌ

    • Message Queue : Message ์ž„์‹œ ์ €์žฅ ๋ฒ„ํผ๋กœ Consumer๊ฐ€ ์ฝ์„ ๋•Œ๊นŒ์ง€ ๋ณด๊ด€.

    • Producer : Message ์ƒ์„ฑ์ž๋กœ Message๋ฅผ ๋งŒ๋“ค์–ด MQ์— ์‚ฝ์ž…ํ•˜๋Š” ์—ญํ• .

    • Consumer : Message Queue์— ์žˆ๋Š” ๋ฉ”์„ธ์ง€๋ฅผ ์ฝ๊ณ  ์ฒ˜๋ฆฌํ•˜๋Š” ์„œ๋ฒ„.

  • Producer๊ฐ€ ์ƒ์„ฑํ•œ Message๋ฅผ Message Queue๋Š” Message ์ž„์‹œ ์ €์žฅ Buffer ์—ญํ• ์„ ์ˆ˜ํ–‰ํ•˜๊ณ , ์ด๋ฅผ Consumer๊ฐ€ ๊ฐ€์ ธ๊ฐ€์„œ ์‹คํ–‰ํ•˜๊ฒŒ๋œ๋‹ค.

์‚ฌ์šฉ์ด์œ 

์•ˆ์ •์„ฑ ํ–ฅ์ƒ

  • ํŠธ๋ž˜ํ”ฝ ๊ณผ๋‹ค ํ˜„์ƒ์œผ๋กœ ๋ณ‘๋ชฉ์ด ๋ฐœ์ƒํ•  ๊ฒฝ์šฐ Data Loss๊ฐ€ ๋ฐœ์ƒํ•  ์ˆ˜ ์žˆ๋Š”๋ฐ Message Queue์— ์ €์žฅํ•จ์œผ๋กœ์จ ์„œ๋ฒ„๊ฐ€ ์ฃฝ์–ด๋„ ์žฌ์ฒ˜๋ฆฌ ๊ฐ€๋Šฅ.

๋น„๋™๊ธฐ์ฒ˜๋ฆฌ

  • Producer & Comsumer๊ฐ„ Request -> ๋Œ€๊ธฐ -> Response๋ฅผ ๊ธฐ๋‹ค๋ฆฌ์ง€ ์•Š๊ณ  ๋…๋ฆฝ์ ์œผ๋กœ ์ž‘๋™ํ•  ์ˆ˜ ์žˆ๋„๋ก ํ•จ. ์ฆ‰, ์ฒ˜๋ฆฌ ์†๋„๊ฐ€ ๋‹ค๋ฅธ ํ”„๋กœ์„ธ์Šค ๊ฐ„ ๋ณ‘๋ชฉ ํ˜„์ƒ ๋ฐฉ์ง€.

  • ์ด๋Š” ์„œ๋น„์Šค๊ฐ„ ์˜์กด์„ฑ์„ ๋‚ฎ์ถค.

์„œ๋น„์Šค ํ™•์žฅ์„ฑ

  • Message Queue์— ๋“ฑ๋ก๋œ ๋ฐ์ดํ„ฐ๋Š” ์—ฌ๋Ÿฌ Consumer๊ฐ€ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค. ์ด๋ฅผ ํ†ตํ•ด ๋ถ€ํ•˜๋ถ„์‚ฐ ํšจ๊ณผ๊ฐ€ ์žˆ์œผ๋ฉฐ ํŠธ๋ž˜ํ”ฝ ์ฆ๊ฐ€์—๋„ ์•ˆ์ •์  ๋™์ž‘์ด ๊ฐ€๋Šฅํ•˜๊ฒŒ ํ•จ.

์‚ฌ์šฉ ์˜ˆ์‹œ

์ฑ„ํŒ… ์‹œ์Šคํ…œ

  • ์‚ฌ์šฉ์ž๊ฐ€ ๋ฉ”์„ธ์ง€ ์ „์†ก ์‹œ, Socket Emit์ด ๋˜๊ณ  Onํ•œ ์ชฝ์—์„œ ๋ฆฌ์Šค๋‹ํ•˜๊ฒŒ๋˜๋Š”๋ฐ ์ด๋Ÿฌํ•œ ๋ฉ”์„ธ์ง€๋ฅผ ๋ฉ”์„ธ์ง€ ํ์— ์ €์žฅํ•˜๊ณ , Onํ•œ ์ชฝ์—์„œ ์ˆ˜์‹ ์ด ์™„๋ฃŒ๋˜๋ฉด ๋ฉ”์„ธ์ง€ํ์—์„œ ์‚ญ์ œํ•จ์œผ๋กœ์จ ๋ฉ”์„ธ์ง€ ์†์‹ค์„ ๋ฐฉ์ง€.

์‹ค์‹œ๊ฐ„ ์•Œ๋ฆผ ์„œ๋น„์Šค

  • ์‚ฌ์šฉ์ž๊ฐ€ ํŠน์ • ์ด๋ฒคํŠธ(์ชฝ์ง€ ๋“ฑ)์— ๋Œ€ํ•˜์—ฌ ์•Œ๋ฆผ ๊ตฌ๋… ์‹œ, ๋ฉ”์„ธ์ง€ ํ์— ์ •๋ณด๋ฅผ ์ €์žฅ ํ›„ ์ด๋ฒคํŠธ๊ฐ€ ๋ฐœ์ƒ(ex> ์‚ฌ์šฉ์ž ๋กœ๊ทธ์ธ)ํ•˜๋ฉด ๋ฉ”์„ธ์ง€ ํ๋ฅผ ํ†ตํ•ด ๊ตฌ๋…์ž์—๊ฒŒ ์•Œ๋ฆผ ์ „์†ก.

Web3 ์ด๋ฒคํŠธ ์ˆ˜์‹ 

  • Smart Contract์—์„œ ๋ฐœ์ƒํ•˜๋Š” ์ด๋ฒคํŠธ๋ฅผ Message Queue์— ์ €์žฅํ•˜๊ณ , xxx์ด๋ฒคํŠธ๊ฐ€ ๋ฐœ์ƒํ–ˆ์„ ๋•Œ ์ ์ ˆํžˆ ์ฒ˜๋ฆฌํ•˜๋Š” ์„œ๋ฒ„(Consumer) ๊ฐ€ ์žก์•„์„œ ์ฒ˜๋ฆฌ๋ฅผ ํ–ˆ์„ ๋•Œ ์•ˆ์ •์„ฑ ๋†’๊ฒŒ Event ์ˆ˜์‹ ์ด ๊ฐ€๋Šฅํ•˜๋‹ค.

Message Queue ๊ธฐ๋ณธ ์‚ฌ์šฉ ์˜ต์…˜

Visibility Timeout (์ œํ•œ์‹œ๊ฐ„ ์ดˆ๊ณผ)

  • Consumer๊ฐ€ Message Queue ๋ฐ์ดํ„ฐ๋ฅผ ์ฝ์€ ๋’ค, ์ฆ‰์‹œ ๋ฐ์ดํ„ฐ๋ฅผ ์‚ญ์ œํ•˜์ง€ ์•Š๊ณ  ๋‚จ๊ฒจ๋‘๋Š” ๊ธฐ๊ฐ„.

  • Consumer๊ฐ€ Message ์ฒ˜๋ฆฌ์— ๋‹ค์†Œ ์‹œ๊ฐ„์ด ์†Œ์š”๋˜๋Š” ๊ฒฝ์šฐ ์œ ์šฉ.

  • ์žฅ์  :

    • Consumer๊ฐ€ ๋ฉ”์„ธ์ง€ ์ฒ˜๋ฆฌ ์‹คํŒจ ์‹œ, ๋‹ค๋ฅธ Consumer๊ฐ€ ๋ฉ”์„ธ์ง€ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ์Œ.

    • Message Data Loss ๋ฐฉ์ง€

  • ๋‹จ์  :

    • Message Queue ์šฉ๋Ÿ‰ ์†Œ๋ชจ๋Ÿ‰ ์ฆ๋Œ€

Delay Queue (์ง€์—ฐ ํ)

  • ๋ฉ”์‹œ์ง€๊ฐ€ ํ์— ์ €์žฅ๋œ ํ›„ ํŠน์ • ์‹œ๊ฐ„์ด ์ง€๋‚˜์•ผ Consumer๊ฐ€ ์ฝ์„ ์ˆ˜ ์žˆ๋Š” ํ.

  • ๋ฉ”์‹œ์ง€ ์ „์†ก์„ ์ผ์ • ์‹œ๊ฐ„ ์ง€์—ฐ์‹œํ‚ค๊ฑฐ๋‚˜, ํŠน์ • ์ˆœ์„œ๋Œ€๋กœ ๋ฉ”์‹œ์ง€๋ฅผ ์ฒ˜๋ฆฌํ•ด์•ผ ํ•˜๋Š” ๊ฒฝ์šฐ ์œ ์šฉ.

  • ์žฅ์  :

    • ๋ถ€ํ•˜ ๋ถ„์‚ฐ

    • ๋ฉ”์„ธ์ง€ ์ˆœ์„œ๋Œ€๋กœ ์ฒ˜๋ฆฌ ํ•  ์‹œ ์œ ์šฉํ•จ.

  • ๋‹จ์  :

    • ์ฒ˜๋ฆฌ์ˆœ์„œ๊ฐ€ ์ค‘์š”ํ•˜์ง€ ์•Š์€ ๊ฒฝ์šฐ ์˜ค๋ฒ„ํ—ค๋“œ๋กœ ์ž‘์šฉ.

Exclusive Queue (๋…์  ํ)

  • ํ•œ ๋ฒˆ์— ํ•˜๋‚˜์˜ Consumer๋งŒ ํ์— ์ ‘๊ทผํ•  ์ˆ˜ ์žˆ๋„๋ก ํ•˜๋Š” ํ

  • ์žฅ์  :

    • ๋ฐ์ดํ„ฐ ์ถฉ๋Œ ๋ฐฉ์ง€
  • ๋‹จ์  :

    • ๋น„๋™๊ธฐ ์ฒ˜๋ฆฌ ํšจ์œจ์„ฑ ์ €ํ•˜

Message Queue ๊ตฌํ˜„ ๋ฐ๋ชจ

  • Redis Docker๋กœ ์‹คํ–‰
docker pull redis
sudo docker run -p 6379:6379 redis
  • bull ์„ค์น˜
npm install --save @nestjs/bull bull
  • ๋ฉ”์„ธ์ง€๋ฅผ ๋“ฑ๋กํ•˜๋Š” producer์šฉ API๋ฅผ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค.

  • app.module์— Bull Module ๋“ฑ๋ก

import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';

// Nestjs Bull
import { BullModule } from '@nestjs/bull';
import { AppConsumer } from './consumer.service';



@Module({
    imports: [
        BullModule.forRoot({
        // Redis Host ์„ค์ • ์˜ต์…˜
            redis: {
                host: '127.0.1',
                port: 6379,
            },
            limiter : {
                max : 3, // ์ดˆ๋‹น ์ตœ๋Œ€ 1๊ฐœ์˜ ์ž‘์—…์„ ํ—ˆ์šฉ
                duration : 1 // 1์ดˆ๋™์•ˆ ์ œํ•œ ์ ์šฉ
            }
        }),

        BullModule.registerQueue({
            name : "my-queue"
        }),
    ],
    controllers: [AppController],
    providers: [AppService, AppConsumer],
})

export class AppModule {}
  • Controller์— API ์ƒ์„ฑ
export class AppController {
    constructor(private readonly appService: AppService) {}
    @Get()
    addMessage(@Body() data : any) {
        return this.appService.addMessageQueue(data);
    }
}
  • Service ์ƒ์„ฑ
export class AppService {
    constructor(
        @InjectQueue("my-queue") private testQueue : Queue
    ){}

async addMessageQueue(data : any) {
    const job = await this.testQueue.add('my-queue', data, {    
        delay : 1500,
        // priority : ์šฐ์„ ์ˆœ์œ„ ์„ค์ •
    });

    return job;
}

์œ„ API๋ฅผ ํ†ตํ•ด Message๋ฅผ ๋“ฑ๋กํ•˜๋ฉด Redis์— ์•„๋ž˜์ฒ˜๋Ÿผ ์ƒ์„ฑ๋˜๋Š” ๊ฒƒ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

  • ์ด์ œ ๋ฉ”์„ธ์ง€๋ฅผ ์†Œ๋น„ํ•  Consumer๋ฅผ ์ƒ์„ฑํ•œ๋‹ค.
// consumer.service.ts
import { Injectable } from '@nestjs/common';
import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';

@Processor('my-queue')
export class AppConsumer {
    @Process('my-queue')
    async processData(job: Job) {
        console.log('id', job.id);
        console.log('data :', job.data);

        // ์™„๋ฃŒ์ฒ˜๋ฆฌ๋ฅผ ์ง„ํ–‰ํ•จ. ์ •์ƒ์ฒ˜๋ฆฌ๋˜์—ˆ์„ ๋•Œ ๊ฒฐ๊ณผ๊ฐ’์„ ์•„๋ž˜์— ๋„ฃ์–ด์ค˜์•ผํ•จ.
        job.moveToCompleted("์ž˜์ฒ˜๋ฆฌ โœ…");
    }
}
  • ์ฒ˜๋ฆฌ๊ฐ€ ์™„๋ฃŒ๋˜์—ˆ์„ ๋•Œ, ๊ฒฐ๊ณผ๊ฐ’์ด ์•„๋ž˜์™€ ๊ฐ™์ด ์—…๋ฐ์ดํŠธ ๋˜๋Š” ๊ฒƒ์„ ํ™•์ธ ํ•  ์ˆ˜ ์žˆ๋‹ค.

ย