Friday, June 03, 2022

Python: Yield the first item of consecutive series of equal items, from an iterable.

 I was reading the source to the more_itertools.unique_unseen function. The function takes an iterator that may have regions where successive items are the same, and yields the item if next item is not the same as it.

itertools.groupby

Since the problem involves grouping, my first thought is to think of ways to apply itertools.groupby; but that is used in the more_itertools function reproduced below:

def unique_justseen(iterable, key=None):
    """Yields elements in order, ignoring serial duplicates

    >>> list(unique_justseen('AAAABBBCCDAABBB'))
    ['A', 'B', 'C', 'D', 'A', 'B']
    >>> list(unique_justseen('ABBCcAD', str.lower))
    ['A', 'B', 'C', 'A', 'D']

    """
    return map(next, map(operator.itemgetter(1), groupby(iterable, key)))

It groups, then extracts the first of each group, and looks to be a good, tight implementation. 

I decided to find another way.

Successive items method

I thought a bit then decided:

  • The first item of the iterable is always returned.
  • Other items are returned if they are not equal to the item before it.

You need the item and the item next to it to determine if that item is yielded. I quiclky toyed with thoughts of teeing and dropping one item from a tee'd branch then comparing items from the zip of the tees; but discarded that. I thought that I could use the walrus operator in a comprehension to keep track of the previous item, (called this below), and doodled this code:

it = iter('AAAABBBCCDAABBB')
print(this := next(it), end=' ')
print([(this := item) for item in it if item != this])
# Outputs: A ['B', 'C', 'D', 'A', 'B']


It seemed to work, and I set about creating my function.

My Function first_of_series

Looking at the more_itertools.unique_justseen function above, you should note that it has a key argument. The key argument is a function used to transmute the items of the iterable prior to comparison, much like its use in the sorted function. I decided to add that functionality too.


My final function is as follows:

def first_of_series(iterable, key=None):
    """
    Yield the first item of consecutive series of equal items from an iterable.
    Items have the key function applied for the comparison, if given.

    >>> list(first_of_series('AAAABBBCCDAABBB'))
    ['A', 'B', 'C', 'D', 'A', 'B']
    >>> list(first_of_series('ABBCcAD'))
    ['A', 'B', 'C', 'c', 'A', 'D']
    >>> list(first_of_series('ABBCcAD', key=str.lower))
    ['A', 'B', 'C', 'A', 'D']
    >>> list(first_of_series('ABBcCAD', key=str.lower))
    ['A', 'B', 'c', 'A', 'D']

    """
    it = iter(iterable) # Change iterable into an iterator!
    yield (this := next(it))
    if key is None:
        yield from (item for item in it if this != (this := item))
    else:
        this = key(this)
        yield from (item for item in it if this != (this := key(item)))

It works like this:

  1. Form an iterator from any input iterable so you can call next() on it.
  2. Set this to the first item from the iterable and also yield it.
  3. If there is no key:
    1. yield from a comprehension yielding items from the iterable but only if the last i.e this is not equal to the current, item from the list. 
    2. The walrus assignment updates this.
  4. Else if there is a key function:
    1. this is now always the value of the key function applied to items from the iterable.
    2. The comparison in the comprehension is always of values of the key function applied to items from the iterable; although the plain items are yielded.

I have to rely on the left-to-right order of evaluation for the if condition in the comprehensions to get the updating of variable this correct; and it's more verbose than the groupby version.


I did have fun creating it though, (and it reinforces my rule of using groupby when grouping).

Thursday, June 02, 2022

Python: My nwise compared to more_itertools.sliding_window

In my last post I evolved two n-wise functions from the itertools.pairwise code to create functions that will generate a sliding window of arbitrary width from an input iterator.

Somone commented on reddit about my blog stating:

"Is there a reason why not just do it as they do it in more-itertools?"

Then gave the source from the more itertools site that I extract below:

from collections import deque, Counter
from itertools import islice
from timeit import Timer
import pandas as pd

def sliding_window(iterable, n):
    """Return a sliding window of width *n* over *iterable*.

        >>> list(sliding_window(range(6), 4))
        [(0, 1, 2, 3), (1, 2, 3, 4), (2, 3, 4, 5)]

    If *iterable* has fewer than *n* items, then nothing is yielded:

        >>> list(sliding_window(range(3), 4))
        []

    For a variant with more features, see :func:`windowed`.
    """
    it = iter(iterable)
    window = deque(islice(it, n), maxlen=n)
    if len(window) == n:
        yield tuple(window)
    for x in it:
        window.append(x)
        yield tuple(window)

Visual Comparison

I'll show my nwise function for comparison:

def nwise(iterable, width):
    tees = tee(iterable, width)
    skewed = [([next(t) for _ in range(position)], t)[1]
              for position, t in enumerate(tees)]
    return zip(*skewed)

 

