
grizz.ingestor

Contain DataFrame ingestors.

grizz.ingestor.BaseIngestor

Bases: ABC

Define the base class to implement a DataFrame ingestor.

Example usage:

>>> from grizz.ingestor import ParquetIngestor
>>> ingestor = ParquetIngestor("/path/to/frame.parquet")
>>> ingestor
ParquetIngestor(source=/path/to/frame.parquet)
>>> frame = ingestor.ingest()  # doctest: +SKIP
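BaseIngestor declares two abstract methods, equal and ingest, so a concrete ingestor must implement both. The stdlib-only sketch below mirrors that contract with a hypothetical ToyBaseIngestor. It returns rows as a list of dicts instead of a polars.DataFrame, and it illustrates the pattern rather than grizz's actual implementation.

```python
from abc import ABC, abstractmethod
from typing import Any


class ToyBaseIngestor(ABC):
    """Hypothetical stand-in for grizz.ingestor.BaseIngestor."""

    @abstractmethod
    def equal(self, other: Any, equal_nan: bool = False) -> bool:
        """Return True if ``other`` is an equivalent ingestor."""

    @abstractmethod
    def ingest(self) -> list[dict[str, Any]]:
        """Return the ingested rows."""


class ToyRowsIngestor(ToyBaseIngestor):
    """Hypothetical concrete ingestor that returns rows held in memory."""

    def __init__(self, rows: list[dict[str, Any]]) -> None:
        self._rows = rows

    def equal(self, other: Any, equal_nan: bool = False) -> bool:
        # Same type and same rows; equal_nan is ignored in this toy version.
        return isinstance(other, ToyRowsIngestor) and self._rows == other._rows

    def ingest(self) -> list[dict[str, Any]]:
        # Return copies of the rows so callers cannot mutate the stored ones.
        return [dict(row) for row in self._rows]


ingestor = ToyRowsIngestor([{"col1": 1, "col2": "a"}])
print(ingestor.ingest())  # [{'col1': 1, 'col2': 'a'}]
```

Instantiating ToyBaseIngestor directly raises a TypeError because its abstract methods are not implemented, which is the same guarantee ABC gives BaseIngestor.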

grizz.ingestor.BaseIngestor.equal abstractmethod

equal(other: Any, equal_nan: bool = False) -> bool

Indicate if two ingestor objects are equal or not.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| other | Any | The other object to compare. | required |
| equal_nan | bool | Whether to compare NaNs as equal. If True, NaNs in both objects are considered equal. | False |

Returns:

| Type | Description |
| --- | --- |
| bool | True if the two ingestors are equal, otherwise False. |

Example usage:

>>> from grizz.ingestor import CsvIngestor
>>> obj1 = CsvIngestor("/path/to/frame.csv")
>>> obj2 = CsvIngestor("/path/to/frame.csv")
>>> obj3 = CsvIngestor("/path/to/frame2.csv")
>>> obj1.equal(obj2)
True
>>> obj1.equal(obj3)
False
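The equal_nan flag matters because an IEEE 754 NaN is never equal to itself, so a plain == comparison reports two NaN-containing objects as different. A minimal sketch of that comparison rule on plain Python floats, using hypothetical helpers values_equal and sequences_equal (grizz's own comparison logic may differ):

```python
import math
from typing import Any


def values_equal(a: Any, b: Any, equal_nan: bool = False) -> bool:
    """Hypothetical helper: compare two values, optionally treating NaN == NaN."""
    if (
        equal_nan
        and isinstance(a, float)
        and isinstance(b, float)
        and math.isnan(a)
        and math.isnan(b)
    ):
        return True
    return a == b


def sequences_equal(xs: list, ys: list, equal_nan: bool = False) -> bool:
    """Hypothetical helper: element-wise comparison of two sequences."""
    return len(xs) == len(ys) and all(
        values_equal(x, y, equal_nan=equal_nan) for x, y in zip(xs, ys)
    )


nan = float("nan")
print(sequences_equal([1.0, nan], [1.0, nan]))  # False
print(sequences_equal([1.0, nan], [1.0, nan], equal_nan=True))  # True
```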

grizz.ingestor.BaseIngestor.ingest abstractmethod

ingest() -> DataFrame

Ingest a DataFrame.

Returns:

| Type | Description |
| --- | --- |
| DataFrame | The ingested DataFrame. |

Raises:

| Type | Description |
| --- | --- |
| DataNotFoundError | If the DataFrame cannot be ingested. |

Example usage:

>>> from grizz.ingestor import ParquetIngestor
>>> ingestor = ParquetIngestor("/path/to/frame.parquet")
>>> frame = ingestor.ingest()  # doctest: +SKIP

grizz.ingestor.CacheIngestor

Bases: BaseIngestor

Implement an ingestor that attempts the fast ingestor first, falling back to the slow ingestor if needed.

Internally, this ingestor attempts to load the DataFrame using the fast ingestor. If a DataNotFoundError is raised, it falls back to the slow ingestor, then uses the exporter to store the DataFrame so that the fast ingestor can load it during the next cycle.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| fast_ingestor | BaseIngestor \| dict | The fast DataFrame ingestor or its configuration. | required |
| slow_ingestor | BaseIngestor \| dict | The slow DataFrame ingestor or its configuration. | required |
| exporter | BaseExporter \| dict | The DataFrame exporter or its configuration. The exporter stores the output of the slow ingestor so that the fast ingestor can ingest it during the next ingestion cycle. | required |

Example usage:

>>> import polars as pl
>>> from grizz.ingestor import CacheIngestor, Ingestor
>>> from grizz.exporter import InMemoryExporter
>>> slow_ingestor = Ingestor(
...     pl.DataFrame(
...         {
...             "col1": ["1", "2", "3", "4", "5"],
...             "col2": ["a", "b", "c", "d", "e"],
...             "col3": [1.2, 2.2, 3.2, 4.2, 5.2],
...         }
...     )
... )
>>> exporter_ingestor = InMemoryExporter()
>>> ingestor = CacheIngestor(
...     fast_ingestor=exporter_ingestor,
...     slow_ingestor=slow_ingestor,
...     exporter=exporter_ingestor,
... )
>>> ingestor
CacheIngestor(
  (fast_ingestor): InMemoryExporter(frame=None)
  (slow_ingestor): Ingestor(shape=(5, 3))
  (exporter): InMemoryExporter(frame=None)
)
>>> frame = ingestor.ingest()
>>> frame
shape: (5, 3)
┌──────┬──────┬──────┐
│ col1 ┆ col2 ┆ col3 │
│ ---  ┆ ---  ┆ ---  │
│ str  ┆ str  ┆ f64  │
╞══════╪══════╪══════╡
│ 1    ┆ a    ┆ 1.2  │
│ 2    ┆ b    ┆ 2.2  │
│ 3    ┆ c    ┆ 3.2  │
│ 4    ┆ d    ┆ 4.2  │
│ 5    ┆ e    ┆ 5.2  │
└──────┴──────┴──────┘
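The fallback logic described above can be sketched with the standard library alone. Every name in it (DataNotFoundError, ToyMemoryStore, ToySlowIngestor, ToyCacheIngestor) is a hypothetical stand-in rather than grizz's API. ToyMemoryStore plays the double role that InMemoryExporter plays in the example above, serving as both the fast ingestor and the exporter.

```python
from typing import Any, Optional

Rows = list[dict[str, Any]]


class DataNotFoundError(Exception):
    """Hypothetical stand-in for grizz's DataNotFoundError."""


class ToyMemoryStore:
    """Acts as both the fast ingestor and the exporter."""

    def __init__(self) -> None:
        self.rows: Optional[Rows] = None

    def ingest(self) -> Rows:
        if self.rows is None:
            raise DataNotFoundError("nothing has been exported yet")
        return self.rows

    def export(self, rows: Rows) -> None:
        self.rows = rows


class ToySlowIngestor:
    """Counts calls so the caching behaviour can be observed."""

    def __init__(self, rows: Rows) -> None:
        self._rows = rows
        self.calls = 0

    def ingest(self) -> Rows:
        self.calls += 1
        return self._rows


class ToyCacheIngestor:
    def __init__(self, fast_ingestor, slow_ingestor, exporter) -> None:
        self._fast = fast_ingestor
        self._slow = slow_ingestor
        self._exporter = exporter

    def ingest(self) -> Rows:
        try:
            # Fast path: succeeds once a previous cycle has exported the data.
            return self._fast.ingest()
        except DataNotFoundError:
            # Slow path: load the data, then export it for the next cycle.
            rows = self._slow.ingest()
            self._exporter.export(rows)
            return rows


store = ToyMemoryStore()
slow = ToySlowIngestor([{"col1": "1", "col2": "a"}])
cache = ToyCacheIngestor(fast_ingestor=store, slow_ingestor=slow, exporter=store)
cache.ingest()
cache.ingest()
print(slow.calls)  # 1: the second call is served by the fast ingestor
```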

grizz.ingestor.ClickHouseArrowIngestor

Bases: BaseIngestor

Implement a ClickHouse DataFrame ingestor that uses Arrow.

This ingestor requires clickhouse_connect and pyarrow.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| query | str | The query to get the data. | required |
| client | Client \| dict | The ClickHouse client or its configuration. See the documentation of clickhouse_connect.get_client for more information. | required |

Example usage:

>>> from grizz.ingestor import ClickHouseArrowIngestor
>>> import clickhouse_connect
>>> client = clickhouse_connect.get_client()  # doctest: +SKIP
>>> ingestor = ClickHouseArrowIngestor(query="", client=client)  # doctest: +SKIP
>>> frame = ingestor.ingest()  # doctest: +SKIP

grizz.ingestor.CsvFileIngestor

Bases: CsvIngestor

Implement a CSV file ingestor.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| path | Path \| str | The path to the CSV file to ingest. | required |
| **kwargs | Any | Additional keyword arguments for polars.scan_csv. | {} |

Example usage:

>>> from grizz.ingestor import CsvFileIngestor
>>> ingestor = CsvFileIngestor(path="/path/to/frame.csv")
>>> ingestor
CsvFileIngestor(source=/path/to/frame.csv)
>>> frame = ingestor.ingest()  # doctest: +SKIP

grizz.ingestor.CsvIngestor

Bases: BaseIngestor

Implement a CSV ingestor.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| source | FileSource | The source of the CSV data to ingest. | required |
| **kwargs | Any | Additional keyword arguments for polars.scan_csv. | {} |

Example usage:

>>> from grizz.ingestor import CsvIngestor
>>> ingestor = CsvIngestor(source="/path/to/frame.csv")
>>> ingestor
CsvIngestor(source=/path/to/frame.csv)
>>> frame = ingestor.ingest()  # doctest: +SKIP
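CsvIngestor takes a source plus **kwargs that are passed on to polars.scan_csv. The stdlib-only sketch below imitates that "source plus forwarded options" design with a hypothetical ToyCsvIngestor that forwards its keyword arguments to csv.DictReader; delimiter plays the role that a polars option such as separator would play. It assumes reading is deferred until ingest() and reads from an in-memory io.StringIO so it runs without a file on disk. It is not grizz's implementation.

```python
import csv
import io
from typing import IO, Any, Union


class ToyCsvIngestor:
    """Hypothetical CSV ingestor that forwards **kwargs to csv.DictReader."""

    def __init__(self, source: Union[str, IO[str]], **kwargs: Any) -> None:
        # Nothing is read here; reading is deferred to ingest().
        self._source = source
        self._kwargs = kwargs

    def ingest(self) -> list[dict[str, str]]:
        if isinstance(self._source, str):
            # Treat a string source as a path on disk.
            with open(self._source, newline="") as f:
                return list(csv.DictReader(f, **self._kwargs))
        # Otherwise assume a text file-like object.
        return list(csv.DictReader(self._source, **self._kwargs))


data = io.StringIO("col1;col2\n1;a\n2;b\n")
ingestor = ToyCsvIngestor(data, delimiter=";")
print(ingestor.ingest())  # [{'col1': '1', 'col2': 'a'}, {'col1': '2', 'col2': 'b'}]
```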

grizz.ingestor.Ingestor

Bases: BaseIngestor

Implement a simple DataFrame ingestor.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| frame | DataFrame | The DataFrame to ingest. | required |

Example usage:

>>> import polars as pl
>>> from grizz.ingestor import Ingestor
>>> ingestor = Ingestor(
...     frame=pl.DataFrame(
...         {
...             "col1": [1, 2, 3, 4, 5],
...             "col2": ["1", "2", "3", "4", "5"],
...             "col3": ["1", "2", "3", "4", "5"],
...             "col4": ["a", "b", "c", "d", "e"],
...         }
...     )
... )
>>> ingestor
Ingestor(shape=(5, 4))
>>> frame = ingestor.ingest()

grizz.ingestor.JoinIngestor

Bases: BaseIngestor

Implement an ingestor that joins the output of multiple ingestors.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| ingestors | Sequence[BaseIngestor \| dict] | The ingestors whose outputs are joined, or their configurations. | required |
| **kwargs | Any | Additional keyword arguments for polars.DataFrame.join. | {} |

Example usage:

>>> import polars as pl
>>> from grizz.ingestor import JoinIngestor, Ingestor
>>> ingestor1 = Ingestor(
...     frame=pl.DataFrame(
...         {
...             "col": [1, 2, 3, 4, 5],
...             "col1": ["1", "2", "3", "4", "5"],
...             "col2": ["a", "b", "c", "d", "e"],
...         }
...     )
... )
>>> ingestor2 = Ingestor(
...     frame=pl.DataFrame(
...         {
...             "col": [1, 2, 3, 5],
...             "col3": [-1, -2, -3, -5],
...         }
...     )
... )
>>> ingestor3 = Ingestor(
...     frame=pl.DataFrame(
...         {
...             "col": [1, 2, 3, 4, 5],
...             "col4": [1.1, 2.2, 3.3, 4.4, 5.5],
...             "col5": ["1.1", "2.2", "3.3", "4.4", "5.5"],
...         }
...     )
... )
>>> ingestor = JoinIngestor([ingestor1, ingestor2, ingestor3], on="col", how="inner")
>>> ingestor
JoinIngestor(
  (ingestors):
    (0): Ingestor(shape=(5, 3))
    (1): Ingestor(shape=(4, 2))
    (2): Ingestor(shape=(5, 3))
  (kwargs): on='col', how='inner'
)
>>> frame = ingestor.ingest()
>>> frame
shape: (4, 6)
┌─────┬──────┬──────┬──────┬──────┬──────┐
│ col ┆ col1 ┆ col2 ┆ col3 ┆ col4 ┆ col5 │
│ --- ┆ ---  ┆ ---  ┆ ---  ┆ ---  ┆ ---  │
│ i64 ┆ str  ┆ str  ┆ i64  ┆ f64  ┆ str  │
╞═════╪══════╪══════╪══════╪══════╪══════╡
│ 1   ┆ 1    ┆ a    ┆ -1   ┆ 1.1  ┆ 1.1  │
│ 2   ┆ 2    ┆ b    ┆ -2   ┆ 2.2  ┆ 2.2  │
│ 3   ┆ 3    ┆ c    ┆ -3   ┆ 3.3  ┆ 3.3  │
│ 5   ┆ 5    ┆ e    ┆ -5   ┆ 5.5  ┆ 5.5  │
└─────┴──────┴──────┴──────┴──────┴──────┘
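JoinIngestor combines the ingested frames with polars.DataFrame.join, passing on its **kwargs (here on="col", how="inner"). Assuming the frames are joined left to right, the inner-join case of the example above can be sketched with the standard library via functools.reduce over lists of dict rows. The helper inner_join is hypothetical and ignores column-name collisions, which polars resolves with a suffix.

```python
from functools import reduce
from typing import Any

Row = dict[str, Any]


def inner_join(left: list[Row], right: list[Row], on: str) -> list[Row]:
    """Hypothetical inner join on a single key column (no suffix handling)."""
    # Index the right-hand rows by key; keys may repeat, so keep lists.
    index: dict[Any, list[Row]] = {}
    for row in right:
        index.setdefault(row[on], []).append(row)
    # Keep left rows whose key also appears on the right, merging columns.
    return [
        {**lrow, **rrow}
        for lrow in left
        for rrow in index.get(lrow[on], [])
    ]


frames = [
    [{"col": k, "col1": str(k)} for k in [1, 2, 3, 4, 5]],
    [{"col": k, "col3": -k} for k in [1, 2, 3, 5]],
    [{"col": k, "col4": k * 1.1} for k in [1, 2, 3, 4, 5]],
]
# Join left to right: ((frames[0] join frames[1]) join frames[2]).
result = reduce(lambda acc, frame: inner_join(acc, frame, on="col"), frames)
print([row["col"] for row in result])  # [1, 2, 3, 5]
```

As in the polars example, key 4 is dropped because it is missing from the second frame.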

grizz.ingestor.ParquetFileIngestor

Bases: ParquetIngestor

Implement a Parquet file ingestor.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| path | Path \| str | The path to the Parquet file to ingest. | required |
| **kwargs | Any | Additional keyword arguments for polars.read_parquet. | {} |

Example usage:

>>> from grizz.ingestor import ParquetFileIngestor
>>> ingestor = ParquetFileIngestor(path="/path/to/frame.parquet")
>>> ingestor
ParquetFileIngestor(source=/path/to/frame.parquet)
>>> frame = ingestor.ingest()  # doctest: +SKIP

grizz.ingestor.ParquetIngestor

Bases: BaseIngestor

Implement a Parquet ingestor.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| source | FileSource | The source of the Parquet data to ingest. | required |
| **kwargs | Any | Additional keyword arguments for polars.read_parquet. | {} |

Example usage:

>>> from grizz.ingestor import ParquetIngestor
>>> ingestor = ParquetIngestor(source="/path/to/frame.parquet")
>>> ingestor
ParquetIngestor(source=/path/to/frame.parquet)
>>> frame = ingestor.ingest()  # doctest: +SKIP

grizz.ingestor.TransformIngestor

Bases: BaseIngestor

Implement an ingestor that transforms the DataFrame after ingesting it.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| ingestor | BaseIngestor \| dict | The base ingestor or its configuration. | required |
| transformer | BaseTransformer \| dict | The polars.DataFrame transformer or its configuration. | required |

Example usage:

>>> import polars as pl
>>> from grizz.ingestor import TransformIngestor, Ingestor
>>> from grizz.transformer import InplaceCast
>>> ingestor = TransformIngestor(
...     ingestor=Ingestor(
...         pl.DataFrame(
...             {
...                 "col1": ["1", "2", "3", "4", "5"],
...                 "col2": ["a", "b", "c", "d", "e"],
...                 "col3": [1.2, 2.2, 3.2, 4.2, 5.2],
...             }
...         )
...     ),
...     transformer=InplaceCast(columns=["col1", "col3"], dtype=pl.Float32),
... )
>>> ingestor
TransformIngestor(
  (ingestor): Ingestor(shape=(5, 3))
  (transformer): InplaceCastTransformer(columns=('col1', 'col3'), exclude_columns=(), missing_policy='raise', dtype=Float32)
)
>>> frame = ingestor.ingest()
>>> frame
shape: (5, 3)
┌──────┬──────┬──────┐
│ col1 ┆ col2 ┆ col3 │
│ ---  ┆ ---  ┆ ---  │
│ f32  ┆ str  ┆ f32  │
╞══════╪══════╪══════╡
│ 1.0  ┆ a    ┆ 1.2  │
│ 2.0  ┆ b    ┆ 2.2  │
│ 3.0  ┆ c    ┆ 3.2  │
│ 4.0  ┆ d    ┆ 4.2  │
│ 5.0  ┆ e    ┆ 5.2  │
└──────┴──────┴──────┘
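Conceptually, TransformIngestor is a composition: ingest with the wrapped ingestor, then pass the result through the transformer. A minimal stdlib sketch, assuming a hypothetical ToyTransformIngestor that takes a zero-argument ingest callable and a one-argument transform callable, with a hypothetical cast_columns step standing in for InplaceCast:

```python
from typing import Any, Callable

Rows = list[dict[str, Any]]


class ToyTransformIngestor:
    """Hypothetical: ingest with one callable, then transform the result."""

    def __init__(
        self, ingest: Callable[[], Rows], transform: Callable[[Rows], Rows]
    ) -> None:
        self._ingest = ingest
        self._transform = transform

    def ingest(self) -> Rows:
        return self._transform(self._ingest())


def cast_columns(columns: list[str], dtype: type) -> Callable[[Rows], Rows]:
    """Hypothetical transformer that casts the given columns with ``dtype``."""

    def transform(rows: Rows) -> Rows:
        # Build new rows; columns not listed are passed through unchanged.
        return [
            {k: dtype(v) if k in columns else v for k, v in row.items()}
            for row in rows
        ]

    return transform


source = [{"col1": "1", "col2": "a", "col3": 1.2}]
ingestor = ToyTransformIngestor(
    ingest=lambda: source, transform=cast_columns(["col1", "col3"], float)
)
print(ingestor.ingest())  # [{'col1': 1.0, 'col2': 'a', 'col3': 1.2}]
```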

grizz.ingestor.is_ingestor_config

is_ingestor_config(config: dict) -> bool

Indicate if the input configuration is a configuration for a BaseIngestor.

This function only checks whether the value of the _target_ key is valid; it does not check the other values. If _target_ indicates a function, the function's return type hint is used to check the class.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| config | dict | The configuration to check. | required |

Returns:

| Type | Description |
| --- | --- |
| bool | True if the input configuration is a configuration for a BaseIngestor object, otherwise False. |

Example usage:

>>> from grizz.ingestor import is_ingestor_config
>>> is_ingestor_config(
...     {"_target_": "grizz.ingestor.CsvIngestor", "source": "/path/to/data.csv"}
... )
True
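The check described above can be sketched with importlib and typing.get_type_hints. The function below, is_config_for, is hypothetical and generalized to any base class so that it runs without grizz; the stdlib classes in the usage lines only stand in for ingestor classes. The real is_ingestor_config may resolve _target_ differently.

```python
import importlib
import inspect
import typing
from typing import Any


def _import_object(path: str) -> Any:
    """Import ``package.module.name`` and return ``name``."""
    module_name, _, attr = path.rpartition(".")
    return getattr(importlib.import_module(module_name), attr)


def is_config_for(config: dict, base: type) -> bool:
    """Hypothetical: does ``config["_target_"]`` build a ``base`` instance?"""
    target = config.get("_target_")
    if not isinstance(target, str):
        return False
    try:
        obj = _import_object(target)
    except (ImportError, AttributeError, ValueError):
        return False
    if inspect.isfunction(obj):
        # For factory functions, use the return type hint as the class.
        try:
            obj = typing.get_type_hints(obj).get("return")
        except NameError:
            return False
    if not inspect.isclass(obj):
        return False
    try:
        return issubclass(obj, base)
    except TypeError:
        # e.g. parametrized generics such as list[int] on some Python versions
        return False


print(is_config_for({"_target_": "collections.OrderedDict"}, dict))  # True
print(is_config_for({"_target_": "pathlib.Path"}, dict))  # False
print(is_config_for({"source": "/path/to/data.csv"}, dict))  # False
```

Only _target_ is inspected, so the other keys (such as source above) are never validated, matching the behaviour described for is_ingestor_config.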

grizz.ingestor.setup_ingestor

setup_ingestor(
    ingestor: BaseIngestor | dict,
) -> BaseIngestor

Set up an ingestor.

If a configuration is given, the ingestor is instantiated from it by using the BaseIngestor factory function.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| ingestor | BaseIngestor \| dict | An ingestor or its configuration. | required |

Returns:

| Type | Description |
| --- | --- |
| BaseIngestor | An instantiated ingestor. |

Example usage:

>>> from grizz.ingestor import setup_ingestor
>>> ingestor = setup_ingestor(
...     {"_target_": "grizz.ingestor.CsvIngestor", "source": "/path/to/data.csv"}
... )
>>> ingestor
CsvIngestor(source=/path/to/data.csv)
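Instantiating from a configuration amounts to importing the object named by _target_ and calling it with the remaining keys as keyword arguments. The sketch below shows that pattern on a stdlib class with a hypothetical setup_object helper; it assumes non-dict inputs are returned unchanged, and grizz's factory may handle details (such as nested configurations) that are not shown here.

```python
import importlib
from typing import Any


def setup_object(obj_or_config: Any) -> Any:
    """Hypothetical: build an object from a ``_target_`` config, else return it."""
    if not isinstance(obj_or_config, dict):
        # Already an instantiated object: return it unchanged.
        return obj_or_config
    kwargs = dict(obj_or_config)  # copy so the caller's dict is not modified
    module_name, _, attr = kwargs.pop("_target_").rpartition(".")
    factory = getattr(importlib.import_module(module_name), attr)
    return factory(**kwargs)


delta = setup_object({"_target_": "datetime.timedelta", "days": 2})
print(delta)  # 2 days, 0:00:00
print(setup_object(delta) is delta)  # True
```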