Note: We no longer publish the latest version of our code here. We primarily use the kumc-bmi GitHub organization. The HERON ETL repository, in particular, is not public. Peers in the informatics community should see MultiSiteDev for details on requesting access.

source: heron_load/i2b2_deid.py @ 0:42ad7288920a

heron-michigan tip
Last change on this file since 0:42ad7288920a was 0:42ad7288920a, checked in by Matt Hoag <mhoag@…>, 6 years ago

Merge with demo_concepts_3800

File size: 6.4 KB
Line 
1'''i2b2_deid.py -- tasks to de-identify data from one i2b2 datamart to another
2------------------------------------------------------------------------------
3
4Records are de-identified in chunks according to upload_id.
5The default is `upload_id` from all `upload_status` records
6where `load_status` is not null::
7
8  >>> options = _option  # un-hide for testing
9  >>> options.upload_ids
10  'all'
11
12.. todo:: add support for update_date. date shift?
13
14:copyright: Copyright 2010-2013 University of Kansas Medical Center
15            part of the `HERON open source codebase`__;
16            see NOTICE file for license details.
17
18__ http://informatics.kumc.edu/work/wiki/HERON
19'''
20
21from paver.easy import task, needs, might_call
22
23import sys, os  # work around paver import wierdness
24sys.path.append(os.path.dirname(__file__))
25import db_util
26from heron_build import ID_SCHEMA, DEID_SCHEMA, load_zip_data
27from heron_build import load_school_districts
28
29DEID_CHUNKS = 10
30
31
@task
@needs('dblink_id_deid')
def deid_dimensions(options,
                    script='i2b2_dimensions_deid.sql',
                    truncate_script='i2b2_star_truncate.sql'):
    '''Load the patient and visit dimensions into the de-id datamart.

    The datamart is truncated (using `truncate_script`) before
    `script` is run.
    '''
    # Hand off to the plain function; keywords keep this call
    # independent of do_deid_dimensions' parameter order.
    do_deid_dimensions(options,
                       truncate_script=truncate_script,
                       script=script)
43
44
def do_deid_dimensions(options,
                       upload_ids=None,
                       truncate_script=None,
                       script='i2b2_dimensions_deid.sql'):
    '''De-identify dimension data, optionally truncating first.

    :param options: paver options; `deid_db` is read here when
                    truncating.
    :param upload_ids: upload ids to process; None means let
                       _run_deid_script look them up.
    :param truncate_script: when truthy, this script is run against
                            `options.deid_db` (with `star` bound to
                            DEID_SCHEMA) before anything else.
    :param script: the dimension de-identification script.
    '''
    # This lives in a plain function because paver messes up
    # arguments in calls to @task functions.
    if truncate_script:
        deid_db = options.deid_db
        truncate_vars = {'star': DEID_SCHEMA}
        db_util.run_script(deid_db, fn=truncate_script,
                           variables=truncate_vars)

    _run_deid_script(options, script, upload_ids, dimensions=True)
54
55
56def _run_deid_script(options, script, upload_ids, dimensions,
57                     variables=dict(), chunks=5):
58    if upload_ids is None:
59        upload_ids = _find_upload_ids(options, dimensions)
60    for upload_id in upload_ids:
61        db_util.run_script(options.id_db, fn=script,
62                           variables=dict(variables, heron_etl_chunks=chunks),
63                           params=dict(upload_id=upload_id, part=None),
64                           chunk_qty=chunks,
65                           chunk_param='part')
66
67
68def _find_upload_ids(options, dimensions=False):
69    if options.upload_ids == 'all':
70        ids = []
71        with options.id_db.transaction(script='$find_upload_ids') as q:
72            q.execute('''select upload_id, transform_name
73                      from %s.upload_status
74                      where load_status is not null''' % ID_SCHEMA)
75            for uid, tname in q.fetchall():
76                if dimensions == ('dimensions' in tname):
77                    ids.append(uid)
78        return ids
79    else:
80        return map(int, options.upload_ids.split(','))
81
82
@task
@needs('dblink_id_deid')
def deid_facts(options):
    '''Migrate HERON facts from the identified repository to the
    de-identified one.
    '''
    # All the work happens in the plain (non-task) function.
    do_deid_facts(options)
