Split Column
Split a textual column into several columns on each occurrence of a delimiter. Output columns are numbered and share a configurable prefix.
Processing
Splits an existing textual column in the input dataframe or table on a specified delimiter. The results can be output either as multiple new numbered columns (sharing the configured output prefix) or as a single array column. If the specified input column is not of a textual (string) type, the original data is returned unchanged.
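The separate-columns behavior can be approximated in plain pandas (a sketch for illustration only; the brick itself performs the split with DuckDB, and the column names below assume the default `chunk` prefix):

```python
import pandas as pd

df = pd.DataFrame({"tags": ["red,green,blue", "solo", "a,b"]})

# One numbered column per chunk, padded with missing values
# where a row produces fewer chunks than the maximum.
chunks = df["tags"].str.split(",", expand=True)
chunks.columns = [f"chunk_{i}" for i in range(chunks.shape[1])]
result = pd.concat([df, chunks], axis=1)
print(result.columns.tolist())  # ['tags', 'chunk_0', 'chunk_1', 'chunk_2']
```

Rows with fewer chunks (like "solo") get missing values in the trailing chunk columns.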
Inputs
- data
- Input dataframe or arrow table to be processed.
- input column (optional)
- Name of the column containing the text data to split. This field is required and typically configured via options.
- delimiter (optional)
- The character or string used to separate the textual chunks within the input column.
- output prefix (optional)
- The base name used to prefix the newly created chunk columns (e.g., resulting in columns like 'prefix_0', 'prefix_1').
- output as (optional)
- Specifies whether the output should be generated as `separate_columns` or combined into a single `array` column.
- truncate mode (optional)
- Defines the truncation strategy for the resulting chunks: `none`, `keep_first_n`, or `keep_last_n`.
- truncate count (optional)
- The maximum number of resulting chunks (N) to keep when truncation is enabled.
- keep empty chunks (optional)
- If True, empty strings resulting from consecutive delimiters will be included as chunks.
- remove original column (optional)
- If True, the original `input_column` is dropped from the output data.
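Per value, the flags above combine roughly as follows (an illustrative pure-Python sketch; `split_chunks` is a hypothetical helper, not part of the brick):

```python
def split_chunks(text, delimiter=",", keep_empty_chunks=True,
                 truncate_mode="none", truncate_count=2):
    """Model of the brick's per-value split semantics."""
    chunks = text.split(delimiter)
    if not keep_empty_chunks:
        # Drop empty strings produced by consecutive delimiters.
        chunks = [c for c in chunks if c != ""]
    if truncate_mode == "keep_first_n":
        chunks = chunks[:truncate_count]
    elif truncate_mode == "keep_last_n":
        chunks = chunks[-truncate_count:]
    return chunks

print(split_chunks("a,,b,c"))                               # ['a', '', 'b', 'c']
print(split_chunks("a,,b,c", keep_empty_chunks=False))      # ['a', 'b', 'c']
print(split_chunks("a,b,c,d", truncate_mode="keep_last_n")) # ['c', 'd']
```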
Input Types
| Input | Types |
|---|---|
| data | DataFrame, ArrowTable |
| input column | Str |
| delimiter | Str |
| output prefix | Str |
| output as | Str |
| truncate mode | Str |
| truncate count | Int |
| keep empty chunks | Bool |
| remove original column | Bool |
You can check the list of supported types here: Available Type Hints.
Outputs
- result
- The processed data structure (DataFrame or Arrow Table) containing the original data plus the new split columns or the new array column.
Output Types
| Output | Types |
|---|---|
| result | DataFrame, ArrowTable |
You can check the list of supported types here: Available Type Hints.
Options
The Split Column brick provides the following configurable options:
- Input Column
- The name of the textual column that needs to be split.
- Delimiter
- The string or character used as the separation boundary when splitting the text.
- Output Columns Prefix
- The prefix used for the newly generated split columns (e.g., 'chunk_0', 'chunk_1', etc.).
- Output As
- Determines the format of the split output: `separate_columns` (multiple new columns) or `array` (a single column containing lists of chunks).
- Truncate Mode
- Specifies the method for limiting the number of resulting chunks: `none`, `keep_first_n`, or `keep_last_n`.
- Truncate Count (N)
- The specific number (N) of chunks to retain if a truncate mode is selected.
- Keep Empty Chunks
- If toggled on, retains empty strings generated when multiple delimiters occur consecutively.
- Remove Original Column
- If toggled on, the source column used for the split operation will be removed from the output.
- Output Format
- Specifies the desired format of the output data structure: `pandas`, `polars`, or `arrow`.
- Verbose
- Enables detailed logging during execution.
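Direct inputs and options combine with a simple precedence: an explicit (non-None) argument wins, then the options entry, then the built-in default. A minimal sketch of that coalescing (mirroring the brick's internal `_coalesce` helper; the sample values are made up):

```python
def coalesce(*values):
    """Return the first non-None value from the arguments."""
    return next((v for v in values if v is not None), None)

options = {"delimiter": ";", "output_prefix": "part"}

# No direct argument: the options entry wins over the default ','.
delimiter = coalesce(None, options.get("delimiter", ","))
# Direct argument provided: it wins over the options entry.
output_prefix = coalesce("piece", options.get("output_prefix", "chunk"))
# Neither provided: the built-in default applies.
truncate_count = coalesce(None, options.get("truncate_count", 2))

print(delimiter, output_prefix, truncate_count)  # ; piece 2
```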
import logging

import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa

from coded_flows.types import Union, List, DataFrame, ArrowTable, Str, Int, Bool

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _coalesce(*values):
    """Return the first non-None value from the arguments."""
    return next((v for v in values if v is not None), None)


def _sanitize_identifier(identifier):
    """
    Sanitize SQL identifier by escaping special characters.

    Args:
        identifier: Column name or identifier to sanitize

    Returns:
        Sanitized identifier safe for SQL queries
    """
    return identifier.replace('"', '""')


def _escape_sql_string(s):
    """
    Escape single quotes in SQL string literals.

    Args:
        s: String to escape

    Returns:
        Escaped string safe for SQL queries
    """
    return s.replace("'", "''")


def _is_textual_column(column_type):
    """
    Check if a column type is textual (string-like).

    Args:
        column_type: The DuckDB column type as a string

    Returns:
        True if the column is textual, False otherwise
    """
    column_type_upper = str(column_type).upper()
    textual_types = ["VARCHAR", "CHAR", "TEXT", "STRING", "BPCHAR"]
    for text_type in textual_types:
        if column_type_upper.startswith(text_type):
            return True
    return False


def split_column(
    data: Union[DataFrame, ArrowTable],
    input_column: Str = None,
    delimiter: Str = None,
    output_prefix: Str = None,
    output_as: Str = None,
    truncate_mode: Str = None,
    truncate_count: Int = None,
    keep_empty_chunks: Bool = None,
    remove_original_column: Bool = None,
    options=None,
) -> Union[DataFrame, ArrowTable]:
    brick_display_name = "Split Column"
    options = options or {}
    verbose = options.get("verbose", True)
    input_column = _coalesce(input_column, options.get("input_column", ""))
    delimiter = _coalesce(delimiter, options.get("delimiter", ","))
    output_prefix = _coalesce(output_prefix, options.get("output_prefix", "chunk"))
    output_as = _coalesce(output_as, options.get("output_as", "separate_columns"))
    truncate_mode = _coalesce(truncate_mode, options.get("truncate_mode", "none"))
    truncate_count = _coalesce(truncate_count, options.get("truncate_count", 2))
    keep_empty_chunks = _coalesce(
        keep_empty_chunks, options.get("keep_empty_chunks", True)
    )
    remove_original_column = _coalesce(
        remove_original_column, options.get("remove_original_column", False)
    )
    output_format = options.get("output_format", "pandas")

    result = data
    conn = None

    if not input_column:
        verbose and logger.error(
            f"[{brick_display_name}] Input column must be specified!"
        )
        raise ValueError("Input column must be specified!")
    if not output_prefix:
        verbose and logger.error(
            f"[{brick_display_name}] Output prefix must be specified!"
        )
        raise ValueError("Output prefix must be specified!")
    if not delimiter:
        verbose and logger.error(f"[{brick_display_name}] Delimiter must be specified!")
        raise ValueError("Delimiter must be specified!")

    valid_output_as = ["separate_columns", "array"]
    if output_as not in valid_output_as:
        verbose and logger.error(
            f"[{brick_display_name}] Invalid output_as: {output_as}."
        )
        raise ValueError(f"output_as must be one of {valid_output_as}")

    valid_truncate_modes = ["none", "keep_first_n", "keep_last_n"]
    if truncate_mode not in valid_truncate_modes:
        verbose and logger.error(
            f"[{brick_display_name}] Invalid truncate_mode: {truncate_mode}."
        )
        raise ValueError(f"truncate_mode must be one of {valid_truncate_modes}")

    if truncate_mode != "none" and truncate_count < 1:
        verbose and logger.error(
            f"[{brick_display_name}] truncate_count must be at least 1."
        )
        raise ValueError("truncate_count must be at least 1")

    try:
        verbose and logger.info(
            f"[{brick_display_name}] Starting split on column '{input_column}' with delimiter '{delimiter}'."
        )

        data_type = None
        if isinstance(data, pd.DataFrame):
            data_type = "pandas"
        elif isinstance(data, pl.DataFrame):
            data_type = "polars"
        elif isinstance(data, (pa.Table, pa.lib.Table)):
            data_type = "arrow"

        if data_type is None:
            verbose and logger.error(
                f"[{brick_display_name}] Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
            )
            raise ValueError(
                "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
            )

        verbose and logger.info(
            f"[{brick_display_name}] Detected input format: {data_type}."
        )

        conn = duckdb.connect(":memory:")
        conn.register("input_table", data)

        column_info = conn.execute("DESCRIBE input_table").fetchall()
        all_columns = {col[0]: col[1] for col in column_info}
        verbose and logger.info(
            f"[{brick_display_name}] Total columns in data: {len(all_columns)}."
        )

        if input_column not in all_columns:
            verbose and logger.error(
                f"[{brick_display_name}] Input column '{input_column}' not found in data."
            )
            raise ValueError(f"Input column '{input_column}' not found in data.")

        column_type = all_columns[input_column]
        if not _is_textual_column(column_type):
            warning_msg = f"Column '{input_column}' has non-textual type '{column_type}'. Split operation only supports textual (string) columns. Returning data unchanged."
            logger.warning(f"[{brick_display_name}] {warning_msg}")
        else:
            verbose and logger.info(
                f"[{brick_display_name}] Using input column: '{input_column}' (type: {column_type})."
            )

            sanitized_input_col = _sanitize_identifier(input_column)
            escaped_delimiter = _escape_sql_string(delimiter)
            col_expr = f'CAST("{sanitized_input_col}" AS VARCHAR)'

            if truncate_mode == "none":
                if keep_empty_chunks:
                    max_chunks_query = f"""
                        SELECT MAX(array_length(string_split({col_expr}, '{escaped_delimiter}'))) as max_chunks
                        FROM input_table
                        WHERE {col_expr} IS NOT NULL
                    """
                else:
                    max_chunks_query = f"""
                        SELECT MAX(array_length(
                            list_filter(string_split({col_expr}, '{escaped_delimiter}'), x -> x != '')
                        )) as max_chunks
                        FROM input_table
                        WHERE {col_expr} IS NOT NULL
                    """
                max_chunks_result = conn.execute(max_chunks_query).fetchone()
                max_chunks = (
                    max_chunks_result[0] if max_chunks_result[0] is not None else 0
                )
                verbose and logger.info(
                    f"[{brick_display_name}] Maximum number of chunks detected: {max_chunks}."
                )
            else:
                max_chunks = truncate_count
                verbose and logger.info(
                    f"[{brick_display_name}] Truncating to {truncate_count} chunks ({truncate_mode})."
                )

            if max_chunks == 0:
                verbose and logger.warning(
                    f"[{brick_display_name}] No data to split. Returning original data."
                )
            else:
                if keep_empty_chunks:
                    split_expr = f"string_split({col_expr}, '{escaped_delimiter}')"
                else:
                    split_expr = f"list_filter(string_split({col_expr}, '{escaped_delimiter}'), x -> x != '')"

                if output_as == "array":
                    array_col_name = f"{output_prefix}_array"
                    sanitized_array_col = _sanitize_identifier(array_col_name)

                    if truncate_mode == "keep_first_n":
                        array_expr = f"list_slice({split_expr}, 1, {truncate_count})"
                    elif truncate_mode == "keep_last_n":
                        array_expr = (
                            f"list_slice({split_expr}, -{truncate_count}, NULL)"
                        )
                    else:
                        array_expr = split_expr

                    select_parts = []
                    for col in all_columns.keys():
                        if col == input_column and remove_original_column:
                            continue
                        sanitized_col = _sanitize_identifier(col)
                        select_parts.append(f'"{sanitized_col}"')
                    select_parts.append(f'{array_expr} AS "{sanitized_array_col}"')

                    select_clause = ", ".join(select_parts)
                    query = f"SELECT {select_clause} FROM input_table"
                    verbose and logger.info(
                        f"[{brick_display_name}] Creating array column '{array_col_name}'."
                    )
                else:
                    select_parts = []
                    for col in all_columns.keys():
                        if col == input_column and remove_original_column:
                            continue
                        sanitized_col = _sanitize_identifier(col)
                        select_parts.append(f'"{sanitized_col}"')

                    if truncate_mode == "keep_last_n":
                        for i in range(max_chunks):
                            chunk_col_name = f"{output_prefix}_{i}"
                            sanitized_chunk_col = _sanitize_identifier(chunk_col_name)
                            # Negative 1-based index counting back from the end
                            # of the list: i=0 maps to -truncate_count, the last
                            # chunk maps to -1.
                            index = i - truncate_count
                            chunk_expr = f"list_extract({split_expr}, {index})"
                            select_parts.append(
                                f'{chunk_expr} AS "{sanitized_chunk_col}"'
                            )
                    else:
                        for i in range(max_chunks):
                            chunk_col_name = f"{output_prefix}_{i}"
                            sanitized_chunk_col = _sanitize_identifier(chunk_col_name)
                            chunk_expr = f"list_extract({split_expr}, {i + 1})"
                            select_parts.append(
                                f'{chunk_expr} AS "{sanitized_chunk_col}"'
                            )

                    select_clause = ", ".join(select_parts)
                    query = f"SELECT {select_clause} FROM input_table"
                    verbose and logger.info(
                        f"[{brick_display_name}] Creating {max_chunks} separate columns with prefix '{output_prefix}_'."
                    )

                if output_format == "pandas":
                    result = conn.execute(query).df()
                    verbose and logger.info(
                        f"[{brick_display_name}] Converted result to pandas DataFrame."
                    )
                elif output_format == "polars":
                    result = conn.execute(query).pl()
                    verbose and logger.info(
                        f"[{brick_display_name}] Converted result to Polars DataFrame."
                    )
                elif output_format == "arrow":
                    result = conn.execute(query).fetch_arrow_table()
                    verbose and logger.info(
                        f"[{brick_display_name}] Converted result to Arrow Table."
                    )
                else:
                    verbose and logger.error(
                        f"[{brick_display_name}] Unsupported output format: {output_format}"
                    )
                    raise ValueError(f"Unsupported output format: {output_format}")

        verbose and logger.info(
            f"[{brick_display_name}] Split completed successfully."
        )
    except Exception as e:
        verbose and logger.error(f"[{brick_display_name}] Error during split: {str(e)}")
        raise
    finally:
        if conn is not None:
            conn.close()

    return result
Brick Info
- pandas
- polars[pyarrow]
- duckdb
- pyarrow