Note: We no longer publish the latest version of our code here; we primarily use the kumc-bmi GitHub organization. The heron ETL repository, in particular, is not public. Peers in the informatics community should see MultiSiteDev for details on requesting access.

source: heron_load/log/etl_log_review.py @ 0:42ad7288920a

heron-michigan tip
Last change on this file since 0:42ad7288920a was 0:42ad7288920a, checked in by Matt Hoag <mhoag@…>, 6 years ago

Merge with demo_concepts_3800

File size: 5.5 KB
Line 
1'''summarize ETL logs
2
3cf xunit_log.py
4
5TODO: concentrate logging around run_script
6  - log variables
7TODO: log hg sum
8
9'''
10
11import csv
12import logging
13from collections import namedtuple
14
15import pkg_resources as pkg
16
# Module-level logger; handlers/level are configured by the caller
# (the __main__ section below uses logging.basicConfig).
log = logging.getLogger(__name__)
18
19
def main(argv, open_arg, stdout):
    '''Entry point: write a summary of the most recent ETL run.

    :param argv: command-line arguments; may include `--since TIMESTAMP`
                 followed by log file names
    :param open_arg: capability to open a file named on the command line
    :param stdout: output stream for the summary lines
    '''
    since, names = getopt(argv)
    streams = (open_arg(name) for name in names)
    report = summary(current_items(log_items(streams), since=since))
    for text in report:
        stdout.write(text + '\n')
27
28
def getopt(argv):
    '''Split command-line arguments into (since, logfilenames).

    Removes a `--since TIMESTAMP` option pair from argv in place when
    present; every remaining argument after the program name is taken
    to be a log file name.

    :param argv: mutable argument list (argv[0] is the program name)
    :return: (since or None, list of log file names)
    '''
    since = None
    if '--since' in argv:
        at = argv.index('--since')
        since = argv[at + 1]
        del argv[at:at + 2]
    return since, argv[1:]
38
39
def log_items(streams):
    r'''Parse CSV rows from each stream into one flat list of field lists.

    :param streams: iterable of file-like objects holding CSV log data
    :return: list of rows, each a list of string fields

    >>> l = log_items([_test_stream(), _test_stream()])
    >>> len(l)
    34

    >>> l[0][:4]
    ['time', 'usec', 'level', 'log_path_name']
    >>> l[1][:4]
    ['2013-08-26 16:46:00', '581', 'INFO', 'heron.pavement']
    '''
    rows = []
    for stream in streams:
        rows.extend(csv.reader(stream))
    return rows
54
55
# Columns of one structured log record; the trailing `detail` column
# absorbs any extra CSV fields beyond the fixed-width prefix.
EntrySchema = ('time,usec,level,log_path_name,message,dur'
               ',skip,parent,seqno,detail').split(',')
EntryWidth = len(EntrySchema) - 1
Entry = namedtuple('Entry', EntrySchema)


def make_entry(item):
    '''Wrap one CSV row as an Entry, folding extra fields into .detail.

    :param item: list of string fields, at least EntryWidth long
    :return: Entry whose detail attribute is the (possibly empty) list
             of fields past the fixed columns
    '''
    head = list(item[:EntryWidth])
    head.append(item[EntryWidth:])
    return Entry(*head)
65
66
def summary(items):
    '''Format one line per finished script/task plus one per unfinished item.

    :param items: list of CSV field-lists, as returned by log_items()
    :return: list of formatted summary lines (finished first)

    >>> l = current_items(log_items([_test_stream()]))
    >>> s = summary(l)
    >>> len(s)
    2

    TODO: test for completed script

    >>> s[0]
    '2013-08-26 17:05:47 [0/2]   curated_data_seer_recode_terms_csv'
    '''
    test_start, test_info, test_warn, test_failure = [
        index_items(items, level)
        for level in ['DEBUG', 'INFO', 'WARNING', 'ERROR']]

    # An item is finished when any completion-level record shares its
    # seqno; later levels override earlier ones. Merge via update()
    # rather than `.items() + .items()`, which is Python-2-only
    # (dict views in Python 3 do not support +).
    test_end = dict(test_info)
    test_end.update(test_warn)
    test_end.update(test_failure)

    scripts = [t for (seqno, t) in sorted(test_start.items())
               if t.detail[:1] in (['script'], ['task'])]

    unfinished = sorted(set(test_start.keys())
                        - set(test_end.keys()))

    script_summary = [fmt_script(s,
                                 test_end.get(int(s.seqno), None),
                                 test_failure.get(int(s.seqno), None))
                      for s in scripts if int(s.seqno) not in unfinished]

    unfinished_summary = [fmt_unfinished(t) for t in
                          [test_start[seqno] for seqno in unfinished]]
    return script_summary + unfinished_summary
100
101
def index_items(items, level):
    '''Index rows of a given log level by sequence number.

    :param items: list of CSV field-lists
    :param level: log level name to select, e.g. 'INFO'
    :return: dict mapping int(seqno) -> Entry (later rows win on ties)
    '''
    by_seqno = {}
    for row in items:
        if len(row) > 2 and row[2] == level:
            by_seqno[int(row[8])] = make_entry(row)
    return by_seqno
