Boost C++ Libraries


Boost.MapReduce Word Count example

Note: This library is not yet part of the Boost Libraries and is still under development and review.

By way of an example of using the MapReduce library, we implement a Word Count application. We use a datasource class supplied by the library to iterate through a directory of files containing the words to be counted. The Map phase produces a list of words, each with a count of 1. The Reduce phase accepts each word together with its list of counts, totals the counts, and produces the final list of words with their totals.

map (filename; string, file contents; pair<char const *, char const *>) --> list(word; string, count; unsigned int)
reduce (word; string, list(count; unsigned int)) --> list(count; unsigned int)
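As a point of reference for these signatures, the following single-threaded sketch performs the same Map, group-by-key and Reduce steps with the standard library alone. It does not use the MapReduce library: map_fn, reduce_fn and word_count are hypothetical names, the input is an in-memory list of (filename, contents) pairs rather than a directory, and words are split on whitespace rather than with the rules used by wordcount::map_task below.

```cpp
#include <cassert>
#include <map>
#include <numeric>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

// Hypothetical Map step: (filename, contents) -> list of (word, 1).
// For brevity, words are split on whitespace only.
std::vector<std::pair<std::string, unsigned>>
map_fn(std::string const & /*filename*/, std::string const &contents)
{
    std::vector<std::pair<std::string, unsigned>> out;
    std::istringstream in(contents);
    std::string word;
    while (in >> word)
        out.emplace_back(word, 1u);
    return out;
}

// Hypothetical Reduce step: (word, list of counts) -> total count.
unsigned reduce_fn(std::string const & /*word*/, std::vector<unsigned> const &counts)
{
    return std::accumulate(counts.begin(), counts.end(), 0u);
}

// Runs Map on every (filename, contents) input, groups the intermediate
// values by key, then runs Reduce once per distinct word.
std::map<std::string, unsigned>
word_count(std::vector<std::pair<std::string, std::string>> const &files)
{
    std::map<std::string, std::vector<unsigned>> grouped;   // intermediate store
    for (auto const &file : files)
        for (auto const &kv : map_fn(file.first, file.second))
            grouped[kv.first].push_back(kv.second);

    std::map<std::string, unsigned> totals;
    for (auto const &group : grouped)
        totals[group.first] = reduce_fn(group.first, group.second);
    return totals;
}
```

For example, word_count({{"a.txt", "the cat the"}, {"b.txt", "dog the"}}) yields the = 3, cat = 1 and dog = 1. The map of vectors plays the role of the intermediate store; the library's job runs the same steps as parallel map and reduce tasks.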

MapTask

The MapTask is implemented by the function object wordcount::map_task. A MapTask must define two data types, the key and value types of its input. These are supplied as template parameters to the base class boost::mapreduce::map_task: std::string for the key and std::pair<char const *, char const *> for the value.

The map function call operator takes three parameters. The first is the runtime, which is passed in by the MapReduce library and is used as a callback to emit intermediate key/value pairs. The other two are the key and value for the map task to process. Normally these would be passed as reference-to-const, but this is not a requirement. For example, if the value type were a file stream (std::ifstream), it could not be passed as reference-to-const, because reading from the stream modifies the state of the object.

The function loops through the character array delimited by the begin/end pointers in the value parameter. A word is defined as a contiguous sequence of alphabetic characters and apostrophes that begins with an alphabetic character. Each word is emitted as an intermediate key with a value of 1 by calling the emit_intermediate() function of the runtime parameter object.

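#include <cctype>     // std::toupper
#include <string>
#include <utility>    // std::pair
#include <boost/assert.hpp>

struct map_task : public boost::mapreduce::map_task<
                             std::string,                            // MapKey
                             std::pair<char const *, char const *> > // MapValue
{
    template<typename Runtime>
    void operator()(Runtime &runtime, std::string const &/*key*/, value_type &value) const
    {
        bool in_word = false;
        char const *ptr = value.first;      // start of the data
        char const *end = value.second;     // one past the end of the data
        char const *word = ptr;             // start of the current word
        for (; ptr != end; ++ptr)
        {
            // cast to unsigned char: passing a negative char to toupper is undefined
            char const ch = static_cast<char>(std::toupper(static_cast<unsigned char>(*ptr)));
            if (in_word)
            {
                // a word ends at the first character that is neither a letter nor an apostrophe
                if ((ch < 'A' || ch > 'Z') && ch != '\'')
                {
                    runtime.emit_intermediate(std::string(word,ptr-word), 1);
                    in_word = false;
                }
            }
            else
            {
                // a word must start with a letter
                if (ch >= 'A'  &&  ch <= 'Z')
                {
                    word = ptr;
                    in_word = true;
                }
            }
        }
        // emit the last word if the data ends in the middle of one
        if (in_word)
        {
            BOOST_ASSERT(ptr-word > 0);
            runtime.emit_intermediate(std::string(word,ptr-word), 1);
        }
    }
};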
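Because map_task only calls emit_intermediate() on its Runtime argument, its word-splitting rules can be exercised without the library. The sketch below repeats the loop from map_task::operator() as a free function template, split_words, and drives it with recording_runtime, a stand-in that simply stores what it is given. Both names are hypothetical and are not part of Boost.MapReduce.

```cpp
#include <cassert>
#include <cctype>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the library's runtime: it records every
// intermediate key/value pair that the map function emits.
struct recording_runtime
{
    std::vector<std::pair<std::string, unsigned>> emitted;

    void emit_intermediate(std::string const &key, unsigned value)
    {
        emitted.emplace_back(key, value);
    }
};

// Same word-splitting loop as map_task::operator(), as a free function.
// A word starts with a letter and continues through letters and apostrophes.
template<typename Runtime>
void split_words(Runtime &runtime, char const *ptr, char const *end)
{
    bool in_word = false;
    char const *word = ptr;
    for (; ptr != end; ++ptr)
    {
        char const ch = static_cast<char>(std::toupper(static_cast<unsigned char>(*ptr)));
        if (in_word)
        {
            if ((ch < 'A' || ch > 'Z') && ch != '\'')
            {
                runtime.emit_intermediate(std::string(word, ptr - word), 1);
                in_word = false;
            }
        }
        else if (ch >= 'A' && ch <= 'Z')
        {
            word = ptr;
            in_word = true;
        }
    }
    if (in_word)   // input ended in the middle of a word
        runtime.emit_intermediate(std::string(word, ptr - word), 1);
}
```

Calling split_words on the text "Don't panic, 42 times!" records Don't, panic and times, each with the value 1: the apostrophe stays inside the word, and digits never start one. The uppercase copy ch is used only to classify characters; the emitted key is copied from the original buffer, so its case is unchanged.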

ReduceTask

The ReduceTask is implemented by the function object wordcount::reduce_task. This functor derives from the library's reduce_task class, whose two template parameters define the key and value types of the reduce task's output (here std::string and unsigned).

The reduce function call operator takes four parameters. The first is the runtime object, the library's callback as described above. The second is the key of the reduce task, and the third and fourth are a pair of iterators delimiting the range of values for the reduce task. In this Word Count example, the key is a text string containing the word, and the iterators delimit the list of counts for that word. The ReduceTask simply sums the counts with std::accumulate and stores the final result by calling the emit() function of the runtime object.

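#include <numeric>    // std::accumulate
#include <string>

struct reduce_task : public boost::mapreduce::reduce_task<std::string, unsigned>
{
    template<typename Runtime, typename It>
    void operator()(Runtime &runtime, std::string const &key, It it, It const ite) const
    {
        // 0U, not 0: std::accumulate returns the type of its initial value,
        // so this sums in unsigned to match the reduce task's value type
        runtime.emit(key, std::accumulate(it, ite, 0U));
    }
};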
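The reduce function can be checked the same way. In this sketch, reduce_recorder is a hypothetical stand-in for the runtime that records each emit() call, and sum_counts is a free-function copy of the body of reduce_task::operator(). The static_assert shows why the initial value passed to std::accumulate matters: the sum takes the type of that value, not the type of the elements.

```cpp
#include <cassert>
#include <numeric>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

// Hypothetical stand-in for the runtime passed to reduce_task::operator().
struct reduce_recorder
{
    std::vector<std::pair<std::string, unsigned>> results;

    void emit(std::string const &key, unsigned value)
    {
        results.emplace_back(key, value);
    }
};

// Same body as reduce_task::operator(), as a free function template.
template<typename Runtime, typename It>
void sum_counts(Runtime &runtime, std::string const &key, It it, It const ite)
{
    runtime.emit(key, std::accumulate(it, ite, 0U));   // unsigned sum
}

// With an int initial value, the sum of unsigned elements is an int.
static_assert(std::is_same<decltype(std::accumulate(std::declval<unsigned *>(),
                                                    std::declval<unsigned *>(), 0)),
                           int>::value,
              "std::accumulate returns the type of its initial value");
```

Feeding the counts 1, 1, 1 and 4 for the key "the" records a single result, ("the", 7).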

Type Definitions

For convenience, brevity and maintainability, we define a job type for the MapReduce job. This local job type is defined in terms of the library's mapreduce::job class template, with template parameters specific to the Word Count application.

// declared inside namespace wordcount, so this type is wordcount::job
typedef
boost::mapreduce::job<
  wordcount::map_task,
  wordcount::reduce_task>
job;

The class template mapreduce::job actually has six template parameters. The first two must be supplied; the last four have default values. The definition above is therefore equivalent to

class boost::mapreduce::job<
    struct wordcount::map_task,
    struct wordcount::reduce_task,
    struct boost::mapreduce::null_combiner,
    class boost::mapreduce::datasource::directory_iterator<
        struct wordcount::map_task,
        class boost::mapreduce::datasource::file_handler<
            class std::basic_string<
                char,class std::char_traits<char>,
                class std::allocator<char> >,
            struct std::pair<char const *,char const *> > >,
    class boost::mapreduce::intermediates::in_memory<
        struct wordcount::map_task,
        struct wordcount::reduce_task,
        struct boost::mapreduce::hash_partitioner>,
    class boost::mapreduce::intermediates::reduce_null_output<
        struct wordcount::map_task,struct wordcount::reduce_task> >
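The equivalence rests on ordinary C++ default template arguments, where a default may depend on earlier parameters. The following self-contained sketch uses hypothetical placeholder types, not the library's, to show that naming only the first two arguments produces the same type as spelling out every default. This is also why typeid(wordcount::job).name(), printed by the program below, lists all six arguments.

```cpp
#include <cassert>
#include <type_traits>

// Hypothetical placeholders standing in for the library's default types.
struct my_map_task {};
struct my_reduce_task {};
struct default_combiner {};
template<typename MapTask> struct default_datasource {};
template<typename MapTask, typename ReduceTask> struct default_intermediates {};
template<typename MapTask, typename ReduceTask> struct default_output {};

// Trailing parameters have defaults, some of which depend on earlier ones.
template<typename MapTask,
         typename ReduceTask,
         typename Combiner      = default_combiner,
         typename Datasource    = default_datasource<MapTask>,
         typename Intermediates = default_intermediates<MapTask, ReduceTask>,
         typename Output        = default_output<MapTask, ReduceTask> >
struct toy_job {};

typedef toy_job<my_map_task, my_reduce_task> short_job;
typedef toy_job<my_map_task, my_reduce_task,
                default_combiner,
                default_datasource<my_map_task>,
                default_intermediates<my_map_task, my_reduce_task>,
                default_output<my_map_task, my_reduce_task> > long_job;

// Both spellings name exactly the same type.
static_assert(std::is_same<short_job, long_job>::value, "defaults fill in the rest");
```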

Program

To run the MapReduce Word Count algorithm, we need a program to set up an environment, run the algorithm and report the results.

The code below shows an example; error handling has been removed for brevity. The program creates three supporting objects. A mapreduce::specification object specifies system parameters such as the input directory and the number of map tasks to run. Note that the number of map tasks is a hint to the MapReduce runtime, and may differ from the number actually used. A datasource object, constructed from the specification, iterates through the directory of files and passes each file to a map task. Finally, an instance of mapreduce::results is populated by the runtime with metrics and timings of the MapReduce job execution.

To run the MapReduce job, call the run() function of the job class. There are two variants of run(), for coding convenience:

    template<typename SchedulePolicy>
    void run(specification const &spec, results &result);

    template<typename SchedulePolicy>
    void run(SchedulePolicy &schedule, specification const &spec, results &result);

Both overloads of run() are function templates whose template parameter is a SchedulePolicy. The first variant default-constructs a schedule policy object, and the second uses the policy object supplied by the caller. This enables library users to develop their own schedule policies that may need configuration before being used. See Schedule Policies for more information.
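The pair of overloads follows a common pattern: the first default-constructs the policy and forwards to the second. A minimal sketch, with the specification argument omitted for brevity; toy_job, toy_results, sequential_policy and thread_pool_policy are hypothetical and only stand in for the library's job, results and schedule policy types.

```cpp
#include <cassert>
#include <string>

// Hypothetical results type; the library's results structure differs.
struct toy_results
{
    std::string policy_used;
    unsigned threads = 0;
};

// Hypothetical policy that needs no configuration.
struct sequential_policy
{
    void operator()(toy_results &r) { r.policy_used = "sequential"; r.threads = 1; }
};

// Hypothetical policy that is configured before use.
struct thread_pool_policy
{
    explicit thread_pool_policy(unsigned n = 2) : threads(n) {}
    void operator()(toy_results &r) { r.policy_used = "thread_pool"; r.threads = threads; }
    unsigned threads;
};

struct toy_job
{
    // Variant 1: default-construct the policy, then delegate to variant 2.
    template<typename SchedulePolicy>
    void run(toy_results &result)
    {
        SchedulePolicy schedule;
        run(schedule, result);
    }

    // Variant 2: use a policy object the caller has already configured.
    template<typename SchedulePolicy>
    void run(SchedulePolicy &schedule, toy_results &result)
    {
        schedule(result);
    }
};
```

job.run<sequential_policy>(r) selects the first overload because the policy type is given explicitly and cannot be deduced from the arguments; job.run(pool, r) deduces the policy type from pool and uses the configured object as it is.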

To hide the complexity of MapReduce, the library also provides a very simple interface for running a job with default values. When the job needs no specific configuration, we can use the function

template<typename Job>
void run(boost::mapreduce::specification &spec, boost::mapreduce::results &result);

This function default-constructs the objects it requires, using the types provided (or defaulted) in the job definition, so the user-defined program can be very simple, with the generic complexity handled by the library. The program below constructs the datasource and job explicitly and runs the job with the cpu_parallel schedule policy:

int main(int argc, char **argv)
{
    std::cout << "MapReduce Wordcount Application";
    if (argc < 2)
    {
        std::cerr << "Usage: wordcount directory [num_map_tasks]\n";
        return 1;
    }

    boost::mapreduce::specification spec;
    spec.input_directory = argv[1];

    std::cout << "\n" << std::max(1,(int)boost::thread::hardware_concurrency()) << " CPU cores";
    std::cout << "\n" << typeid(wordcount::job).name() << "\n";

    boost::mapreduce::results result;
    wordcount::job::datasource_type datasource(spec);
    try
    {
        if (argc > 2)
            spec.map_tasks = atoi(argv[2]);

        if (argc > 3)
            spec.reduce_tasks = atoi(argv[3]);
        else
            spec.reduce_tasks = std::max(1U,boost::thread::hardware_concurrency());

        std::cout << "\nRunning Parallel WordCount MapReduce...";
        wordcount::job job(datasource, spec);
        job.run<boost::mapreduce::schedule_policy::cpu_parallel<wordcount::job> >(result);
        std::cout << "\nMapReduce Finished.";

At the end of the MapReduce job execution, the statistics can be written to the screen.

        std::cout << std::endl << "\nMapReduce statistics:";
        std::cout << "\n  MapReduce job runtime                     : " << result.job_runtime << " seconds, of which...";
        std::cout << "\n    Map phase runtime                       : " << result.map_runtime << " seconds";
        std::cout << "\n    Reduce phase runtime                    : " << result.reduce_runtime << " seconds";
        std::cout << "\n\n  Map:";
        std::cout << "\n    Total Map keys                          : " << result.counters.map_keys_executed;
        std::cout << "\n    Map keys processed                      : " << result.counters.map_keys_completed;
        std::cout << "\n    Map key processing errors               : " << result.counters.map_key_errors;
        std::cout << "\n    Number of Map Tasks run (in parallel)   : " << result.counters.actual_map_tasks;
        if (!result.map_times.empty())
        {
            std::cout << "\n    Fastest Map key processed in            : " << *std::min_element(result.map_times.begin(), result.map_times.end()) << " seconds";
            std::cout << "\n    Slowest Map key processed in            : " << *std::max_element(result.map_times.begin(), result.map_times.end()) << " seconds";
            std::cout << "\n    Average time to process Map keys        : " << std::accumulate(result.map_times.begin(), result.map_times.end(), boost::posix_time::time_duration()) / result.map_times.size() << " seconds";
        }

        std::cout << "\n\n  Reduce:";
        std::cout << "\n    Total Reduce keys                       : " << result.counters.reduce_keys_executed;
        std::cout << "\n    Reduce keys processed                   : " << result.counters.reduce_keys_completed;
        std::cout << "\n    Reduce key processing errors            : " << result.counters.reduce_key_errors;
        std::cout << "\n    Number of Reduce Tasks run (in parallel): " << result.counters.actual_reduce_tasks;
        std::cout << "\n    Number of Result Files                  : " << result.counters.num_result_files;
        if (result.reduce_times.size() > 0)
        {
            std::cout << "\n    Fastest Reduce key processed in         : " << *std::min_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
            std::cout << "\n    Slowest Reduce key processed in         : " << *std::max_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
            std::cout << "\n    Average time to process Reduce keys     : " << std::accumulate(result.reduce_times.begin(), result.reduce_times.end(), boost::posix_time::time_duration()) / result.reduce_times.size() << " seconds";
        }

The program then prints the top 10 words in descending order of frequency:

        wordcount::job::const_result_iterator it  = job.begin_results();
        wordcount::job::const_result_iterator ite = job.end_results();
        if (it != ite)
        {
            typedef std::list<wordcount::job::keyvalue_t> frequencies_t;
            frequencies_t frequencies;
            frequencies.push_back(*it);
            // A forward iterator into a std::list stays on the same element across
            // push_back; a reverse_iterator obtained from rbegin() would not.
            frequencies_t::iterator it_smallest = frequencies.begin();
            for (++it; it!=ite; ++it)
            {
                if (frequencies.size() < 10)    // show top 10
                {
                    frequencies.push_back(*it);
                    if (it->second < it_smallest->second)
                    {
                        it_smallest = frequencies.end();
                        --it_smallest;          // the element just added
                    }
                }
                else if (it->second > it_smallest->second)
                {
                    // replace the current smallest entry, then find the new smallest
                    *it_smallest = *it;
                    it_smallest = std::min_element(frequencies.begin(), frequencies.end(), boost::mapreduce::detail::less_2nd<wordcount::job::keyvalue_t>);
                }
            }

            frequencies.sort(boost::mapreduce::detail::greater_2nd<wordcount::job::keyvalue_t>);
            std::cout << "\n\nMapReduce results:";
            for (frequencies_t::const_iterator freq=frequencies.begin(); freq!=frequencies.end(); ++freq)
                std::cout << "\n" << freq->first << "\t" << freq->second;
        }
    }
    catch (std::exception &e)
    {
        std::cerr << "\nError: " << e.what() << "\n";
        return 1;
    }
    return 0;
}
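As an alternative to maintaining the top-10 list by hand, the standard algorithm std::partial_sort_copy selects and orders the n largest entries in one call. This is a sketch, assuming each result is a (word, count) pair, as the listing above treats wordcount::job::keyvalue_t; top_n and word_count_t are hypothetical names, and entries with equal counts come out in unspecified order.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

typedef std::pair<std::string, unsigned> word_count_t;   // hypothetical (word, count) type

// Copies the n entries with the highest counts from [first, last) into a
// vector sorted by descending count.
template<typename It>
std::vector<word_count_t> top_n(It first, It last, std::size_t n)
{
    std::vector<word_count_t> top(n);
    auto end_top = std::partial_sort_copy(first, last, top.begin(), top.end(),
        [](auto const &a, auto const &b) { return a.second > b.second; });   // descending
    top.erase(end_top, top.end());   // fewer than n inputs: drop the unused slots
    return top;
}
```

If the job's result iterators are at least input iterators over such pairs, which is an assumption, the loop in the listing could become top_n(job.begin_results(), job.end_results(), 10).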

Output

The wordcount program was built using Microsoft Visual Studio 2005 and STLport, and run on a sample dataset of six plain text files totalling 96 MB (100,628,434 bytes). The smallest file is 163 KB (167,529 bytes) and the largest is 88.1 MB (92,392,601 bytes).

MapReduce Wordcount Application
2 CPU cores
class boost::mapreduce::job<struct wordcount::map_task,struct wordcount::reduce_task,struct boost::mapreduce::null_combiner,class boost::mapreduce::datasource::directory_iterator<struct wordcount::map_task,class boost::mapreduce::datasource::file_handler<class stlp_std::basic_string<char,class stlp_std::char_traits<char>,class stlp_std::allocator<char> >,struct stlp_std::pair<char const *,char const *> > >,class boost::mapreduce::intermediates::in_memory<struct wordcount::map_task,struct wordcount::reduce_task,struct boost::mapreduce::hash_partitioner>,class boost::mapreduce::intermediates::reduce_null_output<struct wordcount::map_task,struct wordcount::reduce_task> >

Running Parallel WordCount MapReduce...
MapReduce Finished.

MapReduce statistics:
  MapReduce job runtime                     : 00:00:23.031250 seconds, of which...
    Map phase runtime                       : 00:00:20.390625 seconds
    Reduce phase runtime                    : 00:00:02.640625 seconds

  Map:
    Total Map keys                          : 100
    Map keys processed                      : 100
    Map key processing errors               : 0
    Number of Map Tasks run (in parallel)   : 2
    Fastest Map key processed in            : 00:00:00.046875 seconds
    Slowest Map key processed in            : 00:00:00.531250 seconds
    Average time to process Map keys        : 00:00:00.406093 seconds

  Reduce:
    Total Reduce keys                       : 120925
    Reduce keys processed                   : 120925
    Reduce key processing errors            : 0
    Number of Reduce Tasks run (in parallel): 2
    Number of Result Files                  : 2
    Fastest Reduce key processed in         : 00:00:02.500000 seconds
    Slowest Reduce key processed in         : 00:00:02.640625 seconds
    Average time to process Reduce keys     : 00:00:00.051406 seconds

MapReduce results:
the     817758
of      441398
to      426628
and     410033
a       327046
in      282907
that    212299
is      204080
you     162297
i       156817
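One figure in this output needs care. The reported average Reduce time (0.051406 seconds) is below the fastest Reduce time (2.500000 seconds), which is impossible for a mean. Assuming the Reduce timings are exactly the two values shown as fastest and slowest (one per Reduce task), the reported figure matches the Reduce total divided by the number of Map keys (100) rather than by the number of Reduce timings (2):

```latex
\begin{aligned}
\frac{2.500000 + 2.640625}{100} &= 0.05140625 \approx 0.051406\ \text{s} \quad (\text{as reported}) \\
\frac{2.500000 + 2.640625}{2}   &= 2.5703125\ \text{s} \quad (\text{mean of the two Reduce timings})
\end{aligned}
```

The Reduce average in the run with a combiner fits the same pattern: (0.421875 + 0.421875) / 100 = 0.0084375, reported as 0.008437 seconds.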

Adding a Combiner

In some circumstances, an optimisation can be made by consolidating the results of the Map phase before they are passed to the Reduce phase. This consolidation is done by a combiner functor.

In the Word Count example, the Map phase naturally produces a list of words, each with a count of 1. A combiner can total the counts for each word and produce a shorter list with a single entry for each distinct word.

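#include <cstddef>    // std::size_t

// Sums the intermediate counts for each key within a map task, so that one
// (word, total) pair is stored instead of many (word, 1) pairs.
class combiner
{
  public:
    combiner() : total_(0) {}

    // resets the running total for a new key
    void start(map_task::intermediate_key_type const &)
    {
        total_ = 0;
    }

    // stores the key with its total, if the total is non-zero
    template<typename IntermediateStore>
    void finish(map_task::intermediate_key_type const &key, IntermediateStore &intermediate_store)
    {
        if (total_ > 0)
            intermediate_store.insert(key, total_);
    }

    // adds one intermediate value to the running total
    void operator()(map_task::intermediate_value_type const &value)
    {
        total_ += value;
    }

  private:
    std::size_t total_;
};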
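The text does not spell out the order in which the library calls the combiner's members, so the following is a sketch under an assumption suggested by their names: for each intermediate key, start(key), then operator() once per value, then finish(key, store). toy_store, toy_combiner and combine_sorted are hypothetical; toy_combiner mirrors wordcount::combiner with concrete key and value types.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical intermediate store: a map from key to combined value.
struct toy_store
{
    std::map<std::string, std::size_t> data;
    void insert(std::string const &key, std::size_t value) { data[key] = value; }
};

// Same members as wordcount::combiner, with concrete key and value types.
class toy_combiner
{
  public:
    void start(std::string const &) { total_ = 0; }

    template<typename IntermediateStore>
    void finish(std::string const &key, IntermediateStore &store)
    {
        if (total_ > 0)
            store.insert(key, total_);
    }

    void operator()(unsigned value) { total_ += value; }

  private:
    std::size_t total_ = 0;
};

// Hypothetical driver: for each run of equal keys in a key-sorted list,
// calls start(), feeds every value to operator(), then calls finish().
template<typename Combiner, typename Store>
void combine_sorted(std::vector<std::pair<std::string, unsigned>> const &sorted,
                    Combiner &combiner, Store &store)
{
    std::size_t i = 0;
    while (i < sorted.size())
    {
        std::string const &key = sorted[i].first;
        combiner.start(key);
        for (; i < sorted.size() && sorted[i].first == key; ++i)
            combiner(sorted[i].second);
        combiner.finish(key, store);
    }
}
```

Under this assumption, six (word, 1) pairs for a, a, b, c, c, c become three stored entries: a = 2, b = 1 and c = 3.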

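The combiner is supplied as the third template parameter of the job type, in place of the default boost::mapreduce::null_combiner, as the type name in the output below shows. It runs as part of the Map Task. Running the program with the combiner produces: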

MapReduce Wordcount Application
2 CPU cores
class boost::mapreduce::job<struct wordcount::map_task,struct wordcount::reduce_task,class wordcount::combiner,class boost::mapreduce::datasource::directory_iterator<struct wordcount::map_task,class boost::mapreduce::datasource::file_handler<class stlp_std::basic_string<char,class stlp_std::char_traits<char>,class stlp_std::allocator<char> >,struct stlp_std::pair<char const *,char const *> > >,class boost::mapreduce::intermediates::in_memory<struct wordcount::map_task,struct wordcount::reduce_task,struct boost::mapreduce::hash_partitioner>,class boost::mapreduce::intermediates::reduce_null_output<struct wordcount::map_task,struct wordcount::reduce_task> >

Running Parallel WordCount MapReduce...
MapReduce Finished.

MapReduce statistics:
  MapReduce job runtime                     : 00:00:19 seconds, of which...
    Map phase runtime                       : 00:00:18.578125 seconds
    Reduce phase runtime                    : 00:00:00.421875 seconds

  Map:
    Total Map keys                          : 100
    Map keys processed                      : 100
    Map key processing errors               : 0
    Number of Map Tasks run (in parallel)   : 2
    Fastest Map key processed in            : 00:00:00.031250 seconds
    Slowest Map key processed in            : 00:00:00.593750 seconds
    Average time to process Map keys        : 00:00:00.365468 seconds

  Reduce:
    Total Reduce keys                       : 120925
    Reduce keys processed                   : 120925
    Reduce key processing errors            : 0
    Number of Reduce Tasks run (in parallel): 2
    Number of Result Files                  : 2
    Fastest Reduce key processed in         : 00:00:00.421875 seconds
    Slowest Reduce key processed in         : 00:00:00.421875 seconds
    Average time to process Reduce keys     : 00:00:00.008437 seconds

MapReduce results:
the     817758
of      441398
to      426628
and     410033
a       327046
in      282907
that    212299
is      204080
you     162297
i       156817
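The word counts in both runs are identical, and so is the number of Reduce keys (120,925), because a combiner reduces the number of intermediate values per key, not the number of distinct keys. Taking the timings from the two outputs at face value (a single run of each on the machine described above, so indicative only), the change works out as:

```latex
\begin{aligned}
\text{Reduce phase:}\quad & \frac{2.640625\ \text{s}}{0.421875\ \text{s}} \approx 6.26 \times \text{faster} \\
\text{Job runtime:}\quad & \frac{23.031250 - 19.000000}{23.031250} = \frac{4.03125}{23.03125} \approx 17.5\%\ \text{shorter}
\end{aligned}
```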

Source Code

The full source code for the Word Count example can be found in libs/mapreduce/examples/wordcount/wordcount.cpp