Distinct Rows

Return distinct rows from a DataFrame or Arrow Table.

Distinct Rows

Processing

Returns a dataset containing only the distinct rows of the input data structure (pandas DataFrame, Polars DataFrame, or Arrow Table); duplicate rows are collapsed into a single row.

Inputs

data
The input data structure (pandas DataFrame, Polars DataFrame, or Arrow Table) from which distinct rows should be extracted.

Input Types

Input Types
data DataFrame, ArrowTable

You can check the list of supported types here: Available Type Hints.

Outputs

result
The resulting deduplicated data structure, in which each distinct row of the input appears exactly once. The format depends on the selected Output Format option.

Output Types

Output Types
result DataFrame, ArrowTable

You can check the list of supported types here: Available Type Hints.

Options

The Distinct Rows brick exposes the following configurable options:

Output Format
Specifies the desired format of the output data structure. Available choices are pandas (DataFrame), polars (DataFrame), or arrow (Arrow Table). Defaults to pandas.
Verbose
If enabled, detailed logging and status updates will be displayed during the execution of the distinct rows operation. Defaults to True.
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
from coded_flows.types import Union, DataFrame, ArrowTable, List

# Configure the root logger to emit records at INFO level and above, then
# create the module-level logger used by distinct_rows for its status and
# error messages.
# NOTE(review): basicConfig runs at import time and does nothing if the root
# logger already has handlers — confirm this is the intended behavior for the
# host application.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _coalesce(*values):
    return next((v for v in values if v is not None))


def _sanitize_identifier(identifier):
    """
    Sanitize SQL identifier by escaping special characters.
    Handles double quotes and other problematic characters.
    """
    return identifier.replace('"', '""')


def distinct_rows(
    data: Union[DataFrame, ArrowTable], options=None
) -> Union[DataFrame, ArrowTable]:
    """Return the distinct rows of ``data``.

    The input is registered as a view in an in-memory DuckDB connection and
    deduplicated with ``SELECT DISTINCT`` over all of its columns.

    Args:
        data: A pandas DataFrame, Polars DataFrame, or Arrow Table.
        options: Optional dict of settings:
            ``output_format`` -- ``"pandas"`` (default), ``"polars"`` or
            ``"arrow"``; selects the type of the returned object.
            ``verbose`` -- when true (default), progress and error messages
            are written to the module logger.

    Returns:
        The deduplicated rows, as the type selected by ``output_format``.
        NOTE(review): no ORDER BY is issued, so the row order of the result
        is whatever DuckDB produces -- presumably not guaranteed to match the
        input order; confirm before relying on it.

    Raises:
        ValueError: If ``data`` is not one of the supported types, or if
            ``output_format`` is not a supported format.
        Exception: Any error raised by DuckDB is logged (when verbose) and
            re-raised unchanged.
    """
    brick_display_name = "Distinct Rows"
    supported_formats = ("pandas", "polars", "arrow")
    options = options or {}
    verbose = options.get("verbose", True)
    output_format = options.get("output_format", "pandas")

    def _log(level, message):
        # All messages, including errors, are gated on the ``verbose`` option.
        # stacklevel=2 attributes the record to distinct_rows, not this helper.
        if verbose:
            logger.log(level, f"[{brick_display_name}] {message}", stacklevel=2)

    try:
        _log(logging.INFO, "Starting distinct rows operation.")

        if isinstance(data, pd.DataFrame):
            data_type = "pandas"
        elif isinstance(data, pl.DataFrame):
            data_type = "polars"
        elif isinstance(data, pa.Table):
            data_type = "arrow"
        else:
            _log(
                logging.ERROR,
                "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table.",
            )
            raise ValueError(
                "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
            )
        _log(logging.INFO, f"Detected input format: {data_type}.")

        # Fail fast: reject a bad output format before opening a connection
        # or running the query.
        if output_format not in supported_formats:
            _log(logging.ERROR, f"Unsupported output format: {output_format}")
            raise ValueError(f"Unsupported output format: {output_format}")

        conn = duckdb.connect(":memory:")
        try:
            conn.register("input_table", data)
            column_info = conn.execute("DESCRIBE input_table").fetchall()
            # The first field of each DESCRIBE row is the column name; quote
            # and escape it so names with special characters stay valid SQL.
            select_clause = ", ".join(
                f'"{_sanitize_identifier(col[0])}"' for col in column_info
            )
            _log(logging.INFO, "Getting distinct rows for all columns.")
            query = f"SELECT DISTINCT {select_clause} FROM input_table"
            _log(logging.INFO, "Executing query to get distinct rows.")

            if output_format == "pandas":
                result = conn.execute(query).df()
                _log(logging.INFO, "Converted result to pandas DataFrame.")
            elif output_format == "polars":
                result = conn.execute(query).pl()
                _log(logging.INFO, "Converted result to Polars DataFrame.")
            else:  # "arrow" -- the only remaining validated format
                result = conn.execute(query).fetch_arrow_table()
                _log(logging.INFO, "Converted result to Arrow Table.")
        finally:
            # Always release the connection, including when DuckDB raises.
            conn.close()

        _log(
            logging.INFO,
            f"Distinct rows operation completed successfully. Returned {len(result)} distinct rows.",
        )
    except Exception:
        _log(logging.ERROR, "Error during distinct rows operation.")
        raise
    return result

Brick Info

version v0.1.3
python 3.10, 3.11, 3.12, 3.13
requirements
  • pandas
  • polars[pyarrow]
  • duckdb
  • pyarrow