Boost.MapReduce Word Count example
Note: This library is not yet part of the Boost Library and is still under development and review.
By way of an example of using the MapReduce library, we implement a Word Count application.
We'll use a datasource class supplied by the library to iterate through a directory
of files containing words to be counted. The Map phase will create a list of words and a count of 1,
and the Reduce phase will accept a list of words and corresponding counts, total the counts
for each word, and produce a final list of words with their totals.
map (filename; string, file stream; ifstream) --> list(word; string, count; unsigned int)
reduce (word; string, list(count; unsigned int)) --> list(count; unsigned int)
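To make the data flow of the two signatures above concrete, here is a minimal single-threaded sketch that does not use the MapReduce library at all. The names map_words, reduce_counts and word_count are hypothetical, and words are simply split on whitespace, which is cruder than the rule used by the map task described below.

```cpp
#include <cstddef>
#include <map>
#include <numeric>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the Map phase: emit (word, 1) for every
// whitespace-separated token in the text.
std::vector<std::pair<std::string, unsigned> > map_words(std::string const &text)
{
    std::vector<std::pair<std::string, unsigned> > out;
    std::istringstream in(text);
    std::string word;
    while (in >> word)
        out.push_back(std::make_pair(word, 1U));
    return out;
}

// Hypothetical stand-in for the Reduce phase: total the counts for one word.
unsigned reduce_counts(std::vector<unsigned> const &counts)
{
    return std::accumulate(counts.begin(), counts.end(), 0U);
}

// Group the intermediate pairs by key, then reduce each group.
std::map<std::string, unsigned> word_count(std::string const &text)
{
    std::map<std::string, std::vector<unsigned> > grouped;
    std::vector<std::pair<std::string, unsigned> > pairs = map_words(text);
    for (std::size_t i = 0; i != pairs.size(); ++i)
        grouped[pairs[i].first].push_back(pairs[i].second);

    std::map<std::string, unsigned> totals;
    for (std::map<std::string, std::vector<unsigned> >::const_iterator it = grouped.begin();
         it != grouped.end(); ++it)
        totals[it->first] = reduce_counts(it->second);
    return totals;
}
```

The grouping step in word_count plays the role that the library's intermediate store plays between the Map and Reduce phases.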
MapTask
The MapTask will be implemented by a function object, wordcount::map_task. A MapTask must define two
data types: the key and value types of the input to the map task. Here they are supplied as template
parameters to the library's base class, boost::mapreduce::map_task.
The map function operator takes three parameters. The first is the runtime, which is passed by the MapReduce
library and is used as a callback to emit intermediate key/value pairs. The other two parameters are the
key and value for the map task to process. Normally these parameters would be
passed as reference-to-const, but this is not a requirement. For example, if the value type
were a file stream such as std::ifstream, reference-to-const would not be possible, because
reading from the file modifies the state of the stream object.
The function simply loops through the character array (defined by the begin/end pointers in the value
parameter), defining a word to be a contiguous run of alphabetic characters and apostrophes that starts
with a letter. Each word is then stored as an intermediate key with a value of 1
by calling the emit_intermediate() function of the runtime parameter object.
struct map_task : public boost::mapreduce::map_task<
                             std::string,                              // MapKey
                             std::pair<char const *, char const *> >   // MapValue
{
    template<typename Runtime>
    void operator()(Runtime &runtime, std::string const &/*key*/, value_type &value) const
    {
        bool in_word = false;
        char const *ptr  = value.first;    // start of the character range
        char const *end  = value.second;   // one past the end of the range
        char const *word = ptr;            // start of the current word
        for (; ptr != end; ++ptr)
        {
            // cast first: passing a negative char to toupper is undefined
            char const ch = std::toupper(static_cast<unsigned char>(*ptr));
            if (in_word)
            {
                // a word ends at the first character that is neither a letter nor an apostrophe
                if ((ch < 'A' || ch > 'Z') && ch != '\'')
                {
                    runtime.emit_intermediate(std::string(word,ptr-word), 1);
                    in_word = false;
                }
            }
            else
            {
                // a word starts at a letter
                if (ch >= 'A' && ch <= 'Z')
                {
                    word = ptr;
                    in_word = true;
                }
            }
        }
        // emit a word that runs up to the end of the range
        if (in_word)
        {
            BOOST_ASSERT(ptr-word > 0);
            runtime.emit_intermediate(std::string(word,ptr-word), 1);
        }
    }
};
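The scanning rule in the map task can be tried out in isolation. The sketch below applies the same loop to a std::string and collects the words in a vector instead of emitting them to a runtime. The function name split_words is hypothetical and is not part of the library.

```cpp
#include <cctype>
#include <string>
#include <vector>

// Hypothetical helper: the same scanning rule as the map task above, but
// collecting words into a vector instead of emitting them to a runtime.
std::vector<std::string> split_words(std::string const &text)
{
    std::vector<std::string> words;
    char const *ptr  = text.data();
    char const *end  = ptr + text.size();
    char const *word = ptr;
    bool in_word = false;
    for (; ptr != end; ++ptr)
    {
        int const ch = std::toupper(static_cast<unsigned char>(*ptr));
        bool const is_letter = (ch >= 'A' && ch <= 'Z');
        if (in_word)
        {
            // a letter or an apostrophe continues the word; anything else ends it
            if (!is_letter && ch != '\'')
            {
                words.push_back(std::string(word, ptr - word));
                in_word = false;
            }
        }
        else if (is_letter)
        {
            // only a letter can start a word
            word = ptr;
            in_word = true;
        }
    }
    if (in_word)
        words.push_back(std::string(word, ptr - word));
    return words;
}
```

For the input "Don't stop, it's 42nd" this yields the four words Don't, stop, it's and nd: apostrophes are kept inside a word, while digits and punctuation act as separators.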
ReduceTask
The ReduceTask will be implemented by a function object, wordcount::reduce_task. This
functor is derived from the library's reduce_task class, which takes two template
parameters defining the key and value types of the reduce task's output.
The reduce function operator takes four parameters. The first is the runtime object, the library's
callback as described above. The second parameter is the key of the reduce task, and the
third and fourth parameters are a pair of iterators delimiting the range of value objects
for the reduce task. In this Word Count example, the key is a text string containing the
word, and the iterators delimit a list of frequencies for the word. The ReduceTask simply sums
the frequencies by calling std::accumulate and stores the final result by calling the
emit() function of the runtime object.
struct reduce_task : public boost::mapreduce::reduce_task<std::string, unsigned>
{
    template<typename Runtime, typename It>
    void operator()(Runtime &runtime, std::string const &key, It it, It const ite) const
    {
        // start from 0U so the sum is accumulated in the declared (unsigned) value type
        runtime.emit(key, std::accumulate(it, ite, 0U));
    }
};
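Because the runtime is a template parameter, a functor of this shape can be exercised without the library by passing it a stand-in runtime. The sketch below assumes only that the runtime exposes emit(key, value). The names recording_runtime, sum_reduce_task and reduce_one are hypothetical.

```cpp
#include <map>
#include <numeric>
#include <string>
#include <vector>

// Hypothetical runtime: records every emitted key/value pair in a map.
struct recording_runtime
{
    std::map<std::string, unsigned> results;
    void emit(std::string const &key, unsigned value) { results[key] = value; }
};

// Same shape as the reduce task above, without the library base class.
struct sum_reduce_task
{
    template<typename Runtime, typename It>
    void operator()(Runtime &runtime, std::string const &key, It it, It const ite) const
    {
        runtime.emit(key, std::accumulate(it, ite, 0U));
    }
};

// Convenience wrapper: reduce one key and return its total.
unsigned reduce_one(std::string const &key, std::vector<unsigned> const &counts)
{
    recording_runtime runtime;
    sum_reduce_task()(runtime, key, counts.begin(), counts.end());
    return runtime.results[key];
}
```

Calling reduce_one("the", counts) with three counts of 1 returns 3, which is the value the reduce task would emit for that key.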
Type Definitions
For convenience, brevity and maintainability, we define a job type for the MapReduce job.
This local job type will be defined in terms of the library's mapreduce::job
class, with template parameters specific to the Word Count application.
typedef mapreduce::job< wordcount::map_task, wordcount::reduce_task> job;
The class mapreduce::job actually has six template parameters. The first two must be supplied; the remaining
four have default values. The definition above is therefore equivalent to
class boost::mapreduce::job<
    struct wordcount::map_task,
    struct wordcount::reduce_task,
    struct boost::mapreduce::null_combiner,
    class boost::mapreduce::datasource::directory_iterator<
        struct wordcount::map_task,
        class boost::mapreduce::datasource::file_handler<
            class std::basic_string<
                char,class std::char_traits<char>,
                class std::allocator<char> >,
            struct std::pair<char const *,char const *> > >,
    class boost::mapreduce::intermediates::in_memory<
        struct wordcount::map_task,
        struct wordcount::reduce_task,
        struct boost::mapreduce::hash_partitioner>,
    class boost::mapreduce::intermediates::reduce_null_output<
        struct wordcount::map_task,struct wordcount::reduce_task> >
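The equivalence between the short typedef and the fully expanded type comes from ordinary default template arguments. The toy below illustrates the mechanism with just two defaulted parameters. Every name in it (toy_job, no_combiner, default_datasource and so on) is hypothetical and only mimics the shape of the library's job class.

```cpp
#include <type_traits>

// All names here are hypothetical; they only mimic the shape of a job type.
struct my_map_task {};
struct my_reduce_task {};
struct no_combiner {};

template<typename MapTask>
struct default_datasource {};

template<typename MapTask,
         typename ReduceTask,
         typename Combiner   = no_combiner,
         typename Datasource = default_datasource<MapTask> >
struct toy_job
{
    typedef Combiner   combiner_type;
    typedef Datasource datasource_type;
};

// Supplying only the two required parameters...
typedef toy_job<my_map_task, my_reduce_task> short_form;

// ...names exactly the same type as spelling out every default.
typedef toy_job<my_map_task, my_reduce_task,
                no_combiner, default_datasource<my_map_task> > long_form;

static_assert(std::is_same<short_form, long_form>::value,
              "defaulted parameters produce the same type");
```

Note that a default argument can depend on an earlier parameter, as default_datasource<MapTask> does here. The expanded job type above shows the same pattern, with wordcount::map_task appearing inside the defaulted datasource and intermediate store types.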
Program
To run the MapReduce Word Count algorithm, we need a program to set up an environment, run the algorithm and report the results.
The code below shows an example. Note that error handling has been removed for brevity.
A datasource object is created to iterate through a directory of files and
pass each file into a map task. A mapreduce::specification object is then
created. This is used to specify system parameters such as the number of map tasks to run.
Note that this number is a hint to the MapReduce runtime, and may differ from the actual
number of map tasks that are used. The final supporting object that is created is an
instance of mapreduce::results. This structure will be populated by the
runtime to provide metrics and timings of the MapReduce job execution.
To run the MapReduce job, call the run function of the job class.
There are two variants of run, for coding convenience.
template<typename SchedulePolicy>
void run(specification const &spec, results &result);
template<typename SchedulePolicy>
void run(SchedulePolicy &schedule, specification const &spec, results &result);
Both overloads of run() are template functions where the template parameter
is a SchedulePolicy. The first variant will default construct a schedule policy
class, and the second variant will use the supplied policy class. This enables the library
user to develop their own scheduler policies that may need configuration before being used.
See Schedule Policies for more information.
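The difference between the two variants can be illustrated with a toy that has nothing to do with the library's real scheduler. In this sketch toy_job, toy_results, run_all and run_at_most are all hypothetical names, and a policy is assumed to be a function object called with the job and the results.

```cpp
#include <cstddef>

// Hypothetical stand-ins; none of these are the library's own classes.
struct toy_results
{
    std::size_t tasks_run;
    toy_results() : tasks_run(0) {}
};

class toy_job
{
public:
    explicit toy_job(std::size_t tasks) : tasks_(tasks) {}

    // Variant 1: the policy type is named explicitly and default constructed.
    template<typename SchedulePolicy>
    void run(toy_results &result)
    {
        SchedulePolicy schedule;
        run(schedule, result);
    }

    // Variant 2: the caller supplies an already configured policy object.
    template<typename SchedulePolicy>
    void run(SchedulePolicy &schedule, toy_results &result)
    {
        schedule(*this, result);
    }

    std::size_t tasks() const { return tasks_; }

private:
    std::size_t tasks_;
};

// A policy that needs no configuration.
struct run_all
{
    void operator()(toy_job &job, toy_results &result) const
    {
        result.tasks_run = job.tasks();
    }
};

// A policy that must be configured before use, so it has no default constructor.
struct run_at_most
{
    explicit run_at_most(std::size_t limit) : limit_(limit) {}
    void operator()(toy_job &job, toy_results &result) const
    {
        result.tasks_run = job.tasks() < limit_ ? job.tasks() : limit_;
    }
private:
    std::size_t limit_;
};
```

With these definitions, job.run<run_all>(result) uses the first variant, while a run_at_most policy has to be constructed by the caller and passed through the second variant as job.run(policy, result).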
To reduce the complexity of dealing with MapReduce, the library provides a very simple interface for running a job with default values. In this example, we want to run the job without any specific configuration, so we can use the function
template<typename Job> void run(boost::mapreduce::specification &spec, boost::mapreduce::results &result);
This function will default construct the objects required using the type definitions provided,
or defaulted, in the job definition. The user-defined program is therefore very
simple, with all the generic complexity being handled by the library:
int main(int argc, char **argv)
{
    std::cout << "MapReduce Wordcount Application";
    if (argc < 2)
    {
        std::cerr << "Usage: wordcount directory [num_map_tasks]\n";
        return 1;
    }
    boost::mapreduce::specification spec;
    spec.input_directory = argv[1];
    std::cout << "\n" << std::max(1,(int)boost::thread::hardware_concurrency()) << " CPU cores";
    std::cout << "\n" << typeid(wordcount::job).name() << "\n";
    boost::mapreduce::results result;
    wordcount::job::datasource_type datasource(spec);
    try
    {
        if (argc > 2)
            spec.map_tasks = atoi(argv[2]);
        if (argc > 3)
            spec.reduce_tasks = atoi(argv[3]);
        else
            spec.reduce_tasks = std::max(1U,boost::thread::hardware_concurrency());
        std::cout << "\nRunning Parallel WordCount MapReduce...";
        wordcount::job job(datasource, spec);
        job.run<boost::mapreduce::schedule_policy::cpu_parallel<wordcount::job> >(result);
        std::cout << "\nMapReduce Finished.";
At the end of the MapReduce job execution, the statistics can be written to the screen.
std::cout << std::endl << "\nMapReduce statistics:";
std::cout << "\n MapReduce job runtime : " << result.job_runtime << " seconds, of which...";
std::cout << "\n Map phase runtime : " << result.map_runtime << " seconds";
std::cout << "\n Reduce phase runtime : " << result.reduce_runtime << " seconds";
std::cout << "\n\n Map:";
std::cout << "\n Total Map keys : " << result.counters.map_keys_executed;
std::cout << "\n Map keys processed : " << result.counters.map_keys_completed;
std::cout << "\n Map key processing errors : " << result.counters.map_key_errors;
std::cout << "\n Number of Map Tasks run (in parallel) : " << result.counters.actual_map_tasks;
std::cout << "\n Fastest Map key processed in : " << *std::min_element(result.map_times.begin(), result.map_times.end()) << " seconds";
std::cout << "\n Slowest Map key processed in : " << *std::max_element(result.map_times.begin(), result.map_times.end()) << " seconds";
std::cout << "\n Average time to process Map keys : " << std::accumulate(result.map_times.begin(), result.map_times.end(), boost::posix_time::time_duration()) / result.map_times.size() << " seconds";
std::cout << "\n\n Reduce:";
std::cout << "\n Total Reduce keys : " << result.counters.reduce_keys_executed;
std::cout << "\n Reduce keys processed : " << result.counters.reduce_keys_completed;
std::cout << "\n Reduce key processing errors : " << result.counters.reduce_key_errors;
std::cout << "\n Number of Reduce Tasks run (in parallel): " << result.counters.actual_reduce_tasks;
std::cout << "\n Number of Result Files : " << result.counters.num_result_files;
if (result.reduce_times.size() > 0)
{
std::cout << "\n Fastest Reduce key processed in : " << *std::min_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
std::cout << "\n Slowest Reduce key processed in : " << *std::max_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
std::cout << "\n Average time to process Reduce keys : " << std::accumulate(result.reduce_times.begin(), result.reduce_times.end(), boost::posix_time::time_duration()) / result.reduce_times.size() << " seconds";
}
The top 10 words can then be listed in descending order of frequency:
wordcount::job::const_result_iterator it  = job.begin_results();
wordcount::job::const_result_iterator ite = job.end_results();
if (it != ite)
{
    typedef std::list<wordcount::job::keyvalue_t> frequencies_t;
    frequencies_t frequencies;
    frequencies.push_back(*it);
    // forward iterator to the entry with the smallest count held so far
    frequencies_t::iterator it_smallest = frequencies.begin();
    for (++it; it!=ite; ++it)
    {
        if (frequencies.size() < 10)    // show top 10
        {
            frequencies.push_back(*it);
            if (it->second < it_smallest->second)
            {
                // the entry just added is the new smallest
                it_smallest = frequencies.end();
                --it_smallest;
            }
        }
        else if (it->second > it_smallest->second)
        {
            // replace the smallest entry, then locate the new smallest
            *it_smallest = *it;
            it_smallest = std::min_element(frequencies.begin(), frequencies.end(), boost::mapreduce::detail::less_2nd<wordcount::job::keyvalue_t>);
        }
    }
    frequencies.sort(boost::mapreduce::detail::greater_2nd<wordcount::job::keyvalue_t>);
    std::cout << "\n\nMapReduce results:";
    for (frequencies_t::const_iterator freq=frequencies.begin(); freq!=frequencies.end(); ++freq)
        std::cout << "\n" << freq->first << "\t" << freq->second;
}
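When the results fit in memory, the same selection can be expressed with std::partial_sort. The sketch below is an alternative technique rather than the example's own code. It assumes the results have been copied into a std::vector of (word, count) pairs, and the names word_count_t, higher_count and top_n are hypothetical.

```cpp
#include <algorithm>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

typedef std::pair<std::string, unsigned> word_count_t;

// Order by descending count; break ties alphabetically so the order is deterministic.
inline bool higher_count(word_count_t const &a, word_count_t const &b)
{
    if (a.second != b.second)
        return a.second > b.second;
    return a.first < b.first;
}

// Hypothetical helper: return the n most frequent entries, most frequent first.
// The vector is taken by value so the caller's data is left untouched.
std::vector<word_count_t> top_n(std::vector<word_count_t> counts, std::size_t n)
{
    if (n > counts.size())
        n = counts.size();
    // only the first n positions are put in order; the rest are left unspecified
    std::partial_sort(counts.begin(), counts.begin() + n, counts.end(), higher_count);
    counts.erase(counts.begin() + n, counts.end());
    return counts;
}
```

This trades the bounded ten-element list used above for a copy of all results, so it is the simpler choice only when the number of distinct words is modest.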
Output
The wordcount program was built using Microsoft Visual Studio 2005 and STLPort. It was run on a sample dataset consisting of six plain text files totalling 96 MB (100,628,434 bytes). The smallest file is 163 KB (167,529 bytes) and the largest is 88.1 MB (92,392,601 bytes).
MapReduce Wordcount Application
2 CPU cores
class boost::mapreduce::job<struct wordcount::map_task,struct wordcount::reduce_
task,struct boost::mapreduce::null_combiner,class boost::mapreduce::datasource::
directory_iterator<struct wordcount::map_task,class boost::mapreduce::datasource
::file_handler<class stlp_std::basic_string<char,class stlp_std::char_traits<cha
r>,class stlp_std::allocator<char> >,struct stlp_std::pair<char const *,char con
st *> > >,class boost::mapreduce::intermediates::in_memory<struct wordcount::map
_task,struct wordcount::reduce_task,struct boost::mapreduce::hash_partitioner>,c
lass boost::mapreduce::intermediates::reduce_null_output<struct wordcount::map_t
ask,struct wordcount::reduce_task> >
Running Parallel WordCount MapReduce...
MapReduce Finished.
MapReduce statistics:
MapReduce job runtime : 00:00:23.031250 seconds, of which...
Map phase runtime : 00:00:20.390625 seconds
Reduce phase runtime : 00:00:02.640625 seconds
Map:
Total Map keys : 100
Map keys processed : 100
Map key processing errors : 0
Number of Map Tasks run (in parallel) : 2
Fastest Map key processed in : 00:00:00.046875 seconds
Slowest Map key processed in : 00:00:00.531250 seconds
Average time to process Map keys : 00:00:00.406093 seconds
Reduce:
Total Reduce keys : 120925
Reduce keys processed : 120925
Reduce key processing errors : 0
Number of Reduce Tasks run (in parallel): 2
Number of Result Files : 2
Fastest Reduce key processed in : 00:00:02.500000 seconds
Slowest Reduce key processed in : 00:00:02.640625 seconds
Average time to process Reduce keys : 00:00:00.051406 seconds
MapReduce results:
the 817758
of 441398
to 426628
and 410033
a 327046
in 282907
that 212299
is 204080
you 162297
i 156817
Adding a Combiner
In some circumstances, an optimisation can be made by consolidating the results of
the Map phase before they are passed to the Reduce phase. This consolidation is
done by a combiner functor.
In the case of the Word Count example, the Map phase will naturally produce a list of
words, each with a count of 1. The combiner can be used to total the
occurrences of each word in the list and produce a shorter list in which each word appears once with its total.
class combiner
{
  public:
    void start(map_task::intermediate_key_type const &)
    {
        total_ = 0;
    }
    template<typename IntermediateStore>
    void finish(map_task::intermediate_key_type const &key, IntermediateStore &intermediate_store)
    {
        if (total_ > 0)
            intermediate_store.insert(key, total_);
    }
    void operator()(map_task::intermediate_value_type const &value)
    {
        total_ += value;
    }
  private:
    size_t total_;
};
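The three member functions suggest a simple calling sequence, which the following sketch acts out without the library. It is an assumption, not documented behaviour, that start() is called once per key, operator() once per value, and finish() once at the end. The names toy_store, sum_combiner and combine_key are hypothetical.

```cpp
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Hypothetical intermediate store exposing the insert(key, value) call
// that the combiner above relies on.
struct toy_store
{
    std::map<std::string, std::size_t> data;
    void insert(std::string const &key, std::size_t value) { data[key] += value; }
};

// Same three-step interface as the combiner above, with the key and value
// types written out instead of being taken from map_task.
class sum_combiner
{
  public:
    void start(std::string const &) { total_ = 0; }

    template<typename IntermediateStore>
    void finish(std::string const &key, IntermediateStore &intermediate_store)
    {
        if (total_ > 0)
            intermediate_store.insert(key, total_);
    }

    void operator()(unsigned const &value) { total_ += value; }

  private:
    std::size_t total_;
};

// Assumed calling sequence: start once, operator() per value, finish once.
void combine_key(std::string const &key, std::vector<unsigned> const &values, toy_store &store)
{
    sum_combiner combiner;
    combiner.start(key);
    for (std::size_t i = 0; i != values.size(); ++i)
        combiner(values[i]);
    combiner.finish(key, store);
}
```

Three intermediate pairs of ("the", 1) are collapsed into a single ("the", 3) entry, and a key with no values adds nothing to the store because of the total_ > 0 check.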
The combiner runs as a part of the Map Task.
MapReduce Wordcount Application
2 CPU cores
class boost::mapreduce::job<struct wordcount::map_task,struct wordcount::reduce_
task,class wordcount::combiner,class boost::mapreduce::datasource::directory_ite
rator<struct wordcount::map_task,class boost::mapreduce::datasource::file_handle
r<class stlp_std::basic_string<char,class stlp_std::char_traits<char>,class stlp
_std::allocator<char> >,struct stlp_std::pair<char const *,char const *> > >,cla
ss boost::mapreduce::intermediates::in_memory<struct wordcount::map_task,struct
wordcount::reduce_task,struct boost::mapreduce::hash_partitioner>,class boost::m
apreduce::intermediates::reduce_null_output<struct wordcount::map_task,struct wo
rdcount::reduce_task> >
Running Parallel WordCount MapReduce...
MapReduce Finished.
MapReduce statistics:
MapReduce job runtime : 00:00:19 seconds, of which...
Map phase runtime : 00:00:18.578125 seconds
Reduce phase runtime : 00:00:00.421875 seconds
Map:
Total Map keys : 100
Map keys processed : 100
Map key processing errors : 0
Number of Map Tasks run (in parallel) : 2
Fastest Map key processed in : 00:00:00.031250 seconds
Slowest Map key processed in : 00:00:00.593750 seconds
Average time to process Map keys : 00:00:00.365468 seconds
Reduce:
Total Reduce keys : 120925
Reduce keys processed : 120925
Reduce key processing errors : 0
Number of Reduce Tasks run (in parallel): 2
Number of Result Files : 2
Fastest Reduce key processed in : 00:00:00.421875 seconds
Slowest Reduce key processed in : 00:00:00.421875 seconds
Average time to process Reduce keys : 00:00:00.008437 seconds
MapReduce results:
the 817758
of 441398
to 426628
and 410033
a 327046
in 282907
that 212299
is 204080
you 162297
i 156817
Source Code
The full source code for the Word Count example can be found in
libs/mapreduce/examples/wordcount/wordcount.cpp




