socsvn commit: r256168 - soc2013/ambarisha/head/usr.bin/dms

ambarisha at FreeBSD.org ambarisha at FreeBSD.org
Mon Aug 19 20:46:33 UTC 2013


Author: ambarisha
Date: Mon Aug 19 20:46:32 2013
New Revision: 256168
URL: http://svnweb.FreeBSD.org/socsvn/?view=rev&rev=256168

Log:
  Thread-proof the globals.
  The global job queue is now protected by a mutex.
  

Modified:
  soc2013/ambarisha/head/usr.bin/dms/dms.c
  soc2013/ambarisha/head/usr.bin/dms/worker.c

Modified: soc2013/ambarisha/head/usr.bin/dms/dms.c
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/dms.c	Mon Aug 19 20:45:05 2013	(r256167)
+++ soc2013/ambarisha/head/usr.bin/dms/dms.c	Mon Aug 19 20:46:32 2013	(r256168)
@@ -17,8 +17,9 @@
 static int	dm_err;
 static char	dm_errstr[512];
 
-int	 	 stop;
-struct dmjob	*jobs;
+int	 	 	 stop;
+struct dmjob		*jobs;
+pthread_mutex_t	 job_queue_mutex;
 
 void *run_worker(struct dmjob *job);
 
@@ -261,7 +262,25 @@
 		if ((dmjob = mk_dmjob(dmreq, csock)) == NULL)
 			goto error;
 
+		/* Acquire job queue lock */
+		ret = pthread_mutex_lock(&job_queue_mutex);
+		if (ret == -1) {
+			fprintf(stderr, "handle_request: Attempt to acquire"
+					" job queue mutex failed\n");
+			goto error;
+		}
+
 		jobs = add_job(jobs, dmjob);
+
+		ret = pthread_mutex_unlock(&job_queue_mutex);
+		if (ret == -1) {
+			fprintf(stderr, "handle_request: Couldn't release "
+					"job queue lock\n");
+
+			goto error;
+		}
+		/* Job queue lock released */
+
 		pthread_create(&(dmjob->worker), NULL, run_worker, dmjob);
 		pthread_detach(dmjob->worker);
 		goto done;
@@ -310,6 +329,7 @@
 	struct dmjob *cur;
 	void *retptr;
 	jobs = NULL;
+	job_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
 	fd_set fdset;
 
 	signal(SIGINT, sigint_handler);
@@ -320,6 +340,14 @@
 		maxfd = socket;
 		FD_SET(socket, &fdset);
 
+		/* Acquire job queue lock */
+		ret = pthread_mutex_lock(&job_queue_mutex);
+		if (ret == -1) {
+			fprintf(stderr, "handle_request: Attempt to acquire"
+					" job queue mutex failed\n");
+			return -1;
+		}
+
 		cur = jobs;
 		while (cur != NULL) {
 			FD_SET(cur->client, &fdset);
@@ -328,6 +356,15 @@
 			cur = cur->next;
 		}
 
