更新时间: 2019-12-12 10:27:00       分类: 学习笔记


简介

在近期的某个项目中,需要在用户和目标服务器之间设立一个流量代理,拦截并转发HSF和HTTP请求,这便需要我们去编写相应的代理服务器。如果按照常规的思路,针对HTTP和HSF两种不同类型的流量,需要编写两套不同的代码(HTTP使用HttpClient转发,HSF使用Generic Invoke转发)。但由于实际场景中我们并不需要对用户的流量进行任何处理,而是直接原封不动地转发给目标服务器(这种转发称为盲转),因此可以直接将编程的视角转换从应用层转换到传输层,直接编写一个基于TCP协议的代理转发服务器,即可满足要求。

既然要面向网络传输编程,就让人不得不联想到Netty这款Java生态圈中享有盛誉的网络编程框架,它凭借着优雅的编程模型和高性能的网络IO处理得到了广大开发人员的追捧,本文接下来便介绍如何使用Netty框架编写一个TCP代理服务器。

Netty基本知识

在开始编写代码前我们首先需要对Netty框架和基本的网络IO有一定的了解,由于网上关于这些内容的文章很多,本文在此只简单介绍一下,不做深入的讨论。

BIO与NIO

Netty之所以能够拥有超高的传输性能,最大的原因就是它采用了非阻塞的IO模型(Nonblocking I/O, NIO),相比与传统的阻塞IO模型(Blocking I/O,BIO),NIO的并发处理能力有一个质的飞跃,让我们通过下面两张图对比BIO和NIO的区别:

BIO通信模型

NIO通信模型

对于服务端而言,当一个客户端连接成功建立之后需要做两件事情,首先是需要接受完客户端发送的所有数据,然后是处理数据并将相应结果返回给客户端。

在BIO中,第一步操作是阻塞的,因此每个线程只能处理一个请求,如果这个请求的数据传输没有结束,该线程就不能去处理下一个请求。当然,机器的线程资源是有限的,这就限制了BIO对高并发的支持能力。

在NIO中,引入了Selector,使得一个线程可以同时处理多个连接。当一个Socket连接建立后,线程并不会阻塞地等待数据传输,而是将这个请求交给Selector去处理。Selector会不断地去遍历所有地Socket,一旦有某个Socket的数据读取完成,就会主动去通知对应的线程,此时线程进行业务逻辑的处理并返回数据给客户端。这个过程是不阻塞的,因此一个线程可以被复用处理多个不同的Socket连接(其实就是多路复用的IO模型)。

零拷贝

Netty传输速度快的另一个秘诀是采用了零拷贝特性。所谓零拷贝就是使用堆外内存来存放数据,对于JVM而言,大部分的对象数据都是存放在堆内存中的,如果数据需要从IO读取到内存,需要经过Socket缓冲区,这样就会导致一份数据需要经过两次拷贝才能够到达目的地。而使用堆外内存就可以解决这个问题,两种拷贝方式的比较如下图所示:

传统拷贝技术 零拷贝技术

需要注意的是,由于Netty采用了零拷贝技术,在debug时我们无法在Watch中直接观察到传输数据的具体内容,这可能会给调试带来一些麻烦。

Netty编程模型

Java在JDK1.5版本中引入了NIO的编程模型,但是平心而论,这些相对原始的API使用起来并不方便。Netty在此基础上进行封装,设计出了一套十分方便易用的编程模型,充分应用了流式编程的思想,让开发非阻塞的网络服务器变得十分简单。接下来对Netty编程模型中的核心概念和思路做一下介绍。

  1. Channel

Channel是Netty中最基础的概念,它对应着一个具体的Socket连接,可以理解为每一个请求都对应着一个Channel

  1. ChannelPipeline & ChannelHandler

Netty中采用责任链模式来对请求进行处理,每个Channel对应了一个处理流程ChannelPipeline,其中包含了很多的ChannelHandler,开发时编写的业务逻辑通常都在其中实现。为了实现数据在Pipeline中的的传输,Netty中还设立了ChannelHandlerContext上下文变量。

上述三个概念和Channel间的关系如下所示:

undefined ChannelHandler作为一个抽象的接口,有着很多不同的实现,在实际编程中,开发者一般通过继承ChannelInboundHandlerAdapter(处理入站请求)和ChannelOutboundHandlerAdapter(处理出站响应)并重写它们的方法来编写业务定制的ChannelHandler

  1. Future

Netty是一个完全异步的框架,当你查看Channel的API时会发现,大部分方法的返回结果都是一个ChannelFuture对象。了解JDK异步编程的同学对Future应该都不陌生,它是异步操作结果的占位符。

在Netty中,若要获取一个异步操作的结果,通常使用addListener()方法传入一个ChannelFutureListener的实现,这样便可以方便地实现异步回调。另外如果要同步阻塞地等待某个操作完成,只需要调用sync()方法即可。

  1. EventLoop

EventLoop是Netty中对控制流和线程的抽象,简单来说,你可以认为一个EventLoop就对应了一个实际处理请求的线程。在实际开发中我们还会经常用到一组线程(线程池)来处理请求,这时候可以使用EventLoopGroup来处理。

有关Netty EventLoop底层的线程模型,可以参考这篇文章,其设计相当精妙,值得深入研究。

  1. Bootstrap

现在我们了解了Netty中对IO操作进行的各种抽象,便可以编写各种业务逻辑了。但是我们还需要一个实体类来启动Server,就像Spring中的Application对象。

Netty中为我们提供了引导服务器(Bootstrap),可以通过链式调用方便地构建出Server的实例。创建Bootstrap时我们需要为其指定一个EventLoopGroup为其提供工作线程。

