svn commit: r246922 - head/sbin/hastd

Pawel Jakub Dawidek pjd at FreeBSD.org
Sun Feb 17 21:12:36 UTC 2013


Author: pjd
Date: Sun Feb 17 21:12:34 2013
New Revision: 246922
URL: http://svnweb.freebsd.org/changeset/base/246922

Log:
  - Add support for 'memsync' mode. This is the fastest replication mode, which
    is why it is now the default.
  - Bump protocol version to 2 and add backward compatibility for version 1.
  - Allow hosts to be specified by kern.hostid as well (in addition to hostname
    and kern.hostuuid) in the configuration file.
  
  Sponsored by:	Panzura
  Tested by:	trociny

Added:
  head/sbin/hastd/refcnt.h   (contents, props changed)
Modified:
  head/sbin/hastd/hast.conf.5
  head/sbin/hastd/hast.h
  head/sbin/hastd/hast_proto.c
  head/sbin/hastd/hastd.c
  head/sbin/hastd/parse.y
  head/sbin/hastd/primary.c
  head/sbin/hastd/secondary.c

Modified: head/sbin/hastd/hast.conf.5
==============================================================================
--- head/sbin/hastd/hast.conf.5	Sun Feb 17 20:35:54 2013	(r246921)
+++ head/sbin/hastd/hast.conf.5	Sun Feb 17 21:12:34 2013	(r246922)
@@ -129,9 +129,13 @@ The
 .Aq node
 argument can be replaced either by a full hostname as obtained by
 .Xr gethostname 3 ,
-only first part of the hostname, or by node's UUID as found in the
+only first part of the hostname, by node's UUID as found in the
 .Va kern.hostuuid
 .Xr sysctl 8
+variable
+or by node's hostid as found in the
+.Va kern.hostid
+.Xr sysctl 8
 variable.
 .Pp
 The following statements are available:
@@ -208,15 +212,12 @@ to the application was lost.
 The risk of such a situation is very small.
 The
 .Ic memsync
-replication mode is currently not implemented.
+replication mode is the default.
 .It Ic fullsync
 .Pp
 Mark the write operation as completed when local as well as remote
 write completes.
 This is the safest and the slowest replication mode.
-The
-.Ic fullsync
-replication mode is the default.
 .It Ic async
 .Pp
 The write operation is reported as complete right after the local write

Modified: head/sbin/hastd/hast.h
==============================================================================
--- head/sbin/hastd/hast.h	Sun Feb 17 20:35:54 2013	(r246921)
+++ head/sbin/hastd/hast.h	Sun Feb 17 21:12:34 2013	(r246922)
@@ -53,8 +53,9 @@
  * Version history:
  * 0 - initial version
  * 1 - HIO_KEEPALIVE added
+ * 2 - "memsync" and "received" attributes added for memsync mode
  */
-#define	HAST_PROTO_VERSION	1
+#define	HAST_PROTO_VERSION	2
 
 #define	EHAST_OK		0
 #define	EHAST_NOENTRY		1
