Pipe direct write and pipeselwakeup()

Jean-Sébastien Pédron dumbbell at freebsd.org
Fri Sep 7 06:14:33 PDT 2007


-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA1

Hello,

I'm investigating a problem where select/poll/kevent is not triggered
when writing to a pipe. Here I explain what I understood and, at the end
of this mail, I propose a patch. I would like feedback on this
solution.

The problem comes from the way pipes are implemented. The kernel uses
two ways to write data to a pipe:

    o  buffered write. This is done when there is less than 8192 bytes
       (PIPE_MINDIRECT) in the _current_ iov. Data from _all_ iov are
       uiomove()'d to an internal buffer until there's no more data or
       the buffer is full.

    o  direct write. This is done when there is at least 8192 bytes in
       the current iov.

The two techniques can't be mixed. So if, during a single call to
writev(2), the kernel needs to switch from one to the other, it must wake
up reader processes and select/poll/kqueue waiters before the write
continues. But when such a switch happens (for example from buffered
write to direct write, as in the testcase below), the kernel only wakes
up reader processes, not select/poll/kqueue.

Someone provided me with a testcase to reproduce the bug. I attached the
sources to this mail ("rd.c" and "wr.c"). Use it like this:
    ./rd ./wr

Here is what happens with this testcase:
    1.  the first iov is smaller than 8192 bytes (1 or 2 bytes), so
        buffered write is selected.
    2.  the kernel internal buffer is 65536 bytes long, so uiomove()
        will fill it completely with the data (73727 or 73728 bytes in
        total). At the end, 8191 or 8192 bytes remain, depending on
        TRIGGER_WRITEV_BUG in "wr.c".
    3a. with 8191 bytes remaining, buffered write is still selected but
        the buffer is full: readers and selects are woken up. Everything
        is fine.
    3b. with 8192 bytes remaining, direct write is selected. It sees
        that the internal buffer is in use: readers are woken up (so the
        buffer can be flushed) but selects are not. Here, the
        select/poll/kevent times out.

There are 3 cases where only readers are woken up. The attached patch
adds calls to pipeselwakeup() in each of them. This fixes the testcase,
but I'd like to know whether there was a good reason not to call
pipeselwakeup() in these 3 specific cases.

Also, in the third case, the PIPE_WANTW flag isn't set either. I think
it should be set too. What do you think?

Thanks for any feedback!

- --
Jean-Sébastien Pédron
http://www.dumbbell.fr/

PGP Key: http://www.dumbbell.fr/pgp/pubkey.asc
-----BEGIN PGP SIGNATURE-----
Version: GnuPG v1.4.7 (FreeBSD)
Comment: Using GnuPG with Mozilla - http://enigmail.mozdev.org

iD8DBQFG4UwZa+xGJsFYOlMRAqKBAJwLx+9WoQmPs4pa8VEPOzT2b5r3VQCfarLY
giS8UUEYvNuUQGBqtJ4jhJU=
=Rato
-----END PGP SIGNATURE-----
-------------- next part --------------

/* "gcc -Wall -o rd rd.c" */
/* "gcc -D_REENTRANT -Wall -o rd rd.c -lc_r" */
#include <fcntl.h>
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#include <sys/uio.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <poll.h>
#include <sys/select.h>

/*
 * Mechanism used to wait for the read end of the pipe to become readable.
 * Pick one on the compiler command line with -DUSE_KQ, -DUSE_POLL or
 * -DUSE_SELECT; kqueue is used when none is given.
 */
#if !defined(USE_KQ) && !defined(USE_POLL) && !defined(USE_SELECT)
#define USE_KQ
#endif

/*
 * How long, in seconds, to wait for the pipe to become readable.  Not named
 * EV_* so it cannot collide with the kqueue flag macros from <sys/event.h>.
 */
#define RD_TIMEOUT_SECS 10

#ifdef USE_KQ
/*
 * Register EVFILT_READ for fd on a new kqueue, wait up to RD_TIMEOUT_SECS
 * seconds for events and print each returned event to stderr.
 * Exits with status 1 on error or timeout.
 */
