ingestor
grizz.ingestor ¶
Contain DataFrame ingestors.
grizz.ingestor.BaseIngestor ¶
Bases: ABC
Define the base class to implement a DataFrame ingestor.
Example usage:
>>> from grizz.ingestor import ParquetIngestor
>>> ingestor = ParquetIngestor("/path/to/frame.parquet")
>>> ingestor
ParquetIngestor(source=/path/to/frame.parquet)
>>> frame = ingestor.ingest() # doctest: +SKIP
grizz.ingestor.BaseIngestor.equal abstractmethod ¶
equal(other: Any, equal_nan: bool = False) -> bool
Indicate if two ingestor objects are equal or not.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other | Any | The other object to compare. | required |
equal_nan | bool | Whether to compare NaN's as equal. If | False |
Returns:
Type | Description |
---|---|
bool | True if the two ingestors are equal, otherwise False. |
Example usage:
>>> from grizz.ingestor import CsvIngestor
>>> obj1 = CsvIngestor("/path/to/frame.csv")
>>> obj2 = CsvIngestor("/path/to/frame.csv")
>>> obj3 = CsvIngestor("/path/to/frame2.csv")
>>> obj1.equal(obj2)
True
>>> obj1.equal(obj3)
False
grizz.ingestor.BaseIngestor.ingest abstractmethod ¶
ingest() -> DataFrame
Ingest a DataFrame.
Returns:
Type | Description |
---|---|
DataFrame | The ingested DataFrame. |
Raises:
Type | Description |
---|---|
DataNotFoundError | If the DataFrame cannot be ingested. |
Example usage:
>>> from grizz.ingestor import ParquetIngestor
>>> ingestor = ParquetIngestor("/path/to/frame.parquet")
>>> frame = ingestor.ingest() # doctest: +SKIP
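The two abstract methods above define the contract a concrete ingestor has to fulfil. The following is a minimal, standard-library-only sketch of that contract. `ToyBaseIngestor` and `ToyListIngestor` are hypothetical stand-ins, not grizz classes, and a list of floats plays the role of the DataFrame. The handling of `equal_nan` is one plausible reading of the parameter, not a description of the library's implementation.

```python
import math
from abc import ABC, abstractmethod
from typing import Any


class ToyBaseIngestor(ABC):
    """Hypothetical stand-in for BaseIngestor (not the grizz class)."""

    @abstractmethod
    def ingest(self) -> list[float]:
        """Return the ingested data."""

    @abstractmethod
    def equal(self, other: Any, equal_nan: bool = False) -> bool:
        """Indicate if two ingestors are equal or not."""


class ToyListIngestor(ToyBaseIngestor):
    """Ingest a list of floats held in memory."""

    def __init__(self, values: list[float]) -> None:
        self._values = list(values)

    def ingest(self) -> list[float]:
        # Return a copy so callers cannot mutate the stored data.
        return list(self._values)

    def equal(self, other: Any, equal_nan: bool = False) -> bool:
        if not isinstance(other, self.__class__):
            return False
        if len(self._values) != len(other._values):
            return False
        for a, b in zip(self._values, other._values):
            both_nan = math.isnan(a) and math.isnan(b)
            # NaN != NaN in Python, so NaNs only match when equal_nan is set.
            if a != b and not (equal_nan and both_nan):
                return False
        return True


obj1 = ToyListIngestor([1.0, float("nan")])
obj2 = ToyListIngestor([1.0, float("nan")])
strict = obj1.equal(obj2)  # False: NaN values are treated as different
relaxed = obj1.equal(obj2, equal_nan=True)  # True: NaN values are treated as equal
```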
grizz.ingestor.CacheIngestor ¶
Bases: BaseIngestor
Implement an ingestor that attempts the fast ingestor first, falling back to the slow ingestor if needed.
Internally, this ingestor attempts to load the DataFrame using
the fast ingestor. If a DataNotFoundError is raised,
it falls back to the slow ingestor, then exports the DataFrame
for ingestion by the fast ingestor during the next cycle.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fast_ingestor | BaseIngestor \| dict | The fast DataFrame ingestor or its configuration. | required |
slow_ingestor | BaseIngestor \| dict | The slow DataFrame ingestor or its configuration. | required |
exporter | BaseExporter \| dict | The DataFrame exporter or its configuration. The DataFrame exporter is responsible for storing the output of the slower DataFrame ingestor, allowing it to be ingested by the faster DataFrame ingestor during the next ingestion cycle. | required |
Example usage:
>>> import polars as pl
>>> from grizz.ingestor import CacheIngestor, Ingestor
>>> from grizz.exporter import InMemoryExporter
>>> slow_ingestor = Ingestor(
...     pl.DataFrame(
...         {
...             "col1": ["1", "2", "3", "4", "5"],
...             "col2": ["a", "b", "c", "d", "e"],
...             "col3": [1.2, 2.2, 3.2, 4.2, 5.2],
...         }
...     )
... )
>>> exporter_ingestor = InMemoryExporter()
>>> ingestor = CacheIngestor(
...     fast_ingestor=exporter_ingestor,
...     slow_ingestor=slow_ingestor,
...     exporter=exporter_ingestor,
... )
>>> ingestor
CacheIngestor(
  (fast_ingestor): InMemoryExporter(frame=None)
  (slow_ingestor): Ingestor(shape=(5, 3))
  (exporter): InMemoryExporter(frame=None)
)
>>> frame = ingestor.ingest()
>>> frame
shape: (5, 3)
┌──────┬──────┬──────┐
│ col1 ┆ col2 ┆ col3 │
│ ---  ┆ ---  ┆ ---  │
│ str  ┆ str  ┆ f64  │
╞══════╪══════╪══════╡
│ 1    ┆ a    ┆ 1.2  │
│ 2    ┆ b    ┆ 2.2  │
│ 3    ┆ c    ┆ 3.2  │
│ 4    ┆ d    ┆ 4.2  │
│ 5    ┆ e    ┆ 5.2  │
└──────┴──────┴──────┘
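The fast/slow fallback described for CacheIngestor can be sketched without grizz or polars. Everything below is a hypothetical stand-in: `DataNotFoundError` is a local exception, `MemoryStore` plays the role of an object that is both exporter and fast ingestor, and the `export` method name is an assumption. A plain list stands in for the DataFrame.

```python
from typing import Optional


class DataNotFoundError(Exception):
    """Local stand-in for the error raised when data cannot be ingested."""


class MemoryStore:
    """Hypothetical object acting as both the exporter and the fast ingestor."""

    def __init__(self) -> None:
        self._data: Optional[list] = None

    def export(self, data: list) -> None:
        self._data = data

    def ingest(self) -> list:
        if self._data is None:
            raise DataNotFoundError("no data stored yet")
        return self._data


class SlowSource:
    """Hypothetical slow ingestor that counts how often it is called."""

    def __init__(self, data: list) -> None:
        self._data = data
        self.calls = 0

    def ingest(self) -> list:
        self.calls += 1
        return list(self._data)


def cached_ingest(fast, slow, exporter) -> list:
    """Try the fast ingestor, fall back to the slow one and export its output."""
    try:
        return fast.ingest()
    except DataNotFoundError:
        data = slow.ingest()
        exporter.export(data)  # makes the data available to the fast path next time
        return data


store = MemoryStore()
slow = SlowSource([1, 2, 3])
first = cached_ingest(store, slow, store)  # fast path misses, slow path runs
second = cached_ingest(store, slow, store)  # fast path hits, slow path is skipped
```

In this sketch the slow source is only queried once: after the first call, the exported data is served by the fast path.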
grizz.ingestor.ClickHouseArrowIngestor ¶
Bases: BaseIngestor
Implement a ClickHouse DataFrame ingestor that uses Arrow.
This ingestor requires clickhouse_connect and pyarrow.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query | str | The query to get the data. | required |
client | Client \| dict | The ClickHouse client or its configuration. Please check the documentation of | required |
Example usage:
>>> from grizz.ingestor import ClickHouseArrowIngestor
>>> import clickhouse_connect
>>> client = clickhouse_connect.get_client() # doctest: +SKIP
>>> ingestor = ClickHouseArrowIngestor(query="", client=client) # doctest: +SKIP
>>> frame = ingestor.ingest() # doctest: +SKIP
grizz.ingestor.CsvFileIngestor ¶
Bases: CsvIngestor
Implement a CSV file ingestor.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | Path \| str | The path to the CSV file to ingest. | required |
**kwargs | Any | Additional keyword arguments for | {} |
Example usage:
>>> from grizz.ingestor import CsvFileIngestor
>>> ingestor = CsvFileIngestor(path="/path/to/frame.csv")
>>> ingestor
CsvFileIngestor(source=/path/to/frame.csv)
>>> frame = ingestor.ingest() # doctest: +SKIP
grizz.ingestor.CsvIngestor ¶
Bases: BaseIngestor
Implement a CSV ingestor.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
source | FileSource | The source of the CSV data to ingest. | required |
**kwargs | Any | Additional keyword arguments for | {} |
Example usage:
>>> from grizz.ingestor import CsvIngestor
>>> ingestor = CsvIngestor(source="/path/to/frame.csv")
>>> ingestor
CsvIngestor(source=/path/to/frame.csv)
>>> frame = ingestor.ingest() # doctest: +SKIP
grizz.ingestor.Ingestor ¶
Bases: BaseIngestor
Implement a simple DataFrame ingestor.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
frame | DataFrame | The DataFrame to ingest. | required |
Example usage:
>>> import polars as pl
>>> from grizz.ingestor import Ingestor
>>> ingestor = Ingestor(
...     frame=pl.DataFrame(
...         {
...             "col1": [1, 2, 3, 4, 5],
...             "col2": ["1", "2", "3", "4", "5"],
...             "col3": ["1", "2", "3", "4", "5"],
...             "col4": ["a", "b", "c", "d", "e"],
...         }
...     )
... )
>>> ingestor
Ingestor(shape=(5, 4))
>>> frame = ingestor.ingest()
grizz.ingestor.JoinIngestor ¶
Bases: BaseIngestor
Implement an ingestor that joins the output of multiple ingestors.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ingestors | Sequence[BaseIngestor \| dict] | The list of ingestors. | required |
**kwargs | Any | Additional keyword arguments for | {} |
Example usage:
>>> import polars as pl
>>> from grizz.ingestor import JoinIngestor, Ingestor
>>> ingestor1 = Ingestor(
...     frame=pl.DataFrame(
...         {
...             "col": [1, 2, 3, 4, 5],
...             "col1": ["1", "2", "3", "4", "5"],
...             "col2": ["a", "b", "c", "d", "e"],
...         }
...     )
... )
>>> ingestor2 = Ingestor(
...     frame=pl.DataFrame(
...         {
...             "col": [1, 2, 3, 5],
...             "col3": [-1, -2, -3, -5],
...         }
...     )
... )
>>> ingestor3 = Ingestor(
...     frame=pl.DataFrame(
...         {
...             "col": [1, 2, 3, 4, 5],
...             "col4": [1.1, 2.2, 3.3, 4.4, 5.5],
...             "col5": ["1.1", "2.2", "3.3", "4.4", "5.5"],
...         }
...     )
... )
>>> ingestor = JoinIngestor([ingestor1, ingestor2, ingestor3], on="col", how="inner")
>>> ingestor
JoinIngestor(
  (ingestors):
    (0): Ingestor(shape=(5, 3))
    (1): Ingestor(shape=(4, 2))
    (2): Ingestor(shape=(5, 3))
  (kwargs): on='col', how='inner'
)
>>> frame = ingestor.ingest()
>>> frame
shape: (4, 6)
┌─────┬──────┬──────┬──────┬──────┬──────┐
│ col ┆ col1 ┆ col2 ┆ col3 ┆ col4 ┆ col5 │
│ --- ┆ ---  ┆ ---  ┆ ---  ┆ ---  ┆ ---  │
│ i64 ┆ str  ┆ str  ┆ i64  ┆ f64  ┆ str  │
╞═════╪══════╪══════╪══════╪══════╪══════╡
│ 1   ┆ 1    ┆ a    ┆ -1   ┆ 1.1  ┆ 1.1  │
│ 2   ┆ 2    ┆ b    ┆ -2   ┆ 2.2  ┆ 2.2  │
│ 3   ┆ 3    ┆ c    ┆ -3   ┆ 3.3  ┆ 3.3  │
│ 5   ┆ 5    ┆ e    ┆ -5   ┆ 5.5  ┆ 5.5  │
└─────┴──────┴──────┴──────┴──────┴──────┘
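To illustrate why the inner join above keeps only the rows with `col` in 1, 2, 3 and 5, here is a standard-library sketch that joins several tables from left to right. It rests on assumptions: each table is a list of dictionaries rather than a DataFrame, `inner_join` and `join_all` are hypothetical helper names, and the left-to-right reduction is a guess at how the outputs are combined, not a description of the JoinIngestor internals.

```python
from functools import reduce


def inner_join(left: list[dict], right: list[dict], on: str) -> list[dict]:
    """Inner join two tables (lists of row dictionaries) on one key."""
    index: dict = {}
    for row in right:
        index.setdefault(row[on], []).append(row)
    # Keep a left row only if the right table has a row with the same key.
    return [{**lrow, **rrow} for lrow in left for rrow in index.get(lrow[on], [])]


def join_all(tables: list[list[dict]], on: str) -> list[dict]:
    """Join the tables successively, from left to right."""
    return reduce(lambda acc, table: inner_join(acc, table, on), tables)


table1 = [{"col": i, "col1": str(i)} for i in [1, 2, 3, 4, 5]]
table2 = [{"col": i, "col3": -i} for i in [1, 2, 3, 5]]
table3 = [
    {"col": i, "col5": s}
    for i, s in zip([1, 2, 3, 4, 5], ["1.1", "2.2", "3.3", "4.4", "5.5"])
]

joined = join_all([table1, table2, table3], on="col")
keys = [row["col"] for row in joined]  # [1, 2, 3, 5]
```

The row with `col` equal to 4 is dropped because the second table has no matching key.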
grizz.ingestor.ParquetFileIngestor ¶
Bases: ParquetIngestor
Implement a parquet file ingestor.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | Path \| str | The path to the parquet file to ingest. | required |
**kwargs | Any | Additional keyword arguments for | {} |
Example usage:
>>> from grizz.ingestor import ParquetFileIngestor
>>> ingestor = ParquetFileIngestor(path="/path/to/frame.parquet")
>>> ingestor
ParquetFileIngestor(source=/path/to/frame.parquet)
>>> frame = ingestor.ingest() # doctest: +SKIP
grizz.ingestor.ParquetIngestor ¶
Bases: BaseIngestor
Implement a parquet ingestor.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
source | FileSource | The source of the parquet data to ingest. | required |
**kwargs | Any | Additional keyword arguments for | {} |
Example usage:
>>> from grizz.ingestor import ParquetIngestor
>>> ingestor = ParquetIngestor(source="/path/to/frame.parquet")
>>> ingestor
ParquetIngestor(source=/path/to/frame.parquet)
>>> frame = ingestor.ingest() # doctest: +SKIP
grizz.ingestor.TransformIngestor ¶
Bases: BaseIngestor
Implement an ingestor that also transforms the DataFrame.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ingestor | BaseIngestor \| dict | The base ingestor. | required |
transformer | BaseTransformer \| dict | The | required |
Example usage:
>>> import polars as pl
>>> from grizz.ingestor import TransformIngestor, Ingestor
>>> from grizz.transformer import InplaceCast
>>> ingestor = TransformIngestor(
...     ingestor=Ingestor(
...         pl.DataFrame(
...             {
...                 "col1": ["1", "2", "3", "4", "5"],
...                 "col2": ["a", "b", "c", "d", "e"],
...                 "col3": [1.2, 2.2, 3.2, 4.2, 5.2],
...             }
...         )
...     ),
...     transformer=InplaceCast(columns=["col1", "col3"], dtype=pl.Float32),
... )
>>> ingestor
TransformIngestor(
  (ingestor): Ingestor(shape=(5, 3))
  (transformer): InplaceCastTransformer(columns=('col1', 'col3'), exclude_columns=(), missing_policy='raise', dtype=Float32)
)
>>> frame = ingestor.ingest()
>>> frame
shape: (5, 3)
┌──────┬──────┬──────┐
│ col1 ┆ col2 ┆ col3 │
│ ---  ┆ ---  ┆ ---  │
│ f32  ┆ str  ┆ f32  │
╞══════╪══════╪══════╡
│ 1.0  ┆ a    ┆ 1.2  │
│ 2.0  ┆ b    ┆ 2.2  │
│ 3.0  ┆ c    ┆ 3.2  │
│ 4.0  ┆ d    ┆ 4.2  │
│ 5.0  ┆ e    ┆ 5.2  │
└──────┴──────┴──────┘
grizz.ingestor.is_ingestor_config ¶
is_ingestor_config(config: dict) -> bool
Indicate if the input configuration is a configuration for a
BaseIngestor.
This function only checks if the value of the key _target_
is valid. It does not check the other values. If _target_
indicates a function, its return type hint is used to check
the class.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config | dict | The configuration to check. | required |
Returns:
Type | Description |
---|---|
bool | True if the input configuration is a configuration for a BaseIngestor, otherwise False. |
Example usage:
>>> from grizz.ingestor import is_ingestor_config
>>> is_ingestor_config(
...     {"_target_": "grizz.ingestor.CsvIngestor", "source": "/path/to/data.csv"}
... )
True
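A check that looks only at the `_target_` key can be sketched with the standard library. This is an illustration under assumptions, not the grizz implementation: `resolve_target` and `is_config_for` are hypothetical helpers, standard-library classes stand in for ingestors, and only the case where `_target_` names a class is covered (the return-type-hint case for functions is left out).

```python
import importlib
from typing import Any


def resolve_target(path: str) -> Any:
    """Import the object named by a dotted path such as 'pkg.module.Name'."""
    module_name, _, attr = path.rpartition(".")
    return getattr(importlib.import_module(module_name), attr)


def is_config_for(config: dict, base: type) -> bool:
    """Check only the '_target_' key; all other keys are ignored."""
    target = config.get("_target_")
    if not isinstance(target, str):
        return False
    try:
        obj = resolve_target(target)
    except (ImportError, AttributeError, ValueError):
        # The module or attribute does not exist, or the path has no module part.
        return False
    return isinstance(obj, type) and issubclass(obj, base)


# Standard-library classes are used here in place of ingestor classes.
valid = is_config_for({"_target_": "collections.OrderedDict", "a": 1}, dict)  # True
wrong_class = is_config_for({"_target_": "collections.deque"}, dict)  # False
missing_key = is_config_for({"source": "/path/to/data.csv"}, dict)  # False
```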
grizz.ingestor.setup_ingestor ¶
setup_ingestor(
    ingestor: BaseIngestor | dict,
) -> BaseIngestor
Set up an ingestor.
The ingestor is instantiated from its configuration
by using the BaseIngestor factory function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ingestor | BaseIngestor \| dict | An ingestor or its configuration. | required |
Returns:
Type | Description |
---|---|
BaseIngestor | An instantiated ingestor. |
Example usage:
>>> from grizz.ingestor import setup_ingestor
>>> ingestor = setup_ingestor(
...     {"_target_": "grizz.ingestor.CsvIngestor", "source": "/path/to/data.csv"}
... )
>>> ingestor
CsvIngestor(source=/path/to/data.csv)
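The "object or configuration" pattern used by setup_ingestor can be sketched as follows. This is a simplified illustration, not the library's factory: `setup_object` is a hypothetical helper, `datetime.timedelta` stands in for an ingestor class, and raising `TypeError` on a type mismatch is a choice made for this sketch.

```python
import datetime
import importlib
from typing import Any


def setup_object(obj_or_config: Any, base: type) -> Any:
    """Return the object as is, or instantiate it from a '_target_' config."""
    if isinstance(obj_or_config, dict):
        params = dict(obj_or_config)  # copy so the caller's dict is not modified
        module_name, _, attr = params.pop("_target_").rpartition(".")
        target = getattr(importlib.import_module(module_name), attr)
        # The remaining keys are passed as keyword arguments to the target.
        obj_or_config = target(**params)
    if not isinstance(obj_or_config, base):
        raise TypeError(f"expected a {base.__name__}, got {type(obj_or_config).__name__}")
    return obj_or_config


# A configuration is instantiated; an existing object is returned unchanged.
delta = setup_object({"_target_": "datetime.timedelta", "days": 2}, datetime.timedelta)
same = setup_object(delta, datetime.timedelta)
```

Accepting either form lets callers pass ready-made objects in code and plain dictionaries in configuration files through the same entry point.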