/* compressloop-distributed, creates a cloop version 2 compressed image Copyright (C) 2004 James Cameron This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA Derived from compressloop.c (c) 1999 by Paul `Rusty' Russell, GPL, with contributions in 2000 and 2001 by Klaus Knopper, and in 2002 by Valentijn Sessink. DESIGN master { start slaves read input file and feed uncompressed blocks to slaves accept compressed blocks from slaves merge back into order and write to output file } slave { listen for connections while(true) { accept connection while(connected) { read blocks, compress, write blocks } } } TODO - profiling, to ensure we really are spending all of the time waiting in the event loop where we should (otherwise parallelism must suffer). - in test network, one slave is kept 100% busy but another isn't, why? - a design fault, without increasing TCP socket buffers from the default, processing halts because of an inter-slave channel deadlock; the master is waiting for a write() to complete to the slave's connection, yet the slave is waiting for the master to open the window by reading what it has already sent; this might be fixed by pacing. - gracefully degrade if a slave is unavailable or fails during run. - clean up stderr output and make verbosity consistent. - accept compression level and pass to slaves (protocol change). 
- add protocol version number for backward compatibility. */

/* NOTE(review): the header names of the #include directives below are
   missing from this copy of the file; restore them from the original
   source before building. */
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

/*--------------------------*/
#ifndef _COMPRESSED_LOOP_H
#define _COMPRESSED_LOOP_H

/* size in bytes of the preamble area at the start of the header */
#define CLOOP_HEADROOM 128

/* header written at offset 0 of the output file by output_close() */
struct cloop_head {
  char preamble[CLOOP_HEADROOM]; /* zero filled, then CLOOP_PREAMBLE copied in */
  u_int32_t block_size;          /* stored as htonl(blocksize) */
  u_int32_t num_blocks;          /* stored as htonl(countedblocks) */
};

#endif /*_COMPRESSED_LOOP_H*/
/*--------------------------*/

/* block sizes above this value make main() print a warning */
/* NOTE(review): the expansion is not parenthesised, so it is only safe
   where operator precedence happens to work out */
#define MAX_KMALLOC_SIZE 2L<<17

/* text copied into the preamble area of the header */
#define CLOOP_PREAMBLE "#!/bin/sh\n" "#V2.0 Format\n" "insmod cloop.o file=$0 && mount -r -t iso9660 /dev/cloop $1\n" "exit $?\n"

/* defaults */
#define IMAGESIZE 2000000000 /* initial value of imagesize */
#define BLOCKSIZE 65536      /* initial value of blocksize */
#define PORT 8522            /* port used when a slave argument has no ":port" part */
#define VERSION 20040717     /* value sent to each slave right after connecting */

/* per slave structure */
struct slave {
  char *host;               /* host name */
  int port;                 /* port number */
  int sock;                 /* file descriptor of network socket */
  GIOChannel *channel;      /* glib I/O channel number of socket */
  guint writable, readable; /* glib watch numbers for events */
  int blocks;               /* blocks received back from this slave, counted in slave_get() */
};

/* a data block structure */
struct block {
  unsigned long number; /* block number in input file */
  int size;             /* size of the data in bytes */
  char *data;           /* malloc'd pointer to data */
                        /* uncompressed or compressed, depends on context */
};

