libisofs/libisofs/filters/external.c

515 lines
12 KiB
C

/*
* Copyright (c) 2009 Thomas Schmitt
*
* This file is part of the libisofs project; you can redistribute it and/or
* modify it under the terms of the GNU General Public License version 2 as
* published by the Free Software Foundation. See COPYING file for details.
*
* It implements a filter facility which can pipe a IsoStream into an external
* process, read its output and forward it as IsoStream output to an IsoFile.
* The external processes get started according to an IsoExternalFilterCommand
* which is described in libisofs.h.
*
*/
#include "../libisofs.h"
#include "../filter.h"
#include "../fsource.h"
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <signal.h>
#include <stdio.h>
#include <fcntl.h>
#include <errno.h>
/*
* A filter that starts an external process and uses its stdin and stdout
* for classical pipe filtering.
*/
/*
* Individual runtime properties exist only as long as stream is opened.
*/
typedef struct
{
int send_fd;
int recv_fd;
pid_t pid;
int eof;
} ExternalFilterRuntime;
/*
* The data payload of an individual IsoStream from External Filter
*/
typedef struct
{
ino_t id;
IsoStream *orig;
IsoExternalFilterCommand *cmd;
off_t size; /* -1 means that the size is unknown yet */
ExternalFilterRuntime *running; /* is non-NULL when open */
} ExternalFilterStreamData;
/* Each individual ExternalFilterStreamData needs a unique id number. */
/* >>> This is very suboptimal:
The counter can rollover.
*/
static ino_t extf_ino_id = 0;
/*
* Methods for the IsoStreamIface of an External Filter object.
*/
/*
* @param flag bit0= do not run .get_size() if size is < 0
*/
static
int extf_stream_open_flag(IsoStream *stream, int flag)
{
ExternalFilterStreamData *data;
ExternalFilterRuntime *running = NULL;
pid_t child_pid;
int send_pipe[2], recv_pipe[2], ret, stream_open = 0;
send_pipe[0] = send_pipe[1] = recv_pipe[0] = recv_pipe[1] = -1;
if (stream == NULL) {
return ISO_NULL_POINTER;
}
data = (ExternalFilterStreamData*)stream->data;
if (data->running != NULL) {
return ISO_FILE_ALREADY_OPENED;
}
if (data->size < 0 && !(flag & 1)) {
/* Do the size determination run now, so that the size gets cached
and .get_size() will not fail on an opened stream.
*/
stream->class->get_size(stream);
}
ret = iso_stream_open(data->orig);
if (ret < 0) {
return ret;
}
stream_open = 1;
ret = pipe(send_pipe);
if (ret == -1) {
ret = ISO_OUT_OF_MEM;
goto parent_failed;
}
ret = pipe(recv_pipe);
if (ret == -1) {
ret = ISO_OUT_OF_MEM;
goto parent_failed;
}
child_pid= fork();
if (child_pid == -1) {
ret = ISO_DATA_SOURCE_FATAL;
goto parent_failed;
}
if (child_pid != 0) {
/* parent */
running = calloc(sizeof(ExternalFilterRuntime), 1);
if (running == NULL) {
ret = ISO_OUT_OF_MEM;
goto parent_failed;
}
running->send_fd = send_pipe[1];
running->recv_fd = recv_pipe[0];
running->pid = child_pid;
running->eof = 0;
data->running = running;
/* Give up the child-side pipe ends */
close(send_pipe[0]);
close(recv_pipe[1]);
/* >>> ??? should one replace non-blocking read() by select () ? */
/* Make filter outlet non-blocking */
ret = fcntl(recv_pipe[0], F_GETFL);
if (ret != -1) {
ret |= O_NONBLOCK;
fcntl(recv_pipe[0], F_SETFL, ret);
}
return 1;
}
/* child */
/* Give up the parent-side pipe ends */
close(send_pipe[1]);
close(recv_pipe[0]);
/* attach pipe ends to stdin and stdout */;
close(0);
ret = dup2(send_pipe[0], 0);
if (ret == -1) {
goto child_failed;
}
close(1);
ret = dup2(recv_pipe[1], 1);
if (ret == -1) {
goto child_failed;
}
/* Self conversion into external program */
execv(data->cmd->path, data->cmd->argv); /* should never come back */
child_failed:;
fprintf(stderr,"--- execution of external filter command failed:\n");
fprintf(stderr," %s\n", data->cmd->path);
exit(127);
parent_failed:;
if (stream_open)
iso_stream_close(data->orig);
if(send_pipe[0] != -1)
close(send_pipe[0]);
if(send_pipe[1] != -1)
close(send_pipe[1]);
if(recv_pipe[0] != -1)
close(recv_pipe[0]);
if(recv_pipe[1] != -1)
close(recv_pipe[1]);
return ret;
}
static
int extf_stream_open(IsoStream *stream)
{
return extf_stream_open_flag(stream, 0);
}
static
int extf_stream_close(IsoStream *stream)
{
int ret, status;
ExternalFilterStreamData *data;
if (stream == NULL) {
return ISO_NULL_POINTER;
}
data = stream->data;
if (data->running != NULL) {
if(data->running->recv_fd != -1)
close(data->running->recv_fd);
if(data->running->send_fd != -1)
close(data->running->send_fd);
ret = waitpid(data->running->pid, &status, WNOHANG);
if (ret == -1) {
kill(data->running->pid, SIGKILL);
waitpid(data->running->pid, &status, 0);
}
free(data->running);
data->running = NULL;
}
return iso_stream_close(data->orig);
}
static
int extf_stream_read(IsoStream *stream, void *buf, size_t desired)
{
int ret, in_done = 0, blocking = 0;
ExternalFilterStreamData *data;
uint8_t pipebuf[2048]; /* keep this small, not to clog the input */
size_t fill = 0;
if (stream == NULL) {
return ISO_NULL_POINTER;
}
data = stream->data;
if (data->running == NULL) {
return ISO_FILE_NOT_OPENED;
}
if (data->running->eof) {
return 0;
}
while (1) {
/* Try to read desired amount from filter */;
while (1) {
/* >>> ??? should one replace non-blocking read() by select () ? */
ret = read(data->running->recv_fd, ((char *) buf) + fill,
desired - fill);
if (ret == -1) {
if (errno == EAGAIN)
break;
return ISO_FILE_READ_ERROR;
}
fill += ret;
if (ret == 0) {
data->running->eof = 1;
}
if (ret == 0 || fill >= desired) {
return fill;
}
}
if (in_done) {
/* >>> ??? should one replace non-blocking read() by select () ? */
/* Make filter outlet blocking */
ret = fcntl(data->running->recv_fd, F_GETFL);
if (ret != -1) {
ret &= ~O_NONBLOCK;
fcntl(data->running->recv_fd, F_SETFL, ret);
}
blocking = 1;
usleep(1000); /* just in case it is still non-blocking */
continue;
}
ret = iso_stream_read(data->orig, pipebuf, sizeof(pipebuf));
if (ret < 0) {
return ret;
}
if (ret == 0) {
in_done = 1;
close(data->running->send_fd); /* Tell the filter: it is over */
data->running->send_fd = -1;
} else {
ret = write(data->running->send_fd, pipebuf, ret);
if (ret == -1) {
/* From the view of the caller it _is_ a read error */
return ISO_FILE_READ_ERROR;
}
}
}
return ISO_FILE_READ_ERROR; /* should never be hit */
}
static
off_t extf_stream_get_size(IsoStream *stream)
{
int ret, ret_close;
off_t count = 0;
ExternalFilterStreamData *data;
char buf[64 * 1024];
size_t bufsize = 64 * 1024;
if (stream == NULL) {
return ISO_NULL_POINTER;
}
data = stream->data;
if (data->size >= 0) {
return data->size;
}
/* Run filter command and count output bytes */
ret = extf_stream_open_flag(stream, 1);
if (ret < 0) {
return ret;
}
while (1) {
ret = extf_stream_read(stream, buf, bufsize);
if (ret <= 0)
break;
count += ret;
}
ret_close = extf_stream_close(stream);
if (ret < 0)
return ret;
if (ret_close < 0)
return ret_close;
data->size = count;
return count;
}
static
int extf_stream_is_repeatable(IsoStream *stream)
{
/* Only repeatable streams are accepted as orig */
return 1;
}
static
void extf_stream_get_id(IsoStream *stream, unsigned int *fs_id,
dev_t *dev_id, ino_t *ino_id)
{
ExternalFilterStreamData *data;
data = stream->data;
*fs_id = ISO_FILTER_FS_ID;
*dev_id = ISO_FILTER_EXTERNAL_DEV_ID;
*ino_id = data->id;
}
static
void extf_stream_free(IsoStream *stream)
{
ExternalFilterStreamData *data;
if (stream == NULL) {
return;
}
data = stream->data;
iso_stream_unref(data->orig);
if (data->cmd->refcount > 0)
data->cmd->refcount--;
free(data);
}
IsoStreamIface extf_stream_class = {
0,
"extf",
extf_stream_open,
extf_stream_close,
extf_stream_get_size,
extf_stream_read,
extf_stream_is_repeatable,
extf_stream_get_id,
extf_stream_free
};
static
void extf_filter_free(FilterContext *filter)
{
/* no data are allocated */;
}
/* To be called by iso_file_add_filter().
* The FilterContext input parameter is not furtherly needed for the
* emerging IsoStream.
*/
static
int extf_filter_get_filter(FilterContext *filter, IsoStream *original,
IsoStream **filtered)
{
IsoStream *str;
ExternalFilterStreamData *data;
IsoExternalFilterCommand *cmd;
if (filter == NULL || original == NULL || filtered == NULL) {
return ISO_NULL_POINTER;
}
cmd = (IsoExternalFilterCommand *) filter->data;
if (cmd->refcount + 1 <= 0) {
return ISO_EXTF_TOO_OFTEN;
}
str = malloc(sizeof(IsoStream));
if (str == NULL) {
return ISO_OUT_OF_MEM;
}
data = malloc(sizeof(ExternalFilterStreamData));
if (str == NULL) {
free(str);
return ISO_OUT_OF_MEM;
}
/* These data items are not owned by this filter object */
data->id = ++extf_ino_id;
data->orig = original;
data->cmd = cmd;
data->size = -1;
data->running = NULL;
/* get reference to the source */
iso_stream_ref(data->orig);
str->refcount = 1;
str->data = data;
str->class = &extf_stream_class;
*filtered = str;
cmd->refcount++;
return ISO_SUCCESS;
}
/* Produce a parameter object suitable for iso_file_add_filter().
* It may be disposed by free() after all those calls are made.
*
* This is an internal call of libisofs to be used by an API call that
* attaches an IsoExternalFilterCommand to one or more IsoFile objects.
* See libisofs.h for IsoExternalFilterCommand.
*/
static
int extf_create_context(IsoExternalFilterCommand *cmd,
FilterContext **filter, int flag)
{
FilterContext *f;
*filter = f = calloc(1, sizeof(FilterContext));
if (f == NULL) {
return ISO_OUT_OF_MEM;
}
f->refcount = 1;
f->version = 0;
f->data = cmd;
f->free = extf_filter_free;
f->get_filter = extf_filter_get_filter;
return ISO_SUCCESS;
}
/*
* A function which adds a filter to an IsoFile shall create a temporary
* FilterContext by iso_extf_create_context(), use it in one or more calls
* of filter.c:iso_file_add_filter() and finally dispose it by free().
*/
int iso_file_add_external_filter(IsoFile *file, IsoExternalFilterCommand *cmd,
int flag)
{
int ret;
FilterContext *f = NULL;
IsoStream *stream;
ret = extf_create_context(cmd, &f, 0);
if (ret < 0) {
return ret;
}
ret = iso_file_add_filter(file, f, 0);
free(f);
if (ret < 0) {
return ret;
}
/* Run a full filter process getsize so that the size is cached */
stream = iso_file_get_stream(file);
ret = iso_stream_get_size(stream);
if (ret < 0) {
return ret;
}
return ISO_SUCCESS;
}