# -*- coding: utf-8 -*-
# pylint: disable=protected-access
"""The Excel reader to read and chunk Excel files."""
import base64
import hashlib
import json
from typing import Any, Literal
from ._reader_base import ReaderBase
from ._text_reader import TextReader
from ._utils import _get_media_type_from_data
from .._document import Document, DocMetadata
from ...message import ImageBlock, Base64Source, TextBlock
from ..._logging import logger
def _get_excel_column_name(col_index: int) -> str:
"""Convert a 0-based column index to Excel column name (A, B, ..., Z, AA,
AB, ...).
Args:
col_index (`int`):
The 0-based column index.
Returns:
`str`:
The Excel column name (e.g., 'A' for 0, 'B' for 1, 'AA' for 26).
"""
result = ""
col_index += 1 # Convert to 1-based
while col_index > 0:
col_index -= 1
result = chr(ord("A") + col_index % 26) + result
col_index //= 26
return result
def _extract_table_data(df: Any) -> list[list[str]]:
"""Extract table data from a DataFrame, handling NaN values.
Args:
df (`Any`):
The pandas DataFrame object.
Returns:
`list[list[str]]`:
Table data represented as a 2D list, where each inner list
represents a row, and each string in the row represents a cell.
"""
import pandas as pd
table_data = []
for _, row in df.iterrows():
row_data = []
for cell_val in row:
# Convert NaN to empty string, preserve line breaks
if pd.isna(cell_val):
cell_text = ""
else:
cell_text = str(cell_val).strip()
# Normalize line breaks
cell_text = cell_text.replace("\r\n", "\n").replace("\r", "\n")
row_data.append(cell_text)
table_data.append(row_data)
return table_data
def _extract_images_from_worksheet(
    worksheet: Any,
) -> list[tuple[int, ImageBlock]]:
    """Extract images from a worksheet with their row positions.

    Args:
        worksheet (`Any`):
            The openpyxl worksheet object.

    Returns:
        `list[tuple[int, ImageBlock]]`:
            A list of tuples containing (row_index, ImageBlock), where
            row_index is 0-based. Empty if no images found.
    """
    extracted: list[tuple[int, ImageBlock]] = []
    worksheet_images = getattr(worksheet, "_images", None)
    if not worksheet_images:
        return extracted

    for image in worksheet_images:
        try:
            # Resolve the 0-based anchor row; fall back to the top row
            # when the image carries no usable anchor information.
            anchor = getattr(image, "anchor", None)
            if anchor is not None and hasattr(anchor, "_from"):
                anchor_row = anchor._from.row
            else:
                anchor_row = 0

            raw_bytes = image._data()
            extracted.append(
                (
                    anchor_row,
                    ImageBlock(
                        type="image",
                        source=Base64Source(
                            type="base64",
                            media_type=_get_media_type_from_data(raw_bytes),
                            data=base64.b64encode(raw_bytes).decode("utf-8"),
                        ),
                    ),
                ),
            )
        except Exception as e:
            # Best-effort extraction: a single broken image must not abort
            # the whole sheet, so log and continue.
            logger.warning("Failed to extract image from worksheet: %s", e)
    return extracted
