Merging large streams
Merging two or more huge sources of ordered data into one.
I was thinking about order in data and, more specifically, how easy it is to destroy order when processing input data, only to then need to apply the same ordering to the result. For example, it is so simple to just do something like:
merged_list = sorted(ordered_list1 + ordered_list2)
to merge ordered lists, especially when you know that Timsort will search for, and optimise the sorting of, "runs" of pre-ordered values.
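For instance, with two small ordered lists (illustrative values only), Timsort spots the two pre-sorted runs in the concatenation and merges them cheaply:

ordered_list1 = [1, 3, 5, 7]
ordered_list2 = [0, 2, 4, 6]

# sorted() on the concatenation lets Timsort exploit the existing runs.
merged_list = sorted(ordered_list1 + ordered_list2)
print(merged_list)   # [0, 1, 2, 3, 4, 5, 6, 7]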
I was looking for an algorithm for data too big to process all at once, or whose size is unknown, such as data feeds from pipes/ports, where merged values need to be generated as soon as possible, possibly to an output pipe/port.
Modelling
I model input streams as iterators generating increasing randomised ints. The merger is a generator function that consumes just enough of its input iterator streams to generate successive output values; its first output value can be generated after reading just the first value from each of its input iterators, for example.
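As a minimal sketch of such a modelled input stream (the generator name and parameters here are my own, purely illustrative), an iterator of increasing randomised ints could be written as:

import random

def random_ordered_stream(length, max_step=5):
    # Illustrative helper: yields `length` increasing ints with random gaps.
    value = random.randint(0, max_step)
    for _ in range(length):
        yield value
        value += random.randint(1, max_step)

# e.g. list(random_ordered_stream(5)) might give [3, 4, 8, 13, 14]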
Two input streams merged
from enum import Enum


def merge_2s(lhs, rhs):
    """
    Two ordered iterator streams of unknown length
    -> merged iterator generating data as soon as possible
    """

    State = Enum('MergeState', 'STOP, DATA, NEXT')
    STOP = State.STOP   # Iterator finished
    DATA = State.DATA   # Have iterator data for comparisons
    NEXT = State.NEXT   # Need next item from iterator

    state_l = state_r = NEXT
    while state_l != STOP or state_r != STOP:
        if state_l == NEXT:
            try:
                state_l = DATA
                datum_l = next(lhs)
            except StopIteration:
                state_l = STOP
        if state_r == NEXT:
            try:
                state_r = DATA
                datum_r = next(rhs)
            except StopIteration:
                state_r = STOP
        #
        if state_l == state_r == DATA:              # Merge?
            if datum_l < datum_r:
                yield datum_l
                state_l = NEXT
            else:
                yield datum_r
                state_r = NEXT
        elif state_r == STOP and state_l == DATA:   # Drain lhs?
            yield datum_l
            yield from lhs
            break
        elif state_l == STOP and state_r == DATA:   # Drain rhs?
            yield datum_r
            yield from rhs
            break
    return
Two separate state variables for the left and right input streams control how they are to be processed in the function's loop: Is the stream empty? Do we have a value from the stream? Do we need to get the next value from the stream?
Only one value is read from any one stream to yield each merged output value (after the initial read of one value from each input stream). The merge is independent of the size of the input streams.
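To illustrate that laziness, here is a small usage sketch (not one of the post's tests) that merges two unbounded ordered streams and takes just the first few merged values:

from itertools import count, islice

evens = count(0, 2)    # 0, 2, 4, 6, ...
threes = count(1, 3)   # 1, 4, 7, 10, ...

# Only enough of each infinite stream is consumed to produce ten values.
print(list(islice(merge_2s(evens, threes), 10)))
# [0, 1, 2, 4, 4, 6, 7, 8, 10, 10]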
Multiple input streams merged
def merge_s(nstreams):
    """
    N ordered iterator streams of unknown length
    -> merged iterator generating data as soon as possible
    """
    nstreams = nstreams.copy()                      # Preserve argument

    # Merge machine state enumeration
    State = Enum('MergeState', 'STOP, DATA, NEXT')
    STOP = State.STOP   # Iterator finished
    DATA = State.DATA   # Have iterator data for comparisons
    NEXT = State.NEXT   # Request next item from iterator

    data = [None for _ in nstreams]
    states = [NEXT for _ in nstreams]
    while any(st != STOP for st in states):
        finished = 0
        for i, (st, stream) in enumerate(zip(states, nstreams)):
            if st == NEXT:                          # Next from stream?
                try:
                    states[i], data[i] = DATA, next(stream)
                except StopIteration:
                    states[i], data[i] = STOP, None
                    finished += 1
        if finished:                                # Purge STOPped streams
            nstreams = [x for x, st in zip(nstreams, states) if st != STOP]
            data = [x for x, st in zip(data, states) if st != STOP]
            states = [st for st in states if st != STOP]
        #
        if len(nstreams) == 1:                      # Drain final stream?
            yield data[0]
            yield from nstreams[0]
            data[0], states[0] = None, STOP
        elif states:                                # Merge the minimal
            datum, i = min((datum, i)
                           for i, (st, datum) in enumerate(zip(states, data))
                           if st == DATA)
            yield datum
            states[i] = NEXT
    return
This merges any number of input streams in a similar fashion. A list of input streams is given as an argument, and lists of state-tracking values are used instead of individual variables. Streams are removed from the lists as they are emptied, and, as before, if we get down to only one active input stream then we just yield from it.
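A quick usage sketch (again, not from the original tests): merge_s accepts a list containing any number of iterators, including unbounded ones:

from itertools import count, islice

streams = [count(0, 3), count(1, 3), count(2, 3)]   # 0,3,6,...  1,4,7,...  2,5,8,...
print(list(islice(merge_s(streams), 9)))
# [0, 1, 2, 3, 4, 5, 6, 7, 8]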
TESTING
Given two ordered lists, we can generate a target answer, merged_lists, which is just the two lists concatenated and sorted. We can create input streams by calling iter on each list for the merge generator functions to work from, then check the output by turning it into a list and comparing it to merged_lists.
if __name__ == '__main__':
    listl, listr = [1, 3, 5, 7], [0, 2, 4, 6, 7, 11]
    merged_lists = sorted(listl + listr)            # Target answer

    print(f"Given left and right streams of {listl} and {listr} as iterators")
    lstream, rstream = iter(listl), iter(listr)
    answer_merge_lr = merge_2s(lstream, rstream)
    lstream, rstream = iter(listl), iter(listr)
    answer_merge_rl = merge_2s(rstream, lstream)
    lstream, rstream = iter(listl), iter(listr)
    answer_merge_slr = merge_s([lstream, rstream])
    lstream, rstream = iter(listl), iter(listr)
    answer_merge_srl = merge_s([rstream, lstream])

    assert list(answer_merge_lr) == merged_lists
    assert list(answer_merge_rl) == merged_lists
    assert list(answer_merge_slr) == merged_lists
    assert list(answer_merge_srl) == merged_lists
    print("\n Streaming merges of two streams correct\n")
Given left and right streams of [1, 3, 5, 7] and [0, 2, 4, 6, 7, 11] as iterators

 Streaming merges of two streams correct
Testing three streams
Three streams of different lengths are tested in all orderings of the streams.
if __name__ == '__main__':
    from itertools import permutations
    lists = [[0, 1, 3, 6, 9], [4, 7, 10, 11, 12, 16], [2, 5, 8, 10, 13, 14, 15, 20, 21]]
    merged_lists = sorted(sum(lists, []))           # Target answer

    print(f"Given {len(lists)} streams of iterators:", end='\n ')
    print('\n '.join(f"{lst}" for lst in lists))
    for list_perm in permutations(lists):
        streams = [iter(x) for x in list_perm]
        answer_merge_s = merge_s(streams)
        assert list(answer_merge_s) == merged_lists
    print(f"\n Streaming merges of {len(lists)} streams correct\n")
Given 3 streams of iterators:
 [0, 1, 3, 6, 9]
 [4, 7, 10, 11, 12, 16]
 [2, 5, 8, 10, 13, 14, 15, 20, 21]

 Streaming merges of 3 streams correct
Constrained Random testing
Generate N tests with a randomised number of input streams, randomised input stream lengths, and randomised ordered values in each input stream.
As before, the data is generated as lists, this time randomly; the lists are summed and sorted to form the target answer, merged_lists, then converted to streams and fed to merge_s.
if __name__ == '__main__':
    import random

    print("\n## \n## RANDOM SOAK TEST\n##")
    N = 10_000

    def weighted_choice(item_weights, bincount=1_000):
        '''\
        Weighted choice of item from (item, weight), ... pairs.

        Puts items in bins in proportion to their weights
        then uses random.choice() to select items.

        Larger bincount for more memory use but
        higher accuracy (on average).
        '''

        bins = []
        weights = sum(iw[1] for iw in item_weights)
        for item, wt in item_weights:
            bins += [item] * int(bincount * wt / weights)
        while True:
            yield random.choice(bins)


    stream_counter = weighted_choice([(0, 1), (1, 2)] +
                                     [(n, 5) for n in range(2, 10)])

    stream_length_counter = weighted_choice([(0, 1), (1, 2)] +
                                            [(n, 5) for n in range(2, 35)])

    for i in range(N):
        stream_count = next(stream_counter)
        stream_lengths = [next(stream_length_counter)
                          for _ in range(stream_count)]

        lists = [sorted(random.randint(-100, 100) for _ in range(slen))
                 for slen in stream_lengths]

        merged_lists = sorted(sum(lists, []))       # Target answer
        streams = [iter(x) for x in lists]
        answer_merge_s = merge_s(streams)

        assert list(answer_merge_s) == merged_lists

    print(f"\n Soak test of {N:_} random sets of random streams correct\n")
##
## RANDOM SOAK TEST
##

 Soak test of 10_000 random sets of random streams correct
The above passes now. During development it did fail on the equivalent of merge_s([iter([])]). I created the following set of extra 'directed' tests of this and similar inputs.
Directed tests
if __name__ == '__main__':
    print("\n\n##\n## DIRECTED TESTS:\n##\n")

    # Previously found failure
    try:
        next(merge_s([iter([])]))
    except StopIteration:
        pass
    # Around this failure...
    for x in range(10):
        try:
            next(merge_s([iter(list()) for _ in range(x)]))
        except StopIteration:
            pass

    print(" Directed tests pass.")
##
## DIRECTED TESTS:
##

 Directed tests pass.
Review
I created a generator function that merges multiple ordered input streams using little memory, yielding values while holding only one value from each of its input streams.
I ran simple "shakedown" tests, then built a framework of Constrained Random tests to disclose hidden bugs.
End.