PERFORCE change 141820 for review
Aaron Meihm
alm at FreeBSD.org
Sun May 18 20:13:09 UTC 2008
http://perforce.freebsd.org/chv.cgi?CH=141820
Change 141820 by alm at alm_praetorian on 2008/05/18 20:12:18
Remove some old cruft; support for all transport methods
(pipe, net, trail) is now implemented.
Affected files ...
.. //depot/projects/trustedbsd/netauditd/conf.c#9 edit
.. //depot/projects/trustedbsd/netauditd/conf.h#2 edit
.. //depot/projects/trustedbsd/netauditd/grammar.y#4 edit
.. //depot/projects/trustedbsd/netauditd/netauditd.c#18 edit
.. //depot/projects/trustedbsd/netauditd/netauditd.conf#6 edit
.. //depot/projects/trustedbsd/netauditd/netauditd.h#15 edit
.. //depot/projects/trustedbsd/netauditd/reader.c#5 edit
.. //depot/projects/trustedbsd/netauditd/reader.h#3 edit
.. //depot/projects/trustedbsd/netauditd/token.l#2 edit
.. //depot/projects/trustedbsd/netauditd/writer.c#6 edit
.. //depot/projects/trustedbsd/netauditd/writer.h#3 edit
Differences ...
==== //depot/projects/trustedbsd/netauditd/conf.c#9 (text+ko) ====
@@ -23,6 +23,7 @@
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
+
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/queue.h>
@@ -59,11 +60,11 @@
}
struct au_cmpnt *
-conf_get_src(char *name)
+conf_get_reader(char *name)
{
struct au_cmpnt *ret;
- TAILQ_FOREACH(ret, &ac_list_src, ac_glue) {
+ TAILQ_FOREACH(ret, &ac_list_readers, ac_glue) {
if (strcmp(ret->ac_name, name) == 0)
return (ret);
}
@@ -71,11 +72,11 @@
}
struct au_cmpnt *
-conf_get_dst(char *name)
+conf_get_writer(char *name)
{
struct au_cmpnt *ret;
- TAILQ_FOREACH(ret, &ac_list_dst, ac_glue) {
+ TAILQ_FOREACH(ret, &ac_list_writers, ac_glue) {
if (strcmp(ret->ac_name, name) == 0)
return (ret);
}
@@ -87,13 +88,15 @@
{
if ((src == NULL) || (dst == NULL))
conf_error("A component specified does not exist");
- src->ac_ndsts++;
- if (src->ac_dsts == NULL)
- src->ac_dsts = malloc(sizeof(struct au_cmpnt *));
+ src->ac_nwriters++;
+ if (src->ac_writers == NULL)
+ src->ac_writers = malloc(sizeof(struct au_cmpnt *));
else
- src->ac_dsts = realloc(src->ac_dsts,
- sizeof(struct au_cmpnt *) * src->ac_ndsts);
- src->ac_dsts[src->ac_ndsts - 1] = dst;
+ src->ac_writers = realloc(src->ac_writers,
+ sizeof(struct au_cmpnt *) * src->ac_nwriters);
+ if (src->ac_writers == NULL)
+ exit(2);
+ src->ac_writers[src->ac_nwriters - 1] = dst;
}
void
==== //depot/projects/trustedbsd/netauditd/conf.h#2 (text+ko) ====
@@ -27,8 +27,8 @@
extern int lineno;
void conf_error(char *, ...);
-struct au_cmpnt *conf_get_src(char *);
-struct au_cmpnt *conf_get_dst(char *);
+struct au_cmpnt *conf_get_reader(char *);
+struct au_cmpnt *conf_get_writer(char *);
void conf_link(struct au_cmpnt *, struct au_cmpnt *);
void conf_load(char *);
void yyerror(const char *);
==== //depot/projects/trustedbsd/netauditd/grammar.y#4 (text+ko) ====
@@ -24,6 +24,7 @@
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
+
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/queue.h>
@@ -32,6 +33,7 @@
#include <string.h>
#include <unistd.h>
#include <netdb.h>
+#include <pthread.h>
#include <assert.h>
#include "conf.h"
@@ -39,10 +41,9 @@
#include "reader.h"
#include "writer.h"
-#define AU_CMPNT_INIT(x) x = malloc(sizeof(struct au_cmpnt)); \
- assert (x != NULL); \
- bzero(x, sizeof(struct au_cmpnt)); \
- TAILQ_INIT(&x->ac_sbuffers);
+#define AU_CMPNT_INIT(x) x = calloc(1, sizeof(struct au_cmpnt)); \
+ if (x == NULL) \
+ exit(2);
static int ainfo_passive;
@@ -55,10 +56,10 @@
struct au_cmpnt *ac;
}
-%token SRC DST MAP NEWLINE STRING PIPE_TOKEN NET_TOKEN TRAIL_TOKEN
+%token READ WRITE MAP NEWLINE STRING PIPE_TOKEN NET_TOKEN TRAIL_TOKEN
%type <str> STRING
-%type <ac> src_component_spec dst_component_spec src_component_name
-%type <ac> dst_component_name
+%type <ac> read_component_spec write_component_spec read_component_name
+%type <ac> write_component_name
%type <ai> ainfo;
%%
@@ -69,17 +70,17 @@
;
cmd :
- SRC src_component_spec
+ READ read_component_spec
{
- TAILQ_INSERT_TAIL(&ac_list_src, $2, ac_glue);
+ TAILQ_INSERT_TAIL(&ac_list_readers, $2, ac_glue);
}
|
- DST dst_component_spec
+ WRITE write_component_spec
{
- TAILQ_INSERT_TAIL(&ac_list_dst, $2, ac_glue);
+ TAILQ_INSERT_TAIL(&ac_list_writers, $2, ac_glue);
}
|
- MAP src_component_name dst_component_name
+ MAP read_component_name write_component_name
{
conf_link($2, $3);
}
@@ -90,54 +91,65 @@
nl NEWLINE
;
-src_component_name :
+read_component_name :
STRING
{
struct au_cmpnt *ret;
- ret = conf_get_src($1);
+ ret = conf_get_reader($1);
free($1);
$$ = ret;
}
;
-dst_component_name :
+write_component_name :
STRING
{
struct au_cmpnt *ret;
- ret = conf_get_dst($1);
+ ret = conf_get_writer($1);
free($1);
$$ = ret;
}
;
-dst_component_spec :
+write_component_spec :
NET_TOKEN { ainfo_passive = 0; } STRING ainfo
{
struct au_cmpnt *new;
+ int ret;
AU_CMPNT_INIT(new);
new->ac_type = COMPONENT_NET;
new->ac_name = $3;
new->ac_ainfo = $4;
new->ac_init_func = writer_init_net;
- writer_q_init(new);
+ new->ac_write_func = writer_write_fd;
+ ret = pthread_mutex_init(&new->ac_writer_lock, NULL);
+ assert(ret == 0);
+ ret = pthread_cond_init(&new->ac_writer_cond, NULL);
+ assert(ret == 0);
+ TAILQ_INIT(&new->ac_writer_q);
$$ = new;
}
|
TRAIL_TOKEN STRING STRING
{
struct au_cmpnt *new;
+ int ret;
AU_CMPNT_INIT(new);
new->ac_type = COMPONENT_TRAIL;
new->ac_name = $2;
new->ac_path = $3;
new->ac_init_func = writer_init_trail;
- new->ac_write_func = writer_write_trail;
- writer_q_init(new);
+ new->ac_write_func = writer_write_fd;
+ ret = pthread_mutex_init(&new->ac_writer_lock, NULL);
+ assert(ret == 0);
+ ret = pthread_cond_init(&new->ac_writer_cond, NULL);
+ assert(ret == 0);
+ TAILQ_INIT(&new->ac_writer_q);
$$ = new;
}
;
-src_component_spec :
+read_component_spec :
PIPE_TOKEN STRING STRING
{
struct au_cmpnt *new;
==== //depot/projects/trustedbsd/netauditd/netauditd.c#18 (text+ko) ====
@@ -23,6 +23,7 @@
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
+
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/queue.h>
@@ -32,10 +33,9 @@
#include <string.h>
#include <stdarg.h>
#include <unistd.h>
-#include <netdb.h>
-#include <fcntl.h>
#include <pthread.h>
#include <signal.h>
+#include <assert.h>
#include "conf.h"
#include "netauditd.h"
@@ -43,10 +43,9 @@
#include "writer.h"
static int debug_flag;
-pthread_mutex_t debug_mutex;
-ac_head_t ac_list_src;
-ac_head_t ac_list_dst;
-
+static pthread_mutex_t debug_mutex;
+ac_head_t ac_list_readers;
+ac_head_t ac_list_writers;
extern char *conf_path;
void
@@ -54,27 +53,28 @@
{
char buf[1024];
va_list ap;
+ int r;
if (!debug_flag)
return;
- if (pthread_mutex_lock(&debug_mutex) != 0)
- exit(2);
+ r = pthread_mutex_lock(&debug_mutex);
+ assert(r == 0);
va_start(ap, fmt);
(void) vsnprintf(buf, sizeof(buf), fmt, ap);
va_end(ap);
(void) printf("debug: %s\n", buf);
- if (pthread_mutex_unlock(&debug_mutex) != 0)
- exit(2);
+ r = pthread_mutex_unlock(&debug_mutex);
+ assert(r == 0);
}
int
main(int argc, char *argv[])
{
- pthread_t writer_thread;
char ch;
+ int r;
- if (pthread_mutex_init(&debug_mutex, NULL) != 0)
- exit(2);
+ r = pthread_mutex_init(&debug_mutex, NULL);
+ assert(r == 0);
conf_path = DEFAULT_CONF_PATH;
while ((ch = getopt(argc, argv, "df:h")) != -1) {
switch (ch) {
@@ -91,29 +91,13 @@
}
}
(void) signal(SIGPIPE, SIG_IGN);
- TAILQ_INIT(&ac_list_src);
- TAILQ_INIT(&ac_list_dst);
+ TAILQ_INIT(&ac_list_readers);
+ TAILQ_INIT(&ac_list_writers);
conf_load(conf_path);
- if (pthread_create(&writer_thread, NULL, writer_start, NULL) != 0)
- exit(2);
reader_start();
return (0);
}
-int
-nonblock(int fd)
-{
- int flags;
-
- flags = fcntl(fd, F_GETFL);
- if (flags == -1)
- return (-1);
- flags |= O_NONBLOCK;
- if (fcntl(fd, F_SETFL, flags) == -1)
- return (-1);
- return (0);
-}
-
void
usage()
{
==== //depot/projects/trustedbsd/netauditd/netauditd.conf#6 (text+ko) ====
@@ -1,7 +1,7 @@
-src: p source_pipe /dev/auditpipe
-src: n source_net 0.0.0.0 6655
-dst: n dst_net 127.0.0.1 6655
-dst: t dst_trail /tmp/trail
+read: p source_pipe /dev/auditpipe
+read: n source_net 0.0.0.0 6655
+write: n dst_net 127.0.0.1 6655
+write: t dst_trail /tmp/trail
map: source_pipe dst_net
map: source_net dst_trail
==== //depot/projects/trustedbsd/netauditd/netauditd.h#15 (text+ko) ====
@@ -24,6 +24,7 @@
* SUCH DAMAGE.
*/
+#define AU_BUFFER_SIZE 8192
#define DEFAULT_CONF_PATH "/usr/local/etc/netauditd.conf"
enum {
@@ -32,68 +33,58 @@
COMPONENT_TRAIL
};
-#define FLAG_ONLINE 1 /* Component is online */
-#define FLAG_CONNECTING (1 << 1) /* Component connecting */
-
struct audit_record {
void *ar_buf;
int ar_count;
u_int32_t ar_record_len;
};
-struct au_queue_ent {
- TAILQ_ENTRY(au_queue_ent) aq_glue;
- struct audit_record aq_record;
- u_int32_t aq_remain;
+struct au_buffer {
+ u_char ab_buf[AU_BUFFER_SIZE];
+ size_t ab_size;
+ TAILQ_ENTRY(au_buffer) ab_glue;
};
-typedef TAILQ_HEAD(, au_queue_ent) au_q_t;
+typedef TAILQ_HEAD(, au_buffer) ab_q_t;
-struct au_qpair {
- au_q_t qp_a, qp_b;
- pthread_mutex_t qp_lock;
- au_q_t *qp_store;
- au_q_t *qp_hold;
- au_q_t *qp_free;
- u_int32_t qp_store_len;
- int qp_store_n;
-};
-
struct au_cmpnt {
struct addrinfo *ac_ainfo;
- struct au_cmpnt **ac_dsts;
- time_t ac_failed;
+ struct au_cmpnt **ac_writers;
+ int ac_nwriters;
+ int ac_connected;
int ac_fd;
- int ac_flags;
- TAILQ_ENTRY(au_cmpnt) ac_glue;
char *ac_name;
- int ac_ndsts;
char *ac_path;
- struct au_qpair ac_q;
int ac_type;
+ struct au_buffer ac_buffer;
- TAILQ_HEAD(, au_src_buffer) ac_sbuffers;
+ pthread_mutex_t ac_writer_lock;
+ pthread_cond_t ac_writer_cond;
+ int ac_writer_nbufs;
+ u_int32_t ac_writer_index;
+ ab_q_t ac_writer_q;
int (*ac_init_func)(struct au_cmpnt *);
int (*ac_read_func)(struct au_cmpnt *);
int (*ac_write_func)(struct au_cmpnt *);
+
+ TAILQ_ENTRY(au_cmpnt) ac_glue;
};
-struct au_src_buffer {
- struct sockaddr as_addr;
- socklen_t as_addrlen;
- int as_fd;
- TAILQ_ENTRY(au_src_buffer) as_glue;
- u_char as_header[5];
- struct au_cmpnt *as_parent;
- u_int32_t as_nread;
- struct audit_record *as_record;
+struct au_client {
+ struct sockaddr auc_addr;
+ socklen_t auc_addrlen;
+ int auc_fd;
+ u_char auc_header[5];
+ struct au_cmpnt *auc_parent;
+ u_int32_t auc_nread;
+ struct audit_record *auc_record;
+ struct au_buffer auc_buffer;
};
typedef TAILQ_HEAD(, au_cmpnt) ac_head_t;
-extern ac_head_t ac_list_src;
-extern ac_head_t ac_list_dst;
+extern ac_head_t ac_list_readers;
+extern ac_head_t ac_list_writers;
void dprintf(char *, ...);
-int nonblock(int);
void usage(void);
==== //depot/projects/trustedbsd/netauditd/reader.c#5 (text+ko) ====
@@ -23,6 +23,7 @@
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
+
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/queue.h>
@@ -44,112 +45,166 @@
#include "reader.h"
#include "writer.h"
-#define SRC_BUFFER_INIT(x) do { \
- x = calloc(1, sizeof(struct au_src_buffer)); \
- assert(x != NULL); \
-} while (0)
-
-#define WRITER_SIGNAL(x) do { \
- (void) pthread_mutex_lock(&ready_lock); \
- records_waiting += x; \
- x = 0; \
- (void) pthread_cond_signal(&ready_cond); \
- (void) pthread_mutex_unlock(&ready_lock); \
-} while (0)
-
-#define ROTATE() do { \
- assert(ac->ac_q.qp_free != NULL); \
- ac->ac_q.qp_hold = ac->ac_q.qp_store; \
- ac->ac_q.qp_store = ac->ac_q.qp_free; \
- ac->ac_q.qp_free = NULL; \
- ac->ac_q.qp_store_len = 0; \
-} while (0)
-
-pthread_mutex_t ready_lock;
-pthread_cond_t ready_cond;
-u_int32_t records_waiting;
-
-static fd_set rfds;
-
int
reader_accept_client(struct au_cmpnt *ac)
{
- struct au_src_buffer *new;
+ struct au_client *new;
struct sockaddr addr;
socklen_t addrlen;
- int fd;
+ int fd, ret;
+ pthread_t p;
fd = accept(ac->ac_fd, &addr, &addrlen);
if (fd == -1) {
- if ((errno != EINTR) && (errno != EWOULDBLOCK) &&
- (errno != ECONNABORTED))
+ if ((errno == EINTR) || (errno == ECONNABORTED))
+ return (0);
+ else
return (-1);
- else
- return (0);
}
- SRC_BUFFER_INIT(new);
- new->as_parent = ac;
- new->as_fd = fd;
- bcopy(&addr, &new->as_addr, sizeof(new->as_addr));
- new->as_addrlen = addrlen;
- TAILQ_INSERT_TAIL(&ac->ac_sbuffers, new, as_glue);
- reader_build_rfds(&rfds);
+ new = calloc(1, sizeof(struct au_client));
+ if (new == NULL)
+ exit(2);
+ new->auc_parent = ac;
+ new->auc_fd = fd;
+ bcopy(&addr, &new->auc_addr, sizeof(new->auc_addr));
+ new->auc_addrlen = addrlen;
+ ret = pthread_create(&p, NULL, reader_client_handler, new);
+ assert(ret == 0);
+ (void) pthread_detach(p);
+ dprintf("reader_accept_client(%s): new client %d", ac->ac_name,
+ new->auc_fd);
return (0);
}
void
-reader_build_rfds(fd_set *rfds)
+reader_push(struct au_cmpnt *ac, struct au_buffer *ab)
{
- struct au_cmpnt *ac;
- struct au_src_buffer *s;
+ struct au_buffer *clone;
+ struct au_cmpnt *dst;
+ int ret, i;
- FD_ZERO(rfds);
- TAILQ_FOREACH(ac, &ac_list_src, ac_glue) {
- FD_SET(ac->ac_fd, rfds);
- if (ac->ac_type == COMPONENT_NET) {
- TAILQ_FOREACH(s, &ac->ac_sbuffers, as_glue)
- FD_SET(s->as_fd, rfds);
- }
+ if (ab->ab_size <= 0)
+ return;
+ for (i = 0; i < ac->ac_nwriters; i++) {
+ dst = ac->ac_writers[i];
+ clone = malloc(sizeof(struct au_buffer));
+ bcopy(ab, clone, sizeof(struct au_buffer));
+ ret = pthread_mutex_lock(&dst->ac_writer_lock);
+ assert(ret == 0);
+ TAILQ_INSERT_TAIL(&dst->ac_writer_q, clone, ab_glue);
+ dst->ac_writer_nbufs++;
+ dprintf("reader_push(%s): %s (%d)", ac->ac_name,
+ dst->ac_name, dst->ac_writer_nbufs);
+ clone = NULL;
+ ret = pthread_cond_signal(&dst->ac_writer_cond);
+ assert(ret == 0);
+ ret = pthread_mutex_unlock(&dst->ac_writer_lock);
+ assert(ret == 0);
}
}
void
-reader_handler(fd_set *rfds)
+reader_buffer_record(struct au_cmpnt *ac, struct au_buffer *ab,
+ struct audit_record *ar)
{
- struct au_src_buffer *as, *tmp;
- struct au_cmpnt *ac;
- fd_set lrfds;
- struct timeval tv;
- int ret, ret2;
+ u_int32_t rlen = ar->ar_record_len;
+ u_char *ptr;
- lrfds = *rfds;
- bzero(&tv, sizeof(struct timeval));
- tv.tv_sec = 5;
- ret = select(FD_SETSIZE, &lrfds, NULL, NULL, &tv);
- if (ret == -1) {
- if (errno == EINTR)
- return;
- else
- exit(2);
+ if ((AU_BUFFER_SIZE - ab->ab_size) < rlen) {
+ reader_push(ac, ab);
+ ab->ab_size = 0;
}
- else if (ret == 0) {
- reader_timeout();
+ /*
+ * In the case where the record is too big for the audit
+ * buffer, the record is dropped.
+ */
+ if ((ab->ab_size == 0) && (rlen > AU_BUFFER_SIZE)) {
+ dprintf("reader_buffer_record(%s): dropping record, too"
+ " large (%u)", ac->ac_name, rlen);
+ free(ar->ar_buf);
return;
}
- TAILQ_FOREACH(ac, &ac_list_src, ac_glue) {
- if (FD_ISSET(ac->ac_fd, &lrfds)) {
- ret2 = ac->ac_read_func(ac);
- if (ret2 == -1)
+ dprintf("reader_buffer_record(%p): appending %u bytes (%u)",
+ ab, rlen, ab->ab_size);
+ ptr = ab->ab_buf + ab->ab_size;
+ bcopy(ar->ar_buf, ptr, rlen);
+ ab->ab_size += rlen;
+ free(ar->ar_buf);
+ free(ar);
+}
+
+void *
+reader_client_handler(void *arg)
+{
+ struct timeval tv;
+ struct au_client *ac;
+ fd_set rfds;
+ int ret;
+
+ ac = (struct au_client *)arg;
+ for (;;) {
+ FD_ZERO(&rfds);
+ FD_SET(ac->auc_fd, &rfds);
+ tv.tv_sec = READER_TIMEOUT;
+ tv.tv_usec = 0;
+ ret = select(FD_SETSIZE, &rfds, NULL, NULL, &tv);
+ if (ret == -1) {
+ if (errno == EINTR)
+ continue;
+ else
+ break;
+ } else if (ret == 0) {
+ dprintf("reader_client_handler(%s): client %d timeout",
+ ac->auc_parent->ac_name, ac->auc_fd);
+ reader_push(ac->auc_parent, &ac->auc_buffer);
+ ac->auc_buffer.ab_size = 0;
+ continue;
+ }
+ if (FD_ISSET(ac->auc_fd, &rfds)) {
+ ret = reader_read_socket(ac);
+ if (ret == -1)
+ break;
+ }
+ }
+ dprintf("reader_client_handler(%s): client %d terminating",
+ ac->auc_parent->ac_name, ac->auc_fd);
+ (void) close(ac->auc_fd);
+ return (NULL); /* Thread termination */
+}
+
+void *
+reader_handler(void *arg)
+{
+ struct timeval tv;
+ struct au_cmpnt *ac;
+ fd_set rfds;
+ int ret;
+
+ ac = (struct au_cmpnt *)arg;
+ for (;;) {
+ FD_ZERO(&rfds);
+ FD_SET(ac->ac_fd, &rfds);
+ tv.tv_sec = READER_TIMEOUT;
+ tv.tv_usec = 0;
+ ret = select(FD_SETSIZE, &rfds, NULL, NULL, &tv);
+ if (ret == -1) {
+ if (errno == EINTR)
+ continue;
+ else
exit(2);
+ } else if (ret == 0) {
+ if (ac->ac_type == COMPONENT_PIPE) {
+ dprintf("reader_handler(%s): timeout",
+ ac->ac_name);
+ reader_push(ac, &ac->ac_buffer);
+ ac->ac_buffer.ab_size = 0;
+ }
+ continue;
}
- if (ac->ac_type == COMPONENT_NET) {
- TAILQ_FOREACH_SAFE(as, &ac->ac_sbuffers, as_glue,
- tmp)
- if (FD_ISSET(as->as_fd, &lrfds)) {
- ret2 = reader_read_socket(as);
- }
- }
+ if (ac->ac_read_func(ac) == -1)
+ exit(2);
}
+ return (NULL);
}
void
@@ -157,13 +212,14 @@
{
struct au_cmpnt *ac;
- (void) pthread_mutex_init(&ready_lock, NULL);
- (void) pthread_cond_init(&ready_cond, NULL);
- TAILQ_FOREACH(ac, &ac_list_src, ac_glue)
+ TAILQ_FOREACH(ac, &ac_list_readers, ac_glue)
if (ac->ac_init_func(ac) == -1)
exit(2);
}
+/*
+ * XXX Add error reporting to reader_init_*.
+ */
int
reader_init_net(struct au_cmpnt *ac)
{
@@ -182,10 +238,6 @@
return (-1);
}
ac->ac_fd = fd;
- if (nonblock(fd) == -1) {
- (void) close(fd);
- return (-1);
- }
return (0);
}
@@ -198,75 +250,9 @@
if (fd == -1)
return (-1);
ac->ac_fd = fd;
- if (nonblock(fd) == -1) {
- (void) close(fd);
- return (-1);
- }
return (0);
}
-void
-reader_q_record(struct audit_record *ar, struct au_cmpnt *ac)
-{
- int i;
-
- for (i = 0; i < ac->ac_ndsts; i++)
- reader_q_record_cmpnt(ar, ac->ac_dsts[i]);
- free(ar);
-}
-
-void
-reader_q_record_cmpnt(struct audit_record *ar, struct au_cmpnt *ac)
-{
- struct au_queue_ent *new;
-
- new = malloc(sizeof(struct au_queue_ent));
- assert(new != NULL);
- bzero(new, sizeof(struct au_queue_ent));
- /*
- * In most cases we will have a 1:1 relationship between source
- * and destination components. We avoid an extra copy by reference
- * counting usage of this audit record. This may be built on to
- * avoid copying altogether.
- */
- if (ar->ar_count == 0)
- new->aq_record.ar_buf = ar->ar_buf;
- else {
- new->aq_record.ar_buf = malloc(ar->ar_record_len);
- assert(new->aq_record.ar_buf != NULL);
- bcopy(ar->ar_buf, new->aq_record.ar_buf, ar->ar_record_len);
- }
- ar->ar_count++;
- new->aq_record.ar_record_len = ar->ar_record_len;
- new->aq_remain = ar->ar_record_len;
- (void) pthread_mutex_lock(&ac->ac_q.qp_lock);
- if (ac->ac_q.qp_store_len < WRITER_MAX) {
- dprintf("queueing record for %s (%d bytes in queue)",
- ac->ac_name, ac->ac_q.qp_store_len);
- (void) pthread_mutex_unlock(&ac->ac_q.qp_lock);
- TAILQ_INSERT_TAIL(ac->ac_q.qp_store, new, aq_glue);
- ac->ac_q.qp_store_len += ar->ar_record_len;
- ac->ac_q.qp_store_n++;
- return;
- }
- if (ac->ac_q.qp_hold != NULL) {
- /* This consumer is still processing it's queue, so the record
- * is dropped. */
- (void) pthread_mutex_unlock(&ac->ac_q.qp_lock);
- dprintf("dropping record for %s", ac->ac_name);
- free(new->aq_record.ar_buf);
- free(new);
- return;
- }
- dprintf("rotating queues for %s", ac->ac_name);
- ROTATE();
- WRITER_SIGNAL(ac->ac_q.qp_store_n);
- (void) pthread_mutex_unlock(&ac->ac_q.qp_lock);
- TAILQ_INSERT_TAIL(ac->ac_q.qp_store, new, aq_glue);
- ac->ac_q.qp_store_len += ar->ar_record_len;
- ac->ac_q.qp_store_n++;
-}
-
int
reader_read_pipe(struct au_cmpnt *ac)
{
@@ -276,88 +262,88 @@
ret = read(ac->ac_fd, buf, sizeof(buf));
if (ret == -1) {
- if ((errno == EINTR) || (errno == EAGAIN))
+ if (errno == EINTR)
return (0);
else
return (-1);
- }
- else if (ret == 0)
+ } else if (ret == 0)
return (-1);
- ar = malloc(sizeof(struct audit_record));
- assert(ar != NULL);
- bzero(ar, sizeof(struct audit_record));
+ ar = calloc(1, sizeof(struct audit_record));
+ if (ar == NULL)
+ exit(2);
ar->ar_buf = malloc(ret);
- assert(ar->ar_buf != NULL);
+ if (ar->ar_buf == NULL)
+ exit(2);
bcopy(buf, ar->ar_buf, ret);
ar->ar_record_len = ret;
- dprintf("reader_read_pipe: read record %d bytes", ret);
- reader_q_record(ar, ac);
+ dprintf("reader_read_pipe(%s): %d bytes", ac->ac_name, ret);
+ reader_buffer_record(ac, &ac->ac_buffer, ar);
return (0);
}
int
-reader_read_socket(struct au_src_buffer *asb)
+reader_read_socket(struct au_client *asb)
{
u_char *bufptr, *recbufptr;
int ret, left;
u_int32_t hdr_remain, val, need;
u_char as_buf[2048];
- ret = read(asb->as_fd, as_buf, sizeof(as_buf));
+ ret = read(asb->auc_fd, as_buf, sizeof(as_buf));
if (ret == -1) {
if (errno != EINTR)
return (-1);
else
return (0);
- }
- else if (ret == 0)
+ } else if (ret == 0)
return (-1);
left = ret;
bufptr = as_buf;
while (left > 0) {
- if (asb->as_record == NULL) {
- hdr_remain = sizeof(asb->as_header) -
- asb->as_nread;
+ if (asb->auc_record == NULL) {
+ hdr_remain = sizeof(asb->auc_header) -
+ asb->auc_nread;
if (left >= hdr_remain) {
- (void) memcpy(asb->as_header + asb->as_nread,
+ (void) memcpy(asb->auc_header + asb->auc_nread,
bufptr, hdr_remain);
- asb->as_nread += hdr_remain;
+ asb->auc_nread += hdr_remain;
left -= hdr_remain;
bufptr += hdr_remain;
- (void) memcpy(&val, asb->as_header + 1,
+ (void) memcpy(&val, asb->auc_header + 1,
sizeof(val));
- asb->as_record =
+ asb->auc_record =
malloc(sizeof(struct audit_record));
- assert(asb->as_record != NULL);
- asb->as_record->ar_record_len = be32toh(val);
- asb->as_record->ar_buf = \
- malloc(asb->as_record->ar_record_len);
- assert(asb->as_record->ar_buf != NULL);
- (void) memcpy(asb->as_record->ar_buf,
- asb->as_header, sizeof(asb->as_header));
+ if (asb->auc_record == NULL)
+ exit(2);
+ asb->auc_record->ar_record_len = be32toh(val);
+ asb->auc_record->ar_buf = \
+ malloc(asb->auc_record->ar_record_len);
+ if (asb->auc_record->ar_buf == NULL)
+ exit(2);
+ (void) memcpy(asb->auc_record->ar_buf,
+ asb->auc_header, sizeof(asb->auc_header));
continue;
- }
- else {
- (void) memcpy(asb->as_header + asb->as_nread,
+ } else {
+ (void) memcpy(asb->auc_header + asb->auc_nread,
bufptr, left);
- asb->as_nread += left;
+ asb->auc_nread += left;
return (0);
}
}
- need = asb->as_record->ar_record_len - asb->as_nread;
- recbufptr = asb->as_record->ar_buf + asb->as_nread;
+ need = asb->auc_record->ar_record_len - asb->auc_nread;
+ recbufptr = asb->auc_record->ar_buf + asb->auc_nread;
if (left < need) {
(void) memcpy(recbufptr, bufptr, left);
- asb->as_nread += left;
+ asb->auc_nread += left;
return (0);
- }
- else {
+ } else {
(void) memcpy(recbufptr, bufptr, need);
left -= need;
bufptr += need;
- reader_q_record(asb->as_record, asb->as_parent);
- asb->as_record = NULL;
- asb->as_nread = 0;
+ reader_buffer_record(asb->auc_parent,
+ &asb->auc_buffer, asb->auc_record);
+ asb->auc_record = NULL;
+ asb->auc_nread = 0;
}
}
return (0);
@@ -366,25 +352,20 @@
void
reader_start()
{
- reader_init();
- reader_build_rfds(&rfds);
- for (;;)
- reader_handler(&rfds);
-}
-
-void
-reader_timeout()
-{
struct au_cmpnt *ac;
+ pthread_t p;
+ int ret;
- TAILQ_FOREACH(ac, &ac_list_dst, ac_glue) {
- (void) pthread_mutex_lock(&ac->ac_q.qp_lock);
- if ((ac->ac_q.qp_hold != NULL) || (ac->ac_q.qp_store_n == 0)) {
- (void) pthread_mutex_unlock(&ac->ac_q.qp_lock);
- continue;
- }
- ROTATE();
- WRITER_SIGNAL(ac->ac_q.qp_store_n);
- (void) pthread_mutex_unlock(&ac->ac_q.qp_lock);
+ reader_init();
+ TAILQ_FOREACH(ac, &ac_list_readers, ac_glue) {
+ ret = pthread_create(&p, NULL, reader_handler, ac);
+ assert(ret == 0);
}
+ writer_start();
+ /*
+ * Reader threads will always exist, so we suspend execution of
+ * the main thread using the last reader thread which was created.
+ */
+ (void) pthread_join(p, NULL);
+ exit(0);
}
==== //depot/projects/trustedbsd/netauditd/reader.h#3 (text+ko) ====
@@ -24,19 +24,17 @@
* SUCH DAMAGE.
>>> TRUNCATED FOR MAIL (1000 lines) <<<
More information about the p4-projects
mailing list