socsvn commit: r257723 - in soc2013/ambarisha/head/usr.bin: dmget dms

ambarisha at FreeBSD.org ambarisha at FreeBSD.org
Thu Sep 26 13:03:31 UTC 2013


Author: ambarisha
Date: Thu Sep 26 13:03:30 2013
New Revision: 257723
URL: http://svnweb.FreeBSD.org/socsvn/?view=rev&rev=257723

Log:
  Fixed some bugs in job migration and status dump
  

Modified:
  soc2013/ambarisha/head/usr.bin/dmget/dmget.c
  soc2013/ambarisha/head/usr.bin/dmget/utils.c
  soc2013/ambarisha/head/usr.bin/dms/dm.h
  soc2013/ambarisha/head/usr.bin/dms/dms.c
  soc2013/ambarisha/head/usr.bin/dms/dms.h
  soc2013/ambarisha/head/usr.bin/dms/mirror.c
  soc2013/ambarisha/head/usr.bin/dms/utils.c
  soc2013/ambarisha/head/usr.bin/dms/utils.h
  soc2013/ambarisha/head/usr.bin/dms/worker.c

Modified: soc2013/ambarisha/head/usr.bin/dmget/dmget.c
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dmget/dmget.c	Thu Sep 26 13:01:43 2013	(r257722)
+++ soc2013/ambarisha/head/usr.bin/dmget/dmget.c	Thu Sep 26 13:03:30 2013	(r257723)
@@ -281,7 +281,6 @@
 
 	ret = sigsafe_write(sock, reqbuf, bufsize);
 	free(reqbuf);
-
 	if (ret == -1)
 		return -1;
 
