RabbitMQ amqplib 死信和延时队列操作
conn.js
const amqp = require('amqplib');
let connection = null;
module.exports = {
connection,
init: () => amqp.connect({
protocol: 'amqp',
hostname: '127.0.0.1',
port: 5672,
username: 'admin',
password: '*********',
frameMax: 0,
heartbeat: 30,
vhost: '/',
client_provided_name: 'BFF'
}).then(conn => {
connection = conn;
console.log('rabbitmq connect success');
return connection;
})
}
死信
pub
const rabbitmq = require('./conn/mqtt');
async function producerDLX(connnection) {
const testExchange = 'testEx';
const testQueue = 'testQu';
const testExchangeDLX = 'testExDLX';
const testRoutingKeyDLX = 'testRoutingKeyDLX';
const ch = await connnection.createChannel();
await ch.assertExchange(testExchange, 'direct', {
durable: true
});
const {queue} = await ch.assertQueue(testQueue, {
deadLetterExchange: testExchangeDLX,
deadLetterRoutingKey: testRoutingKeyDLX,
});
await ch.bindQueue(queue, testExchange);
const msg = {
'ut':new Date() -0
};
// console.log(msg)
await ch.sendToQueue(queue, Buffer.from(JSON.stringify(msg)), {
expiration: 10000
});
ch.close();
}
rabbitmq.init().then(conn => {
producerDLX(conn)
})
sub
const rabbitmq = require('./conn/mqtt');
async function consumerDLX(connnection) {
const testExchangeDLX = 'testExDLX';
const testRoutingKeyDLX = 'testRoutingKeyDLX';
const testQueueDLX = 'testQueueDLX';
const ch = await connnection.createChannel();
await ch.assertExchange(testExchangeDLX, 'direct', {
durable: true
});
const {
queue
} = await ch.assertQueue(testQueueDLX, {
exclusive: false,
});
await ch.bindQueue(queue, testExchangeDLX, testRoutingKeyDLX);
await ch.consume(queue, msg => {
const CONTENT = msg.content.toString();
console.log(new Date() - 0- JSON.parse(CONTENT).ut)
// console.log('consumer msg:');
}, {
noAck: true
});
}
rabbitmq.init().then(connection => consumerDLX(connection));
延时队列
pub
const rabbitmq = require('./conn/mqtt');
async function producerDLX(connnection) {
const EXCHANGE = 'zdnf-xdm'
const ch = await connnection.createChannel();
await ch.assertExchange(EXCHANGE, 'x-delayed-message', {
durable: true,
'x-delayed-type': 'topic'
});
const {
queue
} = await ch.assertQueue('xdpub-test');
await ch.bindQueue(queue, EXCHANGE);
const msg = {
'ut': new Date() - 0
};
// console.log(msg)
await ch.publish(EXCHANGE,'rtk-xdpub-test', Buffer.from(JSON.stringify(msg)), {
headers: {
'x-delay': 10000, // 一定要设置,否则无效
}
});
ch.close();
}
rabbitmq.init().then(conn => {
// setInterval(() => {
producerDLX(conn)
// }, 3000);
})
sub
const rabbitmq = require('./conn/mqtt');
async function consumerDLX(connnection) {
const EXCHANGE = 'zdnf-xdm'
const ch = await connnection.createChannel();
await ch.assertExchange(EXCHANGE, 'x-delayed-message', {
durable: true,
'x-delayed-type': 'topic'
});
ch.prefetch(1);
const {
queue
} = await ch.assertQueue('xdpub-test');
await ch.bindQueue(queue, EXCHANGE, 'rtk-xdpub-test');
await ch.consume(queue, msg => {
const CONTENT = msg.content.toString();
console.log(new Date() - 0 - JSON.parse(CONTENT).ut)
// console.log('consumer msg:');
}, {
noAck: true
});
}
rabbitmq.init().then(connection => consumerDLX(connection));
const amqp = require('amqplib');
let connection = null;
module.exports = {
connection,
init: () => amqp.connect({
protocol: 'amqp',
hostname: '127.0.0.1',
port: 5672,
username: 'admin',
password: '*********',
frameMax: 0,
heartbeat: 30,
vhost: '/',
client_provided_name: 'BFF'
}).then(conn => {
connection = conn;
console.log('rabbitmq connect success');
return connection;
})
}
死信
pub
const rabbitmq = require('./conn/mqtt');
async function producerDLX(connnection) {
const testExchange = 'testEx';
const testQueue = 'testQu';
const testExchangeDLX = 'testExDLX';
const testRoutingKeyDLX = 'testRoutingKeyDLX';
const ch = await connnection.createChannel();
await ch.assertExchange(testExchange, 'direct', {
durable: true
});
const {queue} = await ch.assertQueue(testQueue, {
deadLetterExchange: testExchangeDLX,
deadLetterRoutingKey: testRoutingKeyDLX,
});
await ch.bindQueue(queue, testExchange);
const msg = {
'ut':new Date() -0
};
// console.log(msg)
await ch.sendToQueue(queue, Buffer.from(JSON.stringify(msg)), {
expiration: 10000
});
ch.close();
}
rabbitmq.init().then(conn => {
producerDLX(conn)
})
sub
const rabbitmq = require('./conn/mqtt');
async function consumerDLX(connnection) {
const testExchangeDLX = 'testExDLX';
const testRoutingKeyDLX = 'testRoutingKeyDLX';
const testQueueDLX = 'testQueueDLX';
const ch = await connnection.createChannel();
await ch.assertExchange(testExchangeDLX, 'direct', {
durable: true
});
const {
queue
} = await ch.assertQueue(testQueueDLX, {
exclusive: false,
});
await ch.bindQueue(queue, testExchangeDLX, testRoutingKeyDLX);
await ch.consume(queue, msg => {
const CONTENT = msg.content.toString();
console.log(new Date() - 0- JSON.parse(CONTENT).ut)
// console.log('consumer msg:');
}, {
noAck: true
});
}
rabbitmq.init().then(connection => consumerDLX(connection));
延时队列
需要在rabbitmq中安装对应的延时插件
pub
const rabbitmq = require('./conn/mqtt');
async function producerDLX(connnection) {
const EXCHANGE = 'zdnf-xdm'
const ch = await connnection.createChannel();
await ch.assertExchange(EXCHANGE, 'x-delayed-message', {
durable: true,
'x-delayed-type': 'topic'
});
const {
queue
} = await ch.assertQueue('xdpub-test');
await ch.bindQueue(queue, EXCHANGE);
const msg = {
'ut': new Date() - 0
};
// console.log(msg)
await ch.publish(EXCHANGE,'rtk-xdpub-test', Buffer.from(JSON.stringify(msg)), {
headers: {
'x-delay': 10000, // 一定要设置,否则无效
}
});
ch.close();
}
rabbitmq.init().then(conn => {
// setInterval(() => {
producerDLX(conn)
// }, 3000);
})
sub
const rabbitmq = require('./conn/mqtt');
async function consumerDLX(connnection) {
const EXCHANGE = 'zdnf-xdm'
const ch = await connnection.createChannel();
await ch.assertExchange(EXCHANGE, 'x-delayed-message', {
durable: true,
'x-delayed-type': 'topic'
});
ch.prefetch(1);
const {
queue
} = await ch.assertQueue('xdpub-test');
await ch.bindQueue(queue, EXCHANGE, 'rtk-xdpub-test');
await ch.consume(queue, msg => {
const CONTENT = msg.content.toString();
console.log(new Date() - 0 - JSON.parse(CONTENT).ut)
// console.log('consumer msg:');
}, {
noAck: true
});
}
rabbitmq.init().then(connection => consumerDLX(connection));