Application layer classifier for ipfw

Ermal Luçi ermal.luci at gmail.com
Fri Aug 8 23:26:34 UTC 2008


On Sat, Aug 2, 2008 at 3:00 PM, Mike Makonnen <mtm at wubethiopia.com> wrote:
> Mike Makonnen wrote:
>>
>> Patrick Tracanelli wrote:
>>>
>>> To let you know of my current (real world) tests:
>>>
>>> - Wireless Internet Provider 1:
>>>    - 4Mbit/s of Internet Traffic
>>>    - Classifying default protocols + soulseek + ssh
>>>    - Classifying 100Mbit/s of dump over ssh
>>>
>>> Results in:
>>>    No latency added, very low CPU usage, no packets dropping.
>>>
>>> - Wireless ISP 2:
>>>    - 21 Mbit/s of Internet Traffic
>>>    - Classifying default protocols + soulseek + ssh
>>>
>>> Results in:
>>>    No tcp or udp traffic at all; everything that gets diverted never
>>> comes out of the divert socket, and ipfw-classifyd logs
>>>
>>> Aug  1 12:07:35 ourofino last message repeated 58 times
>>> Aug  1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: bittorrent
>>> (rule 50000)
>>> Aug  1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: edonkey (rule
>>> 50000)
>>> Aug  1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: fasttrack (rule
>>> 50000)
>>> Aug  1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: gnutella (rule
>>> 1000)
>>> Aug  1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: soulseek (rule
>>> 50000)
>>> Aug  1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: ssh   (rule
>>> 50000)
>>> Aug  1 12:18:28 ourofino ipfw-classifyd: unable to write to divert
>>> socket: Operation not permitted
>>> Aug  1 12:18:50 ourofino last message repeated 90 times
>>
>> Hmmm... this part means that the call to sendto(2) to write the packet
>> back into network stack failed.  This explains why you are not seein g any
>> traffic comming back out of the divert socket, but I don't see why it would
>> suddenly fail with a permission error. Could this be a kernel bug?
>>>
>>> Aug  1 12:18:51 ourofino ipfw-classifyd: packet dropped: input queue full
>>> Aug  1 12:19:11 ourofino last message repeated 94 times
>>>
>>> Raised queue len a lot (up to 40960), when the application starts it uses
>>> up to 25% CPU and a second after that, CPU usage gets lower the 0.1%.
>>
>> This looks like a deadlock. If it weren't able to process packets fast
>> enough the cpu usage should be high even as it's spewing "packet dropped"
>> messages. Can you send me some more information like memory usage and the
>> firewall script you are using? How much of the 21Mbits/s of traffic is P2P?
>> If you reduce the number of protocols you are trying to match against does
>> the behavior change? Using netstat -w1 -I<interface> can you tell me how
>> many packets per second we're talking about for 4Mbits/s and 21Mbit/s? Also,
>> the timestamps from the log file seem to show that the daemon is running for
>> approx. 34 sec. before the first "unable to write to write to divert socket"
>> message. Is it passing traffic during this time? Thanks.
>>
>> I've uploaded a newer version. Can you try that also please. It includes:
>>  o SIGHUP forces it to re-read its configuration file
>>  o rc.d script
>>  o minor optimization (calls pthread_cond_signal with the mutex unlocked)
>>  o code cleanup
>>
>> Also, for your convenience I have attached a patch against the earlier
>> version that removes a debugging printf that should remove spammage to your
>> log files (the current version has it removed already).
>>
>
> Ooops, a few minutes after I sent this email I found a couple of bugs (one
> major, and one minor). They were in the original tarball as well as the
> newer one I uploaded earlier today. I've uploaded a fixed version of the
> code. Can you try that instead please.
>
> Also, to help track down performance issues I've modified the Makefile to
> build a profiled version of the application so you can use gprof(1) to
> figure out where any problems lie.
>

Does this sound about right for implementing the GC and implementing syntax as
$protocol = dnpipe 20
$protocol2 = dnqueue 30
it has some extra goos for pf(4) and altq(4)
$protocol3 = queue $queue name
$protocol4 = tag TAGNAME
$protocol5 = action block

It adds 2 new options -e seconds for seconds before a flow is
considered expired and -n #packets proccessed before kicking the GC.

--- classifyd_old.c	2008-08-09 00:33:04.000000000 +0000
+++ classifyd.c	2008-08-09 00:33:34.000000000 +0000
@@ -28,13 +28,17 @@

 #include <sys/types.h>
 #include <sys/socket.h>
