This commit is contained in:
mike
2025-12-13 11:35:33 +01:00
parent 9759001f4c
commit e9eb7ea5d9
16 changed files with 899 additions and 216 deletions

View File

@@ -0,0 +1,110 @@
from pathlib import Path
from typing import Dict, Set, List
from collections import Counter
class FolderAnalyzer:
def __init__(self):
self.manifest_files = {
'java': ['pom.xml', 'build.gradle', 'build.gradle.kts'],
'javascript': ['package.json', 'yarn.lock', 'package-lock.json'],
'python': ['pyproject.toml', 'setup.py', 'requirements.txt', 'Pipfile'],
'go': ['go.mod', 'go.sum'],
'rust': ['Cargo.toml', 'Cargo.lock'],
'docker': ['Dockerfile', 'docker-compose.yml', 'docker-compose.yaml'],
'k8s': ['helm', 'kustomization.yaml', 'deployment.yaml']
}
self.intent_keywords = {
'infrastructure': ['infra', 'deploy', 'k8s', 'docker', 'terraform', 'ansible'],
'application': ['app', 'service', 'api', 'server', 'client'],
'data': ['data', 'dataset', 'models', 'training', 'ml'],
'documentation': ['docs', 'documentation', 'wiki', 'readme'],
'testing': ['test', 'tests', 'spec', 'e2e', 'integration'],
'build': ['build', 'dist', 'target', 'out', 'bin'],
'config': ['config', 'conf', 'settings', 'env']
}
def analyze_folder(self, folder_path: Path, files: List[Dict]) -> Dict:
files_list = [Path(f['path']) for f in files]
has_readme = any('readme' in f.name.lower() for f in files_list)
has_git = any('.git' in str(f) for f in files_list)
manifest_types = self._detect_manifests(files_list)
has_manifest = len(manifest_types) > 0
file_types = Counter(f.suffix.lower() for f in files_list if f.suffix)
dominant_types = dict(file_types.most_common(10))
intent = self._infer_intent(folder_path.name.lower(), files_list)
project_type = self._infer_project_type(manifest_types, dominant_types)
structure = {
'depth': len(folder_path.parts),
'has_src': any('src' in str(f) for f in files_list[:20]),
'has_tests': any('test' in str(f) for f in files_list[:20]),
'has_docs': any('doc' in str(f) for f in files_list[:20])
}
return {
'has_readme': has_readme,
'has_git': has_git,
'has_manifest': has_manifest,
'manifest_types': manifest_types,
'dominant_file_types': dominant_types,
'project_type': project_type,
'intent': intent,
'structure': structure
}
def _detect_manifests(self, files: List[Path]) -> List[str]:
detected = []
file_names = {f.name for f in files}
for tech, manifests in self.manifest_files.items():
if any(m in file_names for m in manifests):
detected.append(tech)
return detected
def _infer_intent(self, folder_name: str, files: List[Path]) -> str:
file_str = ' '.join(str(f) for f in files[:50])
for intent, keywords in self.intent_keywords.items():
if any(kw in folder_name or kw in file_str.lower() for kw in keywords):
return intent
return 'unknown'
def _infer_project_type(self, manifests: List[str], file_types: Dict) -> str:
if manifests:
return manifests[0]
if '.py' in file_types and file_types.get('.py', 0) > 5:
return 'python'
if '.js' in file_types or '.ts' in file_types:
return 'javascript'
if '.java' in file_types:
return 'java'
if '.go' in file_types:
return 'go'
return 'mixed'
def generate_summary(self, folder_analysis: Dict, readme_text: str = None) -> str:
parts = []
if folder_analysis.get('project_type'):
parts.append(f"{folder_analysis['project_type']} project")
if folder_analysis.get('intent'):
parts.append(f"for {folder_analysis['intent']}")
if folder_analysis.get('manifest_types'):
parts.append(f"using {', '.join(folder_analysis['manifest_types'])}")
if readme_text:
first_para = readme_text.split('\n\n')[0][:200]
parts.append(f"Description: {first_para}")
return ' '.join(parts) if parts else 'Mixed content folder'

View File

