svn commit: r260097 - in head/sys: nfs rpc
Adrian Chadd
adrian at freebsd.org
Mon Dec 30 22:23:28 UTC 2013
Hm, this kind of per-bucket hash table may end up being useful in a
variety of places. I wonder if we should create a locked hash table
type that we could reuse elsewhere.
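
Something like this, maybe -- a minimal userland sketch of the pattern
(pthreads standing in for mtx(9); all of the lht_* names are hypothetical),
preallocating the entry outside the bucket lock the way r260097 now does:

#include <pthread.h>
#include <stdint.h>
#include <stdlib.h>
#include <sys/queue.h>

#define LHT_NBUCKETS 251                /* prime, like FHA_HASH_SIZE */

struct lht_entry {
        LIST_ENTRY(lht_entry) link;
        uint64_t key;
};

struct lht_bucket {
        LIST_HEAD(, lht_entry) list;
        pthread_mutex_t mtx;            /* protects this bucket only */
};

struct lht {
        struct lht_bucket buckets[LHT_NBUCKETS];
};

static void
lht_init(struct lht *t)
{
        int i;

        for (i = 0; i < LHT_NBUCKETS; i++) {
                LIST_INIT(&t->buckets[i].list);
                pthread_mutex_init(&t->buckets[i].mtx, NULL);
        }
}

/*
 * Look up key, inserting the caller-preallocated 'newe' if it is absent.
 * Allocating before taking the bucket lock avoids the unlock/allocate/
 * double-check dance the old FHA code needed under the pool lock.
 * Returns with the bucket lock held; release it with lht_unlock().
 */
static struct lht_entry *
lht_lookup_or_insert(struct lht *t, uint64_t key, struct lht_entry *newe)
{
        struct lht_bucket *b = &t->buckets[key % LHT_NBUCKETS];
        struct lht_entry *e;

        newe->key = key;
        pthread_mutex_lock(&b->mtx);
        LIST_FOREACH(e, &b->list, link)
                if (e->key == key)
                        break;
        if (e == NULL) {
                e = newe;
                LIST_INSERT_HEAD(&b->list, e, link);
        } else
                free(newe);             /* lost the race; discard */
        return (e);
}

static void
lht_unlock(struct lht *t, uint64_t key)
{

        pthread_mutex_unlock(&t->buckets[key % LHT_NBUCKETS].mtx);
}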
Nice work though. I wonder about doing the same thing elsewhere (e.g.
the inpcb hash, kqueue hash, etc.).
-a
On 30 December 2013 12:23, Alexander Motin <mav at freebsd.org> wrote:
> Author: mav
> Date: Mon Dec 30 20:23:15 2013
> New Revision: 260097
> URL: http://svnweb.freebsd.org/changeset/base/260097
>
> Log:
> Move most of the NFS file handle affinity code out from under the heavily
> contended global RPC thread pool lock and protect it with its own set of locks.
>
> On synthetic benchmarks this improves peak NFS request rate by 40%.
>
> Modified:
> head/sys/nfs/nfs_fha.c
> head/sys/nfs/nfs_fha.h
> head/sys/rpc/svc.c
> head/sys/rpc/svc.h
>
> Modified: head/sys/nfs/nfs_fha.c
> ==============================================================================
> --- head/sys/nfs/nfs_fha.c Mon Dec 30 20:15:46 2013 (r260096)
> +++ head/sys/nfs/nfs_fha.c Mon Dec 30 20:23:15 2013 (r260097)
> @@ -52,13 +52,10 @@ void
> fha_init(struct fha_params *softc)
> {
> char tmpstr[128];
> + int i;
>
> - /*
> - * A small hash table to map filehandles to fha_hash_entry
> - * structures.
> - */
> - softc->g_fha.hashtable = hashinit(256, M_NFS_FHA,
> - &softc->g_fha.hashmask);
> + for (i = 0; i < FHA_HASH_SIZE; i++)
> + mtx_init(&softc->fha_hash[i].mtx, "fhalock", NULL, MTX_DEF);
>
> /*
> * Set the default tuning parameters.
> @@ -117,8 +114,11 @@ fha_init(struct fha_params *softc)
> void
> fha_uninit(struct fha_params *softc)
> {
> + int i;
> +
> sysctl_ctx_free(&softc->sysctl_ctx);
> - hashdestroy(softc->g_fha.hashtable, M_NFS_FHA, softc->g_fha.hashmask);
> + for (i = 0; i < FHA_HASH_SIZE; i++)
> + mtx_destroy(&softc->fha_hash[i].mtx);
> }
>
> /*
> @@ -207,8 +207,13 @@ static void
> fha_hash_entry_destroy(struct fha_hash_entry *e)
> {
>
> - if (e->num_rw + e->num_exclusive)
> - panic("nonempty fhe");
> + mtx_assert(e->mtx, MA_OWNED);
> + KASSERT(e->num_rw == 0,
> + ("%d reqs on destroyed fhe %p", e->num_rw, e));
> + KASSERT(e->num_exclusive == 0,
> + ("%d exclusive reqs on destroyed fhe %p", e->num_exclusive, e));
> + KASSERT(e->num_threads == 0,
> + ("%d threads on destroyed fhe %p", e->num_threads, e));
> free(e, M_NFS_FHA);
> }
>
> @@ -216,6 +221,7 @@ static void
> fha_hash_entry_remove(struct fha_hash_entry *e)
> {
>
> + mtx_assert(e->mtx, MA_OWNED);
> LIST_REMOVE(e, link);
> fha_hash_entry_destroy(e);
> }
> @@ -224,36 +230,22 @@ static struct fha_hash_entry *
> fha_hash_entry_lookup(struct fha_params *softc, u_int64_t fh)
> {
> SVCPOOL *pool;
> -
> - pool = *softc->pool;
> -
> + struct fha_hash_slot *fhs;
> struct fha_hash_entry *fhe, *new_fhe;
>
> - LIST_FOREACH(fhe, &softc->g_fha.hashtable[fh % softc->g_fha.hashmask],
> - link)
> + pool = *softc->pool;
> + fhs = &softc->fha_hash[fh % FHA_HASH_SIZE];
> + new_fhe = fha_hash_entry_new(fh);
> + new_fhe->mtx = &fhs->mtx;
> + mtx_lock(&fhs->mtx);
> + LIST_FOREACH(fhe, &fhs->list, link)
> if (fhe->fh == fh)
> break;
> -
> if (!fhe) {
> - /* Allocate a new entry. */
> - mtx_unlock(&pool->sp_lock);
> - new_fhe = fha_hash_entry_new(fh);
> - mtx_lock(&pool->sp_lock);
> -
> - /* Double-check to make sure we still need the new entry. */
> - LIST_FOREACH(fhe,
> - &softc->g_fha.hashtable[fh % softc->g_fha.hashmask], link)
> - if (fhe->fh == fh)
> - break;
> - if (!fhe) {
> - fhe = new_fhe;
> - LIST_INSERT_HEAD(
> - &softc->g_fha.hashtable[fh % softc->g_fha.hashmask],
> - fhe, link);
> - } else
> - fha_hash_entry_destroy(new_fhe);
> - }
> -
> + fhe = new_fhe;
> + LIST_INSERT_HEAD(&fhs->list, fhe, link);
> + } else
> + fha_hash_entry_destroy(new_fhe);
> return (fhe);
> }
>
> @@ -261,6 +253,8 @@ static void
> fha_hash_entry_add_thread(struct fha_hash_entry *fhe, SVCTHREAD *thread)
> {
>
> + mtx_assert(fhe->mtx, MA_OWNED);
> + thread->st_p2 = 0;
> LIST_INSERT_HEAD(&fhe->threads, thread, st_alink);
> fhe->num_threads++;
> }
> @@ -269,6 +263,9 @@ static void
> fha_hash_entry_remove_thread(struct fha_hash_entry *fhe, SVCTHREAD *thread)
> {
>
> + mtx_assert(fhe->mtx, MA_OWNED);
> + KASSERT(thread->st_p2 == 0,
> + ("%d reqs on removed thread %p", thread->st_p2, thread));
> LIST_REMOVE(thread, st_alink);
> fhe->num_threads--;
> }
> @@ -280,6 +277,7 @@ static void
> fha_hash_entry_add_op(struct fha_hash_entry *fhe, int locktype, int count)
> {
>
> + mtx_assert(fhe->mtx, MA_OWNED);
> if (LK_EXCLUSIVE == locktype)
> fhe->num_exclusive += count;
> else
> @@ -306,7 +304,7 @@ fha_hash_entry_choose_thread(struct fha_
> pool = *softc->pool;
>
> LIST_FOREACH(thread, &fhe->threads, st_alink) {
> - req_count = thread->st_reqcount;
> + req_count = thread->st_p2;
>
> /* If there are any writes in progress, use the first thread. */
> if (fhe->num_exclusive) {
> @@ -322,7 +320,7 @@ fha_hash_entry_choose_thread(struct fha_
> * exceed our per-thread load limit in the process.
> */
> offset1 = i->offset;
> - offset2 = STAILQ_FIRST(&thread->st_reqs)->rq_p3;
> + offset2 = thread->st_p3;
>
> if (((offset1 >= offset2)
> && ((offset1 - offset2) < (1 << softc->ctls.bin_shift)))
> @@ -360,28 +358,11 @@ fha_hash_entry_choose_thread(struct fha_
> */
> if ((softc->ctls.max_nfsds_per_fh == 0) ||
> (fhe->num_threads < softc->ctls.max_nfsds_per_fh)) {
> - /*
> - * We can add a new thread, so try for an idle thread
> - * first, and fall back to this_thread if none are idle.
> - */
> - if (STAILQ_EMPTY(&this_thread->st_reqs)) {
> - thread = this_thread;
> + thread = this_thread;
> #if 0
> - ITRACE_CURPROC(ITRACE_NFS, ITRACE_INFO,
> - "fha: %p(%d)t", thread, thread->st_reqcount);
> -#endif
> - } else if ((thread = LIST_FIRST(&pool->sp_idlethreads))) {
> -#if 0
> - ITRACE_CURPROC(ITRACE_NFS, ITRACE_INFO,
> - "fha: %p(%d)i", thread, thread->st_reqcount);
> -#endif
> - } else {
> - thread = this_thread;
> -#if 0
> - ITRACE_CURPROC(ITRACE_NFS, ITRACE_INFO,
> - "fha: %p(%d)b", thread, thread->st_reqcount);
> + ITRACE_CURPROC(ITRACE_NFS, ITRACE_INFO,
> + "fha: %p(%d)t", thread, thread->st_p2);
> #endif
> - }
> fha_hash_entry_add_thread(fhe, thread);
> } else {
> /*
> @@ -411,16 +392,16 @@ fha_assign(SVCTHREAD *this_thread, struc
>
> /* Check to see whether we're enabled. */
> if (softc->ctls.enable == 0)
> - return (this_thread);
> + goto thist;
>
> /*
> * Only do placement if this is an NFS request.
> */
> if (req->rq_prog != NFS_PROG)
> - return (this_thread);
> + goto thist;
>
> if (req->rq_vers != 2 && req->rq_vers != 3)
> - return (this_thread);
> + goto thist;
>
> fha_extract_info(req, &i, cb);
>
> @@ -440,8 +421,21 @@ fha_assign(SVCTHREAD *this_thread, struc
> thread = fha_hash_entry_choose_thread(softc, fhe, &i, this_thread);
> KASSERT(thread, ("fha_assign: NULL thread!"));
> fha_hash_entry_add_op(fhe, i.locktype, 1);
> + thread->st_p2++;
> + thread->st_p3 = i.offset;
> +
> + /*
> + * Grab the pool lock here so the chosen thread can't go away before
> + * the new request is inserted into its queue while we drop the fhe lock.
> + */
> + mtx_lock(&(*softc->pool)->sp_lock);
> + mtx_unlock(fhe->mtx);
>
> return (thread);
> +thist:
> + req->rq_p1 = NULL;
> + mtx_lock(&(*softc->pool)->sp_lock);
> + return (this_thread);
> }
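
That pool-lock-before-fhe-unlock tail in fha_assign() above is a
hand-over-hand lock transfer. Sketched generically (pthreads, hypothetical
name), with the usual caveat that the src-then-dst acquisition order has to
be consistent everywhere or you can deadlock:

#include <pthread.h>

/*
 * Transfer protection from 'src' to 'dst': take the destination lock
 * before releasing the source lock, so the state guarded by 'src'
 * (here, the chosen thread's liveness) has no window in which to
 * change.  'src' is held on entry; 'dst' is held on return.
 */
static void
lock_handoff(pthread_mutex_t *src, pthread_mutex_t *dst)
{

        pthread_mutex_lock(dst);
        pthread_mutex_unlock(src);
}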
>
> /*
> @@ -452,6 +446,7 @@ void
> fha_nd_complete(SVCTHREAD *thread, struct svc_req *req)
> {
> struct fha_hash_entry *fhe = req->rq_p1;
> + struct mtx *mtx;
>
> /*
> * This may be called for reqs that didn't go through
> @@ -460,13 +455,18 @@ fha_nd_complete(SVCTHREAD *thread, struc
> if (!fhe)
> return;
>
> + mtx = fhe->mtx;
> + mtx_lock(mtx);
> fha_hash_entry_add_op(fhe, req->rq_p2, -1);
> -
> - if (thread->st_reqcount == 0) {
> + thread->st_p2--;
> + KASSERT(thread->st_p2 >= 0, ("Negative request count %d on %p",
> + thread->st_p2, thread));
> + if (thread->st_p2 == 0) {
> fha_hash_entry_remove_thread(fhe, thread);
> if (0 == fhe->num_rw + fhe->num_exclusive)
> fha_hash_entry_remove(fhe);
> }
> + mtx_unlock(mtx);
> }
>
> int
> @@ -489,10 +489,9 @@ fhe_stats_sysctl(SYSCTL_HANDLER_ARGS, st
> }
> pool = *softc->pool;
>
> - mtx_lock(&pool->sp_lock);
> count = 0;
> - for (i = 0; i <= softc->g_fha.hashmask; i++)
> - if (!LIST_EMPTY(&softc->g_fha.hashtable[i]))
> + for (i = 0; i < FHA_HASH_SIZE; i++)
> + if (!LIST_EMPTY(&softc->fha_hash[i].list))
> count++;
>
> if (count == 0) {
> @@ -500,8 +499,9 @@ fhe_stats_sysctl(SYSCTL_HANDLER_ARGS, st
> goto out;
> }
>
> - for (i = 0; i <= softc->g_fha.hashmask; i++) {
> - LIST_FOREACH(fhe, &softc->g_fha.hashtable[i], link) {
> + for (i = 0; i < FHA_HASH_SIZE; i++) {
> + mtx_lock(&softc->fha_hash[i].mtx);
> + LIST_FOREACH(fhe, &softc->fha_hash[i].list, link) {
> sbuf_printf(&sb, "%sfhe %p: {\n", first ? "" : ", ", fhe);
>
> sbuf_printf(&sb, " fh: %ju\n", (uintmax_t) fhe->fh);
> @@ -512,8 +512,7 @@ fhe_stats_sysctl(SYSCTL_HANDLER_ARGS, st
> LIST_FOREACH(thread, &fhe->threads, st_alink) {
> sbuf_printf(&sb, " thread %p offset %ju "
> "(count %d)\n", thread,
> - STAILQ_FIRST(&thread->st_reqs)->rq_p3,
> - thread->st_reqcount);
> + thread->st_p3, thread->st_p2);
> }
>
> sbuf_printf(&sb, "}");
> @@ -525,11 +524,10 @@ fhe_stats_sysctl(SYSCTL_HANDLER_ARGS, st
> break;
> }
> }
> + mtx_unlock(&softc->fha_hash[i].mtx);
> }
>
> out:
> - if (pool)
> - mtx_unlock(&pool->sp_lock);
> sbuf_trim(&sb);
> sbuf_finish(&sb);
> error = sysctl_handle_string(oidp, sbuf_data(&sb), sbuf_len(&sb), req);
>
> Modified: head/sys/nfs/nfs_fha.h
> ==============================================================================
> --- head/sys/nfs/nfs_fha.h Mon Dec 30 20:15:46 2013 (r260096)
> +++ head/sys/nfs/nfs_fha.h Mon Dec 30 20:23:15 2013 (r260097)
> @@ -35,11 +35,7 @@
> #define FHA_DEF_MAX_NFSDS_PER_FH 8
> #define FHA_DEF_MAX_REQS_PER_NFSD 0 /* Unlimited */
>
> -/* This is the global structure that represents the state of the fha system. */
> -struct fha_global {
> - struct fha_hash_entry_list *hashtable;
> - u_long hashmask;
> -};
> +#define FHA_HASH_SIZE 251
>
> struct fha_ctls {
> int enable;
> @@ -62,6 +58,7 @@ struct fha_ctls {
> * avoid contention between threads over single files.
> */
> struct fha_hash_entry {
> + struct mtx *mtx;
> LIST_ENTRY(fha_hash_entry) link;
> u_int64_t fh;
> u_int32_t num_rw;
> @@ -72,6 +69,11 @@ struct fha_hash_entry {
>
> LIST_HEAD(fha_hash_entry_list, fha_hash_entry);
>
> +struct fha_hash_slot {
> + struct fha_hash_entry_list list;
> + struct mtx mtx;
> +};
> +
> /* A structure used for passing around data internally. */
> struct fha_info {
> u_int64_t fh;
> @@ -93,7 +95,7 @@ struct fha_callbacks {
> };
>
> struct fha_params {
> - struct fha_global g_fha;
> + struct fha_hash_slot fha_hash[FHA_HASH_SIZE];
> struct sysctl_ctx_list sysctl_ctx;
> struct sysctl_oid *sysctl_tree;
> struct fha_ctls ctls;
>
> Modified: head/sys/rpc/svc.c
> ==============================================================================
> --- head/sys/rpc/svc.c Mon Dec 30 20:15:46 2013 (r260096)
> +++ head/sys/rpc/svc.c Mon Dec 30 20:23:15 2013 (r260097)
> @@ -71,6 +71,8 @@ static struct svc_callout *svc_find(SVCP
> char *);
> static void svc_new_thread(SVCPOOL *pool);
> static void xprt_unregister_locked(SVCXPRT *xprt);
> +static void svc_change_space_used(SVCPOOL *pool, int delta);
> +static bool_t svc_request_space_available(SVCPOOL *pool);
>
> /* *************** SVCXPRT related stuff **************** */
>
> @@ -373,7 +375,8 @@ xprt_active(SVCXPRT *xprt)
> if (!xprt->xp_active) {
> xprt->xp_active = TRUE;
> if (xprt->xp_thread == NULL) {
> - if (!xprt_assignthread(xprt))
> + if (!svc_request_space_available(pool) ||
> + !xprt_assignthread(xprt))
> TAILQ_INSERT_TAIL(&pool->sp_active, xprt,
> xp_alink);
> }
> @@ -965,56 +968,63 @@ svc_assign_waiting_sockets(SVCPOOL *pool
> {
> SVCXPRT *xprt;
>
> + mtx_lock(&pool->sp_lock);
> while ((xprt = TAILQ_FIRST(&pool->sp_active)) != NULL) {
> if (xprt_assignthread(xprt))
> TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
> else
> break;
> }
> + mtx_unlock(&pool->sp_lock);
> }
>
> -static bool_t
> -svc_request_space_available(SVCPOOL *pool)
> +static void
> +svc_change_space_used(SVCPOOL *pool, int delta)
> {
> + unsigned int value;
>
> - mtx_assert(&pool->sp_lock, MA_OWNED);
> -
> - if (pool->sp_space_throttled) {
> - /*
> - * Below the low-water yet? If so, assign any waiting sockets.
> - */
> - if (pool->sp_space_used < pool->sp_space_low) {
> - pool->sp_space_throttled = FALSE;
> - svc_assign_waiting_sockets(pool);
> - return TRUE;
> - }
> -
> - return FALSE;
> - } else {
> - if (pool->sp_space_used
> - >= pool->sp_space_high) {
> + value = atomic_fetchadd_int(&pool->sp_space_used, delta) + delta;
> + if (delta > 0) {
> + if (value >= pool->sp_space_high && !pool->sp_space_throttled) {
> pool->sp_space_throttled = TRUE;
> pool->sp_space_throttle_count++;
> - return FALSE;
> }
> -
> - return TRUE;
> + if (value > pool->sp_space_used_highest)
> + pool->sp_space_used_highest = value;
> + } else {
> + if (value < pool->sp_space_low && pool->sp_space_throttled) {
> + pool->sp_space_throttled = FALSE;
> + svc_assign_waiting_sockets(pool);
> + }
> }
> }
>
> +static bool_t
> +svc_request_space_available(SVCPOOL *pool)
> +{
> +
> + if (pool->sp_space_throttled)
> + return (FALSE);
> + return (TRUE);
> +}
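
The accounting rewrite above turns sp_space_used into a lockless counter
with high/low watermarks. A rough userland analogue with C11 atomics (the
names and the kick() hook are placeholders; the kernel version keeps the
throttled flag as a plain field and tolerates the benign races on it):

#include <stdatomic.h>
#include <stdbool.h>

struct throttle {
        atomic_uint  used;
        unsigned int high;      /* start throttling at/above this */
        unsigned int low;       /* stop throttling below this */
        atomic_bool  throttled;
};

static void
throttle_change(struct throttle *t, int delta, void (*kick)(void))
{
        unsigned int value;

        /* atomic_fetch_add returns the old value, so add delta back in. */
        value = atomic_fetch_add(&t->used, delta) + delta;
        if (delta > 0) {
                if (value >= t->high)
                        atomic_store(&t->throttled, true);
        } else if (value < t->low &&
            atomic_exchange(&t->throttled, false))
                kick();         /* e.g. reassign waiting sockets */
}

static bool
throttle_ok(struct throttle *t)
{

        return (!atomic_load(&t->throttled));
}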
> +
> static void
> svc_run_internal(SVCPOOL *pool, bool_t ismaster)
> {
> + struct svc_reqlist reqs;
> SVCTHREAD *st, *stpref;
> SVCXPRT *xprt;
> enum xprt_stat stat;
> struct svc_req *rqstp;
> + size_t sz;
> int error;
>
> st = mem_alloc(sizeof(*st));
> + st->st_pool = pool;
> st->st_xprt = NULL;
> STAILQ_INIT(&st->st_reqs);
> cv_init(&st->st_cond, "rpcsvc");
> + STAILQ_INIT(&reqs);
>
> mtx_lock(&pool->sp_lock);
> LIST_INSERT_HEAD(&pool->sp_threads, st, st_link);
> @@ -1108,15 +1118,14 @@ svc_run_internal(SVCPOOL *pool, bool_t i
> * RPCs.
> */
> xprt->xp_lastactive = time_uptime;
> - stat = XPRT_IDLE;
> do {
> + mtx_unlock(&pool->sp_lock);
> if (!svc_request_space_available(pool))
> break;
> rqstp = NULL;
> - mtx_unlock(&pool->sp_lock);
> stat = svc_getreq(xprt, &rqstp);
> - mtx_lock(&pool->sp_lock);
> if (rqstp) {
> + svc_change_space_used(pool, rqstp->rq_size);
> /*
> * See if the application has
> * a preference for some other
> @@ -1126,17 +1135,12 @@ svc_run_internal(SVCPOOL *pool, bool_t i
> if (pool->sp_assign)
> stpref = pool->sp_assign(st,
> rqstp);
> + else
> + mtx_lock(&pool->sp_lock);
>
> - pool->sp_space_used +=
> - rqstp->rq_size;
> - if (pool->sp_space_used
> - > pool->sp_space_used_highest)
> - pool->sp_space_used_highest =
> - pool->sp_space_used;
> rqstp->rq_thread = stpref;
> STAILQ_INSERT_TAIL(&stpref->st_reqs,
> rqstp, rq_link);
> - stpref->st_reqcount++;
>
> /*
> * If we assigned the request
> @@ -1156,7 +1160,8 @@ svc_run_internal(SVCPOOL *pool, bool_t i
> stpref->st_idle = FALSE;
> cv_signal(&stpref->st_cond);
> }
> - }
> + } else
> + mtx_lock(&pool->sp_lock);
> } while (stat == XPRT_MOREREQS
> && pool->sp_state != SVCPOOL_CLOSING);
>
> @@ -1171,25 +1176,30 @@ svc_run_internal(SVCPOOL *pool, bool_t i
> xprt->xp_thread = NULL;
> st->st_xprt = NULL;
> if (xprt->xp_active) {
> - if (!xprt_assignthread(xprt))
> + if (!svc_request_space_available(pool) ||
> + !xprt_assignthread(xprt))
> TAILQ_INSERT_TAIL(&pool->sp_active,
> xprt, xp_alink);
> }
> + STAILQ_CONCAT(&reqs, &st->st_reqs);
> mtx_unlock(&pool->sp_lock);
> SVC_RELEASE(xprt);
> - mtx_lock(&pool->sp_lock);
> + } else {
> + STAILQ_CONCAT(&reqs, &st->st_reqs);
> + mtx_unlock(&pool->sp_lock);
> }
>
> /*
> * Execute what we have queued.
> */
> - while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
> - size_t sz = rqstp->rq_size;
> - mtx_unlock(&pool->sp_lock);
> + sz = 0;
> + while ((rqstp = STAILQ_FIRST(&reqs)) != NULL) {
> + STAILQ_REMOVE_HEAD(&reqs, rq_link);
> + sz += rqstp->rq_size;
> svc_executereq(rqstp);
> - mtx_lock(&pool->sp_lock);
> - pool->sp_space_used -= sz;
> }
> + svc_change_space_used(pool, -sz);
> + mtx_lock(&pool->sp_lock);
> }
>
> if (st->st_xprt) {
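
Also worth noting the shape of the execution change above: the whole
st_reqs queue is spliced onto a local list in O(1) while the pool lock is
held, the requests then run unlocked, and the space accounting is returned
in a single call rather than once per request. The same drain-then-process
pattern in miniature (hypothetical names):

#include <pthread.h>
#include <stddef.h>
#include <sys/queue.h>

struct work {
        STAILQ_ENTRY(work) link;
        size_t size;
};
STAILQ_HEAD(worklist, work);

static void
drain_and_run(struct worklist *shared, pthread_mutex_t *mtx,
    void (*run)(struct work *), void (*uncharge)(long))
{
        struct worklist local = STAILQ_HEAD_INITIALIZER(local);
        struct work *w;
        long sz;

        /* Splice the shared queue onto a local one in O(1) under the lock. */
        pthread_mutex_lock(mtx);
        STAILQ_CONCAT(&local, shared);
        pthread_mutex_unlock(mtx);

        /* Execute unlocked; run() may free w, so read the size first. */
        sz = 0;
        while ((w = STAILQ_FIRST(&local)) != NULL) {
                STAILQ_REMOVE_HEAD(&local, link);
                sz += w->size;
                run(w);
        }
        uncharge(-sz);          /* one accounting update for the batch */
}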
> @@ -1309,24 +1319,13 @@ void
> svc_freereq(struct svc_req *rqstp)
> {
> SVCTHREAD *st;
> - SVCXPRT *xprt;
> SVCPOOL *pool;
>
> st = rqstp->rq_thread;
> - xprt = rqstp->rq_xprt;
> - if (xprt)
> - pool = xprt->xp_pool;
> - else
> - pool = NULL;
> if (st) {
> - mtx_lock(&pool->sp_lock);
> - KASSERT(rqstp == STAILQ_FIRST(&st->st_reqs),
> - ("Freeing request out of order"));
> - STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
> - st->st_reqcount--;
> + pool = st->st_pool;
> if (pool->sp_done)
> pool->sp_done(st, rqstp);
> - mtx_unlock(&pool->sp_lock);
> }
>
> if (rqstp->rq_auth.svc_ah_ops)
>
> Modified: head/sys/rpc/svc.h
> ==============================================================================
> --- head/sys/rpc/svc.h Mon Dec 30 20:15:46 2013 (r260096)
> +++ head/sys/rpc/svc.h Mon Dec 30 20:23:15 2013 (r260097)
> @@ -275,14 +275,16 @@ STAILQ_HEAD(svc_reqlist, svc_req);
> * thread to read and execute pending RPCs.
> */
> typedef struct __rpc_svcthread {
> + struct __rpc_svcpool *st_pool;
> SVCXPRT *st_xprt; /* transport we are processing */
> struct svc_reqlist st_reqs; /* RPC requests to execute */
> - int st_reqcount; /* number of queued reqs */
> int st_idle; /* thread is on idle list */
> struct cv st_cond; /* sleeping for work */
> LIST_ENTRY(__rpc_svcthread) st_link; /* all threads list */
> LIST_ENTRY(__rpc_svcthread) st_ilink; /* idle threads list */
> LIST_ENTRY(__rpc_svcthread) st_alink; /* application thread list */
> + int st_p2; /* application workspace */
> + uint64_t st_p3; /* application workspace */
> } SVCTHREAD;
> LIST_HEAD(svcthread_list, __rpc_svcthread);
>