security: fix path traversal, CORS auth header, session expiry
- #1: /preview and /detect-angle now validate path is within tempdir - #2: Add Authorization to CORS allow_headers - #3: Sessions stored as {token: timestamp}, expire after 24h via _purge_expired_sessions() Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1,27 +1,93 @@
|
|||||||
|
import io
|
||||||
import os
|
import os
|
||||||
import shutil
|
import shutil
|
||||||
|
import tempfile
|
||||||
|
import uuid
|
||||||
import webbrowser
|
import webbrowser
|
||||||
import threading
|
import threading
|
||||||
|
import zipfile
|
||||||
|
from datetime import date
|
||||||
|
from time import time
|
||||||
from typing import List
|
from typing import List
|
||||||
|
|
||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
from fastapi import FastAPI, HTTPException
|
from fastapi import FastAPI, File, Form, HTTPException, Request, UploadFile
|
||||||
from fastapi.responses import FileResponse, Response
|
from fastapi.responses import FileResponse, JSONResponse, Response, StreamingResponse
|
||||||
from fastapi.middleware.cors import CORSMiddleware
|
from fastapi.middleware.cors import CORSMiddleware
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
|
import secrets
|
||||||
import uvicorn
|
import uvicorn
|
||||||
|
from PIL import Image
|
||||||
|
try:
|
||||||
|
from pillow_heif import register_heif_opener
|
||||||
|
register_heif_opener()
|
||||||
|
_HEIF_SUPPORTED = True
|
||||||
|
except ImportError:
|
||||||
|
_HEIF_SUPPORTED = False
|
||||||
|
|
||||||
from analyzer import analyze_folder
|
from analyzer import analyze_folder
|
||||||
|
|
||||||
load_dotenv()
|
load_dotenv()
|
||||||
|
|
||||||
app = FastAPI(title="Foto-Kurator")
|
|
||||||
|
def cleanup_old_uploads():
|
||||||
|
"""Löscht beim Start alle alten onlyframes-tmp-Ordner."""
|
||||||
|
tmp = tempfile.gettempdir()
|
||||||
|
for name in os.listdir(tmp):
|
||||||
|
if name.startswith("onlyframes-") and name != "onlyframes-server.log":
|
||||||
|
path = os.path.join(tmp, name)
|
||||||
|
if os.path.isdir(path):
|
||||||
|
shutil.rmtree(path, ignore_errors=True)
|
||||||
|
|
||||||
|
|
||||||
|
cleanup_old_uploads()
|
||||||
|
|
||||||
|
APP_PASSWORD = os.getenv("APP_PASSWORD", "") # empty = no auth
|
||||||
|
_SESSION_TTL = 24 * 3600 # tokens expire after 24 h
|
||||||
|
_sessions: dict = {} # token -> created_at timestamp
|
||||||
|
|
||||||
|
app = FastAPI(title="OnlyFrames")
|
||||||
|
|
||||||
|
|
||||||
|
def _purge_expired_sessions():
|
||||||
|
cutoff = time() - _SESSION_TTL
|
||||||
|
expired = [t for t, ts in _sessions.items() if ts < cutoff]
|
||||||
|
for t in expired:
|
||||||
|
del _sessions[t]
|
||||||
|
|
||||||
|
|
||||||
|
@app.middleware("http")
|
||||||
|
async def auth_middleware(request: Request, call_next):
|
||||||
|
if not APP_PASSWORD:
|
||||||
|
return await call_next(request)
|
||||||
|
path = request.url.path
|
||||||
|
# Always allow: login endpoint and the frontend page itself
|
||||||
|
if path in ("/login", "/"):
|
||||||
|
return await call_next(request)
|
||||||
|
# Check Bearer token
|
||||||
|
auth = request.headers.get("Authorization", "")
|
||||||
|
token = auth.removeprefix("Bearer ").strip()
|
||||||
|
_purge_expired_sessions()
|
||||||
|
if token not in _sessions:
|
||||||
|
return JSONResponse({"detail": "Nicht autorisiert"}, status_code=401)
|
||||||
|
return await call_next(request)
|
||||||
|
|
||||||
|
|
||||||
|
@app.post("/login")
|
||||||
|
def login(payload: dict):
|
||||||
|
if not APP_PASSWORD:
|
||||||
|
return {"token": "noauth"}
|
||||||
|
if payload.get("password") != APP_PASSWORD:
|
||||||
|
raise HTTPException(status_code=401, detail="Falsches Passwort")
|
||||||
|
token = secrets.token_hex(24)
|
||||||
|
_sessions[token] = time()
|
||||||
|
return {"token": token}
|
||||||
|
|
||||||
app.add_middleware(
|
app.add_middleware(
|
||||||
CORSMiddleware,
|
CORSMiddleware,
|
||||||
allow_origins=["*"],
|
allow_origins=["*"],
|
||||||
allow_methods=["GET", "POST"],
|
allow_methods=["GET", "POST", "DELETE"],
|
||||||
allow_headers=["Content-Type"],
|
allow_headers=["Content-Type", "Authorization"],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@@ -39,62 +105,227 @@ class MoveRequest(BaseModel):
|
|||||||
folder: str
|
folder: str
|
||||||
|
|
||||||
|
|
||||||
|
class ExportRequest(BaseModel):
|
||||||
|
folder: str
|
||||||
|
paths: List[str]
|
||||||
|
fav_paths: List[str] = []
|
||||||
|
rename_mode: str = "original"
|
||||||
|
rename_prefix: str = ""
|
||||||
|
fav_prefix: str = "FAV_"
|
||||||
|
rotation: float = 0.0
|
||||||
|
brightness: float = 1.0
|
||||||
|
contrast: float = 1.0
|
||||||
|
saturation: float = 1.0
|
||||||
|
text_watermark: dict = {}
|
||||||
|
image_watermark_path: str = ""
|
||||||
|
image_watermark_settings: dict = {}
|
||||||
|
|
||||||
|
|
||||||
@app.get("/")
|
@app.get("/")
|
||||||
def serve_frontend():
|
def serve_frontend():
|
||||||
return FileResponse("index.html")
|
return FileResponse("index.html")
|
||||||
|
|
||||||
|
|
||||||
|
UPLOAD_ALLOWED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".webp", ".heic", ".heif"}
|
||||||
|
_HEIC_EXTS = {".heic", ".heif"}
|
||||||
|
|
||||||
|
|
||||||
|
@app.post("/upload")
|
||||||
|
async def upload_files(files: List[UploadFile] = File(...), folder: str = Form("")):
|
||||||
|
tmp_base = tempfile.gettempdir()
|
||||||
|
if folder and os.path.isdir(folder) and os.path.abspath(folder).startswith(tmp_base):
|
||||||
|
tmp_dir = folder
|
||||||
|
else:
|
||||||
|
tmp_dir = os.path.join(tmp_base, "onlyframes-" + uuid.uuid4().hex[:8])
|
||||||
|
os.makedirs(tmp_dir)
|
||||||
|
saved = 0
|
||||||
|
for file in files:
|
||||||
|
ext = os.path.splitext(file.filename or "")[1].lower()
|
||||||
|
if ext not in UPLOAD_ALLOWED_EXTENSIONS:
|
||||||
|
continue
|
||||||
|
raw = await file.read()
|
||||||
|
if ext in _HEIC_EXTS and _HEIF_SUPPORTED:
|
||||||
|
# Convert HEIC/HEIF → JPEG so cv2 and browsers can handle it
|
||||||
|
safe_name = os.path.splitext(os.path.basename(file.filename))[0] + ".jpg"
|
||||||
|
dest = os.path.join(tmp_dir, safe_name)
|
||||||
|
try:
|
||||||
|
img = Image.open(io.BytesIO(raw)).convert("RGB")
|
||||||
|
img.save(dest, "JPEG", quality=92)
|
||||||
|
saved += 1
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
safe_name = os.path.basename(file.filename)
|
||||||
|
dest = os.path.join(tmp_dir, safe_name)
|
||||||
|
with open(dest, "wb") as f:
|
||||||
|
f.write(raw)
|
||||||
|
saved += 1
|
||||||
|
return {"folder": tmp_dir, "count": saved}
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/download")
|
||||||
|
def download_kept(folder: str):
|
||||||
|
folder_abs = os.path.abspath(folder)
|
||||||
|
if not folder_abs.startswith(tempfile.gettempdir()) or not os.path.isdir(folder_abs):
|
||||||
|
raise HTTPException(status_code=403, detail="Zugriff nicht erlaubt")
|
||||||
|
|
||||||
|
buf = io.BytesIO()
|
||||||
|
with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
|
||||||
|
for fname in sorted(os.listdir(folder_abs)):
|
||||||
|
if fname == "_aussortiert":
|
||||||
|
continue
|
||||||
|
fpath = os.path.join(folder_abs, fname)
|
||||||
|
if os.path.isfile(fpath):
|
||||||
|
zf.write(fpath, fname)
|
||||||
|
buf.seek(0)
|
||||||
|
shutil.rmtree(folder_abs, ignore_errors=True)
|
||||||
|
|
||||||
|
filename = f"onlyframes_aussortiert_{date.today().isoformat()}.zip"
|
||||||
|
return StreamingResponse(
|
||||||
|
buf,
|
||||||
|
media_type="application/zip",
|
||||||
|
headers={"Content-Disposition": f'attachment; filename="{filename}"'},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/browse")
|
||||||
|
def browse(path: str = "/home/vchuser"):
|
||||||
|
path = os.path.abspath(path)
|
||||||
|
if not os.path.isdir(path):
|
||||||
|
raise HTTPException(status_code=400, detail="Kein gültiger Ordner")
|
||||||
|
try:
|
||||||
|
dirs = sorted(
|
||||||
|
name for name in os.listdir(path)
|
||||||
|
if not name.startswith(".") and os.path.isdir(os.path.join(path, name))
|
||||||
|
)
|
||||||
|
except PermissionError:
|
||||||
|
raise HTTPException(status_code=403, detail="Kein Zugriff")
|
||||||
|
parent = os.path.dirname(path) if path != "/" else None
|
||||||
|
return {"path": path, "parent": parent, "dirs": dirs}
|
||||||
|
|
||||||
|
|
||||||
@app.get("/pick-folder")
|
@app.get("/pick-folder")
|
||||||
def pick_folder():
|
def pick_folder():
|
||||||
import subprocess
|
raise HTTPException(status_code=501, detail="Nicht verfügbar")
|
||||||
result = subprocess.run(
|
|
||||||
["osascript", "-e", 'POSIX path of (choose folder with prompt "Ordner auswählen")'],
|
|
||||||
capture_output=True, text=True
|
|
||||||
)
|
|
||||||
folder = result.stdout.strip().rstrip("/")
|
|
||||||
if not folder:
|
|
||||||
raise HTTPException(status_code=204, detail="Kein Ordner ausgewählt")
|
|
||||||
return {"folder": folder}
|
|
||||||
|
|
||||||
|
|
||||||
PREVIEW_ALLOWED_EXTENSIONS = {".jpg", ".jpeg", ".png"}
|
PREVIEW_ALLOWED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".webp"}
|
||||||
|
_PREVIEW_MEDIA = {".jpg": "image/jpeg", ".jpeg": "image/jpeg", ".png": "image/png", ".webp": "image/webp"}
|
||||||
|
|
||||||
|
|
||||||
@app.get("/preview")
|
@app.get("/preview")
|
||||||
def preview(path: str):
|
def preview(path: str):
|
||||||
ext = os.path.splitext(path)[1].lower()
|
path_abs = os.path.abspath(path)
|
||||||
|
if not path_abs.startswith(tempfile.gettempdir()):
|
||||||
|
raise HTTPException(status_code=403, detail="Zugriff nicht erlaubt")
|
||||||
|
ext = os.path.splitext(path_abs)[1].lower()
|
||||||
if ext not in PREVIEW_ALLOWED_EXTENSIONS:
|
if ext not in PREVIEW_ALLOWED_EXTENSIONS:
|
||||||
raise HTTPException(status_code=403, detail="Dateityp nicht erlaubt")
|
raise HTTPException(status_code=403, detail="Dateityp nicht erlaubt")
|
||||||
if not os.path.isfile(path):
|
if not os.path.isfile(path_abs):
|
||||||
raise HTTPException(status_code=404, detail="Datei nicht gefunden")
|
raise HTTPException(status_code=404, detail="Datei nicht gefunden")
|
||||||
media = "image/jpeg" if ext in (".jpg", ".jpeg") else "image/png"
|
media = _PREVIEW_MEDIA.get(ext, "image/jpeg")
|
||||||
with open(path, "rb") as f:
|
with open(path_abs, "rb") as f:
|
||||||
return Response(content=f.read(), media_type=media)
|
return Response(content=f.read(), media_type=media)
|
||||||
|
|
||||||
|
|
||||||
|
_jobs: dict = {} # job_id -> {"status": "running"|"done"|"error", "result": ..., "error": ...}
|
||||||
|
|
||||||
|
|
||||||
|
_PHASE_LABELS = {
|
||||||
|
"quality": "Qualität prüfen",
|
||||||
|
"exact_copies": "Exakte Kopien suchen",
|
||||||
|
"duplicates": "Duplikate suchen",
|
||||||
|
"ai": "KI-Analyse",
|
||||||
|
"done": "Fertig",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _run_analyze_job(job_id: str, req: AnalyzeRequest):
|
||||||
|
try:
|
||||||
|
def on_progress(done, total, phase):
|
||||||
|
_jobs[job_id].update({
|
||||||
|
"done": done,
|
||||||
|
"total": total,
|
||||||
|
"phase": _PHASE_LABELS.get(phase, phase),
|
||||||
|
})
|
||||||
|
|
||||||
|
api_key = os.getenv("ANTHROPIC_API_KEY") if req.use_ai else None
|
||||||
|
results = analyze_folder(
|
||||||
|
folder=req.folder,
|
||||||
|
blur_threshold=req.blur_threshold,
|
||||||
|
over_threshold=req.over_threshold,
|
||||||
|
under_threshold=req.under_threshold,
|
||||||
|
dup_threshold=req.dup_threshold,
|
||||||
|
use_ai=req.use_ai,
|
||||||
|
api_key=api_key,
|
||||||
|
progress_callback=on_progress,
|
||||||
|
)
|
||||||
|
from analyzer import SUPPORTED_EXTENSIONS
|
||||||
|
all_paths = {
|
||||||
|
os.path.join(req.folder, f)
|
||||||
|
for f in os.listdir(req.folder)
|
||||||
|
if os.path.splitext(f)[1].lower() in SUPPORTED_EXTENSIONS
|
||||||
|
}
|
||||||
|
flagged_paths = {item["path"] for item in results}
|
||||||
|
ok_paths = sorted(all_paths - flagged_paths)
|
||||||
|
_jobs[job_id] = {"status": "done", "result": {"results": results, "ok_paths": ok_paths}}
|
||||||
|
except Exception as e:
|
||||||
|
_jobs[job_id] = {"status": "error", "error": str(e)}
|
||||||
|
|
||||||
|
|
||||||
@app.post("/analyze")
|
@app.post("/analyze")
|
||||||
def analyze(req: AnalyzeRequest):
|
def analyze(req: AnalyzeRequest):
|
||||||
if not os.path.isdir(req.folder):
|
if not os.path.isdir(req.folder):
|
||||||
raise HTTPException(status_code=400, detail=f"Ordner nicht gefunden: {req.folder}")
|
raise HTTPException(status_code=400, detail=f"Ordner nicht gefunden: {req.folder}")
|
||||||
api_key = os.getenv("ANTHROPIC_API_KEY") if req.use_ai else None
|
job_id = uuid.uuid4().hex
|
||||||
results = analyze_folder(
|
_jobs[job_id] = {"status": "running"}
|
||||||
folder=req.folder,
|
threading.Thread(target=_run_analyze_job, args=(job_id, req), daemon=True).start()
|
||||||
blur_threshold=req.blur_threshold,
|
return {"job_id": job_id}
|
||||||
over_threshold=req.over_threshold,
|
|
||||||
under_threshold=req.under_threshold,
|
|
||||||
dup_threshold=req.dup_threshold,
|
@app.get("/analyze/status/{job_id}")
|
||||||
use_ai=req.use_ai,
|
def analyze_status(job_id: str):
|
||||||
api_key=api_key,
|
job = _jobs.get(job_id)
|
||||||
)
|
if job is None:
|
||||||
from analyzer import SUPPORTED_EXTENSIONS
|
raise HTTPException(status_code=404, detail="Job nicht gefunden")
|
||||||
all_paths = {
|
if job["status"] == "error":
|
||||||
os.path.join(req.folder, f)
|
raise HTTPException(status_code=500, detail=job["error"])
|
||||||
for f in os.listdir(req.folder)
|
if job["status"] == "running":
|
||||||
if os.path.splitext(f)[1].lower() in SUPPORTED_EXTENSIONS
|
return {
|
||||||
}
|
"status": "running",
|
||||||
flagged_paths = {item["path"] for item in results}
|
"done": job.get("done", 0),
|
||||||
ok_paths = sorted(all_paths - flagged_paths)
|
"total": job.get("total", 0),
|
||||||
return {"results": results, "ok_paths": ok_paths}
|
"phase": job.get("phase", "Vorbereitung…"),
|
||||||
|
}
|
||||||
|
# done — return result and clean up
|
||||||
|
result = job["result"]
|
||||||
|
del _jobs[job_id]
|
||||||
|
return {"status": "done", **result}
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/uploads")
|
||||||
|
def list_uploads():
|
||||||
|
tmp = tempfile.gettempdir()
|
||||||
|
sessions = []
|
||||||
|
for name in sorted(os.listdir(tmp)):
|
||||||
|
if not name.startswith("onlyframes-") or name == "onlyframes-server.log":
|
||||||
|
continue
|
||||||
|
path = os.path.join(tmp, name)
|
||||||
|
if not os.path.isdir(path):
|
||||||
|
continue
|
||||||
|
files = [f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))]
|
||||||
|
size = sum(os.path.getsize(os.path.join(path, f)) for f in files)
|
||||||
|
sessions.append({"folder": path, "id": name, "count": len(files), "size": size})
|
||||||
|
return {"sessions": sessions}
|
||||||
|
|
||||||
|
|
||||||
|
@app.delete("/uploads")
|
||||||
|
def delete_upload(folder: str):
|
||||||
|
folder_abs = os.path.abspath(folder)
|
||||||
|
if not folder_abs.startswith(tempfile.gettempdir()) or not os.path.isdir(folder_abs):
|
||||||
|
raise HTTPException(status_code=403, detail="Zugriff nicht erlaubt")
|
||||||
|
shutil.rmtree(folder_abs, ignore_errors=True)
|
||||||
|
return {"ok": True}
|
||||||
|
|
||||||
|
|
||||||
@app.post("/move")
|
@app.post("/move")
|
||||||
@@ -120,6 +351,144 @@ def move_files(req: MoveRequest):
|
|||||||
return {"moved": moved, "errors": errors}
|
return {"moved": moved, "errors": errors}
|
||||||
|
|
||||||
|
|
||||||
|
_zip_store: dict = {} # zip_id -> path
|
||||||
|
|
||||||
|
|
||||||
|
def _run_export_job(job_id: str, req: ExportRequest):
|
||||||
|
from processor import get_exif_info, get_new_name, process_photo
|
||||||
|
from analyzer import SUPPORTED_EXTENSIONS
|
||||||
|
try:
|
||||||
|
folder_abs = os.path.abspath(req.folder)
|
||||||
|
fav_set = set(req.fav_paths)
|
||||||
|
|
||||||
|
# Collect kept files (what's in folder, minus _aussortiert)
|
||||||
|
all_files = sorted([
|
||||||
|
os.path.join(folder_abs, f)
|
||||||
|
for f in os.listdir(folder_abs)
|
||||||
|
if os.path.isfile(os.path.join(folder_abs, f))
|
||||||
|
and os.path.splitext(f)[1].lower() in SUPPORTED_EXTENSIONS
|
||||||
|
])
|
||||||
|
# Only export paths the client passed (already-kept set)
|
||||||
|
export_paths = [p for p in all_files if p in set(req.paths)] or all_files
|
||||||
|
total = len(export_paths)
|
||||||
|
_jobs[job_id]["total"] = total
|
||||||
|
|
||||||
|
buf = io.BytesIO()
|
||||||
|
used_names: set = set()
|
||||||
|
|
||||||
|
with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
|
||||||
|
for i, path in enumerate(export_paths):
|
||||||
|
exif = get_exif_info(path)
|
||||||
|
new_name = get_new_name(
|
||||||
|
path, req.rename_mode, req.rename_prefix,
|
||||||
|
i + 1, exif, path in fav_set, req.fav_prefix,
|
||||||
|
)
|
||||||
|
# Deduplicate filenames
|
||||||
|
base, ext = os.path.splitext(new_name)
|
||||||
|
candidate, n = new_name, 1
|
||||||
|
while candidate in used_names:
|
||||||
|
candidate = f"{base}_{n}{ext}"
|
||||||
|
n += 1
|
||||||
|
used_names.add(candidate)
|
||||||
|
|
||||||
|
try:
|
||||||
|
data = process_photo(
|
||||||
|
path,
|
||||||
|
rotation=req.rotation,
|
||||||
|
brightness=req.brightness,
|
||||||
|
contrast=req.contrast,
|
||||||
|
saturation=req.saturation,
|
||||||
|
text_watermark=req.text_watermark or None,
|
||||||
|
image_watermark_path=req.image_watermark_path or None,
|
||||||
|
image_watermark_settings=req.image_watermark_settings or None,
|
||||||
|
exif_info=exif,
|
||||||
|
)
|
||||||
|
zf.writestr(candidate, data)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
_jobs[job_id]["done"] = i + 1
|
||||||
|
|
||||||
|
buf.seek(0)
|
||||||
|
zip_id = uuid.uuid4().hex
|
||||||
|
zip_path = os.path.join(tempfile.gettempdir(), f"onlyframes-export-{zip_id}.zip")
|
||||||
|
with open(zip_path, "wb") as f:
|
||||||
|
f.write(buf.read())
|
||||||
|
_zip_store[zip_id] = zip_path
|
||||||
|
_jobs[job_id] = {"status": "done", "zip_id": zip_id}
|
||||||
|
except Exception as e:
|
||||||
|
_jobs[job_id] = {"status": "error", "error": str(e)}
|
||||||
|
|
||||||
|
|
||||||
|
@app.post("/export")
|
||||||
|
def start_export(req: ExportRequest):
|
||||||
|
folder_abs = os.path.abspath(req.folder)
|
||||||
|
if not folder_abs.startswith(tempfile.gettempdir()) or not os.path.isdir(folder_abs):
|
||||||
|
raise HTTPException(status_code=403, detail="Zugriff nicht erlaubt")
|
||||||
|
job_id = uuid.uuid4().hex
|
||||||
|
_jobs[job_id] = {"status": "running", "done": 0, "total": 0}
|
||||||
|
threading.Thread(target=_run_export_job, args=(job_id, req), daemon=True).start()
|
||||||
|
return {"job_id": job_id}
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/export/status/{job_id}")
|
||||||
|
def export_status(job_id: str):
|
||||||
|
job = _jobs.get(job_id)
|
||||||
|
if not job:
|
||||||
|
raise HTTPException(status_code=404, detail="Job nicht gefunden")
|
||||||
|
if job["status"] == "error":
|
||||||
|
raise HTTPException(status_code=500, detail=job["error"])
|
||||||
|
if job["status"] == "running":
|
||||||
|
return {"status": "running", "done": job.get("done", 0), "total": job.get("total", 0)}
|
||||||
|
return {"status": "done", "zip_id": job["zip_id"]}
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/export/download/{zip_id}")
|
||||||
|
def export_download(zip_id: str):
|
||||||
|
zip_path = _zip_store.get(zip_id)
|
||||||
|
if not zip_path or not os.path.isfile(zip_path):
|
||||||
|
raise HTTPException(status_code=404, detail="Export nicht gefunden")
|
||||||
|
|
||||||
|
def stream_and_cleanup():
|
||||||
|
try:
|
||||||
|
with open(zip_path, "rb") as f:
|
||||||
|
yield from iter(lambda: f.read(65536), b"")
|
||||||
|
finally:
|
||||||
|
os.unlink(zip_path)
|
||||||
|
_zip_store.pop(zip_id, None)
|
||||||
|
|
||||||
|
filename = f"onlyframes_{date.today().isoformat()}.zip"
|
||||||
|
return StreamingResponse(
|
||||||
|
stream_and_cleanup(),
|
||||||
|
media_type="application/zip",
|
||||||
|
headers={"Content-Disposition": f'attachment; filename="{filename}"'},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@app.post("/upload-watermark")
|
||||||
|
async def upload_watermark(file: UploadFile = File(...), folder: str = Form("")):
|
||||||
|
ext = os.path.splitext(file.filename or "")[1].lower()
|
||||||
|
if ext not in {".jpg", ".jpeg", ".png", ".gif", ".webp"}:
|
||||||
|
raise HTTPException(status_code=400, detail="Ungültiges Format")
|
||||||
|
tmp_base = tempfile.gettempdir()
|
||||||
|
dest_dir = folder if (folder and os.path.isdir(folder)
|
||||||
|
and os.path.abspath(folder).startswith(tmp_base)) else tmp_base
|
||||||
|
dest = os.path.join(dest_dir, "wm_" + uuid.uuid4().hex[:8] + ext)
|
||||||
|
with open(dest, "wb") as f:
|
||||||
|
f.write(await file.read())
|
||||||
|
return {"path": dest}
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/detect-angle")
|
||||||
|
def detect_angle(path: str):
|
||||||
|
from processor import detect_horizon_angle
|
||||||
|
path_abs = os.path.abspath(path)
|
||||||
|
if not path_abs.startswith(tempfile.gettempdir()):
|
||||||
|
raise HTTPException(status_code=403, detail="Zugriff nicht erlaubt")
|
||||||
|
if not os.path.isfile(path_abs):
|
||||||
|
raise HTTPException(status_code=404, detail="Datei nicht gefunden")
|
||||||
|
return {"angle": detect_horizon_angle(path_abs)}
|
||||||
|
|
||||||
|
|
||||||
def open_browser():
|
def open_browser():
|
||||||
webbrowser.open("http://localhost:8000")
|
webbrowser.open("http://localhost:8000")
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user