Get Table Schema

Retrieve the schema of a database table.

Processing

Retrieves the schema of a database table from any of the supported database systems: PostgreSQL, MySQL, MariaDB, Oracle Database, Microsoft SQL Server, and SQLite. The brick connects to the specified database, inspects the table structure, and returns detailed column information such as names, data types, and other metadata. The output can be formatted as a Pandas DataFrame, a Polars DataFrame, or raw data records.
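
For example, invoking the brick's underlying function directly in Python against a local SQLite file could look like the sketch below (the table and file names are placeholders; the option keys match the code shown further down):

schema = get_table_schema(
    table_name="products",                    # placeholder table name
    database_name="mydb.sqlite",              # path to the SQLite file
    options={"db_type": "SQLite", "out_type": "Pandas"},
)
print(schema)  # one row per column: name, type, nullable, default, ...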

Inputs

table name
The name of the database table to retrieve schema information from
database name (optional)
The name of the database containing the target table. If not provided, the value is read from an environment variable
host (optional)
The database server hostname or IP address (ignored for SQLite). If not provided, the value is read from an environment variable
port (optional)
The database server port number (ignored for SQLite). If not provided, the value is read from an environment variable
username (optional)
The database username for authentication (ignored for SQLite). If not provided, the value is read from an environment variable
password (optional)
The database password for authentication (ignored for SQLite). If not provided, the value is read from an environment variable
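
If you prefer not to rely on environment variables, the connection parameters can be passed directly as inputs. A minimal sketch for a PostgreSQL server (hostname, credentials, and names below are placeholders):

schema = get_table_schema(
    table_name="customers",
    database_name="salesdb",
    host="db.example.com",
    port=5432,
    username="report_user",
    password="s3cret",  # placeholder; in a flow this arrives as a SecretStr
    options={"db_type": "Postgres", "out_type": "Records"},
)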

Input Types

Input            Type
table name       Str
database name    Str
host             Str
port             Int
username         Str
password         SecretStr

You can check the list of supported types here: Available Type Hints.

Outputs

schema
The table schema information containing column details such as names, data types, nullable status, and other metadata. Format depends on the selected output type option.

The schema output contains the following metadata for each column:

  • name: The column name
  • type: The column data type as a string representation
  • nullable: Whether the column can contain NULL values
  • default: The default value for the column (if any)
  • autoincrement: Whether the column is auto-incrementing
  • primary_key: Whether the column is part of the primary key
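
For illustration, with the Records output type the schema of a hypothetical users table could look like the snippet below (the values are made up, and the exact set of keys can vary slightly between database dialects):

[
    {"name": "id", "type": "INTEGER", "nullable": False, "default": None, "autoincrement": True, "primary_key": 1},
    {"name": "email", "type": "VARCHAR(255)", "nullable": False, "default": None, "autoincrement": False, "primary_key": 0},
    {"name": "created_at", "type": "TIMESTAMP", "nullable": True, "default": "CURRENT_TIMESTAMP", "autoincrement": False, "primary_key": 0},
]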

Output Types

Output    Type
schema    DataFrame, DataRecords

You can check the list of supported types here: Available Type Hints.

Options

The Get Table Schema brick provides the following configurable options:

Table Name
The name of the table to retrieve schema information from (alternative to the table name input parameter)
Database type
The type of database system to connect to (Postgres, MySQL, MariaDB, Oracle Database, Microsoft SQL Server, or SQLite)
Output type
The format of the returned schema data (Records for raw data, Pandas for Pandas DataFrame, or Polars for Polars DataFrame)
Env variables prefix
A prefix to add to environment variable names when looking up database connection parameters. This allows you to use different sets of database credentials for different purposes.
Verbose
Controls whether detailed logging information is displayed during execution
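
These options correspond to the options dictionary read by the brick code shown further down. A representative configuration (the values are only an example) might be:

options = {
    "table_name": "customers",   # alternative to the table name input
    "db_type": "Postgres",       # Postgres, MySQL, MariaDB, Oracle Database, Microsoft SQL Server, or SQLite
    "out_type": "Pandas",        # Records, Pandas, or Polars
    "env_prefix": "MYAPP",       # optional prefix for environment variable lookups
    "verbose": True,
}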

Environment Variables Configuration

If you don't provide database connection parameters as brick inputs, you can configure them using environment variables.

Step 1 — Create an environment file

Create an environment file in the flow project folder. You can name it anything you like (e.g. .env, db.env, or mysettings.env).

Example: db.env

# Example database environment configuration
DB_NAME=mydatabase
DB_HOST=localhost
DB_PORT=5432
DB_USERNAME=myuser
DB_PASSWORD=mypassword

💡 For SQLite, only DB_NAME is required (e.g., DB_NAME=mydb.sqlite).

Step 2 — Reference the env file in CODED FLOWS

Inside the CODED FLOWS app, use the LOAD ENV control brick to load the file you created.

  • Specify your env filename (for example db.env, or whichever name you chose).
  • All defined variables become available to any other bricks in the connected graph flow.

Step 3 — Use with prefix (optional)

If you set the "Env variables prefix" option (see the Options section above), the brick will automatically prepend the prefix and an underscore to each variable name when looking it up.

Example with prefix MYAPP:

MYAPP_DB_NAME=mydatabase
MYAPP_DB_HOST=localhost
MYAPP_DB_PORT=5432
MYAPP_DB_USERNAME=myuser
MYAPP_DB_PASSWORD=mypassword

Bricks will then look for MYAPP_DB_NAME, MYAPP_DB_HOST, etc. instead of the default variable names.
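
Under the hood, the brick simply joins the prefix and the default variable name with an underscore before calling os.getenv, roughly like this (a sketch mirroring the code below):

from os import getenv

env_prefix = "MYAPP"
env_prefix = env_prefix + "_" if env_prefix.strip() else ""
database_name = getenv(f"{env_prefix}DB_NAME")  # resolves to MYAPP_DB_NAME
host = getenv(f"{env_prefix}DB_HOST")           # resolves to MYAPP_DB_HOST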

import logging
import sqlalchemy as sa
from sqlalchemy import inspect
import pandas as pd
import polars as pl
from os import getenv
from coded_flows.types import Str, SecretStr, Int, DataFrame, DataRecords, Union

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _coalesce(*values):
    # Return the first value that is not None (or None when every value is missing).
    return next((v for v in values if v is not None), None)


def get_table_schema(
    table_name: Str = None,
    database_name: Str = None,
    host: Str = None,
    port: Int = None,
    username: Str = None,
    password: SecretStr = None,
    options=None,
) -> Union[DataFrame, DataRecords]:
    brick_display_name = "Get Table Schema"
    schema = None
    options = options or {}
    op_table_name = options.get("table_name", "")
    verbose = options.get("verbose", True)
    db_type = options.get("db_type", "SQLite")
    out_type = options.get("out_type", "Pandas")
    env_prefix = options.get("env_prefix", "")
    env_prefix = env_prefix + "_" if env_prefix.strip() else ""
    table_name = _coalesce(table_name, op_table_name)
    try:
        database_name = _coalesce(database_name, getenv(f"{env_prefix}DB_NAME"))
        if db_type != "SQLite":
            host = _coalesce(host, getenv(f"{env_prefix}DB_HOST"))
            port = _coalesce(port, getenv(f"{env_prefix}DB_PORT"))
            username = _coalesce(username, getenv(f"{env_prefix}DB_USERNAME"))
            password = _coalesce(password, getenv(f"{env_prefix}DB_PASSWORD"))
        if not database_name or (
            db_type != "SQLite"
            and (not host or not port or (not username) or (not password))
        ):
            verbose and logger.error(
                f"[{brick_display_name}] Either the env file is not defined, not referenced in the project folder, or one or more DB configuration inputs are missing."
            )
            raise ValueError(
                "Either the env file is not defined, not referenced in the project folder, or one or more DB configuration inputs are missing."
            )
        try:
            # Unwrap SecretStr-style objects; leave None untouched so that a
            # missing password is not mistaken for a real credential.
            if hasattr(password, "get_secret_value"):
                password = password.get_secret_value()
            elif password is not None:
                password = str(password)
        except Exception:
            password = str(password)
    except Exception as e:
        verbose and logger.error(
            f"[{brick_display_name}] Either the env file is not defined, not referenced in the project folder, or one or more DB configuration inputs are missing."
        )
        raise ValueError(
            "Either the env file is not defined, not referenced in the project folder, or one or more DB configuration inputs are missing."
        )
    try:
        if port:
            port = int(port)
    except (ValueError, TypeError):
        verbose and logger.error(
            f"[{brick_display_name}] Port must be an integer, got {port!r}"
        )
        raise ValueError(f"Port must be an integer, got {port!r}")
    if port and (not 1 <= port <= 65535):
        verbose and logger.error(
            f"[{brick_display_name}] Port must be between 1 and 65535, got {port}"
        )
        raise ValueError(f"Port must be between 1 and 65535, got {port}")
    verbose and logger.info(
        f"[{brick_display_name}] Retrieving schema from {db_type} database for table '{table_name}'."
    )
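    # Map each supported database type to its SQLAlchemy dialect+driver prefix.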
    driver_map = {
        "Postgres": "postgresql+psycopg",
        "MySQL": "mysql+pymysql",
        "MariaDB": "mariadb+pymysql",
        "Oracle Database": "oracle+oracledb",
        "Microsoft SQL Server": "mssql+pymssql",
        "SQLite": "sqlite",
    }
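    # SQLite connects to a local file, so host, port, and credentials are not used.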
    if db_type == "SQLite":
        if host or username or password:
            verbose and logger.warning(
                f"[{brick_display_name}] Ignoring host, username, password for SQLite."
            )
        url = f"{driver_map[db_type]}:///{database_name}"
    else:
        if not all([host, port, username, password, database_name]):
            verbose and logger.error(
                f"[{brick_display_name}] Missing required connection parameters for {db_type}."
            )
            raise ValueError(
                f"[{brick_display_name}] Missing required connection parameters for {db_type}."
            )
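        # Build a standard SQLAlchemy URL: dialect+driver://user:password@host:port/database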
        url = f"{driver_map[db_type]}://{username}:{password}@{host}:{port}/{database_name}"
    verbose and logger.info(f"[{brick_display_name}] Connection URL constructed.")
    try:
        engine = sa.create_engine(url)
        verbose and logger.info(f"[{brick_display_name}] Engine created.")
        inspector = inspect(engine)
        if not inspector.has_table(table_name):
            verbose and logger.error(
                f"[{brick_display_name}] Table '{table_name}' does not exist in the database."
            )
            raise ValueError(f"Table '{table_name}' does not exist in the database.")
        columns = inspector.get_columns(table_name)
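        # Stringify SQLAlchemy type objects so the schema serializes and displays cleanly.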
        columns = [{**col, "type": str(col["type"])} for col in columns]
        if out_type.lower() == "pandas":
            schema = pd.DataFrame(columns)
        elif out_type.lower() == "polars":
            schema = pl.DataFrame(columns)
        else:
            schema = columns
        engine.dispose()
        verbose and logger.info(
            f"[{brick_display_name}] Schema retrieved successfully. Columns found: {(len(schema) if schema is not None else 0)}"
        )
    except Exception as e:
        verbose and logger.error(
            f"[{brick_display_name}] An error occurred while retrieving schema: {e}"
        )
        raise
    return schema

Brick Info

version v0.1.0
python 3.10, 3.11, 3.12, 3.13
requirements
  • cryptography
  • sqlalchemy
  • psycopg[binary]
  • pymysql
  • pymssql
  • pandas
  • polars
  • oracledb