Netty实战笔记(四)

Netty实战笔记(四)

Scroll Down

ChannelHandler 和ChannelPipeline

Netty可以在ChannelPipeline 中将ChannelHandler 链接在一起以组织处理逻辑

ChannelHandler 家族

Channel 的生命周期

Interface Channel 定义了一组和ChannelInboundHandler API 密切相关的简单但功能强大的状态模型,

状态描述
ChannelUnregisteredChannel 已经被创建,但还未注册到EventLoop
ChannelRegisteredChannel 已经被注册到了EventLoop
ChannelActiveChannel 处于活动状态(已经连接到它的远程节点),它现在可以接收和发送数据了
ChannelInactiveChannel 没有连接到远程节点

Channel 的正常生命周期如图所示,当这些状态发生改变时,将会生成对应的事件。这些事件将会被转发给ChannelPipeline 中的ChannelHandler,其可以随后对它们做出响应。

image.png

ChannelHandler的生命周期

类 型描 述
handlerAdded当把ChannelHandler 添加到ChannelPipeline 中时被调用
handlerRemoved当从ChannelPipeline 中移除ChannelHandler 时被调用
exceptionCaught当处理过程中在ChannelPipeline 中有错误产生时被调用

Netty 定义了下面两个重要的ChannelHandler 子接口:

  • ChannelInboundHandler——处理入站数据以及各种状态变化;

  • ChannelOutboundHandler——处理出站数据并且允许拦截所有的操作。

ChannelInboundHandler 接口

下表列出了interface ChannelInboundHandler 的生命周期方法。这些方法将会在数据被接收时或者与其对应的Channel 状态发生改变时被调用。

类 型描 述
channelRegistered当Channel 已经注册到它的EventLoop 并且能够处理I/O 时被调用
channelUnregistered当Channel 从它的EventLoop 注销并且无法处理任何I/O 时被调用
channelActive当Channel 处于活动状态时被调用;Channel 已经连接/绑定并且已经就绪
channelInactivechannelInactive 当Channel 离开活动状态并且不再连接它的远程节点时被调用
channelReadComplete当Channel上的一个读操作完成时被调用
channelRead当从Channel 读取数据时被调用
ChannelWritabilityChanged当Channel 的可写状态发生改变时被调用。用户可以确保写操作不会完成得太快(以避免发生OutOfMemoryError)或者可以在Channel 变为再次可写时恢复写入。可以通过调用Channel 的isWritable()方法来检测Channel 的可写性。与可写性相关的阈值可以通过Channel.config().setWriteHighWaterMark()和Channel.config().setWriteLowWaterMark()方法来设置
userEventTriggered当ChannelnboundHandler.fireUserEventTriggered()方法被调
用时被调用,因为一个POJO 被传经了ChannelPipeline

当某个ChannelInboundHandler 的实现重写channelRead()方法时,它将负责显式地释放与池化的ByteBuf 实例相关的内存

//释放消息资源
@Sharable
public class DiscardHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        //丢弃已接收的消息
        ReferenceCountUtil.release(msg);
    }
}

Netty 将使用WARN 级别的日志消息记录未释放的资源,使得可以非常简单地在代码中发现违规的实例。但是以这种方式管理资源可能很繁琐。一个更加简单的方式是使用SimpleChannelInboundHandler

@Sharable
//扩展了SimpleChannelInboundHandler
public class SimpleDiscardHandler
    extends SimpleChannelInboundHandler<Object> {
    @Override
    public void channelRead0(ChannelHandlerContext ctx,
        Object msg) {
        //不需要任何显式的资源释放
        // No need to do anything special
    }
}

由于SimpleChannelInboundHandler 会自动释放资源,所以你不应该存储指向任何消息的引用供将来使用,因为这些引用都将会失效。

ChannelOutboundHandler 接口

出站操作和数据将由ChannelOutboundHandler 处理。它的方法将被ChannelChannelPipeline 以及ChannelHandlerContext 调用。

