Note: We no longer publish the latest version of our code here. We now primarily use the kumc-bmi GitHub organization. The heron ETL repository, in particular, is not public. Peers in the informatics community should see MultiSiteDev for details on requesting access.

source: heron_load/query_performance.py @ 0:42ad7288920a

heron-michigan tip
Last change on this file since 0:42ad7288920a was 0:42ad7288920a, checked in by Matt Hoag <mhoag@…>, 6 years ago

Merge with demo_concepts_3800

File size: 18.2 KB
Line 
1''' query_performance.py - Measure query times
2'''
3from logging import INFO
4import argparse
5import json
6import re
7import sqlite3
8
9import db_util
10import ocap_file
11from structured_logging import NestedEvents, make_mock_timer
12
13
def main(
        # python API caps
        argv, now,
        # derived caps
        configure_db, argv_rd, plans_ed, file_as_dir, aux_rd,
        # constants
        logger_name='heron_performance',
        test_iterations=2):
    '''Run each query named in the query list file test_iterations times.

    Timings are recorded through PerformanceTest. When --plans_output is
    given, execution plans are written to that file. Each plan is bracketed
    by the start/end sentinel lines that compare_execution_plans looks for.
    The query-list input channel and the plans output channel are closed
    when this function is done with them.
    '''
    options = parse_args(argv[:])

    open_arg = argv_rd(argv).subRdFile
    list_in = open_arg(options.query_list_file).inChannel()
    try:
        query_names = [line.strip() for line in list_in if line.strip()]
    finally:
        list_in.close()
    # file_as_dir keeps the same non-blank lines, so names and files pair up.
    query_rds = file_as_dir(open_arg(options.query_list_file)).subRdFiles()

    query_list = [Query(name, q_rd, aux_rd)
                  for (name, q_rd) in zip(query_names, query_rds)]

    description = options.test_description
    if not description:
        description = 'Test run at %s' % (str(now()))

    db, _host_port, events, _sid = configure_db(
        options.config_file, options.db_section, logger_name)
    pt = PerformanceTest(events, now, db, description,
                         schema_type_to_name(options.db_section)[0])

    plans_out = plans_ed().outChannel() if options.plans_output else None
    if plans_out is not None:
        pf_write = plans_out.write
    else:
        def pf_write(data):
            pass

    try:
        for q in query_list:
            for _ in range(test_iterations):
                events.log('TestQueryStart', INFO, q.name)

                # For compatibility with compare_execution_plans, write
                # start/end sentinel values to the plan file.
                pf_write('\n**** Starting Test: %s ****\n' % q.name)

                plan, runsec = pt.runTestQuery(
                    schema_swap(q.sql(), options.db_section),
                    q.name, q.description(),
                    planOnly=options.plan_only)
                pf_write(plan)
                pf_write('\n**** Total run time %d sec ****\n' % runsec)
                events.log('TestQueryEnd', INFO, q.name)
    finally:
        # Flush and release the plans file even if a query fails midway.
        if plans_out is not None:
            plans_out.close()
63
64
def schema_type_to_name(schema='deid'):
    ''' Translate id/deid into nightherondata/blueherondata.
    Return a (schema, "other" schema) pair.

    >>> schema_type_to_name('deid')
    ('blueherondata', 'nightherondata')
    >>> schema_type_to_name('id')
    ('nightherondata', 'blueherondata')

    Unknown schema types raise KeyError.
    '''
    # Only allow known schemas
    schemas = dict(deid='blueherondata',
                   id='nightherondata')
    this_schema = schemas[schema]
    return this_schema, (set(schemas.values()) - set([this_schema])).pop()


