Forráskód Böngészése

mq消费对象兼容旧包BaseMap

baiying 1 hete
szülő
commit
a6e5650d05

+ 11 - 0
yyc-common-mq/src/main/java/com/qunzhixinxi/hnqz/common/core/entity/BaseMap.java

@@ -0,0 +1,11 @@
+package com.qunzhixinxi.hnqz.common.core.entity;
+import java.io.Serializable;
+import java.util.HashMap;
+
+/**
+ * 旧包名 BaseMap 兼容类,用于反序列化存量 MQ 消息中的 Java 序列化对象。
+ */
+public class BaseMap extends HashMap<String, Object> implements Serializable {
+
+	private static final long serialVersionUID = -3990977879105299021L;
+}

+ 17 - 0
yyc-common-mq/src/main/java/net/yyc/common/rabbitmq/config/RabbitMqConfig.java

@@ -1,5 +1,6 @@
 package net.yyc.common.rabbitmq.config;
 
+import net.yyc.common.rabbitmq.core.JavaSerializationMessageConverter;
 import net.yyc.common.rabbitmq.event.HnqzRemoteApplicationEvent;
 import net.yyc.common.rabbitmq.exchange.DelayExchangeBuilder;
 import org.springframework.amqp.core.AcknowledgeMode;
@@ -10,8 +11,10 @@ import org.springframework.amqp.core.Queue;
 import org.springframework.amqp.core.QueueBuilder;
 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
 import org.springframework.amqp.rabbit.core.RabbitAdmin;
+import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
 import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
 import org.springframework.amqp.support.ConsumerTagStrategy;
+import org.springframework.amqp.support.converter.MessageConverter;
 import org.springframework.cloud.bus.jackson.RemoteApplicationEventScan;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -35,6 +38,20 @@ public class RabbitMqConfig {
         return rabbitAdmin;
     }
 
+    @Bean
+    public MessageConverter messageConverter() {
+        return new JavaSerializationMessageConverter();
+    }
+
+    @Bean
+    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
+            ConnectionFactory connectionFactory, MessageConverter messageConverter) {
+        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
+        factory.setConnectionFactory(connectionFactory);
+        factory.setMessageConverter(messageConverter);
+        return factory;
+    }
+
     @Bean
     public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
         SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();

+ 35 - 0
yyc-common-mq/src/main/java/net/yyc/common/rabbitmq/core/CompatibleObjectInputStream.java

@@ -0,0 +1,35 @@
+package net.yyc.common.rabbitmq.core;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectStreamClass;
+
+/**
+ * 反序列化时将旧包名 BaseMap 映射到 classpath 中的兼容类。
+ */
+class CompatibleObjectInputStream extends ObjectInputStream {
+
+	CompatibleObjectInputStream(InputStream in) throws IOException {
+		super(in);
+	}
+
+	private static final String LEGACY_BASE_MAP_CLASS = "com.qunzhixinxi.hnqz.common.core.entity.BaseMap";
+
+	@Override
+	protected ObjectStreamClass readClassDescriptor() throws IOException, ClassNotFoundException {
+		ObjectStreamClass streamDesc = super.readClassDescriptor();
+		if (LEGACY_BASE_MAP_CLASS.equals(streamDesc.getName())) {
+			return ObjectStreamClass.lookup(com.qunzhixinxi.hnqz.common.core.entity.BaseMap.class);
+		}
+		return streamDesc;
+	}
+
+	@Override
+	protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
+		if (LEGACY_BASE_MAP_CLASS.equals(desc.getName())) {
+			return com.qunzhixinxi.hnqz.common.core.entity.BaseMap.class;
+		}
+		return super.resolveClass(desc);
+	}
+}

+ 80 - 0
yyc-common-mq/src/main/java/net/yyc/common/rabbitmq/core/JavaSerializationMessageConverter.java

