git: eba13bbc37ab - main - cxgbe: Support TCP_USE_DDP on offloaded TOE connections

From: John Baldwin <jhb@FreeBSD.org>
Date: Wed, 20 Mar 2024 22:34:38 UTC
The branch main has been updated by jhb:

URL: https://cgit.FreeBSD.org/src/commit/?id=eba13bbc37ab4f45a8a3502d59c37d56d9a04ca5

commit eba13bbc37ab4f45a8a3502d59c37d56d9a04ca5
Author:     John Baldwin <jhb@FreeBSD.org>
AuthorDate: 2024-03-20 22:29:28 +0000
Commit:     John Baldwin <jhb@FreeBSD.org>
CommitDate: 2024-03-20 22:29:28 +0000

    cxgbe: Support TCP_USE_DDP on offloaded TOE connections
    
    When this socket option is enabled, relatively large contiguous
    buffers are allocated and used to receive data from the remote
    connection.  When data is received, a wrapper M_EXT mbuf is queued
    to the socket's receive buffer.  This shortens the linked list of
    received mbufs and allows consumers to read the received data in
    larger chunks.
    
    To minimize reprogramming of the page pods in the adapter, receive
    buffers for a given connection are recycled.  When a buffer has been
    fully consumed by the receiver and freed, it is placed on a
    per-connection list of free buffers.
    
    The size of the receive buffers defaults to 256k and can be set via
    the hw.cxgbe.toe.ddp_rcvbuf_len sysctl.  The
    hw.cxgbe.toe.ddp_rcvbuf_cache sysctl (defaults to 4) determines the
    maximum number of free buffers cached per connection.  Note that this
    limit does not apply to "in-flight" receive buffers that are
    associated with mbufs in the socket's receive buffer.
    
    Co-authored-by: Navdeep Parhar <np@FreeBSD.org>
    Sponsored by:   Chelsio Communications
    Differential Revision:  https://reviews.freebsd.org/D44001
---
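As an illustration (not part of this commit), here is a minimal userland
sketch of how an application might opt in to this feature and consume data
in large chunks.  It assumes TCP_USE_DDP is visible via <netinet/tcp.h>,
that the socket is an established connection offloaded to the TOE on a
cxgbe(4) interface, and that the drain() helper and the 256k read size
(matching the hw.cxgbe.toe.ddp_rcvbuf_len default) are purely illustrative.

/*
 * Sketch: enable TCP_USE_DDP on a connected socket and drain the
 * receive buffer in large reads.  Error handling is abbreviated.
 */
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <err.h>
#include <stdlib.h>
#include <unistd.h>

static void
drain(int s)
{
	char *buf;
	ssize_t n;
	int on = 1;

	/* Ask the TOE to place received data directly into DDP buffers. */
	if (setsockopt(s, IPPROTO_TCP, TCP_USE_DDP, &on, sizeof(on)) == -1)
		err(1, "setsockopt(TCP_USE_DDP)");

	/* Read in chunks comparable to hw.cxgbe.toe.ddp_rcvbuf_len. */
	buf = malloc(256 * 1024);
	if (buf == NULL)
		err(1, "malloc");
	while ((n = read(s, buf, 256 * 1024)) > 0)
		;	/* consume the data here */
	if (n == -1)
		err(1, "read");
	free(buf);
}

Reading in large chunks lets the wrapper M_EXT mbufs be freed promptly, so
their DDP buffers can be recycled onto the per-connection free list rather
than requiring new allocations and page-pod writes.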
 sys/dev/cxgbe/adapter.h       |   6 +
 sys/dev/cxgbe/t4_main.c       |  13 +
 sys/dev/cxgbe/t4_sge.c        |  18 +
 sys/dev/cxgbe/tom/t4_cpl_io.c |   7 +-
 sys/dev/cxgbe/tom/t4_ddp.c    | 786 +++++++++++++++++++++++++++++++++++++++---
 sys/dev/cxgbe/tom/t4_tom.c    |  31 ++
 sys/dev/cxgbe/tom/t4_tom.h    |  51 ++-
 7 files changed, 853 insertions(+), 59 deletions(-)

diff --git a/sys/dev/cxgbe/adapter.h b/sys/dev/cxgbe/adapter.h
index 3157d08cc67b..b9780809421f 100644
--- a/sys/dev/cxgbe/adapter.h
+++ b/sys/dev/cxgbe/adapter.h
@@ -690,6 +690,10 @@ struct sge_ofld_rxq {
 	uint64_t rx_aio_ddp_octets;
 	u_long	rx_toe_tls_records;
 	u_long	rx_toe_tls_octets;
+	u_long	rx_toe_ddp_octets;
+	counter_u64_t ddp_buffer_alloc;
+	counter_u64_t ddp_buffer_reuse;
+	counter_u64_t ddp_buffer_free;
 } __aligned(CACHE_LINE_SIZE);
 
 static inline struct sge_ofld_rxq *
@@ -1344,6 +1348,8 @@ extern int t4_tmr_idx;
 extern int t4_pktc_idx;
 extern unsigned int t4_qsize_rxq;
 extern unsigned int t4_qsize_txq;
+extern int t4_ddp_rcvbuf_len;
+extern unsigned int t4_ddp_rcvbuf_cache;
 extern device_method_t cxgbe_methods[];
 
 int t4_os_find_pci_capability(struct adapter *, int);
diff --git a/sys/dev/cxgbe/t4_main.c b/sys/dev/cxgbe/t4_main.c
index e20c697d2865..6cb434295b42 100644
--- a/sys/dev/cxgbe/t4_main.c
+++ b/sys/dev/cxgbe/t4_main.c
@@ -412,6 +412,15 @@ SYSCTL_INT(_hw_cxgbe_toe_rexmt_backoff, OID_AUTO, 14, CTLFLAG_RDTUN,
     &t4_toe_rexmt_backoff[14], 0, "");
 SYSCTL_INT(_hw_cxgbe_toe_rexmt_backoff, OID_AUTO, 15, CTLFLAG_RDTUN,
     &t4_toe_rexmt_backoff[15], 0, "");
+
+int t4_ddp_rcvbuf_len = 256 * 1024;
+SYSCTL_INT(_hw_cxgbe_toe, OID_AUTO, ddp_rcvbuf_len, CTLFLAG_RWTUN,
+    &t4_ddp_rcvbuf_len, 0, "length of each DDP RX buffer");
+
+unsigned int t4_ddp_rcvbuf_cache = 4;
+SYSCTL_UINT(_hw_cxgbe_toe, OID_AUTO, ddp_rcvbuf_cache, CTLFLAG_RWTUN,
+    &t4_ddp_rcvbuf_cache, 0,
+    "maximum number of free DDP RX buffers to cache per connection");
 #endif
 
 #ifdef DEV_NETMAP
@@ -12046,6 +12055,10 @@ clear_stats(struct adapter *sc, u_int port_id)
 				ofld_rxq->rx_aio_ddp_octets = 0;
 				ofld_rxq->rx_toe_tls_records = 0;
 				ofld_rxq->rx_toe_tls_octets = 0;
+				ofld_rxq->rx_toe_ddp_octets = 0;
+				counter_u64_zero(ofld_rxq->ddp_buffer_alloc);
+				counter_u64_zero(ofld_rxq->ddp_buffer_reuse);
+				counter_u64_zero(ofld_rxq->ddp_buffer_free);
 			}
 #endif
 
diff --git a/sys/dev/cxgbe/t4_sge.c b/sys/dev/cxgbe/t4_sge.c
index 76293b06a6a9..2ef05b5a9f86 100644
--- a/sys/dev/cxgbe/t4_sge.c
+++ b/sys/dev/cxgbe/t4_sge.c
@@ -4098,6 +4098,9 @@ alloc_ofld_rxq(struct vi_info *vi, struct sge_ofld_rxq *ofld_rxq, int idx,
 		ofld_rxq->rx_iscsi_ddp_setup_ok = counter_u64_alloc(M_WAITOK);
 		ofld_rxq->rx_iscsi_ddp_setup_error =
 		    counter_u64_alloc(M_WAITOK);
+		ofld_rxq->ddp_buffer_alloc = counter_u64_alloc(M_WAITOK);
+		ofld_rxq->ddp_buffer_reuse = counter_u64_alloc(M_WAITOK);
+		ofld_rxq->ddp_buffer_free = counter_u64_alloc(M_WAITOK);
 		add_ofld_rxq_sysctls(&vi->ctx, oid, ofld_rxq);
 	}
 
@@ -4132,6 +4135,9 @@ free_ofld_rxq(struct vi_info *vi, struct sge_ofld_rxq *ofld_rxq)
 		MPASS(!(ofld_rxq->iq.flags & IQ_SW_ALLOCATED));
 		counter_u64_free(ofld_rxq->rx_iscsi_ddp_setup_ok);
 		counter_u64_free(ofld_rxq->rx_iscsi_ddp_setup_error);
+		counter_u64_free(ofld_rxq->ddp_buffer_alloc);
+		counter_u64_free(ofld_rxq->ddp_buffer_reuse);
+		counter_u64_free(ofld_rxq->ddp_buffer_free);
 		bzero(ofld_rxq, sizeof(*ofld_rxq));
 	}
 }
@@ -4158,6 +4164,18 @@ add_ofld_rxq_sysctls(struct sysctl_ctx_list *ctx, struct sysctl_oid *oid,
 	SYSCTL_ADD_ULONG(ctx, children, OID_AUTO,
 	    "rx_toe_tls_octets", CTLFLAG_RD, &ofld_rxq->rx_toe_tls_octets,
 	    "# of payload octets in received TOE TLS records");
+	SYSCTL_ADD_ULONG(ctx, children, OID_AUTO,
+	    "rx_toe_ddp_octets", CTLFLAG_RD, &ofld_rxq->rx_toe_ddp_octets,
+	    "# of payload octets received via TCP DDP");
+	SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO,
+	    "ddp_buffer_alloc", CTLFLAG_RD, &ofld_rxq->ddp_buffer_alloc,
+	    "# of DDP RCV buffers allocated");
+	SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO,
+	    "ddp_buffer_reuse", CTLFLAG_RD, &ofld_rxq->ddp_buffer_reuse,
+	    "# of DDP RCV buffers reused");
+	SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO,
+	    "ddp_buffer_free", CTLFLAG_RD, &ofld_rxq->ddp_buffer_free,
+	    "# of DDP RCV buffers freed");
 
 	oid = SYSCTL_ADD_NODE(ctx, children, OID_AUTO, "iscsi",
 	    CTLFLAG_RD | CTLFLAG_MPSAFE, NULL, "TOE iSCSI statistics");
diff --git a/sys/dev/cxgbe/tom/t4_cpl_io.c b/sys/dev/cxgbe/tom/t4_cpl_io.c
index 4d61189f5fe3..842e72bf8b2b 100644
--- a/sys/dev/cxgbe/tom/t4_cpl_io.c
+++ b/sys/dev/cxgbe/tom/t4_cpl_io.c
@@ -1352,8 +1352,6 @@ do_peer_close(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m)
 	if (toep->flags & TPF_ABORT_SHUTDOWN)
 		goto done;
 
-	so = inp->inp_socket;
-	socantrcvmore(so);
 	if (ulp_mode(toep) == ULP_MODE_TCPDDP) {
 		DDP_LOCK(toep);
 		if (__predict_false(toep->ddp.flags &
@@ -1361,6 +1359,8 @@ do_peer_close(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m)
 			handle_ddp_close(toep, tp, cpl->rcv_nxt);
 		DDP_UNLOCK(toep);
 	}
+	so = inp->inp_socket;
+	socantrcvmore(so);
 
 	if (ulp_mode(toep) == ULP_MODE_RDMA ||
 	    (ulp_mode(toep) == ULP_MODE_ISCSI && chip_id(sc) >= CHELSIO_T6)) {
@@ -1782,7 +1782,8 @@ do_rx_data(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m)
 	sbappendstream_locked(sb, m, 0);
 	t4_rcvd_locked(&toep->td->tod, tp);
 
-	if (ulp_mode(toep) == ULP_MODE_TCPDDP && toep->ddp.waiting_count > 0 &&
+	if (ulp_mode(toep) == ULP_MODE_TCPDDP &&
+	    (toep->ddp.flags & DDP_AIO) != 0 && toep->ddp.waiting_count > 0 &&
 	    sbavail(sb) != 0) {
 		CTR2(KTR_CXGBE, "%s: tid %u queueing AIO task", __func__,
 		    tid);
diff --git a/sys/dev/cxgbe/tom/t4_ddp.c b/sys/dev/cxgbe/tom/t4_ddp.c
index 1cd1ea68826c..c1d4af45fd70 100644
--- a/sys/dev/cxgbe/tom/t4_ddp.c
+++ b/sys/dev/cxgbe/tom/t4_ddp.c
@@ -81,6 +81,10 @@ static void aio_ddp_requeue_task(void *context, int pending);
 static void ddp_complete_all(struct toepcb *toep, int error);
 static void t4_aio_cancel_active(struct kaiocb *job);
 static void t4_aio_cancel_queued(struct kaiocb *job);
+static int t4_alloc_page_pods_for_rcvbuf(struct ppod_region *pr,
+    struct ddp_rcv_buffer *drb);
+static int t4_write_page_pods_for_rcvbuf(struct adapter *sc,
+    struct sge_wrq *wrq, int tid, struct ddp_rcv_buffer *drb);
 
 static TAILQ_HEAD(, pageset) ddp_orphan_pagesets;
 static struct mtx ddp_orphan_pagesets_lock;
@@ -89,15 +93,15 @@ static struct task ddp_orphan_task;
 #define MAX_DDP_BUFFER_SIZE		(M_TCB_RX_DDP_BUF0_LEN)
 
 /*
- * A page set holds information about a buffer used for DDP.  The page
- * set holds resources such as the VM pages backing the buffer (either
- * held or wired) and the page pods associated with the buffer.
- * Recently used page sets are cached to allow for efficient reuse of
- * buffers (avoiding the need to re-fault in pages, hold them, etc.).
- * Note that cached page sets keep the backing pages wired.  The
- * number of wired pages is capped by only allowing for two wired
- * pagesets per connection.  This is not a perfect cap, but is a
- * trade-off for performance.
+ * A page set holds information about a user buffer used for AIO DDP.
+ * The page set holds resources such as the VM pages backing the
+ * buffer (either held or wired) and the page pods associated with the
+ * buffer.  Recently used page sets are cached to allow for efficient
+ * reuse of buffers (avoiding the need to re-fault in pages, hold
+ * them, etc.).  Note that cached page sets keep the backing pages
+ * wired.  The number of wired pages is capped by only allowing for
+ * two wired pagesets per connection.  This is not a perfect cap, but
+ * is a trade-off for performance.
  *
  * If an application ping-pongs two buffers for a connection via
  * aio_read(2) then those buffers should remain wired and expensive VM
@@ -174,8 +178,99 @@ ddp_complete_one(struct kaiocb *job, int error)
 }
 
 static void
-free_ddp_buffer(struct tom_data *td, struct ddp_buffer *db)
+free_ddp_rcv_buffer(struct toepcb *toep, struct ddp_rcv_buffer *drb)
 {
+	t4_free_page_pods(&drb->prsv);
+	contigfree(drb->buf, drb->len, M_CXGBE);
+	free(drb, M_CXGBE);
+	counter_u64_add(toep->ofld_rxq->ddp_buffer_free, 1);
+	free_toepcb(toep);
+}
+
+static void
+recycle_ddp_rcv_buffer(struct toepcb *toep, struct ddp_rcv_buffer *drb)
+{
+	DDP_CACHE_LOCK(toep);
+	if (!(toep->ddp.flags & DDP_DEAD) &&
+	    toep->ddp.cached_count < t4_ddp_rcvbuf_cache) {
+		TAILQ_INSERT_HEAD(&toep->ddp.cached_buffers, drb, link);
+		toep->ddp.cached_count++;
+		DDP_CACHE_UNLOCK(toep);
+	} else {
+		DDP_CACHE_UNLOCK(toep);
+		free_ddp_rcv_buffer(toep, drb);
+	}
+}
+
+static struct ddp_rcv_buffer *
+alloc_cached_ddp_rcv_buffer(struct toepcb *toep)
+{
+	struct ddp_rcv_buffer *drb;
+
+	DDP_CACHE_LOCK(toep);
+	if (!TAILQ_EMPTY(&toep->ddp.cached_buffers)) {
+		drb = TAILQ_FIRST(&toep->ddp.cached_buffers);
+		TAILQ_REMOVE(&toep->ddp.cached_buffers, drb, link);
+		toep->ddp.cached_count--;
+		counter_u64_add(toep->ofld_rxq->ddp_buffer_reuse, 1);
+	} else
+		drb = NULL;
+	DDP_CACHE_UNLOCK(toep);
+	return (drb);
+}
+
+static struct ddp_rcv_buffer *
+alloc_ddp_rcv_buffer(struct toepcb *toep, int how)
+{
+	struct tom_data *td = toep->td;
+	struct adapter *sc = td_adapter(td);
+	struct ddp_rcv_buffer *drb;
+	int error;
+
+	drb = malloc(sizeof(*drb), M_CXGBE, how | M_ZERO);
+	if (drb == NULL)
+		return (NULL);
+
+	drb->buf = contigmalloc(t4_ddp_rcvbuf_len, M_CXGBE, how, 0, ~0,
+	    t4_ddp_rcvbuf_len, 0);
+	if (drb->buf == NULL) {
+		free(drb, M_CXGBE);
+		return (NULL);
+	}
+	drb->len = t4_ddp_rcvbuf_len;
+	drb->refs = 1;
+
+	error = t4_alloc_page_pods_for_rcvbuf(&td->pr, drb);
+	if (error != 0) {
+		contigfree(drb->buf, drb->len, M_CXGBE);
+		free(drb, M_CXGBE);
+		return (NULL);
+	}
+
+	error = t4_write_page_pods_for_rcvbuf(sc, toep->ctrlq, toep->tid, drb);
+	if (error != 0) {
+		t4_free_page_pods(&drb->prsv);
+		contigfree(drb->buf, drb->len, M_CXGBE);
+		free(drb, M_CXGBE);
+		return (NULL);
+	}
+
+	hold_toepcb(toep);
+	counter_u64_add(toep->ofld_rxq->ddp_buffer_alloc, 1);
+	return (drb);
+}
+
+static void
+free_ddp_buffer(struct toepcb *toep, struct ddp_buffer *db)
+{
+	if ((toep->ddp.flags & DDP_RCVBUF) != 0) {
+		if (db->drb != NULL)
+			free_ddp_rcv_buffer(toep, db->drb);
+#ifdef INVARIANTS
+		db->drb = NULL;
+#endif
+		return;
+	}
 
 	if (db->job) {
 		/*
@@ -192,7 +287,7 @@ free_ddp_buffer(struct tom_data *td, struct ddp_buffer *db)
 	}
 
 	if (db->ps) {
-		free_pageset(td, db->ps);
+		free_pageset(toep->td, db->ps);
 #ifdef INVARIANTS
 		db->ps = NULL;
 #endif
@@ -203,11 +298,10 @@ static void
 ddp_init_toep(struct toepcb *toep)
 {
 
-	TAILQ_INIT(&toep->ddp.aiojobq);
-	TASK_INIT(&toep->ddp.requeue_task, 0, aio_ddp_requeue_task, toep);
 	toep->ddp.flags = DDP_OK;
 	toep->ddp.active_id = -1;
 	mtx_init(&toep->ddp.lock, "t4 ddp", NULL, MTX_DEF);
+	mtx_init(&toep->ddp.cache_lock, "t4 ddp cache", NULL, MTX_DEF);
 }
 
 void
@@ -215,24 +309,38 @@ ddp_uninit_toep(struct toepcb *toep)
 {
 
 	mtx_destroy(&toep->ddp.lock);
+	mtx_destroy(&toep->ddp.cache_lock);
 }
 
 void
 release_ddp_resources(struct toepcb *toep)
 {
+	struct ddp_rcv_buffer *drb;
 	struct pageset *ps;
 	int i;
 
 	DDP_LOCK(toep);
+	DDP_CACHE_LOCK(toep);
 	toep->ddp.flags |= DDP_DEAD;
+	DDP_CACHE_UNLOCK(toep);
 	for (i = 0; i < nitems(toep->ddp.db); i++) {
-		free_ddp_buffer(toep->td, &toep->ddp.db[i]);
+		free_ddp_buffer(toep, &toep->ddp.db[i]);
 	}
-	while ((ps = TAILQ_FIRST(&toep->ddp.cached_pagesets)) != NULL) {
-		TAILQ_REMOVE(&toep->ddp.cached_pagesets, ps, link);
-		free_pageset(toep->td, ps);
+	if ((toep->ddp.flags & DDP_AIO) != 0) {
+		while ((ps = TAILQ_FIRST(&toep->ddp.cached_pagesets)) != NULL) {
+			TAILQ_REMOVE(&toep->ddp.cached_pagesets, ps, link);
+			free_pageset(toep->td, ps);
+		}
+		ddp_complete_all(toep, 0);
+	}
+	if ((toep->ddp.flags & DDP_RCVBUF) != 0) {
+		DDP_CACHE_LOCK(toep);
+		while ((drb = TAILQ_FIRST(&toep->ddp.cached_buffers)) != NULL) {
+			TAILQ_REMOVE(&toep->ddp.cached_buffers, drb, link);
+			free_ddp_rcv_buffer(toep, drb);
+		}
+		DDP_CACHE_UNLOCK(toep);
 	}
-	ddp_complete_all(toep, 0);
 	DDP_UNLOCK(toep);
 }
 
@@ -242,13 +350,20 @@ ddp_assert_empty(struct toepcb *toep)
 {
 	int i;
 
-	MPASS(!(toep->ddp.flags & DDP_TASK_ACTIVE));
+	MPASS((toep->ddp.flags & (DDP_TASK_ACTIVE | DDP_DEAD)) != DDP_TASK_ACTIVE);
 	for (i = 0; i < nitems(toep->ddp.db); i++) {
-		MPASS(toep->ddp.db[i].job == NULL);
-		MPASS(toep->ddp.db[i].ps == NULL);
+		if ((toep->ddp.flags & DDP_AIO) != 0) {
+			MPASS(toep->ddp.db[i].job == NULL);
+			MPASS(toep->ddp.db[i].ps == NULL);
+		} else
+			MPASS(toep->ddp.db[i].drb == NULL);
+	}
+	if ((toep->ddp.flags & DDP_AIO) != 0) {
+		MPASS(TAILQ_EMPTY(&toep->ddp.cached_pagesets));
+		MPASS(TAILQ_EMPTY(&toep->ddp.aiojobq));
 	}
-	MPASS(TAILQ_EMPTY(&toep->ddp.cached_pagesets));
-	MPASS(TAILQ_EMPTY(&toep->ddp.aiojobq));
+	if ((toep->ddp.flags & DDP_RCVBUF) != 0)
+		MPASS(TAILQ_EMPTY(&toep->ddp.cached_buffers));
 }
 #endif
 
@@ -256,13 +371,18 @@ static void
 complete_ddp_buffer(struct toepcb *toep, struct ddp_buffer *db,
     unsigned int db_idx)
 {
+	struct ddp_rcv_buffer *drb;
 	unsigned int db_flag;
 
 	toep->ddp.active_count--;
 	if (toep->ddp.active_id == db_idx) {
 		if (toep->ddp.active_count == 0) {
-			KASSERT(toep->ddp.db[db_idx ^ 1].job == NULL,
-			    ("%s: active_count mismatch", __func__));
+			if ((toep->ddp.flags & DDP_AIO) != 0)
+				KASSERT(toep->ddp.db[db_idx ^ 1].job == NULL,
+				    ("%s: active_count mismatch", __func__));
+			else
+				KASSERT(toep->ddp.db[db_idx ^ 1].drb == NULL,
+				    ("%s: active_count mismatch", __func__));
 			toep->ddp.active_id = -1;
 		} else
 			toep->ddp.active_id ^= 1;
@@ -276,10 +396,18 @@ complete_ddp_buffer(struct toepcb *toep, struct ddp_buffer *db,
 		    ("%s: active count mismatch", __func__));
 	}
 
-	db->cancel_pending = 0;
-	db->job = NULL;
-	recycle_pageset(toep, db->ps);
-	db->ps = NULL;
+	if ((toep->ddp.flags & DDP_AIO) != 0) {
+		db->cancel_pending = 0;
+		db->job = NULL;
+		recycle_pageset(toep, db->ps);
+		db->ps = NULL;
+	} else {
+		drb = db->drb;
+		if (atomic_fetchadd_int(&drb->refs, -1) == 1)
+			recycle_ddp_rcv_buffer(toep, drb);
+		db->drb = NULL;
+		db->placed = 0;
+	}
 
 	db_flag = db_idx == 1 ? DDP_BUF1_ACTIVE : DDP_BUF0_ACTIVE;
 	KASSERT(toep->ddp.flags & db_flag,
@@ -288,6 +416,47 @@ complete_ddp_buffer(struct toepcb *toep, struct ddp_buffer *db,
 	toep->ddp.flags &= ~db_flag;
 }
 
+/* Called when m_free drops the last reference. */
+static void
+ddp_rcv_mbuf_done(struct mbuf *m)
+{
+	struct toepcb *toep = m->m_ext.ext_arg1;
+	struct ddp_rcv_buffer *drb = m->m_ext.ext_arg2;
+
+	recycle_ddp_rcv_buffer(toep, drb);
+}
+
+static void
+queue_ddp_rcvbuf_mbuf(struct toepcb *toep, u_int db_idx, u_int len)
+{
+	struct inpcb *inp = toep->inp;
+	struct sockbuf *sb;
+	struct ddp_buffer *db;
+	struct ddp_rcv_buffer *drb;
+	struct mbuf *m;
+
+	m = m_gethdr(M_NOWAIT, MT_DATA);
+	if (m == NULL) {
+		printf("%s: failed to allocate mbuf", __func__);
+		return;
+	}
+	m->m_pkthdr.rcvif = toep->vi->ifp;
+
+	db = &toep->ddp.db[db_idx];
+	drb = db->drb;
+	m_extaddref(m, (char *)drb->buf + db->placed, len, &drb->refs,
+	    ddp_rcv_mbuf_done, toep, drb);
+	m->m_pkthdr.len = len;
+	m->m_len = len;
+
+	sb = &inp->inp_socket->so_rcv;
+	SOCKBUF_LOCK_ASSERT(sb);
+	sbappendstream_locked(sb, m, 0);
+
+	db->placed += len;
+	toep->ofld_rxq->rx_toe_ddp_octets += len;
+}
+
 /* XXX: handle_ddp_data code duplication */
 void
 insert_ddp_data(struct toepcb *toep, uint32_t n)
@@ -302,10 +471,12 @@ insert_ddp_data(struct toepcb *toep, uint32_t n)
 #ifdef INVARIANTS
 	unsigned int db_flag;
 #endif
+	bool ddp_rcvbuf;
 
 	INP_WLOCK_ASSERT(inp);
 	DDP_ASSERT_LOCKED(toep);
 
+	ddp_rcvbuf = (toep->ddp.flags & DDP_RCVBUF) != 0;
 	tp->rcv_nxt += n;
 #ifndef USE_DDP_RX_FLOW_CONTROL
 	KASSERT(tp->rcv_wnd >= n, ("%s: negative window size", __func__));
@@ -321,6 +492,16 @@ insert_ddp_data(struct toepcb *toep, uint32_t n)
 #endif
 		MPASS((toep->ddp.flags & db_flag) != 0);
 		db = &toep->ddp.db[db_idx];
+		if (ddp_rcvbuf) {
+			placed = n;
+			if (placed > db->drb->len - db->placed)
+				placed = db->drb->len - db->placed;
+			if (placed != 0)
+				queue_ddp_rcvbuf_mbuf(toep, db_idx, placed);
+			complete_ddp_buffer(toep, db, db_idx);
+			n -= placed;
+			continue;
+		}
 		job = db->job;
 		copied = job->aio_received;
 		placed = n;
@@ -423,12 +604,13 @@ mk_rx_data_ack_ulp(struct ulp_txpkt *ulpmc, struct toepcb *toep)
 
 static struct wrqe *
 mk_update_tcb_for_ddp(struct adapter *sc, struct toepcb *toep, int db_idx,
-    struct pageset *ps, int offset, uint64_t ddp_flags, uint64_t ddp_flags_mask)
+    struct ppod_reservation *prsv, int offset, uint32_t len,
+    uint64_t ddp_flags, uint64_t ddp_flags_mask)
 {
 	struct wrqe *wr;
 	struct work_request_hdr *wrh;
 	struct ulp_txpkt *ulpmc;
-	int len;
+	int wrlen;
 
 	KASSERT(db_idx == 0 || db_idx == 1,
 	    ("%s: bad DDP buffer index %d", __func__, db_idx));
@@ -441,21 +623,21 @@ mk_update_tcb_for_ddp(struct adapter *sc, struct toepcb *toep, int db_idx,
 	 * The ULPTX master commands that follow must all end at 16B boundaries
 	 * too so we round up the size to 16.
 	 */
-	len = sizeof(*wrh) + 3 * roundup2(LEN__SET_TCB_FIELD_ULP, 16) +
+	wrlen = sizeof(*wrh) + 3 * roundup2(LEN__SET_TCB_FIELD_ULP, 16) +
 	    roundup2(LEN__RX_DATA_ACK_ULP, 16);
 
-	wr = alloc_wrqe(len, toep->ctrlq);
+	wr = alloc_wrqe(wrlen, toep->ctrlq);
 	if (wr == NULL)
 		return (NULL);
 	wrh = wrtod(wr);
-	INIT_ULPTX_WRH(wrh, len, 1, 0);	/* atomic */
+	INIT_ULPTX_WRH(wrh, wrlen, 1, 0);	/* atomic */
 	ulpmc = (struct ulp_txpkt *)(wrh + 1);
 
 	/* Write the buffer's tag */
 	ulpmc = mk_set_tcb_field_ulp(ulpmc, toep,
 	    W_TCB_RX_DDP_BUF0_TAG + db_idx,
 	    V_TCB_RX_DDP_BUF0_TAG(M_TCB_RX_DDP_BUF0_TAG),
-	    V_TCB_RX_DDP_BUF0_TAG(ps->prsv.prsv_tag));
+	    V_TCB_RX_DDP_BUF0_TAG(prsv->prsv_tag));
 
 	/* Update the current offset in the DDP buffer and its total length */
 	if (db_idx == 0)
@@ -464,14 +646,14 @@ mk_update_tcb_for_ddp(struct adapter *sc, struct toepcb *toep, int db_idx,
 		    V_TCB_RX_DDP_BUF0_OFFSET(M_TCB_RX_DDP_BUF0_OFFSET) |
 		    V_TCB_RX_DDP_BUF0_LEN(M_TCB_RX_DDP_BUF0_LEN),
 		    V_TCB_RX_DDP_BUF0_OFFSET(offset) |
-		    V_TCB_RX_DDP_BUF0_LEN(ps->len));
+		    V_TCB_RX_DDP_BUF0_LEN(len));
 	else
 		ulpmc = mk_set_tcb_field_ulp(ulpmc, toep,
 		    W_TCB_RX_DDP_BUF1_OFFSET,
 		    V_TCB_RX_DDP_BUF1_OFFSET(M_TCB_RX_DDP_BUF1_OFFSET) |
 		    V_TCB_RX_DDP_BUF1_LEN((u64)M_TCB_RX_DDP_BUF1_LEN << 32),
 		    V_TCB_RX_DDP_BUF1_OFFSET(offset) |
-		    V_TCB_RX_DDP_BUF1_LEN((u64)ps->len << 32));
+		    V_TCB_RX_DDP_BUF1_LEN((u64)len << 32));
 
 	/* Update DDP flags */
 	ulpmc = mk_set_tcb_field_ulp(ulpmc, toep, W_TCB_RX_DDP_FLAGS,
@@ -484,7 +666,8 @@ mk_update_tcb_for_ddp(struct adapter *sc, struct toepcb *toep, int db_idx,
 }
 
 static int
-handle_ddp_data(struct toepcb *toep, __be32 ddp_report, __be32 rcv_nxt, int len)
+handle_ddp_data_aio(struct toepcb *toep, __be32 ddp_report, __be32 rcv_nxt,
+    int len)
 {
 	uint32_t report = be32toh(ddp_report);
 	unsigned int db_idx;
@@ -607,11 +790,239 @@ out:
 	return (0);
 }
 
+static bool
+queue_ddp_rcvbuf(struct toepcb *toep, struct ddp_rcv_buffer *drb)
+{
+	struct adapter *sc = td_adapter(toep->td);
+	struct ddp_buffer *db;
+	struct wrqe *wr;
+	uint64_t ddp_flags, ddp_flags_mask;
+	int buf_flag, db_idx;
+
+	DDP_ASSERT_LOCKED(toep);
+
+	KASSERT((toep->ddp.flags & DDP_DEAD) == 0, ("%s: DDP_DEAD", __func__));
+	KASSERT(toep->ddp.active_count < nitems(toep->ddp.db),
+	    ("%s: no empty DDP buffer slot", __func__));
+
+	/* Determine which DDP buffer to use. */
+	if (toep->ddp.db[0].drb == NULL) {
+		db_idx = 0;
+	} else {
+		MPASS(toep->ddp.db[1].drb == NULL);
+		db_idx = 1;
+	}
+
+	/*
+	 * Permit PSH to trigger a partial completion without
+	 * invalidating the rest of the buffer, but disable the PUSH
+	 * timer.
+	 */
+	ddp_flags = 0;
+	ddp_flags_mask = 0;
+	if (db_idx == 0) {
+		ddp_flags |= V_TF_DDP_PSH_NO_INVALIDATE0(1) |
+		    V_TF_DDP_PUSH_DISABLE_0(0) | V_TF_DDP_PSHF_ENABLE_0(1) |
+		    V_TF_DDP_BUF0_VALID(1);
+		ddp_flags_mask |= V_TF_DDP_PSH_NO_INVALIDATE0(1) |
+		    V_TF_DDP_PUSH_DISABLE_0(1) | V_TF_DDP_PSHF_ENABLE_0(1) |
+		    V_TF_DDP_BUF0_FLUSH(1) | V_TF_DDP_BUF0_VALID(1);
+		buf_flag = DDP_BUF0_ACTIVE;
+	} else {
+		ddp_flags |= V_TF_DDP_PSH_NO_INVALIDATE1(1) |
+		    V_TF_DDP_PUSH_DISABLE_1(0) | V_TF_DDP_PSHF_ENABLE_1(1) |
+		    V_TF_DDP_BUF1_VALID(1);
+		ddp_flags_mask |= V_TF_DDP_PSH_NO_INVALIDATE1(1) |
+		    V_TF_DDP_PUSH_DISABLE_1(1) | V_TF_DDP_PSHF_ENABLE_1(1) |
+		    V_TF_DDP_BUF1_FLUSH(1) | V_TF_DDP_BUF1_VALID(1);
+		buf_flag = DDP_BUF1_ACTIVE;
+	}
+	MPASS((toep->ddp.flags & buf_flag) == 0);
+	if ((toep->ddp.flags & (DDP_BUF0_ACTIVE | DDP_BUF1_ACTIVE)) == 0) {
+		MPASS(db_idx == 0);
+		MPASS(toep->ddp.active_id == -1);
+		MPASS(toep->ddp.active_count == 0);
+		ddp_flags_mask |= V_TF_DDP_ACTIVE_BUF(1);
+	}
+
+	/*
+	 * The TID for this connection should still be valid.  If
+	 * DDP_DEAD is set, SBS_CANTRCVMORE should be set, so we
+	 * shouldn't be this far anyway.
+	 */
+	wr = mk_update_tcb_for_ddp(sc, toep, db_idx, &drb->prsv, 0, drb->len,
+	    ddp_flags, ddp_flags_mask);
+	if (wr == NULL) {
+		recycle_ddp_rcv_buffer(toep, drb);
+		printf("%s: mk_update_tcb_for_ddp failed\n", __func__);
+		return (false);
+	}
+
+#ifdef VERBOSE_TRACES
+	CTR(KTR_CXGBE,
+	    "%s: tid %u, scheduling DDP[%d] (flags %#lx/%#lx)", __func__,
+	    toep->tid, db_idx, ddp_flags, ddp_flags_mask);
+#endif
+	/*
+	 * Hold a reference on scheduled buffers that is dropped in
+	 * complete_ddp_buffer.
+	 */
+	drb->refs = 1;
+
+	/* Give the chip the go-ahead. */
+	t4_wrq_tx(sc, wr);
+	db = &toep->ddp.db[db_idx];
+	db->drb = drb;
+	toep->ddp.flags |= buf_flag;
+	toep->ddp.active_count++;
+	if (toep->ddp.active_count == 1) {
+		MPASS(toep->ddp.active_id == -1);
+		toep->ddp.active_id = db_idx;
+		CTR2(KTR_CXGBE, "%s: ddp_active_id = %d", __func__,
+		    toep->ddp.active_id);
+	}
+	return (true);
+}
+
+static int
+handle_ddp_data_rcvbuf(struct toepcb *toep, __be32 ddp_report, __be32 rcv_nxt,
+    int len)
+{
+	uint32_t report = be32toh(ddp_report);
+	struct inpcb *inp = toep->inp;
+	struct tcpcb *tp;
+	struct socket *so;
+	struct sockbuf *sb;
+	struct ddp_buffer *db;
+	struct ddp_rcv_buffer *drb;
+	unsigned int db_idx;
+	bool invalidated;
+
+	db_idx = report & F_DDP_BUF_IDX ? 1 : 0;
+
+	invalidated = (report & F_DDP_INV) != 0;
+
+	INP_WLOCK(inp);
+	so = inp_inpcbtosocket(inp);
+	sb = &so->so_rcv;
+	DDP_LOCK(toep);
+
+	KASSERT(toep->ddp.active_id == db_idx,
+	    ("completed DDP buffer (%d) != active_id (%d) for tid %d", db_idx,
+	    toep->ddp.active_id, toep->tid));
+	db = &toep->ddp.db[db_idx];
+
+	if (__predict_false(inp->inp_flags & INP_DROPPED)) {
+		/*
+		 * This can happen due to an administrative tcpdrop(8).
+		 * Just ignore the received data.
+		 */
+		CTR5(KTR_CXGBE, "%s: tid %u, seq 0x%x, len %d, inp_flags 0x%x",
+		    __func__, toep->tid, be32toh(rcv_nxt), len, inp->inp_flags);
+		if (invalidated)
+			complete_ddp_buffer(toep, db, db_idx);
+		goto out;
+	}
+
+	tp = intotcpcb(inp);
+
+	/*
+	 * For RX_DDP_COMPLETE, len will be zero and rcv_nxt is the
+	 * sequence number of the next byte to receive.  The length of
+	 * the data received for this message must be computed by
+	 * comparing the new and old values of rcv_nxt.
+	 *
+	 * For RX_DATA_DDP, len might be non-zero, but it is only the
+	 * length of the most recent DMA.  It does not include the
+	 * total length of the data received since the previous update
+	 * for this DDP buffer.  rcv_nxt is the sequence number of the
+	 * first received byte from the most recent DMA.
+	 */
+	len += be32toh(rcv_nxt) - tp->rcv_nxt;
+	tp->rcv_nxt += len;
+	tp->t_rcvtime = ticks;
+#ifndef USE_DDP_RX_FLOW_CONTROL
+	KASSERT(tp->rcv_wnd >= len, ("%s: negative window size", __func__));
+	tp->rcv_wnd -= len;
+#endif
+#ifdef VERBOSE_TRACES
+	CTR5(KTR_CXGBE, "%s: tid %u, DDP[%d] placed %d bytes (%#x)", __func__,
+	    toep->tid, db_idx, len, report);
+#endif
+
+	/* receive buffer autosize */
+	MPASS(toep->vnet == so->so_vnet);
+	CURVNET_SET(toep->vnet);
+	SOCKBUF_LOCK(sb);
+	if (sb->sb_flags & SB_AUTOSIZE &&
+	    V_tcp_do_autorcvbuf &&
+	    sb->sb_hiwat < V_tcp_autorcvbuf_max &&
+	    len > (sbspace(sb) / 8 * 7)) {
+		struct adapter *sc = td_adapter(toep->td);
+		unsigned int hiwat = sb->sb_hiwat;
+		unsigned int newsize = min(hiwat + sc->tt.autorcvbuf_inc,
+		    V_tcp_autorcvbuf_max);
+
+		if (!sbreserve_locked(so, SO_RCV, newsize, NULL))
+			sb->sb_flags &= ~SB_AUTOSIZE;
+	}
+
+	if (len > 0) {
+		queue_ddp_rcvbuf_mbuf(toep, db_idx, len);
+		t4_rcvd_locked(&toep->td->tod, tp);
+	}
+	sorwakeup_locked(so);
+	SOCKBUF_UNLOCK_ASSERT(sb);
+	CURVNET_RESTORE();
+
+	if (invalidated)
+		complete_ddp_buffer(toep, db, db_idx);
+	else
+		KASSERT(db->placed < db->drb->len,
+		    ("%s: full DDP buffer not invalidated", __func__));
+
+	if (toep->ddp.active_count != nitems(toep->ddp.db)) {
+		drb = alloc_cached_ddp_rcv_buffer(toep);
+		if (drb == NULL)
+			drb = alloc_ddp_rcv_buffer(toep, M_NOWAIT);
+		if (drb == NULL)
+			ddp_queue_toep(toep);
+		else {
+			if (!queue_ddp_rcvbuf(toep, drb)) {
+				ddp_queue_toep(toep);
+			}
+		}
+	}
+out:
+	DDP_UNLOCK(toep);
+	INP_WUNLOCK(inp);
+
+	return (0);
+}
+
+static int
+handle_ddp_data(struct toepcb *toep, __be32 ddp_report, __be32 rcv_nxt, int len)
+{
+	if ((toep->ddp.flags & DDP_RCVBUF) != 0)
+		return (handle_ddp_data_rcvbuf(toep, ddp_report, rcv_nxt, len));
+	else
+		return (handle_ddp_data_aio(toep, ddp_report, rcv_nxt, len));
+}
+
 void
 handle_ddp_indicate(struct toepcb *toep)
 {
 
 	DDP_ASSERT_LOCKED(toep);
+	if ((toep->ddp.flags & DDP_RCVBUF) != 0) {
+		/*
+		 * Indicates are not meaningful for RCVBUF since
+		 * buffers are activated when the socket option is
+		 * set.
+		 */
+		return;
+	}
+
 	MPASS(toep->ddp.active_count == 0);
 	MPASS((toep->ddp.flags & (DDP_BUF0_ACTIVE | DDP_BUF1_ACTIVE)) == 0);
 	if (toep->ddp.waiting_count == 0) {
@@ -654,6 +1065,8 @@ do_ddp_tcb_rpl(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m)
 		/*
 		 * XXX: This duplicates a lot of code with handle_ddp_data().
 		 */
+		KASSERT((toep->ddp.flags & DDP_AIO) != 0,
+		    ("%s: DDP_RCVBUF", __func__));
 		db_idx = G_COOKIE(cpl->cookie) - CPL_COOKIE_DDP0;
 		MPASS(db_idx < nitems(toep->ddp.db));
 		INP_WLOCK(inp);
@@ -707,6 +1120,8 @@ do_ddp_tcb_rpl(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m)
 void
 handle_ddp_close(struct toepcb *toep, struct tcpcb *tp, __be32 rcv_nxt)
 {
+	struct socket *so = toep->inp->inp_socket;
+	struct sockbuf *sb = &so->so_rcv;
 	struct ddp_buffer *db;
 	struct kaiocb *job;
 	long copied;
@@ -715,14 +1130,19 @@ handle_ddp_close(struct toepcb *toep, struct tcpcb *tp, __be32 rcv_nxt)
 	unsigned int db_flag;
 #endif
 	int len, placed;
+	bool ddp_rcvbuf;
 
 	INP_WLOCK_ASSERT(toep->inp);
 	DDP_ASSERT_LOCKED(toep);
 
+	ddp_rcvbuf = (toep->ddp.flags & DDP_RCVBUF) != 0;
+
 	/* - 1 is to ignore the byte for FIN */
 	len = be32toh(rcv_nxt) - tp->rcv_nxt - 1;
 	tp->rcv_nxt += len;
 
+	CTR(KTR_CXGBE, "%s: tid %d placed %u bytes before FIN", __func__,
+	    toep->tid, len);
 	while (toep->ddp.active_count > 0) {
 		MPASS(toep->ddp.active_id != -1);
 		db_idx = toep->ddp.active_id;
@@ -731,6 +1151,20 @@ handle_ddp_close(struct toepcb *toep, struct tcpcb *tp, __be32 rcv_nxt)
 #endif
 		MPASS((toep->ddp.flags & db_flag) != 0);
 		db = &toep->ddp.db[db_idx];
+		if (ddp_rcvbuf) {
+			placed = len;
+			if (placed > db->drb->len - db->placed)
+				placed = db->drb->len - db->placed;
+			if (placed != 0) {
+				SOCKBUF_LOCK(sb);
+				queue_ddp_rcvbuf_mbuf(toep, db_idx, placed);
+				sorwakeup_locked(so);
+				SOCKBUF_UNLOCK_ASSERT(sb);
+			}
+			complete_ddp_buffer(toep, db, db_idx);
+			len -= placed;
+			continue;
+		}
 		job = db->job;
 		copied = job->aio_received;
 		placed = len;
@@ -758,7 +1192,8 @@ handle_ddp_close(struct toepcb *toep, struct tcpcb *tp, __be32 rcv_nxt)
 	}
 
 	MPASS(len == 0);
-	ddp_complete_all(toep, 0);
+	if ((toep->ddp.flags & DDP_AIO) != 0)
+		ddp_complete_all(toep, 0);
 }
 
 #define DDP_ERR (F_DDP_PPOD_MISMATCH | F_DDP_LLIMIT_ERR | F_DDP_ULIMIT_ERR |\
@@ -892,6 +1327,7 @@ set_ddp_ulp_mode(struct toepcb *toep)
 static void
 enable_ddp(struct adapter *sc, struct toepcb *toep)
 {
+	uint64_t ddp_flags;
 
 	KASSERT((toep->ddp.flags & (DDP_ON | DDP_OK | DDP_SC_REQ)) == DDP_OK,
 	    ("%s: toep %p has bad ddp_flags 0x%x",
@@ -900,13 +1336,16 @@ enable_ddp(struct adapter *sc, struct toepcb *toep)
 	CTR3(KTR_CXGBE, "%s: tid %u (time %u)",
 	    __func__, toep->tid, time_uptime);
 
+	ddp_flags = 0;
+	if ((toep->ddp.flags & DDP_AIO) != 0)
+		ddp_flags |= V_TF_DDP_BUF0_INDICATE(1) |
+		    V_TF_DDP_BUF1_INDICATE(1);
 	DDP_ASSERT_LOCKED(toep);
 	toep->ddp.flags |= DDP_SC_REQ;
 	t4_set_tcb_field(sc, toep->ctrlq, toep, W_TCB_RX_DDP_FLAGS,
 	    V_TF_DDP_OFF(1) | V_TF_DDP_INDICATE_OUT(1) |
 	    V_TF_DDP_BUF0_INDICATE(1) | V_TF_DDP_BUF1_INDICATE(1) |
-	    V_TF_DDP_BUF0_VALID(1) | V_TF_DDP_BUF1_VALID(1),
-	    V_TF_DDP_BUF0_INDICATE(1) | V_TF_DDP_BUF1_INDICATE(1), 0, 0);
+	    V_TF_DDP_BUF0_VALID(1) | V_TF_DDP_BUF1_VALID(1), ddp_flags, 0, 0);
 	t4_set_tcb_field(sc, toep->ctrlq, toep, W_TCB_T_FLAGS,
 	    V_TF_RCV_COALESCE_ENABLE(1), 0, 0, 0);
 }
@@ -1103,6 +1542,19 @@ have_pgsz:
 	return (0);
 }
 
+static int
+t4_alloc_page_pods_for_rcvbuf(struct ppod_region *pr,
+    struct ddp_rcv_buffer *drb)
+{
+	struct ppod_reservation *prsv = &drb->prsv;
+
+	KASSERT(prsv->prsv_nppods == 0,
+	    ("%s: page pods already allocated", __func__));
+
+	return (t4_alloc_page_pods_for_buf(pr, (vm_offset_t)drb->buf, drb->len,
+	    prsv));
+}
+
 int
 t4_alloc_page_pods_for_sgl(struct ppod_region *pr, struct ctl_sg_entry *sgl,
     int entries, struct ppod_reservation *prsv)
@@ -1223,7 +1675,6 @@ t4_write_page_pods_for_ps(struct adapter *sc, struct sge_wrq *wrq, int tid,
 	ddp_pgsz = 1 << pr->pr_page_shift[G_PPOD_PGSZ(prsv->prsv_tag)];
 	ppod_addr = pr->pr_start + (prsv->prsv_tag & pr->pr_tag_mask);
 	for (i = 0; i < prsv->prsv_nppods; ppod_addr += chunk) {
-
 		/* How many page pods are we writing in this cycle */
 		n = min(prsv->prsv_nppods - i, NUM_ULP_TX_SC_IMM_PPODS);
 		chunk = PPOD_SZ(n);
@@ -1276,6 +1727,96 @@ t4_write_page_pods_for_ps(struct adapter *sc, struct sge_wrq *wrq, int tid,
 	return (0);
 }
 
+static int
+t4_write_page_pods_for_rcvbuf(struct adapter *sc, struct sge_wrq *wrq, int tid,
+    struct ddp_rcv_buffer *drb)
+{
+	struct wrqe *wr;
+	struct ulp_mem_io *ulpmc;
+	struct ulptx_idata *ulpsc;
+	struct pagepod *ppod;
+	int i, j, k, n, chunk, len, ddp_pgsz;
+	u_int ppod_addr, offset;
+	uint32_t cmd;
+	struct ppod_reservation *prsv = &drb->prsv;
+	struct ppod_region *pr = prsv->prsv_pr;
+	uintptr_t end_pva, pva;
+	vm_paddr_t pa;
+
+	MPASS(prsv->prsv_nppods > 0);
+
+	cmd = htobe32(V_ULPTX_CMD(ULP_TX_MEM_WRITE));
+	if (is_t4(sc))
*** 423 LINES SKIPPED ***