当前位置:首页 > 系统教程 > 正文

Linux线程池实现:从互斥、同步到日志

Linux线程池实现:从互斥、同步到日志

完整代码实现带注释详解,小白也能看懂的线程池教程

Linux线程池实现:从互斥、同步到日志 Linux线程池 互斥锁 条件变量 日志系统 第1张

在Linux系统编程中,线程池是一种常用的并发模式,它通过预先创建一组线程并复用它们来执行任务,避免了频繁创建和销毁线程的开销。本文将从零开始,带你实现一个完整的线程池,涵盖互斥锁条件变量以及简单的日志系统,并提供详细注释,确保小白也能理解。

1. 线程池基础

线程池的核心是一个任务队列和一组工作线程。主线程将任务添加到队列,工作线程不断从队列中取出任务执行。为了保证线程安全,队列的访问需要互斥锁保护,而条件变量则用于线程间的同步(如队列空时等待,有任务时唤醒)。

2. 互斥与同步机制

在Linux下,我们使用pthread库提供的pthread_mutex_t实现互斥,pthread_cond_t实现条件同步。当任务队列为空时,工作线程调用pthread_cond_wait阻塞;当有新任务加入时,主线程通过pthread_cond_signal唤醒一个等待线程。

3. 日志系统

日志可以帮助我们观察线程池运行状态。这里实现一个简单的日志宏,打印时间戳和消息到标准输出,用于调试。

4. 完整代码实现(带详细注释)

    #include #include #include #include #include #include // 日志宏:打印时间戳和消息#define LOG(level, msg) do { \    time_t now = time(NULL); \    char buf[32]; \    strftime(buf, sizeof(buf), \             "%Y-%m-%d %H:%M:%S", localtime(&now)); \    printf("[%s] [%s] %s\n", buf, level, msg); \} while(0)// 任务结构体typedef struct task {    void (function)(void); // 任务函数    void* arg;                // 函数参数    struct task* next;        // 指向下一个任务} task_t;// 线程池结构体typedef struct threadpool {    pthread_mutex_t lock;     // 互斥锁    pthread_cond_t notify;    // 条件变量    pthread_t* threads;       // 线程数组    task_t* task_head;        // 任务队列头    task_t* task_tail;        // 任务队列尾    int thread_count;         // 线程数    int shutdown;             // 是否销毁线程池    int started;              // 已启动线程数} threadpool_t;// 工作线程函数static void* worker(void* arg) {    threadpool_t* pool = (threadpool_t)arg;    task_t task;    LOG("INFO", "Worker thread started");    while (1) {        pthread_mutex_lock(&(pool->lock));        // 等待任务,直到有任务或线程池销毁        while (pool->task_head == NULL && !pool->shutdown) {            pthread_cond_wait(&(pool->notify), &(pool->lock));        }        if (pool->shutdown) {            pthread_mutex_unlock(&(pool->lock));            LOG("INFO", "Worker thread exiting");            pthread_exit(NULL);        }        // 取出任务        task = pool->task_head;        pool->task_head = task->next;        if (pool->task_head == NULL) {            pool->task_tail = NULL;        }        pthread_mutex_unlock(&(pool->lock));        // 执行任务        ((task->function))(task->arg);        free(task); // 释放任务内存    }    return NULL;}// 创建线程池threadpool_t threadpool_create(int thread_count) {    threadpool_t* pool = (threadpool_t)malloc(sizeof(threadpool_t));    if (pool == NULL) return NULL;    // 初始化互斥锁和条件变量    pthread_mutex_init(&(pool->lock), NULL);    pthread_cond_init(&(pool->notify), NULL);    pool->threads = (pthread_t)malloc(sizeof(pthread_t) * thread_count);    pool->task_head = NULL;    pool->task_tail = NULL;    pool->thread_count = thread_count;    pool->shutdown = 0;    pool->started = 0;    // 创建线程    for (int i = 0; i < thread_count; i++) {        if (pthread_create(&(pool->threads[i]), NULL, worker, (void*)pool) != 0) {            // 创建失败,销毁已创建的线程            threadpool_destroy(pool, 0);            return NULL;        }        pool->started++;    }    LOG("INFO", "Thread pool created");    return pool;}// 添加任务int threadpool_add(threadpool_t* pool, void (function)(void), void* arg) {    pthread_mutex_lock(&(pool->lock));    // 创建新任务    task_t* task = (task_t*)malloc(sizeof(task_t));    if (task == NULL) {        pthread_mutex_unlock(&(pool->lock));        return -1;    }    task->function = function;    task->arg = arg;    task->next = NULL;    // 插入队列尾部    if (pool->task_head == NULL) {        pool->task_head = task;        pool->task_tail = task;    } else {        pool->task_tail->next = task;        pool->task_tail = task;    }    // 唤醒一个工作线程    pthread_cond_signal(&(pool->notify));    pthread_mutex_unlock(&(pool->lock));    LOG("INFO", "Task added to queue");    return 0;}// 销毁线程池int threadpool_destroy(threadpool_t* pool, int graceful) {    if (pool == NULL) return -1;    pthread_mutex_lock(&(pool->lock));    pool->shutdown = (graceful ? 0 : 1); // graceful为0表示立即销毁,1表示等待任务完成    // 唤醒所有等待线程    pthread_cond_broadcast(&(pool->notify));    pthread_mutex_unlock(&(pool->lock));    // 等待所有线程结束    for (int i = 0; i < pool->started; i++) {        pthread_join(pool->threads[i], NULL);    }    // 清理资源    free(pool->threads);    // 清空剩余任务    task_t* task = pool->task_head;    while (task != NULL) {        task_t* next = task->next;        free(task);        task = next;    }    pthread_mutex_destroy(&(pool->lock));    pthread_cond_destroy(&(pool->notify));    free(pool);    LOG("INFO", "Thread pool destroyed");    return 0;}// 示例任务函数void example_task(void* arg) {    int num = (int)arg;    LOG("DEBUG", "Processing task with data: ");    printf("Task data: %d\n", num);    sleep(1); // 模拟工作}int main() {    // 创建包含3个线程的线程池    threadpool_t* pool = threadpool_create(3);    if (pool == NULL) {        LOG("ERROR", "Failed to create thread pool");        return 1;    }    // 添加5个任务    int nums[5] = {1, 2, 3, 4, 5};    for (int i = 0; i < 5; i++) {        threadpool_add(pool, example_task, (void*)&nums[i]);        usleep(100); // 微调,避免参数覆盖(仅示例,实际应动态分配内存)    }    // 等待一段时间,让任务执行    sleep(6);    // 销毁线程池(等待任务完成)    threadpool_destroy(pool, 1);    return 0;}  

5. 代码详解

互斥锁 pthread_mutex_t保护任务队列的插入和取出操作,防止数据竞争。条件变量 pthread_cond_t配合互斥锁,使线程在队列空时休眠,避免忙等待。日志宏LOG打印时间戳和级别,便于观察。

6. 编译与运行

使用gcc编译(需链接pthread):gcc -o threadpool threadpool.c -lpthread运行:./threadpool,观察日志输出和任务执行顺序。

7. 总结

本文通过一个完整的Linux线程池实现,展示了互斥锁条件变量和简单日志系统的应用。掌握这些基础,你就能应对更复杂的并发场景。建议动手运行代码,加深理解。

— 教程结束,欢迎实践 —