下面给出一个简单的代码来展示Bootstrap类是如何将各种组件组合在一起,形成一个完整可运行Server的:

ServerBootstrap serverBootstrap =
            new ServerBootstrap()
            .channel(NioServerSocketChannel.class) // 指定通道类型,也就是指定IO模式
            .group(new NioEventLoopGroup()) // 指定EventLoopGroup
            .localAddress(new InetSocketAddress(8080)) // 绑定本地端口
            .channelHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline()
                        .addLast(new BizChannelHandler()); // 指定业务handler, BizChannelHandler是具体的实现
                }
            });

ChannelFuture f = serverBootstrap.bind().sync(); // 同步阻塞等待端口绑定成功,完成后server即开始运行

f.channel().closeFuture().sync(); // 阻塞当前线程直到运行完成

...

接下来的代码实现中我们将使用上面提到几种组件完成代理服务器的开发,如果你想系统了解有关更多Netty框架编程模型的知识,推荐阅读《Netty实战》一书。

代理服务器模型

现在让我们分析一下如何编写代理服务器。总的来说,一个代理服务器主要需要做两件事情:

  1. 拦截用户的请求并将其转发给目标服务器

  2. 拦截目标服务器的响应并将其回写给用户

示意图:

undefined

可以看到,代理服务器需要同时和客户端及目标服务器之间建立两个Channel(Socket),在ServerChannel中,代理服务器扮演Server的角色,而在ClientChannel中,代理服务器扮演的则是Client的角色。

就盲转的情况而言,由于我们并不需要对请求和响应的内容进行任何额外的处理,所以只需要编写两个简单的Bootstrap和对应的ChannelHandler即可,接下来我们就来看看具体的代码实现。

代码实现

转发的核心逻辑都通过ChannelHandler来实现,因此首先编写两个Channel所对应的ChannelHandler

  1. ProxyServerChannelHandler(对应ServerChannel)
/**
 * netty channel handler for proxy server-side.
 *
 * @author lumin
 */
public class ProxyServerChannelHandler extends ChannelInboundHandlerAdapter {

    private static final Logger logger = LoggerFactory.getLogger(ProxyServerChannelHandler.class);

    // 指向clientChannel的引用
    private volatile Channel clientChannel;

    // 标记是否已经初始化过代理
    private Boolean proxyInited = false;

    // 目标服务器的ip
    private String targetHost;

    // 目标服务器的端口
    private int targetPort;

    public ProxyServerChannelHandler(String targetHost, int targetPort) {
        this.targetHost = targetHost;
        this.targetPort = targetPort;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 当用户连接成功建立时会回调此方法
        // 此时可以建立到目标服务器的连接(即初始化clientChannel)
        super.channelActive(ctx);
        logger.debug("channel active.");
        // 双检锁,确保代理只被初始化一次
        if (!proxyInited) {
            synchronized (proxyInited) {
                if (!proxyInited) {
                    prepareClientChannel(ctx.channel());
                    proxyInited = true;
                }
            }
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 客户端请求消息体过大时会拆包,这个方法会被调用多次
        // 仅在第一次触发时激活实例和创建代理,注意考虑多并发情况
        logger.debug("channel read.");
        
        if (clientChannel.isActive()) {
            // 将内容原封不动地写入clientChannel,转发给目标服务器
            clientChannel.writeAndFlush(msg).addListener((ChannelFutureListener) future -> {
                if (future.isSuccess()) {
                    ctx.channel().read();
                } else {
                    future.channel().close();
                }
            });
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        super.channelReadComplete(ctx);
        logger.debug("channel read complete");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.error("exception happened: ", cause);
    }

    private void prepareClientChannel(Channel serverChannel) throws Exception {
        //创建Client Bootstrap
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ProxyClientChannelHandler(serverChannel));
        // 建立到目标服务器的连接,注意这里一定要同步等待连接建立完成
        // 否则实际读取用户请求的时候,到目标服务器的连接还没有完成,会导致丢包造成请求不完整
        ChannelFuture f = bootstrap.connect(targetHost, targetPort).sync();
        clientChannel = f.channel();
    }
}
  1. ProxyClientChannelHandler (对应ClientChannel)
public class ProxyClientChannelHandler extends ChannelInboundHandlerAdapter {

    private static final Logger logger = LoggerFactory.getLogger(ProxyClientChannelHandler.class);

    // 指向serverChannel的引用
    private Channel serverChannel;

    public ProxyClientChannelHandler(Channel serverChannel) {
        this.serverChannel = serverChannel;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 原封不动地将响应内容会写到serverChannelel中
        logger.debug("msg wrote.");
        if (msg != null) {
            serverChannel.writeAndFlush(msg).addListener((ChannelFutureListener) future -> {
                if (future.isSuccess()) {
                    ctx.channel().read();
                } else {
                    future.channel().close();
                }
            });
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        if (serverChannel.isActive()) {
            serverChannel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.error("fatal error: ", cause);
        // close the channel
        ctx.close();
    }
}

接着编写ProxyServer的启动类即可:

  1. Server启动类
public class ProxyServer {

    public static void main(String[] args) {
        EventLoopGroup bossLoopGroup = new NioEventLoopGroup();
        EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossLoopGroup, workerLoopGroup)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline()
                        // 在这里指定目标服务器
                        .addLast(new ProxyServerChannelHandler("127.0.0.1", 8080));
                }
            })
            .localAddress(new InetSocketAddress(80));
        try {
            serverBootstrap.bind().sync();
        } catch (Exception e) {
            bossLoopGroup.shutdownGracefully();
            workerLoopGroup.shutdownGracefully();
            // close the loop group on exception.
            logger.error("fail to launch proxy server at port " + port, e);
            throw e;
        }
    }
}

评论

还没有评论