yoncho`s blog
[RabbitMQ] 7. Code Example 본문
자동으로 RabbitMQ 3개로 구성된 Cluster 구축하고 Queue Mirroring 정책을 설정하는 Docker-compose.yml
더보기
# Three-node RabbitMQ cluster. rabbitmq1 is the seed node and also installs the
# queue-mirroring policy; rabbitmq2 and rabbitmq3 join rabbit@rabbit1 on startup.
# All nodes must share the same Erlang cookie to be able to cluster.
version: '3'
services:
  rabbitmq1:
    container_name: rabbitmq1
    image: rabbitmq:3-management
    hostname: rabbit1
    ports:
      - "15672:15672"
      - "5672:5672"
    networks:
      - rabbitmq-network
    environment:
      RABBITMQ_ERLANG_COOKIE: "1234"
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest
    # The folded scalar (>) joins these lines with spaces, so every command
    # needs its own trailing ';'. '$$' is Compose's escape for a literal '$'.
    command: >
      bash -c "
      rabbitmq-server -detached;
      rabbitmqctl wait /var/lib/rabbitmq/mnesia/rabbit\@rabbit1.pid;
      rabbitmqctl set_policy ha-policy-name '^your-queue-name$$' '{\"ha-mode\":\"exactly\",\"ha-params\":2}' --apply-to queues;
      rabbitmqctl start_app;
      sleep infinity
      "
  rabbitmq2:
    container_name: rabbitmq2
    image: rabbitmq:3-management
    hostname: rabbit2
    networks:
      - rabbitmq-network
    environment:
      RABBITMQ_ERLANG_COOKIE: "1234"
    links:
      - rabbitmq1:rabbit1
    # Stop the app, join the seed node's cluster, then start the app again.
    command: >
      bash -c "
      rabbitmq-server -detached;
      rabbitmqctl wait /var/lib/rabbitmq/mnesia/rabbit\@rabbit2.pid;
      rabbitmqctl stop_app;
      rabbitmqctl join_cluster rabbit@rabbit1;
      rabbitmqctl start_app;
      sleep infinity
      "
    depends_on:
      - rabbitmq1
  rabbitmq3:
    container_name: rabbitmq3
    image: rabbitmq:3-management
    hostname: rabbit3
    networks:
      - rabbitmq-network
    environment:
      RABBITMQ_ERLANG_COOKIE: "1234"
    links:
      - rabbitmq1:rabbit1
      - rabbitmq2:rabbit2
    # Same join sequence as rabbitmq2; started after it via depends_on.
    command: >
      bash -c "
      rabbitmq-server -detached;
      rabbitmqctl wait /var/lib/rabbitmq/mnesia/rabbit\@rabbit3.pid;
      rabbitmqctl stop_app;
      rabbitmqctl join_cluster rabbit@rabbit1;
      rabbitmqctl start_app;
      sleep infinity
      "
    depends_on:
      - rabbitmq2
networks:
  rabbitmq-network:
    driver: bridge
RabbitMQ가 ShutDown 될 시 Local에 Publish 할 메시지를 임시로 저장하고, 다시 연결되었을 때 Local 파일에 쌓인 메시지를 먼저 Publish하는 Publisher.js
더보기
const crypto = require('crypto');
const fs = require('fs'); //fileSystem
const amqp = require('amqplib');
// Local file that writeDataToLocal() appends unpublished messages to, one per
// line; main() passes it to publishLocalData() so they are replayed on startup.
const localDataPath = './rejectPublishData.txt';
/**
 * Holder for one publisher's broker settings and live handles.
 *
 * Every constructor argument is optional; anything omitted is stored as
 * `undefined` and can be assigned on the instance afterwards.
 */
class RabbitMQ {
  /**
   * @param {string} [url] - Broker URL handed to the AMQP client.
   * @param {object} [connection] - Open connection handle, once connected.
   * @param {object} [channel] - Open channel handle, once connected.
   * @param {string} [exchangeName] - Exchange to declare and publish to.
   * @param {string} [exchangeType] - Exchange type used when declaring it.
   * @param {string} [queueName] - Queue to declare and bind.
   * @param {string} [routingKey] - Routing key used for binding and publishing.
   */
  constructor(url, connection, channel, exchangeName, exchangeType, queueName, routingKey) {
    Object.assign(this, {
      url,
      connection,
      channel,
      exchangeName,
      exchangeType,
      queueName,
      routingKey,
    });
  }
}
/**
 * Connects to the RabbitMQ server at `rabbitmq.url` and opens a channel.
 *
 * The resulting handles are stored on the passed object as
 * `rabbitmq.connection` and `rabbitmq.channel`.
 *
 * @param {RabbitMQ} rabbitmq - Settings holder; must have `url` set.
 * @returns {Promise<void>} Rejects if connecting or opening the channel fails.
 */
async function connectRabbitMQ(rabbitmq) {
  const openedConnection = await amqp.connect(rabbitmq.url);
  rabbitmq.connection = openedConnection;

  const openedChannel = await openedConnection.createChannel();
  rabbitmq.channel = openedChannel;
}
/**
 * Declares the exchange and the queue on the broker and binds them together.
 *
 * `durable` is a top-level declaration option, not a member of the `arguments`
 * table, so it is passed at the option level here. The queue is capped at 10
 * messages, rejects new publishes when full, and expires messages after
 * 5000 ms.
 *
 * NOTE(review): if the exchange or queue already exists on the broker as
 * durable (the client default), re-declaring it as non-durable is rejected by
 * the broker; delete the existing entity first.
 *
 * @param {RabbitMQ} rabbitmq - Needs an open `channel` plus `exchangeName`,
 *   `exchangeType`, `queueName` and `routingKey`.
 * @returns {Promise<void>} Rejects if any declaration or the binding fails.
 */
async function settingExchangeAndQueue(rabbitmq) {
  const { channel, exchangeName, exchangeType, queueName, routingKey } = rabbitmq;

  await channel.assertExchange(exchangeName, exchangeType, { durable: false });

  await channel.assertQueue(queueName, {
    durable: false,
    arguments: {
      'x-max-length': 10,
      'x-overflow': 'reject-publish',
      'x-message-ttl': 5000,
    },
  });

  await channel.bindQueue(queueName, exchangeName, routingKey);
}
/**
 * Publishes one message to `rabbitmq.exchangeName` with `rabbitmq.routingKey`.
 *
 * The message is sent as a Buffer with the `mandatory` and `persistent`
 * options set. Any exception raised while publishing is logged and reported
 * through the return value instead of being rethrown.
 *
 * @param {RabbitMQ} rabbitmq - Holder with an open `channel`.
 * @param {string} message - Text payload to publish.
 * @returns {Promise<boolean>} `true` if the publish call did not throw,
 *   `false` otherwise.
 */
async function publishMessage(rabbitmq, message) {
  const publishOptions = { mandatory: true, persistent: true };
  let wasPublished = false;

  try {
    rabbitmq.channel.publish(
      rabbitmq.exchangeName,
      rabbitmq.routingKey,
      Buffer.from(message),
      publishOptions,
    );
    console.log(`[+]: ${message}`);
    wasPublished = true;
  } catch (error) {
    console.error('[error]: ', error);
  }

  return wasPublished;
}
/**
 * Reads the whole file as UTF-8 text and then deletes it.
 *
 * @param {string} filePath - File to consume.
 * @returns {Promise<string|null>} The file contents, or `null` when reading
 *   or deleting the file throws (the error is logged).
 */
async function readDataFromLocal(filePath) {
  let contents = null;

  try {
    const text = fs.readFileSync(filePath, 'utf8');
    fs.unlinkSync(filePath);
    // Only hand the text back once the file has been removed as well.
    contents = text;
  } catch (readError) {
    console.error('파일을 읽어올 수 없습니다.', readError);
  }

  return contents;
}
/**
 * Appends one message, followed by a newline, to the local fallback file
 * (`localDataPath`), creating the file if it does not exist yet.
 *
 * The returned promise settles only after the append has finished, so callers
 * that await it get their messages written in call order. No separate open
 * call is made: appendFile opens and closes the file itself, so no file
 * descriptor is left behind.
 *
 * @param {string} message - Message text to persist.
 * @returns {Promise<void>} Rejects if the file cannot be written.
 */
async function writeDataToLocal(message) {
  await fs.promises.appendFile(localDataPath, `${message}\n`);
  console.log('write :', message);
}
/**
 * Replays messages that were saved locally while the broker was unreachable.
 *
 * If `filePath` exists it is consumed via readDataFromLocal() and each
 * non-empty line is published in order, waiting 500 ms between messages. A
 * line that fails to publish is written back with writeDataToLocal(), and that
 * write is awaited before moving on. Blank lines (such as the one after the
 * final newline) are skipped without waiting.
 *
 * @param {string} filePath - File holding one saved message per line.
 * @param {RabbitMQ} rabbitmq - Holder passed through to publishMessage().
 * @returns {Promise<void>}
 */
async function publishLocalData(filePath, rabbitmq) {
  if (!fs.existsSync(filePath)) {
    return;
  }

  const fileData = await readDataFromLocal(filePath);
  if (fileData == null) {
    return;
  }

  for (const rejectData of fileData.split('\n')) {
    if (rejectData === '') {
      continue;
    }

    const publishOk = await publishMessage(rabbitmq, rejectData);
    if (!publishOk) {
      // Still not deliverable: keep it on disk for the next run.
      await writeDataToLocal(rejectData);
      console.log('[error]: ', rejectData);
    }
    await delay(500);
  }
}
/**
 * Example publisher: connects to RabbitMQ, replays locally saved messages,
 * then publishes `retryCount` generated messages 500 ms apart.
 *
 * Channel listeners are attached right after connecting so that events raised
 * during setup or replay are logged too. The connection is closed (and the
 * close awaited) even when a step fails. Messages that cannot be published are
 * appended to the local file for a later run.
 *
 * @param {string|number} retryCount - Number of messages to publish. It is
 *   converted with Number(), so a missing or non-numeric value publishes
 *   nothing.
 * @returns {Promise<void>} Rejects if connecting, setup or closing fails.
 */
async function main(retryCount) {
  const publishCount = Number(retryCount);

  const rabbitmq = new RabbitMQ();
  rabbitmq.url = 'amqp://localhost:5672';
  rabbitmq.exchangeName = 'CAN';
  rabbitmq.exchangeType = 'direct';
  rabbitmq.routingKey = 'autocrypt_';
  rabbitmq.queueName = 'IBU';

  await connectRabbitMQ(rabbitmq);

  try {
    // Channel events
    rabbitmq.channel.on('return', (message) => {
      console.log('[return]: ', message);
    });
    rabbitmq.channel.on('error', (message) => {
      console.log('[error]: ', message);
    });
    rabbitmq.channel.on('close', () => {
      console.log('[close]: ');
    });

    await settingExchangeAndQueue(rabbitmq);

    // Replay anything saved locally by a previous run first.
    await publishLocalData(localDataPath, rabbitmq);

    for (let i = 0; i < publishCount; i++) {
      const message = `[${i}]_ 7E8,08,${crypto.randomBytes(8).toString('hex')}`;
      const publishOk = await publishMessage(rabbitmq, message);
      if (!publishOk) {
        // Could not publish: keep the message on disk for the next run.
        await writeDataToLocal(message);
        console.log('[error]: ', message);
      }
      await delay(500);
    }
  } finally {
    await rabbitmq.connection.close();
  }
}
/**
 * Timer helper: returns a promise that resolves (with no value) once the given
 * number of milliseconds has elapsed.
 *
 * @param {number} milliseconds - How long to wait.
 * @returns {Promise<void>}
 */
function delay(milliseconds) {
  const armTimer = (wake) => {
    setTimeout(wake, milliseconds);
  };
  return new Promise(armTimer);
}
// Script entry point: run main() with the first command-line argument (used as
// the number of messages to publish) and log any error it rejects with.
main(process.argv[2]).catch((err) => console.error(err));
'기술, 나의 공부를 공유합니다. > MQTT' 카테고리의 다른 글
[RabbitMQ] 6. Queue 옵션으로 Message Control (0) | 2023.06.24 |
---|---|
[RabbitMQ] 5. RabbitMQ Broker ShutDown 대비 (0) | 2023.06.24 |
[RabbitMQ] 4. Clustering & Mirroring (0) | 2023.06.24 |
[RabbitMQ] 3. Connection & Channel (0) | 2023.06.24 |
[RabbitMQ] 2. Queue (0) | 2023.06.24 |
Comments