From 94e687d9d39451290c8a494aa53a18280cf9e2c9 Mon Sep 17 00:00:00 2001 From: Vreixo Formoso Date: Sun, 23 Dec 2007 02:24:21 +0100 Subject: [PATCH] Add a ring buffer implementation. This is intented to replace the pipe between the writer and reader threads. That way we can have a much better control of cancelation situations. Still to be optimized. --- .bzrignore | 1 + Makefile.am | 8 +- demo/cat_buffer.c | 126 ++++++++++++++++++++++++ src/buffer.c | 245 ++++++++++++++++++++++++++++++++++++++++++++++ src/buffer.h | 82 ++++++++++++++++ src/ecma119.c | 4 +- 6 files changed, 463 insertions(+), 3 deletions(-) create mode 100644 demo/cat_buffer.c create mode 100644 src/buffer.c create mode 100644 src/buffer.h diff --git a/.bzrignore b/.bzrignore index 20b3250..e09f2f5 100644 --- a/.bzrignore +++ b/.bzrignore @@ -29,3 +29,4 @@ demo/cat demo/tree demo/ecma119tree demo/iso +demo/catbuffer diff --git a/Makefile.am b/Makefile.am index 3cd45a5..f9ad239 100644 --- a/Makefile.am +++ b/Makefile.am @@ -36,7 +36,8 @@ src_libisofs_la_SOURCES = \ src/ecma119.c \ src/ecma119_tree.h \ src/ecma119_tree.c \ - src/writer.h + src/writer.h \ + src/buffer.c libinclude_HEADERS = \ src/libisofs.h @@ -46,6 +47,7 @@ libinclude_HEADERS = \ noinst_PROGRAMS = \ demo/lsl \ demo/cat \ + demo/catbuffer \ demo/tree \ demo/ecma119tree \ demo/iso @@ -58,6 +60,10 @@ demo_cat_CPPFLAGS = -Isrc demo_cat_LDADD = $(src_libisofs_la_OBJECTS) $(THREAD_LIBS) demo_cat_SOURCES = demo/cat.c +demo_catbuffer_CPPFLAGS = -Isrc +demo_catbuffer_LDADD = $(src_libisofs_la_OBJECTS) $(THREAD_LIBS) +demo_catbuffer_SOURCES = demo/cat_buffer.c + demo_tree_CPPFLAGS = -Isrc demo_tree_LDADD = $(src_libisofs_la_OBJECTS) $(THREAD_LIBS) demo_tree_SOURCES = demo/tree.c diff --git a/demo/cat_buffer.c b/demo/cat_buffer.c new file mode 100644 index 0000000..d5745fe --- /dev/null +++ b/demo/cat_buffer.c @@ -0,0 +1,126 @@ +/* + * Copyright (c) 2007 Vreixo Formoso + * + * This file is part of the libisofs project; you can redistribute it and/or + * modify it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. See COPYING file for details. + */ + +#include "libisofs.h" +#include "buffer.h" + +#include +#include +#include +#include +#include +#include +#include + +/* + * Little test program that reads a file and outputs it to stdout, using + * the libisofs ring buffer as intermediate memory + */ + +struct th_data { + IsoRingBuffer *rbuf; + char *path; +}; + +#define WRITE_CHUNK 2048 +#define READ_CHUNK 2048 + +static +void *write_function(void *arg) +{ + ssize_t bytes; + int res; + unsigned char tmp[WRITE_CHUNK]; + struct th_data *data = (struct th_data *) arg; + + int fd = open(data->path, O_RDONLY); + if (fd < 0) { + fprintf(stderr, "Writer thread error: Can't open file"); + iso_ring_buffer_writer_close(data->rbuf); + pthread_exit(NULL); + } + + res = 1; + while ( (bytes = read(fd, tmp, WRITE_CHUNK)) > 0 ) { + res = iso_ring_buffer_write(data->rbuf, tmp, bytes); + if (res <= 0) { + break; + } + /* To test premature reader exit >>>>>>>>>>> + iso_ring_buffer_writer_close(data->rbuf); + pthread_exit(NULL); + <<<<<<<<<<<<<<<<<<<<<<<<< */ +// if (rand() > 2000000000) { +// fprintf(stderr, "Writer sleeping\n"); +// sleep(1); +// } + } + fprintf(stderr, "Writer finish: %d\n", res); + + close(fd); + iso_ring_buffer_writer_close(data->rbuf); + pthread_exit(NULL); +} + +static +void *read_function(void *arg) +{ + unsigned char tmp[READ_CHUNK]; + int res = 1; + struct th_data *data = (struct th_data *) arg; + + while ( (res = iso_ring_buffer_read(data->rbuf, tmp, READ_CHUNK)) > 0 ) { + write(1, tmp, READ_CHUNK); + /* To test premature reader exit >>>>>>>>>>> + iso_ring_buffer_reader_close(data->rbuf); + pthread_exit(NULL); + <<<<<<<<<<<<<<<<<<<<<<<<< */ +// if (rand() > 2000000000) { +// fprintf(stderr, "Reader sleeping\n"); +// sleep(1); +// } + } + fprintf(stderr, "Reader finish: %d\n", res); + + iso_ring_buffer_reader_close(data->rbuf); + + pthread_exit(NULL); +} + +int main(int argc, char **argv) +{ + int res; + struct th_data data; + pthread_t reader; + pthread_t writer; + + if (argc != 2) { + fprintf(stderr, "Usage: catbuffer /path/to/file\n"); + return 1; + } + + res = iso_ring_buffer_new(&data.rbuf); + if (res < 0) { + fprintf(stderr, "Can't create buffer\n"); + return 1; + } + data.path = argv[1]; + + res = pthread_create(&writer, NULL, write_function, (void *) &data); + res = pthread_create(&reader, NULL, read_function, (void *) &data); + + pthread_join(writer, NULL); + pthread_join(reader, NULL); + + fprintf(stderr, "Buffer was %d times full and %d times empty.\n", + iso_ring_buffer_get_times_full(data.rbuf), + iso_ring_buffer_get_times_empty(data.rbuf)); + + free(data.rbuf); + return 0; +} diff --git a/src/buffer.c b/src/buffer.c new file mode 100644 index 0000000..6ac3ceb --- /dev/null +++ b/src/buffer.c @@ -0,0 +1,245 @@ +/* + * Copyright (c) 2007 Vreixo Formoso + * + * This file is part of the libisofs project; you can redistribute it and/or + * modify it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. See COPYING file for details. + */ + +/* + * Synchronized ring buffer, works with a writer thread and a read thread. + * + * TODO things to optimize + * - write/read at the end of buffer requires a second mutex_lock, even if + * there's enought space/data at the beginning + * - pre-buffer for writes < BLOCK_SIZE + * + */ + +#include "buffer.h" +#include "error.h" + +#include +#include + +#define BUFFER_CAPACITY BLOCK_SIZE * BUFFER_SIZE + +#ifndef MIN +# define MIN(a, b) (((a) < (b)) ? (a) : (b)) +#endif + +struct iso_ring_buffer { + uint8_t buf[BLOCK_SIZE * BUFFER_SIZE]; + + /* + * Number of bytes available. + */ + size_t size; + + /* position for reading and writing, offset from buf */ + size_t rpos; + size_t wpos; + + /* flags to report if read or writer threads ends execution */ + unsigned int rend:1; + unsigned int wend:1; + + /* just for statistical purposes */ + unsigned int times_full; + unsigned int times_empty; + + pthread_mutex_t mutex; + pthread_cond_t empty; + pthread_cond_t full; +}; + +/** + * Create a new buffer. + * + * The created buffer can be freed with free(3) + * + * @return + * 1 success, < 0 error + */ +int iso_ring_buffer_new(IsoRingBuffer **rbuf) +{ + IsoRingBuffer *buffer; + + if (rbuf == NULL) { + return ISO_NULL_POINTER; + } + + buffer = malloc(sizeof(IsoRingBuffer)); + if (buffer == NULL) { + return ISO_MEM_ERROR; + } + + buffer->size = 0; + buffer->wpos = 0; + buffer->rpos = 0; + + buffer->times_full = 0; + buffer->times_empty = 0; + + buffer->rend = buffer->wend = 0; + + /* init mutex and waiting queues */ + pthread_mutex_init(&buffer->mutex, NULL); + pthread_cond_init(&buffer->empty, NULL); + pthread_cond_init(&buffer->full, NULL); + + *rbuf = buffer; + return ISO_SUCCESS; +} + +/** + * Write count bytes into buffer. It blocks until all bytes where written or + * reader close the buffer. + * + * @param buf + * the buffer + * @param data + * pointer to a memory region of at least coun bytes, from which data + * will be read. + * @param + * Number of bytes to write + * @return + * 1 succes, 0 read finished, < 0 error + */ +int iso_ring_buffer_write(IsoRingBuffer *buf, uint8_t *data, size_t count) +{ + size_t len; + int bytes_write = 0; + + if (buf == NULL || data == NULL) { + return ISO_NULL_POINTER; + } + + while (bytes_write < count) { + + pthread_mutex_lock(&buf->mutex); + + while (buf->size == BUFFER_CAPACITY) { + + /* + * Note. There's only a writer, so we have no race conditions. + * Thus, the while(buf->size == BUFFER_CAPACITY) is used here + * only to propertly detect the reader has been cancelled + */ + + if (buf->rend) { + /* the read procces has been finished */ + pthread_mutex_unlock(&buf->mutex); + return 0; + } + buf->times_full++; + /* wait until space available */ + pthread_cond_wait(&buf->full, &buf->mutex); + } + + len = MIN(count - bytes_write, BUFFER_CAPACITY - buf->size); + if (buf->wpos + len > BUFFER_CAPACITY) { + len = BUFFER_CAPACITY - buf->wpos; + } + memcpy(buf->buf + buf->wpos, data + bytes_write, len); + buf->wpos = (buf->wpos + len) % (BUFFER_CAPACITY); + bytes_write += len; + buf->size += len; + + /* wake up reader */ + pthread_cond_signal(&buf->empty); + pthread_mutex_unlock(&buf->mutex); + } + return ISO_SUCCESS; +} + +/** + * Read count bytes from the buffer into dest. It blocks until the desired + * bytes has been read. If the writer finishes before outputting enought + * bytes, 0 (EOF) is returned, the number of bytes already read remains + * unknown. + * + * @return + * 1 success, 0 EOF, < 0 error + */ +int iso_ring_buffer_read(IsoRingBuffer *buf, uint8_t *dest, size_t count) +{ + size_t len; + int bytes_read = 0; + + if (buf == NULL || dest == NULL) { + return ISO_NULL_POINTER; + } + + while (bytes_read < count) { + pthread_mutex_lock(&buf->mutex); + + while (buf->size == 0) { + /* + * Note. There's only a reader, so we have no race conditions. + * Thus, the while(buf->size == 0) is used here just to ensure + * a reader detects the EOF propertly if the writer has been + * canceled while the reader was waiting + */ + + if (buf->wend) { + /* the writer procces has been finished */ + pthread_mutex_unlock(&buf->mutex); + return 0; /* EOF */ + } + buf->times_empty++; + /* wait until data available */ + pthread_cond_wait(&buf->empty, &buf->mutex); + } + + len = MIN(count - bytes_read, buf->size); + if (buf->rpos + len > BUFFER_CAPACITY) { + len = BUFFER_CAPACITY - buf->rpos; + } + memcpy(dest + bytes_read, buf->buf + buf->rpos, len); + buf->rpos = (buf->rpos + len) % (BUFFER_CAPACITY); + bytes_read += len; + buf->size -= len; + + /* wake up the writer */ + pthread_cond_signal(&buf->full); + pthread_mutex_unlock(&buf->mutex); + } + return ISO_SUCCESS; +} + +void iso_ring_buffer_writer_close(IsoRingBuffer *buf) +{ + pthread_mutex_lock(&buf->mutex); + buf->wend = 1; + + /* ensure no reader is waiting */ + pthread_cond_signal(&buf->empty); + pthread_mutex_unlock(&buf->mutex); +} + +void iso_ring_buffer_reader_close(IsoRingBuffer *buf) +{ + pthread_mutex_lock(&buf->mutex); + buf->rend = 1; + + /* ensure no writer is waiting */ + pthread_cond_signal(&buf->full); + pthread_mutex_unlock(&buf->mutex); +} + +/** + * Get the times the buffer was full. + */ +unsigned int iso_ring_buffer_get_times_full(IsoRingBuffer *buf) +{ + return buf->times_full; +} + +/** + * Get the times the buffer was empty. + */ +unsigned int iso_ring_buffer_get_times_empty(IsoRingBuffer *buf) +{ + return buf->times_empty; +} diff --git a/src/buffer.h b/src/buffer.h new file mode 100644 index 0000000..0179cbc --- /dev/null +++ b/src/buffer.h @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2007 Vreixo Formoso + * + * This file is part of the libisofs project; you can redistribute it and/or + * modify it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. See COPYING file for details. + */ + +#ifndef LIBISO_BUFFER_H_ +#define LIBISO_BUFFER_H_ + +#include +#include + +/* 2 MB buffer */ +#define BLOCK_SIZE 2048 +#define BUFFER_SIZE 1024 + +typedef struct iso_ring_buffer IsoRingBuffer; + +/** + * Create a new buffer. + * + * The created buffer can be freed with free(3) + * + * @return + * 1 success, < 0 error + */ +int iso_ring_buffer_new(IsoRingBuffer **rbuf); + +/** + * Write count bytes into buffer. It blocks until all bytes where written or + * reader close the buffer. + * + * @param buf + * the buffer + * @param data + * pointer to a memory region of at least coun bytes, from which data + * will be read. + * @param + * Number of bytes to write + * @return + * 1 succes, 0 read finished, < 0 error + */ +int iso_ring_buffer_write(IsoRingBuffer *buf, uint8_t *data, size_t count); + +/** + * Read count bytes from the buffer into dest. It blocks until the desired + * bytes has been read. If the writer finishes before outputting enought + * bytes, 0 (EOF) is returned, the number of bytes already read remains + * unknown. + * + * @return + * 1 success, 0 EOF, < 0 error + */ +int iso_ring_buffer_read(IsoRingBuffer *buf, uint8_t *dest, size_t count); + +/** + * Close the buffer (to be called by the writer). + * You have to explicity close the buffer when you don't have more data to + * write, otherwise reader will be waiting forever. + */ +void iso_ring_buffer_writer_close(IsoRingBuffer *buf); + +/** + * Close the buffer (to be called by the reader). + * If for any reason you don't want to read more data, you need to call this + * to let the writer thread finish. + */ +void iso_ring_buffer_reader_close(IsoRingBuffer *buf); + +/** + * Get the times the buffer was full. + */ +unsigned int iso_ring_buffer_get_times_full(IsoRingBuffer *buf); + +/** + * Get the times the buffer was empty. + */ +unsigned int iso_ring_buffer_get_times_empty(IsoRingBuffer *buf); + +#endif /*LIBISO_BUFFER_H_*/ diff --git a/src/ecma119.c b/src/ecma119.c index c1f672b..43ca4dd 100644 --- a/src/ecma119.c +++ b/src/ecma119.c @@ -705,8 +705,8 @@ int ecma119_image_new(IsoImage *src, Ecma119WriteOpts *opts, pthread_attr_init(&(target->th_attr)); pthread_attr_setdetachstate(&(target->th_attr), PTHREAD_CREATE_JOINABLE); - ret = pthread_create(&(target->wthread), NULL, write_function, - (void *) target); + ret = pthread_create(&(target->wthread), &(target->th_attr), + write_function, (void *) target); if (ret != 0) { iso_msg_fatal(target->image, LIBISO_THREAD_ERROR, "Cannot create writer thread");