svn commit: r226859 - head/sbin/hastd

Pawel Jakub Dawidek pjd at FreeBSD.org
Thu Oct 27 20:32:57 UTC 2011


Author: pjd
Date: Thu Oct 27 20:32:57 2011
New Revision: 226859
URL: http://svn.freebsd.org/changeset/base/226859

Log:
  Implement 'async' mode for HAST.
  
  MFC after:	3 days

Modified:
  head/sbin/hastd/hast.conf.5
  head/sbin/hastd/parse.y
  head/sbin/hastd/primary.c

Modified: head/sbin/hastd/hast.conf.5
==============================================================================
--- head/sbin/hastd/hast.conf.5	Thu Oct 27 20:23:03 2011	(r226858)
+++ head/sbin/hastd/hast.conf.5	Thu Oct 27 20:32:57 2011	(r226859)
@@ -28,7 +28,7 @@
 .\"
 .\" $FreeBSD$
 .\"
-.Dd May 20, 2011
+.Dd October 27, 2011
 .Dt HAST.CONF 5
 .Os
 .Sh NAME
@@ -224,9 +224,6 @@ completes.
 This is the fastest and the most dangerous replication mode.
 This mode should be used when replicating to a distant node where
 latency is too high for other modes.
-The
-.Ic async
-replication mode is currently not implemented.
 .El
 .It Ic checksum Aq algorithm
 .Pp

Modified: head/sbin/hastd/parse.y
==============================================================================
--- head/sbin/hastd/parse.y	Thu Oct 27 20:23:03 2011	(r226858)
+++ head/sbin/hastd/parse.y	Thu Oct 27 20:32:57 2011	(r226859)
@@ -301,11 +301,9 @@ yy_config_parse(const char *config, bool
 			 */
 			curres->hr_replication = depth0_replication;
 		}