class ExcelReader(ReaderBase):
    """The Excel reader that supports reading text, image, and table
    content from Excel files (.xlsx, .xls files), and chunking the text
    content into smaller pieces.

    .. note:: The table content can be extracted in Markdown or JSON format.

    **Markdown format example** (``include_cell_coordinates=False``):

    .. code-block:: text

        | Name  | Age | City     |
        |-------|-----|----------|
        | Alice | 25  | New York |
        | Bob   | 30  | London   |

    **Markdown format example** (``include_cell_coordinates=True``):

    .. code-block:: text

        | [A1] Name  | [B1] Age | [C1] City     |
        |------------|----------|---------------|
        | [A2] Alice | [B2] 25  | [C2] New York |
        | [A3] Bob   | [B3] 30  | [C3] London   |

    **JSON format example** (``include_cell_coordinates=False``):

    .. code-block:: json

        ["Name", "Age", "City"]
        ["Alice", "25", "New York"]
        ["Bob", "30", "London"]

    **JSON format example** (``include_cell_coordinates=True``):

    .. code-block:: json

        {"A1": "Name", "B1": "Age", "C1": "City"}
        {"A2": "Alice", "B2": "25", "C2": "New York"}
        {"A3": "Bob", "B3": "30", "C3": "London"}
    """

    def __init__(
        self,
        chunk_size: int = 512,
        split_by: Literal["char", "sentence", "paragraph"] = "sentence",
        include_sheet_names: bool = True,
        include_cell_coordinates: bool = False,
        include_image: bool = False,
        separate_sheet: bool = False,
        separate_table: bool = False,
        table_format: Literal["markdown", "json"] = "markdown",
    ) -> None:
        """Initialize the Excel reader.

        Args:
            chunk_size (`int`, default to 512):
                The size of each chunk, in number of characters.
            split_by (`Literal["char", "sentence", "paragraph"]`, default \
             to "sentence"):
                The unit to split the text, can be "char", "sentence", or
                "paragraph". The "sentence" option is implemented using the
                "nltk" library, which only supports English text.
            include_sheet_names (`bool`, default to True):
                Whether to include sheet names in the extracted text.
            include_cell_coordinates (`bool`, default to False):
                Whether to include cell coordinates (e.g., A1, B2) in the
                extracted text.
            include_image (`bool`, default to False):
                Whether to include image content in the document. If True,
                images will be extracted and included as base64-encoded
                images.
            separate_sheet (`bool`, default to False):
                Whether to treat each sheet as a separate document. If True,
                each sheet will be extracted as a separate Document object
                instead of being merged together.
            separate_table (`bool`, default to False):
                If True, tables will be treated as a new chunk to avoid
                truncation. But note when the table exceeds the chunk size,
                it will still be truncated.
            table_format (`Literal["markdown", "json"]`, default to \
             "markdown"):
                The format to extract table content. Note if the table cell
                contains ``\\n``, the Markdown format may not render
                correctly. In that case, you can use the `json` format,
                which extracts the table as a JSON string of a
                `list[list[str]]` object.
        """
        self._validate_init_params(chunk_size, split_by)

        if table_format not in ["markdown", "json"]:
            raise ValueError(
                "The table_format must be one of 'markdown' or 'json', "
                f"got {table_format}",
            )

        self.chunk_size = chunk_size
        self.split_by = split_by
        self.include_sheet_names = include_sheet_names
        self.include_cell_coordinates = include_cell_coordinates
        self.include_image = include_image
        self.separate_sheet = separate_sheet
        self.separate_table = separate_table
        self.table_format = table_format

        # Use TextReader to do the chunking
        self._text_reader = TextReader(self.chunk_size, self.split_by)

    def _validate_init_params(self, chunk_size: int, split_by: str) -> None:
        """Validate initialization parameters.

        Args:
            chunk_size (`int`):
                The chunk size to validate.
            split_by (`str`):
                The split mode to validate.

        Raises:
            `ValueError`:
                If ``chunk_size`` is not positive, or ``split_by`` is not
                one of the supported modes.
        """
        if chunk_size <= 0:
            raise ValueError(
                f"The chunk_size must be positive, got {chunk_size}",
            )

        if split_by not in ["char", "sentence", "paragraph"]:
            raise ValueError(
                "The split_by must be one of 'char', 'sentence' or "
                f"'paragraph', got {split_by}",
            )

    async def __call__(
        self,
        excel_path: str,
    ) -> list[Document]:
        """Read an Excel file, split it into chunks, and return a list of
        Document objects. The text, image, and table content will be returned
        in the same order as they appear in the Excel file.

        Args:
            excel_path (`str`):
                The input Excel file path (.xlsx or .xls file).

        Returns:
            `list[Document]`:
                A list of Document objects, where the metadata contains the
                chunked text, doc id and chunk id.

        Raises:
            `ImportError`:
                If pandas is not installed.
            `ValueError`:
                If the Excel file cannot be read.
        """
        # Generate document ID
        doc_id = self.get_doc_id(excel_path)

        # Load Excel file and workbook
        excel_file = None
        workbook = None

        try:
            import pandas as pd
        except ImportError as e:
            raise ImportError(
                "Please install pandas to use the Excel reader. "
                "You can install it by `pip install pandas`.",
            ) from e

        try:
            excel_file = pd.ExcelFile(excel_path)

            # Load workbook only if images are needed; image extraction is
            # optional and silently degrades when openpyxl is missing.
            if self.include_image:
                try:
                    from openpyxl import load_workbook

                    workbook = load_workbook(excel_path)
                except ImportError:
                    logger.warning(
                        "openpyxl not available, image extraction disabled",
                    )
                    workbook = None

            # Process sheets
            if self.separate_sheet:
                return await self._process_sheets_separately(
                    excel_file,
                    doc_id,
                    workbook,
                )
            else:
                return await self._process_sheets_merged(
                    excel_file,
                    doc_id,
                    workbook,
                )

        except (
            pd.errors.EmptyDataError,
            pd.errors.ParserError,
            FileNotFoundError,
            PermissionError,
        ) as e:
            raise ValueError(
                f"Failed to read Excel file {excel_path}: {e}",
            ) from e

        finally:
            # Ensure all resources are closed
            if workbook is not None:
                workbook.close()
            if excel_file is not None:
                excel_file.close()

    async def _process_sheets_merged(
        self,
        excel_file: Any,
        doc_id: str,
        workbook: Any = None,
    ) -> list[Document]:
        """Process all sheets as a merged document, maintaining order of
        text, table, and image content.

        Args:
            excel_file (`Any`):
                The pandas ExcelFile object.
            doc_id (`str`):
                The document ID.
            workbook (`Any`, optional):
                The openpyxl workbook if available.

        Returns:
            `list[Document]`:
                A list of Document objects from all sheets merged together,
                maintaining content order.
        """
        # Get all blocks from all sheets in order
        all_blocks = []
        for sheet_name in excel_file.sheet_names:
            sheet_blocks = self._get_sheet_blocks(
                excel_file,
                sheet_name,
                workbook,
            )
            all_blocks.extend(sheet_blocks)

        # Convert blocks to documents
        return await self._blocks_to_documents(all_blocks, doc_id)

    async def _process_sheets_separately(
        self,
        excel_file: Any,
        doc_id: str,
        workbook: Any = None,
    ) -> list[Document]:
        """Process each sheet as separate documents.

        Args:
            excel_file (`Any`):
                The pandas ExcelFile object.
            doc_id (`str`):
                The document ID.
            workbook (`Any`, optional):
                The openpyxl workbook if available.

        Returns:
            `list[Document]`:
                A list of Document objects with each sheet processed
                separately.
        """
        all_docs = []
        for sheet_name in excel_file.sheet_names:
            sheet_blocks = self._get_sheet_blocks(
                excel_file,
                sheet_name,
                workbook,
            )
            # NOTE: chunk ids restart at 0 for each sheet while sharing the
            # same doc_id, matching the "separate document per sheet" intent.
            sheet_docs = await self._blocks_to_documents(sheet_blocks, doc_id)
            all_docs.extend(sheet_docs)

        return all_docs

    def _get_sheet_blocks(
        self,
        excel_file: Any,
        sheet_name: str,
        workbook: Any = None,
    ) -> list[TextBlock | ImageBlock]:
        """Extract all data blocks from a sheet in order (text, table, image).

        Args:
            excel_file (`Any`):
                The pandas ExcelFile object.
            sheet_name (`str`):
                The name of the sheet.
            workbook (`Any`, optional):
                The openpyxl workbook if available.

        Returns:
            `list[TextBlock | ImageBlock]`:
                A list of data blocks extracted from the sheet, maintaining
                the order they appear in the sheet based on row positions.
        """
        blocks: list[TextBlock | ImageBlock] = []
        positioned_blocks: list[tuple[int, TextBlock | ImageBlock, str]] = []

        # Add sheet header
        sheet_header = (
            f"Sheet: {sheet_name}" if self.include_sheet_names else None
        )

        try:
            # Parse with header=None so the first Excel row is kept as data
            # instead of being consumed as pandas column names. This keeps
            # the header row in the extracted table and makes cell
            # coordinates (e.g. A1) line up with the actual Excel rows, as
            # shown in the class docstring examples.
            df = excel_file.parse(sheet_name=sheet_name, header=None)

            if df.empty:
                return blocks

            # Extract images with their row positions if enabled
            images_with_positions: list[tuple[int, ImageBlock]] = []
            if self.include_image and workbook is not None:
                try:
                    worksheet = workbook[sheet_name]
                    images_with_positions = _extract_images_from_worksheet(
                        worksheet,
                    )
                except Exception as e:
                    logger.warning(
                        "Failed to extract images from sheet '%s': %s",
                        sheet_name,
                        e,
                    )

            # Extract table data
            table_data = _extract_table_data(df)

            if self.table_format == "markdown":
                table_text = self._table_to_markdown(table_data, sheet_header)
            else:
                table_text = self._table_to_json(table_data, sheet_header)

            # The table starts at the top of the sheet: 0-based row 0 in the
            # same indexing used by openpyxl image anchors, so it sorts
            # before any image anchored further down.
            table_start_row = 0

            # Create table block
            table_block = TextBlock(
                type="text",
                text=table_text,
            )

            # Add table block with its position for sorting
            positioned_blocks.append((table_start_row, table_block, "table"))

            # Add image blocks with their positions
            for row_index, image_block in images_with_positions:
                positioned_blocks.append((row_index, image_block, "image"))

            # Sort blocks by row position
            positioned_blocks.sort(key=lambda x: x[0])

            # Extract blocks in sorted order and merge consecutive blocks
            # if needed
            last_type = None
            for _, block, block_type in positioned_blocks:
                if block_type == "table":
                    # Handle table block merging based on separate_table
                    # Logic matches WordReader: merge if not separate_table
                    # and last_type is "text" or "table"
                    if not self.separate_table and last_type in [
                        "text",
                        "table",
                    ]:
                        blocks[-1]["text"] += "\n" + block["text"]
                    else:
                        blocks.append(block)
                    last_type = "table"

                elif block_type == "image":
                    blocks.append(block)
                    last_type = "image"

        except Exception as e:
            # Best-effort: a broken sheet is skipped rather than failing the
            # whole file.
            logger.warning("Failed to process sheet '%s': %s", sheet_name, e)

        return blocks

    async def _blocks_to_documents(
        self,
        blocks: list[TextBlock | ImageBlock],
        doc_id: str,
    ) -> list[Document]:
        """Convert data blocks to Document objects.

        Args:
            blocks (`list[TextBlock | ImageBlock]`):
                A list of data blocks.
            doc_id (`str`):
                The document ID.

        Returns:
            `list[Document]`:
                A list of Document objects.
        """
        documents = []

        for block in blocks:
            if block["type"] == "text":
                # Process text blocks through TextReader for chunking
                text_docs = await self._text_reader(block["text"])
                for doc in text_docs:
                    # Update doc_id but keep other metadata
                    doc.metadata.doc_id = doc_id
                    doc.id = doc_id
                    documents.append(doc)

            elif block["type"] == "image":
                # Images are independent documents
                documents.append(
                    Document(
                        metadata=DocMetadata(
                            content=block,
                            doc_id=doc_id,
                            chunk_id=0,  # Will be set later
                            total_chunks=1,
                        ),
                    ),
                )

        # Set chunk ids and total chunks
        total_chunks = len(documents)
        for idx, doc in enumerate(documents):
            doc.metadata.chunk_id = idx
            doc.metadata.total_chunks = total_chunks

        return documents

    def _table_to_markdown(
        self,
        table_data: list[list[str]],
        sheet_header: str | None = None,
    ) -> str:
        """Convert table data to Markdown format.

        Args:
            table_data (`list[list[str]]`):
                Table data represented as a 2D list.
            sheet_header (`str | None`, optional):
                Optional sheet header to prepend.

        Returns:
            `str`:
                Table in Markdown format.
        """
        if not table_data:
            return sheet_header or ""

        md_table = ""

        # Add sheet header if provided
        if sheet_header:
            md_table += sheet_header + "\n"

        # If no rows, return header only
        if not table_data or not table_data[0]:
            return md_table.strip() or ""

        num_cols = len(table_data[0])

        # Escape pipe characters in cells to avoid breaking Markdown table
        # structure
        def escape_pipes(cell_text: str) -> str:
            """Escape pipe characters in cell content."""
            return cell_text.replace("|", "\\|")

        def format_cell(cell: str, row_idx: int, col_idx: int) -> str:
            """Format cell content with optional coordinates."""
            escaped = escape_pipes(cell)
            if self.include_cell_coordinates:
                coord = f"{_get_excel_column_name(col_idx)}{row_idx + 1}"
                return f"[{coord}] {escaped}"
            return escaped

        # Header row (first row)
        escaped_header = [
            format_cell(cell, 0, col_idx)
            for col_idx, cell in enumerate(table_data[0])
        ]
        header_row = "| " + " | ".join(escaped_header) + " |\n"
        md_table += header_row

        # Separator row
        separator_row = "| " + " | ".join(["---"] * num_cols) + " |\n"
        md_table += separator_row

        # Data rows
        for row_idx, row in enumerate(table_data[1:], start=1):
            # Pad a copy so short rows match the header width without
            # mutating the caller's table_data in place.
            padded_row = list(row) + [""] * max(0, num_cols - len(row))

            # Format each cell with optional coordinates
            formatted_row = [
                format_cell(cell, row_idx, col_idx)
                for col_idx, cell in enumerate(padded_row[:num_cols])
            ]
            data_row = "| " + " | ".join(formatted_row) + " |\n"
            md_table += data_row

        return md_table

    def _table_to_json(
        self,
        table_data: list[list[str]],
        sheet_header: str | None = None,
    ) -> str:
        """Convert table data to JSON string.

        Args:
            table_data (`list[list[str]]`):
                Table data represented as a 2D list.
            sheet_header (`str | None`, optional):
                Optional sheet header to prepend.

        Returns:
            `str`:
                Table in JSON string format.
        """
        json_strs = []

        # Add sheet header if provided
        if sheet_header:
            json_strs.append(sheet_header)

        # Add system info marker
        json_strs.append(
            "<system-info>A table loaded as a JSON array:</system-info>",
        )

        for row_idx, row in enumerate(table_data):
            if self.include_cell_coordinates:
                # Include cell coordinates in the format {"A1": "value", ...}
                row_dict = {
                    f"{_get_excel_column_name(col_idx)}{row_idx + 1}": cell
                    for col_idx, cell in enumerate(row)
                }
                json_strs.append(json.dumps(row_dict, ensure_ascii=False))
            else:
                json_strs.append(json.dumps(row, ensure_ascii=False))

        return "\n".join(json_strs)

    def get_doc_id(self, excel_path: str) -> str:
        """Generate unique document ID from file path.

        Args:
            excel_path (`str`):
                The path to the Excel file.

        Returns:
            `str`:
                The document ID (SHA256 hash of the file path).
        """
        return hashlib.sha256(excel_path.encode("utf-8")).hexdigest()