#!/usr/bin/env python3
"""
Photo Deduplication Tool: Compare old Windows drive against Immich library.

Identifies photos/videos on the old drive (via SSH to Mac Mini) that are NOT
already in Immich, using SHA-1 checksum comparison.
"""

import argparse
import subprocess
import sys
from collections import defaultdict
from pathlib import Path

# Configuration
# Immich's Postgres runs inside this Docker container; queried via `docker exec`.
IMMICH_DB_CONTAINER = "immich_postgres"
IMMICH_DB_NAME = "immich"
IMMICH_DB_USER = "postgres"

# SSH host alias (from ~/.ssh/config) of the machine the old drive is mounted on,
# and the mount point of the old drive's Mylio photo library on that machine.
MAC_MINI_HOST = "macmini"
OLD_DRIVE_PATH = "/Volumes/Untitled/Users/Johan/Mylio/"

# File extensions to check (lowercase); matched case-insensitively by the
# remote `find -iname` invocation.
MEDIA_EXTENSIONS = {
    '.jpg', '.jpeg', '.png', '.heic', '.heif',
    '.mov', '.mp4', '.avi', '.gif', '.m4v', '.mkv', '.webp',
    '.tiff', '.tif', '.bmp', '.raw', '.cr2', '.nef', '.arw', '.dng',
    '.3gp', '.mts', '.webm'
}

# Directories to skip (Mylio-generated content, not actual photos)
SKIP_DIRECTORIES = {
    'Generated Images',
}
|
def get_immich_checksums() -> set[str]:
    """Return the set of hex SHA-1 checksums stored in Immich's database.

    Runs psql inside the Immich Postgres container via ``docker exec`` and
    collects one lowercase hex digest per non-deleted asset. Exits the
    process with status 1 if the query fails.
    """
    print("Fetching checksums from Immich database...")

    # Concatenated literals form the exact same single-line SQL statement.
    query = (
        "SELECT encode(checksum, 'hex') FROM asset "
        "WHERE \"deletedAt\" IS NULL AND checksum IS NOT NULL;"
    )

    # -t (tuples only) and -A (unaligned) make psql print one bare digest
    # per line with no headers or padding.
    cmd = [
        "docker", "exec", IMMICH_DB_CONTAINER,
        "psql", "-U", IMMICH_DB_USER, "-d", IMMICH_DB_NAME,
        "-t", "-A", "-c", query,
    ]

    try:
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
    except subprocess.CalledProcessError as exc:
        print(f"Error querying Immich database: {exc}", file=sys.stderr)
        print(f"stderr: {exc.stderr}", file=sys.stderr)
        sys.exit(1)

    # Normalize to lowercase and drop any blank lines.
    checksums = {
        stripped.lower()
        for raw_line in result.stdout.splitlines()
        if (stripped := raw_line.strip())
    }
    print(f" Found {len(checksums):,} checksums in Immich")
    return checksums
|
def build_remote_scan_script(include_generated: bool = False) -> str:
    """Compose the bash script that hashes media files on the Mac Mini.

    The script cd's into the old drive, walks it with ``find`` (matching
    extensions case-insensitively, optionally pruning Mylio's generated
    folders) and emits one ``<sha1> <relative-path>`` line per file.

    Args:
        include_generated: When True, do not prune the SKIP_DIRECTORIES
            folders (Mylio thumbnails/previews).

    Returns:
        Shell script text, ready to be piped into ``bash -s`` over SSH.
    """
    # One `-iname "*.ext"` test per media extension, OR-ed together.
    name_tests = " -o ".join(
        f'-iname "*.{ext.lstrip(".")}"' for ext in MEDIA_EXTENSIONS
    )

    # `\( -name X -o -name Y \) -prune -o` skips whole directory subtrees;
    # an empty clause disables pruning entirely.
    if include_generated:
        prune_clause = ""
    else:
        skip_tests = " -o ".join(f'-name "{d}"' for d in SKIP_DIRECTORIES)
        prune_clause = f'\\( {skip_tests} \\) -prune -o'

    return f'''
set -e
cd "{OLD_DRIVE_PATH}" 2>/dev/null || {{ echo "ERROR: Cannot access {OLD_DRIVE_PATH}" >&2; exit 1; }}

# Find all media files and calculate SHA-1
find . {prune_clause} -type f \\( {name_tests} \\) -print0 2>/dev/null | while IFS= read -r -d '' file; do
    # Calculate SHA-1 checksum
    checksum=$(shasum -a 1 "$file" 2>/dev/null | cut -d' ' -f1)
    if [ -n "$checksum" ]; then
        echo "$checksum $file"
    fi
done
'''
|
def scan_old_drive(checksums: set[str], include_generated: bool = False) -> tuple[list[str], int, int, int]:
    """
    SSH to Mac Mini and scan the old drive, comparing against Immich checksums.

    The remote side (see build_remote_scan_script) emits one
    "<sha1> <relative-path>" line per media file; each digest is tested for
    membership in *checksums*. Exits the process with status 1 on any
    failure (SSH error, remote error, or timeout).

    Args:
        checksums: Lowercase hex SHA-1 digests already stored in Immich.
        include_generated: Also scan Mylio's 'Generated Images' folders.

    Returns: (missing_files, checked_count, found_count, skipped_count)
        missing_files holds absolute paths on the Mac Mini. skipped_count
        is never incremented today; it stays in the tuple so the caller
        interface remains stable.
    """
    print(f"\nScanning old drive via SSH to {MAC_MINI_HOST}...")
    print(f" Path: {OLD_DRIVE_PATH}")
    if not include_generated:
        print(f" Skipping directories: {', '.join(SKIP_DIRECTORIES)}")

    script = build_remote_scan_script(include_generated)

    cmd = ["ssh", MAC_MINI_HOST, "bash -s"]

    missing_files = []
    checked_count = 0
    found_count = 0
    skipped_count = 0  # reserved; nothing increments this yet

    try:
        process = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True
        )

        # Send script to remote bash. NOTE: communicate() buffers the whole
        # remote output, so the "progress" prints below only appear after
        # the remote scan has fully finished.
        stdout, stderr = process.communicate(input=script, timeout=7200)  # 2 hour timeout

        if process.returncode != 0:
            print(f"Error scanning remote drive: {stderr}", file=sys.stderr)
            sys.exit(1)

        # Process results: one "<checksum> <path>" line per file.
        for line in stdout.strip().split('\n'):
            if not line.strip():
                continue

            # Split only on the first space: paths may contain spaces.
            parts = line.split(' ', 1)
            if len(parts) != 2:
                continue

            checksum, filepath = parts
            checksum = checksum.lower().strip()
            filepath = filepath.strip()

            checked_count += 1

            if checksum in checksums:
                found_count += 1
            else:
                # Reconstruct the full path. Use removeprefix, NOT
                # lstrip('./'): lstrip strips a *character set*, which would
                # also eat the leading dots of hidden entries, e.g.
                # "./.hidden/x" -> "hidden/x" (wrong path).
                full_path = str(Path(OLD_DRIVE_PATH) / filepath.removeprefix('./'))
                missing_files.append(full_path)

            # Progress update every 1000 files
            if checked_count % 1000 == 0:
                print(f" Processed {checked_count:,} files... ({found_count:,} in Immich, {len(missing_files):,} missing)")

        return missing_files, checked_count, found_count, skipped_count

    except subprocess.TimeoutExpired:
        print("Error: Remote scan timed out after 2 hours", file=sys.stderr)
        process.kill()
        process.communicate()  # drain pipes and reap the killed process
        sys.exit(1)
    except Exception as e:
        # Top-level boundary for a CLI tool: report and abort.
        print(f"Error during remote scan: {e}", file=sys.stderr)
        sys.exit(1)
