diff options
-rw-r--r-- | lib/Makefile.am | 5 | ||||
-rw-r--r-- | lib/command.c | 9 | ||||
-rw-r--r-- | lib/memtypes.c | 6 | ||||
-rw-r--r-- | lib/memtypes.h | 5 | ||||
-rw-r--r-- | lib/workqueue.c | 329 | ||||
-rw-r--r-- | lib/workqueue.h | 91 |
6 files changed, 439 insertions, 6 deletions
diff --git a/lib/Makefile.am b/lib/Makefile.am index 03ca5e12..3763c8b4 100644 --- a/lib/Makefile.am +++ b/lib/Makefile.am @@ -12,7 +12,7 @@ libzebra_la_SOURCES = \ sockunion.c prefix.c thread.c if.c memory.c buffer.c table.c hash.c \ filter.c routemap.c distribute.c stream.c str.c log.c plist.c \ zclient.c sockopt.c smux.c md5.c if_rmap.c keychain.c privs.c \ - sigevent.c pqueue.c jhash.c memtypes.c + sigevent.c pqueue.c jhash.c memtypes.c workqueue.c BUILT_SOURCES = memtypes.h @@ -25,7 +25,8 @@ pkginclude_HEADERS = \ memory.h network.h prefix.h routemap.h distribute.h sockunion.h \ str.h stream.h table.h thread.h vector.h version.h vty.h zebra.h \ plist.h zclient.h sockopt.h smux.h md5-gnu.h if_rmap.h keychain.h \ - privs.h sigevent.h pqueue.h jhash.h zassert.h memtypes.h + privs.h sigevent.h pqueue.h jhash.h zassert.h memtypes.h \ + privs.h sigevent.h pqueue.h jhash.h zassert.h memtypes.h workqueue.h EXTRA_DIST = regex.c regex-gnu.h memtypes.awk diff --git a/lib/command.c b/lib/command.c index 64118103..9b5f75f2 100644 --- a/lib/command.c +++ b/lib/command.c @@ -1,5 +1,5 @@ /* - $Id: command.c,v 1.46 2005/03/14 20:19:01 paul Exp $ + $Id: command.c,v 1.47 2005/04/25 16:26:42 paul Exp $ Command interpreter routine for virtual terminal [aka TeletYpe] Copyright (C) 1997, 98, 99 Kunihiro Ishiguro @@ -31,6 +31,7 @@ Boston, MA 02111-1307, USA. */ #include "vector.h" #include "vty.h" #include "command.h" +#include "workqueue.h" /* Command vector which includes some level of command lists. Normally each daemon maintains each own cmdvec. */ @@ -3578,8 +3579,10 @@ cmd_init (int terminal) install_element (CONFIG_NODE, &service_terminal_length_cmd); install_element (CONFIG_NODE, &no_service_terminal_length_cmd); - install_element(VIEW_NODE, &show_thread_cpu_cmd); - install_element(ENABLE_NODE, &show_thread_cpu_cmd); + install_element (VIEW_NODE, &show_thread_cpu_cmd); + install_element (ENABLE_NODE, &show_thread_cpu_cmd); + install_element (VIEW_NODE, &show_work_queues_cmd); + install_element (ENABLE_NODE, &show_work_queues_cmd); } srand(time(NULL)); } diff --git a/lib/memtypes.c b/lib/memtypes.c index 7caa42a1..7865f544 100644 --- a/lib/memtypes.c +++ b/lib/memtypes.c @@ -6,7 +6,7 @@ * The script is sensitive to the format (though not whitespace), see * the top of memtypes.awk for more details. * - * $Id: memtypes.c,v 1.3 2005/04/25 14:02:44 paul Exp $ + * $Id: memtypes.c,v 1.4 2005/04/25 16:26:43 paul Exp $ */ #include "zebra.h" @@ -64,6 +64,9 @@ struct memory_list memory_list_lib[] = { MTYPE_PRIVS, "Privilege information" }, { MTYPE_ZLOG, "Logging" }, { MTYPE_ZCLIENT, "Zclient" }, + { MTYPE_WORK_QUEUE, "Work queue" }, + { MTYPE_WORK_QUEUE_ITEM, "Work queue item" }, + { MTYPE_WORK_QUEUE_NAME, "Work queue name string" }, { -1, NULL }, }; @@ -74,6 +77,7 @@ struct memory_list memory_list_zebra[] = { MTYPE_VRF_NAME, "VRF name" }, { MTYPE_NEXTHOP, "Nexthop" }, { MTYPE_RIB, "RIB" }, + { MTYPE_RIB_QUEUE, "RIB process work queue" }, { MTYPE_STATIC_IPV4, "Static IPv4 route" }, { MTYPE_STATIC_IPV6, "Static IPv6 route" }, { -1, NULL }, diff --git a/lib/memtypes.h b/lib/memtypes.h index 2d843c5b..b1ca6f6a 100644 --- a/lib/memtypes.h +++ b/lib/memtypes.h @@ -56,11 +56,16 @@ enum MTYPE_PRIVS, MTYPE_ZLOG, MTYPE_ZCLIENT, + MTYPE_WORK_QUEUE, + MTYPE_WORK_QUEUE_ITEM, + MTYPE_WORK_QUEUE_NAME, + MTYPE_WORK_QUEUE_SPEC, MTYPE_RTADV_PREFIX, MTYPE_VRF, MTYPE_VRF_NAME, MTYPE_NEXTHOP, MTYPE_RIB, + MTYPE_RIB_QUEUE, MTYPE_STATIC_IPV4, MTYPE_STATIC_IPV6, MTYPE_BGP, diff --git a/lib/workqueue.c b/lib/workqueue.c new file mode 100644 index 00000000..0c9592d2 --- /dev/null +++ b/lib/workqueue.c @@ -0,0 +1,329 @@ +/* + * Quagga Work Queue Support. + * + * Copyright (C) 2005 Sun Microsystems, Inc. + * + * This file is part of GNU Zebra. + * + * Quagga is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2, or (at your option) any + * later version. + * + * Quagga is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Quagga; see the file COPYING. If not, write to the Free + * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA + * 02111-1307, USA. + */ + +#include <lib/zebra.h> +#include "thread.h" +#include "memory.h" +#include "workqueue.h" +#include "linklist.h" +#include "command.h" +#include "log.h" + +/* master list of work_queues */ +static struct list work_queues; + +#define WORK_QUEUE_MIN_GRANULARITY 1 + +static struct work_queue_item * +work_queue_item_new (struct work_queue *wq) +{ + struct work_queue_item *item; + assert (wq); + + item = XCALLOC (MTYPE_WORK_QUEUE_ITEM, + sizeof (struct work_queue_item)); + + return item; +} + +static void +work_queue_item_free (struct work_queue_item *item) +{ + XFREE (MTYPE_WORK_QUEUE_ITEM, item); + return; +} + +/* create new work queue */ +struct work_queue * +work_queue_new (struct thread_master *m, const char *queue_name) +{ + struct work_queue *new; + + new = XCALLOC (MTYPE_WORK_QUEUE, sizeof (struct work_queue)); + + if (new == NULL) + return new; + + new->name = XSTRDUP (MTYPE_WORK_QUEUE_NAME, queue_name); + new->master = m; + + if ( (new->items = list_new ()) == NULL) + { + if (new->items) + list_free (new->items); + + XFREE (MTYPE_WORK_QUEUE_NAME, new->name); + XFREE (MTYPE_WORK_QUEUE, new); + + return NULL; + } + + new->items->del = (void (*)(void *)) work_queue_item_free; + + listnode_add (&work_queues, new); + + new->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY; + + return new; +} + +void +work_queue_free (struct work_queue *wq) +{ + /* list_delete frees items via callback */ + list_delete (wq->items); + listnode_delete (&work_queues, wq); + + XFREE (MTYPE_WORK_QUEUE_NAME, wq->name); + XFREE (MTYPE_WORK_QUEUE, wq); + return; +} + +void +work_queue_add (struct work_queue *wq, void *data) +{ + struct work_queue_item *item; + + assert (wq); + + if (!(item = work_queue_item_new (wq))) + { + zlog_err ("%s: unable to get new queue item", __func__); + return; + } + + item->data = data; + listnode_add (wq->items, item); + + /* if thread isnt already waiting, add one */ + if (wq->thread == NULL) + wq->thread = thread_add_background (wq->master, work_queue_run, + wq, wq->spec.hold); + + /* XXX: what if we didnt get a thread? try again? */ + + return; +} + +static void +work_queue_item_remove (struct work_queue *wq, struct listnode *ln) +{ + struct work_queue_item *item = listgetdata (ln); + + assert (item && item->data); + + /* call private data deletion callback if needed */ + if (wq->spec.del_item_data) + wq->spec.del_item_data (item->data); + + list_delete_node (wq->items, ln); + work_queue_item_free (item); + + return; +} + +static void +work_queue_item_requeue (struct work_queue *wq, struct listnode *ln) +{ + LISTNODE_DETACH (wq->items, ln); + LISTNODE_ATTACH (wq->items, ln); /* attach to end of list */ +} + +DEFUN(show_work_queues, + show_work_queues_cmd, + "show work-queues", + SHOW_STR + "Work Queue information\n") +{ + struct listnode *node; + struct work_queue *wq; + struct timeval tvnow; + + gettimeofday (&tvnow, NULL); + + vty_out (vty, + "%8s %11s %8s %21s%s", + "List","(ms) ","Q. Runs","Cycle Counts ", + VTY_NEWLINE); + vty_out (vty, + "%8s %5s %5s %8s %7s %6s %6s %s%s", + "Items", + "Delay","Hold", + "Total", + "Best","Gran.","Avg.", + "Name", + VTY_NEWLINE); + + for (ALL_LIST_ELEMENTS_RO ((&work_queues), node, wq)) + { + vty_out (vty,"%8d %5d %5d %8ld %7d %6d %6u %s%s", + listcount (wq->items), + wq->spec.delay, wq->spec.hold, + wq->runs, + wq->cycles.best, wq->cycles.granularity, + (unsigned int)(wq->cycles.total / wq->runs), + wq->name, + VTY_NEWLINE); + } + + return CMD_SUCCESS; +} + +/* timer thread to process a work queue + * will reschedule itself if required, + * otherwise work_queue_item_add + */ +int +work_queue_run (struct thread *thread) +{ + struct work_queue *wq; + struct work_queue_item *item; + wq_item_status ret; + unsigned int cycles = 0; + struct listnode *node, *nnode; + char yielded = 0; + + wq = THREAD_ARG (thread); + wq->thread = NULL; + + assert (wq && wq->items); + + /* calculate cycle granularity: + * list iteration == 1 cycle + * granularity == # cycles between checks whether we should yield. + * + * granularity should be > 0, and can increase slowly after each run to + * provide some hysteris, but not past cycles.best or 2*cycles. + * + * Best: starts low, can only increase + * + * Granularity: starts at WORK_QUEUE_MIN_GRANULARITY, can be decreased if we run to end of time + * slot, can increase otherwise by a small factor. + * + * We could use just the average and save some work, however we want to be + * able to adjust quickly to CPU pressure. Average wont shift much if + * daemon has been running a long time. + */ + if (wq->cycles.granularity == 0) + wq->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY; + + for (ALL_LIST_ELEMENTS (wq->items, node, nnode, item)) + { + assert (item && item->data); + + /* dont run items which are past their allowed retries */ + if (item->retry_count >= wq->spec.max_retries) + { + /* run error handler, if any */ + if (wq->spec.errorfunc) + wq->spec.errorfunc (wq, item->data); + work_queue_item_remove (wq, node); + continue; + } + + /* run and take care of items that want to be retried immediately */ + do + { + ret = wq->spec.workfunc (item->data); + item->retry_count++; + } + while ((ret == WQ_RETRY_NOW) + && (item->retry_count < wq->spec.max_retries)); + + switch (ret) + { + case WQ_RETRY_LATER: + { + item->retry_count++; + goto stats; + } + case WQ_REQUEUE: + { + item->retry_count++; + work_queue_item_requeue (wq, node); + break; + } + case WQ_RETRY_NOW: + case WQ_ERROR: + { + if (wq->spec.errorfunc) + wq->spec.errorfunc (wq, item); + } + /* fall through here is deliberate */ + case WQ_SUCCESS: + default: + { + work_queue_item_remove (wq, node); + break; + } + } + + /* completed cycle */ + cycles++; + + /* test if we should yield */ + if ( !(cycles % wq->cycles.granularity) + && thread_should_yield (thread)) + { + yielded = 1; + goto stats; + } + } + +stats: + +#define WQ_HYSTERIS_FACTOR 2 + + /* we yielded, check whether granularity should be reduced */ + if (yielded && (cycles < wq->cycles.granularity)) + { + wq->cycles.granularity = ((cycles > 0) ? cycles + : WORK_QUEUE_MIN_GRANULARITY); + } + + if (cycles > (wq->cycles.granularity)) + { + if (cycles > wq->cycles.best) + wq->cycles.best = cycles; + + /* along with yielded check, provides hysteris for granularity */ + if (cycles > (wq->cycles.granularity * WQ_HYSTERIS_FACTOR)) + wq->cycles.granularity += WQ_HYSTERIS_FACTOR; + } +#undef WQ_HYSTERIS_FACTOR + + wq->runs++; + wq->cycles.total += cycles; + +#if 0 + printf ("%s: cycles %d, new: best %d, worst %d\n", + __func__, cycles, wq->cycles.best, wq->cycles.granularity); +#endif + + /* Is the queue done yet? */ + if (listcount (wq->items) > 0) + wq->thread = thread_add_background (wq->master, work_queue_run, wq, + wq->spec.delay); + + return 0; +} diff --git a/lib/workqueue.h b/lib/workqueue.h new file mode 100644 index 00000000..45fffe7b --- /dev/null +++ b/lib/workqueue.h @@ -0,0 +1,91 @@ +/* + * Quagga Work Queues. + * + * Copyright (C) 2005 Sun Microsystems, Inc. + * + * This file is part of Quagga. + * + * Quagga is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2, or (at your option) any + * later version. + * + * Quagga is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Quagga; see the file COPYING. If not, write to the Free + * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA + * 02111-1307, USA. + */ + +#ifndef _QUAGGA_WORK_QUEUE_H +#define _QUAGGA_WORK_QUEUE_H + +/* Work queue default hold and cycle times - millisec */ +#define WORK_QUEUE_DEFAULT_HOLD 50 /* hold time for initial run of a queue */ +#define WORK_QUEUE_DEFAULT_DELAY 10 /* minimum delay between queue runs */ + +/* action value, for use by item processor and item error handlers */ +typedef enum +{ + WQ_SUCCESS = 0, + WQ_ERROR, /* Error, run error handler if provided */ + WQ_RETRY_NOW, /* retry immediately */ + WQ_RETRY_LATER, /* retry later, cease processing work queue */ + WQ_REQUEUE /* requeue item, continue processing work queue */ +} wq_item_status; + +/* A single work queue item, unsurprisingly */ +struct work_queue_item +{ + void *data; /* opaque data */ + unsigned short retry_count; /* number of times retried */ +}; + +struct work_queue +{ + struct thread_master *master; /* thread master */ + struct thread *thread; /* thread, if one is active */ + char *name; /* work queue name */ + + /* specification for this work queue */ + struct { + /* work function to process items with */ + wq_item_status (*workfunc) (void *); + + /* error handling function, optional */ + void (*errorfunc) (struct work_queue *, struct work_queue_item *); + + /* callback to delete user specific item data */ + void (*del_item_data) (void *); + + /* max number of retries to make for item that errors */ + unsigned int max_retries; + + unsigned int hold; /* hold time for first run, in ms */ + unsigned int delay; /* min delay between queue runs, in ms */ + } spec; + + /* remaining fields should be opaque to users */ + struct list *items; /* queue item list */ + unsigned long runs; /* runs count */ + + struct { + unsigned int best; + unsigned int granularity; + unsigned long total; + } cycles; /* cycle counts */ +}; + +/* User API */ +struct work_queue *work_queue_new (struct thread_master *, const char *); +void work_queue_free (struct work_queue *); +void work_queue_add (struct work_queue *, void *); + +/* Helpers, exported for thread.c and command.c */ +int work_queue_run (struct thread *); +extern struct cmd_element show_work_queues_cmd; +#endif /* _QUAGGA_WORK_QUEUE_H */ |