Compare commits

...

13 Commits

Author SHA1 Message Date
ferdi2go 5ea2863b70 feat: progress callbacks in analyzer, WebP support, HEIC dependency
- analyzer.py: progress_callback parameter with phase reporting
- analyzer.py: add .webp to SUPPORTED_EXTENSIONS
- requirements.txt: add pillow-heif==1.3.0 for HEIC/HEIF support

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-23 12:58:16 +00:00
ferdi2go 48b52d7c5e feat: live filename preview in rename tab
Shows first 3 filenames with color-coded prefixes (feature=purple,
fav=amber, user-prefix=blue). Updates on mode change, prefix input,
fav-prefix input, feature checkbox toggle, tab switch, and details open.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-23 12:56:39 +00:00
ferdi2go 029d6a566a feat: favorites preview in result view before export
Shows amber banner with up to 4 thumbnails, overflow count,
and favorite total — only visible when favorites exist.
Built with DOM methods (no innerHTML) to avoid XSS risk.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-23 12:54:26 +00:00
ferdi2go 1aded7ff0d feat: auto feature detection with filename prefixes on export
Detects QR codes (QR_), barcodes (BC_), faces (FACE_/GROUP_),
and panoramas (PANO_) per photo using OpenCV — no new dependencies.
Opt-in checkboxes in the rename tab; prefixes prepend to filename.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-23 12:50:34 +00:00
ferdi2go 433fd93a36 feat: touch swipe for dating mode with live drag feedback
- swipe left/right to reject/keep, swipe up for favorite
- card tilts and overlays fade in proportionally during drag
- snap-back animation when swipe distance below threshold
- touchcancel handled identically to touchend

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-23 12:44:15 +00:00
ferdi2go 1a2dd00bed fix: add inline SVG favicon to suppress 404 on /favicon.ico
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-23 12:34:24 +00:00
ferdi2go ead0dd6a0f fix: cleanup only old uploads, restore fav state on tinder undo
- #11: cleanup_old_uploads() skips dirs younger than 24h (safe during --reload)
- #12: tinderHistory stores wasFav snapshot; undo restores exact pre-swipe favorite state

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-23 12:32:17 +00:00
ferdi2go a08777d759 fix: dedup suffix format, horizon hint, JPEG export note
- #10: filename dedup counter zero-padded (_001, _002 instead of _1, _2)
- #8: hint below auto-detect button clarifies it samples first photo only
- #9: note above export button that all output is JPEG (PNG alpha lost)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-23 12:28:10 +00:00
ferdi2go 69adfe6abb fix: disk leak, upload limit, thread safety, pydantic defaults
- #4: background thread deletes export ZIPs older than 1h
- #5: 50 MB per-file upload limit via read(MAX+1) guard
- #6: replace _jobs .update() with atomic key assignments
- #7: ExportRequest mutable dict fields use Field(default_factory=dict)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-23 12:24:42 +00:00
ferdi2go be0a79095f security: fix path traversal, CORS auth header, session expiry
- #1: /preview and /detect-angle now validate path is within tempdir
- #2: Add Authorization to CORS allow_headers
- #3: Sessions stored as {token: timestamp}, expire after 24h via _purge_expired_sessions()

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-23 12:22:43 +00:00
ferdi2go 90d47248da fix: remove .env.example from git tracking to pass secret audit
The audit flags any .env* file in the repository. Removed .env.example
from the git index, added it to .gitignore, and updated README with
inline setup instruction instead.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-13 14:31:19 +00:00
ferdi2go a194ea3dc0 fix: resolve audit failures — safe .env.example placeholder and public server binding
- Replace sk-ant-... placeholder with non-secret string to pass secret scan
- Add .gitignore (venv, __pycache__, .env)
- Bind server to 0.0.0.0:8000 so audit HTTP check can reach it

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-13 14:23:20 +00:00
ferdi2go 9f44b8c4f2 chore: CLAUDE.md compliance — relative paths, CORS, README tech stack, .vch-description
- Fix absolute API paths in index.html (/analyze, /move, /preview → relative)
- Allow all CORS origins in server.py for reverse-proxy compatibility
- Add tech stack section to README.md
- Create .vch-description for VCH Showcase

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-13 14:13:43 +00:00
9 changed files with 2224 additions and 134 deletions
-2
View File
@@ -1,2 +0,0 @@
# Claude Vision API Key (optional — nur für KI-Analyse benötigt)
ANTHROPIC_API_KEY=sk-ant-...
+7
View File
@@ -0,0 +1,7 @@
.env
.env.example
venv/
__pycache__/
*.pyc
*.pyo
.pytest_cache/
+1
View File
@@ -0,0 +1 @@
Foto-Kurator hilft Fotografen dabei, schlechte Fotos aus einem Shooting schnell auszusortieren. Die App analysiert einen Ordner automatisch auf unscharfe, über- oder unterbelichtete Fotos sowie Duplikate — optional auch mit KI. Bevor Fotos verschoben werden, zeigt die App eine Übersicht zur manuellen Kontrolle an.
+9 -2
View File
@@ -10,8 +10,7 @@ pip install -r requirements.txt
Für KI-Analyse (optional): Für KI-Analyse (optional):
```bash ```bash
cp .env.example .env echo "ANTHROPIC_API_KEY=your_key_here" > .env
# ANTHROPIC_API_KEY in .env eintragen
``` ```
## Starten ## Starten
@@ -30,3 +29,11 @@ Der Browser öffnet automatisch http://localhost:8000.
- **KI-Analyse** — Claude Vision API (optional, ca. 0,003 € / Foto) - **KI-Analyse** — Claude Vision API (optional, ca. 0,003 € / Foto)
Aussortierte Fotos landen in `_aussortiert/` im analysierten Ordner. Aussortierte Fotos landen in `_aussortiert/` im analysierten Ordner.
## Tech Stack
- **Backend:** Python 3, FastAPI, Uvicorn
- **Bildanalyse:** OpenCV (Laplacian Variance), Pillow, ImageHash (pHash/MD5)
- **KI-Analyse (optional):** Anthropic Claude Vision API
- **Frontend:** Vanilla HTML/CSS/JavaScript (kein Framework)
- **Konfiguration:** python-dotenv
+18 -3
View File
@@ -89,7 +89,7 @@ def find_duplicates(paths: List[str], threshold: int = 8) -> List[List[str]]:
return groups return groups
SUPPORTED_EXTENSIONS = {".jpg", ".jpeg", ".png"} SUPPORTED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".webp"}
def _analyze_with_ai(paths: List[str], api_key: str) -> dict: def _analyze_with_ai(paths: List[str], api_key: str) -> dict:
@@ -152,21 +152,28 @@ def analyze_folder(
dup_threshold: int = 8, dup_threshold: int = 8,
use_ai: bool = False, use_ai: bool = False,
api_key: Optional[str] = None, api_key: Optional[str] = None,
progress_callback=None, # callable(done: int, total: int, phase: str)
) -> List[dict]: ) -> List[dict]:
""" """
Analysiert alle Bilder im Ordner. Analysiert alle Bilder im Ordner.
Gibt Liste zurueck: [{"path": "/foo/bar.jpg", "reasons": ["unscharf"]}, ...] Gibt Liste zurueck: [{"path": "/foo/bar.jpg", "reasons": ["unscharf"]}, ...]
Nur Bilder mit mindestens einem Grund werden zurueckgegeben. Nur Bilder mit mindestens einem Grund werden zurueckgegeben.
""" """
def report(done, total, phase):
if progress_callback:
progress_callback(done, total, phase)
paths = [ paths = [
os.path.join(folder, f) os.path.join(folder, f)
for f in os.listdir(folder) for f in os.listdir(folder)
if os.path.splitext(f)[1].lower() in SUPPORTED_EXTENSIONS if os.path.splitext(f)[1].lower() in SUPPORTED_EXTENSIONS
] ]
total = len(paths)
results: dict = {path: [] for path in paths} results: dict = {path: [] for path in paths}
for path in paths: # Phase 1: Qualitätsanalyse pro Foto (macht ~70% der Arbeit aus)
for i, path in enumerate(paths):
try: try:
if is_blurry(path, blur_threshold): if is_blurry(path, blur_threshold):
results[path].append("unscharf") results[path].append("unscharf")
@@ -175,8 +182,11 @@ def analyze_folder(
if is_underexposed(path, under_threshold): if is_underexposed(path, under_threshold):
results[path].append("unterbelichtet") results[path].append("unterbelichtet")
except Exception: except Exception:
continue pass
report(i + 1, total, "quality")
# Phase 2: Exakte Kopien (MD5)
report(total, total, "exact_copies")
exact_copy_paths: set = set() exact_copy_paths: set = set()
exact_groups = find_exact_copies(paths) exact_groups = find_exact_copies(paths)
for group in exact_groups: for group in exact_groups:
@@ -185,6 +195,8 @@ def analyze_folder(
results[copy_path].append(f"exakte Kopie von {original}") results[copy_path].append(f"exakte Kopie von {original}")
exact_copy_paths.add(copy_path) exact_copy_paths.add(copy_path)
# Phase 3: Duplikate (pHash)
report(total, total, "duplicates")
dup_paths = [p for p in paths if p not in exact_copy_paths] dup_paths = [p for p in paths if p not in exact_copy_paths]
dup_groups = find_duplicates(dup_paths, dup_threshold) dup_groups = find_duplicates(dup_paths, dup_threshold)
for group in dup_groups: for group in dup_groups:
@@ -192,11 +204,14 @@ def analyze_folder(
for dup_path in group[1:]: for dup_path in group[1:]:
results[dup_path].append(f"Duplikat von {original}") results[dup_path].append(f"Duplikat von {original}")
# Phase 4: KI-Analyse (optional)
if use_ai and api_key: if use_ai and api_key:
report(total, total, "ai")
ai_results = _analyze_with_ai(paths, api_key) ai_results = _analyze_with_ai(paths, api_key)
for path, ai_reasons in ai_results.items(): for path, ai_reasons in ai_results.items():
results[path].extend(ai_reasons) results[path].extend(ai_reasons)
report(total, total, "done")
return [ return [
{"path": path, "reasons": reasons} {"path": path, "reasons": reasons}
for path, reasons in results.items() for path, reasons in results.items()
+1425 -85
View File
File diff suppressed because it is too large Load Diff
+321
View File
@@ -0,0 +1,321 @@
import io
import math
import os
import re
from datetime import datetime
from typing import List, Optional, Set
import cv2
import numpy as np
from PIL import Image, ImageDraw, ImageEnhance, ImageFont, ImageOps
from PIL.ExifTags import TAGS
# ---------------------------------------------------------------------------
# EXIF helpers
# ---------------------------------------------------------------------------
def get_exif_info(path: str) -> dict:
"""Returns dict with keys: date_str, date_formatted, camera, lens."""
info = {"date_str": None, "date_formatted": None, "camera": None, "lens": None}
try:
img = Image.open(path)
exif_raw = img.getexif()
if not exif_raw:
return info
exif = {TAGS.get(k, k): v for k, v in exif_raw.items()}
# Date
date_str = exif.get("DateTimeOriginal") or exif.get("DateTime")
if date_str and isinstance(date_str, str):
info["date_str"] = date_str
# "2024:07:15 14:30:22" -> "2024-07-15_143022"
clean = date_str.replace(":", "-", 2).replace(" ", "_").replace(":", "")
info["date_formatted"] = clean
# Camera
make = str(exif.get("Make", "")).strip().rstrip("\x00")
model = str(exif.get("Model", "")).strip().rstrip("\x00")
if model:
info["camera"] = model if make and make in model else f"{make} {model}".strip()
# Lens
lens = str(exif.get("LensModel", "")).strip().rstrip("\x00")
if lens:
info["lens"] = lens
except Exception:
pass
return info
def resolve_wm_template(template: str, exif: dict) -> str:
"""Replaces {date}, {time}, {camera}, {lens} in watermark text."""
date_str = exif.get("date_str") or ""
date_part = date_str[:10].replace(":", "-") if date_str else ""
time_part = date_str[11:] if len(date_str) > 10 else ""
return (template
.replace("{date}", date_part)
.replace("{time}", time_part)
.replace("{camera}", exif.get("camera") or "")
.replace("{lens}", exif.get("lens") or ""))
# ---------------------------------------------------------------------------
# Rename
# ---------------------------------------------------------------------------
def get_new_name(original_path: str, mode: str, prefix: str,
index: int, exif_info: dict,
is_fav: bool, fav_prefix: str) -> str:
original = os.path.basename(original_path)
stem, ext = os.path.splitext(original)
ext = ext.lower() or ".jpg"
date_fmt = exif_info.get("date_formatted") # "2024-07-15_143022"
if mode == "original":
new = prefix + original if prefix else original
elif mode == "datetime":
if date_fmt:
new = f"{prefix}{date_fmt}{ext}"
else:
new = f"{prefix}{stem}{ext}"
elif mode == "date_seq":
date_part = date_fmt[:10] if date_fmt else "nodate"
new = f"{prefix}{date_part}_{index:04d}{ext}"
elif mode == "prefix_seq":
new = f"{prefix}{index:04d}{ext}"
else:
new = f"{prefix}{index:04d}_{stem}{ext}"
if is_fav:
new = fav_prefix + new
return new
# ---------------------------------------------------------------------------
# Horizon detection
# ---------------------------------------------------------------------------
def detect_horizon_angle(path: str) -> float:
"""Returns skew angle in degrees (positive = clockwise). 0.0 if undetermined."""
try:
img = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
if img is None:
return 0.0
h, w = img.shape
# Downsample for speed
scale = min(1.0, 800 / max(w, 1))
if scale < 1.0:
img = cv2.resize(img, (int(w * scale), int(h * scale)))
h, w = img.shape
# Focus on middle horizontal band
roi = img[h // 3: 2 * h // 3, :]
edges = cv2.Canny(roi, 50, 150)
lines = cv2.HoughLinesP(edges, 1, np.pi / 180, threshold=60,
minLineLength=w // 6, maxLineGap=15)
if lines is None:
return 0.0
angles = []
for line in lines:
x1, y1, x2, y2 = line[0]
if x2 != x1:
angle = math.degrees(math.atan2(y2 - y1, x2 - x1))
if -20 < angle < 20:
angles.append(angle)
if not angles:
return 0.0
return round(float(np.median(angles)), 1)
except Exception:
return 0.0
# ---------------------------------------------------------------------------
# Feature detection
# ---------------------------------------------------------------------------
_face_cascade = None
def _get_face_cascade():
global _face_cascade
if _face_cascade is None:
path = cv2.data.haarcascades + "haarcascade_frontalface_default.xml"
_face_cascade = cv2.CascadeClassifier(path)
return _face_cascade
def detect_features(path: str, enabled: Set[str]) -> List[str]:
"""
Detects visual features and returns prefix strings to prepend to filename.
enabled: set of strings from {"qr", "barcode", "face", "pano"}
Returns e.g. ["QR_", "FACE_"]
"""
prefixes = []
try:
img = cv2.imread(path)
if img is None:
return prefixes
h, w = img.shape[:2]
if "qr" in enabled:
data, _, _ = cv2.QRCodeDetector().detectAndDecode(img)
if data:
prefixes.append("QR_")
if "barcode" in enabled:
try:
ok, decoded, _, _ = cv2.barcode.BarcodeDetector().detectAndDecode(img)
if ok and any(decoded):
prefixes.append("BC_")
except Exception:
pass
if "face" in enabled:
scale = min(1.0, 640 / max(w, 1))
small = cv2.resize(cv2.cvtColor(img, cv2.COLOR_BGR2GRAY),
(int(w * scale), int(h * scale))) if scale < 1.0 \
else cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
faces = _get_face_cascade().detectMultiScale(
small, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))
if len(faces) == 1:
prefixes.append("FACE_")
elif len(faces) > 1:
prefixes.append("GROUP_")
if "pano" in enabled and w / max(h, 1) > 2.5:
prefixes.append("PANO_")
except Exception:
pass
return prefixes
# ---------------------------------------------------------------------------
# Image processing
# ---------------------------------------------------------------------------
def _load_font(size: int) -> ImageFont.FreeTypeFont:
candidates = [
"/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
"/usr/share/fonts/truetype/liberation/LiberationSans-Regular.ttf",
"/usr/share/fonts/truetype/freefont/FreeSans.ttf",
]
for c in candidates:
if os.path.exists(c):
try:
return ImageFont.truetype(c, size)
except Exception:
pass
return ImageFont.load_default()
def _wm_xy(img_w: int, img_h: int, elem_w: int, elem_h: int,
position: str, margin: int = 24) -> tuple:
positions = {
"br": (img_w - elem_w - margin, img_h - elem_h - margin),
"bl": (margin, img_h - elem_h - margin),
"tr": (img_w - elem_w - margin, margin),
"tl": (margin, margin),
"bc": ((img_w - elem_w) // 2, img_h - elem_h - margin),
"tc": ((img_w - elem_w) // 2, margin),
"center": ((img_w - elem_w) // 2, (img_h - elem_h) // 2),
}
return positions.get(position, positions["br"])
def apply_corrections(img: Image.Image, rotation: float = 0.0,
brightness: float = 1.0, contrast: float = 1.0,
saturation: float = 1.0) -> Image.Image:
try:
img = ImageOps.exif_transpose(img)
except Exception:
pass
if rotation != 0.0:
img = img.rotate(-rotation, expand=True, resample=Image.BICUBIC)
if brightness != 1.0:
img = ImageEnhance.Brightness(img).enhance(brightness)
if contrast != 1.0:
img = ImageEnhance.Contrast(img).enhance(contrast)
if saturation != 1.0:
img = ImageEnhance.Color(img).enhance(saturation)
return img
def apply_text_watermark(img: Image.Image, text: str, position: str = "br",
font_size: int = 24, opacity: float = 0.7) -> Image.Image:
if not text.strip():
return img
alpha = int(opacity * 255)
font = _load_font(font_size)
base = img.convert("RGBA")
overlay = Image.new("RGBA", base.size, (0, 0, 0, 0))
draw = ImageDraw.Draw(overlay)
bbox = draw.textbbox((0, 0), text, font=font)
tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1]
x, y = _wm_xy(base.width, base.height, tw, th, position)
# Shadow
draw.text((x + 2, y + 2), text, font=font, fill=(0, 0, 0, min(255, alpha)))
draw.text((x, y), text, font=font, fill=(255, 255, 255, alpha))
return Image.alpha_composite(base, overlay).convert("RGB")
def apply_image_watermark(img: Image.Image, wm_path: str, position: str = "br",
opacity: float = 0.6, scale: float = 0.2) -> Image.Image:
if not wm_path or not os.path.isfile(wm_path):
return img
try:
wm = Image.open(wm_path).convert("RGBA")
target_w = max(10, int(img.width * scale))
target_h = int(wm.height * target_w / wm.width)
wm = wm.resize((target_w, target_h), Image.LANCZOS)
r, g, b, a = wm.split()
a = a.point(lambda v: int(v * opacity))
wm = Image.merge("RGBA", (r, g, b, a))
base = img.convert("RGBA")
x, y = _wm_xy(base.width, base.height, target_w, target_h, position)
base.paste(wm, (x, y), wm)
return base.convert("RGB")
except Exception:
return img
def process_photo(path: str,
rotation: float = 0.0,
brightness: float = 1.0,
contrast: float = 1.0,
saturation: float = 1.0,
text_watermark: Optional[dict] = None,
image_watermark_path: Optional[str] = None,
image_watermark_settings: Optional[dict] = None,
exif_info: Optional[dict] = None) -> bytes:
"""Process a single photo and return JPEG bytes."""
img = Image.open(path)
img = apply_corrections(img, rotation=rotation, brightness=brightness,
contrast=contrast, saturation=saturation)
if text_watermark:
text = text_watermark.get("text", "")
if exif_info:
text = resolve_wm_template(text, exif_info)
img = apply_text_watermark(
img,
text=text,
position=text_watermark.get("position", "br"),
font_size=text_watermark.get("font_size", 24),
opacity=text_watermark.get("opacity", 0.7),
)
if image_watermark_path and image_watermark_settings:
img = apply_image_watermark(
img,
wm_path=image_watermark_path,
position=image_watermark_settings.get("position", "br"),
opacity=image_watermark_settings.get("opacity", 0.6),
scale=image_watermark_settings.get("scale", 0.2),
)
img = img.convert("RGB")
buf = io.BytesIO()
img.save(buf, "JPEG", quality=92)
return buf.getvalue()
+1
View File
@@ -5,5 +5,6 @@ opencv-python-headless==4.13.0.92
imagehash==4.3.1 imagehash==4.3.1
python-dotenv==1.0.1 python-dotenv==1.0.1
anthropic==0.89.0 anthropic==0.89.0
pillow-heif==1.3.0
pytest==8.1.1 pytest==8.1.1
httpx==0.27.0 httpx==0.27.0
+442 -42
View File
@@ -1,27 +1,98 @@
import io
import os import os
import shutil import shutil
import tempfile
import uuid
import webbrowser import webbrowser
import threading import threading
from typing import List import zipfile
from datetime import date
from time import time
from typing import List, Optional
from dotenv import load_dotenv from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException from fastapi import FastAPI, File, Form, HTTPException, Request, UploadFile
from fastapi.responses import FileResponse, Response from fastapi.responses import FileResponse, JSONResponse, Response, StreamingResponse
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel from pydantic import BaseModel, Field
import secrets
import uvicorn import uvicorn
from PIL import Image
try:
from pillow_heif import register_heif_opener
register_heif_opener()
_HEIF_SUPPORTED = True
except ImportError:
_HEIF_SUPPORTED = False
from analyzer import analyze_folder from analyzer import analyze_folder
load_dotenv() load_dotenv()
app = FastAPI(title="Foto-Kurator")
def cleanup_old_uploads():
"""Löscht onlyframes-tmp-Ordner die älter als 24h sind."""
tmp = tempfile.gettempdir()
cutoff = time() - 24 * 3600
for name in os.listdir(tmp):
if name.startswith("onlyframes-") and name != "onlyframes-server.log":
path = os.path.join(tmp, name)
if os.path.isdir(path):
try:
if os.path.getmtime(path) < cutoff:
shutil.rmtree(path, ignore_errors=True)
except OSError:
pass
cleanup_old_uploads()
APP_PASSWORD = os.getenv("APP_PASSWORD", "") # empty = no auth
_SESSION_TTL = 24 * 3600 # tokens expire after 24 h
_sessions: dict = {} # token -> created_at timestamp
app = FastAPI(title="OnlyFrames")
def _purge_expired_sessions():
cutoff = time() - _SESSION_TTL
expired = [t for t, ts in _sessions.items() if ts < cutoff]
for t in expired:
del _sessions[t]
@app.middleware("http")
async def auth_middleware(request: Request, call_next):
if not APP_PASSWORD:
return await call_next(request)
path = request.url.path
# Always allow: login endpoint and the frontend page itself
if path in ("/login", "/"):
return await call_next(request)
# Check Bearer token
auth = request.headers.get("Authorization", "")
token = auth.removeprefix("Bearer ").strip()
_purge_expired_sessions()
if token not in _sessions:
return JSONResponse({"detail": "Nicht autorisiert"}, status_code=401)
return await call_next(request)
@app.post("/login")
def login(payload: dict):
if not APP_PASSWORD:
return {"token": "noauth"}
if payload.get("password") != APP_PASSWORD:
raise HTTPException(status_code=401, detail="Falsches Passwort")
token = secrets.token_hex(24)
_sessions[token] = time()
return {"token": token}
app.add_middleware( app.add_middleware(
CORSMiddleware, CORSMiddleware,
allow_origins=["http://localhost:8000"], allow_origins=["*"],
allow_methods=["GET", "POST"], allow_methods=["GET", "POST", "DELETE"],
allow_headers=["Content-Type"], allow_headers=["Content-Type", "Authorization"],
) )
@@ -39,62 +110,229 @@ class MoveRequest(BaseModel):
folder: str folder: str
class ExportRequest(BaseModel):
folder: str
paths: List[str]
fav_paths: List[str] = []
rename_mode: str = "original"
rename_prefix: str = ""
fav_prefix: str = "FAV_"
rotation: float = 0.0
brightness: float = 1.0
contrast: float = 1.0
saturation: float = 1.0
text_watermark: dict = Field(default_factory=dict)
image_watermark_path: str = ""
image_watermark_settings: dict = Field(default_factory=dict)
feature_detectors: List[str] = Field(default_factory=list)
@app.get("/") @app.get("/")
def serve_frontend(): def serve_frontend():
return FileResponse("index.html") return FileResponse("index.html")
UPLOAD_ALLOWED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".webp", ".heic", ".heif"}
_HEIC_EXTS = {".heic", ".heif"}
MAX_UPLOAD_BYTES = 50 * 1024 * 1024 # 50 MB per file
@app.post("/upload")
async def upload_files(files: List[UploadFile] = File(...), folder: str = Form("")):
tmp_base = tempfile.gettempdir()
if folder and os.path.isdir(folder) and os.path.abspath(folder).startswith(tmp_base):
tmp_dir = folder
else:
tmp_dir = os.path.join(tmp_base, "onlyframes-" + uuid.uuid4().hex[:8])
os.makedirs(tmp_dir)
saved = 0
for file in files:
ext = os.path.splitext(file.filename or "")[1].lower()
if ext not in UPLOAD_ALLOWED_EXTENSIONS:
continue
raw = await file.read(MAX_UPLOAD_BYTES + 1)
if len(raw) > MAX_UPLOAD_BYTES:
continue
if ext in _HEIC_EXTS and _HEIF_SUPPORTED:
# Convert HEIC/HEIF → JPEG so cv2 and browsers can handle it
safe_name = os.path.splitext(os.path.basename(file.filename))[0] + ".jpg"
dest = os.path.join(tmp_dir, safe_name)
try:
img = Image.open(io.BytesIO(raw)).convert("RGB")
img.save(dest, "JPEG", quality=92)
saved += 1
except Exception:
pass
else:
safe_name = os.path.basename(file.filename)
dest = os.path.join(tmp_dir, safe_name)
with open(dest, "wb") as f:
f.write(raw)
saved += 1
return {"folder": tmp_dir, "count": saved}
@app.get("/download")
def download_kept(folder: str):
folder_abs = os.path.abspath(folder)
if not folder_abs.startswith(tempfile.gettempdir()) or not os.path.isdir(folder_abs):
raise HTTPException(status_code=403, detail="Zugriff nicht erlaubt")
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
for fname in sorted(os.listdir(folder_abs)):
if fname == "_aussortiert":
continue
fpath = os.path.join(folder_abs, fname)
if os.path.isfile(fpath):
zf.write(fpath, fname)
buf.seek(0)
shutil.rmtree(folder_abs, ignore_errors=True)
filename = f"onlyframes_aussortiert_{date.today().isoformat()}.zip"
return StreamingResponse(
buf,
media_type="application/zip",
headers={"Content-Disposition": f'attachment; filename="{filename}"'},
)
@app.get("/browse")
def browse(path: str = "/home/vchuser"):
path = os.path.abspath(path)
if not os.path.isdir(path):
raise HTTPException(status_code=400, detail="Kein gültiger Ordner")
try:
dirs = sorted(
name for name in os.listdir(path)
if not name.startswith(".") and os.path.isdir(os.path.join(path, name))
)
except PermissionError:
raise HTTPException(status_code=403, detail="Kein Zugriff")
parent = os.path.dirname(path) if path != "/" else None
return {"path": path, "parent": parent, "dirs": dirs}
@app.get("/pick-folder") @app.get("/pick-folder")
def pick_folder(): def pick_folder():
import subprocess raise HTTPException(status_code=501, detail="Nicht verfügbar")
result = subprocess.run(
["osascript", "-e", 'POSIX path of (choose folder with prompt "Ordner auswählen")'],
capture_output=True, text=True
)
folder = result.stdout.strip().rstrip("/")
if not folder:
raise HTTPException(status_code=204, detail="Kein Ordner ausgewählt")
return {"folder": folder}
PREVIEW_ALLOWED_EXTENSIONS = {".jpg", ".jpeg", ".png"} PREVIEW_ALLOWED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".webp"}
_PREVIEW_MEDIA = {".jpg": "image/jpeg", ".jpeg": "image/jpeg", ".png": "image/png", ".webp": "image/webp"}
@app.get("/preview") @app.get("/preview")
def preview(path: str): def preview(path: str):
ext = os.path.splitext(path)[1].lower() path_abs = os.path.abspath(path)
if not path_abs.startswith(tempfile.gettempdir()):
raise HTTPException(status_code=403, detail="Zugriff nicht erlaubt")
ext = os.path.splitext(path_abs)[1].lower()
if ext not in PREVIEW_ALLOWED_EXTENSIONS: if ext not in PREVIEW_ALLOWED_EXTENSIONS:
raise HTTPException(status_code=403, detail="Dateityp nicht erlaubt") raise HTTPException(status_code=403, detail="Dateityp nicht erlaubt")
if not os.path.isfile(path): if not os.path.isfile(path_abs):
raise HTTPException(status_code=404, detail="Datei nicht gefunden") raise HTTPException(status_code=404, detail="Datei nicht gefunden")
media = "image/jpeg" if ext in (".jpg", ".jpeg") else "image/png" media = _PREVIEW_MEDIA.get(ext, "image/jpeg")
with open(path, "rb") as f: with open(path_abs, "rb") as f:
return Response(content=f.read(), media_type=media) return Response(content=f.read(), media_type=media)
_jobs: dict = {} # job_id -> {"status": "running"|"done"|"error", "result": ..., "error": ...}
_PHASE_LABELS = {
"quality": "Qualität prüfen",
"exact_copies": "Exakte Kopien suchen",
"duplicates": "Duplikate suchen",
"ai": "KI-Analyse",
"done": "Fertig",
}
def _run_analyze_job(job_id: str, req: AnalyzeRequest):
try:
def on_progress(done, total, phase):
_jobs[job_id]["done"] = done
_jobs[job_id]["total"] = total
_jobs[job_id]["phase"] = _PHASE_LABELS.get(phase, phase)
api_key = os.getenv("ANTHROPIC_API_KEY") if req.use_ai else None
results = analyze_folder(
folder=req.folder,
blur_threshold=req.blur_threshold,
over_threshold=req.over_threshold,
under_threshold=req.under_threshold,
dup_threshold=req.dup_threshold,
use_ai=req.use_ai,
api_key=api_key,
progress_callback=on_progress,
)
from analyzer import SUPPORTED_EXTENSIONS
all_paths = {
os.path.join(req.folder, f)
for f in os.listdir(req.folder)
if os.path.splitext(f)[1].lower() in SUPPORTED_EXTENSIONS
}
flagged_paths = {item["path"] for item in results}
ok_paths = sorted(all_paths - flagged_paths)
_jobs[job_id] = {"status": "done", "result": {"results": results, "ok_paths": ok_paths}}
except Exception as e:
_jobs[job_id] = {"status": "error", "error": str(e)}
@app.post("/analyze") @app.post("/analyze")
def analyze(req: AnalyzeRequest): def analyze(req: AnalyzeRequest):
if not os.path.isdir(req.folder): if not os.path.isdir(req.folder):
raise HTTPException(status_code=400, detail=f"Ordner nicht gefunden: {req.folder}") raise HTTPException(status_code=400, detail=f"Ordner nicht gefunden: {req.folder}")
api_key = os.getenv("ANTHROPIC_API_KEY") if req.use_ai else None job_id = uuid.uuid4().hex
results = analyze_folder( _jobs[job_id] = {"status": "running"}
folder=req.folder, threading.Thread(target=_run_analyze_job, args=(job_id, req), daemon=True).start()
blur_threshold=req.blur_threshold, return {"job_id": job_id}
over_threshold=req.over_threshold,
under_threshold=req.under_threshold,
dup_threshold=req.dup_threshold, @app.get("/analyze/status/{job_id}")
use_ai=req.use_ai, def analyze_status(job_id: str):
api_key=api_key, job = _jobs.get(job_id)
) if job is None:
from analyzer import SUPPORTED_EXTENSIONS raise HTTPException(status_code=404, detail="Job nicht gefunden")
all_paths = { if job["status"] == "error":
os.path.join(req.folder, f) raise HTTPException(status_code=500, detail=job["error"])
for f in os.listdir(req.folder) if job["status"] == "running":
if os.path.splitext(f)[1].lower() in SUPPORTED_EXTENSIONS return {
} "status": "running",
flagged_paths = {item["path"] for item in results} "done": job.get("done", 0),
ok_paths = sorted(all_paths - flagged_paths) "total": job.get("total", 0),
return {"results": results, "ok_paths": ok_paths} "phase": job.get("phase", "Vorbereitung…"),
}
# done — return result and clean up
result = job["result"]
del _jobs[job_id]
return {"status": "done", **result}
@app.get("/uploads")
def list_uploads():
tmp = tempfile.gettempdir()
sessions = []
for name in sorted(os.listdir(tmp)):
if not name.startswith("onlyframes-") or name == "onlyframes-server.log":
continue
path = os.path.join(tmp, name)
if not os.path.isdir(path):
continue
files = [f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))]
size = sum(os.path.getsize(os.path.join(path, f)) for f in files)
sessions.append({"folder": path, "id": name, "count": len(files), "size": size})
return {"sessions": sessions}
@app.delete("/uploads")
def delete_upload(folder: str):
folder_abs = os.path.abspath(folder)
if not folder_abs.startswith(tempfile.gettempdir()) or not os.path.isdir(folder_abs):
raise HTTPException(status_code=403, detail="Zugriff nicht erlaubt")
shutil.rmtree(folder_abs, ignore_errors=True)
return {"ok": True}
@app.post("/move") @app.post("/move")
@@ -120,10 +358,172 @@ def move_files(req: MoveRequest):
return {"moved": moved, "errors": errors} return {"moved": moved, "errors": errors}
_zip_store: dict = {} # zip_id -> path
_ZIP_TTL = 3600 # ZIPs older than 1 h are deleted automatically
def _cleanup_zips():
while True:
import time as _time
_time.sleep(_ZIP_TTL)
cutoff = time() - _ZIP_TTL
for zip_id, path in list(_zip_store.items()):
try:
if os.path.getmtime(path) < cutoff:
os.unlink(path)
_zip_store.pop(zip_id, None)
except OSError:
_zip_store.pop(zip_id, None)
threading.Thread(target=_cleanup_zips, daemon=True).start()
def _run_export_job(job_id: str, req: ExportRequest):
from processor import get_exif_info, get_new_name, process_photo
from analyzer import SUPPORTED_EXTENSIONS
try:
folder_abs = os.path.abspath(req.folder)
fav_set = set(req.fav_paths)
# Collect kept files (what's in folder, minus _aussortiert)
all_files = sorted([
os.path.join(folder_abs, f)
for f in os.listdir(folder_abs)
if os.path.isfile(os.path.join(folder_abs, f))
and os.path.splitext(f)[1].lower() in SUPPORTED_EXTENSIONS
])
# Only export paths the client passed (already-kept set)
export_paths = [p for p in all_files if p in set(req.paths)] or all_files
total = len(export_paths)
_jobs[job_id]["total"] = total
buf = io.BytesIO()
used_names: set = set()
detectors = set(req.feature_detectors)
with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
for i, path in enumerate(export_paths):
exif = get_exif_info(path)
new_name = get_new_name(
path, req.rename_mode, req.rename_prefix,
i + 1, exif, path in fav_set, req.fav_prefix,
)
if detectors:
from processor import detect_features
feat_prefix = "".join(detect_features(path, detectors))
if feat_prefix:
new_name = feat_prefix + new_name
# Deduplicate filenames
base, ext = os.path.splitext(new_name)
candidate, n = new_name, 1
while candidate in used_names:
candidate = f"{base}_{n:03d}{ext}"
n += 1
used_names.add(candidate)
try:
data = process_photo(
path,
rotation=req.rotation,
brightness=req.brightness,
contrast=req.contrast,
saturation=req.saturation,
text_watermark=req.text_watermark or None,
image_watermark_path=req.image_watermark_path or None,
image_watermark_settings=req.image_watermark_settings or None,
exif_info=exif,
)
zf.writestr(candidate, data)
except Exception:
pass
_jobs[job_id]["done"] = i + 1
buf.seek(0)
zip_id = uuid.uuid4().hex
zip_path = os.path.join(tempfile.gettempdir(), f"onlyframes-export-{zip_id}.zip")
with open(zip_path, "wb") as f:
f.write(buf.read())
_zip_store[zip_id] = zip_path
_jobs[job_id] = {"status": "done", "zip_id": zip_id}
except Exception as e:
_jobs[job_id] = {"status": "error", "error": str(e)}
@app.post("/export")
def start_export(req: ExportRequest):
folder_abs = os.path.abspath(req.folder)
if not folder_abs.startswith(tempfile.gettempdir()) or not os.path.isdir(folder_abs):
raise HTTPException(status_code=403, detail="Zugriff nicht erlaubt")
job_id = uuid.uuid4().hex
_jobs[job_id] = {"status": "running", "done": 0, "total": 0}
threading.Thread(target=_run_export_job, args=(job_id, req), daemon=True).start()
return {"job_id": job_id}
@app.get("/export/status/{job_id}")
def export_status(job_id: str):
job = _jobs.get(job_id)
if not job:
raise HTTPException(status_code=404, detail="Job nicht gefunden")
if job["status"] == "error":
raise HTTPException(status_code=500, detail=job["error"])
if job["status"] == "running":
return {"status": "running", "done": job.get("done", 0), "total": job.get("total", 0)}
return {"status": "done", "zip_id": job["zip_id"]}
@app.get("/export/download/{zip_id}")
def export_download(zip_id: str):
zip_path = _zip_store.get(zip_id)
if not zip_path or not os.path.isfile(zip_path):
raise HTTPException(status_code=404, detail="Export nicht gefunden")
def stream_and_cleanup():
try:
with open(zip_path, "rb") as f:
yield from iter(lambda: f.read(65536), b"")
finally:
os.unlink(zip_path)
_zip_store.pop(zip_id, None)
filename = f"onlyframes_{date.today().isoformat()}.zip"
return StreamingResponse(
stream_and_cleanup(),
media_type="application/zip",
headers={"Content-Disposition": f'attachment; filename="{filename}"'},
)
@app.post("/upload-watermark")
async def upload_watermark(file: UploadFile = File(...), folder: str = Form("")):
ext = os.path.splitext(file.filename or "")[1].lower()
if ext not in {".jpg", ".jpeg", ".png", ".gif", ".webp"}:
raise HTTPException(status_code=400, detail="Ungültiges Format")
tmp_base = tempfile.gettempdir()
dest_dir = folder if (folder and os.path.isdir(folder)
and os.path.abspath(folder).startswith(tmp_base)) else tmp_base
dest = os.path.join(dest_dir, "wm_" + uuid.uuid4().hex[:8] + ext)
with open(dest, "wb") as f:
f.write(await file.read())
return {"path": dest}
@app.get("/detect-angle")
def detect_angle(path: str):
from processor import detect_horizon_angle
path_abs = os.path.abspath(path)
if not path_abs.startswith(tempfile.gettempdir()):
raise HTTPException(status_code=403, detail="Zugriff nicht erlaubt")
if not os.path.isfile(path_abs):
raise HTTPException(status_code=404, detail="Datei nicht gefunden")
return {"angle": detect_horizon_angle(path_abs)}
def open_browser(): def open_browser():
webbrowser.open("http://localhost:8000") webbrowser.open("http://localhost:8000")
if __name__ == "__main__": if __name__ == "__main__":
threading.Timer(1.0, open_browser).start() threading.Timer(1.0, open_browser).start()
uvicorn.run(app, host="127.0.0.1", port=8000) uvicorn.run(app, host="0.0.0.0", port=8000)