[code.view]

[top] / python / PyMOTW / docs / multiprocessing / mapreduce.html


<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN"
  "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">

<html xmlns="http://www.w3.org/1999/xhtml">
  <head>
    <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
    
    <title>Implementing MapReduce with multiprocessing &mdash; Python Module of the Week</title>
    <link rel="stylesheet" href="../_static/sphinxdoc.css" type="text/css" />
    <link rel="stylesheet" href="../_static/pygments.css" type="text/css" />
    <script type="text/javascript">
      var DOCUMENTATION_OPTIONS = {
        URL_ROOT:    '../',
        VERSION:     '1.132',
        COLLAPSE_INDEX: false,
        FILE_SUFFIX: '.html',
        HAS_SOURCE:  true
      };
    </script>
    <script type="text/javascript" src="../_static/jquery.js"></script>
    <script type="text/javascript" src="../_static/underscore.js"></script>
    <script type="text/javascript" src="../_static/doctools.js"></script>
    <link rel="author" title="About these documents" href="../about.html" />
    <link rel="top" title="Python Module of the Week" href="../index.html" />
    <link rel="up" title="multiprocessing – Manage processes like threads" href="index.html" />
    <link rel="next" title="readline – Interface to the GNU readline library" href="../readline/index.html" />
    <link rel="prev" title="Communication Between Processes" href="communication.html" /> 
  </head>
  <body>
    <div class="related">
      <h3>Navigation</h3>
      <ul>
        <li class="right" style="margin-right: 10px">
          <a href="../genindex.html" title="General Index"
             accesskey="I">index</a></li>
        <li class="right" >
          <a href="../py-modindex.html" title="Python Module Index"
             >modules</a> |</li>
        <li class="right" >
          <a href="../readline/index.html" title="readline – Interface to the GNU readline library"
             accesskey="N">next</a> |</li>
        <li class="right" >
          <a href="communication.html" title="Communication Between Processes"
             accesskey="P">previous</a> |</li>
        <li><a href="../contents.html">PyMOTW</a> &raquo;</li>
          <li><a href="../optional_os.html" >Optional Operating System Services</a> &raquo;</li>
          <li><a href="index.html" accesskey="U">multiprocessing &#8211; Manage processes like threads</a> &raquo;</li> 
      </ul>
    </div>
      <div class="sphinxsidebar">
        <div class="sphinxsidebarwrapper">
  <h3><a href="../contents.html">Table Of Contents</a></h3>
  <ul>
<li><a class="reference internal" href="#">Implementing MapReduce with multiprocessing</a><ul>
<li><a class="reference internal" href="#simplemapreduce">SimpleMapReduce</a></li>
<li><a class="reference internal" href="#counting-words-in-files">Counting Words in Files</a></li>
</ul>
</li>
</ul>

  <h4>Previous topic</h4>
  <p class="topless"><a href="communication.html"
                        title="previous chapter">Communication Between Processes</a></p>
  <h4>Next topic</h4>
  <p class="topless"><a href="../readline/index.html"
                        title="next chapter">readline &#8211; Interface to the GNU readline library</a></p>
  <h3>This Page</h3>
  <ul class="this-page-menu">
    <li><a href="../_sources/multiprocessing/mapreduce.txt"
           rel="nofollow">Show Source</a></li>
  </ul>
<div id="searchbox" style="display: none">
  <h3>Quick search</h3>
    <form class="search" action="../search.html" method="get">
      <input type="text" name="q" size="18" />
      <input type="submit" value="Go" />
      <input type="hidden" name="check_keywords" value="yes" />
      <input type="hidden" name="area" value="default" />
    </form>
    <p class="searchtip" style="font-size: 90%">
    Enter search terms or a module, class or function name.
    </p>
</div>
<script type="text/javascript">$('#searchbox').show(0);</script>
        </div>
      </div>

    <div class="document">
      <div class="documentwrapper">
        <div class="bodywrapper">
          <div class="body">
            
  <div class="section" id="implementing-mapreduce-with-multiprocessing">
