初识Netty

Netty 到底是何方神圣?

用一句简单的话来说就是:Netty 封装了 JDK 的 NIO,让你用得更爽,你不用再写一大堆复杂的代码了。

用官方正式的话来说就是:Netty 是一个异步事件驱动的网络应用框架,用于快速开发可维护的高性能服务器和客户端。

使用 Netty 不使用 JDK 原生 NIO 的原因

  • 使用 JDK 自带的 NIO 需要了解太多的概念,编程复杂,一不小心 bug 横飞
  • Netty 底层 IO 模型随意切换,而这一切只需要做微小的改动,改改参数,Netty 可以直接从 NIO 模型变身为 IO 模型
  • Netty 自带的拆包解包,异常检测等机制让你从 NIO 的繁重细节中脱离出来,让你只需要关心业务逻辑
  • Netty 解决了 JDK 的很多包括空轮询在内的 Bug
  • Netty 底层对线程,selector 做了很多细小的优化,精心设计的 reactor 线程模型做到非常高效的并发处理
  • 自带各种协议栈让你处理任何一种通用协议都几乎不用亲自动手
  • Netty 社区活跃,遇到问题随时邮件列表或者 issue
  • Netty 已经历各大 RPC 框架,消息中间件,分布式通信中间件线上的广泛验证,健壮性无比强大

强烈不建议直接基于 JDK 原生 NIO 来进行网络开发,原因:

  • JDK 的 NIO 编程需要了解很多的概念,编程复杂,对 NIO 入门非常不友好,编程模型不友好,ByteBuffer 的 Api 简直反人类
  • 对 NIO 编程来说,一个比较合适的线程模型能充分发挥它的优势,而 JDK 没有给你实现,你需要自己实现,就连简单的自定义协议拆包都要你自己实现
  • JDK 的 NIO 底层由 epoll 实现,该实现饱受诟病的空轮询 bug 会导致 cpu 飙升 100%
  • 项目庞大之后,自行实现的 NIO 很容易出现各类 bug,维护成本较高

服务端启动流程

总结:

  • Netty 服务端启动的流程,一句话来说就是:创建一个引导类,然后给他指定线程模型,IO 模型,连接读写处理逻辑,绑定端口之后,服务端就启动起来了。
  • bind 方法是异步的,我们可以通过这个异步机制来实现端口递增绑定。
  • Netty 服务端启动额外的参数,主要包括给服务端 Channel 或者客户端 Channel 设置属性值,设置底层 TCP 参数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class NettyServer {

public static void main(String[] args) {
ServerBootstrap serverBootstrap = new ServerBootstrap();

//服务端启动其他方法 handler
serverBootstrap.handler(new ChannelInitializer<NioServerSocketChannel>() {
@Override
protected void initChannel(NioServerSocketChannel ch) {
System.out.println("服务端启动中");
}
});

//attr() 方法
serverBootstrap.attr(AttributeKey.newInstance("serverName"), "nettyServer");

int port = 8000;
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
serverBootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
System.out.println(msg);
}
});
}
});
bind(serverBootstrap, port);

}

/**
* 自动绑定递增端口
*
* @param serverBootstrap
* @param port
*/
private static void bind(final ServerBootstrap serverBootstrap, final int port) {
serverBootstrap.bind(port).addListener(future -> {
if (future.isSuccess()) {
System.out.println("端口[" + port + "]绑定成功!");
} else {
System.err.println("端口[" + port + "]绑定失败!");
bind(serverBootstrap, port + 1);
}
});
}

}

客户端启动流程

总结:

  • 一句话来说就是:创建一个引导类,然后给他指定线程模型,IO 模型,连接读写处理逻辑,连接上特定主机和端口,客户端就启动起来了。
  • connect 方法是异步的,我们可以通过这个异步回调机制来实现指数退避重连逻辑。
  • netty 客户端启动额外的参数,主要包括给客户端 Channel 绑定自定义属性值,设置底层 TCP 参数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class NettyClient {

private static int MAX_RETRY = 5;

public static void main(String[] args) throws InterruptedException {
Bootstrap bootstrap = new Bootstrap();
NioEventLoopGroup group = new NioEventLoopGroup();

// 1.指定线程模型
bootstrap.group(group)
//2.指定IO模型
.channel(NioSocketChannel.class)
//3.IO处理逻辑
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new StringEncoder());
}
});

// 4.建立连接
connect(bootstrap, "127.0.0.1", 8000, MAX_RETRY);
}

private static void connect(Bootstrap bootstrap, String host, int port, int retry) {
bootstrap.connect(host, port).addListener(future -> {
if (future.isSuccess()) {
System.out.println("连接成功!");
} else if (retry == 0) {
System.err.println("重试次数已用完,放弃连接!");
} else {
// 第几次重连
int order = (MAX_RETRY - retry) + 1;
// 本次重连的间隔
int delay = 1 << order;
System.err.println(new Date() + ": 连接失败,第" + order + "次重连……");
bootstrap.config().group().schedule(() -> connect(bootstrap, host, port, retry - 1), delay, TimeUnit.SECONDS);
}
});
}
}

双向通信

  • 通过给逻辑处理链 pipeline 添加逻辑处理器,来编写数据的读写逻辑。
  • 在客户端连接成功之后会回调到逻辑处理器的 channelActive() 方法,而不管是服务端还是客户端,收到数据之后都会调用到 channelRead 方法。
  • 写数据调用writeAndFlush方法,客户端与服务端交互的二进制数据载体为 ByteBuf,ByteBuf 通过连接的内存管理器创建,字节数据填充到 ByteBuf 之后才能写到对端

client端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
 bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new FirstClientHandler());
}
});

public class FirstClientHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelActive(ChannelHandlerContext ctx) {

System.out.println(new Date() + ": 客户端写出数据");
// 1. 获取数据
ByteBuf buffer = getByteBuf(ctx, "你好,我是client1");
// 2. 写数据
ctx.channel().writeAndFlush(buffer);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;

System.out.println(new Date() + ": 客户端读到数据 -> " + byteBuf.toString(Charset.forName("utf-8")));
}

private ByteBuf getByteBuf(ChannelHandlerContext ctx, String content) {

// 1. 获取二进制抽象 ByteBuf
ByteBuf buffer = ctx.alloc().buffer();
// 2. 准备数据,指定字符串的字符集为 utf-8
byte[] bytes = content.getBytes(Charset.forName("utf-8"));
// 3. 填充数据到 ByteBuf
buffer.writeBytes(bytes);

return buffer;
}
}

server端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
 serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new FirstServerHandler());
}
});

public class FirstServerHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelActive(ChannelHandlerContext ctx) {

System.out.println(new Date() + ": 接入netter server ! ");
// 1. 获取数据
ByteBuf buffer = getByteBuf(ctx,"hi,i'm netty server!");
// 2. 写数据
ctx.channel().writeAndFlush(buffer);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {

// 收数据逻辑
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println(new Date() + ": 服务端读到数据 -> " + byteBuf.toString(Charset.forName("utf-8")));

// 回复数据到客户端
System.out.println(new Date() + ": 服务端写出数据");
ByteBuf out = getByteBuf(ctx,"netty server response success.");
ctx.channel().writeAndFlush(out);
}

private ByteBuf getByteBuf(ChannelHandlerContext ctx,String content) {

byte[] bytes = content.getBytes(Charset.forName("utf-8"));
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes(bytes);
return buffer;
}
}

分享到: