tomcat处理请求

组件

Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Server
|-- Service
|--|-- Connector
|--|--|-- Prototal
|--|--|--|-- Endpoint
|--|--|--|--|-- Acceptor
|--|--|--|--|-- Executor
|--|--|--|-- Processor
|--|--|-- Mapper
|--|--|-- CoyteAdapter
|--|-- Container
|--|--|--|-- Host
|--|--|--|-- Context
|--|--|--|-- Wrapper
|--|-- Executor

Http11NioProtocol

1
2
3
4
5
6
7
8
9
10
11
12
13
Http11NioProtocol
|-- NioEndpoint
|--|-- LimitLatch
|--|-- Acceptor
|--|-- Poller
|--|-- Poller池
|--|-- SocketProcessor
|--|-- Executor
|--|-- PollerEvent
|--|-- KeyAttachment
|--|-- NioBufferHandler
|-- Http11ConnectionHandler
|-- Http11NioProcessor

Connector

接收客户端连接并响应

Protocol协议

协议的抽象.
Protocol分类: Http11Protocol,Http11NioProtocol 等.

Http11NioProtocol

包含 NioEndpoint 和 Http11NioProcessor

NioEndpoint

LimitLatch

连接数控制,nio模式下默认是10000,达到这个阈值后,就会拒绝连接请求.使用AQS实现的一个共享锁.

LimitLatch相关流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
- 初始化最大连接数量
- endpoint.startInternal()
- initializeConnectionLatch(); 初始化LimitLatch,用于控制连接的数量
- connectionLimitLatch = new LimitLatch(getMaxConnections()); 根据最大的连接数量来创建

- acceptor接收请求前,连接计数+1.若到达最大值则等待其他连接释放
- acceptor.run()
- countUpOrAwaitConnection();// 增加连接计数,若connection数量已经达到了最大等待 socket = serverSocket.accept(); 然后再等待连接,若上一步阻塞则不会到此处

- 连接计数-1
- (!setSocketOptions(socket)) { //这里主要是将socket加入到poller对象上面去,而且还要设置参数
countDownConnection(); // 加入poller对象失败了的话,那么将LimitLatch的计数减1
}
- endpoint.cancelKey()
- countDownConnection();

Acceptor

接收socket连接
通过 serverSocket.accept() 获得 SocketChannel 对象,封装成 org.apache.tomcat.util.net.NioChannel.
再将 NioChannel 封装成 PollerEvent, 并将 PollerEvent 压入 events queue里,即ConcurrentLinkedQueue<PollerEvent> events里.

Poller 轮询器

轮询 events.由 Poller 将就绪的事件 生成 SocketProcessor,交由 Executor 处理.

Poller池

包含若干 Poller 组件

SocketProcessor

具体的 http 请求头等解析操作全是 Http11NioProcessor 处理的, 但是是在 NioEndpoint.SocketProcessor 线程的run()方法里调用的.

  • 结构(属性)
    • NioChannel socket
    • SocketStatus status

Executor

Executor 拿到 Poller 传过来的 socket 后,将 socket 封装在 SocketProcessor 对象中.
然后从 Http11ConnectionHandler 中取出 Http11NioProcessor 对象.

PollerEvent

对 NioChannel 的包装,用于注册到 Poller.event 队列中.

  • 属性
    • NioChannel socket
      • NioChannel是 非阻塞通道.负责将数据读到缓冲区中,或将数据从缓冲区中写入.
    • int interestOps; // 感兴趣的事件,第一次是OP_REGISTER
    • KeyAttachment key;// KeyAttachment包装nioChannel

KeyAttachment

  • 注册到SelectionKey上的附件,对 NioChannel 的包装类.

NioBufferHandler

  • 分配nio的buffer的读写空间,内部包含ByteBuffer类型的readbuf和writebuf属性.在构建NioChannel时使用.

Http11ConnectionHandler

  • 结构(属性)
    • protected final Map<S,Processor<S>> connections = new ConcurrentHashMap<S,Processor<S>>();
      connections 这个 Map维护NioChannel(内部包含socket通道)与processor的关系(如Http11NioProcessor).
      所以若processor一次不能读取到 所需的所有数据,就等下一次poller轮询时根据socket找到这个processor继续读取.

Http11NioProcessor

