From 9ebc4a1eefa1008f578e62f98d2569fd1c55e02f Mon Sep 17 00:00:00 2001 From: Vreixo Formoso Date: Mon, 14 Jan 2008 21:13:53 +0100 Subject: [PATCH] Add some more control to the ring buffer. --- demo/cat_buffer.c | 8 ++-- demo/iso.c | 3 +- demo/iso_grow.c | 3 +- demo/iso_modify.c | 3 +- demo/iso_ms.c | 3 +- src/buffer.c | 109 ++++++++++++++++++++++++++++++++++++---------- src/buffer.h | 20 ++++++--- src/ecma119.c | 8 ++-- src/libisofs.h | 29 ++++++++++++ 9 files changed, 146 insertions(+), 40 deletions(-) diff --git a/demo/cat_buffer.c b/demo/cat_buffer.c index 91cdc3b..74657ee 100644 --- a/demo/cat_buffer.c +++ b/demo/cat_buffer.c @@ -42,7 +42,7 @@ void *write_function(void *arg) int fd = open(data->path, O_RDONLY); if (fd < 0) { fprintf(stderr, "Writer thread error: Can't open file"); - iso_ring_buffer_writer_close(data->rbuf); + iso_ring_buffer_writer_close(data->rbuf, 1); pthread_exit(NULL); } @@ -64,7 +64,7 @@ void *write_function(void *arg) fprintf(stderr, "Writer finish: %d\n", res); close(fd); - iso_ring_buffer_writer_close(data->rbuf); + iso_ring_buffer_writer_close(data->rbuf, 0); pthread_exit(NULL); } @@ -88,7 +88,7 @@ void *read_function(void *arg) } fprintf(stderr, "Reader finish: %d\n", res); - iso_ring_buffer_reader_close(data->rbuf); + iso_ring_buffer_reader_close(data->rbuf, 0); pthread_exit(NULL); } @@ -105,7 +105,7 @@ int main(int argc, char **argv) return 1; } - res = iso_ring_buffer_new(&data.rbuf); + res = iso_ring_buffer_new(1024, &data.rbuf); if (res < 0) { fprintf(stderr, "Can't create buffer\n"); return 1; diff --git a/demo/iso.c b/demo/iso.c index 6ace3c7..edc1d01 100644 --- a/demo/iso.c +++ b/demo/iso.c @@ -72,7 +72,8 @@ int main(int argc, char **argv) NULL, /* output charset */ 0, /* appendable */ 0, /* ms_block */ - NULL /* overwrite */ + NULL, /* overwrite */ + 1024 /* fifo_size */ }; while ((c = getopt(argc, argv, optstring)) != -1) { diff --git a/demo/iso_grow.c b/demo/iso_grow.c index de532a4..094a8ec 100644 --- a/demo/iso_grow.c +++ b/demo/iso_grow.c @@ -55,7 +55,8 @@ int main(int argc, char **argv) NULL, /* output charset */ 0, /* appendable */ 0, /* ms_block */ - NULL /* overwrite */ + NULL, /* overwrite */ + 1024 /* fifo_size */ }; struct iso_read_opts ropts = { 0, /* block */ diff --git a/demo/iso_modify.c b/demo/iso_modify.c index 6bd302c..e055f5d 100644 --- a/demo/iso_modify.c +++ b/demo/iso_modify.c @@ -50,7 +50,8 @@ int main(int argc, char **argv) NULL, /* output charset */ 0, /* appendable */ 0, /* ms_block */ - NULL /* overwrite */ + NULL, /* overwrite */ + 1024 /* fifo_size */ }; struct iso_read_opts ropts = { 0, /* block */ diff --git a/demo/iso_ms.c b/demo/iso_ms.c index e6d1901..00e9b0c 100644 --- a/demo/iso_ms.c +++ b/demo/iso_ms.c @@ -50,7 +50,8 @@ int main(int argc, char **argv) NULL, /* output charset */ 0, /* appendable */ 0, /* ms_block */ - NULL /* overwrite */ + NULL, /* overwrite */ + 1024 /* fifo_size */ }; struct iso_read_opts ropts = { 0, /* block */ diff --git a/src/buffer.c b/src/buffer.c index 2109165..dedcf28 100644 --- a/src/buffer.c +++ b/src/buffer.c @@ -18,19 +18,24 @@ #include "buffer.h" #include "error.h" +#include "libburn/libburn.h" +#include "ecma119.h" #include #include -#define BUFFER_CAPACITY BLOCK_SIZE * BUFFER_SIZE - #ifndef MIN # define MIN(a, b) (((a) < (b)) ? (a) : (b)) #endif struct iso_ring_buffer { - uint8_t buf[BLOCK_SIZE * BUFFER_SIZE]; + uint8_t *buf; + + /* + * Max number of bytes in buffer + */ + size_t cap; /* * Number of bytes available. @@ -41,9 +46,12 @@ struct iso_ring_buffer size_t rpos; size_t wpos; - /* flags to report if read or writer threads ends execution */ - unsigned int rend :1; - unsigned int wend :1; + /* + * flags to report if read or writer threads ends execution + * 0 not finished, 1 finished ok, 2 finish with error + */ + unsigned int rend :2; + unsigned int wend :2; /* just for statistical purposes */ unsigned int times_full; @@ -57,12 +65,16 @@ struct iso_ring_buffer /** * Create a new buffer. * - * The created buffer can be freed with free(3) + * The created buffer should be freed with iso_ring_buffer_free() * + * @param size + * Number of blocks in buffer. You should supply a number >= 32, otherwise + * size will be ignored and 32 will be used by default, which leads to a + * 64 KiB buffer. * @return * 1 success, < 0 error */ -int iso_ring_buffer_new(IsoRingBuffer **rbuf) +int iso_ring_buffer_new(size_t size, IsoRingBuffer **rbuf) { IsoRingBuffer *buffer; @@ -74,7 +86,14 @@ int iso_ring_buffer_new(IsoRingBuffer **rbuf) if (buffer == NULL) { return ISO_MEM_ERROR; } - + + buffer->cap = (size > 32 ? size : 32) * BLOCK_SIZE; + buffer->buf = malloc(buffer->cap); + if (buffer->buf == NULL) { + free(buffer); + return ISO_MEM_ERROR; + } + buffer->size = 0; buffer->wpos = 0; buffer->rpos = 0; @@ -95,6 +114,7 @@ int iso_ring_buffer_new(IsoRingBuffer **rbuf) void iso_ring_buffer_free(IsoRingBuffer *buf) { + free(buf->buf); pthread_mutex_destroy(&buf->mutex); pthread_cond_destroy(&buf->empty); pthread_cond_destroy(&buf->full); @@ -128,11 +148,11 @@ int iso_ring_buffer_write(IsoRingBuffer *buf, uint8_t *data, size_t count) pthread_mutex_lock(&buf->mutex); - while (buf->size == BUFFER_CAPACITY) { + while (buf->size == buf->cap) { /* * Note. There's only a writer, so we have no race conditions. - * Thus, the while(buf->size == BUFFER_CAPACITY) is used here + * Thus, the while(buf->size == buf->cap) is used here * only to propertly detect the reader has been cancelled */ @@ -146,12 +166,12 @@ int iso_ring_buffer_write(IsoRingBuffer *buf, uint8_t *data, size_t count) pthread_cond_wait(&buf->full, &buf->mutex); } - len = MIN(count - bytes_write, BUFFER_CAPACITY - buf->size); - if (buf->wpos + len > BUFFER_CAPACITY) { - len = BUFFER_CAPACITY - buf->wpos; + len = MIN(count - bytes_write, buf->cap - buf->size); + if (buf->wpos + len > buf->cap) { + len = buf->cap - buf->wpos; } memcpy(buf->buf + buf->wpos, data + bytes_write, len); - buf->wpos = (buf->wpos + len) % (BUFFER_CAPACITY); + buf->wpos = (buf->wpos + len) % (buf->cap); bytes_write += len; buf->size += len; @@ -202,11 +222,11 @@ int iso_ring_buffer_read(IsoRingBuffer *buf, uint8_t *dest, size_t count) } len = MIN(count - bytes_read, buf->size); - if (buf->rpos + len > BUFFER_CAPACITY) { - len = BUFFER_CAPACITY - buf->rpos; + if (buf->rpos + len > buf->cap) { + len = buf->cap - buf->rpos; } memcpy(dest + bytes_read, buf->buf + buf->rpos, len); - buf->rpos = (buf->rpos + len) % (BUFFER_CAPACITY); + buf->rpos = (buf->rpos + len) % (buf->cap); bytes_read += len; buf->size -= len; @@ -217,20 +237,20 @@ int iso_ring_buffer_read(IsoRingBuffer *buf, uint8_t *dest, size_t count) return ISO_SUCCESS; } -void iso_ring_buffer_writer_close(IsoRingBuffer *buf) +void iso_ring_buffer_writer_close(IsoRingBuffer *buf, int error) { pthread_mutex_lock(&buf->mutex); - buf->wend = 1; + buf->wend = error ? 2 : 1; /* ensure no reader is waiting */ pthread_cond_signal(&buf->empty); pthread_mutex_unlock(&buf->mutex); } -void iso_ring_buffer_reader_close(IsoRingBuffer *buf) +void iso_ring_buffer_reader_close(IsoRingBuffer *buf, int error) { pthread_mutex_lock(&buf->mutex); - buf->rend = 1; + buf->rend = error ? 2 : 1; /* ensure no writer is waiting */ pthread_cond_signal(&buf->full); @@ -252,3 +272,48 @@ unsigned int iso_ring_buffer_get_times_empty(IsoRingBuffer *buf) { return buf->times_empty; } + + +/** + * Get the status of the buffer used by a burn_source. + * + * @param b + * A burn_source previously obtained with + * iso_image_create_burn_source(). + * @param size + * Will be filled with the total size of the buffer, in bytes + * @param free_bytes + * Will be filled with the bytes currently available in buffer + * @return + * < 0 error, > 0 state: + * 1="active" : input and consumption are active + * 2="ending" : input has ended without error + * 3="failing" : input had error and ended, + * 5="abandoned" : consumption has ended prematurely + * 6="ended" : consumption has ended without input error + * 7="aborted" : consumption has ended after input error + */ +int iso_ring_buffer_get_status(struct burn_source *b, size_t *size, + size_t *free_bytes) +{ + int ret; + IsoRingBuffer *buf; + if (b == NULL) { + return ISO_NULL_POINTER; + } + buf = ((Ecma119Image*)(b->data))->buffer; + + /* get mutex */ + pthread_mutex_lock(&buf->mutex); + if (size) { + *size = buf->cap; + } + if (free_bytes) { + *free_bytes = buf->size; + } + + ret = (buf->rend + 1) + (buf->wend ? 4 : 0); + + pthread_mutex_unlock(&buf->mutex); + return ret; +} diff --git a/src/buffer.h b/src/buffer.h index 473e64d..c98e06c 100644 --- a/src/buffer.h +++ b/src/buffer.h @@ -12,21 +12,23 @@ #include #include -/* 2 MB buffer */ #define BLOCK_SIZE 2048 -#define BUFFER_SIZE 1024 typedef struct iso_ring_buffer IsoRingBuffer; /** * Create a new buffer. * - * The created buffer can be freed with iso_ring_buffer_free(3) + * The created buffer should be freed with iso_ring_buffer_free() * + * @param size + * Number of blocks in buffer. You should supply a number >= 32, otherwise + * size will be ignored and 32 will be used by default, which leads to a + * 64 KiB buffer. * @return * 1 success, < 0 error */ -int iso_ring_buffer_new(IsoRingBuffer **rbuf); +int iso_ring_buffer_new(size_t size, IsoRingBuffer **rbuf); /** * Free a given buffer @@ -64,15 +66,21 @@ int iso_ring_buffer_read(IsoRingBuffer *buf, uint8_t *dest, size_t count); * Close the buffer (to be called by the writer). * You have to explicity close the buffer when you don't have more data to * write, otherwise reader will be waiting forever. + * + * @param error + * Writer has finished prematurely due to an error */ -void iso_ring_buffer_writer_close(IsoRingBuffer *buf); +void iso_ring_buffer_writer_close(IsoRingBuffer *buf, int error); /** * Close the buffer (to be called by the reader). * If for any reason you don't want to read more data, you need to call this * to let the writer thread finish. + * + * @param error + * Reader has finished prematurely due to an error */ -void iso_ring_buffer_reader_close(IsoRingBuffer *buf); +void iso_ring_buffer_reader_close(IsoRingBuffer *buf, int error); /** * Get the times the buffer was full. diff --git a/src/ecma119.c b/src/ecma119.c index 770f0c2..dfb388c 100644 --- a/src/ecma119.c +++ b/src/ecma119.c @@ -808,13 +808,13 @@ void *write_function(void *arg) } } - iso_ring_buffer_writer_close(target->buffer); + iso_ring_buffer_writer_close(target->buffer, 0); pthread_exit(NULL); write_error: ; iso_msg_fatal(target->image->messenger, LIBISO_WRITE_ERROR, "Image write error, code %d", res); - iso_ring_buffer_writer_close(target->buffer); + iso_ring_buffer_writer_close(target->buffer, 1); pthread_exit(NULL); } @@ -976,7 +976,7 @@ int ecma119_image_new(IsoImage *src, Ecma119WriteOpts *opts, Ecma119Image **img) } /* create the ring buffer */ - ret = iso_ring_buffer_new(&target->buffer); + ret = iso_ring_buffer_new(opts->fifo_size, &target->buffer); if (ret < 0) { goto target_cleanup; } @@ -1097,7 +1097,7 @@ static void bs_free_data(struct burn_source *bs) Ecma119Image *target = (Ecma119Image*)bs->data; /* forces writer to stop if it is still running */ - iso_ring_buffer_reader_close(target->buffer); + iso_ring_buffer_reader_close(target->buffer, 0); /* wait until writer thread finishes */ pthread_join(target->wthread, NULL); diff --git a/src/libisofs.h b/src/libisofs.h index 314428d..b6f8a53 100644 --- a/src/libisofs.h +++ b/src/libisofs.h @@ -196,6 +196,13 @@ typedef struct * enought. */ uint8_t *overwrite; + + /** + * Size, in number of blocks, of the FIFO buffer used between the writer + * thread and the burn_source. You have to provide at least a 32 blocks + * buffer. + */ + size_t fifo_size; } Ecma119WriteOpts; typedef struct Iso_Data_Source IsoDataSource; @@ -1221,6 +1228,28 @@ void iso_data_source_unref(IsoDataSource *src); */ int iso_data_source_new_from_file(const char *path, IsoDataSource **src); +/** + * Get the status of the buffer used by a burn_source. + * + * @param b + * A burn_source previously obtained with + * iso_image_create_burn_source(). + * @param size + * Will be filled with the total size of the buffer, in bytes + * @param free_bytes + * Will be filled with the bytes currently available in buffer + * @return + * < 0 error, > 0 state: + * 1="active" : input and consumption are active + * 2="ending" : input has ended without error + * 3="failing" : input had error and ended, + * 5="abandoned" : consumption has ended prematurely + * 6="ended" : consumption has ended without input error + * 7="aborted" : consumption has ended after input error + */ +int iso_ring_buffer_get_status(struct burn_source *b, size_t *size, + size_t *free_bytes); + #define ISO_MSGS_MESSAGE_LEN 4096 /**