深入理解ThreadLocal

ThreadLocal

多线程产生的线程安全性问题

数据一致性问题:发生在多个主体对同一份数据无法达成共识。

解决方法:

  1. 排队:锁、互斥量、管程、屏障
  2. 投票:Paxos算法、Raft算法

这两种方法都会消耗额外的性能。

ThreadLocal是能够避免数据不一致性的。

ThreadLocal使用

定义:

提供线程局部变量

一个线程局部变量在多个线程中,分别有多个独立的值。

特点:

简单(开箱即用)、快速(无额外开销)、安全(线程安全)

API使用

初始化值

1
2
3
4
5
6
static ThreadLocal<Long> local = new ThreadLocal<Long>(){
@Override
protected Long initialValue() {
return Thread.currentThread().getId();
}
};

得到值

1
local.get();

设置值

1
local.set(0L);

移除值

1
local.remove();

综合案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class demo1 {

static ThreadLocal<Long> local = new ThreadLocal<Long>(){
@Override
protected Long initialValue() {
return Thread.currentThread().getId();
}
};

public static void main(String[] args) {
System.out.println("线程" + Thread.currentThread().getName() + "的值为" + local.get());
local.remove();
System.out.println("将主线程的local移除");

new Thread(new Runnable() {
@Override
public void run() {
System.out.println("线程" + Thread.currentThread().getName() + "中的Local值为" + local.get());
System.out.println("重新设置线程" + Thread.currentThread().getName() + "中的Local值为111L");
local.set(111L);
System.out.println("线程" + Thread.currentThread().getName() + "中的Local值为" + local.get());
System.out.println("将线程" + Thread.currentThread().getName() + "中的Local值移除");
local.remove();

System.out.println("移除之后在打印线程" + Thread.currentThread().getName() + "的值" + local.get());
}
}
).start();
}
}

结果:

1
2
3
4
5
6
7
线程main的值为1
将主线程的local移除
线程Thread-0中的Local值为12
重新设置线程Thread-0中的Local值为111L
线程Thread-0中的Local值为111
将线程Thread-0中的Local值移除
移除之后在打印线程Thread-0的值12

ThreadLocal使用场景

数据库连接 – 线程一致

1
2
3
4
5
6
7
8
@Transactional
@Override
public void updateTest(Test updateVO) {
Test test = testMapper.selectByPrimaryKey(updateVO.getId());

testMapper.updateByPrimaryKey(updateVO);
testMapper.deleteByPrimaryKey(2);
}

我们有一个线程来执行这个方法,这个方法开启的事务,事务是基于数据库Connection连接的,这个事务中有三个操作数据库的DAO方法。如果想用一个事务来管三个操作dao的方法,那么这三个dao方法必须基于同一个Connection连接

转账案例
1
2
3
4
5
6
7
8
9
10
11
-- 使用数据库
use demo;
-- 创建一张账户表
create table account(
id int primary key auto_increment,
name varchar(20),
money double
);
-- 初始化数据
insert into account values(null, 'Jack', 10000);
insert into account values(null, 'Rose', 0);

引入maven

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.mchange/c3p0 -->
<dependency>
<groupId>com.mchange</groupId>
<artifactId>c3p0</artifactId>
<version>0.9.5.5</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<version>2.5</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>jstl</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>javax.servlet.jsp</groupId>
<artifactId>jsp-api</artifactId>
<version>2.2</version>
<scope>provided</scope>
</dependency>


<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.25</version>
</dependency>
</dependencies>

C3P0配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<c3p0-config>
<!-- 使用默认的配置读取连接池对象 -->
<default-config>
<!-- 连接参数 -->
<property name="driverClass">com.mysql.jdbc.Driver</property>
<property name="jdbcUrl">jdbc:mysql://localhost:3306/demo</property>
<property name="user">root</property>
<property name="password">root</property>

<!-- 连接池参数 -->
<property name="initialPoolSize">5</property>
<property name="maxPoolSize">10</property>
<property name="checkoutTimeout">3000</property>
</default-config>

</c3p0-config>

