Join Data
Enrich one dataset with columns from another using various join types (inner, left, right, full outer, cross).
Category: Processing
This function enriches a primary dataset (left data) by combining it with columns from a lookup dataset (right data) based on common keys. Users can specify the type of join (inner, left, right, full outer, cross), map corresponding join columns, define which columns to keep from each source, and handle duplicate column names using suffixes.
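For orientation, here is a minimal usage sketch. It assumes the brick is callable as a plain Python function named join_data (as defined in the source below); the sample frames are illustrative:

```python
import pandas as pd

# Illustrative data: customers (left) enriched with order totals (right).
customers = pd.DataFrame({"id": [1, 2, 3], "name": ["Ada", "Ben", "Cleo"]})
orders = pd.DataFrame({"customer_id": [1, 1, 3], "total": [9.5, 12.0, 3.25]})

result = join_data(
    left_data=customers,
    right_data=orders,
    join_columns=[{"key": "id", "value": "customer_id"}],  # left -> right mapping
    join_type="left",
)
# result keeps id, name, total; customer_id is dropped because
# "Drop Right Join Columns" defaults to True. Ben has no orders, so his total is NaN.
```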
Inputs
- left data
- The primary dataset (left side of the join) to be enriched.
- right data
- The lookup dataset (right side of the join) containing supplementary data.
- join columns (optional)
- A list defining the columns used for matching records. This can be a list of simple column names (if the names match in both datasets) or a key-value list mapping left columns to right columns (e.g., left_key: right_key); see the sketch after this list. Required for all join types except cross joins.
- join type (optional)
- The type of join to perform (inner, left, right, full, or cross). Defaults to the value set in the options.
- left columns (optional)
- A list of specific column names to retain from the left dataset. If left empty, all columns from the left dataset are kept.
- right columns (optional)
- A list of specific column names to retain from the right dataset. If left empty, all columns from the right dataset are kept.
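The join columns input accepts both shapes, and they can be mixed in one list (the column names here are illustrative):

```python
# Same column name on both sides:
join_columns = ["customer_id"]

# Different names, mapped as {"key": <left column>, "value": <right column>}:
join_columns = [{"key": "id", "value": "customer_id"}]

# Composite keys: one entry per column pair, mixing both shapes.
join_columns = ["region", {"key": "id", "value": "customer_id"}]
```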
Input Types
| Input | Types |
|---|---|
| left data | DataFrame, ArrowTable |
| right data | DataFrame, ArrowTable |
| join columns | List |
| join type | Str |
| left columns | List |
| right columns | List |
You can check the list of supported types here: Available Type Hints.
Outputs
- result
- The resulting dataset after the join operation, containing merged and selected columns from both input datasets. The format depends on the Output Format option.
Output Types
| Output | Types |
|---|---|
| result | DataFrame, ArrowTable |
You can check the list of supported types here: Available Type Hints.
Options
The Join Data brick exposes the following configurable options:
- Join Type
- Determines how the datasets are combined. Available choices are inner, left, right, full (full outer), and cross. Defaults to left.
- Join Columns Mapping
- Specifies which columns in the left dataset match which columns in the right dataset. This uses key-value pair mapping, where the key is the left column and the value is the right column.
- Left Columns to Keep
- List of column names to retain from the left dataset. If this list is empty, all columns from the left dataset are included in the output.
- Right Columns to Keep
- List of column names to retain from the right dataset. If this list is empty, all columns from the right dataset are included in the output.
- Left Suffix for Duplicate
- A string suffix appended to columns originating from the left table if their names conflict with columns from the right table. Defaults to _left (see the example after this options list).
- Right Suffix for Duplicate
- A string suffix appended to columns originating from the right table if their names conflict with columns from the left table. Defaults to _right.
- Drop Right Join Columns
- If enabled (default is True), the columns used for matching keys in the right dataset will be excluded from the final output, as they are redundant.
- Output Format
- Specifies the desired format for the resulting dataset. Choices include pandas (DataFrame), polars (DataFrame), or arrow (Arrow Table). Defaults to pandas.
- Safe Mode
- If enabled, the operation will log a warning and skip non-existent columns specified in the join or column selection lists, rather than raising a critical error.
- Verbose
- If enabled, detailed logs of the execution steps, detected data types, and the final result shape are emitted.
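To illustrate the duplicate-name options: when both sides carry a column that is not a join key, the suffixes disambiguate it. A sketch, reusing the join_data function with illustrative data:

```python
import pandas as pd

left = pd.DataFrame({"id": [1], "name": ["Ada"]})
right = pd.DataFrame({"id": [1], "name": ["Lovelace"]})

result = join_data(left, right, join_columns=["id"], join_type="inner")
# Columns: id, name_left, name_right -- "id" appears only once because
# "Drop Right Join Columns" is enabled by default.
```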
Source Code
```python
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
from coded_flows.types import Union, List, DataFrame, ArrowTable, Str, Bool
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
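# Return the first value that is not None (lets explicit inputs override option defaults).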
def _coalesce(*values):
return next((v for v in values if v is not None), None)
def _sanitize_identifier(identifier):
"""
Sanitize SQL identifier by escaping special characters.
Handles double quotes and other problematic characters.
"""
return identifier.replace('"', '""')
def join_data(
left_data: Union[DataFrame, ArrowTable],
right_data: Union[DataFrame, ArrowTable],
join_columns: List = None,
join_type: Str = None,
left_columns: List = None,
right_columns: List = None,
options=None,
) -> Union[DataFrame, ArrowTable]:
brick_display_name = "Join Data"
options = options or {}
verbose = options.get("verbose", True)
join_type = _coalesce(join_type, options.get("join_type", "left"))
join_columns = _coalesce(join_columns, options.get("join_columns", []))
left_columns = _coalesce(left_columns, options.get("left_columns", []))
right_columns = _coalesce(right_columns, options.get("right_columns", []))
suffix_left = options.get("suffix_left", "_left")
suffix_right = options.get("suffix_right", "_right")
drop_right_join_columns = options.get("drop_right_join_columns", True)
output_format = options.get("output_format", "pandas")
safe_mode = options.get("safe_mode", False)
result = None
conn = None
try:
valid_join_types = ["inner", "left", "right", "full", "cross"]
join_type_lower = join_type.lower()
if join_type_lower not in valid_join_types:
verbose and logger.error(
f"[{brick_display_name}] Invalid join type '{join_type}'. Must be one of: {valid_join_types}"
)
raise ValueError(
f"Invalid join type '{join_type}'. Must be one of: {valid_join_types}"
)
join_type_map = {
"inner": "INNER JOIN",
"left": "LEFT JOIN",
"right": "RIGHT JOIN",
"full": "FULL OUTER JOIN",
"cross": "CROSS JOIN",
}
sql_join_type = join_type_map[join_type_lower]
if join_type_lower != "cross":
if (
not join_columns
or not isinstance(join_columns, list)
or len(join_columns) == 0
):
verbose and logger.error(
f"[{brick_display_name}] Join columns must be specified for {join_type} join!"
)
raise ValueError(
f"Join columns must be specified for {join_type} join!"
)
verbose and logger.info(
f"[{brick_display_name}] Starting {join_type} join operation."
)
left_data_type = None
right_data_type = None
if isinstance(left_data, pd.DataFrame):
left_data_type = "pandas"
elif isinstance(left_data, pl.DataFrame):
left_data_type = "polars"
elif isinstance(left_data, (pa.Table, pa.lib.Table)):
left_data_type = "arrow"
if isinstance(right_data, pd.DataFrame):
right_data_type = "pandas"
elif isinstance(right_data, pl.DataFrame):
right_data_type = "polars"
elif isinstance(right_data, (pa.Table, pa.lib.Table)):
right_data_type = "arrow"
if left_data_type is None or right_data_type is None:
verbose and logger.error(
f"[{brick_display_name}] Both datasets must be pandas DataFrame, Polars DataFrame, or Arrow Table"
)
raise ValueError(
"Both datasets must be pandas DataFrame, Polars DataFrame, or Arrow Table"
)
verbose and logger.info(
f"[{brick_display_name}] Detected left format: {left_data_type}, right format: {right_data_type}."
)
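        # Register both inputs with an in-memory DuckDB connection;
        # pandas, Polars, and Arrow tables are queryable in place.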
conn = duckdb.connect(":memory:")
conn.register("left_table", left_data)
conn.register("right_table", right_data)
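        # DESCRIBE yields one row per column (name, type, ...); build name -> type maps for validation.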
left_column_info = conn.execute("DESCRIBE left_table").fetchall()
right_column_info = conn.execute("DESCRIBE right_table").fetchall()
left_all_columns = {col[0]: col[1] for col in left_column_info}
right_all_columns = {col[0]: col[1] for col in right_column_info}
verbose and logger.info(
f"[{brick_display_name}] Left table columns: {len(left_all_columns)}, Right table columns: {len(right_all_columns)}."
)
join_conditions = []
left_join_cols = []
right_join_cols = []
if join_type_lower != "cross":
for join_spec in join_columns:
if isinstance(join_spec, dict):
left_col = join_spec.get("key", "")
right_col = join_spec.get("value", "")
else:
left_col = join_spec
right_col = join_spec
if not left_col or not right_col:
continue
if left_col not in left_all_columns:
if safe_mode:
verbose and logger.warning(
f"[{brick_display_name}] Safe mode: Skipping non-existent left column '{left_col}'."
)
continue
else:
verbose and logger.error(
f"[{brick_display_name}] Left join column '{left_col}' not found in left dataset!"
)
raise ValueError(
f"Left join column '{left_col}' not found in left dataset!"
)
if right_col not in right_all_columns:
if safe_mode:
verbose and logger.warning(
f"[{brick_display_name}] Safe mode: Skipping non-existent right column '{right_col}'."
)
continue
else:
verbose and logger.error(
f"[{brick_display_name}] Right join column '{right_col}' not found in right dataset!"
)
raise ValueError(
f"Right join column '{right_col}' not found in right dataset!"
)
sanitized_left = _sanitize_identifier(left_col)
sanitized_right = _sanitize_identifier(right_col)
join_conditions.append(
f'left_table."{sanitized_left}" = right_table."{sanitized_right}"'
)
left_join_cols.append(left_col)
right_join_cols.append(right_col)
if not join_conditions:
verbose and logger.error(
f"[{brick_display_name}] No valid join columns found!"
)
raise ValueError("No valid join columns found!")
verbose and logger.info(
f"[{brick_display_name}] Join conditions: {len(join_conditions)} column pair(s)."
)
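        # Resolve the left-side projection; an empty selection list means keep every column.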
left_cols_to_select = []
if left_columns and isinstance(left_columns, list) and (len(left_columns) > 0):
for col in left_columns:
if col not in left_all_columns:
if safe_mode:
verbose and logger.warning(
f"[{brick_display_name}] Safe mode: Skipping non-existent left column '{col}'."
)
continue
else:
verbose and logger.error(
f"[{brick_display_name}] Left column '{col}' not found!"
)
raise ValueError(f"Left column '{col}' not found!")
left_cols_to_select.append(col)
else:
left_cols_to_select = list(left_all_columns.keys())
verbose and logger.info(
f"[{brick_display_name}] Selecting {len(left_cols_to_select)} columns from left table."
)
right_cols_to_select = []
if (
right_columns
and isinstance(right_columns, list)
and (len(right_columns) > 0)
):
for col in right_columns:
if col not in right_all_columns:
if safe_mode:
verbose and logger.warning(
f"[{brick_display_name}] Safe mode: Skipping non-existent right column '{col}'."
)
continue
else:
verbose and logger.error(
f"[{brick_display_name}] Right column '{col}' not found!"
)
raise ValueError(f"Right column '{col}' not found!")
right_cols_to_select.append(col)
else:
right_cols_to_select = list(right_all_columns.keys())
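        # The right-side join keys duplicate the left-side keys, so drop them by default.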
if drop_right_join_columns and join_type_lower != "cross":
right_cols_to_select = [
col for col in right_cols_to_select if col not in right_join_cols
]
verbose and logger.info(
f"[{brick_display_name}] Selecting {len(right_cols_to_select)} columns from right table."
)
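        # Build the SELECT list, suffixing any column name that appears on both sides
        # (join keys keep their original names).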
select_parts = []
for col in left_cols_to_select:
sanitized_col = _sanitize_identifier(col)
if col in right_cols_to_select and col not in left_join_cols:
sanitized_suffix = _sanitize_identifier(col + suffix_left)
select_parts.append(
f'left_table."{sanitized_col}" AS "{sanitized_suffix}"'
)
else:
select_parts.append(f'left_table."{sanitized_col}"')
for col in right_cols_to_select:
sanitized_col = _sanitize_identifier(col)
if col in left_cols_to_select and col not in right_join_cols:
sanitized_suffix = _sanitize_identifier(col + suffix_right)
select_parts.append(
f'right_table."{sanitized_col}" AS "{sanitized_suffix}"'
)
else:
select_parts.append(f'right_table."{sanitized_col}"')
select_clause = ", ".join(select_parts)
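        # Cross joins take no ON clause; every other join type requires one.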
        if join_type_lower == "cross":
            query = f"""
            SELECT {select_clause}
            FROM left_table
            {sql_join_type} right_table
            """
        else:
            on_clause = " AND ".join(join_conditions)
            query = f"""
            SELECT {select_clause}
            FROM left_table
            {sql_join_type} right_table
            ON {on_clause}
            """
verbose and logger.info(
f"[{brick_display_name}] Executing {join_type} join query."
)
if output_format == "pandas":
result = conn.execute(query).df()
verbose and logger.info(
f"[{brick_display_name}] Converted result to pandas DataFrame. Shape: {result.shape}"
)
elif output_format == "polars":
result = conn.execute(query).pl()
verbose and logger.info(
f"[{brick_display_name}] Converted result to Polars DataFrame. Shape: {result.shape}"
)
elif output_format == "arrow":
result = conn.execute(query).fetch_arrow_table()
verbose and logger.info(
f"[{brick_display_name}] Converted result to Arrow Table. Rows: {result.num_rows}"
)
else:
verbose and logger.error(
f"[{brick_display_name}] Unsupported output format: {output_format}"
)
raise ValueError(f"Unsupported output format: {output_format}")
verbose and logger.info(
f"[{brick_display_name}] Join operation completed successfully."
)
except Exception as e:
verbose and logger.error(
f"[{brick_display_name}] Error during join operation: {str(e)}"
)
raise
finally:
if conn is not None:
conn.close()
    return result
```
Brick Info
Required packages:
- pandas
- polars[pyarrow]
- duckdb
- pyarrow