static void
wait_readable(int fd)
{
    struct kevent ev[10];
    struct kevent chg;
    struct timespec tv = {0, 0};
    int kq;
    int res;
    int i;

    kq = kqueue();
    if (kq < 0) {
	perror("kqueue() failed");
	exit(1);
    }

    fprintf(stderr, "setting EVFILT_READ on %d\n", fd);
    EV_SET(&chg, fd, EVFILT_READ, EV_ADD, 0, 0, (void *) 2);
    /* Zero timeout and no event list: this call only registers the filter. */
    res = kevent(kq, &chg, 1, NULL, 0, &tv);
    if (res < 0) {
	perror("kevent() failed");
	exit(1);
    }
    fprintf(stderr, "kevent() returned = %d\n", res);

    tv.tv_sec = RD_TIMEOUT_SECS;
    tv.tv_nsec = 0;
    fprintf(stderr, "kevent waiting for %d secs for events...\n",
	    RD_TIMEOUT_SECS);
    res = kevent(kq, NULL, 0, ev, (int)(sizeof(ev) / sizeof(ev[0])), &tv);
    if (res < 0) {
	perror("kevent failed");
	exit(1);
    }
    if (res == 0) {
	fprintf(stderr, "kevent timed out\n");
	exit(1);
    }
    fprintf(stderr, "kevent returned = %d\n", res);

    /* kevent() never returns more than the nevents it was given. */
    for (i = 0; i < res; i++) {
	fprintf(stderr, "result event %d: fd=%d: ", i, (int)ev[i].ident);
	if (ev[i].flags & EV_ERROR) {
	    /* For EV_ERROR, data carries an errno value. */
	    fprintf(stderr, "EV_ERROR: %s ", strerror((int)ev[i].data));
	} else {
	    if (ev[i].filter == EVFILT_READ)
		fprintf(stderr, "EVFILT_READ ");
	    if (ev[i].filter == EVFILT_WRITE)
		fprintf(stderr, "EVFILT_WRITE ");
	}
	fprintf(stderr, "\n");
    }
    close(kq);
}

#elif defined(USE_POLL)
/*
 * Wait up to RD_TIMEOUT_SECS seconds for fd to become readable using
 * poll(2) and print the returned events to stderr.
 * Exits with status 1 on error or timeout.
 */
static void
wait_readable(int fd)
{
    struct pollfd pfd;
    int res;

    pfd.fd = fd;
    pfd.events = (POLLIN | POLLRDNORM);
    pfd.revents = 0;

    fprintf(stderr, "pfds[0].fd = %d pfds[0].events = POLLIN|POLLRDNORM\n",
	    pfd.fd);

    fprintf(stderr, "poll waiting for %d secs for events...\n",
	    RD_TIMEOUT_SECS);
    res = poll(&pfd, 1, RD_TIMEOUT_SECS * 1000);
    if (res < 0) {
	perror("poll failed");
	exit(1);
    }
    if (res == 0) {
	fprintf(stderr, "poll timed out\n");
	exit(1);
    }
    fprintf(stderr, "poll returned = %d\n", res);
    fprintf(stderr, "fd=%d ", pfd.fd);
    if (pfd.revents & POLLIN)
	fprintf(stderr, "POLLIN ");
    if (pfd.revents & POLLRDNORM)
	fprintf(stderr, "POLLRDNORM ");
    if (pfd.revents & POLLOUT)
	fprintf(stderr, "POLLOUT ");
    fprintf(stderr, "\n");
}

#else /* USE_SELECT */
/*
 * Wait up to RD_TIMEOUT_SECS seconds for fd to become readable using
 * select(2) and report whether it was flagged readable.
 * Exits with status 1 on error or timeout.
 */
static void
wait_readable(int fd)
{
    fd_set readfds;
    struct timeval stv = {RD_TIMEOUT_SECS, 0};
    int res;

    /* FD_SET() on a descriptor >= FD_SETSIZE is undefined behavior. */
    if (fd < 0 || fd >= FD_SETSIZE) {
	fprintf(stderr, "fd %d cannot be used with select()\n", fd);
	exit(1);
    }
    FD_ZERO(&readfds);
    FD_SET(fd, &readfds);

    fprintf(stderr, "selecting fd = %d\n", fd);

    fprintf(stderr, "select waiting for %d secs for events...\n",
	    RD_TIMEOUT_SECS);
    res = select(fd + 1, &readfds, NULL, NULL, &stv);
    if (res < 0) {
	perror("select failed");
	exit(1);
    }
    if (res == 0) {
	fprintf(stderr, "select timed out\n");
	exit(1);
    }
    fprintf(stderr, "select returned = %d\n", res);
    fprintf(stderr, "fd=%d ", fd);
    if (FD_ISSET(fd, &readfds))
	fprintf(stderr, "fd is set ");
    fprintf(stderr, "\n");
}
#endif

/*
 * Create a pipe, make its read end non-blocking, and run the program named
 * by argv[1] with the pipe's write end as its stdout.  The parent then
 * waits (see wait_readable()) for the read end to become readable.
 * Exits with status 1 on usage error, system-call failure or timeout.
 */
