diff options
author | ajs <ajs> | 2005-04-11 15:51:40 +0000 |
---|---|---|
committer | ajs <ajs> | 2005-04-11 15:51:40 +0000 |
commit | 634f9ea20fce82c94407cb677b5487b65bde1973 (patch) | |
tree | 32db82e4ae3b0f409d6d06a0fda401f9d5df5cc9 /lib | |
parent | dfb9a545f8e3500e6a99518193872f526c1f56ba (diff) |
2005-04-11 Andrew J. Schorr <ajschorr@alumni.princeton.edu>
Implement non-blocking zclient I/O with buffering.
* zclient.h (struct zclient): Add two fields to support non-blocking
I/O: struct buffer *wb, and struct thread *t_write.
(zclient_free): Remove function.
(zebra_redistribute_send): Change 2nd arg from socket fd to
struct zclient * (needed to support non-blocking I/O and buffering).
(zclient_send_message): New function to send an arbitrary
message with non-blocking I/O.
* zclient.c (zclient_new): Create write buffer.
(zclient_free): Remove unused function.
(zclient_stop): Must cancel new t_write thread. Also, reset
all buffers: ibuf, obuf, and wb.
(zclient_failed): New helper function for typical error handling.
(zclient_flush_data): New thread to flush queued data.
(zclient_send_message): New function to send the message in
zclient->obuf to zebra using non-blocking I/O and buffering.
(zebra_message_send, zapi_ipv4_route, zapi_ipv6_route): Use
new zclient_send_message function instead of calling writen.
(zclient_start): Set socket non-blocking. Also, change 2nd arg
to zebra_redistribute_send from zclient->sock to zclient.
(zebra_redistribute_send): Change 2nd arg to struct zclient *.
Can now use zclient->obuf to assemble the message instead of
allocating a temporary stream. And call zclient_send_message to
send the message instead of writen.
(zclient_read): Convert to support non-blocking I/O by using
stream_read_try instead of deprecated stream_read.
(zclient_redistribute): Change 2nd arg to zebra_redistribute_send
from zclient->sock to zclient.
* ospf6_zebra.c (ospf6_zebra_redistribute, ospf6_zebra_no_redistribute):
Change 2nd arg to zebra_redistribute_send from zclient->sock
to zclient.
* ospf_zebra.c (ospf_zebra_add): Call zclient_send_message instead
of writen.
* rip_zebra.c (rip_redistribute_set, rip_redistribute_unset,
rip_redistribute_clean): Change 2nd arg to zebra_redistribute_send
from zclient->sock to zclient.
* ripng_zebra.c (ripng_redistribute_unset, ripng_redistribute_clean):
Change 2nd arg to zebra_redistribute_send from zclient->sock
to zclient.
* bgp_zebra.c (bgp_redistribute_set, bgp_redistribute_unset):
The 2nd arg to zebra_redistribute_send is now zclient instead of
zclient->sock.
* isis_zebra.h (isis_zebra_finish): Remove declaration of unused
function.
* isis_zebra.c (isis_zebra_route_add_ipv4): Call zclient_send_message
to send the message to zebra instead of calling writen directly, since
zclient_send_message understands non-blocking I/O and will manage
the buffer queue appropriately.
(isis_zebra_finish): Remove unused function, particularly since
the zclient_free function has been removed.
Diffstat (limited to 'lib')
-rw-r--r-- | lib/ChangeLog | 30 | ||||
-rw-r--r-- | lib/zclient.c | 235 | ||||
-rw-r--r-- | lib/zclient.h | 28 |
3 files changed, 216 insertions, 77 deletions
diff --git a/lib/ChangeLog b/lib/ChangeLog index 97b7fb22..2bacff1e 100644 --- a/lib/ChangeLog +++ b/lib/ChangeLog @@ -1,3 +1,33 @@ +2005-04-11 Andrew J. Schorr <ajschorr@alumni.princeton.edu> + + * zclient.h (struct zclient): Add two fields to support non-blocking + I/O: struct buffer *wb, and struct thread *t_write. + (zclient_free): Remove function. + (zebra_redistribute_send): Change 2nd arg from socket fd to + struct zclient * (needed to support non-blocking I/O and buffering). + (zclient_send_message): New function to send an arbitrary + message with non-blocking I/O. + * zclient.c (zclient_new): Create write buffer. + (zclient_free): Remove unused function. + (zclient_stop): Must cancel new t_write thread. Also, reset + all buffers: ibuf, obuf, and wb. + (zclient_failed): New helper function for typical error handling. + (zclient_flush_data): New thread to flush queued data. + (zclient_send_message): New function to send the message in + zclient->obuf to zebra using non-blocking I/O and buffering. + (zebra_message_send, zapi_ipv4_route, zapi_ipv6_route): Use + new zclient_send_message function instead of calling writen. + (zclient_start): Set socket non-blocking. Also, change 2nd arg + to zebra_redistribute_send from zclient->sock to zclient. + (zebra_redistribute_send): Change 2nd arg to struct zclient *. + Can now use zclient->obuf to assemble the message instead of + allocating a temporary stream. And call zclient_send_message to + send the message instead of writen. + (zclient_read): Convert to support non-blocking I/O by using + stream_read_try instead of deprecated stream_read. + (zclient_redistribute): Change 2nd arg to zebra_redistribute_send + from zclient->sock to zclient. + 2005-04-09 Jeroen Simonetti <jeroens@office.netland.nl> * routemap.c: Show description in "show route-map" output. diff --git a/lib/zclient.c b/lib/zclient.c index efcad57f..cd99b843 100644 --- a/lib/zclient.c +++ b/lib/zclient.c @@ -1,5 +1,6 @@ /* Zebra's client library. * Copyright (C) 1999 Kunihiro Ishiguro + * Copyright (C) 2005 Andrew J. Schorr * * This file is part of GNU Zebra. * @@ -23,6 +24,7 @@ #include "prefix.h" #include "stream.h" +#include "buffer.h" #include "network.h" #include "if.h" #include "log.h" @@ -40,6 +42,8 @@ enum event {ZCLIENT_SCHEDULE, ZCLIENT_READ, ZCLIENT_CONNECT}; /* Prototype for event manager. */ static void zclient_event (enum event, struct zclient *); +extern struct thread_master *master; + /* This file local debug flag. */ int zclient_debug = 0; @@ -53,16 +57,31 @@ zclient_new () zclient->ibuf = stream_new (ZEBRA_MAX_PACKET_SIZ); zclient->obuf = stream_new (ZEBRA_MAX_PACKET_SIZ); + zclient->wb = buffer_new(0); return zclient; } +#if 0 +/* This function is never used. And it must not be used, because + many parts of the code do not check for I/O errors, so they could + reference an invalid pointer if the structure was ever freed. +*/ + /* Free zclient structure. */ void zclient_free (struct zclient *zclient) { + if (zclient->ibuf) + stream_free(zclient->ibuf); + if (zclient->obuf) + stream_free(zclient->obuf); + if (zclient->wb) + buffer_free(zclient->wb); + XFREE (MTYPE_ZCLIENT, zclient); } +#endif /* Initialize zebra client. Argument redist_default is unwanted redistribute route type. */ @@ -104,16 +123,16 @@ zclient_stop (struct zclient *zclient) zlog_debug ("zclient stopped"); /* Stop threads. */ - if (zclient->t_read) - { - thread_cancel (zclient->t_read); - zclient->t_read = NULL; - } - if (zclient->t_connect) - { - thread_cancel (zclient->t_connect); - zclient->t_connect = NULL; - } + THREAD_OFF(zclient->t_read); + THREAD_OFF(zclient->t_connect); + THREAD_OFF(zclient->t_write); + + /* Reset streams. */ + stream_reset(zclient->ibuf); + stream_reset(zclient->obuf); + + /* Empty the write buffer. */ + buffer_reset(zclient->wb); /* Close socket. */ if (zclient->sock >= 0) @@ -133,7 +152,7 @@ zclient_reset (struct zclient *zclient) /* Make socket to zebra daemon. Return zebra socket. */ int -zclient_socket () +zclient_socket(void) { int sock; int ret; @@ -196,8 +215,66 @@ zclient_socket_un (const char *path) return sock; } -/* Send simple Zebra message. */ +static int +zclient_failed(struct zclient *zclient) +{ + zclient->fail++; + zclient_stop(zclient); + zclient_event(ZCLIENT_CONNECT, zclient); + return -1; +} + +static int +zclient_flush_data(struct thread *thread) +{ + struct zclient *zclient = THREAD_ARG(thread); + + zclient->t_write = NULL; + if (zclient->sock < 0) + return -1; + switch (buffer_flush_available(zclient->wb, zclient->sock)) + { + case BUFFER_ERROR: + zlog_warn("%s: buffer_flush_available failed on zclient fd %d, closing", + __func__, zclient->sock); + return zclient_failed(zclient); + break; + case BUFFER_PENDING: + zclient->t_write = thread_add_write(master, zclient_flush_data, + zclient, zclient->sock); + break; + case BUFFER_EMPTY: + break; + } + return 0; +} + int +zclient_send_message(struct zclient *zclient) +{ + if (zclient->sock < 0) + return -1; + switch (buffer_write(zclient->wb, zclient->sock, STREAM_DATA(zclient->obuf), + stream_get_endp(zclient->obuf))) + { + case BUFFER_ERROR: + zlog_warn("%s: buffer_write failed to zclient fd %d, closing", + __func__, zclient->sock); + return zclient_failed(zclient); + break; + case BUFFER_EMPTY: + THREAD_OFF(zclient->t_write); + break; + case BUFFER_PENDING: + THREAD_WRITE_ON(master, zclient->t_write, + zclient_flush_data, zclient, zclient->sock); + break; + } + return 0; +} + +/* Send simple Zebra message. */ +static int zebra_message_send (struct zclient *zclient, int command) { struct stream *s; @@ -210,7 +287,7 @@ zebra_message_send (struct zclient *zclient, int command) stream_putw (s, 3); stream_putc (s, command); - return writen (zclient->sock, s->data, 3); + return zclient_send_message(zclient); } /* Make connection to zebra daemon. */ @@ -249,6 +326,9 @@ zclient_start (struct zclient *zclient) return -1; } + if (set_nonblocking(zclient->sock) < 0) + zlog_warn("%s: set_nonblocking(%d) failed", __func__, zclient->sock); + /* Clear fail count. */ zclient->fail = 0; if (zclient_debug) @@ -266,7 +346,7 @@ zclient_start (struct zclient *zclient) /* Flush all redistribute request. */ for (i = 0; i < ZEBRA_ROUTE_MAX; i++) if (i != zclient->redist_default && zclient->redist[i]) - zebra_redistribute_send (ZEBRA_REDISTRIBUTE_ADD, zclient->sock, i); + zebra_redistribute_send (ZEBRA_REDISTRIBUTE_ADD, zclient, i); /* If default information is needed. */ if (zclient->default_information) @@ -277,7 +357,7 @@ zclient_start (struct zclient *zclient) /* This function is a wrapper function for calling zclient_start from timer or event thread. */ -int +static int zclient_connect (struct thread *t) { struct zclient *zclient; @@ -394,7 +474,7 @@ zapi_ipv4_route (u_char cmd, struct zclient *zclient, struct prefix_ipv4 *p, /* Put length at the first point of the stream. */ stream_putw_at (s, 0, stream_get_endp (s)); - return writen (zclient->sock, s->data, stream_get_endp (s)); + return zclient_send_message(zclient); } #ifdef HAVE_IPV6 @@ -449,7 +529,7 @@ zapi_ipv6_route (u_char cmd, struct zclient *zclient, struct prefix_ipv6 *p, /* Put length at the first point of the stream. */ stream_putw_at (s, 0, stream_get_endp (s)); - return writen (zclient->sock, s->data, stream_get_endp (s)); + return zclient_send_message(zclient); } #endif /* HAVE_IPV6 */ @@ -460,24 +540,20 @@ zapi_ipv6_route (u_char cmd, struct zclient *zclient, struct prefix_ipv6 *p, * sending client */ int -zebra_redistribute_send (int command, int sock, int type) +zebra_redistribute_send (int command, struct zclient *zclient, int type) { - int ret; struct stream *s; - s = stream_new (ZEBRA_MAX_PACKET_SIZ); + s = zclient->obuf; + stream_reset(s); - /* Total length of the messages. */ + /* Total length of the message. */ stream_putw (s, 4); stream_putc (s, command); stream_putc (s, type); - ret = writen (sock, s->data, 4); - - stream_free (s); - - return ret; + return zclient_send_message(zclient); } /* Router-id update from zebra daemon. */ @@ -715,72 +791,87 @@ zebra_interface_address_read (int type, struct stream *s) /* Zebra client message read function. */ -int +static int zclient_read (struct thread *thread) { int ret; - int nbytes; - int sock; + size_t already; zebra_size_t length; zebra_command_t command; struct zclient *zclient; /* Get socket to zebra. */ - sock = THREAD_FD (thread); zclient = THREAD_ARG (thread); zclient->t_read = NULL; - /* Clear input buffer. */ - stream_reset (zclient->ibuf); - - /* Read zebra header. */ - nbytes = stream_read (zclient->ibuf, sock, ZEBRA_HEADER_SIZE); - - /* zebra socket is closed. */ - if (nbytes == 0) + /* Read zebra header (if we don't have it already). */ + if ((already = stream_get_endp(zclient->ibuf)) < ZEBRA_HEADER_SIZE) { - if (zclient_debug) - zlog_debug ("zclient connection closed socket [%d].", sock); - zclient->fail++; - zclient_stop (zclient); - zclient_event (ZCLIENT_CONNECT, zclient); - return -1; + ssize_t nbyte; + if (((nbyte = stream_read_try(zclient->ibuf, zclient->sock, + ZEBRA_HEADER_SIZE-already)) == 0) || + (nbyte == -1)) + { + if (zclient_debug) + zlog_debug ("zclient connection closed socket [%d].", zclient->sock); + return zclient_failed(zclient); + } + if (nbyte != (ssize_t)(ZEBRA_HEADER_SIZE-already)) + { + /* Try again later. */ + zclient_event (ZCLIENT_READ, zclient); + return 0; + } + already = ZEBRA_HEADER_SIZE; } - /* zebra read error. */ - if (nbytes < 0 || nbytes != ZEBRA_HEADER_SIZE) - { - if (zclient_debug) - zlog_debug ("Can't read all packet (length %d).", nbytes); - zclient->fail++; - zclient_stop (zclient); - zclient_event (ZCLIENT_CONNECT, zclient); - return -1; - } + /* Reset to read from the beginning of the incoming packet. */ + stream_set_getp(zclient->ibuf, 0); /* Fetch length and command. */ length = stream_getw (zclient->ibuf); command = stream_getc (zclient->ibuf); + if (length < ZEBRA_HEADER_SIZE) + { + zlog_err("%s: socket %d message length %u is less than %d ", + __func__, zclient->sock, length, ZEBRA_HEADER_SIZE); + return zclient_failed(zclient); + } + /* Length check. */ - if (length >= zclient->ibuf->size) + if (length > STREAM_SIZE(zclient->ibuf)) { + struct stream *ns; + zlog_warn("%s: message size %u exceeds buffer size %lu, expanding...", + __func__, length, (u_long)STREAM_SIZE(zclient->ibuf)); + ns = stream_new(length); + stream_copy(ns, zclient->ibuf); stream_free (zclient->ibuf); - zclient->ibuf = stream_new (length + 1); + zclient->ibuf = ns; } - length -= ZEBRA_HEADER_SIZE; /* Read rest of zebra packet. */ - nbytes = stream_read (zclient->ibuf, sock, length); - if (nbytes != length) - { - if (zclient_debug) - zlog_debug ("zclient connection closed socket [%d].", sock); - zclient->fail++; - zclient_stop (zclient); - zclient_event (ZCLIENT_CONNECT, zclient); - return -1; - } + if (already < length) + { + ssize_t nbyte; + if (((nbyte = stream_read_try(zclient->ibuf, zclient->sock, + length-already)) == 0) || + (nbyte == -1)) + { + if (zclient_debug) + zlog_debug("zclient connection closed socket [%d].", zclient->sock); + return zclient_failed(zclient); + } + if (nbyte != (ssize_t)(length-already)) + { + /* Try again later. */ + zclient_event (ZCLIENT_READ, zclient); + return 0; + } + } + + length -= ZEBRA_HEADER_SIZE; if (zclient_debug) zlog_debug("zclient 0x%p command 0x%x \n", zclient, command); @@ -835,7 +926,12 @@ zclient_read (struct thread *thread) break; } + if (zclient->sock < 0) + /* Connection was closed during packet processing. */ + return -1; + /* Register read thread. */ + stream_reset(zclient->ibuf); zclient_event (ZCLIENT_READ, zclient); return 0; @@ -859,7 +955,7 @@ zclient_redistribute (int command, struct zclient *zclient, int type) } if (zclient->sock > 0) - zebra_redistribute_send (command, zclient->sock, type); + zebra_redistribute_send (command, zclient, type); } @@ -884,9 +980,6 @@ zclient_redistribute_default (int command, struct zclient *zclient) zebra_message_send (zclient, command); } - -extern struct thread_master *master; - static void zclient_event (enum event event, struct zclient *zclient) { diff --git a/lib/zclient.h b/lib/zclient.h index d18cf628..aa9f75c2 100644 --- a/lib/zclient.h +++ b/lib/zclient.h @@ -50,10 +50,16 @@ struct zclient /* Output buffer for zebra message. */ struct stream *obuf; + /* Buffer of data waiting to be written to zebra. */ + struct buffer *wb; + /* Read and connect thread. */ struct thread *t_read; struct thread *t_connect; + /* Thread to write buffered data to zebra. */ + struct thread *t_write; + /* Redistribute information. */ u_char redist_default; u_char redist[ZEBRA_ROUTE_MAX]; @@ -103,19 +109,29 @@ struct zapi_ipv4 /* Prototypes of zebra client service functions. */ struct zclient *zclient_new (void); -void zclient_free (struct zclient *); void zclient_init (struct zclient *, int); int zclient_start (struct zclient *); void zclient_stop (struct zclient *); void zclient_reset (struct zclient *); -int zclient_socket (); + +/* Get TCP socket connection to zebra daemon at loopback address. */ +int zclient_socket (void); + +/* Get unix stream socket connection to zebra daemon at given path. */ int zclient_socket_un (const char *); -void zclient_redistribute (int, struct zclient *, int); -void zclient_redistribute_default (int, struct zclient *); +/* Send redistribute command to zebra daemon. Do not update zclient state. */ +int zebra_redistribute_send (int command, struct zclient *, int type); + +/* If state has changed, update state and call zebra_redistribute_send. */ +void zclient_redistribute (int command, struct zclient *, int type); + +/* If state has changed, update state and send the command to zebra. */ +void zclient_redistribute_default (int command, struct zclient *); -/* struct zebra *zebra_new (); */ -int zebra_redistribute_send (int, int, int); +/* Send the message in zclient->obuf to the zebra daemon (or enqueue it). + Returns 0 for success or -1 on an I/O error. */ +extern int zclient_send_message(struct zclient *); struct interface *zebra_interface_add_read (struct stream *); struct interface *zebra_interface_state_read (struct stream *s); |