clawd/tmp/contacts/dedup.py

249 lines
8.7 KiB
Python

#!/usr/bin/env python3
"""
vCard deduplicator — conservative, correct.
Rules (in order):
1. Same exact email address → same person
2. Same phone (appears ≤2 times total — personal phones only) → same person
3. Same normalized full name (≥2 significant words) → same person
Post-merge guard:
If a merged record would have >4 different email domains → bad chain merge.
Revert: keep each original record separately.
"""
import glob, re
from collections import defaultdict
# Directory that main() scans for input *.vcf files.
INPUT_DIR = "/home/johan/clawd/tmp/contacts"
# File the de-duplicated cards are written to. It lives inside INPUT_DIR;
# main() skips files whose name contains 'merged', so it is not re-read.
OUTPUT = "/home/johan/clawd/tmp/contacts/merged.vcf"
# ── parser ────────────────────────────────────────────────────────────────────
def parse_vcf(path):
    """Read a .vcf file and return its cards as a list of dicts.

    Each dict maps the upper-cased text before the first ':' on a line
    (property name including any group prefix and parameters) to the list
    of stripped values seen for that key.  BEGIN, END and VERSION lines are
    dropped, as are lines without a ':'.
    """
    with open(path, encoding='utf-8', errors='replace') as handle:
        text = handle.read()

    # Undo line folding: a newline followed by one space/tab continues the
    # previous line.
    unfolded = re.sub(r'\r?\n[ \t]', '', text)

    parsed = []
    for chunk in re.split(r'(?=BEGIN:VCARD)', unfolded, flags=re.I):
        # Anything before the first BEGIN:VCARD is not a card.
        if not chunk.strip().upper().startswith('BEGIN:VCARD'):
            continue
        props = {}
        for raw_line in chunk.splitlines():
            name, colon, value = raw_line.partition(':')
            if not colon:
                continue
            name = name.strip().upper()
            if name in ('BEGIN', 'END', 'VERSION'):
                continue
            props.setdefault(name, []).append(value.strip())
        parsed.append(props)
    return parsed
# ── field helpers ─────────────────────────────────────────────────────────────
def get_field(card, prefix):
    """Return the first value of the first key matching *prefix*, else ''.

    A key matches when it is *prefix* (case-insensitive), optionally preceded
    by an ``ITEMn.`` group and followed by end-of-key, ';' or ':'.  *prefix*
    is interpolated into the regular expression as-is.
    """
    key_pattern = r'(ITEM\d+\.)?' + prefix + r'($|[;:])'
    matching_values = (
        values for key, values in card.items()
        if re.match(key_pattern, key, re.I)
    )
    # Only the first matching key is consulted, even if its list is empty.
    first_hit = next(matching_values, None)
    if not first_hit:
        return ''
    return first_hit[0]
def get_emails(card):
    """Return the set of lower-cased, stripped email values in *card*.

    Keys are considered email keys when they start with ``EMAIL`` (any case),
    optionally preceded by an ``ITEMn.`` group.  Values without an '@' are
    ignored.
    """
    return {
        value.strip().lower()
        for key, values in card.items()
        if re.match(r'(ITEM\d+\.)?EMAIL', key, re.I)
        for value in values
        if value and '@' in value
    }
def get_phones(card):
    """Return ``{normalized: original}`` for every phone value in *card*.

    The normalized form is the last 9 digits of the value (or all of its
    digits when there are fewer than 9).  Values with no digits are skipped.
    When several values normalize to the same key, the last one seen wins.
    """
    phones = {}
    for key, values in card.items():
        if not re.match(r'(ITEM\d+\.)?TEL', key, re.I):
            continue
        for raw in values:
            if not raw:
                continue
            digits = re.sub(r'\D', '', raw)
            # Slicing from -9 yields the whole string when it is shorter.
            tail = digits[-9:]
            if tail:
                phones[tail] = raw.strip()
    return phones
def get_name(card):
    """Return the card's stripped FN value, falling back to ORG when FN is empty."""
    full_name = get_field(card, 'FN').strip()
    return full_name or get_field(card, 'ORG').strip()
def normalize_name(name):
    """Return *name* lower-cased with its words sorted and single-spaced.

    Sorting makes the key independent of word order, so "Doe John" and
    "John Doe" normalize to the same string.
    """
    collapsed = re.sub(r'\s+', ' ', name.strip().lower())
    tokens = collapsed.split()
    tokens.sort()
    return ' '.join(tokens)