sliding_window is nicely commented as it is in a public library; nwise is written to be read as part of its original blog post.

It's up to the reader to judge which code is more understandable. more_itertools does not have a blog post to help understand the code. nwise isn't packaged as a module - its a coding example.

Timing Comparison

 Yes, well, the timing methodology is set to compare run times over a range of inputs iterable sizes and output window widths. A particular application might benefit from more specific timings.

I added the following timing code:

data = dict(items=[], window=[], func=[], time=[])
fastest = []
for n in range(1,7,2):
    items = range(10**n)
    print()
    reps = None
    for w in range(9):
        wdth  = 2**w
        if wdth <= 10**n:
            print()
            func_speeds = []
            for func in (nwise, nwise_squashed, sliding_window):
                data['items'].append(len(items))
                data['window'].append(wdth)
                data['func'].append(func.__name__)
                cmd = f"list({func.__name__:<15}({str(items):<12}, {wdth:10_}))"
                print(cmd, end=' # ')
                timer = Timer(cmd, globals=globals())
                if reps is None:
                    reps, secs = timer.autorange()
                    if reps <3:
                        reps = 3
                secs = min(timer.repeat(3, reps)) # min of 3 runs of timeit(number=reps)
                data['time'].append(secs)
                print("%1.2fs, %i reps" % (secs, reps))
                func_speeds.append((secs, func.__name__))

            fastest.append(min(func_speeds)[1])

print("\nFunction most fast for a particular set of arguments:")
print("   ", Counter(fastest).most_common())



Generated Timings:

list(nwise          (range(0, 10),          1)) # 0.20s, 100000 reps
list(nwise_squashed (range(0, 10),          1)) # 0.22s, 100000 reps
list(sliding_window (range(0, 10),          1)) # 0.30s, 100000 reps

list(nwise          (range(0, 10),          2)) # 0.27s, 100000 reps
list(nwise_squashed (range(0, 10),          2)) # 0.29s, 100000 reps
list(sliding_window (range(0, 10),          2)) # 0.29s, 100000 reps

list(nwise          (range(0, 10),          4)) # 0.40s, 100000 reps
list(nwise_squashed (range(0, 10),          4)) # 0.43s, 100000 reps
list(sliding_window (range(0, 10),          4)) # 0.25s, 100000 reps

list(nwise          (range(0, 10),          8)) # 0.76s, 100000 reps
list(nwise_squashed (range(0, 10),          8)) # 0.78s, 100000 reps
list(sliding_window (range(0, 10),          8)) # 0.20s, 100000 reps


list(nwise          (range(0, 1000),          1)) # 0.21s, 5000 reps
list(nwise_squashed (range(0, 1000),          1)) # 0.21s, 5000 reps
list(sliding_window (range(0, 1000),          1)) # 0.90s, 5000 reps

list(nwise          (range(0, 1000),          2)) # 0.23s, 5000 reps
list(nwise_squashed (range(0, 1000),          2)) # 0.23s, 5000 reps
list(sliding_window (range(0, 1000),          2)) # 0.94s, 5000 reps

list(nwise          (range(0, 1000),          4)) # 0.26s, 5000 reps
list(nwise_squashed (range(0, 1000),          4)) # 0.28s, 5000 reps
list(sliding_window (range(0, 1000),          4)) # 0.96s, 5000 reps

list(nwise          (range(0, 1000),          8)) # 0.35s, 5000 reps
list(nwise_squashed (range(0, 1000),          8)) # 0.36s, 5000 reps
list(sliding_window (range(0, 1000),          8)) # 1.03s, 5000 reps

list(nwise          (range(0, 1000),         16)) # 0.56s, 5000 reps
list(nwise_squashed (range(0, 1000),         16)) # 0.56s, 5000 reps
list(sliding_window (range(0, 1000),         16)) # 1.23s, 5000 reps

list(nwise          (range(0, 1000),         32)) # 1.19s, 5000 reps
list(nwise_squashed (range(0, 1000),         32)) # 1.20s, 5000 reps
list(sliding_window (range(0, 1000),         32)) # 1.57s, 5000 reps

list(nwise          (range(0, 1000),         64)) # 2.69s, 5000 reps
list(nwise_squashed (range(0, 1000),         64)) # 2.85s, 5000 reps
list(sliding_window (range(0, 1000),         64)) # 2.41s, 5000 reps

list(nwise          (range(0, 1000),        128)) # 5.74s, 5000 reps
list(nwise_squashed (range(0, 1000),        128)) # 5.80s, 5000 reps
list(sliding_window (range(0, 1000),        128)) # 3.35s, 5000 reps

