Note: We no longer publish the latest version of our code here; we now primarily use the kumc-bmi GitHub organization. The heron ETL repository, in particular, is not public. Peers in the informatics community should see MultiSiteDev for details on requesting access.

source: heron_load/structured_logging.py @ 0:42ad7288920a

heron-michigan tip
Last change on this file since 0:42ad7288920a was 0:42ad7288920a, checked in by Matt Hoag <mhoag@…>, 6 years ago

Merge with demo_concepts_3800

File size: 10.1 KB
Line 
1r'''structured_logging -- Nested events, CSV support
2
3Preface: Mock Capabilities
4==========================
5
6>>> timer = make_mock_timer()
7
8
9Connecting NestedEvents and CSVFormatter with python logging
10============================================================
11
12Suppose we want to log some ETL events which naturally nest.
13
14First, add a handler to a logger and set its level to DEBUG to see
15event starts as well as their outcomes:
16
17>>> import logging, sys
18>>> etl_log = logging.getLogger('ETL')
19>>> detail = logging.StreamHandler(sys.stdout)
20>>> etl_log.addHandler(detail)
21>>> etl_log.setLevel(logging.DEBUG)
22
23
24timer_kludge
25************
26
27The timer_kludge is a work-around for the fact that the python
28logging API uses global references (ambient authority) rather than
29explicit authority to get to the real clock:
30
31>>> detail.setFormatter(CSVFormatter(timer_kludge=timer))
32
33Passing `logging.getLogger` supplies NestedEvents with access to the
34logger above and all its descendants:
35
36>>> events = NestedEvents('ETL', logging.getLogger, timer)
37
38
39Using NestedEvents with CSVFormatter
40====================================
41
42Now `NestedEvents.event()` takes the same arguments as
43`logging.debug()` (including custom keywords) plus:
44  - name (default: '_eventNNN') for the subevent logger
45  - skip (default: False) to WARN about exceptions and continue
46
47For each event, the start is logged at level DEBUG, and the completion
48is logged, with duration, at INFO for success, ERROR for fatal exception, or
49WARN for skipped exception.
50
51Note the use of custom keyword argument (`sql`), the event
52sequence numbers, and reporting event details:
53
54>>> with events.event('run %s', 'script_x', name='script_x') as script:
55...     sql = '\nselect a, b, c, d, e from t1, t2'
56...     with script.event('SQL: %s...', sql[:10].strip(), sql=sql) as s:
57...         s.report(rows=10)
58...     sql = 'drop table X'
59...     with script.event('SQL: %s', sql, skip=True, sql=sql) as s:
60...         raise EventFailure('SQL failed', offset=12, remainder='X')
61... # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
62time,usec,level,log_path_name,message,dur,skip,parent,seqno,detail
632000-01-01 18:30:01,DEBUG,ETL.script_x,run script_x,,"
64REQ",0,1
652000-01-01 18:30:06,DEBUG,ETL.script_x._event2,"SQL: select a,...",,"
66REQ",1,2,sql,"
67select a, b, c, d, e from t1, t2"
682000-01-01 18:30:10,INFO,ETL.script_x._event2,"SQL: select a,...",0:00:12,"
69REQ",1,2,rows,10,sql,"
70select a, b, c, d, e from t1, t2"
712000-01-01 18:30:28,DEBUG,ETL.script_x._event3,SQL: drop table X,,"
72OPT",1,3,sql,drop table X
732000-01-01 18:30:36,WARNING,ETL.script_x._event3,SQL failed,0:00:24,"
74OPT",1,3,offset,12,remainder,X,sql,drop table X,exc_info,"Traceback
75 (most recent call last):
76  ...
77EventFailure: SQL failed"
78time,usec,level,log_path_name,message,dur,skip,parent,seqno,detail
792000-01-01 18:30:55,INFO,ETL.script_x,run script_x,0:01:06,"
80REQ",0,1
81
82>>> _ = timer(reset=True)
83>>> try:
84...     with events.event('part %d', 2, name='part') as e3:
85...         with e3.event('store', shelf='A12') as make:
86...             raise ValueError('cannot find it.')
87... except:
88...     pass
89... # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
902000-01-01 18:30:01,DEBUG,ETL.part,part 2,,"
91REQ",0,4
922000-01-01 18:30:06,DEBUG,ETL.part._event5,store,,"
93REQ",4,5,shelf,A12
942000-01-01 18:30:10,ERROR,ETL.part._event5,cannot find it.,0:00:12,"
95REQ",4,5,shelf,A12,exc_info,"Traceback (most recent call last):
96  ...
97ValueError: cannot find it."
982000-01-01 18:30:21,ERROR,ETL.part,cannot find it.,0:00:28,"
99REQ",0,4,exc_info,"Traceback (most recent call last):
100  ...
101ValueError: cannot find it."
102
103>>> with events.event('part %d', 1, name='part') as e2:
104...     with e2.event('store', shelf='A12') as make:
105...         raise ValueError('cannot find it.')
106Traceback (most recent call last):
107  ...
108ValueError: cannot find it.
109
110>>> etl_log.removeHandler(detail)
111
112
113
114'''
115
116from contextlib import contextmanager
117from datetime import timedelta
118from logging import StreamHandler, Filter, Formatter, ERROR, WARN
119from time import mktime
120
121
class TimerKludge(object):
    '''Formatter mix-in that can take its clock from a mock timer.

    With ``timer_kludge`` set, both the rendered record time and event
    durations come from that timer rather than from the log record, so
    output is deterministic in tests.
    '''
    def __init__(self, fs=None, dfs=None, timer_kludge=None):
        '''
        :param fs: format spec (not used)
        :param dfs: date format spec (not used)
        :param timer_kludge: mock timer for testability
        '''
        self._timer = timer_kludge
        if timer_kludge:
            from time import strftime, gmtime

            def testTime(record, datefmt=None):
                # Same signature as logging.Formatter.formatTime, so
                # callers that pass datefmt don't get a TypeError.
                # NOTE(review): renders in UTC (gmtime); a timer built
                # with mktime (local time) makes output depend on the
                # host timezone.
                return strftime(datefmt or '%Y-%m-%d %H:%M:%S',
                                gmtime(timer_kludge()))
            self.formatTime = testTime

    def _duration(self, record):
        '''Time elapsed since ``record.event_start`` as a timedelta.

        Returns None when the record has no ``event_start``. With a mock
        timer, this calls the timer, which may advance it.
        '''
        if not hasattr(record, 'event_start'):
            return None

        now = self._timer() if self._timer else record.created
        return timedelta(seconds=now - record.event_start)