|
|
|
def analyze_missing_files(missing_files: list[str]) -> dict[str, list[str]]:
    """Group missing files by their parent directory for better analysis.

    Args:
        missing_files: Absolute paths under OLD_DRIVE_PATH.

    Returns:
        Mapping of the first one or two path components relative to the
        Mylio root (e.g. "Mylio/2020") to the full paths in that folder.
    """
    by_folder = defaultdict(list)
    for filepath in missing_files:
        # Strip the drive prefix only at the start of the string.
        # str.replace() would also have removed the prefix if it happened
        # to occur again later in the path, corrupting the relative path.
        rel_path = filepath.removeprefix(OLD_DRIVE_PATH)
        parts = rel_path.split('/')
        if len(parts) >= 2:
            folder = '/'.join(parts[:2])  # e.g., "Mylio/2020" or "Apple Photos"
        else:
            # str.split never returns []; map an empty component to "root".
            folder = parts[0] or "root"
        by_folder[folder].append(filepath)
    return dict(by_folder)
|
def write_report(missing_files: list[str], output_file: Path):
    """Write the sorted list of missing file paths to *output_file*, one per line."""
    report = "".join(f"{path}\n" for path in sorted(missing_files))
    output_file.write_text(report)
    print(f" Full list written to: {output_file}")
|
def main():
    """CLI entry point: diff the old drive's media against Immich and report."""
    parser = argparse.ArgumentParser(
        description="Compare photos/videos on old drive against Immich library"
    )
    parser.add_argument(
        "--output", "-o",
        type=Path,
        default=Path("missing_files.txt"),
        help="Output file for missing files list (default: missing_files.txt)",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Only fetch Immich checksums, don't scan remote drive",
    )
    parser.add_argument(
        "--include-generated",
        action="store_true",
        help="Include 'Generated Images' folder (Mylio thumbnails/previews)",
    )
    opts = parser.parse_args()

    # Step 1: checksums currently known to Immich.
    known_checksums = get_immich_checksums()

    if opts.dry_run:
        print("\n[DRY RUN] Skipping remote scan")
        return

    # Step 2: hash everything on the old drive and diff against Immich.
    missing, total, present, _ = scan_old_drive(
        known_checksums, include_generated=opts.include_generated
    )

    # Step 3: summary banner.
    rule = "=" * 60
    print(f"\n{rule}")
    print("RESULTS")
    print(rule)
    print(f"Total files checked: {total:,}")
    print(f"Already in Immich: {present:,}")
    print(f"NOT in Immich: {len(missing):,}")
    print(rule)

    if not missing:
        print("\nAll files from the old drive are already in Immich!")
        return

    # Per-folder breakdown makes it easy to spot whole missing directories.
    by_folder = analyze_missing_files(missing)
    print("\nBreakdown by folder:")
    for folder in sorted(by_folder):
        print(f" {folder}: {len(by_folder[folder]):,} files")

    write_report(missing, opts.output)
    print(f"\nReview {opts.output} to see files that need to be imported.")
|
# Script entry point: run the full compare-and-report pipeline.
if __name__ == "__main__":
    main()