PERFORCE change 166270 for review

Andre Oppermann andre at FreeBSD.org
Sun Jul 19 15:44:48 UTC 2009


http://perforce.freebsd.org/chv.cgi?CH=166270

Change 166270 by andre at andre_t61 on 2009/07/19 15:44:00

	Experiment with a rewritten TCP reassembly queue that uses a ranged
	red-black tree to store the received data blocks.
	
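	To illustrate the ranged lookup: the compare function treats any
	overlap or exact adjacency between two blocks as equality, so a
	single RB_FIND() with a new segment's range lands directly on a
	mergeable block.  A minimal userland sketch of that compare logic
	(not part of the change itself; struct blk and the int casts are
	simplified stand-ins for struct tcp_reass_block and the kernel
	SEQ_* macros):

	#include <stdio.h>

	struct blk { unsigned int seqs, seqe; };	/* [seqs, seqe) */

	/*
	 * <0/>0 if a lies entirely below/above b, 0 on any overlap or
	 * exact adjacency, mirroring tcp_reass_cmp() in the patch below.
	 */
	static int
	blk_cmp(const struct blk *a, const struct blk *b)
	{
		if ((int)(a->seqe - b->seqs) < 0)
			return (-1);
		if ((int)(a->seqs - b->seqe) > 0)
			return (1);
		return (0);
	}

	int
	main(void)
	{
		struct blk have = { 100, 200 };		/* queued block */
		struct blk below = { 10, 50 };		/* no relation */
		struct blk touch = { 200, 250 };	/* adjacent: merge */
		struct blk inside = { 120, 180 };	/* covered: drop */

		printf("%d %d %d\n", blk_cmp(&below, &have),
		    blk_cmp(&touch, &have), blk_cmp(&inside, &have));
		/* Prints: -1 0 0 */
		return (0);
	}

	Keys that compare equal can never coexist in the tree, so this
	only works while stored blocks are kept non-overlapping and
	non-adjacent, which the merge step guarantees.
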
	Advantages are a simpler structure and O(log n) insertion and
	removal in all cases, whereas the tail queue degrades to O(n)
	when segments arrive in an unfavorable order.
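
	A companion sketch of the merge bookkeeping (again illustrative
	only, reusing struct blk from above and ignoring mbuf chains and
	sequence wraparound): merging block b into a yields the union of
	the two ranges, and the returned overlap is what the caller
	subtracts from the queue's byte accounting, mirroring what
	tcp_reass_merge() and SEQ_DELTA() do in the patch below.

	/* Assumes blk_cmp(a, b) == 0, i.e. overlap or adjacency. */
	static unsigned int
	blk_merge(struct blk *a, const struct blk *b)
	{
		unsigned int dup;

		/* Overlap of the two ranges; 0 when merely adjacent. */
		dup = (a->seqe < b->seqe ? a->seqe : b->seqe) -
		    (a->seqs > b->seqs ? a->seqs : b->seqs);
		if (a->seqs > b->seqs)		/* extends the start */
			a->seqs = b->seqs;
		if (a->seqe < b->seqe)		/* extends the end */
			a->seqe = b->seqe;
		return (dup);	/* duplicate bytes already accounted */
	}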

Affected files ...

.. //depot/projects/tcp_reass/netinet/tcp_reass.c#34 edit
.. //depot/projects/tcp_reass/netinet/tcp_var.h#18 edit

Differences ...

==== //depot/projects/tcp_reass/netinet/tcp_reass.c#34 (text+ko) ====

@@ -1,5 +1,5 @@
 /*-
- * Copyright (c) 2007
+ * Copyright (c) 2007-2009
  *	Andre Oppermann, Internet Business Solutions AG.  All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -50,26 +50,11 @@
  * queue.
  *
 * Instead of storing all segments on their own we build blocks of consecutive
- * segments chained together.  We use a tailq because a new segments has the
- * highest probability to fit the tail of the chain.  If not, the second
- * highest probability is the beginning of the chain for being the missing
- * segment.  Otherwise we cycle through each consequtive block until a match
- * is found.  If a segment matches the end of one block and the start of the
+ * segments chained together.  We use a red-black tree so that lookup and
+ * insertion stay O(log n) no matter how many blocks and holes there are.
+ * If a segment matches the end of one block and the start of the
  * next block the two blocks are joined together.  If no match is found a
  * new block is created.
  *
- * This system is very efficient and can deal efficiently with long chains 
- * and many holes.
- *
- * trq_tail ----------------------------------------------\
- * trq_head --> [block] ------>	[block] ------>	[block] <-/
- *		m_next		m_next		m_next
- *		   |		   |		   |
- *		m_next		m_next		m_next
- *		   |		   |		   |
- *		m_next		m_next		m_next
- *
- *
  * The reassembly queue's block structure is also used to track SACK
  * information as a data receiver.  A doubly-linked list is added
  * that links the blocks in the reverse order of their arrival or update.
@@ -127,16 +112,11 @@
     &tcp_reass_enabled, 0,
     "Enable/disable use of TCP Reassembly Queue");
 
-static int tcp_reass_maxblocks = 0;
+static int tcp_reass_maxblocks = 65535;
 SYSCTL_INT(_net_inet_tcp_reass, OID_AUTO, maxblocks, CTLFLAG_RDTUN,
     &tcp_reass_maxblocks, 0,
     "Global maximum number of TCP Segment Blocks in Reassembly Queue");
 
-int tcp_reass_qsize = 0;
-SYSCTL_INT(_net_inet_tcp_reass, OID_AUTO, curblocks, CTLFLAG_RD,
-    &tcp_reass_qsize, 0,
-    "Global number of TCP Segment Blocks currently in Reassembly Queue");
-
 static int tcp_reass_qtimo = 0;
 SYSCTL_INT(_net_inet_tcp_reass, OID_AUTO, queue_timeout, CTLFLAG_RW,
     &tcp_reass_qtimo, 0,
@@ -147,17 +127,8 @@
     &tcp_reass_spacetime, 0,
     "Reassembly Queue strategy of space vs. time efficiency");
 
-static void	tcp_reass_merge(struct tcpcb *, struct trq *, struct trq *);
-
-static __inline void
-sack_track(struct tcpcb *tp, struct trq *tqe)
-{
-
-	if (LIST_FIRST(&tp->t_trq_sack) != (tqe)) {
-		LIST_REMOVE((tqe), trq_s);
-		LIST_INSERT_HEAD(&tp->t_trq_sack, (tqe), trq_s);
-	}
-}
+static struct tcp_reass_block *
+    tcp_reass_merge(struct tcpcb *, struct tcp_reass_block *, struct tcp_reass_block *);
 
 /* Trim empty mbufs from head of chain. */
 static struct mbuf *
@@ -195,46 +166,116 @@
 	uma_zone_set_max(tcp_reass_zone, tcp_reass_maxblocks);
 }
 
+/*
+ * Initialize TCP reassembly zone on startup.
+ */
+void
+tcp_reass_init(void)
+{
+
+	TUNABLE_INT_FETCH("net.inet.tcp.reass.maxblocks",
+	    &tcp_reass_maxblocks);
+	tcp_reass_zone = uma_zcreate("tcpreass", sizeof(struct tcp_reass_block),
+	    NULL, NULL, NULL, NULL, UMA_ALIGN_PTR, 0);
+	uma_zone_set_max(tcp_reass_zone, tcp_reass_maxblocks);
+	EVENTHANDLER_REGISTER(nmbclusters_change,
+	    tcp_reass_zone_change, NULL, EVENTHANDLER_PRI_ANY);
+}
+
+/*
+ * Compare function implementing the ranged lookup on the RB tree.
+ * NB: The tree must never have any overlapping elements.
+ */
+static __inline int
+tcp_reass_cmp(struct tcp_reass_block *a, struct tcp_reass_block *b)
+{
+
+	if (SEQ_LT(a->trb_seqe, b->trb_seqs))
+		return (-1);
+	else if (SEQ_GT(a->trb_seqs, b->trb_seqe))
+		return (1);
+	else
+		return (0);
+}
+
+RB_PROTOTYPE_STATIC(tcp_ra, tcp_reass_block, trb_rb, tcp_reass_cmp);
+RB_GENERATE_STATIC(tcp_ra, tcp_reass_block, trb_rb, tcp_reass_cmp);
+
 #ifdef INVARIANTS
 static int
 tcp_reass_verify(struct tcpcb *tp)
 {
-	struct trq *tqe, *tqen;
-	int i = 0;
+	int i = 0, size = 0, total = 0;
+	struct mbuf *m;
+	struct tcp_reass_block *trb, *trbn;
 
-	TAILQ_FOREACH_SAFE(tqe, &tp->t_trq, trq_q, tqen) {
-		KASSERT(SEQ_GEQ(tqe->trq_seq, tp->rcv_nxt),
-		    ("%s: trq_seq < rcv_nxt", __func__));
-		KASSERT(tqen == NULL ||
-		    SEQ_LT(tqe->trq_seq + tqe->trq_len, tqen->trq_seq),
-		    ("%s: overlapping blocks", __func__));
+	RB_FOREACH_SAFE(trb, tcp_ra, &tp->rcv_reass, trbn) {
+		KASSERT(SEQ_LT(trb->trb_seqs, trb->trb_seqe),
+		    ("%s: trb_seqs >= trb_seqe", __func__));
+		KASSERT(SEQ_GT(trb->trb_seqs, tp->rcv_nxt),
+		    ("%s: rcv_nxt >= trb_seqs", __func__));
+		KASSERT(trb->trb_m != NULL,
+		    ("%s: trb_m == NULL", __func__));
+		KASSERT(trb->trb_mt != NULL,
+		    ("%s: trb_mt == NULL", __func__));
+		size = SEQ_DELTA(trb->trb_seqs, trb->trb_seqe);
+		KASSERT(size == m_length(trb->trb_m, &m),
+		    ("%s: seq# size != actual mbuf size", __func__));
+		KASSERT(trb->trb_mt == m,
+		    ("%s: trb_mt is not last mbuf", __func__));
+		KASSERT(trbn == NULL || SEQ_LT(trb->trb_seqe, trbn->trb_seqs),
+		    ("%s: overlaps into next block", __func__));
+		total += size;
 		i++;
 	}
-	LIST_FOREACH(tqe, &tp->t_trq_sack, trq_s) {
+	KASSERT(tp->rcv_reass_size == total,
+	    ("%s: total not correct", __func__));
+
+	LIST_FOREACH(trb, &tp->rcv_reass_sack, trb_sack) {
 		i--;
 	}
-	KASSERT(i == 0, ("%s: SEQ# ordered tailq and arrival ordered "
-	    "SACK list are not equally long", __func__));
+	KASSERT(i == 0,
+	    ("%s: sack list incorrect", __func__));
+
-	return (0);
+	return (1);
 }
 #endif
 
-/*
- * Initialize TCP reassembly zone on startup.
- */
+static void
+tcp_reass_free(struct tcpcb *tp, struct tcp_reass_block *trb)
+{
+
+	trb = RB_REMOVE(tcp_ra, &tp->rcv_reass, trb);
+	KASSERT(trb != NULL, ("%s: RB_REMOVE failed", __func__));
+	LIST_REMOVE(trb, trb_sack);
+	if (trb->trb_m != NULL)
+		m_freem(trb->trb_m);
+	tp->rcv_reass_size -= SEQ_DELTA(trb->trb_seqs, trb->trb_seqe);
+	uma_zfree(tcp_reass_zone, trb);
+}
+
 void
-tcp_reass_init(void)
+tcp_reass_flush(struct tcpcb *tp)
+{
+	struct tcp_reass_block *trb, *trbn;
+
+	INP_WLOCK_ASSERT(tp->t_inpcb);
+	KASSERT(tcp_reass_verify(tp),
+	    ("%s: reassembly queue inconsistent", __func__));
+
+	RB_FOREACH_SAFE(trb, tcp_ra, &tp->rcv_reass, trbn) {
+		tcp_reass_free(tp, trb);
+	}
+	KASSERT(tp->rcv_reass_size == 0,
+	    ("%s: rcv_reass_size not zero", __func__));
+}
+
+static __inline void
+tcp_reass_sacktrack(struct tcpcb *tp, struct tcp_reass_block *trb)
 {
 
-	/* XXX: nmbclusters may be zero. */
-	tcp_reass_maxblocks = nmbclusters / 16;
-	TUNABLE_INT_FETCH("net.inet.tcp.reass.maxblocks",
-	    &tcp_reass_maxblocks);
-	tcp_reass_zone = uma_zcreate("tcpreass", sizeof (struct trq),
-	    NULL, NULL, NULL, NULL, UMA_ALIGN_PTR, UMA_ZONE_NOFREE);
-	uma_zone_set_max(tcp_reass_zone, tcp_reass_maxblocks);
-	EVENTHANDLER_REGISTER(nmbclusters_change,
-	    tcp_reass_zone_change, NULL, EVENTHANDLER_PRI_ANY);
+	if (LIST_FIRST(&tp->rcv_reass_sack) != trb) {
+		LIST_REMOVE(trb, trb_sack);
+		LIST_INSERT_HEAD(&tp->rcv_reass_sack, trb, trb_sack);
+	}
 }
 
 /*
@@ -246,12 +287,11 @@
 int
 tcp_reass(struct tcpcb *tp, struct tcphdr *th, int *tlenp, struct mbuf *m)
 {
-	struct trq *tqe, *tqen;
+	int thflags = 0;
+	tcp_seq th_seq;
+	struct mbuf *n;
 	struct socket *so = tp->t_inpcb->inp_socket;
-	struct mbuf *n;
-	int i, thflags = 0, mcnt;
-	tcp_seq th_seq;
-	struct trq tqes;
+	struct tcp_reass_block *trb, *trbn;
+	struct tcp_reass_block trbs;
 
 	INP_WLOCK_ASSERT(tp->t_inpcb);
 
@@ -262,13 +302,19 @@
 	 */
 	if (th == NULL) {
 		if (!TCPS_HAVEESTABLISHED(tp->t_state) ||
-		    TAILQ_EMPTY(&tp->t_trq) ||
-		    ((tqe = TAILQ_FIRST(&tp->t_trq)) &&
-		     tqe->trq_seq != tp->rcv_nxt))
+	    RB_EMPTY(&tp->rcv_reass) ||
+		    ((trb = RB_MIN(tcp_ra, &tp->rcv_reass)) &&
+		     trb->trb_seqs != tp->rcv_nxt))
 			return (0);
 		goto present;
 	}
 
+	KASSERT(th != NULL, ("%s: th is NULL", __func__));
+	KASSERT(tlenp != NULL, ("%s: tlenp is NULL", __func__));
+	KASSERT(m != NULL, ("%s: m is NULL", __func__));
+	KASSERT(*tlenp == m_length(m, NULL),
+	    ("%s: tlen != mbuf length", __func__));
+
 	/*
 	 * Store TCP header information in local variables as
 	 * we may lose access to it after mbuf compacting.
@@ -278,15 +324,15 @@
 	th = NULL;		/* Prevent further use. */
 
 	/* Check if it is really necessary to do all the work. */
-	if (!tcp_reass_enabled && TAILQ_EMPTY(&tp->t_trq)) {
+	if (!tcp_reass_enabled && RB_EMPTY(&tp->rcv_reass)) {
 		*tlenp = 0;
 		m_freem(m);
 		return (0);
 	}
 
 	KASSERT(SEQ_LEQ(tp->rcv_nxt, th_seq),
 	    ("%s: sequence number below rcv_nxt", __func__));
-	KASSERT(!(tp->rcv_nxt == th_seq) || !(TAILQ_EMPTY(&tp->t_trq)),
+	KASSERT(!(tp->rcv_nxt == th_seq) || !(RB_EMPTY(&tp->rcv_reass)),
 	    ("%s: got missing segment but queue is empty", __func__));
 	KASSERT(tcp_reass_verify(tp),
 	    ("%s: reassembly queue inconsistent", __func__));
@@ -311,6 +357,7 @@
 	 * buffer vs. actual real data with 2k clusters and 1500 byte
 	 * packets by introducing a correction factor of 11/8th.
 	 */
+#if 0
 	if (th_seq != tp->rcv_nxt &&
 	    tp->t_trqmcnt > (sbspace(&so->so_rcv) / 8 * 11)) {
 		TCPSTAT_INC(tcps_reass_overflow);
@@ -319,15 +366,7 @@
 		*tlenp = 0;
 		return (0);
 	}
-
-	/* Get rid of packet header and mtags. */
-	m_demote(m, 1);
-
-	/* Trim empty mbufs from head of chain. */
-	m = m_trimhead(m);
-
-	/* NB: m_adj(m, -i) may free mbufs at the tail of a chain. */
-	mcnt = m_storagesize(m);
+#endif
 
 	/*
 	 * FIN handling is a bit tricky.
@@ -344,269 +383,119 @@
 	 * This approach is based on a discussion on TCPM mailing list.
 	 */
 	if ((thflags & TH_FIN) && tp->rcv_nxt == th_seq) {
-		tcp_reass_qfree(tp);
-		tqe = NULL;
+		tcp_reass_flush(tp);
 		if (m->m_len == 0) {
 			tcp_timer_activate(tp, TT_REASS, 0);
 			return (thflags);
 		}
-		goto insert;
-	} else
+	} else if (*tlenp == 0) {
+		m_freem(m);
+		return (0);
+	} else
 		thflags &= ~TH_FIN;
 
-	/* Check if this is the first segment. */
-	if (TAILQ_EMPTY(&tp->t_trq))
-		goto insert;
+	/* Get rid of packet header and mtags. */
+	m_demote(m, 1);
+	/* Trim empty mbufs from head of chain. */
+	m = m_trimhead(m);
+	/* Compact mbuf chain; keep the original chain if it fails. */
+	if ((n = m_collapse(m, M_DONTWAIT, 1024)) != NULL)
+		m = n;
 
-	/* Starting point for the following tests. */
-	tqe = TAILQ_LAST(&tp->t_trq, trq_head);
 
-	/* Check if this segment directly attaches to the end. */
-	if (tqe->trq_seq + tqe->trq_len == th_seq) {
-		tqe->trq_len += *tlenp;
-		tqe->trq_mcnt += mcnt;
-		tp->t_trqmcnt += mcnt;
-		tqe->trq_ml->m_next = m;
-		tqe->trq_ml = m_last(m);
-		if (tcp_reass_spacetime) {
-			tqe->trq_m =  m_collapse(tqe->trq_m, M_DONTWAIT, 1024);
-			tp->t_trqmcnt -= tqe->trq_mcnt;
-			tqe->trq_mcnt = m_storagesize(tqe->trq_m);
-			tqe->trq_mcnt += tp->t_trqmcnt;
-		}
-		sack_track(tp, tqe);
-		/* TCP statistics. */
-		TCPSTAT_INC(tcps_rcvoopack);
-		TCPSTAT_ADD(tcps_rcvoobyte, *tlenp);
-		TCPSTAT_INC(tcps_reass_tail);
-		return (0);
-	}
+	/* Set up search structure. */
+	trbs.trb_seqs = th_seq;
+	trbs.trb_seqe = th_seq + *tlenp;
+	trbs.trb_m = m;
+	trbs.trb_mt = m_last(m);
 
-	/* Check if beyond last block. */
-	if (SEQ_LT(tqe->trq_seq + tqe->trq_len, th_seq))
-		goto insert;
-
-	/* Check if this is the missing segment. */
-	if (tp->rcv_nxt == th_seq) {
-		tqe = TAILQ_FIRST(&tp->t_trq);
-		KASSERT(SEQ_GT(tqe->trq_seq, th_seq),
-		    ("%s: first block starts below missing segment", __func__));
-		/* Check if segment prepends first block. */
-		if (SEQ_LEQ(tqe->trq_seq, th_seq + *tlenp)) {
-			/* Trim tail of segment. */
-			if ((i = SEQ_DELTA(tqe->trq_seq, th_seq + *tlenp))) {
-				m_adj(m, -i);
-				*tlenp -= i;
-				/* TCP statistics. */
-				TCPSTAT_INC(tcps_rcvpartduppack);
-				TCPSTAT_ADD(tcps_rcvpartdupbyte, i);
-				/* Update accounting. */
-				mcnt = m_storagesize(m);
-			}
-			tqe->trq_len += *tlenp;
-			tqe->trq_mcnt += mcnt;
-			tp->t_trqmcnt += mcnt;
-			tqe->trq_seq = th_seq;
-			n = m_last(m);
-			n->m_next = tqe->trq_m;
-			tqe->trq_m = m;
-			goto present;
-		}
-		goto insert;	/* No statistics, this segment is in line. */
-	}
-
-	/* TCP statistics. */
-	TCPSTAT_INC(tcps_rcvoopack);
-	TCPSTAT_ADD(tcps_rcvoobyte, *tlenp);
-
-	/* See where it fits. */
-	TAILQ_FOREACH_SAFE(tqe, &tp->t_trq, trq_q, tqen) {
-		/* Segment is after this blocks coverage. */
-		if (SEQ_LT(tqe->trq_seq + tqe->trq_len, th_seq))
-			continue;
-		/* Segment is after the previous one but before this one. */
-		if (SEQ_GT(tqe->trq_seq, th_seq + *tlenp))
-			break;		/* Insert as new block. */
-
-		/* Segment is already fully covered. */
-		if (SEQ_LEQ(tqe->trq_seq, th_seq) &&
-		    SEQ_GEQ(tqe->trq_seq + tqe->trq_len, th_seq + *tlenp)) {
-			TCPSTAT_INC(tcps_rcvduppack);
-			TCPSTAT_ADD(tcps_rcvdupbyte, *tlenp);
-			TCPSTAT_INC(tcps_reass_covered);
-			/*
-			 * XXXAO: What to SACK report when duplicate?
-			 * See RFC2883: D-SACK (Duplicate SACK)
-			 */
-			sack_track(tp, tqe);
+	/*
+	 * Return match that has at least partial overlap to either side or
+	 * insert a new reassembly block.
+	 */
+	if ((trb = RB_FIND(tcp_ra, &tp->rcv_reass, &trbs)) != NULL) {
+		/* Within an already known block. */
+		if (SEQ_GEQ(trbs.trb_seqs, trb->trb_seqs) &&
+		    SEQ_LEQ(trbs.trb_seqe, trb->trb_seqe)) {
+			tcp_reass_sacktrack(tp, trb);
 			m_freem(m);
 			*tlenp = 0;
 			return (0);
 		}
+		tp->rcv_reass_size += SEQ_DELTA(trbs.trb_seqs, trbs.trb_seqe);
 
-		/* Segment covers and extends on both ends. */
-		if (SEQ_GT(tqe->trq_seq, th_seq) &&
-		    SEQ_LT(tqe->trq_seq + tqe->trq_len, th_seq + *tlenp)) {
-			/* Replace block content. */
-			tp->t_trqmcnt -= tqe->trq_mcnt;
-			m_freem(tqe->trq_m);
-			tqe->trq_len = *tlenp;
-			tqe->trq_mcnt = mcnt;
-			tp->t_trqmcnt += mcnt;
-			tqe->trq_seq = th_seq;
-			tqe->trq_m = m;
-			tqe->trq_ml = m_last(m);
-			/* Check if segment bridges next block to merge. */
-			if (tqen != NULL &&
-			    SEQ_GEQ(tqe->trq_seq + tqe->trq_len, tqen->trq_seq))
-				tcp_reass_merge(tp, tqe, tqen);
-			sack_track(tp, tqe);
-			TCPSTAT_INC(tcps_reass_replace);
-			return (0);
-		}
+		/* Extends the end, common case. */
+		if (SEQ_GT(trbs.trb_seqe, trb->trb_seqe)) {
+			trbn = tcp_reass_merge(tp, trb, &trbs);
+			tp->rcv_reass_size -=
+			    SEQ_DELTA(trbn->trb_seqs, trbn->trb_seqe);
+			tcp_reass_sacktrack(tp, trb);
 
-		/* Segment prepends to this block. */
-		if (SEQ_GT(tqe->trq_seq, th_seq) &&
-		    SEQ_LEQ(tqe->trq_seq, th_seq + *tlenp) &&
-		    SEQ_GEQ(tqe->trq_seq + tqe->trq_len, th_seq + *tlenp)) {
-			KASSERT(!(thflags & TH_FIN),
-			    ("%s: new segment with FIN can't prepend", __func__));
-			/* Trim tail of segment. */
-			if ((i = SEQ_DELTA(tqe->trq_seq, th_seq + *tlenp))) {
-				m_adj(m, -i);
-				*tlenp -= i;
-				/* TCP statistics. */
-				TCPSTAT_INC(tcps_rcvpartduppack);
-				TCPSTAT_ADD(tcps_rcvpartdupbyte, i);
-				/* Update accounting. */
-				mcnt = m_storagesize(m);
+			/* Merge in next blocks if there is overlap. */
+			while ((trbn = RB_NEXT(tcp_ra, &tp->rcv_reass, trb)) != NULL &&
+			    SEQ_LEQ(trbn->trb_seqs, trb->trb_seqe)) {
+				trbn = tcp_reass_merge(tp, trb, trbn);
+				tcp_reass_free(tp, trbn);
 			}
-			tqe->trq_len += *tlenp;
-			tqe->trq_mcnt += mcnt;
-			tp->t_trqmcnt += mcnt;
-			tqe->trq_seq = th_seq;
-			n = m_last(m);
-			n->m_next = tqe->trq_m;
-			tqe->trq_m = m;
-			sack_track(tp, tqe);
-			TCPSTAT_INC(tcps_reass_prepend);
-			return (0);
 		}
 
-		/* Segment appends to this block. */
-		if (SEQ_LT(tqe->trq_seq + tqe->trq_len, th_seq + *tlenp) &&
-		    SEQ_LEQ(tqe->trq_seq, th_seq) &&
-		    SEQ_GEQ(tqe->trq_seq + tqe->trq_len, th_seq)) {
-			/* Trim head of segment. */
-			if ((i = SEQ_DELTA(tqe->trq_seq + tqe->trq_len, th_seq))) {
-				m_adj(m, i);
-				*tlenp -= i;
-				/* TCP Statistics. */
-				TCPSTAT_INC(tcps_rcvpartduppack);
-				TCPSTAT_ADD(tcps_rcvpartdupbyte, i);
+		/* Extends the start, unless consumed by the merge above. */
+		if (trbs.trb_m != NULL && SEQ_LT(trbs.trb_seqs, trb->trb_seqs)) {
+			trbn = tcp_reass_merge(tp, trb, &trbs);
+			tp->rcv_reass_size -=
+			    SEQ_DELTA(trbn->trb_seqs, trbn->trb_seqe);
+			tcp_reass_sacktrack(tp, trb);
+
+			/* Merge in previous blocks if there is overlap. */
+			while ((trbn = RB_PREV(tcp_ra, &tp->rcv_reass, trb)) != NULL &&
+			    SEQ_GEQ(trbn->trb_seqe, trb->trb_seqs)) {
+				trbn = tcp_reass_merge(tp, trb, trbn);
+				tcp_reass_free(tp, trbn);
 			}
-			tqe->trq_len += *tlenp;
-			tqe->trq_mcnt += mcnt;
-			tp->t_trqmcnt += mcnt;
-			tqe->trq_ml->m_next = m;
-			tqe->trq_ml = m_last(m);
-			/* Check if segment bridges two blocks to merge. */
-			if (tqen != NULL &&
-			    SEQ_GEQ(tqe->trq_seq + tqe->trq_len, tqen->trq_seq))
-				tcp_reass_merge(tp, tqe, tqen);
-			sack_track(tp, tqe);
-			TCPSTAT_INC(tcps_reass_append);
-			return (0);
 		}
+	} else if ((trb = uma_zalloc(tcp_reass_zone,
+	    (M_NOWAIT|M_ZERO))) != NULL) {
+		trb->trb_seqs = trbs.trb_seqs;
+		trb->trb_seqe = trbs.trb_seqe;
+		trb->trb_m = trbs.trb_m;
+		trb->trb_mt = trbs.trb_mt;
+		trbn = RB_INSERT(tcp_ra, &tp->rcv_reass, trb);
+		KASSERT(trbn == NULL, ("%s: RB_INSERT failed", __func__));
+		tcp_reass_sacktrack(tp, trb);
+		tp->rcv_reass_size += SEQ_DELTA(trb->trb_seqs, trb->trb_seqe);
+	} else if (tp->rcv_nxt == th_seq) {
+		/* Zone exhausted; deliver missing segment via stack block. */
+		trbn = RB_INSERT(tcp_ra, &tp->rcv_reass, &trbs);
+		KASSERT(trbn == NULL, ("%s: RB_INSERT failed", __func__));
+	} else {
+		/* Zone exhausted and not the missing segment; drop. */
+		TCPSTAT_INC(tcps_rcvmemdrop);
+		m_freem(m);
+		*tlenp = 0;
+		return (0);
+	}
+	if (tp->rcv_nxt == th_seq)
+		goto present;
 
-insert:
-	/* Prepare to insert into block queue. */
-	if (tp->rcv_nxt == th_seq) {
-		/*
-		 * Use temporary struct trq on the stack for missing
-		 * segment to prevent blocking of all reassembly queues
-		 * due to zone exhaustion.
-		 */
-		tqen = &tqes;
-	} else {
-		tqen = uma_zalloc(tcp_reass_zone, (M_NOWAIT|M_ZERO));
-		if (tqen == NULL) {
-			TCPSTAT_INC(tcps_rcvmemdrop);
-			m_freem(m);
-			*tlenp = 0;
-			return (0);
-		}
-		TCPSTAT_INC(tcps_reass_blocks);
-	}
-	tcp_reass_qsize++;
-	if (tcp_reass_spacetime) {
-		m = m_collapse(m, M_DONTWAIT, 1024);
-		mcnt = m_storagesize(m);
-	}
-	tqen->trq_seq = th_seq;
-	tqen->trq_len = *tlenp;
-	tqen->trq_mcnt = mcnt;
-	tp->t_trqmcnt += mcnt;
-	tqen->trq_m = m;
-	tqen->trq_ml = m_last(m);
+	KASSERT(tcp_reass_verify(tp),
+	    ("%s: reassembly queue inconsistent", __func__));
+	return (0);
 
-	/* Where to insert. */
-	if (tqe != NULL && SEQ_LT(tqe->trq_seq + tqe->trq_len, th_seq))
-		TAILQ_INSERT_AFTER(&tp->t_trq, tqe, tqen, trq_q);
-	else if (tqe != NULL)
-		TAILQ_INSERT_BEFORE(tqe, tqen, trq_q);
-	else {
-		KASSERT(TAILQ_EMPTY(&tp->t_trq),
-		    ("%s: first element queue not empty", __func__));
-		TAILQ_INSERT_HEAD(&tp->t_trq, tqen, trq_q);
-		/*
-		 * Flush the reassembly queue after x times the
-		 * current retransmit interval measured from the
-		 * arrival time of the first segment.
-		 */
-		if (tcp_reass_qtimo)
-			tcp_timer_activate(tp, TT_REASS,
-			    tp->t_rxtcur * tcp_reass_qtimo);
-	}
-	LIST_INSERT_HEAD(&tp->t_trq_sack, tqen, trq_s);
-
-	/* Missing segment? */
-	if (tp->rcv_nxt != th_seq)
-		return (0);
 present:
 	/*
 	 * Present data to user, advancing rcv_nxt through the
 	 * completed sequence space.
 	 */
-	KASSERT(!TAILQ_EMPTY(&tp->t_trq),
+	KASSERT(!RB_EMPTY(&tp->rcv_reass),
 	    ("%s: queue empty at present", __func__));
-	KASSERT((TAILQ_FIRST(&tp->t_trq))->trq_seq == tp->rcv_nxt,
+	KASSERT((RB_MIN(tcp_ra, &tp->rcv_reass))->trb_seqs == tp->rcv_nxt,
 	    ("%s: first block does not match rcv_nxt", __func__));
 	TCPSTAT_INC(tcps_reass_missingseg);
 
 	SOCKBUF_LOCK(&so->so_rcv);
-	TAILQ_FOREACH_SAFE(tqe, &tp->t_trq, trq_q, tqen) {
-		KASSERT(SEQ_GEQ(tqe->trq_seq, tp->rcv_nxt),
-		    ("%s: trq_seq < rcv_nxt", __func__));
-		KASSERT(tqen == NULL ||
-		    SEQ_LEQ(tqe->trq_seq + tqe->trq_len, tqen->trq_seq),
-		    ("%s: block overlaps into next one", __func__));
 
-		if (tqe->trq_seq != tp->rcv_nxt)
-			break;
-		if (so->so_rcv.sb_state & SBS_CANTRCVMORE)
-			m_freem(tqe->trq_m);
-		else
-			sbappendstream_locked(&so->so_rcv, tqe->trq_m);
+	trb = RB_MIN(tcp_ra, &tp->rcv_reass);
+	if (!(so->so_rcv.sb_state & SBS_CANTRCVMORE)) {
+		sbappendstream_locked(&so->so_rcv, trb->trb_m);
-		tp->rcv_nxt += tqe->trq_len;
-		tp->t_trqmcnt -= tqe->trq_mcnt;
-		TAILQ_REMOVE(&tp->t_trq, tqe, trq_q);
-		LIST_REMOVE(tqe, trq_s);
-		if (tqe != &tqes)
-			uma_zfree(tcp_reass_zone, tqe);
-		V_tcp_reass_qsize--;
+		trb->trb_m = NULL;
+		trb->trb_mt = NULL;
 	}
+	tp->rcv_nxt = trb->trb_seqe;
+	if (trb == &trbs) {
+		RB_REMOVE(tcp_ra, &tp->rcv_reass, trb);
+		if (trb->trb_m != NULL)
+			m_freem(trb->trb_m);
+	} else
+		tcp_reass_free(tp, trb);
+
 	/* NB: sorwakeup_locked() does an implicit socket buffer unlock. */
 	sorwakeup_locked(so);
 
@@ -615,7 +504,7 @@
 	 * the sequence space and if queue is not empty.  Otherwise
 	 * deactivate it.
 	 */
-	if (tcp_reass_qtimo && !TAILQ_EMPTY(&tp->t_trq))
+	if (tcp_reass_qtimo && !RB_EMPTY(&tp->rcv_reass))
 		tcp_timer_activate(tp, TT_REASS,
 		    tp->t_rxtcur * tcp_reass_qtimo);
 	else
@@ -627,57 +516,45 @@
 
 /*
  * Merge one or more consecutive blocks together.
+ * Always merges trbn into trb.  Returns trbn, whose range then covers
+ * only the duplicate bytes, for the caller to account for and free,
+ * or NULL if the two blocks neither overlap nor touch.
  */
-static void
-tcp_reass_merge(struct tcpcb *tp, struct trq *tqe, struct trq *tqen)
+static struct tcp_reass_block *
+tcp_reass_merge(struct tcpcb *tp, struct tcp_reass_block *trb, struct tcp_reass_block *trbn)
 {
 	int i;
 
-	KASSERT(tqe != NULL && tqen != NULL,
+	KASSERT(trb != NULL && trbn != NULL,
 	    ("%s: incomplete input", __func__));
-	KASSERT(SEQ_GEQ(tqe->trq_seq + tqe->trq_len, tqen->trq_seq),
-	    ("%s: blocks do not overlap, nothing to merge", __func__));
 
-	/* Appended block may reach beyond next block. */
-	while (SEQ_GEQ(tqe->trq_seq + tqe->trq_len, tqen->trq_seq + tqen->trq_len)) {
-		/* TCP Statistics. */
-		TCPSTAT_ADD(tcps_rcvpartdupbyte, tqen->trq_len);
-		TCPSTAT_INC(tcps_reass_covered);
-		tp->t_trqmcnt -= tqe->trq_mcnt;
-		m_freem(tqen->trq_m);
-		TAILQ_REMOVE(&tp->t_trq, tqen, trq_q);
-		LIST_REMOVE(tqen, trq_s);
-		uma_zfree(tcp_reass_zone, tqen);
-		tcp_reass_qsize--;
-		/* And the one after that. */
-		if ((tqen = TAILQ_NEXT(tqe, trq_q)) == NULL)
-			return;
-	}
-
-	/* Trim head of next block. */
-	if ((i = SEQ_DELTA(tqe->trq_seq + tqe->trq_len, tqen->trq_seq))) {
-		m_adj(tqen->trq_m, i);
-		tqen->trq_len -= i;
-		TCPSTAT_ADD(tcps_rcvpartdupbyte, i);		/* Statistics */
-		/* Dispose of empty mbufs. */
-		if (tcp_reass_spacetime) {
-			tqen->trq_m = m_trimhead(tqen->trq_m);
-			tqen->trq_mcnt = m_storagesize(tqen->trq_m);
+	/* Fully covered, appends at the end, or prepends at the start. */
+	if (SEQ_GEQ(trbn->trb_seqs, trb->trb_seqs) &&
+	    SEQ_LEQ(trbn->trb_seqe, trb->trb_seqe))
+		return (trbn);
+	if (SEQ_GT(trbn->trb_seqe, trb->trb_seqe) &&
+	    SEQ_LEQ(trbn->trb_seqs, trb->trb_seqe)) {
+		if ((i = SEQ_DELTA(trb->trb_seqe, trbn->trb_seqs)) > 0) {
+			m_adj(trbn->trb_m, i);
+			trbn->trb_m = m_trimhead(trbn->trb_m);
+		}
+		trb->trb_seqe = trbn->trb_seqe;
+		trb->trb_mt->m_next = trbn->trb_m;
+		trb->trb_mt = trbn->trb_mt;
+	} else if (SEQ_LT(trbn->trb_seqs, trb->trb_seqs) &&
+	    SEQ_GEQ(trbn->trb_seqe, trb->trb_seqs)) {
+		if ((i = SEQ_DELTA(trb->trb_seqs, trbn->trb_seqe)) > 0) {
+			m_adj(trb->trb_m, i);
+			trb->trb_m = m_trimhead(trb->trb_m);
 		}
-		KASSERT(tqen->trq_m != NULL,
-		    ("%s: no remaining mbufs in block", __func__));
-	}
+		trb->trb_seqs = trbn->trb_seqs;
+		trbn->trb_mt->m_next = trb->trb_m;
+		trb->trb_m = trbn->trb_m;
+	} else
+		return (NULL);
 
-	/* Merge blocks together. */
-	tqe->trq_len += tqen->trq_len;
-	tqe->trq_mcnt += tqen->trq_mcnt;
-	tqe->trq_ml->m_next = tqen->trq_m;
-	tqe->trq_ml = tqen->trq_ml;
-	TAILQ_REMOVE(&tp->t_trq, tqen, trq_q);
-	LIST_REMOVE(tqen, trq_s);
-	uma_zfree(tcp_reass_zone, tqen);
-	tcp_reass_qsize--;
-	TCPSTAT_INC(tcps_reass_merge);
+	trbn->trb_seqs = 0;
+	trbn->trb_seqe = i;
+	trbn->trb_m = NULL;
+	trbn->trb_mt = NULL;
+	return (trbn);
 }
 
 /*
@@ -691,10 +568,9 @@
 	tcp_seq sack_seq;
 	int nsacks = 0;
 
+	INP_WLOCK_ASSERT(tp->t_inpcb);
 	KASSERT(numsacks > 0,
 	    ("%s: zero sack blocks to add", __func__));
-	KASSERT(!TAILQ_EMPTY(&tp->t_trq),
-	    ("%s: reassembly queue empty", __func__));
-	KASSERT(!LIST_EMPTY(&tp->t_trq_sack),
+	KASSERT(!LIST_EMPTY(&tp->rcv_reass_sack),
 	    ("%s: sack list empty", __func__));
 
@@ -718,25 +594,15 @@
 	return (nsacks);
 }
 
-/*
- * Free the reassembly queue on tcpcb disposal or on general memory shortage.
- */
-void
-tcp_reass_qfree(struct tcpcb *tp)
+#ifdef DDB
+static void
+db_print_reassblocks(struct tcpcb *tp)
 {
-	struct trq *tqe, *tqen;
+	struct tcp_reass_block *trb;
 
-	INP_WLOCK_ASSERT(tp->t_inpcb);
-
-	TAILQ_FOREACH_SAFE(tqe, &tp->t_trq, trq_q, tqen) {
-		m_freem(tqe->trq_m);
-		KASSERT(tp->t_trqmcnt >= tqe->trq_mcnt,
-		    ("%s: t_trqmcnt incorrect", __func__));
-		tp->t_trqmcnt -= tqe->trq_mcnt;
-		TAILQ_REMOVE(&tp->t_trq, tqe, trq_q);
-		LIST_REMOVE(tqe, trq_s);
-		uma_zfree(tcp_reass_zone, tqe);
-		tcp_reass_qsize--;
+	RB_FOREACH(trb, tcp_ra, &tp->rcv_reass) {
+		db_printf(" reass block 0x%08x - 0x%08x\n",
+		    trb->trb_seqs, trb->trb_seqe);
 	}
-	tcp_timer_activate(tp, TT_REASS, 0);
 }
+#endif
==== //depot/projects/tcp_reass/netinet/tcp_var.h#18 (text+ko) ====

@@ -51,14 +51,13 @@
 #endif /* _KERNEL */
 
 /* TCP reassembly queue segment block entry. */
-struct trq {
-	TAILQ_ENTRY(trq) trq_q;		/* linked list in SEQ# order */
-	LIST_ENTRY(trq)	trq_s;		/* linked list in SACK order */
-	tcp_seq		trq_seq;	/* start of block */
-	int		trq_len;	/* length of block */
-	int		trq_mcnt;	/* gross mbuf size of block */
-	struct mbuf	*trq_m;		/* mbuf chain of data */
-	struct mbuf	*trq_ml;	/* last mbuf in chain of data */
+struct tcp_reass_block {
+	RB_ENTRY(tcp_reass_block) trb_rb;	/* tree node in SEQ# order */
+	LIST_ENTRY(tcp_reass_block)	trb_sack;	/* linked list in SACK order */
+	tcp_seq		trb_seqs;	/* start of block */
+	tcp_seq		trb_seqe;	/* end of block */
+	struct mbuf	*trb_m;		/* mbuf chain of data */
+	struct mbuf	*trb_mt;	/* last mbuf in chain of data */
 };
 
 struct sackblk {
@@ -105,9 +104,9 @@
  * Organized for 16 byte cacheline efficiency.
  */
 struct tcpcb {
-	TAILQ_HEAD(trq_head, trq) t_trq;	/* segment reassembly queue */
-	LIST_HEAD(trq_shead, trq) t_trq_sack;	/* last additions to reass queue */
-	int	t_trqmcnt;		/* segment reassembly queue gross usage */
+	RB_HEAD(tcp_ra, tcp_reass_block) rcv_reass;	/* segment reassembly queue */
+	LIST_HEAD(tcp_reass_shead, tcp_reass_block) rcv_reass_sack; /* last additions to reass queue */
+	int	rcv_reass_size;		/* segment reassembly memory usage */
 
 	int	t_dupacks;		/* consecutive dup acks recd */
 
@@ -653,7 +652,7 @@
 int	 tcp_reass(struct tcpcb *, struct tcphdr *, int *, struct mbuf *);
 void	 tcp_reass_init(void);
 int	 tcp_reass_sack(struct tcpcb *, u_char *, int);
-void	 tcp_reass_qfree(struct tcpcb *);
+void	 tcp_reass_flush(struct tcpcb *);
 void	 tcp_input(struct mbuf *, int);
 u_long	 tcp_maxmtu(struct in_conninfo *, int *);
 u_long	 tcp_maxmtu6(struct in_conninfo *, int *);

