schema

flamme.schema

Contain functionality to manipulate DataFrame schemas.

flamme.schema.reader

Contain schema readers.

flamme.schema.reader.BaseSchemaReader

Bases: ABC

Define the base class to implement a schema reader.

Example usage:

>>> import tempfile
>>> import pandas as pd
>>> from pathlib import Path
>>> from flamme.schema.reader import ParquetSchemaReader
>>> with tempfile.TemporaryDirectory() as tmpdir:
...     path = Path(tmpdir).joinpath("data.parquet")
...     pd.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]}).to_parquet(
...         path, index=False
...     )
...     reader = ParquetSchemaReader(path)
...     reader
...     schema = reader.read()
...     schema
...
ParquetSchemaReader(path=.../data.parquet)
col1: int64
col2: string
...
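
As a rough illustration of what implementing the base class involves, here is a minimal, self-contained sketch. It does not import flamme: `SketchBaseSchemaReader` and `MappingSchemaReader` are hypothetical stand-ins, and the schema is represented as a plain dict rather than the `Schema` type returned by the real `read()`.

```python
from abc import ABC, abstractmethod


class SketchBaseSchemaReader(ABC):
    """Hypothetical stand-in for the abstract base class (not flamme's code)."""

    @abstractmethod
    def read(self) -> dict:
        """Return a mapping of column name to type name."""


class MappingSchemaReader(SketchBaseSchemaReader):
    """Hypothetical reader that infers column types from in-memory columns."""

    def __init__(self, columns: dict) -> None:
        self._columns = columns

    def __repr__(self) -> str:
        return f"{type(self).__name__}(num_columns={len(self._columns)})"

    def read(self) -> dict:
        # Use the Python type of the first value as a crude column type.
        return {
            name: type(values[0]).__name__ if values else "unknown"
            for name, values in self._columns.items()
        }


reader = MappingSchemaReader({"col1": [1, 2, 3], "col2": ["a", "b", "c"]})
schema = reader.read()
```

The only contract assumed here is that a subclass overrides `read()`; the base class itself cannot be instantiated because `read()` is abstract.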

flamme.schema.reader.BaseSchemaReader.read

read() -> Schema

Read the schema associated with a DataFrame.

Returns:

Type Description
Schema

The schema of the DataFrame.

Example usage:

>>> import tempfile
>>> import pandas as pd
>>> from pathlib import Path
>>> from flamme.schema.reader import ParquetSchemaReader
>>> with tempfile.TemporaryDirectory() as tmpdir:
...     path = Path(tmpdir).joinpath("data.parquet")
...     pd.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]}).to_parquet(
...         path, index=False
...     )
...     reader = ParquetSchemaReader(path)
...     schema = reader.read()
...     schema
...
col1: int64
col2: string
...

flamme.schema.reader.ClickHouseSchemaReader

Bases: BaseSchemaReader

Implement a ClickHouse schema reader.

Parameters:

Name Type Description Default
query str

The query to get the data.

required
client Client | dict

The ClickHouse client or its configuration. See the documentation of clickhouse_connect.get_client for more information.

required

Example usage:

>>> import clickhouse_connect  # doctest: +SKIP
>>> from flamme.schema.reader import ClickHouseSchemaReader
>>> client = clickhouse_connect.get_client()  # doctest: +SKIP
>>> reader = ClickHouseSchemaReader(query="", client=client)  # doctest: +SKIP
>>> schema = reader.read()  # doctest: +SKIP

flamme.schema.reader.ParquetSchemaReader

Bases: BaseSchemaReader

Implement a Parquet schema reader.

Parameters:

Name Type Description Default
path Path | str

The path to the Parquet file whose schema is read.

required

Example usage:

>>> import tempfile
>>> import pandas as pd
>>> from pathlib import Path
>>> from flamme.schema.reader import ParquetSchemaReader
>>> with tempfile.TemporaryDirectory() as tmpdir:
...     path = Path(tmpdir).joinpath("data.parquet")
...     pd.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]}).to_parquet(
...         path, index=False
...     )
...     reader = ParquetSchemaReader(path)
...     reader
...     schema = reader.read()
...     schema
...
ParquetSchemaReader(path=.../data.parquet)
col1: int64
col2: string
...

flamme.schema.reader.SchemaReader

Bases: BaseSchemaReader

Implement a simple schema reader that reads the schema of an in-memory DataFrame.

Parameters:

Name Type Description Default
frame DataFrame

The DataFrame to ingest.

required

Example usage:

>>> import pandas as pd
>>> from flamme.schema.reader import SchemaReader
>>> reader = SchemaReader(
...     frame=pd.DataFrame(
...         {
...             "col1": [1, 2, 3, 4, 5],
...             "col2": [1.1, 2.2, 3.3, 4.4, 5.5],
...             "col4": ["a", "b", "c", "d", "e"],
...         }
...     )
... )
>>> reader
SchemaReader(shape=(5, 3))
>>> schema = reader.read()
>>> schema
col1: int64
col2: double
col4: string
...

flamme.schema.reader.is_schema_reader_config

is_schema_reader_config(config: dict) -> bool

Indicate if the input configuration is a configuration for a BaseSchemaReader.

This function only checks whether the value of the key _target_ is valid. It does not check the other values. If _target_ refers to a function, the function's return type hint is used to check the class.

Parameters:

Name Type Description Default
config dict

The configuration to check.

required

Returns:

Type Description
bool

True if the input configuration is a configuration for a BaseSchemaReader object.

Example usage:

>>> from flamme.schema.reader import is_schema_reader_config
>>> is_schema_reader_config(
...     {
...         "_target_": "flamme.schema.reader.ParquetSchemaReader",
...         "path": "/path/to/data.parquet",
...     }
... )
True
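
The check described above can be sketched with the standard library alone. This is not flamme's implementation: `SketchReader`, `SketchParquetReader`, `make_reader`, `resolve`, `is_valid_target` and `is_reader_config` are hypothetical names, and the sketch assumes `_target_` is a dotted import path.

```python
import importlib
import inspect
import typing
from abc import ABC


class SketchReader(ABC):
    """Hypothetical stand-in for the base reader class."""


class SketchParquetReader(SketchReader):
    """Hypothetical concrete reader."""


def make_reader() -> SketchParquetReader:
    """Hypothetical factory; only its return type hint is inspected."""
    return SketchParquetReader()


def resolve(target: str) -> object:
    """Import the object designated by a dotted path."""
    module_name, _, attr = target.rpartition(".")
    return getattr(importlib.import_module(module_name), attr)


def is_valid_target(obj: object, base: type) -> bool:
    """Check a class directly, or a function through its return type hint."""
    if inspect.isclass(obj):
        return issubclass(obj, base)
    if callable(obj):
        try:
            hint = typing.get_type_hints(obj).get("return")
        except (TypeError, NameError):
            return False
        return inspect.isclass(hint) and issubclass(hint, base)
    return False


def is_reader_config(config: dict, base: type = SketchReader) -> bool:
    """Only the ``_target_`` key is examined; other keys are ignored."""
    target = config.get("_target_")
    if not isinstance(target, str):
        return False
    try:
        obj = resolve(target)
    except (ImportError, AttributeError, ValueError):
        return False
    return is_valid_target(obj, base)
```

Because only `_target_` is inspected, a configuration with a valid target but wrong or missing arguments still passes this kind of check; any such error would only surface at instantiation time.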

flamme.schema.reader.setup_schema_reader

setup_schema_reader(
    reader: BaseSchemaReader | dict,
) -> BaseSchemaReader

Set up a schema reader.

The reader is instantiated from its configuration by using the BaseSchemaReader factory function.

Parameters:

Name Type Description Default
reader BaseSchemaReader | dict

Specifies a schema reader or its configuration.

required

Returns:

Type Description
BaseSchemaReader

An instantiated schema reader.

Example usage:

>>> from flamme.schema.reader import setup_schema_reader
>>> reader = setup_schema_reader(
...     {
...         "_target_": "flamme.schema.reader.ParquetSchemaReader",
...         "path": "/path/to/data.parquet",
...     }
... )
>>> reader
ParquetSchemaReader(path=.../data.parquet)
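
The "instance or configuration" pattern used by this function can be sketched as follows. This is a simplified illustration, not flamme's code: `setup_from_config` is a hypothetical helper that resolves `_target_` with `importlib` and passes the remaining keys as keyword arguments, whereas the real function delegates to the BaseSchemaReader factory. A standard-library class is used as the target so that the sketch runs without flamme installed.

```python
import importlib
from fractions import Fraction


def setup_from_config(obj):
    """Return ``obj`` unchanged, or instantiate it if it is a config dict."""
    if isinstance(obj, dict):
        kwargs = dict(obj)  # copy so the caller's config is not mutated
        target = kwargs.pop("_target_")
        module_name, _, attr = target.rpartition(".")
        factory = getattr(importlib.import_module(module_name), attr)
        return factory(**kwargs)
    return obj


config = {"_target_": "fractions.Fraction", "numerator": 1, "denominator": 2}
built = setup_from_config(config)  # instantiated from the configuration
same = setup_from_config(built)  # already an object: passed through as-is
```

Accepting either form lets callers pass a ready-made object in code and a plain dict when the reader is described in a configuration file.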