@@ -0,0 +1,59 @@
from typing import Dict
import re
class ContentEnricher:
def __init__(self, llm_client=None):
self.llm_client = llm_client
self.pii_patterns = {
'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
'phone': r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
'ssn': r'\b\d{3}-\d{2}-\d{4}\b',
'credit_card': r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b'
}
def enrich(self, text: str, use_llm: bool = False) -> Dict:
enrichment = {
'summary': self._basic_summary(text),
'word_count': len(text.split()),
'has_pii': self._detect_pii(text),
'quality': self._assess_quality(text),
'topics': self._extract_basic_topics(text)
}
if use_llm and self.llm_client:
llm_result = self.llm_client.classify_content(text)
if llm_result.get('success'):
enrichment['llm_classification'] = llm_result['text']
return enrichment
def _basic_summary(self, text: str) -> str:
sentences = re.split(r'[.!?]+', text)
return ' '.join(sentences[:3])[:200]
def _detect_pii(self, text: str) -> Dict:
detected = {}
for pii_type, pattern in self.pii_patterns.items():
matches = re.findall(pattern, text)
if matches:
detected[pii_type] = len(matches)
return detected
def _assess_quality(self, text: str) -> str:
if len(text.strip()) < 10:
return 'low'
special_char_ratio = sum(1 for c in text if not c.isalnum() and not c.isspace()) / len(text)
if special_char_ratio > 0.3:
return 'low'
return 'high' if len(text.split()) > 50 else 'medium'
def _extract_basic_topics(self, text: str) -> list:
words = re.findall(r'\b[A-Z][a-z]+\b', text)
word_freq = {}
for word in words:
if len(word) > 3:
word_freq[word] = word_freq.get(word, 0) + 1
return sorted(word_freq, key=word_freq.get, reverse=True)[:10]

View File

@@ -0,0 +1,54 @@
import requests
import json
from typing import Dict, Optional
class LLMClient:
def __init__(self, endpoint: str = 'http://192.168.1.74:1234', model: str = 'local'):
self.endpoint = endpoint
self.model = model
self.local_ollama = 'http://localhost:11434'
def summarize(self, text: str, max_length: int = 200) -> Dict:
prompt = f"Summarize the following in {max_length} chars or less:\n\n{text[:2000]}"
return self._query(prompt)
def extract_topics(self, text: str) -> Dict:
prompt = f"Extract 5-10 key topics/tags from this text. Return as comma-separated list:\n\n{text[:2000]}"
return self._query(prompt)
def classify_content(self, text: str) -> Dict:
prompt = f"Classify this content. Return: category, topics, has_pii (yes/no), quality (high/medium/low):\n\n{text[:1000]}"
return self._query(prompt)
def _query(self, prompt: str, use_local: bool = False) -> Dict:
try:
endpoint = self.local_ollama if use_local else self.endpoint
if use_local:
response = requests.post(
f'{endpoint}/api/generate',
json={'model': 'llama3.2', 'prompt': prompt, 'stream': False},
timeout=30
)
else:
response = requests.post(
f'{endpoint}/v1/chat/completions',
json={
'model': self.model,
'messages': [{'role': 'user', 'content': prompt}],
'max_tokens': 500
},
timeout=30
)
if response.status_code == 200:
data = response.json()
if use_local:
return {'success': True, 'text': data.get('response', '')}
else:
return {'success': True, 'text': data['choices'][0]['message']['content']}
else:
return {'success': False, 'error': f'HTTP {response.status_code}'}
except Exception as e:
return {'success': False, 'error': str(e)}

View File