@@ -142,8 +143,10 @@ struct hastd_config {
 struct hast_resource {
 	/* Resource name. */
 	char	hr_name[NAME_MAX];
-	/* Replication mode (HAST_REPLICATION_*). */
+	/* Negotiated replication mode (HAST_REPLICATION_*). */
 	int	hr_replication;
+	/* Configured replication mode (HAST_REPLICATION_*). */
+	int	hr_original_replication;
 	/* Provider name that will appear in /dev/hast/. */
 	char	hr_provname[NAME_MAX];
 	/* Synchronization extent size. */
@@ -156,6 +159,8 @@ struct hast_resource {
 	int	hr_compression;
 	/* Checksum algorithm. */
 	int	hr_checksum;
+	/* Protocol version. */
+	int	hr_version;
 
 	/* Path to local component. */
 	char	hr_localpath[PATH_MAX];

Modified: head/sbin/hastd/hast_proto.c
==============================================================================
--- head/sbin/hastd/hast_proto.c	Sun Feb 17 20:35:54 2013	(r246921)
+++ head/sbin/hastd/hast_proto.c	Sun Feb 17 21:12:34 2013	(r246922)
@@ -112,7 +112,7 @@ hast_proto_send(const struct hast_resour
 	if (eb == NULL)
 		goto end;
 
-	hdr.version = HAST_PROTO_VERSION;
+	hdr.version = res != NULL ? res->hr_version : HAST_PROTO_VERSION;
 	hdr.size = htole32((uint32_t)ebuf_size(eb));
 	if (ebuf_add_head(eb, &hdr, sizeof(hdr)) == -1)
 		goto end;
@@ -144,7 +144,7 @@ hast_proto_recv_hdr(const struct proto_c
 	if (proto_recv(conn, &hdr, sizeof(hdr)) == -1)
 		goto fail;
 
-	if (hdr.version != HAST_PROTO_VERSION) {
+	if (hdr.version > HAST_PROTO_VERSION) {
 		errno = ERPCMISMATCH;
 		goto fail;
 	}

Modified: head/sbin/hastd/hastd.c
==============================================================================
--- head/sbin/hastd/hastd.c	Sun Feb 17 20:35:54 2013	(r246921)
+++ head/sbin/hastd/hastd.c	Sun Feb 17 21:12:34 2013	(r246922)
@@ -68,7 +68,7 @@ static struct hastd_config *cfg;
 bool sigexit_received = false;
 /* Path to pidfile. */
 static const char *pidfile;
-/* PID file handle. */
+/* Pidfile handle. */
 struct pidfh *pfh;
 /* Do we run in foreground? */
 static bool foreground;
@@ -748,6 +748,7 @@ listen_accept(struct hastd_listen *lst)
 	const char *resname;
 	const unsigned char *token;
 	char laddr[256], raddr[256];
+	uint8_t version;
 	size_t size;
 	pid_t pid;
 	int status;
@@ -797,6 +798,20 @@ listen_accept(struct hastd_listen *lst)
 		goto close;
 	}
 	pjdlog_debug(2, "%s: resource=%s", raddr, resname);
+	version = nv_get_uint8(nvin, "version");
+	pjdlog_debug(2, "%s: version=%hhu", raddr, version);
+	if (version == 0) {
+		/*
+		 * If no version is sent, it means this is protocol version 1.
+		 */
+		version = 1;
+	}
+	if (version > HAST_PROTO_VERSION) {
+		pjdlog_info("Remote protocol version %hhu is not supported, falling back to version %hhu.",
+		    version, (unsigned char)HAST_PROTO_VERSION);
+		version = HAST_PROTO_VERSION;
+	}
+	pjdlog_debug(1, "Negotiated protocol version %hhu.", version);
 	token = nv_get_uint8_array(nvin, &size, "token");
 	/*
 	 * NULL token means that this is first connection.
@@ -910,8 +925,10 @@ listen_accept(struct hastd_listen *lst)
 	 */
 
 	if (token == NULL) {
+		res->hr_version = version;
 		arc4random_buf(res->hr_token, sizeof(res->hr_token));
 		nvout = nv_alloc();
+		nv_add_uint8(nvout, version, "version");
 		nv_add_uint8_array(nvout, res->hr_token,
 		    sizeof(res->hr_token), "token");
 		if (nv_error(nvout) != 0) {
@@ -922,7 +939,7 @@ listen_accept(struct hastd_listen *lst)
 			    strerror(nv_error(nvout)));
 			goto fail;
 		}
-		if (hast_proto_send(NULL, conn, nvout, NULL, 0) == -1) {
+		if (hast_proto_send(res, conn, nvout, NULL, 0) == -1) {
 			int error = errno;
 
 			pjdlog_errno(LOG_ERR, "Unable to send response to %s",

Modified: head/sbin/hastd/parse.y
==============================================================================
--- head/sbin/hastd/parse.y	Sun Feb 17 20:35:54 2013	(r246921)
+++ head/sbin/hastd/parse.y	Sun Feb 17 21:12:34 2013	(r246922)
@@ -236,6 +236,7 @@ replication_statement:	REPLICATION repli
 		case 1:
 			PJDLOG_ASSERT(curres != NULL);
 			curres->hr_replication = $2;
+			curres->hr_original_replication = $2;
 			break;
 		default:
 			PJDLOG_ABORT("replication at wrong depth level");
@@ -533,8 +534,10 @@ resource_start:	STR
 		curres->hr_role = HAST_ROLE_INIT;
 		curres->hr_previous_role = HAST_ROLE_INIT;
 		curres->hr_replication = -1;
+		curres->hr_original_replication = -1;
 		curres->hr_checksum = -1;
 		curres->hr_compression = -1;
+		curres->hr_version = 1;
 		curres->hr_timeout = -1;
 		curres->hr_exec[0] = '\0';
 		curres->hr_provname[0] = '\0';
@@ -724,6 +727,7 @@ static int
 isitme(const char *name)
 {
 	char buf[MAXHOSTNAMELEN];
+	unsigned long hostid;
 	char *pos;
 	size_t bufsize;
 
@@ -738,7 +742,7 @@ isitme(const char *name)
 		return (1);
 
 	/*
-	 * Now check if it matches first part of the host name.
+	 * Check if it matches first part of the host name.
 	 */
 	pos = strchr(buf, '.');
 	if (pos != NULL && (size_t)(pos - buf) == strlen(name) &&
@@ -747,7 +751,7 @@ isitme(const char *name)
 	}
 
 	/*
-	 * At the end check if name is equal to our host's UUID.
+	 * Check if it matches host UUID.
 	 */
 	bufsize = sizeof(buf);
 	if (sysctlbyname("kern.hostuuid", buf, &bufsize, NULL, 0) < 0) {
@@ -758,6 +762,18 @@ isitme(const char *name)
 		return (1);
 
 	/*
+	 * Check if it matches hostid.
+	 */
+	bufsize = sizeof(hostid);
+	if (sysctlbyname("kern.hostid", &hostid, &bufsize, NULL, 0) < 0) {
+		pjdlog_errno(LOG_ERR, "sysctlbyname(kern.hostid) failed");
+		return (-1);
+	}
+	(void)snprintf(buf, sizeof(buf), "hostid%lu", hostid);
+	if (strcmp(buf, name) == 0)
+		return (1);
+
+	/*
 	 * Looks like this isn't about us.
 	 */
 	return (0);
@@ -769,7 +785,7 @@ family_supported(int family)
 	int sock;
 
 	sock = socket(family, SOCK_STREAM, 0);
-	if (sock == -1 && errno == EAFNOSUPPORT)
+	if (sock == -1 && errno == EPROTONOSUPPORT)
 		return (false);
 	if (sock >= 0)
 		(void)close(sock);
@@ -781,6 +797,7 @@ node_names(char **namesp)
 {
 	static char names[MAXHOSTNAMELEN * 3];
 	char buf[MAXHOSTNAMELEN];
+	unsigned long hostid;
 	char *pos;
 	size_t bufsize;
 
@@ -808,6 +825,16 @@ node_names(char **namesp)
 		return (-1);
 	}
 	(void)strlcat(names, buf, sizeof(names));
+	(void)strlcat(names, ", ", sizeof(names));
+
+	/* Host ID. */
+	bufsize = sizeof(hostid);
+	if (sysctlbyname("kern.hostid", &hostid, &bufsize, NULL, 0) < 0) {
+		pjdlog_errno(LOG_ERR, "sysctlbyname(kern.hostid) failed");
+		return (-1);
+	}
+	(void)snprintf(buf, sizeof(buf), "hostid%lu", hostid);
+	(void)strlcat(names, buf, sizeof(names));
 
 	*namesp = names;
 
@@ -833,7 +860,7 @@ yy_config_parse(const char *config, bool
 	lineno = 0;
 
 	depth0_timeout = HAST_TIMEOUT;
-	depth0_replication = HAST_REPLICATION_FULLSYNC;
+	depth0_replication = HAST_REPLICATION_MEMSYNC;
 	depth0_checksum = HAST_CHECKSUM_NONE;
 	depth0_compression = HAST_COMPRESSION_HOLE;
 	strlcpy(depth0_control, HAST_CONTROL, sizeof(depth0_control));
@@ -943,11 +970,7 @@ yy_config_parse(const char *config, bool
 			 * Use global or default setting.
 			 */
 			curres->hr_replication = depth0_replication;
-		}
-		if (curres->hr_replication == HAST_REPLICATION_MEMSYNC) {
-			pjdlog_warning("Replication mode \"%s\" is not implemented, falling back to \"%s\".",
-			    "memsync", "fullsync");
-			curres->hr_replication = HAST_REPLICATION_FULLSYNC;
+			curres->hr_original_replication = depth0_replication;
 		}
 		if (curres->hr_checksum == -1) {
 			/*

Modified: head/sbin/hastd/primary.c
==============================================================================
--- head/sbin/hastd/primary.c	Sun Feb 17 20:35:54 2013	(r246921)
+++ head/sbin/hastd/primary.c	Sun Feb 17 21:12:34 2013	(r246922)
@@ -35,7 +35,6 @@ __FBSDID("$FreeBSD$");
 #include <sys/time.h>
 #include <sys/bio.h>
 #include <sys/disk.h>
-#include <sys/refcount.h>
 #include <sys/stat.h>
 
 #include <geom/gate/g_gate.h>
@@ -65,6 +64,7 @@ __FBSDID("$FreeBSD$");
 #include "metadata.h"
 #include "proto.h"
 #include "pjdlog.h"
+#include "refcnt.h"
 #include "subr.h"
 #include "synch.h"
 
@@ -543,7 +543,7 @@ primary_connect(struct hast_resource *re
 
 	return (0);
 }
- 
+
 /*
  * Function instructs GEOM_GATE to handle reads directly from within the kernel.
  */
@@ -577,6 +577,7 @@ init_remote(struct hast_resource *res, s
 	int32_t extentsize;
 	int64_t datasize;
 	uint32_t mapsize;
+	uint8_t version;
 	size_t size;
 	int error;
 
@@ -597,6 +598,7 @@ init_remote(struct hast_resource *res, s
 	 */
 	nvout = nv_alloc();
 	nv_add_string(nvout, res->hr_name, "resource");
+	nv_add_uint8(nvout, HAST_PROTO_VERSION, "version");
 	if (nv_error(nvout) != 0) {
 		pjdlog_common(LOG_WARNING, 0, nv_error(nvout),
 		    "Unable to allocate header for connection with %s",
@@ -626,6 +628,20 @@ init_remote(struct hast_resource *res, s
 		nv_free(nvin);
 		goto close;
 	}
+	version = nv_get_uint8(nvin, "version");
+	if (version == 0) {
+		/*
+		 * If no version is sent, it means this is protocol version 1.
+		 */
+		version = 1;
+	}
+	if (version > HAST_PROTO_VERSION) {
+		pjdlog_warning("Invalid version received (%hhu).", version);
+		nv_free(nvin);
+		goto close;
+	}
+	res->hr_version = version;
+	pjdlog_debug(1, "Negotiated protocol version %d.", res->hr_version);
 	token = nv_get_uint8_array(nvin, &size, "token");
 	if (token == NULL) {
 		pjdlog_warning("Handshake header from %s has no 'token' field.",
@@ -776,6 +792,16 @@ init_remote(struct hast_resource *res, s
 		pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
 #endif
 	pjdlog_info("Connected to %s.", res->hr_remoteaddr);
+	if (res->hr_original_replication == HAST_REPLICATION_MEMSYNC &&
+	    res->hr_version < 2) {
+		pjdlog_warning("The 'memsync' replication mode is not supported by the remote node, falling back to 'fullsync' mode.");
+		res->hr_replication = HAST_REPLICATION_FULLSYNC;
+	} else if (res->hr_replication != res->hr_original_replication) {
+		/*
+		 * This is in case hastd disconnected and was upgraded.
+		 */
+		res->hr_replication = res->hr_original_replication;
+	}
 	if (inp != NULL && outp != NULL) {
 		*inp = in;
 		*outp = out;
@@ -1009,7 +1035,8 @@ hastd_primary(struct hast_resource *res)
 }
 
 static void
-reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, const char *fmt, ...)
+reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio,
+    const char *fmt, ...)
 {
 	char msg[1024];
 	va_list ap;
@@ -1020,21 +1047,18 @@ reqlog(int loglevel, int debuglevel, str
 	switch (ggio->gctl_cmd) {
 	case BIO_READ:
 		(void)snprlcat(msg, sizeof(msg), "READ(%ju, %ju).",
-		    (uintmax_t)ggio->gctl_offset,
-		    (uintmax_t)ggio->gctl_length);
+		    (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length);
 		break;
 	case BIO_DELETE:
 		(void)snprlcat(msg, sizeof(msg), "DELETE(%ju, %ju).",
-		    (uintmax_t)ggio->gctl_offset,
-		    (uintmax_t)ggio->gctl_length);
+		    (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length);
 		break;
 	case BIO_FLUSH:
 		(void)snprlcat(msg, sizeof(msg), "FLUSH.");
 		break;
 	case BIO_WRITE:
 		(void)snprlcat(msg, sizeof(msg), "WRITE(%ju, %ju).",
-		    (uintmax_t)ggio->gctl_offset,
-		    (uintmax_t)ggio->gctl_length);
+		    (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length);
 		break;
 	default:
 		(void)snprlcat(msg, sizeof(msg), "UNKNOWN(%u).",
@@ -1274,8 +1298,13 @@ ggate_recv_thread(void *arg)
 		}
 		pjdlog_debug(2,
 		    "ggate_recv: (%p) Moving request to the send queues.", hio);
-		refcount_init(&hio->hio_countdown, ncomps);
-		for (ii = ncomp; ii < ncomp + ncomps; ii++)
+		hio->hio_countdown = ncomps;
+		if (hio->hio_replication == HAST_REPLICATION_MEMSYNC &&
+		    ggio->gctl_cmd == BIO_WRITE) {
+			/* Each remote request needs two responses in memsync. */
+			hio->hio_countdown++;
+		}
+		for (ii = ncomp; ii < ncomps; ii++)
 			QUEUE_INSERT1(hio, send, ii);
 	}
 	/* NOTREACHED */
@@ -1346,8 +1375,7 @@ local_send_thread(void *arg)
 			} else {
 				hio->hio_errors[ncomp] = 0;
 				if (hio->hio_replication ==
-				    HAST_REPLICATION_ASYNC &&
-				    !ISSYNCREQ(hio)) {
+				    HAST_REPLICATION_ASYNC) {
 					ggio->gctl_error = 0;
 					write_complete(res, hio);
 				}
@@ -1385,8 +1413,42 @@ local_send_thread(void *arg)
 			}
 			break;
 		}
-		if (!refcount_release(&hio->hio_countdown))
-			continue;
+
+		if (hio->hio_replication != HAST_REPLICATION_MEMSYNC ||
+		    ggio->gctl_cmd != BIO_WRITE || ISSYNCREQ(hio)) {
+			if (refcnt_release(&hio->hio_countdown) > 0)
+				continue;
+		} else {
+			/*
+			 * Depending on hio_countdown value, requests finished
+			 * in the following order:
+			 * 0: remote memsync, remote final, local write
+			 * 1: remote memsync, local write, (remote final)
+			 * 2: local write, (remote memsync), (remote final)
+			 */
+			switch (refcnt_release(&hio->hio_countdown)) {
+			case 0:
+				/*
+				 * Local write finished as last.
+				 */
+				break;
+			case 1:
+				/*
+				 * Local write finished after remote memsync
+				 * reply arrived. We can complete the write now.
+				 */
+				if (hio->hio_errors[0] == 0)
+					write_complete(res, hio);
+				continue;
+			case 2:
+				/*
+				 * Local write finished as first.
+				 */
+				continue;
+			default:
+				PJDLOG_ABORT("Invalid hio_countdown.");
+			}
+		}
 		if (ISSYNCREQ(hio)) {
 			mtx_lock(&sync_lock);
 			SYNCREQDONE(hio);
@@ -1508,6 +1570,10 @@ remote_send_thread(void *arg)
 		nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq");
 		nv_add_uint64(nv, offset, "offset");
 		nv_add_uint64(nv, length, "length");
+		if (hio->hio_replication == HAST_REPLICATION_MEMSYNC &&
+		    ggio->gctl_cmd == BIO_WRITE && !ISSYNCREQ(hio)) {
+			nv_add_uint8(nv, 1, "memsync");
+		}
 		if (nv_error(nv) != 0) {
 			hio->hio_errors[ncomp] = nv_error(nv);
 			pjdlog_debug(2,
@@ -1568,7 +1634,7 @@ remote_send_thread(void *arg)
 done_queue:
 		nv_free(nv);
 		if (ISSYNCREQ(hio)) {
-			if (!refcount_release(&hio->hio_countdown))
+			if (refcnt_release(&hio->hio_countdown) > 0)
 				continue;
 			mtx_lock(&sync_lock);
 			SYNCREQDONE(hio);
@@ -1583,8 +1649,10 @@ done_queue:
 				(void)hast_activemap_flush(res);
 			}
 			mtx_unlock(&res->hr_amp_lock);
+			if (hio->hio_replication == HAST_REPLICATION_MEMSYNC)
+				(void)refcnt_release(&hio->hio_countdown);
 		}
-		if (!refcount_release(&hio->hio_countdown))
+		if (refcnt_release(&hio->hio_countdown) > 0)
 			continue;
 		pjdlog_debug(2,
 		    "remote_send: (%p) Moving request to the done queue.",
@@ -1608,6 +1676,7 @@ remote_recv_thread(void *arg)
 	struct nv *nv;
 	unsigned int ncomp;
 	uint64_t seq;
+	bool memsyncack;
 	int error;
 
 	/* Remote component is 1 for now. */
@@ -1623,6 +1692,8 @@ remote_recv_thread(void *arg)
 		}
 		mtx_unlock(&hio_recv_list_lock[ncomp]);
 
+		memsyncack = false;
+
 		rw_rlock(&hio_remote_lock[ncomp]);
 		if (!ISCONNECTED(res, ncomp)) {
 			rw_unlock(&hio_remote_lock[ncomp]);
@@ -1652,6 +1723,7 @@ remote_recv_thread(void *arg)
 			nv_free(nv);
 			continue;
 		}
+		memsyncack = nv_exists(nv, "received");
 		mtx_lock(&hio_recv_list_lock[ncomp]);
 		TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) {
 			if (hio->hio_ggio.gctl_seq == seq) {
@@ -1707,8 +1779,80 @@ remote_recv_thread(void *arg)
 		hio->hio_errors[ncomp] = 0;
 		nv_free(nv);
 done_queue:
-		if (!refcount_release(&hio->hio_countdown))
-			continue;
+		if (hio->hio_replication != HAST_REPLICATION_MEMSYNC ||
+		    hio->hio_ggio.gctl_cmd != BIO_WRITE || ISSYNCREQ(hio)) {
+			if (refcnt_release(&hio->hio_countdown) > 0)
+				continue;
+		} else {
+			/*
+			 * Depending on hio_countdown value, requests finished
+			 * in the following order:
+			 *
+			 * 0: local write, remote memsync, remote final
+			 * or
+			 * 0: remote memsync, local write, remote final
+			 *
+			 * 1: local write, remote memsync, (remote final)
+			 * or
+			 * 1: remote memsync, remote final, (local write)
+			 *
+			 * 2: remote memsync, (local write), (remote final)
+			 * or
+			 * 2: remote memsync, (remote final), (local write)
+			 */
+			switch (refcnt_release(&hio->hio_countdown)) {
+			case 0:
+				/*
+				 * Remote final reply arrived.
+				 */
+				PJDLOG_ASSERT(!memsyncack);
+				break;
+			case 1:
+				if (memsyncack) {
+					/*
+					 * Local request already finished, so we
+					 * can complete the write.
+					 */
+					if (hio->hio_errors[0] == 0)
+						write_complete(res, hio);
+					/*
+					 * We still need to wait for final
+					 * remote reply.
+					 */
+					pjdlog_debug(2,
+					    "remote_recv: (%p) Moving request back to the recv queue.",
+					    hio);
+					mtx_lock(&hio_recv_list_lock[ncomp]);
+					TAILQ_INSERT_TAIL(&hio_recv_list[ncomp],
+					    hio, hio_next[ncomp]);
+					mtx_unlock(&hio_recv_list_lock[ncomp]);
+				} else {
+					/*
+					 * Remote final reply arrived before
+					 * local write finished.
+					 * Nothing to do in such case.
+					 */
+				}
+				continue;
+			case 2:
+				/*
+				 * We received remote memsync reply even before
+				 * local write finished.
+				 */
+				PJDLOG_ASSERT(memsyncack);
+
+				pjdlog_debug(2,
+				    "remote_recv: (%p) Moving request back to the recv queue.",
+				    hio);
+				mtx_lock(&hio_recv_list_lock[ncomp]);
+				TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio,
+				    hio_next[ncomp]);
+				mtx_unlock(&hio_recv_list_lock[ncomp]);
+				continue;
+			default:
+				PJDLOG_ABORT("Invalid hio_countdown.");
+			}
+		}
 		if (ISSYNCREQ(hio)) {
 			mtx_lock(&sync_lock);
 			SYNCREQDONE(hio);
@@ -1977,7 +2121,7 @@ sync_thread(void *arg __unused)
 			ncomp = 1;
 		}
 		mtx_unlock(&metadata_lock);
-		refcount_init(&hio->hio_countdown, 1);
+		hio->hio_countdown = 1;
 		QUEUE_INSERT1(hio, send, ncomp);
 
 		/*
@@ -2027,7 +2171,7 @@ sync_thread(void *arg __unused)
 
 		pjdlog_debug(2, "sync: (%p) Moving request to the send queue.",
 		    hio);
-		refcount_init(&hio->hio_countdown, 1);
+		hio->hio_countdown = 1;
 		QUEUE_INSERT1(hio, send, ncomp);
 
 		/*

Added: head/sbin/hastd/refcnt.h
==============================================================================
--- /dev/null	00:00:00 1970	(empty, because file is newly added)
+++ head/sbin/hastd/refcnt.h	Sun Feb 17 21:12:34 2013	(r246922)
@@ -0,0 +1,57 @@
+/*-
+ * Copyright (c) 2005 John Baldwin <jhb at FreeBSD.org>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ * 3. Neither the name of the author nor the names of any co-contributors
+ *    may be used to endorse or promote products derived from this software
+ *    without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+ * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+ * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ *
+ * $FreeBSD$
+ */
+
+#ifndef __REFCNT_H__
+#define __REFCNT_H__
+
+#include <machine/atomic.h>
+
+#include "pjdlog.h"
+
+static __inline void
+refcnt_acquire(volatile unsigned int *count)
+{
+
+	atomic_add_acq_int(count, 1);
+}
+
+static __inline unsigned int
+refcnt_release(volatile unsigned int *count)
+{
+	unsigned int old;
+
+	/* XXX: Should this have a rel membar? */
+	old = atomic_fetchadd_int(count, -1);
+	PJDLOG_ASSERT(old > 0);
+	return (old - 1);
+}
+
+#endif	/* ! __REFCNT_H__ */

Modified: head/sbin/hastd/secondary.c
==============================================================================
--- head/sbin/hastd/secondary.c	Sun Feb 17 20:35:54 2013	(r246921)
+++ head/sbin/hastd/secondary.c	Sun Feb 17 21:12:34 2013	(r246922)
@@ -71,6 +71,7 @@ struct hio {
 	uint8_t		 hio_cmd;
 	uint64_t	 hio_offset;
 	uint64_t	 hio_length;
+	bool		 hio_memsync;
 	TAILQ_ENTRY(hio) hio_next;
 };
 
@@ -135,6 +136,22 @@ hio_clear(struct hio *hio)
 	hio->hio_cmd = HIO_UNDEF;
 	hio->hio_offset = 0;
 	hio->hio_length = 0;
+	hio->hio_memsync = false;
+}
+
+static void
+hio_copy(const struct hio *srchio, struct hio *dsthio)
+{
+
+	/*
+	 * We don't copy hio_error, hio_data and hio_next fields.
+	 */
+
+	dsthio->hio_seq = srchio->hio_seq;
+	dsthio->hio_cmd = srchio->hio_cmd;
+	dsthio->hio_offset = srchio->hio_offset;
+	dsthio->hio_length = srchio->hio_length;
+	dsthio->hio_memsync = srchio->hio_memsync;
 }
 
 static void
@@ -543,8 +560,10 @@ requnpack(struct hast_resource *res, str
 	case HIO_FLUSH:
 	case HIO_KEEPALIVE:
 		break;
-	case HIO_READ:
 	case HIO_WRITE:
+		hio->hio_memsync = nv_exists(nv, "memsync");
+		/* FALLTHROUGH */
+	case HIO_READ:
 	case HIO_DELETE:
 		hio->hio_offset = nv_get_uint64(nv, "offset");
 		if (nv_error(nv) != 0) {
@@ -621,7 +640,7 @@ static void *
 recv_thread(void *arg)
 {
 	struct hast_resource *res = arg;
-	struct hio *hio;
+	struct hio *hio, *mshio;
 	struct nv *nv;
 
 	for (;;) {
@@ -675,6 +694,27 @@ recv_thread(void *arg)
 				secondary_exit(EX_TEMPFAIL,
 				    "Unable to receive request data");
 			}
+			if (hio->hio_memsync) {
+				/*
+				 * For memsync requests we expect two replies.
+				 * Clone the hio so we can handle both of them.
+				 */
+				pjdlog_debug(2, "recv: Taking free request.");
+				QUEUE_TAKE(free, mshio);
+				pjdlog_debug(2, "recv: (%p) Got request.",
+				    mshio);
+				hio_copy(hio, mshio);
+				mshio->hio_error = 0;
+				/*
+				 * We want to keep 'memsync' tag only on the
+				 * request going onto send queue (mshio).
+				 */
+				hio->hio_memsync = false;
+				pjdlog_debug(2,
+				    "recv: (%p) Moving memsync request to the send queue.",
+				    mshio);
+				QUEUE_INSERT(send, mshio);
+			}
 		}
 		nv_free(nv);
 		pjdlog_debug(2, "recv: (%p) Moving request to the disk queue.",
@@ -818,6 +858,10 @@ send_thread(void *arg)
 		nvout = nv_alloc();
 		/* Copy sequence number. */
 		nv_add_uint64(nvout, hio->hio_seq, "seq");
+		if (hio->hio_memsync) {
+			PJDLOG_ASSERT(hio->hio_cmd == HIO_WRITE);
+			nv_add_int8(nvout, 1, "received");
+		}
 		switch (hio->hio_cmd) {
 		case HIO_READ:
 			if (hio->hio_error == 0) {


More information about the svn-src-all mailing list