svn commit: r259632 - head/sys/rpc

Alexander Motin mav at FreeBSD.org
Thu Dec 19 21:31:29 UTC 2013


Author: mav
Date: Thu Dec 19 21:31:28 2013
New Revision: 259632
URL: http://svnweb.freebsd.org/changeset/base/259632

Log:
  Rework flow control for connection-oriented (TCP) RPC server.
  
    When processing the receive buffer, write the amount of data expected
  in the present request record into the socket's so_rcv.sb_lowat to make
  the stack aware of our needs.  When processing subsequent upcalls, ignore
  them until the socket collects enough data to be read and processed in
  one turn.
    This change reduces the number of context switches and other operations
  in the RPC stack during large NFS writes (especially via non-Jumbo
  networks) by an order of magnitude.
  
    After processing the current packet, take another look into the pending
  buffer to find out whether the next packet has already been received.
  If not, deactivate this port right there without making the RPC code
  push this port to another thread just to find that there is nothing.
  If the next packet is received partially, also deactivate the port, but
  also update the socket's so_rcv.sb_lowat to not be woken up prematurely.
    This change additionally reduces the number of context switches per NFS
  request by about half.

Modified:
  head/sys/rpc/svc_vc.c

Modified: head/sys/rpc/svc_vc.c
==============================================================================
--- head/sys/rpc/svc_vc.c	Thu Dec 19 21:03:08 2013	(r259631)
+++ head/sys/rpc/svc_vc.c	Thu Dec 19 21:31:28 2013	(r259632)
@@ -381,15 +381,11 @@ svc_vc_rendezvous_recv(SVCXPRT *xprt, st
 		 * We must re-test for new connections after taking
 		 * the lock to protect us in the case where a new
 		 * connection arrives after our call to accept fails
-		 * with EWOULDBLOCK. The pool lock protects us from
-		 * racing the upcall after our TAILQ_EMPTY() call
-		 * returns false.
+		 * with EWOULDBLOCK.
 		 */
 		ACCEPT_LOCK();
-		mtx_lock(&xprt->xp_pool->sp_lock);
 		if (TAILQ_EMPTY(&xprt->xp_socket->so_comp))
-			xprt_inactive_locked(xprt);
-		mtx_unlock(&xprt->xp_pool->sp_lock);
+			xprt_inactive(xprt);
 		ACCEPT_UNLOCK();
 		sx_xunlock(&xprt->xp_lock);
 		return (FALSE);
@@ -526,35 +522,14 @@ static enum xprt_stat
 svc_vc_stat(SVCXPRT *xprt)
 {
 	struct cf_conn *cd;
-	struct mbuf *m;
-	size_t n;
 
 	cd = (struct cf_conn *)(xprt->xp_p1);
 
 	if (cd->strm_stat == XPRT_DIED)
 		return (XPRT_DIED);
 
-	/*
-	 * Return XPRT_MOREREQS if we have buffered data and we are
-	 * mid-record or if we have enough data for a record
-	 * marker. Since this is only a hint, we read mpending and
-	 * resid outside the lock. We do need to take the lock if we
-	 * have to traverse the mbuf chain.
-	 */
-	if (cd->mpending) {
-		if (cd->resid)
-			return (XPRT_MOREREQS);
-		n = 0;
-		sx_xlock(&xprt->xp_lock);
-		m = cd->mpending;
-		while (m && n < sizeof(uint32_t)) {
-			n += m->m_len;
-			m = m->m_next;
-		}
-		sx_xunlock(&xprt->xp_lock);
-		if (n >= sizeof(uint32_t))
-			return (XPRT_MOREREQS);
-	}
+	if (cd->mreq != NULL && cd->resid == 0 && cd->eor)
+		return (XPRT_MOREREQS);
 
 	if (soreadable(xprt->xp_socket))
 		return (XPRT_MOREREQS);
@@ -575,6 +550,78 @@ svc_vc_backchannel_stat(SVCXPRT *xprt)
 	return (XPRT_IDLE);
 }
 
+/*
+ * If we have an mbuf chain in cd->mpending, try to parse a record from it,
+ * leaving the result in cd->mreq. If we don't have a complete record, leave
+ * the partial result in cd->mreq and try to read more from the socket.
+ */
+static void
+svc_vc_process_pending(SVCXPRT *xprt)
+{
+	struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1;
+	struct socket *so = xprt->xp_socket;
+	struct mbuf *m;
+
+	/*
+	 * If cd->resid is non-zero, we have part of the
+	 * record already, otherwise we are expecting a record
+	 * marker.
+	 */
+	if (!cd->resid && cd->mpending) {
+		/*
+		 * See if there is enough data buffered to
+		 * make up a record marker. Make sure we can
+		 * handle the case where the record marker is
+		 * split across more than one mbuf.
+		 */
+		size_t n = 0;
+		uint32_t header;
+
+		m = cd->mpending;
+		while (n < sizeof(uint32_t) && m) {
+			n += m->m_len;
+			m = m->m_next;
+		}
+		if (n < sizeof(uint32_t)) {
+			so->so_rcv.sb_lowat = sizeof(uint32_t) - n;
+			return;
+		}
+		m_copydata(cd->mpending, 0, sizeof(header),
+		    (char *)&header);
+		header = ntohl(header);
+		cd->eor = (header & 0x80000000) != 0;
+		cd->resid = header & 0x7fffffff;
+		m_adj(cd->mpending, sizeof(uint32_t));
+	}
+
+	/*
+	 * Start pulling off mbufs from cd->mpending
+	 * until we either have a complete record or
+	 * we run out of data. We use m_split to pull
+	 * data - it will pull as much as possible and
+	 * split the last mbuf if necessary.
+	 */
+	while (cd->mpending && cd->resid) {
+		m = cd->mpending;
+		if (cd->mpending->m_next
+		    || cd->mpending->m_len > cd->resid)
+			cd->mpending = m_split(cd->mpending,
+			    cd->resid, M_WAITOK);
+		else
+			cd->mpending = NULL;
+		if (cd->mreq)
+			m_last(cd->mreq)->m_next = m;
+		else
+			cd->mreq = m;
+		while (m) {
+			cd->resid -= m->m_len;
+			m = m->m_next;
+		}
+	}
+
+	so->so_rcv.sb_lowat = imax(1, imin(cd->resid, so->so_rcv.sb_hiwat / 2));
+}
+
 static bool_t
 svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg,
     struct sockaddr **addrp, struct mbuf **mp)