@@ -27,7 +27,7 @@ class DiskReorganizer:
def __init__(self, db_config: Dict=None):
if db_config is None:
db_config = {'host': os.getenv('DB_HOST', '192.168.1.159'), 'port': int(os.getenv('DB_PORT', 5432)), 'database': os.getenv('DB_NAME', 'disk_reorganizer_db'), 'user': os.getenv('DB_USER', 'disk_reorg_user'), 'password': os.getenv('DB_PASSWORD', 'heel-goed-wachtwoord')}
db_config = {'host': os.getenv('DB_HOST', '192.168.1.159'), 'port': int(os.getenv('DB_PORT', 5432)), 'database': os.getenv('DB_NAME', 'disk_reorganizer_db'), 'user': os.getenv('DB_USER', 'auction'), 'password': os.getenv('DB_PASSWORD', 'heel-goed-wachtwoord')}
self.db_config = db_config
self.init_database()
@@ -522,23 +522,126 @@ class DiskReorganizer:
cursor.close()
conn.close()
def classify_files(self, disk: Optional[str]=None, update_db: bool=False):
def parse_files(self, kind: Optional[str] = None, limit: int = 100, update_db: bool = False):
from parsers.text_parser import TextParser
from parsers.code_parser import CodeParser
from parsers.pdf_parser import PDFParser
parsers = {'text': TextParser(), 'code': CodeParser(), 'pdf': PDFParser()}
disk_mount_map = {'SMT': '/media/mike/SMT', 'DISK1': '/media/mike/DISK1', 'LLM': '/media/mike/LLM'}
conn = self.get_connection()
cursor = conn.cursor()
try:
query = "SELECT path, size, disk_label FROM files WHERE 1=1"
params = []
if kind:
suffix_map = {'text': "('.txt', '.md', '.log', '.json')", 'code': "('.py', '.js', '.java', '.go')", 'pdf': "('.pdf',)"}
if kind in suffix_map:
query += f" AND RIGHT(path, 4) IN {suffix_map[kind]} OR RIGHT(path, 3) IN {suffix_map[kind]}"
query += f" LIMIT {limit}"
cursor.execute(query, params)
files = cursor.fetchall()
print(f"\n=== PARSING FILES ===\nProcessing {len(files)} files\n")
parsed_count = 0
for path, size, disk_label in files:
mount_point = disk_mount_map.get(disk_label, disk_label)
full_path = Path(mount_point) / path if not Path(path).is_absolute() else Path(path)
if not full_path.exists() or int(size) > 10 * 1024 * 1024:
continue
file_kind = 'pdf' if path.endswith('.pdf') else 'code' if any(path.endswith(e) for e in ['.py', '.js', '.java']) else 'text'
parser = parsers.get(file_kind)
if not parser:
continue
result = parser.parse(full_path)
if 'error' not in result:
text = result.get('text', '')
quality = result.get('quality', 'unknown')
print(f"{path[:60]} | {file_kind} | {len(text):,} chars")
if update_db and text:
cursor.execute("UPDATE files SET extracted_text = %s, text_quality = %s WHERE path = %s", (text[:50000], quality, path))
parsed_count += 1
if parsed_count % 10 == 0:
conn.commit()
if update_db:
conn.commit()
print(f"\nParsed {parsed_count} files")
finally:
cursor.close()
conn.close()
def enrich_files(self, limit: int = 10, llm_endpoint: str = None, use_local: bool = False):
from enrichment.enricher import ContentEnricher
enricher = ContentEnricher()
conn = self.get_connection()
cursor = conn.cursor()
try:
cursor.execute(f"SELECT path, extracted_text FROM files WHERE extracted_text IS NOT NULL LIMIT {limit}")
files = cursor.fetchall()
print(f"\n=== ENRICHING CONTENT ===\nProcessing {len(files)} files\n")
for path, text in files:
enrichment = enricher.enrich(text[:5000], use_llm=False)
print(f"{path[:60]}")
print(f" Quality: {enrichment.get('quality')} | Words: {enrichment.get('word_count'):,}")
print(f" PII: {list(enrichment.get('has_pii', {}).keys())}")
print(f" Topics: {', '.join(enrichment.get('topics', [])[:5])}\n")
cursor.execute("UPDATE files SET enrichment = %s::jsonb WHERE path = %s", (json.dumps(enrichment), path))
conn.commit()
print(f"Enriched {len(files)} files")
finally:
cursor.close()
conn.close()
def classify_files(self, disk: Optional[str]=None, update_db: bool=False, resume: bool=True):
from classification.classifier import FileClassifier
classifier = FileClassifier()
conn = self.get_connection()
cursor = conn.cursor()
try:
task_name = f"classify_{disk or 'all'}"
skip_count = 0
if resume and update_db:
cursor.execute('SELECT last_processed_path, processed_count FROM processing_checkpoints WHERE task_name = %s', (task_name,))
checkpoint = cursor.fetchone()
if checkpoint:
last_path, skip_count = checkpoint
logger.info(f'Resuming from checkpoint: {skip_count:,} files already processed')
if disk:
cursor.execute('SELECT path, size, disk_label FROM files WHERE disk_label = %s', (disk,))
cursor.execute('SELECT path, size, disk_label FROM files WHERE disk_label = %s ORDER BY path', (disk,))
else:
cursor.execute('SELECT path, size, disk_label FROM files')
cursor.execute('SELECT path, size, disk_label FROM files ORDER BY path')
files = cursor.fetchall()
total = len(files)
logger.info(f'Classifying {total:,} files...')
categories = {}
build_artifacts = 0
batch = []
processed = 0
for idx, (path, size, disk_label) in enumerate(files, 1):
if idx <= skip_count:
continue
labels, category, is_build = classifier.classify_path(path, int(size))
if is_build:
build_artifacts += 1
@@ -546,18 +649,40 @@ class DiskReorganizer:
categories[category] = {'count': 0, 'size': 0}
categories[category]['count'] += 1
categories[category]['size'] += int(size)
if update_db:
labels_str = ','.join(labels)
batch.append((category, labels_str, path))
if len(batch) >= 1000:
cursor.executemany('UPDATE files SET category = %s WHERE path = %s', [(cat, p) for cat, lbl, p in batch])
cursor.execute('''
INSERT INTO processing_checkpoints (task_name, last_processed_path, processed_count, updated_at)
VALUES (%s, %s, %s, CURRENT_TIMESTAMP)
ON CONFLICT (task_name) DO UPDATE SET
last_processed_path = EXCLUDED.last_processed_path,
processed_count = EXCLUDED.processed_count,
updated_at = CURRENT_TIMESTAMP
''', (task_name, path, idx))
conn.commit()
batch.clear()
processed += 1
if idx % 1000 == 0:
print(f'\rClassified: {idx:,}/{total:,}', end='', flush=True)
print(f'\rClassified: {idx:,}/{total:,} ({100*idx/total:.1f}%)', end='', flush=True)
if update_db and batch:
cursor.executemany('UPDATE files SET category = %s WHERE path = %s', [(cat, p) for cat, lbl, p in batch])
cursor.execute('''
INSERT INTO processing_checkpoints (task_name, last_processed_path, processed_count, updated_at)
VALUES (%s, %s, %s, CURRENT_TIMESTAMP)
ON CONFLICT (task_name) DO UPDATE SET
last_processed_path = EXCLUDED.last_processed_path,
processed_count = EXCLUDED.processed_count,
updated_at = CURRENT_TIMESTAMP
''', (task_name, files[-1][0] if files else '', total))
conn.commit()
print()
print(f'\n=== CLASSIFICATION SUMMARY ===')
print(f'Total files: {total:,}')
@@ -570,6 +695,99 @@ class DiskReorganizer:
cursor.close()
conn.close()
def analyze_folders(self, disk: Optional[str]=None, min_files: int=3):
from analysis.folder_analyzer import FolderAnalyzer
analyzer = FolderAnalyzer()
conn = self.get_connection()
cursor = conn.cursor()
try:
query = '''
SELECT DISTINCT SUBSTRING(path FROM 1 FOR POSITION('/' IN path || '/') - 1) as folder, disk_label
FROM files
WHERE 1=1
'''
params = []
if disk:
query += ' AND disk_label = %s'
params.append(disk)
cursor.execute(query, params)
potential_folders = cursor.fetchall()
logger.info(f'Found {len(potential_folders)} potential folders to analyze')
processed = 0
for folder_name, disk_label in potential_folders:
cursor.execute('''
SELECT path, size FROM files
WHERE disk_label = %s AND path LIKE %s
''', (disk_label, f'{folder_name}%'))
files = cursor.fetchall()
if len(files) < min_files:
continue
files_list = [{'path': f[0], 'size': int(f[1])} for f in files]
folder_path = Path(folder_name)
analysis = analyzer.analyze_folder(folder_path, files_list)
readme_text = None
for file_dict in files_list:
if 'readme' in file_dict['path'].lower():
readme_text = f"Found README at {file_dict['path']}"
break
summary = analyzer.generate_summary(analysis, readme_text)
cursor.execute('''
INSERT INTO folders (path, disk_label, file_count, total_size, project_type, intent, summary,
has_readme, has_git, has_manifest, manifest_types, dominant_file_types, structure)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (path) DO UPDATE SET
file_count = EXCLUDED.file_count,
total_size = EXCLUDED.total_size,
project_type = EXCLUDED.project_type,
intent = EXCLUDED.intent,
summary = EXCLUDED.summary,
has_readme = EXCLUDED.has_readme,
has_git = EXCLUDED.has_git,
has_manifest = EXCLUDED.has_manifest,
manifest_types = EXCLUDED.manifest_types,
dominant_file_types = EXCLUDED.dominant_file_types,
structure = EXCLUDED.structure,
updated_at = CURRENT_TIMESTAMP
''', (
str(folder_path), disk_label, len(files_list), sum(f['size'] for f in files_list),
analysis.get('project_type'), analysis.get('intent'), summary,
analysis.get('has_readme'), analysis.get('has_git'), analysis.get('has_manifest'),
analysis.get('manifest_types'), json.dumps(analysis.get('dominant_file_types', {})),
json.dumps(analysis.get('structure', {}))
))
processed += 1
if processed % 100 == 0:
conn.commit()
print(f'\rAnalyzed: {processed} folders', end='', flush=True)
conn.commit()
print()
logger.info(f'Completed folder analysis: {processed} folders')
cursor.execute('''
SELECT project_type, COUNT(*), SUM(file_count), SUM(total_size)
FROM folders
GROUP BY project_type
''')
print(f'\n=== FOLDER ANALYSIS SUMMARY ===')
for row in cursor.fetchall():
proj_type, count, files, size = row
print(f'{proj_type:20}: {count:6,} folders, {files:8,} files, {self.format_size(int(size or 0))}')
finally:
cursor.close()
conn.close()
def review_migration(self, category: Optional[str]=None, show_build: bool=False):
from classification.classifier import FileClassifier
classifier = FileClassifier()
@@ -640,9 +858,24 @@ def main():
extract_parser = subparsers.add_parser('extract', help='Extract content from files')
extract_parser.add_argument('--kind', help='Extract specific kind (pdf, image, audio, video)')
extract_parser.add_argument('--limit', type=int, default=10, help='Limit extraction batch')
parse_parser = subparsers.add_parser('parse', help='Parse files to extract text')
parse_parser.add_argument('--kind', help='Parse specific kind (text, code, pdf)')
parse_parser.add_argument('--limit', type=int, default=100, help='Limit parse batch')
parse_parser.add_argument('--update', action='store_true', help='Save extracted text to database')
enrich_parser = subparsers.add_parser('enrich', help='Enrich content with LLM analysis')
enrich_parser.add_argument('--limit', type=int, default=10, help='Limit enrichment batch')
enrich_parser.add_argument('--llm-endpoint', default='http://192.168.1.74:1234', help='LLM endpoint')
enrich_parser.add_argument('--local', action='store_true', help='Use local Ollama')
classify_parser = subparsers.add_parser('classify', help='Classify files and suggest organization')
classify_parser.add_argument('--disk', help='Classify specific disk')
classify_parser.add_argument('--update', action='store_true', help='Update database with classifications')
classify_parser.add_argument('--no-resume', action='store_true', help='Start from scratch instead of resuming')
folders_parser = subparsers.add_parser('analyze-folders', help='Analyze folder structure and infer project intent')
folders_parser.add_argument('--disk', help='Analyze specific disk')
folders_parser.add_argument('--min-files', type=int, default=3, help='Minimum files per folder')
review_parser = subparsers.add_parser('review', help='Review proposed migration structure')
review_parser.add_argument('--category', help='Review specific category')
review_parser.add_argument('--show-build', action='store_true', help='Include build artifacts')
@@ -669,8 +902,14 @@ def main():
tool.profile_content(disk=args.disk, update_db=args.update, limit=args.limit)
elif args.command == 'extract':
tool.extract_content(kind=args.kind, limit=args.limit)
elif args.command == 'parse':
tool.parse_files(kind=args.kind, limit=args.limit, update_db=args.update)
elif args.command == 'enrich':
tool.enrich_files(limit=args.limit, llm_endpoint=args.llm_endpoint, use_local=args.local)
elif args.command == 'classify':
tool.classify_files(disk=args.disk, update_db=args.update)
tool.classify_files(disk=args.disk, update_db=args.update, resume=not args.no_resume)
elif args.command == 'analyze-folders':
tool.analyze_folders(disk=args.disk, min_files=args.min_files)
elif args.command == 'review':
tool.review_migration(category=args.category, show_build=args.show_build)
elif args.command == 'report':

