Netty学习笔记

学习时间:2021年9月12日至

NIO基础

1 三大组件简介

1.1 Channel与Buffer

Java NIO系统的核心在于:通道(Channel)和缓冲区(Buffer)。通道表示打开到 IO 设备(例如:文件、套接字)的连接。若需要使用 NIO 系统,需要获取用于连接 IO 设备的通道以及用于容纳数据的缓冲区。然后操作缓冲区,对数据进行处理

简而言之,通道负责传输,缓冲区负责存储

常见的Channel有以下四种,其中FileChannel主要用于文件传输,其余三种用于网络通信

  • FileChannel
  • DatagramChannel
  • SocketChannel
  • ServerSocketChannel

Buffer有以下几种,其中使用较多的是ByteBuffer

  • ByteBuffer
    • MappedByteBuffer
    • DirectByteBuffer
    • HeapByteBuffer
  • ShortBuffer
  • IntBuffer
  • LongBuffer
  • FloatBuffer
  • DoubleBuffer
  • CharBuffer

1.2 Selector选择器

在使用Selector之前,处理socket连接还有以下两种方法:

  1. 使用多线程技术

为每个连接分别开辟一个线程,分别去处理对应的socket连接。

image-20210914165650417

这种方法存在以下几个问题:

  • 内存占用高
    • 每个线程都需要占用一定的内存,当连接较多时,会开辟大量线程,导致占用大量内存
  • 线程上下文切换成本高
  • 只适合连接数少的场景
    • 连接数过多,会导致创建很多线程,从而出现问题
  1. 使用线程池技术

使用线程池,让线程池中的线程去处理连接。

image-20210914165752157

这种方法存在以下几个问题:

  • 阻塞模式下,线程仅能处理一个连接
    • 线程池中的线程获取任务(task)后,只有当其执行完任务之后(断开连接后),才会去获取并执行下一个任务
    • 若socket连接一直未断开,则其对应的线程无法处理其他socket连接
  • 仅适合短连接场景
    • 短连接即建立连接发送请求并响应后就立即断开,使得线程池中的线程可以快速处理其他连接
  1. 使用选择器

selector 的作用就是配合一个线程来管理多个 channel(fileChannel因为是阻塞式的,所以无法使用selector),获取这些 channel 上发生的事件,这些 channel 工作在非阻塞模式下,当一个channel中没有执行任务时,可以去执行其他channel中的任务。适合连接数多,但流量较少的场景

image-20210914170325513

若事件未就绪,调用 selector 的 select() 方法会阻塞线程,直到 channel 发生了就绪事件。这些事件就绪后,select 方法就会返回这些事件交给 thread 来处理。

1.3 ByteBuffer

1.3.1 使用方式
  • 向 buffer 写入数据,例如调用 channel.read(buffer) (先从管道读,再向缓冲写)
  • 调用 flip() 切换至读模式
    • flip会使得buffer中的limit变为position,position变为0
  • 从 buffer 读取数据,例如调用 buffer.get()
  • 调用 clear() 或者compact()切换至写模式
    • 调用clear()方法时position=0,limit变为capacity
    • 调用compact()方法时,会将缓冲区中的未读数据压缩到缓冲区前面
  • 重复以上步骤
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Slf4j
public class ByteBufferTest {
public static void main(String[] args) {
// FileChannel
// 1.输入输出流 2.RandomAccessFile
try (FileChannel channel = new FileInputStream("data.txt").getChannel()) {
// 准备缓冲区
ByteBuffer buffer = ByteBuffer.allocate(10);
while (true){
// 从channel读取数据,向buffer写入
int len = channel.read(buffer);
log.debug("读到的字节数 {}", len);
if (len == -1){ // buffer没有内容了
break;
}
// 打印buffer的内容
buffer.flip(); // 切换至读模式
while(buffer.hasRemaining()){
byte b = buffer.get();
log.debug("读到的字节 {}", (char) b);
}
// 切换为写模式
buffer.clear();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

image-20210914184235321

1.3.2 核心属性

字节缓冲区的父类Buffer中有几个核心属性,如下:

1
2
3
4
5
// Invariants: mark <= position <= limit <= capacity
private int mark = -1;
private int position = 0;
private int limit;
private int capacity;
  • capacity:缓冲区的容量。通过构造函数赋予,一旦设置,无法更改
  • limit:缓冲区的界限。位于limit 后的数据不可读写。缓冲区的限制不能为负,并且不能大于其容量
  • position下一个读写位置的索引(类似PC)。缓冲区的位置不能为负,并且不能大于limit
  • mark:记录当前position的值。position被改变后,可以通过调用reset() 方法恢复到mark的位置。

以上四个属性必须满足以下要求:mark <= position <= limit <= capacity

1.3.3 核心方法

put()方法

  • put()方法可以将一个数据放入到缓冲区中。
  • 进行该操作后,postition的值会+1,指向下一个可以放入的位置。capacity = limit ,为缓冲区容量的值。

image-20210916161650596

flip()方法

  • flip()方法会切换对缓冲区的操作模式,由写->读 / 读->写
  • 进行该操作后
    • 如果是写模式->读模式,position = 0 , limit 指向最后一个元素的下一个位置,capacity不变
    • 如果是读->写,则恢复为put()方法中的值

image-20210916161736109

get()方法

  • get()方法会读取缓冲区中的一个值
  • 进行该操作后,position会+1,如果超过了limit则会抛出异常。
  • 注意:get(i)方法不会改变position的值

image-20210916172702596

rewind()方法

  • 该方法只能在读模式下使用
  • rewind()方法后,会恢复position、limit和capacity的值,变为进行get()前的值,即可重新读取

image-20210916161914944

clear()方法

  • clear()方法会将缓冲区中的各个属性恢复为最初的状态,position = 0, capacity = limit
  • 此时缓冲区的数据依然存在,处于“被遗忘”状态,下次进行写操作时会覆盖这些数据

image-20210916161943423

mark和reset方法

  • mark()方法会将postion的值保存到mark属性中
  • reset()方法会将position的值改为mark中保存的值
  • 二者配合可以使指针移动到想从新开始读取的位置

compact()方法

此方法为ByteBuffer的方法,而不是Buffer的方法

  • compact会把未读完的数据向前压缩,然后切换到写模式
  • 数据前移后,原位置的值并未清零,写时会覆盖之前的值

image-20210916162048916

clear() VS compact()

clear只是对position、limit、mark进行重置,而compact在对position进行设置,以及limit、mark进行重置的同时,还涉及到数据在内存中拷贝(会调用arraycopy)。所以compact比clear更耗性能。但compact能保存你未读取的数据,将新数据追加到未读取的数据之后;而clear则不行,若你调用了clear,则未读取的数据就无法再读取到了

所以需要根据情况来判断使用哪种方法进行模式切换

1.3.4 方法调用及演示

工具类

1
2
3
4
5
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.51.Final</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
import java.nio.ByteBuffer;
import io.netty.util.internal.MathUtil;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.MathUtil.*;

public class ByteBufferUtil {
private static final char[] BYTE2CHAR = new char[256];
private static final char[] HEXDUMP_TABLE = new char[256 * 4];
private static final String[] HEXPADDING = new String[16];
private static final String[] HEXDUMP_ROWPREFIXES = new String[65536 >>> 4];
private static final String[] BYTE2HEX = new String[256];
private static final String[] BYTEPADDING = new String[16];

static {
final char[] DIGITS = "0123456789abcdef".toCharArray();
for (int i = 0; i < 256; i++) {
HEXDUMP_TABLE[i << 1] = DIGITS[i >>> 4 & 0x0F];
HEXDUMP_TABLE[(i << 1) + 1] = DIGITS[i & 0x0F];
}

int i;

// Generate the lookup table for hex dump paddings
for (i = 0; i < HEXPADDING.length; i++) {
int padding = HEXPADDING.length - i;
StringBuilder buf = new StringBuilder(padding * 3);
for (int j = 0; j < padding; j++) {
buf.append(" ");
}
HEXPADDING[i] = buf.toString();
}

// Generate the lookup table for the start-offset header in each row (up to 64KiB).
for (i = 0; i < HEXDUMP_ROWPREFIXES.length; i++) {
StringBuilder buf = new StringBuilder(12);
buf.append(StringUtil.NEWLINE);
buf.append(Long.toHexString(i << 4 & 0xFFFFFFFFL | 0x100000000L));
buf.setCharAt(buf.length() - 9, '|');
buf.append('|');
HEXDUMP_ROWPREFIXES[i] = buf.toString();
}

// Generate the lookup table for byte-to-hex-dump conversion
for (i = 0; i < BYTE2HEX.length; i++) {
BYTE2HEX[i] = ' ' + StringUtil.byteToHexStringPadded(i);
}

// Generate the lookup table for byte dump paddings
for (i = 0; i < BYTEPADDING.length; i++) {
int padding = BYTEPADDING.length - i;
StringBuilder buf = new StringBuilder(padding);
for (int j = 0; j < padding; j++) {
buf.append(' ');
}
BYTEPADDING[i] = buf.toString();
}

// Generate the lookup table for byte-to-char conversion
for (i = 0; i < BYTE2CHAR.length; i++) {
if (i <= 0x1f || i >= 0x7f) {
BYTE2CHAR[i] = '.';
} else {
BYTE2CHAR[i] = (char) i;
}
}
}

/**
* 打印所有内容
* @param buffer
*/
public static void debugAll(ByteBuffer buffer) {
int oldlimit = buffer.limit();
buffer.limit(buffer.capacity());
StringBuilder origin = new StringBuilder(256);
appendPrettyHexDump(origin, buffer, 0, buffer.capacity());
System.out.println("+--------+-------------------- all ------------------------+----------------+");
System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), oldlimit);
System.out.println(origin);
buffer.limit(oldlimit);
}

/**
* 打印可读取内容
* @param buffer
*/
public static void debugRead(ByteBuffer buffer) {
StringBuilder builder = new StringBuilder(256);
appendPrettyHexDump(builder, buffer, buffer.position(), buffer.limit() - buffer.position());
System.out.println("+--------+-------------------- read -----------------------+----------------+");
System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), buffer.limit());
System.out.println(builder);
}

private static void appendPrettyHexDump(StringBuilder dump, ByteBuffer buf, int offset, int length) {
if (MathUtil.isOutOfBounds(offset, length, buf.capacity())) {
throw new IndexOutOfBoundsException(
"expected: " + "0 <= offset(" + offset + ") <= offset + length(" + length
+ ") <= " + "buf.capacity(" + buf.capacity() + ')');
}
if (length == 0) {
return;
}
dump.append(
" +-------------------------------------------------+" +
StringUtil.NEWLINE + " | 0 1 2 3 4 5 6 7 8 9 a b c d e f |" +
StringUtil.NEWLINE + "+--------+-------------------------------------------------+----------------+");

final int startIndex = offset;
final int fullRows = length >>> 4;
final int remainder = length & 0xF;

// Dump the rows which have 16 bytes.
for (int row = 0; row < fullRows; row++) {
int rowStartIndex = (row << 4) + startIndex;

// Per-row prefix.
appendHexDumpRowPrefix(dump, row, rowStartIndex);

// Hex dump
int rowEndIndex = rowStartIndex + 16;
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
}
dump.append(" |");

// ASCII dump
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
}
dump.append('|');
}

// Dump the last row which has less than 16 bytes.
if (remainder != 0) {
int rowStartIndex = (fullRows << 4) + startIndex;
appendHexDumpRowPrefix(dump, fullRows, rowStartIndex);

// Hex dump
int rowEndIndex = rowStartIndex + remainder;
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
}
dump.append(HEXPADDING[remainder]);
dump.append(" |");

// Ascii dump
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
}
dump.append(BYTEPADDING[remainder]);
dump.append('|');
}

dump.append(StringUtil.NEWLINE +
"+--------+-------------------------------------------------+----------------+");
}

private static void appendHexDumpRowPrefix(StringBuilder dump, int row, int rowStartIndex) {
if (row < HEXDUMP_ROWPREFIXES.length) {
dump.append(HEXDUMP_ROWPREFIXES[row]);
} else {
dump.append(StringUtil.NEWLINE);
dump.append(Long.toHexString(rowStartIndex & 0xFFFFFFFFL | 0x100000000L));
dump.setCharAt(dump.length() - 9, '|');
dump.append('|');
}
}

