-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgemini_merge_chunks.py
243 lines (190 loc) · 6.95 KB
/
gemini_merge_chunks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
#!/usr/bin/env python
import os
import shutil
import sqlite3
import sys
import uuid
import database as gemini_db
import gemini_load_chunk
def append_variant_info(main_curr, chunk_db):
    """
    Copy the variants, variant_impacts and variants_cnv rows from a
    chunk database (chunk_db, a file path) into the main database,
    using the main database's cursor (main_curr).
    """
    main_curr.execute("attach ? as toMerge", (chunk_db, ))
    # do all three bulk inserts inside one explicit transaction
    main_curr.execute("BEGIN TRANSACTION")
    for table in ("variants", "variant_impacts", "variants_cnv"):
        main_curr.execute("INSERT INTO %s SELECT * FROM toMerge.%s"
                          % (table, table))
    main_curr.execute("END TRANSACTION")
    main_curr.execute("detach toMerge")
def append_sample_genotype_counts(main_curr, chunk_db):
    """
    Copy the sample_genotype_counts rows from a chunk database
    (chunk_db, a file path) into the main database.
    """
    main_curr.execute("attach ? as toMerge", (chunk_db, ))
    main_curr.execute("INSERT INTO sample_genotype_counts "
                      "SELECT * FROM toMerge.sample_genotype_counts")
    main_curr.execute("detach toMerge")
def append_sample_info(main_curr, chunk_db):
    """
    Create the samples table in the main database from the schema of a
    chunk database's samples table, then copy the rows across.
    """
    main_curr.execute("attach ? as toMerge", (chunk_db, ))
    # WHERE 1=0 clones the column layout without copying any rows
    main_curr.execute("create table samples as select * from toMerge.samples where 1=0")
    main_curr.execute("INSERT INTO samples SELECT * FROM toMerge.samples")
    main_curr.execute("detach toMerge")
def append_resource_info(main_curr, chunk_db):
    """
    Copy the resources rows from a chunk database (chunk_db, a file
    path) into the main database.
    """
    main_curr.execute("attach ? as toMerge", (chunk_db, ))
    main_curr.execute("INSERT INTO resources SELECT * FROM toMerge.resources")
    main_curr.execute("detach toMerge")
def append_version_info(main_curr, chunk_db):
    """
    Copy the version rows from a chunk database (chunk_db, a file
    path) into the main database.
    """
    main_curr.execute("attach ? as toMerge", (chunk_db, ))
    main_curr.execute("INSERT INTO version SELECT * FROM toMerge.version")
    main_curr.execute("detach toMerge")
def append_vcf_header(main_curr, chunk_db):
    """
    Copy the vcf_header rows from a chunk database (chunk_db, a file
    path) into the main database.
    """
    main_curr.execute("attach ? as toMerge", (chunk_db, ))
    main_curr.execute("INSERT INTO vcf_header SELECT * FROM toMerge.vcf_header")
    main_curr.execute("detach toMerge")
def append_gene_summary(main_curr, chunk_db):
    """
    Copy the gene_summary rows from a chunk database (chunk_db, a file
    path) into the main database.
    """
    main_curr.execute("attach ? as toMerge", (chunk_db, ))
    main_curr.execute("INSERT INTO gene_summary SELECT * FROM toMerge.gene_summary")
    main_curr.execute("detach toMerge")
def append_gene_detailed(main_curr, chunk_db):
    """
    Copy the gene_detailed rows from a chunk database (chunk_db, a
    file path) into the main database.
    """
    main_curr.execute("attach ? as toMerge", (chunk_db, ))
    main_curr.execute("INSERT INTO gene_detailed SELECT * FROM toMerge.gene_detailed")
    main_curr.execute("detach toMerge")