+#include <sys/ioctl.h>
+#include <sys/time.h>

+#include <net/if.h>
 #include <arpa/inet.h>
 #include <netinet/in.h>
 #include <netinet/in_systm.h>
 #include <netinet/ip.h>
 #include <netinet/tcp.h>
 #include <netinet/udp.h>
+#include <net/pfvar.h>

 #include <assert.h>
 #include <err.h>
@@ -53,6 +57,7 @@
 #include <unistd.h>

 #include "hashtable.h"
+#include "hashtable_private.h"
 #include "pathnames.h"
 #include "protocols.h"

@@ -94,6 +99,7 @@
 	uint32_t if_datalen;	/* length in bytes of if_data */
 	uint16_t if_pktcount;	/* number of packets concatenated */
 	uint16_t if_fwrule;	/* ipfw(4) rule associated with flow */
+	time_t	 expire;	/* flow expire time */
 };

 /*
@@ -126,7 +132,7 @@
 static struct ic_queue outQ;

 /* divert(4) socket */
-static int dvtS;
+static int dvtS = 0;

 /* config file path */
 static const char *conf = IC_CONFIG_PATH;
@@ -137,12 +143,25 @@
 /* List of protocols available to the system */
 struct ic_protocols *fp;

+/* Our hashtables */
+struct hashtable *sh = NULL,
+		*th = NULL,
+		*uh = NULL;
+
+/* signaled to kick garbage collector */
+static pthread_cond_t  gq_condvar;
+
+/* number of packets before kicking garbage collector */
+static unsigned int npackets = 250;
+
+static time_t time_expire = 40; /* 40 seconds */
 /*
  * Forward function declarations.
  */
 void		*classify_pthread(void *);
 void		*read_pthread(void *);
 void		*write_pthread(void *);
+void		*garbage_pthread(void *);
 static int	equalkeys(void *, void *);
 static unsigned int hashfromkey(void *);
 static void	test_re(void);
