Changed sequence of fork and stream opening in extf_stream_class.open().

So the child does not inherit the pipe inlets of underlying filters
which would stay open and prevent those underlying filter children
from seeing EOF at their input.
This commit is contained in:
Thomas Schmitt 2009-04-05 09:56:12 +02:00
parent 198f6536bc
commit da125e8f6b

View File

@ -96,11 +96,66 @@ typedef struct
static ino_t extf_ino_id = 0; static ino_t extf_ino_id = 0;
/* <<< */
static int print_fd= 0;
/* /*
* Methods for the IsoStreamIface of an External Filter object. * Methods for the IsoStreamIface of an External Filter object.
*/ */
/*
* @param flag bit0= original stream is not open
*/
static
int extf_stream_close_flag(IsoStream *stream, int flag)
{
int ret, status;
ExternalFilterStreamData *data;
if (stream == NULL) {
return ISO_NULL_POINTER;
}
data = stream->data;
if (data->running == NULL) {
return 1;
}
/* <<< */
if (print_fd) {
fprintf(stderr, "libisofs_DEBUG: filter close in = %d , ic= %.f\n",
data->running->recv_fd, (double) data->running->in_counter);
fprintf(stderr, "libisofs_DEBUG: filter close out = %d , oc= %.f\n",
data->running->send_fd, (double) data->running->out_counter);
}
if(data->running->recv_fd != -1)
close(data->running->recv_fd);
if(data->running->send_fd != -1)
close(data->running->send_fd);
ret = waitpid(data->running->pid, &status, WNOHANG);
if (ret == 0 && data->running->pid != 0) {
kill(data->running->pid, SIGKILL);
waitpid(data->running->pid, &status, 0);
}
free(data->running);
data->running = NULL;
if (flag & 1)
return 1;
return iso_stream_close(data->orig);
}
static
int extf_stream_close(IsoStream *stream)
{
return extf_stream_close_flag(stream, 0);
}
/* /*
* @param flag bit0= do not run .get_size() if size is < 0 * @param flag bit0= do not run .get_size() if size is < 0
*/ */
@ -121,18 +176,12 @@ int extf_stream_open_flag(IsoStream *stream, int flag)
if (data->running != NULL) { if (data->running != NULL) {
return ISO_FILE_ALREADY_OPENED; return ISO_FILE_ALREADY_OPENED;
} }
if (data->size < 0 && !(flag & 1)) { if (data->size < 0 && !(flag & 1)) {
/* Do the size determination run now, so that the size gets cached /* Do the size determination run now, so that the size gets cached
and .get_size() will not fail on an opened stream. and .get_size() will not fail on an opened stream.
*/ */
stream->class->get_size(stream); stream->class->get_size(stream);
} }
ret = iso_stream_open(data->orig);
if (ret < 0) {
return ret;
}
stream_open = 1;
ret = pipe(send_pipe); ret = pipe(send_pipe);
if (ret == -1) { if (ret == -1) {
@ -160,10 +209,37 @@ int extf_stream_open_flag(IsoStream *stream, int flag)
} }
data->running = running; data->running = running;
/* <<< */
if (print_fd) {
fprintf(stderr, "libisofs_DEBUG: filter parent in = %d\n",
data->running->recv_fd);
fprintf(stderr, "libisofs_DEBUG: filter parent out = %d\n",
data->running->send_fd);
}
/* Give up the child-side pipe ends */ /* Give up the child-side pipe ends */
close(send_pipe[0]); close(send_pipe[0]);
close(recv_pipe[1]); close(recv_pipe[1]);
/* Open stream only after forking so that the child does not know
the pipe inlets of eventually underlying other filter streams.
They would stay open and prevent those underlying filter children
from seeing EOF at their input.
*/
ret = iso_stream_open(data->orig);
/* <<< TEST <<<
ret= ISO_FILE_READ_ERROR;
*/
if (ret < 0) {
/* Dispose pipes and child */
extf_stream_close_flag(stream, 1);
return ret;
}
stream_open = 1;
/* >>> ??? should one replace non-blocking read() by select () ? */ /* >>> ??? should one replace non-blocking read() by select () ? */
/* Make filter outlet non-blocking */ /* Make filter outlet non-blocking */
@ -178,7 +254,6 @@ int extf_stream_open_flag(IsoStream *stream, int flag)
ret |= O_NONBLOCK; ret |= O_NONBLOCK;
fcntl(send_pipe[1], F_SETFL, ret); fcntl(send_pipe[1], F_SETFL, ret);
} }
return 1; return 1;
} }
@ -200,6 +275,14 @@ int extf_stream_open_flag(IsoStream *stream, int flag)
goto child_failed; goto child_failed;
} }
/* <<< */
if (print_fd) {
fprintf(stderr, "libisofs_DEBUG: filter child in = %d\n",
send_pipe[0]);
fprintf(stderr, "libisofs_DEBUG: filter child out = %d\n",
recv_pipe[1]);
}
/* Self conversion into external program */ /* Self conversion into external program */
execv(data->cmd->path, data->cmd->argv); /* should never come back */ execv(data->cmd->path, data->cmd->argv); /* should never come back */
@ -209,6 +292,15 @@ child_failed:;
exit(127); exit(127);
parent_failed:; parent_failed:;
/* <<< */
if (print_fd) {
fprintf(stderr, "libisofs_DEBUG: FAILED : filter parent in = %d\n",
recv_pipe[0]);
fprintf(stderr, "libisofs_DEBUG: FAILED : filter parent out = %d\n",
send_pipe[1]);
}
if (stream_open) if (stream_open)
iso_stream_close(data->orig); iso_stream_close(data->orig);
if(send_pipe[0] != -1) if(send_pipe[0] != -1)
@ -230,37 +322,6 @@ int extf_stream_open(IsoStream *stream)
} }
static
int extf_stream_close(IsoStream *stream)
{
int ret, status;
ExternalFilterStreamData *data;
if (stream == NULL) {
return ISO_NULL_POINTER;
}
data = stream->data;
if (data->running == NULL) {
return 1;
}
if(data->running->recv_fd != -1)
close(data->running->recv_fd);
if(data->running->send_fd != -1)
close(data->running->send_fd);
ret = waitpid(data->running->pid, &status, WNOHANG);
if (ret == 0 && data->running->pid != 0) {
kill(data->running->pid, SIGKILL);
waitpid(data->running->pid, &status, 0);
}
free(data->running);
data->running = NULL;
return iso_stream_close(data->orig);
}
static static
int extf_stream_read(IsoStream *stream, void *buf, size_t desired) int extf_stream_read(IsoStream *stream, void *buf, size_t desired)
{ {
@ -282,7 +343,7 @@ int extf_stream_read(IsoStream *stream, void *buf, size_t desired)
} }
while (1) { while (1) {
if (running->in_eof) { if (running->in_eof && !blocking) {
/* >>> ??? should one replace non-blocking read() by select () ? */ /* >>> ??? should one replace non-blocking read() by select () ? */
@ -302,17 +363,17 @@ int extf_stream_read(IsoStream *stream, void *buf, size_t desired)
ret = read(running->recv_fd, ((char *) buf) + fill, ret = read(running->recv_fd, ((char *) buf) + fill,
desired - fill); desired - fill);
if (ret == -1) { if (ret < 0) {
if (errno == EAGAIN) if (errno == EAGAIN)
break; break;
return ISO_FILE_READ_ERROR; return ISO_FILE_READ_ERROR;
} }
fill += ret; fill += ret;
running->out_counter+= ret;
if (ret == 0) { if (ret == 0) {
running->out_eof = 1; running->out_eof = 1;
} }
if (ret == 0 || fill >= desired) { if (ret == 0 || fill >= desired) {
running->out_counter += fill;
return fill; return fill;
} }
} }
@ -330,17 +391,26 @@ int extf_stream_read(IsoStream *stream, void *buf, size_t desired)
} else { } else {
ret = iso_stream_read(data->orig, running->pipebuf, ret = iso_stream_read(data->orig, running->pipebuf,
sizeof(running->pipebuf)); sizeof(running->pipebuf));
if (ret > 0)
running->in_counter += ret;
} }
if (ret < 0) { if (ret < 0) {
running->in_eof = 1; running->in_eof = 1;
return ret; return ret;
} }
if (ret == 0) { if (ret == 0) {
/* <<< */
if (print_fd) {
fprintf(stderr,
"libisofs_DEBUG: filter close out = %d , ic= %.f\n",
running->send_fd, (double) running->in_counter);
}
running->in_eof = 1; running->in_eof = 1;
close(running->send_fd); /* Tell the filter: it is over */ close(running->send_fd); /* Tell the filter: it is over */
running->send_fd = -1; running->send_fd = -1;
} else { } else {
running->in_counter += ret;
running->pipebuf_fill = ret; running->pipebuf_fill = ret;
ret = write(running->send_fd, running->pipebuf, ret = write(running->send_fd, running->pipebuf,
running->pipebuf_fill); running->pipebuf_fill);