Note: We no longer publish the latest version of our code here. We primarily use the kumc-bmi GitHub organization. The HERON ETL repository, in particular, is not public. Peers in the informatics community should see MultiSiteDev for details on requesting access.

source: heron_load/devdoc/cltrack.py @ 0:42ad7288920a

heron-michigan tip
Last change on this file since 0:42ad7288920a was 0:42ad7288920a, checked in by Matt Hoag <mhoag@…>, 6 years ago

Merge with demo_concepts_3800

File size: 7.0 KB
Line 
1'''cltrack -- assist with clarity upgrade tracking documentation
2
3Usage:
4  cltrack staging [options] --srcdir=DIR
5  cltrack updates [options] XLS TK
6  cltrack loaddeps [options] ETLDEPS
7  cltrack findcols [options] --srcdir=DIR TK
8
9staging:   parse PAR for staging tables and put in `staging` table of DB
10updates:   export Summary sheet of XLS to table TK in DB
11loaddeps:  load results of ETL dependency analysis (see datadeps.py)
12findcols:  join TK updates with DEPS static analysis to find tables used
13           in SQL source; for each such table, find filename/line occurrences
14           of each col from TK.
15
16Options:
17  --srcdir=DIR   HERON ETL source code directory
18  --db=FILE      sqlite database filename [default: devdoc.db]
19  --deps=NAME    name of table holding etl-deps results [default: etl_deps]
20  --colpos=NAME  name of table to store column hits [default: colpos]
21  --debug        set logging level to DEBUG
22
23'''
24
25from collections import namedtuple
26from itertools import dropwhile
27import csv
28import logging
29import re
30
31from docopt import docopt
32
33from lalib import maker, dbmgr, Rd
34
35log = logging.getLogger(__name__)
36
37
def main(stdout, cli_access):
    '''Run the sub-command selected on the command line.

    :param stdout: output stream supplied by the caller; none of the
                   sub-commands below writes to it.
    :param cli_access: zero-argument callable returning the parsed
                       command line, with one flag per sub-command
                       (`staging`, `updates`, `loaddeps`, `findcols`)
                       plus `rd`, `xlrd`, `csv_data` and `dbtrx`
                       accessors.
    :raises SystemExit: from `updates`, when the workbook has no rows.
    '''
    cli = cli_access()
    dbtrx = cli.dbtrx('--db')

    if cli.staging:
        hs = HeronSource(cli.rd('--srcdir'))
        # Close the parameter file as soon as it has been read instead
        # of leaving the open handle to the garbage collector.
        with hs.staging_par.open() as par:
            names = stagingTables(par.read())
        dest = ExportTable(dbtrx, 'staging', ['tab'])
        dest.recreate()
        dest.insert(rows=[[tab] for tab in names])

    elif cli.updates:
        ct = ClarityTracking(cli.xlrd('XLS'))
        updates = [ct.record(row) for row in ct]
        if not updates:
            raise SystemExit('no updates found')

        # Every record is the same namedtuple type, so the first one
        # supplies the column names for the export table.
        cols = updates[0]._fields
        log.info('found %d updates', len(updates))

        tk = ExportTable(dbtrx, cli.TK, cols)
        tk.recreate()
        tk.insert(updates)

    elif cli.loaddeps:
        header, rows = cli.csv_data('ETLDEPS')
        deps = ExportTable(dbtrx, cli.deps, header)
        deps.recreate()
        deps.insert(rows)

    elif cli.findcols:
        cols = trackingCols(dbtrx, cli.TK, cli.deps)
        hs = HeronSource(cli.rd('--srcdir'))
        hs.findcols(dbtrx, cols, cli.colpos)
74
75
@maker
def CLI(argv, openf, listdir, open_workbook, connect):
    '''Parse the command line and bundle the I/O access it names.

    NOTE(review): `maker` comes from lalib; presumably it turns the
    returned `(methods, attributes)` pair into an object whose methods
    receive that object as their first argument -- confirm there.

    :param argv: full argument vector; `argv[0]` is skipped.
    :param openf: file-opening function, handed on to `Rd`.
    :param listdir: directory-listing function, handed on to `Rd`.
    :param open_workbook: called with the value of a workbook option.
    :param connect: called with the value of a database option.
    :return: the methods `rd`, `csv_data`, `xlrd` and `dbtrx`, each
             taking an option name, and the docopt options themselves
             as attributes with any `--` removed from the key.
    '''
    options = docopt(__doc__, argv=argv[1:])

    log.debug('options: %s', options)

    def rd(_, n):
        return Rd(options[n], openf, listdir)

    def csv_data(_, n):
        with rd(_, n).open() as infp:
            reader = csv.reader(infp)
            # next() rather than reader.next(): the builtin works on
            # both Python 2.6+ and Python 3 iterators.
            header = next(reader)
            rows = list(reader)
            return header, rows

    def xlrd(_, n):
        # Log to show what's taking so long
        log.info('opening workbook: %s', options[n])
        return open_workbook(options[n])

    def dbtrx(_, n):
        # log (default) database name
        log.info('using database: %s', options[n])
        # The connection is made lazily, each time dbmgr calls back.
        return dbmgr(lambda: connect(options[n]))

    # items() instead of the Python-2-only iteritems().
    return [rd, csv_data, xlrd, dbtrx], dict(
        (k.replace('--', ''), v)
        for (k, v) in options.items())
