Compare commits
2 Commits
f906e6666a
...
f4ef534e3f
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f4ef534e3f | ||
|
|
91d4592272 |
4
requirements.txt
Normal file
4
requirements.txt
Normal file
@@ -0,0 +1,4 @@
|
||||
httpx
|
||||
prompt_toolkit
|
||||
rich
|
||||
hashlib
|
||||
368
src/finish.py
Executable file
368
src/finish.py
Executable file
@@ -0,0 +1,368 @@
|
||||
#!/home/mike/.venvs/cli/bin/python3
|
||||
"""
|
||||
finish.py – AI shell completions that never leave your machine.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
import hashlib
|
||||
|
||||
import argparse, asyncio, json, os, re, shlex, subprocess, sys, time
|
||||
from pathlib import Path
|
||||
from typing import List, Dict, Optional
|
||||
|
||||
import httpx
|
||||
from prompt_toolkit import ANSI, Application
|
||||
from prompt_toolkit.buffer import Buffer
|
||||
from prompt_toolkit.key_binding import KeyBindings
|
||||
from prompt_toolkit.layout import (
|
||||
ConditionalContainer, FormattedTextControl, HSplit, Layout, Window
|
||||
)
|
||||
from prompt_toolkit.widgets import Label
|
||||
from rich.console import Console
|
||||
from rich.spinner import Spinner
|
||||
|
||||
VERSION = "0.5.0"
|
||||
CFG_DIR = Path.home()/".finish"
|
||||
CFG_FILE = CFG_DIR/"finish.json"
|
||||
CACHE_DIR = CFG_DIR/"cache"
|
||||
LOG_FILE = CFG_DIR/"finish.log"
|
||||
|
||||
console = Console()
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Config
|
||||
# --------------------------------------------------------------------------- #
|
||||
DEFAULT_CFG = {
|
||||
"provider": "ollama",
|
||||
"model": "llama3:latest",
|
||||
"endpoint": "http://localhost:11434/api/chat",
|
||||
"temperature": 0.0,
|
||||
"api_prompt_cost": 0.0,
|
||||
"api_completion_cost": 0.0,
|
||||
"max_history_commands": 20,
|
||||
"max_recent_files": 20,
|
||||
"cache_size": 100,
|
||||
}
|
||||
|
||||
def cfg() -> dict:
|
||||
if not CFG_FILE.exists():
|
||||
CFG_DIR.mkdir(exist_ok=True)
|
||||
CFG_FILE.write_text(json.dumps(DEFAULT_CFG, indent=2))
|
||||
return json.loads(CFG_FILE.read_text())
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Context builders
|
||||
# --------------------------------------------------------------------------- #
|
||||
def _sanitise_history() -> str:
|
||||
hist = subprocess.check_output(["bash", "-ic", "history"]).decode()
|
||||
# scrub tokens / hashes
|
||||
for pat in [
|
||||
r"\b[0-9a-f]{32,40}\b", # long hex
|
||||
r"\b[A-Za-z0-9-]{36}\b", # uuid
|
||||
r"\b[A-Za-z0-9]{16,40}\b", # api-keyish
|
||||
]:
|
||||
hist = re.sub(pat, "REDACTED", hist)
|
||||
return "\n".join(hist.splitlines()[-cfg()["max_history_commands"]:])
|
||||
|
||||
def _recent_files() -> str:
|
||||
try:
|
||||
files = subprocess.check_output(
|
||||
["find", ".", "-maxdepth", "1", "-type", "f", "-printf", "%T@ %p\n"],
|
||||
stderr=subprocess.DEVNULL,
|
||||
).decode()
|
||||
return "\n".join(sorted(files.splitlines(), reverse=True)[: cfg()["max_recent_files"]])
|
||||
except Exception:
|
||||
return ""
|
||||
|
||||
def _help_for(cmd: str) -> str:
|
||||
try:
|
||||
return subprocess.check_output([cmd, "--help"], stderr=subprocess.STDOUT, timeout=2).decode()
|
||||
except Exception:
|
||||
return f"{cmd} --help not available"
|
||||
|
||||
def build_prompt(user_input: str) -> str:
|
||||
c = cfg()
|
||||
term_info = f"""User: {os.getenv("USER")}
|
||||
PWD: {os.getcwd()}
|
||||
HOME: {os.getenv("HOME")}
|
||||
HOST: {os.getenv("HOSTNAME")}
|
||||
SHELL: bash"""
|
||||
prompt = f"""You are a helpful bash-completion script.
|
||||
Generate 2–5 concise, valid bash commands that complete the user’s intent.
|
||||
|
||||
Reply **only** JSON: {{"completions":["cmd1","cmd2",...]}}
|
||||
|
||||
User command: {user_input}
|
||||
|
||||
Terminal context:
|
||||
{term_info}
|
||||
|
||||
History:
|
||||
{_sanitise_history()}
|
||||
|
||||
Recent files:
|
||||
{_recent_files()}
|
||||
|
||||
Help:
|
||||
{_help_for(user_input.split()[0])}"""
|
||||
return prompt
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# LLM call
|
||||
# --------------------------------------------------------------------------- #
|
||||
async def llm_complete(prompt: str) -> List[str]:
|
||||
c = cfg()
|
||||
payload = {
|
||||
"model": c["model"],
|
||||
"temperature": c["temperature"],
|
||||
"messages": [
|
||||
{"role": "system", "content": "You are a helpful bash completion assistant."},
|
||||
{"role": "user", "content": prompt},
|
||||
],
|
||||
"response_format": {"type": "text"},
|
||||
}
|
||||
if c["provider"] == "ollama":
|
||||
payload["format"] = "json"
|
||||
payload["stream"] = False
|
||||
|
||||
headers = {"Content-Type": "application/json"}
|
||||
if c.get("api_key"):
|
||||
headers["Authorization"] = f"Bearer {c['api_key']}"
|
||||
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=30) as client:
|
||||
resp = await client.post(c["endpoint"], headers=headers, json=payload)
|
||||
resp.raise_for_status()
|
||||
body = resp.json()
|
||||
except httpx.ConnectError as e:
|
||||
console.print(f"[red]Error:[/] Cannot connect to {c['endpoint']}")
|
||||
console.print(f"[yellow]Check that your LLM server is running and endpoint is correct.[/]")
|
||||
console.print(f"[yellow]Run 'finish config' to see current config.[/]")
|
||||
return []
|
||||
except Exception as e:
|
||||
console.print(f"[red]Error:[/] {e}")
|
||||
return []
|
||||
|
||||
# extract content
|
||||
if c["provider"] == "ollama":
|
||||
raw = body["message"]["content"]
|
||||
else:
|
||||
raw = body["choices"][0]["message"]["content"]
|
||||
|
||||
# Debug logging
|
||||
if os.getenv("FINISH_DEBUG"):
|
||||
LOG_FILE.parent.mkdir(exist_ok=True)
|
||||
with open(LOG_FILE, "a") as f:
|
||||
f.write(f"\n=== {time.strftime('%Y-%m-%d %H:%M:%S')} ===\n")
|
||||
f.write(f"Raw response:\n{raw}\n")
|
||||
|
||||
# try json first
|
||||
try:
|
||||
result = json.loads(raw)["completions"]
|
||||
if os.getenv("FINISH_DEBUG"):
|
||||
with open(LOG_FILE, "a") as f:
|
||||
f.write(f"Parsed completions: {result}\n")
|
||||
return result
|
||||
except Exception as e:
|
||||
if os.getenv("FINISH_DEBUG"):
|
||||
with open(LOG_FILE, "a") as f:
|
||||
f.write(f"JSON parse failed: {e}\n")
|
||||
# fallback: grep command-like lines
|
||||
fallback = [ln for ln in raw.splitlines() if re.match(r"^(ls|cd|find|cat|grep|echo|mkdir|rm|cp|mv|pwd|chmod|chown)\b", ln)][:5]
|
||||
if os.getenv("FINISH_DEBUG"):
|
||||
with open(LOG_FILE, "a") as f:
|
||||
f.write(f"Fallback completions: {fallback}\n")
|
||||
return fallback
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# TUI picker
|
||||
# --------------------------------------------------------------------------- #
|
||||
async def select_completion(completions: List[str]) -> Optional[str]:
|
||||
if not completions:
|
||||
return None
|
||||
if len(completions) == 1:
|
||||
return completions[0]
|
||||
|
||||
# If not in an interactive terminal, return first completion
|
||||
if not sys.stdin.isatty():
|
||||
return completions[0]
|
||||
|
||||
# Import here to avoid issues when not needed
|
||||
from prompt_toolkit.input import create_input
|
||||
from prompt_toolkit.output import create_output
|
||||
|
||||
kb = KeyBindings()
|
||||
current = 0
|
||||
|
||||
def get_text():
|
||||
return [
|
||||
("", "Select completion (↑↓ navigate, Enter accept, Esc cancel)\n\n"),
|
||||
*[
|
||||
("[SetCursorPosition]" if i == current else "", f"{c}\n")
|
||||
for i, c in enumerate(completions)
|
||||
],
|
||||
]
|
||||
|
||||
@kb.add("up")
|
||||
def _up(event):
|
||||
nonlocal current
|
||||
current = (current - 1) % len(completions)
|
||||
|
||||
@kb.add("down")
|
||||
def _down(event):
|
||||
nonlocal current
|
||||
current = (current + 1) % len(completions)
|
||||
|
||||
@kb.add("enter")
|
||||
def _accept(event):
|
||||
event.app.exit(result=completions[current])
|
||||
|
||||
@kb.add("escape")
|
||||
def _cancel(event):
|
||||
event.app.exit(result=None)
|
||||
|
||||
control = FormattedTextControl(get_text)
|
||||
|
||||
# Force output to /dev/tty to avoid interfering with bash command substitution
|
||||
try:
|
||||
output = create_output(stdout=open('/dev/tty', 'w'))
|
||||
input_obj = create_input(stdin=open('/dev/tty', 'r'))
|
||||
except Exception:
|
||||
# Fallback to first completion if tty not available
|
||||
return completions[0]
|
||||
|
||||
app = Application(
|
||||
layout=Layout(HSplit([Window(control, height=len(completions) + 2)])),
|
||||
key_bindings=kb,
|
||||
mouse_support=False,
|
||||
erase_when_done=True,
|
||||
output=output,
|
||||
input=input_obj,
|
||||
)
|
||||
return await app.run_async()
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Cache
|
||||
# --------------------------------------------------------------------------- #
|
||||
def cached(key: str) -> Optional[List[str]]:
|
||||
if not CACHE_DIR.exists():
|
||||
CACHE_DIR.mkdir(parents=True)
|
||||
f = CACHE_DIR / f"{key}.json"
|
||||
if f.exists():
|
||||
return json.loads(f.read_text())
|
||||
return None
|
||||
|
||||
def store_cache(key: str, completions: List[str]) -> None:
|
||||
f = CACHE_DIR / f"{key}.json"
|
||||
f.write_text(json.dumps(completions))
|
||||
# LRU eviction
|
||||
all_files = sorted(CACHE_DIR.glob("*.json"), key=lambda p: p.stat().st_mtime)
|
||||
for old in all_files[: -cfg()["cache_size"]]:
|
||||
old.unlink(missing_ok=True)
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# Main entry
|
||||
# --------------------------------------------------------------------------- #
|
||||
async def complete_line(line: str) -> Optional[str]:
|
||||
key = hashlib.md5(line.encode()).hexdigest()
|
||||
comps = cached(key)
|
||||
if comps is None:
|
||||
# Write to /dev/tty if available to avoid interfering with command substitution
|
||||
try:
|
||||
status_console = Console(file=open('/dev/tty', 'w'), stderr=False)
|
||||
except Exception:
|
||||
status_console = console
|
||||
|
||||
with status_console.status("[green]Thinking…[/]"):
|
||||
comps = await llm_complete(build_prompt(line))
|
||||
store_cache(key, comps)
|
||||
return await select_completion(comps)
|
||||
|
||||
# --------------------------------------------------------------------------- #
|
||||
# CLI
|
||||
# --------------------------------------------------------------------------- #
|
||||
def install_keybinding():
|
||||
r"""Inject Bash binding Alt+\ -> finish"""
|
||||
rc = Path.home()/".bashrc"
|
||||
marker = "# finish.py key-binding"
|
||||
|
||||
# Use bind -x to allow modifying READLINE_LINE
|
||||
snippet = f'''{marker}
|
||||
_finish_complete() {{
|
||||
local result
|
||||
result=$(finish --readline-complete "$READLINE_LINE" 2>/dev/null)
|
||||
if [[ -n "$result" ]]; then
|
||||
READLINE_LINE="$result"
|
||||
READLINE_POINT=${{#READLINE_LINE}}
|
||||
fi
|
||||
}}
|
||||
bind -x '"\\e\\\\": _finish_complete'
|
||||
'''
|
||||
|
||||
text = rc.read_text() if rc.exists() else ""
|
||||
if marker in text:
|
||||
return
|
||||
rc.write_text(text + "\n" + snippet)
|
||||
console.print("[green]Key-binding installed (Alt+\\)[/] – restart your shell.")
|
||||
|
||||
def main():
|
||||
# Handle readline-complete flag before argparse (for bash bind -x)
|
||||
if "--readline-complete" in sys.argv:
|
||||
idx = sys.argv.index("--readline-complete")
|
||||
line = sys.argv[idx + 1] if idx + 1 < len(sys.argv) else ""
|
||||
if line.strip():
|
||||
choice = asyncio.run(complete_line(line))
|
||||
if choice:
|
||||
print(choice)
|
||||
return
|
||||
|
||||
# Legacy flag support
|
||||
if sys.argv[-1] == "--accept-current-line":
|
||||
line = os.environ.get("READLINE_LINE", "")
|
||||
if line.strip():
|
||||
choice = asyncio.run(complete_line(line))
|
||||
if choice:
|
||||
sys.stdout.write(f"\x1b]0;\a")
|
||||
sys.stdout.write(f"\x1b[2K\r")
|
||||
sys.stdout.write(choice)
|
||||
sys.stdout.flush()
|
||||
return
|
||||
|
||||
parser = argparse.ArgumentParser(prog="finish", description="AI shell completions")
|
||||
parser.add_argument("--version", action="version", version=VERSION)
|
||||
sub = parser.add_subparsers(dest="cmd")
|
||||
sub.add_parser("install", help="add Alt+\\ key-binding to ~/.bashrc")
|
||||
sub.add_parser("config", help="show current config")
|
||||
p = sub.add_parser("command", help="simulate double-tab")
|
||||
p.add_argument("words", nargs="*", help="partial command")
|
||||
p.add_argument("--dry-run", action="store_true", help="show prompt only")
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.cmd == "install":
|
||||
install_keybinding()
|
||||
return
|
||||
if args.cmd == "config":
|
||||
console.print_json(json.dumps(cfg()))
|
||||
return
|
||||
if args.cmd == "command":
|
||||
line = " ".join(args.words)
|
||||
if args.dry_run:
|
||||
console.print(build_prompt(line))
|
||||
return
|
||||
choice = asyncio.run(complete_line(line))
|
||||
if choice:
|
||||
# Check if we're in an interactive shell
|
||||
if os.isatty(sys.stdout.fileno()):
|
||||
console.print(f"[dim]Selected:[/] [cyan]{choice}[/]")
|
||||
console.print(f"\n[yellow]Tip:[/] Use [bold]Alt+\\[/] keybinding for seamless completion!")
|
||||
else:
|
||||
print(choice)
|
||||
return
|
||||
if len(sys.argv) == 1:
|
||||
parser.print_help()
|
||||
return
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
main()
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
@@ -253,6 +253,7 @@ log_request() {
|
||||
created=$(echo "$response_body" | jq -r ".created // $created")
|
||||
api_cost=$(echo "$prompt_tokens_int * $ACSH_API_PROMPT_COST + $completion_tokens_int * $ACSH_API_COMPLETION_COST" | bc)
|
||||
log_file=${ACSH_LOG_FILE:-"$HOME/.finish/finish.log"}
|
||||
mkdir -p "$(dirname "$log_file")" 2>/dev/null
|
||||
echo "$created,$user_input_hash,$prompt_tokens_int,$completion_tokens_int,$api_cost" >> "$log_file"
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user