import io
import math
import os
import re
from datetime import datetime
from typing import List, Optional, Set

import cv2
import numpy as np
from PIL import Image, ImageDraw, ImageEnhance, ImageFont, ImageOps
from PIL.ExifTags import TAGS

# Tag id of the pointer to the Exif sub-IFD. DateTimeOriginal and LensModel
# are stored in that sub-IFD rather than in the base IFD.
_EXIF_IFD_POINTER = 0x8769


# ---------------------------------------------------------------------------
# EXIF helpers
# ---------------------------------------------------------------------------

def _exif_text(value) -> str:
    """Return an EXIF value as text without NUL padding or outer whitespace."""
    if value is None:
        return ""
    return str(value).replace("\x00", "").strip()


def get_exif_info(path: str) -> dict:
    """Read basic EXIF metadata from the image at *path*.

    Returns a dict with the keys ``date_str`` (raw EXIF date, e.g.
    ``"2024:07:15 14:30:22"``), ``date_formatted`` (e.g.
    ``"2024-07-15_143022"``), ``camera`` and ``lens``. Every value is
    ``None`` when it is missing or the file cannot be read; this function
    never raises.
    """
    info = {"date_str": None, "date_formatted": None, "camera": None, "lens": None}
    try:
        # The context manager closes the underlying file handle.
        with Image.open(path) as img:
            exif_raw = img.getexif()
            if not exif_raw:
                return info
            merged = dict(exif_raw.items())
            try:
                # Recent Pillow keeps the Exif sub-IFD separate from the
                # base IFD; merge it so DateTimeOriginal/LensModel are seen.
                merged.update(exif_raw.get_ifd(_EXIF_IFD_POINTER))
            except Exception:
                # Older Pillow (already flattened) or a damaged sub-IFD:
                # carry on with the base tags only.
                pass

        exif = {TAGS.get(k, k): v for k, v in merged.items()}

        # Date
        date_str = exif.get("DateTimeOriginal") or exif.get("DateTime")
        if date_str and isinstance(date_str, str):
            date_str = _exif_text(date_str)
        if date_str and isinstance(date_str, str):
            info["date_str"] = date_str
            # "2024:07:15 14:30:22" -> "2024-07-15_143022"
            info["date_formatted"] = (
                date_str.replace(":", "-", 2).replace(" ", "_").replace(":", "")
            )

        # Camera
        make = _exif_text(exif.get("Make", ""))
        model = _exif_text(exif.get("Model", ""))
        if model:
            # Avoid "Canon Canon EOS ..." when the model already names the make.
            if make and make in model:
                info["camera"] = model
            else:
                info["camera"] = f"{make} {model}".strip()

        # Lens
        lens = _exif_text(exif.get("LensModel", ""))
        if lens:
            info["lens"] = lens
    except Exception:
        # Best effort: unreadable files or malformed EXIF yield whatever was
        # collected so far.
        pass
    return info


def resolve_wm_template(template: str, exif: dict) -> str:
    """Replace {date}, {time}, {camera} and {lens} in watermark text.

    *exif* is a dict shaped like the result of :func:`get_exif_info`.
    Placeholders whose value is unknown are replaced by an empty string.
    """
    template = template or ""
    date_str = exif.get("date_str") or ""
    # "2024:07:15 14:30:22" -> date "2024-07-15", time "14:30:22"
    date_part = date_str[:10].replace(":", "-") if date_str else ""
    time_part = date_str[11:] if len(date_str) > 10 else ""
    replacements = {
        "{date}": date_part,
        "{time}": time_part,
        "{camera}": exif.get("camera") or "",
        "{lens}": exif.get("lens") or "",
    }
    result = template
    for placeholder, value in replacements.items():
        result = result.replace(placeholder, value)
    return result


# ---------------------------------------------------------------------------
# Rename
# ---------------------------------------------------------------------------

