Posts p14 Linux高性能服务器编程-- 多线程编程
Post
Cancel

p14 Linux高性能服务器编程-- 多线程编程

1. 常见API

基本API

  1. pthread_create()

    int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);

  2. pthread_exit(void *retval);

    线程函数退出时最好调用一下函数确保安全,干净的退出。

  3. pthread_join() 回收其他线程

  4. pthread_cancel 异常终止一个线程,即取消线程

  5. 接收到请求取消的目标线程可以决定是否取消以及如何取消

    1
    2
    
     pthread_setcancelstate();
     pthread_setcanceltype();
    

常见属性

p273

2. 多线程间的同步问题

本文讨论三种线程同步机制:

  • POSIX信号量

  • 互斥量

  • 条件变量

下文详细说明

3. POSIX信号量

在Linux中,信号量的API有两组,一组是进程间的通信 IPC信号量,还有一组用于线程的POSIX信号量。两组接口类似,信号量语义完全相同

3.1 信号量

信号量就是一个停车场。当前值是停车场里还剩下多少个空车位。最大值是停车场里最多能容纳多少个车位。当汽车进入停车场时,首先要在门口排队(sem_wait),得到进入许可后才能进入。排队顺序原则上先到先得。每进一辆车,停车场就少了1个停车位,即信号量当前值-1。当前值为0时,停车场停满了,所有车不得进入统统在门口排队等。当一辆车离开后,释放其所占据的停车位(sem_post),信号量当前值+1信号量值得到释放后,如果门口有正在排队的车,那么就放进来,每放进来一个就重复前面的步骤。

3.2 基本API

  1. int sem_init()

  2. int sem_destroy()

  3. sem_wait()

  4. sem_trywait()

  5. sem_post()

4 互斥锁

4.1 基本API

  1. int pthread_mutex_init(); // 初始化

  2. int pthread_mutex_destroy(); // 销毁锁

  3. int pthread_mutex_lock(); // 如果已经加锁 则阻塞

  4. int pthread_mutex_trylock(); // 尝试加锁 如果没锁 则加锁 反之 返回错误码

  5. int pthread_mutex_unlock();

4.2 互斥锁属性

p277

4.3 死锁举例demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
//
// Created by xm on 2022/4/20.
//

#include <pthread.h>
#include <unistd.h>
#include <stdio.h>


int a = 0;
int b = 0;

pthread_mutex_t mutex_a;
pthread_mutex_t mutex_b;


void* another(void *arg){
    pthread_mutex_lock(&mutex_b);
    printf("in child thread, got mutex b ,waiting for mutex a\n");
    sleep(5);
    ++b;
    pthread_mutex_lock(&mutex_a);
    b += a ++;
    pthread_mutex_unlock(&mutex_a);
    pthread_mutex_unlock(&mutex_b);
    pthread_exit(NULL);
}

int main(){
    pthread_t id;

    pthread_mutex_init(&mutex_a,NULL);
    pthread_mutex_init(&mutex_b,NULL);
    pthread_create(&id,NULL,another,NULL);

    pthread_mutex_lock(&mutex_a);
    printf("in parent thread , got mutex a , waiting for mutex b\n");
    sleep(5);
    ++a;
    pthread_mutex_lock(&mutex_b);
    a += b++;
    pthread_mutex_unlock(&mutex_b);
    pthread_mutex_unlock(&mutex_a);

    pthread_join(id,NULL);
    pthread_mutex_destroy(&mutex_a);
    pthread_mutex_destroy(&mutex_b);
    return 0;
}

5. 条件变量

互斥锁用于同步线程对共享数据的访问,条件变量用于线程之间同步共享数据的值。当某个共享数据达到某个值的时候,唤醒等待这个共享数据的线程

条件变量本身不是锁!但它也可以造成线程阻塞。通常与互斥锁配合使用。给多线程提供一个会合的场所。

5.1 常见API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
1.  // 初始化一个条件变量
int pthread_cond_init(pthread_cond_t *restrict cond,
    const pthread_condattr_t *restrict attr);

2. // 释放一个条件变量
int pthread_cond_destroy(pthread_cond_t *cond);

3. // 等待条件满足
/*这个重点说一下,pthread_cond_wait在调用前必须加锁!! 必须加锁,
调用wait后由wait自动释放锁供别的进程修改条件变量connd,并进入阻塞状态。 
 被唤醒后又必须重新获得mutex锁*/

 /*
 pthread_cond_wait 其主要流程如下:
 1. 释放互斥锁
 2. 等待条件
 3. 条件被触发
 4. 给互斥锁加锁  //3 4 为原子操作
 */
