Filter Rows by Date
Filter rows from the dataset using a date filter defined by a fixed date range, relative date range, or matching date part.
Filter Rows by Date
Processing
Filter rows from the dataset using a date filter defined by a specific criteria applied to a designated date column. Filtering criteria can be set using a fixed date range (start/end date), a relative date range (e.g., last 7 days), or by matching specific extracted date parts (e.g., month number, year). The function supports various actions, including keeping matching rows, removing matching rows, or clearing the contents of the date column cells based on the match result.
Inputs
- data
- The input dataset (DataFrame or Arrow Table) to be filtered.
- action (optional)
- Defines how to handle rows/cells that match the date filter like keeping matching rows (
keep_matching_rows), removing matching rows (remove_matching_rows), clearing matching cells in the date column (clear_matching_cells), or clearing non-matching cells in the date column (clear_non_matching_cells). - date column (optional)
- The name of the column containing date values against which the filter will be applied.
- filter type (optional)
- Specifies the method used to define the filter: fixed
date_range,relative_range, or matchingdate_part. - start date (optional)
- The starting boundary for a fixed date range filter (format YYYY-MM-DD).
- end date (optional)
- The ending boundary for a fixed date range filter (format YYYY-MM-DD).
- relative type (optional)
- Defines the direction and context of the relative range (e.g.,
last_n,next_n,current). - relative value (optional)
- The integer value N used in a relative range filter (e.g., 7 for 7 units).
- relative unit (optional)
- The unit of time for the relative range (e.g.,
days,weeks,years). - date part type (optional)
- The specific date part to extract for matching (e.g.,
month,year,day_of_week). - date part values (optional)
- A list of values that the extracted date part must match.
Inputs Types
| Input | Types |
|---|---|
data |
DataFrame, ArrowTable |
action |
Str |
date column |
Str |
filter type |
Str |
start date |
Str |
end date |
Str |
relative type |
Str |
relative value |
Int |
relative unit |
Str |
date part type |
Str |
date part values |
List |
You can check the list of supported types here: Available Type Hints.
Outputs
- result
- The dataset after applying the specified date filter and action. The format matches the configuration specified in the
output_formatoption.
Outputs Types
| Output | Types |
|---|---|
result |
DataFrame, ArrowTable |
You can check the list of supported types here: Available Type Hints.
Options
The Filter Rows by Date brick contains some changeable options:
- Action
- Defines how to handle rows/cells that match the filter criteria. Choices include keeping matching rows (
keep_matching_rows), removing matching rows (remove_matching_rows), clearing matching cells in the date column (clear_matching_cells), or clearing non-matching cells in the date column (clear_non_matching_cells). - Date Column
- The name of the column containing the date values to filter against.
- Filter On
- Specifies the method used for defining the filter:
date_range(fixed start/end),relative_range(e.g., last N days), ordate_part(matching month, year, etc.). - Start Date (YYYY-MM-DD)
- The fixed starting date boundary for the
date_rangefilter. - End Date (YYYY-MM-DD)
- The fixed ending date boundary for the
date_rangefilter. - Relative Type
- Defines the context for the
relative_rangefilter, such aslast_n,next_n,currentperiod, oruntil_now. - Relative Value (N)
- The numerical value N used when defining
last_nornext_nrelative ranges. - Relative Unit
- The unit of time corresponding to the Relative Value (e.g.,
days,weeks,months,years). - Date Part Type
- The specific component of the date to extract for the
date_partfilter (e.g.,year,month,day_of_week). - Date Part Values
- A list of numerical values (as strings) that the extracted Date Part must match.
- Output Format
- Specifies the desired format for the resulting dataset:
pandasDataFrame,polarsDataFrame, orarrowTable. - Verbose
- If enabled, detailed logs about the filtering process, query construction, and conversion steps will be displayed.
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
import re
from datetime import datetime, date
from coded_flows.types import Union, List, DataFrame, ArrowTable, Str, Int, Bool
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def _coalesce(*values):
return next((v for v in values if v is not None), None)
def _sanitize_identifier(identifier):
"""
Sanitize SQL identifier by escaping special characters.
"""
return identifier.replace('"', '""')
def _build_date_range_condition(column_expr, start_date, end_date):
"""Build SQL condition for fixed date range filter."""
conditions = []
date_expr = f"TRY_CAST({column_expr} AS DATE)"
conditions.append(f"{date_expr} IS NOT NULL")
if start_date:
conditions.append(f"{date_expr} >= DATE '{start_date}'")
if end_date:
conditions.append(f"{date_expr} <= DATE '{end_date}'")
return f"({' AND '.join(conditions)})"
def _build_relative_range_condition(
column_expr, relative_type, relative_value, relative_unit
):
"""Build SQL condition for relative date range filter."""
date_expr = f"TRY_CAST({column_expr} AS DATE)"
current_date = "CURRENT_DATE"
if relative_unit == "days":
interval = f"INTERVAL '{relative_value}' DAY"
elif relative_unit == "weeks":
interval = f"INTERVAL '{relative_value}' WEEK"
elif relative_unit == "months":
interval = f"INTERVAL '{relative_value}' MONTH"
elif relative_unit == "quarters":
interval = f"INTERVAL '{relative_value * 3}' MONTH"
elif relative_unit == "years":
interval = f"INTERVAL '{relative_value}' YEAR"
else:
interval = f"INTERVAL '{relative_value}' DAY"
if relative_type == "last_n":
condition = f"({date_expr} IS NOT NULL AND {date_expr} >= {current_date} - {interval} AND {date_expr} <= {current_date})"
elif relative_type == "next_n":
condition = f"({date_expr} IS NOT NULL AND {date_expr} >= {current_date} AND {date_expr} <= {current_date} + {interval})"
elif relative_type == "current":
if relative_unit == "days":
condition = f"({date_expr} IS NOT NULL AND {date_expr} = {current_date})"
elif relative_unit == "weeks":
condition = f"({date_expr} IS NOT NULL AND date_trunc('week', {date_expr}) = date_trunc('week', {current_date}))"
elif relative_unit == "months":
condition = f"({date_expr} IS NOT NULL AND date_trunc('month', {date_expr}) = date_trunc('month', {current_date}))"
elif relative_unit == "quarters":
condition = f"({date_expr} IS NOT NULL AND date_trunc('quarter', {date_expr}) = date_trunc('quarter', {current_date}))"
elif relative_unit == "years":
condition = f"({date_expr} IS NOT NULL AND date_trunc('year', {date_expr}) = date_trunc('year', {current_date}))"
else:
condition = f"({date_expr} IS NOT NULL AND {date_expr} = {current_date})"
elif relative_type == "until_now":
condition = f"({date_expr} IS NOT NULL AND {date_expr} <= {current_date})"
else:
condition = f"{date_expr} IS NOT NULL"
return condition
def _build_date_part_condition(column_expr, date_part_type, date_part_values):
"""Build SQL condition for date part filter."""
if not date_part_values:
return "FALSE"
date_expr = f"TRY_CAST({column_expr} AS DATE)"
if date_part_type == "year":
part_expr = f"EXTRACT(YEAR FROM {date_expr})"
elif date_part_type == "quarter":
part_expr = f"EXTRACT(QUARTER FROM {date_expr})"
elif date_part_type == "month":
part_expr = f"EXTRACT(MONTH FROM {date_expr})"
elif date_part_type == "day":
part_expr = f"EXTRACT(DAY FROM {date_expr})"
elif date_part_type == "day_of_week":
part_expr = f"EXTRACT(DOW FROM {date_expr})"
elif date_part_type == "day_of_year":
part_expr = f"EXTRACT(DOY FROM {date_expr})"
elif date_part_type == "week":
part_expr = f"EXTRACT(WEEK FROM {date_expr})"
else:
part_expr = f"EXTRACT(YEAR FROM {date_expr})"
try:
int_values = [int(v) for v in date_part_values]
values_list = ", ".join((str(v) for v in int_values))
condition = f"({date_expr} IS NOT NULL AND {part_expr} IN ({values_list}))"
except (ValueError, TypeError):
condition = "FALSE"
return condition
def filter_rows_by_date(
data: Union[DataFrame, ArrowTable],
action: Str = None,
date_column: Str = None,
filter_type: Str = None,
start_date: Str = None,
end_date: Str = None,
relative_type: Str = None,
relative_value: Int = None,
relative_unit: Str = None,
date_part_type: Str = None,
date_part_values: List = None,
options=None,
) -> Union[DataFrame, ArrowTable]:
brick_display_name = "Filter Rows by Date"
options = options or {}
verbose = options.get("verbose", True)
action = _coalesce(action, options.get("action", "keep_matching_rows"))
date_column = _coalesce(date_column, options.get("date_column", ""))
filter_type = _coalesce(filter_type, options.get("filter_type", "date_range"))
start_date = _coalesce(start_date, options.get("start_date", ""))
end_date = _coalesce(end_date, options.get("end_date", ""))
relative_type = _coalesce(relative_type, options.get("relative_type", "last_n"))
relative_value = _coalesce(relative_value, options.get("relative_value", 7))
relative_unit = _coalesce(relative_unit, options.get("relative_unit", "days"))
date_part_type = _coalesce(date_part_type, options.get("date_part_type", "month"))
date_part_values = _coalesce(date_part_values, options.get("date_part_values", []))
output_format = options.get("output_format", "pandas")
result = None
conn = None
if not date_column:
verbose and logger.error(
f"[{brick_display_name}] Date column must be specified!"
)
raise ValueError("Date column must be specified!")
valid_actions = [
"keep_matching_rows",
"remove_matching_rows",
"clear_matching_cells",
"clear_non_matching_cells",
]
if action not in valid_actions:
verbose and logger.error(f"[{brick_display_name}] Invalid action: {action}.")
raise ValueError(f"Action must be one of {valid_actions}")
valid_filter_types = ["date_range", "relative_range", "date_part"]
if filter_type not in valid_filter_types:
verbose and logger.error(
f"[{brick_display_name}] Invalid filter type: {filter_type}."
)
raise ValueError(f"Filter type must be one of {valid_filter_types}")
if filter_type == "date_range":
if not start_date and (not end_date):
verbose and logger.warning(
f"[{brick_display_name}] No date range specified. Returning data unchanged."
)
result = data
if filter_type == "date_part":
if not isinstance(date_part_values, list):
verbose and logger.error(
f"[{brick_display_name}] Invalid date_part_values format! Expected a list."
)
raise ValueError("Date part values must be provided as a list!")
if not date_part_values:
verbose and logger.warning(
f"[{brick_display_name}] No date part values specified. Returning data unchanged."
)
result = data
if result is None:
try:
verbose and logger.info(
f"[{brick_display_name}] Starting filter with action '{action}' and filter type '{filter_type}'."
)
data_type = None
if isinstance(data, pd.DataFrame):
data_type = "pandas"
elif isinstance(data, pl.DataFrame):
data_type = "polars"
elif isinstance(data, (pa.Table, pa.lib.Table)):
data_type = "arrow"
if data_type is None:
verbose and logger.error(
f"[{brick_display_name}] Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
)
raise ValueError(
"Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
)
verbose and logger.info(
f"[{brick_display_name}] Detected input format: {data_type}."
)
conn = duckdb.connect(":memory:")
conn.register("input_table", data)
column_info = conn.execute("DESCRIBE input_table").fetchall()
all_columns = {col[0]: col[1] for col in column_info}
verbose and logger.info(
f"[{brick_display_name}] Total columns in data: {len(all_columns)}."
)
if date_column not in all_columns:
verbose and logger.error(
f"[{brick_display_name}] Date column '{date_column}' not found in data."
)
raise ValueError(f"Date column '{date_column}' not found in data.")
verbose and logger.info(
f"[{brick_display_name}] Using date column: '{date_column}' (type: {all_columns[date_column]})."
)
sanitized_col = _sanitize_identifier(date_column)
col_expr = f'"{sanitized_col}"'
if filter_type == "date_range":
date_condition = _build_date_range_condition(
col_expr, start_date, end_date
)
verbose and logger.info(
f"[{brick_display_name}] Date range filter: [{start_date or 'unbounded'}, {end_date or 'unbounded'}]."
)
elif filter_type == "relative_range":
date_condition = _build_relative_range_condition(
col_expr, relative_type, relative_value, relative_unit
)
verbose and logger.info(
f"[{brick_display_name}] Relative range filter: {relative_type} {relative_value} {relative_unit}."
)
elif filter_type == "date_part":
date_condition = _build_date_part_condition(
col_expr, date_part_type, date_part_values
)
verbose and logger.info(
f"[{brick_display_name}] Date part filter: {date_part_type} IN {date_part_values}."
)
if action == "keep_matching_rows":
query = f"SELECT * FROM input_table WHERE {date_condition}"
verbose and logger.info(
f"[{brick_display_name}] Keeping rows where date condition matches."
)
elif action == "remove_matching_rows":
query = f"SELECT * FROM input_table WHERE NOT ({date_condition})"
verbose and logger.info(
f"[{brick_display_name}] Removing rows where date condition matches."
)
elif action in ["clear_matching_cells", "clear_non_matching_cells"]:
select_parts = []
clear_on_match = action == "clear_matching_cells"
for col in all_columns.keys():
sanitized_col_iter = _sanitize_identifier(col)
if col == date_column:
if clear_on_match:
select_parts.append(
f'CASE WHEN {date_condition} THEN NULL ELSE "{sanitized_col_iter}" END AS "{sanitized_col_iter}"'
)
else:
select_parts.append(
f'CASE WHEN {date_condition} THEN "{sanitized_col_iter}" ELSE NULL END AS "{sanitized_col_iter}"'
)
else:
select_parts.append(f'"{sanitized_col_iter}"')
select_clause = ", ".join(select_parts)
query = f"SELECT {select_clause} FROM input_table"
verbose and logger.info(
f"[{brick_display_name}] Clearing content of {('matching' if clear_on_match else 'non-matching')} cells."
)
if output_format == "pandas":
result = conn.execute(query).df()
verbose and logger.info(
f"[{brick_display_name}] Converted result to pandas DataFrame."
)
elif output_format == "polars":
result = conn.execute(query).pl()
verbose and logger.info(
f"[{brick_display_name}] Converted result to Polars DataFrame."
)
elif output_format == "arrow":
result = conn.execute(query).fetch_arrow_table()
verbose and logger.info(
f"[{brick_display_name}] Converted result to Arrow Table."
)
else:
verbose and logger.error(
f"[{brick_display_name}] Unsupported output format: {output_format}"
)
raise ValueError(f"Unsupported output format: {output_format}")
verbose and logger.info(
f"[{brick_display_name}] Filter completed successfully."
)
except Exception as e:
verbose and logger.error(
f"[{brick_display_name}] Error during filtering: {str(e)}"
)
raise
finally:
if conn is not None:
conn.close()
return result
Brick Info
- pandas
- polars[pyarrow]
- duckdb
- pyarrow