農林漁牧網

您現在的位置是:首頁 > 農業

Java併發程式設計實踐之道

2023-01-05由 一個即將退役的碼農 發表于 農業

姓名的資料型別是什麼

目錄:

一.生產者和消費者模式

二.線上問題定位

三.效能測試

四.非同步任務池

當你在進行併發程式設計時,看著程式的執行速度在自己的最佳化下執行得越來越快,你會覺得越來越有成就感,這就是併發程式設計的魅力。但與此同時,併發程式設計產生的問題和風險可能也會隨之而來。

一.生產者和消費者模式

執行緒通訊

,在多執行緒系統中,不同的執行緒執行不同的任務;如果這些任務之間存在聯絡,那麼執行這些任務的執行緒之間就必須能夠通訊,共同協調完成系統任務。

Java併發程式設計實踐之道

執行緒通訊

生產者、消費者案例

案例分析

在案例中明,蔬菜基地作為生產者,負責生產蔬菜,並向超市輸送生產的蔬菜;消費者透過向超市購買獲得蔬菜;超市怎作為生產者和消費者之間的共享資源,都會和超市有聯絡;蔬菜基地、共享資源、消費者之間的互動流程如下:

Java併發程式設計實踐之道

生產者、消費者案例

在這個案例中,

為什麼不設計成生產者直接與給消費者互動

?讓兩者直接交換資料不是更好嗎?選擇先把

資料儲存

到共享資源中,然後消費者再從共享資源中取出資料使用,中間多了一個環節不是更麻煩了?

其實不是的,設計成這樣是有原因的,因為這樣設計很好地體現了面向物件的

低耦合的設計理念

;透過這樣實現的程式能更加符合人的操作理念,更加貼合現實環境;同時,也能很好地避免因生產者與消費者直接互動而導致的操作不安全的問題。

我們來對

高耦合和低耦合

做一個對比就會很直觀了:

高(緊)耦合:生產者與消費者直接互動

,生產者(蔬菜基地)把蔬菜直接給到給消費者,雙方之間的依賴程度很高;此時,生產者中就必須持有消費者物件的引用,同樣的道理,消費者也必須要持有生產者物件的引用;這樣,消費者和生產者才能夠直接互動。

低(松)耦合:

引入一個

中間物件(共享資源)

來,將生產者、消費者中需要對外輸出或者從外資料的操作封裝到中間物件中,這樣,消費者和生產者將會持有這個中間物件的引用,遮蔽了生產者和消費者直接的資料互動。,大大見減小生產者和消費者之間的依賴程度。

關於高耦合和低耦合的區別,電腦中主機中的整合顯示卡和獨立顯示卡也是一個非常好的例子。

整合顯示卡

普遍都集成於CPU中,所以如果整合顯示卡出現了問題需要更換,那麼會連著CPU一塊更換,其維護成本與CPU其實是一樣的;

獨立顯示卡

需要插在主機板的顯示卡介面上才能與計算機通訊,其相對於整個計算機系統來說,是獨立的存在,即便出現問題需要更換,也只更換顯示卡即可。

案例的程式碼實現

接下來我們使用多執行緒技術實現該案例,案例程式碼如下:

蔬菜基地物件,VegetableBase。java

// VegetableBase。java // 蔬菜基地public class VegetableBase implements Runnable { // 超市例項 private Supermarket supermarket = null; public VegetableBase(Supermarket supermarket) { this。supermarket = supermarket; } @Override public void run() { for (int i = 0; i < 100; i++) { if (i % 2 == 0) { supermarket。push(“黃瓜”, 1300); System。out。println(“push : 黃瓜 ” + 1300); } else { supermarket。push(“青菜”, 1400); System。out。println(“push : 青菜 ” + 1400); } } }}

消費者物件,Consumer。java

// Consumer。java// 消費者public class Consumer implements Runnable {// 超市例項 private Supermarket supermarket = null; public Consumer(Supermarket supermarket) { this。supermarket = supermarket; } @Override public void run() { for (int i = 0; i < 100; i++) { supermarket。popup(); } }}

超市物件,Supermarket。java

// Supermarket。java// 超市public class Supermarket {// 蔬菜名稱 private String name;// 蔬菜數量 private Integer num;// 蔬菜基地想超市輸送蔬菜 public void push(String name, Integer num) { this。name = name; this。num = num; }// 使用者從超市中購買蔬菜 public void popup() {// 為了讓效果更明顯,在這裡模擬網路延遲 try { Thread。sleep(1000); } catch (InterruptedException e) { } System。out。println(“蔬菜:” + this。name + “, ” + this。num + “顆。”); }}

執行案例,App。java

// 案例應用入口public class App { public static void main(String[] args) {// 建立超市例項 Supermarket supermarket = new Supermarket();// 蔬菜基地執行緒啟動, 開始往超市輸送蔬菜 new Thread(new VegetableBase(supermarket))。start(); new Thread(new VegetableBase(supermarket))。start();// 消費者執行緒啟動,消費者開始購買蔬菜 new Thread(new Consumer(supermarket))。start(); new Thread(new Consumer(supermarket))。start(); }}

發現了問題

執行該案例,打印出執行結果,外表一片祥和,可還是被敏銳地發現了問題,問題如下所示:

Java併發程式設計實踐之道

案例執行中發現的問題

在一片看似祥和的列印結果中,出現了一個很不祥和的特例,生產基地在輸送蔬菜時,黃瓜的數量一直都是1300顆,青菜的數量一直是1400顆,但是在消費者消費時卻出現了

蔬菜名稱是黃瓜的,但數量卻是青菜的數量

的情況。

之所以出現這樣的問題,是因為在本案例共享的資源中,多個執行緒共同競爭資源時沒有使用

同步操作

,而是非同步操作,今兒導致了資源分配紊亂的情況;

需要注意的是,

並不是因為我們在案例中使用Thread.sleep();模擬網路延遲才導致問題出現,而是本來就存在問題,使用Thread.sleep();只是讓問題更加明顯。

案例問題的解決

在本案例中需要

解決的問題有兩個

,分別如下:

問題一:

蔬菜名稱和數量不匹配的問題。

問題二:

需要保證超市無貨時生產,超市有貨時才消費。

針對

問題一解決方案

:保證蔬菜基地在輸送蔬菜的過程保持同步,中間不能被其他執行緒(特別是消費者執行緒)干擾,打亂輸送操作;直至當前執行緒完成輸送後,其他執行緒才能進入操作,同樣的,當有執行緒進入操作後,其他執行緒只能在操作外等待。

所以,

技術方案

可以使用

同步程式碼塊/同步方法/Lock機制

來保持操作的同步性。

針對

問題二的解決方案

給超市一個有無貨的狀態標誌,超市無貨時,蔬菜基地輸送蔬菜補貨,此時生產基地執行緒可操作;

超市有貨時,消費者執行緒可操作;就是:保證生產基地 ——> 共享資源 ——> 消費者這個整個流程的完整執行。

技術方案

:使用執行緒中的

等待和喚醒機制

同步操作,

