In [52]:
from itertools import chain
def chunk_every(iterable, count, step=None, fillvalue=None, discard_partial=True):
    """Lazily yield chunks of ``count`` items from ``iterable``.

    Python analogue of Elixir's ``Enum.chunk_every/4``. The input is
    consumed lazily, one element at a time, so generators and other
    one-shot iterators work.

    Args:
        iterable: Source of items.
        count: Length of each yielded chunk (positive int).
        step: Offset between the start of successive chunks. Defaults to
            ``count`` (non-overlapping chunks); ``step < count`` makes
            chunks overlap, ``step > count`` drops items between chunks.
        fillvalue: Optional iterable whose items pad a short final chunk.
        discard_partial: When True (default), any trailing chunk still
            shorter than ``count`` after padding is discarded.

    Yields:
        Tuples of ``count`` items (trailing tuples may be shorter when
        ``discard_partial`` is False).

    Raises:
        NotImplementedError: If ``count`` or ``step`` is not a positive
            int. (Kept for backward compatibility; note it is raised on
            first iteration, not at call time, because this is a
            generator function.)
    """
    if step is None:
        step = count  # Elixir's default: non-overlapping chunks.
    fillvalue = [] if fillvalue is None else list(fillvalue)
    # `type(...) is not int` deliberately rejects bool (a subclass of int).
    if type(count) is not int or type(step) is not int or count < 1 or step < 1:
        raise NotImplementedError
    chunks = []  # When step < count, several chunks accumulate at once.
    for n, x in enumerate(iterable):
        if n >= count and (n - count) % step == 0:
            yield tuple(chunks.pop(0))  # Oldest chunk is now full.
        if n % step == 0:
            chunks.append([])  # A new chunk starts at every step boundary.
        for chunk in chunks:
            chunk.append(x)  # Every in-flight chunk receives the element.
    # Input exhausted: pad the partial chunks, then emit or discard them.
    for chunk in chunks:
        chunk += fillvalue
        if len(chunk) < count and discard_partial:
            continue  # Still short after padding — drop it.
        yield tuple(chunk[:count])  # Truncate in case fillvalue over-padded.
In [53]:
list(chunk_every([1, 2, 3, 4, 5, 6], 2))
Out[53]:
In [54]:
list(chunk_every([1, 2, 3, 4, 5, 6], 3, 2)) # discard_partial=True is the default value
Out[54]:
In [55]:
# In contrast to the above, this shows the short chunk accumulated at the end, but discarded above
list(chunk_every([1, 2, 3, 4, 5, 6], 3, 2, discard_partial=False))
Out[55]:
In [56]:
list(chunk_every([1, 2, 3, 4, 5, 6], 3, 2, [7])) # That last previously short chunk from above is filled
Out[56]:
In [57]:
list(chunk_every([1, 2, 3, 4], 3, 3, [])) # Totally different answer to the Elixir doc!
Out[57]:
In [58]:
list(chunk_every([1, 2, 3, 4], 3, 3, [], discard_partial=False)) # Now same answer as the Elixir doc!
Out[58]:
In [59]:
list(chunk_every([1, 2, 3, 4], 10, discard_partial=False)) # Last Elixir example
Out[59]:
In [60]:
def ppchunk_every(iterable, count, step=None, fillvalue=None, discard_partial=True):
    """Pretty-printing wrapper for the chunk_every generator.

    Materializes the (possibly one-shot) arguments so they can be both
    printed in the header line and passed on, prints the call being made,
    then prints the fully evaluated result list.
    """
    iterable = list(iterable)  # So it can be both printed and passed.
    # `is None`, not `== None`: identity test per PEP 8, and safe against
    # objects with unusual __eq__ behavior.
    fillvalue = None if fillvalue is None else list(fillvalue)  # ditto.
    print(f'chunk_every({iterable}, count={count}, step={step}, fillvalue={fillvalue}, '
          f'discard_partial={discard_partial})\n -> ', end='')
    print(list(chunk_every(iterable, count, step, fillvalue, discard_partial)))
In [61]:
ppchunk_every([1, 2, 3, 4, 5, 6], 2)
In [62]:
# Demo: non-overlapping chunks (step == count) over growing input sizes.
for size in range(2, 10):
    ppchunk_every(range(size), 3, 3, None, True)
In [63]:
# Demo: overlapping chunks (step < count) over growing input sizes.
for size in range(2, 10):
    ppchunk_every(range(size), 3, 1, None, True)
In [64]:
# Demo: gapped chunks (step > count) — items between chunks are dropped.
for size in range(3, 12):
    ppchunk_every(range(size), 3, 4, None, True)
In [65]:
# Demo: interaction of fillvalue and discard_partial on small inputs.
for size in range(0, 5):
    ppchunk_every(range(size), 4, 1, [], True)
    ppchunk_every(range(size), 4, 1, [], False)
    ppchunk_every(range(size), 4, 1, 'F0 F1'.split(), True)
    ppchunk_every(range(size), 4, 1, 'F0 F1'.split(), False)
    print('===')
Note that Elixir's `Stream.chunk_every/4` is not a built-in function but an ordinary library function; it is defined at:
ReplyDeletehttps://github.com/elixir-lang/elixir/blob/v1.5.2/lib/elixir/lib/stream.ex#L172
Which delegates to:
https://github.com/elixir-lang/elixir/blob/v1.5.2/lib/elixir/lib/stream/reducers.ex#L5
Thus you can see the entire implementation there. :-)
In comparison, your Python version requires the entire input to be loaded into memory all at once, which is (in general) faster if the input is small, but slower if the input is huge.
No, the Python function is what is called a Python generator. It *lazily* consumes just enough of the input iterator to produce (or yield) the next chunk.
ReplyDeleteThanks for the data on the Elixir implementation repository :-)