Sunday, December 03, 2017

Triple counter - in parallel!


The Problem

Someone on Stack Overflow mentioned a particular problem they were given in an interview:
Given a string of a million numbers (Pi for example), write a function/program that returns all repeating 3-digit numbers with a repetition count greater than 1
They went on to show a small example string, 123412345123456, where the answer is:
123 - 3 times
234 - 3 times
345 - 2 times

A dead end

I first thought of using re.findall, but the matches that the regular expression returns are all non-overlapping, so I started again.
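A quick sketch of the problem:

import re

s = '123412345123456'
# Only 5 of the 13 overlapping triples are returned, because each match
# consumes its characters; a lookahead such as r'(?=(\d{3}))' would avoid
# this, but the Counter approach below is simpler.
print(re.findall(r'\d{3}', s))   # ['123', '412', '345', '123', '456']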

Straight forward solution

You need to count each triple, from the substrings s[0:3], s[1:4], s[2:5], ...
Out comes collections.Counter for the counting and we get the following.
In [2]:
from collections import Counter

def triple_counter(s):
    # Count every overlapping three-character substring.
    c = Counter(s[n-3: n] for n in range(3, len(s) + 1))
    for tri, n in c.most_common():
        if n > 1:
            print('%s - %i times.' % (tri, n))
        else:
            break

if __name__ == '__main__':
    import random

    #s = ''.join(random.choice('0123456789') for _ in range(1_000_000))
    s = '123412345123456'
    triple_counter(s)
123 - 3 times.
234 - 3 times.
345 - 2 times.
If you swap the s = ... lines you can run it on much larger strings.
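As a rough, machine-dependent sketch of the cost on the full million digits (the printing loop is left out here, since with random data nearly every triple repeats):

import random
import time
from collections import Counter

s = ''.join(random.choice('0123456789') for _ in range(1_000_000))
t0 = time.perf_counter()
c = Counter(s[n-3: n] for n in range(3, len(s) + 1))
print('%i distinct triples counted in %.2f seconds' % (len(c), time.perf_counter() - t0))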

Parallelisation

All was well until a commenter mentioned that if speed were an issue he would move to a C-language solution.
I thought of replying that C isn't necessarily the next speedup step to take; there are many ways to go faster that use the code you have, or need only slight changes.
On second thoughts, it seemed that one speedup, in straight Python, would be parallelisation using the multiprocessing module.

Separate Counting from printing

The printing of the counts is a separate concern from calculating the triples, so I split it out.
In [6]:
from collections import Counter
import random
from multiprocessing import Pool
import os
import math


def triples(s):
    return Counter(s[n-3: n] for n in range(3, len(s) + 1))

def triple_print(c):
    for tri, n in c.most_common():
        if n > 1:
            print('  %s - %i times.' % (tri, n))
        else:
            break
In [7]:
triple_print(triples('123412345123456'))
  123 - 3 times.
  234 - 3 times.
  345 - 2 times.

Split the calculation

If I have a million-character string and three processes to run the calculation, then I want to split the string twice into three near-equal sections, perform the triple counting in the three parallel processes, then sum the results.
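The summing step works because Counters add element-wise; a minimal sketch:

from collections import Counter

c1 = Counter({'123': 2, '234': 1})
c2 = Counter({'123': 1, '345': 1})
# sum() with a Counter() start value adds the per-section counts together
print(sum([c1, c2], Counter()))   # Counter({'123': 3, '234': 1, '345': 1})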

Count across junctions

If I have the original string s = '1231234' and split it into the two sections s1 = '1231' and s2 = '234', counting triples in each, I will miss any triples that span the gap; the count for triple '312' will be missing, for example.
Trial and error led me to the solution of splitting into sections with a three-character overlap, then removing one count of the triple at the junction of two sections, as it will be counted twice.
In the previous example I might split s = '1231234' into s1 = '12312' and s2 = '31234' and decrement the count for triple '312' by one.
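A minimal sketch of that correction, using the triples function defined above (the split points here are chosen by hand):

s1, s2 = '12312', '31234'               # three-character overlap '312'
combined = triples(s1) + triples(s2)
combined -= Counter([s1[-3:]])          # the junction triple was counted twice
print(combined == triples('1231234'))   # True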

Getting Ready

The following function
  • Works serially.
  • Simulates splitting the work across a number of cores.
  • Uses map and not my usual list comprehensions, as I know that parallel libraries usually have a straightforward interface to map functions. I wrote this and debugged it before moving on to running in multiple processes.
In [8]:
def calc_in_sections(s):
    cores = random.randint(1, min(32, len(s) // 3))  # use this many splits to simulate parallelism
    # Sections run 0..n+3, n..2n+3, 2n..3n+3, ..., so neighbouring sections overlap by three characters
    n = math.ceil(len(s) / max(1, cores - 1))
    sections = [s[i * n: (i + 1) * n + 3] for i in range(cores)]
    sections = [ss for ss in sections if len(ss) >= 3]
    counts = map(triples, sections)
    # Each junction triple appears at the end of one section and the start of the next
    double_counted = Counter(sec[-3:] for sec in sections[:-1])
    tot = sum(counts, Counter()) - double_counted
    return tot, cores
In [10]:
tot, _ = calc_in_sections('123412345123456')
triple_print(tot)
  123 - 3 times.
  234 - 3 times.
  345 - 2 times.

The Parallelised version

Not much has changed:
  • I use all but two of the eight cores on my laptop.
  • I create a pool and call its pool.map method.
In [11]:
def calc_with_concurrency(s):
    cores = max(os.cpu_count() - 2, 1)
    n = math.ceil(len(s) / max(1, cores - 1))
    sections = [s[i * n: (i + 1) * n + 3] for i in range(cores)]
    sections = [ss for ss in sections if len(ss) >= 3]
    with Pool(cores) as pool:
        counts = pool.map(triples, sections)
    double_counted = Counter(sec[-3:] for sec in sections[:-1])
    tot = sum(counts, Counter()) - double_counted
    return tot
In [ ]:
# triple_print(calc_with_concurrency('123412345123456')) ## See note on execution environment

main

I exercised the code with the following. It uses all three functions for calculating the triple counts and compares their results for equality. See my previous blog post for information on comparing Counters.
In [ ]:
if __name__ == '__main__':
    s = '123412345123456'
    c = triples(s)
    assert c == Counter({'123': 3,
         '234': 3,
         '345': 2,
         '341': 1,
         '412': 1,
         '451': 1,
         '456': 1,
         '512': 1})
    print(f'\n{s} ->')
    triple_print(c)
    for _ in range(10):
        digits = random.randint(10000, 50000)
        s = ''.join(random.choice('0123456789') for _ in range(digits))
        print(f'Checking {digits} long string for triples calculated in three ways, including using concurrency.')
        # In one go
        c = triples(s)
        # In sections
        tot, cores = calc_in_sections(s)
        # In parallel
        ptot = calc_with_concurrency(s)
        assert +c == +tot
        assert +c == +ptot
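The unary + in those asserts keeps only the strictly positive counts of a Counter, so counters arrived at by different routes (direct counting versus summing and subtracting sections) compare cleanly:

c = Counter({'123': 2, '999': 0, '456': -1})
print(+c)   # Counter({'123': 2})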

Summary

The parallel code was much more difficult, but the difficulty was in the sectioning. Once that was done, and the algorithm mapped over and summed multiple sections, the multiprocessing library was trivial to slot in.

END

P.S. Note on execution environment.

The calc_with_concurrency function was developed and run as a standalone script; it will not run from IPython or Jupyter.
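The usual workaround is to keep the code in a plain .py file: on platforms that spawn fresh interpreters for the worker processes, those workers must be able to import the functions they run, and the entry point needs the __main__ guard. A skeleton (the file name triples_parallel.py is illustrative):

# triples_parallel.py -- run with:  python triples_parallel.py
from collections import Counter
from multiprocessing import Pool
import math
import os

# ... paste the triples, triple_print and calc_with_concurrency definitions from above here ...

if __name__ == '__main__':   # guard so worker processes can import this module safely
    triple_print(calc_with_concurrency('123412345123456'))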

2 comments:

  1. Your first solution is the same as MapReduce processing in Hadoop.

    Reply: ... Okay. Not sure when data size makes Hadoop and its harnessing of multiple machines necessary, but that is another parallelisation option, yes.