public static short getUnsignedByte(ByteBuffer buffer, int index) {
return (short) (buffer.get(index) & 0xFF);
}
}

buffer读和写操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class TestBufferReadWrite {
public static void main(String[] args) {
ByteBuffer buffer = ByteBuffer.allocate(10);
buffer.put((byte) 0x61); // 字符a
debugAll(buffer);

buffer.put(new byte[]{0x62, 0x63, 0x64}); // b, c, d
debugAll(buffer);

buffer.flip();
System.out.println(buffer.get()); // 十进制的97
debugAll(buffer);

buffer.compact();
debugAll(buffer);

buffer.put(new byte[]{0x65, 0x6f});
debugAll(buffer);
}
}

image-20210916164955654

image-20210916165007490

分配空间

1
2
3
4
5
6
7
8
9
10
public class TestByteBufferAllocate {
public static void main(String[] args) {
System.out.println(ByteBuffer.allocate(16).getClass()); // class java.nio.HeapByteBuffer
System.out.println(ByteBuffer.allocateDirect(16).getClass()); // class java.nio.DirectByteBuffer
/**
* class java.nio.HeapByteBuffer - java 堆内存 读写效率低 受到GC影响
* class java.nio.DirectByteBuffer -直接内存 读写效率高(少一次拷贝) 不会受GC影响 分配的效率低
*/
}
}

读取操作

1
2
3
4
5
6
7
8
9
10
11
12
13
public class TestByteBufferRead {
public static void main(String[] args) {
ByteBuffer buffer = ByteBuffer.allocate(10);
buffer.put(new byte[]{'a', 'b', 'c', 'd'});
buffer.flip();

// rewind 从头开始读
buffer.get(new byte[4]);
debugAll(buffer);
buffer.rewind();
System.out.println((char) buffer.get());// a
}
}

image-20210916170550463

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class TestByteBufferRead {
public static void main(String[] args) {
ByteBuffer buffer = ByteBuffer.allocate(10);
buffer.put(new byte[]{'a', 'b', 'c', 'd'});
buffer.flip();

// mark 做一个标记,记录position的位置。reset 将position的位置重置到mark的位置
// 二者配合可以做到从任何一个位置开始读取buffer
System.out.println((char) buffer.get());// a
System.out.println((char) buffer.get());// b
buffer.mark(); // 加标记,索引2的位置
System.out.println((char) buffer.get());// c
System.out.println((char) buffer.get());// d
buffer.reset(); // 将position重置到mark的位置 2
System.out.println((char) buffer.get());// c
System.out.println((char) buffer.get());// d
}
}

image-20210916170645565

字符串与ByteBuffer的相互转换

  1. 方法一

编码:字符串调用getByte方法获得byte数组,将byte数组放入ByteBuffer中