nio方式解析请求头等.

  • 结构
    • InternalNioInputBuffer
    • InternalNioOutputBuffer

Nio涉及的其他类

  • NioChannel
    • 作用
      • NioChannel是 非阻塞通道.负责将数据读到缓冲区中,或将数据从缓冲区中写入.
    • 结构
      • Nio.SocketChannel
      • NioEndpoint.NioBufferHandler
  • SocketProperties
    • 从server.xml的Connector节点上获取参数值,设置nio的socket属性

Mapper

  • 路由

CoyoteAdapter

  • 将Connector和Engine适配起来.

处理请求

1. Acceptor线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
- 1.Acceptor线程操作 NioEndpoint中 acceptor.run()
- 该acceptor线程一直循环,直到接收到关闭命令
- countUpOrAwaitConnection(); 通过 LimitLatch 增加连接计数,若已经到达 最大连接数,则让该acceptor线程等待(直到其他线程释放连接),相当于拒绝掉客户端请求
- socket = serverSock.accept(); 接收新连接,返回SocketChannel客户端连接对象,这里用的是阻塞模式. 整个循环靠此处阻塞,直到接收到客户端请求才继续往下走
- setSocketOptions(socket) 设置socket属性,这里面主要 将socketChannel包装到nioChannel里. 再将nioChannel包装到PollerEvent里,并放入轮询的事件队列events中
- socket.configureBlocking(false); 将socket连接通道设置为非阻塞模式
- Socket sock = socket.socket(); 从nio.SocketChannel中获取java.net.Socket
- socketProperties.setProperties(sock); 设置socket的参数值(从server.xml的Connector节点上获取参数值), 如 socket发送,接收的大小.心跳检测,超时时长等
- NioChannel channel = nioChannels.poll(); 构造NioChannel对象,从ConcurrentLinkedQueue (即NioChannel的缓存队列) 中取
- 附:ConcurrentLinkedQueue<NioChannel>类型的nioChannels对象的作用 将关闭通道的nioChannel放入Queue缓存起来,方便复用.复用时替换掉其内部的SocketChannel对象即可, NioChannel包含的其他属性只需做重置操作即可.当获取不到时再重新创建
- 取不到则新创建 channel = new NioChannel(socket, bufhandler); 将socketChannel对象 封装到NioChannel对象中.非SSL的.
1.socket参数: 进行读写操作的nio.SocketChannel对象.
2.bufhandler参数: 为NioChannel.NioBufferHandler.这个类用于 根据xml配置 分配nio的buffer的读写空间. 是对nio.Buffer的封装. 提供用于操作 待写入SocketChannel缓存区 和 读取SocketChannel的缓冲区的方法.
- getPoller0().register(channel); 将新接收到的SocketChannel注册到event队列上 (将nioChannel包装成PollerEvent 注册到轮询线程events上)
- getPoller0() 轮询当前的Poller线程数组,从中取出一个Poller并返回
- Poller.register() 将新接收到的SocketChannel注册到event队列上(nioChannel包装成PollerEvent 注册到轮询线程)
1.创建KeyAttachment并与poller关联; 2.将channelSocket(nioChannel)包装成PollerEvent 注册到事件队列(events)中; 3.是否需要对selector wakeup的处理
- new KeyAttachment(socket) 创建或从缓存中取出KeyAttachment并重置. KeyAttachment是对NioChannel的一层包装
- ka.reset(this,socket,getSocketProperties().getSoTimeout()); 重置KeyAttachment对象中Poller,NioChannel等成员变量的引用
- ka.interestOps(SelectionKey.OP_READ); 设置keyAttachement 读操作(SelectionKey.OP_READ) 为感兴趣的操作
- PollerEvent r = eventCache.poll(); 从通道缓存队列中取出一个PollerEvent. 取到则重置属性,取不到则新建
- 取不到则新建 r = new PollerEvent(socket,ka,OP_REGISTER); 若取出为空,则新建.将nioChannel包装到PollerEvent对象中. 否则,重置PollerEvent中的属性. 并注册一个OP_REGISTER(对应)928行pollerEvent.run()里
- poller.addEvent(r); 将PollerEvent添加到轮询队列(poller.events)中. 相当于每个poller都维护了一个event队列,代表这个poller需要处理的事件列表
- events.offer(event); 将event添加到events队列中
- if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();
使一个还未返回的 select() 方法立即返回.即若当前事件队列中 没有事件,则唤醒处于阻塞在selector.select()状态上的线程(会唤醒selector).与1208行对应
- 附:Poller类中protected AtomicLong wakeupCounter = new AtomicLong(0l);的作用 唤醒 多路复用器(即nio中的selector)的条件阈值.
作用: 1.告诉Poller当前有多少个新连接,这样当Poller进行selector的操作时,可以选择是否需要阻塞来等待读写请求到达;
2.标识Poller在进行select选择时,是否有连接到达.如果有,就让当前的阻塞调用立即返回

