[code.view]

[top] / python / PyMOTW / multiprocessing / communication.rst

     ###############################
     Communication Between Processes
     ###############################
     
     As with threads, a common use pattern for multiple processes is to
     divide a job up among several workers to run in parallel.  Effective
     use of multiple processes usually requires some communication between
     them, so that work can be divided and results can be aggregated.
     
     .. _multiprocessing-queues:
     
     Passing Messages to Processes
     =============================
     
     A simple way to communicate between process with
     :mod:`multiprocessing` is to use a :class:`Queue` to pass messages
     back and forth.  Any pickle-able object can pass through a
     :class:`Queue`.
     
     .. include:: multiprocessing_queue.py
         :literal:
         :start-after: #end_pymotw_header
     
     This short example only passes a single message to a single worker,
     then the main process waits for the worker to finish.
     
     .. {{{cog
     .. cog.out(run_script(cog.inFile, 'multiprocessing_queue.py'))
     .. }}}
     
     ::
     
     	$ python multiprocessing_queue.py
     
     	Doing something fancy in Process-1 for Fancy Dan!
     
     .. {{{end}}}
     
     A more complex example shows how to manage several workers consuming
     data from a :class:`JoinableQueue` and passing results back to the
     parent process.  The *poison pill* technique is used to stop the
     workers.  After setting up the real tasks, the main program adds one
     "stop" value per worker to the job queue.  When a worker encounters
     the special value, it breaks out of its processing loop.  The main
     process uses the task queue's :func:`join` method to wait for all of
     the tasks to finish before processin the results.
     
     .. include:: multiprocessing_producer_consumer.py
         :literal:
         :start-after: #end_pymotw_header
     
     Although the jobs enter the queue in order, since their execution is
     parallelized there is no guarantee about the order they will be
     completed.
     
     .. {{{cog
     .. cog.out(run_script(cog.inFile, '-u multiprocessing_producer_consumer.py'))
     .. }}}
     
     ::
     
     	$ python -u multiprocessing_producer_consumer.py
     
     	Creating 4 consumers
     	Consumer-1: 0 * 0
     	Consumer-2: 1 * 1
     	Consumer-4: 2 * 2
     	Consumer-3: 3 * 3
     	Consumer-1: 4 * 4
     	Consumer-2: 5 * 5
     	Consumer-4: 6 * 6
     	Consumer-3: 7 * 7
     	Consumer-1: 8 * 8
     	Consumer-2: 9 * 9
     	Consumer-4: Exiting
     	Consumer-3: Exiting
     	Consumer-1: Exiting
     	Consumer-2: Exiting
     	Result: 0 * 0 = 0
     	Result: 1 * 1 = 1
     	Result: 2 * 2 = 4
     	Result: 3 * 3 = 9
     	Result: 4 * 4 = 16
     	Result: 5 * 5 = 25
     	Result: 6 * 6 = 36
     	Result: 7 * 7 = 49
     	Result: 8 * 8 = 64
     	Result: 9 * 9 = 81
     
     .. {{{end}}}
     
     
     
     Signaling between Processes
     ===========================
     
     The :class:`Event` class provides a simple way to communicate state
     information between processes.  An event can be toggled between set
     and unset states.  Users of the event object can wait for it to change
     from unset to set, using an optional timeout value.
     
     .. include:: multiprocessing_event.py
         :literal:
         :start-after: #end_pymotw_header
     
     When :func:`wait` times out it returns without an error.  The caller
     is responsible for checking the state of the event using
     :func:`is_set`.
     
     .. {{{cog
     .. cog.out(run_script(cog.inFile, '-u multiprocessing_event.py'))
     .. }}}
     
     ::
     
     	$ python -u multiprocessing_event.py
     
     	main: waiting before calling Event.set()
     	wait_for_event: starting
     	wait_for_event_timeout: starting
     	wait_for_event_timeout: e.is_set()-> False
     	main: event is set
     	wait_for_event: e.is_set()-> True
     
     .. {{{end}}}
     
     Controlling Access to Resources
     ===============================
     
     In situations when a single resource needs to be shared between
     multiple processes, a :class:`Lock` can be used to avoid conflicting
     accesses.
     
     .. include:: multiprocessing_lock.py
         :literal:
         :start-after: #end_pymotw_header
     
     In this example, the messages printed to the console may be jumbled
     together if the two processes do not synchronize their access of the
     output stream with the lock.
     
     .. {{{cog
     .. cog.out(run_script(cog.inFile, 'multiprocessing_lock.py'))
     .. }}}
     
     ::
     
     	$ python multiprocessing_lock.py
     
     	Lock acquired via with
     	Lock acquired directly
     
     .. {{{end}}}
     
     
     Synchronizing Operations
     ========================
     
     :class:`Condition` objects can be used to synchronize parts of a
     workflow so that some run in parallel but others run sequentially,
     even if they are in separate processes.
     
     .. include:: multiprocessing_condition.py
         :literal:
         :start-after: #end_pymotw_header
     
     In this example, two process run the second stage of a job in
     parallel, but only after the first stage is done.
     
     .. {{{cog
     .. cog.out(run_script(cog.inFile, 'multiprocessing_condition.py'))
     .. }}}
     
     ::
     
     	$ python multiprocessing_condition.py
     
     	Starting s1
     	s1 done and ready for stage 2
     	Starting stage_2[1]
     	stage_2[1] running
     	Starting stage_2[2]
     	stage_2[2] running
     
     .. {{{end}}}
     
     
     Controlling Concurrent Access to Resources
     ==========================================
     
     Sometimes it is useful to allow more than one worker access to a
     resource at a time, while still limiting the overall number. For
     example, a connection pool might support a fixed number of
     simultaneous connections, or a network application might support a
     fixed number of concurrent downloads. A :class:`Semaphore` is one way
     to manage those connections.
     
     .. include:: multiprocessing_semaphore.py
         :literal:
         :start-after: #end_pymotw_header
     
     In this example, the :class:`ActivePool` class simply serves as a
     convenient way to track which processes are running at a given
     moment. A real resource pool would probably allocate a connection or
     some other value to the newly active process, and reclaim the value
     when the task is done. Here, the pool is just used to hold the names
     of the active processes to show that only three are running
     concurrently.
     
     .. {{{cog
     .. cog.out(run_script(cog.inFile, 'multiprocessing_semaphore.py'))
     .. }}}
     
     ::
     
     	$ python multiprocessing_semaphore.py
     
     	Now running: ['0', '1', '5']
     	Now running: ['0', '1', '5']
     	Now running: ['0', '1', '5']
     	Now running: ['1', '5', '2']
     	Now running: ['2', '6', '3']
     	Now running: ['1', '2', '6']
     	Now running: ['6', '8', '9']
     	Now running: ['9', '7', '4']
     	Now running: ['6', '8', '9']
     	Now running: ['8', '9', '7']
     	Now running: ['1', '5', '2']
     	Now running: ['2', '6', '3']
     	Now running: ['6', '8', '9']
     	Now running: ['6', '8', '9']
     	Now running: ['9', '7']
     	Now running: ['9', '7']
     	Now running: ['9', '7']
     	Now running: []
     	Now running: []
     	Now running: []
     
     .. {{{end}}}
     
     Managing Shared State
     =====================
     
     In the previous example, the list of active processes is maintained
     centrally in the :class:`ActivePool` instance via a special type of
     list object created by a :class:`Manager`.  The :class:`Manager` is
     responsible for coordinating shared information state between all of
     its users.  
     
     .. include:: multiprocessing_manager_dict.py
         :literal:
         :start-after: #end_pymotw_header
     
     By creating the list through the manager, it is shared and updates are
     seen in all processes.  Dictionaries are also supported.
     
     .. {{{cog
     .. cog.out(run_script(cog.inFile, 'multiprocessing_manager_dict.py'))
     .. }}}
     
     ::
     
     	$ python multiprocessing_manager_dict.py
     
     	Results: {0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18}
     
     .. {{{end}}}
     
     Shared Namespaces
     =================
     
     In addition to dictionaries and lists, a :class:`Manager` can create a
     shared :class:`Namespace`.  
     
     .. include:: multiprocessing_namespaces.py
         :literal:
         :start-after: #end_pymotw_header
     
     Any named value added to the :class:`Namespace` is visible to all of
     the clients that receive the :class:`Namespace` instance.
     
     .. {{{cog
     .. cog.out(run_script(cog.inFile, 'multiprocessing_namespaces.py'))
     .. }}}
     
     ::
     
     	$ python multiprocessing_namespaces.py
     
     	Before event, consumer got: 'Namespace' object has no attribute 'value'
     	After event, consumer got: This is the value
     
     .. {{{end}}}
     
     It is important to know that *updates* to the contents of mutable
     values in the namespace are *not* propagated automatically.
     
     .. include:: multiprocessing_namespaces_mutable.py
         :literal:
         :start-after: #end_pymotw_header
     
     To update the list, attach it to the namespace object again.
     
     .. {{{cog
     .. cog.out(run_script(cog.inFile, 'multiprocessing_namespaces_mutable.py'))
     .. }}}
     
     ::
     
     	$ python multiprocessing_namespaces_mutable.py
     
     	Before event, consumer got: []
     	After event, consumer got: []
     
     .. {{{end}}}
     
     
     Process Pools
     =============
     
     The :class:`Pool` class can be used to manage a fixed number of
     workers for simple cases where the work to be done can be broken up
     and distributed between workers independently.  The return values from
     the jobs are collected and returned as a list.  The pool arguments
     include the number of processes and a function to run when starting
     the task process (invoked once per child).
     
     .. include:: multiprocessing_pool.py
         :literal:
         :start-after: #end_pymotw_header
     
     The result of the :func:`map` method is functionally equivalent to the
     built-in :func:`map`, except that individual tasks run in parallel.
     Since the pool is processing its inputs in parallel, :func:`close` and
     :func:`join` can be used to synchronize the main process with the task
     processes to ensure proper cleanup.
     
     .. {{{cog
     .. cog.out(run_script(cog.inFile, 'multiprocessing_pool.py'))
     .. }}}
     
     ::
     
     	$ python multiprocessing_pool.py
     
     	Input   : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
     	Built-in: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
     	Starting PoolWorker-3
     	Starting PoolWorker-4
     	Starting PoolWorker-1
     	Starting PoolWorker-2
     	Pool    : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
     
     .. {{{end}}}
     
     By default :class:`Pool` creates a fixed number of worker processes
     and passes jobs to them until there are no more jobs.  Setting the
     *maxtasksperchild* parameter tells the pool to restart a worker
     process after it has finished a few tasks.  This can be used to avoid
     having long-running workers consume ever more system resources.
     
     .. include:: multiprocessing_pool_maxtasksperchild.py
        :literal:
        :start-after: #end_pymotw_header
     
     The pool restarts the workers when they have completed their allotted
     tasks, even if there is no more work.  In this output, eight workers
     are created, even though there are only 10 tasks, and each worker can
     complete two of them at a time.
     
     .. {{{cog
     .. cog.out(run_script(cog.inFile, 'multiprocessing_pool_maxtasksperchild.py'))
     .. }}}
     
     ::
     
     	$ python multiprocessing_pool_maxtasksperchild.py
     
     	Input   : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
     	Built-in: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
     	Starting PoolWorker-1
     	Starting PoolWorker-2
     	Starting PoolWorker-4
     	Starting PoolWorker-3
     	Starting PoolWorker-5
     	Starting PoolWorker-6
     	Starting PoolWorker-7
     	Starting PoolWorker-8
     	Pool    : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
     
     .. {{{end}}}
     
     

[top] / python / PyMOTW / multiprocessing / communication.rst

contact | logmethods.com