socsvn commit: r256163 - in soc2013/ambarisha/head/usr.bin: dmget dms

ambarisha at FreeBSD.org ambarisha at FreeBSD.org
Mon Aug 19 20:31:58 UTC 2013


Author: ambarisha
Date: Mon Aug 19 20:31:57 2013
New Revision: 256163
URL: http://svnweb.FreeBSD.org/socsvn/?view=rev&rev=256163

Log:
  DMS is now threaded. Each worker is a new thread.
  

Modified:
  soc2013/ambarisha/head/usr.bin/dmget/dmget.c
  soc2013/ambarisha/head/usr.bin/dmget/dmget.h
  soc2013/ambarisha/head/usr.bin/dmget/fetch.c
  soc2013/ambarisha/head/usr.bin/dmget/utils.c
  soc2013/ambarisha/head/usr.bin/dms/Makefile
  soc2013/ambarisha/head/usr.bin/dms/dms.c
  soc2013/ambarisha/head/usr.bin/dms/dms.h
  soc2013/ambarisha/head/usr.bin/dms/utils.c
  soc2013/ambarisha/head/usr.bin/dms/worker.c

Modified: soc2013/ambarisha/head/usr.bin/dmget/dmget.c
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dmget/dmget.c	Mon Aug 19 18:57:58 2013	(r256162)
+++ soc2013/ambarisha/head/usr.bin/dmget/dmget.c	Mon Aug 19 20:31:57 2013	(r256163)
@@ -13,7 +13,7 @@
 #include "dm.h"
 #include "dmget.h"
 
-auth_t		 dmAuthMethod;
+dm_auth_t	 dmAuthMethod;
 stat_display_t	 dmStatDisplayMethod;
 int		 dmTimeout;
 int		 dmRestartCalls;
@@ -37,7 +37,7 @@
 	return ptr;
 }
 
