Commit 8ad45ec

Move Reflections to Rest API Calls and Extend Implementation (dremio#256)
### Summary

Reflections were being created through the SQL Runner, which was the cause of some issues we were facing.

### Description

The current reflections implementation requires `sys.reflections` access, which is only available to superusers (admins). Creating a reflection as a regular user therefore errors out. To fix this, reflection creation has been moved from queries run inside the SQL Runner to Dremio's REST API calls. The existing logic that writes reflections through a custom materialization remains, so reflections created under the previous approach still work.

In addition, the behaviour of dropping and recreating all reflections when running a dbt project has been changed: the adapter now first checks whether a reflection with the same name is already present in the same dataset and, if so, updates the existing reflection instead.

### Test Results

A total of 11 tests have been added, covering the different situations that may occur when dealing with reflections.

### Changelog

- Reflections are now handled through the REST API, allowing non-admin users to also create them
- It is now possible to set a custom name for reflections
- If a reflection already exists in the dataset with the same name defined in the model, it will be updated instead of creating a new one
- New `date_dimensions` parameter was added to the reflection materialization, to set fields that have a `DATE` granularity
- Added distribution fields under `distribute_by`
- Added partition transformations under `partition_transform`
  - Defaults to Original/Identity if not defined
  - `year`/`month`/`day`/`hour`/`bucket(n)`/`truncate(n)`
- Computations default to `SUM, COUNT` if the mapped measure is numeric, `COUNT` if not
- `reflections_enabled` adapter option has been renamed to `reflections_metadata_enabled` (requires user privileges to run in Dremio)
- CI now creates and formats a `Samples` source, as it is needed for the reflection tests
1 parent 56af5ce commit 8ad45ec
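The "update instead of recreate" behaviour described above can be sketched as follows. This is a minimal illustration with an in-memory stand-in for the REST client; `FakeRestClient` and `create_or_update_reflection` are hypothetical names, not the adapter's actual API.

```python
class FakeRestClient:
    """In-memory stand-in for the Dremio reflection REST calls."""

    def __init__(self):
        self.reflections = {}  # reflection_id -> payload
        self._next_id = 0

    def get_reflections(self, dataset_id):
        # Mirrors the shape of a "list reflections for a dataset" response
        return {"data": [dict(r, id=rid) for rid, r in self.reflections.items()
                         if r["datasetId"] == dataset_id]}

    def create_reflection(self, payload):
        self._next_id += 1
        rid = f"r{self._next_id}"
        self.reflections[rid] = dict(payload)
        return dict(payload, id=rid)

    def update_reflection(self, reflection_id, payload):
        self.reflections[reflection_id] = dict(payload)
        return dict(payload, id=reflection_id)


def create_or_update_reflection(client, dataset_id, payload):
    """Create the reflection, or update it when one with the same name
    already exists on the dataset (the behaviour this PR introduces)."""
    for reflection in client.get_reflections(dataset_id)["data"]:
        if reflection["name"] == payload["name"]:
            return client.update_reflection(reflection["id"], payload)
    return client.create_reflection(payload)
```

Running the helper twice with the same reflection name results in one reflection being created and then updated in place, rather than dropped and recreated.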

File tree: 16 files changed, +772 −151 lines


.github/scripts/create_dremio_s3_source.sh renamed to .github/scripts/create_and_format_sources.sh

Lines changed: 25 additions & 1 deletion
```diff
@@ -50,4 +50,28 @@ curl -s -X PUT "http://localhost:9047/apiv2/source/dbt_test_source" \
   -H "Authorization: _dremio$AUTH_TOKEN" \
   --data "{\"name\":\"dbt_test_source\",\"config\":{\"credentialType\":\"ACCESS_KEY\",\"accessKey\":\"$MINIO_ROOT_USER\",\"accessSecret\":\"$MINIO_ROOT_PASSWORD\",\"secure\":false,\"externalBucketList\":[],\"enableAsync\":true,\"enableFileStatusCheck\":true,\"rootPath\":\"/\",\"defaultCtasFormat\":\"ICEBERG\",\"propertyList\":[{\"name\":\"fs.s3a.path.style.access\",\"value\":\"true\"},{\"name\":\"fs.s3a.endpoint\",\"value\":\"minio:9000\"},{\"name\":\"dremio.s3.compat\",\"value\":\"true\"}],\"whitelistedBuckets\":[],\"isCachingEnabled\":false,\"maxCacheSpacePct\":100},\"type\":\"S3\",\"metadataPolicy\":{\"deleteUnavailableDatasets\":true,\"autoPromoteDatasets\":false,\"namesRefreshMillis\":3600000,\"datasetDefinitionRefreshAfterMillis\":3600000,\"datasetDefinitionExpireAfterMillis\":10800000,\"authTTLMillis\":86400000,\"updateMode\":\"PREFETCH_QUERIED\"}}"

-echo "S3 Source created in Dremio."
+echo "S3 Source created in Dremio."
+
+echo "Creating the Samples source in Dremio..."
+curl -s -X PUT "http://localhost:9047/apiv2/source/Samples" \
+  -H "Content-Type: application/json" \
+  -H "Authorization: _dremio$AUTH_TOKEN" \
+  --data-raw "{\"name\":\"Samples\",\"config\":{\"externalBucketList\":[\"samples.dremio.com\"],\"credentialType\":\"NONE\",\"secure\":false,\"propertyList\":[]},\"name\":\"Samples\",\"accelerationRefreshPeriod\":3600000,\"accelerationGracePeriod\":10800000,\"accelerationNeverRefresh\":true,\"accelerationNeverExpire\":true,\"accelerationActivePolicyType\":\"PERIOD\",\"accelerationRefreshSchedule\":\"0 0 8 * * *\",\"accelerationRefreshOnDataChanges\":false,\"type\":\"S3\"}"
+
+echo "Samples source created in Dremio."
+
+echo "Formatting SF_incidents2016..."
+curl -s -X PUT "http://localhost:9047/apiv2/source/Samples/file_format/samples.dremio.com/SF_incidents2016.json" \
+  -H "Content-Type: application/json" \
+  -H "Authorization: _dremio$AUTH_TOKEN" \
+  --data-raw "{\"type\":\"JSON\"}"
+
+echo "SF_incidents2016 formatted in Dremio."
+
+echo "Formatting NYC-taxi-trips..."
+curl -s -X PUT "http://localhost:9047/apiv2/source/Samples/folder_format/samples.dremio.com/NYC-taxi-trips" \
+  -H "Content-Type: application/json" \
+  -H "Authorization: _dremio$AUTH_TOKEN" \
+  --data-raw "{\"ignoreOtherFileFormats\":false,\"type\":\"Parquet\"}"
+
+echo "NYC-taxi-trips formatted in Dremio."
```

.github/workflows/ci.yml

Lines changed: 2 additions & 2 deletions
```diff
@@ -48,8 +48,8 @@ jobs:
       - name: Create MinIO bucket
        run: bash .github/scripts/create_minio_bucket.sh

-      - name: Create Dremio S3 Source
-        run: bash .github/scripts/create_dremio_s3_source.sh
+      - name: Create and Format Sources
+        run: bash .github/scripts/create_and_format_sources.sh

      - name: Install Dependencies
        uses: actions/setup-python@v4
```

CHANGELOG.md

Lines changed: 11 additions & 1 deletion
```diff
@@ -3,8 +3,18 @@
 ## Changes

 - Added [DremioRestClient](dbt/adapters/dremio/api/rest/client.py) to isolate all Dremio API calls inside one class
+- [#256](https://github.com/dremio/dbt-dremio/pull/256) Reflections are now handled through the Rest API
+- Non-admin users are now able to use reflections
+- It is now possible to set a custom name for reflections
+- If a reflection already exists in the dataset with the same name defined in the model, it will be updated instead of creating a new one
+- New `date_dimensions` parameter was added to the reflection materialization, to set fields that have a `DATE` granularity
+- Added Distribution Fields under `distribute_by`
+- Added partition transformations under `partition_transform`
+  - Defaults to Original/Identity if not defined
+  - `year/month/day/hour/bucket(n)/truncate(n)`
+- Computations default to `SUM, COUNT` if mapped measure is numeric, `COUNT` if not
+- `reflections_enabled` adapter option has been renamed to `reflections_metadata_enabled` (requires user privileges to run in dremio)
 - Removing duplicated macros array_append, array_concat as Dremio already has SQL functions analogues.
-
 ## Dependency

 - [#222](https://github.com/dremio/dbt-dremio/issues/222) Upgrade dbt-core to 1.8.8 and dbt-tests-adapter to 1.8.0
```

dbt/adapters/dremio/api/rest/client.py

Lines changed: 28 additions & 3 deletions
```diff
@@ -15,12 +15,11 @@
 # limitations under the License.


-
 import requests

 from dbt.adapters.dremio.api.authentication import DremioPatAuthentication
 from dbt.adapters.dremio.api.parameters import Parameters
-from dbt.adapters.dremio.api.rest.utils import _post, _get, _delete
+from dbt.adapters.dremio.api.rest.utils import _post, _get, _put, _delete
 from dbt.adapters.dremio.api.rest.url_builder import UrlBuilder

 from dbt.adapters.events.logging import AdapterLogger
@@ -132,4 +131,30 @@ def delete_catalog(self, cid):
             url,
             self._parameters.authentication.get_headers(),
             ssl_verify=self._parameters.authentication.verify_ssl,
-        )
+        )
+
+    def get_reflections(self, dataset_id):
+        url = UrlBuilder.get_reflection_url(self._parameters, dataset_id)
+        return _get(
+            url,
+            self._parameters.authentication.get_headers(),
+            ssl_verify=self._parameters.authentication.verify_ssl,
+        )
+
+    def create_reflection(self, payload):
+        url = UrlBuilder.create_reflection_url(self._parameters)
+        return _post(
+            url,
+            self._parameters.authentication.get_headers(),
+            json=payload,
+            ssl_verify=self._parameters.authentication.verify_ssl,
+        )
+
+    def update_reflection(self, reflection_id, payload):
+        url = UrlBuilder.update_reflection_url(self._parameters, reflection_id)
+        return _put(
+            url,
+            self._parameters.authentication.get_headers(),
+            json=payload,
+            ssl_verify=self._parameters.authentication.verify_ssl,
+        )
```

dbt/adapters/dremio/api/rest/entities/__init__.py

Whitespace-only changes.
Lines changed: 141 additions & 0 deletions
New file (all 141 lines added):

```python
from enum import Enum


class TransformType(Enum):
    YEAR = "YEAR"
    MONTH = "MONTH"
    DAY = "DAY"
    HOUR = "HOUR"
    IDENTITY = "IDENTITY"
    BUCKET = "BUCKET"
    TRUNCATE = "TRUNCATE"

    @classmethod
    def from_string(cls, transform_str):
        transform_str = transform_str.upper()

        if transform_str.startswith("BUCKET("):
            return cls.BUCKET
        elif transform_str.startswith("TRUNCATE("):
            return cls.TRUNCATE

        try:
            return cls(transform_str)
        except ValueError:
            return cls.IDENTITY

    def to_transform(self, raw_str):
        if self in (
            TransformType.YEAR,
            TransformType.MONTH,
            TransformType.DAY,
            TransformType.HOUR,
            TransformType.IDENTITY
        ):
            return {"type": self.value}

        if self == TransformType.BUCKET:
            bucket_count = int(raw_str.split("(")[1].split(")")[0])
            return {
                "type": "BUCKET",
                "bucketTransform": {"bucketCount": bucket_count},
            }

        if self == TransformType.TRUNCATE:
            truncate_length = int(raw_str.split("(")[1].split(")")[0])
            return {
                "type": "TRUNCATE",
                "truncateTransform": {"truncateLength": truncate_length},
            }

        return {"type": TransformType.IDENTITY.value}


# https://docs.dremio.com/current/reference/api/reflections/
class ReflectionEntity:
    def __init__(self, name, reflection_type, dataset_id, display_fields, dimensions, date_dimensions, measures,
                 computations, partition_by, partition_transform, partition_method, distribute_by, localsort_by,
                 arrow_cache):
        self.__name = name
        self.__type = reflection_type
        self.__dataset_id = dataset_id
        self.__partition_method = partition_method.upper()
        self.__display_fields = display_fields
        self.__dimensions_fields = dimensions
        self.__date_dimensions_fields = date_dimensions
        self.__measure_fields = measures
        self.__computation_fields = computations
        self.__partition_by_fields = partition_by
        self.__partition_transformations = partition_transform
        self.__partition_method = partition_method
        self.__distribute_by_fields = distribute_by
        self.__local_sort_fields = localsort_by
        self.__arrow_cache = arrow_cache

    def buildDisplayFields(self):
        return [{"name": field} for field in self.__display_fields]

    def buildDimensionFields(self):
        return [{"name": field} for field in self.__dimensions_fields]

    def buildDateFields(self):
        return [{"name": date_dimension, "granularity": "DATE"} for date_dimension in self.__date_dimensions_fields]

    def buildMeasureFields(self):
        return [{"name": measure, "measureTypeList": computation.split(',')} for
                measure, computation in zip(self.__measure_fields, self.__computation_fields)]

    def buildPartitionFields(self):
        if not self.__partition_transformations:
            self.__partition_transformations = ["IDENTITY"] * len(self.__partition_by_fields)

        partition_fields = []

        for partition, transform in zip(self.__partition_by_fields, self.__partition_transformations):
            transform_type = TransformType.from_string(transform)
            partition_fields.append({
                "name": partition,
                "transform": transform_type.to_transform(transform)
            })

        return partition_fields

    def buildDistributionFields(self):
        return [{"name": distribute} for distribute in self.__distribute_by_fields]

    def buildSortFields(self):
        return [{"name": sort} for sort in self.__local_sort_fields]

    def build_payload(self):
        payload = {
            "type": self.__type,
            "name": self.__name,
            "datasetId": self.__dataset_id,
            "enabled": True,
            "arrowCachingEnabled": self.__arrow_cache,
            "partitionDistributionStrategy": self.__partition_method.upper(),
            "entityType": "reflection"
        }

        if self.__display_fields:
            payload["displayFields"] = self.buildDisplayFields()

        if self.__dimensions_fields:
            payload["dimensionFields"] = self.buildDimensionFields()

        if self.__date_dimensions_fields:
            payload["dateFields"] = self.buildDateFields()

        if self.__measure_fields and self.__computation_fields:
            payload["measureFields"] = self.buildMeasureFields()

        if self.__partition_by_fields:
            payload["partitionFields"] = self.buildPartitionFields()

        if self.__distribute_by_fields:
            payload["distributionFields"] = self.buildDistributionFields()

        if self.__local_sort_fields:
            payload["sortFields"] = self.buildSortFields()

        return payload
```
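To make the partition-transform parsing concrete, here is a standalone restatement of how `partition_transform` strings map to the REST payload's transform objects. It is a trimmed sketch for illustration; `parse_transform` and `build_partition_fields` are illustrative helpers, not the adapter's API.

```python
def parse_transform(raw):
    """Map a transform string like "bucket(16)" to a payload transform dict."""
    s = raw.upper()
    if s.startswith("BUCKET("):
        n = int(s.split("(")[1].split(")")[0])
        return {"type": "BUCKET", "bucketTransform": {"bucketCount": n}}
    if s.startswith("TRUNCATE("):
        n = int(s.split("(")[1].split(")")[0])
        return {"type": "TRUNCATE", "truncateTransform": {"truncateLength": n}}
    if s in ("YEAR", "MONTH", "DAY", "HOUR", "IDENTITY"):
        return {"type": s}
    # Anything unrecognised falls back to IDENTITY, as in from_string()
    return {"type": "IDENTITY"}


def build_partition_fields(partition_by, transforms=None):
    # When no transforms are defined, every field defaults to IDENTITY
    transforms = transforms or ["IDENTITY"] * len(partition_by)
    return [{"name": col, "transform": parse_transform(t)}
            for col, t in zip(partition_by, transforms)]
```

For example, `build_partition_fields(["a", "b"], ["year", "truncate(3)"])` pairs each partition column with its parsed transform, while `build_partition_fields(["c"])` yields an IDENTITY transform.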

dbt/adapters/dremio/api/rest/url_builder.py

Lines changed: 53 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,12 @@ class UrlBuilder:
3434
SOFTWARE_CATALOG_ENDPOINT = "/api/v3/catalog"
3535
CLOUD_CATALOG_ENDPOINT = CLOUD_PROJECT_ENDPOINT + "/{}/catalog"
3636

37+
SOFTWARE_REFLECTIONS_ENDPOINT = "/api/v3/reflection"
38+
CLOUD_REFLECTIONS_ENDPOINT = CLOUD_PROJECT_ENDPOINT + "/{}/reflection"
39+
40+
SOFTWARE_DATASET_ENDPOIT = "/api/v3/dataset"
41+
CLOUD_DATASET_ENDPOINT = CLOUD_PROJECT_ENDPOINT + "/{}/dataset"
42+
3743
# https://docs.dremio.com/software/rest-api/jobs/get-job/
3844
OFFSET_DEFAULT = 0
3945
LIMIT_DEFAULT = 100
@@ -56,10 +62,10 @@ def sql_url(cls, parameters: Parameters):
5662
def job_status_url(cls, parameters: Parameters, job_id):
5763
if type(parameters) is CloudParameters:
5864
return (
59-
parameters.base_url
60-
+ UrlBuilder.CLOUD_JOB_ENDPOINT.format(parameters.cloud_project_id)
61-
+ "/"
62-
+ job_id
65+
parameters.base_url
66+
+ UrlBuilder.CLOUD_JOB_ENDPOINT.format(parameters.cloud_project_id)
67+
+ "/"
68+
+ job_id
6369
)
6470
return parameters.base_url + UrlBuilder.SOFTWARE_JOB_ENDPOINT + "/" + job_id
6571

@@ -75,11 +81,11 @@ def job_cancel_url(cls, parameters: Parameters, job_id):
7581

7682
@classmethod
7783
def job_results_url(
78-
cls,
79-
parameters: Parameters,
80-
job_id,
81-
offset=OFFSET_DEFAULT,
82-
limit=LIMIT_DEFAULT,
84+
cls,
85+
parameters: Parameters,
86+
job_id,
87+
offset=OFFSET_DEFAULT,
88+
limit=LIMIT_DEFAULT,
8389
):
8490
url_path = parameters.base_url
8591
if type(parameters) is CloudParameters:
@@ -139,3 +145,41 @@ def catalog_item_by_path_url(cls, parameters: Parameters, path_list):
139145
joined_path_str = "/".join(quoted_path_list).replace('"', "")
140146
endpoint = f"/by-path/{joined_path_str}"
141147
return url_path + endpoint
148+
149+
@classmethod
150+
def create_reflection_url(cls, parameters: Parameters):
151+
url_path = parameters.base_url
152+
if type(parameters) is CloudParameters:
153+
url_path += UrlBuilder.CLOUD_REFLECTIONS_ENDPOINT.format(
154+
parameters.cloud_project_id
155+
)
156+
else:
157+
url_path += UrlBuilder.SOFTWARE_REFLECTIONS_ENDPOINT
158+
159+
return url_path
160+
161+
@classmethod
162+
def update_reflection_url(cls, parameters: Parameters, dataset_id):
163+
url_path = parameters.base_url
164+
if type(parameters) is CloudParameters:
165+
url_path += UrlBuilder.CLOUD_REFLECTIONS_ENDPOINT.format(
166+
parameters.cloud_project_id
167+
)
168+
else:
169+
url_path += UrlBuilder.SOFTWARE_REFLECTIONS_ENDPOINT
170+
171+
endpoint = "/{}".format(dataset_id)
172+
return url_path + endpoint
173+
174+
@classmethod
175+
def get_reflection_url(cls, parameters: Parameters, dataset_id):
176+
url_path = parameters.base_url
177+
if type(parameters) is CloudParameters:
178+
url_path += UrlBuilder.CLOUD_DATASET_ENDPOINT.format(
179+
parameters.cloud_project_id
180+
)
181+
else:
182+
url_path += UrlBuilder.SOFTWARE_DATASET_ENDPOIT
183+
184+
endpoint = "/{}/reflection".format(dataset_id)
185+
return url_path + endpoint
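For Dremio Software, the three helpers above resolve to the endpoints below (the Cloud branch instead prefixes the project-scoped path). A standalone sketch, with a placeholder base URL and simplified free functions standing in for the `UrlBuilder` classmethods:

```python
def create_reflection_url(base_url):
    # POST here creates a reflection
    return base_url + "/api/v3/reflection"

def update_reflection_url(base_url, reflection_id):
    # PUT here updates an existing reflection by id
    return base_url + "/api/v3/reflection/" + reflection_id

def get_reflection_url(base_url, dataset_id):
    # Listing goes through the dataset endpoint, not /reflection
    return base_url + "/api/v3/dataset/" + dataset_id + "/reflection"
```

Note the asymmetry: create and update address the reflection resource directly, while listing the reflections of a dataset goes through the dataset endpoint.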

dbt/adapters/dremio/api/rest/utils.py

Lines changed: 13 additions & 9 deletions
```diff
@@ -31,7 +31,6 @@
 import json as jsonlib
 from requests.exceptions import HTTPError

-
 from dbt.adapters.events.logging import AdapterLogger

 logger = AdapterLogger("dremio")
@@ -45,12 +44,12 @@ def _get(url, request_headers, details="", ssl_verify=True):


 def _post(
-    url,
-    request_headers=None,
-    json=None,
-    details="",
-    ssl_verify=True,
-    timeout=None,
+        url,
+        request_headers=None,
+        json=None,
+        details="",
+        ssl_verify=True,
+        timeout=None,
 ):
     if isinstance(json, str):
         json = jsonlib.loads(json)
@@ -64,6 +63,13 @@ def _post(
     return _check_error(response, details)


+def _put(url, request_headers, json=None, details="", ssl_verify=True):
+    response = session.put(
+        url, headers=request_headers, verify=ssl_verify, json=json
+    )
+    return _check_error(response, details)
+
+
 def _delete(url, request_headers, details="", ssl_verify=True):
     response = session.delete(url, headers=request_headers, verify=ssl_verify)
     return _check_error(response, details)
@@ -148,5 +154,3 @@ def _check_error(response, details=""):
                 "Gateway Timeout:" + details, error, response
             )
     raise DremioException("Unknown error", error)
-
-
```
