socsvn commit: r257720 - soc2013/ambarisha/head/usr.bin/dms
ambarisha at FreeBSD.org
ambarisha at FreeBSD.org
Thu Sep 26 12:32:13 UTC 2013
Author: ambarisha
Date: Thu Sep 26 12:32:12 2013
New Revision: 257720
URL: http://svnweb.FreeBSD.org/socsvn/?view=rev&rev=257720
Log:
Job migration first changes
Modified:
soc2013/ambarisha/head/usr.bin/dms/dms.c
soc2013/ambarisha/head/usr.bin/dms/dms.h
soc2013/ambarisha/head/usr.bin/dms/mirror.c
soc2013/ambarisha/head/usr.bin/dms/mirror.h
soc2013/ambarisha/head/usr.bin/dms/worker.c
Modified: soc2013/ambarisha/head/usr.bin/dms/dms.c
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/dms.c Thu Sep 26 12:30:28 2013 (r257719)
+++ soc2013/ambarisha/head/usr.bin/dms/dms.c Thu Sep 26 12:32:12 2013 (r257720)
@@ -144,6 +144,7 @@
dmjob->siginfo = 0;
dmjob->siginfo_en = 0;
dmjob->state = RUNNING;
+ dmjob->preempted = 0;
dmjob->url = NULL;
return dmjob;
}
Modified: soc2013/ambarisha/head/usr.bin/dms/dms.h
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/dms.h Thu Sep 26 12:30:28 2013 (r257719)
+++ soc2013/ambarisha/head/usr.bin/dms/dms.h Thu Sep 26 12:32:12 2013 (r257720)
@@ -7,6 +7,8 @@
#define MINBUFSIZE 4096
#define MAX_SAMPLES 256
+#include "dm.h"
+
struct dmjob {
int ofd;
int client;
@@ -15,6 +17,7 @@
int siginfo;
int siginfo_en;
unsigned timeout;
+ int preempted;
enum {
RUNNING = 0,
@@ -26,6 +29,7 @@
struct dmreq *request;
struct url *url;
struct dmmirr *mirror;
+ struct xferstat oldstat;
struct dmjob *next;
struct dmjob *prev;
Modified: soc2013/ambarisha/head/usr.bin/dms/mirror.c
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/mirror.c Thu Sep 26 12:30:28 2013 (r257719)
+++ soc2013/ambarisha/head/usr.bin/dms/mirror.c Thu Sep 26 12:32:12 2013 (r257720)
@@ -103,6 +103,7 @@
/* TODO: What if fscanf fails? */
}
+ mirror->nconns = 0;
return mirror;
}
@@ -168,7 +169,7 @@
/* Profile list lock */
ret = pthread_mutex_lock(&mirror_list_mutex);
if (ret == -1) {
- fprintf(stderr, "get_mirror: Attempt to acquire"
+ fprintf(stderr, "load_mirrors: Attempt to acquire"
" profile list mutex failed\n");
return -1;
}
@@ -181,7 +182,7 @@
ret = pthread_mutex_unlock(&mirror_list_mutex);
if (ret == -1) {
- fprintf(stderr, "get_mirror: Couldn't release "
+ fprintf(stderr, "load_mirrors: Couldn't release "
"profile list lock\n");
return -1;
}
@@ -202,7 +203,7 @@
/* Profile list lock */
ret = pthread_mutex_lock(&mirror_list_mutex);
if (ret == -1) {
- fprintf(stderr, "get_mirror: Attempt to acquire"
+ fprintf(stderr, "save_mirrors: Attempt to acquire"
" profile list mutex failed\n");
return -1;
}
@@ -214,7 +215,7 @@
ret = pthread_mutex_unlock(&mirror_list_mutex);
if (ret == -1) {
- fprintf(stderr, "get_mirror: Couldn't release "
+ fprintf(stderr, "save_mirrors: Couldn't release "
"profile list lock\n");
return -1;
}
@@ -229,6 +230,7 @@
{
struct timeval tv;
double speed;
+ int ret;
gettimeofday(&tv, NULL);
if (tv.tv_sec - dmmirr->timestamps[dmmirr->index].tv_sec < 60)
@@ -236,11 +238,54 @@
speed = get_speed(xs);
+ /* Profile list lock */
+ ret = pthread_mutex_lock(&mirror_list_mutex);
+ if (ret == -1) {
+ fprintf(stderr, "update_mirror: Attempt to acquire"
+ " profile list mutex failed\n");
+ return;
+ }
+
/* TODO: This assumes that workers and sites have 1-1 correspondence */
dmmirr->index = (dmmirr->index + 1) % MAX_SAMPLES;
dmmirr->timestamps[dmmirr->index] = tv;
dmmirr->samples[dmmirr->index] = speed;
dmmirr->remark = ACTIVE;
+
+ ret = pthread_mutex_unlock(&mirror_list_mutex);
+ if (ret == -1) {
+ fprintf(stderr, "update_mirror: Couldn't release "
+ "profile list lock\n");
+ return;
+ }
+ /* Profile list lock released */
+}
+
+/*
+ * Return the running mean of dmmirr's recorded speed samples, walking
+ * the ring buffer backwards from the newest slot (dmmirr->index, which
+ * update_mirror() writes last).  The walk stops at the first sample
+ * older than one week or after a full lap of MAX_SAMPLES slots.
+ * Returns 0.0 when even the newest sample is older than a week.
+ *
+ * This does not take mirror_list_mutex: get_mirror() calls it while
+ * already holding that lock.
+ */
+double
+get_average_speed(struct dmmirr *dmmirr)
+{
+	int i, cnt;
+	double average;
+	struct timeval now;
+	long week_sec;
+
+	week_sec = 7 * 24 * 60 * 60;
+
+	/* The cutoff is computed once; re-reading the clock per sample is unnecessary. */
+	gettimeofday(&now, NULL);
+
+	i = dmmirr->index;
+	cnt = 0;
+	average = 0.0;
+
+	do {
+		if (dmmirr->timestamps[i].tv_sec < now.tv_sec - week_sec)
+			break;
+		average = (average * cnt + dmmirr->samples[i]) / (cnt + 1);
+		cnt++;
+
+		/*
+		 * Step back one slot.  In C, (i - 1) % MAX_SAMPLES is -1 when
+		 * i is 0, which would index before the arrays, so add
+		 * MAX_SAMPLES first to keep the result in range.
+		 */
+		i = (i + MAX_SAMPLES - 1) % MAX_SAMPLES;
+	} while (i != dmmirr->index);
+
+	return average;
 }
struct dmmirr *
@@ -249,12 +294,8 @@
struct dmmirr *cur, *tmp;
double tmpmax = -1.0;
int cnt, ret, i;
- struct timeval now;
- long week_sec;
double average;
- week_sec = 7 * 24 * 60 * 60;
-
/* Profile list lock */
ret = pthread_mutex_lock(&mirror_list_mutex);
if (ret == -1) {
@@ -277,19 +318,7 @@
if (cur->nconns > MAX_CONNS)
goto next;
-
- i = cur->index;
- cnt = 0;
- average = 0.0;
- do {
- gettimeofday(&now, NULL);
- if (cur->timestamps[i].tv_sec < now.tv_sec - week_sec)
- break;
- average = (average * cnt + cur->samples[i]) / (cnt + 1);
- cnt++;
-
- i = (i - 1) % MAX_SAMPLES;
- } while (i != cur->index);
+ average = get_average_speed(cur);
if (average > tmpmax) {
tmpmax = average;
@@ -298,10 +327,10 @@
next:
cur = cur->next;
}
-
/* TODO: If we couldn't pick up a mirror? */
success:
+ tmp->nconns++;
ret = pthread_mutex_unlock(&mirror_list_mutex);
if (ret == -1) {
fprintf(stderr, "get_mirror: Couldn't release "
@@ -309,6 +338,32 @@
return NULL;
}
/* Profile list lock released */
-
+
return tmp;
}
+
+/*
+ * Drop one connection reference on dmmirr, undoing the nconns++ that
+ * get_mirror() performs when it hands the mirror out.
+ *
+ * Returns 0 on success, or -1 if dmmirr is NULL or mirror_list_mutex
+ * could not be acquired or released.
+ */
+int
+release_mirror(struct dmmirr *dmmirr)
+{
+	int ret;
+
+	/* get_mirror() may fail to pick a mirror; nothing to release then. */
+	if (dmmirr == NULL)
+		return -1;
+
+	/* Profile list lock */
+	ret = pthread_mutex_lock(&mirror_list_mutex);
+	/* pthread_mutex_* report failure as a nonzero error number, not -1. */
+	if (ret != 0) {
+		fprintf(stderr, "release_mirror: Attempt to acquire"
+		    " profile list mutex failed\n");
+		return -1;
+	}
+
+	dmmirr->nconns--;
+
+	ret = pthread_mutex_unlock(&mirror_list_mutex);
+	if (ret != 0) {
+		fprintf(stderr, "release_mirror: Couldn't release "
+		    "profile list lock\n");
+		return -1;
+	}
+	/* Profile list lock released */
+
+	return 0;
+}
Modified: soc2013/ambarisha/head/usr.bin/dms/mirror.h
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/mirror.h Thu Sep 26 12:30:28 2013 (r257719)
+++ soc2013/ambarisha/head/usr.bin/dms/mirror.h Thu Sep 26 12:32:12 2013 (r257720)
@@ -8,5 +8,7 @@
int save_mirrors(void);
void update_mirror(struct dmmirr *, struct xferstat *);
struct dmmirr *get_mirror(void);
+int release_mirror(struct dmmirr *);
+double get_average_speed(struct dmmirr *);
#endif
Modified: soc2013/ambarisha/head/usr.bin/dms/worker.c
==============================================================================
--- soc2013/ambarisha/head/usr.bin/dms/worker.c Thu Sep 26 12:30:28 2013 (r257719)
+++ soc2013/ambarisha/head/usr.bin/dms/worker.c Thu Sep 26 12:32:12 2013 (r257720)
@@ -92,6 +92,24 @@
return strcmp(j1->request->URL, j2->request->URL);
}
+/*
+ * Estimate, in seconds, how long dmjob still needs if served by dmmirr.
+ *
+ * For the mirror the job already uses, extrapolate from the progress
+ * recorded in dmjob->oldstat: elapsed time scaled by remaining/received
+ * bytes.  For any other mirror the estimate assumes the whole file
+ * (oldstat.size) is fetched again at that mirror's recent average speed.
+ *
+ * When no estimate can be made (no bytes received yet, no speed samples,
+ * a missing mirror, or a negative size) the largest long is returned.
+ * This keeps an unknown ETA from ever comparing as better than a known
+ * one, and avoids dividing by zero.
+ */
+static long
+get_eta(struct dmjob *dmjob, struct dmmirr *dmmirr)
+{
+	/* LONG_MAX, spelled without depending on <limits.h>. */
+	const long unknown = (long)(~0UL >> 1);
+	long elapsed, received, expected;
+	double speed, eta;
+
+	/* NOTE(review): presumably negative when the size is not known -- confirm. */
+	if (dmjob->oldstat.size < 0)
+		return unknown;
+
+	if (dmmirr == dmjob->mirror) {
+		elapsed = dmjob->oldstat.last.tv_sec -
+		    dmjob->oldstat.start.tv_sec;
+		received = dmjob->oldstat.rcvd - dmjob->oldstat.offset;
+		expected = dmjob->oldstat.size - dmjob->oldstat.rcvd;
+		/* Nothing received (or no end timestamp): no rate to extrapolate. */
+		if (received <= 0 || elapsed < 0)
+			return unknown;
+		eta = (double)elapsed * expected / received;
+	} else {
+		if (dmmirr == NULL)
+			return unknown;
+		expected = dmjob->oldstat.size;
+		/* Keep the fractional speed; truncating to long can give 0. */
+		speed = get_average_speed(dmmirr);
+		if (speed <= 0.0)
+			return unknown;
+		eta = expected / speed;
+	}
+
+	/* Converting an out-of-range double to long is undefined; clamp. */
+	if (eta >= (double)unknown)
+		return unknown;
+	return (long)eta;
+}
+
static void
stat_send(struct xferstat *xs, int force)
{
@@ -177,6 +195,15 @@
stat_start(struct xferstat *xs, const char *name, off_t size,
off_t offset, struct dmjob *dmjob)
{
+ /* If there is no absolute progress because of a premption,
+ * do nothing. Otherwise update status incase there's a
+ * preemption later
+ */
+
+ if (dmjob->preempted != 0 &&
+ dmjob->oldstat.rcvd > xs->rcvd)
+ return;
+
snprintf(xs->name, sizeof xs->name, "%s", name);
gettimeofday(&xs->start, NULL);
xs->last.tv_sec = xs->last.tv_usec = 0;
@@ -185,7 +212,8 @@
xs->rcvd = offset;
xs->lastrcvd = offset;
- update_mirror(dmjob->mirror , xs);
+ update_mirror(dmjob->mirror, xs);
+ dmjob->oldstat = *xs;
if ((dmjob->request->flags & V_TTY) && dmjob->request->v_level > 0)
stat_send(xs, 1);
@@ -196,6 +224,16 @@
static void
stat_end(struct xferstat *xs, struct dmjob *dmjob)
{
+ /* If there is no absolute progress because of a premption,
+ * do nothing. Otherwise update status incase there's a
+ * preemption later
+ */
+
+ if (dmjob->preempted != 0 &&
+ dmjob->oldstat.rcvd > xs->rcvd)
+ return;
+ dmjob->oldstat = *xs;
+
gettimeofday(&xs->last, NULL);
update_mirror(dmjob->mirror , xs);
if ((dmjob->request->flags & V_TTY) && dmjob->request->v_level > 0) {
@@ -210,7 +248,18 @@
static void
stat_update(struct xferstat *xs, off_t rcvd, struct dmjob *dmjob)
{
+ /* If there is no absolute progress because of a premption,
+ * do nothing. Otherwise update status incase there's a
+ * preemption later
+ */
+
+ if (dmjob->preempted != 0 &&
+ dmjob->oldstat.rcvd > xs->rcvd)
+ return;
+
xs->rcvd = rcvd;
+ dmjob->oldstat = *xs;
+
update_mirror(dmjob->mirror , xs);
if ((dmjob->request->flags & V_TTY) && dmjob->request->v_level > 0)
stat_send(xs, 0);
@@ -231,6 +280,9 @@
struct dmreq *dmreq = dmjob->request;
struct stat sb;
int r;
+
+ /* Init flags */
+ *flags = '\0';
if (dmjob->url != NULL)
return 0;
@@ -319,6 +371,46 @@
return (r);
}
+/*
+ * Look for a queued job whose estimated completion time would be
+ * shorter on dmmirr than on the mirror it is currently using.  The
+ * first such job is flagged as preempted (so its current owner can
+ * let go) and returned, under job_queue_mutex, for the caller to take
+ * over.
+ *
+ * Returns NULL when no job qualifies or when job_queue_mutex cannot be
+ * acquired or released.  A caller that needs its own mirror afterwards
+ * must keep a separate reference to it rather than reading it through
+ * the (possibly NULL) return value.
+ *
+ * NOTE(review): get_eta() reads mirror samples without mirror_list_mutex
+ * here; confirm whether that race is acceptable.
+ */
+static struct dmjob *
+find_potential_job(struct dmmirr *dmmirr)
+{
+	int ret;
+	long cureta, neweta;
+	struct dmjob *tmp;
+
+	/* Acquire job queue lock */
+	ret = pthread_mutex_lock(&job_queue_mutex);
+	/* pthread_mutex_* report failure as a nonzero error number, not -1. */
+	if (ret != 0) {
+		fprintf(stderr, "find_potential_job: Attempt to acquire"
+		    " job queue mutex failed\n");
+		return NULL;
+	}
+
+	for (tmp = jobs; tmp != NULL; tmp = tmp->next) {
+		/*
+		 * preempted is only set below, so a flagged job has already
+		 * been handed to another worker; claiming it again would make
+		 * two workers restart the same download.
+		 */
+		if (tmp->preempted != 0)
+			continue;
+
+		cureta = get_eta(tmp, tmp->mirror);
+		neweta = get_eta(tmp, dmmirr);
+
+		if (neweta < cureta) {
+			/* notify the current owner worker to let go */
+			tmp->preempted = 1;
+			break;
+		}
+	}
+
+	ret = pthread_mutex_unlock(&job_queue_mutex);
+	if (ret != 0) {
+		fprintf(stderr, "find_potential_job: Couldn't release "
+		    "job queue lock\n");
+		return NULL;
+	}
+	/* Job queue lock released */
+
+	return tmp;
+}
+
static int
check_signal(int signum, struct dmjob *dmjob)
{
@@ -926,6 +1018,7 @@
return NULL;
}
+ /* check if this is a duplicate */
tmp = jobs;
while (tmp != NULL) {
if (tmp != dmjob && compare_jobs(tmp, dmjob) == 0) {
@@ -940,12 +1033,11 @@
if (ret == -1) {
fprintf(stderr, "handle_request: Couldn't release "
"job queue lock\n");
-
return NULL;
}
/* Job queue lock released */
- /* check if this is a duplicate */
+start:
ret = mk_url(dmjob, flags);
dmjob->worker = pthread_self();
@@ -968,11 +1060,10 @@
continue;
}
- if (f == NULL) {
+ if (f == NULL)
ret = -1;
- } else {
+ else
ret = validate_and_copy(dmjob, f, us);
- }
report.status = ret;
report.errcode = fetchLastErrCode;
@@ -993,14 +1084,22 @@
/* remove the local tmp file */
if (f != NULL) {
- tmppath = (char *) malloc(strlen(dmjob->request->path) + strlen(TMP_EXT));
+ tmppath = (char *) malloc(strlen(dmjob->request->path) +
+ strlen(TMP_EXT));
strcpy(tmppath, dmjob->request->path);
strcat(tmppath, TMP_EXT);
remove(tmppath);
free(tmppath);
}
+
+
+ /* Check if this worker can prempt any downloads */
+ dmjob = find_potential_job(dmjob->mirror);
+ if (dmjob != NULL)
+ goto start;
+ release_mirror(dmjob->mirror);
/* TODO : What is this? Yew!! */
sleep(10);
}
More information about the svn-soc-all
mailing list