Files
OnlyFrames/server.py
T
ferdi2go 1aded7ff0d feat: auto feature detection with filename prefixes on export
Detects QR codes (QR_), barcodes (BC_), faces (FACE_/GROUP_),
and panoramas (PANO_) per photo using OpenCV — no new dependencies.
Opt-in checkboxes in the rename tab; prefixes prepend to filename.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-23 12:50:34 +00:00

530 lines
18 KiB
Python

import io
import os
import shutil
import tempfile
import uuid
import webbrowser
import threading
import zipfile
from datetime import date
from time import time
from typing import List, Optional
from dotenv import load_dotenv
from fastapi import FastAPI, File, Form, HTTPException, Request, UploadFile
from fastapi.responses import FileResponse, JSONResponse, Response, StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
import secrets
import uvicorn
from PIL import Image
# Optional HEIC/HEIF support: pillow-heif registers a decoder plugin so
# Pillow's Image.open() can read Apple HEIC/HEIF files.  The dependency is
# optional — when missing, /upload stores HEIC files unconverted.
try:
    from pillow_heif import register_heif_opener
    register_heif_opener()
    _HEIF_SUPPORTED = True
except ImportError:
    _HEIF_SUPPORTED = False
from analyzer import analyze_folder  # project-local photo analysis
load_dotenv()  # loads APP_PASSWORD / ANTHROPIC_API_KEY from a local .env
def cleanup_old_uploads():
    """Delete onlyframes temp folders that are older than 24 hours."""
    tmp_root = tempfile.gettempdir()
    expiry = time() - 24 * 3600
    for entry in os.listdir(tmp_root):
        # Only session folders are touched; the log file keeps its name prefix
        # but must survive.
        if not entry.startswith("onlyframes-") or entry == "onlyframes-server.log":
            continue
        candidate = os.path.join(tmp_root, entry)
        if not os.path.isdir(candidate):
            continue
        try:
            stale = os.path.getmtime(candidate) < expiry
        except OSError:
            continue  # folder vanished between listdir and stat
        if stale:
            shutil.rmtree(candidate, ignore_errors=True)


cleanup_old_uploads()
# Authentication configuration: a single shared password, exchanged for
# in-memory session tokens via /login.  Tokens live only for this process.
APP_PASSWORD = os.getenv("APP_PASSWORD", "")  # empty = no auth
_SESSION_TTL = 24 * 3600  # tokens expire after 24 h
_sessions: dict[str, float] = {}  # token -> created_at timestamp (time())
app = FastAPI(title="OnlyFrames")
def _purge_expired_sessions():
    """Remove auth tokens whose creation time lies past the session TTL."""
    deadline = time() - _SESSION_TTL
    stale = [tok for tok, created in _sessions.items() if created < deadline]
    for tok in stale:
        del _sessions[tok]
@app.middleware("http")
async def auth_middleware(request: Request, call_next):
    """Require a valid Bearer token on every route except / and /login."""
    if not APP_PASSWORD:
        # No password configured -> auth is disabled entirely.
        return await call_next(request)
    if request.url.path in ("/login", "/"):
        # Login endpoint and the frontend page itself are always reachable.
        return await call_next(request)
    header = request.headers.get("Authorization", "")
    bearer = header.removeprefix("Bearer ").strip()
    _purge_expired_sessions()
    if bearer in _sessions:
        return await call_next(request)
    return JSONResponse({"detail": "Nicht autorisiert"}, status_code=401)
@app.post("/login")
def login(payload: dict):
    """Exchange the app password for a session token (valid for 24 h)."""
    if not APP_PASSWORD:
        # Auth disabled: hand out a fixed token so the client flow still works.
        return {"token": "noauth"}
    if payload.get("password") == APP_PASSWORD:
        fresh = secrets.token_hex(24)
        _sessions[fresh] = time()
        return {"token": fresh}
    raise HTTPException(status_code=401, detail="Falsches Passwort")
