网站开发 项目介绍,专业网站开发价格,品牌网站设计案例,官网和网站的区别文章目录 概述设计实现通用的序列化接口通用的序列化实现【推荐】 vs 专用的序列化实现专用序列化接口定义序列化实现 概述
网络传输和序列化这两部分的功能相对来说是非常通用并且独立的#xff0c;在设计的时候#xff0c;只要能做到比较好的抽象#xff0c;这两部的实现… 文章目录 概述设计实现通用的序列化接口通用的序列化实现【推荐】 vs 专用的序列化实现专用序列化接口定义序列化实现 概述
网络传输和序列化这两部分的功能相对来说是非常通用并且独立的在设计的时候只要能做到比较好的抽象这两部的实现它的通用性是非常强的。不仅可以用于 RPC 框架中同样可以直接拿去用于实现消息队列或者其他需要互相通信的分布式系统中。
我们先来实现序列化和反序列化部分因为后面的部分会用到序列化和反序列化。 设计实现
通用的序列化接口
首先我们需要设计一个可扩展的通用的序列化接口为了方便使用我们直接使用静态类的方式来定义这个接口严格来说这并不是一个接口 public class SerializeSupport {public static E E parse(byte [] buffer) {// ...}public static E byte [] serialize(E entry) {// ...}
}parse 方法用于反序列化serialize 方法用于序列化
比如
// 序列化
MyClass myClassObject new MyClass();
byte [] bytes SerializeSupport.serialize(myClassObject);
// 反序列化
MyClass myClassObject1 SerializeSupport.parse(bytes);通用的序列化实现【推荐】 vs 专用的序列化实现
在讲解序列化和反序列化的时候说过可以使用通用的序列化实现也可以自己来定义专用的序列化实现。
专用的序列化性能最好但缺点是实现起来比较复杂你要为每一种类型的数据专门编写序列化和反序列化方法。一般的 RPC 框架采用的都是通用的序列化实现比如 gRPC 采用的是 Protobuf 序列化实现Dubbo 支持 hession2 等好几种序列化实现 为什么这些 RPC 框架不像消息队列一样采用性能更好的专用的序列化实现呢这个原因很简单消息队列它需要序列化数据的类型是固定的只是它自己的内部通信的一些命令。但 RPC 框架它需要序列化的数据是用户调用远程方法的参数这些参数可能是各种数据类型所以必须使用通用的序列化实现确保各种类型的数据都能被正确的序列化和反序列化。 我们这里还是采用专用的序列化实现主要的目的是一起来实践一下如何来实现序列化和反序列化
专用序列化接口定义
public interface SerializerT {/*** 计算对象序列化后的长度主要用于申请存放序列化数据的字节数组* param entry 待序列化的对象* return 对象序列化后的长度*/int size(T entry);/*** 序列化对象。将给定的对象序列化成字节数组* param entry 待序列化的对象* param bytes 存放序列化数据的字节数组* param offset 数组的偏移量从这个位置开始写入序列化数据* param length 对象序列化后的长度也就是{link Serializer#size(java.lang.Object)}方法的返回值。*/void serialize(T entry, byte[] bytes, int offset, int length);/*** 反序列化对象* param bytes 存放序列化数据的字节数组* param offset 数组的偏移量从这个位置开始写入序列化数据* param length 对象序列化后的长度* return 反序列化之后生成的对象*/T parse(byte[] bytes, int offset, int length);/*** 用一个字节标识对象类型每种类型的数据应该具有不同的类型值*/byte type();/*** 返回序列化对象类型的Class对象。*/ClassT getSerializeClass();
}这个接口中除了 serialize 和 parse 这两个序列化和反序列化两个方法以外还定义了下面这几个方法
size 方法计算序列化之后的数据长度用于事先来申请存放序列化数据的字节数组type 方法定义每种序列化实现的类型这个类型值也会写入到序列化之后的数据中主要的作用是在反序列化的时候能够识别是什么数据类型的以便找到对应的反序列化实现类getSerializeClass 这个方法返回这个序列化实现类对应的对象类型目的是在执行序列化的时候通过被序列化的对象类型找到对应序列化实现类 序列化实现
利用这个 Serializer 接口我们就可以来实现 SerializeSupport 这个支持任何对象类型序列化的通用静态类了。
首先我们定义两个 Map这两个 Map 中存放着所有实现 Serializer 接口的序列化实现类
private static MapClass?/*序列化对象类型*/, Serializer?/*序列化实现*/ serializerMap new HashMap();
private static MapByte/*序列化实现类型*/, Class?/*序列化对象类型*/ typeMap new HashMap();
serializerMap 中的 key 是序列化实现类对应的序列化对象的类型它的用途是在序列化的时候通过被序列化的对象类型找到对应的序列化实现类。typeMap 的作用和 serializerMap 是类似的它的 key 是序列化实现类的类型用于在反序列化的时候从序列化的数据中读出对象类型然后找到对应的序列化实现类
理解了这两个 Map 的作用实现序列化和反序列化这两个方法就很容易了。这两个方法的实现思路是一样的都是通过一个类型在这两个 Map 中进行查找查找的结果就是对应的序列化实现类的实例也就是 Serializer 接口的实现然后调用对应的序列化或者反序列化方法就可以了。
public class SerializeSupport {private static final Logger logger LoggerFactory.getLogger(SerializeSupport.class);private static MapClass?/*序列化对象类型*/, Serializer?/*序列化实现*/ serializerMap new HashMap();private static MapByte/*序列化实现类型*/, Class?/*序列化对象类型*/ typeMap new HashMap();static {for (Serializer serializer : ServiceSupport.loadAll(Serializer.class)) {registerType(serializer.type(), serializer.getSerializeClass(), serializer);logger.info(Found serializer, class: {}, type: {}.,serializer.getSerializeClass().getCanonicalName(),serializer.type());}}private static byte parseEntryType(byte[] buffer) {return buffer[0];}private static E void registerType(byte type, ClassE eClass, SerializerE serializer) {serializerMap.put(eClass, serializer);typeMap.put(type, eClass);}SuppressWarnings(unchecked)private static E E parse(byte [] buffer, int offset, int length, ClassE eClass) {Object entry serializerMap.get(eClass).parse(buffer, offset, length);if (eClass.isAssignableFrom(entry.getClass())) {return (E) entry;} else {throw new SerializeException(Type mismatch!);}}public static E E parse(byte [] buffer) {return parse(buffer, 0, buffer.length);}private static E E parse(byte[] buffer, int offset, int length) {byte type parseEntryType(buffer);SuppressWarnings(unchecked)ClassE eClass (ClassE )typeMap.get(type);if(null eClass) {throw new SerializeException(String.format(Unknown entry type: %d!, type));} else {return parse(buffer, offset 1, length - 1,eClass);}}public static E byte [] serialize(E entry) {SuppressWarnings(unchecked)SerializerE serializer (SerializerE) serializerMap.get(entry.getClass());if(serializer null) {throw new SerializeException(String.format(Unknown entry class type: %s, entry.getClass().toString()));}byte [] bytes new byte [serializer.size(entry) 1];bytes[0] serializer.type();serializer.serialize(entry, bytes, 1, bytes.length - 1);return bytes;}
}
所有的 Serializer 的实现类是怎么加载到 SerializeSupport 的那两个 Map 中的呢这里面利用了 Java 的一个 SPI 类加载机制
public class ServiceSupport {private final static MapString, Object singletonServices new HashMap();public synchronized static S S load(ClassS service) {return StreamSupport.stream(ServiceLoader.load(service).spliterator(), false).map(ServiceSupport::singletonFilter).findFirst().orElseThrow(ServiceLoadException::new);}public synchronized static S CollectionS loadAll(ClassS service) {return StreamSupport.stream(ServiceLoader.load(service).spliterator(), false).map(ServiceSupport::singletonFilter).collect(Collectors.toList());}SuppressWarnings(unchecked)private static S S singletonFilter(S service) {if(service.getClass().isAnnotationPresent(Singleton.class)) {String className service.getClass().getCanonicalName();Object singletonInstance singletonServices.putIfAbsent(className, service);return singletonInstance null ? service : (S) singletonInstance;} else {return service;}}
}
到这里我们就封装好了一个通用的序列化的接口 对于使用序列化的模块来说它只要依赖 SerializeSupport 这个静态类调用它的序列化和反序列化方法就可以了不需要依赖任何序列化实现类。 对于序列化实现的提供者来说也只需要依赖并实现 Serializer 这个接口就可以了。
比如我们的 HelloService 例子中的参数是一个 String 类型的数据我们需要实现一个支持 String 类型的序列化实现
public class StringSerializer implements SerializerString {Overridepublic int size(String entry) {return entry.getBytes(StandardCharsets.UTF_8).length;}Overridepublic void serialize(String entry, byte[] bytes, int offset, int length) {byte [] strBytes entry.getBytes(StandardCharsets.UTF_8);System.arraycopy(strBytes, 0, bytes, offset, strBytes.length);}Overridepublic String parse(byte[] bytes, int offset, int length) {return new String(bytes, offset, length, StandardCharsets.UTF_8);}Overridepublic byte type() {return Types.TYPE_STRING;}Overridepublic ClassString getSerializeClass() {return String.class;}
}在把 String 和 byte 数组做转换的时候一定要指定编码方式确保序列化和反序列化的时候都使用一致的编码我们这里面统一使用 UTF8 编码。否则如果遇到执行序列化和反序列化的两台服务器默认编码不一样就会出现乱码。我们在开发过程用遇到的很多中文乱码问题绝大部分都是这个原因 还有一个更复杂的序列化实现 MetadataSerializer用于将注册中心的数据持久化到文件中
/*** Size of the map 2 bytes* Map entry:* Key string:* Length: 2 bytes* Serialized key bytes: variable length* Value list* List size: 2 bytes* item(URI):* Length: 2 bytes* serialized uri: variable length* item(URI):* ...* Map entry:* ...**/
public class MetadataSerializer implements SerializerMetadata {Overridepublic int size(Metadata entry) {return Short.BYTES // Size of the map 2 bytesentry.entrySet().stream().mapToInt(this::entrySize).sum();}Overridepublic void serialize(Metadata entry, byte[] bytes, int offset, int length) {ByteBuffer buffer ByteBuffer.wrap(bytes, offset, length);buffer.putShort(toShortSafely(entry.size()));entry.forEach((k,v) - {byte [] keyBytes k.getBytes(StandardCharsets.UTF_8);buffer.putShort(toShortSafely(keyBytes.length));buffer.put(keyBytes);buffer.putShort(toShortSafely(v.size()));for (URI uri : v) {byte [] uriBytes uri.toASCIIString().getBytes(StandardCharsets.UTF_8);buffer.putShort(toShortSafely(uriBytes.length));buffer.put(uriBytes);}});}private int entrySize(Map.EntryString, ListURI e) {// Map entry:return Short.BYTES // Key string length: 2 bytese.getKey().getBytes().length // Serialized key bytes: variable lengthShort.BYTES // List size: 2 bytese.getValue().stream() // Value list.mapToInt(uri - {return Short.BYTES // Key string length: 2 bytesuri.toASCIIString().getBytes(StandardCharsets.UTF_8).length; // Serialized key bytes: variable length}).sum();}Overridepublic Metadata parse(byte[] bytes, int offset, int length) {ByteBuffer buffer ByteBuffer.wrap(bytes, offset, length);Metadata metadata new Metadata();int sizeOfMap buffer.getShort();for (int i 0; i sizeOfMap; i) {int keyLength buffer.getShort();byte [] keyBytes new byte [keyLength];buffer.get(keyBytes);String key new String(keyBytes, StandardCharsets.UTF_8);int uriListSize buffer.getShort();ListURI uriList new ArrayList(uriListSize);for (int j 0; j uriListSize; j) {int uriLength buffer.getShort();byte [] uriBytes new byte [uriLength];buffer.get(uriBytes);URI uri URI.create(new String(uriBytes, StandardCharsets.UTF_8));uriList.add(uri);}metadata.put(key, uriList);}return metadata;}Overridepublic byte type() {return Types.TYPE_METADATA;}Overridepublic ClassMetadata getSerializeClass() {return Metadata.class;}private short toShortSafely(int v) {assert v Short.MAX_VALUE;return (short) v;}
}
到这里序列化的部分就实现完成了。我们这个序列化的实现对外提供服务的就只有一个 SerializeSupport 静态类并且可以通过扩展支持序列化任何类型的数据这样一个通用的实现不仅可以用在我们这个 RPC 框架的例子中完全可以把这部分直接拿过去用在业务代码中