diff options
-rw-r--r-- | lib/ChangeLog | 21 | ||||
-rw-r--r-- | lib/workqueue.c | 83 | ||||
-rw-r--r-- | lib/workqueue.h | 30 |
3 files changed, 114 insertions, 20 deletions
diff --git a/lib/ChangeLog b/lib/ChangeLog index 3268ab99..8e5b7c3b 100644 --- a/lib/ChangeLog +++ b/lib/ChangeLog @@ -9,6 +9,27 @@ from VTY_GET_INTEGER_RANGE * vty.h: fix the VTY_GET macros, do {..} while(0) so they have correct function like syntax in usage. + * workqueue.h: Add a WQ_QUEUE_BLOCKED item_status return code, + to allow a queue function to indicate the queue is not + ready/blocked - rather than any problem with the item at hand. + Add a notion of being able to 'plug' and 'unplug' a queue. + Add helpers to plug/unplug a queue. + Add a completion callback, to be called when a queue is emptied. + * workqueue.c: (work_queue_new) remove useless list_free. + (work_queue_schedule) new internal helper function to schedule + queue, if appropriate. + (work_queue_add) use work_queue_schedule + (show_work_queues) Print 'P' if queue is plugged. + (work_queue_plug) new API function, plug a queue - ie prevent it + from 'drained' / processed / scheduled. + (work_queue_unplug) unplug a queue, allowing it to be drained + / scheduled / processed again. + (work_queue_run) Add support for WQ_QUEUE_BLOCKED. + Add comment for RETRY_NOW case. + Make hysteris more aggresive in ramping up granularity, improves + performance significantly. + Add support for calling completion callback when queue is emptied, + possibly useful for knowing when to unplug a queue. 2005-05-19 Paul Jakma <paul@dishone.st> diff --git a/lib/workqueue.c b/lib/workqueue.c index fc61d680..bac41302 100644 --- a/lib/workqueue.c +++ b/lib/workqueue.c @@ -69,9 +69,6 @@ work_queue_new (struct thread_master *m, const char *queue_name) if ( (new->items = list_new ()) == NULL) { - if (new->items) - list_free (new->items); - XFREE (MTYPE_WORK_QUEUE_NAME, new->name); XFREE (MTYPE_WORK_QUEUE, new); @@ -99,6 +96,22 @@ work_queue_free (struct work_queue *wq) return; } +static inline int +work_queue_schedule (struct work_queue *wq, unsigned int delay) +{ + /* if appropriate, schedule work queue thread */ + if ( (wq->flags == WQ_UNPLUGGED) + && (wq->thread == NULL) + && (listcount (wq->items) > 0) ) + { + wq->thread = thread_add_background (wq->master, work_queue_run, + wq, delay); + return 1; + } + else + return 0; +} + void work_queue_add (struct work_queue *wq, void *data) { @@ -115,12 +128,7 @@ work_queue_add (struct work_queue *wq, void *data) item->data = data; listnode_add (wq->items, item); - /* if thread isnt already waiting, add one */ - if (wq->thread == NULL) - wq->thread = thread_add_background (wq->master, work_queue_run, - wq, wq->spec.hold); - - /* XXX: what if we didnt get a thread? try again? */ + work_queue_schedule (wq, wq->spec.hold); return; } @@ -159,11 +167,12 @@ DEFUN(show_work_queues, struct work_queue *wq; vty_out (vty, - "%8s %11s %8s %21s%s", - "List","(ms) ","Q. Runs","Cycle Counts ", + "%c %8s %11s %8s %21s%s", + ' ', "List","(ms) ","Q. Runs","Cycle Counts ", VTY_NEWLINE); vty_out (vty, - "%8s %5s %5s %8s %7s %6s %6s %s%s", + "%c %8s %5s %5s %8s %7s %6s %6s %s%s", + ' ', "Items", "Delay","Hold", "Total", @@ -173,7 +182,8 @@ DEFUN(show_work_queues, for (ALL_LIST_ELEMENTS_RO ((&work_queues), node, wq)) { - vty_out (vty,"%8d %5d %5d %8ld %7d %6d %6u %s%s", + vty_out (vty,"%c %8d %5d %5d %8ld %7d %6d %6u %s%s", + (wq->flags == WQ_PLUGGED ? 'P' : ' '), listcount (wq->items), wq->spec.delay, wq->spec.hold, wq->runs, @@ -187,6 +197,32 @@ DEFUN(show_work_queues, return CMD_SUCCESS; } +/* 'plug' a queue: Stop it from being scheduled, + * ie: prevent the queue from draining. + */ +void +work_queue_plug (struct work_queue *wq) +{ + if (wq->thread) + thread_cancel (wq->thread); + + wq->thread = NULL; + + wq->flags = WQ_PLUGGED; +} + +/* unplug queue, schedule it again, if appropriate + * Ie: Allow the queue to be drained again + */ +void +work_queue_unplug (struct work_queue *wq) +{ + wq->flags = WQ_UNPLUGGED; + + /* if thread isnt already waiting, add one */ + work_queue_schedule (wq, wq->spec.hold); +} + /* timer thread to process a work queue * will reschedule itself if required, * otherwise work_queue_item_add @@ -250,6 +286,13 @@ work_queue_run (struct thread *thread) switch (ret) { + case WQ_QUEUE_BLOCKED: + { + /* decrement item->ran again, cause this isn't an item + * specific error, and fall through to WQ_RETRY_LATER + */ + item->ran--; + } case WQ_RETRY_LATER: { goto stats; @@ -260,6 +303,7 @@ work_queue_run (struct thread *thread) break; } case WQ_RETRY_NOW: + /* a RETRY_NOW that gets here has exceeded max_tries, same as ERROR */ case WQ_ERROR: { if (wq->spec.errorfunc) @@ -303,7 +347,9 @@ stats: wq->cycles.best = cycles; /* along with yielded check, provides hysteris for granularity */ - if (cycles > (wq->cycles.granularity * WQ_HYSTERIS_FACTOR)) + if (cycles > (wq->cycles.granularity * WQ_HYSTERIS_FACTOR * 2)) + wq->cycles.granularity *= WQ_HYSTERIS_FACTOR; /* quick ramp-up */ + else if (cycles > (wq->cycles.granularity * WQ_HYSTERIS_FACTOR)) wq->cycles.granularity += WQ_HYSTERIS_FACTOR; } #undef WQ_HYSTERIS_FACTOR @@ -316,10 +362,11 @@ stats: __func__, cycles, wq->cycles.best, wq->cycles.granularity); #endif - /* Is the queue done yet? */ + /* Is the queue done yet? If it is, call the completion callback. */ if (listcount (wq->items) > 0) - wq->thread = thread_add_background (wq->master, work_queue_run, wq, - wq->spec.delay); - + work_queue_schedule (wq, wq->spec.delay); + else if (wq->spec.completion_func) + wq->spec.completion_func (wq); + return 0; } diff --git a/lib/workqueue.h b/lib/workqueue.h index 257667e2..626d8e6c 100644 --- a/lib/workqueue.h +++ b/lib/workqueue.h @@ -35,7 +35,10 @@ typedef enum WQ_ERROR, /* Error, run error handler if provided */ WQ_RETRY_NOW, /* retry immediately */ WQ_RETRY_LATER, /* retry later, cease processing work queue */ - WQ_REQUEUE /* requeue item, continue processing work queue */ + WQ_REQUEUE, /* requeue item, continue processing work queue */ + WQ_QUEUE_BLOCKED, /* Queue cant be processed at this time. + * Similar to WQ_RETRY_LATER, but doesn't penalise + * the particular item.. */ } wq_item_status; /* A single work queue item, unsurprisingly */ @@ -45,11 +48,18 @@ struct work_queue_item unsigned short ran; /* # of times item has been run */ }; +enum work_queue_flags +{ + WQ_UNPLUGGED = 0, + WQ_PLUGGED = 1, +}; + struct work_queue { struct thread_master *master; /* thread master */ struct thread *thread; /* thread, if one is active */ char *name; /* work queue name */ + enum work_queue_flags flags; /* flags */ /* specification for this work queue */ struct { @@ -62,6 +72,9 @@ struct work_queue /* callback to delete user specific item data */ void (*del_item_data) (void *); + /* completion callback, called when queue is emptied, optional */ + void (*completion_func) (struct work_queue *); + /* max number of retries to make for item that errors */ unsigned int max_retries; @@ -71,7 +84,7 @@ struct work_queue /* remaining fields should be opaque to users */ struct list *items; /* queue item list */ - unsigned long runs; /* runs count */ + unsigned long runs; /* runs count */ struct { unsigned int best; @@ -81,11 +94,24 @@ struct work_queue }; /* User API */ + +/* create a new work queue, of given name. + * user must fill in the spec of the returned work queue before adding + * anything to it + */ extern struct work_queue *work_queue_new (struct thread_master *, const char *); +/* destroy work queue */ extern void work_queue_free (struct work_queue *); + +/* Add the supplied data as an item onto the workqueue */ extern void work_queue_add (struct work_queue *, void *); +/* plug the queue, ie prevent it from being drained / processed */ +extern void work_queue_plug (struct work_queue *wq); +/* unplug the queue, allow it to be drained again */ +extern void work_queue_unplug (struct work_queue *wq); + /* Helpers, exported for thread.c and command.c */ extern int work_queue_run (struct thread *); extern struct cmd_element show_work_queues_cmd; |