svn commit: r214767 - in user/davidxu/libthr/sys: kern sys
David Xu
davidxu at FreeBSD.org
Thu Nov 4 02:03:27 UTC 2010
Author: davidxu
Date: Thu Nov 4 02:03:26 2010
New Revision: 214767
URL: http://svn.freebsd.org/changeset/base/214767
Log:
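To avoid the thundering herd problem on pthread_cond_broadcast, implement wait
queue migration: waiters are moved from the condition variable's sleep queue to the bound mutex's sleep queue instead of all being woken at once.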
Modified:
user/davidxu/libthr/sys/kern/kern_umtx.c
user/davidxu/libthr/sys/sys/umtx.h
Modified: user/davidxu/libthr/sys/kern/kern_umtx.c
==============================================================================
--- user/davidxu/libthr/sys/kern/kern_umtx.c Wed Nov 3 23:29:52 2010 (r214766)
+++ user/davidxu/libthr/sys/kern/kern_umtx.c Thu Nov 4 02:03:26 2010 (r214767)
@@ -91,6 +91,7 @@ struct umtx_key {
uintptr_t b;
} both;
} info;
+ struct umtxq_chain * volatile chain;
};
/* Priority inheritance mutex info. */
@@ -150,6 +151,8 @@ struct umtx_q {
/* The queue we on */
struct umtxq_queue *uq_cur_queue;
+
+ int uq_repair_mutex;
};
TAILQ_HEAD(umtxq_head, umtx_q);
@@ -160,6 +163,10 @@ struct umtxq_queue {
struct umtx_key key;
LIST_ENTRY(umtxq_queue) link;
int length;
+
+ int binding;
+ struct umutex *bind_mutex;
+ struct umtx_key bind_mkey;
};
LIST_HEAD(umtxq_list, umtxq_queue);
@@ -177,7 +184,7 @@ struct umtxq_chain {
LIST_HEAD(, umtxq_queue) uc_spare_queue;
/* Busy flag */
- char uc_busy;
+ volatile char uc_busy;
/* Chain lock waiters */
int uc_waiters;
@@ -220,10 +227,13 @@ static uma_zone_t umtx_pi_zone;
static struct umtxq_chain umtxq_chains[2][UMTX_CHAINS];
static MALLOC_DEFINE(M_UMTX, "umtx", "UMTX queue memory");
static int umtx_pi_allocated;
+static int umtx_cv_migrated;
SYSCTL_NODE(_debug, OID_AUTO, umtx, CTLFLAG_RW, 0, "umtx debug");
SYSCTL_INT(_debug_umtx, OID_AUTO, umtx_pi_allocated, CTLFLAG_RD,
&umtx_pi_allocated, 0, "Allocated umtx_pi");
+SYSCTL_INT(_debug_umtx, OID_AUTO, umtx_cv_migrated, CTLFLAG_RD,
+ &umtx_cv_migrated, 0, "Thread migrated");
static void umtxq_sysinit(void *);
static void umtxq_hash(struct umtx_key *key);
@@ -232,7 +242,9 @@ static void umtxq_lock(struct umtx_key *
static void umtxq_unlock(struct umtx_key *key);
static void umtxq_busy(struct umtx_key *key);
static void umtxq_unbusy(struct umtx_key *key);
-static void umtxq_insert_queue(struct umtx_q *uq, int q);
+static void umtxq_insert_queue(struct umtx_q *, int);
+static int umtxq_insert_queue2(struct umtx_q *, int, struct umutex *,
+ const struct umtx_key *);
static void umtxq_remove_queue(struct umtx_q *uq, int q);
static int umtxq_sleep(struct umtx_q *uq, const char *wmesg, int timo);
static int umtxq_count(struct umtx_key *key);
@@ -315,14 +327,30 @@ umtx_key_match(const struct umtx_key *k1
k1->info.both.b == k2->info.both.b);
}
+static inline void
+umtx_key_copy(struct umtx_key *k1, const struct umtx_key *k2)
+{
+ k1->hash = k2->hash;
+ k1->type = k2->type;
+ k1->shared = k2->shared;
+ k1->info.both = k2->info.both;
+ k1->chain = k2->chain;
+}
+
static inline struct umtxq_chain *
-umtxq_getchain(struct umtx_key *key)
+umtxq_calcchain(struct umtx_key *key)
{
if (key->type <= TYPE_SEM)
return (&umtxq_chains[1][key->hash]);
return (&umtxq_chains[0][key->hash]);
}
+static inline struct umtxq_chain *
+umtxq_getchain(struct umtx_key *key)
+{
+ return (key->chain);
+}
+
/*
* Lock a chain.
*/
@@ -331,8 +359,14 @@ umtxq_lock(struct umtx_key *key)
{
struct umtxq_chain *uc;
- uc = umtxq_getchain(key);
- mtx_lock(&uc->uc_lock);
+ for (;;) {
+ uc = key->chain;
+ mtx_lock(&uc->uc_lock);
+ if (key->chain != uc)
+ mtx_unlock(&uc->uc_lock);
+ else
+ break;
+ }
}
/*
@@ -341,10 +375,7 @@ umtxq_lock(struct umtx_key *key)
static inline void
umtxq_unlock(struct umtx_key *key)
{
- struct umtxq_chain *uc;
-
- uc = umtxq_getchain(key);
- mtx_unlock(&uc->uc_lock);
+ mtx_unlock(&key->chain->uc_lock);
}
/*
@@ -364,8 +395,10 @@ umtxq_busy(struct umtx_key *key)
int count = BUSY_SPINS;
if (count > 0) {
umtxq_unlock(key);
- while (uc->uc_busy && --count > 0)
+ while (uc->uc_busy && --count > 0) {
cpu_spinwait();
+ uc = key->chain;
+ }
umtxq_lock(key);
}
}
@@ -374,6 +407,9 @@ umtxq_busy(struct umtx_key *key)
uc->uc_waiters++;
msleep(uc, &uc->uc_lock, 0, "umtxqb", 0);
uc->uc_waiters--;
+ mtx_unlock(&uc->uc_lock);
+ umtxq_lock(key);
+ uc = umtxq_getchain(key);
}
}
uc->uc_busy = 1;
@@ -414,19 +450,46 @@ umtxq_queue_lookup(struct umtx_key *key,
static inline void
umtxq_insert_queue(struct umtx_q *uq, int q)
{
+ int error;
+
+ error = umtxq_insert_queue2(uq, q, NULL, NULL);
+ MPASS(error == 0);
+}
+
+static inline int
+umtxq_insert_queue2(struct umtx_q *uq, int q, struct umutex *m,
+ const struct umtx_key *mkey)
+{
struct umtxq_queue *uh;
struct umtxq_chain *uc;
uc = umtxq_getchain(&uq->uq_key);
UMTXQ_LOCKED_ASSERT(uc);
- KASSERT((uq->uq_flags & UQF_UMTXQ) == 0, ("umtx_q is already on queue"));
+ KASSERT((uq->uq_flags & UQF_UMTXQ) == 0,
+ ("umtx_q is already on queue"));
uh = umtxq_queue_lookup(&uq->uq_key, q);
if (uh != NULL) {
+ if (uh->binding) {
+ if (mkey == NULL ||
+ !umtx_key_match(&uh->bind_mkey, mkey))
+ return (EEXIST);
+ } else {
+ if (mkey != NULL)
+ return (EEXIST);
+ }
LIST_INSERT_HEAD(&uc->uc_spare_queue, uq->uq_spare_queue, link);
} else {
uh = uq->uq_spare_queue;
uh->key = uq->uq_key;
LIST_INSERT_HEAD(&uc->uc_queue[q], uh, link);
+ uh->bind_mutex = m;
+ uh->length = 0;
+ if (mkey != NULL) {
+ uh->binding = 1;
+ uh->bind_mkey = *mkey;
+ } else {
+ uh->binding = 0;
+ }
}
uq->uq_spare_queue = NULL;
@@ -434,7 +497,7 @@ umtxq_insert_queue(struct umtx_q *uq, in
uh->length++;
uq->uq_flags |= UQF_UMTXQ;
uq->uq_cur_queue = uh;
- return;
+ return (0);
}
static inline void
@@ -458,6 +521,8 @@ umtxq_remove_queue(struct umtx_q *uq, in
uh = LIST_FIRST(&uc->uc_spare_queue);
KASSERT(uh != NULL, ("uc_spare_queue is empty"));
LIST_REMOVE(uh, link);
+ uh->bind_mutex = NULL;
+ uh->binding = 0;
}
uq->uq_spare_queue = uh;
uq->uq_cur_queue = NULL;
@@ -558,9 +623,10 @@ umtxq_sleep(struct umtx_q *uq, const cha
UMTXQ_LOCKED_ASSERT(uc);
if (!(uq->uq_flags & UQF_UMTXQ))
return (0);
- error = msleep(uq, &uc->uc_lock, PCATCH, wmesg, timo);
+ error = msleep(uq, &uc->uc_lock, PCATCH|PDROP, wmesg, timo);
if (error == EWOULDBLOCK)
error = ETIMEDOUT;
+ umtxq_lock(&uq->uq_key);
return (error);
}
@@ -578,6 +644,7 @@ umtx_key_get(void *addr, int type, int s
boolean_t wired;
key->type = type;
+ key->chain = NULL;
if (share == THREAD_SHARE) {
key->shared = 0;
key->info.private.vs = td->td_proc->p_vmspace;
@@ -607,6 +674,7 @@ umtx_key_get(void *addr, int type, int s
}
umtxq_hash(key);
+ key->chain = umtxq_calcchain(key);
return (0);
}
@@ -1209,7 +1277,11 @@ _do_lock_normal(struct thread *td, struc
umtxq_unbusy(&uq->uq_key);
if (old == owner)
error = umtxq_sleep(uq, "umtxn", timo);
- umtxq_remove(uq);
+ if ((uq->uq_flags & UQF_UMTXQ) != 0) {
+ umtxq_busy(&uq->uq_key);
+ umtxq_remove(uq);
+ umtxq_unbusy(&uq->uq_key);
+ }
umtxq_unlock(&uq->uq_key);
umtx_key_release(&uq->uq_key);
}
@@ -1618,12 +1690,14 @@ umtxq_sleep_pi(struct umtx_q *uq, struct
mtx_unlock_spin(&umtx_lock);
umtxq_unbusy(&uq->uq_key);
- if (uq->uq_flags & UQF_UMTXQ) {
+ if ((uq->uq_flags & UQF_UMTXQ) != 0) {
error = msleep(uq, &uc->uc_lock, PCATCH, wmesg, timo);
if (error == EWOULDBLOCK)
error = ETIMEDOUT;
- if (uq->uq_flags & UQF_UMTXQ) {
+ if ((uq->uq_flags & UQF_UMTXQ) != 0) {
+ umtxq_busy(&uq->uq_key);
umtxq_remove(uq);
+ umtxq_unbusy(&uq->uq_key);
}
}
mtx_lock_spin(&umtx_lock);
@@ -2342,30 +2416,104 @@ do_unlock_umutex(struct thread *td, stru
}
static int
+set_contested_bit(struct umtx_key *mkey, struct umutex *m,
+ struct umtxq_queue *uhm, int repair)
+{
+ int do_wake;
+ int qlen = uhm->length;
+ uint32_t owner;
+
+ do_wake = 0;
+ /*
+ * Set contested bit for mutex when necessary, so that userland
+ * mutex unlocker will wake up a waiter thread.
+ */
+ owner = fuword32(__DEVOLATILE(uint32_t *, &m->m_owner));
+ for (;;) {
+ if (owner == UMUTEX_UNOWNED) {
+ if (!repair && qlen == 1) {
+ do_wake = 1;
+ break;
+ }
+ if ((owner = casuword32(&m->m_owner, UMUTEX_UNOWNED,
+ UMUTEX_CONTESTED)) == UMUTEX_UNOWNED) {
+ do_wake = 1;
+ break;
+ }
+ }
+ if (owner == UMUTEX_CONTESTED) {
+ do_wake = 1;
+ break;
+ }
+ if ((owner & UMUTEX_CONTESTED) == 0) {
+ uint32_t old;
+ old = casuword32(&m->m_owner, owner,
+ owner|UMUTEX_CONTESTED);
+ if (old == owner)
+ break;
+ owner = old;
+ } else {
+ break;
+ }
+ }
+ return (do_wake);
+}
+
+static int
do_cv_wait(struct thread *td, struct ucond *cv, struct umutex *m,
struct timespec *timeout, u_long wflags)
{
struct umtx_q *uq;
+ struct umtx_key mkey, *mkeyp;
+ struct umutex *bind_mutex;
struct timeval tv;
struct timespec cts, ets, tts;
- uint32_t flags;
+ struct umtxq_chain *old_chain;
+ uint32_t flags, mflags;
int error;
uq = td->td_umtxq;
flags = fuword32(&cv->c_flags);
+ mflags = fuword32(&m->m_flags);
error = umtx_key_get(cv, TYPE_CV, GET_SHARE(flags), &uq->uq_key);
if (error != 0)
return (error);
+ if ((wflags & CVWAIT_BIND_MUTEX) != 0) {
+ if ((mflags & UMUTEX_PRIO_INHERIT) != 0)
+ return (EINVAL);
+ error = umtx_key_get(m, TYPE_NORMAL_UMUTEX,
+ GET_SHARE(mflags), &mkey);
+ if (error != 0) {
+ umtx_key_release(&uq->uq_key);
+ return (error);
+ }
+ if (mkey.shared == 0)
+ bind_mutex = m;
+ else
+ bind_mutex = NULL;
+ mkeyp = &mkey;
+ } else {
+ bind_mutex = NULL;
+ mkeyp = NULL;
+ }
+
+ old_chain = uq->uq_key.chain;
umtxq_lock(&uq->uq_key);
umtxq_busy(&uq->uq_key);
- umtxq_insert(uq);
+ error = umtxq_insert_queue2(uq, UMTX_SHARED_QUEUE, bind_mutex, mkeyp);
+ if (error != 0) {
+ umtxq_unbusy(&uq->uq_key);
+ umtxq_unlock(&uq->uq_key);
+ return (error);
+ }
umtxq_unlock(&uq->uq_key);
/*
- * The magic thing is we should set c_has_waiters to 1 before
- * releasing user mutex.
+ * Set c_has_waiters to 1 before releasing user mutex, also
+ * don't modify cache line when unnecessary.
*/
- suword32(__DEVOLATILE(uint32_t *, &cv->c_has_waiters), 1);
+ if (fuword32(__DEVOLATILE(uint32_t *, &cv->c_has_waiters)) == 0)
+ suword32(__DEVOLATILE(uint32_t *, &cv->c_has_waiters), 1);
umtxq_lock(&uq->uq_key);
umtxq_unbusy(&uq->uq_key);
@@ -2375,11 +2523,7 @@ do_cv_wait(struct thread *td, struct uco
umtxq_lock(&uq->uq_key);
if (error == 0) {
- if ((wflags & UMTX_CHECK_UNPARKING) &&
- (td->td_pflags & TDP_WAKEUP)) {
- td->td_pflags &= ~TDP_WAKEUP;
- error = EINTR;
- } else if (timeout == NULL) {
+ if (timeout == NULL) {
error = umtxq_sleep(uq, "ucond", 0);
} else {
getnanouptime(&ets);
@@ -2400,17 +2544,63 @@ do_cv_wait(struct thread *td, struct uco
}
}
}
-
if ((uq->uq_flags & UQF_UMTXQ) == 0)
error = 0;
else {
- umtxq_remove(uq);
+ /*
+ * This must be timeout or interrupted by signal or
+ * surprious wakeup.
+ */
+ umtxq_busy(&uq->uq_key);
+ if ((uq->uq_flags & UQF_UMTXQ) != 0) {
+ int oldlen = uq->uq_cur_queue->length;
+ umtxq_remove(uq);
+ if (oldlen == 1 && old_chain == uq->uq_key.chain) {
+ umtxq_unlock(&uq->uq_key);
+ suword32(
+ __DEVOLATILE(uint32_t *,
+ &cv->c_has_waiters), 0);
+ umtxq_lock(&uq->uq_key);
+ }
+ }
+ umtxq_unbusy(&uq->uq_key);
if (error == ERESTART)
error = EINTR;
}
-
umtxq_unlock(&uq->uq_key);
+
+ /* We were moved to mutex queue. */
+ if (mkeyp != NULL &&
+ old_chain != uq->uq_key.chain) {
+ /*
+ * cv_broadcast can not access the mutex if we are pshared,
+ * but it still migrate threads to mutex queue,
+ * we should repair contested bit here.
+ */
+ if ((mflags & USYNC_PROCESS_SHARED) != 0 && uq->uq_repair_mutex) {
+ uint32_t owner = fuword32(
+ __DEVOLATILE(void *, &m->m_owner));
+ if ((owner & UMUTEX_CONTESTED) == 0) {
+ struct umtxq_queue *uhm;
+ umtxq_lock(mkeyp);
+ umtxq_busy(mkeyp);
+ uhm = umtxq_queue_lookup(mkeyp,
+ UMTX_SHARED_QUEUE);
+ if (uhm != NULL)
+ set_contested_bit(mkeyp, m, uhm, 1);
+ umtxq_unbusy(mkeyp);
+ umtxq_unlock(mkeyp);
+ }
+ }
+
+ error = 0;
+ }
umtx_key_release(&uq->uq_key);
+ if (mkeyp != NULL)
+ umtx_key_release(mkeyp);
+ uq->uq_spare_queue->bind_mutex = NULL;
+ uq->uq_spare_queue->binding = 0;
+ uq->uq_repair_mutex = 0;
return (error);
}
@@ -2427,6 +2617,7 @@ do_cv_signal(struct thread *td, struct u
flags = fuword32(&cv->c_flags);
if ((error = umtx_key_get(cv, TYPE_CV, GET_SHARE(flags), &key)) != 0)
return (error);
+
umtxq_lock(&key);
umtxq_busy(&key);
cnt = umtxq_count(&key);
@@ -2446,6 +2637,8 @@ do_cv_signal(struct thread *td, struct u
static int
do_cv_broadcast(struct thread *td, struct ucond *cv)
{
+ struct umtxq_queue *uh, *uhm, *uh_temp;
+ struct umtxq_chain *uc, *ucm;
struct umtx_key key;
int error;
uint32_t flags;
@@ -2456,17 +2649,142 @@ do_cv_broadcast(struct thread *td, struc
umtxq_lock(&key);
umtxq_busy(&key);
- umtxq_signal(&key, INT_MAX);
- umtxq_unlock(&key);
+ uh = umtxq_queue_lookup(&key, UMTX_SHARED_QUEUE);
+ if (uh != NULL && uh->binding) {
+ /*
+ * To avoid thundering herd problem, if there are waiters,
+ * try to move them to mutex queue.
+ */
+ struct umutex *bind_mutex = uh->bind_mutex;
+ struct umtx_key mkey;
+ struct umtx_q *uq;
+ int do_wake;
+ int len, oldlen;
+
+ len = uh->length;
+ mkey = uh->bind_mkey;
+ uc = umtxq_getchain(&key);
+ ucm = umtxq_getchain(&mkey);
+ LIST_REMOVE(uh, link);
- error = suword32(__DEVOLATILE(uint32_t *, &cv->c_has_waiters), 0);
+ /*
+ * Before busying mutex sleep-queue, we must unlock cv's
+ * sleep-queue mutex, because the mutex is unsleepable.
+ */
+ umtxq_unlock(&key);
- umtxq_lock(&key);
- umtxq_unbusy(&key);
- umtxq_unlock(&key);
+ umtxq_lock(&mkey);
+ umtxq_busy(&mkey);
+ umtxq_unlock(&mkey);
+ umtxq_lock(&key);
+ umtxq_lock(&mkey);
+ uhm = umtxq_queue_lookup(&mkey, UMTX_SHARED_QUEUE);
- umtx_key_release(&key);
- return (error);
+ /* Change waiter's key (include chain address). */
+ TAILQ_FOREACH(uq, &uh->head, uq_link) {
+ umtx_key_copy(&uq->uq_key, &mkey);
+ if (uhm != NULL)
+ uq->uq_cur_queue = uhm;
+ }
+ if (uhm == NULL) {
+ /*
+ * Mutex has no waiters, just move the queue head to
+ * new chain.
+ */
+ oldlen = 0;
+ uh->key = mkey;
+ uh->bind_mutex = NULL;
+ uh->binding = 0;
+ LIST_INSERT_HEAD(&ucm->uc_queue[UMTX_SHARED_QUEUE],
+ uh, link);
+ uhm = uh;
+ } else {
+ /*
+ * Otherwise, move cv waiters.
+ */
+ oldlen = uhm->length;
+ TAILQ_CONCAT(&uhm->head, &uh->head, uq_link);
+ uhm->length += uh->length;
+ uh->length = 0;
+ uh->bind_mutex = NULL;
+ uh->binding = 0;
+ LIST_INSERT_HEAD(&ucm->uc_spare_queue, uh, link);
+ }
+
+ /*
+ * At this point, cv's queue no longer needs to be accessed,
+ * NULL it.
+ */
+ uh = NULL;
+
+ /*
+ * One queue head has already been moved, we need to
+ * move (n - 1) free queue head to new chain.
+ */
+ while (--len > 0) {
+ uh_temp = LIST_FIRST(&uc->uc_spare_queue);
+ LIST_REMOVE(uh_temp, link);
+ LIST_INSERT_HEAD(&ucm->uc_spare_queue, uh_temp, link);
+ }
+
+ umtxq_unlock(&mkey);
+ umtxq_unlock(&key);
+
+ /* Now, the cv does not have any waiter. */
+ suword32(__DEVOLATILE(uint32_t *, &cv->c_has_waiters), 0);
+
+ umtxq_lock(&key);
+ umtxq_unbusy(&key);
+ umtxq_unlock(&key);
+ umtx_key_release(&key);
+
+ /*
+ * Wake one thread when necessary. if before the queue
+ * migration, there is thread on mutex queue, we don't
+ * need to wake up a thread, because the mutex contention
+ * bit should have already been set by other mutex locking
+ * code.
+ * For pshared mutex, because different process has different
+ * address even for same process-shared mutex!
+ * we don't know where the mutex is in our address space.
+ * In this situation, we let a thread resumed from cv_wait
+ * to repair the mutex contention bit.
+ * XXX Fixme! we should make the repairing thread runs as
+ * soon as possible, boost its priority.
+ */
+ if (oldlen == 0) {
+ if (!mkey.shared) {
+ do_wake = set_contested_bit(&mkey, bind_mutex,
+ uhm, 0);
+ } else {
+ do_wake = 1;
+ }
+ } else {
+ do_wake = 0;
+ }
+
+ umtxq_lock(&mkey);
+ if (do_wake) {
+ uq = TAILQ_FIRST(&uhm->head);
+ if (uq != NULL) {
+ if (mkey.shared)
+ uq->uq_repair_mutex = 1;
+ umtxq_signal_thread(uq);
+ }
+ }
+ umtxq_unbusy(&mkey);
+ umtxq_unlock(&mkey);
+ return (0);
+ } else {
+ umtxq_signal(&key, INT_MAX);
+ umtxq_unlock(&key);
+ suword32(__DEVOLATILE(uint32_t *, &cv->c_has_waiters), 0);
+ umtxq_lock(&key);
+ umtxq_unbusy(&key);
+ umtxq_unlock(&key);
+ umtx_key_release(&key);
+ }
+ return (0);
}
static int
@@ -2839,7 +3157,9 @@ do_sem_wait(struct thread *td, struct _u
umtxq_insert(uq);
umtxq_unlock(&uq->uq_key);
- suword32(__DEVOLATILE(uint32_t *, &sem->_has_waiters), 1);
+ /* Don't modify cacheline when unnecessary. */
+ if (fuword32(__DEVOLATILE(uint32_t *, &sem->_has_waiters)) == 0)
+ suword32(__DEVOLATILE(uint32_t *, &sem->_has_waiters), 1);
count = fuword32(__DEVOLATILE(uint32_t *, &sem->_count));
if (count != 0) {
Modified: user/davidxu/libthr/sys/sys/umtx.h
==============================================================================
--- user/davidxu/libthr/sys/sys/umtx.h Wed Nov 3 23:29:52 2010 (r214766)
+++ user/davidxu/libthr/sys/sys/umtx.h Thu Nov 4 02:03:26 2010 (r214767)
@@ -82,7 +82,9 @@
#define UMTX_OP_MAX 21
/* flags for UMTX_OP_CV_WAIT */
-#define UMTX_CHECK_UNPARKING 0x01
+#define CVWAIT_CHECK_UNPARKING 0x01
+#define CVWAIT_BIND_MUTEX 0x02
+#define UMTX_CHECK_UNPARKING CVWAIT_CHECK_UNPARKING /* Backward-compatible alias for the old flag name. */
#ifndef _KERNEL
More information about the svn-src-user
mailing list