<h1>Implementing MapReduce with multiprocessing<a class="headerlink" href="#implementing-mapreduce-with-multiprocessing" title="Permalink to this headline">¶</a></h1>
<p>The <tt class="xref py py-class docutils literal"><span class="pre">Pool</span></tt> class can be used to create a simple single-server
MapReduce implementation.  Although it does not give the full benefits
of distributed processing, it does illustrate how easy it is to break
some problems down into distributable units of work.</p>
<div class="section" id="simplemapreduce">
<h2>SimpleMapReduce<a class="headerlink" href="#simplemapreduce" title="Permalink to this headline">¶</a></h2>
<p>In a MapReduce-based system, input data is broken down into chunks for
processing by different worker instances.  Each chunk of input data is
<em>mapped</em> to an intermediate state using a simple transformation.  The
intermediate data is then collected together and partitioned based on
a key value so that all of the related values are together.  Finally,
the partitioned data is <em>reduced</em> to a result set.</p>
<div class="highlight-python"><div class="highlight"><pre><span class="kn">import</span> <span class="nn">collections</span>
<span class="kn">import</span> <span class="nn">itertools</span>
<span class="kn">import</span> <span class="nn">multiprocessing</span>

<span class="k">class</span> <span class="nc">SimpleMapReduce</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
    
    <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">map_func</span><span class="p">,</span> <span class="n">reduce_func</span><span class="p">,</span> <span class="n">num_workers</span><span class="o">=</span><span class="bp">None</span><span class="p">):</span>
        <span class="sd">&quot;&quot;&quot;</span>
<span class="sd">        map_func</span>

<span class="sd">          Function to map inputs to intermediate data. Takes as</span>
<span class="sd">          argument one input value and returns a tuple with the key</span>
<span class="sd">          and a value to be reduced.</span>
<span class="sd">        </span>
<span class="sd">        reduce_func</span>

<span class="sd">          Function to reduce partitioned version of intermediate data</span>
<span class="sd">          to final output. Takes as argument a key as produced by</span>
<span class="sd">          map_func and a sequence of the values associated with that</span>
<span class="sd">          key.</span>
<span class="sd">         </span>
<span class="sd">        num_workers</span>

<span class="sd">          The number of workers to create in the pool. Defaults to the</span>
<span class="sd">          number of CPUs available on the current host.</span>
<span class="sd">        &quot;&quot;&quot;</span>
        <span class="bp">self</span><span class="o">.</span><span class="n">map_func</span> <span class="o">=</span> <span class="n">map_func</span>
        <span class="bp">self</span><span class="o">.</span><span class="n">reduce_func</span> <span class="o">=</span> <span class="n">reduce_func</span>
        <span class="bp">self</span><span class="o">.</span><span class="n">pool</span> <span class="o">=</span> <span class="n">multiprocessing</span><span class="o">.</span><span class="n">Pool</span><span class="p">(</span><span class="n">num_workers</span><span class="p">)</span>
    
    <span class="k">def</span> <span class="nf">partition</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">mapped_values</span><span class="p">):</span>
        <span class="sd">&quot;&quot;&quot;Organize the mapped values by their key.</span>
<span class="sd">        Returns an unsorted sequence of tuples with a key and a sequence of values.</span>
<span class="sd">        &quot;&quot;&quot;</span>
        <span class="n">partitioned_data</span> <span class="o">=</span> <span class="n">collections</span><span class="o">.</span><span class="n">defaultdict</span><span class="p">(</span><span class="nb">list</span><span class="p">)</span>
        <span class="k">for</span> <span class="n">key</span><span class="p">,</span> <span class="n">value</span> <span class="ow">in</span> <span class="n">mapped_values</span><span class="p">:</span>
            <span class="n">partitioned_data</span><span class="p">[</span><span class="n">key</span><span class="p">]</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">value</span><span class="p">)</span>
        <span class="k">return</span> <span class="n">partitioned_data</span><span class="o">.</span><span class="n">items</span><span class="p">()</span>
    
    <span class="k">def</span> <span class="nf">__call__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">inputs</span><span class="p">,</span> <span class="n">chunksize</span><span class="o">=</span><span class="mi">1</span><span class="p">):</span>
        <span class="sd">&quot;&quot;&quot;Process the inputs through the map and reduce functions given.</span>
