Netty 框架

1.1 Netty 简介

Netty 是一个高性能、异步事件驱动的 NIO 框架,它提供了对 TCP、UDP 和文件传输的支持,作为一个异步 NIO 框架,Netty 的所有 IO 操作都是异步非阻塞的,通过 Future-Listener 机制,用户可以方便的主动获取或者通过通知机制获得 IO 操作结果。

本文并不会详细的讨论的 Netty 框架的使用方法,关于其详细的使用方法,可以参考 Netty in Action 一书,书中对 Netty 的相关原理和使用方法做了非常细致的讲解。本文虽然只会使用到 Netty 的一小部分特性,但是对于没有接触过 Netty 的读者来说,可以使其快速的了解 Netty 的整体结构,然后方便其对 Netty 的进一步学习。本文通过 Netty 构建了一个非常具有代表性的 IM 服务端(同时也说明本文并没有介绍 Netty 的客户端使用方法,有兴趣的读者可自行查阅),然后通过 WebSocket 协议从而实现了一个在线聊天系统。

1.2 通过 Netty 构建一个 WebSocket 服务器

我们把一个 Netty 服务分为三部分(分别对应了 3 个 Java 类)来讲:

  • ServerBootStrap
  • ChannelInitializer
  • Handler
  1. ServerBootStrap,用于启动一个 Netty 服务,执行了监听端口、获取请求等等这样的操作;具体实现代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    public class WebSocketServer {

    private int port;

    public WebSocketServer(int port) {
    this.port = port;
    }

    public void run() throws Exception {

    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    .childHandler(new WebSocketServerInitializer()) // 创建了 WebSocketServerInitializer
    .option(ChannelOption.SO_BACKLOG, 128)
    .childOption(ChannelOption.SO_KEEPALIVE, true);

    System.out.println("WebSocketServer 启动了" + port);

    ChannelFuture f = b.bind(port).sync();
    f.channel().closeFuture().sync();

    } finally {
    workerGroup.shutdownGracefully();
    bossGroup.shutdownGracefully();

    System.out.println("WebSocketServer 关闭了");
    }
    }

    public static void main(String[] args) throws Exception {
    int port;
    if (args.length > 0) {
    port = Integer.parseInt(args[0]);
    } else {
    port = 8080;
    }
    new WebSocketServer(port).run();
    }
    }
  2. ChannelInitializer,看它的名字就知道它是一个和初始化有关的类。确实,它是用来初始化 ChannelPipeline,并且向其中加入各种我们需要用到的 Channel 的;具体实现代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    public void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();

    pipeline.addLast(new HttpServerCodec());
    pipeline.addLast(new HttpObjectAggregator(64*1024));
    pipeline.addLast(new ChunkedWriteHandler());
    pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
    pipeline.addLast(new WebSocketFrameHandler()); // 使用了 WebSocketFrameHandler 来对 Channel 进行处理
    }
    }
  3. 第 2 条中提到了 Channel 的概念,其实你可以把 Channel 看做成一个连接,多个 Channel 就会构成一个 ChannelGroup。我们在这里定义了一个 Handler 管理这些 Channle,Handler 本身会包含了各种回调函数,比如 read、added、removed 等等,当事件发生时,就会触发这些函数的执行,从而实现管理 Channle 的目的。具体实现代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    public class WebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); // 这里存储了所有的 Channel

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
    Channel incoming = ctx.channel();
    for (Channel channel : channels) {
    if (channel != incoming){
    channel.writeAndFlush(new TextWebSocketFrame("[" + incoming.remoteAddress() + "]" + msg.text()));
    } else {
    channel.writeAndFlush(new TextWebSocketFrame("[you]" + msg.text() ));
    }
    }
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    Channel incoming = ctx.channel();

    // Broadcast a message to multiple Channels
    channels.writeAndFlush(new TextWebSocketFrame("[SERVER] - " + incoming.remoteAddress() + " 加入"));

    channels.add(incoming);
    System.out.println("Client:"+incoming.remoteAddress() +"加入");
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    Channel incoming = ctx.channel();

    // Broadcast a message to multiple Channels
    channels.writeAndFlush(new TextWebSocketFrame("[SERVER] - " + incoming.remoteAddress() + " 离开"));

    System.out.println("Client:"+incoming.remoteAddress() +"离开");

    // A closed Channel is automatically removed from ChannelGroup,
    // so there is no need to do "channels.remove(ctx.channel());"
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    Channel incoming = ctx.channel();
    System.out.println("Client:"+incoming.remoteAddress()+"在线");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    Channel incoming = ctx.channel();
    System.out.println("Client:"+incoming.remoteAddress()+"掉线");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    Channel incoming = ctx.channel();
    System.out.println("Client:"+incoming.remoteAddress()+"异常");
    // 当出现异常就关闭连接
    cause.printStackTrace();
    ctx.close();
    }

    }