143
144
class NestedEvents(object):
    '''Log nested, sequence-numbered events through the python logging API.

    Every event gets a sub-logger whose dotted name is the path of the
    currently open events, plus a sequence number drawn from a counter
    shared by all NestedEvents derived from the same root.
    '''
    def __init__(self, name, getLogger, timer,
                 counter=None, stack=None):
        '''
        :param name: logger name for this level of nesting
        :param getLogger: function from logger name to logger,
                          e.g. logging.getLogger
        :param timer: function returning the current time in seconds
                      (presumably same units as LogRecord.created)
        :param counter: one-element list holding the last sequence
                        number issued; shared with sub-events
        :param stack: list of (name, seqno) pairs for the open events;
                      shared with sub-events
        '''
        self._getLogger = getLogger
        self._log = getLogger(name)
        self._timer = timer
        self._counter = counter or [0]
        self._stack = stack or [(name, 0)]
        self._results = {}

    def subLog(self, name=None):
        '''Open a sub-event: bump the shared counter and push the stack.

        The caller is responsible for popping the shared stack afterwards.

        :param name: last component of the sub-logger name
                     (default: '_eventNNN')
        :return: (sub NestedEvents, its logger, its seqno, parent seqno)
        '''
        counter = self._counter
        counter[0] += 1
        name = name or '_event%d' % counter[0]
        stack = self._stack
        stack.append((name, counter[0]))
        # TODO: doctest for fixing path nesting
        sub = NestedEvents('.'.join([n for (n, seq) in stack]),
                           self._getLogger, self._timer,
                           counter, stack)
        return sub, sub._log, counter[0], stack[-2][1]

    @contextmanager
    def event(self, msg, *args, **kwargs):
        '''Context manager that logs the start and outcome of an event.

        ``msg``/``args`` are as for ``logging.debug``. Special keywords:
          - name: sub-logger name (default: '_eventNNN')
          - skip: if true, log exceptions at WARN and suppress them;
            otherwise log at ERROR and re-raise
        Any other keywords become event detail columns.

        The start is logged at DEBUG; success is logged at INFO, with any
        results passed to ``report()`` on the yielded object.

        :return: (yields) a sub NestedEvents for nested events/reports
        '''
        name = kwargs.pop('name', None)
        skip = kwargs.pop('skip', False)
        sub, log, seqno, parent = self.subLog(name)
        # subLog() pushed onto the shared stack; pop it however we leave,
        # including failures before the yield, or every later logger
        # name would carry a stale path component.
        try:
            start = self._timer()
            extra = dict(seqno=seqno, parent=parent, skip=skip, event=kwargs)
            log.debug(msg, *args, extra=extra)

            def log_problem(ex, detail=None):
                if detail:
                    extra['event'].update(detail)
                log.log(WARN if skip else ERROR,
                        '%s', ex,
                        exc_info=ex, extra=dict(extra, event_start=start))

            try:
                yield sub
            except EventFailure as failure:
                log_problem(failure, failure.detail)
                if not skip:
                    raise
            except Exception as ex:
                log_problem(ex)
                if not skip:
                    raise
            else:
                extra['event'].update(sub._results)
                log.info(msg, *args,
                         extra=dict(extra, event_start=start))
        finally:
            self._stack.pop()

    def report(self, **results):
        '''Record name=value results for this event.

        When this object was yielded by ``event()``, the results are added
        to that event's success (INFO) record.
        '''
        self._results.update(results)

    def log(self, name, level, message, *args, **event):
        '''Log a single record as a one-off sub-event.

        :param name: sub-logger name (None for '_eventNNN')
        :param level: logging level
        :param message: message format, with ``args`` as for logging
        :param event: detail columns for the record
        '''
        _, child_log, seqno, parent = self.subLog(name=name)
        try:
            child_log.log(level, message, *args,
                          extra=dict(seqno=seqno, parent=parent,
                                     skip=False, event=event))
        finally:
            # Keep the shared stack balanced even if logging raises.
            self._stack.pop()
