ConcurrentLinkedQueue是Queue的一个安全实现.Queue中元素按FIFO原则进行排序.采用CAS操作,来保证元素的一致性.
数据结构为:单向链表.
变量使用volatile修改,保证内在可见性(happens-before,对变量的写操作对后续的读操作是可见的),同样也不会导致CPU指令的重排序.
private static class Node<E> {
volatile E item;
volatile Node<E> next;
/**
* Constructs a new node. Uses relaxed write because item can
* only be seen after publication via casNext.
*/
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class k = Node.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
/***
* Sets the value of the object field at the specified offset in the
* supplied object to the given value, with volatile store semantics.
* 设置obj对象中offset偏移地址对应的object型field的值为指定值。支持volatile store语义
*
* @param obj the object containing the field to modify.
* 包含需要修改field的对象
* @param offset the offset of the object field within <code>obj</code>.
* <code>obj</code>中object型field的偏移量
* @param value the new value of the field.
* field将被设置的新值
* @see #putObject(Object,long,Object)
*/
public native void putObjectVolatile(Object obj, long offset, Object value);
/***
* Compares the value of the object field at the specified offset
* in the supplied object with the given expected value, and updates
* it if they match. The operation of this method should be atomic,
* thus providing an uninterruptible way of updating an object field.
* 在obj的offset位置比较object field和期望的值,如果相同则更新。这个方法
* 的操作应该是原子的,因此提供了一种不可中断的方式更新object field。
*
* @param obj the object containing the field to modify.
* 包含要修改field的对象
* @param offset the offset of the object field within <code>obj</code>.
* <code>obj</code>中object型field的偏移量
* @param expect the expected value of the field.
* 希望field中存在的值
* @param update the new value of the field if it equals <code>expect</code>.
* 如果期望值expect与field的当前值相同,设置filed的值为这个新值
* @return true if the field was changed.
* 如果field的值被更改
*/
public native boolean compareAndSwapObject(Object obj, long offset,
Object expect, Object update);
/***
* Sets the value of the object field at the specified offset in the
* supplied object to the given value. This is an ordered or lazy
* version of <code>putObjectVolatile(Object,long,Object)</code>, which
* doesn't guarantee the immediate visibility of the change to other
* threads. It is only really useful where the object field is
* <code>volatile</code>, and is thus expected to change unexpectedly.
* 设置obj对象中offset偏移地址对应的object型field的值为指定值。这是一个有序或者
* 有延迟的<code>putObjectVolatile</cdoe>方法,并且不保证值的改变被其他线程立
* 即看到。只有在field被<code>volatile</code>修饰并且期望被意外修改的时候
* 使用才有用。
*
* @param obj the object containing the field to modify.
* 包含需要修改field的对象
* @param offset the offset of the object field within <code>obj</code>.
* <code>obj</code>中long型field的偏移量
* @param value the new value of the field.
* field将被设置的新值
*/
public native void putOrderedObject(Object obj, long offset, Object value);
构造head和tail都为一个空的Node
private transient volatile Node<E> head;
private transient volatile Node<E> tail;
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}
因为只继承AbstractQueue,Queue所以只有add(e),remove,offer(e),poll.主要的方法.
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p is last node
if (p.casNext(null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
}
放入到Queue整个过程.
1.获取尾结点t,设置p=t,q为t的下一个结点,判断q是否为空.如果为空,说明为最后的结点,使用CAS更新p的next node,
如果成功,返回true;如果失败说明next node已经被更新,那么执行循环从1开始执行;如果成功执行2.
2.判断p和q是否为同一对象,如果不是使用CAS更新tail(并发时可能出现),更新失败说明已经有tail了.不管成功失败都返回true.
3.如果q不为null 并且p == q,如果tail没有改变,那么jump to head,重新检查,继续执行1
4.如果q不为null 并且p <> q,也就是说此时尾节点后面还有元素,那么就需要把尾节点往后移,继续执行1.
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null && p.casItem(item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
从Queue取item整个过程。
1.p = head,如果p.item不过空,那么通过CAS将p.item更新为null,如果更新成功返回item。
2.如果p.item为空,判断p.next是否为空,如果为空更新head为p。
3.如果p.next不为空,那么p = q(p.next).重新执行1。
4.如果p.next不为空,当p==q说明1中更新没有成功,重新执行1。
public int size() {
int count = 0;
for (Node<E> p = first(); p != null; p = succ(p))
if (p.item != null)
// Collection.size() spec says to max out
if (++count == Integer.MAX_VALUE)
break;
return count;
}
获取队列大小的过程,由于没有一个计数器来对队列大小计数,所以获取队列的大小只能通过从头到尾完整的遍历队列,代价是很大的。所以通常情况下ConcurrentLinkedQueue需要和一个AtomicInteger搭配来获取队列大小。
需要说明的是,对尾节点的tail的操作需要换成临时变量t和s,一方面是为了去掉volatile变量的可变性,另一方面是为了减少volatile的性能影响。
分享到:
相关推荐
ConcurrentLinkedQueue源码分析
并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue用法
自己动手让springboot异步处理浏览器发送的请求(只需要使用ConcurrentLinkedQueue即可)
LinkedBlockingQueue 首先 LinkedBlockingQueue 是一个 “可选且有界” 的阻塞队列实现,你可以根据需要指定队列的大小。 接下来,我将创建一个 LinkedBlockingQueue ,它最多可以包含100个元素: ...
聊聊并发(6)ConcurrentLinkedQueue的实现原理分析Java开发Java经验技巧共18页.pdf.zip
Java 多线程与并发(15_26)-JUC集合_ ConcurrentLinkedQueue详解
ConcurrentLinkedQueue 在考虑并发的时候可以先考虑单线程的情况,然后再将并发的情况考虑进来。 比如ConcurrentLinkedQueue: 1、先考虑单线的offer 2、再考虑多线程时候的offer: · 多个线程offer...
主要介绍了Java concurrency集合之ConcurrentLinkedQueue,需要的朋友可以参考下
随着项目的不断迭代,加上产品经理大法(这里加一个弹窗提示,...为了防止多个线程同时操作DialogManager中的queue对象,所以我们采用线程安全的ConcurrentLinkedQueue,这里简单的介绍下ConcurrentLinkedQueue实现和数
ConcurrentLinkedQueue是⼀个基于链接节点的⽆界线程安全队列,它采⽤先进先出的规则对节点 进⾏排序,当我们添加⼀个元素的时候,它会添加到队列的尾部,当我们获取⼀个元素时,它会返 回队列头部的元素。 ● ...
- **`ConcurrentLinkedQueue`** : 高效的并发队列,使用链表实现。可以看做一个线程安全的 `LinkedList`,这是一个非阻塞队列。 - **`BlockingQueue`** : 这是一个接口,JDK 内部通过链表、数组等方式实现了这个接口...
JUC 提供了线程安全的并发集合类,如 ConcurrentHashMap、ConcurrentLinkedQueue 等。 3. 原子操作(Atomic Operations): 原子操作是不可再分割的基本操作,JUC 提供了一系列原子操作类,如 AtomicInteger、...
Java并发编程常见知识点源码集锦,涉及到对象锁,Executors多任务线程框架,线程池等... ConcurrentLinkedQueue、DelayQueue示例、自定义的线程拒绝策略、自定义线程池(使用有界队列)、自定义线程池(使用无界队列)。。。
并发集合:Java中的并发集合,包括ConcurrentHashMap、ConcurrentLinkedQueue、CopyOnWriteArrayList等。 并发控制:Java中的并发控制机制,包括信号量、原子变量、倒计时等。 线程安全:Java中的线程安全,包括同步...
6. 聊聊并发(六)ConcurrentLinkedQueue的实现原理 7. 聊聊并发(七)Java中的阻塞队列 8. 聊聊并发(八)Fork/Join框架介绍 9. 聊聊并发(九)Java中的CopyOnWrite容器 10. 聊聊并发(十)生产者消费者模式
并发集合:JUC提供了一些线程安全的集合类,如ConcurrentHashMap、ConcurrentLinkedQueue等,可以在多线程环境下安全地访问和修改集合。 原子操作:JUC提供了一些原子操作类,如AtomicInteger、AtomicLong等,可以...
我们将详细介绍 JUC 提供的线程安全集合类,如 ConcurrentHashMap 和 ConcurrentLinkedQueue,以及它们在实际应用中的用法。 通过这份资源,您将获得全面的 Java 并发编程知识,从基础概念到高级应用,从工具使用到...
ConcurrentLinkedQueue 标记: class Map 标记: 顶级接口 HashMap 标记: class V get(Object key) V put(K key, V value) Set<K> keySet() Set,V>> entrySet() 线程不安全,速度快,允许存放null键,...
并发包 同步容器类 Vector与ArrayList区别 ...在并发队列上JDK提供了两套实现,一个是以ConcurrentLinkedQueue为代表的高性能队 列,一个是以BlockingQueue接口为代表的阻塞队列,无论哪种都继承自Queue。