def completeness(card):
    """Return how many non-empty values *card* holds across all of its keys."""
    filled = 0
    for values in card.values():
        for value in values:
            if value:
                filled += 1
    return filled
def email_domains(emails):
    """Return the set of domains (text after the first '@') found in *emails*.

    Entries without an '@' are ignored.
    """
    domains = set()
    for address in emails:
        if '@' not in address:
            continue
        domains.add(address.split('@')[1])
    return domains
# ── merge ─────────────────────────────────────────────────────────────────────
# Category labels that merge_cards() drops from the merged CATEGORIES value:
# "imported on ...", "restored from google..." and "mycontacts".
# The match is case-insensitive and must cover the whole label.
NOISE_CATS = re.compile(
r'^(imported on .+|restored from google.*|mycontacts)$', re.I)
def merge_cards(cards):
    """Combine several cards for one person into a single card dict.

    The card with the most non-empty values supplies every field except
    EMAIL, TEL and CATEGORIES.  Those three are rebuilt from all cards:
    emails are unioned and sorted, phones are unioned by normalized number,
    and categories are unioned in first-seen order with NOISE_CATS removed.
    """
    richest = max(cards, key=completeness)

    emails = set()
    phones = {}
    raw_cats = []
    for card in cards:
        emails.update(get_emails(card))
        phones.update(get_phones(card))
        for key, values in card.items():
            if key.upper() != 'CATEGORIES':
                continue
            for value in values:
                raw_cats += [part.strip() for part in value.split(',')]

    # Drop noise labels and duplicates while keeping first-seen order.
    kept_cats = []
    seen_cats = set()
    for cat in raw_cats:
        if NOISE_CATS.match(cat) or cat in seen_cats:
            continue
        seen_cats.add(cat)
        kept_cats.append(cat)

    # Fields that are rebuilt below must not be copied from the base card.
    rebuilt = re.compile(r'(?:ITEM\d+\.)?(?:EMAIL|TEL|CATEGORIES)', re.I)
    merged = {
        key: list(values)
        for key, values in richest.items()
        if not rebuilt.match(key)
    }

    for idx, address in enumerate(sorted(emails), start=1):
        merged[f'ITEM{idx}.EMAIL;TYPE=INTERNET'] = [address]
    # Placeholder keys; serialize() rewrites them to a TEL property.
    for idx, original in enumerate(phones.values()):
        merged[f'TEL_PHONE_{idx}'] = [original]
    if kept_cats:
        merged['CATEGORIES'] = [','.join(kept_cats)]
    return merged
def serialize(card):
    """Render a card dict as vCard 3.0 text (lines joined with '\\n').

    Properties are emitted in a fixed priority order (FN, N, ORG, ... X-*),
    with unknown properties last; ties keep the dict's own order.  Empty
    values are skipped.  Synthetic ``TEL_PHONE_<n>`` keys are written as
    ``TEL;TYPE=CELL``.

    Ordering is decided by the property *name* only: an ``ITEMn.`` group
    prefix and any ``;PARAM=...`` suffix are ignored, and the name must match
    a priority entry exactly.  A plain ``startswith`` test on the raw key
    would file NOTE/NICKNAME under ``N`` and would never recognise grouped
    keys such as ``ITEM1.EMAIL;TYPE=INTERNET``.
    """
    lines = ['BEGIN:VCARD', 'VERSION:3.0']
    priority = ['FN', 'N', 'ORG', 'TITLE', 'EMAIL', 'TEL', 'ADR', 'URL',
                'NOTE', 'BDAY', 'PHOTO', 'CATEGORIES', 'X-']
    rank = {name: position for position, name in enumerate(priority)}
    group_prefix = re.compile(r'^ITEM\d+\.')
    synthetic_tel = re.compile(r'^TEL_PHONE_\d+$')

    def key_order(k):
        # Reduce the key to its bare property name before ranking.
        name = group_prefix.sub('', k.upper()).split(';', 1)[0]
        if synthetic_tel.match(name):
            name = 'TEL'
        elif name.startswith('X-'):
            name = 'X-'  # all extension properties share one slot
        return rank.get(name, 99)

    for k in sorted(card.keys(), key=key_order):
        display = re.sub(r'^TEL_PHONE_\d+$', 'TEL;TYPE=CELL', k)
        for v in card[k]:
            if v:
                lines.append(f'{display}:{v}')
    lines.append('END:VCARD')
    return '\n'.join(lines)