def get_new_name(original_path: str, mode: str, prefix: str, index: int,
                 exif_info: dict, is_fav: bool, fav_prefix: str) -> str:
    """Build the output file name for a photo.

    Modes:
      * ``"original"``   - prefix + original file name (extension untouched)
      * ``"datetime"``   - prefix + EXIF date/time, or the original stem when
        no EXIF date is available
      * ``"date_seq"``   - prefix + EXIF date (or ``"nodate"``) + 4-digit index
      * ``"prefix_seq"`` - prefix + 4-digit index
      * anything else    - prefix + 4-digit index + original stem

    Except in ``"original"`` mode the extension is lower-cased and defaults
    to ``".jpg"`` when the source has none. *fav_prefix* is prepended when
    *is_fav* is true.
    """
    original = os.path.basename(original_path)
    stem, ext = os.path.splitext(original)
    ext = ext.lower() or ".jpg"
    date_fmt = exif_info.get("date_formatted")  # "2024-07-15_143022"

    if mode == "original":
        new = prefix + original if prefix else original
    elif mode == "datetime":
        if date_fmt:
            new = f"{prefix}{date_fmt}{ext}"
        else:
            new = f"{prefix}{stem}{ext}"
    elif mode == "date_seq":
        date_part = date_fmt[:10] if date_fmt else "nodate"
        new = f"{prefix}{date_part}_{index:04d}{ext}"
    elif mode == "prefix_seq":
        new = f"{prefix}{index:04d}{ext}"
    else:
        new = f"{prefix}{index:04d}_{stem}{ext}"

    if is_fav:
        new = fav_prefix + new
    return new


# ---------------------------------------------------------------------------
# Horizon detection
# ---------------------------------------------------------------------------