@@ -155,7 +174,7 @@
 {
 	struct sockaddr_in addr;
 	struct sigaction sa;
-	pthread_t  classifytd, readtd, writetd;
+	pthread_t  classifytd, readtd, writetd, garbagectd;
 	const char *errstr;
 	long long  num;
 	uint16_t   port, qmaxsz;
@@ -164,13 +183,27 @@
 	tflag = 0;
 	port = IC_DPORT;
 	qmaxsz = IC_QMAXSZ;
-	while ((ch = getopt(argc, argv, "htc:P:p:q:")) != -1) {
+	while ((ch = getopt(argc, argv, "n:e:htc:P:p:q:")) != -1) {
 		switch(ch) {
 		case 'c':
 			conf = strdup(optarg);
 			if (conf == NULL)
 				err(EX_TEMPFAIL, "config file path");
 			break;
+		case 'e':
+			num = strtonum((const char *)optarg, 1, 400, &errstr);
+			if (num == 0 && errstr != NULL) {
+				errx(EX_USAGE, "invalud expire seconds: %s", errstr);	
+			}
+			time_expire = (time_t)num;
+			break;
+		case 'n':
+                        num = strtonum((const char *)optarg, 1,
65535, &errstr);
+                        if (num == 0 && errstr != NULL) {
+                                errx(EX_USAGE, "invalud expire
seconds: %s", errstr);
+                        }
+                        npackets = (unsigned int)num;
+			break;
 		case 'P':
 			protoDir = strdup(optarg);
 			if (protoDir == NULL)
@@ -230,6 +263,9 @@
 	error = pthread_cond_init(&outQ.fq_condvar, NULL);
 	if (error != 0)
 		err(EX_OSERR, "unable to initialize output queue condvar");
+        error = pthread_cond_init(&gq_condvar, NULL);
+        if (error != 0)
+                err(EX_OSERR, "unable to initialize garbage collector
condvar");

 	/*
 	 * Create and bind the divert(4) socket.
@@ -276,32 +312,80 @@
 	if (error == -1)
 		err(EX_OSERR, "unable to set signal handler");

+        /*
+         * There are 3 tables: udp, tcp, and tcp syn.
+         * The tcp syn table tracks connections for which a
+         * SYN packet has been sent but no reply has been returned
+         * yet. Once the SYN ACK reply is detected it is moved to
+         * the regular tcp connection tracking table.
+         */
+        sh = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys);
+        if (sh == NULL) {
+                syslog(LOG_ERR, "unable to create TCP (SYN) tracking table");
+		error = EX_SOFTWARE;
+		goto cleanup;
+        }
+        th = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys);
+        if (th == NULL) {
+                syslog(LOG_ERR, "unable to create TCP tracking table");
+		error = EX_SOFTWARE;
+                goto cleanup;
+        }
+        uh = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys);
+        if (uh == NULL) {
+                syslog(LOG_ERR, "unable to create UDP tracking table");
+		error = EX_SOFTWARE;
+                goto cleanup;
+        }
+
 	/*
 	 * Create the various threads.
 	 */
 	error = pthread_create(&readtd, NULL, read_pthread, NULL);
-	if (error != 0)
-		err(EX_OSERR, "unable to create reader thread");
+	if (error != 0) {
+		syslog(LOG_ERR, "unable to create reader thread");
+		error = EX_OSERR;
+		goto cleanup;
+	}
 	error = pthread_create(&classifytd, NULL, classify_pthread, NULL);
-	if (error != 0)
-		err(EX_OSERR, "unable to create classifier thread");
+	if (error != 0) {
+		syslog(LOG_ERR, "unable to create classifier thread");
+		error = EX_OSERR;
+		goto cleanup;
+	}
 	error = pthread_create(&writetd, NULL, write_pthread, NULL);
-	if (error != 0)
-		err(EX_OSERR, "unable to create writer thread");
-
+	if (error != 0) {
+		syslog(LOG_ERR, "unable to create writer thread");
+		error = EX_OSERR;
+		goto cleanup;
+	}
+        error = pthread_create(&garbagectd, NULL, garbage_pthread, NULL);
+        if (error != 0) {
+                syslog(LOG_ERR, "unable to create garbage collect thread");
+		error = EX_OSERR;
+		goto cleanup;
+	}
 	/*
 	 * Wait for our threads to exit.
 	 */
 	pthread_join(readtd, NULL);
 	pthread_join(classifytd, NULL);
 	pthread_join(writetd, NULL);
-
+	pthread_join(garbagectd, NULL);
 	/*
 	 * Cleanup
 	 */
-	close(dvtS);
+cleanup:
+	if (dvtS > 0)
+		close(dvtS);
+	if (sh != NULL)
+		hashtable_destroy(sh, 1);
+	if (th != NULL)
+		hashtable_destroy(th, 1);
+	if (uh != NULL)
+		hashtable_destroy(uh, 1);
 	
-	return (0);
+	return (error);
 }

 void *
@@ -310,6 +394,7 @@
 	struct ic_pkt	   *pkt;
 	struct ip *ipp;
 	int	  len;
+	unsigned int pcktcnt = 0;

 	while (1) {
 		pkt = (struct ic_pkt *)malloc(sizeof(struct ic_pkt));
@@ -353,6 +438,10 @@
 		STAILQ_INSERT_HEAD(&inQ.fq_pkthead, pkt, fp_link);
 		inQ.fq_size++;
 		pthread_mutex_unlock(&inQ.fq_mtx);
+		if (++pcktcnt > npackets) {
+			pcktcnt = 0;
+			pthread_cond_signal(&gq_condvar);
+		}
 		pthread_cond_signal(&inQ.fq_condvar);
 	}

@@ -420,39 +509,19 @@
 	struct tcphdr	 *tcp;
 	struct udphdr	 *udp;
 	struct ic_pkt	 *pkt;
-	struct hashtable *sh, *th, *uh;
 	struct protocol	 *proto;
+	struct timeval	 tv;
 	regmatch_t	 pmatch;
 	u_char		 *data, *payload;
 	uint16_t	 trycount;
 	int		 datalen, error;

-	/*
-	 * There are 3 tables: udp, tcp, and tcp syn.
-	 * The tcp syn table tracks connections for which a
-	 * SYN packet has been sent but no reply has been returned
-	 * yet. Once the SYN ACK reply is detected it is moved to
-	 * the regular tcp connection tracking table.
-	 */
-	sh = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys);
-	if (sh == NULL) {
-		syslog(LOG_ERR, "unable to create TCP (SYN) tracking table");
-		exit(EX_SOFTWARE);
-	}
-	th = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys);
-	if (th == NULL) {
-		syslog(LOG_ERR, "unable to create TCP tracking table");
-		exit(EX_SOFTWARE);
-	}
-	uh = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys);
-	if (uh == NULL) {
-		syslog(LOG_ERR, "unable to create UDP tracking table");
-		exit(EX_SOFTWARE);
-	}
-
 	flow = NULL;
 	key = NULL;
 	while(1) {
+		while(gettimeofday(&tv, NULL) != 0)
+			;
+
 		pthread_mutex_lock(&inQ.fq_mtx);
 		pkt = STAILQ_LAST(&inQ.fq_pkthead, ic_pkt, fp_link);
 		while (pkt == NULL) {
@@ -528,6 +597,8 @@
 					free(pkt);
 					continue;
 				}
+				
+				flow->expire = tv.tv_sec;
 				goto enqueue;
 			/*
 			 * Handle session tear-down.
@@ -583,8 +654,11 @@
 				 * collecting IC_PKTMAXMATCH packets, just pass it through.
 				 */
 				} else if (flow->if_pktcount >= IC_PKTMAXMATCH &&
-				    flow->if_fwrule == 0)
+				    flow->if_fwrule == 0) {
+					flow->expire = tv.tv_sec;
 					goto enqueue;
+				}
+				flow->expire = tv.tv_sec;
 				goto classify;
 			}

