40 lines
1.4 KiB
Python
40 lines
1.4 KiB
Python
import sqlite3
|
|
from pathlib import Path
|
|
from poller.sources.base import Reading
|
|
|
|
|
|
class Dedup:
    """SQLite-backed deduplication. Tracks which readings have been pushed.

    A reading is identified by the tuple
    ``(source_type, source_user_id, metric, timestamp)``, which is the
    primary key of the ``seen`` table.

    Intended flow: call ``filter_new`` to select readings not pushed yet,
    push them, then call ``mark_seen`` with the ones that were pushed.
    The instance owns an open SQLite connection (``self.conn``); call
    ``close()`` or use the instance as a context manager to release it.
    """

    _SELECT_SQL = (
        "SELECT 1 FROM seen WHERE source_type=? AND source_user_id=? AND metric=? AND timestamp=?"
    )
    _INSERT_SQL = (
        "INSERT OR IGNORE INTO seen (source_type, source_user_id, metric, timestamp) VALUES (?,?,?,?)"
    )

    def __init__(self, db_path: "str | Path" = "dedup.db"):
        """Open (or create) the dedup database and ensure the ``seen`` table exists.

        Args:
            db_path: Location of the SQLite database; anything
                ``sqlite3.connect`` accepts (a ``str`` or ``Path``, or
                ``":memory:"`` for a throwaway in-memory store).
        """
        self.conn = sqlite3.connect(db_path)
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS seen (
                source_type TEXT,
                source_user_id TEXT,
                metric TEXT,
                timestamp INTEGER,
                PRIMARY KEY (source_type, source_user_id, metric, timestamp)
            )
        """)
        # Make sure the schema is durable even if the connection is later
        # configured to keep transactions open across statements.
        self.conn.commit()

    @staticmethod
    def _key(r: "Reading") -> tuple:
        """Return the primary-key tuple that identifies reading *r*."""
        return (r.source_type, r.source_user_id, r.metric, r.timestamp)

    def filter_new(self, readings: "list[Reading]") -> "list[Reading]":
        """Return only readings not yet seen.

        Input order is preserved. Readings that share a key with an earlier
        reading in the same *readings* list are dropped, so one batch with
        duplicates never yields the same reading twice.

        Args:
            readings: Candidate readings to check.

        Returns:
            The subset of *readings* whose key is neither recorded in the
            database nor repeated earlier in this batch.
        """
        new = []
        batch_keys = set()
        for r in readings:
            key = self._key(r)
            # Without this, two identical readings in one batch would both
            # pass the DB lookup (neither is marked yet) and be pushed twice.
            if key in batch_keys:
                continue
            batch_keys.add(key)
            if self.conn.execute(self._SELECT_SQL, key).fetchone() is None:
                new.append(r)
        return new

    def mark_seen(self, readings: "list[Reading]") -> None:
        """Mark readings as pushed.

        Already-recorded keys are ignored (``INSERT OR IGNORE``). The insert
        runs in a single transaction that is committed on success and rolled
        back if any row fails, so a failed call leaves no partial state
        behind for a later commit to persist.

        Args:
            readings: Readings that were successfully pushed.
        """
        with self.conn:
            self.conn.executemany(
                self._INSERT_SQL,
                [self._key(r) for r in readings],
            )

    def close(self) -> None:
        """Close the underlying SQLite connection."""
        self.conn.close()

    def __enter__(self) -> "Dedup":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()
|