OpenSearch Pipelines
Create and manage OpenSearch pipelines using Data Prepper to ingest data into an OpenSearch cluster.
Data Prepper is an open source data collector that can filter, enrich, transform, normalize, and aggregate data for downstream analysis and visualization. It's one of the most recommended data ingestion tools for processing large and complex datasets.
The OpenSearch Cluster with Data Prepper feature is currently available in the OC1 realm.
Required Policies
Complete the following tasks before proceeding with the steps described in this topic:
If you're a non-administrator in your tenancy, contact your tenancy administrators to grant these permissions to you. The administrator must add the following policies to allow non-administrator users to manage pipelines and perform create, read, update, and delete (CRUD) operations on them.
The following policy allows the administrator to grant permission to all users in the respective tenancy:
Allow any-user to manage opensearch-cluster-pipeline in tenancy
The following policy allows the administrator to grant permission to a group in a compartment (recommended):
Allow group <group> to manage opensearch-cluster-pipeline in compartment <compartment>
where <group> is the group whose users can access the resource.
The following policy allows OpenSearch pipelines to read the secrets from the Oracle Cloud Infrastructure Vault.
Allow any-user to read secret-bundles in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }
Allow any-user to read secrets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }
Object Storage Policies
These policies are required only for the Object Storage source.
The following policy allows OpenSearch pipelines to use a bucket from Object Storage as the source coordination persistence:
Allow any-user to manage objects in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<source-coordination-bucket-name>'}
The following policy allows OpenSearch pipelines to ingest objects from Object Storage:
Allow any-user to manage objects in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}
The following policy allows OpenSearch pipelines to read buckets from Object Storage:
Allow any-user to read buckets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}
The following policy allows OpenSearch pipelines to read the source coordination bucket:
Allow any-user to read buckets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<source-coordination-bucket-name>'}
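Because the four Object Storage statements differ only in verb, resource, and bucket name, they can be generated from one template to avoid copy-and-paste slips. The following is a minimal sketch, not an Oracle tool. The function name object_storage_policies and its parameters are hypothetical, and it assumes that the last statement targets the source coordination bucket, as its description says.

```python
def object_storage_policies(compartment, source_bucket, coordination_bucket):
    """Render the four Object Storage policy statements listed above.

    Hypothetical helper: the statement text mirrors the policies in this
    topic; the function itself is only an illustration.
    """
    principal = "request.principal.type='opensearchpipeline'"

    def statement(verb, resource, bucket):
        return (
            f"Allow any-user to {verb} {resource} in compartment {compartment} "
            f"WHERE ALL {{{principal}, target.bucket.name='{bucket}'}}"
        )

    return [
        statement("manage", "objects", coordination_bucket),  # source coordination persistence
        statement("manage", "objects", source_bucket),        # ingest objects
        statement("read", "buckets", source_bucket),          # read the source bucket
        statement("read", "buckets", coordination_bucket),    # assumed: coordination bucket
    ]


if __name__ == "__main__":
    for line in object_storage_policies("my-compartment", "logs", "coordination"):
        print(line)
```

Review the generated statements before adding them to a policy, because the compartment and bucket names are inserted without validation.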
OCI Streaming and Self-Managed Kafka Policies
These policies are required for OCI Streaming service or self-managed Kafka sources.
Common Network Policies
These policies are not needed for the public OCI Streaming service.
Add the following policies to allow the OpenSearch service to create, read, update, and delete the private endpoints in the customer subnet:
Allow group SearchOpenSearchAdmins to manage vnics in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to manage vcns in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to manage subnets in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to use network-security-groups in compartment <network_resources_compartment>
OCI Streaming Service Policies (Public and Private)
These policies are only required for the OCI Streaming service.
The following policy allows OpenSearch pipelines to consume the records from the OCI Streaming service.
Allow ANY-USER TO {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME} in compartment '<compartment-name>' where ALL {request.principal.type='opensearchpipeline', target.streampool.id = '<target-stream-pool-ocid>'}
The following policy allows OpenSearch pipelines to read stream pools from the OCI Streaming service:
Allow ANY-USER TO read stream-pools in compartment '<compartment-name>' where ALL {request.principal.type='opensearchpipeline', target.streampool.id = '<target-stream-pool-ocid>'}
Self-Managed Kafka Permission
These permissions are only required for self-managed Kafka sources.
Select the following link to add the permissions that the OpenSearch pipeline requires to list and describe the topics, join the consumer group, and consume records from the topic:
Network Security Rules
This configuration is required only for the private OCI Streaming service and self-managed Kafka. For the public OCI Streaming service, select none.
Add an ingress security rule in the Security List of the subnet or the Network Security Group to allow OpenSearch pipelines to communicate with the private OCI Streaming service running in your subnet.
To add the security rule, see Security Rules and Access and Security.
To find the CIDR of your subnet, see Getting a Subnet's Details.
The following image shows the ingress rules for the Network Security Group.

Creating the Secrets in the Vault
All plain text secrets that the OpenSearch pipelines require must be passed through Vault, because OpenSearch pipelines don't accept plain text secrets such as usernames and passwords in the pipeline YAML.
Perform the following tasks:
- Create a new user with write permissions in your OpenSearch cluster. For instructions, see the following OpenSearch documentation topics:
- Create secrets (username and password) in Vault for the new user you created. For instructions, see the following Oracle Cloud Infrastructure Vault topics:
- Add the following resource principal policies in your OpenSearch tenancy to allow OpenSearch pipelines to read the secrets from the Vault.
ALLOW ANY-USER to read secrets in compartment '<customer-compartment>' WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }
ALLOW ANY-USER to read secret-bundles in compartment '<customer-compartment>' WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }
For more information on creating policies for Vault, see Details for the Vault Service.
You can perform the following OpenSearch pipeline tasks:
List the OpenSearch pipelines in a compartment.
Create a new OpenSearch pipeline.
Get an OpenSearch pipeline's details.
Supported Processors
The following table lists the processors that are supported in the pipeline.
Object Storage and Source Coordination YAML
The Object Storage source depends on the source coordination configuration. OpenSearch pipelines support source coordination using Object Storage as persistence. You must provide the details of an Object Storage bucket in your tenancy.
The following is an example of source coordination:
source_coordination:
  store:
    oci-object-bucket:
      name: <Object Storage bucket name from your tenancy>
      namespace: <namespace>
For information on how to obtain the Object Storage namespace of your tenancy, see Understanding Object Storage Namespaces.
The following is an example of source coordination using Object Storage persistence:
source_coordination:
  store:
    oci-object-bucket:
      name: "dataprepper-test-pipelines"  # bucket name
      namespace: "idee4xpu3dvm"           # namespace
Object Storage YAML
The following are sections of the pipeline configuration YAML to be aware of:
- OCI Secrets: You can create a secret in Vault with your OpenSearch cluster credentials and use it in the pipeline YAML for connecting to the OpenSearch cluster.
- OpenSearch Sink: The sink contains OpenSearch cluster OCIDs with index names for ingestion.
- oci-object source: Data Prepper supports scan-based ingestion using Object Storage, which supports many configurations. You can configure the source to ingest objects within your Object Storage bucket on a schedule or without one. You have the following scan options:
  - One or more time scan: This option lets you configure a pipeline that reads objects in Object Storage buckets one or more times based on the last modified time of the objects.
  - Scheduling based scan: This option lets you schedule a scan at a regular interval after the pipeline is created.
The following table lists the options you can use to configure the Object Storage source.
Option | Required | Type | Description |
---|---|---|---|
acknowledgments | No | Boolean | When true, enables Object Storage object sources to receive end-to-end acknowledgments when events are received by OpenSearch sinks. |
buffer_timeout | No | Duration | The amount of time allowed for writing events to the Data Prepper buffer before a timeout occurs. Any events that the OCI source can't write to the buffer during the specified amount of time are discarded. Default is 10s. |
codec | Yes | Codec | The Data Prepper codec to apply. |
compression | No | String | The compression algorithm to apply: none, gzip, snappy, or automatic. Default is none. |
delete_oci_objects_on_read | No | Boolean | When true, the Object Storage source scan attempts to delete Object Storage objects after all events from the Object Storage object are successfully acknowledged by all sinks. Deletion doesn't work unless end-to-end acknowledgments are enabled, so enable acknowledgments when deleting Object Storage objects. Default is false. |
oci | No | OCI | The OCI configuration. See the following OCI section for more information. |
records_to_accumulate | No | Integer | The number of messages that accumulate before being written to the buffer. Default is 100. |
workers | No | Integer | The number of worker threads that the source uses to read data from the OCI bucket. Leave this value at the default unless your Object Storage objects are less than 1 MB. Performance may decrease for larger Object Storage objects. Default is 1. |
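The defaults and constraints in the table can be checked before a pipeline is submitted. The following is a minimal sketch under stated assumptions: check_source_options is a hypothetical helper, not part of Data Prepper or OCI, it works on an oci-object source mapping that has already been loaded into a Python dictionary, and it treats a missing acknowledgments key as disabled, which the table doesn't state.

```python
# Defaults copied from the options table. Options without a documented
# default (acknowledgments, codec, oci) are intentionally left out.
DOCUMENTED_DEFAULTS = {
    "buffer_timeout": "10s",
    "compression": "none",
    "delete_oci_objects_on_read": False,
    "records_to_accumulate": 100,
    "workers": 1,
}
ALLOWED_COMPRESSION = {"none", "gzip", "snappy", "automatic"}


def check_source_options(options):
    """Return (effective_options, problems) for an oci-object source mapping."""
    effective = {**DOCUMENTED_DEFAULTS, **options}
    problems = []
    # codec is the only option the table marks as required.
    if "codec" not in options:
        problems.append("codec is required")
    if effective["compression"] not in ALLOWED_COMPRESSION:
        problems.append(f"unsupported compression: {effective['compression']}")
    # Assumption: a missing acknowledgments key means acknowledgments are off.
    if effective["delete_oci_objects_on_read"] and not effective.get("acknowledgments", False):
        problems.append("delete_oci_objects_on_read needs acknowledgments: true")
    return effective, problems


if __name__ == "__main__":
    _, found = check_source_options(
        {"codec": {"newline": None}, "delete_oci_objects_on_read": True}
    )
    print(found)
```

The check for delete_oci_objects_on_read reflects the table's note that deletion doesn't work unless end-to-end acknowledgments are enabled.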
The following is an example of the Object Storage pipeline configuration YAML:
version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username:
        secret_id: <secret-ocid>
      opensearch-password:
        secret_id: <secret-ocid>
simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      compression: none
      scan:
        start_time: 2024-11-18T08:01:59.363Z
        buckets:
          - bucket:
              namespace: <namespace>
              name: <bucket-name>
  sink:
    - opensearch:
        hosts: [ <cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: <index-name>
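Every ${{oci_secrets:...}} reference in the sink or source must match a secret declared under pipeline_configurations. The following is a minimal sketch of a pre-submission check that works on the raw YAML text with regular expressions. The function undeclared_secrets is hypothetical, and the declaration pattern assumes that each secret is written as a name line followed directly by a secret_id line, as in the examples in this topic.

```python
import re

# Matches ${{oci_secrets:<name>}} and ${{oci_secrets:<name>:<key>}}.
SECRET_REF = re.compile(r"\$\{\{oci_secrets:([^:}\s]+)(?::([^}\s]+))?\}\}")
# Matches a declaration: a "<name>:" line followed by a "secret_id:" line.
SECRET_DECL = re.compile(r"^\s*([\w-]+):\s*\n\s*secret_id:", re.MULTILINE)


def undeclared_secrets(pipeline_yaml):
    """Return the sorted secret names that are referenced but never declared."""
    declared = set(SECRET_DECL.findall(pipeline_yaml))
    referenced = {match.group(1) for match in SECRET_REF.finditer(pipeline_yaml)}
    return sorted(referenced - declared)


if __name__ == "__main__":
    sample = """pipeline_configurations:
  oci:
    secrets:
      opensearch-username:
        secret_id: <secret-ocid>
simple-sample-pipeline:
  sink:
    - opensearch:
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
"""
    print(undeclared_secrets(sample))
```

A text-based check like this one doesn't validate the YAML structure itself. It only catches references to secret names that were never declared.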
Sample Configurations
You can apply the following one-time scan options at the individual Object Storage bucket level or at the scan level. The following is an example of start time:
oci-object:
  codec:
    newline:
  scan:
    buckets:
      - bucket:
          namespace: "<namespace>"
          name: "data-prepper-object-storage-testing"
          start_time: 2023-01-01T00:00:00Z
  compression: "none"
The following is an example of end time:
simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
          - bucket:
              namespace: "idee4xpu3dvm"
              name: "data-prepper-object-storage-testing"
              end_time: 2024-12-01T00:00:00Z
      compression: "none"
The following is an example of start time and end time:
simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
          - bucket:
              namespace: "idee4xpu3dvm"
              name: "data-prepper-object-storage-testing"
              start_time: 2023-12-01T00:00:00Z
              end_time: 2024-12-01T00:00:00Z
      compression: "none"
The following is an example of range:
simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
          - bucket:
              namespace: "idee4xpu3dvm"
              name: "data-prepper-object-storage-testing"
              start_time: 2023-12-01T00:00:00Z
              range: "PT12H"
      compression: "none"
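The start_time, end_time, and range options together describe a window of last modified times. The following is a minimal sketch of how such a window could be resolved. It isn't the service's implementation. It assumes that range extends forward from start_time or backward from end_time, that explicit bounds take precedence over range, and that both bounds are inclusive. None of these details are stated in this topic, and the function names are hypothetical.

```python
import re
from datetime import datetime, timedelta, timezone

_DURATION = re.compile(r"^PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?$")


def parse_duration(text):
    """Parse a time-only ISO 8601 duration such as PT12H or PT40S."""
    match = _DURATION.match(text)
    if not match or not any(match.groups()):
        raise ValueError(f"unsupported duration: {text!r}")
    hours, minutes, seconds = (int(group or 0) for group in match.groups())
    return timedelta(hours=hours, minutes=minutes, seconds=seconds)


def parse_time(text):
    """Parse a UTC timestamp such as 2023-12-01T00:00:00Z (fraction optional)."""
    fmt = "%Y-%m-%dT%H:%M:%S.%fZ" if "." in text else "%Y-%m-%dT%H:%M:%SZ"
    return datetime.strptime(text, fmt).replace(tzinfo=timezone.utc)


def scan_window(start_time=None, end_time=None, scan_range=None):
    """Return (start, end); None means that side of the window is open."""
    start = parse_time(start_time) if start_time else None
    end = parse_time(end_time) if end_time else None
    if scan_range:
        span = parse_duration(scan_range)
        if start and not end:
            end = start + span      # assumed: range counts forward from start_time
        elif end and not start:
            start = end - span      # assumed: range counts backward from end_time
    return start, end


def in_window(last_modified, window):
    """Check a last modified time against the window, bounds inclusive (assumed)."""
    start, end = window
    return (start is None or last_modified >= start) and (
        end is None or last_modified <= end
    )


if __name__ == "__main__":
    window = scan_window(start_time="2023-12-01T00:00:00Z", scan_range="PT12H")
    print(window, in_window(parse_time("2023-12-01T06:00:00Z"), window))
```

Under these assumptions, the preceding example with start_time 2023-12-01T00:00:00Z and range PT12H selects objects last modified during the first 12 hours of December 1, 2023.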
The following is an example of start time, end time and range:
oci-object:
  codec:
    newline:
  scan:
    start_time: 2023-01-01T00:00:00Z
    end_time: 2024-12-01T00:00:00Z
    range: "PT12H"
    buckets:
      - bucket:
          namespace: "idee4xpu3dvm"
          name: "data-prepper-object-storage-testing"
The following is an example of the include_prefix filter:
simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
          - bucket:
              namespace: "idee4xpu3dvm"
              name: "data-prepper-object-storage-testing"
              start_time: 2023-12-01T00:00:00Z
              filter:
                include_prefix: ["newtest1", "10-05-2024"]
      compression: "none"
The following is an example of filtering files by folder. To read files only from specific folders, use filter to specify the folders. This example includes files from folder2 within folder1 using include_prefix:
simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
          - bucket:
              namespace: "idee4xpu3dvm"
              name: "data-prepper-object-storage-testing"
              start_time: 2023-12-01T00:00:00Z
              filter:
                include_prefix: ["folder1/folder2"]
      compression: "none"
The following is an example of the exclude_suffix filter, combined with include_prefix:
simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
          - bucket:
              namespace: "idee4xpu3dvm"
              name: "data-prepper-object-storage-testing"
              start_time: 2023-12-01T00:00:00Z
              filter:
                include_prefix: ["newtest", "10-05-2024"]
                exclude_suffix: [".png"]
      compression: "none"
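The effect of combining include_prefix and exclude_suffix can be illustrated with plain string matching on object names. The following is a minimal sketch, assuming that an object is selected when its name starts with any listed prefix and doesn't end with any listed suffix. The select_objects function and the sample object names are hypothetical, and the actual matching rules of the service may differ.

```python
def select_objects(names, include_prefix=(), exclude_suffix=()):
    """Keep names that match any include prefix and no exclude suffix.

    An empty include_prefix keeps every name; an empty exclude_suffix
    drops none. Both behaviors are assumptions for this sketch.
    """
    selected = []
    for name in names:
        if include_prefix and not name.startswith(tuple(include_prefix)):
            continue
        if exclude_suffix and name.endswith(tuple(exclude_suffix)):
            continue
        selected.append(name)
    return selected


if __name__ == "__main__":
    objects = [
        "newtest/events.json",
        "newtest/diagram.png",
        "other/events.json",
        "10-05-2024/app.log",
    ]
    print(select_objects(objects, ["newtest", "10-05-2024"], [".png"]))
```

With the filter values from the preceding example, this sketch keeps newtest/events.json and 10-05-2024/app.log, drops the .png file, and drops the object outside the listed prefixes.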
The following is an example of codec support for JSON:
source:
  oci-object:
    acknowledgments: true
    codec:
      json: null
    scan:
      start_time: 2024-06-10T00:00:00Z
      end_time: 2024-06-10T23:00:00Z
      buckets:
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "data-prepper-object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
    compression: "none"
The following is an example of codec support for CSV:
source:
  oci-object:
    acknowledgments: true
    codec:
      csv: null
    scan:
      start_time: 2024-06-10T00:00:00Z
      end_time: 2024-06-10T23:00:00Z
      buckets:
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "data-prepper-object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
    compression: "none"
The following is an example of codec support for newline:
source:
  oci-object:
    acknowledgments: true
    codec:
      newline: null
    scan:
      start_time: 2024-06-10T00:00:00Z
      end_time: 2024-06-10T23:00:00Z
      buckets:
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "data-prepper-object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
    compression: "none"
The following is an example of scheduling ingestion options without count:
simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline: null
      scan:
        scheduling:
          interval: PT40S
        buckets:
          - bucket:
              namespace: idee4xpu3dvm
              name: data-prepper-object-storage-testing
      compression: none
The following is an example of scheduling ingestion options with count:
simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline: null
      scan:
        scheduling:
          interval: PT40S
          count: 10
        buckets:
          - bucket:
              namespace: idee4xpu3dvm
              name: data-prepper-object-storage-testing
      compression: none
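The scheduling options can be read as a timetable of scans. The following is a minimal sketch of that reading. It assumes that scans repeat every interval starting from the first scan, and that count is the total number of scans, with no limit when count is omitted. This topic doesn't state these details, and the scan_times function is hypothetical. The interval PT40S is written here as 40 seconds.

```python
import itertools
from datetime import datetime, timedelta, timezone


def scan_times(first_scan, interval, count=None):
    """Yield scheduled scan times; the sequence is unbounded when count is None."""
    for index in itertools.count():
        if count is not None and index >= count:
            return
        yield first_scan + index * interval


if __name__ == "__main__":
    first = datetime(2024, 1, 1, tzinfo=timezone.utc)  # hypothetical pipeline start
    interval = timedelta(seconds=40)                   # interval: PT40S

    # With count: 10, the timetable ends after the tenth scan.
    bounded = list(scan_times(first, interval, count=10))
    print(len(bounded), bounded[-1].isoformat())

    # Without count, take only the first three entries of the endless timetable.
    for moment in itertools.islice(scan_times(first, interval), 3):
        print(moment.isoformat())
```

Under these assumptions, an interval of PT40S with a count of 10 schedules the last scan 360 seconds after the first one.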
The following is an example of scheduling ingestion options with start time:
oci-object:
  codec:
    newline:
  scan:
    scheduling:
      interval: "PT40S"
      count: 10
    start_time: 2023-01-01T00:00:00Z
    buckets:
      - bucket:
          namespace: "idee4xpu3dvm"
          name: "data-prepper-object-storage-testing"
  compression: "none"
The following is an example of scheduling ingestion options with end time:
oci-object:
  codec:
    newline:
  scan:
    scheduling:
      interval: "PT40S"
      count: 10
    end_time: 2023-01-01T00:00:00Z
    buckets:
      - bucket:
          namespace: "idee4xpu3dvm"
          name: "data-prepper-object-storage-testing"
  compression: "none"
Kafka YAML
The Kafka source doesn't require any source coordination.
For information on all the available configurations for the Kafka Source, access the following link:
https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/kafka/
You can use the OCI Streaming Service as the Kafka source to ingest into the OpenSearch cluster. For information on how to do this, see Using Kafka APIs.
OCI Streaming Public Access YAML
version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username:
        secret_id: <secret-ocid>
      opensearch-password:
        secret_id: <secret-ocid>
kafka-pipeline:
  source:
    kafka:
      bootstrap_servers:
        - <bootstrap_servers>
      topics:
        - name: <topic_name>
          group_id: <group_id>
      acknowledgments: true
      encryption:
        type: ssl
        insecure: false
      authentication:
        sasl:
          oci:
            stream_pool_id: <target-stream-pool-ocid>
  sink:
    - opensearch:
        hosts: [ <opensearch-cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: <index-name>
Pipelines OCI Streaming Service Private Access YAML
The following is an example of the YAML for the OpenSearch pipelines in the OCI Streaming service:
version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username:
        secret_id: <secret-ocid>
      opensearch-password:
        secret_id: <secret-ocid>
kafka-pipeline:
  source:
    kafka:
      bootstrap_servers:
        - <bootstrap_servers>
      topics:
        - name: <topic_name>
          group_id: <group_id>
      acknowledgments: true
      encryption:
        type: ssl
        insecure: false
      authentication:
        sasl:
          oci:
            stream_pool_id: <target-stream-pool-ocid>
  sink:
    - opensearch:
        hosts: [ <opensearch-cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: <index-name>
Self-Managed Kafka YAML
The following is an example of the self-managed Kafka YAML for OpenSearch pipelines:
version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username:
        secret_id: <secret-ocid>
      opensearch-password:
        secret_id: <secret-ocid>
      kafka-credentials:
        secret_id: <secret-ocid>
simple-sample-pipeline:
  source:
    kafka:
      bootstrap_servers:
        - "https://<bootstrap_server_fqdn>:9092"
      topics:
        - name: <topic_name>
          group_id: <group_id>
      acknowledgments: true
      encryption:
        type: ssl
        insecure: false
        certificate: <certificate-in-pem-format>
      authentication:
        sasl:
          plaintext:
            username: ${{oci_secrets:kafka-credentials:username}}
            password: ${{oci_secrets:kafka-credentials:password}}
  sink:
    - opensearch:
        hosts: [ <opensearch-cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: <index-name>