source: heron_load/heron_build.py

heron-michigan
Last change on this file was 0:42ad7288920a, checked in by Matt Hoag <mhoag@…>, 2 years ago

Merge with demo_concepts_3800

File size: 42.4 KB
Line 
1'''heron_build.py -- tasks for building HERON i2b2 repositories
2---------------------------------------------------------------
3
4The :func:`heron_load` task
5  1. transforms data from clinical sources into the identified repository
6  2. copies data to the de-identified repository, modifying as appropriate
7
8Logging
9*******
10
11Output is logged to a file named by date if configuration gives a directory::
12
13  >>> options = _option  # un-hide for testing
14  >>> options.log_dir is None
15  True
16
17Using paver command line conventions, you can do::
18
19  > paver log_dir=~/heron_etl_logs heron_load
20
21
22Managing UPLOAD_STATUS
23**********************
24
25UPLOAD_STATUS records are managed automatically. To add a message, use:
26
27  >>> options.message is ''
28  True
29
30Oracle Database Access
31**********************
32
33See heron-test.ini.example for sample configuration.
34
35  >>> _, flat = flat_options(read_config=config_res(options.config))
36  >>> options.update(flat)
37  >>> _ = (options.id_sid, options.id_host, int(options.id_port), ID_SCHEMA)
38  >>> _ = (options.deid_sid, options.deid_host, int(options.deid_port))
39  >>> DEID_SCHEMA
40  'BLUEHERONDATA'
41
42Usernames and passwords can be supplied various ways;
43see :mod:`db_util` for details.
44
45In production, run these tasks on the identified data server
46so that the data never leaves that machine.  For production
47settings, see `heron-prod1.ini` and `heron-prod2.ini`.
48
49
50:copyright: Copyright 2010-2013 University of Kansas Medical Center
51            part of the `HERON open source codebase`__;
52            see NOTICE file for license details.
53
54__ http://informatics.kumc.edu/work/wiki/HERON
55'''
56
57from ConfigParser import SafeConfigParser
58from contextlib import contextmanager
59from functools import wraps
60import datetime
61import logging
62import pkg_resources as res
63import re
64import types
65
66from paver.easy import task, needs, might_call, no_help
67from paver.tasks import environment as paver_env
68
69
70def _paver_import_work_around():
71    import sys
72    import os
73    sys.path.append(os.path.dirname(__file__))
74_paver_import_work_around()
75
76from db_util import run_script, insert_dml, fold_col_case
77import db_util
78
# Module-wide logger; handler/level configuration is left to the caller.
log = logging.getLogger(__name__)
# Oracle schema names for the identified and de-identified repositories.
ID_SCHEMA = 'NIGHTHERONDATA'
DEID_SCHEMA = 'BLUEHERONDATA'
82
83
def flat_options(expanduser=lambda path: path,  # lambdas just document types
                 getuser=lambda: 'nobody',
                 read_config=lambda cp: None):
    '''Read configuration and flatten it into section_option keys.

    :return: (parser, flat) where flat maps '<section>_<option>' to its
             value, skipping the sections reserved by the Python
             logging.config file format.
    '''
    defaults = dict(home=expanduser('~'), me=getuser())
    parser = SafeConfigParser(defaults)
    read_config(parser)

    # Sections named logger*/handler*/formatter* belong to the Python
    # logging.config API, not to ETL settings.
    reserved = ('logger', 'handler', 'formatter')
    flat = {}
    for section in parser.sections():
        if section.startswith(reserved):
            continue
        for option, value in parser.items(section):
            flat['%s_%s' % (section, option)] = value
    return parser, flat
100
101
def config_res(n):
    '''Build a reader that loads config from a package resource.

    Reading via pkg_resources (rather than a relative path) keeps tests
    independent of the current working directory.
    '''
    def read_config_res(cp):
        stream = res.resource_stream(__name__, n)
        cp.readfp(stream, n)
    return read_config_res
109
110
def make_dbs(events,
             # these lambdas just document types
             get_hostport=lambda section: 1 / 0,
             key=lambda section: 1 / 0,
             ora_connect=lambda host_port, sid, username, password: 1 / 0):
    '''Construct DBServer and database handles for the id/deid servers.

    :return: dict with id_server, deid_server, id_db, deid_db entries.
    '''
    def server(section):
        # One DBServer per config section; events/key/connect shared.
        return db_util.DBServer(
            section, get_hostport(section), events, key,
            connect=ora_connect)

    id_server = server('id')
    deid_server = server('deid')
    return dict(id_server=id_server,
                deid_server=deid_server,
                id_db=id_server.database('id'),
                deid_db=deid_server.database('deid'))
