1. 前言
Thrift是Facebook贡献给apache的rpc框架,但是这款框架的java版本在公司内部并不是那么受待见,因为其性能相比C++版本差了很多,但是后续基于netty重写了以后性能得到了极大的提升,相比于C++版本已经差距不大了。为此取了个新的名字Nifty = Netty + Thrift。
如果你使用过thrift的话,基本都会使用自动生成的代码,那真的是没法看,即使定义一个简单的类都会生成巨多的代码,把read,write方法全部写到里面去了。总之早期的thrfit各方面都似乎不那么友好。后面架构进行了升级,提供了新的swift库,注意这个不是ios的swift,从而生成的java类和普通的java类基本一致,无非多了点注解,而序列化反序列化也都移到了相应的包中,从而使得我们的代码非常简洁易懂。
其实这款rpc框架的性能是非常不错的,早几年性能是好过grpc的,目前小米还是在用的。这款框架很轻量,即不提供服务治理的功能。如果公司规模不大急需做功能,暂时没精力去做服务治理的话可能还是会选择dubbo等带服务治理功能的rpc框架。但是恰恰是thrift不提供服务治理,这样公司可以自己去定义服务治理的功能。
目前不管是书籍还是博客等关于thrift的都是少之又少,最近突然爱学习了,所以打算写一系列thrift相关的博客,关于使用基本不会介绍的过多,因为基本使用也就十几行代码,主要是介绍内部处理逻辑。具体的包括Thrift框架分析,netty框架分析,分布式服务治理等三个方面。
为了方便后续统称为Thrift而不是Nifty,因为很多代码还是沿用的Thrift。
2. Thrfit服务端创建与核心组件介绍
EchoServiceImpl service = new EchoServiceImpl();
ThriftCodecManager manager = new ThriftCodecManager();
ThriftServiceProcessor processor = new ThriftServiceProcessor(manager, ImmutableList.of(), service);
ThriftServer server = new ThriftServer(processor, new ThriftServerConfig().setPort(8081));
server.start();
复制代码
其中EchoServiceImpl是使用swift代码生成器生成的接口的实现类,有个echo方法,简单说下这几个组件。
- ThriftCodecManager:编解码器的管理类,会将各类编解码器,比如StringCodec添加到缓存中。
- ThriftServiceProcessor:thrift服务处理器,服务端收到数据后,最终将由这个类来进行处理,可以理解为最核心的类了。
- ThriftServer: thrift服务器,主要用来设置参。启动服务,说具体点就是设置好netty的一些处理器等参数,然后启动netty服务。
- ThriftServerConfig: Thrfit服务启动的一些配置类,包括了端口号,线程池,线程数量等。
3. 服务端启动流程极简介绍
从上面的组件分析是不是能猜到一点thrift的内部处理流程。简单两句话就是,创建各类自定义处理器handler,添加到netty的处理器集合中,然后启动netty服务。当收到客户端发来的数据后,交由特定处理器进行数据的处理,根据协议和编解码器从buffer中进行数据的解析和转换,最终得到类名,方法的参数和方法名等(各类信息都能解析到);从ThriftServicProcessor查到Method,传入方法参数,反射执行得到结果,然后将结果通过netty响应给客户端。
4. 服务端启动全解析
先从ThriftServer
创建和启动来看,了解总体的流程,后续再回过头来看各个组件的处理流程。
ThriftServer server = new ThriftServer(processor, new ThriftServerConfig().setPort(8081));
public ThriftServer(NiftyProcessor processor, ThriftServerConfig config){
this(processor, config, new NiftyTimer("thrift"));
}
复制代码
继续往下,追到最终的构造方法
public ThriftServer(
final NiftyProcessor processor,
ThriftServerConfig config,
@ThriftServerTimer Timer timer,
Map<String, ThriftFrameCodecFactory> availableFrameCodecFactories,
Map<String, TDuplexProtocolFactory> availableProtocolFactories,
@ThriftServerWorkerExecutor Map<String, ExecutorService> availableWorkerExecutors,
NiftySecurityFactoryHolder securityFactoryHolder){
NiftyProcessorFactory processorFactory = new NiftyProcessorFactory(){
@Override
public NiftyProcessor getProcessor(TTransport transport)
{
return processor;
}
};
String transportName = config.getTransportName();
String protocolName = config.getProtocolName();
checkState(availableFrameCodecFactories.containsKey(transportName), "No available server transport named " + transportName);
checkState(availableProtocolFactories.containsKey(protocolName), "No available server protocol named " + protocolName);
configuredPort = config.getPort();
workerExecutor = config.getOrBuildWorkerExecutor(availableWorkerExecutors);
acceptorExecutor = newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("thrift-acceptor-%s").build());
acceptorThreads = config.getAcceptorThreadCount();
ioExecutor = newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("thrift-io-%s").build());
ioThreads = config.getIoThreadCount();
serverChannelFactory = new NioServerSocketChannelFactory(new NioServerBossPool(acceptorExecutor, acceptorThreads, ThreadNameDeterminer.CURRENT),
new NioWorkerPool(ioExecutor, ioThreads, ThreadNameDeterminer.CURRENT));
ThriftServerDef thriftServerDef = ThriftServerDef.newBuilder()
.name("thrift")
.listen(configuredPort)
.limitFrameSizeTo((int) config.getMaxFrameSize().toBytes())
.clientIdleTimeout(config.getIdleConnectionTimeout())
.withProcessorFactory(processorFactory)
.limitConnectionsTo(config.getConnectionLimit())
.limitQueuedResponsesPerConnection(config.getMaxQueuedResponsesPerConnection())
.thriftFrameCodecFactory(availableFrameCodecFactories.get(transportName))
.protocol(availableProtocolFactories.get(protocolName))
.withSecurityFactory(securityFactoryHolder.niftySecurityFactory)
.using(workerExecutor)
.taskTimeout(config.getTaskExpirationTimeout())
.withSecurityFactory(config.getSecurityFactory())
.withHeader(config.getHeader())
.withServerHandler(config.getNiftyServerHandler())
.build();
NettyServerConfigBuilder nettyServerConfigBuilder = NettyServerConfig.newBuilder();
nettyServerConfigBuilder.getServerSocketChannelConfig().setBacklog(config.getAcceptBacklog());
nettyServerConfigBuilder.setBossThreadCount(config.getAcceptorThreadCount());
nettyServerConfigBuilder.setWorkerThreadCount(config.getIoThreadCount());
nettyServerConfigBuilder.setTimer(timer);
NettyServerConfig nettyServerConfig = nettyServerConfigBuilder.build();
transport = new NettyServerTransport(thriftServerDef, nettyServerConfig, allChannels);
}
复制代码
构造方法简单来说主要做了两件事:
- 创建和获取netty需要的连接线程池和io处理线程池以及数量,从而构建netty服务组件
NioServerSocketChannelFactory
; - 构建netty服务的配置类nettyServerConfig, thrift服务的配置类ThriftServerDef
我们再来看下细节点的东西,思考一些问题,如果不感兴趣可以跳过。
工作线程池workerExecutor
的创建:
workerExecutor = config.getOrBuildWorkerExecutor(availableWorkerExecutors);
复制代码
默认传进来的availableWorkerExecutors
是空的,所以最终是构建一个新的线程池,
最终调用的方法是makeDefaultWorkerExecutor
,下面的代码稍微简化了一点。
- 默认得到的就是个无界的队列;
- 如果你需要构建有限容量队列的线程池,可以在创建config后调用
setMaxQueuedRequests
来设置队列容量 - 超出队列容量后将执行线程池拒绝策略(
throw RejectedExecutionException
) - 默认核心线程数和最大线程数是一样的,数值为200;
- 使用guava提供的ThreadFactoryBuilder来构建线程工厂,主要是取个容易理解的名字;在thrift中大量使用了
guava
提供的工具类。
private ExecutorService makeDefaultWorkerExecutor(){
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
return new ThreadPoolExecutor(getWorkerThreads(),
getWorkerThreads(),
0L,
TimeUnit.MILLISECONDS,
queue,
new ThreadFactoryBuilder().setNameFormat("thrift-worker-%s").build(),
new ThreadPoolExecutor.AbortPolicy());
}
复制代码
netty的连接线程池和io线程池创建
acceptorExecutor = newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("thrift-acceptor-%s").build());
acceptorThreads = config.getAcceptorThreadCount();
ioExecutor = newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("thrift-io-%s").build());
ioThreads = config.getIoThreadCount();
复制代码
- 该线程池创建的方法是netty提供的,看名字是不是像无界队列的线程池呢?
- 连接数量的限定是1,io线程数量的限定默认是电脑核数*2,但都是可以在配置类中指定的。
关于netty配置类NettyServerConfig的构建nettyServerConfigBuilder.build()
, 在之前设置了连接线程的线程数和io线程池线程数以及定时器timer,调用build后如下:
public NettyServerConfig build() {
Timer timer = getTimer();
ExecutorService bossExecutor = getBossExecutor();
int bossThreadCount = getBossThreadCount();
ExecutorService workerExecutor = getWorkerExecutor();
int workerThreadCount = getWorkerThreadCount();
return new NettyServerConfig(
getBootstrapOptions(),
timer != null ? timer : new NiftyTimer(threadNamePattern("")),
bossExecutor != null ? bossExecutor : buildDefaultBossExecutor(),
bossThreadCount,
workerExecutor != null ? workerExecutor : buildDefaultWorkerExecutor(),
workerThreadCount
);
}
private ExecutorService buildDefaultBossExecutor() {
return newCachedThreadPool(renamingThreadFactory(threadNamePattern("-boss-%s")));
}
private ExecutorService buildDefaultWorkerExecutor() {
return newCachedThreadPool(renamingThreadFactory(threadNamePattern("-worker-%s")));
}
复制代码
由于没有设置两个线程池,所以会设置默认的线程池,注意这里一个是Boss线程池一个是Worker线程池。 其中的timer在后续构建netty处理器的时候会多次用到。
allChannels
是一个netty
提供的channelGroup:
private final DefaultChannelGroup allChannels = new DefaultChannelGroup();
复制代码
在介绍后续流程前,先了解下netty服务端创建步骤,因为这里用的是netty3,和我们比较熟悉的netty4差距有点大,可以对比着看下。
ChannelFactory factory = new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool(),
);
ServerBootstrap bootstrap = new ServerBootstrap(factory);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("handler1", new Handler1());
return pipeline;
}
});
Channel channel = bootstrap.bind(new InetSocketAddress(8081));
复制代码
构建NettyServerTransport
的时候,在构造方法里面就是进行netty的一些设置。
transport = new NettyServerTransport(thriftServerDef, nettyServerConfig, allChannels);
复制代码
在这里面主要是进行一些对象成员变量的设置,最后也是最重要的就是构建ChannelPipelineFactory
,在其中设置各种处理器,大部分继承自SimpleChannelUpstreamHandler
,少部分继承自ChannelDownstreamHandler
,可以类比netty4
的ChannelInboundHandler,ChannelOutboundHandler
。
this.pipelineFactory = new ChannelPipelineFactory(){
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline cp = Channels.pipeline();
// 设置处理器handler
return cp;
}
}
复制代码
我们来看有哪些处理器。
ConnectionLimiter
: 连接限流器, 创建的时候需要指定最大连接数,以及初始值为0的一个计数器;每次建立连接的时候计数器数值 + 1,关闭的时候数值 - 1;当连接数达到上限,则关闭通道,即channel。ChannelStatistics
: 传入allChannels来构建对象,内部持有一个channelCount = 0
,来统计建立通道channel数, 每次建立连接接受到数据的时候channelCount + 1
,提供了get方法来获取channelCount
;同时将channel
加入到allChannels
中。- 编解码处理器
DefaultThriftFrameCodec
,每次收到的数据和传出的数据都需要进行一次编码或者解码。 - 连接的上下文处理器
ConnectionContextHandler
,在建立连接的时候创建NiftyConnectionContext
,连接的上下文环境,包含了removeAddress和属性map,绑定到ChannelHandlerContext
,即ctx.setAttachment(context);
。 netty
提供的关于超时处理器IdleStateHandler
和IdleDisconnectHandler
。NiftyDispatcher
,这个处理器最为核心,收到buffer经过解码后数据就传到了该处理器,该处理器会对数据进行一系列的处理和方法的调用等。- 异常事件处理器
NiftyExceptionLogger
,重写了log
方法,如果出现异常事件,该处理器会打印响应的异常日志。
来一览处理过程吧
public NettyServerTransport(final ThriftServerDef def, final NettyServerConfig nettyServerConfig, final ChannelGroup allChannels){
this.def = def;
this.nettyServerConfig = nettyServerConfig;
this.port = def.getServerPort();
this.allChannels = allChannels;
final ConnectionLimiter connectionLimiter = new ConnectionLimiter(def.getMaxConnections());
this.channelStatistics = new ChannelStatistics(allChannels);
this.pipelineFactory = new ChannelPipelineFactory()
{
@Override
public ChannelPipeline getPipeline()
throws Exception
{
ChannelPipeline cp = Channels.pipeline();
TProtocolFactory inputProtocolFactory = def.getDuplexProtocolFactory().getInputProtocolFactory();
NiftySecurityHandlers securityHandlers = def.getSecurityFactory().getSecurityHandlers(def, nettyServerConfig);
cp.addLast("connectionContext", new ConnectionContextHandler());
cp.addLast("connectionLimiter", connectionLimiter);
cp.addLast(ChannelStatistics.NAME, channelStatistics);
cp.addLast("frameCodec", def.getThriftFrameCodecFactory().create(def.getMaxFrameSize(),
inputProtocolFactory));
if (def.getClientIdleTimeout() != null) {
// Add handlers to detect idle client connections and disconnect them
cp.addLast("idleTimeoutHandler", new IdleStateHandler(nettyServerConfig.getTimer(),
def.getClientIdleTimeout().toMillis(),
NO_WRITER_IDLE_TIMEOUT,
NO_ALL_IDLE_TIMEOUT,
TimeUnit.MILLISECONDS));
cp.addLast("idleDisconnectHandler", new IdleDisconnectHandler());
}
cp.addLast("dispatcher", new NiftyDispatcher(def, nettyServerConfig.getTimer()));
cp.addLast("exceptionLogger", new NiftyExceptionLogger());
return cp;
}
};
}
复制代码
我们再回到server.start()
,看下ThriftServer如何启动的
ThriftServer
: 初始状态是NOT_STARTED
public synchronized ThriftServer start(){
checkState(state != State.CLOSED, "Thrift server is closed");
if (state == State.NOT_STARTED) {
transport.start(serverChannelFactory);
state = State.RUNNING;
}
return this;
}
复制代码
NettyServerTransport:
标准的netty服务端创建过程,其中pipelineFactory
就是在前面NettyServerTransport
构造方法中所创建的。
public void start(ServerChannelFactory serverChannelFactory){
bootstrap = new ServerBootstrap(serverChannelFactory);
bootstrap.setOptions(nettyServerConfig.getBootstrapOptions());
bootstrap.setPipelineFactory(pipelineFactory);
serverChannel = bootstrap.bind(new InetSocketAddress(port));
// ...
}
复制代码
到这里Thrift
的服务端启动就介绍完了,下一部分将会介绍服务端接受数据,处理数据和响应结果的流程。