00001 #include "thread.h"
00002 #include "common.h"
00003 #include "log.h"
00004
00010 #include <stdlib.h>
00011 #include <time.h>
00012 #include <errno.h>
00013
00014 #ifdef _WINDOWS
00015 #define __WINDOWS_WINNT 0x0400
00016 #include <Windows.h>
00017 #endif
00018
00020 static int cp_thread_pool_set_available(cp_thread_pool *owner, cp_pooled_thread *pt);
00021
00022 long cp_pooled_thread_get_id(cp_pooled_thread *pt)
00023 {
00024 long id = 0;
00025 #ifdef CP_HAS_PTHREAD_GETUNIQUE_NP
00026 pthread_getunique_np(*pt->worker, &id);
00027 #else
00028 id = (long) *pt->worker;
00029 #endif
00030
00031 return id;
00032 }
00033
00034 cp_pooled_thread *cp_pooled_thread_create(cp_thread_pool *owner)
00035 {
00036 int rc;
00037 cp_pooled_thread *pt = calloc(1, sizeof(cp_pooled_thread));
00038
00039 if (pt == NULL)
00040 {
00041 cp_error(CP_MEMORY_ALLOCATION_FAILURE, "can\'t allocate pooled thread");
00042 errno = ENOMEM;
00043 return NULL;
00044 }
00045 pt->worker = calloc(1, sizeof(cp_thread));
00046 if (pt->worker == NULL)
00047 {
00048 cp_error(CP_MEMORY_ALLOCATION_FAILURE, "can\'t allocate thread");
00049 errno = ENOMEM;
00050 return NULL;
00051 }
00052
00053 pt->owner = owner;
00054
00055 pt->suspend_lock = (cp_mutex *) malloc(sizeof(cp_mutex));
00056 if (pt->suspend_lock == NULL)
00057 goto THREAD_CREATE_CANCEL;
00058 if ((rc = cp_mutex_init(pt->suspend_lock, NULL)))
00059 {
00060 cp_error(rc, "starting up pooled thread");
00061 goto THREAD_CREATE_CANCEL;
00062 }
00063
00064 pt->suspend_cond = (cp_cond *) malloc(sizeof(cp_cond));
00065 if ((rc = cp_cond_init(pt->suspend_cond, NULL)))
00066 {
00067 cp_error(rc, "starting up pooled thread");
00068 cp_mutex_destroy(pt->suspend_lock);
00069 free(pt->suspend_lock);
00070 goto THREAD_CREATE_CANCEL;
00071 }
00072
00073 pt->done = 0;
00074 pt->wait = 1;
00075
00076 cp_thread_create(*pt->worker, NULL, cp_pooled_thread_run, pt);
00077
00078 pt->id = cp_pooled_thread_get_id(pt);
00079 cp_thread_detach(*pt->worker);
00080
00081 return pt;
00082
00083 THREAD_CREATE_CANCEL:
00084 free(pt->worker);
00085 free(pt);
00086
00087 return NULL;
00088 }
00089
00090 int cp_pooled_thread_stop(cp_pooled_thread *pt)
00091 {
00092 int rc = 0;
00093
00094 cp_mutex_lock(pt->suspend_lock);
00095 pt->action = NULL;
00096 if (pt->stop_fn)
00097 {
00098 rc = pt->stop_prm ? (*pt->stop_fn)(pt->stop_prm) :
00099 (*pt->stop_fn)(pt->action_prm);
00100 }
00101
00102 pt->done = 1;
00103 pt->wait = 0;
00104
00105 cp_cond_signal(pt->suspend_cond);
00106 cp_mutex_unlock(pt->suspend_lock);
00107
00108
00109
00110 return rc;
00111 }
00112
00113 void cp_pooled_thread_destroy(cp_pooled_thread *t)
00114 {
00115 #ifdef __TRACE__
00116 DEBUGMSG("destroying cp_pooled_thread %lX", t);
00117 #endif
00118 cp_mutex_destroy(t->suspend_lock);
00119 free(t->suspend_lock);
00120 cp_cond_destroy(t->suspend_cond);
00121 free(t->suspend_cond);
00122 free(t->worker);
00123 free(t);
00124 }
00125
00126 int cp_pooled_thread_release(cp_pooled_thread *t)
00127 {
00128 return 0;
00129 }
00130
00131 int cp_pooled_thread_run_task(cp_pooled_thread *pt,
00132 cp_thread_action action,
00133 void *prm)
00134 {
00135 #ifdef __TRACE__
00136 DEBUGMSG("cp_pooled_thread_run_task: action %lx, prm %lx\n",
00137 (long) action, (long) prm);
00138 #endif
00139
00140 pt->action = action;
00141 pt->action_prm = prm;
00142
00143 if (action == NULL)
00144 {
00145 cp_error(CP_INVALID_FUNCTION_POINTER, "missing thread function");
00146 return CP_INVALID_FUNCTION_POINTER;
00147 }
00148
00149
00150 cp_mutex_lock(pt->suspend_lock);
00151 pt->wait = 0;
00152 cp_cond_signal(pt->suspend_cond);
00153 cp_mutex_unlock(pt->suspend_lock);
00154
00155 return 0;
00156 }
00157
00158 int cp_pooled_thread_run_stoppable_task(cp_pooled_thread *pt,
00159 cp_thread_action action,
00160 void *action_prm,
00161 cp_thread_stop_fn stop_fn,
00162 void *stop_prm)
00163 {
00164 pt->stop_fn = stop_fn;
00165 pt->stop_prm = stop_prm;
00166 return cp_pooled_thread_run_task(pt, action, action_prm);
00167 }
00168
00169 void *cp_pooled_thread_run(void *prm)
00170 {
00171 cp_pooled_thread *pt = (cp_pooled_thread *) prm;
00172
00173 #ifdef __TRACE__
00174 DEBUGMSG("cp_pooled_thread (%lx) starts", (long) pt);
00175 #endif
00176
00177 while (!pt->done && pt->owner->running)
00178 {
00179 cp_mutex_lock(pt->suspend_lock);
00180 while (pt->wait && (!pt->done) && pt->owner->running)
00181 cp_cond_wait(pt->suspend_cond, pt->suspend_lock);
00182 cp_mutex_unlock(pt->suspend_lock);
00183
00184 if (pt->done || !pt->owner->running) break;
00185 #ifdef __TRACE__
00186 DEBUGMSG("cp_pooled_thread_run: action is %lX, action_prm is %lX", pt->action, pt->action_prm);
00187 #endif
00188 if (pt->action)
00189 {
00190 #ifdef __TRACE__
00191 DEBUGMSG("pooled thread (%lX) handles action (%lX)", (long) pt, (long) pt->action);
00192 #endif
00193 (*pt->action)(pt->action_prm);
00194 }
00195 if (pt->done || !pt->owner->running) break;
00196
00197 pt->wait = 1;
00198
00199 cp_thread_pool_set_available(pt->owner, pt);
00200 }
00201
00202 #ifdef __TRACE__
00203 DEBUGMSG("cp_pooled_thread (%lx) exits", (long) pt);
00204 #endif
00205
00206 cp_pooled_thread_destroy(pt);
00207
00208 return NULL;
00209 }
00210
00211 static int cp_thread_pool_set_available(cp_thread_pool *pool,
00212 cp_pooled_thread *thread)
00213 {
00214 cp_mutex_lock(pool->pool_lock);
00215 cp_hashlist_remove(pool->in_use, &thread->id);
00216 cp_list_append(pool->free_pool, thread);
00217
00218 cp_cond_signal(pool->pool_cond);
00219 cp_mutex_unlock(pool->pool_lock);
00220
00221 return 0;
00222 }
00223
00224
00225 int cp_thread_pool_wait(cp_thread_pool *pool)
00226 {
00227 while (cp_hashlist_item_count(pool->in_use) && pool->running)
00228 {
00229 cp_mutex_lock(pool->pool_lock);
00230 cp_cond_wait(pool->pool_cond, pool->pool_lock);
00231
00232 if (pool->running &&
00233 cp_hashlist_item_count(pool->in_use))
00234 cp_cond_signal(pool->pool_cond);
00235 cp_mutex_unlock(pool->pool_lock);
00236 }
00237
00238 return 0;
00239 }
00240
00241 int cp_thread_pool_stop(cp_thread_pool *pool)
00242 {
00243 cp_hashlist_iterator *i;
00244 cp_list_iterator *j;
00245 cp_pooled_thread *pt;
00246
00247 pool->running = 0;
00248
00249 i = cp_hashlist_create_iterator(pool->in_use, COLLECTION_LOCK_READ);
00250 while ((pt = (cp_pooled_thread *) cp_hashlist_iterator_next_value(i)))
00251 cp_pooled_thread_stop(pt);
00252 cp_hashlist_iterator_destroy(i);
00253
00254 j = cp_list_create_iterator(pool->free_pool, COLLECTION_LOCK_READ);
00255 while ((pt = (cp_pooled_thread *) cp_list_iterator_next(j)))
00256 cp_pooled_thread_stop(pt);
00257 cp_list_iterator_destroy(j);
00258
00259 return 0;
00260 }
00261
00262 cp_thread_pool *cp_thread_pool_create(int min_size, int max_size)
00263 {
00264 int rc;
00265 cp_thread_pool *pool = calloc(1, sizeof(cp_thread_pool));
00266 if (pool == NULL)
00267 cp_fatal(CP_MEMORY_ALLOCATION_FAILURE, "can\'t allocate thread pool structure");
00268
00269 pool->min_size = min_size;
00270 pool->max_size = max_size;
00271
00272 pool->running = 1;
00273
00274 pool->free_pool = cp_list_create();
00275 if (pool->free_pool == NULL)
00276 cp_fatal(CP_MEMORY_ALLOCATION_FAILURE, "can\'t allocate thread pool list");
00277
00278 pool->in_use = cp_hashlist_create(10, cp_hash_long, cp_hash_compare_long);
00279 if (pool->in_use == NULL)
00280 cp_fatal(CP_MEMORY_ALLOCATION_FAILURE, "can\'t allocate thread pool running list");
00281
00282 pool->pool_lock = (cp_mutex *) malloc(sizeof(cp_mutex));
00283 if (pool->pool_lock == NULL)
00284 {
00285 cp_error(CP_MEMORY_ALLOCATION_FAILURE, "can\'t create mutex");
00286 goto THREAD_POOL_CREATE_CANCEL;
00287 }
00288 if ((rc = cp_mutex_init(pool->pool_lock, NULL)))
00289 {
00290 cp_error(rc, "can\'t create mutex");
00291 goto THREAD_POOL_CREATE_CANCEL;
00292 }
00293
00294 pool->pool_cond = (cp_cond *) malloc(sizeof(cp_cond));
00295 if (pool->pool_cond == NULL)
00296 {
00297 cp_error(rc, "can\'t create condition variable");
00298 cp_mutex_destroy(pool->pool_lock);
00299 free(pool->pool_lock);
00300 goto THREAD_POOL_CREATE_CANCEL;
00301 }
00302 if ((rc = cp_cond_init(pool->pool_cond, NULL)))
00303 {
00304 cp_error(rc, "can\'t create condition variable");
00305 free(pool->pool_cond);
00306 cp_mutex_destroy(pool->pool_lock);
00307 free(pool->pool_lock);
00308 goto THREAD_POOL_CREATE_CANCEL;
00309 }
00310
00311 for ( ; pool->size < pool->min_size; pool->size++)
00312 {
00313 cp_pooled_thread *pt = cp_pooled_thread_create(pool);
00314 if (pt == NULL)
00315 cp_fatal(CP_THREAD_CREATION_FAILURE, "can\'t create thread pool (created %d threads, minimum pool size is %d", pool->size, pool->min_size);
00316 cp_list_append(pool->free_pool, pt);
00317 }
00318
00319 return pool;
00320
00321 THREAD_POOL_CREATE_CANCEL:
00322 cp_list_destroy_custom(pool->free_pool,
00323 (cp_destructor_fn) cp_pooled_thread_destroy);
00324 cp_hashlist_destroy_custom(pool->in_use, NULL,
00325 (cp_destructor_fn) cp_pooled_thread_destroy);
00326 free(pool);
00327 return NULL;
00328 }
00329
00330 cp_thread *cp_thread_pool_get_impl(cp_thread_pool *pool,
00331 cp_thread_action action,
00332 void *action_prm,
00333 cp_thread_stop_fn stop_fn,
00334 void *stop_prm,
00335 int block)
00336 {
00337 cp_pooled_thread *pt = NULL;
00338 cp_mutex_lock(pool->pool_lock);
00339
00340 #ifdef __TRACE__
00341 DEBUGMSG("cp_thread_pool_get_impl (%d) pool size = %d max size = %d\n", block, pool->size, pool->max_size);
00342 #endif
00343
00344 pt = cp_list_remove_head(pool->free_pool);
00345 if (pt == NULL)
00346 {
00347 if (pool->size < pool->max_size)
00348 {
00349 pt = cp_pooled_thread_create(pool);
00350 if (pt)
00351 pool->size++;
00352 }
00353
00354 if (pt == NULL)
00355 {
00356 if (!block)
00357 {
00358 cp_mutex_unlock(pool->pool_lock);
00359 return NULL;
00360 }
00361
00362
00363 #ifdef _WINDOWS
00364 cp_mutex_unlock(pool->pool_lock);
00365 #endif
00366 while (pool->running && cp_list_is_empty(pool->free_pool))
00367 cp_cond_wait(pool->pool_cond, pool->pool_lock);
00368
00369 if (pool->running)
00370 pt = cp_list_remove_head(pool->free_pool);
00371
00372 if (pt == NULL)
00373 {
00374 cp_mutex_unlock(pool->pool_lock);
00375 return NULL;
00376 }
00377 }
00378 }
00379
00380 cp_hashlist_append(pool->in_use, &pt->id, pt);
00381 cp_mutex_unlock(pool->pool_lock);
00382
00383 cp_pooled_thread_run_stoppable_task(pt, action, action_prm, stop_fn, stop_prm);
00384
00385 return pt->worker;
00386 }
00387
00388 cp_thread *cp_thread_pool_get(cp_thread_pool *pool,
00389 cp_thread_action action,
00390 void *prm)
00391 {
00392 return cp_thread_pool_get_impl(pool, action, prm, NULL, NULL, 1);
00393 }
00394
00395 cp_thread *cp_thread_pool_get_stoppable(cp_thread_pool *pool,
00396 cp_thread_action action,
00397 void *action_prm,
00398 cp_thread_stop_fn stop_fn,
00399 void *stop_prm)
00400 {
00401 return cp_thread_pool_get_impl(pool, action, action_prm, stop_fn, stop_prm, 1);
00402 }
00403
00404 cp_thread *cp_thread_pool_get_nb(cp_thread_pool *pool,
00405 cp_thread_action action,
00406 void *prm)
00407 {
00408 return cp_thread_pool_get_impl(pool, action, prm, NULL, NULL, 0);
00409 }
00410
00411 cp_thread *cp_thread_pool_get_stoppable_nb(cp_thread_pool *pool,
00412 cp_thread_action action,
00413 void *action_prm,
00414 cp_thread_stop_fn stop_fn,
00415 void *stop_prm)
00416 {
00417 return cp_thread_pool_get_impl(pool, action, action_prm, stop_fn, stop_prm, 0);
00418 }
00419
00420 void cp_thread_pool_destroy(cp_thread_pool *pool)
00421 {
00422 #ifdef __TRACE__
00423 DEBUGMSG("stopping cp_thread_pool %lX", pool);
00424 #endif
00425
00426 if (pool->running) cp_thread_pool_stop(pool);
00427
00428 cp_list_destroy(pool->free_pool);
00429
00430
00431 cp_hashlist_destroy(pool->in_use);
00432
00433
00434 cp_mutex_destroy(pool->pool_lock);
00435 free(pool->pool_lock);
00436 cp_cond_destroy(pool->pool_cond);
00437 free(pool->pool_cond);
00438
00439 free(pool);
00440 }
00441
00442 int cp_thread_pool_count_available(cp_thread_pool *pool)
00443 {
00444 return (pool->max_size - pool->size) +
00445 cp_list_item_count(pool->free_pool);
00446 }
00447
00448
00449
00450
00451
00452
00453
00454
00455 cp_pooled_thread_client_interface *
00456 cp_pooled_thread_client_interface_create
00457 (cp_pooled_thread_scheduler *owner,
00458 void *client,
00459 int min_threads,
00460 int max_threads,
00461 cp_pooled_thread_report_load report_load,
00462 cp_pooled_thread_shrink shrink,
00463 cp_thread_action action,
00464 void *action_prm,
00465 cp_thread_stop_fn stop_fn,
00466 void *stop_prm)
00467 {
00468 cp_pooled_thread_client_interface *ci =
00469 calloc(1, sizeof(cp_pooled_thread_client_interface));
00470 if (client == NULL)
00471 cp_fatal(CP_MEMORY_ALLOCATION_FAILURE, "can\'t allocate thread pool client interface");
00472
00473 ci->owner = owner;
00474 ci->client = client;
00475 ci->min = min_threads;
00476 ci->max = max_threads;
00477 ci->report_load = report_load;
00478 ci->shrink = shrink;
00479 ci->action = action;
00480 ci->action_prm = action_prm;
00481 ci->stop_fn = stop_fn;
00482 ci->stop_prm = stop_prm;
00483
00484 cp_pooled_thread_scheduler_register_client(owner, ci);
00485
00486 return ci;
00487 }
00488
00489
00490 void cp_pooled_thread_client_interface_destroy
00491 (cp_pooled_thread_client_interface *client)
00492 {
00493 free(client);
00494 }
00495
00496 cp_pooled_thread_scheduler *cp_pooled_thread_scheduler_create(cp_thread_pool *pool)
00497 {
00498 cp_pooled_thread_scheduler *scheduler =
00499 calloc(1, sizeof(cp_pooled_thread_scheduler));
00500 if (scheduler == NULL)
00501 cp_fatal(CP_MEMORY_ALLOCATION_FAILURE, "can\'t allocate thread manager");
00502
00503 scheduler->pool = pool;
00504 scheduler->client_list = cp_vector_create(20);
00505
00506 #ifdef CP_HAS_SRANDOM
00507 srandom(time(NULL));
00508 #else
00509 srand(time(NULL));
00510 #endif
00511
00512 return scheduler;
00513 }
00514
00515 void cp_pooled_thread_scheduler_destroy(cp_pooled_thread_scheduler *scheduler)
00516 {
00517 cp_vector_destroy(scheduler->client_list);
00518 free(scheduler);
00519 }
00520
00521 void cp_pooled_thread_scheduler_register_client
00522 (cp_pooled_thread_scheduler *scheduler,
00523 cp_pooled_thread_client_interface *client)
00524 {
00525 cp_vector_add_element(scheduler->client_list, client);
00526 }
00527
00528 cp_pooled_thread_client_interface *
00529 choose_random_client(cp_pooled_thread_scheduler *scheduler)
00530 {
00531 int index =
00532 #ifdef CP_HAS_RANDOM
00533 random()
00534 #else
00535 rand()
00536 #endif
00537 % (cp_vector_size(scheduler->client_list));
00538 return (cp_pooled_thread_client_interface *)
00539 cp_vector_element_at(scheduler->client_list, index);
00540 }
00541
00542 #define SCHEDULER_THRESHOLD 1
00543
00544 void cp_pooled_thread_client_get_nb(cp_pooled_thread_client_interface *c)
00545 {
00546 if (cp_thread_pool_get_nb(c->owner->pool, c->action, c->action_prm))
00547 c->count++;
00548 }
00549
00550 void cp_pooled_thread_client_get(cp_pooled_thread_client_interface *c)
00551 {
00552 c->count++;
00553 cp_thread_pool_get(c->owner->pool, c->action, c->action_prm);
00554 }
00555
00556 void cp_pooled_thread_client_get_stoppable_nb(cp_pooled_thread_client_interface *c)
00557 {
00558 if (cp_thread_pool_get_stoppable_nb(c->owner->pool, c->action, c->action_prm, c->stop_fn, c->stop_prm))
00559 c->count++;
00560 }
00561
00562 void cp_pooled_thread_client_get_stoppable(cp_pooled_thread_client_interface *c)
00563 {
00564 c->count++;
00565 cp_thread_pool_get_stoppable(c->owner->pool, c->action, c->action_prm, c->stop_fn, c->stop_prm);
00566 }
00567
00568 #if 0 //~~ for future optimization
00569 void cp_pooled_thread_client_negociate(cp_pooled_thread_client_interface *c)
00570 {
00571 int curr_load = (*c->report_load)(c);
00572 cp_pooled_thread_scheduler *scheduler = c->owner;
00573 Vector *clients = scheduler->client_list;
00574 int i, min, imin, max, imax, cval, max_count;
00575 cp_pooled_thread_client_interface *other;
00576
00577 int clen = cp_vector_size(clients);
00578
00579 if (clen == 0) return;
00580
00581 min = INT_MAX;
00582 max = -1;
00583 max_count = -1;
00584
00585 for (i = 0; i < clen; i++)
00586 {
00587 other = (cp_pooled_thread_client_interface *) cp_vector_element_at(clients, i);
00588 cval = (*other->report_load)(other);
00589
00590 if (other->count < other->min)
00591 cp_thread_pool_get_nb(other->owner->pool, other->action, other->prm);
00592
00593 DEBUGMSG("negociate: pool %d: load = %d, %d <= %d <= %d", i, cval, other->min, other->count, other->max);
00594 if (cval > max) { max = cval; imax = i; }
00595 if (cval < min || (cval == min && other->count > max_count)) { min = cval; imin = i; max_count = other->count; }
00596 }
00597 DEBUGMSG("negociate: min = %d, max = %d", min, max);
00598 if (abs(max - min) > SCHEDULER_THRESHOLD)
00599 {
00600 cp_pooled_thread_client_interface *maxc = cp_vector_element_at(clients, imax);
00601 cp_pooled_thread_client_interface *minc = cp_vector_element_at(clients, imin);
00602
00603 if (cp_thread_pool_count_available(scheduler->pool) == 0 &&
00604 minc->count > minc->min)
00605 {
00606 DEBUGMSG("negociate: shrinking min pool (%d)", imin);
00607 (*minc->shrink)(minc);
00608 }
00609
00610 DEBUGMSG("negociate: get_nb for max pool (%d)", imax);
00611 if (cp_thread_pool_get_nb(maxc->owner->pool, maxc->action, maxc->prm))
00612 maxc->count++;
00613 }
00614 }
00615
00616 #endif
00617
00618 void cp_pooled_thread_client_negociate(cp_pooled_thread_client_interface *c)
00619 {
00620 int curr_load;
00621 if (c->owner->bypass) return;
00622
00623 curr_load = (*c->report_load)(c);
00624
00625 #ifdef __TRACE__
00626 DEBUGMSG("negociate for client %lx - load == %d\n", (long) c, curr_load);
00627 #endif
00628
00629 if (curr_load > SCHEDULER_THRESHOLD && c->count < c->max)
00630 {
00631 cp_thread *t =
00632 cp_thread_pool_get_nb(c->owner->pool, c->action, c->action_prm);
00633 if (t)
00634 {
00635 #ifdef __TRACE__
00636 DEBUGMSG("negociate: got thread from pool");
00637 #endif
00638 c->count++;
00639 return;
00640 }
00641 }
00642
00643
00644
00645
00646
00647
00648
00649
00650
00651
00652 {
00653 int other_load;
00654 int *inc;
00655 cp_thread *t;
00656 cp_pooled_thread_client_interface *load, *unload;
00657 cp_pooled_thread_client_interface *other =
00658 choose_random_client(c->owner);
00659
00660 other_load = (*other->report_load)(other);
00661 if (abs(curr_load - other_load) < SCHEDULER_THRESHOLD) return;
00662 if (curr_load > other_load)
00663 {
00664 if (c->count >= c->max ||
00665 other->count <= other->min) return;
00666 #ifdef __TRACE__
00667 DEBUGMSG("negociate: switching a thread to this pool");
00668 #endif
00669 inc = &c->count;
00670 other->count--;
00671 load = c;
00672 unload = other;
00673 }
00674 else
00675 {
00676 if (c->count <= c->min ||
00677 other->count >= other->max) return;
00678 #ifdef __TRACE__
00679 DEBUGMSG("negociate: switching a thread to other pool");
00680 #endif
00681 inc = &other->count;
00682 c->count--;
00683 load = other;
00684 unload = c;
00685 }
00686 #ifdef __TRACE__
00687 DEBUGMSG("negociate: shrinking unload pool");
00688 #endif
00689 (*unload->shrink)(unload);
00690 t = cp_thread_pool_get_nb(load->owner->pool,
00691 load->action, load->action_prm);
00692
00693 if (t) (*inc)++;
00694 }
00695 }
00696