日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關咨詢
選擇下列產品馬上在線溝通
服務時間:8:30-17:00
你可能遇到了下面的問題
關閉右側工具欄

新聞中心

這里有您想知道的互聯網營銷解決方案
實現異步編程,這個工具類你得掌握!

 [[393773]]

本文轉載自微信公眾號「月伴飛魚」,作者日常加油站。轉載本文請聯系月伴飛魚公眾號。

我們提供的服務有:成都網站設計、成都網站制作、外貿網站建設、微信公眾號開發(fā)、網站優(yōu)化、網站認證、韶山ssl等。為上千企事業(yè)單位解決了網站和推廣的問題。提供周到的售前咨詢和貼心的售后服務,是有科學管理、有技術的韶山網站制作公司

前言

最近看公司代碼,多線程編程用的比較多,其中有對CompletableFuture的使用,所以想寫篇文章總結下

在日常的Java8項目開發(fā)中,CompletableFuture是很強大的并行開發(fā)工具,其語法貼近java8的語法風格,與stream一起使用也能大大增加代碼的簡潔性

大家可以多應用到工作中,提升接口性能,優(yōu)化代碼

基本介紹

CompletableFuture是Java 8新增的一個類,用于異步編程,繼承了Future和CompletionStage

這個Future主要具備對請求結果獨立處理的功能,CompletionStage用于實現流式處理,實現異步請求的各個階段組合或鏈式處理,因此completableFuture能實現整個異步調用接口的扁平化和流式處理,解決原有Future處理一系列鏈式異步請求時的復雜編碼

Future的局限性

1、Future 的結果在非阻塞的情況下,不能執(zhí)行更進一步的操作

我們知道,使用Future時只能通過isDone()方法判斷任務是否完成,或者通過get()方法阻塞線程等待結果返回,它不能非阻塞的情況下,執(zhí)行更進一步的操作。

2、不能組合多個Future的結果

假設你有多個Future異步任務,你希望最快的任務執(zhí)行完時,或者所有任務都執(zhí)行完后,進行一些其他操作

3、多個Future不能組成鏈式調用

當異步任務之間有依賴關系時,Future不能將一個任務的結果傳給另一個異步任務,多個Future無法創(chuàng)建鏈式的工作流。

4、沒有異常處理

現在使用CompletableFuture能幫助我們完成上面的事情,讓我們編寫更強大、更優(yōu)雅的異步程序

基本使用

創(chuàng)建異步任務

通??梢允褂孟旅鎺讉€CompletableFuture的靜態(tài)方法創(chuàng)建一個異步任務

 
 
 
 
  1. public static CompletableFuture runAsync(Runnable runnable);              //創(chuàng)建無返回值的異步任務
  2. public static CompletableFuture runAsync(Runnable runnable, Executor executor);     //無返回值,可指定線程池(默認使用ForkJoinPool.commonPool)
  3. public static  CompletableFuture supplyAsync(Supplier supplier);           //創(chuàng)建有返回值的異步任務
  4. public static  CompletableFuture supplyAsync(Supplier supplier, Executor executor); //有返回值,可指定線程池

使用示例:

 
 
 
 
  1. Executor executor = Executors.newFixedThreadPool(10);
  2. CompletableFuture future = CompletableFuture.runAsync(() -> {
  3.     //do something
  4. }, executor);
  5. int poiId = 111;
  6. CompletableFuture future = CompletableFuture.supplyAsync(() -> {
  7.  PoiDTO poi = poiService.loadById(poiId);
  8.   return poi.getName();
  9. });
  10. // Block and get the result of the Future
  11. String poiName = future.get();

使用回調方法

通過future.get()方法獲取異步任務的結果,還是會阻塞的等待任務完成

CompletableFuture提供了幾個回調方法,可以不阻塞主線程,在異步任務完成后自動執(zhí)行回調方法中的代碼

 
 
 
 
  1. public CompletableFuture thenRun(Runnable runnable);            //無參數、無返回值
  2. public CompletableFuture thenAccept(Consumer action);         //接受參數,無返回值
  3. public  CompletableFuture thenApply(Function fn); //接受參數T,有返回值U

使用示例:

 
 
 
 
  1. CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello")
  2.                            .thenRun(() -> System.out.println("do other things. 比如異步打印日志或發(fā)送消息"));
  3. //如果只想在一個CompletableFuture任務執(zhí)行完后,進行一些后續(xù)的處理,不需要返回值,那么可以用thenRun回調方法來完成。
  4. //如果主線程不依賴thenRun中的代碼執(zhí)行完成,也不需要使用get()方法阻塞主線程。
 
 
 
 
  1. CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello")
  2.                            .thenAccept((s) -> System.out.println(s + " world"));
  3. //輸出:Hello world
  4. //回調方法希望使用異步任務的結果,并不需要返回值,那么可以使用thenAccept方法
 
 
 
 
  1. CompletableFuture future = CompletableFuture.supplyAsync(() -> {
  2.   PoiDTO poi = poiService.loadById(poiId);
  3.   return poi.getMainCategory();
  4. }).thenApply((s) -> isMainPoi(s));   // boolean isMainPoi(int poiId);
  5. future.get();
  6. //希望將異步任務的結果做進一步處理,并需要返回值,則使用thenApply方法。
  7. //如果主線程要獲取回調方法的返回,還是要用get()方法阻塞得到

