message-center/connector_m365.go

1006 lines
27 KiB
Go

package main
import (
"bytes"
"crypto/sha256"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"net/url"
"os"
"path/filepath"
"regexp"
"strings"
"sync"
"time"
)
// M365Config holds M365 connector configuration
type M365Config struct {
Enabled bool `yaml:"enabled"`
PollInterval int `yaml:"poll_interval"`
TenantID string `yaml:"tenant_id"`
ClientID string `yaml:"client_id"`
TokenFile string `yaml:"token_file"`
UserEmail string `yaml:"user_email"`
WebhookURL string `yaml:"webhook_url"`
WebhookToken string `yaml:"webhook_token"`
}
// M365Connector polls M365 email, Teams chat, and calendar
type M365Connector struct {
cfg M365Config
webhookFn func()
stop chan struct{}
mu sync.Mutex
// Token cache
graphToken string
graphExpiry time.Time
skypeToken string
skypeExpiry time.Time
teamsToken string
teamsExpiry time.Time
refreshToken string
// Auth expired state
authExpired bool
tokenFileMtime time.Time
// State
stateFile string
state m365State
// Fully alert tracking: conversationID → []alertID
fullyAlerts map[string][]string
fullyAlertsMu sync.Mutex
}
type m365State struct {
LastEmailIDs []string `json:"last_email_ids"`
LastTeamsMsgs map[string]string `json:"last_teams_messages"`
LastCalendarHash string `json:"last_calendar_hash"`
}
type m365TokenFile struct {
RefreshToken string `json:"refresh_token"`
}
// NewM365Connector creates a new M365 connector
func NewM365Connector(cfg M365Config, webhookFn func()) *M365Connector {
tokenFile := cfg.TokenFile
if strings.HasPrefix(tokenFile, "~/") {
home, _ := os.UserHomeDir()
tokenFile = filepath.Join(home, tokenFile[2:])
}
cfg.TokenFile = tokenFile
home, _ := os.UserHomeDir()
stateFile := filepath.Join(home, ".message-center", "m365-state.json")
c := &M365Connector{
cfg: cfg,
webhookFn: webhookFn,
stop: make(chan struct{}),
stateFile: stateFile,
fullyAlerts: make(map[string][]string),
}
c.loadState()
return c
}
func (c *M365Connector) loadState() {
c.state.LastTeamsMsgs = make(map[string]string)
data, err := os.ReadFile(c.stateFile)
if err != nil {
return
}
json.Unmarshal(data, &c.state)
if c.state.LastTeamsMsgs == nil {
c.state.LastTeamsMsgs = make(map[string]string)
}
}
func (c *M365Connector) saveState() {
data, _ := json.MarshalIndent(c.state, "", " ")
os.WriteFile(c.stateFile, data, 0644)
}
func (c *M365Connector) loadRefreshToken() error {
data, err := os.ReadFile(c.cfg.TokenFile)
if err != nil {
return fmt.Errorf("read token file: %w", err)
}
var tf m365TokenFile
if err := json.Unmarshal(data, &tf); err != nil {
return fmt.Errorf("parse token file: %w", err)
}
if tf.RefreshToken == "" {
return fmt.Errorf("no refresh_token in token file")
}
c.refreshToken = tf.RefreshToken
return nil
}
func (c *M365Connector) saveRefreshToken() {
data, _ := json.Marshal(m365TokenFile{RefreshToken: c.refreshToken})
os.WriteFile(c.cfg.TokenFile, data, 0644)
}
// exchangeToken gets an access token for the given scope
func (c *M365Connector) exchangeToken(scope string) (accessToken string, expiresIn int, newRefresh string, err error) {
if c.refreshToken == "" {
if err := c.loadRefreshToken(); err != nil {
return "", 0, "", err
}
}
tokenURL := fmt.Sprintf("https://login.microsoftonline.com/%s/oauth2/v2.0/token", c.cfg.TenantID)
form := url.Values{
"client_id": {c.cfg.ClientID},
"grant_type": {"refresh_token"},
"refresh_token": {c.refreshToken},
"scope": {scope},
}
resp, err := http.PostForm(tokenURL, form)
if err != nil {
return "", 0, "", fmt.Errorf("token request: %w", err)
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
if resp.StatusCode != 200 {
// Check for expired/invalid refresh token
if resp.StatusCode == 401 || strings.Contains(string(body), "invalid_grant") {
if !c.authExpired {
c.authExpired = true
log.Printf("[m365] Auth expired — refresh token invalid. Waiting for new token file.")
c.sendM365Webhook("auth_expired", "M365 refresh token expired. Re-authentication required.")
// Record current mtime so we can detect external updates
if fi, err := os.Stat(c.cfg.TokenFile); err == nil {
c.tokenFileMtime = fi.ModTime()
}
}
return "", 0, "", fmt.Errorf("auth expired")
}
return "", 0, "", fmt.Errorf("token error %d: %s", resp.StatusCode, string(body))
}
var result struct {
AccessToken string `json:"access_token"`
RefreshToken string `json:"refresh_token"`
ExpiresIn int `json:"expires_in"`
}
if err := json.Unmarshal(body, &result); err != nil {
return "", 0, "", fmt.Errorf("parse token response: %w", err)
}
if result.RefreshToken != "" {
c.refreshToken = result.RefreshToken
c.saveRefreshToken()
}
return result.AccessToken, result.ExpiresIn, result.RefreshToken, nil
}
func (c *M365Connector) getGraphToken() (string, error) {
c.mu.Lock()
defer c.mu.Unlock()
if c.graphToken != "" && time.Now().Before(c.graphExpiry.Add(-2*time.Minute)) {
return c.graphToken, nil
}
token, expiresIn, _, err := c.exchangeToken("https://graph.microsoft.com/.default offline_access")
if err != nil {
return "", err
}
c.graphToken = token
c.graphExpiry = time.Now().Add(time.Duration(expiresIn) * time.Second)
return token, nil
}
func (c *M365Connector) getTeamsToken() (string, error) {
c.mu.Lock()
defer c.mu.Unlock()
if c.teamsToken != "" && time.Now().Before(c.teamsExpiry.Add(-2*time.Minute)) {
return c.teamsToken, nil
}
// First get Skype API token
skypeToken, expiresIn, _, err := c.exchangeToken("https://api.spaces.skype.com/.default offline_access")
if err != nil {
return "", fmt.Errorf("skype token: %w", err)
}
c.skypeToken = skypeToken
c.skypeExpiry = time.Now().Add(time.Duration(expiresIn) * time.Second)
// Exchange for Teams skypetoken via authsvc
req, _ := http.NewRequest("POST", "https://authsvc.teams.microsoft.com/v1.0/authz", bytes.NewReader([]byte("{}")))
req.Header.Set("Authorization", "Bearer "+skypeToken)
req.Header.Set("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return "", fmt.Errorf("authsvc request: %w", err)
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
if resp.StatusCode != 200 {
return "", fmt.Errorf("authsvc error %d: %s", resp.StatusCode, string(body))
}
var authResp struct {
Tokens struct {
SkypeToken string `json:"skypeToken"`
ExpiresIn int64 `json:"expiresIn"`
} `json:"tokens"`
}
if err := json.Unmarshal(body, &authResp); err != nil {
return "", fmt.Errorf("parse authsvc response: %w", err)
}
c.teamsToken = authResp.Tokens.SkypeToken
if authResp.Tokens.ExpiresIn > 0 {
c.teamsExpiry = time.Now().Add(time.Duration(authResp.Tokens.ExpiresIn) * time.Second)
} else {
c.teamsExpiry = time.Now().Add(50 * time.Minute)
}
return c.teamsToken, nil
}
// Start begins the polling loop
func (c *M365Connector) Start() {
interval := time.Duration(c.cfg.PollInterval) * time.Second
if interval < 30*time.Second {
interval = 60 * time.Second
}
// Record initial token file mtime
if fi, err := os.Stat(c.cfg.TokenFile); err == nil {
c.tokenFileMtime = fi.ModTime()
}
// Log token expiry by doing a test exchange and reading the access token expiry
go func() {
token, expiresIn, _, err := c.exchangeToken("https://graph.microsoft.com/.default offline_access")
if err != nil {
log.Printf("[m365] Initial token exchange failed: %v", err)
return
}
c.mu.Lock()
c.graphToken = token
c.graphExpiry = time.Now().Add(time.Duration(expiresIn) * time.Second)
log.Printf("[m365] Graph access token valid until %s (%s from now)", c.graphExpiry.Format("2006-01-02 15:04:05"), time.Duration(expiresIn)*time.Second)
c.mu.Unlock()
}()
go func() {
// Initial poll after short delay
time.Sleep(5 * time.Second)
c.poll()
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-c.stop:
return
case <-ticker.C:
c.poll()
}
}
}()
}
func (c *M365Connector) Stop() {
close(c.stop)
}
func (c *M365Connector) poll() {
// If auth is expired, check if token file was updated externally
if c.authExpired {
fi, err := os.Stat(c.cfg.TokenFile)
if err != nil {
return
}
if !fi.ModTime().After(c.tokenFileMtime) {
return // Token file hasn't changed, skip this cycle
}
// Token file updated — try to recover
log.Printf("[m365] Token file updated, attempting re-auth...")
c.mu.Lock()
c.refreshToken = "" // Force reload from file
c.graphToken = ""
c.skypeToken = ""
c.teamsToken = ""
c.authExpired = false
c.mu.Unlock()
}
fired := false
if c.pollEmail() {
fired = true
}
if c.pollTeams() {
fired = true
}
if c.pollCalendar() {
fired = true
}
if fired && c.webhookFn != nil {
c.webhookFn()
}
}
func (c *M365Connector) sendM365Webhook(typ, summary string) {
c.sendM365WebhookRich(typ, summary, nil)
}
func (c *M365Connector) sendM365WebhookRich(typ, summary string, messages interface{}) {
webhookURL := c.cfg.WebhookURL
if webhookURL == "" {
webhookURL = config.Webhook.URL
}
if webhookURL == "" {
return
}
payload := map[string]interface{}{
"event": "m365",
"type": typ,
"summary": stripEmails(summary),
}
if messages != nil {
// Strip emails from message details before sending to LLM
sanitized, _ := json.Marshal(messages)
var clean interface{}
json.Unmarshal([]byte(stripEmails(string(sanitized))), &clean)
payload["messages"] = clean
}
data, _ := json.Marshal(payload)
req, err := http.NewRequest("POST", webhookURL, bytes.NewReader(data))
if err != nil {
return
}
req.Header.Set("Content-Type", "application/json")
token := c.cfg.WebhookToken
if token == "" {
token = config.Webhook.Token
}
if token != "" {
req.Header.Set("Authorization", "Bearer "+token)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Printf("[m365] Webhook error: %v", err)
return
}
resp.Body.Close()
}
// --- Fully alert management ---
// postFullyAlert sends an alert to the Fully tablet and returns the alert ID
func postFullyAlert(message, priority, group string) string {
payload, _ := json.Marshal(map[string]string{
"message": message,
"priority": priority,
"group": group,
})
resp, err := http.Post("http://localhost:9202/api/alerts", "application/json", bytes.NewReader(payload))
if err != nil {
log.Printf("[m365] Fully alert error: %v", err)
return ""
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
var result struct {
ID string `json:"id"`
}
json.Unmarshal(body, &result)
log.Printf("[m365] Fully alert posted: %s", truncateStr(message, 80))
return result.ID
}
// removeFullyAlert deletes an alert from the Fully tablet
func removeFullyAlert(alertID string) {
req, _ := http.NewRequest("DELETE", "http://localhost:9202/api/alerts/"+alertID, nil)
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Printf("[m365] Fully alert delete error: %v", err)
return
}
resp.Body.Close()
log.Printf("[m365] Fully alert removed: %s", alertID)
}
// m365Fallback builds a simple formatted message when LLM is unavailable
func m365Fallback(prefix, from, subject, content string) string {
text := subject
if text == "" {
text = truncateStr(content, 100)
}
return fmt.Sprintf("%s %s: %s", prefix, from, text)
}
// summarizeM365 uses K2.5 completions API to generate a 1-sentence summary
func summarizeM365(prefix, from, subject, content string) string {
if !config.Triage.Enabled || config.Triage.Provider.APIKey == "" {
return m365Fallback(prefix, from, subject, content)
}
// Build the input line
var input string
if subject != "" && content != "" {
input = fmt.Sprintf("Email from %s, Subject: %s, Body: %s", from, subject, truncateStr(content, 200))
} else if subject != "" {
input = fmt.Sprintf("Email from %s, Subject: %s", from, subject)
} else if content != "" {
input = fmt.Sprintf("%s says: %s", from, truncateStr(content, 200))
} else {
return m365Fallback(prefix, from, subject, content)
}
// Few-shot completions prompt — K2.5 follows this pattern reliably
prompt := fmt.Sprintf(`Input: Jane says: Can we move the standup to 3pm tomorrow?
Alert: 💬 Jane wants to reschedule standup to 3pm tomorrow
Input: Email from Win Alert, Subject: Vonahi NCA New Business, Body: Congratulations to Vishnu for the win.
Alert: 📧 Win Alert: Vonahi NCA new business closed with Vishnu
Input: %s
Alert: %s`, input, prefix)
reqBody := map[string]interface{}{
"model": config.Triage.Provider.Model,
"prompt": prompt,
"temperature": 0.0,
"max_tokens": 40,
"stop": []string{"\n"},
}
data, _ := json.Marshal(reqBody)
apiURL := strings.TrimRight(config.Triage.Provider.BaseURL, "/") + "/completions"
req, err := http.NewRequest("POST", apiURL, bytes.NewReader(data))
if err != nil {
return m365Fallback(prefix, from, subject, content)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+config.Triage.Provider.APIKey)
client := &http.Client{Timeout: 10 * time.Second}
resp, err := client.Do(req)
if err != nil {
log.Printf("[m365] Summary LLM error: %v", err)
return m365Fallback(prefix, from, subject, content)
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
if resp.StatusCode != 200 {
log.Printf("[m365] Summary LLM error %d: %s", resp.StatusCode, truncateStr(string(body), 100))
return m365Fallback(prefix, from, subject, content)
}
var apiResp struct {
Choices []struct {
Text string `json:"text"`
} `json:"choices"`
}
json.Unmarshal(body, &apiResp)
if len(apiResp.Choices) > 0 {
summary := strings.TrimSpace(apiResp.Choices[0].Text)
if summary != "" && !strings.HasPrefix(summary, "The user") {
return prefix + " " + summary
}
}
return m365Fallback(prefix, from, subject, content)
}
// trackFullyAlert records an alert ID for a conversation
func (c *M365Connector) trackFullyAlert(convID, alertID string) {
if alertID == "" {
return
}
c.fullyAlertsMu.Lock()
defer c.fullyAlertsMu.Unlock()
c.fullyAlerts[convID] = append(c.fullyAlerts[convID], alertID)
}
// clearFullyAlerts removes all Fully alerts for a conversation (Johan replied)
func (c *M365Connector) clearFullyAlerts(convID string) {
c.fullyAlertsMu.Lock()
alerts := c.fullyAlerts[convID]
delete(c.fullyAlerts, convID)
c.fullyAlertsMu.Unlock()
for _, id := range alerts {
removeFullyAlert(id)
}
if len(alerts) > 0 {
log.Printf("[m365] Cleared %d Fully alert(s) for conversation (Johan replied)", len(alerts))
}
}
// --- Email poller ---
func (c *M365Connector) pollEmail() bool {
token, err := c.getGraphToken()
if err != nil {
log.Printf("[m365] Graph token error: %v", err)
return false
}
u := "https://graph.microsoft.com/v1.0/me/messages?$top=20&$filter=isRead+eq+false&$select=subject,from,receivedDateTime,isRead,conversationId,bodyPreview"
req, _ := http.NewRequest("GET", u, nil)
req.Header.Set("Authorization", "Bearer "+token)
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Printf("[m365] Email fetch error: %v", err)
return false
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
body, _ := io.ReadAll(resp.Body)
log.Printf("[m365] Email fetch error %d: %s", resp.StatusCode, string(body))
return false
}
var result struct {
Value []struct {
ID string `json:"id"`
Subject string `json:"subject"`
BodyPreview string `json:"bodyPreview"`
ReceivedDateTime string `json:"receivedDateTime"`
From struct {
EmailAddress struct {
Name string `json:"name"`
Address string `json:"address"`
} `json:"emailAddress"`
} `json:"from"`
} `json:"value"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
log.Printf("[m365] Email decode error: %v", err)
return false
}
// Build seen set
seenSet := make(map[string]bool)
for _, id := range c.state.LastEmailIDs {
seenSet[id] = true
}
type emailDetail struct {
From string `json:"from"`
Subject string `json:"subject"`
BodyPreview string `json:"bodyPreview"`
}
var newEmails []string
var summaries []string
var details []emailDetail
for _, msg := range result.Value {
if !seenSet[msg.ID] {
newEmails = append(newEmails, msg.ID)
from := msg.From.EmailAddress.Name
if from == "" {
from = msg.From.EmailAddress.Address
}
summaries = append(summaries, fmt.Sprintf("%s: %s", from, msg.Subject))
details = append(details, emailDetail{
From: from,
Subject: msg.Subject,
BodyPreview: truncateStr(msg.BodyPreview, 200),
})
}
}
// Update state with current unread IDs
newIDs := make([]string, 0, len(result.Value))
for _, msg := range result.Value {
newIDs = append(newIDs, msg.ID)
}
c.state.LastEmailIDs = newIDs
c.saveState()
if len(newEmails) > 0 {
summary := fmt.Sprintf("%d new unread email(s): %s", len(newEmails), strings.Join(summaries, "; "))
log.Printf("[m365] %s", summary)
c.sendM365WebhookRich("email", summary, details)
// Post AI-summarized alerts to Fully per email (grouped by sender)
for _, d := range details {
msg := summarizeM365("📧", d.From, d.Subject, d.BodyPreview)
postFullyAlert(msg, "info", "email:"+d.From)
}
return true
}
return false
}
// --- Teams chat poller ---
var htmlTagRe = regexp.MustCompile(`<[^>]*>`)
var emailRe = regexp.MustCompile(`[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}`)
func stripEmails(s string) string {
return emailRe.ReplaceAllString(s, "[email]")
}
func stripHTMLTags(s string) string {
s = htmlTagRe.ReplaceAllString(s, "")
s = strings.ReplaceAll(s, "&nbsp;", " ")
s = strings.ReplaceAll(s, "&amp;", "&")
s = strings.ReplaceAll(s, "&lt;", "<")
s = strings.ReplaceAll(s, "&gt;", ">")
s = strings.ReplaceAll(s, "&quot;", "\"")
s = strings.ReplaceAll(s, "&#39;", "'")
return strings.TrimSpace(s)
}
func (c *M365Connector) pollTeams() bool {
token, err := c.getTeamsToken()
if err != nil {
log.Printf("[m365] Teams token error: %v", err)
return false
}
// Fetch conversations
req, _ := http.NewRequest("GET", "https://amer.ng.msg.teams.microsoft.com/v1/users/ME/conversations?view=msnp24Equivalent&pageSize=20", nil)
req.Header.Set("Authentication", "skypetoken="+token)
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Printf("[m365] Teams conversations error: %v", err)
return false
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
body, _ := io.ReadAll(resp.Body)
log.Printf("[m365] Teams conversations error %d: %s", resp.StatusCode, string(body))
return false
}
var convResult struct {
Conversations []struct {
ID string `json:"id"`
} `json:"conversations"`
}
if err := json.NewDecoder(resp.Body).Decode(&convResult); err != nil {
log.Printf("[m365] Teams conversations decode error: %v", err)
return false
}
// Whitelisted Teams channels for Fully alerts (channel conv ID → display name)
// DMs (1:1 chats ending in @unq.gbl.spaces) are always included
teamsChannelWhitelist := map[string]string{
// Product and Engineering Org
"19:o6FHAkOQFnCYaM9A9tvLZk-3d6PMahG9JDW_Vh54x8g1@thread.tacv2": "PE - All",
"19:a6cb2387d54a498f93699e07f07540a3@thread.tacv2": "PE - Leadership",
"19:b5937a229fe746eb8c287ae0e6cae49d@thread.tacv2": "Pratik Directs",
"19:e0d5a7a1a7174b47aa9f781a24bb2ba9@thread.tacv2": "PE - People Managers",
"19:Ysc-g52Y9D1v9MGC6AtBXC9FeOy3eWJcPzze17qcDBA1@thread.tacv2": "Cyber Resilience ENG",
// Kaseya Leadership Announcements
"19:V7R25uWAm-CHiqqvc1NBP36hfpa4NzwgNeSPNbhCDck1@thread.tacv2": "Leadership Announcements",
}
type teamsDetail struct {
From string `json:"from"`
Content string `json:"content"`
ConvID string `json:"-"`
ChannelName string `json:"-"`
}
hasNew := false
var summaries []string
var details []teamsDetail
for _, conv := range convResult.Conversations {
if strings.HasPrefix(conv.ID, "48:") {
continue
}
// Check whitelist: allow DMs + group chats + whitelisted channels
// DMs: @unq.gbl.spaces or @unq
// Group chats: @thread.v2 or @thread.skype
// Channels (need whitelist): @thread.tacv2
isDM := strings.Contains(conv.ID, "@unq.gbl.spaces") || strings.Contains(conv.ID, "@unq")
isGroupChat := strings.HasSuffix(conv.ID, "@thread.v2") || strings.HasSuffix(conv.ID, "@thread.skype")
channelName, isWhitelisted := teamsChannelWhitelist[conv.ID]
if !isDM && !isGroupChat && !isWhitelisted {
// Still track last seen to avoid replaying when whitelisted later
msgs := c.fetchTeamsMessages(token, conv.ID)
if msgs != nil && len(msgs) > 0 {
c.state.LastTeamsMsgs[conv.ID] = msgs[0].ID
}
continue
}
msgs := c.fetchTeamsMessages(token, conv.ID)
if msgs == nil {
continue
}
lastSeen := c.state.LastTeamsMsgs[conv.ID]
johanReplied := false
var convDetails []teamsDetail
for _, msg := range msgs {
if msg.ID == lastSeen {
break
}
isJohan := strings.Contains(strings.ToLower(msg.From), strings.ToLower(c.cfg.UserEmail)) ||
strings.EqualFold(msg.DisplayName, "Johan Jongsma")
if isJohan {
johanReplied = true
continue
}
// Skip messages older than 24h — prevents old channel messages from surfacing
if msg.ComposeTime != "" {
if t, err := time.Parse(time.RFC3339Nano, msg.ComposeTime); err == nil {
if time.Since(t) > 24*time.Hour {
log.Printf("[m365] Skipping old Teams message from %s (age: %s)", msg.DisplayName, time.Since(t).Round(time.Minute))
continue
}
}
}
content := stripHTMLTags(msg.Content)
if content == "" {
continue
}
convDetails = append(convDetails, teamsDetail{
From: msg.DisplayName,
Content: truncateStr(content, 300),
ConvID: conv.ID,
ChannelName: channelName,
})
}
if len(msgs) > 0 {
c.state.LastTeamsMsgs[conv.ID] = msgs[0].ID
}
// If Johan replied in this conversation, clear existing Fully alerts
if johanReplied {
c.clearFullyAlerts(conv.ID)
// Don't alert for messages in a conversation Johan already addressed
continue
}
if len(convDetails) > 0 {
for _, d := range convDetails {
summaries = append(summaries, fmt.Sprintf("%s: %s", d.From, truncateStr(d.Content, 80)))
details = append(details, d)
}
hasNew = true
}
}
c.saveState()
if hasNew {
summary := fmt.Sprintf("%d new Teams message(s): %s", len(summaries), strings.Join(summaries, "; "))
log.Printf("[m365] %s", summary)
c.sendM365WebhookRich("teams", summary, details)
// Post AI-summarized alerts to Fully per message
for _, d := range details {
msg := summarizeM365("💬", d.From, "", d.Content)
if d.ChannelName != "" {
msg = "[" + d.ChannelName + "] " + msg
}
alertID := postFullyAlert(msg, "info", d.ConvID)
c.trackFullyAlert(d.ConvID, alertID)
}
return true
}
return false
}
type teamsMsg struct {
ID string
From string
DisplayName string
Content string
ComposeTime string
}
func (c *M365Connector) fetchTeamsMessages(token, convID string) []teamsMsg {
u := fmt.Sprintf("https://amer.ng.msg.teams.microsoft.com/v1/users/ME/conversations/%s/messages?pageSize=5&view=msnp24Equivalent", url.PathEscape(convID))
req, _ := http.NewRequest("GET", u, nil)
req.Header.Set("Authentication", "skypetoken="+token)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
return nil
}
var result struct {
Messages []struct {
ID string `json:"id"`
From string `json:"from"`
IMDisplayName string `json:"imdisplayname"`
Content string `json:"content"`
ComposeTime string `json:"composetime"`
MessageType string `json:"messagetype"`
} `json:"messages"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil
}
var msgs []teamsMsg
for _, m := range result.Messages {
if m.MessageType != "RichText/Html" && m.MessageType != "Text" && m.MessageType != "" {
continue
}
msgs = append(msgs, teamsMsg{
ID: m.ID,
From: m.From,
DisplayName: m.IMDisplayName,
Content: m.Content,
ComposeTime: m.ComposeTime,
})
}
return msgs
}
// --- Calendar poller ---
func (c *M365Connector) pollCalendar() bool {
token, err := c.getGraphToken()
if err != nil {
log.Printf("[m365] Graph token error: %v", err)
return false
}
now := time.Now().UTC()
end := now.Add(48 * time.Hour)
u := fmt.Sprintf("https://graph.microsoft.com/v1.0/me/calendarView?$top=50&$select=subject,start,end,organizer,isCancelled&startDateTime=%s&endDateTime=%s&$orderby=start/dateTime",
url.QueryEscape(now.Format(time.RFC3339)),
url.QueryEscape(end.Format(time.RFC3339)))
req, _ := http.NewRequest("GET", u, nil)
req.Header.Set("Authorization", "Bearer "+token)
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Printf("[m365] Calendar fetch error: %v", err)
return false
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
if resp.StatusCode != 200 {
log.Printf("[m365] Calendar fetch error %d: %s", resp.StatusCode, string(body))
return false
}
hash := fmt.Sprintf("%x", sha256.Sum256(body))
if hash == c.state.LastCalendarHash {
return false
}
c.state.LastCalendarHash = hash
c.saveState()
// Parse for summary
var result struct {
Value []struct {
Subject string `json:"subject"`
IsCancelled bool `json:"isCancelled"`
Start struct {
DateTime string `json:"dateTime"`
} `json:"start"`
} `json:"value"`
}
json.Unmarshal(body, &result)
var subjects []string
for _, evt := range result.Value {
if !evt.IsCancelled {
subjects = append(subjects, evt.Subject)
}
}
summary := fmt.Sprintf("Calendar updated (%d events in next 48h)", len(subjects))
if len(subjects) > 0 && len(subjects) <= 5 {
summary += ": " + strings.Join(subjects, ", ")
}
log.Printf("[m365] %s", summary)
// Push next upcoming meeting to Fully dashboard
c.pushNextMeeting(result.Value)
// Send calendar events in webhook for rich processing
type calDetail struct {
Subject string `json:"subject"`
Start string `json:"start"`
}
var calDetails []calDetail
for _, evt := range result.Value {
if !evt.IsCancelled {
calDetails = append(calDetails, calDetail{
Subject: stripEmails(evt.Subject),
Start: evt.Start.DateTime,
})
}
}
c.sendM365WebhookRich("calendar", summary, calDetails)
return true
}
func (c *M365Connector) pushNextMeeting(events []struct {
Subject string `json:"subject"`
IsCancelled bool `json:"isCancelled"`
Start struct {
DateTime string `json:"dateTime"`
} `json:"start"`
}) {
now := time.Now()
for _, evt := range events {
if evt.IsCancelled {
continue
}
// Parse start time (Graph returns UTC without Z suffix)
startStr := evt.Start.DateTime
if !strings.HasSuffix(startStr, "Z") {
startStr += "Z"
}
start, err := time.Parse(time.RFC3339, startStr)
if err != nil {
start, err = time.Parse("2006-01-02T15:04:05Z", startStr)
if err != nil {
continue
}
}
if start.After(now) {
// Push to Fully dashboard
payload := map[string]string{
"title": evt.Subject,
"time": start.Format(time.RFC3339),
"id": fmt.Sprintf("m365-%d", start.Unix()),
}
data, _ := json.Marshal(payload)
resp, err := http.Post("http://localhost:9202/api/meeting", "application/json", bytes.NewReader(data))
if err != nil {
log.Printf("[m365] Failed to push meeting to Fully: %v", err)
} else {
resp.Body.Close()
log.Printf("[m365] Pushed next meeting to Fully: %s at %s", evt.Subject, start.Local().Format("3:04 PM"))
}
return
}
}
}