feat: add daemon mode to import-renpho, delete Python health-poller
Add --daemon and --interval flags to import-renpho for continuous polling. Daemon handles SIGTERM/SIGINT gracefully, logs errors per account without exiting. Systemd user service at ~/.config/systemd/user/renpho-poller.service. Remove health-poller/ (Python stub that never wrote data). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
c0e7362970
commit
20795e1ea8
|
|
@ -1,6 +0,0 @@
|
|||
config.yaml
|
||||
integrations/
|
||||
__pycache__/
|
||||
*.pyc
|
||||
.venv/
|
||||
dedup.db
|
||||
|
|
@ -1,7 +0,0 @@
|
|||
import yaml
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def load_config(path: str) -> dict:
|
||||
with open(path) as f:
|
||||
return yaml.safe_load(f)
|
||||
|
|
@ -1,39 +0,0 @@
|
|||
import sqlite3
|
||||
from pathlib import Path
|
||||
from poller.sources.base import Reading
|
||||
|
||||
|
||||
class Dedup:
|
||||
"""SQLite-backed deduplication. Tracks which readings have been pushed."""
|
||||
|
||||
def __init__(self, db_path: str = "dedup.db"):
|
||||
self.conn = sqlite3.connect(db_path)
|
||||
self.conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS seen (
|
||||
source_type TEXT,
|
||||
source_user_id TEXT,
|
||||
metric TEXT,
|
||||
timestamp INTEGER,
|
||||
PRIMARY KEY (source_type, source_user_id, metric, timestamp)
|
||||
)
|
||||
""")
|
||||
|
||||
def filter_new(self, readings: list[Reading]) -> list[Reading]:
|
||||
"""Return only readings not yet seen."""
|
||||
new = []
|
||||
for r in readings:
|
||||
cur = self.conn.execute(
|
||||
"SELECT 1 FROM seen WHERE source_type=? AND source_user_id=? AND metric=? AND timestamp=?",
|
||||
(r.source_type, r.source_user_id, r.metric, r.timestamp),
|
||||
)
|
||||
if not cur.fetchone():
|
||||
new.append(r)
|
||||
return new
|
||||
|
||||
def mark_seen(self, readings: list[Reading]):
|
||||
"""Mark readings as pushed."""
|
||||
self.conn.executemany(
|
||||
"INSERT OR IGNORE INTO seen (source_type, source_user_id, metric, timestamp) VALUES (?,?,?,?)",
|
||||
[(r.source_type, r.source_user_id, r.metric, r.timestamp) for r in readings],
|
||||
)
|
||||
self.conn.commit()
|
||||
|
|
@ -1,68 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
health-poller: pull vitals from consumer health devices into Inou.
|
||||
Wraps Home Assistant integrations — never reimplements vendor APIs.
|
||||
|
||||
Usage:
|
||||
python -m poller.main --config config.yaml
|
||||
"""
|
||||
import argparse
|
||||
import asyncio
|
||||
import logging
|
||||
from poller.config import load_config
|
||||
from poller.dedup import Dedup
|
||||
from poller.sink import Sink
|
||||
from poller.sources.renpho import RenphoSource
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
|
||||
datefmt="%Y-%m-%d %H:%M:%S",
|
||||
)
|
||||
log = logging.getLogger("health-poller")
|
||||
|
||||
SOURCE_CLASSES = {
|
||||
"renpho": RenphoSource,
|
||||
}
|
||||
|
||||
|
||||
def make_source(cfg: dict):
|
||||
cls = SOURCE_CLASSES.get(cfg["type"])
|
||||
if not cls:
|
||||
raise ValueError(f"unknown source type: {cfg['type']}")
|
||||
if cfg["type"] == "renpho":
|
||||
return cls(email=cfg["email"], password=cfg["password"], user_id=cfg.get("user_id"))
|
||||
raise ValueError(f"no constructor for source type: {cfg['type']}")
|
||||
|
||||
|
||||
async def poll_source(src_cfg: dict, dedup: Dedup, sink: Sink):
|
||||
source = make_source(src_cfg)
|
||||
dossier_id = src_cfg.get("dossier_id", "")
|
||||
readings = await source.fetch()
|
||||
new = dedup.filter_new(readings)
|
||||
if new:
|
||||
sink.push(dossier_id, new)
|
||||
dedup.mark_seen(new)
|
||||
log.info(f"{src_cfg['type']}: pushed {len(new)} new readings")
|
||||
else:
|
||||
log.info(f"{src_cfg['type']}: no new readings")
|
||||
|
||||
|
||||
async def main():
|
||||
parser = argparse.ArgumentParser(description="Inou health data poller")
|
||||
parser.add_argument("--config", default="config.yaml", help="config file path")
|
||||
args = parser.parse_args()
|
||||
|
||||
cfg = load_config(args.config)
|
||||
dedup = Dedup()
|
||||
sink = Sink(cfg["inou"]["api_url"], cfg["inou"].get("api_key", ""))
|
||||
|
||||
for src_cfg in cfg["sources"]:
|
||||
try:
|
||||
await poll_source(src_cfg, dedup, sink)
|
||||
except Exception:
|
||||
log.exception(f"error polling {src_cfg['type']}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
|
|
@ -1,16 +0,0 @@
|
|||
import logging
|
||||
from poller.sources.base import Reading
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Sink:
|
||||
"""Push readings to Inou. Stub until the API endpoint exists."""
|
||||
|
||||
def __init__(self, api_url: str, api_key: str):
|
||||
self.api_url = api_url
|
||||
self.api_key = api_key
|
||||
|
||||
def push(self, dossier_id: str, readings: list[Reading]):
|
||||
for r in readings:
|
||||
log.info(f" WOULD PUSH → dossier={dossier_id} {r.metric}={r.value}{r.unit} @ {r.timestamp}")
|
||||
|
|
@ -1,22 +0,0 @@
|
|||
from abc import ABC, abstractmethod
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
@dataclass
|
||||
class Reading:
|
||||
"""A single normalized vital reading."""
|
||||
source_type: str # "renpho", "garmin", etc.
|
||||
source_user_id: str # user identifier within source
|
||||
metric: str # "weight", "body_fat", "bmi", etc.
|
||||
value: float
|
||||
unit: str # "kg", "%", "bpm", etc.
|
||||
timestamp: int # unix seconds
|
||||
|
||||
|
||||
class Source(ABC):
|
||||
"""Base class for health data source adapters."""
|
||||
|
||||
@abstractmethod
|
||||
async def fetch(self) -> list[Reading]:
|
||||
"""Authenticate if needed, fetch measurements, return normalized readings."""
|
||||
...
|
||||
|
|
@ -1,69 +0,0 @@
|
|||
import importlib.util
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from poller.sources.base import Source, Reading
|
||||
|
||||
# Import api_renpho directly — bypasses their __init__.py which pulls in HA dependencies.
|
||||
# We load const.py first (api_renpho imports from it), then api_renpho itself.
|
||||
_renpho = Path(__file__).resolve().parents[2] / "integrations" / "hass_renpho" / "custom_components" / "renpho"
|
||||
|
||||
def _load_module(name, path):
|
||||
spec = importlib.util.spec_from_file_location(name, path)
|
||||
mod = importlib.util.module_from_spec(spec)
|
||||
import sys
|
||||
sys.modules[name] = mod
|
||||
spec.loader.exec_module(mod)
|
||||
return mod
|
||||
|
||||
_load_module("renpho.const", _renpho / "const.py")
|
||||
_load_module("renpho.api_object", _renpho / "api_object.py")
|
||||
_api = _load_module("renpho.api_renpho", _renpho / "api_renpho.py")
|
||||
RenphoWeight = _api.RenphoWeight
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
# Metrics to extract from MeasurementDetail and their units.
|
||||
# key = field name on MeasurementDetail, value = (metric_name, unit)
|
||||
METRICS = {
|
||||
"weight": ("weight", "kg"),
|
||||
"bmi": ("bmi", ""),
|
||||
"bodyfat": ("body_fat", "%"),
|
||||
"water": ("body_water", "%"),
|
||||
"muscle": ("muscle_mass", "kg"),
|
||||
"bone": ("bone_mass", "kg"),
|
||||
"subfat": ("subcutaneous_fat", "%"),
|
||||
"visfat": ("visceral_fat", ""),
|
||||
"bmr": ("bmr", "kcal"),
|
||||
"protein": ("protein", "%"),
|
||||
"bodyage": ("body_age", "years"),
|
||||
"heart_rate": ("heart_rate", "bpm"),
|
||||
"fat_free_weight": ("fat_free_weight", "kg"),
|
||||
}
|
||||
|
||||
|
||||
class RenphoSource(Source):
|
||||
def __init__(self, email: str, password: str, user_id: str | None = None):
|
||||
self.client = RenphoWeight(email=email, password=password, user_id=user_id)
|
||||
|
||||
async def fetch(self) -> list[Reading]:
|
||||
await self.client.auth()
|
||||
await self.client.get_scale_users()
|
||||
await self.client.get_measurements()
|
||||
|
||||
readings = []
|
||||
for m in self.client.weight_history:
|
||||
ts = m.time_stamp
|
||||
uid = str(m.b_user_id)
|
||||
for field, (metric, unit) in METRICS.items():
|
||||
val = getattr(m, field, None)
|
||||
if val is not None and val != 0:
|
||||
readings.append(Reading(
|
||||
source_type="renpho",
|
||||
source_user_id=uid,
|
||||
metric=metric,
|
||||
value=float(val),
|
||||
unit=unit,
|
||||
timestamp=ts,
|
||||
))
|
||||
log.info(f"renpho: fetched {len(self.client.weight_history)} measurements, {len(readings)} readings")
|
||||
return readings
|
||||
|
|
@ -1,5 +0,0 @@
|
|||
aiohttp
|
||||
aiohttp_socks
|
||||
pycryptodome
|
||||
pydantic
|
||||
pyyaml
|
||||
|
|
@ -1,14 +0,0 @@
|
|||
#!/bin/bash
|
||||
# Clone or update HA integrations used as libraries
|
||||
INTDIR="$(dirname "$0")/integrations"
|
||||
|
||||
clone_or_pull() {
|
||||
local repo=$1 dir=$2
|
||||
if [ -d "$INTDIR/$dir" ]; then
|
||||
git -C "$INTDIR/$dir" pull --ff-only
|
||||
else
|
||||
git clone "$repo" "$INTDIR/$dir"
|
||||
fi
|
||||
}
|
||||
|
||||
clone_or_pull https://github.com/antoinebou12/hass_renpho hass_renpho
|
||||
|
|
@ -9,9 +9,12 @@ import (
|
|||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"inou/lib"
|
||||
|
|
@ -84,6 +87,8 @@ func main() {
|
|||
discover := flag.Bool("discover", false, "Login and show Renpho user IDs for mapping")
|
||||
fileImport := flag.String("file", "", "Import from JSON file instead of API (format: measurements array)")
|
||||
dossierID := flag.String("dossier", "", "Target dossier ID (required with -file)")
|
||||
daemon := flag.Bool("daemon", false, "Run continuously, importing on an interval")
|
||||
interval := flag.Int("interval", 3600, "Seconds between imports in daemon mode")
|
||||
flag.Parse()
|
||||
|
||||
if err := lib.Init(); err != nil {
|
||||
|
|
@ -108,13 +113,40 @@ func main() {
|
|||
return
|
||||
}
|
||||
|
||||
if *daemon {
|
||||
runDaemon(*interval)
|
||||
return
|
||||
}
|
||||
|
||||
runImport()
|
||||
}
|
||||
|
||||
func runDaemon(interval int) {
|
||||
sig := make(chan os.Signal, 1)
|
||||
signal.Notify(sig, syscall.SIGTERM, syscall.SIGINT)
|
||||
|
||||
for {
|
||||
log.Printf("running import...")
|
||||
runImport()
|
||||
|
||||
select {
|
||||
case s := <-sig:
|
||||
log.Printf("received %v, exiting", s)
|
||||
return
|
||||
case <-time.After(time.Duration(interval) * time.Second):
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func runImport() {
|
||||
renphoID, cfg, err := loadConfig()
|
||||
if err != nil {
|
||||
fatal("load config: %v", err)
|
||||
log.Printf("ERROR load config: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if len(cfg.Accounts) == 0 {
|
||||
fmt.Println("No Renpho accounts configured. Run with -setup first.")
|
||||
log.Printf("no Renpho accounts configured")
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -123,7 +155,7 @@ func main() {
|
|||
for _, acct := range cfg.Accounts {
|
||||
fmt.Printf("=== %s ===\n", acct.Email)
|
||||
if err := syncAccount(renphoID, &acct, importID); err != nil {
|
||||
fmt.Printf(" ERROR: %v\n", err)
|
||||
log.Printf(" ERROR syncing %s: %v", acct.Email, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue