#!/usr/bin/env python3
"""
Check for corrupted files in Immich.

Fetches every asset record from the Immich API, maps each record's container
path to the host filesystem, and reports assets whose original file is
missing, suspiciously small, or has no thumbnail hash.
"""

import json
import os
import subprocess
import sys
from pathlib import Path

# Configuration
IMMICH_API_URL = "http://localhost:2283/api"
# NOTE(review): a credential is committed in source. The environment variable
# IMMICH_API_KEY now takes precedence; the literal is kept only as a
# backward-compatible fallback and should be rotated and removed.
IMMICH_API_KEY = os.environ.get(
    "IMMICH_API_KEY", "GsWQUTR6EXlkKp1M82jDJ3KmzhM0fMAbbIbfHDyI"
)
IMMICH_LIBRARY_PATH = "/tank/immich/library"

# Path prefix used inside the Immich container for asset files.
CONTAINER_DATA_PREFIX = "/data/"

# Minimum sizes for real media files (bytes)
# NOTE: not consulted by the classification below, which applies
# SUSPICIOUS_SIZE to every asset type.
MIN_SIZES = {
    'IMAGE': 10_000,   # 10 KB - real photos are larger
    'VIDEO': 100_000,  # 100 KB - real videos are larger
}

# Suspiciously small threshold
SUSPICIOUS_SIZE = 50_000  # 50 KB

# How many entries of a long list are echoed to the console.
PREVIEW_LIMIT = 20