125
126
127def _kwd_arg_transform(func_to_wrap,
128                       capture_release_list=[],
129                       squelch_args=[]):
130    @wraps(func_to_wrap)
131    def do_capture_eval(*args, **kwds):
132        for closure_key in capture_release_list:
133            closure_value = kwds.pop(closure_key, None)
134            if(closure_value):
135                if(not isinstance(closure_value, types.FunctionType)):
136                    raise TypeError(
137                        'The keyword argument: \'{kwd}\' must reference a '
138                        'function.\nThe type of value \'{val}\' is instead '
139                        '{type}'.format(kwd=closure_key, val=closure_value,
140                                        type=type(closure_value)))
141                import inspect
142                supply_args = inspect.getargspec(closure_value).args
143                func_kwds = dict((s_arg, kwds[s_arg]) for s_arg in supply_args)
144                kwds[closure_key] = closure_value(**func_kwds)
145
146        for arg in squelch_args:
147            kwds.pop(arg)
148
149        return func_to_wrap(*args, **kwds)
150    return do_capture_eval
151
152
def _test_kwd_arg_transform():
    '''
    **For example**, to construct a function `show_item` and decorator
    (`show_item_dec`) to invoke it:

    .. note:: `show_item` needs an `item` but in this case it
              cannot be supplied directly to the decorator.

    At the time the decorator is being constructed, it does not have
    the capability to take an item from the hat; that capability only
    becomes available when the decorated def is invoked.

    Thus, a closure to grab the item (`grab_item`) is supplied to the decorator
    (under the variable name `item`) and a capture_release_list (`spec`)
    identifying the fact that the value assigned to 'item' should be captured
    and released.

    >>> show_item = lambda item: item
    >>> grab_item = lambda cap, hat: cap['grab_first'](hat)
    >>> def show_item_dec(func, spec, *dec_args, **dec_kwds):
    ...     def push(fn):
    ...         def call(cap={'grab_first': lambda hat: 'Got a ' + hat[0]}):
    ...             show_item_wrap = _kwd_arg_transform(func,
    ...                                                 spec, ['cap', 'hat'])
    ...             return show_item_wrap(cap=cap, *dec_args, **dec_kwds)
    ...         return call
    ...     return push
    ...
    >>> @show_item_dec(show_item,
    ...                spec=['item'],
    ...                item=grab_item,
    ...                hat=['rabbit', 'bat'])
    ... def test_the_decorator():
    ...     pass
    ...
    >>> test_the_decorator()
    'Got a rabbit'

    Be sure to **use function values, not function names**:

    >>> invalid_grab_item = 'invalid function'
    >>> @show_item_dec(show_item,
    ...                spec=['item'],
    ...                item=invalid_grab_item, hat=['rabbit', 'bat'])
    ... def test_the_invalid_decoration():
    ...     pass
    ...
    >>> test_the_invalid_decoration()
    Traceback (most recent call last):
      ...
    TypeError: The keyword argument: 'item' must reference a function.
    The type of value 'invalid function' is instead <type 'str'>

    While 3 arguments come in to the wrapped version of show_item,
    after 2 of them are **squelched**, only 1 goes into show_item.

    >>> show_item_invalid = lambda cap, item, hat: item
    >>> @show_item_dec(show_item_invalid,
    ...                spec=['item'],
    ...                item=grab_item, hat=['rabbit', 'bat'])
    ... def test_the_invalid_show_item():
    ...     pass
    >>> test_the_invalid_show_item()
    ... #doctest:+ELLIPSIS
    Traceback (most recent call last):
      ...
    TypeError: <lambda>() takes exactly 3...
    '''
    pass
222
223
def run_scripts(scripts,
                capture_release_list=[],
                squelch_args=['options'],
                *load_args,
                **load_kwds):
    '''Decorator to run scripts prior to a task

    At the time the task is decorated, the `db` argument for
    :func:`db_util.run_script` is not available. So we delay
    evaluation of `db` using `capture_release_list` and a function
    that will get it at task invocation time::

      >>> @run_scripts(
      ...     ('i2b2_star_truncate.sql', 'metadata_init.sql'),
      ...      capture_release_list=['db'],
      ...      db=lambda options: options['id_server'].database('clarity'),
      ...      variables=dict(star='I2B2_DEMO'))
      ... def test_run_scripts(options):
      ...     pass

      >>> import logging
      >>> options = db_util.DryDBI.make_options(logging.getLogger)
      >>> test_run_scripts(options=options)
      DryDBI.connect(DryDBI, user1, 6)
      DryDBI.connect(DryDBI, user1, 6)

    The available **magic parameters**, i.e. those provided by this
    decorator to the delay functions, are `options` and `fn` (of the
    script).

    :param scripts: the name of the script(s) to run
    :type scripts: sequence of `string`

    :param capture_release_list: list of variable names to capture,
                                 evaluate then release (the result) to
                                 the wrapped function.
    :type capture_release_list: sequence `string`
    :keyword db: database connection (*consider keyword closure*)
    :type db: required :class:`db_util.DBEngine`

    For keyword arguments `variables`, `params`, `ignore_error`, and
    `chunk_qty`, see :func:`db_util.run_script`.

    :raises: :class:`TypeError` if looking up a name from
             `capture_release_list` in the call's keyword arguments
             does not result in a function.

    .. note:: This decorator should always be *after* the `@needs`
              decorator otherwise the dependency chain will not be
              respected.

    '''
    # Accept a single script name as shorthand for a 1-tuple.
    scripts = scripts if not isinstance(scripts, basestring) else (scripts, )

    def make_run_script_task(task):
        @wraps(task)
        def do_run_script(options, *args, **kwds):
            # One logged event covers the whole batch of scripts.
            with options.events.event('run_scripts(%s)', task.__name__,
                                      name=task.__name__,
                                      task=task.__name__):
                for script in scripts:
                    # Evaluate delayed kwargs (e.g. db) now that
                    # options is in hand; see _kwd_arg_transform.
                    wrapped_run_script = _kwd_arg_transform(
                        db_util.run_script,
                        capture_release_list,
                        squelch_args)

                    wrapped_run_script(
                        fn=script,
                        options=options,
                        *load_args,
                        **load_kwds
                    )
            return task(options, *args, **kwds)
        return do_run_script
    return make_run_script_task
299
300
def run_id_scripts(inner):
    '''Shorthand for :func:`run_scripts` against the id repository.

    The `db` argument is delayed until task invocation, when
    options.id_db is available.
    '''
    def _grab_id_db(options):
        return options.id_db
    return run_scripts(inner,
                       capture_release_list=['db'],
                       db=_grab_id_db)
