socsvn commit: r257723 - in soc2013/ambarisha/head/usr.bin: dmget dms
ambarisha at FreeBSD.org
ambarisha at FreeBSD.org
Thu Sep 26 13:03:31 UTC 2013
Author: ambarisha
Date: Thu Sep 26 13:03:30 2013
New Revision: 257723
URL: http://svnweb.FreeBSD.org/socsvn/?view=rev&rev=257723
Log:
Fixed some bugs in job migration and status dump
Modified:
soc2013/ambarisha/head/usr.bin/dmget/dmget.c
soc2013/ambarisha/head/usr.bin/dmget/utils.c
soc2013/ambarisha/head/usr.bin/dms/dm.h
soc2013/ambarisha/head/usr.bin/dms/dms.c
soc2013/ambarisha/head/usr.bin/dms/dms.h
soc2013/ambarisha/head/usr.bin/dms/mirror.c
soc2013/ambarisha/head/usr.bin/dms/utils.c
soc2013/ambarisha/head/usr.bin/dms/utils.h
soc2013/ambarisha/head/usr.bin/dms/worker.c
Modified: soc2013/ambarisha/head/usr.bin/dmget/dmget.c
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dmget/dmget.c Thu Sep 26 13:01:43 2013 (r257722)
+++ soc2013/ambarisha/head/usr.bin/dmget/dmget.c Thu Sep 26 13:03:30 2013 (r257723)
@@ -281,7 +281,6 @@
ret = sigsafe_write(sock, reqbuf, bufsize);
free(reqbuf);
-
if (ret == -1)
return -1;
@@ -406,9 +405,8 @@
while (!sigint) {
struct dmmsg *msg;
msg = recv_dmmsg(sock);
- if (msg == NULL) {
+ if (msg == NULL)
goto failure;
- }
if (sigint) {
send_signal(sock, SIGINT);
Modified: soc2013/ambarisha/head/usr.bin/dmget/utils.c
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dmget/utils.c Thu Sep 26 13:01:43 2013 (r257722)
+++ soc2013/ambarisha/head/usr.bin/dmget/utils.c Thu Sep 26 13:03:30 2013 (r257723)
@@ -13,10 +13,12 @@
int
send_dmmsg(int socket, struct dmmsg msg)
{
+ printf("IN DMMSG\n");
int bufsize = sizeof(bufsize); // Buffer size
bufsize += 1; // Op
bufsize += msg.len; // Signal number
+ printf("About to send message\n");
char *sndbuf = (char *) malloc(bufsize);
if (sndbuf == NULL) {
fprintf(stderr, "send_dmmsg: malloc: insufficient memory\n");
@@ -30,10 +32,14 @@
*(sndbuf + i) = msg.op;
i++;
- memcpy(sndbuf + i, msg.buf, msg.len);
- i += msg.len;
+ if (msg.len != 0) {
+ memcpy(sndbuf + i, msg.buf, msg.len);
+ i += msg.len;
+ }
int nbytes = write(socket, sndbuf, bufsize);
+ perror("send_dmmsg write");
+ printf("%d bytes sent\n", nbytes);
free(sndbuf);
if (nbytes == -1) {
@@ -79,6 +85,13 @@
}
bufsize -= sizeof(msg->op);
+
+ /* This is to accommodate for 0 length messages */
+ if (bufsize == 0) {
+ msg->len = 0;
+ msg->buf = NULL;
+ return msg;
+ }
msg->buf = (char *) malloc(bufsize);
if (msg == NULL) {
Modified: soc2013/ambarisha/head/usr.bin/dms/dm.h
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/dm.h Thu Sep 26 13:01:43 2013 (r257722)
+++ soc2013/ambarisha/head/usr.bin/dms/dm.h Thu Sep 26 13:03:30 2013 (r257723)
@@ -18,6 +18,8 @@
#define MAX_CHKSUM_LEN SHA_DIGEST_LENGTH /* TODO: Any better alternative? */
+typedef enum { RUNNING = 0, DONE, DUPLICATE } state_t; /* job state: type of struct dmjob.state and struct dmsumm.state */
+
struct dmres {
int status;
int errcode;
@@ -68,6 +70,15 @@
char *buf;
};
+struct dmsumm { /* one job's entry in the payload of a DMDUMPRESP message */
+ char name[64]; /* copied from the job's url->doc (may be truncated) */
+ char mirror[64]; /* name of the job's mirror (may be truncated) */
+ state_t state; /* job state */
+ off_t size; /* transfer size, from the job's xferstat */
+ off_t rcvd; /* bytes received, from the job's xferstat */
+ long eta; /* estimated remaining time, computed by get_eta() */
+};
+
struct xferstat {
char name[64];
struct timeval start; /* start of transfer */
@@ -85,5 +96,7 @@
#define DMAUTHRESP 4
#define DMSIG 5
#define DMSTAT 6
+#define DMDUMPREQ 7 /* request: dump the summaries of queued jobs */
+#define DMDUMPRESP 8 /* reply: payload is an array of struct dmsumm */
#endif /* _DMCLIENT_H */
Modified: soc2013/ambarisha/head/usr.bin/dms/dms.c
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/dms.c Thu Sep 26 13:01:43 2013 (r257722)
+++ soc2013/ambarisha/head/usr.bin/dms/dms.c Thu Sep 26 13:03:30 2013 (r257723)
@@ -17,12 +17,14 @@
#include "utils.h"
#include "mirror.h"
-static int dm_err;
-static char dm_errstr[512];
+#define MAX_SUMMS 32 /* max job summaries sent in one DMDUMPRESP reply */
-volatile sig_atomic_t stop;
-struct dmjob *jobs;
-pthread_mutex_t job_queue_mutex;
+static int dm_err;
+static char dm_errstr[512];
+
+volatile sig_atomic_t stop;
+struct dmjob *jobs; /* job queue, singly linked through dmjob->next */
+pthread_mutex_t job_queue_mutex; /* held while the job queue is read or modified */
extern struct dmmirr *mirrors;
extern pthread_mutex_t mirror_list_mutex;
@@ -103,7 +105,7 @@
new->next = head;
}
-static struct dmjob *
+struct dmjob *
rm_job(struct dmjob *head, struct dmjob *job)
{
if (job->next != NULL)
@@ -123,6 +125,7 @@
{
int ret;
struct dmmirr *cur;
+
struct dmjob *dmjob = (struct dmjob *) malloc(sizeof(struct dmjob));
if (dmjob == NULL) {
fprintf(stderr, "mk_dmjob: malloc: insufficient memory\n");
@@ -241,7 +244,7 @@
return dmreq;
}
-static void
+void
rm_dmreq(struct dmreq **dmreq)
{
if (*dmreq == NULL)
@@ -263,6 +266,50 @@
free((*dmjob)->url);
}
+/*
+ * Answer a DMDUMPREQ on `sock` with a DMDUMPRESP message whose payload is an
+ * array of up to MAX_SUMMS struct dmsumm entries, one per queued job, in
+ * queue order.  The job queue is only walked while job_queue_mutex is held;
+ * the reply is sent after the lock has been released.  Failures are
+ * reported on stderr only.
+ */
+static void
+send_job_summaries(int sock)
+{
+	struct dmsumm summs[MAX_SUMMS];
+	struct dmmsg dmmsg;
+	struct dmjob *tmp;
+	int i, ret;
+
+	/* Zero everything so no uninitialized stack bytes reach the socket */
+	memset(summs, 0, sizeof(summs));
+
+	/* Acquire job queue lock (pthread calls return an error number) */
+	ret = pthread_mutex_lock(&job_queue_mutex);
+	if (ret != 0) {
+		fprintf(stderr, "send_job_summaries: Attempt to acquire"
+		    " job queue mutex failed\n");
+		return;
+	}
+
+	/* Stop at the end of the queue or when the reply buffer is full */
+	for (i = 0, tmp = jobs; i < MAX_SUMMS && tmp != NULL;
+	    i++, tmp = tmp->next) {
+		/* snprintf always NUL-terminates, truncating long names */
+		snprintf(summs[i].name, sizeof(summs[i].name), "%s",
+		    tmp->url->doc);
+		/* NOTE(review): guard assumes a job may lack a mirror; confirm */
+		if (tmp->mirror != NULL)
+			snprintf(summs[i].mirror, sizeof(summs[i].mirror),
+			    "%s", tmp->mirror->name);
+
+		summs[i].state = tmp->state;
+		summs[i].size = tmp->oldstat.size;
+		summs[i].rcvd = tmp->oldstat.rcvd;
+		summs[i].eta = get_eta(&(tmp->oldstat));
+	}
+
+	ret = pthread_mutex_unlock(&job_queue_mutex);
+	if (ret != 0) {
+		fprintf(stderr, "send_job_summaries: Couldn't release "
+		    "job queue lock\n");
+		return;
+	}
+	/* Job queue lock released */
+
+	dmmsg.op = DMDUMPRESP;
+	dmmsg.len = i * sizeof(struct dmsumm);
+	dmmsg.buf = (char *)summs;
+
+	if (send_dmmsg(sock, dmmsg) == -1)
+		fprintf(stderr, "send_job_summaries: Couldn't send "
+		    "job summaries\n");
+}
+
static int
handle_request(int csock)
{
@@ -273,6 +320,7 @@
int ret;
pid_t pid;
+
msg = recv_dmmsg(csock);
if (msg == NULL) {
report.status = -1;
@@ -311,6 +359,9 @@
pthread_create(&(dmjob->worker), NULL, run_worker, dmjob);
pthread_detach(dmjob->worker);
goto done;
+ case DMDUMPREQ:
+ send_job_summaries(csock);
+ goto done;
default:
free_dmmsg(&msg);
goto error;
@@ -326,6 +377,7 @@
rm_dmjob(&dmjob);
ret = -1;
done:
+
free_dmmsg(&msg);
return ret;
}
@@ -376,7 +428,6 @@
"select: %s\n", strerror(errno));
goto wrap_up;
}
-
handle_request(csock);
}
@@ -414,6 +465,7 @@
int main(int argc, char **argv)
{
int sock, err;
+
sock = socket(AF_UNIX, SOCK_STREAM, 0);
if (sock == -1) {
fprintf(stderr, "main: socket: %s\n", strerror(errno));
Modified: soc2013/ambarisha/head/usr.bin/dms/dms.h
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/dms.h Thu Sep 26 13:01:43 2013 (r257722)
+++ soc2013/ambarisha/head/usr.bin/dms/dms.h Thu Sep 26 13:03:30 2013 (r257723)
@@ -18,13 +18,7 @@
int siginfo_en;
unsigned timeout;
int preempted;
-
- enum {
- RUNNING = 0,
- DONE,
- DUPLICATE
- } state;
-
+ state_t state;
pthread_t worker;
struct dmreq *request;
struct url *url;
@@ -42,7 +36,7 @@
};
struct dmmirr {
- char name[512];
+ char name[256];
int index;
enum {
Modified: soc2013/ambarisha/head/usr.bin/dms/mirror.c
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/mirror.c Thu Sep 26 13:01:43 2013 (r257722)
+++ soc2013/ambarisha/head/usr.bin/dms/mirror.c Thu Sep 26 13:03:30 2013 (r257723)
@@ -86,6 +86,8 @@
mirror->remark = NOT_TRIED;
} else if (strcmp(rem, "FAILED") == 0) {
mirror->remark = FAILED;
+ } else if (strcmp(rem, "ACTIVE") == 0) {
+ mirror->remark = ACTIVE;
} else {
fprintf(stderr, "WARNING: Unknown mirror state in mirrors.list\n");
}
@@ -122,7 +124,12 @@
case FAILED:
fputs("FAILED\n", f);
break;
+ case ACTIVE:
+ fputs("ACTIVE\n", f);
+ break;
}
+
+ fprintf(f, "%u\n", mirror->index);
for(i = 0; i < MAX_SAMPLES; i++) {
fprintf(f, "%ld\t%f\n", mirror->timestamps[i].tv_sec,
@@ -143,6 +150,7 @@
for(i = 0; i < sizeof(MIRROR_LIST) / sizeof(MIRROR_LIST[0]); i++) {
fwrite(MIRROR_LIST[i], strlen(MIRROR_LIST[i]), 1, f);
fprintf(f, "\nNOT_TRIED\n");
+ fprintf(f, "0\n");
for (j = 0; j < MAX_SAMPLES; j++)
fprintf(f, "0\t0\n");
}
Modified: soc2013/ambarisha/head/usr.bin/dms/utils.c
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/utils.c Thu Sep 26 13:01:43 2013 (r257722)
+++ soc2013/ambarisha/head/usr.bin/dms/utils.c Thu Sep 26 13:03:30 2013 (r257723)
@@ -30,8 +30,10 @@
*(sndbuf + i) = msg.op;
i++;
- memcpy(sndbuf + i, msg.buf, msg.len);
- i += msg.len;
+ if (msg.len != 0) {
+ memcpy(sndbuf + i, msg.buf, msg.len);
+ i += msg.len;
+ }
int nbytes = write(socket, sndbuf, bufsize);
free(sndbuf);
@@ -79,6 +81,13 @@
}
bufsize -= sizeof(msg->op);
+
+ /* This is to accommodate for 0 length messages */
+ if (bufsize == 0) {
+ msg->len = 0;
+ msg->buf = NULL;
+ return msg;
+ }
msg->buf = (char *) malloc(bufsize);
if (msg == NULL) {
@@ -118,3 +127,14 @@
free(*msg);
*msg = NULL;
}
+
+/*
+ * Estimate the time left (in seconds, since the elapsed time is taken from
+ * tv_sec fields) for the transfer described by `xs`.  The estimate
+ * extrapolates from the observed rate:
+ *   elapsed * (bytes still expected) / (bytes received since xs->offset).
+ *
+ * Returns 0 when nothing remains to be received.  Returns -1 when no
+ * estimate can be made: the size is negative (presumably "unknown" --
+ * TODO confirm), or nothing has been received yet.  The latter check
+ * avoids a division by zero whose inf/NaN result is undefined behaviour
+ * when converted to long.
+ */
+long
+get_eta(struct xferstat *xs)
+{
+	long elapsed, received, expected;
+
+	if (xs->size < 0)
+		return -1;
+
+	expected = xs->size - xs->rcvd;
+	if (expected <= 0)
+		return 0;
+
+	received = xs->rcvd - xs->offset;
+	if (received <= 0)
+		return -1;
+
+	elapsed = xs->last.tv_sec - xs->start.tv_sec;
+	return (long)((double) elapsed * expected / received);
+}
Modified: soc2013/ambarisha/head/usr.bin/dms/utils.h
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/utils.h Thu Sep 26 13:01:43 2013 (r257722)
+++ soc2013/ambarisha/head/usr.bin/dms/utils.h Thu Sep 26 13:03:30 2013 (r257723)
@@ -18,3 +18,6 @@
void
free_dmmsg(struct dmmsg **msg);
+
+long
+get_eta(struct xferstat *xs); /* estimated seconds left in the transfer described by xs */
Modified: soc2013/ambarisha/head/usr.bin/dms/worker.c
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/worker.c Thu Sep 26 13:01:43 2013 (r257722)
+++ soc2013/ambarisha/head/usr.bin/dms/worker.c Thu Sep 26 13:03:30 2013 (r257723)
@@ -93,20 +93,11 @@
}
+/*
+ * Estimate how long downloading `size` bytes from `dmmirr` would take, from
+ * the mirror's average speed.  The result is in seconds if
+ * get_average_speed() returns bytes per second (TODO confirm).
+ * Returns -1 when the mirror has no positive average speed yet, instead of
+ * dividing by zero (converting the resulting inf/NaN to long is UB).
+ * NOTE(review): the preemption check preempts when neweta < cureta, so -1
+ * lets a mirror without speed samples take over a job; confirm that this
+ * is the intended policy.
+ */
static long
-get_eta(struct dmjob *dmjob, struct dmmirr *dmmirr)
+mirr_eta(off_t size, struct dmmirr *dmmirr)
{
- long eta, elapsed, speed, received, expected;
- if (dmmirr == dmjob->mirror) {
- elapsed = dmjob->oldstat.last.tv_sec - dmjob->oldstat.start.tv_sec;
- received = dmjob->oldstat.rcvd - dmjob->oldstat.offset;
- expected = dmjob->oldstat.size - dmjob->oldstat.rcvd;
- eta = (long)((double) elapsed * expected / received);
- } else {
- expected = dmjob->oldstat.size;
- speed = get_average_speed(dmmirr);
- eta = (long)((double) expected / speed);
- }
-
+	long speed, eta;
+
+	speed = get_average_speed(dmmirr);
+	if (speed <= 0)
+		return -1;
+	eta = (long)((double) size / speed);
return eta;
}
@@ -377,8 +368,9 @@
/* TODO: Not good assumes type of opaque pthread_t type
* Fix this by having a pthread_t -> id mapping
*/
- unsigned int tid = (unsigned long)pthread_self();
- char idstr[8];
+ unsigned int tid = (unsigned int)pthread_self();
+ char idstr[32];
+
sprintf(idstr, ".%u", tid);
char *tmpfn = (char *) malloc(strlen(fn) + strlen(idstr) + strlen(TMP_EXT) + 1);
@@ -410,11 +402,10 @@
tmp = jobs;
while (tmp != NULL) {
- cureta = get_eta(tmp, tmp->mirror);
- neweta = get_eta(tmp, dmmirr);
+ cureta = get_eta(&(tmp->oldstat));
+ neweta = mirr_eta(tmp->oldstat.size, dmmirr);
if (neweta < cureta) {
- /* notify the current owner worker to let go */
tmp->preempted = 1;
break;
}
@@ -456,6 +447,9 @@
tv.tv_usec = 0;
ret = select(dmjob->client + 1, &fds, NULL, NULL, &tv);
+ if (!FD_ISSET(dmjob->client, &fds))
+ break;
+
msg = recv_dmmsg(dmjob->client);
sig = (int *)msg->buf;
if (*sig == SIGINT)
@@ -562,6 +556,7 @@
if (dmjob->preempted && dmjob->worker != tid)
goto preempted;
+
/* check that size is as expected */
/*if (dmreq->S_size) {
if (us.size == -1) {
@@ -770,6 +765,7 @@
stat_end(&xs, dmjob);
+ goto signal;
preempted:
r = -1;
@@ -932,9 +928,9 @@
tmpreq.path = get_tmpfn(dmreq->path);
if (tmpreq.path == NULL)
goto done;
-
+
tmpjob.ofd = open(tmpreq.path, O_RDWR | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR);
-
+
FILE *f = fetchXGet(tmpjob.url, us, flags);
if (f == NULL) {
close(tmpjob.ofd);
@@ -946,6 +942,7 @@
ret = fetch(&tmpjob, f, *us);
if (ret == -1) {
f = NULL;
+ fprintf(stderr, "Failed now\n");
goto done;
}
@@ -984,6 +981,7 @@
/* Notify the client of the same */
return -1;
}
+
break;
case MD5_CHKSUM:
MD5_Init(&md5_ctx);
@@ -998,6 +996,7 @@
fprintf(stderr, "dms: checksum mismatch\n");
return -1;
}
+
break;
default:
break;
@@ -1045,6 +1044,7 @@
{
struct dmrep report;
struct dmjob *tmp;
+ struct dmmirr *dmmirr;
struct url_stat us;
int ret;
FILE *f;
@@ -1086,7 +1086,7 @@
f = dmXGet(dmjob, &us);
if (f == NULL && dmjob->preempted && dmjob->worker != pthread_self())
return NULL;
-
+
/* Acquire job queue lock */
ret = pthread_mutex_lock(&job_queue_mutex);
if (ret == -1) {
@@ -1095,6 +1095,7 @@
return NULL;
}
+
/* Serve any outstanding requests from the local tmp file */
tmp = jobs;
while (tmp != NULL) {
@@ -1129,18 +1130,24 @@
if (f != NULL) {
tmppath = get_tmpfn(dmjob->request->path);
if (tmppath != NULL) {
- remove(tmppath);
+// remove(tmppath);
free(tmppath);
}
}
+ dmmirr = dmjob->mirror;
+
+ rm_dmreq(&(dmjob->request));
+ jobs = rm_job(jobs, dmjob);
/* Check if this worker can prempt any downloads */
- dmjob = find_potential_job(dmjob->mirror);
- if (dmjob != NULL)
+ tmp = find_potential_job(dmmirr);
+ if (tmp != NULL) {
+ dmjob = tmp;
goto start;
-
- release_mirror(dmjob->mirror);
+ }
+
+ release_mirror(dmmirr);
/* TODO : What is this? Yew!! */
sleep(10);
}
More information about the svn-soc-all mailing list