Application layer classifier for ipfw

Daniel Dias Gonçalves ddg at yan.com.br
Wed Aug 20 02:45:25 UTC 2008


Ermal Luçi escreveu:
> On Sat, Aug 2, 2008 at 3:00 PM, Mike Makonnen <mtm at wubethiopia.com> wrote:
>   
>> Mike Makonnen wrote:
>>     
>>> Patrick Tracanelli wrote:
>>>       
>>>> To let you know of my current (real world) tests:
>>>>
>>>> - Wireless Internet Provider 1:
>>>>    - 4Mbit/s of Internet Traffic
>>>>    - Classifying default protocols + soulseek + ssh
>>>>    - Classifying 100Mbit/s of dump over ssh
>>>>
>>>> Results in:
>>>>    No latency added, very low CPU usage, no packets dropping.
>>>>
>>>> - Wireless ISP 2:
>>>>    - 21 Mbit/s of Internet Traffic
>>>>    - Classifying default protocols + soulseek + ssh
>>>>
>>>> Results in:
>>>>    No tcp or udp traffic at all; everything that gets diverted never
>>>> comes out of the divert socket, and ipfw-classifyd logs
>>>>
>>>> Aug  1 12:07:35 ourofino last message repeated 58 times
>>>> Aug  1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: bittorrent
>>>> (rule 50000)
>>>> Aug  1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: edonkey (rule
>>>> 50000)
>>>> Aug  1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: fasttrack (rule
>>>> 50000)
>>>> Aug  1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: gnutella (rule
>>>> 1000)
>>>> Aug  1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: soulseek (rule
>>>> 50000)
>>>> Aug  1 12:17:54 ourofino ipfw-classifyd: Loaded Protocol: ssh   (rule
>>>> 50000)
>>>> Aug  1 12:18:28 ourofino ipfw-classifyd: unable to write to divert
>>>> socket: Operation not permitted
>>>> Aug  1 12:18:50 ourofino last message repeated 90 times
>>>>         
>>> Hmmm... this part means that the call to sendto(2) to write the packet
>>> back into network stack failed.  This explains why you are not seein g any
>>> traffic comming back out of the divert socket, but I don't see why it would
>>> suddenly fail with a permission error. Could this be a kernel bug?
>>>       
>>>> Aug  1 12:18:51 ourofino ipfw-classifyd: packet dropped: input queue full
>>>> Aug  1 12:19:11 ourofino last message repeated 94 times
>>>>
>>>> Raised queue len a lot (up to 40960), when the application starts it uses
>>>> up to 25% CPU and a second after that, CPU usage gets lower the 0.1%.
>>>>         
>>> This looks like a deadlock. If it weren't able to process packets fast
>>> enough the cpu usage should be high even as it's spewing "packet dropped"
>>> messages. Can you send me some more information like memory usage and the
>>> firewall script you are using? How much of the 21Mbits/s of traffic is P2P?
>>> If you reduce the number of protocols you are trying to match against does
>>> the behavior change? Using netstat -w1 -I<interface> can you tell me how
>>> many packets per second we're talking about for 4Mbits/s and 21Mbit/s? Also,
>>> the timestamps from the log file seem to show that the daemon is running for
>>> approx. 34 sec. before the first "unable to write to write to divert socket"
>>> message. Is it passing traffic during this time? Thanks.
>>>
>>> I've uploaded a newer version. Can you try that also please. It includes:
>>>  o SIGHUP forces it to re-read its configuration file
>>>  o rc.d script
>>>  o minor optimization (calls pthread_cond_signal with the mutex unlocked)
>>>  o code cleanup
>>>
>>> Also, for your convenience I have attached a patch against the earlier
>>> version that removes a debugging printf that should remove spammage to your
>>> log files (the current version has it removed already).
>>>
>>>       
>> Ooops, a few minutes after I sent this email I found a couple of bugs (one
>> major, and one minor). They were in the original tarball as well as the
>> newer one I uploaded earlier today. I've uploaded a fixed version of the
>> code. Can you try that instead please.
>>
>> Also, to help track down performance issues I've modified the Makefile to
>> build a profiled version of the application so you can use gprof(1) to
>> figure out where any problems lie.
>>
>>     
>
> Does this sound about right for implementing the GC and implementing syntax as
> $protocol = dnpipe 20
> $protocol2 = dnqueue 30
> it has some extra goos for pf(4) and altq(4)
> $protocol3 = queue $queue name
> $protocol4 = tag TAGNAME
> $protocol5 = action block
>
> It adds 2 new options -e seconds for seconds before a flow is
> considered expired and -n #packets proccessed before kicking the GC.
>
> --- classifyd_old.c	2008-08-09 00:33:04.000000000 +0000
> +++ classifyd.c	2008-08-09 00:33:34.000000000 +0000
> @@ -28,13 +28,17 @@
>
>  #include <sys/types.h>
>  #include <sys/socket.h>
> +#include <sys/ioctl.h>
> +#include <sys/time.h>
>
> +#include <net/if.h>
>  #include <arpa/inet.h>
>  #include <netinet/in.h>
>  #include <netinet/in_systm.h>
>  #include <netinet/ip.h>
>  #include <netinet/tcp.h>
>  #include <netinet/udp.h>
> +#include <net/pfvar.h>
>
>  #include <assert.h>
>  #include <err.h>
> @@ -53,6 +57,7 @@
>  #include <unistd.h>
>
>  #include "hashtable.h"
> +#include "hashtable_private.h"
>  #include "pathnames.h"
>  #include "protocols.h"
>
> @@ -94,6 +99,7 @@
>  	uint32_t if_datalen;	/* length in bytes of if_data */
>  	uint16_t if_pktcount;	/* number of packets concatenated */
>  	uint16_t if_fwrule;	/* ipfw(4) rule associated with flow */
> +	time_t	 expire;	/* flow expire time */
>  };
>
>  /*
> @@ -126,7 +132,7 @@
>  static struct ic_queue outQ;
>
>  /* divert(4) socket */
> -static int dvtS;
> +static int dvtS = 0;
>
>  /* config file path */
>  static const char *conf = IC_CONFIG_PATH;
> @@ -137,12 +143,25 @@
>  /* List of protocols available to the system */
>  struct ic_protocols *fp;
>
> +/* Our hashtables */
> +struct hashtable *sh = NULL,
> +		*th = NULL,
> +		*uh = NULL;
> +
> +/* signaled to kick garbage collector */
> +static pthread_cond_t  gq_condvar;
> +
> +/* number of packets before kicking garbage collector */
> +static unsigned int npackets = 250;
> +
> +static time_t time_expire = 40; /* 40 seconds */
>  /*
>   * Forward function declarations.
>   */
>  void		*classify_pthread(void *);
>  void		*read_pthread(void *);
>  void		*write_pthread(void *);
> +void		*garbage_pthread(void *);
>  static int	equalkeys(void *, void *);
>  static unsigned int hashfromkey(void *);
>  static void	test_re(void);
> @@ -155,7 +174,7 @@
>  {
>  	struct sockaddr_in addr;
>  	struct sigaction sa;
> -	pthread_t  classifytd, readtd, writetd;
> +	pthread_t  classifytd, readtd, writetd, garbagectd;
>  	const char *errstr;
>  	long long  num;
>  	uint16_t   port, qmaxsz;
> @@ -164,13 +183,27 @@
>  	tflag = 0;
>  	port = IC_DPORT;
>  	qmaxsz = IC_QMAXSZ;
> -	while ((ch = getopt(argc, argv, "htc:P:p:q:")) != -1) {
> +	while ((ch = getopt(argc, argv, "n:e:htc:P:p:q:")) != -1) {
>  		switch(ch) {
>  		case 'c':
>  			conf = strdup(optarg);
>  			if (conf == NULL)
>  				err(EX_TEMPFAIL, "config file path");
>  			break;
> +		case 'e':
> +			num = strtonum((const char *)optarg, 1, 400, &errstr);
> +			if (num == 0 && errstr != NULL) {
> +				errx(EX_USAGE, "invalud expire seconds: %s", errstr);	
> +			}
> +			time_expire = (time_t)num;
> +			break;
> +		case 'n':
> +                        num = strtonum((const char *)optarg, 1,
> 65535, &errstr);
> +                        if (num == 0 && errstr != NULL) {
> +                                errx(EX_USAGE, "invalud expire
> seconds: %s", errstr);
> +                        }
> +                        npackets = (unsigned int)num;
> +			break;
>  		case 'P':
>  			protoDir = strdup(optarg);
>  			if (protoDir == NULL)
> @@ -230,6 +263,9 @@
>  	error = pthread_cond_init(&outQ.fq_condvar, NULL);
>  	if (error != 0)
>  		err(EX_OSERR, "unable to initialize output queue condvar");
> +        error = pthread_cond_init(&gq_condvar, NULL);
> +        if (error != 0)
> +                err(EX_OSERR, "unable to initialize garbage collector
> condvar");
>
>  	/*
>  	 * Create and bind the divert(4) socket.
> @@ -276,32 +312,80 @@
>  	if (error == -1)
>  		err(EX_OSERR, "unable to set signal handler");
>
> +        /*
> +         * There are 3 tables: udp, tcp, and tcp syn.
> +         * The tcp syn table tracks connections for which a
> +         * SYN packet has been sent but no reply has been returned
> +         * yet. Once the SYN ACK reply is detected it is moved to
> +         * the regular tcp connection tracking table.
> +         */
> +        sh = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys);
> +        if (sh == NULL) {
> +                syslog(LOG_ERR, "unable to create TCP (SYN) tracking table");
> +		error = EX_SOFTWARE;
> +		goto cleanup;
> +        }
> +        th = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys);
> +        if (th == NULL) {
> +                syslog(LOG_ERR, "unable to create TCP tracking table");
> +		error = EX_SOFTWARE;
> +                goto cleanup;
> +        }
> +        uh = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys);
> +        if (uh == NULL) {
> +                syslog(LOG_ERR, "unable to create UDP tracking table");
> +		error = EX_SOFTWARE;
> +                goto cleanup;
> +        }
> +
>  	/*
>  	 * Create the various threads.
>  	 */
>  	error = pthread_create(&readtd, NULL, read_pthread, NULL);
> -	if (error != 0)
> -		err(EX_OSERR, "unable to create reader thread");
> +	if (error != 0) {
> +		syslog(LOG_ERR, "unable to create reader thread");
> +		error = EX_OSERR;
> +		goto cleanup;
> +	}
>  	error = pthread_create(&classifytd, NULL, classify_pthread, NULL);
> -	if (error != 0)
> -		err(EX_OSERR, "unable to create classifier thread");
> +	if (error != 0) {
> +		syslog(LOG_ERR, "unable to create classifier thread");
> +		error = EX_OSERR;
> +		goto cleanup;
> +	}
>  	error = pthread_create(&writetd, NULL, write_pthread, NULL);
> -	if (error != 0)
> -		err(EX_OSERR, "unable to create writer thread");
> -
> +	if (error != 0) {
> +		syslog(LOG_ERR, "unable to create writer thread");
> +		error = EX_OSERR;
> +		goto cleanup;
> +	}
> +        error = pthread_create(&garbagectd, NULL, garbage_pthread, NULL);
> +        if (error != 0) {
> +                syslog(LOG_ERR, "unable to create garbage collect thread");
> +		error = EX_OSERR;
> +		goto cleanup;
> +	}
>  	/*
>  	 * Wait for our threads to exit.
>  	 */
>  	pthread_join(readtd, NULL);
>  	pthread_join(classifytd, NULL);
>  	pthread_join(writetd, NULL);
> -
> +	pthread_join(garbagectd, NULL);
>  	/*
>  	 * Cleanup
>  	 */
> -	close(dvtS);
> +cleanup:
> +	if (dvtS > 0)
> +		close(dvtS);
> +	if (sh != NULL)
> +		hashtable_destroy(sh, 1);
> +	if (th != NULL)
> +		hashtable_destroy(th, 1);
> +	if (uh != NULL)
> +		hashtable_destroy(uh, 1);
>  	
> -	return (0);
> +	return (error);
>  }
>
>  void *
> @@ -310,6 +394,7 @@
>  	struct ic_pkt	   *pkt;
>  	struct ip *ipp;
>  	int	  len;
> +	unsigned int pcktcnt = 0;
>
>  	while (1) {
>  		pkt = (struct ic_pkt *)malloc(sizeof(struct ic_pkt));
> @@ -353,6 +438,10 @@
>  		STAILQ_INSERT_HEAD(&inQ.fq_pkthead, pkt, fp_link);
>  		inQ.fq_size++;
>  		pthread_mutex_unlock(&inQ.fq_mtx);
> +		if (++pcktcnt > npackets) {
> +			pcktcnt = 0;
> +			pthread_cond_signal(&gq_condvar);
> +		}
>  		pthread_cond_signal(&inQ.fq_condvar);
>  	}
>
> @@ -420,39 +509,19 @@
>  	struct tcphdr	 *tcp;
>  	struct udphdr	 *udp;
>  	struct ic_pkt	 *pkt;
> -	struct hashtable *sh, *th, *uh;
>  	struct protocol	 *proto;
> +	struct timeval	 tv;
>  	regmatch_t	 pmatch;
>  	u_char		 *data, *payload;
>  	uint16_t	 trycount;
>  	int		 datalen, error;
>
> -	/*
> -	 * There are 3 tables: udp, tcp, and tcp syn.
> -	 * The tcp syn table tracks connections for which a
> -	 * SYN packet has been sent but no reply has been returned
> -	 * yet. Once the SYN ACK reply is detected it is moved to
> -	 * the regular tcp connection tracking table.
> -	 */
> -	sh = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys);
> -	if (sh == NULL) {
> -		syslog(LOG_ERR, "unable to create TCP (SYN) tracking table");
> -		exit(EX_SOFTWARE);
> -	}
> -	th = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys);
> -	if (th == NULL) {
> -		syslog(LOG_ERR, "unable to create TCP tracking table");
> -		exit(EX_SOFTWARE);
> -	}
> -	uh = create_hashtable(IC_HASHSZ, hashfromkey, equalkeys);
> -	if (uh == NULL) {
> -		syslog(LOG_ERR, "unable to create UDP tracking table");
> -		exit(EX_SOFTWARE);
> -	}
> -
>  	flow = NULL;
>  	key = NULL;
>  	while(1) {
> +		while(gettimeofday(&tv, NULL) != 0)
> +			;
> +
>  		pthread_mutex_lock(&inQ.fq_mtx);
>  		pkt = STAILQ_LAST(&inQ.fq_pkthead, ic_pkt, fp_link);
>  		while (pkt == NULL) {
> @@ -528,6 +597,8 @@
>  					free(pkt);
>  					continue;
>  				}
> +				
> +				flow->expire = tv.tv_sec;
>  				goto enqueue;
>  			/*
>  			 * Handle session tear-down.
> @@ -583,8 +654,11 @@
>  				 * collecting IC_PKTMAXMATCH packets, just pass it through.
>  				 */
>  				} else if (flow->if_pktcount >= IC_PKTMAXMATCH &&
> -				    flow->if_fwrule == 0)
> +				    flow->if_fwrule == 0) {
> +					flow->expire = tv.tv_sec;
>  					goto enqueue;
> +				}
> +				flow->expire = tv.tv_sec;
>  				goto classify;
>  			}
>
> @@ -630,6 +704,7 @@
>  					free(pkt);
>  					continue;
>  				}
> +				flow->expire = tv.tv_sec;
>  				goto classify;
>  			}
>
> @@ -688,6 +763,7 @@
>  				flow->if_datalen = datalen;
>  				flow->if_pktcount = 1;
>  				flow->if_fwrule = 0;
> +				flow->expire = tv.tv_sec;
>  				if (hashtable_insert(uh, (void *)key, (void *)flow) == 0) {
>  					syslog(LOG_WARNING,
>  					    "packet dropped: unable to insert into table");
> @@ -715,19 +791,26 @@
>  				flow->if_data = data;
>  				flow->if_datalen += datalen;
>  				flow->if_pktcount++;
> +				flow->expire = tv.tv_sec;
>  			/*
>  			 * If we haven't been able to classify this flow after
>  			 * collecting IC_PKTMAXMATCH packets, just pass it through.
>  			 */
>  			} else if (flow->if_pktcount >= IC_PKTMAXMATCH &&
> -			    flow->if_fwrule == 0)
> +			    flow->if_fwrule == 0) {
> +				flow->expire = tv.tv_sec;
>  				goto enqueue;
> +			}
>  		} else
>  			/* Not an TCP or UDP packet. */
>  			goto enqueue;
>
>  classify:
> -		assert(flow != NULL);
> +		if (flow == NULL) {
> +			syslog(LOG_ERR, "flow is null argghhhhhhh");
> +			goto enqueue;
> +		}
> +		//assert(flow != NULL);
>
>  		/*
>  		 * Inform divert(4) what rule to send it to by
> @@ -823,6 +906,80 @@
>  	return (NULL);
>  }
>
> +void *
> +garbage_pthread(void *arg __unused)
> +{
> +	char errbuf[LINE_MAX];
> +	struct entry *e, *f;
> +	unsigned int i, flows_expired, error;
> +	struct timeval tv;
> +
> +	while (1) {
> +		flows_expired = 0;
> +		while (gettimeofday(&tv, NULL) != 0)
> +			;
> +		tv.tv_sec -= time_expire;
> +
> +		pthread_mutex_lock(&inQ.fq_mtx);
> +                error = pthread_cond_wait(&gq_condvar, &inQ.fq_mtx);
> +                if (error != 0) {
> +                        strerror_r(error, errbuf, sizeof(errbuf));
> +                        syslog(EX_OSERR, "unable to wait on garbage
> collection: %s",
> +                                errbuf);
> +                        exit(EX_OSERR);
> +                }
> +
> +		for (i = 0; i < sh->tablelength; i++) {
> +			e = sh->table[i];
> +			while (e != NULL) {
> +				f = e; e = e->next;
> +				if (((struct ip_flow *)f->v)->expire < tv.tv_sec) {
> +					freekey(f->k);
> +					sh->entrycount--;
> +                                	if (f->v != NULL)
> +                                        	free(f->v);
> +                                	free(f);
> +					flows_expired++;
> +				}
> +			}
> +		}
> +                for (i = 0; i < th->tablelength; i++) {
> +                        e = th->table[i];
> +                        while (e != NULL) {
> +                                f = e; e = e->next;
> +                                if (((struct ip_flow *)f->v)->expire
> < tv.tv_sec) {
> +                                        freekey(f->k);
> +                                        th->entrycount--;
> +                                        if (f->v != NULL)
> +                                                free(f->v);
> +                                        free(f);
> +					flows_expired++;
> +                                }
> +                        }
> +                }
> +                for (i = 0; i < uh->tablelength; i++) {
> +                        e = uh->table[i];
> +                        while (e != NULL) {
> +                                f = e; e = e->next;
> +                                if (((struct ip_flow *)f->v)->expire
> < tv.tv_sec) {
> +                                        freekey(f->k);
> +                                        uh->entrycount--;
> +                                        if (f->v != NULL)
> +                                                free(f->v);
> +                                        free(f);
> +					flows_expired++;
> +                                }
> +                        }
> +                }
> +
> +		pthread_mutex_unlock(&inQ.fq_mtx);
> +		
> +		syslog(LOG_WARNING, "expired %u flows", flows_expired);
> +	}
> +
> +	return (NULL);
> +}
> +
>  /*
>   * NOTE: The protocol list (plist) passed as an argument is a global
>   *	 variable. It is accessed from 3 functions: classify_pthread,
> @@ -840,12 +997,20 @@
>  static int
>  read_config(const char *file, struct ic_protocols *plist)
>  {
> +	enum { bufsize = 2048 };
>  	struct protocol *proto;
>  	properties	props;
> -	const char	*errmsg, *name, *value;
> -	int		fd;
> +	const char	*errmsg, *name;
> +	char		*value;
> +	int		fd, fdpf;
>  	uint16_t	rule;
> +	char **ap, *argv[bufsize];
>
> +	fdpf = open("/dev/pf", O_RDONLY);
> +	if (fdpf == -1) {
> +		syslog(LOG_ERR, "unable to open /dev/pf");
> +		return (EX_OSERR);
> +	}
>  	fd = open(file, O_RDONLY);
>  	if (fd == -1) {
>  		syslog(LOG_ERR, "unable to open configuration file");
> @@ -863,10 +1028,48 @@
>  		/* Do not match traffic against this pattern */
>  		if (value == NULL)
>  			continue;
> -		rule = strtonum(value, 1, 65535, &errmsg);
> -		if (rule == 0) {
> +		for (ap = argv; (*ap = strsep(&value, " \t")) != NULL;)
> + 	       		if (**ap != '\0')
> +        	     		if (++ap >= &argv[bufsize])
> +                			break;
> +		if (!strncmp(argv[0], "queue", strlen("queue"))) {
> +			if (ioctl(fdpf, DIOCGETNAMEDALTQ, &rule)) {
> +				syslog(LOG_WARNING,
> +					"could not get ALTQ translation for"
> +					" queue %s", argv[1]);
> +				continue;
> +			}
> +			if (rule == 0) {
> +				syslog(LOG_WARNING,
> +					"queue %s does not exists!", argv[1]);
> +				continue;
> +			}
> +		} else if (!strncmp(argv[0], "dnqueue", strlen("dnqueue")))
> +			rule = strtonum(argv[1], 1, 65535, &errmsg);
> +		else if (!strncmp(argv[0], "dnpipe", strlen("dnpipe")))
> +			rule = strtonum(argv[1], 1, 65535, &errmsg);
> +		else if (!strncmp(argv[0], "tag", strlen("tag"))) {
> +                        if (ioctl(fdpf, DIOCGETNAMEDTAG, &rule)) {
> +                                syslog(LOG_WARNING,
> +                                        "could not get tag translation for"
> +                                        " queue %s", argv[1]);
> +                                continue;
> +                        }
> +                        if (rule == 0) {
> +                                syslog(LOG_WARNING,
> +                                        "tag %s does not exists!", argv[1]);
> +                                continue;
> +                        }
> +		} else if (!strncmp(argv[0], "action", strlen("action"))) {
> +			if (strncmp(argv[1], "block", strlen("block")))
> +				rule = PF_DROP;
> +			else if (strncmp(argv[1], "allow", strlen("allow")))
> +				rule = PF_PASS;
> +			else
> +				continue;
> +		} else {
>  			syslog(LOG_WARNING,
> -			    "invalid rule number for %s protocol: %s",
> +			    "invalid action specified for %s protocol: %s",
>  			    proto->p_name, errmsg);
>  			continue;
>  		}
> @@ -953,10 +1156,14 @@
>  static void
>  usage(const char *arg0)
>  {
> -	printf("usage: %s [-h] [-c file] [-p port] [-P dir] [-q length]\n",
> basename(arg0));
> +	printf("usage: %s [-h] [-c file] [-e seconds] [-n packets] "
> +		"[-p port] [-P dir] [-q length]\n", basename(arg0));
>  	printf("usage: %s -t -P dir\n", basename(arg0));
>  	printf(	"    -c file   : path to configuration file\n"
> +		"    -e secs   : number of seconds before a flow is expired\n"
>  		"    -h        : this help screen\n"
> +		"    -n packets: number of packets before the garbage collector"
> +			" tries to expire flows\n"
>  		"    -P dir    : directory containing protocol patterns\n"
>  		"    -p port   : port number of divert socket\n"
>  		"    -q length : max length (in packets) of in/out queues\n"
>
>   

Some progress in solution of presented problems ?

--
Daniel





More information about the freebsd-net mailing list