diff options
author | paul <paul> | 2005-02-14 23:47:47 +0000 |
---|---|---|
committer | paul <paul> | 2005-02-14 23:47:47 +0000 |
commit | 050c013ac35337d86b03f140fb17d2e8e33a8baa (patch) | |
tree | 7241e3972e2116f67b762a37305c31ee7d8d9671 | |
parent | f2e6c429375adf0d3c5deaa409734d5d41ac15ce (diff) |
2005-02-14 Paul Jakma <paul.jakma@sun.com>
* stream.h: Unsigned long updated to size_t
* stream.c: ditto
* stream.h: Add stream_copy, stream_dup, stream_recvmsg.
Add comment describing struct stream abstraction, and various
other comments.
Deprecate several unsafe/ambigious macros.
Add STREAM_WRITEABLE and STREAM_READABLE.
Add (stream_getl_from) for symmetry.
Update stream_forward_{endp,getp} to use size_t offset.
Make stream data a 0 length array, rather than a seperate malloc.
* stream.c: Add consistency checks. Update to follow stream.h
changes.
(stream_new) Alloc stream+data in one go.
(stream_copy) new function, copy a stream.
(stream_dup) new function, dup a stream.
(stream_recvmsg) new function, recvmsg data into a stream.
(stream_empty) no need to check getp == 0.
-rw-r--r-- | lib/ChangeLog | 15 | ||||
-rw-r--r-- | lib/stream.c | 434 | ||||
-rw-r--r-- | lib/stream.h | 112 |
3 files changed, 486 insertions, 75 deletions
diff --git a/lib/ChangeLog b/lib/ChangeLog index 2459f927..4ce92d9b 100644 --- a/lib/ChangeLog +++ b/lib/ChangeLog @@ -2,6 +2,21 @@ * stream.h: Unsigned long updated to size_t * stream.c: ditto + * stream.h: Add stream_copy, stream_dup, stream_recvmsg. + Add comment describing struct stream abstraction, and various + other comments. + Deprecate several unsafe/ambigious macros. + Add STREAM_WRITEABLE and STREAM_READABLE. + Add (stream_getl_from) for symmetry. + Update stream_forward_{endp,getp} to use size_t offset. + Make stream data a 0 length array, rather than a seperate malloc. + * stream.c: Add consistency checks. Update to follow stream.h + changes. + (stream_new) Alloc stream+data in one go. + (stream_copy) new function, copy a stream. + (stream_dup) new function, dup a stream. + (stream_recvmsg) new function, recvmsg data into a stream. + (stream_empty) no need to check getp == 0. 2005-02-09 Paul Jakma <paul.jakma@sun.com> diff --git a/lib/stream.c b/lib/stream.c index ebeea333..e15d0426 100644 --- a/lib/stream.c +++ b/lib/stream.c @@ -1,4 +1,4 @@ -/* + /* * Packet interface * Copyright (C) 1999 Kunihiro Ishiguro * @@ -20,25 +20,70 @@ * 02111-1307, USA. */ +#include <stddef.h> #include <zebra.h> #include "stream.h" #include "memory.h" #include "network.h" #include "prefix.h" +#include "log.h" +/* Tests whether a position is valid */ +#define GETP_VALID(S,G) \ + ((G) <= (S)->endp) +#define PUT_AT_VALID(S,G) GETP_VALID(S,G) +#define ENDP_VALID(S,E) \ + ((E) <= (S)->size) -/*A macro to check pointers in order to not - go behind the allocated mem block - S -- stream reference - Z -- size of data to be written -*/ - +/* asserting sanity checks. Following must be true before + * stream functions are called: + * + * Following must always be true of stream elements + * before and after calls to stream functions: + * + * getp <= endp <= size + * + * Note that after a stream function is called following may be true: + * if (getp == endp) then stream is no longer readable + * if (endp == size) then stream is no longer writeable + * + * It is valid to put to anywhere within the size of the stream, but only + * using stream_put..._at() functions. + */ +#define STREAM_WARN_OFFSETS(S) \ + zlog_warn ("&(struct stream): %p, size: %lu, endp: %lu, getp: %lu\n", \ + (S), \ + (unsigned long) (S)->size, \ + (unsigned long) (S)->getp, \ + (unsigned long) (S)->endp)\ + +#define STREAM_VERIFY_SANE(S) \ + do { \ + if ( !(GETP_VALID(S, (S)->getp)) && ENDP_VALID(S, (S)->endp) ) \ + STREAM_WARN_OFFSETS(S); \ + assert ( GETP_VALID(S, (S)->getp) ); \ + assert ( ENDP_VALID(S, (S)->endp) ); \ + } while (0) + +#define STREAM_BOUND_WARN(S, WHAT) \ + do { \ + zlog_warn ("%s: Attempt to %s out of bounds", __func__, (WHAT)); \ + STREAM_WARN_OFFSETS(S); \ + assert (0); \ + } while (0) + +/* XXX: Deprecated macro: do not use */ #define CHECK_SIZE(S, Z) \ - if (((S)->endp + (Z)) > (S)->size) \ - (Z) = (S)->size - (S)->endp; - -/* Stream is fixed length buffer for network output/input. */ + do { \ + if (((S)->endp + (Z)) > (S)->size) \ + { \ + zlog_warn ("CHECK_SIZE: truncating requested size %lu\n", \ + (unsigned long) (Z)); \ + STREAM_WARN_OFFSETS(S); \ + (Z) = (S)->size - (S)->endp; \ + } \ + } while (0); /* Make stream buffer. */ struct stream * @@ -49,11 +94,16 @@ stream_new (size_t size) assert (size > 0); if (size == 0) - return NULL; + { + zlog_warn ("stream_new(): called with 0 size!"); + return NULL; + } - s = XCALLOC (MTYPE_STREAM, sizeof (struct stream)); + s = XCALLOC (MTYPE_STREAM, offsetof(struct stream, size)); - s->data = XCALLOC (MTYPE_STREAM_DATA, size); + if (s == NULL) + return s; + s->size = size; return s; } @@ -62,25 +112,56 @@ stream_new (size_t size) void stream_free (struct stream *s) { - XFREE (MTYPE_STREAM_DATA, s->data); XFREE (MTYPE_STREAM, s); } + +struct stream * +stream_copy (struct stream *new, struct stream *src) +{ + STREAM_VERIFY_SANE (src); + + assert (new != NULL); + assert (STREAM_SIZE(new) >= src->endp); + + new->endp = src->endp; + new->getp = src->getp; + + memcpy (new->data, src->data, src->endp); + + return new; +} + +struct stream * +stream_dup (struct stream *s) +{ + struct stream *new; + + STREAM_VERIFY_SANE (s); + + if ( (new = stream_new (s->endp)) == NULL) + return NULL; + + return (stream_copy (new, s)); +} size_t stream_get_getp (struct stream *s) { + STREAM_VERIFY_SANE(s); return s->getp; } size_t stream_get_endp (struct stream *s) { + STREAM_VERIFY_SANE(s); return s->endp; } size_t stream_get_size (struct stream *s) { + STREAM_VERIFY_SANE(s); return s->size; } @@ -88,19 +169,43 @@ stream_get_size (struct stream *s) void stream_set_getp (struct stream *s, size_t pos) { + STREAM_VERIFY_SANE(s); + + if (!GETP_VALID (s, pos)) + { + STREAM_BOUND_WARN (s, "set getp"); + pos = s->endp; + } + s->getp = pos; } /* Forward pointer. */ void -stream_forward_getp (struct stream *s, int size) +stream_forward_getp (struct stream *s, size_t size) { + STREAM_VERIFY_SANE(s); + + if (!GETP_VALID (s, s->getp + size)) + { + STREAM_BOUND_WARN (s, "seek getp"); + return; + } + s->getp += size; } void -stream_forward_endp (struct stream *s, int size) +stream_forward_endp (struct stream *s, size_t size) { + STREAM_VERIFY_SANE(s); + + if (!ENDP_VALID (s, s->endp + size)) + { + STREAM_BOUND_WARN (s, "seek endp"); + return; + } + s->endp += size; } @@ -108,6 +213,14 @@ stream_forward_endp (struct stream *s, int size) void stream_get (void *dst, struct stream *s, size_t size) { + STREAM_VERIFY_SANE(s); + + if (STREAM_READABLE(s) < size) + { + STREAM_BOUND_WARN (s, "get"); + return; + } + memcpy (dst, s->data + s->getp, size); s->getp += size; } @@ -117,9 +230,16 @@ u_char stream_getc (struct stream *s) { u_char c; + + STREAM_VERIFY_SANE (s); - c = s->data[s->getp]; - s->getp++; + if (STREAM_READABLE(s) < sizeof (u_char)) + { + STREAM_BOUND_WARN (s, "get char"); + return 0; + } + c = s->data[s->getp++]; + return c; } @@ -129,7 +249,16 @@ stream_getc_from (struct stream *s, size_t from) { u_char c; + STREAM_VERIFY_SANE(s); + + if (!GETP_VALID (s, from + sizeof (u_char))) + { + STREAM_BOUND_WARN (s, "get char"); + return 0; + } + c = s->data[from]; + return c; } @@ -139,8 +268,17 @@ stream_getw (struct stream *s) { u_int16_t w; + STREAM_VERIFY_SANE (s); + + if (STREAM_READABLE (s) < sizeof (u_int16_t)) + { + STREAM_BOUND_WARN (s, "get "); + return 0; + } + w = s->data[s->getp++] << 8; w |= s->data[s->getp++]; + return w; } @@ -150,43 +288,102 @@ stream_getw_from (struct stream *s, size_t from) { u_int16_t w; + STREAM_VERIFY_SANE(s); + + if (!GETP_VALID (s, from + sizeof (u_int16_t))) + { + STREAM_BOUND_WARN (s, "get "); + return 0; + } + w = s->data[from++] << 8; w |= s->data[from]; + return w; } /* Get next long word from the stream. */ u_int32_t +stream_getl_from (struct stream *s, size_t from) +{ + u_int32_t l; + + STREAM_VERIFY_SANE(s); + + if (!GETP_VALID (s, from + sizeof (u_int32_t))) + { + STREAM_BOUND_WARN (s, "get long"); + return 0; + } + + l = s->data[from++] << 24; + l |= s->data[from++] << 16; + l |= s->data[from++] << 8; + l |= s->data[from]; + + return l; +} + +u_int32_t stream_getl (struct stream *s) { u_int32_t l; + STREAM_VERIFY_SANE(s); + + if (STREAM_READABLE (s) < sizeof (u_int32_t)) + { + STREAM_BOUND_WARN (s, "get long"); + return 0; + } + l = s->data[s->getp++] << 24; l |= s->data[s->getp++] << 16; l |= s->data[s->getp++] << 8; l |= s->data[s->getp++]; + return l; } - /* Get next long word from the stream. */ u_int32_t stream_get_ipv4 (struct stream *s) { u_int32_t l; - memcpy (&l, s->data + s->getp, 4); - s->getp += 4; + STREAM_VERIFY_SANE(s); + + if (STREAM_READABLE (s) < sizeof(u_int32_t)) + { + STREAM_BOUND_WARN (s, "get ipv4"); + return 0; + } + + memcpy (&l, s->data + s->getp, sizeof(u_int32_t)); + s->getp += sizeof(u_int32_t); return l; } -/* Copy to source to stream. */ +/* Copy to source to stream. + * + * XXX: This uses CHECK_SIZE and hence has funny semantics -> Size will wrap + * around. This should be fixed once the stream updates are working. + */ void stream_put (struct stream *s, void *src, size_t size) { + /* XXX: CHECK_SIZE has strange semantics. It should be deprecated */ CHECK_SIZE(s, size); - + + STREAM_VERIFY_SANE(s); + + if (STREAM_WRITEABLE (s) < size) + { + STREAM_BOUND_WARN (s, "put"); + return; + } + if (src) memcpy (s->data + s->endp, src, size); else @@ -199,20 +396,30 @@ stream_put (struct stream *s, void *src, size_t size) int stream_putc (struct stream *s, u_char c) { - if (s->endp >= s->size) return 0; - - s->data[s->endp] = c; - s->endp++; - - return 1; + STREAM_VERIFY_SANE(s); + + if (STREAM_WRITEABLE (s) < sizeof(u_char)) + { + STREAM_BOUND_WARN (s, "put"); + return 0; + } + + s->data[s->endp++] = c; + return sizeof (u_char); } /* Put word to the stream. */ int stream_putw (struct stream *s, u_int16_t w) { - if ((s->size - s->endp) < 2) return 0; + STREAM_VERIFY_SANE (s); + if (STREAM_WRITEABLE (s) < sizeof (u_int16_t)) + { + STREAM_BOUND_WARN (s, "put"); + return 0; + } + s->data[s->endp++] = (u_char)(w >> 8); s->data[s->endp++] = (u_char) w; @@ -223,8 +430,14 @@ stream_putw (struct stream *s, u_int16_t w) int stream_putl (struct stream *s, u_int32_t l) { - if ((s->size - s->endp) < 4) return 0; + STREAM_VERIFY_SANE (s); + if (STREAM_WRITEABLE (s) < sizeof (u_int32_t)) + { + STREAM_BOUND_WARN (s, "put"); + return 0; + } + s->data[s->endp++] = (u_char)(l >> 24); s->data[s->endp++] = (u_char)(l >> 16); s->data[s->endp++] = (u_char)(l >> 8); @@ -236,25 +449,51 @@ stream_putl (struct stream *s, u_int32_t l) int stream_putc_at (struct stream *s, size_t putp, u_char c) { + STREAM_VERIFY_SANE(s); + + if (!PUT_AT_VALID (s, putp + sizeof (u_char))) + { + STREAM_BOUND_WARN (s, "put"); + return 0; + } + s->data[putp] = c; + return 1; } int stream_putw_at (struct stream *s, size_t putp, u_int16_t w) { + STREAM_VERIFY_SANE(s); + + if (!PUT_AT_VALID (s, putp + sizeof (u_int16_t))) + { + STREAM_BOUND_WARN (s, "put"); + return 0; + } + s->data[putp] = (u_char)(w >> 8); s->data[putp + 1] = (u_char) w; + return 2; } int stream_putl_at (struct stream *s, size_t putp, u_int32_t l) { + STREAM_VERIFY_SANE(s); + + if (!PUT_AT_VALID (s, putp + sizeof (u_int32_t))) + { + STREAM_BOUND_WARN (s, "put"); + return 0; + } s->data[putp] = (u_char)(l >> 24); s->data[putp + 1] = (u_char)(l >> 16); s->data[putp + 2] = (u_char)(l >> 8); s->data[putp + 3] = (u_char)l; + return 4; } @@ -262,38 +501,53 @@ stream_putl_at (struct stream *s, size_t putp, u_int32_t l) int stream_put_ipv4 (struct stream *s, u_int32_t l) { - if ((s->size - s->endp) < 4) - return 0; - - memcpy (s->data + s->endp, &l, 4); - s->endp += 4; + STREAM_VERIFY_SANE(s); + + if (STREAM_WRITEABLE (s) < sizeof (u_int32_t)) + { + STREAM_BOUND_WARN (s, "put"); + return 0; + } + memcpy (s->data + s->endp, &l, sizeof (u_int32_t)); + s->endp += sizeof (u_int32_t); - return 4; + return sizeof (u_int32_t); } /* Put long word to the stream. */ int stream_put_in_addr (struct stream *s, struct in_addr *addr) { - if ((s->size - s->endp) < 4) - return 0; + STREAM_VERIFY_SANE(s); + + if (STREAM_WRITEABLE (s) < sizeof (u_int32_t)) + { + STREAM_BOUND_WARN (s, "put"); + return 0; + } - memcpy (s->data + s->endp, addr, 4); - s->endp += 4; + memcpy (s->data + s->endp, addr, sizeof (u_int32_t)); + s->endp += sizeof (u_int32_t); - return 4; + return sizeof (u_int32_t); } /* Put prefix by nlri type format. */ int stream_put_prefix (struct stream *s, struct prefix *p) { - u_char psize; - + size_t psize; + + STREAM_VERIFY_SANE(s); + psize = PSIZE (p->prefixlen); - - if ((s->size - s->endp) < psize) return 0; - + + if (STREAM_WRITEABLE (s) < psize) + { + STREAM_BOUND_WARN (s, "put"); + return 0; + } + stream_putc (s, p->prefixlen); memcpy (s->data + s->endp, &p->u.prefix, psize); s->endp += psize; @@ -307,6 +561,14 @@ stream_read (struct stream *s, int fd, size_t size) { int nbytes; + STREAM_VERIFY_SANE(s); + + if (STREAM_WRITEABLE (s) < size) + { + STREAM_BOUND_WARN (s, "put"); + return 0; + } + nbytes = readn (fd, s->data + s->endp, size); if (nbytes > 0) @@ -321,7 +583,15 @@ stream_read_unblock (struct stream *s, int fd, size_t size) { int nbytes; int val; - + + STREAM_VERIFY_SANE(s); + + if (STREAM_WRITEABLE (s) < size) + { + STREAM_BOUND_WARN (s, "put"); + return 0; + } + val = fcntl (fd, F_GETFL, 0); fcntl (fd, F_SETFL, val|O_NONBLOCK); nbytes = read (fd, s->data + s->endp, size); @@ -333,6 +603,39 @@ stream_read_unblock (struct stream *s, int fd, size_t size) return nbytes; } +/* Read up to smaller of size or SIZE_REMAIN() bytes to the stream, starting + * from endp. + * First iovec will be used to receive the data. + * Stream need not be empty. + */ +int +stream_recvmsg (struct stream *s, int fd, struct msghdr *msgh, int flags, + size_t size) +{ + int nbytes; + struct iovec *iov; + + STREAM_VERIFY_SANE(s); + assert (msgh->msg_iovlen > 0); + + if (STREAM_WRITEABLE (s) < size) + { + STREAM_BOUND_WARN (s, "put"); + return 0; + } + + iov = &(msgh->msg_iov[0]); + iov->iov_base = (s->data + s->endp); + iov->iov_len = size; + + nbytes = recvmsg (fd, msgh, flags); + + if (nbytes > 0) + s->endp += nbytes; + + return nbytes; +} + /* Write data to buffer. */ int stream_write (struct stream *s, u_char *ptr, size_t size) @@ -340,16 +643,29 @@ stream_write (struct stream *s, u_char *ptr, size_t size) CHECK_SIZE(s, size); + STREAM_VERIFY_SANE(s); + + if (STREAM_WRITEABLE (s) < size) + { + STREAM_BOUND_WARN (s, "put"); + return 0; + } + memcpy (s->data + s->endp, ptr, size); s->endp += size; return size; } -/* Return current read pointer. */ +/* Return current read pointer. + * DEPRECATED! + * Use stream_get_pnt_to if you must, but decoding streams properly + * is preferred + */ u_char * stream_pnt (struct stream *s) { + STREAM_VERIFY_SANE(s); return s->data + s->getp; } @@ -357,18 +673,18 @@ stream_pnt (struct stream *s) int stream_empty (struct stream *s) { - if (s->endp == 0 && s->getp == 0) - return 1; - else - return 0; + STREAM_VERIFY_SANE(s); + + return (s->endp == 0); } /* Reset stream. */ void stream_reset (struct stream *s) { - s->endp = 0; - s->getp = 0; + STREAM_VERIFY_SANE (s); + + s->getp = s->endp = 0; } /* Write stream contens to the file discriptor. */ @@ -376,9 +692,11 @@ int stream_flush (struct stream *s, int fd) { int nbytes; - + + STREAM_VERIFY_SANE(s); + nbytes = write (fd, s->data + s->getp, s->endp - s->getp); - + return nbytes; } diff --git a/lib/stream.h b/lib/stream.h index 2936f46b..fe45a4f5 100644 --- a/lib/stream.h +++ b/lib/stream.h @@ -25,21 +25,86 @@ #include "prefix.h" +/* + * A stream is an arbitrary buffer, whose contents generally are assumed to + * be in network order. + * + * A stream has the following attributes associated with it: + * + * - size: the allocated, invariant size of the buffer. + * + * - getp: the get position marker, denoting the offset in the stream where + * the next read (or 'get') will be from. This getp marker is + * automatically adjusted when data is read from the stream, the + * user may also manipulate this offset as they wish, within limits + * (see below) + * + * - endp: the end position marker, denoting the offset in the stream where + * valid data ends, and if the user attempted to write (or + * 'put') data where that data would be written (or 'put') to. + * + * These attributes are all size_t values. + * + * Constraints: + * + * 1. getp can never exceed endp + * + * - hence if getp is equal to endp, there is no more valid data that can be + * gotten from the stream (though, the user may reposition getp to earlier in + * the stream, if they wish). + * + * 2. endp can never exceed size + * + * - hence, if endp is equal to size, then the stream is full, and no more + * data can be written to the stream. + * + * In other words the following must always be true, and the stream + * abstraction is allowed internally to assert that the following property + * holds true for a stream, as and when it wishes: + * + * getp <= endp <= size + * + * It is the users responsibility to ensure this property is never violated. + * + * A stream therefore can be thought of like this: + * + * --------------------------------------------------- + * |XXXXXXXXXXXXXXXXXXXXXXXX | + * --------------------------------------------------- + * ^ ^ ^ + * getp endp size + * + * This shows a stream containing data (shown as 'X') up to the endp offset. + * The stream is empty from endp to size. Without adjusting getp, there are + * still endp-getp bytes of valid data to be read from the stream. + * + * Methods are provided to get and put to/from the stream, as well as + * retrieve the values of the 3 markers and manipulate the getp marker. + * + * Note: + * At the moment, newly allocated streams are zero filled. Hence, one can + * use stream_forward_endp() to effectively create arbitrary zero-fill + * padding. However, note that stream_reset() does *not* zero-out the + * stream. This property should **not** be relied upon. + * + * A Good stream user should ensure it writes all bytes. (the zero-fill + * guarantee may be removed in future, however, the zero-filling may + * possibly be moved to stream_forward_endp() instead, maybe..) + */ + /* Stream buffer. */ struct stream { struct stream *next; - unsigned char *data; - - /* Get pointer. */ - size_t getp; - - /* End of pointer. */ - size_t endp; - - /* Data size. */ - size_t size; + /* Remainder is ***private*** to stream + * direct access is frowned upon! + * Use the appropriate functions/macros + */ + size_t getp; /* next get position */ + size_t endp; /* last valid data position */ + size_t size; /* size of data segment */ + unsigned char data[0]; /* data pointer */ }; /* First in first out queue structure. */ @@ -52,14 +117,22 @@ struct stream_fifo }; /* Utility macros. */ -#define STREAM_PNT(S) ((S)->data + (S)->getp) #define STREAM_SIZE(S) ((S)->size) -#define STREAM_REMAIN(S) ((S)->size - (S)->endp) + /* number of bytes which can still be written */ +#define STREAM_WRITEABLE(S) ((S)->size - (S)->endp) + /* number of bytes still to be read */ +#define STREAM_READABLE(S) ((S)->endp - (S)->getp) + +/* deprecated macros - do not use in new code */ +#define STREAM_PNT(S) stream_pnt((S)) #define STREAM_DATA(S) ((S)->data) +#define STREAM_REMAIN(S) STREAM_WRITEABLE((S)) /* Stream prototypes. */ struct stream *stream_new (size_t); void stream_free (struct stream *); +struct stream * stream_copy (struct stream *new, struct stream *src); +struct stream *stream_dup (struct stream *); size_t stream_get_getp (struct stream *); size_t stream_get_endp (struct stream *); @@ -67,8 +140,8 @@ size_t stream_get_size (struct stream *); u_char *stream_get_data (struct stream *); void stream_set_getp (struct stream *, size_t); -void stream_forward_getp (struct stream *, int); -void stream_forward_endp (struct stream *, int); +void stream_forward_getp (struct stream *, size_t); +void stream_forward_endp (struct stream *, size_t); void stream_put (struct stream *, void *, size_t); int stream_putc (struct stream *, u_char); @@ -87,18 +160,23 @@ u_char stream_getc_from (struct stream *, size_t); u_int16_t stream_getw (struct stream *); u_int16_t stream_getw_from (struct stream *, size_t); u_int32_t stream_getl (struct stream *); +u_int32_t stream_getl_from (struct stream *, size_t); u_int32_t stream_get_ipv4 (struct stream *); #undef stream_read #undef stream_write int stream_read (struct stream *, int, size_t); int stream_read_unblock (struct stream *, int, size_t); +int stream_recvmsg (struct stream *s, int fd, struct msghdr *, + int flags, size_t size); int stream_write (struct stream *, u_char *, size_t); -u_char *stream_pnt (struct stream *); -void stream_reset (struct stream *); +void stream_reset (struct stream *); /* reset the stream. See Note above */ int stream_flush (struct stream *, int); -int stream_empty (struct stream *); +int stream_empty (struct stream *); /* is the stream empty? */ + +/* deprecated */ +u_char *stream_pnt (struct stream *); /* Stream fifo. */ struct stream_fifo *stream_fifo_new (); |