#!/usr/bin/env python3
"""
Photo Deduplication Tool: Compare old Windows drive against Immich library.

Identifies photos/videos on the old drive (via SSH to Mac Mini) that are NOT
already in Immich, using SHA-1 checksum comparison.
"""

import argparse
import subprocess
import sys
from collections import defaultdict
from pathlib import Path

# Configuration
IMMICH_DB_CONTAINER = "immich_postgres"
IMMICH_DB_NAME = "immich"
IMMICH_DB_USER = "postgres"
MAC_MINI_HOST = "macmini"
OLD_DRIVE_PATH = "/Volumes/Untitled/Users/Johan/Mylio/"

# File extensions to check (lowercase)
MEDIA_EXTENSIONS = {
    '.jpg', '.jpeg', '.png', '.heic', '.heif', '.mov', '.mp4', '.avi',
    '.gif', '.m4v', '.mkv', '.webp', '.tiff', '.tif', '.bmp', '.raw',
    '.cr2', '.nef', '.arw', '.dng', '.3gp', '.mts', '.webm'
}

# Directories to skip (Mylio-generated content, not actual photos)
SKIP_DIRECTORIES = {
    'Generated Images',
}


def get_immich_checksums() -> set[str]:
    """Export all SHA-1 checksums from Immich's PostgreSQL database.

    Runs ``psql`` inside the ``IMMICH_DB_CONTAINER`` container via
    ``docker exec`` and collects one lowercase hex checksum per output line.

    Returns:
        The set of lowercase hex checksums of all non-deleted assets.

    Exits the process with status 1 if the command cannot be started or
    returns a non-zero exit code.
    """
    print("Fetching checksums from Immich database...")

    query = "SELECT encode(checksum, 'hex') FROM asset WHERE \"deletedAt\" IS NULL AND checksum IS NOT NULL;"
    cmd = [
        "docker", "exec", IMMICH_DB_CONTAINER,
        "psql", "-U", IMMICH_DB_USER, "-d", IMMICH_DB_NAME,
        "-t", "-A", "-c", query
    ]

    try:
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
    except subprocess.CalledProcessError as e:
        print(f"Error querying Immich database: {e}", file=sys.stderr)
        print(f"stderr: {e.stderr}", file=sys.stderr)
        sys.exit(1)
    except OSError as e:
        # e.g. the docker executable is not installed / not on PATH
        print(f"Error querying Immich database: {e}", file=sys.stderr)
        sys.exit(1)

    checksums = {
        line.strip().lower()
        for line in result.stdout.split('\n')
        if line.strip()
    }
    print(f"  Found {len(checksums):,} checksums in Immich")
    return checksums


def build_remote_scan_script(include_generated: bool = False) -> str:
    """Build the shell script to run on Mac Mini for scanning files.

    The script changes into ``OLD_DRIVE_PATH``, finds every file whose name
    matches one of ``MEDIA_EXTENSIONS`` (case-insensitively) and prints one
    ``<sha1> <relative path>`` line per file.

    Args:
        include_generated: When False, directories named in
            ``SKIP_DIRECTORIES`` are pruned from the search.

    Returns:
        The script text, suitable for piping into ``bash -s``.
    """
    # Sort so the generated script is identical from run to run
    # (set iteration order is not stable across interpreter runs).
    extensions_pattern = " -o ".join(
        f'-iname "*.{ext.lstrip(".")}"' for ext in sorted(MEDIA_EXTENSIONS)
    )

    # Build prune patterns for directories to skip
    if include_generated:
        prune_pattern = ""
    else:
        prune_dirs = " -o ".join(f'-name "{d}"' for d in sorted(SKIP_DIRECTORIES))
        prune_pattern = f'\\( {prune_dirs} \\) -prune -o'

    script = f'''
set -e
cd "{OLD_DRIVE_PATH}" 2>/dev/null || {{ echo "ERROR: Cannot access {OLD_DRIVE_PATH}" >&2; exit 1; }}

# Find all media files and calculate SHA-1
find . {prune_pattern} -type f \\( {extensions_pattern} \\) -print0 2>/dev/null | while IFS= read -r -d '' file; do
    # Calculate SHA-1 checksum
    checksum=$(shasum -a 1 "$file" 2>/dev/null | cut -d' ' -f1)
    if [ -n "$checksum" ]; then
        echo "$checksum $file"
    fi
done
'''
    return script


def _parse_scan_output(stdout: str, checksums: set[str]) -> tuple[list[str], int, int, int]:
    """Split remote scan output into missing files and counters.

    Each useful line has the form ``<sha1> <relative path>``. Lines without
    a path are counted as skipped.
    """
    missing_files: list[str] = []
    checked_count = 0
    found_count = 0
    skipped_count = 0

    for line in stdout.split('\n'):
        if not line.strip():
            continue

        # The checksum is hex and contains no spaces, so the first space
        # always separates it from the (possibly space-containing) path.
        checksum, _, filepath = line.strip().partition(' ')
        checksum = checksum.lower()
        filepath = filepath.strip()
        if not filepath:
            skipped_count += 1
            continue

        checked_count += 1

        if checksum in checksums:
            found_count += 1
        else:
            # Reconstruct full path. removeprefix drops only the literal
            # "./" that find emits; lstrip('./') would also eat the leading
            # dots of hidden files and directories.
            full_path = str(Path(OLD_DRIVE_PATH) / filepath.removeprefix('./'))
            missing_files.append(full_path)

        # Progress update every 1000 files
        if checked_count % 1000 == 0:
            print(f"  Processed {checked_count:,} files... ({found_count:,} in Immich, {len(missing_files):,} missing)")

    return missing_files, checked_count, found_count, skipped_count