305
306
class CuratedData(object):
    '''Treat curated CSV files as if the rows were source code literals.
    '''
    # TODO: refactor load_curated_data to use this
    # TODO: move more terms stuff from .sql to .csv
    provider_terms = 'curated_data/provider_terms.csv'

    @classmethod
    def records(cls, resource):
        '''Read a curated CSV package resource.

        :return: (colnames, rows) where colnames are the header cells
                 upper-cased with spaces replaced by underscores.
        '''
        # TODO: move to new curated_data module?
        import csv
        import pkg_resources as pkg

        rows = csv.reader(pkg.resource_stream(__name__, resource))
        colnames = [n.replace(' ', '_').upper()
                    for n in rows.next()]
        return colnames, list(rows)

    @classmethod
    def load_terms(cls, resource, conn, clock, dest_table, folder, label):
        '''Replace the terms under `folder` in `dest_table` with the
        rows from `resource`, filling in standard i2b2 term columns.
        '''
        colnames, rows = cls.records(resource)
        # Boilerplate i2b2 metadata columns shared by every term row.
        normal_concept = dict(c_synonym_cd='N',
                              c_facttablecolumn='concept_cd',
                              c_tablename='concept_dimension',
                              c_columnname='concept_path',
                              c_columndatatype='T',
                              c_operator='like',
                              m_applied_path='@')
        shared_cols = dict(normal_concept,
                           import_date=clock.now(),
                           # KLUDGE: update_date should come from
                           # curated data, but I keep running into
                           # ORA-01861: literal does not match format string
                           update_date=clock.now())
        records = [dict(shared_cols.items() + zip(colnames, row))
                   for row in rows]
        statement = insert_dml(
            dest_table,
            fold_col_case(colnames + sorted(shared_cols.keys())))
        with conn.transaction_event(label) as t_e:
            work, event = t_e
            # Delete-then-insert keeps the folder's terms in sync with
            # the curated file.
            work.execute(
                """delete from %s
                where c_fullname like (:folder || '%%')""" % dest_table,
                dict(folder=folder))
            work.executemany(statement, records)
353
354
def load_curated_data(db, table_name, csv_file, create=True):
    '''Load curated data from a csv file into a designated table.

    When `create` is set, any existing table of that name is dropped
    first (errors from a missing table are ignored).
    '''
    # Event/script label derived from the file name.
    label = re.sub(r'\W+', '_', csv_file)
    with db.transaction(script=label) as work:
        if create:
            work.execute('drop table %s' % table_name, ignore_error=True)
        db_util.import_csv(work, csv_file, table_name, create=create)
363
364
def curated_data(db, table, csv_file, create=True):
    '''Decorator for specifying the curation of data.

    .. note:: The curation of data occurs *prior* to the execution of
              the decorated task.

    .. note:: This decorator should always be *after* the `@needs`
              decorator otherwise the dependency chain will not be
              respected.

    :param string db: name of a database option (e.g. 'id_db', 'deid_db')
    :param string table: name of the destination table
    :param string csv_file: name of the csv file with the curated data
    :param boolean create: drop/create the table before loading.
    '''
    def make_curated_task(task):
        '''Keep the original task behavior; curation runs first.'''
        @wraps(task)
        def curate_data(options, *args, **kwds):
            event = options.events.event('curated_data(%s)', task.__name__,
                                         name=task.__name__,
                                         task=task.__name__)
            with event:
                load_curated_data(options[db], table, csv_file, create)
            return task(options, *args, **kwds)
        return curate_data
    return make_curated_task
396
397
def view_loader(scripts,
                capture_release_list=[],
                *load_args,
                **load_kwds):
    '''Decorator for loading source views (transformations).

    .. note:: Creation of the views occurs *prior* to execution of the
              decorated task.

    .. note:: This decorator should always be *after* the `@needs`
              decorator otherwise the dependency chain will not be
              respected.

    :param scripts: file name(s) of scripts to be run.
    :param capture_release_list: see :func:`run_scripts`.

    See :func:`make_source_views` for `server`, `section`, `script`,
    `chunks`, and `variables` keyword args.

    The available **magic parameters** are: **options** and **script**.
    '''
    # A bare string means a single script.
    if isinstance(scripts, basestring):
        scripts = (scripts, )

    def make_view_task(task):
        '''Prepend the view-building work to the task's invocation,
        leaving the task's own behavior untouched.
        '''
        @wraps(task)
        def make_view(options, *args, **kwds):
            with options.events.event('view_loader(%s)', task.__name__,
                                      name=task.__name__,
                                      task=task.__name__):
                # The wrapper is loop-invariant; build it once.
                loader = _kwd_arg_transform(make_source_views,
                                            capture_release_list)
                for script in scripts:
                    loader(options=options,
                           script=script,
                           *load_args,
                           **load_kwds)
            return task(options, *args, **kwds)
        return make_view
    return make_view_task
443
444
def single_loader(transform_name,
                  capture_release_list=[],
                  *load_args,
                  **load_kwds):
    '''Decorator for specifying the load into the id server.

    :param string transform_name: name of the transform (likely the
                                  name of the view_loader being loaded)
    :capture_release_list: see :func:`run_scripts`

    The available **magic parameters** are: `options` and `transform_name`.

    See also :func:`run_load_script` for keyword parameters `script`,
    `label`, `dump`, `source_hint`, `chunks`, `variables`, and
    `postprocess`.

    .. note:: The load occurs *prior* to the execution of the
              decorated task.

    .. note: This decorator should always be `after` the `@needs`
    decorator otherwise the dependency chain will not be respected.
    '''
    def make_loader_task(task):
        '''Prepend the loading work to the task's invocation, leaving
        the task's own behavior untouched.
        '''
        @wraps(task)
        def do_load(options, *args, **kwds):
            '''Run the load script unless the transform is already loaded.
            '''
            with options.events.event('single_loader(%s)', task.__name__,
                                      name=task.__name__,
                                      task=task.__name__):
                already = is_transform_loaded(options, transform_name)
                if not already:
                    loader = _kwd_arg_transform(run_load_script,
                                                capture_release_list)
                    loader(options=options,
                           transform_name=transform_name,
                           *load_args, **load_kwds)
            return task(options, *args, **kwds)
        return do_load
    return make_loader_task
