Execute SQL Scripts

Run one-or-many SQL statements (CRUD) and return results + status.

Execute SQL Scripts


Processing

Executes one or multiple SQL statements (CRUD operations)and returns the results along with execution status. The function supports SELECT queries that return data as DataFrames, as well as INSERT/UPDATE/DELETE operations.

Inputs

sql text
The SQL statements to execute, which can include multiple statements separated by semicolons
database name (optional)
Name of the database to connect to, can be provided via environment variables
host (optional)
Database server hostname or IP address, can be provided via environment variables
port (optional)
Database server port number, can be provided via environment variables
username (optional)
Database username for authentication, can be provided via environment variables
password (optional)
Database password for authentication, can be provided via environment variables

Inputs Types

Input Types
sql text Str
database name Str
host Str
port Int
username Str
password SecretStr

You can check the list of supported types here: Available Type Hints.

Outputs

result df
DataFrame containing the results of the last SELECT statement executed, empty DataFrame if no SELECT statements
status
Boolean indicating whether the SQL execution was successful
error_message
String containing error details if execution failed, empty string if successful

Outputs Types

Output Types
result df DataFrame
status Bool
error message Str

You can check the list of supported types here: Available Type Hints.

Options

The Execute SQL Scripts brick contains some changeable options:

Database type
Selects the type of database to connect to (Postgres, MySQL, MariaDB, Oracle Database, Microsoft SQL Server, SQLite)
DataFrame type
Chooses the DataFrame library for return results (Pandas or Polars)
Env variables prefix
Prefix for environment variable names when resolving database credentials (e.g., "DEV" creates DEV_DB_NAME, DEV_DB_HOST, etc.)
Verbose
Enables or disables detailed logging output during execution

Environment Variables Configuration

If you don't provide database connection parameters as brick inputs, you can configure them using environment variables.

Step 1 — Create an environment file

Create an environment file in the flow project folder. You can name it anything you like (e.g. .env, db.env, or mysettings.env).

Example: db.env

# Example database environment configuration
DB_NAME=mydatabase
DB_HOST=localhost
DB_PORT=5432
DB_USERNAME=myuser
DB_PASSWORD=mypassword

💡 For SQLite, only DB_NAME is required (e.g., DB_NAME=mydb.sqlite).

Step 2 — Reference the env file in CODED FLOWS

Inside the CODED FLOWS app, use the LOAD ENV control brick to load the file you created.

  • Specify your env filename (db.env for example or whichever name you chose).
  • All defined variables become available to other any bricks in the connected graph flow.

Step 3 — Use with prefix (optional)

If you enable the "Env variables prefix" option in the LOAD ENV brick, the system will automatically prepend the prefix and an underscore to each variable.

Example with prefix MYAPP:

MYAPP_DB_NAME=mydatabase
MYAPP_DB_HOST=localhost
MYAPP_DB_PORT=5432
MYAPP_DB_USERNAME=myuser
MYAPP_DB_PASSWORD=mypassword

Bricks will then look for MYAPP_DB_NAME, MYAPP_DB_HOST, etc. instead of the default variable names.

import logging
import sqlparse
import sqlalchemy as sa
from sqlalchemy import text
import pandas as pd
import polars as pl
from os import getenv
from urllib.parse import quote_plus
from coded_flows.types import Str, SecretStr, Int, DataFrame, Tuple, Bool

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _split_sql_statements(sql_text: str) -> list[str]:
    statements = sqlparse.split(sql_text)
    return [stmt.strip() for stmt in statements if stmt.strip()]


def _coalesce(*values):
    return next((v for v in values if v is not None), None)


