mlm_insights.mlm_dask.readers package

Submodules

mlm_insights.mlm_dask.readers.csv_dask_data_readers module

class mlm_insights.mlm_dask.readers.csv_dask_data_readers.CSVDaskDataReader(file_path: List[str] | str = '', data_source: DataSource | None = None, **kwargs: Any)

Bases: DaskDataReader

This data reader reads CSV files using the Dask execution engine.
It can read from both the local file system and OCI Object Storage.

Configuration

file_path: Union[List[str], str]
  • The path or list of paths to CSV files.

data_source: Optional[DataSource]
  • A DataSource object to read data from.
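Because file_path accepts either a single path or a list of paths, both shapes have to be handled uniformly somewhere. The helper below is a minimal sketch of that normalization. `normalize_file_paths` is hypothetical and not part of mlm_insights, and treating the empty-string default as "no paths" is an assumption made for this sketch.

```python
from typing import List, Union


def normalize_file_paths(file_path: Union[List[str], str]) -> List[str]:
    """Hypothetical helper: return file_path as a list of path strings."""
    if isinstance(file_path, str):
        # A single path becomes a one-element list; in this sketch the
        # empty-string default is treated as "no paths given".
        return [file_path] if file_path else []
    # Copy the list so later changes by the caller do not leak in.
    return list(file_path)


print(normalize_file_paths('data/csv/2000-01-01.csv'))
print(normalize_file_paths(['data/csv/2000-01-01.csv', 'data/csv/2000-01-30.csv']))
```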

Sample code

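For reading from file_path (a string or a list of strings):
    from mlm_insights.mlm_dask.readers.csv_dask_data_readers import CSVDaskDataReader

    test_files = [
        'data/csv/2000-01-01.csv',
        'data/csv/2000-01-30.csv'
    ]
    csv_reader = CSVDaskDataReader(file_path=test_files)
    # No schema provider is supplied here, so None is passed
    actual_df = csv_reader.read(None)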

For reading using a data source:
    # bucket_name, namespace and object_prefix are placeholders for your OCI Object Storage location
    data_source_args = {
        'bucket_name': bucket_name,
        'namespace': namespace,
        'object_prefix': object_prefix,
        'file_type': 'csv',
        'storage_options': {"config": "~/.oci/config"},  # used to authenticate with the file system
    }
    file_location = 'oci://%s@%s/%s' % (bucket_name, namespace, object_prefix)
    # SomeDataSource is a placeholder for a concrete DataSource implementation
    ds = SomeDataSource(file_path=file_location, **data_source_args)
    csv_reader = CSVDaskDataReader(data_source=ds)
    actual_df = csv_reader.read(None)
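The file_location string in the sample follows the form oci://<bucket_name>@<namespace>/<object_prefix>. A small helper makes that format explicit. `build_oci_uri` is hypothetical, and the bucket, namespace and prefix values are placeholders.

```python
def build_oci_uri(bucket_name: str, namespace: str, object_prefix: str) -> str:
    """Hypothetical helper equivalent to 'oci://%s@%s/%s' % (bucket_name, namespace, object_prefix)."""
    return f"oci://{bucket_name}@{namespace}/{object_prefix}"


# Placeholder values for illustration only
print(build_oci_uri("my-bucket", "my-namespace", "monitoring/csv"))
# prints oci://my-bucket@my-namespace/monitoring/csv
```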

classmethod create(config: Dict[str, Any]) → DataReader

Factory method to create an instance of CSVDaskDataReader from a configuration dictionary.

Parameters

config (Dict[str, Any]):
  • A dictionary containing configuration information.

Configuration

file_path: Union[List[str], str]
  • The path or list of paths to CSV files.

Returns:

CSVDaskDataReader: An instance of CSVDaskDataReader.
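The create() factory builds a reader from a plain dictionary instead of keyword arguments. The class below is a minimal, hypothetical sketch of that classmethod-factory pattern, not the library's code. The config keys 'file_path' and 'data_source' are assumptions taken from the Configuration list; the library may use named constants (such as FILE_PATH_KEY) whose string values are not documented here.

```python
from typing import Any, Dict, List, Optional, Union


class SketchReader:
    """Hypothetical stand-in for a reader built via a create() factory."""

    def __init__(self, file_path: Union[List[str], str] = '',
                 data_source: Optional[Any] = None, **kwargs: Any) -> None:
        self.file_path = file_path
        self.data_source = data_source
        self.kwargs = kwargs

    @classmethod
    def create(cls, config: Dict[str, Any]) -> "SketchReader":
        # Pull the assumed keys out of a copy of the config; in this sketch
        # any remaining entries are passed through as extra kwargs.
        remaining = dict(config)
        file_path = remaining.pop('file_path', '')
        data_source = remaining.pop('data_source', None)
        return cls(file_path=file_path, data_source=data_source, **remaining)


reader = SketchReader.create({'file_path': ['data/csv/2000-01-01.csv']})
print(reader.file_path)  # ['data/csv/2000-01-01.csv']
```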

read(schema_provider: SchemaProvider, **kwargs: Any) → DataFrame
Reads the data from the local file system / OCI file system.

Parameters

schema_provider : SchemaProvider
  • Provides the dtypes of the columns present in the data.

Other parameters

storage_options :
  • Options used to authenticate with the file system, for example {"config": "~/.oci/config"}.

kwargs :
  • Extra keyword arguments to forward to dask.dataframe.read_csv().

Returns

dask.dataframe.DataFrame
  • Result of reading the data from the local file system / OCI file system.
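Conceptually, read() combines the listed files into one table and applies the column dtypes supplied by the schema provider. The sketch below shows that idea eagerly with only the standard library. It is not how CSVDaskDataReader is implemented (Dask reads lazily and in partitions), and `read_csv_files` and the converter mapping are hypothetical.

```python
import csv
import os
import tempfile
from typing import Callable, Dict, List


def read_csv_files(paths: List[str], dtypes: Dict[str, Callable[[str], object]]) -> List[dict]:
    """Eager, stdlib-only stand-in: read each CSV file in order and cast columns.

    dtypes maps a column name to a converter such as int or float;
    columns missing from the mapping stay as strings.
    """
    rows: List[dict] = []
    for path in paths:
        with open(path, newline='') as handle:
            for record in csv.DictReader(handle):
                rows.append({
                    column: dtypes.get(column, str)(value)
                    for column, value in record.items()
                })
    return rows


# Write two small CSV files to a temporary directory, then read them back together
with tempfile.TemporaryDirectory() as tmp:
    paths = []
    for name, body in [('2000-01-01.csv', 'id,score\n1,0.5\n'),
                       ('2000-01-30.csv', 'id,score\n2,0.75\n')]:
        path = os.path.join(tmp, name)
        with open(path, 'w', newline='') as f:
            f.write(body)
        paths.append(path)
    rows = read_csv_files(paths, {'id': int, 'score': float})

print(rows)  # [{'id': 1, 'score': 0.5}, {'id': 2, 'score': 0.75}]
```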

mlm_insights.mlm_dask.readers.jsonl_dask_data_readers module

class mlm_insights.mlm_dask.readers.jsonl_dask_data_readers.JsonlDaskDataReader(file_path: List[str] | str = '', data_source: DataSource | None = None, additional_read_config: Dict[str, Any] | None = None, **kwargs: Any)

Bases: DaskDataReader

This data reader reads JSONL files using the Dask execution engine.
It can read from both the local file system and OCI Object Storage.

Configuration

file_path: Union[List[str], str]
  • The path or list of paths to JSONL files.

data_source: Optional[DataSource]
  • A DataSource object to read data from.

Sample code

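For reading from file_path (a string or a list of strings):
    from mlm_insights.mlm_dask.readers.jsonl_dask_data_readers import JsonlDaskDataReader

    test_files = [
        'data/jsonl/2000-01-01.jsonl',
        'data/jsonl/2000-01-30.jsonl'
    ]
    jsonl_reader = JsonlDaskDataReader(file_path=test_files)
    # No schema provider is supplied here, so None is passed
    actual_df = jsonl_reader.read(None)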

For reading using a data source:
    # bucket_name, namespace and object_prefix are placeholders for your OCI Object Storage location
    data_source_args = {
        'bucket_name': bucket_name,
        'namespace': namespace,
        'object_prefix': object_prefix,
        'file_type': 'jsonl',
        'storage_options': {"config": "~/.oci/config"},  # used to authenticate with the file system
    }

    file_location = 'oci://%s@%s/%s' % (bucket_name, namespace, object_prefix)
    # SomeDataSource is a placeholder for a concrete DataSource implementation
    ds = SomeDataSource(file_path=file_location, **data_source_args)
    jsonl_reader = JsonlDaskDataReader(data_source=ds)
    actual_df = jsonl_reader.read(None)

classmethod create(config: Dict[str, Any]) → JsonlDaskDataReader
Factory method to create an instance of JsonlDaskDataReader from a configuration dictionary.
Creates a data reader specific to the Dask engine.

Parameters

config (Dict[str, Any]):
  • A dictionary containing configuration information.

Configuration

file_path: Union[List[str], str]
  • The path or list of paths to JSONL files.

data_source: Optional[DataSource]
  • A DataSource object to read data from.

Parameters

config : Dict[str, Any]

Dictionary providing config inputs such as FILE_PATH_KEY or DATA_SOURCE.

Returns:

JsonlDaskDataReader: An instance of JsonlDaskDataReader.

read(schema_provider: SchemaProvider, **kwargs: Any) → DataFrame
Reads the data from the local file system / OCI file system.

Parameters

schema_provider : SchemaProvider
  • Provides the dtypes of the columns present in the data.

Other parameters

storage_options :
  • Options used to authenticate with the file system, for example {"config": "~/.oci/config"}.

kwargs :
  • Extra keyword arguments to forward to dask.dataframe.read_json().

Returns

dask.dataframe.DataFrame
  • Result of reading the data from the local file system / OCI file system.
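JSONL (JSON Lines) stores one JSON object per line, so each non-blank line becomes one row. The sketch below reads JSONL files eagerly with only the standard library to illustrate the format. `read_jsonl_files` is hypothetical; it is not the JsonlDaskDataReader implementation, which delegates to Dask.

```python
import json
import os
import tempfile
from typing import List


def read_jsonl_files(paths: List[str]) -> List[dict]:
    """Stdlib stand-in: each non-blank line of a JSONL file is one JSON record."""
    records: List[dict] = []
    for path in paths:
        with open(path, encoding='utf-8') as handle:
            for line in handle:
                line = line.strip()
                if line:  # skip blank lines, such as a trailing empty line
                    records.append(json.loads(line))
    return records


# Write a small JSONL file to a temporary directory, then read it back
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, '2000-01-01.jsonl')
    with open(path, 'w', encoding='utf-8') as f:
        f.write('{"id": 1, "label": "a"}\n\n{"id": 2, "label": "b"}\n')
    records = read_jsonl_files([path])

print(records)  # [{'id': 1, 'label': 'a'}, {'id': 2, 'label': 'b'}]
```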

mlm_insights.mlm_dask.readers.nested_json_dask_data_reader module

class mlm_insights.mlm_dask.readers.nested_json_dask_data_reader.NestedJsonDaskDataReader(file_path: List[str] | str = '', query: str = '', query_engine_name: str = '', data_source: DataSource | None = None, **kwargs: Any)

Bases: DaskDataReader

This data reader extracts data from nested JSON files using the Dask execution engine.
It can read from both the local file system and OCI Object Storage.

Configuration

file_path: Union[List[str], str]
  • The path or list of paths to JSON files.

data_source: Optional[DataSource]
  • A DataSource object to read data from.

query: str
  • A query string to extract data from the JSON files.

query_engine_name: str
  • Name of query engine to run the query. Currently, only JMESPATH is supported.
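With JMESPATH as the query engine, query is a JMESPath expression. For a hypothetical document shaped like the one below, the expression `data.records[*].{id: id, score: metrics.score}` selects one flat object per record. The plain-Python function hand-codes the same extraction for documents of this shape so the example runs without the jmespath package. The document layout, field names and `extract_records` are invented for illustration, and how the reader turns the query result into a Dask collection is not shown.

```python
from typing import Any, Dict, List

# Hypothetical nested JSON document (field names invented for illustration)
document: Dict[str, Any] = {
    "data": {
        "records": [
            {"id": 1, "metrics": {"score": 0.5}},
            {"id": 2, "metrics": {"score": 0.75}},
        ]
    }
}


def extract_records(doc: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Hand-coded equivalent, for this document shape, of the JMESPath
    expression data.records[*].{id: id, score: metrics.score}."""
    return [
        {"id": record.get("id"), "score": record.get("metrics", {}).get("score")}
        for record in doc.get("data", {}).get("records", [])
    ]


print(extract_records(document))  # [{'id': 1, 'score': 0.5}, {'id': 2, 'score': 0.75}]
```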

Sample code

For reading from file_path (a string or a list of strings):
    from mlm_insights.mlm_dask.readers.nested_json_dask_data_reader import NestedJsonDaskDataReader

    test_files = [
        'data/json/2000-01-01.json',
        'data/json/2000-01-30.json'
    ]
    query = "user defined query"  # placeholder for a JMESPath expression
    query_engine_name = "JMESPATH"

    nested_json_reader = NestedJsonDaskDataReader(file_path=test_files, query=query,
                                                  query_engine_name=query_engine_name)
    actual_df = nested_json_reader.read(None)

For reading using a data source:
    # bucket_name, namespace and object_prefix are placeholders for your OCI Object Storage location
    data_source_args = {
        'bucket_name': bucket_name,
        'namespace': namespace,
        'object_prefix': object_prefix,
        'file_type': 'jsonl',
        'storage_options': {"config": "~/.oci/config"},  # used to authenticate with the file system
    }

    file_location = 'oci://%s@%s/%s' % (bucket_name, namespace, object_prefix)
    # SomeDataSource is a placeholder for a concrete DataSource implementation
    ds = SomeDataSource(file_path=file_location, **data_source_args)
    query = "user defined query"  # placeholder for a JMESPath expression
    query_engine_name = "JMESPATH"
    nested_json_reader = NestedJsonDaskDataReader(data_source=ds, query=query, query_engine_name=query_engine_name)
    actual_df = nested_json_reader.read(None)

classmethod create(config: Dict[str, Any]) → NestedJsonDaskDataReader
Factory method to create an instance of NestedJsonDaskDataReader from a configuration dictionary.

Parameters

config (Dict[str, Any]):
  • A dictionary containing configuration information.

Configuration

file_path: Union[List[str], str]
  • The path or list of paths to JSON files.

data_source: Optional[DataSource]
  • A DataSource object to read data from.

query: str
  • A query string to extract data from the JSON files.

query_engine_name: str
  • Name of query engine to run the query. Currently, only JMESPATH is supported.

Parameters

config : Dict[str, Any]

Dictionary providing config inputs such as FILE_PATH_KEY or DATA_SOURCE, together with QUERY and QUERY_ENGINE_NAME.

Returns:

NestedJsonDaskDataReader: An instance of NestedJsonDaskDataReader.
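Before calling create(), it can help to check a config dictionary against the documented options. The function below is a hypothetical pre-flight check, not part of mlm_insights. The key names 'file_path', 'data_source', 'query' and 'query_engine_name', and the rule that one of file_path or data_source must be set, are assumptions for this sketch; only the restriction to JMESPATH comes from this page.

```python
from typing import Any, Dict

# Per this page, JMESPATH is currently the only supported query engine.
SUPPORTED_QUERY_ENGINES = {"JMESPATH"}


def validate_nested_json_config(config: Dict[str, Any]) -> None:
    """Hypothetical check for a NestedJsonDaskDataReader config dictionary.

    Raises ValueError if the (assumed) required entries are missing or the
    query engine is not supported.
    """
    if not config.get("file_path") and config.get("data_source") is None:
        raise ValueError("either 'file_path' or 'data_source' must be provided")
    if not config.get("query"):
        raise ValueError("'query' must be a non-empty query string")
    engine = config.get("query_engine_name", "")
    if engine not in SUPPORTED_QUERY_ENGINES:
        raise ValueError(f"unsupported query engine: {engine!r}")


# A config that passes the check (returns None without raising)
validate_nested_json_config({
    "file_path": ["data/json/2000-01-01.json"],
    "query": "data.records[*]",
    "query_engine_name": "JMESPATH",
})
```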

read(schema_provider: SchemaProvider, **kwargs: Any) → DataFrame | Series
Reads the data from the local file system / OCI file system.

Parameters

schema_provider : SchemaProvider
  • Provides the dtypes of the columns present in the data.

Other parameters

kwargs:
  • Extra keyword arguments to forward to dask.DataFrame.

Returns

dask.dataframe.DataFrame or dask.dataframe.Series
  • Result of reading the data from the local file system / OCI file system.

Module contents