Data Schema
Get schema information about a DataFrame or Arrow Table.
Processing
This function analyzes an incoming data structure (Pandas DataFrame, Polars DataFrame, or Arrow Table) to generate a detailed schema report. The resulting schema is returned as a structured table (DataFrame or Arrow Table) in the format specified by the user, alongside a list of column names.
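For orientation, a direct call to the brick's function (defined in the source listing below) returns a two-element tuple; df stands for any supported input structure:

schema, columns = get_data_schema(df)
# schema  -> schema table in the requested format (pandas DataFrame by default)
# columns -> list of the input's column names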
Inputs
- data
- The input data structure (Pandas DataFrame, Polars DataFrame, or Arrow Table) for which the schema should be extracted.
Inputs Types
| Input | Types |
|---|---|
| data | DataFrame, ArrowTable |
You can check the list of supported types here: Available Type Hints.
Outputs
- schema
- A structured data table (DataFrame or Arrow Table) containing the schema details and computed statistics for each column.
- columns
- A list containing the names of all columns found in the input data.
The schema output contains the following metadata fields; an example row is sketched after the list:
- column_position: The 1-based index position of the column in the dataset.
- column_name: The name of the column.
- data_type: The inferred SQL data type (DuckDB type) for the column.
- null_count (if requested): The number of null values in the column.
- min_value (if requested, numeric columns only): The minimum value found in the column.
- max_value (if requested, numeric columns only): The maximum value found in the column.
- avg_value (if requested, numeric columns only): The average value of the column, rounded to two decimal places.
- distinct_count (if requested): The total number of distinct non-null values in the column.
- sample_values (if requested): A comma-separated string containing up to Sample Size distinct non-null values.
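For illustration, a schema row for a hypothetical numeric column might look like this (all values are made up):

{
    "column_position": 3,
    "column_name": "price",              # hypothetical column
    "data_type": "DOUBLE",               # DuckDB type name
    "null_count": 2,
    "min_value": 0.99,
    "max_value": 149.5,
    "avg_value": 37.42,                  # rounded to two decimals
    "distinct_count": 87,
    "sample_values": "0.99, 12.5, 37.0",
}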
Outputs Types
| Output | Types |
|---|---|
| schema | DataFrame, ArrowTable |
| columns | List |
You can check the list of supported types here: Available Type Hints.
Options
The Data Schema brick exposes the following configurable options; an example configuration follows the list:
- Include Statistics
- If enabled, the function computes and includes basic statistics (min, max, average) for numeric columns, and the distinct count for all column types.
- Include Null Count
- If enabled, the function calculates and includes the total count of null values for each column.
- Include Sample Values
- If enabled, the function extracts a small sample of distinct, non-null values for each column.
- Sample Size
- Defines the maximum number of distinct sample values to retrieve if 'Include Sample Values' is enabled (between 1 and 20).
- Output Format
- Defines the data structure format for the resulting schema table (Pandas DataFrame, Polars DataFrame, or Arrow Table).
- Verbose
- If enabled, the function prints detailed logs about the steps performed during schema extraction.
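The function reads these options from a plain dict; the key names below are taken from the source listing, and the values shown are the defaults:

options = {
    "include_stats": True,          # Include Statistics
    "include_null_count": True,     # Include Null Count
    "include_sample_values": True,  # Include Sample Values
    "sample_size": 3,               # Sample Size (1-20)
    "output_format": "pandas",      # "pandas", "polars", or "arrow"
    "verbose": True,                # Verbose
}

The full source of the brick follows: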
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
from coded_flows.types import Union, DataFrame, ArrowTable, Tuple, List
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def _coalesce(*values):
return next((v for v in values if v is not None), None)
def _sanitize_identifier(identifier):
"""
Sanitize SQL identifier by escaping special characters.
Handles double quotes and other problematic characters.
"""
return identifier.replace('"', '""')
def get_data_schema(
data: Union[DataFrame, ArrowTable], options=None
) -> Tuple[Union[DataFrame, ArrowTable], List]:
brick_display_name = "Data Schema"
options = options or {}
verbose = options.get("verbose", True)
include_stats = options.get("include_stats", True)
include_null_count = options.get("include_null_count", True)
include_sample_values = options.get("include_sample_values", True)
    sample_size = options.get("sample_size", 3)
    # Clamp to the documented 1-20 range; also keeps the LIMIT interpolation below safe.
    sample_size = max(1, min(int(sample_size), 20))
output_format = options.get("output_format", "pandas")
schema = None
columns = []
conn = None
try:
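        # `verbose and logger.x(...)` short-circuits: nothing is logged when verbose is False.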
verbose and logger.info(
f"[{brick_display_name}] Starting schema extraction operation."
)
data_type = None
if isinstance(data, pd.DataFrame):
data_type = "pandas"
elif isinstance(data, pl.DataFrame):
data_type = "polars"
        elif isinstance(data, pa.Table):  # pa.lib.Table is the same class
data_type = "arrow"
if data_type is None:
verbose and logger.error(
f"[{brick_display_name}] Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
)
raise ValueError(
"Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
)
verbose and logger.info(
f"[{brick_display_name}] Detected input format: {data_type}."
)
conn = duckdb.connect(":memory:")
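        # Register the input as a DuckDB view; DuckDB can scan pandas, Polars, and Arrow objects directly.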
conn.register("input_table", data)
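        # DESCRIBE returns one row per column:
        # (column_name, column_type, null, key, default, extra)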
column_info = conn.execute("DESCRIBE input_table").fetchall()
verbose and logger.info(
f"[{brick_display_name}] Found {len(column_info)} columns in data."
)
schema_data = []
for idx, (
col_name,
col_type,
null_allowed,
key_info,
default_val,
extra,
) in enumerate(column_info):
columns.append(col_name)
sanitized_col = _sanitize_identifier(col_name)
row_data = {
"column_position": idx + 1,
"column_name": col_name,
"data_type": col_type,
}
verbose and logger.info(
f"[{brick_display_name}] Processing column: {col_name} (type: {col_type})."
)
if include_null_count:
try:
null_count_query = f'SELECT COUNT(*) FROM input_table WHERE "{sanitized_col}" IS NULL'
null_count = conn.execute(null_count_query).fetchone()[0]
row_data["null_count"] = null_count
verbose and logger.info(
f"[{brick_display_name}] Column '{col_name}': {null_count} nulls."
)
                except Exception as e:
                    verbose and logger.warning(
                        f"[{brick_display_name}] Could not compute null count for column '{col_name}': {e}"
                    )
                    row_data["null_count"] = None
if include_stats:
try:
if any(
(
t in col_type.lower()
for t in [
"int",
"float",
"double",
"decimal",
"numeric",
"real",
]
)
):
                        stats_query = f"""
                            SELECT
                                MIN("{sanitized_col}") AS min_val,
                                MAX("{sanitized_col}") AS max_val,
                                AVG("{sanitized_col}") AS avg_val,
                                COUNT(DISTINCT "{sanitized_col}") AS distinct_count
                            FROM input_table
                        """
stats_result = conn.execute(stats_query).fetchone()
row_data["min_value"] = stats_result[0]
row_data["max_value"] = stats_result[1]
row_data["avg_value"] = (
round(stats_result[2], 2)
if stats_result[2] is not None
else None
)
row_data["distinct_count"] = stats_result[3]
verbose and logger.info(
f"[{brick_display_name}] Column '{col_name}': min={row_data['min_value']}, max={row_data['max_value']}, avg={row_data['avg_value']}, distinct={row_data['distinct_count']}."
)
else:
distinct_query = (
f'SELECT COUNT(DISTINCT "{sanitized_col}") FROM input_table'
)
distinct_count = conn.execute(distinct_query).fetchone()[0]
row_data["distinct_count"] = distinct_count
verbose and logger.info(
f"[{brick_display_name}] Column '{col_name}': {distinct_count} distinct values."
)
                except Exception as e:
                    verbose and logger.warning(
                        f"[{brick_display_name}] Could not compute statistics for column '{col_name}': {e}"
                    )
                    if "distinct_count" not in row_data:
                        row_data["distinct_count"] = None
if include_sample_values:
try:
                    sample_query = f"""
                        SELECT DISTINCT "{sanitized_col}"
                        FROM input_table
                        WHERE "{sanitized_col}" IS NOT NULL
                        LIMIT {sample_size}
                    """
sample_results = conn.execute(sample_query).fetchall()
sample_values = [str(row[0]) for row in sample_results]
row_data["sample_values"] = (
", ".join(sample_values)
if sample_values
else "No non-null values"
)
verbose and logger.info(
f"[{brick_display_name}] Column '{col_name}': sample values = [{row_data['sample_values']}]."
)
                except Exception as e:
                    verbose and logger.warning(
                        f"[{brick_display_name}] Could not get sample values for column '{col_name}': {e}"
                    )
                    row_data["sample_values"] = None
schema_data.append(row_data)
verbose and logger.info(
f"[{brick_display_name}] Converting schema to {output_format} format."
)
schema_df = pd.DataFrame(schema_data)
if output_format == "pandas":
schema = schema_df
verbose and logger.info(
f"[{brick_display_name}] Schema as pandas DataFrame."
)
elif output_format == "polars":
schema = pl.from_pandas(schema_df)
verbose and logger.info(
f"[{brick_display_name}] Schema as Polars DataFrame."
)
elif output_format == "arrow":
schema = pa.Table.from_pandas(schema_df)
verbose and logger.info(f"[{brick_display_name}] Schema as Arrow Table.")
else:
verbose and logger.error(
f"[{brick_display_name}] Unsupported output format: {output_format}"
)
raise ValueError(f"Unsupported output format: {output_format}")
verbose and logger.info(
f"[{brick_display_name}] Schema extraction completed successfully. Processed {len(schema_data)} columns."
)
except Exception as e:
verbose and logger.error(
f"[{brick_display_name}] Error during schema extraction: {str(e)}"
)
raise
finally:
if conn is not None:
conn.close()
return (schema, columns)
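A quick end-to-end sketch (the data and column names below are illustrative, not part of the brick):

import pandas as pd

df = pd.DataFrame(
    {
        "id": [1, 2, 3],
        "name": ["a", "b", None],
        "price": [0.99, 12.5, None],
    }
)
schema, columns = get_data_schema(df, options={"verbose": False})
print(columns)                       # ['id', 'name', 'price']
print(schema["data_type"].tolist())  # DuckDB type names, e.g. ['BIGINT', 'VARCHAR', 'DOUBLE']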
Brick Info
Dependencies:
- pandas
- polars[pyarrow]
- duckdb
- pyarrow