from typing import List, Dict, Any

import tiktoken

from . import defines


def get_encoding(model=defines.model):
    """Get the tokenizer for counting tokens."""
    try:
        # cl100k_base is the default encoding used by many embedding models
        return tiktoken.get_encoding("cl100k_base")
    except Exception:
        # Fall back to the encoding registered for the configured model
        return tiktoken.encoding_for_model(model)


def count_tokens(text: str) -> int:
    """Count the number of tokens in a text string."""
    encoding = get_encoding()
    return len(encoding.encode(text))


def chunk_text(text: str, max_tokens: int = 512, overlap: int = 50) -> List[str]:
    """
    Split a text into chunks based on token count, with overlap between chunks.

    Args:
        text: The text to split into chunks
        max_tokens: Maximum number of tokens per chunk
        overlap: Number of tokens to overlap between consecutive chunks

    Returns:
        List of text chunks
    """
    if not text or max_tokens <= 0:
        return []

    # Clamp the overlap so the window always advances: an overlap >= max_tokens
    # would otherwise make the loop below spin forever on long inputs.
    overlap = min(max(overlap, 0), max_tokens - 1)

    encoding = get_encoding()
    tokens = encoding.encode(text)
    chunks = []

    i = 0
    while i < len(tokens):
        # Decode the current window of tokens back into text
        chunk_end = min(i + max_tokens, len(tokens))
        chunks.append(encoding.decode(tokens[i:chunk_end]))

        # Stop once the final token has been emitted
        if chunk_end == len(tokens):
            break
        # Advance the window, keeping `overlap` tokens of shared context
        i += max_tokens - overlap

    return chunks
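

# Worked example of the stride arithmetic (illustrative numbers, not from the
# original source): with max_tokens=512 and overlap=50 the window advances
# 512 - 50 = 462 tokens per iteration, so a 1,500-token text produces chunks
# covering tokens [0:512], [462:974], [924:1436], and [1386:1500].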


def chunk_document(document: Dict[str, Any],
                   text_key: str = "text",
                   max_tokens: int = 512,
                   overlap: int = 50) -> List[Dict[str, Any]]:
    """
    Chunk a document dictionary into multiple chunks.

    Args:
        document: Document dictionary with metadata and text
        text_key: The key in the document that contains the text to chunk
        max_tokens: Maximum number of tokens per chunk
        overlap: Number of tokens to overlap between chunks

    Returns:
        List of document dictionaries, each with chunked text and preserved metadata
    """
    if text_key not in document:
        raise KeyError(f"{text_key} not in document")

    # Prefix the text with the title (when present) so every chunk keeps that context
    if "title" in document:
        text = f"{document['title']}: {document[text_key]}"
    else:
        text = document[text_key]
    chunks = chunk_text(text, max_tokens, overlap)

    # Create one document per chunk, preserving the original metadata
    chunked_docs = []
    for i, chunk in enumerate(chunks):
        # Shallow-copy the original fields, then swap in the chunk text
        doc_chunk = document.copy()
        doc_chunk[text_key] = chunk
        # Record this chunk's position and the total number of chunks
        doc_chunk["chunk_id"] = i
        doc_chunk["chunk_total"] = len(chunks)
        chunked_docs.append(doc_chunk)

    return chunked_docs
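

# Minimal usage sketch (not part of the original module; the sample document
# below is illustrative). Assumes the sibling `defines` module provides
# `defines.model` and that tiktoken is installed; run this file as a module
# (python -m ...) so the relative import resolves.
if __name__ == "__main__":
    sample = {"title": "Example", "text": "word " * 2000}
    pieces = chunk_document(sample, max_tokens=128, overlap=16)
    print(f"{count_tokens(sample['text'])} tokens -> {len(pieces)} chunks")
    for doc in pieces[:2]:
        print(doc["chunk_id"], doc["chunk_total"], count_tokens(doc["text"]))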