106
107
def fmt_script(start, end, failure):
    r'''Format one tab-separated line for a finished script: when, how
    long, nesting depth, name, and (if it failed) the failure detail.

    :param start: Entry for the DEBUG start record
    :param end: Entry for the completion record (supplies the duration)
    :param failure: Entry for an ERROR record, or None

    >>> rows = list(csv.reader(_test_stream()))
    >>> s = make_entry(rows[3])
    >>> e = s._replace(time='2013-08-13 13:48:00', usec='377',
    ...                level='INFO', dur='0:02:31.126001')
    >>> fmt_script(s, e, None)[11:]
    '16:46:00\tfor\t0:02:31.126001\t1\t  curated_data_zips_near_66160_csv'

    '''
    name = start.detail[1]
    depth = start.log_path_name.count('.')
    tail = '\tFAILED:' + failure.detail[3] if failure else ''
    return '%s\tfor\t%s\t%d\t%s%s%s' % (
        start.time, end.dur, depth, '  ' * depth, name, tail)
124
125
def fmt_unfinished(start):
    '''Format one line for an item that started but never logged completion.

    :param start: Entry for the DEBUG start record

    >>> rows = list(csv.reader(_test_stream()))
    >>> s = make_entry(rows[3])
    >>> fmt_unfinished(s)
    '2013-08-26 16:46:00 [0/3]   curated_data_zips_near_66160_csv'
    '''
    if start.detail[:1] == ['code']:
        what = 'line %s' % start.detail[3]
    else:
        # strip off 'heron.'
        what = '.'.join(start.log_path_name.split('.')[1:])
    depth = start.log_path_name.count('.')
    return '%s [%s/%s] %s%s' % (
        start.time, start.parent, start.seqno, '  ' * depth, what)
141
142
def current_items(items, since=None):
    '''Get items since the last logging_start.

    A row whose first field is 'time' is a logging_start header that
    opens a new segment. With no `since`, return the last segment;
    otherwise return the first segment whose first item is at or after
    `since`.

    :param items: list of CSV field-lists
    :param since: optional timestamp string prefix to compare against
    :return: the selected segment (header row excluded; a following
             segment's header row IS included, matching the doctest)
    :raises ValueError: if there is no logging_start header at all,
                        or none at or after `since`.

    >>> all = log_items([_test_stream()])

    >>> l = current_items(all)
    >>> l[0][:4]
    ['2013-08-26 17:05:47', '134', 'INFO', 'heron.kumc_etl.load_seer_terms']
    >>> len(l)
    7

    >>> l = current_items(all, since='2013-08-13 13')
    >>> len(l)
    9
    >>> l[0][0:2]
    ['2013-08-26 16:46:00', '581']
    >>> l[-2][0:2]
    ['2013-08-26 16:46:00', '730']
    '''
    # Index just past each logging_start header row.
    starts = [ix + 1 for (ix, item) in enumerate(items)
              if item[0] == 'time']

    if not starts:
        raise ValueError('no logging_start entries')

    # Guard ix < len(items): a header as the very last row has no
    # following item to show.
    log.info('logging_start entries at:\n%s',
             '\n'.join([items[ix][0] for ix in starts
                        if ix < len(items)]))

    if since:
        # first start after since
        matches = [seg for (seg, ix) in enumerate(starts)
                   if ix < len(items) and items[ix][0] >= since]
        if not matches:
            # was an uncaught IndexError on [0]
            raise ValueError('no logging_start since %s' % since)
        seg = matches[0]
    else:
        seg = len(starts) - 1

    # Slice up to the next segment's header if there is one; previously
    # starts[seg + 1] raised IndexError when `since` selected the last
    # segment.
    hi = starts[seg + 1] if seg + 1 < len(starts) else len(items)
    current = items[starts[seg]:hi]

    if current:  # an empty segment used to crash on current[0]
        log.info('%d current items from:\n%s\nto\n%s',
                 len(current), current[0], current[-1])

    return current
188
189
# Example log used by the doctests; packaged alongside this module as
# etl_log_ex.txt.
TEST_DATA = pkg.resource_string(__name__, 'etl_log_ex.txt')
191
192
def _test_stream():
    '''Return a fresh file-like stream over the packaged example log.'''
    from StringIO import StringIO as mk_stream  # Python 2 stdlib
    return mk_stream(TEST_DATA)
196
197
if __name__ == '__main__':
    def _bootstrap():
        '''Configure logging and hand OS capabilities to main()
        (ocap style: access to sys and open() stays in this scope).
        '''
        from sys import argv, stdout

        def open_arg(path):
            # Only open files actually named on the command line.
            if path in argv:
                return open(path)

        logging.basicConfig(level=logging.INFO)
        main(argv=argv[:], stdout=stdout, open_arg=open_arg)

    _bootstrap()
Note: See TracBrowser for help on using the repository browser.