Jetty源码剖析系列(7) - 底层网络通信的细节

我们先来回顾一下Jetty负责网络连接的类ServerConnector的构造函数: 可以看到在ServerConnector的构造函数里会创建一个SelectorManager,然后加到它的bean里面(这里其实体现了Jetty的一个设计模式,就是设计了一个containner,把相关的模块放到这个containner里面,启动该container(调用doStart方法)就会连带启动里面的模块(调用模块的doStart方法), Jetty里面的Server,Connector都是这样的containner。)

我们再来看newSelectorManager方法: 就是创建一个ServerConnectorManager对象,ServerConnectorManager继承了SelectorManager类: ,我们再看一下SelectorManager的doStart方法: 可以看到在调用父类的doStart方法之前,会创建ManagedSelector,并将该selector加到SelectorManager的bean里面,随后再在调用父类的doStart方法时调用该selector的doStart方法,注意selector的数目是根据CPU的数量计算出来的, newSelector方法其实就是直接构建一个ManagedSelector对象: 我们看一下ManagedSelector的构造函数: 关于ManagedSelector,我在系列(4)里面已经有详细剖析。

此时我们先回过头去看一下ServerConnector的doStart方法: 它先调用open方法打开ServerSocketChannel,这个我在系列(6)里面有详细剖析。我们这里关注一下它调用的父类的doStart(AbstractConnector)方法: 它先调用了父类ConntainnerLifeCycle的doStart方法将它所拥有的bean(模块)都启动起来,其中就包括了我们前面加进去的SelectorManager,随即也就会把SelectorManager里面的ManagedSelector启动起来(调用它的doStart方法)。到此,我们可以看到ServerConnector会先启动Selector,然后再启动Acceptor。Connector,Selector,Acceptor,这三者的关系我画了下图来说明: 从上图可以看到,Acceptor负责接收网络连接,封装成一个channel,然后把该channel注册到Selector,由selector来monitor这个channel的状态是Readable还是Connectable还是Writable, 针对不同的状态来做不同的事,比如readable的时候,也就是TCP的receive buffer里面有发送给某个socket的数据时,tcp就会通知application来读取。其实以上所说的这些,都是标准的NIO的操作,与编程语言无关,与容器框架也无关,而Jetty也只是实现了这个标准的NIO操作而已。

Acceptor实现了Runnable接口,它是被扔到QueuedThreadPool线程池里面执行的:
Acceptor的run方法里面循环调用了accept方法:
值得注意的是,serverChannel的accept方法调用是blocking的,就是说直到有一个connect请求过来它才会返回,当然,这个是可配置的(其实它就是在ServerConnection open这个serverChannel的时候就设置成blocking的了),当接收到一个connect请求后,返回一个socketChannel对象,然后就调用accepted方法,我们注意看accepted方法里面最后那句,_manager.accept(channel), 这个_manager就是我们上文说到的SelectorManager,我们再看它的accept方法: 到此,我们就清楚地看到Selector跟Accptor是如何联系到一起了,当然这里的Selector是封装后的Selector,我们先看一下它的submit方法: 这个方法核心的部分是这句: _updates.offer(update),_updates是一个ArrayDeque<SelectorUpdate>,就是说submit方法会把一个SelectorUpdate对象塞到一个Deque里面,那我们再来看一下这个SelectorUpdate对象是什么:

到此,我们先来总结一下,就是Acceptor接收到一个connect请求后,会封装出一个Accept对象,这个对象实现了SelectorUpdate接口,然后把这个Accept对象塞到ManagedSelector的一个ArrayDeque里面,其实这里就是体现上文中的那个图,接收到connect请求后,封装好channel,交给Selector去monitor它的状态,就是说接下来的事就由Selector来handle了,在看ManagedSelector的doStart方法前,我们先来回顾一下它的构造函数: 我们看到它把一个EatWhatYouKill对象加到它的bean里面,那就意味着在启动ManagedSelector时也会启动这个EatWhatYouKill对象(调用它的doStart方法),同样我们先来看一下EatWhatYouKill的构造函数: 一样的套路,它把一个Producer对象加到它的bean里面了,这个Producer是上ManagedSelector的构造函数里面创建出来传给EatWhatYouKill的构造函数的,SelectorProducer对象。

