Diffstat (limited to 'lib')
-rw-r--r--  lib/ChangeLog   | 21
-rw-r--r--  lib/workqueue.c | 83
-rw-r--r--  lib/workqueue.h | 30
3 files changed, 114 insertions(+), 20 deletions(-)
diff --git a/lib/ChangeLog b/lib/ChangeLog
index 3268ab99..8e5b7c3b 100644
--- a/lib/ChangeLog
+++ b/lib/ChangeLog
@@ -9,6 +9,27 @@
from VTY_GET_INTEGER_RANGE
* vty.h: fix the VTY_GET macros, do {..} while(0) so they have
correct function like syntax in usage.
+ * workqueue.h: Add a WQ_QUEUE_BLOCKED item_status return code,
+ to allow a queue function to indicate the queue is not
+ ready/blocked - rather than any problem with the item at hand.
+ Add a notion of being able to 'plug' and 'unplug' a queue.
+ Add helpers to plug/unplug a queue.
+ Add a completion callback, to be called when a queue is emptied.
+ * workqueue.c: (work_queue_new) remove useless list_free.
+ (work_queue_schedule) new internal helper function to schedule
+ queue, if appropriate.
+ (work_queue_add) use work_queue_schedule
+ (show_work_queues) Print 'P' if queue is plugged.
+ (work_queue_plug) new API function, plug a queue - ie prevent it
+ from being drained / processed / scheduled.
+ (work_queue_unplug) unplug a queue, allowing it to be drained
+ / scheduled / processed again.
+ (work_queue_run) Add support for WQ_QUEUE_BLOCKED.
+ Add comment for RETRY_NOW case.
+ Make hysteresis more aggressive in ramping up granularity, which
+ improves performance significantly.
+ Add support for calling completion callback when queue is emptied,
+ possibly useful for knowing when to unplug a queue.
2005-05-19 Paul Jakma <paul@dishone.st>
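
To make the intent of the new plug/unplug hooks concrete, here is a minimal usage sketch. It uses only calls added or kept by this patch (work_queue_plug, work_queue_add, work_queue_unplug); the batching helper, its arguments and the item payloads are hypothetical.

#include "workqueue.h"   /* work_queue_add/plug/unplug, declared by this patch */

/* Hypothetical bulk-enqueue helper: plug the queue so it cannot be drained
 * while a burst of items is added, then unplug so the background thread is
 * scheduled once and processes the whole batch.
 */
static void
example_enqueue_batch (struct work_queue *wq, void **items, unsigned int n)
{
  unsigned int i;

  work_queue_plug (wq);             /* cancels any pending thread, sets WQ_PLUGGED */

  for (i = 0; i < n; i++)
    work_queue_add (wq, items[i]);  /* items queue up but are not processed yet */

  work_queue_unplug (wq);           /* sets WQ_UNPLUGGED and reschedules the queue */
}
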
diff --git a/lib/workqueue.c b/lib/workqueue.c
index fc61d680..bac41302 100644
--- a/lib/workqueue.c
+++ b/lib/workqueue.c
@@ -69,9 +69,6 @@ work_queue_new (struct thread_master *m, const char *queue_name)
if ( (new->items = list_new ()) == NULL)
{
- if (new->items)
- list_free (new->items);
-
XFREE (MTYPE_WORK_QUEUE_NAME, new->name);
XFREE (MTYPE_WORK_QUEUE, new);
@@ -99,6 +96,22 @@ work_queue_free (struct work_queue *wq)
return;
}
+static inline int
+work_queue_schedule (struct work_queue *wq, unsigned int delay)
+{
+ /* if appropriate, schedule work queue thread */
+ if ( (wq->flags == WQ_UNPLUGGED)
+ && (wq->thread == NULL)
+ && (listcount (wq->items) > 0) )
+ {
+ wq->thread = thread_add_background (wq->master, work_queue_run,
+ wq, delay);
+ return 1;
+ }
+ else
+ return 0;
+}
+
void
work_queue_add (struct work_queue *wq, void *data)
{
@@ -115,12 +128,7 @@ work_queue_add (struct work_queue *wq, void *data)
item->data = data;
listnode_add (wq->items, item);
- /* if thread isnt already waiting, add one */
- if (wq->thread == NULL)
- wq->thread = thread_add_background (wq->master, work_queue_run,
- wq, wq->spec.hold);
-
- /* XXX: what if we didnt get a thread? try again? */
+ work_queue_schedule (wq, wq->spec.hold);
return;
}
@@ -159,11 +167,12 @@ DEFUN(show_work_queues,
struct work_queue *wq;
vty_out (vty,
- "%8s %11s %8s %21s%s",
- "List","(ms) ","Q. Runs","Cycle Counts ",
+ "%c %8s %11s %8s %21s%s",
+ ' ', "List","(ms) ","Q. Runs","Cycle Counts ",
VTY_NEWLINE);
vty_out (vty,
- "%8s %5s %5s %8s %7s %6s %6s %s%s",
+ "%c %8s %5s %5s %8s %7s %6s %6s %s%s",
+ ' ',
"Items",
"Delay","Hold",
"Total",
@@ -173,7 +182,8 @@ DEFUN(show_work_queues,
for (ALL_LIST_ELEMENTS_RO ((&work_queues), node, wq))
{
- vty_out (vty,"%8d %5d %5d %8ld %7d %6d %6u %s%s",
+ vty_out (vty,"%c %8d %5d %5d %8ld %7d %6d %6u %s%s",
+ (wq->flags == WQ_PLUGGED ? 'P' : ' '),
listcount (wq->items),
wq->spec.delay, wq->spec.hold,
wq->runs,
@@ -187,6 +197,32 @@ DEFUN(show_work_queues,
return CMD_SUCCESS;
}
+/* 'plug' a queue: Stop it from being scheduled,
+ * ie: prevent the queue from draining.
+ */
+void
+work_queue_plug (struct work_queue *wq)
+{
+ if (wq->thread)
+ thread_cancel (wq->thread);
+
+ wq->thread = NULL;
+
+ wq->flags = WQ_PLUGGED;
+}
+
+/* unplug queue, schedule it again, if appropriate
+ * Ie: Allow the queue to be drained again
+ */
+void
+work_queue_unplug (struct work_queue *wq)
+{
+ wq->flags = WQ_UNPLUGGED;
+
+ /* if thread isn't already waiting, add one */
+ work_queue_schedule (wq, wq->spec.hold);
+}
+
/* timer thread to process a work queue
* will reschedule itself if required,
* otherwise work_queue_item_add
@@ -250,6 +286,13 @@ work_queue_run (struct thread *thread)
switch (ret)
{
+ case WQ_QUEUE_BLOCKED:
+ {
+ /* decrement item->ran again, because this isn't an item
+ * specific error, and fall through to WQ_RETRY_LATER
+ */
+ item->ran--;
+ }
case WQ_RETRY_LATER:
{
goto stats;
@@ -260,6 +303,7 @@ work_queue_run (struct thread *thread)
break;
}
case WQ_RETRY_NOW:
+ /* a RETRY_NOW that gets here has exceeded max_retries, same as ERROR */
case WQ_ERROR:
{
if (wq->spec.errorfunc)
@@ -303,7 +347,9 @@ stats:
wq->cycles.best = cycles;
/* along with yielded check, provides hysteris for granularity */
- if (cycles > (wq->cycles.granularity * WQ_HYSTERIS_FACTOR))
+ if (cycles > (wq->cycles.granularity * WQ_HYSTERIS_FACTOR * 2))
+ wq->cycles.granularity *= WQ_HYSTERIS_FACTOR; /* quick ramp-up */
+ else if (cycles > (wq->cycles.granularity * WQ_HYSTERIS_FACTOR))
wq->cycles.granularity += WQ_HYSTERIS_FACTOR;
}
#undef WQ_HYSTERIS_FACTOR
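
As a rough illustration of the new two-tier ramp-up, the sketch below assumes WQ_HYSTERIS_FACTOR is 4 (the macro's actual value is defined earlier in workqueue.c and is not part of this hunk) and a current granularity of 10 items per run.

#include <stdio.h>

#define WQ_HYSTERIS_FACTOR 4   /* assumed value; see workqueue.c for the real one */

/* Mirror of the ramp-up logic above, extracted for illustration only. */
static unsigned int
ramp (unsigned int granularity, unsigned int cycles)
{
  if (cycles > (granularity * WQ_HYSTERIS_FACTOR * 2))
    granularity *= WQ_HYSTERIS_FACTOR;   /* quick ramp-up */
  else if (cycles > (granularity * WQ_HYSTERIS_FACTOR))
    granularity += WQ_HYSTERIS_FACTOR;   /* gentle ramp-up */
  return granularity;
}

int
main (void)
{
  printf ("%u\n", ramp (10, 100)); /* 100 > 80: multiplies, 10 -> 40 */
  printf ("%u\n", ramp (10, 50));  /* 50 > 40 but not > 80: adds, 10 -> 14 */
  printf ("%u\n", ramp (10, 30));  /* below both thresholds: stays at 10 */
  return 0;
}
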
@@ -316,10 +362,11 @@ stats:
__func__, cycles, wq->cycles.best, wq->cycles.granularity);
#endif
- /* Is the queue done yet? */
+ /* Is the queue done yet? If it is, call the completion callback. */
if (listcount (wq->items) > 0)
- wq->thread = thread_add_background (wq->master, work_queue_run, wq,
- wq->spec.delay);
-
+ work_queue_schedule (wq, wq->spec.delay);
+ else if (wq->spec.completion_func)
+ wq->spec.completion_func (wq);
+
return 0;
}
diff --git a/lib/workqueue.h b/lib/workqueue.h
index 257667e2..626d8e6c 100644
--- a/lib/workqueue.h
+++ b/lib/workqueue.h
@@ -35,7 +35,10 @@ typedef enum
WQ_ERROR, /* Error, run error handler if provided */
WQ_RETRY_NOW, /* retry immediately */
WQ_RETRY_LATER, /* retry later, cease processing work queue */
- WQ_REQUEUE /* requeue item, continue processing work queue */
+ WQ_REQUEUE, /* requeue item, continue processing work queue */
+ WQ_QUEUE_BLOCKED, /* Queue can't be processed at this time.
+ * Similar to WQ_RETRY_LATER, but doesn't penalise
+ * the particular item. */
} wq_item_status;
/* A single work queue item, unsurprisingly */
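
For illustration, a queue's work function might use the new return code as sketched below. The wq_item_status (*)(void *) signature, the WQ_SUCCESS 'done' code from the top of the enum, and all of the example_* names are assumptions, since the spec's workfunc member is not shown in this diff; only the codes visible in the hunk above are confirmed by the patch.

#include "workqueue.h"                              /* wq_item_status and the WQ_* codes */

struct example_item;                                /* hypothetical item type */
extern int example_resource_busy (void);            /* hypothetical shared-resource check */
extern int example_process (struct example_item *); /* hypothetical per-item work */

/* Hypothetical per-item work function. */
static wq_item_status
example_workfunc (void *data)
{
  struct example_item *item = data;

  if (example_resource_busy ())   /* whole queue must wait: not this item's fault */
    return WQ_QUEUE_BLOCKED;      /* like WQ_RETRY_LATER, but item->ran is not charged */

  if (example_process (item) < 0) /* item-specific failure */
    return WQ_RETRY_LATER;        /* counts against the item's max_retries budget */

  return WQ_SUCCESS;              /* assumed 'done' code, not shown in this hunk */
}
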
@@ -45,11 +48,18 @@ struct work_queue_item
unsigned short ran; /* # of times item has been run */
};
+enum work_queue_flags
+{
+ WQ_UNPLUGGED = 0,
+ WQ_PLUGGED = 1,
+};
+
struct work_queue
{
struct thread_master *master; /* thread master */
struct thread *thread; /* thread, if one is active */
char *name; /* work queue name */
+ enum work_queue_flags flags; /* flags */
/* specification for this work queue */
struct {
@@ -62,6 +72,9 @@ struct work_queue
/* callback to delete user specific item data */
void (*del_item_data) (void *);
+ /* completion callback, called when queue is emptied, optional */
+ void (*completion_func) (struct work_queue *);
+
/* max number of retries to make for item that errors */
unsigned int max_retries;
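
A minimal sketch of wiring up the new optional completion callback follows. The callback body, the setup helper and the queue name are hypothetical; work_queue_new and the completion_func member come from this patch, and zlog_info from lib/log.h.

#include "workqueue.h"
#include "log.h"                /* zlog_info() */

/* Hypothetical completion callback: work_queue_run calls this once the
 * queue has been fully drained.
 */
static void
example_queue_emptied (struct work_queue *wq)
{
  zlog_info ("work queue %s drained", wq->name);
}

/* Hypothetical setup: attach the callback when creating the queue. */
static struct work_queue *
example_queue_setup (struct thread_master *master)
{
  struct work_queue *wq = work_queue_new (master, "example queue");

  /* ... fill in wq->spec.workfunc, hold, etc. (not shown here) ... */
  wq->spec.completion_func = &example_queue_emptied;
  return wq;
}
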
@@ -71,7 +84,7 @@ struct work_queue
/* remaining fields should be opaque to users */
struct list *items; /* queue item list */
- unsigned long runs; /* runs count */
+ unsigned long runs; /* runs count */
struct {
unsigned int best;
@@ -81,11 +94,24 @@ struct work_queue
};
/* User API */
+
+/* create a new work queue, of given name.
+ * user must fill in the spec of the returned work queue before adding
+ * anything to it
+ */
extern struct work_queue *work_queue_new (struct thread_master *,
const char *);
+/* destroy work queue */
extern void work_queue_free (struct work_queue *);
+
+/* Add the supplied data as an item onto the workqueue */
extern void work_queue_add (struct work_queue *, void *);
+/* plug the queue, ie prevent it from being drained / processed */
+extern void work_queue_plug (struct work_queue *wq);
+/* unplug the queue, allow it to be drained again */
+extern void work_queue_unplug (struct work_queue *wq);
+
/* Helpers, exported for thread.c and command.c */
extern int work_queue_run (struct thread *);
extern struct cmd_element show_work_queues_cmd;