491
492
def multi_loader(transform_list,
                 capture_release_list=[],
                 *load_args,
                 **load_kwds):
    '''Decorator for specifying multiple loads into the id server.

    :param transform_list: the name of the transforms (likely the name
                           of the view_loader being loaded) and their
                           labels
    :type transform_list: sequence of (`string`, `string`) pairs

    :capture_release_list: see :func:`run_scripts`

    The available magic parameters are: **options**, **transform_name**,
    and **label**.

    See also :func:`run_load_script` for keyword parameters `script`,
    `label`, `dump`, `source_hint`, `chunks`, `variables`, and
    `postprocess`.

    .. note:: The load occurs *prior* to the execution of the
              decorated task.

    .. note: This decorator should always be `after` the `@needs`
             decorator otherwise the dependency chain will not be respected.

    '''
    def make_loader_task(task):
        ''' Wraps the incoming task as a multi_loader.  We wrap because we
        don't want to override the 'task' with new functionality, we only
        want to prepend the loading functionality to the __call__ invocation.
        '''

        @wraps(task)
        def do_load(options, *args, **kwds):
            '''Iterates over all the transforms in transform_list and
            loads each one that is not already there.
            '''
            with options.events.event('multi_loader(%s)', task.__name__,
                                      name=task.__name__,
                                      task=task.__name__):
                for transform_name, label in transform_list:
                    if not (is_transform_loaded(options, transform_name)):
                        wrapped_run_load_script = _kwd_arg_transform(
                            run_load_script,
                            capture_release_list)
                        wrapped_run_load_script(
                            options=options,
                            transform_name=transform_name,
                            label=label,
                            *load_args,
                            **load_kwds)
            # FIX: the task formerly ran *inside* the event `with` block,
            # unlike single_loader, which runs it after the event closes.
            # Dedented for consistency with single_loader.
            return task(options, *args, **kwds)
        return do_load
    return make_loader_task
548
549
def is_transform_loaded(options,
                        transform_name,
                        script=('is transform loaded? (no file)')):
    '''Report whether id.upload_status already has an OK load for
    the provided `transform_name`.
    '''
    query = ('select upload_id from nightherondata.upload_status '
             'where transform_name = :transform_name and '
             'load_status = \'OK\'')
    with options.id_db.transaction(script) as checker:
        checker.execute(query, dict(transform_name=transform_name))
        rows = checker.fetchall()
        return len(rows) > 0
562
563
@task
@curated_data('deid_db',
              'ncdr_manual_selections',
              'curated_data/ncdr_manual_selections.csv')
def load_ncdr_manual_selections(options):
    '''NCDR selections not explicitly parsed from the data dictionary.'''
570
571
@task
@curated_data('id_db',
              'zips_near_kumc', 'curated_data/zips_near_66160.csv')
def load_zip_data(options):
    '''Load datafile regarding the zip codes near KUMC'''
577
578
# Stacked decorators load the same file into both repositories.
@task
@curated_data('deid_db',
              'school_districts', 'curated_data/school_districts.csv')
@curated_data('id_db',
              'school_districts', 'curated_data/school_districts.csv')
def load_school_districts(options):
    '''Load datafile regarding the school districts

    into both id and de-id.
    '''
589
590
@task
@curated_data('deid_db',
              'code_sync_ex', 'curated_data/code_sync_ex.csv')
def load_code_sync_ex(options):
    '''Load data file containing exceptions for known concepts test'''
596
597
@task
@curated_data('deid_db',
              'enc_type_adt_map', 'curated_data/enc_type_adt_map.csv')
def load_enc_type_adt_map(options):
    '''Load encounter type to ADT Patient Class mappings'''
603
604
@task
@needs('dblink_deid_id')
@might_call('enhance_concepts')
def backup_pat_enc_mappings(options,
                            script='pat_enc_backup.sql'):
    '''Backup patient and encounter mappings
    '''
    # Runs pat_enc_backup.sql against the identified repository.
    db_util.run_script(options.id_db, fn=script)
613
614
def _ensure_dblink(conn, linkname, connect_string, test_cols, test_table):
    '''(Re)create a public database link and verify that it works.

    Any existing link is dropped first (the error from a missing link
    is ignored).  The `line` arguments label the statements for
    logging/diagnostics.
    '''
    paver_env.info('_ensure_dblink: %s (%s)' % (linkname, connect_string))
    with conn.transaction(script='$ensure_dblink') as work:
        work.execute("drop public database link %s" % linkname,
                     ignore_error=True, line=1)
        work.execute("create public database link %s "
                     "using %s" % (linkname, connect_string), line=2)
        # Now test that it works: a where-1=0 probe exercises the
        # remote reference without fetching any rows.
        work.execute("select %s from %s@%s where 1=0" % (
            test_cols, test_table, linkname), line=3)
625
626
@task
def dblink_id_deid(options,
                   linkname='deid'):
    '''Ensure database link 'deid' from identified repository to de-identified.

    We assume any necessary ssh tunnel is already running.
    '''
    # Probe column/table confirm the link reaches BlueHeronData.
    _ensure_dblink(options.id_db, linkname,
                   db_util.connect_string(options.deid_sid,
                                          options.deid_host,
                                          options.deid_port),
                   'patient_num', 'BlueHeronData.patient_dimension')
639
640
@task
def dblink_deid_id(options,
                   linkname='id'):
    '''Ensure a database link from the de-identified repository to clarity.

    NOTE(review): the link is created under `linkname` (default 'id'),
    but the old docstring called it 'epic'; it connects to the clarity
    SID via localhost -- confirm which link name downstream SQL expects.

    We assume any necessary ssh tunnel is already running.

    .. todo: consider eliminating the need for this link (see #482)
    '''
    _ensure_dblink(options.deid_db, linkname,
                   db_util.connect_string(options.clarity_sid,
                                          'localhost',
                                          options.clarity_concept_port),
                   'pat_id', 'clarity.patient')
