|
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <pthread.h>
#include <assert.h>
#include <time.h>
#include <unistd.h>
#include <curses.h>
// Number of entries in the pool's r/addid/subid tables; also the value
// cal_counts is (re)loaded with when a new round of calculations starts.
#define MAX_SIZE 10
// Number of workers main() appends to the pool's list.
#define WORK_NUMBER 100
// Number of calculation threads main() asks pool_init() to start.
#define MAX_THREADS 5
// Next worker id; pool_add_worker() assigns it and then increments it.
int id_num = 0;
// Running counter printed (and incremented) by calprocess() on each subtract step.
int id = 1;
// Set to 1 by thread_moniter(); main() polls it to decide when to shut down.
// NOTE(review): plain int shared between threads without synchronisation.
int exitthread = 0;
// One node of the pool's doubly linked worker list.
typedef struct worker {
// Callback invoked by thread_routine() as process(arg).
void *(*process)(void *arg);
// Opaque argument handed to process.
void *arg;
// Identifier taken from the global id_num counter in pool_add_worker();
// thread_routine() matches it against the pool's subid/addid tables.
int id;
// Neighbouring nodes; pool_add_worker() appends new nodes at the tail.
struct worker *prev;
struct worker *next;
// Operand value; pool_add_worker() sets it to 1000.
int data;
// Pending operation: thread_routine() sets 1 (subtract) or 2 (add),
// calprocess() resets it to 0 when done.
int flag ;
} CalThread_worker;
// Shared state of the calculation thread pool (single global instance: pool).
typedef struct {
// Locked by pool_add_worker() and thread_routine() around list/flag access.
pthread_mutex_t queue_lock;
// Condition the calculation threads wait on; broadcast by main() and pool_destroy().
pthread_cond_t queue_ready;
// Head of the worker list (NULL when empty).
CalThread_worker *queue_head;
// Set to 1 by pool_destroy(); thread_routine() exits when it sees it.
int shutdown;
// Array of max_thread_num thread handles allocated in pool_init().
pthread_t *threadid;
int max_thread_num;
// Incremented by pool_add_worker(); not read anywhere in the visible code.
int cur_queue_size;
// Threads wait while this is 0; (cal_counts - 1) indexes r/addid/subid.
// calprocess() decrements it after each add step and reloads it with MAX_SIZE at 0.
int cal_counts;
// Random operands, and the ids of the workers chosen for the add / subtract steps.
int r[MAX_SIZE];
int addid[MAX_SIZE];
int subid[MAX_SIZE];
// Toggled by thread_routine() so that subtract and add steps alternate.
int addflag;
int subflag;
// thread_routine() only selects a worker while this is 1.
int start;
// Result accumulator: overwritten by a subtract step, added to by an add step.
int sumdata;
} CalThread_pool;
// Appends a worker carrying (process, arg) to the tail of the pool's list.
int pool_add_worker(void *(*process)(void *arg), void *arg);
// Start routine of the calculation threads created by pool_init().
void *thread_routine(void *arg);
// Start routine of the keyboard monitor thread created by pool_init().
void *thread_moniter(void *arg);
// Shared resource: the single pool instance, allocated by pool_init()
// and freed (and reset to NULL) by pool_destroy().
static CalThread_pool *pool = NULL;
// Allocate and initialise the global pool, fill the random operand tables,
// then start max_thread_num calculation threads plus one monitor thread.
//
// max_thread_num: number of calculation threads to create.
//
// The function has no way to report failure to its caller, so an allocation
// failure or a failure to start the monitor thread terminates the program.
// If only some calculation threads can be started, the pool shrinks to the
// number that were actually created so pool_destroy() joins valid handles only.
void pool_init(int max_thread_num)
{
    int i = 0;
    int rc = 0;
    pthread_t moniter_tid;

    // clock() reports consumed CPU time, which is almost identical at every
    // start-up; wall-clock time gives a different sequence per run.
    srand((unsigned int) time(NULL));

    pool = malloc(sizeof *pool);
    if (pool == NULL) {
        perror("pool_init: malloc");
        exit(EXIT_FAILURE);
    }

    pthread_mutex_init(&(pool->queue_lock), NULL);
    pthread_cond_init(&(pool->queue_ready), NULL);
    pool->queue_head = NULL;
    pool->max_thread_num = max_thread_num;
    pool->cur_queue_size = 0;
    pool->shutdown = 0;
    pool->start = 0;
    pool->cal_counts = 0;
    pool->addflag = 0;
    pool->sumdata = 0;

    // Random worker ids for the add/subtract steps and random operands.
    for (i = 0; i < MAX_SIZE; i++) {
        pool->addid[i] = rand() % 100;
        pool->subid[i] = rand() % 100;
        pool->r[i] = rand() % 1000;
    }
    // The first step of every pair is the subtraction.
    pool->subflag = 1;

    // "%d" needs int values, not the arrays themselves; show the first pair.
    printf("%d,%d\n", pool->addid[0], pool->subid[0]);

    pool->threadid = malloc(max_thread_num * sizeof *pool->threadid);
    if (pool->threadid == NULL && max_thread_num > 0) {
        perror("pool_init: malloc");
        exit(EXIT_FAILURE);
    }

    for (i = 0; i < max_thread_num; i++) {
        rc = pthread_create(&(pool->threadid[i]), NULL, thread_routine, NULL);
        if (rc != 0) {
            fprintf(stderr, "pool_init: pthread_create failed (error %d)\n", rc);
            // Remember how many threads really exist.
            pool->max_thread_num = i;
            break;
        }
    }

    rc = pthread_create(&moniter_tid, NULL, thread_moniter, NULL);
    if (rc != 0) {
        // Without the monitor nothing can ever request shutdown.
        fprintf(stderr, "pool_init: cannot start monitor thread (error %d)\n", rc);
        exit(EXIT_FAILURE);
    }
    // Nobody joins the monitor, so let the system reclaim it when it ends.
    pthread_detach(moniter_tid);
}
int pool_add_worker(void *(*process)(void *arg), void *arg)
{
CalThread_worker *newworker = (CalThread_worker *) malloc(
sizeof(CalThread_worker));
newworker->process = process;
newworker->arg = arg;
newworker->next = NULL;
pthread_mutex_lock(&(pool->queue_lock));
CalThread_worker *member = pool->queue_head;
if (member != NULL) {
while (member->next != NULL)
member = member->next;
member->next = newworker;
newworker->prev = member;
} else {
pool->queue_head = newworker;
newworker->prev = NULL;
}
newworker->id = id_num++;
newworker->data = 1000;
//assert (pool->queue_head != NULL);
pool->cur_queue_size++;
pthread_mutex_unlock(&(pool->queue_lock));
return 0;
}
// Stop all calculation threads, free every queued worker and release the pool.
//
// Returns 0 on success, -1 if there is no pool or it is already shutting down.
int pool_destroy(void)
{
    int i;
    CalThread_worker *head = NULL;

    if (pool == NULL)
        return -1;

    // The shutdown flag must change under the same mutex the waiters use for
    // their predicate; otherwise a thread that has just tested the flag but has
    // not yet reached pthread_cond_wait() would miss the broadcast and the
    // join below would never return.
    pthread_mutex_lock(&(pool->queue_lock));
    if (pool->shutdown) {
        pthread_mutex_unlock(&(pool->queue_lock));
        return -1;
    }
    pool->start = 0;
    pool->cal_counts = 0;
    pool->shutdown = 1;
    pthread_cond_broadcast(&(pool->queue_ready));
    pthread_mutex_unlock(&(pool->queue_lock));

    for (i = 0; i < pool->max_thread_num; i++)
        pthread_join(pool->threadid[i], NULL);
    free(pool->threadid);

    // All threads are gone, so the list can be torn down without the lock.
    while (pool->queue_head != NULL) {
        head = pool->queue_head;
        pool->queue_head = pool->queue_head->next;
        free(head);
    }

    pthread_mutex_destroy(&(pool->queue_lock));
    pthread_cond_destroy(&(pool->queue_ready));
    free(pool);
    pool = NULL;
    printf("pool_destroy\n");
    return 0;
}
void * thread_routine(void *arg)
{
//printf ("starting thread 0x%x\n", pthread_self ());
while (1) {
pthread_mutex_lock(&(pool->queue_lock));
while (pool->cal_counts == 0 && !pool->shutdown) {
printf("thread 0x%x is waiting\n", pthread_self());
pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock));
}
if (pool->shutdown) {
pthread_mutex_unlock(&(pool->queue_lock));
printf("thread 0x%x will exit\n", pthread_self());
pthread_exit(NULL);
}
printf("thread 0x%x is starting to work\n", pthread_self());
//assert (pool->queue_head != NULL);
//pool->cur_queue_size--;
usleep(1000*500);
CalThread_worker *worker = NULL;
if (pool->start == 1) {
worker = pool->queue_head;
if (pool->subflag) {
do {
if (worker->id == pool->subid[pool->cal_counts-1])
break;
worker = worker->next;
} while (worker != NULL);
worker->flag = 1;
pool->addflag = 1;
pool->subflag = 0;
} else if (pool->addflag) {
do {
if (worker->id == pool->addid[pool->cal_counts-1])
break;
worker = worker->next;
} while (worker != NULL);
if (worker != NULL) {
pool->addflag = 0;
pool->subflag = 1;
worker->flag = 2;
}
}
}
pthread_mutex_unlock(&(pool->queue_lock));
if (worker != NULL) {
(*(worker->process))(worker->arg);
}
}
pthread_exit (NULL);
return 0;
}
// Start routine of the keyboard monitor thread.
//
// Reads characters from standard input until 'e' or 'E' is typed, then sets
// the global exitthread flag that main() polls. End of input (or a read
// error) is treated the same way, because no exit key can arrive after that
// and the loop would otherwise poll forever.
//
// arg: unused. Returns NULL.
void *thread_moniter(void *arg)
{
    (void) arg;

    for (;;) {
        // getchar() returns an int so that EOF stays distinct from every character.
        int ch = getchar();
        if (ch == EOF || ch == 'e' || ch == 'E')
            break;
        usleep(1000 * 500);
    }
    exitthread = 1;
    return NULL;
}
// Worker callback: perform the step that thread_routine() marked on a worker.
//
// arg: pointer to an int holding the id of the worker to process.
//
// flag 1 (subtract): sumdata = data - r[cal_counts - 1].
// flag 2 (add):      sumdata += data + r[cal_counts - 1], then cal_counts is
//                    decremented; when it reaches 0 the random tables are
//                    regenerated and cal_counts is reloaded with MAX_SIZE.
// The worker's own data value is left unchanged and its flag is cleared.
// Returns NULL in every case.
//
// NOTE(review): this runs without queue_lock held while it reads and writes
// pool state - confirm whether callers are expected to serialise it.
void *calprocess(void *arg)
{
    const int wanted_id = *(int *) arg;
    int i;

    // Locate the worker; the list may be empty, so test before dereferencing.
    CalThread_worker *worker = pool->queue_head;
    while (worker != NULL && worker->id != wanted_id)
        worker = worker->next;
    if (worker == NULL)
        return NULL;

    if (worker->flag == 1) {
        // cal_counts can be 0 here (pool_destroy() resets it), which would
        // otherwise index r[-1].
        int slot = pool->cal_counts - 1;
        if (slot >= 0 && slot < MAX_SIZE) {
            int result = worker->data - pool->r[slot];
            pool->sumdata = result;
            printf("the %d calcuate \n", id++);
            printf("threadid :0x%lx, sub-data:%d\n",
                   (unsigned long) pthread_self(), result);
        }
    }
    if (worker->flag == 2) {
        int slot = pool->cal_counts - 1;
        if (slot >= 0 && slot < MAX_SIZE) {
            int result = worker->data + pool->r[slot];
            pool->cal_counts--;
            if (pool->cal_counts == 0) {
                // Round finished: draw fresh ids and operands for the next one.
                pool->start = 0;
                for (i = 0; i < MAX_SIZE; i++) {
                    pool->addid[i] = rand() % 100;
                    pool->subid[i] = rand() % 100;
                    pool->r[i] = rand() % 1000;
                }
                pool->start = 1;
                pool->cal_counts = MAX_SIZE;
            }
            pool->sumdata += result;
            printf("threadid :0x%lx, add-data:%d\n",
                   (unsigned long) pthread_self(), result);
            printf("threadid :0x%lx, sum-data:%d\n",
                   (unsigned long) pthread_self(), pool->sumdata);
        }
    }
    worker->flag = 0;
    return NULL;
}
// Program entry: build the pool, queue WORK_NUMBER workers, start the
// calculations and run until the monitor thread requests exit.
int main(int argc, char **argv)
{
int i = 0;
pool_init(MAX_THREADS);
// Begin with a subtract step and allow threads to select workers.
pool->subflag = 1;
pool->start = 1;
// Each worker gets a pointer to its own slot, whose value equals the worker's index.
// NOTE(review): the allocation result is not checked.
int *workingnum = (int *) malloc(sizeof(int) * WORK_NUMBER);
for (i = 0; i < WORK_NUMBER; i++) {
workingnum[i] = i;
pool_add_worker(calprocess, &workingnum[i]);
}
// A non-zero cal_counts is what releases the waiting calculation threads.
// NOTE(review): written and broadcast without holding queue_lock.
pool->cal_counts = MAX_SIZE;
printf("counts:%d\n", pool->cal_counts);
pthread_cond_broadcast(&(pool->queue_ready));
// Poll the flag set by thread_moniter().
while (!exitthread)
sleep(2);
pool_destroy();
// Safe to free only after pool_destroy(): workers hold pointers into this array.
free(workingnum);
printf("main end\n");
return 0;
|
|