Sunday, September 07, 2014

Find repeated blocks of lines in a text file: The block_lines tool.

I am doing some accelerated testing of a microcontroller running some diagnostic software that is put in a harsh environment and communicates its state and any firmware determinable errors it detects back to a controlling PC via a USB port.
Ultimately I decode from the data stream a series of appropriately named records interspersed with a few corrupted bytes from the remains of unrecoverable records.
When trying to assess what the microcontroller was doing, (as opposed to what it should be doing in normal operation), I found myself at one stage with a list of recieved records that simplify to something like this:
In [2]:
with open('example_records.log') as f:
    record_lines = f.read()
print(record_lines)
RESET1
RESET3
RESET1
RESET3
RESET1
RESET3
ERROR3
DATUM
ERROR3
DATUM
ERROR3
DATUM
CHANGE
RESET1
CHANGE
ERROR1
ERROR1
ERROR3
RESET1
ERROR3
RESET1
ERROR3
RESET1
ERROR1
STAGED
ERROR1
STAGED
DATUM
ERROR1
RESET1
DATUM
RESET1
ERROR2
DATUM
ERROR1
RESET1
DATUM
RESET1
ERROR2
DATUM
ERROR1
RESET1
DATUM
RESET1
ERROR2
ERROR2
RESET3
ERROR2
RESET3
RESET2
RESET1
RESET2
RESET1
ERROR2
RESET3
DATUM
CHANGE
RESET2
RESET1
RESET2
DATUM
CHANGE
DATUM
CHANGE
DATUM
CHANGE
RESET1
RESET1
ERROR1
ERROR3
ERROR2
RESET2
DATUM
RESET2
RESET3
STAGED
ERROR1
ERROR3
ERROR2
RESET2
DATUM
RESET2
RESET3
STAGED
ERROR1
ERROR3
ERROR2
RESET2
DATUM
RESET2
RESET3
STAGED
STAGED
CHANGE
ERROR3
ERROR2
RESET3
CHANGE
ERROR1
ERROR1


Meaning from chaos

When discussing with colleagues what was wrong with the above record sequence and what could be the cause, we spent a lot of time looking for repeated patterns in the series of records by eye.
I searched for, and could not find an existing tool to find repeated blocks of whole lines in a file. Stackoverflow gave me no more than my own suggestions so I decided to develop my own utility.

Fun with Regexps

My idea was to investigate if some regexp could extract repeated blocks of lines. If it's regexp exploration, then it is out with Kodos.
Cut'n'paste the example records into the Search string box, then I can develop the regexp in the top window.
In [2]:
from IPython.core.display import Image 
Image(filename='files/block_rep_in_kodos.png') 
Out[2]:
The essence of this first regexp of:
(?ms)(?P<repeat>(?P<lines>^.*?$)(?:\n(?P=lines))+)
is read in sections from the inside out as:
^.*?$             # Match a series of whole lines, non-greedily due to the ?
(?P<lines>^.*?$)  # Give the matched lines the name "lines"
(?:\n(?P=lines))+ # Match repetitions of '\n' followed by the group named "lines"
(?P<repeat>(?P<lines>^.*?$)(?:\n(?P=lines))+) # call the repeated block of lines "repeat"

The kodos Group window shows the first match where the two "lines" of "RESET13" are repeated three times to give the "repeat" group.
Use regexp.findall with the above regexp and I created the heart of my script.

The block_lines utility

The final utility was in three parts:
  1. block_gen, to create test data of records with optional debug information showing the repeats in the generated blocks
  2. block_rep, to extract and show the repeated blocks.
  3. block_exp, to expand output from block_rep so you can round-trip the data.

block_gen output

In this sample generated in debug mode, everything matching ' # .*' at the end of any line is debug comments showing what is repeated lines between curly braces), and by how many times.
For example in this:
In [3]:
RESET1 # 3 {}
RESET1
RESET1
RESET3 # 1 {
ERROR2
ERROR1 # }
STAGED # 1 {
ERROR1
CHANGE
ERROR3
RESET2
RESET1
CHANGE # }
ERROR3 # 2 {
RESET2
RESET2
STAGED
STAGED
ERROR2
RESET2
RESET1 # }
ERROR3
RESET2
RESET2
STAGED
STAGED
ERROR2
RESET2
RESET1
DATUM # 1 {
RESET3 # }
ERROR2 # 2 {
ERROR1 # }
ERROR2
ERROR1
DATUM # 1 {
DATUM
ERROR2 # }
ERROR2 # 1 {
DATUM
RESET1 # }
...

