Mainly Tech projects on Python and Electronic Design Automation.

Thursday, December 07, 2017

Elixir's chunk_every in Python


Another way to do it...

I had created a parallel-process Python program to solve a Stack Overflow user's problem and decided to share it on reddit.
Along came user aseigo with a solution written in the Elixir programming language.
My solution looked like this:
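import math
import os
from collections import Counter
from multiprocessing import Pool

# triples(), which counts the triples in a string, is defined elsewhere in the original program.
def calc_with_concurrency(s):
    cores = max(os.cpu_count() - 2, 1)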
    n = math.ceil(len(s) / max(1, cores - 1))
    sections = [s[i * n: (i + 1) * n + 3] for i in range(cores)]
    sections = [ss for ss in sections if len(ss) >= 3]
    with Pool(cores) as pool:
        counts = pool.map(triples, sections)
    double_counted = Counter([s[-3:] for s in sections[:-1]])
    tot = sum(counts, Counter()) - double_counted
    return tot
aseigo's solution was this:
input
|> Stream.chunk_every(3, 1, :discard)
|> Flow.from_enumerable()
|> Flow.partition()
|> Flow.reduce(fn -> %{} end,  fn trip, acc -> Map.update(acc, trip, 1, &(&1 + 1)) end)
|> Flow.reject(fn {_, 1} -> true
                       _ -> false end)

Comparison of solutions

In my Python solution I spent a lot of time working out how to:
  1. Split the string of characters, s, into parts for each sub-process.
  2. Have each sub-process chunk its fragment into threes and count the triples.
  3. Subtract the doubly-counted triples that appear at the split points as I use an overlap of three characters at each split.
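The three steps above can be checked serially, without any process pool. What follows is a minimal sketch rather than the original program: triples is an assumed stand-in that counts every run of three adjacent characters, and count_by_sections with its parts argument are hypothetical names.

```python
from collections import Counter


def triples(s):
    """Assumed stand-in: count every run of three adjacent characters in s."""
    return Counter(s[i:i + 3] for i in range(len(s) - 2))


def count_by_sections(s, parts):
    """Count triples section by section, then remove the double counts."""
    n = -(-len(s) // parts)  # ceiling division: characters per section
    # Each section runs three characters into the next one.
    sections = [s[i * n:(i + 1) * n + 3] for i in range(parts)]
    sections = [ss for ss in sections if len(ss) >= 3]
    counts = [triples(ss) for ss in sections]  # a Pool would map this step
    # The last triple of every section but the final one is also the
    # first triple of the section that follows it.
    double_counted = Counter(ss[-3:] for ss in sections[:-1])
    return sum(counts, Counter()) - double_counted


text = "abcabcabxabc"
# The sectioned count agrees with a single pass over the whole string.
assert all(count_by_sections(text, p) == triples(text) for p in range(1, 6))
```

Because the overlap is exactly three characters, each split point contributes exactly one triple that is seen twice, which is why a single subtraction per split is enough.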
The Elixir solution works differently:
  1. It calls chunk_every to stream the whole of its input as triples.
  2. It then splits the triples between its sub-processes.
  3. These sub-processes only have to count their received triples.
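In Python terms, the same three steps might look like the serial sketch below. The names sliding_triples and repeated_triples are hypothetical, nothing is distributed between processes here, and the final filter mirrors the Flow.reject call, which drops triples that were counted only once.

```python
from collections import Counter, deque


def sliding_triples(iterable):
    """Stream the input as overlapping triples, one step at a time."""
    window = deque(maxlen=3)  # keeps only the latest three items
    for item in iterable:
        window.append(item)
        if len(window) == 3:
            yield tuple(window)


def repeated_triples(iterable):
    """Count the streamed triples and keep those seen more than once."""
    counts = Counter(sliding_triples(iterable))
    return {triple: n for triple, n in counts.items() if n != 1}


print(repeated_triples("abcabc"))
```

Since the triples are formed before any splitting happens, no triple straddles a split point and there is nothing to subtract afterwards.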
I was thinking that my method of splitting the input with an overlap, then removing the double-counting on the overlap, might become a personal design pattern, and I had made a mental note of it. I did wonder about the Elixir chunk_every: was its definition missing? Was it a built-in? I decided to look it up.

Elixir's chunk_every

Elixir's chunk_every is indeed a language built-in function.
The docs state:
chunk_every(enumerable, count, step, leftover \\ [])
Returns list of lists containing count items each, where each new chunk starts step elements into the enumerable.
step is optional and, if not passed, defaults to count, i.e. chunks do not overlap.
If the last chunk does not have count elements to fill the chunk, elements are taken from leftover to fill in the chunk. If leftover does not have enough elements to fill the chunk, then a partial chunk is returned with less than count elements.
If :discard is given in leftover, the last chunk is discarded unless it has exactly count elements.

Examples

iex> Enum.chunk_every([1, 2, 3, 4, 5, 6], 2)
[[1, 2], [3, 4], [5, 6]]

iex> Enum.chunk_every([1, 2, 3, 4, 5, 6], 3, 2, :discard)
[[1, 2, 3], [3, 4, 5]]

iex> Enum.chunk_every([1, 2, 3, 4, 5, 6], 3, 2, [7])
[[1, 2, 3], [3, 4, 5], [5, 6, 7]]

iex> Enum.chunk_every([1, 2, 3, 4], 3, 3, [])
[[1, 2, 3], [4]]

iex> Enum.chunk_every([1, 2, 3, 4], 10)
[[1, 2, 3, 4]]
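
One way to read the five documented examples is as list slices taken at every multiple of step. The sketch below is a model of that reading only: chunk_every_list and the DISCARD sentinel are hypothetical names, it works on lists rather than streams, and stopping after the first short chunk is an assumption that has not been checked against the Elixir source.

```python
DISCARD = object()  # hypothetical stand-in for Elixir's :discard atom


def chunk_every_list(items, count, step=None, leftover=()):
    """List-slicing model that reproduces the documented examples."""
    step = count if step is None else step
    chunks = []
    for start in range(0, len(items), step):
        chunk = list(items[start:start + count])
        if len(chunk) == count:
            chunks.append(chunk)
            continue
        # First short chunk: fill or discard it, then stop (an assumption).
        if leftover is not DISCARD:
            chunks.append((chunk + list(leftover))[:count])
        break
    return chunks


print(chunk_every_list([1, 2, 3, 4, 5, 6], 3, 2, [7]))
```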

Does Python need its own?

I thought that a Python version of chunk_every might prove useful in other cases, and that I could add it to my toolbox.
I conducted a web search, couldn't find anything like it in Python, and so set about implementing the function.

The Python implementation

Function type

I want to stream input from any iterable and generate chunks of output as needed. This means I have to write a generator function with yield statements.
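
As a small illustration of why a generator suits this, here is a sketch with a hypothetical pairs generator (not the function developed in this post) fed from an endless iterator. It only terminates because each pair is yielded as soon as it is complete and nothing further is read until the next pair is requested.

```python
from itertools import count, islice


def pairs(iterable):
    """Yield consecutive, non-overlapping pairs as soon as each is complete."""
    pair = []
    for item in iterable:
        pair.append(item)
        if len(pair) == 2:
            yield tuple(pair)
            pair = []


# count() never ends, so this only works because pairs() is lazy.
first_three = list(islice(pairs(count()), 3))
print(first_three)
```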

Parameters

  • The Python itertools functions use the name iterable and not enumerable, so I'll switch.
  • The Elixir leftover parameter has two distinct types of values:
    1. A (possibly empty) container of fill values.
    2. Or the value :discard.
I split leftover and replace it with two distinct Python parameters: fillvalue and discard_partial.
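
The split can be written down as a small translation helper. This is only a sketch of the mapping: split_leftover and the DISCARD sentinel are hypothetical names standing in for an Elixir-style argument and the :discard atom, and the empty default follows the leftover \\ [] in the quoted signature.

```python
DISCARD = object()  # hypothetical stand-in for Elixir's :discard atom


def split_leftover(leftover=()):
    """Translate an Elixir-style leftover into (fillvalue, discard_partial)."""
    if leftover is DISCARD:
        return [], True  # nothing to fill with; drop a short end chunk
    return list(leftover), False  # fill what is possible; keep a short end chunk


print(split_leftover([7]))
```

Note that an omitted leftover maps to discard_partial=False here, because in Elixir a short final chunk is kept unless :discard is given.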

What I want the Python to do

I had many questions about the Elixir chunk_every function that were not answered by the docs. In the end I installed Elixir and tried a few things, asked around, and finally decided to do what came naturally in the coding too.
  • chunk_every should always start from the first element and produce chunks of count consecutive elements (assuming it's not complicated by end considerations).
  • When traversing the input iterable it should start a new chunk from every step'th item.
  • Chunks should be yielded when they reach length count.
  • At the end of the iterable, fillvalue's items (if any) should be appended to any remaining chunks; then, if they are of length count, they too should be yielded, in the order in which they were started.
  • If there remain chunks that are too small, then they should only be yielded if discard_partial == False; otherwise they are discarded.
  • Yielded chunks should be tuples of items that can become dict keys.
Elixir will only yield one partial chunk at the end of the iterable, even though if count > step + 1 there can be more than one incomplete chunk.
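
Which chunks are still incomplete when the input runs out follows directly from the rules above: a chunk starts at every multiple of step and needs count items. The sketch below assumes the input length is known up front, which a streaming implementation does not need, and pending_starts is a hypothetical name used only for this illustration.

```python
def pending_starts(length, count, step):
    """Start indexes of chunks still short of count items when input ends."""
    return [start for start in range(0, length, step) if start + count > length]


print(pending_starts(6, 3, 2))  # one short chunk, starting at index 4
print(pending_starts(4, 4, 1))  # three short chunks, starting at 1, 2 and 3
```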

chunk_every in Python

In [52]:
from itertools import chain

def chunk_every(iterable, count, step=None, fillvalue=None, discard_partial=True):
    # Check arguments
    if step is None:
        step = count
    if fillvalue is None:
        fillvalue = []
    else: 
        fillvalue = list(fillvalue)
    if type(count) != int or type(step) != int or count <1 or step < 1:
        raise NotImplementedError
    chunks = []  # list of possibly multiple chunk accumulators
    for n, x in enumerate(iterable):
        if n >= count and (n - count) % step == 0:
            yield tuple(chunks.pop(0))  # time to yield the first chunk
        if n % step == 0:
            chunks.append([])  # Start accumulating another chunk
        for c in chunks:
            c.append(x)  # Extend current chunks with the iterable's latest element
    # End processing
    for c in chunks:
        c += fillvalue
        if len(c) < count and discard_partial:
            continue # skip yielding end chunk
        # else:
        yield tuple(c[:count])  # yield end chunks (possibly) extended by the fillvalue.

Elixir examples

Let's try out the Python with the arguments from the original Elixir docs chunk_every examples. I turn the Python generator's output into a list to show it.
In [53]:
list(chunk_every([1, 2, 3, 4, 5, 6], 2))
Out[53]:
[(1, 2), (3, 4), (5, 6)]
In [54]:
list(chunk_every([1, 2, 3, 4, 5, 6], 3, 2))  # discard_partial=True is the default value
Out[54]:
[(1, 2, 3), (3, 4, 5)]
In [55]:
# In contrast to the above, this shows the short chunk accumulated at the end, but discarded above
list(chunk_every([1, 2, 3, 4, 5, 6], 3, 2, discard_partial=False))
Out[55]:
[(1, 2, 3), (3, 4, 5), (5, 6)]
In [56]:
list(chunk_every([1, 2, 3, 4, 5, 6], 3, 2, [7])) # That last previously short chunk from above is filled
Out[56]:
[(1, 2, 3), (3, 4, 5), (5, 6, 7)]
In [57]:
list(chunk_every([1, 2, 3, 4], 3, 3, [])) # Totally different answer to the Elixir doc!
Out[57]:
[(1, 2, 3)]
Now the example above comes about because of a subtle difference between the Python and Elixir APIs.
The Elixir result makes sense if their discard feature defaults to False unless :discard is explicitly stated as the value for their leftover parameter.
In Python that similar feature is controlled by a separate parameter, discard_partial which defaults to True.
(In previous examples, the default value for the feature would not change the output).
In [58]:
list(chunk_every([1, 2, 3, 4], 3, 3, [], discard_partial=False)) # Now same answer as the Elixir doc!
Out[58]:
[(1, 2, 3), (4,)]
In [59]:
list(chunk_every([1, 2, 3, 4], 10, discard_partial=False)) # Last Elixir example
Out[59]:
[(1, 2, 3, 4)]

Python examples

In [60]:
def ppchunk_every(iterable, count, step=None, fillvalue=None, discard_partial=True):
    'Prettyprinting wrapper for chunk_every generator'
    iterable = list(iterable)   # So it can be both printed and passed.
    fillvalue = None if fillvalue is None else list(fillvalue) # ditto.
    print(f'chunk_every({iterable}, count={count}, step={step}, fillvalue={fillvalue}, '
          f'discard_partial={discard_partial})\n  -> ', end='')
    print(list(chunk_every(iterable, count, step, fillvalue, discard_partial)))
In [61]:
ppchunk_every([1, 2, 3, 4, 5, 6], 2)
chunk_every([1, 2, 3, 4, 5, 6], count=2, step=None, fillvalue=None, discard_partial=True)
  -> [(1, 2), (3, 4), (5, 6)]

With increasing iterable, chunk into 3 on every third item

In [62]:
for size in range(2, 10):
    ppchunk_every(range(size), 3, 3, None, True)
chunk_every([0, 1], count=3, step=3, fillvalue=None, discard_partial=True)
  -> []
chunk_every([0, 1, 2], count=3, step=3, fillvalue=None, discard_partial=True)
  -> [(0, 1, 2)]
chunk_every([0, 1, 2, 3], count=3, step=3, fillvalue=None, discard_partial=True)
  -> [(0, 1, 2)]
chunk_every([0, 1, 2, 3, 4], count=3, step=3, fillvalue=None, discard_partial=True)
  -> [(0, 1, 2)]
chunk_every([0, 1, 2, 3, 4, 5], count=3, step=3, fillvalue=None, discard_partial=True)
  -> [(0, 1, 2), (3, 4, 5)]
chunk_every([0, 1, 2, 3, 4, 5, 6], count=3, step=3, fillvalue=None, discard_partial=True)
  -> [(0, 1, 2), (3, 4, 5)]
chunk_every([0, 1, 2, 3, 4, 5, 6, 7], count=3, step=3, fillvalue=None, discard_partial=True)
  -> [(0, 1, 2), (3, 4, 5)]
chunk_every([0, 1, 2, 3, 4, 5, 6, 7, 8], count=3, step=3, fillvalue=None, discard_partial=True)
  -> [(0, 1, 2), (3, 4, 5), (6, 7, 8)]

Overlap chunks by using step < count

In [63]:
for size in range(2, 10):
    ppchunk_every(range(size), 3, 1, None, True)
chunk_every([0, 1], count=3, step=1, fillvalue=None, discard_partial=True)
  -> []
chunk_every([0, 1, 2], count=3, step=1, fillvalue=None, discard_partial=True)
  -> [(0, 1, 2)]
chunk_every([0, 1, 2, 3], count=3, step=1, fillvalue=None, discard_partial=True)
  -> [(0, 1, 2), (1, 2, 3)]
chunk_every([0, 1, 2, 3, 4], count=3, step=1, fillvalue=None, discard_partial=True)
  -> [(0, 1, 2), (1, 2, 3), (2, 3, 4)]
chunk_every([0, 1, 2, 3, 4, 5], count=3, step=1, fillvalue=None, discard_partial=True)
  -> [(0, 1, 2), (1, 2, 3), (2, 3, 4), (3, 4, 5)]
chunk_every([0, 1, 2, 3, 4, 5, 6], count=3, step=1, fillvalue=None, discard_partial=True)
  -> [(0, 1, 2), (1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6)]
chunk_every([0, 1, 2, 3, 4, 5, 6, 7], count=3, step=1, fillvalue=None, discard_partial=True)
  -> [(0, 1, 2), (1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6), (5, 6, 7)]
chunk_every([0, 1, 2, 3, 4, 5, 6, 7, 8], count=3, step=1, fillvalue=None, discard_partial=True)
  -> [(0, 1, 2), (1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6), (5, 6, 7), (6, 7, 8)]

Leave gaps between chunks by using step > count

In [64]:
for size in range(3, 12):
    ppchunk_every(range(size), 3, 4, None, True)
chunk_every([0, 1, 2], count=3, step=4, fillvalue=None, discard_partial=True)
  -> [(0, 1, 2)]
chunk_every([0, 1, 2, 3], count=3, step=4, fillvalue=None, discard_partial=True)
  -> [(0, 1, 2)]
chunk_every([0, 1, 2, 3, 4], count=3, step=4, fillvalue=None, discard_partial=True)
  -> [(0, 1, 2)]
chunk_every([0, 1, 2, 3, 4, 5], count=3, step=4, fillvalue=None, discard_partial=True)
  -> [(0, 1, 2)]
chunk_every([0, 1, 2, 3, 4, 5, 6], count=3, step=4, fillvalue=None, discard_partial=True)
  -> [(0, 1, 2), (4, 5, 6)]
chunk_every([0, 1, 2, 3, 4, 5, 6, 7], count=3, step=4, fillvalue=None, discard_partial=True)
  -> [(0, 1, 2), (4, 5, 6)]
chunk_every([0, 1, 2, 3, 4, 5, 6, 7, 8], count=3, step=4, fillvalue=None, discard_partial=True)
  -> [(0, 1, 2), (4, 5, 6)]
chunk_every([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], count=3, step=4, fillvalue=None, discard_partial=True)
  -> [(0, 1, 2), (4, 5, 6)]
chunk_every([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10], count=3, step=4, fillvalue=None, discard_partial=True)
  -> [(0, 1, 2), (4, 5, 6), (8, 9, 10)]
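
For the default case only (discard_partial=True and no fill values), the outputs of the three sections above can be cross-checked with a short itertools sketch. The name windows is hypothetical; the approach offsets count copies of the input by 0, 1, 2, ... items, takes every step'th item from each, and lets zip stop at the first incomplete chunk. It is a checking aid under those assumptions, not a replacement for the generator.

```python
from itertools import islice, tee


def windows(iterable, count, step=1):
    """Lazily yield full chunks of count items, starting every step items."""
    copies = tee(iterable, count)
    # Copy i supplies the i-th member of every chunk.
    return zip(*(islice(copy, i, None, step) for i, copy in enumerate(copies)))


print(list(windows(range(8), 3, 3)))   # chunk on every third item
print(list(windows(range(5), 3, 1)))   # overlapping chunks, step < count
print(list(windows(range(11), 3, 4)))  # gaps between chunks, step > count
```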

(Partially) complete end chunks from fillvalues

In [65]:
for size in range(0, 5):
    ppchunk_every(range(size), 4, 1, [], True)
    ppchunk_every(range(size), 4, 1, [], False)
    ppchunk_every(range(size), 4, 1, 'F0 F1'.split(), True)
    ppchunk_every(range(size), 4, 1, 'F0 F1'.split(), False)
    print('===')
chunk_every([], count=4, step=1, fillvalue=[], discard_partial=True)
  -> []
chunk_every([], count=4, step=1, fillvalue=[], discard_partial=False)
  -> []
chunk_every([], count=4, step=1, fillvalue=['F0', 'F1'], discard_partial=True)
  -> []
chunk_every([], count=4, step=1, fillvalue=['F0', 'F1'], discard_partial=False)
  -> []
===
chunk_every([0], count=4, step=1, fillvalue=[], discard_partial=True)
  -> []
chunk_every([0], count=4, step=1, fillvalue=[], discard_partial=False)
  -> [(0,)]
chunk_every([0], count=4, step=1, fillvalue=['F0', 'F1'], discard_partial=True)
  -> []
chunk_every([0], count=4, step=1, fillvalue=['F0', 'F1'], discard_partial=False)
  -> [(0, 'F0', 'F1')]
===
chunk_every([0, 1], count=4, step=1, fillvalue=[], discard_partial=True)
  -> []
chunk_every([0, 1], count=4, step=1, fillvalue=[], discard_partial=False)
  -> [(0, 1), (1,)]
chunk_every([0, 1], count=4, step=1, fillvalue=['F0', 'F1'], discard_partial=True)
  -> [(0, 1, 'F0', 'F1')]
chunk_every([0, 1], count=4, step=1, fillvalue=['F0', 'F1'], discard_partial=False)
  -> [(0, 1, 'F0', 'F1'), (1, 'F0', 'F1')]
===
chunk_every([0, 1, 2], count=4, step=1, fillvalue=[], discard_partial=True)
  -> []
chunk_every([0, 1, 2], count=4, step=1, fillvalue=[], discard_partial=False)
  -> [(0, 1, 2), (1, 2), (2,)]
chunk_every([0, 1, 2], count=4, step=1, fillvalue=['F0', 'F1'], discard_partial=True)
  -> [(0, 1, 2, 'F0'), (1, 2, 'F0', 'F1')]
chunk_every([0, 1, 2], count=4, step=1, fillvalue=['F0', 'F1'], discard_partial=False)
  -> [(0, 1, 2, 'F0'), (1, 2, 'F0', 'F1'), (2, 'F0', 'F1')]
===
chunk_every([0, 1, 2, 3], count=4, step=1, fillvalue=[], discard_partial=True)
  -> [(0, 1, 2, 3)]
chunk_every([0, 1, 2, 3], count=4, step=1, fillvalue=[], discard_partial=False)
  -> [(0, 1, 2, 3), (1, 2, 3), (2, 3), (3,)]
chunk_every([0, 1, 2, 3], count=4, step=1, fillvalue=['F0', 'F1'], discard_partial=True)
  -> [(0, 1, 2, 3), (1, 2, 3, 'F0'), (2, 3, 'F0', 'F1')]
chunk_every([0, 1, 2, 3], count=4, step=1, fillvalue=['F0', 'F1'], discard_partial=False)
  -> [(0, 1, 2, 3), (1, 2, 3, 'F0'), (2, 3, 'F0', 'F1'), (3, 'F0', 'F1')]
===
In the last section above, at the end of the iterable, all chunks are filled from copies of the fillvalue; chunks of length count are yielded, then the discard_partial argument determines if any remaining short chunks are yielded.
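
That end-of-input step can be looked at in isolation. The sketch below uses a hypothetical helper, finish_chunks, which takes the chunks still open when the input ends and applies the fill and discard rules to them; it is a standalone illustration rather than part of the generator.

```python
def finish_chunks(pending, fillvalue, count, discard_partial=True):
    """Fill each open chunk, then keep full ones and, optionally, short ones."""
    finished = []
    for chunk in pending:
        filled = (list(chunk) + list(fillvalue))[:count]  # each chunk gets its own copy
        if len(filled) == count or not discard_partial:
            finished.append(tuple(filled))
    return finished


# Chunks left open after reading [0, 1, 2, 3] with count=4, step=1.
open_chunks = [[0, 1, 2, 3], [1, 2, 3], [2, 3], [3]]
print(finish_chunks(open_chunks, ['F0', 'F1'], 4, discard_partial=True))
print(finish_chunks(open_chunks, ['F0', 'F1'], 4, discard_partial=False))
```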

2 comments:

  1. For note, Elixir's `Stream.chunk_every/4` is not a built-in function, but rather is just a normal function, it is defined at:
    https://github.com/elixir-lang/elixir/blob/v1.5.2/lib/elixir/lib/stream.ex#L172

    Which delegates to:
    https://github.com/elixir-lang/elixir/blob/v1.5.2/lib/elixir/lib/stream/reducers.ex#L5

    Thus you can see the entire implementation there. :-)

    In comparison, your Python version requires the entire input to be loaded into memory all at once, which is (in general) faster if the input is small, but slower if the input is huge.

  2. No, the Python function is what is called a Python generator. It *lazily* consumes just enough of the input iterator to produce (or yield) the next chunk.

    Thanks for the data on the Elixir implementation repository :-)