分為

同步程式碼塊

同步方法

兩種。詳情可檢視我的另外一篇關於多執行緒的文章:

Java 執行緒不安全分析,同步鎖和Lock機制,哪個解決方案更好

在同步程式碼塊中的

同步鎖

必須選擇

多個執行緒共同的資源物件

,當前生產者執行緒在生產資料的時候(先

擁有同步鎖

),其他執行緒就在鎖池中等待獲取鎖;當生產者執行緒執行完同步程式碼塊的時候,就會

釋放同步鎖

,其他執行緒開始搶鎖的使用權,搶到後就會擁有該同步鎖,執行完成後釋放,其他執行緒再開始搶鎖的使用權,依次往復執行。

多個執行緒只有使用

同一個物件

(就好比案例中的共享資源物件)的時候,多執行緒之間才有互斥效果,我們把這個

用來做互斥的物件稱之為同步監聽物件,

又稱

同步監聽器、互斥鎖、同步鎖

,同步鎖是一個抽象概念,可以理解為在物件上標記了一把鎖。

同步鎖物件可以選擇

任意型別的物件

即可,只需要保證多個執行緒使用的是相同鎖物件即可。

在任何時候,最多隻能執行一個執行緒擁有同步鎖

。因為只有同步監聽鎖物件才能呼叫wait和notify方法,wait和notify方法存在於Object類中。

執行緒通訊之 wait和notify方法

java.lang.Object類

中提供了用於操作執行緒通訊的方法,詳情如下:

wait():

執行該方法的執行緒物件會釋放同步鎖,然後JVM把該執行緒存放到

等待池

中,等待著其他執行緒來喚醒該執行緒;

notify():

執行該方法的執行緒會

喚醒

在等待池中處於等待狀態的的

任意一個執行緒

,把執行緒轉到

同步鎖池

中等待;

notifyAll():

執行該方法的執行緒會

喚醒

在等待池中

處於等待狀態的所有的執行緒

,把這些執行緒轉到

同步鎖池

中等待;

注意:上述方法只能被

同步監聽鎖物件

來呼叫,否則發生

IllegalMonitorStateException

wait和notify方法應用例項

假設

A執行緒

B執行緒

共同操作一個

X物件(同步鎖)

,A、B執行緒可以透過X物件的wait和notify方法來進行通訊,流程如下:

當A執行緒執行X物件的同步方法時,A執行緒持有X物件的鎖,B執行緒沒有執行機會,此時的B執行緒會在X物件的鎖池中等待;

當A執行緒在同步方法中執行X。wait()方法時,A執行緒會釋放X物件的同步鎖,然後進入X物件的等待池中;

接著,在X物件的鎖池中等待鎖的B執行緒獲取X物件的鎖,執行X的另一個同步方法;

當B執行緒在同步方法中執行X。notify()方法時,JVM會把A執行緒從X物件的等待池中轉到X物件的同步鎖池中,等待獲取鎖的使用權;

當B執行緒執行完同步方法後,會釋放擁有的鎖,然後A執行緒獲得鎖,繼續執行同步方法;

基於上述機制,我們就可以使用

同步操作 + wait和notify方法

來解決案例中的問題了,重新來實現共享資源——超市物件:

// 超市public class Supermarket {// 蔬菜名稱 private String name;// 蔬菜數量 private Integer num;// 超市是否為空 private Boolean isEmpty = true;// 蔬菜基地向超市輸送蔬菜 public synchronized void push(String name, Integer num) { try {// 超市有貨時,不再輸送蔬菜,而是要等待消費者獲取 while (!isEmpty) { this。wait(); } this。name = name; this。num = num; isEmpty = false; this。notify(); // 喚醒另一個執行緒 } catch (Exception e) { } }// 使用者從超市中購買蔬菜 public synchronized void popup() { try {// 超市無貨時,不再提供消費,而是要等待蔬菜基地輸送 while (isEmpty) { this。wait(); }// 為了讓效果更明顯,在這裡模擬網路延遲 Thread。sleep(1000); System。out。println(“蔬菜:” + this。name + “, ” + this。num + “顆。”); isEmpty = true; this。notify(); // 喚醒另一執行緒 } catch (Exception e) { } }}

執行緒通訊之 使用Lock和Condition介面

由於wait和notify方法,只能被同步監聽鎖物件來呼叫,否則發生

IllegalMonitorStateException。從Java 5開始,提供了

Lock機制

,同時還有

處理Lock機制的通訊控制的Condition介面

。Lock機制沒有同步鎖的概念,也就

沒有自動獲取鎖和自動釋放鎖

的這樣的操作了。

因為沒有同步鎖,所以Lock機制中的執行緒通訊就不能呼叫wait和notify方法了;同樣的,Java 5 中也提供瞭解決方案,因此從Java5開始,可以:

使用

Lock機制

取代

synchronized 程式碼塊

synchronized 方法

使用Condition介面物件的

await、signal、signalAll

方法取代Object類中的

wait、notify、notifyAll

方法;

Lock和Condition介面

的效能也比同步操作要高很多,所以這種方式也是我們推薦使用的方式。

我們可以使用

Lock機制和Condition介面

方法來解決案例中的問題,重新來實現的共享資源——超市物件,程式碼如下:

// 超市public class Supermarket {// 蔬菜名稱 private String name;// 蔬菜數量 private Integer num;// 超市是否為空 private Boolean isEmpty = true;// lock private final Lock lock = new ReentrantLock();// Condition private Condition condition = lock。newCondition();// 蔬菜基地向超市輸送蔬菜 public synchronized void push(String name, Integer num) { lock。lock(); // 獲取鎖 try {// 超市有貨時,不再輸送蔬菜,而是要等待消費者獲取 while (!isEmpty) { condition。await(); } this。name = name; this。num = num; isEmpty = false; condition。signalAll(); } catch (Exception e) { } finally { lock。unlock(); // 釋放鎖 } }// 使用者從超市中購買蔬菜 public synchronized void popup() { lock。lock(); try {// 超市無貨時,不再提供消費,而是要等待蔬菜基地輸送 while (isEmpty) { condition。await(); }// 為了讓效果更明顯,在這裡模擬網路延遲 Thread。sleep(1000); System。out。println(“蔬菜:” + this。name + “, ” + this。num + “顆。”); isEmpty = true; condition。signalAll(); } catch (Exception e) { } finally { lock。unlock(); } }}

二.線上問題定位

背景

大家都知道,在服務/應用釋出到預覽或者線上環境時,經常會出現一些測試中沒有出現的問題。並且由於環境所限,我們也不可能在線上除錯程式碼,所以只能透過日誌、系統資訊和dump等手段來在線上定位問題。

通常需要藉助一些工具,例如jdk本身提供的一些jmap,jstack等等,或者是阿里提供的比較強大的Arthus,另外就是最基礎的一些命令。根據經驗,系統上發生的主要問題是在cpu、記憶體、磁碟幾個方面,因此會優先針對這類問題進行定位。由於絕大部分服務都是部署在Linux環境下,所以一下以Linux命令為例進行說明。

