svn commit: r260229 - in head/sys: fs/nfs fs/nfsserver rpc

Alexander Motin mav at FreeBSD.org
Fri Jan 3 15:10:02 UTC 2014


Author: mav
Date: Fri Jan  3 15:09:59 2014
New Revision: 260229
URL: http://svnweb.freebsd.org/changeset/base/260229

Log:
  Rework NFS Duplicate Request Cache cleanup logic.
  
   - Introduce an additional hash to group requests by hash of sockref.  This
  allows processing TCP acknowledgements without looping through the whole
  cache, and as a result allows doing it every time.
   - Introduce additional callbacks to notify the application layer about
  socket disconnection.  Without this, the last few requests processed just
  before a socket disconnection never had their ACKs processed and stayed
  stuck in the cache for many hours.
   - Implement a transport-specific method for tracking reply acknowledgements.
  The new implementation does not cross multiple stack layers to get the data
  and does not have the race conditions that previously left some requests
  stuck in the cache.  This could be done more efficiently at the sockbuf
  layer, but that would break some KBIs, and I don't know of other consumers
  for it aside from NFS.
   - Instead of traversing the whole DRC twice per request, run cleaning only
  once per request and, except in some conditions, traverse only a single
  hash slot at a time.
  
  Together this limits NFS DRC growth only to situations of real connectivity
  problems.  If network is working well, and so all replies are acknowledged,
  the cache remains almost empty even after hours of heavy load.  Without this
  change, on the same test the cache grew to many thousands of requests even
  with a perfectly working local network.
  
  As another result this reduces CPU time spent on the DRC handling during
  SPEC NFS benchmark from about 10% to 0.5%.
  
  Sponsored by:	iXsystems, Inc.

Modified:
  head/sys/fs/nfs/nfs_var.h
  head/sys/fs/nfs/nfsrvcache.h
  head/sys/fs/nfsserver/nfs_nfsdcache.c
  head/sys/fs/nfsserver/nfs_nfsdkrpc.c
  head/sys/fs/nfsserver/nfs_nfsdport.c
  head/sys/fs/nfsserver/nfs_nfsdsubs.c
  head/sys/rpc/svc.c
  head/sys/rpc/svc.h
  head/sys/rpc/svc_dg.c
  head/sys/rpc/svc_vc.c

