Asynchronous pipe I/O

Nate Eldredge neldredge at math.ucsd.edu
Wed Nov 5 09:10:27 PST 2008


On Wed, 5 Nov 2008, rihad wrote:

> Imagine this shell pipeline:
>
> sh prog1 | sh prog2
>
>
> As given above, prog1 blocks if prog2 hasn't yet read previously written
> data (actually, newline separated commands) or is busy. What I want is
> for prog1 to never block:
>
> sh prog1 | buffer | sh prog2

[and misc/buffer is unsuitable]

I found an old piece of code lying around that I wrote for this purpose. 
Looking at it, I can see a number of inefficiencies, but it might do in a 
pinch.  You're welcome to use it; I hereby release it to the public 
domain.

Another hack that you could use, if you don't mind storing the buffer on 
disk rather than memory, is

sh prog1 > tmpfile &
tail -f -c +0 tmpfile | sh prog2

Here's my program.

/* Buffering filter. */

#include <sys/types.h>
#include <sys/select.h>

#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/* Size, in bytes, of the data area of a single buffer.  It is also the
   largest amount read_one() asks read() for in one call. */

#define BUFSIZE 512

/* One node of the singly linked queue of data chunks that have been
   read but not yet written. */
struct buffer {
   /* Following node in the queue, or NULL for the last node. */
   struct buffer *next;
   /* Number of valid bytes stored at the start of buf. */
   size_t length;
   /* The chunk data itself. */
   unsigned char buf[BUFSIZE];
};

/* Tail of the queue: the buffer most recently appended by read_one(). */
struct buffer *reader;
/* Head of the queue: the next buffer write_one() will flush and free.
   When it equals `reader' there is nothing for the writer to do. */
struct buffer *writer;


/* Upper bound, in bytes, on the memory used by queued buffers;
   read_one() refuses to allocate beyond it. */
int max_mem = 100 * 1024;

/* Bytes currently occupied by allocated struct buffer nodes. */
int current_mem;

/* Status codes returned by read_one() and write_one(). */
/* The call went through; try again when the descriptor is ready. */
#define OK 0
/* Nothing can be done until the other side makes progress. */
#define WAIT 1
/* End of file or an error: this side is finished. */
#define GIVEUP 2

/* Append a fresh buffer to the queue and fill it with one read() from FD.

   The new buffer is linked in (and `reader' advanced to it) before the
   read is attempted, so that the previously filled buffer becomes
   available to write_one() even when the read hits EOF or fails.

   Returns:
     OK      a chunk was read, or the read was merely interrupted
             (EINTR/EAGAIN) and should be retried once FD is ready again;
     WAIT    no buffer could be obtained (max_mem reached or malloc
             failed), so the writer has to free one first;
     GIVEUP  end of file or an unrecoverable read error.  */
int read_one (int fd)
{
   ssize_t result;

   /* Written as an addition so the test cannot wrap around when max_mem
      is smaller than one buffer (the subtraction would be unsigned). */
   if ((size_t) current_mem + sizeof(*reader->next) > (size_t) max_mem)
     {
       fprintf(stderr, "Reached max_mem!\n");
       return WAIT;
     }
   /* Get a new buffer. */
   reader->next = malloc(sizeof(*reader->next));
   if (reader->next)
     {
       current_mem += (int) sizeof(*reader->next);
       fprintf(stderr, "\rReading: \t%u bytes in buffer                 ",
               (unsigned) current_mem);
     }
   else
     {
       fprintf(stderr, "Virtual memory exhausted\n");
       return WAIT;
     }

   reader = reader->next;
   reader->next = NULL;
   /* Never leave the length indeterminate: on EOF, error or a retry the
      buffer stays in the queue and must read as empty. */
   reader->length = 0;

   result = read(fd, reader->buf, BUFSIZE);
   if (result > 0)
     reader->length = (size_t) result;
   else if (result == 0)
     {
       fprintf(stderr, "Hit EOF on reader\n");
       return GIVEUP;
     }
   else
     {
       /* A transient failure is not the end of the stream.  The empty
          buffer left behind is skipped and freed by write_one(). */
       if (errno == EINTR || errno == EAGAIN)
         return OK;
       fprintf(stderr, "Error on reader: %s\n", strerror(errno));
       return GIVEUP;
     }
   return OK;
}



