linux线程池(zt)
http://blog.csdn.net/phus/archive/2005/06/09/390745.aspx
%A thrmgr.h文件
%A
%A /*
%A * Copyright (C) 2004 Trog
%A *
%A * This program is free software; you can redistribute it and/or modify
%A * it under the terms of the GNU General Public License as published by
%A * the Free Software Foundation; either version 2 of the License, or
%A * (at your option) any later version.
%A *
%A * This program is distributed in the hope that it will be useful,
%A * but WITHOUT ANY WARRANTY; without even the implied warranty of
%A * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
%A * GNU General Public License for more details.
%A *
%A * You should have received a copy of the GNU General Public License
%A * along with this program; if not, write to the Free Software
%A * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
%A */
%A
%A #ifndef __THRMGR_H__
%A #define __THRMGR_H__
%A
%A #include <pthread.h>
%A #include <sys/time.h>
%A
%A typedef struct work_item_tag { /* One queued job: a node of the singly linked work queue. */
%A struct work_item_tag *next; /* following node; NULL on the most recently added node */
%A void *data; /* caller's pointer: stored by work_queue_add(), handed back by work_queue_pop() */
%A struct timeval time_queued; /* gettimeofday() stamp taken in work_queue_add(); not read by the code shown here */
%A } work_item_t;
%A
%A typedef struct work_queue_tag { /* FIFO of pending jobs: appended at tail, removed from head. */
%A work_item_t *head; /* oldest item, next to be popped; NULL when the queue is empty */
%A work_item_t *tail; /* newest item, where work_queue_add() links new nodes; NULL when empty */
%A int item_count; /* item counter maintained by the queue functions */
%A } work_queue_t;
%A
%A typedef enum { /* Life-cycle state stored in threadpool_t.state. */
%A POOL_INVALID, /* first enumerator (value 0); thrmgr_destroy()/thrmgr_dispatch() reject any state other than POOL_VALID */
%A POOL_VALID, /* set by thrmgr_new() once the pool is fully initialized */
%A POOL_EXIT, /* set by thrmgr_destroy(); workers stop waiting for work and exit */
%A } pool_state_t;
%A
%A typedef struct threadpool_tag { /* A pool of worker threads fed from one shared work queue. */
%A pthread_mutex_t pool_mutex; /* held by workers, thrmgr_dispatch() and thrmgr_destroy() while touching state, counters and queue */
%A pthread_cond_t pool_cond; /* signalled on new work; broadcast on shutdown and when the last worker exits */
%A pthread_attr_t pool_attr; /* attributes for pthread_create(); thrmgr_new() sets PTHREAD_CREATE_DETACHED */
%A
%A pool_state_t state; /* see pool_state_t */
%A int thr_max; /* upper bound on worker threads (max_threads given to thrmgr_new()) */
%A int thr_alive; /* workers created by thrmgr_dispatch() that have not yet exited */
%A int thr_idle; /* workers currently looking for or waiting on work */
%A int idle_timeout; /* seconds added to time(NULL) to form a worker's wait deadline */
%A
%A void (*handler)(void *); /* called by a worker with each dequeued job's data pointer */
%A
%A work_queue_t *queue; /* pending jobs; created by work_queue_new() in thrmgr_new() */
%A } threadpool_t;
%A
%A threadpool_t *thrmgr_new(int max_threads, int idle_timeout, void (*handler)(void *)); /* create a pool; NULL if max_threads <= 0 or setup fails */
%A void thrmgr_destroy(threadpool_t *threadpool); /* mark the pool POOL_EXIT, wait until no worker is alive, then free the pool */
%A int thrmgr_dispatch(threadpool_t *threadpool, void *user_data); /* queue user_data for the handler; non-zero on success, 0 on failure */
%A
%A #endif
%A
%A
%A thrmgr.c文件
%A
%A /*
%A * Copyright (C) 2004 Trog
%A *
%A * This program is free software; you can redistribute it and/or modify
%A * it under the terms of the GNU General Public License as published by
%A * the Free Software Foundation; either version 2 of the License, or
%A * (at your option) any later version.
%A *
%A * This program is distributed in the hope that it will be useful,
%A * but WITHOUT ANY WARRANTY; without even the implied warranty of
%A * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
%A * GNU General Public License for more details.
%A *
%A * You should have received a copy of the GNU General Public License
%A * along with this program; if not, write to the Free Software
%A * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
%A */
%A
%A #include <pthread.h>
%A #include <time.h>
%A #include <errno.h>
%A
%A #include "thrmgr.h"
%A
%A #include "others.h"
%A #include "memory.h"
%A #include "output.h"
%A
%A #define FALSE (0) /* failure result of thrmgr_dispatch(); initial value of a worker's must_exit flag */
%A #define TRUE (1) /* success result of thrmgr_dispatch(); value of must_exit when a worker should stop */
%A
%A /*
%A  * Allocate an empty work queue.
%A  *
%A  * Returns the new queue with head and tail set to NULL and item_count 0,
%A  * or NULL when mmalloc() returns NULL.  thrmgr_new() tests this function's
%A  * result for NULL, so the allocation is checked here before it is
%A  * dereferenced.
%A  */
%A work_queue_t *work_queue_new(void)
%A {
%A     work_queue_t *work_q;
%A
%A     work_q = (work_queue_t *) mmalloc(sizeof(work_queue_t));
%A     if (work_q == NULL) {
%A         return NULL;
%A     }
%A
%A     work_q->head = work_q->tail = NULL;
%A     work_q->item_count = 0;
%A     return work_q;
%A }
%A
%A /*
%A  * Append a job to the tail of a work queue.
%A  *
%A  * work_q: queue to append to; a NULL queue makes the call a no-op.
%A  * data:   caller's pointer, stored as-is in the new node.
%A  *
%A  * The node is stamped with the current time via gettimeofday().
%A  * NOTE(review): the mmalloc() result is used without a NULL check --
%A  * confirm that mmalloc() never returns NULL.
%A  */
%A void work_queue_add(work_queue_t *work_q, void *data)
%A {
%A     work_item_t *node;
%A
%A     if (work_q == NULL) {
%A         return;
%A     }
%A
%A     node = (work_item_t *) mmalloc(sizeof(work_item_t));
%A     node->data = data;
%A     node->next = NULL;
%A     gettimeofday(&node->time_queued, NULL);
%A
%A     if (work_q->head != NULL) {
%A         /* Non-empty queue: link behind the current tail. */
%A         work_q->tail->next = node;
%A         work_q->item_count++;
%A     } else {
%A         /* Empty queue: the node becomes the head and the count restarts at 1. */
%A         work_q->head = node;
%A         work_q->item_count = 1;
%A     }
%A     work_q->tail = node;
%A }
%A
%A /*
%A  * Remove the oldest job from a work queue and return its data pointer.
%A  *
%A  * Returns NULL when work_q is NULL or the queue is empty.  A job that was
%A  * queued with a NULL data pointer also yields NULL, so callers cannot tell
%A  * that case apart from an empty queue.
%A  *
%A  * The node itself is freed here; the data it carried is not touched.
%A  */
%A void *work_queue_pop(work_queue_t *work_q)
%A {
%A     work_item_t *work_item;
%A     void *data;
%A
%A     if (!work_q || !work_q->head) {
%A         return NULL;
%A     }
%A     work_item = work_q->head;
%A     data = work_item->data;
%A     work_q->head = work_item->next;
%A     if (work_q->head == NULL) {
%A         /* Last node removed: the queue is empty again. */
%A         work_q->tail = NULL;
%A     }
%A     /* Keep the counter in step with the list; work_queue_add() increments it. */
%A     work_q->item_count--;
%A     free(work_item);
%A     return data;
%A }
%A
%A /*
%A  * Shut a thread pool down and release it.
%A  *
%A  * Does nothing when threadpool is NULL or its state is not POOL_VALID.
%A  * Otherwise the state becomes POOL_EXIT, all workers are woken, and the
%A  * caller blocks on pool_cond until thr_alive drops to 0.  Then the mutex,
%A  * condition variable, thread attributes, work queue and the pool itself are
%A  * released.
%A  *
%A  * Failure handling (unchanged from the original): a failing mutex lock or
%A  * unlock logs and calls exit(-1); a failing broadcast or wait unlocks the
%A  * mutex and returns without freeing anything.
%A  */
%A void thrmgr_destroy(threadpool_t *threadpool)
%A {
%A     work_item_t *item;
%A
%A     if (!threadpool || (threadpool->state != POOL_VALID)) {
%A         return;
%A     }
%A     if (pthread_mutex_lock(&threadpool->pool_mutex) != 0) {
%A         logg("!Mutex lock failed\n");
%A         exit(-1);
%A     }
%A     threadpool->state = POOL_EXIT;
%A
%A     /* wait for threads to exit */
%A     if (threadpool->thr_alive > 0) {
%A         /* Tell every waiting worker that the pool is shutting down. */
%A         if (pthread_cond_broadcast(&(threadpool->pool_cond)) != 0) {
%A             pthread_mutex_unlock(&threadpool->pool_mutex);
%A             return;
%A         }
%A     }
%A     while (threadpool->thr_alive > 0) {
%A         /* The last worker to exit broadcasts pool_cond; that wakes us here. */
%A         if (pthread_cond_wait (&threadpool->pool_cond, &threadpool->pool_mutex) != 0) {
%A             pthread_mutex_unlock(&threadpool->pool_mutex);
%A             return;
%A         }
%A     }
%A     if (pthread_mutex_unlock(&threadpool->pool_mutex) != 0) {
%A         logg("!Mutex unlock failed\n");
%A         exit(-1);
%A     }
%A
%A     pthread_mutex_destroy(&(threadpool->pool_mutex));
%A     pthread_cond_destroy(&(threadpool->pool_cond));
%A     pthread_attr_destroy(&(threadpool->pool_attr));
%A
%A     /*
%A      * Release the queue, which the original code leaked.  No worker is alive
%A      * any more, so nothing else can reference the nodes.  Only the nodes are
%A      * freed: this module never frees the data pointers it is given.
%A      */
%A     if (threadpool->queue) {
%A         while ((item = threadpool->queue->head) != NULL) {
%A             threadpool->queue->head = item->next;
%A             free(item);
%A         }
%A         free(threadpool->queue);
%A     }
%A     free(threadpool);
%A     return;
%A }
%A
%A /*
%A  * Create a thread pool.
%A  *
%A  * max_threads:  upper bound on worker threads; must be > 0.
%A  * idle_timeout: seconds an idle worker waits for work before exiting.
%A  * handler:      function a worker calls with each dispatched data pointer.
%A  *
%A  * No thread is started here; thrmgr_dispatch() creates workers on demand.
%A  * Workers are created detached (PTHREAD_CREATE_DETACHED).
%A  *
%A  * Returns the pool in state POOL_VALID, or NULL when max_threads <= 0, an
%A  * allocation fails, or a pthread initialization call fails.  On failure
%A  * everything set up so far is released again (the original leaked the queue
%A  * and the initialized pthread objects, and ignored pthread_mutex_init()).
%A  */
%A threadpool_t *thrmgr_new(int max_threads, int idle_timeout, void (*handler)(void *))
%A {
%A     threadpool_t *threadpool;
%A
%A     if (max_threads <= 0) {
%A         return NULL;
%A     }
%A
%A     threadpool = (threadpool_t *) mmalloc(sizeof(threadpool_t));
%A     if (!threadpool) {
%A         return NULL;
%A     }
%A
%A     threadpool->queue = work_queue_new();
%A     if (!threadpool->queue) {
%A         goto fail_pool;
%A     }
%A     threadpool->thr_max = max_threads;
%A     threadpool->thr_alive = 0;
%A     threadpool->thr_idle = 0;
%A     threadpool->idle_timeout = idle_timeout;
%A     threadpool->handler = handler;
%A
%A     if (pthread_mutex_init(&(threadpool->pool_mutex), NULL) != 0) {
%A         goto fail_queue;
%A     }
%A     if (pthread_cond_init(&(threadpool->pool_cond), NULL) != 0) {
%A         goto fail_mutex;
%A     }
%A     if (pthread_attr_init(&(threadpool->pool_attr)) != 0) {
%A         goto fail_cond;
%A     }
%A     if (pthread_attr_setdetachstate(&(threadpool->pool_attr), PTHREAD_CREATE_DETACHED) != 0) {
%A         goto fail_attr;
%A     }
%A     threadpool->state = POOL_VALID;
%A
%A     return threadpool;
%A
%A     /* Unwind in reverse order of construction. */
%A fail_attr:
%A     pthread_attr_destroy(&(threadpool->pool_attr));
%A fail_cond:
%A     pthread_cond_destroy(&(threadpool->pool_cond));
%A fail_mutex:
%A     pthread_mutex_destroy(&(threadpool->pool_mutex));
%A fail_queue:
%A     free(threadpool->queue);
%A fail_pool:
%A     free(threadpool);
%A     return NULL;
%A }
%A
%A /*
%A  * Worker thread body; thrmgr_dispatch() passes it to pthread_create().
%A  *
%A  * arg is the threadpool_t the worker belongs to.  The worker repeatedly
%A  * takes a job from the pool's queue and passes it to threadpool->handler
%A  * (called with the pool mutex released).  While the queue is empty it waits
%A  * on pool_cond until an absolute deadline of time(NULL) + idle_timeout,
%A  * computed once per pass of the outer loop.
%A  *
%A  * The worker leaves its loop when it has no job and either the wait timed
%A  * out or the pool state is POOL_EXIT.  Jobs still queued when the state is
%A  * POOL_EXIT are processed first.  On the way out it decrements thr_alive
%A  * and, if that reached 0, broadcasts pool_cond.
%A  *
%A  * A failing mutex lock or unlock is fatal: it is logged and exit(-2) is
%A  * called.  Always returns NULL.
%A  */
%A void *thrmgr_worker(void *arg)
%A {
%A     threadpool_t *threadpool = (threadpool_t *) arg;
%A     void *job_data;
%A     int retval, must_exit = FALSE;
%A     struct timespec timeout;
%A
%A     /* loop looking for work */
%A     for (;;) {
%A         if (pthread_mutex_lock(&(threadpool->pool_mutex)) != 0) {
%A             /* Fatal error */
%A             logg("!Fatal: mutex lock failed\n");
%A             exit(-2);
%A         }
%A         timeout.tv_sec = time(NULL) + threadpool->idle_timeout;
%A         timeout.tv_nsec = 0;
%A         threadpool->thr_idle++;
%A         while (((job_data = work_queue_pop(threadpool->queue)) == NULL)
%A                && (threadpool->state != POOL_EXIT)) {
%A             /* Sleep, awaiting wakeup; give up once the idle deadline passes. */
%A             retval = pthread_cond_timedwait(&(threadpool->pool_cond),
%A                                             &(threadpool->pool_mutex), &timeout);
%A             if (retval == ETIMEDOUT) {
%A                 /*
%A                  * A timed-out wait may still have consumed a signal sent at
%A                  * the same moment, and thrmgr_dispatch() starts no new
%A                  * thread while this worker counts as idle.  Look at the
%A                  * queue once more so such a job is not left stranded.
%A                  */
%A                 job_data = work_queue_pop(threadpool->queue);
%A                 if (job_data == NULL) {
%A                     must_exit = TRUE;
%A                 }
%A                 break;
%A             }
%A         }
%A         threadpool->thr_idle--; /* no longer idle: about to work or to exit */
%A         if (threadpool->state == POOL_EXIT) {
%A             must_exit = TRUE;
%A         }
%A
%A         if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
%A             /* Fatal error */
%A             logg("!Fatal: mutex unlock failed\n");
%A             exit(-2);
%A         }
%A         if (job_data) {
%A             threadpool->handler(job_data);
%A         } else if (must_exit) {
%A             /* No job, and either the wait timed out or the pool is shutting down. */
%A             break;
%A         }
%A     }
%A     if (pthread_mutex_lock(&(threadpool->pool_mutex)) != 0) {
%A         /* Fatal error */
%A         logg("!Fatal: mutex lock failed\n");
%A         exit(-2);
%A     }
%A     threadpool->thr_alive--; /* this worker is leaving the pool */
%A     if (threadpool->thr_alive == 0) {
%A         /*
%A          * Signal that all threads are finished.  The listener is
%A          * thrmgr_destroy(), which waits on pool_cond until thr_alive is 0.
%A          */
%A         pthread_cond_broadcast(&threadpool->pool_cond);
%A     }
%A     if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
%A         /* Fatal error */
%A         logg("!Fatal: mutex unlock failed\n");
%A         exit(-2);
%A     }
%A     return NULL;
%A }
%A
%A /*
%A  * Hand one job to the pool.
%A  *
%A  * user_data is appended to the pool's queue.  If no worker is idle and
%A  * fewer than thr_max workers are alive, a new worker thread is started.
%A  * pool_cond is then signalled so that a waiting worker, if there is one,
%A  * picks the job up.
%A  *
%A  * Returns TRUE when the job was queued and the mutex released.  Returns
%A  * FALSE when threadpool is NULL, the mutex cannot be locked, the pool state
%A  * is not POOL_VALID, or the final unlock fails (the job is already queued
%A  * in that last case).  A failing pthread_create() is only logged and does
%A  * not change the return value.
%A  */
%A int thrmgr_dispatch(threadpool_t *threadpool, void *user_data)
%A {
%A     pthread_t new_thread;
%A     int need_worker;
%A
%A     if (threadpool == NULL) {
%A         return FALSE;
%A     }
%A
%A     /* Everything below runs with the pool locked. */
%A     if (pthread_mutex_lock(&(threadpool->pool_mutex)) != 0) {
%A         logg("!Mutex lock failed\n");
%A         return FALSE;
%A     }
%A
%A     if (threadpool->state != POOL_VALID) {
%A         /* Pool is not accepting work; the result is FALSE either way. */
%A         if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
%A             logg("!Mutex unlock failed\n");
%A         }
%A         return FALSE;
%A     }
%A
%A     work_queue_add(threadpool->queue, user_data);
%A
%A     /* Grow the pool only when nobody is idle and the limit is not reached. */
%A     need_worker = (threadpool->thr_idle == 0) &&
%A                   (threadpool->thr_alive < threadpool->thr_max);
%A     if (need_worker) {
%A         if (pthread_create(&new_thread, &(threadpool->pool_attr),
%A                            thrmgr_worker, threadpool) == 0) {
%A             threadpool->thr_alive++;
%A         } else {
%A             logg("!pthread_create failed\n");
%A         }
%A     }
%A
%A     /* Wake one worker waiting on the condition variable, if any. */
%A     pthread_cond_signal(&(threadpool->pool_cond));
%A
%A     if (pthread_mutex_unlock(&(threadpool->pool_mutex)) != 0) {
%A         logg("!Mutex unlock failed\n");
%A         return FALSE;
%A     }
%A     return TRUE;
%A }
%A
%A /*
%A 使用方法,以一个tcp服务器为例,简单列出,可能有问题,请牛人指正.
%A
%A 1, thrmgr_new初始化
%A
%A 2, while(1)
%A
%A {
%A
%A accept(......);
%A
%A //构建输入参数
%A
%A thrmgr_dispatch(...);
%A
%A }
%A
%A thrmgr_destroy(...);
%A
%A */
%A
%A
%A Posted by rui at August 21, 2005 0
%A
%A%A
%A
*博客内容为网友个人发布,仅代表博主个人观点,如有侵权请联系工作人员删除。