int pthread_cond_wait(pthread_cond_t *restrict cond,
    pthread_mutex_t *restrict mutex);

4. // 唤醒至少一个等待条件的线程
int pthread_cond_signal(pthread_cond_t *cond);

5. // 唤醒所有等待条件的线程
int pthread_cond_broadcast(pthread_cond_t *cond);

5.2 一个举例demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>

// 消息结构
struct msg {
    struct msg *next;
    int data;       // 消息数据
};

struct msg *queue;  // 消息队列
pthread_cond_t qcond = PTHREAD_COND_INITIALIZER;    // 简化初始化条件变量和互斥体
pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER;

// 随机数范围[mi, ma]
int randint(int mi, int ma) {
    double r = (double)rand() * (1.0 / ((double)RAND_MAX + 1.0));
    r *= (double)(ma - mi) + 1.0;
    return (int)r + mi;
}

// 打印消息
void print_msg(struct msg *m) {
    printf(">>>>msg: %d\n", m->data);
}

// 压入消息
void push_msg(struct msg *m) {
    pthread_mutex_lock(&qlock);
    m->next = queue;
    queue = m;
    pthread_mutex_unlock(&qlock);
    // 通知条件满足
    pthread_cond_signal(&qcond);
}

// 生产者线程:
void* product(void *data) {
    while (1) {
        usleep(randint(1000*100, 1000*200));
        struct msg *m = malloc(sizeof(*m));
        memset(m, 0, sizeof(*m));
        m->data = randint(0, 1000);
        push_msg(m);
    }
}

// 弹出消息
struct msg* pop_msg() {
    struct msg *m;
    pthread_mutex_lock(&qlock);
    // 等待条件满足
    while (queue == NULL) pthread_cond_wait(&qcond, &qlock);
    m = queue;
    queue = m->next;
    pthread_mutex_unlock(&qlock);
    return m;
}

// 消费者线程
void* consum(void *data) {
    whlie (1) {
        struct msg *m = pop_msg();
        print_msg(m);
        free(m);
    }
}

int main() {
#define PRO_NUM 3
#define CON_NUM 3
    pthread_t tid_p[PRO_NUM];
    pthread_t tid_c[CON_NUM];

    int i;
    for (i = 0; i < PRO_NUM; ++i) {
        pthread_create(&tid_p[i], NULL, product, NULL);
    }
    for (i = 0; i < CON_NUM; ++i) {
        pthread_create(&tid_c[i], NULL, consum, NULL);
    }


    for (i = 0; i < PRO_NUM; ++i) {
        pthread_join(tid_p[i], NULL);
    }
    for (i = 0; i < CON_NUM; ++i) {
        pthread_join(tid_c[i], NULL);
    }
    return 0;
}

6. 封装上述三种工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
//
// Created by xm on 2022/4/20.
//

#ifndef LINUX_LOCKER_H
#define LINUX_LOCKER_H

#include <exception>
#include <pthread.h>
#include <semaphore.h>  // 信号量包

/*
 * 封装信号量类
 */
class sem{
public:
    // 初始化信号量
    sem(){
        if (sem_init(&m_sem,0,0) != 0){
            throw std::exception();
        }
    }
    // 销毁信号量
    ~sem(){
        sem_destroy(&m_sem);
    }
    // 等待信号量
    bool wait(){
        //如果m_sem为0  阻塞等待
        return sem_wait(&m_sem) == 0;
    }
    // 新增信号量
    bool  post(){
        return sem_post(&m_sem) == 0;
    }
private:
    sem_t m_sem;
};



// 封装的互斥锁类
class locker{
public:
    locker(){
        if (pthread_mutex_init(&m_mutex,NULL) != 0){
            throw std::exception();
        }
    }
    ~locker(){
        pthread_mutex_destroy(&m_mutex);
    }
    // 获得互斥锁
    bool lock(){
        return pthread_mutex_lock(&m_mutex);
    }
    // 释放互斥锁
    bool unlock(){
        return pthread_mutex_unlock(&m_mutex);
    }
private:
    pthread_mutex_t m_mutex;
};