def schema_swap(sql, schema='deid'):
    ''' Replace blueherondata with nightherondata (or vice versa).
    Matching ignores case, and the case of everything else is preserved.

    Only schema qualifiers are rewritten. A qualifier is the other schema's
    name followed by a period and not preceded by an identifier character.
    Other text that merely contains a schema name is left alone.
    >>> schema_swap('blueherondata', schema='id')
    'blueherondata'
    >>> schema_swap('my name is blueherondata!', schema='id')
    'my name is blueherondata!'
    >>> schema_swap('select * from myblueherondata.t', schema='id')
    'select * from myblueherondata.t'

    Replace schema names regardless of case.
    >>> schema_swap('select * from BlueHeronData.Observation_Fact;',
    ...             schema='id')
    'select * from nightherondata.Observation_Fact;'

    Try replacement on a real performance query.
    >>> from pkg_resources import resource_string
    >>> tsql = resource_string(__name__,
    ...                        'test_queries/2011-04-11_1652_dzhu.sql')
    >>> 'blueherondata.' in tsql.lower()
    True
    >>> 'nightherondata.' in tsql.lower()
    False
    >>> ret = schema_swap(tsql, schema='id')
    >>> 'blueherondata.' in ret.lower()
    False
    >>> 'nightherondata.' in ret.lower()
    True

    Make sure we don't change the schema if it's already what we want.
    >>> schema_swap(tsql, schema='deid') == tsql
    True

    Make sure we can go from id to deid also.
    >>> schema_swap('select * from niGHThERONdata.Observation_Fact;',
    ...             schema='deid')
    'select * from blueherondata.Observation_Fact;'

    Verify we throw an exception if we get a schema type we don't expect.
    >>> schema_swap('should throw exception', 'invalid_schema_type')
    Traceback (most recent call last):
    ...
    KeyError: 'invalid_schema_type'
    '''
    to_s, from_s = [s + '.' for s in schema_type_to_name(schema)]

    # One case-insensitive pass over the text. The lookbehind keeps us from
    # rewriting the tail of a longer identifier such as "xblueherondata."
    # ($ and # are legal in Oracle identifiers).
    qualifier = re.compile(r'(?<![A-Za-z0-9_$#])' + re.escape(from_s),
                           re.IGNORECASE)
    # A function replacement inserts to_s literally (no backslash escapes).
    return qualifier.sub(lambda _match: to_s, sql)