下表显示了所有由ChannelOutboundHandler本身所定义的方法

类 型描 述
bind(ChannelHandlerContext,
SocketAddress,ChannelPromise)
当请求将Channel 绑定到本地地址时被调用
connect(ChannelHandlerContext,
SocketAddress,SocketAddress,ChannelPromise)
当请求将Channel 连接到远程节点时被调用
disconnect(ChannelHandlerContext,
ChannelPromise)
当请求将Channel 从远程节点断开时被调用
close(ChannelHandlerContext,ChannelPromise)当请求关闭Channel 时被调用
deregister(ChannelHandlerContext,
ChannelPromise)
当请求将Channel 从它的EventLoop 注销时被调用
read(ChannelHandlerContext)当请求从Channel 读取更多的数据时被调用
flush(ChannelHandlerContext)当请求通过Channel 将入队数据冲刷到远程节点时被调用
write(ChannelHandlerContext,Object,
ChannelPromise)
当请求通过Channel 将数据写到远程节点时被调用

ChannelPromiseChannelFuture ChannelOutboundHandler中的大部分方法都需要一个ChannelPromise参数,以便在操作完成时得到通知。ChannelPromiseChannelFuture的一个子类,其定义了一些可写的方法,如setSuccess()setFailure(),从而使ChannelFuture不可变

ChannelHandler 适配器

ChannelInboundHandlerAdapterChannelOutboundHandlerAdapter类作为自己的ChannelHandler的适配器类。这两个适配器分别提供了ChannelInboundHandlerChannelOutboundHandler的基本实现。通过扩展抽象类ChannelHandlerAdapter,它们获得了它们共同的超接口ChannelHandler的方法。生成的类的层次结构如:

image.png

ChannelHandlerAdapter还提供了实用方法isSharable()。如果其对应的实现被标注为Sharable,那么 这个方法将返回true,表示它可以被添加到多个ChannelPipeline中。(@Sharable注解)

ChannelInboundHandlerAdapterChannelOutboundHandlerAdapter中所提供的方法体调用了其相关联的ChannelHandlerContext上的等效方法,从而将事件转发到了ChannelPipeline中的下一个ChannelHandler中。

资源管理

为了诊断潜在的(资源泄漏)问题,Netty提供了class ResourceLeakDetector它将对你应用程序的缓冲区分配做大约1%的采样来检测内存泄露。相关的开销是非常小的。

Netty目前定义了4种泄漏检测级别:

级别描述
DISABLED禁用泄漏检测。只有在详尽的测试之后才应使用
SIMPLE使用1%的默认采样率检测并报告任何发现的泄露。这是默认级别,适合绝大部分情况。
ADVANCED使用默认的采样率,报告所发现的任何的泄露以及对应的消息被访问的位置
PARANOID类似于ADVANCED,但是其将会对每次(对消息的)访问都进行采样。这对性能将会有很大影响,在调试阶段使用

泄露检测级别可以通过将下面的Java系统属性设置为表中的一个值来定义:

java-Dio.netty.leakDetectionLevel=ADVANCED

// 消费并释放入站消息
public class DiscardInboundHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        //通过调用 ReferenceCountUtil.release()方法释放资源
        ReferenceCountUtil.release(msg);
    }
}
//丢弃并释放出站消息
public class DiscardOutboundHandler
    extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx,
        Object msg, ChannelPromise promise) {
        //通过使用 ReferenceCountUtil.realse(...)方法释放资源
        ReferenceCountUtil.release(msg);
        //通知 ChannelPromise数据已经被处理了
        promise.setSuccess();
    }
}

如果一个消息被消费或者丢弃了,并且没有传递给ChannelPipeline中的下一个ChannelOutboundHandler,那么用户就有责任调用ReferenceCountUtil.release()。如果消息到达了实际的传输层,那么当它被写入时或者Channel关闭时,都将被自动释放。

