Move Columns
Move one or more columns to a specific position in a DataFrame or Arrow Table.
Move Columns
Processing
Move one or more columns to a specific position within the input data structure (Pandas DataFrame, Polars DataFrame, or Arrow Table).
Inputs
- data
- The input DataFrame or Arrow Table whose columns need to be reordered.
- columns to move (optional)
- A list of column names that should be moved to the new position. This is usually set via options.
- position type (optional)
- Defines where the columns should be moved (e.g., 'beginning', 'end', 'before', 'after'). This is usually set via options.
- reference column (optional)
- The column used as a reference point when position type is 'before' or 'after'. This is usually set via options.
Inputs Types
| Input | Types |
|---|---|
| data | DataFrame, ArrowTable |
| columns to move | List |
| position type | Str |
| reference column | Str |
You can check the list of supported types here: Available Type Hints.
Outputs
- result
- The resulting data structure (DataFrame or Arrow Table) with columns reordered according to the specified configuration, converted to the chosen
Output Format.
Outputs Types
| Output | Types |
|---|---|
| result | DataFrame, ArrowTable |
You can check the list of supported types here: Available Type Hints.
Options
The Move Columns brick contains some changeable options:
- Columns to Move
- A list of column names to be moved to the target position.
- Position Type
- Specifies the location where the columns should be moved. Choices include "beginning", "end", "before" (relative to a reference column), or "after" (relative to a reference column).
- Reference Column
- The name of the existing column used as a reference point when Position Type is set to "before" or "after".
- Output Format
- Defines the desired format of the output data structure (pandas, polars, or arrow). Defaults to pandas.
- Safe Mode
- If enabled, columns listed in Columns to Move that do not exist in the input data will be ignored, allowing the function to proceed without error.
- Verbose
- If enabled, the function will print detailed execution logs.
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
from coded_flows.types import Union, List, DataFrame, ArrowTable, Str
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def _coalesce(*values):
return next((v for v in values if v is not None))
def _sanitize_identifier(identifier):
"""
Sanitize SQL identifier by escaping special characters.
Handles double quotes and other problematic characters.
"""
return identifier.replace('"', '""')
def _compute_column_order(
    all_columns, columns_to_move, position_type, reference_column
):
    """Compute the reordered column list used by ``move_columns``.

    Args:
        all_columns: Column names of the input, in their current order.
        columns_to_move: Requested column names, in the order they should
            appear at the target position. Duplicates and names that are
            absent from ``all_columns`` are dropped.
        position_type: One of "beginning", "end", "before" or "after".
        reference_column: Anchor column for "before"/"after". The caller must
            have checked that it is present in ``all_columns`` and is not one
            of the columns to move. Unused for "beginning"/"end".

    Returns:
        A ``(new_order, moved)`` tuple: the full reordered list of column
        names and the de-duplicated list of columns that were actually moved.
    """
    existing = set(all_columns)
    requested = set(columns_to_move)
    # dict.fromkeys de-duplicates while keeping first-seen order, so a name
    # listed twice is not selected twice.
    moved = [col for col in dict.fromkeys(columns_to_move) if col in existing]
    remaining = [col for col in all_columns if col not in requested]

    if position_type == "beginning":
        return moved + remaining, moved
    if position_type == "end":
        return remaining + moved, moved

    # "before" / "after": splice the moved columns next to the anchor.
    insert_at = remaining.index(reference_column)
    if position_type == "after":
        insert_at += 1
    return remaining[:insert_at] + moved + remaining[insert_at:], moved