解码先调用ByteBuffer的flip方法,然后通过StandardCharsets的decoder方法解码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Translate {
public static void main(String[] args) {
// 准备两个字符串
String str1 = "hello";
String str2 = "";


ByteBuffer buffer1 = ByteBuffer.allocate(16);
// 通过字符串的getByte方法获得字节数组,放入缓冲区中
buffer1.put(str1.getBytes());
ByteBufferUtil.debugAll(buffer1);

// 将缓冲区中的数据转化为字符串
// 切换模式
buffer1.flip();

// 通过StandardCharsets解码,获得CharBuffer,再通过toString获得字符串
str2 = StandardCharsets.UTF_8.decode(buffer1).toString();
System.out.println(str2);
ByteBufferUtil.debugAll(buffer1);
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
+--------+-------------------- all ------------------------+----------------+
position: [5], limit: [16]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 00 00 00 00 00 00 00 00 00 00 00 |hello...........|
+--------+-------------------------------------------------+----------------+
hello
+--------+-------------------- all ------------------------+----------------+
position: [5], limit: [5]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 00 00 00 00 00 00 00 00 00 00 00 |hello...........|
+--------+-------------------------------------------------+----------------+
  1. 方法二

编码:通过StandardCharsets的encode方法获得ByteBuffer,此时获得的ByteBuffer为读模式,无需通过flip切换模式

解码:通过StandardCharsets的decoder方法解码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Translate {
public static void main(String[] args) {
// 准备两个字符串
String str1 = "hello";
String str2 = "";

// 通过StandardCharsets的encode方法获得ByteBuffer
// 此时获得的ByteBuffer为读模式,无需通过flip切换模式
ByteBuffer buffer1 = StandardCharsets.UTF_8.encode(str1);
ByteBufferUtil.debugAll(buffer1);

// 将缓冲区中的数据转化为字符串
// 通过StandardCharsets解码,获得CharBuffer,再通过toString获得字符串
str2 = StandardCharsets.UTF_8.decode(buffer1).toString();
System.out.println(str2);
ByteBufferUtil.debugAll(buffer1);
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
+--------+-------------------- all ------------------------+----------------+
position: [0], limit: [5]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f |hello |
+--------+-------------------------------------------------+----------------+
hello
+--------+-------------------- all ------------------------+----------------+
position: [5], limit: [5]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f |hello |
+--------+-------------------------------------------------+----------------+
  1. 方法三

编码:字符串调用getByte()方法获得字节数组,将字节数组传给ByteBuffer的wrap()方法,通过该方法获得ByteBuffer。同样无需调用flip方法切换为读模式

解码:通过StandardCharsets的decoder方法解码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Translate {
public static void main(String[] args) {
// 准备两个字符串
String str1 = "hello";
String str2 = "";

// 通过StandardCharsets的encode方法获得ByteBuffer
// 此时获得的ByteBuffer为读模式,无需通过flip切换模式
ByteBuffer buffer1 = ByteBuffer.wrap(str1.getBytes());
ByteBufferUtil.debugAll(buffer1);

// 将缓冲区中的数据转化为字符串
// 通过StandardCharsets解码,获得CharBuffer,再通过toString获得字符串
str2 = StandardCharsets.UTF_8.decode(buffer1).toString();
System.out.println(str2);
ByteBufferUtil.debugAll(buffer1);
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
+--------+-------------------- all ------------------------+----------------+
position: [0], limit: [5]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f |hello |
+--------+-------------------------------------------------+----------------+
hello
+--------+-------------------- all ------------------------+----------------+
position: [5], limit: [5]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f |hello |
+--------+-------------------------------------------------+----------------+
1.3.5 分散读集中写

分散读

分散读取下列文件中的字符:

1
onetwothree
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class TestByteBufferString {
public static void main(String[] args) {
try (FileChannel channel = new RandomAccessFile("word.txt", "r").getChannel()) {
ByteBuffer b1 = ByteBuffer.allocate(3); // one
ByteBuffer b2 = ByteBuffer.allocate(3); // two
ByteBuffer b3 = ByteBuffer.allocate(5); // three
channel.read(new ByteBuffer[]{b1, b2, b3});
b1.flip();
b2.flip();
b3.flip();
debugAll(b1);
debugAll(b2);
debugAll(b3);
} catch (IOException e) {
e.printStackTrace();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
+--------+-------------------- all ------------------------+----------------+
position: [0], limit: [3]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 6f 6e 65 |one |
+--------+-------------------------------------------------+----------------+
+--------+-------------------- all ------------------------+----------------+
position: [0], limit: [3]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 74 77 6f |two |
+--------+-------------------------------------------------+----------------+
+--------+-------------------- all ------------------------+----------------+
position: [0], limit: [5]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 74 68 72 65 65 |three |
+--------+-------------------------------------------------+----------------+

集中写

1
2
3
4
5
6
7
8
9
10
11
public class TestGatheringWrites {
public static void main(String[] args) {
ByteBuffer b1 = StandardCharsets.UTF_8.encode("hello");
ByteBuffer b2 = StandardCharsets.UTF_8.encode("world");
ByteBuffer b3 = StandardCharsets.UTF_8.encode("你好");
try (FileChannel channel = new RandomAccessFile("words2.txt", "rw").getChannel()) {
channel.write(new ByteBuffer[]{b1, b2, b3});
} catch (IOException e) {
}
}
}

image-20210919110310423

1.3.6 粘包和半包

现象

网络上有多条数据发送给服务端,数据之间使用 \n 进行分隔,但由于某种原因这些数据在接收时,被进行了重新组合,例如原始数据有3条为:

  • Hello,world\n
  • I’m Nyima\n
  • How are you?\n

变成了下面的两个 byteBuffer (粘包,半包)

  • Hello,world\nI’m Nyima\nHo
  • w are you?\n

出现原因

粘包

发送方在发送数据时,并不是一条一条地发送数据,而是将数据整合在一起,当数据达到一定的数量后再一起发送。这就会导致多条信息被放在一个缓冲区中被一起发送出去

半包

接收方的缓冲区的大小是有限的,当接收方的缓冲区满了以后,就需要将信息截断,等缓冲区空了以后再继续放入数据。这就会发生一段完整的数据最后被截断的现象

解决办法

  • 通过get(index)方法遍历ByteBuffer,遇到分隔符时进行处理。

    注意:get(index)不会改变position的值

    • 记录该段数据长度,以便于申请对应大小的缓冲区
    • 将缓冲区的数据通过get()方法写入到target中
  • 调用compact方法切换模式,因为缓冲区中可能还有未读的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class ByteBufferDemo {
public static void main(String[] args) {
ByteBuffer buffer = ByteBuffer.allocate(32);
// 模拟粘包+半包
buffer.put("Hello,world\nI'm Nyima\nHo".getBytes());
// 调用split函数处理
split(buffer);
buffer.put("w are you?\n".getBytes());
split(buffer);
}

private static void split(ByteBuffer buffer) {
// 切换为读模式
buffer.flip();
for(int i = 0; i < buffer.limit(); i++) {

// 遍历寻找分隔符
// get(i)不会移动position
if (buffer.get(i) == '\n') {
// 缓冲区长度
int length = i+1-buffer.position();
ByteBuffer target = ByteBuffer.allocate(length);
// 将前面的内容写入target缓冲区
for(int j = 0; j < length; j++) {
// 将buffer中的数据写入target中
target.put(buffer.get());
}
// 打印查看结果
ByteBufferUtil.debugAll(target);
}
}
// 切换为写模式,但是缓冲区可能未读完,这里需要使用compact
buffer.compact();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
+--------+-------------------- all ------------------------+----------------+
position: [12], limit: [12]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 65 6c 6c 6f 2c 77 6f 72 6c 64 0a |Hello,world. |
+--------+-------------------------------------------------+----------------+
+--------+-------------------- all ------------------------+----------------+
position: [10], limit: [10]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 49 27 6d 20 4e 79 69 6d 61 0a |I'm Nyima. |
+--------+-------------------------------------------------+----------------+
+--------+-------------------- all ------------------------+----------------+
position: [13], limit: [13]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 6f 77 20 61 72 65 20 79 6f 75 3f 0a |How are you?. |
+--------+-------------------------------------------------+----------------+

2 文件编程

2.1 FileChannel

3 网络编程

3.1 阻塞

  • 阻塞模式下,相关方法都会导致线程暂停
    • ServerSocketChannel.accept 会在没有连接建立时让线程暂停
    • SocketChannel.read 会在通道中没有数据可读时让线程暂停
    • 阻塞的表现其实就是线程暂停了,暂停期间不会占用 cpu,但线程相当于闲置
  • 单线程下,阻塞方法之间相互影响,几乎不能正常工作,需要多线程支持
  • 但多线程下,有新的问题,体现在以下方面
    • 32 位 jvm 一个线程 320k,64 位 jvm 一个线程 1024k,如果连接数过多,必然导致 OOM,并且线程太多,反而会因为频繁上下文切换导致性能降低
    • 可以采用线程池技术来减少线程数和线程上下文切换,但治标不治本,如果有很多连接建立,但长时间 inactive,会阻塞线程池中所有线程,因此不适合长连接,只适合短连接

服务器端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Slf4j
public class Server {
public static void main(String[] args) throws IOException {
// 使用nio来理解阻塞模式,单线程
// 0.ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1.创建服务器
ServerSocketChannel ssc = ServerSocketChannel.open();

// 2.绑定监听端口
ssc.bind(new InetSocketAddress(8080));

// 3.连接集合
List<SocketChannel> channels = new ArrayList<>();
while (true){
// 4.accept 建立与客户端连接,SocketChannel 用来与客户端之间通信
log.debug("connecting...");
SocketChannel sc = ssc.accept(); // 阻塞方法,线程停止运行,等待新的连接
log.debug("connected... {}", sc);
channels.add(sc);
for(SocketChannel channel : channels){
// 5.接收客户端的数据
log.debug("before read... {}", sc);
channel.read(buffer); // 阻塞方法,线程停止运行,等待发来数据
buffer.flip();
debugAll(buffer);
buffer.clear();
log.debug("after read... {}", sc);
}
}
}
}

客户端

1
2
3
4
5
6
7
public class Client {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
System.out.println("waiting..."); // 设置断点
}
}
  • 客户端-服务器建立连接前:服务器端因accept阻塞

image-20210919114150999

  • 客户端-服务器建立连接后,客户端发送消息前:服务器端因通道为空被阻塞

image-20210919114227546

  • 客户端发送数据后,服务器处理通道中的数据。再次进入循环时,再次被accept阻塞

image-20210919114329206

image-20210919114412641

image-20210919114445618

  • 之前的客户端再次发送消息,服务器端因为被accept阻塞,无法处理之前客户端发送到通道中的信息

3.2 非阻塞

  • 可以通过ServerSocketChannel的configureBlocking(false)方法将获得连接设置为非阻塞的。此时若没有连接,accept会返回null
  • 可以通过SocketChannel的configureBlocking(false)方法将从通道中读取数据设置为非阻塞的。若此时通道中没有数据可读,read会返回-1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Slf4j
public class Server {
public static void main(String[] args) throws IOException {
// 使用nio来理解阻塞模式,单线程
// 0.ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1.创建服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false); // 非阻塞模式
// 2.绑定监听端口
ssc.bind(new InetSocketAddress(8080));

// 3.连接集合
List<SocketChannel> channels = new ArrayList<>();
while (true){
// 4.accept 建立与客户端连接,SocketChannel 用来与客户端之间通信
log.debug("connecting...");
SocketChannel sc = ssc.accept(); // 变为非阻塞,线程还会继续运行,如果没有建立连接,但sc返回的是null
if (sc != null) {
log.debug("connected... {}", sc);
sc.configureBlocking(false); // 设为非阻塞模式
channels.add(sc);
}
for(SocketChannel channel : channels){
// 5.接收客户端的数据
log.debug("before read... {}", sc);
int read = channel.read(buffer);// 变为非阻塞,如果没有读到数据,read返回0
if (read > 0) {
buffer.flip();
debugAll(buffer);
buffer.clear();
log.debug("after read... {}", sc);
}
}
}
}
}

这样写存在一个问题,因为设置为了非阻塞,会一直执行while(true)中的代码,CPU一直处于忙碌状态,会使得性能变低,所以实际情况中不使用这种方法处理请求。线程空转。

3.3 Selector

多路复用

单线程可以配合 Selector 完成对多个 Channel 可读写事件的监控,这称之为多路复用

  • 多路复用仅针对网络 IO,普通文件 IO 无法利用多路复用
  • 如果不用 Selector 的非阻塞模式,线程大部分时间都在做无用功,而 Selector 能够保证
    • accept:有可连接事件时才去连接
    • read:有可读事件才去读取
    • write:有可写事件才去写入
      • 限于网络传输能力,Channel 未必时时可写,一旦 Channel 可写,会触发 Selector 的可写事件
3.3.1 使用和Accept事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Slf4j
public class Server {
public static void main(String[] args) throws IOException {
// 1.创建selector,管理多个channel
Selector selector = Selector.open();

ByteBuffer buffer = ByteBuffer.allocate(16);
// 获得服务器通道
ServerSocketChannel ssc = ServerSocketChannel.open();
// 通道必须设置为非阻塞模式
ssc.configureBlocking(false);
// 2.建立selector和channel的联系(注册),将channel注册到selector中
// SelectionKey就是将来事件发生后,通过它可以知道事件是哪个channel的事件
SelectionKey sscKey = ssc.register(selector, 0, null);
// key只关注accept事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
while (true){
// 3.select 方法,没有事件则阻塞,有事件线程才恢复运行,返回值为就绪事件的个数
// select 在事件未处理时,它不会阻塞;事件发生后,要么处理,要么取消,不能置之不理
int ready = selector.select();
// 4.获取所有的就绪事件,并处理事件,selectedKeys内部包含了所有发生的事件
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()){
// 获取事件对应的key
SelectionKey key = iter.next();
// 获取key对应的channel
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
// 并处理,而且是必须处理,否则需要取消
channel.accept();
// 如果不处理事件,则:
// key.cancel();
}
}
}
}

步骤解析

  • 获得选择器Selector
1
Selector selector = Selector.open();
  • 将通道设置为非阻塞模式,并注册到选择器中,并设置感兴趣的事件
    • channel 必须工作在非阻塞模式
    • FileChannel 没有非阻塞模式,因此不能配合 selector 一起使用
    • 绑定的事件类型可以有:
      • connect - 客户端连接成功时触发
      • accept - 服务器端成功接受连接时触发
      • read - 数据可读入时触发,有因为接收能力弱,数据暂不能读入的情况
      • write - 数据可写出时触发,有因为发送能力弱,数据暂不能写出的情况
1
2
3
4
// 通道必须设置为非阻塞模式
server.configureBlocking(false);
// 将通道注册到选择器中,并设置感兴趣的实践
server.register(selector, SelectionKey.OP_ACCEPT);
  • 通过Selector监听事件,并获得就绪的通道个数,若没有通道就绪,线程会被阻塞

    • 阻塞直到绑定事件发生

      1
      int count = selector.select(); // 没有事件发生则阻塞
    • 阻塞直到绑定事件发生,或是超时(时间单位为 ms)

      1
      int count = selector.select(long timeout);
    • 不会阻塞,也就是不管有没有事件,立刻返回,自己根据返回值检查是否有事件

      1
      int count = selector.selectNow();
  • 获取就绪事件并得到对应的通道,然后进行处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 获取所有事件
Set<SelectionKey> selectionKeys = selector.selectedKeys();

// 使用迭代器遍历事件
Iterator<SelectionKey> iterator = selectionKeys.iterator();

while (iterator.hasNext()) {
SelectionKey key = iterator.next();

// 判断key的类型,此处为Accept类型
if(key.isAcceptable()) {
// 获得key对应的channel
ServerSocketChannel channel = (ServerSocketChannel) key.channel();

// 获取连接并处理,而且是必须处理,否则需要取消
SocketChannel socketChannel = channel.accept();

// 处理完毕后移除
iterator.remove();
}
}

事件发生后能否不处理?

事件发生后,要么处理,要么取消(cancel),不能什么都不做,否则下次该事件仍会触发,这是因为 nio 底层使用的是水平触发

3.3.2 Read事件
  • 在Accept事件中,若有客户端与服务器端建立了连接,需要将其对应的SocketChannel设置为非阻塞,并注册到选择其中
  • 添加Read事件,触发后进行读取操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
@Slf4j
public class Server {
public static void main(String[] args) throws IOException {
// 1.创建selector,管理多个channel
Selector selector = Selector.open();

ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
// 2.建立selector和channel的联系(注册),将channel注册到selector中
// SelectionKey就是将来事件发生后,通过它可以知道事件是哪个channel的事件
SelectionKey sscKey = ssc.register(selector, 0, null);
// key只关注accept事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);
log.debug("sscKey: {}", sscKey);
ssc.bind(new InetSocketAddress(8080));
while (true){
// 3.select 方法,没有事件则阻塞,有事件线程才恢复运行
// select 在事件未处理时,它不会阻塞;事件发生后,要么处理,要么取消,不能置之不理
selector.select();
// 4.处理事件,selectedKeys内部包含了所有发生的事件
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()){
// 获取事件对应的key
SelectionKey key = iter.next();
log.debug("key: {}", key);
// 通过key来区分一下事件类型
if (key.isAcceptable()){// 如果是accept事件
// 获取key对应的channel
// 注意这里是服务器通道
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
// 建立连接,并返回通道
SocketChannel sc = channel.accept();
// 设置为非阻塞模式,同时将连接的通道也注册到选择其中
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
// 处理key时,要从selectedKeys集合中删除,否则下次处理就会有问题
iter.remove();
}else if(key.isReadable()){// 如果是read事件
// 获取key对应的channel
// 注意这里是客户端通道
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(16);
channel.read(buffer);
buffer.flip();
debugAll(buffer);
buffer.clear();
// 处理key时,要从selectedKeys集合中删除,否则下次处理就会有问题
iter.remove();
}
}
}
}
}

执行结果:

image-20210920120157949

删除事件

当处理完一个事件后,一定要调用迭代器的remove方法移除对应事件,否则会出现错误。原因如下:

以我们上面的 Read事件 的代码为例

  • 当调用了 server.register(selector, SelectionKey.OP_ACCEPT)后,Selector中维护了一个集合,用于存放SelectionKey以及其对应的通道
1
2
// WindowsSelectorImpl 中的 SelectionKeyImpl数组
private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[8];
1
2
3
4
5
public class SelectionKeyImpl extends AbstractSelectionKey {
// Key对应的通道
final SelChImpl channel;
...
}

image-20210920114732909

  • 选择器中的通道对应的事件发生后,selectionKey会被放到另一个集合中,但是selectionKey不会自动移除,所以需要我们在处理完一个事件后,通过迭代器手动移除其中的selectionKey。否则会导致已被处理过的事件再次被处理,就会引发错误

image-20210920114813070

与客服端断开连接

当客户端与服务器之间的连接断开时,会给服务器端发送一个读事件,对异常断开和正常断开需要加以不同的方式进行处理

  • 正常断开
    • 正常断开时,服务器端的channel.read(buffer)方法的返回值为-1,所以当结束到返回值为-1时,需要调用key的cancel方法取消此事件,并在取消后移除该事件
1
2
3
4
5
6
7
8
9
10
11
int read = channel.read(buffer);
// 正常断开连接时,客户端会向服务器发送一个写事件,此时read的返回值为-1
if(read == -1) {
// 取消该事件的处理
key.cancel();
channel.close();
} else {
...
}
// 取消或者处理,都需要移除key
iterator.remove();
  • 异常断开
    • 异常断开时,会抛出IOException异常, 在try-catch的catch块中捕获异常并调用key的cancel方法即可
1
2
3
4
5
6
try{
...
} catch (IOException e){
e.printStackTrace();
key.cancel();
}

消息边界

  1. 不处理消息边界存在的问题

将缓冲区的大小设置为4个字节,发送2个汉字(你好),通过decode解码并打印时,会出现乱码:

1
2
3
ByteBuffer buffer = ByteBuffer.allocate(4);
// 解码并打印
System.out.println(StandardCharsets.UTF_8.decode(buffer));
1
2
你�
��

这是因为UTF-8字符集下,1个汉字占用3个字节,此时缓冲区大小为4个字节,一次读时间无法处理完通道中的所有数据,所以一共会触发两次读事件。这就导致 你好 字被拆分为了前半部分和后半部分发送,解码时就会出现问题。

  1. 处理消息边界
  • 文本大于缓冲区大小
    • 此时需要将缓冲区进行扩容
  • 发生半包现象
  • 发生粘包现象

image-20210922162428055

解决思路大致有以下三种

  • 固定消息长度,数据包大小一样,服务器按预定长度读取,当发送的数据较少时,需要将数据进行填充,直到长度与消息规定长度一致。缺点是浪费带宽
  • 另一种思路是按分隔符拆分,缺点是效率低,需要一个一个字符地去匹配分隔符
  • TLV 格式,即 Type 类型、Length 长度、Value 数据(也就是在消息开头用一些空间存放后面数据的长度),如HTTP请求头中的Content-Type与Content-Length。类型和长度已知的情况下,就可以方便获取消息大小,分配合适的 buffer,缺点是 buffer 需要提前分配,如果内容过大,则影响 server 吞吐量
    • Http 1.1 是 TLV 格式
    • Http 2.0 是 LTV 格式

image-20210922163533646

  1. 按分隔符拆分的方法实现:

如果服务器的bytebuffer容量为16,接收的大小为20,则容量不够,则可以利用附件与扩容机制

1
2
3
4
5
6
7
8
9
10
11
sequenceDiagram
participant c1 as 客户端1
participant s as 服务器
participant b1 as ByteBuffer1
participant b2 as ByteBuffer2
c1 ->> s:发送 01234567890abcdef3333\r
s ->> b1:第一次read存入01234567890abcdef
s ->> b2:扩容
b1 ->> b2:拷贝
s ->> b2:第二次read存入3333\r
b2 ->> b2:01234567890abcdef3333\r

Channel的register方法还有第三个参数附件,可以向其中放入一个Object类型的对象,该对象会与登记的Channel以及其对应的SelectionKey绑定,可以从SelectionKey获取到对应通道的附件:

1
public final SelectionKey register(Selector sel, int ops, Object att)

可通过SelectionKey的attachment()方法获得附件:

1
ByteBuffer buffer = (ByteBuffer) key.attachment();

我们需要在Accept事件发生后,将通道注册到Selector中时,对每个通道添加一个ByteBuffer附件,让每个通道发生读事件时都使用自己的通道,避免与其他通道发生冲突而导致问题:

1
2
3
4
5
// 设置为非阻塞模式,同时将连接的通道也注册到选择其中,同时设置附件
socketChannel.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(16);
// 添加通道对应的Buffer附件
socketChannel.register(selector, SelectionKey.OP_READ, buffer);

当Channel中的数据大于缓冲区时,需要对缓冲区进行扩容操作。此代码中的扩容的判定方法:Channel调用compact方法后的position与limit相等,说明缓冲区中的数据并未被读取(容量太小),此时创建新的缓冲区,其大小扩大为两倍。同时还要将旧缓冲区中的数据拷贝到新的缓冲区中,同时调用SelectionKey的attach方法将新的缓冲区作为新的附件放入SelectionKey中

1
2
3
4
5
6
7
8
// 如果缓冲区太小,就进行扩容
if (buffer.position() == buffer.limit()) {
ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity()*2);
// 将旧buffer中的内容放入新的buffer中
ewBuffer.put(buffer);
// 将新buffer作为附件放到key中
key.attach(newBuffer);
}

客户端代码:

1
2
3
4
5
6
7
8
9
10
public class Client {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
System.out.println("waiting...");
// sc.write(Charset.defaultCharset().encode("hi"))
sc.write(Charset.defaultCharset().encode("1234567890abcdef3333\n"));
System.in.read();
}
}

服务器端代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
@Slf4j
public class Server {
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
SelectionKey sscKey = ssc.register(selector, 0, null);
sscKey.interestOps(SelectionKey.OP_ACCEPT);
log.debug("sscKey: {}", sscKey);
ssc.bind(new InetSocketAddress(8080));
while (true){
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()){
SelectionKey key = iter.next();
log.debug("key: {}", key);
if (key.isAcceptable()){// 如果是accept事件
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(16); // attachment附件
// 将ByteBuffer作为附件关联到SelectionKey上
sc.register(selector, SelectionKey.OP_READ, buffer);
iter.remove();
}else if(key.isReadable()){// 如果是read事件
try{
SocketChannel channel = (SocketChannel) key.channel();
// 通过key获取附件
ByteBuffer buffer = (ByteBuffer)key.attachment();
int read = channel.read(buffer);
if (read == -1) { // 正常断开连接
key.cancel();
channel.close();
} else{
split(buffer);
if (buffer.position() == buffer.limit()){
// 扩容
ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
buffer.flip();
newBuffer.put(buffer);
// 将新buffer放到key中作为附件
key.attach(newBuffer);
}
}

} catch (IOException e){
e.printStackTrace();
key.cancel();
}
iter.remove();
}
}
}
}

private static void split(ByteBuffer buffer) {
buffer.flip();
for(int i = 0; i < buffer.limit(); i++) {
// 遍历寻找分隔符
// get(i)不会移动position
if (buffer.get(i) == '\n') {
// 缓冲区长度
int length = i+1-buffer.position();
ByteBuffer target = ByteBuffer.allocate(length);
// 将前面的内容写入target缓冲区
for(int j = 0; j < length; j++) {
// 将buffer中的数据写入target中
target.put(buffer.get());
}
// 打印结果
ByteBufferUtil.debugAll(target);
}
}
// 切换为写模式,但是缓冲区可能未读完,这里需要使用compact
buffer.compact();
}
}

执行结果:

1
2
3
4
5
6
7
8
+--------+-------------------- all ------------------------+----------------+
position: [21], limit: [21]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 31 32 33 34 35 36 37 38 39 30 61 62 63 64 65 66 |1234567890abcdef|
|00000010| 33 33 33 33 0a |3333. |
+--------+-------------------------------------------------+----------------+

ByteBuffer的大小分配

  • 每个 channel 都需要记录可能被切分的消息,因为 ByteBuffer 不能被多个 channel 共同使用,因此需要为每个 channel 维护一个独立的 ByteBuffer
  • ByteBuffer 不能太大,比如一个 ByteBuffer 1Mb 的话,要支持百万连接就要 1Tb 内存,因此需要设计大小可变的 ByteBuffer
  • 分配思路可以参考
    • 一种思路是首先分配一个较小的 buffer,例如 4k,如果发现数据不够,再分配 8k 的 buffer,将 4k buffer 内容拷贝至 8k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能
    • 另一种思路是用多个数组组成 buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗
3.3.3 Write事件

原始模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class WriteServer {
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
while (true) {
selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
StringBuilder sb = new StringBuilder();
// 1.向客户端写入大量数据
for (int i = 0; i < 1000000000; i++) {
sb.append("a");
}
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
// 下面的while循环可能导致线程空转
while (buffer.hasRemaining()) {
// 2.返回值代表实际写入的字节数
int write = sc.write(buffer);
System.out.println(write);
}
}
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class WriteClient {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
// 3.接收数据
int count = 0;
while (true) {
ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
count += sc.read(buffer);
System.out.println(count);
buffer.clear();
}
}
}

执行结果:

image-20210922202808552

image-20210922202820227

缺点:服务器端不能一次写入,而是多次写入,而且由于网络原因,有多次写入的字节为零,一直卡在这个通道上,浪费了线程资源。

改进模型

服务器通过Buffer向通道中写入数据时,可能因为通道容量小于Buffer中的数据大小,导致无法一次性将Buffer中的数据全部写入到Channel中,这时便需要分多次写入,具体步骤如下

  • 执行一次写操作,向将buffer中的内容写入到SocketChannel中,然后判断Buffer中是否还有数据
  • 若Buffer中还有数据,则需要将SockerChannel注册到Seletor中,并关注写事件,同时将未写完的Buffer作为附件一起放入到SelectionKey中
1
2
3
4
5
6
7
int write = socket.write(buffer);
// 通道中可能无法放入缓冲区中的所有数据
if (buffer.hasRemaining()) {
// 注册到Selector中,关注可写事件,并将buffer添加到key的附件中
socket.configureBlocking(false);
socket.register(selector, SelectionKey.OP_WRITE, buffer);
}
  • 添加写事件的相关操作key.isWritable(),对Buffer再次进行写操作
    • 每次写后需要判断Buffer中是否还有数据(是否写完)。若写完,需要移除SelecionKey中的Buffer附件,避免其占用过多内存,同时还需移除对写事件的关注
1
2
3
4
5
6
7
8
9
10
11
SocketChannel socket = (SocketChannel) key.channel();
// 获得buffer
ByteBuffer buffer = (ByteBuffer) key.attachment();
// 执行写操作
int write = socket.write(buffer);
System.out.println(write);
// 如果已经完成了写操作,需要移除key中的附件,同时不再对写事件感兴趣
if (!buffer.hasRemaining()) {
key.attach(null);
key.interestOps(0);
}

服务器端完整代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class WriteServer {
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
while (true) {
selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
// 一定要记得手动删除key
iterator.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
// 将sc通道注册到selector上
SelectionKey scKey = sc.register(selector, SelectionKey.OP_READ);
StringBuilder sb = new StringBuilder();
// 1.向客户端写入大量数据
for (int i = 0; i < 1000000000; i++) {
sb.append("a");
}
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
// 2.返回值代表实际写入的字节数
int write = sc.write(buffer);
System.out.println(write);
// 3.判断是否有剩余内容
if (buffer.hasRemaining()) {
// 4.既关注读也关注写事件
scKey.interestOps(scKey.interestOps() + SelectionKey.OP_WRITE);
// 5.把未写完的数据附加到scKey上
scKey.attach(buffer);
}
}else if (key.isWritable()){
ByteBuffer buffer = (ByteBuffer) key.attachment();
SocketChannel sc = (SocketChannel) key.channel();
int write = sc.write(buffer);
System.out.println(write);
// 6.写完数据后,用null替换该附件,取消写入事件
if (!buffer.hasRemaining()) {
key.attach(null);
key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
}

}
}
}
}
}
3.3.4 多线程优化

充分利用多核CPU,分两组选择器:

  • 单线程配一个选择器(Boss),专门处理 accept 事件
  • 创建 cpu 核心数的线程(Worker),每个线程配一个选择器,轮流处理 read 事件

实现思路

  • 创建一个负责处理Accept事件的Boss线程,与多个负责处理Read事件的Worker线程

  • Boss线程执行的操作

    • 接受并处理Accepet事件,当Accept事件发生后,调用Worker的register(SocketChannel socket)方法,让Worker去处理Read事件,其中需要根据标识robin去判断将任务分配给哪个Worker
    1
    2
    3
    4
    5
    6
    // 创建固定数量的Worker
    Worker[] workers = new Worker[4];
    // 用于负载均衡的原子整数
    AtomicInteger robin = new AtomicInteger(0);
    // 负载均衡,轮询分配Worker
    workers[robin.getAndIncrement() % workers.length].register(socket);
  • register(SocketChannel socket)方法会通过同步队列完成Boss线程与Worker线程之间的通信,让SocketChannel的注册任务被Worker线程执行。添加任务后需要调用selector.wakeup()来唤醒被阻塞的Selector

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    public void register(final SocketChannel socket) throws IOException {
    // 只启动一次
    if (!started) {
    // 初始化操作
    }
    // 向同步队列中添加SocketChannel的注册事件
    // 在Worker线程中执行注册事件
    queue.add(new Runnable() {
    @Override
    public void run() {
    try {
    socket.register(selector, SelectionKey.OP_READ);
    } catch (IOException e) {
    e.printStackTrace();
    }
    }
    });
    // 唤醒被阻塞的Selector
    // select类似LockSupport中的park,wakeup的原理类似LockSupport中的unpark
    selector.wakeup();
    }
  • Worker线程执行的操作

    • 从同步队列中获取注册任务,并处理Read事件

服务器端实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
@Slf4j
public class MultiThreadServer {
public static void main(String[] args) throws IOException {
Thread.currentThread().setName("boss");
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector boss = Selector.open();
ssc.register(boss, SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
// 1.创建固定数量的worker 并初始化
Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()];
for (int i = 0; i < workers.length; i++) {
workers[i] = new Worker("worker-" + i);
}
AtomicInteger index = new AtomicInteger();
while (true) {
boss.select();
Iterator<SelectionKey> iterator = boss.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
log.debug("connected... {}", sc.getRemoteAddress());
// 2.关联 selector
log.debug("before connected... {}", sc.getRemoteAddress());
// round-robin 轮询
workers[index.getAndIncrement() % workers.length].register(sc);
log.debug("after connected... {}", sc.getRemoteAddress());
}
}
}
}