list(nwise          (range(0, 1000),        256)) # 15.83s, 5000 reps
list(nwise_squashed (range(0, 1000),        256)) # 15.87s, 5000 reps
list(sliding_window (range(0, 1000),        256)) # 4.58s, 5000 reps


list(nwise          (range(0, 100000),          1)) # 0.30s, 50 reps
list(nwise_squashed (range(0, 100000),          1)) # 0.30s, 50 reps
list(sliding_window (range(0, 100000),          1)) # 1.04s, 50 reps

list(nwise          (range(0, 100000),          2)) # 0.34s, 50 reps
list(nwise_squashed (range(0, 100000),          2)) # 0.33s, 50 reps
list(sliding_window (range(0, 100000),          2)) # 1.05s, 50 reps

list(nwise          (range(0, 100000),          4)) # 0.36s, 50 reps
list(nwise_squashed (range(0, 100000),          4)) # 0.37s, 50 reps
list(sliding_window (range(0, 100000),          4)) # 1.09s, 50 reps

list(nwise          (range(0, 100000),          8)) # 0.44s, 50 reps
list(nwise_squashed (range(0, 100000),          8)) # 0.46s, 50 reps
list(sliding_window (range(0, 100000),          8)) # 1.22s, 50 reps

list(nwise          (range(0, 100000),         16)) # 0.64s, 50 reps
list(nwise_squashed (range(0, 100000),         16)) # 0.64s, 50 reps
list(sliding_window (range(0, 100000),         16)) # 1.39s, 50 reps

list(nwise          (range(0, 100000),         32)) # 1.13s, 50 reps
list(nwise_squashed (range(0, 100000),         32)) # 1.13s, 50 reps
list(sliding_window (range(0, 100000),         32)) # 1.83s, 50 reps

list(nwise          (range(0, 100000),         64)) # 2.67s, 50 reps
list(nwise_squashed (range(0, 100000),         64)) # 2.64s, 50 reps
list(sliding_window (range(0, 100000),         64)) # 3.26s, 50 reps

list(nwise          (range(0, 100000),        128)) # 4.47s, 50 reps
list(nwise_squashed (range(0, 100000),        128)) # 4.38s, 50 reps
list(sliding_window (range(0, 100000),        128)) # 4.88s, 50 reps

list(nwise          (range(0, 100000),        256)) # 8.70s, 50 reps
list(nwise_squashed (range(0, 100000),        256)) # 8.57s, 50 reps
list(sliding_window (range(0, 100000),        256)) # 8.55s, 50 reps
 
Function most fast for a particular set of arguments:
    [('nwise', 11), ('sliding_window', 6), ('nwise_squashed', 5)]

  • sliding_window is only fastest 6 out of the 22 runs of the groups of three functions running with the same arguments. 
  • Either the nwise, or nwise_squashed algorithms are fastest most often, and with similar times between these two.


END.






Wednesday, June 01, 2022

From pairwise to n-wise

 Python3.10 introduced the itertools.pairwise function:

itertools.pairwise(iterable)

Return successive overlapping pairs taken from the input iterable.

The number of 2-tuples in the output iterator will be one fewer than the number of inputs. It will be empty if the input iterable has fewer than two values.

 

Here's a few examples showing what it does:

from itertools import pairwise, tee

x = range(7)
print(f"{x = }\n{list(pairwise(x)) = }")
x = 'ABCDEFG'
print(f"{x = }\n{list(pairwise(x)) = }")
print(f"{x = }\n{[''.join(w) for w in pairwise(x)] = }")
print()

 Which produces output:

x = range(0, 7)
list(pairwise(x)) = [(0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (5, 6)]
x = 'ABCDEFG'
list(pairwise(x)) = [('A', 'B'), ('B', 'C'), ('C', 'D'), ('D', 'E'), ('E', 'F'), ('F', 'G')]
x = 'ABCDEFG'
[''.join(w) for w in pairwise(x)] = ['AB', 'BC', 'CD', 'DE', 'EF', 'FG']

 pairwise equivalent

The docs also give a roughly equivalent function that I reproduce below:

def pairwise_eq(iterable):
    # pairwise('ABCDEFG') --> AB BC CD DE EF FG
    a, b = tee(iterable)
    next(b, None)
    return zip(a, b)

x = range(7)
print(f"{x = }\n{list(pairwise_eq(x)) = }")
x = 'ABCDEFG'
print(f"{x = }\n{list(pairwise_eq(x)) = }")
print(f"{x = }\n{[''.join(w) for w in pairwise_eq(x)] = }")
print()

pairwise_eq output is the same:

x = range(0, 7)
list(pairwise_eq(x)) = [(0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (5, 6)]
x = 'ABCDEFG'
list(pairwise_eq(x)) = [('A', 'B'), ('B', 'C'), ('C', 'D'), ('D', 'E'), ('E', 'F'), ('F', 'G')]
x = 'ABCDEFG'
[''.join(w) for w in pairwise_eq(x)] = ['AB', 'BC', 'CD', 'DE', 'EF', 'FG']

tri-wise

Looking at the code for pairwise_eq it can be easily expanded to generate a similar moving window of three items by:

  1. tee'ing into three streams.
  2. dropping two items from the third stream.

That produces the following code:

def tri_wise(iterable):
    a, b, c = tee(iterable, 3)
    next(b, None)
    for i in range(2):
        next(c, None)
    return zip(a, b, c)

x = range(7)
print(f"{x = }\n{list(tri_wise(x)) = }")
x = 'ABCDEFG'
print(f"{x = }\n{list(tri_wise(x)) = }")
print(f"{x = }\n{[''.join(w) for w in tri_wise(x)] = }")
print()

It produces the following output:

x = range(0, 7)
list(tri_wise(x)) = [(0, 1, 2), (1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6)]
x = 'ABCDEFG'
list(tri_wise(x)) = [('A', 'B', 'C'), ('B', 'C', 'D'), ('C', 'D', 'E'), ('D', 'E', 'F'), ('E', 'F', 'G')]
x = 'ABCDEFG'
[''.join(w) for w in tri_wise(x)] = ['ABC', 'BCD', 'CDE', 'DEF', 'EFG']

n-wise

 To generalise the code to generate moving windows of output of a given width I need to:

  1. Generate several tee'd streams.
    Easily done with tee(stream, n).
  2. drop items from the front of each stream to skew each stream progressively.
    The list comprehensions output expression has a nested list comprehension to drop items from the stream and the stream itself as a tuple, from which is selects, (via the index [1]), the resultant skewed stream
  3. zip the resultant streams together to form the output.

Code:

def nwise(iterable, width):
    tees = tee(iterable, width)
    skewed = [([next(t) for _ in range(position)], t)[1]
              for position, t in enumerate(tees)]
    return zip(*skewed)

x = range(7)
for wdth in range(11):
    try:
        print(f"{x = }; {wdth = }\n{list(nwise(x, wdth)) = }")
    except StopIteration:
        print(f"{x = }; {wdth = }\nlist(nwise(x, wdth)) Raised StopIteration!")
print()

The output nwise(iterator, 2) matches pairwise(iterator).

Function nwise has no range checks as it works with general iterators whose length may not be known. it raises errors when the width is one more than the number of items in the iterator.

x = range(0, 7); wdth = 0
list(nwise(x, wdth)) = []
x = range(0, 7); wdth = 1
list(nwise(x, wdth)) = [(0,), (1,), (2,), (3,), (4,), (5,), (6,)]
x = range(0, 7); wdth = 2
list(nwise(x, wdth)) = [(0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (5, 6)]
x = range(0, 7); wdth = 3
list(nwise(x, wdth)) = [(0, 1, 2), (1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6)]
x = range(0, 7); wdth = 4
list(nwise(x, wdth)) = [(0, 1, 2, 3), (1, 2, 3, 4), (2, 3, 4, 5), (3, 4, 5, 6)]
x = range(0, 7); wdth = 5
list(nwise(x, wdth)) = [(0, 1, 2, 3, 4), (1, 2, 3, 4, 5), (2, 3, 4, 5, 6)]
x = range(0, 7); wdth = 6
list(nwise(x, wdth)) = [(0, 1, 2, 3, 4, 5), (1, 2, 3, 4, 5, 6)]
x = range(0, 7); wdth = 7
list(nwise(x, wdth)) = [(0, 1, 2, 3, 4, 5, 6)]
x = range(0, 7); wdth = 8
list(nwise(x, wdth)) = []
x = range(0, 7); wdth = 9
list(nwise(x, wdth)) Raised StopIteration!
x = range(0, 7); wdth = 10
list(nwise(x, wdth)) Raised StopIteration!

Single expression

The intermediate variables, tees, and skewed of function nwise can be removed to create a function of just one return expression:

def nwise_squashed(iterable, width):
    return zip(*(([next(t) for _ in range(position)], t)[1]
                 for position, t in enumerate(tee(iterable, width))))

x = range(7)
for wdth in range(11):
    try:
        print(f"{x = }; {wdth = }\n{list(nwise_squashed(x, wdth)) = }")
    except Exception as e:
        print(f"{x = }; {wdth = }\nlist(nwise_squashed(x, wdth)): {e}!")
print()

 It works, but I don't like it - function nwise_squashed is less maintainable for me than function nwise.


End Bit

I managed to generalise itertools.pairwise. A tricky bit was running next() appropriate times on tee'd streams in a comprehension; I'll remember the trick of using expressions in a tuple to change an iterator before returning the resultant via indexing.

END.