+		ret = pthread_mutex_unlock(&job_queue_mutex);
+		if (ret == -1) {
+			fprintf(stderr, "handle_request: Couldn't release "
+					"job queue lock\n");
+
+			return -1;
+		}
+		/* Job queue lock released */
+
 		ret = select(maxfd + 1, &fdset, NULL, NULL, NULL);
 		if (ret ==  -1) {
 			fprintf(stderr, "run_event_loop: "
@@ -335,6 +372,14 @@
 			goto wrap_up;
 		}
 
+		/* Acquire job queue lock */
+		ret = pthread_mutex_lock(&job_queue_mutex);
+		if (ret == -1) {
+			fprintf(stderr, "handle_request: Attempt to acquire"
+					" job queue mutex failed\n");
+			return -1;
+		}
+
 		cur = jobs;
 		while (cur != NULL) {
 			state = service_job(cur, &fdset);
@@ -346,6 +391,15 @@
 			cur = cur->next;
 		}
 
+		ret = pthread_mutex_unlock(&job_queue_mutex);
+		if (ret == -1) {
+			fprintf(stderr, "handle_request: Couldn't release "
+					"job queue lock\n");
+
+			return -1;
+		}
+		/* Job queue lock released */
+
 		if (FD_ISSET(socket, &fdset)) {
 			struct sockaddr_un cliaddr;
 			size_t cliaddrlen = sizeof(cliaddr);
@@ -362,6 +416,14 @@
 
 wrap_up:
 	/* Notify all running workers that we've to wrap up */
+	/* Acquire job queue lock */
+	ret = pthread_mutex_lock(&job_queue_mutex);
+	if (ret == -1) {
+		fprintf(stderr, "handle_request: Attempt to acquire"
+				" job queue mutex failed\n");
+		return -1;
+	}
+
 	cur = jobs;
 	while (cur != NULL) {
 		if (cur->state == RUNNING)
@@ -371,6 +433,16 @@
 		jobs = rm_job(jobs, cur);
 		cur = cur->next;
 	}
+
+	ret = pthread_mutex_unlock(&job_queue_mutex);
+	if (ret == -1) {
+		fprintf(stderr, "handle_request: Couldn't release "
+				"job queue lock\n");
+
+		return -1;
+	}
+	/* Job queue lock released */
+
 }
 
 int main(int argc, char **argv)

Modified: soc2013/ambarisha/head/usr.bin/dms/worker.c
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/worker.c	Mon Aug 19 20:45:05 2013	(r256167)
+++ soc2013/ambarisha/head/usr.bin/dms/worker.c	Mon Aug 19 20:46:32 2013	(r256168)
@@ -13,8 +13,9 @@
 #include "dm.h"
 
 
-static const char *prefixes = " kMGTP";
+static const char 	*prefixes = " kMGTP";
 extern struct dmjob 	*jobs;
+extern pthread_mutex_t	 job_queue_mutex;
 
 #define	TMP_EXT		".tmp"
 
@@ -22,8 +23,8 @@
 authenticate(struct url *url)
 {
 	struct dmmsg msg;
-	struct dmjob *cur = jobs;
-	int bufsize = 0, i = 0, schlen, hlen;
+	struct dmjob *cur;
+	int bufsize = 0, i = 0, schlen, hlen, ret;
 
 	schlen = strlen(url->scheme) + 1;
 	hlen = strlen(url->host) + 1;
@@ -46,6 +47,15 @@
 	msg.op = DMAUTHREQ;
 	msg.len = bufsize;
 
+	/* Acquire job queue lock */
+	ret = pthread_mutex_lock(&job_queue_mutex);
+	if (ret == -1) {
+		fprintf(stderr, "handle_request: Attempt to acquire"
+				" job queue mutex failed\n");
+		return -1;
+	}
+
+	cur = jobs;
 	while (cur != NULL) {
 		/* TODO: May be a more thorough comparison? */
 		if (cur->url != url) {
@@ -53,13 +63,22 @@
 			continue;
 		}
 
-		
+
 		/* TODO: How do we figure out which request's 
 		 * authentication credentials to use ???
 		 * */
 
 	}
 
+	ret = pthread_mutex_unlock(&job_queue_mutex);
+	if (ret == -1) {
+		fprintf(stderr, "handle_request: Couldn't release "
+				"job queue lock\n");
+
+		return -1;
+	}
+	/* Job queue lock released */
+
 	return 1;
 }
 
@@ -72,6 +91,7 @@
 static void
 stat_send(struct xferstat *xs, int force)
 {
+	int ret;
 	struct dmjob *cur;
 	struct dmmsg msg;
 	pthread_t  self = pthread_self();
@@ -89,14 +109,30 @@
 	msg.op = DMSTAT;
 	msg.buf = buf; 
 	msg.len = sizeof(*xs) + sizeof(force);
-	cur = jobs;
 
+	/* Acquire job queue lock */
+	ret = pthread_mutex_lock(&job_queue_mutex);
+	if (ret == -1) {
+		fprintf(stderr, "handle_request: Attempt to acquire"
+				" job queue mutex failed\n");
+		goto done;
+	}
+
+	cur = jobs;
 	while (cur != NULL) {
 		if (pthread_equal(self, cur->worker) != 0)	
 			send_dmmsg(cur->client, msg);
 		cur = cur->next;
 	}
 
+	ret = pthread_mutex_unlock(&job_queue_mutex);
+	if (ret == -1) {
+		fprintf(stderr, "handle_request: Couldn't release "
+				"job queue lock\n");
+	}
+	/* Job queue lock released */
+
+done:
 	free(buf);
 	return;
 }
@@ -740,6 +776,7 @@
 	int *clisig;
 	pthread_t tid = pthread_self();
 	if (sig == SIGUSR1) {
+		/* TODO: Umm...Locking? */
 		while (tmp != NULL) {
 			if (pthread_equal(tid, tmp->worker) != 0)
 				break;
@@ -802,8 +839,14 @@
 	char *tmppath;
 	char flags[8];
 
-	/* check if this is a duplicate */
-	ret = mk_url(dmjob, flags);	
+	/* Acquire job queue lock */
+	ret = pthread_mutex_lock(&job_queue_mutex);
+	if (ret == -1) {
+		fprintf(stderr, "handle_request: Attempt to acquire"
+				" job queue mutex failed\n");
+		return -1;
+	}
+
 	tmp = jobs;
 	while (tmp != NULL) {
 		if (tmp != dmjob && compare_jobs(tmp, dmjob) == 0) {
@@ -814,11 +857,30 @@
 		tmp = tmp->next;
 	}
 
+	ret = pthread_mutex_unlock(&job_queue_mutex);
+	if (ret == -1) {
+		fprintf(stderr, "handle_request: Couldn't release "
+				"job queue lock\n");
+
+		return -1;
+	}
+	/* Job queue lock released */
+
+	/* check if this is a duplicate */
+	ret = mk_url(dmjob, flags);	
 	dmjob->worker = pthread_self();
 
 	/* fetch the remote file into a local tmp file */
 	f = dmXGet(dmjob, &us);
 
+	/* Acquire job queue lock */
+	ret = pthread_mutex_lock(&job_queue_mutex);
+	if (ret == -1) {
+		fprintf(stderr, "handle_request: Attempt to acquire"
+				" job queue mutex failed\n");
+		return -1;
+	}
+
 	/* Serve any outstanding requests from the local tmp file */
 	tmp = jobs;
 	while (tmp != NULL) {
@@ -843,6 +905,15 @@
 		tmp = tmp->next;
 	}
 
+	ret = pthread_mutex_unlock(&job_queue_mutex);
+	if (ret == -1) {
+		fprintf(stderr, "handle_request: Couldn't release "
+				"job queue lock\n");
+
+		return -1;
+	}
+	/* Job queue lock released */
+
 	/* remove the local tmp file */
 	if (f != NULL) {
 		tmppath = (char *) malloc(strlen(dmjob->request->path) + strlen(TMP_EXT));


More information about the svn-soc-all mailing list