140
141
class MockDBI(db_util.DryDBI):
    r'''In-memory sqlite stand-in for the cx_Oracle-style DBI used by db_util.

    One object serves as the DBI module, the connection and the cursor:
    connect() and cursor() both return self. Statements pass through
    _dialect, which rewrites the few Oracle-isms used in this file.

    >>> import logging
    >>> m = MockDBI.tx_db(logging.getLogger)
    >>> with m.transaction('sum') as q:
    ...     q.execute('select 1+1')
    ...     q.fetchall()
    [(2,)]
    >>> db_util.run_script(m, 'HERE',
    ... """whenever sqlerror continue;
    ... create table t (n NUMBER);
    ... whenever sqlerror exit;
    ... insert into t values(1)""")
    >>> with m.transaction('dump') as q:
    ...     q.execute('select * from t')
    ...     q.fetchall()
    [(1,)]
    >>> MockDBI._dialect('select sysdate, col from sometable')
    "select datetime('now'), col from sometable"
    >>> MockDBI._dialect('delete from plan_table')
    ''
    >>> MockDBI._dialect('SELECT PLAN_TABLE_OUTPUT\n '
    ... 'FROM TABLE(DBMS_XPLAN.DISPLAY())')
    ''
    >>> MockDBI._dialect('select 1+1 from dual')
    'select 1+1'
    >>> MockDBI._dialect('select n from dual_rows')
    'select n from dual_rows'
    >>> MockDBI._dialect('explain plan for select * from t')
    'explain query plan select * from t'
    '''

    # keep the same sqlite DB across connections.
    _conn = sqlite3.connect(':memory:')

    def __init__(self, section='mock'):
        # section is accepted for signature compatibility; it is not used.
        self._ans = []

    # _cx methods
    def connect(self, user, password, dsn):
        return self

    def cursor(self):
        return self

    # cursor methods
    def execute(self, sql, params):
        sql = self._dialect(sql)
        # Oracle-only statements translate to '' and are skipped.
        if len(sql) != 0:
            cursor = self._conn.cursor()
            cursor.execute(sql, params)
            self._ans = cursor.fetchall()

    def executemany(self, sql, params):
        sql = self._dialect(sql)
        # Same skip rule as execute() for statements with no sqlite analogue.
        if len(sql) != 0:
            cursor = self._conn.cursor()
            cursor.executemany(sql, params)
            self._ans = cursor.fetchall()

    def fetchone(self):
        return self._ans[0] if self._ans else None

    def fetchall(self):
        return self._ans

    def commit(self):
        self._conn.commit()

    def rollback(self):
        self._conn.rollback()

    @classmethod
    def _dialect(cls, sql):
        '''Lowercase an Oracle statement and rewrite it for sqlite.

        plan_table maintenance and DBMS_XPLAN queries become '' (no-ops).
        EXPLAIN PLAN becomes EXPLAIN QUERY PLAN. SYSDATE and FROM DUAL are
        translated.
        '''
        sql = sql.strip().lower()
        sql = sql.replace('sysdate', "datetime('now')")
        sql = re.sub(r'delete\s+from\s+plan_table', '', sql)
        sql = re.sub(r'select\s+plan_table_output\s+'
                     r'from\s+table\(dbms_xplan\.display\(\)\)',
                     '', sql)
        sql = re.sub(r'explain\s+plan\s+for\s+(?P<query>.*)',
                     r'explain query plan \g<query>', sql)

        # \b so that tables merely named dual_* are left alone.
        sql = re.sub(r'from\s+dual\b', '', sql)
        return sql.strip()

    @classmethod
    def tx_db(cls, getLogger, section='mock'):
        '''Return a db_util database object backed by this mock.

        getLogger is handed to NestedEvents for logging.
        '''
        timer = make_mock_timer()
        events = NestedEvents('mock', getLogger, timer)

        # Fixed fake credentials; the mock's connect() ignores them.
        key = lambda section: ['sqlite3', 'nobody', 'sekret']
        connect4 = db_util.make_ora_connect(cls, events)

        serv = db_util.DBServer(section, ('localhost', 0), events,
                                key=key,
                                connect=connect4)

        return serv.database('mock')
