您好,登录后才能下订单哦!
无 锁 算法 详 解
无 锁 的Vector 实现:
参照着JDK中的 Vector 源码
1、Vector中的 add 方法的实现,它是一个同步方法,所以保证了每一次只能又一个值对数组 elementData 进行操作。
protected Object[] elementData; 通过数据来实现存储
protected int elementCount; 记录对这个Vector的操作数
public synchronized boolean add(E e) {
modCount++;
ensureCapacityHelper(elementCount + 1);//这边是做越界判断
elementData[elementCount++] = e;
return true;
}
private void ensureCapacityHelper(int minCapacity) {
// overflow-conscious code
if (minCapacity - elementData.length > 0)
grow(minCapacity);//如果没有越界
}
private void grow(int minCapacity) {
// overflow-conscious code
int oldCapacity = elementData.length;
int newCapacity = oldCapacity + ((capacityIncrement > 0) ?
capacityIncrement : oldCapacity);
//如果初始化的时候不指定增量capacityIncrement,那么就是将oldCapacity+oldCapacity赋值给新的长度,如果指定增量那么就是oldCapacity+capacityIncrement
if (newCapacity - minCapacity < 0)
newCapacity = minCapacity;
if (newCapacity - MAX_ARRAY_SIZE > 0)
newCapacity = hugeCapacity(minCapacity);
elementData = Arrays.copyOf(elementData, newCapacity);
//最后将老的元素和新的一起加入到Vector中
}
public static <T> T[] copyOf(T[] original, int newLength) {
return (T[]) copyOf(original, newLength, original.getClass());
}
值得注意的一点就是如果指定增量,那么可以减少空间的浪费。
JDK阅读只是为未无锁的Vector做铺垫
无锁的Vector的实现
/**
@date
/
public class LockFreeVector<E> extends AbstractList<E>{
private static final boolean debug = false;
private static final int FIRST_BUCKET_SIZE = 8;
//虽然这边篮子的个数是固定的,但是绝对是够用的因为总共的篮子数就是82^30-1
private static final int N_BUCKET = 30;
private final AtomicReferenceArray<AtomicReferenceArray<E>> buckets;
//利用二维数组来嵌套是为了使一维的AtomicReferenceArray尽量避免修改,在一维数组填充满了时是不会去扩充的,
// 而是往二维数组里面填充,这样就避免了修改一维数组,而且高并发就是避免不必要的写来影响性能
public LockFreeVector(AtomicReferenceArray<AtomicReferenceArray<E>> buckets) {
this.buckets = buckets;
}
static class WriteDescriptor<E>{
public E oldV;
public E newV;
public AtomicReferenceArray<E> addr;
public int addr_ind;
public WriteDescriptor( AtomicReferenceArray<E> addr, int addr_ind,E oldV, E newV) {
this.oldV = oldV;
this.newV = newV;
this.addr = addr;
this.addr_ind = addr_ind;
}
public void doInt(){
addr.compareAndSet(addr_ind,oldV,newV);
}
}
static class Descriptor<E>{
public int size;
volatile WriteDescriptor<E> writeOp;
public Descriptor(int size, WriteDescriptor<E> writeOp) {
this.size = size;
this.writeOp = writeOp;
}
public void completeWrite(){
WriteDescriptor<E> tmpOp = writeOp;
if(tmpOp != null){
tmpOp.doInt();
writeOp=null;
}
}
}
private AtomicReference<Descriptor<E>> descriptor;
private static final int zeroNumFirst = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE);
public LockFreeVector(){
buckets = new AtomicReferenceArray<AtomicReferenceArray<E>>(N_BUCKET);
buckets.set(0,new AtomicReferenceArray<E>(FIRST_BUCKET_SIZE));
descriptor = new AtomicReference<Descriptor<E>>(new Descriptor<E>(0,null));
}
public void push_back(E e){
Descriptor<E> desc;
Descriptor<E> newD;
do {
desc=descriptor.get();
desc.completeWrite();
int pos = desc.size+FIRST_BUCKET_SIZE;
int zeroNumPos = Integer.numberOfLeadingZeros(pos);
int bucketInd = zeroNumFirst - zeroNumPos;
if(buckets.get(bucketInd)==null){
int newLen = 2 * buckets.get(bucketInd - 1).length();
if (debug)
System.out.println("New Length is:"+newLen);
buckets.compareAndSet(bucketInd,null,new AtomicReferenceArray<E>(newLen));
}
//0x80000000就是1000000... 总共32位
int idx = (0x80000000>>>zeroNumPos) ^ pos;
newD = new Descriptor<>(desc.size + 1,new WriteDescriptor<E>(buckets.get(bucketInd),idx,null, e));
}while (!descriptor.compareAndSet(desc,newD));
descriptor.get().completeWrite();
}
public E pop_back(){
Descriptor<E> desc;
Descriptor<E> newD;
E elem;
do {
desc = descriptor.get();
desc.completeWrite();
int pos = desc.size + FIRST_BUCKET_SIZE - 1;
int bucketInd = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE) - Integer.numberOfLeadingZeros(pos);
int idx = Integer.highestOneBit(pos) ^ pos;
elem = buckets.get(bucketInd).get(idx);
newD = new Descriptor<E>(desc.size - 1,null);
}while (!descriptor.compareAndSet(desc,newD));
return elem;
}
@Override
public E get(int index){
int pos = index + FIRST_BUCKET_SIZE;
int zeroNumPos = Integer.numberOfLeadingZeros(pos);
int bucketInd = zeroNumFirst - zeroNumPos;
int idx = (0x80000000>>>zeroNumPos) ^ pos;
return buckets.get(bucketInd).get(idx);
}
@Override
public E set(int index,E e){
int pos = index + FIRST_BUCKET_SIZE;
int bucketInd = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE) - Integer.numberOfLeadingZeros(pos);
int idx = Integer.highestOneBit(pos) ^ pos;
AtomicReferenceArray<E> bucket = buckets.get(bucketInd);
while (true){
E oldV = bucket.get(idx);
if (bucket.compareAndSet(idx,oldV,e))
return oldV;
}
}
public void reserve(int newSize){
int size = descriptor.get().size;
int pos = size + FIRST_BUCKET_SIZE - 1;
int i = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE) - Integer.numberOfLeadingZeros(pos);
if (i<1)
i = 1;
int initialSize = buckets.get(i - 1).length();
while (i < Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE) - Integer.numberOfLeadingZeros(newSize + FIRST_BUCKET_SIZE - 1)){
i++;
initialSize *= FIRST_BUCKET_SIZE;
buckets.compareAndSet(i, null , new AtomicReferenceArray<E>(initialSize));
}
}
public int size(){
return descriptor.get().size;
}
@Override
public boolean add(E obj){
push_back(obj);
return true;
}
}
它的结构是:private final AtomicReferenceArray<AtomicReferenceArray<E>> buckets;
从这里我们可以看到,它的内部是采用的是 无锁的引用数组, 数组嵌套数组
变量buckets存放所有的内部元素。从定义上看,它是一个保存着数组的数组,也就是通常的二维数组。特别之处在于这些数组都是使用CAS的原子数组。为什么使用二维数组去实现一个一维的Vector呢?这是为了将来Vector进行动态扩展时可以更加方便。我们知道,AtomicReferenceArray内部使用Object[]来进行实际数据的存储,这使得动态空间增加特别的麻烦,因此使用二维数组的好处就是为将来增加新的元素。
相当于一个二维数组,它的大小可以动态的进行扩展,
为了更有序的读写数组,定义了一个Descriptor的静态内部类。它的作用是使用CAS操作写入新数据。
它定义了
private static final int FIRST_BUCKET_SIZE = 8;
/**
FIRST_BUCKET_SIZE:为第一个数组的长度
N_BUCKET 整个二维数组最大可扩转至30
每次的扩展是成倍的增加,即:第一个数组长度为8,第二个为8<<1,第三个为8<<2 ......第30个为 8<<29
3. push_back
在第23行,使用descriptor将数据真正地写入数组中。这个descriptor写入的数据由20~21行构造的WriteDescriptor决定。
在循环最开始(第5行),使用descriptor先将数据写入数组,是为了防止上一个线程设置完descriptor后(22行),还没来得及执行第23行的写入,因此,做一次预防性的操作。
第8~10行通过当前Vector的大小(desc.size),计算新的元素应该落入哪个数组。这里使用了位运算进行计算。
LockFreeVector每次都会扩容。它的第一个数组长度为8,第2个就是16,第3个就是32,依次类推。它们的二进制表示如下:
它们之和就是整个LockFreeVector的总大小,因此,如果每一个数组都恰好填满,那么总大小应该类似如下的值(以4个数组为例)00000000 00000000 00000000 01111000:4个数组都恰好填满时的大小。
导致这个数字进位的最小条件,就是加上二进制的1000。而这个数字整好是8(FIRST_BUCKET_SIZE就是8)这就是第8行代码的意义。
它可以使得数组大小发生一次二进制进位(如果不进位说明还在第一个数组中),进位后前导零的数量就会发生变化。而元素所在的数组,和pos(第8行定义的比变量)的前导零直接相关。每进行一次数组扩容,它的前导零就会减1。如果从来没有扩容过,它的前导零就是28个。以后,逐级减1。这就是第9行获得pos前导零的原因。第10行,通过pos的前导零可以立即定位使用哪个数组(也就是得到了bucketInd的值)。
第11行,判断这个数组是否存在。如果不存在,则创建这个数组,大小为前一个数组的两倍,并把它设置到buckets中。
接着再看一下元素没有恰好填满的情况:
那么总大小如下:
总个数加上二进制1000后,得到:
显然,通过前导零可以定位到第4个数组。而剩余位,显然就表示元素在当前数组内偏移量(也就是数组下标)。根据这个理论,就可以通过pos计算这个元素应该放在给定数组的哪个位置。通过第19行代码,获得pos的除了第一位数字1以外的其他位的数值。因此,pos的前导零可以表示元素所在的数组,而pos的后面几位,则表示元素所在这个数组中的位置。由此,第19行代码就取得了元素所在位置idx。
代码理解可以参考:
https://blog.csdn.net/netcobol/article/details/79785651
和
http://www.shaoqun.com/a/197387.html
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。