前言
再通过对THsHaServer的源码学习后,我们知道THsHaServer缺点为主线程仍然需要完成所有socket的监听接收(accept)、数据读取和数据写入操作(read/write)。当并发请求数较大时,且发送数据量较多时,负责监听的主线程就只有一个。监听socket上新连接请求不能被及时接受。 ##TThreadedSelectorServer
-
TThreadedSelectorServer是对THsHaServer的一种扩充与完善,它将selector中的读写IO事件(read/write)从主线程中分离出来。交给了多个专门负责读写IO事件的SelectorThread,同时引入worker工作线程池,负责业务处理。它也是种Half-Sync/Half-Async的服务模型。
-
TThreadedSelectorServer模式是目前Thrift提供的最高级的线程服务模型,它内部有如果几个部分构成:
- 一个AcceptThread线程对象,专门用于处理监听socket上的新连接。
- 若干个SelectorThread对象专门用于处理业务socket的网络I/O读写操作,所有网络数据的读写均是有这些线程来完成。
- 一个负载均衡器SelectorThreadLoadBalancer对象,主要用于AcceptThread线程接收到一个新socket连接请求时,决定将这个新连接请求分配给哪个SelectorThread线程。
- 一个ExecutorService类型的工作线程池,在SelectorThread线程中,监听到有业务socket中有调用请求过来,则将请求数据读取之后,交给ExecutorService线程池中的线程完成此次调用的具体执行。主要用于处理每个rpc请求的handler回调处理(这部分是同步的)。
(一) 工作流程图
(二 ) TThreadedSelectorServer类继承图
(三 ) 核心代码解析
注意:TThreadedSelectorServer和THsHaServer与TNonblockingServer一样,要求底层的传输通道必须使用TFramedTransport。
1. Args 类源码
大家可以看出一些重要参数,这些参数用户也可以自定义
- 负责网络IO读写的selector默认线程数(selectorThreads):2
- 负责业务处理的线程池里的默认工作线程数(workerThreads):5
- 选择线程单个线程的缓存队列大小(acceptQueueSizePerThread):4
acceptThread线程接收到客户端请求后,是将客户端通道放到selectThreads线程的缓存队列里,selectThreads再从缓存队列里拿取通道后进行IO读写,缓存队列的大小默认为4
public static class Args extends AbstractNonblockingServerArgs<Args> {
// Args类里的这些参数都有默认值,而且也可以自己定义
/** The number of threads for selecting on already-accepted connections */
// 负责IO读取的选择线程默认设置为2个
public int selectorThreads = 2;
/**
* The size of the executor service (if none is specified) that will handle
* invocations. This may be set to 0, in which case invocations will be
* handled directly on the selector threads (as is in TNonblockingServer)
*/
// 工作线程池里的核心线程数默认为5个,用户也可以设置为0个,如果设置为0个的话
// 那么业务逻辑处理则会交由selectorThread去处理,这样就像TNonblockingServer了
private int workerThreads = 5;
/** Time to wait for server to stop gracefully */
private int stopTimeoutVal = 60;
private TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
/** The ExecutorService for handling dispatched requests */
// 工作线程池
private ExecutorService executorService = null;
/**
* The size of the blocking queue per selector thread for passing accepted
* connections to the selector thread
*/
// 选择线程的缓存队列大小默认为4
private int acceptQueueSizePerThread = 4;
// 对于新的客户端连接的处理策略
public static enum AcceptPolicy {
/**
* Require accepted connection registration to be handled by the executor.
* If the worker pool is saturated, further accepts will be closed
* immediately. Slightly increases latency due to an extra scheduling.
*/
/**
* 工作线程池只能去处理已经注册了的客户端连接请求,如果工作线程池已经饱和了,新来的链接将会被关闭
*/
FAIR_ACCEPT,
/**
* Handle the accepts as fast as possible, disregarding the status of the
* executor service.
*/
// 立即接受请求,不管线程池的状态
FAST_ACCEPT
}
// 检查参数是否合法的
public void validate() {
if (selectorThreads <= 0) {
throw new IllegalArgumentException("selectorThreads must be positive.");
}
if (workerThreads < 0) {
throw new IllegalArgumentException("workerThreads must be non-negative.");
}
if (acceptQueueSizePerThread <= 0) {
throw new IllegalArgumentException("acceptQueueSizePerThread must be positive.");
}
}
}
复制代码
2. TThreadedSelectorServer类
再看看TThreadedSelectorServer有哪些属性和构造函数
- AcceptThread线程对象,专门用于处理监听socket上的新连接。
- SelectorThread对象专门用于处理业务socket的网络I/O读写操作,存储在HashSet里面,
- 一个 Args 属性 一个负载均衡器SelectorThreadLoadBalancer对象,主要用于AcceptThread线程接收到一个新socket连接请求时,决定将这个新连接请求分配给哪个SelectorThread线程。
- 一个ExecutorService类型的工作线程池,在SelectorThread线程中,监听到有业务socket中有调用请求过来,则将请求数据读取之后,交给ExecutorService线程池中的线程完成此次调用的具体执行。
public class TThreadedSelectorServer extends AbstractNonblockingServer {
// The thread handling all accepts
private AcceptThread acceptThread;
// Threads handling events on client transports
private final Set<SelectorThread> selectorThreads = new HashSet<SelectorThread>();
// This wraps all the functionality of queueing and thread pool management
// for the passing of Invocations from the selector thread(s) to the workers
// (if any).
private final ExecutorService invoker;
private final Args args;
/**
* Create the server with the specified Args configuration
*/
public TThreadedSelectorServer(Args args) {
super(args);
// 参数校验
args.validate();
// 创建工作线程
invoker = args.executorService == null ? createDefaultExecutor(args) : args.executorService;
this.args = args;
}
复制代码
创建默认工作线程池是固定线程池,核心线程和最大线程数量固定的,等待队列为链表队列。
protected static ExecutorService createDefaultExecutor(Args options) {
return (options.workerThreads > 0) ? Executors.newFixedThreadPool(options.workerThreads) : null;
}
复制代码
server()方法
server()方法在TThreadedSelectorServer的父类AbstractNonblockingServer类里面我们看下server()方法
public void serve() {
// start any IO threads
if (!startThreads()) {
return;
}
// start listening, or exit
if (!startListening()) {
return;
}
setServing(true);
// this will block while we serve
waitForShutdown();
setServing(false);
// do a little cleanup
stopListening();
}
复制代码
这个方法里面startThreads()方法, waitForShutdown()方法分别在AbstractNonblockingServer类的子类TThreadedSelectorServer类里实现的
startThreads()方法
startThreads()方法里面先开起了SelectorThread线程,然后开启AcceptThread线程等待接受客户端连接请求
@Override
protected boolean startThreads () {
try {
// 实例化SelectorThread 放进HashSet里面
for (int i = 0; i < args.selectorThreads; ++i) {
selectorThreads.add(new SelectorThread(args.acceptQueueSizePerThread));
}
// 实例化AcceptThread线程
acceptThread = new AcceptThread((TNonblockingServerTransport) serverTransport_,
createSelectorThreadLoadBalancer(selectorThreads));
// 逐一启动实例化SelectorThread线程
for (SelectorThread thread : selectorThreads) {
thread.start();
}
// 启动SelectorThread线程
acceptThread.start();
return true;
} catch (IOException e) {
LOGGER.error("Failed to start threads!", e);
return false;
}
}
复制代码
waitForShutdown()方法
waitForShutdown()方法是阻塞主线程,如果selectorThread或者acceptThread挂掉了则调用gracefullyShutdownInvokerPool()方法关闭工作线程池
/**
* Joins the accept and selector threads and shuts down the executor service.
*/
// 阻塞调用这个方法的线程,selectorThread或者acceptThread挂掉了则关闭工作线程池
@Override
protected void waitForShutdown() {
try {
joinThreads();
} catch (InterruptedException e) {
// Non-graceful shutdown occurred
LOGGER.error("Interrupted while joining threads!", e);
}
// 关闭工作线程池
gracefullyShutdownInvokerPool();
}
protected void joinThreads() throws InterruptedException {
// wait until the io threads exit
acceptThread.join();
for (SelectorThread thread : selectorThreads) {
thread.join();
}
}
复制代码
stop()方法
stop()方法里面将stopped_设置为true以后,分别调用acceptThread和selectorThreads的wakeupSelector()方法,wakeupSelector()方法里面 调用了selector.wakeup(),唤醒被Selector选择器执行select()时所阻塞的线程,进行新的一次循环,大家可以点过去看循环执行Selector.select()的循环条件是while (!stopped_), stopped_设置为true 就会跳出while 循环,从而达到停止acceptThread和selectorThreads的作用
/**
* Stop serving and shut everything down.
*/
@Override
public void stop() {
stopped_ = true;
// Stop queuing connect attempts asap
stopListening();
// stop()方法里面将stopped_设置为true以后,分别调用acceptThread
// 和selectorThreads的wakeupSelector()方法,wakeupSelector()方法里面
// 调用了selector.wakeup(),唤醒被Selector选择器执行select()时所阻塞的线程,
// 进行新的一次循环,大家可以点过去看循环执行Selector.select()的循环条件是while (!stopped_)
// stopped_设置为true 就会跳出while 循环,从而达到停止acceptThread和selectorThreads的作用
if (acceptThread != null) {
acceptThread.wakeupSelector();
}
if (selectorThreads != null) {
for (SelectorThread thread : selectorThreads) {
if (thread != null)
thread.wakeupSelector();
}
}
}
复制代码
3. AcceptThread源码
AcceptThread继承于Thread,可以看出包含三个重要的属性:
- 非阻塞式传输通道(TNonblockingServerTransport)
- NIO选择器(acceptSelector)
- 选择器线程负载均衡器(threadChooser)
protected class AcceptThread extends Thread {
// The listen socket to accept on
// 非阻塞式传输通道(TNonblockingServerTransport)
private final TNonblockingServerTransport serverTransport;
// NIO选择器(acceptSelector)
private final Selector acceptSelector;
// 选择器线程负载均衡器(threadChooser)
private final SelectorThreadLoadBalancer threadChooser;
/**
* Set up the AcceptThead
*
* @throws IOException
*/
public AcceptThread(TNonblockingServerTransport serverTransport,
SelectorThreadLoadBalancer threadChooser) throws IOException {
this.serverTransport = serverTransport;
this.threadChooser = threadChooser;
// 实例化acceptSelector
this.acceptSelector = SelectorProvider.provider().openSelector();
// 将客户端注册到Selector上
this.serverTransport.registerSelector(acceptSelector);
}
}
复制代码
我们看看AcceptThread的run()方法,可以看出accept线程一旦启动,就会不停地调用select()方法:
public void run() {
try {
if (eventHandler_ != null) {
eventHandler_.preServe();
}
while (!stopped_) {
select();
}
} catch (Throwable t) {
LOGGER.error("run() on AcceptThread exiting due to uncaught error", t);
} finally {
try {
acceptSelector.close();
} catch (IOException e) {
LOGGER.error("Got an IOException while closing accept selector!", e);
}
// This will wake up the selector threads
TThreadedSelectorServer.this.stop();
}
}
复制代码
select()方法
我们看看select()方法,acceptSelector选择器等待IO事件的到来,拿到SelectionKey即检查是不是accept事件。如果是,通过handleAccept()方法接收一个新来的连接;否则,如果是IO读写事件,AcceptThread不作任何处理,交由SelectorThread完成。
private void select() {
try {
// wait for connect events.
acceptSelector.select();
// process the io events we received
Iterator<SelectionKey> selectedKeys = acceptSelector.selectedKeys().iterator();
while (!stopped_ && selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();
// skip if not valid
if (!key.isValid()) {
continue;
}
if (key.isAcceptable()) {
// 处理新到来的请求
handleAccept();
} else {
LOGGER.warn("Unexpected state in select! " + key.interestOps());
}
}
} catch (IOException e) {
LOGGER.warn("Got an IOException while selecting!", e);
}
}
复制代码
handleAccept()方法
我们看看handleAccept()方法,
- 先通过doAccept()去拿连接通道
- 然后Selector线程负载均衡器选择一个Selector线程,
- 调用doAddAccept将客户端通道交给选中的Selector线程,然后由Selector线程完成接下来的IO读写事件。
/**
* Accept a new connection.
*/
private void handleAccept() {
// 获取客户端的非阻塞传输通道
final TNonblockingTransport client = doAccept();
if (client != null) {
// Pass this connection to a selector thread
// 获取将要加入的SelectorThread线程
final SelectorThread targetThread = threadChooser.nextThread();
// 如果是快速加入,调用doAddAccept将客户端通道加入到SelectorThread线程
if (args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || invoker == null) {
doAddAccept(targetThread, client);
} else {
// FAIR_ACCEPT
try {
invoker.submit(new Runnable() {
public void run() {
doAddAccept(targetThread, client);
}
});
} catch (RejectedExecutionException rx) {
LOGGER.warn("ExecutorService rejected accept registration!", rx);
// close immediately
client.close();
}
}
}
}
// 获取客户端通道
private TNonblockingTransport doAccept() {
try {
return (TNonblockingTransport) serverTransport.accept();
} catch (TTransportException tte) {
// something went wrong accepting.
LOGGER.warn("Exception trying to accept!", tte);
return null;
}
}
//将客户端通道加入到SelectorThread线程的缓存队列里
private void doAddAccept(SelectorThread thread, TNonblockingTransport client) {
if (!thread.addAcceptedConnection(client)) {
client.close();
}
}
复制代码
4. SelectorThread源码
SelectorThread继承于AbstractSelectThread,主要新增功能在复写run()方法里面,我们先来看看属性,
- BlockingQueue : 用来缓存 AcceptThread 线程给与的的客户端通道的缓存队列,其可以设置为固定长度的阻塞队列,也可以设置为不限制长度的阻塞队列
protected class SelectorThread extends AbstractSelectThread {
// Accepted connections added by the accept thread.
// 用来缓存 AcceptThread 线程给与的的客户端通道的缓存队列
private final BlockingQueue<TNonblockingTransport> acceptedQueue;
// 以下这三个参数貌似是为了处理Selector.select()方法时候的bug用,暂时还没弄懂为啥
private int SELECTOR_AUTO_REBUILD_THRESHOLD = 512;
private long MONITOR_PERIOD = 1000L;
private int jvmBug = 0;
/**
* Set up the SelectorThread with an unbounded queue for incoming accepts.
*
* @throws IOException if a selector cannot be created
*/
// 设置为无长度限制的缓存队列
public SelectorThread() throws IOException {
this(new LinkedBlockingQueue<TNonblockingTransport>());
}
/**
* Set up the SelectorThread with an bounded queue for incoming accepts.
*
* @throws IOException if a selector cannot be created
*/
// 通过传入的参数设置固定长度的缓存队列
public SelectorThread(int maxPendingAccepts) throws IOException {
this(createDefaultAcceptQueue(maxPendingAccepts));
}
}
复制代码
run()方法
我们先来看下的SelectorThread线程的run()方法,可以看到run()方法里面有一个while循环,循环分别是
- select()方法:执行一次Selector.select() 处理就绪的I/O事件
- processAcceptedConnections():处理新来的客户端连接,主要是将客户端连接里的通道取出来注册到Selector上
- processInterestChanges(): 检查是否有通道改变了感兴趣的类型 ,比如从读转换为写,或者写换成读
/**
* The work loop. Handles selecting (read/write IO), dispatching, and
* managing the selection preferences of all existing connections.
*/
public void run() {
try {
while (!stopped_) {
// Selector进行一次轮询,获取就绪的事件,将事件交给工作线程处理
select();
// 处理新到来的客户端请求
processAcceptedConnections();
// 检查是否有任何framebuffer将其兴趣类型从读转换为写,或者写换成读。
processInterestChanges();
}
for (SelectionKey selectionKey : selector.keys()) {
cleanupSelectionKey(selectionKey);
}
} catch (Throwable t) {
LOGGER.error("run() on SelectorThread exiting due to uncaught error", t);
} finally {
try {
selector.close();
} catch (IOException e) {
LOGGER.error("Got an IOException while closing selector!", e);
}
// 关闭TThreadedSelectorServer服务, AcceptThread 和 SelectThread线程也会被关闭
TThreadedSelectorServer.this.stop();
}
}
复制代码
addAcceptedConnection()方法里select()函数一般是大同小异,这里只处理了读事件和写事件,select()多了个doSelect()用来处理epoll自身存在的一个bug,这个有详细文档,但是我还没看懂,以后在慢慢看懂。
AcceptThread线程与新的客户端建立连接请求以后,会调用doAddAccept()方法将客户端的非阻塞式传输通道放到SelectorThread线程的缓存队列里面,doAddAccept()方法里实际上又调用了SelectThread线程类的addAcceptedConnection()将客户端的非阻塞式传输通道放到SelectorThread线程的缓存队列里面,SelectorThread在从缓存队列里面拿出来后在进行IO读写。
// 将新建立的客户端的非阻塞式传输通道放入到这个线程的缓存队列里面,如果队列满了的话,线程将会阻塞在这里
public boolean addAcceptedConnection(TNonblockingTransport accepted) {
try {
acceptedQueue.put(accepted);
} catch (InterruptedException e) {
LOGGER.warn("Interrupted while adding accepted connection!", e);
return false;
}
// 唤醒Selector重新开始循环 这个方法大家可以学NIO的时候了解
selector.wakeup();
return true;
}
复制代码
addAcceptedConnection()方法将新建立的客户端的非阻塞式传输通道放入到这个线程的缓存队列里面后,最后会调用selector.wakeup(),这个方法是唤醒在执行run()方法时,阻塞在select()方法里的selector.select()的线程, 好继续执行 select()方法下面的processAcceptedConnections()方法,从阻塞队列里拿取数据。
processAcceptedConnections()方法
我们再来看看processAcceptedConnections()方法,processAcceptedConnections()方法循环的将缓存队列里面的全部客户端的非阻塞式传输通道一一拿出来,然后调用registerAccepted()方法一一注册到Selector上
private void processAcceptedConnections() {
// Register accepted connections
// 循环的将缓存队列里面的全部客户端的非阻塞式传输通道一一拿出来
// 然后调用registerAccepted()方法一一注册到Selector上
while (!stopped_) {
TNonblockingTransport accepted = acceptedQueue.poll();
if (accepted == null) {
break;
}
// 将客户端通道注册到Selector
registerAccepted(accepted);
}
}
// 将客户端的通道注册到Selector
private void registerAccepted(TNonblockingTransport accepted) {
SelectionKey clientKey = null;
try {
clientKey = accepted.registerSelector(selector, SelectionKey.OP_READ);
// 创建一个FrameBuffer,然后将FrameBuffer附加到clientKey,用来读取或者写数据
FrameBuffer frameBuffer = createFrameBuffer(accepted, clientKey, SelectorThread.this);
clientKey.attach(frameBuffer);
} catch (IOException e) {
LOGGER.warn("Failed to register accepted connection to selector!", e);
// 抛异常则销毁创建的通道
if (clientKey != null) {
cleanupSelectionKey(clientKey);
}
accepted.close();
}
}
复制代码
run()方法的循环里最后一步是执行processInterestChanges()方法,processInterestChanges()方法的函数体在AbstractSelectThread类里,就是改变事件所感谢的类型,比如从读换成写(读取了客户端的请求,执行完相应的业务处理后,要将数据返回给客户端),从写换成读(要将数据返回给客户端后,重新开始接受客户端新的的请求)。
createSelectorThreadLoadBalancer()方法
/**
* Creates a SelectorThreadLoadBalancer to be used by the accept thread for
* assigning newly accepted connections across the threads.
*/
// 创建一个SelectorThreadLoadBalancer对象后返回,SelectorThreadLoadBalancer对象用来选择SelectorThread线程
protected SelectorThreadLoadBalancer createSelectorThreadLoadBalancer(Collection<? extends SelectorThread> threads) {
return new SelectorThreadLoadBalancer(threads);
}
复制代码
5. SelectorThreadLoadBalancer类源码
SelectorThreadLoadBalancer类是通过轮询算法来选择SelectorThread线程,通过一个迭代器为新进来的连接顺序分配SelectorThread。
protected static class SelectorThreadLoadBalancer {
private final Collection<? extends SelectorThread> threads;
private Iterator<? extends SelectorThread> nextThreadIterator;
// 实例化的时候将存储SelectThread的线程的Set传入进来
public <T extends SelectorThread> SelectorThreadLoadBalancer(Collection<T> threads) {
if (threads.isEmpty()) {
throw new IllegalArgumentException("At least one selector thread is required");
}
this.threads = Collections.unmodifiableList(new ArrayList<T>(threads));
nextThreadIterator = this.threads.iterator();
}
// 通过迭代器来获取下一个SelectThread
public SelectorThread nextThread() {
// Choose a selector thread (round robin)
// 如果一次迭代结束了,从头开始从头迭代
if (!nextThreadIterator.hasNext()) {
nextThreadIterator = threads.iterator();
}
return nextThreadIterator.next();
}
}
复制代码