Read Data File
Read data from files in any of several standard formats (Parquet, JSON, CSV, Excel).
Reads structured data from a specified file path. The function supports reading data in multiple standard formats including Parquet, CSV, JSON, and Excel. If the file format is set to 'auto', it attempts to detect the format based on the file extension. The resulting dataset can be returned as a Pandas DataFrame, Polars DataFrame, or PyArrow Table, selectable via the Output Format option.
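For a quick feel of the behavior described above, here is a minimal sketch; the file path is hypothetical, and the `read_file` function it calls is the implementation shown further down this page:

```python
# Format is auto-detected from the .parquet extension (hypothetical path).
df = read_file("data/events.parquet")

# With the default options, the result is a pandas DataFrame.
print(df.head())
```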
Inputs
- file path
- The full path to the data file that needs to be read.
- file format (optional)
- Specifies the expected format of the input file (e.g., 'csv', 'parquet', 'excel', 'json'). If omitted, it defaults to 'auto' detection based on file extension.
- sheet name (optional)
- Applicable only for Excel files. Specifies the name of the sheet to read within the workbook.
- excel range (optional)
- Applicable only for Excel files. Defines a specific cell range (e.g., A1:B10) from which to load data.
- stop at empty (optional)
- Applicable only for Excel files. If enabled, reading stops at the first empty row encountered.
- all varchar (optional)
- Applicable only for Excel files. Forces all columns to be read as text (VARCHAR) to manage mixed data types effectively.
- csv delimiter (optional)
- Applicable only for CSV files. Specifies the character used to delimit fields.
- csv header (optional)
- Applicable only for CSV files. Boolean indicating whether the file includes a header row.
- json lines (optional)
- Applicable only for JSON files. If true, the file is interpreted using the JSON Lines (newline-delimited JSON) format, as in the sketch below.
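The format-specific inputs map directly onto arguments of the `read_file` implementation below; a short sketch with hypothetical paths:

```python
# Semicolon-delimited CSV without a header row (hypothetical path).
metrics = read_file("exports/metrics.csv", csv_delimiter=";", csv_header=False)

# Newline-delimited JSON, one record per line (hypothetical path).
events = read_file("logs/events.jsonl", json_lines=True)
```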
Inputs Types
| Input | Types |
|---|---|
| file path | Str, Path |
| file format | Str |
| sheet name | Str |
| excel range | Str |
| stop at empty | Bool |
| all varchar | Bool |
| csv delimiter | Str |
| csv header | Bool |
| json lines | Bool |
You can check the list of supported types here: Available Type Hints.
Outputs
- data
- The loaded dataset, structured according to the selected Output Format option (Pandas DataFrame, Polars DataFrame, or PyArrow Table).
Outputs Types
| Output | Types |
|---|---|
| data | DataFrame, ArrowTable |
You can check the list of supported types here: Available Type Hints.
Options
The Read Data File brick exposes the following configurable options; a combined example follows the list:
- File Format
- Specifies the type of file being read. Options are 'auto' (default), 'parquet', 'csv', 'json', and 'excel'.
- Excel Sheet Name
- For Excel files, the name of the sheet to load. Defaults to the first sheet if empty.
- Excel Range (e.g., A1:B10)
- For Excel files, specifies a cell range to limit the data loaded.
- Stop at Empty Row
- For Excel files, controls whether the reader should stop upon encountering an empty row (default: True).
- All Excel Columns as Text
- If enabled, forces all columns in the Excel file to be read as text (VARCHAR), useful for consistency and handling mixed data types (default: False).
- CSV Delimiter
- The character used to separate values in CSV files (default: ',').
- CSV Has Header
- Specifies whether the CSV file includes a header row (default: True).
- JSON Lines Format
- If enabled, treats the input file as newline-delimited JSON (JSONL) (default: False).
- Output Format
- Selects the desired output structure for the loaded data: 'pandas' (default), 'polars', or 'arrow' (PyArrow Table).
- Verbose
- Enables detailed logging messages during execution (default: True).
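Putting the options together, a plausible configuration might look like this (a sketch, assuming the options mapping is passed to the function as in the implementation below; the path, sheet name, and range are hypothetical):

```python
options = {
    "file_format": "excel",
    "sheet_name": "Q1",         # hypothetical sheet name
    "excel_range": "A1:D100",   # hypothetical cell range
    "all_varchar": True,        # read every column as text
    "output_format": "polars",  # return a Polars DataFrame
    "verbose": False,           # silence progress logging
}
data = read_file("reports/quarterly.xlsx", options=options)
```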
```python
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
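# pandas, polars, and pyarrow are not referenced directly below, but DuckDB's
# .df(), .pl(), and .fetch_arrow_table() result conversions require them;
# importing them up front surfaces a missing dependency early.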
from coded_flows.types import Union, Path, Str, Bool, DataFrame, ArrowTable
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def _coalesce(*values):
    return next((v for v in values if v is not None), None)
def _detect_file_format(file_path):
"""
Detect file format based on file extension.
"""
path_obj = Path(file_path)
suffix = path_obj.suffix.lower()
format_map = {
".parquet": "parquet",
".pq": "parquet",
".csv": "csv",
".txt": "csv",
".json": "json",
".jsonl": "json",
".xlsx": "excel",
".xls": "excel",
}
return format_map.get(suffix, None)
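# Illustrative examples (hypothetical file names):
#   _detect_file_format("sales.xlsx") -> "excel"
#   _detect_file_format("notes.md")   -> None (caller must then specify the format)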
def read_file(
    file_path: Union[Str, Path],
    file_format: Str = None,
    sheet_name: Str = None,
    excel_range: Str = None,
    stop_at_empty: Bool = None,
    all_varchar: Bool = None,
    csv_delimiter: Str = None,
    csv_header: Bool = None,
    json_lines: Bool = None,
    options=None,
) -> Union[DataFrame, ArrowTable]:
    brick_display_name = "Read Data File"
    options = options or {}
    verbose = options.get("verbose", True)
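    # Explicit arguments take precedence over the options mapping,
    # which in turn takes precedence over the hard-coded defaults.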
    file_format = _coalesce(file_format, options.get("file_format", "auto"))
    sheet_name = _coalesce(sheet_name, options.get("sheet_name", ""))
    excel_range = _coalesce(excel_range, options.get("excel_range", ""))
    stop_at_empty = _coalesce(stop_at_empty, options.get("stop_at_empty", True))
    all_varchar = _coalesce(all_varchar, options.get("all_varchar", False))
    csv_delimiter = _coalesce(csv_delimiter, options.get("csv_delimiter", ","))
    csv_header = _coalesce(csv_header, options.get("csv_header", True))
    json_lines = _coalesce(json_lines, options.get("json_lines", False))
    output_format = options.get("output_format", "pandas")
    data = None
    conn = None
    try:
        path_obj = Path(file_path)
        verbose and logger.info(
            f"[{brick_display_name}] Reading file from path: '{path_obj}'."
        )
        if not path_obj.exists():
            verbose and logger.error(
                f"[{brick_display_name}] File does not exist: '{path_obj}'."
            )
            raise FileNotFoundError(f"File does not exist: '{path_obj}'.")
        if not path_obj.is_file():
            verbose and logger.error(
                f"[{brick_display_name}] Path is not a file: '{path_obj}'."
            )
            raise ValueError(f"Path is not a file: '{path_obj}'.")
        detected_format = file_format
        if file_format == "auto":
            detected_format = _detect_file_format(path_obj)
            if detected_format is None:
                verbose and logger.error(
                    f"[{brick_display_name}] Could not auto-detect file format for: '{path_obj}'."
                )
                raise ValueError(
                    "Could not auto-detect file format. Please specify format explicitly."
                )
            verbose and logger.info(
                f"[{brick_display_name}] Auto-detected file format: '{detected_format}'."
            )
        else:
            verbose and logger.info(
                f"[{brick_display_name}] Using specified file format: '{detected_format}'."
            )
        valid_formats = ["parquet", "csv", "json", "excel"]
        if detected_format not in valid_formats:
            verbose and logger.error(
                f"[{brick_display_name}] Unsupported file format: '{detected_format}'."
            )
            raise ValueError(
                f"Unsupported file format: '{detected_format}'. Supported formats: {valid_formats}."
            )
        conn = duckdb.connect(":memory:")
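        # NOTE: the file path is interpolated directly into the SQL text;
        # a path containing a single quote would need escaping before use.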
if detected_format == "parquet":
verbose and logger.info(f"[{brick_display_name}] Reading Parquet file.")
query = f"SELECT * FROM read_parquet('{str(path_obj)}')"
elif detected_format == "csv":
verbose and logger.info(
f"[{brick_display_name}] Reading CSV file with delimiter '{csv_delimiter}' and header={csv_header}."
)
header_opt = "true" if csv_header else "false"
query = f"SELECT * FROM read_csv('{str(path_obj)}', delim='{csv_delimiter}', header={header_opt}, auto_detect=true)"
elif detected_format == "json":
if json_lines:
verbose and logger.info(
f"[{brick_display_name}] Reading JSON Lines file."
)
query = f"SELECT * FROM read_json_auto('{str(path_obj)}', format='newline_delimited')"
else:
verbose and logger.info(f"[{brick_display_name}] Reading JSON file.")
query = f"SELECT * FROM read_json_auto('{str(path_obj)}')"
elif detected_format == "excel":
excel_params = []
if sheet_name:
excel_params.append(f"sheet='{sheet_name}'")
verbose and logger.info(
f"[{brick_display_name}] Reading Excel file with sheet: '{sheet_name}'."
)
else:
verbose and logger.info(
f"[{brick_display_name}] Reading Excel file (first sheet)."
)
if excel_range:
excel_params.append(f"range='{excel_range}'")
verbose and logger.info(
f"[{brick_display_name}] Using Excel range: '{excel_range}'."
)
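            # read_xlsx stops at the first empty row by default, except when an
            # explicit range is given, so the flag is passed whenever it matters.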
            if excel_range or not stop_at_empty:
                stop_at_empty_str = "true" if stop_at_empty else "false"
                excel_params.append(f"stop_at_empty={stop_at_empty_str}")
                verbose and logger.info(
                    f"[{brick_display_name}] Stop at empty row: {stop_at_empty}."
                )
            if all_varchar:
                excel_params.append("all_varchar=true")
                verbose and logger.info(
                    f"[{brick_display_name}] Reading all columns as text (VARCHAR) to handle mixed types."
                )
            if excel_params:
                params_str = ", " + ", ".join(excel_params)
                query = f"SELECT * FROM read_xlsx('{str(path_obj)}'{params_str})"
            else:
                query = f"SELECT * FROM read_xlsx('{str(path_obj)}')"
        verbose and logger.info(f"[{brick_display_name}] Executing query to load data.")
        if output_format == "pandas":
            data = conn.execute(query).df()
            verbose and logger.info(
                f"[{brick_display_name}] Converted data to pandas DataFrame with {len(data)} rows and {len(data.columns)} columns."
            )
        elif output_format == "polars":
            data = conn.execute(query).pl()
            verbose and logger.info(
                f"[{brick_display_name}] Converted data to Polars DataFrame with {len(data)} rows and {len(data.columns)} columns."
            )
        elif output_format == "arrow":
            data = conn.execute(query).fetch_arrow_table()
            verbose and logger.info(
                f"[{brick_display_name}] Converted data to Arrow Table with {data.num_rows} rows and {data.num_columns} columns."
            )
        else:
            verbose and logger.error(
                f"[{brick_display_name}] Unsupported output format: '{output_format}'."
            )
            raise ValueError(f"Unsupported output format: '{output_format}'.")
        verbose and logger.info(
            f"[{brick_display_name}] File read successfully from '{path_obj}'."
        )
    except Exception as e:
        verbose and logger.error(f"[{brick_display_name}] Error reading file: {str(e)}")
        raise
    finally:
        if conn is not None:
            conn.close()
    return data
```
Brick Info
- pandas
- polars[pyarrow]
- duckdb
- pyarrow