diff --git a/crates/catalog-unity/src/lib.rs b/crates/catalog-unity/src/lib.rs index 816376eba8..c47d029332 100644 --- a/crates/catalog-unity/src/lib.rs +++ b/crates/catalog-unity/src/lib.rs @@ -181,6 +181,11 @@ pub enum UnityCatalogConfigKey { /// - `azure_use_azure_cli` /// - `use_azure_cli` UseAzureCli, + + /// Allow http url (e.g. http://localhost:8080/api/2.1/...) + /// Supported keys: + /// - `unity_allow_http_url` + AllowHttpUrl, } impl FromStr for UnityCatalogConfigKey { @@ -224,6 +229,7 @@ impl FromStr for UnityCatalogConfigKey { "workspace_url" | "unity_workspace_url" | "databricks_workspace_url" => { Ok(UnityCatalogConfigKey::WorkspaceUrl) } + "allow_http_url" | "unity_allow_http_url" => Ok(UnityCatalogConfigKey::AllowHttpUrl), _ => Err(DataCatalogError::UnknownConfigKey { catalog: "unity", key: s.to_string(), @@ -237,6 +243,7 @@ impl AsRef for UnityCatalogConfigKey { fn as_ref(&self) -> &str { match self { UnityCatalogConfigKey::AccessToken => "unity_access_token", + UnityCatalogConfigKey::AllowHttpUrl => "unity_allow_http_url", UnityCatalogConfigKey::AuthorityHost => "unity_authority_host", UnityCatalogConfigKey::AuthorityId => "unity_authority_id", UnityCatalogConfigKey::ClientId => "unity_client_id", @@ -289,6 +296,9 @@ pub struct UnityCatalogBuilder { /// When set to true, azure cli has to be used for acquiring access token use_azure_cli: bool, + /// When set to true, http will be allowed in the catalog url + allow_http_url: bool, + /// Retry config retry_config: RetryConfig, @@ -311,6 +321,9 @@ impl UnityCatalogBuilder { ) -> DataCatalogResult { match UnityCatalogConfigKey::from_str(key.as_ref())? { UnityCatalogConfigKey::AccessToken => self.bearer_token = Some(value.into()), + UnityCatalogConfigKey::AllowHttpUrl => { + self.allow_http_url = str_is_truthy(&value.into()) + } UnityCatalogConfigKey::ClientId => self.client_id = Some(value.into()), UnityCatalogConfigKey::ClientSecret => self.client_secret = Some(value.into()), UnityCatalogConfigKey::AuthorityId => self.authority_id = Some(value.into()), @@ -407,6 +420,44 @@ impl UnityCatalogBuilder { self } + /// Returns true if table uri is a valid Unity Catalog URI, false otherwise. + pub fn is_unity_catalog_uri(table_uri: &str) -> bool { + table_uri.starts_with("uc://") + } + + /// Returns the storage location and temporary token to be used with the + /// Unity Catalog table. + pub async fn get_uc_location_and_token(table_uri: &str) -> Result<(String, String), UnityCatalogError> { + let uri_parts: Vec<&str> = table_uri[5..].split('.').collect(); + if uri_parts.len() != 3 { + panic!("Invalid Unity Catalog URI: {}", table_uri); + } + + let catalog_id = uri_parts[0]; + let database_name = uri_parts[1]; + let table_name = uri_parts[2]; + + let unity_catalog = match UnityCatalogBuilder::from_env().build() { + Ok(uc) => uc, + Err(_e) => panic!("Unable to build Unity Catalog."), + }; + let storage_location = + match unity_catalog.get_table_storage_location( + Some(catalog_id.to_string()), + database_name, + table_name, + ).await { + Ok(s) => s, + Err(_e) => panic!("Unable to find the table's storage location."), + }; + let token = unity_catalog.get_credential().await?; + let credential = match token.to_str() { + Ok(header_str) => header_str.to_string(), + Err(_e) => panic!("Unable to get string value from Unity Catalog token."), + }; + Ok((storage_location, credential)) + } + fn get_credential_provider(&self) -> Option { if let Some(token) = self.bearer_token.as_ref() { return Some(CredentialProvider::BearerToken(token.clone())); @@ -451,7 +502,12 @@ impl UnityCatalogBuilder { .trim_end_matches('/') .to_string(); - let client = self.client_options.client()?; + let client_options = if self.allow_http_url { + self.client_options.with_allow_http(true) + } else { + self.client_options + }; + let client = client_options.client()?; Ok(UnityCatalog { client, @@ -612,7 +668,7 @@ impl UnityCatalog { self.catalog_url(), catalog_id.as_ref(), database_name.as_ref(), - table_name.as_ref() + table_name.as_ref(), )) .header(AUTHORIZATION, token) .send() @@ -661,6 +717,7 @@ mod tests { use crate::models::tests::{GET_SCHEMA_RESPONSE, GET_TABLE_RESPONSE, LIST_SCHEMAS_RESPONSE}; use crate::models::*; use crate::UnityCatalogBuilder; + use deltalake_core::DataCatalog; use httpmock::prelude::*; #[tokio::test] @@ -716,5 +773,11 @@ mod tests { .await .unwrap(); assert!(matches!(get_table_response, GetTableResponse::Success(_))); + + let storage_location = client.get_table_storage_location( + Some("catalog_name".to_string()), "schema_name", "table_name", + ).await + .unwrap(); + assert!(storage_location.eq_ignore_ascii_case("string")); } } diff --git a/crates/catalog-unity/src/models.rs b/crates/catalog-unity/src/models.rs index 2066a4ee86..9d0edd7ab2 100644 --- a/crates/catalog-unity/src/models.rs +++ b/crates/catalog-unity/src/models.rs @@ -266,14 +266,14 @@ pub struct TableSummary { pub struct Table { /// Username of table creator. #[serde(default)] - pub created_by: String, + pub created_by: Option, /// Name of table, relative to parent schema. pub name: String, /// Username of user who last modified the table. #[serde(default)] - pub updated_by: String, + pub updated_by: Option, /// List of schemes whose objects can be referenced without qualification. #[serde(default)] @@ -283,6 +283,7 @@ pub struct Table { pub data_source_format: DataSourceFormat, /// Full name of table, in form of catalog_name.schema_name.table_name + #[serde(default)] pub full_name: String, /// Name of parent schema relative to its parent catalog. @@ -292,6 +293,7 @@ pub struct Table { pub storage_location: String, /// Unique identifier of parent metastore. + #[serde(default)] pub metastore_id: String, } diff --git a/python/Cargo.toml b/python/Cargo.toml index cb2f6d944d..eedcad1576 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -17,6 +17,9 @@ doc = false [dependencies] delta_kernel.workspace = true +# deltalake_catalog_unity - local crate +deltalake-catalog-unity = { path = "../crates/catalog-unity" } + # arrow arrow-schema = { workspace = true, features = ["serde"] } diff --git a/python/src/lib.rs b/python/src/lib.rs index 7c86aeec9e..df37e8d5da 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -76,6 +76,7 @@ use crate::merge::PyMergeBuilder; use crate::query::PyQueryBuilder; use crate::schema::{schema_to_pyobject, Field}; use crate::utils::rt; +use deltalake_catalog_unity::UnityCatalogBuilder; #[derive(FromPyObject)] enum PartitionFilterValue { @@ -169,12 +170,24 @@ impl RawDeltaTable { log_buffer_size: Option, ) -> PyResult { py.allow_threads(|| { - let mut builder = deltalake::DeltaTableBuilder::from_uri(table_uri) - .with_io_runtime(IORuntime::default()); - let options = storage_options.clone().unwrap_or_default(); - if let Some(storage_options) = storage_options { - builder = builder.with_storage_options(storage_options) + let (table_path, uc_token) = if UnityCatalogBuilder::is_unity_catalog_uri(table_uri) { + match rt().block_on(UnityCatalogBuilder::get_uc_location_and_token(table_uri)) { + Ok(tup) => tup, + Err(err) => return Err(PyRuntimeError::new_err(err.to_string())), + } + } else { + (table_uri.to_string(), "".to_string()) + }; + + let mut options = storage_options.clone().unwrap_or_default(); + if !uc_token.is_empty() { + options.insert("UNITY_CATALOG_TEMPORARY_TOKEN".to_string(), uc_token); } + + let mut builder = deltalake::DeltaTableBuilder::from_uri(&table_path) + .with_io_runtime(IORuntime::default()); + builder = builder.with_storage_options(options.clone()); + if let Some(version) = version { builder = builder.with_version(version) } @@ -191,7 +204,7 @@ impl RawDeltaTable { Ok(RawDeltaTable { _table: Arc::new(Mutex::new(table)), _config: FsConfig { - root_url: table_uri.into(), + root_url: table_path, options, }, }) @@ -204,10 +217,23 @@ impl RawDeltaTable { table_uri: &str, storage_options: Option>, ) -> PyResult { - let mut builder = deltalake::DeltaTableBuilder::from_uri(table_uri); - if let Some(storage_options) = storage_options { - builder = builder.with_storage_options(storage_options) + let (table_path, uc_token) = if UnityCatalogBuilder::is_unity_catalog_uri(table_uri) { + match rt().block_on(UnityCatalogBuilder::get_uc_location_and_token(table_uri)) { + Ok(tup) => tup, + Err(err) => return Err(PyRuntimeError::new_err(err.to_string())), + } + } else { + (table_uri.to_string(), "".to_string()) + }; + + let mut options = storage_options.clone().unwrap_or_default(); + if !uc_token.is_empty() { + options.insert("UNITY_CATALOG_TEMPORARY_TOKEN".to_string(), uc_token); } + + let mut builder = deltalake::DeltaTableBuilder::from_uri(&table_path) + .with_io_runtime(IORuntime::default()); + builder = builder.with_storage_options(options.clone()); Ok(rt() .block_on(async { match builder.build() {