ChannelPipeline接口

ChannelPipeline是一个拦截流经Channel的入站和出站事件的Channel-Handler实例链,那么就很容易看出这些ChannelHandler之间的交互是组成一个应用程序数据和事件处理逻辑的核心。

每一个新创建的Channel都将会被分配一个新的ChannelPipeline。这项关联是永久性的;Channel既不能附加另外一个ChannelPipeline,也不能分离其当前的。在Netty组件的生命周期中,这是一项固定的操作,不需要开发人员的任何干预。

根据事件的起源,事件将会被ChannelInboundHandler或者ChannelOutboundHandler处理。随后,通过调用ChannelHandlerContext实现,它将被转发给同一超类型的下一个ChannelHandler

ChannelHandlerContext

ChannelHandlerContext使得ChannelHandler能够和它的ChannelPipeline以及其他的ChannelHandler交互。ChannelHandler可以通知其所属的ChannelPipeline中的下一个ChannelHandler,甚至可以动态修改它所属的ChannelPipeline。 ChannelHandlerContext具有丰富的用于处理事件和执行I/O 操作的API。

image.png

当你完成了通过调用ChannelPipeline.add*()方法将入站处理器(ChannelInboundHandler)和出站处理器(ChannelOutboundHandler)混合添加到ChannelPipeline之后,每一个ChannelHandler从头部到尾端的顺序位置正如同我们方才所定义它们的一样。因此,如果你将图中的处理器(ChannelHandler)从左到右进行编号,那么第一个被入站事件看到的ChannelHandler将是1,而第一个被出站事件看到的ChannelHandler将是 5。

ChannelPipeline传播事件时,它会测试ChannelPipeline中的下一个Channel-Handler的类型是否和事件的运动方向相匹配。如果不匹配,ChannelPipeline将跳过该ChannelHandler并前进到下一个,直到它找到和该事件所期望的方向相匹配的为止。

ChannelPipeline

ChannelHandler可以通过添加、删除或者替换其他的ChannelHandler来实时地修改ChannelPipeline的布局。

名称描述
addFirst addBefore addAfter addLast将一个ChannelHandler 添加到ChannelPipeline 中
remove将一个ChannelHandler 从 ChannelPipeline 中移除
replace将 ChannelPipeline 中的一个 ChannelHandler 替换为另一个ChannelHandler
    //修改 ChannelPipeline    
    public static void modifyPipeline() {
        ChannelPipeline pipeline = CHANNEL_PIPELINE_FROM_SOMEWHERE; // get reference to pipeline;
        //创建一个 FirstHandler 的实例
        FirstHandler firstHandler = new FirstHandler();
        //将该实例作为"handler1"添加到ChannelPipeline 中
        pipeline.addLast("handler1", firstHandler);
        //将一个 SecondHandler的实例作为"handler2"添加到 ChannelPipeline的第一个槽中。这意味着它将被放置在已有的"handler1"之前
        pipeline.addFirst("handler2", new SecondHandler());
        //将一个 ThirdHandler 的实例作为"handler3"添加到 ChannelPipeline 的最后一个槽中
        pipeline.addLast("handler3", new ThirdHandler());
        //...
        //通过名称移除"handler3"
        pipeline.remove("handler3");
        //通过引用移除FirstHandler(它是唯一的,所以不需要它的名称)
        pipeline.remove(firstHandler);
        //将 SecondHandler("handler2")替换为 FourthHandler:"handler4"
        pipeline.replace("handler2", "handler4", new FourthHandler());
    }

ChannelHandler的执行和阻塞

通常ChannelPipeline中的每一个ChannelHandler都是通过它的EventLoop(I/O 线程)来处理传递给它的事件的。所以至关重要的是不要阻塞这个线程,因为这会对整体的I/O 处理产生负面的影响。但有时可能需要与那些使用阻塞API 的遗留代码进行交互。对于这种情况,ChannelPipeline有一些接受一个EventExecutorGroup的add()方法。如果一个事件被传递给一个自定义的EventExecutorGroup,它将被包含在这个EventExecutorGroup中的某个EventExecutor所处理,从而被从该Channel本身的EventLoop中移除。对于这种用例,Netty提供了一个叫DefaultEventExecutor-Group的默认实现。

