yoncho`s blog

[RabbitMQ] 7. Code Example 본문

기술, 나의 공부를 공유합니다./MQTT

[RabbitMQ] 7. Code Example

욘초 2023. 6. 24. 13:46

자동으로 RabbitMQ 3개로 구성된 Cluster 구축하고 Queue Mirroring 정책을 설정하는 Docker-compose.yml 

더보기
version: '3'
services:
  rabbitmq1:
    container_name : rabbitmq1
    image: rabbitmq:3-management
    hostname: rabbit1
    ports:
      - "15672:15672"
      - "5672:5672"
    networks:
      - rabbitmq-network
    environment:
      RABBITMQ_ERLANG_COOKIE: 1234
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest
    command: >
      bash -c "
        rabbitmq-server -detached;
        rabbitmqctl wait /var/lib/rabbitmq/mnesia/rabbit\@rabbit1.pid;
        rabbitmqctl set_policy ha-policy-name '^your-queue-name$' '{\"ha-mode\":\"exactly\",\"ha-params\":2}' --apply-to queues
        rabbitmqctl start_app;
        sleep infinity
      "

  rabbitmq2:
    container_name : rabbitmq2
    image: rabbitmq:3-management
    hostname: rabbit2
    networks:
      - rabbitmq-network
    environment:
      RABBITMQ_ERLANG_COOKIE: 1234
    links:
      - rabbitmq1:rabbit1
    command: >
      bash -c "
        rabbitmq-server -detached;
        rabbitmqctl wait /var/lib/rabbitmq/mnesia/rabbit\@rabbit2.pid;
        rabbitmqctl stop_app;
        rabbitmqctl join_cluster rabbit@rabbit1;
        rabbitmqctl start_app;
        sleep infinity
      "
    depends_on:
      - rabbitmq1

  rabbitmq3:
    container_name : rabbitmq3
    image: rabbitmq:3-management
    hostname: rabbit3
    networks:
      - rabbitmq-network
    environment:
      RABBITMQ_ERLANG_COOKIE: 1234
    links:
      - rabbitmq1:rabbit1
      - rabbitmq2:rabbit2
    command: >
      bash -c "
        rabbitmq-server -detached;
        rabbitmqctl wait /var/lib/rabbitmq/mnesia/rabbit\@rabbit3.pid;
        rabbitmqctl stop_app;
        rabbitmqctl join_cluster rabbit@rabbit1;
        rabbitmqctl start_app;
        sleep infinity
      "
    depends_on:
      - rabbitmq2

networks:
  rabbitmq-network:
    driver: bridge

 


RabbitMQ가 ShutDown 될 시 Local에 Publish 할 메시지를 임시로 저장하고, 다시 연결되었을 때 Local 파일에 쌓인 메시지 먼서 Publish하는 Publisher.js

더보기
const crypto = require('crypto'); 
const fs = require('fs'); //fileSystem
const amqp = require('amqplib');
const localDataPath = './rejectPublishData.txt';

class RabbitMQ{
    constructor(url, connection, channel, exchangeName, exchangeType, queueName, routingKey){
        this.url = url;
        this.connection = connection;
        this.channel = channel;
        this.exchangeName = exchangeName;
        this.exchangeType = exchangeType;
        this.queueName = queueName;
        this.routingKey = routingKey;
    }
}

// RabbitMQ 서버에 연결하는 함수
async function connectRabbitMQ(rabbitmq) {
    rabbitmq.connection = await amqp.connect(rabbitmq.url); // RabbitMQ 서버 주소
    rabbitmq.channel = await rabbitmq.connection.createChannel();
}

// RabbitMQ 서버에 Exchange, Queue 생성 및 Bind
async function settingExchangeAndQueue(rabbitmq){
    await rabbitmq.channel.assertExchange(rabbitmq.exchangeName, rabbitmq.exchangeType, { arguments: {durable: false} }, {mandatory:true});
    await rabbitmq.channel.assertQueue(rabbitmq.queueName, 
        {arguments: {
            'x-max-length':10,
            'x-overflow':'reject-publish', 
            'x-message-ttl':5000,
            durable: false} });
    await rabbitmq.channel.bindQueue(rabbitmq.queueName, rabbitmq.exchangeName, rabbitmq.routingKey);
}

  // 메시지를 RabbitMQ에 게시하는 함수
async function publishMessage(rabbitmq, message) {
    try{   
        rabbitmq.channel.publish(rabbitmq.exchangeName, rabbitmq.routingKey, Buffer.from(message), {mandatory:true, persistent: true});
        console.log(`[+]: ${message}`);
        return true;
    }
    catch(error){
        console.error('[error]: ', error);
        return false;
    }
}

async function readDataFromLocal(filePath) {
    try {
      const fileData = fs.readFileSync(filePath, 'utf8');
      fs.unlinkSync(filePath);

      return fileData;
    } catch (err) {
      console.error('파일을 읽어올 수 없습니다.', err);
      return null;
    }
}

async function writeDataToLocal(message)
{
    //
    fs.open(localDataPath,'a',function(err){
        if (err) throw err;
        console.log('file open complete');
    });
    
    fs.appendFile(localDataPath, message + '\n', function (err) {
        if (err) throw err;
            console.log('write :', message);
      });
}

async function publishLocalData(filePath, rabbitmq)
{
    //check RejectFile
    if(fs.existsSync(filePath))
    {
        const fileData = await readDataFromLocal(filePath);
        if(fileData != null)
        {
            const rejectPublishList = fileData.split('\n');
            for(const rejectData of rejectPublishList)
            {
                if (rejectData != null && rejectData != '')
                {
                    let publishOk = await publishMessage(rabbitmq, rejectData);
                    if (!publishOk)
                    {   
                        // disconnect event handler
                        writeDataToLocal(rejectData);
                        console.log('[error]: ', rejectData);
                    }
                }

                await delay(500);
            }
        }
    }
}

// RabbitMQ에 연결하고 메시지를 게시하는 예제 코드
async function main(retryCount) {

    const rabbitmq = new RabbitMQ();
    rabbitmq.url = 'amqp://localhost:5672';
    rabbitmq.exchangeName = 'CAN';
    rabbitmq.exchangeType = 'direct';
    rabbitmq.routingKey = 'autocrypt_';
    rabbitmq.queueName = 'IBU';
    
    let isConnectionOk = false;
    await connectRabbitMQ(rabbitmq);
    await settingExchangeAndQueue(rabbitmq);

    //check local data
    await publishLocalData(localDataPath, rabbitmq);
    
    //channel event
    rabbitmq.channel.on('return', (message)=>{
        console.log('[return]: ', message);
    });
    rabbitmq.channel.on('error', (message)=>{
        console.log('[error]: ', message);
    });
    rabbitmq.channel.on('close', ()=>{
        console.log('[close]: ');
    });
    
    
   for (let i = 0; i< retryCount; i++)
   {
        var message = `[${i}]_ 7E8,08,${crypto.randomBytes(8).toString('hex')}`;
        let publishOk = await publishMessage(rabbitmq, message);
        if (!publishOk)
        {               
            // disconnect event handler
            writeDataToLocal(message);
            console.log('[error]: ', message);
        }
        await delay(500);
   }

   rabbitmq.connection.close();
  }
  
//timer
function delay(milliseconds) {
    return new Promise((resolve) => {
      setTimeout(resolve, milliseconds);
    });
}
  
main(process.argv[2]).catch((err) => console.error(err));

 

 

 

Comments