# CORS: permit browser calls from any origin for the listed methods/headers.
# NOTE(review): "*" origins combined with Bearer-token auth is permissive —
# confirm this is intended for the local-tool deployment model.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["GET", "POST", "DELETE"],
    allow_headers=["Content-Type", "Authorization"],
)
class AnalyzeRequest(BaseModel):
    """Parameters for starting an /analyze job; thresholds are forwarded to
    analyzer.analyze_folder (exact semantics defined there)."""
    folder: str                     # absolute path of the folder to analyze
    blur_threshold: float = 100.0   # blur cutoff passed to the analyzer
    over_threshold: float = 240.0   # overexposure cutoff
    under_threshold: float = 30.0   # underexposure cutoff
    dup_threshold: int = 8          # near-duplicate distance cutoff
    use_ai: bool = False            # enable AI analysis (needs ANTHROPIC_API_KEY)
class MoveRequest(BaseModel):
    """Request body for /move: photos to push into <folder>/_aussortiert."""
    paths: List[str]  # absolute photo paths; must lie inside `folder`
    folder: str       # the analyzed session folder
class ExportRequest(BaseModel):
    """Request body for /export: selection, renaming, edits and watermarks.

    Most fields are forwarded to processor.get_new_name / process_photo /
    detect_features; exact semantics are defined there.
    """
    folder: str                    # session folder holding the photos
    paths: List[str]               # photos to export; falls back to all kept files
    fav_paths: List[str] = []      # favourites (get fav_prefix on rename)
    rename_mode: str = "original"  # naming scheme (interpreted by processor)
    rename_prefix: str = ""        # user prefix for renamed files
    fav_prefix: str = "FAV_"       # prefix applied to favourites
    rotation: float = 0.0          # edit parameters passed to process_photo
    brightness: float = 1.0
    contrast: float = 1.0
    saturation: float = 1.0
    text_watermark: dict = Field(default_factory=dict)            # empty dict = off
    image_watermark_path: str = ""                                # empty = off
    image_watermark_settings: dict = Field(default_factory=dict)
    feature_detectors: List[str] = Field(default_factory=list)    # detector ids for filename prefixes
@app.get("/")
def serve_frontend():
    """Serve the single-page frontend from the working directory."""
    page = FileResponse("index.html")
    return page
# Upload restrictions: accepted image extensions and a per-file size cap.
UPLOAD_ALLOWED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".webp", ".heic", ".heif"}
_HEIC_EXTS = {".heic", ".heif"}  # subset that gets converted to JPEG
MAX_UPLOAD_BYTES = 50 * 1024 * 1024  # 50 MB per file
@app.post("/upload")
async def upload_files(files: List[UploadFile] = File(...), folder: str = Form("")):
    """Store uploaded photos in a per-session temp folder.

    Reuses *folder* when the client passes an existing directory inside the
    temp dir; otherwise a fresh "onlyframes-XXXXXXXX" folder is created.
    Files with disallowed extensions or larger than MAX_UPLOAD_BYTES are
    silently skipped.  HEIC/HEIF input is converted to JPEG when pillow-heif
    is available, so cv2 and browsers can handle it.

    Returns {"folder": <temp dir>, "count": <files saved>}.
    """
    tmp_base = tempfile.gettempdir()
    # Tightened check: require a path strictly *inside* the temp dir
    # (plain startswith also matched sibling paths like "/tmpfoo").
    if folder and os.path.isdir(folder) and os.path.abspath(folder).startswith(tmp_base + os.sep):
        tmp_dir = folder
    else:
        tmp_dir = os.path.join(tmp_base, "onlyframes-" + uuid.uuid4().hex[:8])
        os.makedirs(tmp_dir)
    saved = 0
    for file in files:
        ext = os.path.splitext(file.filename or "")[1].lower()
        if ext not in UPLOAD_ALLOWED_EXTENSIONS:
            continue
        # Read one byte past the limit so oversized files are detected
        # without buffering arbitrarily large bodies.
        raw = await file.read(MAX_UPLOAD_BYTES + 1)
        if len(raw) > MAX_UPLOAD_BYTES:
            continue
        if ext in _HEIC_EXTS and _HEIF_SUPPORTED:
            # Convert HEIC/HEIF → JPEG so cv2 and browsers can handle it
            safe_name = os.path.splitext(os.path.basename(file.filename))[0] + ".jpg"
            dest = _unique_upload_dest(tmp_dir, safe_name)
            try:
                img = Image.open(io.BytesIO(raw)).convert("RGB")
                img.save(dest, "JPEG", quality=92)
                saved += 1
            except Exception:
                pass  # best-effort: skip files Pillow cannot decode
        else:
            safe_name = os.path.basename(file.filename)
            dest = _unique_upload_dest(tmp_dir, safe_name)
            with open(dest, "wb") as f:
                f.write(raw)
            saved += 1
    return {"folder": tmp_dir, "count": saved}


