Commit e148f05

sqlite3 copy uses inefficient method when sqlite3 cli not available
1 parent e4c3ff2 commit e148f05

File tree

4 files changed: +75 -44 lines changed

parsons/databases/sqlite/sqlite.py
parsons/etl/etl.py
test/test_databases/test_dbsync.py
test/test_databases/test_sqlite.py

parsons/databases/sqlite/sqlite.py

Lines changed: 55 additions & 12 deletions
@@ -1,18 +1,20 @@
+import datetime
+import logging
+import pickle
+import shutil
 import sqlite3
-from pathlib import Path
 import subprocess
-import datetime
-from typing import Optional, Literal, Union
 from collections.abc import Iterator
-from parsons.utilities import files
-from parsons.etl.table import Table
-import pickle
+from contextlib import contextmanager
+from pathlib import Path
+from typing import Literal, Optional, Union
+
 import petl
+
 from parsons.databases.database_connector import DatabaseConnector
 from parsons.databases.table import BaseTable
-from contextlib import contextmanager
-
-import logging
+from parsons.etl.table import Table
+from parsons.utilities import files
 
 # Max number of rows that we query at a time, so we can avoid loading huge
 # data sets into memory.
@@ -183,6 +185,7 @@ def copy(
         table_name: str,
         if_exists: str = "fail",
         strict_length: bool = False,
+        force_python_sdk: bool = False,
     ):
         """
         Copy a :ref:`parsons-table` to Sqlite.
@@ -200,6 +203,8 @@ def copy(
                 the created table's column sizes will be sized to exactly fit the current data,
                 or if their size will be rounded up to account for future values being larger
                 then the current dataset. Defaults to ``False``.
+            force_python_sdk: bool
+                Use the python SDK to import data to sqlite3, even if the sqlite3 cli utility is available for more efficient loading. Defaults to False.
        """
 
        with self.connection() as connection:
@@ -211,13 +216,51 @@ def copy(
             self.query_with_connection(sql, connection, commit=False, return_values=False)
             logger.info(f"{table_name} created.")
 
-        csv_file_path = tbl.to_csv()
-
-        self._cli_command(f'".import --csv --skip 1 {csv_file_path} {table_name}"')
+        # Use the sqlite3 command line for csv import if possible, as it is much more efficient
+        if shutil.which("sqlite3") and not force_python_sdk:
+            csv_file_path = tbl.to_csv()
+            self._cli_command(f'".import --csv --skip 1 {csv_file_path} {table_name}"')
+        else:
+            self.import_table_iteratively(tbl, table_name, if_exists)
 
         logger.info(f"{len(tbl)} rows copied to {table_name}.")
 
+    def import_table_iteratively(
+        self, tbl: Table, table_name: str, if_exists: str, chunksize=10000
+    ) -> None:
+        """Import a CSV row by row using the python sqlite3 API.
+
+        Iterates over chunks of length `chunksize`
+
+        It is generally more efficient to use the sqlite3 CLI to
+        import a CSV, but not all machines have the shell utility
+        available, so we can fall back to this method.
+        """
+        chunked_tbls = tbl.chunk(chunksize)
+        insert_sql = "INSERT INTO {} ({}) VALUES ({});".format(
+            table_name,
+            ", ".join(tbl.columns),
+            ", ".join(["?" for _ in tbl.columns]),
+        )
+        with self.connection() as connection:
+            with self.cursor(connection) as cursor:
+                for chunked_tbl in chunked_tbls:
+                    cursor.executemany(
+                        insert_sql,
+                        tuple([tuple(row.values()) for row in chunked_tbl]),
+                    )
+
     def _cli_command(self, command: str) -> None:
+        """Use the sqlite3 command line utility to run a command.
+
+        Certain commands are only possible via the shell utility and
+        not via the python API, such as the CSV import command.
+
+        sqlite3 comes as part of the python stdlib, but the shell
+        utility is not available by default on all systems. Windows
+        machines in particular generally don't have the sqlite3
+        utility unless it is explicitly installed.
+        """
         db_path = Path(self.db_path).resolve()
         full_command = f"sqlite3 {db_path} {command}"
         resp = subprocess.run(
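
For readers without the repo handy, the fallback pattern this commit adds reduces to two stdlib pieces: shutil.which() to detect whether the sqlite3 shell utility is on PATH, and executemany() to insert each chunk of rows in a single call. Below is a minimal, self-contained sketch of that pattern using only the standard library; the database path, table name, sample rows, and chunk size are illustrative and not taken from the commit.

import shutil
import sqlite3

# Sample data, columns, and paths below are hypothetical, not from the commit.
rows = [{"pk": i, "value": i * 2} for i in range(25)]
columns = list(rows[0].keys())
chunksize = 10  # kept small for the demo; the commit's default is 10000

connection = sqlite3.connect("example.db")  # hypothetical database path
connection.execute("CREATE TABLE IF NOT EXISTS tbl1 (pk INTEGER, value INTEGER)")

if shutil.which("sqlite3"):
    # The efficient path: parsons shells out via _cli_command, roughly
    #   sqlite3 example.db ".import --csv --skip 1 data.csv tbl1"
    print("sqlite3 CLI found on PATH; the .import path would be preferred")

# The fallback path: a parameterized INSERT with one "?" per column
insert_sql = "INSERT INTO tbl1 ({}) VALUES ({})".format(
    ", ".join(columns), ", ".join("?" for _ in columns)
)

# executemany() takes a whole chunk of row tuples per call, which bounds
# memory use while avoiding one round of Python overhead per row
for start in range(0, len(rows), chunksize):
    chunk = rows[start : start + chunksize]
    connection.executemany(insert_sql, [tuple(r.values()) for r in chunk])
connection.commit()

print(connection.execute("SELECT COUNT(*) FROM tbl1").fetchone()[0])  # 25
connection.close()

The same trade-off drives import_table_iteratively above: chunks of 10000 rows are large enough to amortize per-call overhead but small enough to avoid loading an entire table into memory, consistent with the file's existing cap on rows queried at a time.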

parsons/etl/etl.py

Lines changed: 1 addition & 1 deletion
@@ -896,7 +896,7 @@ def concat(self, *tables, missing=None):
 
         self.table = petl.cat(self.table, *petl_tables, missing=missing)
 
-    def chunk(self, rows):
+    def chunk(self, rows: int):
         """
         Divides a Parsons table into smaller tables of a specified row count. If the table
         cannot be divided evenly, then the final table will only include the remainder.
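
The only change here is the type hint, but since the new sqlite3 fallback leans on chunk(), a short usage sketch may help. It assumes only the behavior documented above (tables of the requested row count, with the remainder in the final table) and the iteration pattern the sqlite.py change itself uses; the sample data is made up.

from parsons import Table

tbl = Table([{"pk": i, "value": i * 2} for i in range(25)])

# Per the docstring: 25 rows chunked by 10 should yield 10, 10, and 5
for chunked_tbl in tbl.chunk(10):
    print(len(chunked_tbl))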

test/test_databases/test_dbsync.py

Lines changed: 10 additions & 28 deletions
@@ -1,14 +1,12 @@
 import os
-import unittest
-from parsons import Postgres, DBSync, Table, Redshift
-from parsons.databases.database_connector import DatabaseConnector
 import tempfile
-from parsons.databases.sqlite import Sqlite
+import unittest
 from abc import ABC
 from typing import Optional, Type
 
 from parsons import DBSync, Postgres, Redshift, Table
 from parsons.databases.database_connector import DatabaseConnector
+from parsons.databases.sqlite import Sqlite
 from test.test_databases.fakes import FakeDatabase
 from test.utils import assert_matching_tables
 
@@ -46,9 +44,7 @@ def setUp(self):
             f"{self.temp_schema}.source_table" if self.temp_schema else "source_table"
         )
         self.destination_table = (
-            f"{self.temp_schema}.destination_table"
-            if self.temp_schema
-            else "destination_table"
+            f"{self.temp_schema}.destination_table" if self.temp_schema else "destination_table"
         )
 
         # Create source table
@@ -71,9 +67,7 @@ def tearDown(self):
 
     def assert_matching_tables(self) -> None:
         source = self.source_db.query(f"SELECT * FROM {self.source_table}")
-        destination = self.destination_db.query(
-            f"SELECT * FROM {self.destination_table}"
-        )
+        destination = self.destination_db.query(f"SELECT * FROM {self.destination_table}")
         assert_matching_tables(source, destination)
 
     def table_sync_full(self, if_exists: str, **kwargs):
@@ -109,19 +103,15 @@ def test_table_sync_full_empty_table(self):
     def test_table_sync_full_chunk(self):
         # Test chunking in full sync.
         self.db_sync.chunk_size = 10
-        self.db_sync.table_sync_full(
-            self.source_table, self.destination_table, if_exists="drop"
-        )
+        self.db_sync.table_sync_full(self.source_table, self.destination_table, if_exists="drop")
         self.assert_matching_tables()
 
     def test_table_sync_incremental(self):
         # Test that incremental sync
 
         self.destination_db.copy(self.table1, self.destination_table)
         self.source_db.copy(self.table2, self.source_table, if_exists="append")
-        self.db_sync.table_sync_incremental(
-            self.source_table, self.destination_table, "pk"
-        )
+        self.db_sync.table_sync_incremental(self.source_table, self.destination_table, "pk")
         self.assert_matching_tables()
 
     def test_table_sync_incremental_chunk(self):
@@ -130,17 +120,13 @@ def test_table_sync_incremental_chunk(self):
         self.db_sync.chunk_size = 10
         self.destination_db.copy(self.table1, self.destination_table)
         self.source_db.copy(self.table2, self.source_table, if_exists="append")
-        self.db_sync.table_sync_incremental(
-            self.source_table, self.destination_table, "pk"
-        )
+        self.db_sync.table_sync_incremental(self.source_table, self.destination_table, "pk")
 
         self.assert_matching_tables()
 
     def test_table_sync_incremental_create_destination_table(self):
         # Test that an incremental sync works if the destination table does not exist.
-        self.db_sync.table_sync_incremental(
-            self.source_table, self.destination_table, "pk"
-        )
+        self.db_sync.table_sync_incremental(self.source_table, self.destination_table, "pk")
         self.assert_matching_tables()
 
     def test_table_sync_incremental_empty_table(self):
@@ -210,9 +196,7 @@ def initialize_db_connections(self) -> None:
 
 # These tests interact directly with the Postgres database. In order to run, set the
 # env to LIVE_TEST='TRUE'.
-@unittest.skipIf(
-    not os.environ.get("LIVE_TEST"), "Skipping because not running live test"
-)
+@unittest.skipIf(not os.environ.get("LIVE_TEST"), "Skipping because not running live test")
 class TestPostgresDBSync(TestDBSync):
     db = Postgres
     setup_sql = f"""
@@ -226,8 +210,6 @@ class TestPostgresDBSync(TestDBSync):
 
 # These tests interact directly with the Postgres database. In order to run, set the
 # env to LIVE_TEST='TRUE'.
-@unittest.skipIf(
-    not os.environ.get("LIVE_TEST"), "Skipping because not running live test"
-)
+@unittest.skipIf(not os.environ.get("LIVE_TEST"), "Skipping because not running live test")
 class TestRedshiftDBSync(TestPostgresDBSync):
     db = Redshift

test/test_databases/test_sqlite.py

Lines changed: 9 additions & 3 deletions
@@ -1,11 +1,12 @@
+import tempfile
+import unittest
+
 from parsons import Table
 from parsons.databases.sqlite import Sqlite
 from test.utils import assert_matching_tables
-import unittest
-import tempfile
 
 
-class TestSqliteCreateStatement(unittest.TestCase):
+class TestSqlite(unittest.TestCase):
     def setUp(self):
         temp_db = tempfile.mkstemp(suffix=".db")[1]
         self.sqlite = Sqlite(temp_db)
@@ -17,6 +18,11 @@ def test_copy(self) -> None:
         tbl1_read = self.sqlite.query("select * from tbl1")
         assert_matching_tables(self.tbl, tbl1_read)
 
+    def test_copy_no_cli(self) -> None:
+        self.sqlite.copy(self.tbl, "tbl1", if_exists="drop", force_python_sdk=True)
+        tbl1_read = self.sqlite.query("select * from tbl1")
+        assert_matching_tables(self.tbl, tbl1_read)
+
     def test_copy_append(self) -> None:
         self.sqlite.copy(self.tbl, "tbl1", if_exists="drop")
         self.sqlite.copy(self.tbl, "tbl1", if_exists="append")
