Skip to main content
Version: Next

Databricks

DataHub supports integration with Databricks ecosystem using a multitude of connectors, depending on your exact setup.

Databricks Hive

The simplest way to integrate is usually via the Hive connector. The Hive starter recipe has a section describing how to connect to your Databricks workspace.

Databricks Unity Catalog (new)

The recently introduced Unity Catalog provides a new way to govern your assets within the Databricks lakehouse. If you have enabled Unity Catalog, you can use the unity-catalog source (see below) to integrate your metadata into DataHub as an alternate to the Hive pathway.

Databricks Spark

To complete the picture, we recommend adding push-based ingestion from your Spark jobs to see real-time activity and lineage between your Databricks tables and your Spark jobs. Use the Spark agent to push metadata to DataHub using the instructions here.

Watch the DataHub Talk at the Data and AI Summit 2022

For a deeper look at how to think about DataHub within and across your Databricks ecosystem, watch the recording of our talk at the Data and AI Summit 2022.

Incubating

Important Capabilities

CapabilityStatusNotes
Asset ContainersEnabled by default
Column-level LineageEnabled by default
Dataset UsageEnabled by default
DescriptionsEnabled by default
Detect Deleted EntitiesOptionally enabled via stateful_ingestion.remove_stale_metadata
DomainsSupported via the domain config field
Extract OwnershipSupported via the include_ownership config
Platform InstanceEnabled by default
Schema MetadataEnabled by default
Table-Level LineageEnabled by default

This plugin extracts the following metadata from Databricks Unity Catalog:

  • metastores
  • schemas
  • tables and column lineage

Prerequisities

  • Get your Databricks instance's workspace url
  • Create a Databricks Service Principal
    • You can skip this step and use your own account to get things running quickly, but we strongly recommend creating a dedicated service principal for production use.
  • Generate a Databricks Personal Access token following the following guides:
  • Provision your service account:
    • To ingest your workspace's metadata and lineage, your service principal must have all of the following:
      • One of: metastore admin role, ownership of, or USE CATALOG privilege on any catalogs you want to ingest
      • One of: metastore admin role, ownership of, or USE SCHEMA privilege on any schemas you want to ingest
      • Ownership of or SELECT privilege on any tables and views you want to ingest
      • Ownership documentation
      • Privileges documentation
    • To ingest legacy hive_metastore catalog (include_hive_metastore - disabled by default), your service principal must have all of the following:
      • READ_METADATA and USAGE privilege on hive_metastore catalog
      • READ_METADATA and USAGE privilege on schemas you want to ingest
      • READ_METADATA and USAGE privilege on tables and views you want to ingest
      • Hive Metastore Privileges documentation
    • To ingest your workspace's notebooks and respective lineage, your service principal must have CAN_READ privileges on the folders containing the notebooks you want to ingest: guide.
    • To include_usage_statistics (enabled by default), your service principal must have CAN_MANAGE permissions on any SQL Warehouses you want to ingest: guide.
    • To ingest profiling information with method: ge, you need SELECT privileges on all profiled tables.
    • To ingest profiling information with method: analyze and call_analyze: true (enabled by default), your service principal must have ownership or MODIFY privilege on any tables you want to profile.
      • Alternatively, you can run ANALYZE TABLE yourself on any tables you want to profile, then set call_analyze to false. You will still need SELECT privilege on those tables to fetch the results.
  • Check the starter recipe below and replace workspace_url and token with your information from the previous steps.

CLI based Ingestion

Install the Plugin

pip install 'acryl-datahub[unity-catalog]'

Starter Recipe

Check out the following recipe to get started with ingestion! See below for full configuration options.

For general pointers on writing and running a recipe, see our main recipe guide.

source:
type: unity-catalog
config:
workspace_url: https://my-workspace.cloud.databricks.com
token: "<token>"
include_metastore: false
include_ownership: true
profiling:
method: "ge"
enabled: true
warehouse_id: "<warehouse_id>"
profile_table_level_only: false
max_wait_secs: 60
pattern:
deny:
- ".*\\.unwanted_schema"

