1. 常见API
基本API
pthread_create()
int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
pthread_exit(void *retval);
线程函数退出时最好调用一下函数确保安全,干净的退出。
pthread_join() 回收其他线程
pthread_cancel 异常终止一个线程,即取消线程
接收到请求取消的目标线程可以决定是否取消以及如何取消
1 2
pthread_setcancelstate(); pthread_setcanceltype();
常见属性
p273
2. 多线程间的同步问题
本文讨论三种线程同步机制:
POSIX信号量
互斥量
条件变量
下文详细说明
3. POSIX信号量
在Linux中,信号量的API有两组,一组是进程间的通信 IPC信号量,还有一组用于线程的POSIX信号量。两组接口类似,信号量语义完全相同
3.1 信号量
信号量就是一个停车场。当前值是停车场里还剩下多少个空车位。最大值是停车场里最多能容纳多少个车位。当汽车进入停车场时,首先要在门口排队(sem_wait),得到进入许可后才能进入。排队顺序原则上先到先得。每进一辆车,停车场就少了1个停车位,即信号量当前值-1。当前值为0时,停车场停满了,所有车不得进入统统在门口排队等。当一辆车离开后,释放其所占据的停车位(sem_post),信号量当前值+1信号量值得到释放后,如果门口有正在排队的车,那么就放进来,每放进来一个就重复前面的步骤。
3.2 基本API
int sem_init()
int sem_destroy()
sem_wait()
sem_trywait()
sem_post()
4 互斥锁
4.1 基本API
int pthread_mutex_init(); // 初始化
int pthread_mutex_destroy(); // 销毁锁
int pthread_mutex_lock(); // 如果已经加锁 则阻塞
int pthread_mutex_trylock(); // 尝试加锁 如果没锁 则加锁 反之 返回错误码
int pthread_mutex_unlock();
4.2 互斥锁属性
p277
4.3 死锁举例demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
//
// Created by xm on 2022/4/20.
//
#include <pthread.h>
#include <unistd.h>
#include <stdio.h>
int a = 0;
int b = 0;
pthread_mutex_t mutex_a;
pthread_mutex_t mutex_b;
void* another(void *arg){
pthread_mutex_lock(&mutex_b);
printf("in child thread, got mutex b ,waiting for mutex a\n");
sleep(5);
++b;
pthread_mutex_lock(&mutex_a);
b += a ++;
pthread_mutex_unlock(&mutex_a);
pthread_mutex_unlock(&mutex_b);
pthread_exit(NULL);
}
int main(){
pthread_t id;
pthread_mutex_init(&mutex_a,NULL);
pthread_mutex_init(&mutex_b,NULL);
pthread_create(&id,NULL,another,NULL);
pthread_mutex_lock(&mutex_a);
printf("in parent thread , got mutex a , waiting for mutex b\n");
sleep(5);
++a;
pthread_mutex_lock(&mutex_b);
a += b++;
pthread_mutex_unlock(&mutex_b);
pthread_mutex_unlock(&mutex_a);
pthread_join(id,NULL);
pthread_mutex_destroy(&mutex_a);
pthread_mutex_destroy(&mutex_b);
return 0;
}
5. 条件变量
互斥锁用于同步线程对共享数据的访问,条件变量用于线程之间同步共享数据的值。当某个共享数据达到某个值的时候,唤醒等待这个共享数据的线程
条件变量本身不是锁!但它也可以造成线程阻塞。通常与互斥锁配合使用。给多线程提供一个会合的场所。
5.1 常见API
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
1. // 初始化一个条件变量
int pthread_cond_init(pthread_cond_t *restrict cond,
const pthread_condattr_t *restrict attr);
2. // 释放一个条件变量
int pthread_cond_destroy(pthread_cond_t *cond);
3. // 等待条件满足
/*这个重点说一下,pthread_cond_wait在调用前必须加锁!! 必须加锁,
调用wait后由wait自动释放锁供别的进程修改条件变量connd,并进入阻塞状态。
被唤醒后又必须重新获得mutex锁*/
/*
pthread_cond_wait 其主要流程如下:
1. 释放互斥锁
2. 等待条件
3. 条件被触发
4. 给互斥锁加锁 //3 4 为原子操作
*/
int pthread_cond_wait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex);
4. // 唤醒至少一个等待条件的线程
int pthread_cond_signal(pthread_cond_t *cond);
5. // 唤醒所有等待条件的线程
int pthread_cond_broadcast(pthread_cond_t *cond);
5.2 一个举例demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
// 消息结构
struct msg {
struct msg *next;
int data; // 消息数据
};
struct msg *queue; // 消息队列
pthread_cond_t qcond = PTHREAD_COND_INITIALIZER; // 简化初始化条件变量和互斥体
pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER;
// 随机数范围[mi, ma]
int randint(int mi, int ma) {
double r = (double)rand() * (1.0 / ((double)RAND_MAX + 1.0));
r *= (double)(ma - mi) + 1.0;
return (int)r + mi;
}
// 打印消息
void print_msg(struct msg *m) {
printf(">>>>msg: %d\n", m->data);
}
// 压入消息
void push_msg(struct msg *m) {
pthread_mutex_lock(&qlock);
m->next = queue;
queue = m;
pthread_mutex_unlock(&qlock);
// 通知条件满足
pthread_cond_signal(&qcond);
}
// 生产者线程:
void* product(void *data) {
while (1) {
usleep(randint(1000*100, 1000*200));
struct msg *m = malloc(sizeof(*m));
memset(m, 0, sizeof(*m));
m->data = randint(0, 1000);
push_msg(m);
}
}
// 弹出消息
struct msg* pop_msg() {
struct msg *m;
pthread_mutex_lock(&qlock);
// 等待条件满足
while (queue == NULL) pthread_cond_wait(&qcond, &qlock);
m = queue;
queue = m->next;
pthread_mutex_unlock(&qlock);
return m;
}
// 消费者线程
void* consum(void *data) {
whlie (1) {
struct msg *m = pop_msg();
print_msg(m);
free(m);
}
}
int main() {
#define PRO_NUM 3
#define CON_NUM 3
pthread_t tid_p[PRO_NUM];
pthread_t tid_c[CON_NUM];
int i;
for (i = 0; i < PRO_NUM; ++i) {
pthread_create(&tid_p[i], NULL, product, NULL);
}
for (i = 0; i < CON_NUM; ++i) {
pthread_create(&tid_c[i], NULL, consum, NULL);
}
for (i = 0; i < PRO_NUM; ++i) {
pthread_join(tid_p[i], NULL);
}
for (i = 0; i < CON_NUM; ++i) {
pthread_join(tid_c[i], NULL);
}
return 0;
}
6. 封装上述三种工具类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
//
// Created by xm on 2022/4/20.
//
#ifndef LINUX_LOCKER_H
#define LINUX_LOCKER_H
#include <exception>
#include <pthread.h>
#include <semaphore.h> // 信号量包
/*
* 封装信号量类
*/
class sem{
public:
// 初始化信号量
sem(){
if (sem_init(&m_sem,0,0) != 0){
throw std::exception();
}
}
// 销毁信号量
~sem(){
sem_destroy(&m_sem);
}
// 等待信号量
bool wait(){
//如果m_sem为0 阻塞等待
return sem_wait(&m_sem) == 0;
}
// 新增信号量
bool post(){
return sem_post(&m_sem) == 0;
}
private:
sem_t m_sem;
};
// 封装的互斥锁类
class locker{
public:
locker(){
if (pthread_mutex_init(&m_mutex,NULL) != 0){
throw std::exception();
}
}
~locker(){
pthread_mutex_destroy(&m_mutex);
}
// 获得互斥锁
bool lock(){
return pthread_mutex_lock(&m_mutex);
}
// 释放互斥锁
bool unlock(){
return pthread_mutex_unlock(&m_mutex);
}
private:
pthread_mutex_t m_mutex;
};
// 封装条件变量类
class cond{
public:
cond(){
if (pthread_mutex_init(&m_mutex,NULL) != 0)
throw std::exception();
if (pthread_cond_init(&m_cond,NULL) != 0){
pthread_mutex_destroy(&m_mutex);
throw std::exception();
}
}
~cond(){
pthread_cond_destroy(&m_cond);
pthread_mutex_destroy(&m_mutex);
}
// 等待条件变量
bool wait(){
int ret = 0;
pthread_mutex_lock(&m_mutex);
ret = pthread_cond_wait(&m_cond,&m_mutex);
pthread_mutex_unlock(&m_mutex);
return ret == 0;
}
// 唤醒等待条件变量的线程
bool signal(){
return pthread_cond_signal(&m_cond);
}
private:
pthread_mutex_t m_mutex;
pthread_cond_t m_cond;
};
#endif //LINUX_LOCKER_H
7.多线程环境
7.1 可重入函数
如果一个函数能被多线程同时调用切不发生竞态条件,则我们称他为线程安全的,或者说他是可重入函数。
7.2 线程中的进程
问题
试想一下,在父进程中创建的线程锁,在fork()后的子进程中, 子进程再对其加锁,会出现什么情况?
子进程将自动继承父进程中的互斥锁,也就是说,父进程中锁住的互斥锁,在子进程中也是锁住的。
书中说对同一把锁重复加锁会一直阻塞,可是demo并没有阻塞 很奇怪。
demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
//
// Created by xm on 2022/4/20.
//
#include <pthread.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <wait.h>
pthread_mutex_t mutex;
// 子线程运行函数,它先获得mutex锁 5s后释放锁
void *another(void* arg){
printf("in child thread, lock th mutex\n");
pthread_mutex_lock(&mutex);
sleep(5);
pthread_mutex_unlock(&mutex);
}
int main(){
pthread_mutex_init(&mutex,NULL);
pthread_t id;
pthread_create(&id,NULL,another,NULL);
sleep(5);
int pid = fork();
if (pid < 0){
perror("fork error");
return 1;
} else if (pid == 0){
printf("i am child process , I want got lock\n");
// 子进程从父进程继承了额互斥锁mutex的状态,该互斥锁处于锁住的状态
// 是由父进程中的子线程锁住的 因此 下面的锁会一直处于阻塞 ??? 其实并没有一直阻塞 不知道书上的例子为什么会这样写
pthread_mutex_lock(&mutex);
printf(" i can not run to here ?");
pthread_mutex_unlock(&mutex);
exit(0);
} else{
wait(NULL);
}
pthread_join(id,NULL);
pthread_mutex_destroy(&mutex);
return 0;
}
pthread_atfork()
pthread_atfork(void (*preparten)(void),void(*parent)(void),void(*child)(void))
该函数解决上述子进程锁一直阻塞问题。一共三个句柄
preparent
fork() 调用前执行parent fork() 后父进程执行
child fork() 后子进程执行
1
2
3
4
5
6
7
void prepare(){
pthread_mutex_lock(&mutex);
}
void infork(){
pthread_mutex_unlock(&mutesx);
}
pthread_atfork(prepare,infork,infork);
7.3 线程与信号
每个线程都可以独立的设置信号挡板,这里略了。
int pthread_sigmask(int how, const sigset_t *set, sigset_t *oldset);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/*
* 用一个线程处理所有信号
*/
#define handle_error_en(en, msg) \
do { errno = en; perror(msg); exit(EXIT_FAILURE); } while (0)
static void *sig_thread(void *arg){
sigset_t *set = (sigset_t *) arg;
int s ,sig ;
for(;;){
// 调用sigwait等待信号
s = sigwait(set,&sig);
if (s != 0)
handle_error_en(s,"sigwait");
printf("Signal handing thread get signal &d\n",sig);
}
}
int main(){
pthread_t thread;
sigset_t set;
int s;
// 在主线程中设置信号掩码
sigemptyset(&set);
sigaddset(&set,SIGQUIT);
sigaddset(&set,SIGUSR1); // 用户定义的信号
s = pthread_sigmask(SIG_BLOCK,&set,NULL);
if (s != 0){
handle_error_en(s,"pthread_sigmask");
}
s = pthread_create(&thread,NULL,&sig_thread,(void *)&set);
pause();
}