240
241
class PerformanceTest(object):
    '''Time performance-test queries and capture their execution plans.

    Timings go into a timingTests table, which construction creates if it
    does not already exist. Plans come from EXPLAIN PLAN / DBMS_XPLAN.

    >>> import logging
    >>> from datetime import datetime
    >>> timer = make_mock_timer()
    >>> events = NestedEvents(
    ...     'test', logging.getLogger, timer)
    >>> m = MockDBI.tx_db(logging.getLogger)
    >>> p = PerformanceTest(events, datetime.now, m, 'test')

    Test hint removal
    >>> p._removeHints('select /*+ h*/ * from table; select '
    ... # doctest: +NORMALIZE_WHITESPACE
    ... '/*+ h2 */ * from another_table; select /* just a comment */ '
    ... '* from yet_another_table')
    'select * from table; select  * from another_table; select /*\
     just a comment */ * from yet_another_table'

    Hints may span several lines
    >>> p._removeHints('select /*+ ordered\\n use_nl(a b) */ 1')
    'select  1'

    Get the plan for a simple query
    >>> p._statementPlan('select * from timingTests') is not None
    True

    Initialize a test entry
    >>> with m.transaction('exec_plan') as q:
    ...    q.execute("attach database ':memory:' as BlueHeronData")
    ...    q.execute('create table BlueHeronData.QUERY_GLOBAL_TEMP (n number)')
    >>> p._initTestEntry(queryName='testName', queryDescription='testDesc')
    >>> with m.transaction('test_out') as q:
    ...    q.execute('select * from timingTests')
    ...    q.fetchall()
    ... # doctest: +ELLIPSIS
    [(1, ..., None, None, u'testName', u'testDesc', u'test')]

    Updating test results - should get '0' for the timing. Note that
    subtracting datetime types results in the number of days (apparently,
    not fractional days unless using julianday()).  The main point of
    the test is to make sure at least one of the columns (duration) is updated.

    >>> p._updateTestEntry()
    >>> with m.transaction('test_out') as q:
    ...    q.execute('select * from timingTests')
    ...    q.fetchall()
    ... # doctest: +ELLIPSIS
    [(1, ..., 0, u'testName', u'testDesc', u'test')]

    Given the plan query result, return plan + newPlan
    >>> np = [('Plan hash value: 2020662085',), (' ',),
    ... ('-----------------------------------------------'
    ... '------------------------------------------------'
    ... '---------',), ('| Id  | Operation               '
    ... '| Name                 | Rows  | Bytes | Cost (%CPU)'
    ... ' | Time     |',), ('-----------------------------'
    ... ' -----------------------------------------------'
    ... '----------------------------',)]
    >>> p.strAppendOraclePlan('', np)
    ... # doctest: +NORMALIZE_WHITESPACE
    ... # doctest: +ELLIPSIS
    'Plan hash value: 2020662085\\n\
 \\n------------------------------------------------------\
--------------------------------------------------\\n\
| Id  | Operation ...'
    '''
    def __init__(self, events, now, db, description='',
                 schema='blueherondata'):
        self._db = db
        self._description = description
        self._events = events
        self._now = now
        # Set every attribute before running any setup SQL.
        self._schema = schema
        self._initTestResults()

    def strAppendOraclePlan(self, plan, newPlan):
        '''Return plan followed by newPlan's rows (first column of each),
        joined with newlines. No separator is inserted before newPlan.
        '''
        return plan + '\n'.join(l[0] for l in newPlan)

    def runTestQuery(self, sql, queryName,
                     queryDescription, planOnly=False):
        '''
        Run query and record performance in timingtests table and
        return the execution plan. If planOnly is True, then don't
        execute the query, just return the execution plan.

        Returns (plan_text, elapsed_whole_seconds).
        '''
        sql = self._removeHints(sql)

        # Execution plan must be done per statement
        plan = ''
        for _lnum, _c, statement in db_util.iter_statement(sql):
            plan = self.strAppendOraclePlan(plan,
                                            self._statementPlan(statement))

        start = self._now()
        if not planOnly:
            self._initTestEntry(queryName, queryDescription)
            db_util.run_script(self._db, fn=queryName, script=sql)
            self._updateTestEntry()
        delta = self._now() - start

        # Whole seconds only; the sub-second remainder is dropped.
        return plan, (delta.days * 86400) + delta.seconds

    def _initTestEntry(self, queryName='', queryDescription=''):
        '''Clear the schema's QUERY_GLOBAL_TEMP and insert a new timingTests
        row (next testId, start time = sysdate) for this query.
        '''
        # &&schema_name is substituted by run_script from variables=.
        db_util.run_script(self._db, 'init_test_entry', '''
                           DELETE FROM &&schema_name.QUERY_GLOBAL_TEMP;
                           insert into timingTests ( testId, startTime,
                           endTime, testDuration, queryName,
                           testRunDesc, queryDesc )
                           select nv.myNextVal testId, sysdate startTime,
                           null endTime, null testDuration,
                           :queryname queryName, :description testRunDesc,
                           :querydesc queryDesc
                           from
                           (select case when mv is null then 1
                           else mv + 1 end as myNextVal from(
                           select max(tt.testId) mv
                           from timingTests tt)) nv;''',
                           params=dict(queryname=queryName,
                                       description=self._description,
                                       querydesc=queryDescription),
                           variables=dict(schema_name=self._schema),
                           ignore_error=False)

    def _updateTestEntry(self):
        '''Stamp the end time and duration (seconds) on the latest row.'''
        db_util.run_script(self._db, 'update_test_entry', '''
                           update timingTests set endTime = sysdate
                           where testId=(select max(testId)
                           from timingTests);
                           update timingTests
                           set testDuration = (
                           select (endtime-starttime)*24*60*60
                           from timingTests
                           where testId=(select max(testId)
                           from timingTests))
                           where testId=(select max(testId)
                           from timingTests);''')

    def _initTestResults(self):
        '''Create the timingTests table; errors (e.g. it already exists)
        are ignored.
        '''
        sql = '''
        CREATE TABLE timingTests(
        testId NUMBER PRIMARY KEY,
        startTime DATE,
        endTime DATE,
        testDuration NUMBER,
        queryName VARCHAR(255),
        queryDesc VARCHAR(1024),
        testRunDesc VARCHAR(1024)
        );
        '''
        db_util.run_script(self._db, 'initialize_timing_table', sql,
                           ignore_error=True)

    def _statementPlan(self, sql_statement):
        '''
        Return the execution plan for a single statement
        (the rows of DBMS_XPLAN.DISPLAY()).
        '''
        with self._db.transaction('exec_plan') as q:
            q.execute('delete from plan_table')
            q.execute('explain plan for\n%s' % sql_statement)
            q.execute('SELECT PLAN_TABLE_OUTPUT '
                      'FROM TABLE(DBMS_XPLAN.DISPLAY())')
            ans = q.fetchall()
        return ans

    def _removeHints(self, sqlData):
        '''Strip optimizer hints (/*+ ... */); ordinary comments stay.'''
        # DOTALL so a hint that spans several lines is removed too.
        return re.sub(r'/\*\+.*?\*/', '', sqlData, flags=re.DOTALL)
