Flag Rows by Date
Flag rows from the dataset using a date filter defined by a fixed date range, relative date range, or matching date part. Creates a new column with '1' for matching rows.
Flag Rows by Date
Processing
Flag rows from the dataset using a date filter defined by a fixed date range, relative date range, or matching date part. Rows matching the specified criteria are flagged with '1', while non-matching rows or rows with invalid date values are flagged with NULL.
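As a quick illustration, the sketch below calls this brick's function (defined in the listing further down) with a fixed date range. The sample data and column names are made up, and the snippet assumes the brick's dependencies (pandas, DuckDB, etc.) are installed and `flag_rows_by_date` is in scope.

```python
# Minimal sketch: fixed date-range flagging on a tiny, made-up pandas DataFrame.
# Assumes flag_rows_by_date (see the listing below) is available in scope.
import pandas as pd

orders = pd.DataFrame(
    {
        "order_id": [1, 2, 3],
        "order_date": ["2024-02-15", "2024-06-30", "not a date"],
    }
)

flagged = flag_rows_by_date(
    orders,
    flag_column_name="in_q1_2024",
    date_column="order_date",
    filter_type="date_range",
    start_date="2024-01-01",
    end_date="2024-03-31",
)
# Expected flags: '1' for the first row, NULL for the out-of-range row,
# and NULL for the row whose date value cannot be parsed.
```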
Inputs
data - The input dataset (Pandas DataFrame, Polars DataFrame, or PyArrow Table) to which the flag column will be added.
flag column name (optional) - The name of the new column that will contain the date match flag (1 or NULL).
date column (optional) - The name of the column in the dataset containing the date values used for filtering.
filter type (optional) - Defines the type of date filter to apply: a fixed date_range, a relative_range, or date_part matching.
start date (optional) - The starting date (YYYY-MM-DD, inclusive) for the fixed date range filter.
end date (optional) - The ending date (YYYY-MM-DD, inclusive) for the fixed date range filter.
relative type (optional) - Specifies the direction or context for relative filtering (e.g., last_n, next_n, current).
relative value (optional) - The numerical value (N) used in conjunction with relative unit for relative filtering.
relative unit (optional) - The time unit (e.g., days, months, years) used with relative value for relative filtering (see the sketch after this list).
date part type (optional) - The date component (year, month, day, etc.) to extract and check when using the date_part filter.
date part values (optional) - A list of specific values that the extracted date part must match.
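For example, a relative filter that flags rows dated within the last 30 days might look like the following sketch. The `events` table and its columns are illustrative, and which rows receive '1' depends on the current date.

```python
# Illustrative relative_range call: flag rows dated within the last 30 days.
# The data below is made up; which rows get '1' depends on today's date.
import polars as pl

events = pl.DataFrame(
    {"event_id": [10, 11], "event_date": ["2025-01-02", "2023-05-20"]}
)

flagged = flag_rows_by_date(
    events,
    flag_column_name="recent_flag",
    date_column="event_date",
    filter_type="relative_range",
    relative_type="last_n",
    relative_value=30,
    relative_unit="days",
    options={"output_format": "polars"},
)
```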
Inputs Types
| Input | Types |
|---|---|
| data | DataFrame, ArrowTable |
| flag column name | Str |
| date column | Str |
| filter type | Str |
| start date | Str |
| end date | Str |
| relative type | Str |
| relative value | Int |
| relative unit | Str |
| date part type | Str |
| date part values | List |
You can check the list of supported types here: Available Type Hints.
Outputs
result - The output dataset, which is the original data augmented with the new flag column (flag column name).
Outputs Types
| Output | Types |
|---|---|
| result | DataFrame, ArrowTable |
You can check the list of supported types here: Available Type Hints.
Options
The Flag Rows by Date brick exposes the following configurable options:
- Flag Column Name
  - The name of the new column that will store the flag result. Defaults to date_match_flag.
- Date Column
  - The name of the column in the input data containing the date values to filter against.
- Filter On
  - Defines the type of filtering to be applied: date_range (fixed dates), relative_range (relative to today), or date_part (matching specific components). Defaults to date_range.
- Start Date (YYYY-MM-DD)
  - The fixed starting date for the date range filter.
- End Date (YYYY-MM-DD)
  - The fixed ending date for the date range filter.
- Relative Type
  - Specifies the direction or scope for relative filtering: last_n, next_n, current (the current period), or until_now. Defaults to last_n.
- Relative Value (N)
  - The numerical quantity (N) defining the extent of the relative range. Defaults to 7.
- Relative Unit
  - The unit of time used for relative filtering: days, weeks, months, quarters, or years. Defaults to days.
- Date Part Type
  - The specific date component to match when using the date_part filter: year, quarter, month, day, day_of_week, day_of_year, or week. Defaults to month.
- Date Part Values
  - A list of numerical values that the extracted date part must match (e.g., [1, 2] for January and February). See the example after this list.
- Output Format
  - Specifies the desired format of the output dataset: pandas, polars, or arrow. Defaults to pandas.
- Verbose
  - If enabled, detailed execution logs and information are emitted.
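These settings can also be supplied through the `options` dictionary instead of the direct arguments; as the listing below shows, a direct argument takes precedence and unset arguments fall back to the option values. A hedged sketch, with made-up data, that flags rows whose month is January or February:

```python
# Sketch: configuring the brick via the options dictionary (keys taken from
# the listing below). Flags rows whose date falls in January or February.
import pandas as pd

signups = pd.DataFrame(
    {"user": ["a", "b"], "signup_date": ["2024-01-05", "2024-07-19"]}
)

flagged = flag_rows_by_date(
    signups,
    options={
        "flag_column_name": "is_q1_start",
        "date_column": "signup_date",
        "filter_type": "date_part",
        "date_part_type": "month",
        "date_part_values": [1, 2],
        "output_format": "pandas",
        "verbose": False,
    },
)
# Expected flags: '1' for the January signup, NULL for the July one.
```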
```python
import logging

import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
from coded_flows.types import Union, List, DataFrame, ArrowTable, Str, Int
from coded_flows.utils import CodedFlowsLogger

logger = CodedFlowsLogger(name="Flag Rows by Date", level=logging.INFO)
def _coalesce(*values):
    return next((v for v in values if v is not None), None)


def _sanitize_identifier(identifier):
    """
    Sanitize SQL identifier by escaping special characters.
    """
    return identifier.replace('"', '""')


def _build_date_range_condition(column_expr, start_date, end_date):
    """Build SQL condition for fixed date range filter."""
    conditions = []
    date_expr = f"TRY_CAST({column_expr} AS DATE)"
    conditions.append(f"{date_expr} IS NOT NULL")
    if start_date:
        conditions.append(f"{date_expr} >= DATE '{start_date}'")
    if end_date:
        conditions.append(f"{date_expr} <= DATE '{end_date}'")
    return f"({' AND '.join(conditions)})"
def _build_relative_range_condition(
    column_expr, relative_type, relative_value, relative_unit
):
    """Build SQL condition for relative date range filter."""
    date_expr = f"TRY_CAST({column_expr} AS DATE)"
    current_date = "CURRENT_DATE"
    if relative_unit == "days":
        interval = f"INTERVAL '{relative_value}' DAY"
    elif relative_unit == "weeks":
        interval = f"INTERVAL '{relative_value}' WEEK"
    elif relative_unit == "months":
        interval = f"INTERVAL '{relative_value}' MONTH"
    elif relative_unit == "quarters":
        interval = f"INTERVAL '{relative_value * 3}' MONTH"
    elif relative_unit == "years":
        interval = f"INTERVAL '{relative_value}' YEAR"
    else:
        interval = f"INTERVAL '{relative_value}' DAY"
    if relative_type == "last_n":
        condition = f"({date_expr} IS NOT NULL AND {date_expr} >= {current_date} - {interval} AND {date_expr} <= {current_date})"
    elif relative_type == "next_n":
        condition = f"({date_expr} IS NOT NULL AND {date_expr} >= {current_date} AND {date_expr} <= {current_date} + {interval})"
    elif relative_type == "current":
        if relative_unit == "days":
            condition = f"({date_expr} IS NOT NULL AND {date_expr} = {current_date})"
        elif relative_unit == "weeks":
            condition = f"({date_expr} IS NOT NULL AND date_trunc('week', {date_expr}) = date_trunc('week', {current_date}))"
        elif relative_unit == "months":
            condition = f"({date_expr} IS NOT NULL AND date_trunc('month', {date_expr}) = date_trunc('month', {current_date}))"
        elif relative_unit == "quarters":
            condition = f"({date_expr} IS NOT NULL AND date_trunc('quarter', {date_expr}) = date_trunc('quarter', {current_date}))"
        elif relative_unit == "years":
            condition = f"({date_expr} IS NOT NULL AND date_trunc('year', {date_expr}) = date_trunc('year', {current_date}))"
        else:
            condition = f"({date_expr} IS NOT NULL AND {date_expr} = {current_date})"
    elif relative_type == "until_now":
        condition = f"({date_expr} IS NOT NULL AND {date_expr} <= {current_date})"
    else:
        condition = f"{date_expr} IS NOT NULL"
    return condition
def _build_date_part_condition(column_expr, date_part_type, date_part_values):
    """Build SQL condition for date part filter."""
    if not date_part_values:
        return "FALSE"
    date_expr = f"TRY_CAST({column_expr} AS DATE)"
    if date_part_type == "year":
        part_expr = f"EXTRACT(YEAR FROM {date_expr})"
    elif date_part_type == "quarter":
        part_expr = f"EXTRACT(QUARTER FROM {date_expr})"
    elif date_part_type == "month":
        part_expr = f"EXTRACT(MONTH FROM {date_expr})"
    elif date_part_type == "day":
        part_expr = f"EXTRACT(DAY FROM {date_expr})"
    elif date_part_type == "day_of_week":
        part_expr = f"EXTRACT(DOW FROM {date_expr})"
    elif date_part_type == "day_of_year":
        part_expr = f"EXTRACT(DOY FROM {date_expr})"
    elif date_part_type == "week":
        part_expr = f"EXTRACT(WEEK FROM {date_expr})"
    else:
        part_expr = f"EXTRACT(YEAR FROM {date_expr})"
    try:
        int_values = [int(v) for v in date_part_values]
        values_list = ", ".join(str(v) for v in int_values)
        condition = f"({date_expr} IS NOT NULL AND {part_expr} IN ({values_list}))"
    except (ValueError, TypeError):
        condition = "FALSE"
    return condition
def flag_rows_by_date(
    data: Union[DataFrame, ArrowTable],
    flag_column_name: Str = None,
    date_column: Str = None,
    filter_type: Str = None,
    start_date: Str = None,
    end_date: Str = None,
    relative_type: Str = None,
    relative_value: Int = None,
    relative_unit: Str = None,
    date_part_type: Str = None,
    date_part_values: List = None,
    options=None,
) -> Union[DataFrame, ArrowTable]:
    options = options or {}
    verbose = options.get("verbose", True)
    flag_column_name = _coalesce(
        flag_column_name, options.get("flag_column_name", "date_match_flag")
    )
    date_column = _coalesce(date_column, options.get("date_column", ""))
    filter_type = _coalesce(filter_type, options.get("filter_type", "date_range"))
    start_date = _coalesce(start_date, options.get("start_date", ""))
    end_date = _coalesce(end_date, options.get("end_date", ""))
    relative_type = _coalesce(relative_type, options.get("relative_type", "last_n"))
    relative_value = _coalesce(relative_value, options.get("relative_value", 7))
    relative_unit = _coalesce(relative_unit, options.get("relative_unit", "days"))
    date_part_type = _coalesce(date_part_type, options.get("date_part_type", "month"))
    date_part_values = _coalesce(date_part_values, options.get("date_part_values", []))
    output_format = options.get("output_format", "pandas")
    result = None
    conn = None
    if not date_column:
        verbose and logger.error(f"Date column must be specified!")
        raise ValueError("Date column must be specified!")
    if not flag_column_name or not isinstance(flag_column_name, str):
        verbose and logger.error(f"Invalid flag column name!")
        raise ValueError("Flag column name must be a non-empty string!")
    valid_filter_types = ["date_range", "relative_range", "date_part"]
    if filter_type not in valid_filter_types:
        verbose and logger.error(f"Invalid filter type: {filter_type}.")
        raise ValueError(f"Filter type must be one of {valid_filter_types}")
    if filter_type == "date_range" and (not start_date) and (not end_date):
        verbose and logger.warning(
            f"No date range specified. All rows will be flagged as not matching."
        )
    if filter_type == "date_part":
        if not isinstance(date_part_values, list):
            verbose and logger.error(
                f"Invalid date_part_values format! Expected a list."
            )
            raise ValueError("Date part values must be provided as a list!")
        if not date_part_values:
            verbose and logger.warning(
                f"No date part values specified. All rows will be flagged as not matching."
            )
    try:
        verbose and logger.info(f"Starting flagging with filter type '{filter_type}'.")
        data_type = None
        if isinstance(data, pd.DataFrame):
            data_type = "pandas"
        elif isinstance(data, pl.DataFrame):
            data_type = "polars"
        elif isinstance(data, (pa.Table, pa.lib.Table)):
            data_type = "arrow"
        if data_type is None:
            verbose and logger.error(
                f"Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
            )
            raise ValueError(
                "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
            )
        verbose and logger.info(f"Detected input format: {data_type}.")
        conn = duckdb.connect(":memory:")
        conn.register("input_table", data)
        column_info = conn.execute("DESCRIBE input_table").fetchall()
        all_columns = {col[0]: col[1] for col in column_info}
        verbose and logger.info(f"Total columns in data: {len(all_columns)}.")
        if flag_column_name in all_columns:
            verbose and logger.warning(
                f"Flag column '{flag_column_name}' already exists and will be overwritten."
            )
        if date_column not in all_columns:
            verbose and logger.error(f"Date column '{date_column}' not found in data.")
            raise ValueError(f"Date column '{date_column}' not found in data.")
        verbose and logger.info(
            f"Using date column: '{date_column}' (type: {all_columns[date_column]})."
        )
        sanitized_col = _sanitize_identifier(date_column)
        col_expr = f'"{sanitized_col}"'
        if filter_type == "date_range":
            if not start_date and (not end_date):
                date_condition = "FALSE"
            else:
                date_condition = _build_date_range_condition(
                    col_expr, start_date, end_date
                )
            verbose and logger.info(
                f"Date range filter: [{start_date or 'unbounded'}, {end_date or 'unbounded'}]."
            )
        elif filter_type == "relative_range":
            date_condition = _build_relative_range_condition(
                col_expr, relative_type, relative_value, relative_unit
            )
            verbose and logger.info(
                f"Relative range filter: {relative_type} {relative_value} {relative_unit}."
            )
        elif filter_type == "date_part":
            date_condition = _build_date_part_condition(
                col_expr, date_part_type, date_part_values
            )
            verbose and logger.info(
                f"Date part filter: {date_part_type} IN {date_part_values}."
            )
        flag_condition = f"CASE WHEN {date_condition} THEN '1' ELSE NULL END"
        select_parts = []
        for col in all_columns.keys():
            if col != flag_column_name:
                sanitized_col_iter = _sanitize_identifier(col)
                select_parts.append(f'"{sanitized_col_iter}"')
        sanitized_flag_col = _sanitize_identifier(flag_column_name)
        select_parts.append(f'{flag_condition} AS "{sanitized_flag_col}"')
        select_clause = ", ".join(select_parts)
        query = f"SELECT {select_clause} FROM input_table"
        verbose and logger.info(f"Creating flag column '{flag_column_name}'.")
        if output_format == "pandas":
            result = conn.execute(query).df()
            verbose and logger.info(f"Converted result to pandas DataFrame.")
        elif output_format == "polars":
            result = conn.execute(query).pl()
            verbose and logger.info(f"Converted result to Polars DataFrame.")
        elif output_format == "arrow":
            result = conn.execute(query).fetch_arrow_table()
            verbose and logger.info(f"Converted result to Arrow Table.")
        else:
            verbose and logger.error(f"Unsupported output format: {output_format}")
            raise ValueError(f"Unsupported output format: {output_format}")
        verbose and logger.info(f"Flagging completed successfully.")
    except Exception as e:
        verbose and logger.error(f"Error during flagging: {str(e)}")
        raise
    finally:
        if conn is not None:
            conn.close()
    return result
```
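For intuition about the SQL the brick builds, the helper functions above can be inspected directly. The snippet below is illustrative (the column name is made up and the functions are assumed to be defined in the current session); it prints the fragment that ends up inside the `CASE WHEN ... THEN '1' ELSE NULL END` expression.

```python
# Peek at the SQL fragment produced for a "last 7 days" relative filter.
# Assumes the functions from the listing above are defined in this session.
condition = _build_relative_range_condition('"order_date"', "last_n", 7, "days")
print(condition)
# Printed on one line; wrapped here for readability:
# (TRY_CAST("order_date" AS DATE) IS NOT NULL
#  AND TRY_CAST("order_date" AS DATE) >= CURRENT_DATE - INTERVAL '7' DAY
#  AND TRY_CAST("order_date" AS DATE) <= CURRENT_DATE)
```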
Brick Info
- pandas
- pyarrow
- polars[pyarrow]
- duckdb