Fixed a gridlock of external filtering in case that the

filter program is slow with processing.
This commit is contained in:
Thomas Schmitt 2009-03-27 13:44:29 +01:00
parent 691887fd2c
commit 6cf484442c

View File

@ -23,6 +23,7 @@
#include <stdio.h> #include <stdio.h>
#include <fcntl.h> #include <fcntl.h>
#include <errno.h> #include <errno.h>
#include <string.h>
/* /*
@ -32,14 +33,19 @@
/* /*
* Individual runtime properties exist only as long as stream is opened. * Individual runtime properties exist only as long as the stream is opened.
*/ */
typedef struct typedef struct
{ {
int send_fd; int send_fd;
int recv_fd; int recv_fd;
pid_t pid; pid_t pid;
int eof; off_t in_counter;
int in_eof;
off_t out_counter;
int out_eof;
uint8_t pipebuf[2048]; /* buffers in case of EAGAIN on write() */
int pipebuf_fill;
} ExternalFilterRuntime; } ExternalFilterRuntime;
@ -134,7 +140,12 @@ int extf_stream_open_flag(IsoStream *stream, int flag)
running->send_fd = send_pipe[1]; running->send_fd = send_pipe[1];
running->recv_fd = recv_pipe[0]; running->recv_fd = recv_pipe[0];
running->pid = child_pid; running->pid = child_pid;
running->eof = 0; running->in_counter = 0;
running->in_eof = 0;
running->out_counter = 0;
running->out_eof = 0;
memset(running->pipebuf, 0, sizeof(running->pipebuf));
running->pipebuf_fill = 0;
data->running = running; data->running = running;
/* Give up the child-side pipe ends */ /* Give up the child-side pipe ends */
@ -149,6 +160,12 @@ int extf_stream_open_flag(IsoStream *stream, int flag)
ret |= O_NONBLOCK; ret |= O_NONBLOCK;
fcntl(recv_pipe[0], F_SETFL, ret); fcntl(recv_pipe[0], F_SETFL, ret);
} }
/* Make filter sink non-blocking */
ret = fcntl(send_pipe[1], F_GETFL);
if (ret != -1) {
ret |= O_NONBLOCK;
fcntl(send_pipe[1], F_SETFL, ret);
}
return 1; return 1;
} }
@ -233,30 +250,43 @@ int extf_stream_close(IsoStream *stream)
static static
int extf_stream_read(IsoStream *stream, void *buf, size_t desired) int extf_stream_read(IsoStream *stream, void *buf, size_t desired)
{ {
int ret, in_done = 0, blocking = 0; int ret, blocking = 0;
ExternalFilterStreamData *data; ExternalFilterStreamData *data;
uint8_t pipebuf[2048]; /* keep this small, not to clog the input */ ExternalFilterRuntime *running;
size_t fill = 0; size_t fill = 0;
if (stream == NULL) { if (stream == NULL) {
return ISO_NULL_POINTER; return ISO_NULL_POINTER;
} }
data = stream->data; data = stream->data;
if (data->running == NULL) { running= data->running;
if (running == NULL) {
return ISO_FILE_NOT_OPENED; return ISO_FILE_NOT_OPENED;
} }
if (data->running->eof) { if (running->out_eof) {
return 0; return 0;
} }
while (1) { while (1) {
if (running->in_eof) {
/* >>> ??? should one replace non-blocking read() by select () ? */
/* Make filter outlet blocking */
ret = fcntl(running->recv_fd, F_GETFL);
if (ret != -1) {
ret &= ~O_NONBLOCK;
fcntl(running->recv_fd, F_SETFL, ret);
}
blocking = 1;
}
/* Try to read desired amount from filter */; /* Try to read desired amount from filter */;
while (1) { while (1) {
/* >>> ??? should one replace non-blocking read() by select () ? */ /* >>> ??? should one replace non-blocking read() by select () ? */
ret = read(data->running->recv_fd, ((char *) buf) + fill, ret = read(running->recv_fd, ((char *) buf) + fill,
desired - fill); desired - fill);
if (ret == -1) { if (ret == -1) {
if (errno == EAGAIN) if (errno == EAGAIN)
@ -264,42 +294,57 @@ int extf_stream_read(IsoStream *stream, void *buf, size_t desired)
return ISO_FILE_READ_ERROR; return ISO_FILE_READ_ERROR;
} }
fill += ret; fill += ret;
running->out_counter+= ret;
if (ret == 0) { if (ret == 0) {
data->running->eof = 1; running->out_eof = 1;
} }
if (ret == 0 || fill >= desired) { if (ret == 0 || fill >= desired) {
return fill; return fill;
} }
} }
if (in_done) { if (running->in_eof) {
/* >>> ??? should one replace non-blocking read() by select () ? */ /* >>> ??? should one replace non-blocking read() by select () ? */
/* Make filter outlet blocking */
ret = fcntl(data->running->recv_fd, F_GETFL);
if (ret != -1) {
ret &= ~O_NONBLOCK;
fcntl(data->running->recv_fd, F_SETFL, ret);
}
blocking = 1;
usleep(1000); /* just in case it is still non-blocking */ usleep(1000); /* just in case it is still non-blocking */
continue; continue;
} }
ret = iso_stream_read(data->orig, pipebuf, sizeof(pipebuf)); if (running->pipebuf_fill) {
ret = running->pipebuf_fill;
running->pipebuf_fill = 0;
} else {
ret = iso_stream_read(data->orig, running->pipebuf,
sizeof(running->pipebuf));
}
if (ret < 0) { if (ret < 0) {
running->in_eof = 1;
return ret; return ret;
} }
if (ret == 0) { if (ret == 0) {
in_done = 1; running->in_eof = 1;
close(data->running->send_fd); /* Tell the filter: it is over */ close(running->send_fd); /* Tell the filter: it is over */
data->running->send_fd = -1; running->send_fd = -1;
} else { } else {
ret = write(data->running->send_fd, pipebuf, ret); running->in_counter += ret;
running->pipebuf_fill = ret;
ret = write(running->send_fd, running->pipebuf,
running->pipebuf_fill);
if (ret == -1) { if (ret == -1) {
if (errno == EAGAIN) {
/* >>> ??? should one replace non-blocking read()
by select () ? */
usleep(1000); /* go lazy because the filter is slow */
continue;
}
/* From the view of the caller it _is_ a read error */ /* From the view of the caller it _is_ a read error */
running->in_eof = 1;
return ISO_FILE_READ_ERROR; return ISO_FILE_READ_ERROR;
} }
running->pipebuf_fill = 0;
} }
} }
return ISO_FILE_READ_ERROR; /* should never be hit */ return ISO_FILE_READ_ERROR; /* should never be hit */