def detect_horizon_angle(path: str) -> float:
    """Returns skew angle in degrees (positive = clockwise). 0.0 if undetermined.

    The angle is the median slope of the near-horizontal (within +/-20
    degrees) line segments found by a probabilistic Hough transform in the
    middle third of the image, rounded to one decimal place.
    """
    try:
        img = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
        if img is None:
            return 0.0
        h, w = img.shape

        # Downsample for speed
        scale = min(1.0, 800 / max(w, 1))
        if scale < 1.0:
            img = cv2.resize(img, (int(w * scale), int(h * scale)))
            h, w = img.shape

        # Focus on middle horizontal band
        roi = img[h // 3: 2 * h // 3, :]
        edges = cv2.Canny(roi, 50, 150)
        lines = cv2.HoughLinesP(edges, 1, np.pi / 180, threshold=60,
                                minLineLength=w // 6, maxLineGap=15)
        if lines is None:
            return 0.0

        angles = []
        for line in lines:
            x1, y1, x2, y2 = line[0]
            if x2 == x1:
                continue  # vertical segment: not a horizon candidate
            angle = math.degrees(math.atan2(y2 - y1, x2 - x1))
            if -20 < angle < 20:
                angles.append(angle)

        if not angles:
            return 0.0
        return round(float(np.median(angles)), 1)
    except Exception:
        # Best effort: any OpenCV failure means "undetermined".
        return 0.0


# ---------------------------------------------------------------------------
# Feature detection
# ---------------------------------------------------------------------------

_face_cascade = None


def _get_face_cascade():
    """Return the frontal-face Haar cascade, loading it on first use."""
    global _face_cascade
    if _face_cascade is None:
        path = cv2.data.haarcascades + "haarcascade_frontalface_default.xml"
        _face_cascade = cv2.CascadeClassifier(path)
    return _face_cascade


def detect_features(path: str, enabled: Set[str]) -> List[str]:
    """
    Detects visual features and returns prefix strings to prepend to filename.

    enabled: set of strings from {"qr", "barcode", "face", "pano"}
    Returns e.g. ["QR_", "FACE_"]

    Possible prefixes, in output order: "QR_", "BC_", "FACE_" (exactly one
    face) or "GROUP_" (several faces), "PANO_" (width/height above 2.5).
    Each detector is best effort and independent: a failure in one does not
    prevent the others from running. Never raises for detector failures.
    """
    prefixes: List[str] = []
    try:
        img = cv2.imread(path)
    except Exception:
        return prefixes
    if img is None:
        return prefixes
    h, w = img.shape[:2]

    if "qr" in enabled:
        try:
            data, _, _ = cv2.QRCodeDetector().detectAndDecode(img)
            if data:
                prefixes.append("QR_")
        except Exception:
            pass

    if "barcode" in enabled:
        try:
            result = cv2.barcode.BarcodeDetector().detectAndDecode(img)
            if len(result) == 4:
                # Older OpenCV: (ok, decoded_info, decoded_type, corners)
                found = bool(result[0]) and any(result[1])
            else:
                # Newer OpenCV: (decoded_text, points, straight_code)
                found = bool(result[0])
            if found:
                prefixes.append("BC_")
        except Exception:
            # The barcode module is missing in some OpenCV builds.
            pass

    if "face" in enabled:
        try:
            gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
            scale = min(1.0, 640 / max(w, 1))
            if scale < 1.0:
                # Keep both dimensions >= 1 so extreme aspect ratios do not
                # make cv2.resize fail.
                size = (max(1, int(w * scale)), max(1, int(h * scale)))
                gray = cv2.resize(gray, size)
            faces = _get_face_cascade().detectMultiScale(
                gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))
            if len(faces) == 1:
                prefixes.append("FACE_")
            elif len(faces) > 1:
                prefixes.append("GROUP_")
        except Exception:
            pass

    if "pano" in enabled and w / max(h, 1) > 2.5:
        prefixes.append("PANO_")

    return prefixes


# ---------------------------------------------------------------------------
# Image processing
# ---------------------------------------------------------------------------

def _load_font(size: int) -> ImageFont.FreeTypeFont:
    """Return the first available TrueType font at *size*.

    Falls back to Pillow's built-in default font when none of the candidate
    font files exists or can be loaded.
    """
    candidates = [
        "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
        "/usr/share/fonts/truetype/liberation/LiberationSans-Regular.ttf",
        "/usr/share/fonts/truetype/freefont/FreeSans.ttf",
    ]
    for candidate in candidates:
        if not os.path.exists(candidate):
            continue
        try:
            return ImageFont.truetype(candidate, size)
        except Exception:
            continue  # unreadable font file: try the next candidate
    return ImageFont.load_default()


def _wm_xy(img_w: int, img_h: int, elem_w: int, elem_h: int,
           position: str, margin: int = 24) -> tuple:
    """Return the top-left (x, y) at which to place a watermark element.

    *position* is one of "br", "bl", "tr", "tl", "bc", "tc", "center";
    unknown values fall back to "br" (bottom right).
    """
    left = margin
    right = img_w - elem_w - margin
    top = margin
    bottom = img_h - elem_h - margin
    mid_x = (img_w - elem_w) // 2
    mid_y = (img_h - elem_h) // 2
    positions = {
        "br": (right, bottom),
        "bl": (left, bottom),
        "tr": (right, top),
        "tl": (left, top),
        "bc": (mid_x, bottom),
        "tc": (mid_x, top),
        "center": (mid_x, mid_y),
    }
    return positions.get(position, positions["br"])


def apply_corrections(img: Image.Image, rotation: float = 0.0,
                      brightness: float = 1.0, contrast: float = 1.0,
                      saturation: float = 1.0) -> Image.Image:
    """Apply EXIF orientation, rotation and tone adjustments to *img*.

    *rotation* is in degrees and is passed negated to ``Image.rotate`` (which
    rotates counter-clockwise), so positive values turn the image clockwise;
    the canvas is expanded to fit. *brightness*, *contrast* and *saturation*
    are ``ImageEnhance`` factors where 1.0 means "unchanged".
    """
    try:
        img = ImageOps.exif_transpose(img)
    except Exception:
        # Broken orientation data must not block the remaining corrections.
        pass

    if rotation != 0.0:
        img = img.rotate(-rotation, expand=True, resample=Image.BICUBIC)
    if brightness != 1.0:
        img = ImageEnhance.Brightness(img).enhance(brightness)
    if contrast != 1.0:
        img = ImageEnhance.Contrast(img).enhance(contrast)
    if saturation != 1.0:
        img = ImageEnhance.Color(img).enhance(saturation)
    return img


def apply_text_watermark(img: Image.Image, text: str, position: str = "br",
                         font_size: int = 24, opacity: float = 0.7) -> Image.Image:
    """Draw white *text* with a black drop shadow onto *img*.

    Returns *img* unchanged when *text* is empty or blank; otherwise returns
    a new RGB image. *opacity* is clamped to the range 0.0-1.0.
    """
    if not text or not text.strip():
        return img

    # Clamp so that out-of-range opacity cannot yield an invalid alpha value.
    alpha = max(0, min(255, int(opacity * 255)))
    font = _load_font(font_size)

    base = img.convert("RGBA")
    overlay = Image.new("RGBA", base.size, (0, 0, 0, 0))
    draw = ImageDraw.Draw(overlay)

    bbox = draw.textbbox((0, 0), text, font=font)
    tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1]
    x, y = _wm_xy(base.width, base.height, tw, th, position)

    # Shadow
    draw.text((x + 2, y + 2), text, font=font, fill=(0, 0, 0, alpha))
    draw.text((x, y), text, font=font, fill=(255, 255, 255, alpha))

    return Image.alpha_composite(base, overlay).convert("RGB")


