前言
前面我们学了Netty的组件及其设计,这次我们深入一下它的各个环节。
本文代码可以通过这里查看地址
Channel
channel翻译为传输,我们所说的传输就是数据在网络的流动,一般是以字节形式流动的。但是,用户其实不关心这些细节的,他们只关心他们的字节被可靠的发送和接收。
在上述代码地址中,有Java API版本的传输的例子,包含NIO和BIO。然后在Netty版本中,我们发现NIO和BIO(Netty叫OIO)的版本如此简洁一致,只需要EventLoopGroup和channel传入的方法即可轻松切换;而Java版本的却十分复杂。这就是Netty的优点之一,api一致性。
API介绍
传输API的核心是Channel接口,它处理所有的IO操作,下面是它的接口层次结构:
我们可以看到,每个Channel都会生成一个ChannelPipeline和ChannelConfig。ChannelPipeline持有所有将应用于入站和出站数据以及事件的ChannelHandler实 例,这些ChannelHandler实现了应用程序用于处理状态变化以及数据处理的逻辑;ChannelConfig包含了该Channel的所有配置设置,并且支持热更新。
ChannelPipeline实现了拦截过滤模式,有点类似Java中拦截器和过滤器的作用,持有的ChannelHandler有以下几个作用:
- 将数据从一种格式转换成另一种格式
- 提供异常通知
- 提供Channel变为active或者非active通知
- 提供当 Channel 注册到 EventLoop 或者从 EventLoop 注销时的通知
- 提供有关用户自定义事件的通知
内置的Channel
Netty内置了一些可以开箱即用的Channel,参见下表:
名称 | 包 | 描述 |
---|---|---|
NIO | io.netty.channel.socket.nio | 使用java.nio.channels包作为基础——基于 选择器的方式 |
Epoll | io.netty.channel.epoll | 由 JNI 驱动的 epoll()和非阻塞 IO。这个传输支持 只有在 Linux 上可用的多种特性,如 SO_REUSEPORT, 比 NIO 传输更快,而且是完全非阻塞的 |
OIO | io.netty.channel.socket.oio | 使用java.net包作为基础——使用阻塞流 |
Local | io.netty.channel.local | 可以在 VM 内部通过管道进行通信的本地传输 |
Embedded | io.netty.channel.embedded | Embedded 传输,允许使用 ChannelHandler 而又 不需要一个真正的基于网络的传输。这在测试你的 ChannelHandler 实现时非常有用 |
ByteBuf
之前提到网络数据的基本单位总是字节,Java NIO提供了ByteBuffer作为它的字节容器,但是使用起来极其复杂,难用。因此,Netty实现了自己的数据容器-ByteBuf,一个强大的实现,既解决了 JDK API 的局限性, 又为网络应用程序的开发者提供了更好的 API。
API介绍
Netty数据处理的API主要依赖抽象类ByteBuf和ByteBufHolder接口,它具有以下优点:
- 它可以被用户自定义的缓冲区类型扩展
- 通过内置的复合缓冲区类型实现了透明的零拷贝
- 容量可以按需增长(类似于 JDK 的 StringBuilder)
- 在读和写这两种模式之间切换不需要调用 ByteBuffer 的 flip()方法
- 读和写使用了不同的索引
- 支持方法的链式调用
- 支持引用计数
- 支持池化
工作原理
ByteBuf是一个由不同的索引分别控制读访问和写访问的字节数组。它维护了两个不同的索引,一个用于读,一个用于写。当你从ByteBuf读的时候,它的readerIndex将会递增已被读取的字节数;当你向ByteBuf写入的时候,它的writeIndex也会递增;当readerIndex达到writeIndex的时候,就会达到可读数据的末尾,试图读取超出该点的数据会出发数组越界,抛出异常IndexOutOfBoundsExcetption,下图是ByteBuf的简单结构:
使用方式
堆缓冲区
最常用的方式是将将数据存储在JVM堆区中,这种模式叫做支撑数组,它能在没有池化的情况下提供快速分配和释放,适合有遗留的数据需要处理的情况,可见代码:
ByteBuf buf = ...
//检查ByteBuf是否有一个支撑数组
if(buf.hasArray()){
//如果有获取数组的引用
byte[] array = buf.array();
int firstIndex = buf.arrayOffset() + buf.readerIndex(); //计算第一个字节的偏移
int length = buf.readableBytes();//获取可读字节数
handleArray(array, firstIndex, length);//使用数组
}
直接缓冲区
将数据存储在JVM堆外。这样做的好处在于,如果是在对上分配的数据,那么在发送到Socket之前,JVM会把你的数据复制到堆外的缓存区中,在把这个缓存区发送到Socket里。
缺点是,相对于基于堆的缓冲区,它们的分配和释放都较为昂贵;如果你在处理历史代码的时候,因为数据不是在JVM堆上,你需要进行额外的处理(处理代码如下)。如果说你知道ByteBuf容器中的数据是作为数组来访问的,建议使用
ByteBuf buf = ...;
//检查ByteBuf是否有一个支撑数组
if(!buf.hasArray()){
//没有支撑数组,证明是一个直接缓冲区
int length = buf.readableBytes();//获取可读字节数
byte[] array = new byte[length];//分配一个新数组来保存堆外数据
buf.getBytes(buf.readerIndex(), array);//将堆外数据复制到堆内
handleArray(array, 0, length);//使用数组
}
复合缓冲区
简单来说就是多个ByteBuf聚合在一起,可以根据需要添加或者删除ByteBuf实例,这个功能JDK没有。Netty通过一个ByteBuf子类—-CompositeByteBuf实现这个模式。
为什么需要这个?
假设一个消息由头部和主体组成,这两个部分分别由不同程序产生,会在消息被发送的时候组装到一起。问题来了,如果发送多条消息,消息头部一致,只是内容不一样。传统方案处理方式是每发送一条消息就创建一个新的头部和主体的缓冲区,这样就导致了内存的占用。
使用CompositeByteBuf的话就只需要为主体创建缓冲区,头部复用之前的缓冲区即可,消除了没有必要的复制,并且提供了和ByteBuf一致的API。
下面代码显示两个处理方式之间的区别:
使用ByteBuffer
// Use an array to hold the message parts
ByteBuffer[] message = new ByteBuffer[] { header, body };
// Create a new ByteBuffer and use copy to merge the header and body ByteBuffer message2 =
ByteBuffer.allocate(header.remaining() + body.remaining()); message2.put(header);
message2.put(body);
message2.flip();
使用CompositeByteBuf
CompositeByteBuf messageBuf = Unpooled.compositeBuffer();
ByteBuf headerBuf = ...; // can be backing or direct
ByteBuf bodyBuf = ...; // can be backing or direct
messageBuf.addComponents(headerBuf, bodyBuf);
.....
messageBuf.removeComponent(0); // remove the header
for (ByteBuf buf : messageBuf) {
System.out.println(buf.toString());
}
//访问 CompositeByteBuf 中的数据
int length = messageBuf.readableBytes();//获得可读字节数
byte[] array = new byte[length];//根据length分配数组
messageBuf.getBytes(compBuf.readerIndex(), array);//将字节读到该数组中去
handleArray(array, 0, array.length);//使用数组
字节级操作
ByteBuf 提供了许多超出基本读、写操作的方法用于修改它的数据,接下来我们研究一下。
随机访问索引
ByteBuf索引是从0开始的,最后一个字节的索引总是capacity()-1,所以可以这样遍历:
ByteBuf buffer = ...;
for (int i = 0; i < buffer.capacity(); i++) {
byte b = buffer.getByte(i); //注意,这里不回改变读写指针的位置
System.out.println((char)b);
}
访问索引顺序
JDK中的ByteBuffer只有一个索引,这就是为什么它读写的时候需要flip()方法来切换读写模式。对于Nett,ByteBuf有两个索引,更易操作和理解。如图:
可丢弃字节
在上图标记的可丢弃字节中,表示已经被读取过的字节。通过调用discardReadBytes()方法,丢弃并回收空间,此时可读部分(content标记)必须移动到缓存区前面,如图:
所以,建议只有在真正需要的时候(内存很宝贵的时候)这样做,因为会导致内存复制影响性能。
可读字节
ByteBuf可读字节分段存储了实际数据,默认readerIndex是0,read或者skip开头的方法会检索或者跳过当前readerIndex数据,并且将它增加已读字节数。
如果read开头的方法(readBytes(ByteBuf byteBuf)
)需要一个ByteBuf作为参数,那么此时writeIndex也会增加。
下面代码展示如何读取所有数据:
ByteBuf buf = Unpooled.buffer();
while (buf.isReadable()){
...
}
可写字节
可写字节分段是指一个拥有未定义内容的、写入就绪的内存区域,默认的writeIndex是0,write开头的方法会从当前的writeIndex开始写数据,并将它增加已写入的字节数。
如果write开头的方法(writeBytes(ByteBuf dest)
)需要传递一个ByteBuf参数,那么此时readIndex也会增加同样大小。
下面代码展示用随机整数值填充缓冲区,直到它空间不足为止:
ByteBuf buf = Unpooled.buffer();
//方法返回可写字节数,等效于this.capacity - this.writerIndex
while (buf.writableBytes()>4){
buf.writeInt(new Random().nextInt());
}
索引管理
JDK中InputStream定义了mark(int readlimit)和reset()方法用于将流中的当前位置标记为指定的值,以及将流重置到该位置。
同样的,ByteBuf可以通过调用 markReaderIndex()、markWriterIndex()、resetWriterIndex() 和 resetReaderIndex()来标记和重置 ByteBuf 的 readerIndex 和 writerIndex。
也可以通过调用 readerIndex(int)或者 writerIndex(int)来将索引移动到指定位置。试图将任何一个索引设置到一个无效的位置都将导致一个 IndexOutOfBoundsException。
可以通过调用 clear()方法来将 readerIndex 和 writerIndex 都设置为 0。注意,这并不会清除内存中的内容
因此,比discardReadBytes()轻量很多,没有内存复制。
查找操作
最简单的就是indexOf方法查找,稍微复杂的可以使用ByteProcessor。举个例子:
ByteBuf buf = Unpooled.buffer();
int index = buf.forEachByte(ByteProcessor.FIND_LF);
派生缓冲区
以下几个方法可以创建派生缓冲区:
- duplicate()
- slice()
- slice(int, int)
- Unpooled.unmodifiableBuffer(…)
- order(ByteOrder);
- readSlice(int)
每个这些方法都将返回一个新的 ByteBuf 实例,它具有自己的读索引、写索引和标记索引。
其内部存储和 JDK 的 ByteBuffer 一样也是共享的。
这使得派生缓冲区的创建成本是很低廉的,但是这也意味着,如果你修改了它的内容,也同时修改了其对应的源实例,所以要小心。
ByteBuf 复制
如果需要一个现有缓冲区的真实副本,请使用 copy()或者 copy(int, int)方法。不同于派生缓冲区,由这个调用所返回的 ByteBuf 拥有独立的数据副本。
我们来看看派生和复制带案例:
//派生缓冲区
Charset utf8 = Charset.forName("UTF-8");
//创建一个ByteBuf
ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8);
//创建一个切片
ByteBuf sliced = buf.slice(0, 15);
//打印 Netty in Action
System.out.println(sliced.toString(utf8));
//更新原来ByteBuf 索引0位置的字节
buf.setByte(0, (byte)'J');
//因为数据是共享的,改一个另一个也会改,结果是true
System.out.println(buf.getByte(0) == sliced.getByte(0));
//复制
Charset utf8 = Charset.forName("UTF-8");
ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8);
//复制一份ByteBuf
ByteBuf copy = buf.copy(0, 15);
//打印 Netty in Action
System.out.println(copy.toString(utf8));
//更新原来ByteBuf 索引0位置的字节
buf.setByte(0, (byte) 'J');
//因为数据不是共享的,改一个另一个不拜年,结果是false
System.out.println(buf.getByte(0) == copy.getByte(0));
ByteBufHolder接口
除了实际数据之外,我们还需要存储各种属性值,HTTP响应便是一个很好的例子,除了表示为字节的内容,还包括状态码、cookie 等。
为了处理这种常见的用例,Netty 提供了 ByteBufHolder
ByteBufHolder 只有几种用于访问底层数据和引用计数的方法,见下表:
名称 | 描述 |
---|---|
content() | 返回由这个 ByteBufHolder 所持有的 ByteBuf |
copy() | 返回这个 ByteBufHolder 的一个深拷贝,包括一个其所包含的 ByteBuf 的非共享拷贝 |
duplicate() | 返回这个ByteBufHolder的一个浅拷贝,包括一个其所包含的ByteBuf的共享拷贝 |
ByteBuf分配
按需分配:ByteBufAllocator
为了节省内存分配和释放的压力,Netty通过ByteBufAllocator实现了ByteBuf的池化,它可以用来分配我们所描述过的任意类型的 ByteBuf 实例。下面列出一些api:
名称 描述 buffer()
buffer(int initialCapacity);
buffer(int initialCapacity, int maxCapacity);返回一个基于堆或者直接内存 存储的 ByteBuf heapBuffer()
heapBuffer(int initialCapacity)
heapBuffer(int initialCapacity, int maxCapacity)返回一个基于堆内存存储的ByteBuf directBuffer()
directBuffer(int initialCapacity)
directBuffer(int initialCapacity, int maxCapacity)返回一个基于直接内存存储的ByteBuf compositeBuffer()
compositeBuffer(int maxNumComponents)
compositeDirectBuffer()
compositeDirectBuffer(int maxNumComponents)
compositeHeapBuffer()
compositeHeapBuffer(int maxNumComponents)返回一个可以通过添加最大到指定数目的基于堆的或者直接内存存储的缓冲区来扩展的 CompositeByteBuf ioBuffer() 返回一个用于套接字的 I/O 操 作的 ByteBuf Netty提供了两种ByteBufAllocator的实现:PooledByteBufAllocator和UnpooledByteBufAllocator,前者池化了ByteBuf的实例以提高性能并最大限度地减少内存碎片。
Unpooled 缓冲区
可能某些情况下,你未能获取一个到 ByteBufAllocator 的引用。对于这种情况,Netty 提 供了一个简单的称为 Unpooled 的工具类,它提供了静态的辅助方法来创建未池化的 ByteBuf 实例。下面是一些api:
名称 描述 buffer()
buffer(int initialCapacity)
buffer(int initialCapacity, int maxCapacity)返回一个未池化的基于堆内存存储的 directBuffer()
directBuffer(int initialCapacity)
directBuffer(int initialCapacity, int maxCapacity)返回一个未池化的基于直接内存存储 的 ByteBuf wrappedBuffer() 返回一个包装了给定数据的 ByteBuf copiedBuffer() 返回一个复制了给定数据的 ByteBuf Unpooled 类还使得 ByteBuf 同样可用于那些并不需要 Netty 的其他组件的非网络项目, 使得其能得益于高性能的可扩展的缓冲区 API。
ByteBufUtil 类
ByteBufUtil 提供了用于操作 ByteBuf 的静态的辅助方法。因为这个 API 是通用的,并且和池化无关,所以这些方法已然在分配类的外部实现。
这些静态方法中最有价值的可能就是 hexdump()方法,它以十六进制的表示形式打印 ByteBuf 的内容。这在各种情况下都很有用,例如,出于调试的目的记录 ByteBuf 的内容。十六进制的表示通常会提供一个比字节值的直接表示形式更加有用的日志条目,此外,十六进制的版本还可以很容易地转换回实际的字节表示。
另一个有用的方法是 boolean equals(ByteBuf, ByteBuf),它被用来判断两个 ByteBuf 实例的相等性。如果你实现自己的 ByteBuf 子类,你可能会发现 ByteBufUtil 的其他有用方法。
引用计数
引用计数是一种通过在某个对象所持有的资源不再被其他对象引用时释放该对象所持有的资源来优化内存使用和性能的技术。
引用计数背后的想法并不是特别的复杂;它主要涉及跟踪到某个特定对象的活动引用的数量。一个 ReferenceCounted 实现的实例将通常以活动的引用计数为 1 作为开始。只要引用 数大于 0,就能保证对象不会被释放。当活动引用的数量减少到 0 时,该实例就会被释放。注意, 虽然释放的确切语义可能是特定于实现的,但是至少已经释放的对象应该不可再用了。
引用释放的代码示例:
ByteBufAllocator alloc = ByteBufAllocator.DEFAULT;
ByteBuf buf = alloc.directBuffer();
System.out.println("buf.refCnt() = " + buf.refCnt());//1
//release
buf.release();
System.out.println("buf.refCnt() = " + buf.refCnt());//0
谁负责释放
一般来说,是由最后访问(引用计数)对象的那一方来负责将它释放。
ChannelHandler和ChannelPipeline
ChannelPipeline中将ChannelHandler连接组织在一起以处理逻辑,以及ChannelHandlerContext的作用。
ChannelHander家族
Channel的生命周期
Channel定义了一组和ChannelInboundHandler API密切相关的简单但是功能强大的状态模型,以下是Channel的这4个状态:
状态 | 描述 |
---|---|
ChannelUnregistered | Channel 已经被创建,但还未注册到 EventLoop |
ChannelRegistered | Channel 已经被注册到了 EventLoop |
ChannelActive | Channel 处于活动状态(已经连接到它的远程节点)。它现在可以接收和发送数据了 |
ChannelInactive | Channel没有连接到远程节点 |
Channel的正常生命周期如下图所示,当这些状态发生改变时,将会生成对应的事件。这些事件将会被转发给 ChannelPipeline 中的 ChannelHandler,其可以随后对它们做出响应。
ChannelHandler的生命周期
下表列出了ChannelHandler定义的生命周期操作,在ChannelHandler被添加到ChannelPipeline中或者被从ChannelPipeline中移除会调用这些操作,这些方法都接收一个ChannelHandlerContext参数。
状态 | 描述 |
---|---|
handlerAdded | 当把 ChannelHandler 添加到 ChannelPipeline 中时被调用 |
handlerRemoved | 当从 ChannelPipeline 中移除 ChannelHandler 时被调用 |
exceptionCaught | 当处理过程中在 ChannelPipeline 中有错误产生时被调用 |
Netty定义了下面两个最重要的ChannelHandler接口:
- ChannelInboundHandler—处理入站数据以及各种状态变化
- ChannelOutboundHandler—处理出站数据并且允许拦截所有的操作
ChannelInboundHandler
下表列出ChannelInboundHandler的生命周期方法,这些方法会在数据被接收或者与其对应的Channel状态发生改变时被调用。
状态 | 描述 |
---|---|
channelRegistered | 当 Channel 已经注册到它的 EventLoop 并且能够处理 I/O 时被调用 |
channelUnregistered | 当 Channel 从它的 EventLoop 注销并且无法处理任何 I/O 时被调用 |
channelActive | 当 Channel 处于活动状态时被调用;Channel 已经连接/绑定并且已经就绪 |
channelInactive | 当 Channel 离开活动状态并且不再连接它的远程节点时被调用 |
channelReadComplete | 当Channel上的一个读操作完成时被调用 |
channelRead | 当从 Channel 读取数据时被调用 |
ChannelWritabilityChanged | 当Channel的可写状态发生改变时被调用 |
userEventTriggered | 当 ChannelnboundHandler.fireUserEventTriggered()方法被调 用时被调用,因为一个 POJO 被传进了 ChannelPipeline |
当某个ChannelInboundHandler的实现重写了channelRead()方法时,它将负责显式地释放与池化ByteBuf实例相关的内存,为此Netty提供了一个实用方法ReferenceCountUtil.release(),代码如下:
@Sharable
public class DiscardHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ReferenceCountUtil.release(msg);
}
}
但是以这种方式管理资源可能很繁琐,一个更加简单的方式是使用SimpleChannelInboundHandler,具体代码如下:
@Sharable
public class SimpleDiscardHandler extends SimpleChannelInboundHandler<Object> {
@Override
public void channelRead0(ChannelHandlerContext ctx,Object msg) {
// No need to do anything special
}
}
由于 SimpleChannelInboundHandler 会自动释放资源,所以你不应该存储指向任何消息的引用供将来使用,因为这些引用都将会失效。
ChannelOutboundHandler
ChannelOutboundHandler 的一个强大的功能是可以按需推迟操作或者事件,这使得可以通过一些复杂的方法来处理请求,下面显示 ChannelOutboundHandler 本身所定义的方法:
状态 | 描述 |
---|---|
bind(ChannelHandlerContext, SocketAddress,ChannelPromise) | 当请求将 Channel 绑定到本地地址时被调用 |
connect(ChannelHandlerContext, SocketAddress,SocketAddress,ChannelPromise) | 当请求将 Channel 连接到远程节点时被调用 |
disconnect(ChannelHandlerContext, ChannelPromise) | 当请求将 Channel 从远程节点断开时被调用 |
close(ChannelHandlerContext,ChannelPromise) | 当请求关闭 Channel 时被调用 |
deregister(ChannelHandlerContext, ChannelPromise) | 当请求将 Channel 从它的 EventLoop 注销 时被调用 |
read(ChannelHandlerContext) | 当请求从 Channel 读取更多的数据时被调用 |
flush(ChannelHandlerContext) | 当请求通过 Channel 将入队数据冲刷到远程 |
write(ChannelHandlerContext,Object, ChannelPromise) | 当请求通过 Channel 将数据写到远程节点时 被调用 |
ChannelPromise与ChannelFuture
ChannelOutboundHandler中的大部分方法都需要一个ChannelPromise参数,以便在操作完成时得到通知。
ChannelPromise是ChannelFuture的一个子类,其定义了一些可写的方法,如setSuccess()和setFailure(),从而使ChannelFuture不可变 。
ChannelHandler 适配器
你可以使用 ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter 类作为自己的 ChannelHandler 的起始点。
这两个适配器分别提供了 ChannelInboundHandler 和 ChannelOutboundHandler 的基本实现。通过扩展抽象类 ChannelHandlerAdapter,它们 获得了它们共同的超接口 ChannelHandler 的方法。
具体类图如下:
ChannelPipeline
每一个新创建的 Channel 都将会被分配一个新的 ChannelPipeline。这项关联是永久性 的;Channel 既不能附加另外一个 ChannelPipeline,也不能分离其当前的。在 Netty 组件 的生命周期中,这是一项固定的操作,不需要开发人员的任何干预。
根据事件的起源,事件将会被 ChannelInboundHandler 或者 ChannelOutboundHandler处理。随后,通过调用 ChannelHandlerContext 实现,它将被转发给同一超类型的下一个ChannelHandler。
ChannelHandlerContext
ChannelHandlerContext使得ChannelHandler能够和它的ChannelPipeline以及其他的 ChannelHandler 交 互 。 ChannelHandler 可 以 通 知 其 所 属 的 ChannelPipeline 中 的 下 一 个 ChannelHandler,甚至可以动态修改它所属的ChannelPipeline1。
ChannelHandlerContext 具有丰富的用于处理事件和执行 I/O 操作的 API。6.3 节将提供有关 ChannelHandlerContext 的更多内容。
下图是传播流程:
在 ChannelPipeline 传播事件时,它会测试 ChannelPipeline 中的下一个 ChannelHandler 的类型是否和事件的运动方向相匹配。如果不匹配,ChannelPipeline 将跳过该 ChannelHandler 并前进到下一个,直到它找到和该事件所期望的方向相匹配的为止。
修改ChannelPipeline
ChannelHandler 可以通过添加、删除或者替换其他的 ChannelHandler 来实时地修改 ChannelPipeline 的布局。(它也可以将它自己从 ChannelPipeline 中移除。)这是 Channel- Handler 最重要的能力之一,所以我们将仔细地来看看它是如何做到的。下表列出了相关方法:
名称 | 描述 |
---|---|
addFirstaddBefore addAfteraddLast | 将一个ChannelHandler添加到ChannelPipeline中 |
remove | 将一个 ChannelHandler 从 ChannelPipeline 中移除 |
replace | 将 ChannelPipeline 中的一个 ChannelHandler 替换为另一个 ChannelHandler |
ChannelHandler 的执行和阻塞
通常 ChannelPipeline 中的每一个 ChannelHandler 都是通过它的 EventLoop(I/O 线程)来处
理传递给它的事件的。所以至关重要的是不要阻塞这个线程,因为这会对整体的 I/O 处理产生负面的影响。
但有时可能需要与那些使用阻塞 API 的遗留代码进行交互。
对于这种情况,ChannelPipeline 有一些 接受一个 EventExecutorGroup 的 add()方法。
如果一个事件被传递给一个自定义的 EventExecutorGroup,它将被包含在这个 EventExecutorGroup 中的某个 EventExecutor 所处理,从而被从该 Channel 本身的 EventLoop 中移除。
对于这种用例,Netty 提供了一个叫 DefaultEventExecutor- Group 的默认实现。
ChannelHandlerContext接口
ChannelHandlerContext代表了ChannelHandler和ChannelPipeline之间的关系,每当有ChannelHandler添加到ChannelPipeline中,都会创建ChannelHandlerContext,主要功能是管理它所关联的 ChannelHandler 和在同一个 ChannelPipeline 中的其他 ChannelHandler 之间的交互。
通过观察api发现,ChannelHandlerContext很多方法,Channel和ChannelPipeline都有,区别就是: Channel 或者 ChannelPipeline 上的这 些方法,它们将沿着整个 ChannelPipeline 进行传播;ChannelHandlerContext 上的相同方法,则将从当前所关联的 ChannelHandler 开始,并且只会传播给位于该 ChannelPipeline 中的下一个能够处理该事件的 ChannelHandler
注意:
- ChannelHandlerContext 和 ChannelHandler 之间的关联(绑定)是永远不会改变的,所以缓存对它的引用是安全的
- 相对于其他类的同名方法,ChannelHandlerContext的方法将产生更短的事件流,应该尽可能地利用这个特性来获得最大的性能
使用ChannelHandlerContext
先观察图片,可以看出ChannelHandlerContext、ChannelHandler、Channel和ChannelPipeline之间的关系:
我们看下面一段代码:
//利用Channel写入缓冲区
ChannelHandlerContext ctx = ..;
Channel channel = ctx.channel();//ctx获取关联的Channel
//channel写入缓冲区
channel.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));
//利用ChannelPipeline写入缓冲区
ChannelHandlerContext ctx = ..;
ChannelPipeline pipeline = ctx.pipeline();//ctx获取关联的ChannelPipeline
//pipeline写入缓冲区
pipeline.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));
代码的调用流程图如下所示:
为什么会想要从 ChannelPipeline 中的某个特定点开始传播事件呢?
为了减少将事件传经对它不感兴趣的 ChannelHandler 所带来的开销
为了避免将事件传经,那些可能会对它感兴趣的 ChannelHandler
如果想要从特定的某个ChannelHandler开始处理问题,必须获取这个handler之前的ChannelHandler,并获取其关联的ChannelHandlerContext,这个ChannelHandlerContext将调用和它所关联的ChannelHandler之后的 ChannelHandler,具体代码和调用流程如下图:
ChannelHandlerContext ctx = ..;
ctx.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));
ChannelHandler和ChannelHandlerContext的高级用法
可以通过ctx的pipeline方法获取ChannelPipeline的引用,可以在运行时操作ChannelHandler,比如可以实现动态的协议切换
将ChannelHandlerContext的引用缓存起来,可以在ChannelHandler方法之外,甚至不同线程使用,具体代码可以看下面:
public class WriteHandler extends ChannelHandlerAdapter { private ChannelHandlerContext ctx; @Override public void handlerAdded(ChannelHandlerContext ctx) { this.ctx = ctx;//存储到 ChannelHandlerContext 的引用以供稍后使用 } public void send(String msg) { ctx.writeAndFlush(msg);// 使用之前存储的到 ChannelHandlerContext 的引用来发送消息 } }
由于ChannelHandler可以属于多个ChannelPipeline,所以它会绑定多个ChannelHandlerContext。因此,对于那些希望在多个ChannelPipeline中共享同一个个ChannelHandler的用法,需要使用**@Sharable**注解,具体如下:
@Sharable public class SharableHandler extends ChannelInboundHandlerAdapter { //这里不要加状态,否则会线程不安全 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { System.out.println("Channel read message: " + msg); ctx.fireChannelRead(msg); } }
为啥要共享一个ChannelHandler?
在多个ChannelPipeline中安装同一个ChannelHandler的一个常见的原因是用于收集跨越多个 Channel 的统计信息。
异常处理
异常处理是程序的重要组成部分,Netty提供了几种方式用于处理入站或者出站过程中发生的异常。
入站异常处理
入站的异常遵循以下事实:
- ChannelHandler.exceptionCaught()的默认实现是简单地将当前异常转发给ChannelPipeline中的下一个 ChannelHandler。
- 如果异常到达了ChannelPipeline的尾端,它将会被记录为未被处理。
- 要想定义自定义的处理逻辑,你需要重写 exceptionCaught()方法。然后你需要决定是否需要将该异常传播出去。
具体处理代码如下:
public class InboundExceptionHandler extends ChannelInboundHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
出站异常处理
用于处理出站操作中的正常完成以及异常的选项,都基于以下的通知机制:
- 每个出站操作都将返回一个ChannelFuture。注册到ChannelFuture的ChannelFutureListener 将在操作完成时被通知该操作是成功了还是出错了
- 几乎所有的 ChannelOutboundHandler 上的方法都会传入一个 ChannelPromise 的实例。作为 ChannelFuture 的子类,ChannelPromise 也可以被分配用于异步通知的监听器。
代码如下:
ChannelFuture future = channel.write(someMessage);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) {
if (!f.isSuccess()) {
f.cause().printStackTrace();
f.channel().close();
}
});
public class OutboundExceptionHandler extends ChannelOutboundHandlerAdapter { @Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) {
if (!f.isSuccess()) {
f.cause().printStackTrace();
f.channel().close();
}
});
}
}
}
EventLoop和线程模型
本节我们探讨下Netty的线程模型,简化应用程序代码,同时最大限度提高性能。
EventLoop
运行任务来处理在连接的生命周期内发生的事件是任何网络框架的基本功能,与之相应的编程上的构造通常被称为事件循环(Netty 使用了 interface io.netty.channel. EventLoop 来适配的术语)。
可以看下下面的伪代码,理解EventLoop的思想:
while (!terminated) {
List<Runnable> readyEvents = blockUntilEventsReady();
for (Runnable ev: readyEvents) {//循环遍历,处理所有的事件
ev.run();
}
}
在这个模型中,一个EventLoop将由一个永远都不会改变的Thread绑定,同时任务 (Runnable 或者 Callable)可以直接提交给EventLoop实现,以立即执行或者调度执行。
因此,我们可以把EventLoop看作一个包含了Channel的线程。
任务调度
我们需要调度任务延迟执行或者周期性执行,例如:心跳检测就需要周期性执行。
下面代码展示了EventLoop如何调度任务:
延迟调度
Channel ch = ... ScheduledFuture<?> future = ch.eventLoop().schedule(new Runnable() { @Override public void run() { System.out.println("60 seconds later"); } }, 60, TimeUnit.SECONDS);
周期性调度
Channel ch = ... ScheduledFuture<?> future = ch.eventLoop().scheduleAtFixedRate(new Runnable() { @Override public void run() { System.out.println("Run every 60 seconds"); } }, 60, 60, TimeUnit.Seconds);
取消调度
ScheduledFuture<?> future = ch.eventLoop().scheduleAtFixedRate(...); // Some other code that runs... boolean mayInterruptIfRunning = false; future.cancel(mayInterruptIfRunning);
实现细节
线程管理
就看当前调度线程和EventLoop绑定的线程是不是一样的,如果一致会立马执行;否则,放入EventLoop的任务队列(每个EventLoop都有自己的任务队列)等待下一次调度执行。
注意:
不要把一个长时间的任务放到任务队列中,会阻塞当前线程;如果必须要进行阻塞调用或者执行长时间的任务,那么请将其放到专门的任务线程池中执行。
线程分配
分为两种情况,NIO和BIO,具体如图: