svn commit: r217768 - projects/ofed/head/sys/ofed/drivers/infiniband/ulp/sdp

Jeff Roberson jeff at FreeBSD.org
Mon Jan 24 06:03:59 UTC 2011


Author: jeff
Date: Mon Jan 24 06:03:58 2011
New Revision: 217768
URL: http://svn.freebsd.org/changeset/base/217768

Log:
   - Fix flow control issues with nagle and recv buffer posting.
   - Re-arm the tx cq according to the sockbuffer character count rather than
     any waiters.
   - Prevent sdp_append() from creating buffers with too many discontiguous
     mbufs.
   - Honor xmit_size_goal.
   - Add a task to handle shutdown so operations which are not safe to call
     from a timer may proceed.
   - Rework a significant amount of the sdp shutdown state machine to be
     more compliant with tcp.
  
  Sponsored by:	Isilon Systems, iX Systems, and Panasas.

Modified:
  projects/ofed/head/sys/ofed/drivers/infiniband/ulp/sdp/sdp.h
  projects/ofed/head/sys/ofed/drivers/infiniband/ulp/sdp/sdp_bcopy.c
  projects/ofed/head/sys/ofed/drivers/infiniband/ulp/sdp/sdp_main.c
  projects/ofed/head/sys/ofed/drivers/infiniband/ulp/sdp/sdp_rx.c
  projects/ofed/head/sys/ofed/drivers/infiniband/ulp/sdp/sdp_tx.c

Modified: projects/ofed/head/sys/ofed/drivers/infiniband/ulp/sdp/sdp.h
==============================================================================
--- projects/ofed/head/sys/ofed/drivers/infiniband/ulp/sdp/sdp.h	Mon Jan 24 04:32:59 2011	(r217767)
+++ projects/ofed/head/sys/ofed/drivers/infiniband/ulp/sdp/sdp.h	Mon Jan 24 06:03:58 2011	(r217768)
@@ -54,6 +54,8 @@
 #define	CONFIG_INFINIBAND_SDP_DEBUG		1
 #define	CONFIG_INFINIBAND_SDP_DEBUG_DATA	1
 
+#define	SDP_DEBUG
+
 #include "sdp_dbg.h"
 
 #undef LIST_HEAD
@@ -88,9 +90,6 @@ struct name {                           
 
 #define SDP_MAX_RDMA_READ_LEN (PAGE_SIZE * (SDP_FMR_SIZE - 2))
 
-#define SDP_MAX_RECV_SGES 9 /* 1 for sdp header + 8 for payload */
-#define SDP_MAX_SEND_SGES 9 /* same as above */
-
 /* mb inlined data len - rest will be rx'ed into frags */
 #define SDP_HEAD_SIZE (sizeof(struct sdp_bsdh))
 
@@ -100,6 +99,9 @@ struct name {                           
 #define	SDP_MAX_PACKET	(1 << 16)
 #define SDP_MAX_PAYLOAD (SDP_MAX_PACKET - SDP_HEAD_SIZE)
 
+#define SDP_MAX_RECV_SGES (SDP_MAX_PACKET / MCLBYTES)
+#define SDP_MAX_SEND_SGES (SDP_MAX_PACKET / MCLBYTES) + 2
+
 #define SDP_NUM_WC 4
 
 #define SDP_DEF_ZCOPY_THRESH 64*1024
@@ -366,11 +368,12 @@ struct sdp_moderation {
 #define	SDP_NODELAY	0x0008		/* Disble nagle. */
 #define	SDP_NEEDFIN	0x0010		/* Send a fin on the next tx. */
 #define	SDP_DREQWAIT	0x0020		/* Waiting on DREQ. */
-#define	SDP_HAVEOOB	0x0040		/* Have OOB data. */
+#define	SDP_DESTROY	0x0040		/* Being destroyed. */
+#define	SDP_DISCON	0x0080		/* rdma_disconnect is owed. */
 
 /* These are oobflags */
 #define	SDP_HADOOB	0x0001		/* Had OOB data. */
-#define	SDP_DESTROY	0x0002		/* Being destroyed. */
+#define	SDP_HAVEOOB	0x0002		/* Have OOB data. */
 
 struct sdp_sock {
 	LIST_ENTRY(sdp_sock) list;
@@ -429,6 +432,7 @@ struct sdp_sock {
 	unsigned long tx_bytes;
 	unsigned long rx_bytes;
 	struct sdp_moderation auto_mod;
+	struct task shutdown_task;
 #ifdef SDP_ZCOPY
 	struct tx_srcavail_state *tx_sa;
 	struct rx_srcavail_state *rx_sa;
@@ -713,5 +717,6 @@ int sdp_post_sendsm(struct socket *sk);
 void srcavail_cancel_timeout(struct work_struct *work);
 void sdp_abort_srcavail(struct socket *sk);
 void sdp_abort_rdma_read(struct socket *sk);
+int sdp_process_rx(struct sdp_sock *ssk);
 
 #endif

Modified: projects/ofed/head/sys/ofed/drivers/infiniband/ulp/sdp/sdp_bcopy.c
==============================================================================
--- projects/ofed/head/sys/ofed/drivers/infiniband/ulp/sdp/sdp_bcopy.c	Mon Jan 24 04:32:59 2011	(r217767)
+++ projects/ofed/head/sys/ofed/drivers/infiniband/ulp/sdp/sdp_bcopy.c	Mon Jan 24 06:03:58 2011	(r217768)
@@ -115,7 +115,7 @@ sdp_nagle_off(struct sdp_sock *ssk, stru
 		unlikely(h->mid != SDP_MID_DATA) ||
 		(ssk->flags & SDP_NODELAY) ||
 		!ssk->nagle_last_unacked ||
-		mb->m_pkthdr.len >= ssk->xmit_size_goal ||
+		mb->m_pkthdr.len >= ssk->xmit_size_goal / 4 ||
 		(mb->m_flags & M_PUSH);
 
 	if (send_now) {
@@ -208,9 +208,10 @@ sdp_post_sends(struct sdp_sock *ssk, int
 		SOCKBUF_LOCK(&sk->so_snd);
 		sk->so_snd.sb_sndptr = mb->m_nextpkt;
 		sk->so_snd.sb_mb = mb->m_nextpkt;
-		for (n = mb; n != NULL; n = mb->m_next)
-			sbfree(&sk->so_snd, mb);
+		mb->m_nextpkt = NULL;
 		SB_EMPTY_FIXUP(&sk->so_snd);
+		for (n = mb; n != NULL; n = n->m_next)
+			sbfree(&sk->so_snd, n);
 		SOCKBUF_UNLOCK(&sk->so_snd);
 		sdp_post_send(ssk, mb);
 		post_count++;

Modified: projects/ofed/head/sys/ofed/drivers/infiniband/ulp/sdp/sdp_main.c
==============================================================================
--- projects/ofed/head/sys/ofed/drivers/infiniband/ulp/sdp/sdp_main.c	Mon Jan 24 04:32:59 2011	(r217767)
+++ projects/ofed/head/sys/ofed/drivers/infiniband/ulp/sdp/sdp_main.c	Mon Jan 24 06:03:58 2011	(r217768)
@@ -87,6 +87,8 @@ RW_SYSINIT(sdplockinit, &sdp_lock, "SDP 
 
 MALLOC_DEFINE(M_SDP, "sdp", "Socket Direct Protocol");
 
+static void sdp_stop_keepalive_timer(struct socket *so);
+
 /*
  * SDP protocol interface to socket abstraction.
  */
@@ -99,6 +101,19 @@ u_long	sdp_recvspace = 1024*64;
 
 static int sdp_count;
 
+/*
+ * Disable async. CMA events for sockets which are being torn down.
+ */
+static void
+sdp_destroy_cma(struct sdp_sock *ssk)
+{
+
+	if (ssk->id == NULL)
+		return;
+	rdma_destroy_id(ssk->id);
+	ssk->id = NULL;
+}
+
 static int
 sdp_pcbbind(struct sdp_sock *ssk, struct sockaddr *nam, struct ucred *cred)
 {
@@ -128,30 +143,15 @@ sdp_pcbbind(struct sdp_sock *ssk, struct
 	}
 	error = -rdma_bind_addr(ssk->id, nam);
 	SDP_WLOCK(ssk);
-	if (error) {
-		rdma_destroy_id(ssk->id);
-		ssk->id = NULL;
-	} else {
+	if (error == 0) {
 		sin = (struct sockaddr_in *)&ssk->id->route.addr.src_addr;
 		ssk->laddr = sin->sin_addr.s_addr;
 		ssk->lport = sin->sin_port;
-	}
+	} else
+		sdp_destroy_cma(ssk);
 	return (error);
 }
 
-/*
- * Disable async. CMA events for sockets which are being torn down.
- */
-static void
-sdp_destroy_cma(struct sdp_sock *ssk)
-{
-
-	if (ssk->id == NULL)
-		return;
-	rdma_destroy_id(ssk->id);
-	ssk->id = NULL;
-}
-
 static void
 sdp_pcbfree(struct sdp_sock *ssk)
 {
@@ -176,6 +176,7 @@ sdp_pcbfree(struct sdp_sock *ssk)
 	sdp_rx_ring_destroy(ssk);
 	rw_destroy(&ssk->rx_ring.destroyed_lock);
 	uma_zfree(sdp_zone, ssk);
+	rw_destroy(&ssk->lock);
 }
 
 /*
@@ -267,6 +268,85 @@ sdp_apply_all(void (*func)(struct sdp_so
 #endif
 
 static void
+sdp_output_reset(struct sdp_sock *ssk)
+{
+	struct rdma_cm_id *id;
+
+	SDP_WLOCK_ASSERT(ssk);
+	if (ssk->id) {
+		id = ssk->id;
+		ssk->qp_active = 0;
+		SDP_WUNLOCK(ssk);
+		rdma_disconnect(id);
+		SDP_WLOCK(ssk);
+	}
+	ssk->state = TCPS_CLOSED;
+}
+
+/*
+ * Attempt to close a SDP socket, marking it as dropped, and freeing
+ * the socket if we hold the only reference.
+ */
+static struct sdp_sock *
+sdp_closed(struct sdp_sock *ssk)
+{
+	struct socket *so;
+
+	SDP_WLOCK_ASSERT(ssk);
+
+	ssk->flags |= SDP_DROPPED;
+	so = ssk->socket;
+	soisdisconnected(so);
+	if (ssk->flags & SDP_SOCKREF) {
+		KASSERT(so->so_state & SS_PROTOREF,
+		    ("sdp_closed: !SS_PROTOREF"));
+		ssk->flags &= ~SDP_SOCKREF;
+		SDP_WUNLOCK(ssk);
+		ACCEPT_LOCK();
+		SOCK_LOCK(so);
+		so->so_state &= ~SS_PROTOREF;
+		sofree(so);
+		return (NULL);
+	}
+	return (ssk);
+}
+
+/*
+ * Perform timer based shutdowns which can not operate in
+ * callout context.
+ */
+static void
+sdp_shutdown_task(void *data, int pending)
+{
+	struct sdp_sock *ssk;
+
+	ssk = data;
+	SDP_WLOCK(ssk);
+	/*
+	 * I don't think this can race with another call to pcbfree()
+	 * because SDP_TIMEWAIT protects it.  SDP_DESTROY may be redundant.
+	 */
+	if (ssk->flags & SDP_DESTROY)
+		panic("sdp_shutdown_task: Racing with pcbfree for ssk %p",
+		    ssk);
+	if (ssk->flags & SDP_DISCON)
+		sdp_output_reset(ssk);
+	/* We have to clear this so sdp_detach() will call pcbfree(). */
+	ssk->flags &= ~(SDP_TIMEWAIT | SDP_DREQWAIT);
+	if ((ssk->flags & SDP_DROPPED) == 0 &&
+	    sdp_closed(ssk) == NULL)
+		return;
+	if (ssk->socket == NULL) {
+		sdp_pcbfree(ssk);
+		return;
+	}
+	SDP_WUNLOCK(ssk);
+}
+
+/*
+ * 2msl has expired, schedule the shutdown task.
+ */
+static void
 sdp_2msl_timeout(void *data)
 {
 	struct sdp_sock *ssk;
@@ -277,15 +357,17 @@ sdp_2msl_timeout(void *data)
 		goto out;
         callout_deactivate(&ssk->keep2msl);
 	/* Should be impossible, defensive programming. */
-	if ((ssk->flags & (SDP_TIMEWAIT | SDP_DROPPED)) == 0)
+	if ((ssk->flags & SDP_TIMEWAIT) == 0)
 		goto out;
-	sdp_pcbfree(ssk);
-	return;
+	taskqueue_enqueue(taskqueue_thread, &ssk->shutdown_task);
 out:
 	SDP_WUNLOCK(ssk);
 	return;
 }
 
+/*
+ * Schedule the 2msl wait timer.
+ */
 static void
 sdp_2msl_wait(struct sdp_sock *ssk)
 {
@@ -293,9 +375,13 @@ sdp_2msl_wait(struct sdp_sock *ssk)
 	SDP_WLOCK_ASSERT(ssk);
 	ssk->flags |= SDP_TIMEWAIT;
 	ssk->state = TCPS_TIME_WAIT;
+	soisdisconnected(ssk->socket);
 	callout_reset(&ssk->keep2msl, TCPTV_MSL, sdp_2msl_timeout, ssk);
 }
 
+/*
+ * Timed out waiting for the final fin/ack from rdma_disconnect().
+ */
 static void
 sdp_dreq_timeout(void *data)
 {
@@ -314,21 +400,16 @@ sdp_dreq_timeout(void *data)
 	if ((ssk->flags & SDP_DREQWAIT) == 0)
 		goto out;
 	ssk->flags &= ~SDP_DREQWAIT;
+	ssk->flags |= SDP_DISCON;
 	sdp_2msl_wait(ssk);
 	ssk->qp_active = 0;
-	if (ssk->id) {
-		struct rdma_cm_id *id;
-
-		id = ssk->id;
-		ssk->id = NULL;
-		SDP_WUNLOCK(ssk);
-		rdma_disconnect(id);
-		return;
-	}
 out:
 	SDP_WUNLOCK(ssk);
 }
 
+/*
+ * Received the final fin/ack.  Cancel the 2msl.
+ */
 void
 sdp_cancel_dreq_wait_timeout(struct sdp_sock *ssk)
 {
@@ -345,6 +426,7 @@ sdp_init_sock(struct socket *sk)
 	sdp_dbg(sk, "%s\n", __func__);
 
 	callout_init_rw(&ssk->keep2msl, &ssk->lock, CALLOUT_RETURNUNLOCKED);
+	TASK_INIT(&ssk->shutdown_task, 0, sdp_shutdown_task, ssk);
 #ifdef SDP_ZCOPY
 	INIT_DELAYED_WORK(&ssk->srcavail_cancel_work, srcavail_cancel_timeout);
 	ssk->zcopy_thresh = -1; /* use global sdp_zcopy_thresh */
@@ -375,9 +457,10 @@ sdp_attach(struct socket *so, int proto,
 	}
 	so->so_rcv.sb_flags |= SB_AUTOSIZE;
 	so->so_snd.sb_flags |= SB_AUTOSIZE;
-	ssk = uma_zalloc(sdp_zone, M_NOWAIT);
+	ssk = uma_zalloc(sdp_zone, M_NOWAIT | M_ZERO);
 	if (ssk == NULL)
 		return (ENOBUFS);
+	rw_init(&ssk->lock, "sdpsock");
 	ssk->socket = so;
 	ssk->cred = crhold(so->so_cred);
 	so->so_pcb = (caddr_t)ssk;
@@ -409,7 +492,7 @@ sdp_detach(struct socket *so)
 	KASSERT(ssk->socket != NULL, ("sdp_detach: socket is NULL"));
 	ssk->socket->so_pcb = NULL;
 	ssk->socket = NULL;
-	if (ssk->flags & SDP_TIMEWAIT)
+	if (ssk->flags & (SDP_TIMEWAIT | SDP_DREQWAIT))
 		SDP_WUNLOCK(ssk);
 	else if (ssk->flags & SDP_DROPPED || ssk->state < TCPS_SYN_SENT)
 		sdp_pcbfree(ssk);
@@ -545,51 +628,6 @@ sdp_connect(struct socket *so, struct so
 }
 
 /*
- * Attempt to close a SDP socket, marking it as dropped, and freeing
- * the socket if we hold the only reference.
- */
-static struct sdp_sock *
-sdp_closed(struct sdp_sock *ssk)
-{
-	struct socket *so;
-
-	SDP_WLOCK_ASSERT(ssk);
-
-	ssk->flags |= SDP_DROPPED;
-	so = ssk->socket;
-	soisdisconnected(so);
-	if (ssk->flags & SDP_SOCKREF) {
-		KASSERT(so->so_state & SS_PROTOREF,
-		    ("sdp_closed: !SS_PROTOREF"));
-		ssk->flags &= ~SDP_SOCKREF;
-		SDP_WUNLOCK(ssk);
-		ACCEPT_LOCK();
-		SOCK_LOCK(so);
-		so->so_state &= ~SS_PROTOREF;
-		sofree(so);
-		return (NULL);
-	}
-	return (ssk);
-}
-
-static void
-sdp_output_reset(struct sdp_sock *ssk)
-{
-	struct rdma_cm_id *id;
-
-	SDP_WLOCK_ASSERT(ssk);
-	if (ssk->id) {
-		id = ssk->id;
-		ssk->qp_active = 0;
-		ssk->id = NULL;
-		SDP_WUNLOCK(ssk);
-		rdma_disconnect(id);
-		SDP_WLOCK(ssk);
-	}
-	ssk->state = TCPS_CLOSED;
-}
-
-/*
  * Drop a SDP socket, reporting
  * the specified error.  If connection is synchronized,
  * then send a RST to peer.
@@ -627,10 +665,12 @@ sdp_usrclosed(struct sdp_sock *ssk)
 
 	switch (ssk->state) {
 	case TCPS_LISTEN:
+		ssk->state = TCPS_CLOSED;
+		SDP_WUNLOCK(ssk);
 		sdp_destroy_cma(ssk);
+		SDP_WLOCK(ssk);
 		/* FALLTHROUGH */
 	case TCPS_CLOSED:
-		ssk->state = TCPS_CLOSED;
 		ssk = sdp_closed(ssk);
 		/*
 		 * sdp_closed() should never return NULL here as the socket is
@@ -641,13 +681,13 @@ sdp_usrclosed(struct sdp_sock *ssk)
 		break;
 
 	case TCPS_SYN_SENT:
-		sdp_destroy_cma(ssk);
 		/* FALLTHROUGH */
 	case TCPS_SYN_RECEIVED:
 		ssk->flags |= SDP_NEEDFIN;
 		break;
 
 	case TCPS_ESTABLISHED:
+		ssk->flags |= SDP_NEEDFIN;
 		ssk->state = TCPS_FIN_WAIT_1;
 		break;
 
@@ -656,10 +696,11 @@ sdp_usrclosed(struct sdp_sock *ssk)
 		break;
 	}
 	if (ssk->state >= TCPS_FIN_WAIT_2) {
-		soisdisconnected(ssk->socket);
 		/* Prevent the connection hanging in FIN_WAIT_2 forever. */
 		if (ssk->state == TCPS_FIN_WAIT_2)
 			sdp_2msl_wait(ssk);
+		else
+			soisdisconnected(ssk->socket);
 	}
 }
 
@@ -686,9 +727,11 @@ static void
 sdp_start_disconnect(struct sdp_sock *ssk)
 {
 	struct socket *so;
+	int unread;
 
 	so = ssk->socket;
 	SDP_WLOCK_ASSERT(ssk);
+	sdp_stop_keepalive_timer(so);
 	/*
 	 * Neither sdp_closed() nor sdp_drop() should return NULL, as the
 	 * socket is still open.
@@ -703,10 +746,15 @@ sdp_start_disconnect(struct sdp_sock *ss
 		    ("sdp_start_disconnect: sdp_drop() returned NULL"));
 	} else {
 		soisdisconnecting(so);
+		unread = so->so_rcv.sb_cc;
 		sbflush(&so->so_rcv);
 		sdp_usrclosed(ssk);
-		if (!(ssk->flags & SDP_DROPPED))
-			sdp_output_disconnect(ssk);
+		if (!(ssk->flags & SDP_DROPPED)) {
+			if (unread)
+				sdp_output_reset(ssk);
+			else
+				sdp_output_disconnect(ssk);
+		}
 	}
 }
 
@@ -756,6 +804,8 @@ sdp_accept(struct socket *so, struct soc
 	if (so->so_state & SS_ISDISCONNECTED)
 		return (ECONNABORTED);
 
+	port = 0;
+	addr.s_addr = 0;
 	error = 0;
 	ssk = sdp_sk(so);
 	SDP_WLOCK(ssk);
@@ -799,9 +849,10 @@ out:
 }
 
 static void
-sdp_append(struct sockbuf *sb, struct mbuf *mb)
+sdp_append(struct sdp_sock *ssk, struct sockbuf *sb, struct mbuf *mb, int cnt)
 {
 	struct mbuf *n;
+	int ncnt;
 
 	SOCKBUF_LOCK_ASSERT(sb);
 	SBLASTRECORDCHK(sb)
@@ -820,24 +871,31 @@ sdp_append(struct sockbuf *sb, struct mb
 		return;
 	}
 	/*
+	 * Count the number of mbufs in the current tail.
+	 */
+	for (ncnt = 0; n->m_next; n = n->m_next)
+		ncnt++;
+	n = sb->sb_lastrecord;
+	/*
 	 * If the two chains can fit in a single sdp packet and
 	 * the last record has not been sent yet (WRITABLE) coalesce
 	 * them.  The lastrecord remains the same but we must strip the
 	 * packet header and then let sbcompress do the hard part.
 	 */
-	if (M_WRITABLE(n) &&
+	if (M_WRITABLE(n) && ncnt + cnt < SDP_MAX_SEND_SGES &&
 	    n->m_pkthdr.len + mb->m_pkthdr.len - SDP_HEAD_SIZE <
-	    SDP_MAX_PAYLOAD) {
+	    ssk->xmit_size_goal) {
+		m_adj(mb, SDP_HEAD_SIZE);
 		n->m_pkthdr.len += mb->m_pkthdr.len;
 		n->m_flags |= mb->m_flags & (M_PUSH | M_URG);
 		m_demote(mb, 1);
-		m_adj(mb, SDP_HEAD_SIZE);
-		sbcompress(sb, mb, n);
+		sbcompress(sb, mb, sb->sb_mbtail);
 		return;
 	}
 	/*
-	 * Not compressable, just append to the end and adjust counters.
+	 * Not compressible, just append to the end and adjust counters.
 	 */
+	sb->sb_lastrecord->m_flags |= M_PUSH;
 	sb->sb_lastrecord->m_nextpkt = mb;
 	sb->sb_lastrecord = mb;
 	if (sb->sb_sndptr == NULL)
@@ -861,14 +919,29 @@ static int
 sdp_send(struct socket *so, int flags, struct mbuf *m,
     struct sockaddr *nam, struct mbuf *control, struct thread *td)
 {
-	int error = 0;
 	struct sdp_sock *ssk;
+	struct mbuf *n;
+	int error;
+	int cnt;
 
+	error = 0;
 	ssk = sdp_sk(so);
 	KASSERT(m->m_flags & M_PKTHDR,
 	    ("sdp_send: %p no packet header", m));
-	M_PREPEND(m, sizeof(struct sdp_bsdh), M_WAIT);
+	M_PREPEND(m, SDP_HEAD_SIZE, M_WAIT);
 	mtod(m, struct sdp_bsdh *)->mid = SDP_MID_DATA; 
+	for (n = m, cnt = 0; n->m_next; n = n->m_next)
+		cnt++;
+	if (cnt > SDP_MAX_SEND_SGES) {
+		n = m_collapse(m, M_WAIT, SDP_MAX_SEND_SGES);
+		if (n == NULL) {
+			m_freem(m);
+			return (EMSGSIZE);
+		}
+		m = n;
+		for (cnt = 0; n->m_next; n = n->m_next)
+			cnt++;
+	}
 	SDP_WLOCK(ssk);
 	if (ssk->flags & (SDP_TIMEWAIT | SDP_DROPPED)) {
 		if (control)
@@ -891,7 +964,7 @@ sdp_send(struct socket *so, int flags, s
 	}
 	if (!(flags & PRUS_OOB)) {
 		SOCKBUF_LOCK(&so->so_snd);
-		sdp_append(&so->so_snd, m);
+		sdp_append(ssk, &so->so_snd, m, cnt);
 		SOCKBUF_UNLOCK(&so->so_snd);
 		if (nam && ssk->state < TCPS_SYN_SENT) {
 			/*
@@ -908,8 +981,10 @@ sdp_send(struct socket *so, int flags, s
 			 */
 			socantsendmore(so);
 			sdp_usrclosed(ssk);
-		}
-		if (!(ssk->flags & SDP_DROPPED) && !(flags & PRUS_MORETOCOME))
+			if (!(ssk->flags & SDP_DROPPED))
+				sdp_output_disconnect(ssk);
+		} else if (!(ssk->flags & SDP_DROPPED) &&
+		    !(flags & PRUS_MORETOCOME))
 			sdp_post_sends(ssk, M_NOWAIT);
 		SDP_WUNLOCK(ssk);
 		return (0);
@@ -929,7 +1004,8 @@ sdp_send(struct socket *so, int flags, s
 		 * of data past the urgent section.
 		 * Otherwise, snd_up should be one lower.
 		 */
-		sdp_append(&so->so_snd, m);
+		m->m_flags |= M_URG | M_PUSH;
+		sdp_append(ssk, &so->so_snd, m, cnt);
 		SOCKBUF_UNLOCK(&so->so_snd);
 		if (nam && ssk->state < TCPS_SYN_SENT) {
 			/*
@@ -939,7 +1015,6 @@ sdp_send(struct socket *so, int flags, s
 			if (error)
 				goto out;
 		}
-		m->m_flags |= M_URG | M_PUSH;
 		sdp_post_sends(ssk, M_NOWAIT);
 		SDP_WUNLOCK(ssk);
 		return (0);
@@ -968,6 +1043,7 @@ static int
 sdp_sosend(struct socket *so, struct sockaddr *addr, struct uio *uio,
     struct mbuf *top, struct mbuf *control, int flags, struct thread *td)
 {
+	struct sdp_sock *ssk;
 	long space, resid;
 	int atomic;
 	int error;
@@ -1005,6 +1081,7 @@ sdp_sosend(struct socket *so, struct soc
 	if (td != NULL)
 		td->td_ru.ru_msgsnd++;
 
+	ssk = sdp_sk(so);
 	error = sblock(&so->so_snd, SBLOCKWAIT(flags));
 	if (error)
 		goto out;
@@ -1031,7 +1108,7 @@ restart:
 		space = sbspace(&so->so_snd);
 		if (flags & MSG_OOB)
 			space += 1024;
-		if (atomic && resid > SDP_MAX_PAYLOAD) {
+		if (atomic && resid > ssk->xmit_size_goal - SDP_HEAD_SIZE) {
 			SOCKBUF_UNLOCK(&so->so_snd);
 			error = EMSGSIZE;
 			goto release;
@@ -1061,10 +1138,10 @@ restart:
 				 * chain.  If no data is to be copied in,
 				 * a single empty mbuf is returned.
 				 */
-				/* XXX Should be tx target? */
-				copy = min(space, SDP_MAX_PAYLOAD);
+				copy = min(space,
+				    ssk->xmit_size_goal - SDP_HEAD_SIZE);
 				top = m_uiotombuf(uio, M_WAITOK, copy,
-				    sizeof(struct sdp_bsdh), M_PKTHDR |
+				    0, M_PKTHDR |
 				    ((flags & MSG_EOR) ? M_EOR : 0));
 				if (top == NULL) {
 					/* only possible error */
@@ -1144,6 +1221,7 @@ sdp_sorecv(struct socket *so, struct soc
 	int len = 0, error = 0, flags, oresid;
 	struct sockbuf *sb;
 	struct mbuf *m, *n = NULL;
+	struct sdp_sock *ssk;
 
 	/* We only do stream sockets. */
 	if (so->so_type != SOCK_STREAM)
@@ -1162,6 +1240,7 @@ sdp_sorecv(struct socket *so, struct soc
 		*mp0 = NULL;
 
 	sb = &so->so_rcv;
+	ssk = sdp_sk(so);
 
 	/* Prevent other readers from entering the socket. */
 	error = sblock(sb, SBLOCKWAIT(flags));
@@ -1310,9 +1389,9 @@ deliver:
 		    (((flags & MSG_WAITALL) && uio->uio_resid > 0) ||
 		     !(flags & MSG_SOCALLBCK))) {
 			SOCKBUF_UNLOCK(sb);
-			SDP_WLOCK(sdp_sk(so));
-			sdp_do_posts(sdp_sk(so));
-			SDP_WUNLOCK(sdp_sk(so));
+			SDP_WLOCK(ssk);
+			sdp_do_posts(ssk);
+			SDP_WUNLOCK(ssk);
 			SOCKBUF_LOCK(sb);
 		}
 	}
@@ -1424,15 +1503,6 @@ out:
 	return (error);
 }
 
-static int
-sdp_sock_init(void *mem, int size, int flags)
-{
-	struct sdp_sock *ssk = mem;
-
-	rw_init(&ssk->lock, "sdpsock");
-	return (0);
-}
-
 void
 sdp_urg(struct sdp_sock *ssk, struct mbuf *mb)
 {
@@ -1531,11 +1601,9 @@ sdp_start_keepalive_timer(struct socket 
 	struct sdp_sock *ssk;
 
 	ssk = sdp_sk(so);
-	SDP_WLOCK(ssk);
 	if (!callout_pending(&ssk->keep2msl))
                 callout_reset(&ssk->keep2msl, SDP_KEEPALIVE_TIME,
                     sdp_keepalive_timeout, ssk);
-	SDP_WUNLOCK(ssk);
 }
 
 static void
@@ -1544,9 +1612,7 @@ sdp_stop_keepalive_timer(struct socket *
 	struct sdp_sock *ssk;
 
 	ssk = sdp_sk(so);
-	SDP_WLOCK(ssk);
 	callout_stop(&ssk->keep2msl);
-	SDP_WUNLOCK(ssk);
 }
 
 /*
@@ -1572,10 +1638,12 @@ sdp_ctloutput(struct socket *so, struct 
 	error = 0;
 	ssk = sdp_sk(so);
 	if (sopt->sopt_level == SOL_SOCKET && sopt->sopt_name == SO_KEEPALIVE) {
+		SDP_WLOCK(ssk);
 		if (so->so_options & SO_KEEPALIVE)
 			sdp_start_keepalive_timer(so);
 		else
 			sdp_stop_keepalive_timer(so);
+		SDP_WUNLOCK(ssk);
 	}
 	if (sopt->sopt_level != IPPROTO_TCP)
 		return (error);
@@ -1602,6 +1670,7 @@ sdp_ctloutput(struct socket *so, struct 
 				ssk->flags |= opt;
 			else
 				ssk->flags &= ~opt;
+			sdp_do_posts(ssk);
 			SDP_WUNLOCK(ssk);
 			break;
 
@@ -1756,8 +1825,10 @@ sdp_pcblist(SYSCTL_HANDLER_ARGS)
 			    ssk->socket);
 		else
 			error = EINVAL;
-		if (error)
+		if (error) {
+			error = 0;
 			goto next;
+		}
 
 		bzero(&xt, sizeof(xt));
 		xt.xt_len = sizeof xt;
@@ -1775,6 +1846,8 @@ sdp_pcblist(SYSCTL_HANDLER_ARGS)
 		xt.xt_socket.xso_protocol = IPPROTO_TCP;
 		SDP_RUNLOCK(ssk);
 		error = SYSCTL_OUT(req, &xt, sizeof xt);
+		if (error)
+			break;
 		i++;
 		continue;
 next:
@@ -1815,7 +1888,7 @@ sdp_init(void)
 
 	LIST_INIT(&sdp_list);
 	sdp_zone = uma_zcreate("sdp_sock", sizeof(struct sdp_sock),
-	    NULL, NULL, sdp_sock_init, NULL, UMA_ALIGN_PTR, UMA_ZONE_NOFREE);
+	    NULL, NULL, NULL, NULL, UMA_ALIGN_PTR, UMA_ZONE_NOFREE);
 	uma_zone_set_max(sdp_zone, maxsockets);
 	EVENTHANDLER_REGISTER(maxsockets_change, sdp_zone_change, NULL,
 		EVENTHANDLER_PRI_ANY);

Modified: projects/ofed/head/sys/ofed/drivers/infiniband/ulp/sdp/sdp_rx.c
==============================================================================
--- projects/ofed/head/sys/ofed/drivers/infiniband/ulp/sdp/sdp_rx.c	Mon Jan 24 04:32:59 2011	(r217767)
+++ projects/ofed/head/sys/ofed/drivers/infiniband/ulp/sdp/sdp_rx.c	Mon Jan 24 06:03:58 2011	(r217768)
@@ -65,7 +65,6 @@ sdp_handle_disconn(struct sdp_sock *ssk)
 			struct rdma_cm_id *id;
 
 			id = ssk->id;
-			ssk->id = NULL;
 			SDP_WUNLOCK(ssk);
 			rdma_disconnect(id);
 			SDP_WLOCK(ssk);
@@ -177,8 +176,7 @@ sdp_post_recvs_needed(struct sdp_sock *s
 		unsigned long bytes_in_process =
 			(rx_ring_posted(ssk) - SDP_MIN_TX_CREDITS) *
 			buffer_size;
-		bytes_in_process += rcv_nxt(ssk);
-
+		bytes_in_process += ssk->socket->so_rcv.sb_cc;
 		if (bytes_in_process >= max_bytes) {
 			sdp_prf(ssk->socket, NULL,
 				"bytes_in_process:%ld > max_bytes:%ld",
@@ -645,7 +643,7 @@ sdp_do_posts(struct sdp_sock *ssk)
 
 }
 
-static void
+int
 sdp_process_rx(struct sdp_sock *ssk)
 {
 	int wc_processed = 0;
@@ -653,7 +651,7 @@ sdp_process_rx(struct sdp_sock *ssk)
 
 	if (!rx_ring_trylock(&ssk->rx_ring)) {
 		sdp_dbg(ssk->socket, "ring destroyed. not polling it\n");
-		return;
+		return 0;
 	}
 
 	credits_before = tx_credits(ssk);
@@ -669,6 +667,8 @@ sdp_process_rx(struct sdp_sock *ssk)
 	sdp_arm_rx_cq(ssk);
 
 	rx_ring_unlock(&ssk->rx_ring);
+
+	return (wc_processed);
 }
 
 static void
@@ -722,6 +722,7 @@ sdp_rx_ring_create(struct sdp_sock *ssk,
 
 
 	sdp_dbg(ssk->socket, "rx ring created");
+	INIT_WORK(&ssk->rx_comp_work, sdp_rx_comp_work);
 	atomic_set(&ssk->rx_ring.head, 1);
 	atomic_set(&ssk->rx_ring.tail, 1);
 
@@ -745,9 +746,6 @@ sdp_rx_ring_create(struct sdp_sock *ssk,
 	}
 
 	sdp_sk(ssk->socket)->rx_ring.cq = rx_cq;
-
-	INIT_WORK(&ssk->rx_comp_work, sdp_rx_comp_work);
-
 	sdp_arm_rx_cq(ssk);
 
 	return 0;

Modified: projects/ofed/head/sys/ofed/drivers/infiniband/ulp/sdp/sdp_tx.c
==============================================================================
--- projects/ofed/head/sys/ofed/drivers/infiniband/ulp/sdp/sdp_tx.c	Mon Jan 24 04:32:59 2011	(r217767)
+++ projects/ofed/head/sys/ofed/drivers/infiniband/ulp/sdp/sdp_tx.c	Mon Jan 24 06:03:58 2011	(r217768)
@@ -307,11 +307,10 @@ sdp_process_tx_cq(struct sdp_sock *ssk)
 		sowwakeup(ssk->socket);
 		/*
 		 * If there is no room in the tx queue we arm the tx cq
-		 * to force an interrupt.  sb_notify() isn't a precise
-		 * measure if being out of space but is very cheap and
-		 * should be close enough.
+		 * to force an interrupt.
 		 */
-		if (tx_ring_posted(ssk) && sb_notify(&sk->so_snd)) {
+		if (tx_ring_posted(ssk) && sk->so_snd.sb_cc >=
+		    sk->so_snd.sb_mbmax - ssk->xmit_size_goal) {
 			sdp_prf(ssk->socket, NULL, "pending tx - rearming");
 			sdp_arm_tx_cq(ssk);
 		}
@@ -462,6 +461,7 @@ sdp_tx_ring_create(struct sdp_sock *ssk,
 	}
 	ssk->tx_ring.cq = tx_cq;
 	ssk->tx_ring.poll_cnt = 0;
+	sdp_arm_tx_cq(ssk);
 
 	return 0;
 
@@ -481,6 +481,8 @@ sdp_tx_ring_destroy(struct sdp_sock *ssk
 	callout_stop(&ssk->tx_ring.timer);
 	callout_stop(&ssk->nagle_timer);
 	SDP_WUNLOCK(ssk);
+	callout_drain(&ssk->tx_ring.timer);
+	callout_drain(&ssk->nagle_timer);
 
 	if (ssk->tx_ring.buffer) {
 		sdp_tx_ring_purge(ssk);


More information about the svn-src-projects mailing list