svn commit: r189623 - head/tools/tools/netrate/tcpp

Robert Watson rwatson at FreeBSD.org
Tue Mar 10 07:52:18 PDT 2009


Author: rwatson
Date: Tue Mar 10 14:52:17 2009
New Revision: 189623
URL: http://svn.freebsd.org/changeset/base/189623

Log:
  Add tcpp -- TCP parallelism microbenchmark.
  
  This tool creates large numbers of TCP connections, each of which will
  transmit a fixed amount of data, between client and server hosts.  tcpp can
  use multiple workers (typically up to the number of hardware cores), and can
  use multiple source IPs in order to use an expanded port/IP 4-tuple space to
  avoid problems from reusing 4-tuples too quickly.  Aggregate bandwidth use
  will be reported after a client run.
  
  While by no means a perfect tool, it has proven quite useful in exposing
  and optimizing TCP stack lock contention, since it easily generates
  high-intensity workloads.  It has also proven surprisingly good at finding
  device driver bugs.
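The 4-tuple argument above can be made concrete with a little arithmetic. This is a minimal sketch, not part of tcpp. It assumes that local ports come from an inclusive range [port_first, port_last], as configured by net.inet.ip.portrange.first and net.inet.ip.portrange.last. It also assumes that each worker talks to one fixed remote IP and port, so only the local IP and local port vary. The helper name tuple_space() is hypothetical.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Hypothetical helper: how many distinct {local IP, local port} pairs one
 * worker can use toward a single {remote IP, remote port}, assuming local
 * ports are drawn from the inclusive range [port_first, port_last] and the
 * client rotates through nlocalips source addresses.
 */
static uint64_t
tuple_space(uint32_t nlocalips, uint32_t port_first, uint32_t port_last)
{
	assert(nlocalips > 0);
	assert(port_first <= port_last && port_last <= 65535);
	return ((uint64_t)nlocalips * (port_last - port_first + 1));
}
```

Take net.inet.ip.portrange.first=100 (from the README's configuration notes), an assumed upper bound of 65535, and -M 4. In that case tuple_space(4, 100, 65535) is 261,744. Each extra source IP adds another full port range before 4-tuples have to be reused.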

Added:
  head/tools/tools/netrate/tcpp/
  head/tools/tools/netrate/tcpp/Makefile   (contents, props changed)
  head/tools/tools/netrate/tcpp/README   (contents, props changed)
  head/tools/tools/netrate/tcpp/tcpp.c   (contents, props changed)
  head/tools/tools/netrate/tcpp/tcpp.h   (contents, props changed)
  head/tools/tools/netrate/tcpp/tcpp_client.c   (contents, props changed)
  head/tools/tools/netrate/tcpp/tcpp_server.c   (contents, props changed)
  head/tools/tools/netrate/tcpp/tcpp_util.c   (contents, props changed)

Added: head/tools/tools/netrate/tcpp/Makefile
==============================================================================
--- /dev/null	00:00:00 1970	(empty, because file is newly added)
+++ head/tools/tools/netrate/tcpp/Makefile	Tue Mar 10 14:52:17 2009	(r189623)
@@ -0,0 +1,9 @@
+# $FreeBSD$
+
+PROG=	tcpp
+INCS=	tcpp.h
+NO_MAN=	
+SRCS=	tcpp.c tcpp_client.c tcpp_server.c tcpp_util.c
+WARNS=	3
+
+.include <bsd.prog.mk>

Added: head/tools/tools/netrate/tcpp/README
==============================================================================
--- /dev/null	00:00:00 1970	(empty, because file is newly added)
+++ head/tools/tools/netrate/tcpp/README	Tue Mar 10 14:52:17 2009	(r189623)
@@ -0,0 +1,99 @@
+tcpp -- Parallel TCP Exercise Tool
+
+This is a new tool, and is rife with bugs.  However, it appears to create
+even more problems for device drivers and the kernel, so that's OK.
+
+This tool generates large numbers of TCP connections and stuffs lots of data
+into them.  One binary encapsulates both a client and a server.  Each of the
+client and the server starts a configurable number of worker processes, each
+of which in turn uses its own TCP port.  The number of server processes must
+be >= the number of client processes, or some of the ports required by the
+client won't have a listener.  The client then proceeds to make connections
+and send data to the server.  Each worker multiplexes many connections at
+once, up to a maximum parallelism limit.  The client can use one or many IP
+addresses, in order to make more 4-tuples available for testing, and will
+automatically spread the load of new connections across available source
+addresses.
+
+You will need to retune your TCP stack for high volume; see Configuration
+Notes below.
+
+The server has very little to configure; use the following command-line
+flags:
+
+  -s                           Select server mode
+  -p <numprocs>                Number of workers, should be >= client -p arg
+  -r <baseport>                Non-default base TCP port, should match client
+  -T                           Print CPU usage every ten seconds
+  -m <maxconnectionsperproc>   Maximum simultaneous connections/proc, should
+                               be >= client setting.
+
+Typical use:
+
+  ./tcpp -s -p 4 -m 1000000
+
+This selects server mode, four workers, and at most 1 million TCP connections
+per worker at a time.
+
+The client has more to configure, with the following flags:
+
+  -c <remoteIP>                Select client mode, and specific dest IP
+  -C                           Print connections/second instead of Gbps
+  -M <localIPcount>            Number of sequential local IPs to use; req. -l
+  -T                           Include CPU use summary in stats at end of run
+  -b <bytespertcp>             Data bytes per connection
+  -l <localIPbase>             Starting local IP address to bind
+  -m <maxtcpsatonce>           Max simultaneous conn/worker (see server -m)
+  -p <numprocs>                Number of workers, should be <= server -p
+  -r <baseport>                Non-default base TCP port, should match server
+  -t <tcpsperproc>             How many connections to use per worker
+  
+Typical use:
+
+  ./tcpp -c 192.168.100.201 -p 4 -t 100000 -m 10000 -b 100000 \
+    -l 192.168.100.101 -M 4
+
+This creates four workers, each of which will (over its lifetime) set up and
+use 100,000 TCP connections carrying 100,000 bytes of data each, with at most
+10,000 simultaneous connections per worker at any moment.  tcpp will use four
+source IP addresses, starting with 192.168.100.101, and all connections will
+be to the single destination IP of 192.168.100.201.
+
+Keeping the number of workers (-p) <= the number of cores is advisable.  When
+multiple IPs are used on the client, they will be sequential starting with the
+localIPbase set with -l.
+
+Known Issues
+------------
+
+The bandwidth estimate doesn't handle failures well.  It also has serious
+rounding errors and probably conceptual problems.
+
+It's not clear that kevent() is "fair" to multiple connections.
+
+Rather than passing the length for each connection, we might want to pass
+it once with a control connection up front.  On the other hand, the server
+is quite dumb right now, so we could take advantage of this to do size
+mixes.
+
+Configuration Notes
+-------------------
+
+In my testing, I use the following sysctl.conf entries and commands:
+
+net.inet.ip.portrange.first=100
+kern.ipc.maxsockets=1000000
+net.inet.tcp.maxtcptw=3000000
+kern.ipc.somaxconn=49152
+
+# To run single-queue (not multiqueue), do this before loading the driver:
+kenv hw.cxgb.singleq="1"
+
+kldload if_cxgb
+
+# Consider turning off TSO and/or adjusting the MTU for some scenarios:
+ifconfig cxgb0 -tso
+ifconfig cxgb0 mtu 1500
+
+
+$FreeBSD$
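The README's description of -l and -M can be illustrated by showing, in isolation, how a source address is chosen for each new connection. The arithmetic mirrors tcpp_client_newconn() in this commit. The function rotate_source() is a hypothetical standalone illustration, not part of tcpp.

```c
#include <arpa/inet.h>
#include <assert.h>
#include <netinet/in.h>

/*
 * Hypothetical helper: pick the source address for the counter'th new
 * connection by rotating through nips sequential addresses starting at
 * base.  Mirrors the expression in tcpp_client_newconn():
 *     htonl(ntohl(localipbase.sin_addr.s_addr) + (counter++ % Mflag))
 * Nothing checks that the sum stays inside the local subnet; bind() will
 * fail if an address is not actually configured on the client.
 */
static struct in_addr
rotate_source(struct in_addr base, unsigned int counter, unsigned int nips)
{
	struct in_addr a;

	assert(nips > 0);
	a.s_addr = htonl(ntohl(base.s_addr) + (counter % nips));
	return (a);
}
```

With base 192.168.100.101 and nips 4, counters 0 through 5 map to .101, .102, .103, .104, .101, .102. Successive connections from one worker therefore cycle through all configured source addresses.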

Added: head/tools/tools/netrate/tcpp/tcpp.c
==============================================================================
--- /dev/null	00:00:00 1970	(empty, because file is newly added)
+++ head/tools/tools/netrate/tcpp/tcpp.c	Tue Mar 10 14:52:17 2009	(r189623)
@@ -0,0 +1,204 @@
+/*-
+ * Copyright (c) 2008-2009 Robert N. M. Watson
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+ * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+ * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ *
+ * $FreeBSD$
+ */
+
+#include <sys/types.h>
+#include <sys/socket.h>
+
+#include <netinet/in.h>
+
+#include <arpa/inet.h>
+
+#include <err.h>
+#include <getopt.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sysexits.h>
+#include <unistd.h>
+
+#include "tcpp.h"
+
+#define	BYTES_DEFAULT	(10*1024*1024)	/* Data per connection. */
+#define	MAXTCPS_DEFAULT	32		/* Number of TCPs at a time per proc. */
+#define	PROCS_DEFAULT	1		/* Processes used in run. */
+#define	TCPS_DEFAULT	1		/* Number of connections per process. */
+#define	BASEPORT_DEFAULT	10000
+
+struct sockaddr_in remoteip; 		/* Base target address. */
+struct sockaddr_in localipbase;		/* Base local address, if -l. */
+int cflag, lflag, mflag, pflag, sflag, tflag, Cflag, Mflag, Tflag;
+uint64_t bflag;
+u_short rflag;
+
+static void
+usage(void)
+{
+
+	fprintf(stderr, "client: tcpp"
+	    " -c remoteIP"
+	    " [-CT]"
+	    " [-M localIPcount]"
+	    " [-l localIPbase]"
+	    " [-b bytespertcp]"
+	    " [-m maxtcpsatonce]"
+	    "\n"
+	    "\t"
+	    " [-p procs]"
+	    " [-t tcpsperproc]"
+	    " [-r baseport]"
+	    "\n");
+
+	fprintf(stderr, "server: tcpp"
+	    " -s"
+	    " [-T]"
+	    " [-l localIPbase]"
+	    " [-m maxtcpsatonce]"
+	    " [-p procs]"
+	    " [-r baseport]"
+	    "\n");
+	exit(EX_USAGE);
+}
+
+int
+main(int argc, char *argv[])
+{
+	long long ll;
+	char *dummy;
+	int ch;
+
+	bzero(&localipbase, sizeof(localipbase));
+	localipbase.sin_len = sizeof(localipbase);
+	localipbase.sin_family = AF_INET;
+	localipbase.sin_addr.s_addr = htonl(INADDR_ANY);	/* Default. */
+	localipbase.sin_port = htons(0);				/* Default. */
+
+	bzero(&remoteip, sizeof(remoteip));
+	remoteip.sin_len = sizeof(remoteip);
+	remoteip.sin_family = AF_INET;
+	remoteip.sin_addr.s_addr = htonl(INADDR_LOOPBACK); /* Default. */
+	remoteip.sin_port = htons(0);				/* Default. */
+
+	bflag = BYTES_DEFAULT;
+	mflag = MAXTCPS_DEFAULT;
+	pflag = PROCS_DEFAULT;
+	rflag = BASEPORT_DEFAULT;
+	tflag = TCPS_DEFAULT;
+	Mflag = 1;
+	while ((ch = getopt(argc, argv, "b:c:l:m:p:r:st:CM:T")) != -1) {
+		switch (ch) {
+		case 'b':
+			ll = strtoll(optarg, &dummy, 10);
+			if (*dummy != '\0' || ll <= 0)
+				usage();
+			bflag = ll;
+			break;
+
+		case 'c':
+			cflag++;
+			if (inet_aton(optarg, &remoteip.sin_addr) != 1)
+				errx(-1, "inet_aton: %s", optarg);
+			break;
+
+		case 'l':
+			lflag++;
+			if (inet_aton(optarg, &localipbase.sin_addr) != 1)
+				errx(-1, "inet_aton: %s", optarg);
+			break;
+
+		case 'm':
+			ll = strtoll(optarg, &dummy, 10);
+			if (*dummy != '\0' || ll <= 0)
+				usage();
+			mflag = ll;
+			break;
+
+		case 'p':
+			ll = strtoll(optarg, &dummy, 10);
+			if (*dummy != '\0' || ll <= 0)
+				usage();
+			pflag = ll;
+			break;
+
+		case 'r':
+			ll = strtoll(optarg, &dummy, 10);
+			if (*dummy != '\0' || ll < 1 || ll > 65535)
+				usage();
+			rflag = ll;
+			break;
+
+		case 's':
+			sflag++;
+			break;
+
+		case 't':
+			ll = strtoll(optarg, &dummy, 10);
+			if (*dummy != '\0' || ll <= 0)
+				usage();
+			tflag = ll;
+			break;
+
+		case 'C':
+			Cflag++;
+			break;
+
+		case 'M':
+			ll = strtoll(optarg, &dummy, 10);
+			if (*dummy != '\0' || ll <= 0)
+				usage();
+			Mflag = ll;
+			break;
+
+		case 'T':
+			Tflag++;
+			break;
+
+		default:
+			usage();
+		}
+	}
+
+	/* Exactly one of client and server. */
+	if (cflag > 1 || sflag > 1)
+		usage();
+	if ((cflag && sflag) || (!cflag && !sflag))
+		usage();
+
+	/* If Mflag is specified, we must have the lflag for a local IP. */
+	if (Mflag > 1 && !lflag)
+		usage();
+
+	/* Several flags are valid only on the client, disallow if server. */
+	if (sflag && (Cflag || Mflag > 1))
+		usage();
+
+	if (cflag)
+		tcpp_client();
+	else
+		tcpp_server();
+	exit(0);
+}
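main() validates numeric options with strtoll() followed by an end-pointer check. The sketch below factors that pattern into a standalone helper. As an addition not present in this commit, it also rejects empty strings and values that overflow (ERANGE). parse_count() is a hypothetical name.

```c
#include <assert.h>
#include <errno.h>
#include <stdlib.h>

/*
 * Hypothetical helper: parse a strictly positive decimal count, applying
 * the checks main() uses for -b, -m, -p and -t (no trailing characters,
 * value > 0), plus rejection of empty input and out-of-range values.
 * Returns -1 on any error.
 */
static long long
parse_count(const char *s)
{
	char *end;
	long long ll;

	assert(s != NULL);
	errno = 0;
	ll = strtoll(s, &end, 10);
	if (end == s || *end != '\0' || errno == ERANGE || ll <= 0)
		return (-1);
	return (ll);
}
```

mflag, pflag and tflag are declared int, so a caller storing the result in one of them would also want to reject values above INT_MAX. As committed, main() assigns the long long directly.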

Added: head/tools/tools/netrate/tcpp/tcpp.h
==============================================================================
--- /dev/null	00:00:00 1970	(empty, because file is newly added)
+++ head/tools/tools/netrate/tcpp/tcpp.h	Tue Mar 10 14:52:17 2009	(r189623)
@@ -0,0 +1,52 @@
+/*-
+ * Copyright (c) 2008-2009 Robert N. M. Watson
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+ * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+ * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ *
+ * $FreeBSD$
+ */
+
+#ifndef TCPP_H
+#define	TCPP_H
+
+extern struct sockaddr_in localipbase, remoteip;
+extern int cflag, lflag, mflag, pflag, sflag, tflag;
+extern int Cflag, Mflag, Tflag;
+extern uint64_t bflag;
+extern u_short rflag;
+
+#define	TCPP_MAGIC	0x84e812f7
+struct tcpp_header {
+	u_int32_t	th_magic;
+	u_int64_t	th_len;
+} __packed;
+
+void	tcpp_client(void);
+void	tcpp_header_encode(struct tcpp_header *thp);
+void	tcpp_header_decode(struct tcpp_header *thp);
+void	tcpp_server(void);
+
+#define	SYSCTLNAME_CPUS		"kern.smp.cpus"
+#define	SYSCTLNAME_CPTIME	"kern.cp_time"
+
+#endif /* TCPP_H */
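tcpp.h declares a 12-byte packed header consisting of a 32-bit magic number followed by a 64-bit length. It also declares tcpp_header_encode() and tcpp_header_decode(), whose bodies live in tcpp_util.c and are cut off in this mail. The sketch below shows one portable way to put such a header on the wire. It assumes both fields travel big-endian (network byte order). hdr_pack() and hdr_unpack() are hypothetical and are not the committed implementation.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define	TCPP_MAGIC	0x84e812f7	/* Value from tcpp.h. */
#define	HDR_WIRE_LEN	12		/* 4-byte magic + 8-byte length. */

/* Hypothetical: write magic, then length, most significant byte first. */
static void
hdr_pack(uint8_t buf[HDR_WIRE_LEN], uint32_t magic, uint64_t len)
{
	int i;

	for (i = 0; i < 4; i++)
		buf[i] = (uint8_t)(magic >> (24 - 8 * i));
	for (i = 0; i < 8; i++)
		buf[4 + i] = (uint8_t)(len >> (56 - 8 * i));
}

/* Hypothetical inverse of hdr_pack(). */
static void
hdr_unpack(const uint8_t buf[HDR_WIRE_LEN], uint32_t *magicp,
    uint64_t *lenp)
{
	uint32_t magic = 0;
	uint64_t len = 0;
	int i;

	assert(magicp != NULL && lenp != NULL);
	for (i = 0; i < 4; i++)
		magic = (magic << 8) | buf[i];
	for (i = 0; i < 8; i++)
		len = (len << 8) | buf[4 + i];
	*magicp = magic;
	*lenp = len;
}
```

Serializing field by field avoids depending on the __packed struct layout and on host endianness. This diff does not show whether tcpp_util.c takes that approach.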

Added: head/tools/tools/netrate/tcpp/tcpp_client.c
==============================================================================
--- /dev/null	00:00:00 1970	(empty, because file is newly added)
+++ head/tools/tools/netrate/tcpp/tcpp_client.c	Tue Mar 10 14:52:17 2009	(r189623)
@@ -0,0 +1,346 @@
+/*-
+ * Copyright (c) 2008-2009 Robert N. M. Watson
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+ * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+ * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ *
+ * $FreeBSD$
+ */
+
+#include <sys/types.h>
+#include <sys/event.h>
+#include <sys/resource.h>
+#include <sys/sched.h>
+#include <sys/socket.h>
+#include <sys/sysctl.h>
+#include <sys/time.h>
+#include <sys/wait.h>
+
+#include <netinet/in.h>
+
+#include <err.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <inttypes.h>
+#include <signal.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+#include "tcpp.h"
+
+#define	min(x, y)	((x) < (y) ? (x) : (y))
+
+#define timespecsub(vvp, uvp)						\
+	do {								\
+		(vvp)->tv_sec -= (uvp)->tv_sec;				\
+		(vvp)->tv_nsec -= (uvp)->tv_nsec;			\
+		if ((vvp)->tv_nsec < 0) {				\
+			(vvp)->tv_sec--;				\
+			(vvp)->tv_nsec += 1000000000;			\
+		}							\
+	} while (0)
+
+
+/*
+ * Gist of each client worker: build up to mflag connections at a time, and
+ * pump data in to them somewhat fairly until tflag connections have been
+ * completed.
+ */
+#define	CONNECTION_MAGIC	0x87a3f56e
+struct connection {
+	uint32_t	conn_magic;		/* Just magic. */
+	int		conn_fd;
+	struct tcpp_header	conn_header;	/* Header buffer. */
+	u_int		conn_header_sent;	/* Header bytes sent. */
+	u_int64_t	conn_data_sent;		/* Data bytes sent.*/
+};
+
+static u_char			 buffer[256 * 1024];	/* Buffer to send. */
+static pid_t			*pid_list;
+static int			 kq;
+static int			 started;	/* Number started so far. */
+static int			 finished;	/* Number finished so far. */
+static int			 counter;	/* IP number offset. */
+
+static struct connection *
+tcpp_client_newconn(void)
+{
+	struct sockaddr_in sin;
+	struct connection *conn;
+	struct kevent kev;
+	int fd, i;
+
+	/*
+	 * Spread load over available IPs, rotating through them as we go.  No
+	 * attempt to localize IPs to particular workers.
+	 */
+	sin = localipbase;
+	sin.sin_addr.s_addr = htonl(ntohl(localipbase.sin_addr.s_addr) +
+	    (counter++ % Mflag));
+
+	fd = socket(PF_INET, SOCK_STREAM, 0);
+	if (fd < 0)
+		err(-1, "socket");
+
+	if (fcntl(fd, F_SETFL, O_NONBLOCK) < 0)
+		err(-1, "fcntl");
+
+	i = 1;
+	if (setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &i, sizeof(i)) < 0)
+		err(-1, "setsockopt");
+#if 0
+	i = 1;
+	if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i)) < 0)
+		err(-1, "setsockopt");
+#endif
+
+	if (lflag) {
+		if (bind(fd, (struct sockaddr *)&sin, sizeof(sin)) < 0)
+			err(-1, "bind");
+	}
+
+	if (connect(fd, (struct sockaddr *)&remoteip, sizeof(remoteip)) < 0 &&
+	    errno != EINPROGRESS)
+		err(-1, "connect");
+
+	conn = malloc(sizeof(*conn));
+	if (conn == NULL)
+		err(-1, "malloc");
+	bzero(conn, sizeof(*conn));
+	conn->conn_magic = CONNECTION_MAGIC;
+	conn->conn_fd = fd;
+	conn->conn_header.th_magic = TCPP_MAGIC;
+	conn->conn_header.th_len = bflag;
+	tcpp_header_encode(&conn->conn_header);
+
+	EV_SET(&kev, fd, EVFILT_WRITE, EV_ADD, 0, 0, conn);
+	if (kevent(kq, &kev, 1, NULL, 0, NULL) < 0)
+		err(-1, "newconn kevent");
+
+	started++;
+	return (conn);
+}
+
+static void
+tcpp_client_closeconn(struct connection *conn)
+{
+
+	close(conn->conn_fd);
+	bzero(conn, sizeof(*conn));
+	free(conn);
+	finished++;
+}
+
+static void
+tcpp_client_handleconn(struct kevent *kev)
+{
+	struct connection *conn;
+	ssize_t len;
+
+	conn = kev->udata;
+	if (conn->conn_magic != CONNECTION_MAGIC)
+		errx(-1, "tcpp_client_handleconn: magic");
+
+	if (conn->conn_header_sent < sizeof(conn->conn_header)) {
+		len = write(conn->conn_fd, ((u_char *)&conn->conn_header) +
+		    conn->conn_header_sent, sizeof(conn->conn_header) -
+		    conn->conn_header_sent);
+		if (len < 0) {
+			tcpp_client_closeconn(conn);
+			err(-1, "tcpp_client_handleconn: header write");
+		}
+		if (len == 0) {
+			tcpp_client_closeconn(conn);
+			errx(-1, "tcpp_client_handleconn: header write "
+			    "premature EOF");
+		}
+		conn->conn_header_sent += len;
+	} else {
+		len = write(conn->conn_fd, buffer, min(sizeof(buffer),
+		    bflag - conn->conn_data_sent));
+		if (len < 0) {
+			tcpp_client_closeconn(conn);
+			err(-1, "tcpp_client_handleconn: data write");
+		}
+		if (len == 0) {
+			tcpp_client_closeconn(conn);
+			errx(-1, "tcpp_client_handleconn: data write: "
+			    "premature EOF");
+		}
+		conn->conn_data_sent += len;
+		if (conn->conn_data_sent >= bflag) {
+			/*
+			 * All is well.
+			 */
+			tcpp_client_closeconn(conn);
+		}
+	}
+}
+
+static void
+tcpp_client_worker(int workernum)
+{
+	struct kevent *kev_array;
+	int i, numevents, kev_bytes;
+#if defined(CPU_SETSIZE) && 0
+	cpu_set_t mask;
+	int ncpus;
+	size_t len;
+
+	len = sizeof(ncpus);
+	if (sysctlbyname(SYSCTLNAME_CPUS, &ncpus, &len, NULL, 0) < 0)
+		err(-1, "sysctlbyname: %s", SYSCTLNAME_CPUS);
+	if (len != sizeof(ncpus))
+		errx(-1, "sysctlbyname: %s: len %jd", SYSCTLNAME_CPUS,
+		    (intmax_t)len);
+
+	CPU_ZERO(&mask);
+	CPU_SET(workernum % ncpus, &mask);
+	if (sched_setaffinity(0, CPU_SETSIZE, &mask) < 0)
+		err(-1, "sched_setaffinity");
+#endif
+	setproctitle("tcpp_client %d", workernum);
+
+	/*
+	 * Add the worker number to the remote port.
+	 */
+	remoteip.sin_port = htons(rflag + workernum);
+
+	kev_bytes = sizeof(*kev_array) * mflag;
+	kev_array = malloc(kev_bytes);
+	if (kev_array == NULL)
+		err(-1, "malloc");
+	bzero(kev_array, kev_bytes);
+
+	kq = kqueue();
+	if (kq < 0)
+		err(-1, "kqueue");
+
+	while (finished < tflag) {
+		while ((started - finished < mflag) && (started < tflag))
+			(void)tcpp_client_newconn();
+		numevents = kevent(kq, NULL, 0, kev_array, mflag, NULL);
+		if (numevents < 0)
+			err(-1, "kevent");
+		if (numevents > mflag)
+			errx(-1, "kevent: %d", numevents);
+		for (i = 0; i < numevents; i++)
+			tcpp_client_handleconn(&kev_array[i]);
+	}
+	/* printf("Worker %d done - %d finished\n", workernum, finished); */
+}
+
+void
+tcpp_client(void)
+{
+	struct timespec ts_start, ts_finish;
+	long cp_time_start[CPUSTATES], cp_time_finish[CPUSTATES];
+	long ticks;
+	size_t size;
+	pid_t pid;
+	int i, failed, status;
+
+	pid_list = malloc(sizeof(*pid_list) * pflag);
+	if (pid_list == NULL)
+		err(-1, "malloc pid_list");
+	bzero(pid_list, sizeof(*pid_list) * pflag);
+
+	/*
+	 * Start workers.
+	 */
+	size = sizeof(cp_time_start);
+	if (sysctlbyname(SYSCTLNAME_CPTIME, &cp_time_start, &size, NULL, 0)
+	    < 0)
+		err(-1, "sysctlbyname: %s", SYSCTLNAME_CPTIME);
+	if (clock_gettime(CLOCK_REALTIME, &ts_start) < 0)
+		err(-1, "clock_gettime");
+	for (i = 0; i < pflag; i++) {
+		pid = fork();
+		if (pid < 0) {
+			warn("fork");
+			for (i = 0; i < pflag; i++) {
+				if (pid_list[i] != 0)
+					(void)kill(pid_list[i], SIGKILL);
+			}
+			exit(-1);
+		}
+		if (pid == 0) {
+			tcpp_client_worker(i);
+			exit(0);
+		}
+		pid_list[i] = pid;
+	}
+
+	/*
+	 * GC workers.
+	 */
+	failed = 0;
+	for (i = 0; i < pflag; i++) {
+		if (pid_list[i] != 0) {
+			while (waitpid(pid_list[i], &status, 0) != pid_list[i]);
+			if (!WIFEXITED(status) || WEXITSTATUS(status) != 0)
+				failed = 1;
+		}
+	}
+	if (clock_gettime(CLOCK_REALTIME, &ts_finish) < 0)
+		err(-1, "clock_gettime");
+	size = sizeof(cp_time_finish);
+	if (sysctlbyname(SYSCTLNAME_CPTIME, &cp_time_finish, &size, NULL, 0)
+	    < 0)
+		err(-1, "sysctlbyname: %s", SYSCTLNAME_CPTIME);
+	timespecsub(&ts_finish, &ts_start);
+
+	if (failed)
+		errx(-1, "Too many errors");
+
+	printf("%ju bytes transferred in %jd.%09jd seconds\n",
+	    (uintmax_t)(bflag * tflag * pflag), (intmax_t)ts_finish.tv_sec,
+	    (intmax_t)ts_finish.tv_nsec);
+
+	if (Tflag)
+		printf("%d procs ", pflag);
+	if (Cflag) {
+		printf("%f cps%s", (double)pflag * tflag /
+		    (ts_finish.tv_sec + ts_finish.tv_nsec * 1e-9),
+		    Tflag ? " " : "\n");
+	} else {
+		printf("%f Gbps%s", (double)(bflag * tflag * pflag * 8) /
+		    (ts_finish.tv_sec + ts_finish.tv_nsec * 1e-9) * 1e-9,
+		    Tflag ? " " : "\n");
+	}
+	if (Tflag) {
+		ticks = 0;
+		for (i = 0; i < CPUSTATES; i++) {
+			cp_time_finish[i] -= cp_time_start[i];
+			ticks += cp_time_finish[i];
+		}
+		printf("user%% %ld nice%% %ld sys%% %ld intr%% %ld "
+		    "idle%% %ld\n",
+		    (100 * cp_time_finish[CP_USER]) / ticks,
+		    (100 * cp_time_finish[CP_NICE]) / ticks,
+		    (100 * cp_time_finish[CP_SYS]) / ticks,
+		    (100 * cp_time_finish[CP_INTR]) / ticks,
+		    (100 * cp_time_finish[CP_IDLE]) / ticks);
+	}
+}
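The summary printed at the end of tcpp_client() can be checked by hand. This sketch restates its arithmetic as standalone functions. The names are hypothetical, and the run length used below is a made-up input, not a measurement. Note that the byte total is computed from the command-line parameters rather than counted on the wire.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical restatement: bytes = bflag * tflag * pflag. */
static uint64_t
total_bytes(uint64_t bflag, int tflag, int pflag)
{
	assert(tflag > 0 && pflag > 0);
	return (bflag * (uint64_t)tflag * (uint64_t)pflag);
}

/* Elapsed time from the seconds/nanoseconds left by timespecsub(). */
static double
elapsed_seconds(int64_t sec, long nsec)
{
	return ((double)sec + nsec * 1e-9);
}

/* Gigabits per second, as printed without -C. */
static double
gbps(uint64_t bytes, int64_t sec, long nsec)
{
	return ((double)(bytes * 8) / elapsed_seconds(sec, nsec) * 1e-9);
}

/* Connections per second, as printed with -C. */
static double
cps(int tflag, int pflag, int64_t sec, long nsec)
{
	return ((double)tflag * pflag / elapsed_seconds(sec, nsec));
}
```

For the README's typical client invocation (-p 4 -t 100000 -b 100000), total_bytes() is 40,000,000,000. For a hypothetical 10-second run, gbps() gives 32 and cps() gives 40,000.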

Added: head/tools/tools/netrate/tcpp/tcpp_server.c
==============================================================================
--- /dev/null	00:00:00 1970	(empty, because file is newly added)
+++ head/tools/tools/netrate/tcpp/tcpp_server.c	Tue Mar 10 14:52:17 2009	(r189623)
@@ -0,0 +1,340 @@
+/*-
+ * Copyright (c) 2008-2009 Robert N. M. Watson
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+ * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+ * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ *
+ * $FreeBSD$
+ */
+
+#include <sys/types.h>
+#include <sys/endian.h>
+#include <sys/event.h>
+#include <sys/resource.h>
+#include <sys/sched.h>
+#include <sys/socket.h>
+#include <sys/sysctl.h>
+#include <sys/time.h>
+#include <sys/wait.h>
+
+#include <netinet/in.h>
+
+#include <err.h>
+#include <fcntl.h>
+#include <inttypes.h>
+#include <signal.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+#include "tcpp.h"
+
+/*
+ * Server side -- create a pool of processes, each listening on its own TCP
+ * port number for new connections.  Each connection begins with a struct
+ * tcpp_header (a 32-bit magic number, then a 64-bit length in network byte
+ * order; 12 bytes in all), followed by that many bytes of data.  We use
+ * non-blocking sockets with kqueue to avoid the overhead of threading or
+ * more than one process per processor, which makes things a bit awkward
+ * when dealing with data we care about.  As such, we read the header into
+ * a buffer incrementally and decode it only once we have all of it.
+ */
+#define	CONNECTION_MAGIC	0x6392af27
+struct connection {
+	uint32_t	conn_magic;		/* Just magic. */
+	int		conn_fd;
+	struct tcpp_header	conn_header;	/* Header buffer. */
+	u_int		conn_header_len;	/* Bytes so far. */
+	u_int64_t	conn_data_len;		/* How much to sink. */
+	u_int64_t	conn_data_received;	/* How much so far. */
+};
+
+static pid_t			*pid_list;
+static int			 kq;
+
+static struct connection *
+tcpp_server_newconn(int listen_fd)
+{
+	struct connection *conn;
+	struct kevent kev;
+	int fd;
+
+	fd = accept(listen_fd, NULL, NULL);
+	if (fd < 0) {
+		warn("accept");
+		return (NULL);
+	}
+
+	if (fcntl(fd, F_SETFL, O_NONBLOCK) < 0)
+		err(-1, "fcntl");
+
+	conn = malloc(sizeof(*conn));
+	if (conn == NULL)
+		err(-1, "malloc");
+	bzero(conn, sizeof(*conn));
+	conn->conn_magic = CONNECTION_MAGIC;
+	conn->conn_fd = fd;
+
+	/*
+	 * Register to read on the socket, and set our conn pointer as the
+	 * udata so we can find it quickly in the future.
+	 */
+	EV_SET(&kev, fd, EVFILT_READ, EV_ADD, 0, 0, conn);
+	if (kevent(kq, &kev, 1, NULL, 0, NULL) < 0)
+		err(-1, "kevent");
+
+	return (conn);
+}
+
+static void
+tcpp_server_closeconn(struct connection *conn)
+{
+
+	/*
+	 * Kqueue cleans up after itself once we close the socket.  Although
+	 * we handle a batch of kevents per kevent() call, each socket has only
+	 * one registered filter, so a batch can hold at most one event for it;
+	 * once that event closes the socket, no later kevent in the batch or in
+	 * future calls can refer to it.
+	 */
+	close(conn->conn_fd);
+	bzero(conn, sizeof(*conn));
+	free(conn);
+}
+
+static u_char buffer[256*1024];	/* Buffer in which to sink data. */
+static void
+tcpp_server_handleconn(struct kevent *kev)
+{
+	struct connection *conn;
+	ssize_t len;
+
+	conn = kev->udata;
+	if (conn->conn_magic != CONNECTION_MAGIC)
+		errx(-1, "tcpp_server_handleconn: magic");
+
+	if (conn->conn_header_len < sizeof(conn->conn_header)) {
+		len = read(conn->conn_fd,
+		    ((u_char *)&conn->conn_header) + conn->conn_header_len,
+		    sizeof(conn->conn_header) - conn->conn_header_len);
+		if (len < 0) {
+			warn("tcpp_server_handleconn: header read");
+			tcpp_server_closeconn(conn);
+			return;
+		}
+		if (len == 0) {
+			warnx("tcpp_server_handleconn: header premature eof");
+			tcpp_server_closeconn(conn);
+			return;
+		}
+		conn->conn_header_len += len;
+		if (conn->conn_header_len == sizeof(conn->conn_header)) {
+			tcpp_header_decode(&conn->conn_header);
+			if (conn->conn_header.th_magic != TCPP_MAGIC) {
+				warnx("tcpp_server_handleconn: bad magic");
+				tcpp_server_closeconn(conn);
+				return;
+			}
+		}
+	} else {
+		/*
+		 * Drain up to a buffer from the connection, so that we pay
+		 * attention to other connections too.
+		 */
+		len = read(conn->conn_fd, buffer, sizeof(buffer));
+		if (len < 0) {
+			warn("tcpp_server_handleconn: data bad read");
+			tcpp_server_closeconn(conn);
+			return;
+		}
+		if (len == 0 && conn->conn_data_received <
+		    conn->conn_header.th_len) {
+			warnx("tcpp_server_handleconn: data premature eof");
+			tcpp_server_closeconn(conn);
+			return;
+		}
+		conn->conn_data_received += len;
+		if (conn->conn_data_received > conn->conn_header.th_len) {
+			warnx("tcpp_server_handleconn: too much data");
+			tcpp_server_closeconn(conn);
+			return;
+		}
+		if (conn->conn_data_received == conn->conn_header.th_len) {
+			/*
+			 * All is well.
+			 */
+			tcpp_server_closeconn(conn);
+			return;
+		}
+	}
+}
+
+static void
+tcpp_server_worker(int workernum)
+{
+	int i, listen_sock, numevents;
+	struct kevent kev, *kev_array;
+	int kev_bytes;
+#if defined(CPU_SETSIZE) && 0
+	cpu_set_t mask;
+	int ncpus;
+	size_t len;
+
+	len = sizeof(ncpus);
+	if (sysctlbyname(SYSCTLNAME_CPUS, &ncpus, &len, NULL, 0) < 0)
+		err(-1, "sysctlbyname: %s", SYSCTLNAME_CPUS);
+	if (len != sizeof(ncpus))
+		errx(-1, "sysctlbyname: %s: len %jd", SYSCTLNAME_CPUS,
+		    (intmax_t)len);
+
+	CPU_ZERO(&mask);
+	CPU_SET(workernum % ncpus, &mask);
+	if (sched_setaffinity(0, CPU_SETSIZE, &mask) < 0)
+		err(-1, "sched_setaffinity");
+#endif
+	setproctitle("tcpp_server %d", workernum);
+
+	/* Allow an extra kevent for the listen socket. */
+	kev_bytes = sizeof(*kev_array) * (mflag + 1);
+	kev_array = malloc(kev_bytes);
+	if (kev_array == NULL)
+		err(-1, "malloc");
+	bzero(kev_array, kev_bytes);
+
+	/* XXXRW: Want to set and pin the CPU here. */
+
+	/*
+	 * Add the worker number to the local port.
+	 */
+	localipbase.sin_port = htons(rflag + workernum);
+
+	listen_sock = socket(PF_INET, SOCK_STREAM, 0);
+	if (listen_sock < 0)
+		err(-1, "socket");
+	i = 1;
+	if (setsockopt(listen_sock, SOL_SOCKET, SO_NOSIGPIPE, &i, sizeof(i))
+	    < 0)
+		err(-1, "setsockopt");
+	i = 1;
+	if (setsockopt(listen_sock, SOL_SOCKET, SO_REUSEPORT, &i, sizeof(i))
+	    < 0)
+		err(-1, "setsockopt");
+	if (bind(listen_sock, (struct sockaddr *)&localipbase,
+	    sizeof(localipbase)) < 0)
+		err(-1, "bind");
+	if (listen(listen_sock, 16384))
+		err(-1, "listen");
+	if (fcntl(listen_sock, F_SETFL, O_NONBLOCK) < 0)
+		err(-1, "fcntl");
+
+	kq = kqueue();
+	if (kq < 0)
+		err(-1, "kqueue");
+
+	EV_SET(&kev, listen_sock, EVFILT_READ, EV_ADD, 0, 0, NULL);
+	if (kevent(kq, &kev, 1, NULL, 0, NULL) < 0)
+		err(-1, "kevent");
+
+	while ((numevents = kevent(kq, NULL, 0, kev_array, mflag + 1, NULL))
+	    > 0) {
+		for (i = 0; i < numevents; i++) {
+			if (kev_array[i].ident == (u_int)listen_sock)
+				(void)tcpp_server_newconn(listen_sock);
+			else
+				tcpp_server_handleconn(&kev_array[i]);
+		}
+	}
+	printf("Worker %d done\n", workernum);
+}
+
+void
+tcpp_server(void)
+{

*** DIFF OUTPUT TRUNCATED AT 1000 LINES ***
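The payload accounting in tcpp_server_handleconn() can be exercised without sockets. This sketch models only the data phase, after the header has been decoded. rx_data() and enum rx_result are hypothetical names, and each call stands for one read() result, with 0 meaning EOF.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Outcomes mirroring the branches in tcpp_server_handleconn(). */
enum rx_result {
	RX_MORE,	/* Keep the connection open, wait for more data. */
	RX_DONE,	/* Exactly th_len bytes received: close, all is well. */
	RX_SHORT,	/* EOF before th_len bytes ("data premature eof"). */
	RX_OVERRUN	/* More than th_len bytes ("too much data"). */
};

/*
 * Account for one read() of len payload bytes on a connection whose header
 * announced th_len bytes; *received is the running total.
 */
static enum rx_result
rx_data(uint64_t th_len, uint64_t *received, size_t len)
{
	assert(received != NULL);
	if (len == 0 && *received < th_len)
		return (RX_SHORT);
	*received += len;
	if (*received > th_len)
		return (RX_OVERRUN);
	if (*received == th_len)
		return (RX_DONE);
	return (RX_MORE);
}
```

The model also shows an edge case. If a header announces zero bytes, the connection is not closed when the header completes. The server waits for the next readable event (typically EOF), which rx_data(0, &r, 0) reports as RX_DONE.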

