Compare commits
23 Commits
8b50a85620
..
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 5ea2863b70 | |||
| 48b52d7c5e | |||
| 029d6a566a | |||
| 1aded7ff0d | |||
| 433fd93a36 | |||
| 1a2dd00bed | |||
| ead0dd6a0f | |||
| a08777d759 | |||
| 69adfe6abb | |||
| be0a79095f | |||
| 90d47248da | |||
| a194ea3dc0 | |||
| 9f44b8c4f2 | |||
| a90c542d9f | |||
| 2ee0d055fa | |||
| 3057538642 | |||
| 35dccd4f1b | |||
| 8a80021983 | |||
| 3dcf4bf5e8 | |||
| c58817becc | |||
| 9f4985a444 | |||
| 08dabfb57c | |||
| ccf878485d |
@@ -1,2 +0,0 @@
|
||||
# Claude Vision API Key (optional — nur für KI-Analyse benötigt)
|
||||
ANTHROPIC_API_KEY=sk-ant-...
|
||||
@@ -0,0 +1,7 @@
|
||||
.env
|
||||
.env.example
|
||||
venv/
|
||||
__pycache__/
|
||||
*.pyc
|
||||
*.pyo
|
||||
.pytest_cache/
|
||||
@@ -0,0 +1 @@
|
||||
Foto-Kurator hilft Fotografen dabei, schlechte Fotos aus einem Shooting schnell auszusortieren. Die App analysiert einen Ordner automatisch auf unscharfe, über- oder unterbelichtete Fotos sowie Duplikate — optional auch mit KI. Bevor Fotos verschoben werden, zeigt die App eine Übersicht zur manuellen Kontrolle an.
|
||||
@@ -0,0 +1,39 @@
|
||||
# Foto-Kurator
|
||||
|
||||
Automatisches Aussortieren von Fotos nach Qualitätskriterien.
|
||||
|
||||
## Setup
|
||||
|
||||
```bash
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
|
||||
Für KI-Analyse (optional):
|
||||
```bash
|
||||
echo "ANTHROPIC_API_KEY=your_key_here" > .env
|
||||
```
|
||||
|
||||
## Starten
|
||||
|
||||
```bash
|
||||
python server.py
|
||||
```
|
||||
|
||||
Der Browser öffnet automatisch http://localhost:8000.
|
||||
|
||||
## Kriterien
|
||||
|
||||
- **Unscharf** — Laplacian Variance (einstellbar)
|
||||
- **Überbelichtet / Unterbelichtet** — Durchschnittliche Helligkeit (einstellbar)
|
||||
- **Duplikate** — Perceptual Hashing (einstellbar)
|
||||
- **KI-Analyse** — Claude Vision API (optional, ca. 0,003 € / Foto)
|
||||
|
||||
Aussortierte Fotos landen in `_aussortiert/` im analysierten Ordner.
|
||||
|
||||
## Tech Stack
|
||||
|
||||
- **Backend:** Python 3, FastAPI, Uvicorn
|
||||
- **Bildanalyse:** OpenCV (Laplacian Variance), Pillow, ImageHash (pHash/MD5)
|
||||
- **KI-Analyse (optional):** Anthropic Claude Vision API
|
||||
- **Frontend:** Vanilla HTML/CSS/JavaScript (kein Framework)
|
||||
- **Konfiguration:** python-dotenv
|
||||
+50
-4
@@ -33,6 +33,28 @@ def is_underexposed(path: str, threshold: float = 30.0) -> bool:
|
||||
return _mean_brightness(path) < threshold
|
||||
|
||||
|
||||
def find_exact_copies(paths: List[str]) -> List[List[str]]:
    """Group byte-identical files by their MD5 digest.

    The first entry of each returned group is treated as the original and
    the remaining entries as exact copies. Files that cannot be read are
    skipped silently.
    """
    import hashlib

    by_digest: dict = {}
    for candidate in paths:
        md5 = hashlib.md5()
        try:
            with open(candidate, "rb") as fh:
                # Hash in 64 KiB chunks so large photos don't load into RAM at once.
                while True:
                    chunk = fh.read(65536)
                    if not chunk:
                        break
                    md5.update(chunk)
            by_digest.setdefault(md5.hexdigest(), []).append(candidate)
        except Exception:
            continue

    return [group for group in by_digest.values() if len(group) > 1]
|
||||
|
||||
|
||||
def find_duplicates(paths: List[str], threshold: int = 8) -> List[List[str]]:
|
||||
"""
|
||||
Findet Gruppen aehnlicher Bilder via perceptual hashing.
|
||||
@@ -67,7 +89,7 @@ def find_duplicates(paths: List[str], threshold: int = 8) -> List[List[str]]:
|
||||
return groups
|
||||
|
||||
|
||||
SUPPORTED_EXTENSIONS = {".jpg", ".jpeg", ".png"}
|
||||
SUPPORTED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".webp"}
|
||||
|
||||
|
||||
def _analyze_with_ai(paths: List[str], api_key: str) -> dict:
|
||||
@@ -130,21 +152,28 @@ def analyze_folder(
|
||||
dup_threshold: int = 8,
|
||||
use_ai: bool = False,
|
||||
api_key: Optional[str] = None,
|
||||
progress_callback=None, # callable(done: int, total: int, phase: str)
|
||||
) -> List[dict]:
|
||||
"""
|
||||
Analysiert alle Bilder im Ordner.
|
||||
Gibt Liste zurueck: [{"path": "/foo/bar.jpg", "reasons": ["unscharf"]}, ...]
|
||||
Nur Bilder mit mindestens einem Grund werden zurueckgegeben.
|
||||
"""
|
||||
def report(done, total, phase):
|
||||
if progress_callback:
|
||||
progress_callback(done, total, phase)
|
||||
|
||||
paths = [
|
||||
os.path.join(folder, f)
|
||||
for f in os.listdir(folder)
|
||||
if os.path.splitext(f)[1].lower() in SUPPORTED_EXTENSIONS
|
||||
]
|
||||
total = len(paths)
|
||||
|
||||
results: dict = {path: [] for path in paths}
|
||||
|
||||
for path in paths:
|
||||
# Phase 1: Qualitätsanalyse pro Foto (macht ~70% der Arbeit aus)
|
||||
for i, path in enumerate(paths):
|
||||
try:
|
||||
if is_blurry(path, blur_threshold):
|
||||
results[path].append("unscharf")
|
||||
@@ -153,19 +182,36 @@ def analyze_folder(
|
||||
if is_underexposed(path, under_threshold):
|
||||
results[path].append("unterbelichtet")
|
||||
except Exception:
|
||||
continue
|
||||
pass
|
||||
report(i + 1, total, "quality")
|
||||
|
||||
dup_groups = find_duplicates(paths, dup_threshold)
|
||||
# Phase 2: Exakte Kopien (MD5)
|
||||
report(total, total, "exact_copies")
|
||||
exact_copy_paths: set = set()
|
||||
exact_groups = find_exact_copies(paths)
|
||||
for group in exact_groups:
|
||||
original = os.path.basename(group[0])
|
||||
for copy_path in group[1:]:
|
||||
results[copy_path].append(f"exakte Kopie von {original}")
|
||||
exact_copy_paths.add(copy_path)
|
||||
|
||||
# Phase 3: Duplikate (pHash)
|
||||
report(total, total, "duplicates")
|
||||
dup_paths = [p for p in paths if p not in exact_copy_paths]
|
||||
dup_groups = find_duplicates(dup_paths, dup_threshold)
|
||||
for group in dup_groups:
|
||||
original = os.path.basename(group[0])
|
||||
for dup_path in group[1:]:
|
||||
results[dup_path].append(f"Duplikat von {original}")
|
||||
|
||||
# Phase 4: KI-Analyse (optional)
|
||||
if use_ai and api_key:
|
||||
report(total, total, "ai")
|
||||
ai_results = _analyze_with_ai(paths, api_key)
|
||||
for path, ai_reasons in ai_results.items():
|
||||
results[path].extend(ai_reasons)
|
||||
|
||||
report(total, total, "done")
|
||||
return [
|
||||
{"path": path, "reasons": reasons}
|
||||
for path, reasons in results.items()
|
||||
|
||||
+1775
File diff suppressed because it is too large
Load Diff
+321
@@ -0,0 +1,321 @@
|
||||
import io
|
||||
import math
|
||||
import os
|
||||
import re
|
||||
from datetime import datetime
|
||||
from typing import List, Optional, Set
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
from PIL import Image, ImageDraw, ImageEnhance, ImageFont, ImageOps
|
||||
from PIL.ExifTags import TAGS
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# EXIF helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def get_exif_info(path: str) -> dict:
    """Extract capture date, camera and lens information from a photo's EXIF.

    Returns a dict with keys: date_str, date_formatted, camera, lens.
    Values stay None when the corresponding EXIF tag is missing or the
    file cannot be read.
    """
    result = {"date_str": None, "date_formatted": None, "camera": None, "lens": None}
    try:
        exif_raw = Image.open(path).getexif()
        if not exif_raw:
            return result
        tags = {TAGS.get(tag_id, tag_id): value for tag_id, value in exif_raw.items()}

        # Capture date: "2024:07:15 14:30:22" -> "2024-07-15_143022"
        raw_date = tags.get("DateTimeOriginal") or tags.get("DateTime")
        if isinstance(raw_date, str) and raw_date:
            result["date_str"] = raw_date
            result["date_formatted"] = (
                raw_date.replace(":", "-", 2).replace(" ", "_").replace(":", "")
            )

        # Camera make/model; EXIF strings are often NUL-padded.
        make = str(tags.get("Make", "")).strip().rstrip("\x00")
        model = str(tags.get("Model", "")).strip().rstrip("\x00")
        if model:
            # Some vendors already repeat the make inside the model string.
            result["camera"] = model if make and make in model else f"{make} {model}".strip()

        lens = str(tags.get("LensModel", "")).strip().rstrip("\x00")
        if lens:
            result["lens"] = lens
    except Exception:
        # Corrupt file or a format without EXIF support: keep the defaults.
        pass
    return result
|
||||
|
||||
|
||||
def resolve_wm_template(template: str, exif: dict) -> str:
    """Fill the {date}, {time}, {camera} and {lens} placeholders in a watermark template.

    Missing EXIF values resolve to empty strings; the EXIF date string
    "YYYY:MM:DD HH:MM:SS" is split into a dashed date and a raw time part.
    """
    raw = exif.get("date_str") or ""
    substitutions = (
        ("{date}", raw[:10].replace(":", "-") if raw else ""),
        ("{time}", raw[11:] if len(raw) > 10 else ""),
        ("{camera}", exif.get("camera") or ""),
        ("{lens}", exif.get("lens") or ""),
    )
    resolved = template
    for placeholder, value in substitutions:
        resolved = resolved.replace(placeholder, value)
    return resolved
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Rename
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def get_new_name(original_path: str, mode: str, prefix: str,
                 index: int, exif_info: dict,
                 is_fav: bool, fav_prefix: str) -> str:
    """Build the export filename for one photo according to the rename mode.

    Modes: "original", "datetime", "date_seq", "prefix_seq"; any other mode
    falls back to "<prefix><index>_<stem><ext>". Favourites additionally
    get fav_prefix prepended. A missing extension defaults to ".jpg".
    """
    basename = os.path.basename(original_path)
    stem, ext = os.path.splitext(basename)
    ext = ext.lower() or ".jpg"
    date_fmt = exif_info.get("date_formatted")  # e.g. "2024-07-15_143022"

    if mode == "original":
        name = f"{prefix}{basename}" if prefix else basename
    elif mode == "datetime":
        # Fall back to the original stem when no EXIF date exists.
        name = f"{prefix}{date_fmt}{ext}" if date_fmt else f"{prefix}{stem}{ext}"
    elif mode == "date_seq":
        date_part = date_fmt[:10] if date_fmt else "nodate"
        name = f"{prefix}{date_part}_{index:04d}{ext}"
    elif mode == "prefix_seq":
        name = f"{prefix}{index:04d}{ext}"
    else:
        name = f"{prefix}{index:04d}_{stem}{ext}"

    return fav_prefix + name if is_fav else name
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Horizon detection
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def detect_horizon_angle(path: str) -> float:
    """Estimate the image's horizon skew in degrees (positive = clockwise).

    Runs Canny edge detection plus probabilistic Hough lines on the middle
    horizontal band of a downsampled grayscale copy and returns the median
    angle of the near-horizontal segments. Returns 0.0 when nothing can be
    determined or the file is unreadable.
    """
    try:
        gray = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
        if gray is None:
            return 0.0
        height, width = gray.shape
        # Work on at most ~800 px width for speed.
        factor = min(1.0, 800 / max(width, 1))
        if factor < 1.0:
            gray = cv2.resize(gray, (int(width * factor), int(height * factor)))
            height, width = gray.shape
        # The horizon usually crosses the middle third of the frame.
        band = gray[height // 3: 2 * height // 3, :]
        segments = cv2.HoughLinesP(cv2.Canny(band, 50, 150), 1, np.pi / 180,
                                   threshold=60, minLineLength=width // 6,
                                   maxLineGap=15)
        if segments is None:
            return 0.0
        candidates = []
        for segment in segments:
            x1, y1, x2, y2 = segment[0]
            if x1 == x2:
                continue  # vertical segment — no usable slope
            degrees = math.degrees(math.atan2(y2 - y1, x2 - x1))
            # Only near-horizontal lines plausibly belong to the horizon.
            if -20 < degrees < 20:
                candidates.append(degrees)
        if not candidates:
            return 0.0
        return round(float(np.median(candidates)), 1)
    except Exception:
        # Unreadable file or OpenCV failure: report "no skew".
        return 0.0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Feature detection
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Lazily constructed Haar cascade for frontal-face detection (module-level cache).
_face_cascade = None


def _get_face_cascade():
    """Return the shared frontal-face Haar cascade, creating it on first use."""
    global _face_cascade
    if _face_cascade is None:
        cascade_file = cv2.data.haarcascades + "haarcascade_frontalface_default.xml"
        _face_cascade = cv2.CascadeClassifier(cascade_file)
    return _face_cascade
|
||||
|
||||
|
||||
def detect_features(path: str, enabled: Set[str]) -> List[str]:
    """Detect visual features and return filename prefixes, e.g. ["QR_", "FACE_"].

    enabled is a subset of {"qr", "barcode", "face", "pano"}. Detection
    errors are swallowed; whatever was found before the error is returned.
    """
    found: List[str] = []
    try:
        img = cv2.imread(path)
        if img is None:
            return found
        h, w = img.shape[:2]

        if "qr" in enabled:
            payload, _, _ = cv2.QRCodeDetector().detectAndDecode(img)
            if payload:
                found.append("QR_")

        if "barcode" in enabled:
            # The barcode module is missing in some OpenCV builds — best effort.
            try:
                ok, values, _, _ = cv2.barcode.BarcodeDetector().detectAndDecode(img)
                if ok and any(values):
                    found.append("BC_")
            except Exception:
                pass

        if "face" in enabled:
            # Downscale to at most 640 px width before running the cascade.
            gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
            factor = min(1.0, 640 / max(w, 1))
            if factor < 1.0:
                gray = cv2.resize(gray, (int(w * factor), int(h * factor)))
            faces = _get_face_cascade().detectMultiScale(
                gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))
            if len(faces) == 1:
                found.append("FACE_")
            elif len(faces) > 1:
                found.append("GROUP_")

        # Very wide aspect ratios are treated as panoramas.
        if "pano" in enabled and w / max(h, 1) > 2.5:
            found.append("PANO_")

    except Exception:
        pass
    return found
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Image processing
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _load_font(size: int) -> ImageFont.FreeTypeFont:
    """Load a TrueType font at the given size, falling back to PIL's default.

    Tries a few common Linux font locations; the bitmap default font
    ignores the size argument.
    """
    for font_path in (
        "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
        "/usr/share/fonts/truetype/liberation/LiberationSans-Regular.ttf",
        "/usr/share/fonts/truetype/freefont/FreeSans.ttf",
    ):
        if not os.path.exists(font_path):
            continue
        try:
            return ImageFont.truetype(font_path, size)
        except Exception:
            continue
    return ImageFont.load_default()
|
||||
|
||||
|
||||
def _wm_xy(img_w: int, img_h: int, elem_w: int, elem_h: int,
|
||||
position: str, margin: int = 24) -> tuple:
|
||||
positions = {
|
||||
"br": (img_w - elem_w - margin, img_h - elem_h - margin),
|
||||
"bl": (margin, img_h - elem_h - margin),
|
||||
"tr": (img_w - elem_w - margin, margin),
|
||||
"tl": (margin, margin),
|
||||
"bc": ((img_w - elem_w) // 2, img_h - elem_h - margin),
|
||||
"tc": ((img_w - elem_w) // 2, margin),
|
||||
"center": ((img_w - elem_w) // 2, (img_h - elem_h) // 2),
|
||||
}
|
||||
return positions.get(position, positions["br"])
|
||||
|
||||
|
||||
def apply_corrections(img: Image.Image, rotation: float = 0.0,
|
||||
brightness: float = 1.0, contrast: float = 1.0,
|
||||
saturation: float = 1.0) -> Image.Image:
|
||||
try:
|
||||
img = ImageOps.exif_transpose(img)
|
||||
except Exception:
|
||||
pass
|
||||
if rotation != 0.0:
|
||||
img = img.rotate(-rotation, expand=True, resample=Image.BICUBIC)
|
||||
if brightness != 1.0:
|
||||
img = ImageEnhance.Brightness(img).enhance(brightness)
|
||||
if contrast != 1.0:
|
||||
img = ImageEnhance.Contrast(img).enhance(contrast)
|
||||
if saturation != 1.0:
|
||||
img = ImageEnhance.Color(img).enhance(saturation)
|
||||
return img
|
||||
|
||||
|
||||
def apply_text_watermark(img: Image.Image, text: str, position: str = "br",
|
||||
font_size: int = 24, opacity: float = 0.7) -> Image.Image:
|
||||
if not text.strip():
|
||||
return img
|
||||
alpha = int(opacity * 255)
|
||||
font = _load_font(font_size)
|
||||
base = img.convert("RGBA")
|
||||
overlay = Image.new("RGBA", base.size, (0, 0, 0, 0))
|
||||
draw = ImageDraw.Draw(overlay)
|
||||
bbox = draw.textbbox((0, 0), text, font=font)
|
||||
tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1]
|
||||
x, y = _wm_xy(base.width, base.height, tw, th, position)
|
||||
# Shadow
|
||||
draw.text((x + 2, y + 2), text, font=font, fill=(0, 0, 0, min(255, alpha)))
|
||||
draw.text((x, y), text, font=font, fill=(255, 255, 255, alpha))
|
||||
return Image.alpha_composite(base, overlay).convert("RGB")
|
||||
|
||||
|
||||
def apply_image_watermark(img: Image.Image, wm_path: str, position: str = "br",
|
||||
opacity: float = 0.6, scale: float = 0.2) -> Image.Image:
|
||||
if not wm_path or not os.path.isfile(wm_path):
|
||||
return img
|
||||
try:
|
||||
wm = Image.open(wm_path).convert("RGBA")
|
||||
target_w = max(10, int(img.width * scale))
|
||||
target_h = int(wm.height * target_w / wm.width)
|
||||
wm = wm.resize((target_w, target_h), Image.LANCZOS)
|
||||
r, g, b, a = wm.split()
|
||||
a = a.point(lambda v: int(v * opacity))
|
||||
wm = Image.merge("RGBA", (r, g, b, a))
|
||||
base = img.convert("RGBA")
|
||||
x, y = _wm_xy(base.width, base.height, target_w, target_h, position)
|
||||
base.paste(wm, (x, y), wm)
|
||||
return base.convert("RGB")
|
||||
except Exception:
|
||||
return img
|
||||
|
||||
|
||||
def process_photo(path: str,
                  rotation: float = 0.0,
                  brightness: float = 1.0,
                  contrast: float = 1.0,
                  saturation: float = 1.0,
                  text_watermark: Optional[dict] = None,
                  image_watermark_path: Optional[str] = None,
                  image_watermark_settings: Optional[dict] = None,
                  exif_info: Optional[dict] = None) -> bytes:
    """Apply corrections plus optional watermarks to one photo; return JPEG bytes.

    text_watermark is a dict with optional keys text/position/font_size/opacity;
    image_watermark_settings with position/opacity/scale. When exif_info is
    given, template placeholders in the watermark text are resolved first.
    The output is always an RGB JPEG at quality 92.
    """
    photo = apply_corrections(
        Image.open(path),
        rotation=rotation, brightness=brightness,
        contrast=contrast, saturation=saturation,
    )

    if text_watermark:
        wm_text = text_watermark.get("text", "")
        if exif_info:
            # Fill {date}/{time}/{camera}/{lens} from the photo's EXIF.
            wm_text = resolve_wm_template(wm_text, exif_info)
        photo = apply_text_watermark(
            photo,
            text=wm_text,
            position=text_watermark.get("position", "br"),
            font_size=text_watermark.get("font_size", 24),
            opacity=text_watermark.get("opacity", 0.7),
        )

    if image_watermark_path and image_watermark_settings:
        photo = apply_image_watermark(
            photo,
            wm_path=image_watermark_path,
            position=image_watermark_settings.get("position", "br"),
            opacity=image_watermark_settings.get("opacity", 0.6),
            scale=image_watermark_settings.get("scale", 0.2),
        )

    out = io.BytesIO()
    photo.convert("RGB").save(out, "JPEG", quality=92)
    return out.getvalue()
|
||||
+2
-1
@@ -4,6 +4,7 @@ pillow==12.2.0
|
||||
opencv-python-headless==4.13.0.92
|
||||
imagehash==4.3.1
|
||||
python-dotenv==1.0.1
|
||||
anthropic==0.25.0
|
||||
anthropic==0.89.0
|
||||
pillow-heif==1.3.0
|
||||
pytest==8.1.1
|
||||
httpx==0.27.0
|
||||
|
||||
@@ -1,27 +1,98 @@
|
||||
import io
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
import uuid
|
||||
import webbrowser
|
||||
import threading
|
||||
from typing import List
|
||||
import zipfile
|
||||
from datetime import date
|
||||
from time import time
|
||||
from typing import List, Optional
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from fastapi import FastAPI, HTTPException
|
||||
from fastapi.responses import FileResponse, Response
|
||||
from fastapi import FastAPI, File, Form, HTTPException, Request, UploadFile
|
||||
from fastapi.responses import FileResponse, JSONResponse, Response, StreamingResponse
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from pydantic import BaseModel
|
||||
from pydantic import BaseModel, Field
|
||||
import secrets
|
||||
import uvicorn
|
||||
from PIL import Image
|
||||
try:
|
||||
from pillow_heif import register_heif_opener
|
||||
register_heif_opener()
|
||||
_HEIF_SUPPORTED = True
|
||||
except ImportError:
|
||||
_HEIF_SUPPORTED = False
|
||||
|
||||
from analyzer import analyze_folder
|
||||
|
||||
load_dotenv()
|
||||
|
||||
app = FastAPI(title="Foto-Kurator")
|
||||
|
||||
def cleanup_old_uploads():
    """Delete onlyframes-* temp upload folders that are older than 24 hours."""
    tmp_root = tempfile.gettempdir()
    expiry = time() - 24 * 3600
    for entry in os.listdir(tmp_root):
        # The server log lives next to the upload folders and must survive.
        if not entry.startswith("onlyframes-") or entry == "onlyframes-server.log":
            continue
        candidate = os.path.join(tmp_root, entry)
        if not os.path.isdir(candidate):
            continue
        try:
            if os.path.getmtime(candidate) < expiry:
                shutil.rmtree(candidate, ignore_errors=True)
        except OSError:
            # Folder vanished between listing and stat — nothing to do.
            pass
|
||||
|
||||
|
||||
cleanup_old_uploads()
|
||||
|
||||
APP_PASSWORD = os.getenv("APP_PASSWORD", "") # empty = no auth
|
||||
_SESSION_TTL = 24 * 3600 # tokens expire after 24 h
|
||||
_sessions: dict = {} # token -> created_at timestamp
|
||||
|
||||
app = FastAPI(title="OnlyFrames")
|
||||
|
||||
|
||||
def _purge_expired_sessions():
    """Drop auth tokens whose creation time lies past the session TTL."""
    expiry = time() - _SESSION_TTL
    # Materialize the expired list first — never mutate a dict while iterating it.
    for token in [t for t, created in _sessions.items() if created < expiry]:
        del _sessions[token]
|
||||
|
||||
|
||||
@app.middleware("http")
|
||||
async def auth_middleware(request: Request, call_next):
    """Require a valid Bearer token on every route except "/" and "/login".

    Authentication is disabled entirely when APP_PASSWORD is unset; expired
    tokens are purged before each check.
    """
    if not APP_PASSWORD:
        return await call_next(request)
    # The login endpoint and the frontend page itself are always reachable.
    if request.url.path in ("/login", "/"):
        return await call_next(request)
    bearer = request.headers.get("Authorization", "")
    token = bearer.removeprefix("Bearer ").strip()
    _purge_expired_sessions()
    if token not in _sessions:
        return JSONResponse({"detail": "Nicht autorisiert"}, status_code=401)
    return await call_next(request)
|
||||
|
||||
|
||||
@app.post("/login")
|
||||
def login(payload: dict):
    """Exchange the app password for a session token.

    Returns a fixed "noauth" token when authentication is disabled;
    otherwise a fresh random token that is registered with its creation
    timestamp. A wrong password yields HTTP 401.
    """
    if not APP_PASSWORD:
        return {"token": "noauth"}
    if payload.get("password") != APP_PASSWORD:
        raise HTTPException(status_code=401, detail="Falsches Passwort")
    new_token = secrets.token_hex(24)
    _sessions[new_token] = time()
    return {"token": new_token}
|
||||
|
||||
app.add_middleware(
|
||||
CORSMiddleware,
|
||||
allow_origins=["http://localhost:8000"],
|
||||
allow_methods=["GET", "POST"],
|
||||
allow_headers=["Content-Type"],
|
||||
allow_origins=["*"],
|
||||
allow_methods=["GET", "POST", "DELETE"],
|
||||
allow_headers=["Content-Type", "Authorization"],
|
||||
)
|
||||
|
||||
|
||||
@@ -39,58 +110,420 @@ class MoveRequest(BaseModel):
|
||||
folder: str
|
||||
|
||||
|
||||
class ExportRequest(BaseModel):
    """Request body for the photo export endpoint."""
    # Source folder of the analyzed/kept photos.
    folder: str
    # Photo paths to export (the kept set).
    paths: List[str]
    # Subset of paths the user marked as favourites.
    fav_paths: List[str] = Field(default_factory=list)
    # One of "original", "datetime", "date_seq", "prefix_seq"; anything else
    # falls back to "<prefix><index>_<stem><ext>" (see processor.get_new_name).
    rename_mode: str = "original"
    rename_prefix: str = ""
    fav_prefix: str = "FAV_"
    # Global corrections applied to every exported photo; 1.0 / 0.0 are neutral.
    rotation: float = 0.0
    brightness: float = 1.0
    contrast: float = 1.0
    saturation: float = 1.0
    # Optional watermark configuration (keys consumed by processor.process_photo).
    text_watermark: dict = Field(default_factory=dict)
    image_watermark_path: str = ""
    image_watermark_settings: dict = Field(default_factory=dict)
    # Feature detectors to run for filename prefixes: subset of qr/barcode/face/pano.
    feature_detectors: List[str] = Field(default_factory=list)
|
||||
|
||||
|
||||
@app.get("/")
|
||||
def serve_frontend():
    """Serve the single-page frontend shipped next to the server."""
    return FileResponse("index.html")
|
||||
|
||||
|
||||
UPLOAD_ALLOWED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".webp", ".heic", ".heif"}
|
||||
_HEIC_EXTS = {".heic", ".heif"}
|
||||
MAX_UPLOAD_BYTES = 50 * 1024 * 1024 # 50 MB per file
|
||||
|
||||
|
||||
@app.post("/upload")
|
||||
async def upload_files(files: List[UploadFile] = File(...), folder: str = Form("")):
    """Store uploaded photos in a temp session folder; return folder and count.

    Reuses `folder` when it is an existing directory inside the system temp
    dir, otherwise creates a fresh onlyframes-<id> folder. Files with a
    disallowed extension or exceeding MAX_UPLOAD_BYTES are skipped silently;
    HEIC/HEIF uploads are converted to JPEG when pillow-heif is available.
    """
    tmp_base = tempfile.gettempdir()
    reuse = folder and os.path.isdir(folder) and os.path.abspath(folder).startswith(tmp_base)
    if reuse:
        session_dir = folder
    else:
        session_dir = os.path.join(tmp_base, "onlyframes-" + uuid.uuid4().hex[:8])
        os.makedirs(session_dir)

    stored = 0
    for upload in files:
        ext = os.path.splitext(upload.filename or "")[1].lower()
        if ext not in UPLOAD_ALLOWED_EXTENSIONS:
            continue
        # Read one byte past the limit so oversized files can be detected.
        payload = await upload.read(MAX_UPLOAD_BYTES + 1)
        if len(payload) > MAX_UPLOAD_BYTES:
            continue
        if ext in _HEIC_EXTS and _HEIF_SUPPORTED:
            # Convert HEIC/HEIF → JPEG so cv2 and browsers can handle it
            jpeg_name = os.path.splitext(os.path.basename(upload.filename))[0] + ".jpg"
            try:
                Image.open(io.BytesIO(payload)).convert("RGB").save(
                    os.path.join(session_dir, jpeg_name), "JPEG", quality=92)
                stored += 1
            except Exception:
                pass
        else:
            dest = os.path.join(session_dir, os.path.basename(upload.filename))
            with open(dest, "wb") as fh:
                fh.write(payload)
            stored += 1
    return {"folder": session_dir, "count": stored}
|
||||
|
||||
|
||||
@app.get("/download")
|
||||
def download_kept(folder: str):
    """Zip all kept photos of a temp upload session and stream the archive.

    Only folders inside the system temp directory are allowed (HTTP 403
    otherwise). The `_aussortiert` subfolder (rejected photos) is excluded,
    and the upload folder is deleted once the archive has been built.
    """
    folder_abs = os.path.abspath(folder)
    if not folder_abs.startswith(tempfile.gettempdir()) or not os.path.isdir(folder_abs):
        raise HTTPException(status_code=403, detail="Zugriff nicht erlaubt")

    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
        for fname in sorted(os.listdir(folder_abs)):
            if fname == "_aussortiert":
                continue
            fpath = os.path.join(folder_abs, fname)
            if os.path.isfile(fpath):
                zf.write(fpath, fname)
    buf.seek(0)
    # The session is finished once the archive exists — free the disk space.
    shutil.rmtree(folder_abs, ignore_errors=True)

    filename = f"onlyframes_aussortiert_{date.today().isoformat()}.zip"
    return StreamingResponse(
        buf,
        media_type="application/zip",
        # Bug fix: the computed filename was not interpolated into the header,
        # so every download was served under a wrong, static name.
        headers={"Content-Disposition": f'attachment; filename="{filename}"'},
    )
|
||||
|
||||
|
||||
@app.get("/browse")
|
||||
def browse(path: str = "/home/vchuser"):
    """List the non-hidden subdirectories of `path` for the folder picker.

    Returns the absolute path, its parent (None at the filesystem root)
    and the sorted subdirectory names. Invalid paths yield HTTP 400,
    unreadable ones HTTP 403.
    """
    path = os.path.abspath(path)
    if not os.path.isdir(path):
        raise HTTPException(status_code=400, detail="Kein gültiger Ordner")
    try:
        subdirs = [
            entry for entry in sorted(os.listdir(path))
            if not entry.startswith(".") and os.path.isdir(os.path.join(path, entry))
        ]
    except PermissionError:
        raise HTTPException(status_code=403, detail="Kein Zugriff")
    return {
        "path": path,
        "parent": os.path.dirname(path) if path != "/" else None,
        "dirs": subdirs,
    }
|
||||
|
||||
|
||||
@app.get("/pick-folder")
|
||||
def pick_folder():
    """Native folder picker is not available in this deployment — always 501."""
    raise HTTPException(status_code=501, detail="Nicht verfügbar")
|
||||
|
||||
|
||||
PREVIEW_ALLOWED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".webp"}
|
||||
_PREVIEW_MEDIA = {".jpg": "image/jpeg", ".jpeg": "image/jpeg", ".png": "image/png", ".webp": "image/webp"}
|
||||
|
||||
|
||||
@app.get("/preview")
|
||||
def preview(path: str):
|
||||
if not os.path.isfile(path):
|
||||
path_abs = os.path.abspath(path)
|
||||
if not path_abs.startswith(tempfile.gettempdir()):
|
||||
raise HTTPException(status_code=403, detail="Zugriff nicht erlaubt")
|
||||
ext = os.path.splitext(path_abs)[1].lower()
|
||||
if ext not in PREVIEW_ALLOWED_EXTENSIONS:
|
||||
raise HTTPException(status_code=403, detail="Dateityp nicht erlaubt")
|
||||
if not os.path.isfile(path_abs):
|
||||
raise HTTPException(status_code=404, detail="Datei nicht gefunden")
|
||||
ext = os.path.splitext(path)[1].lower()
|
||||
media = "image/jpeg" if ext in (".jpg", ".jpeg") else "image/png"
|
||||
with open(path, "rb") as f:
|
||||
media = _PREVIEW_MEDIA.get(ext, "image/jpeg")
|
||||
with open(path_abs, "rb") as f:
|
||||
return Response(content=f.read(), media_type=media)
|
||||
|
||||
|
||||
_jobs: dict = {} # job_id -> {"status": "running"|"done"|"error", "result": ..., "error": ...}
|
||||
|
||||
|
||||
_PHASE_LABELS = {
|
||||
"quality": "Qualität prüfen",
|
||||
"exact_copies": "Exakte Kopien suchen",
|
||||
"duplicates": "Duplikate suchen",
|
||||
"ai": "KI-Analyse",
|
||||
"done": "Fertig",
|
||||
}
|
||||
|
||||
|
||||
def _run_analyze_job(job_id: str, req: AnalyzeRequest):
    """Worker-thread body: run analyze_folder and record progress/result in _jobs.

    On success the job entry becomes {"status": "done", "result": {...}};
    any failure is captured as {"status": "error", "error": str}.
    """
    try:
        def on_progress(done, total, phase):
            # Polled by the /analyze/status/{job_id} endpoint.
            _jobs[job_id]["done"] = done
            _jobs[job_id]["total"] = total
            _jobs[job_id]["phase"] = _PHASE_LABELS.get(phase, phase)

        key = os.getenv("ANTHROPIC_API_KEY") if req.use_ai else None
        flagged = analyze_folder(
            folder=req.folder,
            blur_threshold=req.blur_threshold,
            over_threshold=req.over_threshold,
            under_threshold=req.under_threshold,
            dup_threshold=req.dup_threshold,
            use_ai=req.use_ai,
            api_key=key,
            progress_callback=on_progress,
        )
        # Everything that was not flagged is reported back as "ok".
        from analyzer import SUPPORTED_EXTENSIONS
        candidates = {
            os.path.join(req.folder, name)
            for name in os.listdir(req.folder)
            if os.path.splitext(name)[1].lower() in SUPPORTED_EXTENSIONS
        }
        flagged_set = {entry["path"] for entry in flagged}
        _jobs[job_id] = {
            "status": "done",
            "result": {"results": flagged, "ok_paths": sorted(candidates - flagged_set)},
        }
    except Exception as e:
        _jobs[job_id] = {"status": "error", "error": str(e)}
|
||||
|
||||
|
||||
@app.post("/analyze")
|
||||
def analyze(req: AnalyzeRequest):
|
||||
if not os.path.isdir(req.folder):
|
||||
raise HTTPException(status_code=400, detail=f"Ordner nicht gefunden: {req.folder}")
|
||||
api_key = os.getenv("ANTHROPIC_API_KEY") if req.use_ai else None
|
||||
results = analyze_folder(
|
||||
folder=req.folder,
|
||||
blur_threshold=req.blur_threshold,
|
||||
over_threshold=req.over_threshold,
|
||||
under_threshold=req.under_threshold,
|
||||
dup_threshold=req.dup_threshold,
|
||||
use_ai=req.use_ai,
|
||||
api_key=api_key,
|
||||
)
|
||||
return {"results": results}
|
||||
job_id = uuid.uuid4().hex
|
||||
_jobs[job_id] = {"status": "running"}
|
||||
threading.Thread(target=_run_analyze_job, args=(job_id, req), daemon=True).start()
|
||||
return {"job_id": job_id}
|
||||
|
||||
|
||||
@app.get("/analyze/status/{job_id}")
|
||||
def analyze_status(job_id: str):
    """Poll a background analysis job: progress while running, result when done.

    Unknown ids yield HTTP 404, failed jobs HTTP 500. A finished job is
    removed from the registry when its result is fetched (one-shot).
    """
    job = _jobs.get(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="Job nicht gefunden")
    status = job["status"]
    if status == "error":
        raise HTTPException(status_code=500, detail=job["error"])
    if status == "running":
        return {
            "status": "running",
            "done": job.get("done", 0),
            "total": job.get("total", 0),
            "phase": job.get("phase", "Vorbereitung…"),
        }
    # Finished: hand the result over exactly once, then forget the job.
    payload = _jobs.pop(job_id)["result"]
    return {"status": "done", **payload}
|
||||
|
||||
|
||||
@app.get("/uploads")
|
||||
def list_uploads():
    """Enumerate existing onlyframes-* upload folders with file count and total size."""
    tmp_root = tempfile.gettempdir()
    found = []
    for entry in sorted(os.listdir(tmp_root)):
        # Skip the server log which shares the onlyframes- prefix.
        if not entry.startswith("onlyframes-") or entry == "onlyframes-server.log":
            continue
        folder = os.path.join(tmp_root, entry)
        if not os.path.isdir(folder):
            continue
        file_names = [
            f for f in os.listdir(folder)
            if os.path.isfile(os.path.join(folder, f))
        ]
        total_bytes = sum(os.path.getsize(os.path.join(folder, f)) for f in file_names)
        found.append({
            "folder": folder,
            "id": entry,
            "count": len(file_names),
            "size": total_bytes,
        })
    return {"sessions": found}
|
||||
|
||||
|
||||
@app.delete("/uploads")
|
||||
def delete_upload(folder: str):
    """Remove an upload session folder; only folders inside the temp dir are allowed."""
    target = os.path.abspath(folder)
    inside_tmp = target.startswith(tempfile.gettempdir())
    if not inside_tmp or not os.path.isdir(target):
        raise HTTPException(status_code=403, detail="Zugriff nicht erlaubt")
    shutil.rmtree(target, ignore_errors=True)
    return {"ok": True}
|
||||
|
||||
|
||||
@app.post("/move")
|
||||
def move_files(req: MoveRequest):
|
||||
target_dir = os.path.join(req.folder, "_aussortiert")
|
||||
folder_abs = os.path.abspath(req.folder)
|
||||
if not os.path.isdir(folder_abs):
|
||||
raise HTTPException(status_code=400, detail=f"Ordner nicht gefunden: {req.folder}")
|
||||
target_dir = os.path.join(folder_abs, "_aussortiert")
|
||||
os.makedirs(target_dir, exist_ok=True)
|
||||
moved = []
|
||||
errors = []
|
||||
for path in req.paths:
|
||||
path_abs = os.path.abspath(path)
|
||||
if not path_abs.startswith(folder_abs + os.sep):
|
||||
errors.append({"path": path, "error": "Pfad liegt außerhalb des analysierten Ordners"})
|
||||
continue
|
||||
try:
|
||||
dest = os.path.join(target_dir, os.path.basename(path))
|
||||
shutil.move(path, dest)
|
||||
dest = os.path.join(target_dir, os.path.basename(path_abs))
|
||||
shutil.move(path_abs, dest)
|
||||
moved.append(path)
|
||||
except Exception as e:
|
||||
errors.append({"path": path, "error": str(e)})
|
||||
return {"moved": moved, "errors": errors}
|
||||
|
||||
|
||||
_zip_store: dict = {} # zip_id -> path
|
||||
_ZIP_TTL = 3600 # ZIPs older than 1 h are deleted automatically
|
||||
|
||||
|
||||
def _cleanup_zips():
    """Background reaper: periodically delete export ZIPs older than _ZIP_TTL.

    Runs forever; meant to be started as a daemon thread. Wakes every
    _ZIP_TTL seconds and removes stale archives from disk and _zip_store.
    """
    # Hoisted out of the loop; also used for time.time() below so the
    # function no longer depends on a bare time() name being in module
    # scope (which breaks if the module does `import time`).
    import time as _time

    while True:
        _time.sleep(_ZIP_TTL)
        cutoff = _time.time() - _ZIP_TTL
        # Iterate over a snapshot so entries can be popped while looping.
        for zip_id, path in list(_zip_store.items()):
            try:
                if os.path.getmtime(path) < cutoff:
                    os.unlink(path)
                    _zip_store.pop(zip_id, None)
            except OSError:
                # File is already gone — drop the stale registry entry.
                _zip_store.pop(zip_id, None)
||||
# Start the ZIP reaper as a daemon thread so it never blocks interpreter shutdown.
threading.Thread(target=_cleanup_zips, daemon=True).start()
||||
def _run_export_job(job_id: str, req: ExportRequest):
    """Build the export ZIP for *req* in the background.

    Processes every kept photo (rename, rotation, adjustments, watermarks),
    writes the results into a ZIP in the temp directory and registers it in
    _zip_store. Progress is reported via _jobs[job_id]; on success the job
    becomes {"status": "done", "zip_id": ...}, on failure
    {"status": "error", "error": ...}.

    Args:
        job_id: Key into the shared _jobs progress dict.
        req: Export settings (folder, paths, rename/watermark options).
    """
    from processor import get_exif_info, get_new_name, process_photo
    from analyzer import SUPPORTED_EXTENSIONS
    try:
        folder_abs = os.path.abspath(req.folder)
        fav_set = set(req.fav_paths)

        # Collect kept files (what's in folder, minus _aussortiert).
        all_files = sorted([
            os.path.join(folder_abs, f)
            for f in os.listdir(folder_abs)
            if os.path.isfile(os.path.join(folder_abs, f))
            and os.path.splitext(f)[1].lower() in SUPPORTED_EXTENSIONS
        ])
        # Only export paths the client passed (already-kept set), falling
        # back to everything. Build the lookup set ONCE — the original
        # rebuilt set(req.paths) for every element of the comprehension.
        requested = set(req.paths)
        export_paths = [p for p in all_files if p in requested] or all_files
        total = len(export_paths)
        _jobs[job_id]["total"] = total

        buf = io.BytesIO()
        used_names: set = set()

        detectors = set(req.feature_detectors)
        if detectors:
            # Hoisted out of the per-photo loop (was re-imported each pass).
            from processor import detect_features
        with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
            for i, path in enumerate(export_paths):
                exif = get_exif_info(path)
                new_name = get_new_name(
                    path, req.rename_mode, req.rename_prefix,
                    i + 1, exif, path in fav_set, req.fav_prefix,
                )
                if detectors:
                    feat_prefix = "".join(detect_features(path, detectors))
                    if feat_prefix:
                        new_name = feat_prefix + new_name
                # Deduplicate filenames inside the archive.
                base, ext = os.path.splitext(new_name)
                candidate, n = new_name, 1
                while candidate in used_names:
                    candidate = f"{base}_{n:03d}{ext}"
                    n += 1
                used_names.add(candidate)

                try:
                    data = process_photo(
                        path,
                        rotation=req.rotation,
                        brightness=req.brightness,
                        contrast=req.contrast,
                        saturation=req.saturation,
                        text_watermark=req.text_watermark or None,
                        image_watermark_path=req.image_watermark_path or None,
                        image_watermark_settings=req.image_watermark_settings or None,
                        exif_info=exif,
                    )
                    zf.writestr(candidate, data)
                except Exception:
                    # Deliberate best effort: one broken photo must not abort
                    # the whole export — it is simply left out of the archive.
                    pass
                _jobs[job_id]["done"] = i + 1

        buf.seek(0)
        zip_id = uuid.uuid4().hex
        zip_path = os.path.join(tempfile.gettempdir(), f"onlyframes-export-{zip_id}.zip")
        with open(zip_path, "wb") as f:
            f.write(buf.read())
        _zip_store[zip_id] = zip_path
        _jobs[job_id] = {"status": "done", "zip_id": zip_id}
    except Exception as e:
        _jobs[job_id] = {"status": "error", "error": str(e)}
||||
@app.post("/export")
def start_export(req: ExportRequest):
    """Kick off a background export job and return its job id.

    Args:
        req: Export settings; req.folder must be an upload session inside
            the system temp directory.

    Raises:
        HTTPException: 403 if the folder lies outside the temp directory
            or does not exist.
    """
    folder_abs = os.path.abspath(req.folder)
    # Separator-terminated prefix: "/tmp-evil" must not pass as "/tmp".
    tmp_base = tempfile.gettempdir() + os.sep
    if not folder_abs.startswith(tmp_base) or not os.path.isdir(folder_abs):
        raise HTTPException(status_code=403, detail="Zugriff nicht erlaubt")
    job_id = uuid.uuid4().hex
    _jobs[job_id] = {"status": "running", "done": 0, "total": 0}
    threading.Thread(target=_run_export_job, args=(job_id, req), daemon=True).start()
    return {"job_id": job_id}
||||
@app.get("/export/status/{job_id}")
def export_status(job_id: str):
    """Report progress for an export job, or its zip_id once finished.

    Raises:
        HTTPException: 404 for unknown jobs, 500 for failed jobs.
    """
    job = _jobs.get(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="Job nicht gefunden")
    status = job["status"]
    if status == "error":
        raise HTTPException(status_code=500, detail=job["error"])
    if status == "running":
        done = job.get("done", 0)
        total = job.get("total", 0)
        return {"status": "running", "done": done, "total": total}
    return {"status": "done", "zip_id": job["zip_id"]}
||||
@app.get("/export/download/{zip_id}")
def export_download(zip_id: str):
    """Stream a finished export ZIP to the client, then delete it.

    The archive is removed once the response finishes (or aborts), so each
    export can be downloaded exactly once.

    Raises:
        HTTPException: 404 if the zip_id is unknown or the file is gone.
    """
    zip_path = _zip_store.get(zip_id)
    if not zip_path or not os.path.isfile(zip_path):
        raise HTTPException(status_code=404, detail="Export nicht gefunden")

    def stream_and_cleanup():
        # Stream in 64 KiB chunks; the finally block guarantees cleanup
        # even when the client disconnects mid-download.
        try:
            with open(zip_path, "rb") as f:
                yield from iter(lambda: f.read(65536), b"")
        finally:
            os.unlink(zip_path)
            _zip_store.pop(zip_id, None)

    filename = f"onlyframes_{date.today().isoformat()}.zip"
    return StreamingResponse(
        stream_and_cleanup(),
        media_type="application/zip",
        # Bug fix: the computed filename was never used — the header
        # previously carried the literal string "(unknown)".
        headers={"Content-Disposition": f'attachment; filename="{filename}"'},
    )
||||
@app.post("/upload-watermark")
async def upload_watermark(file: UploadFile = File(...), folder: str = Form("")):
    """Store an uploaded watermark image and return its temp path.

    Args:
        file: The watermark image (jpg/jpeg/png/gif/webp).
        folder: Optional session folder; only honored when it is an
            existing directory inside the system temp directory.

    Raises:
        HTTPException: 400 for unsupported file extensions.
    """
    ext = os.path.splitext(file.filename or "")[1].lower()
    if ext not in {".jpg", ".jpeg", ".png", ".gif", ".webp"}:
        raise HTTPException(status_code=400, detail="Ungültiges Format")
    tmp_base = tempfile.gettempdir()
    # Separator-terminated prefix so "/tmp-evil" cannot pass as "/tmp".
    dest_dir = folder if (folder and os.path.isdir(folder)
                          and os.path.abspath(folder).startswith(tmp_base + os.sep)) else tmp_base
    dest = os.path.join(dest_dir, "wm_" + uuid.uuid4().hex[:8] + ext)
    with open(dest, "wb") as f:
        f.write(await file.read())
    return {"path": dest}
||||
@app.get("/detect-angle")
def detect_angle(path: str):
    """Detect the horizon angle of a photo for auto-straightening.

    Args:
        path: Photo path; must point inside the system temp directory.

    Raises:
        HTTPException: 403 outside the temp directory, 404 if missing.
    """
    from processor import detect_horizon_angle
    path_abs = os.path.abspath(path)
    # Separator-terminated prefix so "/tmp-evil" cannot pass as "/tmp".
    if not path_abs.startswith(tempfile.gettempdir() + os.sep):
        raise HTTPException(status_code=403, detail="Zugriff nicht erlaubt")
    if not os.path.isfile(path_abs):
        raise HTTPException(status_code=404, detail="Datei nicht gefunden")
    return {"angle": detect_horizon_angle(path_abs)}
||||
def open_browser():
    """Open the app UI in the user's default web browser."""
    url = "http://localhost:8000"
    webbrowser.open(url)
||||
if __name__ == "__main__":
    # Give uvicorn a moment to bind before the browser hits the URL.
    threading.Timer(1.0, open_browser).start()
    # Bind to localhost only; a duplicate, unreachable run() call on
    # 0.0.0.0 (leftover from a merge) has been removed — it would have
    # silently re-bound the server to all interfaces after shutdown.
    uvicorn.run(app, host="127.0.0.1", port=8000)
||||
Reference in New Issue
Block a user