RFReview: remove aync IO from yppush

Thomas Quinot thomas at FreeBSD.ORG
Fri Aug 11 21:18:55 UTC 2006


All,

Our implementation of yppush is severely flawed, in that it makes
unsafe and unnecessary use of asynchronous I/O (F_ASYNC, SIGIO) on RPC
sockets. This specifically causes incorrect behaviour when pushing maps
to multiple server, and SIGIO occurs while a memory allocation operation
is in progress, because the processing done during the handling of SIGIO
itself attemps to use the memory allocator, which is not reentrant.

So, I'm proposing to get rid of all the asynchronous I/O stuff there,
and implement that properly using select(2) and a clear single thread
of control.

I have successfully tested the patch below on my system, both with a
single outgoing job and with multiple parallel jobs, and it seems to
work correctly. It should be completely equivalent functionally to the
previous implementation, except that it ensures the absence of unsafe
reentrant processing.

I would appreciate reviews and comments on this patch; I intend to
commit it in a few days' time, unless an objection is raised. Ideally I
think it would be nice to then MFC it in time for 6.2, but that will
really depend on the feedback on the change, and RE's call on the
matter.

Thomas.

Index: yppush_main.c
===================================================================
RCS file: /space/mirror/ncvs/src/usr.sbin/yppush/yppush_main.c,v
retrieving revision 1.20
diff -u -r1.20 yppush_main.c
--- yppush_main.c	12 Apr 2005 15:02:57 -0000	1.20
+++ yppush_main.c	11 Aug 2006 21:17:21 -0000
@@ -63,14 +63,12 @@
 int verbose = 0;		/* Toggle verbose mode. */
 unsigned long yppush_transid = 0;
 int yppush_timeout = 80;	/* Default timeout. */
-int yppush_jobs = 0;		/* Number of allowed concurrent jobs. */
+int yppush_jobs = 1;		/* Number of allowed concurrent jobs. */
 int yppush_running_jobs = 0;	/* Number of currently running jobs. */
-int yppush_alarm_tripped = 0;
 
 /* Structure for holding information about a running job. */
 struct jobs {
 	unsigned long tid;
-	int sock;
 	int port;
 	ypxfrstat stat;
 	unsigned long prognum;
@@ -82,6 +80,8 @@
 
 struct jobs *yppush_joblist;	/* Linked list of running jobs. */
 
+static int yppush_svc_run(int);
+
 /*
  * Local error messages.
  */
@@ -171,11 +171,7 @@
 				yp_error("%d transfer%sstill pending",
 					still_pending,
 					still_pending > 1 ? "s " : " ");
-			yppush_alarm_tripped = 0;
-			alarm(YPPUSH_RESPONSE_TIMEOUT);
-			pause();
-			alarm(0);
-			if (yppush_alarm_tripped == 1) {
+			if (yppush_svc_run (YPPUSH_RESPONSE_TIMEOUT) == 0) {
 				yp_error("timed out");
 				now = 1;
 			}
@@ -212,37 +208,30 @@
 		yppush_exit(1);
 	}
 
-	if (sig == SIGALRM) {
-		alarm(0);
-		yppush_alarm_tripped++;
-	}
-
 	return;
 }
 
 /*
  * Dispatch loop for callback RPC services.
+ * Return value:
+ *   -1 error
+ *    0 timeout
+ *   >0 request serviced
  */
