I am doing some accelerated testing of a microcontroller running some diagnostic software that is put in a harsh environment and communicates its state and any firmware determinable errors it detects back to a controlling PC via a USB port.
Ultimately I decode from the data stream a series of appropriately named records interspersed with a few corrupted bytes from the remains of unrecoverable records.
When trying to assess what the microcontroller was actually doing (as opposed to what it should be doing in normal operation), I found myself at one stage with a list of received records that simplify to something like this:
In [2]:
with open('example_records.log') as f:
    record_lines = f.read()
print(record_lines)
Meaning from chaos
When discussing with colleagues what was wrong with the above record sequence and what could be the cause, we spent a lot of time looking for repeated patterns in the series of records by eye.
I searched for, but could not find, an existing tool to find repeated blocks of whole lines in a file. Stack Overflow gave me no more than my own suggestions, so I decided to develop my own utility.
Fun with Regexps
My idea was to investigate if some regexp could extract repeated blocks of lines. If it's regexp exploration, then it is out with Kodos.
Cut'n'paste the example records into the Search string box, then I can develop the regexp in the top window.
In [2]:
from IPython.core.display import Image
Image(filename='files/block_rep_in_kodos.png')
Out[2]:
The essence of this first regexp of:
(?ms)(?P<repeat>(?P<lines>^.*?$)(?:\n(?P=lines))+)
is read in sections from the inside out as:
^.*?$                                           # Match a series of whole lines, non-greedily due to the ?
(?P<lines>^.*?$)                                # Give the matched lines the name "lines"
(?:\n(?P=lines))+                               # Match repetitions of '\n' followed by the group named "lines"
(?P<repeat>(?P<lines>^.*?$)(?:\n(?P=lines))+)   # Call the repeated block of lines "repeat"
The kodos Group window shows the first match where the two "lines" of "RESET13" are repeated three times to give the "repeat" group.
Applying the above regexp across the whole text with re.finditer gave me the heart of my script.
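To make the idea concrete, here is a minimal sketch of the regexp at work outside Kodos. The sample records are made up for illustration and are not from my real log; the repeat count is derived by comparing the line count of the "repeat" group with that of the "lines" group.

```python
import re

# The regexp from above, compiled once.
REPEATS = re.compile(r"(?ms)(?P<repeat>(?P<lines>^.*?$)(?:\n(?P=lines))+)")

# Hypothetical sample records (not taken from the real log).
sample = "\n".join([
    "DATUM",
    "RESET1", "ERROR2",
    "RESET1", "ERROR2",
    "RESET1", "ERROR2",
    "STAGED",
])

found = []
for match in REPEATS.finditer(sample):
    block = match.group("lines").split("\n")              # the repeated unit
    total_lines = match.group("repeat").count("\n") + 1   # unit times repeats
    found.append((total_lines // len(block), block))

print(found)  # → [(3, ['RESET1', 'ERROR2'])]
```

The unrepeated DATUM and STAGED lines are simply not matched; the utility has to pick those up separately from the gaps between matches.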
The block_lines utility
The final utility was in three parts:
- block_gen, to create test data of records with optional debug information showing the repeats in the generated blocks
- block_rep, to extract and show the repeated blocks.
- block_exp, to expand output from block_rep so you can round-trip the data.
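The round-trip idea can be sketched in memory before worrying about any text format. This is a simplified illustration, not the utility itself: `compress` and `expand` are hypothetical names, blocks are kept as (count, lines) tuples rather than annotated text, and it assumes the input has no trailing newline and that no record name is a prefix of the record following a repeat.

```python
import re

REPEATS = re.compile(r"(?ms)(?P<repeat>(?P<lines>^.*?$)(?:\n(?P=lines))+)")


def compress(text):
    """Return [(count, [lines...]), ...] for the lines of text, in order."""
    blocks, pos = [], 0
    for m in REPEATS.finditer(text):
        if m.start() > pos:  # unrepeated lines before this match
            blocks.append((1, text[pos:m.start() - 1].split("\n")))
        unit = m.group("lines").split("\n")
        total = m.group("repeat").count("\n") + 1
        blocks.append((total // len(unit), unit))
        pos = m.end() + 1    # skip the newline that follows the match
    if pos < len(text):      # unrepeated lines after the last match
        blocks.append((1, text[pos:].split("\n")))
    return blocks


def expand(blocks):
    """Inverse of compress: write each block out count times."""
    return "\n".join(line for count, unit in blocks for line in unit * count)


# Hypothetical records for illustration.
text = "\n".join(["DATUM", "RESET1", "ERROR2", "RESET1", "ERROR2",
                  "STAGED", "STAGED", "STAGED", "CHANGE"])
blocks = compress(text)
print(blocks)
print(expand(blocks) == text)
```

Keeping the compressed form as plain tuples makes the round-trip property easy to check; the real utility instead writes and re-reads an annotated text format.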
block_gen output
In this sample generated in debug mode, everything matching ' # .*' at the end of any line is a debug comment showing which lines are repeated (those between curly braces), and how many times.
For example in this:
In [3]:
RESET1 # 3 {}
RESET1
RESET1
RESET3 # 1 {
ERROR2
ERROR1 # }
STAGED # 1 {
ERROR1
CHANGE
ERROR3
RESET2
RESET1
CHANGE # }
ERROR3 # 2 {
RESET2
RESET2
STAGED
STAGED
ERROR2
RESET2
RESET1 # }
ERROR3
RESET2
RESET2
STAGED
STAGED
ERROR2
RESET2
RESET1
DATUM # 1 {
RESET3 # }
ERROR2 # 2 {
ERROR1 # }
ERROR2
ERROR1
DATUM # 1 {
DATUM
ERROR2 # }
ERROR2 # 1 {
DATUM
RESET1 # }
...
The comment of ' # 3 {}' means that the first line 'RESET1' is repeated three times.
The fourth line starts a block that occurs only 1 time and ends at line 6 because of the closing curly brace.
block_rep output
Although block_gen can show what repetitions were used when generating record sequences, block_rep is likely to find its own set of repeated blocks, as the generator might generate a record repeated twice and then generate the same record repeated three more times. block_rep may treat that as one block repeated five times.
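A small sketch of that merging effect, using made-up records: two separately "generated" runs of the same record are indistinguishable to the regexp, which reports a single run.

```python
import re

REPEATS = re.compile(r"(?ms)(?P<repeat>(?P<lines>^.*?$)(?:\n(?P=lines))+)")

# Hypothetical generator output: the same record as a x2 block, then a x3 block.
generated = ["ERROR1"] * 2 + ["ERROR1"] * 3
text = "\n".join(["DATUM"] + generated + ["STAGED"])

match = REPEATS.search(text)
block = match.group("lines")
merged_count = (match.group("repeat").count("\n") + 1) // (block.count("\n") + 1)

print(block, merged_count)  # → ERROR1 5
```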
block_rep takes a file of lines and, in debug mode, strips off the ' # ...' debug comments before looking for repeated blocks.
In [2]:
    3 {}  RESET1
    1 {   RESET3
          ERROR2
          ERROR1
          STAGED
          ERROR1
          CHANGE
          ERROR3
          RESET2
          RESET1
       }  CHANGE
    2 {   ERROR3
          RESET2
          RESET2
          STAGED
          STAGED
          ERROR2
          RESET2
       }  RESET1
    1 {   DATUM
       }  RESET3
    2 {   ERROR2
       }  ERROR1
    2 {}  DATUM
    2 {}  ERROR2
Note that in this form the repetition information is on the left, and although the repeat count of each block is present, only the block that is repeated is shown, not the expanded repetitions.
Docopt
I used the docopt Python library for the argument handling of the final utility:
PS C:\Users\Paddy\Google Drive\Code> python .\block_lines -h
Extract repeated blocks of lines information from a file

Usage:
  block_lines (rep|exp) [options] [INPUT-FILE [OUTPUT-FILE]]
  block_lines gen [options] [OUTPUT-FILE]
  block_lines (-h|--help|--version)

Modes:
  rep               Find the repeated blocks of lines.
  exp               Expand the output from rep.
  gen               (Random gen of output suitable for example rep mode processing).

Options:
  -h --help         Show this screen.
  -v --version      Show version.
  -w N, --width N   Digits in field for repetition count [default: 4].
  -d --debug        Debug:
                      * In rep mode remove anything after '#' on all input lines.
                      * In gen mode annotate with repeat counts after '#'.

I/O Defaults:
  INPUT-FILE        Defaults to stdin.
  OUTPUT-FILE       Defaults to stdout.
Comments
- Note that I did not have to think too hard about an algorithm for extracting repeated blocks - I make the regexp library take the strain.
- It is fast enough for my purposes - I do slurp the whole file into memory at once for example, but that is fine for my real world data so no (over) optimization is needed.
- The utility proved invaluable in being able to discuss and come to a consensus with colleagues about some experimental data we had. We could condense tens of thousands of lines, without bias, and reason about the result.
- Real-world data might need pre-conditioning before the utility can be usefully applied. For example, fixed fields of varying value such as indices, timestamps, and maybe usernames might need to be deleted or replaced with a fixed dummy value.
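As an illustration of the pre-conditioning mentioned in the last point, here is a minimal sketch. The field formats (HH:MM:SS timestamps, `idx=` and `user=` fields) and the `precondition` helper are assumptions invented for the example; real logs would need their own patterns.

```python
import re


def precondition(line):
    """Replace varying fields with fixed dummy values (hypothetical formats)."""
    # Assumed timestamp format: HH:MM:SS with optional fractional seconds.
    line = re.sub(r"\d{2}:\d{2}:\d{2}(?:\.\d+)?", "<TIME>", line)
    # Assumed index field of the form idx=123.
    line = re.sub(r"\bidx=\d+", "idx=<N>", line)
    # Assumed username field of the form user=name.
    line = re.sub(r"\buser=\w+", "user=<USER>", line)
    return line


# Hypothetical raw records that differ only in their varying fields.
raw = [
    "12:00:01.250 idx=41 user=paddy RESET1",
    "12:00:02.500 idx=42 user=paddy RESET1",
]
cleaned = [precondition(line) for line in raw]
print(cleaned)
```

After this step the two records become identical lines, so they can be found as a repeat.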
Show me the code!
License: It's mine, mine mine do you hear me!
In [3]:
#!/bin/env python
"""
Extract repeated blocks of lines information from a file

Usage:
  block_lines (rep|exp) [options] [INPUT-FILE [OUTPUT-FILE]]
  block_lines gen [options] [OUTPUT-FILE]
  block_lines (-h|--help|--version)

Modes:
  rep               Find the repeated blocks of lines.
  exp               Expand the output from rep.
  gen               (Random gen of output suitable for example rep mode processing).

Options:
  -h --help         Show this screen.
  -v --version      Show version.
  -w N, --width N   Digits in field for repetition count [default: 4].
  -d --debug        Debug:
                      * In rep mode remove anything after '#' on all input lines.
                      * In gen mode annotate with repeat counts after '#'.

I/O Defaults:
  INPUT-FILE        Defaults to stdin.
  OUTPUT-FILE       Defaults to stdout.
"""
from __future__ import print_function

import re
import sys
import random

from docopt import docopt

__all__ = 'block_rep block_exp block_gen'.split()
__version__ = '0.0.1'


# noinspection PyShadowingNames,PyUnusedLocal
def _compressed(output, blines, brep, width):
    """output += annotated, repeated block of lines"""
    # Column spacing here must agree with the \s counts in block_exp's regexp.
    if len(blines) == 1:
        output += [' %*i {}  %s' % (width, brep, blines[0])]
    else:
        output += [' %*i {   %s' % (width, brep, blines[0])]
        output += [' %*s     %s' % (width, '', singleline)
                   for singleline in blines[1:-1]]
        output += [' %*s  }  %s' % (width, '', blines[-1])]


# noinspection PyShadowingNames
def block_rep(text, width=3):
    """return repeated blocks of lines in text with their repeat count"""
    output = []
    lastend = 0
    for match in re.finditer(r"""(?ms)(?P<repeat>(?P<lines>^.*?$)(?:\n(?P=lines))+)""", text):
        beginpos, endpos = match.span()
        if lastend < beginpos:  # Skipped a non-repeated, 'single' block
            _compressed(output,
                        blines=text[lastend:beginpos - 1].split('\n'),
                        brep=1,
                        width=width)
        # Line counts of the two named groups: the block, then all its repeats
        bsize, repeatedlines = sorted(x.count('\n') + 1 for x in match.groupdict().values())
        _compressed(output,
                    blines=match.groupdict()['lines'].split('\n'),
                    brep=repeatedlines // bsize,
                    width=width)
        lastend = endpos + 1
    if lastend < len(text) - 1:  # Add the non-repeated, 'single' block at end
        _compressed(output,
                    blines=text[lastend:len(text)].split('\n'),
                    brep=1,
                    width=width)
    return '\n'.join(output)


# noinspection PyShadowingNames
def block_exp(text):
    """Expand lines"""
    # The column that lines start at after the block repetition info to the left.
    tagcolumn = len(re.search(r"""(?m)^\s+}  """, text).group())
    output = []
    for match in re.finditer(r"""(?msx)(?P<block>
          (?P<multiline>
            ^ \s* (?P<rep> \d+) \s+ [{] \s\s\s (?P<first> .*?$)
            (?P<middle> .*?$) \n
            (?: (?P<repcolumn>^\s+ } \s\s )(?P<last> [^\n]*))
          )
          |
          (?P<singleline>
            ^ \s* (?P<srep> \d+) \s+ [{][}] \s\s (?P<only> .*?$)
          )
        )""", text):
        if match.group('multiline'):
            (rep, first, middle, repcolumn,
             last) = [match.group(name)
                      for name in 'rep, first, middle, repcolumn, last'.split(', ')]
            rep = int(rep)
            blocklines = [first]
            # The column that lines start at after the block repetition info to the left.
            tagcolumn = len(repcolumn)
            if middle and middle[0] == '\n':
                middle = middle[1:]
            blocklines += [line[tagcolumn:] for line in middle.split('\n') if middle]
            blocklines += [last]
            output += blocklines * rep
        elif match.group('singleline'):
            srep, only = [match.group(name) for name in 'srep, only'.split(', ')]
            srep = int(srep)
            output += [only] * srep
    return '\n'.join(output)


def block_gen(tags='DATUM ERROR1 ERROR2 ERROR3 CHANGE RESET1 RESET2 RESET3 STAGED'.split(),
              minblocks=100, debug=False):
    """Generate repeated blocks of lines to be later found by block_rep"""
    series = []
    while len(series) < minblocks:
        blocksize = min(len(tags), random.choice([1, 1, 2, 2, 2, 2, 3, 3, 3, 4, 4, 5, 6, 7, 8]))
        blockrep = random.choice([1, 1, 1, 2, 2, 3])
        blocklines = [random.choice(tags) for _ in range(blocksize)]
        expanded = blocklines * blockrep
        if debug:  # Add annotations
            expanded[0] += '\t# %i {' % blockrep  # Annotate with repeat count and block start
            if blocksize == 1:
                expanded[0] += '}'  # Annotate with end of 1 line block
            else:
                expanded[blocksize - 1] += '\t# }'  # Annotate with end of repeated block
        series += expanded
    return '\n'.join(series)


if __name__ == '__main__':
    arguments = docopt(__doc__, version=__version__)
    arguments["--width"] = int(arguments["--width"])
    if arguments["rep"] or arguments["exp"]:
        with (sys.stdin
              if arguments["INPUT-FILE"] is None
              else open(arguments["INPUT-FILE"], 'r')) as f:
            text = f.read()
    #
    if arguments["rep"] and arguments["--debug"]:
        # Remove mode gen-type comments
        # noinspection PyUnboundLocalVariable
        text = re.sub(r'\s+# .*', '', text)
    #
    if arguments["rep"]:
        # noinspection PyUnboundLocalVariable
        output = block_rep(text, width=arguments["--width"])
    elif arguments["exp"]:
        # noinspection PyUnboundLocalVariable
        output = block_exp(text)
    elif arguments["gen"]:
        # noinspection PyUnboundLocalVariable
        output = block_gen(debug=arguments["--debug"])
    if arguments["rep"] or arguments["exp"] or arguments["gen"]:
        with (sys.stdout
              if arguments["OUTPUT-FILE"] is None
              else open(arguments["OUTPUT-FILE"], 'w')) as f:
            f.write(output)