通过类型或者名称来访问ChannelHandler的方法:

名称描述
get通过类型或者名称返回 ChannelHandler
context返回和ChannelHandler 绑定的 ChannelHandlerContext
names返回ChannelPipeline 中所有 ChannelHandler 的名称

触发事件

ChannelPipeline的API公开了用于调用入站和出站操作的附加方法:

ChannelPipeline的入站操作:

方法名称描述
fireChannelRegistered调用ChannelPipeline中下一个ChannelInboundHandler的channelRegistered(ChannelHandlerContext)方法
fireChannelUnregistered调用ChannelPipeline中下一个ChannelInboundHandler的fireChannelUnregistered(ChannelHandlerContext)方法
fireChannelActive调用ChannelPipeline中下一个ChannelInboundHandler的fireChannelActive(ChannelHandlerContext)方法
fireChannelInactive调用ChannelPipeline中下一个ChannelInboundHandler的fireChannelInactive(ChannelHandlerContext)方法
fireExceptionCaught调用ChannelPipeline中下一个ChannelInboundHandler的fireExceptionCaught(ChannelHandlerContext)方法
fireUserEventTriggered调用ChannelPipeline中下一个ChannelInboundHandler的fireUserEventTriggered(ChannelHandlerContext)方法
fireChannelRead调用ChannelPipeline中下一个ChannelInboundHandler的fireChannelRead(ChannelHandlerContext)方法
fireChannelReadComplete调用ChannelPipeline中下一个ChannelInboundHandler的fireChannelReadComplete(ChannelHandlerContext)方法
fireChannelWritabilityChanged调用ChannelPipeline中下一个ChannelInboundHandler的fireChannelWritabilityChanged(ChannelHandlerContext)方法

在出站这边,处理事件将会导致底层的套接字上发生一系列的动作。

ChannelPipeline的出站操作:

方法名称描述
bind将Channel绑定到一个本地地址,这将调用ChannelPipeline中的下一个ChannelOutboundHandler的bind(ChannelHandlerContext,Socket-Address, ChannelPromise)方法
connect将Channel连接到一个远程地址,这将调用ChannelPipeline中的下一个ChannelOutboundHandler的connect(ChannelHandlerContext, SocketAddress, ChannelPromise)方法
disconnect将Channel断开连接 。这将调用ChannelPipeline中的下一个ChannelOutboundHandler的disconnect(ChannelHandlerContext, ChannelPromise)方法
close将Channel关闭。 这将调用ChannelPipeline中的下一个ChannelOutboundHandler的close(ChannelHandlerContext, ChannelPromise)方法
deregister将Channel从它先前所分配的EventExecutor(即EventLoop)中注销。这将调用ChannelPipeline中的下一个ChannelOutboundHandler的deregister(ChannelHandlerContext, ChannelPromise)方法
flush冲刷Channel所有挂起的写入。这将调用ChannelPipeline中的下一个ChannelOutboundHandler的flush(ChannelHandlerContext)方法
write将消息写入Channel。这将调用ChannelPipeline中的下一个ChannelOutboundHandler的write(ChannelHandlerContext, Object msg, Channel-Promise)方法。注意:这并不会将消息写入底层的Socket,而只会将它放入队列中。要将它 写入Socket,需要调用flush()或者writeAndFlush()方法
writeAndFlush这是一个先调用write()方法再接着调用flush()方法的便利方法
read请求从Channel中读取更多的数据。这将调用ChannelPipeline中的下一个ChannelOutboundHandler的read(ChannelHandlerContext)方法