405
406
class Query(object):
    '''A named performance-test query.

    The SQL comes from one readable. An optional side file with the same
    base name and a .json extension (located via aux_rd) carries its
    description.
    '''
    def __init__(self, name, rd, aux_rd):
        self.name = name
        self._rd = rd
        self._aux_rd = aux_rd

    def sql(self):
        '''Return the query text (the contents of the query file).'''
        return self._rd.getBytes()

    def description(self):
        '''Return the "description" value from the query's .json side file.

        Returns 'No JSON file found' if the side file cannot be opened
        (IOError). Malformed JSON raises ValueError and a missing
        "description" key raises KeyError. The side file is closed in
        every case.
        '''
        jsonFile = self._aux_rd(self._rd, '.json')
        try:
            stream = jsonFile.inChannel()
        except IOError:
            self._description = 'No JSON file found'
        else:
            try:
                info = json.load(stream)
            finally:
                stream.close()
            self._description = info['description']

        return self._description
428
429
def parse_args(argv=None):
    '''Parse the query_performance command line.

    argv is a full argument vector shaped like sys.argv: argv[0] is the
    program name and is skipped. main() and make_plans_ed() both pass it
    this way. When argv is None, argparse reads sys.argv itself.
    '''
    p = argparse.ArgumentParser(description=
                                'Run performance tests queries')
    # Positional Arguments
    p.add_argument('config_file', help='Configuration file')
    p.add_argument('query_list_file', help='List of queries to run')
    # Options
    p.add_argument('--plan_only', dest='plan_only',
                   action='store_true', default=False,
                   help='Execution plan only (default FALSE)')
    p.add_argument('--plans_output', dest='plans_output',
                   metavar='PATH',
                   help='Write execution plans to file (optional)')
    p.add_argument('--test_description',
                   dest='test_description', metavar='DESCRIPTION',
                   help='Description for this test run (optional)')
    p.add_argument('--db_section', dest='db_section', metavar='DB_SECTION',
                   default='deid',
                   help='Database section from ini file (deid=default)')

    # Honor the argv we were given instead of silently reading sys.argv.
    args = None if argv is None else list(argv[1:])
    return p.parse_args(args)