View File

@@ -0,0 +1,44 @@
from pathlib import Path
from typing import Dict
import re
class CodeParser:
def __init__(self):
self.patterns = {
'python': {'imports': r'^import |^from .+ import', 'class': r'^class \w+', 'function': r'^def \w+'},
'javascript': {'imports': r'^import |^require\(', 'class': r'^class \w+', 'function': r'^function \w+|^const \w+ = '},
'java': {'package': r'^package ', 'imports': r'^import ', 'class': r'^public class \w+'},
'go': {'package': r'^package ', 'imports': r'^import ', 'function': r'^func \w+'}
}
def parse(self, file_path: Path) -> Dict:
try:
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
text = f.read()
language = self._detect_language(file_path, text)
structure = self._extract_structure(text, language)
return {
'text': text,
'language': language,
'line_count': len(text.split('\n')),
'structure': structure,
'quality': 'high'
}
except Exception as e:
return {'error': str(e)}
def _detect_language(self, file_path: Path, text: str) -> str:
lang_map = {'.py': 'python', '.js': 'javascript', '.ts': 'typescript', '.java': 'java', '.go': 'go'}
return lang_map.get(file_path.suffix.lower(), 'unknown')
def _extract_structure(self, text: str, language: str) -> Dict:
patterns = self.patterns.get(language, {})
structure = {'type': 'code', 'language': language}
for key, pattern in patterns.items():
matches = re.findall(pattern, text, re.MULTILINE)
structure[key] = len(matches)
return structure

