tomcat处理请求
组件
Server
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| Server |-- Service |--|-- Connector |--|--|-- Prototal |--|--|--|-- Endpoint |--|--|--|--|-- Acceptor |--|--|--|--|-- Executor |--|--|--|-- Processor |--|--|-- Mapper |--|--|-- CoyteAdapter |--|-- Container |--|--|--|-- Host |--|--|--|-- Context |--|--|--|-- Wrapper |--|-- Executor
|
Http11NioProtocol
1 2 3 4 5 6 7 8 9 10 11 12 13
| Http11NioProtocol |-- NioEndpoint |--|-- LimitLatch |--|-- Acceptor |--|-- Poller |--|-- Poller池 |--|-- SocketProcessor |--|-- Executor |--|-- PollerEvent |--|-- KeyAttachment |--|-- NioBufferHandler |-- Http11ConnectionHandler |-- Http11NioProcessor
|
Connector
接收客户端连接并响应
Protocol协议
协议的抽象.
Protocol分类: Http11Protocol,Http11NioProtocol 等.
Http11NioProtocol
包含 NioEndpoint 和 Http11NioProcessor
NioEndpoint
LimitLatch
连接数控制,nio模式下默认是10000,达到这个阈值后,就会拒绝连接请求.使用AQS实现的一个共享锁.
LimitLatch相关流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| - 初始化最大连接数量 - endpoint.startInternal() - initializeConnectionLatch(); 初始化LimitLatch,用于控制连接的数量 - connectionLimitLatch = new LimitLatch(getMaxConnections()); 根据最大的连接数量来创建
- acceptor接收请求前,连接计数+1.若到达最大值则等待其他连接释放 - acceptor.run() - countUpOrAwaitConnection();// 增加连接计数,若connection数量已经达到了最大等待 socket = serverSocket.accept(); 然后再等待连接,若上一步阻塞则不会到此处
- 连接计数-1 - (!setSocketOptions(socket)) { //这里主要是将socket加入到poller对象上面去,而且还要设置参数 countDownConnection(); // 加入poller对象失败了的话,那么将LimitLatch的计数减1 } - endpoint.cancelKey() - countDownConnection();
|
Acceptor
接收socket连接
通过 serverSocket.accept() 获得 SocketChannel 对象,封装成 org.apache.tomcat.util.net.NioChannel.
再将 NioChannel 封装成 PollerEvent, 并将 PollerEvent 压入 events queue里,即ConcurrentLinkedQueue<PollerEvent> events
里.
Poller 轮询器
轮询 events.由 Poller 将就绪的事件 生成 SocketProcessor,交由 Executor 处理.
Poller池
包含若干 Poller 组件
SocketProcessor
具体的 http 请求头等解析操作全是 Http11NioProcessor 处理的, 但是是在 NioEndpoint.SocketProcessor 线程的run()方法里调用的.
- 结构(属性)
- NioChannel socket
- SocketStatus status
Executor
Executor 拿到 Poller 传过来的 socket 后,将 socket 封装在 SocketProcessor 对象中.
然后从 Http11ConnectionHandler 中取出 Http11NioProcessor 对象.
PollerEvent
对 NioChannel 的包装,用于注册到 Poller.event 队列中.
- 属性
- NioChannel socket
- NioChannel是 非阻塞通道.负责将数据读到缓冲区中,或将数据从缓冲区中写入.
- int interestOps; // 感兴趣的事件,第一次是OP_REGISTER
- KeyAttachment key;// KeyAttachment包装nioChannel
KeyAttachment
- 注册到SelectionKey上的附件,对 NioChannel 的包装类.
NioBufferHandler
- 分配nio的buffer的读写空间,内部包含ByteBuffer类型的readbuf和writebuf属性.在构建NioChannel时使用.
Http11ConnectionHandler
- 结构(属性)
protected final Map<S,Processor<S>> connections = new ConcurrentHashMap<S,Processor<S>>();
connections 这个 Map维护NioChannel(内部包含socket通道)与processor的关系(如Http11NioProcessor).
所以若processor一次不能读取到 所需的所有数据,就等下一次poller轮询时根据socket找到这个processor继续读取.
Http11NioProcessor
nio方式解析请求头等.
- 结构
- InternalNioInputBuffer
- InternalNioOutputBuffer
Nio涉及的其他类
- NioChannel
- 作用
- NioChannel是 非阻塞通道.负责将数据读到缓冲区中,或将数据从缓冲区中写入.
- 结构
- Nio.SocketChannel
- NioEndpoint.NioBufferHandler
- SocketProperties
- 从server.xml的Connector节点上获取参数值,设置nio的socket属性
Mapper
CoyoteAdapter
处理请求
1. Acceptor线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| - 1.Acceptor线程操作 NioEndpoint中 acceptor.run() - 该acceptor线程一直循环,直到接收到关闭命令 - countUpOrAwaitConnection(); 通过 LimitLatch 增加连接计数,若已经到达 最大连接数,则让该acceptor线程等待(直到其他线程释放连接),相当于拒绝掉客户端请求 - socket = serverSock.accept(); 接收新连接,返回SocketChannel客户端连接对象,这里用的是阻塞模式. 整个循环靠此处阻塞,直到接收到客户端请求才继续往下走 - setSocketOptions(socket) 设置socket属性,这里面主要 将socketChannel包装到nioChannel里. 再将nioChannel包装到PollerEvent里,并放入轮询的事件队列events中 - socket.configureBlocking(false); 将socket连接通道设置为非阻塞模式 - Socket sock = socket.socket(); 从nio.SocketChannel中获取java.net.Socket - socketProperties.setProperties(sock); 设置socket的参数值(从server.xml的Connector节点上获取参数值), 如 socket发送,接收的大小.心跳检测,超时时长等 - NioChannel channel = nioChannels.poll(); 构造NioChannel对象,从ConcurrentLinkedQueue (即NioChannel的缓存队列) 中取 - 附:ConcurrentLinkedQueue<NioChannel>类型的nioChannels对象的作用 将关闭通道的nioChannel放入Queue缓存起来,方便复用.复用时替换掉其内部的SocketChannel对象即可, NioChannel包含的其他属性只需做重置操作即可.当获取不到时再重新创建 - 取不到则新创建 channel = new NioChannel(socket, bufhandler); 将socketChannel对象 封装到NioChannel对象中.非SSL的. 1.socket参数: 进行读写操作的nio.SocketChannel对象. 2.bufhandler参数: 为NioChannel.NioBufferHandler.这个类用于 根据xml配置 分配nio的buffer的读写空间. 是对nio.Buffer的封装. 提供用于操作 待写入SocketChannel缓存区 和 读取SocketChannel的缓冲区的方法. - getPoller0().register(channel); 将新接收到的SocketChannel注册到event队列上 (将nioChannel包装成PollerEvent 注册到轮询线程events上) - getPoller0() 轮询当前的Poller线程数组,从中取出一个Poller并返回 - Poller.register() 将新接收到的SocketChannel注册到event队列上(nioChannel包装成PollerEvent 注册到轮询线程) 1.创建KeyAttachment并与poller关联; 2.将channelSocket(nioChannel)包装成PollerEvent 注册到事件队列(events)中; 3.是否需要对selector wakeup的处理 - new KeyAttachment(socket) 创建或从缓存中取出KeyAttachment并重置. KeyAttachment是对NioChannel的一层包装 - ka.reset(this,socket,getSocketProperties().getSoTimeout()); 重置KeyAttachment对象中Poller,NioChannel等成员变量的引用 - ka.interestOps(SelectionKey.OP_READ); 设置keyAttachement 读操作(SelectionKey.OP_READ) 为感兴趣的操作 - PollerEvent r = eventCache.poll(); 从通道缓存队列中取出一个PollerEvent. 取到则重置属性,取不到则新建 - 取不到则新建 r = new PollerEvent(socket,ka,OP_REGISTER); 若取出为空,则新建.将nioChannel包装到PollerEvent对象中. 否则,重置PollerEvent中的属性. 并注册一个OP_REGISTER(对应)928行pollerEvent.run()里 - poller.addEvent(r); 将PollerEvent添加到轮询队列(poller.events)中. 相当于每个poller都维护了一个event队列,代表这个poller需要处理的事件列表 - events.offer(event); 将event添加到events队列中 - if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup(); 使一个还未返回的 select() 方法立即返回.即若当前事件队列中 没有事件,则唤醒处于阻塞在selector.select()状态上的线程(会唤醒selector).与1208行对应 - 附:Poller类中protected AtomicLong wakeupCounter = new AtomicLong(0l);的作用 唤醒 多路复用器(即nio中的selector)的条件阈值. 作用: 1.告诉Poller当前有多少个新连接,这样当Poller进行selector的操作时,可以选择是否需要阻塞来等待读写请求到达; 2.标识Poller在进行select选择时,是否有连接到达.如果有,就让当前的阻塞调用立即返回
|
2. Poller线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| - 2.Poller线程操作 poller.run() - poller.events() 从events队列中取出pollerEvent对象并run(); 即处理轮询器的事件队列中的事件,若事件队列是空的则返回false,否则返回true. 每个poller都有一个events队列,poller会一直循环取出里面的event进行处理 - while ( (r = events.poll()) != null ) 循环从events队列 中 逐个取出PollerEvent事件,运行每一个PollerEvent的处理逻辑 - r.run(); 运行每一个PollerEvent的处理逻辑.即PollerEvent.run(). 注意,此处还是在当前线程中完成.因为调用的是r.run()不是r.start(), 可以理解成只是一个普通方法,在当前而不是新起一个线程 - PollerEvent.run() 若这个pollerEvent中的interestOps是OP_REGISTER, 则将pollerEvent.nioChannel.socketChannel注册到poller.selector上 - if ( interestOps == OP_REGISTER ) // PollerEvent最初的interestOps就是OP_REGISTER socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, key); 从nioChannel中获取socketChannel,从poller中获取selector. 则此时将socketChannel.register(selector, OP_READ, keyAttachment),即将socketChannel向selector注册读感兴趣事件. - 上一步r.run()执行完后 ((PollerEvent)r).reset(); eventCache.offer((PollerEvent)r); 只要从events中取出后,便重置PollerEvent并缓存到eventCache中, 方便复用PollerEvent对象 - Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; 根据向selector中注册的key遍历channel中已经就绪的SelectionKey,并处理这些key - iterator遍历 SelectionKey sk = iterator.next(); KeyAttachment attachment = (KeyAttachment)sk.attachment(); 这里的attachment()返回的就是在register()中注册的KeyAttachement,该对象是对socket的包装类 attachment.access();更新 通道最近一次发生事件的事件,防止因超时没有事件发生而被剔除出selector iterator.remove();从迭代器中移除掉(因为下一步processKey就处理它) - processKey(sk, attachment); 具体处理通道的逻辑,交由SocketProcessor处理 sk为SelectionKey - NioChannel channel = attachment.getChannel(); 从keyAttachment中获取nioChannel - if (sk.isReadable() || sk.isWritable() ) { SelectionKey处理通道(channel)发生的 读写事件 } - unreg(sk, attachment, sk.readyOps()); 在通道上 注销 对已经发生事件的关注, 防止通道对同一个事件不断select的问题 - 若sk.isReadable() 通道读事件的处理 - processSocket(channel, SocketStatus.OPEN_READ, true) 把SocketProcessor交给Excutor去执行. - 创建SocketProcessor实例 SocketProcessor sc = processorCache.poll(); 从SocketProcessor缓存队列中取出一个来处理socket, 没有则新建. 有则重置属性 new SocketProcessor(socket,status) socketProcessor内的socket属性为NioChannel类型 - getExecutor().execute(sc); 将socketProccessor任务交由org.apache.tomcat.util.threads.ThreadPoolExecutor线程池处理, 则该线程池会调用socketProcessor.run() . 即SocketProcessor实例是对NioChannel的封装.由Executor执行socketProcessor.run()方法
|
3. SocketProcessor线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| - 3.SocketProcessor线程操作 socketProcessor.run() - socketProcessor.run()总体逻辑 1.使用nio方式 读取套接字 并进行处理, 输出响应报文;2.连接计数器减1,腾出通道;3.关闭套接字 - SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); 从socket(SocketProcessor的成员变量NioChannel类型)中获取SelectionKey - ka = (KeyAttachment)key.attachment(); 从SelectionKey中获取关联的keyAttachment - synchronized (socket) {// 对NioChannel加锁 doRun(selectionkey, keyAttachment); } socketProcessor的具体逻辑 - state = handler.process(ka, status); 此处handler为Http11NioProtocol$Http11ConnectionHandler, 这个静态类虽然在Http11NioProtocol内部,但是它持有Http11NioProtocol对象 - 调用父类AbstractProtocol.AbstractConnectionHandler.process() 该内部类的主要逻辑是获取HttpProcessor,缓存取 或新建 - state = processor.process(wrapper); 使用Http11NioProcessor处理socket请求. 此处processor为Http11NioProcessor对象 - 父类AbstractHttp11Processor.process() 其实后面操作全是Http11NioProcessor处理了, 但是是在Endpoint.SocketProcessor线程的run()方法里调用的 - getInputBuffer().init(socketWrapper, endpoint); 初始化 输入流.将socket的输入流 赋值给InterInputBuffer的inputStream属性 - InternalNioInputBuffer.init() 设置internalNioInputBuffer的成员变量.如socket,buf大小,SelectorPool - getOutputBuffer().init(socketWrapper, endpoint); 初始化 输出流 - 设置selectorPool和socket - getInputBuffer().parseRequestLine(keptAlive)) 解析请求行 - 即InternalNioInputBuffer.parseRequestLine() - InternalNioInputBuffer.fill(true,false) 读取socket数据到buf字节数组中. timeout:是否有超时; block:是否阻塞读 - InternalNioInputBuffer.readSocket() 从socket中读取数据到buf字节数组中. 从底层socket读取数据.有阻塞式 和 非阻塞式 从 socket中读. 若采用nio,在处理http请求的requestLine和header时非阻塞. 处理body时必须阻塞 (因为读取body部分的 代码一般是在servlet部分控制的即上层代码控制,已经超出tomcat控制范围). - socket.getBufHandler().getReadBuffer().clear(); 清空nio.ByteBuffer,方便下面将socket里读取的新数据写进去 - 由于block=false,此处非阻塞读 即nRead = socket.read(socket.getBufHandler().getReadBuffer()); - sc.read(dst); 其实就是socketChannel.read(buffer) // read into buffer - if (nRead > 0) {// 若有数据读取出来 socket.getBufHandler().getReadBuffer().flip();// 调用ByteBuffer.flip()将buffer切换到读模式 socket.getBufHandler().getReadBuffer().limit(nRead);// 设置buffer.limit就是本次读取的长度 expand(nRead + pos);// 检查当前对象的buf字节数组容量是否够大,若不够则扩容 socket.getBufHandler().getReadBuffer().get(buf, pos, nRead);// 将socket读取的数据 转移到当前对象的buf数组中 lastValid = pos + nRead;// 更新最后一个有效数据的下标 return nRead; } - getInputBuffer().parseHeaders() 解析请求头 - adapter.service(request, response); 将socket封装成request,response,再交给adapter做后续处理 - coyteAdapter.postParseRequest() - 匹配请求对应的Host,Context,Wrapper connector.getMapper().map(serverName, decodedURI, version, request.getMappingData()); request.setContext((Context) request.getMappingData().context); request.setWrapper((Wrapper) request.getMappingData().wrapper); - connector.getService().getContainer().getPipeline().getFirst().invoke(request, response); 调用container.调用StandardEngine的pipeline 处理request和response - StandardEngineValve.invoke()中 host.getPipeline().getFirst().invoke(request, response); - AccessLogValve.invoke()中 getNext().invoke(request, response); 没做事,直接调下一个管道 - ErrorReportValve.invoke() tomcat错误输出页面 - StandardHostValve.invoke() context.getPipeline().getFirst().invoke(request, response); - AuthenticatorBase.invoke() getNext().invoke(request, response); - StandardContextValve.invoke() wrapper.getPipeline().getFirst().invoke(request, response); 调用wrapper的pipeline来处理请求,即StandardWrapperValve - StandardWrapperValve.invoke() filterChain.doFilter(request.getRequest(), response.getResponse()); 调用fiterChain来处理 request 和 response - ApplicationFilterChain.doFilter()中 internalDoFilter(request,response); - filter.doFilter(request, response, this); 调用具体filter.doFilter方法 - WsFilter.doFilter() - ApplicationFilterChain.internalDoFilter()中 servlet.service(request, response); 调用对应servlet.service()
|