-void dm_sighandler(int signal)
+void dmSigHandler(int signal)
 {
 	switch(signal) {
 	case SIGINT:
@@ -242,12 +242,16 @@
 send_request(int sock, struct dmreq dmreq)
 {
 	char *reqbuf;
-	int bufsize, err;
+	int bufsize, err, fd;
 
 	bufsize = mk_reqbuf(dmreq, &reqbuf, DMREQ);
 	err = sigsafe_write(sock, reqbuf, bufsize);
 
-	int fd = open(dmreq.path, O_CREAT|O_RDWR|O_TRUNC, S_IRUSR|S_IWUSR);
+	if (dmreq.flags & O_STDOUT)
+		fd = STDOUT_FILENO;
+	else
+		fd = open(dmreq.path, O_CREAT|O_RDWR|O_TRUNC, S_IRUSR|S_IWUSR);
+
 	Write_fd(sock, fd);
 	close(fd);	
 
@@ -255,56 +259,62 @@
 	return(err);
 }
 
-static void
-free_msg(struct dmmsg **msg)
+struct dmauth *
+mk_dmauth(char *buf, int bufsize)
 {
-	free((*msg)->buf);
-	free(*msg);
-	*msg = NULL;
-}
+	int i = 0, len;
+	struct dmauth *dmauth = (struct dmauth *) Malloc(sizeof(struct dmauth));
 
-static struct dmmsg *
-recv_msg(int sock)
-{
-	int err;
-	fd_set fds;
-	sigset_t sm;
-	struct dmmsg *msg;
+	len = strlen(buf + i);	
+	dmauth->scheme = (char *) Malloc(len + 1);
+	strncpy(dmauth->scheme, buf + i, len);
+	i += len + 1;
 
-	msg = (struct dmmsg *) Malloc(sizeof(struct dmmsg));
+	len = strlen(buf + i);
+	dmauth->host = (char *) Malloc(len + 1);
+	strncpy(dmauth->host, buf + i, len);
+	i += len + 1;
 
-	FD_ZERO(&fds);
-	FD_SET(sock, &fds);
+	dmauth->port = *(int *)(buf + i);
+	i += sizeof(int);
 	
-	err = Select(sock + 1, &fds, NULL, NULL, NULL);
-	if (err == -1)
-		return(-1);
+	return dmauth;
+}
 
-	sigemptyset(&sm);
-	sigaddset(&sm, SIGINT);
-	sigaddset(&sm, SIGINFO);
+void
+rm_dmauth(struct dmauth **dmauth)
+{
+	free((*dmauth)->scheme);
+	free((*dmauth)->host);
+	free(*dmauth);
+	*dmauth = NULL;
+}
 
-	sigprocmask(SIG_BLOCK, &sm, NULL);
-	err = Peel(sock, msg);
-	sigprocmask(SIG_UNBLOCK, &sm, NULL);
+static int
+send_dmauth(int sock, struct dmauth *dmauth)
+{
+	int ulen = strlen(dmauth->user) + 1;
+	int bufsize = ulen + strlen(dmauth->pwd) + 1;
+	char *buf = (char *) Malloc(bufsize);
 
-	if (err == -1) {
-		/* Set dmg_err* */
-		free_msg(&msg);
-		return NULL;
-	}
-	
-	return msg;
+	strcpy(buf, dmauth->user);
+	strcpy(buf + ulen, dmauth->user);
+
+	struct dmmsg msg;
+	msg.op = DMAUTHRESP;
+	msg.buf = buf;
+	msg.len = bufsize;
+	send_msg(sock, msg);
 }
 
 int
 dmget(struct dmreq dmreq)
 {
-	int sock, err, ret, force;
+	int sock, err, ret, force, i;
 	struct sockaddr_un dms_addr;
 	struct dmres *dmres;
 	struct xferstat xs;
-
+	struct dmauth *dmauth;
 	sock = Socket(AF_UNIX, SOCK_STREAM, 0);
 
 	dms_addr.sun_family = AF_UNIX;
@@ -347,6 +357,13 @@
 			dmStatDisplayMethod(&xs, force);
 			break;
 		case DMAUTHREQ:
+			dmauth = mk_dmauth(msg->buf, msg->len);
+			if (dmAuthMethod(dmauth) == -1) {
+				
+			}
+			send_dmauth(sock, dmauth);
+			rm_dmauth(&dmauth);
+			break;
 		default:
 			break;
 		}

Modified: soc2013/ambarisha/head/usr.bin/dmget/dmget.h
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dmget/dmget.h	Mon Aug 19 18:57:58 2013	(r256162)
+++ soc2013/ambarisha/head/usr.bin/dmget/dmget.h	Mon Aug 19 20:31:57 2013	(r256163)
@@ -5,20 +5,31 @@
 #include <sys/time.h>
 
 #include <stdio.h>
-#include <fetch.h>
 
 #include "dm.h"
 
+#define AUTH_USERLEN 256
+#define AUTH_PWDLEN 256
+
+
+struct dmauth {
+	int	 port;
+	char 	*scheme;
+	char	*host;
+	char	 user[AUTH_USERLEN+1];
+	char	 pwd[AUTH_PWDLEN+1]
+};
+
 extern int		 dmLastErrCode;
 extern int		 dmRestartCalls;
 extern char 		 dmLastErrString[];
 
-typedef int (*auth_t)(struct url *);
-extern auth_t		 dmAuthMethod;
+typedef int (*dm_auth_t)(struct dmauth *);
+extern dm_auth_t		 dmAuthMethod;
 typedef void (*stat_display_t) (struct xferstat *, int);
 extern stat_display_t	 dmStatDisplayMethod;
 
-int dm_request(struct dmreq);
-void dm_sighandler(int sig);
+int dmget(struct dmreq);
+void dmSigHandler(int sig);
 
 #endif /* _DMCLIENT_H */

Modified: soc2013/ambarisha/head/usr.bin/dmget/fetch.c
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dmget/fetch.c	Mon Aug 19 18:57:58 2013	(r256162)
+++ soc2013/ambarisha/head/usr.bin/dmget/fetch.c	Mon Aug 19 20:31:57 2013	(r256163)
@@ -207,21 +207,21 @@
  * Ask the user for authentication details
  */
 static int
-query_auth(struct url *URL)
+query_auth(struct dmauth *auth)
 {
 	struct termios tios;
 	tcflag_t saved_flags;
 	int i, nopwd;
 
 	fprintf(stderr, "Authentication required for <%s://%s:%d/>!\n",
-	    URL->scheme, URL->host, URL->port);
+	    auth->scheme, auth->host, auth->port);
 
 	fprintf(stderr, "Login: ");
-	if (fgets(URL->user, sizeof URL->user, stdin) == NULL)
+	if (fgets(auth->user, sizeof auth->user, stdin) == NULL)
 		return (-1);
-	for (i = strlen(URL->user); i >= 0; --i)
-		if (URL->user[i] == '\r' || URL->user[i] == '\n')
-			URL->user[i] = '\0';
+	for (i = strlen(auth->user); i >= 0; --i)
+		if (auth->user[i] == '\r' || auth->user[i] == '\n')
+			auth->user[i] = '\0';
 
 	fprintf(stderr, "Password: ");
 	if (tcgetattr(STDIN_FILENO, &tios) == 0) {
@@ -229,17 +229,17 @@
 		tios.c_lflag &= ~ECHO;
 		tios.c_lflag |= ECHONL|ICANON;
 		tcsetattr(STDIN_FILENO, TCSAFLUSH|TCSASOFT, &tios);
-		nopwd = (fgets(URL->pwd, sizeof URL->pwd, stdin) == NULL);
+		nopwd = (fgets(auth->pwd, sizeof auth->pwd, stdin) == NULL);
 		tios.c_lflag = saved_flags;
 		tcsetattr(STDIN_FILENO, TCSANOW|TCSASOFT, &tios);
 	} else {
-		nopwd = (fgets(URL->pwd, sizeof URL->pwd, stdin) == NULL);
+		nopwd = (fgets(auth->pwd, sizeof auth->pwd, stdin) == NULL);
 	}
 	if (nopwd)
 		return (-1);
-	for (i = strlen(URL->pwd); i >= 0; --i)
-		if (URL->pwd[i] == '\r' || URL->pwd[i] == '\n')
-			URL->pwd[i] = '\0';
+	for (i = strlen(auth->pwd); i >= 0; --i)
+		if (auth->pwd[i] == '\r' || auth->pwd[i] == '\n')
+			auth->pwd[i] = '\0';
 
 	return (0);
 }
@@ -473,7 +473,7 @@
 
 	/* signal handling */
 	sa.sa_flags = 0;
-	sa.sa_handler = dm_sighandler;
+	sa.sa_handler = dmSigHandler;
 	sigemptyset(&sa.sa_mask);
 	sigaction(SIGALRM, &sa, NULL);
 	sa.sa_flags = SA_RESETHAND;

Modified: soc2013/ambarisha/head/usr.bin/dmget/utils.c
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dmget/utils.c	Mon Aug 19 18:57:58 2013	(r256162)
+++ soc2013/ambarisha/head/usr.bin/dmget/utils.c	Mon Aug 19 20:31:57 2013	(r256163)
@@ -122,25 +122,24 @@
 	int err;
 	err = Read(sock, &bufsize, sizeof(bufsize));
 	if (err == 0)
-		return (-1);
+		return (err);
 	bufsize -= sizeof(bufsize);
 	
 	err = Read(sock, &(msg->op), sizeof(msg->op));
 	if (err == 0)
-		return (-1);
+		return (err);
 	bufsize -= sizeof(msg->op);
 
 	msg->buf = (char *) Malloc(bufsize);
 	msg->len = bufsize;
 
-	Read(sock, msg->buf, bufsize);
+	err = Read(sock, msg->buf, bufsize);
 	if (err == 0) {
 		free(msg->buf);
 		msg->len = 0;
-		return (-1);
 	}
 
-	return 1;
+	return bufsize;
 }
 
 int
