线程池、进程池与内存池


线程池

简介

  • 用于执行大量相对短暂的任务
  • 当任务增加的时候能够动态的增加线程池中线程的数量直到达到一个阈值
  • 当任务执行完毕的时候,能够动态的销毁线程池中的线程

  • 该线程池的实现本质上也是生产者与消费模型的应用。生产者线程向任务队列中添加任务,一旦队列有任务到来,如果有等待线程就唤醒来执行任务,如果没有等待线程并且线程数没有达到阈值,就创建新线程来执行任务。

  • 计算密集型任务:线程数 = CPU个数

  • I/O密集型任务:线程数 > CPU个数

实现

Makefile

1
2
3
4
5
6
7
8
9
10
11
12
CC=gcc
CFLAGS=-Wall -g
ALL=main
OBJS=main.o threadpool.o condition.o
.c.o:
$(CC) $(CFLAGS) -c $<
main:$(OBJS)
$(CC) $(CFLAGS) $^ -o $@ -lpthread -lrt
clean:
rm -rf $(ALL) *.o

condition.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#ifndef CONDITION_H
#define CONDITION_H
#include <pthread.h>
typedef struct condition {
pthread_mutex_t pmutex;
pthread_cond_t pcond;
} condition_t;
int condition_init(condition_t *cond);
int condition_lock(condition_t *cond);
int condition_unlock(condition_t *cond);
int condition_wait(condition_t *cond);
int condition_timedwait(condition_t *cond, const struct timespec *abstime);
int condition_signal(condition_t *cond);
int condition_broadcast(condition_t *cond);
int condition_destroy(condition_t *cond);
#endif /* CONDITION_H */

condition.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#include "condition.h"
int condition_init(condition_t *cond)
{
int status;
if ((status = pthread_mutex_init(&cond->pmutex, NULL)))
return status;
if ((status = pthread_cond_init(&cond->pcond, NULL)))
return status;
return 0;
}
int condition_lock(condition_t *cond)
{
return pthread_mutex_lock(&cond->pmutex);
}
int condition_unlock(condition_t *cond)
{
return pthread_mutex_unlock(&cond->pmutex);
}
int condition_wait(condition_t *cond)
{
return pthread_cond_wait(&cond->pcond, &cond->pmutex);
}
int condition_timedwait(condition_t *cond, const struct timespec *abstime)
{
return pthread_cond_timedwait(&cond->pcond, &cond->pmutex, abstime);
}
int condition_signal(condition_t *cond)
{
return pthread_cond_signal(&cond->pcond);
}
int condition_broadcast(condition_t* cond)
{
return pthread_cond_broadcast(&cond->pcond);
}
int condition_destroy(condition_t* cond)
{
int status;
if ((status = pthread_mutex_destroy(&cond->pmutex)))
return status;
if ((status = pthread_cond_destroy(&cond->pcond)))
return status;
return 0;
}

threadpool.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include "condition.h"
// 任务结构体,将任务放入队列由线程池中的线程来执行
typedef struct task {
void *(*run)(void *arg); // 任务回调函数
void *arg; // 回调函数参数
struct task *next;
} task_t;
// 线程池结构体
typedef struct threadpool {
condition_t ready; // 任务准备就绪或者线程池销毁通知
task_t *first; // 任务队列头指针
task_t *last; // 任务队列尾指针
int counter; // 线程池中当前线程数
int idle; // 线程池中当前正在等待任务的线程数
int max_threads; // 线程池中最大允许的线程数
int quit; // 销毁线程池的时候置1
} threadpool_t;
// 初始化线程池
void threadpool_init(threadpool_t *pool, int threads);
// 往线程池中添加任务
void threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg), void *arg);
// 销毁线程池
void threadpool_destroy(threadpool_t *pool);
#endif

threadpool.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#include "threadpool.h"
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <time.h>
void *thread_routine(void *arg) {
struct timespec abstime;
int timeout;
printf("thread 0x%0x is starting\n", (int)pthread_self());
threadpool_t *pool = (threadpool_t *)arg;
while (1) {
condition_lock(&pool->ready);
timeout = 0;
pool->idle++;
// 等待队列中有任务到来或者线程池销毁通知
while (pool->first == NULL && !pool->quit) {
printf("thread 0x%0x is waiting\n", (int)pthread_self());
clock_gettime(CLOCK_REALTIME, &abstime);
abstime.tv_sec += 2;
int status = condition_timedwait(&pool->ready, &abstime);
if (status == ETIMEDOUT) {
printf("thread 0x%0x is waiting time out\n", (int)pthread_self());
timeout = -1;
break;
}
}
// 等待条件处于工作状态
pool->idle--;
if (pool->first != NULL) {
// 从队头取出任务
task_t *t = pool->first;
pool->first = t->next;
// 执行任务需要一定的时间,所以要解锁,以便生产者进程能够往队列中添加任务
// 其他消费者进程能够进入等待任务
condition_unlock(&pool->ready);
t->run(t->arg);
free(t);
condition_lock(&pool->ready);
}
// 如果等待到线程池销毁通知, 且任务都执行完毕了
if (pool->quit && pool->first == NULL) {
pool->counter--;
if (pool->counter == 0) {
condition_signal(&pool->ready);
}
condition_unlock(&pool->ready);
// 跳出循环之前要记得解锁
break;
}
//
if (timeout == -1 && pool->first == NULL) {
pool->counter--;
condition_unlock(&pool->ready);
// 跳出循环之前要记得解锁
break;
}
condition_unlock(&pool->ready);
}
printf("thread 0x%0x is exiting\n", (int)pthread_self());
return NULL;
}
// 初始化线程池
void threadpool_init(threadpool_t *pool, int threads) {
// 对线程池中各个字段进行初始化
condition_init(&pool->ready);
pool->first = NULL;
pool->last = NULL;
pool->counter = 0;
pool->idle = 0;
pool->max_threads = threads;
pool->quit = 0;
}
// 往线程池中添加任务
void threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg), void *arg) {
condition_lock(&pool->ready);
// 生成新任务
task_t *newtask = (task_t *)malloc(sizeof(task_t));
newtask->run = run;
newtask->arg = arg;
newtask->next = NULL;
// 将任务添加到队列
if (pool->first == NULL) {
pool->first = newtask;
} else {
pool->last->next = newtask;
}
pool->last = newtask;
// 如果有等待线程,则唤醒其中一个
if (pool->idle > 0) {
condition_signal(&pool->ready);
} else if (pool->counter < pool->max_threads){
// 没有等待线程,并且当前线程数不超过最大线程数
pthread_t tid;
pthread_create(&tid, NULL, thread_routine, pool);
pool->counter++;
}
condition_unlock(&pool->ready);
}
// 销毁线程池
void threadpool_destroy(threadpool_t *pool) {
if (pool->quit) {
return;
}
condition_lock(&pool->ready);
pool->quit = 1;
if (pool->counter > 0) {
if (pool->idle > 0) {
condition_broadcast(&pool->ready);
}
// 处于执行任务状态的线程,不会收到广播
// 线程池需要等待执行任务状态中的线程全部退出
while (pool->counter > 0) {
condition_wait(&pool->ready);
}
}
condition_unlock(&pool->ready);
condition_destroy(&pool->ready);
}


进程池

内存池