总结:

  • ChannelPipeline保存了与Channel相关联的ChannelHandler;
  • ChannelPipeline可以根据需要,通过添加或者删除ChannelHandler来动态地修改;
  • ChannelPipeline有着丰富的API用以被调用,以响应入站和出站事件。

ChannelHandlerContext接口

hannelHandlerContext代表了ChannelHandlerChannelPipeline之间的关联,每当有ChannelHandler添加到ChannelPipeline中时,都会创建ChannelHandlerContextChannelHandlerContext的主要功能是管理它所关联的ChannelHandler和在同一个ChannelPipeline中的其他ChannelHandler之间的交互。

ChannelHandlerContext有很多的方法,其中一些方法也存在于ChannelChannelPipeline本身上,但是有一点重要的不同。如果调用Channel或者ChannelPipeline上的这些方法,它们将沿着整个ChannelPipeline进行传播。而调用位于ChannelHandlerContext上的相同方法,则将从当前所关联的ChannelHandler开始,并且只会传播给位于该ChannelPipeline中的下一个能够处理该事件的ChannelHandler

image.png

使用ChannelHandlerContext的API的时候,请牢记以下两点:

  • ChannelHandlerContextChannelHandler之间的关联(绑定)是永远不会改变的,所以缓存对它的引用是安全的;
  • 相对于其他类的同名方法,ChannelHandlerContext的方法将产生更短的事件流,应该尽可能地利用这个特性来获得最大的性能。

使用 ChannelHandlerContext

image.png

    // 从 ChannelHandlerContext 访问 Channel
    public static void writeViaChannel() {
        ChannelHandlerContext ctx = CHANNEL_HANDLER_CONTEXT_FROM_SOMEWHERE; //get reference form somewhere
        //获取到与 ChannelHandlerContext相关联的 Channel 的引用
        Channel channel = ctx.channel();
        //通过 Channel 写入缓冲区
        channel.write(Unpooled.copiedBuffer("Netty in Action",
                CharsetUtil.UTF_8));
    }

调用Channel 上的write()方法将会导致写入事件从尾端到头部地流经ChannelPipeline

    // 通过 ChannelHandlerContext 访问 ChannelPipeline
    public static void writeViaChannelPipeline() {
        ChannelHandlerContext ctx = CHANNEL_HANDLER_CONTEXT_FROM_SOMEWHERE; //get reference form somewhere
        //获取到与 ChannelHandlerContext相关联的 ChannelPipeline 的引用
        ChannelPipeline pipeline = ctx.pipeline(); //get reference form somewhere
        //通过 ChannelPipeline写入缓冲区
        pipeline.write(Unpooled.copiedBuffer("Netty in Action",
                CharsetUtil.UTF_8));
    }

这一次是写入ChannelPipeline,(到ChannelPipline 的)引用是通过ChannelHandlerContext 获取的。

image.png

被调用的ChannelChannelPipeline上的write()方法将一直传播事件通过整个ChannelPipeline,但是在ChannelHandler的级别上,事件从一个ChannelHandler到下一个ChannelHandler的移动是由ChannelHandlerContext上的调用完成的。

要想调用从某个特定的ChannelHandler开始的处理过程,必须获取到在(ChannelPipeline)该ChannelHandler之前的ChannelHandler所关联的ChannelHandlerContext。这个ChannelHandlerContext将调用和它所关联的ChannelHandler之后的ChannelHandler

消息将从下一个ChannelHandler开始流经ChannelPipeline,绕过了所有前面的ChannelHandler

    //调用 ChannelHandlerContext 的 write()方法
    public static void writeViaChannelHandlerContext() {
        //获取到 ChannelHandlerContext 的引用
        ChannelHandlerContext ctx = CHANNEL_HANDLER_CONTEXT_FROM_SOMEWHERE; //get reference form somewhere;
        //write()方法将把缓冲区数据发送到下一个 ChannelHandler
        ctx.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));
    }

image.png