@@ -181,3 +180,45 @@
 	return (nbytes);
 }
 
+struct dmmsg *
+recv_msg(int sock)
+{
+	int bufsize = 0;
+	int err;
+	struct dmmsg *msg;
+	err = Read(sock, &bufsize, sizeof(bufsize));
+	if (err == 0) {
+		/* set dms_error */
+		return (NULL);
+	}
+
+	bufsize -= sizeof(bufsize);
+
+	err = Read(sock, &(msg->op), sizeof(msg->op));
+	if (err == 0) {
+		/* set dms_error */
+		return (NULL);
+	}
+	bufsize -= sizeof(msg->op);
+
+	msg->buf = (char *) Malloc(bufsize);
+	msg->len = bufsize;
+
+	err = Read(sock, msg->buf, bufsize);
+	if (err == 0) {
+		free(msg->buf);
+		msg->len = 0;
+		/* set dms_error */
+		return (NULL);
+	}
+
+	return msg;
+}
+
+void
+free_msg(struct dmmsg **msg)
+{
+	free((*msg)->buf);
+	free(*msg);
+	*msg = NULL;
+}

Modified: soc2013/ambarisha/head/usr.bin/dms/Makefile
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/Makefile	Mon Aug 19 18:57:58 2013	(r256162)
+++ soc2013/ambarisha/head/usr.bin/dms/Makefile	Mon Aug 19 20:31:57 2013	(r256163)
@@ -1,12 +1,12 @@
 # $FreeBSD$
 .include <bsd.own.mk>
 
