socsvn commit: r257721 - in soc2013/ambarisha/head/usr.bin: dmget dms
ambarisha at FreeBSD.org
ambarisha at FreeBSD.org
Thu Sep 26 12:37:47 UTC 2013
Author: ambarisha
Date: Thu Sep 26 12:37:47 2013
New Revision: 257721
URL: http://svnweb.FreeBSD.org/socsvn/?view=rev&rev=257721
Log:
Worker uses "preempted" flag to clean up.
Modified:
soc2013/ambarisha/head/usr.bin/dmget/dmget.c
soc2013/ambarisha/head/usr.bin/dmget/fetch.c
soc2013/ambarisha/head/usr.bin/dms/dms.c
soc2013/ambarisha/head/usr.bin/dms/worker.c
Modified: soc2013/ambarisha/head/usr.bin/dmget/dmget.c
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dmget/dmget.c Thu Sep 26 12:32:12 2013 (r257720)
+++ soc2013/ambarisha/head/usr.bin/dmget/dmget.c Thu Sep 26 12:37:47 2013 (r257721)
@@ -22,8 +22,8 @@
int dmLastErrCode;
char dmLastErrString[MAXERRSTRING];
-static int sigint;
-static int siginfo;
+static volatile sig_atomic_t sigint;
+static volatile sig_atomic_t siginfo;
static int dmg_error;
static char dmg_errstr[512];
Modified: soc2013/ambarisha/head/usr.bin/dmget/fetch.c
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dmget/fetch.c Thu Sep 26 12:32:12 2013 (r257720)
+++ soc2013/ambarisha/head/usr.bin/dmget/fetch.c Thu Sep 26 12:37:47 2013 (r257721)
@@ -91,7 +91,7 @@
static int chksum_type = NO_CHKSUM; /* (SHA1/MD5/NO)_CHKSUM */
static char chksum[MAX_CHKSUM_LEN];
-static int sigint; /* SIGINT received */
+static volatile sig_atomic_t sigint; /* SIGINT received */
static long ftp_timeout = TIMEOUT; /* default timeout for FTP transfers */
static long http_timeout = TIMEOUT;/* default timeout for HTTP transfers */
Modified: soc2013/ambarisha/head/usr.bin/dms/dms.c
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/dms.c Thu Sep 26 12:32:12 2013 (r257720)
+++ soc2013/ambarisha/head/usr.bin/dms/dms.c Thu Sep 26 12:37:47 2013 (r257721)
@@ -20,7 +20,7 @@
static int dm_err;
static char dm_errstr[512];
-int stop;
+volatile sig_atomic_t stop;
struct dmjob *jobs;
pthread_mutex_t job_queue_mutex;
Modified: soc2013/ambarisha/head/usr.bin/dms/worker.c
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/worker.c Thu Sep 26 12:32:12 2013 (r257720)
+++ soc2013/ambarisha/head/usr.bin/dms/worker.c Thu Sep 26 12:37:47 2013 (r257721)
@@ -371,6 +371,28 @@
return (r);
}
+static char *
+get_tmpfn(const char *fn)
+{
+	/* Build "<fn>.<tid><TMP_EXT>" in malloc'd memory; the caller frees it.
+	 * Returns NULL if malloc fails. TODO: the cast assumes the opaque
+	 * pthread_t converts to an integer; use a pthread_t -> id mapping. */
+	unsigned int tid = (unsigned int)(unsigned long)pthread_self();
+	char idstr[16];	/* '.' + 10 digits (32-bit) + NUL needs 12; 8 overflowed */
+	snprintf(idstr, sizeof(idstr), ".%u", tid);
+
+	char *tmpfn = malloc(strlen(fn) + strlen(idstr) + strlen(TMP_EXT) + 1);
+	if (tmpfn == NULL) {
+		fprintf(stderr, "dms: Insufficient memory\n");
+		return NULL;
+	}
+
+	strcpy(tmpfn, fn);
+	strcat(tmpfn, idstr);
+	strcat(tmpfn, TMP_EXT);
+	return tmpfn;
+}
+
static struct dmjob *
find_potential_job(struct dmmirr *dmmirr)
{
@@ -470,6 +492,7 @@
char *ptr;
char *buf;
struct dmreq *dmreq = dmjob->request;
+ pthread_t tid = pthread_self();
of = NULL;
tmppath = NULL;
@@ -535,6 +558,9 @@
if (check_signal(SIGALRM, dmjob) || check_signal(SIGINT, dmjob))
goto signal;
+
+ if (dmjob->preempted && dmjob->worker != tid)
+ goto preempted;
/* check that size is as expected */
/*if (dmreq->S_size) {
@@ -650,6 +676,9 @@
dmjob->url->offset = 0;
if (check_signal(SIGINT, dmjob))
goto signal;
+
+ if (dmjob->preempted && dmjob->worker != tid)
+ goto preempted;
}
@@ -692,7 +721,8 @@
/* suck in the data */
dmjob->siginfo_en = 1;
- while (!check_signal(SIGINT, dmjob)) {
+ while (!check_signal(SIGINT, dmjob) &&
+ (dmjob->preempted == 0 || dmjob->worker == tid)) {
if (us.size != -1 && us.size - count < dmreq->B_size &&
us.size - count >= 0)
size = us.size - count;
@@ -703,11 +733,15 @@
dmjob->siginfo = 0;
}
+ if (dmjob->preempted && dmjob->worker != tid)
+ goto preempted;
+
if (size == 0)
break;
if ((readcnt = fread(buf, 1, size, f)) < size) {
- if (ferror(f) && errno == EINTR && !check_signal(SIGINT, dmjob))
+ if (ferror(f) && errno == EINTR && !check_signal(SIGINT, dmjob) &&
+ (dmjob->preempted == 0 || dmjob->worker == tid))
clearerr(f);
else if (readcnt == 0) {
break;
@@ -731,8 +765,16 @@
dmjob->sigalrm = ferror(f) && errno == ETIMEDOUT;
dmjob->siginfo_en = 0;
+ if (dmjob->preempted && dmjob->worker != tid)
+ goto preempted;
+
+
stat_end(&xs, dmjob);
+preempted:
+ r = -1;
+ goto done;
+
/*
* If the transfer timed out or was interrupted, we still want to
* set the mtime in case the file is not removed (-r or -R) and
@@ -887,13 +929,9 @@
}
*/
- tmpreq.path = (char *) malloc(strlen(dmreq->path) + strlen(TMP_EXT));
- if (tmpreq.path == NULL) {
- fprintf(stderr, "dmXGet: Insufficient memory\n");
+ tmpreq.path = get_tmpfn(dmreq->path);
+ if (tmpreq.path == NULL)
goto done;
- }
- strcpy(tmpreq.path, dmreq->path);
- strcat(tmpreq.path, TMP_EXT);
tmpjob.ofd = open(tmpreq.path, O_RDWR | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR);
@@ -905,11 +943,14 @@
goto done;
}
- fetch(&tmpjob, f, *us);
- fclose(f);
+ ret = fetch(&tmpjob, f, *us);
+ if (ret == -1) {
+ f = NULL;
+ goto done;
+ }
+ fclose(f);
f = fopen(tmpreq.path, "r");
-
done:
free(tmpjob.url->doc);
free(tmpjob.url);
@@ -1008,7 +1049,7 @@
int ret;
FILE *f;
char *tmppath;
- char flags[8];
+ char flags[8], tid[8];
/* Acquire job queue lock */
ret = pthread_mutex_lock(&job_queue_mutex);
@@ -1043,6 +1084,8 @@
/* fetch the remote file into a local tmp file */
f = dmXGet(dmjob, &us);
+ if (f == NULL && dmjob->preempted && dmjob->worker != pthread_self())
+ return NULL;
/* Acquire job queue lock */
ret = pthread_mutex_lock(&job_queue_mutex);
@@ -1084,13 +1127,11 @@
/* remove the local tmp file */
if (f != NULL) {
- tmppath = (char *) malloc(strlen(dmjob->request->path) +
- strlen(TMP_EXT));
- strcpy(tmppath, dmjob->request->path);
- strcat(tmppath, TMP_EXT);
-
- remove(tmppath);
- free(tmppath);
+ tmppath = get_tmpfn(dmjob->request->path);
+ if (tmppath != NULL) {
+ remove(tmppath);
+ free(tmppath);
+ }
}
More information about the svn-soc-all
mailing list