def _unique_upload_dest(dir_path: str, name: str) -> str:
    """Return a free path for *name* in *dir_path*, suffixing _001, _002, …

    BUG FIX: previously two uploads with the same basename silently
    overwrote each other.
    """
    candidate = os.path.join(dir_path, name)
    if not os.path.exists(candidate):
        return candidate
    base, ext = os.path.splitext(name)
    n = 1
    while True:
        candidate = os.path.join(dir_path, f"{base}_{n:03d}{ext}")
        if not os.path.exists(candidate):
            return candidate
        n += 1
@app.get("/download")
def download_kept(folder: str):
    """Zip every kept photo of a session folder, delete the folder, stream the zip.

    The _aussortiert subfolder (rejected photos) is excluded from the archive.
    """
    folder_abs = os.path.abspath(folder)
    tmp_root = tempfile.gettempdir()
    # SECURITY FIX: the old startswith(tempdir) check also accepted the temp
    # dir itself (which would rmtree all of /tmp) and siblings like "/tmpfoo".
    if not folder_abs.startswith(tmp_root + os.sep) or not os.path.isdir(folder_abs):
        raise HTTPException(status_code=403, detail="Zugriff nicht erlaubt")
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
        for fname in sorted(os.listdir(folder_abs)):
            if fname == "_aussortiert":
                continue
            fpath = os.path.join(folder_abs, fname)
            if os.path.isfile(fpath):
                zf.write(fpath, fname)
    buf.seek(0)
    # The session is consumed by downloading it.
    shutil.rmtree(folder_abs, ignore_errors=True)
    filename = f"onlyframes_aussortiert_{date.today().isoformat()}.zip"
    return StreamingResponse(
        buf,
        media_type="application/zip",
        # BUG FIX: the header literally said filename="(unknown)"; the
        # computed `filename` variable was never used.
        headers={"Content-Disposition": f'attachment; filename="{filename}"'},
    )
@app.get("/browse")
def browse(path: str = "/home/vchuser"):
    """List non-hidden sub-directories of *path* for the folder picker."""
    path = os.path.abspath(path)
    if not os.path.isdir(path):
        raise HTTPException(status_code=400, detail="Kein gültiger Ordner")
    try:
        entries = os.listdir(path)
    except PermissionError:
        raise HTTPException(status_code=403, detail="Kein Zugriff")
    dirs = sorted(
        entry for entry in entries
        if not entry.startswith(".") and os.path.isdir(os.path.join(path, entry))
    )
    parent = None if path == "/" else os.path.dirname(path)
    return {"path": path, "parent": parent, "dirs": dirs}
@app.get("/pick-folder")
def pick_folder():
    """Native OS folder picker — not available in server mode; always 501."""
    raise HTTPException(status_code=501, detail="Nicht verfügbar")
