svn commit: r362455 - head/sys/rpc

Rick Macklem <rmacklem@FreeBSD.org>
Sun Jun 21 00:06:05 UTC 2020


Author: rmacklem
Date: Sun Jun 21 00:06:04 2020
New Revision: 362455
URL: https://svnweb.freebsd.org/changeset/base/362455

Log:
  Modify the way the client side krpc does soreceive() for TCP.
  
  Without this patch, clnt_vc_soupcall() first does a soreceive() for
  4 bytes (the Sun RPC over TCP record mark) and then a second
  soreceive() for the RPC message.
  The first soreceive() almost always results in an mbuf allocation,
  since it is unlikely that the 4-byte record mark sits in a separate
  mbuf in the socket receive queue.
  This is somewhat inefficient and rather odd. It also will not work
  for KTLS receive (ktls rx), since that returns one TLS record per
  soreceive().
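  
  For reference, the record mark the old code read with its own 4-byte
  soreceive() is just the RFC 5531 record marking header: the high bit
  flags the last fragment of a message and the low 31 bits give the
  fragment length. A minimal userland sketch of that layout (not part
  of the commit; struct rpc_record_mark and parse_record_mark() are
  illustrative names only):

	#include <stdint.h>
	#include <stdbool.h>
	#include <arpa/inet.h>		/* ntohl(); userland sketch only */

	struct rpc_record_mark {
		uint32_t frag_len;	/* low 31 bits: fragment length */
		bool	 last_frag;	/* high bit: last fragment of msg */
	};

	static struct rpc_record_mark
	parse_record_mark(uint32_t raw)	/* 4 bytes as read off the wire */
	{
		struct rpc_record_mark rm;
		uint32_t header = ntohl(raw);

		rm.frag_len = header & 0x7fffffff;
		rm.last_frag = (header & 0x80000000) != 0;
		return (rm);
	}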
  
  This patch replaces the above with code similar to what the server
  side of the krpc does for TCP: it does a soreceive() for as much data
  as possible and then parses RPC messages out of the received data.
  A new field of the client handle structure (struct ct_data) called
  ct_raw holds the list of received mbufs from which the RPC message(s)
  are parsed.
  I think this results in cleaner code and is needed to support
  nfs-over-tls.
  It also fixes the code for the case where a server sends an RPC
  message in multiple record fragments. Although this is allowed by
  RFC 5531, no extant NFS server does this. However, it is probably
  good to handle it in case some future NFS server does.
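  
  To make the new flow concrete, here is a hedged userland model of the
  receive-then-parse loop described above, using a flat byte buffer in
  place of the ct_raw mbuf chain. All names here (parse_state,
  parse_records, msg_buf, ...) are illustrative and are not the kernel
  code; the real clnt_vc_soupcall() below operates on mbuf chains via
  m_copydata(), m_adj() and m_split():

	#include <stdint.h>
	#include <stdbool.h>
	#include <stddef.h>
	#include <string.h>
	#include <arpa/inet.h>

	/* Parser state, mirroring ct_record_resid/ct_record_eor in spirit. */
	struct parse_state {
		uint32_t resid;		/* bytes still owed to the fragment */
		bool	 eor;		/* current fragment is the last one */
		size_t	 msg_len;	/* bytes accumulated for the message */
		uint8_t	 msg_buf[1024 * 1024];
	};

	/*
	 * Consume as much of raw[0..rawlen) as possible.  Returns the
	 * number of bytes consumed; *complete is set once a full RPC
	 * message (last fragment seen and fully received) is in msg_buf.
	 */
	static size_t
	parse_records(struct parse_state *ps, const uint8_t *raw,
	    size_t rawlen, bool *complete)
	{
		size_t n, off = 0;
		uint32_t header;

		*complete = false;
		while (!*complete) {
			if (ps->resid == 0) {
				/* Need a whole record mark to continue. */
				if (rawlen - off < sizeof(header))
					break;
				memcpy(&header, raw + off, sizeof(header));
				header = ntohl(header);
				ps->resid = header & 0x7fffffff;
				ps->eor = (header & 0x80000000) != 0;
				off += sizeof(header);
			} else {
				/* Move fragment data into the message. */
				n = rawlen - off;
				if (n == 0)
					break;
				if (n > ps->resid)
					n = ps->resid;
				if (ps->msg_len + n > sizeof(ps->msg_buf))
					break;	/* sketch only: no realloc */
				memcpy(ps->msg_buf + ps->msg_len, raw + off, n);
				ps->msg_len += n;
				ps->resid -= n;
				off += n;
				if (ps->resid == 0 && ps->eor)
					*complete = true;
			}
		}
		return (off);
	}

  The committed kernel code avoids the copy entirely: complete
  fragments are linked onto ct_record with m_last(), and when ct_raw
  holds more data than the current fragment needs, m_split() peels the
  excess back onto ct_raw for the next pass.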

Modified:
  head/sys/rpc/clnt_vc.c
  head/sys/rpc/krpc.h

Modified: head/sys/rpc/clnt_vc.c
==============================================================================
--- head/sys/rpc/clnt_vc.c	Sat Jun 20 23:48:57 2020	(r362454)
+++ head/sys/rpc/clnt_vc.c	Sun Jun 21 00:06:04 2020	(r362455)
@@ -269,6 +269,7 @@ clnt_vc_create(
 	soupcall_set(ct->ct_socket, SO_RCV, clnt_vc_soupcall, ct);
 	SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv);
 
+	ct->ct_raw = NULL;
 	ct->ct_record = NULL;
 	ct->ct_record_resid = 0;
 	TAILQ_INIT(&ct->ct_pending);
@@ -826,6 +827,8 @@ clnt_vc_destroy(CLIENT *cl)
 		soshutdown(so, SHUT_WR);
 		soclose(so);
 	}
+	m_freem(ct->ct_record);
+	m_freem(ct->ct_raw);
 	mem_free(ct, sizeof(struct ct_data));
 	if (cl->cl_netid && cl->cl_netid[0])
 		mem_free(cl->cl_netid, strlen(cl->cl_netid) +1);
@@ -854,122 +857,118 @@ clnt_vc_soupcall(struct socket *so, void *arg, int wai
 	struct ct_request *cr;
 	int error, rcvflag, foundreq;
 	uint32_t xid_plus_direction[2], header;
-	bool_t do_read;
 	SVCXPRT *xprt;
 	struct cf_conn *cd;
+	u_int rawlen;
 
-	CTASSERT(sizeof(xid_plus_direction) == 2 * sizeof(uint32_t));
+	/*
+	 * If another thread is already here, it must be in
+	 * soreceive(), so just return to avoid races with it.
+	 * ct_upcallrefs is protected by the SOCKBUF_LOCK(),
+	 * which is held in this function, except when
+	 * soreceive() is called.
+	 */
+	if (ct->ct_upcallrefs > 0)
+		return (SU_OK);
 	ct->ct_upcallrefs++;
-	uio.uio_td = curthread;
-	do {
-		/*
-		 * If ct_record_resid is zero, we are waiting for a
-		 * record mark.
-		 */
-		if (ct->ct_record_resid == 0) {
 
+	/*
+	 * Read as much as possible off the socket and link it
+	 * onto ct_raw.
+	 */
+	for (;;) {
+		uio.uio_resid = 1000000000;
+		uio.uio_td = curthread;
+		m2 = m = NULL;
+		rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
+		SOCKBUF_UNLOCK(&so->so_rcv);
+		error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
+		SOCKBUF_LOCK(&so->so_rcv);
+
+		if (error == EWOULDBLOCK) {
 			/*
-			 * Make sure there is either a whole record
-			 * mark in the buffer or there is some other
-			 * error condition
+			 * We must re-test for readability after
+			 * taking the lock to protect us in the case
+			 * where a new packet arrives on the socket
+			 * after our call to soreceive fails with
+			 * EWOULDBLOCK.
 			 */
-			do_read = FALSE;
-			if (sbavail(&so->so_rcv) >= sizeof(uint32_t)
-			    || (so->so_rcv.sb_state & SBS_CANTRCVMORE)
-			    || so->so_error)
-				do_read = TRUE;
-
-			if (!do_read)
+			error = 0;
+			if (!soreadable(so))
 				break;
+			continue;
+		}
+		if (error == 0 && m == NULL) {
+			/*
+			 * We must have got EOF trying
+			 * to read from the stream.
+			 */
+			error = ECONNRESET;
+		}
+		if (error != 0)
+			break;
 
-			SOCKBUF_UNLOCK(&so->so_rcv);
-			uio.uio_resid = sizeof(uint32_t);
-			m = NULL;
-			rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
-			error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
-			SOCKBUF_LOCK(&so->so_rcv);
+		if (ct->ct_raw != NULL)
+			m_last(ct->ct_raw)->m_next = m;
+		else
+			ct->ct_raw = m;
+	}
+	rawlen = m_length(ct->ct_raw, NULL);
 
-			if (error == EWOULDBLOCK)
+	/* Now, process as much of ct_raw as possible. */
+	for (;;) {
+		/*
+		 * If ct_record_resid is zero, we are waiting for a
+		 * record mark.
+		 */
+		if (ct->ct_record_resid == 0) {
+			if (rawlen < sizeof(uint32_t))
 				break;
-			
-			/*
-			 * If there was an error, wake up all pending
-			 * requests.
-			 */
-			if (error || uio.uio_resid > 0) {
-			wakeup_all:
-				mtx_lock(&ct->ct_lock);
-				if (!error) {
-					/*
-					 * We must have got EOF trying
-					 * to read from the stream.
-					 */
-					error = ECONNRESET;
-				}
-				ct->ct_error.re_status = RPC_CANTRECV;
-				ct->ct_error.re_errno = error;
-				TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
-					cr->cr_error = error;
-					wakeup(cr);
-				}
-				mtx_unlock(&ct->ct_lock);
-				break;
-			}
-			m_copydata(m, 0, sizeof(uint32_t), (char *)&header);
+			m_copydata(ct->ct_raw, 0, sizeof(uint32_t),
+			    (char *)&header);
 			header = ntohl(header);
-			ct->ct_record = NULL;
 			ct->ct_record_resid = header & 0x7fffffff;
 			ct->ct_record_eor = ((header & 0x80000000) != 0);
-			m_freem(m);
+			m_adj(ct->ct_raw, sizeof(uint32_t));
+			rawlen -= sizeof(uint32_t);
 		} else {
 			/*
-			 * Wait until the socket has the whole record
-			 * buffered.
+			 * Move as much of the record as possible to
+			 * ct_record.
 			 */
-			do_read = FALSE;
-			if (sbavail(&so->so_rcv) >= ct->ct_record_resid
-			    || (so->so_rcv.sb_state & SBS_CANTRCVMORE)
-			    || so->so_error)
-				do_read = TRUE;
-
-			if (!do_read)
+			if (rawlen == 0)
 				break;
-
-			/*
-			 * We have the record mark. Read as much as
-			 * the socket has buffered up to the end of
-			 * this record.
-			 */
-			SOCKBUF_UNLOCK(&so->so_rcv);
-			uio.uio_resid = ct->ct_record_resid;
-			m = NULL;
-			rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
-			error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
-			SOCKBUF_LOCK(&so->so_rcv);
-
-			if (error == EWOULDBLOCK)
+			if (rawlen <= ct->ct_record_resid) {
+				if (ct->ct_record != NULL)
+					m_last(ct->ct_record)->m_next =
+					    ct->ct_raw;
+				else
+					ct->ct_record = ct->ct_raw;
+				ct->ct_raw = NULL;
+				ct->ct_record_resid -= rawlen;
+				rawlen = 0;
+			} else {
+				m = m_split(ct->ct_raw, ct->ct_record_resid,
+				    M_NOWAIT);
+				if (m == NULL)
+					break;
+				if (ct->ct_record != NULL)
+					m_last(ct->ct_record)->m_next =
+					    ct->ct_raw;
+				else
+					ct->ct_record = ct->ct_raw;
+				rawlen -= ct->ct_record_resid;
+				ct->ct_record_resid = 0;
+				ct->ct_raw = m;
+			}
+			if (ct->ct_record_resid > 0)
 				break;
 
-			if (error || uio.uio_resid == ct->ct_record_resid)
-				goto wakeup_all;
-
 			/*
-			 * If we have part of the record already,
-			 * chain this bit onto the end.
-			 */
-			if (ct->ct_record)
-				m_last(ct->ct_record)->m_next = m;
-			else
-				ct->ct_record = m;
-
-			ct->ct_record_resid = uio.uio_resid;
-
-			/*
 			 * If we have the entire record, see if we can
 			 * match it to a request.
 			 */
-			if (ct->ct_record_resid == 0
-			    && ct->ct_record_eor) {
+			if (ct->ct_record_eor) {
 				/*
 				 * The XID is in the first uint32_t of
 				 * the reply and the message direction
@@ -979,8 +978,20 @@ clnt_vc_soupcall(struct socket *so, void *arg, int wai
 				    sizeof(xid_plus_direction) &&
 				    m_length(ct->ct_record, NULL) <
 				    sizeof(xid_plus_direction)) {
-					m_freem(ct->ct_record);
-					break;
+					/*
+					 * What to do now?
+					 * The data in the TCP stream is
+					 * corrupted such that there is no
+					 * valid RPC message to parse.
+					 * I think it best to close this
+					 * connection and allow
+					 * clnt_reconnect_XXX() to try
+					 * and establish a new one.
+					 */
+					printf("clnt_vc_soupcall: "
+					    "connection data corrupted\n");
+					error = ECONNRESET;
+					goto wakeup_all;
 				}
 				m_copydata(ct->ct_record, 0,
 				    sizeof(xid_plus_direction),
@@ -1057,7 +1068,26 @@ clnt_vc_soupcall(struct socket *so, void *arg, int wai
 				}
 			}
 		}
-	} while (m);
+	}
+
+	if (error != 0) {
+	wakeup_all:
+		/*
+		 * This socket is broken, so mark that it cannot
+		 * receive and fail all RPCs waiting for a reply
+		 * on it, so that they will be retried on a new
+		 * TCP connection created by clnt_reconnect_X().
+		 */
+		mtx_lock(&ct->ct_lock);
+		ct->ct_error.re_status = RPC_CANTRECV;
+		ct->ct_error.re_errno = error;
+		TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
+			cr->cr_error = error;
+			wakeup(cr);
+		}
+		mtx_unlock(&ct->ct_lock);
+	}
+
 	ct->ct_upcallrefs--;
 	if (ct->ct_upcallrefs < 0)
 		panic("rpcvc upcall refcnt");

Modified: head/sys/rpc/krpc.h
==============================================================================
--- head/sys/rpc/krpc.h	Sat Jun 20 23:48:57 2020	(r362454)
+++ head/sys/rpc/krpc.h	Sun Jun 21 00:06:04 2020	(r362455)
@@ -101,6 +101,7 @@ struct ct_data {
 	struct ct_request_list ct_pending;
 	int		ct_upcallrefs;	/* Ref cnt of upcalls in prog. */
 	SVCXPRT		*ct_backchannelxprt; /* xprt for backchannel */
+	struct mbuf	*ct_raw;	/* Raw mbufs recv'd */
 };
 
 struct cf_conn {  /* kept in xprt->xp_p1 for actual connection */

