New API call Xorriso_peek_outlists()

This commit is contained in:
Thomas Schmitt 2012-09-15 17:04:32 +00:00
parent 6338febf8f
commit 634b20ec05
7 changed files with 175 additions and 26 deletions

View File

@ -292,6 +292,7 @@ Xorriso_option_write_type;
Xorriso_option_xattr; Xorriso_option_xattr;
Xorriso_option_zisofs; Xorriso_option_zisofs;
Xorriso_parse_line; Xorriso_parse_line;
Xorriso_peek_outlists;
Xorriso_prescan_args; Xorriso_prescan_args;
Xorriso_process_errfile; Xorriso_process_errfile;
Xorriso_process_msg_queues; Xorriso_process_msg_queues;

View File

@ -328,6 +328,8 @@ int Xorriso_new(struct XorrisO ** xorriso,char *progname, int flag)
m->msgw_info_handler= NULL; m->msgw_info_handler= NULL;
m->msgw_info_handle= NULL; m->msgw_info_handle= NULL;
m->msgw_stack_handle= -1; m->msgw_stack_handle= -1;
m->msgw_msg_pending= 0;
m->msgw_fetch_lock_ini= 0;
m->msglist_stackfill= 0; m->msglist_stackfill= 0;
m->status_history_max= Xorriso_status_history_maX; m->status_history_max= Xorriso_status_history_maX;
m->scsi_log= 0; m->scsi_log= 0;
@ -465,6 +467,10 @@ int Xorriso_new(struct XorrisO ** xorriso,char *progname, int flag)
if(ret != 0) if(ret != 0)
goto failure; goto failure;
m->msg_watcher_lock_ini= 1; m->msg_watcher_lock_ini= 1;
ret= pthread_mutex_init(&(m->msgw_fetch_lock), NULL);
if(ret != 0)
goto failure;
m->msgw_fetch_lock_ini= 1;
if(leafname != NULL) if(leafname != NULL)
free(leafname); free(leafname);
@ -558,6 +564,8 @@ int Xorriso_destroy(struct XorrisO **xorriso, int flag)
pthread_mutex_destroy(&(m->problem_status_lock)); pthread_mutex_destroy(&(m->problem_status_lock));
if(m->msg_watcher_lock_ini) if(m->msg_watcher_lock_ini)
pthread_mutex_destroy(&(m->msg_watcher_lock)); pthread_mutex_destroy(&(m->msg_watcher_lock));
if(m->msgw_fetch_lock_ini)
pthread_mutex_destroy(&(m->msgw_fetch_lock));
free((char *) m); free((char *) m);
*xorriso= NULL; *xorriso= NULL;

View File