# Extensions servable by /preview and their response media types.
PREVIEW_ALLOWED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".webp"}
_PREVIEW_MEDIA = {".jpg": "image/jpeg", ".jpeg": "image/jpeg", ".png": "image/png", ".webp": "image/webp"}
@app.get("/preview")
def preview(path: str):
    """Return the raw bytes of an image inside the temp dir for <img> previews."""
    path_abs = os.path.abspath(path)
    tmp_root = tempfile.gettempdir()
    # SECURITY FIX: require a path strictly *inside* the temp dir — plain
    # startswith(tempdir) also matched sibling paths such as "/tmpfoo".
    if not path_abs.startswith(tmp_root + os.sep):
        raise HTTPException(status_code=403, detail="Zugriff nicht erlaubt")
    ext = os.path.splitext(path_abs)[1].lower()
    if ext not in PREVIEW_ALLOWED_EXTENSIONS:
        raise HTTPException(status_code=403, detail="Dateityp nicht erlaubt")
    if not os.path.isfile(path_abs):
        raise HTTPException(status_code=404, detail="Datei nicht gefunden")
    media = _PREVIEW_MEDIA.get(ext, "image/jpeg")
    with open(path_abs, "rb") as f:
        return Response(content=f.read(), media_type=media)
# In-memory registry of background jobs (analysis and export), polled by the
# status endpoints.
_jobs: dict = {}  # job_id -> {"status": "running"|"done"|"error", "result": ..., "error": ...}
# German UI labels for the analyzer's progress phases.
_PHASE_LABELS = {
    "quality": "Qualität prüfen",
    "exact_copies": "Exakte Kopien suchen",
    "duplicates": "Duplikate suchen",
    "ai": "KI-Analyse",
    "done": "Fertig",
}
def _run_analyze_job(job_id: str, req: AnalyzeRequest):
    """Worker thread: run the analyzer and store the outcome in _jobs[job_id]."""
    try:
        def on_progress(done, total, phase):
            # Mirror analyzer progress into the job record for status polling.
            _jobs[job_id]["done"] = done
            _jobs[job_id]["total"] = total
            _jobs[job_id]["phase"] = _PHASE_LABELS.get(phase, phase)

        api_key = os.getenv("ANTHROPIC_API_KEY") if req.use_ai else None
        results = analyze_folder(
            folder=req.folder,
            blur_threshold=req.blur_threshold,
            over_threshold=req.over_threshold,
            under_threshold=req.under_threshold,
            dup_threshold=req.dup_threshold,
            use_ai=req.use_ai,
            api_key=api_key,
            progress_callback=on_progress,
        )
        from analyzer import SUPPORTED_EXTENSIONS
        candidates = set()
        for fname in os.listdir(req.folder):
            if os.path.splitext(fname)[1].lower() in SUPPORTED_EXTENSIONS:
                candidates.add(os.path.join(req.folder, fname))
        flagged = {entry["path"] for entry in results}
        ok_paths = sorted(candidates - flagged)
        _jobs[job_id] = {"status": "done", "result": {"results": results, "ok_paths": ok_paths}}
    except Exception as e:
        _jobs[job_id] = {"status": "error", "error": str(e)}
@app.post("/analyze")
def analyze(req: AnalyzeRequest):
    """Start a background analysis job; returns its id for status polling."""
    if not os.path.isdir(req.folder):
        raise HTTPException(status_code=400, detail=f"Ordner nicht gefunden: {req.folder}")
    job_id = uuid.uuid4().hex
    _jobs[job_id] = {"status": "running"}
    worker = threading.Thread(target=_run_analyze_job, args=(job_id, req), daemon=True)
    worker.start()
    return {"job_id": job_id}
