[code.view]

[top] / python / PyMOTW / multiprocessing / multiprocessing_mapreduce.py

     #!/usr/bin/env python
     # encoding: utf-8
     #
     # Copyright (c) 2009 Doug Hellmann All rights reserved.
     #
     """
     """
     #end_pymotw_header
     import collections
     import itertools
     import multiprocessing
     
     class SimpleMapReduce(object):
         
         def __init__(self, map_func, reduce_func, num_workers=None):
             """
             map_func
     
               Function to map inputs to intermediate data. Takes as
               argument one input value and returns a tuple with the key
               and a value to be reduced.
             
             reduce_func
     
               Function to reduce partitioned version of intermediate data
               to final output. Takes as argument a key as produced by
               map_func and a sequence of the values associated with that
               key.
              
             num_workers
     
               The number of workers to create in the pool. Defaults to the
               number of CPUs available on the current host.
             """
             self.map_func = map_func
             self.reduce_func = reduce_func
             self.pool = multiprocessing.Pool(num_workers)
         
         def partition(self, mapped_values):
             """Organize the mapped values by their key.
             Returns an unsorted sequence of tuples with a key and a sequence of values.
             """
             partitioned_data = collections.defaultdict(list)
             for key, value in mapped_values:
                 partitioned_data[key].append(value)
             return partitioned_data.items()
         
         def __call__(self, inputs, chunksize=1):
             """Process the inputs through the map and reduce functions given.
             
             inputs
               An iterable containing the input data to be processed.
             
             chunksize=1
               The portion of the input data to hand to each worker.  This
               can be used to tune performance during the mapping phase.
             """
             map_responses = self.pool.map(self.map_func, inputs, chunksize=chunksize)
             partitioned_data = self.partition(itertools.chain(*map_responses))
             reduced_values = self.pool.map(self.reduce_func, partitioned_data)
             return reduced_values
     

[top] / python / PyMOTW / multiprocessing / multiprocessing_mapreduce.py

contact | logmethods.com