399 lines
12 KiB
Python
399 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
import datetime as dt
|
|
import json
|
|
import os
|
|
import random
|
|
import re
|
|
import urllib.request
|
|
import xml.etree.ElementTree as ET
|
|
import json, re
|
|
|
|
# --- USER-FRIENDLY CONFIG ---

# Marker stored in every grid cell that does not hold a letter.
EMPTY = " "

# Accepted answers: upper-case A-Z only, 3 to 7 letters (short, common words).
WORD_RE = re.compile(r"^[A-Z]{3,7}$")

# Side length of the square working grid (kept small for denser puzzles).
SIZE = 10

# Number of words requested from the model per puzzle (more, since they are short).
TARGET_WORDS = 15

# Fewer valid words than this and the word list is considered unusable.
MIN_ACCEPT_WORDS = 10

# RSS feeds that supply the news items used as puzzle themes.
FEEDS = [
    "https://feeds.nos.nl/nosnieuwsalgemeen",
    "https://feeds.nos.nl/nosnieuwstech",
]
|
|
|
|
|
|
def env(name, default=None):
    """Return environment variable *name*, or *default* when it is unset or empty."""
    value = os.getenv(name)
    if value:
        # A non-empty string is the only truthy result os.getenv can give here.
        return value
    return default
|
|
|
|
|
|
def http_get(url, timeout=15):
    """Fetch *url* with a GET request and return the raw response body (bytes).

    The request carries a fixed ``puzzle-gen/1.0`` User-Agent; *timeout* is
    passed straight to ``urllib.request.urlopen``.
    """
    headers = {"User-Agent": "puzzle-gen/1.0"}
    request = urllib.request.Request(url, headers=headers)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        body = response.read()
    return body
|
|
|
|
|
|
def http_post_json(url, payload, timeout=45):
    """POST *payload* as JSON to *url* and return the decoded JSON response.

    The body is UTF-8 encoded JSON; the request is sent with a fixed
    ``Bearer lm-studio`` Authorization header. The response body is decoded
    as UTF-8 and parsed with ``json.loads``.
    """
    headers = {
        "Content-Type": "application/json",
        "Authorization": "Bearer lm-studio",
        "User-Agent": "puzzle-gen/1.0",
    }
    body = json.dumps(payload).encode("utf-8")
    request = urllib.request.Request(url, data=body, headers=headers, method="POST")
    with urllib.request.urlopen(request, timeout=timeout) as response:
        raw = response.read()
    return json.loads(raw.decode("utf-8"))
|
|
|
|
|
|
def fetch_rss_items(url, limit=12):
    """Download an RSS feed and return up to *limit* ``(title, description)`` pairs.

    Args:
        url: Feed URL, fetched with ``http_get``.
        limit: Maximum number of items to return; zero or less yields ``[]``.

    Returns:
        A list of ``(title, description)`` tuples of stripped strings, in feed
        order. Items without a title are skipped; a missing description
        becomes ``""``.

    Raises:
        xml.etree.ElementTree.ParseError: if the response is not well-formed XML.
        Whatever ``http_get`` raises on network errors.
    """
    root = ET.fromstring(http_get(url))
    # For an <rss> root the items live under <channel>; otherwise look for
    # <item> elements directly under the root.
    channel = root.find("channel") if root.tag.lower().endswith("rss") else root
    # Element.find returns None when <channel> is absent; treat that as an
    # empty feed instead of failing on ``None.findall``.
    if channel is None or limit <= 0:
        return []

    items = []
    for it in channel.findall("item"):
        title = (it.findtext("title") or "").strip()
        if not title:
            continue
        desc = (it.findtext("description") or "").strip()
        items.append((title, desc))
        if len(items) >= limit:
            break
    return items
|
|
|
|
|
|
def safe_slug(s, maxlen=50):
    """Turn *s* into a lowercase, hyphen-separated ASCII slug for file names.

    Accented letters are folded to their base letter (``é`` -> ``e``) before
    every remaining run of characters outside ``a-z0-9`` is collapsed into a
    single hyphen. The result is cut to *maxlen* characters and never starts
    or ends with a hyphen.

    Returns:
        The slug, or ``"news"`` when nothing usable remains.
    """
    import unicodedata  # local import: only this helper needs it

    # Decompose accented characters and drop the combining marks so the base
    # letter survives; characters with no decomposition still become hyphens.
    decomposed = unicodedata.normalize("NFKD", s)
    folded = "".join(ch for ch in decomposed if not unicodedata.combining(ch))
    slug = re.sub(r"[^a-z0-9]+", "-", folded.lower()).strip("-")
    # Truncation can cut right after a separator, so strip again.
    return slug[:maxlen].strip("-") or "news"
