Skip to content

Commit

Permalink
Merge pull request #61 from mdesmet/feature/types
Browse files Browse the repository at this point in the history
Use native prepared statements from trino-python-client
  • Loading branch information
hovaesco authored May 9, 2022
2 parents 95e613f + 9a48ac9 commit 70e4449
Show file tree
Hide file tree
Showing 9 changed files with 245 additions and 33 deletions.
6 changes: 4 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,20 @@
- Add support for `on_table_exists` in table materialization ([#26](https://github.com/starburstdata/dbt-trino/issues/26), [#54](https://github.com/starburstdata/dbt-trino/pull/54))
- Adds support for OAuth2 authentication using web browser ([#40](https://github.com/starburstdata/dbt-trino/issues/40), [#41](https://github.com/starburstdata/dbt-trino/pull/41))
- Add `view_security` to define security mode for views ([#65](https://github.com/starburstdata/dbt-trino/pull/65))
- Support for dbt source freshness ([#28](https://github.com/starburstdata/dbt-trino/issues/28), [#61](https://github.com/starburstdata/dbt-trino/pull/61))

### Fixes
- Add support for future versions of dbt-core ([#55](https://github.com/starburstdata/dbt-trino/issues/55), [#65](https://github.com/starburstdata/dbt-trino/pull/65))

### Under the hood
- Add PostgreSQL docker container for testing ([#66](https://github.com/starburstdata/dbt-trino/issues/66), [#67](https://github.com/starburstdata/dbt-trino/pull/67))
- Migrate to new adapter testing framework ([#57](https://github.com/starburstdata/dbt-trino/issues/57), [#65](https://github.com/starburstdata/dbt-trino/pull/65))

- Implement trino-python-client's prepared statements using `experimental_python_types` ([#61](https://github.com/starburstdata/dbt-trino/pull/61))

Contributors:
* [@hovaesco](https://github.com/hovaesco) ([#54](https://github.com/starburstdata/dbt-trino/pull/54), [#65](https://github.com/starburstdata/dbt-trino/pull/65), [#67](https://github.com/starburstdata/dbt-trino/pull/67))
* [@smith-m](https://github.com/smith-m) ([#65](https://github.com/starburstdata/dbt-trino/pull/65))
* [@mdesmet](https://github.com/mdesmet) ([#41](https://github.com/starburstdata/dbt-trino/pull/41))
* [@mdesmet](https://github.com/mdesmet) ([#41](https://github.com/starburstdata/dbt-trino/pull/41), [#61](https://github.com/starburstdata/dbt-trino/pull/61))

## dbt-trino 1.0.3 (March 2, 2022)

Expand Down
48 changes: 31 additions & 17 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,25 @@ $ pip install dbt-trino

A dbt profile can be configured to run against Trino using the following configuration:

| Option | Description | Required? | Example |
|----------------------|----------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------|----------------------------------|
| method | The Trino authentication method to use | Optional (default is `none`, supported methods are `ldap`, `kerberos`, `jwt`, `oauth` or `certificate`) | `none` or `kerberos` |
| user | Username for authentication | Optional (required if `method` is `none`, `ldap` or `kerberos`) | `commander` |
| password | Password for authentication | Optional (required if `method` is `ldap` or `kerberos`) | `none` or `abc123` |
| jwt_token | JWT token for authentication | Optional (required if `method` is `jwt`) | `none` or `abc123` |
| client_certificate | Path to client certificate to be used for certificate based authentication | Optional (required if `method` is `certificate`) | `/tmp/tls.crt` |
| client_private_key | Path to client private key to be used for certificate based authentication | Optional (required if `method` is `certificate`) | `/tmp/tls.key` |
| http_headers | HTTP Headers to send alongside requests to Trino, specified as a yaml dictionary of (header, value) pairs. | Optional | `X-Trino-Client-Info: dbt-trino` |
| http_scheme | The HTTP scheme to use for requests to Trino | Optional (default is `http`, or `https` for `method: kerberos`, `ldap` or `jwt`) | `https` or `http` |
| cert | The full path to a certificate file for authentication with trino | Optional | |
| session_properties | Sets Trino session properties used in the connection | Optional | `query_max_run_time: 5d` |
| database | Specify the database to build models into | Required | `analytics` |
| schema | Specify the schema to build models into. Note: it is not recommended to use upper or mixed case schema names | Required | `public` |
| host | The hostname to connect to | Required | `127.0.0.1` |
| port | The port to connect to the host on | Required | `8080` |
| threads | How many threads dbt should use | Optional (default is `1`) | `8` |
| Option | Description | Required? | Example |
|--------------------------------|--------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------|----------------------------------|
| method | The Trino authentication method to use | Optional (default is `none`, supported methods are `ldap`, `kerberos`, `jwt`, `oauth` or `certificate`) | `none` or `kerberos` |
| user | Username for authentication | Optional (required if `method` is `none`, `ldap` or `kerberos`) | `commander` |
| password | Password for authentication | Optional (required if `method` is `ldap` or `kerberos`) | `none` or `abc123` |
| jwt_token | JWT token for authentication | Optional (required if `method` is `jwt`) | `none` or `abc123` |
| client_certificate | Path to client certificate to be used for certificate based authentication | Optional (required if `method` is `certificate`) | `/tmp/tls.crt` |
| client_private_key | Path to client private key to be used for certificate based authentication | Optional (required if `method` is `certificate`) | `/tmp/tls.key` |
| http_headers | HTTP Headers to send alongside requests to Trino, specified as a yaml dictionary of (header, value) pairs. | Optional | `X-Trino-Client-Info: dbt-trino` |
| http_scheme | The HTTP scheme to use for requests to Trino | Optional (default is `http`, or `https` for `method: kerberos`, `ldap` or `jwt`) | `https` or `http` |
| cert | The full path to a certificate file for authentication with trino | Optional | |
| session_properties | Sets Trino session properties used in the connection | Optional | `query_max_run_time: 5d` |
| database | Specify the database to build models into | Required | `analytics` |
| schema | Specify the schema to build models into. Note: it is not recommended to use upper or mixed case schema names | Required | `public` |
| host | The hostname to connect to | Required | `127.0.0.1` |
| port | The port to connect to the host on | Required | `8080` |
| threads | How many threads dbt should use | Optional (default is `1`) | `8` |
| prepared_statements_enabled | Enable usage of Trino prepared statements (used in `dbt seed` commands) | Optional (default is `true`) | `true` or `false` |


**Example profiles.yml entry:**

Expand Down Expand Up @@ -292,6 +294,18 @@ dbt docs serve --port 8081 # starts local server (by default docs server runs on
By default, all dbt models are built in the schema specified in your target. But sometimes you wish to build some of the models in a custom schema. In order to do so, use the `schema` configuration key to specify a custom schema for a model. See [here](https://docs.getdbt.com/docs/building-a-dbt-project/building-models/using-custom-schemas) for the documentation. It is important to note that by default, dbt will generate the schema name for a model by concatenating the custom schema to the target schema, as in: `<target_schema>_<custom_schema>`.


#### Prepared statements

The `dbt seed` feature uses [Trino's prepared statements](https://trino.io/docs/current/sql/prepare.html).

Python's http client has a hardcoded limit of 65536 bytes for a header line.

When executing a prepared statement with a large number of parameters, you might encounter following error:

`requests.exceptions.ConnectionError: ('Connection aborted.', LineTooLong('got more than 65536 bytes when reading header line'))`.

The prepared statements can be disabled by setting `prepared_statements_enabled` to `true` in your dbt profile (reverting back to the legacy behavior using Python string interpolation). This flag may be removed in later releases.

## Development

### Running tests
Expand Down
41 changes: 32 additions & 9 deletions dbt/adapters/trino/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@


logger = AdapterLogger("Trino")
PREPARED_STATEMENTS_ENABLED_DEFAULT = True


class HttpScheme(Enum):
Expand Down Expand Up @@ -75,7 +76,16 @@ def unique_field(self):
return self.host

def _connection_keys(self):
return ("method", "host", "port", "user", "database", "schema", "cert")
return (
"method",
"host",
"port",
"user",
"database",
"schema",
"cert",
"prepared_statements_enabled"
)

@abstractmethod
def trino_auth() -> Optional[trino.auth.Authentication]:
Expand All @@ -91,6 +101,7 @@ class TrinoNoneCredentials(TrinoCredentials):
http_scheme: HttpScheme = HttpScheme.HTTP
http_headers: Optional[Dict[str, str]] = None
session_properties: Optional[Dict[str, Any]] = None
prepared_statements_enabled: bool = PREPARED_STATEMENTS_ENABLED_DEFAULT

@property
def method(self):
Expand All @@ -109,6 +120,7 @@ class TrinoCertificateCredentials(TrinoCredentials):
cert: Optional[str] = None
http_headers: Optional[Dict[str, str]] = None
session_properties: Optional[Dict[str, Any]] = None
prepared_statements_enabled: bool = PREPARED_STATEMENTS_ENABLED_DEFAULT

@property
def http_scheme(self):
Expand Down Expand Up @@ -138,6 +150,7 @@ class TrinoLdapCredentials(TrinoCredentials):
cert: Optional[str] = None
http_headers: Optional[Dict[str, str]] = None
session_properties: Optional[Dict[str, Any]] = None
prepared_statements_enabled: bool = PREPARED_STATEMENTS_ENABLED_DEFAULT

@property
def http_scheme(self):
Expand All @@ -163,6 +176,7 @@ class TrinoKerberosCredentials(TrinoCredentials):
cert: Optional[str] = None
http_headers: Optional[Dict[str, str]] = None
session_properties: Optional[Dict[str, Any]] = None
prepared_statements_enabled: bool = PREPARED_STATEMENTS_ENABLED_DEFAULT

@property
def http_scheme(self):
Expand All @@ -184,6 +198,7 @@ class TrinoJwtCredentials(TrinoCredentials):
cert: Optional[str] = None
http_headers: Optional[Dict[str, str]] = None
session_properties: Optional[Dict[str, Any]] = None
prepared_statements_enabled: bool = PREPARED_STATEMENTS_ENABLED_DEFAULT

@property
def http_scheme(self):
Expand All @@ -208,6 +223,7 @@ class TrinoOauthCredentials(TrinoCredentials):
cert: Optional[str] = None
http_headers: Optional[Dict[str, str]] = None
session_properties: Optional[Dict[str, Any]] = None
prepared_statements_enabled: bool = PREPARED_STATEMENTS_ENABLED_DEFAULT
OAUTH = trino.auth.OAuth2Authentication(
redirect_auth_url_handler=trino.auth.WebBrowserRedirectHandler()
)
Expand Down Expand Up @@ -237,13 +253,14 @@ class ConnectionWrapper(object):
"""

def __init__(self, handle):
def __init__(self, handle, prepared_statements_enabled):
self.handle = handle
self._cursor = None
self._fetch_result = None
self._prepared_statements_enabled = prepared_statements_enabled

def cursor(self):
self._cursor = self.handle.cursor()
self._cursor = self.handle.cursor(experimental_python_types=True)
return self

def cancel(self):
Expand Down Expand Up @@ -286,14 +303,17 @@ def fetchone(self):
return None

def execute(self, sql, bindings=None):

if bindings is not None:
# trino doesn't actually pass bindings along so we have to do the
# escaping and formatting ourselves
if not self._prepared_statements_enabled and bindings is not None:
# DEPRECATED: by default prepared statements are used.
# Code is left as an escape hatch if prepared statements
# are failing.
bindings = tuple(self._escape_value(b) for b in bindings)
sql = sql % bindings

result = self._cursor.execute(sql)
result = self._cursor.execute(sql)
else:
result = self._cursor.execute(sql, params=bindings)

self._fetch_result = self._cursor.fetchall()
return result

Expand Down Expand Up @@ -372,7 +392,10 @@ def open(cls, connection):
)
trino_conn._http_session.verify = credentials.cert
connection.state = "open"
connection.handle = ConnectionWrapper(trino_conn)
connection.handle = ConnectionWrapper(
trino_conn,
credentials.prepared_statements_enabled
)
return connection

@classmethod
Expand Down
8 changes: 8 additions & 0 deletions dbt/include/trino/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -180,3 +180,11 @@
{% macro trino__current_timestamp() -%}
CURRENT_TIMESTAMP
{%- endmacro %}

{% macro trino__get_binding_char() %}
{%- if target.prepared_statements_enabled|as_bool -%}
{{ return('?') }}
{%- else -%}
{{ return('%s') }}
{%- endif -%}
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
('test','abc',CHAR 'd',CHAR 'ghi',VARBINARY '65683F',JSON '{"k1":1,"k2":23,"k3":456}'),(NULL,NULL,NULL,NULL,NULL,NULL)

Usually seed row's values through agate_table's data type detection and come through as python types, in this case typing is
handled in `ConnectionWrapper._escape_value`. However dbt also allows you to override the data types of the created table
handled by using bindings in `ConnectionWrapper.execute`. However dbt also allows you to override the data types of the created table
through setting `column_types`, this case is handled here where we have the type information of the seed table.
#}

Expand Down
2 changes: 1 addition & 1 deletion dev_requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ pytest
dbt-tests-adapter
mock>=1.3.0
flake8>=3.5.0
pytz==2017.2
pytz
bumpversion==0.5.3
tox==3.2.0
ipdb
Expand Down
25 changes: 22 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import pytest
import trino

# Import the fuctional fixtures as a plugin
# Import the functional fixtures as a plugin
# Note: fixtures with session scope need to be local

pytest_plugins = ["dbt.tests.fixtures.project"]

# The profile dictionary, used to write out profiles.yml
@pytest.fixture(scope="class")
def dbt_profile_target():
return {
def dbt_profile_target(request):
target = {
'type': 'trino',
'method': 'none',
'threads': 1,
Expand All @@ -19,3 +20,21 @@ def dbt_profile_target():
'catalog': 'memory',
'schema': 'default'
}

marker = request.node.get_closest_marker("prepared_statements_disabled")
if marker:
target.update({
'prepared_statements_enabled': False
})

return target

@pytest.fixture(scope="class")
def trino_connection(dbt_profile_target):
return trino.dbapi.connect(
host=dbt_profile_target['host'],
port=dbt_profile_target['port'],
user=dbt_profile_target['user'],
catalog=dbt_profile_target['catalog'],
schema=dbt_profile_target['schema']
)
Loading

0 comments on commit 70e4449

Please sign in to comment.