|
@@ -0,0 +1,363 @@
|
|
|
+package com.qunzhixinxi.hnqz.common.rabbitmq.client;
|
|
|
+
|
|
|
+import cn.hutool.core.util.ObjectUtil;
|
|
|
+import com.qunzhixinxi.hnqz.common.core.entity.BaseMap;
|
|
|
+import com.qunzhixinxi.hnqz.common.rabbitmq.annotation.RabbitMqComponent;
|
|
|
+import com.qunzhixinxi.hnqz.common.rabbitmq.event.EventObj;
|
|
|
+import com.qunzhixinxi.hnqz.common.rabbitmq.event.HnqzRemoteApplicationEvent;
|
|
|
+import com.qunzhixinxi.hnqz.common.rabbitmq.exchange.DelayExchangeBuilder;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.amqp.core.*;
|
|
|
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
|
|
+import org.springframework.amqp.rabbit.core.RabbitAdmin;
|
|
|
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
|
|
+import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.cloud.bus.BusProperties;
|
|
|
+import org.springframework.context.ApplicationContext;
|
|
|
+import org.springframework.context.ApplicationEventPublisher;
|
|
|
+import org.springframework.context.annotation.Bean;
|
|
|
+import org.springframework.context.annotation.Configuration;
|
|
|
+
|
|
|
+import javax.annotation.Resource;
|
|
|
+import java.lang.reflect.Method;
|
|
|
+import java.time.LocalDateTime;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.Map;
|
|
|
+
|
|
|
+/**
|
|
|
+ * rabbitmq消息队列客户端
|
|
|
+ *
|
|
|
+ * @author Hengchen.sun
|
|
|
+ * @version 1.0.0
|
|
|
+ * @date 2021/08/23 16:27:18
|
|
|
+ */
|
|
|
+@Slf4j
|
|
|
+@Configuration
|
|
|
+public class RabbitMqClient {
|
|
|
+
|
|
|
+ private final RabbitAdmin rabbitAdmin;
|
|
|
+
|
|
|
+ private final RabbitTemplate rabbitTemplate;
|
|
|
+
|
|
|
+ private final Map<String, Object> sentObj = new HashMap<>();
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ public RabbitMqClient(RabbitAdmin rabbitAdmin, RabbitTemplate rabbitTemplate) {
|
|
|
+ this.rabbitAdmin = rabbitAdmin;
|
|
|
+ this.rabbitTemplate = rabbitTemplate;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ private SimpleMessageListenerContainer messageListenerContainer;
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ BusProperties busProperties;
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ private ApplicationEventPublisher publisher;
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ private ApplicationContext applicationContext;
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 初始化队列
|
|
|
+ */
|
|
|
+ @Bean
|
|
|
+ public void initQueue() {
|
|
|
+ Map<String, Object> beansWithRabbitComponentMap = this.applicationContext.getBeansWithAnnotation(RabbitMqComponent.class);
|
|
|
+ Class<?> clazz;
|
|
|
+ log.info(">>>>>>>>>>>> 初始化队列开始 <<<<<<<<<<<<");
|
|
|
+ for (Map.Entry<String, Object> entry : beansWithRabbitComponentMap.entrySet()) {
|
|
|
+ //获取到实例对象的class信息
|
|
|
+ clazz = entry.getValue().getClass();
|
|
|
+ Method[] methods = clazz.getMethods();
|
|
|
+ RabbitListener rabbitListener = clazz.getAnnotation(RabbitListener.class);
|
|
|
+ if (ObjectUtil.isNotEmpty(rabbitListener)) {
|
|
|
+ createQueue(rabbitListener);
|
|
|
+ }
|
|
|
+ for (Method method : methods) {
|
|
|
+ RabbitListener methodRabbitListener = method.getAnnotation(RabbitListener.class);
|
|
|
+ if (ObjectUtil.isNotEmpty(methodRabbitListener)) {
|
|
|
+ createQueue(methodRabbitListener);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ log.info(">>>>>>>>>>>> 初始化队列结束 <<<<<<<<<<<<");
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 创建队列
|
|
|
+ *
|
|
|
+ * @param rabbitListener 监听
|
|
|
+ */
|
|
|
+ private void createQueue(RabbitListener rabbitListener) {
|
|
|
+ String[] queues = rabbitListener.queues();
|
|
|
+ DirectExchange directExchange = createExchange(DelayExchangeBuilder.DEFAULT_DIRECT_EXCHANGE);
|
|
|
+ //创建交换机
|
|
|
+ rabbitAdmin.declareExchange(directExchange);
|
|
|
+ if (ObjectUtil.isNotEmpty(queues)) {
|
|
|
+ for (String queueName : queues) {
|
|
|
+ Queue queue = new Queue(queueName);
|
|
|
+ addQueue(queue);
|
|
|
+ Binding binding = BindingBuilder.bind(queue).to(directExchange).with(queueName);
|
|
|
+ rabbitAdmin.declareBinding(binding);
|
|
|
+ log.info("## 队列创建成功:【{}】 ##", queueName);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 发送远程事件
|
|
|
+ *
|
|
|
+ * @param handlerName 绑定名称
|
|
|
+ * @param baseMap 事件
|
|
|
+ */
|
|
|
+ public void publishEvent(String handlerName, BaseMap baseMap) {
|
|
|
+ EventObj eventObj = new EventObj();
|
|
|
+ eventObj.setHandlerName(handlerName);
|
|
|
+ eventObj.setBaseMap(baseMap);
|
|
|
+ publisher.publishEvent(new HnqzRemoteApplicationEvent(eventObj, busProperties.getId()));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 转换Message对象
|
|
|
+ *
|
|
|
+ * @param messageType 返回消息类型 MessageProperties类中常量
|
|
|
+ * @param msg
|
|
|
+ * @return 信息对象
|
|
|
+ */
|
|
|
+ public Message getMessage(String messageType, Object msg) {
|
|
|
+ MessageProperties messageProperties = new MessageProperties();
|
|
|
+ messageProperties.setContentType(messageType);
|
|
|
+ return new Message(msg.toString().getBytes(), messageProperties);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 有绑定Key的Exchange发送
|
|
|
+ *
|
|
|
+ * @param routingKey 路由key
|
|
|
+ * @param msg 信息
|
|
|
+ */
|
|
|
+ public void sendMessageToExchange(TopicExchange topicExchange, String routingKey, Object msg) {
|
|
|
+ Message message = getMessage(MessageProperties.CONTENT_TYPE_JSON, msg);
|
|
|
+ rabbitTemplate.send(topicExchange.getName(), routingKey, message);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 没有绑定KEY的Exchange发送
|
|
|
+ *
|
|
|
+ * @param exchange 交换机
|
|
|
+ * @param msg 信息
|
|
|
+ */
|
|
|
+ public void sendMessageToExchange(TopicExchange topicExchange, AbstractExchange exchange, String msg) {
|
|
|
+ addExchange(exchange);
|
|
|
+ log.info("RabbitMQ 发送信息{} -> {}", exchange.getName(), msg);
|
|
|
+ rabbitTemplate.convertAndSend(topicExchange.getName(), msg);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 发送消息
|
|
|
+ *
|
|
|
+ * @param queueName 队列名称
|
|
|
+ * @param params 消息内容map
|
|
|
+ */
|
|
|
+ public void sendMessage(String queueName, Object params) {
|
|
|
+ log.info("发送消息到mq:{}", params);
|
|
|
+ try {
|
|
|
+ rabbitTemplate.convertAndSend(DelayExchangeBuilder.DEFAULT_DIRECT_EXCHANGE, queueName, params, message -> message);
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 发送消息
|
|
|
+ *
|
|
|
+ * @param queueName 队列名称
|
|
|
+ */
|
|
|
+ public void sendMessage(String queueName) {
|
|
|
+ this.send(queueName, this.sentObj, 0);
|
|
|
+ this.sentObj.clear();
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 添加发送信息
|
|
|
+ *
|
|
|
+ * @param key 信息key
|
|
|
+ * @param value 信息值
|
|
|
+ * @return 客户端
|
|
|
+ */
|
|
|
+ public RabbitMqClient put(String key, Object value) {
|
|
|
+ this.sentObj.put(key, value);
|
|
|
+ return this;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 延迟发送消息
|
|
|
+ *
|
|
|
+ * @param queueName 队列名称
|
|
|
+ * @param params 消息内容params
|
|
|
+ * @param expiration 延迟时间 单位毫秒
|
|
|
+ */
|
|
|
+ public void sendMessage(String queueName, Object params, Integer expiration) {
|
|
|
+ this.send(queueName, params, expiration);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 发送消息
|
|
|
+ *
|
|
|
+ * @param queueName 队列名
|
|
|
+ * @param params 参数
|
|
|
+ * @param expiration 过期时间
|
|
|
+ */
|
|
|
+ private void send(String queueName, Object params, Integer expiration) {
|
|
|
+ Queue queue = new Queue(queueName);
|
|
|
+ addQueue(queue);
|
|
|
+ CustomExchange customExchange = DelayExchangeBuilder.buildExchange();
|
|
|
+ rabbitAdmin.declareExchange(customExchange);
|
|
|
+ Binding binding = BindingBuilder.bind(queue).to(customExchange).with(queueName).noargs();
|
|
|
+ rabbitAdmin.declareBinding(binding);
|
|
|
+ log.debug("发送时间:{}", LocalDateTime.now());
|
|
|
+ messageListenerContainer.setQueueNames(queueName);
|
|
|
+ rabbitTemplate.convertAndSend(DelayExchangeBuilder.DEFAULT_DELAY_EXCHANGE, queueName, params, message -> {
|
|
|
+ if (expiration != null && expiration > 0) {
|
|
|
+ message.getMessageProperties().setHeader("x-delay", expiration);
|
|
|
+ }
|
|
|
+ return message;
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 给queue发送消息
|
|
|
+ *
|
|
|
+ * @param queueName 队列名
|
|
|
+ */
|
|
|
+ public String receiveFromQueue(String queueName) {
|
|
|
+ return receiveFromQueue(DirectExchange.DEFAULT, queueName);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 给direct交换机指定queue发送消息
|
|
|
+ *
|
|
|
+ * @param directExchange 直接模式交换机
|
|
|
+ * @param queueName 队列名
|
|
|
+ */
|
|
|
+ public String receiveFromQueue(DirectExchange directExchange, String queueName) {
|
|
|
+ Queue queue = new Queue(queueName);
|
|
|
+ addQueue(queue);
|
|
|
+ Binding binding = BindingBuilder.bind(queue).to(directExchange).withQueueName();
|
|
|
+ rabbitAdmin.declareBinding(binding);
|
|
|
+ String messages = (String) rabbitTemplate.receiveAndConvert(queueName);
|
|
|
+ log.info("接收到的信息:{}", messages);
|
|
|
+ return messages;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 创建Exchange
|
|
|
+ *
|
|
|
+ * @param exchange 交换机
|
|
|
+ */
|
|
|
+ public void addExchange(AbstractExchange exchange) {
|
|
|
+ rabbitAdmin.declareExchange(exchange);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 删除一个Exchange
|
|
|
+ *
|
|
|
+ * @param exchangeName 交换机名称
|
|
|
+ */
|
|
|
+ public boolean deleteExchange(String exchangeName) {
|
|
|
+ return rabbitAdmin.deleteExchange(exchangeName);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 声明其名称自动命名的队列。它是用exclusive=true、autoDelete=true和 durable = false
|
|
|
+ *
|
|
|
+ * @return Queue
|
|
|
+ */
|
|
|
+ public Queue addQueue() {
|
|
|
+ return rabbitAdmin.declareQueue();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 创建一个指定的Queue
|
|
|
+ *
|
|
|
+ * @param queue 队列
|
|
|
+ * @return queueName
|
|
|
+ */
|
|
|
+ public String addQueue(Queue queue) {
|
|
|
+ return rabbitAdmin.declareQueue(queue);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 删除一个队列
|
|
|
+ *
|
|
|
+ * @param queueName the name of the queue.
|
|
|
+ * @param unused true if the queue should be deleted only if not in use.
|
|
|
+ * @param empty true if the queue should be deleted only if empty.
|
|
|
+ */
|
|
|
+ public void deleteQueue(String queueName, boolean unused, boolean empty) {
|
|
|
+ rabbitAdmin.deleteQueue(queueName, unused, empty);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 删除一个队列
|
|
|
+ *
|
|
|
+ * @param queueName 队列名
|
|
|
+ * @return true if the queue existed and was deleted.
|
|
|
+ */
|
|
|
+ public boolean deleteQueue(String queueName) {
|
|
|
+ return rabbitAdmin.deleteQueue(queueName);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 绑定一个队列到一个匹配型交换器使用一个routingKey
|
|
|
+ *
|
|
|
+ * @param queue 队列
|
|
|
+ * @param exchange 交换机
|
|
|
+ * @param routingKey 路由key
|
|
|
+ */
|
|
|
+ public void addBinding(Queue queue, TopicExchange exchange, String routingKey) {
|
|
|
+ Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey);
|
|
|
+ rabbitAdmin.declareBinding(binding);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 绑定一个Exchange到一个匹配型Exchange 使用一个routingKey
|
|
|
+ *
|
|
|
+ * @param exchange 交换机
|
|
|
+ * @param topicExchange topic交换机
|
|
|
+ * @param routingKey 路由key
|
|
|
+ */
|
|
|
+ public void addBinding(Exchange exchange, TopicExchange topicExchange, String routingKey) {
|
|
|
+ Binding binding = BindingBuilder.bind(exchange).to(topicExchange).with(routingKey);
|
|
|
+ rabbitAdmin.declareBinding(binding);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 去掉一个binding
|
|
|
+ *
|
|
|
+ * @param binding 绑定
|
|
|
+ */
|
|
|
+ public void removeBinding(Binding binding) {
|
|
|
+ rabbitAdmin.removeBinding(binding);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 创建交换器
|
|
|
+ *
|
|
|
+ * @param exchangeName 交换机名称
|
|
|
+ * @return 直接模式交换机
|
|
|
+ */
|
|
|
+ public DirectExchange createExchange(String exchangeName) {
|
|
|
+ return new DirectExchange(exchangeName, true, false);
|
|
|
+ }
|
|
|
+}
|