Mainly Tech projects on Python and Electronic Design Automation.

Wednesday, June 01, 2022

From pairwise to n-wise

 Python3.10 introduced the itertools.pairwise function:

itertools.pairwise(iterable)

Return successive overlapping pairs taken from the input iterable.

The number of 2-tuples in the output iterator will be one fewer than the number of inputs. It will be empty if the input iterable has fewer than two values.

 

Here's a few examples showing what it does:

from itertools import pairwise, tee

x = range(7)
print(f"{x = }\n{list(pairwise(x)) = }")
x = 'ABCDEFG'
print(f"{x = }\n{list(pairwise(x)) = }")
print(f"{x = }\n{[''.join(w) for w in pairwise(x)] = }")
print()

 Which produces output:

x = range(0, 7)
list(pairwise(x)) = [(0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (5, 6)]
x = 'ABCDEFG'
list(pairwise(x)) = [('A', 'B'), ('B', 'C'), ('C', 'D'), ('D', 'E'), ('E', 'F'), ('F', 'G')]
x = 'ABCDEFG'
[''.join(w) for w in pairwise(x)] = ['AB', 'BC', 'CD', 'DE', 'EF', 'FG']

 pairwise equivalent

The docs also give a roughly equivalent function that I reproduce below:

def pairwise_eq(iterable):
    # pairwise('ABCDEFG') --> AB BC CD DE EF FG
    a, b = tee(iterable)
    next(b, None)
    return zip(a, b)

x = range(7)
print(f"{x = }\n{list(pairwise_eq(x)) = }")
x = 'ABCDEFG'
print(f"{x = }\n{list(pairwise_eq(x)) = }")
print(f"{x = }\n{[''.join(w) for w in pairwise_eq(x)] = }")
print()

pairwise_eq output is the same:

x = range(0, 7)
list(pairwise_eq(x)) = [(0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (5, 6)]
x = 'ABCDEFG'
list(pairwise_eq(x)) = [('A', 'B'), ('B', 'C'), ('C', 'D'), ('D', 'E'), ('E', 'F'), ('F', 'G')]
x = 'ABCDEFG'
[''.join(w) for w in pairwise_eq(x)] = ['AB', 'BC', 'CD', 'DE', 'EF', 'FG']

tri-wise

Looking at the code for pairwise_eq it can be easily expanded to generate a similar moving window of three items by:

  1. tee'ing into three streams.
  2. dropping two items from the third stream.

That produces the following code:

def tri_wise(iterable):
    a, b, c = tee(iterable, 3)
    next(b, None)
    for i in range(2):
        next(c, None)
    return zip(a, b, c)

x = range(7)
print(f"{x = }\n{list(tri_wise(x)) = }")
x = 'ABCDEFG'
print(f"{x = }\n{list(tri_wise(x)) = }")
print(f"{x = }\n{[''.join(w) for w in tri_wise(x)] = }")
print()

It produces the following output:

x = range(0, 7)
list(tri_wise(x)) = [(0, 1, 2), (1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6)]
x = 'ABCDEFG'
list(tri_wise(x)) = [('A', 'B', 'C'), ('B', 'C', 'D'), ('C', 'D', 'E'), ('D', 'E', 'F'), ('E', 'F', 'G')]
x = 'ABCDEFG'
[''.join(w) for w in tri_wise(x)] = ['ABC', 'BCD', 'CDE', 'DEF', 'EFG']

n-wise

 To generalise the code to generate moving windows of output of a given width I need to:

  1. Generate several tee'd streams.
    Easily done with tee(stream, n).
  2. drop items from the front of each stream to skew each stream progressively.
    The list comprehensions output expression has a nested list comprehension to drop items from the stream and the stream itself as a tuple, from which is selects, (via the index [1]), the resultant skewed stream
  3. zip the resultant streams together to form the output.

Code:

def nwise(iterable, width):
    tees = tee(iterable, width)
    skewed = [([next(t) for _ in range(position)], t)[1]
              for position, t in enumerate(tees)]
    return zip(*skewed)

x = range(7)
for wdth in range(11):
    try:
        print(f"{x = }; {wdth = }\n{list(nwise(x, wdth)) = }")
    except StopIteration:
        print(f"{x = }; {wdth = }\nlist(nwise(x, wdth)) Raised StopIteration!")
print()

The output nwise(iterator, 2) matches pairwise(iterator).

Function nwise has no range checks as it works with general iterators whose length may not be known. it raises errors when the width is one more than the number of items in the iterator.

x = range(0, 7); wdth = 0
list(nwise(x, wdth)) = []
x = range(0, 7); wdth = 1
list(nwise(x, wdth)) = [(0,), (1,), (2,), (3,), (4,), (5,), (6,)]
x = range(0, 7); wdth = 2
list(nwise(x, wdth)) = [(0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (5, 6)]
x = range(0, 7); wdth = 3
list(nwise(x, wdth)) = [(0, 1, 2), (1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6)]
x = range(0, 7); wdth = 4
list(nwise(x, wdth)) = [(0, 1, 2, 3), (1, 2, 3, 4), (2, 3, 4, 5), (3, 4, 5, 6)]
x = range(0, 7); wdth = 5
list(nwise(x, wdth)) = [(0, 1, 2, 3, 4), (1, 2, 3, 4, 5), (2, 3, 4, 5, 6)]
x = range(0, 7); wdth = 6
list(nwise(x, wdth)) = [(0, 1, 2, 3, 4, 5), (1, 2, 3, 4, 5, 6)]
x = range(0, 7); wdth = 7
list(nwise(x, wdth)) = [(0, 1, 2, 3, 4, 5, 6)]
x = range(0, 7); wdth = 8
list(nwise(x, wdth)) = []
x = range(0, 7); wdth = 9
list(nwise(x, wdth)) Raised StopIteration!
x = range(0, 7); wdth = 10
list(nwise(x, wdth)) Raised StopIteration!

Single expression

The intermediate variables, tees, and skewed of function nwise can be removed to create a function of just one return expression:

def nwise_squashed(iterable, width):
    return zip(*(([next(t) for _ in range(position)], t)[1]
                 for position, t in enumerate(tee(iterable, width))))

x = range(7)
for wdth in range(11):
    try:
        print(f"{x = }; {wdth = }\n{list(nwise_squashed(x, wdth)) = }")
    except Exception as e:
        print(f"{x = }; {wdth = }\nlist(nwise_squashed(x, wdth)): {e}!")
print()

 It works, but I don't like it - function nwise_squashed is less maintainable for me than function nwise.


End Bit

I managed to generalise itertools.pairwise. A tricky bit was running next() appropriate times on tee'd streams in a comprehension; I'll remember the trick of using expressions in a tuple to change an iterator before returning the resultant via indexing.

END.

 

No comments:

Post a Comment

Followers

Subscribe Now: google

Add to Google Reader or Homepage

Go deh too!

whos.amung.us

Blog Archive