def update_sample_genotype_counts(main_curr, chunk_db):
    """
    Update the main sample_genotype_counts table by adding the counts
    observed in one of the chunked databases (chunk_db, a file path)
    onto the totals already stored in the main database.

    main_curr -- cursor on the main (merged) database
    chunk_db  -- path to the chunk database whose counts are folded in
    """
    chunk_conn = sqlite3.connect(chunk_db)
    chunk_conn.isolation_level = None
    chunk_conn.row_factory = sqlite3.Row
    try:
        chunk_curr = chunk_conn.cursor()
        chunk_curr.execute("SELECT sample_id, num_hom_ref, "
                           "num_het, num_hom_alt, "
                           "num_unknown FROM sample_genotype_counts")
        for row in chunk_curr:
            main_curr.execute("""UPDATE sample_genotype_counts
                              SET num_hom_ref = num_hom_ref + ?,
                                  num_het = num_het + ?,
                                  num_hom_alt = num_hom_alt + ?,
                                  num_unknown = num_unknown + ?
                              WHERE sample_id= ? """,
                              (row['num_hom_ref'],
                               row['num_het'],
                               row['num_hom_alt'],
                               row['num_unknown'],
                               row['sample_id']))
        chunk_curr.close()
    finally:
        # the original closed only the cursor and leaked the connection
        chunk_conn.close()
def merge_db_chunks(args):
    """
    Merge the chunk databases listed in args.chunkdbs into a single
    fresh database at args.db (any existing file there is removed).

    The per-variant tables are appended from every chunk.  The
    singleton tables (samples, resources, version, vcf_header, gene
    summaries) are copied from the first chunk only, and the
    per-sample genotype counts of the remaining chunks are added onto
    the totals taken from the first chunk.
    """
    # start from a clean slate
    if os.path.exists(args.db):
        os.remove(args.db)
    gemini_db.create_tables(args.db, gemini_load_chunk.get_extra_effects_fields(args) if args.vcf else [])

    main_conn = sqlite3.connect(args.db)
    main_conn.isolation_level = None
    main_curr = main_conn.cursor()
    # favor speed over durability: a failed merge is simply re-run
    main_curr.execute('PRAGMA synchronous = OFF')
    main_curr.execute('PRAGMA journal_mode=MEMORY')

    # each entry of args.chunkdbs is a one-element list holding a path
    databases = list(args.chunkdbs)
    for idx, database in enumerate(databases):
        db = database[0]
        append_variant_info(main_curr, db)
        # we only need to add these tables from one of the chunks.
        if idx == 0:
            append_sample_genotype_counts(main_curr, db)
            append_sample_info(main_curr, db)
            append_resource_info(main_curr, db)
            append_version_info(main_curr, db)
            append_vcf_header(main_curr, db)
            append_gene_summary(main_curr, db)
            append_gene_detailed(main_curr, db)
        else:
            update_sample_genotype_counts(main_curr, db)

    if args.index:
        gemini_db.create_indices(main_curr)

    main_conn.commit()
    main_curr.close()
    # the original never closed the connection, leaving an open handle
    # on a file the caller may subsequently move
    main_conn.close()
def merge_chunks(parser, args):
    """
    Entry point: merge the chunk databases into args.db.

    SQLite can raise OperationalError (e.g. "database is locked") when
    the chunk files live on an NFS mount.  If the direct merge fails
    that way, retry once after staging copies of every chunk database
    (and the output) in args.tempdir, then move the finished database
    back to args.db.  If both attempts fail, raise with the collected
    SQLite error messages.
    """
    errors = []
    for try_count in range(2):
        try:
            if try_count > 0:
                # retry path: copy the chunks and build the output in
                # tempdir, which is hopefully a local (non-NFS) disk
                tmp_dbs = [os.path.join(args.tempdir, "%s.db" % uuid.uuid4())
                           for _ in args.chunkdbs]
                for chunk_db, tmp_db in zip(args.chunkdbs, tmp_dbs):
                    shutil.copyfile(chunk_db[0], tmp_db)
                    chunk_db[0] = tmp_db
                output_db = args.db
                args.db = os.path.join(args.tempdir, "%s.db" % uuid.uuid4())
            merge_db_chunks(args)
            if try_count > 0:
                # move the finished database back and drop the staged copies
                shutil.move(args.db, output_db)
                for tmp_db in tmp_dbs:
                    os.remove(tmp_db)
            break
        except sqlite3.OperationalError as e:  # was Py2-only "except X, e"
            errors.append(str(e))
            sys.stderr.write("sqlite3.OperationalError: %s\n" % e)
    else:
        raise Exception("Attempted workaround for SQLite locking issue on NFS "
                        "drives has failed. One possible reason is that the temp directory "
                        "%s is also on an NFS drive. Error messages from SQLite: %s"
                        % (args.tempdir, " ".join(errors)))