<span class="sd">        </span>
<span class="sd">        inputs</span>
<span class="sd">          An iterable containing the input data to be processed.</span>
<span class="sd">        </span>
<span class="sd">        chunksize=1</span>
<span class="sd">          The portion of the input data to hand to each worker.  This</span>
<span class="sd">          can be used to tune performance during the mapping phase.</span>
<span class="sd">        &quot;&quot;&quot;</span>
        <span class="n">map_responses</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">pool</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">map_func</span><span class="p">,</span> <span class="n">inputs</span><span class="p">,</span> <span class="n">chunksize</span><span class="o">=</span><span class="n">chunksize</span><span class="p">)</span>
        <span class="n">partitioned_data</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">partition</span><span class="p">(</span><span class="n">itertools</span><span class="o">.</span><span class="n">chain</span><span class="p">(</span><span class="o">*</span><span class="n">map_responses</span><span class="p">))</span>
        <span class="n">reduced_values</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">pool</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">reduce_func</span><span class="p">,</span> <span class="n">partitioned_data</span><span class="p">)</span>
        <span class="k">return</span> <span class="n">reduced_values</span>
</pre></div>
</div>
</div>
<div class="section" id="counting-words-in-files">
<h2>Counting Words in Files<a class="headerlink" href="#counting-words-in-files" title="Permalink to this headline">¶</a></h2>
<p>The following example script uses SimpleMapReduce to counts the
&#8220;words&#8221; in the reStructuredText source for this article, ignoring some
of the markup.</p>
<div class="highlight-python"><div class="highlight"><pre><span class="kn">import</span> <span class="nn">multiprocessing</span>
<span class="kn">import</span> <span class="nn">string</span>

<span class="kn">from</span> <span class="nn">multiprocessing_mapreduce</span> <span class="kn">import</span> <span class="n">SimpleMapReduce</span>

<span class="k">def</span> <span class="nf">file_to_words</span><span class="p">(</span><span class="n">filename</span><span class="p">):</span>
    <span class="sd">&quot;&quot;&quot;Read a file and return a sequence of (word, occurances) values.</span>
