diff --git a/config.yaml b/config.yaml index d8eb893..eae9923 100644 --- a/config.yaml +++ b/config.yaml @@ -49,6 +49,16 @@ webhook: url: http://localhost:18789/hooks/messages token: "kuma-alert-token-2026" +m365: + enabled: true + poll_interval: 60 + tenant_id: "a1cd3436-6062-4169-a1bd-79efdcfd8a5e" + client_id: "1fec8e78-bce4-4aaf-ab1b-5451cc387264" + token_file: "~/.message-center/m365-token.json" + user_email: "johan.jongsma@kaseya.com" + webhook_url: "http://localhost:18789/hooks/m365" + webhook_token: "kuma-alert-token-2026" + triage: enabled: true prompt_file: /home/johan/clawd/config/email-triage-prompt.md diff --git a/connector_m365.go b/connector_m365.go new file mode 100644 index 0000000..7fcc46a --- /dev/null +++ b/connector_m365.go @@ -0,0 +1,679 @@ +package main + +import ( + "bytes" + "crypto/sha256" + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "net/url" + "os" + "path/filepath" + "regexp" + "strings" + "sync" + "time" +) + +// M365Config holds M365 connector configuration +type M365Config struct { + Enabled bool `yaml:"enabled"` + PollInterval int `yaml:"poll_interval"` + TenantID string `yaml:"tenant_id"` + ClientID string `yaml:"client_id"` + TokenFile string `yaml:"token_file"` + UserEmail string `yaml:"user_email"` + WebhookURL string `yaml:"webhook_url"` + WebhookToken string `yaml:"webhook_token"` +} + +// M365Connector polls M365 email, Teams chat, and calendar +type M365Connector struct { + cfg M365Config + webhookFn func() + stop chan struct{} + mu sync.Mutex + + // Token cache + graphToken string + graphExpiry time.Time + skypeToken string + skypeExpiry time.Time + teamsToken string + teamsExpiry time.Time + refreshToken string + + // Auth expired state + authExpired bool + tokenFileMtime time.Time + + // State + stateFile string + state m365State +} + +type m365State struct { + LastEmailIDs []string `json:"last_email_ids"` + LastTeamsMsgs map[string]string `json:"last_teams_messages"` + LastCalendarHash string `json:"last_calendar_hash"` +} + +type m365TokenFile struct { + RefreshToken string `json:"refresh_token"` +} + +// NewM365Connector creates a new M365 connector +func NewM365Connector(cfg M365Config, webhookFn func()) *M365Connector { + tokenFile := cfg.TokenFile + if strings.HasPrefix(tokenFile, "~/") { + home, _ := os.UserHomeDir() + tokenFile = filepath.Join(home, tokenFile[2:]) + } + cfg.TokenFile = tokenFile + + home, _ := os.UserHomeDir() + stateFile := filepath.Join(home, ".message-center", "m365-state.json") + + c := &M365Connector{ + cfg: cfg, + webhookFn: webhookFn, + stop: make(chan struct{}), + stateFile: stateFile, + } + c.loadState() + return c +} + +func (c *M365Connector) loadState() { + c.state.LastTeamsMsgs = make(map[string]string) + data, err := os.ReadFile(c.stateFile) + if err != nil { + return + } + json.Unmarshal(data, &c.state) + if c.state.LastTeamsMsgs == nil { + c.state.LastTeamsMsgs = make(map[string]string) + } +} + +func (c *M365Connector) saveState() { + data, _ := json.MarshalIndent(c.state, "", " ") + os.WriteFile(c.stateFile, data, 0644) +} + +func (c *M365Connector) loadRefreshToken() error { + data, err := os.ReadFile(c.cfg.TokenFile) + if err != nil { + return fmt.Errorf("read token file: %w", err) + } + var tf m365TokenFile + if err := json.Unmarshal(data, &tf); err != nil { + return fmt.Errorf("parse token file: %w", err) + } + if tf.RefreshToken == "" { + return fmt.Errorf("no refresh_token in token file") + } + c.refreshToken = tf.RefreshToken + return nil +} + +func (c *M365Connector) saveRefreshToken() { + data, _ := json.Marshal(m365TokenFile{RefreshToken: c.refreshToken}) + os.WriteFile(c.cfg.TokenFile, data, 0644) +} + +// exchangeToken gets an access token for the given scope +func (c *M365Connector) exchangeToken(scope string) (accessToken string, expiresIn int, newRefresh string, err error) { + if c.refreshToken == "" { + if err := c.loadRefreshToken(); err != nil { + return "", 0, "", err + } + } + + tokenURL := fmt.Sprintf("https://login.microsoftonline.com/%s/oauth2/v2.0/token", c.cfg.TenantID) + form := url.Values{ + "client_id": {c.cfg.ClientID}, + "grant_type": {"refresh_token"}, + "refresh_token": {c.refreshToken}, + "scope": {scope}, + } + + resp, err := http.PostForm(tokenURL, form) + if err != nil { + return "", 0, "", fmt.Errorf("token request: %w", err) + } + defer resp.Body.Close() + + body, _ := io.ReadAll(resp.Body) + if resp.StatusCode != 200 { + // Check for expired/invalid refresh token + if resp.StatusCode == 401 || strings.Contains(string(body), "invalid_grant") { + if !c.authExpired { + c.authExpired = true + log.Printf("[m365] Auth expired — refresh token invalid. Waiting for new token file.") + c.sendM365Webhook("auth_expired", "M365 refresh token expired. Re-authentication required.") + // Record current mtime so we can detect external updates + if fi, err := os.Stat(c.cfg.TokenFile); err == nil { + c.tokenFileMtime = fi.ModTime() + } + } + return "", 0, "", fmt.Errorf("auth expired") + } + return "", 0, "", fmt.Errorf("token error %d: %s", resp.StatusCode, string(body)) + } + + var result struct { + AccessToken string `json:"access_token"` + RefreshToken string `json:"refresh_token"` + ExpiresIn int `json:"expires_in"` + } + if err := json.Unmarshal(body, &result); err != nil { + return "", 0, "", fmt.Errorf("parse token response: %w", err) + } + + if result.RefreshToken != "" { + c.refreshToken = result.RefreshToken + c.saveRefreshToken() + } + + return result.AccessToken, result.ExpiresIn, result.RefreshToken, nil +} + +func (c *M365Connector) getGraphToken() (string, error) { + c.mu.Lock() + defer c.mu.Unlock() + + if c.graphToken != "" && time.Now().Before(c.graphExpiry.Add(-2*time.Minute)) { + return c.graphToken, nil + } + + token, expiresIn, _, err := c.exchangeToken("https://graph.microsoft.com/.default offline_access") + if err != nil { + return "", err + } + c.graphToken = token + c.graphExpiry = time.Now().Add(time.Duration(expiresIn) * time.Second) + return token, nil +} + +func (c *M365Connector) getTeamsToken() (string, error) { + c.mu.Lock() + defer c.mu.Unlock() + + if c.teamsToken != "" && time.Now().Before(c.teamsExpiry.Add(-2*time.Minute)) { + return c.teamsToken, nil + } + + // First get Skype API token + skypeToken, expiresIn, _, err := c.exchangeToken("https://api.spaces.skype.com/.default offline_access") + if err != nil { + return "", fmt.Errorf("skype token: %w", err) + } + c.skypeToken = skypeToken + c.skypeExpiry = time.Now().Add(time.Duration(expiresIn) * time.Second) + + // Exchange for Teams skypetoken via authsvc + req, _ := http.NewRequest("POST", "https://authsvc.teams.microsoft.com/v1.0/authz", bytes.NewReader([]byte("{}"))) + req.Header.Set("Authorization", "Bearer "+skypeToken) + req.Header.Set("Content-Type", "application/json") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return "", fmt.Errorf("authsvc request: %w", err) + } + defer resp.Body.Close() + + body, _ := io.ReadAll(resp.Body) + if resp.StatusCode != 200 { + return "", fmt.Errorf("authsvc error %d: %s", resp.StatusCode, string(body)) + } + + var authResp struct { + Tokens struct { + SkypeToken string `json:"skypeToken"` + ExpiresIn int64 `json:"expiresIn"` + } `json:"tokens"` + } + if err := json.Unmarshal(body, &authResp); err != nil { + return "", fmt.Errorf("parse authsvc response: %w", err) + } + + c.teamsToken = authResp.Tokens.SkypeToken + if authResp.Tokens.ExpiresIn > 0 { + c.teamsExpiry = time.Now().Add(time.Duration(authResp.Tokens.ExpiresIn) * time.Second) + } else { + c.teamsExpiry = time.Now().Add(50 * time.Minute) + } + + return c.teamsToken, nil +} + +// Start begins the polling loop +func (c *M365Connector) Start() { + interval := time.Duration(c.cfg.PollInterval) * time.Second + if interval < 30*time.Second { + interval = 60 * time.Second + } + + // Record initial token file mtime + if fi, err := os.Stat(c.cfg.TokenFile); err == nil { + c.tokenFileMtime = fi.ModTime() + } + + // Log token expiry by doing a test exchange and reading the access token expiry + go func() { + token, expiresIn, _, err := c.exchangeToken("https://graph.microsoft.com/.default offline_access") + if err != nil { + log.Printf("[m365] Initial token exchange failed: %v", err) + return + } + c.mu.Lock() + c.graphToken = token + c.graphExpiry = time.Now().Add(time.Duration(expiresIn) * time.Second) + log.Printf("[m365] Graph access token valid until %s (%s from now)", c.graphExpiry.Format("2006-01-02 15:04:05"), time.Duration(expiresIn)*time.Second) + c.mu.Unlock() + }() + + go func() { + // Initial poll after short delay + time.Sleep(5 * time.Second) + c.poll() + + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-c.stop: + return + case <-ticker.C: + c.poll() + } + } + }() +} + +func (c *M365Connector) Stop() { + close(c.stop) +} + +func (c *M365Connector) poll() { + // If auth is expired, check if token file was updated externally + if c.authExpired { + fi, err := os.Stat(c.cfg.TokenFile) + if err != nil { + return + } + if !fi.ModTime().After(c.tokenFileMtime) { + return // Token file hasn't changed, skip this cycle + } + // Token file updated — try to recover + log.Printf("[m365] Token file updated, attempting re-auth...") + c.mu.Lock() + c.refreshToken = "" // Force reload from file + c.graphToken = "" + c.skypeToken = "" + c.teamsToken = "" + c.authExpired = false + c.mu.Unlock() + } + + fired := false + + if c.pollEmail() { + fired = true + } + if c.pollTeams() { + fired = true + } + if c.pollCalendar() { + fired = true + } + + if fired && c.webhookFn != nil { + c.webhookFn() + } +} + +func (c *M365Connector) sendM365Webhook(typ, summary string) { + webhookURL := c.cfg.WebhookURL + if webhookURL == "" { + webhookURL = config.Webhook.URL + } + if webhookURL == "" { + return + } + + payload := map[string]string{ + "event": "m365", + "type": typ, + "summary": summary, + } + data, _ := json.Marshal(payload) + + req, err := http.NewRequest("POST", webhookURL, bytes.NewReader(data)) + if err != nil { + return + } + req.Header.Set("Content-Type", "application/json") + token := c.cfg.WebhookToken + if token == "" { + token = config.Webhook.Token + } + if token != "" { + req.Header.Set("Authorization", "Bearer "+token) + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + log.Printf("[m365] Webhook error: %v", err) + return + } + resp.Body.Close() +} + +// --- Email poller --- + +func (c *M365Connector) pollEmail() bool { + token, err := c.getGraphToken() + if err != nil { + log.Printf("[m365] Graph token error: %v", err) + return false + } + + u := "https://graph.microsoft.com/v1.0/me/messages?$top=20&$filter=isRead eq false&$select=subject,from,receivedDateTime,isRead,conversationId&$orderby=receivedDateTime desc" + req, _ := http.NewRequest("GET", u, nil) + req.Header.Set("Authorization", "Bearer "+token) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + log.Printf("[m365] Email fetch error: %v", err) + return false + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + body, _ := io.ReadAll(resp.Body) + log.Printf("[m365] Email fetch error %d: %s", resp.StatusCode, string(body)) + return false + } + + var result struct { + Value []struct { + ID string `json:"id"` + Subject string `json:"subject"` + ReceivedDateTime string `json:"receivedDateTime"` + From struct { + EmailAddress struct { + Name string `json:"name"` + Address string `json:"address"` + } `json:"emailAddress"` + } `json:"from"` + } `json:"value"` + } + + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + log.Printf("[m365] Email decode error: %v", err) + return false + } + + // Build seen set + seenSet := make(map[string]bool) + for _, id := range c.state.LastEmailIDs { + seenSet[id] = true + } + + var newEmails []string + var summaries []string + for _, msg := range result.Value { + if !seenSet[msg.ID] { + newEmails = append(newEmails, msg.ID) + from := msg.From.EmailAddress.Name + if from == "" { + from = msg.From.EmailAddress.Address + } + summaries = append(summaries, fmt.Sprintf("%s: %s", from, msg.Subject)) + } + } + + // Update state with current unread IDs + newIDs := make([]string, 0, len(result.Value)) + for _, msg := range result.Value { + newIDs = append(newIDs, msg.ID) + } + c.state.LastEmailIDs = newIDs + c.saveState() + + if len(newEmails) > 0 { + summary := fmt.Sprintf("%d new unread email(s): %s", len(newEmails), strings.Join(summaries, "; ")) + log.Printf("[m365] %s", summary) + c.sendM365Webhook("email", summary) + return true + } + + return false +} + +// --- Teams chat poller --- + +var htmlTagRe = regexp.MustCompile(`<[^>]*>`) + +func stripHTMLTags(s string) string { + return strings.TrimSpace(htmlTagRe.ReplaceAllString(s, "")) +} + +func (c *M365Connector) pollTeams() bool { + token, err := c.getTeamsToken() + if err != nil { + log.Printf("[m365] Teams token error: %v", err) + return false + } + + // Fetch conversations + req, _ := http.NewRequest("GET", "https://amer.ng.msg.teams.microsoft.com/v1/users/ME/conversations?view=msnp24Equivalent&pageSize=20", nil) + req.Header.Set("Authentication", "skypetoken="+token) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + log.Printf("[m365] Teams conversations error: %v", err) + return false + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + body, _ := io.ReadAll(resp.Body) + log.Printf("[m365] Teams conversations error %d: %s", resp.StatusCode, string(body)) + return false + } + + var convResult struct { + Conversations []struct { + ID string `json:"id"` + } `json:"conversations"` + } + if err := json.NewDecoder(resp.Body).Decode(&convResult); err != nil { + log.Printf("[m365] Teams conversations decode error: %v", err) + return false + } + + hasNew := false + var summaries []string + + for _, conv := range convResult.Conversations { + if strings.HasPrefix(conv.ID, "48:") { + continue + } + + msgs := c.fetchTeamsMessages(token, conv.ID) + if msgs == nil { + continue + } + + lastSeen := c.state.LastTeamsMsgs[conv.ID] + foundNew := false + + for _, msg := range msgs { + if msg.ID == lastSeen { + break + } + // Skip if sender is us + sender := msg.From + if strings.Contains(strings.ToLower(sender), strings.ToLower(c.cfg.UserEmail)) { + continue + } + if strings.EqualFold(sender, "Johan Jongsma") { + continue + } + content := stripHTMLTags(msg.Content) + if content == "" { + continue + } + summaries = append(summaries, fmt.Sprintf("%s: %s", msg.DisplayName, truncateStr(content, 80))) + foundNew = true + } + + if len(msgs) > 0 { + c.state.LastTeamsMsgs[conv.ID] = msgs[0].ID + } + + if foundNew { + hasNew = true + } + } + + c.saveState() + + if hasNew { + summary := fmt.Sprintf("%d new Teams message(s): %s", len(summaries), strings.Join(summaries, "; ")) + log.Printf("[m365] %s", summary) + c.sendM365Webhook("teams", summary) + return true + } + + return false +} + +type teamsMsg struct { + ID string + From string + DisplayName string + Content string +} + +func (c *M365Connector) fetchTeamsMessages(token, convID string) []teamsMsg { + u := fmt.Sprintf("https://amer.ng.msg.teams.microsoft.com/v1/users/ME/conversations/%s/messages?pageSize=5&view=msnp24Equivalent", url.PathEscape(convID)) + req, _ := http.NewRequest("GET", u, nil) + req.Header.Set("Authentication", "skypetoken="+token) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + return nil + } + + var result struct { + Messages []struct { + ID string `json:"id"` + From string `json:"from"` + IMDisplayName string `json:"imdisplayname"` + Content string `json:"content"` + ComposeTime string `json:"composetime"` + MessageType string `json:"messagetype"` + } `json:"messages"` + } + + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + return nil + } + + var msgs []teamsMsg + for _, m := range result.Messages { + if m.MessageType != "RichText/Html" && m.MessageType != "Text" && m.MessageType != "" { + continue + } + msgs = append(msgs, teamsMsg{ + ID: m.ID, + From: m.From, + DisplayName: m.IMDisplayName, + Content: m.Content, + }) + } + + return msgs +} + +// --- Calendar poller --- + +func (c *M365Connector) pollCalendar() bool { + token, err := c.getGraphToken() + if err != nil { + log.Printf("[m365] Graph token error: %v", err) + return false + } + + now := time.Now().UTC() + end := now.Add(48 * time.Hour) + + u := fmt.Sprintf("https://graph.microsoft.com/v1.0/me/calendarView?$top=50&$select=subject,start,end,organizer,isCancelled&startDateTime=%s&endDateTime=%s&$orderby=start/dateTime", + url.QueryEscape(now.Format(time.RFC3339)), + url.QueryEscape(end.Format(time.RFC3339))) + + req, _ := http.NewRequest("GET", u, nil) + req.Header.Set("Authorization", "Bearer "+token) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + log.Printf("[m365] Calendar fetch error: %v", err) + return false + } + defer resp.Body.Close() + + body, _ := io.ReadAll(resp.Body) + if resp.StatusCode != 200 { + log.Printf("[m365] Calendar fetch error %d: %s", resp.StatusCode, string(body)) + return false + } + + hash := fmt.Sprintf("%x", sha256.Sum256(body)) + + if hash == c.state.LastCalendarHash { + return false + } + + c.state.LastCalendarHash = hash + c.saveState() + + // Parse for summary + var result struct { + Value []struct { + Subject string `json:"subject"` + IsCancelled bool `json:"isCancelled"` + Start struct { + DateTime string `json:"dateTime"` + } `json:"start"` + } `json:"value"` + } + json.Unmarshal(body, &result) + + var subjects []string + for _, evt := range result.Value { + if !evt.IsCancelled { + subjects = append(subjects, evt.Subject) + } + } + + summary := fmt.Sprintf("Calendar updated (%d events in next 48h)", len(subjects)) + if len(subjects) > 0 && len(subjects) <= 5 { + summary += ": " + strings.Join(subjects, ", ") + } + + log.Printf("[m365] %s", summary) + c.sendM365Webhook("calendar", summary) + return true +} diff --git a/main.go b/main.go index b0c7d4e..a2d7d9b 100644 --- a/main.go +++ b/main.go @@ -26,6 +26,7 @@ type Config struct { Connectors ConnectorsConfig `yaml:"connectors"` Webhook WebhookConfig `yaml:"webhook"` Triage TriageConfig `yaml:"triage"` + M365 M365Config `yaml:"m365"` } type ServerConfig struct { @@ -205,6 +206,17 @@ func main() { log.Fatalf("Failed to start connectors: %v", err) } + // Start M365 connector if enabled + log.Printf("M365 config: enabled=%v tenant=%s", config.M365.Enabled, config.M365.TenantID) + var m365Connector *M365Connector + if config.M365.Enabled { + m365Connector = NewM365Connector(config.M365, func() { + // M365 connector sends its own webhooks; this is a no-op callback + }) + m365Connector.Start() + log.Printf("M365 connector started (poll interval: %ds)", config.M365.PollInterval) + } + // HTTP server mux := http.NewServeMux() @@ -228,6 +240,9 @@ func main() { signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) <-sigCh log.Println("Shutting down...") + if m365Connector != nil { + m365Connector.Stop() + } store.StopAll() server.Shutdown(nil) }()