static class Worker implements Runnable{
private Thread thread;
private Selector selector;
private String name;
private volatile boolean start = false; // 还未初始化

public Worker(String name) {
this.name = name;
}

// 初始化线程和selector
public void register(SocketChannel sc) throws IOException {
if (!start) {
thread = new Thread(this, name);
selector = Selector.open();
thread.start();
start = true;
}
selector.wakeup(); // 唤醒select方法 boss
sc.register(selector, SelectionKey.OP_READ, null); // boss
}

@Override
public void run() {
while (true) {
try {
selector.select(); // worker-0 阻塞
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel channel = (SocketChannel) key.channel();
log.debug("read... {}", channel.getRemoteAddress());
channel.read(buffer);
buffer.flip();
debugAll(buffer);
}
}
} catch (IOException e) {
e.printStackTrace();
}

}
}
}
}

执行结果:

1
2
3
4
5
6
7
8
9
10
11
12
14:43:11.674 [boss] DEBUG com.hongyi.netty.c2.MultiThreadServer - connected... /127.0.0.1:50849
14:43:11.677 [boss] DEBUG com.hongyi.netty.c2.MultiThreadServer - before connected... /127.0.0.1:50849
14:43:11.679 [boss] DEBUG com.hongyi.netty.c2.MultiThreadServer - after connected... /127.0.0.1:50849
14:43:11.679 [worker-0] DEBUG com.hongyi.netty.c2.MultiThreadServer - read... /127.0.0.1:50849
14:43:11.686 [worker-0] DEBUG io.netty.util.internal.logging.InternalLoggerFactory - Using SLF4J as the default logging framework
+--------+-------------------- all ------------------------+----------------+
position: [0], limit: [16]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 31 32 33 34 35 36 37 38 39 30 61 62 63 64 65 66 |1234567890abcdef|
+--------+-------------------------------------------------+----------------+

