-
大小: 9.38KB文件类型: .zip金币: 1下载: 0 次发布日期: 2021-02-22
- 标签: threadpool thread Read AD
资源简介
threadpool
【核心代码】
/* * Copyright (c) 2016, Mathias Brossard <mathias@brossard.org>. * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are * met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ /** * @file threadpool.c * @brief Threadpool implementation file */ #include <stdlib.h> #include <pthread.h> #include <unistd.h> #include "threadpool.h" typedef enum { immediate_shutdown = 1, graceful_shutdown = 2 } threadpool_shutdown_t; /** * @struct threadpool_task * @brief the work struct * * @var function Pointer to the function that will perform the task. * @var argument Argument to be passed to the function. */ typedef struct { void (*function)(void *); void *argument; } threadpool_task_t; /** * @struct threadpool * @brief The threadpool struct * * @var notify Condition variable to notify worker threads. * @var threads Array containing worker threads ID. * @var thread_count Number of threads * @var queue Array containing the task queue. * @var queue_size Size of the task queue. * @var head Index of the first element. * @var tail Index of the next element. * @var count Number of pending tasks * @var shutdown Flag indicating if the pool is shutting down * @var started Number of started threads */ struct threadpool_t { pthread_mutex_t lock; pthread_cond_t notify; pthread_t *threads; threadpool_task_t *queue; int thread_count; int queue_size; int head; int tail; int count; int shutdown; int started; }; /** * @function void *threadpool_thread(void *threadpool) * @brief the worker thread * @param threadpool the pool which own the thread */ static void *threadpool_thread(void *threadpool); int threadpool_free(threadpool_t *pool); threadpool_t *threadpool_create(int thread_count, int queue_size, int flags) { threadpool_t *pool; int i; (void) flags; if(thread_count <= 0 || thread_count > MAX_THREADS || queue_size <= 0 || queue_size > MAX_QUEUE) { return NULL; } if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL) { goto err; } /* Initialize */ pool->thread_count = 0; pool->queue_size = queue_size; pool->head = pool->tail = pool->count = 0; pool->shutdown = pool->started = 0; /* Allocate thread and task queue */ pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * thread_count); pool->queue = (threadpool_task_t *)malloc (sizeof(threadpool_task_t) * queue_size); /* Initialize mutex and conditional variable first */ if((pthread_mutex_init(&(pool->lock), NULL) != 0) || (pthread_cond_init(&(pool->notify), NULL) != 0) || (pool->threads == NULL) || (pool->queue == NULL)) { goto err; } /* Start worker threads */ for(i = 0; i < thread_count; i ) { if(pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void*)pool) != 0) { threadpool_destroy(pool, 0); return NULL; } pool->thread_count ; pool->started ; } return pool; err: if(pool) { threadpool_free(pool); } return NULL; } int threadpool_add(threadpool_t *pool, void (*function)(void *), void *argument, int flags) { int err = 0; int next; (void) flags; if(pool == NULL || function == NULL) { return threadpool_invalid; } if(pthread_mutex_lock(&(pool->lock)) != 0) { return threadpool_lock_failure; } next = (pool->tail 1) % pool->queue_size; do { /* Are we full ? */ if(pool->count == pool->queue_size) { err = threadpool_queue_full; break; } /* Are we shutting down ? */ if(pool->shutdown) { err = threadpool_shutdown; break; } /* Add task to queue */ pool->queue[pool->tail].function = function; pool->queue[pool->tail].argument = argument; pool->tail = next; pool->count = 1; /* pthread_cond_broadcast */ if(pthread_cond_signal(&(pool->notify)) != 0) { err = threadpool_lock_failure; break; } } while(0); if(pthread_mutex_unlock(&pool->lock) != 0) { err = threadpool_lock_failure; } return err; } int threadpool_destroy(threadpool_t *pool, int flags) { int i, err = 0; if(pool == NULL) { return threadpool_invalid; } if(pthread_mutex_lock(&(pool->lock)) != 0) { return threadpool_lock_failure; } do { /* Already shutting down */ if(pool->shutdown) { err = threadpool_shutdown; break; } pool->shutdown = (flags & threadpool_graceful) ? graceful_shutdown : immediate_shutdown; /* Wake up all worker threads */ if((pthread_cond_broadcast(&(pool->notify)) != 0) || (pthread_mutex_unlock(&(pool->lock)) != 0)) { err = threadpool_lock_failure; break; } /* Join all worker thread */ for(i = 0; i < pool->thread_count; i ) { if(pthread_join(pool->threads[i], NULL) != 0) { err = threadpool_thread_failure; } } } while(0); /* Only if everything went well do we deallocate the pool */ if(!err) { threadpool_free(pool); } return err; } int threadpool_free(threadpool_t *pool) { if(pool == NULL || pool->started > 0) { return -1; } /* Did we manage to allocate ? */ if(pool->threads) { free(pool->threads); free(pool->queue); /* Because we allocate pool->threads after initializing the mutex and condition variable, we're sure they're initialized. Let's lock the mutex just in case. */ pthread_mutex_lock(&(pool->lock)); pthread_mutex_destroy(&(pool->lock)); pthread_cond_destroy(&(pool->notify)); } free(pool); return 0; } static void *threadpool_thread(void *threadpool) { threadpool_t *pool = (threadpool_t *)threadpool; threadpool_task_t task; for(;;) { /* Lock must be taken to wait on conditional variable */ pthread_mutex_lock(&(pool->lock)); /* Wait on condition variable, check for spurious wakeups. When returning from pthread_cond_wait(), we own the lock. */ while((pool->count == 0) && (!pool->shutdown)) { pthread_cond_wait(&(pool->notify), &(pool->lock)); } if((pool->shutdown == immediate_shutdown) || ((pool->shutdown == graceful_shutdown) && (pool->count == 0))) { break; } /* Grab our task */ task.function = pool->queue[pool->head].function; task.argument = pool->queue[pool->head].argument; pool->head = (pool->head 1) % pool->queue_size; pool->count -= 1; /* Unlock */ pthread_mutex_unlock(&(pool->lock)); /* Get to work */ (*(task.function))(task.argument); } pool->started--; pthread_mutex_unlock(&(pool->lock)); pthread_exit(NULL); return(NULL); }
代码片段和文件信息
/*
* Copyright (c) 2016 Mathias Brossard .
* All rights reserved.
*
* Redistribution and use in source and binary forms with or without
* modification are permitted provided that the following conditions are
* met:
*
* 1. Redistributions of source code must retain the above copyright
* notice this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* “AS IS“ AND ANY EXPRESS OR IMPLIED WARRANTIES INCLUDING BUT NOT
* LIMITED TO THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT INDIRECT INCIDENTAL
* SPECIAL EXEMPLA
属性 大小 日期 时间 名称
----------- --------- ---------- ----- ----
目录 0 2016-10-04 18:11 threadpool-master\
文件 16 2016-10-04 18:11 threadpool-master\.gitignore
文件 808 2016-10-04 18:11 threadpool-master\.travis.yml
文件 1310 2016-10-04 18:11 threadpool-master\LICENSE
文件 937 2016-10-04 18:11 threadpool-master\Makefile
文件 896 2016-10-04 18:11 threadpool-master\README.md
文件 246 2016-10-04 18:11 threadpool-master\package.json
目录 0 2016-10-04 18:11 threadpool-master\src\
文件 8491 2016-10-04 18:11 threadpool-master\src\threadpool.c
文件 3368 2016-10-04 18:11 threadpool-master\src\threadpool.h
目录 0 2016-10-04 18:11 threadpool-master\tests\
文件 1459 2016-10-04 18:11 threadpool-master\tests\heavy.c
文件 1033 2016-10-04 18:11 threadpool-master\tests\shutdown.c
文件 964 2016-10-04 18:11 threadpool-master\tests\thrdtest.c
- 上一篇:计算电器所消耗的电能.cpp
- 下一篇:茶壶的光照观察组(c++代码)
评论
共有 条评论