新聞中心
背景介紹
在業(yè)務(wù)開發(fā)過程中,存在這樣的場景:程序接收到數(shù)據(jù)后,調(diào)用其他接口再將數(shù)據(jù)轉(zhuǎn)發(fā)出去;如果接收一條轉(zhuǎn)發(fā)一條,效率是比較低的,所以一個思路是先將數(shù)據(jù)緩存起來,緩存到一定數(shù)量后一次性轉(zhuǎn)發(fā)出去。

10余年的柳江網(wǎng)站建設(shè)經(jīng)驗,針對設(shè)計、前端、開發(fā)、售后、文案、推廣等六對一服務(wù),響應(yīng)快,48小時及時工作處理。營銷型網(wǎng)站的優(yōu)勢是能夠根據(jù)用戶設(shè)備顯示端的尺寸不同,自動調(diào)整柳江建站的顯示方式,使網(wǎng)站能夠適用不同顯示終端,在瀏覽器中調(diào)整網(wǎng)站的寬度,無論在任何一種瀏覽器上瀏覽網(wǎng)站,都能展現(xiàn)優(yōu)雅布局與設(shè)計,從而大程度地提升瀏覽體驗。成都創(chuàng)新互聯(lián)從事“柳江網(wǎng)站設(shè)計”,“柳江網(wǎng)站推廣”以來,每個客戶項目都認(rèn)真落實執(zhí)行。
有優(yōu)點就有缺點,需要根據(jù)業(yè)務(wù)場景進(jìn)行考量:
- 在QPS較小的情況下,達(dá)到閾值的等待時間較長,造成數(shù)據(jù)延遲較大
- 在應(yīng)用發(fā)布的時候,緩存的數(shù)據(jù)存在丟失的可能性
- 在應(yīng)用非正常down掉的情況下,緩存的數(shù)據(jù)存在丟失的可能性
下面內(nèi)容是對Hystrix請求合并及根據(jù)Hystrix請求合并原理自定義實現(xiàn)的簡化版本。
Hystrix請求合并
什么是請求合并
Without Collapsing
without collapsing
With Collapsing
with collapsing
請求合并設(shè)計思路
design
Hystrix使用示例
示例采用Spring-Boot編寫,下面代碼拷貝到工程中可以直接運(yùn)行。
添加依賴
下面是spring與hystrix集成的依賴pom。
pom
org.springframework.cloud
spring-cloud-starter-netflix-hystrix
2.1.6.RELEASE
啟動類
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.hystrix.EnableHystrix;
@EnableHystrix
@SpringBootApplication
public class Application {
public static void main(String[] args){
SpringApplication.run(Application.class, args);
}
}
使用示例
HystrixController
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@RestController
public class HystrixController {
@Resource
private HystrixService hystrixService;
@RequestMapping("/byid")
public Long byId(Long id) throws InterruptedException, ExecutionException {
Futurefuture = hystrixService.byId(id);
return future.get();
}
}
HystrixService
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.concurrent.Future;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixProperty;
@Service
public class HystrixService {
@HystrixCollapser(batchMethod="byIds",scope= com.netflix.hystrix.HystrixCollapser.Scope.GLOBAL,
collapserProperties={
@HystrixProperty(name="maxRequestsInBatch",value="10"),
@HystrixProperty(name="timerDelayInMilliseconds",value="1000")
})
public FuturebyId(Long id){
return null;
}
@HystrixCommand
public ListbyIds(List ids){
System.out.println(ids);
return ids;
}
}
測試類
發(fā)送請求進(jìn)行驗證。
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class HystrixTest {
public static void main(String[] args) throws Exception{
CloseableHttpClient httpClient = HttpClients.custom().setMaxConnPerRoute(100).build();
String url = "http://localhost:8086/byid?id=";
ExecutorService executorService = Executors.newFixedThreadPool(20);
int requestCount = 20;
for(int i = 0;i < requestCount;i++){
final int id = i;
executorService.execute(new Runnable() {
@Override
public void run(){
try{
HttpGet httpGet = new HttpGet(url+ id);
HttpResponse response = httpClient.execute(httpGet);
System.out.println(response);
}catch (Exception e){
e.printStackTrace();
}
}
});
}
Thread.sleep(1000*10);
executorService.shutdown();
httpClient.close();
}
}
簡化版本實現(xiàn)
由于Hystrix已不再維護(hù),同時考慮到Hystrix使用RxJava的學(xué)習(xí)門檻,根據(jù)HystrixCollapser設(shè)計思路及常見業(yè)務(wù)功能需求實現(xiàn)了一個簡化版本。
實現(xiàn)
RequestCollapserFactory
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class RequestCollapserFactory {
private static RequestCollapserFactory factory;
private static final int MAXBATCHSIZE = 20;
private static final long DELAY = 1000L;
private static final long PERIOD = 500L;
private ConcurrentHashMapcollapsers;
private ScheduledExecutorService executor;
private RequestCollapserFactory(){
collapsers = new ConcurrentHashMap<>();
ThreadFactory threadFactory = new ThreadFactory() {
final AtomicInteger counter = new AtomicInteger();
@Override
public Thread newThread(Runnable r){
Thread thread = new Thread(r, "RequestCollapserTimer-" + counter.incrementAndGet());
thread.setDaemon(true);
return thread;
}
};
executor = new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), threadFactory);
}
public static RequestCollapserFactory getInstance(){
if(factory != null){
return factory;
}
synchronized (RequestCollapserFactory.class){
if(factory != null){
return factory;
}
factory = new RequestCollapserFactory();
}
return factory;
}
public RequestCollapser getRequestCollapser(String key,RequestBatch requestBatch){
return getRequestCollapser(key,requestBatch,MAXBATCHSIZE,DELAY,PERIOD);
}
public RequestCollapser getRequestCollapser(String key,RequestBatch requestBatch,int maxBatchSize){
return getRequestCollapser(key,requestBatch,maxBatchSize,DELAY,PERIOD);
}
public RequestCollapser getRequestCollapser(String key,RequestBatch requestBatch,int maxBatchSize,long delay, long period){
RequestCollapser collapser = collapsers.get(key);
if(collapser != null){
return collapser;
}
synchronized (collapsers){
collapser = collapsers.get(key);
if(collapser != null){
return collapser;
}
collapser = new RequestCollapser(requestBatch,maxBatchSize,delay,period,executor);
collapsers.put(key,collapser);
return collapser;
}
}
}
RequestCollapser
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class RequestCollapser {
private int maxBatchSize;
private long delay;
private long period;
private LinkedBlockingQueue
RequestBatch
import java.util.List;
public interface RequestBatch {
boolean batch(ListobjectList);
}
驗證測試
RequestCollapserTest
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
public class RequestCollapserTest {
private static AtomicInteger counter = new AtomicInteger();
private static long delay = 1;
private static long period = 1;
private static int maxBatchSize = 20;
private static int requestCount = 50000;
private static RequestCollapserFactory factory = RequestCollapserFactory.getInstance();
private static RequestBatch requestBatch = new RequestBatch() {
@Override
public boolean batch(ListobjectList){
int size = objectList.size();
counter.addAndGet(size);
System.out.println(counter + ":::::" + size + ":::::" + objectList);
return true;
}
};
public static void main(String[] args) throws Exception{
//sync();
async();
}
public static void async() throws Exception{
ExecutorService executorService = Executors.newFixedThreadPool(20);
CountDownLatch countDownLatch = new CountDownLatch(requestCount);
for(int i = 0;i < requestCount;i++){
final int id = i;
executorService.execute(new Runnable() {
@Override
public void run(){
try{
RequestCollapser requestCollapser =
factory.getRequestCollapser("1",requestBatch,maxBatchSize,delay,period);
requestCollapser.submitRequest(id,true);
}catch (Exception e){
e.printStackTrace();
}
countDownLatch.countDown();
}
});
}
executorService.shutdown();
countDownLatch.await();
Thread.sleep(1000);
System.out.println(counter.get());
}
public static void sync() throws Exception{
for(int i = 0;i < requestCount;i++){
final int id = i;
RequestCollapser requestCollapser =
factory.getRequestCollapser("1",requestBatch,maxBatchSize,delay,period);
requestCollapser.submitRequest(id,true);
}
Thread.sleep(1000);
System.out.println(counter.get());
}
}
網(wǎng)站欄目:性能優(yōu)化之Hystrix請求合并&自實現(xiàn)簡化版本
網(wǎng)頁URL:http://m.fisionsoft.com.cn/article/dpdjohs.html


咨詢
建站咨詢