210
211
class EventFailure(Exception):
    '''Raised inside a NestedEvents event to report a failure with details.

    Keyword arguments are kept in ``detail``; NestedEvents.event() merges
    them into the logged event columns.
    '''

    def __init__(self, message, **kwargs):
        '''
        :param message: description of the failure; also str(self)
        :param kwargs: extra name/value pairs to log with the failure
        '''
        self.detail = kwargs
        self.message = message

    def __str__(self):
        text = self.message
        return text
219
220
class CSVFormatter(TimerKludge, Formatter):
    r'''CSV formatter for use with NestedEvents and the python logging API.

    Each record is rendered as one CSV row: time, level, logger name,
    message, duration, skip flag (OPT/REQ), parent seqno, seqno, then the
    event's name/value pairs and, if present, the formatted exception.
    The header row is prepended to records whose seqno is 1 or less.
    '''

    def __init__(self, fs=None, dfs=None, timer_kludge=None):
        '''
        :param fs: format spec (handed to logging.Formatter; not used
                   by format())
        :param dfs: date format spec (handed to logging.Formatter; not
                    used by format())
        :param timer_kludge: mock timer for testability
        '''
        # TimerKludge.__init__ does not chain to Formatter.__init__, so
        # without this the logging base class is never initialized.
        Formatter.__init__(self, fs, dfs)
        TimerKludge.__init__(self, fs, dfs, timer_kludge)

    @classmethod
    def header(cls):
        '''Column names as a CSV line, ending in a newline.'''
        return ('time,usec,level,log_path_name,message,dur,'
                'skip,parent,seqno,detail\n')

    def format(self, record):
        '''Render ``record`` as a CSV row.

        Expects the ``skip``, ``parent``, ``seqno`` and ``event``
        attributes that NestedEvents supplies via ``extra``.
        '''
        # TODO: consider using python's record.filename, lineno
        # rather than record.event['script'], line
        exc_cols = (['exc_info', self.formatException(record.exc_info)]
                    if record.exc_info else [])

        # Keep formatTime before _duration: with a mock timer each call
        # may advance the clock, so call order determines the output.
        stamp = self.formatTime(record)
        message = record.getMessage()
        dur = self._duration(record)

        cols = ([stamp,
                 record.levelname,
                 record.name,
                 message,
                 # timedelta(0) is falsy, so test for None explicitly;
                 # an instantaneous event still reports 0:00:00.
                 '' if dur is None else str(dur),
                 '\nOPT' if record.skip else '\nREQ',
                 str(record.parent),
                 str(record.seqno)]
                + self.event_cols(record.event) + exc_cols)

        # For compatibility with spreadsheets, don't quote time col
        # so ,msec part goes in a separate column.
        line = ','.join(cols[:1] + [tval(col) for col in cols[1:]])
        return (self.header() if record.seqno <= 1 else '') + line

    def event_cols(self, event):
        '''Flatten ``event`` into [key, str(value), ...], sorted by key.

        Default implementation formats all columns; subclasses may edit.
        '''
        return [txt
                for (k, v) in sorted(event.items())
                for txt in [k, str(v)]]
