inou/health-poller/poller/sources/renpho.py

70 lines
2.6 KiB
Python

import importlib.util
import logging
from pathlib import Path
from poller.sources.base import Source, Reading
# Import api_renpho directly — bypasses their __init__.py which pulls in HA dependencies.
# We load const.py first (api_renpho imports from it), then api_renpho itself.
_renpho = Path(__file__).resolve().parents[2] / "integrations" / "hass_renpho" / "custom_components" / "renpho"
def _load_module(name, path):
spec = importlib.util.spec_from_file_location(name, path)
mod = importlib.util.module_from_spec(spec)
import sys
sys.modules[name] = mod
spec.loader.exec_module(mod)
return mod
_load_module("renpho.const", _renpho / "const.py")
_load_module("renpho.api_object", _renpho / "api_object.py")
_api = _load_module("renpho.api_renpho", _renpho / "api_renpho.py")
RenphoWeight = _api.RenphoWeight
log = logging.getLogger(__name__)
# Metrics to extract from MeasurementDetail and their units.
# key = field name on MeasurementDetail, value = (metric_name, unit)
METRICS = {
"weight": ("weight", "kg"),
"bmi": ("bmi", ""),
"bodyfat": ("body_fat", "%"),
"water": ("body_water", "%"),
"muscle": ("muscle_mass", "kg"),
"bone": ("bone_mass", "kg"),
"subfat": ("subcutaneous_fat", "%"),
"visfat": ("visceral_fat", ""),
"bmr": ("bmr", "kcal"),
"protein": ("protein", "%"),
"bodyage": ("body_age", "years"),
"heart_rate": ("heart_rate", "bpm"),
"fat_free_weight": ("fat_free_weight", "kg"),
}
class RenphoSource(Source):
def __init__(self, email: str, password: str, user_id: str | None = None):
self.client = RenphoWeight(email=email, password=password, user_id=user_id)
async def fetch(self) -> list[Reading]:
await self.client.auth()
await self.client.get_scale_users()
await self.client.get_measurements()
readings = []
for m in self.client.weight_history:
ts = m.time_stamp
uid = str(m.b_user_id)
for field, (metric, unit) in METRICS.items():
val = getattr(m, field, None)
if val is not None and val != 0:
readings.append(Reading(
source_type="renpho",
source_user_id=uid,
metric=metric,
value=float(val),
unit=unit,
timestamp=ts,
))
log.info(f"renpho: fetched {len(self.client.weight_history)} measurements, {len(readings)} readings")
return readings