-SRCS=		utils.c list.c dms.c worker.c
+SRCS=		utils.c dms.c worker.c
 PROG=		dms
 CSTD?=		c99
 .if ${MK_OPENSSL} != "no"
 DPADD=		${LIBFETCH} ${LIBSSL} ${LIBCRYPTO}
-LDADD=		-lfetch -lssl -lcrypto
+LDADD=		-lfetch -lssl -lcrypto -lpthread
 .else
 DPADD=		${LIBFETCH}
 LDADD=		-lfetch

Modified: soc2013/ambarisha/head/usr.bin/dms/dms.c
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/dms.c	Mon Aug 19 18:57:58 2013	(r256162)
+++ soc2013/ambarisha/head/usr.bin/dms/dms.c	Mon Aug 19 20:31:57 2013	(r256163)
@@ -9,12 +9,49 @@
 #include <errno.h>
 #include <err.h>
 #include <fetch.h>
+#include <pthread.h>
 
 #include "dm.h"
-#include "list.h"
 #include "dms.h"
 
-int	 stop;
+int	 	 stop;
+struct dmjob	*jobs;
+
+void *run_worker(struct dmjob *job);
+
+static struct dmjob *
+add_job(struct dmjob *head, struct dmjob *new)
+{ 
+	new->prev = NULL;
+	new->next = NULL;
+
+	if (head == NULL)
+		return new;
+
+	head->prev = new;
+	new->next = head;	
+}
+
+static struct dmjob *
+rm_job(struct dmjob *head, struct dmjob *job)
+{
+	if (head == NULL)
+		return NULL;
+	
+	if (job == NULL)
+		return head;
+		
+	if (job->next != NULL) 
+		job->next->prev = job->prev;
+
+	if (job->prev != NULL)
+		job->prev->next = job->next;
+	
+	if (job == head) 
+		return job->next;
+
+	return head;
+}
 
 static int
 read_fd(int sock)
@@ -61,80 +98,35 @@
 	return newfd;
 }
 
-static int
-Read_fd(int sock)
-{
-	int ret = read_fd(sock);
-	if (ret == -1) {
-		perror("Read_fd():");
-	} else {
-		printf("Read_fd(): Success\n");
-	}
-	return(ret);
-}
-
 static struct dmjob *
-mk_dmjob(int sock, struct dmreq dmreq)
+mk_dmjob(struct dmreq *dmreq, int client)
 {
 	struct dmjob *dmjob = (struct dmjob *) Malloc(sizeof(struct dmjob));
-
-	/* Right now dmjob and dmreq are same */
-	dmjob->v_level = dmreq.v_level;
-	dmjob->family = dmreq.family;
-	dmjob->ftp_timeout = dmreq.ftp_timeout;
-	dmjob->http_timeout = dmreq.http_timeout;
-	dmjob->B_size = dmreq.B_size;
-
-	dmjob->S_size = dmreq.S_size;
-	dmjob->T_secs = dmreq.T_secs;
-	dmjob->flags = dmreq.flags;
-
-	dmjob->i_filename = (char *) Malloc(strlen(dmreq.i_filename) + 1);
-	strcpy(dmjob->i_filename, dmreq.i_filename);
-
-	dmjob->URL = (char *) Malloc(strlen(dmreq.URL) + 1);
-	strcpy(dmjob->URL, dmreq.URL);
-
-	dmjob->path = (char *) Malloc(strlen(dmreq.path) + 1);
-	strcpy(dmjob->path, dmreq.path);
-
-	dmjob->fd = Read_fd(sock);
-	dmjob->csock = sock;
-
+	dmjob->request = dmreq;
+	dmjob->ofd = read_fd(client);
+	if (dmjob->ofd == -1) {
+		/* Handle error */
+		free(dmjob);
+		return NULL;
+	}
+	dmjob->client = client;
+	dmjob->sigint = 0;
+	dmjob->sigalrm = 0;
+	dmjob->siginfo = 0;
+	dmjob->siginfo_en = 0;
+	dmjob->state = RUNNING;
+	dmjob->url = NULL;
 	return dmjob;
 }
 
