Publish Lineage Information from Custom Applications to OCI Data Catalog

In this tutorial, you set up a data processing application to publish data lineage to Data Catalog. Key tasks include how to:

  • Set up Data Catalog to accept data lineage.
  • Add the openlineage-spark plugin to a Spark application to generate data lineage.
  • Publish data lineage to Data Catalog.

Data lineage indicates the journey that data takes as it flows from data sources to consumption. Through metadata, data consumers can understand and visualize the transformations that the data went through in the data pipelines.

Any data processing application or service can generate data lineage, and most standard data processing applications support generating and publishing it.

Data Catalog provides options to capture data lineage from different services, including

Standard Practice for Capturing Data Lineage

OpenLineage is an open standard for the collection and analysis of data lineage. For more information, see OpenLineage.

OpenLineage Specification

According to the OpenLineage specification, lineage generated by a data processing application is represented as a run event with the following structure:

OpenLineage Format

{

    "eventTime": "2019-08-24T14:15:22Z",
    "producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
    "schemaURL": "https://openlineage.io/spec/0-0-1/OpenLineage.json",
    "eventType": "START|RUNNING|COMPLETE|ABORT|FAIL|OTHER",
    "run": {
        "runId": "78c33d18-170c-44d3-a227-b3194f134f73",
        "facets": {
            "property1": {
                "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
                "_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet"
            },
            "property2": {
                "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
                "_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet"
            }
        }
    },
    "job": {
        "namespace": "my-scheduler-namespace",
        "name": "myjob.mytask",
        "facets": {
            "property1": {
                "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
                "_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet",
                "_deleted": true
            },
            "property2": {
                "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
                "_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet",
                "_deleted": true
            }
        }
    },
    "inputs": [
        {
            "namespace": "my-datasource-namespace",
            "name": "instance.schema.table",
            "facets": {
                "property1": {
                    "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
                    "_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet",
                    "_deleted": true
                },
                "property2": {
                    "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
                    "_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet",
                    "_deleted": true
                }
            },
            "inputFacets": {
                "property1": {
                    "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
                    "_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet"
                },
                "property2": {
                    "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
                    "_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet"
                }
            }
        }
    ],
    "outputs": [
        {
            "namespace": "my-datasource-namespace",
            "name": "instance.schema.table",
            "facets": {
                "property1": {
                    "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
                    "_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet",
                    "_deleted": true
                },
                "property2": {
                    "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
                    "_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet",
                    "_deleted": true
                }
            },
            "outputFacets": {
                "property1": {
                    "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
                    "_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet"
                },
                "property2": {
                    "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
                    "_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet"
                }
            }
        }
    ]
}
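The example above fills every facet slot with placeholder facets. A producer that writes its own lineage often needs only the top-level fields. The following standard-library Python sketch builds a minimal RunEvent with that shape. The producer URI and the function name are hypothetical, the schema URL is copied from the example above, and all facets are left empty.

```python
import json
import uuid
from datetime import datetime, timezone

# Hypothetical producer URI: replace it with one that identifies your application.
PRODUCER = "https://example.com/my-lineage-producer"
# Copied from the example above; use the spec version your producer targets.
SCHEMA_URL = "https://openlineage.io/spec/0-0-1/OpenLineage.json"


def make_run_event(event_type, job_namespace, job_name, inputs, outputs, run_id=None):
    """Build a minimal OpenLineage RunEvent dict; inputs/outputs are (namespace, name) pairs."""
    def datasets(pairs):
        return [{"namespace": ns, "name": name, "facets": {}} for ns, name in pairs]

    return {
        "eventTime": datetime.now(timezone.utc).isoformat(),  # ISO 8601 with UTC offset
        "producer": PRODUCER,
        "schemaURL": SCHEMA_URL,
        "eventType": event_type,
        "run": {"runId": run_id or str(uuid.uuid4()), "facets": {}},
        "job": {"namespace": job_namespace, "name": job_name, "facets": {}},
        "inputs": datasets(inputs),
        "outputs": datasets(outputs),
    }


event = make_run_event(
    "COMPLETE",
    "my-scheduler-namespace",
    "myjob.mytask",
    inputs=[("my-datasource-namespace", "instance.schema.table")],
    outputs=[("my-datasource-namespace", "instance.schema.summary_table")],
)
payload = json.dumps(event)  # the JSON string you would send as the lineage payload
```

Reuse the same run_id for the START and COMPLETE events of one run so that consumers can correlate them.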

Before You Begin

To successfully perform this tutorial, you must have:

If you have administrative rights to your account, skip the rest of this section. Otherwise, ask your administrator to add the following policy to your account:
allow group <the-group-your-username-belongs-to> to manage all-resources in compartment catalog-compartment

See Common Policies for more examples.

Note

In the next section, you create a compartment for your data catalog instances, called catalog-compartment.

Setting Up Data Catalog to Accept Data Lineage

In this section, you create a data asset of type Custom Lineage Provider in Data Catalog. Your data processing applications push their lineage to this data asset.

The following example sets up a data asset for a Spark application.
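  1. Open the navigation menu and select Analytics & AI. Under Data Lake, select Data Catalog.
  2. Select your data catalog instance, start creating a data asset, and fill in the following information:
    • Name: Lineage - Sales application
  3. For Type, select Custom Lineage Provider.
  4. Click Create.
  5. Note the key of the new data asset and the OCID of the catalog instance. You pass them as dataAssetKey and catalogId when you configure the Spark application.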

Add the openlineage-spark Plugin to the Spark Application to Generate Data Lineage

OpenLineage provides an Apache Spark plugin, openlineage-spark, that registers a listener on the SparkContext and generates data lineage from the Spark jobs it observes. You can extend this plugin with a custom transport that publishes the lineage to Data Catalog. The following snippets provide the extension code (a Maven POM and the OciConfig, OciTransport, and OciTransportBuilder classes) and the spark-submit options that invoke the plugin.

Plugin POM

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>OpenLineageExtension</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <openlineage.version>1.8.0</openlineage.version>
        <oci.sdk.version>3.41.2</oci.sdk.version>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/io.openlineage/openlineage-spark -->
        <dependency>
            <groupId>io.openlineage</groupId>
            <artifactId>openlineage-spark</artifactId>
            <version>${openlineage.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
        <!-- Lombok is needed only at compile time -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.30</version>
            <scope>provided</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.13</artifactId>
            <version>3.5.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.13.12</version>
        </dependency>

        <dependency>
            <groupId>com.oracle.oci.sdk</groupId>
            <artifactId>oci-java-sdk-datacatalog</artifactId>
            <version>${oci.sdk.version}</version>
        </dependency>

        <dependency>
            <groupId>com.oracle.oci.sdk</groupId>
            <artifactId>oci-java-sdk-common</artifactId>
            <version>${oci.sdk.version}</version>
        </dependency>

        <dependency>
            <groupId>com.oracle.oci.sdk</groupId>
            <artifactId>oci-java-sdk-common-httpclient-jersey</artifactId>
            <version>${oci.sdk.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.11.0</version>
                <configuration>
                    <release>11</release>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

OpenLineage Plugin Extension - OciConfig

package io.openlineage.client.transports;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import lombok.With;

@NoArgsConstructor
@AllArgsConstructor
@ToString
@With
public final class OciConfig implements TransportConfig {

    @Getter
    @Setter
    private String catalogId;
    @Getter
    @Setter
    private String dataAssetKey;
    @Getter
    @Setter
    private String authType;
    @Getter
    @Setter
    private String authProfile;
    @Getter
    @Setter
    private String endpoint;
}

OpenLineage Plugin Extension - OciTransport

package io.openlineage.client.transports;

import com.oracle.bmc.auth.AuthenticationDetailsProvider;
import com.oracle.bmc.auth.ConfigFileAuthenticationDetailsProvider;
import com.oracle.bmc.auth.SessionTokenAuthenticationDetailsProvider;
import com.oracle.bmc.datacatalog.DataCatalogClient;
import com.oracle.bmc.datacatalog.model.ImportLineageDetails;
import com.oracle.bmc.datacatalog.requests.ImportLineageRequest;
import com.oracle.bmc.datacatalog.responses.ImportLineageResponse;
import io.openlineage.client.OpenLineage;
import io.openlineage.client.OpenLineageClientUtils;
import lombok.SneakyThrows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OciTransport extends Transport {

    private static final Logger log = LoggerFactory.getLogger(OciTransport.class);
    private final OciConfig config;
    private static final String SESSION_TOKEN_AUTH = "security_token";
    private final boolean isValidConfig;

    public OciTransport(final OciConfig ociConfig) {
        config = ociConfig;
        isValidConfig = isValidConfig();
    }

    /***
     * Selectively emit only complete events to OCI Data Catalog
     * @param runEvent
     */
    @Override
    public void emit(OpenLineage.RunEvent runEvent) {
        if (runEvent.getEventType().equals(OpenLineage.RunEvent.EventType.COMPLETE)) {
            log.info("Found event type - Complete. Emitting event using OciTransport");
            final String eventAsJson = OpenLineageClientUtils.toJson(runEvent);
            emit(eventAsJson);
        } else {
            log.info("Found event type - {}. Skipping emission",runEvent.getEventType());
        }
    }

    @SneakyThrows
    @Override
    public void emit(String eventAsJson) {
        if (!isValidConfig) {
            log.info("OCI config is not valid. Please update OCI config. Skipping lineage emission.");
            return;
        }
        AuthenticationDetailsProvider provider;
        if (config.getAuthType()!=null && config.getAuthType().equals(SESSION_TOKEN_AUTH)) {
            provider = new SessionTokenAuthenticationDetailsProvider(config.getAuthProfile());
        } else {
            provider = new ConfigFileAuthenticationDetailsProvider(config.getAuthProfile());
        }

        try (DataCatalogClient client = DataCatalogClient.builder().build(provider)) {
            if (config.getEndpoint() != null && !config.getEndpoint().isBlank()) {
                client.setEndpoint(config.getEndpoint());
            }
            ImportLineageRequest importLineageRequest = ImportLineageRequest
                .builder()
                .importLineageDetails(ImportLineageDetails
                    .builder()
                    .lineagePayload(eventAsJson.getBytes(java.nio.charset.StandardCharsets.UTF_8))
                    .build())
                .dataAssetKey(config.getDataAssetKey())
                .catalogId(config.getCatalogId())
                .build();
            ImportLineageResponse response = client.importLineage(importLineageRequest);
            log.info("Pushed lineage event to catalog - {}, opc-request-id = {}", config.getCatalogId(), response.getOpcRequestId());
        } catch (Exception ex) {
            log.warn("Failed to push lineage {}", ex.getMessage(), ex);
        }
    }

    private boolean isValidConfig() {
        log.info("OCI Config - {}", config.toString());
        if (config.getAuthProfile() == null || config.getAuthProfile().isBlank()) {
            log.info("OCI config is missing Authentication profile. Lineage will not be emitted to OCI Data Catalog. " +
                "Check documentation on how to create Authentication profile.");
            return false;
        }
        if (config.getCatalogId() == null || config.getCatalogId().isBlank()) {
            log.info("OCI config is missing CatalogId. Lineage will not be emitted to OCI Data Catalog. " +
                "Check documentation on how to get CatalogId.");
            return false;
        }
        if (config.getDataAssetKey() == null || config.getDataAssetKey().isBlank()) {
            log.info("OCI config is missing dataAssetKey. Lineage will not be emitted to OCI Data Catalog. " +
                "Check documentation on how to create a data asset and get its key.");
            return false;
        }
        return true;
    }
}
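OciTransport forwards only COMPLETE events and skips emission when authProfile, catalogId, or dataAssetKey is missing or blank. The following Python sketch mirrors that decision logic so you can reason about it, or unit-test similar logic, without a Spark cluster. The send callable is a hypothetical stand-in for the importLineage call, and the dataclass only mirrors the Java OciConfig fields; it is not part of any SDK.

```python
import json
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class OciConfig:
    """Mirror of the Java OciConfig fields (illustration only)."""
    catalogId: Optional[str] = None
    dataAssetKey: Optional[str] = None
    authType: Optional[str] = None
    authProfile: Optional[str] = None
    endpoint: Optional[str] = None


def missing_fields(config: OciConfig) -> List[str]:
    """Required fields that are unset or blank, in the order OciTransport checks them."""
    required = ("authProfile", "catalogId", "dataAssetKey")
    return [name for name in required if not (getattr(config, name) or "").strip()]


def emit(event: dict, config: OciConfig, send: Callable[[str], None]) -> bool:
    """Send the event as JSON only if it is COMPLETE and the config is valid; True if sent."""
    if event.get("eventType") != "COMPLETE":
        return False  # START, RUNNING, FAIL, ... are skipped
    if missing_fields(config):
        return False  # OciTransport logs a message and skips emission
    send(json.dumps(event))
    return True


sent = []
cfg = OciConfig(catalogId="<catalog-ocid>", dataAssetKey="<data-asset-key>", authProfile="DEFAULT")
emit({"eventType": "START"}, cfg, sent.append)     # skipped: not COMPLETE
emit({"eventType": "COMPLETE"}, cfg, sent.append)  # forwarded to send()
```

In the Java OciTransport, exceptions from importLineage are caught and logged as warnings, so a failed push does not stop the Spark job. An exception while creating the authentication provider (for example, a missing OCI config file) is thrown from outside the try block through @SneakyThrows.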

OpenLineage Plugin Extension - OciTransportBuilder

package io.openlineage.client.transports;

public class OciTransportBuilder implements TransportBuilder {

    @Override
    public String getType() {
        return "oci";
    }

    @Override
    public TransportConfig getConfig() {
        return new OciConfig();
    }

    @Override
    public Transport build(TransportConfig config) {
        return new OciTransport((OciConfig) config);
    }
}
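OciTransportBuilder ties the "oci" transport type to OciTransport: OpenLineage selects the builder whose getType() matches spark.openlineage.transport.type, populates the config object returned by getConfig(), and passes it to build(). As far as we can tell, the OpenLineage Java client discovers builders with java.util.ServiceLoader, so the extension JAR most likely also needs a resource file named META-INF/services/io.openlineage.client.transports.TransportBuilder containing the line io.openlineage.client.transports.OciTransportBuilder; verify this against the OpenLineage version you build with. The Python sketch below only illustrates the type-to-builder lookup; the registry and function names are hypothetical.

```python
from typing import Callable, Dict

# Hypothetical registry: transport type -> factory, playing the role of TransportBuilder.
_BUILDERS: Dict[str, Callable[[dict], object]] = {}


def register(transport_type: str):
    """Decorator that records a factory under its type string, like getType()."""
    def wrap(factory):
        _BUILDERS[transport_type] = factory
        return factory
    return wrap


@register("oci")
def build_oci_transport(config: dict) -> dict:
    """Stand-in for `new OciTransport((OciConfig) config)`."""
    return {"kind": "oci", "config": config}


def resolve(transport_type: str, config: dict):
    """Build the transport registered for transport_type, or fail with a clear error."""
    factory = _BUILDERS.get(transport_type)
    if factory is None:
        raise ValueError(f"no transport builder registered for type {transport_type!r}")
    return factory(config)


transport = resolve("oci", {"catalogId": "<catalog-ocid>"})
```

If the builder is not registered, the lookup has nothing to match "oci" against, which in the Java client typically shows up as an error about an unknown transport type rather than as a missing-config message from OciTransport.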

Spark Submit Options

--packages "io.openlineage:openlineage-spark:1.8.0,com.oracle.oci.sdk:oci-java-sdk-common:{oci-sdk-version},com.oracle.oci.sdk:oci-java-sdk-common-httpclient-jersey:{oci-sdk-version},com.oracle.oci.sdk:oci-java-sdk-datacatalog:{oci-sdk-version},{groupId:artifactId:version of the extension built from the POM above}"
--conf "spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener"
--conf "spark.openlineage.debugFacet=enabled"
--conf "spark.openlineage.transport.type=oci"
--conf "spark.openlineage.transport.catalogId={catalog instance OCID}"
--conf "spark.openlineage.transport.dataAssetKey={UUID of the dataAsset registered in Data catalog}"
--conf "spark.openlineage.transport.authProfile={user authentication profile}"
--conf "spark.openlineage.transport.authType={optionally specify security_token as auth type}"
--conf "spark.openlineage.application.name={application name to display on catalog UI}"
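The spark.openlineage.transport.* keys appear to map one-to-one onto the OciConfig fields of the same name (catalogId, dataAssetKey, authProfile, authType, and the optional endpoint that OciTransport uses to override the client endpoint). If you script job submission, a helper like the following Python sketch keeps the keys consistent with those field names. The helper name, the DEFAULT profile, and app.py are illustrative assumptions, and the sketch omits --packages and the other options shown above.

```python
import shlex
from typing import Dict, List, Optional

PREFIX = "spark.openlineage.transport."


def openlineage_conf_args(settings: Dict[str, Optional[str]]) -> List[str]:
    """Build spark-submit --conf arguments for the OCI transport, skipping unset values."""
    args = [
        "--conf", "spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener",
        "--conf", PREFIX + "type=oci",
    ]
    for field, value in settings.items():
        if value:  # e.g. leave authType unset to use the config-file provider
            args += ["--conf", f"{PREFIX}{field}={value}"]
    return args


args = openlineage_conf_args({
    "catalogId": "<catalog-ocid>",
    "dataAssetKey": "<data-asset-key>",
    "authProfile": "DEFAULT",
    "authType": None,
})
# app.py is a hypothetical application file; shlex.join quotes values safely for a shell.
print(shlex.join(["spark-submit", *args, "app.py"]))
```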

Publish Data Lineage to Data Catalog

To publish data lineage to Data Catalog, create an IAM user and set up authentication for this user on the system where the data processing applications run. The OCI SDK authentication documentation describes the available methods for authenticating to OCI services. Add the user to a group (lineage-group in the following example) and specify this IAM policy for the group:

allow group lineage-group to {CATALOG_LINEAGE_IMPORT} in tenancy where all {target.catalog.id = '<catalog-ocid>', target.data-asset.key = '<data-asset-key>'}

Publish Data Lineage to Data Catalog Using Other Apps

For any other data processing application in your ecosystem, follow the same steps.

Only the lineage generation step depends on the data processing application. Check the OpenLineage integrations page to see whether an OpenLineage integration already exists for your application. If it doesn't, you can write your own implementation that produces the lineage payload in OpenLineage format. The following sample code calls the importLineage endpoint of the OCI Data Catalog service.

ImportLineage - Java API

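import com.oracle.bmc.auth.AuthenticationDetailsProvider;
import com.oracle.bmc.datacatalog.DataCatalogClient;
import com.oracle.bmc.datacatalog.model.ImportLineageDetails;
import com.oracle.bmc.datacatalog.requests.ImportLineageRequest;
import com.oracle.bmc.datacatalog.responses.ImportLineageResponse;
import java.nio.charset.StandardCharsets;

public class ImportLineageSample {
    // Pushes one OpenLineage event (JSON string) to the given Data Catalog data asset.
    static void importLineage(AuthenticationDetailsProvider provider, String catalogOcid,
                              String dataAssetKey, String payload) {
        try (DataCatalogClient client = DataCatalogClient.builder().build(provider)) {
            ImportLineageRequest importLineageRequest = ImportLineageRequest
                .builder()
                .importLineageDetails(ImportLineageDetails
                    .builder()
                    .lineagePayload(payload.getBytes(StandardCharsets.UTF_8))
                    .build())
                .dataAssetKey(dataAssetKey)
                .catalogId(catalogOcid)
                .build();
            ImportLineageResponse response = client.importLineage(importLineageRequest);
            System.out.println("Pushed lineage, opc-request-id = " + response.getOpcRequestId());
        } catch (Exception ex) {
            System.err.println("Failed to push lineage: " + ex.getMessage());
        }
    }
}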

ImportLineage - Python API

import oci

# Create a default config using DEFAULT profile in default location
# Refer to
# https://docs.cloud.oracle.com/en-us/iaas/Content/API/Concepts/sdkconfig.htm#SDK_and_CLI_Configuration_File
# for more info
config = oci.config.from_file()


# Initialize service client with default config file
data_catalog_client = oci.data_catalog.DataCatalogClient(config)


# Send the request to service, some parameters are not required, see API
# doc for more info
import_lineage_response = data_catalog_client.import_lineage(
    catalog_id="ocid1.test.oc1..<unique_ID>EXAMPLE-catalogId-Value",
    data_asset_key="EXAMPLE-dataAssetKey-Value",
    import_lineage_details=oci.data_catalog.models.ImportLineageDetails(
        lineage_payload="openlineage-payload"),
    opc_retry_token="EXAMPLE-opcRetryToken-Value",
    opc_request_id="AOR2TTPR06HMXDHS3NZU<unique_ID>")

# Get the data from response
print(import_lineage_response.data)
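The lineage_payload value above is a placeholder for an OpenLineage event serialized as JSON. Before sending a payload you generated yourself, a quick local check can catch malformed events early. The following standard-library Python sketch checks only the fields that the OpenLineage example at the start of this tutorial carries; it is not a substitute for validating against the OpenLineage JSON schema, and Data Catalog may apply its own checks. The function name is hypothetical.

```python
import json
from typing import List

# Fields taken from the OpenLineage example at the start of this tutorial.
TOP_LEVEL_FIELDS = ("eventTime", "producer", "schemaURL", "eventType", "run", "job")
EVENT_TYPES = ("START", "RUNNING", "COMPLETE", "ABORT", "FAIL", "OTHER")


def payload_problems(payload: str) -> List[str]:
    """Return problems found in an OpenLineage RunEvent JSON string (empty list if none)."""
    try:
        event = json.loads(payload)
    except json.JSONDecodeError as exc:
        return [f"not valid JSON: {exc.msg}"]
    if not isinstance(event, dict):
        return ["payload is not a JSON object"]
    problems = [f"missing {name}" for name in TOP_LEVEL_FIELDS if name not in event]
    if "eventType" in event and event["eventType"] not in EVENT_TYPES:
        problems.append(f"unknown eventType {event['eventType']!r}")
    run = event.get("run") if isinstance(event.get("run"), dict) else {}
    job = event.get("job") if isinstance(event.get("job"), dict) else {}
    if "runId" not in run:
        problems.append("missing run.runId")
    problems += [f"missing job.{key}" for key in ("namespace", "name") if key not in job]
    return problems
```

Call it on the exact string you intend to pass as lineage_payload, and send the request only when it returns an empty list.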