flamme.ingestor
Contain data ingestors.
flamme.ingestor.BaseIngestor
Bases: ABC
Define the base class to implement a DataFrame ingestor.
Example usage:
>>> from flamme.ingestor import ParquetIngestor
>>> ingestor = ParquetIngestor(path="/path/to/frame.parquet")
>>> ingestor
ParquetIngestor(path=/path/to/frame.parquet)
>>> frame = ingestor.ingest() # doctest: +SKIP
flamme.ingestor.BaseIngestor.ingest
ingest() -> DataFrame
Ingest a DataFrame.
Returns:
Type | Description |
---|---|
DataFrame | The ingested DataFrame. |
Example usage:
>>> from flamme.ingestor import ParquetIngestor
>>> ingestor = ParquetIngestor(path="/path/to/frame.parquet")
>>> frame = ingestor.ingest() # doctest: +SKIP
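The contract above (an abstract base class with a single `ingest()` method) can be illustrated with a small standard-library sketch. `RowIngestor` and `CsvTextIngestor` are hypothetical stand-ins, not part of flamme, and they return a list of row dictionaries instead of a DataFrame so that the example runs without extra dependencies.

```python
import csv
import io
from abc import ABC, abstractmethod


class RowIngestor(ABC):
    """Hypothetical stand-in for BaseIngestor (rows instead of a DataFrame)."""

    @abstractmethod
    def ingest(self) -> list[dict[str, str]]:
        """Load the data and return it."""


class CsvTextIngestor(RowIngestor):
    """Illustrative ingestor that parses CSV text held in memory."""

    def __init__(self, text: str) -> None:
        self._text = text

    def __repr__(self) -> str:
        return f"{type(self).__name__}(chars={len(self._text)})"

    def ingest(self) -> list[dict[str, str]]:
        # DictReader uses the first line as the column names.
        return list(csv.DictReader(io.StringIO(self._text)))


ingestor = CsvTextIngestor("col1,col2\n1,a\n2,b\n")
rows = ingestor.ingest()
```

Because `ingest()` is abstract, the base class cannot be instantiated directly; only subclasses that implement it can.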
flamme.ingestor.ClickHouseIngestor
Bases: BaseIngestor
Implement a ClickHouse DataFrame ingestor.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
query | str | The query to get the data. | required |
client | Client \| dict | The ClickHouse client or its configuration. Please check the documentation of | required |
Example usage:
>>> from flamme.ingestor import ClickHouseIngestor
>>> import clickhouse_connect
>>> client = clickhouse_connect.get_client() # doctest: +SKIP
>>> ingestor = ClickHouseIngestor(query="", client=client) # doctest: +SKIP
>>> frame = ingestor.ingest() # doctest: +SKIP
flamme.ingestor.CsvIngestor
Bases: BaseIngestor
Implement a CSV DataFrame ingestor.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | Path \| str | The path to the CSV file to ingest. | required |
**kwargs | Any | Additional keyword arguments for | {} |
Example usage:
>>> from flamme.ingestor import CsvIngestor
>>> ingestor = CsvIngestor(path="/path/to/frame.csv")
>>> ingestor
CsvIngestor(path=/path/to/frame.csv)
>>> frame = ingestor.ingest() # doctest: +SKIP
flamme.ingestor.Ingestor
Bases: BaseIngestor
Implement a simple DataFrame ingestor.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
frame | DataFrame | The DataFrame to ingest. | required |
Example usage:
>>> import pandas as pd
>>> from flamme.ingestor import Ingestor
>>> ingestor = Ingestor(
...     frame=pd.DataFrame(
...         {
...             "col1": [1, 2, 3, 4, 5],
...             "col2": ["1", "2", "3", "4", "5"],
...             "col3": ["1", "2", "3", "4", "5"],
...             "col4": ["a", "b", "c", "d", "e"],
...         }
...     )
... )
>>> ingestor
Ingestor(shape=(5, 4))
>>> frame = ingestor.ingest()
flamme.ingestor.ParquetIngestor
Bases: BaseIngestor
Implement a parquet DataFrame ingestor.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | Path \| str | The path to the parquet file to ingest. | required |
**kwargs | Any | Additional keyword arguments for | {} |
Example usage:
>>> from flamme.ingestor import ParquetIngestor
>>> ingestor = ParquetIngestor(path="/path/to/frame.parquet")
>>> ingestor
ParquetIngestor(path=/path/to/frame.parquet)
>>> frame = ingestor.ingest() # doctest: +SKIP
flamme.ingestor.TransformedIngestor
Bases: BaseIngestor
Implement an ingestor that also transforms the DataFrame.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ingestor | BaseIngestor \| dict | The base ingestor. | required |
transformer | BaseDataFrameTransformer \| dict | Specifies a | required |
Example usage:
>>> from flamme.ingestor import TransformedIngestor, ParquetIngestor
>>> from flamme.transformer.dataframe import ToNumeric
>>> ingestor = TransformedIngestor(
...     ingestor=ParquetIngestor(path="/path/to/frame.parquet"),
...     transformer=ToNumeric(columns=["col1", "col3"]),
... )
>>> ingestor
TransformedIngestor(
  (ingestor): ParquetIngestor(path=/path/to/frame.parquet)
  (transformer): ToNumericDataFrameTransformer(columns=('col1', 'col3'), ignore_missing=False)
)
>>> frame = ingestor.ingest() # doctest: +SKIP
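The composition behind this class (ingest with a wrapped ingestor, then apply a transformer to the result) can be sketched with the standard library alone. Every name below (`ListIngestor`, `to_numeric`, `ChainedIngestor`) is hypothetical and only mirrors the idea; the transformer is assumed to be a plain callable, and rows are dictionaries rather than a DataFrame.

```python
from typing import Callable

Rows = list[dict[str, object]]


class ListIngestor:
    """Hypothetical base ingestor that returns rows held in memory."""

    def __init__(self, rows: Rows) -> None:
        self._rows = rows

    def ingest(self) -> Rows:
        # Copy each row so callers cannot mutate the stored data.
        return [dict(row) for row in self._rows]


def to_numeric(columns: list[str]) -> Callable[[Rows], Rows]:
    """Build a transformer that casts the given columns to float."""

    def transform(rows: Rows) -> Rows:
        return [
            {key: float(value) if key in columns else value for key, value in row.items()}
            for row in rows
        ]

    return transform


class ChainedIngestor:
    """Ingest with the wrapped ingestor, then apply the transformer."""

    def __init__(self, ingestor: ListIngestor, transformer: Callable[[Rows], Rows]) -> None:
        self._ingestor = ingestor
        self._transformer = transformer

    def ingest(self) -> Rows:
        return self._transformer(self._ingestor.ingest())


source = ListIngestor([{"col1": "1", "col4": "a"}, {"col1": "2", "col4": "b"}])
rows = ChainedIngestor(source, to_numeric(["col1"])).ingest()
```

Keeping the transformation in a wrapper leaves the base ingestor unchanged, so the same source can be reused with different transformers.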
flamme.ingestor.is_ingestor_config
is_ingestor_config(config: dict) -> bool
Indicate if the input configuration is a configuration for a BaseIngestor.
This function only checks if the value of the key _target_ is valid. It does not check the other values. If _target_ indicates a function, the function's return type hint is used to check the class.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config | dict | The configuration to check. | required |
Returns:
Type | Description |
---|---|
bool | True if the input configuration is a configuration for a BaseIngestor, otherwise False. |
Example usage:
>>> from flamme.ingestor import is_ingestor_config
>>> is_ingestor_config(
...     {"_target_": "flamme.ingestor.CsvIngestor", "path": "/path/to/data.csv"}
... )
True
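The check described above can be sketched as follows. This is an illustration under stated assumptions, not flamme's implementation: `resolve`, `matches_base`, and `is_config_for` are hypothetical helpers, and standard-library classes stand in for the ingestor classes so that the example runs anywhere.

```python
import importlib
import inspect
from collections.abc import Mapping
from typing import get_type_hints


def resolve(target: str) -> object:
    """Import the object named by a dotted path such as 'collections.OrderedDict'."""
    module_name, _, attr = target.rpartition(".")
    return getattr(importlib.import_module(module_name), attr)


def matches_base(obj: object, base: type) -> bool:
    """Check a class directly, or a function through its return type hint."""
    if inspect.isclass(obj):
        return issubclass(obj, base)
    if callable(obj):
        try:
            hint = get_type_hints(obj).get("return")
            return inspect.isclass(hint) and issubclass(hint, base)
        except (NameError, TypeError, AttributeError):
            return False
    return False


def is_config_for(config: dict, base: type) -> bool:
    """Look only at the '_target_' key; the other keys are not validated."""
    target = config.get("_target_")
    if not isinstance(target, str):
        return False
    try:
        obj = resolve(target)
    except (ImportError, AttributeError, ValueError):
        return False
    return matches_base(obj, base)


ok = is_config_for({"_target_": "collections.OrderedDict", "unchecked": 1}, Mapping)
not_ok = is_config_for({"_target_": "collections.deque"}, Mapping)
```

Here `ok` is `True` because `OrderedDict` is a `Mapping`, while `not_ok` is `False`; the extra `unchecked` key is ignored, matching the statement that only `_target_` is inspected.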
flamme.ingestor.setup_ingestor
setup_ingestor(
ingestor: BaseIngestor | dict,
) -> BaseIngestor
Set up an ingestor.
The ingestor is instantiated from its configuration by using the BaseIngestor factory function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ingestor | BaseIngestor \| dict | Specifies an ingestor or its configuration. | required |
Returns:
Type | Description |
---|---|
BaseIngestor | An instantiated ingestor. |
Example usage:
>>> from flamme.ingestor import setup_ingestor
>>> ingestor = setup_ingestor(
...     {"_target_": "flamme.ingestor.CsvIngestor", "path": "/path/to/data.csv"}
... )
>>> ingestor
CsvIngestor(path=/path/to/data.csv)