-static struct dmjob *
-Mk_dmjob(int sock, struct dmreq dmreq)
-{
-	struct dmjob *dmjob = mk_dmjob(sock, dmreq);
-	if (dmjob == NULL) {
-		perror("mk_dmjob():");
-#if DEBUG
-	} else {
-		printf("mk_dmjob(): Success\n");
-#endif
-	}
-	return dmjob;
-}
-
-static void
-rm_dmjob(struct dmjob **dmjob)
-{
-	free((*dmjob)->i_filename);
-	free((*dmjob)->path);
-	free((*dmjob)->URL);
-	free(*dmjob);
-	*dmjob = NULL;
-}
-
-static int
-parse_request(char *rcvbuf, int bufsize)
+static struct dmreq *
+mk_dmreq(char *rcvbuf, int bufsize)
 {
 	int i = 0;
 
 	struct dmreq *dmreq = (struct dmreq *) Malloc(sizeof(struct dmreq));
-
+	if (dmreq == NULL) 
+		return NULL;
 	memcpy(&(dmreq->v_level), rcvbuf + i, sizeof(dmreq->v_level));
 	i += sizeof(dmreq->v_level);
 
@@ -157,7 +149,6 @@
 	i += sizeof(dmreq->T_secs);
 	
 	memcpy(&(dmreq->flags), rcvbuf + i, sizeof(dmreq->flags));
-
 	i += sizeof(dmreq->flags);
 
 	int sz = strlen(rcvbuf+i);
@@ -178,22 +169,8 @@
 	return dmreq;
 }
 
-static int
-Parse_request(char *rcvbuf, int bufsize)
-{
-	struct dmreq *dmreq = parse_request(rcvbuf, bufsize);
-	if (dmreq == NULL) {
-		perror("Parse_request():");
-#if DEBUG
-	} else {
-		printf("Parse_reqeust(): Success\n");
-#endif
-	}
-	return dmreq;
-}
-
 static void
-Free_request(struct dmreq **dmreq)
+rm_dmreq(struct dmreq **dmreq)
 {
 	free((*dmreq)->i_filename);
 	free((*dmreq)->URL);
@@ -202,61 +179,40 @@
 	*dmreq = NULL;
 }
 
-static void
-send_report(int sock, struct dmrep report, char op)
-{
-	char *buf;
-	int bufsize = sizeof(report) - sizeof(report.errstr);
-	int errlen = strlen(report.errstr);
-	bufsize +=  errlen;	
-
-	buf = (char *) Malloc(bufsize);
-	int i = 0;
-	
-	memcpy(buf + i, &(report.status), sizeof(report.status));
-	i += sizeof(report.status);
-
-	memcpy(buf + i, &(report.errcode), sizeof(report.errcode));
-	i += sizeof(report.errcode);
-
-	strcpy(buf + i, report.errstr);
-	i += errlen;
-	
-	struct dmmsg msg;
-	msg.op = op;
-	msg.buf = buf;
-	msg.len = bufsize;
-	send_msg(sock, msg);
-	
-	free(buf);
-}
-
 static int
-handle_request(int csock, struct conn **conns)
+handle_request(int csock)
 {
-	struct dmjob *dmjob;
-	struct dmreq *dmreq;
-	struct dmmsg msg;
-	struct dmrep report;
-	int err;
+	struct dmreq 	*dmreq;
+	struct dmmsg 	*msg;
+	struct dmjob	*dmjob;
+	int ret;
 	pid_t pid;
 
-	Peel(csock, &msg);
+	msg = recv_msg(csock);
+	if (msg == NULL) {
+		/* set dms_error */
+		return -1;
+	}
 	
-	switch (msg.op) {
+	switch (msg->op) {
 	case DMREQ:
- 		dmreq = Parse_request(msg.buf, msg.len);
-		dmjob = Mk_dmjob(csock, *dmreq);
-		Free_request(&dmreq);
-		do_job(*dmjob, &report);
-		send_report(csock, report, DMRESP);
-		default:
-			/* Unknown opcode recieved */
-			return -1;
+ 		dmreq = mk_dmreq(msg->buf, msg->len);
+		dmjob = mk_dmjob(dmreq, csock);
+		jobs = add_job(jobs, dmjob);
+		pthread_create(&(dmjob->worker), NULL, run_worker, dmjob);
+		break;
+	default:
+		goto error;
 		break;
 	}
-	
-	return 1;
+success:
+	ret = 0;
+	goto done;
+error:
+	ret = -1;	
+done:
+	free_msg(msg);
+	return ret;
 }
 
 void
@@ -267,93 +223,38 @@
 }
 
 static int