256
257
def tval(s):
    '''Quote a string value for use in CSV.

    Values containing a double quote, comma, carriage return or newline
    are wrapped in double quotes with embedded quotes doubled (RFC 4180,
    like the csv module's QUOTE_MINIMAL); other values pass unchanged.
    '''
    # A bare '\r' is a line break to CSV readers, so it must be quoted too.
    if any(c in s for c in '",\r\n'):
        return '"' + s.replace('"', '""') + '"'
    return s
263
264
class BigStepFilter(Filter):
    '''Pass only "big step" event records.

    A record passes if its event has a 'task' entry, or if its event
    'script' ends in '.sql' and the event has neither 'code' nor 'line'.
    Records with no ``event`` attribute (e.g. from loggers not driven by
    NestedEvents) are rejected instead of raising AttributeError, which
    would otherwise propagate to the code doing the logging.
    '''
    def filter(self, rec):
        e = getattr(rec, 'event', None)
        if e is None:
            return False
        if 'task' in e:
            return True
        # Treat a missing or None script as "not a .sql script".
        script = e.get('script') or ''
        return (script.endswith('.sql')
                and 'code' not in e and 'line' not in e)
272
273
class BigStepHandler(StreamHandler):
    '''A StreamHandler that only passes records accepted by BigStepFilter.
    '''

    def __init__(self, *args, **kwargs):
        '''Takes the same arguments as logging.StreamHandler.'''
        StreamHandler.__init__(self, *args, **kwargs)
        only_big_steps = BigStepFilter()
        self.addFilter(only_big_steps)
278
279
def make_mock_timer():
    '''Return a deterministic stand-in for a seconds-since-epoch timer.

    Each call returns the next instant of a mock clock (make_mock_clock)
    converted with mktime; ``reset=True`` rewinds that clock.
    NOTE(review): mktime interprets the naive datetime in local time, so
    the numeric value depends on the host timezone.
    '''
    clock = make_mock_clock()

    def mock_timer(reset=False):
        instant = clock(reset)
        return mktime(instant.timetuple())
    return mock_timer
286
287
def make_mock_clock():
    '''Return a mock clock starting at 2000-01-01 12:30:00.

    Each call returns the current instant and then advances it by a step
    that grows by one second per call (1s, 2s, 3s, ...). A call with
    ``reset=True`` still returns the current instant, but rewinds the
    clock to its start and resets the step to 1s.
    '''
    from datetime import datetime, timedelta
    origin = datetime(2000, 1, 1, 12, 30, 0)
    state = {'now': origin, 'step': 1}

    def mock_clock(reset=False):
        current = state['now']
        if reset:
            state.update(now=origin, step=1)
        else:
            step = state['step']
            state.update(now=current + timedelta(seconds=step),
                         step=step + 1)
        return current
    return mock_clock
301
302
def total_seconds(td):
    '''Return the length of timedelta ``td`` in seconds, as a float.

    Stands in for ``td.total_seconds()``, which is new in 2.7; our
    deployment platform is 2.6.

    >>> from datetime import timedelta
    >>> total_seconds(timedelta(0, 1, 500000))
    1.5
    '''
    whole_seconds = td.days * 24 * 3600 + td.seconds
    return whole_seconds + td.microseconds / 10.0 ** 6
311
312
def HERE():
    '''Get filename, function, and line number of the caller.

    Returns the ``inspect.getframeinfo`` result for the frame that
    called HERE().

    A little iffy w.r.t. ocap, but not a source of non-determinism.

    ack: `junjanes Jul 25 '11`__
    __ http://stackoverflow.com/a/6811020/846824
    '''
    import inspect
    frame = inspect.currentframe()
    try:
        if frame is None:
            # Interpreter without frame support: fall back to the
            # original (slower) full stack walk.
            return inspect.getframeinfo(inspect.stack()[1][0])
        # Only the caller's frame is needed; inspect.stack() would build
        # frame info (reading source context) for every frame.
        return inspect.getframeinfo(frame.f_back)
    finally:
        # Don't hold on to frame objects longer than needed.
        del frame
Note: See TracBrowser for help on using the repository browser.