|
|
|
|
|
|
def extract_first_json(text: str):
    """Return the first JSON object or array that can be parsed out of *text*.

    Every ``{`` or ``[`` in *text* is tried in order as a possible start of a
    JSON value, so bracketed chatter ahead of the real payload (for example
    ``"note [draft]: {...}"``) does not hide it. Anything after the parsed
    value is ignored.

    Note that when an outer value is malformed, a well-formed value nested
    inside it may be returned instead.

    Returns:
        The decoded ``dict`` or ``list``, or ``None`` when *text* is empty or
        contains no parseable JSON object/array.
    """
    if not text:
        return None
    decoder = json.JSONDecoder()
    for candidate in re.finditer(r"[{\[]", text):
        try:
            # raw_decode parses one value starting at the given index and
            # tolerates trailing text after it.
            return decoder.raw_decode(text, candidate.start())[0]
        except json.JSONDecodeError:
            continue
    return None
|
|
|
|
|
|
def normalize_word(raw: str) -> str:
    """Reduce *raw* to its upper-case A-Z letters.

    Accented letters and ligatures are folded to plain letters first
    (``é`` -> ``E``, ``ĳ`` -> ``IJ``) so a word is not silently misspelled by
    dropping them; hyphens, digits, spaces and every other character are
    removed. A falsy *raw* (``None`` or ``""``) yields ``""``.
    """
    import unicodedata  # local import: only this helper needs it

    # NFKD splits accented letters into base letter + combining mark; the
    # marks are then removed by the same filter that drops punctuation.
    decomposed = unicodedata.normalize("NFKD", raw or "")
    return re.sub(r"[^A-Za-z]", "", decomposed).upper()
|
|
|
|
|
|
def sanitize_wordcluemap(obj):
    """Normalise a parsed model reply into a clean ``{WORD: clue}`` dict.

    Accepted shapes:
      - dict: ``{"WORD": "clue", ...}``
      - list of dicts: the word is read from ``"word"``, ``"WOORD"`` or
        ``"Word"`` and the clue from ``"clue"``, ``"CLUE"``, ``"hint"`` or
        ``"HINT"``; non-dict list entries are ignored.

    Any other input type yields an empty dict. A pair is kept only when both
    word and clue are strings, the normalised word matches ``WORD_RE``
    (A-Z, 3..7 letters) and the stripped clue is non-empty. When two entries
    normalise to the same word, the later clue wins.
    """
    if isinstance(obj, dict):
        pairs = list(obj.items())
    elif isinstance(obj, list):
        pairs = [
            (
                entry.get("word") or entry.get("WOORD") or entry.get("Word"),
                entry.get("clue") or entry.get("CLUE") or entry.get("hint") or entry.get("HINT"),
            )
            for entry in obj
            if isinstance(entry, dict)
        ]
    else:
        return {}

    cleaned = {}
    for raw_word, raw_clue in pairs:
        if not (isinstance(raw_word, str) and isinstance(raw_clue, str)):
            continue
        word = normalize_word(raw_word)
        clue = raw_clue.strip()
        if WORD_RE.fullmatch(word) and clue:
            cleaned[word] = clue
    return cleaned
|
|
|
|
|
|
# ---- generator (no-touch) ----
|
|
def make_grid():
    """Return a fresh SIZE x SIZE grid (list of row lists) filled with EMPTY."""
    # Each row is built separately so rows never share the same list object.
    return [[EMPTY] * SIZE for _ in range(SIZE)]
|
|
|
|
|
|
def in_bounds(g, r, c):
    """Return True when (r, c) is a valid row/column index into grid *g*.

    The column range is taken from the first row of *g*.
    """
    if not 0 <= r < len(g):
        return False
    return 0 <= c < len(g[0])
