diff options
author | paul <paul> | 2005-04-25 16:26:42 +0000 |
---|---|---|
committer | paul <paul> | 2005-04-25 16:26:42 +0000 |
commit | 354d119a6569b2c6335ae9309e4606e5cccedb6a (patch) | |
tree | e29bb213c52bb46b3454f845fa5892df306b82e2 /lib/workqueue.c | |
parent | b64d92a8a88e69f711976a3c12c17667ecaba4ee (diff) |
2005-04-25 Paul Jakma <paul.jakma@sun.com>
* workqueue.{c,h}: Helper API for setting up and running queues via
background threads.
* command.c: install the 'show workqueues' command
* memtypes.c: Add work queue mtypes, and a rib-queue type for
a zebra rib work queue.
* memtypes.h: Updated to match memtypes.c
* Makefile.am: Add new workqueue files to build.
Diffstat (limited to 'lib/workqueue.c')
-rw-r--r-- | lib/workqueue.c | 329 |
1 files changed, 329 insertions, 0 deletions
diff --git a/lib/workqueue.c b/lib/workqueue.c new file mode 100644 index 00000000..0c9592d2 --- /dev/null +++ b/lib/workqueue.c @@ -0,0 +1,329 @@ +/* + * Quagga Work Queue Support. + * + * Copyright (C) 2005 Sun Microsystems, Inc. + * + * This file is part of GNU Zebra. + * + * Quagga is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2, or (at your option) any + * later version. + * + * Quagga is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Quagga; see the file COPYING. If not, write to the Free + * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA + * 02111-1307, USA. + */ + +#include <lib/zebra.h> +#include "thread.h" +#include "memory.h" +#include "workqueue.h" +#include "linklist.h" +#include "command.h" +#include "log.h" + +/* master list of work_queues */ +static struct list work_queues; + +#define WORK_QUEUE_MIN_GRANULARITY 1 + +static struct work_queue_item * +work_queue_item_new (struct work_queue *wq) +{ + struct work_queue_item *item; + assert (wq); + + item = XCALLOC (MTYPE_WORK_QUEUE_ITEM, + sizeof (struct work_queue_item)); + + return item; +} + +static void +work_queue_item_free (struct work_queue_item *item) +{ + XFREE (MTYPE_WORK_QUEUE_ITEM, item); + return; +} + +/* create new work queue */ +struct work_queue * +work_queue_new (struct thread_master *m, const char *queue_name) +{ + struct work_queue *new; + + new = XCALLOC (MTYPE_WORK_QUEUE, sizeof (struct work_queue)); + + if (new == NULL) + return new; + + new->name = XSTRDUP (MTYPE_WORK_QUEUE_NAME, queue_name); + new->master = m; + + if ( (new->items = list_new ()) == NULL) + { + if (new->items) + list_free (new->items); + + XFREE (MTYPE_WORK_QUEUE_NAME, new->name); + XFREE (MTYPE_WORK_QUEUE, new); + + return NULL; + } + + new->items->del = (void (*)(void *)) work_queue_item_free; + + listnode_add (&work_queues, new); + + new->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY; + + return new; +} + +void +work_queue_free (struct work_queue *wq) +{ + /* list_delete frees items via callback */ + list_delete (wq->items); + listnode_delete (&work_queues, wq); + + XFREE (MTYPE_WORK_QUEUE_NAME, wq->name); + XFREE (MTYPE_WORK_QUEUE, wq); + return; +} + +void +work_queue_add (struct work_queue *wq, void *data) +{ + struct work_queue_item *item; + + assert (wq); + + if (!(item = work_queue_item_new (wq))) + { + zlog_err ("%s: unable to get new queue item", __func__); + return; + } + + item->data = data; + listnode_add (wq->items, item); + + /* if thread isnt already waiting, add one */ + if (wq->thread == NULL) + wq->thread = thread_add_background (wq->master, work_queue_run, + wq, wq->spec.hold); + + /* XXX: what if we didnt get a thread? try again? */ + + return; +} + +static void +work_queue_item_remove (struct work_queue *wq, struct listnode *ln) +{ + struct work_queue_item *item = listgetdata (ln); + + assert (item && item->data); + + /* call private data deletion callback if needed */ + if (wq->spec.del_item_data) + wq->spec.del_item_data (item->data); + + list_delete_node (wq->items, ln); + work_queue_item_free (item); + + return; +} + +static void +work_queue_item_requeue (struct work_queue *wq, struct listnode *ln) +{ + LISTNODE_DETACH (wq->items, ln); + LISTNODE_ATTACH (wq->items, ln); /* attach to end of list */ +} + +DEFUN(show_work_queues, + show_work_queues_cmd, + "show work-queues", + SHOW_STR + "Work Queue information\n") +{ + struct listnode *node; + struct work_queue *wq; + struct timeval tvnow; + + gettimeofday (&tvnow, NULL); + + vty_out (vty, + "%8s %11s %8s %21s%s", + "List","(ms) ","Q. Runs","Cycle Counts ", + VTY_NEWLINE); + vty_out (vty, + "%8s %5s %5s %8s %7s %6s %6s %s%s", + "Items", + "Delay","Hold", + "Total", + "Best","Gran.","Avg.", + "Name", + VTY_NEWLINE); + + for (ALL_LIST_ELEMENTS_RO ((&work_queues), node, wq)) + { + vty_out (vty,"%8d %5d %5d %8ld %7d %6d %6u %s%s", + listcount (wq->items), + wq->spec.delay, wq->spec.hold, + wq->runs, + wq->cycles.best, wq->cycles.granularity, + (unsigned int)(wq->cycles.total / wq->runs), + wq->name, + VTY_NEWLINE); + } + + return CMD_SUCCESS; +} + +/* timer thread to process a work queue + * will reschedule itself if required, + * otherwise work_queue_item_add + */ +int +work_queue_run (struct thread *thread) +{ + struct work_queue *wq; + struct work_queue_item *item; + wq_item_status ret; + unsigned int cycles = 0; + struct listnode *node, *nnode; + char yielded = 0; + + wq = THREAD_ARG (thread); + wq->thread = NULL; + + assert (wq && wq->items); + + /* calculate cycle granularity: + * list iteration == 1 cycle + * granularity == # cycles between checks whether we should yield. + * + * granularity should be > 0, and can increase slowly after each run to + * provide some hysteris, but not past cycles.best or 2*cycles. + * + * Best: starts low, can only increase + * + * Granularity: starts at WORK_QUEUE_MIN_GRANULARITY, can be decreased if we run to end of time + * slot, can increase otherwise by a small factor. + * + * We could use just the average and save some work, however we want to be + * able to adjust quickly to CPU pressure. Average wont shift much if + * daemon has been running a long time. + */ + if (wq->cycles.granularity == 0) + wq->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY; + + for (ALL_LIST_ELEMENTS (wq->items, node, nnode, item)) + { + assert (item && item->data); + + /* dont run items which are past their allowed retries */ + if (item->retry_count >= wq->spec.max_retries) + { + /* run error handler, if any */ + if (wq->spec.errorfunc) + wq->spec.errorfunc (wq, item->data); + work_queue_item_remove (wq, node); + continue; + } + + /* run and take care of items that want to be retried immediately */ + do + { + ret = wq->spec.workfunc (item->data); + item->retry_count++; + } + while ((ret == WQ_RETRY_NOW) + && (item->retry_count < wq->spec.max_retries)); + + switch (ret) + { + case WQ_RETRY_LATER: + { + item->retry_count++; + goto stats; + } + case WQ_REQUEUE: + { + item->retry_count++; + work_queue_item_requeue (wq, node); + break; + } + case WQ_RETRY_NOW: + case WQ_ERROR: + { + if (wq->spec.errorfunc) + wq->spec.errorfunc (wq, item); + } + /* fall through here is deliberate */ + case WQ_SUCCESS: + default: + { + work_queue_item_remove (wq, node); + break; + } + } + + /* completed cycle */ + cycles++; + + /* test if we should yield */ + if ( !(cycles % wq->cycles.granularity) + && thread_should_yield (thread)) + { + yielded = 1; + goto stats; + } + } + +stats: + +#define WQ_HYSTERIS_FACTOR 2 + + /* we yielded, check whether granularity should be reduced */ + if (yielded && (cycles < wq->cycles.granularity)) + { + wq->cycles.granularity = ((cycles > 0) ? cycles + : WORK_QUEUE_MIN_GRANULARITY); + } + + if (cycles > (wq->cycles.granularity)) + { + if (cycles > wq->cycles.best) + wq->cycles.best = cycles; + + /* along with yielded check, provides hysteris for granularity */ + if (cycles > (wq->cycles.granularity * WQ_HYSTERIS_FACTOR)) + wq->cycles.granularity += WQ_HYSTERIS_FACTOR; + } +#undef WQ_HYSTERIS_FACTOR + + wq->runs++; + wq->cycles.total += cycles; + +#if 0 + printf ("%s: cycles %d, new: best %d, worst %d\n", + __func__, cycles, wq->cycles.best, wq->cycles.granularity); +#endif + + /* Is the queue done yet? */ + if (listcount (wq->items) > 0) + wq->thread = thread_add_background (wq->master, work_queue_run, wq, + wq->spec.delay); + + return 0; +} |