2. Poller线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
- 2.Poller线程操作 poller.run()
- poller.events() 从events队列中取出pollerEvent对象并run(); 即处理轮询器的事件队列中的事件,若事件队列是空的则返回false,否则返回true. 每个poller都有一个events队列,poller会一直循环取出里面的event进行处理
- while ( (r = events.poll()) != null ) 循环从events队列 中 逐个取出PollerEvent事件,运行每一个PollerEvent的处理逻辑
- r.run(); 运行每一个PollerEvent的处理逻辑.即PollerEvent.run(). 注意,此处还是在当前线程中完成.因为调用的是r.run()不是r.start(), 可以理解成只是一个普通方法,在当前而不是新起一个线程
- PollerEvent.run() 若这个pollerEvent中的interestOps是OP_REGISTER, 则将pollerEvent.nioChannel.socketChannel注册到poller.selector上
- if ( interestOps == OP_REGISTER )
// PollerEvent最初的interestOps就是OP_REGISTER socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, key);
从nioChannel中获取socketChannel,从poller中获取selector. 则此时将socketChannel.register(selector, OP_READ, keyAttachment),即将socketChannel向selector注册读感兴趣事件.
- 上一步r.run()执行完后 ((PollerEvent)r).reset(); eventCache.offer((PollerEvent)r); 只要从events中取出后,便重置PollerEvent并缓存到eventCache中, 方便复用PollerEvent对象
- Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; 根据向selector中注册的key遍历channel中已经就绪的SelectionKey,并处理这些key
- iterator遍历 SelectionKey sk = iterator.next(); KeyAttachment attachment = (KeyAttachment)sk.attachment(); 这里的attachment()返回的就是在register()中注册的KeyAttachement,该对象是对socket的包装类 attachment.access();更新 通道最近一次发生事件的事件,防止因超时没有事件发生而被剔除出selector iterator.remove();从迭代器中移除掉(因为下一步processKey就处理它)
- processKey(sk, attachment); 具体处理通道的逻辑,交由SocketProcessor处理 sk为SelectionKey
- NioChannel channel = attachment.getChannel(); 从keyAttachment中获取nioChannel
- if (sk.isReadable() || sk.isWritable() ) { SelectionKey处理通道(channel)发生的 读写事件 }
- unreg(sk, attachment, sk.readyOps()); 在通道上 注销 对已经发生事件的关注, 防止通道对同一个事件不断select的问题
- 若sk.isReadable() 通道读事件的处理
- processSocket(channel, SocketStatus.OPEN_READ, true) 把SocketProcessor交给Excutor去执行.
- 创建SocketProcessor实例 SocketProcessor sc = processorCache.poll(); 从SocketProcessor缓存队列中取出一个来处理socket, 没有则新建.
有则重置属性 new SocketProcessor(socket,status) socketProcessor内的socket属性为NioChannel类型
- getExecutor().execute(sc); 将socketProccessor任务交由org.apache.tomcat.util.threads.ThreadPoolExecutor线程池处理, 则该线程池会调用socketProcessor.run() .
即SocketProcessor实例是对NioChannel的封装.由Executor执行socketProcessor.run()方法