4 NIO与BIO

4.1 Stream与Channel

  • stream 不会自动缓冲数据,channel 会利用系统提供的发送缓冲区、接收缓冲区(更为底层)
  • stream 仅支持阻塞 API,channel 同时支持阻塞、非阻塞 API,网络 channel 可配合 selector 实现多路复用
  • 二者均为全双工,即读写可以同时进行
    • 虽然Stream是单向流动的,但是它也是全双工的

4.2 IO模型

  • 同步:线程自己去获取结果(一个线程)
    • 例如:线程调用一个方法后,需要等待方法返回结果
  • 异步:线程自己不去获取结果,而是由其它线程返回结果(至少两个线程)
    • 例如:线程A调用一个方法后,继续向下运行,运行结果由线程B返回

当调用一次 channel.read 或 stream.read 后,会由用户态切换至操作系统内核态来完成真正数据读取,而读取又分为两个阶段,分别为:

  • 等待数据阶段
  • 复制数据阶段

image-20210923151537035

根据UNIX 网络编程 - 卷 I,IO模型主要有以下几种:

  1. 阻塞IO:

image-20210923151917985

  • 用户线程进行read操作时,需要等待操作系统执行实际的read操作,此期间用户线程是被阻塞的,无法执行其他操作
  1. 非阻塞IO:

image-20210923151958889

  • 用户线程在一个循环中一直调用read方法,若内核空间中还没有数据可读,立即返回
    • 只是在等待阶段非阻塞
  • 用户线程发现内核空间中有数据后,等待内核空间执行复制数据,待复制结束后返回结果
  1. 多路复用:

image-20210923152056210

Java中通过Selector实现多路复用

  • 当没有事件时,调用select方法会被阻塞住
  • 一旦有一个或多个事件发生后,就会处理对应的事件,从而实现多路复用

多路复用与阻塞IO的区别

  • 阻塞IO模式下,若线程因accept事件被阻塞,发生read事件后,仍需等待accept事件执行完成后,才能去处理read事件
  • 多路复用模式下,一个事件发生后,若另一个事件处于阻塞状态,不会影响该事件的执行
  1. 异步非阻塞IO:

image-20210923152504538

  • 线程1调用方法后理解返回,不会被阻塞也不需要立即获取结果
  • 当方法的运行结果出来以后,由线程2将结果返回给线程1

4.3 零拷贝

零拷贝指的是数据无需拷贝到 JVM 内存中,同时具有以下三个优点

  • 更少的用户态与内核态的切换
  • 不利用 cpu 计算,减少 cpu 缓存伪共享
  • 零拷贝适合小文件传输
4.3.1 传统IO问题

传统的 IO 将一个文件通过 socket 写出:

1
2
3
4
5
6
7
8
File f = new File("helloword/data.txt");
RandomAccessFile file = new RandomAccessFile(file, "r");

byte[] buf = new byte[(int)f.length()];
file.read(buf);

Socket socket = ...;
socket.getOutputStream().write(buf);

内部工作流

image-20210923153557523

  • Java 本身并不具备 IO 读写能力,因此 read 方法调用后,要从 Java 程序的用户态切换至内核态,去调用操作系统(Kernel)的读能力,将数据读入内核缓冲区。这期间用户线程阻塞,操作系统使用 DMA(Direct Memory Access)来实现文件读,其间也不会使用 CPU

    DMA 也可以理解为硬件单元,用来解放 cpu 完成文件 IO

  • 内核态切换回用户态,将数据从内核缓冲区读入用户缓冲区(即 byte[] buf),这期间 CPU 会参与拷贝,无法利用 DMA

  • 调用 write 方法,这时将数据从用户缓冲区(byte[] buf)写入 socket 缓冲区,CPU 会参与拷贝

  • 接下来要向网卡写数据,这项能力 Java 又不具备,因此又得从用户态切换至内核态,调用操作系统的写能力,使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 CPU

可以看到中间环节较多,java 的 IO 实际不是物理设备级别的读写,而是缓存的复制,底层的真正读写是操作系统来完成的

  • 用户态与内核态的切换发生了 3 次,这个操作比较重量级
  • 数据拷贝了共 4 次
4.3.2 NIO优化

通过 DirectByteBuf

  • ByteBuffer.allocate(10)
    • 底层对应 HeapByteBuffer,使用的还是 Java 内存
  • ByteBuffer.allocateDirect(10)
    • 底层对应DirectByteBuffer,使用的是操作系统内存

image-20210923153821096

大部分步骤与优化前相同,唯有一点:Java 可以使用 DirectByteBuffer 将堆外内存映射到 JVM 内存中来直接访问使用

  • 这块内存不受 JVM 垃圾回收的影响,因此内存地址固定,有助于 IO 读写
  • Java 中的 DirectByteBuf 对象仅维护了此内存的虚引用,内存回收分成两步
    • DirectByteBuffer 对象被垃圾回收,将虚引用加入引用队列
      • 当引用的对象ByteBuffer被垃圾回收以后,虚引用对象Cleaner就会被放入引用队列中,然后调用Cleaner的clean方法来释放直接内存
      • DirectByteBuffer 的释放底层调用的是 Unsafe 的 freeMemory 方法
    • 通过专门线程访问引用队列,根据虚引用释放堆外内存
  • 减少了一次数据拷贝,用户态与内核态的切换次数没有减少

进一步优化1

以下两种方式都是零拷贝,即无需将数据拷贝到用户缓冲区中(JVM内存中)

底层采用了 linux 2.1 后提供的 sendFile 方法,Java 中对应着两个 channel 调用transferTo/transferFrom 方法拷贝数据

image-20210923154031115

  • Java 调用 transferTo 方法后,要从 Java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 CPU
  • 数据从内核缓冲区传输到 socket 缓冲区,CPU 会参与拷贝
  • 最后使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 CPU

这种方法下

  • 只发生了1次用户态与内核态的切换
  • 数据拷贝了 3 次

进一步优化2

linux 2.4 对上述方法再次进行了优化

image-20210923154101150

  • Java 调用 transferTo 方法后,要从 Java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 CPU
  • 只会将一些 offset 和 length 信息拷入 socket 缓冲区,几乎无消耗
  • 使用 DMA 将 内核缓冲区的数据写入网卡,不会使用 CPU

整个过程仅只发生了1次用户态与内核态的切换,数据拷贝了 2 次

4.4 异步IO

AIO 用来解决数据复制阶段的阻塞问题

  • 同步意味着,在进行读写操作时,线程需要等待结果,还是相当于闲置
  • 异步意味着,在进行读写操作时,线程不必等待结果,而是将来由操作系统来通过回调方式由另外的线程来获得结果

异步模型需要底层操作系统(Kernel)提供支持

  • Windows 系统通过 IOCP 实现了真正的异步 IO

  • Linux 系统异步 IO 在 2.6 版本引入,但其底层实现还是用多路复用模拟了异步 IO,性能没有优势

Netty框架

1 概述

1.1 什么是netty

1
Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.

Netty 是一个异步的、基于事件驱动网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端

注意netty的异步还是基于多路复用的,并没有实现真正意义上的异步IO

1.2 netty的优势

如果使用传统NIO,其工作量大,bug 多

  • 需要自己构建协议
  • 解决 TCP 传输问题,如粘包、半包
  • 因为bug的存在,epoll 空轮询导致 CPU 100%

Netty 对 API 进行增强,使之更易用,如

  • FastThreadLocal => ThreadLocal
  • ByteBuf => ByteBuffer

2 Hello World

2.1 目标

开发一个简单的服务器端和客户端

  • 客户端向服务器发送hello world
  • 服务器仅接收,不返回

添加依赖

1
2
3
4
5
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.51.Final</version>
</dependency>

2.2 服务器端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class HelloServer {
public static void main(String[] args) {
// 1.启动器,负责组装netty组件,启动服务器
new ServerBootstrap()
// 2.BossEventLoop WorkerEventLoop(selector, thread)
// group 组 创建 NioEventLoopGroup,可以简单理解为 线程池 + Selector
.group(new NioEventLoopGroup())
// 3.选择服务器的 ServerSocketChannel 实现
.channel(NioServerSocketChannel.class)
// 4.child 负责处理读写,该方法决定了 child 执行哪些操作
// ChannelInitializer 处理器(仅执行一次)
// 它的作用是待客户端SocketChannel建立连接后,执行initChannel以便添加更多的处理器
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// 5.SocketChannel的处理器,使用StringDecoder解码,ByteBuf=>String
ch.pipeline().addLast(new StringDecoder());
// 6.SocketChannel的业务处理,使用上一个处理器的处理结果
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){ // 自定义handler
@Override // 读事件
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 打印上一步转换好的字符串
System.out.println(msg);
}
});
}
}).bind(8080); // 7.ServerSocketChannel绑定8080端口
}
}

2.3 客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class HelloClient {
public static void main(String[] args) throws InterruptedException {
// 1.创建启动器类
new Bootstrap()
// 2.添加EventLoop
.group(new NioEventLoopGroup())
// 3.选择客户端channel实现
// 选择客户 Socket 实现类,NioSocketChannel 表示基于 NIO 的客户端实现
.channel(NioSocketChannel.class)
// 4.添加处理器
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override // 在连接建立后被调用
protected void initChannel(NioSocketChannel ch) throws Exception {
// 消息会经过通道 handler 处理,这里是将 String => ByteBuf 编码发出
ch.pipeline().addLast(new StringEncoder()); // 将字符串转换为buffer
}
})
// 5.连接服务器
.connect(new InetSocketAddress("localhost", 8080))
// Netty 中很多方法都是异步的,如 connect
// 这时需要使用 sync 方法等待 connect 建立连接完毕
.sync()
// 获取 channel 对象,它即为通道抽象,可以进行数据读写操作
.channel()
// 6.向服务器发送数据
// 写入消息并清空缓冲区
.writeAndFlush("Hello World!");
}
}

执行结果

image-20210923174903912

2.4 流程分析

image-20210923175056387

组件解释

  • channel 可以理解为数据的通道
  • msg 理解为流动的数据,最开始输入是 ByteBuf,但经过 pipeline 中的各个 handler 加工,会变成其它类型对象,最后输出又变成 ByteBuf
  • handler 可以理解为数据的处理工序
    • 工序有多道,合在一起就是 pipeline(传递途径),pipeline 负责发布事件(读、读取完成…)传播给每个 handler, handler 对自己感兴趣的事件进行处理(重写了相应事件处理方法)
      • pipeline 中有多个 handler,处理时会依次调用其中的 handler
    • handler 分 Inbound 和 Outbound 两类
      • Inbound 入站
      • Outbound 出站
  • eventLoop 可以理解为处理数据的工人
    • eventLoop 可以管理多个 channel 的 io 操作,并且一旦 eventLoop 负责了某个 channel,就会将其与channel进行绑定,以后该 channel 中的 io 操作都由该 eventLoop 负责
    • eventLoop 既可以执行 io 操作,也可以进行任务处理,每个 eventLoop 有自己的任务队列,队列里可以堆放多个 channel 的待处理任务,任务分为普通任务、定时任务
    • eventLoop 按照 pipeline 顺序,依次按照 handler 的规划(代码)处理数据,可以为每个 handler 指定不同的 eventLoop

