immich-compare/check-corruption.py

191 lines
6.1 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Check for problem files in Immich: look up every asset the API reports on disk and flag files that are missing, suspiciously small, or have no thumbnail.
"""
import json
import os
import subprocess
import sys
from pathlib import Path
# Configuration
# Each connection setting can be overridden with an environment variable of
# the same name; the literals below are the fallbacks used when it is unset.
IMMICH_API_URL = os.environ.get("IMMICH_API_URL", "http://localhost:2283/api")
# SECURITY: an API key is committed here as the fallback value. Prefer
# exporting IMMICH_API_KEY, then rotate this key and drop it from the source.
IMMICH_API_KEY = os.environ.get(
    "IMMICH_API_KEY", "GsWQUTR6EXlkKp1M82jDJ3KmzhM0fMAbbIbfHDyI"
)
# Host directory substituted for the container's /data/ prefix when asset
# paths are looked up on disk.
IMMICH_LIBRARY_PATH = os.environ.get("IMMICH_LIBRARY_PATH", "/tank/immich/library")

# Minimum sizes for real media files (bytes)
# NOTE(review): not referenced by any function visible in this file; the
# analysis uses SUSPICIOUS_SIZE for every asset type.
MIN_SIZES = {
    'IMAGE': 10_000,   # 10 KB - real photos are larger
    'VIDEO': 100_000,  # 100 KB - real videos are larger
}

# Suspiciously small threshold
SUSPICIOUS_SIZE = 50_000  # 50 KB
def get_all_assets():
    """Fetch all assets from Immich API.

    Pages through ``{IMMICH_API_URL}/search/metadata`` by shelling out to
    ``curl``, requesting ``page_size`` assets per page, and stops at the first
    empty or short page.

    Returns:
        A list with every item found under ``assets.items`` across all pages.

    Raises:
        RuntimeError: If curl exits non-zero (network failure or an HTTP
            error status), or if the response is not a JSON object with an
            ``assets`` object in it.
    """
    print("Fetching assets from Immich API...")
    all_assets = []
    page = 1
    page_size = 1000
    while True:
        # NOTE: the API key is passed as a curl argument, so it is visible in
        # the process list while each request runs.
        cmd = [
            "curl", "-s",
            # -S keeps curl's error text despite -s; --fail turns HTTP >= 400
            # into a non-zero exit instead of a JSON error body that would
            # otherwise look like "no assets".
            "-S", "--fail",
            "-H", f"x-api-key: {IMMICH_API_KEY}",
            "-H", "Content-Type: application/json",
            f"{IMMICH_API_URL}/search/metadata",
            "-d", json.dumps({"size": page_size, "page": page}),
        ]
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode != 0:
            raise RuntimeError(
                f"curl exited with status {result.returncode} while fetching "
                f"page {page}: {result.stderr.strip()}"
            )
        try:
            data = json.loads(result.stdout)
        except json.JSONDecodeError as err:
            raise RuntimeError(
                f"Immich API returned invalid JSON for page {page}"
            ) from err

        assets_section = data.get('assets') if isinstance(data, dict) else None
        if not isinstance(assets_section, dict):
            # A successful response without an 'assets' object is not a search
            # result; stopping quietly would report a bogus total.
            raise RuntimeError(
                f"Unexpected response shape from Immich API for page {page}"
            )

        items = assets_section.get('items') or []
        if not items:
            break
        all_assets.extend(items)
        print(f" Fetched {len(all_assets)} assets...")
        if len(items) < page_size:
            break
        page += 1
    print(f" Total: {len(all_assets)} assets")
    return all_assets
def check_file_on_disk(original_path: str, library_path: "str | None" = None) -> tuple[bool, int]:
    """
    Check if a file exists on disk and get its size.

    The container path is converted to a host path by swapping a leading
    ``/data/`` for the library root, e.g.
    ``/data/library/admin/a.jpg`` -> ``<library root>/library/admin/a.jpg``.
    Only the leading prefix is rewritten; a ``data`` directory deeper in the
    path is left alone. Paths without the prefix are checked unchanged.

    Args:
        original_path: Asset path as stored in the asset record.
        library_path: Host directory that stands in for ``/data``. Defaults
            to ``IMMICH_LIBRARY_PATH`` when omitted.

    Returns (exists, size_bytes). ``(False, 0)`` is returned when the path is
    missing or is not a regular file (for example a directory).
    """
    # Local import so this function brings its own dependency with it.
    import stat as stat_module

    root = IMMICH_LIBRARY_PATH if library_path is None else library_path
    container_prefix = "/data/"
    if original_path.startswith(container_prefix):
        disk_path = root + "/" + original_path[len(container_prefix):]
    else:
        disk_path = original_path

    # A single stat() call answers both "exists?" and "how big?", so the file
    # cannot vanish between two separate checks.
    try:
        file_info = Path(disk_path).stat()
    except (FileNotFoundError, NotADirectoryError, ValueError):
        return False, 0
    if not stat_module.S_ISREG(file_info.st_mode):
        # Directories (including "." for an empty path) are not media files.
        return False, 0
    return True, file_info.st_size
def format_size(size_bytes: int) -> str:
    """Render a byte count as a short human-readable string.

    Values below 1024 are shown as whole bytes; larger values are scaled to
    KB, MB or GB (binary multiples) with one decimal place. GB is the largest
    unit used.
    """
    if size_bytes < 1024:
        return f"{size_bytes} B"

    # (divisor, label) pairs; a unit applies while the value is below 1024 of it.
    scaled_units = ((1024, "KB"), (1024 * 1024, "MB"))
    for divisor, label in scaled_units:
        if size_bytes < divisor * 1024:
            return f"{size_bytes / divisor:.1f} {label}"

    gigabyte = 1024 * 1024 * 1024
    return f"{size_bytes / gigabyte:.1f} GB"
def analyze_assets(assets: list) -> dict:
    """Analyze all assets and categorize issues.

    Every asset lands in exactly one bucket, tested in this order:
    ``missing`` (no file on disk, or no path recorded), ``corrupted``
    (smaller than ``SUSPICIOUS_SIZE``), ``no_thumbnail`` (empty or absent
    ``thumbhash``), otherwise ``ok``.

    Args:
        assets: Asset dicts; the keys read are ``id``, ``originalFileName``,
            ``originalPath``, ``type`` and ``thumbhash``.

    Returns:
        A dict mapping each bucket name to a list of info dicts with the keys
        ``id``, ``filename``, ``path``, ``type``, ``disk_size``,
        ``disk_size_human`` and ``has_thumbnail``.
    """
    results = {
        'missing': [],       # File doesn't exist on disk
        'corrupted': [],     # File exists but suspiciously small
        'no_thumbnail': [],  # File exists but no thumbnail generated
        'ok': [],            # File looks fine
    }
    total = len(assets)
    print("\nAnalyzing files on disk...")
    for i, asset in enumerate(assets):
        if (i + 1) % 500 == 0:
            print(f" Checked {i + 1}/{total}...")
        asset_id = asset.get('id')
        filename = asset.get('originalFileName', 'unknown')
        # `or ''` also covers a key that is present but null.
        original_path = asset.get('originalPath') or ''
        asset_type = asset.get('type', 'IMAGE')
        thumbhash = asset.get('thumbhash')

        if original_path:
            exists, disk_size = check_file_on_disk(original_path)
        else:
            # With no recorded path there is nothing to look up on disk.
            exists, disk_size = False, 0

        info = {
            'id': asset_id,
            'filename': filename,
            'path': original_path,
            'type': asset_type,
            'disk_size': disk_size,
            'disk_size_human': format_size(disk_size),
            # Truthiness, so this flag always agrees with the no_thumbnail test below.
            'has_thumbnail': bool(thumbhash),
        }
        if not exists:
            results['missing'].append(info)
        elif disk_size < SUSPICIOUS_SIZE:
            results['corrupted'].append(info)
        elif not thumbhash:
            results['no_thumbnail'].append(info)
        else:
            results['ok'].append(info)
    return results
def _write_tsv(report_path, rows):
    """Write each row (a tuple of fields) to report_path as one tab-separated line."""
    # UTF-8 is set explicitly so non-ASCII filenames do not depend on the locale.
    with open(report_path, 'w', encoding='utf-8') as f:
        f.writelines("\t".join(str(field) for field in row) + "\n" for row in rows)


def _print_preview(lines, limit=20):
    """Print the first `limit` lines, then how many more were left out."""
    for line in lines[:limit]:
        print(line)
    if len(lines) > limit:
        print(f" ... and {len(lines) - limit} more")


def main():
    """Fetch all assets, check them on disk, print a summary and write reports.

    Writes ``corrupted_files.txt``, ``missing_from_disk.txt`` and
    ``no_thumbnail.txt`` to the current working directory; each file is only
    written when its category has at least one entry. Corrupted files are
    listed in full on the console, the other two categories show the first 20.
    """
    # Get all assets
    assets = get_all_assets()
    # Analyze
    results = analyze_assets(assets)
    corrupted = results['corrupted']
    missing = results['missing']
    no_thumbnail = results['no_thumbnail']

    # Report
    print("\n" + "=" * 70)
    print("RESULTS")
    print("=" * 70)
    print(f"Total assets: {len(assets):,}")
    print(f"OK: {len(results['ok']):,}")
    print(f"Missing from disk: {len(missing):,}")
    print(f"Corrupted (tiny): {len(corrupted):,}")
    print(f"No thumbnail: {len(no_thumbnail):,}")
    print("=" * 70)

    # Write detailed reports
    if corrupted:
        print(f"\n--- CORRUPTED FILES (< {format_size(SUSPICIOUS_SIZE)}) ---")
        # Smallest files first, both on screen and in the report.
        by_size = sorted(corrupted, key=lambda x: x['disk_size'])
        for item in by_size:
            print(f"{item['disk_size_human']:>10} {item['filename']} ({item['path']})")
        _write_tsv(
            'corrupted_files.txt',
            [(item['filename'], item['disk_size'], item['path']) for item in by_size],
        )
        print("\nList saved to: corrupted_files.txt")

    if missing:
        print("\n--- MISSING FILES ---")
        _print_preview([f" {item['filename']} ({item['path']})" for item in missing])
        _write_tsv(
            'missing_from_disk.txt',
            [(item['filename'], item['path']) for item in missing],
        )
        print("\nList saved to: missing_from_disk.txt")

    if no_thumbnail:
        print("\n--- NO THUMBNAIL (first 20) ---")
        _print_preview(
            [f" {item['disk_size_human']:>10} {item['filename']}" for item in no_thumbnail]
        )
        _write_tsv(
            'no_thumbnail.txt',
            [(item['filename'], item['disk_size'], item['path']) for item in no_thumbnail],
        )
        print("\nList saved to: no_thumbnail.txt")
# Run the check only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()