Query Database
Extract information from database tables using SQL queries.
Processing
Extracts information from database tables by executing SQL queries against various database systems including PostgreSQL, MySQL, MariaDB, Oracle Database, Microsoft SQL Server, and SQLite. The function supports both Pandas and Polars DataFrame outputs and can retrieve database connection parameters from environment variables or direct inputs.
Inputs
- sql text
- The SQL query to execute against the database
- database name (optional)
- Name of the database to connect to. If not provided, the DB_NAME environment variable is used
- host (optional)
- Database server hostname or IP address (not required for SQLite databases). If not provided, the DB_HOST environment variable is used
- port (optional)
- Database server port number. Must be between 1 and 65535 (not required for SQLite databases). If not provided, the DB_PORT environment variable is used
- username (optional)
- Database username for authentication (not required for SQLite databases). If not provided, the DB_USERNAME environment variable is used
- password (optional)
- Database password for authentication (not required for SQLite databases). If not provided, the DB_PASSWORD environment variable is used
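The fallback rule in the list above (a direct input wins, otherwise the environment variable is used) can be sketched as follows. This is a minimal illustration rather than the brick's own code: `resolve_param` and `validate_port` are hypothetical helper names, while the variable names `DB_HOST` and `DB_PORT` and the 1 to 65535 range come from this page.

```python
import os


def resolve_param(value, env_name):
    """Return the direct input if given, else the environment variable (or None)."""
    return value if value is not None else os.getenv(env_name)


def validate_port(port):
    """Convert a port to int and check the documented 1 to 65535 range."""
    try:
        port = int(port)
    except (TypeError, ValueError):
        raise ValueError(f"Port must be an integer, got {port!r}") from None
    if not 1 <= port <= 65535:
        raise ValueError(f"Port must be between 1 and 65535, got {port}")
    return port


# Environment values are always strings, so the port needs converting.
os.environ["DB_HOST"] = "localhost"
os.environ["DB_PORT"] = "5432"

host = resolve_param(None, "DB_HOST")  # falls back to the environment
port = validate_port(resolve_param(None, "DB_PORT"))
explicit_host = resolve_param("db.internal", "DB_HOST")  # direct input wins
```

Because values read from the environment are strings, the port is converted before the range check.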
Inputs Types
Input | Types |
---|---|
sql text | Str |
database name | Str |
host | Str |
port | Int |
username | Str |
password | SecretStr |
You can check the list of supported types here: Available Type Hints.
Outputs
- df
- DataFrame containing the query results in either Pandas or Polars format depending on configuration
Outputs Types
Output | Types |
---|---|
df | DataFrame |
You can check the list of supported types here: Available Type Hints.
Options
The Query Database brick contains some changeable options:
- Database type
- Selects the type of database to connect to. Available options are Postgres, MySQL, MariaDB, Oracle Database, Microsoft SQL Server, and SQLite
- DataFrame type
- Determines the output DataFrame format. Choose between Pandas or Polars
- Env variables prefix
- Adds a custom prefix to environment variable names to avoid conflicts with other bricks or routines. An underscore will be automatically added between the prefix and the original environment variable names
- Verbose
- Enables detailed logging output during query execution for debugging and monitoring purposes
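In the brick's source code, these options are read from a plain dictionary. The sketch below mirrors the keys and defaults that the source uses (`verbose`, `db_type`, `df_type`, `env_prefix`); `read_options` is a hypothetical helper written only for illustration.

```python
def read_options(options=None):
    """Apply the defaults the brick's source uses for each option key."""
    options = options or {}
    prefix = options.get("env_prefix", "")
    return {
        "verbose": options.get("verbose", True),
        "db_type": options.get("db_type", "SQLite"),
        "df_type": options.get("df_type", "Pandas"),
        # A non-blank prefix gets a trailing underscore; a blank one is dropped.
        "env_prefix": (prefix + "_") if prefix.strip() else "",
    }


defaults = read_options()
custom = read_options(
    {"db_type": "Postgres", "df_type": "Polars", "env_prefix": "MYAPP"}
)
```

With no options supplied, the sketch falls back to SQLite, Pandas output, verbose logging, and no prefix.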
Environment Variables Configuration
If you don't provide database connection parameters as brick inputs, you can configure them using environment variables.
Step 1 — Create an environment file
Create an environment file in the flow project folder. You can name it anything you like (e.g. .env, db.env, or mysettings.env).
Example: db.env
# Example database environment configuration
DB_NAME=mydatabase
DB_HOST=localhost
DB_PORT=5432
DB_USERNAME=myuser
DB_PASSWORD=mypassword
💡 For SQLite, only DB_NAME is required (e.g., DB_NAME=mydb.sqlite).
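Before running a flow, it can help to check which variables are still missing for a given database type. The sketch below assumes the variable names from the example file and the SQLite rule stated above; `missing_db_vars` is a hypothetical helper and is not part of CODED FLOWS.

```python
import os

# Variable names taken from the example environment file on this page.
REQUIRED_ALL = ["DB_NAME", "DB_HOST", "DB_PORT", "DB_USERNAME", "DB_PASSWORD"]


def missing_db_vars(db_type, environ=None):
    """List the required variables that are absent or empty for this database type."""
    environ = os.environ if environ is None else environ
    required = ["DB_NAME"] if db_type == "SQLite" else REQUIRED_ALL
    return [name for name in required if not environ.get(name)]


# A mapping stands in for the loaded environment so the example is repeatable.
example_env = {"DB_NAME": "mydb.sqlite"}
sqlite_missing = missing_db_vars("SQLite", example_env)
postgres_missing = missing_db_vars("Postgres", example_env)
```

Here `sqlite_missing` is empty, while `postgres_missing` lists the four server-related variables.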
Step 2 — Reference the env file in CODED FLOWS
Inside the CODED FLOWS app, use the LOAD ENV control brick to load the file you created.
- Specify your env filename (db.env, for example, or whichever name you chose).
- All defined variables become available to any other bricks in the connected graph flow.
Step 3 — Use with prefix (optional)
If you enable the "Env variables prefix" option in the LOAD ENV brick, the system will automatically prepend the prefix and an underscore to each variable.
Example with prefix MYAPP:
MYAPP_DB_NAME=mydatabase
MYAPP_DB_HOST=localhost
MYAPP_DB_PORT=5432
MYAPP_DB_USERNAME=myuser
MYAPP_DB_PASSWORD=mypassword
Bricks will then look for MYAPP_DB_NAME, MYAPP_DB_HOST, etc. instead of the default variable names.
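The prefixed lookup can be sketched in a few lines. This follows the rule described above (prefix, underscore, then the original name); `lookup` is a hypothetical helper, and a plain dictionary stands in for the loaded environment.

```python
def lookup(name, prefix="", environ=None):
    """Read a variable, prepending '<prefix>_' when a non-blank prefix is set."""
    environ = {} if environ is None else environ
    full_prefix = (prefix + "_") if prefix.strip() else ""
    return environ.get(f"{full_prefix}{name}")


loaded = {
    "MYAPP_DB_NAME": "mydatabase",
    "MYAPP_DB_HOST": "localhost",
}

with_prefix = lookup("DB_NAME", prefix="MYAPP", environ=loaded)
without_prefix = lookup("DB_NAME", environ=loaded)  # unprefixed name is not defined
```

With the prefix set, only the `MYAPP_`-prefixed names are consulted, so the unprefixed lookup finds nothing in this example.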
import logging
from os import getenv

import pandas as pd
import polars as pl
import sqlalchemy as sa
from sqlalchemy import text

from coded_flows.types import Str, SecretStr, Int, DataFrame

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _coalesce(*values):
    """Return the first value that is not None, or None if there is none."""
    return next((v for v in values if v is not None), None)
def query_db(
    sql_text: Str,
    database_name: Str = None,
    host: Str = None,
    port: Int = None,
    username: Str = None,
    password: SecretStr = None,
    options=None,
) -> DataFrame:
    brick_display_name = "Query Database"
    df = None
    options = options or {}
    verbose = options.get("verbose", True)
    db_type = options.get("db_type", "SQLite")
    df_type = options.get("df_type", "Pandas")
    env_prefix = options.get("env_prefix", "")
    # A non-blank prefix is joined to the variable names with an underscore.
    env_prefix = env_prefix + "_" if env_prefix.strip() else ""
    try:
        # Direct inputs take precedence; environment variables are the fallback.
        database_name = _coalesce(database_name, getenv(f"{env_prefix}DB_NAME"))
        if db_type != "SQLite":
            host = _coalesce(host, getenv(f"{env_prefix}DB_HOST"))
            port = _coalesce(port, getenv(f"{env_prefix}DB_PORT"))
            username = _coalesce(username, getenv(f"{env_prefix}DB_USERNAME"))
            password = _coalesce(password, getenv(f"{env_prefix}DB_PASSWORD"))
        if not database_name or (
            db_type != "SQLite"
            and (not host or not port or (not username) or (not password))
        ):
            verbose and logger.error(
                f"[{brick_display_name}] Either the `env` file is not defined, not referenced in the project folder, or one or more DB configuration inputs are missing."
            )
            raise ValueError(
                "Either the `.env` file is not defined, not referenced in the project folder, or one or more DB configuration inputs are missing."
            )
        # Unwrap SecretStr values; a missing password (SQLite) stays None
        # instead of becoming the string "None".
        if password is not None:
            try:
                if hasattr(password, "get_secret_value"):
                    password = password.get_secret_value()
                else:
                    password = str(password)
            except Exception:
                password = str(password)
    except Exception as e:
        verbose and logger.error(
            f"[{brick_display_name}] Either the `env` file is not defined, not referenced in the project folder, or one or more DB configuration inputs are missing."
        )
        raise ValueError(
            "Either the `.env` file is not defined, not referenced in the project folder, or one or more DB configuration inputs are missing."
        )
    # Ports read from the environment are strings, so convert before validating.
    try:
        if port:
            port = int(port)
    except (ValueError, TypeError):
        verbose and logger.error(
            f"[{brick_display_name}] Port must be an integer, got {port!r}"
        )
        raise ValueError(f"Port must be an integer, got {port!r}")
    if port and (not 1 <= port <= 65535):
        verbose and logger.error(
            f"[{brick_display_name}] Port must be between 1 and 65535, got {port}"
        )
        raise ValueError(f"Port must be between 1 and 65535, got {port}")
    if verbose:
        logger.info(f"[{brick_display_name}] Starting query to {db_type} database.")
    # SQLAlchemy dialect+driver string for each supported database type.
    driver_map = {
        "Postgres": "postgresql+psycopg",
        "MySQL": "mysql+pymysql",
        "MariaDB": "mariadb+pymysql",
        "Oracle Database": "oracle+oracledb",
        "Microsoft SQL Server": "mssql+pymssql",
        "SQLite": "sqlite",
    }
    if db_type == "SQLite":
        # SQLite is file based, so only the database name (file path) is used.
        if host or username or password:
            verbose and logger.warning(
                f"[{brick_display_name}] Ignoring host, username, password for SQLite."
            )
        url = f"{driver_map[db_type]}:///{database_name}"
    else:
        if not all([host, port, username, password, database_name]):
            verbose and logger.error(
                f"[{brick_display_name}] Missing required connection parameters for {db_type}."
            )
            raise ValueError(
                f"[{brick_display_name}] Missing required connection parameters for {db_type}."
            )
        url = f"{driver_map[db_type]}://{username}:{password}@{host}:{port}/{database_name}"
    verbose and logger.info(f"[{brick_display_name}] Connection URL constructed.")
    try:
        engine = sa.create_engine(url)
        verbose and logger.info(f"[{brick_display_name}] Engine created.")
        with engine.connect() as conn:
            # Load the result set into the requested DataFrame flavour.
            if df_type.lower() == "pandas":
                df = pd.read_sql(text(sql_text), conn)
            elif df_type.lower() == "polars":
                df = pl.read_database(text(sql_text), connection=conn)
            else:
                raise ValueError("df_type must be 'pandas' or 'polars'")
        engine.dispose()
        verbose and logger.info(
            f"[{brick_display_name}] Query executed successfully. Rows returned: {(len(df) if df is not None else 0)}"
        )
    except Exception as e:
        verbose and logger.error(f"[{brick_display_name}] Error during query: {str(e)}")
        raise
    return df
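The source above places the username and password directly into the connection URL. In that URL form, credentials containing characters such as `@` or `/` need to be percent-encoded. The sketch below shows one way to do this with the standard library; it is an illustration under that assumption, not the brick's actual behavior. `build_url` is a hypothetical helper, and the driver strings are copied from `driver_map`.

```python
from urllib.parse import quote_plus

# Dialect+driver strings copied from the brick's driver_map.
DRIVER_MAP = {
    "Postgres": "postgresql+psycopg",
    "MySQL": "mysql+pymysql",
    "MariaDB": "mariadb+pymysql",
    "Oracle Database": "oracle+oracledb",
    "Microsoft SQL Server": "mssql+pymssql",
    "SQLite": "sqlite",
}


def build_url(db_type, database_name, host=None, port=None,
              username=None, password=None):
    """Build a connection URL string, percent-encoding the credentials."""
    driver = DRIVER_MAP[db_type]
    if db_type == "SQLite":
        # SQLite only needs the database file path.
        return f"{driver}:///{database_name}"
    user = quote_plus(username)
    secret = quote_plus(password)
    return f"{driver}://{user}:{secret}@{host}:{port}/{database_name}"


sqlite_url = build_url("SQLite", "mydb.sqlite")
postgres_url = build_url(
    "Postgres", "mydatabase", "localhost", 5432, "myuser", "p@ss/word"
)
```

In this example the `@` and `/` in the password are encoded as `%40` and `%2F`, so they are not mistaken for URL delimiters.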
Brick Info
- cryptography
- sqlalchemy
- psycopg[binary]
- pymysql
- pymssql
- pandas
- polars
- oracledb