@app.get("/analyze/status/{job_id}")
def analyze_status(job_id: str):
    """Poll an analysis job; returns progress, or the result once done.

    Finished AND failed jobs are removed from _jobs on first report so the
    registry cannot grow without bound.
    """
    job = _jobs.get(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="Job nicht gefunden")
    if job["status"] == "error":
        # BUG FIX: errored jobs previously stayed in _jobs forever (leak);
        # the done branch already cleaned up, the error branch did not.
        del _jobs[job_id]
        raise HTTPException(status_code=500, detail=job["error"])
    if job["status"] == "running":
        return {
            "status": "running",
            "done": job.get("done", 0),
            "total": job.get("total", 0),
            "phase": job.get("phase", "Vorbereitung…"),
        }
    # done — return result and clean up
    result = job["result"]
    del _jobs[job_id]
    return {"status": "done", **result}
@app.get("/uploads")
def list_uploads():
    """Enumerate existing onlyframes upload sessions in the temp dir."""
    tmp_root = tempfile.gettempdir()
    sessions = []
    for entry in sorted(os.listdir(tmp_root)):
        if not entry.startswith("onlyframes-") or entry == "onlyframes-server.log":
            continue
        session_path = os.path.join(tmp_root, entry)
        if not os.path.isdir(session_path):
            continue
        file_count = 0
        total_size = 0
        for child in os.listdir(session_path):
            child_path = os.path.join(session_path, child)
            if os.path.isfile(child_path):
                file_count += 1
                total_size += os.path.getsize(child_path)
        sessions.append({"folder": session_path, "id": entry, "count": file_count, "size": total_size})
    return {"sessions": sessions}
@app.delete("/uploads")
def delete_upload(folder: str):
    """Delete a single upload session folder inside the temp dir."""
    folder_abs = os.path.abspath(folder)
    tmp_root = tempfile.gettempdir()
    # SECURITY FIX: the old startswith(tempdir) check also accepted the temp
    # dir itself (rmtree of all of /tmp) and sibling paths like "/tmpfoo".
    if not folder_abs.startswith(tmp_root + os.sep) or not os.path.isdir(folder_abs):
        raise HTTPException(status_code=403, detail="Zugriff nicht erlaubt")
    shutil.rmtree(folder_abs, ignore_errors=True)
    return {"ok": True}
@app.post("/move")
def move_files(req: MoveRequest):
    """Move the given photos into <folder>/_aussortiert (the reject bin).

    Paths outside the analyzed folder are rejected per item; per-item errors
    are reported instead of aborting the whole request.
    """
    folder_abs = os.path.abspath(req.folder)
    if not os.path.isdir(folder_abs):
        raise HTTPException(status_code=400, detail=f"Ordner nicht gefunden: {req.folder}")
    target_dir = os.path.join(folder_abs, "_aussortiert")
    os.makedirs(target_dir, exist_ok=True)
    moved, errors = [], []
    for original in req.paths:
        abs_path = os.path.abspath(original)
        if not abs_path.startswith(folder_abs + os.sep):
            errors.append({"path": original, "error": "Pfad liegt außerhalb des analysierten Ordners"})
            continue
        try:
            shutil.move(abs_path, os.path.join(target_dir, os.path.basename(abs_path)))
        except Exception as exc:
            errors.append({"path": original, "error": str(exc)})
        else:
            moved.append(original)
    return {"moved": moved, "errors": errors}
# Registry of finished export zips on disk, plus a janitor thread.
_zip_store: dict = {}  # zip_id -> path
_ZIP_TTL = 3600  # ZIPs older than 1 h are deleted automatically


def _cleanup_zips():
    """Background loop: wake every _ZIP_TTL seconds and delete expired zips."""
    # FIX: this import used to sit inside the while-loop and was re-executed
    # every iteration.  Aliased because module-level `time` is the function.
    import time as _time
    while True:
        _time.sleep(_ZIP_TTL)
        cutoff = time() - _ZIP_TTL
        for zip_id, path in list(_zip_store.items()):
            try:
                if os.path.getmtime(path) < cutoff:
                    os.unlink(path)
                    _zip_store.pop(zip_id, None)
            except OSError:
                # File already gone — drop the stale registry entry.
                _zip_store.pop(zip_id, None)