这个时候就可以来看一下ManagedSelector的doStart方法了: 它先调用的父类的doStart方法,其实就是去启动它前面添加的beans(递归调用bean的doStart方法),然后通过selectorManager的newSelector方法打开一个真正的Selector对象: 然后就通过线程池去执行了EatWhatYouKill的的produce方法(这里用的是Java8的语法,而一个方法其实就是一段可执行的字节码,自然是runnable的): 这里核心的是tryProduce方法里面循环调用的doProduce方法: 这个方法比较长,我们看核心的,先看它调用的produceTask方法是生产了什么东西: 它调用的producer的produce方法,producer是上文提到的SelectorProducer对象,它的produce方法: 这个方法就是Jetty最核心的NIO实现了。它在一个while true的循环调用里面先调用processSelected方法,如果该方法返回一个非空的Runnable对象的话,它就将该Runnable对象返回给上层的EatWhatYouKill调用的doProduce方法,doProduce会再执行该Runnable。那我们先来看一下processSelected方法会返回一个什么样的Runnable对象: 它会去遍历cursor,cursor是一个SelectionKey的遍历器: 那么问题来了,如果我们这个jetty进程是刚刚开始启动的,那么这个cursor应当为空的,所以这个方法应当会直接return null出去了,所以现在我们暂且先不深究processSelected究竟会返回一个什么样的Runnable对象回去,这个放在后面讲。我们再回到SelectorProducer的produce方法里面,既然processSelected方法第一次被调用的时候先会返回null,那么,我们继续往下看,它接着调用了processUpdates方法: 这个方法核心的地方在于它遍历了一个SelectorUpdate的Deque,然后调用了该SelectorUpdate的update方法,那么这个SelectorUpdate的Deque是个什么呢?SelectorUpdate又是什么时候被加进Deque的呢?

这个就要回到前文那里,“.....Acceptor接收到一个connect请求后,会封装出一个Accept对象,这个对象实现了SelectorUpdate接口,然后把这个Accept对象塞到ManagedSelector的一个ArrayDeque里面”,原来这个SelectorUpdate就是在封装了一个Connect请求的Accept对象!到这里的时候,很多东西就开始联系上了 - 开始处理接收到的connect请求。 那我们就可以去看Accept的update方法做了什么:

是不是似曾相识?这不就是最基础的NIO的channel注册selector的代码吗?我们终于看到源头。

update方法先是将channel注册到selector,这里要注意到调用register方法的时候,传的第二个参数是0,0代表了该channel一开始给Selector注册的感兴趣的connect连接的动作(这里我说的可能不对,如果有更清楚的读者看到,请不吝赐教,留言或者发邮件到eric_cen@outlook.com)。

然后通过线程池执行了Accept,也就是Accept的run方法会被调用: 看到这里,我想大家都清楚地看到脉络,就是Jetty如何接受connect请求,最后如何创建出这个connection对象来。 先是创建出一个抽象的Socket5ChannelEndPoint,代表端对端,接着创建一个Connection对象: 我们看一下HttpConnection的构造函数: 里面的_channel.getRequest()引起了我的注意,这代表了Channel里面封装了request对象,这个Channel对象是通过调用newHttpChannel方法创建出来的: 我们继续进HttpChannelOverHttp的构造方法里面: 它先调用了父类HttpChannel的构造方法: OK,到这里,终于看到了Request和Response:

看到没有?在创建这个connection时候,就会同时创建一个Request和Response对象,这两个分别实现了HttpServletRequest和HttpServletResponse,也就是实现了javax-servlet-api规范,看到这里,也解答了我一直以来的一个疑问:像Jetty,tomcat这些java容器究竟是如何去实现java官方的API规范的。