因为一个ChannelHandler可以从 属于多个ChannelPipeline,所以它也可以绑定到多个ChannelHandlerContext实例。 对于这种用法 指在多个ChannelPipeline中共享同一个ChannelHandler,对应的ChannelHandler必须要使用**@Sharable**注解标注;否则,试图将它添加到多个ChannelPipeline时将会触发异常。显而易见,为了安全地被用于多个并发的Channel(即连接),这样的ChannelHandler必须是线程安全的。

只应该在确定了你的ChannelHandler是线程安全的时才使用@Sharable注解。

异常处理

处理入站异常

如果在处理入站事件的过程中有异常被抛出,那么它将从它在ChannelInboundHandler里被触发的那一点开始流经ChannelPipeline。要想处理这种类型的入站异常,你需要在你的ChannelInboundHandler实现中重写下面的方法。

public class InboundExceptionHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,
        Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

因为异常将会继续按照入站方向流动(就像所有的入站事件一样),所以实现了前面所示逻辑的ChannelInboundHandler通常位于ChannelPipeline的最后。这确保了所有的入站异常都总是会被处理,无论它们可能会发生在ChannelPipeline中的什么位置。

  • ChannelHandler.exceptionCaught()的默认实现是简单地将当前异常转发给ChannelPipeline中的下一个ChannelHandler
  • 如果异常到达了ChannelPipeline的尾端,它将会被记录为未被处理;
  • 要想定义自定义的处理逻辑,你需要重写exceptionCaught()方法。然后你需要决定是否需要将该异常传播出去。

处理出站异常

用于处理出站操作中的正常完成以及异常的选项,都基于以下的通知机制。

  • 每个出站操作都将返回一个ChannelFuture。注册到ChannelFuture的ChannelFutureListener将在操作完成时被通知该操作是成功了还是出错了 。

  • 几乎所有的ChannelOutboundHandler上的方法都会传入一个ChannelPromise的实例。作为ChannelFuture的子类,ChannelPromise也可以被分配用于异步通知的监听器。但是,ChannelPromise还具有提供立即通知的可写方法:

    ChannelPromise setSuccess();
    ChannelPromise setFailure(Throwable cause);

添加ChannelFutureListener只需要调用ChannelFuture实例上的addListener(ChannelFutureListener)方法,并且有两种不同的方式可以做到这一点。其中最常用的方式是,调用出站操作(如write()方法)所返回的ChannelFuture上的addListener()方法。

public class ChannelFutures {
    private static final Channel CHANNEL_FROM_SOMEWHERE = new NioSocketChannel();
    private static final ByteBuf SOME_MSG_FROM_SOMEWHERE = Unpooled.buffer(1024);

    /**
     *  添加 ChannelFutureListener 到 ChannelFuture
     * */
    public static void addingChannelFutureListener(){
        Channel channel = CHANNEL_FROM_SOMEWHERE; // get reference to pipeline;
        ByteBuf someMessage = SOME_MSG_FROM_SOMEWHERE; // get reference to pipeline;
        //...
        io.netty.channel.ChannelFuture future = channel.write(someMessage);
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(io.netty.channel.ChannelFuture f) {
                if (!f.isSuccess()) {
                    f.cause().printStackTrace();
                    f.channel().close();
                }
            }
        });
    }
}

方式二:ChannelFutureListener添加到即将作为参数传递给
ChannelOutboundHandler的方法的ChannelPromise。

/**
 *  添加 ChannelFutureListener 到 ChannelPromise
 */
public class OutboundExceptionHandler extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg,
        ChannelPromise promise) {
        promise.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) {
                if (!f.isSuccess()) {
                    f.cause().printStackTrace();
                    f.channel().close();
                }
            }
        });
    }
}

ChannelPromise的可写方法
通过调用ChannelPromise上的setSuccess()setFailure()方法,可以使一个操作的状态在ChannelHandler的方法返回给其调用者时便即刻被感知到。