@@ -630,6 +704,7 @@
 					free(pkt);
 					continue;
 				}
+				flow->expire = tv.tv_sec;
 				goto classify;
 			}

@@ -688,6 +763,7 @@
 				flow->if_datalen = datalen;
 				flow->if_pktcount = 1;
 				flow->if_fwrule = 0;
+				flow->expire = tv.tv_sec;
 				if (hashtable_insert(uh, (void *)key, (void *)flow) == 0) {
 					syslog(LOG_WARNING,
 					    "packet dropped: unable to insert into table");
@@ -715,19 +791,26 @@
 				flow->if_data = data;
 				flow->if_datalen += datalen;
 				flow->if_pktcount++;
+				flow->expire = tv.tv_sec;
 			/*
 			 * If we haven't been able to classify this flow after
 			 * collecting IC_PKTMAXMATCH packets, just pass it through.
 			 */
 			} else if (flow->if_pktcount >= IC_PKTMAXMATCH &&
-			    flow->if_fwrule == 0)
+			    flow->if_fwrule == 0) {
+				flow->expire = tv.tv_sec;
 				goto enqueue;
+			}
 		} else
 			/* Not an TCP or UDP packet. */
 			goto enqueue;

 classify:
-		assert(flow != NULL);
+		if (flow == NULL) {
+			syslog(LOG_ERR, "flow is null argghhhhhhh");
+			goto enqueue;
+		}
+		//assert(flow != NULL);

 		/*
 		 * Inform divert(4) what rule to send it to by
@@ -823,6 +906,80 @@
 	return (NULL);
 }

+void *
+garbage_pthread(void *arg __unused)
+{
+	char errbuf[LINE_MAX];
+	struct entry *e, *f;
+	unsigned int i, flows_expired, error;
+	struct timeval tv;
+
+	while (1) {
+		flows_expired = 0;
+		while (gettimeofday(&tv, NULL) != 0)
+			;
+		tv.tv_sec -= time_expire;
+
+		pthread_mutex_lock(&inQ.fq_mtx);
+                error = pthread_cond_wait(&gq_condvar, &inQ.fq_mtx);
+                if (error != 0) {
+                        strerror_r(error, errbuf, sizeof(errbuf));
+                        syslog(EX_OSERR, "unable to wait on garbage
collection: %s",
+                                errbuf);
+                        exit(EX_OSERR);
+                }
+
+		for (i = 0; i < sh->tablelength; i++) {
+			e = sh->table[i];
+			while (e != NULL) {
+				f = e; e = e->next;
+				if (((struct ip_flow *)f->v)->expire < tv.tv_sec) {
+					freekey(f->k);
+					sh->entrycount--;
+                                	if (f->v != NULL)
+                                        	free(f->v);
+                                	free(f);
+					flows_expired++;
+				}
+			}
+		}
+                for (i = 0; i < th->tablelength; i++) {
+                        e = th->table[i];
+                        while (e != NULL) {
+                                f = e; e = e->next;
+                                if (((struct ip_flow *)f->v)->expire
< tv.tv_sec) {
+                                        freekey(f->k);
+                                        th->entrycount--;
+                                        if (f->v != NULL)
+                                                free(f->v);
+                                        free(f);
+					flows_expired++;
+                                }
+                        }
+                }
+                for (i = 0; i < uh->tablelength; i++) {
+                        e = uh->table[i];
+                        while (e != NULL) {
+                                f = e; e = e->next;
+                                if (((struct ip_flow *)f->v)->expire
< tv.tv_sec) {
+                                        freekey(f->k);
+                                        uh->entrycount--;
+                                        if (f->v != NULL)
+                                                free(f->v);
+                                        free(f);
+					flows_expired++;
+                                }
+                        }
+                }
+
+		pthread_mutex_unlock(&inQ.fq_mtx);
+		
+		syslog(LOG_WARNING, "expired %u flows", flows_expired);
+	}
+
+	return (NULL);
+}
+
 /*
  * NOTE: The protocol list (plist) passed as an argument is a global
  *	 variable. It is accessed from 3 functions: classify_pthread,
@@ -840,12 +997,20 @@
 static int
 read_config(const char *file, struct ic_protocols *plist)
 {
+	enum { bufsize = 2048 };
 	struct protocol *proto;
 	properties	props;
-	const char	*errmsg, *name, *value;
-	int		fd;
+	const char	*errmsg, *name;
+	char		*value;
+	int		fd, fdpf;
 	uint16_t	rule;
+	char **ap, *argv[bufsize];

+	fdpf = open("/dev/pf", O_RDONLY);
+	if (fdpf == -1) {
+		syslog(LOG_ERR, "unable to open /dev/pf");
+		return (EX_OSERR);
+	}
 	fd = open(file, O_RDONLY);
 	if (fd == -1) {
 		syslog(LOG_ERR, "unable to open configuration file");
@@ -863,10 +1028,48 @@
 		/* Do not match traffic against this pattern */
 		if (value == NULL)
 			continue;
