Implemented the ring buffer of burn_fifo_source_new() object
This commit is contained in:
parent
5263e46639
commit
54006e424c
@ -1 +1 @@
|
||||
#define Cdrskin_timestamP "2007.10.03.115550"
|
||||
#define Cdrskin_timestamP "2007.10.03.223649"
|
||||
|
@ -432,7 +432,7 @@ void burn_disc_write(struct burn_write_opts *opts, struct burn_disc *disc)
|
||||
|
||||
static void *fifo_worker_func(struct w_list *w)
|
||||
{
|
||||
burn_fifo_source_shuffler(w->u.fifo.source, w->u.fifo.flag);
|
||||
burn_fifo_source_shoveller(w->u.fifo.source, w->u.fifo.flag);
|
||||
remove_worker(pthread_self());
|
||||
return NULL;
|
||||
}
|
||||
@ -445,9 +445,8 @@ int burn_fifo_start(struct burn_source *source, int flag)
|
||||
|
||||
fs->is_started = -1;
|
||||
|
||||
/* >>> create and set up ring buffer */;
|
||||
/* >>> for now: only 1 , later: fs->chunks */
|
||||
fs->buf = calloc(fs->chunksize, 1);
|
||||
/* create and set up ring buffer */;
|
||||
fs->buf = calloc(fs->chunksize, fs->chunks);
|
||||
if (fs->buf == NULL) {
|
||||
/* >>> could not start ring buffer */;
|
||||
return -1;
|
||||
|
@ -8,6 +8,7 @@
|
||||
#include <sys/stat.h>
|
||||
#include <unistd.h>
|
||||
#include <fcntl.h>
|
||||
#include <time.h>
|
||||
#include "source.h"
|
||||
#include "libburn.h"
|
||||
#include "file.h"
|
||||
@ -188,7 +189,7 @@ struct burn_source *burn_fd_source_new(int datafd, int subfd, off_t size)
|
||||
|
||||
/* The fifo mechanism consists of a burn_source proxy which is here,
|
||||
a thread management team which is located in async.c,
|
||||
and a synchronous shuffler which is here.
|
||||
and a synchronous shoveller which is here.
|
||||
*/
|
||||
|
||||
static int fifo_read(struct burn_source *source,
|
||||
@ -202,7 +203,7 @@ static int fifo_read(struct burn_source *source,
|
||||
ret = burn_fifo_start(source, 0);
|
||||
if (ret <= 0) {
|
||||
|
||||
/* >>> cannot start fifo thread */;
|
||||
/* >>> msg: cannot start fifo thread */;
|
||||
|
||||
return -1;
|
||||
}
|
||||
@ -244,53 +245,7 @@ static void fifo_free(struct burn_source *source)
|
||||
}
|
||||
|
||||
|
||||
struct burn_source *burn_fifo_source_new(struct burn_source *inp,
|
||||
int chunksize, int chunks, int flag)
|
||||
{
|
||||
struct burn_source_fifo *fs;
|
||||
struct burn_source *src;
|
||||
int ret, outlet[2];
|
||||
|
||||
ret = pipe(outlet);
|
||||
if (ret == -1) {
|
||||
/* >>> error on pipe creation */;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
fs = malloc(sizeof(struct burn_source_fifo));
|
||||
if (fs == NULL)
|
||||
return NULL;
|
||||
fs->is_started = 0;
|
||||
fs->thread_pid = 0;
|
||||
fs->thread_pid_valid = 0;
|
||||
fs->inp = inp;
|
||||
fs->outlet[0] = outlet[0];
|
||||
fs->outlet[1] = outlet[1];
|
||||
fs->chunksize = chunksize;
|
||||
if (chunksize <= 0)
|
||||
fs->chunksize = 2048;
|
||||
fs->chunks = chunks;
|
||||
fs->buf = NULL;
|
||||
fs->in_counter = fs->out_counter = 0;
|
||||
|
||||
src = burn_source_new();
|
||||
if (src == NULL) {
|
||||
free((char *) fs->buf);
|
||||
free((char *) fs);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
src->read = fifo_read;
|
||||
src->read_sub = NULL;
|
||||
src->get_size = fifo_get_size;
|
||||
src->set_size = fifo_set_size;
|
||||
src->free_data = fifo_free;
|
||||
src->data = fs;
|
||||
return src;
|
||||
}
|
||||
|
||||
|
||||
int burn_fifo_source_shuffler(struct burn_source *source, int flag)
|
||||
int burn_fifo_source_shoveller_og(struct burn_source *source, int flag)
|
||||
{
|
||||
struct burn_source_fifo *fs = source->data;
|
||||
int ret;
|
||||
@ -327,3 +282,370 @@ int burn_fifo_source_shuffler(struct burn_source *source, int flag)
|
||||
}
|
||||
|
||||
|
||||
/* ts A71003 */
|
||||
/* ----------------------------- fifo ng ------------------------- */
|
||||
|
||||
/* The fifo mechanism consists of a burn_source proxy which is here,
|
||||
a thread management team which is located in async.c,
|
||||
and a synchronous shoveller which is here.
|
||||
*/
|
||||
|
||||
static int fifo_sleep(int flag)
|
||||
{
|
||||
static struct timespec sleeptime = { 0, 50000000}; /* 50 ms */
|
||||
|
||||
return nanosleep(&sleeptime, NULL);
|
||||
}
|
||||
|
||||
|
||||
static int fifo_read_ng(struct burn_source *source,
|
||||
unsigned char *buffer,
|
||||
int size)
|
||||
{
|
||||
struct burn_source_fifo *fs = source->data;
|
||||
int ret, todo, rpos, bufsize, diff;
|
||||
|
||||
if (fs->end_of_consumption) {
|
||||
|
||||
/* >>> msg: reading has been ended already */;
|
||||
|
||||
return 0;
|
||||
}
|
||||
if (fs->is_started == 0) {
|
||||
ret = burn_fifo_start(source, 0);
|
||||
if (ret <= 0) {
|
||||
|
||||
/* >>> msg: cannot start fifo thread */;
|
||||
|
||||
fs->end_of_consumption = 1;
|
||||
return -1;
|
||||
}
|
||||
fs->is_started = 1;
|
||||
}
|
||||
if (size == 0)
|
||||
return 0;
|
||||
|
||||
/* Reading from the ring buffer */
|
||||
|
||||
/* This needs no mutex because each volatile variable has one thread
|
||||
which may write and the other which only reads and is aware of
|
||||
volatility.
|
||||
The feeder of the ringbuffer is in burn_fifo_source_shoveller_ng().
|
||||
*/
|
||||
todo = size;
|
||||
bufsize = fs->chunksize * fs->chunks;
|
||||
while (todo > 0) {
|
||||
/* readpos is not volatile here , writepos is volatile */
|
||||
rpos = fs->buf_readpos;
|
||||
while (rpos == fs->buf_writepos) {
|
||||
if (fs->end_of_input)
|
||||
break;
|
||||
if (fs->input_error) {
|
||||
if (todo < size) /* deliver partial buffer */
|
||||
break;
|
||||
fs->end_of_consumption = 1;
|
||||
|
||||
/* >>> msg: report fs->input_error as errno */;
|
||||
|
||||
return -1;
|
||||
}
|
||||
fifo_sleep(0);
|
||||
}
|
||||
diff = fs->buf_writepos - rpos; /* read volatile only once */
|
||||
if (diff == 0)
|
||||
break;
|
||||
if (diff > 0)
|
||||
/* diff bytes are available */;
|
||||
else
|
||||
/* at least (bufsize - rpos) bytes are available */
|
||||
diff = bufsize - rpos;
|
||||
if (diff > todo)
|
||||
diff = todo;
|
||||
memcpy(buffer, fs->buf+(size-todo)+rpos, diff);
|
||||
fs->buf_readpos += diff;
|
||||
if (fs->buf_readpos >= bufsize)
|
||||
fs->buf_readpos = 0;
|
||||
todo -= diff;
|
||||
}
|
||||
if (size - todo <= 0)
|
||||
fs->end_of_consumption = 1;
|
||||
else
|
||||
fs->out_counter += size - todo;
|
||||
|
||||
/*
|
||||
fprintf(stderr,
|
||||
"libburn_EXPERIMENTAL: read= %d , pos= %d , out_count= %.f\n",
|
||||
(size - todo), fs->buf_readpos, (double) fs->out_counter);
|
||||
*/
|
||||
|
||||
return (size - todo);
|
||||
}
|
||||
|
||||
|
||||
static off_t fifo_get_size_ng(struct burn_source *source)
|
||||
{
|
||||
struct burn_source_fifo *fs = source->data;
|
||||
|
||||
return fs->inp->get_size(fs->inp);
|
||||
}
|
||||
|
||||
|
||||
static int fifo_set_size_ng(struct burn_source *source, off_t size)
|
||||
{
|
||||
struct burn_source_fifo *fs = source->data;
|
||||
|
||||
return fs->inp->set_size(fs->inp, size);
|
||||
}
|
||||
|
||||
|
||||
static void fifo_free_ng(struct burn_source *source)
|
||||
{
|
||||
struct burn_source_fifo *fs = source->data;
|
||||
|
||||
if (fs->inp != NULL)
|
||||
burn_source_free(fs->inp);
|
||||
if (fs->buf != NULL)
|
||||
free(fs->buf);
|
||||
free((char *) fs);
|
||||
}
|
||||
|
||||
|
||||
int burn_fifo_source_shoveller_ng(struct burn_source *source, int flag)
|
||||
{
|
||||
struct burn_source_fifo *fs = source->data;
|
||||
int ret, bufsize, diff, wpos, rpos, trans_end, free_bytes;
|
||||
char *bufpt;
|
||||
|
||||
fs->thread_pid = getpid();
|
||||
fs->thread_pid_valid = 1;
|
||||
|
||||
bufsize = fs->chunksize * fs->chunks;
|
||||
while (!fs->end_of_consumption) {
|
||||
|
||||
/* wait for enough buffer space available */
|
||||
wpos = fs->buf_writepos;
|
||||
while (1) {
|
||||
rpos = fs->buf_readpos;
|
||||
diff = rpos - wpos;
|
||||
trans_end = 0;
|
||||
if (diff == 0)
|
||||
free_bytes = bufsize - 1;
|
||||
else if (diff > 0)
|
||||
free_bytes = diff - 1;
|
||||
else {
|
||||
free_bytes = (bufsize - wpos) + rpos - 1;
|
||||
if (bufsize - wpos < fs->chunksize)
|
||||
trans_end = 1;
|
||||
}
|
||||
if (free_bytes >= fs->chunksize)
|
||||
break;
|
||||
fifo_sleep(0);
|
||||
}
|
||||
|
||||
/* prepare the receiving memory */
|
||||
bufpt = fs->buf + wpos;
|
||||
if (trans_end) {
|
||||
bufpt = calloc(fs->chunksize, 1);
|
||||
if (bufpt == NULL) {
|
||||
|
||||
/* >>> msg: out of memory */;
|
||||
|
||||
fs->input_error = ENOMEM;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/* Obtain next chunk */
|
||||
ret = fs->inp->read(fs->inp, (unsigned char *) fs->buf,
|
||||
fs->chunksize);
|
||||
if (ret > 0)
|
||||
fs->in_counter += ret;
|
||||
else if (ret == 0)
|
||||
break; /* EOF */
|
||||
else {
|
||||
fs->input_error = errno;
|
||||
if(errno == 0)
|
||||
fs->input_error = EIO;
|
||||
break;
|
||||
}
|
||||
|
||||
/* activate read chunk */
|
||||
if (ret > fs->chunksize) /* beware of ill custom burn_source */
|
||||
ret = fs->chunksize;
|
||||
if (trans_end) {
|
||||
/* copy to end of buffer */
|
||||
memcpy(fs->buf + wpos, bufpt, bufsize - wpos);
|
||||
/* copy to start of buffer */
|
||||
memcpy(fs->buf, bufpt + (bufsize - wpos),
|
||||
fs->chunksize - (bufsize - wpos));
|
||||
free(bufpt);
|
||||
if (ret >= bufsize - wpos)
|
||||
fs->buf_writepos = ret - (bufsize - wpos);
|
||||
else
|
||||
fs->buf_writepos += ret;
|
||||
} else if (fs->buf_writepos + ret == bufsize)
|
||||
fs->buf_writepos = 0;
|
||||
else
|
||||
fs->buf_writepos += ret;
|
||||
|
||||
/*
|
||||
fprintf(stderr, "[%2.2d%%] ",
|
||||
(int) (100.0 - 100.0 * ((double) free_bytes) /
|
||||
(double) bufsize));
|
||||
fprintf(stderr,
|
||||
"libburn_EXPERIMENTAL: writepos= %d ,in_count = %.f\n",
|
||||
fs->buf_writepos, (double) fs->in_counter);
|
||||
*/
|
||||
}
|
||||
if (!fs->end_of_consumption)
|
||||
fs->end_of_input = 1;
|
||||
|
||||
/* wait for end of reading by consumer */;
|
||||
while (fs->buf_readpos != fs->buf_writepos && !fs->end_of_consumption)
|
||||
fifo_sleep(0);
|
||||
|
||||
/* destroy ring buffer */;
|
||||
if (!fs->end_of_consumption)
|
||||
fs->end_of_consumption = 2; /* Claim stop of consumption */
|
||||
|
||||
/* This is not prone to race conditions because either the consumer
|
||||
indicated hangup by fs->end_of_consumption = 1 or the consumer set
|
||||
fs->buf_readpos to a value indicating the buffer is empty.
|
||||
So in both cases the consumer is aware that reading is futile
|
||||
or even fatal.
|
||||
*/
|
||||
free(fs->buf);
|
||||
fs->buf = NULL;
|
||||
|
||||
return (fs->input_error == 0);
|
||||
}
|
||||
|
||||
|
||||
#define Libburn_fifo_nG 1
|
||||
|
||||
int burn_fifo_source_shoveller(struct burn_source *source, int flag)
|
||||
{
|
||||
#ifndef Libburn_fifo_nG
|
||||
return burn_fifo_source_shoveller_og(source, flag);
|
||||
#else
|
||||
return burn_fifo_source_shoveller_ng(source, flag);
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
struct burn_source *burn_fifo_source_new(struct burn_source *inp,
|
||||
int chunksize, int chunks, int flag)
|
||||
{
|
||||
struct burn_source_fifo *fs;
|
||||
struct burn_source *src;
|
||||
#ifndef Libburn_fifo_nG
|
||||
int ret, outlet[2];
|
||||
#endif
|
||||
|
||||
if (((double) chunksize) * ((double) chunks) > 1024.0*1024.0*1024.0) {
|
||||
/* >>> buffer larger than 1 GB */;
|
||||
return NULL;
|
||||
}
|
||||
if (chunksize < 1 || chunks < 2) {
|
||||
/* >>> buffer too small */;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
#ifndef Libburn_fifo_nG
|
||||
outlet[0] = outlet[1] = -1;
|
||||
ret = pipe(outlet);
|
||||
if (ret == -1) {
|
||||
/* >>> error on pipe creation */;
|
||||
return NULL;
|
||||
}
|
||||
#endif /* ! Libburn_fifo_nG */
|
||||
|
||||
fs = malloc(sizeof(struct burn_source_fifo));
|
||||
if (fs == NULL)
|
||||
return NULL;
|
||||
fs->magic[0] = 'f'; fs->magic[1] = 'i';
|
||||
fs->magic[2] = 'f'; fs->magic[3] = 'o';
|
||||
fs->is_started = 0;
|
||||
fs->thread_pid = 0;
|
||||
fs->thread_pid_valid = 0;
|
||||
fs->inp = NULL; /* set later */
|
||||
|
||||
#ifndef Libburn_fifo_nG
|
||||
fs->outlet[0] = outlet[0];
|
||||
fs->outlet[1] = outlet[1];
|
||||
#endif
|
||||
|
||||
fs->chunksize = chunksize;
|
||||
fs->chunks = chunks;
|
||||
fs->buf = NULL;
|
||||
fs->buf_writepos = fs->buf_readpos = 0;
|
||||
fs->end_of_input = 0;
|
||||
fs->input_error = 0;
|
||||
fs->end_of_consumption = 0;
|
||||
fs->in_counter = fs->out_counter = 0;
|
||||
|
||||
src = burn_source_new();
|
||||
if (src == NULL) {
|
||||
free((char *) fs);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
#ifndef Libburn_fifo_nG
|
||||
src->read = fifo_read;
|
||||
src->read_sub = NULL;
|
||||
src->get_size = fifo_get_size;
|
||||
src->set_size = fifo_set_size;
|
||||
src->free_data = fifo_free;
|
||||
#else /* Libburn_fifo_nG */
|
||||
src->read = fifo_read_ng;
|
||||
src->read_sub = NULL;
|
||||
src->get_size = fifo_get_size_ng;
|
||||
src->set_size = fifo_set_size_ng;
|
||||
src->free_data = fifo_free_ng;
|
||||
#endif /* ! Libburn_fifo_nG */
|
||||
|
||||
src->data = fs;
|
||||
|
||||
fs->inp = inp;
|
||||
inp->refcount++; /* make sure inp lives longer than src */
|
||||
|
||||
return src;
|
||||
}
|
||||
|
||||
|
||||
/* ts A71003 : API */
|
||||
int burn_fifo_inquire_status(struct burn_source *source, int *size,
|
||||
int *free_bytes)
|
||||
{
|
||||
struct burn_source_fifo *fs = source->data;
|
||||
int ret = 0, diff, wpos, rpos;
|
||||
|
||||
if (fs->magic[0] != 'f' || fs->magic[1] != 'i' ||
|
||||
fs->magic[2] != 'f' || fs->magic[3] != 'o') {
|
||||
|
||||
/* >>> not a fifo burn_source */;
|
||||
|
||||
return -1;
|
||||
}
|
||||
*size = fs->chunksize * fs->chunks;
|
||||
rpos = fs->buf_readpos;
|
||||
wpos = fs->buf_writepos;
|
||||
diff = rpos - wpos;
|
||||
if (diff == 0)
|
||||
*free_bytes = *size - 1;
|
||||
else if (diff > 0)
|
||||
*free_bytes = diff - 1;
|
||||
else
|
||||
*free_bytes = (*size - wpos) + rpos - 1;
|
||||
if (fs->end_of_consumption == 1)
|
||||
ret |= 4;
|
||||
if (fs->input_error)
|
||||
ret |= 3;
|
||||
else if (fs->end_of_input)
|
||||
ret |= 2;
|
||||
else
|
||||
ret |= 1;
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
|
@ -15,6 +15,7 @@ struct burn_source_file
|
||||
|
||||
/* ts A70930 */
|
||||
struct burn_source_fifo {
|
||||
char magic[4];
|
||||
|
||||
/* The fifo stays inactive and unequipped with eventual resources
|
||||
until its read() method is called for the first time.
|
||||
@ -35,10 +36,15 @@ struct burn_source_fifo {
|
||||
/* <<< currently it is only a pipe */
|
||||
int outlet[2];
|
||||
|
||||
/* >>> later it will be a ring buffer mechanism */
|
||||
/* The ring buffer mechanism */
|
||||
int chunksize;
|
||||
int chunks;
|
||||
char *buf;
|
||||
volatile int buf_writepos;
|
||||
volatile int buf_readpos;
|
||||
volatile int end_of_input;
|
||||
volatile int input_error;
|
||||
volatile int end_of_consumption;
|
||||
|
||||
off_t in_counter;
|
||||
off_t out_counter;
|
||||
@ -48,7 +54,7 @@ struct burn_source_fifo {
|
||||
/** The worker behind the fifo thread.
|
||||
Gets started from burn_fifo_start() in async.c
|
||||
*/
|
||||
int burn_fifo_source_shuffler(struct burn_source *source, int flag);
|
||||
int burn_fifo_source_shoveller(struct burn_source *source, int flag);
|
||||
|
||||
|
||||
#endif /* LIBBURN__FILE_H */
|
||||
|
Loading…
Reference in New Issue
Block a user