// Example 5.1. See page 32 of the FG tutorial. #include "FG.h" #include #include using namespace std; // Prototypes for stage functions. int generate_func(FG_stage, FG_stage_params); int sort_func(FG_stage, FG_stage_params); int merge_func(FG_stage, FG_stage_params); int print_func(FG_stage, FG_stage_params); int main() { // Declare the pipeline variables. const int N = 10; // number of ints in each buffer const int SIZE = N * sizeof(int); // number of bytes in each buffer const int ROUNDS = 20; // 20 rounds const int AUX_BUFFS = 2; // number of auxiliary buffers const int PIPELINE_BUFFS = 6; // number of pipeline buffers const int MERGE_BUFFS = 2; // the merge stage needs 2 pipeline buffers. // Declare the threads, and store them in a NULL-terminated array. FG_pipeline_thread_helper generate_thread = new FG_pipeline_thread_helper_info(), sort_thread = new FG_pipeline_thread_helper_info(), merge_thread = new FG_pipeline_thread_helper_info(), print_thread = new FG_pipeline_thread_helper_info(); FG_pipeline_thread_helper threads[] = {generate_thread, sort_thread, merge_thread, print_thread, NULL}; // Declare the stages. FG_pipeline_stage_helper generate_stage = new FG_pipeline_stage_helper_info(generate_thread), sort_stage = new FG_pipeline_stage_helper_info(sort_thread), print_stage = new FG_pipeline_stage_helper_info(print_thread); // The merge stage is a soft barrier. FG_soft_barrier merge_stage = new FG_soft_barrier_info(merge_thread); // The merge stage takes 2 pipeline buffers as input. merge_stage->set_buffer_wait(MERGE_BUFFS); // Define the stage functions. generate_stage->set_func(generate_func, NULL); sort_stage->set_func(sort_func, NULL); merge_stage->set_func(merge_func, NULL); print_stage->set_func(print_func, NULL); // Store the stages in a NULL-terminated array. FG_node stages[] = {generate_stage, sort_stage, merge_stage, print_stage, NULL}; // Instantiate the pipeline. FG_pipeline my_pipeline = FG_pipeline_info::create(stages, threads); // Set the pipeline variables. my_pipeline->set_buff_size(SIZE); my_pipeline->set_num_rounds(ROUNDS); my_pipeline->set_num_aux(AUX_BUFFS); my_pipeline->set_num_buffs(PIPELINE_BUFFS); // Fix the pipeline, which also runs it, and store the return code. int error = my_pipeline->fix(); // Display the return code. cout << "FINISHED PIPELINE WITH ERROR CODE " << error << endl; // Dismantle the pipeline. my_pipeline->dismantle(); return 0; } // Function for the generate stage. Fills a buffer with random ints. int generate_func(FG_stage my_stage, FG_stage_params my_params) { // Accept a thumbnail from the preceding stage, and save the // location of its buffer. FG_pipeline_thumbnail thumb = my_stage->accept_thumbnail(); int *buff = (int *) thumb->get_address(); // Determine how many ints this buffer holds. int n = thumb->get_size() / sizeof(int); // Fill this buffer with random integers. for (int i = 0; i < n; i++) buff[i] = rand(); // Convey the thumbnail to the next stage. my_stage->convey_thumbnail(thumb); return 0; } // Comparison function for the call to qsort. int compare(const void *a, const void *b) { int aint = * (int *) a; int bint = * (int *) b; if (aint < bint) return -1; else if (aint > bint) return 1; else return 0; } // Function for the sort stage. Runs qsort on the ints in the buffer, // if this buffer is not the caboose. int sort_func(FG_stage my_stage, FG_stage_params my_params) { // Accept a thumbnail from the preceding stage. FG_pipeline_thumbnail thumb = my_stage->accept_thumbnail(); // If this buffer is not the caboose, sort it. if (!thumb->get_caboose()) { // Compute the number of ints in the buffer. int n = thumb->get_size() / sizeof(int); // Sort the buffer. qsort(thumb->get_address(), n, sizeof(int), compare); } // Convey the thumbnail to the next stage. my_stage->convey_thumbnail(thumb); return 0; } // Function for the merge stage. This stage merges two pipeline // buffers, each of which is assumed to be already sorted, into two // auxiliary buffers. It then turns the auxiliary buffers into // pipeline buffers and passes them on to the next stage. The // contents of the two output buffers, together, will be sorted. int merge_func(FG_stage my_stage, FG_stage_params my_params) { // Accept two thumbnails. FG_pipeline_thumbnail *pipeline_thumbs = my_stage->accept_multiple_thumbnails(); // Get two auxiliary thumbnails. FG_aux_thumbnail aux_thumbs[2]; aux_thumbs[0] = my_stage->get_aux_thumbnail(); aux_thumbs[1] = my_stage->get_aux_thumbnail(); // Store the buffer addresses. int *pipeline_buffs[2], *aux_buffs[2]; pipeline_buffs[0] = (int *) pipeline_thumbs[0]->get_address(); pipeline_buffs[1] = (int *) pipeline_thumbs[1]->get_address(); aux_buffs[0] = (int *) aux_thumbs[0]->get_address(); aux_buffs[1] = (int *) aux_thumbs[1]->get_address(); // Determine how many ints each buffer holds. int n = pipeline_thumbs[0]->get_size() / sizeof(int); int *output_buff = aux_buffs[0]; // pointer to the current output buffer int i=0, // index into pipeline_buffs[0] j = 0, // index into pipeline_buffs[1] k = 0; // index into output_buff // As long as we have not copied either one of the pipeline buffers // in its entirety, copy the smallest remaining element into the // output buffer. Once i or j reaches n, we have copied one of the // pipeline buffers in its entirety. while (i < n && j < n) { if (pipeline_buffs[0][i] < pipeline_buffs[1][j]) output_buff[k++] = pipeline_buffs[0][i++]; else output_buff[k++] = pipeline_buffs[1][j++]; // We need to switch to the other output buffer once k reaches n. if (k >= n) { output_buff = aux_buffs[1]; k = 0; } } // We have copied one of the pipeline buffers but the other remains // partially uncopied. Exactly one of the next two while loops will // undergo zero iterations. while (i < n) output_buff[k++] = pipeline_buffs[0][i++]; while (j < n) output_buff[k++] = pipeline_buffs[1][j++]; // Swap the auxiliary and pipeline buffers. aux_thumbs[0]->swap(pipeline_thumbs[0]); aux_thumbs[1]->swap(pipeline_thumbs[1]); // We are done with the auxiliary thumbnails and buffers. my_stage->release_aux_thumbnail(aux_thumbs[0]); my_stage->release_aux_thumbnail(aux_thumbs[1]); // Convey the pipeline thumbnails with their new buffers to the next // stage. my_stage->convey_multiple_thumbnails(pipeline_thumbs); return 0; } // Function for the print stage. Prints the contents of a buffer. int print_func(FG_stage my_stage, FG_stage_params my_params) { // Accept a thumbnail from the preceding stage, and save the // location of its buffer. FG_pipeline_thumbnail thumb = my_stage->accept_thumbnail(); int *buff = (int *) thumb->get_address(); // Determine how many ints this buffer holds. int n = thumb->get_size() / sizeof(int); // Print the contents of this buffer. for (int i = 0; i < n; i++) cout << buff[i] << endl; // Convey the thumbnail to the next stage. my_stage->convey_thumbnail(thumb); return 0; }