Execute SQL Scripts
Run one or more SQL statements (CRUD) and return the results and execution status.
Processing
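Executes one or more SQL statements (CRUD operations) and returns the results along with the execution status. The function supports SELECT queries, which return data as a DataFrame, as well as INSERT/UPDATE/DELETE operations. All statements run inside a single transaction. The transaction is committed only if every statement succeeds; otherwise it is rolled back.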
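The execution model works like this: one transaction covers the whole script, only the last row-returning result is kept, and any error triggers a rollback. The minimal sketch below illustrates it with Python's built-in `sqlite3` module instead of SQLAlchemy. It assumes the script has already been split into a list of statements, and it returns plain row tuples rather than a DataFrame. `run_statements` is a hypothetical helper, not part of the brick.

```python
import sqlite3


def run_statements(conn: sqlite3.Connection, statements: list[str]):
    """Hypothetical helper: run all statements in one transaction.

    Returns (rows of the last row-returning statement, status, error message),
    loosely mirroring the brick's (result df, status, error message) outputs.
    """
    last_rows: list[tuple] = []  # stays empty when no statement returns rows
    try:
        # Used as a context manager, a sqlite3 connection commits on success
        # and rolls back if an exception escapes the block.
        with conn:
            cur = conn.cursor()
            for stmt in statements:
                cur.execute(stmt)
                if cur.description is not None:  # statement produced a result set
                    last_rows = cur.fetchall()
        return last_rows, True, ""
    except sqlite3.Error as exc:
        # On failure: empty result, status False, and the driver's message.
        return [], False, str(exc)
```

If the second statement fails, the INSERT from the first statement is rolled back too. This matches the all-or-nothing behavior described above. One caveat applies to the sketch: in its default mode, `sqlite3` opens a transaction implicitly only before DML statements, so DDL such as `CREATE TABLE` is not covered by the rollback here.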
Inputs
- sql text
- The SQL statements to execute, which can include multiple statements separated by semicolons
- database name (optional)
- Name of the database to connect to (for SQLite, the path to the database file). If omitted, it is read from environment variables.
- host (optional)
- Database server hostname or IP address. If omitted, it is read from environment variables. Ignored for SQLite.
- port (optional)
- Database server port number, between 1 and 65535. If omitted, it is read from environment variables.
- username (optional)
- Database username for authentication. If omitted, it is read from environment variables. Ignored for SQLite.
- password (optional)
- Database password for authentication. If omitted, it is read from environment variables. Ignored for SQLite.
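The fallback rule is simple. A value passed as a brick input always takes precedence, and the brick reads the environment variable only when the input is left empty. The minimal sketch below shows that rule. The brick's code does the same thing inline with its `_coalesce` helper and `os.getenv`; `resolve_setting` is a hypothetical name used only for illustration.

```python
import os


def resolve_setting(explicit_value, env_name: str):
    """Hypothetical helper: an explicit input wins; otherwise fall back to the env var."""
    if explicit_value is not None:
        return explicit_value
    return os.getenv(env_name)  # None if the variable is not set


os.environ["DB_HOST"] = "db.internal"  # simulate variables loaded from an env file
os.environ["DB_PORT"] = "5432"

host_from_env = resolve_setting(None, "DB_HOST")           # "db.internal"
host_from_input = resolve_setting("localhost", "DB_HOST")  # "localhost" (input wins)
# Environment values are always strings, so numeric settings need converting.
port = int(resolve_setting(None, "DB_PORT"))               # 5432
```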
Inputs Types
Input | Types |
---|---|
sql text | Str |
database name | Str |
host | Str |
port | Int |
username | Str |
password | SecretStr |
You can check the list of supported types here: Available Type Hints.
Outputs
- result df
- DataFrame containing the result of the last SELECT (or other row-returning) statement executed; an empty DataFrame if the script contains no such statement or if execution fails
- status
- Boolean that is True when all statements executed successfully and the transaction was committed, False otherwise
- error message
- String containing error details if execution failed; an empty string if it succeeded
Outputs Types
Output | Types |
---|---|
result df | DataFrame |
status | Bool |
error message | Str |
You can check the list of supported types here: Available Type Hints.
Options
The Execute SQL Scripts brick has the following configurable options:
- Database type
- Selects the type of database to connect to (Postgres, MySQL, MariaDB, Oracle Database, Microsoft SQL Server, SQLite)
- DataFrame type
- Chooses the DataFrame library used for the returned results (Pandas or Polars)
- Env variables prefix
- Prefix for the environment variable names used to resolve database credentials. For example, "DEV" makes the brick read DEV_DB_NAME, DEV_DB_HOST, and so on, instead of DB_NAME, DB_HOST.
- Verbose
- Enables or disables detailed logging output during execution
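The Database type option selects the SQLAlchemy dialect and driver that appear in the connection URL. The sketch below restates the mapping and URL layout from the brick's code as a standalone function. `build_url` and `DRIVERS` are hypothetical names. The sketch shows why credentials are percent-encoded with `quote_plus`.

```python
from urllib.parse import quote_plus

# Database type -> SQLAlchemy "dialect+driver" prefix, as in the brick's driver_map.
DRIVERS = {
    "Postgres": "postgresql+psycopg",
    "MySQL": "mysql+pymysql",
    "MariaDB": "mariadb+pymysql",
    "Oracle Database": "oracle+oracledb",
    "Microsoft SQL Server": "mssql+pymssql",
    "SQLite": "sqlite",
}


def build_url(db_type, database, host=None, port=None, username=None, password=None):
    """Hypothetical helper that rebuilds the brick's connection URL format."""
    scheme = DRIVERS[db_type]  # raises KeyError for an unsupported database type
    if db_type == "SQLite":
        # SQLite needs only a file path; host and credentials are ignored.
        return f"{scheme}:///{database}"
    # Percent-encode credentials so "@", ":" or "/" are not read as URL delimiters.
    user = quote_plus(str(username))
    secret = quote_plus(str(password))
    return f"{scheme}://{user}:{secret}@{host}:{port}/{database}"


print(build_url("Postgres", "mydatabase", "localhost", 5432, "myuser", "p@ss:w/rd"))
# postgresql+psycopg://myuser:p%40ss%3Aw%2Frd@localhost:5432/mydatabase
```

For SQLite, the database name is used as a file path. A relative name such as `mydb.sqlite` gives `sqlite:///mydb.sqlite`. An absolute path such as `/data/mydb.sqlite` gives `sqlite:////data/mydb.sqlite` (four slashes), which is the form SQLAlchemy expects for absolute paths.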
Environment Variables Configuration
If you don't provide database connection parameters as brick inputs, you can configure them using environment variables.
Step 1 — Create an environment file
Create an environment file in the flow project folder. You can name it anything you like (e.g. `.env`, `db.env`, or `mysettings.env`).
Example: db.env
```
# Example database environment configuration
DB_NAME=mydatabase
DB_HOST=localhost
DB_PORT=5432
DB_USERNAME=myuser
DB_PASSWORD=mypassword
```
💡 For SQLite, only `DB_NAME` is required (e.g., `DB_NAME=mydb.sqlite`).
Step 2 — Reference the env file in CODED FLOWS
Inside the CODED FLOWS app, use the LOAD ENV control brick to load the file you created.
- Specify your env filename (e.g. `db.env`, or whichever name you chose).
- All defined variables become available to any other brick in the connected flow graph.
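This page does not describe how LOAD ENV parses the file. Conceptually, loading it means reading each `KEY=VALUE` line and exposing it as an environment variable, which bricks such as this one then read with `getenv`. The deliberately simplified sketch below assumes plain, unquoted values. `load_env_text` is a hypothetical function, not the LOAD ENV implementation.

```python
import os


def load_env_text(text: str) -> dict[str, str]:
    """Hypothetical, simplified loader: parse KEY=VALUE lines into os.environ.

    Blank lines and lines starting with "#" are skipped. Quoting, escaping and
    "export" prefixes are NOT handled in this sketch.
    """
    loaded: dict[str, str] = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue  # skip blanks, comments and lines without "="
        key, value = line.split("=", 1)  # split on the first "=" only
        loaded[key.strip()] = value.strip()
    os.environ.update(loaded)  # make the values visible to later getenv() calls
    return loaded
```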
Step 3 — Use with prefix (optional)
If you enable the "Env variables prefix" option in the LOAD ENV brick, the system will automatically prepend the prefix and an underscore to each variable.
Example with prefix `MYAPP`:
```
MYAPP_DB_NAME=mydatabase
MYAPP_DB_HOST=localhost
MYAPP_DB_PORT=5432
MYAPP_DB_USERNAME=myuser
MYAPP_DB_PASSWORD=mypassword
```
Bricks whose "Env variables prefix" option is set to `MYAPP` will then look for `MYAPP_DB_NAME`, `MYAPP_DB_HOST`, etc. instead of the default variable names.
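The brick's code applies the following prefix rule. If the prefix is not blank, `<prefix>_` is prepended to each default name; otherwise the default names are used as-is. A small sketch of that lookup follows. `env_names` and `read_db_settings` are hypothetical helpers, and a plain dict stands in for the environment in the example.

```python
import os
from collections.abc import Mapping

# Default variable names read by the brick.
DB_KEYS = ("DB_NAME", "DB_HOST", "DB_PORT", "DB_USERNAME", "DB_PASSWORD")


def env_names(prefix: str = "") -> list[str]:
    """Variable names looked up for a given prefix (same rule as the brick's code)."""
    lead = f"{prefix}_" if prefix.strip() else ""  # blank prefix -> default names
    return [lead + key for key in DB_KEYS]


def read_db_settings(prefix: str = "", env: Mapping[str, str] = os.environ) -> dict:
    """Map each default key (DB_NAME, ...) to the value found under its prefixed name."""
    return {key: env.get(name) for key, name in zip(DB_KEYS, env_names(prefix))}


settings = read_db_settings("MYAPP", {"MYAPP_DB_NAME": "mydatabase", "DB_NAME": "other"})
# settings["DB_NAME"] is "mydatabase"; the unprefixed DB_NAME is ignored.
```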
```python
import logging
import sqlparse
import sqlalchemy as sa
from sqlalchemy import text
import pandas as pd
import polars as pl
from os import getenv
from urllib.parse import quote_plus
from coded_flows.types import Str, SecretStr, Int, DataFrame, Tuple, Bool

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _split_sql_statements(sql_text: str) -> list[str]:
    # sqlparse.split respects quotes and comments, unlike a naive split on ";".
    statements = sqlparse.split(sql_text)
    return [stmt.strip() for stmt in statements if stmt.strip()]


def _coalesce(*values):
    # Return the first value that is not None (explicit inputs win over env vars).
    return next((v for v in values if v is not None), None)


def sql_scripts(
    sql_text: Str,
    database_name: Str = None,
    host: Str = None,
    port: Int = None,
    username: Str = None,
    password: SecretStr = None,
    options=None,
) -> Tuple[DataFrame, Bool, Str]:
    brick_display_name = "Execute SQL Scripts"
    options = options or {}
    verbose = options.get("verbose", True)
    db_type = options.get("db_type", "SQLite")
    df_type = options.get("df_type", "Pandas")
    env_prefix = options.get("env_prefix", "")
    env_prefix = env_prefix + "_" if env_prefix.strip() else ""
    result_df = pd.DataFrame() if df_type.lower() == "pandas" else pl.DataFrame()
    status = False
    error_message = ""

    def _info(msg: str):
        if verbose:
            logger.info(f"[{brick_display_name}] {msg}")

    def _warn(msg: str):
        if verbose:
            logger.warning(f"[{brick_display_name}] {msg}")

    def _error(msg: str):
        if verbose:
            logger.error(f"[{brick_display_name}] {msg}")

    setup_ok = True
    pw = None
    # Resolve connection settings: explicit inputs first, then (prefixed) env vars.
    try:
        database_name = _coalesce(database_name, getenv(f"{env_prefix}DB_NAME"))
        if db_type != "SQLite":
            host = _coalesce(host, getenv(f"{env_prefix}DB_HOST"))
            port = _coalesce(port, getenv(f"{env_prefix}DB_PORT"))
            username = _coalesce(username, getenv(f"{env_prefix}DB_USERNAME"))
            password = _coalesce(password, getenv(f"{env_prefix}DB_PASSWORD"))
        if not database_name or (
            db_type != "SQLite"
            and (not host or not port or not username or not password)
        ):
            raise ValueError(
                "Either the env file is not defined, not referenced in the project "
                "folder, or one or more DB configuration inputs are missing."
            )
        # Unwrap SecretStr inputs; plain strings (e.g. from env vars) pass through.
        if password is not None:
            if hasattr(password, "get_secret_value"):
                pw = password.get_secret_value()
            else:
                pw = str(password)
        # Env vars are strings, so the port is converted and range-checked.
        if port is not None:
            try:
                port = int(port)
            except (TypeError, ValueError):
                raise ValueError(f"Port must be an integer, got {port!r}") from None
            if not 1 <= port <= 65535:
                raise ValueError(f"Port must be between 1 and 65535, got {port}")
    except Exception as e:
        # Configuration errors are logged with their cause and re-raised.
        _error(f"Invalid DB configuration: {e}")
        raise

    _info(f"Starting execution against {db_type} database.")
    # SQLAlchemy dialect+driver name for each supported database type.
    driver_map = {
        "Postgres": "postgresql+psycopg",
        "MySQL": "mysql+pymysql",
        "MariaDB": "mariadb+pymysql",
        "Oracle Database": "oracle+oracledb",
        "Microsoft SQL Server": "mssql+pymssql",
        "SQLite": "sqlite",
    }
    url = None
    try:
        if db_type == "SQLite":
            if host or username or password:
                _warn("Ignoring host, username, password for SQLite.")
            url = f"{driver_map[db_type]}:///{database_name}"
        else:
            if not all([host, port, username, pw, database_name]):
                raise ValueError(
                    f"Missing required connection parameters for {db_type}."
                )
            # Percent-encode credentials so characters like "@" or ":" don't break the URL.
            username_q = quote_plus(str(username))
            password_q = quote_plus(str(pw))
            url = f"{driver_map[db_type]}://{username_q}:{password_q}@{host}:{port}/{database_name}"
        _info("Connection URL constructed.")
    except Exception as e:
        setup_ok = False
        status = False
        error_message = str(e)
        _error(f"Error while constructing the connection URL: {error_message}")

    engine = None
    transaction = None
    if setup_ok:
        try:
            engine = sa.create_engine(url, future=True)
            _info("Engine created.")
            with engine.connect() as conn:
                # All statements share a single transaction: all succeed or none are committed.
                transaction = conn.begin()
                _info("Transaction started.")
                raw_statements = _split_sql_statements(sql_text)
                last_select_df = None
                total_rows_affected = 0
                found_select = False
                for statement in raw_statements:
                    preview = statement[:120] + ("..." if len(statement) > 120 else "")
                    _info(f"Executing statement: {preview}")
                    result = conn.execute(text(statement))
                    if result.returns_rows:
                        # Row-returning statement: keep only the latest result set.
                        rows = result.fetchall()
                        pandas_df = pd.DataFrame(rows, columns=list(result.keys()))
                        if df_type.lower() == "pandas":
                            last_select_df = pandas_df
                        elif df_type.lower() == "polars":
                            last_select_df = pl.from_pandas(pandas_df)
                        else:
                            raise ValueError("df_type must be 'Pandas' or 'Polars'")
                        _info(f"SELECT returned {len(pandas_df)} rows.")
                        found_select = True
                    else:
                        # Drivers may report -1 (or nothing) when the row count is unknown.
                        rc = getattr(result, "rowcount", None)
                        if isinstance(rc, int) and rc >= 0:
                            total_rows_affected += rc
                            _info(f"Statement affected {rc} rows.")
                        else:
                            _info("Statement executed; rowcount not available from driver.")
                transaction.commit()
                _info("Transaction committed.")
            if found_select:
                result_df = last_select_df
            status = True
            error_message = ""
        except Exception as e:
            # Roll back any partial changes made by this script.
            try:
                if transaction is not None:
                    transaction.rollback()
                    _info("Transaction rolled back due to error.")
            except Exception:
                _error("Failed to rollback transaction.")
            status = False
            error_message = str(e)
            _error(f"Error during execution: {error_message}")

    # Release pooled connections held by the engine.
    try:
        if engine is not None:
            engine.dispose()
            _info("Engine disposed.")
    except Exception:
        pass
    return (result_df, status, error_message)
```
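The brick delegates statement splitting to `sqlparse.split` rather than splitting on `;`, because a semicolon can also appear inside a string literal or a comment. The sketch below shows the difference without installing sqlparse. It uses the standard library's `sqlite3.complete_statement`, which applies SQLite's tokenizer rules. This is a swapped-in technique for illustration only. It understands SQLite syntax, so constructs from other dialects (for example PostgreSQL dollar-quoted function bodies) may not be split correctly. `split_statements` is a hypothetical name.

```python
import sqlite3


def split_statements(sql_text: str) -> list[str]:
    """Hypothetical splitter: cut only at semicolons that end a complete statement."""
    statements: list[str] = []
    buffer = ""
    for char in sql_text:
        buffer += char
        # complete_statement() is False while the ";" sits inside a string or comment.
        if char == ";" and sqlite3.complete_statement(buffer):
            stmt = buffer.strip()
            if stmt and stmt != ";":  # drop empty statements, like the brick does
                statements.append(stmt)
            buffer = ""
    tail = buffer.strip()
    if tail and tail != ";":
        statements.append(tail)  # the last statement may lack a trailing semicolon
    return statements


script = "CREATE TABLE t (s TEXT);\nINSERT INTO t VALUES ('a;b');\nSELECT s FROM t"
parts = split_statements(script)
# A naive script.split(";") would also cut inside the string literal 'a;b'.
```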
Brick Info
This brick depends on the following Python packages:
- cryptography
- sqlalchemy
- psycopg[binary]
- pymysql
- sqlparse
- pymssql
- pandas
- polars
- oracledb