@@ -0,0 +1,80 @@
+package net.yyc.common.rabbitmq.core;
+
+import net.yyc.common.core.entity.BaseMap;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.core.MessageProperties;
+import org.springframework.amqp.support.converter.MessageConversionException;
+import org.springframework.amqp.support.converter.MessageConverter;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.Map;
+
+/**
+ * Java 原生序列化 MessageConverter,兼容旧包名 BaseMap 消息。
+ */
+public class JavaSerializationMessageConverter implements MessageConverter {
+
+	private static final byte[] JAVA_SERIALIZATION_MAGIC = new byte[] {(byte) 0xAC, (byte) 0xED};
+
+	@Override
+	public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
+		if (object == null) {
+			throw new MessageConversionException("Cannot convert null to message");
+		}
+		try {
+			com.qunzhixinxi.hnqz.common.core.entity.BaseMap legacyMap = new com.qunzhixinxi.hnqz.common.core.entity.BaseMap();
+			if (object instanceof Map) {
+				legacyMap.putAll((Map<String, Object>) object);
+			} else {
+				throw new MessageConversionException("Unsupported message type: " + object.getClass().getName());
+			}
+			byte[] body = serialize(legacyMap);
+			MessageProperties props = messageProperties != null ? messageProperties : new MessageProperties();
+			props.setContentType(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT);
+			return new Message(body, props);
+		} catch (IOException e) {
+			throw new MessageConversionException("Failed to serialize message", e);
+		}
+	}
+
+	@Override
+	public Object fromMessage(Message message) throws MessageConversionException {
+		byte[] body = message.getBody();
+		if (body == null || body.length < 2) {
+			throw new MessageConversionException("Message body is empty or too short");
+		}
+		if (!isJavaSerialized(body)) {
+			throw new MessageConversionException("Message body is not Java serialized object");
+		}
+		try {
+			Object obj = deserialize(body);
+			if (obj instanceof Map) {
+				return BaseMap.toBaseMap((Map<String, Object>) obj);
+			}
+			throw new MessageConversionException("Deserialized object is not a Map: " + obj.getClass().getName());
+		} catch (IOException | ClassNotFoundException e) {
+			throw new MessageConversionException("Failed to deserialize message", e);
+		}
+	}
+
+	private static boolean isJavaSerialized(byte[] body) {
+		return body[0] == JAVA_SERIALIZATION_MAGIC[0] && body[1] == JAVA_SERIALIZATION_MAGIC[1];
+	}
+
+	private static Object deserialize(byte[] body) throws IOException, ClassNotFoundException {
+		try (CompatibleObjectInputStream input = new CompatibleObjectInputStream(new ByteArrayInputStream(body))) {
+			return input.readObject();
+		}
+	}
+
+	private static byte[] serialize(Object object) throws IOException {
+		ByteArrayOutputStream output = new ByteArrayOutputStream();
+		try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(output)) {
+			objectOutputStream.writeObject(object);
+		}
+		return output.toByteArray();
+	}
+}

+ 54 - 0
yyc-common-mq/src/test/java/net/yyc/common/rabbitmq/core/JavaSerializationMessageConverterTest.java

@@ -0,0 +1,54 @@
+package net.yyc.common.rabbitmq.core;
+
+import net.yyc.common.core.entity.BaseMap;
+import org.junit.Assert;
+import org.junit.Test;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.core.MessageProperties;
+
+import java.util.Base64;
+import java.util.List;
+
+public class JavaSerializationMessageConverterTest {
+
+	private static final String PRODUCTION_PAYLOAD_BASE64 =
+			"rO0ABXNyAC9jb20ucXVuemhpeGlueGkuaG5xei5jb21tb24uY29yZS5lbnRpdHkuQmFzZU1hcLxcUd0p4zDnAgAAeHIAEWphdmEudXRpbC5IYXNoTWFwBQfa\n" +
+					"wcMWYNEDAAJGAApsb2FkRmFjdG9ySQAJdGhyZXNob2xkeHA/QAAAAAAADHcIAAAAEAAAAAJ0AAd0YXNrSWRzc3IAEWphdmEudXRpbC5Db2xsU2VyV46rtjob\n" +
+					"qBEDAAFJAAN0YWd4cAAAAAF3BAAAAAF0AAczNzc2MzE4eHQABnN0YXR1c3QAATN4";
+
+	@Test
+	public void fromMessage_shouldDeserializeProductionPayload() {
+		JavaSerializationMessageConverter converter = new JavaSerializationMessageConverter();
+		byte[] body = Base64.getDecoder().decode(PRODUCTION_PAYLOAD_BASE64.replace("\n", "").replace("\r", ""));
+		Message message = new Message(body, new MessageProperties());
+
+		Object result = converter.fromMessage(message);
+
+		Assert.assertTrue(result instanceof BaseMap);
+		BaseMap baseMap = (BaseMap) result;
+		Assert.assertEquals("3", baseMap.get("status"));
+
+		List<Long> taskIds = baseMap.getListLong("taskIds");
+		Assert.assertNotNull(taskIds);
+		Assert.assertEquals(1, taskIds.size());
+		Assert.assertEquals(Long.valueOf(3776318L), taskIds.get(0));
+	}
+
+	@Test
+	public void toMessage_shouldSerializeAsLegacyBaseMap() {
+		JavaSerializationMessageConverter converter = new JavaSerializationMessageConverter();
+		BaseMap baseMap = new BaseMap();
+		baseMap.put("taskIds", java.util.Collections.singletonList("3776318"));
+		baseMap.put("status", "3");
+
+		Message message = converter.toMessage(baseMap, new MessageProperties());
+
+		Assert.assertNotNull(message.getBody());
+		Assert.assertTrue(message.getBody().length > 2);
+		Assert.assertEquals((byte) 0xAC, message.getBody()[0]);
+		Assert.assertEquals((byte) 0xED, message.getBody()[1]);
+
+		BaseMap roundTrip = (BaseMap) converter.fromMessage(message);
+		Assert.assertEquals("3", roundTrip.get("status"));
+	}
+}