RabbitMQ amqplib 死信和延时队列操作

conn.js

const amqp = require('amqplib');
let connection = null;
module.exports = {
    connection,
    init: () => amqp.connect({
        protocol: 'amqp',
        hostname: '127.0.0.1',
        port: 5672,
        username: 'admin',
        password: '*********',
        frameMax: 0,
        heartbeat: 30,
        vhost: '/',
        client_provided_name: 'BFF'
    }).then(conn => {
        connection = conn;
        console.log('rabbitmq connect success');
        return connection;
    })
}
 
死信
 
pub

const rabbitmq = require('./conn/mqtt');
async function producerDLX(connnection) {
    const testExchange = 'testEx';
    const testQueue = 'testQu';
    const testExchangeDLX = 'testExDLX';
    const testRoutingKeyDLX = 'testRoutingKeyDLX';
    const ch = await connnection.createChannel();
    await ch.assertExchange(testExchange, 'direct', {
        durable: true
    });
    const {queue} = await ch.assertQueue(testQueue, {
        deadLetterExchange: testExchangeDLX,
        deadLetterRoutingKey: testRoutingKeyDLX,
    });
    await ch.bindQueue(queue, testExchange);
    const msg = {
        'ut':new Date() -0
    };
    // console.log(msg)
    await ch.sendToQueue(queue, Buffer.from(JSON.stringify(msg)), {
        expiration: 10000
    });
    ch.close();
}
rabbitmq.init().then(conn => {
        producerDLX(conn)
})
 
sub

const rabbitmq = require('./conn/mqtt');
async function consumerDLX(connnection) {
    const testExchangeDLX = 'testExDLX';
    const testRoutingKeyDLX = 'testRoutingKeyDLX';
    const testQueueDLX = 'testQueueDLX';
    const ch = await connnection.createChannel();
    await ch.assertExchange(testExchangeDLX, 'direct', {
        durable: true
    });
    const {
        queue
    } = await ch.assertQueue(testQueueDLX, {
        exclusive: false,
    });
    await ch.bindQueue(queue, testExchangeDLX, testRoutingKeyDLX);
    await ch.consume(queue, msg => {
        const CONTENT = msg.content.toString();
        console.log(new Date() - 0- JSON.parse(CONTENT).ut)
        // console.log('consumer msg:');
    }, {
        noAck: true
    });
}
rabbitmq.init().then(connection => consumerDLX(connection));
 
延时队列

需要在rabbitmq中安装对应的延时插件


pub

const rabbitmq = require('./conn/mqtt');
async function producerDLX(connnection) {
    const EXCHANGE = 'zdnf-xdm'
    const ch = await connnection.createChannel();
    await ch.assertExchange(EXCHANGE, 'x-delayed-message', {
        durable: true,
        'x-delayed-type': 'topic'
    });
    const {
        queue
    } = await ch.assertQueue('xdpub-test');
    await ch.bindQueue(queue, EXCHANGE);
    const msg = {
        'ut': new Date() - 0
    };
    // console.log(msg)
    await ch.publish(EXCHANGE,'rtk-xdpub-test', Buffer.from(JSON.stringify(msg)), {
        headers: {
            'x-delay': 10000, // 一定要设置,否则无效
        }
    });
    ch.close();
}
rabbitmq.init().then(conn => {
    // setInterval(() => {
        producerDLX(conn)
    // }, 3000);
})
 
 
sub
 

const rabbitmq = require('./conn/mqtt');
async function consumerDLX(connnection) {
    const EXCHANGE = 'zdnf-xdm'
    const ch = await connnection.createChannel();
    await ch.assertExchange(EXCHANGE, 'x-delayed-message', {
        durable: true,
        'x-delayed-type': 'topic'
    });
    ch.prefetch(1);
    const {
        queue
    } = await ch.assertQueue('xdpub-test');
     await ch.bindQueue(queue, EXCHANGE, 'rtk-xdpub-test');
    await ch.consume(queue, msg => {
        const CONTENT = msg.content.toString();
        console.log(new Date() - 0 - JSON.parse(CONTENT).ut)
        // console.log('consumer msg:');
    }, {
        noAck: true
    });
}
rabbitmq.init().then(connection => consumerDLX(connection));
 

0 个评论

要回复文章请先登录注册