def _fetch_page(page: int, page_size: int) -> list:
    """Request one page of assets via curl and return its list of items.

    Raises:
        RuntimeError: if curl cannot be started, exits non-zero (including
            HTTP errors, because of ``--fail``), or returns a body that is
            not the expected JSON object.
    """
    cmd = [
        # --fail turns HTTP >= 400 into a non-zero exit instead of an error
        # body that would otherwise be parsed as "no assets".
        "curl", "-s", "-S", "--fail",
        "-H", f"x-api-key: {IMMICH_API_KEY}",
        "-H", "Content-Type: application/json",
        f"{IMMICH_API_URL}/search/metadata",
        "-d", json.dumps({"size": page_size, "page": page}),
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except OSError as err:
        raise RuntimeError(f"could not run curl: {err}") from err

    if result.returncode != 0:
        detail = result.stderr.strip() or "no error output"
        raise RuntimeError(
            f"curl exited with status {result.returncode} "
            f"while fetching page {page}: {detail}"
        )

    try:
        data = json.loads(result.stdout)
    except json.JSONDecodeError as err:
        raise RuntimeError(
            f"Immich API returned invalid JSON for page {page}: {err}"
        ) from err

    if not isinstance(data, dict):
        raise RuntimeError(
            f"Immich API returned unexpected JSON for page {page}"
        )

    # Tolerate "assets": null / "items": null as well as missing keys.
    assets_block = data.get('assets') or {}
    return assets_block.get('items') or []


def get_all_assets():
    """Fetch all assets from Immich API.

    Pages through ``/search/metadata`` until an empty or short page is
    returned.

    Returns:
        A list with every asset item returned by the API.

    Raises:
        RuntimeError: if any page request fails (see ``_fetch_page``).
    """
    print("Fetching assets from Immich API...")

    all_assets = []
    page = 1
    page_size = 1000

    while True:
        items = _fetch_page(page, page_size)
        if not items:
            break

        all_assets.extend(items)
        print(f" Fetched {len(all_assets)} assets...")

        # A short page means there is nothing left to fetch.
        if len(items) < page_size:
            break
        page += 1

    print(f" Total: {len(all_assets)} assets")
    return all_assets


def container_to_host_path(original_path: str) -> str:
    """Translate a container path to the corresponding host path.

    ``/data/<rest>`` becomes ``IMMICH_LIBRARY_PATH/<rest>``. Only a leading
    ``/data/`` is rewritten; a ``/data/`` segment later in the path is left
    untouched, and paths without the prefix are returned unchanged.
    """
    # NOTE(review): this treats IMMICH_LIBRARY_PATH as the host directory
    # mounted at /data in the container - confirm against the deployment.
    if original_path.startswith(CONTAINER_DATA_PREFIX):
        remainder = original_path[len(CONTAINER_DATA_PREFIX):]
        return IMMICH_LIBRARY_PATH + "/" + remainder
    return original_path


def check_file_on_disk(original_path: str) -> tuple[bool, int]:
    """
    Check if file exists on disk and get its size.

    An empty path, a directory, or a path that cannot be inspected counts as
    not existing.

    Returns (exists, size_bytes)
    """
    if not original_path:
        # Path("") is the current directory, which would falsely "exist".
        return False, 0

    path = Path(container_to_host_path(original_path))
    try:
        if not path.is_file():
            return False, 0
        return True, path.stat().st_size
    except OSError:
        # File vanished or became unreadable between the two calls.
        return False, 0


def format_size(size_bytes: int) -> str:
    """Format bytes as human-readable size (B, KB, MB or GB)."""
    if size_bytes < 1024:
        return f"{size_bytes} B"
    elif size_bytes < 1024 * 1024:
        return f"{size_bytes / 1024:.1f} KB"
    elif size_bytes < 1024 * 1024 * 1024:
        return f"{size_bytes / (1024 * 1024):.1f} MB"
    else:
        return f"{size_bytes / (1024 * 1024 * 1024):.1f} GB"


def analyze_assets(assets: list) -> dict:
    """Analyze all assets and categorize issues.

    Each asset lands in exactly one bucket, checked in this order: missing,
    corrupted (smaller than SUSPICIOUS_SIZE), no_thumbnail, ok.

    Returns:
        A dict mapping bucket name to a list of per-asset info dicts.
    """
    results = {
        'missing': [],       # File doesn't exist on disk
        'corrupted': [],     # File exists but suspiciously small
        'no_thumbnail': [],  # File exists but no thumbnail generated
        'ok': [],            # File looks fine
    }

    print("\nAnalyzing files on disk...")
    total = len(assets)

    for count, asset in enumerate(assets, start=1):
        if count % 500 == 0:
            print(f" Checked {count}/{total}...")

        # "or ''" also covers an explicit null path in the API response.
        original_path = asset.get('originalPath') or ''
        # Use truthiness for both the flag and the classification so an
        # empty thumbhash is handled consistently.
        has_thumbnail = bool(asset.get('thumbhash'))

        exists, disk_size = check_file_on_disk(original_path)

        info = {
            'id': asset.get('id'),
            'filename': asset.get('originalFileName', 'unknown'),
            'path': original_path,
            'type': asset.get('type', 'IMAGE'),
            'disk_size': disk_size,
            'disk_size_human': format_size(disk_size),
            'has_thumbnail': has_thumbnail,
        }

        if not exists:
            bucket = 'missing'
        elif disk_size < SUSPICIOUS_SIZE:
            bucket = 'corrupted'
        elif not has_thumbnail:
            bucket = 'no_thumbnail'
        else:
            bucket = 'ok'
        results[bucket].append(info)

    return results


def _write_rows(report_path: str, rows) -> None:
    """Write pre-formatted text rows to report_path as UTF-8."""
    # Explicit encoding: file names may contain non-ASCII characters and the
    # locale default is not guaranteed to handle them.
    with open(report_path, 'w', encoding='utf-8') as f:
        f.writelines(rows)


def _print_overflow(items: list) -> None:
    """Print the '... and N more' line when items exceeds PREVIEW_LIMIT."""
    hidden = len(items) - PREVIEW_LIMIT
    if hidden > 0:
        print(f" ... and {hidden} more")


def main():
    """Fetch all assets, classify them, print a summary and write reports.

    Report files are written to the current working directory. Exits with a
    non-zero status if the assets cannot be fetched.
    """
    # Get all assets
    try:
        assets = get_all_assets()
    except RuntimeError as err:
        sys.exit(f"Error: {err}")

    # Analyze
    results = analyze_assets(assets)
    corrupted = results['corrupted']
    missing = results['missing']
    no_thumbnail = results['no_thumbnail']

    # Report
    print("\n" + "=" * 70)
    print("RESULTS")
    print("=" * 70)
    print(f"Total assets: {len(assets):,}")
    print(f"OK: {len(results['ok']):,}")
    print(f"Missing from disk: {len(missing):,}")
    print(f"Corrupted (tiny): {len(corrupted):,}")
    print(f"No thumbnail: {len(no_thumbnail):,}")
    print("=" * 70)

    # Write detailed reports
    if corrupted:
        print(f"\n--- CORRUPTED FILES (< {format_size(SUSPICIOUS_SIZE)}) ---")
        smallest_first = sorted(corrupted, key=lambda x: x['disk_size'])
        for item in smallest_first:
            print(
                f"{item['disk_size_human']:>10} "
                f"{item['filename']} ({item['path']})"
            )
        _write_rows(
            'corrupted_files.txt',
            (
                f"{item['filename']}\t{item['disk_size']}\t{item['path']}\n"
                for item in smallest_first
            ),
        )
        print("\nList saved to: corrupted_files.txt")

    if missing:
        print("\n--- MISSING FILES ---")
        # Only a preview goes to the console; the file gets every entry.
        for item in missing[:PREVIEW_LIMIT]:
            print(f" {item['filename']} ({item['path']})")
        _print_overflow(missing)
        _write_rows(
            'missing_from_disk.txt',
            (f"{item['filename']}\t{item['path']}\n" for item in missing),
        )
        print("\nList saved to: missing_from_disk.txt")

    if no_thumbnail:
        print("\n--- NO THUMBNAIL (first 20) ---")
        for item in no_thumbnail[:PREVIEW_LIMIT]:
            print(f" {item['disk_size_human']:>10} {item['filename']}")
        _print_overflow(no_thumbnail)
        _write_rows(
            'no_thumbnail.txt',
            (
                f"{item['filename']}\t{item['disk_size']}\t{item['path']}\n"
                for item in no_thumbnail
            ),
        )
        print("\nList saved to: no_thumbnail.txt")


if __name__ == "__main__":
    main()