在这里,我要特别指出的是,一个connection只有一个Request和Response对象,这个request和response是recycle的,就是说,比如你用浏览器发送多个URL HTTP请求过来,那这多个请求在Keep-alive的情况下都会用的是同一个TCP connection,对应的是Jetty里面的一个connection对象,而这多个请求对应的也只有这一个request对象。(关于这一点,在Request的注释里面有提及,也可以从代码中看到。)

我们再回到createEndPoint方法那里,在创建了connection对象后,调用了selectorManager的connectionOpened方法: HttpConnection的onOpen方法:
这里的核心是调用的fillInterested方法: 它给EndPoint的FillInterest注册了一个readCallback 那么FillInterest和ReadCallback又是什么呢?我们先来看FillInterest: 看到这里其实还是有点懵,不知道它是用来干嘛的,没关系,我们先继续看一下ReadCallback: 接着我们看FillInterest的register方法,看它如何注册readCallback: 它先通过CAS设置readcallback,设置成功的话就会调用FillInterest的needsFillInterest: 而它其实又是调用了EndPoint的needsFillInterest方法,所以关键是看EndPoint的needsFillInterest做了什么: 我们看到它把一个updateKeyAction又submit到Selector的SelectorUpdate Deque里了,我们来看一下这个updateKeyAction是什么:

核心在updateKey方法里面的这一句:key.interestOps(newInterestOps), 我们回想一下,在第一次给Channel注册Selector的时候,传的参数ops是0,代表的是SelectionKey.OPCONNECT, 而当建好连接后,Selector应该去monitor的是读和写的操作了,在这里就应该是准备去monitor读的操作,也就是SelectionKey.OPREAD,我们在EndPoint的needsFillInterest方法里面看到,传的参数正是: SelectionKey.OPREAD。

到此,创建Connection的过程就结束了,总结一下就是,Acceptor和Selector分别在不同的线程中,Acceptor每接收到一个connect请求,就会将其包装成一个Accept对象,将它submit到ManagedSelector的一个SelectorUpdate的Deque里面,而另一条线程中的EatWhatYouKill的SelectorProducer就会去monitor这个Deque里的SelectorUpdate对象的状态了。

我们再重新回到前文的SelectorProducer的produce方法: 当processSelected方法返回为null的时候,它继续往下调用完processUpdates方法和updateKeys方法后,就调用了select方法,这个select方法就是真正是的Selector去阻塞调用select方法来select出有状态改变(可读可写)的connection(底层其实是File Descriptor的状态): 可以看到前文的cursor其实就是通过Selector的Select方法来update的,其实就是把select出来的key放进cursor里面,等下个循环进到produce方法里面去调用processSelected方法时就会去从_curso里面遍历出可操作的key,然后拿出key里面的attachment(ChannelEndPoint), 再调用ChannelEndPoint的onSelected方法:

我们再看一下ChannelEndPoint的onSelected方法会返回一个什么Runnable对象: 可以看到它其实就是根据Key的状态判断它是可读还是可写的,然后封装一个Runnable的task返回给上层的线程池去执行,比如可读的_runFillable: 我们看到它的run方法,是调用了它的FillInterest的fillable方法,fillable方法如下: 我们看到它实际上调用的是Callback的succeeded方法,那这个Callback是什么呢?我们回到前文中的“FillInterest和ReadCallback”那里,疑团是不是解开了?就是我们在创建connection后,调用的HttpConnection的OnOpen方法,然后给FillInterest注册的ReadCallback! 那我们就可以去看ReadCallback的succeeded方法: 里面调用的其实就是HttpConnection的onFillable方法:

我们就可以看到,后面其实就是如何从这个HttpConnection去读取和解析客户端传输过来的数据的过程了。

至此,Jetty的底层网络通信剖析算是从头到尾捋了一遍,其中有不少细节还有待推敲。请看到的读者如果发现什么问题。不吝赐教,可以留言或者发邮件到eric_cen@outlook.com, 或者添加我的微信告知,微信号:13929556523.

Written on 16 April 2018