diff --git a/libisofs/filters/external.c b/libisofs/filters/external.c index 706a4aa..f76aaa3 100644 --- a/libisofs/filters/external.c +++ b/libisofs/filters/external.c @@ -23,6 +23,7 @@ #include #include #include +#include /* @@ -32,14 +33,19 @@ /* - * Individual runtime properties exist only as long as stream is opened. + * Individual runtime properties exist only as long as the stream is opened. */ typedef struct { int send_fd; int recv_fd; pid_t pid; - int eof; + off_t in_counter; + int in_eof; + off_t out_counter; + int out_eof; + uint8_t pipebuf[2048]; /* buffers in case of EAGAIN on write() */ + int pipebuf_fill; } ExternalFilterRuntime; @@ -134,7 +140,12 @@ int extf_stream_open_flag(IsoStream *stream, int flag) running->send_fd = send_pipe[1]; running->recv_fd = recv_pipe[0]; running->pid = child_pid; - running->eof = 0; + running->in_counter = 0; + running->in_eof = 0; + running->out_counter = 0; + running->out_eof = 0; + memset(running->pipebuf, 0, sizeof(running->pipebuf)); + running->pipebuf_fill = 0; data->running = running; /* Give up the child-side pipe ends */ @@ -149,6 +160,12 @@ int extf_stream_open_flag(IsoStream *stream, int flag) ret |= O_NONBLOCK; fcntl(recv_pipe[0], F_SETFL, ret); } + /* Make filter sink non-blocking */ + ret = fcntl(send_pipe[1], F_GETFL); + if (ret != -1) { + ret |= O_NONBLOCK; + fcntl(send_pipe[1], F_SETFL, ret); + } return 1; } @@ -233,30 +250,43 @@ int extf_stream_close(IsoStream *stream) static int extf_stream_read(IsoStream *stream, void *buf, size_t desired) { - int ret, in_done = 0, blocking = 0; + int ret, blocking = 0; ExternalFilterStreamData *data; - uint8_t pipebuf[2048]; /* keep this small, not to clog the input */ + ExternalFilterRuntime *running; size_t fill = 0; if (stream == NULL) { return ISO_NULL_POINTER; } data = stream->data; - if (data->running == NULL) { + running= data->running; + if (running == NULL) { return ISO_FILE_NOT_OPENED; } - if (data->running->eof) { + if (running->out_eof) { return 0; } while (1) { + if (running->in_eof) { + + /* >>> ??? should one replace non-blocking read() by select () ? */ + + /* Make filter outlet blocking */ + ret = fcntl(running->recv_fd, F_GETFL); + if (ret != -1) { + ret &= ~O_NONBLOCK; + fcntl(running->recv_fd, F_SETFL, ret); + } + blocking = 1; + } /* Try to read desired amount from filter */; while (1) { /* >>> ??? should one replace non-blocking read() by select () ? */ - ret = read(data->running->recv_fd, ((char *) buf) + fill, + ret = read(running->recv_fd, ((char *) buf) + fill, desired - fill); if (ret == -1) { if (errno == EAGAIN) @@ -264,42 +294,57 @@ int extf_stream_read(IsoStream *stream, void *buf, size_t desired) return ISO_FILE_READ_ERROR; } fill += ret; + running->out_counter+= ret; if (ret == 0) { - data->running->eof = 1; + running->out_eof = 1; } if (ret == 0 || fill >= desired) { return fill; } } - if (in_done) { + if (running->in_eof) { /* >>> ??? should one replace non-blocking read() by select () ? */ - /* Make filter outlet blocking */ - ret = fcntl(data->running->recv_fd, F_GETFL); - if (ret != -1) { - ret &= ~O_NONBLOCK; - fcntl(data->running->recv_fd, F_SETFL, ret); - } - blocking = 1; usleep(1000); /* just in case it is still non-blocking */ continue; } - ret = iso_stream_read(data->orig, pipebuf, sizeof(pipebuf)); + if (running->pipebuf_fill) { + ret = running->pipebuf_fill; + running->pipebuf_fill = 0; + } else { + ret = iso_stream_read(data->orig, running->pipebuf, + sizeof(running->pipebuf)); + } if (ret < 0) { + running->in_eof = 1; return ret; } if (ret == 0) { - in_done = 1; - close(data->running->send_fd); /* Tell the filter: it is over */ - data->running->send_fd = -1; + running->in_eof = 1; + close(running->send_fd); /* Tell the filter: it is over */ + running->send_fd = -1; } else { - ret = write(data->running->send_fd, pipebuf, ret); + running->in_counter += ret; + running->pipebuf_fill = ret; + ret = write(running->send_fd, running->pipebuf, + running->pipebuf_fill); if (ret == -1) { + if (errno == EAGAIN) { + + /* >>> ??? should one replace non-blocking read() + by select () ? */ + + usleep(1000); /* go lazy because the filter is slow */ + continue; + } + /* From the view of the caller it _is_ a read error */ + running->in_eof = 1; return ISO_FILE_READ_ERROR; } + running->pipebuf_fill = 0; } } return ISO_FILE_READ_ERROR; /* should never be hit */