top命令

top命令可以用於查詢每個程序的情況,顯示資訊如下:

top - 22:32:03 up 180 days, 7:23, 1 user, load average: 0。07, 0。06, 0。05Tasks: 106 total, 1 running, 105 sleeping, 0 stopped, 0 zombie%Cpu(s): 1。5 us, 1。0 sy, 0。0 ni, 97。5 id, 0。0 wa, 0。0 hi, 0。0 si, 0。0 st

KiB Mem : 16266504 total, 324836 free, 6100252 used, 9841416 buff/cache

KiB Swap: 0 total, 0 free, 0 used。 9827120 avail Mem

PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND

1466 root 20 0 3243684 665836 14684 S 0。7 4。1 82:18。89 java

660 root 20 0 835120 25288 8036 S 0。3 0。2 718:10。90 exe

4835 root 20 0 6715536 129904 13368 S 0。3 0。8 400:01。40 java

8287 root 20 0 1003108 118464 18812 S 0。3 0。7 731:56。27 node /opt/my-ya

8299 root 20 0 1002164 107792 18816 S 0。3 0。7 730:11。28 node /opt/my-ya

8395 root 20 0 611552 35476 14504 S 0。3 0。2 14:17。25 node /opt/qkd-n 10184 root 20 0 3089652 673520 15880 S 0。3 4。1 83:32。81 java 12882 root 20 0 917540 64556 16156 S 0。3 0。4 543:55。74 PM2 v4。4。0: God 13556 root 20 0 2998424 556848 14548 S 0。3 3。4 496:48。18 java 14293 root 10 -10 151296 26920 6880 S 0。3 0。2 1868:03 AliYunDun 14755 root 20 0 3030352 676388 14720 S 0。3 4。2 49:16。41 java 22908 root 20 0 623456 38892 14536 S 0。3 0。2 98:50。65 node /opt/qkd-n 22936 root 20 0 622680 39712 14532 S 0。3 0。2 98:27。12 node /opt/qkd-n 24142 root 20 0 3303328 659496 14716 S 0。3 4。1 23:20。38 java 25566 root 20 0 706964 52660 16308 S 0。3 0。3 19:17。11 node /opt/qkd-n 25597 root 20 0 708020 53112 16308 S 0。3 0。3 19:06。83 node /opt/qkd-

如上面內容所示,需要注意一下各列的含義,這裡再重複一遍,如下表所示:

Java併發程式設計實踐之道

由於限定我們的應用是Java應用,所以只需要關注COMMOND列是java的程序資訊。

有時候%CPU這列的數字可能會超過100%,這不一定是出了問題,因為是機器所有核加在一起的CPU利用率,所以我們需要計算一下,平均每個核上的利用比例,再來確定是否是CPU使用過高,進而再去分析是否發生了死迴圈、記憶體回收等問題的可能。

在top命令出來的介面下,輸入1(top的互動命令數字),可以檢視每個CPU的效能資訊:

PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND

5875 qkadmin 20 0 163144 3324 1612 R 1。0 0。0 0:00。24 top

1498 root 20 0 3243684 665836 14684 S 0。3 4。1 39:27。18 java 27412 root 20 0 3243684 665836 14684 S 0。3 4。1 15:14。25 java

4982 root 20 0 6715536 129904 13368 S 0。3 0。8 198:59。46 java

8287 root 20 0 1003108 118728 18812 S 0。3 0。7 688:11。51 node /opt/my-ya 10289 root 20 0 3089652 673520 15880 S 0。3 4。1 30:15。15 java 12261 root 20 0 803192 10800 4592 S 0。3 0。1 10:05。35 aliyun-service 12263 root 20 0 803192 10800 4592 S 0。3 0。1 5:45。73 aliyun-service 14351 root 20 0 2998424 556848 14548 S 0。3 3。4 1:14。78 java

以上是我們某臺機器上的實時資料,因為當前執行正常,所以沒有異常資料。但看一下下面的資料:

Java併發程式設計實踐之道

命令列顯示了5個CPU,說明是一個5核的機器,平均每個CPU利用率在60%以上。有時可能存在CPU利用率達到100%,如果出現這種情況,那麼很有可能是程式碼中寫了死迴圈,繼續看程式碼定位問題原因。

CPU引數的含義如下:

Java併發程式設計實踐之道

互動命令H,可以檢視每個執行緒的效能資訊:

PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND

5875 qkadmin 20 0 163144 3324 1612 R 1。0 0。0 0:00。24 top

1498 root 20 0 3243684 665836 14684 S 0。3 4。1 39:27。18 java 27412 root 20 0 3243684 665836 14684 S 0。3 4。1 15:14。25 java

4982 root 20 0 6715536 129904 13368 S 0。3 0。8 198:59。46 java

8287 root 20 0 1003108 118728 18812 S 0。3 0。7 688:11。51 node /opt/my-ya 10289 root 20 0 3089652 673520 15880 S 0。3 4。1 30:15。15 java 12261 root 20 0 803192 10800 4592 S 0。3 0。1 10:05。35 aliyun-service 12263 root 20 0 803192 10800 4592 S 0。3 0。1 5:45。73 aliyun-service 14351 root 20 0 2998424 556848 14548 S 0。3 3。4 1:14。78 java

可能發生的幾個問題和對應的現象有:

1、某個執行緒,CPU利用率一直在100%左右,那麼說明這個執行緒很有可能出現死迴圈,記住這個PID,並進一步定位具體應用;另外也可能是出現記憶體洩漏,觸發頻繁GC導致。這種情況,可以使用jstat命令檢視GC情況,以分析是否持久代或老年代記憶體區域滿導致觸發Full GC,進而使CPU利用率飆高,命令和顯示資訊如下(81443是當前機器上觀察的程序id):

jstat -gcutil 81443 1000 5

資訊:

S0 S1 E O M CCS YGC YGCT FGC FGCT GCT

0。00 53。94 78。77 0。05 97。26 93。39 1 0。006 0 0。000 0。006

0。00 53。94 78。77 0。05 97。26 93。39 1 0。006 0 0。000 0。006

0。00 53。94 78。77 0。05 97。26 93。39 1 0。006 0 0。000 0。006

0。00 53。94 78。77 0。05 97。26 93。39 1 0。006 0 0。000 0。006

0。00 53。94 78。77 0。05 97。26 93。39 1 0。006 0 0。000 0。006

dump

下一步,可以把執行緒dump下來,然後再繼續分析是哪個執行緒、執行到那段程式碼導致CPU利用率飆高。使用命令可以參考如下:

jstack 81443 > 。/dump01

dump檔案內容:

192:dubbo-proxy-tools xxx$ cat dump01 2021-02-13 22:51:08

Full thread dump Java HotSpot(TM) 64-Bit Server VM (25。212-b10 mixed mode):

“Attach Listener” #14 daemon prio=9 os_prio=31 tid=0x00007f8cef903000 nid=0x1527 waiting on condition [0x0000000000000000]

java。lang。Thread。State: RUNNABLE

