Join Data

Enrich one dataset with columns from another using various join types (inner, left, right, full outer, cross).

Processing

This function enriches a primary dataset (left data) by combining it with columns from a lookup dataset (right data) based on common keys. Users can specify the type of join (inner, left, right, full outer, cross), map corresponding join columns, define which columns to keep from each source, and handle duplicate column names using suffixes.
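
To make the effect of the join type concrete, here is a rough pure-Python sketch of how inner, left, right, and full joins differ in which rows survive. It is an illustration only, not the brick's implementation (the brick delegates the join to DuckDB). The helper name join_rows and the sample data are hypothetical, the two inputs are assumed to share a single key name and have no other column names in common, and cross joins are not covered.

```python
def join_rows(left, right, key, how="left"):
    """Join two lists of dicts on one shared key name (illustration only)."""
    if how not in ("inner", "left", "right", "full"):
        raise ValueError(f"unsupported join type: {how}")
    # Non-key columns, used to pad the missing side with None.
    left_cols = [c for c in (left[0] if left else {}) if c != key]
    right_cols = [c for c in (right[0] if right else {}) if c != key]
    out, matched_right = [], set()
    for lrow in left:
        hits = [i for i, rrow in enumerate(right) if rrow[key] == lrow[key]]
        matched_right.update(hits)
        for i in hits:
            out.append({**lrow, **right[i]})
        # Left and full joins keep left rows that found no match.
        if not hits and how in ("left", "full"):
            out.append({**lrow, **{c: None for c in right_cols}})
    # Right and full joins keep right rows that no left row matched.
    if how in ("right", "full"):
        for i, rrow in enumerate(right):
            if i not in matched_right:
                out.append({**{c: None for c in left_cols}, **rrow})
    return out


left = [{"id": 1, "name": "Ada"}, {"id": 2, "name": "Lin"}]
right = [{"id": 2, "city": "Oslo"}, {"id": 3, "city": "Rome"}]

inner_rows = join_rows(left, right, "id", "inner")  # only id 2
left_rows = join_rows(left, right, "id", "left")    # ids 1 and 2
full_rows = join_rows(left, right, "id", "full")    # ids 1, 2 and 3
```

With this data, the left join keeps the row for id 1 with city set to None, which is the "enrichment" behavior described above: every primary row is preserved whether or not the lookup has a match.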

Inputs

left data
The primary dataset (left side of the join) to be enriched.
right data
The lookup dataset (right side of the join) containing supplementary data.
join columns (optional)
A list defining the columns used to match records. Each entry is either a column name shared by both datasets or a key-value pair mapping a left column to a right column (e.g., left_key: right_key); in the code below, a pair is a dictionary with the entries key (left column) and value (right column). Required for all joins except cross joins.
join type (optional)
The type of join to perform: inner, left, right, full, or cross. If not provided, the Join Type option is used.
left columns (optional)
A list of specific column names to retain from the left dataset. If left empty, all columns from the left dataset are kept.
right columns (optional)
A list of specific column names to retain from the right dataset. If left empty, all columns from the right dataset are kept.
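
The two accepted shapes of a join columns entry can be sketched as follows. This mirrors how the brick's code reads each entry (a plain name, or a dictionary with key and value), but the helper name normalize_join_columns and the sample column names are hypothetical, and the sketch leaves out the existence checks the brick performs against the actual datasets.

```python
def normalize_join_columns(join_columns):
    """Turn a mixed list of join specs into (left_col, right_col) pairs."""
    pairs = []
    for spec in join_columns:
        if isinstance(spec, dict):
            # Mapping form: "key" names the left column, "value" the right one.
            left_col = spec.get("key", "")
            right_col = spec.get("value", "")
        else:
            # Simple form: the same column name is used on both sides.
            left_col = right_col = spec
        if left_col and right_col:  # incomplete entries are ignored
            pairs.append((left_col, right_col))
    return pairs


specs = [
    "country",                              # same name in both datasets
    {"key": "customer_id", "value": "id"},  # left customer_id matches right id
    {"key": "", "value": "x"},              # incomplete, skipped
]
pairs = normalize_join_columns(specs)
# pairs == [("country", "country"), ("customer_id", "id")]
```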

Input Types

Input            Types
left data        DataFrame, ArrowTable
right data       DataFrame, ArrowTable
join columns     List
join type        Str
left columns     List
right columns    List

You can check the list of supported types here: Available Type Hints.

Outputs

result
The resulting dataset after the join operation, containing merged and selected columns from both input datasets. The format depends on the Output Format option.

Output Types

Output    Types
result    DataFrame, ArrowTable

You can check the list of supported types here: Available Type Hints.

Options

The Join Data brick exposes the following options:

Join Type
Determines how the datasets are combined. Available choices are: inner, left, right, full (full outer), and cross. Defaults to left.
Join Columns Mapping
Specifies which columns in the left dataset match which columns in the right dataset. This uses key-value pair mapping, where the key is the left column and the value is the right column.
Left Columns to Keep
List of column names to retain from the left dataset. If this list is empty, all columns from the left dataset are included in the output.
Right Columns to Keep
List of column names to retain from the right dataset. If this list is empty, all columns from the right dataset are included in the output.
Left Suffix for Duplicate
A string suffix appended to columns originating from the left table if their names conflict with columns from the right table. Defaults to _left.
Right Suffix for Duplicate
A string suffix appended to columns originating from the right table if their names conflict with columns from the left table. Defaults to _right.
Drop Right Join Columns
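If enabled (the default), the right dataset's join key columns are left out of the output, because for matched rows they repeat the values of the left key columns. In right and full joins, rows that exist only in the right dataset then show no key value, since the left key columns are null for them. This option has no effect on cross joins.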
Output Format
Specifies the desired format for the resulting dataset. Choices include pandas (DataFrame), polars (DataFrame), or arrow (Arrow Table). Defaults to pandas.
Safe Mode
If enabled, columns named in the join or column selection lists that do not exist in the data are skipped instead of raising an error (a warning is logged when Verbose is on). If no valid join column remains, the join still fails. Disabled by default.
Verbose
If enabled, the brick logs its execution steps, the detected input formats, and the shape of the result. Enabled by default.
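
How the two suffix options and Drop Right Join Columns interact can be sketched as a small naming function. This is a reading of the brick's select-list logic, not a separate API: the helper name output_column_names and the sample columns are hypothetical, and pairs is assumed to be a list of (left key, right key) tuples.

```python
def output_column_names(left_cols, right_cols, pairs,
                        suffix_left="_left", suffix_right="_right",
                        drop_right_join_columns=True):
    """Return the output column names in order: left columns, then right."""
    left_keys = [lk for lk, _ in pairs]
    right_keys = [rk for _, rk in pairs]
    if drop_right_join_columns:
        # Right key columns are removed before clashes are evaluated.
        right_cols = [c for c in right_cols if c not in right_keys]
    names = []
    for c in left_cols:
        # A left column is suffixed only if it clashes and is not a join key.
        clash = c in right_cols and c not in left_keys
        names.append(c + suffix_left if clash else c)
    for c in right_cols:
        clash = c in left_cols and c not in right_keys
        names.append(c + suffix_right if clash else c)
    return names


left_cols = ["id", "name", "updated_at"]
right_cols = ["id", "city", "updated_at"]
pairs = [("id", "id")]
names = output_column_names(left_cols, right_cols, pairs)
# names == ["id", "name", "updated_at_left", "city", "updated_at_right"]
```

Under this reading, join keys are never suffixed. One consequence is that turning Drop Right Join Columns off while both key columns share a name selects two columns called id; how the chosen output format presents such duplicates is not covered here.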
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
from coded_flows.types import Union, List, DataFrame, ArrowTable, Str, Bool

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _coalesce(*values):
    return next((v for v in values if v is not None), None)


def _sanitize_identifier(identifier):
    """
    Escape an identifier for use inside a double-quoted SQL name.
    Only embedded double quotes are escaped (by doubling them); the caller
    adds the surrounding quotes.
    """
    return identifier.replace('"', '""')


def join_data(
    left_data: Union[DataFrame, ArrowTable],
    right_data: Union[DataFrame, ArrowTable],
    join_columns: List = None,
    join_type: Str = None,
    left_columns: List = None,
    right_columns: List = None,
    options=None,
) -> Union[DataFrame, ArrowTable]:
    brick_display_name = "Join Data"
    options = options or {}
    verbose = options.get("verbose", True)
    join_type = _coalesce(join_type, options.get("join_type", "left"))
    join_columns = _coalesce(join_columns, options.get("join_columns", []))
    left_columns = _coalesce(left_columns, options.get("left_columns", []))
    right_columns = _coalesce(right_columns, options.get("right_columns", []))
    suffix_left = options.get("suffix_left", "_left")
    suffix_right = options.get("suffix_right", "_right")
    drop_right_join_columns = options.get("drop_right_join_columns", True)
    output_format = options.get("output_format", "pandas")
    safe_mode = options.get("safe_mode", False)
    result = None
    conn = None
    try:
        valid_join_types = ["inner", "left", "right", "full", "cross"]
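        # str() keeps a non-string value from raising AttributeError here;
        # such a value then fails the membership check below with a clear message.
        join_type_lower = str(join_type).lower()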
        if join_type_lower not in valid_join_types:
            verbose and logger.error(
                f"[{brick_display_name}] Invalid join type '{join_type}'. Must be one of: {valid_join_types}"
            )
            raise ValueError(
                f"Invalid join type '{join_type}'. Must be one of: {valid_join_types}"
            )
        join_type_map = {
            "inner": "INNER JOIN",
            "left": "LEFT JOIN",
            "right": "RIGHT JOIN",
            "full": "FULL OUTER JOIN",
            "cross": "CROSS JOIN",
        }
        sql_join_type = join_type_map[join_type_lower]
        if join_type_lower != "cross":
            # An empty, missing, or non-list value means there are no usable join keys.
            if not join_columns or not isinstance(join_columns, list):
                verbose and logger.error(
                    f"[{brick_display_name}] Join columns must be specified for {join_type} join!"
                )
                raise ValueError(
                    f"Join columns must be specified for {join_type} join!"
                )
        verbose and logger.info(
            f"[{brick_display_name}] Starting {join_type} join operation."
        )
        left_data_type = None
        right_data_type = None
        if isinstance(left_data, pd.DataFrame):
            left_data_type = "pandas"
        elif isinstance(left_data, pl.DataFrame):
            left_data_type = "polars"
        elif isinstance(left_data, (pa.Table, pa.lib.Table)):
            left_data_type = "arrow"
        if isinstance(right_data, pd.DataFrame):
            right_data_type = "pandas"
        elif isinstance(right_data, pl.DataFrame):
            right_data_type = "polars"
        elif isinstance(right_data, (pa.Table, pa.lib.Table)):
            right_data_type = "arrow"
        if left_data_type is None or right_data_type is None:
            verbose and logger.error(
                f"[{brick_display_name}] Both datasets must be pandas DataFrame, Polars DataFrame, or Arrow Table"
            )
            raise ValueError(
                "Both datasets must be pandas DataFrame, Polars DataFrame, or Arrow Table"
            )
        verbose and logger.info(
            f"[{brick_display_name}] Detected left format: {left_data_type}, right format: {right_data_type}."
        )
        # Both inputs are registered with an in-memory DuckDB connection so the join runs as SQL.
        conn = duckdb.connect(":memory:")
        conn.register("left_table", left_data)
        conn.register("right_table", right_data)
        left_column_info = conn.execute("DESCRIBE left_table").fetchall()
        right_column_info = conn.execute("DESCRIBE right_table").fetchall()
        left_all_columns = {col[0]: col[1] for col in left_column_info}
        right_all_columns = {col[0]: col[1] for col in right_column_info}
        verbose and logger.info(
            f"[{brick_display_name}] Left table columns: {len(left_all_columns)}, Right table columns: {len(right_all_columns)}."
        )
        join_conditions = []
        left_join_cols = []
        right_join_cols = []
        if join_type_lower != "cross":
            for join_spec in join_columns:
                if isinstance(join_spec, dict):
                    left_col = join_spec.get("key", "")
                    right_col = join_spec.get("value", "")
                else:
                    left_col = join_spec
                    right_col = join_spec
                if not left_col or not right_col:
                    continue
                if left_col not in left_all_columns:
                    if safe_mode:
                        verbose and logger.warning(
                            f"[{brick_display_name}] Safe mode: Skipping non-existent left column '{left_col}'."
                        )
                        continue
                    else:
                        verbose and logger.error(
                            f"[{brick_display_name}] Left join column '{left_col}' not found in left dataset!"
                        )
                        raise ValueError(
                            f"Left join column '{left_col}' not found in left dataset!"
                        )
                if right_col not in right_all_columns:
                    if safe_mode:
                        verbose and logger.warning(
                            f"[{brick_display_name}] Safe mode: Skipping non-existent right column '{right_col}'."
                        )
                        continue
                    else:
                        verbose and logger.error(
                            f"[{brick_display_name}] Right join column '{right_col}' not found in right dataset!"
                        )
                        raise ValueError(
                            f"Right join column '{right_col}' not found in right dataset!"
                        )
                sanitized_left = _sanitize_identifier(left_col)
                sanitized_right = _sanitize_identifier(right_col)
                join_conditions.append(
                    f'left_table."{sanitized_left}" = right_table."{sanitized_right}"'
                )
                left_join_cols.append(left_col)
                right_join_cols.append(right_col)
            if not join_conditions:
                verbose and logger.error(
                    f"[{brick_display_name}] No valid join columns found!"
                )
                raise ValueError("No valid join columns found!")
            verbose and logger.info(
                f"[{brick_display_name}] Join conditions: {len(join_conditions)} column pair(s)."
            )
        left_cols_to_select = []
        if left_columns and isinstance(left_columns, list) and (len(left_columns) > 0):
            for col in left_columns:
                if col not in left_all_columns:
                    if safe_mode:
                        verbose and logger.warning(
                            f"[{brick_display_name}] Safe mode: Skipping non-existent left column '{col}'."
                        )
                        continue
                    else:
                        verbose and logger.error(
                            f"[{brick_display_name}] Left column '{col}' not found!"
                        )
                        raise ValueError(f"Left column '{col}' not found!")
                left_cols_to_select.append(col)
        else:
            left_cols_to_select = list(left_all_columns.keys())
        verbose and logger.info(
            f"[{brick_display_name}] Selecting {len(left_cols_to_select)} columns from left table."
        )
        right_cols_to_select = []
        if (
            right_columns
            and isinstance(right_columns, list)
            and (len(right_columns) > 0)
        ):
            for col in right_columns:
                if col not in right_all_columns:
                    if safe_mode:
                        verbose and logger.warning(
                            f"[{brick_display_name}] Safe mode: Skipping non-existent right column '{col}'."
                        )
                        continue
                    else:
                        verbose and logger.error(
                            f"[{brick_display_name}] Right column '{col}' not found!"
                        )
                        raise ValueError(f"Right column '{col}' not found!")
                right_cols_to_select.append(col)
        else:
            right_cols_to_select = list(right_all_columns.keys())
        if drop_right_join_columns and join_type_lower != "cross":
            right_cols_to_select = [
                col for col in right_cols_to_select if col not in right_join_cols
            ]
        verbose and logger.info(
            f"[{brick_display_name}] Selecting {len(right_cols_to_select)} columns from right table."
        )
        # Build the SELECT list: a non-key column present on both sides is aliased with its side's suffix.
        select_parts = []
        for col in left_cols_to_select:
            sanitized_col = _sanitize_identifier(col)
            if col in right_cols_to_select and col not in left_join_cols:
                sanitized_suffix = _sanitize_identifier(col + suffix_left)
                select_parts.append(
                    f'left_table."{sanitized_col}" AS "{sanitized_suffix}"'
                )
            else:
                select_parts.append(f'left_table."{sanitized_col}"')
        for col in right_cols_to_select:
            sanitized_col = _sanitize_identifier(col)
            if col in left_cols_to_select and col not in right_join_cols:
                sanitized_suffix = _sanitize_identifier(col + suffix_right)
                select_parts.append(
                    f'right_table."{sanitized_col}" AS "{sanitized_suffix}"'
                )
            else:
                select_parts.append(f'right_table."{sanitized_col}"')
        select_clause = ", ".join(select_parts)
        if join_type_lower == "cross":
            query = f"\n            SELECT {select_clause}\n            FROM left_table\n            {sql_join_type} right_table\n            "
        else:
            on_clause = " AND ".join(join_conditions)
            query = f"\n            SELECT {select_clause}\n            FROM left_table\n            {sql_join_type} right_table\n            ON {on_clause}\n            "
        verbose and logger.info(
            f"[{brick_display_name}] Executing {join_type} join query."
        )
        if output_format == "pandas":
            result = conn.execute(query).df()
            verbose and logger.info(
                f"[{brick_display_name}] Converted result to pandas DataFrame. Shape: {result.shape}"
            )
        elif output_format == "polars":
            result = conn.execute(query).pl()
            verbose and logger.info(
                f"[{brick_display_name}] Converted result to Polars DataFrame. Shape: {result.shape}"
            )
        elif output_format == "arrow":
            result = conn.execute(query).fetch_arrow_table()
            verbose and logger.info(
                f"[{brick_display_name}] Converted result to Arrow Table. Rows: {result.num_rows}"
            )
        else:
            verbose and logger.error(
                f"[{brick_display_name}] Unsupported output format: {output_format}"
            )
            raise ValueError(f"Unsupported output format: {output_format}")
        verbose and logger.info(
            f"[{brick_display_name}] Join operation completed successfully."
        )
    except Exception as e:
        verbose and logger.error(
            f"[{brick_display_name}] Error during join operation: {str(e)}"
        )
        raise
    finally:
        if conn is not None:
            conn.close()
    return result
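
The shape of the SQL statement the function assembles can be tried out in isolation. The sketch below uses Python's built-in sqlite3 module as a stand-in for DuckDB (an assumption made only so the example runs without extra packages); the table contents, the column names, and the helper quote_ident are hypothetical. It shows the double-quote escaping and the SELECT ... LEFT JOIN ... ON layout with the right key column dropped.

```python
import sqlite3


def quote_ident(name):
    """Double any embedded double quotes, then wrap the name in double quotes."""
    return '"' + name.replace('"', '""') + '"'


conn = sqlite3.connect(":memory:")
conn.execute('CREATE TABLE left_table ("order id" INTEGER, "customer" INTEGER)')
conn.execute('CREATE TABLE right_table ("id" INTEGER, "city" TEXT)')
conn.executemany("INSERT INTO left_table VALUES (?, ?)", [(100, 1), (101, 2)])
conn.executemany("INSERT INTO right_table VALUES (?, ?)", [(1, "Oslo")])

# Left columns first, then the right columns that are not join keys.
select_clause = ", ".join([
    f"left_table.{quote_ident('order id')}",
    f"left_table.{quote_ident('customer')}",
    f"right_table.{quote_ident('city')}",
])
on_clause = f"left_table.{quote_ident('customer')} = right_table.{quote_ident('id')}"
query = (
    f"SELECT {select_clause} "
    f"FROM left_table LEFT JOIN right_table ON {on_clause}"
)

rows = conn.execute(query).fetchall()
conn.close()
# Each row is (order id, customer, city); customer 2 has no match, so its city is None.
```

Quoting every identifier is what lets a column such as order id, which contains a space, pass through the generated query unchanged.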

Brick Info

version v0.1.3
python 3.10, 3.11, 3.12, 3.13
requirements
  • pandas
  • polars[pyarrow]
  • duckdb
  • pyarrow