@@ -582,6 +629,7 @@ svc_vc_recv(SVCXPRT *xprt, struct rpc_ms
 	struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1;
 	struct uio uio;
 	struct mbuf *m;
+	struct socket* so = xprt->xp_socket;
 	XDR xdrs;
 	int error, rcvflag;
 
@@ -592,99 +640,40 @@ svc_vc_recv(SVCXPRT *xprt, struct rpc_ms
 	sx_xlock(&xprt->xp_lock);
 
 	for (;;) {
-		/*
-		 * If we have an mbuf chain in cd->mpending, try to parse a
-		 * record from it, leaving the result in cd->mreq. If we don't
-		 * have a complete record, leave the partial result in
-		 * cd->mreq and try to read more from the socket.
-		 */
-		if (cd->mpending) {
-			/*
-			 * If cd->resid is non-zero, we have part of the
-			 * record already, otherwise we are expecting a record
-			 * marker.
-			 */
-			if (!cd->resid) {
-				/*
-				 * See if there is enough data buffered to
-				 * make up a record marker. Make sure we can
-				 * handle the case where the record marker is
-				 * split across more than one mbuf.
-				 */
-				size_t n = 0;
-				uint32_t header;
-
-				m = cd->mpending;
-				while (n < sizeof(uint32_t) && m) {
-					n += m->m_len;
-					m = m->m_next;
-				}
-				if (n < sizeof(uint32_t))
-					goto readmore;
-				m_copydata(cd->mpending, 0, sizeof(header),
-				    (char *)&header);
-				header = ntohl(header);
-				cd->eor = (header & 0x80000000) != 0;
-				cd->resid = header & 0x7fffffff;
-				m_adj(cd->mpending, sizeof(uint32_t));
-			}
-
-			/*
-			 * Start pulling off mbufs from cd->mpending
-			 * until we either have a complete record or
-			 * we run out of data. We use m_split to pull
-			 * data - it will pull as much as possible and
-			 * split the last mbuf if necessary.
-			 */
-			while (cd->mpending && cd->resid) {
-				m = cd->mpending;
-				if (cd->mpending->m_next
-				    || cd->mpending->m_len > cd->resid)
-					cd->mpending = m_split(cd->mpending,
-					    cd->resid, M_WAITOK);
-				else
-					cd->mpending = NULL;
-				if (cd->mreq)
-					m_last(cd->mreq)->m_next = m;
-				else
-					cd->mreq = m;
-				while (m) {
-					cd->resid -= m->m_len;
-					m = m->m_next;
-				}
+		/* If we have no request ready, check pending queue. */
+		while (cd->mpending &&
+		    (cd->mreq == NULL || cd->resid != 0 || !cd->eor))
+			svc_vc_process_pending(xprt);
+
+		/* Process and return complete request in cd->mreq. */
+		if (cd->mreq != NULL && cd->resid == 0 && cd->eor) {
+
+			xdrmbuf_create(&xdrs, cd->mreq, XDR_DECODE);
+			cd->mreq = NULL;
+
+			/* Check for next request in a pending queue. */
+			svc_vc_process_pending(xprt);
+			if (cd->mreq == NULL || cd->resid != 0) {
+				SOCKBUF_LOCK(&so->so_rcv);
+				if (!soreadable(so))
+					xprt_inactive(xprt);
+				SOCKBUF_UNLOCK(&so->so_rcv);
 			}
 
-			/*
-			 * If cd->resid is zero now, we have managed to
-			 * receive a record fragment from the stream. Check
-			 * for the end-of-record mark to see if we need more.
-			 */
-			if (cd->resid == 0) {
-				if (!cd->eor)
-					continue;
-
-				/*
-				 * Success - we have a complete record in
-				 * cd->mreq.
-				 */
-				xdrmbuf_create(&xdrs, cd->mreq, XDR_DECODE);
-				cd->mreq = NULL;
-				sx_xunlock(&xprt->xp_lock);
-
-				if (! xdr_callmsg(&xdrs, msg)) {
-					XDR_DESTROY(&xdrs);
-					return (FALSE);
-				}
+			sx_xunlock(&xprt->xp_lock);
 
-				*addrp = NULL;
-				*mp = xdrmbuf_getall(&xdrs);
+			if (! xdr_callmsg(&xdrs, msg)) {
 				XDR_DESTROY(&xdrs);
-
-				return (TRUE);
+				return (FALSE);
 			}
+
+			*addrp = NULL;
+			*mp = xdrmbuf_getall(&xdrs);
+			XDR_DESTROY(&xdrs);
+
+			return (TRUE);
 		}
 
-	readmore:
 		/*
 		 * The socket upcall calls xprt_active() which will eventually
 		 * cause the server to call us here. We attempt to
@@ -697,8 +686,7 @@ svc_vc_recv(SVCXPRT *xprt, struct rpc_ms
 		uio.uio_td = curthread;
 		m = NULL;
 		rcvflag = MSG_DONTWAIT;
-		error = soreceive(xprt->xp_socket, NULL, &uio, &m, NULL,
-		    &rcvflag);
+		error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
 
 		if (error == EWOULDBLOCK) {
 			/*
@@ -706,25 +694,23 @@ svc_vc_recv(SVCXPRT *xprt, struct rpc_ms
 			 * taking the lock to protect us in the case
 			 * where a new packet arrives on the socket
 			 * after our call to soreceive fails with
-			 * EWOULDBLOCK. The pool lock protects us from
-			 * racing the upcall after our soreadable()
-			 * call returns false.
+			 * EWOULDBLOCK.
 			 */
-			mtx_lock(&xprt->xp_pool->sp_lock);
-			if (!soreadable(xprt->xp_socket))
-				xprt_inactive_locked(xprt);
-			mtx_unlock(&xprt->xp_pool->sp_lock);
+			SOCKBUF_LOCK(&so->so_rcv);
+			if (!soreadable(so))
+				xprt_inactive(xprt);
+			SOCKBUF_UNLOCK(&so->so_rcv);
 			sx_xunlock(&xprt->xp_lock);
 			return (FALSE);
 		}
 
 		if (error) {
-			SOCKBUF_LOCK(&xprt->xp_socket->so_rcv);
+			SOCKBUF_LOCK(&so->so_rcv);
 			if (xprt->xp_upcallset) {
 				xprt->xp_upcallset = 0;
-				soupcall_clear(xprt->xp_socket, SO_RCV);
+				soupcall_clear(so, SO_RCV);
 			}
-			SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv);
+			SOCKBUF_UNLOCK(&so->so_rcv);
 			xprt_inactive(xprt);
 			cd->strm_stat = XPRT_DIED;
 			sx_xunlock(&xprt->xp_lock);
@@ -908,7 +894,8 @@ svc_vc_soupcall(struct socket *so, void 
 {
 	SVCXPRT *xprt = (SVCXPRT *) arg;
 
-	xprt_active(xprt);
+	if (soreadable(xprt->xp_socket))
+		xprt_active(xprt);
 	return (SU_OK);
 }
 


More information about the svn-src-all mailing list