The comment of ' # 3 {}' means that the first line 'RESET1' is repeated three times.
The fourth line starts a block that occurs only 1 time and ends at line 6 because of the closing curly brace.

block_rep output

Although block_gen can show what repetitions were used when generating record sequences, block_rep is likely to find its own set of repeated blocks as the generator might generated a record repeated twice then generate the same record repeated three times more. block_rep may treat that as the one block repeated five times.
block_rep takes a file of lines and in debug mode strips off the ' #.
In [2]:
  3 {}  RESET1
  1 {   RESET3
        ERROR2
        ERROR1
        STAGED
        ERROR1
        CHANGE
        ERROR3
        RESET2
        RESET1
     }  CHANGE
  2 {   ERROR3
        RESET2
        RESET2
        STAGED
        STAGED
        ERROR2
        RESET2
     }  RESET1
  1 {   DATUM
     }  RESET3
  2 {   ERROR2
     }  ERROR1
  2 {}  DATUM
  2 {}  ERROR2
'''

Note that in this form the repetition information is on the left and although the repeat counts of each block is present, only the block that is repeated is shown - not the expanded repetitions.

Docopt

I used the docopt Python library for the argument handling of the final utility:
PS C:\Users\Paddy\Google Drive\Code> python .\block_lines -h

Extract repeated blocks of lines information from a file

Usage:
  block_lines (rep|exp) [options]  [INPUT-FILE [OUTPUT-FILE]]
  block_lines gen [options]  [OUTPUT-FILE]
  block_lines (-h|--help|--version)

Modes:
  rep               Find the repeated blocks of lines.
  exp               Expand the output from rep.
  gen               (Random gen of output suitable for example rep mode processing).

Options:
  -h --help         Show this screen.
  -v --version      Show version.
  -w N, --width N   Digits in field for repetition count [default: 4].
  -d --debug        Debug:
                      * In rep mode remove anything after '#' on all input lines.
                      * In gen mode annotate with repeat counts after '#'.

I/O Defaults:
  INPUT-FILE        Defaults to stdin.
  OUTPUT-FILE       Defaults to stdout.

Comments

  • Note that I did not have to think too hard about an algorithm for extracting repeated blocks - I make the regexp library take the strain.
  • It is fast enough for my purposes - I do slurp the whole file into memory at once for example, but that is fine for my real world data so no (over) optimization is needed.
  • The utility proved invaluable in being able to discuss and come to a consesnsus with colleagues about some experimental data we had. We could condense tens of thousands of lines, without bias, and reason about the result.
  • Real-world data might need pre-conditioning before being able to usufully apply the utility, for example: fixed fields of varying value such as indices, timestamps, and maybe usernames, might need to be deleted or replaced with a fixed dummy value.

Show me the code!

License: It's mine, mine mine do you hear me!
In [3]:
#!/bin/env python
"""
Extract repeated blocks of lines information from a file

Usage:
  block_lines (rep|exp) [options]  [INPUT-FILE [OUTPUT-FILE]]
  block_lines gen [options]  [OUTPUT-FILE]
  block_lines (-h|--help|--version)

Modes:
  rep               Find the repeated blocks of lines.
  exp               Expand the output from rep.
  gen               (Random gen of output suitable for example rep mode processing).

Options:
  -h --help         Show this screen.
  -v --version      Show version.
  -w N, --width N   Digits in field for repetition count [default: 4].
  -d --debug        Debug:
                      * In rep mode remove anything after '#' on all input lines.
                      * In gen mode annotate with repeat counts after '#'.

I/O Defaults:
  INPUT-FILE        Defaults to stdin.
  OUTPUT-FILE       Defaults to stdout.

"""
from __future__ import print_function

import re
import sys
import random

from docopt import docopt


__all__ = 'block_rep block_exp block_gen'.split()
__version__ = '0.0.1'


# noinspection PyShadowingNames,PyUnusedLocal
def _compressed(output, blines, brep, width):
    """output += annotated, repeated block of lines"""
    if len(blines) == 1:
        output += [' %*i {}  %s' % (width, brep, blines[0])]
    else:
        output += [' %*i {   %s' % (width, brep, blines[0])]
        output += [' %*s     %s' % (width, '', singleline)
                   for singleline in blines[1:-1]]
        output += [' %*s  }  %s' % (width, '', blines[-1])]


# noinspection PyShadowingNames
def block_rep(text, width=3):
    """return repeated blocks of lines in text with their repeat count"""
    output = []
    lastend = 0
    for match in re.finditer(r"""(?ms)(?P<repeat>(?P<lines>^.*?$)(?:\n(?P=lines))+)""", text):
        beginpos, endpos = match.span()
        if lastend < beginpos:  # Skipped a non-repeated, 'single' block
            _compressed(output,
                        blines=text[lastend:beginpos - 1].split('\n'),
                        brep=1,
                        width=width)
        bsize, repeatedlines = sorted(x.count('\n') + 1 for x in match.groupdict().values())
        _compressed(output,
                    blines=match.groupdict()['lines'].split('\n'),
                    brep=repeatedlines // bsize,
                    width=width)
        lastend = endpos + 1
    if lastend < len(text) - 1:  # Add the non-repeated, 'single' block at end
        _compressed(output,
                    blines=text[lastend:len(text)].split('\n'),
                    brep=1,
                    width=width)
    return '\n'.join(output)


# noinspection PyShadowingNames
def block_exp(text):
    """Expand lines"""
    # The column that lines start at after the block repetition info to the left.
    tagcolumn = len(re.search(r"""(?m)^\s+}  """, text).group())
    output = []
    for match in re.finditer(r"""(?msx)(?P<block>
                              (?P<multiline>
                                ^ \s* (?P<rep> \d+)  \s+ [{] \s\s\s (?P<first> .*?$)
                                (?P<middle> .*?$) \n
                                (?:  (?P<repcolumn>^\s+ } \s\s )(?P<last> [^\n]*))
                              )
                             |
                              (?P<singleline>
                                ^ \s* (?P<srep> \d+)  \s+ [{][}] \s\s (?P<only> .*?$)
                              )
                            )""", text):
        if match.group('multiline'):
            (rep, first, middle, repcolumn,
             last) = [match.group(name)
                      for name in 'rep, first, middle, repcolumn, last'.split(', ')]
            rep = int(rep)
            blocklines = [first]
            # The column that lines start at after the block repetition info to the left.
            tagcolumn = len(repcolumn)
            if middle and middle[0] == '\n':
                middle = middle[1:]
            blocklines += [line[tagcolumn:] for line in middle.split('\n') if middle]
            blocklines += [last]
            output += blocklines * rep
        elif match.group('singleline'):
            srep, only = [match.group(name) for name in 'srep, only'.split(', ')]
            srep = int(srep)
            output += [only] * srep
    return '\n'.join(output)


def block_gen(tags='DATUM ERROR1 ERROR2 ERROR3 CHANGE RESET1 RESET2 RESET3 STAGED'.split(),
              minblocks=100, debug=False):
    """Generate repeated blocks of lines to be later found by block_rep"""
    series = []
    while len(series) < minblocks:
        blocksize = min(len(tags), random.choice([1, 1, 2, 2, 2, 2, 3, 3, 3, 4, 4, 5, 6, 7, 8]))
        blockrep = random.choice([1, 1, 1, 2, 2, 3])
        blocklines = [random.choice(tags) for _ in range(blocksize)]
        expanded = blocklines * blockrep
        if debug:  # Add annotations
            expanded[0] += '\t# %i {' % blockrep  # Annotate with repeat count and block start
            if blocksize == 1:
                expanded[0] += '}'  # Annotate with end of 1 line block
            else:
                expanded[blocksize - 1] += '\t# }'  # Annotate with end of repeated block
        series += expanded
    return '\n'.join(series)


if __name__ == '__main__':
    arguments = docopt(__doc__, version=__version__)
    arguments["--width"] = int(arguments["--width"])
    if arguments["rep"] or arguments["exp"]:
        with (sys.stdin
              if arguments["INPUT-FILE"] is None
              else open(arguments["INPUT-FILE"], 'r')) as f:
            text = f.read()
    #
    if arguments["rep"] and arguments["--debug"]:
        # Remove mode gen-type  comments
        # noinspection PyUnboundLocalVariable
        text = re.sub(r'\s+# .*', '', text)
    #
    if arguments["rep"]:
        # noinspection PyUnboundLocalVariable
        output = block_rep(text, width=arguments["--width"])
    elif arguments["exp"]:
        # noinspection PyUnboundLocalVariable
        output = block_exp(text)
    elif arguments["gen"]:
        # noinspection PyUnboundLocalVariable
        output = block_gen(debug=arguments["--debug"])
    if arguments["rep"] or arguments["exp"] or arguments["gen"]:
        with (sys.stdout
              if arguments["OUTPUT-FILE"] is None
              else open(arguments["OUTPUT-FILE"], 'w')) as f:
            f.write(output)