您好,登录后才能下订单哦!
密码登录
登录注册
点击 登录注册 即表示同意《亿速云用户服务条款》
# Linux如何实现C线程池
## 1. 线程池概述
### 1.1 什么是线程池
线程池(Thread Pool)是一种多线程处理形式,它预先创建一组线程并保存在内存中,当有任务到来时,直接从池中取出空闲线程执行任务,任务完成后线程返回池中等待下一次任务分配,而不是立即销毁。这种技术避免了频繁创建和销毁线程的开销。
### 1.2 为什么需要线程池
在传统多线程编程中,我们通常为每个任务创建一个新线程,这种方式存在明显缺陷:
1. **线程创建销毁开销大**:每次创建/销毁线程涉及系统调用和资源分配
2. **资源耗尽风险**:无限制创建线程可能导致系统资源耗尽
3. **调度开销**:大量线程会增加OS调度负担
4. **响应延迟**:任务到来时需等待线程创建
线程池通过重用固定数量的线程解决了这些问题,具有以下优势:
- 降低资源消耗
- 提高响应速度
- 提高线程的可管理性
- 提供更强大的功能扩展
### 1.3 线程池的应用场景
线程池技术广泛应用于:
1. 高并发网络服务器(如Web服务器)
2. 数据库连接池
3. 批处理系统
4. 异步任务处理系统
5. 需要限制并发线程数的场景
## 2. 线程池的核心组件
一个完整的线程池实现通常包含以下核心组件:
### 2.1 任务队列(Task Queue)
- 存储待处理任务的容器
- 通常实现为先进先出(FIFO)的队列
- 需要线程安全保证(互斥锁+条件变量)
### 2.2 工作线程(Worker Threads)
- 实际执行任务的线程集合
- 线程数量可固定或动态调整
- 每个线程循环获取并执行任务
### 2.3 线程池管理器(Pool Manager)
- 负责线程池的创建、销毁
- 管理线程生命周期
- 监控线程池状态
- 可能包含动态调整线程数量的逻辑
### 2.4 同步机制
- 互斥锁(Mutex):保护共享资源(如任务队列)
- 条件变量(Condition Variable):线程间通信/通知
- 信号量(可选):控制并发量
## 3. Linux下的线程基础
### 3.1 POSIX线程(pthread)
Linux通过POSIX线程库(pthread)提供线程支持,主要API包括:
```c
#include <pthread.h>
// 创建线程
int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
void *(*start_routine) (void *), void *arg);
// 终止线程
void pthread_exit(void *retval);
// 等待线程结束
int pthread_join(pthread_t thread, void **retval);
// 线程取消
int pthread_cancel(pthread_t thread);
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
int pthread_mutex_destroy(pthread_mutex_t *mutex);
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_destroy(pthread_cond_t *cond);
typedef struct {
void (*function)(void *); // 任务函数指针
void *arg; // 任务参数
} threadpool_task_t;
typedef struct {
pthread_mutex_t lock; // 互斥锁
pthread_cond_t notify; // 条件变量
pthread_t *threads; // 线程数组
threadpool_task_t *queue; // 任务队列
int thread_count; // 线程数量
int queue_size; // 队列大小
int head; // 队头索引
int tail; // 队尾索引
int count; // 当前任务数
int shutdown; // 关闭标志
int started; // 已启动线程数
} threadpool_t;
threadpool_t *threadpool_create(int thread_count, int queue_size) {
if(thread_count <= 0 || thread_count > MAX_THREADS ||
queue_size <= 0 || queue_size > MAX_QUEUE) {
return NULL;
}
threadpool_t *pool = (threadpool_t *)malloc(sizeof(threadpool_t));
if(pool == NULL) {
return NULL;
}
// 初始化成员
pool->thread_count = 0;
pool->queue_size = queue_size;
pool->head = pool->tail = pool->count = 0;
pool->shutdown = pool->started = 0;
// 分配线程数组和任务队列
pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * thread_count);
pool->queue = (threadpool_task_t *)malloc(
sizeof(threadpool_task_t) * queue_size);
// 初始化互斥锁和条件变量
if(pthread_mutex_init(&(pool->lock), NULL) != 0 ||
pthread_cond_init(&(pool->notify), NULL) != 0 ||
pool->threads == NULL || pool->queue == NULL) {
if(pool) threadpool_free(pool);
return NULL;
}
// 创建工作线程
for(int i = 0; i < thread_count; ++i) {
if(pthread_create(&(pool->threads[i]), NULL,
threadpool_thread, (void*)pool) != 0) {
threadpool_destroy(pool, 0);
return NULL;
}
pool->thread_count++;
pool->started++;
}
return pool;
}
static void *threadpool_thread(void *threadpool) {
threadpool_t *pool = (threadpool_t *)threadpool;
threadpool_task_t task;
for(;;) {
// 加锁访问共享数据
pthread_mutex_lock(&(pool->lock));
// 无任务且不关闭时等待
while((pool->count == 0) && (!pool->shutdown)) {
pthread_cond_wait(&(pool->notify), &(pool->lock));
}
// 立即关闭或优雅关闭且无任务时退出
if((pool->shutdown == immediate_shutdown) ||
((pool->shutdown == graceful_shutdown) &&
(pool->count == 0))) {
break;
}
// 获取任务
task.function = pool->queue[pool->head].function;
task.arg = pool->queue[pool->head].arg;
// 更新队列
pool->head = (pool->head + 1) % pool->queue_size;
pool->count--;
// 解锁
pthread_mutex_unlock(&(pool->lock));
// 执行任务
(*(task.function))(task.arg);
}
// 线程退出处理
pool->started--;
pthread_mutex_unlock(&(pool->lock));
pthread_exit(NULL);
return NULL;
}
int threadpool_add(threadpool_t *pool, void (*function)(void *), void *arg) {
int err = 0;
int next;
if(pool == NULL || function == NULL) {
return threadpool_invalid;
}
if(pthread_mutex_lock(&(pool->lock)) != 0) {
return threadpool_lock_failure;
}
next = (pool->tail + 1) % pool->queue_size;
// 队列已满
if(pool->count == pool->queue_size) {
err = threadpool_queue_full;
goto out;
}
// 已关闭
if(pool->shutdown) {
err = threadpool_shutdown;
goto out;
}
// 添加任务到队列
pool->queue[pool->tail].function = function;
pool->queue[pool->tail].arg = arg;
pool->tail = next;
pool->count += 1;
// 通知工作线程
if(pthread_cond_signal(&(pool->notify)) != 0) {
err = threadpool_lock_failure;
goto out;
}
out:
if(pthread_mutex_unlock(&pool->lock)) != 0) {
err = threadpool_lock_failure;
}
return err;
}
int threadpool_destroy(threadpool_t *pool, int flags) {
int i, err = 0;
if(pool == NULL) {
return threadpool_invalid;
}
if(pthread_mutex_lock(&(pool->lock)) != 0) {
return threadpool_lock_failure;
}
do {
// 已设置关闭标志
if(pool->shutdown) {
err = threadpool_shutdown;
break;
}
pool->shutdown = (flags & threadpool_graceful) ?
graceful_shutdown : immediate_shutdown;
// 唤醒所有线程
if((pthread_cond_broadcast(&(pool->notify)) != 0 ||
(pthread_mutex_unlock(&(pool->lock)) != 0) {
err = threadpool_lock_failure;
break;
}
// 等待所有线程结束
for(i = 0; i < pool->thread_count; ++i) {
if(pthread_join(pool->threads[i], NULL) != 0) {
err = threadpool_thread_failure;
}
}
} while(0);
// 仅当成功时才释放资源
if(!err) {
threadpool_free(pool);
}
return err;
}
// threadpool.h
#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_
typedef struct threadpool_t threadpool_t;
typedef enum {
threadpool_invalid = -1,
threadpool_lock_failure = -2,
threadpool_queue_full = -3,
threadpool_shutdown = -4,
threadpool_thread_failure = -5
} threadpool_error_t;
typedef enum {
immediate_shutdown = 1,
graceful_shutdown = 2
} threadpool_shutdown_t;
threadpool_t *threadpool_create(int thread_count, int queue_size);
int threadpool_add(threadpool_t *pool, void (*function)(void *), void *arg);
int threadpool_destroy(threadpool_t *pool, int flags);
int threadpool_free(threadpool_t *pool);
#endif
// threadpool.c
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include "threadpool.h"
#define MAX_THREADS 64
#define MAX_QUEUE 65536
// ... 前面列出的实现代码 ...
static int threadpool_free(threadpool_t *pool) {
if(pool == NULL || pool->started > 0) {
return -1;
}
if(pool->threads) {
free(pool->threads);
free(pool->queue);
pthread_mutex_lock(&(pool->lock));
pthread_mutex_destroy(&(pool->lock));
pthread_cond_destroy(&(pool->notify));
}
free(pool);
return 0;
}
基本线程池使用固定数量的线程,更高级的实现可以动态调整:
通过实现优先级队列而非简单FIFO队列:
扩展任务结构以支持结果返回:
typedef struct {
void (*function)(void *);
void *arg;
void *result; // 结果存储
pthread_mutex_t lock; // 结果锁
pthread_cond_t finished; // 完成条件
int is_finished; // 完成标志
} future_task_t;
线程数 = CPU核心数 * (1 + 等待时间/计算时间)
void handle_request(void *arg) {
int client_fd = *(int *)arg;
free(arg);
// 模拟处理HTTP请求
char response[] = "HTTP/1.1 200 OK\r\nContent-Length: 12\r\n\r\nHello World!";
write(client_fd, response, sizeof(response)-1);
close(client_fd);
}
int main() {
threadpool_t *pool = threadpool_create(4, 1024);
int server_fd = create_server_socket(8080);
while(1) {
int *client_fd = malloc(sizeof(int));
*client_fd = accept(server_fd, NULL, NULL);
threadpool_add(pool, handle_request, client_fd);
}
threadpool_destroy(pool, graceful_shutdown);
return 0;
}
typedef struct {
int start;
int end;
double *result;
} compute_task_t;
void compute_partial_sum(void *arg) {
compute_task_t *task = (compute_task_t *)arg;
double sum = 0.0;
for(int i = task->start; i <= task->end; i++) {
sum += 1.0/(i*i);
}
*(task->result) = sum;
free(task);
}
double compute_pi(int terms, int threads) {
threadpool_t *pool = threadpool_create(threads, threads);
double *partial_sums = calloc(threads, sizeof(double));
int chunk_size = terms / threads;
for(int i = 0; i < threads; i++) {
compute_task_t *task = malloc(sizeof(compute_task_t));
task->start = i * chunk_size + 1;
task->end = (i == threads-1) ? terms : (i+1)*chunk_size;
task->result = &partial_sums[i];
threadpool_add(pool, compute_partial_sum, task);
}
threadpool_destroy(pool, graceful_shutdown);
double sum = 0.0;
for(int i = 0; i < threads; i++) {
sum += partial_sums[i];
}
free(partial_sums);
return sqrt(6 * sum);
}
| 特性 | 线程池 | 协程
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。