-handle_client_msg(struct conn *conn)
-{
-	struct dmmsg msg;
-	int ret = Peel(conn->client, &msg);
-	if (ret == 0)
-		 return(1);
-	
-	switch(msg.op) {
-	case DMSIG:
-		send_msg(conn->worker, msg);
-		break;
-	case DMAUTHRESP:
-		/* TODO: Implement these */
-		break;
-	default:
-		/* Unrecognized opcode */
-		break;
-	}
-	return(0);
-}
-
-static int
-handle_worker_msg(struct conn *conn)
-{
-	struct dmmsg msg;
-
-	int ret = Peel(conn->worker, &msg);
-	if (ret == 0) /* Worker closed the socket !! */
-		return(1);
-	
-	switch(msg.op) {
-	case DMRESP:
-		send_msg(conn->client, msg);
-		ret = 1;
-		break;
-	case DMAUTHREQ:	
-		/* TODO: Implement these */
-		break;
-	default:
-		/* Unrecoginized opcode */
-		break;	
-	}
-	return (0);
-}
-
-static int
-service_conn(struct conn *conn, fd_set *fdset)
+service_job(struct dmjob *job, fd_set *fdset)
 {
 	int ret = 0;
-	if (FD_ISSET(conn->client, fdset)) {
-		ret = handle_client_msg(conn);
-	}
-	
-	if (FD_ISSET(conn->worker, fdset)) {
-		ret |= handle_worker_msg(conn);
-		/* TODO: Do this better */
-	}
+	if (FD_ISSET(job->client, fdset))
+		pthread_kill(job->worker, SIGUSR1);
 
+	if (job->state == DONE)
+		ret = 1;
 	return (ret);
 }
 
 static void
 run_event_loop(int socket)
 {
-	int i, maxfd = socket;
-
-	struct conn *conns = NULL, *cur;
-
+	int i, ret, maxfd = socket;
+	struct dmjob *cur;
+	void *retptr;
+	jobs = NULL;
 	fd_set fdset;
 
 	signal(SIGINT, sigint_handler);
-
 	while (!stop) {
 
 		/* Prepare fdset and make select call */
 		FD_ZERO(&fdset);
 		FD_SET(socket, &fdset);
 
-		cur = conns;
+		cur = jobs;
 		while (cur != NULL) {
 			FD_SET(cur->client, &fdset);
-			FD_SET(cur->worker, &fdset);
-
 			if (cur->client > maxfd)
 				maxfd = cur->client;
-			if (cur->worker > maxfd)
-				maxfd = cur->worker;
 			cur = cur->next;
 		}
 		
@@ -364,30 +265,29 @@
 			size_t cliaddrlen = sizeof(cliaddr);
 			int csock = Accept(socket, (struct sockaddr *) &cliaddr,
 					&cliaddrlen);
-			handle_request(csock, &conns);
+			handle_request(csock);
 		}
 		
-		cur = conns;
+		cur = jobs;
 		while (cur != NULL) {
-			int ret = service_conn(cur, &fdset);
+			ret = service_job(cur, &fdset);
 			if (ret == 1) {
 				close(cur->client);
-				close(cur->worker);
-				/* What should happen to the worker */
-				conns = rm_conn(conns, cur);
+				pthread_join(cur->worker, &retptr);
+				jobs = rm_job(jobs, cur);
 			}
 			cur = cur->next;
 		}
 			
 	}
 
-	cur = conns;
+	cur = jobs;
 	while (cur != NULL) {	
 		close(cur->client);
-		close(cur->worker);
+		ret = service_job(cur, &fdset);
 		/* TODO: Force the worker to quit as well */
-		conns = rm_conn(conns, cur);
-		cur = conns;
+		jobs = rm_job(jobs, cur);
+		cur = jobs;
 	}
 }
 

