Query Database

Extract information from database tables using SQL queries.

Processing

Extracts information from database tables by executing SQL queries against various database systems including PostgreSQL, MySQL, MariaDB, Oracle Database, Microsoft SQL Server, and SQLite. The function supports both Pandas and Polars DataFrame outputs and can retrieve database connection parameters from environment variables or direct inputs.

Inputs

sql text
The SQL query to execute against the database
database name (optional)
Name of the database to connect to (for SQLite, the path to the database file). If not provided, the DB_NAME environment variable is used
host (optional)
Database server hostname or IP address (not required for SQLite databases). If not provided, the DB_HOST environment variable is used
port (optional)
Database server port number; must be between 1 and 65535 (not required for SQLite databases). If not provided, the DB_PORT environment variable is used
username (optional)
Database username for authentication (not required for SQLite databases). If not provided, the DB_USERNAME environment variable is used
password (optional)
Database password for authentication (not required for SQLite databases). If not provided, the DB_PASSWORD environment variable is used
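
Each input follows the same rule: an explicit value wins, otherwise the matching environment variable is read. Because environment values are always strings, the port also has to be converted and range-checked. The sketch below illustrates that rule; `resolve_setting` and `validate_port` are hypothetical helper names, not functions exported by the brick.

```python
import os


def resolve_setting(value, env_name):
    """Return the explicit input if given, else the named environment variable."""
    # An explicit brick input always wins over the environment.
    if value is not None:
        return value
    return os.environ.get(env_name)  # None if the variable is not set


def validate_port(port):
    """Convert port to int and enforce the documented 1-65535 range."""
    port = int(port)  # raises ValueError for non-numeric strings such as "abc"
    if not 1 <= port <= 65535:
        raise ValueError(f"Port must be between 1 and 65535, got {port}")
    return port


os.environ["DB_PORT"] = "5432"
print(resolve_setting(None, "DB_PORT"))  # 5432 (string, from the environment)
print(resolve_setting(3306, "DB_PORT"))  # 3306 (explicit input wins)
print(validate_port(resolve_setting(None, "DB_PORT")))  # 5432 (now an int)
```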

Input Types

Input            Type
sql text         Str
database name    Str
host             Str
port             Int
username         Str
password         SecretStr

You can check the list of supported types here: Available Type Hints.

Outputs

df
DataFrame containing the query results, in Pandas or Polars format depending on the DataFrame type option

Output Types

Output    Type
df        DataFrame

You can check the list of supported types here: Available Type Hints.

Options

The Query Database brick exposes the following configurable options:

Database type
Selects the type of database to connect to. Available options are Postgres, MySQL, MariaDB, Oracle Database, Microsoft SQL Server, and SQLite (defaults to SQLite if not set)
DataFrame type
Determines the output DataFrame format. Choose between Pandas or Polars (defaults to Pandas if not set)
Env variables prefix
Adds a custom prefix to the environment variable names the brick reads, to avoid conflicts with other bricks or routines. An underscore is automatically inserted between the prefix and the variable name (e.g., prefix MYAPP makes the brick read MYAPP_DB_NAME instead of DB_NAME)
Verbose
Enables detailed logging during query execution for debugging and monitoring (enabled by default)
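
In the code, these options arrive as a dictionary with the keys `verbose`, `db_type`, `df_type`, and `env_prefix`; the mapping from option labels to keys is inferred from those names. `read_options` below is a hypothetical helper that reproduces the defaults and the prefix normalization the function applies, as a sketch for reasoning about what an empty or partial options dict means.

```python
def read_options(options=None):
    """Apply the defaults used by query_db to a (possibly partial) options dict."""
    options = options or {}
    env_prefix = options.get("env_prefix", "")
    return {
        "verbose": options.get("verbose", True),
        "db_type": options.get("db_type", "SQLite"),
        "df_type": options.get("df_type", "Pandas"),
        # A non-blank prefix gets a trailing underscore; blank means no prefix.
        "env_prefix": env_prefix + "_" if env_prefix.strip() else "",
    }


print(read_options())
# {'verbose': True, 'db_type': 'SQLite', 'df_type': 'Pandas', 'env_prefix': ''}
print(read_options({"db_type": "Postgres", "df_type": "Polars", "env_prefix": "MYAPP"}))
# {'verbose': True, 'db_type': 'Postgres', 'df_type': 'Polars', 'env_prefix': 'MYAPP_'}
```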

Environment Variables Configuration

If you don't provide database connection parameters as brick inputs, you can configure them using environment variables.

Step 1 — Create an environment file

Create an environment file in the flow project folder. You can name it anything you like (e.g. .env, db.env, or mysettings.env).

Example: db.env

# Example database environment configuration
DB_NAME=mydatabase
DB_HOST=localhost
DB_PORT=5432
DB_USERNAME=myuser
DB_PASSWORD=mypassword

💡 For SQLite, only DB_NAME is required (e.g., DB_NAME=mydb.sqlite).
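
For SQLite, DB_NAME is treated as a file path and placed after `sqlite:///` in the connection URL. A relative name is therefore resolved against the current working directory of the process running the flow (assumed here to be where you expect the file to live), while an absolute path produces the four-slash form SQLAlchemy uses for absolute SQLite paths. `sqlite_url` is a hypothetical helper for illustration.

```python
def sqlite_url(database_name: str) -> str:
    """Return the SQLAlchemy SQLite URL for a database file path."""
    # "sqlite:///" + path: a relative path stays relative, an absolute path
    # contributes its own leading "/", giving the four-slash form.
    return f"sqlite:///{database_name}"


print(sqlite_url("mydb.sqlite"))        # sqlite:///mydb.sqlite
print(sqlite_url("/data/mydb.sqlite"))  # sqlite:////data/mydb.sqlite
```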

Step 2 — Reference the env file in CODED FLOWS

Inside the CODED FLOWS app, use the LOAD ENV control brick to load the file you created.

  • Specify your env filename (for example, db.env, or whichever name you chose).
  • All defined variables become available to any other bricks in the connected graph flow.
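
Conceptually, loading an env file means parsing its KEY=VALUE lines into the process environment, where bricks later read them with `getenv`. The sketch below is a rough stand-in for that step, not the LOAD ENV brick's actual implementation: `load_env_text` is hypothetical and handles only plain KEY=VALUE lines and `#` comments (no quoting, `export`, or variable interpolation, whose handling by LOAD ENV is not documented here).

```python
import os


def load_env_text(text: str) -> dict:
    """Parse KEY=VALUE lines, skipping blanks and # comments, into os.environ."""
    loaded = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if not sep:
            continue  # not a KEY=VALUE line
        loaded[key.strip()] = value.strip()
    os.environ.update(loaded)
    return loaded


env_text = """
# Example database environment configuration
DB_NAME=mydatabase
DB_HOST=localhost
DB_PORT=5432
"""
load_env_text(env_text)
print(os.getenv("DB_HOST"), os.getenv("DB_PORT"))  # localhost 5432
```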

Step 3 — Use with prefix (optional)

If you set the "Env variables prefix" option on the Query Database brick, the brick prepends the prefix and an underscore to each environment variable name it looks up, so your env file must define the prefixed names.

Example with prefix MYAPP:

MYAPP_DB_NAME=mydatabase
MYAPP_DB_HOST=localhost
MYAPP_DB_PORT=5432
MYAPP_DB_USERNAME=myuser
MYAPP_DB_PASSWORD=mypassword

Bricks will then look for MYAPP_DB_NAME, MYAPP_DB_HOST, etc. instead of the default variable names.
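
The prefix rule lets several configurations coexist in one environment: unprefixed and prefixed variables are simply different names. In this sketch, `env_name` is a hypothetical helper that applies the same joining rule as the brick (a non-blank prefix plus an underscore).

```python
import os


def env_name(name: str, prefix: str = "") -> str:
    """Return the variable name a brick reads, given its env prefix option."""
    # Same rule as the brick: a non-blank prefix is joined with an underscore.
    return f"{prefix}_{name}" if prefix.strip() else name


os.environ["DB_NAME"] = "default_db"
os.environ["MYAPP_DB_NAME"] = "mydatabase"

print(env_name("DB_NAME", "MYAPP"))             # MYAPP_DB_NAME
print(os.getenv(env_name("DB_NAME", "MYAPP")))  # mydatabase
print(os.getenv(env_name("DB_NAME")))           # default_db
```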

import logging
import sqlalchemy as sa
from sqlalchemy import text
import pandas as pd
import polars as pl
from os import getenv
from coded_flows.types import Str, SecretStr, Int, DataFrame

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _coalesce(*values):
    """Return the first value that is not None, or None if all are None."""
    return next((v for v in values if v is not None), None)


def query_db(
    sql_text: Str,
    database_name: Str = None,
    host: Str = None,
    port: Int = None,
    username: Str = None,
    password: SecretStr = None,
    options=None,
) -> DataFrame:
    brick_display_name = "Query Database"
    df = None
    options = options or {}
    verbose = options.get("verbose", True)
    db_type = options.get("db_type", "SQLite")
    df_type = options.get("df_type", "Pandas")
    env_prefix = options.get("env_prefix", "")
    env_prefix = env_prefix + "_" if env_prefix.strip() else ""
    # Fall back to (optionally prefixed) environment variables for any input not passed directly.
    database_name = _coalesce(database_name, getenv(f"{env_prefix}DB_NAME"))
    if db_type != "SQLite":
        host = _coalesce(host, getenv(f"{env_prefix}DB_HOST"))
        port = _coalesce(port, getenv(f"{env_prefix}DB_PORT"))
        username = _coalesce(username, getenv(f"{env_prefix}DB_USERNAME"))
        password = _coalesce(password, getenv(f"{env_prefix}DB_PASSWORD"))
    if not database_name or (
        db_type != "SQLite"
        and (not host or not port or not username or not password)
    ):
        message = "Either the `.env` file is not defined, not referenced in the project folder, or one or more DB configuration inputs are missing."
        verbose and logger.error(f"[{brick_display_name}] {message}")
        raise ValueError(message)
    # Unwrap SecretStr values; leave None untouched so SQLite does not get a spurious "None" password.
    if password is not None:
        password = (
            password.get_secret_value()
            if hasattr(password, "get_secret_value")
            else str(password)
        )
    try:
        if port:
            port = int(port)
    except (ValueError, TypeError):
        verbose and logger.error(
            f"[{brick_display_name}] Port must be an integer, got {port!r}"
        )
        raise ValueError(f"Port must be an integer, got {port!r}")
    if port and (not 1 <= port <= 65535):
        verbose and logger.error(
            f"[{brick_display_name}] Port must be between 1 and 65535, got {port}"
        )
        raise ValueError(f"Port must be between 1 and 65535, got {port}")
    if verbose:
        logger.info(f"[{brick_display_name}] Starting query to {db_type} database.")
    driver_map = {
        "Postgres": "postgresql+psycopg",
        "MySQL": "mysql+pymysql",
        "MariaDB": "mariadb+pymysql",
        "Oracle Database": "oracle+oracledb",
        "Microsoft SQL Server": "mssql+pymssql",
        "SQLite": "sqlite",
    }
    if db_type == "SQLite":
        if host or username or password:
            verbose and logger.warning(
                f"[{brick_display_name}] Ignoring host, username, password for SQLite."
            )
        url = sa.engine.URL.create(driver_map[db_type], database=database_name)
    else:
        if not all([host, port, username, password, database_name]):
            verbose and logger.error(
                f"[{brick_display_name}] Missing required connection parameters for {db_type}."
            )
            raise ValueError(
                f"[{brick_display_name}] Missing required connection parameters for {db_type}."
            )
        # URL.create escapes special characters (e.g. "@", "/", ":") in the credentials.
        url = sa.engine.URL.create(
            driver_map[db_type],
            username=username,
            password=password,
            host=host,
            port=port,
            database=database_name,
        )
    verbose and logger.info(f"[{brick_display_name}] Connection URL constructed.")
    try:
        engine = sa.create_engine(url)
        verbose and logger.info(f"[{brick_display_name}] Engine created.")
        try:
            with engine.connect() as conn:
                if df_type.lower() == "pandas":
                    df = pd.read_sql(text(sql_text), conn)
                elif df_type.lower() == "polars":
                    df = pl.read_database(text(sql_text), connection=conn)
                else:
                    raise ValueError("df_type must be 'pandas' or 'polars'")
        finally:
            # Release pooled connections even when the query fails.
            engine.dispose()
        verbose and logger.info(
            f"[{brick_display_name}] Query executed successfully. Rows returned: {(len(df) if df is not None else 0)}"
        )
    except Exception as e:
        verbose and logger.error(f"[{brick_display_name}] Error during query: {str(e)}")
        raise
    return df
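
When a connection URL is assembled by string formatting, a password containing characters such as `@`, `/`, or `:` breaks URL parsing. Such values need to be percent-encoded first, for example with `urllib.parse.quote_plus`, or the URL built with `sqlalchemy.engine.URL.create`, which handles the escaping. The sketch below uses the standard library's `urlsplit` as a stand-in parser; SQLAlchemy has its own URL parser, so the exact failure mode there may differ, and the credentials shown are placeholders.

```python
from urllib.parse import quote_plus, urlsplit

password = "p@ss/word"

# Unescaped: "@" and "/" inside the password break the authority section.
naive = f"postgresql+psycopg://myuser:{password}@localhost:5432/mydatabase"
# Escaped: the password is percent-encoded before being placed in the URL.
safe = f"postgresql+psycopg://myuser:{quote_plus(password)}@localhost:5432/mydatabase"

print(urlsplit(naive).hostname)  # ss (wrong host)
print(urlsplit(safe).hostname, urlsplit(safe).port)  # localhost 5432
print(safe)  # postgresql+psycopg://myuser:p%40ss%2Fword@localhost:5432/mydatabase
```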

Brick Info

version v0.1.0
python 3.10, 3.11, 3.12, 3.13
requirements
  • cryptography
  • sqlalchemy
  • psycopg[binary]
  • pymysql
  • pymssql
  • pandas
  • polars
  • oracledb