3. SocketProcessor线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
- 3.SocketProcessor线程操作 socketProcessor.run()
- socketProcessor.run()总体逻辑 1.使用nio方式 读取套接字 并进行处理, 输出响应报文;2.连接计数器减1,腾出通道;3.关闭套接字
- SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); 从socket(SocketProcessor的成员变量NioChannel类型)中获取SelectionKey
- ka = (KeyAttachment)key.attachment(); 从SelectionKey中获取关联的keyAttachment
- synchronized (socket) {// 对NioChannel加锁 doRun(selectionkey, keyAttachment); } socketProcessor的具体逻辑
- state = handler.process(ka, status); 此处handler为Http11NioProtocol$Http11ConnectionHandler, 这个静态类虽然在Http11NioProtocol内部,但是它持有Http11NioProtocol对象
- 调用父类AbstractProtocol.AbstractConnectionHandler.process() 该内部类的主要逻辑是获取HttpProcessor,缓存取 或新建
- state = processor.process(wrapper); 使用Http11NioProcessor处理socket请求. 此处processor为Http11NioProcessor对象
- 父类AbstractHttp11Processor.process() 其实后面操作全是Http11NioProcessor处理了, 但是是在Endpoint.SocketProcessor线程的run()方法里调用的
- getInputBuffer().init(socketWrapper, endpoint); 初始化 输入流.将socket的输入流 赋值给InterInputBuffer的inputStream属性
- InternalNioInputBuffer.init() 设置internalNioInputBuffer的成员变量.如socket,buf大小,SelectorPool
- getOutputBuffer().init(socketWrapper, endpoint); 初始化 输出流
- 设置selectorPool和socket
- getInputBuffer().parseRequestLine(keptAlive)) 解析请求行
- 即InternalNioInputBuffer.parseRequestLine()
- InternalNioInputBuffer.fill(true,false) 读取socket数据到buf字节数组中. timeout:是否有超时; block:是否阻塞读
- InternalNioInputBuffer.readSocket()
从socket中读取数据到buf字节数组中. 从底层socket读取数据.有阻塞式 和 非阻塞式 从 socket中读.
若采用nio,在处理http请求的requestLine和header时非阻塞.
处理body时必须阻塞 (因为读取body部分的 代码一般是在servlet部分控制的即上层代码控制,已经超出tomcat控制范围).
- socket.getBufHandler().getReadBuffer().clear(); 清空nio.ByteBuffer,方便下面将socket里读取的新数据写进去
- 由于block=false,此处非阻塞读 即nRead = socket.read(socket.getBufHandler().getReadBuffer());
- sc.read(dst); 其实就是socketChannel.read(buffer) // read into buffer
- if (nRead > 0) {// 若有数据读取出来
socket.getBufHandler().getReadBuffer().flip();// 调用ByteBuffer.flip()将buffer切换到读模式
socket.getBufHandler().getReadBuffer().limit(nRead);// 设置buffer.limit就是本次读取的长度
expand(nRead + pos);// 检查当前对象的buf字节数组容量是否够大,若不够则扩容
socket.getBufHandler().getReadBuffer().get(buf, pos, nRead);// 将socket读取的数据 转移到当前对象的buf数组中
lastValid = pos + nRead;// 更新最后一个有效数据的下标
return nRead; }
- getInputBuffer().parseHeaders() 解析请求头
- adapter.service(request, response); 将socket封装成request,response,再交给adapter做后续处理
- coyteAdapter.postParseRequest()
- 匹配请求对应的Host,Context,Wrapper
connector.getMapper().map(serverName, decodedURI, version, request.getMappingData());
request.setContext((Context) request.getMappingData().context);
request.setWrapper((Wrapper) request.getMappingData().wrapper);
- connector.getService().getContainer().getPipeline().getFirst().invoke(request, response); 调用container.调用StandardEngine的pipeline 处理request和response
- StandardEngineValve.invoke()中 host.getPipeline().getFirst().invoke(request, response);
- AccessLogValve.invoke()中 getNext().invoke(request, response); 没做事,直接调下一个管道
- ErrorReportValve.invoke() tomcat错误输出页面
- StandardHostValve.invoke() context.getPipeline().getFirst().invoke(request, response);
- AuthenticatorBase.invoke() getNext().invoke(request, response);
- StandardContextValve.invoke() wrapper.getPipeline().getFirst().invoke(request, response); 调用wrapper的pipeline来处理请求,即StandardWrapperValve
- StandardWrapperValve.invoke() filterChain.doFilter(request.getRequest(), response.getResponse()); 调用fiterChain来处理 request 和 response
- ApplicationFilterChain.doFilter()中 internalDoFilter(request,response);
- filter.doFilter(request, response, this); 调用具体filter.doFilter方法
- WsFilter.doFilter()
- ApplicationFilterChain.internalDoFilter()中 servlet.service(request, response); 调用对应servlet.service()