IPython Documentation

IPython’s Direct interface

The direct, or multiengine, interface represents one possible way of working with a set of IPython engines. The basic idea behind the multiengine interface is that the capabilities of each engine are directly and explicitly exposed to the user. Thus, in the multiengine interface, each engine is given an id that is used to identify the engine and give it work to do. This interface is intuitive, is designed with interactive usage in mind, and is the best place for new users of IPython to begin.

Creating a DirectView instance

For direct execution, we will make use of a DirectView object, which can be constructed via index-access to the client:

In [4]: dview = rc[:] # use all engines

We can also create views for different subsets of engines:

In [5]: even = rc[::2] # every other engine

In [6]: odd = rc[1::2]
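Index-access on the client selects engines much as slicing selects items from a list of engine ids. As a rough illustration that needs no running cluster, the sketch below applies the same slices to a plain list. The name `engine_ids` is a hypothetical stand-in for the ids of a four-engine cluster, not part of IPython.

```python
# Hypothetical engine ids for a four-engine cluster (an assumption for this sketch).
engine_ids = [0, 1, 2, 3]

all_ids = engine_ids[:]      # same slice as rc[:]
even_ids = engine_ids[::2]   # same slice as rc[::2]
odd_ids = engine_ids[1::2]   # same slice as rc[1::2]

print(all_ids, even_ids, odd_ids)
```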

Quick and easy parallelism

In many cases, you simply want to apply a Python function to a sequence of objects, but in parallel. The client interface provides a simple way of accomplishing this: using the DirectView’s map() method.

Parallel map

Python’s builtin map() function allows a function to be applied to a sequence element-by-element. This type of code is typically trivial to parallelize. In fact, since IPython’s interface is all about functions anyway, you can just use the builtin map() with a RemoteFunction, or a DirectView’s map() method:

In [61]: dview.block = True

In [62]: serial_result = list(map(lambda x: x**10, range(32)))

In [63]: parallel_result = dview.map(lambda x: x**10, range(32))

In [67]: serial_result==parallel_result
Out[67]: True
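To make the idea behind a parallel map concrete without a cluster, here is a minimal sketch that splits the input into contiguous chunks, maps each chunk on its own worker thread, and rejoins the pieces in order. The function `sketch_parallel_map` and its `n_workers` parameter are hypothetical names for this illustration; threads from the standard library stand in for engines, and this is not how DirectView.map() is implemented.

```python
from concurrent.futures import ThreadPoolExecutor

def sketch_parallel_map(func, seq, n_workers=4):
    """Hypothetical helper: map func over seq in n_workers contiguous chunks."""
    items = list(seq)
    # Ceiling division gives the chunk length; max() guards against empty input.
    step = max(1, -(-len(items) // n_workers))
    chunks = [items[i:i + step] for i in range(0, len(items), step)]
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        # Each worker handles one chunk; pool.map preserves chunk order.
        parts = list(pool.map(lambda chunk: [func(x) for x in chunk], chunks))
    # Flatten the per-chunk results back into a single list.
    return [y for part in parts for y in part]

serial_result = [x**10 for x in range(32)]
parallel_result = sketch_parallel_map(lambda x: x**10, range(32))
print(serial_result == parallel_result)
```

Because the chunks are contiguous and are rejoined in order, the result matches the serial map element for element.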

Remote function decorators

Remote functions are a bit more interesting when the view has multiple engines:

In [10]: @dview.remote(block=True)
   ....: def getpid():
   ....:     import os
   ....:     return os.getpid()
   ....:

In [11]: getpid()
Out[11]: [12345, 12346, 12347, 12348]

The @parallel decorator creates parallel functions that break up element-wise operations, distribute them across the engines, and reconstruct the result.

In [12]: import numpy as np

In [13]: A = np.random.random((64,48))

In [14]: @dview.parallel(block=True)
   ....: def pmul(A,B):
   ....:     return A*B

In [15]: C_local = A*A

In [16]: C_remote = pmul(A,A)

In [17]: (C_local == C_remote).all()
Out[17]: True
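The split, compute, and reassemble pattern behind a parallel function can be sketched in plain Python. The decorator `sketch_parallel` below is a hypothetical illustration that runs the pieces one after another in the local process; it assumes all arguments are equal-length sequences and does not reproduce the behaviour of dview.parallel() beyond that pattern.

```python
import functools

def sketch_parallel(n_parts=4):
    """Hypothetical decorator factory: split sequence arguments into chunks,
    call the function on matching chunks, and concatenate the results."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*seqs):
            length = len(seqs[0])
            step = max(1, -(-length // n_parts))  # ceiling division
            out = []
            for start in range(0, length, step):
                # Take the same slice from every argument.
                pieces = [s[start:start + step] for s in seqs]
                out.extend(func(*pieces))
            return out
        return wrapper
    return decorator

@sketch_parallel(n_parts=4)
def pmul(a, b):
    # Element-wise product of two equal-length chunks.
    return [x * y for x, y in zip(a, b)]

A = list(range(10))
C_local = [x * x for x in A]
C_remote = pmul(A, A)
print(C_local == C_remote)
```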

Calling Python functions

Now we are back to apply, but with more than one target.

apply

The main method for doing remote execution is View.apply(); in fact, all methods that communicate with the engines are built on top of it.

We strive to provide the cleanest interface we can, so apply has the following signature:

view.apply(f, *args, **kwargs)

There are various ways to call functions with IPython; the options that control them are set as attributes of the View. The DirectView has the following flags:

dv.block : bool
whether to wait for the result, or return an AsyncResult object immediately
dv.track : bool
whether to instruct pyzmq to track when the message has been sent. This is primarily useful for non-copying sends of numpy arrays that you plan to edit in-place. You need to know when it becomes safe to edit the buffer without corrupting the message.
dv.targets : int, list of ints
which targets this view is associated with.

For convenience, you can set block temporarily for a single call with the apply_sync() and apply_async() methods.

Blocking execution

In blocking mode, the DirectView object (called dview in these examples) submits the command to the controller, which places the command in the engines’ queues for execution. The apply() call then blocks until the engines are done executing the command:

In [2]: dview = rc[:] # A DirectView of all engines

In [3]: dview.block=True

In [4]: dview['a'] = 5

In [5]: dview['b'] = 10

In [6]: dview.apply(lambda x: a+b+x, 27)
Out[6]: [42, 42, 42, 42]

You can also select blocking execution on a call-by-call basis with the apply_sync() method:

In [7]: dview.block=False

In [8]: dview.apply_sync(lambda x: a+b+x, 27)
Out[8]: [42, 42, 42, 42]

Python commands can be executed as strings on specific engines by using a View’s execute method:

In [6]: rc[::2].execute('c=a+b')

In [7]: rc[1::2].execute('c=a-b')

In [8]: dview['c'] # shorthand for dview.pull('c', block=True)
Out[8]: [15, -5, 15, -5]

Non-blocking execution

In non-blocking mode, apply() submits the command to be executed and then returns an AsyncResult object immediately. The AsyncResult object gives you a way of getting a result at a later time through its get() method.

This allows you to quickly submit long running commands without blocking your local Python/IPython session:

# define our function
In [6]: def wait(t):
  ....:     import time
  ....:     tic = time.time()
  ....:     time.sleep(t)
  ....:     return time.time()-tic

# In non-blocking mode
In [7]: ar = dview.apply_async(wait, 2)

# Now block for the result
In [8]: ar.get()
Out[8]: [2.0006198883056641, 1.9997570514678955, 1.9996809959411621, 2.0003249645233154]

# Again in non-blocking mode
In [9]: ar = dview.apply_async(wait, 10)

# Poll to see if the result is ready
In [10]: ar.ready()
Out[10]: False

# ask for the result, but wait a maximum of 1 second:
In [45]: ar.get(1)
---------------------------------------------------------------------------
TimeoutError                              Traceback (most recent call last)
/home/you/<ipython-input-45-7cd858bbb8e0> in <module>()
----> 1 ar.get(1)

/path/to/site-packages/IPython/parallel/asyncresult.pyc in get(self, timeout)
     62                 raise self._exception
     63         else:
---> 64             raise error.TimeoutError("Result not ready.")
     65
     66     def ready(self):

TimeoutError: Result not ready.
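The submit, poll, then fetch-with-timeout pattern shown above can be tried without a cluster using the standard library's concurrent.futures module. In this sketch, Future.done() and Future.result(timeout) play roughly the roles of ready() and get(timeout). The correspondence is only an analogy, and the durations are shortened so the example finishes quickly.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError

def wait(t):
    # Sleep for t seconds and report how long the call actually took.
    tic = time.time()
    time.sleep(t)
    return time.time() - tic

pool = ThreadPoolExecutor(max_workers=1)
future = pool.submit(wait, 0.5)      # returns immediately, like apply_async

print(future.done())                 # poll, analogous to ar.ready()

try:
    future.result(timeout=0.05)      # wait at most 0.05 s, analogous to ar.get(0.05)
    timed_out = False
except TimeoutError:
    timed_out = True

elapsed = future.result()            # block until finished, analogous to ar.get()
pool.shutdown()
print(timed_out, elapsed)
```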

Often, it is desirable to wait until a set of AsyncResult objects are done. For this, there is the wait() method. It takes a collection of AsyncResult objects (or msg_ids or indices to the client’s History) and blocks until all of the associated results are ready:

In [72]: dview.block=False

# A trivial list of AsyncResult objects
In [73]: pr_list = [dview.apply_async(wait, 3) for i in range(10)]

# Wait until all of them are done
In [74]: dview.wait(pr_list)

# Their results are then available via get() or the `.r` attribute
In [75]: pr_list[0].get()
Out[75]: [2.9982571601867676, 2.9982588291168213, 2.9987530708312988, 2.9990990161895752]
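Waiting on a whole batch of pending results has a close analogue in the standard library, which may help when experimenting without engines. The sketch below uses concurrent.futures.wait() (imported as `wait_all` to avoid clashing with the `wait` function defined earlier). The helper `nap` is a hypothetical task, and none of this is IPython's own API.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait as wait_all

def nap(t):
    # Hypothetical task: sleep for t seconds, then return t.
    time.sleep(t)
    return t

with ThreadPoolExecutor(max_workers=4) as pool:
    pending = [pool.submit(nap, 0.1) for _ in range(10)]
    # Block until every future in the list has finished.
    done, not_done = wait_all(pending)

# Results can now be read without further waiting.
results = [f.result() for f in pending]
print(len(done), len(not_done))
```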

The block and targets attributes

Most DirectView methods (excluding apply()) accept block and targets as keyword arguments. As we have seen above, these keyword arguments control the blocking mode and which engines the command is applied to. The View class also has block and targets attributes that control the default behavior when the keyword arguments are not provided. Thus the following logic is used for block and targets:

  • If no keyword argument is provided, the instance attributes are used.
  • Keyword arguments, if provided, override the instance attributes for the duration of a single call.

The following examples demonstrate how to use the instance attributes:

In [16]: dview.targets = [0,2]

In [17]: dview.block = False

In [18]: ar = dview.apply(lambda : 10)

In [19]: ar.get()
Out[19]: [10, 10]

In [20]: dview.targets = dview.client.ids # all engines (4)

In [21]: dview.block = True

In [22]: dview.apply(lambda : 42)
Out[22]: [42, 42, 42, 42]
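The precedence rule for block and targets described above can be written down in a few lines. The class `SketchView` and its `resolve` method are hypothetical names used only to model the fallback logic; a real view would go on to submit work to the engines.

```python
class SketchView:
    """Hypothetical stand-in for a view; only default resolution is modelled."""

    def __init__(self, targets, block=False):
        self.targets = targets
        self.block = block

    def resolve(self, block=None, targets=None):
        # A keyword left as None falls back to the instance attribute;
        # a keyword that is given applies to this call only.
        block = self.block if block is None else block
        targets = self.targets if targets is None else targets
        return block, targets

view = SketchView(targets=[0, 1, 2, 3], block=False)
defaults = view.resolve()                              # instance attributes
overridden = view.resolve(block=True, targets=[0, 2])  # per-call override
after = view.resolve()                                 # attributes unchanged
print(defaults, overridden, after)
```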

The block and targets instance attributes of the DirectView also determine the behavior of the parallel magic commands.

Parallel magic commands

We provide a few IPython magic commands (%px, %autopx and %result) that make it more pleasant to execute Python commands on the engines interactively. These are simply shortcuts to execute() and get_result() of the DirectView. The %px magic executes a single Python command on the engines specified by the targets attribute of the DirectView instance:

# Create a DirectView for all targets
In [22]: dv = rc[:]

# Make this DirectView active for parallel magic commands
In [23]: dv.activate()

In [24]: dv.block=True

# import numpy here and everywhere
In [25]: with dv.sync_imports():
   ....:    import numpy
importing numpy on engine(s)

In [27]: %px a = numpy.random.rand(2,2)
Parallel execution on engines: [0, 1, 2, 3]

In [28]: %px ev = numpy.linalg.eigvals(a)
Parallel execution on engines: [0, 1, 2, 3]

In [28]: dv['ev']
Out[28]: [ array([ 1.09522024, -0.09645227]),
   ....:   array([ 1.21435496, -0.35546712]),
   ....:   array([ 0.72180653,  0.07133042]),
   ....:   array([  1.46384341,   1.04353244e-04])
   ....: ]

The %result magic gets the most recent result, or takes an argument specifying the index of the result to be requested. It is simply a shortcut to the get_result() method:

In [29]: dv.apply_async(lambda : ev)

In [30]: %result
Out[30]: [ [ 1.28167017  0.14197338],
   ....:    [-0.14093616  1.27877273],
   ....:    [-0.37023573  1.06779409],
   ....:    [ 0.83664764 -0.25602658] ]

The %autopx magic switches to a mode where everything you type is executed on the engines given by the targets attribute:

In [30]: dv.block=False

In [31]: %autopx
Auto Parallel Enabled
Type %autopx to disable

In [32]: max_evals = []
<IPython.parallel.AsyncResult object at 0x17b8a70>

In [33]: for i in range(100):
   ....:     a = numpy.random.rand(10,10)
   ....:     a = a+a.transpose()
   ....:     evals = numpy.linalg.eigvals(a)
   ....:     max_evals.append(evals[0].real)
   ....:
   ....:
<IPython.parallel.AsyncResult object at 0x17af8f0>

In [34]: %autopx
Auto Parallel Disabled

In [35]: dv.block=True

In [36]: %px ans= "Average max eigenvalue is: %f"%(sum(max_evals)/len(max_evals))
Parallel execution on engines: [0, 1, 2, 3]

In [37]: dv['ans']
Out[37]: [ 'Average max eigenvalue is:  10.1387247332',
   ....:   'Average max eigenvalue is:  10.2076902286',
   ....:   'Average max eigenvalue is:  10.1891484655',
   ....:   'Average max eigenvalue is:  10.1158837784',]

Moving Python objects around

As with remote execution, push() and pull() now operate on multiple engines at once.

Basic push and pull

Here are some examples of how you use push() and pull():

In [38]: dview.push(dict(a=1.03234,b=3453))
Out[38]: [None,None,None,None]

In [39]: dview.pull('a')
Out[39]: [ 1.03234, 1.03234, 1.03234, 1.03234]

In [40]: dview.pull('b', targets=0)
Out[40]: 3453

In [41]: dview.pull(('a','b'))
Out[41]: [ [1.03234, 3453], [1.03234, 3453], [1.03234, 3453], [1.03234, 3453] ]

In [43]: dview.push(dict(c='speed'))
Out[43]: [None,None,None,None]

In non-blocking mode push() and pull() also return AsyncResult objects:

In [48]: ar = dview.pull('a', block=False)

In [49]: ar.get()
Out[49]: [1.03234, 1.03234, 1.03234, 1.03234]

Dictionary interface

Since a Python namespace is just a dict, DirectView objects provide dictionary-style access by key and methods such as get() and update() for convenience. This makes the remote namespaces of the engines appear as a local dictionary. Underneath, these methods call apply():

In [51]: dview['a']=['foo','bar']

In [52]: dview['a']
Out[52]: [ ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'] ]
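A small model may help show how dictionary-style access can sit on top of push and pull. In this sketch a list of plain dicts stands in for the engines' namespaces, and the class `NamespaceView` is hypothetical. The real DirectView goes through apply() and sends objects to remote processes, which the deep copy below only imitates.

```python
import copy

class NamespaceView:
    """Hypothetical view over dicts that stand in for engine namespaces."""

    def __init__(self, namespaces):
        self.namespaces = namespaces

    def push(self, mapping):
        # Each namespace gets its own copy, as a remote engine would.
        for ns in self.namespaces:
            ns.update(copy.deepcopy(mapping))

    def pull(self, name):
        # One value per namespace, in engine order.
        return [ns[name] for ns in self.namespaces]

    # Dictionary-style access is thin sugar over push and pull.
    def __setitem__(self, name, value):
        self.push({name: value})

    def __getitem__(self, name):
        return self.pull(name)

view = NamespaceView([{} for _ in range(4)])
view['a'] = ['foo', 'bar']
print(view['a'])
```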

Scatter and gather

Sometimes it is useful to partition a sequence and push the partitions to different engines. In MPI language, this is known as scatter/gather and we follow that terminology. However, it is important to remember that in IPython’s Client class, scatter() is from the interactive IPython session to the engines and gather() is from the engines back to the interactive IPython session. For scatter/gather operations between engines, MPI, pyzmq, or some other direct interconnect should be used.

In [58]: dview.scatter('a',range(16))
Out[58]: [None,None,None,None]

In [59]: dview['a']
Out[59]: [ [0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15] ]

In [60]: dview.gather('a')
Out[60]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
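The partitioning behind the example above can be reproduced locally. In this sketch, plain dicts stand in for engine namespaces, and the functions `scatter` and `gather` are hypothetical helpers rather than the DirectView methods. How leftover items are distributed when the length is not a multiple of the engine count is an assumption of the sketch.

```python
def scatter(namespaces, name, seq):
    """Hypothetical helper: store contiguous chunks of seq under name."""
    items = list(seq)
    size, extra = divmod(len(items), len(namespaces))
    start = 0
    for i, ns in enumerate(namespaces):
        # Assumption: the first `extra` namespaces receive one item more.
        stop = start + size + (1 if i < extra else 0)
        ns[name] = items[start:stop]
        start = stop

def gather(namespaces, name):
    """Hypothetical helper: concatenate the chunks in namespace order."""
    out = []
    for ns in namespaces:
        out.extend(ns[name])
    return out

engines = [{} for _ in range(4)]
scatter(engines, 'a', range(16))
print([ns['a'] for ns in engines])
print(gather(engines, 'a'))
```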

Exercise: Matrix Multiply

Using scatter and gather, can you write a parallel matrix-multiply?

solution
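One possible approach, sketched here without a cluster and not necessarily the solution the tutorial refers to: scatter the rows of A into blocks, let each engine multiply its block by the full matrix B, and gather the resulting row blocks in order. The helpers `matmul` and `scatter_rows` are hypothetical, the matrices are plain lists of rows, and the per-engine step runs in a local loop.

```python
def matmul(A, B):
    """Multiply two matrices stored as lists of rows."""
    B_cols = list(zip(*B))
    return [[sum(a * b for a, b in zip(row, col)) for col in B_cols]
            for row in A]

def scatter_rows(A, n_engines):
    """Hypothetical helper: split rows of A into at most n_engines blocks."""
    step = max(1, -(-len(A) // n_engines))  # ceiling division
    return [A[i:i + step] for i in range(0, len(A), step)]

A = [[1, 2], [3, 4], [5, 6], [7, 8]]
B = [[2, 0], [1, 3]]

blocks = scatter_rows(A, n_engines=2)               # scatter rows of A
partial = [matmul(block, B) for block in blocks]    # one product per engine
C = [row for part in partial for row in part]       # gather the row blocks
print(C == matmul(A, B))
```

Splitting by rows works because each row of the product depends on only one row of A, so the blocks can be computed independently.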

Other things to look at

How to do parallel list comprehensions

In many cases list comprehensions are nicer than using the map function. While we don’t have fully parallel list comprehensions, it is simple to get the basic effect using scatter() and gather():

In [66]: dview.scatter('x',range(64))

In [67]: %px y = [i**10 for i in x]
Parallel execution on engines: [0, 1, 2, 3]
Out[67]:

In [68]: y = dview.gather('y')

In [69]: print(y)
[0, 1, 1024, 59049, 1048576, 9765625, 60466176, 282475249, 1073741824,...]

Parallel exceptions

In the multiengine interface, parallel commands can raise Python exceptions, just like serial commands. There is a subtlety, however: a single parallel command can raise multiple exceptions (one for each engine the command was run on). To express this idea, we have a CompositeError exception class that will be raised in most cases. The CompositeError class is a special type of exception that wraps one or more other types of exceptions. Here is how it works:

In [76]: dview.block=True

In [77]: dview.execute('1/0')
---------------------------------------------------------------------------
CompositeError                            Traceback (most recent call last)
/home/user/<ipython-input-10-5d56b303a66c> in <module>()
----> 1 dview.execute('1/0')

/path/to/site-packages/IPython/parallel/client/view.pyc in execute(self, code, targets, block)
    591                 default: self.block
    592         """
--> 593         return self._really_apply(util._execute, args=(code,), block=block, targets=targets)
    594
    595     def run(self, filename, targets=None, block=None):

/home/user/<string> in _really_apply(self, f, args, kwargs, targets, block, track)

/path/to/site-packages/IPython/parallel/client/view.pyc in sync_results(f, self, *args, **kwargs)
     55 def sync_results(f, self, *args, **kwargs):
     56     """sync relevant results from self.client to our results attribute."""
---> 57     ret = f(self, *args, **kwargs)
     58     delta = self.outstanding.difference(self.client.outstanding)
     59     completed = self.outstanding.intersection(delta)

/home/user/<string> in _really_apply(self, f, args, kwargs, targets, block, track)

/path/to/site-packages/IPython/parallel/client/view.pyc in save_ids(f, self, *args, **kwargs)
     44     n_previous = len(self.client.history)
     45     try:
---> 46         ret = f(self, *args, **kwargs)
     47     finally:
     48         nmsgs = len(self.client.history) - n_previous

/path/to/site-packages/IPython/parallel/client/view.pyc in _really_apply(self, f, args, kwargs, targets, block, track)
    529         if block:
    530             try:
--> 531                 return ar.get()
    532             except KeyboardInterrupt:
    533                 pass

/path/to/site-packages/IPython/parallel/client/asyncresult.pyc in get(self, timeout)
    101                 return self._result
    102             else:
--> 103                 raise self._exception
    104         else:
    105             raise error.TimeoutError("Result not ready.")

CompositeError: one or more exceptions from call to method: _execute
[0:apply]: ZeroDivisionError: integer division or modulo by zero
[1:apply]: ZeroDivisionError: integer division or modulo by zero
[2:apply]: ZeroDivisionError: integer division or modulo by zero
[3:apply]: ZeroDivisionError: integer division or modulo by zero

Notice how the error message printed when CompositeError is raised has information about the individual exceptions that were raised on each engine. If you want, you can even raise one of these original exceptions:

In [80]: try:
   ....:     dview.execute('1/0')
   ....: except CompositeError as e:
   ....:     e.raise_exception()
   ....:
   ....:
---------------------------------------------------------------------------
RemoteError                               Traceback (most recent call last)
/home/user/<ipython-input-17-8597e7e39858> in <module>()
      2     dview.execute('1/0')
      3 except CompositeError as e:
----> 4     e.raise_exception()

/path/to/site-packages/IPython/parallel/error.pyc in raise_exception(self, excid)
    266             raise IndexError("an exception with index %i does not exist"%excid)
    267         else:
--> 268             raise RemoteError(en, ev, etb, ei)
    269
    270

RemoteError: ZeroDivisionError(integer division or modulo by zero)
Traceback (most recent call last):
  File "/path/to/site-packages/IPython/parallel/engine/streamkernel.py", line 330, in apply_request
    exec code in working,working
  File "<string>", line 1, in <module>
  File "/path/to/site-packages/IPython/parallel/util.py", line 354, in _execute
    exec code in globals()
  File "<string>", line 1, in <module>
ZeroDivisionError: integer division or modulo by zero
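The idea of one exception wrapping several per-engine failures can be modelled in a few lines. The class `SketchCompositeError` and the function `run_everywhere` below are hypothetical and run everything in the local process. They imitate the shape of the messages shown above and are not the implementation in IPython.parallel.error.

```python
class SketchCompositeError(Exception):
    """Hypothetical exception collecting one error per engine."""

    def __init__(self, message, elist):
        super().__init__(message)
        self.elist = elist  # list of (engine_id, exception) pairs

    def __str__(self):
        lines = [self.args[0]]
        for engine_id, exc in self.elist:
            lines.append("[%i:apply]: %s: %s"
                         % (engine_id, type(exc).__name__, exc))
        return "\n".join(lines)

    def raise_exception(self, excid=0):
        # Re-raise one of the wrapped exceptions, chosen by index.
        try:
            _, exc = self.elist[excid]
        except IndexError:
            raise IndexError(
                "an exception with index %i does not exist" % excid) from None
        raise exc

def run_everywhere(func, n_engines=4):
    """Hypothetical helper: call func once per engine, collecting failures."""
    results, errors = [], []
    for engine_id in range(n_engines):
        try:
            results.append(func())
        except Exception as exc:
            errors.append((engine_id, exc))
    if errors:
        raise SketchCompositeError("one or more exceptions from call", errors)
    return results

try:
    run_everywhere(lambda: 1 / 0)
except SketchCompositeError as e:
    report = str(e)
    try:
        e.raise_exception()
    except ZeroDivisionError as original:
        first = original
print(report)
```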

If you are working in IPython, you can simply type %debug after one of these CompositeError exceptions is raised, and inspect the exception instance:

In [81]: dview.execute('1/0')
---------------------------------------------------------------------------
CompositeError                            Traceback (most recent call last)
/home/user/<ipython-input-10-5d56b303a66c> in <module>()
----> 1 dview.execute('1/0')

/path/to/site-packages/IPython/parallel/client/view.pyc in execute(self, code, targets, block)
    591                 default: self.block
    592         """
--> 593         return self._really_apply(util._execute, args=(code,), block=block, targets=targets)
    594
    595     def run(self, filename, targets=None, block=None):

/home/user/<string> in _really_apply(self, f, args, kwargs, targets, block, track)

/path/to/site-packages/IPython/parallel/client/view.pyc in sync_results(f, self, *args, **kwargs)
     55 def sync_results(f, self, *args, **kwargs):
     56     """sync relevant results from self.client to our results attribute."""
---> 57     ret = f(self, *args, **kwargs)
     58     delta = self.outstanding.difference(self.client.outstanding)
     59     completed = self.outstanding.intersection(delta)

/home/user/<string> in _really_apply(self, f, args, kwargs, targets, block, track)

/path/to/site-packages/IPython/parallel/client/view.pyc in save_ids(f, self, *args, **kwargs)
     44     n_previous = len(self.client.history)
     45     try:
---> 46         ret = f(self, *args, **kwargs)
     47     finally:
     48         nmsgs = len(self.client.history) - n_previous

/path/to/site-packages/IPython/parallel/client/view.pyc in _really_apply(self, f, args, kwargs, targets, block, track)
    529         if block:
    530             try:
--> 531                 return ar.get()
    532             except KeyboardInterrupt:
    533                 pass

/path/to/site-packages/IPython/parallel/client/asyncresult.pyc in get(self, timeout)
    101                 return self._result
    102             else:
--> 103                 raise self._exception
    104         else:
    105             raise error.TimeoutError("Result not ready.")

CompositeError: one or more exceptions from call to method: _execute
[0:apply]: ZeroDivisionError: integer division or modulo by zero
[1:apply]: ZeroDivisionError: integer division or modulo by zero
[2:apply]: ZeroDivisionError: integer division or modulo by zero
[3:apply]: ZeroDivisionError: integer division or modulo by zero

In [82]: %debug
> /path/to/site-packages/IPython/parallel/client/asyncresult.py(103)get()
    102             else:
--> 103                 raise self._exception
    104         else:

ipdb> self._exception.<tab>
e.__class__         e.__getitem__       e.__new__           e.__setstate__      e.args
e.__delattr__       e.__getslice__      e.__reduce__        e.__str__           e.elist
e.__dict__          e.__hash__          e.__reduce_ex__     e.__weakref__       e.message
e.__doc__           e.__init__          e.__repr__          e._get_engine_str   e.print_tracebacks
e.__getattribute__  e.__module__        e.__setattr__       e._get_traceback    e.raise_exception
ipdb> self._exception.print_tracebacks()
[0:apply]:
Traceback (most recent call last):
  File "/path/to/site-packages/IPython/parallel/engine/streamkernel.py", line 330, in apply_request
    exec code in working,working
  File "<string>", line 1, in <module>
  File "/path/to/site-packages/IPython/parallel/util.py", line 354, in _execute
    exec code in globals()
  File "<string>", line 1, in <module>
ZeroDivisionError: integer division or modulo by zero


[1:apply]:
Traceback (most recent call last):
  File "/path/to/site-packages/IPython/parallel/engine/streamkernel.py", line 330, in apply_request
    exec code in working,working
  File "<string>", line 1, in <module>
  File "/path/to/site-packages/IPython/parallel/util.py", line 354, in _execute
    exec code in globals()
  File "<string>", line 1, in <module>
ZeroDivisionError: integer division or modulo by zero


[2:apply]:
Traceback (most recent call last):
  File "/path/to/site-packages/IPython/parallel/engine/streamkernel.py", line 330, in apply_request
    exec code in working,working
  File "<string>", line 1, in <module>
  File "/path/to/site-packages/IPython/parallel/util.py", line 354, in _execute
    exec code in globals()
  File "<string>", line 1, in <module>
ZeroDivisionError: integer division or modulo by zero


[3:apply]:
Traceback (most recent call last):
  File "/path/to/site-packages/IPython/parallel/engine/streamkernel.py", line 330, in apply_request
    exec code in working,working
  File "<string>", line 1, in <module>
  File "/path/to/site-packages/IPython/parallel/util.py", line 354, in _execute
    exec code in globals()
  File "<string>", line 1, in <module>
ZeroDivisionError: integer division or modulo by zero

The same error handling also works in non-blocking mode:

In [83]: dview.block=False

In [84]: ar = dview.execute('1/0')

In [85]: ar.get()
---------------------------------------------------------------------------
CompositeError                            Traceback (most recent call last)
/home/user/<ipython-input-21-8531eb3d26fb> in <module>()
----> 1 ar.get()

/path/to/site-packages/IPython/parallel/client/asyncresult.pyc in get(self, timeout)
    101                 return self._result
    102             else:
--> 103                 raise self._exception
    104         else:
    105             raise error.TimeoutError("Result not ready.")

CompositeError: one or more exceptions from call to method: _execute
[0:apply]: ZeroDivisionError: integer division or modulo by zero
[1:apply]: ZeroDivisionError: integer division or modulo by zero
[2:apply]: ZeroDivisionError: integer division or modulo by zero
[3:apply]: ZeroDivisionError: integer division or modulo by zero