mlm_insights.core.data_sources package

Subpackages

Submodules

mlm_insights.core.data_sources.data_source module

class mlm_insights.core.data_sources.data_source.DataSource(type: str, **kwargs: Any)

Bases: ABC

This interface encapsulates the file path and file type. It can also be used to implement special functionality that takes parameters and forms the list of file paths to be read by the readers.

For example, if the current date is needed to read from a folder named after today's date, a data source can compute that location.

It is an optional component to implement and use. It can be omitted if the customer explicitly passes the file paths/glob expressions to be read by the readers.
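As a rough illustration of the date-folder use case above, the sketch below defines a stand-in abstract class that mirrors the documented DataSource interface (it is not the library class) and a hypothetical TodayFolderDataSource subclass. The class names, the base_location parameter, and the today keyword are assumptions made for this example only.

```python
from abc import ABC, abstractmethod
from datetime import date
from typing import Any, List, Optional


class DataSourceSketch(ABC):
    """Stand-in mirroring the documented interface; not the library class."""

    def __init__(self, type: str, **kwargs: Any) -> None:
        self.type = type

    def fetch(self, filename: str, **kwargs: Any) -> Any:
        # Documented default: hand the path back and let the reader open it.
        return filename

    def get_client(self, **kwargs: Any) -> Any:
        # No authenticated client is needed for this sketch.
        return None

    @abstractmethod
    def get_data_location(self, **kwargs: Any) -> List[str]:
        ...


class TodayFolderDataSource(DataSourceSketch):
    """Hypothetical data source that points readers at today's dated folder."""

    def __init__(self, base_location: str, file_type: str, **kwargs: Any) -> None:
        super().__init__(type=file_type, **kwargs)
        self.base_location = base_location.rstrip("/")

    def get_data_location(self, **kwargs: Any) -> List[str]:
        # `today` is an assumed keyword that makes the result reproducible.
        today: Optional[date] = kwargs.get("today")
        day = today or date.today()
        return [f"{self.base_location}/{day.isoformat()}/*.{self.type}"]


ds = TodayFolderDataSource("data/input", "csv")
locations = ds.get_data_location(today=date(2023, 3, 19))
print(locations)
```

Only get_data_location has to be overridden here, because it is the one method the interface marks as abstract.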

fetch(filename: str, **kwargs: Any) Any

This method fetches the contents of a file from the underlying data source.

Parameters

filename:

The canonical file path for which the client has to fetch the raw content

kwargs:

Extra keyword arguments.

Returns

Any:

The raw content of the file, in the format accepted by the underlying engine's read method. By default, the file path itself is returned.

get_client(**kwargs: Any) Any

Parameters

kwargs:

Extra keyword arguments.

Returns

Any:

The underlying authenticated client, if any.

abstract get_data_location(**kwargs: Any) List[str]

This method returns the list of files for the underlying data source path.

Parameters

kwargs:

Extra keyword arguments.

Returns

List[str]:

List of file paths

mlm_insights.core.data_sources.file_url_data_source module

class mlm_insights.core.data_sources.file_url_data_source.FileUrlDataSource(file_path: List[str] | str = '', **kwargs: Any)

Bases: DataSource

This is the default data source used by the Dask Data Reader when no explicit data source is passed. It is not meant to be used directly by users.

Returns

List[str]:

List of files present on the file path in the data location

get_data_location(**kwargs: Any) List[str]

Parameters

kwargs:

Extra keyword arguments

Returns

List[str]:

List of files present on the file path in the data location

mlm_insights.core.data_sources.local_date_prefix_data_source module

class mlm_insights.core.data_sources.local_date_prefix_data_source.LocalDatePrefixDataSource(base_location: str, file_type: str, offset: int = -1, date_range: Dict[Any, Any] = {}, **kwargs: Any)

Bases: DataSource

This class implements the out-of-the-box (OOB) data source for retrieving file locations based on a user-provided offset or date range. The resulting set of locations is passed to the reader.

Provide exactly one of offset or date_range to calculate the folder location. If both are provided, date_range takes precedence over offset.

Configuration

base_location: str

The prefix to the folder location

file_type: str

File format of the input data files, e.g. csv, jsonl.

date_range: Dict[str, str]
  • The date range used to build the folder locations, with ‘start’ and ‘end’ as keys, e.g. {'start': '2023-03-18', 'end': '2023-03-19'}

  • Either date_range or offset must be provided

offset: int, default=-1

Number of days before the current date from which to pick up data. For example, offset=1 selects yesterday and offset=2 selects the day before yesterday.

Returns

List[str]:

List of file locations

Example code

# For using date_range
data = {
    "file_type": "csv",
    "date_range": {"start": "2023-03-18", "end": "2023-03-19"}
}
ds = LocalDatePrefixDataSource(base_location, **data)
csv_reader = CSVDaskDataReader(data_source=ds)
# Returns 2 data locations ['<base_location>/2023-03-18/*.csv', '<base_location>/2023-03-19/*.csv']
actual_df = csv_reader.read(None)  # Reads from the data locations

# For using offset
data = {
    "file_type": "csv",
    "offset": 1
}
ds = LocalDatePrefixDataSource(base_location, **data)
csv_reader = CSVDaskDataReader(data_source=ds)
# Returns 1 data location, given today's date is 2023-03-19: ['<base_location>/2023-03-18/*.csv']
actual_df = csv_reader.read(None)  # Reads from the data locations

get_data_location(**kwargs: Any) List[str]

Returns

List[str]:

List of files present in the data location

Raises

DataSourceException :

Exception if the list of files returned is empty.

Notes

The data source returns a list of glob expressions, one per resolved date folder.
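The mapping from an offset or date range to glob expressions can be sketched in plain Python. This is a minimal illustration, not the library's implementation: the function name resolve_date_globs and the today parameter are hypothetical, and treating a negative offset as "not set" is an assumption based on the documented default of -1.

```python
from datetime import date, timedelta
from typing import Dict, List, Optional


def resolve_date_globs(
    base_location: str,
    file_type: str,
    offset: int = -1,
    date_range: Optional[Dict[str, str]] = None,
    today: Optional[date] = None,
) -> List[str]:
    """Expand an offset or date range into one glob expression per day."""
    today = today or date.today()
    if date_range:
        # A date range takes precedence over the offset when both are given.
        start = date.fromisoformat(date_range["start"])
        end = date.fromisoformat(date_range["end"])
        if end < start:
            raise ValueError("'end' must not be earlier than 'start'")
        days = [start + timedelta(days=i) for i in range((end - start).days + 1)]
    elif offset >= 0:
        # Assumption: a negative offset (the default) means "not provided".
        days = [today - timedelta(days=offset)]
    else:
        raise ValueError("provide either a date_range or a non-negative offset")
    base = base_location.rstrip("/")
    return [f"{base}/{day.isoformat()}/*.{file_type}" for day in days]


by_range = resolve_date_globs(
    "base", "csv", date_range={"start": "2023-03-18", "end": "2023-03-19"}
)
by_offset = resolve_date_globs("base", "csv", offset=1, today=date(2023, 3, 19))
print(by_range)
print(by_offset)
```

Passing today explicitly keeps the offset case deterministic, which is convenient when testing date-dependent paths.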

mlm_insights.core.data_sources.local_date_prefix_data_source.validate(base_location: str, offset: int, date_range: Dict[Any, Any], file_type: str) None

mlm_insights.core.data_sources.local_file_data_source module

class mlm_insights.core.data_sources.local_file_data_source.LocalFileDataSource(file_path: List[str] | str = '', **kwargs: Any)

Bases: DataSource

This class implements the out-of-the-box (OOB) data source for retrieving file locations from a single file path string, a list of file path strings, or a glob string.

Configuration

file_path: Union[List[str], str]

A simple file path string / list of string / glob string

Returns

List[str]:

List of files present on the file path in the local system

Example code

ds = LocalFileDataSource(file_path = 'location/csv/*.csv')
csv_reader = CSVDaskDataReader(data_source=ds)
# Data source will return a list of csv files within the folder location/csv/

actual_df = csv_reader.read(None)  # Reads all the files returned by the LocalFileDataSource

get_data_location(**kwargs: Any) List[str]

Parameters

kwargs:

Extra keyword arguments

Returns

List[str]:

List of files present on the file path in the local system

Raises

DataSourceException :

Exception if the file list is empty
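The behaviour described above (accept a path, a list of paths, or a glob string, and fail when nothing matches) can be approximated with the standard glob module. This is only a sketch: expand_local_paths is a hypothetical helper, and EmptyFileListError is a stand-in for the library's DataSourceException rather than the real class.

```python
import glob
import os
import tempfile
from typing import List, Union


class EmptyFileListError(Exception):
    """Hypothetical stand-in for the library's DataSourceException."""


def expand_local_paths(file_path: Union[List[str], str]) -> List[str]:
    """Expand a path, a list of paths, or a glob string into matching files."""
    patterns = [file_path] if isinstance(file_path, str) else list(file_path)
    files: List[str] = []
    for pattern in patterns:
        # Sort each pattern's matches so the order is stable across platforms.
        files.extend(sorted(glob.glob(pattern)))
    if not files:
        raise EmptyFileListError(f"no files matched {patterns}")
    return files


with tempfile.TemporaryDirectory() as tmp:
    for name in ("a.csv", "b.csv", "notes.txt"):
        with open(os.path.join(tmp, name), "w") as handle:
            handle.write("x\n")
    found = expand_local_paths(os.path.join(tmp, "*.csv"))
    matched = [os.path.basename(path) for path in found]
    try:
        expand_local_paths(os.path.join(tmp, "*.parquet"))
        raised = False
    except EmptyFileListError:
        raised = True

print(matched, raised)
```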

mlm_insights.core.data_sources.oci_date_prefix_data_source module

class mlm_insights.core.data_sources.oci_date_prefix_data_source.OCIDatePrefixDataSource(bucket_name: str, namespace: str, file_type: str, object_prefix: str, offset: int = -1, date_range: Dict[Any, Any] = {}, storage_options: Dict[str, Any] = {}, **kwargs: Any)

Bases: DataSource

This class implements the out-of-the-box (OOB) data source for retrieving file locations from OCI Object Storage based on a user-provided offset or date range. The resulting set of locations is passed to the reader.

Provide exactly one of offset or date_range to calculate the folder location. If both are provided, date_range takes precedence over offset.

Configuration

bucket_name: str

Name of the bucket

namespace: str

OCI Object Storage namespace of the bucket

object_prefix: str

Folder path of the data relative to the bucket location. Cannot be empty.

file_type: str

File format of the input data files, e.g. csv, jsonl.

date_range: Dict[str, str]
  • The date range used to build the folder locations, with ‘start’ and ‘end’ as keys, e.g. {'start': '2023-03-18', 'end': '2023-03-19'}

  • Either date_range or offset must be provided

offset: int, default=-1

Number of days before the current date from which to pick up data. For example, offset=1 selects yesterday and offset=2 selects the day before yesterday.

storage_options: Dict[str, Any]

Authentication options passed to the underlying ocifs client

Returns

List[str]:

List of OCI Object storage file locations

Example code

# For using date_range
data = {
    "bucket_name": "mlm",
    "namespace": "mlm",
    "object_prefix": "mlm",
    "file_type": "csv",
    "date_range": {"start": "2023-03-18", "end": "2023-03-19"}
}
ds = OCIDatePrefixDataSource(**data)
csv_reader = CSVDaskDataReader(data_source=ds)
# Returns 2 data locations ['oci://mlm@mlm/mlm/2023-03-18/*.csv', 'oci://mlm@mlm/mlm/2023-03-19/*.csv']
actual_df = csv_reader.read(None)  # Reads from the data locations

# For using offset
data = {
    "bucket_name": "mlm",
    "namespace": "mlm",
    "object_prefix": "mlm",
    "file_type": "csv",
    "offset": 1
}
ds = OCIDatePrefixDataSource(**data)
csv_reader = CSVDaskDataReader(data_source=ds)
# Returns 1 data location, given today's date is 2023-03-19: ['oci://mlm@mlm/mlm/2023-03-18/*.csv']
actual_df = csv_reader.read(None)  # Reads from the data locations

get_data_location(**kwargs: Any) List[str]

Parameters

kwargs:

Extra keyword arguments

Returns

List[str]:

List of files present in the Object Storage data location

Raises

DataSourceException

If the list of files returned is empty.

Notes

The data source returns a list of glob expressions, one per resolved date folder.
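The location format shown in the example output, oci://<bucket_name>@<namespace>/<object_prefix>/<date>/*.<file_type>, can be reproduced with a few lines of Python. The helper name oci_date_globs and its start and end parameters are hypothetical. The sketch builds the strings only and does not contact Object Storage.

```python
from datetime import date, timedelta
from typing import List


def oci_date_globs(
    bucket_name: str,
    namespace: str,
    object_prefix: str,
    file_type: str,
    start: date,
    end: date,
) -> List[str]:
    """Build one oci:// glob expression per day from start to end, inclusive."""
    prefix = object_prefix.strip("/")
    if not prefix:
        # The documentation states that object_prefix cannot be empty.
        raise ValueError("object_prefix cannot be empty")
    root = f"oci://{bucket_name}@{namespace}/{prefix}"
    day_count = (end - start).days + 1
    return [
        f"{root}/{(start + timedelta(days=i)).isoformat()}/*.{file_type}"
        for i in range(day_count)
    ]


uris = oci_date_globs("mlm", "mlm", "mlm", "csv", date(2023, 3, 18), date(2023, 3, 19))
print(uris)
```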

mlm_insights.core.data_sources.oci_date_prefix_data_source.validate(bucket_name: str, namespace: str, object_prefix: str, offset: int, date_range: Dict[Any, Any], file_type: str) None

mlm_insights.core.data_sources.oci_object_storage_data_source module

class mlm_insights.core.data_sources.oci_object_storage_data_source.OCIObjectStorageDataSource(file_path: List[str] | str = '', storage_options: Dict[str, Any] = {}, **kwargs: Any)

Bases: DataSource

This class implements the OOB Data source for retrieving file locations based on an OCI file path string or list of OCI file path strings or a glob string.

Configuration

file_path: Union[List[str], str]

A simple file path string / list of string / glob string

Returns

List[str]:

List of files present on the file path in OCI Object Storage

Example code

ds = OCIObjectStorageDataSource(file_path = 'oci://location/csv/*.csv')
csv_reader = CSVDaskDataReader(data_source=ds)
# Data source will return a list of csv files within the OCI Object store location: oci://location/csv/

actual_df = csv_reader.read(None)  # Reads all the files returned by the OCIObjectStorageDataSource

get_client(**kwargs: Any) OCIFileSystem

Parameters

kwargs:

Extra keyword arguments

Returns

object_storage_client: ocifs.OCIFileSystem

Object store client

get_data_location(**kwargs: Any) List[str]

Parameters

kwargs:

Extra keyword arguments

Returns

List[str]:

List of files present on the file path in OCI Object Storage

Raises

DataSourceException :

Exception if the file list is empty

mlm_insights.core.data_sources.oci_object_storage_data_source.validate(file_path: List[str] | str) None

mlm_insights.core.data_sources.oci_object_storage_file_search_data_source module

class mlm_insights.core.data_sources.oci_object_storage_file_search_data_source.ObjectStorageFileSearchDataSource(file_path: List[str] | str = '', storage_options: Dict[str, Any] = {}, filter_arg: List[Any] = [], **kwargs: Any)

Bases: DataSource

This class implements the ObjectStorageFileSearch data source for retrieving file locations based on an OCI file path string or a list of OCI file path strings, narrowed by user-provided filters.

Configuration

file_path: Union[List[str], str]

A simple file path string / list of string / glob string

filter_arg: List[Any]

A list of filter arguments

Returns

List[str]:

List of files present on the file path in OCI Object Storage

Example code

data = [
  {"contains": "iris_dataset"},
  {"date_range": {"start": "2024-01-01", "end": "2024-01-09", "date_format": "yyyy-mm-dd", "search_type": "metadata"}},
  {"filetype": "csv"},
  {"suffix": "iris.csv"}
]
base_locations = 'oci://location/csv/*.csv'
# The filters are passed through the filter_arg parameter of the constructor
ds = ObjectStorageFileSearchDataSource(file_path=base_locations, filter_arg=data)
csv_reader = CSVNativeDataReader(data_source=ds)
# Data source will return the csv files within the OCI Object store location oci://location/csv/ that match the filters

actual_df = csv_reader.read(None)  # Reads all the files returned by the ObjectStorageFileSearchDataSource

get_client(**kwargs: Any) OCIFileSystem

Parameters

kwargs:

Extra keyword arguments

Returns

object_storage_client: ocifs.OCIFileSystem

Object store client

get_data_location(**kwargs: Any) List[str]

Parameters

kwargs:

Extra keyword arguments

Returns

List[str]:

List of files present on the file path in OCI Object Storage

Raises

DataSourceException :

Exception if an invalid filter argument is passed

get_filter(filter_name: str) Any

Parameters

filter_name:

Name of the filter to look up

Returns

The filter class registered for the given filter name
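The documentation does not spell out how each filter matches, so the following is only a guess at the general idea: look up a predicate by filter name, then keep the paths that satisfy every filter. The registry FILTERS, the helpers get_filter_sketch and apply_filters, and the matching rules for contains, suffix, and filetype are all assumptions. The date_range filter is left out because its behaviour (for example search_type "metadata") is not described here.

```python
from typing import Any, Callable, Dict, List

# Hypothetical predicates keyed by filter name; the library's real filter
# classes and their matching rules may differ.
FILTERS: Dict[str, Callable[[str, Any], bool]] = {
    "contains": lambda path, value: value in path,
    "suffix": lambda path, value: path.endswith(value),
    "filetype": lambda path, value: path.rsplit(".", 1)[-1] == value,
}


def get_filter_sketch(filter_name: str) -> Callable[[str, Any], bool]:
    """Return the predicate registered under filter_name, or fail loudly."""
    try:
        return FILTERS[filter_name]
    except KeyError:
        raise ValueError(f"unsupported filter: {filter_name}") from None


def apply_filters(paths: List[str], filter_arg: List[Dict[str, Any]]) -> List[str]:
    """Keep only the paths that satisfy every filter in filter_arg."""
    selected = list(paths)
    for entry in filter_arg:
        for name, value in entry.items():
            predicate = get_filter_sketch(name)
            selected = [path for path in selected if predicate(path, value)]
    return selected


paths = [
    "oci://location/csv/iris_dataset/2024-01-01/iris.csv",
    "oci://location/csv/iris_dataset/2024-01-01/readme.txt",
    "oci://location/csv/other/2024-01-01/iris.csv",
]
filters = [{"contains": "iris_dataset"}, {"filetype": "csv"}, {"suffix": "iris.csv"}]
result = apply_filters(paths, filters)
print(result)
```

Raising on an unknown filter name mirrors the documented behaviour of rejecting invalid filter arguments, although the real class raises DataSourceException rather than ValueError.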

Module contents