|
|
|
|
|
|
def can_place_notouch(g, word, r, c, direction):
    """Check whether *word* may start at (r, c) without touching other words.

    A placement is accepted only when all of the following hold:
      - the start is not negative and the word fits inside the grid;
      - the cell just before the first letter and the cell just after the
        last letter are empty (or outside the grid);
      - every occupied cell the word passes over already holds the same
        letter (a crossing);
      - every empty cell the word would fill has no occupied neighbour on
        either side perpendicular to the word.

    Any *direction* other than ``"horizontal"`` is laid out downwards.
    """
    across = direction == "horizontal"
    height, width = len(g), len(g[0])
    length = len(word)

    if r < 0 or c < 0:
        return False
    if across and c + length > width:
        return False
    if direction == "vertical" and r + length > height:
        return False

    # Unit step along the word: one column right, or one row down.
    dr, dc = (0, 1) if across else (1, 0)

    # No "glue": the cells directly before and after the word must be free.
    ends = ((r - dr, c - dc), (r + dr * length, c + dc * length))
    for end_r, end_c in ends:
        if in_bounds(g, end_r, end_c) and g[end_r][end_c] != EMPTY:
            return False

    for offset, letter in enumerate(word):
        rr, cc = r + dr * offset, c + dc * offset
        existing = g[rr][cc]
        if existing != EMPTY:
            # Crossing an existing word is fine only on an identical letter.
            if existing != letter:
                return False
            continue
        # Newly filled cell: both perpendicular neighbours must be free.
        for side_r, side_c in ((rr - dc, cc - dr), (rr + dc, cc + dr)):
            if in_bounds(g, side_r, side_c) and g[side_r][side_c] != EMPTY:
                return False
    return True
|
|
|
|
|
|
def place_word(g, word, r, c, direction):
    """Write the letters of *word* into grid *g* in place, starting at (r, c).

    ``"horizontal"`` writes left to right along row *r*; any other direction
    writes top to bottom along column *c*. No validation is performed.
    """
    if direction == "horizontal":
        for offset, letter in enumerate(word):
            g[r][c + offset] = letter
    else:
        for offset, letter in enumerate(word):
            g[r + offset][c] = letter
|
|
|
|
|
|
def find_spots(g, word, placed):
    """List every valid crossing placement of *word* against the placed words.

    For each letter of each placed word that also occurs in *word*, the
    perpendicular placement sharing that cell is tested with
    ``can_place_notouch``. The result is a list of ``(row, col, direction)``
    start positions in discovery order; the same position can appear more
    than once.
    """
    candidates = []
    for entry in placed:
        placed_word = entry["word"]
        across = entry["direction"] == "horizontal"
        # The new word always runs perpendicular to the one it crosses.
        new_direction = "vertical" if across else "horizontal"
        for i, placed_letter in enumerate(placed_word):
            cross_r = entry["row"] if across else entry["row"] + i
            cross_c = entry["col"] + i if across else entry["col"]
            for j, letter in enumerate(word):
                if letter != placed_letter:
                    continue
                # Shift the start back so letter j lands on the crossing cell.
                if across:
                    start_r, start_c = cross_r - j, cross_c
                else:
                    start_r, start_c = cross_r, cross_c - j
                if can_place_notouch(g, word, start_r, start_c, new_direction):
                    candidates.append((start_r, start_c, new_direction))
    return candidates
|
|
|
|
|
|
def generate_puzzle(wordcluemap, rnd):
    """Lay out the words of *wordcluemap* as a crossword on a fresh grid.

    Words are tried longest first. The first word is placed horizontally on
    the middle row, centred; each following word is placed at one randomly
    chosen valid crossing (see ``find_spots``) or skipped when none exists.

    Args:
        wordcluemap: ``{WORD: clue}`` mapping.
        rnd: ``random.Random``-like object providing ``shuffle``.

    Returns:
        ``{"grid": grid, "placed": [...]}`` where every placed entry has the
        keys ``word``, ``clue``, ``row``, ``col`` and ``direction``; or
        ``None`` when *wordcluemap* is empty or the first word cannot be
        placed.
    """
    words = sorted(wordcluemap.keys(), key=len, reverse=True)
    if not words:
        # Nothing to place; callers already treat None as "no puzzle".
        return None

    g = make_grid()
    placed = []

    first = words[0]
    sr = SIZE // 2
    sc = (SIZE - len(first)) // 2
    if not can_place_notouch(g, first, sr, sc, "horizontal"):
        return None
    place_word(g, first, sr, sc, "horizontal")
    placed.append({"word": first, "clue": wordcluemap[first], "row": sr, "col": sc, "direction": "horizontal"})

    for w in words[1:]:
        spots = find_spots(g, w, placed)
        if not spots:
            continue
        # Shuffle-then-take-first is kept (rather than rnd.choice) so a given
        # seed keeps producing the same layout.
        rnd.shuffle(spots)
        r, c, d = spots[0]
        place_word(g, w, r, c, d)
        placed.append({"word": w, "clue": wordcluemap[w], "row": r, "col": c, "direction": d})

    return {"grid": g, "placed": placed}
