在之前的文章中,我们介绍过多种消息中间件,其中比较出名就是 RabbitMQ。那么在 Micronaut 框架的项目中,如何操作 RabbitMQ 呢?
首先,我们需要在 Micronaut 项目中添加 MQ 相关的配置项,如下:
rabbitmq:
host: 127.0.0.1
port: 5672
username: mq
password: pwd
然后我们添加相应的依赖项:
<dependency>
<groupId>io.micronaut.rabbitmq</groupId>
<artifactId>micronaut-rabbitmq</artifactId>
</dependency>
我们先创建一个生产者:
import io.micronaut.rabbitmq.annotation.Binding;
import io.micronaut.rabbitmq.annotation.RabbitClient;
@RabbitClient("exchange")
public interface ProductClient {
@Binding("product")
void send(byte[] data);
}
@RabbitClient 中的参数代表生产者对应的 exchange 的名字,而 @Binding 中的参数对应的是路由名字。生产者我们定义为接口即可。
我们再创建一个消费者:
import io.micronaut.rabbitmq.annotation.Queue;
import io.micronaut.rabbitmq.annotation.RabbitListener;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@RabbitListener // (1)
public class ProductListener {
List<String> messageLengths = Collections.synchronizedList(new ArrayList<>());
@Queue("product") // (2)
public void receive(byte[] data) { // (3)
messageLengths.add(new String(data));
System.out.println("Java received " + data.length + " bytes from RabbitMQ");
}
}
@Queue 中的参数代表对应的队列。
注意不论是生产者和消费者,数据都以 byte[] 的形式传递。
以上定义后,如果 RabbitMQ 中已经有相应的 exchange 的定义,则代码直接就可以正常运行。但如果生产者的 exchange 还不存在,则会报异常,这个时候需要我们在代码中配置相应的初始化代码。如下:
import com.rabbitmq.client.Channel;
import io.micronaut.rabbitmq.connect.ChannelInitializer;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@Singleton // (1)
public class ChannelPoolListener extends ChannelInitializer { // (2)
@Override
public void initialize(Channel channel, String name) throws IOException { // (3)
//docs/quickstart
Map<String, Object> args = new HashMap<>();
args.put("x-max-priority", 100);
channel.queueDeclare("product", false, false, false, args); // (4)
//docs/exchange
channel.exchangeDeclare("animals", "headers", false);
channel.queueDeclare("snakes", false, false, false, null);
channel.queueDeclare("cats", false, false, false, null);
Map<String, Object> catArgs = new HashMap<>();
catArgs.put("x-match", "all");
catArgs.put("animalType", "Cat");
channel.queueBind("cats", "animals", "", catArgs);
Map<String, Object> snakeArgs = new HashMap<>();
snakeArgs.put("x-match", "all");
snakeArgs.put("animalType", "Snake");
channel.queueBind("snakes", "animals", "", snakeArgs);
}
}
以上代码中,需要注意:
- exchangeDeclare 用于声明要创建的 exchange
- queueDeclare 用于声明队列
- queueBind 用于将队列与 exchange 绑定,并可以指定相应的路由
有了以上的配置,Micronaut 项目就可以非常丝滑的使用 RabbitMQ。你学会了吗?