# immich-compare/immich-compare.py
# (web-export metadata: 242 lines, 7.7 KiB, Python, executable file)

#!/usr/bin/env python3
"""
Photo Deduplication Tool: Compare old Windows drive against Immich library.
Identifies photos/videos on the old drive (via SSH to Mac Mini) that are NOT
already in Immich, using SHA-1 checksum comparison.
"""
import argparse
import subprocess
import sys
from collections import defaultdict
from pathlib import Path
# Configuration
# Docker container name of Immich's PostgreSQL instance (queried via `docker exec`).
IMMICH_DB_CONTAINER: str = "immich_postgres"
# Database name and role used for the psql checksum export.
IMMICH_DB_NAME: str = "immich"
IMMICH_DB_USER: str = "postgres"
# SSH host alias for the Mac Mini that has the old drive mounted.
MAC_MINI_HOST: str = "macmini"
# Mount point of the old Windows drive on the Mac Mini (trailing slash expected
# by the prefix-stripping logic in analyze_missing_files).
OLD_DRIVE_PATH: str = "/Volumes/Untitled/Users/Johan/Mylio/"
# File extensions to check (lowercase); matched case-insensitively via find -iname.
MEDIA_EXTENSIONS: set[str] = {
    '.jpg', '.jpeg', '.png', '.heic', '.heif',
    '.mov', '.mp4', '.avi', '.gif', '.m4v', '.mkv', '.webp',
    '.tiff', '.tif', '.bmp', '.raw', '.cr2', '.nef', '.arw', '.dng',
    '.3gp', '.mts', '.webm'
}
# Directories to skip (Mylio-generated content, not actual photos)
SKIP_DIRECTORIES: set[str] = {
    'Generated Images',
}
def get_immich_checksums() -> set[str]:
    """Export all SHA-1 checksums from Immich's PostgreSQL database.

    Runs psql inside the Postgres container via ``docker exec`` and parses
    its tuple-only (-t), unaligned (-A) output: one hex digest per line.

    Returns:
        Set of lowercase hex SHA-1 digests for all non-deleted assets.

    Exits the process with status 1 if the database query fails.
    """
    print("Fetching checksums from Immich database...")
    query = (
        "SELECT encode(checksum, 'hex') FROM asset "
        'WHERE "deletedAt" IS NULL AND checksum IS NOT NULL;'
    )
    cmd = [
        "docker", "exec", IMMICH_DB_CONTAINER,
        "psql", "-U", IMMICH_DB_USER, "-d", IMMICH_DB_NAME,
        "-t", "-A", "-c", query,
    ]
    try:
        proc = subprocess.run(cmd, capture_output=True, text=True, check=True)
    except subprocess.CalledProcessError as err:
        print(f"Error querying Immich database: {err}", file=sys.stderr)
        print(f"stderr: {err.stderr}", file=sys.stderr)
        sys.exit(1)
    # Keep only non-blank lines, normalized to lowercase hex.
    digests = {
        cleaned.lower()
        for raw in proc.stdout.strip().split('\n')
        if (cleaned := raw.strip())
    }
    print(f" Found {len(digests):,} checksums in Immich")
    return digests
def build_remote_scan_script(include_generated: bool = False) -> str:
    """Build the shell script to run on Mac Mini for scanning files.

    Args:
        include_generated: When True, do not prune the directories listed in
            SKIP_DIRECTORIES (i.e. also scan Mylio's generated images).

    Returns:
        A bash script that finds all media files under OLD_DRIVE_PATH and
        prints one "<sha1> <relative path>" line per file.
    """
    # Sort for deterministic output: MEDIA_EXTENSIONS is a set and str hashing
    # is randomized per process, so unsorted iteration would make the generated
    # script (and any logging/diffing of it) differ between runs.
    extensions_pattern = " -o ".join(
        f'-iname "*.{ext.lstrip(".")}"' for ext in sorted(MEDIA_EXTENSIONS)
    )
    # Build prune patterns for directories to skip
    if include_generated:
        prune_pattern = ""
    else:
        prune_dirs = " -o ".join(f'-name "{d}"' for d in sorted(SKIP_DIRECTORIES))
        prune_pattern = f'\\( {prune_dirs} \\) -prune -o'
    script = f'''
set -e
cd "{OLD_DRIVE_PATH}" 2>/dev/null || {{ echo "ERROR: Cannot access {OLD_DRIVE_PATH}" >&2; exit 1; }}
# Find all media files and calculate SHA-1
find . {prune_pattern} -type f \\( {extensions_pattern} \\) -print0 2>/dev/null | while IFS= read -r -d '' file; do
    # Calculate SHA-1 checksum
    checksum=$(shasum -a 1 "$file" 2>/dev/null | cut -d' ' -f1)
    if [ -n "$checksum" ]; then
        echo "$checksum $file"
    fi
done
'''
    return script