“DestroyJavaVM” #12 prio=5 os_prio=31 tid=0x00007f8cef91d000 nid=0x2803 waiting on condition [0x0000000000000000]

java。lang。Thread。State: RUNNABLE

“JPS event loop” #10 prio=5 os_prio=31 tid=0x00007f8cf1153800 nid=0xa703 runnable [0x0000700003656000]

java。lang。Thread。State: RUNNABLE

at sun。nio。ch。KQueueArrayWrapper。kevent0(Native Method)

at sun。nio。ch。KQueueArrayWrapper。poll(KQueueArrayWrapper。java:198)

at sun。nio。ch。KQueueSelectorImpl。doSelect(KQueueSelectorImpl。java:117)

at sun。nio。ch。SelectorImpl。lockAndDoSelect(SelectorImpl。java:86)

- locked <0x00000007b5700798> (a io。netty。channel。nio。SelectedSelectionKeySet)

- locked <0x00000007b57007b0> (a java。util。Collections$UnmodifiableSet)

- locked <0x00000007b5700748> (a sun。nio。ch。KQueueSelectorImpl)

at sun。nio。ch。SelectorImpl。select(SelectorImpl。java:97)

at io。netty。channel。nio。SelectedSelectionKeySetSelector。select(SelectedSelectionKeySetSelector。java:62)

at io。netty。channel。nio。NioEventLoop。select(NioEventLoop。java:753)

at io。netty。channel。nio。NioEventLoop。run(NioEventLoop。java:408)

at io。netty。util。concurrent。SingleThreadEventExecutor$5。run(SingleThreadEventExecutor。java:897)

at java。lang。Thread。run(Thread。java:748)

“Service Thread” #9 daemon prio=9 os_prio=31 tid=0x00007f8cf3822800 nid=0x5503 runnable [0x0000000000000000]

java。lang。Thread。State: RUNNABLE

“C1 CompilerThread3” #8 daemon prio=9 os_prio=31 tid=0x00007f8cf1802800 nid=0x3a03 waiting on condition [0x0000000000000000]

java。lang。Thread。State: RUNNABLE

“C2 CompilerThread2” #7 daemon prio=9 os_prio=31 tid=0x00007f8cf480c000 nid=0x3c03 waiting on condition [0x0000000000000000]

java。lang。Thread。State: RUNNABLE

執行緒id (nid=0x2803) 是16進位制,可與轉成10進位制,來跟top命令觀察的id對應(可以簡單地使用 printf “%x\n” 0x5503即可):

192:dubbo-proxy-tools xxxx$ printf “%x\n” 0x55035503

2、某個執行緒一直在top 10的位置,那麼說明該執行緒可能有效能問題

3、CPU利用率高的執行緒不斷變化,說明不是某一個執行緒導致的CPU利用率飆高

三。 效能測試

測試併發程式而言,所面臨的主要挑戰在於:潛在的錯誤發生具有不確定性,需要比普通的序列程式測試更廣的範圍並且執行更長的時間。

併發測試大致分為兩類:安全性測試和活躍性測試。

安全測試 ——- 通常採用測試不變性條件的形式,即判斷某個類的行為是否與其他規範保持一致。

活躍性測試 ——- 包括進展測試和無進展測試兩個方面(很難量化)。

效能測試——- 效能測試與活躍性測試相關,主要透過:吞吐量、響應性、可伸縮性衡量。

正確性測試

測試併發類設計單元測試時,首先要執行與測試序列類時相同的分析——找出需要檢查的不變性條件與後驗條件。(不變性條件:判斷狀態是有效還是無效,後驗條件:判斷狀態改變後是否有效)。接下來講透過構建一個基於Semaphore來實現的快取的有界快取,測試快取的正確性。

基本單元測試(基於訊號量有界快取BoundedBuffer例子)

知識鋪墊(Semaphore可以控制同時訪問資源的執行緒個數,例如,實現一個檔案允許的併發訪問數。單個訊號量的Semaphore物件可以實現互斥鎖的功能,並且可以是由一個執行緒獲得了“鎖”,再由另一個執行緒釋放“鎖”)

BoundedBuffer 用一個泛型陣列、Semaphore 實現了一個固定長度的、可以快取佇列可刪除可插入個數的佇列。availableItems表示可以從快取中刪除的元素個數。availableSpaces表示可以插入到快取的元素個數,初始值等於快取的大小。分析:

availableItems 設定為0,要求任何執行緒在accquire之前要release保證了佇列必須插入有值才能take

availableSpaces 初始值為大小為capacity,表明佇列最大值為capacity,同時也表明一開始最多有capacity個執行緒可以同時put佇列,當然隨著佇列插入可同時插入的執行緒變少。

在最關鍵的插入、取出佇列的操作中,採用synchronized 包裝兩個方法,保證了同步性。

說明:在實際使用中,肯定不可能自己編寫一個有界快取,但是此例實現的思路值得學習。如果實際需要使用有界快取,應該直接使用ArrayBlockingQueue或者LinkedBlockingQueue。