@ -1726,7 +1726,7 @@ next_command:;
/* Test setup for for Xorriso_push_outlists() et.al. */ /* Test setup for for Xorriso_push_outlists() et.al. */
{ {
int stack_handle = -1, line_count; int stack_handle = -1, line_count= 0;
struct Xorriso_lsT *result_list, *info_list; struct Xorriso_lsT *result_list, *info_list;
int Xorriso_process_msg_lists(struct XorrisO *xorriso, int Xorriso_process_msg_lists(struct XorrisO *xorriso,
struct Xorriso_lsT *result_list, struct Xorriso_lsT *result_list,
@ -1736,12 +1736,11 @@ next_command:;
(*idx)++; (*idx)++;
if(strcmp(arg1, "push") == 0) { if(strcmp(arg1, "push") == 0) {
ret= Xorriso_push_outlists(xorriso, &stack_handle, 3); ret= Xorriso_push_outlists(xorriso, &stack_handle, 3);
fprintf(stderr, "xorriso -test: Xorriso_push() = %d, handle = %d\n", fprintf(stderr, "xorriso -test: push = %d, handle = %d\n",
ret, stack_handle); ret, stack_handle);
} else if(strcmp(arg1, "pull") == 0) { } else if(strcmp(arg1, "pull") == 0) {
ret= Xorriso_pull_outlists(xorriso, -1, &result_list, &info_list, 0); ret= Xorriso_pull_outlists(xorriso, -1, &result_list, &info_list, 0);
fprintf(stderr, "xorriso -test: Xorriso_push() = %d, handle = %d\n", fprintf(stderr, "xorriso -test: pull = %d\n", ret);
ret, stack_handle);
if(ret > 0) { if(ret > 0) {
ret= Xorriso_process_msg_lists(xorriso, result_list, info_list, ret= Xorriso_process_msg_lists(xorriso, result_list, info_list,
&line_count, 0); &line_count, 0);
@ -1751,8 +1750,7 @@ next_command:;
} }
} else if(strcmp(arg1, "fetch") == 0) { } else if(strcmp(arg1, "fetch") == 0) {
ret= Xorriso_fetch_outlists(xorriso, -1, &result_list, &info_list, 0); ret= Xorriso_fetch_outlists(xorriso, -1, &result_list, &info_list, 0);
fprintf(stderr, "xorriso -test: Xorriso_fetch() = %d, handle = %d\n", fprintf(stderr, "xorriso -test: fetch = %d\n", ret);
ret, stack_handle);
if(ret > 0) { if(ret > 0) {
ret= Xorriso_process_msg_lists(xorriso, result_list, info_list, ret= Xorriso_process_msg_lists(xorriso, result_list, info_list,
&line_count, 0); &line_count, 0);
@ -1760,6 +1758,16 @@ next_command:;
"xorriso -test: Xorriso_process_msg_lists() = %d, line_count = %d\n", "xorriso -test: Xorriso_process_msg_lists() = %d, line_count = %d\n",
ret, line_count); ret, line_count);
} }
} else if(strcmp(arg1, "peek") == 0) {
ret= Xorriso_peek_outlists(xorriso, -1, 0, 0);
fprintf(stderr, "xorriso -test: peek = %d\n", ret);
} else if(strcmp(arg1, "sleep_peek") == 0) {
usleep(1000000);
ret= Xorriso_peek_outlists(xorriso, -1, 0, 0);
fprintf(stderr, "xorriso -test: sleep_peek = %d\n", ret);
} else if(strcmp(arg1, "peek_loop") == 0) {
ret= Xorriso_peek_outlists(xorriso, -1, 3, 4);
fprintf(stderr, "xorriso -test: peek_loop = %d\n", ret);
} else if(strcmp(arg1, "start") == 0) { } else if(strcmp(arg1, "start") == 0) {
ret= Xorriso_start_msg_watcher(xorriso, NULL, NULL, NULL, NULL, 0); ret= Xorriso_start_msg_watcher(xorriso, NULL, NULL, NULL, NULL, 0);
fprintf(stderr, "xorriso -test: Xorriso_start_msg_watcher() = %d\n", ret); fprintf(stderr, "xorriso -test: Xorriso_start_msg_watcher() = %d\n", ret);

View File

@ -522,44 +522,76 @@ ex:;
} }
static int Xorriso_lock_outlists(struct XorrisO *xorriso, int flag) /* @param flag bit0= no error message in case of failure
*/
static int Xorriso_obtain_lock(struct XorrisO *xorriso,
pthread_mutex_t *lock_handle,
char *purpose, int flag)
{ {
int ret; int ret;
static int complaints= 0, complaint_limit= 5; static int complaints= 0, complaint_limit= 5;
ret= pthread_mutex_lock(&(xorriso->result_msglists_lock)); ret= pthread_mutex_lock(lock_handle);
if(ret != 0) { if(ret != 0) {
if(flag & 1)
return(-1);
/* Cannot report failure through the failing message output system */ /* Cannot report failure through the failing message output system */
complaints++; complaints++;
if(complaints < complaint_limit) if(complaints <= complaint_limit)
fprintf(stderr, fprintf(stderr,
"xorriso : pthread_mutex_lock() for outlists returns %d\n", "xorriso : pthread_mutex_lock() for %s returns %d\n",
ret); purpose, ret);
return(-1); return(-1);
} }
return(1); return(1);
} }
static int Xorriso_unlock_outlists(struct XorrisO *xorriso, int flag) /* @param flag bit0= no error message in case of failure
*/
static int Xorriso_release_lock(struct XorrisO *xorriso,
pthread_mutex_t *lock_handle,
char *purpose, int flag)
{ {
int ret; int ret;
static int complaints= 0, complaint_limit= 5; static int complaints= 0, complaint_limit= 5;
ret= pthread_mutex_unlock(&(xorriso->result_msglists_lock)); ret= pthread_mutex_unlock(lock_handle);
if(ret != 0) { if(ret != 0) {
if(flag & 1)
return(0);
/* Cannot report failure through the failing message output system */ /* Cannot report failure through the failing message output system */
complaints++; complaints++;
if(complaints < complaint_limit) if(complaints <= complaint_limit)
fprintf(stderr, fprintf(stderr,
"xorriso : pthread_mutex_unlock() for outlists returns %d\n", "xorriso : pthread_mutex_unlock() for %s returns %d\n",
ret); purpose, ret);
return(0); return(0);
} }
return(1); return(1);
} }
static int Xorriso_lock_outlists(struct XorrisO *xorriso, int flag)
{
int ret;
ret= Xorriso_obtain_lock(xorriso, &(xorriso->result_msglists_lock),
"outlists", 0);
return(ret);
}
static int Xorriso_unlock_outlists(struct XorrisO *xorriso, int flag)
{
int ret;
ret= Xorriso_release_lock(xorriso, &(xorriso->result_msglists_lock),
"outlists", 0);
return(ret);
}
static int Xorriso_write_to_msglist(struct XorrisO *xorriso, static int Xorriso_write_to_msglist(struct XorrisO *xorriso,
struct Xorriso_lsT **xorriso_msglist, struct Xorriso_lsT **xorriso_msglist,
char *text, int flag) char *text, int flag)
@ -850,13 +882,6 @@ int Xorriso_fetch_outlists(struct XorrisO *xorriso, int stack_handle,
#ifdef Xorriso_fetch_with_msg_queueS #ifdef Xorriso_fetch_with_msg_queueS
/* >>> This would need more locking around
xorriso->problem_status= sev;
Probably also locking of
Xorriso_info(), Xorriso_result
because unredirected output could collide.
*/
ret= Xorriso_process_msg_queues(xorriso, 0); ret= Xorriso_process_msg_queues(xorriso, 0);
if(ret <= 0) if(ret <= 0)
goto ex; goto ex;
@ -896,6 +921,59 @@ ex:;
} }
int Xorriso_peek_outlists(struct XorrisO *xorriso, int stack_handle,
int timeout, int flag)
{
int ret, locked= 0, yes= 0;
static int u_wait= 19000;
time_t start_time;
if((flag & 3) == 0)
flag|= 3;
if(stack_handle == -1)
stack_handle= xorriso->msglist_stackfill - 1;
start_time= time(NULL);
try_again:;
ret= Xorriso_obtain_lock(xorriso, &(xorriso->msgw_fetch_lock),
"message watcher fetch operation", 0);
if(ret <= 0)
{yes= -2; goto ex;}
locked= 1;
yes= 0;
if(stack_handle < 0 || stack_handle >= xorriso->msglist_stackfill)
{yes= -1; goto ex;}
if(flag & 1)
yes|= (xorriso->result_msglists[stack_handle] != NULL);
if(flag & 2)
yes|= (xorriso->info_msglists[stack_handle] != NULL);
if(xorriso->msg_watcher_state == 2 && xorriso->msgw_msg_pending)
yes|= 2;
ret= Xorriso_release_lock(xorriso, &(xorriso->msgw_fetch_lock),
"message watcher fetch operation", 0);
if(ret <= 0)
{yes= -2; goto ex;}
locked= 0;
if(yes && (flag & 4)) {
usleep(u_wait);
if(time(NULL) <= start_time + timeout)
goto try_again;
}
ex:;
if(locked) {
ret= Xorriso_release_lock(xorriso, &(xorriso->msgw_fetch_lock),
"message watcher fetch operation", 0);
if(ret <= 0 && yes >= 0)
yes= -2;
}
return(yes);
}
int Xorriso_pull_outlists(struct XorrisO *xorriso, int stack_handle, int Xorriso_pull_outlists(struct XorrisO *xorriso, int stack_handle,
struct Xorriso_lsT **result_list, struct Xorriso_lsT **result_list,
struct Xorriso_lsT **info_list, int flag) struct Xorriso_lsT **info_list, int flag)
@ -1025,18 +1103,31 @@ static void *Xorriso_msg_watcher(void *state_pt)
if(xorriso->msg_watcher_state == 3) if(xorriso->msg_watcher_state == 3)
break; break;
Xorriso_obtain_lock(xorriso, &(xorriso->msgw_fetch_lock),
"message watcher fetch operation", 1);
xorriso->msgw_msg_pending= 1;
ret= Xorriso_fetch_outlists(xorriso, -1, &result_list, &info_list, 3); ret= Xorriso_fetch_outlists(xorriso, -1, &result_list, &info_list, 3);
if(ret < 0)
break;
if(ret > 0) { if(ret > 0) {
/* Process fetched lines */ /* Process fetched lines */
xorriso->msgw_msg_pending= 2;
Xorriso_release_lock(xorriso, &(xorriso->msgw_fetch_lock),
"message watcher fetch operation", 1);
ret= Xorriso_process_msg_lists(xorriso, result_list, info_list, ret= Xorriso_process_msg_lists(xorriso, result_list, info_list,
&line_count, 0); &line_count, 0);
xorriso->msgw_msg_pending= 0;
Xorriso_lst_destroy_all(&result_list, 0); Xorriso_lst_destroy_all(&result_list, 0);
Xorriso_lst_destroy_all(&info_list, 0); Xorriso_lst_destroy_all(&info_list, 0);
if(ret < 0) if(ret < 0)
break; break;
} else {
xorriso->msgw_msg_pending= 0;
Xorriso_release_lock(xorriso, &(xorriso->msgw_fetch_lock),
"message watcher fetch operation", 1);
} }
xorriso->msgw_msg_pending= 0;
if(ret < 0)
break;
if(line_count < sleep_thresh) if(line_count < sleep_thresh)
usleep(u_wait); usleep(u_wait);
@ -1179,13 +1270,24 @@ int Xorriso_stop_msg_watcher(struct XorrisO *xorriso, int flag)
usleep(u_wait); usleep(u_wait);
} }
Xorriso_obtain_lock(xorriso, &(xorriso->msgw_fetch_lock),
"message watcher fetch operation", 1);
xorriso->msgw_msg_pending= 1;
ret= Xorriso_pull_outlists(xorriso, xorriso->msgw_stack_handle, ret= Xorriso_pull_outlists(xorriso, xorriso->msgw_stack_handle,
&result_list, &info_list, 0); &result_list, &info_list, 0);
if(ret > 0) { if(ret > 0) {
xorriso->msgw_msg_pending= 2;
Xorriso_release_lock(xorriso, &(xorriso->msgw_fetch_lock),
"message watcher fetch operation", 1);
Xorriso_process_msg_lists(xorriso, result_list, info_list, Xorriso_process_msg_lists(xorriso, result_list, info_list,
&line_count, 0); &line_count, 0);
xorriso->msgw_msg_pending= 0;
Xorriso_lst_destroy_all(&result_list, 0); Xorriso_lst_destroy_all(&result_list, 0);
Xorriso_lst_destroy_all(&info_list, 0); Xorriso_lst_destroy_all(&info_list, 0);
} else {
xorriso->msgw_msg_pending= 0;
Xorriso_release_lock(xorriso, &(xorriso->msgw_fetch_lock),
"message watcher fetch operation", 1);
} }
xorriso->msgw_result_handler= NULL; xorriso->msgw_result_handler= NULL;

