diff --git a/cdrskin/cdrskin_timestamp.h b/cdrskin/cdrskin_timestamp.h index f21ecb0..c63da2f 100644 --- a/cdrskin/cdrskin_timestamp.h +++ b/cdrskin/cdrskin_timestamp.h @@ -1 +1 @@ -#define Cdrskin_timestamP "2007.10.03.115550" +#define Cdrskin_timestamP "2007.10.03.223649" diff --git a/libburn/async.c b/libburn/async.c index 4bc807b..79a5d04 100644 --- a/libburn/async.c +++ b/libburn/async.c @@ -432,7 +432,7 @@ void burn_disc_write(struct burn_write_opts *opts, struct burn_disc *disc) static void *fifo_worker_func(struct w_list *w) { - burn_fifo_source_shuffler(w->u.fifo.source, w->u.fifo.flag); + burn_fifo_source_shoveller(w->u.fifo.source, w->u.fifo.flag); remove_worker(pthread_self()); return NULL; } @@ -445,9 +445,8 @@ int burn_fifo_start(struct burn_source *source, int flag) fs->is_started = -1; - /* >>> create and set up ring buffer */; - /* >>> for now: only 1 , later: fs->chunks */ - fs->buf = calloc(fs->chunksize, 1); + /* create and set up ring buffer */; + fs->buf = calloc(fs->chunksize, fs->chunks); if (fs->buf == NULL) { /* >>> could not start ring buffer */; return -1; diff --git a/libburn/file.c b/libburn/file.c index fe3d452..a859998 100644 --- a/libburn/file.c +++ b/libburn/file.c @@ -8,6 +8,7 @@ #include #include #include +#include #include "source.h" #include "libburn.h" #include "file.h" @@ -188,7 +189,7 @@ struct burn_source *burn_fd_source_new(int datafd, int subfd, off_t size) /* The fifo mechanism consists of a burn_source proxy which is here, a thread management team which is located in async.c, - and a synchronous shuffler which is here. + and a synchronous shoveller which is here. */ static int fifo_read(struct burn_source *source, @@ -202,7 +203,7 @@ static int fifo_read(struct burn_source *source, ret = burn_fifo_start(source, 0); if (ret <= 0) { - /* >>> cannot start fifo thread */; + /* >>> msg: cannot start fifo thread */; return -1; } @@ -244,53 +245,7 @@ static void fifo_free(struct burn_source *source) } -struct burn_source *burn_fifo_source_new(struct burn_source *inp, - int chunksize, int chunks, int flag) -{ - struct burn_source_fifo *fs; - struct burn_source *src; - int ret, outlet[2]; - - ret = pipe(outlet); - if (ret == -1) { - /* >>> error on pipe creation */; - return NULL; - } - - fs = malloc(sizeof(struct burn_source_fifo)); - if (fs == NULL) - return NULL; - fs->is_started = 0; - fs->thread_pid = 0; - fs->thread_pid_valid = 0; - fs->inp = inp; - fs->outlet[0] = outlet[0]; - fs->outlet[1] = outlet[1]; - fs->chunksize = chunksize; - if (chunksize <= 0) - fs->chunksize = 2048; - fs->chunks = chunks; - fs->buf = NULL; - fs->in_counter = fs->out_counter = 0; - - src = burn_source_new(); - if (src == NULL) { - free((char *) fs->buf); - free((char *) fs); - return NULL; - } - - src->read = fifo_read; - src->read_sub = NULL; - src->get_size = fifo_get_size; - src->set_size = fifo_set_size; - src->free_data = fifo_free; - src->data = fs; - return src; -} - - -int burn_fifo_source_shuffler(struct burn_source *source, int flag) +int burn_fifo_source_shoveller_og(struct burn_source *source, int flag) { struct burn_source_fifo *fs = source->data; int ret; @@ -327,3 +282,370 @@ int burn_fifo_source_shuffler(struct burn_source *source, int flag) } +/* ts A71003 */ +/* ----------------------------- fifo ng ------------------------- */ + +/* The fifo mechanism consists of a burn_source proxy which is here, + a thread management team which is located in async.c, + and a synchronous shoveller which is here. +*/ + +static int fifo_sleep(int flag) +{ + static struct timespec sleeptime = { 0, 50000000}; /* 50 ms */ + + return nanosleep(&sleeptime, NULL); +} + + +static int fifo_read_ng(struct burn_source *source, + unsigned char *buffer, + int size) +{ + struct burn_source_fifo *fs = source->data; + int ret, todo, rpos, bufsize, diff; + + if (fs->end_of_consumption) { + + /* >>> msg: reading has been ended already */; + + return 0; + } + if (fs->is_started == 0) { + ret = burn_fifo_start(source, 0); + if (ret <= 0) { + + /* >>> msg: cannot start fifo thread */; + + fs->end_of_consumption = 1; + return -1; + } + fs->is_started = 1; + } + if (size == 0) + return 0; + + /* Reading from the ring buffer */ + + /* This needs no mutex because each volatile variable has one thread + which may write and the other which only reads and is aware of + volatility. + The feeder of the ringbuffer is in burn_fifo_source_shoveller_ng(). + */ + todo = size; + bufsize = fs->chunksize * fs->chunks; + while (todo > 0) { + /* readpos is not volatile here , writepos is volatile */ + rpos = fs->buf_readpos; + while (rpos == fs->buf_writepos) { + if (fs->end_of_input) + break; + if (fs->input_error) { + if (todo < size) /* deliver partial buffer */ + break; + fs->end_of_consumption = 1; + + /* >>> msg: report fs->input_error as errno */; + + return -1; + } + fifo_sleep(0); + } + diff = fs->buf_writepos - rpos; /* read volatile only once */ + if (diff == 0) + break; + if (diff > 0) + /* diff bytes are available */; + else + /* at least (bufsize - rpos) bytes are available */ + diff = bufsize - rpos; + if (diff > todo) + diff = todo; + memcpy(buffer, fs->buf+(size-todo)+rpos, diff); + fs->buf_readpos += diff; + if (fs->buf_readpos >= bufsize) + fs->buf_readpos = 0; + todo -= diff; + } + if (size - todo <= 0) + fs->end_of_consumption = 1; + else + fs->out_counter += size - todo; + +/* + fprintf(stderr, + "libburn_EXPERIMENTAL: read= %d , pos= %d , out_count= %.f\n", + (size - todo), fs->buf_readpos, (double) fs->out_counter); +*/ + + return (size - todo); +} + + +static off_t fifo_get_size_ng(struct burn_source *source) +{ + struct burn_source_fifo *fs = source->data; + + return fs->inp->get_size(fs->inp); +} + + +static int fifo_set_size_ng(struct burn_source *source, off_t size) +{ + struct burn_source_fifo *fs = source->data; + + return fs->inp->set_size(fs->inp, size); +} + + +static void fifo_free_ng(struct burn_source *source) +{ + struct burn_source_fifo *fs = source->data; + + if (fs->inp != NULL) + burn_source_free(fs->inp); + if (fs->buf != NULL) + free(fs->buf); + free((char *) fs); +} + + +int burn_fifo_source_shoveller_ng(struct burn_source *source, int flag) +{ + struct burn_source_fifo *fs = source->data; + int ret, bufsize, diff, wpos, rpos, trans_end, free_bytes; + char *bufpt; + + fs->thread_pid = getpid(); + fs->thread_pid_valid = 1; + + bufsize = fs->chunksize * fs->chunks; + while (!fs->end_of_consumption) { + + /* wait for enough buffer space available */ + wpos = fs->buf_writepos; + while (1) { + rpos = fs->buf_readpos; + diff = rpos - wpos; + trans_end = 0; + if (diff == 0) + free_bytes = bufsize - 1; + else if (diff > 0) + free_bytes = diff - 1; + else { + free_bytes = (bufsize - wpos) + rpos - 1; + if (bufsize - wpos < fs->chunksize) + trans_end = 1; + } + if (free_bytes >= fs->chunksize) + break; + fifo_sleep(0); + } + + /* prepare the receiving memory */ + bufpt = fs->buf + wpos; + if (trans_end) { + bufpt = calloc(fs->chunksize, 1); + if (bufpt == NULL) { + + /* >>> msg: out of memory */; + + fs->input_error = ENOMEM; + break; + } + } + + /* Obtain next chunk */ + ret = fs->inp->read(fs->inp, (unsigned char *) fs->buf, + fs->chunksize); + if (ret > 0) + fs->in_counter += ret; + else if (ret == 0) + break; /* EOF */ + else { + fs->input_error = errno; + if(errno == 0) + fs->input_error = EIO; + break; + } + + /* activate read chunk */ + if (ret > fs->chunksize) /* beware of ill custom burn_source */ + ret = fs->chunksize; + if (trans_end) { + /* copy to end of buffer */ + memcpy(fs->buf + wpos, bufpt, bufsize - wpos); + /* copy to start of buffer */ + memcpy(fs->buf, bufpt + (bufsize - wpos), + fs->chunksize - (bufsize - wpos)); + free(bufpt); + if (ret >= bufsize - wpos) + fs->buf_writepos = ret - (bufsize - wpos); + else + fs->buf_writepos += ret; + } else if (fs->buf_writepos + ret == bufsize) + fs->buf_writepos = 0; + else + fs->buf_writepos += ret; + +/* + fprintf(stderr, "[%2.2d%%] ", + (int) (100.0 - 100.0 * ((double) free_bytes) / + (double) bufsize)); + fprintf(stderr, + "libburn_EXPERIMENTAL: writepos= %d ,in_count = %.f\n", + fs->buf_writepos, (double) fs->in_counter); +*/ + } + if (!fs->end_of_consumption) + fs->end_of_input = 1; + + /* wait for end of reading by consumer */; + while (fs->buf_readpos != fs->buf_writepos && !fs->end_of_consumption) + fifo_sleep(0); + + /* destroy ring buffer */; + if (!fs->end_of_consumption) + fs->end_of_consumption = 2; /* Claim stop of consumption */ + + /* This is not prone to race conditions because either the consumer + indicated hangup by fs->end_of_consumption = 1 or the consumer set + fs->buf_readpos to a value indicating the buffer is empty. + So in both cases the consumer is aware that reading is futile + or even fatal. + */ + free(fs->buf); + fs->buf = NULL; + + return (fs->input_error == 0); +} + + +#define Libburn_fifo_nG 1 + +int burn_fifo_source_shoveller(struct burn_source *source, int flag) +{ +#ifndef Libburn_fifo_nG + return burn_fifo_source_shoveller_og(source, flag); +#else + return burn_fifo_source_shoveller_ng(source, flag); +#endif +} + + +struct burn_source *burn_fifo_source_new(struct burn_source *inp, + int chunksize, int chunks, int flag) +{ + struct burn_source_fifo *fs; + struct burn_source *src; +#ifndef Libburn_fifo_nG + int ret, outlet[2]; +#endif + + if (((double) chunksize) * ((double) chunks) > 1024.0*1024.0*1024.0) { + /* >>> buffer larger than 1 GB */; + return NULL; + } + if (chunksize < 1 || chunks < 2) { + /* >>> buffer too small */; + return NULL; + } + +#ifndef Libburn_fifo_nG + outlet[0] = outlet[1] = -1; + ret = pipe(outlet); + if (ret == -1) { + /* >>> error on pipe creation */; + return NULL; + } +#endif /* ! Libburn_fifo_nG */ + + fs = malloc(sizeof(struct burn_source_fifo)); + if (fs == NULL) + return NULL; + fs->magic[0] = 'f'; fs->magic[1] = 'i'; + fs->magic[2] = 'f'; fs->magic[3] = 'o'; + fs->is_started = 0; + fs->thread_pid = 0; + fs->thread_pid_valid = 0; + fs->inp = NULL; /* set later */ + +#ifndef Libburn_fifo_nG + fs->outlet[0] = outlet[0]; + fs->outlet[1] = outlet[1]; +#endif + + fs->chunksize = chunksize; + fs->chunks = chunks; + fs->buf = NULL; + fs->buf_writepos = fs->buf_readpos = 0; + fs->end_of_input = 0; + fs->input_error = 0; + fs->end_of_consumption = 0; + fs->in_counter = fs->out_counter = 0; + + src = burn_source_new(); + if (src == NULL) { + free((char *) fs); + return NULL; + } + +#ifndef Libburn_fifo_nG + src->read = fifo_read; + src->read_sub = NULL; + src->get_size = fifo_get_size; + src->set_size = fifo_set_size; + src->free_data = fifo_free; +#else /* Libburn_fifo_nG */ + src->read = fifo_read_ng; + src->read_sub = NULL; + src->get_size = fifo_get_size_ng; + src->set_size = fifo_set_size_ng; + src->free_data = fifo_free_ng; +#endif /* ! Libburn_fifo_nG */ + + src->data = fs; + + fs->inp = inp; + inp->refcount++; /* make sure inp lives longer than src */ + + return src; +} + + +/* ts A71003 : API */ +int burn_fifo_inquire_status(struct burn_source *source, int *size, + int *free_bytes) +{ + struct burn_source_fifo *fs = source->data; + int ret = 0, diff, wpos, rpos; + + if (fs->magic[0] != 'f' || fs->magic[1] != 'i' || + fs->magic[2] != 'f' || fs->magic[3] != 'o') { + + /* >>> not a fifo burn_source */; + + return -1; + } + *size = fs->chunksize * fs->chunks; + rpos = fs->buf_readpos; + wpos = fs->buf_writepos; + diff = rpos - wpos; + if (diff == 0) + *free_bytes = *size - 1; + else if (diff > 0) + *free_bytes = diff - 1; + else + *free_bytes = (*size - wpos) + rpos - 1; + if (fs->end_of_consumption == 1) + ret |= 4; + if (fs->input_error) + ret |= 3; + else if (fs->end_of_input) + ret |= 2; + else + ret |= 1; + return ret; +} + + diff --git a/libburn/file.h b/libburn/file.h index f7338cd..1ca228c 100644 --- a/libburn/file.h +++ b/libburn/file.h @@ -15,6 +15,7 @@ struct burn_source_file /* ts A70930 */ struct burn_source_fifo { + char magic[4]; /* The fifo stays inactive and unequipped with eventual resources until its read() method is called for the first time. @@ -35,10 +36,15 @@ struct burn_source_fifo { /* <<< currently it is only a pipe */ int outlet[2]; - /* >>> later it will be a ring buffer mechanism */ + /* The ring buffer mechanism */ int chunksize; int chunks; char *buf; + volatile int buf_writepos; + volatile int buf_readpos; + volatile int end_of_input; + volatile int input_error; + volatile int end_of_consumption; off_t in_counter; off_t out_counter; @@ -48,7 +54,7 @@ struct burn_source_fifo { /** The worker behind the fifo thread. Gets started from burn_fifo_start() in async.c */ -int burn_fifo_source_shuffler(struct burn_source *source, int flag); +int burn_fifo_source_shoveller(struct burn_source *source, int flag); #endif /* LIBBURN__FILE_H */