# ── union-find ────────────────────────────────────────────────────────────────
def make_uf(n):
    """Create a union-find structure over the integers ``0 .. n-1``.

    Returns a ``(find, union)`` pair of closures sharing one parent table.
    ``find(x)`` returns the representative of x's set and shortens the path
    as it walks; ``union(a, b)`` attaches b's representative under a's.
    """
    parent = [i for i in range(n)]

    def find(x):
        node = x
        while True:
            above = parent[node]
            if above == node:
                return node
            # Point the node at its grandparent, then step there.
            parent[node] = parent[above]
            node = parent[node]

    def union(a, b):
        root_a = find(a)
        root_b = find(b)
        if root_a == root_b:
            return
        parent[root_b] = root_a

    return find, union
# ── main ──────────────────────────────────────────────────────────────────────
def main():
    """Deduplicate every vCard under INPUT_DIR and write the result to OUTPUT.

    Steps:
      1. Parse all ``*.vcf`` files in INPUT_DIR, skipping files whose *name*
         contains 'merged', 'final', 'dedup' or 'postprocess' (earlier
         outputs of this tooling).
      2. Link cards with union-find using three rules: shared email address;
         shared normalized phone, unless that phone appears on 3 or more
         cards; same normalized name with at least two words longer than
         two characters.
      3. Merge each linked group.  If the merged card spans more than
         MAX_DOMAINS email domains the group is treated as a bad chain merge
         and its cards are emitted unmerged.
      4. Print a summary and write all resulting cards to OUTPUT.
    """
    generated = ('merged', 'final', 'dedup', 'postprocess')
    # Test the file name only: a directory component that happens to contain
    # one of these words must not exclude every input file.
    files = [
        f for f in sorted(glob.glob(f'{INPUT_DIR}/*.vcf'))
        if not any(tag in f.rsplit('/', 1)[-1] for tag in generated)
    ]

    all_cards = []
    for f in files:
        cards = parse_vcf(f)
        print(f" {f.split('/')[-1]}: {len(cards)}")
        all_cards.extend(cards)
    n = len(all_cards)
    print(f"Total: {n}")

    find, union = make_uf(n)

    def link(seen, key, idx):
        # First card with this key becomes its owner; later ones join it.
        owner = seen.setdefault(key, idx)
        if owner != idx:
            union(idx, owner)

    # ── Rule 1: same email ────────────────────────────────────────────────────
    email_map = {}
    for i, c in enumerate(all_cards):
        for e in get_emails(c):
            link(email_map, e, i)

    # ── Rule 2: same phone (personal only — skip phones in 3+ contacts) ───────
    phone_freq = defaultdict(int)
    for c in all_cards:
        for norm in get_phones(c):
            phone_freq[norm] += 1
    phone_map = {}
    for i, c in enumerate(all_cards):
        for norm in get_phones(c):
            if phone_freq[norm] >= 3:
                continue  # shared/switchboard — skip
            link(phone_map, norm, i)

    # ── Rule 3: same normalized name, word order ignored (≥2 significant words)
    name_map = {}
    for i, c in enumerate(all_cards):
        name = get_name(c)
        sig = [w for w in name.split() if len(w) > 2]
        if len(sig) < 2:
            continue  # single word / too short — skip
        link(name_map, normalize_name(name), i)

    # ── Group ─────────────────────────────────────────────────────────────────
    groups = defaultdict(list)
    for i, c in enumerate(all_cards):
        groups[find(i)].append(c)

    # ── Merge + post-merge guard ──────────────────────────────────────────────
    MAX_DOMAINS = 4  # >4 email domains = chain merge gone wrong → revert
    result = []
    reverted = 0     # number of groups that were reverted
    dup_removed = 0  # number of cards absorbed into another card
    for grp in groups.values():
        if len(grp) == 1:
            result.append(grp[0])
            continue
        merged = merge_cards(grp)
        domains = email_domains(get_emails(merged))
        if len(domains) > MAX_DOMAINS:
            # Bad chain merge — keep each record individually.
            reverted += 1
            result.extend(grp)
        else:
            dup_removed += len(grp) - 1
            result.append(merged)

    print(f"Duplicates removed: {dup_removed}")
    print(f"Reverted (chain merges): {reverted} groups → individual records")
    print(f"Output: {len(result)}")
    with open(OUTPUT, 'w', encoding='utf-8') as f:
        f.write('\n\n'.join(serialize(c) for c in result) + '\n')
    print(f"Written: {OUTPUT}")


if __name__ == '__main__':
    main()