Modified: soc2013/ambarisha/head/usr.bin/dms/dms.h
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/dms.h	Mon Aug 19 18:57:58 2013	(r256162)
+++ soc2013/ambarisha/head/usr.bin/dms/dms.h	Mon Aug 19 20:31:57 2013	(r256163)
@@ -1,34 +1,24 @@
+#ifndef _DMS_H
+#define	_DMS_H
+
 #include <sys/types.h>
 
+typedef enum {RUNNING=0, DONE=1} state_t;
+
 struct dmjob {
-	int	 v_level;
-	int	 family;
-	long	 ftp_timeout;
-	long	 http_timeout;
-	off_t	 B_size;
-	off_t	 S_size;
-	long	 T_secs;
-	long	 flags;
-	int	 fd;
-	int	 csock;
-
-#define		A_FLAG		(1 << 0)
-#define		F_FLAG		(1 << 1)
-#define		O_STDOUT	(1 << 2)
-#define		R_FLAG		(1 << 3)
-#define		U_FLAG		(1 << 4)
-#define		d_FLAG		(1 << 5)
-#define		i_FLAG		(1 << 6)
-#define		l_FLAG		(1 << 7)
-#define		m_FLAG		(1 << 8)
-#define		n_FLAG		(1 << 9)
-#define		p_FLAG		(1 << 10)
-#define		r_FLAG		(1 << 11)
-#define		s_FLAG		(1 << 12)
-
-	char	*i_filename;
-	char	*URL;
-	char	*path;
+	int		 ofd;
+	int	 	 client;
+	state_t	 	 state;
+	int		 sigint;
+	int	 	 sigalrm;
+	int	 	 siginfo;
+	int	 	 siginfo_en;
+	struct dmreq 	*request;
+	struct url	*url;
+	pthread_t	*worker;
+
+	struct dmjob 	*next;
+	struct dmjob	*prev;
 };
 
 struct dmrep {
@@ -41,3 +31,5 @@
 
 #define MAX_LISTEN_QUEUE	5
 #define MINBUFSIZE		4096
+
+#endif

Modified: soc2013/ambarisha/head/usr.bin/dms/utils.c
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/utils.c	Mon Aug 19 18:57:58 2013	(r256162)
+++ soc2013/ambarisha/head/usr.bin/dms/utils.c	Mon Aug 19 20:31:57 2013	(r256163)
@@ -133,16 +133,12 @@
 	msg->buf = (char *) Malloc(bufsize);
 	msg->len = bufsize;
 
-	Read(sock, msg->buf, bufsize);
+	err = Read(sock, msg->buf, bufsize);
 	if (err == 0) {
 		free(msg->buf);
 		msg->len = 0;
 	}
 
-#if DEBUG
-	printf("{ msg->op = %d; msg->len = %d }\n", msg->op, msg->len);
-#endif
-
 	return bufsize;
 }
 
@@ -184,3 +180,58 @@
 	return (nbytes);
 }
 
+struct dmmsg *
+recv_msg(int sock)
+{
+	printf("in recv_msg\n");
+	int bufsize = 0;
+	int err;
+	struct dmmsg *msg;
+	err = Read(sock, &bufsize, sizeof(bufsize));
+	if (err == 0) {
+		/* set dms_error */
+#if DEBUG
+		fprintf(stderr, "recv_msg: remote end closed connection\n");
+#endif
+		return (NULL);
+	}
+
+	printf("bufsize = %d\n", bufsize);
+
+	bufsize -= sizeof(bufsize);
+
+	printf("sock = %d\n", sock);
+	err = Read(sock, &(msg->op), sizeof(msg->op));
+	if (err == 0) {
+		/* set dms_error */
+#if DEBUG
+		fprintf(stderr, "recv_msg: remote end closed connection\n");
+#endif
+		return (NULL);
+	}
+	bufsize -= sizeof(msg->op);
+
+	msg->buf = (char *) Malloc(bufsize);
+	msg->len = bufsize;
+
+	err = Read(sock, msg->buf, bufsize);
+	if (err == 0) {
+		free(msg->buf);
+		msg->len = 0;
+		/* set dms_error */
+#if DEBUG
+		fprintf(stderr, "recv_msg: remote end closed connection\n");
+#endif
+		return (NULL);
+	}
+
+	return msg;
+}
+
+void
+free_msg(struct dmmsg **msg)
+{
+	free((*msg)->buf);
+	free(*msg);
+	*msg = NULL;
+}