threading.Thread(target=_cleanup_zips, daemon=True).start()
def _run_export_job(job_id: str, req: ExportRequest):
    """Worker thread: process, rename and zip the selected photos.

    Progress is mirrored into _jobs[job_id]; the finished zip is written to
    the temp dir and registered in _zip_store for /export/download.
    """
    # FIX: detect_features used to be imported inside the per-photo loop;
    # import everything from processor once up front.
    from processor import detect_features, get_exif_info, get_new_name, process_photo
    from analyzer import SUPPORTED_EXTENSIONS
    try:
        folder_abs = os.path.abspath(req.folder)
        fav_set = set(req.fav_paths)
        # Collect kept files (what's in folder, minus _aussortiert)
        all_files = sorted([
            os.path.join(folder_abs, f)
            for f in os.listdir(folder_abs)
            if os.path.isfile(os.path.join(folder_abs, f))
            and os.path.splitext(f)[1].lower() in SUPPORTED_EXTENSIONS
        ])
        # Only export paths the client passed (already-kept set).
        # PERF FIX: build the lookup set once — the old code rebuilt
        # set(req.paths) for every file, making the filter O(n*m).
        requested = set(req.paths)
        export_paths = [p for p in all_files if p in requested] or all_files
        total = len(export_paths)
        _jobs[job_id]["total"] = total
        buf = io.BytesIO()
        used_names: set = set()
        detectors = set(req.feature_detectors)
        with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
            for i, path in enumerate(export_paths):
                exif = get_exif_info(path)
                new_name = get_new_name(
                    path, req.rename_mode, req.rename_prefix,
                    i + 1, exif, path in fav_set, req.fav_prefix,
                )
                if detectors:
                    feat_prefix = "".join(detect_features(path, detectors))
                    if feat_prefix:
                        new_name = feat_prefix + new_name
                # Deduplicate filenames inside the zip
                base, ext = os.path.splitext(new_name)
                candidate, n = new_name, 1
                while candidate in used_names:
                    candidate = f"{base}_{n:03d}{ext}"
                    n += 1
                used_names.add(candidate)
                try:
                    data = process_photo(
                        path,
                        rotation=req.rotation,
                        brightness=req.brightness,
                        contrast=req.contrast,
                        saturation=req.saturation,
                        text_watermark=req.text_watermark or None,
                        image_watermark_path=req.image_watermark_path or None,
                        image_watermark_settings=req.image_watermark_settings or None,
                        exif_info=exif,
                    )
                    zf.writestr(candidate, data)
                except Exception:
                    pass  # best-effort: one broken photo must not abort the export
                _jobs[job_id]["done"] = i + 1
        zip_id = uuid.uuid4().hex
        zip_path = os.path.join(tempfile.gettempdir(), f"onlyframes-export-{zip_id}.zip")
        with open(zip_path, "wb") as f:
            f.write(buf.getvalue())
        _zip_store[zip_id] = zip_path
        _jobs[job_id] = {"status": "done", "zip_id": zip_id}
    except Exception as e:
        _jobs[job_id] = {"status": "error", "error": str(e)}
@app.post("/export")
def start_export(req: ExportRequest):
    """Start a background export job for a session folder; returns its id."""
    folder_abs = os.path.abspath(req.folder)
    tmp_root = tempfile.gettempdir()
    # SECURITY FIX: require a folder strictly *inside* the temp dir — plain
    # startswith(tempdir) also matched the temp dir itself and "/tmpfoo".
    if not folder_abs.startswith(tmp_root + os.sep) or not os.path.isdir(folder_abs):
        raise HTTPException(status_code=403, detail="Zugriff nicht erlaubt")
    job_id = uuid.uuid4().hex
    _jobs[job_id] = {"status": "running", "done": 0, "total": 0}
    threading.Thread(target=_run_export_job, args=(job_id, req), daemon=True).start()
    return {"job_id": job_id}
