Mainly Tech projects on Python and Electronic Design Automation.

Saturday, May 23, 2009

Pipe Fitting with Python Generators

I was investigating the Hofstadter-Conway $10,000 sequence which, for
mathematics, has a very interesting story. To
'follow the money', you have to generate the sequence of values and
calculate maxima and minima of the sequence values, so I decided to
create a generator of the sequence values and pass it through filters
that would emit local maxima/minima. I ended up with something like:

high_generator = maxima(sequence_generator())

highs = [next(high_generator) for i in range(1000)]
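For reference, here is a sketch of what such a sequence generator could look like. It is not from the original post; the function name is mine, but the recurrence is the standard definition of the Hofstadter-Conway sequence: a(1) = a(2) = 1 and a(n) = a(a(n-1)) + a(n - a(n-1)).

```python
def conway():
    """Yield the Hofstadter-Conway $10,000 sequence:
    a(1) = a(2) = 1;  a(n) = a(a(n-1)) + a(n - a(n-1)).
    """
    a = [None, 1, 1]      # 1-based indexing; a[0] is unused
    yield 1
    yield 1
    n = 3
    while True:
        a.append(a[a[n - 1]] + a[n - a[n - 1]])
        yield a[n]
        n += 1
```

The first ten values it yields are 1, 1, 2, 2, 3, 4, 4, 4, 5, 6.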

Now I just hated the nested call of generators. If it were a Unix
shell, I would have hoped to have written:

sequence_generator | maxima | head 1000 > highs

Notice how the data flows from source at the left to sink at the right.
I like that.

I spent some time trying to use classes and redefine bitwise or for
class instances, but it was looking messy and my heart wasn't really in
a class based solution.

I left it for a week. And in that week I remembered seeing a function
called pipe being used to pipe things, so I decided to write my own pipe
function for combining generators in a more natural way.


What I wanted was a function called pipe that, when called with several
generators, returned a nested call of all its arguments, with the
leftmost argument being in the centre of the nest. For example, a call of:

pipe(a, b, c, d)

Would return the equivalent of:

d(c(b(a())))
The eventual definition turned out to be quite straightforward:

def pipe(*cmds):
    ''' pipe(a,b,c,d, ...) -> yield from ...d(c(b(a())))
    '''
    gen = cmds[0]
    for cmd in cmds[1:]:
        gen = cmd(gen)
    for x in gen:
        yield x
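As an aside, on later Pythons (3.3 and up) the final loop can be collapsed with yield from. This is a minor modernization of the definition above, not what the post originally used:

```python
def pipe(*cmds):
    ''' pipe(a,b,c,d, ...) -> yield from ...d(c(b(a())))
    '''
    gen = cmds[0]
    for cmd in cmds[1:]:
        gen = cmd(gen)
    yield from gen    # Python 3.3+: delegate to the outermost generator
```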

Pipe in use

Let's create some simple generators; count is just
itertools.count; sqr
yields the square of items on its input iterator:

from itertools import count

def sqr(x):
    for val in x:
        yield val*val

I can now generate the squares of integers:

>>> p = pipe(count(), sqr)
>>> [next(p) for i in range(5)]
[0, 1, 4, 9, 16]

But I need a more complex definition for head, as it needs to have a
parameter for the maximum number of items to pass through.

def head(n):
    ''' return a generator that passes through at most n items
    '''
    def head(x):
        i = n
        for val in x:
            if i > 0:
                i -= 1
                yield val
            else:
                return
    return head

And now, with head, I can more naturally express nested generator
functions, as I can stop an infinite generator stream easily. For example:

>>> list(pipe(count(), sqr, head(4)))
[0, 1, 4, 9]
>>> for i in pipe(count(), sqr, head(10)):
	print i,

0 1 4 9 16 25 36 49 64 81
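As it happens, head re-implements something the standard library already provides: itertools.islice. An equivalent stage can be built without a hand-rolled loop; a sketch:

```python
from itertools import islice

def head(n):
    # same behaviour as the closure version above:
    # pass through at most n items from the input iterator
    return lambda x: islice(x, n)
```

Used in the pipe above, pipe(count(), sqr, head(4)) gives [0, 1, 4, 9] as before.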

Generalized function generator and filter

I thought it might be handy to have a function that skipped an input if
prefilter(input) was false; applied a given function, fn(input), to
the input stream; and passed fn(input) on to the output only if
postfilter(fn(input)) was true. Think of a stream of data: you can
filter it, apply a function to it, or filter after applying the
function - all via this one pipe filter. I called it fnf, for function filter.

def fnf(fn, prefilter=None, postfilter=None):
    ''' functionfilter:
        for s in x:
            yield fn(s) subject to prefilter(s), postfilter(fn(s)) returning True if present,
            or skip this s
    '''
    def _fnf(x):
        for s in x:
            if prefilter is None or prefilter(s):
                fns = fn(s)
                if postfilter is None or postfilter(fns):
                    yield fns
    _fnf.__doc__ = '_fnf = fnf(%s, %s, %s)' % (fn, prefilter, postfilter)
    return _fnf

I'll show fnf in action.

>>> # turns the input stream of numbers, x, into a stream of tuples (x, x/2.0)
>>> def n_nby2(x): return x, x/2.0

>>> list(pipe( count(), sqr, head(5), fnf(n_nby2) ))
[(0, 0.0), (1, 0.5), (4, 2.0), (9, 4.5), (16, 8.0)]
>>> # Let's create a prefilter
>>> def even(x): return x%2 == 0

>>> list(pipe( count(), sqr, head(5), fnf(n_nby2, even) ))
[(0, 0.0), (4, 2.0), (16, 8.0)]
>>> # And now a postfilter, which must work on the result of applying fn
>>> def frac(x): return int(x[1]) != x[1]

>>> list(pipe( count(), sqr, head(5), fnf(n_nby2, None, frac) ))
[(1, 0.5), (9, 4.5)]
>>> # Note the presence of None in the above call, as we don't use a prefilter
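For comparison, what fnf(n_nby2, None, frac) does over the first five squares can also be spelled out with plain generator expressions. This is a sketch for illustration; the intermediate variable names are mine:

```python
from itertools import count, islice

def n_nby2(x): return x, x / 2.0
def frac(t): return int(t[1]) != t[1]

squares = (i * i for i in count())      # count() then sqr
first5 = islice(squares, 5)             # head(5)
pairs = (n_nby2(s) for s in first5)     # fn applied to each item
kept = [t for t in pairs if frac(t)]    # postfilter on fn's result
# kept matches the pipe version: [(1, 0.5), (9, 4.5)]
```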

That's it for now with the pipe fitting. I do have a tee function for
printing intermediate results, but it's late (early, rather) and I'm tired....

- Paddy.


  1. Is 'head' not simply a re-invention of itertools.islice?

  2. Hmm, thanks An. There's a lot in itertools that could be made use of (as well as a name clash over tee). I should have read the whole of itertools before posting.

    What do you think of the general idea of pipe? I just hate the standard gen3(gen2(gen1())) syntax, and would rather have a function-based rather than a class-based alternative.

    - Paddy.

  3. Very cool!

    I really like pipe. I find function-call backwardsness annoying.

  4. And you can also use "pipe notation" as in recipe 576759,

    for even more readable code...

  5. Ah, recipe 576759. That is the class-based solution I was trying to avoid (but it is rather succinct compared to what I thought might be needed).
    - Paddy.

  6. Hi Martin G.
    Thanks. I did like watching "A Curious Course on Coroutines and Concurrency". I liked the contrast he gave between data being pushed through coroutines versus being pulled through generators, but have no wish to write an OS :-)

    - Paddy.

  7. You should go on the Python ideas mailing list and propose this as an addition to the itertools module.

  8. I used to write a lazy-reduce function and
    reduce(lambda n, m: m(n), cmds[1:], cmds[0])
    to transform pipe(a, b, c, d) -> d(c(b(a())))
    that can be used for a filter-chain like a pipe.

    These decorators may be useful?

    def pipe_map(func):
        return lambda xs: (func(x) for x in xs)

    def pipe_filter(func):
        return lambda xs: (x for x in xs if func(x))

    def sqr(x):
        return x*x

    def even(x):
        return x & 1 == 0
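    Putting the commenter's suggestions together, here is a runnable sketch of the reduce-based pipe with the two decorators. Note that this pipe returns the final iterator directly rather than re-yielding its items:

```python
from functools import reduce
from itertools import count, islice

def pipe(*cmds):
    # fold each stage over the previous one: pipe(a, b, c) -> c(b(a))
    return reduce(lambda gen, cmd: cmd(gen), cmds[1:], cmds[0])

def pipe_map(func):
    return lambda xs: (func(x) for x in xs)

def pipe_filter(func):
    return lambda xs: (x for x in xs if func(x))

evens_squared = pipe(count(),
                     pipe_filter(lambda x: x % 2 == 0),
                     pipe_map(lambda x: x * x),
                     lambda xs: islice(xs, 5))
# list(evens_squared) gives [0, 4, 16, 36, 64]
```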

  9. @theearthboundkid: Thanks for the vote, but I'd like to just kick it around for a while longer. There are other solutions, such as the use of classes to overload '|', that people need to compare and contrast, I guess.

    @Anon: reduce could be a way to create pipe, as you've shown. I can see how pipe_map and pipe_filter do everything that fnf does; in fact I was weighing separate filter and map over the combination when I wrote it. After not enough rumination I thought that although in Unix we have grep, which does filtering, when I want to apply a function to a stream I end up in Awk/Perl anyway, where you can do both, so fnf it was.