89
90
def do_deid_facts(options,
                  upload_ids=None,
                  script='i2b2_facts_deid.sql',
                  deid_instance_num=True):
    '''De-identify facts, with or without instance number mapping.

    Three script variables are set according to `deid_instance_num`:

    add_to_instance_mapping
      a condition preceded by a "where" clause in the script, so
      " 1 = 1 " causes the instance nums to be added to the instance
      mapping table whereas " 1 = 0 " does not.

    instance_num
      the column/table to use for the instance number.

    instance_mapping_join
      turns joining with the NightHeronData.instance_mapping table
      on and off.  Leaving it off is a significant performance
      speed-up when de-identifying facts where the instance number
      does not contain PHI.

    :param options: paver options, passed through to
                    _run_deid_script.
    :param upload_ids: upload ids to process; None means look them up.
    :param script: the fact de-identification script.
    :param deid_instance_num: whether to map instance numbers through
                              the instance mapping table.
    '''
    if deid_instance_num:
        sql_vars = {
            'add_to_instance_mapping': " 1 = 1 ",
            'instance_num': " im.deid_instance_num ",
            'instance_mapping_join': (
                " join NightHeronData.instance_mapping im"
                " on im.id_instance_num = f.instance_num "),
        }
    else:
        sql_vars = {
            'add_to_instance_mapping': " 1 = 0 ",
            'instance_num': " f.instance_num ",
            'instance_mapping_join': "",
        }

    _run_deid_script(options, script, upload_ids,
                     dimensions=False, variables=sql_vars)
117
118
def index_facts(options,
                db,
                star,
                tablespace,
                script='i2b2_facts_index.sql',
                index_opts='nologging parallel 6'):
    '''Run the fact indexing script against one database.

    :param options: not used here; kept so callers can pass their
                    paver options along.
    :param db: database to run the script against.
    :param star: value for the script's `star` variable.
    :param tablespace: tablespace name, appended to `index_opts`.
    :param script: the indexing script.
    :param index_opts: index options that go before the
                       "tablespace" clause.
    '''
    opts_with_tablespace = '%s tablespace %s' % (index_opts, tablespace)
    script_vars = {'star': star, 'index_opts': opts_with_tablespace}
    db_util.run_script(db, fn=script, variables=script_vars)
127
128
@task
@might_call('load_all_facts')
def index_deid_facts(options):
    '''Index the facts in the de-identified datamart.

    Uses `options.deid_db`, DEID_SCHEMA and
    `options.deid_index_tablespace`.
    '''
    index_facts(options,
                db=options.deid_db,
                star=DEID_SCHEMA,
                tablespace=options.deid_index_tablespace)
134
135
@task
@might_call('load_all_facts')
def index_id_facts(options):
    '''Index the facts in the identified datamart.

    Uses `options.id_db`, ID_SCHEMA and `options.id_index_tablespace`.
    '''
    index_facts(options,
                db=options.id_db,
                star=ID_SCHEMA,
                tablespace=options.id_index_tablespace)
141
142
@task
@needs('deid_dimensions', 'deid_facts', 'index_deid_facts')
def deid_all():
    '''Make a de-identified copy of the HERON repository.

    There is no body: the tasks named in @needs do the work.
    '''
149
150
@task
def drop_uploads(options):
    '''Drop facts by upload_id, e.g. to clean up after a buggy load.

    Drop from deid_db, then id_db.  In each one, within a single
    transaction: delete the matching `observation_fact` and
    `encounter_mapping` rows, then set `load_status` to 'Dropped' on
    the matching `upload_status` records.

    `options.upload_ids` must be a comma-separated list of integers.
    '''
    # Similarity between option syntax and SQL syntax is coincidental.
    # Going through int() means only integers reach the SQL text.
    upload_ids = [int(txt) for txt in options.upload_ids.split(',')]
    id_list_sql = ', '.join(map(str, upload_ids))

    # Each template takes (schema, id list); they run in this order.
    sql_templates = (
        ('delete from %s.observation_fact '
         'where upload_id in (%s)'),
        ('delete from %s.encounter_mapping '
         'where upload_id in (%s)'),
        ("update %s.upload_status "
         "set load_status='Dropped' "
         "where upload_id in (%s)"),
    )

    targets = ((options.deid_db, DEID_SCHEMA),
               (options.id_db, ID_SCHEMA))
    for db, schema in targets:
        db_util.ensure_index(db, 'observation_fact_upload_id',
                             schema, 'observation_fact', ('upload_id',))
        tx_label = '$drop_uploads(%s, %s)' % (schema, id_list_sql)
        with db.transaction(script=tx_label) as tx:
            for template in sql_templates:
                tx.execute(template % (schema, id_list_sql))
177
178
179from paver.easy import options as _option
180
# Default for the upload_ids option; the module docstring's doctest
# checks this value.
_option(upload_ids='all')


if __name__ == '__main__':
    # Run the doctests in this module when it is executed directly.
    from doctest import testmod
    testmod()
Note: See TracBrowser for help on using the repository browser.