|
|
|
|
|
|
def export_format(puz, difficulty=1, rewards=None):
    """Convert a generated puzzle into the exported JSON-ready structure.

    The grid is cropped to the bounding box of all letters and arrow cells
    plus a one-cell margin on every side. An arrow cell is the cell directly
    before a word's first letter (left of a horizontal word, above any other).
    Empty cells and cells outside the working grid are written as ``"#"``.

    Args:
        puz: ``{"grid": ..., "placed": [...]}`` as built by ``generate_puzzle``.
        difficulty: Copied into the result unchanged.
        rewards: Copied into the result; defaults to
            ``{"coins": 50, "stars": 2, "hints": 1}``.

    Returns:
        Dict with ``gridv2`` (list of row strings), ``words`` (per-word
        metadata with coordinates relative to the cropped grid),
        ``difficulty`` and ``rewards``.
    """
    if rewards is None:
        rewards = {"coins": 50, "stars": 2, "hints": 1}

    grid = puz["grid"]
    entries = puz["placed"]
    height, width = len(grid), len(grid[0])

    def arrow_cell(entry):
        # The arrow sits one step before the first letter.
        if entry["direction"] == "horizontal":
            return entry["row"], entry["col"] - 1
        return entry["row"] - 1, entry["col"]

    # Collect the rows and columns of every letter cell and arrow cell.
    used_rows, used_cols = [], []
    for entry in entries:
        across = entry["direction"] == "horizontal"
        for i in range(len(entry["word"])):
            used_rows.append(entry["row"] if across else entry["row"] + i)
            used_cols.append(entry["col"] + i if across else entry["col"])
        arrow_r, arrow_c = arrow_cell(entry)
        used_rows.append(arrow_r)
        used_cols.append(arrow_c)

    # Bounding box with a one-cell margin all around.
    min_r, max_r = min(used_rows) - 1, max(used_rows) + 1
    min_c, max_c = min(used_cols) - 1, max(used_cols) + 1

    def symbol(r, c):
        if 0 <= r < height and 0 <= c < width:
            cell = grid[r][c]
            return "#" if cell == EMPTY else cell
        return "#"

    gridv2 = [
        "".join(symbol(r, c) for c in range(min_c, max_c + 1))
        for r in range(min_r, max_r + 1)
    ]

    words_out = []
    for entry in entries:
        arrow_r, arrow_c = arrow_cell(entry)
        words_out.append({
            "word": entry["word"],
            "clue": entry["clue"],
            "startRow": entry["row"] - min_r,
            "startCol": entry["col"] - min_c,
            "direction": entry["direction"],
            "answer": entry["word"],
            "arrowRow": arrow_r - min_r,
            "arrowCol": arrow_c - min_c,
        })

    return {"gridv2": gridv2, "words": words_out, "difficulty": difficulty, "rewards": rewards}
|
|
|
|
|
|
def list_models(base_url):
    """Return the model ids reported by ``{base_url}/models``.

    Best effort: any failure while fetching or decoding the response yields
    an empty list. Entries without a truthy ``id`` are left out.
    """
    try:
        listing = json.loads(http_get(f"{base_url}/models").decode("utf-8"))
        ids = (entry.get("id") for entry in listing.get("data", []))
        return [model_id for model_id in ids if model_id]
    except Exception:
        return []
|
|
|
|
|
|
def llm_make_wordcluemap(base_url, model, title, desc, n_words=12):
    """Ask the chat model for a ``{WORD: clue}`` map themed on a news item.

    One request is sent to ``{base_url}/chat/completions`` with the theme
    *title* and the first 260 characters of *desc*. When the sanitised reply
    holds fewer than ``MIN_ACCEPT_WORDS`` entries, a second "repair" request
    asks the model to rewrite its own reply; the repaired map is used only
    when it contains more entries than the first one.

    Returns:
        The sanitised ``{WORD: clue}`` dict (possibly still smaller than
        ``MIN_ACCEPT_WORDS``; the caller decides whether to accept it).
    """
    endpoint = f"{base_url}/chat/completions"

    def ask(user_prompt):
        # Send one chat request; return the raw reply text and its sanitised map.
        reply = http_post_json(endpoint, {
            "model": model,
            "temperature": 0.7,
            "messages": [
                {"role": "system", "content": "Return STRICT JSON object only."},
                {"role": "user", "content": user_prompt},
            ],
        })
        text = reply["choices"][0]["message"]["content"]
        return text, sanitize_wordcluemap(extract_first_json(text))

    prompt = f"""
Geef ALLEEN een JSON object terug (geen array, geen markdown).
Formaat exact:
{{
"WOORD": "clue",
...
}}

REGELS:
- WOORD: alleen letters A-Z, geen streepjes/cijfers, lengte 3..7.
- Gebruik KORTE, GEBRUIKELIJKE Nederlandse woorden (geen jargon, geen moeilijke termen).
- Clue: korte, duidelijke hint in het Nederlands.
- Maak {n_words} items.
Thema: {title}
Context: {desc[:260]}
""".strip()

    content, words = ask(prompt)
    if len(words) >= MIN_ACCEPT_WORDS:
        return words

    # Too few usable words: have the model convert its own reply once.
    repair = f"""
Zet dit om naar een STRICT JSON OBJECT (geen array) "WOORD":"clue".
KRITIEK:
- WOORD: A-Z only, lengte 3..7. GEEN lange woorden!
- Gebruik ALLEEN korte, bekende Nederlandse woorden bij twijfel.
- Vervang ongeldige/moeilijke woorden door veelvoorkomende synoniemen.
Input:
{content}
""".strip()

    _, repaired = ask(repair)
    return repaired if len(repaired) > len(words) else words
