svn commit: r188891 - user/lstewart/alq_varlen_8.x/sys/kern

Lawrence Stewart lstewart at FreeBSD.org
Fri Feb 20 20:42:19 PST 2009


Author: lstewart
Date: Sat Feb 21 04:42:19 2009
New Revision: 188891
URL: http://svn.freebsd.org/changeset/base/188891

Log:
  Hopefully fixes the bugs I've been seeing. Also addresses some memory leaks and
  minor style nits.

Modified:
  user/lstewart/alq_varlen_8.x/sys/kern/kern_alq.c

Modified: user/lstewart/alq_varlen_8.x/sys/kern/kern_alq.c
==============================================================================
--- user/lstewart/alq_varlen_8.x/sys/kern/kern_alq.c	Sat Feb 21 03:53:06 2009	(r188890)
+++ user/lstewart/alq_varlen_8.x/sys/kern/kern_alq.c	Sat Feb 21 04:42:19 2009	(r188891)
@@ -77,6 +77,8 @@ struct alq {
 #define	ALQ_LOCK(alq)	mtx_lock_spin(&(alq)->aq_mtx)
 #define	ALQ_UNLOCK(alq)	mtx_unlock_spin(&(alq)->aq_mtx)
 
+#define ALQ_HAS_PENDING_DATA(alq) ((alq)->aq_freebytes != (alq)->aq_buflen)
+
 static MALLOC_DEFINE(M_ALD, "ALD", "ALD");
 
 /*
@@ -186,10 +188,11 @@ ald_daemon(void)
 
 	for (;;) {
 		while ((alq = LIST_FIRST(&ald_active)) == NULL
-				&& !ald_shutingdown)
+		    && !ald_shutingdown)
 			mtx_sleep(&ald_active, &ald_mtx, PWAIT, "aldslp", 0);
 
-		if (ald_shutingdown) {
+		/* Don't shut down until all active alqs are flushed. */
+		if (ald_shutingdown && alq != NULL) {
 			ALD_UNLOCK();
 			break;
 		}
@@ -213,8 +216,20 @@ ald_shutdown(void *arg, int howto)
 	struct alq *alq;
 
 	ALD_LOCK();
+
+	/* Ensure no new queues can be created */
 	ald_shutingdown = 1;
 
+	/* Shut down all alqs prior to terminating the ald_daemon. */
+	while ((alq = LIST_FIRST(&ald_queues)) != NULL) {
+		LIST_REMOVE(alq, aq_link);
+		ALD_UNLOCK();
+		alq_shutdown(alq);
+		ALD_LOCK();
+	}
+
+	/* At this point, all alqs are flushed and shutdown */
+
 	/*
 	 * Wake ald_daemon so that it exits. It won't be able to do
 	 * anything until we mtx_sleep because we hold the ald_mtx
@@ -224,12 +239,6 @@ ald_shutdown(void *arg, int howto)
 	/* Wait for ald_daemon to exit */
 	mtx_sleep(ald_proc, &ald_mtx, PWAIT, "aldslp", 0);
 
-	while ((alq = LIST_FIRST(&ald_queues)) != NULL) {
-		LIST_REMOVE(alq, aq_link);
-		ALD_UNLOCK();
-		alq_shutdown(alq);
-		ALD_LOCK();
-	}
 	ALD_UNLOCK();
 }
 
@@ -241,15 +250,29 @@ alq_shutdown(struct alq *alq)
 	/* Stop any new writers. */
 	alq->aq_flags |= AQ_SHUTDOWN;
 
+	/*
+	 * If the alq isn't active but has unwritten data (possible if
+	 * the ALQ_NOACTIVATE flag has been used), explicitly activate the
+	 * alq here so that the pending data gets flushed by the ald_daemon.
+	 */
+	if (!(alq->aq_flags & AQ_ACTIVE) &&
+	    ALQ_HAS_PENDING_DATA(alq)) {
+		alq->aq_flags |= AQ_ACTIVE;
+		ALQ_UNLOCK(alq);
+		ALD_LOCK();
+		ald_activate(alq);
+		ALD_UNLOCK();
+		ALQ_LOCK(alq);
+	}
+
 	/* Drain IO */
-	while (alq->aq_flags & (AQ_FLUSHING|AQ_ACTIVE)) {
+	while (alq->aq_flags & AQ_ACTIVE) {
 		alq->aq_flags |= AQ_WANTED;
 		msleep_spin(alq, &alq->aq_mtx, "aldclose", 0);
 	}
 	ALQ_UNLOCK(alq);
 
-	vn_close(alq->aq_vp, FWRITE, alq->aq_cred,
-	    curthread);
+	vn_close(alq->aq_vp, FWRITE, alq->aq_cred, curthread);
 	crfree(alq->aq_cred);
 }
 
