svn commit: r188891 - user/lstewart/alq_varlen_8.x/sys/kern
Lawrence Stewart
lstewart at FreeBSD.org
Fri Feb 20 20:42:19 PST 2009
Author: lstewart
Date: Sat Feb 21 04:42:19 2009
New Revision: 188891
URL: http://svn.freebsd.org/changeset/base/188891
Log:
Hopefully fixes the bugs I've been seeing. Also addresses some memory leaks and
minor style nits.
Modified:
user/lstewart/alq_varlen_8.x/sys/kern/kern_alq.c
Modified: user/lstewart/alq_varlen_8.x/sys/kern/kern_alq.c
==============================================================================
--- user/lstewart/alq_varlen_8.x/sys/kern/kern_alq.c Sat Feb 21 03:53:06 2009 (r188890)
+++ user/lstewart/alq_varlen_8.x/sys/kern/kern_alq.c Sat Feb 21 04:42:19 2009 (r188891)
@@ -77,6 +77,8 @@ struct alq {
#define ALQ_LOCK(alq) mtx_lock_spin(&(alq)->aq_mtx)
#define ALQ_UNLOCK(alq) mtx_unlock_spin(&(alq)->aq_mtx)
+#define ALQ_HAS_PENDING_DATA(alq) ((alq)->aq_freebytes != (alq)->aq_buflen)
+
static MALLOC_DEFINE(M_ALD, "ALD", "ALD");
/*
@@ -186,10 +188,11 @@ ald_daemon(void)
for (;;) {
while ((alq = LIST_FIRST(&ald_active)) == NULL
- && !ald_shutingdown)
+ && !ald_shutingdown)
mtx_sleep(&ald_active, &ald_mtx, PWAIT, "aldslp", 0);
- if (ald_shutingdown) {
+ /* Don't shut down until all active alqs are flushed. */
+ if (ald_shutingdown && alq != NULL) {
ALD_UNLOCK();
break;
}
@@ -213,8 +216,20 @@ ald_shutdown(void *arg, int howto)
struct alq *alq;
ALD_LOCK();
+
+ /* Ensure no new queues can be created */
ald_shutingdown = 1;
+ /* Shut down all alqs prior to terminating the ald_daemon. */
+ while ((alq = LIST_FIRST(&ald_queues)) != NULL) {
+ LIST_REMOVE(alq, aq_link);
+ ALD_UNLOCK();
+ alq_shutdown(alq);
+ ALD_LOCK();
+ }
+
+ /* At this point, all alqs are flushed and shut down. */
+
/*
* Wake ald_daemon so that it exits. It won't be able to do
* anything until we mtx_sleep because we hold the ald_mtx
@@ -224,12 +239,6 @@ ald_shutdown(void *arg, int howto)
/* Wait for ald_daemon to exit */
mtx_sleep(ald_proc, &ald_mtx, PWAIT, "aldslp", 0);
- while ((alq = LIST_FIRST(&ald_queues)) != NULL) {
- LIST_REMOVE(alq, aq_link);
- ALD_UNLOCK();
- alq_shutdown(alq);
- ALD_LOCK();
- }
ALD_UNLOCK();
}
@@ -241,15 +250,29 @@ alq_shutdown(struct alq *alq)
/* Stop any new writers. */
alq->aq_flags |= AQ_SHUTDOWN;
+ /*
+ * If the alq isn't active but has unwritten data (possible if
+ * the ALQ_NOACTIVATE flag has been used), explicitly activate the
+ * alq here so that the pending data gets flushed by the ald_daemon.
+ */
+ if (!(alq->aq_flags & AQ_ACTIVE) &&
+ ALQ_HAS_PENDING_DATA(alq)) {
+ alq->aq_flags |= AQ_ACTIVE;
+ ALQ_UNLOCK(alq);
+ ALD_LOCK();
+ ald_activate(alq);
+ ALD_UNLOCK();
+ ALQ_LOCK(alq);
+ }
+
/* Drain IO */
- while (alq->aq_flags & (AQ_FLUSHING|AQ_ACTIVE)) {
+ while (alq->aq_flags & AQ_ACTIVE) {
alq->aq_flags |= AQ_WANTED;
msleep_spin(alq, &alq->aq_mtx, "aldclose", 0);
}
ALQ_UNLOCK(alq);
- vn_close(alq->aq_vp, FWRITE, alq->aq_cred,
- curthread);
+ vn_close(alq->aq_vp, FWRITE, alq->aq_cred, curthread);
crfree(alq->aq_cred);
}
@@ -268,7 +291,7 @@ alq_doio(struct alq *alq)
int iov;
int vfslocked;
- KASSERT((alq->aq_freebytes != alq->aq_buflen),
+ KASSERT((ALQ_HAS_PENDING_DATA(alq)),
("%s: queue emtpy!", __func__)
);
@@ -280,16 +303,16 @@ alq_doio(struct alq *alq)
bzero(&aiov, sizeof(aiov));
bzero(&auio, sizeof(auio));
- /* start the write from the location of our buffer tail pointer */
+ /* Start the write from the location of our buffer tail pointer. */
aiov[iov].iov_base = alq->aq_entbuf + alq->aq_writetail;
if (alq->aq_writetail < alq->aq_writehead) {
- /* buffer not wrapped */
+ /* Buffer not wrapped */
totlen = aiov[iov].iov_len = alq->aq_writehead -
alq->aq_writetail;
} else {
/*
- * buffer wrapped, requires 2 aiov entries:
+ * Buffer wrapped, requires 2 aiov entries:
* - first is from writetail to end of buffer
* - second is from start of buffer to writehead
*/
@@ -336,16 +359,16 @@ alq_doio(struct alq *alq)
ALQ_LOCK(alq);
alq->aq_flags &= ~AQ_FLUSHING;
- /* Adjust writetail as required, taking into account wrapping */
+ /* Adjust writetail as required, taking into account wrapping. */
alq->aq_writetail += (iov == 2) ? aiov[1].iov_len : totlen;
alq->aq_freebytes += totlen;
/*
* If we just flushed the buffer completely,
- * reset indexes to 0 to minimise buffer wraps
- * This is also required to ensure alq_getn() can't wedge itself
+ * reset indexes to 0 to minimise buffer wraps.
+ * This is also required to ensure alq_getn() can't wedge itself.
*/
- if (alq->aq_freebytes == alq->aq_buflen)
+ if (!ALQ_HAS_PENDING_DATA(alq))
alq->aq_writehead = alq->aq_writetail = 0;
if (alq->doio_debugcallback != NULL)
@@ -400,7 +423,7 @@ alq_open(struct alq **alqp, const char *
vfslocked = NDHASGIANT(&nd);
NDFREE(&nd, NDF_ONLY_PNBUF);
- /* We just unlock so we hold a reference */
+ /* We just unlock so we hold a reference. */
VOP_UNLOCK(nd.ni_vp, 0);
VFS_UNLOCK_GIANT(vfslocked);
@@ -411,12 +434,12 @@ alq_open(struct alq **alqp, const char *
mtx_init(&alq->aq_mtx, "ALD Queue", NULL, MTX_SPIN|MTX_QUIET);
if (count > 0) {
- /* fixed length messages */
+ /* Fixed length messages. */
alq->aq_buflen = size * count;
alq->aq_entmax = count;
alq->aq_entlen = size;
} else {
- /* variable length messages */
+ /* Variable length messages. */
alq->aq_buflen = size;
alq->aq_entmax = 0;
alq->aq_entlen = 0;
@@ -443,7 +466,7 @@ alq_open(struct alq **alqp, const char *
int
alq_write(struct alq *alq, void *data, int flags)
{
- /* should only be called in fixed length message (legacy) mode */
+ /* Should only be called in fixed length message (legacy) mode. */
KASSERT((alq->aq_entmax > 0 && alq->aq_entlen > 0),
("%s: fixed length write on variable length queue", __func__)
);
@@ -465,17 +488,17 @@ alq_writen(struct alq *alq, void *data,
/*
* If the message is larger than our underlying buffer or
* there is not enough free space in our underlying buffer
- * to accept the message and the user can't wait, return
+ * to accept the message and the user can't wait, return.
*/
if ((len > alq->aq_buflen) ||
- ((flags & ALQ_NOWAIT) && (alq->aq_freebytes < len))) {
+ ((flags & ALQ_NOWAIT) && (alq->aq_freebytes < len))) {
ALQ_UNLOCK(alq);
return (EWOULDBLOCK);
}
/*
- * ALQ_WAITOK or alq->aq_freebytes > len,
- * either spin until we have enough free bytes (former) or skip (latter)
+ * ALQ_WAITOK or alq->aq_freebytes > len, either spin until
+ * we have enough free bytes (former) or skip (latter).
*/
while (alq->aq_freebytes < len && (alq->aq_flags & AQ_SHUTDOWN) == 0) {
alq->aq_flags |= AQ_WANTED;
@@ -483,35 +506,35 @@ alq_writen(struct alq *alq, void *data,
}
/*
- * we need to serialise wakups to ensure records remain in order...
- * therefore, wakeup the next thread in the queue waiting for
- * alq resources to be available
+ * We need to serialise wakeups to ensure records remain in order...
+ * Therefore, wake up the next thread in the queue waiting for
+ * alq resources to be available.
* (technically this is only required if we actually entered the above
* while loop)
*/
wakeup_one(alq);
- /* bail if we're shutting down */
+ /* Bail if we're shutting down. */
if (alq->aq_flags & AQ_SHUTDOWN) {
- ALQ_UNLOCK(alq);
+ ALQ_UNLOCK(alq);
return (EWOULDBLOCK);
}
/*
- * if we need to wrap the buffer to accommodate the write,
- * we'll need 2 calls to bcopy
+ * If we need to wrap the buffer to accommodate the write,
+ * we'll need 2 calls to bcopy.
*/
if ((alq->aq_buflen - alq->aq_writehead) < len)
copy = alq->aq_buflen - alq->aq_writehead;
- /* copy (part of) message to the buffer */
+ /* Copy (part of) message to the buffer. */
bcopy(data, alq->aq_entbuf + alq->aq_writehead, copy);
alq->aq_writehead += copy;
if (copy != len) {
/*
- * wrap the buffer by copying the remainder of our message
- * to the start of the buffer and resetting aq_writehead
+ * Wrap the buffer by copying the remainder of our message
+ * to the start of the buffer and resetting aq_writehead.
*/
bcopy(data, alq->aq_entbuf, len - copy);
alq->aq_writehead = len - copy;
@@ -539,7 +562,7 @@ alq_writen(struct alq *alq, void *data,
struct ale *
alq_get(struct alq *alq, int flags)
{
- /* should only be called in fixed length message (legacy) mode */
+ /* Should only be called in fixed length message (legacy) mode. */
KASSERT((alq->aq_entmax > 0 && alq->aq_entlen > 0),
("%s: fixed length get on variable length queue", __func__)
);
@@ -582,18 +605,19 @@ alq_getn(struct alq *alq, int len, int f
/*
* If the message is larger than our underlying buffer or
* there is not enough free contiguous space in our underlying buffer
- * to accept the message and the user can't wait, return
+ * to accept the message and the user can't wait, return.
*/
if ((len > alq->aq_buflen) ||
((flags & ALQ_NOWAIT) && (contigbytes < len))) {
ALQ_UNLOCK(alq);
+ free(ale, M_ALD);
return (NULL);
}
/*
* ALQ_WAITOK or contigbytes >= len,
* either spin until we have enough free contiguous bytes (former)
- * or skip (latter)
+ * or skip (latter).
*/
while (contigbytes < len && (alq->aq_flags & AQ_SHUTDOWN) == 0) {
alq->aq_flags |= AQ_WANTED;
@@ -616,6 +640,7 @@ alq_getn(struct alq *alq, int len, int f
/* Bail if we're shutting down */
if (alq->aq_flags & AQ_SHUTDOWN) {
ALQ_UNLOCK(alq);
+ free(ale, M_ALD);
return (NULL);
}
@@ -628,7 +653,7 @@ alq_getn(struct alq *alq, int len, int f
alq->aq_writehead += len;
alq->aq_freebytes -= len;
- /* Wrap aq_writehead if we've filled to the end of the buffer */
+ /* Wrap aq_writehead if we've filled to the end of the buffer. */
if (alq->aq_writehead == alq->aq_buflen)
alq->aq_writehead = 0;
@@ -665,12 +690,15 @@ alq_flush(struct alq *alq)
ALD_LOCK();
ALQ_LOCK(alq);
- if (alq->aq_flags & AQ_ACTIVE) {
+
+ if (alq->aq_flags & AQ_ACTIVE)
ald_deactivate(alq);
- ALD_UNLOCK();
+
+ ALD_UNLOCK();
+
+ if (ALQ_HAS_PENDING_DATA(alq))
needwakeup = alq_doio(alq);
- } else
- ALD_UNLOCK();
+
ALQ_UNLOCK(alq);
if (needwakeup)
More information about the svn-src-user
mailing list