GMainLoop *loop;                    /* main processing event loop */
int ended;                          /* input end of file was seen */
int head;                           /* current head of queue (input block number) */
int tail;                           /* current tail of queue (output block number) */
int in;                             /* file descriptor of input */
int out;                            /* file descriptor of output */
unsigned long blocksize=BLOCKSIZE;  /* bytes per block */
unsigned int indexsize;             /* size of index in bytes */
unsigned long guessedblocks;        /* guessed count of blocks */
unsigned long countedblocks=0;      /* count of blocks written */
loff_t *pointerblock;               /* in-memory block index */
int pointercurrent;                 /* current index pointer number */
unsigned long position;             /* offset to current block */
unsigned long imagesize=IMAGESIZE;  /* input image size */
unsigned int forceimagesize=0; unsigned int verbose=0; unsigned int determinedimagesize=0; GHashTable *queue; /* block merge queue, hashed by block number */ /* free a block structure */ static void block_free(struct block *block) { if (block->data != NULL) g_free(block->data); g_free(block); } /* open the input file and calculate sizes */ static void input_open(char *name) { struct stat filestat; in = strcmp(name, "-") == 0 ? dup(fileno(stdin)) : open(name, O_RDONLY); if (in < 0) { perror("input_open: open"); exit(1); } if ((fstat(in, &filestat) < 0)||(filestat.st_size == 0)||(forceimagesize)) { guessedblocks = imagesize / blocksize; } else { imagesize = filestat.st_size; determinedimagesize = 1; if (verbose) fprintf(stderr, "Input file size determined: %lli\n", (long long) filestat.st_size); } guessedblocks = (imagesize / blocksize) + 1; indexsize = sizeof(loff_t) * (guessedblocks + 1); if ((pointerblock = malloc((size_t)indexsize)) == NULL) { fprintf(stderr, "Sorry, could not allocate pointerblock\n"); exit(1); } } /* read an input block, return NULL on EOF */ static struct block *input_get() { int total = 0; struct block *block = g_new(struct block, 1); g_assert (block != NULL); block->data = g_new0(char, blocksize); g_assert (block->data != NULL); while (total < blocksize) { ssize_t bytes = read(in, block->data + total, blocksize - total); if (bytes < 0) { perror("read"); exit(1); } if (bytes == 0) { if (total == 0) { block_free(block); return NULL; } fprintf(stderr, " input_get: partial, %d bytes of %lu, padding up\n", total, blocksize); break; } total += bytes; } block->size = blocksize; block->number = head++; if (verbose > 3) fprintf(stderr, " input_get: block %lu size %d\n", block->number, block->size); return block; } /* initialise the merge queue */ static void queue_init() { queue = g_hash_table_new(g_int_hash, g_int_equal); } /* put a block into the merge queue */ static void queue_put(struct block *block) { if (verbose > 3) fprintf(stderr, " 
queue_put: block %lu size %d\n", block->number, block->size); g_hash_table_insert(queue, &block->number, block); } /* get a block from the merge queue, and remove it */ static struct block *queue_get(int number) { struct block *block; block = g_hash_table_lookup(queue, &number); if (block == NULL) { if (verbose > 3) fprintf(stderr, " queue_get: block %d not here\n", number); return NULL; } g_hash_table_remove(queue, &number); if (verbose > 3) fprintf(stderr, " queue_get: block %lu size %d\n", block->number, block->size); return block; } /* start the output file */ static void output_open(char *name) { /* create file */ out = open(name, O_WRONLY|O_TRUNC|O_CREAT,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH); if (out < 0) { perror("output_open: open"); exit(1); } /* set position of first block */ position = sizeof(struct cloop_head) + indexsize; /* seek beyond index */ if (lseek(out, (off_t) position, SEEK_CUR) < 0) { perror("output_open: lseek"); exit(1); } pointercurrent = 0; } /* put a block into the output (caller to be strict in ordering) */ static void output_put(struct block *block) { /* fill in the index */ pointerblock[pointercurrent++] = __cpu_to_be64(position); position += block->size; /* write to the file */ if (write(out, block->data, block->size) != block->size) { perror("output_put: write"); exit(1); } if (verbose > 3) fprintf(stderr, " output_put: block %lu size %d offset %lu\n", block->number, block->size, position - block->size); else if (verbose > 1) fprintf(stderr, "."); countedblocks++; if (countedblocks > guessedblocks) { fprintf(stderr, "You guessed the number of blocks wrong. 
Failing.\n"); exit(1); } } /* finish the output file */ static void output_close() { struct cloop_head header; /* make note of last block index */ pointerblock[pointercurrent]=__cpu_to_be64(position); if (verbose) { fprintf(stderr, "output_close: counted blocks: %lli (guessed %lli)\n", (long long) countedblocks, (long long) guessedblocks); } /* seek to the header position */ if (lseek(out, (off_t) 0, SEEK_SET) < 0) { perror("output_close: header lseek"); exit(1); } /* construct and write out the header */ memset(header.preamble, 0, sizeof(header.preamble)); memcpy(header.preamble, CLOOP_PREAMBLE, sizeof(CLOOP_PREAMBLE)); header.block_size = htonl(blocksize); header.num_blocks = htonl(countedblocks); if (write(out, &header, sizeof(header)) != sizeof(header)) { perror("output_close: header write"); exit(1); } /* write out the index */ if (write(out, pointerblock, indexsize) != indexsize) { perror("output_close: index write"); exit(1); } /* close file */ close(out); } /* persistent form of write */ static GIOError g_io_channel_write_persist(GIOChannel *channel, gchar *buf, guint count, guint *bytes_written) { guint bytes_this; GIOError gioerr = G_IO_ERROR_NONE; *bytes_written = 0; while (*bytes_written < count) { gioerr = g_io_channel_write(channel, buf+(*bytes_written), count-(*bytes_written), &bytes_this); *bytes_written += bytes_this; if (gioerr != G_IO_ERROR_NONE) return gioerr; } return gioerr; } /* send a block to a slave */ static void slave_put(struct slave *slave, struct block *block) { GIOError gioerr; guint bytes_written; unsigned long number; int size; /* send block number */ number = htonl(block->number); gioerr = g_io_channel_write_persist(slave->channel, (char *) &number, sizeof(number), &bytes_written); g_assert (bytes_written == sizeof(number)); g_assert (gioerr == G_IO_ERROR_NONE); /* send block size */ size = htonl(block->size); gioerr = g_io_channel_write_persist(slave->channel, (char *) &size, sizeof(size), &bytes_written); g_assert (bytes_written 
== sizeof(size)); g_assert (gioerr == G_IO_ERROR_NONE); /* send block data */ gioerr = g_io_channel_write_persist(slave->channel, block->data, block->size, &bytes_written); g_assert (bytes_written == block->size); g_assert (gioerr == G_IO_ERROR_NONE); if (verbose > 3) fprintf(stderr, " slave_put: %s:%d: block %lu size %d\n", slave->host, slave->port, block->number, block->size); } /* persistent form of read */ static GIOError g_io_channel_read_persist(GIOChannel *channel, gchar *buf, guint count, guint *bytes_read) { guint bytes_this; GIOError gioerr = G_IO_ERROR_NONE; *bytes_read = 0; while (*bytes_read < count) { gioerr = g_io_channel_read(channel, buf+(*bytes_read), count-(*bytes_read), &bytes_this); *bytes_read += bytes_this; if (gioerr != G_IO_ERROR_NONE) return gioerr; } return gioerr; } /* get a block returned by a slave */ static struct block *slave_get(struct slave *slave) { GIOError gioerr; guint bytes_read; unsigned long number; int size; struct block *block = g_new(struct block, 1); g_assert (block != NULL); /* receive block number */ gioerr = g_io_channel_read_persist(slave->channel, (char *) &number, sizeof(number), &bytes_read); g_assert (gioerr == G_IO_ERROR_NONE); g_assert (bytes_read == sizeof(number)); block->number = ntohl(number); /* receive block size */ gioerr = g_io_channel_read_persist(slave->channel, (char *) &size, sizeof(size), &bytes_read); g_assert (gioerr == G_IO_ERROR_NONE); g_assert (bytes_read == sizeof(size)); block->size = ntohl(size); /* allocate block data */ g_assert ((block->size < (65536*2))); block->data = malloc(block->size); /* receive block data */ gioerr = g_io_channel_read_persist(slave->channel, block->data, block->size, &bytes_read); g_assert (gioerr == G_IO_ERROR_NONE); g_assert (bytes_read == block->size); /* return the block, caller to free */ if (verbose > 3) fprintf(stderr, " slave_get: %s:%d: block %lu size %d\n", slave->host, slave->port, block->number, block->size); slave->blocks++; return block; } /* a slave 
compressor is ready to take data */ static gboolean slave_writable(GIOChannel *source, GIOCondition condition, gpointer data) { struct slave *slave = (struct slave *) data; struct block *block; if (verbose > 3) fprintf(stderr, "slave_writable: %s:%d\n", slave->host, slave->port); /* if we have already seen end of input, no need to be called again */ if (ended) return FALSE; /* read input file block */ block = input_get(); /* if no more data available, note the event, no need to be called again */ if (block == NULL) { ended++; return FALSE; } /* send block to slave */ slave_put(slave, block); block_free(block); /* please call again */ return TRUE; } /* a slave compressor has returned data to us */ static gboolean slave_readable(GIOChannel *source, GIOCondition condition, gpointer data) { struct slave *slave = (struct slave *) data; struct block *block; if (verbose > 3) fprintf(stderr, "slave_readable: %s:%d\n", slave->host, slave->port); /* read from slave */ block = slave_get(slave); /* is it the block we are waiting for */ if (block->number == tail) { /* write to file */ output_put(block); tail++; block_free(block); /* write merge queue while it matches tail */ while ((block = queue_get(tail))) { output_put(block); tail++; block_free(block); } /* check for completion */ if ((ended) && (tail == head)) g_main_quit(loop); } else { /* these aren't the droids we're looking for, add to merge queue */ queue_put(block); } /* please call again */ return TRUE; } /* create a new slave */ static struct slave *slave_new(char *host, int port) { struct slave *my; int stat, ov; struct sockaddr_in addr; struct in_addr result; int version; GIOError gioerr; int bytes_written; my = g_new0(struct slave, 1); g_assert (my != NULL); my->host = g_strdup(host); my->port = port; /* create socket and set buffer sizes to prevent blocking */ my->sock = socket(AF_INET, SOCK_STREAM, 0); if (my->sock == -1) { perror("socket"); exit(1); } ov = blocksize * 4; stat = setsockopt(my->sock, SOL_SOCKET, 
SO_RCVBUF, (char *) &ov, sizeof(ov)); if (stat < 0) { perror("setsockopt (SO_RCVBUF)"); } ov = blocksize * 4; stat = setsockopt(my->sock, SOL_SOCKET, SO_SNDBUF, (char *) &ov, sizeof(ov)); if (stat < 0) { perror("setsockopt (SO_RCVBUF)"); } /* resolve the host address and prepare address structure */ addr.sin_family = AF_INET; if (inet_aton(host, &result)) { addr.sin_addr = result; } else { struct hostent *hostent; hostent = gethostbyname(host); if (hostent == NULL) { herror(host); exit(1); } addr.sin_addr.s_addr = * (long *) hostent->h_addr; } addr.sin_port = htons(port); /* connect to the slave */ stat = connect(my->sock, (struct sockaddr *) &addr, sizeof(addr)); if (stat != 0) { perror("connect"); exit(1); } /* express an interest in the slave's file descriptors */ my->channel = g_io_channel_unix_new(my->sock); my->writable = g_io_add_watch(my->channel, G_IO_OUT, slave_writable, my); my->readable = g_io_add_watch(my->channel, G_IO_IN, slave_readable, my); /* send protocol version */ version = htonl(VERSION); gioerr = g_io_channel_write_persist(my->channel, (char *) &version, sizeof(version), &bytes_written); g_assert (bytes_written == sizeof(version)); g_assert (gioerr == G_IO_ERROR_NONE); if (verbose > 3) fprintf(stderr, "slave_new: %s:%d: connected\n", my->host, my->port); return my; } static void usage(char* argv) { fprintf(stderr, "Usage: %s [options] infile outfile slave [slave ...]\n", argv); fprintf(stderr, "Options:\n"); fprintf(stderr, "\t-b blocksize\t\tset block size (multiple of 512 bytes)\n"); fprintf(stderr, "\t-i imagesize\t\tset input image size (in bytes) if it cannot be determined\n"); fprintf(stderr, "\t-I forced-imagesize\tforce input image size (in bytes), even if determined\n"); fprintf(stderr, "\t-v \tbe more verbose (accumulates)\n"); exit(1); } int main(int argc, char **argv) { int optionchar; GPtrArray *slaves; /* process command line options */ while ((optionchar = getopt(argc, argv, "b:i:I:c:Fv")) != -1) { switch (optionchar) { case 
'b': blocksize = atoi(optarg); if (blocksize == 0 || blocksize % 512 != 0) fprintf(stderr, "Illegal block size, isn't a multiple of 512.\n"); break; case 'i': imagesize = atoi(optarg); break; case 'I': imagesize = atoi(optarg); forceimagesize = 1; break; case 'v': verbose++; break; default: fprintf(stderr, "Unknown option: %c\n", optionchar); usage(argv[0]); } if (verbose >= 2) fprintf(stderr, "Option %c with possible sub-option %s\n", optionchar, optarg); } if (optind == argc) { fprintf(stderr, "ERROR: no infile/outfile present\n"); exit(1); } if (optind == argc-1) { fprintf(stderr, "ERROR: no outfile present\n"); exit(1); } if (optind == argc-2) { fprintf(stderr, "ERROR: no slaves\n"); exit(1); } if (blocksize > MAX_KMALLOC_SIZE) { fprintf(stderr, "WARNING: Blocksize %lu may be too big for a kmalloc() (%lu max).\n", blocksize, MAX_KMALLOC_SIZE); sleep(2); } if (sizeof(CLOOP_PREAMBLE) > CLOOP_HEADROOM) { fprintf(stderr, "*** Preamble (%u chars) > headroom (%u)\n", sizeof(CLOOP_PREAMBLE), CLOOP_HEADROOM); exit(1); } // open input and output files input_open(argv[optind]); output_open(argv[optind+1]); // create the main loop loop = g_main_new(FALSE); ended = head = tail = 0; queue_init(); // create the slaves, keep them fed with blocks as their sockets // become writeable, and as they return the compressed result // enforce block ordering and write to the output file. 
slaves = g_ptr_array_new(); for (optind += 2;optind < argc;optind++) { char **split; int port; struct slave *slave; split = g_strsplit(argv[optind], ":", 2); if (split[1] != NULL) port = atoi(split[1]); else port = PORT; slave = slave_new(split[0], port); g_strfreev(split); g_ptr_array_add(slaves, (gpointer) slave); } // main loop will be terminated when output file is completed g_main_run(loop); output_close(); /* report slave processing statistics */ { struct slave *slave; int i, total=0; for(i=0;(slave = g_ptr_array_index(slaves, i));i++) { total += slave->blocks; } for(i=0;(slave = g_ptr_array_index(slaves, i));i++) { fprintf(stderr, "%s processed %d blocks (%d%%)\n", slave->host, slave->blocks, slave->blocks * 100 / total); } } g_ptr_array_free(slaves, TRUE); return 0; } // timings, output to /dev/null // distributed 1718284288 1.7Gb to 700MHz Celeron localhost, 500MHz Pentium III, 350MHz Pentium II took 6m46s. // normal 1.7Gb to 700MHz Celeron took 13m56s. // watch -d --interval=1 "(netstat -an|head -2|tail -1;netstat -an|grep 8522)" // 127.0.0.1 processed 7253 blocks // 10.0.0.5 processed 10611 blocks // 10.0.0.16 processed 8355 blocks // 5m 15s // emma processed 8167 blocks // quozl processed 7125 blocks // easy processed 5958 blocks // lenny processed 4969 blocks // 7m32.517s // quozl processed 12533 blocks // emma processed 13686 blocks // 6m41s // emma processed 15104 blocks (57%) // quozl processed 11115 blocks (42%) // 5m59.053s (to file, not /dev/null like other tests) // emma processed 8617 blocks (32%) // quozl processed 7029 blocks (26%) // easy processed 5871 blocks (22%) // lenny processed 4702 blocks (17%)