Modified: head/sys/fs/nfs/nfs_var.h
==============================================================================
--- head/sys/fs/nfs/nfs_var.h	Fri Jan  3 14:33:25 2014	(r260228)
+++ head/sys/fs/nfs/nfs_var.h	Fri Jan  3 15:09:59 2014	(r260229)
@@ -218,14 +218,14 @@ void nfsrvd_dorpc(struct nfsrv_descript 
 
 /* nfs_nfsdcache.c */
 void nfsrvd_initcache(void);
-int nfsrvd_getcache(struct nfsrv_descript *, struct socket *);
-struct nfsrvcache *nfsrvd_updatecache(struct nfsrv_descript *,
-    struct socket *);
-void nfsrvd_sentcache(struct nfsrvcache *, struct socket *, int);
+int nfsrvd_getcache(struct nfsrv_descript *);
+struct nfsrvcache *nfsrvd_updatecache(struct nfsrv_descript *);
+void nfsrvd_sentcache(struct nfsrvcache *, uint32_t);
 void nfsrvd_cleancache(void);
 void nfsrvd_refcache(struct nfsrvcache *);
 void nfsrvd_derefcache(struct nfsrvcache *);
 void nfsrvd_delcache(struct nfsrvcache *);
+void nfsrc_trimcache(uint64_t, uint32_t, int);
 
 /* nfs_commonsubs.c */
 void newnfs_init(void);
@@ -327,9 +327,6 @@ int nfsd_checkrootexp(struct nfsrv_descr
 void nfscl_retopts(struct nfsmount *, char *, size_t);
 
 /* nfs_commonport.c */
-int nfsrv_checksockseqnum(struct socket *, tcp_seq);
-int nfsrv_getsockseqnum(struct socket *, tcp_seq *);
-int nfsrv_getsocksndseq(struct socket *, tcp_seq *, tcp_seq *);
 int nfsrv_lookupfilename(struct nameidata *, char *, NFSPROC_T *);
 void nfsrv_object_create(vnode_t, NFSPROC_T *);
 int nfsrv_mallocmget_limit(void);

Modified: head/sys/fs/nfs/nfsrvcache.h
==============================================================================
--- head/sys/fs/nfs/nfsrvcache.h	Fri Jan  3 14:33:25 2014	(r260228)
+++ head/sys/fs/nfs/nfsrvcache.h	Fri Jan  3 15:09:59 2014	(r260229)
@@ -46,6 +46,7 @@
 /* Cache table entry. */
 struct nfsrvcache {
 	LIST_ENTRY(nfsrvcache) rc_hash;		/* Hash chain */
+	LIST_ENTRY(nfsrvcache) rc_ahash;	/* ACK hash chain */
 	TAILQ_ENTRY(nfsrvcache)	rc_lru;		/* UDP lru chain */
 	u_int32_t	rc_xid;			/* rpc id number */
 	time_t		rc_timestamp;		/* Time done */
@@ -64,6 +65,7 @@ struct nfsrvcache {
 			int16_t		refcnt;
 			u_int16_t	cksum;
 			time_t		cachetime;
+			int		acked;
 		} ot;
 	} rc_un2;
 	u_int16_t	rc_proc;		/* rpc proc number */
@@ -81,6 +83,13 @@ struct nfsrvcache {
 #define	rc_reqlen	rc_un2.ot.len
 #define	rc_cksum	rc_un2.ot.cksum
 #define	rc_cachetime	rc_un2.ot.cachetime
+#define	rc_acked	rc_un2.ot.acked
+
+/* TCP ACK values */
+#define	RC_NO_SEQ		0
+#define	RC_NO_ACK		1
+#define	RC_ACK			2
+#define	RC_NACK			3
 
 /* Return values */
 #define	RC_DROPIT		0
@@ -95,7 +104,6 @@ struct nfsrvcache {
 #define	RC_UDP		0x0010
 #define	RC_INETIPV6	0x0020
 #define	RC_INPROG	0x0040
-#define	RC_TCPSEQ	0x0080
 #define	RC_NFSV2	0x0100
 #define	RC_NFSV3	0x0200
 #define	RC_NFSV4	0x0400

Modified: head/sys/fs/nfsserver/nfs_nfsdcache.c
==============================================================================
--- head/sys/fs/nfsserver/nfs_nfsdcache.c	Fri Jan  3 14:33:25 2014	(r260228)
+++ head/sys/fs/nfsserver/nfs_nfsdcache.c	Fri Jan  3 15:09:59 2014	(r260229)
@@ -162,6 +162,7 @@ __FBSDID("$FreeBSD$");
 extern struct nfsstats newnfsstats;
 extern struct mtx nfsrc_udpmtx;
 extern struct nfsrchash_bucket nfsrchash_table[NFSRVCACHE_HASHSIZE];
+extern struct nfsrchash_bucket nfsrcahash_table[NFSRVCACHE_HASHSIZE];
 int nfsrc_floodlevel = NFSRVCACHE_FLOODLEVEL, nfsrc_tcpsavedreplies = 0;
 #endif	/* !APPLEKEXT */
 
@@ -238,6 +239,7 @@ static int newnfsv2_procid[NFS_V3NPROCS]
 	(&nfsrvudphashtbl[nfsrc_hash(xid)])
 #define	NFSRCHASH(xid) \
 	(&nfsrchash_table[nfsrc_hash(xid)].tbl)
+#define	NFSRCAHASH(xid) (&nfsrcahash_table[nfsrc_hash(xid)])
 #define	TRUE	1
 #define	FALSE	0
 #define	NFSRVCACHE_CHECKLEN	100
@@ -281,9 +283,6 @@ static void nfsrc_lock(struct nfsrvcache
 static void nfsrc_unlock(struct nfsrvcache *rp);
 static void nfsrc_wanted(struct nfsrvcache *rp);
 static void nfsrc_freecache(struct nfsrvcache *rp);
-static void nfsrc_trimcache(u_int64_t, struct socket *);
-static int nfsrc_activesocket(struct nfsrvcache *rp, u_int64_t,
-    struct socket *);
 static int nfsrc_getlenandcksum(mbuf_t m1, u_int16_t *cksum);
 static void nfsrc_marksametcpconn(u_int64_t);
 
@@ -314,6 +313,7 @@ nfsrvd_initcache(void)
 	for (i = 0; i < NFSRVCACHE_HASHSIZE; i++) {
 		LIST_INIT(&nfsrvudphashtbl[i]);
 		LIST_INIT(&nfsrchash_table[i].tbl);
+		LIST_INIT(&nfsrcahash_table[i].tbl);
 	}
 	TAILQ_INIT(&nfsrvudplru);
 	nfsrc_tcpsavedreplies = 0;
@@ -325,10 +325,9 @@ nfsrvd_initcache(void)
 /*
  * Get a cache entry for this request. Basically just malloc a new one
  * and then call nfsrc_getudp() or nfsrc_gettcp() to do the rest.
- * Call nfsrc_trimcache() to clean up the cache before returning.
  */
 APPLESTATIC int
-nfsrvd_getcache(struct nfsrv_descript *nd, struct socket *so)
+nfsrvd_getcache(struct nfsrv_descript *nd)
 {
 	struct nfsrvcache *newrp;
 	int ret;
@@ -356,7 +355,6 @@ nfsrvd_getcache(struct nfsrv_descript *n
 	} else {
 		ret = nfsrc_gettcp(nd, newrp);
 	}
-	nfsrc_trimcache(nd->nd_sockref, so);
 	NFSEXITCODE2(0, nd);
 	return (ret);
 }
@@ -456,7 +454,7 @@ out:
  * Update a request cache entry after the rpc has been done
  */
 APPLESTATIC struct nfsrvcache *
-nfsrvd_updatecache(struct nfsrv_descript *nd, struct socket *so)
+nfsrvd_updatecache(struct nfsrv_descript *nd)
 {
 	struct nfsrvcache *rp;
 	struct nfsrvcache *retrp = NULL;
@@ -549,7 +547,6 @@ nfsrvd_updatecache(struct nfsrv_descript
 	}
 
 out:
-	nfsrc_trimcache(nd->nd_sockref, so);
 	NFSEXITCODE2(0, nd);
 	return (retrp);
 }
@@ -575,30 +572,22 @@ nfsrvd_delcache(struct nfsrvcache *rp)
 
 /*
  * Called after nfsrvd_updatecache() once the reply is sent, to update
- * the entry for nfsrc_activesocket() and unlock it. The argument is
+ * the entry's sequence number and unlock it. The argument is
  * the pointer returned by nfsrvd_updatecache().
  */
 APPLESTATIC void
-nfsrvd_sentcache(struct nfsrvcache *rp, struct socket *so, int err)
+nfsrvd_sentcache(struct nfsrvcache *rp, uint32_t seq)
 {
-	tcp_seq tmp_seq;
-	struct mtx *mutex;
+	struct nfsrchash_bucket *hbp;
 
-	mutex = nfsrc_cachemutex(rp);
-	if (!(rp->rc_flag & RC_LOCKED))
-		panic("nfsrvd_sentcache not locked");
-	if (!err) {
-		if ((so->so_proto->pr_domain->dom_family != AF_INET &&
-		     so->so_proto->pr_domain->dom_family != AF_INET6) ||
-		     so->so_proto->pr_protocol != IPPROTO_TCP)
-			panic("nfs sent cache");
-		if (nfsrv_getsockseqnum(so, &tmp_seq)) {
-			mtx_lock(mutex);
-			rp->rc_tcpseq = tmp_seq;
-			rp->rc_flag |= RC_TCPSEQ;
-			mtx_unlock(mutex);
-		}
-	}
+	KASSERT(rp->rc_flag & RC_LOCKED, ("nfsrvd_sentcache not locked"));
+	hbp = NFSRCAHASH(rp->rc_sockref);
+	mtx_lock(&hbp->mtx);
+	rp->rc_tcpseq = seq;
+	if (rp->rc_acked != RC_NO_ACK)
+		LIST_INSERT_HEAD(&hbp->tbl, rp, rc_ahash);
+	rp->rc_acked = RC_NO_ACK;
+	mtx_unlock(&hbp->mtx);
 	nfsrc_unlock(rp);
 }
 
@@ -790,11 +779,18 @@ nfsrc_wanted(struct nfsrvcache *rp)
 static void
 nfsrc_freecache(struct nfsrvcache *rp)
 {
+	struct nfsrchash_bucket *hbp;
 
 	LIST_REMOVE(rp, rc_hash);
 	if (rp->rc_flag & RC_UDP) {
 		TAILQ_REMOVE(&nfsrvudplru, rp, rc_lru);
 		nfsrc_udpcachesize--;
+	} else if (rp->rc_acked != RC_NO_SEQ) {
+		hbp = NFSRCAHASH(rp->rc_sockref);
+		mtx_lock(&hbp->mtx);
+		if (rp->rc_acked == RC_NO_ACK)
+			LIST_REMOVE(rp, rc_ahash);
+		mtx_unlock(&hbp->mtx);
 	}
 	nfsrc_wanted(rp);
 	if (rp->rc_flag & RC_REPMBUF) {
@@ -836,14 +832,32 @@ nfsrvd_cleancache(void)
 /*
  * The basic rule is to get rid of entries that are expired.
  */
-static void
-nfsrc_trimcache(u_int64_t sockref, struct socket *so)
+void
+nfsrc_trimcache(u_int64_t sockref, uint32_t snd_una, int final)
 {
+	struct nfsrchash_bucket *hbp;
 	struct nfsrvcache *rp, *nextrp;
-	int i, j, k, tto, time_histo[HISTSIZE];
+	int force, lastslot, i, j, k, tto, time_histo[HISTSIZE];
 	time_t thisstamp;
 	static time_t udp_lasttrim = 0, tcp_lasttrim = 0;
-	static int onethread = 0;
+	static int onethread = 0, oneslot = 0;
+
+	if (sockref != 0) {
+		hbp = NFSRCAHASH(sockref);
+		mtx_lock(&hbp->mtx);
+		LIST_FOREACH_SAFE(rp, &hbp->tbl, rc_ahash, nextrp) {
+			if (sockref == rp->rc_sockref) {
+				if (SEQ_GEQ(snd_una, rp->rc_tcpseq)) {
+					rp->rc_acked = RC_ACK;
+					LIST_REMOVE(rp, rc_ahash);
+				} else if (final) {
+					rp->rc_acked = RC_NACK;
+					LIST_REMOVE(rp, rc_ahash);
+				}
+			}
+		}
+		mtx_unlock(&hbp->mtx);
+	}
 
 	if (atomic_cmpset_acq_int(&onethread, 0, 1) == 0)
 		return;
@@ -864,13 +878,28 @@ nfsrc_trimcache(u_int64_t sockref, struc
 	}
 	if (NFSD_MONOSEC != tcp_lasttrim ||
 	    nfsrc_tcpsavedreplies >= nfsrc_tcphighwater) {
-		for (i = 0; i < HISTSIZE; i++)
-			time_histo[i] = 0;
+		force = nfsrc_tcphighwater / 4;
+		if (force > 0 &&
+		    nfsrc_tcpsavedreplies + force >= nfsrc_tcphighwater) {
+			for (i = 0; i < HISTSIZE; i++)
+				time_histo[i] = 0;
+			i = 0;
+			lastslot = NFSRVCACHE_HASHSIZE;
+		} else {
+			force = 0;
+			if (NFSD_MONOSEC != tcp_lasttrim) {
+				i = 0;
+				lastslot = NFSRVCACHE_HASHSIZE - 1;
+			} else {
+				lastslot = i = oneslot;
+				if (++oneslot >= NFSRVCACHE_HASHSIZE)
+					oneslot = 0;
+			}
+		}
 		tto = nfsrc_tcptimeout;
-		for (i = 0; i < NFSRVCACHE_HASHSIZE; i++) {
+		tcp_lasttrim = NFSD_MONOSEC;
+		for (; i <= lastslot; i++) {
 			mtx_lock(&nfsrchash_table[i].mtx);
-			if (i == 0)
-				tcp_lasttrim = NFSD_MONOSEC;
 			LIST_FOREACH_SAFE(rp, &nfsrchash_table[i].tbl, rc_hash,
 			    nextrp) {
 				if (!(rp->rc_flag &
@@ -878,12 +907,12 @@ nfsrc_trimcache(u_int64_t sockref, struc
 				     && rp->rc_refcnt == 0) {
 					if ((rp->rc_flag & RC_REFCNT) ||
 					    tcp_lasttrim > rp->rc_timestamp ||
-					    nfsrc_activesocket(rp, sockref, so)) {
+					    rp->rc_acked == RC_ACK) {
 						nfsrc_freecache(rp);
 						continue;
 					}
 
-					if (nfsrc_tcphighwater == 0)
+					if (force == 0)
 						continue;
 					/*
 					 * The timestamps range from roughly the
@@ -903,8 +932,7 @@ nfsrc_trimcache(u_int64_t sockref, struc
 			}
 			mtx_unlock(&nfsrchash_table[i].mtx);
 		}
-		j = nfsrc_tcphighwater / 5;	/* 20% of it */
-		if (j > 0 && (nfsrc_tcpsavedreplies + j) > nfsrc_tcphighwater) {
+		if (force) {
 			/*
 			 * Trim some more with a smaller timeout of as little
 			 * as 20% of nfsrc_tcptimeout to try and get below
@@ -913,7 +941,7 @@ nfsrc_trimcache(u_int64_t sockref, struc
 			k = 0;
 			for (i = 0; i < (HISTSIZE - 2); i++) {
 				k += time_histo[i];
-				if (k > j)
+				if (k > force)
 					break;
 			}
 			k = tto * (i + 1) / HISTSIZE;
@@ -929,8 +957,7 @@ nfsrc_trimcache(u_int64_t sockref, struc
 					     && rp->rc_refcnt == 0
 					     && ((rp->rc_flag & RC_REFCNT) ||
 						 thisstamp > rp->rc_timestamp ||
-						 nfsrc_activesocket(rp, sockref,
-						    so)))
+						 rp->rc_acked == RC_ACK))
 						nfsrc_freecache(rp);
 				}
 				mtx_unlock(&nfsrchash_table[i].mtx);
@@ -975,28 +1002,6 @@ nfsrvd_derefcache(struct nfsrvcache *rp)
 }
 
 /*
- * Check to see if the socket is active.
- * Return 1 if the reply has been received/acknowledged by the client,
- * 0 otherwise.
- * XXX - Uses tcp internals.
- */
-static int
-nfsrc_activesocket(struct nfsrvcache *rp, u_int64_t cur_sockref,
-    struct socket *cur_so)
-{
-	int ret = 0;
-
-	if (!(rp->rc_flag & RC_TCPSEQ))
-		return (ret);
-	/*
-	 * If the sockref is the same, it is the same TCP connection.
-	 */
-	if (cur_sockref == rp->rc_sockref)
-		ret = nfsrv_checksockseqnum(cur_so, rp->rc_tcpseq);
-	return (ret);
-}
-
-/*
  * Calculate the length of the mbuf list and a checksum on the first up to
  * NFSRVCACHE_CHECKLEN bytes.
  */

Modified: head/sys/fs/nfsserver/nfs_nfsdkrpc.c
==============================================================================
--- head/sys/fs/nfsserver/nfs_nfsdkrpc.c	Fri Jan  3 14:33:25 2014	(r260228)
+++ head/sys/fs/nfsserver/nfs_nfsdkrpc.c	Fri Jan  3 15:09:59 2014	(r260229)
@@ -97,8 +97,8 @@ static int	nfs_maxvers = NFS_VER4;
 SYSCTL_INT(_vfs_nfsd, OID_AUTO, server_max_nfsvers, CTLFLAG_RW,
     &nfs_maxvers, 0, "The highest version of NFS handled by the server");
 
-static int nfs_proc(struct nfsrv_descript *, u_int32_t, struct socket *,
-    u_int64_t, struct nfsrvcache **);
+static int nfs_proc(struct nfsrv_descript *, u_int32_t, SVCXPRT *xprt,
+    struct nfsrvcache **);
 
 extern u_long sb_max_adj;
 extern int newnfs_numnfsd;
@@ -251,8 +251,7 @@ nfssvc_program(struct svc_req *rqst, SVC
 			}
 		}
 
-		cacherep = nfs_proc(&nd, rqst->rq_xid, xprt->xp_socket,
-		    xprt->xp_sockref, &rp);
+		cacherep = nfs_proc(&nd, rqst->rq_xid, xprt, &rp);
 		NFSLOCKV4ROOTMUTEX();
 		nfsv4_relref(&nfsd_suspend_lock);
 		NFSUNLOCKV4ROOTMUTEX();
@@ -287,8 +286,10 @@ nfssvc_program(struct svc_req *rqst, SVC
 	} else if (!svc_sendreply_mbuf(rqst, nd.nd_mreq)) {
 		svcerr_systemerr(rqst);
 	}
-	if (rp != NULL)
-		nfsrvd_sentcache(rp, xprt->xp_socket, 0);
+	if (rp != NULL) {
+		if (rqst->rq_reply_seq != 0 || SVC_ACK(xprt, NULL))
+			nfsrvd_sentcache(rp, rqst->rq_reply_seq);
+	}
 	svc_freereq(rqst);
 
 out:
@@ -300,11 +301,12 @@ out:
  * Return the appropriate cache response.
  */
 static int
-nfs_proc(struct nfsrv_descript *nd, u_int32_t xid, struct socket *so,
-    u_int64_t sockref, struct nfsrvcache **rpp)
+nfs_proc(struct nfsrv_descript *nd, u_int32_t xid, SVCXPRT *xprt,
+    struct nfsrvcache **rpp)
 {
 	struct thread *td = curthread;
 	int cacherep = RC_DOIT, isdgram;
+	uint32_t ack;
 
 	*rpp = NULL;
 	if (nd->nd_nam2 == NULL) {
@@ -336,8 +338,11 @@ nfs_proc(struct nfsrv_descript *nd, u_in
 			nd->nd_flag |= ND_SAMETCPCONN;
 		nd->nd_retxid = xid;
 		nd->nd_tcpconntime = NFSD_MONOSEC;
-		nd->nd_sockref = sockref;
-		cacherep = nfsrvd_getcache(nd, so);
+		nd->nd_sockref = xprt->xp_sockref;
+		cacherep = nfsrvd_getcache(nd);
+		ack = 0;
+		SVC_ACK(xprt, &ack);
+		nfsrc_trimcache(xprt->xp_sockref, ack, 0);
 	}
 
 	/*
@@ -352,13 +357,23 @@ nfs_proc(struct nfsrv_descript *nd, u_in
 			cacherep = RC_DROPIT;
 		else
 			cacherep = RC_REPLY;
-		*rpp = nfsrvd_updatecache(nd, so);
+		*rpp = nfsrvd_updatecache(nd);
 	}
 
 	NFSEXITCODE2(0, nd);
 	return (cacherep);
 }
 
+static void
+nfssvc_loss(SVCXPRT *xprt)
+{
+	uint32_t ack;
+
+	ack = 0;
+	SVC_ACK(xprt, &ack);
+	nfsrc_trimcache(xprt->xp_sockref, ack, 1);
+}
+
 /*
  * Adds a socket to the list for servicing by nfsds.
  */
@@ -399,6 +414,8 @@ nfsrvd_addsock(struct file *fp)
 		if (nfs_maxvers >= NFS_VER4)
 			svc_reg(xprt, NFS_PROG, NFS_VER4, nfssvc_program,
 			    NULL);
+		if (so->so_type == SOCK_STREAM)
+			svc_loss_reg(xprt, nfssvc_loss);
 		SVC_RELEASE(xprt);
 	}
 

Modified: head/sys/fs/nfsserver/nfs_nfsdport.c
==============================================================================
--- head/sys/fs/nfsserver/nfs_nfsdport.c	Fri Jan  3 14:33:25 2014	(r260228)
+++ head/sys/fs/nfsserver/nfs_nfsdport.c	Fri Jan  3 15:09:59 2014	(r260229)
@@ -61,6 +61,7 @@ extern struct nfsv4lock nfsd_suspend_loc
 struct vfsoptlist nfsv4root_opt, nfsv4root_newopt;
 NFSDLOCKMUTEX;
 struct nfsrchash_bucket nfsrchash_table[NFSRVCACHE_HASHSIZE];
+struct nfsrchash_bucket nfsrcahash_table[NFSRVCACHE_HASHSIZE];
 struct mtx nfsrc_udpmtx;
 struct mtx nfs_v4root_mutex;
 struct nfsrvfh nfs_rootfh, nfs_pubfh;
@@ -2881,40 +2882,6 @@ out:
 }
 
 /*
- * Get the tcp socket sequence numbers we need.
- * (Maybe this should be moved to the tcp sources?)
- */
-int
-nfsrv_getsocksndseq(struct socket *so, tcp_seq *maxp, tcp_seq *unap)
-{
-	struct inpcb *inp;
-	struct tcpcb *tp;
-	int error = 0;
-
-	inp = sotoinpcb(so);
-	KASSERT(inp != NULL, ("nfsrv_getsocksndseq: inp == NULL"));
-	INP_RLOCK(inp);
-	if (inp->inp_flags & (INP_TIMEWAIT | INP_DROPPED)) {
-		INP_RUNLOCK(inp);
-		error = EPIPE;
-		goto out;
-	}
-	tp = intotcpcb(inp);
-	if (tp->t_state != TCPS_ESTABLISHED) {
-		INP_RUNLOCK(inp);
-		error = EPIPE;
-		goto out;
-	}
-	*maxp = tp->snd_max;
-	*unap = tp->snd_una;
-	INP_RUNLOCK(inp);
-
-out:
-	NFSEXITCODE(error);
-	return (error);
-}
-
-/*
  * This function needs to test to see if the system is near its limit
  * for memory allocation via malloc() or mget() and return True iff
  * either of these resources are near their limit.
@@ -3340,6 +3307,11 @@ nfsd_modevent(module_t mod, int type, vo
 			    i);
 			mtx_init(&nfsrchash_table[i].mtx,
 			    nfsrchash_table[i].lock_name, NULL, MTX_DEF);
+			snprintf(nfsrcahash_table[i].lock_name,
+			    sizeof(nfsrcahash_table[i].lock_name), "nfsrc_tcpa%d",
+			    i);
+			mtx_init(&nfsrcahash_table[i].mtx,
+			    nfsrcahash_table[i].lock_name, NULL, MTX_DEF);
 		}
 		mtx_init(&nfsrc_udpmtx, "nfs_udpcache_mutex", NULL, MTX_DEF);
 		mtx_init(&nfs_v4root_mutex, "nfs_v4root_mutex", NULL, MTX_DEF);
@@ -3385,8 +3357,10 @@ nfsd_modevent(module_t mod, int type, vo
 			svcpool_destroy(nfsrvd_pool);
 
 		/* and get rid of the locks */
-		for (i = 0; i < NFSRVCACHE_HASHSIZE; i++)
+		for (i = 0; i < NFSRVCACHE_HASHSIZE; i++) {
 			mtx_destroy(&nfsrchash_table[i].mtx);
+			mtx_destroy(&nfsrcahash_table[i].mtx);
+		}
 		mtx_destroy(&nfsrc_udpmtx);
 		mtx_destroy(&nfs_v4root_mutex);
 		mtx_destroy(&nfsv4root_mnt.mnt_mtx);

Modified: head/sys/fs/nfsserver/nfs_nfsdsubs.c
==============================================================================
--- head/sys/fs/nfsserver/nfs_nfsdsubs.c	Fri Jan  3 14:33:25 2014	(r260228)
+++ head/sys/fs/nfsserver/nfs_nfsdsubs.c	Fri Jan  3 15:09:59 2014	(r260229)
@@ -1987,47 +1987,6 @@ nfsmout:
 	return (error);
 }
 
-/*
- * Check the tcp socket sequence number has been acknowledged.
- */
-int
-nfsrv_checksockseqnum(struct socket *so, tcp_seq tcpseqval)
-{
-	tcp_seq maxseq, unaseq;
-	int error, ret;
-
-	error = nfsrv_getsocksndseq(so, &maxseq, &unaseq);
-	if (error)
-		return (0);
-	ret = SEQ_GEQ(unaseq, tcpseqval);
-	return (ret);
-}
-
-/*
- * Get the tcp sequence number to be acknowledged.
- */
-int
-nfsrv_getsockseqnum(struct socket *so, tcp_seq *tcpseqp)
-{
-	tcp_seq maxseq, unaseq;
-	u_int sbcc;
-	int error;
-
-	sbcc = so->so_snd.sb_cc;
-	error = nfsrv_getsocksndseq(so, &maxseq, &unaseq);
-	if (error)
-		return (0);
-	/*
-	 * Set the seq# to a value that will
-	 * be at least the end of the reply.
-	 * When this sequence# is acknowledged
-	 * by the client, the client has received
-	 * the reply.
-	 */
-	*tcpseqp = sbcc + maxseq;
-	return (1);
-}
-
 void
 nfsd_init(void)
 {

Modified: head/sys/rpc/svc.c
==============================================================================
--- head/sys/rpc/svc.c	Fri Jan  3 14:33:25 2014	(r260228)
+++ head/sys/rpc/svc.c	Fri Jan  3 15:09:59 2014	(r260229)
@@ -56,6 +56,7 @@ __FBSDID("$FreeBSD$");
 #include <sys/queue.h>
 #include <sys/socketvar.h>
 #include <sys/systm.h>
+#include <sys/sx.h>
 #include <sys/ucred.h>
 
 #include <rpc/rpc.h>
@@ -93,6 +94,7 @@ svcpool_create(const char *name, struct 
 	TAILQ_INIT(&pool->sp_xlist);
 	TAILQ_INIT(&pool->sp_active);
 	TAILQ_INIT(&pool->sp_callouts);
+	TAILQ_INIT(&pool->sp_lcallouts);
 	LIST_INIT(&pool->sp_threads);
 	LIST_INIT(&pool->sp_idlethreads);
 	pool->sp_minthreads = 1;
@@ -158,6 +160,7 @@ svcpool_destroy(SVCPOOL *pool)
 {
 	SVCXPRT *xprt, *nxprt;
 	struct svc_callout *s;
+	struct svc_loss_callout *sl;
 	struct svcxprt_list cleanup;
 
 	TAILQ_INIT(&cleanup);
@@ -169,12 +172,16 @@ svcpool_destroy(SVCPOOL *pool)
 		TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
 	}
 
-	while (TAILQ_FIRST(&pool->sp_callouts)) {
-		s = TAILQ_FIRST(&pool->sp_callouts);
+	while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
 		mtx_unlock(&pool->sp_lock);
 		svc_unreg(pool, s->sc_prog, s->sc_vers);
 		mtx_lock(&pool->sp_lock);
 	}
+	while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) {
+		mtx_unlock(&pool->sp_lock);
+		svc_loss_unreg(pool, sl->slc_dispatch);
+		mtx_lock(&pool->sp_lock);
+	}
 	mtx_unlock(&pool->sp_lock);
 
 	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
@@ -511,6 +518,55 @@ svc_unreg(SVCPOOL *pool, const rpcprog_t
 	mtx_unlock(&pool->sp_lock);
 }
 
+/*
+ * Add a service connection loss program to the callout list.
+ * The dispatch routine will be called when some transport in this pool dies.
+ */
+bool_t
+svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *))
+{
+	SVCPOOL *pool = xprt->xp_pool;
+	struct svc_loss_callout *s;
+
+	mtx_lock(&pool->sp_lock);
+	TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
+		if (s->slc_dispatch == dispatch)
+			break;
+	}
+	if (s != NULL) {
+		mtx_unlock(&pool->sp_lock);
+		return (TRUE);
+	}
+	s = malloc(sizeof(*s), M_RPC, M_NOWAIT);
+	if (s == NULL) {
+		mtx_unlock(&pool->sp_lock);
+		return (FALSE);
+	}
+	s->slc_dispatch = dispatch;
+	TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link);
+	mtx_unlock(&pool->sp_lock);
+	return (TRUE);
+}
+
+/*
+ * Remove a service connection loss program from the callout list.
+ */
+void
+svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *))
+{
+	struct svc_loss_callout *s;
+
+	mtx_lock(&pool->sp_lock);
+	TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
+		if (s->slc_dispatch == dispatch) {
+			TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link);
+			free(s, M_RPC);
+			break;
+		}
+	}
+	mtx_unlock(&pool->sp_lock);
+}
+
 /* ********************** CALLOUT list related stuff ************* */
 
 /*
@@ -554,7 +610,7 @@ svc_sendreply_common(struct svc_req *rqs
 	if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
 		return (FALSE);
 
-	ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body); 
+	ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq);
 	if (rqstp->rq_addr) {
 		free(rqstp->rq_addr, M_SONAME);
 		rqstp->rq_addr = NULL;
@@ -803,6 +859,7 @@ svc_getreq(SVCXPRT *xprt, struct svc_req
 	struct svc_req *r;
 	struct rpc_msg msg;
 	struct mbuf *args;
+	struct svc_loss_callout *s;
 	enum xprt_stat stat;
 
 	/* now receive msgs from xprtprt (support batch calls) */
@@ -831,7 +888,7 @@ svc_getreq(SVCXPRT *xprt, struct svc_req
 				break;
 			case RS_DONE:
 				SVC_REPLY(xprt, &repmsg, r->rq_addr,
-				    repbody);
+				    repbody, &r->rq_reply_seq);
 				if (r->rq_addr) {
 					free(r->rq_addr, M_SONAME);
 					r->rq_addr = NULL;
@@ -881,6 +938,8 @@ call_done:
 		r = NULL;
 	}
 	if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
+		TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link)
+			(*s->slc_dispatch)(xprt);
 		xprt_unregister(xprt);
 	}
 

Modified: head/sys/rpc/svc.h
==============================================================================
--- head/sys/rpc/svc.h	Fri Jan  3 14:33:25 2014	(r260228)
+++ head/sys/rpc/svc.h	Fri Jan  3 15:09:59 2014	(r260229)
@@ -103,9 +103,11 @@ struct xp_ops {
 	    struct sockaddr **, struct mbuf **);
 	/* get transport status */
 	enum xprt_stat (*xp_stat)(struct __rpc_svcxprt *);
+	/* get transport acknowledge sequence */
+	bool_t (*xp_ack)(struct __rpc_svcxprt *, uint32_t *);
 	/* send reply */
 	bool_t	(*xp_reply)(struct __rpc_svcxprt *, struct rpc_msg *,
-	    struct sockaddr *, struct mbuf *);
+	    struct sockaddr *, struct mbuf *, uint32_t *);
 	/* destroy this struct */
 	void	(*xp_destroy)(struct __rpc_svcxprt *);
 	/* catch-all function */
@@ -166,6 +168,8 @@ typedef struct __rpc_svcxprt {
 	time_t		xp_lastactive;	/* time of last RPC */
 	u_int64_t	xp_sockref;	/* set by nfsv4 to identify socket */
 	int		xp_upcallset;	/* socket upcall is set up */
+	uint32_t	xp_snd_cnt;	/* # of bytes sent to socket */
+	struct sx	xp_snd_lock;	/* protects xp_snd_cnt & sb_cc */
 #else
 	int		xp_fd;
 	u_short		xp_port;	 /* associated port number */
@@ -230,6 +234,17 @@ struct svc_callout {
 };
 TAILQ_HEAD(svc_callout_list, svc_callout);
 
+/*
+ * The services connection loss list
+ * Each dispatch routine on this list is called with the transport
+ * (SVCXPRT *) once that transport is found to have died.
+ */
+struct svc_loss_callout {
+	TAILQ_ENTRY(svc_loss_callout) slc_link;
+	void		    (*slc_dispatch)(SVCXPRT *);
+};
+TAILQ_HEAD(svc_loss_callout_list, svc_loss_callout);
+
 struct __rpc_svcthread;
 
 /*
@@ -253,6 +268,7 @@ struct svc_req {
 	void		*rq_p1;		/* application workspace */
 	int		rq_p2;		/* application workspace */
 	uint64_t	rq_p3;		/* application workspace */
+	uint32_t	rq_reply_seq;	/* reply socket sequence # */
 	char		rq_credarea[3*MAX_AUTH_BYTES];
 };
 STAILQ_HEAD(svc_reqlist, svc_req);
@@ -318,6 +334,7 @@ typedef struct __rpc_svcpool {
 	struct svcxprt_list sp_xlist;	/* all transports in the pool */
 	struct svcxprt_list sp_active;	/* transports needing service */
 	struct svc_callout_list sp_callouts; /* (prog,vers)->dispatch list */
+	struct svc_loss_callout_list sp_lcallouts; /* loss->dispatch list */
 	struct svcthread_list sp_threads; /* service threads */
 	struct svcthread_list sp_idlethreads; /* idle service threads */
 	int		sp_minthreads;	/* minimum service thread count */
@@ -393,8 +410,12 @@ struct svc_req {
 #define SVC_STAT(xprt)					\
 	(*(xprt)->xp_ops->xp_stat)(xprt)
 
-#define SVC_REPLY(xprt, msg, addr, m)			\
-	(*(xprt)->xp_ops->xp_reply) ((xprt), (msg), (addr), (m))
+#define SVC_ACK(xprt, ack)				\
+	((xprt)->xp_ops->xp_ack == NULL ? FALSE :	\
+	    ((ack) == NULL ? TRUE : (*(xprt)->xp_ops->xp_ack)((xprt), (ack))))
+
+#define SVC_REPLY(xprt, msg, addr, m, seq)			\
+	(*(xprt)->xp_ops->xp_reply) ((xprt), (msg), (addr), (m), (seq))
 
 #define SVC_DESTROY(xprt)				\
 	(*(xprt)->xp_ops->xp_destroy)(xprt)
@@ -496,6 +517,30 @@ extern void	svc_unreg(const rpcprog_t, c
 __END_DECLS
 
 /*
+ * Service connection loss registration
+ *
+ * svc_loss_reg(xprt, dispatch)
+ *	const SVCXPRT *xprt;
+ *	const void (*dispatch)();
+ */
+
+__BEGIN_DECLS
+extern bool_t	svc_loss_reg(SVCXPRT *, void (*)(SVCXPRT *));
+__END_DECLS
+
+/*
+ * Service connection loss un-registration
+ *
+ *	svc_loss_unreg(pool, dispatch)
+ *	SVCPOOL *pool;
+ *	const void (*dispatch)();
+ */
+
+__BEGIN_DECLS
+extern void	svc_loss_unreg(SVCPOOL *, void (*)(SVCXPRT *));
+__END_DECLS
+
+/*
  * Transport registration.
  *
  * xprt_register(xprt)

Modified: head/sys/rpc/svc_dg.c
==============================================================================
--- head/sys/rpc/svc_dg.c	Fri Jan  3 14:33:25 2014	(r260228)
+++ head/sys/rpc/svc_dg.c	Fri Jan  3 15:09:59 2014	(r260229)
@@ -66,7 +66,7 @@ static enum xprt_stat svc_dg_stat(SVCXPR
 static bool_t svc_dg_recv(SVCXPRT *, struct rpc_msg *,
     struct sockaddr **, struct mbuf **);
 static bool_t svc_dg_reply(SVCXPRT *, struct rpc_msg *,
-    struct sockaddr *, struct mbuf *);
+    struct sockaddr *, struct mbuf *, uint32_t *);
 static void svc_dg_destroy(SVCXPRT *);
 static bool_t svc_dg_control(SVCXPRT *, const u_int, void *);
 static int svc_dg_soupcall(struct socket *so, void *arg, int waitflag);
@@ -230,7 +230,7 @@ svc_dg_recv(SVCXPRT *xprt, struct rpc_ms
 
 static bool_t
 svc_dg_reply(SVCXPRT *xprt, struct rpc_msg *msg,
-    struct sockaddr *addr, struct mbuf *m)
+    struct sockaddr *addr, struct mbuf *m, uint32_t *seq)
 {
 	XDR xdrs;
 	struct mbuf *mrep;

Modified: head/sys/rpc/svc_vc.c
==============================================================================
--- head/sys/rpc/svc_vc.c	Fri Jan  3 14:33:25 2014	(r260228)
+++ head/sys/rpc/svc_vc.c	Fri Jan  3 15:09:59 2014	(r260229)
@@ -76,10 +76,11 @@ static void svc_vc_rendezvous_destroy(SV
 static bool_t svc_vc_null(void);
 static void svc_vc_destroy(SVCXPRT *);
 static enum xprt_stat svc_vc_stat(SVCXPRT *);
+static bool_t svc_vc_ack(SVCXPRT *, uint32_t *);
 static bool_t svc_vc_recv(SVCXPRT *, struct rpc_msg *,
     struct sockaddr **, struct mbuf **);
 static bool_t svc_vc_reply(SVCXPRT *, struct rpc_msg *,
-    struct sockaddr *, struct mbuf *);
+    struct sockaddr *, struct mbuf *, uint32_t *seq);
 static bool_t svc_vc_control(SVCXPRT *xprt, const u_int rq, void *in);
 static bool_t svc_vc_rendezvous_control (SVCXPRT *xprt, const u_int rq,
     void *in);
@@ -88,7 +89,7 @@ static enum xprt_stat svc_vc_backchannel
 static bool_t svc_vc_backchannel_recv(SVCXPRT *, struct rpc_msg *,
     struct sockaddr **, struct mbuf **);
 static bool_t svc_vc_backchannel_reply(SVCXPRT *, struct rpc_msg *,
-    struct sockaddr *, struct mbuf *);
+    struct sockaddr *, struct mbuf *, uint32_t *);
 static bool_t svc_vc_backchannel_control(SVCXPRT *xprt, const u_int rq,
     void *in);
 static SVCXPRT *svc_vc_create_conn(SVCPOOL *pool, struct socket *so,
@@ -100,7 +101,7 @@ static struct xp_ops svc_vc_rendezvous_o
 	.xp_recv =	svc_vc_rendezvous_recv,
 	.xp_stat =	svc_vc_rendezvous_stat,
 	.xp_reply =	(bool_t (*)(SVCXPRT *, struct rpc_msg *,
-		struct sockaddr *, struct mbuf *))svc_vc_null,
+		struct sockaddr *, struct mbuf *, uint32_t *))svc_vc_null,
 	.xp_destroy =	svc_vc_rendezvous_destroy,
 	.xp_control =	svc_vc_rendezvous_control
 };
@@ -108,6 +109,7 @@ static struct xp_ops svc_vc_rendezvous_o
 static struct xp_ops svc_vc_ops = {
 	.xp_recv =	svc_vc_recv,
 	.xp_stat =	svc_vc_stat,
+	.xp_ack =	svc_vc_ack,
 	.xp_reply =	svc_vc_reply,
 	.xp_destroy =	svc_vc_destroy,
 	.xp_control =	svc_vc_control
@@ -159,6 +161,7 @@ svc_vc_create(SVCPOOL *pool, struct sock
 
 	xprt = svc_xprt_alloc();
 	sx_init(&xprt->xp_lock, "xprt->xp_lock");
+	sx_init(&xprt->xp_snd_lock, "xprt->xp_snd_lock");
 	xprt->xp_pool = pool;
 	xprt->xp_socket = so;
 	xprt->xp_p1 = NULL;
@@ -184,8 +187,11 @@ svc_vc_create(SVCPOOL *pool, struct sock
 
 	return (xprt);
 cleanup_svc_vc_create:
-	if (xprt)
+	if (xprt) {
+		sx_destroy(&xprt->xp_snd_lock);
+		sx_destroy(&xprt->xp_lock);
 		svc_xprt_free(xprt);
+	}
 	return (NULL);
 }
 
@@ -231,6 +237,7 @@ svc_vc_create_conn(SVCPOOL *pool, struct
 
 	xprt = svc_xprt_alloc();
 	sx_init(&xprt->xp_lock, "xprt->xp_lock");
+	sx_init(&xprt->xp_snd_lock, "xprt->xp_snd_lock");
 	xprt->xp_pool = pool;
 	xprt->xp_socket = so;
 	xprt->xp_p1 = cd;
@@ -270,7 +277,9 @@ svc_vc_create_conn(SVCPOOL *pool, struct
 	return (xprt);
 cleanup_svc_vc_create:
 	if (xprt) {
-		mem_free(xprt, sizeof(*xprt));
+		sx_destroy(&xprt->xp_snd_lock);
+		sx_destroy(&xprt->xp_lock);
+		svc_xprt_free(xprt);
 	}
 	if (cd)
 		mem_free(cd, sizeof(*cd));
@@ -291,6 +300,7 @@ svc_vc_create_backchannel(SVCPOOL *pool)
 
 	xprt = svc_xprt_alloc();
 	sx_init(&xprt->xp_lock, "xprt->xp_lock");
+	sx_init(&xprt->xp_snd_lock, "xprt->xp_snd_lock");
 	xprt->xp_pool = pool;
 	xprt->xp_socket = NULL;
 	xprt->xp_p1 = cd;
@@ -451,7 +461,6 @@ svc_vc_destroy_common(SVCXPRT *xprt)
 	}
 	SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv);
 
-	sx_destroy(&xprt->xp_lock);
 	if (xprt->xp_socket)
 		(void)soclose(xprt->xp_socket);
 
@@ -537,6 +546,16 @@ svc_vc_stat(SVCXPRT *xprt)
 	return (XPRT_IDLE);
 }
 
+static bool_t
+svc_vc_ack(SVCXPRT *xprt, uint32_t *ack)
+{
+
+	sx_slock(&xprt->xp_snd_lock);
+	*ack = xprt->xp_snd_cnt - xprt->xp_socket->so_snd.sb_cc;
+	sx_sunlock(&xprt->xp_snd_lock);
+	return (TRUE);
+}
+
 static enum xprt_stat
 svc_vc_backchannel_stat(SVCXPRT *xprt)
 {
@@ -785,12 +804,12 @@ svc_vc_backchannel_recv(SVCXPRT *xprt, s
 
 static bool_t
 svc_vc_reply(SVCXPRT *xprt, struct rpc_msg *msg,
-    struct sockaddr *addr, struct mbuf *m)
+    struct sockaddr *addr, struct mbuf *m, uint32_t *seq)
 {
 	XDR xdrs;
 	struct mbuf *mrep;
 	bool_t stat = TRUE;
-	int error;
+	int error, len;
 
 	/*
 	 * Leave space for record mark.
@@ -817,14 +836,19 @@ svc_vc_reply(SVCXPRT *xprt, struct rpc_m
 		 * Prepend a record marker containing the reply length.
 		 */
 		M_PREPEND(mrep, sizeof(uint32_t), M_WAITOK);
+		len = mrep->m_pkthdr.len;
 		*mtod(mrep, uint32_t *) =
-			htonl(0x80000000 | (mrep->m_pkthdr.len
-				- sizeof(uint32_t)));
+			htonl(0x80000000 | (len - sizeof(uint32_t)));
+		sx_xlock(&xprt->xp_snd_lock);
 		error = sosend(xprt->xp_socket, NULL, NULL, mrep, NULL,
 		    0, curthread);
 		if (!error) {
+			xprt->xp_snd_cnt += len;
+			if (seq)
+				*seq = xprt->xp_snd_cnt;
 			stat = TRUE;
 		}
+		sx_xunlock(&xprt->xp_snd_lock);
 	} else {
 		m_freem(mrep);
 	}
@@ -837,7 +861,7 @@ svc_vc_reply(SVCXPRT *xprt, struct rpc_m
 
 static bool_t
 svc_vc_backchannel_reply(SVCXPRT *xprt, struct rpc_msg *msg,
-    struct sockaddr *addr, struct mbuf *m)
+    struct sockaddr *addr, struct mbuf *m, uint32_t *seq)
 {
 	struct ct_data *ct;
 	XDR xdrs;


More information about the svn-src-all mailing list