Modified: soc2013/ambarisha/head/usr.bin/dms/worker.c
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/worker.c	Mon Aug 19 18:57:58 2013	(r256162)
+++ soc2013/ambarisha/head/usr.bin/dms/worker.c	Mon Aug 19 20:31:57 2013	(r256163)
@@ -10,10 +10,48 @@
 #include "dms.h"
 #include "dm.h"
 
-static int	sigalrm;	/* SIGALRM received by client */
-static int 	siginfo;	/* SIGINFO received by client */
-static int 	sigint;		/* SIGINT received by client */
-static int	handle_siginfo;	/* Yes or No */
+extern struct dmjob 	*jobs;
+
+static int
+authenticate(struct url *url)
+{
+	struct dmmsg msg;
+	struct dmjob *cur = jobs;
+	while (cur != NULL) {
+		if (cur->url == url)
+			break;
+		cur = cur->next;
+	}
+
+	if (cur == NULL)
+		return -1; // Todo: Verify this
+
+	int bufsize = 0, i = 0, schlen, hlen;
+	schlen = strlen(url->scheme) + 1;
+	hlen = strlen(url->host) + 1;
+	bufsize += schlen + hlen + sizeof(url->port);
+
+	msg.buf = (char *) Malloc(bufsize);
+
+	strcpy(msg.buf, url->scheme);
+	i += schlen;
+
+	strcpy(msg.buf + i, url->host);
+	i += hlen;
+
+	*(int *) (msg.buf + i) = url->port;
+
+	msg.op = DMAUTHREQ;
+	msg.len = bufsize;
+	send_msg(cur->client, msg);
+
+	struct dmmsg *rcvmsg;
+	rcvmsg = recv_msg(cur->client);
+
+	strncpy(url->user, rcvmsg->buf, sizeof(url->user));
+	strncpy(url->pwd, rcvmsg->buf + strlen(rcvmsg->buf) + 1, sizeof(url->pwd));
+	free_msg(&rcvmsg);
+}
 
 static void
 stat_send(int csock, struct xferstat *xs, int force)
@@ -68,7 +106,7 @@
 
 static void
 stat_start(struct xferstat *xs, const char *name, off_t size,
-	off_t offset, struct dmjob dmjob)
+	off_t offset, struct dmjob *dmjob)
 {
 	snprintf(xs->name, sizeof xs->name, "%s", name);
 	gettimeofday(&xs->start, NULL);
@@ -77,44 +115,36 @@
 	xs->offset = offset;
 	xs->rcvd = offset;
 	xs->lastrcvd = offset;
-	if ((dmjob.flags & V_TTY) && dmjob.v_level > 0)
-		stat_send(dmjob.csock, xs, 1);
-	else if (dmjob.v_level > 0)
+	if ((dmjob->request->flags & V_TTY) && dmjob->request->v_level > 0)
+		stat_send(dmjob->client, xs, 1);
+	else if (dmjob->request->v_level > 0)
 		fprintf(stderr, "%-46s", xs->name);
 }
 
 static void
-stat_end(struct xferstat *xs, struct dmjob dmjob)
+stat_end(struct xferstat *xs, struct dmjob *dmjob)
 {
 	gettimeofday(&xs->last, NULL);
-	if ((dmjob.flags & V_TTY) && dmjob.v_level > 0) {
-		stat_send(dmjob.csock, xs, 2);
+	if ((dmjob->request->flags & V_TTY) && dmjob->request->v_level > 0) {
+		stat_send(dmjob->client, xs, 2);
 		putc('\n', stderr);
-	} else if (dmjob.v_level > 0) {
+	} else if (dmjob->request->v_level > 0) {
 		fprintf(stderr, "        %s %s\n",
 		    stat_bytes(xs->size), stat_bps(xs));
 	}
 }
 
 static void
-stat_update(struct xferstat *xs, off_t rcvd, struct dmjob dmjob)

*** DIFF OUTPUT TRUNCATED AT 1000 LINES ***


More information about the svn-soc-all mailing list