public class BoundedBuffer {//可用訊號量、空間訊號量 private final Semaphore availableItems, availableSpaces; private final E[] items;//快取 private int putPosition = 0, takePosition = 0;//放、取索引位置 public BoundedBuffer(int capacity) { availableItems = new Semaphore(0);//初始時沒有可用的元素 availableSpaces = new Semaphore(capacity);//初始時空間訊號量為最大容量 items = (E[]) new Object[capacity]; } public boolean isEmpty() {//如果可用訊號量為0,則表示快取為空 return availableItems。availablePermits() == 0; } public boolean isFull() {//如果空間訊號量為0,表示快取已滿 return availableSpaces。availablePermits() == 0; } public void put(E x) throws InterruptedException { availableSpaces。acquire();//阻塞獲取空間訊號量 doInsert(x); availableItems。release();//可用訊號量加1 } public E take() throws InterruptedException { availableItems。acquire(); E item = doExtract(); availableSpaces。release(); return item; } private synchronized void doInsert(E x) { int i = putPosition; items[i] = x; putPosition = (++i == items。length) ? 0 : i; } private synchronized E doExtract() { int i = takePosition; E x = items[i]; items[i] = null;//加快垃圾回收 takePosition = (++i == items。length) ? 0 : i; return x; }}

先進行基本的單元測試:該基本的單元測試相當於序列上下文中執行的測試,測試了BoundedBuffer的所有方法,間接驗證其後驗條件和不變性條件。

public class BoundedBufferTest extends TestCase {//剛構造好的快取是否為空測試 public void testIsEmptyWhenConstructed() { BoundedBuffer bb = new BoundedBuffer(10); assertTrue(bb。isEmpty()); assertFalse(bb。isFull()); }//測試是否滿 public void testIsFullAfterPuts() throws InterruptedException { BoundedBuffer bb = new BoundedBuffer(10); for (int i = 0; i < 10; i++) bb。put(i); assertTrue(bb。isFull()); assertFalse(bb。isEmpty()); }

對阻塞行為與對中斷響應的測試

在測試併發的基本屬性時,需要引入多個執行緒。大多數測試框架並不能很好的支援併發性測試,

測試阻塞行為,當然執行緒被阻塞不再執行時,阻塞才是成功的,為了讓阻塞行為效果更明顯,可以在阻塞方法中丟擲異常。當阻塞發生後,要使方法解除阻塞最簡單的方式是採用中斷,可以在阻塞方法發生後,執行緒阻塞後再中斷它,當然這要求阻塞方法提取返回或者丟擲InterrupedException來響應中斷。

例如BoundedBuffer的阻塞行為以及對中斷的響應性測試,如果從空快取中獲取一個元素,如果take方法成功,表明測試失敗。在等待“獲取”一段時間後,再中斷該執行緒,如果執行緒在呼叫Object類的wait()、 join() 或者sleep()方法被強行中斷,那麼將會丟擲InterruptedException。

public class TestBoundedBufferBlock extends TestCase { public void testTakeBlocksWhenEmpty() { final BoundedBuffer bb = new BoundedBuffer(10); Thread taker = new Thread() { public void run() { try { int unused = bb。take(); System。out。println(“由於阻塞,將不會輸出此句”); fail(); // 如果執行到這裡,就說明有錯誤,fail會丟擲異常 } catch (InterruptedException success) { success。printStackTrace(); } } }; try { taker。start(); Thread。sleep(100);// taker。interrupt();//中斷阻塞執行緒 taker。join(1000);//等待阻塞執行緒完成 assertFalse(taker。isAlive());//斷言阻塞執行緒已終止 } catch (Exception unexpected) { fail(); } } public static void main(String[] args) { TestBoundedBufferBlock boundedBufferTest = new TestBoundedBufferBlock(); boundedBufferTest。testTakeBlocksWhenEmpty(); }}

可進行interrupt強行中斷,將丟擲異常InterrupedException。

Java併發程式設計實踐之道

以下結果是,執行執行緒執行join()方法,設定等待該執行緒最長時間為1秒,最後斷言執行緒已中止。

Java併發程式設計實踐之道

安全性測試(測試BoundedBuffer生產者—消費者)

安全性測試即測試是否會發生資料競爭從而引發錯誤,測試類似BoundedBuffer生產者—消費者模式的類,需要建立多個執行緒來分別執行put和take操作。

為了儘可能達到多個執行緒並行執行,避免執行緒交替執行達不到預期的結果,可採用CountDownLatch或者CyclicBarrier。若採用兩個CountDownLatch,其中一個作為開啟閥門,另一個作為結束閥門。若採用CyclicBarrier相對簡單。(CyclicBarrier初始化時規定一個數目,然後計算呼叫了CyclicBarrier。await()進入等待的執行緒數。當執行緒數達到了這個數目時,所有進入等待狀態的執行緒被喚醒並繼續。 )

分析:

PutTakeTest開啟了10對生產—消費者執行緒,初始化CyclicBarrier時將計數值指定為工作者匯流排程的數量再加1,並在執行開始和結束時,使工作者執行緒和測試執行緒都在這個柵欄處等待。這能確保所有執行緒在開始執行任何工作之前,都首先執行到同一位置。

透過一個對順序敏感的校驗和計算函式來計算所有入列元素以及出列元素的校驗和,並進行比較。如果兩者相等,程式最終沒有報錯,則測試就是成功的。事實上也確實沒有報錯。

public class PutTakeTest extends TestCase { protected static final ExecutorService pool = Executors 。newCachedThreadPool(); protected CyclicBarrier barrier;//為了儘量做到真正併發,使用屏障 protected final BoundedBuffer bb; protected final int nTrials, nPairs;//元素個數、生產與消費執行緒數 protected final AtomicInteger putSum = new AtomicInteger(0);//放入元素檢驗和 protected final AtomicInteger takeSum = new AtomicInteger(0);//取出元素檢驗和 public static void main(String[] args) throws Exception { new PutTakeTest(10, 10, 100000)。test(); // sample parameters pool。shutdown(); } public PutTakeTest(int capacity, int npairs, int ntrials) { this。bb = new BoundedBuffer(capacity); this。nTrials = ntrials; this。nPairs = npairs; this。barrier = new CyclicBarrier(npairs * 2 + 1); } void test() { try { for (int i = 0; i < nPairs; i++) { pool。execute(new Producer());//提交生產任務 pool。execute(new Consumer());//提交消費任務 } barrier。await(); // 等待所有執行緒都準備好 barrier。await(); // 等待所有執行緒完成,即所有執行緒都執行到這裡時才能往下執行 assertEquals(putSum。get(), takeSum。get());//如果不等,則會拋異常 } catch (Exception e) { throw new RuntimeException(e); } } class Producer implements Runnable { public void run() { try {//等待所有生產-消費執行緒、還有主執行緒都準備好後才可以往後執行 barrier。await();// 種子,即起始值 int seed = (this。hashCode() ^ (int) System。nanoTime()); int sum = 0;//執行緒內部檢驗和 for (int i = nTrials; i > 0; ——i) { bb。put(seed);//入隊 /* * 累計放入檢驗和,為了不影響原程式,這裡不要直接使用全域性的 * putSum來累計,而是等每個執行緒試驗完後再將內部統計的結果一 * 次性存入 */ sum += seed; seed = xorShift(seed);//根據種子隨機產生下一個將要放入的元素 }//試驗完成後將每個執行緒的內部檢驗和再次累計到全域性檢驗和 putSum。getAndAdd(sum);//等待所有生產-消費執行緒、還有主執行緒都完成後才可以往後執行 barrier。await(); } catch (Exception e) { throw new RuntimeException(e); } } } class Consumer implements Runnable { public void run() { try {//等待所有生產-消費執行緒、還有主執行緒都準備好後才可以往後執行 barrier。await(); int sum = 0; for (int i = nTrials; i > 0; ——i) { sum += bb。take(); } takeSum。getAndAdd(sum);//等待所有生產-消費執行緒、還有主執行緒都完成後才可以往後執行 barrier。await(); } catch (Exception e) { throw new RuntimeException(e); } } } /* * 測試時儘量不是使用類庫中的隨機函式,大多數的隨機數生成器都是執行緒安全的, * 使用它們可能會影響原本的效能測試。在這裡我們也不必要使用高先是的隨機性。 * 所以使用簡單而快的隨機演算法在這裡是必要的。 */ static int xorShift(int y) { y ^= (y << 6); y ^= (y >>> 21); y ^= (y << 7); return y; }}

資源管理測試(測試資源洩露)

書中原文,透過一些測量應用程式中記憶體使用情況的堆檢查工具,可以很容易地測試出對記憶體的不合理佔用,許多商用和開源的堆分析工具中都支援這種功能。下面程式的testLeak方法中包含了一些堆分析工具用於抓取堆的快照,這將強制執行一次垃圾回收,然後記錄堆大小和記憶體使用量資訊。

testLeak方法將多個大型物件插入到一個有界快取中,然後將它們移除。第2個堆快照中的記憶體用量應該與第1個堆快照中的記憶體用量基本相同。然而,doExtract如果忘記將返回元素的引用置為空(items[i] = null),那麼在兩次快照中報告的記憶體用量將明顯不同。(這是為數不多幾種需要顯式地將變數置空的情況之一。大多數情況下,這種做法不僅不會帶來幫助,甚至還會帶來負面作用。)

功力不足,並不能看出垃圾回收中堆是如何變化的。待補充

//大物件

class Big {

double[] data = new double[100000]; }void testLeak() throws InterruptedException {

BoundedBuffer bb = new BoundedBuffer(CAPACITY);

//使用前堆大小快照,這裡可以呼叫第三方堆追蹤(heap-profiling)工具來記錄。堆追蹤工具會強制進行垃圾回收,然後記錄下堆大小和記憶體用量資訊

int heapSize1 = /* snapshot heap */ ;

for (int i = 0; i < CAPACITY; i++)

bb。put(new Big());

for (int i = 0; i < CAPACITY; i++)

bb。take();

int heapSize2 = /* snapshot heap */ ;

assertTrue(Math。abs(heapSize1-heapSize2) < THRESHOLD);}

使用回撥

在構造測試案例時,對客戶提供的程式碼進行回撥是非常有幫助的。回撥函式的執行通常是在物件生命週期的一些已知位置上,並且在這些位置上非常適合判斷不變性條件是否被破壞。例如,在ThreadPoolExecutor中將呼叫任務的Runnable和ThreadFactory。

透過使用自定義的執行緒工廠,可以對執行緒的建立過程進行控制。下面程式TestingThreadFactory中將記錄已建立執行緒的數量。這樣,在測試過程中,測試方案可以驗證已建立執行緒的數量。

我們還可以對TestingThreadFactory進行擴充套件,使其返回一個自定義的Thread,並且該物件可以記錄自己在何時結束,從而在測試方案中驗證執行緒在被回收時是否與執行策略一致。

class TestingThreadFactory implements ThreadFactory { public final AtomicInteger numCreated = new AtomicInteger();//記錄已建立的工作執行緒數 private final ThreadFactory factory = Executors。defaultThreadFactory(); public Thread newThread(Runnable r) {//Executor框架在建立工作執行緒時回撥此方法 numCreated。incrementAndGet(); return factory。newThread(r); }}

如果執行緒池的基本大小小於最大大小,那麼執行緒池會根據執行需求相應增長。當把一些執行時間較長的任務提交給執行緒池時,執行緒池中的任務數量在長時間內都不會變化,這就可以進行一些判斷,例如測試執行緒池是否能按照預期的方式擴充套件,如下程式:

public class TestThreadPool extends TestCase { private final TestingThreadFactory threadFactory = new TestingThreadFactory(); public void testPoolExpansion() throws InterruptedException { int MAX_SIZE = 10; ExecutorService exec = Executors。newFixedThreadPool(MAX_SIZE); for (int i = 0; i < 10 * MAX_SIZE; i++) exec。execute(new Runnable() { public void run() { try { Thread。sleep(Long。MAX_VALUE); } catch (InterruptedException e) { Thread。currentThread()。interrupt(); } } }); for (int i = 0; i < 20 && threadFactory。numCreated。get() < MAX_SIZE; i++) Thread。sleep(100); assertEquals(threadFactory。numCreated。get(), MAX_SIZE); exec。shutdownNow(); }}

使用Thread。yield產生更多的交替操作

由於併發程式碼中的大多數錯誤都是一些低機率事件,因此在測試併發錯誤時需要反覆地執行許多次,但有些方法可以提高發現這些錯誤的機率。有一種有用的方法可以提高交替操作的數量,以便能有效地搜尋程式的狀態空間:在訪問共享狀態的操作中,使用Thread。yield將產生更多的上下文切換。(這項技術的有效性與具體的平臺相關,因為JVM可以將Thread。yield作用一個空操作。如果使用一個睡眠時間較短的sleep,那麼雖然慢些,但卻更可靠。)

下面程式中的方法在兩個賬戶之間執行轉賬操作,在兩次更新操作之間,像”所有賬戶的總和應等於零“這樣的一些不變性條件可能會被破壞。當代碼在訪問狀態時沒有使用足夠的同步,將存在一些對執行時序敏感的錯誤,透過在某個操作的執行過程中呼叫yield方法,可以將這些錯誤暴露出來。這種方法需要在測試中新增一些呼叫並且在正式產品中刪除這些呼叫,這將給開發人員帶來不便,透過使用面向方面程式設計(AOP)的工具,可以降低這種不便性。

使用Thread.yield,讓執行緒從Thread.yield呼叫點切換到另一執行緒,有助於發現Bug,該方法只適合用於測試環境中

。下面使用該方法在取出與存入間切換到另一執行緒:

public synchronized void transferCredits(Account from, Account to,int amount){ from。setBalance(from。getBalance()-amount); if(random。nextInt(1000)>THRESHOLD) Thread。yield();//切換到另一執行緒 to。setBalance(to。getBalance()+amount);}

效能測試

使用CyclicBarrier測量併發執行時間與吞吐率

以上面的PutTakeTest,給它加上時間測量特性。測試效能時的時間最好取多個執行緒的平均消耗時間,這樣會精確一些。在PutTakeTest中我們已經使用了CyclicBarrier去同時啟動和結束工作者執行緒了,所以我們只要使用一個關卡動作(在所有執行緒都達關卡點後開始執行的動作)來記錄啟動和結束時間,就完成了對併發執行時間的測試。下面是擴充套件後的PutTakeTest,:

public class TimedPutTakeTest extends PutTakeTest { private BarrierTimer timer = new BarrierTimer(); public TimedPutTakeTest(int cap, int pairs, int trials) { super(cap, pairs, trials); barrier = new CyclicBarrier(nPairs * 2 + 1, timer); } public void test() { try { timer。clear(); for (int i = 0; i < nPairs; i++) { pool。execute(new PutTakeTest。Producer()); pool。execute(new PutTakeTest。Consumer()); } barrier。await();//等待所有執行緒都準備好後開始往下執行 barrier。await();//等待所有線都執行完後開始往下執行//每個元素完成處理所需要的時間 long nsPerItem = timer。getTime() / (nPairs * (long) nTrials); System。out。print(“Throughput: ” + nsPerItem + “ ns/item”); assertEquals(putSum。get(), takeSum。get()); } catch (Exception e) { throw new RuntimeException(e); } } public static void main(String[] args) throws Exception { int tpt = 100000; // 每對執行緒(生產-消費)需處理的元素個數//測試快取容量分別為1、10、100、1000的情況 for (int cap = 1; cap <= 1000; cap *= 10) { System。out。println(“Capacity: ” + cap);//測試工作執行緒數1、2、4、8、16、32、64、128的情況 for (int pairs = 1; pairs <= 128; pairs *= 2) { TimedPutTakeTest t = new TimedPutTakeTest(cap, pairs, tpt); System。out。print(“Pairs: ” + pairs + “\t”);//測試兩次 t。test();//第一次 System。out。print(“\t”); Thread。sleep(1000); t。test();//第二次 System。out。println(); Thread。sleep(1000); } } PutTakeTest。pool。shutdown(); }//關卡動作,在最後一個執行緒達到後執行。在該測試中會執行兩次://一次是執行任務前,二是所有任務都執行完後 static class BarrierTimer implements Runnable { private boolean started;//是否是第一次執行關卡活動 private long startTime, endTime; public synchronized void run() { long t = System。nanoTime(); if (!started) {//第一次關卡活動走該分支 started = true; startTime = t; } else//第二次關卡活動走該分支 endTime = t; } public synchronized void clear() { started = false; } public synchronized long getTime() {//任務所耗時間 return endTime - startTime; } }}

執行結果:

Capacity: 1

Pairs: 1 Throughput: 9440 ns/item Throughput: 9308 ns/item

Pairs: 2 Throughput: 12159 ns/item Throughput: 12111 ns/item

Pairs: 4 Throughput: 12198 ns/item Throughput: 12234 ns/item

Pairs: 8 Throughput: 13001 ns/item Throughput: 13432 ns/item

Pairs: 16 Throughput: 12672 ns/item Throughput: 12930 ns/item

Pairs: 32 Throughput: 12409 ns/item Throughput: 14012 ns/item

Pairs: 64 Throughput: 12551 ns/item Throughput: 12619 ns/item

Pairs: 128 Throughput: 11897 ns/item Throughput: 11806 ns/item

Capacity: 10

Pairs: 1 Throughput: 1444 ns/item Throughput: 1231 ns/item

Pairs: 2 Throughput: 1190 ns/item Throughput: 1186 ns/item

Pairs: 4 Throughput: 1283 ns/item Throughput: 1283 ns/item

Pairs: 8 Throughput: 1251 ns/item Throughput: 1263 ns/item

Pairs: 16 Throughput: 1227 ns/item Throughput: 1236 ns/item

Pairs: 32 Throughput: 1216 ns/item Throughput: 1221 ns/item

Pairs: 64 Throughput: 1208 ns/item Throughput: 1282 ns/item

Pairs: 128 Throughput: 1265 ns/item Throughput: 1227 ns/item

Capacity: 100

Pairs: 1 Throughput: 519 ns/item Throughput: 473 ns/item

Pairs: 2 Throughput: 374 ns/item Throughput: 370 ns/item

Pairs: 4 Throughput: 302 ns/item Throughput: 289 ns/item

Pairs: 8 Throughput: 286 ns/item Throughput: 286 ns/item

Pairs: 16 Throughput: 306 ns/item Throughput: 311 ns/item

Pairs: 32 Throughput: 310 ns/item Throughput: 316 ns/item

Pairs: 64 Throughput: 322 ns/item Throughput: 321 ns/item

Pairs: 128 Throughput: 324 ns/item Throughput: 323 ns/item

Capacity: 1000

Pairs: 1 Throughput: 393 ns/item Throughput: 484 ns/item

Pairs: 2 Throughput: 267 ns/item Throughput: 315 ns/item

Pairs: 4 Throughput: 192 ns/item Throughput: 278 ns/item

Pairs: 8 Throughput: 277 ns/item Throughput: 212 ns/item

Pairs: 16 Throughput: 218 ns/item Throughput: 226 ns/item

Pairs: 32 Throughput: 214 ns/item Throughput: 242 ns/item

Pairs: 64 Throughput: 245 ns/item Throughput: 251 ns/item

Pairs: 128 Throughput: 261 ns/item Throughput: 260 ns/item

分析:

快取size為1時,即cap為1時,BoundedBuffer的Capacity為1時,BoundedBuffer中的Semaphore限定了每次只能1個執行緒訪問佇列,每個執行緒在阻塞等待前一個使用有界快取佇列的執行緒,當快取提高至10,吞吐量得到了極大的提高,從上面的執行結果也可看出,併發執行時間極大的縮小。但是執行緒增加時,吞吐率卻有所下降,執行時間不見得有很大的降低,原因在於雖然有許多執行緒,但卻沒有足夠多的計算量,大多數的時間都消耗線上程的阻塞與解除阻塞操作上。圖中,吞吐率已歸一化,size為快取大小。

Java併發程式設計實踐之道

不同快取佇列效能測試比較

雖然上面的BoundedBuffer是一種相當可靠的實現,它的執行機制也非常合理,但是它還不足以和ArrayBlockingQueue 與LinkedBlockingQueue相提並論,這也解釋了為什麼這種快取演算法沒有被選入類庫中。併發類庫中的演算法已經被選擇並調整到最佳效能狀態了。BoundedBuffer效能不高的主要原因:put和take操作分別都有多個操作可能遇到競爭——獲取一個訊號量,獲取一個鎖、釋放訊號量。

在測試的過程中發現LinkedBlockingQueue的伸縮性好於ArrayBlockingQueue,這主要是因為連結串列佇列的put和take操作允許有比基於陣列的佇列更好的併發訪問,好的連結串列佇列演算法允許佇列的頭和尾彼此獨立地更新。由於記憶體分配操作通常是執行緒本地的,因此如果演算法能透過執行一些記憶體分配操作來降低競爭程度,那麼這種演算法通常具有更高的可伸縮性。這種情況再次證明了,基於傳統的效能調優直覺與提升可伸縮性的實際需求是背道而馳的。

Java併發程式設計實踐之道

Throughput表示吞吐率

響應性衡量

響應性透過任務完成的時間來衡量。除非執行緒由於密集的同步需求而被持續的阻塞,否則非公平的訊號量通常能實現更好的吞吐量,而公平的訊號量則實現更低的變動性(公平性開銷主要由於執行緒阻塞所引起)。下圖為TimePutTakeTest中使用1000個快取,256個併發任務中每個任務完成時間,其中每個任務都是用非公平訊號量(隱蔽柵欄,Shaded Bars)和公平的訊號量(開放柵欄,open bars)來迭代1000個元素,其中非公平訊號量完成時間從104毫秒到8714毫米,相差80倍。若採用同步控制實現更高的公平性,能縮小任務完成時間變動範圍(變動性),但是會極大的降低吞吐率。

Java併發程式設計實踐之道

避免效能測試的陷阱

以下的幾種編碼陷阱是效能測試變得毫無意義。

垃圾回收

垃圾回收的執行時序是無法預測的,可能發生在任何時刻,如果在測試程式時,恰巧觸發的垃圾回收操作,那麼在最終測試的時間上會帶來很大但虛假的影響。

兩種策略方式垃圾回收操作對測試結果產生偏差:

一:保證垃圾回收在執行測試程式期間不被執行,可透過呼叫JVM時指定-verbose:gc檢視是否有垃圾回收資訊。

二:保證垃圾回收在執行測試程式期間執行多次,可以充分反映出執行期間的記憶體分配和垃圾回收等開銷。

通常而言,第二種更好,更能反映實際環境下的效能。

動態編譯

相比靜態的編譯語言(C或者C++),java動態編譯語言的效能基準測試變得困難的多。在JVM中將位元組碼的解釋和動態編譯結合起來。當某個類第一次被載入,JVM會透過解釋位元組碼方式執行它,然而某個時刻,如果某個方法執行測試足夠多,那麼動態編譯器會將其編譯為機器程式碼,某個方法的執行方法從解釋執行變成直接執行。這種編譯的執行實際無法預測,如果編譯器可以在測試期間執行,那麼將在兩個方面給測試結果帶來偏差:

一:編譯過程消耗CPU資源

二:測量的程式碼中既包含解釋執行程式碼,又包含編譯執行程式碼,測試結果是混合程式碼的效能指標沒有太大的意義。

解決辦法:

一:可以讓測試程式執行足夠長時間,防止動態編譯對測試結果產生的偏差。

二:在HotSpot執行程式時設定-xx:+PrintCompilation,在動態編譯時輸出一條資訊,可以透過這條訊息驗證動態編譯是測試執行前,而不是執行過程中執行

對程式碼路徑的不真實取樣

動態編譯可能會讓不同地方呼叫的同一方法編譯出的程式碼不同。

測試程式不僅要大致判斷某個典型應用程式的使用模式,還要儘量覆蓋在該應用程式中將執行的程式碼路徑集合

訪問共享資料競爭程度影響吞吐量

併發程式交替執行兩種型別的工作:訪問共享資料(例如從共享工作佇列取出下一個任務)和執行執行緒本地的計算(例如:執行任務)。如果任務是計算密集型,即任務執行時間較長,那麼這種情況下幾乎不存在競爭,吞吐量受限於CPU資源可用性。然而,如果任務生命週期較慢,那麼在工作佇列上存在嚴重的競爭,吞吐量受限於同步的開銷。例如TimePutTakeTest由於消費者沒有執行太多工作,吞吐量受限於執行緒的協調開銷。

無用程式碼的消除

無論是何種語言編寫優秀的基準測試程式,一個需要面對的挑戰是:最佳化編譯能找出並消除那些對輸出結果不會產生任何影響的無用程式碼。由於基準測試程式碼通常不會執行任何技術,因此很容易在編譯器的最佳化過程中被消除,因此測試的內容變得更少。在動態編譯語言java中,要檢測編譯器是否消除了測試基準是很困難的。

解決辦法,就是告訴最佳化器不要將基準測試程式碼當成無用程式碼而最佳化掉,這就要求在程式中對每個計算結果都透過某種方法使用,這種方法不需要大量的計算。例如在PutTakeTest中,我們計算了在佇列中新增刪除了所有元素的校驗和,如果在程式中沒有用到這個校驗和,那麼計算校驗和操作很有可能被最佳化掉,但是幸好 assertEquals(putSum。get(), takeSum。get());在程式中使用了校驗和來驗證演算法的正確性。

上訴中要儘量採用某種方法使用計算結果避免被最佳化掉,有個簡單的方法可不會引入過高的開銷,將計算結果與System。nanoTime比較,若相等輸出一個無用的訊息即可。

if(計算結果== System。nanoTime())

System。out。print(“ ”);

總結

要測試併發程式的正確性可能非常困難,因為併發程式的許多故障模式都是一些低機率事件,它們對於執行時序、負載情況以及其他難以重現的條件都非常敏感。而且,在測試程式中還會引入額外的同步或執行時序限制,這些因素將掩蓋被測試程式碼中的一些併發問題。要測試併發程式的效能同樣非常困難,與使用靜態編譯語言(例如C)編寫的程式相比,用Java編寫的程式在測試起來更加困難,因為動態編譯、垃圾回收以及自動化等操作都會影響與時間相關的測試結果。

要想盡可能地發現潛在的錯誤以及避免它們在正式產品中暴露出來,我們需要將傳統的測試技術(要謹慎地避免在這裡討論的各種陷阱)與程式碼審查和自動化分析工具結合起來,每項技術都可以找出其他技術忽略的問題。

四。 非同步任務池

Java中的執行緒池設計得非常巧妙,可以高效併發執行多個任務,但是在某些場景下需要對執行緒池進行擴展才能更好地服務於系統。例如,如果一個任務仍進執行緒池之後,執行執行緒池的程式重啟了,那麼執行緒池裡的任務就會丟失。另外,執行緒池只能處理本機的任務,在叢集環境下不能有效地排程所有機器的任務。所以,需要結合線程池開發一個非同步任務處理池。圖11-2 為非同步任務池設計圖。

Java併發程式設計實踐之道

非同步任務池設計圖

任務池的主要處理流程是,每臺機器會啟動一個任務池,每個任務池裡有多個執行緒池,當某臺機器將一個任務交給任務池後,任務池會先將這個任務儲存到資料中,然後某臺機器上的任務池會從資料庫中獲取待執行的任務,再執行這個任務。

每個任務有幾種狀態,分別是建立(NEW)、執行中(EXECUTING)、RETRY(重試)、掛起

(SUSPEND)、中止(TEMINER)和執行完成(FINISH)。

建立:提交給任務池之後的狀態。

執行中:任務池從資料庫中拿到任務執行時的狀態。

重試:當執行任務時出現錯誤,程式顯式地告訴任務池這個任務需要重試,並設定下一次

執行時間。

掛起:當一個任務的執行依賴於其他任務完成時,可以將這個任務掛起,當收到訊息後, 再開始執行。

中止:任務執行失敗,讓任務池停止執行這個任務,並設定錯誤訊息告訴呼叫端。

執行完成:任務執行結束。

任務池的任務隔離

。非同步任務有很多種型別,比如抓取網頁任務、同步資料任務等,不同型別的任務優先順序不一樣,但是系統資源是有限的,如果低優先順序的任務非常多,高優先順序的任務就可能得不到執行,所以必須對任務進行隔離執行。使用不同的執行緒池處理不同的任務, 或者不同的執行緒池處理不同優先順序的任務,如果任務型別非常少,建議用任務型別來隔離,如果任務型別非常多,比如幾十個,建議採用優先順序的方式來隔離。

任務池的重試策略

。根據不同的任務型別設定不同的重試策略,有的任務對實時性要求高,那麼每次的重試間隔就會非常短,如果對實時性要求不高,可以採用預設的重試策略,重試間隔隨著次數的增加,時間不斷增長,比如間隔幾秒、幾分鐘到幾小時。每個任務型別可以設定執行該任務型別執行緒池的最小和最大執行緒數、最大重試次數。

使用任務池的注意事項

。任務必須無狀態:任務不能在執行任務的機器中儲存資料,比如某個任務是處理上傳的檔案,任務的屬性裡有檔案的上傳路徑,如果檔案上傳到機器1,機器2 獲取到了任務則會處理失敗,所以上傳的檔案必須存在其他的集群裡,比如OSS或SFTP。

非同步任務的屬性

。包括任務名稱、下次執行時間、已執行次數、任務型別、任務優先順序和

執行時的報錯資訊(用於快速定位問題)。