Delete Columns

Remove specified columns from a DataFrame or Arrow Table.

Delete Columns

Processing

Remove specified columns from the input data structure (Pandas DataFrame, Polars DataFrame, or Arrow Table). It allows users to define which columns to delete either via a direct input or through the brick options, and the final output format can be chosen from Pandas, Polars, or Arrow.

Inputs

data
The input DataFrame or Arrow Table from which columns will be removed.
columns (optional)
A list of string column names that should be deleted from the data. If provided, this list overrides the columns specified in the brick options.

Inputs Types

Input Types
data DataFrame, ArrowTable
columns List

You can check the list of supported types here: Available Type Hints.

Outputs

result
The resulting data structure (DataFrame or ArrowTable) after the specified columns have been deleted, formatted according to the Output Format option.

Outputs Types

Output Types
result DataFrame, ArrowTable

You can check the list of supported types here: Available Type Hints.

Options

The Delete Columns brick contains some changeable options:

List of Columns to Delete
A list of column names (strings) that should be removed from the input data. This list is used only when the columns input is not provided; if the columns input is provided, it overrides this list.
Output Format
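Specifies the desired format of the resulting data structure (DataFrame or Arrow Table). Choices include pandas, polars, or arrow. Defaults to pandas. Note that when no columns are specified for deletion, the input data is returned unchanged and is not converted to this format.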
Safe Mode
If enabled (True), the function will ignore columns specified for deletion that do not exist in the input data, logging a warning instead of raising an error. If disabled (False, default), attempting to delete a non-existent column will raise an error.
Verbose
If enabled (True, default), logs detailed information about the processing steps, detected formats, and results to the console.
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
from coded_flows.types import Union, List, DataFrame, ArrowTable

# Configure root logging at import time with level INFO. Per the logging docs,
# basicConfig does nothing if the root logger already has handlers configured.
logging.basicConfig(level=logging.INFO)
# Module-level logger shared by the functions in this file.
logger = logging.getLogger(__name__)


def _coalesce(*values):
    return next((v for v in values if v is not None))


def _sanitize_identifier(identifier):
    """
    Sanitize SQL identifier by escaping special characters.
    Handles double quotes and other problematic characters.
    """
    return identifier.replace('"', '""')


def delete_columns(
    data: Union[DataFrame, ArrowTable], columns: List = None, options=None
) -> Union[DataFrame, ArrowTable]:
    """
    Remove the requested columns from a pandas/Polars DataFrame or Arrow Table.

    The data is registered in an in-memory DuckDB connection and the remaining
    columns are selected with a SQL query; the query result is then fetched in
    the requested output format.

    Args:
        data: A pandas DataFrame, Polars DataFrame, or Arrow Table.
        columns: List of column names (strings) to delete. When it is not
            None it takes precedence over ``options["columns"]``.
        options: Optional dict with the keys:
            - "columns": fallback list of column names to delete.
            - "output_format": "pandas" (default), "polars", or "arrow".
            - "safe_mode": when True, requested columns that do not exist are
              skipped with a warning instead of raising (default False).
            - "verbose": when True (default), progress is logged.

    Returns:
        The data without the deleted columns, in the requested output format.
        If no columns are requested, ``data`` itself is returned unchanged
        (no format conversion is applied).

    Raises:
        ValueError: If ``columns`` is not a list of strings, ``data`` is of an
            unsupported type, a requested column is missing while safe mode is
            off, every column would be deleted, or the output format is
            unsupported.
    """
    brick_display_name = "Delete Columns"
    options = options or {}
    verbose = options.get("verbose", True)
    # The trailing [] keeps the lookup total even when both the input and the
    # "columns" option are None.
    columns = _coalesce(columns, options.get("columns"), [])
    output_format = options.get("output_format", "pandas")
    safe_mode = options.get("safe_mode", False)

    def _log(level, message):
        # Single place that applies the verbose switch and the brick prefix.
        if verbose:
            logger.log(level, f"[{brick_display_name}] {message}")

    if not columns:
        _log(
            logging.WARNING,
            "No columns specified for deletion. Returning data unchanged.",
        )
        return data

    # Both conditions must hold: a bare string is iterable and made of strings,
    # so checking only the element types would let it through as a "list" of
    # single-character column names.
    if not isinstance(columns, list) or not all(
        isinstance(c, str) for c in columns
    ):
        _log(logging.ERROR, "Invalid columns format! Expected a list of strings.")
        raise ValueError("Columns must be provided as a list of strings!")

    result = None
    try:
        _log(
            logging.INFO,
            f"Starting column delete operation with {len(columns)} columns to remove.",
        )
        if isinstance(data, pd.DataFrame):
            data_type = "pandas"
        elif isinstance(data, pl.DataFrame):
            data_type = "polars"
        elif isinstance(data, pa.Table):
            data_type = "arrow"
        else:
            data_type = None
        if data_type is None:
            _log(
                logging.ERROR,
                "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table",
            )
            raise ValueError(
                "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
            )
        _log(logging.INFO, f"Detected input format: {data_type}.")

        conn = duckdb.connect(":memory:")
        try:
            conn.register("input_table", data)
            _log(logging.INFO, "Building SQL query for column deletion.")
            column_info = conn.execute("DESCRIBE input_table").fetchall()
            all_columns = [col[0] for col in column_info]
            existing_columns = set(all_columns)
            columns_to_delete = set(columns)

            # Requested-but-absent columns, de-duplicated in request order so
            # log and error messages are deterministic.
            missing_columns = list(
                dict.fromkeys(c for c in columns if c not in existing_columns)
            )
            if missing_columns and not safe_mode:
                _log(logging.ERROR, f"Columns not found in data: {missing_columns}")
                raise ValueError(f"Columns not found in data: {missing_columns}")

            select_parts = []
            deleted_count = 0
            for col in all_columns:
                if col in columns_to_delete:
                    _log(logging.INFO, f"Deleting column: {col}.")
                    deleted_count += 1
                else:
                    select_parts.append(f'"{_sanitize_identifier(col)}"')
            kept_count = len(select_parts)

            skipped_count = len(missing_columns) if safe_mode else 0
            if skipped_count:
                _log(
                    logging.WARNING,
                    f"Safe mode: Skipped {skipped_count} non-existent columns: {missing_columns}",
                )

            if not select_parts:
                _log(
                    logging.ERROR,
                    "All columns would be deleted! Result would be empty.",
                )
                raise ValueError(
                    "Cannot delete all columns! At least one column must remain."
                )

            select_clause = ", ".join(select_parts)
            query = f"SELECT {select_clause} FROM input_table"
            _log(logging.INFO, "Executing query to delete columns.")
            if output_format == "pandas":
                result = conn.execute(query).df()
                _log(logging.INFO, "Converted result to pandas DataFrame.")
            elif output_format == "polars":
                result = conn.execute(query).pl()
                _log(logging.INFO, "Converted result to Polars DataFrame.")
            elif output_format == "arrow":
                result = conn.execute(query).fetch_arrow_table()
                _log(logging.INFO, "Converted result to Arrow Table.")
            else:
                _log(logging.ERROR, f"Unsupported output format: {output_format}")
                raise ValueError(f"Unsupported output format: {output_format}")
        finally:
            # Release the connection on every path, including the error paths
            # (unsupported output format, DuckDB failures).
            conn.close()

        skipped_note = (
            f", skipped {skipped_count} missing columns" if skipped_count > 0 else ""
        )
        _log(
            logging.INFO,
            f"Column delete operation completed successfully. Deleted {deleted_count} columns, kept {kept_count} columns{skipped_note}.",
        )
    except Exception:
        _log(logging.ERROR, "Error during column delete operation.")
        raise
    return result

Brick Info

version v0.1.3
python 3.10, 3.11, 3.12, 3.13
requirements
  • pandas
  • polars[pyarrow]
  • duckdb
  • pyarrow