// 封装条件变量类
class cond{
public:
    cond(){
        if (pthread_mutex_init(&m_mutex,NULL) != 0)
            throw std::exception();
        if (pthread_cond_init(&m_cond,NULL) != 0){
            pthread_mutex_destroy(&m_mutex);
            throw std::exception();
        }
    }
    ~cond(){
        pthread_cond_destroy(&m_cond);
        pthread_mutex_destroy(&m_mutex);
    }

    // 等待条件变量
    bool wait(){
        int ret = 0;
        pthread_mutex_lock(&m_mutex);
        ret = pthread_cond_wait(&m_cond,&m_mutex);
        pthread_mutex_unlock(&m_mutex);
        return ret == 0;
    }
    // 唤醒等待条件变量的线程
    bool signal(){
        return pthread_cond_signal(&m_cond);
    }
private:
    pthread_mutex_t m_mutex;
    pthread_cond_t m_cond;
};


#endif //LINUX_LOCKER_H

7.多线程环境

7.1 可重入函数

如果一个函数能被多线程同时调用切不发生竞态条件,则我们称他为线程安全的,或者说他是可重入函数

7.2 线程中的进程

问题

试想一下,在父进程中创建的线程锁,在fork()后的子进程中, 子进程再对其加锁,会出现什么情况?

子进程将自动继承父进程中的互斥锁,也就是说,父进程中锁住的互斥锁,在子进程中也是锁住的

书中说对同一把锁重复加锁会一直阻塞,可是demo并没有阻塞 很奇怪

demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
//
// Created by xm on 2022/4/20.
//

#include <pthread.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <wait.h>

pthread_mutex_t mutex;


// 子线程运行函数,它先获得mutex锁  5s后释放锁
void *another(void* arg){
    printf("in child thread, lock th mutex\n");
    pthread_mutex_lock(&mutex);
    sleep(5);
    pthread_mutex_unlock(&mutex);
}

int main(){
    pthread_mutex_init(&mutex,NULL);
    pthread_t id;
    pthread_create(&id,NULL,another,NULL);
    sleep(5);

    int pid = fork();
    if (pid < 0){
        perror("fork error");
        return 1;
    } else if (pid == 0){
        printf("i am child process , I want got lock\n");
        // 子进程从父进程继承了额互斥锁mutex的状态,该互斥锁处于锁住的状态
        // 是由父进程中的子线程锁住的  因此 下面的锁会一直处于阻塞 ???  其实并没有一直阻塞 不知道书上的例子为什么会这样写
        pthread_mutex_lock(&mutex);
        printf(" i can not run to here ?");
        pthread_mutex_unlock(&mutex);
        exit(0);
    } else{
        wait(NULL);
    }
    pthread_join(id,NULL);
    pthread_mutex_destroy(&mutex);
    return 0;
}

pthread_atfork()

pthread_atfork(void (*preparten)(void),void(*parent)(void),void(*child)(void))

该函数解决上述子进程锁一直阻塞问题。一共三个句柄

  • preparent
    fork() 调用前执行

  • parent fork() 后父进程执行

  • child fork() 后子进程执行

1
2
3
4
5
6
7
void prepare(){
    pthread_mutex_lock(&mutex);
}
void infork(){
    pthread_mutex_unlock(&mutesx);
}
pthread_atfork(prepare,infork,infork);

7.3 线程与信号

每个线程都可以独立的设置信号挡板,这里略了。

int pthread_sigmask(int how, const sigset_t *set, sigset_t *oldset);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/*
 * 用一个线程处理所有信号
 */

#define handle_error_en(en, msg) \
               do { errno = en; perror(msg); exit(EXIT_FAILURE); } while (0)

static void *sig_thread(void *arg){
    sigset_t *set = (sigset_t *) arg;
    int s ,sig ;
    for(;;){
        // 调用sigwait等待信号
        s = sigwait(set,&sig);
        if (s != 0)
            handle_error_en(s,"sigwait");
        printf("Signal handing thread get signal &d\n",sig);
    }
}

int main(){
    pthread_t thread;
    sigset_t set;
    int s;

    // 在主线程中设置信号掩码
    sigemptyset(&set);
    sigaddset(&set,SIGQUIT);
    sigaddset(&set,SIGUSR1);  // 用户定义的信号
    s = pthread_sigmask(SIG_BLOCK,&set,NULL);
    if (s != 0){
        handle_error_en(s,"pthread_sigmask");
    }
    s = pthread_create(&thread,NULL,&sig_thread,(void *)&set);
    
    pause();
}
This post is licensed under CC BY 4.0 by the author.

Contents

Trending Tags