Flag Rows by Date

Flag rows from the dataset using a date filter defined by a fixed date range, relative date range, or matching date part. Creates a new column containing '1' for matching rows and NULL for all others.

Flag Rows by Date

Processing

Flag rows from the dataset using a date filter defined by a fixed date range, relative date range, or matching date part. Rows matching the specified criteria are flagged with '1', while non-matching rows or rows with invalid date values are flagged with NULL.
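
For illustration, the CASE expression the brick builds for a fixed date range behaves like the sketch below (the table, column name, and dates are hypothetical, but the SQL mirrors what the brick code generates internally):

import duckdb
import pandas as pd

df = pd.DataFrame({"order_date": ["2024-02-10", "2023-12-01", "not a date"]})
conn = duckdb.connect(":memory:")
conn.register("input_table", df)
query = """
SELECT *,
       CASE WHEN TRY_CAST("order_date" AS DATE) IS NOT NULL
                 AND TRY_CAST("order_date" AS DATE) >= DATE '2024-01-01'
                 AND TRY_CAST("order_date" AS DATE) <= DATE '2024-03-31'
            THEN '1' ELSE NULL END AS date_match_flag
FROM input_table
"""
print(conn.execute(query).df())
# order_date  date_match_flag
# 2024-02-10  '1'   (in range)
# 2023-12-01  NULL  (out of range)
# not a date  NULL  (unparseable)
conn.close()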

Inputs

data
The input dataset (Pandas DataFrame, Polars DataFrame, or PyArrow Table) to which the flag column will be added.
flag column name (optional)
The name of the new column that will contain the date match flag (1 or NULL).
date column (optional)
The name of the column in the dataset containing the date values used for filtering.
filter type (optional)
Defines the type of date filter to apply: date_range (fixed dates), relative_range (relative to today), or date_part (component matching).
start date (optional)
The starting date (YYYY-MM-DD, inclusive) for the fixed date range filter.
end date (optional)
The ending date (YYYY-MM-DD, inclusive) for the fixed date range filter.
relative type (optional)
Specifies the direction or context for relative filtering (last_n, next_n, current, or until_now).
relative value (optional)
The numerical value (N) used in conjunction with relative unit for relative filtering.
relative unit (optional)
The time unit (e.g., days, months, years) used with relative value for relative filtering.
date part type (optional)
The date component (year, month, day, etc.) to extract and check when using the date_part filter.
date part values (optional)
A list of specific values that the extracted date part must match.

Input Types

Input Types
data DataFrame, ArrowTable
flag column name Str
date column Str
filter type Str
start date Str
end date Str
relative type Str
relative value Int
relative unit Str
date part type Str
date part values List

You can check the list of supported types here: Available Type Hints.

Outputs

result
The output dataset, which is the original data augmented with the new flag column (flag column name).

Output Types

Output Types
result DataFrame, ArrowTable

You can check the list of supported types here: Available Type Hints.

Options

The Flag Rows by Date brick exposes the following configurable options:

Flag Column Name
The name of the new column that will store the flag result. Defaults to date_match_flag.
Date Column
The name of the column in the input data containing the date values to filter against.
Filter On
Defines the type of filtering to be applied: date_range (fixed dates), relative_range (relative to today), or date_part (matching specific components). Defaults to date_range.
Start Date (YYYY-MM-DD)
The fixed starting date for the date range filter.
End Date (YYYY-MM-DD)
The fixed ending date for the date range filter.
Relative Type
Specifies the direction or scope for relative filtering: last_n, next_n, current, or until_now. Defaults to last_n.
Relative Value (N)
The numerical quantity (N) defining the extent of the relative range. Defaults to 7.
Relative Unit
The unit of time used for relative filtering: days, weeks, months, quarters, or years. Defaults to days.
Date Part Type
The specific date component to match when using the date_part filter: year, quarter, month, day, day_of_week, day_of_year, or week. Defaults to month.
Date Part Values
A list of numerical values that the extracted date part must match (e.g., [1, 2] for January and February).
Output Format
Specifies the desired format of the output dataset: pandas, polars, or arrow. Defaults to pandas.
Verbose
If enabled, detailed execution logs and information are emitted during processing.
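
As a sketch, the three filter modes map to option sets like these (values are hypothetical; the keys match the option names read by the brick code below):

range_opts = {
    "date_column": "order_date",
    "filter_type": "date_range",
    "start_date": "2024-01-01",
    "end_date": "2024-03-31",
}
relative_opts = {
    "date_column": "order_date",
    "filter_type": "relative_range",
    "relative_type": "last_n",
    "relative_value": 30,
    "relative_unit": "days",
}
part_opts = {
    "date_column": "order_date",
    "filter_type": "date_part",
    "date_part_type": "month",
    "date_part_values": [1, 2, 3],  # January through March
}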

Brick Code

import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
from coded_flows.types import Union, List, DataFrame, ArrowTable, Str, Int

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _coalesce(*values):
    return next((v for v in values if v is not None), None)


def _sanitize_identifier(identifier):
    """
    Sanitize SQL identifier by escaping special characters.
    """
    return identifier.replace('"', '""')


def _build_date_range_condition(column_expr, start_date, end_date):
    """Build SQL condition for fixed date range filter."""
    conditions = []
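    # TRY_CAST yields NULL for values that cannot be parsed as DATE,
    # so rows with invalid dates never satisfy the filter.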
    date_expr = f"TRY_CAST({column_expr} AS DATE)"
    conditions.append(f"{date_expr} IS NOT NULL")
    if start_date:
        conditions.append(f"{date_expr} >= DATE '{start_date}'")
    if end_date:
        conditions.append(f"{date_expr} <= DATE '{end_date}'")
    return f"({' AND '.join(conditions)})"


def _build_relative_range_condition(
    column_expr, relative_type, relative_value, relative_unit
):
    """Build SQL condition for relative date range filter."""
    date_expr = f"TRY_CAST({column_expr} AS DATE)"
    current_date = "CURRENT_DATE"
    if relative_unit == "days":
        interval = f"INTERVAL '{relative_value}' DAY"
    elif relative_unit == "weeks":
        interval = f"INTERVAL '{relative_value}' WEEK"
    elif relative_unit == "months":
        interval = f"INTERVAL '{relative_value}' MONTH"
    elif relative_unit == "quarters":
        interval = f"INTERVAL '{relative_value * 3}' MONTH"
    elif relative_unit == "years":
        interval = f"INTERVAL '{relative_value}' YEAR"
    else:
        interval = f"INTERVAL '{relative_value}' DAY"
    if relative_type == "last_n":
        condition = f"({date_expr} IS NOT NULL AND {date_expr} >= {current_date} - {interval} AND {date_expr} <= {current_date})"
    elif relative_type == "next_n":
        condition = f"({date_expr} IS NOT NULL AND {date_expr} >= {current_date} AND {date_expr} <= {current_date} + {interval})"
    elif relative_type == "current":
        if relative_unit == "days":
            condition = f"({date_expr} IS NOT NULL AND {date_expr} = {current_date})"
        elif relative_unit == "weeks":
            condition = f"({date_expr} IS NOT NULL AND date_trunc('week', {date_expr}) = date_trunc('week', {current_date}))"
        elif relative_unit == "months":
            condition = f"({date_expr} IS NOT NULL AND date_trunc('month', {date_expr}) = date_trunc('month', {current_date}))"
        elif relative_unit == "quarters":
            condition = f"({date_expr} IS NOT NULL AND date_trunc('quarter', {date_expr}) = date_trunc('quarter', {current_date}))"
        elif relative_unit == "years":
            condition = f"({date_expr} IS NOT NULL AND date_trunc('year', {date_expr}) = date_trunc('year', {current_date}))"
        else:
            condition = f"({date_expr} IS NOT NULL AND {date_expr} = {current_date})"
    elif relative_type == "until_now":
        condition = f"({date_expr} IS NOT NULL AND {date_expr} <= {current_date})"
    else:
        condition = f"{date_expr} IS NOT NULL"
    return condition


def _build_date_part_condition(column_expr, date_part_type, date_part_values):
    """Build SQL condition for date part filter."""
    if not date_part_values:
        return "FALSE"
    date_expr = f"TRY_CAST({column_expr} AS DATE)"
    if date_part_type == "year":
        part_expr = f"EXTRACT(YEAR FROM {date_expr})"
    elif date_part_type == "quarter":
        part_expr = f"EXTRACT(QUARTER FROM {date_expr})"
    elif date_part_type == "month":
        part_expr = f"EXTRACT(MONTH FROM {date_expr})"
    elif date_part_type == "day":
        part_expr = f"EXTRACT(DAY FROM {date_expr})"
    elif date_part_type == "day_of_week":
        part_expr = f"EXTRACT(DOW FROM {date_expr})"
    elif date_part_type == "day_of_year":
        part_expr = f"EXTRACT(DOY FROM {date_expr})"
    elif date_part_type == "week":
        part_expr = f"EXTRACT(WEEK FROM {date_expr})"
    else:
        part_expr = f"EXTRACT(YEAR FROM {date_expr})"
    try:
        int_values = [int(v) for v in date_part_values]
        values_list = ", ".join((str(v) for v in int_values))
        condition = f"({date_expr} IS NOT NULL AND {part_expr} IN ({values_list}))"
    except (ValueError, TypeError):
        condition = "FALSE"
    return condition


def flag_rows_by_date(
    data: Union[DataFrame, ArrowTable],
    flag_column_name: Str = None,
    date_column: Str = None,
    filter_type: Str = None,
    start_date: Str = None,
    end_date: Str = None,
    relative_type: Str = None,
    relative_value: Int = None,
    relative_unit: Str = None,
    date_part_type: Str = None,
    date_part_values: List = None,
    options=None,
) -> Union[DataFrame, ArrowTable]:
    brick_display_name = "Flag Rows by Date"
    options = options or {}
    verbose = options.get("verbose", True)
    flag_column_name = _coalesce(
        flag_column_name, options.get("flag_column_name", "date_match_flag")
    )
    date_column = _coalesce(date_column, options.get("date_column", ""))
    filter_type = _coalesce(filter_type, options.get("filter_type", "date_range"))
    start_date = _coalesce(start_date, options.get("start_date", ""))
    end_date = _coalesce(end_date, options.get("end_date", ""))
    relative_type = _coalesce(relative_type, options.get("relative_type", "last_n"))
    relative_value = _coalesce(relative_value, options.get("relative_value", 7))
    relative_unit = _coalesce(relative_unit, options.get("relative_unit", "days"))
    date_part_type = _coalesce(date_part_type, options.get("date_part_type", "month"))
    date_part_values = _coalesce(date_part_values, options.get("date_part_values", []))
    output_format = options.get("output_format", "pandas")
    result = None
    conn = None
    if not date_column:
        verbose and logger.error(
            f"[{brick_display_name}] Date column must be specified!"
        )
        raise ValueError("Date column must be specified!")
    if not flag_column_name or not isinstance(flag_column_name, str):
        verbose and logger.error(f"[{brick_display_name}] Invalid flag column name!")
        raise ValueError("Flag column name must be a non-empty string!")
    valid_filter_types = ["date_range", "relative_range", "date_part"]
    if filter_type not in valid_filter_types:
        verbose and logger.error(
            f"[{brick_display_name}] Invalid filter type: {filter_type}."
        )
        raise ValueError(f"Filter type must be one of {valid_filter_types}")
    if filter_type == "date_range" and (not start_date) and (not end_date):
        verbose and logger.warning(
            f"[{brick_display_name}] No date range specified. All rows will be flagged as not matching."
        )
    if filter_type == "date_part":
        if not isinstance(date_part_values, list):
            verbose and logger.error(
                f"[{brick_display_name}] Invalid date_part_values format! Expected a list."
            )
            raise ValueError("Date part values must be provided as a list!")
        if not date_part_values:
            verbose and logger.warning(
                f"[{brick_display_name}] No date part values specified. All rows will be flagged as not matching."
            )
    try:
        verbose and logger.info(
            f"[{brick_display_name}] Starting flagging with filter type '{filter_type}'."
        )
        data_type = None
        if isinstance(data, pd.DataFrame):
            data_type = "pandas"
        elif isinstance(data, pl.DataFrame):
            data_type = "polars"
        elif isinstance(data, pa.Table):
            data_type = "arrow"
        if data_type is None:
            verbose and logger.error(
                f"[{brick_display_name}] Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
            )
            raise ValueError(
                "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
            )
        verbose and logger.info(
            f"[{brick_display_name}] Detected input format: {data_type}."
        )
        conn = duckdb.connect(":memory:")
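        # DuckDB can query registered pandas, Polars, and Arrow objects directly.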
        conn.register("input_table", data)
        column_info = conn.execute("DESCRIBE input_table").fetchall()
        all_columns = {col[0]: col[1] for col in column_info}
        verbose and logger.info(
            f"[{brick_display_name}] Total columns in data: {len(all_columns)}."
        )
        if flag_column_name in all_columns:
            verbose and logger.warning(
                f"[{brick_display_name}] Flag column '{flag_column_name}' already exists and will be overwritten."
            )
        if date_column not in all_columns:
            verbose and logger.error(
                f"[{brick_display_name}] Date column '{date_column}' not found in data."
            )
            raise ValueError(f"Date column '{date_column}' not found in data.")
        verbose and logger.info(
            f"[{brick_display_name}] Using date column: '{date_column}' (type: {all_columns[date_column]})."
        )
        sanitized_col = _sanitize_identifier(date_column)
        col_expr = f'"{sanitized_col}"'
        if filter_type == "date_range":
            if not start_date and (not end_date):
                date_condition = "FALSE"
            else:
                date_condition = _build_date_range_condition(
                    col_expr, start_date, end_date
                )
                verbose and logger.info(
                    f"[{brick_display_name}] Date range filter: [{start_date or 'unbounded'}, {end_date or 'unbounded'}]."
                )
        elif filter_type == "relative_range":
            date_condition = _build_relative_range_condition(
                col_expr, relative_type, relative_value, relative_unit
            )
            verbose and logger.info(
                f"[{brick_display_name}] Relative range filter: {relative_type} {relative_value} {relative_unit}."
            )
        elif filter_type == "date_part":
            date_condition = _build_date_part_condition(
                col_expr, date_part_type, date_part_values
            )
            verbose and logger.info(
                f"[{brick_display_name}] Date part filter: {date_part_type} IN {date_part_values}."
            )
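        # Matching rows get the string '1'; all other rows (including
        # unparseable dates) get NULL.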
        flag_condition = f"CASE WHEN {date_condition} THEN '1' ELSE NULL END"
        select_parts = []
        for col in all_columns.keys():
            if col != flag_column_name:
                sanitized_col_iter = _sanitize_identifier(col)
                select_parts.append(f'"{sanitized_col_iter}"')
        sanitized_flag_col = _sanitize_identifier(flag_column_name)
        select_parts.append(f'{flag_condition} AS "{sanitized_flag_col}"')
        select_clause = ", ".join(select_parts)
        query = f"SELECT {select_clause} FROM input_table"
        verbose and logger.info(
            f"[{brick_display_name}] Creating flag column '{flag_column_name}'."
        )
        if output_format == "pandas":
            result = conn.execute(query).df()
            verbose and logger.info(
                f"[{brick_display_name}] Converted result to pandas DataFrame."
            )
        elif output_format == "polars":
            result = conn.execute(query).pl()
            verbose and logger.info(
                f"[{brick_display_name}] Converted result to Polars DataFrame."
            )
        elif output_format == "arrow":
            result = conn.execute(query).fetch_arrow_table()
            verbose and logger.info(
                f"[{brick_display_name}] Converted result to Arrow Table."
            )
        else:
            verbose and logger.error(
                f"[{brick_display_name}] Unsupported output format: {output_format}"
            )
            raise ValueError(f"Unsupported output format: {output_format}")
        verbose and logger.info(
            f"[{brick_display_name}] Flagging completed successfully."
        )
    except Exception as e:
        verbose and logger.error(
            f"[{brick_display_name}] Error during flagging: {str(e)}"
        )
        raise
    finally:
        if conn is not None:
            conn.close()
    return result
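
A quick usage sketch for the fixed date range mode (the sample data below is made up):

import pandas as pd

df = pd.DataFrame(
    {
        "id": [1, 2, 3],
        "signup": ["2024-02-10", "2023-12-01", "bad value"],
    }
)
flagged = flag_rows_by_date(
    df,
    date_column="signup",
    filter_type="date_range",
    start_date="2024-01-01",
    end_date="2024-12-31",
)
# 'flagged' is a pandas DataFrame (the default output format) with a new
# 'date_match_flag' column: '1' for the first row, NULL for the others.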

Brick Info

version v0.1.3
python 3.10, 3.11, 3.12, 3.13
requirements
  • pandas
  • polars[pyarrow]
  • duckdb
  • pyarrow