3 组件

3.1 EventLoop

3.1.1 概念

事件循环对象 EventLoop

EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理一个或多个 Channel 上源源不断的 io 事件

它的继承关系如下

  • 继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法
  • 继承自 netty 自己的 OrderedEventExecutor
    • 提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop
    • 提供了 EventLoopGroup parent() 方法来看看自己属于哪个 EventLoopGroup

事件循环组 EventLoopGroup

EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)

  • 继承自 netty 自己的 EventExecutorGroup
    • 实现了 Iterable 接口提供遍历 EventLoop 的能力
    • 另有 next 方法获取集合中下一个 EventLoop
3.1.2 处理普通任务和定时任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class TestEventLoop {
public static void main(String[] args) {
// 创建拥有两个EventLoop的NioEventLoopGroup,对应两个线程
EventLoopGroup group = new NioEventLoopGroup(2);
// 通过next方法可以获得下一个 EventLoop
System.out.println(group.next());
System.out.println(group.next());

// 通过EventLoop执行普通任务
group.next().execute(()->{
System.out.println(Thread.currentThread().getName() + " hello");
});

// 通过EventLoop执行定时任务
// 每隔一秒输出
group.next().scheduleAtFixedRate(()->{
System.out.println(Thread.currentThread().getName() + " hello2");
}, 0, 1, TimeUnit.SECONDS);

// 优雅地关闭
group.shutdownGracefully();
}
}

输出结果:

1
2
3
4
5
6
io.netty.channel.nio.NioEventLoop@7bb11784
io.netty.channel.nio.NioEventLoop@33a10788
nioEventLoopGroup-2-1 hello
nioEventLoopGroup-2-2 hello2
nioEventLoopGroup-2-2 hello2
nioEventLoopGroup-2-2 hello2

关闭 EventLoopGroup

优雅关闭 shutdownGracefully 方法。该方法会首先切换 EventLoopGroup 到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行。从而确保整体应用是在正常有序的状态下退出的。

3.1.3 处理IO任务

服务器端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Slf4j
public class EventLoopServer {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override // ByteBuf
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.debug(buf.toString(Charset.defaultCharset()));
}
});
}
})
.bind(8080);
}
}

客户端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class EventLoopClient {
public static void main(String[] args) throws InterruptedException {
// 1.创建启动器类
Channel channel = new Bootstrap()
// 2.添加EventLoop
.group(new NioEventLoopGroup())
// 3.选择客户端channel实现
.channel(NioSocketChannel.class)
// 4.添加处理器
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override // 在连接建立后被调用
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder()); // 将字符串转换为buffer
}
})
// 5.连接服务器
.connect(new InetSocketAddress("localhost", 8080))
.sync()
.channel();
System.out.println(channel);
// 此处打断点调试,调用 channel.writeAndFlush(...);
System.out.println("");
}
}

执行结果

1
2
3
4
5
6
7
17:29:24.372 [nioEventLoopGroup-2-3] DEBUG com.hongyi.netty.c3.EventLoopServer - Hello World
17:29:36.079 [nioEventLoopGroup-2-3] DEBUG com.hongyi.netty.c3.EventLoopServer - Hello World
17:29:36.709 [nioEventLoopGroup-2-3] DEBUG com.hongyi.netty.c3.EventLoopServer - Hello World
17:29:37.249 [nioEventLoopGroup-2-3] DEBUG com.hongyi.netty.c3.EventLoopServer - Hello World
17:29:49.474 [nioEventLoopGroup-2-3] DEBUG com.hongyi.netty.c3.EventLoopServer - Hello World
17:29:49.865 [nioEventLoopGroup-2-3] DEBUG com.hongyi.netty.c3.EventLoopServer - Hello World
17:29:50.388 [nioEventLoopGroup-2-3] DEBUG com.hongyi.netty.c3.EventLoopServer - Hello World
3.1.4 分工

Bootstrap的group()方法可以传入两个EventLoopGroup参数,分别负责处理不同的事件

1
2
3
4
5
6
7
8
9
public class MyServer {
public static void main(String[] args) {
new ServerBootstrap()
// 两个Group,分别为Boss 只负责ServerSocketChannel上的Accept事件,Worker 只负责socketChannel上的读写事件
.group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))

...
}
}

多个客户端分别发送 hello 结果

1
2
3
4
5
nioEventLoopGroup-3-1 hello1
nioEventLoopGroup-3-2 hello2
nioEventLoopGroup-3-1 hello3
nioEventLoopGroup-3-2 hello4
nioEventLoopGroup-3-2 hello4

可以看出,一个EventLoop可以负责多个Channel,且EventLoop一旦与Channel绑定,则一直负责处理该Channel中的事件。

image-20211007173616345

增加自定义EventLoopGroup

当有的任务需要较长的时间处理时,可以使用非NioEventLoopGroup,避免同一个NioEventLoop中的其他Channel在较长的时间内都无法得到处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Slf4j
public class EventLoopServer {
public static void main(String[] args) {
// 细分2:创建一个独立的EventLoopGroup,增加自定义的非NioEventLoopGroup
EventLoopGroup group = new DefaultEventLoopGroup();
new ServerBootstrap()
// boss 和 worker
// 第一个是boss,负责accept事件 第二个是worker,有两个线程,负责io事件
.group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// 增加两个handler,第一个使用NioEventLoopGroup处理,第二个使用自定义EventLoopGroup处理
ch.pipeline().addLast("handler1", new ChannelInboundHandlerAdapter(){
@Override // ByteBuf
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.debug(buf.toString(Charset.defaultCharset()));
// 将消息传递给下一个handler
ctx.fireChannelRead(msg);
}
}).addLast(group, "handler2", new ChannelInboundHandlerAdapter(){ // 该handler绑定自定义的Group
@Override // ByteBuf
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.debug(buf.toString(Charset.defaultCharset()));
}
});
}
})
.bind(8080);
}
}

启动四个客户端发送数据:

1
2
3
4
5
6
7
8
nioEventLoopGroup-4-1 hello1
defaultEventLoopGroup-2-1 hello1
nioEventLoopGroup-4-2 hello2
defaultEventLoopGroup-2-2 hello2
nioEventLoopGroup-4-1 hello3
defaultEventLoopGroup-2-3 hello3
nioEventLoopGroup-4-2 hello4
defaultEventLoopGroup-2-4 hello4

可以看出,客户端与服务器之间的事件,被nioEventLoopGroup和defaultEventLoopGroup分别处理:

image-20211102160042239

切换的实现

不同的EventLoopGroup切换的实现原理如下

由上面的图可以看出,当handler中绑定的Group不同时,需要切换Group来执行不同的任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
// 获得下一个handler的EventLoop, excutor 即为 EventLoopGroup
EventExecutor executor = next.executor();

// 如果下一个EventLoop 在当前的 EventLoopGroup中
if (executor.inEventLoop()) {
// 使用当前 EventLoopGroup 中的 EventLoop 来处理任务
next.invokeChannelRead(m);
} else {
// 否则让另一个 EventLoopGroup 中的 EventLoop 来创建任务并执行
executor.execute(new Runnable() {
public void run() {
next.invokeChannelRead(m);
}
});
}
}
  • 如果两个 handler 绑定的是同一个EventLoopGroup,那么就直接调用
  • 否则,把要调用的代码封装为一个任务对象,由下一个 handler 的 EventLoopGroup 来调用

3.2 Channel

Channel 的常用方法

  • close() 可以用来关闭Channel
  • closeFuture() 用来处理 Channel 的关闭
    • sync 方法作用是同步等待 Channel 关闭
    • 而 addListener 方法是异步等待 Channel 关闭
  • pipeline() 方法用于添加处理器
  • write() 方法将数据写入
    • 因为缓冲机制,数据被写入到 Channel 中以后,不会立即被发送
    • 只有当缓冲满了或者调用了flush()方法后,才会将数据通过 Channel 发送出去
  • writeAndFlush() 方法将数据写入并立即发送(刷出)
3.2.1 ChannelFuture

连接问题

拆分客户端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class MyClient {
public static void main(String[] args) throws IOException, InterruptedException {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new StringEncoder());
}
})
// 该方法为异步非阻塞方法,主线程mian调用后不会被阻塞,真正去执行连接操作的是NIO线程
// NIO线程:NioEventLoop 中的线程
.connect(new InetSocketAddress("localhost", 8080)); // 连接建立需要时间

// 该方法用于等待连接真正起建立
channelFuture.sync();

// 获取客户端-服务器之间的Channel对象
Channel channel = channelFuture.channel();
channel.writeAndFlush("hello world");
System.in.read();
}
}

如果我们去掉channelFuture.sync()方法,会服务器无法收到hello world

这是因为建立连接(connect)的过程是异步非阻塞的,若不通过sync()方法阻塞主线程,等待连接真正建立,这时通过 channelFuture.channel() 拿到的 Channel 对象,并不是真正与服务器建立好连接的 Channel,也就没法将信息正确的传输给服务器端。

所以需要通过channelFuture.sync()方法,阻塞主线程,同步处理结果,等待连接真正建立好以后,再去获得 Channel 传递数据。使用该方法,获取 Channel 和发送数据的线程都是主线程

下面还有一种方法,用于异步获取建立连接后的 Channel 和发送数据,使得执行这些操作的线程是 NIO 线程(去执行connect操作的线程)。

addListener方法

通过这种方法可以在NIO线程中获取 Channel 并发送数据,而不是在主线程中执行这些操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class EventLoopClient {
public static void main(String[] args) throws InterruptedException {
// 2.带有Future,Promise的类型都是和异步方法配套使用,用来处理结果
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override // 在连接建立后被调用
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
})
// 1.连接到服务器
.connect(new InetSocketAddress("localhost", 8080));
// 2.1 方法一:使用sync方法同步处理结果
// channelFuture.sync(); // 阻塞住当前线程(主线程main),直到nio线程连接建立完毕
// Channel channel = channelFuture.channel();
// // 2.向服务器发送数据
// channel.writeAndFlush("Hello World");

// 2.2 方法二:使用 addListener(回调对象) 方法异步处理结果
channelFuture.addListener(new ChannelFutureListener() {
@Override
// 在nio线程建立完连接后,nio会调用此方法
// 即执行下面代码的线程不是主线程,而是nio线程
public void operationComplete(ChannelFuture future) throws Exception {
Channel channel = future.channel();
channel.writeAndFlush("Hello World");
}
});
}
}

处理关闭

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class ReadClient {
public static void main(String[] args) throws InterruptedException {
// 创建EventLoopGroup,使用完毕后关闭
NioEventLoopGroup group = new NioEventLoopGroup();

ChannelFuture channelFuture = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080));
channelFuture.sync();

Channel channel = channelFuture.channel();
Scanner scanner = new Scanner(System.in);

// 创建一个线程用于输入并向服务器发送
new Thread(()->{
while (true) {
String msg = scanner.next();
if ("q".equals(msg)) {
// 关闭操作是异步的,在NIO线程中执行
channel.close();
break;
}
channel.writeAndFlush(msg);
}
}, "inputThread").start();

// 获得closeFuture对象
ChannelFuture closeFuture = channel.closeFuture();
System.out.println("waiting close...");

// 同步等待NIO线程执行完close操作
closeFuture.sync();

// 关闭之后执行一些操作,可以保证执行的操作一定是在channel关闭以后执行的
System.out.println("关闭之后执行一些额外操作...");

// 关闭EventLoopGroup
group.shutdownGracefully();
}
}

关闭channel

当我们要关闭channel时,可以调用channel.close()方法进行关闭。但是该方法也是一个异步方法。真正的关闭操作并不是在调用该方法的线程中执行的,而是在NIO线程中执行真正的关闭操作

如果我们想在channel真正关闭以后,执行一些额外的操作,可以选择以下两种方法来实现

  • 通过channel.closeFuture()方法获得对应的ChannelFuture对象,然后调用sync()方法阻塞执行操作的线程,等待channel真正关闭后,再执行其他操作
1
2
3
4
5
// 获得closeFuture对象
ChannelFuture closeFuture = channel.closeFuture();

// 同步等待NIO线程执行完close操作
closeFuture.sync();
  • 调用closeFuture.addListener方法,添加close的后续操作
1
2
3
4
5
6
7
8
9
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
// 等待channel关闭后才执行的操作
System.out.println("关闭之后执行一些额外操作...");
// 关闭EventLoopGroup
group.shutdownGracefully();
}
});

3.3 Future & Promise

3.3.1 概念

netty 中的 Future 与 jdk 中的 Future 同名,但是是两个接口

netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展

  • jdk Future 只能同步等待任务结束(或成功、或失败)才能得到结果
  • netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束
  • netty Promise 不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
功能/名称 jdk Future netty Future Promise
cancel 取消任务 - -
isCanceled 任务是否取消 - -
isDone 任务是否完成,不能区分成功失败 - -
get 获取任务结果,阻塞等待 - -
getNow - 获取任务结果,非阻塞,还未产生结果时返回 null -
await - 等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断 -
sync - 等待任务结束,如果任务失败,抛出异常 -
isSuccess - 判断任务是否成功 -
cause - 获取失败信息,非阻塞,如果没有失败,返回null -
addLinstener - 添加回调,异步接收结果 -
setSuccess - - 设置成功结果
setFailure - - 设置失败结果
3.3.2 JDK Future
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Slf4j
public class TestJdkFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 1.创建线程池
ExecutorService service = Executors.newFixedThreadPool(2);
// 2.提交任务
Future<Integer> future = service.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
log.debug("执行计算");
// 模拟计算了1s
Thread.sleep(1000);
// 模拟返回结果50
return 50;
}
});
// 3.主线程通过future获取结果
log.debug("等待结果");

log.debug("结果是 {}", future.get());
}
}

执行结果:

1
2
3
16:36:26.169 [pool-1-thread-1] DEBUG com.hongyi.netty.c4.TestJdkFuture - 执行计算
16:36:26.169 [main] DEBUG com.hongyi.netty.c4.TestJdkFuture - 等待结果
16:36:27.171 [main] DEBUG com.hongyi.netty.c4.TestJdkFuture - 结果是 50

可以看出,[pool-1-thread-1]线程(线程池中的两个线程之一)执行了任务,并将结果返回给主线程main,主线程main在get方法阻塞等待该结果的返回并打印。

3.3.3 Netty Future
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Slf4j
public class TestNettyFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
// 获得 EventLoop 对象
EventLoop eventLoop = group.next();
Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
log.debug("执行计算");
// 模拟计算了1s
Thread.sleep(1000);
// 模拟返回结果50
return 50;
}
});

// 主线程中获取结果 同步方式
log.debug("等待结果");
log.debug("结果是 {}", future.get());

// NIO线程中异步获取结果
future.addListener(new GenericFutureListener<Future<? super Integer>>() {
@Override
public void operationComplete(Future<? super Integer> future) throws Exception {
log.debug("等待结果");
log.debug("结果是 {}", future.getNow());
}
});
}
}

执行结果:

1
2
3
4
5
16:50:28.326 [main] DEBUG com.hongyi.netty.c4.TestNettyFuture - 等待结果
16:50:28.327 [nioEventLoopGroup-2-1] DEBUG com.hongyi.netty.c4.TestNettyFuture - 执行计算
16:50:29.328 [main] DEBUG com.hongyi.netty.c4.TestNettyFuture - 结果是 50
16:50:29.329 [nioEventLoopGroup-2-1] DEBUG com.hongyi.netty.c4.TestNettyFuture - 等待结果
16:50:29.329 [nioEventLoopGroup-2-1] DEBUG com.hongyi.netty.c4.TestNettyFuture - 结果是 50

可以看出,第二种异步方法中(future.addListener),是nio线程去接收结果,而不是主线程。

Netty中的Future对象,可以通过EventLoop的sumbit()方法得到

  • 可以通过Future对象的get方法,阻塞地获取返回结果
  • 也可以通过getNow方法,获取结果,若还没有结果,则返回null,该方法是非阻塞的
  • 还可以通过future.addListener方法,在Callable方法执行的线程中,异步获取返回结果
3.3.4 Netty Promise

Promise相当于一个容器,可以用于存放各个线程中的结果,然后让其他线程去获取该结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Slf4j
public class TestNettyPromise {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 1.准备EventLoop对象
EventLoop eventLoop = new NioEventLoopGroup().next();

// 2.可以主动创建promise对象
DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);

new Thread(() -> {
// 任意一个线程执行计算,计算完毕后向promise填充结果
log.debug("开始计算");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 返回计算结果
promise.setSuccess(80);
}).start();

// 4.接收结果的线程,此时为主线程
log.debug("等待结果");
log.debug("结果为: {}", promise.get());

// 4.也可使用addListener方法异步获得结果
// code...
}
}

执行结果:

1
2
3
17:03:04.616 [main] DEBUG com.hongyi.netty.c4.TestNettyPromise - 等待结果
17:03:04.616 [Thread-0] DEBUG com.hongyi.netty.c4.TestNettyPromise - 开始计算
17:03:05.618 [main] DEBUG com.hongyi.netty.c4.TestNettyPromise - 结果为: 80

3.4 Handler & Pipeline

3.4.1 Pipeline

ChannelHandler用来处理Channel上的各种事件,分为入站和出站两种。所有ChannelHandler被连成一串,就是Pipeline。

  • 入站处理器通常是ChannelInboundHandlerAdapter的子类,主要用来读取客户端数据,写回结果
  • 出站处理器通常是ChannelOutboundHandlerAdapter的子类,主要对写回结果进行加工

打个比喻,每个Channel是一个产品的加工车间,Pipeline是车间中的流水线,ChannelHandler就是流水线上的各道工序,而ByteBuf是原材料,经过很多道工序的加工:先经过一道道入站工序,再经过一道道出站工序,就变成最终的产品。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
@Slf4j
public class TestPipeline {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// 1.通过channel拿到pipeline
ChannelPipeline pipeline = ch.pipeline();
// 2.添加处理器
// 注意:netty会自动在流水线两端添加head和tail两个处理器
// 这里添加的处理器实质上式添加在head和tail之间
// 最终形式为:head <-> h1 <-> h2 <-> h3 <-> ... <-> tail 底层为双向链表
pipeline.addLast("h1", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("1");
// 父类该方法内部会调用fireChannelRead
// 将数据传递给下一个handler
super.channelRead(ctx, msg);
}
});
pipeline.addLast("h2", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("2");
super.channelRead(ctx, msg);
}
});
pipeline.addLast("h3", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("3");
super.channelRead(ctx, msg);
}
});
pipeline.addLast("h4", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("4");
super.channelRead(ctx, msg);
ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));// 触发出站处理器
}
});
// Outbound主要处理出站操作,一般为写操作,发生出站操作时会触发Outbound方法
// 出站时,handler的调用是从tail向前调用的
pipeline.addLast("h5", new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("5");
super.write(ctx, msg, promise);
}
});
pipeline.addLast("h6", new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("6");
super.write(ctx, msg, promise);
}
});
}
})
.bind(8080);
}
}

执行结果:

1
2
3
4
5
6
10:36:02.629 [nioEventLoopGroup-2-2] DEBUG com.hongyi.netty.c4.TestPipeline - 1
10:36:02.629 [nioEventLoopGroup-2-2] DEBUG com.hongyi.netty.c4.TestPipeline - 2
10:36:02.629 [nioEventLoopGroup-2-2] DEBUG com.hongyi.netty.c4.TestPipeline - 3
10:36:02.629 [nioEventLoopGroup-2-2] DEBUG com.hongyi.netty.c4.TestPipeline - 4
10:36:02.634 [nioEventLoopGroup-2-2] DEBUG com.hongyi.netty.c4.TestPipeline - 6
10:36:02.634 [nioEventLoopGroup-2-2] DEBUG com.hongyi.netty.c4.TestPipeline - 5

注意出站顺序和入站顺序是相反的。

通过channel.pipeline().addLast(name, handler)添加handler时,记得给handler取名字。这样可以调用pipeline的addAfter、addBefore等方法更灵活地向pipeline中添加handler

handler需要放入通道的pipeline中,才能根据放入顺序来使用handler

  • pipeline是结构是一个带有head与tail指针的双向链表,其中的节点为handler
    • 要通过ctx.fireChannelRead(msg)等方法,将当前handler的处理结果传递给下一个handler
  • 当有入站(Inbound)操作时,会从head开始向后调用handler,直到handler不是处理Inbound操作为止
  • 当有出站(Outbound)操作时,会从tail开始向前调用handler,直到handler不是处理Outbound操作为止

具体结构如下

image-20211104104329111

调用顺序如下

image-20211104104348298

3.4.2 OutboundHandler

socketChannel.writeAndFlush()

当handler中调用该方法进行写操作时,会触发Outbound操作,此时是从tail向前寻找OutboundHandler

1
2
3
4
5
6
7
8
9
// 代码片段
pipeline.addLast("h2", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("2");
super.channelRead(ctx, msg);
ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
}
});

image-20211104105351486

ctx.writeAndFlush()

当handler中调用该方法进行写操作时,会触发Outbound操作,此时是从当前handler向前寻找OutboundHandler

1
2
3
4
5
6
7
8
9
// 代码片段
pipeline.addLast("h2", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("2");
super.channelRead(ctx, msg);
ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
}
});

image-20211104105629620

3.4.3 EmbeddedChannel

EmbeddedChannel可以用于测试各个handler,通过其构造函数按顺序传入需要测试handler,然后调用对应的Inbound和Outbound方法即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class TestEmbeddedChannel {
public static void main(String[] args) {
ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("1");
super.channelRead(ctx, msg);
}
};

ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("2");
super.channelRead(ctx, msg);
}
};

ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("3");
super.write(ctx, msg, promise);
}
};

ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("4");
super.write(ctx, msg, promise);
}
};

// 用于测试Handler的Channel
EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);

// 执行Inbound操作
channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)));
// 执行Outbound操作
channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)));
}
}

3.5 ByteBuf

3.5.1 创建

调试代码

1
2
3
4
5
6
7
8
9
10
11
private static void log(ByteBuf buffer) {
int length = buffer.readableBytes();
int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
StringBuilder buf = new StringBuilder(rows * 80 * 2)
.append("read index:").append(buffer.readerIndex())
.append(" write index:").append(buffer.writerIndex())
.append(" capacity:").append(buffer.capacity())
.append(NEWLINE);
appendPrettyHexDump(buf, buffer);
System.out.println(buf.toString());
}

该方法可以帮助我们更为详细地查看ByteBuf中的内容

创建

1
2
3
4
5
6
7
8
9
10
11
12
13
public class TestByteBuf {
public static void main(String[] args) {
// 自动扩容
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
log(buf);
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 20; i++) {
sb.append("a");
}
buf.writeBytes(sb.toString().getBytes());
log(buf);
}
}
1
2
3
4
5
6
7
8
9
read index:0 write index:0 capacity:16

read index:0 write index:20 capacity:64
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
|00000010| 61 61 61 61 |aaaa |
+--------+-------------------------------------------------+----------------+

ByteBuf通过ByteBufAllocator选择allocator并调用对应的buffer()方法来创建的,默认使用直接内存作为ByteBuf,容量为256个字节,可以指定初始容量的大小

当ByteBuf的容量无法容纳所有数据时,ByteBuf会进行扩容操作

如果在handler中创建ByteBuf,建议使用ChannelHandlerContext ctx.alloc().buffer()来创建

3.5.2 直接内存与堆内存

通过该方法创建的ByteBuf,使用的是基于直接内存的ByteBuf

1
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);

可以使用下面的代码来创建池化基于堆的 ByteBuf

