Publish Lineage Information from Custom Applications to OCI Data Catalog
In this tutorial, you set up a data processing application to push and publish data lineage to Data Catalog. Key tasks include how to:
- Set up Data Catalog to accept data lineage.
- Add the Openlineage-spark plugin to the Spark application to generate data lineage.
- Publish data lineage to Data Catalog.
Data lineage indicates the journey that data takes as it flows from data sources to consumption. Through metadata, data consumers can understand and visualize the transformations that the data went through in the data pipelines.
Data lineage can be generated by any data processing application or service. Most standard data processing applications support generating and publishing data lineage.
Data Catalog provides options to capture data lineage from different services, including:
- Data Integration
- Data Flow
- Your own custom data processing applications
Standard Practice for Capturing Data Lineage
To learn about the standard for collecting and analyzing data lineage, see OpenLineage.
OpenLineage Specification
According to the OpenLineage specification, the data lineage generated by any data processing application should be represented as follows:
OpenLineage Format
{
"eventTime": "2019-08-24T14:15:22Z",
"producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"schemaURL": "https://openlineage.io/spec/0-0-1/OpenLineage.json",
"eventType": "START|RUNNING|COMPLETE|ABORT|FAIL|OTHER",
"run": {
"runId": "78c33d18-170c-44d3-a227-b3194f134f73",
"facets": {
"property1": {
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet"
},
"property2": {
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet"
}
}
},
"job": {
"namespace": "my-scheduler-namespace",
"name": "myjob.mytask",
"facets": {
"property1": {
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet",
"_deleted": true
},
"property2": {
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet",
"_deleted": true
}
}
},
"inputs": [
{
"namespace": "my-datasource-namespace",
"name": "instance.schema.table",
"facets": {
"property1": {
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet",
"_deleted": true
},
"property2": {
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet",
"_deleted": true
}
},
"inputFacets": {
"property1": {
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet"
},
"property2": {
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet"
}
}
}
],
"outputs": [
{
"namespace": "my-datasource-namespace",
"name": "instance.schema.table",
"facets": {
"property1": {
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet",
"_deleted": true
},
"property2": {
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet",
"_deleted": true
}
},
"outputFacets": {
"property1": {
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet"
},
"property2": {
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet"
}
}
}
]
}
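As a rough illustration of the structure above, the following Python sketch assembles a minimal event using only the standard library. The helper name `build_run_event`, the producer URL, and the dataset names are hypothetical placeholders. Facets are omitted, and the schema URL is copied from the example above.

```python
import json
import uuid
from datetime import datetime, timezone


def build_run_event(job_namespace, job_name, inputs, outputs, event_type="COMPLETE"):
    """Return a minimal OpenLineage-style run event as a dict (facets omitted)."""

    def dataset(namespace, name):
        return {"namespace": namespace, "name": name}

    return {
        "eventTime": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        # Hypothetical producer URI identifying the application that emits the event.
        "producer": "https://example.com/my-custom-app",
        "schemaURL": "https://openlineage.io/spec/0-0-1/OpenLineage.json",
        "eventType": event_type,
        "run": {"runId": str(uuid.uuid4())},
        "job": {"namespace": job_namespace, "name": job_name},
        "inputs": [dataset(ns, name) for ns, name in inputs],
        "outputs": [dataset(ns, name) for ns, name in outputs],
    }


event = build_run_event(
    "my-scheduler-namespace",
    "myjob.mytask",
    inputs=[("my-datasource-namespace", "instance.schema.source_table")],
    outputs=[("my-datasource-namespace", "instance.schema.target_table")],
)
# Serialized form that a custom application could hand to its lineage publisher.
payload = json.dumps(event)
```

A real application would typically add facets (for example schema or column-level details) to the run, job, and dataset entries as the specification allows.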
Before You Begin
To successfully perform this tutorial, you must have:
- An Oracle Cloud Infrastructure account. For more information, see Request and Manage Free Oracle Cloud Promotions.
- Access to use the Data Catalog resources. For more information, see Getting Started with Data Catalog and Creating Data Catalog Policies.
- A created data catalog instance. For more information, see Creating a Data Catalog Instance. You don't need to be a catalog admin, but the following IAM policies are required:
allow group lineage-group to CATALOG_LINEAGE_IMPORT in tenancy where all {target.catalog.id = <catalog-ocid>, target.data-asset.key=<data-asset-key>}
allow group <the-group-your-username-belongs> to manage all-resources in compartment catalog-compartment
See Common Policies for more examples.
In the next section, you make a compartment for your data catalog instances, called catalog-compartment.
In this section, you set up the data processing applications to push data lineage to Data Catalog.
Add Openlineage-spark Plugin to the Spark Application to Generate Data Lineage
OpenLineage provides an Apache Spark plugin that binds to the Spark context and generates data lineage from it. This plugin can be extended to publish the data lineage to Data Catalog. The following snippets provide the plugin extension code and the spark-submit options to invoke the plugin.
Plugin POM
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>OpenLineageExtension</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<openlineage.version>1.8.0</openlineage.version>
<oci.sdk.version>3.41.2</oci.sdk.version>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/io.openlineage/openlineage-spark -->
<dependency>
<groupId>io.openlineage</groupId>
<artifactId>openlineage-spark</artifactId>
<version>${openlineage.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.13</artifactId>
<version>3.5.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.13.12</version>
</dependency>
<dependency>
<groupId>com.oracle.oci.sdk</groupId>
<artifactId>oci-java-sdk-datacatalog</artifactId>
<version>${oci.sdk.version}</version>
</dependency>
<dependency>
<groupId>com.oracle.oci.sdk</groupId>
<artifactId>oci-java-sdk-common</artifactId>
<version>${oci.sdk.version}</version>
</dependency>
<dependency>
<groupId>com.oracle.oci.sdk</groupId>
<artifactId>oci-java-sdk-common-httpclient-jersey</artifactId>
<version>${oci.sdk.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
<source>11</source>
<target>11</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
Openlineage Plugin Extension - OciConfig
package io.openlineage.client.transports;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import lombok.With;
@NoArgsConstructor
@AllArgsConstructor
@ToString
@With
public final class OciConfig implements TransportConfig {
@Getter
@Setter
private String catalogId;
@Getter
@Setter
private String dataAssetKey;
@Getter
@Setter
private String authType;
@Getter
@Setter
private String authProfile;
@Getter
@Setter
private String endpoint;
}
Openlineage Plugin Extension - OciTransport
package io.openlineage.client.transports;
import com.oracle.bmc.auth.AuthenticationDetailsProvider;
import com.oracle.bmc.auth.ConfigFileAuthenticationDetailsProvider;
import com.oracle.bmc.auth.SessionTokenAuthenticationDetailsProvider;
import com.oracle.bmc.datacatalog.DataCatalogClient;
import com.oracle.bmc.datacatalog.model.ImportLineageDetails;
import com.oracle.bmc.datacatalog.requests.ImportLineageRequest;
import com.oracle.bmc.datacatalog.responses.ImportLineageResponse;
import io.openlineage.client.OpenLineage;
import io.openlineage.client.OpenLineageClientUtils;
import lombok.SneakyThrows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class OciTransport extends Transport {
private static final Logger log = LoggerFactory.getLogger(OciTransport.class);
private final OciConfig config;
private static final String SESSION_TOKEN_AUTH = "security_token";
private final boolean isValidConfig;
public OciTransport(final OciConfig ociConfig) {
config = ociConfig;
isValidConfig = isValidConfig();
}
/***
* Selectively emit only complete events to OCI Data Catalog
* @param runEvent
*/
@Override
public void emit(OpenLineage.RunEvent runEvent) {
if (runEvent.getEventType().equals(OpenLineage.RunEvent.EventType.COMPLETE)) {
log.info("Found event type - Complete. Emitting event using OciTransport");
final String eventAsJson = OpenLineageClientUtils.toJson(runEvent);
emit(eventAsJson);
} else {
log.info("Found event type - {}. Skipping emission",runEvent.getEventType());
}
}
@SneakyThrows
@Override
public void emit(String eventAsJson) {
if (!isValidConfig) {
log.info("OCI config is not valid. Please update OCI config. Skipping lineage emission.");
return;
}
AuthenticationDetailsProvider provider;
if (config.getAuthType()!=null && config.getAuthType().equals(SESSION_TOKEN_AUTH)) {
provider = new SessionTokenAuthenticationDetailsProvider(config.getAuthProfile());
} else {
provider = new ConfigFileAuthenticationDetailsProvider(config.getAuthProfile());
}
try (DataCatalogClient client = DataCatalogClient.builder().build(provider)) {
if (config.getEndpoint() != null && !config.getEndpoint().isBlank()) {
client.setEndpoint(config.getEndpoint());
}
ImportLineageRequest importLineageRequest = ImportLineageRequest
.builder()
.importLineageDetails(ImportLineageDetails
.builder()
.lineagePayload(eventAsJson.getBytes())
.build())
.dataAssetKey(config.getDataAssetKey())
.catalogId(config.getCatalogId())
.build();
ImportLineageResponse response = client.importLineage(importLineageRequest);
log.info("Pushed lineage event to catalog - {}, opc-request-id = {}", config.getCatalogId(), response.getOpcRequestId());
} catch (Exception ex) {
log.warn("Failed to push lineage {}", ex.getMessage(), ex);
}
}
private boolean isValidConfig() {
log.info("OCI Config - {}", config.toString());
if (config.getAuthProfile() == null || config.getAuthProfile().isBlank()) {
log.info("OCI config is missing Authentication profile. Lineage will not be emitted to OCI Data Catalog. " +
"Check documentation on how to create Authentication profile.");
return false;
}
if (config.getCatalogId() == null || config.getCatalogId().isBlank()) {
log.info("OCI config is missing CatalogId. Lineage will not be emitted to OCI Data Catalog. " +
"Check documentation on how to get CatalogId.");
return false;
}
if (config.getDataAssetKey() == null || config.getDataAssetKey().isBlank()) {
log.info("OCI config is missing DataAssetResourceId. Lineage will not be emitted to OCI Data Catalog. " +
"Check documentation on how to create DataAsset and get its DataAssetResourceId");
return false;
}
return true;
}
}
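The gating logic in OciTransport (emit only COMPLETE events, and only when the authentication profile, catalog ID, and data asset key are all present) can be sketched in a few lines of Python. This is an illustrative restatement under assumed names, not part of the plugin: `missing_settings`, `should_emit`, and the dictionary keys are hypothetical.

```python
# Hypothetical setting names mirroring authProfile, catalogId, and dataAssetKey.
REQUIRED_SETTINGS = ("auth_profile", "catalog_id", "data_asset_key")


def missing_settings(config):
    """Return the required settings that are absent or blank in a config dict."""
    return [key for key in REQUIRED_SETTINGS if not str(config.get(key) or "").strip()]


def should_emit(event, config):
    """Emit only COMPLETE events, and only when the configuration is usable."""
    return event.get("eventType") == "COMPLETE" and not missing_settings(config)


config = {"auth_profile": "DEFAULT", "catalog_id": "<catalog-ocid>", "data_asset_key": ""}
print(missing_settings(config))  # prints ['data_asset_key']
```

Checking the configuration once and skipping emission when it is incomplete keeps a misconfigured lineage transport from failing the Spark job itself.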
Openlineage Plugin Extension - OciTransportBuilder
package io.openlineage.client.transports;
public class OciTransportBuilder implements TransportBuilder {
@Override
public String getType() {
return "oci";
}
@Override
public TransportConfig getConfig() {
return new OciConfig();
}
@Override
public Transport build(TransportConfig config) {
return new OciTransport((OciConfig) config);
}
}
Spark Submit Options
--packages "io.openlineage:openlineage-spark:1.8.0,com.oracle.oci.sdk:oci-java-sdk-common:{oci-sdk-version},com.oracle.oci.sdk:oci-java-sdk-common-httpclient-jersey:{oci-sdk-version},com.oracle.oci.sdk:oci-java-sdk-datacatalog:{oci-sdk-version},io.openlineage:openlineage-oci-extension:{generated-from-above-mentioned-code-snippet}"
--conf "spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener"
--conf "spark.openlineage.debugFacet=enabled"
--conf "spark.openlineage.transport.type=oci"
--conf "spark.openlineage.transport.catalogId={catalog instance OCID}"
--conf "spark.openlineage.transport.dataAssetKey={UUID of the dataAsset registered in Data catalog}"
--conf "spark.openlineage.transport.authProfile={user authentication profile}"
--conf "spark.openlineage.transport.authType={optionally specify security_token as auth type}"
--conf "spark.openlineage.application.name={application name to display on catalog UI}"
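When jobs are launched from a script, the `--conf` options above can be assembled programmatically. The following Python sketch shows one possible way to do that. The function name `lineage_conf_args` and the placeholder values are hypothetical, and the `--packages` and `debugFacet` options are left out for brevity.

```python
import shlex


def lineage_conf_args(catalog_id, data_asset_key, auth_profile, app_name, auth_type=None):
    """Build the --conf arguments that enable the OCI lineage transport."""
    settings = {
        "spark.extraListeners": "io.openlineage.spark.agent.OpenLineageSparkListener",
        "spark.openlineage.transport.type": "oci",
        "spark.openlineage.transport.catalogId": catalog_id,
        "spark.openlineage.transport.dataAssetKey": data_asset_key,
        "spark.openlineage.transport.authProfile": auth_profile,
        "spark.openlineage.application.name": app_name,
    }
    if auth_type:  # optional, for example "security_token"
        settings["spark.openlineage.transport.authType"] = auth_type
    args = []
    for key, value in settings.items():
        args += ["--conf", f"{key}={value}"]
    return args


# Placeholder values; substitute the catalog OCID, data asset key, and profile you use.
args = lineage_conf_args("<catalog-ocid>", "<data-asset-key>", "DEFAULT", "my-spark-app")
print("spark-submit " + " ".join(shlex.quote(a) for a in args) + " <application-jar>")
```

Returning a list of arguments rather than a single string makes it easier to pass the options to a process launcher without extra shell quoting.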
Publish Data Lineage to Data Catalog
To publish data lineage to Data Catalog, you must create an IAM user and set up authentication for this user on the system where the data processing applications run. The OCI documentation provides details on the available authentication methods for connecting to OCI services. Specify the following IAM policy for the user:
allow group lineage-group to CATALOG_LINEAGE_IMPORT in tenancy where all {target.catalog.id = <catalog-ocid>, target.data-asset.key=<data-asset-key>}
For any other data processing application in your ecosystem, follow the same steps.
Only the data lineage generation step varies depending on the data processing application. You can check the OpenLineage integrations page to see whether an OpenLineage plugin already exists for your application. Otherwise, you can write your own implementation to produce the data lineage payload in OpenLineage format. The following is sample code to call the importLineage endpoint of the OCI Data Catalog service.
ImportLineage - Java API
try (DataCatalogClient client = DataCatalogClient.builder().build(provider)) {
ImportLineageRequest importLineageRequest = ImportLineageRequest
.builder()
.importLineageDetails(ImportLineageDetails
.builder()
.lineagePayload(payload.getBytes())
.build())
.dataAssetKey(dataAssetKey)
.catalogId(catalogOcid)
.build();
ImportLineageResponse response = client.importLineage(importLineageRequest);
} catch (Exception ex) {
log.warn("Failed to push lineage {}", ex.getMessage(), ex);
}
ImportLineage - Python API
import oci
# Create a default config using DEFAULT profile in default location
# Refer to
# https://docs.cloud.oracle.com/en-us/iaas/Content/API/Concepts/sdkconfig.htm#SDK_and_CLI_Configuration_File
# for more info
config = oci.config.from_file()
# Initialize service client with default config file
data_catalog_client = oci.data_catalog.DataCatalogClient(config)
# Send the request to service, some parameters are not required, see API
# doc for more info
import_lineage_response = data_catalog_client.import_lineage(
catalog_id="ocid1.test.oc1..<unique_ID>EXAMPLE-catalogId-Value",
data_asset_key="EXAMPLE-dataAssetKey-Value",
import_lineage_details=oci.data_catalog.models.ImportLineageDetails(
lineage_payload="openlineage-payload"),
opc_retry_token="EXAMPLE-opcRetryToken-Value",
opc_request_id="AOR2TTPR06HMXDHS3NZU<unique_ID>")
# Get the data from response
print(import_lineage_response.data)