利用ReentrantLock简单实现一个阻塞队列

借助JUC里的ReentrantLock实现一个阻塞队列结构:

代码块1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package demo.concurrent.lock.queue;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
* @author sunqinwen
* @version \: SimpleQueue.java,v 0.1 2019-01-16 14:47
* 利用重入锁和重入锁的线程调度实现的简单阻塞队列
*/
public class SimpleQueue {

private static ReentrantLock lock = new ReentrantLock();

private T[] nodes;

private int tail = 0; // 入元素下标

private int count = 0; // 元素个数

private int head = 0; // 出元素下标

public SimpleQueue(int size) {
nodes = (T[]) new Object[size];
}

private static Condition notFull = lock.newCondition();

private static Condition notEmpty = lock.newCondition();

public void put(T t) {
try {
lock.lock();
if (count == nodes.length) { // 队列已满,阻塞
System.out.println("目前队列已满,等待取值中");
notFull.await();
}
if (tail > (nodes.length - 1)) { // 当前游标值已经大于数组游标最大值了,则从0开始计算
tail = 0;
}
nodes[tail] = t; // 给当前游标位赋值
count++; // 入元素元素个数+1
tail++; // 游标值+1
notEmpty.signalAll(); // 走到这里说明队列内至少有一个元素,则唤醒取值
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public T take() {
T t = null;
try {
lock.lock();
if (count == 0) { // 队列已空,等待加值
System.out.println("目前队列已空,等待入值中");
notEmpty.await();
}
if (head > (nodes.length - 1)) { // 若取值游标大于游标最大值,则从0开始计算
head = 0;
}
t = nodes[head]; // 拿到元素值
nodes[head] = null; // 清空原有位置上的值
head++; // 取值游标+1
count--; // 元素个数-1
notFull.signalAll(); // 走到这里说明队列至少有一个空位,则唤醒入值
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}

return t;
}

}

以上为主要代码,下面进行简单的测试:

代码块2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Test
public void simpleQueueTest() throws Exception {

executorService.execute(() -> {
simpleQueue.put(1);
simpleQueue.put(2);
simpleQueue.put(3);
simpleQueue.put(4);
simpleQueue.put(5);
simpleQueue.put(6);

simpleQueue.put(7);
simpleQueue.put(8);
simpleQueue.put(9);
simpleQueue.put(10);
simpleQueue.put(11);
simpleQueue.put(12);
});

Thread.sleep(5000L);

executorService.execute(() -> {

Integer r;
while ((r = simpleQueue.take()) != null) {
System.out.println(r);
}
});

Thread.sleep(5000L);
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
目前队列已满,等待取值中
目前队列已满,等待取值中
1
2
目前队列已满,等待取值中
3
目前队列已满,等待取值中
4
5
6
7
8
9
目前队列已空,等待入值中
10
11
12
目前队列已空,等待入值中