JDBC工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class JdbcUtils {
// c3p0 数据库连接池对象属性
private static final ComboPooledDataSource ds = new ComboPooledDataSource();
// 获取连接
public static Connection getConnection() throws SQLException {
return ds.getConnection();
}
//释放资源
public static void release(AutoCloseable... ios){
for (AutoCloseable io : ios) {
if(io != null){
try {
io.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}


public static void commitAndClose(Connection conn) {
try {
if(conn != null){
//提交事务
conn.commit();
//释放连接
conn.close();
}
} catch (SQLException e) {
e.printStackTrace();
}
}

public static void rollbackAndClose(Connection conn) {
try {
if(conn != null){
//回滚事务
conn.rollback();
//释放连接
conn.close();
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}

DAO

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class AccountDao {

public void out(Connection conn, String outUser, int money) throws SQLException{
String sql = "update account set money = money - ? where name = ?";
//注释从连接池获取连接的代码,使用从service中传递过来的connection
// Connection conn = JdbcUtils.getConnection();
PreparedStatement pstm = conn.prepareStatement(sql);
pstm.setInt(1,money);
pstm.setString(2,outUser);
pstm.executeUpdate();
//连接不能在这里释放,service层中还需要使用
// JdbcUtils.release(pstm,conn);
JdbcUtils.release(pstm);
}

public void in(Connection conn, String inUser, int money) throws SQLException {
String sql = "update account set money = money + ? where name = ?";
// Connection conn = JdbcUtils.getConnection();
PreparedStatement pstm = conn.prepareStatement(sql);
pstm.setInt(1,money);
pstm.setString(2,inUser);
pstm.executeUpdate();
// JdbcUtils.release(pstm,conn);
JdbcUtils.release(pstm);
}
}

Service

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class AccountService {
public boolean transfer(String outUser, String inUser, int money) {
AccountDao ad = new AccountDao();
//线程并发情况下,为了保证每个线程使用各自的connection,故加锁
synchronized (AccountService.class) {

Connection conn = null;
try {
conn = JdbcUtils.getConnection();
//开启事务
conn.setAutoCommit(false);
// 转出
ad.out(conn, outUser, money);
// 模拟转账过程中的异常
// int i = 1/0;
// 转入
ad.in(conn, inUser, money);
//事务提交
JdbcUtils.commitAndClose(conn);
} catch (Exception e) {
e.printStackTrace();
//事务回滚
JdbcUtils.rollbackAndClose(conn);
return false;
}
return true;
}
}
}

​ Servlet

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class ServletDemo extends HttpServlet {

@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
// 模拟数据 : Jack 给 Rose 转账 100
String outUser = "Jack";
String inUser = "Rose";
int money = 100;

AccountService as = new AccountService();
boolean result = as.transfer(outUser, inUser, money);

if (result == false) {
System.out.println("转账失败!");
} else {
System.out.println("转账成功!");
}
}

@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
super.doPost(req, resp);
}
}

web.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<!DOCTYPE web-app PUBLIC
"-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN"
"http://java.sun.com/dtd/web-app_2_3.dtd" >

<web-app>
<display-name>Archetype Created Web Application</display-name>

<servlet>
<servlet-name>servletDemo</servlet-name>
<servlet-class>com.jiang.ServletDemo</servlet-class>
</servlet>
<servlet-mapping>
<servlet-name>servletDemo</servlet-name>
<url-pattern>/transfer</url-pattern>
</servlet-mapping>


</web-app>

使用apache ab 压力测试

ab -n 100 -c 8 http://localhost:8080/transfer

最后结果没有问题。

这种方法可以解决在一个事务中的多个DAO操作均使用一个相同的Connection

简单来看,其实就是一个service方法中有一个事务,那么在这个方法中获取一个Connection,然后将这个连接传递到这个事务下面的DAO操作中。DAO操作使用相同的连接,这样就保证了事务。

1
2
3
4
5
6
7
8
9
Connection conn = null;
try {
//从连接池中获取connection
conn = JdbcUtils.getConnection();
//将conn传递给DAO 操作
ad.out(conn, outUser, money);
//将conn传递给DAO 操作
ad.in(conn, inUser, money);
}

这种方法的弊端:

耦合度太高了,这里的conn需要从Service层传递给DAO层

解决方法:ThreadLocal

将Connection连接存储在ThreadLocal,保证同一线程使用同一Conn

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public class JdbcUtils {
//ThreadLocal对象 : 将connection绑定在当前线程中
private static final ThreadLocal<Connection> tl = new ThreadLocal();

// c3p0 数据库连接池对象属性
private static final ComboPooledDataSource ds = new ComboPooledDataSource();

// 获取连接
public static Connection getConnection() throws SQLException {
//取出当前线程绑定的connection对象
Connection conn = tl.get();
if (conn == null) {
//如果没有,则从连接池中取出
conn = ds.getConnection();
//再将connection对象绑定到当前线程中
tl.set(conn);
}
return conn;
}

//释放资源
public static void release(AutoCloseable... ios) {
for (AutoCloseable io : ios) {
if (io != null) {
try {
io.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

public static void commitAndClose() {
try {
Connection conn = getConnection();
//提交事务
conn.commit();
//解除绑定
tl.remove();
//释放连接
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}

public static void rollbackAndClose() {
try {
Connection conn = getConnection();
//回滚事务
conn.rollback();
//解除绑定
tl.remove();
//释放连接
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}

这样在在Service层和DAO层不需要传递Conn,依然可以保证在一个事务中的多个DAO操作使用的同一个连接。

简易版ThreadLocal

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import java.util.HashMap;
import java.util.Map;

public class MyThreadLocal<T> {

//我们这里的设计是
//通过map来存储线程的值,因此第一个map的K是线程
//而在源码中,我们不需要这样通过map来存储线程
//因为我们在Thread中定义了一个Map (ThreadLocalMap)
static HashMap<Thread,HashMap<MyThreadLocal<?>,Object>> threadLocalMap
= new HashMap();

synchronized static HashMap<MyThreadLocal<?>,Object> getMap(){
Thread thread = Thread.currentThread();
if(!threadLocalMap.containsKey(thread)){
threadLocalMap.put(thread,new HashMap<MyThreadLocal<?>,Object>());
}
return threadLocalMap.get(thread);
}

protected T initialValue(){
return null;
}

public T get(){
Map map = getMap();
if(!map.containsKey(this)){
map.put(this,initialValue());
}
return (T) map.get(this);
}

public void set(T v){
Map map = getMap();
map.put(this,v);
}

public void remove(){
Map map = getMap();
if (map != null){
map.remove(this);
}
}


}

测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class LocalTest {

static MyThreadLocal<Long> local = new MyThreadLocal<Long>(){

@Override
protected Long initialValue() {
return Thread.currentThread().getId();
}
};

public static void main(String[] args) {
for (int i = 0; i < 100; i++) {
new Thread(() ->{
System.out.println(Thread.currentThread().getName() + ":" + local.get());
}).start();
}
}
}

ThreadLocal源码设计

1
2
3
4
5
6
1. 每个Thread维护着一个ThreadLocalMap的引用
2. ThreadLocalMap是ThreadLocal的内部类,用Entry来进行存储
3. ThreadLocal创建的副本是存储在自己的threadLocals中的,也就是自己的ThreadLocalMap。
4. ThreadLocalMap的键值为ThreadLocal对象,而且可以有多个threadLocal变量,因此保存在map中
5. 在进行get之前,必须先set,否则会报空指针异常,当然也可以初始化一个,但是必须重写initialValue()方法。
6. ThreadLocal本身并不存储值,它只是作为一个key来让线程从ThreadLocalMap获取value。

ThreadLocal设计

1.自定义的ThreadLocalMap

ThreadLocalMap 是一个定制的哈希映射,仅适用于维护线程本地值。不会在 ThreadLocal 类之外导出任何操作。该类是包私有的允许在类 Thread 中声明字段。为了帮助处理非常大且长期存在的用法,哈希表条目使用 WeakReferences 作为键。但是,由于不使用引用队列,所以只有在表开始耗尽空间时才能保证删除陈旧条目。

1
2
static class ThreadLocalMap {
}

为什么要重新定义Map、和HashMap有什么区别:

ThreadLocalMap使用线性探测法来解决哈希冲突的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();

if (k == key) {
e.value = value;
return;
}

if (k == null) {
replaceStaleEntry(key, value, i);
return;
}
}

tab[i] = new Entry(key, value);
int sz = ++size;
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
2.弱引用和内存泄漏

此哈希映射中的条目扩展了 WeakReference,使用其主要 ref 字段作为键(始终是ThreadLocal 对象)。请注意,空键(即 entry.get() == null)意味着不再引用该键,因此可以从表中删除条目。此类条目在后面的代码中被称为 “陈旧条目”。

1
2
3
4
5
6
7
8
9
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;

Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}

强引用和弱引用都会引起内存泄漏。

弱引用造成内存泄漏

无论是强引用还是弱引用,都是因为在ThreadLocalMap的生命周期中没有调用remove()方法将K-V删除掉。

在强引用中,无法删除ThreadLocal和K-V。在弱引用中,无法删除K-V

这就是为什么要使用弱引用。

如何避免内存泄漏:

最主要的方法就是调用remove()方法。特别是在线程池中。


深入理解ThreadLocal
https://johnjoyjzw.github.io/2021/09/05/深入理解ThreadLocal/
Author
John Joy
Posted on
September 5, 2021
Licensed under