在《操作系统同步原语》 这篇文章中,介绍了操作系统在面对 进程/线程 间同步的时候,所支持的一些同步原语,其中 semaphore 信号量 和 mutex 互斥量是最重要的同步原语。
在使用基本的 mutex 进行并发控制时,需要程序员非常小心地控制 mutex 的 down 和 up 操作,否则很容易引起死锁等问题。为了更容易地编写出正确的并发程序,所以在 mutex 和 semaphore 的基础上,提出了更高层次的同步原语 monitor,管程就可以对开发者屏蔽掉这些手动细节,在语言内部实现,更加简单易用。
不过需要注意的是,操作系统本身并不支持 monitor 机制,实际上,monitor 是属于编程语言的范畴,当你想要使用 monitor 时,先了解一下语言本身是否支持 monitor 原语,例如 C 语言它就不支持 monitor,Java 语言支持 monitor。
一般的 monitor 实现模式是编程语言在语法上提供语法糖,而如何实现 monitor 机制,则属于编译器的工作。
JAVA管程
在Java使用是的是Mesa 梅莎
管程模型。其结构如下图所示
- 多个线程进入管程的入口队列e(_EntryList),并试图获取临界区锁。获取到锁的线程进入临界区,其他线程仍然在e中。
- 如果在业务代码里调用Object锁对象的wait()方法(底层则是调用ObjectMonitor(java管程)的wait原语),该线程阻塞,释放临界区的锁,离开临界区并进入q(_WaitSet)。
- 临界区执行完毕后调用notify原语(相当于signal),唤醒q中的一个线程。执行完毕的线程释放锁,并离开管程的作用域。
- 被唤醒的线程进入队列e,返回第1步重新开始获取锁。
Meas管程的特点
线程阻塞状态被唤醒之后不会立即执行,而是回到入口等待。当阻塞的线程再次获取到CPU后,会执行wait()方法后的代码。这带来的潜在问题是线程获取到CPU后,原先满足的条件可能已经不再满足,必须重新检查。所以在Mesa管程模型下编写程序时,检查条件应该用while,而不是if:
while (!condition) {
wait(a)
}
下面是从网上找到的其他人写的验证检查条件应该用while的代码
package com.test;
import java.util.ArrayList;
import java.util.List;
public class EarlyNotify extends Object {
private List<String> list = new ArrayList<String>();
public String removeItem() throws InterruptedException {
print("in removeItem() - entering");
synchronized (list) {
if (list.isEmpty()) { // 这里用if语句会发生危险
print("in removeItem() - about to wait()");
list.wait();
print("in removeItem() - done with wait()");
}
// 删除元素
String item = (String) list.remove(0);
print("in removeItem() - leaving");
return item;
}
}
public void addItem(String item) {
print("in addItem() - entering");
synchronized (list) {
// 添加元素
list.add(item);
print("in addItem() - just added: '" + item + "'");
// 添加后,通知所有线程
list.notifyAll();
print("in addItem() - just notified");
}
print("in addItem() - leaving");
}
private static void print(String msg) {
String name = Thread.currentThread().getName();
System.out.println(name + ": " + msg);
}
public static void main(String[] args) {
final EarlyNotify en = new EarlyNotify();
Runnable consumer = new Runnable() {
public void run() {
try {
String item = en.removeItem();
print("in run() - returned: '" + item + "'");
} catch (InterruptedException ix) {
print("interrupted!");
} catch (Exception x) {
print("threw an Exception!!!\n" + x);
}
}
};
Runnable producer = new Runnable() {
public void run() {
en.addItem("Hello!");
}
};
try {
// 启动第一个删除元素的线程
Thread threadConsumer1 = new Thread(consumer, "threadConsumer1");
threadConsumer1.start();
Thread.sleep(500);
// 启动第二个删除元素的线程
Thread threadConsumer2 = new Thread(consumer, "threadConsumer2");
threadConsumer2.start();
Thread.sleep(500);
// 启动增加元素的线程
Thread threadProducer = new Thread(producer, "threadProducer");
threadProducer.start();
// Thread.sleep(10000); // wait 10 seconds
// threadA1.interrupt();
// threadA2.interrupt();
} catch (InterruptedException x) {
}
}
}
运行结果
threadConsumer1: in removeItem() - entering
threadConsumer1: in removeItem() - about to wait()
threadConsumer2: in removeItem() - entering
threadConsumer2: in removeItem() - about to wait()
threadProducer: in addItem() - entering
threadProducer: in addItem() - just added: 'Hello!'
threadProducer: in addItem() - just notified
threadConsumer2: in removeItem() - done with wait()
threadConsumer2: in removeItem() - leaving
threadProducer: in addItem() - leaving
threadConsumer1: in removeItem() - done with wait()
threadConsumer2: in run() - returned: 'Hello!'
threadConsumer1: threw an Exception!!!
java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
PS:Hoare管程模型在线程被唤醒后就会立即切换上下文,让被唤醒的线程先执行,但会触发更多的上下文切换操作,浪费CPU时间。
ObjectMonitor
//构造方法
ObjectMonitor() {
_header = NULL;
//获取管程锁的次数
_count = 0;
_waiters = 0,
_recursions = 0;
_object = NULL;
//持有该ObjectMonitor线程的指针
_owner = NULL;
//管程的条件变量的资源等待队列
_WaitSet = NULL;
_WaitSetLock = 0 ;
_Responsible = NULL ;
_succ = NULL ;
_cxq = NULL ;
FreeNext = NULL ;
//管程的入口线程队列
_EntryList = NULL ;
_SpinFreq = 0 ;
_SpinClock = 0 ;
OwnerIsThread = 0 ;
_previous_owner_tid = 0;
}
//所有阻塞的线程都会被封装成ObjectWaiter对象
class ObjectWaiter : public StackObj {
public:
enum TStates { TS_UNDEF, TS_READY, TS_RUN, TS_WAIT, TS_ENTER, TS_CXQ } ;
enum Sorted { PREPEND, APPEND, SORTED } ;
ObjectWaiter * volatile _next;
ObjectWaiter * volatile _prev;
Thread* _thread;
jlong _notifier_tid;
ParkEvent * _event;
volatile int _notified ;
volatile TStates TState ;
Sorted _Sorted ; // List placement disposition
bool _active ; // Contention monitoring is enabled
public:
ObjectWaiter(Thread* thread);
void wait_reenter_begin(ObjectMonitor *mon);
void wait_reenter_end(ObjectMonitor *mon);
};