Netty实战笔记(三)

Netty实战笔记(三)

Scroll Down

ByteBuf

网络数据的基本单位总是字节。Java NIO 提供了ByteBuffer 作为它
的字节容器,但是这个类使用起来过于复杂,而且也有些繁琐。

NettyByteBuffer 替代品是ByteBuf,一个强大的实现,既解决了JDK API 的局限性,又为网络应用程序的开发者提供了更好的API。

ByteBuf 的API

Netty 的数据处理API 通过两个组件暴露——(abstractclass ByteBuf 和(interfaceByteBufHolder

下面是一些ByteBuf API 的优点:

  • 它可以被用户自定义的缓冲区类型扩展;
  • 通过内置的复合缓冲区类型实现了透明的零拷贝;
  • 容量可以按需增长(类似于JDK 的StringBuilder);
  • 在读和写这两种模式之间切换不需要调用ByteBufferflip()方法;
  • 读和写使用了不同的索引;
  • 支持方法的链式调用;
  • 支持引用计数;
  • 支持池化。

ByteBuf 类——Netty 的数据容器

因为所有的网络通信都涉及字节序列的移动,所以高效易用的数据结构明显是必不可少的。

NettyByteBuf 实现满足并超越了这些需求。让我们首先来看看它是如何通过使用不同的索引来简化对它所包含的数据的访问的吧。

它是如何工作的

ByteBuf 维护了两个不同的索引:一个用于读取,一个用于写入。当你从 ByteBuf 读取时,它的readerIndex 将会被递增已经被读取的字节数。同样地,当你写入 ByteBuf 时,它的writerIndex 也会被递增。下图展示了一个空 ByteBuf 的布局结构和状态。

image.png

名称以read 或者write 开头的ByteBuf 方法,将会推进其对应的索引,而名称以set 或者get 开头的操作则不会。后面的这些方法将在作为一个参数传入的一个相对索引上执行操作。

ByteBuf 的使用模式

在使用Netty 时,你将遇到几种常见的围绕ByteBuf 而构建的使用模式。

先了解一下直接缓冲区和非直接缓冲区的概念

创建的缓冲区,在JVM内存外开辟内存,在每次调用基础操作系统的一个本机IO之前或者之后,虚拟机都会避免将缓冲区的内容复制到中间缓冲区(或者从中间缓冲区复制内容),缓冲区的内容驻留在物理内存内,会少一次复制过程,如果需要循环使用缓冲区,用直接缓冲区可以很大地提高性能。

虽然直接缓冲区使JVM可以进行高效的I/O操作,但它使用的内存是操作系统分配的,绕过了JVM堆栈,建立和销毁比堆栈上的缓冲区要更大的开销。

字节缓冲区要么是直接的,要么是非直接的。如果为直接字节缓冲区,则 Java 虚拟机会尽最大努力直接在此缓冲区上执行本机 I/O 操作。也就是说,在每次调用基础操作系统的一个本机 I/O 操作之前(或之后),虚拟机都会尽量避免将缓冲区的内容复制到中间缓冲区中(或从中间缓冲区中复制内容)

image.png

image.png

1.堆缓冲区

最常用的 ByteBuf 模式是将数据存储在JVM 的堆空间中。这种模式被称为支撑数组(backing array),它能在没有使用池化的情况下提供快速的分配和释放。这种方式,如下代码所示,非常适合于有遗留的数据需要处理的情况。

    private static final ByteBuf BYTE_BUF_FROM_SOMEWHERE = Unpooled.buffer(1024);

    public static void heapBuffer() {
        ByteBuf heapBuf = BYTE_BUF_FROM_SOMEWHERE; //get reference form somewhere
        //检查 ByteBuf 是否有一个支撑数组
        if (heapBuf.hasArray()) {
            //如果有,则获取对该数组的引用
            byte[] array = heapBuf.array();
            //计算第一个字节的偏移量
            int offset = heapBuf.arrayOffset() + heapBuf.readerIndex();
            //获得可读字节数
            int length = heapBuf.readableBytes();
            //使用数组、偏移量和长度作为参数调用你的方法
            handleArray(array, offset, length);
        }
    }

直接缓冲区

直接缓冲区的主要缺点是,相对于基于堆的缓冲区,它们的分配和释放都较为昂贵。如果你正在处理遗留代码,你也可能会遇到另外一个缺点:因为数据不是在堆上,所以你不得不进行一次复制。如下代码清单所示

    private static final ByteBuf BYTE_BUF_FROM_SOMEWHERE = Unpooled.buffer(1024);

    public static void directBuffer() {
        ByteBuf directBuf = BYTE_BUF_FROM_SOMEWHERE; //get reference form somewhere
        //检查 ByteBuf 是否由数组支撑。如果不是,则这是一个直接缓冲区
        if (!directBuf.hasArray()) {
            //获取可读字节数
            int length = directBuf.readableBytes();
            //分配一个新的数组来保存具有该长度的字节数据
            byte[] array = new byte[length];
            //将字节复制到该数组
            directBuf.getBytes(directBuf.readerIndex(), array);
            //使用数组、偏移量和长度作为参数调用你的方法
            handleArray(array, 0, length);
        }
    }

复合缓冲区

复合缓冲区为多个ByteBuf 提供一个聚合视图。在这里你可以根据需要添加或者删除ByteBuf 实例,这是一个JDK 的ByteBuffer 实现完全缺失的特性。

Netty 通过一个ByteBuf子类——CompositeByteBuf——实现了这个模式,它提供了一个将多个缓冲区表示为单个合并缓冲区的虚拟表示。

警告 CompositeByteBuf 中的ByteBuf 实例可能同时包含直接内存分配和非直接内存分配。
如果其中只有一个实例,那么对CompositeByteBuf 上的hasArray()方法的调用将返回该组件上的hasArray()方法的值;否则它将返回false

HTTP 协议传输的消息由头部和主体组成

这两部分由应用程序的不同模块产生,将会在消息被发送的时候组装。该应用程序可以选择为多个消息重用相同的消息主体。当这种情况发生时,对于每个消息都将会创建一个新的头部。

CompositeByteBuf在消除了没必要的复制的同时,暴露了通用的ByteBuf API

image.png

如下代码展示了如何通过使用JDK 的ByteBuffer 来实现这一需求。创建了一个包含两个ByteBuffer 的数组用来保存这些消息组件,同时创建了第三ByteBuffer 用来保存所有这些数据的副本。

    /**
     * 使用 ByteBuffer 的复合缓冲区模式
     */
    public static void byteBufferComposite(ByteBuffer header, ByteBuffer body) {
        // Use an array to hold the message parts
        ByteBuffer[] message =  new ByteBuffer[]{ header, body };

        // Create a new ByteBuffer and use copy to merge the header and body
        ByteBuffer message2 =
                ByteBuffer.allocate(header.remaining() + body.remaining());
        message2.put(header);
        message2.put(body);
        message2.flip();
    }

下面代码清单使用CompositeByteBuf 的复合缓冲区模式

    private static final ByteBuf BYTE_BUF_FROM_SOMEWHERE = Unpooled.buffer(1024);

    public static void byteBufComposite() {
        CompositeByteBuf messageBuf = Unpooled.compositeBuffer();
        ByteBuf headerBuf = BYTE_BUF_FROM_SOMEWHERE; // can be backing or direct
        ByteBuf bodyBuf = BYTE_BUF_FROM_SOMEWHERE;   // can be backing or direct
        //将 ByteBuf 实例追加到 CompositeByteBuf
        messageBuf.addComponents(headerBuf, bodyBuf);
        //...
        //删除位于索引位置为 0(第一个组件)的 ByteBuf
        messageBuf.removeComponent(0); // remove the header
        //循环遍历所有的 ByteBuf 实例
        for (ByteBuf buf : messageBuf) {
            System.out.println(buf.toString());
        }
    }

CompositeByteBuf 可能不支持访问其支撑数组,因此访问CompositeByteBuf 中的数据类似于(访问)直接缓冲区的模式

    /**
     * 代码清单 5-5 访问 CompositeByteBuf 中的数据
     */
    public static void byteBufCompositeArray() {
        CompositeByteBuf compBuf = Unpooled.compositeBuffer();
        //获得可读字节数
        int length = compBuf.readableBytes();
        //分配一个具有可读字节数长度的新数组
        byte[] array = new byte[length];
        //将字节读到该数组中
        compBuf.getBytes(compBuf.readerIndex(), array);
        //使用偏移量和长度作为参数使用该数组
        handleArray(array, 0, array.length);
    }

字节级操作

ByteBuf 提供了许多超出基本读、写操作的方法用于修改它的数据

随机访问索引

Java的字节数组一样,ByteBuf 的索引是从零开始的:第一个字节的索引是0,最后一个字节的索引总是capacity() - 1

ByteBuf buffer = ...;
for (int i = 0; i < buffer.capacity(); i++) {
byte b = buffer.getByte(i);
System.out.println((char)b);
}

顺序访问索引

ByteBuf 被读索引和写索引划分成3个区域。

image.png

可丢弃字节

在上图中标记为可丢弃字节的分段包含了已经被读过的字节,通过调用`discardReadBytes()方法,可以丢弃它们并回收空间。

这个分段的初始大小为0,存储在readerIndex 中,会随着read操作的执行而增加(get操作不会移动readerIndex)。

下图展示了中上图所展示的缓冲区上调用discardReadBytes()方法后的结果。

可以看到,可丢弃字节分段中的空间已经变为可写的了。

注意,在调用discardReadBytes()之后,因为只是移动了可以读取的字节以及writerIndex,而没有对所有可写入的字节进行擦除写。所以对可写分段的内容并没有任何的保证。

image.png

可读字节

ByteBuf 的可读字节分段存储了实际数据,任何名称以read 或者skip 开头的操作都将检索或者跳过位于当前readerIndex 的数据,并且将它增加已读字节数。

    //读取所有数据
    public static void readAllData() {
        ByteBuf buffer = Unpooled.buffer(1024);
        while (buffer.isReadable()) {
            System.out.println(buffer.readByte());
        }
    }

可写字节

可写字节分段是指一个拥有未定义内容的、写入就绪的内存区域,任何名称以write 开头的操作都将从当前的writerIndex处开始写数据,并将它增加已经写入的字节数

    public static void write() {
        ByteBuf buffer = Unpooled.buffer(1024);
        while (buffer.writableBytes() >= 4) {
            buffer.writeInt(random.nextInt());
        }
    }

索引管理

我们可以通过调用markReaderIndex()markWriterIndex()resetWriterIndex()resetReaderIndex()来标记和重置ByteBufreaderIndexwriterIndex

也可以通过调用readerIndex(int)或者writerIndex(int)来将索引移动到指定位置

可以通过调用clear()方法来将readerIndexwriterIndex 都设置为0。但这并不会清除内存中的内容

image.png

image.png

调用clear()比调用discardReadBytes()轻量得多,因为它将只是重置索引而不会复制任何的内存。

查找操作

ByteBuf中有多种可以用来确定指定值的索引的方法。最简单的是使用indexOf()方法。

较复杂的查找可以通过那些需要一个ByteBufProcessor,它针对一些常见的值定义了许多便利的方法。

这个接口只定义了一个方法:

boolean process(byte value)

它将检查输入值是否是正在查找的值。

假设你的应用程序需要和所谓的包含有以NULL结尾的内容的Flash套接字
作为参数的方法达成。

ByteBuf buffer = ...;
int index =buffer.forEachByte(ByteBufProcessor.FIND_NUL)

派生缓冲区

派生缓冲区为ByteBuf 提供了以专门的方式来呈现其内容的视图。这类视图是通过以下方法被创建的:

  • duplicate();
  • slice();
  • slice(int, int);
  • Unpooled.unmodifiableBuffer(…);
  • order(ByteOrder);
  • readSlice(int)。

每个这些方法都将返回一个新的ByteBuf 实例,它具有自己的读索引、写索引和标记索引。

其内部存储是共享的,如果你修改了它的内容,也同时修改了其对应的源实例
如果需要一个现有缓冲区的真实副本,请使用copy()或者copy(int, int)方
法。不同于派生缓冲区,由这个调用所返回的ByteBuf 拥有独立的数据副本。

下面代码演示一下两者的区别

    //使用slice(int, int)对 ByteBuf 进行切片
    public static void byteBufSlice() {
        Charset utf8 = Charset.forName("UTF-8");
        //创建一个用于保存给定字符串的字节的 ByteBuf
        ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8);
        //创建该 ByteBuf 从索引 0 开始到索引 15 结束的一个新切片
        ByteBuf sliced = buf.slice(0, 15);
        //将打印“Netty in Action”
        System.out.println(sliced.toString(utf8));
        //更新索引 0 处的字节
        buf.setByte(0, (byte)'J');
        //将会成功,因为数据是共享的,对其中一个所做的更改对另外一个也是可见的
        assert buf.getByte(0) == sliced.getByte(0);
    }

    //复制一个 ByteBuf
    public static void byteBufCopy() {
        Charset utf8 = Charset.forName("UTF-8");
        //创建 ByteBuf 以保存所提供的字符串的字节
        ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8);
        //创建该 ByteBuf 从索引 0 开始到索引 15 结束的分段的副本
        ByteBuf copy = buf.copy(0, 15);
        //将打印“Netty in Action”
        System.out.println(copy.toString(utf8));
        //更新索引 0 处的字节
        buf.setByte(0, (byte)'J');
        //将会成功,因为数据不是共享的
        assert buf.getByte(0) != copy.getByte(0);
    }

读/写操作

两种类别的读/写操作:

  • get()set()操作,从给定的索引开始,并且保持索引不变;
  • read()write()操作,从给定的索引开始,并且会根据已经访问过的字节数对索引进行调整。
    // get()和 set()方法的用法
    public static void byteBufSetGet() {
        Charset utf8 = Charset.forName("UTF-8");
        //创建一个新的 ByteBuf以保存给定字符串的字节
        ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8);
        //打印第一个字符'N'
        System.out.println((char)buf.getByte(0));
        //存储当前的 readerIndex 和 writerIndex
        int readerIndex = buf.readerIndex();
        int writerIndex = buf.writerIndex();
        //将索引 0 处的字 节更新为字符'B'
        buf.setByte(0, (byte)'B');
        //打印第一个字符,现在是'B'
        System.out.println((char)buf.getByte(0));
        //将会成功,因为这些操作并不会修改相应的索引
        assert readerIndex == buf.readerIndex();
        assert writerIndex == buf.writerIndex();
    }

read()操作,其作用于当前的readerIndexwriterIndex
这些方法将用于从ByteBuf 中读取数据,如同它是一个流

     //read()和 write()操作
    public static void byteBufWriteRead() {
        Charset utf8 = Charset.forName("UTF-8");
        //创建一个新的 ByteBuf 以保存给定字符串的字节
        ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8);
        //打印第一个字符'N'
        System.out.println((char)buf.readByte());
        //存储当前的readerIndex
        int readerIndex = buf.readerIndex();
        //存储当前的writerIndex
        int writerIndex = buf.writerIndex();
        //将字符 '?'追加到缓冲区
        buf.writeByte((byte)'?');
        assert readerIndex == buf.readerIndex();
        //将会成功,因为 writeByte()方法移动了 writerIndex
        assert writerIndex != buf.writerIndex();
    }

ByteBufHolder 接口

除了实际的数据负载之外,我们还需要存储各种属性值。

HTTP 响应便是一个很好的例子,除了表示为字节的内容,还包括状态码、cookie 等。

为了处理这种常见的用例,Netty 提供了ByteBufHolder

ByteBufHolder 也为Netty 的高级特性提供了支持,如缓冲区池化,其中可以从池中借用ByteBuf,并且在需要时自动释放。

image.png

ByteBuf 分配

管理ByteBuf 实例有不同的方式

按需分配:ByteBufAllocator 接口

ByteBufAllocator 实现了(ByteBuf 的)池化,它可以用来分配我们所描述过的任意类型的ByteBuf 实例

可以通过Channel(每个都可以有一个不同的ByteBufAllocator 实例)或者绑定到ChannelHandlerChannelHandlerContext 获取一个到ByteBufAllocator 的引用

    //获取一个到 ByteBufAllocator 的引用
    public static void obtainingByteBufAllocatorReference(){
        Channel channel = CHANNEL_FROM_SOMEWHERE; //get reference form somewhere
        //从 Channel 获取一个到ByteBufAllocator 的引用
        ByteBufAllocator allocator = channel.alloc();
        //...
        ChannelHandlerContext ctx = CHANNEL_HANDLER_CONTEXT_FROM_SOMEWHERE; //get reference form somewhere
        //从 ChannelHandlerContext 获取一个到 ByteBufAllocator 的引用
        ByteBufAllocator allocator2 = ctx.alloc();
        //...
    }

Unpooled 缓冲区

可能某些情况下,你未能获取一个到ByteBufAllocator 的引用。对于这种情况,Netty 提供了一个简单的称为Unpooled 的工具类,它提供了静态的辅助方法来创建未池化的ByteBuf实例。

image.png

引用计数

Netty 在第4 版中为ByteBufByteBufHolder 引入了引用计数技术,它们都实现了interface ReferenceCounted

    /**
     * 引用计数
     * */
    public static void referenceCounting(){
        Channel channel = CHANNEL_FROM_SOMEWHERE; //get reference form somewhere
        //从 Channel 获取ByteBufAllocator
        ByteBufAllocator allocator = channel.alloc();
        //...
        //从 ByteBufAllocator分配一个 ByteBuf
        ByteBuf buffer = allocator.directBuffer();
        //检查引用计数是否为预期的 1
        assert buffer.refCnt() == 1;
        //...
    }
    // 5-16 释放引用计数的对象
    public static void releaseReferenceCountedObject(){
        ByteBuf buffer = BYTE_BUF_FROM_SOMEWHERE; //get reference form somewhere
        //减少到该对象的活动引用。当减少到 0 时,该对象被释放,并且该方法返回 true
        boolean released = buffer.release();
        //...
    }