.. _parallel_multiengine:

==========================
IPython's Direct interface
==========================

The direct, or multiengine, interface represents one possible way of working with a set of
IPython engines. The basic idea behind the multiengine interface is that the
capabilities of each engine are directly and explicitly exposed to the user.
Thus, in the multiengine interface, each engine is given an id that is used to
identify the engine and give it work to do. This interface is very intuitive
and is designed with interactive usage in mind, and is the best place for
new users of IPython to begin.

Creating a ``DirectView`` instance
==================================

For direct execution, we will make use of a :class:`DirectView` object, which can be
constructed via index-access to the client:

.. sourcecode:: ipython

    In [4]: dview = rc[:] # use all engines

We can also create views for different subsets of engines:

.. sourcecode:: ipython

    In [5]: even = rc[::2] # every other engine

    In [6]: odd = rc[1::2]

Quick and easy parallelism
==========================

In many cases, you simply want to apply a Python function to a sequence of
objects, but *in parallel*. The client interface provides a simple way
of accomplishing this: using the DirectView's :meth:`~DirectView.map` method.

Parallel map
------------

Python's builtin :func:`map` function allows a function to be applied to a
sequence element-by-element. This type of code is typically trivial to
parallelize. In fact, since IPython's interface is all about functions anyway,
you can just use the builtin :func:`map` with a :class:`RemoteFunction`, or a
DirectView's :meth:`map` method:

.. sourcecode:: ipython

    In [61]: dview.block = True

    In [62]: serial_result = map(lambda x:x**10, range(32))

    In [63]: parallel_result = dview.map(lambda x: x**10, range(32))

    In [67]: serial_result==parallel_result
    Out[67]: True

Remote function decorators
--------------------------

Remote functions are a bit more interesting when the view has multiple engines:

.. sourcecode:: ipython

    In [10]: @dview.remote(block=True)
       ....: def getpid():
       ....:     import os
       ....:     return os.getpid()
       ....:

    In [11]: getpid()
    Out[11]: [12345, 12346, 12347, 12348]

The ``@parallel`` decorator creates parallel functions that break up
element-wise operations, distribute them, and reconstruct the result.

.. sourcecode:: ipython

    In [12]: import numpy as np

    In [13]: A = np.random.random((64,48))

    In [14]: @dview.parallel(block=True)
       ....: def pmul(A,B):
       ....:     return A*B

    In [15]: C_local = A*A

    In [16]: C_remote = pmul(A,A)

    In [17]: (C_local == C_remote).all()
    Out[17]: True

Calling Python functions
========================

Now we are back to apply, but with more than one target.

apply
-----

The main method for doing remote execution (in fact, all methods that
communicate with the engines are built on top of it) is :meth:`View.apply`.

We strive to provide the cleanest interface we can, so `apply` has the following
signature:

.. sourcecode:: python

    view.apply(f, *args, **kwargs)

There are various ways to call functions with IPython, controlled by flags that are set as
attributes of the View. The ``DirectView`` has the following flags:

dv.block : bool
    whether to wait for the result, or return an :class:`AsyncResult` object
    immediately
dv.track : bool
    whether to instruct pyzmq to track when the message has been sent.
    This is primarily useful for non-copying sends of numpy arrays that you plan to
    edit in-place. You need to know when it becomes safe to edit the buffer
    without corrupting the message.
dv.targets : int, list of ints
    which targets this view is associated with.
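
The effect of the ``block`` flag can be illustrated without a running cluster.
The following is a minimal sketch, not IPython's implementation: ``ToyView`` is
a hypothetical stand-in that uses a local thread pool in place of engines, and
the futures it returns only play the role that :class:`AsyncResult` plays in
IPython.

.. sourcecode:: python

    from concurrent.futures import ThreadPoolExecutor


    class ToyView:
        """Hypothetical stand-in for a view; not part of IPython."""

        def __init__(self, n_engines=4, block=False):
            self.n_engines = n_engines
            self.block = block  # plays the role of dv.block
            self._pool = ThreadPoolExecutor(max_workers=n_engines)

        def _submit(self, f, *args, **kwargs):
            # One task per pretend engine; returns immediately.
            return [self._pool.submit(f, *args, **kwargs)
                    for _ in range(self.n_engines)]

        def apply(self, f, *args, **kwargs):
            pending = self._submit(f, *args, **kwargs)
            if self.block:
                # Blocking mode: wait and hand back the plain results.
                return [fut.result() for fut in pending]
            # Non-blocking mode: hand back the pending handles.
            return pending


    view = ToyView(n_engines=4, block=True)
    blocking_result = view.apply(pow, 2, 5)   # a list of values

    view.block = False
    pending = view.apply(pow, 2, 5)           # a list of futures
    async_result = [fut.result() for fut in pending]

In both modes the same work is submitted; the flag only decides whether the
caller waits for the values or receives handles to collect later.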

For convenience, you can set block temporarily for a single call with the extra
sync/async methods.

Blocking execution
------------------

In blocking mode, the :class:`.DirectView` object (called ``dview`` in
these examples) submits the command to the controller, which places the
command in the engines' queues for execution. The :meth:`apply` call then
blocks until the engines are done executing the command:

.. sourcecode:: ipython

    In [2]: dview = rc[:] # A DirectView of all engines

    In [3]: dview.block=True

    In [4]: dview['a'] = 5

    In [5]: dview['b'] = 10

    In [6]: dview.apply(lambda x: a+b+x, 27)
    Out[6]: [42, 42, 42, 42]

You can also select blocking execution on a call-by-call basis with the
:meth:`apply_sync` method:

.. sourcecode:: ipython

    In [7]: dview.block=False

    In [8]: dview.apply_sync(lambda x: a+b+x, 27)
    Out[8]: [42, 42, 42, 42]

Python commands can be executed as strings on specific engines by using a View's ``execute``
method:

.. sourcecode:: ipython

    In [6]: rc[::2].execute('c=a+b')

    In [7]: rc[1::2].execute('c=a-b')

    In [8]: dview['c'] # shorthand for dview.pull('c', block=True)
    Out[8]: [15, -5, 15, -5]

Non-blocking execution
----------------------

In non-blocking mode, :meth:`apply` submits the command to be executed and
then returns an :class:`AsyncResult` object immediately. The
:class:`AsyncResult` object gives you a way of getting a result at a later
time through its :meth:`get` method.

This allows you to quickly submit long running commands without blocking your
local Python/IPython session:

.. sourcecode:: ipython

    # define our function
    In [6]: def wait(t):
       ....:     import time
       ....:     tic = time.time()
       ....:     time.sleep(t)
       ....:     return time.time()-tic

    # In non-blocking mode
    In [7]: ar = dview.apply_async(wait, 2)

    # Now block for the result
    In [8]: ar.get()
    Out[8]: [2.0006198883056641, 1.9997570514678955, 1.9996809959411621, 2.0003249645233154]

    # Again in non-blocking mode
    In [9]: ar = dview.apply_async(wait, 10)

    # Poll to see if the result is ready
    In [10]: ar.ready()
    Out[10]: False

    # ask for the result, but wait a maximum of 1 second:
    In [45]: ar.get(1)
    ---------------------------------------------------------------------------
    TimeoutError                              Traceback (most recent call last)
    /home/you/ in ()
    ----> 1 ar.get(1)

    /path/to/site-packages/IPython/parallel/asyncresult.pyc in get(self, timeout)
         62                 raise self._exception
         63         else:
    ---> 64             raise error.TimeoutError("Result not ready.")
         65
         66     def ready(self):

    TimeoutError: Result not ready.

Often, it is desirable to wait until a set of :class:`AsyncResult` objects
are done. For this, there is the :meth:`wait` method. This method takes a
sequence of :class:`AsyncResult` objects (or `msg_ids` or indices to the client's History),
and blocks until all of the associated results are ready:

.. sourcecode:: ipython

    In [72]: dview.block=False

    # A trivial list of AsyncResult objects
    In [73]: pr_list = [dview.apply_async(wait, 3) for i in range(10)]

    # Wait until all of them are done
    In [74]: dview.wait(pr_list)

    # Then, their results are ready using get() or the `.r` attribute
    In [75]: pr_list[0].get()
    Out[75]: [2.9982571601867676, 2.9982588291168213, 2.9987530708312988, 2.9990990161895752]

The ``block`` and ``targets`` attributes
----------------------------------------

Most DirectView methods (excluding :meth:`apply`) accept ``block`` and
``targets`` as keyword arguments. As we have seen above, these keyword arguments control the
blocking mode and which engines the command is applied to.
The :class:`View` class also has
:attr:`block` and :attr:`targets` attributes that control the default behavior
when the keyword arguments are not provided. Thus the following logic is used
for :attr:`block` and :attr:`targets`:

* If no keyword argument is provided, the instance attributes are used.
* Keyword arguments, if provided, override the instance attributes for
  the duration of a single call.

The following examples demonstrate how to use the instance attributes:

.. sourcecode:: ipython

    In [16]: dview.targets = [0,2]

    In [17]: dview.block = False

    In [18]: ar = dview.apply(lambda : 10)

    In [19]: ar.get()
    Out[19]: [10, 10]

    In [20]: dview.targets = dview.client.ids # all engines (4)

    In [21]: dview.block = True

    In [22]: dview.apply(lambda : 42)
    Out[22]: [42, 42, 42, 42]

The :attr:`block` and :attr:`targets` instance attributes of the
:class:`.DirectView` also determine the behavior of the parallel magic commands.

Parallel magic commands
-----------------------

We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``)
that make it more pleasant to execute Python commands on the engines
interactively. These are simply shortcuts to :meth:`execute` and
:meth:`get_result` of the :class:`DirectView`. The ``%px`` magic executes a single
Python command on the engines specified by the :attr:`targets` attribute of the
:class:`DirectView` instance:

.. sourcecode:: ipython

    # Create a DirectView for all targets
    In [22]: dv = rc[:]

    # Make this DirectView active for parallel magic commands
    In [23]: dv.activate()

    In [24]: dv.block=True

    # import numpy here and everywhere
    In [25]: with dv.sync_imports():
       ....:     import numpy
    importing numpy on engine(s)

    In [27]: %px a = numpy.random.rand(2,2)
    Parallel execution on engines: [0, 1, 2, 3]

    In [28]: %px ev = numpy.linalg.eigvals(a)
    Parallel execution on engines: [0, 1, 2, 3]

    In [28]: dv['ev']
    Out[28]: [ array([ 1.09522024, -0.09645227]),
       ....:   array([ 1.21435496, -0.35546712]),
       ....:   array([ 0.72180653, 0.07133042]),
       ....:   array([ 1.46384341, 1.04353244e-04])
       ....: ]

The ``%result`` magic gets the most recent result, or takes an argument
specifying the index of the result to be requested. It is simply a shortcut to the
:meth:`get_result` method:

.. sourcecode:: ipython

    In [29]: dv.apply_async(lambda : ev)

    In [30]: %result
    Out[30]: [ [ 1.28167017 0.14197338],
       ....:   [-0.14093616 1.27877273],
       ....:   [-0.37023573 1.06779409],
       ....:   [ 0.83664764 -0.25602658] ]

The ``%autopx`` magic switches to a mode where everything you type is executed
on the engines given by the :attr:`targets` attribute:

.. sourcecode:: ipython

    In [30]: dv.block=False

    In [31]: %autopx
    Auto Parallel Enabled
    Type %autopx to disable

    In [32]: max_evals = []

    In [33]: for i in range(100):
       ....:     a = numpy.random.rand(10,10)
       ....:     a = a+a.transpose()
       ....:     evals = numpy.linalg.eigvals(a)
       ....:     max_evals.append(evals[0].real)
       ....:
       ....:

    In [34]: %autopx
    Auto Parallel Disabled

    In [35]: dv.block=True

    In [36]: %px ans= "Average max eigenvalue is: %f"%(sum(max_evals)/len(max_evals))
    Parallel execution on engines: [0, 1, 2, 3]

    In [37]: dv['ans']
    Out[37]: [ 'Average max eigenvalue is: 10.1387247332',
       ....:   'Average max eigenvalue is: 10.2076902286',
       ....:   'Average max eigenvalue is: 10.1891484655',
       ....:   'Average max eigenvalue is: 10.1158837784',]

Moving Python objects around
============================

With multiple engines, :meth:`push` and :meth:`pull` operate on all of the
view's engines at once.

Basic push and pull
-------------------

Here are some examples of how you use :meth:`push` and :meth:`pull`:

.. sourcecode:: ipython

    In [38]: dview.push(dict(a=1.03234,b=3453))
    Out[38]: [None,None,None,None]

    In [39]: dview.pull('a')
    Out[39]: [ 1.03234, 1.03234, 1.03234, 1.03234]

    In [40]: dview.pull('b', targets=0)
    Out[40]: 3453

    In [41]: dview.pull(('a','b'))
    Out[41]: [ [1.03234, 3453], [1.03234, 3453], [1.03234, 3453], [1.03234, 3453] ]

    In [43]: dview.push(dict(c='speed'))
    Out[43]: [None,None,None,None]

In non-blocking mode :meth:`push` and :meth:`pull` also return
:class:`AsyncResult` objects:

.. sourcecode:: ipython

    In [48]: ar = dview.pull('a', block=False)

    In [49]: ar.get()
    Out[49]: [1.03234, 1.03234, 1.03234, 1.03234]

Dictionary interface
--------------------

Since a Python namespace is just a :class:`dict`, :class:`DirectView` objects provide
dictionary-style access by key and methods such as :meth:`get` and
:meth:`update` for convenience. This makes the remote namespaces of the engines
appear as a local dictionary. Underneath, these methods call :meth:`apply`:

.. sourcecode:: ipython

    In [51]: dview['a']=['foo','bar']

    In [52]: dview['a']
    Out[52]: [ ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'] ]

Scatter and gather
------------------

Sometimes it is useful to partition a sequence and push the partitions to
different engines. In MPI language, this is known as scatter/gather and we
follow that terminology. However, it is important to remember that in
IPython's :class:`DirectView` class, :meth:`scatter` is from the
interactive IPython session to the engines and :meth:`gather` is from the
engines back to the interactive IPython session. For scatter/gather operations
between engines, MPI, pyzmq, or some other direct interconnect should be used.

.. sourcecode:: ipython

    In [58]: dview.scatter('a',range(16))
    Out[58]: [None,None,None,None]

    In [59]: dview['a']
    Out[59]: [ [0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15] ]

    In [60]: dview.gather('a')
    Out[60]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]

Exercise: Matrix Multiply
=========================

Using scatter and gather, can you write a parallel matrix-multiply?

`solution <_static/soln/matmul.py>`_

Other things to look at
=======================

How to do parallel list comprehensions
--------------------------------------

In many cases list comprehensions are nicer than using the map function. While
we don't have fully parallel list comprehensions, it is simple to get the
basic effect using :meth:`scatter` and :meth:`gather`:

.. sourcecode:: ipython

    In [66]: dview.scatter('x',range(64))

    In [67]: %px y = [i**10 for i in x]
    Parallel execution on engines: [0, 1, 2, 3]
    Out[67]:

    In [68]: y = dview.gather('y')

    In [69]: print y
    [0, 1, 1024, 59049, 1048576, 9765625, 60466176, 282475249, 1073741824,...]

.. _parallel_exceptions:

Parallel exceptions
-------------------

In the multiengine interface, parallel commands can raise Python exceptions,
just like serial commands.
This is a little subtle, however, because a single
parallel command can actually raise multiple exceptions (one for each engine
the command was run on). To express this idea, we have a
:exc:`CompositeError` exception class that will be raised in most cases. The
:exc:`CompositeError` class is a special type of exception that wraps one or
more other types of exceptions. Here is how it works:

.. sourcecode:: ipython

    In [76]: dview.block=True

    In [77]: dview.execute('1/0')
    ---------------------------------------------------------------------------
    CompositeError                            Traceback (most recent call last)
    /home/user/ in ()
    ----> 1 dview.execute('1/0')

    /path/to/site-packages/IPython/parallel/client/view.pyc in execute(self, code, targets, block)
        591             default: self.block
        592         """
    --> 593         return self._really_apply(util._execute, args=(code,), block=block, targets=targets)
        594
        595     def run(self, filename, targets=None, block=None):

    /home/user/ in _really_apply(self, f, args, kwargs, targets, block, track)

    /path/to/site-packages/IPython/parallel/client/view.pyc in sync_results(f, self, *args, **kwargs)
         55 def sync_results(f, self, *args, **kwargs):
         56     """sync relevant results from self.client to our results attribute."""
    ---> 57     ret = f(self, *args, **kwargs)
         58     delta = self.outstanding.difference(self.client.outstanding)
         59     completed = self.outstanding.intersection(delta)

    /home/user/ in _really_apply(self, f, args, kwargs, targets, block, track)

    /path/to/site-packages/IPython/parallel/client/view.pyc in save_ids(f, self, *args, **kwargs)
         44     n_previous = len(self.client.history)
         45     try:
    ---> 46         ret = f(self, *args, **kwargs)
         47     finally:
         48         nmsgs = len(self.client.history) - n_previous

    /path/to/site-packages/IPython/parallel/client/view.pyc in _really_apply(self, f, args, kwargs, targets, block, track)
        529         if block:
        530             try:
    --> 531                 return ar.get()
        532             except KeyboardInterrupt:
        533                 pass

    /path/to/site-packages/IPython/parallel/client/asyncresult.pyc in get(self, timeout)
        101                 return self._result
        102             else:
    --> 103                 raise self._exception
        104         else:
        105             raise error.TimeoutError("Result not ready.")

    CompositeError: one or more exceptions from call to method: _execute
    [0:apply]: ZeroDivisionError: integer division or modulo by zero
    [1:apply]: ZeroDivisionError: integer division or modulo by zero
    [2:apply]: ZeroDivisionError: integer division or modulo by zero
    [3:apply]: ZeroDivisionError: integer division or modulo by zero

Notice how the error message printed when :exc:`CompositeError` is raised has
information about the individual exceptions that were raised on each engine.
If you want, you can even raise one of these original exceptions:

.. sourcecode:: ipython

    In [80]: try:
       ....:     dview.execute('1/0')
       ....: except parallel.error.CompositeError as e:
       ....:     e.raise_exception()
       ....:
       ....:
    ---------------------------------------------------------------------------
    RemoteError                               Traceback (most recent call last)
    /home/user/ in ()
          2     dview.execute('1/0')
          3 except parallel.error.CompositeError as e:
    ----> 4     e.raise_exception()

    /path/to/site-packages/IPython/parallel/error.pyc in raise_exception(self, excid)
        266             raise IndexError("an exception with index %i does not exist"%excid)
        267         else:
    --> 268             raise RemoteError(en, ev, etb, ei)
        269
        270

    RemoteError: ZeroDivisionError(integer division or modulo by zero)
    Traceback (most recent call last):
      File "/path/to/site-packages/IPython/parallel/engine/streamkernel.py", line 330, in apply_request
        exec code in working,working
      File "", line 1, in
      File "/path/to/site-packages/IPython/parallel/util.py", line 354, in _execute
        exec code in globals()
      File "", line 1, in
    ZeroDivisionError: integer division or modulo by zero

If you are working in IPython, you can simply type ``%debug`` after one of
these :exc:`CompositeError` exceptions is raised, and inspect the exception
instance:

.. sourcecode:: ipython

    In [81]: dview.execute('1/0')
    ---------------------------------------------------------------------------
    CompositeError                            Traceback (most recent call last)
    /home/user/ in ()
    ----> 1 dview.execute('1/0')

    /path/to/site-packages/IPython/parallel/client/view.pyc in execute(self, code, targets, block)
        591             default: self.block
        592         """
    --> 593         return self._really_apply(util._execute, args=(code,), block=block, targets=targets)
        594
        595     def run(self, filename, targets=None, block=None):

    /home/user/ in _really_apply(self, f, args, kwargs, targets, block, track)

    /path/to/site-packages/IPython/parallel/client/view.pyc in sync_results(f, self, *args, **kwargs)
         55 def sync_results(f, self, *args, **kwargs):
         56     """sync relevant results from self.client to our results attribute."""
    ---> 57     ret = f(self, *args, **kwargs)
         58     delta = self.outstanding.difference(self.client.outstanding)
         59     completed = self.outstanding.intersection(delta)

    /home/user/ in _really_apply(self, f, args, kwargs, targets, block, track)

    /path/to/site-packages/IPython/parallel/client/view.pyc in save_ids(f, self, *args, **kwargs)
         44     n_previous = len(self.client.history)
         45     try:
    ---> 46         ret = f(self, *args, **kwargs)
         47     finally:
         48         nmsgs = len(self.client.history) - n_previous

    /path/to/site-packages/IPython/parallel/client/view.pyc in _really_apply(self, f, args, kwargs, targets, block, track)
        529         if block:
        530             try:
    --> 531                 return ar.get()
        532             except KeyboardInterrupt:
        533                 pass

    /path/to/site-packages/IPython/parallel/client/asyncresult.pyc in get(self, timeout)
        101                 return self._result
        102             else:
    --> 103                 raise self._exception
        104         else:
        105             raise error.TimeoutError("Result not ready.")

    CompositeError: one or more exceptions from call to method: _execute
    [0:apply]: ZeroDivisionError: integer division or modulo by zero
    [1:apply]: ZeroDivisionError: integer division or modulo by zero
    [2:apply]: ZeroDivisionError: integer division or modulo by zero
    [3:apply]: ZeroDivisionError: integer division or modulo by zero

    In [82]: %debug
    > /path/to/site-packages/IPython/parallel/client/asyncresult.py(103)get()
        102             else:
    --> 103                 raise self._exception
        104         else:

    # With the debugger running, self._exception is the exception instance. We can tab complete
    # on it and see the extra methods that are available.
    ipdb> self._exception.
    e.__class__         e.__getitem__       e.__new__           e.__setstate__      e.args
    e.__delattr__       e.__getslice__      e.__reduce__        e.__str__           e.elist
    e.__dict__          e.__hash__          e.__reduce_ex__     e.__weakref__       e.message
    e.__doc__           e.__init__          e.__repr__          e._get_engine_str   e.print_tracebacks
    e.__getattribute__  e.__module__        e.__setattr__       e._get_traceback    e.raise_exception
    ipdb> self._exception.print_tracebacks()
    [0:apply]:
    Traceback (most recent call last):
      File "/path/to/site-packages/IPython/parallel/engine/streamkernel.py", line 330, in apply_request
        exec code in working,working
      File "", line 1, in
      File "/path/to/site-packages/IPython/parallel/util.py", line 354, in _execute
        exec code in globals()
      File "", line 1, in
    ZeroDivisionError: integer division or modulo by zero

    [1:apply]:
    Traceback (most recent call last):
      File "/path/to/site-packages/IPython/parallel/engine/streamkernel.py", line 330, in apply_request
        exec code in working,working
      File "", line 1, in
      File "/path/to/site-packages/IPython/parallel/util.py", line 354, in _execute
        exec code in globals()
      File "", line 1, in
    ZeroDivisionError: integer division or modulo by zero

    [2:apply]:
    Traceback (most recent call last):
      File "/path/to/site-packages/IPython/parallel/engine/streamkernel.py", line 330, in apply_request
        exec code in working,working
      File "", line 1, in
      File "/path/to/site-packages/IPython/parallel/util.py", line 354, in _execute
        exec code in globals()
      File "", line 1, in
    ZeroDivisionError: integer division or modulo by zero

    [3:apply]:
    Traceback (most recent call last):
      File "/path/to/site-packages/IPython/parallel/engine/streamkernel.py", line 330, in apply_request
        exec code in working,working
      File "", line 1, in
      File "/path/to/site-packages/IPython/parallel/util.py", line 354, in _execute
        exec code in globals()
      File "", line 1, in
    ZeroDivisionError: integer division or modulo by zero

All of this same error handling magic even works in non-blocking mode:

.. sourcecode:: ipython

    In [83]: dview.block=False

    In [84]: ar = dview.execute('1/0')

    In [85]: ar.get()
    ---------------------------------------------------------------------------
    CompositeError                            Traceback (most recent call last)
    /home/user/ in ()
    ----> 1 ar.get()

    /path/to/site-packages/IPython/parallel/client/asyncresult.pyc in get(self, timeout)
        101                 return self._result
        102             else:
    --> 103                 raise self._exception
        104         else:
        105             raise error.TimeoutError("Result not ready.")

    CompositeError: one or more exceptions from call to method: _execute
    [0:apply]: ZeroDivisionError: integer division or modulo by zero
    [1:apply]: ZeroDivisionError: integer division or modulo by zero
    [2:apply]: ZeroDivisionError: integer division or modulo by zero
    [3:apply]: ZeroDivisionError: integer division or modulo by zero