git: fd8f61d6e970 - main - cxgbei: Dispatch sent PDUs to the NIC asynchronously.

From: John Baldwin <jhb_at_FreeBSD.org>
Date: Tue, 08 Feb 2022 00:21:16 UTC
The branch main has been updated by jhb:

URL: https://cgit.FreeBSD.org/src/commit/?id=fd8f61d6e970fa443d393d330ae70c54c9a523a4

commit fd8f61d6e970fa443d393d330ae70c54c9a523a4
Author:     John Baldwin <jhb@FreeBSD.org>
AuthorDate: 2022-02-08 00:20:06 +0000
Commit:     John Baldwin <jhb@FreeBSD.org>
CommitDate: 2022-02-08 00:20:06 +0000

    cxgbei: Dispatch sent PDUs to the NIC asynchronously.
    
    Previously the driver was called to send PDUs to the NIC synchronously
    from the icl_conn_pdu_queue_cb callback.  However, this performed a
    fair bit of work while holding the icl connection lock.  Instead,
    change the callback to add sent PDUs to a STAILQ and defer dispatching
    of PDUs to the NIC to a helper thread similar to the scheme used in
    the TCP iSCSI backend.
    
    - Replace rx_flags int and the sole RXF_ACTIVE flag with a simple
      rx_active bool.
    
    - Add a pool of transmit worker threads for cxgbei.
    
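    - Fix a race with module unload: rather than having each worker
      thread signal CWT_STOPPED just before exiting, the unload path
      now sleeps on the thread pointer and relies on the wakeup issued
      by kthread_exit().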
    
    Reported by:    mav
    Sponsored by:   Chelsio Communications
---
 sys/dev/cxgbe/cxgbei/cxgbei.c     | 187 +++++++++++++++++++++-----------------
 sys/dev/cxgbe/cxgbei/cxgbei.h     |  21 +++--
 sys/dev/cxgbe/cxgbei/icl_cxgbei.c | 172 +++++++++++++++++++++++++++++------
 3 files changed, 260 insertions(+), 120 deletions(-)

diff --git a/sys/dev/cxgbe/cxgbei/cxgbei.c b/sys/dev/cxgbe/cxgbei/cxgbei.c
index 4a8df99b3d48..c06e39005197 100644
--- a/sys/dev/cxgbe/cxgbei/cxgbei.c
+++ b/sys/dev/cxgbe/cxgbei/cxgbei.c
@@ -95,8 +95,9 @@ __FBSDID("$FreeBSD$");
 #include "cxgbei.h"
 
 static int worker_thread_count;
-static struct cxgbei_worker_thread_softc *cwt_softc;
-static struct proc *cxgbei_proc;
+static struct cxgbei_worker_thread *cwt_rx_threads, *cwt_tx_threads;
+
+static void cwt_queue_for_rx(struct icl_cxgbei_conn *icc);
 
 static void
 read_pdu_limits(struct adapter *sc, uint32_t *max_tx_data_len,
@@ -585,17 +586,9 @@ do_rx_iscsi_ddp(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m)
 	icl_cxgbei_new_pdu_set_conn(ip, ic);
 
 	STAILQ_INSERT_TAIL(&icc->rcvd_pdus, ip, ip_next);
-	if ((icc->rx_flags & RXF_ACTIVE) == 0) {
-		struct cxgbei_worker_thread_softc *cwt = &cwt_softc[icc->cwt];
-
-		mtx_lock(&cwt->cwt_lock);
-		icc->rx_flags |= RXF_ACTIVE;
-		TAILQ_INSERT_TAIL(&cwt->rx_head, icc, rx_link);
-		if (cwt->cwt_state == CWT_SLEEPING) {
-			cwt->cwt_state = CWT_RUNNING;
-			cv_signal(&cwt->cwt_cv);
-		}
-		mtx_unlock(&cwt->cwt_lock);
+	if (!icc->rx_active) {
+		icc->rx_active = true;
+		cwt_queue_for_rx(icc);
 	}
 	SOCKBUF_UNLOCK(sb);
 	INP_WUNLOCK(inp);
@@ -836,17 +829,9 @@ do_rx_iscsi_cmp(struct sge_iq *iq, const struct rss_header *rss, struct mbuf *m)
 
 	/* Enqueue the PDU to the received pdus queue. */
 	STAILQ_INSERT_TAIL(&icc->rcvd_pdus, ip, ip_next);
-	if ((icc->rx_flags & RXF_ACTIVE) == 0) {
-		struct cxgbei_worker_thread_softc *cwt = &cwt_softc[icc->cwt];
-
-		mtx_lock(&cwt->cwt_lock);
-		icc->rx_flags |= RXF_ACTIVE;
-		TAILQ_INSERT_TAIL(&cwt->rx_head, icc, rx_link);
-		if (cwt->cwt_state == CWT_SLEEPING) {
-			cwt->cwt_state = CWT_RUNNING;
-			cv_signal(&cwt->cwt_cv);
-		}
-		mtx_unlock(&cwt->cwt_lock);
+	if (!icc->rx_active) {
+		icc->rx_active = true;
+		cwt_queue_for_rx(icc);
 	}
 	SOCKBUF_UNLOCK(sb);
 	INP_WUNLOCK(inp);
@@ -944,9 +929,9 @@ static struct uld_info cxgbei_uld_info = {
 };
 
 static void
-cwt_main(void *arg)
+cwt_rx_main(void *arg)
 {
-	struct cxgbei_worker_thread_softc *cwt = arg;
+	struct cxgbei_worker_thread *cwt = arg;
 	struct icl_cxgbei_conn *icc = NULL;
 	struct icl_conn *ic;
 	struct icl_pdu *ip;
@@ -962,8 +947,8 @@ cwt_main(void *arg)
 
 	while (__predict_true(cwt->cwt_state != CWT_STOP)) {
 		cwt->cwt_state = CWT_RUNNING;
-		while ((icc = TAILQ_FIRST(&cwt->rx_head)) != NULL) {
-			TAILQ_REMOVE(&cwt->rx_head, icc, rx_link);
+		while ((icc = TAILQ_FIRST(&cwt->icc_head)) != NULL) {
+			TAILQ_REMOVE(&cwt->icc_head, icc, rx_link);
 			mtx_unlock(&cwt->cwt_lock);
 
 			ic = &icc->ic;
@@ -979,7 +964,7 @@ cwt_main(void *arg)
 				 */
 				parse_pdus(icc, sb);
 			}
-			MPASS(icc->rx_flags & RXF_ACTIVE);
+			MPASS(icc->rx_active);
 			if (__predict_true(!(sb->sb_state & SBS_CANTRCVMORE))) {
 				MPASS(STAILQ_EMPTY(&rx_pdus));
 				STAILQ_SWAP(&icc->rcvd_pdus, &rx_pdus, icl_pdu);
@@ -994,11 +979,16 @@ cwt_main(void *arg)
 				SOCKBUF_LOCK(sb);
 				MPASS(STAILQ_EMPTY(&rx_pdus));
 			}
-			MPASS(icc->rx_flags & RXF_ACTIVE);
+			MPASS(icc->rx_active);
 			if (STAILQ_EMPTY(&icc->rcvd_pdus) ||
 			    __predict_false(sb->sb_state & SBS_CANTRCVMORE)) {
-				icc->rx_flags &= ~RXF_ACTIVE;
+				icc->rx_active = false;
+				SOCKBUF_UNLOCK(sb);
+
+				mtx_lock(&cwt->cwt_lock);
 			} else {
+				SOCKBUF_UNLOCK(sb);
+
 				/*
 				 * More PDUs were received while we were busy
 				 * handing over the previous batch to ICL.
@@ -1006,13 +996,9 @@ cwt_main(void *arg)
 				 * queue.
 				 */
 				mtx_lock(&cwt->cwt_lock);
-				TAILQ_INSERT_TAIL(&cwt->rx_head, icc,
+				TAILQ_INSERT_TAIL(&cwt->icc_head, icc,
 				    rx_link);
-				mtx_unlock(&cwt->cwt_lock);
 			}
-			SOCKBUF_UNLOCK(sb);
-
-			mtx_lock(&cwt->cwt_lock);
 		}
 
 		/* Inner loop doesn't check for CWT_STOP, do that first. */
@@ -1022,84 +1008,121 @@ cwt_main(void *arg)
 		cv_wait(&cwt->cwt_cv, &cwt->cwt_lock);
 	}
 
-	MPASS(TAILQ_FIRST(&cwt->rx_head) == NULL);
-	mtx_assert(&cwt->cwt_lock, MA_OWNED);
-	cwt->cwt_state = CWT_STOPPED;
-	cv_signal(&cwt->cwt_cv);
+	MPASS(TAILQ_FIRST(&cwt->icc_head) == NULL);
 	mtx_unlock(&cwt->cwt_lock);
 	kthread_exit();
 }
 
+static void
+cwt_queue_for_rx(struct icl_cxgbei_conn *icc)
+{
+	struct cxgbei_worker_thread *cwt = &cwt_rx_threads[icc->cwt];
+
+	mtx_lock(&cwt->cwt_lock);
+	TAILQ_INSERT_TAIL(&cwt->icc_head, icc, rx_link);
+	if (cwt->cwt_state == CWT_SLEEPING) {
+		cwt->cwt_state = CWT_RUNNING;
+		cv_signal(&cwt->cwt_cv);
+	}
+	mtx_unlock(&cwt->cwt_lock);
+}
+
+void
+cwt_queue_for_tx(struct icl_cxgbei_conn *icc)
+{
+	struct cxgbei_worker_thread *cwt = &cwt_tx_threads[icc->cwt];
+
+	mtx_lock(&cwt->cwt_lock);
+	TAILQ_INSERT_TAIL(&cwt->icc_head, icc, tx_link);
+	if (cwt->cwt_state == CWT_SLEEPING) {
+		cwt->cwt_state = CWT_RUNNING;
+		cv_signal(&cwt->cwt_cv);
+	}
+	mtx_unlock(&cwt->cwt_lock);
+}
+
 static int
 start_worker_threads(void)
 {
+	struct proc *cxgbei_proc;
 	int i, rc;
-	struct cxgbei_worker_thread_softc *cwt;
+	struct cxgbei_worker_thread *cwt;
 
 	worker_thread_count = min(mp_ncpus, 32);
-	cwt_softc = malloc(worker_thread_count * sizeof(*cwt), M_CXGBE,
+	cwt_rx_threads = malloc(worker_thread_count * sizeof(*cwt), M_CXGBE,
+	    M_WAITOK | M_ZERO);
+	cwt_tx_threads = malloc(worker_thread_count * sizeof(*cwt), M_CXGBE,
 	    M_WAITOK | M_ZERO);
 
-	MPASS(cxgbei_proc == NULL);
-	for (i = 0, cwt = &cwt_softc[0]; i < worker_thread_count; i++, cwt++) {
+	for (i = 0, cwt = &cwt_rx_threads[0]; i < worker_thread_count;
+	     i++, cwt++) {
+		mtx_init(&cwt->cwt_lock, "cwt lock", NULL, MTX_DEF);
+		cv_init(&cwt->cwt_cv, "cwt cv");
+		TAILQ_INIT(&cwt->icc_head);
+	}
+
+	for (i = 0, cwt = &cwt_tx_threads[0]; i < worker_thread_count;
+	     i++, cwt++) {
 		mtx_init(&cwt->cwt_lock, "cwt lock", NULL, MTX_DEF);
 		cv_init(&cwt->cwt_cv, "cwt cv");
-		TAILQ_INIT(&cwt->rx_head);
-		rc = kproc_kthread_add(cwt_main, cwt, &cxgbei_proc, NULL, 0, 0,
-		    "cxgbei", "%d", i);
+		TAILQ_INIT(&cwt->icc_head);
+	}
+
+	cxgbei_proc = NULL;
+	for (i = 0, cwt = &cwt_rx_threads[0]; i < worker_thread_count;
+	     i++, cwt++) {
+		rc = kproc_kthread_add(cwt_rx_main, cwt, &cxgbei_proc,
+		    &cwt->cwt_td, 0, 0, "cxgbei", "rx %d", i);
 		if (rc != 0) {
-			printf("cxgbei: failed to start thread #%d/%d (%d)\n",
+			printf("cxgbei: failed to start rx thread #%d/%d (%d)\n",
 			    i + 1, worker_thread_count, rc);
-			mtx_destroy(&cwt->cwt_lock);
-			cv_destroy(&cwt->cwt_cv);
-			bzero(cwt, sizeof(*cwt));
-			if (i == 0) {
-				free(cwt_softc, M_CXGBE);
-				worker_thread_count = 0;
-
-				return (rc);
-			}
-
-			/* Not fatal, carry on with fewer threads. */
-			worker_thread_count = i;
-			rc = 0;
-			break;
+			return (rc);
 		}
+	}
 
-		/* Wait for thread to start before moving on to the next one. */
-		mtx_lock(&cwt->cwt_lock);
-		while (cwt->cwt_state == 0)
-			cv_wait(&cwt->cwt_cv, &cwt->cwt_lock);
-		mtx_unlock(&cwt->cwt_lock);
+	for (i = 0, cwt = &cwt_tx_threads[0]; i < worker_thread_count;
+	     i++, cwt++) {
+		rc = kproc_kthread_add(cwt_tx_main, cwt, &cxgbei_proc,
+		    &cwt->cwt_td, 0, 0, "cxgbei", "tx %d", i);
+		if (rc != 0) {
+			printf("cxgbei: failed to start tx thread #%d/%d (%d)\n",
+			    i + 1, worker_thread_count, rc);
+			return (rc);
+		}
 	}
 
-	MPASS(cwt_softc != NULL);
-	MPASS(worker_thread_count > 0);
 	return (0);
 }
 
 static void
-stop_worker_threads(void)
+stop_worker_threads1(struct cxgbei_worker_thread *threads)
 {
+	struct cxgbei_worker_thread *cwt;
 	int i;
-	struct cxgbei_worker_thread_softc *cwt = &cwt_softc[0];
 
-	MPASS(worker_thread_count >= 0);
-
-	for (i = 0, cwt = &cwt_softc[0]; i < worker_thread_count; i++, cwt++) {
+	for (i = 0, cwt = &threads[0]; i < worker_thread_count; i++, cwt++) {
 		mtx_lock(&cwt->cwt_lock);
-		MPASS(cwt->cwt_state == CWT_RUNNING ||
-		    cwt->cwt_state == CWT_SLEEPING);
-		cwt->cwt_state = CWT_STOP;
-		cv_signal(&cwt->cwt_cv);
-		do {
-			cv_wait(&cwt->cwt_cv, &cwt->cwt_lock);
-		} while (cwt->cwt_state != CWT_STOPPED);
+		if (cwt->cwt_td != NULL) {
+			MPASS(cwt->cwt_state == CWT_RUNNING ||
+			    cwt->cwt_state == CWT_SLEEPING);
+			cwt->cwt_state = CWT_STOP;
+			cv_signal(&cwt->cwt_cv);
+			mtx_sleep(cwt->cwt_td, &cwt->cwt_lock, 0, "cwtstop", 0);
+		}
 		mtx_unlock(&cwt->cwt_lock);
 		mtx_destroy(&cwt->cwt_lock);
 		cv_destroy(&cwt->cwt_cv);
 	}
-	free(cwt_softc, M_CXGBE);
+	free(threads, M_CXGBE);
+}
+
+static void
+stop_worker_threads(void)
+{
+
+	MPASS(worker_thread_count >= 0);
+	stop_worker_threads1(cwt_rx_threads);
+	stop_worker_threads1(cwt_tx_threads);
 }
 
 /* Select a worker thread for a connection. */
diff --git a/sys/dev/cxgbe/cxgbei/cxgbei.h b/sys/dev/cxgbe/cxgbei/cxgbei.h
index 58a5dac6d63b..b078f3110d62 100644
--- a/sys/dev/cxgbe/cxgbei/cxgbei.h
+++ b/sys/dev/cxgbe/cxgbei/cxgbei.h
@@ -36,23 +36,19 @@ enum {
 	CWT_SLEEPING	= 1,
 	CWT_RUNNING	= 2,
 	CWT_STOP	= 3,
-	CWT_STOPPED	= 4,
 };
 
-struct cxgbei_worker_thread_softc {
+struct cxgbei_worker_thread {
 	struct mtx	cwt_lock;
 	struct cv	cwt_cv;
 	volatile int	cwt_state;
+	struct thread	*cwt_td;
 
-	TAILQ_HEAD(, icl_cxgbei_conn) rx_head;
+	TAILQ_HEAD(, icl_cxgbei_conn) icc_head;
 } __aligned(CACHE_LINE_SIZE);
 
 #define CXGBEI_CONN_SIGNATURE 0x56788765
 
-enum {
-	RXF_ACTIVE	= 1 << 0,	/* In the worker thread's queue */
-};
-
 struct cxgbei_cmp {
 	LIST_ENTRY(cxgbei_cmp) link;
 
@@ -71,16 +67,21 @@ struct icl_cxgbei_conn {
 	int ulp_submode;
 	struct adapter *sc;
 	struct toepcb *toep;
+	u_int cwt;
 
 	/* Receive related. */
-	u_int rx_flags;				/* protected by so_rcv lock */
-	u_int cwt;
+	bool rx_active;				/* protected by so_rcv lock */
 	STAILQ_HEAD(, icl_pdu) rcvd_pdus;	/* protected by so_rcv lock */
 	TAILQ_ENTRY(icl_cxgbei_conn) rx_link;	/* protected by cwt lock */
 
 	struct cxgbei_cmp_head *cmp_table;	/* protected by cmp_lock */
 	struct mtx cmp_lock;
 	unsigned long cmp_hash_mask;
+
+	/* Transmit related. */
+	bool tx_active;				/* protected by ic lock */
+	STAILQ_HEAD(, icl_pdu) sent_pdus;	/* protected by ic lock */
+	TAILQ_ENTRY(icl_cxgbei_conn) tx_link;	/* protected by cwt lock */
 };
 
 static inline struct icl_cxgbei_conn *
@@ -134,8 +135,10 @@ struct cxgbei_data {
 
 /* cxgbei.c */
 u_int cxgbei_select_worker_thread(struct icl_cxgbei_conn *);
+void cwt_queue_for_tx(struct icl_cxgbei_conn *);
 
 /* icl_cxgbei.c */
+void cwt_tx_main(void *);
 int icl_cxgbei_mod_load(void);
 int icl_cxgbei_mod_unload(void);
 struct icl_pdu *icl_cxgbei_new_pdu(int);
diff --git a/sys/dev/cxgbe/cxgbei/icl_cxgbei.c b/sys/dev/cxgbe/cxgbei/icl_cxgbei.c
index f66a959f6311..516ab931a49c 100644
--- a/sys/dev/cxgbe/cxgbei/icl_cxgbei.c
+++ b/sys/dev/cxgbe/cxgbei/icl_cxgbei.c
@@ -421,6 +421,128 @@ finalize_pdu(struct icl_cxgbei_conn *icc, struct icl_cxgbei_pdu *icp)
 	return (m);
 }
 
+static void
+cwt_push_pdus(struct icl_cxgbei_conn *icc, struct socket *so, struct mbufq *mq)
+{
+	struct epoch_tracker et;
+	struct icl_conn *ic = &icc->ic;
+	struct toepcb *toep = icc->toep;
+	struct inpcb *inp;
+
+	/*
+	 * Do not get inp from toep->inp as the toepcb might have
+	 * detached already.
+	 */
+	inp = sotoinpcb(so);
+	CURVNET_SET(toep->vnet);
+	NET_EPOCH_ENTER(et);
+	INP_WLOCK(inp);
+
+	ICL_CONN_UNLOCK(ic);
+	if (__predict_false(inp->inp_flags & (INP_DROPPED | INP_TIMEWAIT)) ||
+	    __predict_false((toep->flags & TPF_ATTACHED) == 0)) {
+		mbufq_drain(mq);
+	} else {
+		mbufq_concat(&toep->ulp_pduq, mq);
+		t4_push_pdus(icc->sc, toep, 0);
+	}
+	INP_WUNLOCK(inp);
+	NET_EPOCH_EXIT(et);
+	CURVNET_RESTORE();
+
+	ICL_CONN_LOCK(ic);
+}
+
+void
+cwt_tx_main(void *arg)
+{
+	struct cxgbei_worker_thread *cwt = arg;
+	struct icl_cxgbei_conn *icc;
+	struct icl_conn *ic;
+	struct icl_pdu *ip;
+	struct socket *so;
+	struct mbuf *m;
+	struct mbufq mq;
+	STAILQ_HEAD(, icl_pdu) tx_pdus = STAILQ_HEAD_INITIALIZER(tx_pdus);
+
+	MPASS(cwt != NULL);
+
+	mtx_lock(&cwt->cwt_lock);
+	MPASS(cwt->cwt_state == 0);
+	cwt->cwt_state = CWT_RUNNING;
+	cv_signal(&cwt->cwt_cv);
+
+	mbufq_init(&mq, INT_MAX);
+	while (__predict_true(cwt->cwt_state != CWT_STOP)) {
+		cwt->cwt_state = CWT_RUNNING;
+		while ((icc = TAILQ_FIRST(&cwt->icc_head)) != NULL) {
+			TAILQ_REMOVE(&cwt->icc_head, icc, tx_link);
+			mtx_unlock(&cwt->cwt_lock);
+
+			ic = &icc->ic;
+
+			ICL_CONN_LOCK(ic);
+			MPASS(icc->tx_active);
+			STAILQ_SWAP(&icc->sent_pdus, &tx_pdus, icl_pdu);
+			ICL_CONN_UNLOCK(ic);
+
+			while ((ip = STAILQ_FIRST(&tx_pdus)) != NULL) {
+				STAILQ_REMOVE_HEAD(&tx_pdus, ip_next);
+
+				m = finalize_pdu(icc, ip_to_icp(ip));
+				M_ASSERTPKTHDR(m);
+				MPASS((m->m_pkthdr.len & 3) == 0);
+
+				mbufq_enqueue(&mq, m);
+			}
+
+			ICL_CONN_LOCK(ic);
+			so = ic->ic_socket;
+			if (__predict_false(ic->ic_disconnecting) ||
+			    __predict_false(so == NULL)) {
+				mbufq_drain(&mq);
+				icc->tx_active = false;
+				ICL_CONN_UNLOCK(ic);
+
+				mtx_lock(&cwt->cwt_lock);
+				continue;
+			}
+
+			cwt_push_pdus(icc, so, &mq);
+
+			MPASS(icc->tx_active);
+			if (STAILQ_EMPTY(&icc->sent_pdus)) {
+				icc->tx_active = false;
+				ICL_CONN_UNLOCK(ic);
+
+				mtx_lock(&cwt->cwt_lock);
+			} else {
+				ICL_CONN_UNLOCK(ic);
+
+				/*
+				 * More PDUs were queued while we were
+				 * busy sending the previous batch.
+				 * Re-add this connection to the end
+				 * of the queue.
+				 */
+				mtx_lock(&cwt->cwt_lock);
+				TAILQ_INSERT_TAIL(&cwt->icc_head, icc,
+				    tx_link);
+			}
+		}
+
+		/* Inner loop doesn't check for CWT_STOP, do that first. */
+		if (__predict_false(cwt->cwt_state == CWT_STOP))
+			break;
+		cwt->cwt_state = CWT_SLEEPING;
+		cv_wait(&cwt->cwt_cv, &cwt->cwt_lock);
+	}
+
+	MPASS(TAILQ_FIRST(&cwt->icc_head) == NULL);
+	mtx_unlock(&cwt->cwt_lock);
+	kthread_exit();
+}
+
 int
 icl_cxgbei_conn_pdu_append_data(struct icl_conn *ic, struct icl_pdu *ip,
     const void *addr, size_t len, int flags)
@@ -534,13 +656,9 @@ void
 icl_cxgbei_conn_pdu_queue_cb(struct icl_conn *ic, struct icl_pdu *ip,
 			     icl_pdu_cb cb)
 {
-	struct epoch_tracker et;
 	struct icl_cxgbei_conn *icc = ic_to_icc(ic);
 	struct icl_cxgbei_pdu *icp = ip_to_icp(ip);
 	struct socket *so = ic->ic_socket;
-	struct toepcb *toep = icc->toep;
-	struct inpcb *inp;
-	struct mbuf *m;
 
 	MPASS(ic == ip->ip_conn);
 	MPASS(ip->ip_bhs_mbuf != NULL);
@@ -557,28 +675,11 @@ icl_cxgbei_conn_pdu_queue_cb(struct icl_conn *ic, struct icl_pdu *ip,
 		return;
 	}
 
-	m = finalize_pdu(icc, icp);
-	M_ASSERTPKTHDR(m);
-	MPASS((m->m_pkthdr.len & 3) == 0);
-
-	/*
-	 * Do not get inp from toep->inp as the toepcb might have detached
-	 * already.
-	 */
-	inp = sotoinpcb(so);
-	CURVNET_SET(toep->vnet);
-	NET_EPOCH_ENTER(et);
-	INP_WLOCK(inp);
-	if (__predict_false(inp->inp_flags & (INP_DROPPED | INP_TIMEWAIT)) ||
-	    __predict_false((toep->flags & TPF_ATTACHED) == 0))
-		m_freem(m);
-	else {
-		mbufq_enqueue(&toep->ulp_pduq, m);
-		t4_push_pdus(icc->sc, toep, 0);
+	STAILQ_INSERT_TAIL(&icc->sent_pdus, ip, ip_next);
+	if (!icc->tx_active) {
+		icc->tx_active = true;
+		cwt_queue_for_tx(icc);
 	}
-	INP_WUNLOCK(inp);
-	NET_EPOCH_EXIT(et);
-	CURVNET_RESTORE();
 }
 
 static struct icl_conn *
@@ -593,6 +694,7 @@ icl_cxgbei_new_conn(const char *name, struct mtx *lock)
 	    M_WAITOK | M_ZERO);
 	icc->icc_signature = CXGBEI_CONN_SIGNATURE;
 	STAILQ_INIT(&icc->rcvd_pdus);
+	STAILQ_INIT(&icc->sent_pdus);
 
 	icc->cmp_table = hashinit(64, M_CXGBEI, &icc->cmp_hash_mask);
 	mtx_init(&icc->cmp_lock, "cxgbei_cmp", NULL, MTX_DEF);
@@ -935,21 +1037,33 @@ icl_cxgbei_conn_close(struct icl_conn *ic)
 	if (toep != NULL) {	/* NULL if connection was never offloaded. */
 		toep->ulpcb = NULL;
 
+		/*
+		 * Wait for the cwt threads to stop processing this
+		 * connection for transmit.
+		 */
+		while (icc->tx_active)
+			rw_sleep(inp, &inp->inp_lock, 0, "conclo", 1);
+
 		/* Discard PDUs queued for TX. */
+		while (!STAILQ_EMPTY(&icc->sent_pdus)) {
+			ip = STAILQ_FIRST(&icc->sent_pdus);
+			STAILQ_REMOVE_HEAD(&icc->sent_pdus, ip_next);
+			icl_cxgbei_pdu_done(ip, ENOTCONN);
+		}
 		mbufq_drain(&toep->ulp_pduq);
 
 		/*
 		 * Wait for the cwt threads to stop processing this
-		 * connection.
+		 * connection for receive.
 		 */
 		SOCKBUF_LOCK(sb);
-		if (icc->rx_flags & RXF_ACTIVE) {
-			volatile u_int *p = &icc->rx_flags;
+		if (icc->rx_active) {
+			volatile bool *p = &icc->rx_active;
 
 			SOCKBUF_UNLOCK(sb);
 			INP_WUNLOCK(inp);
 
-			while (*p & RXF_ACTIVE)
+			while (*p)
 				pause("conclo", 1);
 
 			INP_WLOCK(inp);