java多线程学习之【atomic】
本文主要是讲解atomic的概念,基本用法,使用场景,底层代码原理剖析。在java.util.concurrent.atomic包下atomic一般指原子操作类,主要分为四种类型的原子更新类:原子更新基本类型、原子更新数组类型、原子更新引用和原子更新属性。原子更新基本类型:使用原子方式更新基本类型;原子更新数组:通过原子更新数组里的某个元素;原子更新引用类型:需要更新引用类型往往涉及多个变量;原子
目前卷文化盛行,为了增强面试能力,开始了无边无际的学习,无边界不是重点,重点是要深入1万米。言归正传,本文打算做一个多线程学习的系列文章,沉淀自我。
前言
本文主要是讲解atomic的概念,基本用法,使用场景,底层代码原理剖析。
一、atomic是什么?
在java.util.concurrent.atomic包下atomic一般指原子操作类,主要分为四种类型的原子更新类:原子更新基本类型、原子更新数组类型、原子更新引用和原子更新属性。
原子更新基本类型 | 说明 |
---|---|
AtomicBoolean | 原子更新布尔变量 |
AtomicInteger | 原子更新整型变量 |
AtomicLong | 原子更新长整型变量 |
原子更新数组 | 说明 |
---|---|
AtomicIntegerArray | 原子更新整型数组的某个元素 |
AtomicLongArray | 原子更新长整型数组的某个元素 |
AtomicReferenceArray | 原子更新引用类型数组的某个元素 |
原子更新引用类型 | 说明 |
---|---|
AtomicReference | 原子更新引用类型 |
AtomicReferenceFieldUpdater | 原子更新引用类型里的字段 |
AtomicMarkableReference | 原子更新带有标记位的引用类型 |
原子更新字段类 | 说明 |
---|---|
AtomicIntegerFieldUpdater | 原子更新整型字段 |
AtomicLongFieldUpdater | 原子更新长整型字段 |
AtomicStampedReference | 原子更新带有版本号的引用类型 |
二、基本应用场景
原子更新基本类型:使用原子方式更新基本类型;
原子更新数组:通过原子更新数组里的某个元素;
原子更新引用类型:需要更新引用类型往往涉及多个变量;
原子更新字段类:需要原子更新某个类的某个字段。
2.1原子更新基本类型
以AtomicLong为例,主要方法如下
public class AtomicLong extends Number implements java.io.Serializable {
private static final long serialVersionUID = 1927816293512124184L;
// setup to use Unsafe.compareAndSwapLong for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
/**
* Records whether the underlying JVM supports lockless
* compareAndSwap for longs. While the Unsafe.compareAndSwapLong
* method works in either case, some constructions should be
* handled at Java level to avoid locking user-visible locks.
*/
static final boolean VM_SUPPORTS_LONG_CAS = VMSupportsCS8();
/**
* Returns whether underlying JVM supports lockless CompareAndSet
* for longs. Called only once and cached in VM_SUPPORTS_LONG_CAS.
*/
private static native boolean VMSupportsCS8();
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicLong.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile long value;
/**
* Creates a new AtomicLong with the given initial value.
*
* @param initialValue the initial value
*/
public AtomicLong(long initialValue) {
value = initialValue;
}
/**
* Creates a new AtomicLong with initial value {@code 0}.
*/
public AtomicLong() {
}
/**
* Gets the current value.
*
* @return the current value
*/
public final long get() {
return value;
}
/**
* Sets to the given value.
*
* @param newValue the new value
*/
public final void set(long newValue) {
value = newValue;
}
/**
* Eventually sets to the given value.
*
* @param newValue the new value
* @since 1.6
*/
public final void lazySet(long newValue) {
unsafe.putOrderedLong(this, valueOffset, newValue);
}
/**
* Atomically sets to the given value and returns the old value.
*
* @param newValue the new value
* @return the previous value
*/
public final long getAndSet(long newValue) {
return unsafe.getAndSetLong(this, valueOffset, newValue);
}
/**
* Atomically sets the value to the given updated value
* if the current value {@code ==} the expected value.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that
* the actual value was not equal to the expected value.
*/
public final boolean compareAndSet(long expect, long update) {
return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
}
/**
* Atomically sets the value to the given updated value
* if the current value {@code ==} the expected value.
*
* <p><a href="package-summary.html#weakCompareAndSet">May fail
* spuriously and does not provide ordering guarantees</a>, so is
* only rarely an appropriate alternative to {@code compareAndSet}.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful
*/
public final boolean weakCompareAndSet(long expect, long update) {
return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
}
/**
* Atomically increments by one the current value.
*
* @return the previous value
*/
public final long getAndIncrement() {
return unsafe.getAndAddLong(this, valueOffset, 1L);
}
/**
* Atomically decrements by one the current value.
*
* @return the previous value
*/
public final long getAndDecrement() {
return unsafe.getAndAddLong(this, valueOffset, -1L);
}
/**
* Atomically adds the given value to the current value.
*
* @param delta the value to add
* @return the previous value
*/
public final long getAndAdd(long delta) {
return unsafe.getAndAddLong(this, valueOffset, delta);
}
/**
* Atomically increments by one the current value.
*
* @return the updated value
*/
public final long incrementAndGet() {
return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;
}
/**
* Atomically decrements by one the current value.
*
* @return the updated value
*/
public final long decrementAndGet() {
return unsafe.getAndAddLong(this, valueOffset, -1L) - 1L;
}
/**
* Atomically adds the given value to the current value.
*
* @param delta the value to add
* @return the updated value
*/
public final long addAndGet(long delta) {
return unsafe.getAndAddLong(this, valueOffset, delta) + delta;
}
/**
* Atomically updates the current value with the results of
* applying the given function, returning the previous value. The
* function should be side-effect-free, since it may be re-applied
* when attempted updates fail due to contention among threads.
*
* @param updateFunction a side-effect-free function
* @return the previous value
* @since 1.8
*/
public final long getAndUpdate(LongUnaryOperator updateFunction) {
long prev, next;
do {
prev = get();
next = updateFunction.applyAsLong(prev);
} while (!compareAndSet(prev, next));
return prev;
}
/**
* Atomically updates the current value with the results of
* applying the given function, returning the updated value. The
* function should be side-effect-free, since it may be re-applied
* when attempted updates fail due to contention among threads.
*
* @param updateFunction a side-effect-free function
* @return the updated value
* @since 1.8
*/
public final long updateAndGet(LongUnaryOperator updateFunction) {
long prev, next;
do {
prev = get();
next = updateFunction.applyAsLong(prev);
} while (!compareAndSet(prev, next));
return next;
}
/**
* Atomically updates the current value with the results of
* applying the given function to the current and given values,
* returning the previous value. The function should be
* side-effect-free, since it may be re-applied when attempted
* updates fail due to contention among threads. The function
* is applied with the current value as its first argument,
* and the given update as the second argument.
*
* @param x the update value
* @param accumulatorFunction a side-effect-free function of two arguments
* @return the previous value
* @since 1.8
*/
public final long getAndAccumulate(long x,
LongBinaryOperator accumulatorFunction) {
long prev, next;
do {
prev = get();
next = accumulatorFunction.applyAsLong(prev, x);
} while (!compareAndSet(prev, next));
return prev;
}
/**
* Atomically updates the current value with the results of
* applying the given function to the current and given values,
* returning the updated value. The function should be
* side-effect-free, since it may be re-applied when attempted
* updates fail due to contention among threads. The function
* is applied with the current value as its first argument,
* and the given update as the second argument.
*
* @param x the update value
* @param accumulatorFunction a side-effect-free function of two arguments
* @return the updated value
* @since 1.8
*/
public final long accumulateAndGet(long x,
LongBinaryOperator accumulatorFunction) {
long prev, next;
do {
prev = get();
next = accumulatorFunction.applyAsLong(prev, x);
} while (!compareAndSet(prev, next));
return next;
}
}
使用示例代码如下:
package com.valley.juc.atomic;
import java.util.concurrent.atomic.AtomicLong;
/**
* @author valley
* @date 2022/6/23
* @Description AtomicLong
*/
public class AtomicLongDemo {
static AtomicLong cnt = new AtomicLong(0l);
static void m() {
for (int i = 0; i < 10000; i++) {
cnt.incrementAndGet();
}
}
public static void main(String[] args) throws InterruptedException {
Thread[] threads = new Thread[1000];
for (int i = 0; i < threads.length; i++)
threads[i] = new Thread(() -> {
AtomicLongDemo.m();
});
long start = System.currentTimeMillis();
for(int i=0;i<threads.length;i++){
threads[i].start();
}
for(int i=0;i<threads.length;i++){
threads[i].join();
}
System.out.println(System.currentTimeMillis()-start);
System.out.println(cnt.get());
}
}
2.2原子更新数组
以AtomicIntegerArray为例,主要方法如下
/**
* Creates a new AtomicIntegerArray of the given length, with all
* elements initially zero.
*
* @param length the length of the array
*/
public AtomicIntegerArray(int length) {
array = new int[length];
}
/**
* Creates a new AtomicIntegerArray with the same length as, and
* all elements copied from, the given array.
*
* @param array the array to copy elements from
* @throws NullPointerException if array is null
*/
public AtomicIntegerArray(int[] array) {
// Visibility guaranteed by final field guarantees
this.array = array.clone();
}
/**
* Returns the length of the array.
*
* @return the length of the array
*/
public final int length() {
return array.length;
}
/**
* Atomically increments by one the element at index {@code i}.
*
* @param i the index
* @return the previous value
*/
public final int getAndIncrement(int i) {
return getAndAdd(i, 1);
}
/**
* Atomically decrements by one the element at index {@code i}.
*
* @param i the index
* @return the previous value
*/
public final int getAndDecrement(int i) {
return getAndAdd(i, -1);
}
/**
* Atomically adds the given value to the element at index {@code i}.
*
* @param i the index
* @param delta the value to add
* @return the previous value
*/
public final int getAndAdd(int i, int delta) {
return unsafe.getAndAddInt(array, checkedByteOffset(i), delta);
}
/**
* Atomically increments by one the element at index {@code i}.
*
* @param i the index
* @return the updated value
*/
public final int incrementAndGet(int i) {
return getAndAdd(i, 1) + 1;
}
/**
* Atomically decrements by one the element at index {@code i}.
*
* @param i the index
* @return the updated value
*/
public final int decrementAndGet(int i) {
return getAndAdd(i, -1) - 1;
}
/**
* Atomically adds the given value to the element at index {@code i}.
*
* @param i the index
* @param delta the value to add
* @return the updated value
*/
public final int addAndGet(int i, int delta) {
return getAndAdd(i, delta) + delta;
}
使用示例代码如下:展示了数组每个元素都是安全性操作
package com.valley.juc.atomic;
import java.util.concurrent.atomic.AtomicIntegerArray;
/**
* @author valley
* @date 2022/6/24
* @Description AtomicArray
*/
public class AtomicArrayDemo {
public static void main(String[] args) throws InterruptedException {
AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(1000);
Thread[] threads1 = new Thread[100];
Thread[] threads2 = new Thread[100];
Decrement decrement = new Decrement(atomicIntegerArray);
Increment increment = new Increment(atomicIntegerArray);
for (int i = 0; i < 100; i++) {
threads1[i] = new Thread(decrement);
threads2[i] = new Thread(increment);
threads1[i].start();
threads2[i].start();
}
for (int i = 0; i < 100; i++) {
threads1[i].join();
threads2[i].join();
}
for (int i = 0; i < atomicIntegerArray.length(); i++) {
if(atomicIntegerArray.get(i) != 0){
System.out.println("发现了错误:错误的索引:"+i);
}
}
System.out.println("运行结束");
}
static class Decrement implements Runnable{
private AtomicIntegerArray array;
public Decrement(AtomicIntegerArray array) {
this.array = array;
}
@Override
public void run() {
for (int i = 0; i < array.length(); i++) {
array.getAndDecrement(i);
}
}
}
static class Increment implements Runnable{
private AtomicIntegerArray array;
public Increment(AtomicIntegerArray array) {
this.array = array;
}
@Override
public void run() {
for (int i = 0; i < array.length(); i++) {
array.getAndIncrement(i);
}
}
}
}
2.3原子更新引用类型
以AtomicReference为例,主要方法如下
/**
* Creates a new AtomicReference with the given initial value.
*
* @param initialValue the initial value
*/
public AtomicReference(V initialValue) {
value = initialValue;
}
/**
* Creates a new AtomicReference with null initial value.
*/
public AtomicReference() {
}
/**
* Gets the current value.
*
* @return the current value
*/
public final V get() {
return value;
}
/**
* Sets to the given value.
*
* @param newValue the new value
*/
public final void set(V newValue) {
value = newValue;
}
/**
* Eventually sets to the given value.
*
* @param newValue the new value
* @since 1.6
*/
public final void lazySet(V newValue) {
unsafe.putOrderedObject(this, valueOffset, newValue);
}
/**
* Atomically sets the value to the given updated value
* if the current value {@code ==} the expected value.
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that
* the actual value was not equal to the expected value.
*/
public final boolean compareAndSet(V expect, V update) {
return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
}
使用示例代码如下:AtomicReference可以让一个对象保证原子性,当然,AtomicReference的功能明显比AtomicInteger强,因为一个对象里可以包含很多属性
package com.valley.juc.atomic;
import java.util.concurrent.atomic.AtomicReference;
/**
* @author valley
* @date 2022/7/1
* @Description TODO
*/
public class AtomicReferenceDemo {
private AtomicReference<Thread> sign = new AtomicReference<>();
public void lock(){
Thread current = Thread.currentThread();
while(!sign.compareAndSet(null,current)){
System.out.println(Thread.currentThread().getName() + "自旋锁获取失败,再次尝试");
}
}
public void unlock(){
Thread current = Thread.currentThread();
sign.compareAndSet(current,null);
}
public static void main(String[] args) {
AtomicReferenceDemo spinLock = new AtomicReferenceDemo();
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "尝试获取自旋锁");
spinLock.lock();
System.out.println(Thread.currentThread().getName() + "获取到了自旋锁");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
spinLock.unlock();
System.out.println(Thread.currentThread().getName() + "释放了自旋锁");
}
}
};
Thread thread1 = new Thread(runnable);
Thread thread2 = new Thread(runnable);
thread1.start();
thread2.start();
}
}
2.4原子更新引用类型
- 需要更新某个类里的某个字段
- Atomic包提供了以下三个类:
AtomicIntegerFieldUpdater:原子更新整型的字段的更新器。
AtomicLongFieldUpdater:原子更新长整型字段的更新器。
AtomicStampedReference:原子更新带有版本号的引用类型。该类将整数值与引用关联起来,可用于原子的更数据和数据的版本号,可以解决使用CAS进行原子更新时,可能出现的ABA问题。
原子更新字段类都是抽象类,每次使用都时候必须使用静态方法newUpdater创建一个更新器。原子更新类的字段的必须使用public volatile修饰符。
使用上述类是必须遵循以下原则:
字段必须是voliatile类型的,在线程之间共享变量时能保持可见性。
字段的描述类型是与调用者的操作对象字段保持一致。
也就是说调用者可以直接操作对象字段,那么就可以反射进行原子操作。
对父类的字段,子类不能直接操作的,尽管子类可以访问父类的字段
只能是实例变量,不能是类变量,也就是说不能加static关键字
只能是可修改变量,不能使用final修饰变量,final的语义,不可更改。
以AtomicIntegerFieldUpdater为例,主要方法如下
@CallerSensitive
public static <U> AtomicIntegerFieldUpdater<U> newUpdater(Class<U> tclass,
String fieldName) {
return new AtomicIntegerFieldUpdaterImpl<U>
(tclass, fieldName, Reflection.getCallerClass());
}
/**
* Protected do-nothing constructor for use by subclasses.
*/
protected AtomicIntegerFieldUpdater() {
}
public int addAndGet(T obj, int delta) {
int prev, next;
do {
prev = get(obj);
next = prev + delta;
} while (!compareAndSet(obj, prev, next));
return next;
}
public int getAndAdd(T obj, int delta) {
int prev, next;
do {
prev = get(obj);
next = prev + delta;
} while (!compareAndSet(obj, prev, next));
return prev;
}
public int decrementAndGet(T obj) {
int prev, next;
do {
prev = get(obj);
next = prev - 1;
} while (!compareAndSet(obj, prev, next));
return next;
}
public int getAndDecrement(T obj) {
int prev, next;
do {
prev = get(obj);
next = prev - 1;
} while (!compareAndSet(obj, prev, next));
return prev;
}
使用示例代码如下:
package com.valley.juc.atomic;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
/**
* @author valley
* @date 2022/7/1
* @Description AtomicIntegerFieldUpdater
*/
public class AtomicIntegerFieldUpdaterDemo implements Runnable{
static Candidate tom;
static Candidate peter;
public static AtomicIntegerFieldUpdater<Candidate> scoreUpdater =
AtomicIntegerFieldUpdater.newUpdater(Candidate.class,"score");
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
peter.score++;
scoreUpdater.getAndIncrement(tom);
}
}
public static void main(String[] args) throws InterruptedException {
tom = new Candidate();
peter = new Candidate();
AtomicIntegerFieldUpdaterDemo updaterDemo = new AtomicIntegerFieldUpdaterDemo();
Thread thread1 = new Thread(updaterDemo);
Thread thread2 = new Thread(updaterDemo);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
System.out.println("普通变量自增:"+peter.score);
System.out.println("AtomicIntegerFieldUpdater操作的变量:"+tom.score);
}
public static class Candidate{
volatile int score;
}
}
三、和高性能原子类比较
- 高性能原子类,是java8中增加的原子类,它们使用分段的思想,把不同的线程hash到不同的段上去更新,最后再把这些段的值相加得到最终的值,这些类主要有:
- Striped64: LongAccumulator,LongAdder,DoubleAccumulator,DoubleAdder的父类。
- LongAccumulator:long类型的聚合器,需要传入一个long类型的二元操作,可以用来计算各种聚合操作,包括加乘等。
- LongAdder:long类型的累加器,LongAccumulator的特例,只能用来计算加法,且从0开始计算。
- DoubleAccumulator:double类型的聚合器,需要传入一个double类型的二元操作,可以用来计算各种聚合操作,包括加乘等。
- DoubleAdder:double类型的累加器,DoubleAccumulator的特例,只能用来计算加法,且从0开始计算。
- 这几个类操作基本类似,其中DoubleAccumulator和DoubleAdder底层其实也是用long来实现的。Accumulator和Adder非常相似,Accumulator就是一个更通用版本的Adder。LongAddr与LongAccumulator类都是使用非阻塞算法CAS实现的,这相比于使用锁实现原子性操作在性能上有很大的提高。LongAddr类是LongAccumulator类的一个特例,只是LongAccumulator提供了更强大的功能,可以让用户自定义计算规则。
- 以高并发场景下LongAdder比AtomicLong性能好举例子
高并发下LongAdder比AtomicLong效率高,本质是空间换时间
在竞争激烈的时候,LongAdder把不同线程对应到不同的Cell上进行修改,降低了冲突的概率,是多段锁的理念,提高了并发性
AtomicLong中有个内部变量value保存着实际的long值,所有的操作都是针对该变量进行。也就是说,高并发环境下,value变量其实是一个热点,也就是N个线程竞争一个热点。由于竞争很激烈,每一次操作,都要flush和refresh,导致很多资源浪费
LongAdder的基本思路就是分散热点,将value值分散到一个数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。
更多推荐
所有评论(0)