// Example 4.2. See page 21 of the FG tutorial. #include "FG.h" #include #include #include using namespace std; // Class to package together a file name and a stdio file desriptor. class IO_params { public : char *file_name; FILE *fd; }; // Function prototypes for stage functions, initialization functions, // and cleanup functions. int read_func(FG_stage, FG_stage_params); int sort_func(FG_stage, FG_stage_params); int write_func(FG_stage, FG_stage_params); int open_for_read(FG_thread_params); int close_for_read(FG_thread_params); int open_for_write(FG_thread_params); int close_for_write(FG_thread_params); int main() { // Declare the pipeline variables. const int N = 1024; // number of ints in each buffer const int SIZE = N * sizeof(int); // number of bytes in each buffer // Declare the threads, and store them in a NULL-terminated array. FG_pipeline_thread_helper read_thread = new FG_pipeline_thread_helper_info(), sort_thread = new FG_pipeline_thread_helper_info(), write_thread = new FG_pipeline_thread_helper_info(); FG_pipeline_thread_helper threads[] = {read_thread, sort_thread, write_thread, NULL}; // Declare the stages. FG_pipeline_stage_helper read_stage = new FG_pipeline_stage_helper_info(read_thread), sort_stage = new FG_pipeline_stage_helper_info(sort_thread), write_stage = new FG_pipeline_stage_helper_info(write_thread); // Make IO_params objects for the read and write threads. IO_params *read_params = new IO_params(), *write_params = new IO_params(); // Assign file names for reading and writing. read_params->file_name = "read_file.dat"; write_params->file_name = "write_file.dat"; // Define the read thread's initialization function, stage function, // and cleanup function. read_thread->set_init_func(open_for_read, read_params); read_stage->set_func(read_func, read_params); read_thread->set_cleanup_func(close_for_read, read_params); // The sort thread has just a stage function. sort_stage->set_func(sort_func, NULL); // Define the write thread's initialization function, stage // function, and cleanup function. write_thread->set_init_func(open_for_write, write_params); write_stage->set_func(write_func, write_params); write_thread->set_cleanup_func(close_for_write, write_params); // Store the stages in a NULL-terminated array. FG_pipeline_stage_helper stages[] = {read_stage, sort_stage, write_stage, NULL}; // Instantiate the pipeline. FG_pipeline my_pipeline = FG_pipeline_info::create(stages, threads); // Set the pipeline variables. my_pipeline->set_buff_size(SIZE); // Make the read stage the only caboose setter. read_stage->make_caboose_setter(); // Fix the pipeline, which also runs it, and store the return code. int error = my_pipeline->fix(); // Display the return code. cout << "FINISHED PIPELINE WITH ERROR CODE " << error << endl; // Dismantle the pipeline. my_pipeline->dismantle(); // Delete allocated memory. delete read_params; delete write_params; return 0; } // Initialization for the read thread. Opens a file and stores the // file descriptor. int open_for_read(FG_thread_params params) { // Treat params as a pointer to IO_params. IO_params *read_params = (IO_params *) params; read_params->fd = fopen(read_params->file_name, "r"); return 0; } // Function for the read stage. Reads into a buffer from the file. If // its reads eof, it sets the caboose. int read_func(FG_stage my_stage, FG_stage_params my_params) { // Accept a thumbnail from the preceding stage. FG_pipeline_thumbnail thumb = my_stage->accept_thumbnail(); // Compute the number of bytes to read. int buff_size = (int) thumb->get_size(); // Get the file descriptor. FILE *f = ((IO_params *) my_params)->fd; // Read into the buffer. fread(thumb->get_address(), 1, buff_size, f); // If we read eof, set the caboose. if (feof(f)) my_stage->set_caboose(thumb); // Convey the thumbnail to the next stage. my_stage->convey_thumbnail(thumb); return 0; } // Cleanup function for the read thread. Closes the file. int close_for_read(FG_thread_params params) { fclose(((IO_params *) params)->fd); return 0; } // Comparison function for the call to qsort. int compare(const void *a, const void *b) { int aint = * (int *) a; int bint = * (int *) b; if (aint < bint) return -1; else if (aint > bint) return 1; else return 0; } // Function for the sort stage. Runs qsort on the ints in the buffer, // if this buffer is not the caboose. int sort_func(FG_stage my_stage, FG_stage_params my_params) { // Accept a thumbnail from the preceding stage. FG_pipeline_thumbnail thumb = my_stage->accept_thumbnail(); // If this buffer is not the caboose, sort it. if (!thumb->get_caboose()) { // Compute the number of ints in the buffer. int n = thumb->get_size() / sizeof(int); // Sort the buffer. qsort(thumb->get_address(), n, sizeof(int), compare); } // Convey the thumbnail to the next stage. my_stage->convey_thumbnail(thumb); return 0; } // Initialization for the write thread. Opens a file and stores the // file descriptor. int open_for_write(FG_thread_params params) { // Treat params as a pointer to IO_params. IO_params *write_params = (IO_params *) params; write_params->fd = fopen(write_params->file_name, "w"); return 0; } // Function for the write stage. Writes the buffer into a file, if // this buffer is not the caboose. int write_func(FG_stage my_stage, FG_stage_params my_params) { // Accept a thumbnail from the preceding stage. FG_pipeline_thumbnail thumb = my_stage->accept_thumbnail(); // If this buffer is not the caboose, write it. if (!thumb->get_caboose()) { // Compute the number of bytes to write. int buff_size = (int) thumb->get_size(); // Get the file descriptor. FILE *f = ((IO_params *) my_params)->fd; // Do the write. fwrite(thumb->get_address(), 1, buff_size, f); } // Convey the thumbnail to the next stage. my_stage->convey_thumbnail(thumb); return 0; } // Cleanup function for the write thread. Closes the file. int close_for_write(FG_thread_params params) { fclose(((IO_params *) params)->fd); return 0; }