/* Flush the buffer at the head of the queue to FD and release it.

   A write() may accept fewer bytes than requested (sockets, terminals,
   non-blocking descriptors).  In that case the unwritten tail is kept at
   the front of the same buffer and the buffer stays queued, so no data
   is dropped; the next call continues where this one stopped.

   Returns:
     OK      progress was made, or the write was merely interrupted
             (EINTR/EAGAIN) and should be retried once FD is ready again;
     WAIT    the only remaining buffer still belongs to the reader;
     GIVEUP  write() returned 0 or failed with an unrecoverable error.  */
int write_one (int fd)
{
   struct buffer *newwriter;

   if (reader == writer)
     return WAIT; /* the reader owns the last buffer */

   if (writer->length > 0)
     {
       ssize_t result = write(fd, writer->buf, writer->length);

       if (result == 0)
         {
           fprintf(stderr, "Hit EOF on writer\n");
           return GIVEUP;
         }
       else if (result < 0)
         {
           /* Transient condition: keep the buffer and try again later. */
           if (errno == EINTR || errno == EAGAIN)
             return OK;
           fprintf(stderr, "Error on writer: %s\n", strerror(errno));
           return GIVEUP;
         }
       else if ((size_t) result < writer->length)
         {
           /* Short write: slide the remainder down and keep the buffer. */
           writer->length -= (size_t) result;
           memmove(writer->buf, writer->buf + result, writer->length);
           return OK;
         }
     }

   /* The buffer is empty or fully written: unlink and free it. */
   newwriter = writer->next;
   free(writer);
   current_mem -= (int) sizeof(*writer);
   fprintf(stderr, "\rWriting: \t%u bytes in buffer                 ",
           (unsigned) current_mem);
   writer = newwriter;
   return OK;
}

/* Copy everything from IN_FD to OUT_FD through the in-memory queue.

   select() is used so that reading continues while the output side is
   not ready (up to the max_mem limit) and writing continues while the
   input side is idle.  The function returns when the writer gives up,
   or when the reader has given up and everything queued was written.
   All buffers still allocated at that point are released.  */
void move_data(int in_fd, int out_fd)
{
   int reader_state = OK;
   int writer_state = OK;

   int maxfd = ((in_fd > out_fd) ? in_fd : out_fd) + 1;

   /* Start with one empty buffer so that reader and writer are never
      NULL; write_one() skips it because its length is zero. */
   reader = malloc(sizeof(*reader));
   if (!reader)
     {
       fprintf(stderr, "No memory at all!\n");
       return;
     }
   reader->next = NULL;
   reader->length = 0;
   writer = reader;
   current_mem = (int) sizeof(*reader);

   while (1) /* break when done */
     {
       int result;
       fd_set read_set, write_set;
       FD_ZERO(&read_set);
       FD_ZERO(&write_set);
       if (reader_state == OK)
         FD_SET(in_fd, &read_set);
       if (writer_state == OK)
         FD_SET(out_fd, &write_set);
       result = select(maxfd, &read_set, &write_set, NULL, NULL);
       if (result < 0)
         {
           /* After a failed select() the sets must not be examined. */
           if (errno == EINTR)
             continue;
           fprintf(stderr, "Error on select: %s\n", strerror(errno));
           break;
         }

       /* If we're ready to do something, do it.  Also let the other
          end get a chance if something changed. */

       if (FD_ISSET(in_fd, &read_set))
         {
           reader_state = read_one(in_fd);
           if (writer_state == WAIT)
             writer_state = OK;
         }

       if (FD_ISSET(out_fd, &write_set))
         {
           writer_state = write_one(out_fd);
           if (reader_state == WAIT)
             reader_state = OK;
         }

       /* Check for termination */
       if (writer_state == GIVEUP)
         break; /* can't write any more */
       if (reader_state == GIVEUP && writer_state == WAIT)
         break; /* can't read any more, and wrote all we have */
     }

   /* Release whatever is still queued (at least the reader's buffer). */
   while (writer)
     {
       struct buffer *next = writer->next;
       free(writer);
       writer = next;
     }
   reader = NULL;
   current_mem = 0;
}

/* Entry point: buffer standard input onto standard output. */
int main(void)
{
   const int source = STDIN_FILENO;
   const int sink = STDOUT_FILENO;

   move_data(source, sink);
   return EXIT_SUCCESS;
}

-- 

Nate Eldredge
neldredge at math.ucsd.edu


More information about the freebsd-hackers mailing list