-		if (curres->hr_replication == HAST_REPLICATION_MEMSYNC ||
-		    curres->hr_replication == HAST_REPLICATION_ASYNC) {
+		if (curres->hr_replication == HAST_REPLICATION_MEMSYNC) {
 			pjdlog_warning("Replication mode \"%s\" is not implemented, falling back to \"%s\".",
-			    curres->hr_replication == HAST_REPLICATION_MEMSYNC ?
-			    "memsync" : "async", "fullsync");
+			    "memsync", "fullsync");
 			curres->hr_replication = HAST_REPLICATION_FULLSYNC;
 		}
 		if (curres->hr_checksum == -1) {

Modified: head/sbin/hastd/primary.c
==============================================================================
--- head/sbin/hastd/primary.c	Thu Oct 27 20:23:03 2011	(r226858)
+++ head/sbin/hastd/primary.c	Thu Oct 27 20:32:57 2011	(r226859)
@@ -89,6 +89,15 @@ struct hio {
 	 * Structure used to communicate with GEOM Gate class.
 	 */
 	struct g_gate_ctl_io	 hio_ggio;
+	/*
+	 * Request was already confirmed to GEOM Gate.
+	 */
+	bool			 hio_done;
+	/*
+	 * Remember the replication mode in effect when the request was
+	 * initiated, so we won't get confused if the mode changes on reload.
+	 */
+	int			 hio_replication;
 	TAILQ_ENTRY(hio)	*hio_next;
 };
 #define	hio_free_next	hio_next[0]
@@ -1056,6 +1065,42 @@ remote_close(struct hast_resource *res, 
 }
 
 /*
+ * Acknowledge write completion to the kernel, but don't update activemap yet.
+ */
+static void
+write_complete(struct hast_resource *res, struct hio *hio)
+{
+	struct g_gate_ctl_io *ggio;
+	unsigned int ncomp;
+
+	PJDLOG_ASSERT(!hio->hio_done);
+
+	ggio = &hio->hio_ggio;
+	PJDLOG_ASSERT(ggio->gctl_cmd == BIO_WRITE);
+
+	/*
+	 * Bump local count if this is the first write after
+	 * connection failure with the remote node.
+	 */
+	ncomp = 1;
+	rw_rlock(&hio_remote_lock[ncomp]);
+	if (!ISCONNECTED(res, ncomp)) {
+		mtx_lock(&metadata_lock);
+		if (res->hr_primary_localcnt == res->hr_secondary_remotecnt) {
+			res->hr_primary_localcnt++;
+			pjdlog_debug(1, "Increasing localcnt to %ju.",
+			    (uintmax_t)res->hr_primary_localcnt);
+			(void)metadata_write(res);
+		}
+		mtx_unlock(&metadata_lock);
+	}
+	rw_unlock(&hio_remote_lock[ncomp]);
+	if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) < 0)
+		primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed");
+	hio->hio_done = true;
+}
+
+/*
  * Thread receives ggate I/O requests from the kernel and passes them to
  * appropriate threads:
  * WRITE - always goes to both local_send and remote_send threads
@@ -1075,8 +1120,6 @@ ggate_recv_thread(void *arg)
 	unsigned int ii, ncomp, ncomps;
 	int error;
 
-	ncomps = HAST_NCOMPONENTS;
-
 	for (;;) {
 		pjdlog_debug(2, "ggate_recv: Taking free request.");
 		QUEUE_TAKE2(hio, free);
@@ -1085,6 +1128,8 @@ ggate_recv_thread(void *arg)
 		ggio->gctl_unit = res->hr_ggateunit;
 		ggio->gctl_length = MAXPHYS;
 		ggio->gctl_error = 0;
+		hio->hio_done = false;
+		hio->hio_replication = res->hr_replication;
 		pjdlog_debug(2,
 		    "ggate_recv: (%p) Waiting for request from the kernel.",
 		    hio);
@@ -1117,11 +1162,16 @@ ggate_recv_thread(void *arg)
 			primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.",
 			    strerror(error));
 		}
+
+		ncomp = 0;
+		ncomps = HAST_NCOMPONENTS;
+
 		for (ii = 0; ii < ncomps; ii++)
 			hio->hio_errors[ii] = EINVAL;
 		reqlog(LOG_DEBUG, 2, ggio,
 		    "ggate_recv: (%p) Request received from the kernel: ",
 		    hio);
+
 		/*
 		 * Inform all components about new write request.
 		 * For read request prefer local component unless the given
@@ -1130,10 +1180,7 @@ ggate_recv_thread(void *arg)
 		switch (ggio->gctl_cmd) {
 		case BIO_READ:
 			res->hr_stat_read++;
-			pjdlog_debug(2,
-			    "ggate_recv: (%p) Moving request to the send queue.",
-			    hio);
-			refcount_init(&hio->hio_countdown, 1);
+			ncomps = 1;
 			mtx_lock(&metadata_lock);
 			if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF ||
 			    res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
@@ -1155,7 +1202,6 @@ ggate_recv_thread(void *arg)
 				ncomp = 1;
 			}
 			mtx_unlock(&metadata_lock);
-			QUEUE_INSERT1(hio, send, ncomp);
 			break;
 		case BIO_WRITE:
 			res->hr_stat_write++;
@@ -1198,25 +1244,19 @@ ggate_recv_thread(void *arg)
 				(void)hast_activemap_flush(res);
 			}
 			mtx_unlock(&res->hr_amp_lock);
-			/* FALLTHROUGH */
+			break;
 		case BIO_DELETE:
+			res->hr_stat_delete++;
+			break;
 		case BIO_FLUSH:
-			switch (ggio->gctl_cmd) {
-			case BIO_DELETE:
-				res->hr_stat_delete++;
-				break;
-			case BIO_FLUSH:
-				res->hr_stat_flush++;
-				break;
-			}
-			pjdlog_debug(2,
-			    "ggate_recv: (%p) Moving request to the send queue.",
-			    hio);
-			refcount_init(&hio->hio_countdown, ncomps);
-			for (ii = 0; ii < ncomps; ii++)
-				QUEUE_INSERT1(hio, send, ii);
+			res->hr_stat_flush++;
 			break;
 		}