1
ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(16);

也可以使用下面的代码来创建池化基于直接内存的 ByteBuf

1
ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(16);
  • 直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用
  • 直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放

验证

1
2
3
4
5
6
7
8
9
10
11
12
public class ByteBufStudy {
public static void main(String[] args) {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);
System.out.println(buffer.getClass());

buffer = ByteBufAllocator.DEFAULT.heapBuffer(16);
System.out.println(buffer.getClass());

buffer = ByteBufAllocator.DEFAULT.directBuffer(16);
System.out.println(buffer.getClass());
}
}
1
2
3
4
5
6
7
8
// 使用池化的直接内存
class io.netty.buffer.PooledUnsafeDirectByteBuf

// 使用池化的堆内存
class io.netty.buffer.PooledUnsafeHeapByteBuf

// 使用池化的直接内存
class io.netty.buffer.PooledUnsafeDirectByteBuf
3.5.3 池化和非池化

池化的最大意义在于可以重用 ByteBuf,优点有

  • 没有池化,则每次都得创建新的 ByteBuf 实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加 GC 压力
  • 有了池化,则可以重用池中 ByteBuf 实例,并且采用了与 jemalloc 类似的内存分配算法提升分配效率
  • 高并发时,池化功能更节约内存,减少内存溢出的可能

池化功能是否开启,可以通过下面的系统环境变量来设置

1
-Dio.netty.allocator.type={unpooled|pooled}Copy
  • 4.1 以后,非 Android 平台默认启用池化实现,Android 平台启用非池化实现
  • 4.1 之前,池化功能还不成熟,默认是非池化实现
3.5.4 组成

ByteBuf主要有以下几个组成部分

  • 最大容量与当前容量

    • 在构造ByteBuf时,可传入两个参数,分别代表初始容量和最大容量,若未传入第二个参数(最大容量),最大容量默认为Integer.MAX_VALUE
    • 当ByteBuf容量无法容纳所有数据时,会进行扩容操作,若超出最大容量,会抛出java.lang.IndexOutOfBoundsException异常
  • 读写操作不同于ByteBuffer只用position进行控制,

    ByteBuf分别由读指针和写指针两个指针控制。进行读写操作时,无需进行模式的切换

    • 读指针前的部分被称为废弃部分,是已经读过的内容
    • 读指针与写指针之间的空间称为可读部分
    • 写指针与当前容量之间的空间称为可写部分

image-20211105102937038

3.5.5 写入
方法签名 含义 备注
writeBoolean(boolean value) 写入 boolean 值 **用一字节 01\ 00 代表 true\ false**
writeByte(int value) 写入 byte 值
writeShort(int value) 写入 short 值
writeInt(int value) 写入 int 值 Big Endian(大端写入),即 0x250,写入后 00 00 02 50
writeIntLE(int value) 写入 int 值 Little Endian(小端写入),即 0x250,写入后 50 02 00 00
writeLong(long value) 写入 long 值
writeChar(int value) 写入 char 值
writeFloat(float value) 写入 float 值
writeDouble(double value) 写入 double 值
writeBytes(ByteBuf src) 写入 netty 的 ByteBuf
writeBytes(byte[] src) 写入 byte[]
writeBytes(ByteBuffer src) 写入 nio 的 ByteBuffer
int writeCharSequence(CharSequence sequence, Charset charset) 写入字符串 CharSequence为字符串类的父类,第二个参数为对应的字符集

注意

  • 这些方法的未指明返回值的,其返回值都是 ByteBuf,意味着可以链式调用来写入不同的数据
  • 网络传输中,默认习惯是 Big Endian,使用 writeInt(int value)

使用方法示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class ByteBufStudy {
public static void main(String[] args) {
// 创建ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);
ByteBufUtil.log(buffer);

// 向buffer中写入数据
buffer.writeBytes(new byte[]{1, 2, 3, 4});
ByteBufUtil.log(buffer);

buffer.writeInt(5);
ByteBufUtil.log(buffer);

buffer.writeIntLE(6);
ByteBufUtil.log(buffer);

buffer.writeLong(7);
ByteBufUtil.log(buffer);
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
read index:0 write index:0 capacity:16

read index:0 write index:4 capacity:16
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 |.... |
+--------+-------------------------------------------------+----------------+

read index:0 write index:8 capacity:16
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 |........ |
+--------+-------------------------------------------------+----------------+

read index:0 write index:12 capacity:16
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 06 00 00 00 |............ |
+--------+-------------------------------------------------+----------------+

read index:0 write index:20 capacity:20
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 06 00 00 00 00 00 00 00 |................|
|00000010| 00 00 00 07 |.... |
+--------+-------------------------------------------------+----------------+
3.5.6 扩容

当ByteBuf中的容量无法容纳写入的数据时,会进行扩容操作

1
2
buffer.writeLong(7);
ByteBufUtil.log(buffer);
1
2
3
4
5
6
7
8
9
10
11
// 扩容前
read index:0 write index:12 capacity:16
...
// 扩容后
read index:0 write index:20 capacity:20
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 06 00 00 00 00 00 00 00 |................|
|00000010| 00 00 00 07 |.... |
+--------+-------------------------------------------------+----------------+

扩容规则

  • 如何写入后数据大小未超过 512 字节,则选择下一个 16 的整数倍进行扩容
    • 例如写入后大小为 12 字节,则扩容后 capacity 是 16 字节
  • 如果写入后数据大小超过 512 字节,则选择下一个 2^n^
    • 例如写入后大小为 513 字节,则扩容后 capacity 是 2^10^=1024 字节(2^9^=512 已经不够了)
  • 扩容不能超过 maxCapacity,否则会抛出java.lang.IndexOutOfBoundsException异常
1
2
Exception in thread "main" java.lang.IndexOutOfBoundsException: writerIndex(20) + minWritableBytes(8) exceeds maxCapacity(20): PooledUnsafeDirectByteBuf(ridx: 0, widx: 20, cap: 20/20)
...
3.5.7 读取

读取主要是通过一系列read方法进行读取,读取时会根据读取数据的字节数移动读指针。

如果需要重复读取,需要调用buffer.markReaderIndex()对读指针进行标记,并通过buffer.resetReaderIndex()将读指针恢复到mark标记的位置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class ByteBufStudy {
public static void main(String[] args) {
// 创建ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);

// 向buffer中写入数据
buffer.writeBytes(new byte[]{1, 2, 3, 4});
buffer.writeInt(5);

// 读取4个字节
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
ByteBufUtil.log(buffer);

// 通过mark与reset实现重复读取
buffer.markReaderIndex();
System.out.println(buffer.readInt());
ByteBufUtil.log(buffer);

// 恢复到mark标记处
buffer.resetReaderIndex();
ByteBufUtil.log(buffer);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
1
2
3
4
read index:4 write index:8 capacity:16
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 05 |.... |
+--------+-------------------------------------------------+----------------+
5
read index:8 write index:8 capacity:16

read index:4 write index:8 capacity:16
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 05 |.... |
+--------+-------------------------------------------------+----------------+
3.5.8 释放

由于 Netty 中有堆外内存(直接内存)的 ByteBuf 实现,堆外内存最好是手动来释放,而不是等 GC 垃圾回收。

  • UnpooledHeapByteBuf 使用的是 JVM 内存,只需等 GC 回收内存即可
  • UnpooledDirectByteBuf 使用的就是直接内存了,需要特殊的方法来回收内存
  • PooledByteBuf 和它的子类使用了池化机制,需要更复杂的规则来回收内存

Netty 这里采用了引用计数法来控制回收内存,每个 ByteBuf 都实现了 ReferenceCounted 接口

  • 每个 ByteBuf 对象的初始计数为 1
  • 调用 release 方法计数减 1,如果计数为 0,ByteBuf 内存被回收
  • 调用 retain 方法计数加 1,表示调用者没用完之前,其它 handler 即使调用了 release 也不会造成回收
  • 当计数为 0 时,底层内存会被回收,这时即使 ByteBuf 对象还在,其各个方法均无法正常使用

释放规则

因为 pipeline 的存在,一般需要将 ByteBuf 传递给下一个 ChannelHandler,如果在每个 ChannelHandler 中都去调用 release ,就失去了传递性(如果在这个 ChannelHandler 内这个 ByteBuf 已完成了它的使命,那么便无须再传递)

基本规则是,谁是最后使用者,谁负责 release

  • 起点,对于 NIO 实现来讲,在 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe.read 方法中首次创建 ByteBuf 放入 pipeline(line 163 pipeline.fireChannelRead(byteBuf))

  • 入站 ByteBuf 处理原则

    • 对原始 ByteBuf 不做处理,调用 ctx.fireChannelRead(msg) 向后传递,这时无须 release
    • 将原始 ByteBuf 转换为其它类型的 Java 对象,这时 ByteBuf 就没用了,必须 release
    • 如果不调用 ctx.fireChannelRead(msg) 向后传递,那么也必须 release
    • 注意各种异常,如果 ByteBuf 没有成功传递到下一个 ChannelHandler,必须 release
    • 假设消息一直向后传,那么 TailContext 会负责释放未处理消息(原始的 ByteBuf)
  • 出站 ByteBuf 处理原则

    • 出站消息最终都会转为 ByteBuf 输出,一直向前传,由 HeadContext flush 后 release
  • 异常处理原则

    • 有时候不清楚 ByteBuf 被引用了多少次,但又必须彻底释放,可以循环调用 release 直到返回 true

      1
      while (!buffer.release()) {}

当ByteBuf被传到了pipeline的head与tail时,ByteBuf会被其中的方法彻底释放,但前提是ByteBuf被传递到了head与tail中

TailConext中释放ByteBuf的源码

1
2
3
4
5
6
7
8
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. Please check your pipeline configuration.", msg);
} finally {
// 具体的释放方法
ReferenceCountUtil.release(msg);
}
}

判断传过来的是否为ByteBuf,是的话才需要释放:

1
2
3
public static boolean release(Object msg) {
return msg instanceof ReferenceCounted ? ((ReferenceCounted)msg).release() : false;
}
3.5.9 切片

ByteBuf切片是【零拷贝】的体现之一,对原始 ByteBuf 进行切片成多个 ByteBuf,切片后的 ByteBuf 并没有发生内存复制,还是使用原始 ByteBuf 的内存,切片后的 ByteBuf 维护独立的 read,write 指针

得到分片后的buffer后,要调用其retain方法,使其内部的引用计数加一。避免原ByteBuf释放,导致切片buffer无法使用

修改原ByteBuf中的值,也会影响切片后得到的ByteBuf

image-20211105105517913

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class TestSlice {
public static void main(String[] args) {
// 创建ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);

// 向buffer中写入数据
buffer.writeBytes(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10});

// 将buffer分成两部分
ByteBuf slice1 = buffer.slice(0, 5);
ByteBuf slice2 = buffer.slice(5, 5);

// 需要让分片的buffer引用计数加一
// 避免原Buffer释放导致分片buffer无法使用
slice1.retain();
slice2.retain();

ByteBufUtil.log(slice1);
ByteBufUtil.log(slice2);

// 更改原始buffer中的值
System.out.println("===========修改原buffer中的值===========");
buffer.setByte(0,5);

System.out.println("===========打印slice1===========");
ByteBufUtil.log(slice1);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
read index:0 write index:5 capacity:5
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 |..... |
+--------+-------------------------------------------------+----------------+
read index:0 write index:5 capacity:5
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 06 07 08 09 0a |..... |
+--------+-------------------------------------------------+----------------+
===========修改原buffer中的值===========
===========打印slice1===========
read index:0 write index:5 capacity:5
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 05 02 03 04 05 |..... |
+--------+-------------------------------------------------+----------------+
3.5.10 优势
  • 池化思想 - 可以重用池中 ByteBuf 实例,更节约内存,减少内存溢出的可能
  • 读写指针分离,不需要像 ByteBuffer 一样切换读写模式
  • 可以自动扩容
  • 支持链式调用,使用更流畅
  • 很多地方体现零拷贝,例如
    • slice、duplicate、CompositeByteBuf

4 应用

4.1 粘包与半包

4.2 协议设计与解析

4.3 在线聊天室