新聞中心
RPC框架的實(shí)現(xiàn)
相信很多小伙伴已經(jīng)看了很多Dubbo的八股文了。比如,Dubbo支持哪些序列化框架,支持哪些注冊(cè)中心,支持哪些集群容錯(cuò)策略,支持服務(wù)降級(jí)嗎?但是你知道Dubbo服務(wù)導(dǎo)出和服務(wù)引入的過(guò)程嗎?服務(wù)降級(jí)是如何實(shí)現(xiàn)的?等等

成都創(chuàng)新互聯(lián)成立于2013年,我們提供高端重慶網(wǎng)站建設(shè)公司、成都網(wǎng)站制作、成都網(wǎng)站設(shè)計(jì)、網(wǎng)站定制、成都營(yíng)銷(xiāo)網(wǎng)站建設(shè)、微信平臺(tái)小程序開(kāi)發(fā)、微信公眾號(hào)開(kāi)發(fā)、seo優(yōu)化排名服務(wù),提供專業(yè)營(yíng)銷(xiāo)思路、內(nèi)容策劃、視覺(jué)設(shè)計(jì)、程序開(kāi)發(fā)來(lái)完成項(xiàng)目落地,為石涼亭企業(yè)提供源源不斷的流量和訂單咨詢。
本文就從源碼的角度來(lái)分享一下Dubbo的整個(gè)調(diào)用過(guò)程(放心,圖示為主,輔助一少部分源碼)
「RPC框架的實(shí)現(xiàn)基本上都是如下架構(gòu)」
一個(gè)RPC調(diào)用的過(guò)程如下
- 調(diào)用方發(fā)送請(qǐng)求后由代理類將調(diào)用的方法,參數(shù)組裝成能進(jìn)行網(wǎng)絡(luò)傳輸?shù)南Ⅲw
- 調(diào)用方代理類將消息體發(fā)送到提供方
- 提供方代理類將消息進(jìn)行解碼,得到調(diào)用的方法和參數(shù)
- 提供方代理類執(zhí)行相應(yīng)的方法,并將結(jié)果返回
「協(xié)議,編解碼,序列化的部分不是本文的重點(diǎn),我就不分析了,有興趣的可以看我之前的文章?!?/p>
首先來(lái)手寫(xiě)一個(gè)極簡(jiǎn)版的RPC框架,以便你對(duì)上面的流程有一個(gè)更深的認(rèn)識(shí)
手寫(xiě)一個(gè)簡(jiǎn)單的PRC框架
封裝網(wǎng)絡(luò)請(qǐng)求對(duì)象
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class RpcRequest implements Serializable {
private String interfaceName;
private String methodName;
private Class>[] paramTypes;
private Object[] parameters;
}
根據(jù)interfaceName可以確定需要調(diào)用的接口,methodName和paramTypes則可以確定要調(diào)用接口的方法名,定位到具體的方法,傳入?yún)?shù)即可調(diào)用方法
封裝調(diào)用接口
封裝接口到api模塊,producer端寫(xiě)實(shí)現(xiàn)邏輯,consumer端寫(xiě)調(diào)用邏輯
public interface HelloService {
String sayHello(String content);
}
public interface UpperCaseService {
String toUpperCase(String content);
}
開(kāi)發(fā)producer端
public class HelloServiceImpl implements HelloService {
@Override
public String sayHello(String content) {
return "hello " + content;
}
}
public class UpperCaseServiceImpl implements UpperCaseService {
@Override
public String toUpperCase(String content) {
return content.toUpperCase();
}
}ServiceMap保存了producer端接口名和接口實(shí)現(xiàn)類的映射關(guān)系,這樣可以根據(jù)請(qǐng)求對(duì)象的接口名,找到對(duì)應(yīng)的實(shí)現(xiàn)類
public class ServiceMap {
// 接口名 -> 接口實(shí)現(xiàn)類
private static Map serviceMap = new HashMap<>();
public static void registerService(String serviceKey, Object service) {
serviceMap.put(serviceKey, service);
}
public static Object lookupService(String serviceKey) {
return serviceMap.get(serviceKey);
}
} 為了提高服務(wù)端的并發(fā)度,我們將每一個(gè)請(qǐng)求的處理過(guò)程放到線程池中
@Slf4j
public class RequestHandler implements Runnable {
private Socket socket;
public RequestHandler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try (ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream());
ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream())) {
RpcRequest rpcRequest = (RpcRequest) inputStream.readObject();
Object service = ServiceMap.lookupService(rpcRequest.getInterfaceName());
Method method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamTypes());
Object result = method.invoke(service, rpcRequest.getParameters());
outputStream.writeObject(result);
} catch (Exception e) {
log.error("invoke method error", e);
throw new RuntimeException("invoke method error");
}
}
}
啟動(dòng)服務(wù)端
public class RpcProviderMain {
private static final ExecutorService executorService = Executors.newCachedThreadPool();
public static void main(String[] args) throws Exception {
HelloService helloService = new HelloServiceImpl();
UpperCaseService upperCaseService = new UpperCaseServiceImpl();
// 將需要暴露的接口注冊(cè)到serviceMap中
ServiceMap.registerService(HelloService.class.getName(), helloService);
ServiceMap.registerService(UpperCaseService.class.getName(), upperCaseService);
ServerSocket serverSocket = new ServerSocket(8080);
while (true) {
// 獲取一個(gè)套接字(阻塞)。所以為了并行,來(lái)一個(gè)請(qǐng)求,開(kāi)一個(gè)線程處理
// 為了復(fù)用線程,用了threadPool
final Socket socket = serverSocket.accept();
executorService.execute(new RequestHandler(socket));
}
}
}
開(kāi)發(fā)consumer端
前面說(shuō)過(guò),我們要通過(guò)動(dòng)態(tài)代理對(duì)象解耦方法調(diào)用和網(wǎng)絡(luò)調(diào)用,所以接下來(lái)我們就寫(xiě)一下動(dòng)態(tài)代理對(duì)象的實(shí)現(xiàn)邏輯
生成一個(gè)代理對(duì)象的過(guò)程很簡(jiǎn)單
實(shí)現(xiàn)InvocationHandler接口,在invoke方法中增加代理邏輯
調(diào)用Proxy.newProxyInstance方法生成代理對(duì)象,3個(gè)參數(shù)分別是ClassLoader,代理對(duì)象需要實(shí)現(xiàn)的接口數(shù)組,InvocationHandler接口實(shí)現(xiàn)類
當(dāng)執(zhí)行代理執(zhí)行實(shí)現(xiàn)的接口方法時(shí),會(huì)調(diào)用到InvocationHandler#invoke,這個(gè)方法中增加了代理邏輯哈。
public class ConsumerProxy {
public static T getProxy(final Class interfaceClass, final String host, final int port) {
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
new Class>[]{interfaceClass}, new ConsumerInvocationHandler(host, port));
}
} 可以看到代理對(duì)象的主要功能就是組裝請(qǐng)求參數(shù),然后發(fā)起網(wǎng)絡(luò)調(diào)用
@Slf4j
public class ConsumerInvocationHandler implements InvocationHandler {
private String host;
private Integer port;
public ConsumerInvocationHandler(String host, Integer port) {
this.host = host;
this.port = port;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
try (Socket socket = new Socket(host, port);
ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream())) {
RpcRequest rpcRequest = RpcRequest.builder()
.interfaceName(method.getDeclaringClass().getName())
.methodName(method.getName())
.paramTypes(method.getParameterTypes())
.parameters(args).build();
outputStream.writeObject(rpcRequest);
Object result = inputStream.readObject();
return result;
} catch (Exception e) {
log.error("consumer invoke error", e);
throw new RuntimeException("consumer invoke error");
}
}
}
此時(shí)我們只需要通過(guò)ConsumerProxy#getProxy方法,就能很方便的獲取到代理對(duì)象。通過(guò)代理對(duì)象調(diào)用遠(yuǎn)程方法和調(diào)用本地方法一樣方便
public class RpcConsumerMain {
public static void main(String[] args) {
// 因?yàn)檫@是一個(gè)小demo,就不拆分多模塊了
// 這個(gè)HelloService是通過(guò)網(wǎng)絡(luò)調(diào)用的HelloServiceImpl,而不是本地調(diào)用
HelloService helloService = ConsumerProxy.getProxy(HelloService.class, "127.0.0.1", 8080);
// hello world
System.out.println(helloService.sayHello("world"));
UpperCaseService upperCaseService = ConsumerProxy.getProxy(UpperCaseService.class, "127.0.0.1", 8080);
// THIS IS CONTENT
System.out.println(upperCaseService.toUpperCase("this is content"));
}
}至此我們已經(jīng)把一個(gè)RPC框架最核心的功能就實(shí)現(xiàn)了,是不是很簡(jiǎn)單?!钙鋵?shí)Dubbo的源碼也很簡(jiǎn)單,只不過(guò)增加了很多擴(kuò)展功能,所以大家有時(shí)候會(huì)認(rèn)為比較難?!?/p>
所以我們就來(lái)分析一下核心的擴(kuò)展功能。比如Filter,服務(wù)降級(jí),集群容錯(cuò)等是如何實(shí)現(xiàn)的?其他的擴(kuò)展功能,比如支持多種注冊(cè)中心,支持多種序列化框架,支持多種協(xié)議,基本不會(huì)打交道,所以就不浪費(fèi)時(shí)間了
從前面的圖示我們知道,代理類在服務(wù)調(diào)用和響應(yīng)過(guò)程中扮演著重要的角色。「在Dubbo中,代理類有個(gè)專有名詞叫做Invoker,而Dubbo中就是通過(guò)對(duì)這個(gè)Invoker不斷進(jìn)行代理增加各種新功能的」
Dubbo服務(wù)導(dǎo)出
「當(dāng)?shù)谌娇蚣芟牒蚐pring整合時(shí),有哪些方式?」
實(shí)現(xiàn)BeanFactoryPostProcessor接口(對(duì)BeanFactory進(jìn)行擴(kuò)展)
實(shí)現(xiàn)BeanPostProcessor接口(對(duì)Bean的生成過(guò)程進(jìn)行擴(kuò)展)
Dubbo也不例外,當(dāng)Dubbo和Spring整合時(shí),會(huì)往容器中注入2個(gè)BeanPostProcessor,作用如下
ServiceAnnotationBeanPostProcessor,將@Service注解的類封裝成ServiceBean注入容器 ReferenceAnnotationBeanPostProcessor,將@Reference注解的接口封裝成ReferenceBean注入容器
所以服務(wù)導(dǎo)出和服務(wù)引入肯定和ServiceBean和ReferenceBean的生命周期有關(guān)。
「ServiceBean實(shí)現(xiàn)了ApplicationListener接口,當(dāng)收到ContextRefreshedEvent事件時(shí)(即Spring容器啟動(dòng)完成)開(kāi)始服務(wù)導(dǎo)出?!?/p>
服務(wù)導(dǎo)出比較重要的2個(gè)步驟就是
將服務(wù)注冊(cè)到zk(我們后面的分析,注冊(cè)中心都基于zk哈)
將服務(wù)對(duì)象包裝成Invoker,并保存在一個(gè)map中,key為服務(wù)名,value為Invoker對(duì)象
「當(dāng)收到請(qǐng)求時(shí),根據(jù)服務(wù)名找到Invoker對(duì)象,Invoker對(duì)象根據(jù)方法名和參數(shù)反射執(zhí)行方法,然后將結(jié)果返回?!?/p>
這里留個(gè)小問(wèn)題,反射執(zhí)行方式效率會(huì)很低,那么在Dubbo中還有哪些解決方案呢?
從圖中可以看到AbstractProxyInvoker被其他Invoker進(jìn)行代理了,而這些Invoker是用來(lái)執(zhí)行Filter的,一個(gè)Invoker代理類執(zhí)行一個(gè)Filter,層層進(jìn)行代理
「如下圖為Dubbo收到請(qǐng)求層層調(diào)用的過(guò)程」
Dubbo服務(wù)引入
前面我們已經(jīng)推斷出來(lái)服務(wù)導(dǎo)出和ReferenceBean有關(guān)。我們來(lái)看看具體在哪個(gè)階段?ReferenceBean實(shí)現(xiàn)了FactoryBean接口,并重寫(xiě)了getObject方法,在這個(gè)方法中進(jìn)行服務(wù)導(dǎo)出。因此我們推斷服務(wù)導(dǎo)出的時(shí)機(jī)是ReferenceBean被其他對(duì)象注入時(shí)
public Object getObject() {
return get();
}接下來(lái)就是從注冊(cè)中心獲取服務(wù)地址,構(gòu)建Invoker對(duì)象,并基于Invoker對(duì)象構(gòu)建動(dòng)態(tài)代理類,賦值給接口。
最終能發(fā)起網(wǎng)絡(luò)調(diào)用的是DubboInvoker,而這個(gè)Invoker被代理了很多層,用來(lái)實(shí)現(xiàn)各種擴(kuò)展功能。
服務(wù)降級(jí)
第一個(gè)就是服務(wù)降級(jí),什么是服務(wù)降級(jí)呢?
「當(dāng)服務(wù)可不用時(shí),我們不希望拋出異常,而是返回特定的值(友好的提示等),這時(shí)候我們就可以用到服務(wù)降級(jí)?!?/p>
dubbo中有很多服務(wù)降級(jí)策略,簡(jiǎn)單舉幾個(gè)例子
force: 代表強(qiáng)制使用 Mock 行為,在這種情況下不會(huì)走遠(yuǎn)程調(diào)用 fail: 只有當(dāng)遠(yuǎn)程調(diào)用發(fā)生錯(cuò)誤時(shí)才使用 Mock 行為
假如有如下一個(gè)controller,調(diào)用DemoService獲取值,但是DemoService并沒(méi)有啟動(dòng)
@RestController
public class DemoController {
@Reference(check = false, mock = "force:return mock")
private DemoService demoService;
@RequestMapping("hello")
public String hello(@RequestParam("msg") String msg) {
return demoService.hello(msg);
}
}
可以看到直接返回mock字符串(也并不會(huì)發(fā)生網(wǎng)絡(luò)調(diào)用)
將@Reference的mock屬性改為如下,再次調(diào)用
@RestController
public class DemoController {
@Reference(check = false, mock = "fail:return fail")
private DemoService demoService;
@RequestMapping("hello")
public String hello(@RequestParam("msg") String msg) {
return demoService.hello(msg);
}
}
會(huì)發(fā)起網(wǎng)絡(luò)調(diào)用,調(diào)用失敗,然后返回fail。
「dubbo中的服務(wù)降級(jí)只用了MockClusterInvoker這一個(gè)類來(lái)實(shí)現(xiàn),因此相對(duì)于Hystrix等功能很簡(jiǎn)單,實(shí)現(xiàn)也很簡(jiǎn)單,如下圖。」
當(dāng)Reference不配置mock屬性或者屬性為false時(shí),表示不進(jìn)行降級(jí),直接調(diào)用代理對(duì)象即可
以屬性以force開(kāi)頭時(shí),表示直接進(jìn)行降級(jí),都不會(huì)發(fā)生網(wǎng)絡(luò)調(diào)用
其他請(qǐng)求就是在進(jìn)行網(wǎng)絡(luò)失敗后才進(jìn)行降級(jí)
集群容錯(cuò)
過(guò)了服務(wù)降級(jí)這一層,接下來(lái)就到了集群容錯(cuò)了。
dubbo中有很多集群容錯(cuò)策略
|
容錯(cuò)策略 |
解釋 |
代理類 |
|
AvailableCluster |
找到一個(gè)可用的節(jié)點(diǎn),直接發(fā)起調(diào)用 |
AbstractClusterInvoker匿名內(nèi)部類 |
|
FailoverCluster |
失敗重試(默認(rèn)) |
FailoverClusterInvoker |
|
FailfastCluster |
快速失敗 |
FailfastClusterInvoker |
|
FailsafeCluster |
安全失敗 |
FailsafeClusterInvoker |
|
FailbackCluster |
失敗自動(dòng)恢復(fù) |
FailbackClusterInvoker |
|
ForkingCluster |
并行調(diào)用 |
ForkingClusterInvoker |
|
BroadcastCluster |
廣播調(diào)用 |
BroadcastClusterInvoker |
Failover Cluster:失敗自動(dòng)切換,當(dāng)出現(xiàn)失敗,重試其它服務(wù)器。通常用于讀操作,但重試會(huì)帶來(lái)更長(zhǎng)延遲。
Failfast Cluster:快速失敗,只發(fā)起一次調(diào)用,失敗立即報(bào)錯(cuò)。通常用于非冪等性的寫(xiě)操作,比如新增記錄。
Failsafe Cluster:失敗安全,出現(xiàn)異常時(shí),直接忽略。通常用于寫(xiě)入審計(jì)日志等操作。
Failback Cluster:失敗自動(dòng)恢復(fù),后臺(tái)記錄失敗請(qǐng)求,定時(shí)重發(fā)。通常用于消息通知操作。
Forking Cluster:并行調(diào)用多個(gè)服務(wù)器,只要一個(gè)成功即返回。通常用于實(shí)時(shí)性要求較高的讀操作,但需要浪費(fèi)更多服務(wù)資源??赏ㄟ^(guò) forks=”2″ 來(lái)設(shè)置最大并行數(shù)。
Broadcast Cluster:廣播調(diào)用所有提供者,逐個(gè)調(diào)用,任意一臺(tái)報(bào)錯(cuò)則報(bào)錯(cuò) 。通常用于通知所有提供者更新緩存或日志等本地資源信息。
「讀操作建議使用 Failover 失敗自動(dòng)切換,默認(rèn)重試兩次其他服務(wù)器。寫(xiě)操作建議使用 Failfast 快速失敗,發(fā)一次調(diào)用失敗就立即報(bào)錯(cuò)?!?/p>
不知道你發(fā)現(xiàn)沒(méi)?「換集群容錯(cuò)策略就是換DubboInvoker的代理類」
集群容錯(cuò)相關(guān)的代理類都有一個(gè)共同的屬性RegistryDirectory,這個(gè)是一個(gè)很重要的組件,它用List保存了服務(wù)提供者對(duì)應(yīng)的所有Invoker。
「更牛逼的是這個(gè)List是動(dòng)態(tài)變化的,當(dāng)服務(wù)提供者下線時(shí),會(huì)觸發(fā)相應(yīng)的事件,調(diào)用方會(huì)監(jiān)聽(tīng)這個(gè)事件,并把對(duì)應(yīng)的Invoker刪除,這樣后續(xù)就不會(huì)調(diào)用到下線的服務(wù)了。當(dāng)有新的服務(wù)提供者時(shí),會(huì)觸發(fā)生成新的Invoker?!?/p>
當(dāng)一個(gè)服務(wù)的多個(gè)Invoker擺在我們面前時(shí),該選擇哪個(gè)來(lái)調(diào)用呢?這就不得不提到負(fù)載均衡策略了。
|
負(fù)載均衡策略實(shí)現(xiàn)類 |
解釋 |
|
RandomLoadBalance |
隨機(jī)策略(默認(rèn)) |
|
RoundRobinLoadBalance |
輪詢策略 |
|
LeastActiveLoadBalance |
最少活躍調(diào)用數(shù) |
|
ConsistentHashLoadBalance |
一致性hash策略 |
「我們只需要通過(guò)合適的負(fù)載均衡策略來(lái)選擇即可」
和服務(wù)端類似類似,最終能發(fā)送網(wǎng)絡(luò)請(qǐng)求的Invoker還會(huì)被Filter對(duì)應(yīng)的Invoker類所代理,一個(gè)Filter一個(gè)代理類,層層代理。
如下圖為Dubbo發(fā)送請(qǐng)求時(shí)層層調(diào)用的過(guò)程
好了,Dubbo一些比較重要的擴(kuò)展點(diǎn)就分享完了,整個(gè)請(qǐng)求響應(yīng)的基本過(guò)程也串下來(lái)了!
文章名稱:你管這個(gè)叫Dubbo?
文章位置:http://m.fisionsoft.com.cn/article/cdjcceh.html


咨詢
建站咨詢