655
656
@task
@needs('dblink_id_deid',
       'dblink_deid_id')
def ensure_ssh_tunnels():
    '''set up dblinks 1st to check ssh connections'''
    # The dblink tasks fail fast if the tunnels are down.
    pass
663
664
@task
@no_help
@view_loader('schemes.sql', server='deid_server', section='deid')
def make_scheme_labels(options):
    # All the work happens in the view_loader decorator (schemes.sql
    # on the deid server).
    pass
670
671
@task
@needs('ensure_ssh_tunnels',
       'load_epic_dimensions',
       'load_idx_dimensions',)
def load_dimensions():
    '''Aggregate task: load dimensions from Epic and IDX.'''
    pass
678
679
@task
@needs('load_epic_demographics',
       'load_idx_demographics',
       'ssdmf_load')
def load_heron_demographics():
    '''Aggregate task: demographics from Epic, IDX, and SSDMF.'''
    pass
686
687
@task
@needs('load_idx_clinical_facts',
       'load_epic_diagnosis',
       'load_epic_billing_diagnosis')
def load_heron_dx():
    '''Aggregate task: diagnosis facts from IDX and Epic.'''
    pass
694
695
@task
@needs('load_epic_services',
       'load_epic_enc_vitals')
def load_visit_details():
    '''Aggregate task: visit detail facts from Epic.'''
    pass
701
702
# create=False: rows are appended to an existing table in both repos.
@task
@curated_data('id_db', 'etl_tests', 'curated_data/etl_tests.csv',
              create=False)
@curated_data('deid_db', 'etl_tests', 'curated_data/etl_tests.csv',
              create=False)
def load_etl_tests(options):
    '''Load ETL tests descriptions, thresholds, etc from .csv.
    '''
    pass
712
713
@task
@needs('load_etl_tests')
@run_scripts('validate_test_data.sql',
             capture_release_list=['db'],
             db=lambda options: options.id_db)
def validate_test_data(options):
    '''Validates that certain custom edge cases exist in the staged test data.
    '''
    # The validation itself is done by the run_scripts decorator.
    pass
723
724
725def _ensure_dump_file_access(options, dump_types=('bsr', 'uhc',
726                                                  'ssdmf', 'naaccr',
727                                                  'ssdmf', 'idx',
728                                                  'clarity', 'redcap')):
729    ''' Make sure that dump files exist and can be read.
730
731    >>> options = _option  # un-hide for testing
732    >>> options.path_for = MockPathFor(['ok_dump', 'noread_dump'])
733
734    What if a dump isn't specified in the ini?
735
736    >>> _ensure_dump_file_access(options, dump_types = ('silly',))
737    Traceback (most recent call last):
738      ...
739    AttributeError: silly_dump
740
741    What if a dump file isn't readable?
742
743    >>> _ensure_dump_file_access(options, dump_types = ('ok', 'noread'))
744    Traceback (most recent call last):
745      ...
746    IOError: /path/to/noread_dump is not readable.
747    '''
748    for s in dump_types:
749        dump_path = getattr(options.path_for, s + '_dump')
750        if not dump_path.exists():
751            raise IOError('%s is not readable.' % dump_path)
752
753
class MockPath(object):
    '''Test double for a path object: it "exists" iff its name
    contains 'ok'.
    '''
    def __init__(self, pathname):
        self.n = pathname

    def exists(self):
        # Existence is faked from the name itself.
        return self.n.find('ok') >= 0

    def __str__(self):
        return self.n
763
764
class MockPathFor(object):
    '''Test double mapping attribute access to MockPath instances,
    but only for the names listed as ok.
    '''
    def __init__(self, ok):
        self.ok = ok

    def __getattr__(self, attr):
        if attr not in self.ok:
            raise AttributeError(attr)
        return MockPath('/path/to/%s' % attr)
774
775
@task
def ensure_dump_file_access(options):
    ''' Make sure that dump files exist and can be read.
    '''
    # Thin task wrapper; the logic (and its doctests) live in
    # _ensure_dump_file_access.
    _ensure_dump_file_access(options)
781
782
@task
@needs('dblink_id_deid', 'dblink_deid_id', 'ensure_dump_file_access')
def ensure_db_connections(options,
                          script='Check DB Connections',
                          id_sections=('id', 'clarity'),
                          deid_sections=('deid',),
                          skip_sections=('DBA', 'i2b2_id', 'i2b2_deid')):
    ''' Check for database connectivity.

    Every config section that declares a `_sid` key must appear in
    id_sections, deid_sections, or skip_sections; each id/deid section
    is then probed with a trivial query.
    '''
    # Sections are recognized by their <section>_sid config keys.
    config_sections = sorted([k[:-4] for k in options.keys()
                              if k.endswith('_sid')])
    expected_sections = sorted(id_sections + deid_sections + skip_sections)
    assert (config_sections == expected_sections), (
        'section mismatch: missing: %s, extra: %s' %
        (set(expected_sections) - set(config_sections),
         set(config_sections) - set(expected_sections)))

    # `select 1 from dual` is the minimal Oracle connectivity probe.
    for section in id_sections:
        with options.id_server.database(section).transaction(script) as chk:
            chk.execute('select 1 from dual')
    for section in deid_sections:
        with options.deid_server.database(section).transaction(script) as chk:
            chk.execute('select 1 from dual')
806
807
@task
@needs('ensure_dump_file_access', 'ensure_db_connections', 'validate_test_data')
def check_etl_ready(options):
    ''' Check for database connectivity and dump file access and run test data
        validation.
    '''
    # Pure aggregation; all work is in the @needs dependencies.
    pass