<span class="sd">    &quot;&quot;&quot;</span>
    <span class="n">STOP_WORDS</span> <span class="o">=</span> <span class="nb">set</span><span class="p">([</span>
            <span class="s">&#39;a&#39;</span><span class="p">,</span> <span class="s">&#39;an&#39;</span><span class="p">,</span> <span class="s">&#39;and&#39;</span><span class="p">,</span> <span class="s">&#39;are&#39;</span><span class="p">,</span> <span class="s">&#39;as&#39;</span><span class="p">,</span> <span class="s">&#39;be&#39;</span><span class="p">,</span> <span class="s">&#39;by&#39;</span><span class="p">,</span> <span class="s">&#39;for&#39;</span><span class="p">,</span> <span class="s">&#39;if&#39;</span><span class="p">,</span> <span class="s">&#39;in&#39;</span><span class="p">,</span> 
            <span class="s">&#39;is&#39;</span><span class="p">,</span> <span class="s">&#39;it&#39;</span><span class="p">,</span> <span class="s">&#39;of&#39;</span><span class="p">,</span> <span class="s">&#39;or&#39;</span><span class="p">,</span> <span class="s">&#39;py&#39;</span><span class="p">,</span> <span class="s">&#39;rst&#39;</span><span class="p">,</span> <span class="s">&#39;that&#39;</span><span class="p">,</span> <span class="s">&#39;the&#39;</span><span class="p">,</span> <span class="s">&#39;to&#39;</span><span class="p">,</span> <span class="s">&#39;with&#39;</span><span class="p">,</span>
            <span class="p">])</span>
    <span class="n">TR</span> <span class="o">=</span> <span class="n">string</span><span class="o">.</span><span class="n">maketrans</span><span class="p">(</span><span class="n">string</span><span class="o">.</span><span class="n">punctuation</span><span class="p">,</span> <span class="s">&#39; &#39;</span> <span class="o">*</span> <span class="nb">len</span><span class="p">(</span><span class="n">string</span><span class="o">.</span><span class="n">punctuation</span><span class="p">))</span>

    <span class="k">print</span> <span class="n">multiprocessing</span><span class="o">.</span><span class="n">current_process</span><span class="p">()</span><span class="o">.</span><span class="n">name</span><span class="p">,</span> <span class="s">&#39;reading&#39;</span><span class="p">,</span> <span class="n">filename</span>
    <span class="n">output</span> <span class="o">=</span> <span class="p">[]</span>

    <span class="k">with</span> <span class="nb">open</span><span class="p">(</span><span class="n">filename</span><span class="p">,</span> <span class="s">&#39;rt&#39;</span><span class="p">)</span> <span class="k">as</span> <span class="n">f</span><span class="p">:</span>
        <span class="k">for</span> <span class="n">line</span> <span class="ow">in</span> <span class="n">f</span><span class="p">:</span>
            <span class="k">if</span> <span class="n">line</span><span class="o">.</span><span class="n">lstrip</span><span class="p">()</span><span class="o">.</span><span class="n">startswith</span><span class="p">(</span><span class="s">&#39;..&#39;</span><span class="p">):</span> <span class="c"># Skip rst comment lines</span>
                <span class="k">continue</span>
            <span class="n">line</span> <span class="o">=</span> <span class="n">line</span><span class="o">.</span><span class="n">translate</span><span class="p">(</span><span class="n">TR</span><span class="p">)</span> <span class="c"># Strip punctuation</span>
            <span class="k">for</span> <span class="n">word</span> <span class="ow">in</span> <span class="n">line</span><span class="o">.</span><span class="n">split</span><span class="p">():</span>
                <span class="n">word</span> <span class="o">=</span> <span class="n">word</span><span class="o">.</span><span class="n">lower</span><span class="p">()</span>
                <span class="k">if</span> <span class="n">word</span><span class="o">.</span><span class="n">isalpha</span><span class="p">()</span> <span class="ow">and</span> <span class="n">word</span> <span class="ow">not</span> <span class="ow">in</span> <span class="n">STOP_WORDS</span><span class="p">:</span>
                    <span class="n">output</span><span class="o">.</span><span class="n">append</span><span class="p">(</span> <span class="p">(</span><span class="n">word</span><span class="p">,</span> <span class="mi">1</span><span class="p">)</span> <span class="p">)</span>
    <span class="k">return</span> <span class="n">output</span>


<span class="k">def</span> <span class="nf">count_words</span><span class="p">(</span><span class="n">item</span><span class="p">):</span>
    <span class="sd">&quot;&quot;&quot;Convert the partitioned data for a word to a</span>
<span class="sd">    tuple containing the word and the number of occurances.</span>
<span class="sd">    &quot;&quot;&quot;</span>
    <span class="n">word</span><span class="p">,</span> <span class="n">occurances</span> <span class="o">=</span> <span class="n">item</span>
    <span class="k">return</span> <span class="p">(</span><span class="n">word</span><span class="p">,</span> <span class="nb">sum</span><span class="p">(</span><span class="n">occurances</span><span class="p">))</span>


<span class="k">if</span> <span class="n">__name__</span> <span class="o">==</span> <span class="s">&#39;__main__&#39;</span><span class="p">:</span>
    <span class="kn">import</span> <span class="nn">operator</span>
    <span class="kn">import</span> <span class="nn">glob</span>

    <span class="n">input_files</span> <span class="o">=</span> <span class="n">glob</span><span class="o">.</span><span class="n">glob</span><span class="p">(</span><span class="s">&#39;*.rst&#39;</span><span class="p">)</span>
    
    <span class="n">mapper</span> <span class="o">=</span> <span class="n">SimpleMapReduce</span><span class="p">(</span><span class="n">file_to_words</span><span class="p">,</span> <span class="n">count_words</span><span class="p">)</span>
    <span class="n">word_counts</span> <span class="o">=</span> <span class="n">mapper</span><span class="p">(</span><span class="n">input_files</span><span class="p">)</span>
    <span class="n">word_counts</span><span class="o">.</span><span class="n">sort</span><span class="p">(</span><span class="n">key</span><span class="o">=</span><span class="n">operator</span><span class="o">.</span><span class="n">itemgetter</span><span class="p">(</span><span class="mi">1</span><span class="p">))</span>
    <span class="n">word_counts</span><span class="o">.</span><span class="n">reverse</span><span class="p">()</span>
    
    <span class="k">print</span> <span class="s">&#39;</span><span class="se">\n</span><span class="s">TOP 20 WORDS BY FREQUENCY</span><span class="se">\n</span><span class="s">&#39;</span>
    <span class="n">top20</span> <span class="o">=</span> <span class="n">word_counts</span><span class="p">[:</span><span class="mi">20</span><span class="p">]</span>
    <span class="n">longest</span> <span class="o">=</span> <span class="nb">max</span><span class="p">(</span><span class="nb">len</span><span class="p">(</span><span class="n">word</span><span class="p">)</span> <span class="k">for</span> <span class="n">word</span><span class="p">,</span> <span class="n">count</span> <span class="ow">in</span> <span class="n">top20</span><span class="p">)</span>
    <span class="k">for</span> <span class="n">word</span><span class="p">,</span> <span class="n">count</span> <span class="ow">in</span> <span class="n">top20</span><span class="p">:</span>
        <span class="k">print</span> <span class="s">&#39;</span><span class="si">%-*s</span><span class="s">: </span><span class="si">%5s</span><span class="s">&#39;</span> <span class="o">%</span> <span class="p">(</span><span class="n">longest</span><span class="o">+</span><span class="mi">1</span><span class="p">,</span> <span class="n">word</span><span class="p">,</span> <span class="n">count</span><span class="p">)</span>
</pre></div>
</div>
<p>The <tt class="xref py py-func docutils literal"><span class="pre">file_to_words()</span></tt> function converts each input file to a
sequence of tuples containing the word and the number 1 (representing
a single occurrence) .The data is partitioned by <tt class="xref py py-func docutils literal"><span class="pre">partition()</span></tt>
using the word as the key, so the partitioned data consists of a key
and a sequence of 1 values representing each occurrence of the word.
The partioned data is converted to a set of suples containing a word
and the count for that word by <tt class="xref py py-func docutils literal"><span class="pre">count_words()</span></tt> during the
reduction phase.</p>
<div class="highlight-python"><pre>$ python multiprocessing_wordcount.py

PoolWorker-2 reading communication.rst
PoolWorker-2 reading index.rst
PoolWorker-1 reading basics.rst
PoolWorker-1 reading mapreduce.rst

TOP 20 WORDS BY FREQUENCY

process         :    80
multiprocessing :    40
worker          :    37
after           :    33
starting        :    32
running         :    31
processes       :    30
python          :    29
start           :    28
class           :    27
literal         :    26
header          :    26
pymotw          :    26
end             :    26
daemon          :    22
now             :    21
func            :    20
can             :    20
consumer        :    19
mod             :    17</pre>
</div>
<div class="admonition-see-also admonition seealso">
<p class="first admonition-title">See also</p>
<dl class="last docutils">
<dt><a class="reference external" href="http://en.wikipedia.org/wiki/MapReduce">MapReduce - Wikipedia</a></dt>
<dd>Overview of MapReduce on Wikipedia.</dd>
<dt><a class="reference external" href="http://labs.google.com/papers/mapreduce.html">MapReduce: Simplified Data Processing on Large Clusters</a></dt>
<dd>Google Labs presentation and paper on MapReduce.</dd>
<dt><a class="reference internal" href="../operator/index.html#module-operator" title="operator: Functional interface to built-in operators"><tt class="xref py py-mod docutils literal"><span class="pre">operator</span></tt></a></dt>
<dd>Operator tools such as <tt class="docutils literal"><span class="pre">itemgetter()</span></tt>.</dd>
</dl>
</div>
<p><em>Special thanks to Jesse Noller for helping review this information.</em></p>
</div>
</div>


          </div>
        </div>
      </div>
      <div class="clearer"></div>
    </div>
    <div class="related">
      <h3>Navigation</h3>
      <ul>
        <li class="right" style="margin-right: 10px">
          <a href="../genindex.html" title="General Index"
             >index</a></li>
        <li class="right" >
          <a href="../py-modindex.html" title="Python Module Index"
             >modules</a> |</li>
        <li class="right" >
          <a href="../readline/index.html" title="readline – Interface to the GNU readline library"
             >next</a> |</li>
        <li class="right" >
          <a href="communication.html" title="Communication Between Processes"
             >previous</a> |</li>
        <li><a href="../contents.html">PyMOTW</a> &raquo;</li>
          <li><a href="../optional_os.html" >Optional Operating System Services</a> &raquo;</li>
          <li><a href="index.html" >multiprocessing &#8211; Manage processes like threads</a> &raquo;</li> 
      </ul>
    </div>
    <div class="footer">
      &copy; Copyright Doug Hellmann.
      Last updated on Oct 24, 2010.
      Created using <a href="http://sphinx.pocoo.org/">Sphinx</a>.

    <br/><a href="http://creativecommons.org/licenses/by-nc-sa/3.0/us/" rel="license"><img alt="Creative Commons License" style="border-width:0" src="http://i.creativecommons.org/l/by-nc-sa/3.0/us/88x31.png"/></a>
    
    </div>
  </body>
</html>

[top] / python / PyMOTW / docs / multiprocessing / mapreduce.html

contact | logmethods.com