diff --git a/app/batch_process.py b/app/batch_process.py
new file mode 100755
index 0000000..222746f
--- /dev/null
+++ b/app/batch_process.py
@@ -0,0 +1,152 @@
+#!/usr/bin/env python3
+import sys
+import argparse
+from pathlib import Path
+import psycopg2
+import logging
+from typing import Dict, List
+
+sys.path.insert(0, str(Path(__file__).parent))
+
+from extraction.incremental import IncrementalExtractor
+from parsers.text_parser import TextParser
+from parsers.code_parser import CodeParser
+from parsers.pdf_parser import PDFParser
+from parsers.image_parser import ImageParser
+from parsers.audio_parser import AudioParser
+from parsers.document_parser import DocumentParser
+
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+logger = logging.getLogger(__name__)
+
+DISK_MOUNT_MAP = {
+    'SMT': '/media/mike/SMT',
+    'DISK1': '/media/mike/DISK1',
+    'LLM': '/media/mike/LLM',
+    'WINDOWS': '/media/mike/WINDOWS',
+    'Apps': '/media/mike/Apps',
+    'Space': '/media/mike/Space',
+    'LVM': '/media/mike/LVM'
+}
+
+DB_CONFIG = {
+    'host': '192.168.1.159',
+    'database': 'disk_reorganizer_db',
+    'user': 'disk_reorg_user',
+    'password': 'heel-goed-wachtwoord'
+}
+
+def get_files_to_parse(parser_type: str, limit: int, max_size: int = 10 * 1024 * 1024) -> List[Dict]:
+    conn = psycopg2.connect(**DB_CONFIG)
+    cursor = conn.cursor()
+
+    ext_map = {
+        'text': "'.txt', '.md', '.log', '.json', '.yaml', '.yml', '.xml', '.csv'",
+        'code': "'.py', '.js', '.java', '.go', '.rs', '.ts', '.tsx', '.jsx', '.cpp', '.h', '.c', '.php'",
+        'pdf': "'.pdf'",
+        'image': "'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp'",
+        'audio': "'.mp3', '.wav', '.flac', '.m4a', '.ogg'",
+        'document': "'.docx', '.doc', '.odt', '.rtf'"
+    }
+
+    extensions = ext_map.get(parser_type, '')
+    if not extensions:
+        logger.error(f'Unknown parser type: {parser_type}')
+        return []
+
+    query = f'''
+        SELECT path, disk_label, size, checksum
+        FROM files
+        WHERE extracted_text IS NULL
+          AND size < {max_size}
+          AND size > 0
+          AND (
+            {' OR '.join([f"path LIKE '%{ext}'" for ext in extensions.replace("'", "").split(', ')])}
+          )
+        LIMIT {limit}
+    '''
+
+    cursor.execute(query)
+    files = []
+
+    for row in cursor.fetchall():
+        path, disk, size, checksum = row
+        mount = DISK_MOUNT_MAP.get(disk, '')
+        if not mount:
+            continue
+
+        full_path = Path(mount) / path
+        if full_path.exists():
+            files.append({
+                'path': path,
+                'full_path': full_path,
+                'disk_label': disk,
+                'size': size,
+                'checksum': checksum
+            })
+
+    cursor.close()
+    conn.close()
+
+    logger.info(f'Found {len(files)} {parser_type} files to parse')
+    return files
+
+def batch_parse(parser_type: str, limit: int, max_size: int):
+    parsers = {
+        'text': ('text_parser', TextParser()),
+        'code': ('code_parser', CodeParser()),
+        'pdf': ('pdf_parser', PDFParser()),
+        'image': ('image_parser', ImageParser()),
+        'audio': ('audio_parser', AudioParser(whisper_model='base')),
+        'document': ('document_parser', DocumentParser())
+    }
+
+    if parser_type not in parsers:
+        logger.error(f'Unknown parser type: {parser_type}')
+        return
+
+    parser_name, parser = parsers[parser_type]
+    files = get_files_to_parse(parser_type, limit, max_size)
+
+    if not files:
+        logger.info(f'No files to parse for {parser_type}')
+        return
+
+    extractor = IncrementalExtractor(DB_CONFIG)
+
+    logger.info(f'Starting batch parse of {len(files)} files with {parser_name}')
+
+    def parse_func(path):
+        for f in files:
+            if str(f['full_path']) == str(path) or f['path'] == str(path):
+                return parser.parse(f['full_path'])
+        return parser.parse(path)
+
+    stats = extractor.batch_extract(
+        files,
+        parse_func,
+        parser_name=parser_name,
+        batch_size=100,
+        skip_existing=True
+    )
+
+    logger.info(f'\n=== BATCH PARSE COMPLETE ===')
+    logger.info(f'Processed: {stats["processed"]}')
+    logger.info(f'Extracted: {stats["extracted"]}')
+    logger.info(f'Skipped: {stats["skipped"]}')
+    logger.info(f'Errors: {stats["errors"]}')
+    if stats['extracted'] > 0:
+        logger.info(f'Avg time: {stats["total_time_ms"]/stats["extracted"]:.1f}ms per file')
+
+def main():
+    parser = argparse.ArgumentParser(description='Batch process files with incremental extraction')
+    parser.add_argument('parser_type', choices=['text', 'code', 'pdf', 'image', 'audio', 'document'],
+                        help='Type of parser to use')
+    parser.add_argument('--limit', type=int, default=1000, help='Maximum files to process')
+    parser.add_argument('--max-size', type=int, default=10*1024*1024, help='Maximum file size in bytes')
+
+    args = parser.parse_args()
+    batch_parse(args.parser_type, args.limit, args.max_size)
+
+if __name__ == '__main__':
+    main()
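
Note on get_files_to_parse(): it interpolates max_size, limit, and the LIKE patterns directly into the SQL string. The values come from argparse-validated integers and an internal extension map, so this works, but a parameterized variant would be safer if the inputs ever come from elsewhere. The sketch below is illustrative only and not part of the patch; build_query is a hypothetical helper, assuming the same files table and a psycopg2 cursor.

# Sketch only: a parameterized version of the query built in get_files_to_parse().
def build_query(extensions, limit, max_size):
    # One LIKE clause per extension; every value is passed as a bind parameter.
    like_clauses = ' OR '.join(['path LIKE %s'] * len(extensions))
    query = f'''
        SELECT path, disk_label, size, checksum
        FROM files
        WHERE extracted_text IS NULL
          AND size < %s
          AND size > 0
          AND ({like_clauses})
        LIMIT %s
    '''
    params = [max_size] + [f'%{ext}' for ext in extensions] + [limit]
    return query, params

# Usage with a psycopg2 cursor:
#   query, params = build_query(['.txt', '.md'], limit=1000, max_size=10 * 1024 * 1024)
#   cursor.execute(query, params)
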
diff --git a/app/chunking/structured_chunker.py b/app/chunking/structured_chunker.py
new file mode 100644
index 0000000..c21c086
--- /dev/null
+++ b/app/chunking/structured_chunker.py
@@ -0,0 +1,136 @@
+import hashlib
+import re
+from typing import List, Dict, Optional
+from pathlib import Path
+
+class StructuredChunker:
+    def __init__(self, chunk_size: int = 1000, overlap: int = 200):
+        self.chunk_size = chunk_size
+        self.overlap = overlap
+
+    def chunk_text(self, text: str, metadata: Optional[Dict] = None) -> List[Dict]:
+        if not text or len(text) < self.chunk_size:
+            return [self._create_chunk(text, 0, metadata)]
+
+        paragraphs = text.split('\n\n')
+        chunks = []
+        current_chunk = []
+        current_size = 0
+        chunk_idx = 0
+
+        for para_idx, para in enumerate(paragraphs):
+            para_size = len(para)
+
+            if current_size + para_size > self.chunk_size and current_chunk:
+                chunk_text = '\n\n'.join(current_chunk)
+                chunks.append(self._create_chunk(chunk_text, chunk_idx, metadata, para_idx=para_idx))
+                chunk_idx += 1
+
+                overlap_text = self._get_overlap_text(current_chunk)
+                current_chunk = [overlap_text] if overlap_text else []
+                current_size = len(overlap_text) if overlap_text else 0
+
+            current_chunk.append(para)
+            current_size += para_size
+
+        if current_chunk:
+            chunk_text = '\n\n'.join(current_chunk)
+            chunks.append(self._create_chunk(chunk_text, chunk_idx, metadata))
+
+        return chunks
+
+    def chunk_code(self, text: str, metadata: Optional[Dict] = None) -> List[Dict]:
+        chunks = []
+        functions = self._extract_functions(text)
+
+        if functions:
+            for idx, func in enumerate(functions):
+                chunks.append(self._create_chunk(
+                    func['code'],
+                    idx,
+                    {**(metadata or {}), 'type': 'function', 'name': func['name']},
+                    section=func['name']
+                ))
+        else:
+            return self.chunk_text(text, {**(metadata or {}), 'type': 'code'})
+
+        return chunks
+
+    def chunk_markdown(self, text: str, metadata: Optional[Dict] = None) -> List[Dict]:
+        sections = self._extract_md_sections(text)
+
+        if sections:
+            chunks = []
+            for idx, section in enumerate(sections):
+                chunks.append(self._create_chunk(
+                    section['content'],
+                    idx,
+                    {**(metadata or {}), 'type': 'section', 'heading': section['heading']},
+                    section=section['heading']
+                ))
+            return chunks
+        else:
+            return self.chunk_text(text, {**(metadata or {}), 'type': 'markdown'})
+
+    def _create_chunk(self, text: str, index: int, metadata: Optional[Dict], para_idx: Optional[int] = None, section: Optional[str] = None) -> Dict:
+        chunk_id = hashlib.sha256(f'{text}{index}'.encode()).hexdigest()[:16]
+
+        return {
+            'chunk_id': chunk_id,
+            'index': index,
+            'text': text,
+            'size': len(text),
+            'offset_start': para_idx if para_idx is not None else index * (self.chunk_size - self.overlap),
+            'offset_end': (para_idx if para_idx is not None else index * (self.chunk_size - self.overlap)) + len(text),
+            'section': section,
+            'metadata': metadata or {}
+        }
+
+    def _get_overlap_text(self, chunks: List[str]) -> str:
+        combined = '\n\n'.join(chunks)
+        if len(combined) <= self.overlap:
+            return combined
+        return combined[-self.overlap:]
+
+    def _extract_functions(self, text: str) -> List[Dict]:
+        patterns = [
+            r'(def\s+(\w+)\s*\([^)]*\):.*?)(?=\ndef\s+\w+|class\s+\w+|\Z)',
+            r'(function\s+(\w+)\s*\([^)]*\)\s*\{.*?\n\})',
+            r'(public\s+(?:static\s+)?[\w<>]+\s+(\w+)\s*\([^)]*\)\s*\{.*?\n\})'
+        ]
+
+        functions = []
+        for pattern in patterns:
+            matches = re.finditer(pattern, text, re.DOTALL | re.MULTILINE)
+            for match in matches:
+                functions.append({
+                    'name': match.group(2),
+                    'code': match.group(1)
+                })
+
+        return functions
+
+    def _extract_md_sections(self, text: str) -> List[Dict]:
+        lines = text.split('\n')
+        sections = []
+        current_section = {'heading': 'Introduction', 'content': []}
+
+        for line in lines:
+            if line.startswith('#'):
+                if current_section['content']:
+                    sections.append({
+                        'heading': current_section['heading'],
+                        'content': '\n'.join(current_section['content'])
+                    })
+                heading = re.sub(r'^#+\s*', '', line)
+                current_section = {'heading': heading, 'content': []}
+            else:
+                current_section['content'].append(line)
+
+        if current_section['content']:
+            sections.append({
+                'heading': current_section['heading'],
+                'content': '\n'.join(current_section['content'])
+            })
+
+        return sections if len(sections) > 1 else []
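
For reference, a usage sketch of StructuredChunker as defined above. It assumes app/ is importable (as batch_process.py arranges via sys.path); the sample text and metadata are illustrative only.

# Usage sketch for StructuredChunker.
from chunking.structured_chunker import StructuredChunker

chunker = StructuredChunker(chunk_size=1000, overlap=200)
text = '\n\n'.join(f'Paragraph {i}: ' + 'word ' * 80 for i in range(10))

chunks = chunker.chunk_text(text, metadata={'source': 'example.txt'})
for chunk in chunks:
    # Each chunk carries a stable 16-hex-char chunk_id, its index, text, size,
    # start/end offsets, an optional section name, and the metadata passed in.
    print(chunk['chunk_id'], chunk['index'], chunk['size'], chunk['section'])
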
diff --git a/app/enrichment/llm_client.py b/app/enrichment/llm_client.py
index 50b3279..93d91f1 100644
--- a/app/enrichment/llm_client.py
+++ b/app/enrichment/llm_client.py
@@ -13,7 +13,7 @@ class LLMClient:
         self.lm_studio_endpoints = {
             'plato': {'url': 'http://192.168.1.74:1234', 'model': 'openai/gpt-oss-20b'},
-            'postgres': {'url': 'http://192.168.1.159:1234', 'model': 'mistralai/devstral-small-2507'},
+            'postgres': {'url': 'http://192.168.1.159:1234', 'model': 'mistralai/devstral-small-2-2512'},
             'local': {'url': 'http://localhost:11434', 'model': 'llama3'}
         }
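
The request path of LLMClient is not part of this hunk. For context, a minimal call against one of the configured endpoints could look like the sketch below, assuming LM Studio's OpenAI-compatible /v1/chat/completions route and the requests library; the prompt and endpoint choice are illustrative.

# Sketch only: not LLMClient's actual request code.
import requests

endpoint = {'url': 'http://192.168.1.159:1234', 'model': 'mistralai/devstral-small-2-2512'}

resp = requests.post(
    f"{endpoint['url']}/v1/chat/completions",
    json={
        'model': endpoint['model'],
        'messages': [{'role': 'user', 'content': 'Summarize this file in one sentence.'}],
        'temperature': 0.2,
    },
    timeout=60,
)
resp.raise_for_status()
print(resp.json()['choices'][0]['message']['content'])
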
diff --git a/app/main.py b/app/main.py
index a47eeb9..b24f3a2 100644
--- a/app/main.py
+++ b/app/main.py
@@ -894,3 +894,126 @@ class DiskReorganizer:
         cursor.close()
         conn.close()
+
+    def inventory_file_types(self, disk: Optional[str]=None, limit: int=50):
+        from analysis.inventory import FileTypeInventory
+        inventory = FileTypeInventory(self.db_config)
+        results = inventory.analyze(disk=disk, limit=limit)
+
+        print(f'\n=== FILE TYPE INVENTORY ===\n')
+        print(f'{"Extension":<15} {"Count":>10} {"Total Size":>12} {"Parsed":>8} {"Status":>8} {"Parser":>15}')
+        print('=' * 95)
+
+        for ext_info in results['extensions']:
+            ext = ext_info['extension']
+            count = ext_info['count']
+            size = ext_info['total_size']
+            parsed = ext_info['parsed']
+            ptype = ext_info['parser_type']
+            status = '✓' if ext_info['is_parseable'] else '✗'
+            print(f'{ext:<15} {count:>10,} {inventory.format_size(size):>12} {parsed:>8,} {status:>8} {ptype:>15}')
+
+        print('=' * 95)
+        summary = results['summary']
+        print(f'Total files: {summary["total_files"]:,}')
+        print(f'Parseable: {summary["parseable_files"]:,} ({100*summary["parseable_files"]/summary["total_files"]:.1f}%)')
+        print(f'Parsed: {summary["parsed_files"]:,} ({summary["coverage"]:.1f}% coverage)')
+
+        print(f'\n=== PARSER STATUS ===\n')
+        for ptype, info in results['parser_status'].items():
+            status = '✓ Implemented' if info['implemented'] else '✗ Not yet'
+            print(f'{ptype:<15} {status:<20} {", ".join(info["extensions"][:10])}')
+
+        if results['unparsed_by_type']:
+            print(f'\n=== UNPARSED FILES BY TYPE ===\n')
+            for ptype, info in sorted(results['unparsed_by_type'].items(), key=lambda x: x[1]['count'], reverse=True):
+                print(f'{ptype:<15} {info["count"]:>10,} files unparsed')
+                exts = sorted(info["extensions"])[:10]
+                print(f'  Extensions: {", ".join(exts)}')
+
+    def review_migration(self, category: Optional[str]=None, show_build: bool=False):
+        from classification.classifier import FileClassifier
+        classifier = FileClassifier()
+        conn = self.get_connection()
+        cursor = conn.cursor()
+        try:
+            query = 'SELECT path, size, category FROM files WHERE 1=1'
+            params = []
+            if category:
+                query += ' AND category = %s'
+                params.append(category)
+            if not show_build:
+                query += " AND category NOT LIKE 'artifacts%'"
+            query += ' LIMIT 100'
+            cursor.execute(query, params)
+            results = cursor.fetchall()
+            print(f'\n=== MIGRATION REVIEW ({len(results)} files) ===\n')
+            for path, size, cat in results:
+                print(f'{path[:70]:<70} {cat:>20}')
+        finally:
+            cursor.close()
+            conn.close()
+
+    def format_size(self, size: int) -> str:
+        for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
+            if size < 1024:
+                return f'{size:.1f}{unit}'
+            size /= 1024
+        return f'{size:.1f}PB'
+
+def main():
+    parser = argparse.ArgumentParser(description='Disk Reorganizer with Content Understanding')
+    subparsers = parser.add_subparsers(dest='command', required=True)
+
+    inventory_parser = subparsers.add_parser('inventory', help='Analyze file types and parser coverage')
+    inventory_parser.add_argument('--disk', help='Analyze specific disk')
+    inventory_parser.add_argument('--limit', type=int, default=50, help='Limit results')
+
+    index_parser = subparsers.add_parser('index', help='Index files on a disk')
+    index_parser.add_argument('disk_root', help='Root path of disk')
+    index_parser.add_argument('disk_name', help='Logical name for disk')
+
+    parse_parser = subparsers.add_parser('parse', help='Parse files to extract text')
+    parse_parser.add_argument('--kind', help='Parse specific kind (text, code, pdf)')
+    parse_parser.add_argument('--limit', type=int, default=100, help='Limit parse batch')
+    parse_parser.add_argument('--update', action='store_true', help='Save extracted text to database')
+
+    enrich_parser = subparsers.add_parser('enrich', help='Enrich content with LLM analysis')
+    enrich_parser.add_argument('--limit', type=int, default=10, help='Limit enrichment batch')
+    enrich_parser.add_argument('--use-llm', action='store_true', help='Use LLM for summarization')
+    enrich_parser.add_argument('--network', action='store_true', help='Use network LM_STUDIO')
+
+    search_parser = subparsers.add_parser('search', help='Search indexed content')
+    search_parser.add_argument('query', help='Search query')
+    search_parser.add_argument('--type', choices=['text', 'enrichment', 'path'], default='enrichment')
+    search_parser.add_argument('--limit', type=int, default=20, help='Max results')
+
+    classify_parser = subparsers.add_parser('classify', help='Classify files')
+    classify_parser.add_argument('--disk', help='Classify specific disk')
+    classify_parser.add_argument('--update', action='store_true', help='Update database')
+    classify_parser.add_argument('--no-resume', action='store_true', help='Start from scratch')
+
+    folders_parser = subparsers.add_parser('analyze-folders', help='Analyze folder structure')
+    folders_parser.add_argument('--disk', help='Analyze specific disk')
+    folders_parser.add_argument('--min-files', type=int, default=3)
+
+    args = parser.parse_args()
+    tool = DiskReorganizer()
+
+    if args.command == 'inventory':
+        tool.inventory_file_types(disk=args.disk, limit=args.limit)
+    elif args.command == 'index':
+        tool.index_disk(args.disk_root, args.disk_name)
+    elif args.command == 'parse':
+        tool.parse_files(kind=args.kind, limit=args.limit, update_db=args.update)
+    elif args.command == 'enrich':
+        tool.enrich_files(limit=args.limit, use_llm=args.use_llm, use_local=not args.network)
+    elif args.command == 'search':
+        tool.search_content(query=args.query, limit=args.limit, search_type=args.type)
+    elif args.command == 'classify':
+        tool.classify_files(disk=args.disk, update_db=args.update, resume=not args.no_resume)
+    elif args.command == 'analyze-folders':
+        tool.analyze_folders(disk=args.disk, min_files=args.min_files)
+
+if __name__ == '__main__':
+    main()
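
inventory_file_types() above reads a specific result shape from FileTypeInventory.analyze(), and that module is not included in this patch. The sketch below only documents the keys the CLI code consumes; all values are illustrative.

# Expected shape of FileTypeInventory.analyze(), inferred from the code above.
results = {
    'extensions': [
        {'extension': '.py', 'count': 12345, 'total_size': 456789012,
         'parsed': 2345, 'parser_type': 'code', 'is_parseable': True},
    ],
    'summary': {
        'total_files': 1000000,
        'parseable_files': 600000,
        'parsed_files': 150000,
        'coverage': 25.0,          # percent of parseable files already parsed
    },
    'parser_status': {
        'code': {'implemented': True, 'extensions': ['.py', '.js', '.java']},
    },
    'unparsed_by_type': {
        'pdf': {'count': 40000, 'extensions': ['.pdf']},
    },
}
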
diff --git a/logs/batch_code.pid b/logs/batch_code.pid
new file mode 100644
index 0000000..e0e5bad
--- /dev/null
+++ b/logs/batch_code.pid
@@ -0,0 +1 @@
+676500
diff --git a/sql/migrations/003_content_graph.sql b/sql/migration/V004__content_graph.sql
similarity index 100%
rename from sql/migrations/003_content_graph.sql
rename to sql/migration/V004__content_graph.sql
diff --git a/sql/migration/V005__fts_and_chunks.sql b/sql/migration/V005__fts_and_chunks.sql
new file mode 100644
index 0000000..3d98c4c
--- /dev/null
+++ b/sql/migration/V005__fts_and_chunks.sql
@@ -0,0 +1,94 @@
+ALTER TABLE files ADD COLUMN IF NOT EXISTS fts_vector tsvector;
+
+CREATE INDEX IF NOT EXISTS idx_files_fts ON files USING GIN(fts_vector);
+
+CREATE OR REPLACE FUNCTION files_fts_update() RETURNS trigger AS $$
+BEGIN
+    NEW.fts_vector :=
+        setweight(to_tsvector('english', COALESCE(NEW.path, '')), 'A') ||
+        setweight(to_tsvector('english', COALESCE(NEW.extracted_text, '')), 'B');
+    RETURN NEW;
+END;
+$$ LANGUAGE plpgsql;
+
+CREATE TRIGGER files_fts_trigger
+    BEFORE INSERT OR UPDATE OF path, extracted_text
+    ON files
+    FOR EACH ROW
+    EXECUTE FUNCTION files_fts_update();
+
+UPDATE files SET fts_vector =
+    setweight(to_tsvector('english', COALESCE(path, '')), 'A') ||
+    setweight(to_tsvector('english', COALESCE(extracted_text, '')), 'B')
+WHERE extracted_text IS NOT NULL AND fts_vector IS NULL;
+
+CREATE TABLE IF NOT EXISTS content_chunks (
+    id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
+    node_id UUID REFERENCES content_nodes(id) ON DELETE CASCADE,
+    file_path TEXT NOT NULL,
+    disk_label VARCHAR(50),
+
+    chunk_id VARCHAR(32) NOT NULL,
+    chunk_index INT NOT NULL,
+    chunk_text TEXT NOT NULL,
+    chunk_size INT,
+
+    offset_start INT,
+    offset_end INT,
+    section_title TEXT,
+
+    metadata JSONB,
+    fts_vector tsvector,
+
+    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
+
+    UNIQUE(file_path, chunk_id)
+);
+
+CREATE INDEX IF NOT EXISTS idx_chunks_node ON content_chunks(node_id);
+CREATE INDEX IF NOT EXISTS idx_chunks_file ON content_chunks(file_path);
+CREATE INDEX IF NOT EXISTS idx_chunks_fts ON content_chunks USING GIN(fts_vector);
+CREATE INDEX IF NOT EXISTS idx_chunks_metadata ON content_chunks USING GIN(metadata);
+
+CREATE OR REPLACE FUNCTION chunks_fts_update() RETURNS trigger AS $$
+BEGIN
+    NEW.fts_vector :=
+        setweight(to_tsvector('english', COALESCE(NEW.section_title, '')), 'A') ||
+        setweight(to_tsvector('english', COALESCE(NEW.chunk_text, '')), 'B');
+    RETURN NEW;
+END;
+$$ LANGUAGE plpgsql;
+
+CREATE TRIGGER chunks_fts_trigger
+    BEFORE INSERT OR UPDATE OF chunk_text, section_title
+    ON content_chunks
+    FOR EACH ROW
+    EXECUTE FUNCTION chunks_fts_update();
+
+CREATE TABLE IF NOT EXISTS directory_index (
+    id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
+    dir_path TEXT NOT NULL UNIQUE,
+    disk_label VARCHAR(50),
+
+    file_count INT DEFAULT 0,
+    total_size BIGINT DEFAULT 0,
+    indexed_files INT DEFAULT 0,
+
+    aggregated_text TEXT,
+    fts_vector tsvector,
+
+    bm25_stats JSONB,
+
+    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
+    updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
+);
+
+CREATE INDEX IF NOT EXISTS idx_dir_index_path ON directory_index(dir_path);
+CREATE INDEX IF NOT EXISTS idx_dir_index_disk ON directory_index(disk_label);
+CREATE INDEX IF NOT EXISTS idx_dir_index_fts ON directory_index USING GIN(fts_vector);
+
+COMMENT ON TABLE content_chunks IS 'Structured chunks for BM25 and vector search';
+COMMENT ON TABLE directory_index IS 'Aggregated directory-level BM25 index';
+COMMENT ON COLUMN content_chunks.chunk_id IS 'Stable hash-based chunk identifier';
+COMMENT ON COLUMN content_chunks.offset_start IS 'Character offset or paragraph index';
+COMMENT ON COLUMN content_chunks.section_title IS 'Heading/function name for structured chunks';
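
To show how StructuredChunker and this migration fit together, a sketch that persists chunker output into content_chunks and searches it through the trigger-maintained fts_vector. It assumes psycopg2, a connection built like DB_CONFIG in batch_process.py, and that app/ is importable; store_chunks and search_chunks are hypothetical helpers, not part of the patch.

# Sketch only: wiring StructuredChunker output into the content_chunks schema above.
import psycopg2
from psycopg2.extras import Json
from chunking.structured_chunker import StructuredChunker

def store_chunks(conn, file_path, disk_label, text):
    chunks = StructuredChunker().chunk_text(text, metadata={'path': file_path})
    with conn.cursor() as cur:
        for chunk in chunks:
            cur.execute(
                '''INSERT INTO content_chunks
                       (file_path, disk_label, chunk_id, chunk_index, chunk_text,
                        chunk_size, offset_start, offset_end, section_title, metadata)
                   VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                   ON CONFLICT (file_path, chunk_id) DO NOTHING''',
                (file_path, disk_label, chunk['chunk_id'], chunk['index'],
                 chunk['text'], chunk['size'], chunk['offset_start'],
                 chunk['offset_end'], chunk['section'], Json(chunk['metadata'])))
    conn.commit()

def search_chunks(conn, query, limit=20):
    # chunks_fts_trigger keeps fts_vector in sync, so a plain tsquery search works.
    with conn.cursor() as cur:
        cur.execute(
            '''SELECT file_path, section_title,
                      ts_rank(fts_vector, plainto_tsquery('english', %s)) AS rank
               FROM content_chunks
               WHERE fts_vector @@ plainto_tsquery('english', %s)
               ORDER BY rank DESC
               LIMIT %s''',
            (query, query, limit))
        return cur.fetchall()
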