View File

@@ -0,0 +1,42 @@
from pathlib import Path
from typing import Dict
class MediaParser:
def parse_audio(self, file_path: Path) -> Dict:
return {
'text': '[Audio transcription pending]',
'needs_transcription': True,
'transcription_service': 'whisper',
'structure': {'type': 'audio'},
'quality': 'pending'
}
def parse_video(self, file_path: Path) -> Dict:
return {
'text': '[Video transcription pending]',
'needs_transcription': True,
'needs_scene_detection': True,
'transcription_service': 'whisper',
'structure': {'type': 'video'},
'quality': 'pending'
}
def parse_image(self, file_path: Path) -> Dict:
try:
from PIL import Image
with Image.open(file_path) as img:
width, height = img.size
mode = img.mode
return {
'text': '[Image caption/OCR pending]',
'needs_ocr': True,
'needs_caption': True,
'dimensions': f'{width}x{height}',
'mode': mode,
'structure': {'type': 'image', 'width': width, 'height': height},
'quality': 'pending'
}
except Exception as e:
return {'error': str(e)}

31
app/parsers/pdf_parser.py Normal file
View File

@@ -0,0 +1,31 @@
from pathlib import Path
from typing import Dict, List
class PDFParser:
def parse(self, file_path: Path) -> Dict:
try:
import PyPDF2
pages = []
with open(file_path, 'rb') as f:
pdf = PyPDF2.PdfReader(f)
page_count = len(pdf.pages)
for i, page in enumerate(pdf.pages[:50]):
text = page.extract_text()
pages.append({'page': i + 1, 'text': text, 'char_count': len(text)})
full_text = '\n\n'.join([p['text'] for p in pages])
has_text_layer = sum(p['char_count'] for p in pages) > 100
return {
'text': full_text,
'page_count': page_count,
'pages_extracted': len(pages),
'has_text_layer': has_text_layer,
'needs_ocr': not has_text_layer,
'structure': {'type': 'document', 'pages': pages[:5]},
'quality': 'high' if has_text_layer else 'needs_ocr'
}
except Exception as e:
return {'error': str(e), 'needs_ocr': True}

View File

@@ -0,0 +1,26 @@
from pathlib import Path
from typing import Dict, Optional
import chardet
class TextParser:
def parse(self, file_path: Path) -> Dict:
try:
with open(file_path, 'rb') as f:
raw_data = f.read(1024 * 1024)
encoding = chardet.detect(raw_data)['encoding'] or 'utf-8'
text = raw_data.decode(encoding, errors='ignore')
lines = text.split('\n')
return {
'text': text,
'encoding': encoding,
'line_count': len(lines),
'char_count': len(text),
'word_count': len(text.split()),
'structure': {'type': 'plain_text'},
'quality': 'high' if encoding == 'utf-8' else 'medium'
}
except Exception as e:
return {'error': str(e)}