451
452
def make_argv_rd(argv, os_path, os_listdir, openf):
    '''Build a capability that grants read access only to paths in argv.

    The returned function takes a list of paths. It raises IOError naming
    any path that does not appear in argv. Otherwise it returns an
    ocap_file.ListReadable over those paths.
    '''
    def argv_rd(paths):
        unauthorized = [path for path in paths if path not in argv]
        if not unauthorized:
            return ocap_file.ListReadable(paths, os_path, os_listdir, openf)
        raise IOError('not authorized: %s' % unauthorized)

    return argv_rd
461
462
def make_file_as_dir(os_path, os_listdir, openf):
    '''Authority to read from any file named in some file.

    Each non-blank line of the list file (whitespace-stripped) is joined
    onto the directory containing the list file. The result is an
    ocap_file.ListReadable over those paths.
    '''
    def file_as_dir(rd):
        lines = rd.inChannel()
        try:
            names = [line.strip() for line in lines]
        finally:
            # Don't leave the list file open.
            lines.close()
        here = os_path.dirname(rd.fullPath())
        paths = [os_path.join(here, name) for name in names if name]
        return ocap_file.ListReadable(paths, os_path, os_listdir, openf)
    return file_as_dir
472
473
def make_aux_rd(os_path, os_listdir, openf):
    '''Authority to read a nearby file by extension.

    aux_rd(rd, ext) returns an ocap_file.Readable for rd's path with its
    extension replaced by ext. ext must look like ".xyz": a dot followed by
    letters, digits or underscores. Anything else (e.g. ".json/../../x")
    would reach files outside the "nearby" authority, so it raises
    ValueError.
    '''
    def aux_rd(rd, ext):
        if not re.match(r'\.[A-Za-z0-9_]+\Z', ext):
            raise ValueError('bad extension: %r' % (ext,))
        base, _ = os_path.splitext(rd.fullPath())
        return ocap_file.Readable(base + ext, os_path, os_listdir, openf)

    return aux_rd
483
484
def make_plans_ed(argv, os, openf):
    '''Allow writing to the --plans_output path given in argv.

    argv is parsed once, when the capability is made. Each call of the
    returned function yields an ocap_file.Editable for that path.
    '''
    options = parse_args(argv)

    def plans_ed():
        return ocap_file.Editable(options.plans_output, os, openf)

    return plans_ed
494
495
if __name__ == '__main__':
    def _initial_caps(qp_section='query_performance'):
        '''Assemble main()'s keyword arguments from ambient authority.

        The ambient authority used: sys.argv, os, open(), the clocks,
        logging configuration and (lazily) cx_Oracle.
        '''
        from datetime import date, datetime
        from getpass import getuser
        from logging import getLogger
        from logging.config import fileConfig
        import os
        from sys import argv
        from time import time

        argv_rd = make_argv_rd(argv, os.path, os.listdir, open)
        argv_dir = argv_rd(argv)

        def argv_open(name):
            # Open a file named on the command line for reading.
            return argv_dir.subRdFile(name).inChannel()

        plans_ed = make_plans_ed(argv, os, open)

        def real_cx():
            # Deferred so cx_Oracle is only required for real runs.
            import cx_Oracle
            return cx_Oracle

        configure_db = db_util.make_configure_db(
            argv_open, fileConfig,
            calendar=date, timer=time,
            getLogger=getLogger, getuser=getuser, environ=os.environ,
            dbi=real_cx)

        caps = {
            'argv': list(argv),
            'argv_rd': argv_rd,
            'plans_ed': plans_ed,
            'aux_rd': make_aux_rd(os.path, os.listdir, open),
            'file_as_dir': make_file_as_dir(os.path, os.listdir, open),
            'configure_db': configure_db,
            'now': datetime.now,
        }
        return caps

    main(**_initial_caps())
Note: See TracBrowser for help on using the repository browser.