-static void
-yppush_svc_run(void)
+static int
+yppush_svc_run(int timeout_secs)
 {
-#ifdef FD_SETSIZE
+	int rc;
 	fd_set readfds;
-#else
-	int readfds;
-#endif /* def FD_SETSIZE */
 	struct timeval timeout;
 
 	timeout.tv_usec = 0;
-	timeout.tv_sec = 5;
+	timeout.tv_sec = timeout_secs;
 
 retry:
-#ifdef FD_SETSIZE
 	readfds = svc_fdset;
-#else
-	readfds = svc_fds;
-#endif /* def FD_SETSIZE */
-	switch (select(_rpc_dtablesize(), &readfds, NULL, NULL, &timeout)) {
+	rc = select(svc_maxfd + 1, &readfds, NULL, NULL, &timeout);
+	switch (rc) {
 	case -1:
 		if (errno == EINTR)
 			goto retry;
@@ -255,25 +244,7 @@
 		svc_getreqset(&readfds);
 		break;
 	}
-	return;
-}
-
-/*
- * Special handler for asynchronous socket I/O. We mark the
- * sockets of the callback handlers as O_ASYNC and handle SIGIO
- * events here, which will occur when the callback handler has
- * something interesting to tell us.
- */
-static void
-async_handler(int sig)
-{
-	yppush_svc_run();
-
-	/* reset any pending alarms. */
-	alarm(0);
-	yppush_alarm_tripped++;
-	kill(getpid(), SIGALRM);
-	return;
+	return rc;
 }
 
 /*
@@ -415,7 +386,6 @@
 	job->stat = 0;
 	job->tid = tid;
 	job->port = xprt->xp_port;
-	job->sock = xprt->xp_fd; /*XXX: Evil!! EEEEEEEVIL!!! */
 	job->server = strdup(server);
 	job->map = strdup(map);
 	job->prognum = prognum;
@@ -423,27 +393,6 @@
 	job->next = yppush_joblist;
 	yppush_joblist = job;
 
-	/*
-	 * Set the RPC sockets to asynchronous mode. This will
-	 * cause the system to smack us with a SIGIO when an RPC
-	 * callback is delivered. This in turn allows us to handle
-	 * the callback even though we may be in the middle of doing
-	 * something else at the time.
-	 *
-	 * XXX This is a horrible thing to do for two reasons,
-	 * both of which have to do with portability:
-	 * 1) We really ought not to be sticking our grubby mits
-	 *    into the RPC service transport handle like this.
-	 * 2) Even in this day and age, there are still some *NIXes
-	 *    that don't support async socket I/O.
-	 */
-	if (fcntl(xprt->xp_fd, F_SETOWN, getpid()) == -1 ||
-	    fcntl(xprt->xp_fd, F_SETFL, O_ASYNC) == -1) {
-		yp_error("failed to set async I/O mode: %s",
-			 strerror(errno));
-		yppush_exit(1);
-	}
-
 	if (verbose) {
 		yp_error("initiating transfer: %s -> %s (transid = %lu)",
 			yppush_mapname, server, tid);
@@ -482,27 +431,12 @@
 	snprintf(server, sizeof(server), "%.*s", vallen, val);
 
 	/*
-	 * Restrict the number of concurrent jobs. If yppush_jobs number
+	 * Restrict the number of concurrent jobs: if yppush_jobs number
 	 * of jobs have already been dispatched and are still pending,
 	 * wait for one of them to finish so we can reuse its slot.
 	 */
-	if (yppush_jobs <= 1) {
-		yppush_alarm_tripped = 0;
-		while (!yppush_alarm_tripped && yppush_running_jobs) {
-			alarm(yppush_timeout);
-			yppush_alarm_tripped = 0;
-			pause();
-			alarm(0);
-		}
-	} else {
-		yppush_alarm_tripped = 0;
-		while (!yppush_alarm_tripped && yppush_running_jobs >= yppush_jobs) {
-			alarm(yppush_timeout);
-			yppush_alarm_tripped = 0;
-			pause();
-			alarm(0);
-		}
-	}
+	while (yppush_running_jobs >= yppush_jobs && (yppush_svc_run (yppush_timeout) > 0))
+		;
 
 	/* Cleared for takeoff: set everything in motion. */
 	if (yp_push(server, yppush_mapname, yppush_transid))
@@ -633,22 +567,6 @@
 	signal(SIGINT, handler);
 	signal(SIGABRT, handler);
 
-	/*
-	 * Set up the SIGIO handler. Make sure that some of the
-	 * other signals are blocked while the handler is running so
-	 * select() doesn't get interrupted.
-	 */
-	sigemptyset(&sa.sa_mask);
-	sigaddset(&sa.sa_mask, SIGIO); /* Goes without saying. */
-	sigaddset(&sa.sa_mask, SIGPIPE);
-	sigaddset(&sa.sa_mask, SIGCHLD);
-	sigaddset(&sa.sa_mask, SIGALRM);
-	sigaddset(&sa.sa_mask, SIGINT);
-	sa.sa_handler = async_handler;
-	sa.sa_flags = 0;
-
-	sigaction(SIGIO, &sa, NULL);
-
 	/* set initial transaction ID */
 	yppush_transid = time((time_t *)NULL);
 
-------------- next part --------------
A non-text attachment was scrubbed...
Name: not available
Type: application/pgp-signature
Size: 187 bytes
Desc: not available
Url : http://lists.freebsd.org/pipermail/freebsd-net/attachments/20060811/ea2ad579/attachment.pgp


More information about the freebsd-net mailing list