Implemented a simple fifo to decouple from burn_source signals

This commit is contained in:
Thomas Schmitt 2007-09-30 21:24:55 +00:00
parent 23b2720cc0
commit f94133cf45
6 changed files with 255 additions and 3 deletions

View File

@ -1 +1 @@
#define Cdrskin_timestamP "2007.09.29.191558" #define Cdrskin_timestamP "2007.09.30.212517"

View File

@ -7,9 +7,12 @@
#include "options.h" #include "options.h"
#include "async.h" #include "async.h"
#include "init.h" #include "init.h"
#include "file.h"
#include "back_hacks.h" #include "back_hacks.h"
#include <pthread.h> #include <pthread.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdlib.h> #include <stdlib.h>
#include <stdio.h> #include <stdio.h>
#include <string.h> #include <string.h>
@ -53,6 +56,12 @@ struct write_opts
struct burn_disc *disc; struct burn_disc *disc;
}; };
struct fifo_opts
{
struct burn_source *source;
int flag;
};
struct w_list struct w_list
{ {
@ -67,6 +76,7 @@ struct w_list
struct erase_opts erase; struct erase_opts erase;
struct format_opts format; struct format_opts format;
struct write_opts write; struct write_opts write;
struct fifo_opts fifo;
} u; } u;
}; };
@ -419,6 +429,38 @@ void burn_disc_write(struct burn_write_opts *opts, struct burn_disc *disc)
add_worker(opts->drive, (WorkerFunc) write_disc_worker_func, &o); add_worker(opts->drive, (WorkerFunc) write_disc_worker_func, &o);
} }
static void *fifo_worker_func(struct w_list *w)
{
burn_fifo_source_shuffler(w->u.fifo.source, w->u.fifo.flag);
remove_worker(pthread_self());
return NULL;
}
int burn_fifo_start(struct burn_source *source, int flag)
{
struct fifo_opts o;
struct burn_source_fifo *fs = source->data;
fs->is_started = -1;
/* >>> create and set up ring buffer */;
/* >>> for now: only 1 , later: fs->chunks */
fs->buf = calloc(fs->chunksize, 1);
if (fs->buf == NULL) {
/* >>> could not start ring buffer */;
return -1;
}
o.source = source;
o.flag = flag;
add_worker(NULL, (WorkerFunc) fifo_worker_func, &o);
fs->is_started = 1;
return 1;
}
void burn_async_join_all(void) void burn_async_join_all(void)
{ {
void *ret; void *ret;

View File

@ -5,4 +5,10 @@
void burn_async_join_all(void); void burn_async_join_all(void);
struct burn_write_opts; struct burn_write_opts;
/* ts A70930 */
/* To be called when the first read() call comes to a fifo */
int burn_fifo_start(struct burn_source *source, int flag);
#endif /* BURN__ASYNC_H */ #endif /* BURN__ASYNC_H */

View File

@ -2,12 +2,16 @@
#include <stdlib.h> #include <stdlib.h>
#include <sys/types.h> #include <sys/types.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <unistd.h> #include <unistd.h>
#include <fcntl.h> #include <fcntl.h>
#include "source.h" #include "source.h"
#include "libburn.h" #include "libburn.h"
#include "file.h" #include "file.h"
#include "async.h"
/* main channel data can be padded on read, but 0 padding the subs will make /* main channel data can be padded on read, but 0 padding the subs will make
an unreadable disc */ an unreadable disc */
@ -170,7 +174,7 @@ struct burn_source *burn_fd_source_new(int datafd, int subfd, off_t size)
src->read = file_read; src->read = file_read;
if(subfd != -1) if(subfd != -1)
src->read = file_read_sub; src->read_sub = file_read_sub;
src->get_size = file_size; src->get_size = file_size;
src->set_size = file_set_size; src->set_size = file_set_size;
src->free_data = file_free; src->free_data = file_free;
@ -178,3 +182,148 @@ struct burn_source *burn_fd_source_new(int datafd, int subfd, off_t size)
return src; return src;
} }
/* ts A70930 */
/* ----------------------------- fifo ---------------------------- */
/* The fifo mechanism consists of a burn_source proxy which is here,
a thread management team which is located in async.c,
and a synchronous shuffler which is here.
*/
static int fifo_read(struct burn_source *source,
unsigned char *buffer,
int size)
{
struct burn_source_fifo *fs = source->data;
int ret;
if (fs->is_started == 0) {
ret = burn_fifo_start(source, 0);
if (ret <= 0) {
/* >>> cannot start fifo thread */;
return -1;
}
fs->is_started = 1;
}
if (size == 0)
return 0;
ret = read_full_buffer(fs->outlet[0], buffer, size);
if (ret > 0)
fs->out_counter += ret;
return ret;
}
static off_t fifo_get_size(struct burn_source *source)
{
struct burn_source_fifo *fs = source->data;
return fs->inp->get_size(fs->inp);
}
static int fifo_set_size(struct burn_source *source, off_t size)
{
struct burn_source_fifo *fs = source->data;
return fs->inp->set_size(fs->inp, size);
}
static void fifo_free(struct burn_source *source)
{
struct burn_source_fifo *fs = source->data;
if (fs->outlet[1] >= 0)
close(fs->outlet[1]);
free(fs);
}
struct burn_source *burn_fifo_source_new(struct burn_source *inp,
int chunksize, int chunks, int flag)
{
struct burn_source_fifo *fs;
struct burn_source *src;
int ret, outlet[2];
ret = pipe(outlet);
if (ret == -1) {
/* >>> error on pipe creation */;
return NULL;
}
fs = malloc(sizeof(struct burn_source_fifo));
if (fs == NULL)
return NULL;
fs->is_started = 0;
fs->thread_pid = 0;
fs->thread_pid_valid = 0;
fs->inp = inp;
fs->outlet[0] = outlet[0];
fs->outlet[1] = outlet[1];
fs->chunksize = chunksize;
if (chunksize <= 0)
fs->chunksize = 2048;
fs->chunks = chunks;
fs->buf = NULL;
fs->in_counter = fs->out_counter = 0;
src = burn_source_new();
if (src == NULL) {
free((char *) fs->buf);
free((char *) fs);
return NULL;
}
src->read = fifo_read;
src->read_sub = NULL;
src->get_size = fifo_get_size;
src->set_size = fifo_set_size;
src->free_data = fifo_free;
src->data = fs;
return src;
}
int burn_fifo_source_shuffler(struct burn_source *source, int flag)
{
struct burn_source_fifo *fs = source->data;
int ret;
fs->thread_pid = getpid();
fs->thread_pid_valid = 1;
while (1) {
ret = fs->inp->read(fs->inp, (unsigned char *) fs->buf,
fs->chunksize);
if (ret > 0)
fs->in_counter += ret;
else if (ret == 0)
break; /* EOF */
else {
/* >>> read error */;
break;
}
ret = write(fs->outlet[1], fs->buf, ret);
if (ret == -1) {
/* >>> write error */;
break;
}
}
/* >>> check and destroy ring buffer */;
free(fs->buf);
fs->buf = NULL;
if (fs->outlet[1] >= 0)
close(fs->outlet[1]);
fs->outlet[1] = -1;
return (ret >= 0);
}

