1. 概述
为什么不能通过动态创建子进程(线程)来实现并发服务器?
不是不行,效率不太高,所以提出了“池”技术。
当有新任务到来的时候,主进程通过哪种方式选择进程池中的某个子进程为其服务?
主进程使用某种算法主动选择子进程,随机算法,轮转算法,等。
主进程和子进程通过一个共享的工作队列来同步。子进程睡眠在一个工作队列中,主进程将任务添加到工作队列中,这将唤醒一个子进程为其服务。
进程间的传递数据
管道。
2. 半同步/半异步进程池实现
代码太多,这里不贴细节代码,代码见代码包。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// 子进程类
class process {
pid_t m_pid; // 进程id
int m_pipefd[2];
}
// 线程池类 定义为模板方便复用
template< typename T >
class processpool
{
// 1.成员变量
static const int MAX_PROCESS_NUMBER = 16; // 最大允许子进程数量
static const int USER_PER_PROCESS = 65536;// 最多能处理客户端数量
static const int MAX_EVENT_NUMBER = 10000;//epoll最多能处理的事件数
int m_process_number;// 进程池中进程总数
int m_idx;// 子进程在池中的序号 从0开始
int m_epollfd;// 每个进程都有一个epoll内核事件表
int m_listenfd;// 监听socket
int m_stop;// 子进程通过m_stop来决定是否停止运行
process* m_sub_process;//保存所有子进程的描述信息
static processpool< T >* m_instance;// 进程池的静态实例
}
// 进程池类的构造函数
template< typename T >
processpool< T >::processpool( int listenfd, int process_number )
{
// 1. 创建进程数组
m_sub_process = new process[ process_number ];
// 2. 创建process_number 个子进程,并建立他们和父进程之间的管道
for( int i = 0; i < process_number; ++i )
{
int ret = socketpair( PF_UNIX, SOCK_STREAM, 0, m_sub_process[i].m_pipefd );
m_sub_process[i].m_pid = fork(); // m_pid 默认值为-1.
if( m_sub_process[i].m_pid > 0 ) //父进程
{
close( m_sub_process[i].m_pipefd[1] );
continue;
}
else// 子进程
{
close( m_sub_process[i].m_pipefd[0] );
m_idx = i;
break;
}
}
}
3. 线程池的实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 1. 成员变量
private:
int m_thread_number; // 线程池中的线程数
int m_max_requests; // 请求队列中允许的最大请求数目
pthread_t* m_threads; // 描述线程池的数组 大小为m_thread_number
std::list< T* > m_workqueue;// 请求队列
locker m_queuelocker; // 保护请求队列的互斥锁
sem m_queuestat; // 是否有任务需要处理
bool m_stop; // 是否结束线程
// 2. 构造函数
template< typename T >
threadpool< T >::threadpool( int thread_number, int max_requests ) :
m_thread_number( thread_number ), m_max_requests( max_requests ), m_stop( false ), m_threads( NULL )
{
m_threads = new pthread_t[ m_thread_number ];
for ( int i = 0; i < thread_number; ++i )
{
if( pthread_create( m_threads + i, NULL, worker, this ) != 0 )
{
delete [] m_threads;
throw std::exception();
}
if( pthread_detach( m_threads[i] ) ) // 将他们设置为脱离线程
{
delete [] m_threads;
throw std::exception();
}
}
}