# profiling:
# method: "analyze"
# enabled: true
# warehouse_id: "<warehouse_id>"
# profile_table_level_only: true
# call_analyze: true

# catalogs: ["my_catalog"]
# schema_pattern:
# deny:
# - information_schema
# table_pattern:
# allow:
# - my_catalog.my_schema.my_table
# First you have to create domains on Datahub by following this guide -> https://datahubproject.io/docs/domains/#domains-setup-prerequisites-and-permissions
# domain:
# urn:li:domain:1111-222-333-444-555:
# allow:
# - main.*

stateful_ingestion:
enabled: true

pipeline_name: acme-corp-unity


# sink configs if needed

Config Details

Note that a . is used to denote nested fields in the YAML recipe.

FieldDescription
token 
string
Databricks personal access token
workspace_url 
string
Databricks workspace url. e.g. https://my-workspace.cloud.databricks.com
bucket_duration
Enum
Size of the time window to aggregate usage stats.
Default: DAY
catalogs
array(string)
column_lineage_column_limit
integer
Limit the number of columns to get column level lineage.
Default: 300
convert_urns_to_lowercase
boolean
Whether to convert dataset urns to lowercase.
Default: False
enable_stateful_profiling
boolean
Enable stateful profiling. This will store profiling timestamps per dataset after successful profiling. and will not run profiling again in subsequent run if table has not been updated.
Default: True
end_time
string(date-time)
Latest date of lineage/usage to consider. Default: Current time in UTC
format_sql_queries
boolean
Whether to format sql queries
Default: False
include_column_lineage
boolean
Option to enable/disable lineage generation. Currently we have to call a rest call per column to get column level lineage due to the Databrick api which can slow down ingestion.
Default: True
include_external_lineage
boolean
Option to enable/disable lineage generation for external tables. Only external S3 tables are supported at the moment.
Default: True
include_hive_metastore
boolean
Whether to ingest legacy hive_metastore catalog. This requires executing queries on SQL warehouse.
Default: True
include_metastore
boolean
Whether to ingest the workspace's metastore as a container and include it in all urns. Changing this will affect the urns of all entities in the workspace. This config is deprecated and will be removed in the future, so it is recommended to not set this to True for new ingestions. If you have an existing unity catalog ingestion, you'll want to avoid duplicates by soft deleting existing data. If stateful ingestion is enabled, running with include_metastore: false should be sufficient. Otherwise, we recommend deleting via the cli: datahub delete --platform databricks and re-ingesting with include_metastore: false.
Default: False
include_notebooks
boolean
Ingest notebooks, represented as DataHub datasets.
Default: False
include_operational_stats
boolean
Whether to display operational stats.
Default: True
include_ownership
boolean
Option to enable/disable ownership generation for metastores, catalogs, schemas, and tables.
Default: False
include_read_operational_stats
boolean
Whether to report read operational stats. Experimental.
Default: False
include_table_lineage
boolean
Option to enable/disable lineage generation.
Default: True
include_table_location_lineage
boolean
If the source supports it, include table lineage to the underlying storage location.
Default: True
include_tables
boolean
Whether tables should be ingested.
Default: True
include_top_n_queries
boolean
Whether to ingest the top_n_queries.
Default: True
include_usage_statistics
boolean
Generate usage statistics.
Default: True
include_view_column_lineage
boolean
Populates column-level lineage for view->view and table->view lineage using DataHub's sql parser. Requires include_view_lineage to be enabled.
Default: True
include_view_lineage
boolean
Populates view->view and table->view lineage using DataHub's sql parser.
Default: True
include_views
boolean
Whether views should be ingested.
Default: True
incremental_lineage
boolean
When enabled, emits lineage as incremental to existing lineage already in DataHub. When disabled, re-states lineage on each run.
Default: False
ingest_data_platform_instance_aspect
boolean
Option to enable/disable ingestion of the data platform instance aspect. The default data platform instance id for a dataset is workspace_name
Default: False
options
object
Any options specified here will be passed to SQLAlchemy.create_engine as kwargs.
platform_instance
string
The instance of the platform that all assets produced by this recipe belong to
scheme
string
Default: databricks
sql_parser_use_external_process
boolean
When enabled, sql parser will run in isolated in a separate process. This can affect processing time but can protect from sql parser's mem leak.
Default: False
start_time
string(date-time)
Earliest date of lineage/usage to consider. Default: Last full day in UTC (or hour, depending on bucket_duration). You can also specify relative time with respect to end_time such as '-7 days' Or '-7d'.
top_n_queries
integer
Number of top queries to save to each table.
Default: 10
use_file_backed_cache
boolean
Whether to use a file backed cache for the view definitions.
Default: True
warehouse_id
string
SQL Warehouse id, for running queries. If not set, will use the default warehouse.
workspace_name
string
Name of the workspace. Default to deployment name present in workspace_url
env
string
The environment that all assets produced by this connector belong to
Default: PROD
catalog_pattern
AllowDenyPattern
Regex patterns for catalogs to filter in ingestion. Specify regex to match the full metastore.catalog name.
Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True}
catalog_pattern.allow
array(string)
catalog_pattern.deny
array(string)
catalog_pattern.ignoreCase
boolean
Whether to ignore case sensitivity during pattern matching.
Default: True
domain
map(str,AllowDenyPattern)
A class to store allow deny regexes
domain.key.allow
array(string)
domain.key.deny
array(string)
domain.key.ignoreCase
boolean
Whether to ignore case sensitivity during pattern matching.
Default: True
notebook_pattern
AllowDenyPattern
Regex patterns for notebooks to filter in ingestion, based on notebook path. Specify regex to match the entire notebook path in /<dir>/.../<name> format. e.g. to match all notebooks in the root Shared directory, use the regex /Shared/.*.
Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True}
notebook_pattern.allow
array(string)
notebook_pattern.deny
array(string)
notebook_pattern.ignoreCase
boolean
Whether to ignore case sensitivity during pattern matching.
Default: True
profile_pattern
AllowDenyPattern
Regex patterns to filter tables (or specific columns) for profiling during ingestion. Note that only tables allowed by the table_pattern will be considered.
Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True}
profile_pattern.allow
array(string)
profile_pattern.deny
array(string)
profile_pattern.ignoreCase
boolean
Whether to ignore case sensitivity during pattern matching.
Default: True
schema_pattern
AllowDenyPattern
Regex patterns for schemas to filter in ingestion. Specify regex to the full metastore.catalog.schema name. e.g. to match all tables in schema analytics, use the regex ^mymetastore\.mycatalog\.analytics$.
Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True}
schema_pattern.allow
array(string)
schema_pattern.deny
array(string)
schema_pattern.ignoreCase
boolean
Whether to ignore case sensitivity during pattern matching.
Default: True
table_pattern
AllowDenyPattern
Regex patterns for tables to filter in ingestion. Specify regex to match the entire table name in catalog.schema.table format. e.g. to match all tables starting with customer in Customer catalog and public schema, use the regex Customer\.public\.customer.*.
Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True}
table_pattern.allow
array(string)
table_pattern.deny
array(string)
table_pattern.ignoreCase
boolean
Whether to ignore case sensitivity during pattern matching.
Default: True
user_email_pattern
AllowDenyPattern
regex patterns for user emails to filter in usage.
Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True}
user_email_pattern.allow
array(string)
user_email_pattern.deny
array(string)
user_email_pattern.ignoreCase
boolean
Whether to ignore case sensitivity during pattern matching.
Default: True
view_pattern
AllowDenyPattern
Regex patterns for views to filter in ingestion. Note: Defaults to table_pattern if not specified. Specify regex to match the entire view name in database.schema.view format. e.g. to match all views starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*'
Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True}
view_pattern.allow
array(string)
view_pattern.deny
array(string)
view_pattern.ignoreCase
boolean
Whether to ignore case sensitivity during pattern matching.
Default: True
profiling
One of UnityCatalogGEProfilerConfig, UnityCatalogAnalyzeProfilerConfig
Data profiling configuration
Default: {'enabled': False, 'operation_config': {'lower_fre...
profiling.call_analyze
boolean
Whether to call ANALYZE TABLE as part of profile ingestion.If false, will ingest the results of the most recent ANALYZE TABLE call, if any.
Default: True
profiling.catch_exceptions
boolean
Default: True
profiling.enabled
boolean
Whether profiling should be done.
Default: False
profiling.field_sample_values_limit
integer
Upper limit for number of sample values to collect for all columns.
Default: 20
profiling.include_field_distinct_count
boolean
Whether to profile for the number of distinct values for each column.
Default: True
profiling.include_field_distinct_value_frequencies
boolean
Whether to profile for distinct value frequencies.
Default: False
profiling.include_field_histogram
boolean
Whether to profile for the histogram for numeric fields.
Default: False
profiling.include_field_max_value
boolean
Whether to profile for the max value of numeric columns.
Default: True
profiling.include_field_mean_value
boolean
Whether to profile for the mean value of numeric columns.
Default: True
profiling.include_field_median_value
boolean
Whether to profile for the median value of numeric columns.
Default: True
profiling.include_field_min_value
boolean
Whether to profile for the min value of numeric columns.
Default: True
profiling.include_field_null_count
boolean
Whether to profile for the number of nulls for each column.
Default: True
profiling.include_field_quantiles
boolean
Whether to profile for the quantiles of numeric columns.
Default: False
profiling.include_field_sample_values
boolean
Whether to profile for the sample values for all columns.
Default: True
profiling.include_field_stddev_value
boolean
Whether to profile for the standard deviation of numeric columns.
Default: True
profiling.limit
integer
Max number of documents to profile. By default, profiles all documents.
profiling.max_number_of_fields_to_profile
integer
A positive integer that specifies the maximum number of columns to profile for any table. None implies all columns. The cost of profiling goes up significantly as the number of columns to profile goes up.
profiling.max_wait_secs
integer
Maximum time to wait for a table to be profiled.
profiling.max_workers
integer
Number of worker threads to use for profiling. Set to 1 to disable.
Default: 20
profiling.method
Enum
One of: "ge"
Default: ge
profiling.offset
integer
Offset in documents to profile. By default, uses no offset.
profiling.partition_datetime
string(date-time)
If specified, profile only the partition which matches this datetime. If not specified, profile the latest partition. Only Bigquery supports this.
profiling.partition_profiling_enabled
boolean
Whether to profile partitioned tables. Only BigQuery supports this. If enabled, latest partition data is used for profiling.
Default: True
profiling.profile_external_tables
boolean
Whether to profile external tables. Only Snowflake and Redshift supports this.
Default: False
profiling.profile_if_updated_since_days
number
Profile table only if it has been updated since these many number of days. If set to null, no constraint of last modified time for tables to profile. Supported only in snowflake and BigQuery.
profiling.profile_table_level_only
boolean
Whether to perform profiling at table-level only, or include column-level profiling as well.
Default: False
profiling.profile_table_row_count_estimate_only
boolean
Use an approximate query for row count. This will be much faster but slightly less accurate. Only supported for Postgres and MySQL.
Default: False
profiling.profile_table_row_limit
integer
Profile tables only if their row count is less then specified count. If set to null, no limit on the row count of tables to profile. Supported only in snowflake and BigQuery
Default: 5000000
profiling.profile_table_size_limit
integer
Profile tables only if their size is less then specified GBs. If set to null, no limit on the size of tables to profile. Supported only in snowflake and BigQuery
Default: 5
profiling.query_combiner_enabled
boolean
This feature is still experimental and can be disabled if it causes issues. Reduces the total number of queries issued and speeds up profiling by dynamically combining SQL queries where possible.
Default: True
profiling.report_dropped_profiles
boolean
Whether to report datasets or dataset columns which were not profiled. Set to True for debugging purposes.
Default: False
profiling.sample_size
integer
Number of rows to be sampled from table for column level profiling.Applicable only if use_sampling is set to True.
Default: 10000
profiling.turn_off_expensive_profiling_metrics
boolean
Whether to turn off expensive profiling or not. This turns off profiling for quantiles, distinct_value_frequencies, histogram & sample_values. This also limits maximum number of fields being profiled to 10.
Default: False
profiling.use_sampling
boolean
Whether to profile column level stats on sample of table. Only BigQuery and Snowflake support this. If enabled, profiling is done on rows sampled from table. Sampling is not done for smaller tables.
Default: True
profiling.warehouse_id
string
SQL Warehouse id, for running profiling queries.
profiling.operation_config
One of OperationConfig, union(allOf), OperationConfig
Experimental feature. To specify operation configs.
profiling.operation_config.lower_freq_profile_enabled
boolean
Whether to do profiling at lower freq or not. This does not do any scheduling just adds additional checks to when not to run profiling.
Default: False
profiling.operation_config.profile_date_of_month
integer
Number between 1 to 31 for date of month (both inclusive). If not specified, defaults to Nothing and this field does not take affect.
profiling.operation_config.profile_day_of_week
integer
Number between 0 to 6 for day of week (both inclusive). 0 is Monday and 6 is Sunday. If not specified, defaults to Nothing and this field does not take affect.
profiling.pattern
One of AllowDenyPattern, union(allOf), AllowDenyPattern
Regex patterns to filter tables for profiling during ingestion. Specify regex to match the catalog.schema.table format. Note that only tables allowed by the table_pattern will be considered.
Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True}
profiling.pattern.allow
array(string)
profiling.pattern.deny
array(string)
profiling.pattern.ignoreCase
boolean
Whether to ignore case sensitivity during pattern matching.
Default: True
stateful_ingestion
StatefulStaleMetadataRemovalConfig
Unity Catalog Stateful Ingestion Config.
stateful_ingestion.enabled
boolean
The type of the ingestion state provider registered with datahub.
Default: False
stateful_ingestion.remove_stale_metadata
boolean
Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled.
Default: True

Advanced

Multiple Databricks Workspaces

If you have multiple databricks workspaces that point to the same Unity Catalog metastore, our suggestion is to use separate recipes for ingesting the workspace-specific Hive Metastore catalog and Unity Catalog metastore's information schema.

To ingest Hive metastore information schema

  • Setup one ingestion recipe per workspace
  • Use platform instance equivalent to workspace name
  • Ingest only hive_metastore catalog in the recipe using config catalogs: ["hive_metastore"]

To ingest Unity Catalog information schema

  • Disable hive metastore catalog ingestion in the recipe using config include_hive_metastore: False
  • Ideally, just ingest from one workspace
  • To ingest from both workspaces (e.g. if each workspace has different permissions and therefore restricted view of the UC metastore):
    • Use same platform instance for all workspaces using same UC metastore
    • Ingest usage from only one workspace (you lose usage from other workspace)
    • Use filters to only ingest each catalog once, but shouldn’t be necessary

Troubleshooting

No data lineage captured or missing lineage

Check that you meet the Unity Catalog lineage requirements.

Also check the Unity Catalog limitations to make sure that lineage would be expected to exist in this case.

Lineage extraction is too slow

Currently, there is no way to get table or column lineage in bulk from the Databricks Unity Catalog REST api. Table lineage calls require one API call per table, and column lineage calls require one API call per column. If you find metadata extraction taking too long, you can turn off column level lineage extraction via the include_column_lineage config flag.

Code Coordinates

  • Class Name: datahub.ingestion.source.unity.source.UnityCatalogSource
  • Browse on GitHub

Questions

If you've got any questions on configuring ingestion for Databricks, feel free to ping us on our Slack.