def move_columns(
    data: Union[DataFrame, ArrowTable],
    columns_to_move: List = None,
    position_type: Str = None,
    reference_column: Str = None,
    options=None,
) -> Union[DataFrame, ArrowTable]:
    """Move one or more columns to a given position in the input data.

    Explicit arguments take precedence over the matching ``options`` entries
    (``columns_to_move``, ``position_type``, ``reference_column``). The
    remaining settings are read from ``options`` only: ``output_format``
    (default ``"pandas"``), ``safe_mode`` (default ``False``) and ``verbose``
    (default ``True``).

    Args:
        data: A pandas DataFrame, Polars DataFrame or Arrow Table.
        columns_to_move: List of column-name strings to move. When empty,
            ``data`` is returned unchanged (no format conversion).
        position_type: "beginning", "end", "before" or "after".
        reference_column: Anchor column, required for "before"/"after".
        options: Optional dict of settings as described above.

    Returns:
        The reordered data as a pandas DataFrame, Polars DataFrame or Arrow
        Table, depending on ``output_format``.

    Raises:
        ValueError: If ``columns_to_move`` is not a list of strings, the
            position type is invalid, the reference column is missing or is
            itself being moved, a requested column does not exist (unless
            safe mode is on), the input type is unsupported, or the output
            format is unsupported.
        Exception: Any error raised while registering or querying the data is
            logged (when verbose) and re-raised.
    """
    brick_display_name = "Move Columns"
    options = options or {}
    verbose = options.get("verbose", True)
    columns_to_move = _coalesce(columns_to_move, options.get("columns_to_move", []))
    position_type = _coalesce(position_type, options.get("position_type", "beginning"))
    reference_column = _coalesce(reference_column, options.get("reference_column", ""))
    output_format = options.get("output_format", "pandas")
    safe_mode = options.get("safe_mode", False)

    if not columns_to_move:
        if verbose:
            logger.warning(
                f"[{brick_display_name}] No columns specified for reordering. Returning data unchanged."
            )
        return data

    # Both conditions must hold: a real list AND only string entries. The
    # isinstance check comes first so non-iterables never reach all().
    if not isinstance(columns_to_move, list) or not all(
        isinstance(col, str) for col in columns_to_move
    ):
        if verbose:
            logger.error(
                f"[{brick_display_name}] Invalid columns format! Expected a list."
            )
        raise ValueError("Columns to move must be provided as a list!")

    valid_positions = ["beginning", "end", "before", "after"]
    if position_type not in valid_positions:
        if verbose:
            logger.error(
                f"[{brick_display_name}] Invalid position type: {position_type}. Must be one of {valid_positions}."
            )
        raise ValueError(f"Position type must be one of {valid_positions}")

    relative = position_type in ("before", "after")
    if relative and not reference_column:
        if verbose:
            logger.error(
                f"[{brick_display_name}] Reference column is required when position type is '{position_type}'."
            )
        raise ValueError(
            f"Reference column is required for position type '{position_type}'"
        )

    result = None
    try:
        if verbose:
            logger.info(
                f"[{brick_display_name}] Starting column reorder operation: moving {len(columns_to_move)} column(s) to {position_type}{(f' {reference_column}' if reference_column else '')}."
            )

        data_type = None
        if isinstance(data, pd.DataFrame):
            data_type = "pandas"
        elif isinstance(data, pl.DataFrame):
            data_type = "polars"
        elif isinstance(data, (pa.Table, pa.lib.Table)):
            data_type = "arrow"
        if data_type is None:
            if verbose:
                logger.error(
                    f"[{brick_display_name}] Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
                )
            raise ValueError(
                "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
            )
        if verbose:
            logger.info(f"[{brick_display_name}] Detected input format: {data_type}.")

        conn = duckdb.connect(":memory:")
        # try/finally guarantees the connection is released on every exit
        # path, including validation errors and query failures.
        try:
            conn.register("input_table", data)
            column_info = conn.execute("DESCRIBE input_table").fetchall()
            all_columns = [col[0] for col in column_info]
            known_columns = set(all_columns)
            if verbose:
                logger.info(
                    f"[{brick_display_name}] Total columns in data: {len(all_columns)}."
                )

            # Requested names that do not exist in the data (each listed once).
            missing_columns = [
                col
                for col in dict.fromkeys(columns_to_move)
                if col not in known_columns
            ]
            if not safe_mode and missing_columns:
                if verbose:
                    logger.error(
                        f"[{brick_display_name}] Columns not found in data: {missing_columns}"
                    )
                raise ValueError(f"Columns not found in data: {missing_columns}")

            if relative:
                if reference_column not in known_columns:
                    if verbose:
                        logger.error(
                            f"[{brick_display_name}] Reference column '{reference_column}' not found in data."
                        )
                    raise ValueError(
                        f"Reference column '{reference_column}' not found in data"
                    )
                if reference_column in columns_to_move:
                    if verbose:
                        logger.error(
                            f"[{brick_display_name}] Reference column '{reference_column}' cannot be in the list of columns to move."
                        )
                    raise ValueError(
                        f"Reference column '{reference_column}' cannot be in the list of columns to move"
                    )

            new_column_order, valid_columns_to_move = _compute_column_order(
                all_columns, columns_to_move, position_type, reference_column
            )
            if verbose:
                if relative:
                    destination = f"{position_type} '{reference_column}'"
                else:
                    destination = f"to {position_type}"
                logger.info(f"[{brick_display_name}] Moving columns {destination}.")

            select_clause = ", ".join(
                f'"{_sanitize_identifier(col)}"' for col in new_column_order
            )
            query = f"SELECT {select_clause} FROM input_table"
            if verbose:
                logger.info(
                    f"[{brick_display_name}] Executing query to reorder columns."
                )

            if output_format == "pandas":
                result = conn.execute(query).df()
                if verbose:
                    logger.info(
                        f"[{brick_display_name}] Converted result to pandas DataFrame."
                    )
            elif output_format == "polars":
                result = conn.execute(query).pl()
                if verbose:
                    logger.info(
                        f"[{brick_display_name}] Converted result to Polars DataFrame."
                    )
            elif output_format == "arrow":
                result = conn.execute(query).fetch_arrow_table()
                if verbose:
                    logger.info(
                        f"[{brick_display_name}] Converted result to Arrow Table."
                    )
            else:
                if verbose:
                    logger.error(
                        f"[{brick_display_name}] Unsupported output format: {output_format}"
                    )
                raise ValueError(f"Unsupported output format: {output_format}")
        finally:
            conn.close()

        if verbose:
            if safe_mode and missing_columns:
                logger.warning(
                    f"[{brick_display_name}] Safe mode: Skipped {len(missing_columns)} non-existent columns: {missing_columns}"
                )
            logger.info(
                f"[{brick_display_name}] Column reorder operation completed successfully. Moved {len(valid_columns_to_move)} column(s)."
            )
    except Exception as e:
        if verbose:
            logger.error(
                f"[{brick_display_name}] Error during column reorder operation: {str(e)}"
            )
        raise
    return result
Brick Info
version
v0.1.3
python
3.10,
3.11,
3.12,
3.13
requirements
- pandas
- polars[pyarrow]
- duckdb
- pyarrow