ingestor

flamme.ingestor

Contain data ingestors.

flamme.ingestor.BaseIngestor

Bases: ABC

Define the base class to implement a DataFrame ingestor.

Example usage:

>>> from flamme.ingestor import ParquetIngestor
>>> ingestor = ParquetIngestor(path="/path/to/frame.parquet")
>>> ingestor
ParquetIngestor(path=/path/to/frame.parquet)
>>> frame = ingestor.ingest()  # doctest: +SKIP
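
A custom ingestor mainly needs to implement ingest(). The sketch below illustrates the pattern with a stand-in abstract class so that it runs without flamme installed; in real code the subclass would presumably derive from flamme.ingestor.BaseIngestor instead. The names BaseIngestorSketch and JsonLinesIngestor, and the constructor signature, are hypothetical and not part of flamme.

```python
from abc import ABC, abstractmethod
from io import StringIO

import pandas as pd


class BaseIngestorSketch(ABC):
    """Stand-in for flamme.ingestor.BaseIngestor (assumption: one abstract method)."""

    @abstractmethod
    def ingest(self) -> pd.DataFrame:
        """Return the ingested DataFrame."""


class JsonLinesIngestor(BaseIngestorSketch):
    """Hypothetical ingestor that parses newline-delimited JSON text."""

    def __init__(self, text: str) -> None:
        self._text = text

    def __repr__(self) -> str:
        return f"{type(self).__name__}(chars={len(self._text)})"

    def ingest(self) -> pd.DataFrame:
        # lines=True tells pandas to parse one JSON object per line.
        return pd.read_json(StringIO(self._text), lines=True)


ingestor = JsonLinesIngestor('{"col1": 1, "col2": "a"}\n{"col1": 2, "col2": "b"}\n')
frame = ingestor.ingest()
print(frame.shape)
```

Keeping all I/O inside ingest() means that building the object stays cheap, which matches the examples here where the ingestor is created and printed before any data is read.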

flamme.ingestor.BaseIngestor.ingest

ingest() -> DataFrame

Ingest a DataFrame.

Returns:

Type Description
DataFrame

The ingested DataFrame.

Example usage:

>>> from flamme.ingestor import ParquetIngestor
>>> ingestor = ParquetIngestor(path="/path/to/frame.parquet")
>>> frame = ingestor.ingest()  # doctest: +SKIP

flamme.ingestor.ClickHouseIngestor

Bases: BaseIngestor

Implement a ClickHouse DataFrame ingestor.

Parameters:

Name Type Description Default
query str

The query to get the data.

required
client Client | dict

The ClickHouse client or its configuration. See the documentation of clickhouse_connect.get_client for more information.

required

Example usage:

>>> from flamme.ingestor import ClickHouseIngestor
>>> import clickhouse_connect
>>> client = clickhouse_connect.get_client()  # doctest: +SKIP
>>> ingestor = ClickHouseIngestor(query="", client=client)  # doctest: +SKIP
>>> frame = ingestor.ingest()  # doctest: +SKIP

flamme.ingestor.CsvIngestor

Bases: BaseIngestor

Implement a CSV DataFrame ingestor.

Parameters:

Name Type Description Default
path Path | str

The path to the CSV file to ingest.

required
**kwargs Any

Additional keyword arguments for pandas.read_csv.

{}

Example usage:

>>> from flamme.ingestor import CsvIngestor
>>> ingestor = CsvIngestor(path="/path/to/frame.csv")
>>> ingestor
CsvIngestor(path=/path/to/frame.csv)
>>> frame = ingestor.ingest()  # doctest: +SKIP
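
The additional keyword arguments are documented as being passed to pandas.read_csv. As a rough illustration of their effect, the sketch below calls pandas directly; it is not the CsvIngestor implementation. The in-memory data and the chosen options are made up for the example, and with the ingestor they would be written as CsvIngestor(path=..., sep=";", usecols=["col1", "col3"]).

```python
from io import StringIO

import pandas as pd

# Made-up CSV content that uses a semicolon as the separator.
raw = "col1;col2;col3\n1;a;1.5\n2;b;2.5\n"

# Options that would be given to the ingestor as **kwargs.
read_kwargs = {"sep": ";", "usecols": ["col1", "col3"]}

# Equivalent direct call to pandas with the same options.
frame = pd.read_csv(StringIO(raw), **read_kwargs)
print(frame.columns.tolist())
```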

flamme.ingestor.Ingestor

Bases: BaseIngestor

Implement a simple DataFrame ingestor.

Parameters:

Name Type Description Default
frame DataFrame

The DataFrame to ingest.

required

Example usage:

>>> import pandas as pd
>>> from flamme.ingestor import Ingestor
>>> ingestor = Ingestor(
...     frame=pd.DataFrame(
...         {
...             "col1": [1, 2, 3, 4, 5],
...             "col2": ["1", "2", "3", "4", "5"],
...             "col3": ["1", "2", "3", "4", "5"],
...             "col4": ["a", "b", "c", "d", "e"],
...         }
...     )
... )
>>> ingestor
Ingestor(shape=(5, 4))
>>> frame = ingestor.ingest()

flamme.ingestor.ParquetIngestor

Bases: BaseIngestor

Implement a parquet DataFrame ingestor.

Parameters:

Name Type Description Default
path Path | str

The path to the parquet file to ingest.

required
**kwargs Any

Additional keyword arguments for pandas.read_parquet.

{}

Example usage:

>>> from flamme.ingestor import ParquetIngestor
>>> ingestor = ParquetIngestor(path="/path/to/frame.parquet")
>>> ingestor
ParquetIngestor(path=/path/to/frame.parquet)
>>> frame = ingestor.ingest()  # doctest: +SKIP

flamme.ingestor.TransformedIngestor

Bases: BaseIngestor

Implement an ingestor that also transforms the DataFrame.

Parameters:

Name Type Description Default
ingestor BaseIngestor | dict

The base ingestor or its configuration.

required
transformer BaseDataFrameTransformer | dict

The pandas.DataFrame transformer or its configuration.

required

Example usage:

>>> from flamme.ingestor import TransformedIngestor, ParquetIngestor
>>> from flamme.transformer.dataframe import ToNumeric
>>> ingestor = TransformedIngestor(
...     ingestor=ParquetIngestor(path="/path/to/frame.parquet"),
...     transformer=ToNumeric(columns=["col1", "col3"]),
... )
>>> ingestor
TransformedIngestor(
  (ingestor): ParquetIngestor(path=/path/to/frame.parquet)
  (transformer): ToNumericDataFrameTransformer(columns=('col1', 'col3'), ignore_missing=False)
)
>>> frame = ingestor.ingest()  # doctest: +SKIP
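
The composition described above can be sketched as "ingest, then transform". The classes below are stand-ins written for this illustration and are not flamme's implementation; in particular, the transform(frame) method name on the transformer is an assumption.

```python
import pandas as pd


class InMemoryIngestor:
    """Stand-in base ingestor that returns a copy of a DataFrame."""

    def __init__(self, frame: pd.DataFrame) -> None:
        self._frame = frame

    def ingest(self) -> pd.DataFrame:
        return self._frame.copy()


class ToNumericSketch:
    """Stand-in transformer (assumption: exposes a transform(frame) method)."""

    def __init__(self, columns: list[str]) -> None:
        self._columns = tuple(columns)

    def transform(self, frame: pd.DataFrame) -> pd.DataFrame:
        out = frame.copy()
        for col in self._columns:
            # Convert string values such as "1.5" to numbers.
            out[col] = pd.to_numeric(out[col])
        return out


class TransformedIngestorSketch:
    """Ingest with the wrapped ingestor, then apply the transformer."""

    def __init__(self, ingestor, transformer) -> None:
        self._ingestor = ingestor
        self._transformer = transformer

    def ingest(self) -> pd.DataFrame:
        return self._transformer.transform(self._ingestor.ingest())


raw = pd.DataFrame(
    {"col1": ["1", "2", "3"], "col2": ["a", "b", "c"], "col3": ["1.5", "2.5", "3.5"]}
)
ingestor = TransformedIngestorSketch(
    ingestor=InMemoryIngestor(raw),
    transformer=ToNumericSketch(columns=["col1", "col3"]),
)
frame = ingestor.ingest()
print(frame.dtypes)
```

Because the wrapper exposes the same ingest() method as the ingestor it wraps, callers do not need to know whether a transformation was applied.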

flamme.ingestor.is_ingestor_config

is_ingestor_config(config: dict) -> bool

Indicate if the input configuration is a configuration for a BaseIngestor.

This function only checks whether the value of the key _target_ is valid. It does not check the other values. If _target_ refers to a function, the function's return type hint is used to check the class.

Parameters:

Name Type Description Default
config dict

The configuration to check.

required

Returns:

Type Description
bool

True if the input configuration is a configuration for a BaseIngestor object.

Example usage:

>>> from flamme.ingestor import is_ingestor_config
>>> is_ingestor_config(
...     {"_target_": "flamme.ingestor.CsvIngestor", "path": "/path/to/data.csv"}
... )
True
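
The check described above (resolve _target_, then test the class or the function's return type hint) could be sketched as follows. This is a standard-library illustration under stated assumptions and not how flamme implements is_ingestor_config; the stand-in classes, the make_csv_ingestor function, and the ingestor_sketch module name are all hypothetical.

```python
import importlib
import inspect
import sys
import types
from typing import get_type_hints


class BaseIngestorSketch:
    """Stand-in for flamme.ingestor.BaseIngestor."""


class CsvIngestorSketch(BaseIngestorSketch):
    def __init__(self, path: str) -> None:
        self.path = path


def make_csv_ingestor(path: str) -> CsvIngestorSketch:
    """Hypothetical factory function with a return type hint."""
    return CsvIngestorSketch(path)


# Register the stand-ins under a hypothetical module name so dotted paths resolve.
demo = types.ModuleType("ingestor_sketch")
demo.CsvIngestorSketch = CsvIngestorSketch
demo.make_csv_ingestor = make_csv_ingestor
sys.modules["ingestor_sketch"] = demo


def is_ingestor_config_sketch(config: dict) -> bool:
    target = config.get("_target_")
    if not isinstance(target, str) or "." not in target:
        return False
    module_name, _, attr = target.rpartition(".")
    try:
        obj = getattr(importlib.import_module(module_name), attr)
    except (ImportError, AttributeError):
        return False
    if inspect.isclass(obj):
        return issubclass(obj, BaseIngestorSketch)
    if callable(obj):
        # For a function, fall back to its return type hint.
        try:
            returned = get_type_hints(obj).get("return")
        except (TypeError, NameError):
            return False
        return inspect.isclass(returned) and issubclass(returned, BaseIngestorSketch)
    return False


print(is_ingestor_config_sketch({"_target_": "ingestor_sketch.CsvIngestorSketch"}))
print(is_ingestor_config_sketch({"_target_": "ingestor_sketch.make_csv_ingestor"}))
print(is_ingestor_config_sketch({"_target_": "collections.OrderedDict"}))
```

As in the documented function, only _target_ is inspected, so a configuration with a valid target but wrong or missing arguments still passes the check.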

flamme.ingestor.setup_ingestor

setup_ingestor(
    ingestor: BaseIngestor | dict,
) -> BaseIngestor

Set up an ingestor.

The ingestor is instantiated from its configuration by using the BaseIngestor factory function.

Parameters:

Name Type Description Default
ingestor BaseIngestor | dict

The ingestor or its configuration.

required

Returns:

Type Description
BaseIngestor

An instantiated ingestor.

Example usage:

>>> from flamme.ingestor import setup_ingestor
>>> ingestor = setup_ingestor(
...     {"_target_": "flamme.ingestor.CsvIngestor", "path": "/path/to/data.csv"}
... )
>>> ingestor
CsvIngestor(path=/path/to/data.csv)
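
The "instance or configuration" behavior can be approximated with the standard library alone. The sketch below is an assumption-laden illustration and not flamme's factory mechanism: CsvIngestorSketch, setup_ingestor_sketch, and the ingestor_setup_sketch module name are hypothetical stand-ins.

```python
import importlib
import sys
import types


class CsvIngestorSketch:
    """Stand-in for an ingestor class."""

    def __init__(self, path: str) -> None:
        self.path = path

    def __repr__(self) -> str:
        return f"{type(self).__name__}(path={self.path})"


# Register the stand-in under a hypothetical module name so the dotted path resolves.
demo = types.ModuleType("ingestor_setup_sketch")
demo.CsvIngestorSketch = CsvIngestorSketch
sys.modules["ingestor_setup_sketch"] = demo


def setup_ingestor_sketch(ingestor):
    # An already instantiated ingestor is returned unchanged.
    if not isinstance(ingestor, dict):
        return ingestor
    # Copy the configuration so the caller's dict is not mutated.
    kwargs = dict(ingestor)
    module_name, _, attr = kwargs.pop("_target_").rpartition(".")
    factory = getattr(importlib.import_module(module_name), attr)
    # The remaining keys are passed as keyword arguments.
    return factory(**kwargs)


config = {"_target_": "ingestor_setup_sketch.CsvIngestorSketch", "path": "/path/to/data.csv"}
built = setup_ingestor_sketch(config)
same = setup_ingestor_sketch(built)
print(built)
```

Accepting both forms lets a caller pass a ready-made object in code and a plain dict when the ingestor comes from a configuration file.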