技术标签: Java RPC Apache Thrift 多线程 网络通信模型 Apache Thrift学习系列
Thrift
提供的网络服务模型:单线程、多线程、事件驱动,从另一个角度划分为:阻塞服务模型、非阻塞服务模型。
阻塞服务模型:TSimpleServer
、TThreadPoolServer
。
非阻塞服务模型:TNonblockingServer
、THsHaServer
和TThreadedSelectorServer
。
TServer
类的层次关系:
TServer
定义了静态内部类Args
,Args
继承自抽象类AbstractServerArgs
。AbstractServerArgs
采用了建造者模式,向TServer
提供各种工厂:
工厂属性 | 工厂类型 | 作用 |
---|---|---|
ProcessorFactory | TProcessorFactory | 处理层工厂类,用于具体的TProcessor对象的创建 |
InputTransportFactory | TTransportFactory | 传输层输入工厂类,用于具体的TTransport对象的创建 |
OutputTransportFactory | TTransportFactory | 传输层输出工厂类,用于具体的TTransport对象的创建 |
InputProtocolFactory | TProtocolFactory | 协议层输入工厂类,用于具体的TProtocol对象的创建 |
OutputProtocolFactory | TProtocolFactory | 协议层输出工厂类,用于具体的TProtocol对象的创建 |
下面是TServer
的部分核心代码:
public abstract class TServer {
public static class Args extends org.apache.thrift.server.TServer.AbstractServerArgs<org.apache.thrift.server.TServer.Args> {
public Args(TServerTransport transport) {
super(transport);
}
}
public static abstract class AbstractServerArgs<T extends org.apache.thrift.server.TServer.AbstractServerArgs<T>> {
final TServerTransport serverTransport;
TProcessorFactory processorFactory;
TTransportFactory inputTransportFactory = new TTransportFactory();
TTransportFactory outputTransportFactory = new TTransportFactory();
TProtocolFactory inputProtocolFactory = new TBinaryProtocol.Factory();
TProtocolFactory outputProtocolFactory = new TBinaryProtocol.Factory();
public AbstractServerArgs(TServerTransport transport) {
serverTransport = transport;
}
}
protected TProcessorFactory processorFactory_;
protected TServerTransport serverTransport_;
protected TTransportFactory inputTransportFactory_;
protected TTransportFactory outputTransportFactory_;
protected TProtocolFactory inputProtocolFactory_;
protected TProtocolFactory outputProtocolFactory_;
private boolean isServing;
protected TServer(org.apache.thrift.server.TServer.AbstractServerArgs args) {
processorFactory_ = args.processorFactory;
serverTransport_ = args.serverTransport;
inputTransportFactory_ = args.inputTransportFactory;
outputTransportFactory_ = args.outputTransportFactory;
inputProtocolFactory_ = args.inputProtocolFactory;
outputProtocolFactory_ = args.outputProtocolFactory;
}
public abstract void serve();
public void stop() {
}
public boolean isServing() {
return isServing;
}
protected void setServing(boolean serving) {
isServing = serving;
}
}
TServer
的三个方法:serve()
、stop()
和isServing()
。serve()
用于启动服务,stop()
用于关闭服务,isServing()
用于检测服务的起停状态。
TServer
的不同实现类的启动方式不一样,因此serve()
定义为抽象方法。不是所有的服务都需要优雅的退出, 因此stop()
方法没有被定义为抽象。
TSimpleServer
的工作模式采用最简单的阻塞IO,实现方法简洁明了,便于理解,但是一次只能接收和处理一个socket
连接,效率比较低。它主要用于演示Thrift
的工作过程,在实际开发过程中很少用到它。
服务端:
ServerSocket serverSocket = new ServerSocket(ServerConfig.SERVER_PORT);
TServerSocket serverTransport = new TServerSocket(serverSocket);
HelloWorldService.Processor processor =
new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
TBinaryProtocol.Factory protocolFactory = new TBinaryProtocol.Factory();
TSimpleServer.Args tArgs = new TSimpleServer.Args(serverTransport);
tArgs.processor(processor);
tArgs.protocolFactory(protocolFactory);
// 简单的单线程服务模型 一般用于测试
TServer tServer = new TSimpleServer(tArgs);
System.out.println("Running Simple Server");
tServer.serve();
客户端:
TTransport transport = new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT);
TProtocol protocol = new TBinaryProtocol(transport);
HelloWorldService.Client client = new HelloWorldService.Client(protocol);
transport.open();
String result = client.say("Leo");
System.out.println("Result =: " + result);
transport.close();
查看上述流程的源代码,即TSimpleServer.java
中的serve()
方法如下:
serve()
方法的操作:
TServerSocket
的listen()
方法启动连接监听。TTransport
对象。TServerEventHandler
对象处理具体的业务请求。TThreadPoolServer
模式采用阻塞socket
方式工作,主线程负责阻塞式监听是否有新socket
到来,具体的业务处理交由一个线程池来处理。
服务端:
ServerSocket serverSocket = new ServerSocket(ServerConfig.SERVER_PORT);
TServerSocket serverTransport = new TServerSocket(serverSocket);
HelloWorldService.Processor<HelloWorldService.Iface> processor =
new HelloWorldService.Processor<>(new HelloWorldServiceImpl());
TBinaryProtocol.Factory protocolFactory = new TBinaryProtocol.Factory();
TThreadPoolServer.Args ttpsArgs = new TThreadPoolServer.Args(serverTransport);
ttpsArgs.processor(processor);
ttpsArgs.protocolFactory(protocolFactory);
// 线程池服务模型 使用标准的阻塞式IO 预先创建一组线程处理请求
TServer ttpsServer = new TThreadPoolServer(ttpsArgs);
System.out.println("Running ThreadPool Server");
ttpsServer.serve();
客户端:
TTransport transport = new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT);
TProtocol protocol = new TBinaryProtocol(transport);
HelloWorldService.Client client = new HelloWorldService.Client(protocol);
transport.open();
String result = client.say("ThreadPoolClient");
System.out.println("Result =: " + result);
transport.close();
ThreadPoolServer
解决了TSimpleServer
不支持并发和多连接的问题,引入了线程池。实现的模型是One Thread Per Connection
。查看上述流程的源代码,先查看线程池的代码片段:
TThreadPoolServer.java
中的serve()
方法如下:
serve()
方法的操作:
TServerSocket
的listen()
方法启动连接监听。WorkerProcess
对象(WorkerProcess
实现了Runnabel
接口),并提交到线程池。WorkerProcess
的run()
方法负责业务处理,为客户端创建了处理器对象、输入传输通道对象、输出传输通道对象、输入协议对象和输出协议对象。TServerEventHandler
对象处理具体的业务请求。WorkerProcess
的run()
方法:
拆分了监听线程(Accept Thread
)和处理客户端连接的工作线程(Worker Thread
),数据读取和业务处理都交给线程池处理。因此在并发量较大时新连接也能够被及时接受。
线程池模式比较适合服务器端能预知最多有多少个客户端并发的情况,这时每个请求都能被业务线程池及时处理,性能也非常高。
线程池模式的处理能力受限于线程池的工作能力,当并发请求数大于线程池中的线程数时,新请求也只能排队等待。
TNonblockingServer
模式也是单线程工作,但是采用NIO
的模式,借助Channel/Selector
机制, 采用IO
事件模型来处理。
所有的socket
都被注册到selector
中,在一个线程中通过seletor
循环监控所有的socket
。
每次selector
循环结束时,处理所有的处于就绪状态的socket
,对于有数据到来的socket
进行数据读取操作,对于有数据发送的socket则进行数据发送操作,对于监听socket
则产生一个新业务socket
并将其注册到selector
上。
注意:TNonblockingServer要求底层的传输通道必须使用TFramedTransport。
服务端:
TProcessor tprocessor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
TNonblockingServerSocket tnbSocketTransport = new TNonblockingServerSocket(ServerConfig.SERVER_PORT);
TNonblockingServer.Args tnbArgs = new TNonblockingServer.Args(tnbSocketTransport);
tnbArgs.processor(tprocessor);
tnbArgs.transportFactory(new TFramedTransport.Factory());
tnbArgs.protocolFactory(new TCompactProtocol.Factory());
// 使用非阻塞式IO服务端和客户端需要指定TFramedTransport数据传输的方式
TServer server = new TNonblockingServer(tnbArgs);
System.out.println("Running Non-blocking Server");
server.serve();
客户端:
TTransport transport = new TFramedTransport(new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT));
// 协议要和服务端一致
TProtocol protocol = new TCompactProtocol(transport);
HelloWorldService.Client client = new HelloWorldService.Client(protocol);
transport.open();
String result = client.say("NonBlockingClient");
System.out.println("Result =: " + result);
transport.close();
TNonblockingServer
继承于AbstractNonblockingServer
,这里我们更关心基于NIO
的selector
部分的关键代码。
相比于TSimpleServer
效率提升主要体现在IO
多路复用上,TNonblockingServer
采用非阻塞IO
,对accept/read/write
等IO
事件进行监控和处理,同时监控多个socket
的状态变化。
TNonblockingServer
模式在业务处理上还是采用单线程顺序来完成。在业务处理比较复杂、耗时的时候,例如某些接口函数需要读取数据库执行时间较长,会导致整个服务被阻塞住,此时该模式效率也不高,因为多个调用请求任务依然是顺序一个接一个执行。
鉴于TNonblockingServer
的缺点,THsHaServer
继承于TNonblockingServer
,引入了线程池提高了任务处理的并发能力。THsHaServer
是半同步半异步(Half-Sync/Half-Async
)的处理模式,Half-Aysnc
用于IO
事件处理(Accept/Read/Write
),Half-Sync
用于业务handler
对rpc
的同步处理上。
注意:THsHaServer和TNonblockingServer一样,要求底层的传输通道必须使用TFramedTransport。
服务端:
TNonblockingServerSocket tnbSocketTransport = new TNonblockingServerSocket(ServerConfig.SERVER_PORT);
TProcessor tprocessor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
// 半同步半异步
THsHaServer.Args thhsArgs = new THsHaServer.Args(tnbSocketTransport);
thhsArgs.processor(tprocessor);
thhsArgs.transportFactory(new TFramedTransport.Factory());
thhsArgs.protocolFactory(new TBinaryProtocol.Factory());
TServer server = new THsHaServer(thhsArgs);
System.out.println("Running HsHa Server");
server.serve();
客户端:
TTransport transport = new TFramedTransport(new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT));
// 协议要和服务端一致
TProtocol protocol = new TBinaryProtocol(transport);
HelloWorldService.Client client = new HelloWorldService.Client(protocol);
transport.open();
String result = client.say("HsHaClient");
System.out.println("Result =: " + result);
transport.close();
THsHaServer
继承于TNonblockingServer
,新增了线程池并发处理工作任务的功能,查看线程池的相关代码:
任务线程池的创建过程:
下文的TThreadedSelectorServer囊括了THsHaServer的大部分特性,源码分析可参考TThreadedSelectorServer。
THsHaServer
与TNonblockingServer
模式相比,THsHaServer
在完成数据读取之后,将业务处理过程交由一个线程池来完成,主线程直接返回进行下一次循环操作,效率大大提升。
主线程仍然需要完成所有socket
的监听接收、数据读取和数据写入操作。当并发请求数较大时,且发送数据量较多时,监听socket
上新连接请求不能被及时接受。
TThreadedSelectorServer
是对THsHaServer
的一种扩充,它将selector
中的读写IO
事件(read/write
)从主线程中分离出来。同时引入worker
工作线程池,它也是种Half-Sync/Half-Async
的服务模型。
TThreadedSelectorServer
模式是目前Thrift
提供的最高级的线程服务模型,它内部有如果几个部分构成:
AcceptThread
线程对象,专门用于处理监听socket
上的新连接。SelectorThread
对象专门用于处理业务socket
的网络I/O
读写操作,所有网络数据的读写均是有这些线程来完成。SelectorThreadLoadBalancer
对象,主要用于AcceptThread
线程接收到一个新socket
连接请求时,决定将这个新连接请求分配给哪个SelectorThread
线程。ExecutorService
类型的工作线程池,在SelectorThread
线程中,监听到有业务socket
中有调用请求过来,则将请求数据读取之后,交给ExecutorService
线程池中的线程完成此次调用的具体执行。主要用于处理每个rpc
请求的handler
回调处理(这部分是同步的)。服务端:
TNonblockingServerSocket serverSocket = new TNonblockingServerSocket(ServerConfig.SERVER_PORT);
TProcessor processor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
// 多线程半同步半异步
TThreadedSelectorServer.Args ttssArgs = new TThreadedSelectorServer.Args(serverSocket);
ttssArgs.processor(processor);
ttssArgs.protocolFactory(new TBinaryProtocol.Factory());
// 使用非阻塞式IO时 服务端和客户端都需要指定数据传输方式为TFramedTransport
ttssArgs.transportFactory(new TFramedTransport.Factory());
// 多线程半同步半异步的服务模型
TThreadedSelectorServer server = new TThreadedSelectorServer(ttssArgs);
System.out.println("Running ThreadedSelector Server");
server.serve();
客户端:
for (int i = 0; i < 10; i++) {
new Thread("Thread " + i) {
@Override
public void run() {
// 设置传输通道 对于非阻塞服务 需要使用TFramedTransport(用于将数据分块发送)
for (int j = 0; j < 10; j++) {
TTransport transport = null;
try {
transport = new TFramedTransport(new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT));
TProtocol protocol = new TBinaryProtocol(transport);
HelloWorldService.Client client = new HelloWorldService.Client(protocol);
transport.open();
String result = client.say("ThreadedSelector Client");
System.out.println("Result =: " + result);
transport.close();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭传输通道
transport.close();
}
}
}
}.start();
}
以上工作流程的三个组件AcceptThread
、SelectorThread
和ExecutorService
在源码中的定义如下:
TThreadedSelectorServer
模式中有一个专门的线程AcceptThread
用于处理新连接请求,因此能够及时响应大量并发连接请求;另外它将网络I/O操作分散到多个SelectorThread
线程中来完成,因此能够快速对网络I/O
进行读写操作,能够很好地应对网络I/O
较多的情况。
TThreadedSelectorServer
默认参数定义如下:
创建、初始化并启动AcceptThread
和SelectorThreads
,同时启动selector
线程的负载均衡器(selectorThreads
)。
AcceptThread
继承于Thread
,可以看出包含三个重要的属性:非阻塞式传输通道(TNonblockingServerTransport
)、NIO
选择器(acceptSelector
)和选择器线程负载均衡器(threadChooser
)。
查看AcceptThread
的run()
方法,可以看出accept
线程一旦启动,就会不停地调用select()
方法:
查看select()
方法,acceptSelector
选择器等待IO
事件的到来,拿到SelectionKey
即检查是不是accept
事件。如果是,通过handleAccept()
方法接收一个新来的连接;否则,如果是IO
读写事件,AcceptThread
不作任何处理,交由SelectorThread
完成。
在handleAccept()
方法中,先通过doAccept()
去拿连接通道,然后Selector
线程负载均衡器选择一个Selector
线程,完成接下来的IO
读写事件。
接下来继续查看doAddAccept()
方法的实现,毫无悬念,它进一步调用了SelectorThread
的addAcceptedConnection()
方法,把非阻塞传输通道对象传递给选择器线程做进一步的IO
读写操作。
SelectorThreadLoadBalancer
如何创建?
SelectorThreadLoadBalancer
是一个基于轮询算法的Selector
线程选择器,通过线程迭代器为新进来的连接顺序分配SelectorThread
。
SelectorThread
和AcceptThread
一样,是TThreadedSelectorServer
的一个成员内部类,每个SelectorThread
线程对象内部都有一个阻塞式的队列,用于存放该线程被接收的连接通道。
阻塞队列的大小可由构造函数指定:
上面看到,在AcceptThread
的doAddAccept()
方法中调用了SelectorThread
的addAcceptedConnection()
方法。
这个方法做了两件事:
SelectorThread
线程接收的连接通道放入阻塞队列中。wakeup()
方法唤醒SelectorThread
中的NIO
选择器selector
。既然SelectorThread
也是继承于Thread
,查看其run()
方法的实现:
SelectorThread
方法的select()
监听IO
事件,仅仅用于处理数据读取和数据写入。如果连接有数据可读,读取并以frame
的方式缓存;如果需要向连接中写入数据,缓存并发送客户端的数据。且在数据读写处理完成后,需要向NIO
的selector
清空和注销自身的SelectionKey
。
rpc
调用过程也就结束了,handleWrite()
方法如下:Thrift
会利用已读数据执行目标方法,handleRead()
方法如下:handleRead
方法在执行read()
方法,将数据读取完成后,会调用requestInvoke()
方法调用目标方法完成具体业务处理。requestInvoke()
方法将请求数据封装为一个Runnable
对象,提交到工作任务线程池(ExecutorService
)进行处理。
select()
方法完成后,线程继续运行processAcceptedConnections()
方法处理下一个连接的IO
事件。
这里比较核心的几个操作:
SelectorThread
的阻塞队列acceptedQueue
中获取一个连接的传输通道。如果获取成功,调用registerAccepted()
方法;否则,进入下一次循环。registerAccepted()
方法将传输通道底层的连接注册到NIO
的选择器selector
上面,获取到一个SelectionKey
。FrameBuffer
对象,并绑定到获取的SelectionKey
上面,用于数据传输时的中间读写缓存。本文对Thrift
的各种线程服务模型进行了介绍,包括2种阻塞式服务模型:TSimpleServer
、TThreadPoolServer
,3种非阻塞式服务模型:TNonblockingServer
、THsHaServer
和TThreadedSelectorServer
。对各种服务模型的具体用法、工作流程、原理和源码实现进行了一定程度的分析。
鉴于篇幅较长,请各位看官请慢慢批阅!
欢迎关注技术公众号: 零壹技术栈
本帐号将持续分享后端技术干货,包括虚拟机基础,多线程编程,高性能框架,异步、缓存和消息中间件,分布式和微服务,架构学习和进阶等学习资料和文章。
文章浏览阅读2.2k次。通过adb shell获取系统屏幕亮度命令: >adbshell"dumpsyspower|grepmScreenBrightnessSetting"mScreenBrightnessSetting=176mScreenBrightnessSettingMini..._adb shell获取亮度值
文章浏览阅读4.1k次,点赞3次,收藏23次。目录一、简介:二、SOME/IP-SD报文格式:三、EntriesArray:四、Option类型五、报文传输过程:六、服务发现通信行为七、状态机八、错误处理机制九、使用SOMEIP和SOMEIP-SD发布、订阅过程前言:熟悉SOMEIP-SD报文格式及通信过程;正文:一、简介:SomeIP-SD为服务发现,是SomeIP的一种特殊服务,SOME/IP-SD主要用于:1、定位服务实例2、检测服务实例是否在运行3、实现发布或订阅处理二、SOME_【some/ip通信系列】(九)解读some/ip-sd服务发现协议
文章浏览阅读1.2k次。前言前一章提到,怎么搭建一个简单的web服务,咱们搭建好了,是不是需要测试下自己代码的健壮性。所以我又找了个压力测试工具。发现简go_bench单实用。正文 安装方式一:go get github.com/linkxzhou/http_bench方式二:git clone [email protected]:linkxzhou/http_bench.git下载..._go bench
文章浏览阅读888次。先在项目设置的Physics中设置在内容浏览器增加物理材质在里面进行定义进入人物的physics中将自己创的定义进去这样角色就拥有了物理材质,接下来进入程序加上互动。呈现点效果进入项目.h文件中增加宏定义,=//就是给这两个物理材质增加个假名,等下容易进行调用#define SURFACE_FLESHDEFAULT SurfaceType1#define SU..._c2039"surfacetype": 不是 "tweakobjectptr
文章浏览阅读569次。话不多说,直接进入正题.首先,要在微信公众平台注册一个公众账号,我这边是用的测试账号.有了账号之后进入微信公众平台获取appID和appsecret以供调用接口.微信开发者文档地址:https://developers.weixin.qq.com/doc/offiaccount/OA_Web_Apps/Wechat_webpage_authorization.html首先判断coo..._java vue 微信公众号openid自动登录
文章浏览阅读1.7k次。问题描述:mysql执行还有自定义参数的sql语句会报错“Fatal error encountered during command execution”(执行命令时遇到的致命错误)问题原因:发帖的时候刚用msql不久,公司项目里遇见的,好像是mysql执行非过程语句步能使用变量吧,有清楚的朋友可以评论补充一下解决方法:数据库连接字符出添加”Allow User Variables=T..._fatal error encountered during command execution. csdn
文章浏览阅读3.6k次。#include <sys/system_properties.h> //读取序号 __system_property_get("ro.serialno",m_szDevID); LOGE("%s", m_szDevID); //读取机型 __system_property_get("ro.product.model",m_szDevModel); LO..._jni中过去当前sdk版本
文章浏览阅读66次。
文章浏览阅读8.9k次,点赞2次,收藏5次。在vscode的终端窗口运行命令:git config user.name “设置你的名字”git config user.email “这里面设置你的邮箱地址”然后再上传代码,重新运行项目。然后就可以了git config user.name 查看配置好的名称git config user.email 查看配置好的邮箱..._在visual studio code中每次提交都显示请确保在git中配置你的"user name"和"user.
文章浏览阅读809次。收稿日期: 2012 年 3 月 基于 MATLAB 的机床主轴结构优化设计 刘红娟宝鸡文理学院 摘要: 介绍了机床主轴的结构,建立了以质量最轻为目标函数的优化模型,运用 MATLAB 优化工具箱中的fmincon 函数对其进行优化设计。通过对已有的机床主轴实例进行优化求解和分析,对比优化前后的数据信息,表明优化之后的机床主轴质量更轻,且编程简单,设计效率高。最后绘出了实例的各设计变量和目标函数之..._机床主轴优化设计matlab程序
文章浏览阅读3.5k次,点赞3次,收藏10次。转载至https://www.cnblogs.com/dmtz/p/11091090.htmlfrom openpyxl import *class excel(): def __init__(self,file): self.file = file self.wb = load_workbook(self.file) sheets = self.wb.get_sheet_names() self.sheet = sheets_python get excel表的行与列值
文章浏览阅读4.5w次,点赞14次,收藏147次。处理代码分为两部分,第一部分用于去除边缘的突出部,第二部分用于边缘光滑。具体如下所示1.去除边缘突出部//去除二值图像边缘的突出部//uthreshold、vthreshold分别表示突出部的宽度阈值和高度阈值//type代表突出部的颜色,0表示黑色,1代表白色 void delete_jut(Mat& src, Mat& dst, int uthreshold, int vthre_opencv 凸出明显的地方删除