105
106
@maker
def ExportTable(dbtrx, name, cols):
    '''A database table that can be dropped, re-created and filled.

    :param dbtrx: zero-argument callable giving a context manager whose
                  value supports `execute` and `executemany`.
    :param name: table name, interpolated into the SQL as-is.
    :param cols: column names, passed to `sql_for`.
    '''
    create_stmt, insert_stmt = sql_for(name, cols)
    drop_stmt = 'drop table if exists %s' % name

    def recreate(_):
        '''Drop the table if present, then create it empty.'''
        log.info('(re-)creating %s: %s',
                 name, create_stmt)
        with dbtrx() as work:
            for stmt in (drop_stmt, create_stmt):
                work.execute(stmt)

    def insert(_, rows):
        '''Insert `rows` (a sized collection of row sequences).'''
        with dbtrx() as work:
            work.executemany(insert_stmt, rows)
        row_qty = len(rows)
        log.info('inserted %d rows into %s', row_qty, name)

    return [recreate, insert], dict(name=name)
124
125
def sql_for(table, cols):
    '''Build the `create table` and `insert` statements for a table.

    :param table: table name, interpolated as-is.
    :param cols: sequence of column names; must support `len()`.
    :return: `(create_stmt, insert_stmt)`, where the insert statement
             carries one `?` placeholder per column.

    >>> create, insert = sql_for('t1', ['cx', 'cy', 'cz'])
    >>> print(create)
    create table t1 ( cx, cy, cz )
    >>> print(insert)
    insert into t1 (cx, cy, cz) values (?, ?, ?)
    '''
    names = ', '.join(cols)
    marks = ', '.join('?' for _ in range(len(cols)))
    create_stmt = 'create table %s ( %s )' % (table, names)
    insert_stmt = 'insert into %s (%s) values (%s)' % (table, names, marks)
    return create_stmt, insert_stmt
142
143
def stagingTables(data):
    '''Parse the list of tables used in staging.

    Everything up to and including the first line that contains
    `TABLES =` is skipped, so a name on that same line is not picked
    up.  Each later line is reduced to its letters, digits and
    underscores and upper-cased; lines with nothing left (a closing
    parenthesis, blank or whitespace-only lines, a stray carriage
    return) are dropped rather than reported as an empty table name.

    :param data: text of the staging parameter file.
    :return: list of upper-case table names, in file order.
    :raises ValueError: if no line contains `TABLES =`.
    '''
    # TABLES = (
    # 'clarity_medication',
    # ...
    tlines = dropwhile(lambda l: 'TABLES =' not in l, data.split('\n'))
    # skip TABLES =; the builtin next() works on Python 2.6+ and 3
    if next(tlines, None) is None:
        raise ValueError('no "TABLES =" line found in staging parameters')

    # Note:  We'll go with an upper-case convention
    cleaned = (re.sub('[^A-Za-z0-9_]', '', expr).upper()
               for expr in tlines)
    tables = [name for name in cleaned if name]

    log.info('loadTables: Found %d tables', len(tables))
    return tables
159
160
@maker
def ClarityTracking(wkb,
                    summary_sheet='Summary'):
    '''Rows of a workbook's summary sheet, as named tuples.

    Row 0 of the sheet supplies the field names; iterating yields the
    remaining rows, and `record` converts one such row to a tuple.

    :param wkb: workbook offering `sheet_by_name`.
    :param summary_sheet: name of the sheet to read.
    '''
    sheet = wkb.sheet_by_name(summary_sheet)
    short_names = {'Table': 'tab', 'Column': 'col'}

    # TODO: use \W or the like
    def sqlname(txt):
        # '/' and ' ' become '_'; two headings get short names.
        cleaned = txt.replace('/', '_').replace(' ', '_')
        return short_names.get(cleaned, cleaned)

    headings = [sqlname(cell.value) for cell in sheet.row(0)]
    R = namedtuple('ColumnTracking', headings)

    def __iter__(_):
        return (sheet.row(rownum) for rownum in range(1, sheet.nrows))

    def record(_, row):
        # Falsy cell values (e.g. '' or 0) are stored as None.
        values = [cell.value or None
                  for (_name, cell) in zip(R._fields, row)]
        return R(*values)

    # NOTE(review): nothing in this block defines `filter`, so this
    # looks like the builtin; possibly a leftover -- confirm.
    return [filter, __iter__, record], {}