def scan_old_drive(checksums: set[str], include_generated: bool = False) -> tuple[list[str], int, int, int]:
    """
    SSH to Mac Mini and scan the old drive, comparing against Immich checksums.

    Args:
        checksums: Lowercase hex SHA-1 checksums already present in Immich.
        include_generated: Passed through to ``build_remote_scan_script``.

    Returns: (missing_files, checked_count, found_count, skipped_count)
        ``skipped_count`` is the number of output lines that could not be
        parsed into a checksum and a path.

    Exits the process with status 1 if ssh cannot be started, the remote
    script fails, or the scan exceeds the two hour timeout.
    """
    print(f"\nScanning old drive via SSH to {MAC_MINI_HOST}...")
    print(f"  Path: {OLD_DRIVE_PATH}")
    if not include_generated:
        print(f"  Skipping directories: {', '.join(SKIP_DIRECTORIES)}")

    script = build_remote_scan_script(include_generated)
    cmd = ["ssh", MAC_MINI_HOST, "bash -s"]

    try:
        process = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True
        )
    except OSError as e:
        print(f"Error during remote scan: {e}", file=sys.stderr)
        sys.exit(1)

    try:
        # Send script to remote bash
        stdout, stderr = process.communicate(input=script, timeout=7200)  # 2 hour timeout
    except subprocess.TimeoutExpired:
        print("Error: Remote scan timed out after 2 hours", file=sys.stderr)
        process.kill()
        # Reap the killed child and drain its pipes so no zombie is left.
        process.communicate()
        sys.exit(1)
    except (OSError, UnicodeDecodeError) as e:
        print(f"Error during remote scan: {e}", file=sys.stderr)
        process.kill()
        process.wait()
        sys.exit(1)

    if process.returncode != 0:
        print(f"Error scanning remote drive: {stderr}", file=sys.stderr)
        sys.exit(1)

    # Process results
    return _parse_scan_output(stdout, checksums)


def analyze_missing_files(missing_files: list[str]) -> dict[str, list[str]]:
    """Group missing files by their parent directory for better analysis.

    The key is built from at most the first two directory components of the
    path relative to ``OLD_DRIVE_PATH``; the file name itself is never part
    of the key. Files located directly in the root are grouped under
    ``"root"``.

    Args:
        missing_files: Full paths as produced by ``scan_old_drive``.

    Returns:
        Mapping of folder key to the list of file paths in that folder, in
        input order.
    """
    by_folder = defaultdict(list)

    for filepath in missing_files:
        # Path relative to the Mylio root (only the leading prefix is removed)
        rel_path = filepath.removeprefix(OLD_DRIVE_PATH)
        parts = [part for part in rel_path.split('/') if part]
        directories = parts[:-1]  # drop the file name

        # e.g., "Mylio/2020" or "Apple Photos"
        folder = '/'.join(directories[:2]) if directories else "root"
        by_folder[folder].append(filepath)

    return dict(by_folder)


def write_report(missing_files: list[str], output_file: Path) -> None:
    """Write the list of missing files to a text file.

    Paths are written sorted, one per line, encoded as UTF-8 so the result
    does not depend on the locale's default encoding.
    """
    with open(output_file, 'w', encoding='utf-8') as f:
        f.writelines(f"{filepath}\n" for filepath in sorted(missing_files))

    print(f"  Full list written to: {output_file}")


def main() -> None:
    """Parse command line arguments, run the comparison and print a report."""
    parser = argparse.ArgumentParser(
        description="Compare photos/videos on old drive against Immich library"
    )
    parser.add_argument(
        "--output", "-o",
        type=Path,
        default=Path("missing_files.txt"),
        help="Output file for missing files list (default: missing_files.txt)"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Only fetch Immich checksums, don't scan remote drive"
    )
    parser.add_argument(
        "--include-generated",
        action="store_true",
        help="Include 'Generated Images' folder (Mylio thumbnails/previews)"
    )
    args = parser.parse_args()

    # Step 1: Get Immich checksums
    immich_checksums = get_immich_checksums()

    if args.dry_run:
        print("\n[DRY RUN] Skipping remote scan")
        return

    # Step 2: Scan old drive and compare
    missing_files, checked_count, found_count, _ = scan_old_drive(
        immich_checksums,
        include_generated=args.include_generated
    )

    # Step 3: Report results
    print("\n" + "=" * 60)
    print("RESULTS")
    print("=" * 60)
    print(f"Total files checked: {checked_count:,}")
    print(f"Already in Immich: {found_count:,}")
    print(f"NOT in Immich: {len(missing_files):,}")
    print("=" * 60)

    if missing_files:
        # Analyze by folder
        by_folder = analyze_missing_files(missing_files)

        print("\nBreakdown by folder:")
        for folder in sorted(by_folder):
            print(f"  {folder}: {len(by_folder[folder]):,} files")

        write_report(missing_files, args.output)
        print(f"\nReview {args.output} to see files that need to be imported.")
    else:
        print("\nAll files from the old drive are already in Immich!")


if __name__ == "__main__":
    main()