|
|
|
|
|
|
def main():
    """Generate today's crossword files plus an ``index.json`` listing them.

    Configuration comes from the environment:
      - ``LM_STUDIO_BASE_URL``: base URL of the chat-completions API.
      - ``OUT_DIR``: output directory (created when missing).
      - ``PUZZLES_PER_DAY``: number of generation attempts.
      - ``LM_MODEL``: model id; defaults to the first id the server lists.

    For every attempt a random news item is picked, a word/clue map is
    requested from the model and laid out. Attempts with fewer than
    ``MIN_ACCEPT_WORDS`` words or fewer than 7 placed words are skipped.
    A failing feed or a failing model request is reported on stderr and
    skipped, so the remaining attempts and the index are still produced.

    Raises:
        SystemExit: when no RSS item could be fetched from any feed.
    """
    import sys  # local import: only main reports diagnostics on stderr

    base_url = env("LM_STUDIO_BASE_URL", "http://192.168.1.159:1234/v1")
    out_dir = env("OUT_DIR", "/data/puzzles")
    per_day = int(env("PUZZLES_PER_DAY", "3"))
    today = dt.date.today().isoformat()
    # Seeding with the date makes a re-run on the same day repeat its choices.
    rnd = random.Random(today)

    os.makedirs(out_dir, exist_ok=True)

    items = []
    for f in FEEDS:
        try:
            items.extend(fetch_rss_items(f))
        except Exception as exc:
            # Best effort: one broken feed must not stop the others.
            print(f"Skipping feed {f}: {exc!r}", file=sys.stderr)
    if not items:
        raise SystemExit("No RSS items found")

    models = list_models(base_url)
    model = env("LM_MODEL", models[0] if models else "model-identifier")

    made = 0
    for idx in range(1, per_day + 1):
        title, desc = rnd.choice(items)
        slug = safe_slug(title)

        try:
            wc = llm_make_wordcluemap(base_url, model, title, desc, n_words=TARGET_WORDS)
        except Exception as exc:
            # A timeout or malformed reply only costs this attempt.
            print(f"Puzzle {idx}: model request failed for {title!r}: {exc!r}", file=sys.stderr)
            continue
        # Stricter validation: need more words since they're shorter
        if len(wc) < MIN_ACCEPT_WORDS:
            continue

        puz = generate_puzzle(wc, rnd)
        # Require at least 7 placed words for a decent puzzle
        if not puz or len(puz["placed"]) < 7:
            continue

        exported = export_format(puz, difficulty=1, rewards={"coins": 50, "stars": 2, "hints": 1})
        fn = f"crossword_{today}_{idx:02d}_{slug}.json"
        path = os.path.join(out_dir, fn)
        with open(path, "w", encoding="utf-8") as fp:
            json.dump(exported, fp, ensure_ascii=False, indent=2)
        made += 1

    # index.json (handy for the frontend): lists every crossword file for today
    files = sorted([f for f in os.listdir(out_dir) if f.startswith(f"crossword_{today}_") and f.endswith(".json")])
    with open(os.path.join(out_dir, "index.json"), "w", encoding="utf-8") as fp:
        json.dump({"date": today, "files": files}, fp, ensure_ascii=False, indent=2)

    print(f"Generated {made} puzzles for {today}")
|
|
|
|
|
|
# Run the generator only when executed as a script, not when imported.
if __name__ == "__main__":
    main()