View File

@ -10,8 +10,45 @@ struct burn_source_file
off_t fixed_size; off_t fixed_size;
}; };
/* ts A70126 : burn_source_file obsoleted burn_source_fd */ /* ts A70126 : burn_source_file obsoleted burn_source_fd */
/* ts A70930 */
struct burn_source_fifo {
/* The fifo stays inactive and unequipped with eventual resources
until its read() method is called for the first time.
Only then burn_fifo_start() gets called, allocates the complete
resources, starts a thread with burn_fifo_source_shuffler()
which shuffles data and finally destroys the resources.
This late start is to stay modest in case of multiple tracks
in one disc.
*/
int is_started;
int thread_pid;
int thread_pid_valid;
/* the burn_source for which this fifo is acting as proxy */
struct burn_source *inp;
/* <<< currently it is only a pipe */
int outlet[2];
/* >>> later it will be a ring buffer mechanism */
int chunksize;
int chunks;
char *buf;
off_t in_counter;
off_t out_counter;
};
/** The worker behind the fifo thread.
Gets started from burn_fifo_start() in async.c
*/
int burn_fifo_source_shuffler(struct burn_source *source, int flag);
#endif /* LIBBURN__FILE_H */ #endif /* LIBBURN__FILE_H */

View File

@ -1376,6 +1376,24 @@ struct burn_source *burn_file_source_new(const char *path,
struct burn_source *burn_fd_source_new(int datafd, int subfd, off_t size); struct burn_source *burn_fd_source_new(int datafd, int subfd, off_t size);
/* ts A70930 : Provisory API call. Subject to changes ! */
/** Creates a fifo which acts as proxy for an already existing data source.
NOTE: Incomplete function. Curently the ring buffer is not implemented.
The only purpose is to decouple the writer thread of libburn from an
eventually error prone burn_source.
In future this will implement a ring buffer which shall smoothen the
data stream between burn_source and writer thread.
@param inp The burn_source for which the fifo shall act as proxy.
@param chunksize The size in bytes of a chunk. Use 2048 if in doubt.
@param chunks The number of chunks to be allocated in ring buffer.
@param flag Bitfield for control purposes (unused yet, submit 0).
@return A pointer to the newly created burn_source.
Later both burn_sources, inp and the returned fifo, have
to be disposed by calling burn_source_free() for each.
*/
struct burn_source *burn_fifo_source_new(struct burn_source *inp,
int chunksize, int chunks, int flag);
/* ts A70328 */ /* ts A70328 */
/** Sets a fixed track size after the data source object has already been /** Sets a fixed track size after the data source object has already been
created. created.