Implemented the ring buffer of burn_fifo_source_new() object

This commit is contained in:
Thomas Schmitt 2007-10-03 22:35:37 +00:00
parent 771e659a43
commit 291ef125b0
4 changed files with 383 additions and 56 deletions

View File

@ -1 +1 @@
#define Cdrskin_timestamP "2007.10.03.115550"
#define Cdrskin_timestamP "2007.10.03.223649"

View File

@ -432,7 +432,7 @@ void burn_disc_write(struct burn_write_opts *opts, struct burn_disc *disc)
static void *fifo_worker_func(struct w_list *w)
{
burn_fifo_source_shuffler(w->u.fifo.source, w->u.fifo.flag);
burn_fifo_source_shoveller(w->u.fifo.source, w->u.fifo.flag);
remove_worker(pthread_self());
return NULL;
}
@ -445,9 +445,8 @@ int burn_fifo_start(struct burn_source *source, int flag)
fs->is_started = -1;
/* >>> create and set up ring buffer */;
/* >>> for now: only 1 , later: fs->chunks */
fs->buf = calloc(fs->chunksize, 1);
/* create and set up ring buffer */;
fs->buf = calloc(fs->chunksize, fs->chunks);
if (fs->buf == NULL) {
/* >>> could not start ring buffer */;
return -1;

View File

@ -8,6 +8,7 @@
#include <sys/stat.h>
#include <unistd.h>
#include <fcntl.h>
#include <time.h>
#include "source.h"
#include "libburn.h"
#include "file.h"
@ -188,7 +189,7 @@ struct burn_source *burn_fd_source_new(int datafd, int subfd, off_t size)
/* The fifo mechanism consists of a burn_source proxy which is here,
a thread management team which is located in async.c,
and a synchronous shuffler which is here.
and a synchronous shoveller which is here.
*/
static int fifo_read(struct burn_source *source,
@ -202,7 +203,7 @@ static int fifo_read(struct burn_source *source,
ret = burn_fifo_start(source, 0);
if (ret <= 0) {
/* >>> cannot start fifo thread */;
/* >>> msg: cannot start fifo thread */;
return -1;
}
@ -244,53 +245,7 @@ static void fifo_free(struct burn_source *source)
}
struct burn_source *burn_fifo_source_new(struct burn_source *inp,
int chunksize, int chunks, int flag)
{
struct burn_source_fifo *fs;
struct burn_source *src;
int ret, outlet[2];
ret = pipe(outlet);
if (ret == -1) {
/* >>> error on pipe creation */;
return NULL;
}
fs = malloc(sizeof(struct burn_source_fifo));
if (fs == NULL)
return NULL;
fs->is_started = 0;
fs->thread_pid = 0;
fs->thread_pid_valid = 0;
fs->inp = inp;
fs->outlet[0] = outlet[0];
fs->outlet[1] = outlet[1];
fs->chunksize = chunksize;
if (chunksize <= 0)
fs->chunksize = 2048;
fs->chunks = chunks;
fs->buf = NULL;
fs->in_counter = fs->out_counter = 0;
src = burn_source_new();
if (src == NULL) {
free((char *) fs->buf);
free((char *) fs);
return NULL;
}
src->read = fifo_read;
src->read_sub = NULL;
src->get_size = fifo_get_size;
src->set_size = fifo_set_size;
src->free_data = fifo_free;
src->data = fs;
return src;
}
int burn_fifo_source_shuffler(struct burn_source *source, int flag)
int burn_fifo_source_shoveller_og(struct burn_source *source, int flag)
{
struct burn_source_fifo *fs = source->data;
int ret;
@ -327,3 +282,370 @@ int burn_fifo_source_shuffler(struct burn_source *source, int flag)
}
/* ts A71003 */
/* ----------------------------- fifo ng ------------------------- */
/* The fifo mechanism consists of a burn_source proxy which is here,
a thread management team which is located in async.c,
and a synchronous shoveller which is here.
*/
static int fifo_sleep(int flag)
{
static struct timespec sleeptime = { 0, 50000000}; /* 50 ms */
return nanosleep(&sleeptime, NULL);
}
static int fifo_read_ng(struct burn_source *source,
unsigned char *buffer,
int size)
{
struct burn_source_fifo *fs = source->data;
int ret, todo, rpos, bufsize, diff;
if (fs->end_of_consumption) {
/* >>> msg: reading has been ended already */;
return 0;
}
if (fs->is_started == 0) {
ret = burn_fifo_start(source, 0);
if (ret <= 0) {
/* >>> msg: cannot start fifo thread */;
fs->end_of_consumption = 1;
return -1;
}
fs->is_started = 1;
}
if (size == 0)
return 0;
/* Reading from the ring buffer */
/* This needs no mutex because each volatile variable has one thread
which may write and the other which only reads and is aware of
volatility.
The feeder of the ringbuffer is in burn_fifo_source_shoveller_ng().
*/
todo = size;
bufsize = fs->chunksize * fs->chunks;
while (todo > 0) {
/* readpos is not volatile here , writepos is volatile */
rpos = fs->buf_readpos;
while (rpos == fs->buf_writepos) {
if (fs->end_of_input)
break;
if (fs->input_error) {
if (todo < size) /* deliver partial buffer */
break;
fs->end_of_consumption = 1;
/* >>> msg: report fs->input_error as errno */;
return -1;
}
fifo_sleep(0);
}
diff = fs->buf_writepos - rpos; /* read volatile only once */
if (diff == 0)
break;
if (diff > 0)
/* diff bytes are available */;
else
/* at least (bufsize - rpos) bytes are available */
diff = bufsize - rpos;
if (diff > todo)
diff = todo;
memcpy(buffer, fs->buf+(size-todo)+rpos, diff);
fs->buf_readpos += diff;
if (fs->buf_readpos >= bufsize)
fs->buf_readpos = 0;
todo -= diff;
}
if (size - todo <= 0)
fs->end_of_consumption = 1;
else
fs->out_counter += size - todo;
/*
fprintf(stderr,
"libburn_EXPERIMENTAL: read= %d , pos= %d , out_count= %.f\n",
(size - todo), fs->buf_readpos, (double) fs->out_counter);
*/
return (size - todo);
}
static off_t fifo_get_size_ng(struct burn_source *source)
{
struct burn_source_fifo *fs = source->data;
return fs->inp->get_size(fs->inp);
}
static int fifo_set_size_ng(struct burn_source *source, off_t size)
{
struct burn_source_fifo *fs = source->data;
return fs->inp->set_size(fs->inp, size);
}
static void fifo_free_ng(struct burn_source *source)
{
struct burn_source_fifo *fs = source->data;
if (fs->inp != NULL)
burn_source_free(fs->inp);
if (fs->buf != NULL)
free(fs->buf);
free((char *) fs);
}
int burn_fifo_source_shoveller_ng(struct burn_source *source, int flag)
{
struct burn_source_fifo *fs = source->data;
int ret, bufsize, diff, wpos, rpos, trans_end, free_bytes;
char *bufpt;
fs->thread_pid = getpid();
fs->thread_pid_valid = 1;
bufsize = fs->chunksize * fs->chunks;
while (!fs->end_of_consumption) {
/* wait for enough buffer space available */
wpos = fs->buf_writepos;
while (1) {
rpos = fs->buf_readpos;
diff = rpos - wpos;
trans_end = 0;
if (diff == 0)
free_bytes = bufsize - 1;
else if (diff > 0)
free_bytes = diff - 1;
else {
free_bytes = (bufsize - wpos) + rpos - 1;
if (bufsize - wpos < fs->chunksize)
trans_end = 1;
}
if (free_bytes >= fs->chunksize)
break;
fifo_sleep(0);
}
/* prepare the receiving memory */
bufpt = fs->buf + wpos;
if (trans_end) {
bufpt = calloc(fs->chunksize, 1);
if (bufpt == NULL) {
/* >>> msg: out of memory */;
fs->input_error = ENOMEM;
break;
}
}
/* Obtain next chunk */
ret = fs->inp->read(fs->inp, (unsigned char *) fs->buf,
fs->chunksize);
if (ret > 0)
fs->in_counter += ret;
else if (ret == 0)
break; /* EOF */
else {
fs->input_error = errno;
if(errno == 0)
fs->input_error = EIO;
break;
}
/* activate read chunk */
if (ret > fs->chunksize) /* beware of ill custom burn_source */
ret = fs->chunksize;
if (trans_end) {
/* copy to end of buffer */
memcpy(fs->buf + wpos, bufpt, bufsize - wpos);
/* copy to start of buffer */
memcpy(fs->buf, bufpt + (bufsize - wpos),
fs->chunksize - (bufsize - wpos));
free(bufpt);
if (ret >= bufsize - wpos)
fs->buf_writepos = ret - (bufsize - wpos);
else
fs->buf_writepos += ret;
} else if (fs->buf_writepos + ret == bufsize)
fs->buf_writepos = 0;
else
fs->buf_writepos += ret;
/*
fprintf(stderr, "[%2.2d%%] ",
(int) (100.0 - 100.0 * ((double) free_bytes) /
(double) bufsize));
fprintf(stderr,
"libburn_EXPERIMENTAL: writepos= %d ,in_count = %.f\n",
fs->buf_writepos, (double) fs->in_counter);
*/
}
if (!fs->end_of_consumption)
fs->end_of_input = 1;
/* wait for end of reading by consumer */;
while (fs->buf_readpos != fs->buf_writepos && !fs->end_of_consumption)
fifo_sleep(0);
/* destroy ring buffer */;
if (!fs->end_of_consumption)
fs->end_of_consumption = 2; /* Claim stop of consumption */
/* This is not prone to race conditions because either the consumer
indicated hangup by fs->end_of_consumption = 1 or the consumer set
fs->buf_readpos to a value indicating the buffer is empty.
So in both cases the consumer is aware that reading is futile
or even fatal.
*/
free(fs->buf);
fs->buf = NULL;
return (fs->input_error == 0);
}
#define Libburn_fifo_nG 1
int burn_fifo_source_shoveller(struct burn_source *source, int flag)
{
#ifndef Libburn_fifo_nG
return burn_fifo_source_shoveller_og(source, flag);
#else
return burn_fifo_source_shoveller_ng(source, flag);
#endif
}
struct burn_source *burn_fifo_source_new(struct burn_source *inp,
int chunksize, int chunks, int flag)
{
struct burn_source_fifo *fs;
struct burn_source *src;
#ifndef Libburn_fifo_nG
int ret, outlet[2];
#endif
if (((double) chunksize) * ((double) chunks) > 1024.0*1024.0*1024.0) {
/* >>> buffer larger than 1 GB */;
return NULL;
}
if (chunksize < 1 || chunks < 2) {
/* >>> buffer too small */;
return NULL;
}
#ifndef Libburn_fifo_nG
outlet[0] = outlet[1] = -1;
ret = pipe(outlet);
if (ret == -1) {
/* >>> error on pipe creation */;
return NULL;
}
#endif /* ! Libburn_fifo_nG */
fs = malloc(sizeof(struct burn_source_fifo));
if (fs == NULL)
return NULL;
fs->magic[0] = 'f'; fs->magic[1] = 'i';
fs->magic[2] = 'f'; fs->magic[3] = 'o';
fs->is_started = 0;
fs->thread_pid = 0;
fs->thread_pid_valid = 0;
fs->inp = NULL; /* set later */
#ifndef Libburn_fifo_nG
fs->outlet[0] = outlet[0];
fs->outlet[1] = outlet[1];
#endif
fs->chunksize = chunksize;
fs->chunks = chunks;
fs->buf = NULL;
fs->buf_writepos = fs->buf_readpos = 0;
fs->end_of_input = 0;
fs->input_error = 0;
fs->end_of_consumption = 0;
fs->in_counter = fs->out_counter = 0;
src = burn_source_new();
if (src == NULL) {
free((char *) fs);
return NULL;
}
#ifndef Libburn_fifo_nG
src->read = fifo_read;
src->read_sub = NULL;
src->get_size = fifo_get_size;
src->set_size = fifo_set_size;
src->free_data = fifo_free;
#else /* Libburn_fifo_nG */
src->read = fifo_read_ng;
src->read_sub = NULL;
src->get_size = fifo_get_size_ng;
src->set_size = fifo_set_size_ng;
src->free_data = fifo_free_ng;
#endif /* ! Libburn_fifo_nG */
src->data = fs;
fs->inp = inp;
inp->refcount++; /* make sure inp lives longer than src */
return src;
}
/* ts A71003 : API */
int burn_fifo_inquire_status(struct burn_source *source, int *size,
int *free_bytes)
{
struct burn_source_fifo *fs = source->data;
int ret = 0, diff, wpos, rpos;
if (fs->magic[0] != 'f' || fs->magic[1] != 'i' ||
fs->magic[2] != 'f' || fs->magic[3] != 'o') {
/* >>> not a fifo burn_source */;
return -1;
}
*size = fs->chunksize * fs->chunks;
rpos = fs->buf_readpos;
wpos = fs->buf_writepos;
diff = rpos - wpos;
if (diff == 0)
*free_bytes = *size - 1;
else if (diff > 0)
*free_bytes = diff - 1;
else
*free_bytes = (*size - wpos) + rpos - 1;
if (fs->end_of_consumption == 1)
ret |= 4;
if (fs->input_error)
ret |= 3;
else if (fs->end_of_input)
ret |= 2;
else
ret |= 1;
return ret;
}

View File

@ -15,6 +15,7 @@ struct burn_source_file
/* ts A70930 */
struct burn_source_fifo {
char magic[4];
/* The fifo stays inactive and unequipped with eventual resources
until its read() method is called for the first time.
@ -35,10 +36,15 @@ struct burn_source_fifo {
/* <<< currently it is only a pipe */
int outlet[2];
/* >>> later it will be a ring buffer mechanism */
/* The ring buffer mechanism */
int chunksize;
int chunks;
char *buf;
volatile int buf_writepos;
volatile int buf_readpos;
volatile int end_of_input;
volatile int input_error;
volatile int end_of_consumption;
off_t in_counter;
off_t out_counter;
@ -48,7 +54,7 @@ struct burn_source_fifo {
/** The worker behind the fifo thread.
Gets started from burn_fifo_start() in async.c
*/
int burn_fifo_source_shuffler(struct burn_source *source, int flag);
int burn_fifo_source_shoveller(struct burn_source *source, int flag);
#endif /* LIBBURN__FILE_H */