thread.c

00001 #include "thread.h"
00002 #include "common.h"
00003 #include "log.h"
00004 
00010 #include <stdlib.h>
00011 #include <time.h>
00012 #include <errno.h>
00013 
00014 #ifdef _WINDOWS
00015 #define __WINDOWS_WINNT 0x0400
00016 #include <Windows.h>
00017 #endif
00018 
00020 static int cp_thread_pool_set_available(cp_thread_pool *owner, cp_pooled_thread *pt);
00021 
00022 long cp_pooled_thread_get_id(cp_pooled_thread *pt)
00023 {
00024     long id = 0;
00025 #ifdef CP_HAS_PTHREAD_GETUNIQUE_NP
00026     pthread_getunique_np(*pt->worker, &id);
00027 #else
00028     id = (long) *pt->worker;
00029 #endif
00030 
00031     return id;
00032 }
00033 
00034 cp_pooled_thread *cp_pooled_thread_create(cp_thread_pool *owner)
00035 {
00036     int rc;
00037     cp_pooled_thread *pt = calloc(1, sizeof(cp_pooled_thread));
00038 
00039     if (pt == NULL) 
00040     {
00041         cp_error(CP_MEMORY_ALLOCATION_FAILURE, "can\'t allocate pooled thread");
00042         errno = ENOMEM;
00043         return NULL;
00044     }
00045     pt->worker = calloc(1, sizeof(cp_thread));
00046     if (pt->worker == NULL)
00047     {
00048         cp_error(CP_MEMORY_ALLOCATION_FAILURE, "can\'t allocate thread");
00049         errno = ENOMEM;
00050         return NULL;
00051     }
00052         
00053     pt->owner = owner;
00054 
00055     pt->suspend_lock = (cp_mutex *) malloc(sizeof(cp_mutex));
00056     if (pt->suspend_lock == NULL) 
00057         goto THREAD_CREATE_CANCEL;
00058     if ((rc = cp_mutex_init(pt->suspend_lock, NULL)))
00059     {
00060         cp_error(rc, "starting up pooled thread");
00061         goto THREAD_CREATE_CANCEL;
00062     }
00063 
00064     pt->suspend_cond = (cp_cond *) malloc(sizeof(cp_cond));
00065     if ((rc = cp_cond_init(pt->suspend_cond, NULL)))
00066     {
00067         cp_error(rc, "starting up pooled thread");
00068         cp_mutex_destroy(pt->suspend_lock);
00069         free(pt->suspend_lock);
00070         goto THREAD_CREATE_CANCEL;
00071     }
00072 
00073     pt->done = 0;
00074     pt->wait = 1;
00075 
00076     cp_thread_create(*pt->worker, NULL, cp_pooled_thread_run, pt);
00077 
00078     pt->id = cp_pooled_thread_get_id(pt);
00079     cp_thread_detach(*pt->worker); //~~ check
00080 
00081     return pt;
00082 
00083 THREAD_CREATE_CANCEL:
00084     free(pt->worker);
00085     free(pt);
00086     
00087     return NULL;
00088 }
00089         
00090 int cp_pooled_thread_stop(cp_pooled_thread *pt)
00091 {
00092     int rc = 0;
00093 
00094     cp_mutex_lock(pt->suspend_lock);
00095     pt->action = NULL;
00096     if (pt->stop_fn)
00097     {
00098         rc = pt->stop_prm ? (*pt->stop_fn)(pt->stop_prm) : 
00099                               (*pt->stop_fn)(pt->action_prm);
00100     }
00101 
00102     pt->done = 1;
00103     pt->wait = 0;
00104 
00105     cp_cond_signal(pt->suspend_cond);
00106     cp_mutex_unlock(pt->suspend_lock);
00107 
00108 //  cp_thread_join(*pt->worker, NULL); //~~ rc
00109 
00110     return rc;
00111 }
00112 
00113 void cp_pooled_thread_destroy(cp_pooled_thread *t)
00114 {
00115 #ifdef __TRACE__
00116     DEBUGMSG("destroying cp_pooled_thread %lX", t);
00117 #endif
00118     cp_mutex_destroy(t->suspend_lock);
00119     free(t->suspend_lock);
00120     cp_cond_destroy(t->suspend_cond);
00121     free(t->suspend_cond);
00122     free(t->worker);
00123     free(t);
00124 }
00125 
00126 int cp_pooled_thread_release(cp_pooled_thread *t)
00127 {
00128     return 0;
00129 }
00130 
00131 int cp_pooled_thread_run_task(cp_pooled_thread *pt, 
00132                               cp_thread_action action, 
00133                               void *prm)
00134 {
00135 #ifdef __TRACE__
00136     DEBUGMSG("cp_pooled_thread_run_task: action %lx, prm %lx\n", 
00137              (long) action, (long) prm);
00138 #endif
00139 
00140     pt->action = action;
00141     pt->action_prm = prm;
00142 
00143     if (action == NULL)
00144     {
00145         cp_error(CP_INVALID_FUNCTION_POINTER, "missing thread function");
00146         return CP_INVALID_FUNCTION_POINTER;
00147     }
00148 
00149     /* signal thread to run */
00150     cp_mutex_lock(pt->suspend_lock);
00151     pt->wait = 0;
00152     cp_cond_signal(pt->suspend_cond);
00153     cp_mutex_unlock(pt->suspend_lock);
00154 
00155     return 0;
00156 }
00157 
00158 int cp_pooled_thread_run_stoppable_task(cp_pooled_thread *pt, 
00159                                         cp_thread_action action, 
00160                                         void *action_prm, 
00161                                         cp_thread_stop_fn stop_fn,
00162                                         void *stop_prm)
00163 {
00164     pt->stop_fn = stop_fn;
00165     pt->stop_prm = stop_prm;
00166     return cp_pooled_thread_run_task(pt, action, action_prm);
00167 }
00168 
00169 void *cp_pooled_thread_run(void *prm)
00170 {
00171     cp_pooled_thread *pt = (cp_pooled_thread *) prm;
00172 
00173 #ifdef __TRACE__
00174     DEBUGMSG("cp_pooled_thread (%lx) starts", (long) pt);
00175 #endif
00176 
00177     while (!pt->done && pt->owner->running)
00178     {
00179         cp_mutex_lock(pt->suspend_lock);
00180         while (pt->wait && (!pt->done) && pt->owner->running)
00181             cp_cond_wait(pt->suspend_cond, pt->suspend_lock);
00182         cp_mutex_unlock(pt->suspend_lock);
00183 
00184         if (pt->done || !pt->owner->running) break;
00185 #ifdef __TRACE__
00186         DEBUGMSG("cp_pooled_thread_run: action is %lX, action_prm is %lX", pt->action, pt->action_prm);
00187 #endif
00188         if (pt->action) /* run user defined function if set */
00189         {
00190 #ifdef __TRACE__
00191             DEBUGMSG("pooled thread (%lX) handles action (%lX)", (long) pt, (long) pt->action);
00192 #endif
00193             (*pt->action)(pt->action_prm);
00194         }
00195         if (pt->done || !pt->owner->running) break;
00196 
00197         pt->wait = 1;
00198         /* performed work, notify pool */
00199         cp_thread_pool_set_available(pt->owner, pt);
00200     }
00201 
00202 #ifdef __TRACE__
00203     DEBUGMSG("cp_pooled_thread (%lx) exits", (long) pt);
00204 #endif
00205 
00206     cp_pooled_thread_destroy(pt);
00207 
00208     return NULL;
00209 }
00210 
00211 static int cp_thread_pool_set_available(cp_thread_pool *pool, 
00212                                         cp_pooled_thread *thread)
00213 {
00214     cp_mutex_lock(pool->pool_lock);
00215     cp_hashlist_remove(pool->in_use, &thread->id);
00216     cp_list_append(pool->free_pool, thread);
00217 //  pool->size--;
00218     cp_cond_signal(pool->pool_cond);
00219     cp_mutex_unlock(pool->pool_lock);
00220 
00221     return 0;
00222 }
00223 
00224     
00225 int cp_thread_pool_wait(cp_thread_pool *pool)
00226 {
00227     while (cp_hashlist_item_count(pool->in_use) && pool->running)
00228     {
00229         cp_mutex_lock(pool->pool_lock);
00230         cp_cond_wait(pool->pool_cond, pool->pool_lock);
00231 
00232         if (pool->running && 
00233             cp_hashlist_item_count(pool->in_use)) /* wake up someone else */
00234             cp_cond_signal(pool->pool_cond);
00235         cp_mutex_unlock(pool->pool_lock);
00236     }
00237 
00238     return 0;
00239 }
00240 
00241 int cp_thread_pool_stop(cp_thread_pool *pool)
00242 {
00243     cp_hashlist_iterator *i;
00244     cp_list_iterator *j;
00245     cp_pooled_thread *pt;
00246 
00247     pool->running = 0;
00248 
00249     i = cp_hashlist_create_iterator(pool->in_use, COLLECTION_LOCK_READ);
00250     while ((pt = (cp_pooled_thread *) cp_hashlist_iterator_next_value(i)))
00251         cp_pooled_thread_stop(pt);
00252     cp_hashlist_iterator_destroy(i);
00253 
00254     j = cp_list_create_iterator(pool->free_pool, COLLECTION_LOCK_READ);
00255     while ((pt = (cp_pooled_thread *) cp_list_iterator_next(j)))
00256         cp_pooled_thread_stop(pt);
00257     cp_list_iterator_destroy(j);
00258 
00259     return 0;
00260 }
00261 
00262 cp_thread_pool *cp_thread_pool_create(int min_size, int max_size)
00263 {
00264     int rc;
00265     cp_thread_pool *pool = calloc(1, sizeof(cp_thread_pool));
00266     if (pool == NULL)
00267         cp_fatal(CP_MEMORY_ALLOCATION_FAILURE, "can\'t allocate thread pool structure");
00268 
00269     pool->min_size = min_size;
00270     pool->max_size = max_size;
00271 
00272     pool->running = 1;
00273 
00274     pool->free_pool = cp_list_create();
00275     if (pool->free_pool == NULL)
00276         cp_fatal(CP_MEMORY_ALLOCATION_FAILURE, "can\'t allocate thread pool list");
00277 
00278     pool->in_use = cp_hashlist_create(10, cp_hash_long, cp_hash_compare_long);
00279     if (pool->in_use == NULL)
00280         cp_fatal(CP_MEMORY_ALLOCATION_FAILURE, "can\'t allocate thread pool running list");
00281 
00282     pool->pool_lock = (cp_mutex *) malloc(sizeof(cp_mutex));
00283     if (pool->pool_lock == NULL)
00284     {
00285         cp_error(CP_MEMORY_ALLOCATION_FAILURE, "can\'t create mutex");
00286         goto THREAD_POOL_CREATE_CANCEL;
00287     }
00288     if ((rc = cp_mutex_init(pool->pool_lock, NULL))) 
00289     {
00290         cp_error(rc, "can\'t create mutex");
00291         goto THREAD_POOL_CREATE_CANCEL;
00292     }
00293 
00294     pool->pool_cond = (cp_cond *) malloc(sizeof(cp_cond));
00295     if (pool->pool_cond == NULL)
00296     {
00297         cp_error(rc, "can\'t create condition variable");
00298         cp_mutex_destroy(pool->pool_lock);
00299         free(pool->pool_lock);
00300         goto THREAD_POOL_CREATE_CANCEL;
00301     }
00302     if ((rc = cp_cond_init(pool->pool_cond, NULL)))
00303     {
00304         cp_error(rc, "can\'t create condition variable");
00305         free(pool->pool_cond);
00306         cp_mutex_destroy(pool->pool_lock);
00307         free(pool->pool_lock);
00308         goto THREAD_POOL_CREATE_CANCEL;
00309     }
00310 
00311     for ( ; pool->size < pool->min_size; pool->size++)
00312     {
00313         cp_pooled_thread *pt = cp_pooled_thread_create(pool);
00314         if (pt == NULL)
00315             cp_fatal(CP_THREAD_CREATION_FAILURE, "can\'t create thread pool (created %d threads, minimum pool size is %d", pool->size, pool->min_size);
00316         cp_list_append(pool->free_pool, pt);
00317     }
00318 
00319     return pool;
00320 
00321 THREAD_POOL_CREATE_CANCEL:
00322     cp_list_destroy_custom(pool->free_pool, 
00323             (cp_destructor_fn) cp_pooled_thread_destroy);
00324     cp_hashlist_destroy_custom(pool->in_use, NULL, 
00325             (cp_destructor_fn) cp_pooled_thread_destroy);
00326     free(pool);
00327     return NULL;
00328 }
00329 
00330 cp_thread *cp_thread_pool_get_impl(cp_thread_pool *pool, 
00331                                    cp_thread_action action, 
00332                                    void *action_prm, 
00333                                    cp_thread_stop_fn stop_fn,
00334                                    void *stop_prm,
00335                                    int block)
00336 {
00337     cp_pooled_thread *pt = NULL;
00338     cp_mutex_lock(pool->pool_lock);
00339         
00340 #ifdef __TRACE__
00341     DEBUGMSG("cp_thread_pool_get_impl (%d) pool size = %d max size = %d\n", block, pool->size, pool->max_size);
00342 #endif
00343 
00344     pt = cp_list_remove_head(pool->free_pool);
00345     if (pt == NULL)
00346     {
00347         if (pool->size < pool->max_size)
00348         {
00349             pt = cp_pooled_thread_create(pool);
00350             if (pt)
00351                 pool->size++;
00352         }
00353 
00354         if (pt == NULL) /* no thread available and poolsize == max */
00355         {
00356             if (!block)  /* asked not to block, return NULL */
00357             {
00358                 cp_mutex_unlock(pool->pool_lock);
00359                 return NULL;
00360             }
00361 
00362             /* wait for a thread to be released to the pool */
00363 #ifdef _WINDOWS
00364             cp_mutex_unlock(pool->pool_lock);
00365 #endif
00366             while (pool->running && cp_list_is_empty(pool->free_pool))
00367                 cp_cond_wait(pool->pool_cond, pool->pool_lock);
00368 
00369             if (pool->running)
00370                 pt = cp_list_remove_head(pool->free_pool);
00371 
00372             if (pt == NULL) /* shouldn't be happening except for shutdown */
00373             {
00374                 cp_mutex_unlock(pool->pool_lock);
00375                 return NULL;
00376             }
00377         }
00378     }
00379 
00380     cp_hashlist_append(pool->in_use, &pt->id, pt);
00381     cp_mutex_unlock(pool->pool_lock);
00382 
00383     cp_pooled_thread_run_stoppable_task(pt, action, action_prm, stop_fn, stop_prm);
00384 
00385     return pt->worker;
00386 }
00387 
00388 cp_thread *cp_thread_pool_get(cp_thread_pool *pool, 
00389                               cp_thread_action action, 
00390                               void *prm)
00391 {
00392     return cp_thread_pool_get_impl(pool, action, prm, NULL, NULL, 1);
00393 }
00394 
00395 cp_thread *cp_thread_pool_get_stoppable(cp_thread_pool *pool, 
00396                                         cp_thread_action action, 
00397                                         void *action_prm, 
00398                                         cp_thread_stop_fn stop_fn, 
00399                                         void *stop_prm)
00400 {
00401     return cp_thread_pool_get_impl(pool, action, action_prm, stop_fn, stop_prm, 1);
00402 }
00403 
00404 cp_thread *cp_thread_pool_get_nb(cp_thread_pool *pool, 
00405                                  cp_thread_action action, 
00406                                  void *prm)
00407 {
00408     return cp_thread_pool_get_impl(pool, action, prm, NULL, NULL, 0);
00409 }
00410     
00411 cp_thread *cp_thread_pool_get_stoppable_nb(cp_thread_pool *pool, 
00412                                            cp_thread_action action, 
00413                                            void *action_prm,
00414                                            cp_thread_stop_fn stop_fn, 
00415                                            void *stop_prm)
00416 {
00417     return cp_thread_pool_get_impl(pool, action, action_prm, stop_fn, stop_prm, 0);
00418 }
00419 
00420 void cp_thread_pool_destroy(cp_thread_pool *pool)
00421 {
00422 #ifdef __TRACE__
00423     DEBUGMSG("stopping cp_thread_pool %lX", pool);
00424 #endif
00425 
00426     if (pool->running) cp_thread_pool_stop(pool);
00427 
00428     cp_list_destroy(pool->free_pool);
00429 //  cp_list_destroy_custom(pool->free_pool, 
00430 //              (cp_destructor_fn) cp_pooled_thread_destroy);
00431     cp_hashlist_destroy(pool->in_use);
00432 //  cp_hashlist_destroy_custom(pool->in_use, NULL, 
00433 //              (cp_destructor_fn) cp_pooled_thread_destroy);
00434     cp_mutex_destroy(pool->pool_lock);
00435     free(pool->pool_lock);
00436     cp_cond_destroy(pool->pool_cond);
00437     free(pool->pool_cond);
00438 
00439     free(pool);
00440 }
00441 
00442 int cp_thread_pool_count_available(cp_thread_pool *pool)
00443 {
00444     return (pool->max_size - pool->size) + 
00445             cp_list_item_count(pool->free_pool);
00446 }
00447 
00448 
00449 /* **************************************************************************
00450  *                                                                          *
00451  *                      thread management framework                         *
00452  *                                                                          *
00453  ************************************************************************** */
00454 
00455 cp_pooled_thread_client_interface *
00456     cp_pooled_thread_client_interface_create
00457         (cp_pooled_thread_scheduler *owner, 
00458          void *client, 
00459          int min_threads, 
00460          int max_threads,
00461          cp_pooled_thread_report_load report_load,
00462          cp_pooled_thread_shrink shrink,
00463          cp_thread_action action,
00464          void *action_prm, 
00465          cp_thread_stop_fn stop_fn, 
00466          void *stop_prm)
00467 {
00468     cp_pooled_thread_client_interface *ci = 
00469         calloc(1, sizeof(cp_pooled_thread_client_interface));
00470     if (client == NULL)
00471         cp_fatal(CP_MEMORY_ALLOCATION_FAILURE, "can\'t allocate thread pool client interface");
00472 
00473     ci->owner = owner;
00474     ci->client = client;
00475     ci->min = min_threads;
00476     ci->max = max_threads;
00477     ci->report_load = report_load;
00478     ci->shrink = shrink;
00479     ci->action = action;
00480     ci->action_prm = action_prm;
00481     ci->stop_fn = stop_fn;
00482     ci->stop_prm = stop_prm;
00483 
00484     cp_pooled_thread_scheduler_register_client(owner, ci);
00485 
00486     return ci;
00487 }
00488 
00489 
00490 void cp_pooled_thread_client_interface_destroy
00491     (cp_pooled_thread_client_interface *client)
00492 {
00493     free(client);
00494 }
00495 
00496 cp_pooled_thread_scheduler *cp_pooled_thread_scheduler_create(cp_thread_pool *pool)
00497 {
00498     cp_pooled_thread_scheduler *scheduler = 
00499         calloc(1, sizeof(cp_pooled_thread_scheduler));
00500     if (scheduler == NULL)
00501         cp_fatal(CP_MEMORY_ALLOCATION_FAILURE, "can\'t allocate thread manager");
00502 
00503     scheduler->pool = pool;
00504     scheduler->client_list = cp_vector_create(20);
00505 
00506 #ifdef CP_HAS_SRANDOM
00507     srandom(time(NULL));
00508 #else
00509     srand(time(NULL));
00510 #endif
00511 
00512     return scheduler;
00513 }
00514 
00515 void cp_pooled_thread_scheduler_destroy(cp_pooled_thread_scheduler *scheduler)
00516 {
00517     cp_vector_destroy(scheduler->client_list);
00518     free(scheduler);
00519 }
00520 
00521 void cp_pooled_thread_scheduler_register_client
00522         (cp_pooled_thread_scheduler *scheduler, 
00523          cp_pooled_thread_client_interface *client)
00524 {
00525     cp_vector_add_element(scheduler->client_list, client);
00526 }
00527 
00528 cp_pooled_thread_client_interface *
00529     choose_random_client(cp_pooled_thread_scheduler *scheduler)
00530 {
00531     int index = 
00532 #ifdef CP_HAS_RANDOM
00533         random() 
00534 #else
00535         rand()
00536 #endif
00537         % (cp_vector_size(scheduler->client_list));
00538     return (cp_pooled_thread_client_interface *) 
00539         cp_vector_element_at(scheduler->client_list, index);
00540 }
00541 
00542 #define SCHEDULER_THRESHOLD 1
00543 
00544 void cp_pooled_thread_client_get_nb(cp_pooled_thread_client_interface *c)
00545 {
00546     if (cp_thread_pool_get_nb(c->owner->pool, c->action, c->action_prm))
00547         c->count++; 
00548 }
00549 
00550 void cp_pooled_thread_client_get(cp_pooled_thread_client_interface *c)
00551 {
00552     c->count++;
00553     cp_thread_pool_get(c->owner->pool, c->action, c->action_prm);
00554 }
00555 
00556 void cp_pooled_thread_client_get_stoppable_nb(cp_pooled_thread_client_interface *c)
00557 {
00558     if (cp_thread_pool_get_stoppable_nb(c->owner->pool, c->action, c->action_prm, c->stop_fn, c->stop_prm))
00559         c->count++; 
00560 }
00561 
00562 void cp_pooled_thread_client_get_stoppable(cp_pooled_thread_client_interface *c)
00563 {
00564     c->count++;
00565     cp_thread_pool_get_stoppable(c->owner->pool, c->action, c->action_prm, c->stop_fn, c->stop_prm);
00566 }
00567 
00568 #if 0 //~~ for future optimization
00569 void cp_pooled_thread_client_negociate(cp_pooled_thread_client_interface *c)
00570 {
00571     int curr_load = (*c->report_load)(c);
00572     cp_pooled_thread_scheduler *scheduler = c->owner;
00573     Vector *clients = scheduler->client_list;
00574     int i, min, imin, max, imax, cval, max_count;
00575     cp_pooled_thread_client_interface *other;
00576 
00577     int clen = cp_vector_size(clients);
00578 
00579     if (clen == 0) return; //~~ warning for bad usage
00580 
00581     min = INT_MAX;
00582     max = -1;
00583     max_count = -1;
00584 
00585     for (i = 0; i < clen; i++)
00586     {
00587         other = (cp_pooled_thread_client_interface *) cp_vector_element_at(clients, i);
00588         cval = (*other->report_load)(other);
00589 
00590         if (other->count < other->min) //~~ what's with the switching
00591             cp_thread_pool_get_nb(other->owner->pool, other->action, other->prm);
00592 
00593         DEBUGMSG("negociate: pool %d: load = %d, %d <= %d <= %d", i, cval, other->min, other->count, other->max);
00594         if (cval > max) { max = cval; imax = i; }
00595         if (cval < min || (cval == min && other->count > max_count)) { min = cval; imin = i; max_count = other->count; }
00596     }
00597     DEBUGMSG("negociate: min = %d, max = %d", min, max);
00598     if (abs(max - min) > SCHEDULER_THRESHOLD)
00599     {
00600         cp_pooled_thread_client_interface *maxc = cp_vector_element_at(clients, imax);
00601         cp_pooled_thread_client_interface *minc = cp_vector_element_at(clients, imin);
00602 
00603         if (cp_thread_pool_count_available(scheduler->pool) == 0 &&
00604             minc->count > minc->min)
00605         {
00606             DEBUGMSG("negociate: shrinking min pool (%d)", imin);
00607             (*minc->shrink)(minc);
00608         }
00609             
00610         DEBUGMSG("negociate: get_nb for max pool (%d)", imax);
00611         if (cp_thread_pool_get_nb(maxc->owner->pool, maxc->action, maxc->prm))
00612             maxc->count++;
00613     }
00614 }
00615         
00616 #endif
00617 
00618 void cp_pooled_thread_client_negociate(cp_pooled_thread_client_interface *c)
00619 {
00620     int curr_load;
00621     if (c->owner->bypass) return;
00622 
00623     curr_load = (*c->report_load)(c);
00624 
00625 #ifdef __TRACE__
00626     DEBUGMSG("negociate for client %lx - load == %d\n", (long) c, curr_load);
00627 #endif
00628     /* if needed, try get idle thread from pool */
00629     if (curr_load > SCHEDULER_THRESHOLD && c->count < c->max) 
00630     {
00631         cp_thread *t = 
00632             cp_thread_pool_get_nb(c->owner->pool, c->action, c->action_prm);
00633         if (t) /* got a thread, return */
00634         {
00635 #ifdef __TRACE__
00636     DEBUGMSG("negociate: got thread from pool");
00637 #endif
00638             c->count++;
00639             return;
00640         }
00641     }
00642 
00643     /* 
00644      * following code runs if 
00645      * 
00646      * (1) no threads needed - check if someone else (other client) needs one
00647      * 
00648      * or
00649      * 
00650      * (2) thread needed but no idle threads in pool, ask someone else
00651      */
00652     {
00653         int other_load;
00654         int *inc; /* the thread count to be incremented on switch */
00655         cp_thread *t;
00656         cp_pooled_thread_client_interface *load, *unload;
00657         cp_pooled_thread_client_interface *other = 
00658             choose_random_client(c->owner);
00659 
00660         other_load = (*other->report_load)(other);
00661         if (abs(curr_load - other_load) < SCHEDULER_THRESHOLD) return; 
00662         if (curr_load > other_load) /* other releases thread */
00663         {
00664             if (c->count >= c->max || 
00665                 other->count <= other->min) return; /* no switch */
00666 #ifdef __TRACE__
00667             DEBUGMSG("negociate: switching a thread to this pool");
00668 #endif
00669             inc = &c->count;
00670             other->count--;
00671             load = c;
00672             unload = other;
00673         }
00674         else /* client releases thread */
00675         {
00676             if (c->count <= c->min ||
00677                 other->count >= other->max) return; /* no switch */
00678 #ifdef __TRACE__
00679             DEBUGMSG("negociate: switching a thread to other pool");
00680 #endif
00681             inc = &other->count;
00682             c->count--;
00683             load = other;
00684             unload = c;
00685         }
00686 #ifdef __TRACE__
00687         DEBUGMSG("negociate: shrinking unload pool");
00688 #endif
00689         (*unload->shrink)(unload);
00690         t = cp_thread_pool_get_nb(load->owner->pool, //~~ get_nb?
00691                                load->action, load->action_prm); 
00692 
00693         if (t) (*inc)++; //~~ maybe better to get blocking and be sure?
00694     }
00695 }
00696 

Generated on Mon Dec 5 23:00:22 2011 for cprops by  doxygen 1.4.7