// Example 4.3. See page 27 of the FG tutorial. #include "FG.h" #include #include #include using namespace std; // Class to package together a file name and a stdio file desriptor. class IO_params { public : char *read_file_name, *write_file_name; FILE *read_fd, *write_fd; }; // Function prototypes for stage functions, initialization functions, // and cleanup functions. int read_func(FG_stage, FG_stage_params); int sort_func(FG_stage, FG_stage_params); int write_func(FG_stage, FG_stage_params); int open_for_io(FG_thread_params); int close_for_io(FG_thread_params); int main() { // Declare the pipeline variables. const int N = 1024; // number of ints in each buffer const int SIZE = N * sizeof(int); // number of bytes in each buffer // Declare the threads, and store them in a NULL-terminated array. FG_pipeline_thread_helper io_thread = new FG_pipeline_thread_helper_info(), sort_thread = new FG_pipeline_thread_helper_info(); FG_pipeline_thread_helper threads[] = {io_thread, sort_thread, NULL}; // Declare the stages. FG_pipeline_stage_helper read_stage = new FG_pipeline_stage_helper_info(io_thread), sort_stage = new FG_pipeline_stage_helper_info(sort_thread), write_stage = new FG_pipeline_stage_helper_info(io_thread); // Make an IO_params object for the I/O threads. IO_params *io_params = new IO_params(); // Assign file names for reading and writing. io_params->read_file_name = "read_file.dat"; io_params->write_file_name = "write_file.dat"; // Define the I/O thread's initialization function, stage function, // and cleanup function. io_thread->set_init_func(open_for_io, io_params); read_stage->set_func(read_func, io_params); write_stage->set_func(write_func, io_params); io_thread->set_cleanup_func(close_for_io, io_params); // The sort thread has just a stage function. sort_stage->set_func(sort_func, NULL); // Store the stages in a NULL-terminated array. FG_pipeline_stage_helper stages[] = {read_stage, sort_stage, write_stage, NULL}; // Instantiate the pipeline. FG_pipeline my_pipeline = FG_pipeline_info::create(stages, threads); // Set the pipeline variables. my_pipeline->set_buff_size(SIZE); // Make the read stage the only caboose setter. read_stage->make_caboose_setter(); // Fix the pipeline, which also runs it, and store the return code. int error = my_pipeline->fix(); // Display the return code. cout << "FINISHED PIPELINE WITH ERROR CODE " << error << endl; // Dismantle the pipeline. my_pipeline->dismantle(); return 0; } // Initialization for the I/O thread. Opens both files and stores the // file descriptors. int open_for_io(FG_thread_params params) { // Treat params as a pointer to IO_params. IO_params *io_params = (IO_params *) params; io_params->read_fd = fopen(io_params->read_file_name, "r"); io_params->write_fd = fopen(io_params->write_file_name, "w"); return 0; } // Function for the read stage. Reads into a buffer from the file. If // its reads eof, it sets the caboose. int read_func(FG_stage my_stage, FG_stage_params my_params) { // Accept a thumbnail from the preceding stage. FG_pipeline_thumbnail thumb = my_stage->accept_thumbnail(); // Compute the number of bytes to read. int buff_size = (int) thumb->get_size(); // Get the file descriptor. FILE *f = ((IO_params *) my_params)->read_fd; // Read into the buffer. fread(thumb->get_address(), 1, buff_size, f); // If we read eof, set the caboose. if (feof(f)) my_stage->set_caboose(thumb); // Convey the thumbnail to the next stage. my_stage->convey_thumbnail(thumb); return 0; } // Function for the write stage. Writes the buffer into a file, if // this buffer is not the caboose. int write_func(FG_stage my_stage, FG_stage_params my_params) { // Accept a thumbnail from the preceding stage. FG_pipeline_thumbnail thumb = my_stage->accept_thumbnail(); // If this buffer is not the caboose, write it. if (!thumb->get_caboose()) { // Compute the number of bytes to write. int buff_size = (int) thumb->get_size(); // Get the file descriptor. FILE *f = ((IO_params *) my_params)->write_fd; // Do the write. fwrite(thumb->get_address(), 1, buff_size, f); } // Convey the thumbnail to the next stage. my_stage->convey_thumbnail(thumb); return 0; } // Cleanup function for the I/O thread. Closes both files. int close_for_io(FG_thread_params params) { fclose(((IO_params *) params)->read_fd); fclose(((IO_params *) params)->write_fd); return 0; } // Comparison function for the call to qsort. int compare(const void *a, const void *b) { int aint = * (int *) a; int bint = * (int *) b; if (aint < bint) return -1; else if (aint > bint) return 1; else return 0; } // Function for the sort stage. Runs qsort on the ints in the buffer, // if this buffer is not the caboose. int sort_func(FG_stage my_stage, FG_stage_params my_params) { // Accept a thumbnail from the preceding stage. FG_pipeline_thumbnail thumb = my_stage->accept_thumbnail(); // If this buffer is not the caboose, sort it. if (!thumb->get_caboose()) { // Compute the number of ints in the buffer. int n = thumb->get_size() / sizeof(int); // Sort the buffer. qsort(thumb->get_address(), n, sizeof(int), compare); } // Convey the thumbnail to the next stage. my_stage->convey_thumbnail(thumb); return 0; }