def sql_scripts(
    sql_text: Str,
    database_name: Str = None,
    host: Str = None,
    port: Int = None,
    username: Str = None,
    password: SecretStr = None,
    options=None,
) -> Tuple[DataFrame, Bool, Str]:
    brick_display_name = "Execute SQL Scripts"
    options = options or {}
    verbose = options.get("verbose", True)
    db_type = options.get("db_type", "SQLite")
    df_type = options.get("df_type", "Pandas")
    env_prefix = options.get("env_prefix", "")
    env_prefix = env_prefix + "_" if env_prefix.strip() else ""
    result_df = pd.DataFrame() if df_type.lower() == "pandas" else pl.DataFrame()
    status = False
    error_message = ""

    def _info(msg: str):
        verbose and logger.info(f"[{brick_display_name}] {msg}")

    def _warn(msg: str):
        verbose and logger.warning(f"[{brick_display_name}] {msg}")

    def _error(msg: str):
        verbose and logger.error(f"[{brick_display_name}] {msg}")

    setup_ok = True
    pw = None
    try:
        database_name = _coalesce(database_name, getenv(f"{env_prefix}DB_NAME"))
        if db_type != "SQLite":
            host = _coalesce(host, getenv(f"{env_prefix}DB_HOST"))
            port = _coalesce(port, getenv(f"{env_prefix}DB_PORT"))
            username = _coalesce(username, getenv(f"{env_prefix}DB_USERNAME"))
            password = _coalesce(password, getenv(f"{env_prefix}DB_PASSWORD"))
        if not database_name or (
            db_type != "SQLite"
            and (not host or not port or (not username) or (not password))
        ):
            msg = "Either the env file is not defined, not referenced in the project folder, or one or more DB configuration inputs are missing."
            _error(msg)
            raise ValueError(msg)
        try:
            if hasattr(password, "get_secret_value"):
                pw = password.get_secret_value()
            else:
                pw = str(password)
        except Exception:
            pw = str(password)
        if port is not None:
            try:
                port = int(port)
            except (TypeError, ValueError):
                _error(f"Port must be an integer, got {port!r}")
                raise ValueError(f"Port must be an integer, got {port!r}")
            if not 1 <= port <= 65535:
                _error(f"Port must be between 1 and 65535, got {port}")
                raise ValueError(f"Port must be between 1 and 65535, got {port}")
    except Exception as e:
        status = False
        msg = "Either the env file is not defined, not referenced in the project folder, or one or more DB configuration inputs are missing."
        _error(msg)
        raise
    _info(f"Starting execution against {db_type} database.")
    driver_map = {
        "Postgres": "postgresql+psycopg",
        "MySQL": "mysql+pymysql",
        "MariaDB": "mariadb+pymysql",
        "Oracle Database": "oracle+oracledb",
        "Microsoft SQL Server": "mssql+pymssql",
        "SQLite": "sqlite",
    }
    url = None
    try:
        if db_type == "SQLite":
            if host or username or password:
                _warn("Ignoring host, username, password for SQLite.")
            url = f"{driver_map[db_type]}:///{database_name}"
        else:
            if not all([host, port, username, pw, database_name]):
                _error(f"Missing required connection parameters for {db_type}.")
                raise ValueError(
                    f"Missing required connection parameters for {db_type}."
                )
            username_q = quote_plus(str(username))
            password_q = quote_plus(str(pw))
            url = f"{driver_map[db_type]}://{username_q}:{password_q}@{host}:{port}/{database_name}"
        _info("Connection URL constructed.")
    except Exception as e:
        setup_ok = False
        status = False
        error_message = str(e)
        _error("Error during connection URL constructed.")
    engine = None
    transaction = None
    if setup_ok:
        try:
            engine = sa.create_engine(url, future=True)
            _info("Engine created.")
            with engine.connect() as conn:
                transaction = conn.begin()
                _info("Transaction started.")
                raw_statements = _split_sql_statements(sql_text)
                last_select_df = None
                total_rows_affected = 0
                found_select = False
                for statement in raw_statements:
                    if not statement:
                        continue
                    _info(
                        f"Executing statement: {statement[:120]}{('...' if len(statement) > 120 else '')}"
                    )
                    stmt = text(statement)
                    result = conn.execute(stmt)
                    if result.returns_rows:
                        rows = result.fetchall()
                        cols = result.keys()
                        pandas_df = pd.DataFrame(rows, columns=cols)
                        if df_type.lower() == "pandas":
                            last_select_df = pandas_df
                        elif df_type.lower() == "polars":
                            last_select_df = pl.from_pandas(pandas_df)
                        else:
                            raise ValueError("df_type must be 'Pandas' or 'Polars'")
                        _info(f"SELECT returned {len(pandas_df)} rows.")
                        rows_count = len(pandas_df)
                        total_rows_affected = rows_count
                        found_select = True
                    else:
                        rc = result.rowcount if hasattr(result, "rowcount") else None
                        if isinstance(rc, int) and rc >= 0:
                            total_rows_affected += rc
                            _info(f"Statement affected {rc} rows.")
                        else:
                            _info(
                                "Statement executed; rowcount not available from driver."
                            )
                transaction.commit()
                _info("Transaction committed.")
                if found_select:
                    result_df = last_select_df
                status = True
                error_message = ""
        except Exception as e:
            try:
                if transaction is not None:
                    transaction.rollback()
                    _info("Transaction rolled back due to error.")
            except Exception:
                _error("Failed to rollback transaction.")
            status = False
            error_message = str(e)
            _error(f"Error during execution: {error_message}")
    try:
        if engine is not None:
            engine.dispose()
            _info("Engine disposed.")
    except Exception:
        pass
    return (result_df, status, error_message)

Brick Info

version v0.1.0
python 3.10, 3.11, 3.12, 3.13
requirements
  • cryptography
  • sqlalchemy
  • psycopg[binary]
  • pymysql
  • sqlparse
  • pymssql
  • pandas
  • polars
  • oracledb