+		pjdlog_debug(2,
+		    "ggate_recv: (%p) Moving request to the send queues.", hio);
+		refcount_init(&hio->hio_countdown, ncomps);
+		for (ii = ncomp; ii < ncomps; ii++)
+			QUEUE_INSERT1(hio, send, ii);
 	}
 	/* NOTREACHED */
 	return (NULL);
@@ -1285,6 +1325,11 @@ local_send_thread(void *arg)
 				    ret, (intmax_t)ggio->gctl_length);
 			} else {
 				hio->hio_errors[ncomp] = 0;
+				if (hio->hio_replication ==
+				    HAST_REPLICATION_ASYNC) {
+					ggio->gctl_error = 0;
+					write_complete(res, hio);
+				}
 			}
 			break;
 		case BIO_DELETE:
@@ -1668,7 +1713,7 @@ ggate_send_thread(void *arg)
 	struct hast_resource *res = arg;
 	struct g_gate_ctl_io *ggio;
 	struct hio *hio;
-	unsigned int ii, ncomp, ncomps;
+	unsigned int ii, ncomps;
 
 	ncomps = HAST_NCOMPONENTS;
 
@@ -1718,28 +1763,14 @@ ggate_send_thread(void *arg)
 			if (range_sync_wait)
 				cv_signal(&range_sync_cond);
 			mtx_unlock(&range_lock);
-			/*
-			 * Bump local count if this is first write after
-			 * connection failure with remote node.
-			 */
-			ncomp = 1;
-			rw_rlock(&hio_remote_lock[ncomp]);
-			if (!ISCONNECTED(res, ncomp)) {
-				mtx_lock(&metadata_lock);
-				if (res->hr_primary_localcnt ==
-				    res->hr_secondary_remotecnt) {
-					res->hr_primary_localcnt++;
-					pjdlog_debug(1,
-					    "Increasing localcnt to %ju.",
-					    (uintmax_t)res->hr_primary_localcnt);
-					(void)metadata_write(res);
-				}
-				mtx_unlock(&metadata_lock);
+			if (!hio->hio_done)
+				write_complete(res, hio);
+		} else {
+			if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) < 0) {
+				primary_exit(EX_OSERR,
+				    "G_GATE_CMD_DONE failed");
 			}
-			rw_unlock(&hio_remote_lock[ncomp]);
 		}
-		if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) < 0)
-			primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed");
 		pjdlog_debug(2,
 		    "ggate_send: (%p) Moving request to the free queue.", hio);
 		QUEUE_INSERT2(hio, free);
@@ -1892,6 +1923,8 @@ sync_thread(void *arg __unused)
 		ggio->gctl_offset = offset;
 		ggio->gctl_length = length;
 		ggio->gctl_error = 0;
+		hio->hio_done = false;
+		hio->hio_replication = res->hr_replication;
 		for (ii = 0; ii < ncomps; ii++)
 			hio->hio_errors[ii] = EINVAL;
 		reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ",
@@ -2080,8 +2113,7 @@ primary_config_reload(struct hast_resour
 	 * Don't bother if we need to reconnect.
 	 */
 	if ((modified & MODIFIED_TIMEOUT) != 0 &&
-	    (modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR |
-	    MODIFIED_REPLICATION)) == 0) {
+	    (modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) == 0) {
 		for (ii = 0; ii < ncomps; ii++) {
 			if (!ISREMOTE(ii))
 				continue;
@@ -2103,8 +2135,7 @@ primary_config_reload(struct hast_resour
 			}
 		}
 	}
-	if ((modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR |
-	    MODIFIED_REPLICATION)) != 0) {
+	if ((modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) != 0) {
 		for (ii = 0; ii < ncomps; ii++) {
 			if (!ISREMOTE(ii))
 				continue;


More information about the svn-src-head mailing list