DEPRECATION WARNING: This package is no longer maintained. Databricks now officially maintains a DBAPI package called databricks-sql-connector that is compatible with workspace and SQL Analytics clusters. There is also the newer sqlalchemy-databricks package, which uses databricks-sql-connector as a driver.
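For new projects, a minimal connection with the successor package looks roughly like the following (a sketch based on the databricks-sql-connector documentation; the environment variable names are illustrative):

import os
from databricks import sql

# databricks-sql-connector uses server_hostname/http_path/access_token
# rather than this package's host/http_path/token arguments.
connection = sql.connect(
    server_hostname=os.environ["DATABRICKS_HOST"],
    http_path=os.environ["DATABRICKS_HTTP_PATH"],
    access_token=os.environ["DATABRICKS_TOKEN"],
)
cursor = connection.cursor()
cursor.execute("SELECT 1")
print(cursor.fetchall())
cursor.close()
connection.close()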
A thin wrapper around pyhive and pyodbc for creating a DBAPI connection to Databricks Workspace and SQL Analytics clusters. SQL Analytics clusters require the Simba ODBC driver.

Also provides SQLAlchemy dialects using pyhive and pyodbc for Databricks clusters. Databricks SQL Analytics clusters only support the pyodbc-driven dialect.
Install using pip. You must specify at least one of the extras {hive, odbc}. For odbc, the Simba driver is required:
pip install databricks-dbapi[hive,odbc]
For SQLAlchemy support, install with:
pip install databricks-dbapi[hive,odbc,sqlalchemy]
The connect() function returns a pyhive Hive connection object, which internally wraps a thrift connection.

Connecting with http_path, host, and a token:
import os
from databricks_dbapi import hive
token = os.environ["DATABRICKS_TOKEN"]
host = os.environ["DATABRICKS_HOST"]
http_path = os.environ["DATABRICKS_HTTP_PATH"]
connection = hive.connect(
    host=host,
    http_path=http_path,
    token=token,
)
cursor = connection.cursor()
cursor.execute("SELECT * FROM some_table LIMIT 100")
print(cursor.fetchone())
print(cursor.fetchall())
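Queries can also be parameterized through the standard DB-API interface; pyhive uses the pyformat paramstyle. A sketch continuing the example above (the table and column names are hypothetical):

# Parameters are bound client-side by pyhive using pyformat placeholders.
cursor.execute(
    "SELECT * FROM some_table WHERE some_column = %(value)s LIMIT 100",
    {"value": "foo"},
)
print(cursor.fetchall())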
The pyhive connection also provides async functionality:
import os
from databricks_dbapi import hive
from TCLIService.ttypes import TOperationState
token = os.environ["DATABRICKS_TOKEN"]
host = os.environ["DATABRICKS_HOST"]
cluster = os.environ["DATABRICKS_CLUSTER"]
connection = hive.connect(
    host=host,
    cluster=cluster,
    token=token,
)
cursor = connection.cursor()
cursor.execute("SELECT * FROM some_table LIMIT 100", async_=True)
status = cursor.poll().operationState
while status in (TOperationState.INITIALIZED_STATE, TOperationState.RUNNING_STATE):
    logs = cursor.fetch_logs()
    for message in logs:
        print(message)

    # If needed, an asynchronous query can be cancelled at any time with:
    # cursor.cancel()

    status = cursor.poll().operationState
print(cursor.fetchall())
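The polling loop can also be wrapped in a small helper that blocks until the query reaches a terminal state. This is an illustrative sketch, not part of the package, using the same TOperationState values from TCLIService.ttypes:

def wait_for_query(cursor):
    """Block until the async query finishes; raise if it ends badly."""
    terminal_states = (
        TOperationState.FINISHED_STATE,
        TOperationState.CANCELED_STATE,
        TOperationState.CLOSED_STATE,
        TOperationState.ERROR_STATE,
    )
    status = cursor.poll().operationState
    while status not in terminal_states:
        status = cursor.poll().operationState
    if status != TOperationState.FINISHED_STATE:
        raise RuntimeError("query did not finish successfully: state %s" % status)

cursor.execute("SELECT * FROM some_table LIMIT 100", async_=True)
wait_for_query(cursor)
print(cursor.fetchall())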
The ODBC DBAPI requires the Simba ODBC driver.
Connecting with http_path, host, and a token:
import os
from databricks_dbapi import odbc
token = os.environ["DATABRICKS_TOKEN"]
host = os.environ["DATABRICKS_HOST"]
http_path = os.environ["DATABRICKS_HTTP_PATH"]
connection = odbc.connect(
    host=host,
    http_path=http_path,
    token=token,
    driver_path="/path/to/simba/driver",
)
cursor = connection.cursor()
cursor.execute("SELECT * FROM some_table LIMIT 100")
print(cursor.fetchone())
print(cursor.fetchall())
Installing with the sqlalchemy extra registers the databricks+pyhive dialect/driver with SQLAlchemy. Fill in the required information when passing the engine URL.
from sqlalchemy import *
from sqlalchemy.engine import create_engine
from sqlalchemy.schema import *
engine = create_engine(
    "databricks+pyhive://token:<databricks_token>@<host>:<port>/<database>",
    connect_args={"http_path": "<cluster_http_path>"},
)
logs = Table("my_table", MetaData(bind=engine), autoload=True)
print(select([func.count("*")], from_obj=logs).scalar())
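The engine can be used anywhere SQLAlchemy accepts one; for example, reading a query into a pandas DataFrame (a sketch assuming pandas is installed; the table name matches the example above):

import pandas as pd

# pandas delegates connection handling to the SQLAlchemy engine.
df = pd.read_sql("SELECT * FROM my_table LIMIT 100", engine)
print(df.head())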
Installing with the sqlalchemy extra also registers the databricks+pyodbc dialect/driver with SQLAlchemy. Fill in the required information when passing the engine URL.
from sqlalchemy import *
from sqlalchemy.engine import create_engine
from sqlalchemy.schema import *
engine = create_engine(
    "databricks+pyodbc://token:<databricks_token>@<host>:<port>/<database>",
    connect_args={"http_path": "<cluster_http_path>", "driver_path": "/path/to/simba/driver"},
)
logs = Table("my_table", MetaData(bind=engine), autoload=True)
print(select([func.count("*")], from_obj=logs).scalar())
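Reflected tables support the usual SQLAlchemy Core constructs. A sketch of a filtered query in the same SQLAlchemy 1.x select() style used above (the column name and value are hypothetical):

# Filter and limit against the reflected table via Core expressions.
result = engine.execute(
    select([logs]).where(logs.c.some_column == "some_value").limit(10)
)
for row in result:
    print(row)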
Refer to the Databricks documentation for more details on hostname, cluster name, and HTTP path.