184
185
def trackingCols(dbtrx, tk_table, code_table,
                 qtpl="""
                 select distinct tab, col
                 from %(tk)s
                 join %(code)s on %(tk)s.tab = %(code)s.src_table
                 """):
    '''Find the tracked columns whose table appears in the ETL code.

    :param dbtrx: zero-argument callable giving a context manager whose
                  value supports `execute` and `fetchall`.
    :param tk_table: name of the tracking table (needs `tab`, `col`).
    :param code_table: name of the dependency table (needs `src_table`).
    :param qtpl: query template with `%(tk)s` and `%(code)s` slots;
                 table names are interpolated as-is.
    :return: whatever `fetchall` gives for the query.
    '''
    query = qtpl % {'tk': tk_table, 'code': code_table}
    log.info('trackingCols query: %s', query)
    with dbtrx() as cursor:
        cursor.execute(query)
        pairs = cursor.fetchall()
    found = len(pairs)
    log.info('%d columns from tables used in ETL code', found)
    return pairs
199
200
@maker
def HeronSource(
        rd,
        staging_par='heron_staging/Clarity_data_load/clarity_import.par'):
    '''The HERON ETL source directory.

    :param rd: directory object offering `iterdir()` and `/`.
    :param staging_par: path, relative to `rd`, of the staging
                        parameter file; exposed as the `staging_par`
                        attribute.
    '''
    def findcols(_, dbtrx, cols, dest_name):
        '''Record where each tracked column name occurs in .sql files.

        Only entries directly under `rd` with a `.sql` suffix are
        read.  A hit is a case-insensitive substring match of the
        column name anywhere on a line; the table name is stored with
        the hit but plays no part in matching.

        NOTE(review): `code_line` comes straight from `enumerate`, so
        the first line of a file is 0 -- confirm that consumers expect
        0-based line numbers.

        :param dbtrx: database access, passed to `ExportTable`.
        :param cols: iterable of `(tab, col)` pairs; pairs with a
                     falsy member are ignored.
        :param dest_name: name of the table to (re-)create and fill.
        '''
        dest = ExportTable(dbtrx, dest_name,
                           ['code_filename', 'code_line', 'tab', 'col'])
        dest.recreate()

        # Lower-case each column name once, not once per source line.
        wanted = [(tab, col, col.lower())
                  for (tab, col) in cols
                  if tab and col]

        for etlcode in sorted(rd.iterdir(), key=lambda r: r.name):
            if etlcode.suffix != '.sql':
                continue

            hits = []
            # `with` closes each source file once it has been scanned
            # (assumes open() gives a context manager, as the
            # `with ... .open()` in CLI.csv_data already does).
            with etlcode.open() as lines:
                for lineno, line in enumerate(lines):
                    text = line.lower()
                    hits.extend((etlcode.name, lineno, tab, col)
                                for (tab, col, needle) in wanted
                                if needle in text)

            if not hits:
                continue

            log.info('%d hits for %s', len(hits), etlcode.name)
            dest.insert(hits)

    return [findcols], dict(staging_par=rd / staging_par)
226
227
if __name__ == '__main__':
    def _privileged_main():
        '''Collect the privileged I/O functions and pass them to main().

        The imports are kept local so that only this entry point, not
        the module as a whole, holds file, database and argv access.
        '''
        try:
            from __builtin__ import open as openf  # Python 2
        except ImportError:
            from builtins import open as openf  # Python 3
        from sys import argv, stdout
        from sqlite3 import connect
        from os import listdir

        from xlrd import open_workbook

        def cli_access():
            # Logging is configured here, before docopt runs inside
            # CLI, so --debug is looked for in the raw argv.
            logging.basicConfig(
                level=logging.DEBUG if '--debug' in argv else logging.INFO)
            return CLI(argv, openf, listdir, open_workbook, connect)

        main(stdout, cli_access)
    _privileged_main()
Note: See TracBrowser for help on using the repository browser.