@@ -406,9 +405,8 @@
 	while (!sigint) {
 		struct dmmsg *msg;
 		msg = recv_dmmsg(sock);				
-		if (msg == NULL) {
+		if (msg == NULL) 
 			goto failure;
-		}
 
 		if (sigint) {
 			send_signal(sock, SIGINT);

Modified: soc2013/ambarisha/head/usr.bin/dmget/utils.c
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dmget/utils.c	Thu Sep 26 13:01:43 2013	(r257722)
+++ soc2013/ambarisha/head/usr.bin/dmget/utils.c	Thu Sep 26 13:03:30 2013	(r257723)
@@ -13,10 +13,12 @@
 int
 send_dmmsg(int socket, struct dmmsg msg)
 {
+	printf("IN DMMSG\n");
 	int bufsize = sizeof(bufsize);	// Buffer size
 	bufsize += 1; 			// Op
 	bufsize += msg.len;		// Signal number
 
+	printf("About to send message\n");
 	char *sndbuf = (char *) malloc(bufsize);
 	if (sndbuf == NULL) {
 		fprintf(stderr, "send_dmmsg: malloc: insufficient memory\n");
@@ -30,10 +32,14 @@
 	*(sndbuf + i) = msg.op;
 	i++;
 
-	memcpy(sndbuf + i, msg.buf, msg.len);
-	i += msg.len;
+	if (msg.len != 0) {
+		memcpy(sndbuf + i, msg.buf, msg.len);
+		i += msg.len;
+	}
 
 	int nbytes = write(socket, sndbuf, bufsize);
+	perror("send_dmmsg write");
+	printf("%d bytes sent\n", nbytes);
 	free(sndbuf);
 	
 	if (nbytes == -1) {
@@ -79,6 +85,13 @@
 	}
 
 	bufsize -= sizeof(msg->op);
+	
+	/* This is to accommodate for 0 length messages */
+	if (bufsize == 0) {
+		msg->len = 0;
+		msg->buf = NULL;
+		return msg;
+	}
 
 	msg->buf = (char *) malloc(bufsize);
 	if (msg == NULL) {

Modified: soc2013/ambarisha/head/usr.bin/dms/dm.h
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/dm.h	Thu Sep 26 13:01:43 2013	(r257722)
+++ soc2013/ambarisha/head/usr.bin/dms/dm.h	Thu Sep 26 13:03:30 2013	(r257723)
@@ -18,6 +18,8 @@
 
 #define	MAX_CHKSUM_LEN	SHA_DIGEST_LENGTH /* TODO: Any better alternative? */
 
+typedef enum { RUNNING = 0, DONE, DUPLICATE } state_t;	/* state of a download job */
+
 struct dmres {
 	int	 status;
 	int	 errcode;
@@ -68,6 +70,15 @@
 	char 	*buf;
 };
 
+struct dmsumm {			/* one entry of a DMDUMPRESP payload */
+	char		name[64];	/* copied from the job's url->doc */
+	char		mirror[64];	/* name of the job's mirror */
+	state_t		state;		/* job state */
+	off_t		size;		/* job's oldstat.size */
+	off_t		rcvd;		/* job's oldstat.rcvd */
+	long		eta;		/* get_eta() result for the job */
+};
+
 struct xferstat {
 	char		 name[64];
 	struct timeval	 start;		/* start of transfer */
@@ -85,5 +96,7 @@
 #define		DMAUTHRESP		4
 #define		DMSIG			5
 #define		DMSTAT			6
+#define		DMDUMPREQ		7
+#define		DMDUMPRESP		8
 
 #endif /* _DMCLIENT_H */

Modified: soc2013/ambarisha/head/usr.bin/dms/dms.c
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/dms.c	Thu Sep 26 13:01:43 2013	(r257722)
+++ soc2013/ambarisha/head/usr.bin/dms/dms.c	Thu Sep 26 13:03:30 2013	(r257723)
@@ -17,12 +17,14 @@
 #include "utils.h"
 #include "mirror.h"
 
-static int	dm_err;
-static char	dm_errstr[512];
+#define MAX_SUMMS	32
 
-volatile sig_atomic_t 	 stop;
-struct dmjob		*jobs;
-pthread_mutex_t	 	 job_queue_mutex;
+static int			 dm_err;
+static char			 dm_errstr[512];
+
+volatile sig_atomic_t 	 	 stop;
+struct dmjob			*jobs;
+pthread_mutex_t	 	 	 job_queue_mutex;
 
 extern struct dmmirr		*mirrors;
 extern pthread_mutex_t		 mirror_list_mutex;
@@ -103,7 +105,7 @@
 	new->next = head;	
 }
 
-static struct dmjob *
+struct dmjob *
 rm_job(struct dmjob *head, struct dmjob *job)
 {
 	if (job->next != NULL) 
@@ -123,6 +125,7 @@
 {
 	int ret;
 	struct dmmirr *cur;
+
 	struct dmjob *dmjob = (struct dmjob *) malloc(sizeof(struct dmjob));
 	if (dmjob == NULL) {
 		fprintf(stderr, "mk_dmjob: malloc: insufficient memory\n");
@@ -241,7 +244,7 @@
 	return dmreq;
 }
 
-static void
+void
 rm_dmreq(struct dmreq **dmreq)
 {
 	if (*dmreq == NULL)
@@ -263,6 +266,50 @@
 	free((*dmjob)->url);
 }
 
+/* Reply to DMDUMPREQ on sock with one dmsumm per queued job (DMDUMPRESP) */
+static void
+send_job_summaries(int sock)
+{
+	struct dmsumm summs[MAX_SUMMS];
+	int i, ret;
+	struct dmmsg dmmsg;
+	struct dmjob *tmp;
+
+	/* Zeroed so names stay NUL-terminated and no stack bytes are sent */
+	memset(summs, 0, sizeof(summs));
+
+	/* Acquire job queue lock; pthread calls return an error number */
+	ret = pthread_mutex_lock(&job_queue_mutex);
+	if (ret != 0) {
+		fprintf(stderr, "send_job_summaries: Attempt to acquire"
+				" job queue mutex failed\n");
+		return;
+	}
+
+	/* Walk the list under the lock; a reply holds at most MAX_SUMMS */
+	for (i = 0, tmp = jobs; tmp != NULL && i < MAX_SUMMS; tmp = tmp->next, i++) {
+		strncpy(summs[i].name, tmp->url->doc, sizeof(summs[i].name) - 1);
+		strncpy(summs[i].mirror, tmp->mirror->name,
+				sizeof(summs[i].mirror) - 1);
+		summs[i].state = tmp->state;
+		summs[i].size = tmp->oldstat.size;
+		summs[i].rcvd = tmp->oldstat.rcvd;
+		summs[i].eta = get_eta(&(tmp->oldstat));
+	}
+
+	ret = pthread_mutex_unlock(&job_queue_mutex);
+	if (ret != 0) {
+		fprintf(stderr, "send_job_summaries: Couldn't release "
+				"job queue lock\n");
+		return;
+	}
+
+	dmmsg.op = DMDUMPRESP;
+	dmmsg.len = i * sizeof(struct dmsumm);
+	dmmsg.buf = (char *)summs;
+	send_dmmsg(sock, dmmsg);
+}
+
 static int
 handle_request(int csock)
 {
@@ -273,6 +320,7 @@
 	int ret;
 	pid_t pid;
 
+
 	msg = recv_dmmsg(csock);
 	if (msg == NULL) {
 		report.status = -1;
@@ -311,6 +359,9 @@
 		pthread_create(&(dmjob->worker), NULL, run_worker, dmjob);
 		pthread_detach(dmjob->worker);
 		goto done;
+	case DMDUMPREQ:
+		send_job_summaries(csock);
+		goto done;
 	default:
 		free_dmmsg(&msg);
 		goto error;
@@ -326,6 +377,7 @@
 	rm_dmjob(&dmjob);
 	ret = -1;	
 done:
+
 	free_dmmsg(&msg);
 	return ret;
 }
@@ -376,7 +428,6 @@
 				"select: %s\n", strerror(errno));
 			goto wrap_up;
 		}
-
 		handle_request(csock);
 	}
 
@@ -414,6 +465,7 @@
 int main(int argc, char **argv)
 {
 	int sock, err; 
+
 	sock = socket(AF_UNIX, SOCK_STREAM, 0);
 	if (sock == -1) {
 		fprintf(stderr, "main: socket: %s\n", strerror(errno));

Modified: soc2013/ambarisha/head/usr.bin/dms/dms.h
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/dms.h	Thu Sep 26 13:01:43 2013	(r257722)
+++ soc2013/ambarisha/head/usr.bin/dms/dms.h	Thu Sep 26 13:03:30 2013	(r257723)
@@ -18,13 +18,7 @@
 	int	 	 siginfo_en;
 	unsigned	 timeout;
 	int		 preempted;
-
-	enum {
-		RUNNING = 0,
-		DONE,
-		DUPLICATE
-	} state;
-
+	state_t		 state;
 	pthread_t	 worker;
 	struct dmreq 	*request;
 	struct url	*url;
@@ -42,7 +36,7 @@
 };
 
 struct dmmirr {
-	char		name[512];
+	char		name[256];
 	int		index;
 
 	enum {

Modified: soc2013/ambarisha/head/usr.bin/dms/mirror.c
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/mirror.c	Thu Sep 26 13:01:43 2013	(r257722)
+++ soc2013/ambarisha/head/usr.bin/dms/mirror.c	Thu Sep 26 13:03:30 2013	(r257723)
@@ -86,6 +86,8 @@
 		mirror->remark = NOT_TRIED;
 	} else if (strcmp(rem, "FAILED") == 0) {
 		mirror->remark = FAILED;
+	} else if (strcmp(rem, "ACTIVE") == 0) {
+		mirror->remark = ACTIVE;
 	} else {
 		fprintf(stderr, "WARNING: Unknown mirror state in mirrors.list\n");
 	}
@@ -122,7 +124,12 @@
 	case FAILED:
 		fputs("FAILED\n", f);
 		break;
+	case ACTIVE:
+		fputs("ACTIVE\n", f);
+		break;
 	}
+	
+	fprintf(f, "%u\n", mirror->index);
 
 	for(i = 0; i < MAX_SAMPLES; i++) {
 		fprintf(f, "%ld\t%f\n", mirror->timestamps[i].tv_sec,
@@ -143,6 +150,7 @@
 	for(i = 0; i < sizeof(MIRROR_LIST) / sizeof(MIRROR_LIST[0]); i++) {
 		fwrite(MIRROR_LIST[i], strlen(MIRROR_LIST[i]), 1, f);
 		fprintf(f, "\nNOT_TRIED\n");
+		fprintf(f, "0\n");
 		for (j = 0; j < MAX_SAMPLES; j++)
 			fprintf(f, "0\t0\n");
 	}

Modified: soc2013/ambarisha/head/usr.bin/dms/utils.c
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/utils.c	Thu Sep 26 13:01:43 2013	(r257722)
+++ soc2013/ambarisha/head/usr.bin/dms/utils.c	Thu Sep 26 13:03:30 2013	(r257723)
@@ -30,8 +30,10 @@
 	*(sndbuf + i) = msg.op;
 	i++;
 
-	memcpy(sndbuf + i, msg.buf, msg.len);
-	i += msg.len;
+	if (msg.len != 0) {
+		memcpy(sndbuf + i, msg.buf, msg.len);
+		i += msg.len;
+	}
 
 	int nbytes = write(socket, sndbuf, bufsize);
 	free(sndbuf);
@@ -79,6 +81,13 @@
 	}
 
 	bufsize -= sizeof(msg->op);
+	
+	/* This is to accommodate for 0 length messages */
+	if (bufsize == 0) {
+		msg->len = 0;
+		msg->buf = NULL;
+		return msg;
+	}
 
 	msg->buf = (char *) malloc(bufsize);
 	if (msg == NULL) {
@@ -118,3 +127,14 @@
 	free(*msg);
 	*msg = NULL;
 }
+
+/* Estimated seconds left for xs at its average rate so far; -1 if unknown */
+long
+get_eta(struct xferstat *xs)
+{
+	long elapsed = xs->last.tv_sec - xs->start.tv_sec;
+	long received = xs->rcvd - xs->offset;
+	long expected = xs->size - xs->rcvd;
+	/* Nothing received yet means no rate: avoid dividing by zero */
+	return received > 0 ? (long)((double) elapsed * expected / received) : -1;
+}

Modified: soc2013/ambarisha/head/usr.bin/dms/utils.h
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/utils.h	Thu Sep 26 13:01:43 2013	(r257722)
+++ soc2013/ambarisha/head/usr.bin/dms/utils.h	Thu Sep 26 13:03:30 2013	(r257723)
@@ -18,3 +18,6 @@
 
 void
 free_dmmsg(struct dmmsg **msg);
+
+long
+get_eta(struct xferstat *xs);

Modified: soc2013/ambarisha/head/usr.bin/dms/worker.c
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/worker.c	Thu Sep 26 13:01:43 2013	(r257722)
+++ soc2013/ambarisha/head/usr.bin/dms/worker.c	Thu Sep 26 13:03:30 2013	(r257723)
@@ -93,20 +93,11 @@
 }
 
 static long
-get_eta(struct dmjob *dmjob, struct dmmirr *dmmirr)
+mirr_eta(off_t size, struct dmmirr *dmmirr)
 {
-	long eta, elapsed, speed, received, expected;
-	if (dmmirr == dmjob->mirror) {
-		elapsed = dmjob->oldstat.last.tv_sec - dmjob->oldstat.start.tv_sec;
-		received = dmjob->oldstat.rcvd - dmjob->oldstat.offset;
-		expected = dmjob->oldstat.size - dmjob->oldstat.rcvd;
-		eta = (long)((double) elapsed * expected / received);
-	} else {
-		expected = dmjob->oldstat.size;
-		speed = get_average_speed(dmmirr);
-		eta = (long)((double) expected / speed);
-	}
-
+	long expected, speed, eta;
+	speed = get_average_speed(dmmirr);
+	eta = (long)((double) size / speed);
 	return eta;
 }
 
@@ -377,8 +368,9 @@
 	/* TODO: Not good assumes type of opaque pthread_t type
 	 *       Fix this by having a pthread_t -> id mapping
 	 */
-	unsigned int tid = (unsigned long)pthread_self();
-	char idstr[8];
+	unsigned int tid = (unsigned int)pthread_self();
+	char idstr[32];
+
 	sprintf(idstr, ".%u", tid);
 
 	char *tmpfn = (char *) malloc(strlen(fn) + strlen(idstr) + strlen(TMP_EXT) + 1);
@@ -410,11 +402,10 @@
 
 	tmp = jobs;
 	while (tmp != NULL) {
-		cureta = get_eta(tmp, tmp->mirror);
-		neweta = get_eta(tmp, dmmirr);
+		cureta = get_eta(&(tmp->oldstat));
+		neweta = mirr_eta(tmp->oldstat.size, dmmirr);
 	
 		if (neweta < cureta) {
-			/* notify the current owner worker to let go */
 			tmp->preempted = 1;
 			break;
 		}
@@ -456,6 +447,9 @@
 		tv.tv_usec = 0;
 
 		ret = select(dmjob->client + 1, &fds, NULL, NULL, &tv);
+		if (!FD_ISSET(dmjob->client, &fds))
+			break;
+
 		msg = recv_dmmsg(dmjob->client);
 		sig = (int *)msg->buf;	
 		if (*sig == SIGINT)
@@ -562,6 +556,7 @@
 	if (dmjob->preempted && dmjob->worker != tid)
 		goto preempted;
 	
+	
 	/* check that size is as expected */
 	/*if (dmreq->S_size) {
 		if (us.size == -1) {
@@ -770,6 +765,7 @@
 
 
 	stat_end(&xs, dmjob);
+	goto signal;
 
 preempted:
 	r = -1;
@@ -932,9 +928,9 @@
 	tmpreq.path = get_tmpfn(dmreq->path);
 	if (tmpreq.path == NULL)
 		goto done;
-
+	
 	tmpjob.ofd = open(tmpreq.path, O_RDWR | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR);
-
+		
 	FILE *f = fetchXGet(tmpjob.url, us, flags);
 	if (f == NULL) {
 		close(tmpjob.ofd);
@@ -946,6 +942,7 @@
 	ret = fetch(&tmpjob, f, *us);
 	if (ret == -1) {
 		f = NULL;
+		fprintf(stderr, "Failed now\n");
 		goto done;
 	}
 
@@ -984,6 +981,7 @@
 			/* Notify the client of the same */
 			return -1;
 		}
+
 		break;
 	case MD5_CHKSUM:
 		MD5_Init(&md5_ctx);
@@ -998,6 +996,7 @@
 			fprintf(stderr, "dms: checksum mismatch\n");
 			return -1;
 		}
+
 		break;
 	default:
 		break;
@@ -1045,6 +1044,7 @@
 {
 	struct dmrep report;
 	struct dmjob *tmp;
+	struct dmmirr *dmmirr;
 	struct url_stat us;
 	int ret;
 	FILE *f;
@@ -1086,7 +1086,7 @@
 	f = dmXGet(dmjob, &us);
 	if (f == NULL && dmjob->preempted && dmjob->worker != pthread_self())
 		return NULL;
-
+	
 	/* Acquire job queue lock */
 	ret = pthread_mutex_lock(&job_queue_mutex);
 	if (ret == -1) {
@@ -1095,6 +1095,7 @@
 		return NULL;
 	}
 
+
 	/* Serve any outstanding requests from the local tmp file */
 	tmp = jobs;
 	while (tmp != NULL) {
@@ -1129,18 +1130,24 @@
 	if (f != NULL) {
 		tmppath = get_tmpfn(dmjob->request->path);
 		if (tmppath != NULL) {
-			remove(tmppath);
+//			remove(tmppath);
 			free(tmppath);
 		}
 	}
 
+	dmmirr = dmjob->mirror;
+	
+	rm_dmreq(&(dmjob->request));
+	jobs = rm_job(jobs, dmjob);
 
 	/* Check if this worker can prempt any downloads */
-	dmjob = find_potential_job(dmjob->mirror);	
-	if (dmjob != NULL)
+	tmp = find_potential_job(dmmirr);	
+	if (tmp != NULL) {
+		dmjob = tmp;
 		goto start;
-	
-	release_mirror(dmjob->mirror);
+	}
+
+	release_mirror(dmmirr);
 	/* TODO : What is this? Yew!! */
 	sleep(10);
 }


More information about the svn-soc-all mailing list