priority_list.c

00001 
00006 /* @file
00007  * 
00008  * priority list implementation. 
00009  * <p>
00010  * features
00011  * <p>
00012  * <ul>
00013  * <li> immediate priority queue
00014  * <li> simple weight based distribution on normal priority queues
00015  * </ul>
00016  */
00017 
00018 #include <stdlib.h>
00019 #include <stdio.h>
00020 #include <errno.h>
00021 
00022 #include "collection.h"
00023 #include "common.h"
00024 #include "priority_list.h"
00025 
00026 /* fwd declaration for internal lock function */
00027 int cp_priority_list_txlock(cp_priority_list *list, int type);
00028 /* fwd declaration for internal unlock function */
00029 int cp_priority_list_txunlock(cp_priority_list *list);
00030 
00034 static void cp_priority_list_destroy_internal(cp_priority_list *list);
00035 
00036 void cp_priority_list_destroy(cp_priority_list *list)
00037 {
00038     cp_priority_list_destroy_by_option(list, list->mode);
00039 }
00040 
00041 void cp_priority_list_destroy_by_option(cp_priority_list *list, int mode)
00042 {
00043     int i;
00044 
00045     if (list->immediate) cp_list_destroy_by_option(list->immediate, mode);
00046 
00047     if (list->weight) free(list->weight);
00048 
00049     for (i = 0; i < list->normal_priority_queues; i++)
00050         cp_list_destroy_by_option(list->normal[i], mode);
00051 
00052     if (list->lock)
00053     {
00054         cp_lock_destroy(list->lock);
00055         free(list->lock);
00056     }
00057 
00058     free(list->normal);
00059     free(list);
00060 }
00061 
00062 
00063 cp_priority_list *
00064     cp_priority_list_create_by_option(int immediate, 
00065                                       int normal_priority_queues,
00066                                       int *weight, 
00067                                       cp_compare_fn compare_fn,
00068                                       cp_copy_fn copy_fn, 
00069                                       cp_destructor_fn item_destructor,
00070                                       int mode)
00071 {
00072     cp_priority_list *list;
00073     int i;
00074 
00075     list = (cp_priority_list *) calloc(1, sizeof(cp_priority_list));
00076     if (list == NULL) 
00077     {
00078         errno = ENOMEM;
00079         return NULL;
00080     }
00081     
00082     list->immediate = NULL;
00083     list->normal = NULL;
00084     list->normal_priority_queues = normal_priority_queues;
00085 
00086     if (normal_priority_queues > 0) 
00087     {
00088         list->weight = (int *) malloc(normal_priority_queues * sizeof(int));
00089         if (list->weight == NULL) 
00090         {
00091             cp_priority_list_destroy_internal(list);
00092             errno = ENOMEM;
00093             return NULL;
00094         }
00095 
00096         list->normal = (cp_list **) malloc(normal_priority_queues * sizeof(cp_list *));
00097         if (list->normal == NULL) 
00098         {
00099             cp_priority_list_destroy_internal(list);
00100             errno = ENOMEM;
00101             return NULL;
00102         }
00103 
00104         for (i = 0; i < normal_priority_queues; i++) 
00105         {
00106             list->weight[i] = weight[i];
00107 
00108             list->normal[i] = cp_list_create_list(mode | COLLECTION_MODE_NOSYNC,
00109                                                   compare_fn, copy_fn, 
00110                                                   item_destructor);
00111             if (list->normal[i] == NULL) 
00112             {
00113                 cp_priority_list_destroy_internal(list);
00114                 errno = ENOMEM;
00115                 return NULL;
00116             }
00117         }
00118     }
00119 
00120     if (immediate) 
00121     {
00122         list->immediate = cp_list_create_list(mode | COLLECTION_MODE_NOSYNC,
00123                                               compare_fn, copy_fn,
00124                                               item_destructor);
00125         if (list->immediate == NULL) 
00126         {
00127             cp_priority_list_destroy_internal(list);
00128             errno = ENOMEM;
00129             return NULL;
00130         }
00131     } 
00132     else 
00133         list->immediate = NULL;
00134 
00135     if ((list->lock = malloc(sizeof(cp_lock))) == NULL)
00136     {
00137         cp_priority_list_destroy_internal(list);
00138         errno = ENOMEM;
00139         return NULL;
00140     }
00141     if (cp_lock_init(list->lock, NULL) == -1)
00142     {
00143         cp_priority_list_destroy_internal(list);
00144         return NULL;
00145     }
00146 
00147     list->mode = mode;
00148     list->cycle_position = 0;
00149     list->distribution_counter = 0;
00150     list->compare_fn = compare_fn;
00151 
00152 
00153     return list;    
00154 }
00155 
00156 void *cp_priority_list_insert(cp_priority_list *list, void *item, int priority)
00157 {
00158     return cp_priority_list_insert_by_option(list, item, priority, list->mode);
00159 }
00160 
00161 void *cp_priority_list_insert_by_option(cp_priority_list *list, 
00162                                         void *item, 
00163                                         int priority, 
00164                                         int mode)
00165 {
00166     void *res = NULL;
00167     int err = 0;
00168 
00169     if (!(mode & COLLECTION_MODE_NOSYNC))
00170         if (cp_priority_list_txlock(list, COLLECTION_LOCK_WRITE)) return NULL;
00171 
00172     /* if no mutliples allowed, check all inner lists before adding */
00173     if (!(mode & COLLECTION_MODE_MULTIPLE_VALUES))
00174     {
00175         int i;
00176         
00177         if (list->compare_fn == NULL)
00178         {
00179             err = CP_INVALID_FUNCTION_POINTER;
00180             goto INSERT_DONE;
00181         }
00182         if (list->immediate != NULL && cp_list_search(list->immediate, item))
00183         {
00184             err = CP_ITEM_EXISTS;
00185             goto INSERT_DONE;
00186         }
00187         
00188         for (i = 0; i < list-> normal_priority_queues; i++)
00189             if (cp_list_search(list->normal[i], item))
00190             {
00191                 err = CP_ITEM_EXISTS;
00192                 goto INSERT_DONE;
00193             }
00194     }
00195     
00196     if (priority == 0 && list->immediate != NULL) 
00197         res = cp_list_insert(list->immediate, item);
00198     else if (priority > 0 && priority <= list->normal_priority_queues) 
00199         res = cp_list_insert(list->normal[priority - 1], item);
00200     else    // if priority is not supported, add to lowest priority queue
00201     {
00202 //        res = cp_list_insert(list->normal[list->normal_priority_queues - 1], item);
00203         res = NULL;
00204         err = EINVAL;
00205     }
00206     
00207 
00208     if (res) list->item_count++; 
00209     
00210 INSERT_DONE:
00211     if (!(mode & COLLECTION_MODE_NOSYNC)) cp_priority_list_txunlock(list);
00212 
00213     if (err) errno = err;
00214 
00215     return res;
00216 }
00217 
00218 int cp_priority_list_is_empty(cp_priority_list *list)
00219 {
00220     int rc;
00221     int empty;
00222 
00223     if ((rc = cp_priority_list_txlock(list, COLLECTION_LOCK_READ))) return rc;
00224     
00225     empty = list->item_count == 0;
00226 
00227     cp_priority_list_txunlock(list);
00228 
00229     return empty;
00230 }
00231 
00232 void *cp_priority_list_get_next_by_option(cp_priority_list *list, int mode)
00233 {
00234     void *res = NULL;
00235     int start_queue;
00236 
00237     if (cp_priority_list_txlock(list, COLLECTION_LOCK_WRITE)) return NULL;;
00238 
00239     if (list->immediate) res = cp_list_remove_tail(list->immediate);
00240     if (res == NULL && list->normal_priority_queues) 
00241     {
00242         if (list->distribution_counter == list->weight[list->cycle_position]) 
00243         {
00244             list->distribution_counter = 0;
00245             list->cycle_position = 
00246                 (list->cycle_position + 1) % list->normal_priority_queues;
00247         }
00248 
00249         list->distribution_counter++;
00250         start_queue = list->cycle_position;
00251         do 
00252         {
00253             res = cp_list_remove_tail(list->normal[list->cycle_position]);
00254             if (res == NULL) 
00255             {
00256                 list->cycle_position = (list->cycle_position + 1) % list->normal_priority_queues;
00257                 list->distribution_counter = 1;
00258             }
00259             else
00260                 break;
00261         }
00262         while (list->cycle_position != start_queue);
00263             
00264     }
00265 
00266     if (res) list->item_count--;
00267     cp_priority_list_txunlock(list);
00268 
00269     return res;
00270 }
00271 
00272 int cp_priority_list_lock_internal(cp_priority_list *list, int lock_mode)
00273 {
00274     int rc = -1;
00275 
00276     switch (lock_mode) 
00277     {
00278         case COLLECTION_LOCK_READ:
00279                 rc = cp_lock_rdlock(list->lock);
00280                 break;
00281 
00282         case COLLECTION_LOCK_WRITE:
00283                 rc = cp_lock_wrlock(list->lock);
00284                 break;
00285 
00286             case COLLECTION_LOCK_NONE:
00287                 rc = 0;
00288                 break;
00289                 
00290         default:
00291                 errno = EINVAL;
00292     }
00293 
00294     return rc;
00295 }
00296 
00297 int cp_priority_list_unlock_internal(cp_priority_list *list)
00298 {
00299     return cp_lock_unlock(list->lock);
00300 }
00301 
00302 int cp_priority_list_txlock(cp_priority_list *list, int type)
00303 {
00304     if (list->mode & COLLECTION_MODE_NOSYNC) return 0;
00305     if (list->mode & COLLECTION_MODE_IN_TRANSACTION && 
00306         list->txtype == COLLECTION_LOCK_WRITE)
00307     {
00308         cp_thread self = cp_thread_self();
00309         if (cp_thread_equal(self, list->txowner)) return 0;
00310     }
00311     return cp_priority_list_lock_internal(list, type);
00312 }
00313 
00314 int cp_priority_list_txunlock(cp_priority_list *list)
00315 {
00316     if (list->mode & COLLECTION_MODE_NOSYNC) return 0;
00317     if (list->mode & COLLECTION_MODE_IN_TRANSACTION && 
00318         list->txtype == COLLECTION_LOCK_WRITE)
00319     {
00320         cp_thread self = cp_thread_self();
00321         if (cp_thread_equal(self, list->txowner)) return 0;
00322     }
00323     return cp_priority_list_unlock_internal(list);
00324 }
00325 
00326 /* lock and set the transaction indicators */
00327 int cp_priority_list_lock(cp_priority_list *list, int type)
00328 {
00329     int rc;
00330     if ((list->mode & COLLECTION_MODE_NOSYNC)) return EINVAL;
00331     if ((rc = cp_priority_list_lock_internal(list, type))) return rc;
00332     list->txtype = type;
00333     list->txowner = cp_thread_self();
00334     list->mode |= COLLECTION_MODE_IN_TRANSACTION;
00335     return 0;
00336 }
00337 
00338 /* unset the transaction indicators and unlock */
00339 int cp_priority_list_unlock(cp_priority_list *list)
00340 {
00341     cp_thread self = cp_thread_self();
00342     if (list->txowner == self)
00343     {
00344         list->txtype = 0;
00345         list->txowner = 0;
00346         list->mode ^= COLLECTION_MODE_IN_TRANSACTION;
00347     }
00348     else if (list->txtype == COLLECTION_LOCK_WRITE)
00349         return -1;
00350     return cp_priority_list_unlock_internal(list);
00351 }
00352 
00353 /* get the current collection mode */
00354 int cp_priority_list_get_mode(cp_priority_list *list)
00355 {
00356     return list->mode;
00357 }
00358 
00359 /* set mode bits on the list mode indicator */
00360 int cp_priority_list_set_mode(cp_priority_list *list, int mode)
00361 {
00362     int nosync;
00363 
00364     /* can't set NOSYNC in the middle of a transaction */
00365     if ((list->mode & COLLECTION_MODE_IN_TRANSACTION) && 
00366         (mode & COLLECTION_MODE_NOSYNC)) return EINVAL;
00367     
00368     nosync = list->mode & COLLECTION_MODE_NOSYNC;
00369     if (!nosync)
00370         if (cp_priority_list_txlock(list, COLLECTION_LOCK_WRITE))
00371             return -1;
00372 
00373     list->mode |= mode;
00374 
00375     if (!nosync)
00376         cp_priority_list_txunlock(list);
00377 
00378     return 0;
00379 }
00380 
00381 /* unset mode bits on the list mode indicator. if unsetting 
00382  * COLLECTION_MODE_NOSYNC and the list was not previously synchronized, the 
00383  * internal synchronization structure is initalized.
00384  */
00385 int cp_priority_list_unset_mode(cp_priority_list *list, int mode)
00386 {
00387     int nosync = list->mode & COLLECTION_MODE_NOSYNC;
00388 
00389     if (!nosync)
00390         if (cp_priority_list_txlock(list, COLLECTION_LOCK_WRITE))
00391             return -1;
00392     
00393     /* handle the special case of unsetting COLLECTION_MODE_NOSYNC */
00394     if ((mode & COLLECTION_MODE_NOSYNC) && list->lock == NULL)
00395     {
00396         /* list can't be locked in this case, no need to unlock on failure */
00397         if ((list->lock = malloc(sizeof(cp_lock))) == NULL)
00398             return -1; 
00399         if (cp_lock_init(list->lock, NULL))
00400             return -1;
00401     }
00402     
00403     /* unset specified bits */
00404     list->mode &= list->mode ^ mode;
00405     if (!nosync)
00406         cp_priority_list_txunlock(list);
00407 
00408     return 0;
00409 }
00410 
00411 
00412 long cp_priority_list_item_count(cp_priority_list *list)
00413 {
00414     return list->item_count;
00415 }
00416 
00419 static void cp_priority_list_destroy_internal(cp_priority_list *list)
00420 {
00421     int i;
00422 
00423     if (list->weight) free(list->weight);
00424     if (list->immediate) cp_list_destroy(list->immediate);
00425     if (list->normal) 
00426     {
00427         for (i = 0; i < list->normal_priority_queues; i++)
00428         {
00429             if (list->normal[i] == NULL) break;
00430             cp_list_destroy(list->normal[i]);
00431         }
00432         free(list->normal);
00433     }
00434     if (list->lock) 
00435     {
00436         cp_lock_destroy(list->lock);
00437         free(list->lock);
00438     }
00439     free(list);
00440 }
00441 

Generated on Mon Dec 5 23:00:22 2011 for cprops by  doxygen 1.4.7