def scan_old_drive(checksums: set[str], include_generated: bool = False) -> tuple[list[str], int, int, int]:
    """
    SSH to Mac Mini and scan the old drive, comparing against Immich checksums.

    Args:
        checksums: Lowercase hex SHA-1 digests already present in Immich.
        include_generated: Forwarded to build_remote_scan_script(); when True
            the 'Generated Images' directories are scanned too.

    Returns: (missing_files, checked_count, found_count, skipped_count)
        NOTE(review): skipped_count is never incremented anywhere and is
        always 0; it is kept to preserve the function's return interface.

    Exits the process with status 1 on SSH failure, remote error, or timeout.
    """
    print(f"\nScanning old drive via SSH to {MAC_MINI_HOST}...")
    print(f" Path: {OLD_DRIVE_PATH}")
    if not include_generated:
        print(f" Skipping directories: {', '.join(SKIP_DIRECTORIES)}")
    script = build_remote_scan_script(include_generated)
    cmd = ["ssh", MAC_MINI_HOST, "bash -s"]
    missing_files = []
    checked_count = 0
    found_count = 0
    skipped_count = 0
    try:
        process = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True
        )
        # Send script to remote bash. All output is collected before parsing
        # starts, so the "progress" prints below reflect local parsing only.
        stdout, stderr = process.communicate(input=script, timeout=7200)  # 2 hour timeout
        if process.returncode != 0:
            print(f"Error scanning remote drive: {stderr}", file=sys.stderr)
            sys.exit(1)
        # Process results: each line is "<sha1> <path relative to drive root>"
        for line in stdout.strip().split('\n'):
            if not line.strip():
                continue
            parts = line.split(' ', 1)
            if len(parts) != 2:
                continue
            checksum, filepath = parts
            checksum = checksum.lower().strip()
            filepath = filepath.strip()
            checked_count += 1
            if checksum in checksums:
                found_count += 1
            else:
                # Reconstruct full path. Use removeprefix, not lstrip('./'):
                # lstrip strips ALL leading '.' and '/' characters, which would
                # mangle dot-files, e.g. "./.hidden.jpg" -> "hidden.jpg".
                full_path = str(Path(OLD_DRIVE_PATH) / filepath.removeprefix('./'))
                missing_files.append(full_path)
            # Progress update every 1000 files
            if checked_count % 1000 == 0:
                print(f" Processed {checked_count:,} files... ({found_count:,} in Immich, {len(missing_files):,} missing)")
        return missing_files, checked_count, found_count, skipped_count
    except subprocess.TimeoutExpired:
        print("Error: Remote scan timed out after 2 hours", file=sys.stderr)
        process.kill()
        sys.exit(1)
    except Exception as e:
        print(f"Error during remote scan: {e}", file=sys.stderr)
        sys.exit(1)
def analyze_missing_files(missing_files: list[str]) -> dict[str, list[str]]:
    """Group missing files by their parent directory for better analysis.

    Args:
        missing_files: Absolute paths under OLD_DRIVE_PATH.

    Returns:
        Mapping of the first one or two path components (relative to the
        drive root) to the list of missing files in that subtree.
    """
    by_folder = defaultdict(list)
    for filepath in missing_files:
        # Get the path relative to the Mylio root. Use removeprefix rather
        # than str.replace: replace() would also delete any later occurrence
        # of OLD_DRIVE_PATH inside the path, not just the leading prefix.
        rel_path = filepath.removeprefix(OLD_DRIVE_PATH)
        parts = rel_path.split('/')
        if len(parts) >= 2:
            folder = '/'.join(parts[:2])  # e.g., "Mylio/2020" or "Apple Photos"
        else:
            folder = parts[0] if parts else "root"
        by_folder[folder].append(filepath)
    return dict(by_folder)
def write_report(missing_files: list[str], output_file: Path):
    """Write the sorted list of missing file paths to a text file, one per line.

    Args:
        missing_files: Absolute paths of files not found in Immich.
        output_file: Destination text file (overwritten if it exists).
    """
    # Explicit UTF-8: file paths may contain non-ASCII characters, and the
    # platform/locale default encoding could otherwise fail to encode them.
    with open(output_file, 'w', encoding='utf-8') as f:
        f.writelines(f"{filepath}\n" for filepath in sorted(missing_files))
    print(f" Full list written to: {output_file}")
def main():
    """Entry point: compare the old drive's media files against Immich."""
    parser = argparse.ArgumentParser(
        description="Compare photos/videos on old drive against Immich library"
    )
    parser.add_argument(
        "--output", "-o",
        type=Path,
        default=Path("missing_files.txt"),
        help="Output file for missing files list (default: missing_files.txt)",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Only fetch Immich checksums, don't scan remote drive",
    )
    parser.add_argument(
        "--include-generated",
        action="store_true",
        help="Include 'Generated Images' folder (Mylio thumbnails/previews)",
    )
    args = parser.parse_args()

    # Step 1: checksums currently known to Immich
    known_checksums = get_immich_checksums()
    if args.dry_run:
        print("\n[DRY RUN] Skipping remote scan")
        return

    # Step 2: scan the old drive and compare against Immich
    missing, total_checked, already_present, _ = scan_old_drive(
        known_checksums,
        include_generated=args.include_generated
    )

    # Step 3: summary report
    separator = "=" * 60
    print("\n" + separator)
    print("RESULTS")
    print(separator)
    print(f"Total files checked: {total_checked:,}")
    print(f"Already in Immich: {already_present:,}")
    print(f"NOT in Immich: {len(missing):,}")
    print(separator)

    if not missing:
        print("\nAll files from the old drive are already in Immich!")
        return

    # Per-folder breakdown plus the full list on disk
    breakdown = analyze_missing_files(missing)
    print("\nBreakdown by folder:")
    for folder, files in sorted(breakdown.items()):
        print(f" {folder}: {len(files):,} files")
    write_report(missing, args.output)
    print(f"\nReview {args.output} to see files that need to be imported.")
# Allow importing this module without side effects; run only as a script.
if __name__ == "__main__":
    main()