Copy Column
Copy the content of a source column into a target column.
Copy Column
Processing
This brick copies the data content from a specified source column into a target column within the input dataset. If the target column already exists, the function's behavior is governed by the Overwrite if Target Exists option.
Inputs
- data
- The input dataset (Pandas DataFrame, Polars DataFrame, or PyArrow Table) containing the columns to be processed.
- source column (optional)
- The name of the existing column whose contents will be copied. This input is typically set via the brick options.
- target column (optional)
- The name of the column where the copied data will be placed. This input is typically set via the brick options.
Inputs Types
| Input | Types |
|---|---|
data |
DataFrame, ArrowTable |
source column |
Str |
target column |
Str |
You can check the list of supported types here: Available Type Hints.
Outputs
- result
- The modified dataset, with the contents of the source column duplicated into the target column, returned in the format specified by the
Output Typeoption.
Outputs Types
| Output | Types |
|---|---|
result |
DataFrame, ArrowTable |
You can check the list of supported types here: Available Type Hints.
Options
The Copy Column brick contains some changeable options:
- Source Column
- The name of the existing column from which data will be copied. This field must be specified.
- Target Column
- The name of the column where the copied data will be stored. This column will be created if it does not exist.
- Output Type
- Determines the desired format for the returned dataset (pandas, polars, or arrow). Defaults to 'pandas'.
- Overwrite if Target Exists
- If enabled (default), and the target column already exists, its contents will be overwritten with the source data. If disabled and the target column exists, an error will be raised.
- Verbose
- Enables detailed logging messages regarding the column duplication process during execution.
import logging
from coded_flows.types import Union, DataFrame, ArrowTable, Str
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def _coalesce(*values):
return next((v for v in values if v is not None))
def copy_column(
data: Union[DataFrame, ArrowTable],
source_column: Str = None,
target_column: Str = None,
options=None,
) -> Union[DataFrame, ArrowTable]:
brick_display_name = "Copy Column"
options = options or {}
source_column = _coalesce(source_column, options.get("source_column", ""))
target_column = _coalesce(target_column, options.get("target_column", ""))
output_type = options.get("output_type", "pandas")
overwrite = options.get("overwrite", True)
verbose = options.get("verbose", True)
result = None
con = duckdb.connect(":memory:")
try:
if not source_column:
verbose and logger.error(
f"[{brick_display_name}] Source column name cannot be empty"
)
raise ValueError("Source column name cannot be empty")
if not target_column:
verbose and logger.error(
f"[{brick_display_name}] Target column name cannot be empty"
)
raise ValueError("Target column name cannot be empty")
verbose and logger.info(
f"[{brick_display_name}] Starting column duplication: '{source_column}' -> '{target_column}'."
)
is_pandas = isinstance(data, pd.DataFrame)
is_polars = isinstance(data, pl.DataFrame)
is_arrow = isinstance(data, pa.Table)
if is_arrow or is_pandas or is_polars:
con.register("temp_table", data)
columns = con.execute("DESCRIBE temp_table").fetchall()
column_names = [col[0] for col in columns]
target_exists = target_column in column_names
if source_column not in column_names:
verbose and logger.error(
f"[{brick_display_name}] Source column '{source_column}' not found"
)
raise ValueError(f"Source column '{source_column}' not found")
verbose and logger.info(
f"[{brick_display_name}] Duplicating column using DuckDB."
)
query = ""
if target_exists:
if not overwrite:
error_msg = f"Target column '{target_column}' already exists and overwrite is set to False."
verbose and logger.error(f"[{brick_display_name}] {error_msg}")
raise ValueError(error_msg)
else:
verbose and logger.info(
f"[{brick_display_name}] Target column '{target_column}' exists and will be overwritten in place."
)
query = f'SELECT * REPLACE ("{target_column}" AS "{source_column}") FROM temp_table'
else:
query = (
f'SELECT *, "{source_column}" AS "{target_column}" FROM temp_table'
)
if output_type == "pandas":
result = con.execute(query).df()
elif output_type == "polars":
result = con.execute(query).pl()
elif output_type == "arrow":
result = con.execute(query).fetch_arrow_table()
else:
verbose and logger.error(
f"Unsupported data type. Supported types: DataFrame, ArrowTable"
)
raise TypeError(
f"Unsupported data type. Supported types: DataFrame, ArrowTable"
)
verbose and logger.info(
f"[{brick_display_name}] Column '{source_column}' successfully duplicated to '{target_column}'."
)
except Exception as e:
verbose and logger.error(
f"[{brick_display_name}] Error duplicating column: {str(e)}"
)
raise
finally:
con.close()
return result
Brick Info
version
v0.1.3
python
3.10,
3.11,
3.12,
3.13
requirements
- pandas
- polars[pyarrow]
- duckdb
- pyarrow