socsvn commit: r256168 - soc2013/ambarisha/head/usr.bin/dms
ambarisha at FreeBSD.org
ambarisha at FreeBSD.org
Mon Aug 19 20:46:33 UTC 2013
Author: ambarisha
Date: Mon Aug 19 20:46:32 2013
New Revision: 256168
URL: http://svnweb.FreeBSD.org/socsvn/?view=rev&rev=256168
Log:
Thread-proofing the globals.
The global job queue is now protected by a mutex (job_queue_mutex).
Modified:
soc2013/ambarisha/head/usr.bin/dms/dms.c
soc2013/ambarisha/head/usr.bin/dms/worker.c
Modified: soc2013/ambarisha/head/usr.bin/dms/dms.c
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/dms.c Mon Aug 19 20:45:05 2013 (r256167)
+++ soc2013/ambarisha/head/usr.bin/dms/dms.c Mon Aug 19 20:46:32 2013 (r256168)
@@ -17,8 +17,9 @@
static int dm_err;
static char dm_errstr[512];
-int stop;
-struct dmjob *jobs;
+int stop;
+struct dmjob *jobs;
+pthread_mutex_t job_queue_mutex;
void *run_worker(struct dmjob *job);
@@ -261,7 +262,25 @@
if ((dmjob = mk_dmjob(dmreq, csock)) == NULL)
goto error;
+ /* Acquire job queue lock */
+ ret = pthread_mutex_lock(&job_queue_mutex);
+ if (ret == -1) {
+ fprintf(stderr, "handle_request: Attempt to acquire"
+ " job queue mutex failed\n");
+ goto error;
+ }
+
jobs = add_job(jobs, dmjob);
+
+ ret = pthread_mutex_unlock(&job_queue_mutex);
+ if (ret == -1) {
+ fprintf(stderr, "handle_request: Couldn't release "
+ "job queue lock\n");
+
+ goto error;
+ }
+ /* Job queue lock released */
+
pthread_create(&(dmjob->worker), NULL, run_worker, dmjob);
pthread_detach(dmjob->worker);
goto done;
@@ -310,6 +329,7 @@
struct dmjob *cur;
void *retptr;
jobs = NULL;
+ job_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
fd_set fdset;
signal(SIGINT, sigint_handler);
@@ -320,6 +340,14 @@
maxfd = socket;
FD_SET(socket, &fdset);
+ /* Acquire job queue lock */
+ ret = pthread_mutex_lock(&job_queue_mutex);
+ if (ret == -1) {
+ fprintf(stderr, "handle_request: Attempt to acquire"
+ " job queue mutex failed\n");
+ return -1;
+ }
+
cur = jobs;
while (cur != NULL) {
FD_SET(cur->client, &fdset);
@@ -328,6 +356,15 @@
cur = cur->next;
}
+ ret = pthread_mutex_unlock(&job_queue_mutex);
+ if (ret == -1) {
+ fprintf(stderr, "handle_request: Couldn't release "
+ "job queue lock\n");
+
+ return -1;
+ }
+ /* Job queue lock released */
+
ret = select(maxfd + 1, &fdset, NULL, NULL, NULL);
if (ret == -1) {
fprintf(stderr, "run_event_loop: "
@@ -335,6 +372,14 @@
goto wrap_up;
}
+ /* Acquire job queue lock */
+ ret = pthread_mutex_lock(&job_queue_mutex);
+ if (ret == -1) {
+ fprintf(stderr, "handle_request: Attempt to acquire"
+ " job queue mutex failed\n");
+ return -1;
+ }
+
cur = jobs;
while (cur != NULL) {
state = service_job(cur, &fdset);
@@ -346,6 +391,15 @@
cur = cur->next;
}
+ ret = pthread_mutex_unlock(&job_queue_mutex);
+ if (ret == -1) {
+ fprintf(stderr, "handle_request: Couldn't release "
+ "job queue lock\n");
+
+ return -1;
+ }
+ /* Job queue lock released */
+
if (FD_ISSET(socket, &fdset)) {
struct sockaddr_un cliaddr;
size_t cliaddrlen = sizeof(cliaddr);
@@ -362,6 +416,14 @@
wrap_up:
/* Notify all running workers that we've to wrap up */
+ /* Acquire job queue lock */
+ ret = pthread_mutex_lock(&job_queue_mutex);
+ if (ret == -1) {
+ fprintf(stderr, "handle_request: Attempt to acquire"
+ " job queue mutex failed\n");
+ return -1;
+ }
+
cur = jobs;
while (cur != NULL) {
if (cur->state == RUNNING)
@@ -371,6 +433,16 @@
jobs = rm_job(jobs, cur);
cur = cur->next;
}
+
+ ret = pthread_mutex_unlock(&job_queue_mutex);
+ if (ret == -1) {
+ fprintf(stderr, "handle_request: Couldn't release "
+ "job queue lock\n");
+
+ return -1;
+ }
+ /* Job queue lock released */
+
}
int main(int argc, char **argv)
Modified: soc2013/ambarisha/head/usr.bin/dms/worker.c
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/worker.c Mon Aug 19 20:45:05 2013 (r256167)
+++ soc2013/ambarisha/head/usr.bin/dms/worker.c Mon Aug 19 20:46:32 2013 (r256168)
@@ -13,8 +13,9 @@
#include "dm.h"
-static const char *prefixes = " kMGTP";
+static const char *prefixes = " kMGTP";
extern struct dmjob *jobs;
+extern pthread_mutex_t job_queue_mutex;
#define TMP_EXT ".tmp"
@@ -22,8 +23,8 @@
authenticate(struct url *url)
{
struct dmmsg msg;
- struct dmjob *cur = jobs;
- int bufsize = 0, i = 0, schlen, hlen;
+ struct dmjob *cur;
+ int bufsize = 0, i = 0, schlen, hlen, ret;
schlen = strlen(url->scheme) + 1;
hlen = strlen(url->host) + 1;
@@ -46,6 +47,15 @@
msg.op = DMAUTHREQ;
msg.len = bufsize;
+ /* Acquire job queue lock */
+ ret = pthread_mutex_lock(&job_queue_mutex);
+ if (ret == -1) {
+ fprintf(stderr, "handle_request: Attempt to acquire"
+ " job queue mutex failed\n");
+ return -1;
+ }
+
+ cur = jobs;
while (cur != NULL) {
/* TODO: May be a more thorough comparison? */
if (cur->url != url) {
@@ -53,13 +63,22 @@
continue;
}
-
+
/* TODO: How do we figure out which request's
* authentication credentials to use ???
* */
}
+ ret = pthread_mutex_unlock(&job_queue_mutex);
+ if (ret == -1) {
+ fprintf(stderr, "handle_request: Couldn't release "
+ "job queue lock\n");
+
+ return -1;
+ }
+ /* Job queue lock released */
+
return 1;
}
@@ -72,6 +91,7 @@
static void
stat_send(struct xferstat *xs, int force)
{
+ int ret;
struct dmjob *cur;
struct dmmsg msg;
pthread_t self = pthread_self();
@@ -89,14 +109,30 @@
msg.op = DMSTAT;
msg.buf = buf;
msg.len = sizeof(*xs) + sizeof(force);
- cur = jobs;
+ /* Acquire job queue lock */
+ ret = pthread_mutex_lock(&job_queue_mutex);
+ if (ret == -1) {
+ fprintf(stderr, "handle_request: Attempt to acquire"
+ " job queue mutex failed\n");
+ goto done;
+ }
+
+ cur = jobs;
while (cur != NULL) {
if (pthread_equal(self, cur->worker) != 0)
send_dmmsg(cur->client, msg);
cur = cur->next;
}
+ ret = pthread_mutex_unlock(&job_queue_mutex);
+ if (ret == -1) {
+ fprintf(stderr, "handle_request: Couldn't release "
+ "job queue lock\n");
+ }
+ /* Job queue lock released */
+
+done:
free(buf);
return;
}
@@ -740,6 +776,7 @@
int *clisig;
pthread_t tid = pthread_self();
if (sig == SIGUSR1) {
+ /* TODO: Umm...Locking? */
while (tmp != NULL) {
if (pthread_equal(tid, tmp->worker) != 0)
break;
@@ -802,8 +839,14 @@
char *tmppath;
char flags[8];
- /* check if this is a duplicate */
- ret = mk_url(dmjob, flags);
+ /* Acquire job queue lock */
+ ret = pthread_mutex_lock(&job_queue_mutex);
+ if (ret == -1) {
+ fprintf(stderr, "handle_request: Attempt to acquire"
+ " job queue mutex failed\n");
+ return -1;
+ }
+
tmp = jobs;
while (tmp != NULL) {
if (tmp != dmjob && compare_jobs(tmp, dmjob) == 0) {
@@ -814,11 +857,30 @@
tmp = tmp->next;
}
+ ret = pthread_mutex_unlock(&job_queue_mutex);
+ if (ret == -1) {
+ fprintf(stderr, "handle_request: Couldn't release "
+ "job queue lock\n");
+
+ return -1;
+ }
+ /* Job queue lock released */
+
+ /* check if this is a duplicate */
+ ret = mk_url(dmjob, flags);
dmjob->worker = pthread_self();
/* fetch the remote file into a local tmp file */
f = dmXGet(dmjob, &us);
+ /* Acquire job queue lock */
+ ret = pthread_mutex_lock(&job_queue_mutex);
+ if (ret == -1) {
+ fprintf(stderr, "handle_request: Attempt to acquire"
+ " job queue mutex failed\n");
+ return -1;
+ }
+
/* Serve any outstanding requests from the local tmp file */
tmp = jobs;
while (tmp != NULL) {
@@ -843,6 +905,15 @@
tmp = tmp->next;
}
+ ret = pthread_mutex_unlock(&job_queue_mutex);
+ if (ret == -1) {
+ fprintf(stderr, "handle_request: Couldn't release "
+ "job queue lock\n");
+
+ return -1;
+ }
+ /* Job queue lock released */
+
/* remove the local tmp file */
if (f != NULL) {
tmppath = (char *) malloc(strlen(dmjob->request->path) + strlen(TMP_EXT));
More information about the svn-soc-all mailing list