#include "RunReduce.hpp"
#include "../utils/defines.hpp"
#include "../utils/merger.hpp"
#include "../utils/parseMapLine.hpp"

namespace MapReduce {

   struct get_lines {};

   /*********************************************************
    * RunReduce constructor stores the worker directory, the *
    * reduce input directory and the output prefix, then     *
    * retrieves one string from every advert entry found in  *
    * the reduce input directory and appends it to files_.   *
    *                                                        *
    * workerDir      - advert directory of this worker       *
    * reduceInputDir - advert directory listing the inputs   *
    * outputPrefix   - prefix used to build the output URL   *
    *                                                        *
    * Any saga::exception raised while listing or reading    *
    * the entries propagates to the caller unchanged.         *
    * *******************************************************/
   RunReduce::RunReduce(saga::advert::directory workerDir,
                        saga::advert::directory reduceInputDir,
                        std::string outputPrefix) {
      int mode = saga::advert::ReadWrite;

      workerDir_      = workerDir;
      reduceInputDir_ = reduceInputDir;
      outputPrefix_   = outputPrefix;

      // NOTE(review): element type restored as saga::url (each entry is
      // passed straight to directory::open) -- confirm against the SAGA
      // version this project builds with.
      std::vector<saga::url> entries = reduceInputDir_.list("*");
      for(std::vector<saga::url>::const_iterator entriesIT = entries.begin();
          entriesIT != entries.end(); ++entriesIT) {
         saga::advert::entry adv(reduceInputDir_.open(*entriesIT, mode));
         files_.push_back(adv.retrieve_string());
      }
   }

   /*********************************************************
    * ~RunReduce destructor opens (creating it if needed)    *
    * the "./output" advert entry below the worker directory *
    * and stores the URL outputPrefix_ + "/mapFile-reduced"  *
    * in it as a string.                                     *
    *                                                        *
    * NOTE(review): a saga::exception raised here escapes    *
    * the destructor, exactly as before. Destructors are     *
    * noexcept by default since C++11, so unless the class   *
    * declares otherwise a failure ends in std::terminate.   *
    * Consider moving this step into an explicit method.     *
    * *******************************************************/
   RunReduce::~RunReduce() {
      saga::url fileurl(outputPrefix_ + "/mapFile-reduced");
      saga::advert::entry adv(workerDir_.open(saga::url("./output"),
                                              saga::advert::ReadWrite | saga::advert::Create));
      adv.store_string(fileurl.get_string());
   }

   /*********************************************************
    * getLines groups the values found on every input line   *
    * by key. On return keyValues maps each key to a pointer  *
    * to the vector of all values seen for that key, in the   *
    * order the lines were produced by merger(). The result   *
    * is meant to be passed to the user defined reduce        *
    * function.                                               *
    *                                                        *
    * keyValues - in/out; existing keys are appended to, new  *
    *             keys get a freshly allocated vector.        *
    * *******************************************************/
   void RunReduce::getLines(unorderedMap &keyValues) {
      // merger() fills `lines` from the input files collected by the
      // constructor (presumably one element per merged line -- see merger.hpp).
      std::vector<std::string> lines;
      merger(lines, files_);

      for(std::vector<std::string>::const_iterator linesIT = lines.begin();
          linesIT != lines.end(); ++linesIT) {
         std::string key(getKey(*linesIT));
         std::vector<std::string> values;
         parseMapLine(values, *linesIT);

         // Look the key up once instead of re-hashing it for every value.
         unorderedMap::iterator found = keyValues.find(key);
         if(found == keyValues.end()) {
            // Not in structure: start a new value list for this key.
            strVectorPtr initialValue(new std::vector<std::string>(values));
            keyValues[key] = initialValue;
         }
         else {
            // Contained in structure: append, preserving value order.
            found->second->insert(found->second->end(), values.begin(), values.end());
         }
      }
   }

} // namespace MapReduce