四时宝库

程序员的知识宝库

Micronaut 如何操作 RabbitMQ(rabbit mq教程)

在之前的文章中,我们介绍过多种消息中间件,其中比较出名就是 RabbitMQ。那么在 Micronaut 框架的项目中,如何操作 RabbitMQ 呢?

首先,我们需要在 Micronaut 项目中添加 MQ 相关的配置项,如下:

rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: mq
    password: pwd

然后我们添加相应的依赖项:

<dependency>
    <groupId>io.micronaut.rabbitmq</groupId>
    <artifactId>micronaut-rabbitmq</artifactId>
</dependency>

我们先创建一个生产者:

import io.micronaut.rabbitmq.annotation.Binding;
import io.micronaut.rabbitmq.annotation.RabbitClient;

@RabbitClient("exchange")
public interface ProductClient {

    @Binding("product")
    void send(byte[] data);
}

@RabbitClient 中的参数代表生产者对应的 exchange 的名字,而 @Binding 中的参数对应的是路由名字。生产者我们定义为接口即可。

我们再创建一个消费者:

import io.micronaut.rabbitmq.annotation.Queue;
import io.micronaut.rabbitmq.annotation.RabbitListener;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

@RabbitListener // (1)
public class ProductListener {

    List<String> messageLengths = Collections.synchronizedList(new ArrayList<>());

    @Queue("product") // (2)
    public void receive(byte[] data) { // (3)
        messageLengths.add(new String(data));
        System.out.println("Java received " + data.length + " bytes from RabbitMQ");
    }
}

@Queue 中的参数代表对应的队列。

注意不论是生产者和消费者,数据都以 byte[] 的形式传递。

以上定义后,如果 RabbitMQ 中已经有相应的 exchange 的定义,则代码直接就可以正常运行。但如果生产者的 exchange 还不存在,则会报异常,这个时候需要我们在代码中配置相应的初始化代码。如下:

import com.rabbitmq.client.Channel;
import io.micronaut.rabbitmq.connect.ChannelInitializer;
import jakarta.inject.Singleton;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

@Singleton // (1)
public class ChannelPoolListener extends ChannelInitializer { // (2)

    @Override
    public void initialize(Channel channel, String name) throws IOException { // (3)
        //docs/quickstart
        Map<String, Object> args = new HashMap<>();
        args.put("x-max-priority", 100);
        channel.queueDeclare("product", false, false, false, args); // (4)

        //docs/exchange
        channel.exchangeDeclare("animals", "headers", false);
      
        channel.queueDeclare("snakes", false, false, false, null);
        channel.queueDeclare("cats", false, false, false, null);
        Map<String, Object> catArgs = new HashMap<>();
        catArgs.put("x-match", "all");
        catArgs.put("animalType", "Cat");
        channel.queueBind("cats", "animals", "", catArgs);

        Map<String, Object> snakeArgs = new HashMap<>();
        snakeArgs.put("x-match", "all");
        snakeArgs.put("animalType", "Snake");
        channel.queueBind("snakes", "animals", "", snakeArgs);
    }

}

以上代码中,需要注意:

  • exchangeDeclare 用于声明要创建的 exchange
  • queueDeclare 用于声明队列
  • queueBind 用于将队列与 exchange 绑定,并可以指定相应的路由

有了以上的配置,Micronaut 项目就可以非常丝滑的使用 RabbitMQ。你学会了吗?

发表评论:

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言
    友情链接