組合兩個異步任務

 
 
 
 
  1. //thenCompose方法中的異步任務依賴調用該方法的異步任務
  2. public  CompletableFuture thenCompose(Function> fn); 
  3. //用于兩個獨立的異步任務都完成的時候
  4. public  CompletableFuture thenCombine(CompletionStage other, 
  5.                                               BiFunction fn); 

使用示例:

 
 
 
 
  1. CompletableFuture> poiFuture = CompletableFuture.supplyAsync(
  2.   () -> poiService.queryPoiIds(cityId, poiId)
  3. );
  4. //第二個任務是返回CompletableFuture的異步方法
  5. CompletableFuture> getDeal(List poiIds){
  6.   return CompletableFuture.supplyAsync(() ->  poiService.queryPoiIds(poiIds));
  7. }
  8. //thenCompose
  9. CompletableFuture> resultFuture = poiFuture.thenCompose(poiIds -> getDeal(poiIds));
  10. resultFuture.get();

thenCompose和thenApply的功能類似,兩者區(qū)別在于thenCompose接受一個返回CompletableFuture的Function,當想從回調方法返回的CompletableFuture中直接獲取結果U時,就用thenCompose

如果使用thenApply,返回結果resultFuture的類型是CompletableFuture>>,而不是CompletableFuture>

 
 
 
 
  1. CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello")
  2.   .thenCombine(CompletableFuture.supplyAsync(() -> "world"), (s1, s2) -> s1 + s2);
  3. //future.get()

組合多個CompletableFuture

當需要多個異步任務都完成時,再進行后續(xù)處理,可以使用allOf方法

 
 
 
 
  1. CompletableFuture poiIDTOFuture = CompletableFuture
  2.  .supplyAsync(() -> poiService.loadPoi(poiId))
  3.   .thenAccept(poi -> {
  4.     model.setModelTitle(poi.getShopName());
  5.     //do more thing
  6.   });
  7. CompletableFuture productFuture = CompletableFuture
  8.  .supplyAsync(() -> productService.findAllByPoiIdOrderByUpdateTimeDesc(poiId))
  9.   .thenAccept(list -> {
  10.     model.setDefaultCount(list.size());
  11.     model.setMoreDesc("more");
  12.   });
  13. //future3等更多異步任務,這里就不一一寫出來了
  14. CompletableFuture.allOf(poiIDTOFuture, productFuture, future3, ...).join();  //allOf組合所有異步任務,并使用join獲取結果

該方法挺適合C端的業(yè)務,比如通過poiId異步的從多個服務拿門店信息,然后組裝成自己需要的模型,最后所有門店信息都填充完后返回

這里使用了join方法獲取結果,它和get方法一樣阻塞的等待任務完成

