加入收藏 | 设为首页 | 会员中心 | 我要投稿 财气旺网 - 财气网 (https://www.caiqiwang.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 服务器 > 搭建环境 > Linux > 正文

linux c 开启线程池,Linux线程池(C语言描述)

发布时间:2022-10-20 14:31:59 所属栏目:Linux 来源:网络
导读: 创建线程或者进程的开销是很大的,为了防止频繁的创建线程,提高程序的运行效率,往往都会建立一个线程池用于多线程程序的调度
下面的程序就是完整的线程池实现,主要采用互斥量和条件变量

创建线程或者进程的开销是很大的,为了防止频繁的创建线程,提高程序的运行效率,往往都会建立一个线程池用于多线程程序的调度

下面的程序就是完整的线程池实现,主要采用互斥量和条件变量实现同步

首先定义头文件threadpool.h

在该文件中定义了线程池的数据结构和所有的函数

#ifndef THREADPOOL_H_

#define THREADPOOL_H_

#include

#include

#include

#include

/**

* 线程体数据结构

*/

typedef struct runner

{

void (*callback)(void* arg); // 回调函数指针

void* arg; // 回调函数的参数

struct runner* next;

} thread_runner;

/**

* 线程池数据结构

*/

typedef struct

{

pthread_mutex_t mutex; //互斥量

pthread_cond_t cond; // 条件变量

thread_runner* runner_head; // 线程池中所有等待任务的头指针

thread_runner* runner_tail; // 线程池所有等待任务的尾指针

int shutdown; // 线程池是否销毁

pthread_t* threads; // 所有线程

int max_thread_size; //线程池中允许的活动线程数目

} thread_pool;

/**

* 线程体

*/

void run(void *arg);

/**

* 初始化线程池

* 参数:

* pool:指向线程池结构有效地址的动态指针

* max_thread_size:最大的线程数

*/

void threadpool_init(thread_pool* pool, int max_thread_size);

/**

* 向线程池加入任务

* 参数:

* pool:指向线程池结构有效地址的动态指针

* callback:线程回调函数

* arg:回调函数参数

*/

void threadpool_add_runner(thread_pool* pool, void (*callback)(void *arg), void *arg);

/**

* 销毁线程池

* 参数:

* ppool:指向线程池结构有效地址的动态指针地址(二级指针),销毁后释放内存,该指针为NULL

*/

void threadpool_destroy(thread_pool** ppool);

#endif

然后是函数实现threadpool.h

该文件实现了threadpool.h的函数定义

#include "threadpool.h"

#define DEBUG 1

/**

* 线程体

*/

void run(void *arg)

{

thread_pool* pool = (thread_pool*) arg;

while (1)

{

// 加锁

pthread_mutex_lock(&(pool->mutex));

#ifdef DEBUG

printf("run-> locked\n");

#endif

linux c线程池_停止线程池中的线程_线程池linux

// 如果等待队列为0并且线程池未销毁线程池linux,则处于阻塞状态

while (pool->runner_head == NULL && !pool->shutdown)

{

pthread_cond_wait(&(pool->cond), &(pool->mutex));

}

//如果线程池已经销毁

if (pool->shutdown)

{

// 解锁

pthread_mutex_unlock(&(pool->mutex));

#ifdef DEBUG

printf("run-> unlocked and thread exit\n");

#endif

pthread_exit(NULL);

}

// 取出链表中的头元素

thread_runner *runner = pool->runner_head;

pool->runner_head = runner->next;

// 解锁

pthread_mutex_unlock(&(pool->mutex));

#ifdef DEBUG

printf("run-> unlocked\n");

#endif

// 调用回调函数,执行任务

(runner->callback)(runner->arg);

free(runner);

runner = NULL;

#ifdef DEBUG

printf("run-> runned and free runner\n");

#endif

}

pthread_exit(NULL);

}

/**

* 初始化线程池

* 参数:

* pool:指向线程池结构有效地址的动态指针

* max_thread_size:最大的线程数

*/

void threadpool_init(thread_pool* pool, int max_thread_size)

{

// 初始化互斥量

pthread_mutex_init(&(pool->mutex), NULL);

// 初始化条件变量

pthread_cond_init(&(pool->cond), NULL);

pool->runner_head = NULL;

pool->runner_tail = NULL;

pool->max_thread_size = max_thread_size;

pool->shutdown = 0;

// 创建所有分离态线程

pool->threads = (pthread_t *) malloc(max_thread_size * sizeof(pthread_t));

int i = 0;

for (i = 0; i < max_thread_size; i++)

{

pthread_attr_t attr;

pthread_attr_init(&attr);

pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

pthread_create(&(pool->threads[i]), &attr, (void*) run, (void*) pool);

}

#ifdef DEBUG

printf("threadpool_init-> create %d detached thread\n", max_thread_size);

#endif

}

/**

* 向线程池加入任务

* 参数:

* pool:指向线程池结构有效地址的动态指针

* callback:线程回调函数

* arg:回调函数参数

*/

void threadpool_add_runner(thread_pool* pool, void (*callback)(void *arg), void *arg)

{

// 构造一个新任务

thread_runner *newrunner = (thread_runner *) malloc(sizeof(thread_runner));

newrunner->callback = callback;

newrunner->arg = arg;

newrunner->next = NULL;

// 加锁

pthread_mutex_lock(&(pool->mutex));

#ifdef DEBUG

printf("threadpool_add_runner-> locked\n");

#endif

// 将任务加入到等待队列中

if (pool->runner_head != NULL)

{

pool->runner_tail->next = newrunner;

pool->runner_tail = newrunner;

}

else

{

pool->runner_head = newrunner;

pool->runner_tail = newrunner;

}

// 解锁

pthread_mutex_unlock(&(pool->mutex));

#ifdef DEBUG

printf("threadpool_add_runner-> unlocked\n");

#endif

// 唤醒一个等待线程

pthread_cond_signal(&(pool->cond));

#ifdef DEBUG

printf("threadpool_add_runner-> add a runner and wakeup a waiting thread\n");

#endif

}

/**

* 销毁线程池

* 参数:

* ppool:指向线程池结构有效地址的动态指针地址(二级指针)

*/

void threadpool_destroy(thread_pool** ppool)

{

thread_pool *pool = *ppool;

// 防止2次销毁

if (!pool->shutdown)

{

pool->shutdown = 1;

// 唤醒所有等待线程,线程池要销毁了

pthread_cond_broadcast(&(pool->cond));

// 等待所有线程中止

sleep(1);

#ifdef DEBUG

printf("threadpool_destroy-> wakeup all waiting threads\n");

#endif

// 回收空间

free(pool->threads);

// 销毁等待队列

thread_runner *head = NULL;

while (pool->runner_head != NULL)

{

head = pool->runner_head;

pool->runner_head = pool->runner_head->next;

free(head);

}

#ifdef DEBUG

printf("threadpool_destroy-> all runners freed\n");

#endif

/*条件变量和互斥量也别忘了销毁*/

pthread_mutex_destroy(&(pool->mutex));

pthread_cond_destroy(&(pool->cond));

#ifdef DEBUG

printf("threadpool_destroy-> mutex and cond destoryed\n");

#endif

free(pool);

(*ppool) = NULL;

#ifdef DEBUG

printf("threadpool_destroy-> pool freed\n");

#endif

}

}

(编辑:财气旺网 - 财气网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!