Keep Columns

Keep only specified columns in a DataFrame or Arrow Table.

Processing

This brick filters the input data (DataFrame or Arrow Table) to retain only the columns explicitly specified in the options or via the columns input. The filtered data can be returned in several formats (Pandas DataFrame, Polars DataFrame, or PyArrow Table). If no columns are specified, the input data is returned unchanged.
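
For example, keeping ["a", "c"] from a table with columns a, b, and c drops b. A minimal pandas sketch of the equivalent behavior (column names are illustrative):

import pandas as pd

df = pd.DataFrame({"a": [1, 2], "b": [3, 4], "c": [5, 6]})
kept = df[["a", "c"]]        # what the brick computes for columns=["a", "c"]
print(list(kept.columns))    # ['a', 'c']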

Inputs

data
The input data (Pandas DataFrame, Polars DataFrame, or PyArrow Table) that needs to be filtered down to a specific set of columns.
columns (optional)
A list of string names representing the columns that should be kept in the output data structure. This input, if connected, overrides the list defined in the options menu.
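
The precedence between the two sources is a simple coalesce; a minimal sketch (variable names are illustrative):

columns = connected_columns if connected_columns is not None else options.get("columns", [])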

Input Types

Input     Types
data      DataFrame, ArrowTable
columns   List

You can check the list of supported types here: Available Type Hints.

Outputs

result
The resulting data structure containing only the specified columns. The type of the output depends on the selected Output Format option.

Output Types

Output    Types
result    DataFrame, ArrowTable

You can check the list of supported types here: Available Type Hints.

Options

The Keep Columns brick exposes the following configurable options:

List of Columns
A list of column names (as strings) to keep in the output data.
Output Format
Specifies the desired data format for the output structure. Choices include pandas (Pandas DataFrame), polars (Polars DataFrame), or arrow (PyArrow Table). Defaults to pandas.
Safe Mode
If enabled, the brick ignores any listed column names that do not exist in the input data, instead of raising an error and stopping the flow. Defaults to False.
Verbose
If enabled, detailed logging messages regarding the operation steps (e.g., input format detection, column counts, query execution) are displayed. Defaults to True.
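
A sketch of passing these options when calling the brick's function directly (the keys match the implementation below; the values are illustrative):

options = {
    "columns": ["id", "name"],   # used only if the `columns` input is not connected
    "output_format": "polars",   # one of "pandas", "polars", or "arrow"
    "safe_mode": True,           # skip missing columns instead of raising
    "verbose": False,            # silence step-by-step logging
}
result = keep_columns(df, options=options)

The full brick implementation follows:
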
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
from coded_flows.types import Union, List, DataFrame, ArrowTable

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _coalesce(*values):
    return next((v for v in values if v is not None), None)


def _sanitize_identifier(identifier):
    """
    Sanitize SQL identifier by escaping special characters.
    Handles double quotes and other problematic characters.
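
    Example: a column named col"x becomes col""x, so the quoted
    identifier "col""x" is valid DuckDB SQL.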
    """
    return identifier.replace('"', '""')


def keep_columns(
    data: Union[DataFrame, ArrowTable], columns: List = None, options=None
) -> Union[DataFrame, ArrowTable]:
    brick_display_name = "Keep Columns"
    options = options or {}
    verbose = options.get("verbose", True)
    columns = _coalesce(columns, options.get("columns", []))
    output_format = options.get("output_format", "pandas")
    safe_mode = options.get("safe_mode", False)
    result = None
    no_filter = False
    if not columns:
        verbose and logger.warning(
            f"[{brick_display_name}] No columns specified. Returning data unchanged."
        )
        result = data
        no_filter = True
    if not no_filter:
        if not isinstance(columns, list) or not all(
            isinstance(c, str) for c in columns
        ):
            verbose and logger.error(f"[{brick_display_name}] Invalid columns format!")
            raise ValueError("Columns must be provided as a list of strings!")
        try:
            verbose and logger.info(
                f"[{brick_display_name}] Starting column filter operation with {len(columns)} columns to keep."
            )
            data_type = None
            if isinstance(data, pd.DataFrame):
                data_type = "pandas"
            elif isinstance(data, pl.DataFrame):
                data_type = "polars"
            elif isinstance(data, pa.Table):
                data_type = "arrow"
            if data_type is None:
                verbose and logger.error(
                    f"[{brick_display_name}] Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
                )
                raise ValueError(
                    "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
                )
            verbose and logger.info(
                f"[{brick_display_name}] Detected input format: {data_type}."
            )
            conn = duckdb.connect(":memory:")
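            # Register the in-memory object as a DuckDB view; DuckDB can scan
            # pandas/Polars DataFrames and Arrow Tables registered this way.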
            conn.register("input_table", data)
            verbose and logger.info(
                f"[{brick_display_name}] Building SQL query for column filtering."
            )
            column_info = conn.execute("DESCRIBE input_table").fetchall()
            all_columns = [col[0] for col in column_info]
            if not safe_mode:
                missing_columns = [col for col in columns if col not in all_columns]
                if missing_columns:
                    verbose and logger.error(
                        f"[{brick_display_name}] Columns not found in data: {missing_columns}"
                    )
                    conn.close()
                    raise ValueError(f"Columns not found in data: {missing_columns}")
            select_parts = []
            kept_count = 0
            skipped_count = 0
            for col in columns:
                if col in all_columns:
                    sanitized_col = _sanitize_identifier(col)
                    select_parts.append(f'"{sanitized_col}"')
                    verbose and logger.info(
                        f"[{brick_display_name}] Keeping column: {col}."
                    )
                    kept_count += 1
                elif safe_mode:
                    verbose and logger.warning(
                        f"[{brick_display_name}] Safe mode: Skipping non-existent column: {col}"
                    )
                    skipped_count += 1
            if not select_parts:
                verbose and logger.error(
                    f"[{brick_display_name}] No valid columns to keep! Result would be empty."
                )
                conn.close()
                raise ValueError(
                    "No valid columns to keep! All specified columns are missing from the data."
                )
            select_clause = ", ".join(select_parts)
            query = f"SELECT {select_clause} FROM input_table"
            verbose and logger.info(
                f"[{brick_display_name}] Executing query to filter columns."
            )
            if output_format == "pandas":
                result = conn.execute(query).df()
                verbose and logger.info(
                    f"[{brick_display_name}] Converted result to pandas DataFrame."
                )
            elif output_format == "polars":
                result = conn.execute(query).pl()
                verbose and logger.info(
                    f"[{brick_display_name}] Converted result to Polars DataFrame."
                )
            elif output_format == "arrow":
                result = conn.execute(query).fetch_arrow_table()
                verbose and logger.info(
                    f"[{brick_display_name}] Converted result to Arrow Table."
                )
            else:
                verbose and logger.error(
                    f"[{brick_display_name}] Unsupported output format: {output_format}"
                )
                raise ValueError(f"Unsupported output format: {output_format}")
            conn.close()
            verbose and logger.info(
                f"[{brick_display_name}] Column filter operation completed successfully. Kept {kept_count} columns{(f', skipped {skipped_count} missing columns' if safe_mode and skipped_count > 0 else '')}."
            )
        except Exception as e:
            verbose and logger.error(
                f"[{brick_display_name}] Error during column filter operation: {e}"
            )
            raise
    return result
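
A hedged end-to-end sketch of calling the brick's function directly (sample data and column names are illustrative):

import pandas as pd

df = pd.DataFrame({"id": [1, 2], "name": ["a", "b"], "tmp": [0, 0]})

# Keep two columns via the options list, returning an Arrow Table.
table = keep_columns(df, options={"columns": ["id", "name"], "output_format": "arrow"})
print(table.column_names)  # ['id', 'name']

# A connected `columns` input overrides the options list (pandas output by default).
df2 = keep_columns(df, columns=["id"], options={"columns": ["name"]})
print(list(df2.columns))  # ['id']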

Brick Info

version v0.1.3
python 3.10, 3.11, 3.12, 3.13
requirements
  • pandas
  • polars[pyarrow]
  • duckdb
  • pyarrow