# Source listing: 137 lines, 5.0 KiB, Python
import hashlib
|
|
import re
|
|
from typing import List, Dict, Optional
|
|
from pathlib import Path
|
|
|
|
class StructuredChunker:
    """Split plain text, source code, or Markdown into chunk dictionaries.

    Every chunk is a dict with the keys ``chunk_id``, ``index``, ``text``,
    ``size``, ``offset_start``, ``offset_end``, ``section`` and ``metadata``.
    For chunks produced by ``chunk_text``, ``chunk_code`` and
    ``chunk_markdown`` the offsets are character positions in the input, so
    ``text[offset_start:offset_end] == chunk['text']``.
    """

    def __init__(self, chunk_size: int = 1000, overlap: int = 200):
        """Store the chunking parameters.

        Args:
            chunk_size: Target chunk length in characters. It is a soft
                limit: see ``chunk_text`` for when it can be exceeded.
            overlap: Number of trailing characters of a chunk that are
                repeated at the start of the next one. Values <= 0 disable
                the overlap.
        """
        self.chunk_size = chunk_size
        self.overlap = overlap

    def chunk_text(self, text: str, metadata: Optional[Dict] = None) -> List[Dict]:
        """Chunk ``text`` on blank-line paragraph boundaries.

        Paragraphs (separated by ``'\\n\\n'``) are accumulated until adding
        the next one would push the running size past ``chunk_size``; the
        accumulated text is then emitted and its tail (``overlap``
        characters) seeds the next chunk.

        The running size counts paragraph characters only (not the
        separators), and a single paragraph longer than ``chunk_size`` is
        never split, so emitted chunks can be longer than ``chunk_size``.

        Args:
            text: Text to split. ``None`` is treated as an empty string.
            metadata: Optional mapping copied into every chunk.

        Returns:
            A list with at least one chunk dict, in document order.
        """
        text = text or ''
        if len(text) < self.chunk_size:
            return [self._create_chunk(text, 0, metadata, offset_start=0)]

        separator = '\n\n'
        chunks: List[Dict] = []
        parts: List[str] = []   # pieces of the chunk currently being built
        size = 0                # characters accumulated in ``parts``
        chunk_start = 0         # offset in ``text`` where ``parts`` begins
        cursor = 0              # offset in ``text`` of the current paragraph

        for para in text.split(separator):
            para_size = len(para)

            if parts and size + para_size > self.chunk_size:
                body = separator.join(parts)
                chunks.append(self._create_chunk(
                    body, len(chunks), metadata, offset_start=chunk_start
                ))

                overlap_text = self._get_overlap_text(parts)
                if overlap_text:
                    # The overlap is the tail of ``body``, so the next chunk
                    # starts that many characters before the end of ``body``
                    # and stays a contiguous slice of ``text``.
                    chunk_start += len(body) - len(overlap_text)
                    parts = [overlap_text]
                    size = len(overlap_text)
                else:
                    chunk_start = cursor
                    parts = []
                    size = 0

            parts.append(para)
            size += para_size
            cursor += para_size + len(separator)

        if parts:
            chunks.append(self._create_chunk(
                separator.join(parts), len(chunks), metadata,
                offset_start=chunk_start
            ))

        return chunks

    def chunk_code(self, text: str, metadata: Optional[Dict] = None) -> List[Dict]:
        """Chunk source code into one chunk per detected function.

        Functions are located with the regular expressions in
        ``_extract_functions``. Text that lies outside every matched
        function is not part of any returned chunk. When no function is
        found the text is chunked with ``chunk_text`` and tagged
        ``type='code'``.

        Args:
            text: Source code. ``None`` is treated as an empty string.
            metadata: Optional mapping merged into every chunk's metadata.

        Returns:
            A list of chunk dicts; function chunks carry ``type='function'``
            and ``name`` in their metadata and the name in ``section``.
        """
        text = text or ''
        functions = self._extract_functions(text)
        if not functions:
            return self.chunk_text(text, {**(metadata or {}), 'type': 'code'})

        base = metadata or {}
        return [
            self._create_chunk(
                func['code'],
                idx,
                {**base, 'type': 'function', 'name': func['name']},
                section=func['name'],
                offset_start=func.get('start'),
            )
            for idx, func in enumerate(functions)
        ]

    def chunk_markdown(self, text: str, metadata: Optional[Dict] = None) -> List[Dict]:
        """Chunk Markdown into one chunk per heading-delimited section.

        When ``_extract_md_sections`` finds fewer than two sections the text
        is chunked with ``chunk_text`` and tagged ``type='markdown'``.
        Sections are emitted whole, whatever their length.

        Args:
            text: Markdown text. ``None`` is treated as an empty string.
            metadata: Optional mapping merged into every chunk's metadata.

        Returns:
            A list of chunk dicts; section chunks carry ``type='section'``
            and ``heading`` in their metadata and the heading in ``section``.
        """
        text = text or ''
        sections = self._extract_md_sections(text)
        if not sections:
            return self.chunk_text(text, {**(metadata or {}), 'type': 'markdown'})

        base = metadata or {}
        return [
            self._create_chunk(
                section['content'],
                idx,
                {**base, 'type': 'section', 'heading': section['heading']},
                section=section['heading'],
                offset_start=section.get('start'),
            )
            for idx, section in enumerate(sections)
        ]

    def _create_chunk(self, text: str, index: int, metadata: Optional[Dict], para_idx: Optional[int] = None, section: Optional[str] = None, offset_start: Optional[int] = None) -> Dict:
        """Build the dictionary describing one chunk.

        Args:
            text: Chunk content.
            index: Position of the chunk in the returned list.
            metadata: Optional mapping; a shallow copy is stored so chunks
                do not share one mutable dict with each other or the caller.
            para_idx: Legacy fallback. Used as the start offset only when
                ``offset_start`` is not given.
            section: Optional section label (function name or heading).
            offset_start: Character offset of ``text`` in the source
                document. When omitted, the offset falls back to
                ``para_idx`` or, failing that, to the estimate
                ``index * (chunk_size - overlap)``.

        Returns:
            The chunk dict. ``chunk_id`` is the first 16 hex digits of the
            SHA-256 of the text followed by the index.
        """
        if offset_start is None:
            if para_idx is not None:
                offset_start = para_idx
            else:
                offset_start = index * (self.chunk_size - self.overlap)

        chunk_id = hashlib.sha256(f'{text}{index}'.encode()).hexdigest()[:16]

        return {
            'chunk_id': chunk_id,
            'index': index,
            'text': text,
            'size': len(text),
            'offset_start': offset_start,
            'offset_end': offset_start + len(text),
            'section': section,
            'metadata': dict(metadata) if metadata else {},
        }

    def _get_overlap_text(self, chunks: List[str]) -> str:
        """Return the trailing ``overlap`` characters of the joined pieces.

        Args:
            chunks: Pieces of the chunk that was just emitted.

        Returns:
            The tail of the pieces joined with ``'\\n\\n'`` (all of it when it
            is shorter than ``overlap``), or ``''`` when ``overlap <= 0``.
        """
        # Guard first: ``s[-0:]`` is the whole string, not an empty one.
        if self.overlap <= 0:
            return ''
        return '\n\n'.join(chunks)[-self.overlap:]

    def _extract_functions(self, text: str) -> List[Dict]:
        """Find function definitions with language-specific regexes.

        Three patterns are tried: Python ``def`` (optionally with a return
        annotation), ``function name(...) {`` and
        ``public [static] type name(...) {``. A Python function runs until
        the next line that starts with ``def`` or ``class``, or the end of
        the text; the brace forms run until the first line that starts
        with ``}``.

        Args:
            text: Source code to scan.

        Returns:
            Dicts with ``name``, ``code`` and ``start`` (character offset of
            ``code`` in ``text``), ordered by ``start``.
        """
        patterns = [
            # The newline in the lookahead applies to both ``def`` and
            # ``class`` so the word "class" inside a body cannot end it.
            r'(\bdef\s+(\w+)\s*\([^)]*\)\s*(?:->\s*[^:\n]+)?:.*?)(?=\n(?:def|class)\s+\w+|\Z)',
            r'(function\s+(\w+)\s*\([^)]*\)\s*\{.*?\n\})',
            r'(public\s+(?:static\s+)?[\w<>]+\s+(\w+)\s*\([^)]*\)\s*\{.*?\n\})',
        ]

        functions = [
            {
                'name': match.group(2),
                'code': match.group(1),
                'start': match.start(1),
            }
            for pattern in patterns
            for match in re.finditer(pattern, text, re.DOTALL | re.MULTILINE)
        ]
        # Patterns are scanned one after another; restore document order.
        functions.sort(key=lambda func: func['start'])
        return functions

    def _extract_md_sections(self, text: str) -> List[Dict]:
        """Split Markdown into sections at lines that start with ``#``.

        Lines before the first heading belong to a section named
        ``'Introduction'``. Lines inside a fenced code block (opened by a
        line starting with three backticks or three tildes) are always
        content, even when they start with ``#``. A heading that has no
        content lines before the next heading produces no section.

        Args:
            text: Markdown text.

        Returns:
            Dicts with ``heading``, ``content`` and ``start`` (character
            offset of ``content`` in ``text``), or an empty list when fewer
            than two sections were found.
        """
        sections: List[Dict] = []
        heading = 'Introduction'
        body: List[str] = []
        body_start = 0
        fence: Optional[str] = None   # marker of the open code fence, if any
        pos = 0                       # offset of the current line in ``text``

        def flush() -> None:
            if body:
                sections.append({
                    'heading': heading,
                    'content': '\n'.join(body),
                    'start': body_start,
                })

        for line in text.split('\n'):
            stripped = line.lstrip()
            if fence is None:
                if stripped.startswith(('```', '~~~')):
                    fence = stripped[:3]
                elif line.startswith('#'):
                    flush()
                    heading = re.sub(r'^#+\s*', '', line)
                    body = []
                    pos += len(line) + 1
                    continue
            elif stripped.startswith(fence):
                fence = None

            if not body:
                body_start = pos
            body.append(line)
            pos += len(line) + 1

        flush()

        return sections if len(sections) > 1 else []
|