通过以上这三个类,我们已经构建好了一个 WebSocket 服务器,可以用于与浏览器进行 WebSocket 连接并且通信了,完整的代码在 GitHub 上可以找到。

WebSocket 协议

2.1 WebSocket 简介

WebSocket 协议是作为一种对 HTTP 协议的补充而提出来的。因为 HTTP 协议的无状态特性,导致服务器无法主动的向客户端进行消息推送,如果我们想要实现一个网页聊天功能,在以前,就必须要由客户端不断的向服务器进行轮询,通过这种不断发送请求的方式来检查服务器是否有新消息发送。显而易见,这种方式十分低效并且浪费资源。

WebSocket 并不依赖于 HTTP 来实现,他们相互独立。WebSocket 是一种在 TCP 上直接进行全双工通讯的协议,它是有连接的。

如果想要与服务器进行 WbeSocket 连接,客户端需要先在某一个 HTTP 请求中显示的发送一个 WebSocket 连接请求,服务器收到请求后,会发出一个回应,然后与客户端构建一个 WebSocket 连接,这个过程通常称为“握手”。“握手”成功之后,客户端和服务器就可以通过已经构建好的这个 WebSocket 连接进行通讯了,这个通讯是有链接的。此外,这个 WebSocket 的连接不会对接下来的客户端与服务器其他的 HTTP 请求产生任何影响。

下图是一个客户端请求 WebSocket 连接以及服务器返回的相应的响应数据,着重观察红色方框中的内容,它表示了一个 HTTP 请求是如何升级为一个 WebSocket 的:

2.2 使用 Nginx 搭建一个静态文件服务器并设置反向代理

想要实现 WebSocket 需要浏览器的支持,不过现代浏览器已经都支持了这一项特性。接下来我们通过 Nginx 来搭建一个 Web 服务器用来存放静态的聊天网页并且对 WebSocket 的链接进行反向代理,配置如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
server {

listen 80;
server_name 127.0.0.1;

location / {
root /home/html;
index index.html;
}

location /ws {
proxy_pass http://127.0.0.1:8080;
proxy_http_version 1.1;
proxy_read_timeout 300s; # Nginx 的 WebSocket 超时配置,默认 60s 自动断开链接
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
}

可以看到:

  1. 我们部署了一个服务监听在 80 端口,然后这个 Web 服务器会默认打开 index.html 页面,这个就是我们接下来在前台通过 JavaScript 实现 WebSocket 功能的前台聊天页面,具体实现会在下面详细说明;
  2. 第二个 location 是对路径为 /ws 的请求进行了反向代理,这个对应的就是 WebSocket 的连接。可以看到,此链接被反向代理到 8080 端口,也就是上面我们所写的 Netty 服务器所运行的端口,至于路径为什么是 /ws 我会在下面进行解释。另外的三个代理参数表示了这个代理的连接是一个 HTTP 的升级连接,在这里就是 WebSocket 连接。

2.3 JavaScript 实现 WebSocket 客户端

浏览器内部可以通过 js 来实现 WebSocket 的相关操作,具体实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
if (!window.WebSocket) {
window.WebSocket = window.MozWebSocket; // 如果浏览器不支持 WebSocket,那就使用 MozWebSocket
}
var socket; // 定义一个连接
if (window.WebSocket) {
socket = new WebSocket("ws://127.0.0.1/ws"); // 打开一个新的连接
socket.onmessage = function(event) { // 收到信息的回调函数
console.log('收到信息:' + event.data);
};
socket.onopen = function(event) { // 连接打开的回调函数
console.log('连接开启!');
};
socket.onclose = function(event) { // 连接关闭的函数函数
console.log('连接关闭!');
};
} else {
alert("你的浏览器不支持 WebSocket!");
}

function send(message) {
if (!window.WebSocket) { // 若不支持 WebSocket,不执行发送操作
return;
}
if (socket.readyState == WebSocket.OPEN) { // 如果连接已打开,执行发送操作
socket.send(message);
} else {
alert("连接没有开启.");
}
}

查看上面的第 6 行代码,我所设置的请求连接地址为ws://127.0.0.1/ws,第一个 ws 定义了使用 WebSocket 协议,第二个 ws 就和上面的 Nginx 中路径匹配相对应起来了,也就是对 /ws 路径进行反向代理操作。点击这里可以看到完整的前台页面实现代码。