815
816
@task
@needs(
       # Demographics: needed for de-identification of other facts  # noqa
       'load_heron_demographics',
       # My Chart facts are part of the demographics Terminology
       'load_mychart_facts',
       # Allergies come 1st in the Terminology: alphabetical order
       'load_epic_allergy',
       # Cancer Cases
       'load_tumor_facts',
       # Diagnoses (and Procedures)
       'load_heron_dx',
       # Admission Discharge and Transfer Visit Details
       'load_epic_adt_visit_details',
       # Discharge Disposition Codes
       'load_epic_disch_disp_codes',
       # Flowsheets
       'load_epic_flowsheets',
       # History
       'load_epic_medical_history',
       'load_epic_social_history',
       # Lab
       'load_epic_labs',
       # Alerts (was mislabeled "Lab")
       'load_epic_alerts',
       # Medication facts, presumably -- was mislabeled "Alerts";
       # TODO confirm
       'load_epic_med_facts',
       # Microbiology
       'load_epic_microbiology',
       # Order Sets, Procedure Orders
       'load_epic_orders',
       # Redcap
       'load_redcap_clinical_facts',
       # Reports
       'load_epic_notes',
       # Specimens
       'bsr_load',
       # UHC
       'load_uhc_clinical_facts',
       # Visit Details
       'load_visit_details',
       'load_epic_ed_episode',
       # NCDR
       'load_ncdr_facts')
def load_all_facts():
    '''Aggregate task depending on every fact-loading task.'''
    pass
863
864
@task
@needs('check_etl_ready',
       # Construct views so that their tests fail early.
       # TODO: replace load_dimensions task by dependencies from load tasks.
       'load_dimensions',
       'load_all_facts',
       'index_deid_facts',
       'index_id_facts',
       'deid_verify_age90',
       'deid_verify_med_orders',
       'deid_verify_inst_num',
       'verify_ob_fact_enc_ide',

       # count_facts_by_concept should be last.
       'count_facts_by_concept')
def heron_load():
    '''Top-level task: load HERON from Epic, IDX.

    All of the work happens in the dependency tasks above.
    '''
884
885
@task
@might_call('index_deid_facts')
def count_facts_by_concept(options,
                           script='count_facts_by_concept.sql'):
    '''Populate the table of fact counts per concept code.

    :param script: SQL run against the de-identified datamart.
    '''
    deid = options.deid_db
    db_util.run_script(deid, fn=script)
893
894
@task
def prune_id_facts(options,
                   script='i2b2_facts_prune.sql'):
    '''Remove identified facts left behind by incomplete uploads
    (those whose load_status is null).
    '''
    db_util.run_script(options.id_db,
                       fn=script,
                       variables={'star': ID_SCHEMA})
904
905
@task
def prune_deid_facts(options,
                     script='i2b2_facts_prune.sql'):
    '''Remove de-identified facts left behind by incomplete uploads
    (those whose load_status is null).
    '''
    db_util.run_script(options.deid_db,
                       fn=script,
                       variables={'star': DEID_SCHEMA})
915
916
class UploadStatus(object):
    '''A proxy for UPLOAD_STATUS in an i2b2 datamart.

    Each record in the datamart is related to an upload_status; this allows
    batches of records to be distinguished even after they are uploaded.

    Use `self.upload_id` to refer to the upload_id allocated in the creation
    of this record.
    '''

    table = 'nightherondata.upload_status'
    # Columns that update() may touch; _update_set() rejects anything else.
    columns = (
        "UPLOAD_ID",  # fixed
        "UPLOAD_LABEL",
        "USER_ID",
        "SOURCE_CD",
        "NO_OF_RECORD",
        "LOADED_RECORD",
        "DELETED_RECORD",
        "LOAD_DATE",
        "END_DATE",
        "LOAD_STATUS",
        "MESSAGE",
        "INPUT_FILE_NAME",
        "LOG_FILE_NAME",
        "TRANSFORM_NAME")

    @classmethod
    @contextmanager
    def make(cls, conn, meta, label, user_id,
             input_file_name, source_cd_hint):
        '''Allocate an upload_id, insert an UPLOAD_STATUS row, yield proxy.

        :param conn: identified datamart access; the row is inserted
                     within its transaction_event.
        :param meta: metadata access used to resolve source_cd_hint
                     (see _cd_from_hint).
        :param label: audit label; non-word chars become '_' in the
                      transaction_event script name.
        '''
        # r'\W+' is a raw string: '\W' would be an invalid escape
        # in an ordinary string literal.
        with conn.transaction_event(script=re.sub(r'\W+', '_', label)) as w_e:
            work, events = w_e
            with meta.transaction(script='$source_cd_lookup') as q:
                cd = cls._cd_from_hint(q, source_cd_hint)
            job = cls(work, events, upload_id=cls._nextid(work))
            job._insert(label, user_id, input_file_name,
                        source_cd=cd)
            yield job

    @classmethod
    def _nextid(cls, work):
        '''Get the next upload_id from the Oracle sequence.'''
        # note: first parameter renamed self -> cls (this is a classmethod)
        work.execute(
            'select nightherondata.SQ_UPLOADSTATUS_UPLOADID.nextval'
            ' from dual')
        return work.fetchone()[0]

    @classmethod
    def _cd_from_hint(cls, q, hint):
        '''Resolve a source_cd from its hint prefix.

        e.g. _cd_from_hint(q, 'Epic') = 'Epic@kumed.com'

        :raises ValueError: unless exactly one source_cd matches.

        see also metadata_init.sql
        '''
        q.execute("select source_cd from BlueHeronData.source_master"
                  " where source_cd like (:hint || '@%')",
                  dict(hint=hint))
        ans = q.fetchall()

        if len(ans) != 1:
            raise ValueError('bad source_cd hint %s for %s' % (
                hint, 'BlueHeronData.source_master'))
        return ans[0][0]

    def __init__(self, work, events, upload_id):
        # work: DB transaction for DML; events: reporting sink
        self._tx = work
        self._events = events
        self.upload_id = upload_id

    def _insert(self, label, user_id, input_file_name, source_cd):
        '''Insert the UPLOAD_STATUS row for this upload.

        :param label: a label for related facts for audit purposes
        :param user_id: an indication of who uploaded the related facts
        :param input_file_name: path object for input file (e.g. clarity.dmp)
        :param source_cd: value for upload_status.source_cd
        '''
        self._tx.execute(
            """
            insert into nightherondata.upload_status
            (upload_id, upload_label, user_id,
              source_cd,
              load_date, input_file_name)
            values (:upload_id, :label, :user_id,
              :source_cd,
              sysdate, :filename)""",
            dict(upload_id=self.upload_id,
                 label=label,
                 user_id=user_id,
                 source_cd=source_cd,
                 filename=input_file_name))

    def update(self, **args):
        '''Update SQL fields using python arguments.
        For example::

           r.update(load_status='OK')

        :raises TypeError: for any argument that is not an
                           UPLOAD_STATUS column (see _update_set).
        '''
        self._tx.execute(('update ' + self.table + ' '
                          + self._update_set(**args)
                          + ' where upload_id = :upload_id'),
                         dict(args, upload_id=self.upload_id))

    def log_record_counts(self):
        '''Report total vs. loaded record counts for this upload.'''
        self._tx.execute('select no_of_record, loaded_record'
                         ' from %s where upload_id=:upload_id' % self.table,
                         parameters=dict(upload_id=self.upload_id))
        total, loaded = self._tx.fetchone()
        self._events.report(upload_id=self.upload_id,
                            z_total=total, z_loaded=loaded)

    @classmethod
    def _update_set(cls, **args):
        '''Build the SET clause, with bind placeholders, for update().

        Columns are emitted in sorted order so the result no longer
        depends on dict iteration order (the previous doctest relied
        on CPython 2 hash ordering).

        >>> UploadStatus._update_set(message='done', no_of_record=1234)
        'set message=:message, no_of_record=:no_of_record'

        :raises TypeError: for a key that is not in cls.columns.
        '''
        for k in args:
            if k.upper() not in cls.columns:
                raise TypeError('bad column: %s' % k)

        return 'set ' + ', '.join('%s=:%s' % (k, k) for k in sorted(args))