def apply_image_watermark(img: Image.Image, wm_path: str, position: str = "br",
                          opacity: float = 0.6, scale: float = 0.2) -> Image.Image:
    """Paste the image at *wm_path* onto *img* as a translucent watermark.

    The watermark is resized to ``scale * img.width`` pixels wide (at least
    10) keeping its aspect ratio, and its alpha channel is multiplied by
    *opacity* (clamped to 0.0-1.0). Returns *img* unchanged when the
    watermark file is missing or cannot be applied; otherwise a new RGB image.
    """
    if not wm_path or not os.path.isfile(wm_path):
        return img
    try:
        # The context manager closes the watermark file; convert() returns an
        # independent in-memory copy.
        with Image.open(wm_path) as wm_file:
            wm = wm_file.convert("RGBA")

        target_w = max(10, int(img.width * scale))
        # At least one pixel high, so very wide watermarks still resize.
        target_h = max(1, int(wm.height * target_w / wm.width))
        wm = wm.resize((target_w, target_h), Image.LANCZOS)

        factor = max(0.0, min(1.0, opacity))
        r, g, b, a = wm.split()
        a = a.point(lambda v: int(v * factor))
        wm = Image.merge("RGBA", (r, g, b, a))

        base = img.convert("RGBA")
        x, y = _wm_xy(base.width, base.height, target_w, target_h, position)
        base.paste(wm, (x, y), wm)
        return base.convert("RGB")
    except Exception:
        # Best effort: a broken watermark must not lose the photo.
        return img


def process_photo(path: str, rotation: float = 0.0, brightness: float = 1.0,
                  contrast: float = 1.0, saturation: float = 1.0,
                  text_watermark: Optional[dict] = None,
                  image_watermark_path: Optional[str] = None,
                  image_watermark_settings: Optional[dict] = None,
                  exif_info: Optional[dict] = None) -> bytes:
    """Process a single photo and return JPEG bytes.

    Applies corrections, then the optional text watermark (keys ``text``,
    ``position``, ``font_size``, ``opacity``; placeholders in ``text`` are
    resolved when *exif_info* is given), then the optional image watermark
    (settings keys ``position``, ``opacity``, ``scale``). The result is
    encoded as JPEG at quality 92.

    Errors from opening or encoding the image propagate to the caller.
    """
    # Keep the source open while it is processed (Pillow loads pixel data
    # lazily) and close the file handle afterwards.
    with Image.open(path) as source:
        img = apply_corrections(source, rotation=rotation, brightness=brightness,
                                contrast=contrast, saturation=saturation)

        if text_watermark:
            text = text_watermark.get("text", "")
            if exif_info:
                text = resolve_wm_template(text, exif_info)
            img = apply_text_watermark(
                img,
                text=text,
                position=text_watermark.get("position", "br"),
                font_size=text_watermark.get("font_size", 24),
                opacity=text_watermark.get("opacity", 0.7),
            )

        if image_watermark_path and image_watermark_settings:
            img = apply_image_watermark(
                img,
                wm_path=image_watermark_path,
                position=image_watermark_settings.get("position", "br"),
                opacity=image_watermark_settings.get("opacity", 0.6),
                scale=image_watermark_settings.get("scale", 0.2),
            )

        img = img.convert("RGB")
        buf = io.BytesIO()
        img.save(buf, "JPEG", quality=92)
    return buf.getvalue()