Data Schema

Get schema information about a DataFrame or Arrow Table.

Processing

This function analyzes an incoming data structure (Pandas DataFrame, Polars DataFrame, or Arrow Table) to generate a detailed schema report. The resulting schema is returned as a structured table (DataFrame or Arrow Table) in the format specified by the user, alongside a list of column names.
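
A minimal sketch of the round trip, using get_data_schema from the source code further below (the sample data is invented for illustration):

import pandas as pd

df = pd.DataFrame({"id": [1, 2, 3], "name": ["a", "b", None]})

# Default options: statistics, null counts, and sample values are included,
# and the schema is returned as a pandas DataFrame.
schema, columns = get_data_schema(df)

print(columns)          # ['id', 'name']
print(schema.shape[0])  # 2 -- one schema row per input column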

Inputs

data
The input data structure (Pandas DataFrame, Polars DataFrame, or Arrow Table) for which the schema should be extracted.
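
Any of the three supported structures can be passed directly; a sketch (the column data is invented for illustration):

import pandas as pd
import polars as pl
import pyarrow as pa

records = {"id": [1, 2], "label": ["x", "y"]}

pdf = pd.DataFrame(records)   # pandas DataFrame
pldf = pl.DataFrame(records)  # Polars DataFrame
tbl = pa.table(records)       # Arrow Table

# Each of pdf, pldf, and tbl is a valid `data` argument for get_data_schema.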

Input Types

Input    Types
data     DataFrame, ArrowTable

You can check the list of supported types here: Available Type Hints.

Outputs

schema
A structured data table (DataFrame or Arrow Table) containing the schema details and computed statistics for each column.
columns
A list containing the names of all columns found in the input data.

The schema output contains the following metadata fields (an example row is sketched after this list):

  • column_position: The 1-based index position of the column in the dataset.
  • column_name: The name of the column.
  • data_type: The inferred SQL data type (DuckDB type) for the column.
  • null_count (if requested): The number of null values in the column.
  • min_value (if requested; numeric columns only): The minimum value found in the column.
  • max_value (if requested; numeric columns only): The maximum value found in the column.
  • avg_value (if requested; numeric columns only): The average value of the column, rounded to two decimal places.
  • distinct_count (if requested): The total number of unique non-null values in the column.
  • sample_values (if requested): A comma-separated string containing up to the configured sample size of distinct non-null values.
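
For a numeric column with every option enabled, a single schema row assembled internally by the brick looks like this (the values are illustrative, not real output):

{
    "column_position": 1,
    "column_name": "age",
    "data_type": "BIGINT",
    "null_count": 0,                 # 'Include Null Count' enabled
    "min_value": 18,                 # min/max/avg appear only when
    "max_value": 67,                 # 'Include Statistics' is enabled
    "avg_value": 41.5,               # and the column is numeric
    "distinct_count": 42,
    "sample_values": "18, 23, 35",   # 'Include Sample Values' enabled
}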

Output Types

Output     Types
schema     DataFrame, ArrowTable
columns    List

You can check the list of supported types here: Available Type Hints.

Options

The Data Schema brick exposes the following configurable options:

Include Statistics
If enabled, the function computes and includes basic statistics (min, max, average) for numeric columns, and the distinct count for all column types.
Include Null Count
If enabled, the function calculates and includes the total count of null values for each column.
Include Sample Values
If enabled, the function extracts a small sample of distinct, non-null values for each column.
Sample Size
Defines the maximum number of distinct sample values to retrieve if 'Include Sample Values' is enabled (between 1 and 20).
Output Format
Defines the data structure format for the resulting schema table (Pandas DataFrame, Polars DataFrame, or Arrow Table).
Verbose
If enabled, the function prints detailed logs about the steps performed during schema extraction.
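
These option labels map onto the keys read by the function in the source code below; a sketch of a full options dictionary (values illustrative; df is the sample DataFrame from the earlier sketch):

options = {
    "include_stats": True,          # Include Statistics
    "include_null_count": True,     # Include Null Count
    "include_sample_values": True,  # Include Sample Values
    "sample_size": 5,               # Sample Size (1-20)
    "output_format": "polars",      # Output Format: "pandas", "polars", or "arrow"
    "verbose": False,               # Verbose
}

schema, columns = get_data_schema(df, options=options)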

Source Code

import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
from coded_flows.types import Union, DataFrame, ArrowTable, Tuple, List

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _coalesce(*values):
    """Return the first non-None value, or None if every value is None."""
    return next((v for v in values if v is not None), None)


def _sanitize_identifier(identifier):
    """
    Sanitize SQL identifier by escaping special characters.
    Handles double quotes and other problematic characters.
    """
    return identifier.replace('"', '""')


def get_data_schema(
    data: Union[DataFrame, ArrowTable], options=None
) -> Tuple[Union[DataFrame, ArrowTable], List]:
    brick_display_name = "Data Schema"
    options = options or {}
    verbose = options.get("verbose", True)
    include_stats = options.get("include_stats", True)
    include_null_count = options.get("include_null_count", True)
    include_sample_values = options.get("include_sample_values", True)
    sample_size = options.get("sample_size", 3)
    output_format = options.get("output_format", "pandas")
    schema = None
    columns = []
    conn = None
    try:
        verbose and logger.info(
            f"[{brick_display_name}] Starting schema extraction operation."
        )
        data_type = None
        if isinstance(data, pd.DataFrame):
            data_type = "pandas"
        elif isinstance(data, pl.DataFrame):
            data_type = "polars"
        elif isinstance(data, pa.Table):
            data_type = "arrow"
        if data_type is None:
            verbose and logger.error(
                f"[{brick_display_name}] Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
            )
            raise ValueError(
                "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
            )
        verbose and logger.info(
            f"[{brick_display_name}] Detected input format: {data_type}."
        )
        conn = duckdb.connect(":memory:")
        conn.register("input_table", data)
        column_info = conn.execute("DESCRIBE input_table").fetchall()
        verbose and logger.info(
            f"[{brick_display_name}] Found {len(column_info)} columns in data."
        )
        schema_data = []
        for idx, (
            col_name,
            col_type,
            null_allowed,
            key_info,
            default_val,
            extra,
        ) in enumerate(column_info):
            columns.append(col_name)
            sanitized_col = _sanitize_identifier(col_name)
            row_data = {
                "column_position": idx + 1,
                "column_name": col_name,
                "data_type": col_type,
            }
            verbose and logger.info(
                f"[{brick_display_name}] Processing column: {col_name} (type: {col_type})."
            )
            if include_null_count:
                try:
                    null_count_query = f'SELECT COUNT(*) FROM input_table WHERE "{sanitized_col}" IS NULL'
                    null_count = conn.execute(null_count_query).fetchone()[0]
                    row_data["null_count"] = null_count
                    verbose and logger.info(
                        f"[{brick_display_name}] Column '{col_name}': {null_count} nulls."
                    )
                except Exception as e:
                    verbose and logger.warning(
                        f"[{brick_display_name}] Could not compute null count for column '{col_name}': {e}."
                    )
                    row_data["null_count"] = None
            if include_stats:
                try:
                    if any(
                        (
                            t in col_type.lower()
                            for t in [
                                "int",
                                "float",
                                "double",
                                "decimal",
                                "numeric",
                                "real",
                            ]
                        )
                    ):
                        stats_query = f"""
                            SELECT
                                MIN("{sanitized_col}") as min_val,
                                MAX("{sanitized_col}") as max_val,
                                AVG("{sanitized_col}") as avg_val,
                                COUNT(DISTINCT "{sanitized_col}") as distinct_count
                            FROM input_table
                        """
                        stats_result = conn.execute(stats_query).fetchone()
                        row_data["min_value"] = stats_result[0]
                        row_data["max_value"] = stats_result[1]
                        row_data["avg_value"] = (
                            round(stats_result[2], 2)
                            if stats_result[2] is not None
                            else None
                        )
                        row_data["distinct_count"] = stats_result[3]
                        verbose and logger.info(
                            f"[{brick_display_name}] Column '{col_name}': min={row_data['min_value']}, max={row_data['max_value']}, avg={row_data['avg_value']}, distinct={row_data['distinct_count']}."
                        )
                    else:
                        distinct_query = (
                            f'SELECT COUNT(DISTINCT "{sanitized_col}") FROM input_table'
                        )
                        distinct_count = conn.execute(distinct_query).fetchone()[0]
                        row_data["distinct_count"] = distinct_count
                        verbose and logger.info(
                            f"[{brick_display_name}] Column '{col_name}': {distinct_count} distinct values."
                        )
                except Exception as e:
                    verbose and logger.warning(
                        f"[{brick_display_name}] Could not compute statistics for column '{col_name}': {e}."
                    )
                    if "distinct_count" not in row_data:
                        row_data["distinct_count"] = None
            if include_sample_values:
                try:
                    sample_query = f"""
                        SELECT DISTINCT "{sanitized_col}"
                        FROM input_table
                        WHERE "{sanitized_col}" IS NOT NULL
                        LIMIT {sample_size}
                    """
                    sample_results = conn.execute(sample_query).fetchall()
                    sample_values = [str(row[0]) for row in sample_results]
                    row_data["sample_values"] = (
                        ", ".join(sample_values)
                        if sample_values
                        else "No non-null values"
                    )
                    verbose and logger.info(
                        f"[{brick_display_name}] Column '{col_name}': sample values = [{row_data['sample_values']}]."
                    )
                except Exception as e:
                    verbose and logger.warning(
                        f"[{brick_display_name}] Could not get sample values for column '{col_name}': {e}."
                    )
                    row_data["sample_values"] = None
            schema_data.append(row_data)
        verbose and logger.info(
            f"[{brick_display_name}] Converting schema to {output_format} format."
        )
        schema_df = pd.DataFrame(schema_data)
        if output_format == "pandas":
            schema = schema_df
            verbose and logger.info(
                f"[{brick_display_name}] Schema as pandas DataFrame."
            )
        elif output_format == "polars":
            schema = pl.from_pandas(schema_df)
            verbose and logger.info(
                f"[{brick_display_name}] Schema as Polars DataFrame."
            )
        elif output_format == "arrow":
            schema = pa.Table.from_pandas(schema_df)
            verbose and logger.info(f"[{brick_display_name}] Schema as Arrow Table.")
        else:
            verbose and logger.error(
                f"[{brick_display_name}] Unsupported output format: {output_format}"
            )
            raise ValueError(f"Unsupported output format: {output_format}")
        verbose and logger.info(
            f"[{brick_display_name}] Schema extraction completed successfully. Processed {len(schema_data)} columns."
        )
    except Exception as e:
        verbose and logger.error(
            f"[{brick_display_name}] Error during schema extraction: {str(e)}"
        )
        raise
    finally:
        if conn is not None:
            conn.close()
    return (schema, columns)
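
A short end-to-end sketch of exercising the brick outside a flow (the Arrow Table and its contents are invented for illustration):

import pyarrow as pa

table = pa.table(
    {
        "user_id": [1, 2, 3, 4],
        "score": [9.5, 7.25, None, 8.0],
        "country": ["FR", "DE", "FR", None],
    }
)

# Request the schema as an Arrow Table, with a smaller sample size.
schema, columns = get_data_schema(
    table,
    options={"output_format": "arrow", "sample_size": 2, "verbose": False},
)

print(columns)                     # ['user_id', 'score', 'country']
print(schema.column("data_type"))  # DuckDB types, e.g. BIGINT, DOUBLE, VARCHAR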

Brick Info

version v0.1.3
python 3.10, 3.11, 3.12, 3.13
requirements
  • pandas
  • polars[pyarrow]
  • duckdb
  • pyarrow