文章目录
- Object - wait/notify
- object.wait()
- object.notify()
- LockSupport - park/unpark
- LockSupport.park()
- LockSupport.unPark()
Object - wait/notify
object.wait()
ObjectSynchronizer::wait
从这段代码可以得到两个信息
1:wait() 底层是对象锁(就是synchronized底层实现的那个对象锁)。这也正是 wait/notify 要在同步代码块内的原因。
2:wait() 的调用会使得对象锁立马膨胀成重量级锁(因为需要使用mutex阻塞线程)。
ObjectMonitor::wait
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {//...// 封装成ObjectWaiter对象ObjectWaiter node(Self);node.TState = ObjectWaiter::TS_WAIT ;Self->_ParkEvent->reset() ;OrderAccess::fence();//加入 WaitSet Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;AddWaiter (&node) ;Thread::SpinRelease (&_WaitSetLock) ;//...//释放当前线程占用的对象锁exit (true, Self) ;//...//阻塞当前线程if (node._notified == 0) {if (millis <= 0) {Self->_ParkEvent->park () ;} else {ret = Self->_ParkEvent->park (millis) ;}}//...
}
wait主要干了三件事:
1:封装objectWaiter对象并加入 WaitSet
2:释放对象锁
3:调用 ParkEvent.park() 阻塞当前线程(底层调用 pthread_mutex_lock )
object.notify()
同理
ObjectSynchronizer::notify
ObjectMonitor::notify
void ObjectMonitor::notify(TRAPS) {//...//Policy 移动策略,默认为 2 int Policy = Knob_MoveNotifyee ;//取出waitSet的第一个Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ;ObjectWaiter * iterator = DequeueWaiter() ;//根据 Policy 策略移动ObjectWaiter到cxq或者entryList或直接唤醒// Policy == 0 :头插entrylist// Policy == 1 :尾插entrylist// Policy == 2 :如果entrylist为空,那么插入entrylist,否则插入cxq队列// Policy == 3 :直接插入cxq // 其他:直接唤醒线程,让线程直接调用enterIif (iterator != NULL) {//...// Policy == 0 :头插entrylistif (Policy == 0) {if (List == NULL) {iterator->_next = iterator->_prev = NULL ;_EntryList = iterator ;} else {List->_prev = iterator ;iterator->_next = List ;iterator->_prev = NULL ;_EntryList = iterator ;}}// Policy == 1 :尾插entrylistelse if (Policy == 1) {if (List == NULL) {iterator->_next = iterator->_prev = NULL ;_EntryList = iterator ;} else {ObjectWaiter * Tail ;for (Tail = List ; Tail->_next != NULL ; Tail = Tail->_next) ;assert (Tail != NULL && Tail->_next == NULL, "invariant") ;Tail->_next = iterator ;iterator->_prev = Tail ;iterator->_next = NULL ;}} // Policy == 2 :如果entrylist为空,那么插入entrylist,否则插入cxq队列else if (Policy == 2) {if (List == NULL) {iterator->_next = iterator->_prev = NULL ;_EntryList = iterator ;} else {iterator->TState = ObjectWaiter::TS_CXQ ;for (;;) {ObjectWaiter * Front = _cxq ;iterator->_next = Front ;if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {break ;}}}}// Policy == 3 :直接插入cxq else if (Policy == 3) {iterator->TState = ObjectWaiter::TS_CXQ ;for (;;) {ObjectWaiter * Tail ;Tail = _cxq ;if (Tail == NULL) {iterator->_next = NULL ;if (Atomic::cmpxchg_ptr (iterator, &_cxq, NULL) == NULL) {break ;}} else {while (Tail->_next != NULL) Tail = Tail->_next ;Tail->_next = iterator ;iterator->_prev = Tail ;iterator->_next = NULL ;break ;}}}// 否则直接唤醒线程,让线程直接调用enterIelse {ParkEvent * ev = iterator->_event ;iterator->TState = ObjectWaiter::TS_RUN ;OrderAccess::fence() ;ev->unpark() ;}//...
}
notify主要就是根据 Policy 策略来决定以何种方式唤醒目标线程:
Policy = 0 :头插entrylist
Policy = 1 :尾插entrylist
Policy = 2 :如果entrylist为空,那么插入entrylist,否则插入cxq队列(默认策略)
Policy = 3 :直接插入cxq
其他:直接唤醒线程,让线程直接调用enterI
LockSupport - park/unpark
核心设计思想是 ”许可“,park消费许可,unpark生产许可(同一时间最多最有一个许可)。
_counter:许可
_con:条件变量
_mutex:互斥锁
LockSupport.park()
每个线程都内置了一个 parker,通过 Parker.park() 方法进行阻塞
Parker与ParkEvent的功能类型,在源码的注释中也提了,计划将Parker合并到ParkEvent
注释原文: In the future we’ll want to think about eliminating Parker and using ParkEvent instead. There’s considerable duplication between the two services.
Parker::park
void Parker::park(bool isAbsolute, jlong time) {//原子替换_counter为0,如果之前_counter为1则直接返回,不阻塞当前线程if (Atomic::xchg(0, &_counter) > 0) return;//...//如果线程被终止,也直接返回if (Thread::is_interrupted(thread, false)) {return;}//解析时间参数timespec absTime;if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at allreturn;}if (time > 0) {unpackTime(&absTime, isAbsolute, time);}//...//如果线程被终止或者获取mutex锁失败直接返回if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {return;}//走到这里说明获mutex锁成功//获锁后再次检查_counter是否大于0,如果是直接消费许可,无需等待。int status ;if (_counter > 0) { // no wait needed_counter = 0;status = pthread_mutex_unlock(_mutex);assert (status == 0, "invariant") ;OrderAccess::fence();return;}//...//调用 pthread_cond_wait 通过 _con 和 _mutex 配合使用阻塞当前线程直至满足_con条件if (time == 0) {_cur_index = REL_INDEX;status = pthread_cond_wait (&_cond[_cur_index], _mutex) ;}//有超时时间的话,就调用 safe_cond_timedwait , 不管有没有满足条件,一旦超时都会唤醒else {_cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ;if (status != 0 && WorkAroundNPTLTimedWaitHang) {pthread_cond_destroy (&_cond[_cur_index]) ;pthread_cond_init (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr());}}//设置_counter为0并释放 mutex 锁_counter = 0 ;status = pthread_mutex_unlock(_mutex) ;//...
}
pthread_cond_wait :阻塞当前线程,直至条件满足。并且阻塞后会自动释放锁,唤醒后又会自动获取锁。
为什么 pthread_cond_wait 需要搭配互斥锁使用?
条件不成立会进入阻塞,假如在进入阻塞的这个期间,条件又成立了,此时最终的结果就是条件成立,但一直在阻塞。同理,唤醒的时候也需搭配互斥锁才能保证不漏掉临界条件。
总结:
1:将许可置为0,同时检查之前许可是否为1,如果为1直接返回
2:获取mutex互斥锁
3:在条件变量上阻塞当前线程(阻塞会自动释放锁,唤醒会自动获取锁)
4:线程被唤醒后重置许可为0,并释放互斥锁
LockSupport.unPark()
同理
Parker::unpark
void Parker::unpark() {int s, status ;//获取mutex锁status = pthread_mutex_lock(_mutex);assert (status == 0, "invariant") ;s = _counter;//设置许可为1_counter = 1;//如果许可原本小于1表示线程可能被阻塞(parked),需要唤醒线程。if (s < 1) {//如果线程确实被阻塞(即 _cur_index 不等于-1)//调用 pthread_cond_signal 唤醒线程if (_cur_index != -1) { if (WorkAroundNPTLTimedWaitHang) {status = pthread_cond_signal (&_cond[_cur_index]);assert (status == 0, "invariant");status = pthread_mutex_unlock(_mutex);assert (status == 0, "invariant");} else {int index = _cur_index;status = pthread_mutex_unlock(_mutex);assert (status == 0, "invariant");status = pthread_cond_signal (&_cond[index]);assert (status == 0, "invariant");}}//反之什么都不做,直接释放锁返回 else {pthread_mutex_unlock(_mutex);assert (status == 0, "invariant") ;}}//如果许可原本为1,什么都不做,直接释放锁返回 else {pthread_mutex_unlock(_mutex);assert (status == 0, "invariant") ;}
}
总结:
1:获取互斥锁
2:置许可为1
3:唤醒在条件变量上等待的线程
4:释放互斥锁
wait/notify是一种等待/通知机制。等待啥?线程对互斥资源的等待。通知啥?通知线程互斥资源已经没人在用可以去抢占了。因为描述的是对互斥资源的竞争,所以wait/notify在object上(任何对象都可以是互斥资源)。
而park/unPark是一种对线程的精准控制,他更多的是描述线程之间的先后顺序(比如生产者线程和消费者线程)。
park/unPark相对于wait/notify
更直观,以thread为操作对象。
更精准,可以指定唤醒那个线程。
更简单,无需在synchronized代码块内。
更灵活,unpark方法可以在park方法前调用。