1037
1038
def run_load_script(options,
                    script, label, dump, transform_name, source_hint,
                    chunks=5,
                    chunk_param='part',
                    variables=None,
                    addl_params=None,
                    preprocess=(lambda options, transform_name,
                                chunks, chunk_param: 0),
                    postprocess=lambda options, upload_id: 1 / 0):
    '''Runs a script and load data into the id server

    Wraps the script run in an UPLOAD_STATUS record: the row is created
    first, updated with context before the run, and marked 'OK' (with
    timing and record counts) afterward.

    :param script: SQL script to run against options.id_db.
    :param label: audit label for the UPLOAD_STATUS row.
    :param dump: input file object; must provide `.mtime` (used as the
                 download_date) and a useful str() (stored as
                 input_file_name).
    :param source_hint: prefix resolved to a source_cd
                        (see UploadStatus._cd_from_hint).
    :param chunks, chunk_param: chunked-execution settings passed
                                through to run_script.
    :param variables, addl_params: extra script variables / bind params.
    :param preprocess: called as preprocess(options, transform_name,
                       chunks, chunk_param) before the run; default no-op.
    :param postprocess: called as postprocess(options, upload_id) after
                        a successful load; NOTE(review): the default
                        `1 / 0` raises ZeroDivisionError, apparently to
                        force callers to supply one -- confirm intent.
    :return: whatever postprocess returns.
    '''
    with UploadStatus.make(options.id_db, options.deid_db,
                           label, options.getuser(),
                           input_file_name=str(dump),
                           source_cd_hint=source_hint) as job:
        preprocess(options, transform_name, chunks, chunk_param)

        # Record context before the (potentially long) script run.
        job.update(message=options.message,
                   log_file_name=str(options.log_file_name))
        download_date = datetime.datetime.fromtimestamp(dump.mtime)
        start_date = options.clock.now()

        # Oracle `LOG ERRORS ... REJECT LIMIT` clause; spliced into the
        # script only when an exceptions table is configured.
        log_fact_exceptions = (
            ("""
             log errors into %(table)s ('%(label)s')
                        reject limit %(limit)s""" %
             dict(table=options.id_fact_exceptions,
                  label=label,
                  limit=options.id_fact_exceptions_limit))
            if options.id_fact_exceptions else '')
        run_script(options.id_db, fn=script,
                   params=dict(addl_params or {},
                               upload_id=job.upload_id,
                               download_date=download_date,
                               part=None),
                   variables=dict(variables or {},
                                  heron_etl_chunks=chunks,
                                  log_fact_exceptions=log_fact_exceptions),
                   chunk_param=chunk_param,
                   chunk_qty=chunks)

        end_date = options.clock.now()
        duration = end_date - start_date
        # Mark success; the elapsed time is appended to the message.
        job.update(load_status='OK',
                   message='%s %s' % (options.message, duration),
                   end_date=end_date,
                   transform_name=transform_name)
        job.log_record_counts()
        # Sanity check: the Oracle sequence really yielded an integer.
        assert isinstance(job.upload_id, type(0))

        return postprocess(options, job.upload_id)
1090
1091
def make_source_views(options, server, section, script,
                      chunks=5, variables=None):
    '''Set up views to transform source data.

    :param string server: the name of server (e.g. 'id_server',
                         'deid_server', etc)
    :param string section: the name of the database section
                           (e.g. 'clarity', 'kumc', etc).
    :param int chunks: see :func:`db_util.run_script`.
    :param dict variables: see :func:`db_util.run_script`.

    `CREATE VIEW` and `SELECT ANY TABLE` privileges seem to be needed.
    '''
    merged = dict(variables) if variables else {}
    merged['heron_etl_chunks'] = chunks
    db = options[server].database(section)
    run_script(db, fn=script, variables=merged)
1108
1109
@task
@needs('dblink_deid_id', 'dblink_id_deid',
       'load_provider_terms',
       'load_epic_concepts', 'load_age_terms', 'load_med_map_curated_data',
       'load_risk_type_curated_data', 'load_tumor_concepts',
       'load_ncdr_concepts',
       'load_bsr_categories')
@might_call('count_facts_by_concept')
def load_source_concepts(options,
                         deid_scripts=('bsr_concepts_load.sql',
                                       'dem_concepts_load.sql'),
                         id_scripts=('uhc_concepts_load.sql',
                                     'umls_dx_concepts.sql',
                                     'epic_med_mapping.sql',
                                     'epic_dx_concepts.sql',
                                     'redcap_concepts_load.sql')):
    '''Load dem, UHC, BSR, med concepts into i2b2 de-identified repository.'''
    # De-identified scripts run first, then identified, as before.
    for db, scripts in ((options.deid_db, deid_scripts),
                        (options.id_db, id_scripts)):
        for fn in scripts:
            db_util.run_script(db, fn=fn)
1130
1131
@task
@no_help
@needs('load_source_concepts')
@run_scripts('concepts_merge.sql',
             capture_release_list=['db'],
             db=lambda options: options.deid_db)
def concepts_merge(options):
    '''Merge loaded source concepts (work done by concepts_merge.sql).'''
1140
1141
@task
@no_help
@might_call('concepts_merge')
@run_scripts('concepts_activate.sql',
             capture_release_list=['db'],
             db=lambda options: options.deid_db)
def concepts_activate(options):
    '''Activate merged concepts (work done by concepts_activate.sql).'''
1150
1151
@task
@needs('concepts_merge')
def report_dup_fullnames(options,
                         script='report_dup_fullnames.sql'):
    '''Write an exception-report entry for duplicate fullnames
    in the terms tree.
    '''
    deid = options.deid_db
    db_util.run_script(deid, fn=script)
1159
1160
@task
@needs('concepts_merge', 'load_tumor_shortcuts',
       'report_dup_fullnames', 'concepts_activate')
def load_all_concepts(options):
    '''Load every source concept and shortcut concept into heron_terms.

    All of the work happens in the dependency tasks.
    '''
1167
1168
@task
@needs('dblink_deid_id', 'load_code_sync_ex')
@might_call('count_facts_by_concept')
def gather_stats(options,
                 script='concept_stats.sql'):
    '''Tally facts and patients per concept/term.'''
    deid = options.deid_db
    db_util.run_script(deid, fn=script)
1176
1177
@task
@might_call('load_all_concepts')
def add_tooltips(options,
                 script='concept_tooltips.sql'):
    '''Fill in a tooltip for each concept that is missing one.'''
    deid = options.deid_db
    db_util.run_script(deid, fn=script)
1184
1185
@task
@needs('gather_stats', 'add_tooltips')
def enhance_concepts():
    '''Gather stats and fill in missing tooltips (see dependency tasks).'''
1191
1192
@task
@needs('dblink_id_deid')
@might_call('load_heron_demographics')
def deid_verify_age90(options,
                      script='epic_i2b2_deid_verify.sql'):
    '''Run the de-identification verification script against the
    identified datamart.
    '''
    identified = options.id_db
    db_util.run_script(identified, fn=script)
1199
1200
@task
@needs('dblink_id_deid')
@might_call('load_epic_med_facts')
@run_scripts('epic_verify_deid_med_orders.sql',
             capture_release_list=['db'],
             db=lambda options: options.deid_db)
def deid_verify_med_orders(options):
    '''Confirm that no med_order_id appears among the instance_num
    values of the deid observation fact table.
    '''
1211
@task
@needs('dblink_id_deid')
@might_call('load_all_facts')
@run_scripts('verify_deid_instance_num.sql',
             capture_release_list=['db', 'variables'],
             db=lambda options: options.id_db,
             variables=lambda options: {'kupi': options.kupi_schema})
def deid_verify_inst_num(options):
    '''Confirm the instance_num doesn't match various identifiers.
    '''
1223
@task
@needs('dblink_id_deid')
@might_call('load_all_facts')
@run_scripts('verify_ob_fact_encounter_ide.sql',
             capture_release_list=['db'],
             db=lambda options: options.id_db)
def verify_ob_fact_enc_ide(options):
    '''Confirm that every fact has an associated encounter_ide.
    '''
1233
1234
@task
@needs('load_etl_tests')
def etl_tests_report(options, report_script='etl_tests_report.sql'):
    '''Produce the report for the ETL tests.'''
    identified = options.id_db
    run_script(identified, fn=report_script)
1241
1242
1243##############
1244# Trusted code
1245
1246from paver.easy import options as _option
1247
1248# TODO: Consider relying on config file for these.
1249_option(
1250    config='heron-test.ini.example',  # Default during development
1251    i2b2_source='mock_i2b2_source',
1252    id_sid=None,
1253    id_host=None,
1254    id_port=1521,  # oracle listener port
1255    id_fact_exceptions='NightHerondata.observation_fact_exceptions',
1256    id_fact_exceptions_limit='50',
1257    deid_sid=None,
1258    deid_host=None,
1259    deid_port=1521,
1260    message='',
1261    log_file_name=None,
1262    log_dir=None,
1263    timing_test_desc='Default timing test description.')
Note: See TracBrowser for help on using the repository browser.