View File

@ -628,6 +628,33 @@ int Xorriso_fetch_outlists(struct XorrisO *xorriso, int stack_handle,
struct Xorriso_lsT **info_list, int flag); struct Xorriso_lsT **info_list, int flag);
/** Inquire whether messages are pending in redirected result and info channel.
This may be used to determine whether a concurrent message watcher already
has obtained all pending messages.
@since 1.2.6
@param xorriso The environment handle
@param stack_handle An id number returned by Xorriso_push_outlists()
and not yet revoked by Xorriso_pull_outlists().
Submit -1 to address the most recent valid id.
@param timeout Number of seconds after which to return despite
flag bit 2
@param flag Bitfield for control purposes
bit0= fetch result channel
bit1= fetch info channel
bit2= wait and retry until return is 0 or -1
If bit0 and bit1 are 0, both channels get fetched.
@return If > 0:
bit0= messages are pending in outlists
bit1= message watcher is processing fetched messages
Else:
0= no messages are pending anywhere
-1= inappropriate stack_handle
-2= locking failed
*/
int Xorriso_peek_outlists(struct XorrisO *xorriso, int stack_handle,
int timeout, int flag);
/** Disable the redirection given by stack_handle. If it was the current /** Disable the redirection given by stack_handle. If it was the current
receiver of messages then switch output to the next older redirection receiver of messages then switch output to the next older redirection
resp. to the normal channels if no redirections are stacked any more. resp. to the normal channels if no redirections are stacked any more.

View File

@ -587,6 +587,9 @@ struct XorrisO { /* the global context of xorriso */
int (*msgw_info_handler)(void *handle, char *text); int (*msgw_info_handler)(void *handle, char *text);
void *msgw_info_handle; void *msgw_info_handle;
int msgw_stack_handle; int msgw_stack_handle;
int msgw_msg_pending; /* 0=no, 1=fetching(i.e. maybe) , 2=yes */
int msgw_fetch_lock_ini;
pthread_mutex_t msgw_fetch_lock;
int status_history_max; /* for -status long_history */ int status_history_max; /* for -status long_history */

View File

@ -1 +1 @@
#define Xorriso_timestamP "2012.09.15.095146" #define Xorriso_timestamP "2012.09.15.170346"