int
main(int argc, char *argv[])
{
    int fds[2];
    int flags;
    pid_t pid;

    if (argc != 2) {
	fprintf(stderr, "Usage: rd <path to writer>\n");
	exit(1);
    }

    if (pipe(fds) < 0) {
	perror("pipe() failed");
	exit(1);
    }

    fprintf(stderr, "pipe fds={%d, %d}\n", fds[0], fds[1]);

    fprintf(stderr, "setting %d in non-blocking mode\n", fds[0]);
    flags = fcntl(fds[0], F_GETFL, 0);
    if (flags < 0 || fcntl(fds[0], F_SETFL, flags | O_NONBLOCK) < 0) {
	perror("fcntl() failed");
	exit(1);
    }

    pid = fork();
    if (pid < 0) {
	perror("fork() failed");
	exit(1);
    }
    if (pid == 0) {
	/*
	 * Child: close the read end first, so that dup2() below cannot
	 * be undone if the read end happened to be fd 1; then make the
	 * write end the writer's stdout.
	 */
	close(fds[0]);
	if (dup2(fds[1], STDOUT_FILENO) < 0) {
	    perror("dup2() failed");
	    _exit(1);
	}
	if (fds[1] != STDOUT_FILENO)
	    close(fds[1]);
	/* execl() is variadic: the terminator must be a (char *) null. */
	execl(argv[1], argv[1], (char *)NULL);
	perror("execl() failed");
	_exit(1);
    }

    close(fds[1]);
    wait_readable(fds[0]);
    close(fds[0]);
    return 0;
}





-------------- next part --------------

/* "gcc -Wall -o wr wr.c" buggy */
/* "gcc -D_REENTRANT -Wall -o wr wr.c -lc_r" not buggy! */


#define TRIGGER_WRITEV_BUG 1


#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>
#include <stdio.h>

#ifndef PAGE_SIZE
#define PAGE_SIZE 4096
#endif

/* The following "pipe defines" cut from sys/pipe.h */

#ifndef PIPE_SIZE
#define PIPE_SIZE       16384
#endif

#ifndef BIG_PIPE_SIZE
#define BIG_PIPE_SIZE   (64*1024)
#endif

#ifndef SMALL_PIPE_SIZE
#define SMALL_PIPE_SIZE PAGE_SIZE
#endif

#ifndef PIPE_MINDIRECT
#define PIPE_MINDIRECT  8192
#endif

#ifndef PIPENPAGES
#define PIPENPAGES      (BIG_PIPE_SIZE / PAGE_SIZE + 1)
#endif

#if TRIGGER_WRITEV_BUG
#define BUF0_SZ 2
#else
#define BUF0_SZ 1
#endif
#define BUF1_SZ PAGE_SIZE
#define BUF2_SZ (PIPENPAGES * PAGE_SIZE - 2)

static char buf[BUF0_SZ + BUF1_SZ + BUF2_SZ];


int main(void)
{
    int wr;
    struct iovec iov[3];

    sleep(1);

    fprintf(stderr, "PIPENPAGES=%d\nPAGE_SIZE=%d\n",
	    PIPENPAGES, PAGE_SIZE);
    fprintf(stderr, "BUF0_SZ=%d, BUF1_SZ=%d, BUF2_SZ=%d\n",
	    BUF0_SZ, BUF1_SZ, BUF2_SZ);

    iov[0].iov_base =
#if BUF0_SZ == 0
	NULL
#else
	&buf[0]
#endif
	;
    iov[0].iov_len = BUF0_SZ;


    iov[1].iov_base =
#if BUF1_SZ == 0
	NULL
#else
	&buf[BUF0_SZ]
#endif
	;
    iov[1].iov_len = BUF1_SZ;


    iov[2].iov_base =
#if BUF2_SZ == 0
	NULL
#else
	&buf[BUF0_SZ+BUF1_SZ]
#endif
	;
    iov[2].iov_len = BUF2_SZ;
		
    wr = writev(1, &iov[0], 3);
    fprintf(stderr, "write returned: %d\n", wr);
    return 0;
}




-------------- next part --------------
Index: sys/kern/sys_pipe.c
===================================================================
RCS file: /home/dumbbell/projects/freebsd/cvs-mirror/src/sys/kern/sys_pipe.c,v
retrieving revision 1.191
diff -u -r1.191 sys_pipe.c
--- sys/kern/sys_pipe.c	27 May 2007 17:33:10 -0000	1.191
+++ sys/kern/sys_pipe.c	6 Sep 2007 13:28:21 -0000
@@ -881,6 +881,7 @@
 			wakeup(wpipe);
 		}
 		wpipe->pipe_state |= PIPE_WANTW;
+		pipeselwakeup(wpipe);
 		pipeunlock(wpipe);
 		error = msleep(wpipe, PIPE_MTX(wpipe),
 		    PRIBIO | PCATCH, "pipdww", 0);
@@ -896,6 +897,7 @@
 			wakeup(wpipe);
 		}
 		wpipe->pipe_state |= PIPE_WANTW;
+		pipeselwakeup(wpipe);
 		pipeunlock(wpipe);
 		error = msleep(wpipe, PIPE_MTX(wpipe),
 		    PRIBIO | PCATCH, "pipdwc", 0);
@@ -1080,6 +1082,7 @@
 				wpipe->pipe_state &= ~PIPE_WANTR;
 				wakeup(wpipe);
 			}
+			pipeselwakeup(wpipe);
 			pipeunlock(wpipe);
 			error = msleep(wpipe, PIPE_MTX(rpipe), PRIBIO | PCATCH,
 			    "pipbww", 0);


More information about the freebsd-arch mailing list