clawd/skills/uptime-kuma/scripts/kuma.py

277 lines
8.8 KiB
Python

#!/usr/bin/env python3
"""
Uptime Kuma CLI wrapper using uptime-kuma-api library.
Requires: pip install uptime-kuma-api
Environment variables:
UPTIME_KUMA_URL - Uptime Kuma server URL (e.g., http://localhost:3001)
UPTIME_KUMA_USERNAME - Username for authentication
UPTIME_KUMA_PASSWORD - Password for authentication
"""
import argparse
import json
import os
import sys
from typing import Optional
try:
from uptime_kuma_api import UptimeKumaApi, MonitorType
except ImportError:
print("Error: uptime-kuma-api not installed. Run: pip install uptime-kuma-api", file=sys.stderr)
sys.exit(1)
def get_env_or_exit(name: str) -> str:
"""Get environment variable or exit with error."""
value = os.environ.get(name)
if not value:
print(f"Error: {name} environment variable not set", file=sys.stderr)
sys.exit(1)
return value
def get_api() -> UptimeKumaApi:
"""Create and authenticate API connection."""
url = get_env_or_exit("UPTIME_KUMA_URL")
username = get_env_or_exit("UPTIME_KUMA_USERNAME")
password = get_env_or_exit("UPTIME_KUMA_PASSWORD")
api = UptimeKumaApi(url)
api.login(username, password)
return api
def cmd_list_monitors(args):
"""List all monitors."""
with get_api() as api:
monitors = api.get_monitors()
if args.json:
print(json.dumps(monitors, indent=2, default=str))
else:
for m in monitors:
status = "🟢" if m.get("active") else ""
print(f"{status} [{m['id']}] {m['name']} ({m['type']})")
def cmd_get_monitor(args):
"""Get details of a specific monitor."""
with get_api() as api:
monitor = api.get_monitor(args.id)
print(json.dumps(monitor, indent=2, default=str))
def cmd_add_monitor(args):
"""Add a new monitor."""
monitor_types = {
"http": MonitorType.HTTP,
"https": MonitorType.HTTP,
"port": MonitorType.PORT,
"ping": MonitorType.PING,
"keyword": MonitorType.KEYWORD,
"dns": MonitorType.DNS,
"docker": MonitorType.DOCKER,
"push": MonitorType.PUSH,
"steam": MonitorType.STEAM,
"gamedig": MonitorType.GAMEDIG,
"mqtt": MonitorType.MQTT,
"sqlserver": MonitorType.SQLSERVER,
"postgres": MonitorType.POSTGRES,
"mysql": MonitorType.MYSQL,
"mongodb": MonitorType.MONGODB,
"radius": MonitorType.RADIUS,
"redis": MonitorType.REDIS,
"group": MonitorType.GROUP,
}
monitor_type = monitor_types.get(args.type.lower())
if not monitor_type:
print(f"Error: Unknown monitor type '{args.type}'. Valid types: {', '.join(monitor_types.keys())}", file=sys.stderr)
sys.exit(1)
kwargs = {
"type": monitor_type,
"name": args.name,
}
if args.url:
kwargs["url"] = args.url
if args.hostname:
kwargs["hostname"] = args.hostname
if args.port:
kwargs["port"] = args.port
if args.interval:
kwargs["interval"] = args.interval
if args.keyword:
kwargs["keyword"] = args.keyword
with get_api() as api:
result = api.add_monitor(**kwargs)
print(json.dumps(result, indent=2, default=str))
def cmd_delete_monitor(args):
"""Delete a monitor."""
with get_api() as api:
result = api.delete_monitor(args.id)
print(json.dumps(result, indent=2, default=str))
def cmd_pause_monitor(args):
"""Pause a monitor."""
with get_api() as api:
result = api.pause_monitor(args.id)
print(json.dumps(result, indent=2, default=str))
def cmd_resume_monitor(args):
"""Resume a monitor."""
with get_api() as api:
result = api.resume_monitor(args.id)
print(json.dumps(result, indent=2, default=str))
def cmd_status(args):
"""Get overall status summary."""
with get_api() as api:
monitors = api.get_monitors()
total = len(monitors)
active = sum(1 for m in monitors if m.get("active"))
paused = total - active
# Get heartbeats for status
up = 0
down = 0
pending = 0
for m in monitors:
if not m.get("active"):
continue
beats = api.get_monitor_beats(m["id"], 1)
if beats:
status = beats[0].get("status")
if status == 1:
up += 1
elif status == 0:
down += 1
else:
pending += 1
else:
pending += 1
if args.json:
print(json.dumps({
"total": total,
"active": active,
"paused": paused,
"up": up,
"down": down,
"pending": pending
}, indent=2))
else:
print(f"📊 Uptime Kuma Status")
print(f" Total monitors: {total}")
print(f" Active: {active} | Paused: {paused}")
print(f" 🟢 Up: {up} | 🔴 Down: {down} | ⏳ Pending: {pending}")
def cmd_heartbeats(args):
"""Get recent heartbeats for a monitor."""
with get_api() as api:
beats = api.get_monitor_beats(args.id, args.hours)
if args.json:
print(json.dumps(beats, indent=2, default=str))
else:
for b in beats[-10:]: # Show last 10
status = "🟢" if b.get("status") == 1 else "🔴"
time = b.get("time", "?")
ping = b.get("ping", "?")
print(f"{status} {time} - {ping}ms")
def cmd_notifications(args):
"""List notification channels."""
with get_api() as api:
notifications = api.get_notifications()
if args.json:
print(json.dumps(notifications, indent=2, default=str))
else:
for n in notifications:
active = "" if n.get("active") else ""
print(f"[{active}] [{n['id']}] {n['name']} ({n['type']})")
def main():
parser = argparse.ArgumentParser(description="Uptime Kuma CLI")
subparsers = parser.add_subparsers(dest="command", help="Commands")
# list
p_list = subparsers.add_parser("list", help="List all monitors")
p_list.add_argument("--json", action="store_true", help="Output as JSON")
p_list.set_defaults(func=cmd_list_monitors)
# get
p_get = subparsers.add_parser("get", help="Get monitor details")
p_get.add_argument("id", type=int, help="Monitor ID")
p_get.set_defaults(func=cmd_get_monitor)
# add
p_add = subparsers.add_parser("add", help="Add a new monitor")
p_add.add_argument("--name", required=True, help="Monitor name")
p_add.add_argument("--type", required=True, help="Monitor type (http, ping, port, etc.)")
p_add.add_argument("--url", help="URL to monitor (for HTTP)")
p_add.add_argument("--hostname", help="Hostname (for ping/port)")
p_add.add_argument("--port", type=int, help="Port number")
p_add.add_argument("--interval", type=int, default=60, help="Check interval in seconds")
p_add.add_argument("--keyword", help="Keyword to search (for keyword type)")
p_add.set_defaults(func=cmd_add_monitor)
# delete
p_del = subparsers.add_parser("delete", help="Delete a monitor")
p_del.add_argument("id", type=int, help="Monitor ID")
p_del.set_defaults(func=cmd_delete_monitor)
# pause
p_pause = subparsers.add_parser("pause", help="Pause a monitor")
p_pause.add_argument("id", type=int, help="Monitor ID")
p_pause.set_defaults(func=cmd_pause_monitor)
# resume
p_resume = subparsers.add_parser("resume", help="Resume a monitor")
p_resume.add_argument("id", type=int, help="Monitor ID")
p_resume.set_defaults(func=cmd_resume_monitor)
# status
p_status = subparsers.add_parser("status", help="Get overall status")
p_status.add_argument("--json", action="store_true", help="Output as JSON")
p_status.set_defaults(func=cmd_status)
# heartbeats
p_hb = subparsers.add_parser("heartbeats", help="Get heartbeats for a monitor")
p_hb.add_argument("id", type=int, help="Monitor ID")
p_hb.add_argument("--hours", type=int, default=24, help="Hours of history")
p_hb.add_argument("--json", action="store_true", help="Output as JSON")
p_hb.set_defaults(func=cmd_heartbeats)
# notifications
p_notif = subparsers.add_parser("notifications", help="List notification channels")
p_notif.add_argument("--json", action="store_true", help="Output as JSON")
p_notif.set_defaults(func=cmd_notifications)
args = parser.parse_args()
if not args.command:
parser.print_help()
sys.exit(1)
try:
args.func(args)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()