message-center/connector_m365.go

680 lines
17 KiB
Go

package main
import (
"bytes"
"crypto/sha256"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"net/url"
"os"
"path/filepath"
"regexp"
"strings"
"sync"
"time"
)
// M365Config holds M365 connector configuration
type M365Config struct {
Enabled bool `yaml:"enabled"`
PollInterval int `yaml:"poll_interval"`
TenantID string `yaml:"tenant_id"`
ClientID string `yaml:"client_id"`
TokenFile string `yaml:"token_file"`
UserEmail string `yaml:"user_email"`
WebhookURL string `yaml:"webhook_url"`
WebhookToken string `yaml:"webhook_token"`
}
// M365Connector polls M365 email, Teams chat, and calendar
type M365Connector struct {
cfg M365Config
webhookFn func()
stop chan struct{}
mu sync.Mutex
// Token cache
graphToken string
graphExpiry time.Time
skypeToken string
skypeExpiry time.Time
teamsToken string
teamsExpiry time.Time
refreshToken string
// Auth expired state
authExpired bool
tokenFileMtime time.Time
// State
stateFile string
state m365State
}
type m365State struct {
LastEmailIDs []string `json:"last_email_ids"`
LastTeamsMsgs map[string]string `json:"last_teams_messages"`
LastCalendarHash string `json:"last_calendar_hash"`
}
type m365TokenFile struct {
RefreshToken string `json:"refresh_token"`
}
// NewM365Connector creates a new M365 connector
func NewM365Connector(cfg M365Config, webhookFn func()) *M365Connector {
tokenFile := cfg.TokenFile
if strings.HasPrefix(tokenFile, "~/") {
home, _ := os.UserHomeDir()
tokenFile = filepath.Join(home, tokenFile[2:])
}
cfg.TokenFile = tokenFile
home, _ := os.UserHomeDir()
stateFile := filepath.Join(home, ".message-center", "m365-state.json")
c := &M365Connector{
cfg: cfg,
webhookFn: webhookFn,
stop: make(chan struct{}),
stateFile: stateFile,
}
c.loadState()
return c
}
func (c *M365Connector) loadState() {
c.state.LastTeamsMsgs = make(map[string]string)
data, err := os.ReadFile(c.stateFile)
if err != nil {
return
}
json.Unmarshal(data, &c.state)
if c.state.LastTeamsMsgs == nil {
c.state.LastTeamsMsgs = make(map[string]string)
}
}
func (c *M365Connector) saveState() {
data, _ := json.MarshalIndent(c.state, "", " ")
os.WriteFile(c.stateFile, data, 0644)
}
func (c *M365Connector) loadRefreshToken() error {
data, err := os.ReadFile(c.cfg.TokenFile)
if err != nil {
return fmt.Errorf("read token file: %w", err)
}
var tf m365TokenFile
if err := json.Unmarshal(data, &tf); err != nil {
return fmt.Errorf("parse token file: %w", err)
}
if tf.RefreshToken == "" {
return fmt.Errorf("no refresh_token in token file")
}
c.refreshToken = tf.RefreshToken
return nil
}
func (c *M365Connector) saveRefreshToken() {
data, _ := json.Marshal(m365TokenFile{RefreshToken: c.refreshToken})
os.WriteFile(c.cfg.TokenFile, data, 0644)
}
// exchangeToken gets an access token for the given scope
func (c *M365Connector) exchangeToken(scope string) (accessToken string, expiresIn int, newRefresh string, err error) {
if c.refreshToken == "" {
if err := c.loadRefreshToken(); err != nil {
return "", 0, "", err
}
}
tokenURL := fmt.Sprintf("https://login.microsoftonline.com/%s/oauth2/v2.0/token", c.cfg.TenantID)
form := url.Values{
"client_id": {c.cfg.ClientID},
"grant_type": {"refresh_token"},
"refresh_token": {c.refreshToken},
"scope": {scope},
}
resp, err := http.PostForm(tokenURL, form)
if err != nil {
return "", 0, "", fmt.Errorf("token request: %w", err)
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
if resp.StatusCode != 200 {
// Check for expired/invalid refresh token
if resp.StatusCode == 401 || strings.Contains(string(body), "invalid_grant") {
if !c.authExpired {
c.authExpired = true
log.Printf("[m365] Auth expired — refresh token invalid. Waiting for new token file.")
c.sendM365Webhook("auth_expired", "M365 refresh token expired. Re-authentication required.")
// Record current mtime so we can detect external updates
if fi, err := os.Stat(c.cfg.TokenFile); err == nil {
c.tokenFileMtime = fi.ModTime()
}
}
return "", 0, "", fmt.Errorf("auth expired")
}
return "", 0, "", fmt.Errorf("token error %d: %s", resp.StatusCode, string(body))
}
var result struct {
AccessToken string `json:"access_token"`
RefreshToken string `json:"refresh_token"`
ExpiresIn int `json:"expires_in"`
}
if err := json.Unmarshal(body, &result); err != nil {
return "", 0, "", fmt.Errorf("parse token response: %w", err)
}
if result.RefreshToken != "" {
c.refreshToken = result.RefreshToken
c.saveRefreshToken()
}
return result.AccessToken, result.ExpiresIn, result.RefreshToken, nil
}
func (c *M365Connector) getGraphToken() (string, error) {
c.mu.Lock()
defer c.mu.Unlock()
if c.graphToken != "" && time.Now().Before(c.graphExpiry.Add(-2*time.Minute)) {
return c.graphToken, nil
}
token, expiresIn, _, err := c.exchangeToken("https://graph.microsoft.com/.default offline_access")
if err != nil {
return "", err
}
c.graphToken = token
c.graphExpiry = time.Now().Add(time.Duration(expiresIn) * time.Second)
return token, nil
}
func (c *M365Connector) getTeamsToken() (string, error) {
c.mu.Lock()
defer c.mu.Unlock()
if c.teamsToken != "" && time.Now().Before(c.teamsExpiry.Add(-2*time.Minute)) {
return c.teamsToken, nil
}
// First get Skype API token
skypeToken, expiresIn, _, err := c.exchangeToken("https://api.spaces.skype.com/.default offline_access")
if err != nil {
return "", fmt.Errorf("skype token: %w", err)
}
c.skypeToken = skypeToken
c.skypeExpiry = time.Now().Add(time.Duration(expiresIn) * time.Second)
// Exchange for Teams skypetoken via authsvc
req, _ := http.NewRequest("POST", "https://authsvc.teams.microsoft.com/v1.0/authz", bytes.NewReader([]byte("{}")))
req.Header.Set("Authorization", "Bearer "+skypeToken)
req.Header.Set("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return "", fmt.Errorf("authsvc request: %w", err)
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
if resp.StatusCode != 200 {
return "", fmt.Errorf("authsvc error %d: %s", resp.StatusCode, string(body))
}
var authResp struct {
Tokens struct {
SkypeToken string `json:"skypeToken"`
ExpiresIn int64 `json:"expiresIn"`
} `json:"tokens"`
}
if err := json.Unmarshal(body, &authResp); err != nil {
return "", fmt.Errorf("parse authsvc response: %w", err)
}
c.teamsToken = authResp.Tokens.SkypeToken
if authResp.Tokens.ExpiresIn > 0 {
c.teamsExpiry = time.Now().Add(time.Duration(authResp.Tokens.ExpiresIn) * time.Second)
} else {
c.teamsExpiry = time.Now().Add(50 * time.Minute)
}
return c.teamsToken, nil
}
// Start begins the polling loop
func (c *M365Connector) Start() {
interval := time.Duration(c.cfg.PollInterval) * time.Second
if interval < 30*time.Second {
interval = 60 * time.Second
}
// Record initial token file mtime
if fi, err := os.Stat(c.cfg.TokenFile); err == nil {
c.tokenFileMtime = fi.ModTime()
}
// Log token expiry by doing a test exchange and reading the access token expiry
go func() {
token, expiresIn, _, err := c.exchangeToken("https://graph.microsoft.com/.default offline_access")
if err != nil {
log.Printf("[m365] Initial token exchange failed: %v", err)
return
}
c.mu.Lock()
c.graphToken = token
c.graphExpiry = time.Now().Add(time.Duration(expiresIn) * time.Second)
log.Printf("[m365] Graph access token valid until %s (%s from now)", c.graphExpiry.Format("2006-01-02 15:04:05"), time.Duration(expiresIn)*time.Second)
c.mu.Unlock()
}()
go func() {
// Initial poll after short delay
time.Sleep(5 * time.Second)
c.poll()
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-c.stop:
return
case <-ticker.C:
c.poll()
}
}
}()
}
func (c *M365Connector) Stop() {
close(c.stop)
}
func (c *M365Connector) poll() {
// If auth is expired, check if token file was updated externally
if c.authExpired {
fi, err := os.Stat(c.cfg.TokenFile)
if err != nil {
return
}
if !fi.ModTime().After(c.tokenFileMtime) {
return // Token file hasn't changed, skip this cycle
}
// Token file updated — try to recover
log.Printf("[m365] Token file updated, attempting re-auth...")
c.mu.Lock()
c.refreshToken = "" // Force reload from file
c.graphToken = ""
c.skypeToken = ""
c.teamsToken = ""
c.authExpired = false
c.mu.Unlock()
}
fired := false
if c.pollEmail() {
fired = true
}
if c.pollTeams() {
fired = true
}
if c.pollCalendar() {
fired = true
}
if fired && c.webhookFn != nil {
c.webhookFn()
}
}
func (c *M365Connector) sendM365Webhook(typ, summary string) {
webhookURL := c.cfg.WebhookURL
if webhookURL == "" {
webhookURL = config.Webhook.URL
}
if webhookURL == "" {
return
}
payload := map[string]string{
"event": "m365",
"type": typ,
"summary": summary,
}
data, _ := json.Marshal(payload)
req, err := http.NewRequest("POST", webhookURL, bytes.NewReader(data))
if err != nil {
return
}
req.Header.Set("Content-Type", "application/json")
token := c.cfg.WebhookToken
if token == "" {
token = config.Webhook.Token
}
if token != "" {
req.Header.Set("Authorization", "Bearer "+token)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Printf("[m365] Webhook error: %v", err)
return
}
resp.Body.Close()
}
// --- Email poller ---
func (c *M365Connector) pollEmail() bool {
token, err := c.getGraphToken()
if err != nil {
log.Printf("[m365] Graph token error: %v", err)
return false
}
u := "https://graph.microsoft.com/v1.0/me/messages?$top=20&$filter=isRead eq false&$select=subject,from,receivedDateTime,isRead,conversationId&$orderby=receivedDateTime desc"
req, _ := http.NewRequest("GET", u, nil)
req.Header.Set("Authorization", "Bearer "+token)
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Printf("[m365] Email fetch error: %v", err)
return false
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
body, _ := io.ReadAll(resp.Body)
log.Printf("[m365] Email fetch error %d: %s", resp.StatusCode, string(body))
return false
}
var result struct {
Value []struct {
ID string `json:"id"`
Subject string `json:"subject"`
ReceivedDateTime string `json:"receivedDateTime"`
From struct {
EmailAddress struct {
Name string `json:"name"`
Address string `json:"address"`
} `json:"emailAddress"`
} `json:"from"`
} `json:"value"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
log.Printf("[m365] Email decode error: %v", err)
return false
}
// Build seen set
seenSet := make(map[string]bool)
for _, id := range c.state.LastEmailIDs {
seenSet[id] = true
}
var newEmails []string
var summaries []string
for _, msg := range result.Value {
if !seenSet[msg.ID] {
newEmails = append(newEmails, msg.ID)
from := msg.From.EmailAddress.Name
if from == "" {
from = msg.From.EmailAddress.Address
}
summaries = append(summaries, fmt.Sprintf("%s: %s", from, msg.Subject))
}
}
// Update state with current unread IDs
newIDs := make([]string, 0, len(result.Value))
for _, msg := range result.Value {
newIDs = append(newIDs, msg.ID)
}
c.state.LastEmailIDs = newIDs
c.saveState()
if len(newEmails) > 0 {
summary := fmt.Sprintf("%d new unread email(s): %s", len(newEmails), strings.Join(summaries, "; "))
log.Printf("[m365] %s", summary)
c.sendM365Webhook("email", summary)
return true
}
return false
}
// --- Teams chat poller ---
var htmlTagRe = regexp.MustCompile(`<[^>]*>`)
func stripHTMLTags(s string) string {
return strings.TrimSpace(htmlTagRe.ReplaceAllString(s, ""))
}
func (c *M365Connector) pollTeams() bool {
token, err := c.getTeamsToken()
if err != nil {
log.Printf("[m365] Teams token error: %v", err)
return false
}
// Fetch conversations
req, _ := http.NewRequest("GET", "https://amer.ng.msg.teams.microsoft.com/v1/users/ME/conversations?view=msnp24Equivalent&pageSize=20", nil)
req.Header.Set("Authentication", "skypetoken="+token)
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Printf("[m365] Teams conversations error: %v", err)
return false
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
body, _ := io.ReadAll(resp.Body)
log.Printf("[m365] Teams conversations error %d: %s", resp.StatusCode, string(body))
return false
}
var convResult struct {
Conversations []struct {
ID string `json:"id"`
} `json:"conversations"`
}
if err := json.NewDecoder(resp.Body).Decode(&convResult); err != nil {
log.Printf("[m365] Teams conversations decode error: %v", err)
return false
}
hasNew := false
var summaries []string
for _, conv := range convResult.Conversations {
if strings.HasPrefix(conv.ID, "48:") {
continue
}
msgs := c.fetchTeamsMessages(token, conv.ID)
if msgs == nil {
continue
}
lastSeen := c.state.LastTeamsMsgs[conv.ID]
foundNew := false
for _, msg := range msgs {
if msg.ID == lastSeen {
break
}
// Skip if sender is us
sender := msg.From
if strings.Contains(strings.ToLower(sender), strings.ToLower(c.cfg.UserEmail)) {
continue
}
if strings.EqualFold(sender, "Johan Jongsma") {
continue
}
content := stripHTMLTags(msg.Content)
if content == "" {
continue
}
summaries = append(summaries, fmt.Sprintf("%s: %s", msg.DisplayName, truncateStr(content, 80)))
foundNew = true
}
if len(msgs) > 0 {
c.state.LastTeamsMsgs[conv.ID] = msgs[0].ID
}
if foundNew {
hasNew = true
}
}
c.saveState()
if hasNew {
summary := fmt.Sprintf("%d new Teams message(s): %s", len(summaries), strings.Join(summaries, "; "))
log.Printf("[m365] %s", summary)
c.sendM365Webhook("teams", summary)
return true
}
return false
}
type teamsMsg struct {
ID string
From string
DisplayName string
Content string
}
func (c *M365Connector) fetchTeamsMessages(token, convID string) []teamsMsg {
u := fmt.Sprintf("https://amer.ng.msg.teams.microsoft.com/v1/users/ME/conversations/%s/messages?pageSize=5&view=msnp24Equivalent", url.PathEscape(convID))
req, _ := http.NewRequest("GET", u, nil)
req.Header.Set("Authentication", "skypetoken="+token)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
return nil
}
var result struct {
Messages []struct {
ID string `json:"id"`
From string `json:"from"`
IMDisplayName string `json:"imdisplayname"`
Content string `json:"content"`
ComposeTime string `json:"composetime"`
MessageType string `json:"messagetype"`
} `json:"messages"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil
}
var msgs []teamsMsg
for _, m := range result.Messages {
if m.MessageType != "RichText/Html" && m.MessageType != "Text" && m.MessageType != "" {
continue
}
msgs = append(msgs, teamsMsg{
ID: m.ID,
From: m.From,
DisplayName: m.IMDisplayName,
Content: m.Content,
})
}
return msgs
}
// --- Calendar poller ---
func (c *M365Connector) pollCalendar() bool {
token, err := c.getGraphToken()
if err != nil {
log.Printf("[m365] Graph token error: %v", err)
return false
}
now := time.Now().UTC()
end := now.Add(48 * time.Hour)
u := fmt.Sprintf("https://graph.microsoft.com/v1.0/me/calendarView?$top=50&$select=subject,start,end,organizer,isCancelled&startDateTime=%s&endDateTime=%s&$orderby=start/dateTime",
url.QueryEscape(now.Format(time.RFC3339)),
url.QueryEscape(end.Format(time.RFC3339)))
req, _ := http.NewRequest("GET", u, nil)
req.Header.Set("Authorization", "Bearer "+token)
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Printf("[m365] Calendar fetch error: %v", err)
return false
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
if resp.StatusCode != 200 {
log.Printf("[m365] Calendar fetch error %d: %s", resp.StatusCode, string(body))
return false
}
hash := fmt.Sprintf("%x", sha256.Sum256(body))
if hash == c.state.LastCalendarHash {
return false
}
c.state.LastCalendarHash = hash
c.saveState()
// Parse for summary
var result struct {
Value []struct {
Subject string `json:"subject"`
IsCancelled bool `json:"isCancelled"`
Start struct {
DateTime string `json:"dateTime"`
} `json:"start"`
} `json:"value"`
}
json.Unmarshal(body, &result)
var subjects []string
for _, evt := range result.Value {
if !evt.IsCancelled {
subjects = append(subjects, evt.Subject)
}
}
summary := fmt.Sprintf("Calendar updated (%d events in next 48h)", len(subjects))
if len(subjects) > 0 && len(subjects) <= 5 {
summary += ": " + strings.Join(subjects, ", ")
}
log.Printf("[m365] %s", summary)
c.sendM365Webhook("calendar", summary)
return true
}