@app.get("/export/status/{job_id}")
def export_status(job_id: str):
    """Poll an export job; returns progress, or the zip id once done."""
    job = _jobs.get(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job nicht gefunden")
    if job["status"] == "error":
        # BUG FIX: errored export jobs were never removed from _jobs (leak),
        # mirroring the cleanup analyze_status already does when done.
        del _jobs[job_id]
        raise HTTPException(status_code=500, detail=job["error"])
    if job["status"] == "running":
        return {"status": "running", "done": job.get("done", 0), "total": job.get("total", 0)}
    # NOTE(review): finished jobs stay in _jobs so repeated polls keep
    # working; the zip itself is cleaned up by download or the TTL janitor.
    return {"status": "done", "zip_id": job["zip_id"]}
@app.get("/export/download/{zip_id}")
def export_download(zip_id: str):
    """Stream a finished export zip and delete it afterwards."""
    zip_path = _zip_store.get(zip_id)
    if not zip_path or not os.path.isfile(zip_path):
        raise HTTPException(status_code=404, detail="Export nicht gefunden")

    def stream_and_cleanup():
        # Stream in 64 KiB chunks; remove the file and registry entry even if
        # the client disconnects mid-transfer.
        try:
            with open(zip_path, "rb") as f:
                yield from iter(lambda: f.read(65536), b"")
        finally:
            # ROBUSTNESS FIX: a bare os.unlink could raise here (e.g. the TTL
            # janitor removed the file first) and skip the registry cleanup.
            try:
                os.unlink(zip_path)
            except OSError:
                pass
            _zip_store.pop(zip_id, None)

    filename = f"onlyframes_{date.today().isoformat()}.zip"
    return StreamingResponse(
        stream_and_cleanup(),
        media_type="application/zip",
        # BUG FIX: the header literally said filename="(unknown)"; the
        # computed `filename` variable was never used.
        headers={"Content-Disposition": f'attachment; filename="{filename}"'},
    )
@app.post("/upload-watermark")
async def upload_watermark(file: UploadFile = File(...), folder: str = Form("")):
    """Save an uploaded watermark image to the temp area and return its path."""
    ext = os.path.splitext(file.filename or "")[1].lower()
    if ext not in {".jpg", ".jpeg", ".png", ".gif", ".webp"}:
        raise HTTPException(status_code=400, detail="Ungültiges Format")
    tmp_base = tempfile.gettempdir()
    if folder and os.path.isdir(folder) and os.path.abspath(folder).startswith(tmp_base):
        dest_dir = folder
    else:
        dest_dir = tmp_base
    dest = os.path.join(dest_dir, "wm_" + uuid.uuid4().hex[:8] + ext)
    payload = await file.read()
    with open(dest, "wb") as f:
        f.write(payload)
    return {"path": dest}
@app.get("/detect-angle")
def detect_angle(path: str):
    """Estimate the horizon angle of a photo inside the temp dir."""
    from processor import detect_horizon_angle
    path_abs = os.path.abspath(path)
    if not path_abs.startswith(tempfile.gettempdir()):
        raise HTTPException(status_code=403, detail="Zugriff nicht erlaubt")
    if not os.path.isfile(path_abs):
        raise HTTPException(status_code=404, detail="Datei nicht gefunden")
    angle = detect_horizon_angle(path_abs)
    return {"angle": angle}
def open_browser():
    """Open the local UI in the default browser (called shortly after startup)."""
    webbrowser.open("http://localhost:8000")


if __name__ == "__main__":
    # Give uvicorn a second to bind before pointing the browser at it.
    threading.Timer(1.0, open_browser).start()
    uvicorn.run(app, host="0.0.0.0", port=8000)