-		rule = strtonum(value, 1, 65535, &errmsg);
-		if (rule == 0) {
+		for (ap = argv; (*ap = strsep(&value, " \t")) != NULL;)
+ 	       		if (**ap != '\0')
+        	     		if (++ap >= &argv[bufsize])
+                			break;
+		if (!strncmp(argv[0], "queue", strlen("queue"))) {
+			if (ioctl(fdpf, DIOCGETNAMEDALTQ, &rule)) {
+				syslog(LOG_WARNING,
+					"could not get ALTQ translation for"
+					" queue %s", argv[1]);
+				continue;
+			}
+			if (rule == 0) {
+				syslog(LOG_WARNING,
+					"queue %s does not exists!", argv[1]);
+				continue;
+			}
+		} else if (!strncmp(argv[0], "dnqueue", strlen("dnqueue")))
+			rule = strtonum(argv[1], 1, 65535, &errmsg);
+		else if (!strncmp(argv[0], "dnpipe", strlen("dnpipe")))
+			rule = strtonum(argv[1], 1, 65535, &errmsg);
+		else if (!strncmp(argv[0], "tag", strlen("tag"))) {
+                        if (ioctl(fdpf, DIOCGETNAMEDTAG, &rule)) {
+                                syslog(LOG_WARNING,
+                                        "could not get tag translation for"
+                                        " queue %s", argv[1]);
+                                continue;
+                        }
+                        if (rule == 0) {
+                                syslog(LOG_WARNING,
+                                        "tag %s does not exists!", argv[1]);
+                                continue;
+                        }
+		} else if (!strncmp(argv[0], "action", strlen("action"))) {
+			if (strncmp(argv[1], "block", strlen("block")))
+				rule = PF_DROP;
+			else if (strncmp(argv[1], "allow", strlen("allow")))
+				rule = PF_PASS;
+			else
+				continue;
+		} else {
 			syslog(LOG_WARNING,
-			    "invalid rule number for %s protocol: %s",
+			    "invalid action specified for %s protocol: %s",
 			    proto->p_name, errmsg);
 			continue;
 		}
@@ -953,10 +1156,14 @@
 static void
 usage(const char *arg0)
 {
-	printf("usage: %s [-h] [-c file] [-p port] [-P dir] [-q length]\n",
basename(arg0));
+	printf("usage: %s [-h] [-c file] [-e seconds] [-n packets] "
+		"[-p port] [-P dir] [-q length]\n", basename(arg0));
 	printf("usage: %s -t -P dir\n", basename(arg0));
 	printf(	"    -c file   : path to configuration file\n"
+		"    -e secs   : number of seconds before a flow is expired\n"
 		"    -h        : this help screen\n"
+		"    -n packets: number of packets before the garbage collector"
+			" tries to expire flows\n"
 		"    -P dir    : directory containing protocol patterns\n"
 		"    -p port   : port number of divert socket\n"
 		"    -q length : max length (in packets) of in/out queues\n"

-- 
Ermal


More information about the freebsd-net mailing list