svn commit: r205237 - in user/lstewart/alq_varlen_head/sys: kern sys
Lawrence Stewart
lstewart at FreeBSD.org
Wed Mar 17 03:02:49 UTC 2010
Author: lstewart
Date: Wed Mar 17 03:02:48 2010
New Revision: 205237
URL: http://svn.freebsd.org/changeset/base/205237
Log:
- Rework the way thread ordering is enforced so that it actually behaves as
expected (issue discovered during detailed testing). Ordering is now an
off-by-default option that can be enabled at ALQ creation time using the
ALQ_ORDERED flag.
- Add an alq_open_flags() KPI call to allow the new ALQ_ORDERED flag to be
specified. alq_open() is now implemented as a wrapper around alq_open_flags().
- Rename alq_postn() to alq_post_flags() to keep the naming consistent.
- Keep a record of some useful debugging printf() calls, commented out (will be
removed in a later diff).
- Remove some no longer relevant assertions.
- Introduce the AQ_VARLEN flag, used internally to indicate the ALQ is variable
length message capable.
- Protect alq_getn()/alq_post() from zero-length writes so that the "use less
than you asked for" feature works in contexts where no data may be generated.
Sponsored by: FreeBSD Foundation
Modified:
user/lstewart/alq_varlen_head/sys/kern/kern_alq.c
user/lstewart/alq_varlen_head/sys/sys/alq.h
Modified: user/lstewart/alq_varlen_head/sys/kern/kern_alq.c
==============================================================================
--- user/lstewart/alq_varlen_head/sys/kern/kern_alq.c Wed Mar 17 02:48:14 2010 (r205236)
+++ user/lstewart/alq_varlen_head/sys/kern/kern_alq.c Wed Mar 17 03:02:48 2010 (r205237)
@@ -54,15 +54,16 @@ __FBSDID("$FreeBSD$");
/* Async. Logging Queue */
struct alq {
+ char *aq_entbuf; /* Buffer for stored entries */
int aq_entmax; /* Max entries */
int aq_entlen; /* Entry length */
int aq_freebytes; /* Bytes available in buffer */
int aq_buflen; /* Total length of our buffer */
- char *aq_entbuf; /* Buffer for stored entries */
int aq_writehead; /* Location for next write */
int aq_writetail; /* Flush starts at this location */
int aq_wrapearly; /* # bytes left blank at end of buf */
int aq_flags; /* Queue flags */
+ int aq_waiters; /* Num threads waiting for resources */
struct ale aq_getpost; /* ALE for use by get/post */
struct mtx aq_mtx; /* Queue lock */
struct vnode *aq_vp; /* Open vnode handle */
@@ -75,6 +76,8 @@ struct alq {
#define AQ_ACTIVE 0x0002 /* on the active list */
#define AQ_FLUSHING 0x0004 /* doing IO */
#define AQ_SHUTDOWN 0x0008 /* Queue no longer valid */
+#define AQ_ORDERED 0x0010 /* Queue enforces ordered writes */
+#define AQ_VARLEN 0x0020 /* Queue is variable length capable */
#define ALQ_LOCK(alq) mtx_lock_spin(&(alq)->aq_mtx)
#define ALQ_UNLOCK(alq) mtx_unlock_spin(&(alq)->aq_mtx)
@@ -200,7 +203,7 @@ ald_daemon(void)
needwakeup = alq_doio(alq);
ALQ_UNLOCK(alq);
if (needwakeup)
- wakeup_one(alq);
+ wakeup(alq);
ALD_LOCK();
}
@@ -334,6 +337,9 @@ alq_doio(struct alq *alq)
totlen = aiov[0].iov_len + aiov[1].iov_len;
}
+ /*printf("Flushing %d bytes to disk, aq_freebytes==%d\n", totlen,
+ alq->aq_freebytes);*/
+
alq->aq_flags |= AQ_FLUSHING;
ALQ_UNLOCK(alq);
@@ -366,6 +372,9 @@ alq_doio(struct alq *alq)
alq->aq_buflen;
alq->aq_freebytes += totlen + wrapearly;
+ /*printf("Flushed %d bytes to disk, aq_freebytes==%d, AQ_WANTED==%d\n",
+ totlen, alq->aq_freebytes, alq->aq_flags & AQ_WANTED);*/
+
/*
* If we just flushed part of the buffer which wrapped, reset the
* wrapearly indicator.
@@ -374,8 +383,8 @@ alq_doio(struct alq *alq)
alq->aq_wrapearly = 0;
/*
- * If we just flushed the buffer completely,
- * reset indexes to 0 to minimise buffer wraps.
+ * If we just flushed the buffer completely, reset indexes to 0 to
+ * minimise buffer wraps.
* This is also required to ensure alq_getn() can't wedge itself.
*/
if (!HAS_PENDING_DATA(alq))
@@ -407,14 +416,15 @@ SYSINIT(ald, SI_SUB_LOCK, SI_ORDER_ANY,
/*
* Create the queue data structure, allocate the buffer, and open the file.
*/
+
int
-alq_open(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
- int size, int count)
+alq_open_flags(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
+ int size, int count, int flags)
{
struct thread *td;
struct nameidata nd;
struct alq *alq;
- int flags;
+ int oflags;
int error;
int vfslocked;
@@ -425,9 +435,9 @@ alq_open(struct alq **alqp, const char *
td = curthread;
NDINIT(&nd, LOOKUP, NOFOLLOW | MPSAFE, UIO_SYSSPACE, file, td);
- flags = FWRITE | O_NOFOLLOW | O_CREAT;
+ oflags = FWRITE | O_NOFOLLOW | O_CREAT;
- error = vn_open_cred(&nd, &flags, cmode, 0, cred, NULL);
+ error = vn_open_cred(&nd, &oflags, cmode, 0, cred, NULL);
if (error)
return (error);
@@ -453,11 +463,14 @@ alq_open(struct alq **alqp, const char *
alq->aq_buflen = size;
alq->aq_entmax = 0;
alq->aq_entlen = 0;
+ alq->aq_flags |= AQ_VARLEN;
}
alq->aq_freebytes = alq->aq_buflen;
alq->aq_entbuf = malloc(alq->aq_buflen, M_ALD, M_WAITOK|M_ZERO);
alq->aq_writehead = alq->aq_writetail = 0;
+ if (flags & ALQ_ORDERED)
+ alq->aq_flags |= AQ_ORDERED;
if ((error = ald_add(alq)) != 0) {
alq_destroy(alq);
@@ -503,27 +516,47 @@ alq_writen(struct alq *alq, void *data,
}
/*
+ * If we want ordered writes and there are threads already waiting for
+ * resources to become available, spin until we're woken.
+ */
+ if (alq->aq_flags & AQ_ORDERED && alq->aq_waiters > 0) {
+ /*printf("tid %d order sleep, wants %d bytes (%d avail)\n",
+ curthread->td_tid, len, alq->aq_freebytes);*/
+ alq->aq_waiters++;
+ msleep_spin(&alq->aq_waiters, &alq->aq_mtx, "alqwriten", 0);
+ alq->aq_waiters--;
+ /*printf("tid %d order woken, wants %d bytes (%d avail)\n",
+ curthread->td_tid, len, alq->aq_freebytes);*/
+ }
+
+ /*
* ALQ_WAITOK or alq->aq_freebytes > len, either spin until
- * we have enough free bytes (former) or skip (latter). However in the
- * latter case, we can't skip if other threads are already
- * waiting (AQ_WANTED is set), otherwise records can get out of order.
+ * we have enough free bytes (former) or skip (latter).
*/
- while ((alq->aq_freebytes < len && !(alq->aq_flags & AQ_SHUTDOWN))
- || alq->aq_flags & AQ_WANTED) {
+ while (alq->aq_freebytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) {
+ /*printf("tid %d sleep, wants %d bytes (%d avail)\n",
+ curthread->td_tid, len, alq->aq_freebytes);*/
alq->aq_flags |= AQ_WANTED;
+ alq->aq_waiters++;
msleep_spin(alq, &alq->aq_mtx, "alqwriten", 0);
- KASSERT(!(alq->aq_flags & AQ_WANTED),
- ("AQ_WANTED should have been unset!"));
+ alq->aq_waiters--;
+ /*printf("tid %d woken, wants %d bytes (%d avail)\n",
+ curthread->td_tid, len, alq->aq_freebytes);*/
}
+ /*printf("tid %d got %d bytes (%d avail, %d waiters)\n",
+ curthread->td_tid, len, alq->aq_freebytes, alq->aq_waiters);*/
+
/*
- * We need to serialise wakeups to ensure records remain in order.
- * Therefore, wakeup the next thread in the queue waiting for
- * ALQ resources to be available.
- * (Technically this is only required if we actually entered the above
- * while loop.)
+ * If there are waiters, wakeup the next thread in the queue waiting for
+ * ALQ resources.
*/
- wakeup_one(alq);
+ if (alq->aq_waiters > 0) {
+ if (alq->aq_flags & AQ_ORDERED)
+ wakeup_one(&alq->aq_waiters);
+ else
+ wakeup(alq);
+ }
/* Bail if we're shutting down. */
if (alq->aq_flags & AQ_SHUTDOWN) {
@@ -569,6 +602,8 @@ alq_writen(struct alq *alq, void *data,
activate = 1;
}
+ KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue emtpy!", __func__));
+
ALQ_UNLOCK(alq);
if (activate) {
@@ -584,7 +619,7 @@ int
alq_write(struct alq *alq, void *data, int flags)
{
/* Should only be called in fixed length message (legacy) mode. */
- KASSERT((alq->aq_entmax > 0 && alq->aq_entlen > 0),
+ KASSERT((!(alq->aq_flags & AQ_VARLEN)),
("%s: fixed length write on variable length queue", __func__));
return (alq_writen(alq, data, alq->aq_entlen, flags));
}
@@ -651,19 +686,31 @@ alq_getn(struct alq *alq, int len, int f
}
/*
+ * If we want ordered writes and there are threads already waiting for
+ * resources to become available, spin until we're woken.
+ */
+ if (alq->aq_flags & AQ_ORDERED && alq->aq_waiters > 0) {
+ /*printf("tid %d order sleep, wants %d bytes (%d avail)\n",
+ curthread->td_tid, len, alq->aq_freebytes);*/
+ alq->aq_waiters++;
+ msleep_spin(&alq->aq_waiters, &alq->aq_mtx, "alqgetn", 0);
+ alq->aq_waiters--;
+ /*printf("tid %d order woken, wants %d bytes (%d avail)\n",
+ curthread->td_tid, len, alq->aq_freebytes);*/
+ }
+
+ /*
* ALQ_WAITOK or contigbytes >= len,
* either spin until we have enough free contiguous bytes (former)
* or skip (latter). However, in the latter case, we can't skip if
* other threads are already waiting (AQ_WANTED is set), otherwise
* records can get out of order.
*/
- while ((contigbytes < len && !(alq->aq_flags & AQ_SHUTDOWN))
- || alq->aq_flags & AQ_WANTED) {
+ while (contigbytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) {
alq->aq_flags |= AQ_WANTED;
+ alq->aq_waiters++;
msleep_spin(alq, &alq->aq_mtx, "alqgetn", 0);
-
- KASSERT(!(alq->aq_flags & AQ_WANTED),
- ("AQ_WANTED should have been unset!"));
+ alq->aq_waiters--;
if (alq->aq_writehead <= alq->aq_writetail)
contigbytes = alq->aq_freebytes;
@@ -672,13 +719,15 @@ alq_getn(struct alq *alq, int len, int f
}
/*
- * We need to serialise wakeups to ensure records remain in order.
- * Therefore, wakeup the next thread in the queue waiting for
- * ALQ resources to be available.
- * (Technically this is only required if we actually entered the above
- * while loop.)
+ * If there are waiters, wakeup the next thread in the queue waiting for
+ * ALQ resources.
*/
- wakeup_one(alq);
+ if (alq->aq_waiters > 0) {
+ if (alq->aq_flags & AQ_ORDERED)
+ wakeup_one(&alq->aq_waiters);
+ else
+ wakeup(alq);
+ }
/* Bail if we're shutting down. */
if (alq->aq_flags & AQ_SHUTDOWN) {
@@ -700,32 +749,39 @@ struct ale *
alq_get(struct alq *alq, int flags)
{
/* Should only be called in fixed length message (legacy) mode. */
- KASSERT((alq->aq_entmax > 0 && alq->aq_entlen > 0),
+ KASSERT((!(alq->aq_flags & AQ_VARLEN)),
("%s: fixed length get on variable length queue", __func__));
return (alq_getn(alq, alq->aq_entlen, flags));
}
void
-alq_postn(struct alq *alq, struct ale *ale, int flags)
+alq_post_flags(struct alq *alq, struct ale *ale, int flags)
{
int activate;
activate = 0;
- if (!(alq->aq_flags & AQ_ACTIVE) && !(flags & ALQ_NOACTIVATE)) {
- alq->aq_flags |= AQ_ACTIVE;
- activate = 1;
- }
+ if (ale->ae_bytesused > 0) {
+ if (!(alq->aq_flags & AQ_ACTIVE) &&
+ !(flags & ALQ_NOACTIVATE)) {
+ alq->aq_flags |= AQ_ACTIVE;
+ activate = 1;
+ }
- alq->aq_writehead += ale->ae_bytesused;
- alq->aq_freebytes -= ale->ae_bytesused;
+ alq->aq_writehead += ale->ae_bytesused;
+ alq->aq_freebytes -= ale->ae_bytesused;
- /* Wrap aq_writehead if we've filled to the end of the buffer. */
- if (alq->aq_writehead == alq->aq_buflen)
- alq->aq_writehead = 0;
+ /* Wrap aq_writehead if we filled to the end of the buffer. */
+ if (alq->aq_writehead == alq->aq_buflen)
+ alq->aq_writehead = 0;
+
+ KASSERT((alq->aq_writehead >= 0 &&
+ alq->aq_writehead < alq->aq_buflen),
+ ("%s: aq_writehead < 0 || aq_writehead >= aq_buflen",
+ __func__));
- KASSERT((alq->aq_writehead >= 0 && alq->aq_writehead < alq->aq_buflen),
- ("%s: aq_writehead < 0 || aq_writehead >= aq_buflen", __func__));
+ KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue emtpy!", __func__));
+ }
ALQ_UNLOCK(alq);
@@ -761,7 +817,7 @@ alq_flush(struct alq *alq)
ALQ_UNLOCK(alq);
if (needwakeup)
- wakeup_one(alq);
+ wakeup(alq);
}
/*
Modified: user/lstewart/alq_varlen_head/sys/sys/alq.h
==============================================================================
--- user/lstewart/alq_varlen_head/sys/sys/alq.h Wed Mar 17 02:48:14 2010 (r205236)
+++ user/lstewart/alq_varlen_head/sys/sys/alq.h Wed Mar 17 03:02:48 2010 (r205237)
@@ -56,6 +56,7 @@ struct ale {
#define ALQ_NOWAIT 0x0001
#define ALQ_WAITOK 0x0002
#define ALQ_NOACTIVATE 0x0004
+#define ALQ_ORDERED 0x0010
/* Suggested mode for file creation. */
#define ALQ_DEFAULT_CMODE 0600
@@ -77,8 +78,15 @@ struct ale {
* error from open or 0 on success
*/
struct ucred;
-int alq_open(struct alq **, const char *file, struct ucred *cred, int cmode,
- int size, int count);
+int alq_open_flags(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
+ int size, int count, int flags);
+
+static __inline int
+alq_open(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
+ int size, int count)
+{
+ return alq_open_flags(alqp, file, cred, cmode, size, count, 0);
+}
/*
* alq_writen: Write data into the queue
@@ -133,12 +141,12 @@ struct ale *alq_get(struct alq *alq, int
* ale An asynch logging entry returned by alq_get.
* flags ALQ_NOACTIVATE
*/
-void alq_postn(struct alq *alq, struct ale *ale, int flags);
+void alq_post_flags(struct alq *alq, struct ale *ale, int flags);
static __inline void
alq_post(struct alq *alq, struct ale *ale)
{
- alq_postn(alq, ale, 0);
+ alq_post_flags(alq, ale, 0);
}
#endif /* _SYS_ALQ_H_ */
More information about the svn-src-user
mailing list