diff --git a/migrations/versions/1038c2174f5d_make_case_insensitive_hash_of_query_text.py b/migrations/versions/1038c2174f5d_make_case_insensitive_hash_of_query_text.py
index c872a918b9..da41288cb8 100644
--- a/migrations/versions/1038c2174f5d_make_case_insensitive_hash_of_query_text.py
+++ b/migrations/versions/1038c2174f5d_make_case_insensitive_hash_of_query_text.py
@@ -26,7 +26,7 @@ def change_query_hash(conn, table, query_text_to):
             table
             .update()
             .where(table.c.id == record.id)
-            .values(query_hash=gen_query_hash(query_text)))
+            .values(query_hash=gen_query_hash(query_text, {}, False)))
 
 
 def upgrade():
diff --git a/migrations/versions/e4d9a0b448cb_calc_query_hash_with_seperate_params.py b/migrations/versions/e4d9a0b448cb_calc_query_hash_with_seperate_params.py
new file mode 100644
index 0000000000..87465e620c
--- /dev/null
+++ b/migrations/versions/e4d9a0b448cb_calc_query_hash_with_seperate_params.py
@@ -0,0 +1,78 @@
+"""calc query_hash with seperate params
+
+Revision ID: e4d9a0b448cb
+Revises: 7205816877ec
+Create Date: 2024-05-20 00:48:25.674748
+
+"""
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.sql import table
+
+from redash.query_runner import get_query_runner
+from redash.utils import gen_query_hash
+
+# revision identifiers, used by Alembic.
+revision = "e4d9a0b448cb"
+down_revision = "7205816877ec"
+branch_labels = None
+depends_on = None
+
+
+queries = table(
+    "queries",
+    sa.Column("id", sa.Integer, primary_key=True),
+    sa.Column("query", sa.Text),
+    sa.Column("query_hash", sa.String(length=32)),
+    sa.Column("data_source_id", sa.Integer),
+    sa.Column("options", sa.JSON))
+
+data_sources = table(
+    "data_sources",
+    sa.Column("id", sa.Integer, primary_key=True),
+    sa.Column("type", sa.String(length=255)))
+
+
+def load_data_sources(conn):
+    data_source_type_map = {}
+    for data_source in conn.execute(data_sources.select()):
+        data_source_type_map[data_source.id] = data_source.type
+    return data_source_type_map
+
+
+def upgrade():
+    conn = op.get_bind()
+
+    data_source_type_map = load_data_sources(conn)
+
+    for query in conn.execute(queries.select()):
+        data_source_type = data_source_type_map.get(query.data_source_id)
+        if not data_source_type:
+            continue
+
+        query_runner = get_query_runner(data_source_type, {})
+        if not query_runner:
+            print(f"query #{query.id}: can't get query runner '{data_source_type}'")
+            continue
+
+        parameters_dict = {p["name"]: p.get("value") for p in query.options.get("parameters", [])}
+
+        if query_runner.supports_auto_limit:
+            should_apply_auto_limit = query.options.get("apply_auto_limit", False)
+        else:
+            should_apply_auto_limit = False
+
+        new_query_hash = gen_query_hash(query.query, parameters_dict, should_apply_auto_limit)
+
+        conn.execute(
+            queries
+            .update()
+            .where(queries.c.id == query.id)
+            .values(query_hash=new_query_hash))
+
+
+def downgrade():
+    # We can't recalculate the old query_hash, because the dynamic
+    # date(-range) parameter values were lost. That loss is the root
+    # cause of the problem this migration fixes.
+    pass
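
Note: the upgrade pass above rebuilds every stored query_hash from three inputs: the raw query text, the saved parameter values, and the auto-limit flag (forced to False when the runner does not support auto limit). As a hedged illustration, this is roughly what it computes for a single hypothetical row; the query text and options values below are invented, and gen_query_hash is the patched helper from redash.utils:

    from redash.utils import gen_query_hash

    # Hypothetical stored row, in the shape the migration reads from queries.options:
    options = {
        "apply_auto_limit": True,
        "parameters": [{"name": "date", "value": "2024-05-20", "type": "date"}],
    }
    parameters_dict = {p["name"]: p.get("value") for p in options.get("parameters", [])}
    # Assuming this row's data source maps to a runner with supports_auto_limit == True:
    new_query_hash = gen_query_hash("SELECT count(*) FROM events WHERE dt = '{{ date }}'", parameters_dict, True)
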
diff --git a/redash/handlers/query_results.py b/redash/handlers/query_results.py
index bfc4371d08..ab4ca26aa3 100644
--- a/redash/handlers/query_results.py
+++ b/redash/handlers/query_results.py
@@ -68,6 +68,8 @@ def run_query(query, parameters, data_source, query_id, should_apply_auto_limit,
         return error_response(message)
 
+    query_hash = data_source.query_runner.gen_query_hash(query.text, parameters, should_apply_auto_limit)
+
     try:
         query.apply(parameters)
     except (InvalidParameterError, QueryDetachedFromDataSourceError) as e:
@@ -81,7 +83,7 @@ def run_query(query, parameters, data_source, query_id, should_apply_auto_limit,
     if max_age == 0:
         query_result = None
     else:
-        query_result = models.QueryResult.get_latest(data_source, query_text, max_age)
+        query_result = models.QueryResult.get_latest(data_source, query_hash, max_age)
 
     record_event(
         current_user.org,
@@ -102,6 +104,7 @@ def run_query(query, parameters, data_source, query_id, should_apply_auto_limit,
     else:
         job = enqueue_query(
             query_text,
+            query_hash,
             data_source,
             current_user.id,
             current_user.is_api_user(),
diff --git a/redash/models/__init__.py b/redash/models/__init__.py
index 91ddd91ee3..cf374bc7e2 100644
--- a/redash/models/__init__.py
+++ b/redash/models/__init__.py
@@ -70,7 +70,6 @@
 )
 from redash.utils import (
     base_url,
-    gen_query_hash,
     generate_token,
     json_dumps,
     json_loads,
@@ -343,9 +342,7 @@ def unused(cls, days=7):
         )
 
     @classmethod
-    def get_latest(cls, data_source, query, max_age=0):
-        query_hash = gen_query_hash(query)
-
+    def get_latest(cls, data_source, query_hash, max_age=0):
         if max_age == -1 and settings.QUERY_RESULTS_EXPIRED_TTL_ENABLED:
             max_age = settings.QUERY_RESULTS_EXPIRED_TTL
 
@@ -815,12 +812,11 @@ def dashboard_api_keys(self):
     def update_query_hash(self):
         should_apply_auto_limit = self.options.get("apply_auto_limit", False) if self.options else False
         query_runner = self.data_source.query_runner if self.data_source else BaseQueryRunner({})
-        query_text = self.query_text
 
         parameters_dict = {p["name"]: p.get("value") for p in self.parameters} if self.options else {}
         if any(parameters_dict):
             try:
-                query_text = self.parameterized.apply(parameters_dict).query
+                self.parameterized.apply(parameters_dict).query
             except InvalidParameterError as e:
                 logging.info(f"Unable to update hash for query {self.id} because of invalid parameters: {str(e)}")
             except QueryDetachedFromDataSourceError as e:
@@ -828,7 +824,7 @@ def update_query_hash(self):
                 f"Unable to update hash for query {self.id} because of dropdown query {e.query_id} is unattached from datasource"
             )
 
-        self.query_hash = query_runner.gen_query_hash(query_text, should_apply_auto_limit)
+        self.query_hash = query_runner.gen_query_hash(self.query_text, parameters_dict, should_apply_auto_limit)
 
 
 @listens_for(Query, "before_insert")
diff --git a/redash/query_runner/__init__.py b/redash/query_runner/__init__.py
index 10d6d6edf5..77c8c1106c 100644
--- a/redash/query_runner/__init__.py
+++ b/redash/query_runner/__init__.py
@@ -261,9 +261,10 @@ def supports_auto_limit(self):
     def apply_auto_limit(self, query_text, should_apply_auto_limit):
         return query_text
 
-    def gen_query_hash(self, query_text, set_auto_limit=False):
-        query_text = self.apply_auto_limit(query_text, set_auto_limit)
-        return utils.gen_query_hash(query_text)
+    def gen_query_hash(self, query_text, parameters={}, set_auto_limit=False):
+        if not self.supports_auto_limit:
+            set_auto_limit = False
+        return utils.gen_query_hash(query_text, parameters, set_auto_limit)
 
 
 class BaseSQLQueryRunner(BaseQueryRunner):
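
Note: run_query now computes the cache key once, before parameter substitution, and QueryResult.get_latest receives that hash directly instead of re-hashing query text. The practical effect is that the same query text with different parameter values no longer shares a cache entry. A minimal sketch using the patched BaseQueryRunner; the query text and dates are illustrative:

    from redash.query_runner import BaseQueryRunner

    runner = BaseQueryRunner({})
    h1 = runner.gen_query_hash("SELECT 1 WHERE dt = '{{ date }}'", {"date": "2024-05-01"}, False)
    h2 = runner.gen_query_hash("SELECT 1 WHERE dt = '{{ date }}'", {"date": "2024-05-02"}, False)
    assert h1 != h2  # distinct cache keys for models.QueryResult.get_latest
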
diff --git a/redash/tasks/queries/execution.py b/redash/tasks/queries/execution.py
index 695375f99c..16cbb93c4e 100644
--- a/redash/tasks/queries/execution.py
+++ b/redash/tasks/queries/execution.py
@@ -27,8 +27,7 @@ def _unlock(query_hash, data_source_id):
     redis_connection.delete(_job_lock_id(query_hash, data_source_id))
 
 
-def enqueue_query(query, data_source, user_id, is_api_key=False, scheduled_query=None, metadata={}):
-    query_hash = gen_query_hash(query)
+def enqueue_query(query, query_hash, data_source, user_id, is_api_key=False, scheduled_query=None, metadata={}):
     logger.info("Inserting job for %s with metadata=%s", query_hash, metadata)
     try_count = 0
     job = None
@@ -99,7 +98,9 @@ def enqueue_query(query, data_source, user_id, is_api_key=False, scheduled_query
                 if not scheduled_query:
                     enqueue_kwargs["result_ttl"] = settings.JOB_EXPIRY_TIME
 
-                job = queue.enqueue(execute_query, query, data_source.id, metadata, **enqueue_kwargs)
+                job = queue.enqueue(
+                    execute_query, query, data_source.id, metadata, query_hash=query_hash, **enqueue_kwargs
+                )
 
                 logger.info("[%s] Created new job: %s", query_hash, job.id)
                 pipe.set(
@@ -146,9 +147,10 @@ def _resolve_user(user_id, is_api_key, query_id):
 
 
 class QueryExecutor:
-    def __init__(self, query, data_source_id, user_id, is_api_key, metadata, is_scheduled_query):
+    def __init__(self, query, query_hash, data_source_id, user_id, is_api_key, metadata, is_scheduled_query):
         self.job = get_current_job()
         self.query = query
+        self.query_hash = query_hash or gen_query_hash(query)
         self.data_source_id = data_source_id
         self.metadata = metadata
         self.data_source = self._load_data_source()
@@ -162,7 +164,6 @@ def __init__(self, query, data_source_id, user_id, is_api_key, metadata, is_sche
         # Close DB connection to prevent holding a connection for a long time while the query is executing.
         models.db.session.close()
 
-        self.query_hash = gen_query_hash(self.query)
         self.is_scheduled_query = is_scheduled_query
         if self.is_scheduled_query:
             # Load existing tracker or create a new one if the job was created before code update:
@@ -271,10 +272,12 @@ def execute_query(
     user_id=None,
     scheduled_query_id=None,
     is_api_key=False,
+    query_hash=None,
 ):
     try:
         return QueryExecutor(
             query,
+            query_hash,
             data_source_id,
             user_id,
             is_api_key,
diff --git a/redash/tasks/queries/maintenance.py b/redash/tasks/queries/maintenance.py
index bbfebd053f..4ac0660f78 100644
--- a/redash/tasks/queries/maintenance.py
+++ b/redash/tasks/queries/maintenance.py
@@ -93,6 +93,7 @@ def refresh_queries():
             query_text = _apply_auto_limit(query_text, query)
             enqueue_query(
                 query_text,
+                query.query_hash,
                 query.data_source,
                 query.user_id,
                 scheduled_query=query,
diff --git a/redash/utils/__init__.py b/redash/utils/__init__.py
index a4005b6725..5214997441 100644
--- a/redash/utils/__init__.py
+++ b/redash/utils/__init__.py
@@ -50,7 +50,7 @@ def slugify(s):
     return re.sub(r"[^a-z0-9_\-]+", "-", s.lower())
 
 
-def gen_query_hash(sql):
+def gen_query_hash(sql, parameters={}, auto_limit=False):
     """Return hash of the given query after stripping all comments, line breaks
     and multiple spaces.
 
@@ -60,6 +60,10 @@ def gen_query_hash(sql):
     """
     sql = COMMENTS_REGEX.sub("", sql)
     sql = "".join(sql.split())
+
+    query_parameters = {"parameters": parameters, "auto_limit": auto_limit}
+    sql += "\n" + json.dumps(query_parameters, sort_keys=True, separators=(",", ":"))
+
    return hashlib.md5(sql.encode("utf-8")).hexdigest()
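
Note: the core change in gen_query_hash is that a canonical JSON encoding of the parameters and the auto-limit flag is appended to the normalized SQL before hashing; sort_keys plus compact separators keep the encoding deterministic. A self-contained sketch of the resulting behavior, where the COMMENTS_REGEX pattern is an assumption meant to mirror the one defined in redash/utils/__init__.py:

    import hashlib
    import json
    import re

    COMMENTS_REGEX = re.compile(r"/\*.*?\*/")  # assumption: mirrors redash.utils

    def gen_query_hash(sql, parameters={}, auto_limit=False):
        sql = COMMENTS_REGEX.sub("", sql)  # strip comments, as before
        sql = "".join(sql.split())  # strip whitespace, as before
        # New: fold parameter values and the auto-limit flag into the hashed text.
        query_parameters = {"parameters": parameters, "auto_limit": auto_limit}
        sql += "\n" + json.dumps(query_parameters, sort_keys=True, separators=(",", ":"))
        return hashlib.md5(sql.encode("utf-8")).hexdigest()

    # Same text, different parameter values -> different hashes:
    assert gen_query_hash("SELECT 1", {"d": "2024-05-01"}) != gen_query_hash("SELECT 1", {"d": "2024-05-02"})
    # The defaults are equivalent to passing an empty dict and False explicitly:
    assert gen_query_hash("SELECT 1") == gen_query_hash("SELECT 1", {}, False)
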
diff --git a/tests/factories.py b/tests/factories.py
index ddb56de38a..5e31f2e977 100644
--- a/tests/factories.py
+++ b/tests/factories.py
@@ -126,7 +126,7 @@ def __call__(self):
     runtime=1,
     retrieved_at=utcnow,
     query_text="SELECT 1",
-    query_hash=gen_query_hash("SELECT 1"),
+    query_hash=gen_query_hash("SELECT 1", {}, False),
     data_source=data_source_factory.create,
     org_id=1,
 )
diff --git a/tests/models/test_queries.py b/tests/models/test_queries.py
index e914ecd6ca..c813e31c5a 100644
--- a/tests/models/test_queries.py
+++ b/tests/models/test_queries.py
@@ -464,7 +464,7 @@ def setUp(self):
         super(TestQueryUpdateLatestResult, self).setUp()
         self.data_source = self.factory.data_source
         self.query = "SELECT 1"
-        self.query_hash = gen_query_hash(self.query)
+        self.query_hash = gen_query_hash(self.query, {}, False)
         self.runtime = 123
         self.utcnow = utcnow()
         self.data = {"columns": {}, "rows": []}
diff --git a/tests/models/test_query_results.py b/tests/models/test_query_results.py
index 16ea2de3d7..baf64cbedd 100644
--- a/tests/models/test_query_results.py
+++ b/tests/models/test_query_results.py
@@ -12,14 +12,14 @@ def test_get_latest_returns_none_if_not_found(self):
     def test_get_latest_returns_when_found(self):
         qr = self.factory.create_query_result()
-        found_query_result = models.QueryResult.get_latest(qr.data_source, qr.query_text, 60)
+        found_query_result = models.QueryResult.get_latest(qr.data_source, qr.query_hash, 60)
 
         self.assertEqual(qr, found_query_result)
 
     def test_get_latest_doesnt_return_query_from_different_data_source(self):
         qr = self.factory.create_query_result()
         data_source = self.factory.create_data_source()
-        found_query_result = models.QueryResult.get_latest(data_source, qr.query_text, 60)
+        found_query_result = models.QueryResult.get_latest(data_source, qr.query_hash, 60)
 
         self.assertIsNone(found_query_result)
 
@@ -27,7 +27,7 @@ def test_get_latest_doesnt_return_if_ttl_expired(self):
         yesterday = utcnow() - datetime.timedelta(days=1)
         qr = self.factory.create_query_result(retrieved_at=yesterday)
 
-        found_query_result = models.QueryResult.get_latest(qr.data_source, qr.query_text, max_age=60)
+        found_query_result = models.QueryResult.get_latest(qr.data_source, qr.query_hash, max_age=60)
 
         self.assertIsNone(found_query_result)
 
@@ -35,7 +35,7 @@ def test_get_latest_returns_if_ttl_not_expired(self):
         yesterday = utcnow() - datetime.timedelta(seconds=30)
         qr = self.factory.create_query_result(retrieved_at=yesterday)
 
-        found_query_result = models.QueryResult.get_latest(qr.data_source, qr.query_text, max_age=120)
+        found_query_result = models.QueryResult.get_latest(qr.data_source, qr.query_hash, max_age=120)
 
         self.assertEqual(found_query_result, qr)
 
@@ -44,7 +44,7 @@ def test_get_latest_returns_the_most_recent_result(self):
         self.factory.create_query_result(retrieved_at=yesterday)
         qr = self.factory.create_query_result()
 
-        found_query_result = models.QueryResult.get_latest(qr.data_source, qr.query_text, 60)
+        found_query_result = models.QueryResult.get_latest(qr.data_source, qr.query_hash, 60)
 
         self.assertEqual(found_query_result.id, qr.id)
 
@@ -54,7 +54,7 @@ def test_get_latest_returns_the_last_cached_result_for_negative_ttl(self):
         yesterday = utcnow() + datetime.timedelta(days=-1)
         qr = self.factory.create_query_result(retrieved_at=yesterday)
 
-        found_query_result = models.QueryResult.get_latest(qr.data_source, qr.query_text, -1)
+        found_query_result = models.QueryResult.get_latest(qr.data_source, qr.query_hash, -1)
 
         self.assertEqual(found_query_result.id, qr.id)
diff --git a/tests/query_runner/test_basesql_queryrunner.py b/tests/query_runner/test_basesql_queryrunner.py
index b54f4936be..4716cc16e4 100644
--- a/tests/query_runner/test_basesql_queryrunner.py
+++ b/tests/query_runner/test_basesql_queryrunner.py
@@ -1,7 +1,6 @@
 import unittest
 
 from redash.query_runner import BaseQueryRunner, BaseSQLQueryRunner
-from redash.utils import gen_query_hash
 
 
 class TestBaseSQLQueryRunner(unittest.TestCase):
@@ -118,16 +117,16 @@ def test_apply_auto_limit_inline_comment(self):
 
     def test_gen_query_hash_baseSQL(self):
         origin_query_text = "select *"
-        expected_query_text = "select * LIMIT 1000"
-        base_runner = BaseQueryRunner({})
-        self.assertEqual(
-            base_runner.gen_query_hash(expected_query_text), self.query_runner.gen_query_hash(origin_query_text, True)
-        )
+        base_hash = self.query_runner.gen_query_hash(origin_query_text)
+        self.assertEqual(base_hash, self.query_runner.gen_query_hash(origin_query_text, {}, False))
+        self.assertNotEqual(base_hash, self.query_runner.gen_query_hash(origin_query_text, {}, True))
 
     def test_gen_query_hash_NoneSQL(self):
         origin_query_text = "select *"
         base_runner = BaseQueryRunner({})
-        self.assertEqual(gen_query_hash(origin_query_text), base_runner.gen_query_hash(origin_query_text, True))
+        self.assertEqual(
+            base_runner.gen_query_hash(origin_query_text), base_runner.gen_query_hash(origin_query_text, {}, True)
+        )
 
 
 if __name__ == "__main__":
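
Note: the rewritten hash tests pin down the gating added in BaseQueryRunner.gen_query_hash: the auto-limit flag only changes the hash when the runner actually supports auto limit. In short, assuming fresh runner instances behave like the fixtures the test class sets up:

    from redash.query_runner import BaseQueryRunner, BaseSQLQueryRunner

    base = BaseQueryRunner({})    # supports_auto_limit is False, so the flag is ignored
    sql = BaseSQLQueryRunner({})  # supports_auto_limit is True, so the flag is hashed
    text = "select *"
    assert base.gen_query_hash(text, {}, True) == base.gen_query_hash(text, {}, False)
    assert sql.gen_query_hash(text, {}, True) != sql.gen_query_hash(text, {}, False)
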
diff --git a/tests/tasks/test_queries.py b/tests/tasks/test_queries.py
index 93d5d73b86..41d4d912b5 100644
--- a/tests/tasks/test_queries.py
+++ b/tests/tasks/test_queries.py
@@ -39,6 +39,7 @@ def test_multiple_enqueue_of_same_query(self, enqueue, _):
         with Connection(rq_redis_connection):
             enqueue_query(
                 query.query_text,
+                query.query_hash,
                 query.data_source,
                 query.user_id,
                 False,
@@ -47,6 +48,7 @@ def test_multiple_enqueue_of_same_query(self, enqueue, _):
             )
             enqueue_query(
                 query.query_text,
+                query.query_hash,
                 query.data_source,
                 query.user_id,
                 False,
@@ -55,6 +57,7 @@ def test_multiple_enqueue_of_same_query(self, enqueue, _):
             )
             enqueue_query(
                 query.query_text,
+                query.query_hash,
                 query.data_source,
                 query.user_id,
                 False,
@@ -70,6 +73,7 @@ def test_multiple_enqueue_of_expired_job(self, enqueue, fetch_job):
         with Connection(rq_redis_connection):
             enqueue_query(
                 query.query_text,
+                query.query_hash,
                 query.data_source,
                 query.user_id,
                 False,
@@ -82,6 +86,7 @@
 
             enqueue_query(
                 query.query_text,
+                query.query_hash,
                 query.data_source,
                 query.user_id,
                 False,
@@ -97,6 +102,7 @@ def test_reenqueue_during_job_cancellation(self, enqueue, my_fetch_job):
         with Connection(rq_redis_connection):
             enqueue_query(
                 query.query_text,
+                query.query_hash,
                 query.data_source,
                 query.user_id,
                 False,
@@ -114,6 +120,7 @@ def cancel_job(*args, **kwargs):
 
             enqueue_query(
                 query.query_text,
+                query.query_hash,
                 query.data_source,
                 query.user_id,
                 False,
@@ -130,6 +137,7 @@ def test_limits_query_time(self, _, enqueue, __):
         with Connection(rq_redis_connection):
             enqueue_query(
                 query.query_text,
+                query.query_hash,
                 query.data_source,
                 query.user_id,
                 False,
@@ -141,32 +149,37 @@
         self.assertEqual(60, kwargs.get("job_timeout"))
 
     def test_multiple_enqueue_of_different_query(self, enqueue, _):
-        query = self.factory.create_query()
+        query1 = self.factory.create_query(query_text="SELECT 1")
+        query2 = self.factory.create_query(query_text="SELECT 2")
+        query3 = self.factory.create_query(query_text="SELECT 3")
 
         with Connection(rq_redis_connection):
             enqueue_query(
-                query.query_text,
-                query.data_source,
-                query.user_id,
+                query1.query_text,
+                query1.query_hash,
+                query1.data_source,
+                query1.user_id,
                 False,
                 None,
-                {"Username": "Arik", "query_id": query.id},
+                {"Username": "Arik", "query_id": query1.id},
             )
             enqueue_query(
-                query.query_text + "2",
-                query.data_source,
-                query.user_id,
+                query2.query_text,
+                query2.query_hash,
+                query2.data_source,
+                query2.user_id,
                 False,
                 None,
-                {"Username": "Arik", "query_id": query.id},
+                {"Username": "Arik", "query_id": query2.id},
             )
             enqueue_query(
-                query.query_text + "3",
-                query.data_source,
-                query.user_id,
+                query3.query_text,
+                query3.query_hash,
+                query3.data_source,
+                query3.user_id,
                 False,
                 None,
-                {"Username": "Arik", "query_id": query.id},
+                {"Username": "Arik", "query_id": query3.id},
             )
 
         self.assertEqual(3, enqueue.call_count)
diff --git a/tests/tasks/test_refresh_queries.py b/tests/tasks/test_refresh_queries.py
index 6a306be18d..2e2b10d54f 100644
--- a/tests/tasks/test_refresh_queries.py
+++ b/tests/tasks/test_refresh_queries.py
@@ -27,6 +27,7 @@ def test_enqueues_outdated_queries_for_sqlquery(self):
             [
                 call(
                     query1.query_text + " LIMIT 1000",
+                    query1.query_hash,
                     query1.data_source,
                     query1.user_id,
                     scheduled_query=query1,
@@ -34,6 +35,7 @@ def test_enqueues_outdated_queries_for_sqlquery(self):
                 ),
                 call(
                     "select 42 LIMIT 1000",
+                    query2.query_hash,
                     query2.data_source,
                     query2.user_id,
                     scheduled_query=query2,
@@ -59,6 +61,7 @@ def test_enqueues_outdated_queries_for_non_sqlquery(self):
             [
                 call(
                     query1.query_text,
+                    query1.query_hash,
                     query1.data_source,
                     query1.user_id,
                     scheduled_query=query1,
@@ -66,6 +69,7 @@ def test_enqueues_outdated_queries_for_non_sqlquery(self):
                 ),
                 call(
                     query2.query_text,
+                    query2.query_hash,
                     query2.data_source,
                     query2.user_id,
                     scheduled_query=query2,
@@ -94,6 +98,7 @@ def test_doesnt_enqueue_outdated_queries_for_paused_data_source_for_sqlquery(sel
         refresh_queries()
         add_job_mock.assert_called_with(
             query.query_text + " LIMIT 1000",
+            query.query_hash,
             query.data_source,
             query.user_id,
             scheduled_query=query,
@@ -122,6 +127,7 @@ def test_doesnt_enqueue_outdated_queries_for_paused_data_source_for_non_sqlquery
         refresh_queries()
         add_job_mock.assert_called_with(
             query.query_text,
+            query.query_hash,
             query.data_source,
             query.user_id,
             scheduled_query=query,
@@ -152,6 +158,7 @@ def test_enqueues_parameterized_queries_for_sqlquery(self):
         refresh_queries()
         add_job_mock.assert_called_with(
             "select 42 LIMIT 1000",
+            query.query_hash,
             query.data_source,
             query.user_id,
             scheduled_query=query,
@@ -184,6 +191,7 @@ def test_enqueues_parameterized_queries_for_non_sqlquery(self):
         refresh_queries()
         add_job_mock.assert_called_with(
             "select 42",
+            query.query_hash,
             query.data_source,
             query.user_id,
             scheduled_query=query,
diff --git a/tests/tasks/test_worker.py b/tests/tasks/test_worker.py
index 2497c93275..658d854f01 100644
--- a/tests/tasks/test_worker.py
+++ b/tests/tasks/test_worker.py
@@ -22,6 +22,7 @@ def test_worker_records_success_metrics(self, incr):
         with Connection(rq_redis_connection):
             enqueue_query(
                 query.query_text,
+                query.query_hash,
                 query.data_source,
                 query.user_id,
                 False,
@@ -49,6 +50,7 @@ def test_worker_records_failure_metrics(self, _, incr):
         with Connection(rq_redis_connection):
             job = enqueue_query(
                 query.query_text,
+                query.query_hash,
                 query.data_source,
                 query.user_id,
                 False,
@@ -81,6 +83,7 @@ def test_enqueue_query_records_created_metric(self, incr):
         with Connection(rq_redis_connection):
             enqueue_query(
                 query.query_text,
+                query.query_hash,
                 query.data_source,
                 query.user_id,
                 False,