@@ -268,7 +291,7 @@ alq_doio(struct alq *alq)
 	int iov;
 	int vfslocked;
 
-	KASSERT((alq->aq_freebytes != alq->aq_buflen),
+	KASSERT((ALQ_HAS_PENDING_DATA(alq)),
 		("%s: queue emtpy!", __func__)
 	);
 
@@ -280,16 +303,16 @@ alq_doio(struct alq *alq)
 	bzero(&aiov, sizeof(aiov));
 	bzero(&auio, sizeof(auio));
 
-	/* start the write from the location of our buffer tail pointer */
+	/* Start the write from the location of our buffer tail pointer. */
 	aiov[iov].iov_base = alq->aq_entbuf + alq->aq_writetail;
 
 	if (alq->aq_writetail < alq->aq_writehead) {
-		/* buffer not wrapped */
+		/* Buffer not wrapped */
 		totlen = aiov[iov].iov_len =  alq->aq_writehead -
 							alq->aq_writetail;
 	} else {
 		/*
-		 * buffer wrapped, requires 2 aiov entries:
+		 * Buffer wrapped, requires 2 aiov entries:
 		 * - first is from writetail to end of buffer
 		 * - second is from start of buffer to writehead
 		 */
@@ -336,16 +359,16 @@ alq_doio(struct alq *alq)
 	ALQ_LOCK(alq);
 	alq->aq_flags &= ~AQ_FLUSHING;
 
-	/* Adjust writetail as required, taking into account wrapping */
+	/* Adjust writetail as required, taking into account wrapping. */
 	alq->aq_writetail += (iov == 2) ? aiov[1].iov_len : totlen;
 	alq->aq_freebytes += totlen;
 
 	/*
 	 * If we just flushed the buffer completely,
-	 * reset indexes to 0 to minimise buffer wraps
-	 * This is also required to ensure alq_getn() can't wedge itself
+	 * reset indexes to 0 to minimise buffer wraps.
+	 * This is also required to ensure alq_getn() can't wedge itself.
 	 */
-	if (alq->aq_freebytes == alq->aq_buflen)
+	if (!ALQ_HAS_PENDING_DATA(alq))
 		alq->aq_writehead = alq->aq_writetail = 0;
 
 	if (alq->doio_debugcallback != NULL)
@@ -400,7 +423,7 @@ alq_open(struct alq **alqp, const char *
 
 	vfslocked = NDHASGIANT(&nd);
 	NDFREE(&nd, NDF_ONLY_PNBUF);
-	/* We just unlock so we hold a reference */
+	/* We just unlock so we hold a reference. */
 	VOP_UNLOCK(nd.ni_vp, 0);
 	VFS_UNLOCK_GIANT(vfslocked);
 
@@ -411,12 +434,12 @@ alq_open(struct alq **alqp, const char *
 	mtx_init(&alq->aq_mtx, "ALD Queue", NULL, MTX_SPIN|MTX_QUIET);
 
 	if (count > 0) {
-		/* fixed length messages */
+		/* Fixed length messages. */
 		alq->aq_buflen = size * count;
 		alq->aq_entmax = count;
 		alq->aq_entlen = size;
 	} else {
-		/* variable length messages */
+		/* Variable length messages. */
 		alq->aq_buflen = size;
 		alq->aq_entmax = 0;
 		alq->aq_entlen = 0;
@@ -443,7 +466,7 @@ alq_open(struct alq **alqp, const char *
 int
 alq_write(struct alq *alq, void *data, int flags)
 {
-	/* should only be called in fixed length message (legacy) mode */
+	/* Should only be called in fixed length message (legacy) mode. */
 	KASSERT((alq->aq_entmax > 0 && alq->aq_entlen > 0),
 		("%s: fixed length write on variable length queue", __func__)
 	);
@@ -465,17 +488,17 @@ alq_writen(struct alq *alq, void *data, 
 	/*
 	 * If the message is larger than our underlying buffer or
 	 * there is not enough free space in our underlying buffer
-	 * to accept the message and the user can't wait, return
+	 * to accept the message and the user can't wait, return.
 	 */
 	if ((len > alq->aq_buflen) ||
-		((flags & ALQ_NOWAIT) && (alq->aq_freebytes < len))) {
+	    ((flags & ALQ_NOWAIT) && (alq->aq_freebytes < len))) {
 		ALQ_UNLOCK(alq);
 		return (EWOULDBLOCK);
 	}
 
 	/*
-	 * ALQ_WAITOK or alq->aq_freebytes > len,
-	 * either spin until we have enough free bytes (former) or skip (latter)
+	 * ALQ_WAITOK or alq->aq_freebytes > len, either spin until
+	 * we have enough free bytes (former) or skip (latter).
 	 */
 	while (alq->aq_freebytes < len && (alq->aq_flags & AQ_SHUTDOWN) == 0) {
 		alq->aq_flags |= AQ_WANTED;
@@ -483,35 +506,35 @@ alq_writen(struct alq *alq, void *data, 
 	}
 
 	/*
-	 * we need to serialise wakups to ensure records remain in order...
-	 * therefore, wakeup the next thread in the queue waiting for
-	 * alq resources to be available
+	 * We need to serialise wakeups to ensure records remain in order...
+	 * Therefore, wake up the next thread in the queue waiting for
+	 * alq resources to be available.
 	 * (technically this is only required if we actually entered the above
 	 * while loop)
 	 */
 	wakeup_one(alq);
 
-	/* bail if we're shutting down */
+	/* Bail if we're shutting down. */
 	if (alq->aq_flags & AQ_SHUTDOWN) {
-		ALQ_UNLOCK(alq);
+	    ALQ_UNLOCK(alq);
 		return (EWOULDBLOCK);
 	}
 
 	/*
-	 * if we need to wrap the buffer to accommodate the write,
-	 * we'll need 2 calls to bcopy
+	 * If we need to wrap the buffer to accommodate the write,
+	 * we'll need 2 calls to bcopy.
 	 */
 	if ((alq->aq_buflen - alq->aq_writehead) < len)
 		copy = alq->aq_buflen - alq->aq_writehead;
 
-	/* copy (part of) message to the buffer */
+	/* Copy (part of) message to the buffer. */
 	bcopy(data, alq->aq_entbuf + alq->aq_writehead, copy);
 	alq->aq_writehead += copy;
 
 	if (copy != len) {
 		/*
-		 * wrap the buffer by copying the remainder of our message
-		 * to the start of the buffer and resetting aq_writehead
+		 * Wrap the buffer by copying the remainder of our message
+		 * to the start of the buffer and resetting aq_writehead.
 		 */
 		bcopy(data, alq->aq_entbuf, len - copy);
 		alq->aq_writehead = len - copy;
@@ -539,7 +562,7 @@ alq_writen(struct alq *alq, void *data, 
 struct ale *
 alq_get(struct alq *alq, int flags)
 {
-	/* should only be called in fixed length message (legacy) mode */
+	/* Should only be called in fixed length message (legacy) mode. */
 	KASSERT((alq->aq_entmax > 0 && alq->aq_entlen > 0),
 		("%s: fixed length get on variable length queue", __func__)
 	);
@@ -582,18 +605,19 @@ alq_getn(struct alq *alq, int len, int f
 	/*
 	 * If the message is larger than our underlying buffer or
 	 * there is not enough free contiguous space in our underlying buffer
-	 * to accept the message and the user can't wait, return
+	 * to accept the message and the user can't wait, return.
 	 */
 	if ((len > alq->aq_buflen) ||
 		((flags & ALQ_NOWAIT) && (contigbytes < len))) {
 		ALQ_UNLOCK(alq);
+		free(ale, M_ALD);
 		return (NULL);
 	}
 
 	/*
 	 * ALQ_WAITOK or contigbytes >= len,
 	 * either spin until we have enough free contiguous bytes (former)
-	 * or skip (latter)
+	 * or skip (latter).
 	 */
 	while (contigbytes < len && (alq->aq_flags & AQ_SHUTDOWN) == 0) {
 		alq->aq_flags |= AQ_WANTED;
@@ -616,6 +640,7 @@ alq_getn(struct alq *alq, int len, int f
 	/* Bail if we're shutting down */
 	if (alq->aq_flags & AQ_SHUTDOWN) {
 		ALQ_UNLOCK(alq);
+		free(ale, M_ALD);
 		return (NULL);
 	}
 
@@ -628,7 +653,7 @@ alq_getn(struct alq *alq, int len, int f
 	alq->aq_writehead += len;
 	alq->aq_freebytes -= len;
 
-	/* Wrap aq_writehead if we've filled to the end of the buffer */
+	/* Wrap aq_writehead if we've filled to the end of the buffer. */
 	if (alq->aq_writehead == alq->aq_buflen)
 		alq->aq_writehead = 0;
 
@@ -665,12 +690,15 @@ alq_flush(struct alq *alq)
 
 	ALD_LOCK();
 	ALQ_LOCK(alq);
-	if (alq->aq_flags & AQ_ACTIVE) {
+
+	if (alq->aq_flags & AQ_ACTIVE)
 		ald_deactivate(alq);
-		ALD_UNLOCK();
+
+	ALD_UNLOCK();
+
+	if (ALQ_HAS_PENDING_DATA(alq))
 		needwakeup = alq_doio(alq);
-	} else
-		ALD_UNLOCK();
+
 	ALQ_UNLOCK(alq);
 
 	if (needwakeup)


More information about the svn-src-user mailing list