Extract Table
Extract data from a database table with filtering, sorting, and column selection.
Processing
Extracts data from a database table with filtering, sorting, and column selection. The function supports multiple database types including PostgreSQL, MySQL, MariaDB, Oracle Database, Microsoft SQL Server, and SQLite. It provides basic filtering with Excel-like expressions supporting parentheses, quoted column names, and logical operators.
Inputs
- table name
- The name of the database table to extract data from
- filter expression (optional)
- Excel-like filter expression for filtering rows with support for operators like =, <>, >, <, >=, <=, CONTAINS, STARTS WITH, ENDS WITH, IS NULL, IS NOT NULL, and logical operators AND, OR with parentheses
- database name (optional)
- The name of the database containing the target table. If not provided, will use environment variable
- host (optional)
- The database server hostname or IP address (ignored for SQLite). If not provided, will use environment variable
- port (optional)
- The database server port number (ignored for SQLite). If not provided, will use environment variable
- username (optional)
- The database username for authentication (ignored for SQLite). If not provided, will use environment variable
- password (optional)
- The database password for authentication (ignored for SQLite). If not provided, will use environment variable
Input Types

Input | Type |
---|---|
table name | Str |
filter expression | Str |
database name | Str |
host | Str |
port | Int |
username | Str |
password | SecretStr |
options | Dict |
You can check the list of supported types here: Available Type Hints.
Outputs
- df
- DataFrame containing the extracted table data with applied filters, sorting, and column selection
Output Types

Output | Type |
---|---|
df | DataFrame |
You can check the list of supported types here: Available Type Hints.
Options
The Extract Table brick exposes the following configurable options:
- Table Name
- Name of the database table to extract data from (maximum 250 characters)
- Database type
- Type of database to connect to with choices: Postgres, MySQL, MariaDB, Oracle Database, Microsoft SQL Server, SQLite (default: SQLite)
- DataFrame type
- Output DataFrame format with choices: Pandas, Polars (default: Pandas)
- Columns to Extract
- List of specific column names to extract from the table (leave empty to extract all columns)
- Limit Number of Rows
- Toggle to enable row limiting (default: False)
- Row Limit
- Maximum number of rows to return when limiting is enabled (default: 1000, range: 1-1000000)
- Sort By
- List of column names with optional direction (ASC/DESC) for sorting results
- Filter Expression
- Excel-like filter expression for row filtering with support for complex conditions (maximum 2000 characters)
- Env variables prefix
- Prefix for environment variable names when loading database connection parameters (maximum 150 characters)
- Verbose
- Enable verbose logging during extraction process (default: True)
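These options reach the brick as a single `options` dictionary. A plausible payload, using the internal key names read in the brick source further down (`options.get(...)`); the values here are illustrative, not required defaults:

```python
# Illustrative options payload for the Extract Table brick.
# Key names match those read in the brick source; values are example data.
options = {
    "table_name": "customers",                  # Table Name
    "db_type": "SQLite",                        # Database type
    "df_type": "Pandas",                        # DataFrame type
    "columns_to_extract": ["name", "email"],    # Columns to Extract
    "apply_limit": True,                        # Limit Number of Rows
    "limit_value": 1000,                        # Row Limit
    "sort_by": ["name ASC"],                    # Sort By
    "filter_expression": "email IS NOT NULL",   # Filter Expression
    "env_prefix": "",                           # Env variables prefix
    "verbose": True,                            # Verbose
}
print(options["db_type"])  # SQLite
```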
Environment Variables Configuration
If you don't provide database connection parameters as brick inputs, you can configure them using environment variables.
Step 1 — Create an environment file
Create an environment file in the flow project folder. You can name it anything you like (e.g. `.env`, `db.env`, or `mysettings.env`).
Example: db.env
# Example database environment configuration
DB_NAME=mydatabase
DB_HOST=localhost
DB_PORT=5432
DB_USERNAME=myuser
DB_PASSWORD=mypassword
💡 For SQLite, only `DB_NAME` is required (e.g., `DB_NAME=mydb.sqlite`).
Step 2 — Reference the env file in CODED FLOWS
Inside the CODED FLOWS app, use the LOAD ENV control brick to load the file you created.
- Specify your env filename (`db.env`, for example, or whichever name you chose).
- All defined variables become available to any other bricks in the connected graph flow.
Step 3 — Use with prefix (optional)
If you set the "Env variables prefix" option, the brick will automatically prepend the prefix and an underscore to each variable name it looks up.
Example with prefix `MYAPP`:
MYAPP_DB_NAME=mydatabase
MYAPP_DB_HOST=localhost
MYAPP_DB_PORT=5432
MYAPP_DB_USERNAME=myuser
MYAPP_DB_PASSWORD=mypassword
Bricks will then look for `MYAPP_DB_NAME`, `MYAPP_DB_HOST`, etc. instead of the default variable names.
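The lookup rule can be sketched in a few lines of Python (a minimal sketch of the prefixing behavior, not the brick's actual loader):

```python
import os
from typing import Optional

def resolve_env(name: str, prefix: str = "") -> Optional[str]:
    # With a prefix, the brick looks for "<PREFIX>_<NAME>";
    # without one, it falls back to the plain variable name.
    key = f"{prefix}_{name}" if prefix.strip() else name
    return os.getenv(key)

os.environ["MYAPP_DB_NAME"] = "mydatabase"
print(resolve_env("DB_NAME", "MYAPP"))  # mydatabase
```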
Filtering Guide
The Filter Expression option supports Excel-like syntax for data filtering. You can combine multiple conditions using logical operators and parentheses for complex queries.
Basic Comparison Operators
Operator | Description | Example |
---|---|---|
`=` | Equal to | `status = 'Active'` |
`!=` or `<>` | Not equal to | `status != 'Cancelled'` |
`>` | Greater than | `age > 25` |
`>=` | Greater than or equal | `price >= 100` |
`<` | Less than | `quantity < 10` |
`<=` | Less than or equal | `discount <= 0.5` |
Case-Sensitive Text-Based Operators
Operator | Description | Example |
---|---|---|
`CONTAINS` | Column contains the specified text | `email CONTAINS '@gmail'` |
`STARTS WITH` | Column starts with the specified text | `name STARTS WITH 'John'` |
`ENDS WITH` | Column ends with the specified text | `filename ENDS WITH '.pdf'` |
Case-Insensitive Text-Based Operators
Operator | Description | Example |
---|---|---|
`ICONTAINS` | Column contains the specified text (ignoring case) | `email ICONTAINS '@gmail'` |
`ISTARTS WITH` | Column starts with the specified text (ignoring case) | `name ISTARTS WITH 'john'` |
`IENDS WITH` | Column ends with the specified text (ignoring case) | `filename IENDS WITH '.pdf'` |
Null Value Operators
Operator | Description | Example |
---|---|---|
`IS NULL` | Column is empty or null | `phone IS NULL` |
`IS NOT NULL` | Column is not empty | `email IS NOT NULL` |
Logical Operators
Operator | Description | Example |
---|---|---|
`AND` | Both conditions must be true | `age > 25 AND city = 'New York'` |
`OR` | At least one condition must be true | `status = 'Active' OR status = 'Pending'` |
Working with Quoted Column Names
If your column names contain spaces or special characters, wrap them in quotes:
- Double quotes: `"column name" = 'value'`
- Square brackets: `[column name] = 'value'`
- Backticks: `` `column name` = 'value' ``
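The brick strips one layer of quoting before matching the column against the table schema. That handling, mirrored from the brick source below, can be sketched as:

```python
def unquote_identifier(col: str) -> str:
    """Strip one layer of identifier quoting, mirroring the brick's handling."""
    col = col.strip()
    if col.startswith('"') and col.endswith('"'):
        return col[1:-1].replace('""', '"')  # doubled quotes escape a literal quote
    if col.startswith("[") and col.endswith("]"):
        return col[1:-1]
    if col.startswith("`") and col.endswith("`"):
        return col[1:-1].replace("``", "`")
    return col

print(unquote_identifier('[column name]'))           # column name
print(unquote_identifier('"Last ""Known"" Name"'))   # Last "Known" Name
```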
Using Parentheses for Complex Logic
Parentheses control the order of operations in complex filters:
(status = 'Active' OR status = 'Pending') AND age > 25
This finds records where the status is either Active OR Pending, AND the age is greater than 25.
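In SQL terms the grouped filter above becomes a parenthesized WHERE clause. A quick sketch using Python's built-in sqlite3 (the table and data are made up for illustration):

```python
import sqlite3

# What the filter
#   (status = 'Active' OR status = 'Pending') AND age > 25
# corresponds to in SQL, run against a throwaway in-memory table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE people (name TEXT, status TEXT, age INTEGER)")
conn.executemany(
    "INSERT INTO people VALUES (?, ?, ?)",
    [("Ann", "Active", 30), ("Bob", "Pending", 22), ("Cy", "Active", 20)],
)
rows = conn.execute(
    "SELECT name FROM people "
    "WHERE (status = 'Active' OR status = 'Pending') AND age > 25"
).fetchall()
print(rows)  # [('Ann',)]
```

Only Ann passes: Bob and Cy match one side of the OR but fail the age check.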
Filter Examples
Example 1: Simple text filter
department = 'Sales'
Example 2: Range filter with AND
salary >= 50000 AND salary <= 100000
Example 3: Multiple text conditions with OR
city = 'New York' OR city = 'Los Angeles' OR city = 'Chicago'
Example 4: Complex condition with parentheses
(department = 'Sales' OR department = 'Marketing') AND hire_date >= '2020-01-01'
Example 5: Text search with null check
email CONTAINS '@company.com' AND phone IS NOT NULL
Example 6: Date range with text filter
order_date STARTS WITH '2024' AND status != 'Cancelled'
Important Notes for Filtering
- Text values must be wrapped in single quotes: `'value'`
- Numbers should not be quoted: `age > 25`
- Column names are case-sensitive in some databases
- Filter keywords (AND, OR, CONTAINS, etc.) are case-insensitive
- Empty values should be checked with `IS NULL` or `IS NOT NULL`
Sorting Guide
The Sort By option allows you to order your results by one or multiple columns with flexible direction control.
Sort Direction Options
- `ASC` or `ASCENDING`: sorts from smallest to largest (A→Z, 1→9, oldest→newest)
- `DESC` or `DESCENDING`: sorts from largest to smallest (Z→A, 9→1, newest→oldest)
Single Column Sorting
To sort by a single column, add the column name to the Sort By list:
- Column name: `created_date`
- Direction: defaults to `ASC` if not specified
Multiple Column Sorting
For multi-column sorting, the order in the Sort By list determines priority:
Example: Sort by department, then by salary (highest first), then by name
Sort By list:
- `department`
- `salary DESC`
- `name ASC`
This will:
- First group all records by department (alphabetically)
- Within each department, sort by salary (highest first)
- Within the same salary, sort by name (alphabetically)
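The same priority rule can be mimicked with Python's `sorted` and a composite key (a sketch with made-up rows; numeric negation stands in for DESC):

```python
rows = [
    {"department": "Sales", "salary": 50000, "name": "Bob"},
    {"department": "IT", "salary": 70000, "name": "Ann"},
    {"department": "Sales", "salary": 80000, "name": "Cy"},
]
# department ASC, then salary DESC, then name ASC:
# the leftmost key has the highest priority, just like the Sort By list.
ordered = sorted(rows, key=lambda r: (r["department"], -r["salary"], r["name"]))
print([r["name"] for r in ordered])  # ['Ann', 'Cy', 'Bob']
```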
Sorting with Quoted Column Names
If column names contain spaces or special characters, use the same quoting as in filters:
"Last Name" DESC
[Order Date] ASC
`Employee ID` ASC
Sort Direction Syntax
You can specify the direction by adding it after the column name:
- `salary DESC`
- `created_date ASC`
Sorting Examples
Example 1: Recent orders first
order_date DESC
Example 2: Alphabetical by customer, then by order amount (highest first)
customer_name ASC
order_amount DESC
Example 3: Priority-based sorting
priority DESC
created_date DESC
customer_name ASC
Example 4: Complex business logic sorting
status ASC
due_date ASC
amount DESC
This sorts by:
- Status (Active, Completed, Pending - alphabetically)
- Due date (earliest first within each status)
- Amount (highest first within same status and date)
Important Notes for Sorting
- Sort order matters: the first column in the list has the highest priority
- Default direction: if no direction is specified, `ASC` is used
- Null values: usually sorted first (`ASC`) or last (`DESC`), depending on the database
- Case sensitivity: depends on the database configuration
- Performance: sorting large datasets may take longer
Combining Filtering and Sorting
You can use both filtering and sorting together for powerful data extraction:
Example: Get active customers from 2024, sorted by registration date (newest first)
- Filter: `status = 'Active' AND registration_date STARTS WITH '2024'`
- Sort By: `registration_date DESC`
This combination first filters the data to only active customers from 2024, then sorts the results by registration date with the newest customers appearing first.
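In SQL terms this combination is a WHERE clause plus an ORDER BY. A minimal sqlite3 sketch with made-up data:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE customers (name TEXT, status TEXT, registration_date TEXT)"
)
conn.executemany(
    "INSERT INTO customers VALUES (?, ?, ?)",
    [
        ("Ann", "Active", "2024-03-01"),
        ("Bob", "Inactive", "2024-05-01"),
        ("Cy", "Active", "2024-06-01"),
    ],
)
# Filter:  status = 'Active' AND registration_date STARTS WITH '2024'
# Sort By: registration_date DESC
rows = conn.execute(
    "SELECT name FROM customers "
    "WHERE status = 'Active' AND registration_date LIKE '2024%' "
    "ORDER BY registration_date DESC"
).fetchall()
print([r[0] for r in rows])  # ['Cy', 'Ann']
```

Bob is filtered out as inactive; the two active 2024 customers come back newest first.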
import logging
import sqlalchemy as sa
from sqlalchemy import select, Table, MetaData, and_, or_, desc, asc
import pandas as pd
import polars as pl
from os import getenv
from coded_flows.types import Str, SecretStr, Int, DataFrame
from typing import Any, List, Dict
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def _coalesce(*values):
"""Return the first non-None value."""
return next((v for v in values if v is not None), None)
class FilterTokenizer:
"""Tokenizer for filter expressions that handles quoted column names and operators."""
def __init__(self, expression: str):
self.expression = expression.strip()
self.tokens = []
self.position = 0
def tokenize(self) -> List[Dict[str, Any]]:
"""Tokenize the filter expression."""
self.tokens = []
self.position = 0
while self.position < len(self.expression):
self._skip_whitespace()
if self.position >= len(self.expression):
break
if self._match_parentheses():
continue
elif self._match_quoted_identifier():
continue
elif self._match_operators():
continue
elif self._match_logical_operators():
continue
elif self._match_quoted_value():
continue
elif self._match_unquoted_value():
continue
else:
self.position += 1
return self.tokens
def _skip_whitespace(self):
"""Skip whitespace characters."""
while (
self.position < len(self.expression)
and self.expression[self.position].isspace()
):
self.position += 1
def _match_parentheses(self) -> bool:
"""Match opening or closing parentheses."""
if (
self.position < len(self.expression)
and self.expression[self.position] in "()"
):
char = self.expression[self.position]
self.tokens.append(
{"type": "LPAREN" if char == "(" else "RPAREN", "value": char}
)
self.position += 1
return True
return False
def _match_quoted_identifier(self) -> bool:
"""Match quoted column names (double quotes, square brackets, or backticks)."""
quote_chars = {'"': '"', "[": "]", "`": "`"}
if self.position >= len(self.expression):
return False
start_char = self.expression[self.position]
if start_char not in quote_chars:
return False
end_char = quote_chars[start_char]
start_pos = self.position + 1
end_pos = start_pos
while end_pos < len(self.expression):
if self.expression[end_pos] == end_char:
if (
end_pos + 1 < len(self.expression)
and self.expression[end_pos + 1] == end_char
):
end_pos += 2
continue
else:
break
end_pos += 1
if end_pos >= len(self.expression):
return False
value = self.expression[start_pos:end_pos].replace(
end_char + end_char, end_char
)
self.tokens.append({"type": "COLUMN", "value": value})
self.position = end_pos + 1
return True
def _match_logical_operators(self) -> bool:
"""Match AND/OR operators."""
remaining = self.expression[self.position :].upper()
if remaining.startswith("AND") and (
len(remaining) == 3 or not remaining[3].isalnum()
):
self.tokens.append({"type": "AND", "value": "AND"})
self.position += 3
return True
if remaining.startswith("OR") and (
len(remaining) == 2 or not remaining[2].isalnum()
):
self.tokens.append({"type": "OR", "value": "OR"})
self.position += 2
return True
return False
def _match_operators(self) -> bool:
"""Match comparison operators and special operators."""
remaining = self.expression[self.position :].upper()
operators = [
("ICONTAINS", "ICONTAINS"),
("ISTARTS WITH", "ISTARTS_WITH"),
("IENDS WITH", "IENDS_WITH"),
("STARTS WITH", "STARTS_WITH"),
("ENDS WITH", "ENDS_WITH"),
("IS NOT NULL", "IS_NOT_NULL"),
("IS NULL", "IS_NULL"),
("CONTAINS", "CONTAINS"),
("<=", "LTE"),
(">=", "GTE"),
("<>", "NE"),
("!=", "NE"),
("<", "LT"),
(">", "GT"),
("=", "EQ"),
]
for op_text, op_type in operators:
if remaining.startswith(op_text):
next_pos = self.position + len(op_text)
if (
next_pos >= len(self.expression)
or not self.expression[next_pos].isalnum()
):
self.tokens.append({"type": op_type, "value": op_text})
self.position = next_pos
return True
return False
def _match_quoted_value(self) -> bool:
"""Match quoted string values."""
if self.position >= len(self.expression):
return False
quote_char = self.expression[self.position]
if quote_char not in ["'", '"']:
return False
start_pos = self.position + 1
end_pos = start_pos
while end_pos < len(self.expression):
if self.expression[end_pos] == quote_char:
if (
end_pos + 1 < len(self.expression)
and self.expression[end_pos + 1] == quote_char
):
end_pos += 2
continue
else:
break
end_pos += 1
if end_pos >= len(self.expression):
return False
value = self.expression[start_pos:end_pos].replace(
quote_char + quote_char, quote_char
)
self.tokens.append({"type": "VALUE", "value": value})
self.position = end_pos + 1
return True
def _match_unquoted_value(self) -> bool:
"""Match unquoted values (numbers or identifiers)."""
start_pos = self.position
while (
self.position < len(self.expression)
and (not self.expression[self.position].isspace())
and (self.expression[self.position] not in "()")
):
remaining = self.expression[self.position :].upper()
if (
remaining.startswith("AND ")
or remaining.startswith("OR ")
or remaining.startswith("<=")
or remaining.startswith(">=")
or remaining.startswith("<>")
or remaining.startswith("!=")
or remaining.startswith("IS ")
or remaining.startswith("CONTAINS ")
or remaining.startswith("STARTS ")
or remaining.startswith("ENDS ")
or (self.expression[self.position] in "<>=")
):
break
self.position += 1
if self.position > start_pos:
value = self.expression[start_pos : self.position]
token_type = "VALUE" if self._is_numeric(value) else "COLUMN"
self.tokens.append({"type": token_type, "value": value})
return True
return False
def _is_numeric(self, value: str) -> bool:
"""Check if a string represents a number."""
try:
float(value)
return True
except ValueError:
return False
class FilterParser:
"""Parser for filter expressions with support for parentheses."""
def __init__(self, tokens: List[Dict[str, Any]], table: Table, table_name: str):
self.tokens = tokens
self.table = table
self.table_name = table_name
self.position = 0
def parse(self) -> Any:
"""Parse the tokens into SQLAlchemy conditions."""
if not self.tokens:
return None
result = self._parse_or()
return result
def _parse_or(self) -> Any:
"""Parse OR expressions (lowest precedence)."""
left = self._parse_and()
while self._current_token_is("OR"):
self.position += 1
right = self._parse_and()
if left is None:
left = right
elif right is not None:
left = or_(left, right)
return left
def _parse_and(self) -> Any:
"""Parse AND expressions (higher precedence than OR)."""
left = self._parse_condition()
while self._current_token_is("AND"):
self.position += 1
right = self._parse_condition()
if left is None:
left = right
elif right is not None:
left = and_(left, right)
return left
def _parse_condition(self) -> Any:
"""Parse individual conditions or parenthesized expressions."""
if self._current_token_is("LPAREN"):
self.position += 1
result = self._parse_or()
if not self._current_token_is("RPAREN"):
raise ValueError("Missing closing parenthesis in filter expression")
self.position += 1
return result
return self._parse_simple_condition()
def _parse_simple_condition(self) -> Any:
"""Parse a simple condition (column operator value)."""
if self.position >= len(self.tokens):
return None
column_token = self.tokens[self.position]
if column_token["type"] != "COLUMN":
raise ValueError(f"Expected column name, got {column_token['value']}")
column_name = column_token["value"]
if column_name not in self.table.c:
raise ValueError(
f"Column '{column_name}' not found in table '{self.table_name}'"
)
column = self.table.c[column_name]
self.position += 1
if self.position >= len(self.tokens):
raise ValueError("Expected operator after column name")
op_token = self.tokens[self.position]
self.position += 1
if op_token["type"] == "IS_NULL":
return column.is_(None)
elif op_token["type"] == "IS_NOT_NULL":
return column.isnot(None)
if self.position >= len(self.tokens):
raise ValueError("Expected value after operator")
value_token = self.tokens[self.position]
if value_token["type"] != "VALUE":
raise ValueError(f"Expected value, got {value_token['value']}")
value = self._convert_value(value_token["value"])
self.position += 1
if op_token["type"] == "EQ":
return column == value
elif op_token["type"] == "NE":
return column != value
elif op_token["type"] == "LT":
return column < value
elif op_token["type"] == "GT":
return column > value
elif op_token["type"] == "LTE":
return column <= value
elif op_token["type"] == "GTE":
return column >= value
elif op_token["type"] == "CONTAINS":
return column.like(f"%{value}%")
elif op_token["type"] == "STARTS_WITH":
return column.like(f"{value}%")
elif op_token["type"] == "ENDS_WITH":
return column.like(f"%{value}")
elif op_token["type"] == "ICONTAINS":
return column.ilike(f"%{value}%")
elif op_token["type"] == "ISTARTS_WITH":
return column.ilike(f"{value}%")
elif op_token["type"] == "IENDS_WITH":
return column.ilike(f"%{value}")
else:
raise ValueError(f"Unknown operator: {op_token['type']}")
def _current_token_is(self, token_type: str) -> bool:
"""Check if current token is of given type."""
return (
self.position < len(self.tokens)
and self.tokens[self.position]["type"] == token_type
)
def _convert_value(self, value: str) -> Any:
"""Convert string value to appropriate Python type."""
try:
return int(value)
except ValueError:
pass
try:
return float(value)
except ValueError:
pass
return value
def _parse_excel_filter(filter_expr: str, table: Table, table_name: str) -> Any:
"""
Parse Excel-like filter expressions into SQLAlchemy conditions with robust handling.
Supports parentheses, quoted column names, and proper precedence.
Supported operators:
- Comparison: =, !=, <>, <, >, <=, >=
- Text (case-sensitive): CONTAINS, STARTS WITH, ENDS WITH
- Text (case-insensitive): ICONTAINS, ISTARTS WITH, IENDS WITH
- Null checks: IS NULL, IS NOT NULL
- Logical: AND, OR
- Parentheses for grouping: (, )
Examples:
- name = 'John'
- age > 25 AND status = 'Active'
- description ICONTAINS 'important'
- title ISTARTS WITH 'mr'
"""
if not filter_expr or filter_expr.strip() == "":
return None
try:
tokenizer = FilterTokenizer(filter_expr)
tokens = tokenizer.tokenize()
if not tokens:
return None
parser = FilterParser(tokens, table, table_name)
return parser.parse()
    except Exception as e:
        raise ValueError(
            f"Error parsing filter expression '{filter_expr}': {str(e)}"
        ) from e
def _parse_sort_column(sort_item: str, available_columns: set) -> tuple:
"""
Parse a sort column specification, handling quoted names and directions.
Supports double quotes, square brackets, and backticks for column names.
Returns (column_name, direction) tuple.
Examples:
- "Last Name" DESC
- [Order Date] ASC
- `Employee ID` ASC
"""
sort_item = sort_item.strip()
if not sort_item:
return (None, None)
direction = "ASC"
col_name = sort_item
upper_item = sort_item.upper()
if upper_item.endswith(" DESC"):
direction = "DESC"
col_name = sort_item[:-5].strip()
elif upper_item.endswith(" ASC"):
direction = "ASC"
col_name = sort_item[:-4].strip()
original_col_name = col_name
if col_name.startswith('"') and col_name.endswith('"'):
col_name = col_name[1:-1].replace('""', '"')
elif col_name.startswith("[") and col_name.endswith("]"):
col_name = col_name[1:-1]
elif col_name.startswith("`") and col_name.endswith("`"):
col_name = col_name[1:-1].replace("``", "`")
if col_name not in available_columns:
raise ValueError(
f"Column '{col_name}' in 'Sort By' not found in table. Original sort specification: '{original_col_name} {direction}'"
)
return (col_name, direction)
def extract_table(
table_name: Str = None,
filter_expression: Str = None,
database_name: Str = None,
host: Str = None,
port: Int = None,
username: Str = None,
password: SecretStr = None,
options: Dict = None,
) -> DataFrame:
brick_display_name = "Extract Table"
df = None
options = options or {}
verbose = options.get("verbose", True)
db_type = options.get("db_type", "SQLite")
df_type = options.get("df_type", "Pandas")
env_prefix = options.get("env_prefix", "")
op_table_name = options.get("table_name", "")
columns_to_extract = options.get("columns_to_extract", [])
apply_limit = options.get("apply_limit", False)
limit_value = options.get("limit_value", 1000)
sort_by = options.get("sort_by", [])
op_filter_expression = options.get("filter_expression", "")
table_name = _coalesce(table_name, op_table_name)
filter_expression = _coalesce(filter_expression, op_filter_expression)
if not table_name:
raise ValueError("Table name is required")
env_prefix = env_prefix + "_" if env_prefix.strip() else ""
try:
database_name = _coalesce(database_name, getenv(f"{env_prefix}DB_NAME"))
if db_type != "SQLite":
host = _coalesce(host, getenv(f"{env_prefix}DB_HOST"))
port = _coalesce(port, getenv(f"{env_prefix}DB_PORT"))
username = _coalesce(username, getenv(f"{env_prefix}DB_USERNAME"))
password = _coalesce(password, getenv(f"{env_prefix}DB_PASSWORD"))
if not database_name or (
db_type != "SQLite"
and (not host or not port or (not username) or (not password))
):
raise ValueError("Missing database configuration")
if hasattr(password, "get_secret_value"):
password = password.get_secret_value()
else:
password = str(password) if password else None
if port:
port = int(port)
if not 1 <= port <= 65535:
raise ValueError(f"Port must be between 1 and 65535, got {port}")
except Exception as e:
verbose and logger.error(
f"[{brick_display_name}] Configuration error encountered"
)
raise
if verbose:
logger.info(
f"[{brick_display_name}] Extracting from table '{table_name}' in {db_type} database."
)
driver_map = {
"Postgres": "postgresql+psycopg",
"MySQL": "mysql+pymysql",
"MariaDB": "mariadb+pymysql",
"Oracle Database": "oracle+oracledb",
"Microsoft SQL Server": "mssql+pymssql",
"SQLite": "sqlite",
}
url = (
f"{driver_map[db_type]}:///{database_name}"
if db_type == "SQLite"
else f"{driver_map[db_type]}://{username}:{password}@{host}:{port}/{database_name}"
)
try:
engine = sa.create_engine(url)
with engine.connect() as conn:
metadata = MetaData()
table = Table(table_name, metadata, autoload_with=engine)
available_columns = {c.name for c in table.columns}
if columns_to_extract:
missing_columns = []
for col in columns_to_extract:
clean_col = col.strip()
if clean_col.startswith('"') and clean_col.endswith('"'):
clean_col = clean_col[1:-1].replace('""', '"')
elif clean_col.startswith("[") and clean_col.endswith("]"):
clean_col = clean_col[1:-1]
elif clean_col.startswith("`") and clean_col.endswith("`"):
clean_col = clean_col[1:-1].replace("``", "`")
if clean_col not in available_columns:
missing_columns.append(col)
if missing_columns:
error_msg = f"The following columns were not found in table '{table_name}': {', '.join(missing_columns)}."
verbose and logger.info(f"[{brick_display_name}] {error_msg}")
raise ValueError(error_msg)
column_list = []
for col in columns_to_extract:
clean_col = col.strip()
if clean_col.startswith('"') and clean_col.endswith('"'):
clean_col = clean_col[1:-1].replace('""', '"')
elif clean_col.startswith("[") and clean_col.endswith("]"):
clean_col = clean_col[1:-1]
elif clean_col.startswith("`") and clean_col.endswith("`"):
clean_col = clean_col[1:-1].replace("``", "`")
column_list.append(table.c[clean_col])
query = select(*column_list)
else:
query = select(table)
if filter_expression:
try:
filter_condition = _parse_excel_filter(
filter_expression, table, table_name
)
if filter_condition is not None:
query = query.where(filter_condition)
verbose and logger.info(
f"[{brick_display_name}] Filter applied: {filter_expression}"
)
except Exception as e:
verbose and logger.warning(
f"[{brick_display_name}] Failed to apply filter: {str(e)}"
)
raise
if sort_by:
order_by_clauses = []
for item in sort_by:
try:
(col_name, direction) = _parse_sort_column(
item, available_columns
)
if col_name:
order_by_clauses.append(
desc(table.c[col_name])
if direction == "DESC"
else asc(table.c[col_name])
)
verbose and logger.info(
f"[{brick_display_name}] Sorting by '{col_name}' ({direction})"
)
except Exception as e:
verbose and logger.warning(
f"[{brick_display_name}] Error parsing sort column '{item}': {str(e)}"
)
raise
if order_by_clauses:
query = query.order_by(*order_by_clauses)
if apply_limit and limit_value > 0:
query = query.limit(limit_value)
verbose and logger.info(
f"[{brick_display_name}] Limiting to {limit_value} rows"
)
if df_type.lower() == "pandas":
df = pd.read_sql(query, conn)
elif df_type.lower() == "polars":
compiled_query = query.compile(compile_kwargs={"literal_binds": True})
df = pl.read_database(str(compiled_query), connection=conn)
else:
raise ValueError("df_type must be 'Pandas' or 'Polars'")
engine.dispose()
verbose and logger.info(
f"[{brick_display_name}] Table extracted successfully. Rows returned: {(len(df) if df is not None else 0)}"
)
except sa.exc.NoSuchTableError:
verbose and logger.error(
f"[{brick_display_name}] Table '{table_name}' not found in the database."
)
raise
except Exception as e:
verbose and logger.error(
f"[{brick_display_name}] Error during table extraction: {str(e)}"
)
raise
return df
Brick Info
Required Python packages:
- cryptography
- sqlalchemy
- psycopg[binary]
- pymysql
- pymssql
- pandas
- polars
- oracledb