Data to File
Write data to files supporting multiple standard formats (parquet, json, csv, excel).
Data to File
Processing
Write structured data (Pandas DataFrame, Polars DataFrame, or PyArrow Table) to a specified file path. This brick supports standard data formats including parquet, CSV, JSON, and Excel, utilizing underlying libraries (Pandas, Polars, or PyArrow) based on the input data type for efficient writing. It handles automatic format detection based on the file extension and manages directory creation and overwrite settings.
Inputs
- data
- The input structured data (Pandas DataFrame, Polars DataFrame, or PyArrow Table) to be written to the file system.
- file path
- The destination path (including filename and extension) where the data will be saved.
- file format (optional)
- Defines the output file format (e.g., 'parquet', 'csv', 'json', 'excel'). Defaults to 'auto', which detects the format from the file extension.
- sheet name (optional)
- Specifies the worksheet name if writing to an Excel file.
- csv delimiter (optional)
- Specifies the column separator character if writing to a CSV file.
- csv header (optional)
- Determines whether the header row should be included when writing CSV.
- json lines (optional)
- If writing JSON, set to True to output the data in JSON Lines format (one record per line).
- overwrite (optional)
- If True, allows the brick to overwrite the destination file if it already exists.
- compression (optional)
- Specifies the compression algorithm (e.g., 'gzip', 'snappy', 'zstd'). Compression is currently applied only when writing Parquet; it has no effect on CSV, JSON, or Excel output.
Inputs Types
| Input | Types |
|---|---|
| data | DataFrame, ArrowTable |
| file path | Str, Path |
| file format | Str |
| sheet name | Str |
| csv delimiter | Str |
| csv header | Bool |
| json lines | Bool |
| overwrite | Bool |
| compression | Str |
You can check the list of supported types here: Available Type Hints.
Outputs
- success
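- Returns True if the file was written successfully. On failure the brick raises an exception (for example FileExistsError when the file exists and overwrite is disabled, or ValueError for unsupported data or formats) rather than returning False.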
Outputs Types
| Output | Types |
|---|---|
| success | Bool |
You can check the list of supported types here: Available Type Hints.
Options
The Data to File brick contains some changeable options:
- File Format
- Defines the output file format. Choices include 'auto', 'parquet', 'csv', 'json', or 'excel'. 'auto' attempts to infer the format from the file extension (default: auto).
- Excel Sheet Name
- The name of the worksheet to use when writing to an Excel file (default: Sheet1).
- CSV Delimiter
- The character used to separate fields when writing CSV (default: ,).
- CSV Include Header
- Toggle whether the header/column names should be written as the first row in CSV output (default: True).
- JSON Lines Format
- Toggle to write JSON output using the JSON Lines format (one JSON object per line) instead of a standard JSON array (default: False).
- Overwrite Existing File
- Toggle whether to overwrite the destination file if it already exists (default: True).
- Compression
- Selects the compression scheme to apply to the output file. Options include 'none', 'gzip', 'snappy', or 'zstd' (default: none). Compression is currently applied only when writing Parquet; the option has no effect on CSV, JSON, or Excel output.
- Verbose
- If enabled, detailed operational logs and information messages will be printed during execution (default: True).
import logging
import pandas as pd
import polars as pl
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.csv as pa_csv
from coded_flows.types import Union, Path, Str, DataFrame, ArrowTable, Bool
# Configure the root logger at import time with INFO as the threshold level.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by the writer helpers and by write_file below.
logger = logging.getLogger(__name__)
def _coalesce(*values):
    """Return the first argument that is not ``None``.

    Falsy values such as ``False``, ``0`` or ``""`` count as real values and
    are returned as-is. ``None`` is returned only when every argument is
    ``None`` or when no arguments are supplied.
    """
    for candidate in values:
        if candidate is not None:
            return candidate
    return None
def _detect_file_format(file_path):
    """Infer the output format from the extension of *file_path*.

    Only the final suffix is inspected and the comparison is
    case-insensitive.

    Returns:
        ``"parquet"``, ``"csv"``, ``"json"`` or ``"excel"`` for a recognised
        extension, otherwise ``None``.
    """
    extension = Path(file_path).suffix.lower()
    # Each format is listed with every extension that maps to it.
    known_extensions = (
        ("parquet", (".parquet", ".pq")),
        ("csv", (".csv", ".txt")),
        ("json", (".json", ".jsonl")),
        ("excel", (".xlsx", ".xls")),
    )
    for format_name, extensions in known_extensions:
        if extension in extensions:
            return format_name
    return None
def _write_pandas(
    df,
    path_obj,
    detected_format,
    sheet_name,
    csv_delimiter,
    csv_header,
    json_lines,
    compression,
    verbose,
    brick_display_name,
):
    """Persist a pandas DataFrame to ``path_obj`` in the requested format.

    Args:
        df: The pandas DataFrame to write.
        path_obj: Destination path; converted with ``str()`` before use.
        detected_format: One of ``"parquet"``, ``"csv"``, ``"json"`` or
            ``"excel"``. Any other value makes the function a no-op.
        sheet_name: Worksheet name (Excel only).
        csv_delimiter: Field separator (CSV only).
        csv_header: Whether to emit the header row (CSV only).
        json_lines: Write JSON Lines instead of a JSON array (JSON only).
        compression: Parquet codec name; the string ``"none"`` disables
            compression. Not used for the other formats.
        verbose: When truthy, an INFO line is logged before writing.
        brick_display_name: Prefix used in log messages.

    The DataFrame index is never written.
    """
    # pandas expects None, not the string "none", to disable compression.
    codec = compression if compression != "none" else None

    if detected_format == "parquet":
        if verbose:
            logger.info(
                f"[{brick_display_name}] Writing Parquet file using pandas with compression: '{compression}'."
            )
        df.to_parquet(str(path_obj), compression=codec, index=False, engine="pyarrow")
        return

    if detected_format == "csv":
        if verbose:
            logger.info(
                f"[{brick_display_name}] Writing CSV file using pandas with delimiter '{csv_delimiter}' and header={csv_header}."
            )
        df.to_csv(str(path_obj), sep=csv_delimiter, header=csv_header, index=False)
        return

    if detected_format == "json":
        if verbose:
            logger.info(
                f"[{brick_display_name}] Writing JSON file using pandas (lines={json_lines})."
            )
        df.to_json(
            str(path_obj), orient="records", lines=json_lines, force_ascii=False
        )
        return

    if detected_format == "excel":
        if verbose:
            logger.info(
                f"[{brick_display_name}] Writing Excel file using pandas with sheet name: '{sheet_name}'."
            )
        with pd.ExcelWriter(str(path_obj), engine="openpyxl") as excel_writer:
            df.to_excel(excel_writer, sheet_name=sheet_name, index=False)
def _write_polars(
    df,
    path_obj,
    detected_format,
    sheet_name,
    csv_delimiter,
    csv_header,
    json_lines,
    compression,
    verbose,
    brick_display_name,
):
    """Persist a Polars DataFrame to ``path_obj`` in the requested format.

    Args:
        df: The Polars DataFrame to write.
        path_obj: Destination path; converted with ``str()`` before use.
        detected_format: One of ``"parquet"``, ``"csv"``, ``"json"`` or
            ``"excel"``. Any other value makes the function a no-op.
        sheet_name: Worksheet name (Excel only).
        csv_delimiter: Field separator (CSV only).
        csv_header: Whether to emit the header row (CSV only).
        json_lines: When truthy, ``write_ndjson`` is used instead of
            ``write_json`` (JSON only).
        compression: Parquet codec name; ``"none"`` is translated to
            ``"uncompressed"``. Not used for the other formats.
        verbose: When truthy, an INFO line is logged before writing.
        brick_display_name: Prefix used in log messages.
    """
    if detected_format == "parquet":
        if verbose:
            logger.info(
                f"[{brick_display_name}] Writing Parquet file using polars with compression: '{compression}'."
            )
        # "none" is this brick's spelling for no compression; polars is given
        # "uncompressed" instead.
        if compression == "none":
            codec = "uncompressed"
        else:
            codec = compression
        df.write_parquet(str(path_obj), compression=codec)
        return

    if detected_format == "csv":
        if verbose:
            logger.info(
                f"[{brick_display_name}] Writing CSV file using polars with delimiter '{csv_delimiter}' and header={csv_header}."
            )
        df.write_csv(
            str(path_obj), separator=csv_delimiter, include_header=csv_header
        )
        return

    if detected_format == "json":
        if verbose:
            logger.info(
                f"[{brick_display_name}] Writing JSON file using polars (lines={json_lines})."
            )
        json_writer = df.write_ndjson if json_lines else df.write_json
        json_writer(str(path_obj))
        return

    if detected_format == "excel":
        if verbose:
            logger.info(
                f"[{brick_display_name}] Writing Excel file using polars with sheet name: '{sheet_name}'."
            )
        df.write_excel(str(path_obj), worksheet=sheet_name)
def _write_arrow(
    table,
    path_obj,
    detected_format,
    sheet_name,
    csv_delimiter,
    csv_header,
    json_lines,
    compression,
    verbose,
    brick_display_name,
):
    """Persist a PyArrow Table to ``path_obj`` in the requested format.

    Parquet and CSV are written with PyArrow itself. JSON and Excel are
    written by first converting the table with ``to_pandas()`` and then using
    the pandas writers (index omitted).

    Args:
        table: The PyArrow Table to write.
        path_obj: Destination path; converted with ``str()`` before use.
        detected_format: One of ``"parquet"``, ``"csv"``, ``"json"`` or
            ``"excel"``. Any other value makes the function a no-op.
        sheet_name: Worksheet name (Excel only).
        csv_delimiter: Field separator (CSV only).
        csv_header: Whether to emit the header row (CSV only).
        json_lines: Write JSON Lines instead of a JSON array (JSON only).
        compression: Parquet codec name; ``"none"`` becomes ``None`` and any
            other value is upper-cased before being passed on. Not used for
            the other formats.
        verbose: When truthy, an INFO line is logged before writing.
        brick_display_name: Prefix used in log messages.
    """
    if detected_format == "parquet":
        if verbose:
            logger.info(
                f"[{brick_display_name}] Writing Parquet file using pyarrow with compression: '{compression}'."
            )
        codec = compression.upper() if compression != "none" else None
        pq.write_table(table, str(path_obj), compression=codec)
        return

    if detected_format == "csv":
        if verbose:
            logger.info(
                f"[{brick_display_name}] Writing CSV file using pyarrow with delimiter '{csv_delimiter}' and header={csv_header}."
            )
        csv_options = pa_csv.WriteOptions(
            include_header=csv_header, delimiter=csv_delimiter
        )
        pa_csv.write_csv(table, str(path_obj), write_options=csv_options)
        return

    if detected_format == "json":
        if verbose:
            logger.info(
                f"[{brick_display_name}] Writing JSON file using pyarrow (via pandas conversion, lines={json_lines})."
            )
        frame = table.to_pandas()
        frame.to_json(
            str(path_obj), orient="records", lines=json_lines, force_ascii=False
        )
        return

    if detected_format == "excel":
        if verbose:
            logger.info(
                f"[{brick_display_name}] Writing Excel file using pyarrow (via pandas conversion) with sheet name: '{sheet_name}'."
            )
        frame = table.to_pandas()
        with pd.ExcelWriter(str(path_obj), engine="openpyxl") as excel_writer:
            frame.to_excel(excel_writer, sheet_name=sheet_name, index=False)
def write_file(
    data: Union[DataFrame, ArrowTable],
    file_path: Union[Str, Path],
    file_format: Str = None,
    sheet_name: Str = None,
    csv_delimiter: Str = None,
    csv_header: Bool = None,
    json_lines: Bool = None,
    overwrite: Bool = None,
    compression: Str = None,
    options=None,
) -> Bool:
    """Write a pandas DataFrame, Polars DataFrame or PyArrow Table to a file.

    Every setting is resolved in this order: the explicit argument, then the
    matching key in ``options``, then the built-in default. ``None`` means
    "not set" at both of the first two levels.

    Args:
        data: The table-like object to write.
        file_path: Destination path including file name and extension.
        file_format: ``"auto"`` (default), ``"parquet"``, ``"csv"``,
            ``"json"`` or ``"excel"``. Matched case-insensitively. ``"auto"``
            infers the format from the file extension.
        sheet_name: Excel worksheet name (default ``"Sheet1"``).
        csv_delimiter: CSV field separator (default ``","``).
        csv_header: Whether to write the CSV header row (default ``True``).
        json_lines: Write JSON Lines instead of a JSON array (default
            ``False``).
        overwrite: Allow replacing an existing file (default ``True``).
        compression: Codec name passed to the writer (default ``"none"``).
            Matched case-insensitively.
        options: Optional dict of fallback settings; also carries
            ``"verbose"`` (default ``True``), which gates all log output.

    Returns:
        ``True`` once the file has been written. Failures are reported by
        raising, never by returning ``False``.

    Raises:
        FileExistsError: The destination exists and ``overwrite`` is falsy.
        ValueError: ``data`` has an unsupported type, the format cannot be
            auto-detected, or the format is not supported.
        Exception: Anything raised by the underlying writer or by directory
            creation is logged (when verbose) and re-raised unchanged.
    """
    brick_display_name = "Data to File"
    options = options or {}
    verbose = options.get("verbose", True)

    # Passing the default as a third candidate (instead of as the dict.get
    # fallback) makes an explicit None stored in ``options`` behave like a
    # missing key, so None can never reach the writers.
    file_format = _coalesce(file_format, options.get("file_format"), "auto")
    sheet_name = _coalesce(sheet_name, options.get("sheet_name"), "Sheet1")
    csv_delimiter = _coalesce(csv_delimiter, options.get("csv_delimiter"), ",")
    csv_header = _coalesce(csv_header, options.get("csv_header"), True)
    json_lines = _coalesce(json_lines, options.get("json_lines"), False)
    overwrite = _coalesce(overwrite, options.get("overwrite"), True)
    compression = _coalesce(compression, options.get("compression"), "none")

    # Accept values such as "CSV" or " Snappy " without changing the meaning
    # of the canonical lower-case spellings.
    if isinstance(file_format, str):
        file_format = file_format.strip().lower()
    if isinstance(compression, str):
        compression = compression.strip().lower()

    success = False
    try:
        path_obj = Path(file_path)
        if verbose:
            logger.info(f"[{brick_display_name}] Writing data to path: '{path_obj}'.")

        if path_obj.exists() and not overwrite:
            if verbose:
                logger.error(
                    f"[{brick_display_name}] File already exists and overwrite is disabled: '{path_obj}'."
                )
            raise FileExistsError(
                f"File already exists and overwrite is disabled: '{path_obj}'."
            )

        # Identify the input library and pick the matching writer helper.
        if isinstance(data, pd.DataFrame):
            data_type, writer = "pandas", _write_pandas
            column_count = len(data.columns)
        elif isinstance(data, pl.DataFrame):
            data_type, writer = "polars", _write_polars
            column_count = len(data.columns)
        elif isinstance(data, pa.Table):
            data_type, writer = "arrow", _write_arrow
            column_count = len(data.column_names)
        else:
            if verbose:
                logger.error(
                    f"[{brick_display_name}] Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table."
                )
            raise ValueError(
                "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table."
            )
        row_count = len(data)
        if verbose:
            logger.info(
                f"[{brick_display_name}] Detected input format: {data_type} with {row_count} rows and {column_count} columns."
            )

        # Resolve the output format, inferring it from the extension on "auto".
        detected_format = file_format
        if file_format == "auto":
            detected_format = _detect_file_format(path_obj)
            if detected_format is None:
                if verbose:
                    logger.error(
                        f"[{brick_display_name}] Could not auto-detect file format for: '{path_obj}'."
                    )
                raise ValueError(
                    "Could not auto-detect file format. Please specify format explicitly."
                )
            if verbose:
                logger.info(
                    f"[{brick_display_name}] Auto-detected file format: '{detected_format}'."
                )
        elif verbose:
            logger.info(
                f"[{brick_display_name}] Using specified file format: '{detected_format}'."
            )

        valid_formats = ["parquet", "csv", "json", "excel"]
        if detected_format not in valid_formats:
            if verbose:
                logger.error(
                    f"[{brick_display_name}] Unsupported file format: '{detected_format}'."
                )
            raise ValueError(
                f"Unsupported file format: '{detected_format}'. Supported formats: {valid_formats}."
            )

        # Create the parent directory only after all validation has passed,
        # so a rejected call does not leave empty directories behind.
        path_obj.parent.mkdir(parents=True, exist_ok=True)
        if verbose:
            logger.info(
                f"[{brick_display_name}] Ensured parent directory exists: '{path_obj.parent}'."
            )

        writer(
            data,
            path_obj,
            detected_format,
            sheet_name,
            csv_delimiter,
            csv_header,
            json_lines,
            compression,
            verbose,
            brick_display_name,
        )
        if verbose:
            logger.info(
                f"[{brick_display_name}] File written successfully to '{path_obj}'."
            )
        success = True
    except Exception as e:
        # Include the exception text so the log line is actionable on its own;
        # the original exception is re-raised unchanged for the caller.
        if verbose:
            logger.error(f"[{brick_display_name}] Error writing file: {e}")
        raise
    return success
Brick Info
- pandas
- openpyxl
- pyarrow
- polars[pyarrow]
- xlsxwriter