Extract Table

Extract data from a database table with filtering, sorting, and column selection.

Extracts data from a database table with filtering, sorting, and column selection. The function supports multiple database types including PostgreSQL, MySQL, MariaDB, Oracle Database, Microsoft SQL Server, and SQLite. It provides basic filtering with Excel-like expressions supporting parentheses, quoted column names, and logical operators.
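
Outside the visual flow, the underlying Python function (listed in full further down this page) can also be exercised directly. A minimal sketch, assuming a local SQLite file named mydb.sqlite that contains a customers table:

# Minimal sketch: SQLite is the default database type, so only the file and table are needed.
# Assumes the extract_table function from the listing below is in scope.
df = extract_table(
    table_name="customers",        # hypothetical table
    database_name="mydb.sqlite",   # path to the SQLite database file
)
print(df.head())                   # df is a Pandas DataFrame by default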

Inputs

table name
The name of the database table to extract data from
filter expression (optional)
Excel-like filter expression for filtering rows. Supports the comparison operators =, !=, <>, >, <, >=, <=, the text operators CONTAINS, STARTS WITH, ENDS WITH (plus the case-insensitive ICONTAINS, ISTARTS WITH, IENDS WITH), the null checks IS NULL and IS NOT NULL, and the logical operators AND and OR with parentheses
database name (optional)
The name of the database containing the target table (for SQLite, the path to the database file). If not provided, the DB_NAME environment variable is used
host (optional)
The database server hostname or IP address (ignored for SQLite). If not provided, the DB_HOST environment variable is used
port (optional)
The database server port number (ignored for SQLite). If not provided, the DB_PORT environment variable is used
username (optional)
The database username for authentication (ignored for SQLite). If not provided, the DB_USERNAME environment variable is used
password (optional)
The database password for authentication (ignored for SQLite). If not provided, the DB_PASSWORD environment variable is used

Inputs Types

Input | Type
table name | Str
filter expression | Str
database name | Str
host | Str
port | Int
username | Str
password | SecretStr
options | Dict

You can check the list of supported types here: Available Type Hints.

Outputs

df
DataFrame containing the extracted table data with applied filters, sorting, and column selection

Outputs Types

Output | Type
df | DataFrame

You can check the list of supported types here: Available Type Hints.

Options

The Extract Table brick exposes the following configurable options:

Table Name
Name of the database table to extract data from (maximum 250 characters)
Database type
Type of database to connect to with choices: Postgres, MySQL, MariaDB, Oracle Database, Microsoft SQL Server, SQLite (default: SQLite)
DataFrame type
Output DataFrame format with choices: Pandas, Polars (default: Pandas)
Columns to Extract
List of specific column names to extract from the table (leave empty to extract all columns)
Limit Number of Rows
Toggle to enable row limiting (default: False)
Row Limit
Maximum number of rows to return when limiting is enabled (default: 1000, range: 1-1000000)
Sort By
List of column names with optional direction (ASC/DESC) for sorting results
Filter Expression
Excel-like filter expression for row filtering with support for complex conditions (maximum 2000 characters)
Env variables prefix
Prefix for environment variable names when loading database connection parameters (maximum 150 characters)
Verbose
Enable verbose logging during extraction process (default: True)
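
All of these options reach the function as a single dictionary. A sketch of that dictionary, using the keys read by the implementation at the end of this page (the table, columns, and values are only illustrative):

options = {
    "table_name": "orders",             # used when the table name input is not connected
    "db_type": "Postgres",              # Postgres, MySQL, MariaDB, Oracle Database, Microsoft SQL Server, SQLite
    "df_type": "Pandas",                # or "Polars"
    "columns_to_extract": ["order_id", "customer_name", "order_date", "amount"],
    "apply_limit": True,
    "limit_value": 500,
    "sort_by": ["order_date DESC", "amount DESC"],
    "filter_expression": "status = 'Shipped' AND amount >= 100",
    "env_prefix": "MYAPP",              # optional prefix for the DB_* environment variables
    "verbose": True,
}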

Environment Variables Configuration

If you don't provide database connection parameters as brick inputs, you can configure them using environment variables.

Step 1 — Create an environment file

Create an environment file in the flow project folder. You can name it anything you like (e.g. .env, db.env, or mysettings.env).

Example: db.env

# Example database environment configuration
DB_NAME=mydatabase
DB_HOST=localhost
DB_PORT=5432
DB_USERNAME=myuser
DB_PASSWORD=mypassword

💡 For SQLite, only DB_NAME is required (e.g., DB_NAME=mydb.sqlite).

Step 2 — Reference the env file in CODED FLOWS

Inside the CODED FLOWS app, use the LOAD ENV control brick to load the file you created.

  • Specify your env filename (e.g. db.env, or whichever name you chose).
  • All defined variables become available to any other bricks in the connected graph flow.

Step 3 — Use with prefix (optional)

If you set the "Env variables prefix" option of this brick, it will automatically prepend the prefix and an underscore to each variable name when reading the connection parameters.

Example with prefix MYAPP:

MYAPP_DB_NAME=mydatabase
MYAPP_DB_HOST=localhost
MYAPP_DB_PORT=5432
MYAPP_DB_USERNAME=myuser
MYAPP_DB_PASSWORD=mypassword

Bricks will then look for MYAPP_DB_NAME, MYAPP_DB_HOST, etc. instead of the default variable names.
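
The resolution itself is straightforward; a minimal sketch mirroring the getenv calls in the implementation below (the variable names shown are the defaults the brick uses):

from os import getenv

env_prefix = "MYAPP"
prefix = f"{env_prefix}_" if env_prefix.strip() else ""

# With a prefix the brick reads MYAPP_DB_NAME, MYAPP_DB_HOST, ...;
# without one it falls back to the plain DB_NAME, DB_HOST, ... variables.
database_name = getenv(f"{prefix}DB_NAME")
host = getenv(f"{prefix}DB_HOST")
port = getenv(f"{prefix}DB_PORT")
username = getenv(f"{prefix}DB_USERNAME")
password = getenv(f"{prefix}DB_PASSWORD")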

Filtering Guide

The Filter Expression option supports Excel-like syntax for data filtering. You can combine multiple conditions using logical operators and parentheses for complex queries.

Basic Comparison Operators

Operator | Description | Example
= | Equal to | status = 'Active'
!= or <> | Not equal to | status != 'Cancelled'
> | Greater than | age > 25
>= | Greater than or equal | price >= 100
< | Less than | quantity < 10
<= | Less than or equal | discount <= 0.5

Case-Sensitive Text-Based Operators

Operator | Description | Example
CONTAINS | Column contains the specified text | email CONTAINS '@gmail'
STARTS WITH | Column starts with the specified text | name STARTS WITH 'John'
ENDS WITH | Column ends with the specified text | filename ENDS WITH '.pdf'

Case-Insensitive Text-Based Operators

Operator | Description | Example
ICONTAINS | Column contains the specified text (ignoring case) | email ICONTAINS '@gmail'
ISTARTS WITH | Column starts with the specified text (ignoring case) | name ISTARTS WITH 'john'
IENDS WITH | Column ends with the specified text (ignoring case) | filename IENDS WITH '.pdf'

Null Value Operators

Operator | Description | Example
IS NULL | Column is empty or null | phone IS NULL
IS NOT NULL | Column is not empty | email IS NOT NULL

Logical Operators

Operator | Description | Example
AND | Both conditions must be true | age > 25 AND city = 'New York'
OR | At least one condition must be true | status = 'Active' OR status = 'Pending'

Working with Quoted Column Names

If your column names contain spaces or special characters, wrap them in quotes:

  • Double quotes: "column name" = 'value'
  • Square brackets: [column name] = 'value'
  • Backticks: `column name` = 'value'
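
For instance, a single filter string may mix the three quoting styles (the column names below are hypothetical):

# Hypothetical columns containing spaces, each quoted with a different supported style.
filter_expression = (
    '"Customer Name" STARTS WITH \'A\' '
    "AND [Order Total] >= 100 "
    "AND `Shipping City` = 'Boston'"
)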

Using Parentheses for Complex Logic

Parentheses control the order of operations in complex filters:

(status = 'Active' OR status = 'Pending') AND age > 25

This finds records where the status is either Active OR Pending, AND the age is greater than 25.
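
Under the hood, the filter parser shown at the end of this page translates such an expression into ordinary SQLAlchemy conditions. A rough, self-contained sketch of what the example above becomes, using a toy table in place of the reflected one:

from sqlalchemy import Column, Integer, MetaData, String, Table, and_, or_, select

# Toy table standing in for the table reflected by the brick.
metadata = MetaData()
people = Table("people", metadata, Column("status", String), Column("age", Integer))

# (status = 'Active' OR status = 'Pending') AND age > 25
condition = and_(
    or_(people.c.status == "Active", people.c.status == "Pending"),
    people.c.age > 25,
)
print(select(people).where(condition))  # prints the generated SQL, including the WHERE clause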

Filter Examples

Example 1: Simple text filter

department = 'Sales'

Example 2: Range filter with AND

salary >= 50000 AND salary <= 100000

Example 3: Multiple text conditions with OR

city = 'New York' OR city = 'Los Angeles' OR city = 'Chicago'

Example 4: Complex condition with parentheses

(department = 'Sales' OR department = 'Marketing') AND hire_date >= '2020-01-01'

Example 5: Text search with null check

email CONTAINS '@company.com' AND phone IS NOT NULL

Example 6: Date range with text filter

order_date STARTS WITH '2024' AND status != 'Cancelled'

Important Notes for Filtering

  • Text values must be wrapped in single quotes: 'value'
  • Numbers should not be quoted: age > 25
  • Column names are case-sensitive in some databases
  • Filter keywords (AND, OR, CONTAINS, etc.) are case-insensitive
  • How strictly CONTAINS, STARTS WITH, and ENDS WITH distinguish case follows the database's LIKE collation; use the I-prefixed variants when you explicitly need case-insensitive matching
  • Empty values should use IS NULL or IS NOT NULL
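
A filter can be supplied either through the filter expression input or through the Filter Expression option; when both are given, the implementation below uses the input value. A short sketch against a hypothetical SQLite database:

# Hypothetical table and database file; rows are filtered at extraction time.
df = extract_table(
    table_name="employees",
    database_name="hr.sqlite",
    filter_expression="department = 'Sales' AND salary >= 50000",
)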

Sorting Guide

The Sort By option allows you to order your results by one or multiple columns with flexible direction control.

Sort Direction Options

  • ASC or ASCENDING: Sorts from smallest to largest (A→Z, 1→9, oldest→newest)
  • DESC or DESCENDING: Sorts from largest to smallest (Z→A, 9→1, newest→oldest)

Single Column Sorting

To sort by a single column, add the column name to the Sort By list:

  • Column name: created_date
  • Direction: ASC by default

Multiple Column Sorting

For multi-column sorting, the order in the Sort By list determines priority:

Example: Sort by department, then by salary (highest first), then by name

Sort By list:

  1. department
  2. salary DESC
  3. name ASC

This will:

  1. First group all records by department (alphabetically)
  2. Within each department, sort by salary (highest first)
  3. Within the same salary, sort by name (alphabetically)
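
Expressed as the Sort By option, that priority is simply the order of the list (a sketch with hypothetical column names):

options = {
    "sort_by": [
        "department",      # 1st priority, ASC by default
        "salary DESC",     # 2nd priority
        "name ASC",        # 3rd priority
    ],
}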

Sorting with Quoted Column Names

If column names contain spaces or special characters, use the same quoting as in filters:

  • "Last Name" DESC
  • [Order Date] ASC
  • `Employee ID` ASC

Sort Direction Syntax

You can specify the direction by adding it after the column name:

salary DESC
created_date ASC

Sorting Examples

Example 1: Recent orders first

order_date DESC

Example 2: Alphabetical by customer, then by order amount (highest first)

customer_name ASC
order_amount DESC

Example 3: Priority-based sorting

priority DESC
created_date DESC
customer_name ASC

Example 4: Complex business logic sorting

status ASC
due_date ASC
amount DESC

This sorts by:

  1. Status (Active, Completed, Pending - alphabetically)
  2. Due date (earliest first within each status)
  3. Amount (highest first within same status and date)

Important Notes for Sorting

  • Sort order matters: First column in the list has highest priority
  • Default direction: If no direction is specified, ASC is used
  • Null values: Usually sorted at the beginning (ASC) or end (DESC) depending on database
  • Case sensitivity: Depends on database configuration
  • Performance: Sorting large datasets may take longer

Combining Filtering and Sorting

You can use both filtering and sorting together for powerful data extraction:

Example: Get active customers from 2024, sorted by registration date (newest first)

  • Filter: status = 'Active' AND registration_date STARTS WITH '2024'
  • Sort By: registration_date DESC

This combination first filters the data to only active customers from 2024, then sorts the results by registration date with the newest customers appearing first.
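
Put together as a single call (hypothetical table and columns; the connection parameters are assumed to come from environment variables loaded by a LOAD ENV brick):

df = extract_table(
    table_name="customers",
    options={
        "db_type": "Postgres",
        "filter_expression": "status = 'Active' AND registration_date STARTS WITH '2024'",
        "sort_by": ["registration_date DESC"],
    },
)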

import logging
import sqlalchemy as sa
from sqlalchemy import select, Table, MetaData, and_, or_, desc, asc
import pandas as pd
import polars as pl
from os import getenv
from coded_flows.types import Str, SecretStr, Int, DataFrame
from typing import Any, List, Dict

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _coalesce(*values):
    """Return the first non-None value."""
    return next((v for v in values if v is not None), None)


class FilterTokenizer:
    """Tokenizer for filter expressions that handles quoted column names and operators."""

    def __init__(self, expression: str):
        self.expression = expression.strip()
        self.tokens = []
        self.position = 0

    def tokenize(self) -> List[Dict[str, Any]]:
        """Tokenize the filter expression."""
        self.tokens = []
        self.position = 0
        while self.position < len(self.expression):
            self._skip_whitespace()
            if self.position >= len(self.expression):
                break
            if self._match_parentheses():
                continue
            elif self._match_quoted_identifier():
                continue
            elif self._match_operators():
                continue
            elif self._match_logical_operators():
                continue
            elif self._match_quoted_value():
                continue
            elif self._match_unquoted_value():
                continue
            else:
                self.position += 1
        return self.tokens

    def _skip_whitespace(self):
        """Skip whitespace characters."""
        while (
            self.position < len(self.expression)
            and self.expression[self.position].isspace()
        ):
            self.position += 1

    def _match_parentheses(self) -> bool:
        """Match opening or closing parentheses."""
        if (
            self.position < len(self.expression)
            and self.expression[self.position] in "()"
        ):
            char = self.expression[self.position]
            self.tokens.append(
                {"type": "LPAREN" if char == "(" else "RPAREN", "value": char}
            )
            self.position += 1
            return True
        return False

    def _match_quoted_identifier(self) -> bool:
        """Match quoted column names (double quotes, square brackets, or backticks)."""
        quote_chars = {'"': '"', "[": "]", "`": "`"}
        if self.position >= len(self.expression):
            return False
        start_char = self.expression[self.position]
        if start_char not in quote_chars:
            return False
        end_char = quote_chars[start_char]
        start_pos = self.position + 1
        end_pos = start_pos
        while end_pos < len(self.expression):
            if self.expression[end_pos] == end_char:
                if (
                    end_pos + 1 < len(self.expression)
                    and self.expression[end_pos + 1] == end_char
                ):
                    end_pos += 2
                    continue
                else:
                    break
            end_pos += 1
        if end_pos >= len(self.expression):
            return False
        value = self.expression[start_pos:end_pos].replace(
            end_char + end_char, end_char
        )
        self.tokens.append({"type": "COLUMN", "value": value})
        self.position = end_pos + 1
        return True

    def _match_logical_operators(self) -> bool:
        """Match AND/OR operators."""
        remaining = self.expression[self.position :].upper()
        if remaining.startswith("AND") and (
            len(remaining) == 3 or not remaining[3].isalnum()
        ):
            self.tokens.append({"type": "AND", "value": "AND"})
            self.position += 3
            return True
        if remaining.startswith("OR") and (
            len(remaining) == 2 or not remaining[2].isalnum()
        ):
            self.tokens.append({"type": "OR", "value": "OR"})
            self.position += 2
            return True
        return False

    def _match_operators(self) -> bool:
        """Match comparison operators and special operators."""
        remaining = self.expression[self.position :].upper()
        operators = [
            ("ICONTAINS", "ICONTAINS"),
            ("ISTARTS WITH", "ISTARTS_WITH"),
            ("IENDS WITH", "IENDS_WITH"),
            ("STARTS WITH", "STARTS_WITH"),
            ("ENDS WITH", "ENDS_WITH"),
            ("IS NOT NULL", "IS_NOT_NULL"),
            ("IS NULL", "IS_NULL"),
            ("CONTAINS", "CONTAINS"),
            ("<=", "LTE"),
            (">=", "GTE"),
            ("<>", "NE"),
            ("!=", "NE"),
            ("<", "LT"),
            (">", "GT"),
            ("=", "EQ"),
        ]
        for op_text, op_type in operators:
            if remaining.startswith(op_text):
                next_pos = self.position + len(op_text)
                # Word operators (CONTAINS, IS NULL, ...) require a word boundary so
                # that e.g. "CONTAINSX" is not matched; symbol operators (=, <=, ...)
                # may be followed directly by a value, as in "age>=25".
                needs_boundary = op_text[-1].isalpha()
                if not needs_boundary or (
                    next_pos >= len(self.expression)
                    or not self.expression[next_pos].isalnum()
                ):
                    self.tokens.append({"type": op_type, "value": op_text})
                    self.position = next_pos
                    return True
        return False

    def _match_quoted_value(self) -> bool:
        """Match quoted string values."""
        if self.position >= len(self.expression):
            return False
        quote_char = self.expression[self.position]
        if quote_char not in ["'", '"']:
            return False
        start_pos = self.position + 1
        end_pos = start_pos
        while end_pos < len(self.expression):
            if self.expression[end_pos] == quote_char:
                if (
                    end_pos + 1 < len(self.expression)
                    and self.expression[end_pos + 1] == quote_char
                ):
                    end_pos += 2
                    continue
                else:
                    break
            end_pos += 1
        if end_pos >= len(self.expression):
            return False
        value = self.expression[start_pos:end_pos].replace(
            quote_char + quote_char, quote_char
        )
        self.tokens.append({"type": "VALUE", "value": value})
        self.position = end_pos + 1
        return True

    def _match_unquoted_value(self) -> bool:
        """Match unquoted values (numbers or identifiers)."""
        start_pos = self.position
        while (
            self.position < len(self.expression)
            and (not self.expression[self.position].isspace())
            and (self.expression[self.position] not in "()")
        ):
            remaining = self.expression[self.position :].upper()
            if (
                remaining.startswith("AND ")
                or remaining.startswith("OR ")
                or remaining.startswith("<=")
                or remaining.startswith(">=")
                or remaining.startswith("<>")
                or remaining.startswith("!=")
                or remaining.startswith("IS ")
                or remaining.startswith("CONTAINS ")
                or remaining.startswith("STARTS ")
                or remaining.startswith("ENDS ")
                or (self.expression[self.position] in "<>=")
            ):
                break
            self.position += 1
        if self.position > start_pos:
            value = self.expression[start_pos : self.position]
            token_type = "VALUE" if self._is_numeric(value) else "COLUMN"
            self.tokens.append({"type": token_type, "value": value})
            return True
        return False

    def _is_numeric(self, value: str) -> bool:
        """Check if a string represents a number."""
        try:
            float(value)
            return True
        except ValueError:
            return False


class FilterParser:
    """Parser for filter expressions with support for parentheses."""

    def __init__(self, tokens: List[Dict[str, Any]], table: Table, table_name: str):
        self.tokens = tokens
        self.table = table
        self.table_name = table_name
        self.position = 0

    def parse(self) -> Any:
        """Parse the tokens into SQLAlchemy conditions."""
        if not self.tokens:
            return None
        result = self._parse_or()
        return result

    def _parse_or(self) -> Any:
        """Parse OR expressions (lowest precedence)."""
        left = self._parse_and()
        while self._current_token_is("OR"):
            self.position += 1
            right = self._parse_and()
            if left is None:
                left = right
            elif right is not None:
                left = or_(left, right)
        return left

    def _parse_and(self) -> Any:
        """Parse AND expressions (higher precedence than OR)."""
        left = self._parse_condition()
        while self._current_token_is("AND"):
            self.position += 1
            right = self._parse_condition()
            if left is None:
                left = right
            elif right is not None:
                left = and_(left, right)
        return left

    def _parse_condition(self) -> Any:
        """Parse individual conditions or parenthesized expressions."""
        if self._current_token_is("LPAREN"):
            self.position += 1
            result = self._parse_or()
            if not self._current_token_is("RPAREN"):
                raise ValueError("Missing closing parenthesis in filter expression")
            self.position += 1
            return result
        return self._parse_simple_condition()

    def _parse_simple_condition(self) -> Any:
        """Parse a simple condition (column operator value)."""
        if self.position >= len(self.tokens):
            return None
        column_token = self.tokens[self.position]
        if column_token["type"] != "COLUMN":
            raise ValueError(f"Expected column name, got {column_token['value']}")
        column_name = column_token["value"]
        if column_name not in self.table.c:
            raise ValueError(
                f"Column '{column_name}' not found in table '{self.table_name}'"
            )
        column = self.table.c[column_name]
        self.position += 1
        if self.position >= len(self.tokens):
            raise ValueError("Expected operator after column name")
        op_token = self.tokens[self.position]
        self.position += 1
        if op_token["type"] == "IS_NULL":
            return column.is_(None)
        elif op_token["type"] == "IS_NOT_NULL":
            return column.isnot(None)
        if self.position >= len(self.tokens):
            raise ValueError("Expected value after operator")
        value_token = self.tokens[self.position]
        if value_token["type"] != "VALUE":
            raise ValueError(f"Expected value, got {value_token['value']}")
        value = self._convert_value(value_token["value"])
        self.position += 1
        if op_token["type"] == "EQ":
            return column == value
        elif op_token["type"] == "NE":
            return column != value
        elif op_token["type"] == "LT":
            return column < value
        elif op_token["type"] == "GT":
            return column > value
        elif op_token["type"] == "LTE":
            return column <= value
        elif op_token["type"] == "GTE":
            return column >= value
        elif op_token["type"] == "CONTAINS":
            return column.like(f"%{value}%")
        elif op_token["type"] == "STARTS_WITH":
            return column.like(f"{value}%")
        elif op_token["type"] == "ENDS_WITH":
            return column.like(f"%{value}")
        elif op_token["type"] == "ICONTAINS":
            return column.ilike(f"%{value}%")
        elif op_token["type"] == "ISTARTS_WITH":
            return column.ilike(f"{value}%")
        elif op_token["type"] == "IENDS_WITH":
            return column.ilike(f"%{value}")
        else:
            raise ValueError(f"Unknown operator: {op_token['type']}")

    def _current_token_is(self, token_type: str) -> bool:
        """Check if current token is of given type."""
        return (
            self.position < len(self.tokens)
            and self.tokens[self.position]["type"] == token_type
        )

    def _convert_value(self, value: str) -> Any:
        """Convert string value to appropriate Python type."""
        try:
            return int(value)
        except ValueError:
            pass
        try:
            return float(value)
        except ValueError:
            pass
        return value


def _parse_excel_filter(filter_expr: str, table: Table, table_name: str) -> Any:
    """
    Parse Excel-like filter expressions into SQLAlchemy conditions with robust handling.
    Supports parentheses, quoted column names, and proper precedence.

    Supported operators:
    - Comparison: =, !=, <>, <, >, <=, >=
    - Text (case-sensitive): CONTAINS, STARTS WITH, ENDS WITH
    - Text (case-insensitive): ICONTAINS, ISTARTS WITH, IENDS WITH
    - Null checks: IS NULL, IS NOT NULL
    - Logical: AND, OR
    - Parentheses for grouping: (, )

    Examples:
    - name = 'John'
    - age > 25 AND status = 'Active'
    - description ICONTAINS 'important'
    - title ISTARTS WITH 'mr'
    """
    if not filter_expr or filter_expr.strip() == "":
        return None
    try:
        tokenizer = FilterTokenizer(filter_expr)
        tokens = tokenizer.tokenize()
        if not tokens:
            return None
        parser = FilterParser(tokens, table, table_name)
        return parser.parse()
    except Exception as e:
        raise ValueError(f"Error parsing filter expression '{filter_expr}': {str(e)}")


def _parse_sort_column(sort_item: str, available_columns: set) -> tuple:
    """
    Parse a sort column specification, handling quoted names and directions.
    Supports double quotes, square brackets, and backticks for column names.
    Returns (column_name, direction) tuple.

    Examples:
    - "Last Name" DESC
    - [Order Date] ASC
    - `Employee ID` ASC
    """
    sort_item = sort_item.strip()
    if not sort_item:
        return (None, None)
    direction = "ASC"
    col_name = sort_item
    upper_item = sort_item.upper()
    if upper_item.endswith(" DESC"):
        direction = "DESC"
        col_name = sort_item[:-5].strip()
    elif upper_item.endswith(" ASC"):
        direction = "ASC"
        col_name = sort_item[:-4].strip()
    original_col_name = col_name
    if col_name.startswith('"') and col_name.endswith('"'):
        col_name = col_name[1:-1].replace('""', '"')
    elif col_name.startswith("[") and col_name.endswith("]"):
        col_name = col_name[1:-1]
    elif col_name.startswith("`") and col_name.endswith("`"):
        col_name = col_name[1:-1].replace("``", "`")
    if col_name not in available_columns:
        raise ValueError(
            f"Column '{col_name}' in 'Sort By' not found in table. Original sort specification: '{original_col_name} {direction}'"
        )
    return (col_name, direction)


def extract_table(
    table_name: Str = None,
    filter_expression: Str = None,
    database_name: Str = None,
    host: Str = None,
    port: Int = None,
    username: Str = None,
    password: SecretStr = None,
    options: Dict = None,
) -> DataFrame:
    brick_display_name = "Extract Table"
    df = None
    options = options or {}
    verbose = options.get("verbose", True)
    db_type = options.get("db_type", "SQLite")
    df_type = options.get("df_type", "Pandas")
    env_prefix = options.get("env_prefix", "")
    op_table_name = options.get("table_name", "")
    columns_to_extract = options.get("columns_to_extract", [])
    apply_limit = options.get("apply_limit", False)
    limit_value = options.get("limit_value", 1000)
    sort_by = options.get("sort_by", [])
    op_filter_expression = options.get("filter_expression", "")
    table_name = _coalesce(table_name, op_table_name)
    filter_expression = _coalesce(filter_expression, op_filter_expression)
    if not table_name:
        raise ValueError("Table name is required")
    env_prefix = env_prefix + "_" if env_prefix.strip() else ""
    try:
        database_name = _coalesce(database_name, getenv(f"{env_prefix}DB_NAME"))
        if db_type != "SQLite":
            host = _coalesce(host, getenv(f"{env_prefix}DB_HOST"))
            port = _coalesce(port, getenv(f"{env_prefix}DB_PORT"))
            username = _coalesce(username, getenv(f"{env_prefix}DB_USERNAME"))
            password = _coalesce(password, getenv(f"{env_prefix}DB_PASSWORD"))
        if not database_name or (
            db_type != "SQLite"
            and (not host or not port or (not username) or (not password))
        ):
            raise ValueError("Missing database configuration")
        if hasattr(password, "get_secret_value"):
            password = password.get_secret_value()
        else:
            password = str(password) if password else None
        if port:
            port = int(port)
            if not 1 <= port <= 65535:
                raise ValueError(f"Port must be between 1 and 65535, got {port}")
    except Exception as e:
        verbose and logger.error(
            f"[{brick_display_name}] Configuration error encountered"
        )
        raise
    if verbose:
        logger.info(
            f"[{brick_display_name}] Extracting from table '{table_name}' in {db_type} database."
        )
    driver_map = {
        "Postgres": "postgresql+psycopg",
        "MySQL": "mysql+pymysql",
        "MariaDB": "mariadb+pymysql",
        "Oracle Database": "oracle+oracledb",
        "Microsoft SQL Server": "mssql+pymssql",
        "SQLite": "sqlite",
    }
    url = (
        f"{driver_map[db_type]}:///{database_name}"
        if db_type == "SQLite"
        else f"{driver_map[db_type]}://{username}:{password}@{host}:{port}/{database_name}"
    )
    try:
        engine = sa.create_engine(url)
        with engine.connect() as conn:
            metadata = MetaData()
            table = Table(table_name, metadata, autoload_with=engine)
            available_columns = {c.name for c in table.columns}
            if columns_to_extract:

                def _clean_column(col: str) -> str:
                    """Strip optional quoting (double quotes, brackets, backticks) from a column name."""
                    clean_col = col.strip()
                    if clean_col.startswith('"') and clean_col.endswith('"'):
                        clean_col = clean_col[1:-1].replace('""', '"')
                    elif clean_col.startswith("[") and clean_col.endswith("]"):
                        clean_col = clean_col[1:-1]
                    elif clean_col.startswith("`") and clean_col.endswith("`"):
                        clean_col = clean_col[1:-1].replace("``", "`")
                    return clean_col

                cleaned_columns = [_clean_column(col) for col in columns_to_extract]
                missing_columns = [
                    col
                    for col, clean_col in zip(columns_to_extract, cleaned_columns)
                    if clean_col not in available_columns
                ]
                if missing_columns:
                    error_msg = f"The following columns were not found in table '{table_name}': {', '.join(missing_columns)}."
                    verbose and logger.info(f"[{brick_display_name}] {error_msg}")
                    raise ValueError(error_msg)
                query = select(*(table.c[clean_col] for clean_col in cleaned_columns))
            else:
                query = select(table)
            if filter_expression:
                try:
                    filter_condition = _parse_excel_filter(
                        filter_expression, table, table_name
                    )
                    if filter_condition is not None:
                        query = query.where(filter_condition)
                        verbose and logger.info(
                            f"[{brick_display_name}] Filter applied: {filter_expression}"
                        )
                except Exception as e:
                    verbose and logger.warning(
                        f"[{brick_display_name}] Failed to apply filter: {str(e)}"
                    )
                    raise
            if sort_by:
                order_by_clauses = []
                for item in sort_by:
                    try:
                        (col_name, direction) = _parse_sort_column(
                            item, available_columns
                        )
                        if col_name:
                            order_by_clauses.append(
                                desc(table.c[col_name])
                                if direction == "DESC"
                                else asc(table.c[col_name])
                            )
                            verbose and logger.info(
                                f"[{brick_display_name}] Sorting by '{col_name}' ({direction})"
                            )
                    except Exception as e:
                        verbose and logger.warning(
                            f"[{brick_display_name}] Error parsing sort column '{item}': {str(e)}"
                        )
                        raise
                if order_by_clauses:
                    query = query.order_by(*order_by_clauses)
            if apply_limit and limit_value > 0:
                query = query.limit(limit_value)
                verbose and logger.info(
                    f"[{brick_display_name}] Limiting to {limit_value} rows"
                )
            if df_type.lower() == "pandas":
                df = pd.read_sql(query, conn)
            elif df_type.lower() == "polars":
                compiled_query = query.compile(compile_kwargs={"literal_binds": True})
                df = pl.read_database(str(compiled_query), connection=conn)
            else:
                raise ValueError("df_type must be 'Pandas' or 'Polars'")
        engine.dispose()
        verbose and logger.info(
            f"[{brick_display_name}] Table extracted successfully. Rows returned: {(len(df) if df is not None else 0)}"
        )
    except sa.exc.NoSuchTableError:
        verbose and logger.error(
            f"[{brick_display_name}] Table '{table_name}' not found in the database."
        )
        raise
    except Exception as e:
        verbose and logger.error(
            f"[{brick_display_name}] Error during table extraction: {str(e)}"
        )
        raise
    return df
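
For completeness, a sketch that passes the connection parameters explicitly instead of relying on environment variables (all values below are placeholders):

# Explicit connection parameters, column selection, and a row limit.
df = extract_table(
    table_name="orders",
    database_name="shopdb",
    host="localhost",
    port=5432,
    username="myuser",
    password="mypassword",
    options={
        "db_type": "Postgres",
        "columns_to_extract": ["order_id", "order_date", "amount"],
        "apply_limit": True,
        "limit_value": 1000,
    },
)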

Brick Info

version v0.1.0
python 3.10, 3.11, 3.12, 3.13
requirements
  • cryptography
  • sqlalchemy
  • psycopg[binary]
  • pymysql
  • pymssql
  • pandas
  • polars
  • oracledb