多個異步任務有任意一個完成時就返回結果,可以使用anyOf方法

 
 
 
 
  1. CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
  2.     try {
  3.         TimeUnit.SECONDS.sleep(2);
  4.     } catch (InterruptedException e) {
  5.        throw new IllegalStateException(e);
  6.     }
  7.     return "Result of Future 1";
  8. });
  9. CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
  10.     try {
  11.         TimeUnit.SECONDS.sleep(1);
  12.     } catch (InterruptedException e) {
  13.        throw new IllegalStateException(e);
  14.     }
  15.     return "Result of Future 2";
  16. });
  17. CompletableFuture future3 = CompletableFuture.supplyAsync(() -> {
  18.     try {
  19.         TimeUnit.SECONDS.sleep(3);
  20.     } catch (InterruptedException e) {
  21.        throw new IllegalStateException(e);
  22.       return "Result of Future 3";
  23. });
  24. CompletableFuture anyOfFuture = CompletableFuture.anyOf(future1, future2, future3);
  25. System.out.println(anyOfFuture.get()); // Result of Future 2
  26. 異常處理

     
     
     
     
    1. Integer age = -1;
    2. CompletableFuture maturityFuture = CompletableFuture.supplyAsync(() -> {
    3.   if(age < 0) {
    4.     throw new IllegalArgumentException("Age can not be negative");
    5.   }
    6.   if(age > 18) {
    7.     return "Adult";
    8.   } else {
    9.     return "Child";
    10.   }
    11. }).exceptionally(ex -> {
    12.   System.out.println("Oops! We have an exception - " + ex.getMessage());
    13.   return "Unknown!";
    14. }).thenAccept(s -> System.out.print(s));
    15. //Unkown!

    exceptionally方法可以處理異步任務的異常,在出現異常時,給異步任務鏈一個從錯誤中恢復的機會,可以在這里記錄異?;蚍祷匾粋€默認值

    使用handler方法也可以處理異常,并且無論是否發(fā)生異常它都會被調用

     
     
     
     
    1. Integer age = -1;
    2. CompletableFuture maturityFuture = CompletableFuture.supplyAsync(() -> {
    3.     if(age < 0) {
    4.         throw new IllegalArgumentException("Age can not be negative");
    5.     }
    6.     if(age > 18) {
    7.         return "Adult";
    8.     } else {
    9.         return "Child";
    10.     }
    11. }).handle((res, ex) -> {
    12.     if(ex != null) {
    13.         System.out.println("Oops! We have an exception - " + ex.getMessage());
    14.         return "Unknown!";
    15.     }
    16.     return res;
    17. });

    分片處理

    分片和并行處理:分片借助stream實現,然后通過CompletableFuture實現并行執(zhí)行,最后做數據聚合(其實也是stream的方法)

    CompletableFuture并不提供單獨的分片api,但可以借助stream的分片聚合功能實現

    舉個例子:

     
     
     
     
    1. //請求商品數量過多時,做分批異步處理
    2. List> skuBaseIdsList = ListUtils.partition(skuIdList, 10);//分片
    3. //并行
    4. List>> futureList = Lists.newArrayList();
    5. for (List skuId : skuBaseIdsList) {
    6.   CompletableFuture> tmpFuture = getSkuSales(skuId);
    7.   futureList.add(tmpFuture);
    8. }
    9. //聚合
    10. futureList.stream().map(CompletalbleFuture::join).collent(Collectors.toList());

    舉個例子

    帶大家領略下CompletableFuture異步編程的優(yōu)勢

    這里我們用CompletableFuture實現水泡茶程序

    首先還是需要先完成分工方案,在下面的程序中,我們分了3個任務:

    • 任務1負責洗水壺、燒開水
    • 任務2負責洗茶壺、洗茶杯和拿茶葉
    • 任務3負責泡茶。其中任務3要等待任務1和任務2都完成后才能開始

    下面是代碼實現,你先略過runAsync()、supplyAsync()、thenCombine()這些不太熟悉的方法,從大局上看,你會發(fā)現:

    • 無需手工維護線程,沒有繁瑣的手工維護線程的工作,給任務分配線程的工作也不需要我們關注;
    • 語義更清晰,例如 f3 = f1.thenCombine(f2, ()->{}) 能夠清晰地表述任務3要等待任務1和任務2都完成后才能開始;
    • 代碼更簡練并且專注于業(yè)務邏輯,幾乎所有代碼都是業(yè)務邏輯相關的
     
     
     
     
    1. //任務1:洗水壺->燒開水
    2. CompletableFuture f1 = 
    3.   CompletableFuture.runAsync(()->{
    4.   System.out.println("T1:洗水壺...");
    5.   sleep(1, TimeUnit.SECONDS);
    6.   System.out.println("T1:燒開水...");
    7.   sleep(15, TimeUnit.SECONDS);
    8. });
    9. //任務2:洗茶壺->洗茶杯->拿茶葉
    10. CompletableFuture f2 = 
    11.   CompletableFuture.supplyAsync(()->{
    12.   System.out.println("T2:洗茶壺...");
    13.   sleep(1, TimeUnit.SECONDS);
    14.   System.out.println("T2:洗茶杯...");
    15.   sleep(2, TimeUnit.SECONDS);
    16.   System.out.println("T2:拿茶葉...");
    17.   sleep(1, TimeUnit.SECONDS);
    18.   return "龍井";
    19. });
    20. //任務3:任務1和任務2完成后執(zhí)行:泡茶
    21. CompletableFuture f3 = 
    22.   f1.thenCombine(f2, (__, tf)->{
    23.     System.out.println("T1:拿到茶葉:" + tf);
    24.     System.out.println("T1:泡茶...");
    25.     return "上茶:" + tf;
    26.   });
    27. //等待任務3執(zhí)行結果
    28. System.out.println(f3.join());
    29. void sleep(int t, TimeUnit u) {
    30.   try {
    31.     u.sleep(t);
    32.   }catch(InterruptedException e){}
    33. }

    注意事項

    1.CompletableFuture默認線程池是否滿足使用

    前面提到創(chuàng)建CompletableFuture異步任務的靜態(tài)方法runAsync和supplyAsync等,可以指定使用的線程池,不指定則用CompletableFuture的默認線程池

     
     
     
     
    1. private static final Executor asyncPool = useCommonPool ?
    2.         ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

    可以看到,CompletableFuture默認線程池是調用ForkJoinPool的commonPool()方法創(chuàng)建,這個默認線程池的核心線程數量根據CPU核數而定,公式為Runtime.getRuntime().availableProcessors() - 1,以4核雙槽CPU為例,核心線程數量就是4*2-1=7個

    這樣的設置滿足CPU密集型的應用,但對于業(yè)務都是IO密集型的應用來說,是有風險的,當qps較高時,線程數量可能就設的太少了,會導致線上故障

    所以可以根據業(yè)務情況自定義線程池使用

    2.get設置超時時間不能串行get,不然會導致接口延時線程數量*超時時間


    名稱欄目:實現異步編程,這個工具類你得掌握!
    本文網址:http://m.5511xx.com/article/djojgoo.html