Split Column

Split a textual column into several columns at each occurrence of a delimiter. Output columns are numbered and share a common prefix.

Split Column

Processing

Split an existing textual column in the input dataframe or table on a specified delimiter. The results can be output either as multiple new, numbered columns (prefixed by the output prefix) or combined into a single array column. If the specified input column is not of a textual (string) type, the original data is returned unchanged.
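For example, splitting a comma-separated column into numbered chunk columns behaves roughly like the following pandas sketch (the brick itself performs the split through DuckDB; the `tags` column name and sample data here are illustrative assumptions):

```python
import pandas as pd

# Illustrative data; the brick accepts pandas/Polars DataFrames or Arrow Tables.
df = pd.DataFrame({"tags": ["a,b,c", "x,y", "solo"]})

# Rough equivalent of output_as="separate_columns" with output_prefix="chunk":
chunks = df["tags"].str.split(",", expand=True)
chunks.columns = [f"chunk_{i}" for i in range(chunks.shape[1])]
result = pd.concat([df, chunks], axis=1)

# Rows that produce fewer chunks than the widest row are padded with missing values.
```

The number of generated columns is driven by the row with the most chunks, which mirrors the MAX(array_length(...)) probe in the brick's implementation.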

Inputs

data
Input dataframe or arrow table to be processed.
input column (optional)
Name of the column containing the text data to split. Although optional as a direct input, a value is required at runtime and is typically configured via options.
delimiter (optional)
The character or string used to separate the textual chunks within the input column.
output prefix (optional)
The base name used to prefix the newly created chunk columns (e.g., resulting in columns like 'prefix_0', 'prefix_1').
output as (optional)
Specifies whether the output should be generated as separate_columns or combined into a single array column.
truncate mode (optional)
Defines the truncation strategy for the resulting chunks: none, keep_first_n, or keep_last_n.
truncate count (optional)
The maximum number of resulting chunks (N) to keep when truncation is enabled.
keep empty chunks (optional)
If True, empty strings resulting from consecutive delimiters will be included as chunks.
remove original column (optional)
If True, the original input_column is dropped from the output data.
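The keep empty chunks behavior mirrors plain string splitting, where consecutive (or trailing) delimiters yield empty strings; a minimal sketch:

```python
# keep_empty_chunks=True keeps the empty strings produced by consecutive
# or trailing delimiters; keep_empty_chunks=False filters them out.
text = "a,,b,"
with_empty = text.split(",")                        # ['a', '', 'b', '']
without_empty = [c for c in with_empty if c != ""]  # ['a', 'b']
```

Note that filtering empty chunks can shift which value lands in each numbered output column, since later chunks move left to fill the gaps.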

Input Types

Input Types
data DataFrame, ArrowTable
input column Str
delimiter Str
output prefix Str
output as Str
truncate mode Str
truncate count Int
keep empty chunks Bool
remove original column Bool

You can check the list of supported types here: Available Type Hints.

Outputs

result
The processed data structure (DataFrame or Arrow Table) containing the original data plus the new split columns or the new array column.

Output Types

Output Types
result DataFrame, ArrowTable

You can check the list of supported types here: Available Type Hints.

Options

The Split Column brick provides the following configurable options:

Input Column
The name of the textual column that needs to be split.
Delimiter
The string or character used as the separation boundary when splitting the text.
Output Columns Prefix
The prefix used for the newly generated split columns (e.g., 'chunk_0', 'chunk_1', etc.).
Output As
Determines the format of the split output: separate_columns (multiple new columns) or an array (a single column containing lists of chunks).
Truncate Mode
Specifies the method for limiting the number of resulting chunks: none, keep_first_n, or keep_last_n.
Truncate Count (N)
The specific number (N) of chunks to retain if a truncate mode is selected.
Keep Empty Chunks
If toggled on, retains empty strings generated when multiple delimiters occur consecutively.
Remove Original Column
If toggled on, the source column used for the split operation will be removed from the output.
Output Format
Specifies the desired format of the output data structure: pandas, polars, or arrow.
Verbose
Enables detailed logging during execution.
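The truncate options above can be sketched on a plain Python list of chunks (an illustration only; the brick applies the same logic through DuckDB list functions):

```python
chunks = "a,b,c,d".split(",")  # ['a', 'b', 'c', 'd']
n = 2                          # Truncate Count (N)

keep_first_n = chunks[:n]      # keep_first_n -> ['a', 'b']
keep_last_n = chunks[-n:]      # keep_last_n  -> ['c', 'd']
```

With truncate mode none, every chunk is kept and the number of output columns is determined by the longest split in the data.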
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
from coded_flows.types import Union, List, DataFrame, ArrowTable, Str, Int, Bool

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _coalesce(*values):
    """Return the first non-None value from the arguments."""
    return next((v for v in values if v is not None), None)


def _sanitize_identifier(identifier):
    """
    Sanitize SQL identifier by escaping special characters.

    Args:
        identifier: Column name or identifier to sanitize

    Returns:
        Sanitized identifier safe for SQL queries
    """
    return identifier.replace('"', '""')


def _escape_sql_string(s):
    """
    Escape single quotes in SQL string literals.

    Args:
        s: String to escape

    Returns:
        Escaped string safe for SQL queries
    """
    return s.replace("'", "''")


def _is_textual_column(column_type):
    """
    Check if a column type is textual (string-like).

    Args:
        column_type: The DuckDB column type as a string

    Returns:
        True if the column is textual, False otherwise
    """
    column_type_upper = str(column_type).upper()
    textual_types = ["VARCHAR", "CHAR", "TEXT", "STRING", "BPCHAR"]
    for text_type in textual_types:
        if column_type_upper.startswith(text_type):
            return True
    return False


def split_column(
    data: Union[DataFrame, ArrowTable],
    input_column: Str = None,
    delimiter: Str = None,
    output_prefix: Str = None,
    output_as: Str = None,
    truncate_mode: Str = None,
    truncate_count: Int = None,
    keep_empty_chunks: Bool = None,
    remove_original_column: Bool = None,
    options=None,
) -> Union[DataFrame, ArrowTable]:
    brick_display_name = "Split Column"
    options = options or {}
    verbose = options.get("verbose", True)
    input_column = _coalesce(input_column, options.get("input_column", ""))
    delimiter = _coalesce(delimiter, options.get("delimiter", ","))
    output_prefix = _coalesce(output_prefix, options.get("output_prefix", "chunk"))
    output_as = _coalesce(output_as, options.get("output_as", "separate_columns"))
    truncate_mode = _coalesce(truncate_mode, options.get("truncate_mode", "none"))
    truncate_count = _coalesce(truncate_count, options.get("truncate_count", 2))
    keep_empty_chunks = _coalesce(
        keep_empty_chunks, options.get("keep_empty_chunks", True)
    )
    remove_original_column = _coalesce(
        remove_original_column, options.get("remove_original_column", False)
    )
    output_format = options.get("output_format", "pandas")
    result = data
    conn = None
    if not input_column:
        verbose and logger.error(
            f"[{brick_display_name}] Input column must be specified!"
        )
        raise ValueError("Input column must be specified!")
    if not output_prefix:
        verbose and logger.error(
            f"[{brick_display_name}] Output prefix must be specified!"
        )
        raise ValueError("Output prefix must be specified!")
    if not delimiter:
        verbose and logger.error(f"[{brick_display_name}] Delimiter must be specified!")
        raise ValueError("Delimiter must be specified!")
    valid_output_as = ["separate_columns", "array"]
    if output_as not in valid_output_as:
        verbose and logger.error(
            f"[{brick_display_name}] Invalid output_as: {output_as}."
        )
        raise ValueError(f"output_as must be one of {valid_output_as}")
    valid_truncate_modes = ["none", "keep_first_n", "keep_last_n"]
    if truncate_mode not in valid_truncate_modes:
        verbose and logger.error(
            f"[{brick_display_name}] Invalid truncate_mode: {truncate_mode}."
        )
        raise ValueError(f"truncate_mode must be one of {valid_truncate_modes}")
    if truncate_mode != "none" and truncate_count < 1:
        verbose and logger.error(
            f"[{brick_display_name}] truncate_count must be at least 1."
        )
        raise ValueError("truncate_count must be at least 1")
    try:
        verbose and logger.info(
            f"[{brick_display_name}] Starting split on column '{input_column}' with delimiter '{delimiter}'."
        )
        data_type = None
        if isinstance(data, pd.DataFrame):
            data_type = "pandas"
        elif isinstance(data, pl.DataFrame):
            data_type = "polars"
        elif isinstance(data, (pa.Table, pa.lib.Table)):
            data_type = "arrow"
        if data_type is None:
            verbose and logger.error(
                f"[{brick_display_name}] Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
            )
            raise ValueError(
                "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
            )
        verbose and logger.info(
            f"[{brick_display_name}] Detected input format: {data_type}."
        )
        conn = duckdb.connect(":memory:")
        conn.register("input_table", data)
        column_info = conn.execute("DESCRIBE input_table").fetchall()
        all_columns = {col[0]: col[1] for col in column_info}
        verbose and logger.info(
            f"[{brick_display_name}] Total columns in data: {len(all_columns)}."
        )
        if input_column not in all_columns:
            verbose and logger.error(
                f"[{brick_display_name}] Input column '{input_column}' not found in data."
            )
            raise ValueError(f"Input column '{input_column}' not found in data.")
        column_type = all_columns[input_column]
        if not _is_textual_column(column_type):
            warning_msg = f"Column '{input_column}' has non-textual type '{column_type}'. Split operation only supports textual (string) columns. Returning data unchanged."
            logger.warning(f"[{brick_display_name}] {warning_msg}")
        else:
            verbose and logger.info(
                f"[{brick_display_name}] Using input column: '{input_column}' (type: {column_type})."
            )
            sanitized_input_col = _sanitize_identifier(input_column)
            escaped_delimiter = _escape_sql_string(delimiter)
            col_expr = f'CAST("{sanitized_input_col}" AS VARCHAR)'
            if truncate_mode == "none":
                if keep_empty_chunks:
                    max_chunks_query = f"""
                        SELECT MAX(array_length(string_split({col_expr}, '{escaped_delimiter}'))) as max_chunks
                        FROM input_table
                        WHERE {col_expr} IS NOT NULL
                    """
                else:
                    max_chunks_query = f"""
                        SELECT MAX(array_length(
                            list_filter(string_split({col_expr}, '{escaped_delimiter}'), x -> x != '')
                        )) as max_chunks
                        FROM input_table
                        WHERE {col_expr} IS NOT NULL
                    """
                max_chunks_result = conn.execute(max_chunks_query).fetchone()
                max_chunks = (
                    max_chunks_result[0] if max_chunks_result[0] is not None else 0
                )
                verbose and logger.info(
                    f"[{brick_display_name}] Maximum number of chunks detected: {max_chunks}."
                )
            else:
                max_chunks = truncate_count
                verbose and logger.info(
                    f"[{brick_display_name}] Truncating to {truncate_count} chunks ({truncate_mode})."
                )
            if max_chunks == 0:
                verbose and logger.warning(
                    f"[{brick_display_name}] No data to split. Returning original data."
                )
            else:
                if keep_empty_chunks:
                    split_expr = f"string_split({col_expr}, '{escaped_delimiter}')"
                else:
                    split_expr = f"list_filter(string_split({col_expr}, '{escaped_delimiter}'), x -> x != '')"
                if output_as == "array":
                    array_col_name = f"{output_prefix}_array"
                    sanitized_array_col = _sanitize_identifier(array_col_name)
                    if truncate_mode == "keep_first_n":
                        array_expr = f"list_slice({split_expr}, 1, {truncate_count})"
                    elif truncate_mode == "keep_last_n":
                        array_expr = (
                            f"list_slice({split_expr}, -{truncate_count}, NULL)"
                        )
                    else:
                        array_expr = split_expr
                    select_parts = []
                    for col in all_columns.keys():
                        if col == input_column and remove_original_column:
                            continue
                        sanitized_col = _sanitize_identifier(col)
                        select_parts.append(f'"{sanitized_col}"')
                    select_parts.append(f'{array_expr} AS "{sanitized_array_col}"')
                    select_clause = ", ".join(select_parts)
                    query = f"SELECT {select_clause} FROM input_table"
                    verbose and logger.info(
                        f"[{brick_display_name}] Creating array column '{array_col_name}'."
                    )
                else:
                    select_parts = []
                    for col in all_columns.keys():
                        if col == input_column and remove_original_column:
                            continue
                        sanitized_col = _sanitize_identifier(col)
                        select_parts.append(f'"{sanitized_col}"')
                    if truncate_mode == "keep_last_n":
                        for i in range(max_chunks):
                            chunk_col_name = f"{output_prefix}_{i}"
                            sanitized_chunk_col = _sanitize_identifier(chunk_col_name)
                            # DuckDB lists are 1-based; the last N chunks sit at
                            # negative indexes -N .. -1, so chunk i maps to i - N.
                            index = i - truncate_count
                            chunk_expr = f"list_extract({split_expr}, {index})"
                            select_parts.append(
                                f'{chunk_expr} AS "{sanitized_chunk_col}"'
                            )
                    else:
                        for i in range(max_chunks):
                            chunk_col_name = f"{output_prefix}_{i}"
                            sanitized_chunk_col = _sanitize_identifier(chunk_col_name)
                            chunk_expr = f"list_extract({split_expr}, {i + 1})"
                            select_parts.append(
                                f'{chunk_expr} AS "{sanitized_chunk_col}"'
                            )
                    select_clause = ", ".join(select_parts)
                    query = f"SELECT {select_clause} FROM input_table"
                    verbose and logger.info(
                        f"[{brick_display_name}] Creating {max_chunks} separate columns with prefix '{output_prefix}_'."
                    )
                if output_format == "pandas":
                    result = conn.execute(query).df()
                    verbose and logger.info(
                        f"[{brick_display_name}] Converted result to pandas DataFrame."
                    )
                elif output_format == "polars":
                    result = conn.execute(query).pl()
                    verbose and logger.info(
                        f"[{brick_display_name}] Converted result to Polars DataFrame."
                    )
                elif output_format == "arrow":
                    result = conn.execute(query).fetch_arrow_table()
                    verbose and logger.info(
                        f"[{brick_display_name}] Converted result to Arrow Table."
                    )
                else:
                    verbose and logger.error(
                        f"[{brick_display_name}] Unsupported output format: {output_format}"
                    )
                    raise ValueError(f"Unsupported output format: {output_format}")
                verbose and logger.info(
                    f"[{brick_display_name}] Split completed successfully."
                )
    except Exception as e:
        verbose and logger.error(f"[{brick_display_name}] Error during split: {str(e)}")
        raise
    finally:
        if conn is not None:
            conn.close()
    return result

Brick Info

version v0.1.3
python 3.10, 3.11, 3.12, 3.13
requirements
  • pandas
  • polars[pyarrow]
  • duckdb
  • pyarrow