Add some more control to the ring buffer.
This commit is contained in:
parent
8bc1cf90a9
commit
9ebc4a1eef
@ -42,7 +42,7 @@ void *write_function(void *arg)
|
|||||||
int fd = open(data->path, O_RDONLY);
|
int fd = open(data->path, O_RDONLY);
|
||||||
if (fd < 0) {
|
if (fd < 0) {
|
||||||
fprintf(stderr, "Writer thread error: Can't open file");
|
fprintf(stderr, "Writer thread error: Can't open file");
|
||||||
iso_ring_buffer_writer_close(data->rbuf);
|
iso_ring_buffer_writer_close(data->rbuf, 1);
|
||||||
pthread_exit(NULL);
|
pthread_exit(NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -64,7 +64,7 @@ void *write_function(void *arg)
|
|||||||
fprintf(stderr, "Writer finish: %d\n", res);
|
fprintf(stderr, "Writer finish: %d\n", res);
|
||||||
|
|
||||||
close(fd);
|
close(fd);
|
||||||
iso_ring_buffer_writer_close(data->rbuf);
|
iso_ring_buffer_writer_close(data->rbuf, 0);
|
||||||
pthread_exit(NULL);
|
pthread_exit(NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -88,7 +88,7 @@ void *read_function(void *arg)
|
|||||||
}
|
}
|
||||||
fprintf(stderr, "Reader finish: %d\n", res);
|
fprintf(stderr, "Reader finish: %d\n", res);
|
||||||
|
|
||||||
iso_ring_buffer_reader_close(data->rbuf);
|
iso_ring_buffer_reader_close(data->rbuf, 0);
|
||||||
|
|
||||||
pthread_exit(NULL);
|
pthread_exit(NULL);
|
||||||
}
|
}
|
||||||
@ -105,7 +105,7 @@ int main(int argc, char **argv)
|
|||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
res = iso_ring_buffer_new(&data.rbuf);
|
res = iso_ring_buffer_new(1024, &data.rbuf);
|
||||||
if (res < 0) {
|
if (res < 0) {
|
||||||
fprintf(stderr, "Can't create buffer\n");
|
fprintf(stderr, "Can't create buffer\n");
|
||||||
return 1;
|
return 1;
|
||||||
|
@ -72,7 +72,8 @@ int main(int argc, char **argv)
|
|||||||
NULL, /* output charset */
|
NULL, /* output charset */
|
||||||
0, /* appendable */
|
0, /* appendable */
|
||||||
0, /* ms_block */
|
0, /* ms_block */
|
||||||
NULL /* overwrite */
|
NULL, /* overwrite */
|
||||||
|
1024 /* fifo_size */
|
||||||
};
|
};
|
||||||
|
|
||||||
while ((c = getopt(argc, argv, optstring)) != -1) {
|
while ((c = getopt(argc, argv, optstring)) != -1) {
|
||||||
|
@ -55,7 +55,8 @@ int main(int argc, char **argv)
|
|||||||
NULL, /* output charset */
|
NULL, /* output charset */
|
||||||
0, /* appendable */
|
0, /* appendable */
|
||||||
0, /* ms_block */
|
0, /* ms_block */
|
||||||
NULL /* overwrite */
|
NULL, /* overwrite */
|
||||||
|
1024 /* fifo_size */
|
||||||
};
|
};
|
||||||
struct iso_read_opts ropts = {
|
struct iso_read_opts ropts = {
|
||||||
0, /* block */
|
0, /* block */
|
||||||
|
@ -50,7 +50,8 @@ int main(int argc, char **argv)
|
|||||||
NULL, /* output charset */
|
NULL, /* output charset */
|
||||||
0, /* appendable */
|
0, /* appendable */
|
||||||
0, /* ms_block */
|
0, /* ms_block */
|
||||||
NULL /* overwrite */
|
NULL, /* overwrite */
|
||||||
|
1024 /* fifo_size */
|
||||||
};
|
};
|
||||||
struct iso_read_opts ropts = {
|
struct iso_read_opts ropts = {
|
||||||
0, /* block */
|
0, /* block */
|
||||||
|
@ -50,7 +50,8 @@ int main(int argc, char **argv)
|
|||||||
NULL, /* output charset */
|
NULL, /* output charset */
|
||||||
0, /* appendable */
|
0, /* appendable */
|
||||||
0, /* ms_block */
|
0, /* ms_block */
|
||||||
NULL /* overwrite */
|
NULL, /* overwrite */
|
||||||
|
1024 /* fifo_size */
|
||||||
};
|
};
|
||||||
struct iso_read_opts ropts = {
|
struct iso_read_opts ropts = {
|
||||||
0, /* block */
|
0, /* block */
|
||||||
|
107
src/buffer.c
107
src/buffer.c
@ -18,19 +18,24 @@
|
|||||||
|
|
||||||
#include "buffer.h"
|
#include "buffer.h"
|
||||||
#include "error.h"
|
#include "error.h"
|
||||||
|
#include "libburn/libburn.h"
|
||||||
|
#include "ecma119.h"
|
||||||
|
|
||||||
#include <pthread.h>
|
#include <pthread.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
|
|
||||||
#define BUFFER_CAPACITY BLOCK_SIZE * BUFFER_SIZE
|
|
||||||
|
|
||||||
#ifndef MIN
|
#ifndef MIN
|
||||||
# define MIN(a, b) (((a) < (b)) ? (a) : (b))
|
# define MIN(a, b) (((a) < (b)) ? (a) : (b))
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
struct iso_ring_buffer
|
struct iso_ring_buffer
|
||||||
{
|
{
|
||||||
uint8_t buf[BLOCK_SIZE * BUFFER_SIZE];
|
uint8_t *buf;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Max number of bytes in buffer
|
||||||
|
*/
|
||||||
|
size_t cap;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Number of bytes available.
|
* Number of bytes available.
|
||||||
@ -41,9 +46,12 @@ struct iso_ring_buffer
|
|||||||
size_t rpos;
|
size_t rpos;
|
||||||
size_t wpos;
|
size_t wpos;
|
||||||
|
|
||||||
/* flags to report if read or writer threads ends execution */
|
/*
|
||||||
unsigned int rend :1;
|
* flags to report if read or writer threads ends execution
|
||||||
unsigned int wend :1;
|
* 0 not finished, 1 finished ok, 2 finish with error
|
||||||
|
*/
|
||||||
|
unsigned int rend :2;
|
||||||
|
unsigned int wend :2;
|
||||||
|
|
||||||
/* just for statistical purposes */
|
/* just for statistical purposes */
|
||||||
unsigned int times_full;
|
unsigned int times_full;
|
||||||
@ -57,12 +65,16 @@ struct iso_ring_buffer
|
|||||||
/**
|
/**
|
||||||
* Create a new buffer.
|
* Create a new buffer.
|
||||||
*
|
*
|
||||||
* The created buffer can be freed with free(3)
|
* The created buffer should be freed with iso_ring_buffer_free()
|
||||||
*
|
*
|
||||||
|
* @param size
|
||||||
|
* Number of blocks in buffer. You should supply a number >= 32, otherwise
|
||||||
|
* size will be ignored and 32 will be used by default, which leads to a
|
||||||
|
* 64 KiB buffer.
|
||||||
* @return
|
* @return
|
||||||
* 1 success, < 0 error
|
* 1 success, < 0 error
|
||||||
*/
|
*/
|
||||||
int iso_ring_buffer_new(IsoRingBuffer **rbuf)
|
int iso_ring_buffer_new(size_t size, IsoRingBuffer **rbuf)
|
||||||
{
|
{
|
||||||
IsoRingBuffer *buffer;
|
IsoRingBuffer *buffer;
|
||||||
|
|
||||||
@ -75,6 +87,13 @@ int iso_ring_buffer_new(IsoRingBuffer **rbuf)
|
|||||||
return ISO_MEM_ERROR;
|
return ISO_MEM_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
buffer->cap = (size > 32 ? size : 32) * BLOCK_SIZE;
|
||||||
|
buffer->buf = malloc(buffer->cap);
|
||||||
|
if (buffer->buf == NULL) {
|
||||||
|
free(buffer);
|
||||||
|
return ISO_MEM_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
buffer->size = 0;
|
buffer->size = 0;
|
||||||
buffer->wpos = 0;
|
buffer->wpos = 0;
|
||||||
buffer->rpos = 0;
|
buffer->rpos = 0;
|
||||||
@ -95,6 +114,7 @@ int iso_ring_buffer_new(IsoRingBuffer **rbuf)
|
|||||||
|
|
||||||
void iso_ring_buffer_free(IsoRingBuffer *buf)
|
void iso_ring_buffer_free(IsoRingBuffer *buf)
|
||||||
{
|
{
|
||||||
|
free(buf->buf);
|
||||||
pthread_mutex_destroy(&buf->mutex);
|
pthread_mutex_destroy(&buf->mutex);
|
||||||
pthread_cond_destroy(&buf->empty);
|
pthread_cond_destroy(&buf->empty);
|
||||||
pthread_cond_destroy(&buf->full);
|
pthread_cond_destroy(&buf->full);
|
||||||
@ -128,11 +148,11 @@ int iso_ring_buffer_write(IsoRingBuffer *buf, uint8_t *data, size_t count)
|
|||||||
|
|
||||||
pthread_mutex_lock(&buf->mutex);
|
pthread_mutex_lock(&buf->mutex);
|
||||||
|
|
||||||
while (buf->size == BUFFER_CAPACITY) {
|
while (buf->size == buf->cap) {
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Note. There's only a writer, so we have no race conditions.
|
* Note. There's only a writer, so we have no race conditions.
|
||||||
* Thus, the while(buf->size == BUFFER_CAPACITY) is used here
|
* Thus, the while(buf->size == buf->cap) is used here
|
||||||
* only to propertly detect the reader has been cancelled
|
* only to propertly detect the reader has been cancelled
|
||||||
*/
|
*/
|
||||||
|
|
||||||
@ -146,12 +166,12 @@ int iso_ring_buffer_write(IsoRingBuffer *buf, uint8_t *data, size_t count)
|
|||||||
pthread_cond_wait(&buf->full, &buf->mutex);
|
pthread_cond_wait(&buf->full, &buf->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
len = MIN(count - bytes_write, BUFFER_CAPACITY - buf->size);
|
len = MIN(count - bytes_write, buf->cap - buf->size);
|
||||||
if (buf->wpos + len > BUFFER_CAPACITY) {
|
if (buf->wpos + len > buf->cap) {
|
||||||
len = BUFFER_CAPACITY - buf->wpos;
|
len = buf->cap - buf->wpos;
|
||||||
}
|
}
|
||||||
memcpy(buf->buf + buf->wpos, data + bytes_write, len);
|
memcpy(buf->buf + buf->wpos, data + bytes_write, len);
|
||||||
buf->wpos = (buf->wpos + len) % (BUFFER_CAPACITY);
|
buf->wpos = (buf->wpos + len) % (buf->cap);
|
||||||
bytes_write += len;
|
bytes_write += len;
|
||||||
buf->size += len;
|
buf->size += len;
|
||||||
|
|
||||||
@ -202,11 +222,11 @@ int iso_ring_buffer_read(IsoRingBuffer *buf, uint8_t *dest, size_t count)
|
|||||||
}
|
}
|
||||||
|
|
||||||
len = MIN(count - bytes_read, buf->size);
|
len = MIN(count - bytes_read, buf->size);
|
||||||
if (buf->rpos + len > BUFFER_CAPACITY) {
|
if (buf->rpos + len > buf->cap) {
|
||||||
len = BUFFER_CAPACITY - buf->rpos;
|
len = buf->cap - buf->rpos;
|
||||||
}
|
}
|
||||||
memcpy(dest + bytes_read, buf->buf + buf->rpos, len);
|
memcpy(dest + bytes_read, buf->buf + buf->rpos, len);
|
||||||
buf->rpos = (buf->rpos + len) % (BUFFER_CAPACITY);
|
buf->rpos = (buf->rpos + len) % (buf->cap);
|
||||||
bytes_read += len;
|
bytes_read += len;
|
||||||
buf->size -= len;
|
buf->size -= len;
|
||||||
|
|
||||||
@ -217,20 +237,20 @@ int iso_ring_buffer_read(IsoRingBuffer *buf, uint8_t *dest, size_t count)
|
|||||||
return ISO_SUCCESS;
|
return ISO_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
void iso_ring_buffer_writer_close(IsoRingBuffer *buf)
|
void iso_ring_buffer_writer_close(IsoRingBuffer *buf, int error)
|
||||||
{
|
{
|
||||||
pthread_mutex_lock(&buf->mutex);
|
pthread_mutex_lock(&buf->mutex);
|
||||||
buf->wend = 1;
|
buf->wend = error ? 2 : 1;
|
||||||
|
|
||||||
/* ensure no reader is waiting */
|
/* ensure no reader is waiting */
|
||||||
pthread_cond_signal(&buf->empty);
|
pthread_cond_signal(&buf->empty);
|
||||||
pthread_mutex_unlock(&buf->mutex);
|
pthread_mutex_unlock(&buf->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
void iso_ring_buffer_reader_close(IsoRingBuffer *buf)
|
void iso_ring_buffer_reader_close(IsoRingBuffer *buf, int error)
|
||||||
{
|
{
|
||||||
pthread_mutex_lock(&buf->mutex);
|
pthread_mutex_lock(&buf->mutex);
|
||||||
buf->rend = 1;
|
buf->rend = error ? 2 : 1;
|
||||||
|
|
||||||
/* ensure no writer is waiting */
|
/* ensure no writer is waiting */
|
||||||
pthread_cond_signal(&buf->full);
|
pthread_cond_signal(&buf->full);
|
||||||
@ -252,3 +272,48 @@ unsigned int iso_ring_buffer_get_times_empty(IsoRingBuffer *buf)
|
|||||||
{
|
{
|
||||||
return buf->times_empty;
|
return buf->times_empty;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the status of the buffer used by a burn_source.
|
||||||
|
*
|
||||||
|
* @param b
|
||||||
|
* A burn_source previously obtained with
|
||||||
|
* iso_image_create_burn_source().
|
||||||
|
* @param size
|
||||||
|
* Will be filled with the total size of the buffer, in bytes
|
||||||
|
* @param free_bytes
|
||||||
|
* Will be filled with the bytes currently available in buffer
|
||||||
|
* @return
|
||||||
|
* < 0 error, > 0 state:
|
||||||
|
* 1="active" : input and consumption are active
|
||||||
|
* 2="ending" : input has ended without error
|
||||||
|
* 3="failing" : input had error and ended,
|
||||||
|
* 5="abandoned" : consumption has ended prematurely
|
||||||
|
* 6="ended" : consumption has ended without input error
|
||||||
|
* 7="aborted" : consumption has ended after input error
|
||||||
|
*/
|
||||||
|
int iso_ring_buffer_get_status(struct burn_source *b, size_t *size,
|
||||||
|
size_t *free_bytes)
|
||||||
|
{
|
||||||
|
int ret;
|
||||||
|
IsoRingBuffer *buf;
|
||||||
|
if (b == NULL) {
|
||||||
|
return ISO_NULL_POINTER;
|
||||||
|
}
|
||||||
|
buf = ((Ecma119Image*)(b->data))->buffer;
|
||||||
|
|
||||||
|
/* get mutex */
|
||||||
|
pthread_mutex_lock(&buf->mutex);
|
||||||
|
if (size) {
|
||||||
|
*size = buf->cap;
|
||||||
|
}
|
||||||
|
if (free_bytes) {
|
||||||
|
*free_bytes = buf->size;
|
||||||
|
}
|
||||||
|
|
||||||
|
ret = (buf->rend + 1) + (buf->wend ? 4 : 0);
|
||||||
|
|
||||||
|
pthread_mutex_unlock(&buf->mutex);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
20
src/buffer.h
20
src/buffer.h
@ -12,21 +12,23 @@
|
|||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
#include <stdint.h>
|
#include <stdint.h>
|
||||||
|
|
||||||
/* 2 MB buffer */
|
|
||||||
#define BLOCK_SIZE 2048
|
#define BLOCK_SIZE 2048
|
||||||
#define BUFFER_SIZE 1024
|
|
||||||
|
|
||||||
typedef struct iso_ring_buffer IsoRingBuffer;
|
typedef struct iso_ring_buffer IsoRingBuffer;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a new buffer.
|
* Create a new buffer.
|
||||||
*
|
*
|
||||||
* The created buffer can be freed with iso_ring_buffer_free(3)
|
* The created buffer should be freed with iso_ring_buffer_free()
|
||||||
*
|
*
|
||||||
|
* @param size
|
||||||
|
* Number of blocks in buffer. You should supply a number >= 32, otherwise
|
||||||
|
* size will be ignored and 32 will be used by default, which leads to a
|
||||||
|
* 64 KiB buffer.
|
||||||
* @return
|
* @return
|
||||||
* 1 success, < 0 error
|
* 1 success, < 0 error
|
||||||
*/
|
*/
|
||||||
int iso_ring_buffer_new(IsoRingBuffer **rbuf);
|
int iso_ring_buffer_new(size_t size, IsoRingBuffer **rbuf);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Free a given buffer
|
* Free a given buffer
|
||||||
@ -64,15 +66,21 @@ int iso_ring_buffer_read(IsoRingBuffer *buf, uint8_t *dest, size_t count);
|
|||||||
* Close the buffer (to be called by the writer).
|
* Close the buffer (to be called by the writer).
|
||||||
* You have to explicity close the buffer when you don't have more data to
|
* You have to explicity close the buffer when you don't have more data to
|
||||||
* write, otherwise reader will be waiting forever.
|
* write, otherwise reader will be waiting forever.
|
||||||
|
*
|
||||||
|
* @param error
|
||||||
|
* Writer has finished prematurely due to an error
|
||||||
*/
|
*/
|
||||||
void iso_ring_buffer_writer_close(IsoRingBuffer *buf);
|
void iso_ring_buffer_writer_close(IsoRingBuffer *buf, int error);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Close the buffer (to be called by the reader).
|
* Close the buffer (to be called by the reader).
|
||||||
* If for any reason you don't want to read more data, you need to call this
|
* If for any reason you don't want to read more data, you need to call this
|
||||||
* to let the writer thread finish.
|
* to let the writer thread finish.
|
||||||
|
*
|
||||||
|
* @param error
|
||||||
|
* Reader has finished prematurely due to an error
|
||||||
*/
|
*/
|
||||||
void iso_ring_buffer_reader_close(IsoRingBuffer *buf);
|
void iso_ring_buffer_reader_close(IsoRingBuffer *buf, int error);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the times the buffer was full.
|
* Get the times the buffer was full.
|
||||||
|
@ -808,13 +808,13 @@ void *write_function(void *arg)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
iso_ring_buffer_writer_close(target->buffer);
|
iso_ring_buffer_writer_close(target->buffer, 0);
|
||||||
pthread_exit(NULL);
|
pthread_exit(NULL);
|
||||||
|
|
||||||
write_error: ;
|
write_error: ;
|
||||||
iso_msg_fatal(target->image->messenger, LIBISO_WRITE_ERROR,
|
iso_msg_fatal(target->image->messenger, LIBISO_WRITE_ERROR,
|
||||||
"Image write error, code %d", res);
|
"Image write error, code %d", res);
|
||||||
iso_ring_buffer_writer_close(target->buffer);
|
iso_ring_buffer_writer_close(target->buffer, 1);
|
||||||
pthread_exit(NULL);
|
pthread_exit(NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -976,7 +976,7 @@ int ecma119_image_new(IsoImage *src, Ecma119WriteOpts *opts, Ecma119Image **img)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* create the ring buffer */
|
/* create the ring buffer */
|
||||||
ret = iso_ring_buffer_new(&target->buffer);
|
ret = iso_ring_buffer_new(opts->fifo_size, &target->buffer);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
goto target_cleanup;
|
goto target_cleanup;
|
||||||
}
|
}
|
||||||
@ -1097,7 +1097,7 @@ static void bs_free_data(struct burn_source *bs)
|
|||||||
Ecma119Image *target = (Ecma119Image*)bs->data;
|
Ecma119Image *target = (Ecma119Image*)bs->data;
|
||||||
|
|
||||||
/* forces writer to stop if it is still running */
|
/* forces writer to stop if it is still running */
|
||||||
iso_ring_buffer_reader_close(target->buffer);
|
iso_ring_buffer_reader_close(target->buffer, 0);
|
||||||
|
|
||||||
/* wait until writer thread finishes */
|
/* wait until writer thread finishes */
|
||||||
pthread_join(target->wthread, NULL);
|
pthread_join(target->wthread, NULL);
|
||||||
|
@ -196,6 +196,13 @@ typedef struct
|
|||||||
* enought.
|
* enought.
|
||||||
*/
|
*/
|
||||||
uint8_t *overwrite;
|
uint8_t *overwrite;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Size, in number of blocks, of the FIFO buffer used between the writer
|
||||||
|
* thread and the burn_source. You have to provide at least a 32 blocks
|
||||||
|
* buffer.
|
||||||
|
*/
|
||||||
|
size_t fifo_size;
|
||||||
} Ecma119WriteOpts;
|
} Ecma119WriteOpts;
|
||||||
|
|
||||||
typedef struct Iso_Data_Source IsoDataSource;
|
typedef struct Iso_Data_Source IsoDataSource;
|
||||||
@ -1221,6 +1228,28 @@ void iso_data_source_unref(IsoDataSource *src);
|
|||||||
*/
|
*/
|
||||||
int iso_data_source_new_from_file(const char *path, IsoDataSource **src);
|
int iso_data_source_new_from_file(const char *path, IsoDataSource **src);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the status of the buffer used by a burn_source.
|
||||||
|
*
|
||||||
|
* @param b
|
||||||
|
* A burn_source previously obtained with
|
||||||
|
* iso_image_create_burn_source().
|
||||||
|
* @param size
|
||||||
|
* Will be filled with the total size of the buffer, in bytes
|
||||||
|
* @param free_bytes
|
||||||
|
* Will be filled with the bytes currently available in buffer
|
||||||
|
* @return
|
||||||
|
* < 0 error, > 0 state:
|
||||||
|
* 1="active" : input and consumption are active
|
||||||
|
* 2="ending" : input has ended without error
|
||||||
|
* 3="failing" : input had error and ended,
|
||||||
|
* 5="abandoned" : consumption has ended prematurely
|
||||||
|
* 6="ended" : consumption has ended without input error
|
||||||
|
* 7="aborted" : consumption has ended after input error
|
||||||
|
*/
|
||||||
|
int iso_ring_buffer_get_status(struct burn_source *b, size_t *size,
|
||||||
|
size_t *free_bytes);
|
||||||
|
|
||||||
#define ISO_MSGS_MESSAGE_LEN 4096
|
#define ISO_MSGS_MESSAGE_LEN 4096
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
Loading…
Reference in New Issue
Block a user